Documentation ¶
Index ¶
- Constants
- func ApplyHistory[Events ~[]event.Of[any]](a Aggregate, events Events) error
- func IsConsistencyError(err error) bool
- func Next[Data any](a Aggregate, name string, data Data, opts ...event.Option) event.Evt[Data]
- func NextEvent[D any](a Aggregate, name string, data D, opts ...event.Option) event.Evt[D] (deprecated)
- func NextVersion(a Aggregate) int
- func UncommittedVersion(a Aggregate) int
- func ValidateConsistency[Data any, Events ~[]event.Of[Data]](a Aggregate, events Events) error
- type Aggregate
- type Base
- func (b *Base) Aggregate() (uuid.UUID, string, int)
- func (b *Base) AggregateChanges() []event.Event
- func (b *Base) AggregateID() uuid.UUID
- func (b *Base) AggregateName() string
- func (b *Base) AggregateVersion() int
- func (b *Base) ApplyEvent(evt event.Event)
- func (b *Base) Commit()
- func (b *Base) CurrentVersion() int
- func (b *Base) DiscardChanges()
- func (b *Base) ModelID() uuid.UUID
- func (b *Base) RecordChange(events ...event.Event)
- func (b *Base) Ref() Ref
- func (b *Base) SetVersion(v int)
- type Committer
- type ConsistencyError
- type ConsistencyKind
- type History
- type Of
- type Option
- type Query
- type Ref
- type Repository
- type SoftDeleter
- type SoftRestorer
- type SortDirection
- type SortOptions
- type Sorting
- type TypedAggregate
- type TypedRepository
Constants ¶
const (
	// InconsistentID means there is an inconsistency in the aggregate ids.
	InconsistentID = ConsistencyKind(iota + 1)
	// InconsistentName means there is an inconsistency in the aggregate names.
	InconsistentName
	// InconsistentVersion means there is an inconsistency in the event versions.
	InconsistentVersion
	// InconsistentTime means there is an inconsistency in the event times.
	InconsistentTime
)
const (
	// SortName sorts aggregates by name.
	SortName = Sorting(iota)
	// SortID sorts aggregates by id.
	SortID
	// SortVersion sorts aggregates by version.
	SortVersion

	// SortAsc sorts aggregates in ascending order.
	SortAsc = SortDirection(iota)
	// SortDesc sorts aggregates in descending order.
	SortDesc
)
Variables ¶
This section is empty.
Functions ¶
func ApplyHistory ¶
ApplyHistory applies an event stream to an aggregate to reconstruct its state. If the aggregate implements Committer, a.RecordChange(events) and a.Commit() are called before returning.
func IsConsistencyError ¶ added in v0.1.2
IsConsistencyError reports whether an error was caused by an inconsistency in the events of an aggregate. An error is considered a consistency error if it either unwraps to a *ConsistencyError or has an IsConsistencyError() bool method that returns true for the given error.
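The rule above can be sketched with the standard errors package. This is only an illustration under assumptions: consistencyError, customError and isConsistencyError are local, hypothetical stand-ins rather than the goes types, and the sketch assumes that the marker method is looked up along the whole wrap chain.

```go
package main

import (
	"errors"
	"fmt"
)

// consistencyError is a local stand-in for *aggregate.ConsistencyError.
type consistencyError struct{ msg string }

func (e *consistencyError) Error() string { return e.msg }

// customError is a hypothetical error type that opts in through a method.
type customError struct{ consistent bool }

func (e customError) Error() string { return "custom error" }

func (e customError) IsConsistencyError() bool { return e.consistent }

// isConsistencyError follows the documented rule: the error either unwraps to
// a *consistencyError or exposes an IsConsistencyError() bool method that
// returns true. Searching the wrap chain for the method is an assumption.
func isConsistencyError(err error) bool {
	var cerr *consistencyError
	if errors.As(err, &cerr) {
		return true
	}
	var marker interface{ IsConsistencyError() bool }
	if errors.As(err, &marker) {
		return marker.IsConsistencyError()
	}
	return false
}

func main() {
	wrapped := fmt.Errorf("save: %w", &consistencyError{msg: "version mismatch"})
	fmt.Println(isConsistencyError(wrapped))
	fmt.Println(isConsistencyError(customError{consistent: true}))
	fmt.Println(isConsistencyError(errors.New("unrelated")))
}
```

The marker method lets error types from other packages be treated as consistency errors without embedding or wrapping the concrete error type.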
func Next ¶ added in v0.1.2
Next creates, applies and returns the next event for the given aggregate.
var foo aggregate.Aggregate
evt := aggregate.Next(foo, "name", <data>, ...)
func NextVersion ¶
NextVersion returns the version that the next event of the aggregate must have.
func UncommittedVersion ¶
UncommittedVersion returns the version of the aggregate after committing the recorded changes.
func ValidateConsistency ¶
ValidateConsistency tests the consistency of the given events against the given aggregate.
An event e is invalid if e.AggregateName() doesn't match a.AggregateName(), e.AggregateID() doesn't match a.AggregateID() or if e.AggregateVersion() doesn't match the position in events relative to a.AggregateVersion(). This means that events[0].AggregateVersion() must equal a.AggregateVersion() + 1, events[1].AggregateVersion() must equal a.AggregateVersion() + 2 etc.
An event e is also invalid if its time is equal to or before the time of the previous event.
The first event e in events that is invalid causes ValidateConsistency to return a *ConsistencyError containing the Kind of inconsistency and the event that caused the inconsistency.
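As a rough illustration of the id, name and version rules, the following sketch walks a slice of events and reports the first one that does not fit. The event struct and firstInconsistency are hypothetical stand-ins, the order in which the three rules are checked is an assumption, and the time rule is left out.

```go
package main

import "fmt"

// event is a local stand-in for the parts of an event that the rules inspect.
type event struct {
	AggregateID   string
	AggregateName string
	Version       int
}

// firstInconsistency returns the index of the first event that violates the
// id, name or version rule, together with a label for the violated rule.
// It returns -1 and an empty label if all events are consistent.
func firstInconsistency(id, name string, version int, events []event) (int, string) {
	for i, e := range events {
		switch {
		case e.AggregateID != id:
			return i, "id"
		case e.AggregateName != name:
			return i, "name"
		case e.Version != version+i+1:
			// events[i] must carry the aggregate version plus i+1.
			return i, "version"
		}
	}
	return -1, ""
}

func main() {
	events := []event{
		{AggregateID: "a1", AggregateName: "foo", Version: 4},
		{AggregateID: "a1", AggregateName: "foo", Version: 6}, // gap: 5 is expected
	}
	idx, kind := firstInconsistency("a1", "foo", 3, events)
	fmt.Println(idx, kind)
}
```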
Types ¶
type Aggregate ¶
Aggregate is an event-sourced aggregate.
func Sort ¶
func Sort(as []Aggregate, s Sorting, dir SortDirection) []Aggregate
Sort sorts aggregates and returns the sorted aggregates.
func SortMulti ¶
func SortMulti(as []Aggregate, sorts ...SortOptions) []Aggregate
SortMulti sorts aggregates by multiple fields and returns the sorted aggregates.
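A multi-field sort of this kind is commonly written as a comparison that falls through to the next sort option whenever two items tie. The sketch below uses local, hypothetical types (agg, sortKey, sortOption) and returns a sorted copy; whether the real SortMulti copies the slice or sorts it in place is not stated here.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// agg is a local stand-in for the fields of an aggregate that are sorted on.
type agg struct {
	Name    string
	Version int
}

type sortKey int

const (
	byName sortKey = iota
	byVersion
)

// sortOption pairs a sort key with a direction (hypothetical type).
type sortOption struct {
	Key  sortKey
	Desc bool
}

// compare returns a negative, zero or positive number for a < b, a == b, a > b.
func compare(a, b agg, key sortKey) int {
	switch key {
	case byName:
		return strings.Compare(a.Name, b.Name)
	case byVersion:
		switch {
		case a.Version < b.Version:
			return -1
		case a.Version > b.Version:
			return 1
		}
	}
	return 0
}

// sortMulti returns a sorted copy; later options only break ties of earlier ones.
func sortMulti(as []agg, opts ...sortOption) []agg {
	out := append([]agg(nil), as...)
	sort.SliceStable(out, func(i, j int) bool {
		for _, o := range opts {
			c := compare(out[i], out[j], o.Key)
			if c == 0 {
				continue // tie on this key: try the next option
			}
			if o.Desc {
				return c > 0
			}
			return c < 0
		}
		return false
	})
	return out
}

func main() {
	in := []agg{{"foo", 1}, {"bar", 2}, {"foo", 3}}
	fmt.Println(sortMulti(in, sortOption{Key: byName}, sortOption{Key: byVersion, Desc: true}))
}
```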
type Base ¶
type Base struct {
	ID      uuid.UUID
	Name    string
	Version int
	Changes []event.Event
	// contains filtered or unexported fields
}
Base can be embedded into aggregates to implement the following goes APIs:
- aggregate.Aggregate
- aggregate.Committer
- repository.ChangeDiscarder
- snapshot.Aggregate
func (*Base) AggregateChanges ¶
AggregateChanges returns the recorded changes.
func (*Base) AggregateID ¶
AggregateID returns the aggregate id.
func (*Base) AggregateName ¶
AggregateName returns the aggregate name.
func (*Base) AggregateVersion ¶
AggregateVersion returns the aggregate version.
func (*Base) ApplyEvent ¶
ApplyEvent calls the registered event appliers for the given event.
func (*Base) Commit ¶
func (b *Base) Commit()
Commit clears the recorded changes and sets the aggregate version to the version of the last recorded change. The recorded changes must be sorted by event version.
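The record-then-commit bookkeeping can be pictured with a small stand-in type. The sketch below is not the Base implementation: base, change, record, currentVersion and commit are hypothetical names, and the sketch simply follows the description that the committed version becomes the version of the last recorded change.

```go
package main

import "fmt"

// change is a local stand-in for a recorded event; only its version matters here.
type change struct{ Version int }

// base is a local stand-in for the Version and Changes fields of Base.
type base struct {
	Version int
	Changes []change
}

// record appends applied changes without touching the committed version.
func (b *base) record(cs ...change) { b.Changes = append(b.Changes, cs...) }

// currentVersion is the version including uncommitted changes. It relies on
// the recorded changes being sorted by version.
func (b *base) currentVersion() int {
	if n := len(b.Changes); n > 0 {
		return b.Changes[n-1].Version
	}
	return b.Version
}

// commit adopts the version of the last recorded change and clears the changes.
func (b *base) commit() {
	b.Version = b.currentVersion()
	b.Changes = nil
}

func main() {
	b := &base{Version: 2}
	b.record(change{Version: 3}, change{Version: 4})
	fmt.Println(b.Version, b.currentVersion(), len(b.Changes))
	b.commit()
	fmt.Println(b.Version, b.currentVersion(), len(b.Changes))
}
```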
func (*Base) CurrentVersion ¶ added in v0.2.0
CurrentVersion returns the version of the aggregate with respect to the uncommitted changes/events.
func (*Base) DiscardChanges ¶ added in v0.1.2
func (b *Base) DiscardChanges()
DiscardChanges discards the recorded changes. The aggregate repository calls this method when retrying a failed Repository.Use() call. Note that this method does not discard any state changes that were applied to the aggregate; it only discards recorded changes.
func (*Base) ModelID ¶ added in v0.1.2
ModelID implements goes/persistence/model.Model. This allows *Base to be used as a TypedAggregate for the type parameter of a TypedRepository.
func (*Base) RecordChange ¶ added in v0.1.2
RecordChange records applied changes to the aggregate.
func (*Base) SetVersion ¶
SetVersion manually sets the version of the aggregate.
SetVersion implements snapshot.Aggregate.
type Committer ¶
type Committer interface {
	// RecordChange records events that were applied to the aggregate.
	RecordChange(...event.Event)

	// Commit clears the recorded changes and updates the current version of the
	// aggregate to the last recorded event.
	Commit()
}
A Committer is an aggregate that records and commits its changes. The ApplyHistory() function calls RecordChange() and Commit() to reconstruct the state of an aggregate, and the aggregate repository calls Commit() after saving the aggregate changes to the event store.
type ConsistencyError ¶
type ConsistencyError struct {
	// Kind is the kind of inconsistency.
	Kind ConsistencyKind
	// Aggregate is the handled aggregate.
	Aggregate Ref
	// CurrentVersion is the current version of the aggregate.
	CurrentVersion int
	// Events are the tested events.
	Events []event.Event
	// EventIndex is the index of the event that caused the error.
	EventIndex int
}
ConsistencyError is an error caused by an inconsistency in the events of an aggregate.
func (*ConsistencyError) Error ¶
func (err *ConsistencyError) Error() string
func (*ConsistencyError) Event ¶
func (err *ConsistencyError) Event() event.Event
Event returns the first event that caused an inconsistency.
func (*ConsistencyError) IsConsistencyError ¶ added in v0.1.2
func (err *ConsistencyError) IsConsistencyError() bool
IsConsistencyError marks the error as a consistency error. The package-level IsConsistencyError function checks for this method.
type ConsistencyKind ¶
type ConsistencyKind int
ConsistencyKind is the kind of inconsistency.
func (ConsistencyKind) String ¶
func (k ConsistencyKind) String() string
type History ¶
type History interface {
	// Aggregate returns the reference to the aggregate of this history.
	Aggregate() Ref

	// Apply applies the history to the aggregate to rebuild its current state.
	Apply(Aggregate)
}
A History provides the event history of an aggregate. A History can be applied to an aggregate to rebuild its current state.
type Of ¶ added in v0.1.2
type Of[ID comparable] interface {
	// Aggregate returns the id, name and version of the aggregate.
	Aggregate() (ID, string, int)

	// AggregateChanges returns the uncommitted events of the aggregate.
	AggregateChanges() []event.Event

	// ApplyEvent applies an event to the aggregate.
	ApplyEvent(event.Event)
}
Of is an event-sourced aggregate.
type Query ¶
type Query interface {
	// Names returns the aggregate names to query for.
	Names() []string

	// IDs returns the aggregate UUIDs to query for.
	IDs() []uuid.UUID

	// Versions returns the version constraints for the query.
	Versions() version.Constraints

	// Sortings returns the SortOptions for the query.
	Sortings() []SortOptions
}
Query is used by repositories to filter aggregates from the event store.
type Ref ¶
type Ref = event.AggregateRef
Ref is a reference to a specific aggregate, identified by its name and id.
type Repository ¶
type Repository interface {
	// Save inserts the changes of the aggregate into the event store.
	Save(ctx context.Context, a Aggregate) error

	// Fetch fetches the events for the given aggregate from the event store,
	// beginning from version a.AggregateVersion()+1 up to the latest version
	// for that aggregate and applies them to a, so that a is in the latest
	// state. If the event store does not return any events, a stays untouched.
	Fetch(ctx context.Context, a Aggregate) error

	// FetchVersion fetches the events for the given aggregate from the event
	// store, beginning from version a.AggregateVersion()+1 up to v and applies
	// them to a, so that a is in the state of the time of the event with
	// version v. If the event store does not return any events, a stays
	// untouched.
	FetchVersion(ctx context.Context, a Aggregate, v int) error

	// Query queries the event store for aggregates and returns a channel of
	// Histories and an error channel. If the query fails, Query returns nil
	// channels and an error.
	//
	// A History can be applied on an aggregate to reconstruct its state from
	// the History.
	//
	// The Drain function can be used to get the result of the stream as slice
	// and a single error:
	//
	//	res, errs, err := r.Query(context.TODO(), query.New())
	//	// handle err
	//	appliers, err := stream.Drain(context.TODO(), res, errs)
	//	// handle err
	//	for _, app := range appliers {
	//		// Initialize your aggregate.
	//		ref := app.Aggregate()
	//		a := newAggregate(ref.Name, ref.ID)
	//		app.Apply(a)
	//	}
	Query(ctx context.Context, q Query) (<-chan History, <-chan error, error)

	// Use first fetches the aggregate a from the event store, then calls fn(a)
	// and finally saves the aggregate changes. If fn returns a non-nil error,
	// the aggregate is not saved and the error is returned.
	Use(ctx context.Context, a Aggregate, fn func() error) error

	// Delete deletes an aggregate by deleting its events from the event store.
	Delete(ctx context.Context, a Aggregate) error
}
Repository is the aggregate repository. It saves and fetches aggregates to and from the underlying event store.
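Query returns a result channel and an error channel, which callers usually collapse into a slice and a single error. The sketch below shows one way such a drain helper could work. drain, collect and errQuery are hypothetical names written for this illustration, not the goes helper, and returning on the first received error is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errQuery is a hypothetical error used by the demo in main.
var errQuery = errors.New("query failed")

// drain collects values from in until it is closed. It returns early with the
// first error received from errs or when ctx is canceled.
func drain[T any](ctx context.Context, in <-chan T, errs <-chan error) ([]T, error) {
	var out []T
	for {
		select {
		case <-ctx.Done():
			return out, ctx.Err()
		case err, ok := <-errs:
			if !ok {
				errs = nil // closed: a nil channel is never selected again
				continue
			}
			return out, err
		case v, ok := <-in:
			if !ok {
				return out, nil
			}
			out = append(out, v)
		}
	}
}

// collect is a convenience wrapper around drain without cancellation.
func collect[T any](in <-chan T, errs <-chan error) ([]T, error) {
	return drain(context.Background(), in, errs)
}

func main() {
	in := make(chan string, 2)
	errs := make(chan error)
	in <- "foo"
	in <- "bar"
	close(in)
	close(errs)

	names, err := collect(in, errs)
	fmt.Println(names, err)

	failed := make(chan error, 1)
	failed <- errQuery
	_, err = collect(make(chan string), failed)
	fmt.Println(err)
}
```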
type SoftDeleter ¶ added in v0.1.2
type SoftDeleter interface{ SoftDelete() bool }
SoftDeleter is an API that can be implemented by event data to soft-delete an aggregate. Soft-deleted aggregates are excluded from query results of the aggregate repository. When trying to fetch a soft-deleted aggregate from a repository, a repository.ErrDeleted error is returned. Soft-deleted aggregates can be restored by SoftRestorer events.
To soft-delete an aggregate, an event with event data that implements SoftDeleter must be inserted into the aggregate's event stream. The SoftDelete() method of the event data must return true if the aggregate should be soft-deleted.
type DeletedData struct{}

func (DeletedData) SoftDelete() bool { return true }

type Foo struct{ *aggregate.Base }

func (f *Foo) Delete() {
	aggregate.Next(f, "foo.deleted", DeletedData{})
}
type SoftRestorer ¶ added in v0.1.2
type SoftRestorer interface{ SoftRestore() bool }
SoftRestorer is an API that can be implemented by event data to restore a soft-deleted aggregate.
To restore a soft-deleted aggregate, an event with event data that implements SoftRestorer must be inserted into the aggregate's event stream. The SoftRestore() method of the event data must return true if the aggregate should be restored.
type RestoredData struct{}

func (RestoredData) SoftRestore() bool { return true }

type Foo struct{ *aggregate.Base }

func (f *Foo) Restore() {
	aggregate.Next(f, "foo.restored", RestoredData{})
}
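How a repository derives the soft-deleted state from an event stream is not spelled out here. One plausible reading is that it scans the event data in order and lets the last matching event decide. The sketch below illustrates that reading only; softDeleter, softRestorer and softDeleted are local, hypothetical names.

```go
package main

import "fmt"

// Local copies of the two marker interfaces described above.
type softDeleter interface{ SoftDelete() bool }
type softRestorer interface{ SoftRestore() bool }

type deletedData struct{}

func (deletedData) SoftDelete() bool { return true }

type restoredData struct{}

func (restoredData) SoftRestore() bool { return true }

// softDeleted scans event data in stream order and reports whether the
// aggregate ends up soft-deleted. This is an assumed interpretation, not the
// repository's actual code.
func softDeleted(stream []any) bool {
	deleted := false
	for _, data := range stream {
		if d, ok := data.(softDeleter); ok && d.SoftDelete() {
			deleted = true
		}
		if r, ok := data.(softRestorer); ok && r.SoftRestore() {
			deleted = false
		}
	}
	return deleted
}

func main() {
	fmt.Println(softDeleted([]any{"created", deletedData{}}))
	fmt.Println(softDeleted([]any{"created", deletedData{}, restoredData{}}))
}
```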
type SortDirection ¶
type SortDirection int
SortDirection is a sorting direction.
func (SortDirection) Bool ¶
func (dir SortDirection) Bool(b bool) bool
Bool returns either b if dir=SortAsc or !b if dir=SortDesc.
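A helper like Bool is typically used to flip the result of a "less than" comparison. The sketch below shows the idea with a local, hypothetical sortDirection type and flip method. Ties are handled before flipping so that the comparison never reports equal elements as out of order.

```go
package main

import (
	"fmt"
	"sort"
)

// sortDirection is a local stand-in for SortDirection.
type sortDirection int

const (
	sortAsc sortDirection = iota
	sortDesc
)

// flip returns b for ascending order and !b for descending order.
func (dir sortDirection) flip(b bool) bool {
	if dir == sortDesc {
		return !b
	}
	return b
}

// sortVersions returns a sorted copy of vs in the given direction.
func sortVersions(vs []int, dir sortDirection) []int {
	out := append([]int(nil), vs...)
	sort.Slice(out, func(i, j int) bool {
		if out[i] == out[j] {
			return false // equal elements are never "less" in either direction
		}
		return dir.flip(out[i] < out[j])
	})
	return out
}

func main() {
	fmt.Println(sortVersions([]int{2, 3, 1}, sortAsc))
	fmt.Println(sortVersions([]int{2, 3, 1}, sortDesc))
}
```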
type SortOptions ¶
type SortOptions struct {
	Sort Sorting
	Dir  SortDirection
}
SortOptions defines the sorting behaviour for a Query.
type TypedAggregate ¶ added in v0.1.2
TypedAggregate is a type constraint for aggregates of a TypedRepository.
type TypedRepository ¶ added in v0.1.2
type TypedRepository[A TypedAggregate] interface {
	model.Repository[A, uuid.UUID]

	// FetchVersion fetches the given aggregate in the given version: it
	// fetches all events for the aggregate up to the given version from the
	// event store and applies them to the aggregate.
	FetchVersion(ctx context.Context, id uuid.UUID, version int) (A, error)

	// Query queries the event store for aggregates and returns a channel of
	// aggregates and an error channel. If the query fails, Query returns nil
	// channels and an error.
	//
	// A query made by this repository will only ever return aggregates of this
	// repository's generic type, even if the query would normally return other
	// aggregates. Aggregates that cannot be cast to the generic type are
	// simply discarded from the stream.
	//
	// streams.Drain returns the query result as slice and a single error:
	//
	//	str, errs, err := r.Query(context.TODO(), query.New())
	//	// handle err
	//	res, err := streams.Drain(context.TODO(), str, errs)
	//	// handle err
	//	for _, a := range res {
	//		// a is your aggregate
	//	}
	Query(ctx context.Context, q Query) (<-chan A, <-chan error, error)
}
TypedRepository is a repository for a specific aggregate type. Use the github.com/modernice/goes/aggregate/repository.Typed function to create a TypedRepository.
func NewFoo(id uuid.UUID) *Foo { ... }

var repo aggregate.Repository
typed := repository.Typed(repo, NewFoo)