projection

package
v0.3.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 9, 2023 License: Apache-2.0 Imports: 13 Imported by: 5

README

Projections

Package projection provides a framework for building and managing projections.

Introduction

A projection is any type that implements the Target interface. A projection target can apply events to itself to project its state.

package projection

type Target interface {
	ApplyEvent(event.Event)
}

To build the projection state, you can use the Apply() function provided by this package. Each of the provided events will be applied to the target:

package example

import (
	"github.com/modernice/goes/event"
	"github.com/modernice/goes/projection"
)

func example(target projection.Target, events []event.Event) {
	projection.Apply(target, events)
}

The Apply() function also supports the following optional APIs:

Example – Lookup table

This example shows how to implement a lookup table for email addresses of registered users. Each time a "user_registered" event occurs, the lookup table is updated:

package example

import (
	"sync"
	"github.com/google/uuid"
	"github.com/modernice/goes/event"
	"github.com/modernice/goes/helper/pick"
)

// Emails is a thread-safe lookup table for email addresses <> user ids.
type Emails struct {
	mux sync.RWMutex
	users map[string]uuid.UUID // map[EMAIL]USER_ID
	emails map[uuid.UUID]string // map[USER_ID]EMAIL
}

// NewEmails returns the lookup table for email addresses <> user ids.
func NewEmails() *Emails {
	return &Emails{
		users: make(map[string]uuid.UUID),
		emails: make(map[uuid.UUID]string),
	}
}

// UserID returns the id of the user with the given email address.
func (emails *Emails) UserID(email string) (uuid.UUID, bool) {
	emails.mux.RLock()
	defer emails.mux.RUnlock()
	id, ok := emails.users[email]
	return id, ok
}

// Email returns the email address of the user with the given id.
func (emails *Emails) Email(userID uuid.UUID) (string, bool) {
	emails.mux.RLock()
	defer emails.mux.RUnlock()
	email, ok := emails.emails[userID]
	return email, ok
}

// ApplyEvent implements projection.Target.
func (emails *Emails) ApplyEvent(evt event.Event) {
	emails.mux.Lock()
	defer emails.mux.Unlock()

	switch evt.Name() {
	case "user_registered":
		userID := pick.AggregateID(evt)
		email := evt.Data().(string)
		emails.users[email] = userID
		emails.emails[userID] = email
	case "user_deleted":
		userID := pick.AggregateID(evt)
		if email, ok := emails.emails[userID]; ok {
			delete(emails.users, email)
		}
		delete(emails.emails, userID)
	}
}

Scheduling

Given the example above, the lookup table would never be automatically populated. Typically, you want a projection to be updated with every published event within a specified set of events. Using the lookup table as an example, it should be updated on every published "user_registered" and "user_deleted" event. This can be achieved using a continuous schedule.

Continuous

The continuous schedule subscribes to events over an event bus to trigger projection jobs when events of a specified set are published. The projection job can be applied to a projection to update its state.

package example

// ... previous code ...

import (
	"context"
	"github.com/modernice/goes/projection/schedule"
)

func example(bus event.Bus, store event.Store) {
	emails := NewEmails()

	s := schedule.Continuously(bus, store, []string{
		"user_registered",
		"user_deleted",
	})

	errs, err := s.Subscribe(context.TODO(), func(ctx projection.Job) error {
		// Apply the projection job to the projection.
		return ctx.Apply(ctx, emails)
	})

	if err != nil {
		panic(fmt.Errorf("subscribe to projection schedule: %w", err))
	}

	for err := range errs {
		log.Printf("failed to apply projection: %v", err)
	}
}
Debounce

If your application publishes a lot of events in a short amount of time, consider providing the Debounce(time.Duration) option to continuous schedules to improve the performance of your application.

Each time one of the configured events is published, the schedule will wait for the specified duration before triggering a projection job. Additional events that are published during this time will be buffered by the schedule and passed as a unit to the triggered projection job. Should an event be published and buffered during this wait time, the wait timer resets.

TODO: Implement a wait cap.

package example

func example(bus event.Bus, store event.Store) {
	s := schedule.Continuously(
		bus, store, []string{"..."},

		// Debounce projection jobs by 1 second.
		schedule.Debounce(time.Second),
	)
}
Periodic

A periodic schedule triggers projection jobs at a specified interval. Periodic schedules always fetch the entire history of the configured events from the event store to apply to the projections.

package example

// ... previous code ...

func example(store event.Store) {
	emails := NewEmails()

	// Trigger a projection job every hour.
	s := schedule.Periodically(store, time.Hour, []string{
		"user_registered",
		"user_deleted",
	})

	errs, err := s.Subscribe(context.TODO(), func(ctx projectio.Job) error {
		return ctx.Apply(ctx, emails)
	})

	if err != nil {
		panic(fmt.Errorf("subscribe to projection schedule: %w", err))
	}

	for err := range errs {
		log.Printf("failed to apply projection: %v", err)
	}
}

Projection jobs

Jobs are typically created by schedules when triggering a projection update. A job can be applied to projections to update their state by applying the events that are configured in the job. Depending on the schedule that triggered the job, the job may fetch events on-the-fly from the event store when applied onto a projection.

A job provides additional query helpers to extract event and aggregate information from the events in the job. All query functions of a Job use caching to avoid querying the underlying event stream unnecessarily. Jobs are thread-safe, which means that they can be applied concurrently onto multiple projections.

package example

// ... previous code ...

func example(s projection.Schedule) {
	emails := NewEmails()

  errs, err := s.Subscribe(context.TODO(), func(ctx projection.Job) error {
    // Query all events of the job.
    events, errs, err := ctx.Events(ctx)

    // Query all events of the job that belong to one of the given aggregate names.
    events, errs, err := ctx.EventsOf(ctx, "user")

    // Query all events of the job that would be applied to the given projection.
    events, errs, err := ctx.EventsFor(ctx, emails)

    // Extract all aggregates from the job's events as aggregate.Refs.
    refs, errs, err := ctx.Aggregates(ctx)

    // Extract all aggregates with one of the given names from the job's events
    // as aggregate.Refs.
    refs, errs, err := ctx.Aggregates(ctx, "user")

    // Extract the first UUID of the aggregate with the given name from the events
    // of the job.
    id, err := ctx.Aggregate(ctx, "user")

    return nil
  })
  if err != nil {
    log.Fatalf("subscribe to projection schedule: %v", err)
  }

  for err := range errs {
    log.Printf("failed to project: %v", err)
  }
}
Startup / initial projections

When subscribing to a schedule, you can provide the Startup(TriggerOption) option to trigger an initial projection update on startup.

package example

func example(s projection.Schedule) {
	errs, err := s.Subscribe(
		context.TODO(),
		func(projection.Job) error { ... },

		// immediately create and apply a projection job
		projection.Startup(), 
	)

	if err != nil {
		// initial projection job failed
	}

	for err := range errs {
		log.Printf("subsequent projection job failed: %v", err)
	}
}
Manually trigger a job

Both continuous and periodic schedules can be manually triggered at any time using the Schedule.Trigger() method.

When triggering a continuous schedule, if no custom event query is provided to the trigger, the entire history of the configured events is fetched from the event store, just like it's done within periodic schedules.

package example

func example(s projection.Schedule) {
	if err := s.Trigger(context.TODO()); err != nil {
		panic(fmt.Error("failed to trigger projection: %w", err))
	}
}

You can override the query that is used by the job's Aggregates() helper to improve its query performance:

package example

func example(s projection.Schedule) {
	s.Subscribe(context.TODO(), func(ctx projection.Job) error {
		// We want to modify the underlying event query of this call.
		refs, errs, err := ctx.Aggregates(ctx)
	})

	s.Trigger(context.TODO(), projection.AggregateQuery(query.New(
		// query "foo" and "bar" events from which the aggregate data
		// will be extracted
		query.Name("foo", "bar"),
	)))
}

Alternatively, you can use the NewJob() constructor to create a job manually without using a schedule:

package example

// ... previous code ...

func example(store event.Store) {
	emails := NewEmails()

	job := projection.NewJob(context.TODO(), store, query.New(
		query.Name("user_registered", "user_deleted"),
	))

	if err := job.Apply(context.TODO(), emails); err != nil {
		panic(fmt.Errorf("apply projection job: %w", err))
	}
}

Extensions

ProgressAware

You can embed the *Progressor type into your projection to make the projection ProgressAware. Such a projection keeps track of its "projection progress" when updated, which

  1. guards the projection from old, already applied events
  2. can optimize the query performance of projection jobs by only querying events that occured after a projection's progress time
  3. allows for simple and performant startup projection jobs
package example

type Foo struct {
	*projection.Progressor
}

