Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var DefaultDebounceCap = 5 * time.Second
DefaultDebounceCap is the default debounce cap if the DebounceCap() option is not provided and the provided debounce duration is <= 2.5s.
If the provided debounce duration is > 2.5s, the cap is set to the double of the duration. For example, a debounce duration of 3s will have a cap of 6s if the DebounceCap() option is not provided.
Functions ¶
This section is empty.
Types ¶
type Continuous ¶
type Continuous struct {
// contains filtered or unexported fields
}
Continuous is a projection Schedule that creates projection Jobs on every specified published event:
var bus event.Bus var store event.Store var proj projection.Projection s := schedule.Continuously(bus, store, []string{"foo", "bar", "baz"}) errs, err := s.Subscribe(context.TODO(), func(job projection.Job) error { return job.Apply(job, proj) })
func Continuously ¶
func Continuously(bus event.Bus, store event.Store, eventNames []string, opts ...ContinuousOption) *Continuous
Continuously returns a Continuous schedule that, when subscribed to, subscribes to events with the given eventNames to create projection Jobs for those events.
Debounce events ¶
It may be desirable to debounce the creation of projection Jobs to avoid creating a Job on every event if Events are published within a short interval:
var bus event.Bus var store event.Store s := schedule.Continuously(bus, store, []string{"foo", "bar", "baz"}, schedule.Debounce(time.Second))
func (*Continuous) Subscribe ¶
func (schedule *Continuous) Subscribe(ctx context.Context, apply func(projection.Job) error, opts ...projection.SubscribeOption) (<-chan error, error)
Subscribe subscribes to the schedule and returns a channel of asynchronous projection errors, or a single error if subscribing failed. When ctx is canceled, the subscription is canceled and the returned error channel closed.
When a projection Job is created, the apply function is called with that Job. Use Job.Apply to apply the Job's events to a given projection:
var proj projection.Projection var s *schedule.Continuous s.Subscribe(context.TODO(), func(job projection.Job) error { return job.Apply(job, proj) })
A Job provides helper functions to extract data from the Job's events. Query results are cached within a Job, so it is safe to call helper functions multiple times; the Job will figure out if it needs to actually perform the query or if it can return the cached result.
s.Subscribe(context.TODO(), func(job projection.Job) error { events, errs, err := job.Events(job) // fetch all events of the Job events, errs, err := job.Events(job, query.New(...)) // fetch events with filter events, errs, err := job.EventsOf(job, "foo", "bar") // fetch events that belong to specific aggregates events, errs, err := job.EventsFor(job, proj) // fetch events that would be applied to proj tuples, errs, err := job.Aggregates(job) // extract aggregates from events tuples, errs, err := job.Aggregates(job, "foo", "bar") // extract specific aggregates from events id, err := job.Aggregate(job, "foo") // extract UUID of first aggregate with given name })
When the schedule is triggered by calling schedule.Trigger, a projection Job will be created and passed to apply.
func (Continuous) Trigger ¶
func (schedule Continuous) Trigger(ctx context.Context, opts ...projection.TriggerOption) error
Trigger manually triggers the schedule. When triggering a schedule, a projection Job is created and passed to subscribers of the schedule. Trigger does not wait for the created Job to be applied. The only error ever returned by Trigger is ctx.Err(), if ctx is canceled before the trigger was accepted by every susbcriber.
Queried events ¶
By default, when a Job is created by a trigger, the event query for the Job queries the configured events from the beginning of time until now, sorted by time. This query can be overriden using the projection.Query TriggerOption:
err := schedule.Trigger(context.TODO(), projection.Query(query.New(...)))
Filter events ¶
Events can be further filtered using additional event queries. Fetched events are tested against the provided Queries to determine whether they should be included in the created Job:
err := schedule.Trigger(context.TODO(), projection.Filter(query.New(...), query.New(...)))
Difference between filters and the base query of a Job is that a Job may have multiple filters but only one query. The query is always used to actually fetch the events from the event store while filters are applied afterwards (in-memory). events must test against every provided filter to be included in the projection Job.
type ContinuousOption ¶
type ContinuousOption func(*Continuous)
ContinuousOption is an option for the Continuous schedule.
func Debounce ¶
func Debounce(d time.Duration) ContinuousOption
Debounce returns a ContinuousOption that debounces projection Jobs by the given Duration. When multiple events are published within the given Duration, only 1 projection Job for all events will be created instead of 1 Job per Event.
var bus event.Bus var store event.Store var proj projection.Projection s := schedule.Continuously(bus, store, []string{"foo", "bar", "baz"}, schedule.Debounce(time.Second)) errs, err := s.Subscribe(context.TODO(), func(job projection.Job) error { return job.Apply(job, proj) }) err := bus.Publish( context.TODO(), event.New("foo", ...), event.New("bar", ...), event.New("baz", ...), )
func DebounceCap ¶ added in v0.1.2
func DebounceCap(cap time.Duration) ContinuousOption
DebounceCap returns a ContinuousOption that specifies the maximum wait time (cap) before force-triggering a projection job that was deferred by the Debounce() option.
By default, the maximum wait time is determined by this heuristic: If the duration provided to the Debounce() option is <= 2.5s, the wait cap is set to DefaultDebounceCap, which is 5s. Otherwise the cap is computed by doubling the duration provided to Debounce(). For example, a debounce duration of 3s will have a cap of 6s.
type Periodic ¶
type Periodic struct {
// contains filtered or unexported fields
}
Periodic is a projection schedule that creates projection Jobs in a defined interval.
func Periodically ¶
Periodically returns a Periodic schedule that, when subscribed to, creates a projection Job every interval Duration and passes that Job to every subscriber of the schedule.
func (*Periodic) Subscribe ¶
func (schedule *Periodic) Subscribe(ctx context.Context, apply func(projection.Job) error, opts ...projection.SubscribeOption) (<-chan error, error)
Subscribe subscribes to the schedule and returns a channel of asynchronous projection errors, or a single error if subscribing failed. When ctx is canceled, the subscription is canceled and the returned error channel closed.
When a projection Job is created, the apply function is called with that Job. Use Job.Apply to apply the Job's events to a given projection:
var proj projection.Projection var s *schedule.Periodic s.Subscribe(context.TODO(), func(job projection.Job) error { return job.Apply(job, proj) })
A Job provides helper functions to extract data from the Job's events. Query results are cached within a Job, so it is safe to call helper functions multiple times; the Job will figure out if it needs to actually perform the query or if it can return the cached result.
s.Subscribe(context.TODO(), func(job projection.Job) error { events, errs, err := job.Events(job) // fetch all events of the Job events, errs, err := job.Events(job, query.New(...)) // fetch events with filter events, errs, err := job.EventsOf(job, "foo", "bar") // fetch events that belong to specific aggregates events, errs, err := job.EventsFor(job, proj) // fetch events that would be applied to proj tuples, errs, err := job.Aggregates(job) // extract aggregates from events tuples, errs, err := job.Aggregates(job, "foo", "bar") // extract specific aggregates from events id, err := job.Aggregate(job, "foo") // extract UUID of first aggregate with given name })
When the schedule is triggered by calling schedule.Trigger, a projection Job will be created and passed to apply.
func (Periodic) Trigger ¶
func (schedule Periodic) Trigger(ctx context.Context, opts ...projection.TriggerOption) error
Trigger manually triggers the schedule. When triggering a schedule, a projection Job is created and passed to subscribers of the schedule. Trigger does not wait for the created Job to be applied. The only error ever returned by Trigger is ctx.Err(), if ctx is canceled before the trigger was accepted by every susbcriber.
Queried events ¶
By default, when a Job is created by a trigger, the event query for the Job queries the configured events from the beginning of time until now, sorted by time. This query can be overriden using the projection.Query TriggerOption:
err := schedule.Trigger(context.TODO(), projection.Query(query.New(...)))
Filter events ¶
Events can be further filtered using additional event queries. Fetched events are tested against the provided Queries to determine whether they should be included in the created Job:
err := schedule.Trigger(context.TODO(), projection.Filter(query.New(...), query.New(...)))
Difference between filters and the base query of a Job is that a Job may have multiple filters but only one query. The query is always used to actually fetch the events from the event store while filters are applied afterwards (in-memory). events must test against every provided filter to be included in the projection Job.