Documentation ¶
Index ¶
- Variables
- type CommandError
- type EventStore
- func (s *EventStore) Client() *mongo.Client
- func (s *EventStore) Collection() *mongo.Collection
- func (s *EventStore) Connect(ctx context.Context, opts ...*options.ClientOptions) (*mongo.Client, error)
- func (s *EventStore) Database() *mongo.Database
- func (s *EventStore) Delete(ctx context.Context, events ...event.Event) error
- func (s *EventStore) Find(ctx context.Context, id uuid.UUID) (event.Event, error)
- func (s *EventStore) Insert(ctx context.Context, events ...event.Event) error
- func (s *EventStore) Query(ctx context.Context, q event.Query) (<-chan event.Event, <-chan error, error)
- func (s *EventStore) StateCollection() *mongo.Collection
- type EventStoreOption
- func Client(c *mongo.Client) EventStoreOption
- func Collection(name string) EventStoreOption
- func Database(name string) EventStoreOption
- func NoIndex(ni bool) EventStoreOption
- func StateCollection(name string) EventStoreOption
- func Transactions(tx bool) EventStoreOption
- func URL(url string) EventStoreOption
- func ValidateVersions(v bool) EventStoreOption
- func WithIndices(models ...mongo.IndexModel) EventStoreOption
- type ModelRepository
- func (r *ModelRepository[Model, ID]) Collection() *mongo.Collection
- func (r *ModelRepository[Model, ID]) CreateIndexes(ctx context.Context) error
- func (r *ModelRepository[Model, ID]) Delete(ctx context.Context, m Model) error
- func (r *ModelRepository[Model, ID]) Fetch(ctx context.Context, id ID) (Model, error)
- func (r *ModelRepository[Model, ID]) Save(ctx context.Context, m Model) error
- func (r *ModelRepository[Model, ID]) Use(ctx context.Context, id ID, fn func(Model) error) error
- type ModelRepositoryOption
- func ModelDecoder[Model model.Model[ID], ID model.ID](decode func(*mongo.SingleResult, *Model) error) ModelRepositoryOption
- func ModelEncoder[Model model.Model[ID], ID model.ID](encode func(Model) (any, error)) ModelRepositoryOption
- func ModelFactory[Model model.Model[ID], ID model.ID](factory func(ID) Model, createIfNotFound bool) ModelRepositoryOption
- func ModelIDKey(key string) ModelRepositoryOption
- func ModelTransactions(tx bool) ModelRepositoryOption
- type Option
- type SnapshotStore
- func (s *SnapshotStore) Connect(ctx context.Context) (*mongo.Client, error)
- func (s *SnapshotStore) Delete(ctx context.Context, snap snapshot.Snapshot) error
- func (s *SnapshotStore) Latest(ctx context.Context, name string, id uuid.UUID) (snapshot.Snapshot, error)
- func (s *SnapshotStore) Limit(ctx context.Context, name string, id uuid.UUID, v int) (snapshot.Snapshot, error)
- func (s *SnapshotStore) Query(ctx context.Context, q snapshot.Query) (<-chan snapshot.Snapshot, <-chan error, error)
- func (s *SnapshotStore) Save(ctx context.Context, snap snapshot.Snapshot) error
- func (s *SnapshotStore) Version(ctx context.Context, name string, id uuid.UUID, version int) (snapshot.Snapshot, error)
- type VersionError
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrNotFound is returned when a snapshot can't be found in the database.
	ErrNotFound = errors.New("snapshot not found")
)
Functions ¶
This section is empty.
Types ¶
type CommandError ¶ added in v0.1.2
type CommandError mongo.CommandError
CommandError is a mongo.CommandError that satisfies aggregate.IsConsistencyError(err).
func (CommandError) CommandError ¶ added in v0.1.2
func (err CommandError) CommandError() mongo.CommandError
CommandError returns the error as a mongo.CommandError.
func (CommandError) Error ¶ added in v0.1.2
func (err CommandError) Error() string
func (CommandError) IsConsistencyError ¶ added in v0.1.2
func (err CommandError) IsConsistencyError() bool
IsConsistencyError reports whether a CommandError is a consistency error. A CommandError is considered a consistency error if it has an error label indicating that the operation failed due to a transient or unknown transaction error.
type EventStore ¶
type EventStore struct {
// contains filtered or unexported fields
}
EventStore is a MongoDB event store.
func NewEventStore ¶
func NewEventStore(enc codec.Encoding, opts ...EventStoreOption) *EventStore
NewEventStore returns a MongoDB event.Store.
func (*EventStore) Client ¶
func (s *EventStore) Client() *mongo.Client
Client returns the underlying mongo.Client. If no mongo.Client is provided with the Client option, Client returns nil until the connection to MongoDB has been established by either explicitly calling s.Connect or implicitly by calling s.Insert, s.Find, s.Delete or s.Query. Otherwise Client returns the provided mongo.Client.
func (*EventStore) Collection ¶
func (s *EventStore) Collection() *mongo.Collection
Collection returns the underlying *mongo.Collection where the events are stored in. Collection returns nil until the connection to MongoDB has been established by either explicitly calling s.Connect or implicitly by calling s.Insert, s.Find, s.Delete or s.Query.
func (*EventStore) Connect ¶
func (s *EventStore) Connect(ctx context.Context, opts ...*options.ClientOptions) (*mongo.Client, error)
Connect establishes the connection to the underlying MongoDB and returns the mongo.Client. Connect doesn't need to be called manually as it's called automatically on the first call to s.Insert, s.Find, s.Delete or s.Query. Use Connect if you want to explicitly control when to connect to MongoDB.
func (*EventStore) Database ¶
func (s *EventStore) Database() *mongo.Database
Database returns the underlying mongo.Database. Database returns nil until the connection to MongoDB has been established by either explicitly calling s.Connect or implicitly by calling s.Insert, s.Find, s.Delete or s.Query.
func (*EventStore) Find ¶
func (s *EventStore) Find(ctx context.Context, id uuid.UUID) (event.Event, error)
Find returns the event with the specified UUID from the database if it exists.
func (*EventStore) Query ¶
func (s *EventStore) Query(ctx context.Context, q event.Query) (<-chan event.Event, <-chan error, error)
Query queries the database for events that match the Query q and returns a channel of the matching events together with a channel of errors that occur while streaming them.
func (*EventStore) StateCollection ¶
func (s *EventStore) StateCollection() *mongo.Collection
StateCollection returns the underlying *mongo.Collection where aggregate states are stored in. StateCollection returns nil until the connection to MongoDB has been established by either explicitly calling s.Connect or implicitly by calling s.Insert, s.Find, s.Delete or s.Query.
type EventStoreOption ¶
type EventStoreOption func(*EventStore)
EventStoreOption is an option for an EventStore.
func Client ¶
func Client(c *mongo.Client) EventStoreOption
Client returns an EventStoreOption that specifies the underlying mongo.Client to be used by the EventStore.
func Collection ¶
func Collection(name string) EventStoreOption
Collection returns an EventStoreOption that sets the name of the MongoDB collection in which the events are stored.
func Database ¶
func Database(name string) EventStoreOption
Database returns an EventStoreOption that sets the name of the MongoDB database to use for the events.
func NoIndex ¶ added in v0.1.2
func NoIndex(ni bool) EventStoreOption
NoIndex returns an EventStoreOption that, if ni is true, completely disables index creation when connecting to the event store.
func StateCollection ¶
func StateCollection(name string) EventStoreOption
StateCollection returns an EventStoreOption that specifies the name of the collection in which the current state of aggregates is stored.
func Transactions ¶
func Transactions(tx bool) EventStoreOption
Transactions returns an EventStoreOption that, if tx is true, configures the EventStore to use MongoDB transactions when inserting events.
Transactions can only be used in replica sets or sharded clusters: https://docs.mongodb.com/manual/core/transactions/
func URL ¶
func URL(url string) EventStoreOption
URL returns an EventStoreOption that specifies the URL of the MongoDB instance. An empty URL means "use the default".
Defaults to the environment variable "MONGO_URL".
func ValidateVersions ¶
func ValidateVersions(v bool) EventStoreOption
ValidateVersions returns an EventStoreOption that enables or disables validation of event versions before they are inserted into the EventStore.
Defaults to true.
func WithIndices ¶ added in v0.1.2
func WithIndices(models ...mongo.IndexModel) EventStoreOption
WithIndices returns an EventStoreOption that creates additional indices for the event collection. Can be used to create builtin edge-case indices:
WithIndices(indices.EventStore.NameAndVersion)
type ModelRepository ¶ added in v0.1.2
type ModelRepository[Model model.Model[ID], ID model.ID] struct {
	// contains filtered or unexported fields
}
ModelRepository is a MongoDB backed model repository.
func NewModelRepository ¶ added in v0.1.2
func NewModelRepository[Model model.Model[ID], ID model.ID](col *mongo.Collection, opts ...ModelRepositoryOption) *ModelRepository[Model, ID]
NewModelRepository returns a MongoDB backed model repository.
func (*ModelRepository[Model, ID]) Collection ¶ added in v0.1.2
func (r *ModelRepository[Model, ID]) Collection() *mongo.Collection
Collection returns the MongoDB collection of the model.
func (*ModelRepository[Model, ID]) CreateIndexes ¶ added in v0.1.2
func (r *ModelRepository[Model, ID]) CreateIndexes(ctx context.Context) error
CreateIndexes creates the index for the configured "id" field of the model. If no custom "id" field has been configured, no index is created because MongoDB automatically creates an index for the "_id" field.
func (*ModelRepository[Model, ID]) Delete ¶ added in v0.1.2
func (r *ModelRepository[Model, ID]) Delete(ctx context.Context, m Model) error
Delete deletes the given model from the database.
func (*ModelRepository[Model, ID]) Fetch ¶ added in v0.1.2
func (r *ModelRepository[Model, ID]) Fetch(ctx context.Context, id ID) (Model, error)
Fetch fetches the model with the given id from the database. If the model cannot be found, an error that unwraps to model.ErrNotFound is returned.
func (*ModelRepository[Model, ID]) Save ¶ added in v0.1.2
func (r *ModelRepository[Model, ID]) Save(ctx context.Context, m Model) error
Save saves the given model to the database using the MongoDB "ReplaceOne" command with the upsert option set to true.
func (*ModelRepository[Model, ID]) Use ¶ added in v0.1.2
func (r *ModelRepository[Model, ID]) Use(ctx context.Context, id ID, fn func(Model) error) error
Use fetches the given model from the database, passes the model to the provided function and finally saves the model back to the database. If the ModelTransactions option is set to true, the operation is done within a MongoDB transaction (must be supported by your MongoDB cluster).
type ModelRepositoryOption ¶ added in v0.1.2
type ModelRepositoryOption func(*modelRepositoryOptions)
ModelRepositoryOption is an option for the model repository.
func ModelDecoder ¶ added in v0.1.2
func ModelDecoder[Model model.Model[ID], ID model.ID](decode func(*mongo.SingleResult, *Model) error) ModelRepositoryOption
ModelDecoder returns a ModelRepositoryOption that specifies a custom decoder for the model.
func ModelEncoder ¶ added in v0.1.2
func ModelEncoder[Model model.Model[ID], ID model.ID](encode func(Model) (any, error)) ModelRepositoryOption
ModelEncoder returns a ModelRepositoryOption that specifies a custom encoder for the model. When a model is saved, the document that is returned by the provided encode function is used as the replacement document.
func ModelFactory ¶ added in v0.1.2
func ModelFactory[Model model.Model[ID], ID model.ID](factory func(ID) Model, createIfNotFound bool) ModelRepositoryOption
ModelFactory returns a ModelRepositoryOption that provides a factory function for the models to a model repository. The repository will use the function to create the model before decoding the MongoDB document into it. Without a model factory, the repository will just use the zero value of the provided model type. If `createIfNotFound` is true, the repository will create and return the model using the factory function instead of returning a model.ErrNotFound error.
func ModelIDKey ¶ added in v0.1.2
func ModelIDKey(key string) ModelRepositoryOption
ModelIDKey returns a ModelRepositoryOption that specifies which field of the model is the id of the model.
func ModelTransactions ¶ added in v0.1.2
func ModelTransactions(tx bool) ModelRepositoryOption
ModelTransactions returns a ModelRepositoryOption that enables MongoDB transactions for the repository. Currently, only the Use() function makes use of transactions. Transactions are disabled by default and must be supported by your MongoDB cluster.
type Option ¶ added in v0.1.2
type Option func(*SnapshotStore)
Option is a SnapshotStore option.
func SnapshotCollection ¶ added in v0.1.2
SnapshotCollection returns an Option that specifies the collection name for Snapshots.
func SnapshotDatabase ¶ added in v0.1.2
SnapshotDatabase returns an Option that specifies the database name for snapshots.
func SnapshotURL ¶ added in v0.1.2
SnapshotURL returns an Option that specifies the URL to the MongoDB instance. An empty SnapshotURL means "use the default".
Defaults to the environment variable "MONGO_URL".
type SnapshotStore ¶ added in v0.1.2
type SnapshotStore struct {
// contains filtered or unexported fields
}
SnapshotStore is the MongoDB implementation of a snapshot store.
func NewSnapshotStore ¶ added in v0.1.2
func NewSnapshotStore(opts ...Option) *SnapshotStore
NewSnapshotStore returns a new SnapshotStore.
func (*SnapshotStore) Connect ¶ added in v0.1.2
func (s *SnapshotStore) Connect(ctx context.Context) (*mongo.Client, error)
Connect establishes the connection to the underlying MongoDB and returns the mongo.Client. Connect doesn't need to be called manually as it's called automatically on the first call to s.Save, s.Latest, s.Version, s.Query or s.Delete. Use Connect if you want to explicitly control when to connect to MongoDB.
func (*SnapshotStore) Latest ¶ added in v0.1.2
func (s *SnapshotStore) Latest(ctx context.Context, name string, id uuid.UUID) (snapshot.Snapshot, error)
Latest returns the latest Snapshot for the aggregate with the given name and UUID or ErrNotFound if no Snapshots for that aggregate exist in the database.
func (*SnapshotStore) Limit ¶ added in v0.1.2
func (s *SnapshotStore) Limit(ctx context.Context, name string, id uuid.UUID, v int) (snapshot.Snapshot, error)
Limit returns the latest Snapshot that has a version equal to or lower than the given version.
Limit returns ErrNotFound if no such Snapshot can be found in the database.
func (*SnapshotStore) Query ¶ added in v0.1.2
func (s *SnapshotStore) Query(ctx context.Context, q snapshot.Query) (<-chan snapshot.Snapshot, <-chan error, error)
Query returns a channel of Snapshots and a channel of errors. It takes a context.Context and a snapshot.Query as parameters. The returned Snapshots match the criteria defined by the snapshot.Query. The order in which Snapshots are returned is determined by the Sortings defined in the snapshot.Query. The channels are closed when there are no more Snapshots or an error occurs, respectively.
func (*SnapshotStore) Version ¶ added in v0.1.2
func (s *SnapshotStore) Version(ctx context.Context, name string, id uuid.UUID, version int) (snapshot.Snapshot, error)
Version returns the Snapshot for the aggregate with the given name, UUID and version. If no Snapshot for the given version exists, Version returns ErrNotFound.
type VersionError ¶
type VersionError struct {
	// AggregateName is the name of the aggregate.
	AggregateName string

	// AggregateID is the UUID of the aggregate.
	AggregateID uuid.UUID

	// CurrentVersion is the current version of the aggregate.
	CurrentVersion int

	// Event is the event with the invalid version.
	Event event.Event
	// contains filtered or unexported fields
}
A VersionError means the insertion of events failed because at least one of the events has an invalid/inconsistent version.
func (VersionError) Error ¶
func (err VersionError) Error() string
Error returns a string representation of a VersionError. A VersionError indicates that the insertion of events failed because at least one of the events has an invalid/inconsistent version. The returned string describes the validation error(s) and includes details about the aggregate's name, UUID, and current version; and the event with the invalid version.
func (VersionError) IsConsistencyError ¶ added in v0.1.2
func (err VersionError) IsConsistencyError() bool
IsConsistencyError reports whether a VersionError is a consistency error.