event

package
v0.3.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 24, 2023 License: Apache-2.0 Imports: 12 Imported by: 13

README

Events

Package event is the core of goes. It defines and implements a generic event system that is used as the building block for all the other components provided by goes.

The core type of this package is the Event interface. An event is either an application event or an aggregate event, depending on the provided data. Read the documentation of the Event interface for more information.

To create an event, pass at least the name of the event and some arbitrary event data to the New function:

import "github.com/modernice/goes/event"

func example() {
	evt := event.New("foo", 3)
	// evt.ID() == uuid.UUID{...}
	// evt.Name() == "foo"
	// evt.Time() == time.Now()
	// evt.Data() == 3
}

Events can be published over an event bus:

import "github.com/modernice/goes/event"

func example(bus event.Bus) {
	evt := event.New("foo", 3)
	err := bus.Publish(context.TODO(), evt)
}

Events can also be subscribed to using an event bus:

import "github.com/modernice/goes/event"

func example(bus event.Bus) {
	// Subscribe to "foo", "bar", and "baz" events.
	events, errs, err := bus.Subscribe(context.TODO(), "foo", "bar", "baz")

Events can be inserted into and queried from an event store:

import "github.com/modernice/goes/event"

func example(store event.Store) {
	evt := event.New("foo", 3)
	err := store.Insert(context.TODO(), evt)

	events, errs, err := store.Query(context.TODO(), query.New(
		query.Name("foo"), // query "foo" events
		query.SortByTime(), // sort events by time
 ))

Depending on the used event store and/or event bus implementations, it may be required to pass an encoder for event data to the store/bus.

Example using encoding/gob:

import (
	"github.com/modernice/goes/backend/mongo"
	"github.com/modernice/goes/event"
)

func example() {
	enc := codec.Gob(event.NewRegistry())
	codec.GobRegister[int](enc, "foo") // register "foo" as an int
	codec.GobRegister[string](enc, "bar") // register "bar" as a string
	codec.GobRegister[struct{Foo string}](enc, "baz") // register "baz" as a struct{Foo string}

	store := mongo.NewEventStore(enc)
}

// Alternative without the use of generics.
func example() {
	enc := codec.Gob(event.NewRegistry())
	enc.GobRegister("foo", func() any { return 0 })
	enc.GobRegister("bar", func() any { return "" })
	enc.GobRegister("baz", func() any { return struct{Foo string}{} })
}

Documentation

Index

Constants

View Source
const (
	// SortTime is a Sorting option that sorts events by their timestamp, with
	// earlier events coming before later events.
	SortTime = Sorting(iota)

	// SortAggregateName is a Sorting value that sorts events by their aggregate
	// name in a Query. Events with the same aggregate name are considered equal
	// when sorting with this value.
	SortAggregateName

	// SortAggregateID is a Sorting option that sorts events based on the
	// lexicographical order of their aggregate ID.
	SortAggregateID

	// SortAggregateVersion is a Sorting option that sorts events based on their
	// aggregate version, with lower versions coming first.
	SortAggregateVersion
)

#region sortings

View Source
const (
	// SortAsc is a SortDirection constant that represents sorting in ascending
	// order. Use it with Sorting and SortOptions to specify the desired sort
	// direction when querying events from an event store.
	SortAsc = SortDirection(iota)

	// SortDesc is a SortDirection that indicates the order of sorting should be in
	// descending order when comparing values.
	SortDesc
)
View Source
const All = "*"

All is a special event name that matches all events.

Variables

This section is empty.

Functions

func ApplyWith added in v0.1.2

func ApplyWith[Data any](r Registerer, handler func(Of[Data]), eventNames ...string)

ApplyWith is an alias for RegisterHandler. Use ApplyWith instead of RegisterHandler to make code more concise:

type Foo struct {
	*projection.Base

	Foo string
}

func NewFoo() *Foo  {
	foo := &Foo{Base: projection.New()}

	// Because we "apply" events to the projection.
	event.ApplyWith(foo, foo.applyFoo, "foo")

	return foo
}

func (f *Foo) applyFoo(e event.Of[string]) {
	f.Foo = e.Data()
}

func CompareSorting added in v0.1.2

func CompareSorting[A, B any](s Sorting, a Of[A], b Of[B]) (cmp int8)

CompareSorting compares two events a and b using the specified Sorting s. It returns -1 if a is less than b, 0 if they are equal, or 1 if a is greater than b. The comparison is based on the time, aggregate name, aggregate ID, or aggregate version, depending on the provided Sorting s.

func Equal

func Equal(events ...Of[any]) bool

Equal returns true if all provided events have the same ID, name, time, data, and aggregate information. It returns false otherwise. If less than two events are provided, it returns true.

func Filter

func Filter[D any](events <-chan Of[D], queries ...Query) <-chan Of[D]

Filter filters the events from the given input channel based on the provided queries and returns a new channel with only the events that pass all the queries. If no queries are provided, the input channel is returned unchanged.

func HandleWith added in v0.1.2

func HandleWith[Data any](r Registerer, handler func(Of[Data]), eventNames ...string)

HandleWith is an alias for RegisterHandler. Use HandleWith instead of RegisterHandler to make code more concise:

import "github.com/modernice/goes/event/handler"

var bus event.Bus
h := handler.New(bus)

event.HandleWith(h, h.handleFoo, "foo")

func Must

func Must[D any](events <-chan Of[D], errs <-chan error, err error) (<-chan Of[D], <-chan error)

Must wraps the given event and error channels, and panics if the provided error is not nil. It returns the same event and error channels if the error is nil. This function can be used to simplify error handling when setting up event subscriptions, ensuring that a valid subscription is established before proceeding.

Example

events := event.Must(bus.Subscribe(ctx, "event-name"))

func NewRegistry added in v0.1.2

func NewRegistry(opts ...codec.Option) *codec.Registry

NewRegistry returns a new event registry for encoding and decoding of event data for transmission over a network.

func RegisterHandler added in v0.1.2

func RegisterHandler[Data any](r Registerer, eventName string, handler func(Of[Data]))

RegisterHandler registers the handler for the given event.

Example using *aggregate.Base:

type Foo struct {
	*aggregate.Base

	Foo string
	Bar int
	Baz bool
}

type FooEvent { Foo string }
type BazEvent { Baz bool }

func NewFoo(id uuid.UUID) *Foo  {
	foo := &Foo{Base: aggregate.New("foo", id)}

	event.RegisterHandler(foo, "foo", foo.applyFoo)
	event.RegisterHandler(foo, "bar", foo.applyBar)
	event.RegisterHandler(foo, "baz", foo.applyBaz)

	return foo
}

func (f *Foo) applyFoo(e event.Of[FooEvent]) {
	f.Foo = e.Data().Foo
}

func (f *Foo) applyBar(e event.Of[int]) {
	f.Bar = e.Data()
}

func (f *Foo) applyBaz(e event.Of[BazEvent]) {
	f.Baz = e.Data().Baz
}

func Sort

func Sort[Events ~[]Of[D], D any](events Events, sort Sorting, dir SortDirection) Events

Sort sorts the provided events according to the specified Sorting and SortDirection, returning a new slice of sorted events.

func SortMulti

func SortMulti[Events ~[]Of[D], D any](events Events, sorts ...SortOptions) Events

SortMulti sorts a slice of events based on the provided sort options, in the order they appear. If multiple events have the same value for a specified sort option, they will be sorted based on the next sort option in the list. If all sort options are equal, the original order is preserved.

func Test

func Test[Data any](q Query, evt Of[Data]) bool

Types

type AggregateRef

type AggregateRef struct {
	Name string
	ID   uuid.UUID
}

AggregateRef represents a reference to an aggregate with a specific Name and ID. It provides methods to check if it's a zero value, retrieve aggregate information, split the Name and ID, and parse a string into an AggregateRef.

func (AggregateRef) Aggregate added in v0.1.2

func (ref AggregateRef) Aggregate() (uuid.UUID, string, int)

Aggregate returns the ID, name, and version of the AggregateRef. The returned version is always -1 as AggregateRef does not store version information.

func (AggregateRef) IsZero added in v0.1.2

func (ref AggregateRef) IsZero() bool

IsZero reports whether the AggregateRef is a zero value, meaning it has an empty Name and a zero UUID.

func (*AggregateRef) Parse added in v0.1.2

func (ref *AggregateRef) Parse(v string) error

Parse parses the given string representation of an AggregateRef and sets the Name and ID fields of the receiver. The input string should be in the format "Name(ID)" where Name is a non-empty string and ID is a valid UUID. Returns an error if the input string is invalid or cannot be parsed.

func (AggregateRef) Split added in v0.1.2

func (ref AggregateRef) Split() (uuid.UUID, string)

Split returns the ID and Name of the AggregateRef.

func (AggregateRef) String added in v0.1.2

func (ref AggregateRef) String() string

String returns a string representation of the AggregateRef in the format "Name(UUID)".

type Bus

type Bus interface {
	Publisher
	Subscriber
}

#region bus Bus combines the capabilities of both a Publisher and a Subscriber, allowing it to send events to all subscribers and set up subscriptions for specific event names. It is an interface that embeds the Publisher and Subscriber interfaces.

type Data

type Data[D any] struct {
	ID               uuid.UUID
	Name             string
	Time             time.Time
	Data             D
	AggregateName    string
	AggregateID      uuid.UUID
	AggregateVersion int
}

Data is a struct that holds event information such as its unique ID, name, time, and arbitrary data. Additionally, it contains aggregate-related fields like AggregateName, AggregateID, and AggregateVersion.

type Event

type Event = Of[any]

#region event Event is an event with arbitrary data.

type Evt added in v0.1.2

type Evt[D any] struct {
	D Data[D]
}

Evt is a concrete implementation of the Of interface, representing an event with specific data type and associated metadata such as the event's ID, name, time, and aggregate information. Evt can be used to create, manipulate, and test events in a type-safe manner.

func Any added in v0.1.2

func Any[Data any](evt Of[Data]) Evt[any]

Any returns an Evt[any] that is a copy of the given Of[Data] event with its data type erased. This can be useful when working with heterogeneous event lists where the specific data type is not important.

func Cast added in v0.1.2

func Cast[To, From any](evt Of[From]) Evt[To]

Cast converts an event with data of type From to an event with data of type To. The new event has the same ID, name, time, and aggregate information as the original event, but the data is cast to the specified To type.

func Expand added in v0.1.2

func Expand[D any](evt Of[D]) Evt[D]

Expand converts an Of[Data] event into an Evt[Data] event, preserving the event's properties. If the input event is already of type Evt[Data], it is returned directly. Otherwise, a new Evt[Data] event is created with the same properties as the input event.

func New

func New[D any](name string, data D, opts ...Option) Evt[D]

New creates a new event with the specified name and data, applying any provided options such as ID, Time, or Aggregate information. It returns an Evt struct containing the event data and metadata.

func TryCast added in v0.1.2

func TryCast[To, From any](evt Of[From]) (Evt[To], bool)

TryCast attempts to cast an event with data of type From to an event with data of type To. It returns the casted event and a boolean value indicating whether the casting was successful or not.

func (Evt[D]) Aggregate added in v0.1.2

func (evt Evt[D]) Aggregate() (uuid.UUID, string, int)

Aggregate returns the id, name, and version of the aggregate that the event belongs to. If the event is not an aggregate event, it returns zero values.

func (Evt[D]) Any added in v0.1.2

func (evt Evt[D]) Any() Evt[any]

Any converts an event with a specific data type (Of[Data]) to an event with the generic any data type (Evt[any]).

func (Evt[D]) Data added in v0.1.2

func (evt Evt[D]) Data() D

Data returns the event data of the Evt.

func (Evt[D]) Event added in v0.1.2

func (evt Evt[D]) Event() Of[D]

Event returns the unique id, name, user-provided event data, and the time at which the event was raised. If the Aggregate method of an event returns non-zero values, the event is considered to belong to the event stream of that aggregate.

func (Evt[D]) ID added in v0.1.2

func (evt Evt[D]) ID() uuid.UUID

ID returns the unique identifier of the event.

func (Evt[D]) Name added in v0.1.2

func (evt Evt[D]) Name() string

Name returns the name of the event.

func (Evt[D]) Time added in v0.1.2

func (evt Evt[D]) Time() time.Time

Time returns the time at which the event was raised.

type Handlers added in v0.2.1

type Handlers map[string][]func(Event)

Handlers is a map of event names to event handlers. Handlers can be embedded into structs to implement Registerer. *github.com/modernice/goes/aggregate.Base embeds Handlers to allow for convenient registration of event handlers.

func (Handlers) EventHandlers added in v0.2.1

func (h Handlers) EventHandlers(eventName string) []func(Event)

EventHandlers returns the handlers for the given event.

func (Handlers) HandleEvent added in v0.2.1

func (h Handlers) HandleEvent(evt Event)

HandleEvent calls each registered handler of the given Event.

func (Handlers) RegisterEventHandler added in v0.2.1

func (h Handlers) RegisterEventHandler(eventName string, handler func(Event))

RegisterEventHandler implements Registerer.

type Of added in v0.1.2

type Of[Data any] interface {
	// ID returns the id of the event.
	ID() uuid.UUID
	// Name returns the name of the event.
	Name() string
	// Time returns the time of the event.
	Time() time.Time
	// Data returns the event data.
	Data() Data

	// Aggregate returns the id, name and version of the aggregate that the
	// event belongs to. aggregate should return zero values if the event is not
	// an aggregate event.
	Aggregate() (id uuid.UUID, name string, version int)
}

Of is an event with the given specific data type. An event has a unique id, a name, user-provided event data, and the time at which the event was raised.

If the Aggregate method of an event returns non-zero values, the event is considered to belong to the event stream of that aggregate:

var evt event.Event
id, name, version := evt.Aggregate()
// id is the UUID of the aggregate that the event belongs to
// name is the name of the aggregate that the event belongs to
// version is the optimistic concurrency version of the event within the
// event stream of the aggregate

If an event is not part of an aggregate, the Aggregate method should return only zero values.

Use the New function to create an event:

evt := event.New("foo", 3)
// evt.Name() == "foo"
// evt.Data() == 3
// evt.Time() == time.Now()

To create an event for an aggregate, use the Aggregate() option:

var aggregateID uuid.UUID
var aggregateName string
var aggregateVersion int
evt := event.New("foo", 3, event.Aggregate(aggregateID, aggregateName, aggregateVersion))

type Option

type Option func(*Evt[any])

func Aggregate

func Aggregate(id uuid.UUID, name string, version int) Option

Aggregate returns the id, name, and version of the aggregate that the event belongs to. If the event is not an aggregate event, it should return zero values.

func ID

func ID(id uuid.UUID) Option

ID returns the unique identifier of the event.

func Previous

func Previous[Data any](prev Of[Data]) Option

Previous sets the aggregate information for an event based on the provided previous event, incrementing the aggregate version by 1. It returns an Option to be used when creating a new event with New.

func Time

func Time(t time.Time) Option

Time sets the time of an event to the provided time value. It is an Option function used when creating a new event with the New function.

type Publisher added in v0.1.2

type Publisher interface {
	// Publish sends the given events to all subscribers of the event bus. It takes
	// a context and a variadic list of events as arguments and returns an error if
	// any occurred during the process.
	Publish(ctx context.Context, events ...Event) error
}

Publisher is an interface that represents the ability to publish events to a message bus. It provides a single method, Publish, which takes a context and a variadic list of events and returns an error if the publishing process encounters any issues.

type Query

type Query interface {
	// Names returns a slice of event names included in the Query.
	Names() []string

	// IDs returns a slice of UUIDs that the Query should match. The returned events
	// will have their EventID equal to one of the UUIDs in the slice.
	IDs() []uuid.UUID

	// Times returns the time constraints of the query, specifying the desired range
	// of event timestamps to be included in the result set.
	Times() time.Constraints

	// AggregateNames returns a slice of aggregate names that the Query is filtering
	// for.
	AggregateNames() []string

	// AggregateIDs returns a slice of UUIDs representing the aggregate IDs that the
	// Query is constrained to.
	AggregateIDs() []uuid.UUID

	// AggregateVersions returns the version constraints of the queried Aggregates
	// as a version.Constraints value.
	AggregateVersions() version.Constraints

	// Aggregates returns a slice of AggregateRef, representing the aggregate
	// references that match the query.
	Aggregates() []AggregateRef

	// Sortings returns a slice of SortOptions specifying the sorting criteria for
	// the query results. The events in the result set will be sorted according to
	// the provided sorting options in the order they appear in the slice.
	Sortings() []SortOptions
}

#region query

Query is an interface that represents a set of criteria for filtering and sorting events when querying an event store. It provides methods to access constraints on event names, aggregate IDs, time ranges, aggregate names, aggregate versions, and custom sorting options.

type Registerer added in v0.1.2

type Registerer interface {
	// RegisterEventHandler registers an event handler for the given event name.
	RegisterEventHandler(eventName string, handler func(Event))
}

A Registerer can register handlers for different events. Types that implement Registerer can be passed to RegisterHandler(), ApplyWith(), and HandleWith() to conveniently register handlers for events.

var reg event.Registerer
event.RegisterEventHandler(reg, "foo", func(e event.Of[FooEvent]) {
	log.Printf("handled %q event with data %v", e.Name(), e.Data())
}

ApplyWith() and HandleWith() are aliases for RegisterHandler(), to allow for more concise code.

type SortDirection

type SortDirection int

SortDirection determines the order of sorting in a query. It can be either ascending (SortAsc) or descending (SortDesc).

func (SortDirection) Bool

func (dir SortDirection) Bool(b bool) bool

Bool returns true if the given bool b matches the SortDirection, and false otherwise. If SortDirection is SortDesc, the result is the negation of b.

type SortOptions

type SortOptions struct {
	Sort Sorting
	Dir  SortDirection
}

SortOptions is a configuration struct used to specify the sorting criteria and direction for event queries. It contains a Sorting field to determine the sorting attribute (such as time, aggregate name, aggregate ID, or aggregate version) and a SortDirection field to indicate the sorting order (ascending or descending).

type Sorting

type Sorting int

Sorting is an enumeration of the possible ways to sort Events when querying a Store. Supported sort options include sorting by time, aggregate name, aggregate ID, and aggregate version.

func (Sorting) Compare

func (s Sorting) Compare(a, b Of[any]) (cmp int8)

Compare returns the comparison result of two events, a and b, based on the provided Sorting value s. The comparison result is -1 if a < b, 1 if a > b, or 0 if a == b.

type Store

type Store interface {
	// Insert inserts the provided Events into the Store. Returns an error if any
	// Event could not be inserted.
	Insert(context.Context, ...Event) error

	// Find retrieves the Event with the specified UUID from the Store. It returns
	// an error if the Event could not be found or if there was an issue accessing
	// the Store.
	Find(context.Context, uuid.UUID) (Event, error)

	// Query searches for Events in the Store that match the provided Query and
	// returns two channels: one for the found Events and another for errors that
	// may occur during the search. An error is returned if the search cannot be
	// started.
	Query(context.Context, Query) (<-chan Event, <-chan error, error)

	// Delete removes the specified Events from the Store. It returns an error if
	// any of the deletions fail.
	Delete(context.Context, ...Event) error
}

#region store

Store is an interface that provides methods for managing Event storage, including insertion, retrieval, querying, and deletion of events. Implementations should handle the storage and retrieval of events according to the provided Query constraints, such as filtering by aggregate names, IDs, and versions.

type Subscriber added in v0.1.2

type Subscriber interface {
	// Subscribe sets up a subscription for the specified event names, returning
	// channels for receiving events and errors, as well as an error if the
	// subscription fails. The context can be used to cancel the subscription.
	Subscribe(ctx context.Context, names ...string) (<-chan Event, <-chan error, error)
}

Subscriber is an interface that provides a method for subscribing to specific events by their names. The Subscribe method returns a channel for receiving events, a channel for receiving errors, and an error if there are any issues during the subscription process.

Directories

Path Synopsis
Package query provides an event query builder.
Package query provides an event query builder.
time
Package time provides time constraints for queries.
Package time provides time constraints for queries.
version
Package version provides version constraints for queries.
Package version provides version constraints for queries.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL