Documentation
¶
Index ¶
- func OutboxInsertHandler[K eventsourcing.ID](database, collName string) store.InTxHandler[K]
- func TxRunner(client *mongo.Client) store.Tx
- type ChangeEvent
- type EsRepository
- func NewStore[K eventsourcing.ID, PK eventsourcing.IDPt[K]](ctx context.Context, client *mongo.Client, database string, ...) (*EsRepository[K, PK], error)
- func NewStoreWithURI[K eventsourcing.ID, PK eventsourcing.IDPt[K]](ctx context.Context, connString, database string, opts ...Option[K, PK]) (*EsRepository[K, PK], error)
- func (r *EsRepository[K, PK]) Client() *mongo.Client
- func (r *EsRepository[K, PK]) Close(ctx context.Context)
- func (r *EsRepository[K, PK]) Forget(ctx context.Context, request eventsourcing.ForgetRequest[K], ...) error
- func (r *EsRepository[K, PK]) GetAggregateEvents(ctx context.Context, aggregateID K, snapVersion int) ([]*eventsourcing.Event[K], error)
- func (r *EsRepository[K, PK]) GetEvents(ctx context.Context, after, until eventid.EventID, batchSize int, ...) ([]*eventsourcing.Event[K], error)
- func (r *EsRepository[K, PK]) GetEventsByRawIDs(ctx context.Context, ids []string) ([]*eventsourcing.Event[K], error)
- func (r *EsRepository[K, PK]) GetSnapshot(ctx context.Context, aggregateID K) (eventsourcing.Snapshot[K], error)
- func (r *EsRepository[K, PK]) MigrateInPlaceCopyReplace(ctx context.Context, revision int, snapshotThreshold uint32, ...) error
- func (r *EsRepository[K, PK]) SaveEvent(ctx context.Context, eRec *eventsourcing.EventRecord[K]) (eventid.EventID, uint32, error)
- func (r *EsRepository[K, PK]) SaveSnapshot(ctx context.Context, snapshot *eventsourcing.Snapshot[K]) error
- type Event
- type EventsRepository
- type Feed
- type FeedOption
- type InTxHandler
- type InTxHandlerContext
- type KVStore
- type Option
- func WithEventsCollection[K eventsourcing.ID, PK eventsourcing.IDPt[K]](eventsCollection string) Option[K, PK]
- func WithMetadata[K eventsourcing.ID, PK eventsourcing.IDPt[K]](metadata eventsourcing.Metadata) Option[K, PK]
- func WithMetadataHook[K eventsourcing.ID, PK eventsourcing.IDPt[K]](fn store.MetadataHook[K]) Option[K, PK]
- func WithPostSchemaCreation[K eventsourcing.ID, PK eventsourcing.IDPt[K]](post func(Schema) []bson.D) Option[K, PK]
- func WithSkipSchemaCreation[K eventsourcing.ID, PK eventsourcing.IDPt[K]](skip bool) Option[K, PK]
- func WithSnapshotsCollection[K eventsourcing.ID, PK eventsourcing.IDPt[K]](snapshotsCollection string) Option[K, PK]
- func WithTxHandler[K eventsourcing.ID, PK eventsourcing.IDPt[K]](txHandler store.InTxHandler[K]) Option[K, PK]
- type Outbox
- type OutboxRepository
- type Repository
- type Schema
- type Snapshot
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func OutboxInsertHandler ¶ added in v0.34.0
func OutboxInsertHandler[K eventsourcing.ID](database, collName string) store.InTxHandler[K]
Types ¶
type ChangeEvent ¶
type ChangeEvent struct {
FullDocument Event `bson:"fullDocument,omitempty"`
}
type EsRepository ¶
type EsRepository[K eventsourcing.ID, PK eventsourcing.IDPt[K]] struct { Repository // contains filtered or unexported fields }
func NewStore ¶
func NewStore[K eventsourcing.ID, PK eventsourcing.IDPt[K]](ctx context.Context, client *mongo.Client, database string, opts ...Option[K, PK]) (*EsRepository[K, PK], error)
NewStore creates a new instance of MongoEsRepository
func NewStoreWithURI ¶ added in v0.33.0
func NewStoreWithURI[K eventsourcing.ID, PK eventsourcing.IDPt[K]](ctx context.Context, connString, database string, opts ...Option[K, PK]) (*EsRepository[K, PK], error)
NewStoreWithURI creates a new instance of MongoEsRepository
func (*EsRepository[K, PK]) Client ¶ added in v0.33.0
func (r *EsRepository[K, PK]) Client() *mongo.Client
func (*EsRepository[K, PK]) Close ¶
func (r *EsRepository[K, PK]) Close(ctx context.Context)
func (*EsRepository[K, PK]) Forget ¶
func (r *EsRepository[K, PK]) Forget(ctx context.Context, request eventsourcing.ForgetRequest[K], forget func(kind eventsourcing.Kind, body []byte) ([]byte, error)) error
func (*EsRepository[K, PK]) GetAggregateEvents ¶
func (r *EsRepository[K, PK]) GetAggregateEvents(ctx context.Context, aggregateID K, snapVersion int) ([]*eventsourcing.Event[K], error)
func (*EsRepository[K, PK]) GetEventsByRawIDs ¶ added in v0.35.0
func (r *EsRepository[K, PK]) GetEventsByRawIDs(ctx context.Context, ids []string) ([]*eventsourcing.Event[K], error)
func (*EsRepository[K, PK]) GetSnapshot ¶
func (r *EsRepository[K, PK]) GetSnapshot(ctx context.Context, aggregateID K) (eventsourcing.Snapshot[K], error)
func (*EsRepository[K, PK]) MigrateInPlaceCopyReplace ¶ added in v0.21.0
func (r *EsRepository[K, PK]) MigrateInPlaceCopyReplace( ctx context.Context, revision int, snapshotThreshold uint32, rehydrateFunc func(eventsourcing.Aggregater[K], *eventsourcing.Event[K]) error, codec eventsourcing.Codec[K], handler eventsourcing.MigrationHandler[K], targetAggregateKind eventsourcing.Kind, aggregateKind eventsourcing.Kind, eventTypeCriteria ...eventsourcing.Kind, ) error
func (*EsRepository[K, PK]) SaveEvent ¶
func (r *EsRepository[K, PK]) SaveEvent(ctx context.Context, eRec *eventsourcing.EventRecord[K]) (eventid.EventID, uint32, error)
func (*EsRepository[K, PK]) SaveSnapshot ¶
func (r *EsRepository[K, PK]) SaveSnapshot(ctx context.Context, snapshot *eventsourcing.Snapshot[K]) error
type Event ¶
type Event struct { ID string `bson:"_id,omitempty"` AggregateID string `bson:"aggregate_id,omitempty"` AggregateIDHash int32 `bson:"aggregate_id_hash,omitempty"` AggregateVersion uint32 `bson:"aggregate_version,omitempty"` AggregateKind eventsourcing.Kind `bson:"aggregate_kind,omitempty"` Kind eventsourcing.Kind `bson:"kind,omitempty"` Body []byte `bson:"body,omitempty"` Metadata bson.M `bson:"metadata,omitempty"` CreatedAt time.Time `bson:"created_at,omitempty"` Migration int `bson:"migration"` Migrated bool `bson:"migrated"` }
Event is the event data stored in the database
type EventsRepository ¶ added in v0.34.0
type EventsRepository[K eventsourcing.ID] interface { GetEventsByRawIDs(context.Context, []string) ([]*eventsourcing.Event[K], error) }
type Feed ¶
type Feed[K eventsourcing.ID, PK eventsourcing.IDPt[K]] struct { // contains filtered or unexported fields }
func NewFeed ¶
func NewFeed[K eventsourcing.ID, PK eventsourcing.IDPt[K]](logger *slog.Logger, connString, database string, sinker sink.Sinker[K], opts ...FeedOption[K, PK]) (Feed[K, PK], error)
type FeedOption ¶
type FeedOption[K eventsourcing.ID, PK eventsourcing.IDPt[K]] func(*Feed[K, PK])
func WithFeedEventsCollection ¶
func WithFeedEventsCollection[K eventsourcing.ID, PK eventsourcing.IDPt[K]](eventsCollection string) FeedOption[K, PK]
func WithFilter ¶ added in v0.36.0
func WithFilter[K eventsourcing.ID, PK eventsourcing.IDPt[K]](filter *store.Filter) FeedOption[K, PK]
type InTxHandler ¶ added in v0.39.0
type InTxHandler[K eventsourcing.ID] func(*InTxHandlerContext[K]) error
type InTxHandlerContext ¶ added in v0.39.0
type InTxHandlerContext[K eventsourcing.ID] struct { // contains filtered or unexported fields }
func NewInTxHandlerContext ¶ added in v0.39.0
func NewInTxHandlerContext[K eventsourcing.ID](ctx context.Context, event *eventsourcing.Event[K]) *InTxHandlerContext[K]
func (*InTxHandlerContext[K]) Context ¶ added in v0.39.0
func (c *InTxHandlerContext[K]) Context() context.Context
func (*InTxHandlerContext[K]) Event ¶ added in v0.39.0
func (c *InTxHandlerContext[K]) Event() *eventsourcing.Event[K]
type KVStore ¶ added in v0.34.0
type KVStore struct { Repository // contains filtered or unexported fields }
func NewKVStore ¶ added in v0.34.0
func NewKVStoreWithURI ¶ added in v0.34.0
type Option ¶ added in v0.28.0
type Option[K eventsourcing.ID, PK eventsourcing.IDPt[K]] func(f *EsRepository[K, PK])
func WithEventsCollection ¶
func WithEventsCollection[K eventsourcing.ID, PK eventsourcing.IDPt[K]](eventsCollection string) Option[K, PK]
func WithMetadata ¶ added in v0.36.0
func WithMetadata[K eventsourcing.ID, PK eventsourcing.IDPt[K]](metadata eventsourcing.Metadata) Option[K, PK]
WithMetadata defines the metadata to be save on every event. Data keys will be converted to lower case
func WithMetadataHook ¶ added in v0.36.0
func WithMetadataHook[K eventsourcing.ID, PK eventsourcing.IDPt[K]](fn store.MetadataHook[K]) Option[K, PK]
WithMetadataHook defines the hook that will return the metadata. This metadata will override any metadata defined at the repository level
func WithPostSchemaCreation ¶ added in v0.38.0
func WithPostSchemaCreation[K eventsourcing.ID, PK eventsourcing.IDPt[K]](post func(Schema) []bson.D) Option[K, PK]
func WithSkipSchemaCreation ¶ added in v0.38.0
func WithSkipSchemaCreation[K eventsourcing.ID, PK eventsourcing.IDPt[K]](skip bool) Option[K, PK]
func WithSnapshotsCollection ¶
func WithSnapshotsCollection[K eventsourcing.ID, PK eventsourcing.IDPt[K]](snapshotsCollection string) Option[K, PK]
func WithTxHandler ¶ added in v0.34.0
func WithTxHandler[K eventsourcing.ID, PK eventsourcing.IDPt[K]](txHandler store.InTxHandler[K]) Option[K, PK]
type Outbox ¶ added in v0.34.0
type Outbox struct { ID string `bson:"_id,omitempty"` AggregateID string `bson:"aggregate_id,omitempty"` AggregateIDHash uint32 `bson:"aggregate_id_hash,omitempty"` AggregateKind eventsourcing.Kind `bson:"aggregate_kind,omitempty"` Kind eventsourcing.Kind `bson:"kind,omitempty"` Metadata bson.M `bson:"metadata,omitempty"` }
type OutboxRepository ¶ added in v0.34.0
type OutboxRepository[K eventsourcing.ID] struct { Repository // contains filtered or unexported fields }
func NewOutboxStore ¶ added in v0.34.0
func NewOutboxStore[K eventsourcing.ID](client *mongo.Client, database, collectionName string, eventsRepo EventsRepository[K]) *OutboxRepository[K]
func (*OutboxRepository[K]) PendingEvents ¶ added in v0.34.0
func (r *OutboxRepository[K]) PendingEvents(ctx context.Context, batchSize int, filter store.Filter) ([]*eventsourcing.Event[K], error)
type Repository ¶ added in v0.33.0
type Repository struct {
// contains filtered or unexported fields
}
func (Repository) TxRunner ¶ added in v0.39.0
func (r Repository) TxRunner() store.Tx
type Snapshot ¶
type Snapshot struct { ID string `bson:"_id,omitempty"` AggregateID string `bson:"aggregate_id,omitempty"` AggregateVersion uint32 `bson:"aggregate_version,omitempty"` AggregateKind eventsourcing.Kind `bson:"aggregate_kind,omitempty"` Body []byte `bson:"body,omitempty"` CreatedAt time.Time `bson:"created_at,omitempty"` Metadata bson.M }
Click to show internal directories.
Click to hide internal directories.