func NewFoo() *Foo {
	return &Foo{
		Progressor: projection.NewProgressor(),
	}
}
Guard

If a projection implements Guard, its GuardProjection(event.Event) is called for every event that is about to be applied to the projection, and is only applied if GuardProjection() returns true.

Example – Ecommerce orders

Given an ecommerce app where read models of orders need to be projected for the customers. For brevity, the read model just needs to provide the id and total price of the order. One could implement a projector for the order read models like this:

package example

// ... order aggregate implementation ..

// OrderPlaced is the event data for the "order_placed" event.
type OrderPlaced struct {
	UnitPrice int64
	Quantity int
}

// CustomerOrder is the read model of an order for the customer of the order.
type CustomerOrder struct {
	ID uuid.UUID
	Total int64
}

// NewCustomerOrder returns the customer read model of the order with the given id.
func NewCustomerOrder(id uuid.UUID) *CustomerOrder {
	return &CustomerOrder{
		ID: id,
	}
}

// GuardProjection implements projection.Guard.
func (order *CustomerOrder) GuardProjection(evt event.Event) bool {
	// Only allow events of the order that this read model represents.
	return pick.AggregateID(evt) == order.ID
}

// ApplyEvent implements projection.Target.
func (order *CustomerOrder) ApplyEvent(evt event.Event) {
	switch evt.Name() {
	case "order_placed":
		data := order.Data().(OrderPlaced)
		order.Total += data.UnitPrice * data.Quantity
	}
}

// ProjectCustomerOrders continuously projects order read models for
// customers until ctx is canceled.
func ProjectCustomerOrders(
	ctx context.Context,
	bus event.Bus,
	store event.Store,
) (<-chan error, error) {
	// Create a schedule that is triggered by the "order_placed" event.
	// We debounce the schedule by 1 second which can trigger a single
	// projection job for events of multiple orders.
	s := schedule.Continuously(
		bus, store, []string{"order_placed"},
		schedule.Debounce(time.Second),
	)

	return s.Subscribe(ctx, func(ctx projection.Job) error {
		// Extract the the orders from the events.
		refs, errs, err := ctx.Aggregates(ctx)
		if err != nil {
			return fmt.Error("extract aggregates: %w", err)
		}

		// For each order, create (or fetch) the read model and apply
		// the job to it.
		return streams.Walk(ctx, func(ref aggregate.Ref) error {
			order := NewCustomerOrder(ref.ID) // or fetch it from a repository

			// Simply apply the job to the projection and let the projection
			// guard determine which events are actually applied.
			return ctx.Apply(ctx, order)
		}, refs, errs)
	})
}

The projection guard example above could also be rewritten using the QueryGuard provided by this package:

package example

type CustomerOrder struct {
	projection.Guard

	ID uuid.UUID
	Total int64
}

func NewCustomerOrder(id uuid.UUID) *CustomerOrder {
	return &CustomerOrder{
		// Use an event query as the projection guard.
		Guard: projection.QueryGuard(query.New(query.AggregateID(id))),
		ID: id,
	}
}

Projection service

The projection service allows projections to be triggered from external services / processes. Communication between projection services is done using events.

package service1

func example(reg *codec.Registry, bus event.Bus) {
  // Register the events of the projection service into a registry.
  projection.RegisterService(reg)

  svc := projection.NewService(bus)

  // Given some named schedules
  var schedules map[string]projection.Schedule

  // When registering them in the projection service
  for name, s := range schedules {
    svc.Register(name, s)
  }
}

package service2

func example(bus event.Bus) {
  svc := projection.NewService(bus)

  // Then another service that uses the same underlying event bus can
  // trigger the registered schedules
  err := svc.Trigger(context.TODO(), "foo")
}

Generic helpers

Applying events within the ApplyEvent function is the most straightforward way to implement a projection but can become quite messy if a projection depends on many events.

goes provides type-safe, generic helpers that allow you to setup an event applier function for each individual event. This is what the lookup example looks like using generics:

package example

type Emails struct {
	*projection.Base // implements convenience methods

	mux sync.RWMutex
	users map[string]uuid.UUID // map[EMAIL]USER_ID
	emails map[uuid.UUID]string // map[USER_ID]EMAIL
}

func NewEmails() *Emails {
	emails := &Emails{Base: projection.New()}

	event.ApplyWith(emails, emails.userRegistered, "user_registered")
	event.ApplyWith(emails, emails.userDeleted, "user_deleted")

	return emails
}

func (emails *Emails) userRegistered(evt event.Of[string]) {
	emails.mux.Lock()
	defer emails.mux.Unlock()

	userID := pick.AggregateID(evt)
	email := evt.Data()
	emails.users[email] = userID
	emails.emails[userID] = email
}

func (emails *Emails) userDeleted(evt event.Event) {
	emails.mux.Lock()
	defer emails.mux.Unlock()

	userID := pick.AggregateID(evt)
	if email, ok := emails.emails[userID]; ok {
		delete(emails.users, email)
	}
	delete(emails.emails, userID)
}

Tips

Startup projection jobs

Consider making your projection ProgressAware if you want to use the Startup() option when subscribing to a schedule. When combined with the AggregateQuery() TriggerOption, this can hugely improve the query performance of the initial projection job because the job can optimize its queries when applying a projection job to projection targets, using the progress time provided by the ProgressAware interface. If your projection does not implement ProgressAware, then the initial projection job will query the entire history of the configured events, which – depending on the size of your event store – could take a long time.

Additionally, when providing an optimized query to the AggregateQuery() option, the Aggregates() helper of the projection job can extract the aggregates from using the provided query, instead of the default query that queries the entirety of the configured events.

Projection finalization

Avoid long-running function calls in the event appliers. Move such calls to a finalizer method on your projection and call it after applying the job, like this:

package example

type Foo struct { ... }

func (*Foo) ApplyEvent(event.Event) {}

func (f *Foo) finalize(ctx context.Context, dep SomeDependency) error {
	// do stuff
	if err := dep.Do(ctx, "..."); err != nil {
		return err
	}
	// do more stuff
	return nil
}

func example(s projection.Schedule) {
	var foo Foo
	var dep SomeDependency

	s.Subscribe(context.TODO(), func(ctx projection.Job) error {
		if err := ctx.Apply(ctx, &foo); err != nil {
			return err
		}

		return foo.finalize(ctx, dep)
	})
}

TODO: Implement finalization queue to allow for batch finalization.

Documentation

Index

Constants

View Source
const (
	// Triggered is the event name for triggering a Schedule.
	Triggered = "goes.projection.schedule.triggered"

	// TriggerAccepted is the event name for accepting a trigger.
	TriggerAccepted = "goes.projection.schedule.trigger_accepted"
)

Variables

View Source
var (
	// DefaultTriggerTimeout is the default timeout for triggering a Schedule.
	DefaultTriggerTimeout = 5 * time.Second

	// ErrUnhandledTrigger is returned when trying to trigger a Schedule that
	// isn't registered in a running Service.
	ErrUnhandledTrigger = errors.New("unhandled trigger")
)
View Source
var (
	// ErrAggregateNotFound is returned when trying to extract an aggregateID
	// from a Job's events and none of those Events belong to an aggregate with
	// that name.
	ErrAggregateNotFound = errors.New("aggregate not found in events")
)

Functions

func Apply

func Apply(proj Target[any], events []event.Event, opts ...ApplyOption)

Apply applies events to the given projection.

If the projection implements Guard, proj.GuardProjection(evt) is called for every event to determine if the event should be applied to the projection.

If the projection implements ProgressAware, the time of the last applied event is applied to the projection by calling proj.SetProgress(evt).

func ApplyStream added in v0.1.2

func ApplyStream(target Target[any], events <-chan event.Event, opts ...ApplyOption)

ApplyStream applies events to the given projection.

If the projection implements Guard, proj.GuardProjection(evt) is called for every event to determine if the event should be applied to the projection.

If the projection implements ProgressAware, the time of the last applied event is applied to the projection by calling proj.SetProgress(evt).

func RegisterService

func RegisterService(r codec.Registerer)

RegisterService register the projection service events into an event registry.

Types

type ApplyOption

type ApplyOption func(*applyConfig)

ApplyOption is an option for Apply.

func IgnoreProgress

func IgnoreProgress() ApplyOption

IgnoreProgress returns an ApplyOption that makes Apply ignore the current progress of a projection so that it applies events to a projection even if an event's time is before the progress time of the projection.

type Base added in v0.1.2

type Base struct {
	// contains filtered or unexported fields
}

Base is a simple implementation of a projection that allows registering event handlers and applying events to registered handlers. Handlers can be registered for specific event names using the RegisterEventHandler method, and events can be applied using the ApplyEvent method. RegisteredEvents returns a slice of event names that have registered handlers in the Base projection.

