mysql

package
v0.39.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2024 License: MIT Imports: 30 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func OutboxInsertHandler added in v0.34.0

func OutboxInsertHandler[K eventsourcing.ID](tableName string) store.InTxHandler[K]

Types

type DBConfig

type DBConfig struct {
	Database string
	Host     string
	Port     int
	Username string
	Password string
}

type EsRepository

type EsRepository[K eventsourcing.ID, PK eventsourcing.IDPt[K]] struct {
	store.Repository
	// contains filtered or unexported fields
}

func NewStore

func NewStore[K eventsourcing.ID, PK eventsourcing.IDPt[K]](db *sql.DB, options ...Option[K, PK]) *EsRepository[K, PK]

func NewStoreWithURL added in v0.33.0

func NewStoreWithURL[K eventsourcing.ID, PK eventsourcing.IDPt[K]](connString string, options ...Option[K, PK]) (*EsRepository[K, PK], error)

func (*EsRepository[K, PK]) Forget

func (r *EsRepository[K, PK]) Forget(ctx context.Context, req eventsourcing.ForgetRequest[K], forget func(kind eventsourcing.Kind, body []byte) ([]byte, error)) error

func (*EsRepository[K, PK]) GetAggregateEvents

func (r *EsRepository[K, PK]) GetAggregateEvents(ctx context.Context, aggregateID K, snapVersion int) ([]*eventsourcing.Event[K], error)

func (*EsRepository[K, PK]) GetEvents

func (r *EsRepository[K, PK]) GetEvents(ctx context.Context, after, until eventid.EventID, batchSize int, filter store.Filter) ([]*eventsourcing.Event[K], error)

func (*EsRepository[K, PK]) GetEventsByRawIDs added in v0.35.0

func (r *EsRepository[K, PK]) GetEventsByRawIDs(ctx context.Context, ids []string) ([]*eventsourcing.Event[K], error)

func (*EsRepository[K, PK]) GetSnapshot

func (r *EsRepository[K, PK]) GetSnapshot(ctx context.Context, aggregateID K) (eventsourcing.Snapshot[K], error)

func (*EsRepository[K, PK]) MigrateInPlaceCopyReplace added in v0.21.0

func (r *EsRepository[K, PK]) MigrateInPlaceCopyReplace(
	ctx context.Context,
	revision int,
	snapshotThreshold uint32,
	rehydrateFunc func(eventsourcing.Aggregater[K], *eventsourcing.Event[K]) error,
	codec eventsourcing.Codec[K],
	handler eventsourcing.MigrationHandler[K],
	targetAggregateKind eventsourcing.Kind,
	aggregateKind eventsourcing.Kind,
	eventTypeCriteria ...eventsourcing.Kind,
) error

func (*EsRepository[K, PK]) SaveEvent

func (r *EsRepository[K, PK]) SaveEvent(ctx context.Context, eRec *eventsourcing.EventRecord[K]) (eventid.EventID, uint32, error)

func (*EsRepository[K, PK]) SaveSnapshot

func (r *EsRepository[K, PK]) SaveSnapshot(ctx context.Context, snapshot *eventsourcing.Snapshot[K]) error

type Event

type Event struct {
	ID               eventid.EventID
	AggregateID      string
	AggregateIDHash  int32
	AggregateVersion uint32
	AggregateKind    eventsourcing.Kind
	Kind             eventsourcing.Kind
	Body             []byte
	CreatedAt        time.Time
	Migration        int
	Migrated         bool
	Metadata         eventsourcing.Metadata
}

Event is the event data is stored in the database

type EventsRepository added in v0.34.0

type EventsRepository[K eventsourcing.ID] interface {
	GetEventsByRawIDs(context.Context, []string) ([]*eventsourcing.Event[K], error)
}

type Feed

type Feed[K eventsourcing.ID, PK eventsourcing.IDPt[K]] struct {
	// contains filtered or unexported fields
}

func NewFeed

func NewFeed[K eventsourcing.ID, PK eventsourcing.IDPt[K]](logger *slog.Logger, config DBConfig, sinker sink.Sinker[K], opts ...FeedOption[K, PK]) (*Feed[K, PK], error)

func (*Feed[K, PK]) Run added in v0.25.0

func (f *Feed[K, PK]) Run(ctx context.Context) error

type FeedOption

type FeedOption[K eventsourcing.ID, PK eventsourcing.IDPt[K]] func(*Feed[K, PK])

func WithBackoffMaxElapsedTime added in v0.18.0

func WithBackoffMaxElapsedTime[K eventsourcing.ID, PK eventsourcing.IDPt[K]](duration time.Duration) FeedOption[K, PK]

func WithFeedEventsTable added in v0.36.0

func WithFeedEventsTable[K eventsourcing.ID, PK eventsourcing.IDPt[K]](eventsCollection string) FeedOption[K, PK]

func WithFilter added in v0.36.0

func WithFilter[K eventsourcing.ID, PK eventsourcing.IDPt[K]](filter *store.Filter) FeedOption[K, PK]

func WithFlavour

func WithFlavour[K eventsourcing.ID, PK eventsourcing.IDPt[K]](flavour string) FeedOption[K, PK]

type KVStore added in v0.34.0

type KVStore struct {
	store.Repository
	// contains filtered or unexported fields
}

func NewKVStore added in v0.34.0

func NewKVStore(db *sql.DB, table string) KVStore

func NewKVStoreWithURL added in v0.34.0

func NewKVStoreWithURL(connString string, table string) (KVStore, error)

func (KVStore) Get added in v0.34.0

func (r KVStore) Get(ctx context.Context, key string) (string, error)

func (KVStore) Put added in v0.34.0

func (m KVStore) Put(ctx context.Context, key string, value string) error

type Option added in v0.28.0

type Option[K eventsourcing.ID, PK eventsourcing.IDPt[K]] func(*EsRepository[K, PK])

func WithEventsTable added in v0.36.0

func WithEventsTable[K eventsourcing.ID, PK eventsourcing.IDPt[K]](table string) Option[K, PK]

func WithMetadata added in v0.36.0

func WithMetadata[K eventsourcing.ID, PK eventsourcing.IDPt[K]](metadata eventsourcing.Metadata) Option[K, PK]

WithMetadata defines the metadata to be save on every event. Data keys will be converted to lower case

func WithMetadataHook added in v0.36.0

func WithMetadataHook[K eventsourcing.ID, PK eventsourcing.IDPt[K]](fn store.MetadataHook[K]) Option[K, PK]

WithMetadataHook defines the hook that will return the metadata. This metadata will override any metadata defined at the repository level

func WithSnapshotsTable added in v0.36.0

func WithSnapshotsTable[K eventsourcing.ID, PK eventsourcing.IDPt[K]](table string) Option[K, PK]

func WithTxHandler added in v0.34.0

func WithTxHandler[K eventsourcing.ID, PK eventsourcing.IDPt[K]](txHandler store.InTxHandler[K]) Option[K, PK]

type OutboxRepository added in v0.34.0

type OutboxRepository[K eventsourcing.ID] struct {
	store.Repository
	// contains filtered or unexported fields
}

func NewOutboxStore added in v0.34.0

func NewOutboxStore[K eventsourcing.ID](db *sql.DB, tableName string, eventsRepo EventsRepository[K]) *OutboxRepository[K]

func (*OutboxRepository[K]) AfterSink added in v0.34.0

func (r *OutboxRepository[K]) AfterSink(ctx context.Context, eID eventid.EventID) error

func (*OutboxRepository[K]) PendingEvents added in v0.34.0

func (r *OutboxRepository[K]) PendingEvents(ctx context.Context, batchSize int, filter store.Filter) ([]*eventsourcing.Event[K], error)

type Snapshot

type Snapshot struct {
	ID               eventid.EventID
	AggregateID      string
	AggregateVersion uint32
	AggregateKind    eventsourcing.Kind
	Body             []byte
	CreatedAt        time.Time
	Metadata         eventsourcing.Metadata
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL