Documentation ¶
Index ¶
- Constants
- func ApplyWith[Data any](r Registerer, handler func(Of[Data]), eventNames ...string)
- func CompareSorting[A, B any](s Sorting, a Of[A], b Of[B]) (cmp int8)
- func Equal(events ...Of[any]) bool
- func Filter[D any](events <-chan Of[D], queries ...Query) <-chan Of[D]
- func HandleWith[Data any](r Registerer, handler func(Of[Data]), eventNames ...string)
- func Must[D any](events <-chan Of[D], errs <-chan error, err error) (<-chan Of[D], <-chan error)
- func NewRegistry(opts ...codec.Option) *codec.Registry
- func RegisterHandler[Data any](r Registerer, eventName string, handler func(Of[Data]))
- func Sort[Events ~[]Of[D], D any](events Events, sort Sorting, dir SortDirection) Events
- func SortMulti[Events ~[]Of[D], D any](events Events, sorts ...SortOptions) Events
- func Test[Data any](q Query, evt Of[Data]) bool
- type AggregateRef
- type Bus
- type Data
- type Event
- type Evt
- type Handlers
- type Of
- type Option
- type Publisher
- type Query
- type Registerer
- type SortDirection
- type SortOptions
- type Sorting
- type Store
- type Subscriber
Constants ¶
const ( // SortTime sorts events by time. SortTime = Sorting(iota) // SortAggregateName sorts events by aggregate name. SortAggregateName // SortAggregateID sorts events by aggregate id. SortAggregateID // SortAggregateVersion sorts events by aggregate version. SortAggregateVersion )
#region sortings
const ( // SortAsc sorts events in ascending order. SortAsc = SortDirection(iota) // SortDesc sorts events in descending order. SortDesc )
const All = "*"
All is a special event name that matches all events.
Variables ¶
This section is empty.
Functions ¶
func ApplyWith ¶ added in v0.1.2
func ApplyWith[Data any](r Registerer, handler func(Of[Data]), eventNames ...string)
ApplyWith is an alias for RegisterHandler. Use ApplyWith instead of RegisterHandler to make code more concise:
type Foo struct { *projection.Base Foo string } func NewFoo() *Foo { foo := &Foo{Base: projection.New()} // Because we "apply" events to the projection. event.ApplyWith(foo, foo.applyFoo, "foo") return foo } func (f *Foo) applyFoo(e event.Of[string]) { f.Foo = e.Data() }
func CompareSorting ¶ added in v0.1.2
CompareSorting compares a and b based on the given sorting and returns
-1 if a < b 0 is a == b 1 if a > b
func Equal ¶
Equal compares events to determine if they're equal. Two events are equal if their ids, names, times, and data are equal. Equality of time.Times is checked using a.Time().Equal(b.Time()) for the two events a and b. Event data is compared using the "==" equality operator.
func Filter ¶
Filter filters an event stream using the provided query. The returned stream only includes events that match the query.
func HandleWith ¶ added in v0.1.2
func HandleWith[Data any](r Registerer, handler func(Of[Data]), eventNames ...string)
HandleWith is an alias for RegisterHandler. Use HandleWith instead of RegisterHandler to make code more concise:
import "github.com/modernice/goes/event/handler" var bus event.Bus h := handler.New(bus) event.HandleWith(h, h.handleFoo, "foo")
func Must ¶
Must can be used to panic on failed event subscriptions:
var bus Bus events, errs := Must(bus.Subscribe(context.TODO(), "foo", "bar", "baz"))
func NewRegistry ¶ added in v0.1.2
NewRegistry returns a new event registry for encoding and decoding of event data for transmission over a network.
func RegisterHandler ¶ added in v0.1.2
func RegisterHandler[Data any](r Registerer, eventName string, handler func(Of[Data]))
RegisterHandler registers the handler for the given event.
Example using *aggregate.Base:
type Foo struct { *aggregate.Base Foo string Bar int Baz bool } type FooEvent { Foo string } type BazEvent { Baz bool } func NewFoo(id uuid.UUID) *Foo { foo := &Foo{Base: aggregate.New("foo", id)} event.RegisterHandler(foo, "foo", foo.applyFoo) event.RegisterHandler(foo, "bar", foo.applyBar) event.RegisterHandler(foo, "baz", foo.applyBaz) return foo } func (f *Foo) applyFoo(e event.Of[FooEvent]) { f.Foo = e.Data().Foo } func (f *Foo) applyBar(e event.Of[int]) { f.Bar = e.Data() } func (f *Foo) applyBaz(e event.Of[BazEvent]) { f.Baz = e.Data().Baz }
func Sort ¶
func Sort[Events ~[]Of[D], D any](events Events, sort Sorting, dir SortDirection) Events
Sort sorts events and returns the sorted events.
func SortMulti ¶
func SortMulti[Events ~[]Of[D], D any](events Events, sorts ...SortOptions) Events
SortMulti sorts events by multiple fields and returns the sorted events.
Types ¶
type AggregateRef ¶
AggregateRef is a reference to a specific aggregate, identified by its name and id.
func (AggregateRef) Aggregate ¶ added in v0.1.2
func (ref AggregateRef) Aggregate() (uuid.UUID, string, int)
Aggregate returns the id and name of the aggregate, and -1 as its version.
AggregateRef implements pick.AggregateProvider to allow for this:
var ref event.AggregateRef name := pick.AggregateName(ref) id := pick.AggregateID(ref)
func (AggregateRef) IsZero ¶ added in v0.1.2
func (ref AggregateRef) IsZero() bool
IsZero returns whether the ref has an empty name and a nil-UUID.
func (*AggregateRef) Parse ¶ added in v0.1.2
func (ref *AggregateRef) Parse(v string) error
Parse parses the string-representation of an AggregateRef into ref. Parse accepts values that are returned by AggregateRef.String().
func (AggregateRef) Split ¶ added in v0.1.2
func (ref AggregateRef) Split() (uuid.UUID, string)
Split splits the reference into its id and name.
func (AggregateRef) String ¶ added in v0.1.2
func (ref AggregateRef) String() string
String returns the string representation of the aggregate:
"NAME(ID)"
type Bus ¶
type Bus interface { Publisher Subscriber }
#region bus Bus is the pub-sub client for events.
type Data ¶
type Data[D any] struct { ID uuid.UUID Name string Time time.Time Data D AggregateName string AggregateID uuid.UUID AggregateVersion int }
Data contains the fields of an Evt.
type Evt ¶ added in v0.1.2
Evt is the event implementation.
func Cast ¶ added in v0.1.2
Cast casts the type paramater of given event to the type `To`. Cast panics if the event data is not a `To`.
Use TryCast to test if the event data can be casted to `To`.
func New ¶
New creates an event with the given name and data. A random UUID is generated and set as the event id. Its time is set to now.
Available options:
ID(uuid.UUID): Provide a custom event id Time(time.Time): Provide a custom event time Aggregate(string, uuid.UUID, int): Put the event into the event stream of an aggregate Previous(event.Event): Put the event into the event stream of an aggregate based on its previous event
func TryCast ¶ added in v0.1.2
TryCast casts the type paramater of given event to the type `To`. Cast returns false if the event data cannot be casted to `To`.
func (Evt[D]) Aggregate ¶ added in v0.1.2
Aggregate returns the aggregate of the stream that the event is a part of, or zero values if the event is not an aggregate event.
type Handlers ¶ added in v0.2.1
Handlers is a map of event names to event handlers. Handlers can be embedded into structs to implement Registerer. *github.com/modernice/goes/aggregate.Base embeds Handlers to allow for convenient registration of event handlers.
func (Handlers) EventHandlers ¶ added in v0.2.1
EventHandlers returns the handlers for the given event.
func (Handlers) HandleEvent ¶ added in v0.2.1
HandleEvent calls each registered handler of the given Event.
func (Handlers) RegisterEventHandler ¶ added in v0.2.1
RegisterEventHandler implements Registerer.
type Of ¶ added in v0.1.2
type Of[Data any] interface { // ID returns the id of the event. ID() uuid.UUID // Name returns the name of the event. Name() string // Time returns the time of the event. Time() time.Time // Data returns the event data. Data() Data // Aggregate returns the id, name and version of the aggregate that the // event belongs to. aggregate should return zero values if the event is not // an aggregate event. Aggregate() (id uuid.UUID, name string, version int) }
Of is an event with the given specific data type. An event has a unique id, a name, user-provided event data, and the time at which the event was raised.
If the Aggregate method of an event returns non-zero values, the event is considered to belong to the event stream of that aggregate:
var evt event.Event id, name, version := evt.Aggregate() // id is the UUID of the aggregate that the event belongs to // name is the name of the aggregate that the event belongs to // version is the optimistic concurrency version of the event within the // event stream of the aggregate
If an event is not part of an aggregate, the Aggregate method should return only zero values.
Use the New function to create an event:
evt := event.New("foo", 3) // evt.Name() == "foo" // evt.Data() == 3 // evt.Time() == time.Now()
To create an event for an aggregate, use the Aggregate() option:
var aggregateID uuid.UUID var aggregateName string var aggregateVersion int evt := event.New("foo", 3, event.Aggregate(aggregateID, aggregateName, aggregateVersion))
type Option ¶
Option is an event option.
func Aggregate ¶
Aggregate returns an Option that puts an event into the event stream of an aggregate.
type Publisher ¶ added in v0.1.2
type Publisher interface { // Publish publishes events. Each event is sent to all subscribers of the event. Publish(ctx context.Context, events ...Event) error }
A Publisher allows to publish events to subscribers of these events.
type Query ¶
type Query interface { // Names returns the event names to query for. Names() []string // IDs returns the event ids to query for. IDs() []uuid.UUID // Times returns the event time constraints for the query. Times() time.Constraints // AggregateNames returns the aggregate names to query for. AggregateNames() []string // AggregateIDs returns the aggregate ids to query for. AggregateIDs() []uuid.UUID // AggregateVersions returns the event version constraints for the query. AggregateVersions() version.Constraints // Aggregates returns a list of specific aggregates (name & id pairs) to // query for. If an AggregateRef has a nil-UUID, every Aggregate with the // given name is queried. // // Example: // id := uuid.New() // q := query.New(query.Aggregate("foo", id), query.Aggregate("bar", uuid.Nil)) // // The above query allows the "foo" aggregate with the specified id and // every "bar" aggregate. Events that do not fulfill any of these two // constraints will not be returned. // // Advantage of using this filter instead of using the `AggregateNames()` // and `AggregateIDs()` filters is that this filter allows to query multiple // specific aggregates with different names. Aggregates() []AggregateRef // Sorting returns the sorting options for the query. Events are sorted as // they would be by calling SortMulti(). Sortings() []SortOptions }
#region query A Query can be used to query events from an event store. Each of the query's methods that return a non-nil filter are considered when filtering events. Different (non-nil) filters must all be fulfilled by an event to be included in the result. Within a single filter that allows multiple values, the event must match at least one of the values.
type Registerer ¶ added in v0.1.2
type Registerer interface { // RegisterEventHandler registers an event handler for the given event name. RegisterEventHandler(eventName string, handler func(Event)) }
A Registerer can register handlers for different events. Types that implement Registerer can be passed to RegisterHandler(), ApplyWith(), and HandleWith() to conveniently register handlers for events.
var reg event.Registerer event.RegisterEventHandler(reg, "foo", func(e event.Of[FooEvent]) { log.Printf("handled %q event with data %v", e.Name(), e.Data()) }
ApplyWith() and HandleWith() are aliases for RegisterHandler(), to allow for more concise code.
type SortDirection ¶
type SortDirection int
SortDirection is a sorting direction, either ascending or descending.
func (SortDirection) Bool ¶
func (dir SortDirection) Bool(b bool) bool
Bool returns either b if dir=SortAsc or !b if dir=SortDesc.
type SortOptions ¶
type SortOptions struct { Sort Sorting Dir SortDirection }
SortOptions defines the sorting of a query.
type Store ¶
type Store interface { // Insert inserts events into the store. Insert(context.Context, ...Event) error // Find fetches the given event from the store. Find(context.Context, uuid.UUID) (Event, error) // Query queries the store for events and returns two channels – one for the // returned events and one for any asynchronous errors that occur during the // query. // // var store event.Store // events, errs, err := store.Query(context.TODO(), query.New(...)) // // handle err // err := streams.Walk(context.TODO(), func(evt event.Event) { // log.Println(fmt.Sprintf("Queried event: %s", evt.Name())) // }, events, errs) // // handle err Query(context.Context, Query) (<-chan Event, <-chan error, error) // Delete deletes events from the store. Delete(context.Context, ...Event) error }
#region store A Store provides persistence for events.
type Subscriber ¶ added in v0.1.2
type Subscriber interface { // Subscribe subscribes to events with the given names and returns two // channels – one for the received events and one for any asynchronous // errors that occur during the subscription. If Subscribe fails to // subscribe to all events, nil channels and an error are returned instead. // // When the provided context is canceled, the subscription is also canceled // and the returned channels are closed. Subscribe(ctx context.Context, names ...string) (<-chan Event, <-chan error, error) }
A Subscriber allows to subscribe to events.
Directories ¶
Path | Synopsis |
---|---|
Package query provides an event query builder.
|
Package query provides an event query builder. |
time
Package time provides time constraints for queries.
|
Package time provides time constraints for queries. |
version
Package version provides version constraints for queries.
|
Package version provides version constraints for queries. |