func New added in v0.1.2

func New() *Base

New creates and returns a new Base projection with an empty appliers map for registering event handlers.

func (*Base) ApplyEvent added in v0.1.2

func (a *Base) ApplyEvent(evt event.Event)

ApplyEvent applies the given event to the Base projection by calling its registered event handler, if one exists for the event name. If no handler is registered for the event name, ApplyEvent does nothing.

func (*Base) RegisterEventHandler added in v0.1.2

func (a *Base) RegisterEventHandler(eventName string, handler func(event.Event))

RegisterEventHandler associates the given event handler function with the specified event name in the Base projection. The handler will be called when an event with the matching name is applied to the projection using ApplyEvent method.

func (*Base) RegisteredEvents added in v0.3.5

func (a *Base) RegisteredEvents() []string

RegisteredEvents returns a slice of event names that have registered handlers in the projection.

type Guard

type Guard interface {
	// GuardProjection determines whether an event is allowed to be applied to a projection.
	GuardProjection(event.Event) bool
}

Guard can be implemented by projections to "guard" the projection from illegal events. If a projection implements Guard, GuardProjection(evt) is called for every event that should be applied to the projection to determine if the event is allows to be applied. If GuardProjection(evt) returns false, the event is not applied.

QueryGuard implements Guard.

type GuardFunc

type GuardFunc func(event.Event) bool

GuardFunc allows functions to be used as Guards.

func (GuardFunc) GuardProjection

func (guard GuardFunc) GuardProjection(evt event.Event) bool

GuardProjection returns guard(evt).

type Job

type Job interface {
	context.Context

	// Events queries the events of the job. Any provided filters are applied
	// in-memory to the query result.
	//
	//	var job Job
	//	str, errs, err := job.Events(job)
	//	// handle err
	//	events, err := streams.Drain(job, str, errs)
	//
	// If you need the events that would be applied to a given projection,
	// call EventsFor() instead.
	Events(_ context.Context, filters ...event.Query) (<-chan event.Event, <-chan error, error)

	// EventsOf queries the events that belong to one of the given aggregate names.
	//
	//	var job Job
	//	str, errs, err := job.EventsOf(job, "foo", "bar", "baz")
	//	// handle err
	//	events, err := streams.Drain(job, str, errs)
	EventsOf(_ context.Context, aggregateNames ...string) (<-chan event.Event, <-chan error, error)

	// EventsFor queries the events that would be applied to the given
	// projection when calling Apply().
	//
	//	var job Job
	//	var proj projection.Projection
	//	str, errs, err := job.EventsFor(job, proj)
	//	// handle err
	//	events, err := streams.Drain(job, str, errs)
	EventsFor(context.Context, Target[any]) (<-chan event.Event, <-chan error, error)

	// Aggregates extracts the aggregates of the job's events as aggregate
	// references. If aggregate names are provided, only references that have
	// one of the given names are returned. References are deduplicated, so each
	// of the returned references is unique.
	//
	//	var job Job
	//	str, errs, err := job.Aggregates(job, "foo", "bar", "baz")
	//	// handle err
	//	events, err := streams.Drain(job, str, errs)
	Aggregates(_ context.Context, aggregateNames ...string) (<-chan aggregate.Ref, <-chan error, error)

	// Aggregate returns the id of the first aggregate with the given name that
	// can be extracted from the events of the job. If no event that belongs to
	// this kind of aggregate can be found, an error that satisfies
	// errors.Is(err, ErrAggregateNotFound) is returned.
	Aggregate(_ context.Context, aggregateName string) (uuid.UUID, error)

	// Apply applies the Job to the projection. It applies the events that
	// would be returned by EventsFor(). A job may be applied concurrently to
	// multiple projections.
	Apply(context.Context, Target[any], ...ApplyOption) error
}

Job is a projection job. Jobs are typically created within Schedules and passed to subscribers of those Schedules.

func NewJob

func NewJob(ctx context.Context, store event.Store, q event.Query, opts ...JobOption) Job

NewJob returns a new projection Job. The Job uses the provided Query to fetch the events from the Store.

type JobOption

type JobOption func(*job)

JobOption is a Job option.

func WithAggregateQuery added in v0.1.2

func WithAggregateQuery(q event.Query) JobOption

