aggregate

package
v0.4.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 6, 2023 License: Apache-2.0 Imports: 12 Imported by: 9

README

Aggregates

Package aggregate provides a framework for building event-sourced aggregates. It builds on top of the event system, so make sure to read the event documentation first before reading further.

Introduction

An aggregate is any type that implements the Aggregate interface:

package aggregate

type Aggregate interface {
	// Aggregate returns the id, name and version of the aggregate.
	Aggregate() (uuid.UUID, string, int)

	// AggregateChanges returns the uncommited events of the aggregate.
	AggregateChanges() []event.Event

	// ApplyEvent applies an event to the aggregate.
	ApplyEvent(event.Event)
}

You can either implement this interface by yourself or embed the *Base type. Use the New function to initialize *Base:

package example

// Foo is the "foo" aggregate.
type Foo struct {
	*aggregate.Base
}

// NewFoo returns the "foo" aggregate with the given id.
func NewFoo(id uuid.UUID) *Foo {
	return &Foo{
		Base: aggregate.New("foo", id),
	}
}
Additional APIs

Aggregates can make use of additional, optional APIs provided by goes. An aggregate that embeds *Base implements all of these APIs automatically:

Read the documentation for each of these interfaces for more details.

Aggregate events

An event-sourced aggregate transitions its state by applying events on itself. Events are applied by the ApplyEvent(event.Event) method of the aggregate. Here is a minimal "todo list" example:

package todo

type List struct {
	*aggregate.Base

	Tasks []string
}

// NewList returns the todo list with the given id.
func NewList(id uuid.UUID) *User {
	return &List{Base: aggregate.New("list", id)}
}

func (l *List) ApplyEvent(evt event.Event) {
	switch evt.Name() {
	case "task_added":
		l.Tasks = append(l.Tasks, evt.Data().(string))
	case "task_removed":
		name := evt.Data().(string)
		for i, task := range l.Tasks {
			if task == name {
				l.Tasks = append(l.Tasks[:i], l.Tasks[i+1:]...)
				return
			}
		}
	}
}

The todo list now knows how to apply "task_added" and "task_removed" events. What's missing are the commands to actually create the events and call the ApplyEvent method with the created event:

// ... previous code ...

// AddTask adds the given task to the list.
func (l *List) AddTask(task string) error {
	if l.Contains(task) {
		return fmt.Errorf("list already contains %q", task)
	}

	// aggregate.Next() creates the event and applies it using l.ApplyEvent()
	aggregate.Next(l, "task_added", task)

	return nil
}

// RemoveTask removes the given task from the list.
func (l *List) RemoveTask(task string) error {
	if !l.Contains(task) {
		return fmt.Errorf("list does not contain %q", task)
	}

	aggregate.Next(l, "task_removed", task)

	return nil
}

// Contains returns whether the list contains the given task.
func (l *List) Contains(task string) bool {
	task = strings.ToLower(task)
	for _, t := range l.Tasks {
		if strings.ToLower(t) == task {
			return true
		}
	}
	return false
}

That's it. Now you can create todo lists, add tasks, and remove them again:

// ... previous code ...

func example() {
	list := NewList(uuid.New())

	if err := list.AddTask("do this and that"); err != nil {
		panic(fmt.Errorf("add task: %w", err))
	}

	if err := list.RemoveTask("do this and that"); err != nil {
		panic(fmt.Errorf("remove task: %w", err))
	}

	// list.AggregateVersion() == 2
	// list.AggregateChanges() returns two events – one "task_added" event
	// and one "task_removed" event.
}

Generic helpers

Applying events within the ApplyEvent function is the most straightforward way to implement an aggregate but can become quite messy if an aggregate consists of many events.

goes provides type-safe, generic helpers that allow you to setup an event applier function for each individual event. This is what the todo list example looks like using generics:

package todo

import (
	"github.com/google/uuid"
	"github.com/modernice/goes/aggregate"
	"github.com/modernice/goes/event"
)

type List struct {
	*aggregate.Base

	Tasks []string
}

func NewList(id uuid.UUID) *List {
	l := &List{Base: aggregate.New("list", id)}

	event.ApplyWith(l, l.addTask, "task_added")
	event.ApplyWith(l, l.removeTask, "task_removed")

	return l
}

func (l *List) AddTask(task string) error { ... }
func (l *List) RemoveTask(task string) error { ... }

func (l *List) addTask(evt event.Of[string]) {
	l.Tasks = append(l.Tasks, evt.Data())
}

func (l *List) removeTask(evt event.Of[string]) {
	name := evt.Data()
	for i, task := range l.Tasks {
		if task == name {
			l.Tasks = append(l.Tasks[:i], l.Tasks[i+1:]...)
			return
		}
	}
}

Testing

TL;DR

Use the test.Change() and test.NoChange() testing helpers to ensure correct implementation of aggregate methods.

package todo_test

import (
	"github.com/modernice/goes/test"
)

func TestNewList(t *testing.T) {
	// Test that todo.NewList() returns a valid aggregate.
	test.NewAggregate(t, todo.NewList, "list")
}

func TestXXX(t *testing.T) {
	// Aggregate should have applied and recorded the given event.
	test.Change(t, foo, "<event-name>")

	// Aggregate should have applied and recorded the given event with
	// the given event data.
	test.Change(t, foo, "<event-name>", test.EventData(<event-data>))

	// Aggregate should have applied and recorded the given event with
	// the given event data exactly 3 times.
	test.Change(
		t, foo, "<event-name>",
		test.EventData(<event-data>),
		test.Exactly(3),
	)

	// Aggregate should NOT have applied and recorded the given event.
	test.NoChange(t, foo, "<event-name>")

	// Aggregate should NOT have applied and recorded the given event with
	// the given event data.
	test.NoChange(t, foo, "<event-name>", test.EventData(<event-data>))
}

Testing of aggregates can become error-prone if one forgets to consider that aggregates are event-sourced. Take a look at this example:

package todo_test

func TestList_AddTask(t *testing.T) {
	l := todo.NewList(uuid.New())

	if l.Contains("foo") {
		t.Fatalf("list should not contain %q until added", "foo")
	}

	if err := l.AddTask("foo"); err != nil {
		t.Fatalf("failed to add task %q", "foo")
	}

	if !l.Contains("foo") {
		t.Fatalf("list should contain %q after adding", "foo")
	}
}

Even if the above test suceeds, it does not guarantee that the aggregate was implemented correctly. The following AddTask implementation bypasses the indirection through the ApplyEvent method and updates the state directly, resulting in a passing test even though the aggregate would behave incorrectly when used in goes' components.

package todo

func (l *List) AddTask(task string) error {
	l.Tasks = append(l.Tasks, task)
}

To circumvent this issue, goes provides helpers to test aggregate changes. The above test would be rewritten as:

package todo_test

import "github.com/modernice/goes/test"

func TestList_AddTask(t *testing.T) {
	l := todo.NewList(uuid.New())

	if l.Contains("foo") {
		t.Fatalf("list should not contain %q until added", "foo")
	}

	if err := l.AddTask("foo"); err != nil {
		t.Fatalf("failed to add task %q", "foo")
	}

	if !l.Contains("foo") {
		t.Fatalf("list should contain %q after adding", "foo")
	}

	test.Change(t, l, "task_added", test.EventData("foo"))
}

The test.Change() helper checks if the aggregate has recorded a "task_added" change with "foo" as the event data.

Persistence

The Repository type defines an aggregate repository that allows you to save and fetch aggregates to and from an underlying event store:

package aggregate

type Repository interface {
	Save(ctx context.Context, a Aggregate) error
	Fetch(ctx context.Context, a Aggregate) error
	FetchVersion(ctx context.Context, a Aggregate, v int) error
	Query(ctx context.Context, q Query) (<-chan History, <-chan error, error)
	Use(ctx context.Context, a Aggregate, fn func() error) error
	Delete(ctx context.Context, a Aggregate) error
}

The implementation of this repository can be found in the repository package. Use repository.New to create a repository from an event store:

package example

import (
	"github.com/modernice/goes/aggregate/repository"
	"github.com/modernice/goes/event"
)

