Documentation ¶
Index ¶
- func OutboxInsertHandler[K eventsourcing.ID](tableName string) store.InTxHandler[K]
- type DBConfig
- type EsRepository
- func (r *EsRepository[K, PK]) Forget(ctx context.Context, req eventsourcing.ForgetRequest[K], ...) error
- func (r *EsRepository[K, PK]) GetAggregateEvents(ctx context.Context, aggregateID K, snapVersion int) ([]*eventsourcing.Event[K], error)
- func (r *EsRepository[K, PK]) GetEvents(ctx context.Context, after, until eventid.EventID, batchSize int, ...) ([]*eventsourcing.Event[K], error)
- func (r *EsRepository[K, PK]) GetEventsByRawIDs(ctx context.Context, ids []string) ([]*eventsourcing.Event[K], error)
- func (r *EsRepository[K, PK]) GetSnapshot(ctx context.Context, aggregateID K) (eventsourcing.Snapshot[K], error)
- func (r *EsRepository[K, PK]) MigrateInPlaceCopyReplace(ctx context.Context, revision int, snapshotThreshold uint32, ...) error
- func (r *EsRepository[K, PK]) SaveEvent(ctx context.Context, eRec *eventsourcing.EventRecord[K]) (eventid.EventID, uint32, error)
- func (r *EsRepository[K, PK]) SaveSnapshot(ctx context.Context, snapshot *eventsourcing.Snapshot[K]) error
- type Event
- type EventsRepository
- type Feed
- type FeedOption
- func WithBackoffMaxElapsedTime[K eventsourcing.ID, PK eventsourcing.IDPt[K]](duration time.Duration) FeedOption[K, PK]
- func WithFeedEventsTable[K eventsourcing.ID, PK eventsourcing.IDPt[K]](eventsCollection string) FeedOption[K, PK]
- func WithFilter[K eventsourcing.ID, PK eventsourcing.IDPt[K]](filter *store.Filter) FeedOption[K, PK]
- func WithFlavour[K eventsourcing.ID, PK eventsourcing.IDPt[K]](flavour string) FeedOption[K, PK]
- type KVStore
- type Option
- func WithEventsTable[K eventsourcing.ID, PK eventsourcing.IDPt[K]](table string) Option[K, PK]
- func WithMetadata[K eventsourcing.ID, PK eventsourcing.IDPt[K]](metadata eventsourcing.Metadata) Option[K, PK]
- func WithMetadataHook[K eventsourcing.ID, PK eventsourcing.IDPt[K]](fn store.MetadataHook[K]) Option[K, PK]
- func WithSnapshotsTable[K eventsourcing.ID, PK eventsourcing.IDPt[K]](table string) Option[K, PK]
- func WithTxHandler[K eventsourcing.ID, PK eventsourcing.IDPt[K]](txHandler store.InTxHandler[K]) Option[K, PK]
- type OutboxRepository
- type Snapshot
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func OutboxInsertHandler ¶ added in v0.34.0
func OutboxInsertHandler[K eventsourcing.ID](tableName string) store.InTxHandler[K]
Types ¶
type EsRepository ¶
type EsRepository[K eventsourcing.ID, PK eventsourcing.IDPt[K]] struct { store.Repository // contains filtered or unexported fields }
func NewStore ¶
func NewStore[K eventsourcing.ID, PK eventsourcing.IDPt[K]](db *sql.DB, options ...Option[K, PK]) *EsRepository[K, PK]
func NewStoreWithURL ¶ added in v0.33.0
func NewStoreWithURL[K eventsourcing.ID, PK eventsourcing.IDPt[K]](connString string, options ...Option[K, PK]) (*EsRepository[K, PK], error)
func (*EsRepository[K, PK]) Forget ¶
func (r *EsRepository[K, PK]) Forget(ctx context.Context, req eventsourcing.ForgetRequest[K], forget func(kind eventsourcing.Kind, body []byte) ([]byte, error)) error
func (*EsRepository[K, PK]) GetAggregateEvents ¶
func (r *EsRepository[K, PK]) GetAggregateEvents(ctx context.Context, aggregateID K, snapVersion int) ([]*eventsourcing.Event[K], error)
func (*EsRepository[K, PK]) GetEventsByRawIDs ¶ added in v0.35.0
func (r *EsRepository[K, PK]) GetEventsByRawIDs(ctx context.Context, ids []string) ([]*eventsourcing.Event[K], error)
func (*EsRepository[K, PK]) GetSnapshot ¶
func (r *EsRepository[K, PK]) GetSnapshot(ctx context.Context, aggregateID K) (eventsourcing.Snapshot[K], error)
func (*EsRepository[K, PK]) MigrateInPlaceCopyReplace ¶ added in v0.21.0
func (r *EsRepository[K, PK]) MigrateInPlaceCopyReplace( ctx context.Context, revision int, snapshotThreshold uint32, rehydrateFunc func(eventsourcing.Aggregater[K], *eventsourcing.Event[K]) error, codec eventsourcing.Codec[K], handler eventsourcing.MigrationHandler[K], targetAggregateKind eventsourcing.Kind, aggregateKind eventsourcing.Kind, eventTypeCriteria ...eventsourcing.Kind, ) error
func (*EsRepository[K, PK]) SaveEvent ¶
func (r *EsRepository[K, PK]) SaveEvent(ctx context.Context, eRec *eventsourcing.EventRecord[K]) (eventid.EventID, uint32, error)
func (*EsRepository[K, PK]) SaveSnapshot ¶
func (r *EsRepository[K, PK]) SaveSnapshot(ctx context.Context, snapshot *eventsourcing.Snapshot[K]) error
type Event ¶
type Event struct { ID eventid.EventID AggregateID string AggregateIDHash int32 AggregateVersion uint32 AggregateKind eventsourcing.Kind Kind eventsourcing.Kind Body []byte CreatedAt time.Time Migration int Migrated bool Metadata eventsourcing.Metadata }
Event is the event data as it is stored in the database.
type EventsRepository ¶ added in v0.34.0
type EventsRepository[K eventsourcing.ID] interface { GetEventsByRawIDs(context.Context, []string) ([]*eventsourcing.Event[K], error) }
type Feed ¶
type Feed[K eventsourcing.ID, PK eventsourcing.IDPt[K]] struct { // contains filtered or unexported fields }
func NewFeed ¶
func NewFeed[K eventsourcing.ID, PK eventsourcing.IDPt[K]](logger *slog.Logger, config DBConfig, sinker sink.Sinker[K], opts ...FeedOption[K, PK]) (*Feed[K, PK], error)
type FeedOption ¶
type FeedOption[K eventsourcing.ID, PK eventsourcing.IDPt[K]] func(*Feed[K, PK])
func WithBackoffMaxElapsedTime ¶ added in v0.18.0
func WithBackoffMaxElapsedTime[K eventsourcing.ID, PK eventsourcing.IDPt[K]](duration time.Duration) FeedOption[K, PK]
func WithFeedEventsTable ¶ added in v0.36.0
func WithFeedEventsTable[K eventsourcing.ID, PK eventsourcing.IDPt[K]](eventsCollection string) FeedOption[K, PK]
func WithFilter ¶ added in v0.36.0
func WithFilter[K eventsourcing.ID, PK eventsourcing.IDPt[K]](filter *store.Filter) FeedOption[K, PK]
func WithFlavour ¶
func WithFlavour[K eventsourcing.ID, PK eventsourcing.IDPt[K]](flavour string) FeedOption[K, PK]
type KVStore ¶ added in v0.34.0
type KVStore struct { store.Repository // contains filtered or unexported fields }
func NewKVStoreWithURL ¶ added in v0.34.0
type Option ¶ added in v0.28.0
type Option[K eventsourcing.ID, PK eventsourcing.IDPt[K]] func(*EsRepository[K, PK])
func WithEventsTable ¶ added in v0.36.0
func WithEventsTable[K eventsourcing.ID, PK eventsourcing.IDPt[K]](table string) Option[K, PK]
func WithMetadata ¶ added in v0.36.0
func WithMetadata[K eventsourcing.ID, PK eventsourcing.IDPt[K]](metadata eventsourcing.Metadata) Option[K, PK]
WithMetadata defines the metadata to be saved on every event. Data keys will be converted to lower case.
func WithMetadataHook ¶ added in v0.36.0
func WithMetadataHook[K eventsourcing.ID, PK eventsourcing.IDPt[K]](fn store.MetadataHook[K]) Option[K, PK]
WithMetadataHook defines the hook that will return the metadata. This metadata will override any metadata defined at the repository level.
func WithSnapshotsTable ¶ added in v0.36.0
func WithSnapshotsTable[K eventsourcing.ID, PK eventsourcing.IDPt[K]](table string) Option[K, PK]
func WithTxHandler ¶ added in v0.34.0
func WithTxHandler[K eventsourcing.ID, PK eventsourcing.IDPt[K]](txHandler store.InTxHandler[K]) Option[K, PK]
type OutboxRepository ¶ added in v0.34.0
type OutboxRepository[K eventsourcing.ID] struct { store.Repository // contains filtered or unexported fields }
func NewOutboxStore ¶ added in v0.34.0
func NewOutboxStore[K eventsourcing.ID](db *sql.DB, tableName string, eventsRepo EventsRepository[K]) *OutboxRepository[K]
func (*OutboxRepository[K]) PendingEvents ¶ added in v0.34.0
func (r *OutboxRepository[K]) PendingEvents(ctx context.Context, batchSize int, filter store.Filter) ([]*eventsourcing.Event[K], error)
Click to show internal directories.
Click to hide internal directories.