Documentation ¶
Index ¶
- Constants
- Variables
- func ApplyHistory(a Aggregate, events ...event.Event) error
- func ForEach(applyFn func(h History), errFn func(error), histories <-chan History, ...)
- func ForEachRef(applyFn func(Ref), errFn func(error), refs <-chan Ref, errs ...<-chan error)
- func HasChange(a Aggregate, eventName string) bool
- func NextEvent(a Aggregate, name string, data interface{}, opts ...event.Option) event.Event
- func NextVersion(a Aggregate) int
- func PickID(a Aggregate) uuid.UUID
- func PickName(a Aggregate) string
- func PickVersion(a Aggregate) int
- func UncommittedVersion(a Aggregate) int
- func ValidateConsistency(a Aggregate, events ...event.Event) error
- func Walk(ctx context.Context, walkFn func(History) error, str <-chan History, ...) error
- func WalkRefs(ctx context.Context, walkFn func(Ref) error, str <-chan Ref, ...) error
- type Aggregate
- type Base
- func (b *Base) Aggregate() (uuid.UUID, string, int)
- func (b *Base) AggregateChanges() []event.Event
- func (b *Base) AggregateID() uuid.UUID
- func (b *Base) AggregateName() string
- func (b *Base) AggregateVersion() int
- func (*Base) ApplyEvent(event.Event)
- func (b *Base) Commit()
- func (b *Base) SetVersion(v int)
- func (b *Base) TrackChange(events ...event.Event)
- type Committer
- type ConsistencyError
- type ConsistencyKind
- type History
- type Option
- type Query
- type Ref
- type Repository
- type SortDirection
- type SortOptions
- type Sorting
- type Tupledeprecated
Constants ¶
const ( // ID means there is an inconsistency in the ID of an Aggregate. InconsistentID = ConsistencyKind(iota + 1) // Name means there is an inconsistency in the Aggregate names of the Events // of an Aggregate. InconsistentName // Version means there is an inconsistency in the Event versions of an // Aggregate. InconsistentVersion // Time means there is an inconsistency in the Event times of an Aggregate. InconsistentTime )
const ( // SortName sorts aggregates by name. SortName = Sorting(iota) // SortID sorts aggregates by id. SortID // SortVersion sorts aggregates by version. SortVersion // SortAsc sorts aggregates in ascending order. SortAsc = SortDirection(iota) // SortDesc sorts aggregates in descending order. SortDesc )
Variables ¶
var ExtractID = PickID
Deprecated: Use PickID instead.
var ExtractName = PickName
Deprecated: Use PickName instead.
var ExtractVersion = PickVersion
Deprecated: Use PickVersion instead.
var ForEvery = ForEach
ForEvery is an alias for ForEach.
Deprecated: Use ForEach instead.
var ForEveryTuple = ForEachRef
ForEveryTuple is an alias for ForEachTuple.
Deprecated: Use ForEachRef instead.
var WalkTuples = WalkRefs
Deprecated: Use RefWalk instead.
Functions ¶
func ApplyHistory ¶
ApplyHistory applies the given events to the aggregate a to reconstruct the state of a at the time of the latest event. If the aggregate implements Committer, a.TrackChange(events) and a.Commit() are called before returning.
func ForEach ¶
func ForEach( applyFn func(h History), errFn func(error), histories <-chan History, errs ...<-chan error, )
ForEach iterates over the provided History and error channels and for every History h calls applyFn(h) and for every error e calls errFn(e) until all channels are closed or ctx is canceled.
func ForEachRef ¶
ForEachRef iterates over the provided Ref and error channels and for every Ref r calls applyFn(r) and for every error e calls errFn(e) until all channels are closed or ctx is canceled.
func HasChange ¶
HasChange returns whether Aggregate a has an uncommitted Event with the given name.
func NextEvent ¶
NextEvent makes and returns the next Event e for the aggregate a. NextEvent calls a.ApplyEvent(e) and a.TrackChange(e) before returning the Event.
var foo aggregate.Aggregate evt := aggregate.NextEvent(foo, "event-name", ...)
func NextVersion ¶
NextVersion returns the next (uncommitted) version of an aggregate (UncommittedVersion(a) + 1).
func PickVersion ¶
PickVersion returns the version of the given aggregate.
func UncommittedVersion ¶
UncommittedVersion returns the version the aggregate, including any uncommitted changes.
func ValidateConsistency ¶
Validate tests the consistency of the given events against the given aggregate.
An Event e is invalid if e.AggregateName() doesn't match a.AggregateName(), e.AggregateID() doesn't match a.AggregateID() or if e.AggregateVersion() doesn't match the position in events relative to a.AggregateVersion(). This means that events[0].AggregateVersion() must equal a.AggregateVersion() + 1, events[1].AggregateVersion() must equal a.AggregateVersion() + 2 etc.
An Event a is also invalid if its time is equal to or after the time of the previous Event.
The first Event e in events that is invalid causes Validate to return an *Error containing the Kind of inconsistency and the Event that caused the inconsistency.
func Walk ¶
func Walk( ctx context.Context, walkFn func(History) error, str <-chan History, errs ...<-chan error, ) error
Walk receives from the given History channel until it and all provided error channels are closed, ctx is closed or any of the provided error channels receives an error. For every History h that is received from the History channel, walkFn(h) is called. Should ctx be canceled before the channels are closed, ctx.Err() is returned. Should an error be received from one of the error channels, that error is returned. Otherwise Walk returns nil.
Example:
var repo aggregate.Repository str, errs, err := repo.Query(context.TODO(), query.New()) // handle err err := aggregate.Walk(context.TODO(), func(h aggregate.History) { log.Println(fmt.Sprintf("Received History: %v", h)) }, str, errs) // handle err
func WalkRefs ¶
func WalkRefs( ctx context.Context, walkFn func(Ref) error, str <-chan Ref, errs ...<-chan error, ) error
WalkRefs receives from the given Ref channel until it and all provided error channels are closed, ctx is closed or any of the provided error channels receives an error. For every Ref r that is received from the Ref channel, walkFn(h) is called. Should ctx be canceled before the channels are closed, ctx.Err() is returned. Should an error be received from one of the error channels, that error is returned. Otherwise Walk returns nil.
Types ¶
type Aggregate ¶
type Aggregate interface { // Aggregate returns the id, name and version of the aggregate. Aggregate() (uuid.UUID, string, int) // AggregateChanges returns the uncommited events of the aggregate. AggregateChanges() []event.Event // ApplyEvent applies the event on the aggregate. ApplyEvent(event.Event) }
Aggregate is an event-sourced Aggregate.
func Sort ¶
func Sort(as []Aggregate, s Sorting, dir SortDirection) []Aggregate
Sort sorts aggregates and returns the sorted aggregates.
func SortMulti ¶
func SortMulti(as []Aggregate, sorts ...SortOptions) []Aggregate
SortMulti sorts aggregates by multiple fields and returns the sorted aggregates.
type Base ¶
Base can be embedded into structs to make them fulfill the Aggregate interface.
func (*Base) AggregateChanges ¶
AggregateChanges implements Aggregate.
func (*Base) AggregateName ¶
AggregateName implements Aggregate.
func (*Base) AggregateVersion ¶
AggregateVersion implements Aggregate.
func (*Base) ApplyEvent ¶
ApplyEvent implements aggregate. Aggregates that embed *Base should overide ApplyEvent.
func (*Base) SetVersion ¶
SetVersion implements snapshot.Aggregate.
func (*Base) TrackChange ¶
TrackChange implements Aggregate.
type Committer ¶
type Committer interface { // TrackChange adds events as changes to the aggregate. TrackChange(...event.Event) // Commit commits the uncommitted changes of the aggregate. The changes // should be removed and the aggregate version set to the version of last // tracked event. Commit() }
Committer commits aggregate changes. Types that implement Committer are considered when applying the aggregate history onto the implementing type. The Commit function is called after applying the events onto the aggregate (using the ApplyEvent function) to commit the changes to the aggregate.
*Base implements Committer.
type ConsistencyError ¶
type ConsistencyError struct { // Kind is the kind of incosistency. Kind ConsistencyKind // Aggregate is the handled aggregate. Aggregate Aggregate // Events are the tested events. Events []event.Event // EventIndex is the index of the Event that caused the Error. EventIndex int }
Error is a consistency error.
func (*ConsistencyError) Error ¶
func (err *ConsistencyError) Error() string
func (*ConsistencyError) Event ¶
func (err *ConsistencyError) Event() event.Event
Event return the first Event that caused an inconsistency.
type ConsistencyKind ¶
type ConsistencyKind int
ConsistencyKind is the kind of inconsistency.
func (ConsistencyKind) String ¶
func (k ConsistencyKind) String() string
type History ¶
type History interface { // AggregateName returns the name of the aggregate. AggregateName() string // AggregateID returns the UUID of the aggregate. AggregateID() uuid.UUID // Apply applies the history onto the aggregate to rebuild its current state. Apply(Aggregate) }
A History provides the event history of an aggregate. A History can be applied onto an aggregate to rebuild its current state.
func Drain ¶
Drain drains the given History channel and returns its Histories.
Drain accepts optional error channels which will cause Drain to fail on any error. When Drain encounters an error from any of the error channels, the already drained Histories and that error are returned. Similarly, when ctx is canceled, the drained Histories and ctx.Err() are returned.
Drain returns when the provided History channel is closed or it encounters an error from an error channel and does not wait for the error channels to be closed.
type Query ¶
type Query interface { // Names returns the aggregate names to query for. Names() []string // IDs returns the aggregate UUIDs to query for. IDs() []uuid.UUID // Versions returns the version constraints for the query. Versions() version.Constraints // Sortings returns the SortConfig for the query. Sortings() []SortOptions }
Query is used by Repositories to filter aggregates.
type Ref ¶
type Ref = event.AggregateRef
Ref is a reference to a specific aggregate, identified by its name and id.
func DrainRefs ¶
DrainRefs drains the given Ref channel and returns its Refs.
DrainRefs accepts optional error channels which will cause DrainRefs to fail on any error. When DrainRefs encounters an error from any of the error channels, the already drained Refs and that error are returned. Similarly, when ctx is canceled, the drained Refs and ctx.Err() are returned.
DrainRefs returns when the provided Ref channel is closed or it encounters an error from an error channel and does not wait for the error channels to be closed.
type Repository ¶
type Repository interface { // Save inserts the changes of Aggregate a into the event store. Save(ctx context.Context, a Aggregate) error // Fetch fetches the events for the given Aggregate from the event store, // beginning from version a.AggregateVersion()+1 up to the latest version // for that Aggregate and applies them to a, so that a is in the latest // state. If the event store does not return any events, a stays untouched. Fetch(ctx context.Context, a Aggregate) error // FetchVersion fetches the events for the given Aggregate from the event // store, beginning from version a.AggregateVersion()+1 up to v and applies // them to a, so that a is in the state of the time of the event with // version v. If the event store does not return any events, a stays // untouched. FetchVersion(ctx context.Context, a Aggregate, v int) error // Query queries the Event Store for Aggregates and returns a channel of // Histories and an error channel. If the query fails, Query returns nil // channels and an error. // // A History can be applied on an Aggregate to reconstruct its state from // the History. // // The Drain function can be used to get the result of the stream as slice // and a single error: // // res, errs, err := r.Query(context.TODO(), query.New()) // // handle err // appliers, err := stream.Drain(context.TODO(), res, errs) // // handle err // for _, app := range appliers { // // Initialize your Aggregate. // var a Aggregate = newAggregate(app.AggregateName(), app.AggregateID()) // a.Apply(a) // } Query(ctx context.Context, q Query) (<-chan History, <-chan error, error) // Delete deletes an Aggregate by deleting its Events from the Event Store. Delete(ctx context.Context, a Aggregate) error }
Repository is the aggregate repository. It saves and fetches aggregates to and from the underlying event store.
type SortDirection ¶
type SortDirection int
SortDirection is a sorting direction.
func (SortDirection) Bool ¶
func (dir SortDirection) Bool(b bool) bool
Bool returns either b if dir=SortAsc or !b if dir=SortDesc.
type SortOptions ¶
type SortOptions struct { Sort Sorting Dir SortDirection }
SortOptions defines the sorting behaviour for a Query.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package mock_aggregate is a generated GoMock package.
|
Package mock_aggregate is a generated GoMock package. |
Package ref provides utilities for working with aggregate.Ref.
|
Package ref provides utilities for working with aggregate.Ref. |
mocks
Package mock_snapshot is a generated GoMock package.
|
Package mock_snapshot is a generated GoMock package. |
Deprecated: Use github.com/modernice/goes/aggregate/ref instead
|
Deprecated: Use github.com/modernice/goes/aggregate/ref instead |