aggregate

package
v0.2.8
Warning

This package is not in the latest version of its module.

Published: Oct 4, 2022 License: Apache-2.0 Imports: 12 Imported by: 9

README

Aggregates

Package aggregate provides a framework for building event-sourced aggregates. It builds on top of the event system, so make sure to read the event documentation first.

Introduction

An aggregate is any type that implements the Aggregate interface:

package aggregate

type Aggregate interface {
	// Aggregate returns the id, name and version of the aggregate.
	Aggregate() (uuid.UUID, string, int)

	// AggregateChanges returns the uncommitted events of the aggregate.
	AggregateChanges() []event.Event

	// ApplyEvent applies an event to the aggregate.
	ApplyEvent(event.Event)
}

You can either implement this interface yourself or embed the *Base type. Use the New function to initialize *Base:

package example

// Foo is the "foo" aggregate.
type Foo struct {
	*aggregate.Base
}

// NewFoo returns the "foo" aggregate with the given id.
func NewFoo(id uuid.UUID) *Foo {
	return &Foo{
		Base: aggregate.New("foo", id),
	}
}

Additional APIs

Aggregates can make use of additional, optional APIs provided by goes. An aggregate that embeds *Base implements all of these APIs automatically:

  • aggregate.Committer
  • repository.ChangeDiscarder
  • snapshot.Aggregate

Read the documentation for each of these interfaces for more details.

Aggregate events

An event-sourced aggregate transitions its state by applying events on itself. Events are applied by the ApplyEvent(event.Event) method of the aggregate. Here is a minimal "todo list" example:

package todo

type List struct {
	*aggregate.Base

	Tasks []string
}

// NewList returns the todo list with the given id.
func NewList(id uuid.UUID) *List {
	return &List{Base: aggregate.New("list", id)}
}

func (l *List) ApplyEvent(evt event.Event) {
	switch evt.Name() {
	case "task_added":
		l.Tasks = append(l.Tasks, evt.Data().(string))
	case "task_removed":
		name := evt.Data().(string)
		for i, task := range l.Tasks {
			if task == name {
				l.Tasks = append(l.Tasks[:i], l.Tasks[i+1:]...)
				return
			}
		}
	}
}

The todo list now knows how to apply "task_added" and "task_removed" events. What's missing are the commands to actually create the events and call the ApplyEvent method with the created event:

// ... previous code ...

// AddTask adds the given task to the list.
func (l *List) AddTask(task string) error {
	if l.Contains(task) {
		return fmt.Errorf("list already contains %q", task)
	}

	// aggregate.Next() creates the event and applies it using l.ApplyEvent()
	aggregate.Next(l, "task_added", task)

	return nil
}

// RemoveTask removes the given task from the list.
func (l *List) RemoveTask(task string) error {
	if !l.Contains(task) {
		return fmt.Errorf("list does not contain %q", task)
	}

	aggregate.Next(l, "task_removed", task)

	return nil
}

// Contains returns whether the list contains the given task.
func (l *List) Contains(task string) bool {
	task = strings.ToLower(task)
	for _, t := range l.Tasks {
		if strings.ToLower(t) == task {
			return true
		}
	}
	return false
}

That's it. Now you can create todo lists, add tasks, and remove them again:

// ... previous code ...

func example() {
	list := NewList(uuid.New())

	if err := list.AddTask("do this and that"); err != nil {
		panic(fmt.Errorf("add task: %w", err))
	}

	if err := list.RemoveTask("do this and that"); err != nil {
		panic(fmt.Errorf("remove task: %w", err))
	}

	// list.CurrentVersion() == 2 (the two changes are not committed yet)
	// list.AggregateChanges() returns two events – one "task_added" event
	// and one "task_removed" event.
}
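
The create, apply, and record cycle that the commands above rely on can be sketched without goes. This is only an illustration of the idea: the Event and List types and the next helper are hypothetical stand-ins, and the version arithmetic is an assumption, not the actual implementation of aggregate.Next().

```go
package main

import "fmt"

// Event is a hypothetical stand-in for event.Event.
type Event struct {
	Name    string
	Data    string
	Version int
}

// List is a simplified todo list that records its own changes.
type List struct {
	Version int     // version of the last committed event
	Changes []Event // uncommitted events
	Tasks   []string
}

// ApplyEvent is the only place where the state is modified.
func (l *List) ApplyEvent(evt Event) {
	switch evt.Name {
	case "task_added":
		l.Tasks = append(l.Tasks, evt.Data)
	case "task_removed":
		for i, task := range l.Tasks {
			if task == evt.Data {
				l.Tasks = append(l.Tasks[:i], l.Tasks[i+1:]...)
				return
			}
		}
	}
}

// next mimics the idea behind aggregate.Next(): create the event with the
// next version, apply it, and record it as an uncommitted change.
func next(l *List, name, data string) Event {
	evt := Event{Name: name, Data: data, Version: l.Version + len(l.Changes) + 1}
	l.ApplyEvent(evt)
	l.Changes = append(l.Changes, evt)
	return evt
}

func main() {
	l := &List{}
	next(l, "task_added", "do this and that")
	next(l, "task_removed", "do this and that")

	// No tasks are left, but both events were recorded.
	fmt.Println(len(l.Tasks), len(l.Changes), l.Changes[1].Version)
}
```

Because every state change goes through ApplyEvent, replaying the recorded events on a fresh List leads to the same state.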

Generic helpers

Applying events within the ApplyEvent function is the most straightforward way to implement an aggregate but can become quite messy if an aggregate consists of many events.

goes provides type-safe, generic helpers that allow you to set up an event applier function for each individual event. This is what the todo list example looks like using generics:

package todo

import (
	"github.com/google/uuid"
	"github.com/modernice/goes/aggregate"
	"github.com/modernice/goes/event"
)

type List struct {
	*aggregate.Base

	Tasks []string
}

func NewList(id uuid.UUID) *List {
	l := &List{Base: aggregate.New("list", id)}

	event.ApplyWith(l, l.addTask, "task_added")
	event.ApplyWith(l, l.removeTask, "task_removed")

	return l
}

func (l *List) AddTask(task string) error { ... }
func (l *List) RemoveTask(task string) error { ... }

func (l *List) addTask(evt event.Of[string]) {
	l.Tasks = append(l.Tasks, evt.Data())
}

func (l *List) removeTask(evt event.Of[string]) {
	name := evt.Data()
	for i, task := range l.Tasks {
		if task == name {
			l.Tasks = append(l.Tasks[:i], l.Tasks[i+1:]...)
			return
		}
	}
}
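
To give a feeling for how per-event appliers can stay type-safe, here is a minimal sketch of a registry that wraps typed functions behind an untyped dispatch. It is not how goes implements event.ApplyWith; Event, TypedEvent, Registry, and applyWith are hypothetical names used only for this illustration.

```go
package main

import "fmt"

// Event is a hypothetical, untyped event.
type Event struct {
	Name string
	Data any
}

// TypedEvent is a hypothetical typed view of an Event.
type TypedEvent[D any] struct {
	Name string
	Data D
}

// Registry maps event names to applier functions.
type Registry struct {
	appliers map[string]func(Event)
}

// applyWith registers a typed applier for the given event names. The wrapper
// performs the type assertion, so the applier itself only sees typed data.
func applyWith[D any](r *Registry, apply func(TypedEvent[D]), names ...string) {
	if r.appliers == nil {
		r.appliers = make(map[string]func(Event))
	}
	for _, name := range names {
		r.appliers[name] = func(evt Event) {
			data, ok := evt.Data.(D)
			if !ok {
				return // ignore events that carry unexpected data
			}
			apply(TypedEvent[D]{Name: evt.Name, Data: data})
		}
	}
}

// ApplyEvent dispatches the event to the registered applier, if there is one.
func (r *Registry) ApplyEvent(evt Event) {
	if apply, ok := r.appliers[evt.Name]; ok {
		apply(evt)
	}
}

// List embeds the registry, similar to embedding *aggregate.Base.
type List struct {
	Registry
	Tasks []string
}

func NewList() *List {
	l := &List{}
	applyWith(&l.Registry, l.addTask, "task_added")
	return l
}

func (l *List) addTask(evt TypedEvent[string]) {
	l.Tasks = append(l.Tasks, evt.Data)
}

func main() {
	l := NewList()
	l.ApplyEvent(Event{Name: "task_added", Data: "foo"})
	l.ApplyEvent(Event{Name: "unknown", Data: 42}) // no applier, ignored
	fmt.Println(l.Tasks)
}
```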

Testing

TL;DR

Use the test.Change() and test.NoChange() testing helpers to ensure correct implementation of aggregate methods.

package todo_test

import (
	"github.com/modernice/goes/test"
)

func TestNewList(t *testing.T) {
	// Test that todo.NewList() returns a valid aggregate.
	test.NewAggregate(t, todo.NewList, "list")
}

func TestXXX(t *testing.T) {
	// Aggregate should have applied and recorded the given event.
	test.Change(t, foo, "<event-name>")

	// Aggregate should have applied and recorded the given event with
	// the given event data.
	test.Change(t, foo, "<event-name>", test.EventData(<event-data>))

	// Aggregate should have applied and recorded the given event with
	// the given event data exactly 3 times.
	test.Change(
		t, foo, "<event-name>",
		test.EventData(<event-data>),
		test.Exactly(3),
	)

	// Aggregate should NOT have applied and recorded the given event.
	test.NoChange(t, foo, "<event-name>")

	// Aggregate should NOT have applied and recorded the given event with
	// the given event data.
	test.NoChange(t, foo, "<event-name>", test.EventData(<event-data>))
}

Testing of aggregates can become error-prone if one forgets to consider that aggregates are event-sourced. Take a look at this example:

package todo_test

func TestList_AddTask(t *testing.T) {
	l := todo.NewList(uuid.New())

	if l.Contains("foo") {
		t.Fatalf("list should not contain %q until added", "foo")
	}

	if err := l.AddTask("foo"); err != nil {
		t.Fatalf("failed to add task %q", "foo")
	}

	if !l.Contains("foo") {
		t.Fatalf("list should contain %q after adding", "foo")
	}
}

Even if the above test succeeds, it does not guarantee that the aggregate was implemented correctly. The following AddTask implementation bypasses the indirection through the ApplyEvent method and updates the state directly, resulting in a passing test even though the aggregate would behave incorrectly when used in goes' components.

package todo

func (l *List) AddTask(task string) error {
	// Wrong: the state is modified directly; no event is created or recorded.
	l.Tasks = append(l.Tasks, task)
	return nil
}

To circumvent this issue, goes provides helpers to test aggregate changes. The above test would be rewritten as:

package todo_test

import "github.com/modernice/goes/test"

func TestList_AddTask(t *testing.T) {
	l := todo.NewList(uuid.New())

	if l.Contains("foo") {
		t.Fatalf("list should not contain %q until added", "foo")
	}

	if err := l.AddTask("foo"); err != nil {
		t.Fatalf("failed to add task %q", "foo")
	}

	if !l.Contains("foo") {
		t.Fatalf("list should contain %q after adding", "foo")
	}

	test.Change(t, l, "task_added", test.EventData("foo"))
}

The test.Change() helper checks if the aggregate has recorded a "task_added" change with "foo" as the event data.
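
The pitfall described in this section can be reproduced with a few lines of plain Go. The sketch below does not use goes; the List type and the rebuild helper are hypothetical and only show why a state change that never produced an event is lost once the aggregate is reconstructed from its events.

```go
package main

import "fmt"

// Event is a hypothetical stand-in for a recorded change.
type Event struct {
	Name string
	Data string
}

type List struct {
	Tasks   []string
	Changes []Event
}

func (l *List) ApplyEvent(evt Event) {
	if evt.Name == "task_added" {
		l.Tasks = append(l.Tasks, evt.Data)
	}
}

// AddTask goes through an event, so the change is recorded.
func (l *List) AddTask(task string) {
	evt := Event{Name: "task_added", Data: task}
	l.ApplyEvent(evt)
	l.Changes = append(l.Changes, evt)
}

// AddTaskDirectly mutates the state without an event (the bug).
func (l *List) AddTaskDirectly(task string) {
	l.Tasks = append(l.Tasks, task)
}

// rebuild reconstructs a list from recorded events only, which is roughly
// what happens when an aggregate is loaded from an event store.
func rebuild(events []Event) *List {
	l := &List{}
	for _, evt := range events {
		l.ApplyEvent(evt)
	}
	return l
}

func main() {
	good := &List{}
	good.AddTask("foo")

	bad := &List{}
	bad.AddTaskDirectly("foo")

	// Both lists contain "foo" in memory, but only one survives a rebuild.
	fmt.Println(len(rebuild(good.Changes).Tasks)) // 1
	fmt.Println(len(rebuild(bad.Changes).Tasks))  // 0
}
```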

Persistence

The Repository type defines an aggregate repository that allows you to save and fetch aggregates to and from an underlying event store:

package aggregate

type Repository interface {
	Save(ctx context.Context, a Aggregate) error
	Fetch(ctx context.Context, a Aggregate) error
	FetchVersion(ctx context.Context, a Aggregate, v int) error
	Query(ctx context.Context, q Query) (<-chan History, <-chan error, error)
	Use(ctx context.Context, a Aggregate, fn func() error) error
	Delete(ctx context.Context, a Aggregate) error
}

The implementation of this repository can be found in the repository package. Use repository.New to create a repository from an event store:

package example

import (
	"github.com/modernice/goes/aggregate/repository"
	"github.com/modernice/goes/event"
)

func example(store event.Store) {
	repo := repository.New(store)
}

Save an aggregate

package example

func example(repo aggregate.Repository) {
	l := todo.NewList(uuid.New())
	l.AddTask("foo")
	l.AddTask("bar")
	l.AddTask("baz")

	if err := repo.Save(context.TODO(), l); err != nil {
		panic(fmt.Errorf("save todo list: %w", err))
	}
}

Fetch an aggregate

In order to fetch an aggregate, it must be passed to Repository.Fetch(). The repository fetches and applies the event stream of the aggregate to reconstruct its current state.

An aggregate does not need to have an event stream to be fetched; if an aggregate has no events, Repository.Fetch() is a no-op.

Fetching an aggregate multiple times is also not a problem because the repository will only fetch and apply events that haven't been applied yet. This also means that Repository.Fetch() can be used to "refresh" an aggregate – to get to its most current state without fetching unnecessary events.

package example

func example(repo aggregate.Repository) {
	l := todo.NewList(uuid.New())

	if err := repo.Fetch(context.TODO(), l); err != nil {
		panic(fmt.Errorf(
			"fetch todo list: %w [id=%s]", err, l.AggregateID(),
		))
	}
}
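
The "refresh" behaviour described above, where only events that have not been applied yet are fetched, can be pictured with a small in-memory model. This is a sketch under simplifying assumptions: Store, List, and fetch are hypothetical and stand in for the event store, the aggregate, and Repository.Fetch().

```go
package main

import "fmt"

type Event struct {
	Version int
	Task    string
}

// Store is a hypothetical in-memory event store for a single aggregate.
type Store struct{ events []Event }

// After returns the events with a version greater than v.
func (s *Store) After(v int) []Event {
	var out []Event
	for _, evt := range s.events {
		if evt.Version > v {
			out = append(out, evt)
		}
	}
	return out
}

type List struct {
	Version int
	Tasks   []string
}

// fetch applies only those events that the list has not seen yet and
// returns how many events were applied.
func fetch(s *Store, l *List) int {
	events := s.After(l.Version)
	for _, evt := range events {
		l.Tasks = append(l.Tasks, evt.Task)
		l.Version = evt.Version
	}
	return len(events)
}

func main() {
	store := &Store{events: []Event{{1, "foo"}, {2, "bar"}}}
	l := &List{}

	fmt.Println(fetch(store, l)) // 2: the full stream is applied
	fmt.Println(fetch(store, l)) // 0: nothing new, fetching again is a no-op

	store.events = append(store.events, Event{3, "baz"})
	fmt.Println(fetch(store, l)) // 1: only the new event is applied
	fmt.Println(l.Version, l.Tasks)
}
```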

You can also fetch a specific version of an aggregate, ignoring all events with a version higher than the provided version:

package example

func example(repo aggregate.Repository) {
	l := todo.NewList(uuid.New())

	if err := repo.FetchVersion(context.TODO(), l, 5); err != nil {
		panic(fmt.Errorf(
			"fetch todo list at version %d: %w [id=%s]",
			5, err, l.AggregateID(),
		))
	}
}

"Use" an aggregate

Repository.Use() is a convenience method to fetch an aggregate, "use" it, and then insert new changes into the event store:

package example

func example(repo aggregate.Repository) {
	l := todo.NewList(uuid.New())

	if err := repo.Use(context.TODO(), l, func() error {
		return l.AddTask("foo")
	}); err != nil {
		panic(err)
	}
}

Delete an aggregate

Hard-deleting aggregates should be avoided because that can lead to esoteric issues that are hard to debug. Consider using soft-deletes instead.

To delete an aggregate, the repository deletes its event stream from the event store:

package example

func example(repo aggregate.Repository) {
	l := todo.NewList(uuid.New())

	if err := repo.Delete(context.TODO(), l); err != nil {
		panic(fmt.Errorf(
			"delete todo list: %w [id=%s]", err, l.AggregateID(),
		))
	}
}

Soft-delete an aggregate

Soft-deleted aggregates cannot be fetched and are excluded from query results of aggregate repositories. In order to soft-delete an aggregate, a specific event that flags the aggregate as soft-deleted must be inserted into the event store. The event must have event data that implements the SoftDeleter interface:

package example

type DeletedData struct {}

func (DeletedData) SoftDelete() bool { return true }

func example() {
	evt := event.New("deleted", DeletedData{}, event.Aggregate(...))
}

If the event stream of an aggregate contains such an event, the aggregate is considered to be soft-deleted and will be excluded from query results of the aggregate repository. Additionally, the Repository.Fetch() method will return repository.ErrDeleted for the aggregate.

Soft-deleted aggregates can also be restored by inserting an event with event data that implements SoftRestorer:

package example

type RestoredData struct {}

func (RestoredData) SoftRestore() bool { return true }

func example() {
	evt := event.New("restored", RestoredData{}, event.Aggregate(...))
}
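
How a stream of events can be scanned for soft-delete and restore markers is sketched below. The two interfaces mirror the ones described in this section, but the softDeleted helper is hypothetical, and the assumption that the latest marker wins is made for this illustration; it is not taken from the repository implementation.

```go
package main

import "fmt"

// SoftDeleter and SoftRestorer mirror the interfaces described above.
type SoftDeleter interface{ SoftDelete() bool }
type SoftRestorer interface{ SoftRestore() bool }

type DeletedData struct{}

func (DeletedData) SoftDelete() bool { return true }

type RestoredData struct{}

func (RestoredData) SoftRestore() bool { return true }

// softDeleted walks the event data of a stream in order and reports whether
// the aggregate ends up soft-deleted. Assumption: the latest marker wins.
func softDeleted(stream []any) bool {
	deleted := false
	for _, data := range stream {
		if d, ok := data.(SoftDeleter); ok && d.SoftDelete() {
			deleted = true
		}
		if r, ok := data.(SoftRestorer); ok && r.SoftRestore() {
			deleted = false
		}
	}
	return deleted
}

func main() {
	fmt.Println(softDeleted([]any{"created", DeletedData{}}))                 // true
	fmt.Println(softDeleted([]any{"created", DeletedData{}, RestoredData{}})) // false
}
```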

Query aggregates

Aggregates can be queried from the event store. When queried, the repository returns a channel of History values and an error channel. A History can be applied to an aggregate to reconstruct its current state.

package example

import (
	"context"
	"log"

	"github.com/google/uuid"
	"github.com/modernice/goes/aggregate"
	"github.com/modernice/goes/aggregate/query"
	"github.com/modernice/goes/helper/streams"
)

func example(repo aggregate.Repository) {
	res, errs, err := repo.Query(context.TODO(), query.New(
		// Query "foo", "bar", and "baz" aggregates.
		query.Name("foo", "bar", "baz"),

		// Query aggregates that have one of the provided ids.
		query.ID(uuid.UUID{...}, uuid.UUID{...}),
	))
	if err != nil {
		panic(err)
	}

	if err := streams.Walk(
		context.TODO(),
		func(his aggregate.History) error {
			// History exposes the aggregate it belongs to as a Ref.
			ref := his.Aggregate()
			log.Printf("Name: %s ID: %s", ref.Name, ref.ID)

			var foo aggregate.Aggregate // fetch the aggregate
			his.Apply(foo) // apply the history

			return nil
		},
		res,
		errs,
	); err != nil {
		panic(err)
	}
}

Typed repositories

The Repository interface defines a generic aggregate repository for all kinds of aggregates. The TypedRepository can be used to define a type-safe repository for a specific aggregate. The TypedRepository removes the need for passing the aggregate instance to repository methods.

To create a type-safe repository for an aggregate, pass the Repository and the constructor of the aggregate to repository.Typed():

package todo

import (
	"github.com/modernice/goes/aggregate"
	"github.com/modernice/goes/aggregate/repository"
	"github.com/modernice/goes/event"
)

// List is the "todo list" aggregate.
type List struct { *aggregate.Base }

// ListRepository is the "todo list" repository.
type ListRepository = aggregate.TypedRepository[*List]

// NewList returns the "todo list" with the given id.
func NewList(id uuid.UUID) *List {
	return &List{Base: aggregate.New("list", id)}
}

// NewListRepository returns the "todo list" repository.
func NewListRepository(repo aggregate.Repository) ListRepository {
	return repository.Typed(repo, NewList)
}

func example(store event.Store) {
	repo := repository.New(store)
	lists := NewListRepository(repo)

	// Fetch a todo list by id.
	l, err := lists.Fetch(context.TODO(), uuid.New())
	if err != nil {
		panic(fmt.Errorf("fetch list: %w", err))
	}
	// l is a *List

	// "Use" a list by id.
	if err := lists.Use(context.TODO(), uuid.New(), func(l *List) error {
		return l.AddTask("foo")
	}); err != nil {
		panic(fmt.Errorf("use list: %w", err))
	}

	// The TypedRepository will only ever return *List aggregates.
	// All other aggregates that would be returned by the passed query,
	// are simply discarded from the result.
	res, errs, err := lists.Query(context.TODO(), query.New(...))
}
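
The idea behind a typed repository, wrapping an untyped repository together with a constructor so that callers only pass ids, can be sketched with plain Go generics. All names below (Aggregate, Repository, Typed, memRepo) are hypothetical simplifications and do not reflect the actual goes interfaces.

```go
package main

import "fmt"

// Aggregate is a minimal, hypothetical stand-in for an aggregate.
type Aggregate interface {
	ID() string
	Apply(task string)
}

// Repository is an untyped repository that fills in a passed aggregate.
type Repository interface {
	Fetch(a Aggregate) error
}

// Typed wraps a Repository and a constructor, so callers work with ids only.
type Typed[A Aggregate] struct {
	repo  Repository
	newFn func(id string) A
}

// Fetch creates the aggregate with the constructor and lets the wrapped
// repository rebuild its state.
func (t Typed[A]) Fetch(id string) (A, error) {
	a := t.newFn(id)
	if err := t.repo.Fetch(a); err != nil {
		var zero A
		return zero, err
	}
	return a, nil
}

// memRepo is an in-memory repository that stores tasks per aggregate id.
type memRepo map[string][]string

func (r memRepo) Fetch(a Aggregate) error {
	for _, task := range r[a.ID()] {
		a.Apply(task)
	}
	return nil
}

type List struct {
	id    string
	Tasks []string
}

func NewList(id string) *List      { return &List{id: id} }
func (l *List) ID() string         { return l.id }
func (l *List) Apply(task string)  { l.Tasks = append(l.Tasks, task) }

func main() {
	lists := Typed[*List]{repo: memRepo{"a": {"foo", "bar"}}, newFn: NewList}

	l, err := lists.Fetch("a")
	if err != nil {
		panic(err)
	}
	fmt.Println(l.Tasks) // l is a *List, no type assertion needed
}
```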

Documentation

Constants

const (
	// InconsistentID means there is an inconsistency in the aggregate ids.
	InconsistentID = ConsistencyKind(iota + 1)

	// InconsistentName means there is an inconsistency in the aggregate names.
	InconsistentName

	// InconsistentVersion means there is an inconsistency in the event versions.
	InconsistentVersion

	// InconsistentTime means there is an inconsistency in the event times.
	InconsistentTime
)
const (
	// SortName sorts aggregates by name.
	SortName = Sorting(iota)
	// SortID sorts aggregates by id.
	SortID
	// SortVersion sorts aggregates by version.
	SortVersion

	// SortAsc sorts aggregates in ascending order.
	SortAsc = SortDirection(iota)
	// SortDesc sorts aggregates in descending order.
	SortDesc
)

Variables

This section is empty.

Functions

func ApplyHistory

func ApplyHistory[Events ~[]event.Of[any]](a Aggregate, events Events) error

ApplyHistory applies an event stream to an aggregate to reconstruct its state. If the aggregate implements Committer, a.RecordChange(events) and a.Commit() are called before returning.

func IsConsistencyError added in v0.1.2

func IsConsistencyError(err error) bool

IsConsistencyError returns whether an error was caused by an inconsistency in the events of an aggregate. An error is considered a consistency error if it either unwraps to a *ConsistencyError or if it has an IsConsistencyError() bool method that returns true for the given error.
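
The two rules in this description can be expressed with errors.As from the standard library. The sketch below is a hedged illustration, not the package's code: ConsistencyError is reduced to a single field, and customError, wrap, and isConsistencyError are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// ConsistencyError is a stripped-down stand-in for aggregate.ConsistencyError.
type ConsistencyError struct{ EventIndex int }

func (err *ConsistencyError) Error() string {
	return fmt.Sprintf("inconsistent event at index %d", err.EventIndex)
}

// customError is a hypothetical error type that opts in through a method.
type customError struct{}

func (customError) Error() string            { return "custom inconsistency" }
func (customError) IsConsistencyError() bool { return true }

// wrap adds context to an error, as a repository might do.
func wrap(err error) error { return fmt.Errorf("save list: %w", err) }

// isConsistencyError follows the two rules from the description: the error
// unwraps to a *ConsistencyError, or it provides IsConsistencyError() bool.
func isConsistencyError(err error) bool {
	var cerr *ConsistencyError
	if errors.As(err, &cerr) {
		return true
	}
	var marker interface{ IsConsistencyError() bool }
	if errors.As(err, &marker) {
		return marker.IsConsistencyError()
	}
	return false
}

func main() {
	fmt.Println(isConsistencyError(wrap(&ConsistencyError{EventIndex: 2}))) // true
	fmt.Println(isConsistencyError(wrap(customError{})))                    // true
	fmt.Println(isConsistencyError(errors.New("boom")))                     // false
}
```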

func Next added in v0.1.2

func Next[Data any](a Aggregate, name string, data Data, opts ...event.Option) event.Evt[Data]

Next creates, applies and returns the next event for the given aggregate.

var foo aggregate.Aggregate
evt := aggregate.Next(foo, "name", <data>, ...)

func NextEvent deprecated

func NextEvent[D any](a Aggregate, name string, data D, opts ...event.Option) event.Evt[D]

Deprecated: Use Next instead.

func NextVersion

func NextVersion(a Aggregate) int

NextVersion returns the version that the next event of the aggregate must have.

func UncommittedVersion

func UncommittedVersion(a Aggregate) int

UncommittedVersion returns the version of the aggregate after committing the recorded changes.

func ValidateConsistency

func ValidateConsistency[Data any, Events ~[]event.Of[Data]](ref Ref, currentVersion int, events Events) error

ValidateConsistency tests the consistency of aggregate changes (events).

The provided events are valid if they are correctly sorted by both version and time. No two events may have the same version or time, and their versions must be greater than 0.
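
The ordering rules stated here can be checked in a single pass over the events. The following sketch covers only those rules (positive, strictly increasing versions and strictly increasing times); it leaves out the id, name, and current-version checks that the consistency kinds suggest, and Event, at, and validate are hypothetical names.

```go
package main

import (
	"fmt"
	"time"
)

type Event struct {
	Version int
	Time    time.Time
}

// at builds an event at a fixed base time plus offset (helper for the demo).
func at(version int, offset time.Duration) Event {
	base := time.Date(2022, 10, 4, 0, 0, 0, 0, time.UTC)
	return Event{Version: version, Time: base.Add(offset)}
}

// validate returns the index of the first event that breaks the ordering
// rules, or -1 if the events are consistent.
func validate(events []Event) int {
	for i, evt := range events {
		if evt.Version <= 0 {
			return i
		}
		if i == 0 {
			continue
		}
		prev := events[i-1]
		if evt.Version <= prev.Version || !evt.Time.After(prev.Time) {
			return i
		}
	}
	return -1
}

func main() {
	ok := []Event{at(1, 0), at(2, time.Second)}
	dupVersion := []Event{at(1, 0), at(1, time.Second)}
	sameTime := []Event{at(1, 0), at(2, 0)}

	fmt.Println(validate(ok), validate(dupVersion), validate(sameTime))
}
```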

Types

type Aggregate

type Aggregate = Of[uuid.UUID]

Aggregate is an event-sourced aggregate.

func Sort

func Sort(as []Aggregate, s Sorting, dir SortDirection) []Aggregate

Sort sorts aggregates and returns the sorted aggregates.

func SortMulti

func SortMulti(as []Aggregate, sorts ...SortOptions) []Aggregate

SortMulti sorts aggregates by multiple fields and returns the sorted aggregates.

type Base

type Base struct {
	ID      uuid.UUID
	Name    string
	Version int
	Changes []event.Event
	// contains filtered or unexported fields
}

Base can be embedded into aggregates to implement the following goes APIs:

  • aggregate.Aggregate
  • aggregate.Committer
  • repository.ChangeDiscarder
  • snapshot.Aggregate

func New

func New(name string, id uuid.UUID, opts ...Option) *Base

New returns a new base aggregate.

func (*Base) Aggregate

func (b *Base) Aggregate() (uuid.UUID, string, int)

Aggregate returns the id, name, and version of the aggregate.

func (*Base) AggregateChanges

func (b *Base) AggregateChanges() []event.Event

AggregateChanges returns the recorded changes.

func (*Base) AggregateID

func (b *Base) AggregateID() uuid.UUID

AggregateID returns the aggregate id.

func (*Base) AggregateName

func (b *Base) AggregateName() string

AggregateName returns the aggregate name.

func (*Base) AggregateVersion

func (b *Base) AggregateVersion() int

AggregateVersion returns the aggregate version.

func (*Base) ApplyEvent

func (b *Base) ApplyEvent(evt event.Event)

ApplyEvent calls the registered event appliers for the given event.

func (*Base) Commit

func (b *Base) Commit()

Commit clears the recorded changes and sets the aggregate version to the version of the last recorded change. The recorded changes must be sorted by event version.

func (*Base) CurrentVersion added in v0.2.0

func (b *Base) CurrentVersion() int

CurrentVersion returns the version of the aggregate with respect to the uncommitted changes/events.
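
The relationship between recorded changes, Commit, and CurrentVersion can be pictured with a reduced model. This is a sketch only: the base type below is hypothetical, and computing the current version as the committed version plus the number of recorded changes is an assumption made for illustration.

```go
package main

import "fmt"

type Event struct{ Version int }

// base is a hypothetical, reduced version of the bookkeeping described for *Base.
type base struct {
	Version int     // committed version
	Changes []Event // recorded, uncommitted changes
}

// RecordChange appends the events to the recorded changes.
func (b *base) RecordChange(events ...Event) {
	b.Changes = append(b.Changes, events...)
}

// CurrentVersion takes the uncommitted changes into account (assumption:
// one version step per recorded change).
func (b *base) CurrentVersion() int {
	return b.Version + len(b.Changes)
}

// Commit clears the recorded changes and adopts the version of the last one.
func (b *base) Commit() {
	if len(b.Changes) == 0 {
		return
	}
	b.Version = b.Changes[len(b.Changes)-1].Version
	b.Changes = nil
}

func main() {
	b := &base{}
	b.RecordChange(Event{1}, Event{2})
	fmt.Println(b.Version, b.CurrentVersion()) // 0 2

	b.Commit()
	fmt.Println(b.Version, b.CurrentVersion(), len(b.Changes)) // 2 2 0
}
```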

func (*Base) DiscardChanges added in v0.1.2

func (b *Base) DiscardChanges()

DiscardChanges discards the recorded changes. The aggregate repository calls this method when retrying a failed Repository.Use() call. Note that this method does not discard any state changes that were applied to the aggregate; it only discards recorded changes.

func (*Base) ModelID added in v0.1.2

func (b *Base) ModelID() uuid.UUID

ModelID implements goes/persistence/model.Model. This allows *Base to be used as a TypedAggregate for the type parameter of a TypedRepository.

func (*Base) RecordChange added in v0.1.2

func (b *Base) RecordChange(events ...event.Event)

RecordChange records applied changes to the aggregate.

func (*Base) Ref added in v0.2.0

func (b *Base) Ref() Ref

Ref returns a Ref to the given aggregate.

func (*Base) SetVersion

func (b *Base) SetVersion(v int)

SetVersion manually sets the version of the aggregate.

SetVersion implements snapshot.Aggregate.

type Committer

type Committer interface {
	// RecordChange records events that were applied to the aggregate.
	RecordChange(...event.Event)

	// Commit clears the recorded changes and updates the current version of the
	// aggregate to the last recorded event.
	Commit()
}

A Committer is an aggregate that records and commits its changes. The ApplyHistory() function calls RecordChange() and Commit() to reconstruct the state of an aggregate, and the aggregate repository calls Commit() after saving the aggregate changes to the event store.

type ConsistencyError

type ConsistencyError struct {
	// Kind is the kind of inconsistency.
	Kind ConsistencyKind
	// Aggregate is the handled aggregate.
	Aggregate Ref
	// CurrentVersion is the current version of the aggregate (without the tested changes).
	CurrentVersion int
	// Events are the tested events.
	Events []event.Event
	// EventIndex is the index of the event that caused the Error.
	EventIndex int
}

ConsistencyError is a consistency error.

func (*ConsistencyError) Error

func (err *ConsistencyError) Error() string

func (*ConsistencyError) Event

func (err *ConsistencyError) Event() event.Event

Event returns the first event that caused an inconsistency.

func (*ConsistencyError) IsConsistencyError added in v0.1.2

func (err *ConsistencyError) IsConsistencyError() bool

IsConsistencyError provides the IsConsistencyError() bool method that the package-level IsConsistencyError function checks for.

type ConsistencyKind

type ConsistencyKind int

ConsistencyKind is the kind of inconsistency.

func (ConsistencyKind) String

func (k ConsistencyKind) String() string

type History

type History interface {
	// Aggregate returns the reference to the aggregate of this history.
	Aggregate() Ref

	// Apply applies the history to the aggregate to rebuild its current state.
	Apply(Aggregate)
}

A History provides the event history of an aggregate. A History can be applied to an aggregate to rebuild its current state.

type Of added in v0.1.2

type Of[ID comparable] interface {
	// Aggregate returns the id, name and version of the aggregate.
	Aggregate() (ID, string, int)

	// AggregateChanges returns the uncommitted events of the aggregate.
	AggregateChanges() []event.Event

	// ApplyEvent applies an event to the aggregate.
	ApplyEvent(event.Event)
}

Of is an event-sourced aggregate.

type Option

type Option func(*Base)

Option is an option for creating an aggregate.

func Version

func Version(v int) Option

Version returns an Option that sets the version of an aggregate.

type Query

type Query interface {
	// Names returns the aggregate names to query for.
	Names() []string

	// IDs returns the aggregate UUIDs to query for.
	IDs() []uuid.UUID

	// Versions returns the version constraints for the query.
	Versions() version.Constraints

	// Sortings returns the SortOptions for the query.
	Sortings() []SortOptions
}

Query is used by repositories to filter aggregates from the event store.

type Ref

type Ref = event.AggregateRef

Ref is a reference to a specific aggregate, identified by its name and id.

type Repository

type Repository interface {
	// Save inserts the changes of the aggregate into the event store.
	Save(ctx context.Context, a Aggregate) error

	// Fetch fetches the events for the given aggregate from the event store,
	// beginning from version a.AggregateVersion()+1 up to the latest version
	// for that aggregate and applies them to a, so that a is in the latest
	// state. If the event store does not return any events, a stays untouched.
	Fetch(ctx context.Context, a Aggregate) error

	// FetchVersion fetches the events for the given aggregate from the event
	// store, beginning from version a.AggregateVersion()+1 up to v and applies
	// them to a, so that a is in the state of the time of the event with
	// version v. If the event store does not return any events, a stays
	// untouched.
	FetchVersion(ctx context.Context, a Aggregate, v int) error

	// Query queries the event Store for aggregates and returns a channel of
	// Histories and an error channel. If the query fails, Query returns nil
	// channels and an error.
	//
	// A History can be applied on an aggregate to reconstruct its state from
	// the History.
	//
	// The streams.Drain function can be used to get the result of the stream
	// as a slice and a single error:
	//
	//	res, errs, err := r.Query(context.TODO(), query.New())
	//	// handle err
	//	histories, err := streams.Drain(context.TODO(), res, errs)
	//	// handle err
	//	for _, his := range histories {
	//		// Initialize your aggregate.
	//		ref := his.Aggregate()
	//		var a Aggregate = newAggregate(ref.Name, ref.ID)
	//		his.Apply(a)
	//	}
	Query(ctx context.Context, q Query) (<-chan History, <-chan error, error)

	// Use first fetches the aggregate a from the event store, then calls fn
	// and finally saves the aggregate changes. If fn returns a non-nil error,
	// the aggregate is not saved and the error is returned.
	Use(ctx context.Context, a Aggregate, fn func() error) error

	// Delete deletes an aggregate by deleting its events from the Event Store.
	Delete(ctx context.Context, a Aggregate) error
}

Repository is the aggregate repository. It saves and fetches aggregates to and from the underlying event store.

type SoftDeleter added in v0.1.2

type SoftDeleter interface{ SoftDelete() bool }

SoftDeleter is an API that can be implemented by event data to soft-delete an aggregate. Soft-deleted aggregates are excluded from query results of the aggregate repository. When trying to fetch a soft-deleted aggregate from a repository, a repository.ErrDeleted error is returned. Soft-deleted aggregates can be restored by SoftRestorer events.

To soft-delete an aggregate, an event with event data that implements SoftDeleter must be inserted into the aggregate's event stream. The SoftDelete() method of the event data must return true if the aggregate should be soft-deleted.

type DeletedData struct {}
func (DeletedData) SoftDelete() bool { return true }

type Foo struct { *aggregate.Base }
func (f *Foo) Delete() {
	aggregate.Next(f, "foo.deleted", DeletedData{})
}

type SoftRestorer added in v0.1.2

type SoftRestorer interface{ SoftRestore() bool }

SoftRestorer is an API that can be implemented by event data to restore a soft-deleted aggregate.

To restore a soft-deleted aggregate, an event with event data that implements SoftRestorer must be inserted into the aggregate's event stream. The SoftRestore() method of the event data must return true if the aggregate should be restored.

type RestoredData struct {}
func (RestoredData) SoftRestore() bool { return true }

type Foo struct { *aggregate.Base }
func (f *Foo) Restore() {
	aggregate.Next(f, "foo.restored", RestoredData{})
}

type SortDirection

type SortDirection int

SortDirection is a sorting direction.

func (SortDirection) Bool

func (dir SortDirection) Bool(b bool) bool

Bool returns either b if dir=SortAsc or !b if dir=SortDesc.

type SortOptions

type SortOptions struct {
	Sort Sorting
	Dir  SortDirection
}

SortOptions defines the sorting behaviour for a Query.

type Sorting

type Sorting int

Sorting is a sorting criterion for aggregates (name, id, or version).

func (Sorting) Compare

func (s Sorting) Compare(a, b Aggregate) (cmp int8)

Compare compares a and b and returns -1 if a < b, 0 if a == b or 1 if a > b.
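
How a three-way comparison and a sort direction can be combined into a multi-field sort is sketched below. The Bool method follows the behaviour documented for SortDirection.Bool; the agg and option types and the sortMulti helper are hypothetical and are not the package's implementation of SortMulti.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

type SortDirection int

const (
	SortAsc SortDirection = iota
	SortDesc
)

// Bool returns b for ascending order and !b for descending order.
func (dir SortDirection) Bool(b bool) bool {
	if dir == SortDesc {
		return !b
	}
	return b
}

// agg is a hypothetical, reduced aggregate.
type agg struct {
	Name    string
	Version int
}

// option pairs a three-way comparison with a direction.
type option struct {
	compare func(a, b agg) int
	dir     SortDirection
}

func byName(a, b agg) int { return strings.Compare(a.Name, b.Name) }

func byVersion(a, b agg) int {
	switch {
	case a.Version < b.Version:
		return -1
	case a.Version > b.Version:
		return 1
	}
	return 0
}

// sortMulti sorts a copy of as by the given options; later options only
// decide when all earlier ones compare equal.
func sortMulti(as []agg, opts ...option) []agg {
	sorted := make([]agg, len(as))
	copy(sorted, as)
	sort.SliceStable(sorted, func(i, j int) bool {
		for _, opt := range opts {
			if cmp := opt.compare(sorted[i], sorted[j]); cmp != 0 {
				return opt.dir.Bool(cmp < 0)
			}
		}
		return false
	})
	return sorted
}

func main() {
	as := []agg{{"b", 1}, {"a", 2}, {"a", 5}}
	sorted := sortMulti(as, option{byName, SortAsc}, option{byVersion, SortDesc})
	fmt.Println(sorted) // [{a 5} {a 2} {b 1}]
}
```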

type TypedAggregate added in v0.1.2

type TypedAggregate interface {
	model.Model[uuid.UUID]
	Aggregate
}

TypedAggregate is a type constraint for aggregates of a TypedRepository.

type TypedRepository added in v0.1.2

type TypedRepository[A TypedAggregate] interface {
	model.Repository[A, uuid.UUID]

	// FetchVersion fetches all events for the given aggregate up to the given
	// version from the event store and applies them to the aggregate, so that
	// the aggregate is returned in the given version.
	FetchVersion(ctx context.Context, id uuid.UUID, version int) (A, error)

	// Query queries the event store for aggregates and returns a channel of
	// aggregates and an error channel. If the query fails, Query returns nil
	// channels and an error.
	//
	// A query made by this repository will only ever return aggregates of this
	// repository's generic type, even if the query would normally return other
	// aggregates. Aggregates that cannot be cast to the generic type are
	// simply discarded from the stream.
	//
	// streams.Drain returns the query result as a slice and a single error:
	//
	//	str, errs, err := r.Query(context.TODO(), query.New())
	//	// handle err
	//	res, err := streams.Drain(context.TODO(), str, errs)
	//	// handle err
	//	for _, a := range res {
	//		// a is your aggregate
	//	}
	Query(ctx context.Context, q Query) (<-chan A, <-chan error, error)
}

TypedRepository is a repository for a specific aggregate type. Use the github.com/modernice/goes/aggregate/repository.Typed function to create a TypedRepository.

func NewFoo(id uuid.UUID) *Foo { ... }

var repo aggregate.Repository
typed := repository.Typed(repo, NewFoo)

Directories

Path Synopsis
internal
Package ref provides utilities for working with aggregate.Ref.
mocks
Package mock_snapshot is a generated GoMock package.