func example(store event.Store) {
	repo := repository.New(store)
}
Save an aggregate
package example

func example(repo aggregate.Repository) {
	l := todo.NewList(uuid.New())
	l.AddTask("foo")
	l.AddTask("bar")
	l.AddTask("baz")

	if err := repo.Save(context.TODO(), l); err != nil {
		panic(fmt.Errorf("save todo list: %w", err))
	}
}
Fetch an aggregate

In order to fetch an aggregate, it must be passed to Repository.Fetch(). The repository fetches and applies the event stream of the aggregate to reconstruct its current state.

An aggregate does not need to have an event stream to be fetched; if an aggregate has no events, Repository.Fetch() is a no-op.

Fetching an aggregate multiple times is also not a problem because the repository will only fetch and apply events that haven't been applied yet. This also means that Repository.Fetch() can be used to "refresh" an aggregate – to get to its most current state without fetching unnecessary events.

package example

func example(repo aggregate.Repository) {
	l := todo.NewList(uuid.New())

	if err := repo.Fetch(context.TODO(), l); err != nil {
		panic(fmt.Errorf(
			"fetch todo list: %w [id=%s]", err, l.AggregateID(),
		))
	}
}

You can also fetch a specific version of an aggregate, ignoring all events with a version higher than the provided version:

package example

func example(repo aggregate.Repository) {
	l := todo.NewList(uuid.New())

	if err := repo.FetchVersion(context.TODO(), l, 5); err != nil {
		panic(fmt.Errorf(
			"fetch todo list at version %d: %w [id=%s]",
			5, err, l.AggregateID(),
		))
	}
}
"Use" an aggregate

Repository.Use() is a convenience method to fetch an aggregate, "use" it, and then insert new changes into the event store:

package example

func example(repo aggregate.Repository) {
	l := todo.NewList(uuid.New())

	if err := repo.Use(context.TODO(), l, func() error {
		return l.AddTask("foo")
	}); err != nil {
		panic(err)
	}
}
Delete an aggregate

Hard-deleting aggregates should be avoided because that can lead to esoteric issues that are hard to debug. Consider using soft-deletes instead.

To delete an aggregate, the repository deletes its event stream from the event store:

package example

func example(repo aggregate.Repository) {
	l := todo.NewList(uuid.New())

	if err := repo.Delete(context.TODO(), l); err != nil {
		panic(fmt.Errorf(
			"delete todo list: %w [id=%s]", err, l.AggregateID(),
		))
	}
}
Soft-delete an aggregate

Soft-deleted aggregates cannot be fetched and are excluded from query results of aggregate repositories. In order to soft-delete an aggregate, a specific event that flags the aggregate as soft-deleted must be inserted into the event store. The event must have event data that implements the SoftDeleter interface:

package example

type DeletedData struct {}

func (DeletedData) SoftDelete() bool { return true }

func example() {
	evt := event.New("deleted", DeletedData{}, event.Aggregate(...))
}

If the event stream of an aggregate contains such an event, the aggregate is considered to be soft-deleted and will be excluded from query results of the aggregate repository. Additionally, the Repository.Fetch() method will return repository.ErrDeleted for the aggregate.

Soft-deleted aggregates can also be restored by inserting an event with event data that implements SoftRestorer:

package example

type RestoredData struct {}

func (RestoredData) SoftRestore() bool { return true }

func example() {
	evt := event.New("restored", RestoredData{}, event.Aggregate(...))
}
Query aggregates

Aggregates can be queried from the event store. When queried, the repository returns a History channel (lol) and an error channel. A History can be applied to an aggregate to reconstruct its current state.

package example

import (
	"github.com/modernice/goes/aggregate"
	"github.com/modernice/goes/aggregate/query"
	"github.com/modernice/goes/helper/streams"
)

func example(repo aggregate.Repository) {
	res, errs, err := repo.Query(context.TODO(), query.New(
		// Query "foo", "bar", and "baz" aggregates.
		query.Name("foo", "bar", "baz"),

		// Query aggregates that have one of the provided ids.
		query.ID(uuid.UUID{...}, uuid.UUID{...}),
	))

	if err := streams.Walk(
		context.TODO(),
		func(his aggregate.History) error {
			log.Printf(
				"Name: %s ID: %s",
				his.AggregateName(),
				his.AggregateID(),
			)

			var foo aggregate.Aggregate // fetch the aggregate
			his.Apply(foo) // apply the history
		},
		res,
		errs,
	); err != nil {
		panic(err)
	}
}
Typed repositories

The Repository interface defines a generic aggregate repository for all kinds of aggregates. The TypedRepository can be used to define a type-safe repository for a specific aggregate. The TypedRepository removes the need for passing the aggregate instance to repository methods.

To create a type-safe repository for an aggregate, pass the Repository and the constructor of the aggregate to repository.Typed():

package todo

import (
	"github.com/modernice/goes/aggregate"
	"github.com/modernice/goes/aggregate/repository"
	"github.com/modernice/goes/event"
)

// List is the "todo list" aggregate.
type List struct { *aggregate.Base }

// ListRepository is the "todo list" repository.
type ListRepository = aggregate.TypedRepository[*List]

// NewList returns the "todo list" with the given id.
func NewList(id uuid.UUID) *List {
	return &List{Base: aggregate.New("list", id)}
}

// NewListRepository returns the "todo list" repository.
func NewListRepository(repo aggregate.Repository) ListRepository {
	return repository.Typed(repo, NewList)
}

func example(store event.Store) {
	repo := repository.New(store)
	lists := NewListRepository(repo)

	// Fetch a todo list by id.
	l, err := lists.Fetch(context.TODO(), uuid.New())
	if err != nil {
		panic(fmt.Errorf("fetch list: %w", err))
	}
	// l is a *List

	// "Use" a list by id.
	if err := lists.Use(context.TODO(), uuid.New(), func(l *List) error {
		return l.AddTask("foo")
	}); err != nil {
		panic(fmt.Errof("use list: %w", err))
	}

	// The TypedRepository will only ever return *List aggregates.
	// All other aggregates that would be returned by the passed query,
	// are simply discarded from the result.
	res, errs, err := lists.Query(context.TODO(), query.New(...))
}

Documentation

Index

Constants

View Source
const (
	// InconsistentID is a ConsistencyKind indicating that an event has an
	// inconsistent AggregateID with the expected one.
	InconsistentID = ConsistencyKind(iota + 1)

	// InconsistentName is a ConsistencyKind representing an error caused by an
	// event having an invalid AggregateName that does not match the expected name
	// for the aggregate.
	InconsistentName

	// InconsistentVersion indicates an inconsistency in the version of an Event
	// within an Aggregate. This occurs when the version of an event is less than or
	// equal to the current version of the aggregate, or when the version of a new
	// event is less than or equal to the version of a previous event.
	InconsistentVersion

	// InconsistentTime indicates that an event has an invalid time, meaning it
	// occurred before its preceding event in the sequence of events being
	// validated.
	InconsistentTime
)
View Source
const (
	// SortName is a Sorting option that compares two Aggregates by their names in
	// ascending order. It is used in the Query interface for specifying the desired
	// sorting method when querying Aggregates.
	SortName = Sorting(iota)

	// SortID is a Sorting value used to sort Aggregates by their ID when comparing
	// them.
	SortID

	// SortVersion is a Sorting option that compares Aggregates based on their
	// version, ordering them by ascending or descending version numbers depending
	// on the SortDirection.
	SortVersion

	// SortAsc is a SortDirection that indicates sorting of aggregates in ascending
	// order based on the specified Sorting criteria.
	SortAsc = SortDirection(iota)

	// SortDesc is a [SortDirection] constant that specifies the descending order
	// for sorting Aggregates by their name, ID, or version.
	SortDesc
)

Variables

This section is empty.

Functions

func ApplyHistory

func ApplyHistory[Events ~[]event.Of[any]](a Aggregate, events Events) error

ApplyHistory applies a sequence of events to the given Aggregate, ensuring consistency before applying. If the Aggregate implements the Committer interface, changes are recorded and committed after applying the events. Returns an error if consistency validation fails.

func IsConsistencyError added in v0.1.2

func IsConsistencyError(err error) bool

IsConsistencyError checks if the given error is a ConsistencyError or an error implementing the IsConsistencyError method returning true.

func Next added in v0.1.2

func Next[Data any](a Aggregate, name string, data Data, opts ...event.Option) event.Evt[Data]

Next creates a new event with the provided name and data, applies it to the given aggregate, and records the change if the aggregate implements the Committer interface. The event is assigned the next available version and a timestamp that is guaranteed to be at least 1 nanosecond after the previous event.

func NextEvent deprecated

func NextEvent[D any](a Aggregate, name string, data D, opts ...event.Option) event.Evt[D]

Deprecated: Use Next instead.

func NextVersion

func NextVersion(a Aggregate) int

NextVersion returns the next version number for the given Aggregate, taking into account both its committed and uncommitted changes.

func UncommittedVersion

func UncommittedVersion(a Aggregate) int

UncommittedVersion returns the version of the given Aggregate after applying all uncommitted changes. It takes into account both the current version and any uncommitted events to calculate the resulting version.

func ValidateConsistency

func ValidateConsistency[Data any, Events ~[]event.Of[Data]](ref Ref, currentVersion int, events Events, opts ...ConsistencyOption) error

ValidateConsistency checks the consistency of the provided events with the given aggregate reference and current version. It returns a ConsistencyError if any inconsistency is found, such as mismatched IDs, names, versions, or event times. The consistency check can be configured with ConsistencyOption functions.

Types

type Aggregate

type Aggregate = Of[uuid.UUID]

Aggregate represents an entity that encapsulates a collection of events and their resulting state. It provides methods to apply new events, access the current state, and retrieve the list of uncommitted changes. It can also be used in conjunction with Committer to record and commit changes. Additionally, Aggregate supports soft deletion and restoration through the SoftDeleter and SoftRestorer interfaces.

func Sort

func Sort(as []Aggregate, s Sorting, dir SortDirection) []Aggregate

Sort sorts the given Aggregates ([]Aggregate) according to the specified Sorting and SortDirection. The sorted Aggregates are returned as a new slice without modifying the input slice.

func SortMulti

func SortMulti(as []Aggregate, sorts ...SortOptions) []Aggregate

SortMulti sorts a slice of Aggregates by multiple SortOptions in the order they are provided. If two Aggregates have the same value for a SortOption, the next SortOption in the list is used to determine their order.

type Base

type Base struct {
	ID      uuid.UUID
	Name    string
	Version int
	Changes []event.Event
	// contains filtered or unexported fields
}

Base provides the core of an event-sourced aggregate. When embedded into an aggregate, the aggregate will implement these APIs:

  • aggregate.Aggregate
  • aggregate.Committer
  • repository.ChangeDiscarder
  • snapshot.Aggregate

func New

func New(name string, id uuid.UUID, opts ...Option) *Base

New creates a new Base aggregate with the specified name and UUID, applying the provided options. The returned Base can be embedded into custom aggregates to provide core functionality for event-sourced aggregates.

func (*Base) Aggregate

func (b *Base) Aggregate() (uuid.UUID, string, int)

Aggregate returns the ID, name, and version of an event-sourced aggregate. It is used to retrieve information about the aggregate without accessing its fields directly.

func (*Base) AggregateChanges

func (b *Base) AggregateChanges() []event.Event

AggregateChanges returns the uncommitted changes (events) of the event-sourced aggregate. These are the events that have been recorded but not yet committed to the event store.

func (*Base) AggregateID

func (b *Base) AggregateID() uuid.UUID

AggregateID returns the UUID of the aggregate associated with the Base struct.

func (*Base) AggregateName

func (b *Base) AggregateName() string

AggregateName returns the name of the aggregate.

func (*Base) AggregateVersion

func (b *Base) AggregateVersion() int

AggregateVersion returns the current version of the aggregate. The version is incremented when events are committed to the aggregate.

func (*Base) ApplyEvent

func (b *Base) ApplyEvent(evt event.Event)

ApplyEvent applies the given event to the aggregate by calling the appropriate event handler registered for the event's name. The event must have been created with the aggregate's ID, name, and version.

func (*Base) Commit

func (b *Base) Commit()

Commit updates the aggregate version to the version of its latest change and clears the changes. If there are no changes, nothing is done.

func (*Base) CurrentVersion added in v0.2.0

func (b *Base) CurrentVersion() int

CurrentVersion returns the current version of the aggregate, which is the sum of its base version and the number of uncommitted changes.

func (*Base) DiscardChanges added in v0.1.2

func (b *Base) DiscardChanges()

DiscardChanges resets the list of recorded changes to an empty state, effectively discarding any uncommitted changes made to the aggregate.

func (*Base) ModelID added in v0.1.2

func (b *Base) ModelID() uuid.UUID

ModelID returns the UUID of the aggregate that the Base is embedded into.

func (*Base) RecordChange added in v0.1.2

func (b *Base) RecordChange(events ...event.Event)

RecordChange appends the provided events to the Changes slice of the Base aggregate.

func (*Base) Ref added in v0.2.0

func (b *Base) Ref() Ref

Ref returns a Ref object containing the Name and ID of the aggregate.

func (*Base) SetVersion

func (b *Base) SetVersion(v int)

SetVersion manually sets the version of the aggregate.

SetVersion implements snapshot.Aggregate.

type Committer

type Committer interface {
	// RecordChange records the given events and associates them with the aggregate
	// in the Committer. These events will be applied and persisted when Commit is
	// called.
	RecordChange(...event.Event)

	// Commit records the changes made to an aggregate by appending the recorded
	// events to the aggregate's event stream.
	Commit()
}

Committer is an interface that handles recording and committing changes in the form of events to an aggregate. It provides methods for recording changes as a series of events and committing them to update the aggregate's state.

type ConsistencyError

type ConsistencyError struct {
	Kind           ConsistencyKind
	Aggregate      Ref
	CurrentVersion int
	Events         []event.Event
	EventIndex     int
}

ConsistencyError represents an error that occurs when the consistency of an aggregate's events is violated. It provides information about the kind of inconsistency, the aggregate reference, the current version of the aggregate, the list of events, and the index of the event causing the inconsistency.

func (*ConsistencyError) Error

func (err *ConsistencyError) Error() string

Error returns a string representation of the ConsistencyError, describing the inconsistency found in the aggregate event. It includes details such as the event name, expected and actual values for AggregateID, AggregateName, AggregateVersion, or Time depending on the Kind of inconsistency.

func (*ConsistencyError) Event

func (err *ConsistencyError) Event() event.Event

Event returns the event that caused the ConsistencyError at the EventIndex. If the EventIndex is out of bounds, it returns nil.

func (*ConsistencyError) IsConsistencyError added in v0.1.2

func (err *ConsistencyError) IsConsistencyError() bool

IsConsistencyError determines if the given error is a ConsistencyError or an error that implements the IsConsistencyError method. It returns true if either condition is met, otherwise false.

type ConsistencyKind

type ConsistencyKind int

ConsistencyKind represents the kind of inconsistency found in an aggregate's events when validating their consistency. Possible kinds are InconsistentID, InconsistentName, InconsistentVersion, and InconsistentTime.

func (ConsistencyKind) String

func (k ConsistencyKind) String() string

String returns a string representation of the ConsistencyKind value, such as "<InconsistentID>", "<InconsistentName>", "<InconsistentVersion>", or "<InconsistentTime>".

type ConsistencyOption added in v0.2.9

type ConsistencyOption func(*consistencyValidation)

ConsistencyOption is a functional option type used to configure consistency validation for event aggregates. It allows customizing the behavior of the ValidateConsistency function, such as ignoring time consistency checks.

func IgnoreTime added in v0.2.9

func IgnoreTime(ignore bool) ConsistencyOption

IgnoreTime returns a ConsistencyOption that configures whether time consistency checks should be ignored when validating aggregate event consistency. If set to true, the validation will not check if events are ordered by their time.

type History

type History interface {
	// Aggregate returns the Ref of the aggregate to which this History belongs. It
	// also applies the history to the given Aggregate, ensuring that events are
	// applied in a consistent manner.
	Aggregate() Ref

	// Apply applies the history of events to the given Aggregate, ensuring
	// consistency and updating the Aggregate's state accordingly. If the Aggregate
	// implements the Committer interface, changes are recorded and committed.
	Apply(Aggregate)
}

History represents an interface for managing the application and consistency of events on an Aggregate. It provides methods to reference the associated Aggregate and apply changes to it.

type Of added in v0.1.2

type Of[ID comparable] interface {
	// Aggregate returns the ID, name, and version of an [Of] aggregate. It also
	// provides methods for obtaining the aggregate's changes as a slice of
	// [event.Event] and applying an event to the aggregate.
	Aggregate() (ID, string, int)

	// AggregateChanges returns a slice of all uncommitted events that have been
	// recorded for the aggregate.
	AggregateChanges() []event.Event

	// ApplyEvent applies the given event.Event to the aggregate, updating its state
	// according to the event data.
	ApplyEvent(event.Event)
}

Of is an interface that represents an event-sourced aggregate with a comparable ID. It provides methods to access the aggregate's ID, name, version, and uncommitted changes, as well as to apply an event to the aggregate. Implementations of this interface can be used alongside other types such as Committer, SoftDeleter, and SoftRestorer to facilitate event sourcing and soft deletion/restoration of aggregates.

type Option

type Option func(*Base)

Option is a function that configures a Base aggregate. It is used as an argument in the New function to customize the created aggregate.

func Version

func Version(v int) Option

Version is an Option that sets the Version of the Base struct when creating a new aggregate with the New function.

type Query

type Query interface {
	// Names returns a slice of aggregate names to be included in the query.
	Names() []string

	// IDs returns a slice of UUIDs that the Query is constrained to.
	IDs() []uuid.UUID

	// Versions returns the version constraints for a Query, which are used to
	// filter the queried Aggregates based on their version.
	Versions() version.Constraints

	// Sortings returns a slice of SortOptions that represent the sorting options
	// applied to a Query. The sorting options dictate the order in which Aggregates
	// are returned when executing the Query.
	Sortings() []SortOptions
}

Query represents a set of criteria for filtering and sorting Aggregates in a Repository. It defines which Aggregates should be included based on their names, IDs, and versions, as well as the order in which they should be sorted.

type Ref

type Ref = event.AggregateRef

Ref is a reference to a specific aggregate, identified by its name and id.

type Repository

type Repository interface {
	// Save persists the given Aggregate to the Repository. Returns an error if the
	// operation fails.
	Save(ctx context.Context, a Aggregate) error

	// Fetch retrieves the latest state of the specified Aggregate from the
	// Repository and updates its state. It returns an error if the Aggregate cannot
	// be fetched.
	Fetch(ctx context.Context, a Aggregate) error

	// FetchVersion retrieves the specified version of an Aggregate from the
	// Repository and updates the provided Aggregate with the fetched data. It
	// returns an error if the version cannot be fetched or if the provided
	// Aggregate is not compatible with the fetched data.
	FetchVersion(ctx context.Context, a Aggregate, v int) error

	// Query executes a query on the Repository and returns channels for receiving
	// aggregate histories and errors. It takes a context and a Query as arguments.
	Query(ctx context.Context, q Query) (<-chan History, <-chan error, error)

	// Use applies the provided function fn to the given Aggregate a inside a
	// Repository, ensuring proper synchronization and error handling in the given
	// context. The function returns an error if any occurs during the execution of
	// fn or while interacting with the Repository.
	Use(ctx context.Context, a Aggregate, fn func() error) error

	// Delete removes the specified Aggregate from the Repository. It returns an
	// error if the deletion fails.
	Delete(ctx context.Context, a Aggregate) error
}

Repository is an interface for persisting, fetching, querying, and deleting Aggregates. It provides methods for saving and fetching specific versions of an Aggregate, as well as querying multiple Aggregates based on a Query specification.

type SoftDeleter added in v0.1.2

type SoftDeleter interface{ SoftDelete() bool }

SoftDeleter is an API that can be implemented by event data to soft-delete an aggregate. Soft-deleted aggregates are excluded from query results of the aggregate repository. When trying to fetch a soft-deleted aggregate from a repository, a repository.ErrDeleted error is returned. Soft-deleted aggregates can be restored by SoftRestorer events.

To soft-delete an aggregate, an event with event data that implements SoftDeleter must be inserted into the aggregate's event stream. The SoftDelete() method of the event data must return true if the aggregate should be soft-deleted.

type DeletedData struct {}
func (DeletedData) SoftDelete() bool { return true }

type Foo struct { *aggregate.Base }
func (f *Foo) Delete() {
	aggregate.Next(f, "foo.deleted", DeletedData{})
}

type SoftRestorer added in v0.1.2

type SoftRestorer interface{ SoftRestore() bool }

SoftRestorer is an API that can be implemented by event data to restore a soft-deleted aggregate.

To restore a soft-deleted aggregate, an event with event data that implements SoftRestorer must be inserted into the aggregate's event stream. The SoftDelete() method of the event data must return true if the aggregate should be restored.

type RestoredData struct {}
func (RestoredData) SoftRestore() bool { return true }

type Foo struct { *aggregate.Base }
func (f *Foo) Delete() {
	aggregate.Next(f, "foo.restored", RestoredData{})
}

type SortDirection

type SortDirection int

SortDirection is an enumeration representing the direction of sorting (ascending or descending) when comparing Aggregates in a Query. It is used alongside Sorting to define the sorting criteria for querying Aggregates from a Repository.

func (SortDirection) Bool

func (dir SortDirection) Bool(b bool) bool

Bool returns true if the SortDirection is SortAsc and the given boolean is true, or if the SortDirection is SortDesc and the given boolean is false. Otherwise, it returns false.

type SortOptions

type SortOptions struct {
	Sort Sorting
	Dir  SortDirection
}

SortOptions is a struct that defines the sorting configuration for a query by specifying the sort field (Sorting) and sort direction (SortDirection). It is used in Query to determine the order of returned Aggregates.

type Sorting

type Sorting int

Sorting is a type that represents the sorting criteria for Aggregates in a Query. It provides methods to compare Aggregates based on their name, ID, or version. Use the constants SortName, SortID, and SortVersion to specify the desired sorting criteria.

func (Sorting) Compare

func (s Sorting) Compare(a, b Aggregate) (cmp int8)

Compare compares two Aggregates based on the specified Sorting criteria. It returns a negative value if the first Aggregate comes before the second, a positive value if it comes after, and 0 if they are equal according to the chosen Sorting.

type TypedAggregate added in v0.1.2

type TypedAggregate interface {
	model.Model[uuid.UUID]
	Aggregate
}

TypedAggregate is an interface that composes the model.Model[uuid.UUID] and Aggregate interfaces, providing functionality for creating, fetching, and manipulating Aggregates with UUIDs as their identifier.

type TypedRepository added in v0.1.2

type TypedRepository[A TypedAggregate] interface {
	model.Repository[A, uuid.UUID]

	// FetchVersion retrieves an Aggregate with the specified UUID and version from
	// the TypedRepository. It returns the fetched Aggregate and an error if any
	// occurs during the fetch operation.
	FetchVersion(ctx context.Context, id uuid.UUID, version int) (A, error)

	// Query returns a channel of [TypedAggregate]s that match the specified query,
	// a channel for errors during the query execution, and an error if the query
	// can't be started. The returned channels must be fully consumed to avoid
	// goroutine leaks.
	Query(ctx context.Context, q Query) (<-chan A, <-chan error, error)
}

TypedRepository is a specialized Repository for managing TypedAggregate instances, providing additional methods for fetching specific versions of an aggregate and querying with type safety. It extends the functionality of model.Repository, allowing for more fine-grained control over aggregate persistence and retrieval.

Directories

Path Synopsis
internal
mocks
Package mock_snapshot is a generated GoMock package.
Package mock_snapshot is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL