mongo

package
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 2, 2022 License: Apache-2.0 Imports: 22 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrNotFound is returned when a snapshot can't be found in the database.
	ErrNotFound = errors.New("snapshot not found")
)

Functions

This section is empty.

Types

type CommandError added in v0.1.2

type CommandError mongo.CommandError

CommandError is a mongo.CommandError that satisfies aggregate.IsConsistencyError(err).

func (CommandError) CommandError added in v0.1.2

func (err CommandError) CommandError() mongo.CommandError

CommandError returns the error as a mongo.CommandError.

func (CommandError) Error added in v0.1.2

func (err CommandError) Error() string

func (CommandError) IsConsistencyError added in v0.1.2

func (err CommandError) IsConsistencyError() bool

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

EventStore is the MongoDB event.Store.

func NewEventStore

func NewEventStore(enc codec.Encoding, opts ...EventStoreOption) *EventStore

NewEventStore returns a MongoDB event.Store.

func (*EventStore) Client

func (s *EventStore) Client() *mongo.Client

Client returns the underlying mongo.Client. If no mongo.Client is provided with the Client option, Client returns nil until the connection to MongoDB has been established by either explicitly calling s.Connect or implicitly by calling s.Insert, s.Find, s.Delete or s.Query. Otherwise Client returns the provided mongo.Client.

func (*EventStore) Collection

func (s *EventStore) Collection() *mongo.Collection

Collection returns the underlying *mongo.Collection where the events are stored in. Collection returns nil until the connection to MongoDB has been established by either explicitly calling s.Connect or implicitly by calling s.Insert, s.Find, s.Delete or s.Query.

func (*EventStore) Connect

func (s *EventStore) Connect(ctx context.Context, opts ...*options.ClientOptions) (*mongo.Client, error)

Connect establishes the connection to the underlying MongoDB and returns the mongo.Client. Connect doesn't need to be called manually as it's called automatically on the first call to s.Insert, s.Find, s.Delete or s.Query. Use Connect if you want to explicitly control when to connect to MongoDB.

func (*EventStore) Database

func (s *EventStore) Database() *mongo.Database

Database returns the underlying mongo.Database. Database returns nil until the connection to MongoDB has been established by either explicitly calling s.Connect or implicitly by calling s.Insert, s.Find, s.Delete or s.Query.

func (*EventStore) Delete

func (s *EventStore) Delete(ctx context.Context, events ...event.Event) error

Delete deletes the given event from the database.

func (*EventStore) Find

func (s *EventStore) Find(ctx context.Context, id uuid.UUID) (event.Event, error)

Find returns the event with the specified UUID from the database if it exists.

func (*EventStore) Insert

func (s *EventStore) Insert(ctx context.Context, events ...event.Event) error

Insert saves the given events into the database.

func (*EventStore) Query

func (s *EventStore) Query(ctx context.Context, q event.Query) (<-chan event.Event, <-chan error, error)

Query queries the database for events filtered by Query q and returns an streams.New for those events.

func (*EventStore) StateCollection

func (s *EventStore) StateCollection() *mongo.Collection

StateCollection returns the underlying *mongo.Collection where aggregate states are stored in. StateCollection returns nil until the connection to MongoDB has been established by either explicitly calling s.Connect or implicitly by calling s.Insert, s.Find, s.Delete or s.Query.

type EventStoreOption

type EventStoreOption func(*EventStore)

EventStoreOption is an eventStore option.

func Client

func Client(c *mongo.Client) EventStoreOption

Client returns an Option that specifies the underlying mongo.Client to be used by the Store.

func Collection

func Collection(name string) EventStoreOption

Collection returns an Option that sets the mongo collection where the events are stored in.

func Database

func Database(name string) EventStoreOption

Database returns an Option that sets the mongo database to use for the events.

func NoIndex added in v0.1.2

func NoIndex(ni bool) EventStoreOption

NoIndex returns an option to completely disable index creation when connecting to the event bus.

func StateCollection

func StateCollection(name string) EventStoreOption

StateCollection returns an Option that specifies the name of the Collection where the current state of aggregates are stored in.

func Transactions

func Transactions(tx bool) EventStoreOption

