aggregate

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2022 License: MIT Imports: 9 Imported by: 9

README

Aggregates

The aggregate package builds on top of the Event System to provide event-sourced aggregate tooling.

Design

The aggregate.Aggregate interface defines the minimum method set of an aggregate. The *aggregate.Base type implement this interface and can be embedded into your structs to provide the base implementation for your aggregates. The aggregate.New() function instantiates the *aggregate.Base type.

package aggregate

type Aggregate interface {
	// Aggregate returns the id, name and version of the aggregate.
	Aggregate() (uuid.UUID, string, int)

	// AggregateChanges returns the uncommited events of the aggregate.
	AggregateChanges() []event.Event

	// ApplyEvent applies the event on the aggregate.
	ApplyEvent(event.Event)
}
// Package auth is an example authentication service.
package auth

const UserAggregate = "auth.user"

type User struct {
  *aggregate.Base
}

// NewUser returns the user with the given id.
func NewUser(id uuid.UUID) *User {
  return &User{
    Base: aggregate.New(UserAggregate, id),
  }
}
Example

Example user aggregate:

package auth

import (
  "github.com/modernice/goes/event"
  "github.com/modernice/goes/aggregate"
)

// UserAggregate is the name of the User aggregate.
const UserAggregate = "auth.user"

// Events
const (
  UserRegistered = "auth.user.registered"
)

// UserRegisteredData is the event data for UserRegistered.
type UserRegisteredData struct {
  Name  string
  Email string
}

// User represents a user of the application.
type User struct {
  *aggregate.Base

  Name  string
  Email string
}

// NewUser returns the user with the given id.
func NewUser(id uuid.UUID) *User {
  return &User{
    Base: aggregate.New(UserAggregate, id),
  }
}

// Register registers the user with the given name and email address.
func (u *User) Register(name, email string) error {
  if name = strings.TrimSpace(name); name == "" {
    return errors.New("empty name")
  }

  if err := validateEmail(email); err != nil {
    return errors.New("invalid email %q: %v", email, err)
  }

  // aggregate.NextEvent() creates and applies the next event for the User using
  // u.ApplyEvent(evt). u.ApplyEvent then calls u.register(evt) which actually
  // updates the state of the User.
  aggregate.NextEvent(u, UserRegistered, UserRegisteredData{
    Name:  name,
    Email: email,
  })

  return nil
}

func (u *User) register(evt event.Event) {
  data := evt.Data().(UserRegisteredData)
  u.Name = data.Name
  u.Email = data.Email
}

// ApplyEvent overrides the ApplyEvent function of u.Base.
func (u *User) ApplyEvent(evt event.Event) {
  switch evt.Name() {
  case UserRegistered:
    u.register(evt)
  }
}

Repository

The github.com/modernice/goes/aggregate/repository package implements the aggregate.Repository interface. The aggregate repository uses the underlying event store to save and query aggregates from and to the event store.

package aggregate

type Repository interface {
	Save(ctx context.Context, a Aggregate) error
	Fetch(ctx context.Context, a Aggregate) error
	FetchVersion(ctx context.Context, a Aggregate, v int) error
	Query(ctx context.Context, q Query) (<-chan History, <-chan error, error)
	Delete(ctx context.Context, a Aggregate) error
}
Fetch an Aggregate

To fetch the current state of an aggregate, you need to pass the already instantiated aggregate to the aggregate.Repository.Fetch method.

package example

func fetchAggregate(repo aggregate.Repository) {
  userID := uuid.New() // Get this from somewhere
  u := NewUser(userID) // Instantiate the aggregate
  
  err := repo.Fetch(context.TODO(), u) // Fetch and apply events
  // handle err
}
Query Aggregates

You can query multiple aggregates with the aggregate.Repository.Query method, which accepts a query to filter aggregate events from the event store.

Queries return streams of aggregate.Historys, which provide the name and id of the streamed aggregate. Histories can be applied onto aggregates to build their current state.

package example

import (
  "context"
  "github.com/modernice/goes/aggregate"
  "github.com/modernice/goes/aggregate/query"
)

func queryAggregates(repo aggregate.Repository) {
  str, errs, err := repo.Query(context.TODO(), query.New(
    query.Name("auth.user"), // Query "auth.user" aggregates
  ))
  // handle err

  histories, err := aggregate.Drain(context.TODO(), str, errs) // Drain the stream
  // handle err

  for _, h := range histories {
    u := NewUser(h.AggregateID()) // Instantiate the aggregate
    h.Apply(u) // Build the aggregate state
  }
}
Delete an Aggregate

Deleting an aggregate means deleting all its events from the event store.

package example

func deleteAggregate(repo aggregate.Repository) {
  userID := uuid.New() // Get this from somewhere
  u := NewUser(userID)

  err := repo.Delete(context.TODO(), u)
  // handle err
}

Guides

Stream Helpers

goes provides helper functions to work with channels of aggregates and aggregate tuples:

  • aggregate.Walk
  • aggregate.Drain
  • aggregate.ForEach
  • aggregate.WalkRefs
  • aggregate.DrainRefs
  • aggregate.ForEachRef

Documentation

Index

Constants

View Source
const (
	// ID means there is an inconsistency in the ID of an Aggregate.
	InconsistentID = ConsistencyKind(iota + 1)

	// Name means there is an inconsistency in the Aggregate names of the Events
	// of an Aggregate.
	InconsistentName

	// Version means there is an inconsistency in the Event versions of an
	// Aggregate.
	InconsistentVersion

	// Time means there is an inconsistency in the Event times of an Aggregate.
	InconsistentTime
)
View Source
const (
	// SortName sorts aggregates by name.
	SortName = Sorting(iota)
	// SortID sorts aggregates by id.
	SortID
	// SortVersion sorts aggregates by version.
	SortVersion

	// SortAsc sorts aggregates in ascending order.
	SortAsc = SortDirection(iota)
	// SortDesc sorts aggregates in descending order.
	SortDesc
)

Variables

View Source
var ExtractID = PickID

Deprecated: Use PickID instead.

View Source
var ExtractName = PickName

Deprecated: Use PickName instead.

View Source
var ExtractVersion = PickVersion

Deprecated: Use PickVersion instead.

View Source
var ForEvery = ForEach

ForEvery is an alias for ForEach.

Deprecated: Use ForEach instead.

View Source
var ForEveryTuple = ForEachRef

ForEveryTuple is an alias for ForEachTuple.

Deprecated: Use ForEachRef instead.

View Source
var WalkTuples = WalkRefs

Deprecated: Use RefWalk instead.

Functions

func ApplyHistory

func ApplyHistory(a Aggregate, events ...event.Event) error

ApplyHistory applies the given events to the aggregate a to reconstruct the state of a at the time of the latest event. If the aggregate implements Committer, a.TrackChange(events) and a.Commit() are called before returning.

func ForEach

func ForEach(
	applyFn func(h History),
	errFn func(error),
	histories <-chan History,
	errs ...<-chan error,
)

ForEach iterates over the provided History and error channels and for every History h calls applyFn(h) and for every error e calls errFn(e) until all channels are closed or ctx is canceled.

func ForEachRef

func ForEachRef(
	applyFn func(Ref),
	errFn func(error),
	refs <-chan Ref,
	errs ...<-chan error,
)

ForEachRef iterates over the provided Ref and error channels and for every Ref r calls applyFn(r) and for every error e calls errFn(e) until all channels are closed or ctx is canceled.

func HasChange

func HasChange(a Aggregate, eventName string) bool

HasChange returns whether Aggregate a has an uncommitted Event with the given name.

func NextEvent

func NextEvent(a Aggregate, name string, data interface{}, opts ...event.Option) event.Event

NextEvent makes and returns the next Event e for the aggregate a. NextEvent calls a.ApplyEvent(e) and a.TrackChange(e) before returning the Event.

var foo aggregate.Aggregate
evt := aggregate.NextEvent(foo, "event-name", ...)

func NextVersion

func NextVersion(a Aggregate) int

NextVersion returns the next (uncommitted) version of an aggregate (UncommittedVersion(a) + 1).

func PickID

func PickID(a Aggregate) uuid.UUID

PickID returns the UUID of the given aggregate.

func PickName

func PickName(a Aggregate) string

PickName returns the name of the given aggregate.

func PickVersion

func PickVersion(a Aggregate) int

PickVersion returns the version of the given aggregate.

func UncommittedVersion

func UncommittedVersion(a Aggregate) int

UncommittedVersion returns the version the aggregate, including any uncommitted changes.

func ValidateConsistency

func ValidateConsistency(a Aggregate, events ...event.Event) error

Validate tests the consistency of the given events against the given aggregate.

An Event e is invalid if e.AggregateName() doesn't match a.AggregateName(), e.AggregateID() doesn't match a.AggregateID() or if e.AggregateVersion() doesn't match the position in events relative to a.AggregateVersion(). This means that events[0].AggregateVersion() must equal a.AggregateVersion() + 1, events[1].AggregateVersion() must equal a.AggregateVersion() + 2 etc.

An Event a is also invalid if its time is equal to or after the time of the previous Event.

The first Event e in events that is invalid causes Validate to return an *Error containing the Kind of inconsistency and the Event that caused the inconsistency.

func Walk

func Walk(
	ctx context.Context,
	walkFn func(History) error,
	str <-chan History,
	errs ...<-chan error,
) error

Walk receives from the given History channel until it and all provided error channels are closed, ctx is closed or any of the provided error channels receives an error. For every History h that is received from the History channel, walkFn(h) is called. Should ctx be canceled before the channels are closed, ctx.Err() is returned. Should an error be received from one of the error channels, that error is returned. Otherwise Walk returns nil.

Example:

var repo aggregate.Repository
str, errs, err := repo.Query(context.TODO(), query.New())
// handle err
err := aggregate.Walk(context.TODO(), func(h aggregate.History) {
	log.Println(fmt.Sprintf("Received History: %v", h))
}, str, errs)
// handle err

func WalkRefs

func WalkRefs(
	ctx context.Context,
	walkFn func(Ref) error,
	str <-chan Ref,
	errs ...<-chan error,
) error

WalkRefs receives from the given Ref channel until it and all provided error channels are closed, ctx is closed or any of the provided error channels receives an error. For every Ref r that is received from the Ref channel, walkFn(h) is called. Should ctx be canceled before the channels are closed, ctx.Err() is returned. Should an error be received from one of the error channels, that error is returned. Otherwise Walk returns nil.

Types

type Aggregate

type Aggregate interface {
	// Aggregate returns the id, name and version of the aggregate.
	Aggregate() (uuid.UUID, string, int)

	// AggregateChanges returns the uncommited events of the aggregate.
	AggregateChanges() []event.Event

	// ApplyEvent applies the event on the aggregate.
	ApplyEvent(event.Event)
}

Aggregate is an event-sourced Aggregate.

func Sort

func Sort(as []Aggregate, s Sorting, dir SortDirection) []Aggregate

Sort sorts aggregates and returns the sorted aggregates.

func SortMulti

func SortMulti(as []Aggregate, sorts ...SortOptions) []Aggregate

SortMulti sorts aggregates by multiple fields and returns the sorted aggregates.

type Base

type Base struct {
	ID      uuid.UUID
	Name    string
	Version int
	Changes []event.Event
}

Base can be embedded into structs to make them fulfill the Aggregate interface.

func New

func New(name string, id uuid.UUID, opts ...Option) *Base

New returns a new Base for an Aggregate.

func (*Base) Aggregate

func (b *Base) Aggregate() (uuid.UUID, string, int)

func (*Base) AggregateChanges

func (b *Base) AggregateChanges() []event.Event

AggregateChanges implements Aggregate.

func (*Base) AggregateID

func (b *Base) AggregateID() uuid.UUID

AggregateID implements Aggregate.

func (*Base) AggregateName

func (b *Base) AggregateName() string

AggregateName implements Aggregate.

func (*Base) AggregateVersion

func (b *Base) AggregateVersion() int

AggregateVersion implements Aggregate.

func (*Base) ApplyEvent

func (*Base) ApplyEvent(event.Event)

ApplyEvent implements aggregate. Aggregates that embed *Base should overide ApplyEvent.

func (*Base) Commit

func (b *Base) Commit()

Commit implement Aggregate.

func (*Base) SetVersion

func (b *Base) SetVersion(v int)

SetVersion implements snapshot.Aggregate.

func (*Base) TrackChange

func (b *Base) TrackChange(events ...event.Event)

TrackChange implements Aggregate.

type Committer

type Committer interface {
	// TrackChange adds events as changes to the aggregate.
	TrackChange(...event.Event)

	// Commit commits the uncommitted changes of the aggregate. The changes
	// should be removed and the aggregate version set to the version of last
	// tracked event.
	Commit()
}

Committer commits aggregate changes. Types that implement Committer are considered when applying the aggregate history onto the implementing type. The Commit function is called after applying the events onto the aggregate (using the ApplyEvent function) to commit the changes to the aggregate.

*Base implements Committer.

type ConsistencyError

type ConsistencyError struct {
	// Kind is the kind of incosistency.
	Kind ConsistencyKind
	// Aggregate is the handled aggregate.
	Aggregate Aggregate
	// Events are the tested events.
	Events []event.Event
	// EventIndex is the index of the Event that caused the Error.
	EventIndex int
}

Error is a consistency error.

func (*ConsistencyError) Error

func (err *ConsistencyError) Error() string

func (*ConsistencyError) Event

func (err *ConsistencyError) Event() event.Event

Event return the first Event that caused an inconsistency.

type ConsistencyKind

type ConsistencyKind int

ConsistencyKind is the kind of inconsistency.

func (ConsistencyKind) String

func (k ConsistencyKind) String() string

type History

type History interface {
	// AggregateName returns the name of the aggregate.
	AggregateName() string

	// AggregateID returns the UUID of the aggregate.
	AggregateID() uuid.UUID

	// Apply applies the history onto the aggregate to rebuild its current state.
	Apply(Aggregate)
}

A History provides the event history of an aggregate. A History can be applied onto an aggregate to rebuild its current state.

func Drain

func Drain(ctx context.Context, str <-chan History, errs ...<-chan error) ([]History, error)

Drain drains the given History channel and returns its Histories.

Drain accepts optional error channels which will cause Drain to fail on any error. When Drain encounters an error from any of the error channels, the already drained Histories and that error are returned. Similarly, when ctx is canceled, the drained Histories and ctx.Err() are returned.

Drain returns when the provided History channel is closed or it encounters an error from an error channel and does not wait for the error channels to be closed.

type Option

type Option func(*Base)

Option is an Aggregate option.

func Version

func Version(v int) Option

Version returns an Option that sets the version of an Aggregate.

type Query

type Query interface {
	// Names returns the aggregate names to query for.
	Names() []string

	// IDs returns the aggregate UUIDs to query for.
	IDs() []uuid.UUID

	// Versions returns the version constraints for the query.
	Versions() version.Constraints

	// Sortings returns the SortConfig for the query.
	Sortings() []SortOptions
}

Query is used by Repositories to filter aggregates.

type Ref

type Ref = event.AggregateRef

Ref is a reference to a specific aggregate, identified by its name and id.

func DrainRefs

func DrainRefs(ctx context.Context, str <-chan Ref, errs ...<-chan error) ([]Ref, error)

DrainRefs drains the given Ref channel and returns its Refs.

DrainRefs accepts optional error channels which will cause DrainRefs to fail on any error. When DrainRefs encounters an error from any of the error channels, the already drained Refs and that error are returned. Similarly, when ctx is canceled, the drained Refs and ctx.Err() are returned.

DrainRefs returns when the provided Ref channel is closed or it encounters an error from an error channel and does not wait for the error channels to be closed.

type Repository

type Repository interface {
	// Save inserts the changes of Aggregate a into the event store.
	Save(ctx context.Context, a Aggregate) error

	// Fetch fetches the events for the given Aggregate from the event store,
	// beginning from version a.AggregateVersion()+1 up to the latest version
	// for that Aggregate and applies them to a, so that a is in the latest
	// state. If the event store does not return any events, a stays untouched.
	Fetch(ctx context.Context, a Aggregate) error

	// FetchVersion fetches the events for the given Aggregate from the event
	// store, beginning from version a.AggregateVersion()+1 up to v and applies
	// them to a, so that a is in the state of the time of the event with
	// version v. If the event store does not return any events, a stays
	// untouched.
	FetchVersion(ctx context.Context, a Aggregate, v int) error

	// Query queries the Event Store for Aggregates and returns a channel of
	// Histories and an error channel. If the query fails, Query returns nil
	// channels and an error.
	//
	// A History can be applied on an Aggregate to reconstruct its state from
	// the History.
	//
	// The Drain function can be used to get the result of the stream as slice
	// and a single error:
	//
	//	res, errs, err := r.Query(context.TODO(), query.New())
	//	// handle err
	//	appliers, err := stream.Drain(context.TODO(), res, errs)
	//	// handle err
	//	for _, app := range appliers {
	//		// Initialize your Aggregate.
	//		var a Aggregate = newAggregate(app.AggregateName(), app.AggregateID())
	//		a.Apply(a)
	//	}
	Query(ctx context.Context, q Query) (<-chan History, <-chan error, error)

	// Delete deletes an Aggregate by deleting its Events from the Event Store.
	Delete(ctx context.Context, a Aggregate) error
}

Repository is the aggregate repository. It saves and fetches aggregates to and from the underlying event store.

type SortDirection

type SortDirection int

SortDirection is a sorting direction.

func (SortDirection) Bool

func (dir SortDirection) Bool(b bool) bool

Bool returns either b if dir=SortAsc or !b if dir=SortDesc.

type SortOptions

type SortOptions struct {
	Sort Sorting
	Dir  SortDirection
}

SortOptions defines the sorting behaviour for a Query.

type Sorting

type Sorting int

Sorting is a sorting.

func (Sorting) Compare

func (s Sorting) Compare(a, b Aggregate) (cmp int8)

Compare compares a and b and returns -1 if a < b, 0 if a == b or 1 if a > b.

type Tuple deprecated

type Tuple = Ref

Deprecated: Use Ref instead.

Directories

Path Synopsis
Package mock_aggregate is a generated GoMock package.
Package mock_aggregate is a generated GoMock package.
Package ref provides utilities for working with aggregate.Ref.
Package ref provides utilities for working with aggregate.Ref.
mocks
Package mock_snapshot is a generated GoMock package.
Package mock_snapshot is a generated GoMock package.
Deprecated: Use github.com/modernice/goes/aggregate/ref instead
Deprecated: Use github.com/modernice/goes/aggregate/ref instead

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL