Documentation ¶
Index ¶
- Variables
- type CachedRepository
- type ChangeDiscarder
- type IsRetryable
- type Option
- func AfterInsert(fn func(context.Context, aggregate.Aggregate) error) Option
- func BeforeInsert(fn func(context.Context, aggregate.Aggregate) error) Option
- func ModifyQueries(...) Option
- func OnDelete(fn func(context.Context, aggregate.Aggregate) error) Option
- func OnFailedInsert(fn func(context.Context, aggregate.Aggregate, error) error) Option
- func ValidateConsistency(validate bool) Option
- func WithSnapshots(store snapshot.Store, s snapshot.Schedule) Option
- type Repository
- func (r *Repository) Delete(ctx context.Context, a aggregate.Aggregate) error
- func (r *Repository) Fetch(ctx context.Context, a aggregate.Aggregate) error
- func (r *Repository) FetchVersion(ctx context.Context, a aggregate.Aggregate, v int) error
- func (r *Repository) Query(ctx context.Context, q aggregate.Query) (<-chan aggregate.History, <-chan error, error)
- func (r *Repository) Save(ctx context.Context, a aggregate.Aggregate) error
- func (r *Repository) Use(ctx context.Context, a aggregate.Aggregate, fn func() error) error
- type RetryTrigger
- type RetryTriggerFunc
- type Retryer
- type TypedRepository
- func (r *TypedRepository[Aggregate]) Delete(ctx context.Context, a Aggregate) error
- func (r *TypedRepository[Aggregate]) Fetch(ctx context.Context, id uuid.UUID) (Aggregate, error)
- func (r *TypedRepository[Aggregate]) FetchVersion(ctx context.Context, id uuid.UUID, version int) (Aggregate, error)
- func (r *TypedRepository[Aggregate]) NewFunc() func(uuid.UUID) Aggregate
- func (r *TypedRepository[Aggregate]) Query(ctx context.Context, q aggregate.Query) (<-chan Aggregate, <-chan error, error)
- func (r *TypedRepository[Aggregate]) Repository() aggregate.Repository
- func (r *TypedRepository[Aggregate]) Save(ctx context.Context, a Aggregate) error
- func (r *TypedRepository[Aggregate]) Use(ctx context.Context, id uuid.UUID, fn func(Aggregate) error) error
Constants ¶
This section is empty.
Variables ¶
var ( // ErrVersionNotFound is an error returned when a requested aggregate version // cannot be found in the event store or snapshot store. ErrVersionNotFound = errors.New("version not found") // ErrDeleted is an error returned when an aggregate has been soft-deleted and // an operation on the deleted aggregate is attempted. ErrDeleted = errors.New("aggregate was soft-deleted") )
Functions ¶
This section is empty.
Types ¶
type CachedRepository ¶ added in v0.4.4
type CachedRepository[Aggregate aggregate.TypedAggregate] struct { aggregate.TypedRepository[Aggregate] // contains filtered or unexported fields }
CachedRepository is a type that provides a caching layer over an underlying repository of typed aggregates. It stores fetched aggregates in memory to reduce the need for repeated fetches from the wrapped repository. It uses UUIDs as keys to access stored aggregates. CachedRepository is safe for concurrent use.
CachedRepository currently only caches calls to Fetch.
func Cached ¶ added in v0.4.4
func Cached[Aggregate aggregate.TypedAggregate](repo aggregate.TypedRepository[Aggregate]) *CachedRepository[Aggregate]
Cached returns a new CachedRepository. If the provided repository is already a CachedRepository, it is returned as is. Otherwise, a new CachedRepository is created with the provided repository as its underlying repository. The returned CachedRepository uses an in-memory cache to avoid unnecessary fetches from the underlying repository.
func (*CachedRepository[Aggregate]) Clear ¶ added in v0.4.4
func (repo *CachedRepository[Aggregate]) Clear(ids ...uuid.UUID)
Clear removes the specified aggregates from the cache of a CachedRepository. If no UUIDs are provided, it clears all aggregates from the cache. This operation is safe for concurrent use.
func (*CachedRepository[Aggregate]) Fetch ¶ added in v0.4.4
func (repo *CachedRepository[Aggregate]) Fetch(ctx context.Context, id uuid.UUID) (Aggregate, error)
Fetch retrieves an aggregate of type Aggregate from the CachedRepository. If the aggregate is present in the cache, it's returned directly. Otherwise, Fetch retrieves the aggregate from the underlying TypedRepository, stores it in the cache for future retrievals, and then returns it. An error is returned if there was a problem fetching the aggregate from the TypedRepository.
type ChangeDiscarder ¶ added in v0.1.2
type ChangeDiscarder interface { // DiscardChanges discards any changes made to the underlying data within the // ChangeDiscarder, effectively reverting it to its original state before the // changes were made. DiscardChanges() }
ChangeDiscarder is an interface that provides a method for discarding changes in a repository.
type IsRetryable ¶ added in v0.1.2
IsRetryable is a function type that determines if an error should trigger a retry. It returns true if the error is considered retryable, and false otherwise. It is typically used in conjunction with RetryTrigger to implement custom retry strategies in a Retryer interface.
type Option ¶
type Option func(*Repository)
Option is a function that modifies the configuration of a Repository. It is used to customize the behavior of a Repository by providing hooks, enabling consistency validation, modifying event queries, and configuring snapshot handling.
func AfterInsert ¶
AfterInsert appends a function to the Repository's afterInsert slice. The function will be called after an Aggregate's events are successfully inserted into the event store during the Save operation.
func BeforeInsert ¶
BeforeInsert is an Option for the Repository that appends a function to the beforeInsert slice. The function is called with the aggregate and context before its events are inserted into the event store. If the function returns an error, the insertion of events is aborted and the error is returned.
func ModifyQueries ¶
func ModifyQueries(mods ...func(ctx context.Context, q aggregate.Query, prev event.Query) (event.Query, error)) Option
ModifyQueries appends the provided query modifiers to the Repository's queryModifiers slice. These modifiers are applied to event queries when executing an aggregate.Query with the Repository.
func OnDelete ¶
OnDelete appends a function to the Repository's onDelete slice. The function will be called with the aggregate and context when the Delete method is called for the given aggregate. If the function returns an error, the deletion process is halted and the error is returned.
func OnFailedInsert ¶
OnFailedInsert is an Option for the Repository that appends a function to the onFailedInsert slice. The function is called with the aggregate, context and error if an error occurs during the insertion of events into the event store. If the function returns an error, that error is returned, otherwise the original insertion error is returned.
func ValidateConsistency ¶ added in v0.2.7
ValidateConsistency is an Option for the Repository that configures whether consistency validation should be performed when saving an Aggregate. If set to true (default), the Repository will validate consistency before inserting events into the event store. If set to false, consistency validation will be skipped.
type Repository ¶
type Repository struct {
// contains filtered or unexported fields
}
Repository is responsible for saving, fetching, and deleting aggregates while handling snapshots, consistency validation, and various hooks. It supports querying events from the event store, applying an aggregate's event history, and managing aggregate versions. Additionally, it provides customizable options to modify queries and manage hooks before and after inserting events or on failed insertions.
func New ¶
func New(store event.Store, opts ...Option) *Repository
New creates a new Repository instance with the provided event.Store and options. The Repository is used for saving, fetching, and deleting aggregates while handling snapshots, consistency validation, and various hooks.
func (*Repository) Delete ¶
Delete fetches the aggregate's events from the event store, deletes them, and calls OnDelete hooks. It returns an error if the deletion fails or any of the OnDelete hooks return an error.
func (*Repository) Fetch ¶
Fetch retrieves the latest state of the provided aggregate by applying its event history. If the aggregate implements snapshot.Target and a snapshot store is configured, Fetch loads the latest snapshot and applies events that occurred after the snapshot was taken.
func (*Repository) FetchVersion ¶
FetchVersion fetches the specified version of the aggregate from the event store and applies its history. It returns ErrVersionNotFound if the requested version is not found, and ErrDeleted if the aggregate was soft-deleted.
func (*Repository) Query ¶
func (r *Repository) Query(ctx context.Context, q aggregate.Query) (<-chan aggregate.History, <-chan error, error)
Query returns a channel of aggregate.History and a channel of errors by executing the provided aggregate.Query. An error is returned if there is an issue with constructing the event.Query or querying events from the event store.
func (*Repository) Save ¶
Save stores the changes of an Aggregate into the event store and creates a snapshot of the Aggregate if the snapshot schedule is met. It validates consistency and calls the appropriate hooks before and after inserting events. If an error occurs, it calls the OnFailedInsert hook.
type RetryTrigger ¶ added in v0.1.2
type RetryTrigger interface {
// contains filtered or unexported methods
}
RetryTrigger is an interface that defines a method for determining the next attempt's timing in a retryable operation. It is used in conjunction with IsRetryable to implement custom retry strategies within a Retryer interface.
func RetryApprox ¶ added in v0.1.2
func RetryApprox(interval, deviation time.Duration, maxTries int) RetryTrigger
RetryApprox creates a RetryTrigger that retries an operation with a randomized interval between attempts. The interval is calculated by adding a random percentage of deviation to the base interval, and the maximum number of attempts is specified by maxTries.
func RetryEvery ¶ added in v0.1.2
func RetryEvery(interval time.Duration, maxTries int) RetryTrigger
RetryEvery returns a RetryTrigger that retries an operation at a fixed interval for a specified number of maxTries. The operation will be retried until the maximum number of tries is reached, or the provided context is canceled.
type RetryTriggerFunc ¶ added in v0.1.2
RetryTriggerFunc is a function type that implements the RetryTrigger interface, allowing users to define custom logic for determining the timing of retry attempts in a Retryer. It takes a context as an input and returns an error if any occurs during its execution.
type Retryer ¶ added in v0.1.2
type Retryer interface { // RetryUse returns a RetryTrigger and an IsRetryable function for use in // retrying operations. The RetryTrigger determines the next attempt's timing, // and the IsRetryable function checks if an error is retryable. RetryUse() (RetryTrigger, IsRetryable) }
Retryer is an interface for providing a retry mechanism in operations. It consists of a RetryTrigger, which determines the timing of the next attempt, and an IsRetryable function, which checks if an error should be retried.
type TypedRepository ¶ added in v0.1.2
type TypedRepository[Aggregate aggregate.TypedAggregate] struct { // contains filtered or unexported fields }
TypedRepository is a specialized version of an aggregate repository that deals with a specific type of aggregate. It provides methods for saving, fetching, querying and deleting aggregates of a certain type. The type of the aggregate this repository works with is determined by the factory function passed during the creation of the TypedRepository instance. It also provides access to the underlying generic repository and the factory function used to create new instances of the aggregate. The provided context is used for cancellation and timeout handling.
func NewOf ¶ added in v0.1.2
func NewOf[Aggregate aggregate.TypedAggregate](r aggregate.Repository, makeFunc func(uuid.UUID) Aggregate) *TypedRepository[Aggregate]
NewOf creates a new instance of a TypedRepository for the specified Aggregate type. The provided aggregate.Repository and make function are used to construct the TypedRepository. The make function is used to create new instances of the Aggregate when needed.
func Typed ¶ added in v0.1.2
func Typed[Aggregate aggregate.TypedAggregate](r aggregate.Repository, makeFunc func(uuid.UUID) Aggregate) *TypedRepository[Aggregate]
Typed constructs a new TypedRepository for the provided aggregate.Repository and makeFunc. The makeFunc is used to create new instances of Aggregate. The returned TypedRepository only operates on Aggregates of the type created by makeFunc.
func (*TypedRepository[Aggregate]) Delete ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) Delete(ctx context.Context, a Aggregate) error
Delete removes the specified Aggregate from the TypedRepository. It operates within the context passed to it. If any error occurs during the deletion, it is returned for handling.
func (*TypedRepository[Aggregate]) Fetch ¶ added in v0.1.2
Fetch retrieves the aggregate of the specified type and identifier from the repository. It returns the fetched aggregate and any error encountered during the process. If the aggregate does not exist, Fetch returns an error.
func (*TypedRepository[Aggregate]) FetchVersion ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) FetchVersion(ctx context.Context, id uuid.UUID, version int) (Aggregate, error)
FetchVersion retrieves a specific version of the aggregate identified by its UUID from the repository. The function returns an error if the requested version does not exist or if there is an issue fetching it from the repository.
func (*TypedRepository[Aggregate]) NewFunc ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) NewFunc() func(uuid.UUID) Aggregate
NewFunc returns a function that creates a new instance of the Aggregate type. The returned function takes a UUID as an argument and returns an Aggregate. The UUID is used to identify the new Aggregate instance.
func (*TypedRepository[Aggregate]) Query ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) Query(ctx context.Context, q aggregate.Query) (<-chan Aggregate, <-chan error, error)
Query returns a channel of Aggregates and a channel of errors found during the query execution. The Aggregates are retrieved from the underlying repository and are of the type that the TypedRepository is configured for. The returned Aggregates are created with the factory function provided during the construction of the TypedRepository. If an Aggregate is not of the expected type, it's skipped. The query execution can be cancelled through the provided context. The function also returns an error if there's an issue executing the query on the underlying repository.
func (*TypedRepository[Aggregate]) Repository ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) Repository() aggregate.Repository
Repository returns the underlying aggregate.Repository of the TypedRepository. This allows direct access to the repository that stores and retrieves the aggregates.
func (*TypedRepository[Aggregate]) Save ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) Save(ctx context.Context, a Aggregate) error
Save stores the provided aggregate into the repository. It takes a context for cancellation and deadline purposes and the aggregate to be stored. Returns an error if the saving process encounters any issues.
func (*TypedRepository[Aggregate]) Use ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) Use(ctx context.Context, id uuid.UUID, fn func(Aggregate) error) error
Use retrieves an [Aggregate] with the provided UUID, applies the function fn to it, and then saves the [Aggregate] back into the repository. If fn returns an error, the [Aggregate] is not saved and the error is returned. The operation is performed in a context, and its execution may be cancelled if the context is closed.