Transactions returns an Option that, if tx is true, configures a Store to use MongoDB Transactions when inserting events.

Transactions can only be used in replica sets or sharded clusters: https://docs.mongodb.com/manual/core/transactions/

func URL

func URL(url string) EventStoreOption

URL returns an Option that specifies the URL to the MongoDB instance. An empty URL means "use the default".

Defaults to the environment variable "MONGO_URL".

func ValidateVersions

func ValidateVersions(v bool) EventStoreOption

ValidateVersions returns an Option that enables validation of event versions before inserting them into the Store.

Defaults to true.

func WithIndices added in v0.1.2

func WithIndices(models ...mongo.IndexModel) EventStoreOption

WithIndices returns an EventStoreOption that creates additional indices for the event collection. Can be used to create builtin edge-case indices:

WithIndices(indices.EventStore.NameAndVersion)

type ModelRepository added in v0.1.2

type ModelRepository[Model model.Model[ID], ID model.ID] struct {
	// contains filtered or unexported fields
}

ModelRepository is a MongoDB backed model repository.

func NewModelRepository added in v0.1.2

func NewModelRepository[Model model.Model[ID], ID model.ID](col *mongo.Collection, opts ...ModelRepositoryOption) *ModelRepository[Model, ID]

NewModelRepository returns a MongoDB backed model repository.

func (*ModelRepository[Model, ID]) Collection added in v0.1.2

func (r *ModelRepository[Model, ID]) Collection() *mongo.Collection

Collection returns the MongoDB collection of the model.

func (*ModelRepository[Model, ID]) CreateIndexes added in v0.1.2

func (r *ModelRepository[Model, ID]) CreateIndexes(ctx context.Context) error

CreateIndexes creates the index for the configured "id" field of the model. If no custom "id" field has been configured, no index is created because MongoDB automatically creates an index for the "_id" field.

func (*ModelRepository[Model, ID]) Delete added in v0.1.2

func (r *ModelRepository[Model, ID]) Delete(ctx context.Context, m Model) error

Delete deletes the given model from the database.

func (*ModelRepository[Model, ID]) Fetch added in v0.1.2

func (r *ModelRepository[Model, ID]) Fetch(ctx context.Context, id ID) (Model, error)

Fetch fetches the given model from the database. If the model cannot be found, an error that unwraps to model.ErrNotFound is returned.

func (*ModelRepository[Model, ID]) Save added in v0.1.2

func (r *ModelRepository[Model, ID]) Save(ctx context.Context, m Model) error

Save saves the given model to the database using the MongoDB "ReplaceOne" command with the upsert option set to true.

func (*ModelRepository[Model, ID]) Use added in v0.1.2

func (r *ModelRepository[Model, ID]) Use(ctx context.Context, id ID, fn func(Model) error) error

Use fetches the given model from the database, passes the model to the provided function and finally saves the model back to the database. If the ModelTransactions option is set to true, the operation is done within a MongoDB transaction (must be supported by your MongoDB cluster).

type ModelRepositoryOption added in v0.1.2

type ModelRepositoryOption func(*modelRepositoryOptions)

ModelRepositoryOption is an option for the model repository.

func ModelDecoder added in v0.1.2

func ModelDecoder[Model model.Model[ID], ID model.ID](decode func(*mongo.SingleResult, *Model) error) ModelRepositoryOption

ModelDecoder returns a ModelRepositoryOption that specifies a custom decoder for the model.

func ModelEncoder added in v0.1.2

func ModelEncoder[Model model.Model[ID], ID model.ID](encode func(Model) (any, error)) ModelRepositoryOption

ModelEncoder returns a ModelRepositoryOption that specifies a custom encoder for the model. Then a model is saved, the document that is returned by the provided encode function is used as the replacement document.

func ModelFactory added in v0.1.2

func ModelFactory[Model model.Model[ID], ID model.ID](factory func(ID) Model, createIfNotFound bool) ModelRepositoryOption

ModelFactory returns a ModelRepositoryOption that provides a factory function for the models to a model repository. The repository will use the function to create the model before decoding the MongoDB document into it. Without a model factory, the repository will just use the zero value of the provided model type. If `createIfNotFound` is true, the repository will create and return the model using the factory function instead of returning a model.ErrNotFound error.

func ModelIDKey added in v0.1.2

func ModelIDKey(key string) ModelRepositoryOption

ModelIDKey returns a ModelRepositoryOption that specifies which field of the model is the id of the model.

func ModelTransactions added in v0.1.2

func ModelTransactions(tx bool) ModelRepositoryOption

ModelTransactions returns a ModelRepositoryOption that enables MongoDB transactions for the repository. Currently, only the Use() function makes use of transactions. Transactions are disabled by default and must be supported by your MongoDB cluster.

type Option added in v0.1.2

type Option func(*SnapshotStore)

Option is a Store option.

func SnapshotCollection added in v0.1.2

func SnapshotCollection(name string) Option

SnapshotCollection returns an Option that specifies the collection name for Snapshots.

func SnapshotDatabase added in v0.1.2

func SnapshotDatabase(name string) Option

SnapshotDatabase returns an Option that specifies the database name for snapshots.

func SnapshotURL added in v0.1.2

func SnapshotURL(url string) Option

SnapshotURL returns an Option that specifies the URL to the MongoDB instance. An empty SnapshotURL means "use the default".

Defaults to the environment variable "MONGO_URL".

type SnapshotStore added in v0.1.2

type SnapshotStore struct {
	// contains filtered or unexported fields
}

SnapshotStore is the MongoDB implementation of a snapshot store.

func NewSnapshotStore added in v0.1.2

func NewSnapshotStore(opts ...Option) *SnapshotStore

NewSnapshotStore returns a new Store.

func (*SnapshotStore) Connect added in v0.1.2

func (s *SnapshotStore) Connect(ctx context.Context) (*mongo.Client, error)

Connect establishes the connection to the underlying MongoDB and returns the mongo.Client. Connect doesn't need to be called manually as it's called automatically on the first call to s.Save, s.Latest, s.Version, s.Query or s.Delete. Use Connect if you want to explicitly control when to connect to MongoDB.

func (*SnapshotStore) Delete added in v0.1.2

func (s *SnapshotStore) Delete(ctx context.Context, snap snapshot.Snapshot) error

Delete deletes a Snapshot from the database.

func (*SnapshotStore) Latest added in v0.1.2

func (s *SnapshotStore) Latest(ctx context.Context, name string, id uuid.UUID) (snapshot.Snapshot, error)

Latest returns the latest Snapshot for the aggregate with the given name and UUID or ErrNotFound if no Snapshots for that aggregate exist in the database.

func (*SnapshotStore) Limit added in v0.1.2

func (s *SnapshotStore) Limit(ctx context.Context, name string, id uuid.UUID, v int) (snapshot.Snapshot, error)

Limit returns the latest Snapshot that has a version equal to or lower than the given version.

Limit returns ErrNotFound if no such Snapshot can be found in the database.

func (*SnapshotStore) Query added in v0.1.2

func (s *SnapshotStore) Query(ctx context.Context, q snapshot.Query) (<-chan snapshot.Snapshot, <-chan error, error)

func (*SnapshotStore) Save added in v0.1.2

func (s *SnapshotStore) Save(ctx context.Context, snap snapshot.Snapshot) error

Save saves the given Snapshot into the database.

func (*SnapshotStore) Version added in v0.1.2

func (s *SnapshotStore) Version(ctx context.Context, name string, id uuid.UUID, version int) (snapshot.Snapshot, error)

Version returns the Snapshot for the aggregate with the given name, UUID and version. If no Snapshot for the given version exists, Version returns ErrNotFound.

type VersionError

type VersionError struct {
	// AggregateName is the name of the aggregate.
	AggregateName string

	// AggregateID is the UUID of the aggregate.
	AggregateID uuid.UUID

	// CurrentVersion is the current version of the aggregate.
	CurrentVersion int

	// Event is the event with the invalid version.
	Event event.Event
	// contains filtered or unexported fields
}

A VersionError means the insertion of events failed because at least one of the events has an invalid/inconsistent version.

func (VersionError) Error

func (err VersionError) Error() string

func (VersionError) IsConsistencyError added in v0.1.2

func (err VersionError) IsConsistencyError() bool

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL