stream

package
v0.2.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 4, 2022 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func New

func New(ctx context.Context, events <-chan event.Event, opts ...Option) (<-chan aggregate.History, <-chan error)

New takes a channel of events and returns both a channel of aggregate Histories and an error channel. A History apply itself on an aggregate to build the current state of the aggregate.

Use the Drain function to get the Histories as a slice and a single error:

var events <-chan event.Event
str, errs := stream.New(events)
histories, err := streams.Drain(context.TODO(), str, errs)
// handle err
for _, h := range histories {
	foo := newFoo(h.AggregateID())
	h.Apply(foo)
}

func NewOf added in v0.1.2

func NewOf[D any, Event event.Of[D]](ctx context.Context, events <-chan Event, opts ...Option) (<-chan aggregate.History, <-chan error)

NewOf takes a channel of events and returns both a channel of aggregate Histories and an error channel. A History apply itself on an aggregate to build the current state of the aggregate.

Use the Drain function to get the Histories as a slice and a single error:

var events <-chan event.Event
str, errs := stream.New(events)
histories, err := streams.Drain(context.TODO(), str, errs)
// handle err
for _, h := range histories {
	foo := newFoo(h.AggregateID())
	h.Apply(foo)
}

Types

type Option

type Option func(*options)

Option is a stream option.

func Errors

func Errors(errs ...<-chan error) Option

Errors returns an Option that provides a Stream with error channels. A Stream will cancel its operation as soon as an error can be received from one of the error channels.

func Filter

func Filter(fns ...func(event.Event) bool) Option

Filter returns an Option that filters incoming events before they're handled by the Stream. events are passed to every fn in fns until a fn returns false. If any of fns returns false, the event is discarded by the Stream.

func Grouped

func Grouped(v bool) Option

Grouped returns an Option that optimizes aggregate builds by giving the Stream information about the order of incoming events from the streams.New.

When Grouped is disabled, the Stream has to wait for the streams.New to be drained before it can be sure no more events will arrive for a specific Aggregate. When Grouped is enabled, the Stream knows when all events for an Aggregate have been received and can therefore return the aggregate as soon as its last event has been received and applied.

Grouped is disabled by default and should only be enabled if the correct order of events is guaranteed by the underlying streams.New. events are correctly ordered only if they're sequentially grouped by aggregate. Sorting within a group of events does not matter if IsSorted is disabled (which it is by default). When IsSorted is enabled, events within a group must be ordered by aggregateVersion.

An example for correctly ordered events (with IsSorted disabled):

name="foo" id="BBXXXXXX-XXXX-XXXX-XXXXXXXXXXXX" version=2
name="foo" id="BBXXXXXX-XXXX-XXXX-XXXXXXXXXXXX" version=1
name="foo" id="BBXXXXXX-XXXX-XXXX-XXXXXXXXXXXX" version=4
name="foo" id="BBXXXXXX-XXXX-XXXX-XXXXXXXXXXXX" version=3
name="bar" id="AXXXXXXX-XXXX-XXXX-XXXXXXXXXXXX" version=1
name="bar" id="AXXXXXXX-XXXX-XXXX-XXXXXXXXXXXX" version=2
name="bar" id="AXXXXXXX-XXXX-XXXX-XXXXXXXXXXXX" version=3
name="bar" id="AXXXXXXX-XXXX-XXXX-XXXXXXXXXXXX" version=4
name="foo" id="AAXXXXXX-XXXX-XXXX-XXXXXXXXXXXX" version=4
name="foo" id="AAXXXXXX-XXXX-XXXX-XXXXXXXXXXXX" version=3
name="foo" id="AAXXXXXX-XXXX-XXXX-XXXXXXXXXXXX" version=2
name="foo" id="AAXXXXXX-XXXX-XXXX-XXXXXXXXXXXX" version=1
name="bar" id="BXXXXXXX-XXXX-XXXX-XXXXXXXXXXXX" version=2
name="bar" id="BXXXXXXX-XXXX-XXXX-XXXXXXXXXXXX" version=1
name="bar" id="BXXXXXXX-XXXX-XXXX-XXXXXXXXXXXX" version=3
name="bar" id="BXXXXXXX-XXXX-XXXX-XXXXXXXXXXXX" version=4

func Sorted

func Sorted(v bool) Option

Sorted returns an Option that optimizes aggregate builds by giving the Stream information about the order of incoming events from the streams.New.

When Sorted is disabled (which it is by default), the Stream sorts the collected events for a specific aggregate by the AggregateVersion of the Events before applying them to the aggregate.

Enable this option only if the underlying streams.New guarantees that incoming events are sorted by aggregateVersion.

func ValidateConsistency

func ValidateConsistency(v bool) Option

ValidateConsistency returns an Option that optimizes aggregate builds by controlling if the consistency of events is validated before building an Aggregate from those events.

This option is enabled by default and should only be disabled if the consistency of events is guaranteed by the underlying streams.New or if it's explicitly desired to put an aggregate into an invalid state.

func WithSoftDeleted added in v0.1.2

func WithSoftDeleted(v bool) Option

WithSoftDeletes returns an Option that specifies if the stream should return soft-deleted aggregates in the returned History stream. Soft-deleted aggregates are by default excluded from the result.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL