mongo

package
v0.4.3 Latest
Warning

This package is not in the latest version of its module.

Published: Sep 20, 2023 License: Apache-2.0 Imports: 21 Imported by: 1

Documentation

Constants

This section is empty.

Variables

var (
	// PreInsert represents a hook that is executed before inserting events into the
	// EventStore. This allows for additional operations to be performed within the
	// same transaction, such as validation or transformation of data. The hook
	// function should return an error if anything goes wrong, causing the
	// transaction to abort.
	PreInsert = TransactionHook("pre:insert")

	// PostInsert represents a hook that is executed after inserting events into the
	// EventStore. This allows for additional operations to be performed within the
	// same transaction, such as cleanup or logging operations. The hook function
	// should return an error if anything goes wrong, causing the transaction to
	// abort.
	PostInsert = TransactionHook("post:insert")
)
var (
	// ErrNotFound is returned when a snapshot can't be found in the database.
	ErrNotFound = errors.New("snapshot not found")
)

Functions

This section is empty.

Types

type CommandError added in v0.1.2

type CommandError mongo.CommandError

CommandError is a mongo.CommandError that satisfies aggregate.IsConsistencyError(err).

func (CommandError) CommandError added in v0.1.2

func (err CommandError) CommandError() mongo.CommandError

CommandError returns the receiver converted to a mongo.CommandError.

func (CommandError) Error added in v0.1.2

func (err CommandError) Error() string

func (CommandError) IsConsistencyError added in v0.1.2

func (err CommandError) IsConsistencyError() bool

IsConsistencyError reports whether a CommandError is a consistency error. A CommandError is considered a consistency error if it has an error label indicating that the operation failed due to a transient or unknown transaction error.

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

EventStore is a type that provides an interface to store, retrieve, and manage events in a MongoDB database. It supports insertion and deletion of events, querying for specific events, and consistency checks through version validation. EventStore also allows for the use of hooks that can be executed before or after inserting events into the store. It provides transactional operations to ensure atomicity and consistency of the stored data. The EventStore can be configured with various options such as MongoDB connection details, collections for storing events and aggregate states, and the use of transactions.

func NewEventStore

func NewEventStore(enc codec.Encoding, opts ...EventStoreOption) *EventStore

NewEventStore creates a new instance of an EventStore. This function accepts a codec.Encoding and any number of EventStoreOption functions. The EventStore instance created by this function will use the provided codec.Encoding for event serialization and deserialization. The EventStoreOptions are used to configure the behavior and characteristics of the EventStore, such as the underlying MongoDB client to use, the database and collections to store events in, and whether or not to validate event versions. If no database or collection names are provided via EventStoreOptions, default names will be used. By default, version validation is enabled.

func (*EventStore) Client

func (s *EventStore) Client() *mongo.Client

Client returns the underlying mongo.Client. If no mongo.Client is provided with the Client option, Client returns nil until the connection to MongoDB has been established by either explicitly calling s.Connect or implicitly by calling s.Insert, s.Find, s.Delete or s.Query. Otherwise Client returns the provided mongo.Client.

func (*EventStore) Collection

func (s *EventStore) Collection() *mongo.Collection

Collection returns the underlying *mongo.Collection in which the events are stored. Collection returns nil until the connection to MongoDB has been established by either explicitly calling s.Connect or implicitly by calling s.Insert, s.Find, s.Delete or s.Query.

func (*EventStore) Connect

func (s *EventStore) Connect(ctx context.Context, opts ...*options.ClientOptions) (*mongo.Client, error)

Connect establishes the connection to the underlying MongoDB and returns the mongo.Client. Connect doesn't need to be called manually as it's called automatically on the first call to s.Insert, s.Find, s.Delete or s.Query. Use Connect if you want to explicitly control when to connect to MongoDB.

func (*EventStore) Database

func (s *EventStore) Database() *mongo.Database

Database returns the underlying mongo.Database. Database returns nil until the connection to MongoDB has been established by either explicitly calling s.Connect or implicitly by calling s.Insert, s.Find, s.Delete or s.Query.

func (*EventStore) Delete

func (s *EventStore) Delete(ctx context.Context, events ...event.Event) error

Delete deletes the given events from the database.

func (*EventStore) Find

func (s *EventStore) Find(ctx context.Context, id uuid.UUID) (event.Event, error)

Find returns the event with the specified UUID from the database if it exists.

func (*EventStore) Insert

func (s *EventStore) Insert(ctx context.Context, events ...event.Event) (out error)

Insert saves the given events into the database.

func (*EventStore) Query

func (s *EventStore) Query(ctx context.Context, q event.Query) (<-chan event.Event, <-chan error, error)

Query queries the database for events that match Query q. It returns a channel of the matching events, a channel of errors that occur while streaming, and an error if the query itself fails.

func (*EventStore) StateCollection

func (s *EventStore) StateCollection() *mongo.Collection

StateCollection returns the underlying *mongo.Collection in which aggregate states are stored. StateCollection returns nil until the connection to MongoDB has been established by either explicitly calling s.Connect or implicitly by calling s.Insert, s.Find, s.Delete or s.Query.

type EventStoreOption

type EventStoreOption func(*EventStore)

EventStoreOption is a function that modifies an EventStore. These options are used to configure various aspects of the EventStore, such as the MongoDB instance connection details, the collections where events and aggregate states are stored, the use of transactions when inserting events, and more. It also supports the use of hooks that are executed before or after inserting events into the store.
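
Because EventStoreOption is a plain func(*EventStore), it follows the functional options pattern. The sketch below shows that pattern with hypothetical stand-in types (store, option, withDatabase, withValidateVersions). The default names "example_db" and "example_events" are placeholders, since this page does not state the real defaults; only the "version validation defaults to true" behavior is taken from the documentation.

```go
package main

import "fmt"

// store is a hypothetical stand-in for EventStore.
type store struct {
	database         string
	collection       string
	validateVersions bool
}

// option mirrors the shape of EventStoreOption: a function that mutates the store.
type option func(*store)

func withDatabase(name string) option {
	return func(s *store) { s.database = name }
}

func withValidateVersions(v bool) option {
	return func(s *store) { s.validateVersions = v }
}

// newStore sets defaults first and then applies each option in order, so
// later options override earlier ones.
func newStore(opts ...option) *store {
	s := &store{
		database:         "example_db",     // placeholder default
		collection:       "example_events", // placeholder default
		validateVersions: true,
	}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	s := newStore(withDatabase("orders"), withValidateVersions(false))
	fmt.Printf("%+v\n", *s)
}
```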

func Client

func Client(c *mongo.Client) EventStoreOption

Client returns an EventStoreOption that sets the provided mongo.Client to be used by the EventStore. This option is useful when you already have a mongo.Client and want to reuse it, instead of letting the EventStore create its own client.

func Collection

func Collection(name string) EventStoreOption

Collection returns an EventStoreOption that sets the name of the MongoDB collection in which the events are stored.

func Database

func Database(name string) EventStoreOption

Database returns an EventStoreOption that sets the name of the MongoDB database to use for the events.

func NoIndex added in v0.1.2

func NoIndex(ni bool) EventStoreOption

NoIndex returns an EventStoreOption that, if ni is true, completely disables index creation when connecting to the event store.

func StateCollection

func StateCollection(name string) EventStoreOption

StateCollection returns an EventStoreOption that specifies the name of the collection in which the current state of aggregates is stored.

func Transactions

func Transactions(tx bool) EventStoreOption

Transactions returns an EventStoreOption that, if tx is true, configures the EventStore to use MongoDB transactions when inserting events.

Transactions can only be used in replica sets or sharded clusters: https://docs.mongodb.com/manual/core/transactions/

func URL

func URL(url string) EventStoreOption

URL returns an EventStoreOption that specifies the URL of the MongoDB instance. An empty URL means "use the default".

Defaults to the environment variable "MONGO_URL".
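
The fallback described above can be sketched as follows. The resolveURL helper is hypothetical and not part of this package; it takes the environment lookup as a parameter so the behavior is easy to test. The connection strings are example values.

```go
package main

import (
	"fmt"
	"os"
)

// resolveURL returns explicit if it is non-empty and otherwise falls back to
// the MONGO_URL environment variable, looked up through getenv.
func resolveURL(explicit string, getenv func(string) string) string {
	if explicit != "" {
		return explicit
	}
	return getenv("MONGO_URL")
}

func main() {
	// An explicit URL wins over the environment.
	fmt.Println(resolveURL("mongodb://localhost:27017", os.Getenv))

	// An empty URL falls back to the (here simulated) environment.
	fakeEnv := func(string) string { return "mongodb://from-env:27017" }
	fmt.Println(resolveURL("", fakeEnv))
}
```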

func ValidateVersions

func ValidateVersions(v bool) EventStoreOption

ValidateVersions returns an EventStoreOption that enables or disables validation of event versions before inserting them into the EventStore.

Defaults to true.

func WithIndices added in v0.1.2

func WithIndices(models ...mongo.IndexModel) EventStoreOption

WithIndices returns an EventStoreOption that creates additional indices for the event collection. Can be used to create builtin edge-case indices:

WithIndices(indices.EventStore.NameAndVersion)

func WithTransactionHook added in v0.4.0

func WithTransactionHook(hook TransactionHook, fn func(TransactionContext) error) EventStoreOption

WithTransactionHook configures a transaction hook for the EventStore. The hook function will be called either before or after inserting events into the EventStore, depending on the specified TransactionHook. If the TransactionHook is "pre:insert", the hook function will be called before insertion. If it's "post:insert", the function will be called after insertion. The hook function should return an error if anything goes wrong, causing the transaction to abort.

type ModelRepository added in v0.1.2

type ModelRepository[Model model.Model[ID], ID model.ID] struct {
	// contains filtered or unexported fields
}

ModelRepository is a MongoDB backed model repository.

func NewModelRepository added in v0.1.2

func NewModelRepository[Model model.Model[ID], ID model.ID](col *mongo.Collection, opts ...ModelRepositoryOption) *ModelRepository[Model, ID]

NewModelRepository returns a MongoDB backed model repository.

func (*ModelRepository[Model, ID]) Collection added in v0.1.2

func (r *ModelRepository[Model, ID]) Collection() *mongo.Collection

Collection returns the MongoDB collection of the model.

func (*ModelRepository[Model, ID]) CreateIndexes added in v0.1.2

func (r *ModelRepository[Model, ID]) CreateIndexes(ctx context.Context) error

CreateIndexes creates the index for the configured "id" field of the model. If no custom "id" field has been configured, no index is created because MongoDB automatically creates an index for the "_id" field.

func (*ModelRepository[Model, ID]) Delete added in v0.1.2

func (r *ModelRepository[Model, ID]) Delete(ctx context.Context, m Model) error

Delete deletes the given model from the database.

func (*ModelRepository[Model, ID]) Fetch added in v0.1.2

func (r *ModelRepository[Model, ID]) Fetch(ctx context.Context, id ID) (Model, error)

Fetch fetches the model with the given id from the database. If the model cannot be found, an error that unwraps to model.ErrNotFound is returned.

func (*ModelRepository[Model, ID]) Save added in v0.1.2

func (r *ModelRepository[Model, ID]) Save(ctx context.Context, m Model) error

Save saves the given model to the database using the MongoDB "ReplaceOne" command with the upsert option set to true.

func (*ModelRepository[Model, ID]) Use added in v0.1.2

func (r *ModelRepository[Model, ID]) Use(ctx context.Context, id ID, fn func(Model) error) error

Use fetches the model with the given id from the database, passes the model to the provided function and finally saves the model back to the database. If the ModelTransactions option is set to true, the operation is done within a MongoDB transaction (must be supported by your MongoDB cluster).

type ModelRepositoryOption added in v0.1.2

type ModelRepositoryOption func(*modelRepositoryOptions)

ModelRepositoryOption is an option for the model repository.

func ModelDecoder added in v0.1.2

func ModelDecoder[Model model.Model[ID], ID model.ID](decode func(*mongo.SingleResult, *Model) error) ModelRepositoryOption

ModelDecoder returns a ModelRepositoryOption that specifies a custom decoder for the model.

func ModelEncoder added in v0.1.2

func ModelEncoder[Model model.Model[ID], ID model.ID](encode func(Model) (any, error)) ModelRepositoryOption

ModelEncoder returns a ModelRepositoryOption that specifies a custom encoder for the model. When a model is saved, the document that is returned by the provided encode function is used as the replacement document.

func ModelFactory added in v0.1.2

func ModelFactory[Model model.Model[ID], ID model.ID](factory func(ID) Model, createIfNotFound bool) ModelRepositoryOption

ModelFactory returns a ModelRepositoryOption that provides a factory function for the models to a model repository. The repository will use the function to create the model before decoding the MongoDB document into it. Without a model factory, the repository will just use the zero value of the provided model type. If `createIfNotFound` is true, the repository will create and return the model using the factory function instead of returning a model.ErrNotFound error.

func ModelIDKey added in v0.1.2

func ModelIDKey(key string) ModelRepositoryOption

ModelIDKey returns a ModelRepositoryOption that specifies which field of the model is the id of the model.

func ModelTransactions added in v0.1.2

func ModelTransactions(tx bool) ModelRepositoryOption

ModelTransactions returns a ModelRepositoryOption that enables MongoDB transactions for the repository. Currently, only the Use() function makes use of transactions. Transactions are disabled by default and must be supported by your MongoDB cluster.

type Option added in v0.1.2

type Option func(*SnapshotStore)

Option is a SnapshotStore option.

func SnapshotCollection added in v0.1.2

func SnapshotCollection(name string) Option

SnapshotCollection returns an Option that specifies the collection name for Snapshots.

func SnapshotDatabase added in v0.1.2

func SnapshotDatabase(name string) Option

SnapshotDatabase returns an Option that specifies the database name for snapshots.

func SnapshotURL added in v0.1.2

func SnapshotURL(url string) Option

SnapshotURL returns an Option that specifies the URL to the MongoDB instance. An empty SnapshotURL means "use the default".

Defaults to the environment variable "MONGO_URL".

type SnapshotStore added in v0.1.2

type SnapshotStore struct {
	// contains filtered or unexported fields
}

SnapshotStore is the MongoDB implementation of a snapshot store.

func NewSnapshotStore added in v0.1.2

func NewSnapshotStore(opts ...Option) *SnapshotStore

NewSnapshotStore returns a new SnapshotStore.

func (*SnapshotStore) Connect added in v0.1.2

func (s *SnapshotStore) Connect(ctx context.Context) (*mongo.Client, error)

Connect establishes the connection to the underlying MongoDB and returns the mongo.Client. Connect doesn't need to be called manually as it's called automatically on the first call to s.Save, s.Latest, s.Version, s.Query or s.Delete. Use Connect if you want to explicitly control when to connect to MongoDB.

func (*SnapshotStore) Delete added in v0.1.2

func (s *SnapshotStore) Delete(ctx context.Context, snap snapshot.Snapshot) error

Delete deletes a Snapshot from the database.

func (*SnapshotStore) Latest added in v0.1.2

func (s *SnapshotStore) Latest(ctx context.Context, name string, id uuid.UUID) (snapshot.Snapshot, error)

Latest returns the latest Snapshot for the aggregate with the given name and UUID or ErrNotFound if no Snapshots for that aggregate exist in the database.

func (*SnapshotStore) Limit added in v0.1.2

func (s *SnapshotStore) Limit(ctx context.Context, name string, id uuid.UUID, v int) (snapshot.Snapshot, error)

Limit returns the latest Snapshot that has a version equal to or lower than the given version.

Limit returns ErrNotFound if no such Snapshot can be found in the database.

func (*SnapshotStore) Query added in v0.1.2

func (s *SnapshotStore) Query(ctx context.Context, q snapshot.Query) (<-chan snapshot.Snapshot, <-chan error, error)

Query returns a channel of Snapshots and a channel of errors. It takes a context.Context and a snapshot.Query as parameters. The returned Snapshots match the criteria defined by the snapshot.Query. The order in which Snapshots are returned is determined by the Sortings defined in the snapshot.Query. The channels are closed when there are no more Snapshots or an error occurs, respectively.

func (*SnapshotStore) Save added in v0.1.2

func (s *SnapshotStore) Save(ctx context.Context, snap snapshot.Snapshot) error

Save saves the given Snapshot into the database.

func (*SnapshotStore) Version added in v0.1.2

func (s *SnapshotStore) Version(ctx context.Context, name string, id uuid.UUID, version int) (snapshot.Snapshot, error)

Version returns the Snapshot for the aggregate with the given name, UUID and version. If no Snapshot for the given version exists, Version returns ErrNotFound.

type Transaction added in v0.4.0

type Transaction interface {
	// Session retrieves the active MongoDB session associated with the current
	// transaction. This function allows for direct interaction with the MongoDB
	// session for operations not directly supported by the Transaction interface.
	// It returns a [mongo.Session] instance that represents the current session.
	// The Session method is only available within a Transaction and should not be
	// used outside of it to avoid unexpected behaviour or errors.
	Session() mongo.Session

	// EventStore is a method of the Transaction interface. It returns an instance
	// of the EventStore type that is associated with the transaction. This can be
	// used to perform operations on the event store within the context of the
	// transaction, ensuring consistency and atomicity of operations.
	EventStore() *EventStore

	// InsertedEvents retrieves all events that have been inserted into the store
	// within the current transaction. The function is part of the [Transaction]
	// interface and returns a slice of [event.Event]. This method is useful for
	// tracking changes made during a transaction, allowing for operations such as
	// rollbacks or additional processing based on the inserted events.
	InsertedEvents() []event.Event
}

Transaction represents a set of operations that are executed within the same session in an EventStore. It encapsulates the session of the MongoDB driver and provides a reference to the associated EventStore. It also keeps track of all events that have been inserted into the EventStore within its session. Transactional operations ensure atomicity and consistency of data in the EventStore, making sure that either all operations are successfully completed, or none are in case of an error.

func TransactionFromContext added in v0.4.0

func TransactionFromContext(ctx context.Context) Transaction

TransactionFromContext retrieves the Transaction from the provided context. If no Transaction exists in the context, it returns nil.

type TransactionContext added in v0.4.0

type TransactionContext interface {
	context.Context
	Transaction
}

TransactionContext is an interface that embeds the standard context.Context and Transaction interfaces. It provides a way to pass transaction-specific data, such as the session information and inserted events, along with the usual context data. This is particularly useful when using hooks in the EventStore, allowing hook functions to access additional information about the ongoing transaction.

type TransactionHook added in v0.4.0

type TransactionHook string

TransactionHook represents a hook that can be executed before or after inserting events into the EventStore. The hook function should return an error if anything goes wrong, causing the transaction to abort.

type VersionError

type VersionError struct {
	AggregateName  string
	AggregateID    uuid.UUID
	CurrentVersion int
	Event          event.Event
	// contains filtered or unexported fields
}

VersionError represents an error that occurs when an event has an incorrect version. This usually happens when the event's version does not match the expected version based on the current state of its corresponding aggregate. This error is used within the EventStore to ensure consistency of events and aggregates. The fields in VersionError provide additional information about the error, including the name and ID of the aggregate, the current version of the aggregate, and the event that caused the error. Methods are provided to return a string representation of the error and to check if it is a consistency error.

func (VersionError) Error

func (err VersionError) Error() string

Error returns a string representation of the VersionError. If an underlying error is present, it prepends "version error: " to the error message. Otherwise, it generates a message indicating that an event has an incorrect version compared to what was expected.

func (VersionError) IsConsistencyError added in v0.1.2

func (err VersionError) IsConsistencyError() bool

IsConsistencyError checks if a VersionError is a consistency error. It always returns true as all VersionErrors are considered consistency errors. This method is useful for handling errors where consistency needs to be ensured, such as when the version of an event does not match the expected version based on the current state of its corresponding aggregate.
