Documentation ¶
Index ¶
- Variables
- type ChangeDiscarder
- type IsRetryable
- type Option
- func AfterInsert(fn func(context.Context, aggregate.Aggregate) error) Option
- func BeforeInsert(fn func(context.Context, aggregate.Aggregate) error) Option
- func ModifyQueries(...) Option
- func OnDelete(fn func(context.Context, aggregate.Aggregate) error) Option
- func OnFailedInsert(fn func(context.Context, aggregate.Aggregate, error) error) Option
- func WithSnapshots(store snapshot.Store, s snapshot.Schedule) Option
- type Repository
- func (r *Repository) Delete(ctx context.Context, a aggregate.Aggregate) error
- func (r *Repository) Fetch(ctx context.Context, a aggregate.Aggregate) error
- func (r *Repository) FetchVersion(ctx context.Context, a aggregate.Aggregate, v int) error
- func (r *Repository) Query(ctx context.Context, q aggregate.Query) (<-chan aggregate.History, <-chan error, error)
- func (r *Repository) Save(ctx context.Context, a aggregate.Aggregate) error
- func (r *Repository) Use(ctx context.Context, a aggregate.Aggregate, fn func() error) error
- type RetryTrigger
- type RetryTriggerFunc
- type Retryer
- type TypedRepository
- func (r *TypedRepository[Aggregate]) Delete(ctx context.Context, a Aggregate) error
- func (r *TypedRepository[Aggregate]) Fetch(ctx context.Context, id uuid.UUID) (Aggregate, error)
- func (r *TypedRepository[Aggregate]) FetchVersion(ctx context.Context, id uuid.UUID, version int) (Aggregate, error)
- func (r *TypedRepository[Aggregate]) NewFunc() func(uuid.UUID) Aggregate
- func (r *TypedRepository[Aggregate]) Query(ctx context.Context, q aggregate.Query) (<-chan Aggregate, <-chan error, error)
- func (r *TypedRepository[Aggregate]) Repository() aggregate.Repository
- func (r *TypedRepository[Aggregate]) Save(ctx context.Context, a Aggregate) error
- func (r *TypedRepository[Aggregate]) Use(ctx context.Context, id uuid.UUID, fn func(Aggregate) error) error
Variables ¶
var (
	// ErrVersionNotFound is returned when trying to fetch an aggregate with a
	// version higher than the current version of the aggregate.
	ErrVersionNotFound = errors.New("version not found")

	// ErrDeleted is returned when trying to fetch an aggregate that has been soft-deleted.
	ErrDeleted = errors.New("aggregate was soft-deleted")
)
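Callers will typically want to tell these two errors apart from other failures. The following is a minimal sketch of how that might look with errors.Is. It declares local stand-ins for the two sentinel errors so that it compiles without the library, and the classify helper is hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors, declared here only so
// that the sketch compiles on its own.
var (
	ErrVersionNotFound = errors.New("version not found")
	ErrDeleted         = errors.New("aggregate was soft-deleted")
)

// classify is a hypothetical helper that maps a fetch error to a short label.
// errors.Is also matches errors that wrap one of the sentinels.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrVersionNotFound):
		return "version not found"
	case errors.Is(err, ErrDeleted):
		return "deleted"
	default:
		return "other"
	}
}

func main() {
	wrapped := fmt.Errorf("fetch foo: %w", ErrDeleted)
	fmt.Println(classify(wrapped))
	fmt.Println(classify(ErrVersionNotFound))
	fmt.Println(classify(nil))
}
```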
Types ¶
type ChangeDiscarder ¶ added in v0.1.2
type ChangeDiscarder interface {
DiscardChanges()
}
A ChangeDiscarder discards changes to the aggregate. The DiscardChanges() method is called each time a Repository.Use() call is retried for the aggregate.
type IsRetryable ¶ added in v0.1.2
IsRetryable is a function that determines if an error within a Repository.Use() call is retryable.
type Option ¶
type Option func(*Repository)
Option is a repository option.
func AfterInsert ¶
AfterInsert returns an Option that adds fn as a hook to a Repository. fn is called after the changes to an aggregate are inserted into the event store.
func BeforeInsert ¶
BeforeInsert returns an Option that adds fn as a hook to a Repository. fn is called before the changes to an aggregate are inserted into the event store.
func ModifyQueries ¶
func ModifyQueries(mods ...func(ctx context.Context, q aggregate.Query, prev event.Query) (event.Query, error)) Option
ModifyQueries returns an Option that adds mods as Query modifiers to a Repository. When the Repository builds a Query, it is passed to every modifier before the event store is queried.
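The chaining described above can be pictured as a fold over the modifiers: each one receives the query produced by the previous one. The sketch below is not the library's implementation. The eventQuery type, the modifier type (which drops the context and aggregate query parameters for brevity), and the withName helper are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// eventQuery is a hypothetical, heavily simplified stand-in for event.Query.
type eventQuery struct {
	AggregateNames []string
}

// modifier mirrors the idea of a query modifier: it receives the previous
// query and returns the next one, or an error.
type modifier func(prev eventQuery) (eventQuery, error)

// applyModifiers passes q through every modifier in order and stops at the
// first error.
func applyModifiers(q eventQuery, mods ...modifier) (eventQuery, error) {
	for _, mod := range mods {
		next, err := mod(q)
		if err != nil {
			return eventQuery{}, fmt.Errorf("modify query: %w", err)
		}
		q = next
	}
	return q, nil
}

// withName returns a modifier that adds name to the queried aggregate names.
func withName(name string) modifier {
	return func(prev eventQuery) (eventQuery, error) {
		if name == "" {
			return prev, errors.New("empty aggregate name")
		}
		// Copy the slice so that the previous query is left untouched.
		names := append([]string(nil), prev.AggregateNames...)
		return eventQuery{AggregateNames: append(names, name)}, nil
	}
}

func main() {
	q, err := applyModifiers(eventQuery{}, withName("foo"), withName("bar"))
	fmt.Println(q.AggregateNames, err)
}
```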
func OnDelete ¶
OnDelete returns an Option that adds fn as a hook to a Repository. fn is called after an aggregate has been deleted.
func OnFailedInsert ¶
OnFailedInsert returns an Option that adds fn as a hook to a Repository. fn is called when the Repository fails to insert the changes to an aggregate into the event store.
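To make the timing of the insert hooks concrete, here is a self-contained sketch of the functional-options pattern with BeforeInsert, AfterInsert and OnFailedInsert style hooks. All types and names in it are hypothetical stand-ins, and the choice to abort the save when a hook returns an error is an assumption, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// repo is a hypothetical stand-in for a repository with insert hooks.
type repo struct {
	insert         func() error // stands in for the event store insert
	beforeInsert   []func() error
	afterInsert    []func() error
	onFailedInsert []func(error) error
}

// option configures a repo, in the style of a functional option.
type option func(*repo)

func beforeInsert(fn func() error) option {
	return func(r *repo) { r.beforeInsert = append(r.beforeInsert, fn) }
}

func afterInsert(fn func() error) option {
	return func(r *repo) { r.afterInsert = append(r.afterInsert, fn) }
}

func onFailedInsert(fn func(error) error) option {
	return func(r *repo) { r.onFailedInsert = append(r.onFailedInsert, fn) }
}

func newRepo(insert func() error, opts ...option) *repo {
	r := &repo{insert: insert}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

// save runs the hooks around the insert. Assumption: a hook error aborts
// the save and is returned to the caller.
func (r *repo) save() error {
	for _, fn := range r.beforeInsert {
		if err := fn(); err != nil {
			return fmt.Errorf("before insert: %w", err)
		}
	}
	if err := r.insert(); err != nil {
		for _, fn := range r.onFailedInsert {
			if hookErr := fn(err); hookErr != nil {
				return fmt.Errorf("on failed insert: %w", hookErr)
			}
		}
		return err
	}
	for _, fn := range r.afterInsert {
		if err := fn(); err != nil {
			return fmt.Errorf("after insert: %w", err)
		}
	}
	return nil
}

func main() {
	r := newRepo(
		func() error { return errors.New("store unavailable") },
		beforeInsert(func() error { fmt.Println("before insert"); return nil }),
		afterInsert(func() error { fmt.Println("after insert"); return nil }),
		onFailedInsert(func(err error) error { fmt.Println("insert failed:", err); return nil }),
	)
	fmt.Println("save:", r.save())
}
```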
func WithSnapshots ¶
WithSnapshots returns an Option that adds a Snapshot Store to a Repository.
A Repository that has a Snapshot Store will fetch the latest valid Snapshot for an aggregate before fetching the events needed to reconstruct the state of the aggregate.
An optional Snapshot Schedule can be provided to instruct the Repository to make and save Snapshots into the Snapshot Store when appropriate:
var events event.Store
var snapshots snapshot.Store
r := repository.New(events, repository.WithSnapshots(snapshots, snapshot.Every(3)))
The example above will make a Snapshot of an aggregate every third version of the aggregate.
Aggregates must implement snapshot.Marshaler & snapshot.Unmarshaler in order for Snapshots to work.
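As an illustration of what implementing these two interfaces might involve, the sketch below encodes an aggregate's state as JSON. It assumes the interfaces consist of a MarshalSnapshot() ([]byte, error) method and an UnmarshalSnapshot([]byte) error method. That shape is an assumption and should be checked against the snapshot package. The local interfaces, the cart type and its fields are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Assumed shapes of the snapshot interfaces, declared locally so that the
// sketch compiles on its own.
type marshaler interface {
	MarshalSnapshot() ([]byte, error)
}

type unmarshaler interface {
	UnmarshalSnapshot([]byte) error
}

// cart is a hypothetical aggregate state.
type cart struct {
	Items []string
	Total int
}

// cartSnapshot is the serialized form of a cart.
type cartSnapshot struct {
	Items []string `json:"items"`
	Total int      `json:"total"`
}

// MarshalSnapshot encodes the current state of the cart.
func (c *cart) MarshalSnapshot() ([]byte, error) {
	return json.Marshal(cartSnapshot{Items: c.Items, Total: c.Total})
}

// UnmarshalSnapshot restores the state of the cart from an encoded snapshot.
func (c *cart) UnmarshalSnapshot(b []byte) error {
	var snap cartSnapshot
	if err := json.Unmarshal(b, &snap); err != nil {
		return err
	}
	c.Items, c.Total = snap.Items, snap.Total
	return nil
}

// Compile-time checks that cart satisfies both local interfaces.
var (
	_ marshaler   = (*cart)(nil)
	_ unmarshaler = (*cart)(nil)
)

func main() {
	src := &cart{Items: []string{"book", "pen"}, Total: 42}
	b, err := src.MarshalSnapshot()
	if err != nil {
		panic(err)
	}

	var dst cart
	if err := dst.UnmarshalSnapshot(b); err != nil {
		panic(err)
	}
	fmt.Println(dst.Items, dst.Total)
}
```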
type Repository ¶
type Repository struct {
// contains filtered or unexported fields
}
func New ¶
func New(store event.Store, opts ...Option) *Repository
New returns an event-sourced aggregate Repository. It uses the provided event Store to persist and query aggregates.
func (*Repository) Delete ¶
Delete deletes an aggregate by deleting its events from the event store.
func (*Repository) Fetch ¶
Fetch fetches the events of the provided aggregate from the event store and applies them to it to build its current state.
It is allowed to pass an aggregate that doesn't have any events in the event store yet.
It is also allowed to pass an aggregate that already has events applied to it. Only events with a version higher than the current version of the passed aggregate are fetched from the event store.
func (*Repository) FetchVersion ¶
FetchVersion does the same as r.Fetch, but only fetches events up until the given version v. If the event store has no event for the provided aggregate with the requested version, ErrVersionNotFound is returned.
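The version rules of Fetch and FetchVersion can be modeled with a small in-memory example. This is only a sketch of the documented behavior, not the library's code. The event and counter types, the slice-backed store (assumed to be sorted by version) and both functions are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// errVersionNotFound is a local stand-in for ErrVersionNotFound.
var errVersionNotFound = errors.New("version not found")

// event is a hypothetical event that adds Delta to a counter.
type event struct {
	Version int
	Delta   int
}

// counter is a hypothetical aggregate that tracks its applied version.
type counter struct {
	version int
	value   int
}

func (c *counter) apply(e event) {
	c.value += e.Delta
	c.version = e.Version
}

// fetch applies every stored event that is newer than c's current version.
func fetch(store []event, c *counter) {
	for _, e := range store {
		if e.Version > c.version {
			c.apply(e)
		}
	}
}

// fetchVersion applies events up to and including version v. It fails if
// the store holds no event with exactly that version.
func fetchVersion(store []event, c *counter, v int) error {
	found := false
	for _, e := range store {
		if e.Version == v {
			found = true
			break
		}
	}
	if !found {
		return errVersionNotFound
	}
	for _, e := range store {
		if e.Version > c.version && e.Version <= v {
			c.apply(e)
		}
	}
	return nil
}

func main() {
	store := []event{{1, 1}, {2, 2}, {3, 3}}

	// The aggregate already has version 1 applied, so only 2 and 3 are used.
	c := &counter{version: 1, value: 1}
	fetch(store, c)
	fmt.Println(c.version, c.value)

	var old counter
	fmt.Println(fetchVersion(store, &old, 2), old.version, old.value)
	fmt.Println(fetchVersion(store, &counter{}, 5))
}
```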
func (*Repository) Query ¶
func (r *Repository) Query(ctx context.Context, q aggregate.Query) (<-chan aggregate.History, <-chan error, error)
Query queries the event store for events that match the given Query and returns a stream of aggregate Histories and errors. Use the returned Histories to build the current state of the queried aggregates:
var r *Repository
str, errs, err := r.Query(context.TODO(), query.New(...))
// handle err
histories, err := streams.Drain(context.TODO(), str, errs)
// handle err
for _, his := range histories {
	aggregateName := his.AggregateName()
	aggregateID := his.AggregateID()

	// Create the aggregate from its name and UUID
	foo := newFoo(aggregateID)

	// Then apply its History
	his.Apply(foo)
}
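Query returns a value channel and an error channel that have to be consumed together. The sketch below shows one way such a pair of channels might be drained into a slice. The drain function here is a hypothetical helper written for illustration, not the streams.Drain implementation, and demo stands in for a query that yields three histories.

```go
package main

import (
	"context"
	"fmt"
)

// drain collects values from in until it is closed. It returns early with
// the first error received from errs, or when ctx is canceled.
func drain[T any](ctx context.Context, in <-chan T, errs <-chan error) ([]T, error) {
	var out []T
	for {
		select {
		case <-ctx.Done():
			return out, ctx.Err()
		case err, ok := <-errs:
			if !ok {
				// A nil channel blocks forever, which disables this case.
				errs = nil
				continue
			}
			return out, err
		case v, ok := <-in:
			if !ok {
				return out, nil
			}
			out = append(out, v)
		}
	}
}

// demo simulates a query that streams three values and no errors.
func demo() ([]string, error) {
	histories := make(chan string)
	errs := make(chan error)
	go func() {
		defer close(histories)
		defer close(errs)
		for _, name := range []string{"foo", "bar", "baz"} {
			histories <- name
		}
	}()
	return drain(context.Background(), histories, errs)
}

func main() {
	got, err := demo()
	fmt.Println(got, err)
}
```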
type RetryTrigger ¶ added in v0.1.2
type RetryTrigger interface {
// contains filtered or unexported methods
}
A RetryTrigger triggers a retry of Repository.Use().
func RetryApprox ¶ added in v0.1.2
func RetryApprox(interval, deviation time.Duration, maxTries int) RetryTrigger
RetryApprox returns a RetryTrigger that retries approximately every interval, up to maxTries. The provided deviation is used to randomize the interval. If the interval is 1s and the deviation is 100ms, the retry is triggered after somewhere between 900ms and 1100ms.
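The randomization can be pictured as adding a uniformly distributed offset in the range from -deviation to +deviation to the interval. The sketch below illustrates that arithmetic only. The approx and newRNG functions are hypothetical, and the distribution actually used by RetryApprox is not specified here.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// newRNG returns a seeded random source so that runs are reproducible.
func newRNG(seed int64) *rand.Rand {
	return rand.New(rand.NewSource(seed))
}

// approx returns interval plus a random offset in [-deviation, +deviation].
func approx(interval, deviation time.Duration, rng *rand.Rand) time.Duration {
	if deviation <= 0 {
		return interval
	}
	// Int63n(n) yields a value in [0, n), so the offset covers the whole
	// range from -deviation to +deviation inclusive.
	offset := time.Duration(rng.Int63n(2*int64(deviation)+1)) - deviation
	return interval + offset
}

func main() {
	rng := newRNG(1)
	for i := 0; i < 3; i++ {
		fmt.Println(approx(time.Second, 100*time.Millisecond, rng))
	}
}
```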
func RetryEvery ¶ added in v0.1.2
func RetryEvery(interval time.Duration, maxTries int) RetryTrigger
RetryEvery returns a RetryTrigger that retries every interval up to maxTries.
type RetryTriggerFunc ¶ added in v0.1.2
RetryTriggerFunc allows a function to be used as a RetryTrigger.
type Retryer ¶ added in v0.1.2
type Retryer interface {
RetryUse() (RetryTrigger, IsRetryable)
}
Retryer is an aggregate that can retry a failed Repository.Use() operation. If the RetryUse() method of the aggregate returns a non-nil RetryTrigger rt, failed Repository.Use() calls will be retried until rt.next() returns a non-nil error. The returned IsRetryable function is used to determine if the error that made Repository.Use() fail is retryable.
The following example retries calls that failed due to consistency errors. A failed call is retried every second, up to 3 times. If the call does not succeed after 3 tries, the error of the last attempt is returned to the caller.
type Foo struct {
	*aggregate.Base
}

func (f *Foo) RetryUse() (rt repository.RetryTrigger, isRetryable repository.IsRetryable) {
	return repository.RetryEvery(time.Second, 3), aggregate.IsConsistencyError
}
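The interplay between the retry trigger, the IsRetryable check and ChangeDiscarder can be modeled as a simple loop. The sketch below is a simplified, hypothetical model rather than the Repository.Use() implementation. It leaves out the wait between attempts, and it treats maxTries as the total number of attempts, which is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// errConflict stands in for a consistency error.
var errConflict = errors.New("consistency error")

// use calls fn up to maxTries times. After a retryable failure it calls
// discard (the role of DiscardChanges) before the next attempt. Errors that
// are not retryable are returned immediately.
func use(maxTries int, isRetryable func(error) bool, discard func(), fn func() error) error {
	var err error
	for try := 1; try <= maxTries; try++ {
		if err = fn(); err == nil {
			return nil
		}
		if !isRetryable(err) {
			return err
		}
		if try < maxTries {
			discard()
		}
	}
	// All attempts failed: return the error of the last attempt.
	return err
}

func main() {
	attempts := 0
	err := use(
		3,
		func(err error) bool { return errors.Is(err, errConflict) },
		func() { fmt.Println("discarding changes") },
		func() error {
			attempts++
			if attempts < 3 {
				return errConflict
			}
			return nil
		},
	)
	fmt.Println(attempts, err)
}
```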
type TypedRepository ¶ added in v0.1.2
type TypedRepository[Aggregate aggregate.TypedAggregate] struct {
	// contains filtered or unexported fields
}
TypedRepository implements aggregate.TypedRepository. Use Typed to create a typed repository from an underlying repository.
type Foo struct {
	*aggregate.Base
}

func NewFoo(id uuid.UUID) *Foo {
	return &Foo{Base: aggregate.New("foo", id)}
}

var repo aggregate.Repository
typed := repository.Typed(repo, NewFoo)
Now you can use the typed repository like this:
foo, err := typed.Fetch(context.TODO(), uuid.UUID{...})
For comparison, using an untyped repository:
var foo Foo
err := repo.Fetch(context.TODO(), &foo)
func NewOf ¶ added in v0.1.2
func NewOf[Aggregate aggregate.TypedAggregate](r aggregate.Repository, makeFunc func(uuid.UUID) Aggregate) *TypedRepository[Aggregate]
NewOf is an alias for Typed.
func Typed ¶ added in v0.1.2
func Typed[Aggregate aggregate.TypedAggregate](r aggregate.Repository, makeFunc func(uuid.UUID) Aggregate) *TypedRepository[Aggregate]
Typed returns a TypedRepository for the given generic aggregate type. The provided makeFunc is used to initialize aggregates when querying from the event store.
type Foo struct {
	*aggregate.Base
}

func NewFoo(id uuid.UUID) *Foo {
	return &Foo{Base: aggregate.New("foo", id)}
}

var repo aggregate.Repository
typed := repository.Typed(repo, NewFoo)
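To show the role of makeFunc, the sketch below wraps a minimal untyped repository in a generic one. It is a hypothetical model of the idea, not the library's code. It uses string ids instead of uuid.UUID to stay within the standard library, and every type and function in it is a stand-in.

```go
package main

import "fmt"

// aggregate is a hypothetical, minimal aggregate interface.
type aggregate interface {
	AggregateID() string
}

// untypedRepo is a stand-in for an untyped repository.
type untypedRepo interface {
	Fetch(a aggregate) error
}

// typedRepo wraps an untypedRepo and creates aggregates with makeFunc.
type typedRepo[A aggregate] struct {
	repo     untypedRepo
	makeFunc func(id string) A
}

func typed[A aggregate](r untypedRepo, makeFunc func(id string) A) *typedRepo[A] {
	return &typedRepo[A]{repo: r, makeFunc: makeFunc}
}

// Fetch creates the aggregate with makeFunc and lets the untyped
// repository fill in its state.
func (r *typedRepo[A]) Fetch(id string) (A, error) {
	a := r.makeFunc(id)
	return a, r.repo.Fetch(a)
}

// foo is a hypothetical aggregate.
type foo struct {
	id     string
	loaded bool
}

func newFoo(id string) *foo { return &foo{id: id} }

func (f *foo) AggregateID() string { return f.id }

// stubRepo marks every fetched foo as loaded.
type stubRepo struct{}

func (stubRepo) Fetch(a aggregate) error {
	if f, ok := a.(*foo); ok {
		f.loaded = true
	}
	return nil
}

func main() {
	repo := typed[*foo](stubRepo{}, newFoo)
	f, err := repo.Fetch("a")
	fmt.Println(f.id, f.loaded, err)
}
```

The caller gets back a concrete *foo and no longer has to construct the aggregate before fetching it.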
func (*TypedRepository[Aggregate]) Delete ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) Delete(ctx context.Context, a Aggregate) error
Delete implements aggregate.TypedRepository.Delete.
func (*TypedRepository[Aggregate]) Fetch ¶ added in v0.1.2
Fetch implements aggregate.TypedRepository.Fetch.
func (*TypedRepository[Aggregate]) FetchVersion ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) FetchVersion(ctx context.Context, id uuid.UUID, version int) (Aggregate, error)
FetchVersion implements aggregate.TypedRepository.FetchVersion.
func (*TypedRepository[Aggregate]) NewFunc ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) NewFunc() func(uuid.UUID) Aggregate
NewFunc returns the underlying constructor function for the aggregate of this repository.
func (*TypedRepository[Aggregate]) Query ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) Query(ctx context.Context, q aggregate.Query) (<-chan Aggregate, <-chan error, error)
Query implements aggregate.TypedRepository.Query.
func (*TypedRepository[Aggregate]) Repository ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) Repository() aggregate.Repository
Repository returns the underlying, untyped aggregate.Repository.