event

package
v0.1.1 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 1, 2022 License: MIT Imports: 8 Imported by: 13

README

Events

goes defines and implements an event system that serves both as a simple generic event system for application events and as the underlying system for event-sourced aggregates. The type that all of goes' components build around is the event.Event interface. The event.E struct provides the implementation for event.Event.

Design

goes' event design unifies generic events and aggregate events into a single event.Event type:

package event

type Event interface {
	// ID returns the unique id of the event.
	ID() uuid.UUID
	// Name returns the name of the event.
	Name() string
	// Time returns the time of the event.
	Time() time.Time
	// Data returns the event data.
	Data() interface{}

	// Aggregate returns the id, name and version of the aggregate that the
	// event belongs to. Aggregate should return zero values if the event is not
	// an aggregate event.
	Aggregate() (id uuid.UUID, name string, version int)
}

The version that is returned by event.Event.Aggregate refers to the optimistic concurrency version of the aggregate the event belongs to. Event store implementations use this version to do optimistic concurrency checks when inserting events into the store.

Event Bus

The event.Bus interface defines a simple event bus that is accepted by all of goes' components. An event bus must be able to publish and subscribe to events:

package event

type Bus interface {
  Publish(context.Context, ...Event) error
  Subscribe(context.Context, ...string) (<-chan Event, <-chan error, error)
}

Instead of returning some kind of cursor type, the event.Bus.Subscribe method returns two channels: one for the actual subscribed events and one for any asynchronous errors that happen during the subscription.

It is up to the caller to handle any errors to prevent the application from blocking. Common helpers are provided by goes to avoid boilerplate code caused by the use of channels.

Event Store

The event.Store interface defines the event store.

package event

type Store interface {
	Insert(context.Context, ...Event) error
	Find(context.Context, uuid.UUID) (Event, error)
	Query(context.Context, Query) (<-chan Event, <-chan error, error)
	Delete(context.Context, ...Event) error
}

Create Events

Events are created using the event.New constructor, which returns an event.Event that is implemented by event.E. event.New requires at least the event name and some user-defined event data. Additional options may be passed as functional event options.

package example

type FooEventData struct {
  Foo string
}

func createEvent() {
  name := "example.foo"
  data := FooEventData{Foo: "foo"}
  now := time.Now()
  evt := event.New(name, data, event.Time(now))

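  // The event reports the values it was created with:
  fmt.Println(evt.Name() == "example.foo")            // true
  fmt.Println(evt.Data() == FooEventData{Foo: "foo"}) // true
  fmt.Println(evt.Time().Equal(now))                  // true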
}

If no time is provided through the event.Time option, the current time is used as the event time.

Create Aggregate Events

The event.Aggregate option binds an event to the event stream of an aggregate.

package example

func createAggregateEvent() {
  name := "example.foo"
  data := FooEventData{...}
  aggregateName := "example.foobar"
  aggregateID := uuid.UUID{...}
  aggregateVersion := 3

  evt := event.New(name, data, event.Aggregate(
    aggregateID,
    aggregateName,
    aggregateVersion,
  ))

  id, aggName, version := evt.Aggregate()
  fmt.Println(id == aggregateID)           // true
  fmt.Println(aggName == aggregateName)    // true
  fmt.Println(version == aggregateVersion) // true
}

Event Data

Event data is provided by simple user-defined structs. Depending on the event bus and store implementations in use, events may have to be registered within a codec so that they can be appropriately encoded and decoded.

package auth

const UserRegistered = "auth.user.registered"

type UserRegisteredData struct {
  Email    string
  Password string
}

func RegisterEvents(r *codec.GobRegistry) {
  r.GobRegister(UserRegistered, func() interface{} { return UserRegisteredData{} })
}

Event Registry

Use codec.New() to create a registry for event encoders and decoders. Each event type registers its own encoder and decoder for its event data:

package example

func setupEvents() {
  reg := codec.New()
  reg.Register(
    "example.foo",
    codec.EncoderFunc(func(w io.Writer, data interface{}) error {
      // encode the event data and write into w
      return nil
    }),
    codec.DecoderFunc(func(r io.Reader) (interface{}, error) {
      // decode the event data from r and return it
      return nil, nil // placeholder: return the decoded data here
    }),
  )
}

You can then instantiate event data using the name of the event:

package example

func makeEventData(reg *codec.Registry) {
  data, err := reg.New("example.foo")
  // handle err

  fmt.Println(data == FooEventData{}) // true
}

Or encode and decode event data:

package example

// codec.Encoding is implemented by *codec.Registry
func encodeDecodeEventData(enc codec.Encoding, evt event.Event) {
  var buf bytes.Buffer
  err := enc.Encode(&buf, evt.Data())
  // handle err

  data, err := enc.Decode(bytes.NewReader(buf.Bytes()))
  // handle err

  // data now holds a decoded copy that equals evt.Data()
  _ = data
}

Stream Helpers

  • event.Walk : Walks a channel of events and calls a function for each event. Accepts optional error channels that cause the walk to stop on error.
  • event.Drain : Drains a channel of events and returns a slice of all events. Accepts optional error channels that cause the drain to stop on error.
  • event.ForEach : Walks a channel of events and optional channels of errors and calls a function on each event and each error until all channels are closed.

Documentation

Constants

const (
	// SortTime sorts events by time.
	SortTime = Sorting(iota)
	// SortAggregateName sorts events by their aggregate name.
	SortAggregateName
	// SortAggregateID sorts events by their aggregate id.
	SortAggregateID
	// SortAggregateVersion sorts events by their aggregate version.
	SortAggregateVersion
)
const (
	// SortAsc sorts events in ascending order.
	SortAsc = SortDirection(iota)
	// SortDesc sorts events in descending order.
	SortDesc
)

Variables

var ExtractAggregateName = PickAggregateName

Deprecated: Use PickAggregateName instead.

var ExtractAggregateVersion = PickAggregateVersion

Deprecated: Use PickAggregateVersion instead.

var ExtractAggregateID = PickAggregateID

Deprecated: Use PickAggregateID instead.

var ForEvery = ForEach

ForEvery is an alias for ForEach.

Deprecated: Use ForEach instead.

Functions

func Equal

func Equal(events ...Event) bool

Equal compares events and determines if they're equal. It works exactly like a normal "==" comparison except for the Time field, which is compared by calling a.Time().Equal(b.Time()) for the two Events a and b that are being compared.

func Filter

func Filter(events <-chan Event, queries ...Query) <-chan Event

Filter accepts a channel of Events and returns a filtered channel of Events. Only Events that test against all provided Queries are pushed into the returned channel. The returned channel is closed when the provided channel is closed.

func ForEach

func ForEach(
	ctx context.Context,
	evtFn func(evt Event),
	errFn func(error),
	events <-chan Event,
	errs ...<-chan error,
)

ForEach iterates over the provided Event and error channels and for every Event evt calls evtFn(evt) and for every error e calls errFn(e) until all channels are closed or ctx is canceled.

func Must

func Must(events <-chan Event, errs <-chan error, err error) (<-chan Event, <-chan error)

Must can be used to panic on failed event subscriptions:

var bus Bus
events, errs := Must(bus.Subscribe(context.TODO(), "foo", "bar", "baz"))
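Must relies on Go allowing a multi-value call to be passed directly as the arguments of another call. A minimal sketch of that pattern, with a hypothetical subscribe function standing in for Bus.Subscribe and strings standing in for events:

```go
package main

import (
	"errors"
	"fmt"
)

// subscribe is a hypothetical stand-in for a subscription that can fail.
func subscribe(fail bool) (<-chan string, <-chan error, error) {
	if fail {
		return nil, nil, errors.New("subscribe failed")
	}
	return make(chan string), make(chan error), nil
}

// must panics if err is non-nil and otherwise passes the channels through.
func must(events <-chan string, errs <-chan error, err error) (<-chan string, <-chan error) {
	if err != nil {
		panic(err)
	}
	return events, errs
}

func main() {
	// The three results of subscribe become the three arguments of must.
	events, errs := must(subscribe(false))
	fmt.Println(events != nil, errs != nil)
}
```

Panicking is usually only appropriate during application startup, where a failed subscription cannot be recovered from anyway.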

func PickAggregateID

func PickAggregateID(evt Event) uuid.UUID

PickAggregateID returns the AggregateID of the given event.

func PickAggregateName

func PickAggregateName(evt Event) string

PickAggregateName returns the AggregateName of the given event.

func PickAggregateVersion

func PickAggregateVersion(evt Event) int

PickAggregateVersion returns the AggregateVersion of the given event.

func Stream

func Stream(events ...Event) <-chan Event

Stream returns an Event channel that is filled and closed with the provided Events in a separate goroutine.

func Test

func Test(q Query, evt Event) bool

Test tests the Event evt against the Query q and returns true if q should include evt in its results. Test can be used by in-memory event.Store implementations to filter events based on the query.

func Walk

func Walk(
	ctx context.Context,
	walkFn func(Event) error,
	events <-chan Event,
	errs ...<-chan error,
) error

Walk receives from the given Event channel until it and all provided error channels are closed, ctx is canceled or any of the provided error channels receives an error. For every Event e that is received from the Event channel, walkFn(e) is called. Should ctx be canceled before the channels are closed, ctx.Err() is returned. Should an error be received from one of the error channels, that error is returned. Otherwise Walk returns nil.

Example:

var bus event.Bus
events, errs, err := bus.Subscribe(context.TODO(), "foo", "bar", "baz")
// handle err
err = event.Walk(context.TODO(), func(e event.Event) error {
	log.Printf("Received %q Event: %v", e.Name(), e)
	return nil
}, events, errs)
// handle err

Types

type AggregateRef

type AggregateRef struct {
	Name string
	ID   uuid.UUID
}

AggregateRef is a reference to a specific aggregate, identified by its name and id.

type AggregateTuple deprecated

type AggregateTuple = AggregateRef

Deprecated: Use AggregateRef instead.

type Bus

type Bus interface {
	// Publish sends the given Events to subscribers of such Events.
	Publish(ctx context.Context, events ...Event) error

	// Subscribe returns a channel of Events and a channel of asynchronous errors.
	// Only Events whose name is one of the provided names will be received from the
	// returned Event channel.
	//
	// When Subscribe fails to create the subscription, the returned channels
	// are nil and an error is returned.
	//
	// When ctx is canceled, both the Event and error channel are closed.
	//
	// Errors
	//
	// Callers of Subscribe must ensure that errors are received from the
	// returned error channel; otherwise the Bus may be blocked by the error
	// channel.
	Subscribe(ctx context.Context, names ...string) (<-chan Event, <-chan error, error)
}

Bus is the pub-sub client for Events.

type Data

type Data struct {
	ID               uuid.UUID
	Name             string
	Time             stdtime.Time
	Data             interface{}
	AggregateName    string
	AggregateID      uuid.UUID
	AggregateVersion int
}

Data can be used to provide the data that is needed to implement the Event interface. E wraps Data in its D field and provides the methods that return the data in Data.

type E

type E struct {
	D Data
}

E implements Event.

func (E) Aggregate

func (evt E) Aggregate() (uuid.UUID, string, int)

func (E) Data

func (evt E) Data() interface{}

func (E) ID

func (evt E) ID() uuid.UUID

func (E) Name

func (evt E) Name() string

func (E) Time

func (evt E) Time() stdtime.Time

type Event

type Event interface {
	// ID returns the unique id of the Event.
	ID() uuid.UUID
	// Name returns the name of the Event.
	Name() string
	// Time returns the time of the Event.
	Time() stdtime.Time
	// Data returns the Event Data.
	Data() interface{}

	// Aggregate returns the id, name and version of the aggregate that the
	// event belongs to. Aggregate should return zero values if the event is not
	// an aggregate event.
	Aggregate() (id uuid.UUID, name string, version int)
}

An Event describes something that has happened in the application or specifically something that has happened to an Aggregate in the application.

Publish & Subscribe

An Event can be published through a Bus and sent to subscribers of Events with the same name.

Example (publish):

var b event.Bus
evt := event.New("foo", someData{})
err := b.Publish(context.TODO(), evt)
// handle err

Example (subscribe):

var b event.Bus
res, errs, err := b.Subscribe(context.TODO(), "foo")
// handle err
err = event.Walk(context.TODO(), func(e event.Event) error {
    log.Printf("Received %q event: %v", e.Name(), e)
    return nil
}, res, errs)
// handle err

func Await

func Await(ctx context.Context, events <-chan Event, errs <-chan error) (Event, error)

Await returns the first Event OR error that is received from events or errs. If ctx is canceled before an Event or error is received, ctx.Err() is returned.

func Drain

func Drain(ctx context.Context, events <-chan Event, errs ...<-chan error) ([]Event, error)

Drain drains the given Event channel and returns its Events.

Drain accepts optional error channels which will cause Drain to fail on any error. When Drain encounters an error from any of the error channels, the already drained Events and that error are returned. Similarly, when ctx is canceled, the drained Events and ctx.Err() are returned.

Drain returns as soon as the provided Event channel is closed or an error is received from an error channel; it does not wait for the error channels to be closed.

func New

func New(name string, data interface{}, opts ...Option) Event

New creates an Event with the given name and Data. A UUID is generated for the Event and its time is set to xtime.Now().

Provide Options to override or add data to the Event:

ID(uuid.UUID): Use a custom UUID
Time(time.Time): Use a custom Time
Aggregate(uuid.UUID, string, int): Add Aggregate data
Previous(event.Event): Set Aggregate data based on previous Event

func Sort

func Sort(events []Event, sort Sorting, dir SortDirection) []Event

Sort sorts events and returns the sorted events.

func SortMulti

func SortMulti(events []Event, sorts ...SortOptions) []Event

SortMulti sorts events by multiple fields and returns the sorted events.

type Option

type Option func(*E)

Option is an event option.

func Aggregate

func Aggregate(id uuid.UUID, name string, version int) Option

Aggregate returns an Option that links an Event to an Aggregate.

func ID

func ID(id uuid.UUID) Option

ID returns an Option that overrides the auto-generated UUID of an Event.

func Previous

func Previous(prev Event) Option

Previous returns an Option that adds Aggregate data to an Event. If prev provides non-zero Aggregate data (UUID, name & version), the returned Option adds those to the new Event with its version increased by 1.

func Time

func Time(t stdtime.Time) Option

Time returns an Option that overrides the auto-generated timestamp of an Event.

type Query

type Query interface {
	// Names returns the event names to query for.
	Names() []string

	// IDs returns the event ids to query for.
	IDs() []uuid.UUID

	// Times returns the time.Constraints for the query.
	Times() time.Constraints

	// AggregateNames returns the aggregate names to query for.
	AggregateNames() []string

	// AggregateIDs returns the aggregate ids to query for.
	AggregateIDs() []uuid.UUID

	// AggregateVersions returns the version.Constraints for the query.
	AggregateVersions() version.Constraints

	// Aggregates returns a list of specific Aggregates (name & ID pairs) to
	// query for. If an AggregateTuple has a nil-UUID, every Aggregate with the
	// name of the tuple is queried.
	//
	// Example:
	//	id := uuid.New()
	//	q := query.New(query.Aggregate("foo", id), query.Aggregate("bar", uuid.Nil))
	//
	// The above Query q allows "foo" Aggregates with the UUID id and every "bar" Aggregate.
	Aggregates() []AggregateTuple

	// Sortings returns the SortOptions for the query.
	Sortings() []SortOptions
}

A Query is used by Stores to query Events.

type SortDirection

type SortDirection int

SortDirection is a sorting direction.

func (SortDirection) Bool

func (dir SortDirection) Bool(b bool) bool

Bool returns either b if dir=SortAsc or !b if dir=SortDesc.

type SortOptions

type SortOptions struct {
	Sort Sorting
	Dir  SortDirection
}

SortOptions defines the sorting behaviour of a Query.

type Sorting

type Sorting int

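Sorting is the field that events are sorted by (see SortTime, SortAggregateName, SortAggregateID and SortAggregateVersion).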

func (Sorting) Compare

func (s Sorting) Compare(a, b Event) (cmp int8)

Compare compares a and b and returns -1 if a < b, 0 if a == b or 1 if a > b.

type Store

type Store interface {
	// Insert inserts Events into the store.
	Insert(context.Context, ...Event) error

	// Find fetches the Event with the specified UUID from the store.
	Find(context.Context, uuid.UUID) (Event, error)

	// Query queries the Store for Events that fit the given Query and returns a
	// channel of Events and a channel of errors.
	//
	// Example:
	//
	//	var store event.Store
	//	events, errs, err := store.Query(context.TODO(), query.New())
	//	// handle err
	//	err = event.Walk(context.TODO(), func(evt event.Event) error {
	//		log.Printf("Queried Event: %v", evt)
	//		return nil
	//	}, events, errs)
	//	// handle err
	Query(context.Context, Query) (<-chan Event, <-chan error, error)

	// Delete deletes Events from the Store.
	Delete(context.Context, ...Event) error
}

A Store persists and queries Events.

Directories

Path Synopsis
natsbus
Package natsbus provides an event.Bus implementation with support for both NATS Core and NATS Streaming as the backend.
mongostore
Package mongostore provides a MongoDB event.Store.
Package mock_event is a generated GoMock package.
Package query provides an event query builder.
time
Package time provides time constraints for queries.
version
Package version provides version constraints for queries.
