schedule

package
v0.4.2
Published: Sep 15, 2023 License: Apache-2.0 Imports: 9 Imported by: 5

Documentation

Constants

This section is empty.

Variables

var DefaultDebounceCap = 5 * time.Second

DefaultDebounceCap is the default debounce cap if the DebounceCap() option is not provided and the provided debounce duration is <= 2.5s.

If the provided debounce duration is > 2.5s, the cap is set to double the duration. For example, a debounce duration of 3s has a cap of 6s if the DebounceCap() option is not provided.

Functions

This section is empty.

Types

type Continuous

type Continuous struct {
	// contains filtered or unexported fields
}

Continuous is a projection Schedule that creates a projection Job whenever one of the specified events is published:

var bus event.Bus
var store event.Store
var proj projection.Projection
s := schedule.Continuously(bus, store, []string{"foo", "bar", "baz"})
errs, err := s.Subscribe(context.TODO(), func(job projection.Job) error {
	return job.Apply(job, proj)
})

func Continuously

func Continuously(bus event.Bus, store event.Store, eventNames []string, opts ...ContinuousOption) *Continuous

Continuously returns a Continuous schedule that, when subscribed to, subscribes to events with the given eventNames to create projection Jobs for those events.

Debounce events

It may be desirable to debounce the creation of projection Jobs to avoid creating a Job for every event when events are published in quick succession:

var bus event.Bus
var store event.Store
s := schedule.Continuously(bus, store, []string{"foo", "bar", "baz"}, schedule.Debounce(time.Second))

func (*Continuous) Subscribe

func (schedule *Continuous) Subscribe(ctx context.Context, apply func(projection.Job) error, opts ...projection.SubscribeOption) (<-chan error, error)

Subscribe subscribes to the schedule and returns a channel of asynchronous projection errors, or a single error if subscribing failed. When ctx is canceled, the subscription is canceled and the returned error channel is closed.

When a projection Job is created, the apply function is called with that Job. Use Job.Apply to apply the Job's events to a given projection:

var proj projection.Projection
var s *schedule.Continuous
s.Subscribe(context.TODO(), func(job projection.Job) error {
	return job.Apply(job, proj)
})

A Job provides helper functions to extract data from the Job's events. Query results are cached within a Job, so it is safe to call helper functions multiple times; the Job will figure out if it needs to actually perform the query or if it can return the cached result.

s.Subscribe(context.TODO(), func(job projection.Job) error {
	events, errs, err := job.Events(job) // fetch all events of the Job
	events, errs, err := job.Events(job, query.New(...)) // fetch events with filter
	events, errs, err := job.EventsOf(job, "foo", "bar") // fetch events that belong to specific aggregates
	events, errs, err := job.EventsFor(job, proj) // fetch events that would be applied to proj
	tuples, errs, err := job.Aggregates(job) // extract aggregates from events
	tuples, errs, err := job.Aggregates(job, "foo", "bar") // extract specific aggregates from events
	id, err := job.Aggregate(job, "foo") // extract UUID of first aggregate with given name
})
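The caching described above can be pictured as simple memoization. This is a stdlib-only sketch under stated assumptions: cachingJob, newCachingJob, eventsOf and the string-based events are hypothetical names made up for illustration, and the real Job may key and store its cache differently.

```go
package main

import (
	"fmt"
	"sync"
)

// cachingJob is a hypothetical model of a Job that caches query results.
type cachingJob struct {
	mu      sync.Mutex
	fetch   func(aggregateName string) []string // stands in for an event store query
	cache   map[string][]string
	fetches int // how often the store was actually queried
}

func newCachingJob(fetch func(string) []string) *cachingJob {
	return &cachingJob{fetch: fetch, cache: make(map[string][]string)}
}

// eventsOf returns the cached result if the same query ran before,
// and only queries the store on a cache miss.
func (j *cachingJob) eventsOf(aggregateName string) []string {
	j.mu.Lock()
	defer j.mu.Unlock()
	if events, ok := j.cache[aggregateName]; ok {
		return events
	}
	j.fetches++
	events := j.fetch(aggregateName)
	j.cache[aggregateName] = events
	return events
}

func main() {
	job := newCachingJob(func(name string) []string {
		return []string{name + ".created", name + ".updated"}
	})
	job.eventsOf("foo")
	job.eventsOf("foo") // served from the cache
	job.eventsOf("bar")
	fmt.Println(job.fetches) // prints 2
}
```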

When the schedule is triggered by calling schedule.Trigger, a projection Job will be created and passed to apply.

func (Continuous) Trigger

func (schedule Continuous) Trigger(ctx context.Context, opts ...projection.TriggerOption) error

Trigger manually triggers the schedule. When triggering a schedule, a projection Job is created and passed to subscribers of the schedule. Trigger does not wait for the created Job to be applied. The only error ever returned by Trigger is ctx.Err(), if ctx is canceled before the trigger has been accepted by every subscriber.

Queried events

By default, when a Job is created by a trigger, the event query for the Job queries the configured events from the beginning of time until now, sorted by time. This query can be overridden using the projection.Query TriggerOption:

err := schedule.Trigger(context.TODO(), projection.Query(query.New(...)))

Filter events

Events can be further filtered using additional event queries. Fetched events are tested against the provided Queries to determine whether they should be included in the created Job:

err := schedule.Trigger(context.TODO(), projection.Filter(query.New(...), query.New(...)))

The difference between filters and the base query of a Job is that a Job may have multiple filters but only one query. The query is always used to fetch the events from the event store, while filters are applied afterwards, in memory. An event must match every provided filter to be included in the projection Job.
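The in-memory filtering step amounts to a logical AND over all filters. A minimal sketch with the standard library only; the event and filter types and the applyFilters function are hypothetical and stand in for fetched events and event queries.

```go
package main

import "fmt"

// event is a hypothetical, reduced event with only the fields used here.
type event struct {
	Name        string
	AggregateID string
}

// filter is a hypothetical in-memory predicate standing in for an event query.
type filter func(event) bool

// applyFilters keeps only the events that match every filter.
// The events are assumed to have been fetched by the single base query.
func applyFilters(events []event, filters ...filter) []event {
	var out []event
	for _, evt := range events {
		keep := true
		for _, f := range filters {
			if !f(evt) {
				keep = false
				break
			}
		}
		if keep {
			out = append(out, evt)
		}
	}
	return out
}

func main() {
	fetched := []event{
		{Name: "foo", AggregateID: "a"},
		{Name: "bar", AggregateID: "a"},
		{Name: "foo", AggregateID: "b"},
	}
	byName := func(e event) bool { return e.Name == "foo" }
	byAggregate := func(e event) bool { return e.AggregateID == "a" }

	fmt.Println(applyFilters(fetched, byName, byAggregate)) // prints [{foo a}]
}
```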

type ContinuousOption

type ContinuousOption func(*Continuous)

ContinuousOption is an option for the Continuous schedule.

func Debounce

func Debounce(d time.Duration) ContinuousOption

Debounce returns a ContinuousOption that debounces projection Jobs by the given Duration. When multiple events are published within the given Duration, a single projection Job is created for all of them instead of one Job per event.

var bus event.Bus
var store event.Store
var proj projection.Projection
s := schedule.Continuously(bus, store, []string{"foo", "bar", "baz"}, schedule.Debounce(time.Second))
errs, err := s.Subscribe(context.TODO(), func(job projection.Job) error {
	return job.Apply(job, proj)
})

err = bus.Publish(
	context.TODO(),
	event.New("foo", ...),
	event.New("bar", ...),
	event.New("baz", ...),
)

func DebounceCap added in v0.1.2

func DebounceCap(cap time.Duration) ContinuousOption

DebounceCap returns a ContinuousOption that specifies the maximum wait time (cap) before force-triggering a projection job that was deferred by the Debounce() option.

By default, the maximum wait time is determined by this heuristic: If the duration provided to the Debounce() option is <= 2.5s, the wait cap is set to DefaultDebounceCap, which is 5s. Otherwise the cap is computed by doubling the duration provided to Debounce(). For example, a debounce duration of 3s will have a cap of 6s.

type Periodic

type Periodic struct {
	// contains filtered or unexported fields
}

Periodic is a projection schedule that creates projection Jobs at a defined interval.

func Periodically

func Periodically(store event.Store, interval time.Duration, eventNames []string) *Periodic

Periodically returns a Periodic schedule that, when subscribed to, creates a projection Job every interval Duration and passes that Job to every subscriber of the schedule.

func (*Periodic) Subscribe

func (schedule *Periodic) Subscribe(ctx context.Context, apply func(projection.Job) error, opts ...projection.SubscribeOption) (<-chan error, error)

Subscribe subscribes to the schedule and returns a channel of asynchronous projection errors, or a single error if subscribing failed. When ctx is canceled, the subscription is canceled and the returned error channel is closed.

When a projection Job is created, the apply function is called with that Job. Use Job.Apply to apply the Job's events to a given projection:

var proj projection.Projection
var s *schedule.Periodic
s.Subscribe(context.TODO(), func(job projection.Job) error {
	return job.Apply(job, proj)
})

A Job provides helper functions to extract data from the Job's events. Query results are cached within a Job, so it is safe to call helper functions multiple times; the Job will figure out if it needs to actually perform the query or if it can return the cached result.

s.Subscribe(context.TODO(), func(job projection.Job) error {
	events, errs, err := job.Events(job) // fetch all events of the Job
	events, errs, err := job.Events(job, query.New(...)) // fetch events with filter
	events, errs, err := job.EventsOf(job, "foo", "bar") // fetch events that belong to specific aggregates
	events, errs, err := job.EventsFor(job, proj) // fetch events that would be applied to proj
	tuples, errs, err := job.Aggregates(job) // extract aggregates from events
	tuples, errs, err := job.Aggregates(job, "foo", "bar") // extract specific aggregates from events
	id, err := job.Aggregate(job, "foo") // extract UUID of first aggregate with given name
})

When the schedule is triggered by calling schedule.Trigger, a projection Job will be created and passed to apply.

func (Periodic) Trigger

func (schedule Periodic) Trigger(ctx context.Context, opts ...projection.TriggerOption) error

Trigger manually triggers the schedule. When triggering a schedule, a projection Job is created and passed to subscribers of the schedule. Trigger does not wait for the created Job to be applied. The only error ever returned by Trigger is ctx.Err(), if ctx is canceled before the trigger has been accepted by every subscriber.

Queried events

By default, when a Job is created by a trigger, the event query for the Job queries the configured events from the beginning of time until now, sorted by time. This query can be overridden using the projection.Query TriggerOption:

err := schedule.Trigger(context.TODO(), projection.Query(query.New(...)))

Filter events

Events can be further filtered using additional event queries. Fetched events are tested against the provided Queries to determine whether they should be included in the created Job:

err := schedule.Trigger(context.TODO(), projection.Filter(query.New(...), query.New(...)))

The difference between filters and the base query of a Job is that a Job may have multiple filters but only one query. The query is always used to fetch the events from the event store, while filters are applied afterwards, in memory. An event must match every provided filter to be included in the projection Job.
