Documentation ¶
Index ¶
- Constants
- Variables
- func Apply(proj EventApplier, events []event.Event, opts ...ApplyOption) error
- func RegisterService(r *codec.Registry)
- type ApplyOption
- type EventApplier
- type Guard
- type GuardFunc
- type HistoryDependent
- type Job
- type JobOption
- type Progressing
- type Progressor
- type QueryGuard
- type Resetter
- type Schedule
- type Service
- type ServiceOption
- type Trigger
- type TriggerAcceptedData
- type TriggerOption
- type TriggeredData
Constants ¶
const (
	// Triggered is the event name for triggering a Schedule.
	Triggered = "goes.projection.schedule.triggered"
	// TriggerAccepted is the event name for accepting a trigger.
	TriggerAccepted = "goes.projection.schedule.trigger_accepted"
)
Variables ¶
var (
	// DefaultTriggerTimeout is the default timeout for triggering a Schedule.
	DefaultTriggerTimeout = 5 * time.Second
	// ErrUnhandledTrigger is returned when trying to trigger a Schedule that
	// isn't registered in a running Service.
	ErrUnhandledTrigger = errors.New("unhandled trigger")
)
var (
	// ErrAggregateNotFound is returned when trying to extract an AggregateID
	// from a Job's Events and none of those Events belong to an aggregate with
	// that name.
	ErrAggregateNotFound = errors.New("aggregate not found in events")
)
var (
	// ErrProgressed is returned when trying to apply an Event onto a Projection
	// that has a progress Time that is after the Time of the Event.
	ErrProgressed = errors.New("projection already progressed")
)
Functions ¶
func Apply ¶
func Apply(proj EventApplier, events []event.Event, opts ...ApplyOption) error
Apply applies events onto the given projection.
If proj implements guard (or embeds Guard), proj.GuardProjection(evt) is called for every Event evt to determine if the Event should be applied onto the Projection.
If proj implements progressor (or embeds *Progressor), proj.SetProgress is called with the time of every applied Event evt.
func RegisterService ¶
RegisterService registers the projection service events into an event registry.
Types ¶
type ApplyOption ¶
type ApplyOption func(*applyConfig)
ApplyOption is an option for Apply.
func IgnoreProgress ¶
func IgnoreProgress() ApplyOption
IgnoreProgress returns an ApplyOption that makes Apply ignore the current progress of a projection so that it applies Events onto a projection even if an Event's time is before the progress time of the projection.
type EventApplier ¶
An EventApplier applies events onto itself to build the projection state.
type Guard ¶
type Guard interface {
	// GuardProjection determines whether an Event is allowed to be applied onto a projection.
	GuardProjection(event.Event) bool
}
Guard can be implemented by a projection to prevent the application of events. When a projection p implements Guard, p.GuardProjection(e) is called for every Event e and prevents the p.ApplyEvent(e) call if GuardProjection returns false.
type HistoryDependent ¶
type HistoryDependent interface {
RequiresFullHistory() bool
}
HistoryDependent can be implemented by continuous projections that need the full event history (of the events that are configured in the Schedule) instead of just the events that triggered the continuous projection.
Example:
// A Product is a product of a specific shop.
type Product struct {
	*aggregate.Base
	ShopID uuid.UUID
	Name   string
}

// SearchIndex projects the product catalog of a specific shop.
type SearchIndex struct {
	shopID      uuid.UUID
	products    []Product
	initialized bool
}

func (idx SearchIndex) ApplyEvent(event.Event) { ... }

// RequiresFullHistory implements projection.HistoryDependent. If the
// projection hasn't been run yet, the full history of the events is
// required for the projection.
func (idx SearchIndex) RequiresFullHistory() bool {
	return !idx.initialized
}

var repo aggregate.Repository
s := schedule.Continuously(bus, store, []string{"product.created"})
s.Subscribe(context.TODO(), func(ctx projection.Job) error {
	str, errs, err := ctx.Aggregates(ctx)
	if err != nil {
		return err
	}
	done := make(map[uuid.UUID]bool) // SearchIndexes that have been projected
	return aggregate.WalkRefs(ctx, func(r aggregate.Ref) error {
		p := &Product{Base: aggregate.New("product", r.ID)}
		if err := repo.Fetch(ctx, p); err != nil {
			return err
		}
		shopID := p.ShopID
		if done[shopID] {
			return nil
		}
		done[shopID] = true
		// Fetch the current SearchIndex for the given shop.
		// If it does not exist yet, create the SearchIndex.
		var idx *SearchIndex
		// if idx.initialized == false, ctx.Apply fetches all past events.
		if err := ctx.Apply(ctx, idx); err != nil {
			return err
		}
		// Set initialized to true (after the first run of the projection).
		idx.initialized = true
		return nil
	}, str, errs)
})
type Job ¶
type Job interface {
	context.Context

	// Events fetches all Events that match the Job's Query and returns an Event
	// channel and a channel of asynchronous query errors.
	//
	//	var job Job
	//	str, errs, err := job.Events(job)
	//	// handle err
	//	events, err := event.Drain(job, str, errs)
	//
	// Optional Queries may be provided as filters for the fetched Events. If
	// filters are provided, the returned Event channel will only receive Events
	// that match all provided Queries:
	//
	//	var job Job
	//	str, errs, err := job.Events(job, query.New(query.Name("foo")), query.New(...))
	//	// handle err
	//	events, err := event.Drain(job, str, errs)
	//
	// If you need the Events for a specific Projection, use EventsFor instead.
	Events(_ context.Context, filters ...event.Query) (<-chan event.Event, <-chan error, error)

	// EventsOf fetches all Events that belong to aggregates that have one of
	// aggregateNames and returns an Event channel and a channel of asynchronous
	// query errors.
	//
	//	var job Job
	//	str, errs, err := job.EventsOf(job, "foo", "bar", "baz")
	//	// handle err
	//	events, err := event.Drain(job, str, errs)
	EventsOf(_ context.Context, aggregateNames ...string) (<-chan event.Event, <-chan error, error)

	// EventsFor fetches all Events that are appropriate for the given
	// Projection and returns an Event channel and a channel of asynchronous
	// query errors. Which Events are queried depends on the Projection: If the
	// Projection implements guard (or embeds Guard), the Guard's Query is added
	// as a filter when querying Events. If the Projection implements progressor
	// (or embeds *Progressor), the progress time of the Projection is used to
	// only query Events that happened after that time.
	//
	//	var job Job
	//	var proj projection.Projection
	//	str, errs, err := job.EventsFor(job, proj)
	//	// handle err
	//	events, err := event.Drain(job, str, errs)
	EventsFor(context.Context, EventApplier) (<-chan event.Event, <-chan error, error)

	// Aggregates returns a channel of aggregate Tuples and a channel of
	// asynchronous query errors. It fetches Events, extracts the Tuples from
	// those Events and pushes them into the returned Tuple channel. Every
	// unique Tuple is guaranteed to be received exactly once, even if there are
	// multiple Events that belong to the same aggregate.
	//
	// If aggregateNames are provided, they are used to query only Events that
	// belong to one of the given aggregates.
	//
	//	var job Job
	//	str, errs, err := job.Aggregates(job, "foo", "bar", "baz")
	//	// handle err
	//	events, err := event.Drain(job, str, errs)
	Aggregates(_ context.Context, aggregateNames ...string) (<-chan aggregate.Ref, <-chan error, error)

	// Aggregate returns the UUID of the first aggregate with the given
	// aggregateName that can be found in the Events of the Job, or
	// ErrAggregateNotFound if no Event belongs to an aggregate with that name.
	Aggregate(_ context.Context, aggregateName string) (uuid.UUID, error)

	// Apply applies the Job onto the Projection. A Job may be applied onto as
	// many Projections as needed.
	Apply(context.Context, EventApplier, ...ApplyOption) error
}
Job is a projection job. Jobs are typically created within Schedules and passed to subscribers of those Schedules.
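The "exactly once" behavior of Aggregates and the lookup rule of Aggregate can be sketched in memory as follows. `Ref`, `Event`, `uniqueRefs`, `firstAggregate` and `errAggregateNotFound` are hypothetical stand-ins for this illustration. A real Job works with channels, `uuid.UUID` IDs and an event store, all of which this sketch replaces with slices and strings.

```go
package main

import (
	"errors"
	"fmt"
)

// Ref is a simplified stand-in for aggregate.Ref (IDs are strings here).
type Ref struct{ Name, ID string }

// Event is a hypothetical event that belongs to one aggregate.
type Event struct {
	Name      string
	Aggregate Ref
}

// errAggregateNotFound plays the role of ErrAggregateNotFound.
var errAggregateNotFound = errors.New("aggregate not found in events")

// uniqueRefs returns every distinct aggregate reference once, in order of
// first appearance. If names are given, other aggregates are skipped.
func uniqueRefs(events []Event, names ...string) []Ref {
	allowed := make(map[string]bool, len(names))
	for _, n := range names {
		allowed[n] = true
	}
	seen := make(map[Ref]bool)
	var refs []Ref
	for _, evt := range events {
		ref := evt.Aggregate
		if len(names) > 0 && !allowed[ref.Name] {
			continue
		}
		if seen[ref] {
			continue
		}
		seen[ref] = true
		refs = append(refs, ref)
	}
	return refs
}

// firstAggregate returns the ID of the first aggregate with the given name.
func firstAggregate(events []Event, name string) (string, error) {
	for _, evt := range events {
		if evt.Aggregate.Name == name {
			return evt.Aggregate.ID, nil
		}
	}
	return "", errAggregateNotFound
}

func main() {
	events := []Event{
		{Name: "order.placed", Aggregate: Ref{Name: "order", ID: "o1"}},
		{Name: "order.paid", Aggregate: Ref{Name: "order", ID: "o1"}},
		{Name: "customer.created", Aggregate: Ref{Name: "customer", ID: "c1"}},
		{Name: "order.placed", Aggregate: Ref{Name: "order", ID: "o2"}},
	}
	fmt.Println(uniqueRefs(events))             // [{order o1} {customer c1} {order o2}]
	fmt.Println(uniqueRefs(events, "customer")) // [{customer c1}]
	_, err := firstAggregate(events, "invoice")
	fmt.Println(err) // aggregate not found in events
}
```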
type JobOption ¶
type JobOption func(*job)
JobOption is a Job option.
func WithFilter ¶
WithFilter returns a JobOption that adds queries as filters to the Job. Fetched Events are matched against every Query and only returned in the result if they match all Queries.
func WithHistoryStore ¶
WithHistoryStore returns a JobOption that provides the projection job with an event store that is used to query events for projections that require the full event history to build the projection state.
func WithReset ¶
func WithReset() JobOption
WithReset returns a JobOption that resets Projections before applying Events onto them. Resetting a Projection is done by first resetting the progress of the Projection (if it implements progressor). Then, if the Projection has a Reset method, that method is called to allow for custom reset logic.
type Progressing ¶
type Progressing interface {
	// Progress returns the projection's progress as the Time of the last
	// applied event.
	Progress() time.Time
	// SetProgress sets the progress of the projection to the provided Time.
	SetProgress(time.Time)
}
Progressing makes projections track their projection progress.
Embed *Progressor into a projection type to implement this interface.
The current progress of a projection is the Time of the last applied event. A projection that provides its projection progress only receives events with a Time that is after the current progress Time.
type Progressor ¶
type Progressor struct {
LatestEventTime int64
}
Progressor can be embedded into a projection to implement the Progressing interface.
func (*Progressor) Progress ¶
func (p *Progressor) Progress() time.Time
Progress returns the projection progress in terms of the time of the latest applied event. If p.LatestEventTime is 0, the zero Time is returned.
func (*Progressor) SetProgress ¶
func (p *Progressor) SetProgress(t time.Time)
SetProgress sets the projection progress as the time of the latest applied event.
type QueryGuard ¶
QueryGuard is a Guard that uses an event query to determine the events that are allowed to be applied onto a projection.
func (QueryGuard) GuardProjection ¶
func (g QueryGuard) GuardProjection(evt event.Event) bool
GuardProjection tests the Guard's Query against a given Event and returns whether the Event is allowed to be applied onto the projection.
type Resetter ¶
type Resetter interface {
	// Reset should implement any custom logic to reset the state of a
	// projection besides resetting the progress (if the projection implements
	// Progressing).
	Reset()
}
A Resetter is a projection that can reset its state.
type Schedule ¶
type Schedule interface {
	// Subscribe subscribes the provided function to the Schedule and returns a
	// channel of asynchronous projection errors. When the Schedule is
	// triggered, a Job is created and passed to subscribers of the Schedule.
	// Errors returned from subscribers are pushed into the returned error
	// channel.
	//
	//	var proj projection.Projection // created by yourself
	//	s := schedule.Continuously(bus, store, []string{"foo", "bar", "baz"})
	//	errs, err := s.Subscribe(context.TODO(), func(job projection.Job) error {
	//		return job.Apply(job, proj) // job.Apply applies the appropriate events onto the projection
	//	})
	//	// handle err
	//	for err := range errs {
	//		log.Printf("projection failed: %v\n", err)
	//	}
	Subscribe(context.Context, func(Job) error) (<-chan error, error)

	// Trigger manually triggers the Schedule immediately. A Job is created and
	// passed to every subscriber of the Schedule. Trigger does not wait for the
	// Job to be handled by the subscribers.
	//
	// Reset projections
	//
	// The created Job can be configured to reset Projections before applying
	// events onto them, effectively rebuilding the entire projection from the
	// beginning (first event):
	//
	//	var s projection.Schedule
	//	err := s.Trigger(context.TODO(), projection.Reset(true))
	//
	// When a Projection implements progressor (or embeds *Progressor), the
	// progress time of the Projection is set to 0.
	//
	// When a Projection has a `Reset()` method, that method is called to allow
	// for custom reset logic. Implementers of Projection should appropriately
	// reset the state of the Projection.
	//
	// Custom event query
	//
	// When a Job is created, it is passed an event query to fetch the events
	// for the Projections. By default, this query fetches the events configured
	// in the Schedule sorted by time. A custom query may be provided using the
	// Query option. Don't forget to configure correct sorting when providing a
	// custom query:
	//
	//	var s projection.Schedule
	//	err := s.Trigger(context.TODO(), projection.Query(query.New(
	//		query.AggregateName("foo", "bar"),
	//		query.SortBy(event.SortTime, event.SortAsc),
	//	)))
	//
	// Event filters
	//
	// Queried events can be further filtered using the Filter option. Filters
	// are applied in-memory, after the events have already been fetched from
	// the event store. When multiple filters are passed, events must match
	// against every filter to be applied to the Projections. Sorting options of
	// filters are ignored.
	//
	//	var s projection.Schedule
	//	err := s.Trigger(context.TODO(), projection.Filter(query.New(...), query.New(...)))
	Trigger(context.Context, ...TriggerOption) error
}
Schedule is a projection schedule.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service is an event-driven projection service. A Service can trigger Schedules that are registered in other Services, provided those Services communicate over the same event bus.
func NewService ¶
func NewService(bus event.Bus, opts ...ServiceOption) *Service
NewService returns a new Service.
var bus event.Bus
var fooSchedule projection.Schedule
var barSchedule projection.Schedule
svc := projection.NewService(
	bus,
	projection.RegisterSchedule("foo", fooSchedule),
	projection.RegisterSchedule("bar", barSchedule),
)
errs, err := svc.Run(context.TODO())
func (*Service) Run ¶
Run starts the Service in a new goroutine and returns a channel of asynchronous errors, or a single error if the event bus fails to subscribe. When another Service triggers a Schedule with a name that is registered in svc, svc accepts that trigger by publishing a TriggerAccepted event and then actually triggers the Schedule.
func (*Service) Trigger ¶
Trigger triggers the Schedule with the given name.
Trigger publishes a Triggered event over the event bus and waits for a TriggerAccepted event to be published by another Service. Should the TriggerAccepted event not be published within the trigger timeout, ErrUnhandledTrigger is returned. When ctx is canceled, ctx.Err() is returned.
type ServiceOption ¶
type ServiceOption func(*Service)
ServiceOption is an option for creating a Service.
func RegisterSchedule ¶
func RegisterSchedule(name string, s Schedule) ServiceOption
RegisterSchedule returns a ServiceOption that registers the Schedule s with the given name into a Service.
func TriggerTimeout ¶
func TriggerTimeout(d time.Duration) ServiceOption
TriggerTimeout returns a ServiceOption that overrides the default timeout for triggering a Schedule. Default is 5s. Zero Duration means no timeout.
type Trigger ¶
type Trigger struct {
	// Reset projections before applying events.
	Reset bool
	// Override the Query that is used to query events from the event store.
	Query event.Query
	// Additional filters that are applied in-memory to the query result from
	// the event store.
	Filter []event.Query
}
A Trigger is used by Schedules to trigger a Job.
func (Trigger) Options ¶
func (t Trigger) Options() []TriggerOption
Options returns the TriggerOptions to build t.
type TriggerAcceptedData ¶
TriggerAcceptedData is the event data for accepting a trigger.
type TriggerOption ¶
type TriggerOption func(*Trigger)
TriggerOption is a Trigger option.
func Filter ¶
func Filter(queries ...event.Query) TriggerOption
Filter returns a TriggerOption that adds filters to a Trigger.
Queried events can be further filtered using the `Filter` option. Filters are applied in-memory, after the events have been fetched from the event store. When multiple filters are configured, events must match against every filter to be applied to projections. Sorting options of filters are ignored.
var s projection.Schedule
err := s.Trigger(context.TODO(), projection.Filter(query.New(...), query.New(...)))
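The "match every filter" rule can be illustrated with plain predicates. In this sketch, `Event`, `filter`, `byName`, `byAggregate` and `applyFilters` are hypothetical stand-ins. Real filters are `event.Query` values, and how the library matches them against events is not shown here.

```go
package main

import "fmt"

// Event is a hypothetical, already fetched event.
type Event struct{ Name, Aggregate string }

// filter is an in-memory predicate standing in for an event.Query.
type filter func(Event) bool

func contains(list []string, s string) bool {
	for _, v := range list {
		if v == s {
			return true
		}
	}
	return false
}

// byName matches events whose name is one of names.
func byName(names ...string) filter {
	return func(e Event) bool { return contains(names, e.Name) }
}

// byAggregate matches events whose aggregate name is one of names.
func byAggregate(names ...string) filter {
	return func(e Event) bool { return contains(names, e.Aggregate) }
}

// applyFilters keeps only the events that satisfy every filter.
// The input order is preserved; filters never re-sort events.
func applyFilters(events []Event, filters ...filter) []Event {
	var out []Event
	for _, evt := range events {
		ok := true
		for _, f := range filters {
			if !f(evt) {
				ok = false
				break
			}
		}
		if ok {
			out = append(out, evt)
		}
	}
	return out
}

func main() {
	events := []Event{
		{Name: "created", Aggregate: "foo"},
		{Name: "created", Aggregate: "bar"},
		{Name: "deleted", Aggregate: "foo"},
	}
	fmt.Println(applyFilters(events, byName("created"), byAggregate("foo"))) // [{created foo}]
}
```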
func Query ¶
func Query(q event.Query) TriggerOption
Query returns a TriggerOption that sets the Query of a Trigger.
When a Job is created by a Schedule, it is passed an event query to fetch the events for the projections. By default, this query fetches all events that are configured in the triggered Schedule, sorted by time. A custom query may be provided using the `Query` option. Don't forget to configure correct sorting when providing a custom query:
var s projection.Schedule
err := s.Trigger(context.TODO(), projection.Query(query.New(
	query.AggregateName("foo", "bar"),
	query.SortBy(event.SortTime, event.SortAsc), // to ensure correct sorting
)))
func Reset ¶
func Reset(reset bool) TriggerOption
Reset returns a TriggerOption that resets projections before applying events onto them. Resetting a projection is done by first resetting the progress of the projection (if it implements progressor). Then, if the projection has a `Reset()` method, that method is called to allow for custom reset logic.