Documentation ¶
Index ¶
- Variables
- type ChangeDiscarder
- type IsRetryable
- type Option
- func AfterInsert(fn func(context.Context, aggregate.Aggregate) error) Option
- func BeforeInsert(fn func(context.Context, aggregate.Aggregate) error) Option
- func ModifyQueries(...) Option
- func OnDelete(fn func(context.Context, aggregate.Aggregate) error) Option
- func OnFailedInsert(fn func(context.Context, aggregate.Aggregate, error) error) Option
- func ValidateConsistency(validate bool) Option
- func WithSnapshots(store snapshot.Store, s snapshot.Schedule) Option
- type Repository
- func (r *Repository) Delete(ctx context.Context, a aggregate.Aggregate) error
- func (r *Repository) Fetch(ctx context.Context, a aggregate.Aggregate) error
- func (r *Repository) FetchVersion(ctx context.Context, a aggregate.Aggregate, v int) error
- func (r *Repository) Query(ctx context.Context, q aggregate.Query) (<-chan aggregate.History, <-chan error, error)
- func (r *Repository) Save(ctx context.Context, a aggregate.Aggregate) error
- func (r *Repository) Use(ctx context.Context, a aggregate.Aggregate, fn func() error) error
- type RetryTrigger
- type RetryTriggerFunc
- type Retryer
- type TypedRepository
- func (r *TypedRepository[Aggregate]) Delete(ctx context.Context, a Aggregate) error
- func (r *TypedRepository[Aggregate]) Fetch(ctx context.Context, id uuid.UUID) (Aggregate, error)
- func (r *TypedRepository[Aggregate]) FetchVersion(ctx context.Context, id uuid.UUID, version int) (Aggregate, error)
- func (r *TypedRepository[Aggregate]) NewFunc() func(uuid.UUID) Aggregate
- func (r *TypedRepository[Aggregate]) Query(ctx context.Context, q aggregate.Query) (<-chan Aggregate, <-chan error, error)
- func (r *TypedRepository[Aggregate]) Repository() aggregate.Repository
- func (r *TypedRepository[Aggregate]) Save(ctx context.Context, a Aggregate) error
- func (r *TypedRepository[Aggregate]) Use(ctx context.Context, id uuid.UUID, fn func(Aggregate) error) error
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrVersionNotFound is an error returned when a requested aggregate version
	// cannot be found in the event store or snapshot store.
	ErrVersionNotFound = errors.New("version not found")

	// ErrDeleted is an error returned when an aggregate has been soft-deleted and
	// an operation on the deleted aggregate is attempted.
	ErrDeleted = errors.New("aggregate was soft-deleted")
)
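Callers usually want to tell these two sentinel errors apart. The following is a minimal sketch of doing so with errors.Is. It redeclares the two errors locally so that it runs without the package, and fetchVersion is a hypothetical stand-in for a repository call. Whether the real repository wraps the sentinels is not stated here, so the sketch wraps them to show that errors.Is matches either way.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors (same messages).
var (
	ErrVersionNotFound = errors.New("version not found")
	ErrDeleted         = errors.New("aggregate was soft-deleted")
)

// fetchVersion is a hypothetical stand-in for a repository call.
// It pretends that versions 0..3 exist and that negative versions
// belong to a soft-deleted aggregate.
func fetchVersion(v int) error {
	switch {
	case v < 0:
		return fmt.Errorf("fetch version %d: %w", v, ErrDeleted)
	case v > 3:
		return fmt.Errorf("fetch version %d: %w", v, ErrVersionNotFound)
	}
	return nil
}

// classify maps an error to a short label using errors.Is, which also
// matches sentinels that were wrapped with %w.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrVersionNotFound):
		return "missing version"
	case errors.Is(err, ErrDeleted):
		return "deleted"
	default:
		return "other"
	}
}

func main() {
	fmt.Println(classify(fetchVersion(2)))
	fmt.Println(classify(fetchVersion(7)))
	fmt.Println(classify(fetchVersion(-1)))
}
```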
Functions ¶
This section is empty.
Types ¶
type ChangeDiscarder ¶ added in v0.1.2
type ChangeDiscarder interface {
	// DiscardChanges discards any changes made to the underlying data within the
	// ChangeDiscarder, effectively reverting it to its original state before the
	// changes were made.
	DiscardChanges()
}
ChangeDiscarder is an interface that provides a method for discarding changes in a repository.
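As an illustration, a type might satisfy this interface as sketched below. The counter type and its add method are hypothetical, and the interface is redeclared locally so that the example is self-contained.

```go
package main

import "fmt"

// changeDiscarder mirrors the shape of the ChangeDiscarder interface.
type changeDiscarder interface {
	DiscardChanges()
}

// counter is a hypothetical aggregate-like type that records every
// uncommitted change so it can be reverted later.
type counter struct {
	value   int
	changes []int
}

// add applies a change and remembers it as uncommitted.
func (c *counter) add(n int) {
	c.value += n
	c.changes = append(c.changes, n)
}

// DiscardChanges undoes all recorded changes and clears the record.
func (c *counter) DiscardChanges() {
	for _, n := range c.changes {
		c.value -= n
	}
	c.changes = nil
}

func main() {
	c := &counter{value: 10}
	c.add(5)
	c.add(7)

	var d changeDiscarder = c
	d.DiscardChanges()
	fmt.Println(c.value, len(c.changes))
}
```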
type IsRetryable ¶ added in v0.1.2
IsRetryable is a function type that determines if an error should trigger a retry. It returns true if the error is considered retryable, and false otherwise. It is typically used in conjunction with RetryTrigger to implement custom retry strategies in a Retryer implementation.
type Option ¶
type Option func(*Repository)
Option is a function that modifies the configuration of a Repository. It is used to customize the behavior of a Repository by providing hooks, enabling consistency validation, modifying event queries, and configuring snapshot handling.
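The functional-options pattern behind Option can be sketched as follows. The repo type, its fields, and the lower-case constructors are hypothetical stand-ins, not the package's internals. The only detail taken from this page is that consistency validation defaults to true.

```go
package main

import "fmt"

// repo is a hypothetical stand-in for Repository.
type repo struct {
	validate     bool
	beforeInsert []func(id string) error
}

// option mirrors the shape of Option: a function that mutates the repo.
type option func(*repo)

// validateConsistency toggles consistency validation.
func validateConsistency(v bool) option {
	return func(r *repo) { r.validate = v }
}

// beforeInsert appends a hook; several hooks may be registered.
func beforeInsert(fn func(id string) error) option {
	return func(r *repo) { r.beforeInsert = append(r.beforeInsert, fn) }
}

// newRepo sets the defaults first and then applies the options in order.
func newRepo(opts ...option) *repo {
	r := &repo{validate: true}
	for _, opt := range opts {
		opt(r)
	}
	return r
}

func main() {
	r := newRepo(
		validateConsistency(false),
		beforeInsert(func(id string) error { return nil }),
	)
	fmt.Println(r.validate, len(r.beforeInsert))
}
```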
func AfterInsert ¶
func AfterInsert(fn func(context.Context, aggregate.Aggregate) error) Option
AfterInsert appends a function to the Repository's afterInsert slice. The function will be called after an Aggregate's events are successfully inserted into the event store during the Save operation.
func BeforeInsert ¶
func BeforeInsert(fn func(context.Context, aggregate.Aggregate) error) Option
BeforeInsert is an Option for the Repository that appends a function to the beforeInsert slice. The function is called with the aggregate and context before its events are inserted into the event store. If the function returns an error, the insertion of events is aborted and the error is returned.
func ModifyQueries ¶
func ModifyQueries(mods ...func(ctx context.Context, q aggregate.Query, prev event.Query) (event.Query, error)) Option
ModifyQueries appends the provided query modifiers to the Repository's queryModifiers slice. These modifiers are applied to event queries when executing an aggregate.Query with the Repository.
func OnDelete ¶
func OnDelete(fn func(context.Context, aggregate.Aggregate) error) Option
OnDelete appends a function to the Repository's onDelete slice. The function will be called with the aggregate and context when the Delete method is called for the given aggregate. If the function returns an error, the deletion process is halted and the error is returned.
func OnFailedInsert ¶
func OnFailedInsert(fn func(context.Context, aggregate.Aggregate, error) error) Option
OnFailedInsert is an Option for the Repository that appends a function to the onFailedInsert slice. The function is called with the aggregate, context and error if an error occurs during the insertion of events into the event store. If the function returns an error, that error is returned, otherwise the original insertion error is returned.
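The hook ordering described for BeforeInsert, AfterInsert and OnFailedInsert can be sketched roughly as below. The hooks struct and save function are hypothetical and omit the context and aggregate arguments. Returning an AfterInsert hook's error to the caller is an assumption, since that case is not described here.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errInsert = errors.New("insert failed")
	errHook   = errors.New("hook failed")
)

// hooks is a hypothetical container for the three hook slices.
type hooks struct {
	beforeInsert   []func() error
	afterInsert    []func() error
	onFailedInsert []func(insertErr error) error
}

// save runs the hooks around insert.
func save(h hooks, insert func() error) error {
	// A failing beforeInsert hook aborts before anything is inserted.
	for _, fn := range h.beforeInsert {
		if err := fn(); err != nil {
			return err
		}
	}
	if err := insert(); err != nil {
		// A hook error takes precedence over the insertion error.
		for _, fn := range h.onFailedInsert {
			if hookErr := fn(err); hookErr != nil {
				return hookErr
			}
		}
		return err
	}
	// Assumption: an afterInsert hook error is returned to the caller.
	for _, fn := range h.afterInsert {
		if err := fn(); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	h := hooks{
		onFailedInsert: []func(error) error{
			func(err error) error {
				fmt.Println("cleaning up after:", err)
				return nil // keep the original insertion error
			},
		},
	}
	fmt.Println(save(h, func() error { return errInsert }))
}
```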
func ValidateConsistency ¶ added in v0.2.7
func ValidateConsistency(validate bool) Option
ValidateConsistency is an Option for the Repository that configures whether consistency validation should be performed when saving an Aggregate. If set to true (default), the Repository will validate consistency before inserting events into the event store. If set to false, consistency validation will be skipped.
type Repository ¶
type Repository struct {
// contains filtered or unexported fields
}
Repository is responsible for saving, fetching, and deleting aggregates while handling snapshots, consistency validation, and various hooks. It supports querying events from the event store, applying an aggregate's event history, and managing aggregate versions. Additionally, it provides customizable options to modify queries and manage hooks before and after inserting events or on failed insertions.
func New ¶
func New(store event.Store, opts ...Option) *Repository
New creates a Repository that is backed by the provided event.Store and configured by the given options.
func (*Repository) Delete ¶
func (r *Repository) Delete(ctx context.Context, a aggregate.Aggregate) error
Delete fetches the aggregate's events from the event store, deletes them, and calls OnDelete hooks. It returns an error if the deletion fails or any of the OnDelete hooks return an error.
func (*Repository) Fetch ¶
func (r *Repository) Fetch(ctx context.Context, a aggregate.Aggregate) error
Fetch retrieves the latest state of the provided aggregate by applying its event history. If the aggregate implements snapshot.Target and a snapshot store is configured, Fetch loads the latest snapshot and applies events that occurred after the snapshot was taken.
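The idea of restoring from a snapshot and then replaying only the newer events can be sketched as follows. All types here (event, snapshot, account) are hypothetical and greatly simplified. The sketch shows only that both paths should arrive at the same state.

```go
package main

import "fmt"

// event is a hypothetical, minimal event: a version and a balance delta.
type event struct {
	version int
	delta   int
}

// snapshot is a hypothetical stored state at a given version.
type snapshot struct {
	version int
	balance int
}

// account is a hypothetical aggregate.
type account struct {
	version int
	balance int
}

// apply applies a single event and advances the version.
func (a *account) apply(e event) {
	a.balance += e.delta
	a.version = e.version
}

// fetch restores the snapshot (if any) and then applies only the
// events that are newer than the restored version.
func fetch(a *account, snap *snapshot, history []event) {
	if snap != nil {
		a.version, a.balance = snap.version, snap.balance
	}
	for _, e := range history {
		if e.version <= a.version {
			continue // already covered by the snapshot
		}
		a.apply(e)
	}
}

// sampleHistory returns five events that each add 10 to the balance.
func sampleHistory() []event {
	var h []event
	for v := 1; v <= 5; v++ {
		h = append(h, event{version: v, delta: 10})
	}
	return h
}

func main() {
	var full, fast account
	fetch(&full, nil, sampleHistory())
	fetch(&fast, &snapshot{version: 3, balance: 30}, sampleHistory())
	fmt.Println(full, fast, full == fast)
}
```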
func (*Repository) FetchVersion ¶
func (r *Repository) FetchVersion(ctx context.Context, a aggregate.Aggregate, v int) error
FetchVersion fetches the specified version of the aggregate from the event store and applies its history. It returns ErrVersionNotFound if the requested version is not found, and ErrDeleted if the aggregate was soft-deleted.
func (*Repository) Query ¶
func (r *Repository) Query(ctx context.Context, q aggregate.Query) (<-chan aggregate.History, <-chan error, error)
Query returns a channel of aggregate.History and a channel of errors by executing the provided aggregate.Query. An error is returned if there is an issue with constructing the event.Query or querying events from the event store.
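A caller has to consume both returned channels. One way to do that is sketched below with a generic drain helper. The helper and the okQuery and failingQuery functions are hypothetical. The sketch assumes that the producer closes both channels when the stream ends, which is not stated here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errQuery = errors.New("query failed")

// drain collects values until both channels are closed, the context is
// canceled, or an error arrives. Assumption: the producer closes both
// channels when it is done.
func drain[T any](ctx context.Context, vals <-chan T, errs <-chan error) ([]T, error) {
	var out []T
	for vals != nil || errs != nil {
		select {
		case <-ctx.Done():
			return out, ctx.Err()
		case v, ok := <-vals:
			if !ok {
				vals = nil // closed: stop selecting on it
				continue
			}
			out = append(out, v)
		case err, ok := <-errs:
			if !ok {
				errs = nil // closed: stop selecting on it
				continue
			}
			return out, err
		}
	}
	return out, nil
}

// okQuery is a hypothetical query that streams three results.
func okQuery() (<-chan string, <-chan error) {
	vals, errs := make(chan string, 3), make(chan error)
	vals <- "a"
	vals <- "b"
	vals <- "c"
	close(vals)
	close(errs)
	return vals, errs
}

// failingQuery is a hypothetical query that fails without any results.
func failingQuery() (<-chan string, <-chan error) {
	vals, errs := make(chan string), make(chan error, 1)
	errs <- errQuery
	close(vals)
	close(errs)
	return vals, errs
}

// run drains a query using a background context.
func run(q func() (<-chan string, <-chan error)) ([]string, error) {
	vals, errs := q()
	return drain(context.Background(), vals, errs)
}

func main() {
	fmt.Println(run(okQuery))
	fmt.Println(run(failingQuery))
}
```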
func (*Repository) Save ¶
func (r *Repository) Save(ctx context.Context, a aggregate.Aggregate) error
Save stores the changes of an Aggregate into the event store and creates a snapshot of the Aggregate if the snapshot schedule is met. It validates consistency and calls the appropriate hooks before and after inserting events. If an error occurs, it calls the OnFailedInsert hook.
type RetryTrigger ¶ added in v0.1.2
type RetryTrigger interface {
// contains filtered or unexported methods
}
RetryTrigger is an interface that defines a method for determining the next attempt's timing in a retryable operation. It is used in conjunction with IsRetryable to implement custom retry strategies within a Retryer implementation.
func RetryApprox ¶ added in v0.1.2
func RetryApprox(interval, deviation time.Duration, maxTries int) RetryTrigger
RetryApprox creates a RetryTrigger that retries an operation with a randomized interval between attempts. The interval is calculated by adding a random percentage of deviation to the base interval, and the maximum number of attempts is specified by maxTries.
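The interval calculation can be sketched as below. The approxInterval helper is hypothetical. Whether the package draws the random fraction from [0, 1) or from a signed range is not stated here, so the fraction is passed in explicitly and the [0, 1) range used in main is an assumption.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// approxInterval returns the base interval plus the given fraction of
// the deviation. A fraction of 0.5 with a deviation of 200ms adds 100ms.
func approxInterval(interval, deviation time.Duration, frac float64) time.Duration {
	return interval + time.Duration(frac*float64(deviation))
}

func main() {
	for try := 1; try <= 3; try++ {
		// Assumption: the fraction is drawn uniformly from [0, 1).
		wait := approxInterval(time.Second, 200*time.Millisecond, rand.Float64())
		fmt.Printf("try %d: wait %v\n", try, wait)
	}
}
```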
func RetryEvery ¶ added in v0.1.2
func RetryEvery(interval time.Duration, maxTries int) RetryTrigger
RetryEvery returns a RetryTrigger that retries an operation at a fixed interval for a specified number of maxTries. The operation will be retried until the maximum number of tries is reached, or the provided context is canceled.
type RetryTriggerFunc ¶ added in v0.1.2
RetryTriggerFunc is a function type that implements the RetryTrigger interface, allowing users to define custom logic for determining the timing of retry attempts in a Retryer. It takes a context as an input and returns an error if any occurs during its execution.
type Retryer ¶ added in v0.1.2
type Retryer interface {
	// RetryUse returns a RetryTrigger and an IsRetryable function for use in
	// retrying operations. The RetryTrigger determines the next attempt's timing,
	// and the IsRetryable function checks if an error is retryable.
	RetryUse() (RetryTrigger, IsRetryable)
}
Retryer is an interface for types that supply their own retry strategy. Its RetryUse method returns a RetryTrigger, which determines the timing of the next attempt, and an IsRetryable function, which reports whether an error should be retried.
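How a trigger and a retryable check might drive a retry loop is sketched below. Every name is a hypothetical stand-in (trigger, isRetryable, retryer, every, order, runWithRetry, simulate), because RetryTrigger has unexported methods and cannot be implemented outside the package. The sketch assumes that the trigger returns a non-nil error once no further attempts should be made.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var (
	errConflict = errors.New("version conflict") // treated as retryable
	errInvalid  = errors.New("invalid input")    // treated as fatal
	errGiveUp   = errors.New("max tries reached")
)

// trigger blocks until the next attempt may start, or returns an error.
type trigger func(ctx context.Context) error

type isRetryable func(error) bool

// retryer mirrors the shape of the Retryer interface.
type retryer interface {
	RetryUse() (trigger, isRetryable)
}

// every waits a fixed interval between attempts and allows at most
// maxTries attempts in total.
func every(interval time.Duration, maxTries int) trigger {
	tries := 1 // the first attempt happens before the trigger is consulted
	return func(ctx context.Context) error {
		if tries >= maxTries {
			return errGiveUp
		}
		tries++
		timer := time.NewTimer(interval)
		defer timer.Stop()
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-timer.C:
			return nil
		}
	}
}

// order is a hypothetical aggregate that retries on version conflicts.
type order struct{}

func (order) RetryUse() (trigger, isRetryable) {
	return every(time.Millisecond, 3), func(err error) bool {
		return errors.Is(err, errConflict)
	}
}

// runWithRetry repeats op until it succeeds, fails with a non-retryable
// error, or the trigger refuses another attempt.
func runWithRetry(ctx context.Context, r retryer, op func() error) (attempts int, err error) {
	next, retryable := r.RetryUse()
	for {
		attempts++
		err = op()
		if err == nil || !retryable(err) {
			return attempts, err
		}
		if waitErr := next(ctx); waitErr != nil {
			return attempts, err // report the last operation error
		}
	}
}

// simulate runs an operation that fails the first n times with failWith.
func simulate(n int, failWith error) (int, error) {
	return runWithRetry(context.Background(), order{}, func() error {
		if n > 0 {
			n--
			return failWith
		}
		return nil
	})
}

func main() {
	fmt.Println(simulate(2, errConflict))
	fmt.Println(simulate(5, errConflict))
	fmt.Println(simulate(5, errInvalid))
}
```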
type TypedRepository ¶ added in v0.1.2
type TypedRepository[Aggregate aggregate.TypedAggregate] struct {
	// contains filtered or unexported fields
}
TypedRepository is a wrapper around aggregate.Repository that works with specific typed aggregates. It provides methods to save, fetch, query, and delete typed aggregates and utilizes an aggregate factory function to instantiate aggregates when needed.
func NewOf ¶ added in v0.1.2
func NewOf[Aggregate aggregate.TypedAggregate](r aggregate.Repository, makeFunc func(uuid.UUID) Aggregate) *TypedRepository[Aggregate]
NewOf creates a new TypedRepository for the specified aggregate.TypedAggregate using the provided aggregate.Repository and factory function to instantiate Aggregates with a given UUID.
func Typed ¶ added in v0.1.2
func Typed[Aggregate aggregate.TypedAggregate](r aggregate.Repository, makeFunc func(uuid.UUID) Aggregate) *TypedRepository[Aggregate]
Typed returns a new TypedRepository for the provided aggregate.Repository and makeFunc. The makeFunc is used to create new instances of the typed Aggregate with a given UUID when fetching them from the underlying aggregate.Repository.
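The role of makeFunc can be illustrated with a small generic wrapper. Everything below is a hypothetical stand-in (loadable, baseRepo, typedRepo, counter) that uses string IDs instead of uuid.UUID. Returning the zero value on a failed fetch is an assumption of the sketch.

```go
package main

import "fmt"

// loadable is a hypothetical stand-in for aggregate.TypedAggregate.
type loadable interface {
	ID() string
	Load(state int)
}

// baseRepo is a hypothetical untyped repository that fills in an
// aggregate that the caller has already instantiated.
type baseRepo struct {
	states map[string]int
}

func (r *baseRepo) Fetch(a loadable) error {
	s, ok := r.states[a.ID()]
	if !ok {
		return fmt.Errorf("aggregate %q not found", a.ID())
	}
	a.Load(s)
	return nil
}

// typedRepo wraps baseRepo and instantiates aggregates itself.
type typedRepo[A loadable] struct {
	base    *baseRepo
	newFunc func(id string) A
}

func typed[A loadable](base *baseRepo, newFunc func(string) A) *typedRepo[A] {
	return &typedRepo[A]{base: base, newFunc: newFunc}
}

// Fetch creates the aggregate with newFunc and then loads its state.
func (r *typedRepo[A]) Fetch(id string) (A, error) {
	a := r.newFunc(id)
	if err := r.base.Fetch(a); err != nil {
		var zero A
		return zero, err // assumption: zero value on failure
	}
	return a, nil
}

// counter is a hypothetical aggregate.
type counter struct {
	id    string
	value int
}

func newCounter(id string) *counter { return &counter{id: id} }
func (c *counter) ID() string       { return c.id }
func (c *counter) Load(state int)   { c.value = state }

func main() {
	base := &baseRepo{states: map[string]int{"a": 42}}
	repo := typed(base, newCounter) // A is inferred as *counter

	c, err := repo.Fetch("a")
	fmt.Println(c.value, err)

	_, err = repo.Fetch("missing")
	fmt.Println(err)
}
```

The caller receives a *counter directly and needs no type assertion, which is the main convenience of the typed wrapper.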
func (*TypedRepository[Aggregate]) Delete ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) Delete(ctx context.Context, a Aggregate) error
Delete removes the specified Aggregate from the repository.
func (*TypedRepository[Aggregate]) Fetch ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) Fetch(ctx context.Context, id uuid.UUID) (Aggregate, error)
Fetch retrieves the latest version of an Aggregate with the given UUID from the repository, returning the fetched Aggregate and any error encountered.
func (*TypedRepository[Aggregate]) FetchVersion ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) FetchVersion(ctx context.Context, id uuid.UUID, version int) (Aggregate, error)
FetchVersion retrieves an Aggregate with the specified ID and version from the TypedRepository, returning an error if it cannot be fetched.
func (*TypedRepository[Aggregate]) NewFunc ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) NewFunc() func(uuid.UUID) Aggregate
NewFunc returns the function that creates a new typed Aggregate with the given UUID. This function is used by the TypedRepository to instantiate Aggregates when fetching them from the underlying aggregate.Repository.
func (*TypedRepository[Aggregate]) Query ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) Query(ctx context.Context, q aggregate.Query) (<-chan Aggregate, <-chan error, error)
Query retrieves Aggregates from the underlying aggregate.Repository that match the specified aggregate.Query. It returns channels for receiving the Aggregates and any errors encountered during the query, as well as an error if the query itself cannot be executed.
func (*TypedRepository[Aggregate]) Repository ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) Repository() aggregate.Repository
Repository returns the underlying aggregate.Repository of the TypedRepository.
func (*TypedRepository[Aggregate]) Save ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) Save(ctx context.Context, a Aggregate) error
Save stores the given Aggregate in the repository and returns an error if the operation fails.
func (*TypedRepository[Aggregate]) Use ¶ added in v0.1.2
func (r *TypedRepository[Aggregate]) Use(ctx context.Context, id uuid.UUID, fn func(Aggregate) error) error
Use retrieves an Aggregate with the specified ID from the TypedRepository, applies the given function to it, and returns any error encountered during this process.
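A fetch-then-apply helper in the spirit of Use can be sketched as follows. The cart and memRepo types are hypothetical. Saving the aggregate after fn succeeds, and skipping the save when fn fails, are assumptions of this sketch that the description above does not confirm.

```go
package main

import (
	"errors"
	"fmt"
)

var errEmptyItem = errors.New("empty item")

// cart is a hypothetical aggregate.
type cart struct {
	id    string
	items []string
}

// memRepo is a hypothetical in-memory repository.
type memRepo struct {
	carts map[string][]string
}

// fetch returns a copy so that unsaved changes do not leak into the store.
func (r *memRepo) fetch(id string) *cart {
	return &cart{id: id, items: append([]string(nil), r.carts[id]...)}
}

func (r *memRepo) save(c *cart) error {
	r.carts[c.id] = c.items
	return nil
}

// use fetches the cart, applies fn, and saves only if fn succeeded
// (the save step is an assumption of this sketch).
func (r *memRepo) use(id string, fn func(*cart) error) error {
	c := r.fetch(id)
	if err := fn(c); err != nil {
		return err
	}
	return r.save(c)
}

// addItem returns a function that appends item to a cart.
func addItem(item string) func(*cart) error {
	return func(c *cart) error {
		if item == "" {
			return errEmptyItem
		}
		c.items = append(c.items, item)
		return nil
	}
}

func main() {
	repo := &memRepo{carts: map[string][]string{"c1": {"apple"}}}
	fmt.Println(repo.use("c1", addItem("pear")), repo.carts["c1"])
	fmt.Println(repo.use("c1", addItem("")), repo.carts["c1"])
}
```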