event

package
v0.2.5
Published: Oct 1, 2022 License: Apache-2.0 Imports: 12 Imported by: 13

README

Events

Package event is the core of goes. It defines and implements a generic event system that is used as the building block for all the other components provided by goes.

The core type of this package is the Event interface. An event is either an application event or an aggregate event, depending on the provided data. Read the documentation of the Event interface for more information.

To create an event, pass at least the name of the event and some arbitrary event data to the New function:

import "github.com/modernice/goes/event"

func example() {
	evt := event.New("foo", 3)
	// evt.ID() == uuid.UUID{...}
	// evt.Name() == "foo"
	// evt.Time() == time.Now()
	// evt.Data() == 3
}

Events can be published over an event bus:

import (
	"context"

	"github.com/modernice/goes/event"
)

func example(bus event.Bus) {
	evt := event.New("foo", 3)
	// Any converts the Evt[int] into an Evt[any], which satisfies event.Event.
	err := bus.Publish(context.TODO(), evt.Any())
}

Events can also be subscribed to using an event bus:

import (
	"context"

	"github.com/modernice/goes/event"
)

func example(bus event.Bus) {
	// Subscribe to "foo", "bar", and "baz" events.
	events, errs, err := bus.Subscribe(context.TODO(), "foo", "bar", "baz")
	// handle err, then receive from events and errs
}

Events can be inserted into and queried from an event store:

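import (
	"context"

	"github.com/modernice/goes/event"
	"github.com/modernice/goes/event/query"
)

func example(store event.Store) {
	evt := event.New("foo", 3)
	// Any converts the Evt[int] into an Evt[any], which satisfies event.Event.
	err := store.Insert(context.TODO(), evt.Any())
	// handle err

	events, errs, err := store.Query(context.TODO(), query.New(
		query.Name("foo"), // query "foo" events
		query.SortByTime(), // sort events by time
	))
	// handle err, then drain events and errs
}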

Depending on the event store and event bus implementations in use, you may need to pass an encoder for event data to the store or bus.

Example using encoding/gob:

import (
	"github.com/modernice/goes/backend/mongo"
	"github.com/modernice/goes/codec"
	"github.com/modernice/goes/event"
)

func example() {
	enc := codec.Gob(event.NewRegistry())
	codec.GobRegister[int](enc, "foo") // register "foo" as an int
	codec.GobRegister[string](enc, "bar") // register "bar" as a string
	codec.GobRegister[struct{Foo string}](enc, "baz") // register "baz" as a struct{Foo string}

	store := mongo.NewEventStore(enc)
}

// Alternative without the use of generics.
func example() {
	enc := codec.Gob(event.NewRegistry())
	enc.GobRegister("foo", func() any { return 0 })
	enc.GobRegister("bar", func() any { return "" })
	enc.GobRegister("baz", func() any { return struct{Foo string}{} })
}

Documentation


Constants

const (
	// SortTime sorts events by time.
	SortTime = Sorting(iota)
	// SortAggregateName sorts events by aggregate name.
	SortAggregateName
	// SortAggregateID sorts events by aggregate id.
	SortAggregateID
	// SortAggregateVersion sorts events by aggregate version.
	SortAggregateVersion
)

const (
	// SortAsc sorts events in ascending order.
	SortAsc = SortDirection(iota)
	// SortDesc sorts events in descending order.
	SortDesc
)
const All = "*"

All is a special event name that matches all events.

Variables

This section is empty.

Functions

func ApplyWith added in v0.1.2

func ApplyWith[Data any](r Registerer, handler func(Of[Data]), eventNames ...string)

ApplyWith is an alias for RegisterHandler. Use ApplyWith instead of RegisterHandler to make code more concise:

type Foo struct {
	*projection.Base

	Foo string
}

func NewFoo() *Foo  {
	foo := &Foo{Base: projection.New()}

	// Because we "apply" events to the projection.
	event.ApplyWith(foo, foo.applyFoo, "foo")

	return foo
}

func (f *Foo) applyFoo(e event.Of[string]) {
	f.Foo = e.Data()
}

func CompareSorting added in v0.1.2

func CompareSorting[A, B any](s Sorting, a Of[A], b Of[B]) (cmp int8)

CompareSorting compares a and b based on the given sorting and returns

-1 if a < b
0 if a == b
1 if a > b

func Equal

func Equal(events ...Of[any]) bool

Equal compares events to determine if they're equal. Two events are equal if their ids, names, times, and data are equal. Equality of time.Times is checked using a.Time().Equal(b.Time()) for the two events a and b. Event data is compared using the "==" equality operator.
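
The comparison rules above can be illustrated with a small, self-contained sketch. The evt type and the equalEvents function are hypothetical stand-ins (with a string id instead of a uuid.UUID) and are not part of this package; the sketch only shows why comparing times with Equal differs from comparing them with "==".

```go
package main

import (
	"fmt"
	"time"
)

// evt is a hypothetical, simplified event type used only for this sketch.
type evt struct {
	id   string
	name string
	time time.Time
	data any
}

// equalEvents reports whether a and b have the same id, name, time and data.
// Times are compared with time.Time.Equal, data with the == operator.
func equalEvents(a, b evt) bool {
	return a.id == b.id &&
		a.name == b.name &&
		a.time.Equal(b.time) &&
		a.data == b.data
}

func main() {
	now := time.Date(2022, time.October, 1, 12, 0, 0, 0, time.UTC)
	a := evt{id: "1", name: "foo", time: now, data: 3}
	// Same instant, expressed in a different location.
	b := evt{id: "1", name: "foo", time: now.In(time.FixedZone("X", 3600)), data: 3}

	fmt.Println(equalEvents(a, b)) // true: Equal compares instants
	fmt.Println(a.time == b.time)  // false: == also compares the location
}
```

Note that in Go, comparing two interface values with "==" panics at run time if their dynamic type is not comparable (for example a slice), so a comparison like this one is only safe for comparable event data.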

func Filter

func Filter[D any](events <-chan Of[D], queries ...Query) <-chan Of[D]

Filter filters an event stream using the provided query. The returned stream only includes events that match the query.
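
A channel-based filter of this kind can be sketched as follows. The filter function and its predicates are hypothetical and use plain values instead of events and queries; the sketch also assumes that a value must satisfy every predicate to be forwarded, which the documentation above does not state explicitly for multiple queries.

```go
package main

import "fmt"

// filter is a hypothetical stand-in for a stream filter: it forwards only
// the values for which every predicate returns true and closes the returned
// channel once the input channel is closed.
func filter[T any](in <-chan T, preds ...func(T) bool) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
	next:
		for v := range in {
			for _, p := range preds {
				if !p(v) {
					continue next
				}
			}
			out <- v
		}
	}()
	return out
}

func main() {
	in := make(chan string, 3)
	in <- "foo"
	in <- "bar"
	in <- "foo"
	close(in)

	isFoo := func(name string) bool { return name == "foo" }
	for name := range filter(in, isFoo) {
		fmt.Println(name)
	}
}
```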

func HandleWith added in v0.1.2

func HandleWith[Data any](r Registerer, handler func(Of[Data]), eventNames ...string)

HandleWith is an alias for RegisterHandler. Use HandleWith instead of RegisterHandler to make code more concise:

import "github.com/modernice/goes/event/handler"

var bus event.Bus
h := handler.New(bus)

event.HandleWith(h, h.handleFoo, "foo")

func Must

func Must[D any](events <-chan Of[D], errs <-chan error, err error) (<-chan Of[D], <-chan error)

Must can be used to panic on failed event subscriptions:

var bus Bus
events, errs := Must(bus.Subscribe(context.TODO(), "foo", "bar", "baz"))
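
The pattern behind Must can be sketched without the goes types. The must and subscribe functions below are hypothetical; subscribe merely imitates the three return values of a call such as bus.Subscribe.

```go
package main

import (
	"errors"
	"fmt"
)

// must is a hypothetical generic helper in the style of Must: it passes the
// two channels through unchanged and panics if err is non-nil.
func must[T any](vals <-chan T, errs <-chan error, err error) (<-chan T, <-chan error) {
	if err != nil {
		panic(err)
	}
	return vals, errs
}

// subscribe is a placeholder for a call such as bus.Subscribe.
func subscribe(fail bool) (<-chan string, <-chan error, error) {
	if fail {
		return nil, nil, errors.New("subscribe failed")
	}
	vals := make(chan string, 1)
	vals <- "foo"
	close(vals)
	return vals, make(chan error), nil
}

func main() {
	vals, _ := must(subscribe(false))
	fmt.Println(<-vals)

	// The failing subscription panics; recover only to show the message.
	defer func() { fmt.Println("recovered:", recover()) }()
	must(subscribe(true))
}
```

A helper like this is convenient in tests and program setup, where a failed subscription cannot be handled meaningfully anyway.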

func NewRegistry added in v0.1.2

func NewRegistry(opts ...codec.Option) *codec.Registry

NewRegistry returns a new event registry for encoding and decoding of event data for transmission over a network.

func RegisterHandler added in v0.1.2

func RegisterHandler[Data any](r Registerer, eventName string, handler func(Of[Data]))

RegisterHandler registers the handler for the given event.

Example using *aggregate.Base:

type Foo struct {
	*aggregate.Base

	Foo string
	Bar int
	Baz bool
}

type FooEvent struct { Foo string }
type BazEvent struct { Baz bool }

func NewFoo(id uuid.UUID) *Foo  {
	foo := &Foo{Base: aggregate.New("foo", id)}

	event.RegisterHandler(foo, "foo", foo.applyFoo)
	event.RegisterHandler(foo, "bar", foo.applyBar)
	event.RegisterHandler(foo, "baz", foo.applyBaz)

	return foo
}

func (f *Foo) applyFoo(e event.Of[FooEvent]) {
	f.Foo = e.Data().Foo
}

func (f *Foo) applyBar(e event.Of[int]) {
	f.Bar = e.Data()
}

func (f *Foo) applyBaz(e event.Of[BazEvent]) {
	f.Baz = e.Data().Baz
}
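
One way a typed handler can be adapted to an untyped registration method is sketched below. All types here (event, registerer, handlers) are simplified, hypothetical stand-ins for Event, Registerer, and Handlers, and the choice to silently skip events with unexpected data is an assumption of this sketch, not documented behavior of RegisterHandler.

```go
package main

import "fmt"

// event is a simplified, hypothetical event carrying untyped data.
type event struct {
	name string
	data any
}

// registerer mirrors the shape of the Registerer interface.
type registerer interface {
	RegisterEventHandler(name string, handler func(event))
}

// handlers maps event names to handlers, similar to the Handlers type.
type handlers map[string][]func(event)

func (h handlers) RegisterEventHandler(name string, handler func(event)) {
	h[name] = append(h[name], handler)
}

// HandleEvent calls every handler registered for the name of e.
func (h handlers) HandleEvent(e event) {
	for _, fn := range h[e.name] {
		fn(e)
	}
}

// registerHandler wraps a typed handler in an untyped one. In this sketch,
// events whose data is not a D are silently skipped.
func registerHandler[D any](r registerer, name string, handler func(D)) {
	r.RegisterEventHandler(name, func(e event) {
		if d, ok := e.data.(D); ok {
			handler(d)
		}
	})
}

func main() {
	h := make(handlers)
	registerHandler(h, "foo", func(s string) { fmt.Println("foo:", s) })
	registerHandler(h, "bar", func(n int) { fmt.Println("bar:", n) })

	h.HandleEvent(event{name: "foo", data: "hello"})
	h.HandleEvent(event{name: "bar", data: 42})
	h.HandleEvent(event{name: "bar", data: "ignored"}) // wrong data type
}
```

The type parameter is inferred from the handler's signature, which is why callers do not need to spell it out.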

func Sort

func Sort[Events ~[]Of[D], D any](events Events, sort Sorting, dir SortDirection) Events

Sort sorts events and returns the sorted events.

func SortMulti

func SortMulti[Events ~[]Of[D], D any](events Events, sorts ...SortOptions) Events

SortMulti sorts events by multiple fields and returns the sorted events.
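
Sorting by multiple fields can be sketched with the standard library as follows. The evt, sortOption, and sortMulti names are hypothetical; in particular, this sketch sorts a copy of the input, which is an assumption and not something the documentation above states about Sort or SortMulti.

```go
package main

import (
	"fmt"
	"sort"
)

// evt is a hypothetical, reduced event with only the fields sorted on here.
type evt struct {
	aggregateName    string
	aggregateVersion int
}

// sortOption pairs a three-way comparison (-1, 0 or 1) with a direction.
type sortOption struct {
	compare func(a, b evt) int
	desc    bool
}

func byName(a, b evt) int {
	switch {
	case a.aggregateName < b.aggregateName:
		return -1
	case a.aggregateName > b.aggregateName:
		return 1
	}
	return 0
}

func byVersion(a, b evt) int {
	switch {
	case a.aggregateVersion < b.aggregateVersion:
		return -1
	case a.aggregateVersion > b.aggregateVersion:
		return 1
	}
	return 0
}

// sortMulti sorts a copy of events by the given options; later options only
// break ties left by earlier ones.
func sortMulti(events []evt, opts ...sortOption) []evt {
	sorted := make([]evt, len(events))
	copy(sorted, events)
	sort.SliceStable(sorted, func(i, j int) bool {
		for _, opt := range opts {
			cmp := opt.compare(sorted[i], sorted[j])
			if cmp == 0 {
				continue // tie: let the next option decide
			}
			if opt.desc {
				return cmp > 0
			}
			return cmp < 0
		}
		return false
	})
	return sorted
}

func main() {
	events := []evt{{"b", 1}, {"a", 2}, {"a", 3}}
	sorted := sortMulti(events,
		sortOption{compare: byName},
		sortOption{compare: byVersion, desc: true},
	)
	fmt.Println(sorted) // [{a 3} {a 2} {b 1}]
}
```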

func Test

func Test[Data any](q Query, evt Of[Data]) bool

Test tests an event against a query and returns whether the query would include the event in its result.

Types

type AggregateRef

type AggregateRef struct {
	Name string
	ID   uuid.UUID
}

AggregateRef is a reference to a specific aggregate, identified by its name and id.

func (AggregateRef) Aggregate added in v0.1.2

func (ref AggregateRef) Aggregate() (uuid.UUID, string, int)

Aggregate returns the id and name of the aggregate, and -1 as its version.

AggregateRef implements pick.AggregateProvider to allow for this:

var ref event.AggregateRef
name := pick.AggregateName(ref)
id := pick.AggregateID(ref)

func (AggregateRef) IsZero added in v0.1.2

func (ref AggregateRef) IsZero() bool

IsZero returns whether the ref has an empty name and a nil-UUID.

func (*AggregateRef) Parse added in v0.1.2

func (ref *AggregateRef) Parse(v string) error

Parse parses the string-representation of an AggregateRef into ref. Parse accepts values that are returned by AggregateRef.String().
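
A round trip through the "NAME(ID)" format that String is documented to produce might look like the sketch below. The ref type and parse function are hypothetical and use a plain string id so that only the standard library is needed; the actual Parse method may validate its input differently (for example by parsing the id as a UUID).

```go
package main

import (
	"fmt"
	"strings"
)

// ref is a hypothetical reference that uses a plain string id instead of a
// uuid.UUID so that the sketch only needs the standard library.
type ref struct {
	name string
	id   string
}

// String formats the reference as "NAME(ID)".
func (r ref) String() string {
	return fmt.Sprintf("%s(%s)", r.name, r.id)
}

// parse reads a "NAME(ID)" string back into a ref.
func parse(v string) (ref, error) {
	open := strings.LastIndex(v, "(")
	if open < 0 || !strings.HasSuffix(v, ")") {
		return ref{}, fmt.Errorf("invalid reference %q", v)
	}
	return ref{name: v[:open], id: v[open+1 : len(v)-1]}, nil
}

func main() {
	r := ref{name: "order", id: "42"}
	s := r.String()
	fmt.Println(s) // order(42)

	parsed, err := parse(s)
	fmt.Println(parsed == r, err) // true <nil>
}
```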

func (AggregateRef) Split added in v0.1.2

func (ref AggregateRef) Split() (uuid.UUID, string)

Split splits the reference into its id and name.

func (AggregateRef) String added in v0.1.2

func (ref AggregateRef) String() string

String returns the string representation of the aggregate:

"NAME(ID)"

type Bus

type Bus interface {
	Publisher
	Subscriber
}

Bus is the pub-sub client for events.

type Data

type Data[D any] struct {
	ID               uuid.UUID
	Name             string
	Time             time.Time
	Data             D
	AggregateName    string
	AggregateID      uuid.UUID
	AggregateVersion int
}

Data contains the fields of an Evt.

type Event

type Event = Of[any]

Event is an event with arbitrary data.

type Evt added in v0.1.2

type Evt[D any] struct {
	D Data[D]
}

Evt is the event implementation.

func Any added in v0.1.2

func Any[Data any](evt Of[Data]) Evt[any]

Any casts the event data of the given event to `any`.

func Cast added in v0.1.2

func Cast[To, From any](evt Of[From]) Evt[To]

Cast casts the type parameter of the given event to the type `To`. Cast panics if the event data is not a `To`.

Use TryCast to test whether the event data can be cast to `To`.

func Expand added in v0.1.2

func Expand[D any](evt Of[D]) Evt[D]

Expand returns an Evt from an interface.

func New

func New[D any](name string, data D, opts ...Option) Evt[D]

New creates an event with the given name and data. A random UUID is generated and set as the event id. Its time is set to now.

Available options:

ID(uuid.UUID): Provide a custom event id
Time(time.Time): Provide a custom event time
Aggregate(string, uuid.UUID, int): Put the event into the event stream of an aggregate
Previous(event.Event): Put the event into the event stream of an aggregate based on its previous event
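
The options listed above follow the functional options pattern, which can be sketched as follows. Every name in this block (evt, option, withID, withTime, withAggregate, withPrevious, newEvent) is hypothetical, ids are plain strings, and the fixed "generated-id" placeholder stands in for the random UUID that New generates.

```go
package main

import (
	"fmt"
	"time"
)

// evt is a hypothetical, simplified event.
type evt struct {
	id               string
	name             string
	time             time.Time
	data             any
	aggregateName    string
	aggregateVersion int
}

// option mutates an event while it is being constructed.
type option func(*evt)

func withID(id string) option     { return func(e *evt) { e.id = id } }
func withTime(t time.Time) option { return func(e *evt) { e.time = t } }

func withAggregate(name string, version int) option {
	return func(e *evt) {
		e.aggregateName = name
		e.aggregateVersion = version
	}
}

// withPrevious copies the aggregate of prev and increments the version. It
// does nothing if prev does not belong to an aggregate.
func withPrevious(prev evt) option {
	return func(e *evt) {
		if prev.aggregateName == "" {
			return
		}
		e.aggregateName = prev.aggregateName
		e.aggregateVersion = prev.aggregateVersion + 1
	}
}

// newEvent fills in defaults first and then applies the options in order.
func newEvent(name string, data any, opts ...option) evt {
	e := evt{id: "generated-id", name: name, time: time.Now(), data: data}
	for _, opt := range opts {
		opt(&e)
	}
	return e
}

func main() {
	created := time.Date(2022, time.October, 1, 0, 0, 0, 0, time.UTC)
	first := newEvent("foo", 1, withAggregate("order", 1), withTime(created))
	second := newEvent("bar", 2, withPrevious(first), withID("custom-id"))
	fmt.Println(second.id, second.aggregateName, second.aggregateVersion)
}
```

Because defaults are set before the options run, callers only pass the options whose defaults they want to override.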

func TryCast added in v0.1.2

func TryCast[To, From any](evt Of[From]) (Evt[To], bool)

TryCast casts the type parameter of the given event to the type `To`. TryCast returns false if the event data cannot be cast to `To`.
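
The difference between a panicking cast and a checked cast can be sketched with a type assertion on the event data. The evt, tryCast, and cast names below are hypothetical simplifications and do not reproduce the package's implementation.

```go
package main

import "fmt"

// evt is a hypothetical generic event.
type evt[D any] struct {
	name string
	data D
}

// tryCast converts the data of e to To and reports whether that worked.
func tryCast[To, From any](e evt[From]) (evt[To], bool) {
	data, ok := any(e.data).(To)
	if !ok {
		return evt[To]{}, false
	}
	return evt[To]{name: e.name, data: data}, true
}

// cast is like tryCast but panics when the data is not a To.
func cast[To, From any](e evt[From]) evt[To] {
	out, ok := tryCast[To](e)
	if !ok {
		panic(fmt.Sprintf("event %q: unexpected data type %T", e.name, e.data))
	}
	return out
}

func main() {
	e := evt[any]{name: "foo", data: 3}

	n, ok := tryCast[int](e)
	fmt.Println(n.data, ok) // 3 true

	_, ok = tryCast[string](e)
	fmt.Println(ok) // false

	fmt.Println(cast[int](e).data) // 3
}
```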

func (Evt[D]) Aggregate added in v0.1.2

func (evt Evt[D]) Aggregate() (uuid.UUID, string, int)

Aggregate returns the aggregate of the stream that the event is a part of, or zero values if the event is not an aggregate event.

func (Evt[D]) Any added in v0.1.2

func (evt Evt[D]) Any() Evt[any]

Any returns the event with its data type set to `any`.

func (Evt[D]) Data added in v0.1.2

func (evt Evt[D]) Data() D

Data returns the event data.

func (Evt[D]) Event added in v0.1.2

func (evt Evt[D]) Event() Of[D]

Event returns the event as an interface.

func (Evt[D]) ID added in v0.1.2

func (evt Evt[D]) ID() uuid.UUID

ID returns the event id.

func (Evt[D]) Name added in v0.1.2

func (evt Evt[D]) Name() string

Name returns the event name.

func (Evt[D]) Time added in v0.1.2

func (evt Evt[D]) Time() time.Time

Time returns the event time.

type Handlers added in v0.2.1

type Handlers map[string][]func(Event)

Handlers is a map of event names to event handlers. Handlers can be embedded into structs to implement Registerer. *github.com/modernice/goes/aggregate.Base embeds Handlers to allow for convenient registration of event handlers.

func (Handlers) EventHandlers added in v0.2.1

func (h Handlers) EventHandlers(eventName string) []func(Event)

EventHandlers returns the handlers for the given event.

func (Handlers) HandleEvent added in v0.2.1

func (h Handlers) HandleEvent(evt Event)

HandleEvent calls each registered handler of the given Event.

func (Handlers) RegisterEventHandler added in v0.2.1

func (h Handlers) RegisterEventHandler(eventName string, handler func(Event))

RegisterEventHandler implements Registerer.

type Of added in v0.1.2

type Of[Data any] interface {
	// ID returns the id of the event.
	ID() uuid.UUID
	// Name returns the name of the event.
	Name() string
	// Time returns the time of the event.
	Time() time.Time
	// Data returns the event data.
	Data() Data

	// Aggregate returns the id, name and version of the aggregate that the
	// event belongs to. Aggregate should return zero values if the event is not
	// an aggregate event.
	Aggregate() (id uuid.UUID, name string, version int)
}

Of is an event with the given specific data type. An event has a unique id, a name, user-provided event data, and the time at which the event was raised.

If the Aggregate method of an event returns non-zero values, the event is considered to belong to the event stream of that aggregate:

var evt event.Event
id, name, version := evt.Aggregate()
// id is the UUID of the aggregate that the event belongs to
// name is the name of the aggregate that the event belongs to
// version is the optimistic concurrency version of the event within the
// event stream of the aggregate

If an event is not part of an aggregate, the Aggregate method should return only zero values.

Use the New function to create an event:

evt := event.New("foo", 3)
// evt.Name() == "foo"
// evt.Data() == 3
// evt.Time() == time.Now()

To create an event for an aggregate, use the Aggregate() option:

var aggregateID uuid.UUID
var aggregateName string
var aggregateVersion int
evt := event.New("foo", 3, event.Aggregate(aggregateID, aggregateName, aggregateVersion))

type Option

type Option func(*Evt[any])

Option is an event option.

func Aggregate

func Aggregate(id uuid.UUID, name string, version int) Option

Aggregate returns an Option that puts an event into the event stream of an aggregate.

func ID

func ID(id uuid.UUID) Option

ID returns an Option that overrides the auto-generated UUID of an event.

func Previous

func Previous[Data any](prev Of[Data]) Option

Previous returns an Option that puts an event into the event stream of an aggregate. If prev provides non-zero aggregate data, the created event will have the same data but with its version increased by 1.

func Time

func Time(t time.Time) Option

Time returns an Option that overrides the auto-generated timestamp of an event.

type Publisher added in v0.1.2

type Publisher interface {
	// Publish publishes events. Each event is sent to all subscribers of the event.
	Publish(ctx context.Context, events ...Event) error
}

A Publisher publishes events to the subscribers of those events.

type Query

type Query interface {
	// Names returns the event names to query for.
	Names() []string

	// IDs returns the event ids to query for.
	IDs() []uuid.UUID

	// Times returns the event time constraints for the query.
	Times() time.Constraints

	// AggregateNames returns the aggregate names to query for.
	AggregateNames() []string

	// AggregateIDs returns the aggregate ids to query for.
	AggregateIDs() []uuid.UUID

	// AggregateVersions returns the event version constraints for the query.
	AggregateVersions() version.Constraints

	// Aggregates returns a list of specific aggregates (name & id pairs) to
	// query for. If an AggregateRef has a nil-UUID, every Aggregate with the
	// given name is queried.
	//
	// Example:
	//	id := uuid.New()
	//	q := query.New(query.Aggregate("foo", id), query.Aggregate("bar", uuid.Nil))
	//
	// The above query allows the "foo" aggregate with the specified id and
	// every "bar" aggregate. Events that do not fulfill any of these two
	// constraints will not be returned.
	//
	// Advantage of using this filter instead of using the `AggregateNames()`
	// and `AggregateIDs()` filters is that this filter allows to query multiple
	// specific aggregates with different names.
	Aggregates() []AggregateRef

	// Sortings returns the sorting options for the query. Events are sorted as
	// they would be by calling SortMulti().
	Sortings() []SortOptions
}

A Query can be used to query events from an event store. Each of the query's methods that return a non-nil filter are considered when filtering events. Different (non-nil) filters must all be fulfilled by an event to be included in the result. Within a single filter that allows multiple values, the event must match at least one of the values.
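
The matching rule described above (all non-nil filters must pass, and within one filter any listed value is enough) can be sketched as follows. The evt, query, and test names are hypothetical and reduced to two filters for brevity.

```go
package main

import "fmt"

// evt is a hypothetical event reduced to the two fields filtered on here.
type evt struct {
	name          string
	aggregateName string
}

// query is a hypothetical query with two filters.
type query struct {
	names          []string // nil means "do not filter by name"
	aggregateNames []string // nil means "do not filter by aggregate name"
}

func contains(list []string, v string) bool {
	for _, item := range list {
		if item == v {
			return true
		}
	}
	return false
}

// test reports whether e passes every non-nil filter of q. Within a filter,
// matching any one of its values is enough.
func test(q query, e evt) bool {
	if q.names != nil && !contains(q.names, e.name) {
		return false
	}
	if q.aggregateNames != nil && !contains(q.aggregateNames, e.aggregateName) {
		return false
	}
	return true
}

func main() {
	q := query{names: []string{"foo", "bar"}, aggregateNames: []string{"order"}}

	fmt.Println(test(q, evt{name: "foo", aggregateName: "order"})) // true
	fmt.Println(test(q, evt{name: "baz", aggregateName: "order"})) // false: name not listed
	fmt.Println(test(q, evt{name: "bar", aggregateName: "user"}))  // false: aggregate not listed
	fmt.Println(test(query{}, evt{name: "baz"}))                   // true: no filters set
}
```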

type Registerer added in v0.1.2

type Registerer interface {
	// RegisterEventHandler registers an event handler for the given event name.
	RegisterEventHandler(eventName string, handler func(Event))
}

A Registerer can register handlers for different events. Types that implement Registerer can be passed to RegisterHandler(), ApplyWith(), and HandleWith() to conveniently register handlers for events.

var reg event.Registerer
event.RegisterHandler(reg, "foo", func(e event.Of[FooEvent]) {
	log.Printf("handled %q event with data %v", e.Name(), e.Data())
})

ApplyWith() and HandleWith() are aliases for RegisterHandler(), to allow for more concise code.

type SortDirection

type SortDirection int

SortDirection is a sorting direction, either ascending or descending.

func (SortDirection) Bool

func (dir SortDirection) Bool(b bool) bool

Bool returns either b if dir=SortAsc or !b if dir=SortDesc.

type SortOptions

type SortOptions struct {
	Sort Sorting
	Dir  SortDirection
}

SortOptions defines the sorting of a query.

type Sorting

type Sorting int

Sorting specifies the field by which events are sorted: time, aggregate name, aggregate id, or aggregate version.

func (Sorting) Compare

func (s Sorting) Compare(a, b Of[any]) (cmp int8)

Compare compares a and b based on the given sorting and returns

-1 if a < b
0 if a == b
1 if a > b

type Store

type Store interface {
	// Insert inserts events into the store.
	Insert(context.Context, ...Event) error

	// Find fetches the given event from the store.
	Find(context.Context, uuid.UUID) (Event, error)

	// Query queries the store for events and returns two channels – one for the
	// returned events and one for any asynchronous errors that occur during the
	// query.
	//
	//	var store event.Store
	//	events, errs, err := store.Query(context.TODO(), query.New(...))
	//	// handle err
	//	err = streams.Walk(context.TODO(), func(evt event.Event) {
	//		log.Println(fmt.Sprintf("Queried event: %s", evt.Name()))
	//	}, events, errs)
	//	// handle err
	Query(context.Context, Query) (<-chan Event, <-chan error, error)

	// Delete deletes events from the store.
	Delete(context.Context, ...Event) error
}

A Store provides persistence for events.

type Subscriber added in v0.1.2

type Subscriber interface {
	// Subscribe subscribes to events with the given names and returns two
	// channels – one for the received events and one for any asynchronous
	// errors that occur during the subscription. If Subscribe fails to
	// subscribe to all events, nil channels and an error are returned instead.
	//
	// When the provided context is canceled, the subscription is also canceled
	// and the returned channels are closed.
	Subscribe(ctx context.Context, names ...string) (<-chan Event, <-chan error, error)
}

A Subscriber subscribes to events by name.
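
The cancellation behavior described in the Subscribe comment (both channels are closed once the context is canceled) can be sketched with an in-memory stand-in. The subscribe and run functions are hypothetical; they forward plain strings from a source channel instead of receiving events from a real bus.

```go
package main

import (
	"context"
	"fmt"
)

// subscribe is a hypothetical in-memory subscription: it forwards values
// from src until ctx is canceled or src is closed, then closes both of the
// returned channels.
func subscribe(ctx context.Context, src <-chan string) (<-chan string, <-chan error, error) {
	events := make(chan string)
	errs := make(chan error)
	go func() {
		defer close(events)
		defer close(errs)
		for {
			select {
			case <-ctx.Done():
				return
			case v, ok := <-src:
				if !ok {
					return
				}
				select {
				case <-ctx.Done():
					return
				case events <- v:
				}
			}
		}
	}()
	return events, errs, nil
}

// run receives one value, cancels the subscription, and reports whether the
// channels are still open afterwards.
func run() (first string, eventsOpen, errsOpen bool) {
	ctx, cancel := context.WithCancel(context.Background())
	src := make(chan string, 1)
	src <- "foo"

	events, errs, err := subscribe(ctx, src)
	if err != nil {
		panic(err)
	}

	first = <-events
	cancel()

	// Both receives block until the goroutine has closed the channels.
	_, eventsOpen = <-events
	_, errsOpen = <-errs
	return first, eventsOpen, errsOpen
}

func main() {
	fmt.Println(run()) // foo false false
}
```

Consumers can therefore range over the event channel and rely on context cancellation to end the loop.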

Directories

Path	Synopsis
query	Package query provides an event query builder.
time	Package time provides time constraints for queries.
version	Package version provides version constraints for queries.