WithAggregateQuery returns a JobOption that specifies the event query that is used for the `Aggregates()` and `Aggregate()` methods of a job. If this option is not provided, the main query of the job is used instead.

func WithBeforeEvent added in v0.1.2

func WithBeforeEvent(fns ...func(context.Context, event.Event) ([]event.Event, error)) JobOption

WithBeforeEvent returns a JobOption that adds the given functions as "before"-interceptors to the event streams returned by a job's `EventsFor()` and `Apply()` methods. For each received event of a stream, all provided functions are called in order, and the returned events are inserted into the stream before the intercepted event.

func WithFilter

func WithFilter(queries ...event.Query) JobOption

WithFilter returns a JobOption that adds queries as filters to the Job. Fetched events are matched against every Query and only returned in the result if they match all Queries.

func WithReset

func WithReset() JobOption

WithReset returns a JobOption that resets projections before applying events to them. Resetting a projection is done by first resetting the progress of the projection (if it implements ProgressAware). Then, if the Projection has a Reset method, that method is called to allow for custom reset logic.

type ProgressAware added in v0.1.2

type ProgressAware interface {
	// Progress returns the projection progress in terms of the time of the last
	// applied events and the id's of those events. In most cases, Progress()
	// should only a single event id. Multiple event ids should be returned if
	// more than one event was applied at the returned time.
	Progress() (time.Time, []uuid.UUID)

	// SetProgress sets the projection progress in terms of the time of the last
	// applied event. The ids of the last applied events should be provided to
	// SetProgress().
	SetProgress(time.Time, ...uuid.UUID)
}

A ProgressAware projection keeps track of its projection progress in terms of the time and ids of the last applied events. When applying events to a projection with projection.Apply(), only those events with a later time than the current progress time are applied to the projection.

*Progressor implements ProgressAware, and can be embedded in your projections.

type Progressor

type Progressor struct {
	// Time of the last applied events as elapsed nanoseconds since January 1, 1970 UTC.
	LastEventTime int64

	// LastEvents are the ids of last applied events that have LastEventTime as their time.
	LastEvents []uuid.UUID
}

Progressor can be embedded into a projection to implement ProgressAware.

func NewProgressor added in v0.1.2

func NewProgressor() *Progressor

NewProgressor returns a new *Progressor that can be embeded into a projection to implement the ProgressAware API.

func (*Progressor) Progress

func (p *Progressor) Progress() (time.Time, []uuid.UUID)

Progress returns the projection progress in terms of the time and ids of the last applied events. If p.LastEventTime is 0, the zero Time is returned.

func (*Progressor) SetProgress

func (p *Progressor) SetProgress(t time.Time, ids ...uuid.UUID)

SetProgress sets the projection progress as the time of the latest applied event. The ids of the applied events that have the given time should be provided.

type QueryGuard

type QueryGuard query.Query

QueryGuard is a Guard that uses an event query to determine if an event is allowed to be applied to a projection.

type MyProjection struct {
	projection.QueryGuard
}

func NewMyProjection() *MyProjection {
	return &MyProjection{
		QueryGuard: query.New(query.Name("foo", "bar", "baz")), // allow "foo", "bar", and "baz" events
	}
}

func (QueryGuard) GuardProjection

func (g QueryGuard) GuardProjection(evt event.Event) bool

GuardProjection tests the Guard's Query against a given event and returns whether the event is allowed to be applied to the projection.

type Resetter

type Resetter interface {
	// Reset should implement any custom logic to reset the state of a projection.
	Reset()
}

A Resetter is a projection that can reset its state. projections that implement Resetter can be reset by projection jobs before applying events to the projection. projection jobs reset a projection if the WithReset() option was used to create the job.

type Schedule

type Schedule interface {
	// Subscribe subscribes the provided function to the Schedule and returns a
	// channel of asynchronous projection errors. When the Schedule is
	// triggered, a Job is created and passed to subscribers of the Schedule.
	// Errors returned from subscribers are pushed into the returned error
	// channel.
	//
	//	var proj projection.Projection // created by yourself
	//	s := schedule.Continuously(bus, store, []string{"foo", "bar", "baz"})
	//	errs, err := s.Subscribe(context.TODO(), func(job projection.Job) error {
	//		return job.Apply(job, proj) // job.Apply applies the appropriate events to the projection
	//	})
	//	// handle err
	//	for err := range errs {
	//		log.Printf("projection failed: %v\n", err)
	//	}
	Subscribe(context.Context, func(Job) error, ...SubscribeOption) (<-chan error, error)

	// Trigger manually triggers the Schedule immediately. A Job is created and
	// passed to every subscriber of the Schedule. Trigger does not wait for the
	// Job to be handled by the subscribers.
	//
	// Reset projections
	//
	// The created Job can be configured to reset projections before applying
	// events to them, effectively rebuilding the entire projection from the
	// beginning (first event):
	//
	//	var s projection.Schedule
	//	err := s.Trigger(context.TODO(), projection.Reset())
	//
	// When a projection implements progressor (or embeds *Progressor), the
	// progress time of the projection is set to 0.
	//
	// When a projection has a `Reset()` method, that method is called to allow
	// for custom reset logic. Implementers of projection should appropriately
	// reset the state of the projection.
	//
	// Custom event query
	//
	// When a Job is created, it is passed an event query to fetch the events
	// for the projections. By default, this query fetches the events configured
	// in the Schedule sorted by time. A custom query may be provided using the
	// Query option. Don't forget to configure correct sorting when providing a
	// custom query:
	//
	//	var s projection.Schedule
	//	err := s.Trigger(context.TODO(), projection.Query(query.New(
	//		query.AggregateName("foo", "bar"),
	//		query.SortBy(event.SortTime, event.SortAsc),
	//	)))
	//
	// Event filters
	//
	// Queried events can be further filtered using the Filter option. Filters
	// are applied in-memory, after the events have already been fetched from
	// the event store. When multiple filters are passed, events must match
	// against every filter to be applied to the projections. Sorting options of
	// filters are ignored.
	//
	//	var s projection.Schedule
	//	err := s.Trigger(context.TODO(), projection.Filter(query.New(...), query.New(...)))
	Trigger(context.Context, ...TriggerOption) error
}

Schedule is a projection schedule.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is an event-driven projection service. A Service allows to trigger Schedules that are registered in Services that communicate over the same event bus.

func NewService

func NewService(bus event.Bus, opts ...ServiceOption) *Service

NewService returns a new Service.

var bus event.Bus
var fooSchedule projection.Schedule
var barSchedule projection.Schedule
svc := NewService(
	bus,
	projection.RegisterSchedule("foo", fooSchedule),
	projection.RegisterSchedule("bar", barSchedule),
)

errs, err := svc.Run(context.TODO())

func (*Service) Register

func (svc *Service) Register(name string, s Schedule)

Register registers a Schedule with the given name into the Service.

func (*Service) Run

func (svc *Service) Run(ctx context.Context) (<-chan error, error)

Run starts the Service is a new goroutine and returns a channel of asynchronous errors, or a single error if the event bus fails to subscribe. When another Service triggers a Schedule with a name that is registered in svc, svc accepts that trigger by publishing a TriggerAccepted event and then actually triggers the Schedule.

func (*Service) Trigger

func (svc *Service) Trigger(ctx context.Context, name string, opts ...TriggerOption) error

Trigger triggers the Schedule with the given name.

Trigger publishes a Triggered event over the event bus and waits for a TriggerAccepted event to be published by another Service. Should the TriggerAccepted event not be published within the trigger timeout, ErrUnhandledTrigger is returned. When ctx is canceled, ctx.Err() is returned.

type ServiceOption

type ServiceOption func(*Service)

ServiceOption is an option for creating a Service.

func RegisterSchedule

func RegisterSchedule(name string, s Schedule) ServiceOption

RegisterSchedule returns a ServiceOption that registers the Schedule s with the given name into a Service

func TriggerTimeout

func TriggerTimeout(d time.Duration) ServiceOption

TriggerTimeout returns a ServiceOption that overrides the default timeout for triggering a Schedule. Default is 5s. Zero Duration means no timeout.

type SubscribeOption added in v0.1.2

type SubscribeOption func(*Subscription)

SubscribeOption is an option for Schedule.Subscribe.

func BeforeEvent added in v0.1.2

func BeforeEvent[Data any](fn func(context.Context, event.Of[Data]) ([]event.Event, error), events ...string) SubscribeOption

BeforeEvent returns a SubscribeOption that registers the given function as a "before"-interceptor for the event streams created by a job's `EventsFor()` and `Apply()` methods. For each received event of a stream that has one of the provided event names, the provided function is called, and the returned events of that function are inserted into the stream before the intercepted event. If no event names are provided, the interceptor is applied to all events.

func Startup added in v0.1.2

func Startup(opts ...TriggerOption) SubscribeOption

Startup returns a SubscribeOption that triggers an initial projection run when subscribing to a projection schedule.

type Subscription added in v0.1.2

type Subscription struct {
	// If provided, the projection schedule triggers a projection job on startup.
	Startup *Trigger

	// BeforeEvent are the "before"-interceptors for the event streams created
	// by a job's `EventsFor()` and `Apply()` methods.
	BeforeEvent []func(context.Context, event.Event) ([]event.Event, error)
}

Subscripion is the configuration for a subscription to a projection schedule.

func NewSubscription added in v0.1.2

func NewSubscription(opts ...SubscribeOption) Subscription

NewSubscription creates a Subscription using the provided options.

type Target added in v0.1.2

type Target[Data any] interface {
	ApplyEvent(event.Of[Data])
}

Target is a projection target. Projections must implement this interface to be used within goes' projection system.

*Base implements Target, and can be embedded into projections to implement this interface.

type Trigger

type Trigger struct {
	// Reset projections before applying events.
	Reset bool

	// If provided, overrides the query that is used to fetch events that are
	// applied to projections.
	Query event.Query

	// If provided, overrides the query that is used to extract events from a
	// projection job. The `Aggregates()` and `Aggregate()` methods of a
	// projection job will use this query. This allows to optimize the query
	// performance of initial projection runs, which often times need to fetch
	// all ids of specific aggregates from the event store in order to get all
	// projections up-to-date.
	AggregateQuery event.Query

	// Additional filters that are applied in-memory to the query result of a
	// job's `EventsFor()` and `Apply()` methods.
	Filter []event.Query
}

A Trigger is used by Schedules to trigger a Job.

func NewTrigger

func NewTrigger(opts ...TriggerOption) Trigger

NewTrigger returns a projection trigger.

func (Trigger) JobOptions added in v0.1.2

func (t Trigger) JobOptions() []JobOption

JobOptions returns the options for a job that is triggered by this trigger.

func (Trigger) Options

func (t Trigger) Options() []TriggerOption

Options returns the TriggerOptions to build t.

type TriggerAcceptedData

type TriggerAcceptedData struct {
	TriggerID uuid.UUID
}

TriggerAcceptedData is the event data for accepting a trigger.

type TriggerOption

type TriggerOption func(*Trigger)

TriggerOption is a Trigger option.

func AggregateQuery added in v0.1.2

func AggregateQuery(q event.Query) TriggerOption

AggregateQuery returns a TriggerOption that sets the AggregateQuery of a Trigger.

The `Aggregates()` and `Aggregate()` methods of a projection job will use this query to extract the aggregates from the projection job.

var s projection.Schedule
err := s.Trigger(context.TODO(), projection.AggregateQuery(query.New(
	query.Name("foo", "bar"), // extract aggregates from "foo" and "bar" events
)))

func Filter

func Filter(queries ...event.Query) TriggerOption

Filter returns a TriggerOption that adds filters to a Trigger.

Filters are applied in-memory, after the events have been fetched from the event store. When multiple filters are configured, events must match against every filter to be applied to projections. Sorting options of queries are ignored.

var s projection.Schedule
err := s.Trigger(context.TODO(), projection.Filter(query.New(...), query.New(...)))

func Query

func Query(q event.Query) TriggerOption

Query returns a TriggerOption that sets the Query of a Trigger.

When a Job is created by a Schedule, it is passed an event query to fetch the events for the projections. By default, this query fetches all events that are configured in the triggered Schedule, sorted by time. A custom query may be provided using the `Query` option. Don't forget to configure correct sorting when providing a custom query:

var s projection.Schedule
err := s.Trigger(context.TODO(), projection.Query(query.New(
	query.AggregateName("foo", "bar"),
	query.SortBy(event.SortTime, event.SortAsc), // to ensure correct sorting
)))

func Reset

func Reset(reset bool) TriggerOption

Reset returns a TriggerOption that resets projections before applying events to them. Resetting a projection is done by first resetting the progress of the projection (if it implements progressor). Then, if the projection has a `Reset()` method, that method is called to allow for custom reset logic.

type TriggeredData

type TriggeredData struct {
	TriggerID uuid.UUID
	Trigger   Trigger
	Schedule  string
}

TriggeredData is the event data for triggering a Schedule.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL