Documentation ¶
Index ¶
- Constants
- Variables
- func Equal(events ...Event) bool
- func Filter(events <-chan Event, queries ...Query) <-chan Event
- func ForEach(ctx context.Context, evtFn func(evt Event), errFn func(error), ...)
- func Must(events <-chan Event, errs <-chan error, err error) (<-chan Event, <-chan error)
- func PickAggregateID(evt Event) uuid.UUID
- func PickAggregateName(evt Event) string
- func PickAggregateVersion(evt Event) int
- func Stream(events ...Event) <-chan Event
- func Test(q Query, evt Event) bool
- func Walk(ctx context.Context, walkFn func(Event) error, events <-chan Event, ...) error
- type AggregateRef
- type AggregateTupledeprecated
- type Bus
- type Data
- type E
- type Event
- func Await(ctx context.Context, events <-chan Event, errs <-chan error) (Event, error)
- func Drain(ctx context.Context, events <-chan Event, errs ...<-chan error) ([]Event, error)
- func New(name string, data interface{}, opts ...Option) Event
- func Sort(events []Event, sort Sorting, dir SortDirection) []Event
- func SortMulti(events []Event, sorts ...SortOptions) []Event
- type Option
- type Query
- type SortDirection
- type SortOptions
- type Sorting
- type Store
Constants ¶
const ( // SortTime sorts events by time. SortTime = Sorting(iota) // SortAggregateName sorts events by their aggregate name. SortAggregateName // SortAggregateID sorts events by their aggregate id. SortAggregateID // SortAggregateVersion sorts events by their aggregate version. SortAggregateVersion )
const ( // SortAsc sorts events in ascending order. SortAsc = SortDirection(iota) // SortDesc sorts events in descending order. SortDesc )
Variables ¶
var ExtractAggregateName = PickAggregateName
Deprecated: Use PickAggregateName instead.
var ExtractAggregatecVersion = PickAggregateName
Deprecated: Use PickAggregateVersion instead.
var ExtractAggregatecVersionID = PickAggregateID
Deprecated: Use PickAggregateID instead.
var ForEvery = ForEach
ForEvery is an alias for ForEach.
Deprecated: Use ForEach instead.
Functions ¶
func Equal ¶
Equal compares events and determines if they're equal. It works exactly like a normal "==" comparison except for the Time field which is being compared by calling a.Time().Equal(b.Time()) for the two Events a and b that are being compared.
func Filter ¶
Filter accepts a channel of Events and returns a filtered channel of Events Only Events that test against all provided Queries are pushed into the returned channel. The returned channel is closed when the provided channel is closed.
func ForEach ¶
func ForEach( ctx context.Context, evtFn func(evt Event), errFn func(error), events <-chan Event, errs ...<-chan error, )
ForEach iterates over the provided Event and error channels and for every Event evt calls evtFn(evt) and for every error e calls errFn(e) until all channels are closed or ctx is canceled.
func Must ¶
Must can be used to panic on failed event subscriptions:
var bus Bus events, errs := Must(bus.Subscribe(context.TODO(), "foo", "bar", "baz"))
func PickAggregateID ¶
PickAggregateID returns the AggregateID of the given event.
func PickAggregateName ¶
PickAggregateName returns the AggregateName of the given event.
func PickAggregateVersion ¶
PickAggregateVersion returns the AggregateVersion of the given event.
func Stream ¶
Stream returns an Event channel that is filled and closed with the provided Events in a separate goroutine.
func Test ¶
Test tests the Event evt against the Query q and returns true if q should include evt in its results. Test can be used by in-memory event.Store implementations to filter events based on the query.
func Walk ¶
func Walk( ctx context.Context, walkFn func(Event) error, events <-chan Event, errs ...<-chan error, ) error
Walk receives from the given Event channel until it and and all provided error channels are closed, ctx is closed or any of the provided error channels receives an error. For every Event e that is received from the Event channel, walkFn(e) is called. Should ctx be canceled before the channels are closed, ctx.Err() is returned. Should an error be received from one of the error channels, that error is returned. Otherwise Walk returns nil.
Example:
var bus Bus events, errs, err := bus.Subscribe(context.TODO(), "foo", "bar", "baz") // handle err err := event.Walk(context.TODO(), func(e Event) { log.Println(fmt.Sprintf("Received %q Event: %v", e.Name(), e)) }, events, errs) // handle err
Types ¶
type AggregateRef ¶
AggregateRef is a reference to a specific aggregate, identified by its name and id.
type AggregateTuple
deprecated
type AggregateTuple = AggregateRef
Deprecated: Use AggregateRef instead.
type Bus ¶
type Bus interface { // Publish sends the given Events to subscribers of such Events. Publish(ctx context.Context, events ...Event) error // Subscribe returns a channel of Events and a channel of asynchronous errors. // Only Events whose name is one of the provided names will be received from the // returned Event channel. // // When Subscribe fails to create the subscription, the returned channels // are nil and an error is returned. // // When ctx is canceled, both the Event and error channel are closed. // // Errors // // Callers of Subscribe must ensure that errors are received from the // returned error channel; otherwise the Bus may be blocked by the error // channel. Subscribe(ctx context.Context, names ...string) (<-chan Event, <-chan error, error) }
Bus is the pub-sub client for Events.
type Data ¶
type Data struct { ID uuid.UUID Name string Time stdtime.Time Data interface{} AggregateName string AggregateID uuid.UUID AggregateVersion int }
Data can be used to provide the data that is needed to implement the Event interface. E embeds Data and provides the methods that return the data in Data.
type Event ¶
type Event interface { // ID returns the unique id of the Event. ID() uuid.UUID // Name returns the name of the Event. Name() string // Time returns the time of the Event. Time() stdtime.Time // Data returns the Event Data. Data() interface{} // Aggregate returns the id, name and version of the aggregate that the // event belongs to. Aggregate should return zero values if the event is not // an aggregate event. Aggregate() (id uuid.UUID, name string, version int) }
An Event describes something that has happened in the application or specifically something that has happened to an Aggregate in the application.
Publish & Subscribe
An Event can be published through a Bus and sent to subscribers of Events with the same name.
Example (publish):
var b event.Bus evt := event.New("foo", someData{}) err := b.Publish(context.TODO(), evt) // handle err
Example (subscribe):
var b event.Bus res, errs, err := b.Subscribe(context.TODO(), "foo") // handle err err := event.Walk(context.TODO(), func(e event.Event) { log.Println(fmt.Sprintf("Received %q event: %v", e.Name(), e)) }, res, errs) // handle err
func Await ¶
Await returns the first Event OR error that is received from events or errs. If ctx is canceled before an Event or error is received, ctx.Err() is returned.
func Drain ¶
Drain drains the given Event channel and returns its Events.
Drain accepts optional error channels which will cause Drain to fail on any error. When Drain encounters an error from any of the error channels, the already drained Events and that error are returned. Similarly, when ctx is canceled, the drained Events and ctx.Err() are returned.
Drain returns when the provided Event channel is closed or it encounters an error from an error channel and does not wait for the error channels to be closed.
func New ¶
New creates an Event with the given name and Data. A UUID is generated for the Event and its time is set to xtime.Now().
Provide Options to override or add data to the Event:
ID(uuid.UUID): Use a custom UUID Time(time.Time): Use a custom Time Aggregate(string, uuid.UUID, int): Add Aggregate data Previous(event.Event): Set Aggregate data based on previous Event
func Sort ¶
func Sort(events []Event, sort Sorting, dir SortDirection) []Event
Sort sorts events and returns the sorted events.
func SortMulti ¶
func SortMulti(events []Event, sorts ...SortOptions) []Event
SortMulti sorts events by multiple fields and returns the sorted events.
type Option ¶
type Option func(*E)
Option is an event option.
type Query ¶
type Query interface { // Names returns the event names to query for. Names() []string // IDs returns the event ids to query for. IDs() []uuid.UUID // Times returns the time.Constraints for the query. Times() time.Constraints // AggregateNames returns the aggregate names to query for. AggregateNames() []string // AggregateIDs returns the aggregate ids to query for. AggregateIDs() []uuid.UUID // AggregateVersions returns the version.Constraints for the query. AggregateVersions() version.Constraints // Aggregates returns a list of specific Aggregates (name & ID pairs) to // query for. If an AggregateTuple has a nil-UUID, every Aggregate with the // name of the tuple is queried. // // Example: // id := uuid.New() // q := query.New(query.Aggregate("foo", id), query.Aggregate("bar", uuid.Nil)) // // The above Query q allows "foo" Aggregates with the UUID id and every "bar" Aggregate. Aggregates() []AggregateTuple // Sorting returns the SortConfigs for the query. Sortings() []SortOptions }
A Query is used by Stores to query Events.
type SortDirection ¶
type SortDirection int
SortDirection is a sorting direction.
func (SortDirection) Bool ¶
func (dir SortDirection) Bool(b bool) bool
Bool returns either b if dir=SortAsc or !b if dir=SortDesc.
type SortOptions ¶
type SortOptions struct { Sort Sorting Dir SortDirection }
SortOptions defines the sorting behaviour of a Query.
type Store ¶
type Store interface { // Insert inserts Events into the store. Insert(context.Context, ...Event) error // Find fetches the Event with the specified UUID from the store. Find(context.Context, uuid.UUID) (Event, error) // Query queries the Store for Events that fit the given Query and returns a // channel of Events and a channel of errors. // // Example: // // var store event.Store // events, errs, err := store.Query(context.TODO(), query.New()) // // handle err // err := event.Walk(context.TODO(), func(evt event.Event) { // log.Println(fmt.Sprintf("Queried Event: %v", evt)) // }, events, errs) // // handle err Query(context.Context, Query) (<-chan Event, <-chan error, error) // Delete deletes Events from the Store. Delete(context.Context, ...Event) error }
A Store persists and queries Events.
Directories ¶
Path | Synopsis |
---|---|
natsbus
Package natsbus provides an event.Bus implementation with support for both NATS Core and NATS Streaming as the backend.
|
Package natsbus provides an event.Bus implementation with support for both NATS Core and NATS Streaming as the backend. |
mongostore
Package mongostore provides a MongoDB event.Store.
|
Package mongostore provides a MongoDB event.Store. |
Package mock_event is a generated GoMock package.
|
Package mock_event is a generated GoMock package. |
Package query provides an event query builder.
|
Package query provides an event query builder. |
time
Package time provides time constraints for queries.
|
Package time provides time constraints for queries. |
version
Package version provides version constraints for queries.
|
Package version provides version constraints for queries. |