Documentation ¶
Index ¶
- Constants
- Variables
- func Apply(proj Target[any], events []event.Event, opts ...ApplyOption)
- func ApplyStream(target Target[any], events <-chan event.Event, opts ...ApplyOption)
- func RegisterService(r codec.Registerer)
- type ApplyOption
- type Base
- type Guard
- type GuardFunc
- type Job
- type JobOption
- type ProgressAware
- type Progressor
- type QueryGuard
- type Resetter
- type Schedule
- type Service
- type ServiceOption
- type SubscribeOption
- type Subscription
- type Target
- type Trigger
- type TriggerAcceptedData
- type TriggerOption
- type TriggeredData
Constants ¶
const ( // Triggered is the event name for triggering a Schedule. Triggered = "goes.projection.schedule.triggered" // TriggerAccepted is the event name for accepting a trigger. TriggerAccepted = "goes.projection.schedule.trigger_accepted" )
Variables ¶
var ( // DefaultTriggerTimeout is the default timeout for triggering a Schedule. DefaultTriggerTimeout = 5 * time.Second // ErrUnhandledTrigger is returned when trying to trigger a Schedule that // isn't registered in a running Service. ErrUnhandledTrigger = errors.New("unhandled trigger") )
var ( // ErrAggregateNotFound is returned when trying to extract an aggregateID // from a Job's events and none of those Events belong to an aggregate with // that name. ErrAggregateNotFound = errors.New("aggregate not found in events") )
Functions ¶
func Apply ¶
func Apply(proj Target[any], events []event.Event, opts ...ApplyOption)
Apply applies events to the given projection.
If the projection implements Guard, proj.GuardProjection(evt) is called for every event to determine if the event should be applied to the projection.
If the projection implements ProgressAware, the time of the last applied event is applied to the projection by calling proj.SetProgress(evt).
func ApplyStream ¶ added in v0.1.2
func ApplyStream(target Target[any], events <-chan event.Event, opts ...ApplyOption)
ApplyStream applies events to the given projection.
If the projection implements Guard, proj.GuardProjection(evt) is called for every event to determine if the event should be applied to the projection.
If the projection implements ProgressAware, the time of the last applied event is applied to the projection by calling proj.SetProgress(evt).
func RegisterService ¶
func RegisterService(r codec.Registerer)
RegisterService register the projection service events into an event registry.
Types ¶
type ApplyOption ¶
type ApplyOption func(*applyConfig)
ApplyOption is an option for Apply.
func IgnoreProgress ¶
func IgnoreProgress() ApplyOption
IgnoreProgress returns an ApplyOption that makes Apply ignore the current progress of a projection so that it applies events to a projection even if an event's time is before the progress time of the projection.
type Base ¶ added in v0.1.2
type Base struct {
// contains filtered or unexported fields
}
Base can be embedded into projections to implement event.Handler.
func New ¶ added in v0.1.2
func New() *Base
New returns a new base for a projection. Use the RegisterHandler function to add
func (*Base) ApplyEvent ¶ added in v0.1.2
ApplyEvent implements eventApplier.
type Guard ¶
type Guard interface { // GuardProjection determines whether an event is allowed to be applied to a projection. GuardProjection(event.Event) bool }
Guard can be implemented by projections to "guard" the projection from illegal events. If a projection implements Guard, GuardProjection(evt) is called for every event that should be applied to the projection to determine if the event is allows to be applied. If GuardProjection(evt) returns false, the event is not applied.
QueryGuard implements Guard.
type Job ¶
type Job interface { context.Context // Events queries the events of the job. Any provided filters are applied // in-memory to the query result. // // var job Job // str, errs, err := job.Events(job) // // handle err // events, err := streams.Drain(job, str, errs) // // If you need the events that would be applied to a given projection, // call EventsFor() instead. Events(_ context.Context, filters ...event.Query) (<-chan event.Event, <-chan error, error) // EventsOf queries the events that belong to one of the given aggregate names. // // var job Job // str, errs, err := job.EventsOf(job, "foo", "bar", "baz") // // handle err // events, err := streams.Drain(job, str, errs) EventsOf(_ context.Context, aggregateNames ...string) (<-chan event.Event, <-chan error, error) // EventsFor queries the events that would be applied to the given // projection when calling Apply(). // // var job Job // var proj projection.Projection // str, errs, err := job.EventsFor(job, proj) // // handle err // events, err := streams.Drain(job, str, errs) EventsFor(context.Context, Target[any]) (<-chan event.Event, <-chan error, error) // Aggregates extracts the aggregates of the job's events as aggregate // references. If aggregate names are provided, only references that have // one of the given names are returned. References are deduplicated, so each // of the returned references is unique. // // var job Job // str, errs, err := job.Aggregates(job, "foo", "bar", "baz") // // handle err // events, err := streams.Drain(job, str, errs) Aggregates(_ context.Context, aggregateNames ...string) (<-chan aggregate.Ref, <-chan error, error) // Aggregate returns the id of the first aggregate with the given name that // can be extracted from the events of the job. If no event that belongs to // this kind of aggregate can be found, an error that satisfies // errors.Is(err, ErrAggregateNotFound) is returned. Aggregate(_ context.Context, aggregateName string) (uuid.UUID, error) // Apply applies the Job to the projection. It applies the events that // would be returned by EventsFor(). A job may be applied concurrently to // multiple projections. Apply(context.Context, Target[any], ...ApplyOption) error }
Job is a projection job. Jobs are typically created within Schedules and passed to subscribers of those Schedules.
type JobOption ¶
type JobOption func(*job)
JobOption is a Job option.
func WithAggregateQuery ¶ added in v0.1.2
WithAggregateQuery returns a JobOption that specifies the event query that is used for the `Aggregates()` and `Aggregate()` methods of a job. If this option is not provided, the main query of the job is used instead.
func WithBeforeEvent ¶ added in v0.1.2
WithBeforeEvent returns a JobOption that adds the given functions as "before"-interceptors to the event streams returned by a job's `EventsFor()` and `Apply()` methods. For each received event of a stream, all provided functions are called in order, and the returned events are inserted into the stream before the intercepted event.
func WithFilter ¶
WithFilter returns a JobOption that adds queries as filters to the Job. Fetched events are matched against every Query and only returned in the result if they match all Queries.
func WithReset ¶
func WithReset() JobOption
WithReset returns a JobOption that resets projections before applying events to them. Resetting a projection is done by first resetting the progress of the projection (if it implements ProgressAware). Then, if the Projection has a Reset method, that method is called to allow for custom reset logic.
type ProgressAware ¶ added in v0.1.2
type ProgressAware interface { // Progress returns the projection progress in terms of the time of the last // applied events and the id's of those events. In most cases, Progress() // should only a single event id. Multiple event ids should be returned if // more than one event was applied at the returned time. Progress() (time.Time, []uuid.UUID) // SetProgress sets the projection progress in terms of the time of the last // applied event. The ids of the last applied events should be provided to // SetProgress(). SetProgress(time.Time, ...uuid.UUID) }
A ProgressAware projection keeps track of its projection progress in terms of the time and ids of the last applied events. When applying events to a projection with projection.Apply(), only those events with a later time than the current progress time are applied to the projection.
*Progressor implements ProgressAware, and can be embedded in your projections.
type Progressor ¶
type Progressor struct { // Time of the last applied events as elapsed nanoseconds since January 1, 1970 UTC. LastEventTime int64 // LastEvents are the ids of last applied events that have LastEventTime as their time. LastEvents []uuid.UUID }
Progressor can be embedded into a projection to implement ProgressAware.
func NewProgressor ¶ added in v0.1.2
func NewProgressor() *Progressor
NewProgressor returns a new *Progressor that can be embeded into a projection to implement the ProgressAware API.
func (*Progressor) Progress ¶
func (p *Progressor) Progress() (time.Time, []uuid.UUID)
Progress returns the projection progress in terms of the time and ids of the last applied events. If p.LastEventTime is 0, the zero Time is returned.
func (*Progressor) SetProgress ¶
func (p *Progressor) SetProgress(t time.Time, ids ...uuid.UUID)
SetProgress sets the projection progress as the time of the latest applied event. The ids of the applied events that have the given time should be provided.
type QueryGuard ¶
QueryGuard is a Guard that uses an event query to determine if an event is allowed to be applied to a projection.
type MyProjection struct { projection.QueryGuard } func NewMyProjection() *MyProjection { return &MyProjection{ QueryGuard: query.New(query.Name("foo", "bar", "baz")), // allow "foo", "bar", and "baz" events } }
func (QueryGuard) GuardProjection ¶
func (g QueryGuard) GuardProjection(evt event.Event) bool
GuardProjection tests the Guard's Query against a given event and returns whether the event is allowed to be applied to the projection.
type Resetter ¶
type Resetter interface {
// Reset should implement any custom logic to reset the state of a projection.
Reset()
}
A Resetter is a projection that can reset its state. projections that implement Resetter can be reset by projection jobs before applying events to the projection. projection jobs reset a projection if the WithReset() option was used to create the job.
type Schedule ¶
type Schedule interface { // Subscribe subscribes the provided function to the Schedule and returns a // channel of asynchronous projection errors. When the Schedule is // triggered, a Job is created and passed to subscribers of the Schedule. // Errors returned from subscribers are pushed into the returned error // channel. // // var proj projection.Projection // created by yourself // s := schedule.Continuously(bus, store, []string{"foo", "bar", "baz"}) // errs, err := s.Subscribe(context.TODO(), func(job projection.Job) error { // return job.Apply(job, proj) // job.Apply applies the appropriate events to the projection // }) // // handle err // for err := range errs { // log.Printf("projection failed: %v\n", err) // } Subscribe(context.Context, func(Job) error, ...SubscribeOption) (<-chan error, error) // Trigger manually triggers the Schedule immediately. A Job is created and // passed to every subscriber of the Schedule. Trigger does not wait for the // Job to be handled by the subscribers. // // Reset projections // // The created Job can be configured to reset projections before applying // events to them, effectively rebuilding the entire projection from the // beginning (first event): // // var s projection.Schedule // err := s.Trigger(context.TODO(), projection.Reset()) // // When a projection implements progressor (or embeds *Progressor), the // progress time of the projection is set to 0. // // When a projection has a `Reset()` method, that method is called to allow // for custom reset logic. Implementers of projection should appropriately // reset the state of the projection. // // Custom event query // // When a Job is created, it is passed an event query to fetch the events // for the projections. By default, this query fetches the events configured // in the Schedule sorted by time. A custom query may be provided using the // Query option. Don't forget to configure correct sorting when providing a // custom query: // // var s projection.Schedule // err := s.Trigger(context.TODO(), projection.Query(query.New( // query.AggregateName("foo", "bar"), // query.SortBy(event.SortTime, event.SortAsc), // ))) // // Event filters // // Queried events can be further filtered using the Filter option. Filters // are applied in-memory, after the events have already been fetched from // the event store. When multiple filters are passed, events must match // against every filter to be applied to the projections. Sorting options of // filters are ignored. // // var s projection.Schedule // err := s.Trigger(context.TODO(), projection.Filter(query.New(...), query.New(...))) Trigger(context.Context, ...TriggerOption) error }
Schedule is a projection schedule.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service is an event-driven projection service. A Service allows to trigger Schedules that are registered in Services that communicate over the same event bus.
func NewService ¶
func NewService(bus event.Bus, opts ...ServiceOption) *Service
NewService returns a new Service.
var bus event.Bus var fooSchedule projection.Schedule var barSchedule projection.Schedule svc := NewService( bus, projection.RegisterSchedule("foo", fooSchedule), projection.RegisterSchedule("bar", barSchedule), ) errs, err := svc.Run(context.TODO())
func (*Service) Run ¶
Run starts the Service is a new goroutine and returns a channel of asynchronous errors, or a single error if the event bus fails to subscribe. When another Service triggers a Schedule with a name that is registered in svc, svc accepts that trigger by publishing a TriggerAccepted event and then actually triggers the Schedule.
func (*Service) Trigger ¶
Trigger triggers the Schedule with the given name.
Trigger publishes a Triggered event over the event bus and waits for a TriggerAccepted event to be published by another Service. Should the TriggerAccepted event not be published within the trigger timeout, ErrUnhandledTrigger is returned. When ctx is canceled, ctx.Err() is returned.
type ServiceOption ¶
type ServiceOption func(*Service)
ServiceOption is an option for creating a Service.
func RegisterSchedule ¶
func RegisterSchedule(name string, s Schedule) ServiceOption
RegisterSchedule returns a ServiceOption that registers the Schedule s with the given name into a Service
func TriggerTimeout ¶
func TriggerTimeout(d time.Duration) ServiceOption
TriggerTimeout returns a ServiceOption that overrides the default timeout for triggering a Schedule. Default is 5s. Zero Duration means no timeout.
type SubscribeOption ¶ added in v0.1.2
type SubscribeOption func(*Subscription)
SubscribeOption is an option for Schedule.Subscribe.
func BeforeEvent ¶ added in v0.1.2
func BeforeEvent[Data any](fn func(context.Context, event.Of[Data]) ([]event.Event, error), events ...string) SubscribeOption
BeforeEvent returns a SubscribeOption that registers the given function as a "before"-interceptor for the event streams created by a job's `EventsFor()` and `Apply()` methods. For each received event of a stream that has one of the provided event names, the provided function is called, and the returned events of that function are inserted into the stream before the intercepted event. If no event names are provided, the interceptor is applied to all events.
func Startup ¶ added in v0.1.2
func Startup(opts ...TriggerOption) SubscribeOption
Startup returns a SubscribeOption that triggers an initial projection run when subscribing to a projection schedule.
type Subscription ¶ added in v0.1.2
type Subscription struct { // If provided, the projection schedule triggers a projection job on startup. Startup *Trigger // BeforeEvent are the "before"-interceptors for the event streams created // by a job's `EventsFor()` and `Apply()` methods. BeforeEvent []func(context.Context, event.Event) ([]event.Event, error) }
Subscripion is the configuration for a subscription to a projection schedule.
func NewSubscription ¶ added in v0.1.2
func NewSubscription(opts ...SubscribeOption) Subscription
NewSubscription creates a Subscription using the provided options.
type Target ¶ added in v0.1.2
Target is a projection target. Projections must implement this interface to be used within goes' projection system.
*Base implements Target, and can be embedded into projections to implement this interface.
type Trigger ¶
type Trigger struct { // Reset projections before applying events. Reset bool // If provided, overrides the query that is used to fetch events that are // applied to projections. Query event.Query // If provided, overrides the query that is used to extract events from a // projection job. The `Aggregates()` and `Aggregate()` methods of a // projection job will use this query. This allows to optimize the query // performance of initial projection runs, which often times need to fetch // all ids of specific aggregates from the event store in order to get all // projections up-to-date. AggregateQuery event.Query // Additional filters that are applied in-memory to the query result of a // job's `EventsFor()` and `Apply()` methods. Filter []event.Query }
A Trigger is used by Schedules to trigger a Job.
func NewTrigger ¶
func NewTrigger(opts ...TriggerOption) Trigger
NewTrigger returns a projection trigger.
func (Trigger) JobOptions ¶ added in v0.1.2
JobOptions returns the options for a job that is triggered by this trigger.
func (Trigger) Options ¶
func (t Trigger) Options() []TriggerOption
Options returns the TriggerOptions to build t.
type TriggerAcceptedData ¶
TriggerAcceptedData is the event data for accepting a trigger.
type TriggerOption ¶
type TriggerOption func(*Trigger)
TriggerOption is a Trigger option.
func AggregateQuery ¶ added in v0.1.2
func AggregateQuery(q event.Query) TriggerOption
AggregateQuery returns a TriggerOption that sets the AggregateQuery of a Trigger.
The `Aggregates()` and `Aggregate()` methods of a projection job will use this query to extract the aggregates from the projection job.
var s projection.Schedule err := s.Trigger(context.TODO(), projection.AggregateQuery(query.New( query.Name("foo", "bar"), // extract aggregates from "foo" and "bar" events )))
func Filter ¶
func Filter(queries ...event.Query) TriggerOption
Filter returns a TriggerOption that adds filters to a Trigger.
Filters are applied in-memory, after the events have been fetched from the event store. When multiple filters are configured, events must match against every filter to be applied to projections. Sorting options of queries are ignored.
var s projection.Schedule err := s.Trigger(context.TODO(), projection.Filter(query.New(...), query.New(...)))
func Query ¶
func Query(q event.Query) TriggerOption
Query returns a TriggerOption that sets the Query of a Trigger.
When a Job is created by a Schedule, it is passed an event query to fetch the events for the projections. By default, this query fetches all events that are configured in the triggered Schedule, sorted by time. A custom query may be provided using the `Query` option. Don't forget to configure correct sorting when providing a custom query:
var s projection.Schedule err := s.Trigger(context.TODO(), projection.Query(query.New( query.AggregateName("foo", "bar"), query.SortBy(event.SortTime, event.SortAsc), // to ensure correct sorting )))
func Reset ¶
func Reset(reset bool) TriggerOption
Reset returns a TriggerOption that resets projections before applying events to them. Resetting a projection is done by first resetting the progress of the projection (if it implements progressor). Then, if the projection has a `Reset()` method, that method is called to allow for custom reset logic.