repository

package
v0.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 14, 2023 License: Apache-2.0 Imports: 15 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrVersionNotFound is an error returned when a requested aggregate version
	// cannot be found in the event store or snapshot store.
	ErrVersionNotFound = errors.New("version not found")

	// ErrDeleted is an error returned when an aggregate has been soft-deleted and
	// an operation on the deleted aggregate is attempted.
	ErrDeleted = errors.New("aggregate was soft-deleted")
)

Functions

This section is empty.

Types

type ChangeDiscarder added in v0.1.2

type ChangeDiscarder interface {
	// DiscardChanges discards any changes made to the underlying data within the
	// ChangeDiscarder, effectively reverting it to its original state before the
	// changes were made.
	DiscardChanges()
}

ChangeDiscarder is an interface that provides a method for discarding changes in a repository.

type IsRetryable added in v0.1.2

type IsRetryable func(error) bool

IsRetryable is a function type that determines if an error should trigger a retry. It returns true if the error is considered retryable, and false otherwise. It is typically used in conjunction with RetryTrigger to implement custom retry strategies in a Retryer interface.

type Option

type Option func(*Repository)

Option is a function that modifies the configuration of a Repository. It is used to customize the behavior of a Repository by providing hooks, enabling consistency validation, modifying event queries, and configuring snapshot handling.

func AfterInsert

func AfterInsert(fn func(context.Context, aggregate.Aggregate) error) Option

AfterInsert appends a function to the Repository's afterInsert slice. The function will be called after an Aggregate's events are successfully inserted into the event store during the Save operation.

func BeforeInsert

func BeforeInsert(fn func(context.Context, aggregate.Aggregate) error) Option

BeforeInsert is an Option for the Repository that appends a function to the beforeInsert slice. The function is called with the aggregate and context before its events are inserted into the event store. If the function returns an error, the insertion of events is aborted and the error is returned.

func ModifyQueries

func ModifyQueries(mods ...func(ctx context.Context, q aggregate.Query, prev event.Query) (event.Query, error)) Option

ModifyQueries appends the provided query modifiers to the Repository's queryModifiers slice. These modifiers are applied to event queries when executing an aggregate.Query with the Repository.

func OnDelete

func OnDelete(fn func(context.Context, aggregate.Aggregate) error) Option

OnDelete appends a function to the Repository's onDelete slice. The function will be called with the aggregate and context when the Delete method is called for the given aggregate. If the function returns an error, the deletion process is halted and the error is returned.

func OnFailedInsert

func OnFailedInsert(fn func(context.Context, aggregate.Aggregate, error) error) Option

OnFailedInsert is an Option for the Repository that appends a function to the onFailedInsert slice. The function is called with the aggregate, context and error if an error occurs during the insertion of events into the event store. If the function returns an error, that error is returned, otherwise the original insertion error is returned.

func ValidateConsistency added in v0.2.7

func ValidateConsistency(validate bool) Option

ValidateConsistency is an Option for the Repository that configures whether consistency validation should be performed when saving an Aggregate. If set to true (default), the Repository will validate consistency before inserting events into the event store. If set to false, consistency validation will be skipped.

func WithSnapshots

func WithSnapshots(store snapshot.Store, s snapshot.Schedule) Option

WithSnapshots configures the Repository to use the provided snapshot.Store and snapshot.Schedule for saving and loading aggregate snapshots. The function panics if the provided snapshot.Store is nil.

type Repository

type Repository struct {
	// contains filtered or unexported fields
}

Repository is responsible for saving, fetching, and deleting aggregates while handling snapshots, consistency validation, and various hooks. It supports querying events from the event store, applying an aggregate's event history, and managing aggregate versions. Additionally, it provides customizable options to modify queries and manage hooks before and after inserting events or on failed insertions.

func New

func New(store event.Store, opts ...Option) *Repository

New creates a new Repository instance with the provided event.Store and options. The Repository is used for saving, fetching, and deleting aggregates while handling snapshots, consistency validation, and various hooks.

func (*Repository) Delete

func (r *Repository) Delete(ctx context.Context, a aggregate.Aggregate) error

Delete fetches the aggregate's events from the event store, deletes them, and calls OnDelete hooks. It returns an error if the deletion fails or any of the OnDelete hooks return an error.

func (*Repository) Fetch

Fetch retrieves the latest state of the provided aggregate by applying its event history. If the aggregate implements snapshot.Target and a snapshot store is configured, Fetch loads the latest snapshot and applies events that occurred after the snapshot was taken.

func (*Repository) FetchVersion

func (r *Repository) FetchVersion(ctx context.Context, a aggregate.Aggregate, v int) error

FetchVersion fetches the specified version of the aggregate from the event store and applies its history. It returns ErrVersionNotFound if the requested version is not found, and ErrDeleted if the aggregate was soft-deleted.

func (*Repository) Query

func (r *Repository) Query(ctx context.Context, q aggregate.Query) (<-chan aggregate.History, <-chan error, error)

Query returns a channel of aggregate.History and a channel of errors by executing the provided aggregate.Query. An error is returned if there is an issue with constructing the event.Query or querying events from the event store.

func (*Repository) Save

Save stores the changes of an Aggregate into the event store and creates a snapshot of the Aggregate if the snapshot schedule is met. It validates consistency and calls the appropriate hooks before and after inserting events. If an error occurs, it calls the OnFailedInsert hook.

func (*Repository) Use

func (r *Repository) Use(ctx context.Context, a aggregate.Aggregate, fn func() error) error

Use fetches an aggregate, executes the provided function, and saves the aggregate. It retries the process if the aggregate is a Retryer and an IsRetryable error occurs.

type RetryTrigger added in v0.1.2

type RetryTrigger interface {
	// contains filtered or unexported methods
}

RetryTrigger is an interface that defines a method for determining the next attempt's timing in a retryable operation. It is used in conjunction with IsRetryable to implement custom retry strategies within a Retryer interface.

func RetryApprox added in v0.1.2

func RetryApprox(interval, deviation time.Duration, maxTries int) RetryTrigger

RetryApprox creates a RetryTrigger that retries an operation with a randomized interval between attempts. The interval is calculated by adding a random percentage of deviation to the base interval, and the maximum number of attempts is specified by maxTries.

func RetryEvery added in v0.1.2

func RetryEvery(interval time.Duration, maxTries int) RetryTrigger

RetryEvery returns a RetryTrigger that retries an operation at a fixed interval for a specified number of maxTries. The operation will be retried until the maximum number of tries is reached, or the provided context is canceled.

type RetryTriggerFunc added in v0.1.2

type RetryTriggerFunc func(context.Context) error

RetryTriggerFunc is a function type that implements the RetryTrigger interface, allowing users to define custom logic for determining the timing of retry attempts in a Retryer. It takes a context as an input and returns an error if any occurs during its execution.

type Retryer added in v0.1.2

type Retryer interface {
	// RetryUse returns a RetryTrigger and an IsRetryable function for use in
	// retrying operations. The RetryTrigger determines the next attempt's timing,
	// and the IsRetryable function checks if an error is retryable.
	RetryUse() (RetryTrigger, IsRetryable)
}

Retryer is an interface for providing a retry mechanism in operations. It consists of a RetryTrigger, which determines the timing of the next attempt, and an IsRetryable function, which checks if an error should be retried.

type TypedRepository added in v0.1.2

type TypedRepository[Aggregate aggregate.TypedAggregate] struct {
	// contains filtered or unexported fields
}

TypedRepository is a wrapper around aggregate.Repository that works with specific typed aggregates. It provides methods to save, fetch, query, and delete typed aggregates and utilizes an aggregate factory function to instantiate aggregates when needed.

func NewOf added in v0.1.2

func NewOf[Aggregate aggregate.TypedAggregate](r aggregate.Repository, makeFunc func(uuid.UUID) Aggregate) *TypedRepository[Aggregate]

NewOf creates a new TypedRepository for the specified aggregate.TypedAggregate using the provided aggregate.Repository and factory function to instantiate Aggregates with a given UUID.

func Typed added in v0.1.2

func Typed[Aggregate aggregate.TypedAggregate](r aggregate.Repository, makeFunc func(uuid.UUID) Aggregate) *TypedRepository[Aggregate]

Typed returns a new TypedRepository for the provided aggregate.Repository and makeFunc. The makeFunc is used to create new instances of the typed Aggregate with a given UUID when fetching them from the underlying aggregate.Repository.

func (*TypedRepository[Aggregate]) Delete added in v0.1.2

func (r *TypedRepository[Aggregate]) Delete(ctx context.Context, a Aggregate) error

Delete removes the specified Aggregate from the repository.

func (*TypedRepository[Aggregate]) Fetch added in v0.1.2

func (r *TypedRepository[Aggregate]) Fetch(ctx context.Context, id uuid.UUID) (Aggregate, error)

Fetch retrieves the latest version of an Aggregate with the given UUID from the repository, returning the fetched Aggregate and any error encountered.

func (*TypedRepository[Aggregate]) FetchVersion added in v0.1.2

func (r *TypedRepository[Aggregate]) FetchVersion(ctx context.Context, id uuid.UUID, version int) (Aggregate, error)

FetchVersion retrieves an Aggregate with the specified ID and version from the TypedRepository, returning an error if it cannot be fetched.

func (*TypedRepository[Aggregate]) NewFunc added in v0.1.2

func (r *TypedRepository[Aggregate]) NewFunc() func(uuid.UUID) Aggregate

NewFunc returns the function that creates a new typed Aggregate with the given UUID. This function is used by the TypedRepository to instantiate Aggregates when fetching them from the underlying aggregate.Repository.

func (*TypedRepository[Aggregate]) Query added in v0.1.2

func (r *TypedRepository[Aggregate]) Query(ctx context.Context, q aggregate.Query) (<-chan Aggregate, <-chan error, error)

Query retrieves Aggregates from the underlying aggregate.Repository that match the specified aggregate.Query. It returns channels for receiving the Aggregates and any errors encountered during the query, as well as an error if the query itself cannot be executed.

func (*TypedRepository[Aggregate]) Repository added in v0.1.2

func (r *TypedRepository[Aggregate]) Repository() aggregate.Repository

Repository returns the underlying aggregate.Repository of the TypedRepository.

func (*TypedRepository[Aggregate]) Save added in v0.1.2

func (r *TypedRepository[Aggregate]) Save(ctx context.Context, a Aggregate) error

Save stores the given Aggregate in the repository and returns an error if the operation fails.

func (*TypedRepository[Aggregate]) Use added in v0.1.2

func (r *TypedRepository[Aggregate]) Use(ctx context.Context, id uuid.UUID, fn func(Aggregate) error) error

Use retrieves an Aggregate with the specified ID from the TypedRepository, applies the given function to it, and returns any error encountered during this process.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL