repository

package
v0.3.2 Latest
Published: May 15, 2023 License: Apache-2.0 Imports: 15 Imported by: 2

Documentation

Constants

This section is empty.

Variables

var (
	// ErrVersionNotFound is returned when trying to fetch an aggregate with a
	// version higher than the current version of the aggregate.
	ErrVersionNotFound = errors.New("version not found")

	// ErrDeleted is returned when trying to fetch an aggregate that has been soft-deleted.
	ErrDeleted = errors.New("aggregate was soft-deleted")
)

Functions

This section is empty.

Types

type ChangeDiscarder added in v0.1.2

type ChangeDiscarder interface {
	DiscardChanges()
}

A ChangeDiscarder discards changes to the aggregate. The DiscardChanges() method is called each time a Repository.Use() call is retried for the aggregate.

type IsRetryable added in v0.1.2

type IsRetryable func(error) bool

IsRetryable is a function that determines if an error within a Repository.Use() call is retryable.

type Option

type Option func(*Repository)

Option is a repository option.
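Because Option is a function that receives the *Repository, it follows the functional options pattern. The following is a minimal sketch of that pattern using hypothetical stand-in types (repo, option, validateConsistency, newRepo); it does not reproduce the package's internals.

```go
package main

import "fmt"

// repo is a hypothetical stand-in for Repository with one configurable field.
type repo struct {
	validateConsistency bool
}

// option mirrors the shape of Option: a function that mutates the repository.
type option func(*repo)

// validateConsistency is a stand-in for an option constructor.
func validateConsistency(v bool) option {
	return func(r *repo) { r.validateConsistency = v }
}

// newRepo sets defaults first and then applies the options in order,
// so later options override earlier ones.
func newRepo(opts ...option) *repo {
	r := &repo{validateConsistency: true}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

func main() {
	fmt.Println(newRepo().validateConsistency)
	fmt.Println(newRepo(validateConsistency(false)).validateConsistency)
}
```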

func AfterInsert

func AfterInsert(fn func(context.Context, aggregate.Aggregate) error) Option

AfterInsert returns an Option that adds fn as a hook to a Repository. fn is called after the changes to an aggregate are inserted into the event store.

func BeforeInsert

func BeforeInsert(fn func(context.Context, aggregate.Aggregate) error) Option

BeforeInsert returns an Option that adds fn as a hook to a Repository. fn is called before the changes to an aggregate are inserted into the event store.

func ModifyQueries

func ModifyQueries(mods ...func(ctx context.Context, q aggregate.Query, prev event.Query) (event.Query, error)) Option

ModifyQueries returns an Option that adds mods as Query modifiers to a Repository. When the Repository builds a Query, it is passed to every modifier before the event store is queried.

func OnDelete

func OnDelete(fn func(context.Context, aggregate.Aggregate) error) Option

OnDelete returns an Option that adds fn as a hook to a Repository. fn is called after an aggregate has been deleted.

func OnFailedInsert

func OnFailedInsert(fn func(context.Context, aggregate.Aggregate, error) error) Option

OnFailedInsert returns an Option that adds fn as a hook to a Repository. fn is called when the Repository fails to insert the changes to an aggregate into the event store.

func ValidateConsistency added in v0.2.7

func ValidateConsistency(validate bool) Option

ValidateConsistency returns an Option that configures a Repository to validate the consistency of aggregate events before inserting the events into the event store. Defaults to true.

func WithSnapshots

func WithSnapshots(store snapshot.Store, s snapshot.Schedule) Option

WithSnapshots returns an Option that adds a Snapshot Store to a Repository.

A Repository that has a Snapshot Store will fetch the latest valid Snapshot for an aggregate before fetching the necessary events to reconstruct the state of the aggregate.

An optional Snapshot Schedule can be provided to instruct the Repository to make and save Snapshots into the Snapshot Store when appropriate:

var events event.Store
var snapshots snapshot.Store
r := repository.New(events, repository.WithSnapshots(snapshots, snapshot.Every(3)))

The example above will make a Snapshot of an aggregate every third version of the aggregate.

Aggregates must implement snapshot.Marshaler & snapshot.Unmarshaler in order for Snapshots to work.
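To make "every third version" concrete, the sketch below shows one plausible way such a schedule could decide when a snapshot is due. snapshotDue is a hypothetical function, and the rule it uses (a multiple of n lies between the old and the new version) is an assumption, not necessarily how snapshot.Every is implemented.

```go
package main

import "fmt"

// snapshotDue reports whether a multiple of n lies in the half-open range
// (oldVersion, newVersion]. Versions are assumed to be non-negative.
func snapshotDue(oldVersion, newVersion, n int) bool {
	if n <= 0 || newVersion <= oldVersion {
		return false
	}
	// Integer division counts how many multiples of n are <= the version.
	return newVersion/n > oldVersion/n
}

func main() {
	for v := 1; v <= 7; v++ {
		fmt.Printf("version %d: snapshot=%v\n", v, snapshotDue(v-1, v, 3))
	}
}
```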

type Repository

type Repository struct {
	// contains filtered or unexported fields
}

Repository provides an event-sourced aggregate repository for persisting and querying aggregates. It uses an event.Store to persist and query aggregates. Repository supports snapshots, hooks for inserting events, and query modifiers. It also supports deleting an aggregate by deleting its events from the event store. Use the Query method to query the event store for events that match a given query and use the returned Histories to build the current state of the queried aggregates.

func New

func New(store event.Store, opts ...Option) *Repository

New returns an event-sourced aggregate Repository. It uses the provided event Store to persist and query aggregates.

func (*Repository) Delete

func (r *Repository) Delete(ctx context.Context, a aggregate.Aggregate) error

Delete deletes an aggregate by deleting its events from the event store.

func (*Repository) Fetch

func (r *Repository) Fetch(ctx context.Context, a aggregate.Aggregate) error

Fetch fetches the events of the provided aggregate from the event store and applies them to it to build its current state.

It is allowed to pass an aggregate that doesn't have any events in the event store yet.

It is also allowed to pass an aggregate that already has events applied to it. Only events with a version higher than the current version of the passed aggregate are fetched from the event store.

func (*Repository) FetchVersion

func (r *Repository) FetchVersion(ctx context.Context, a aggregate.Aggregate, v int) error

FetchVersion does the same as r.Fetch, but only fetches events up until the given version v. If the event store has no event for the provided aggregate with the requested version, ErrVersionNotFound is returned.

func (*Repository) Query

func (r *Repository) Query(ctx context.Context, q aggregate.Query) (<-chan aggregate.History, <-chan error, error)

Query queries the event store for events that match the given Query and returns a stream of aggregate Histories and errors. Use the returned Histories to build the current state of the queried aggregates:

var r *Repository
str, errs, err := r.Query(context.TODO(), query.New(...))
// handle err
histories, err := streams.Drain(context.TODO(), str, errs)
// handle err
for _, his := range histories {
	aggregateName := his.AggregateName()
	aggregateID := his.AggregateID()

	// Create the aggregate from its name and UUID
	foo := newFoo(aggregateID)

	// Then apply its History
	his.Apply(foo)
}

func (*Repository) Save

func (r *Repository) Save(ctx context.Context, a aggregate.Aggregate) error

Save saves the changes to an aggregate into the underlying event store and flushes its changes afterwards (by calling a.FlushChanges).

func (*Repository) Use

func (r *Repository) Use(ctx context.Context, a aggregate.Aggregate, fn func() error) error

Use first fetches the aggregate a, then calls fn, and finally saves the aggregate. If the aggregate implements Retryer, a failed Use() call is retried as determined by the RetryTrigger returned from its RetryUse() method, for example up to the configured maxTries.

type RetryTrigger added in v0.1.2

type RetryTrigger interface {
	// contains filtered or unexported methods
}

A RetryTrigger triggers a retry of Repository.Use().

func RetryApprox added in v0.1.2

func RetryApprox(interval, deviation time.Duration, maxTries int) RetryTrigger

RetryApprox returns a RetryTrigger that retries approximately every interval up to maxTries. The provided deviation is used to randomize the interval. If the interval is 1s and deviation is 100ms, then the retry is triggered after somewhere between 900ms and 1100ms.

func RetryEvery added in v0.1.2

func RetryEvery(interval time.Duration, maxTries int) RetryTrigger

RetryEvery returns a RetryTrigger that retries every interval up to maxTries.

type RetryTriggerFunc added in v0.1.2

type RetryTriggerFunc func(context.Context) error

RetryTriggerFunc allows a function to be used as a RetryTrigger.

type Retryer added in v0.1.2

type Retryer interface {
	RetryUse() (RetryTrigger, IsRetryable)
}

Retryer is an aggregate that can retry a failed Repository.Use() operation. If the RetryUse() method of the aggregate returns a non-nil RetryTrigger rt, failed Repository.Use() calls will be retried until rt.next() returns a non-nil error. The returned IsRetryable function is used to determine if the error that made Repository.Use() fail is retryable.

The following example retries calls that failed due to consistency errors. It is retried every second, up to 3 times before giving up. If the call does not succeed after 3 tries, the error of the last attempt is returned to the caller.

type Foo struct { *aggregate.Base }
func (f *Foo) RetryUse() (rt repository.RetryTrigger, isRetryable repository.IsRetryable) {
  return repository.RetryEvery(time.Second, 3), aggregate.IsConsistencyError
}

type TypedRepository added in v0.1.2

type TypedRepository[Aggregate aggregate.TypedAggregate] struct {
	// contains filtered or unexported fields
}

TypedRepository implements aggregate.TypedRepository. Use Typed to create a typed repository from an underlying repository.

type Foo struct { *aggregate.Base }
func NewFoo(id uuid.UUID) *Foo { return &Foo{Base: aggregate.New("foo", id)} }

var repo aggregate.Repository
typed := repository.Typed(repo, NewFoo)

Now you can use the typed repository like this:

foo, err := typed.Fetch(context.TODO(), uuid.UUID{...})

For comparison, using an untyped repository:

foo := NewFoo(uuid.UUID{...})
err := repo.Fetch(context.TODO(), foo)

func NewOf added in v0.1.2

func NewOf[Aggregate aggregate.TypedAggregate](r aggregate.Repository, makeFunc func(uuid.UUID) Aggregate) *TypedRepository[Aggregate]

NewOf is an alias for Typed.

func Typed added in v0.1.2

func Typed[Aggregate aggregate.TypedAggregate](r aggregate.Repository, makeFunc func(uuid.UUID) Aggregate) *TypedRepository[Aggregate]

Typed returns a TypedRepository for the given generic aggregate type. The provided makeFunc is used to initialize aggregates when querying from the event store.

type Foo struct { *aggregate.Base }
func NewFoo(id uuid.UUID) *Foo { return &Foo{Base: aggregate.New("foo", id)} }

var repo aggregate.Repository
typed := repository.Typed(repo, NewFoo)

func (*TypedRepository[Aggregate]) Delete added in v0.1.2

func (r *TypedRepository[Aggregate]) Delete(ctx context.Context, a Aggregate) error

Delete implements aggregate.TypedRepository.Delete.

func (*TypedRepository[Aggregate]) Fetch added in v0.1.2

func (r *TypedRepository[Aggregate]) Fetch(ctx context.Context, id uuid.UUID) (Aggregate, error)

Fetch implements aggregate.TypedRepository.Fetch.

func (*TypedRepository[Aggregate]) FetchVersion added in v0.1.2

func (r *TypedRepository[Aggregate]) FetchVersion(ctx context.Context, id uuid.UUID, version int) (Aggregate, error)

FetchVersion implements aggregate.TypedRepository.FetchVersion.

func (*TypedRepository[Aggregate]) NewFunc added in v0.1.2

func (r *TypedRepository[Aggregate]) NewFunc() func(uuid.UUID) Aggregate

NewFunc returns the underlying constructor function for the aggregate of this repository.

func (*TypedRepository[Aggregate]) Query added in v0.1.2

func (r *TypedRepository[Aggregate]) Query(ctx context.Context, q aggregate.Query) (<-chan Aggregate, <-chan error, error)

Query implements aggregate.TypedRepository.Query.

func (*TypedRepository[Aggregate]) Repository added in v0.1.2

func (r *TypedRepository[Aggregate]) Repository() aggregate.Repository

Repository returns the underlying, untyped aggregate.Repository.

func (*TypedRepository[Aggregate]) Save added in v0.1.2

func (r *TypedRepository[Aggregate]) Save(ctx context.Context, a Aggregate) error

Save implements aggregate.TypedRepository.Save.

func (*TypedRepository[Aggregate]) Use added in v0.1.2

func (r *TypedRepository[Aggregate]) Use(ctx context.Context, id uuid.UUID, fn func(Aggregate) error) error

Use implements aggregate.TypedRepository.Use.
