Documentation ¶
Index ¶
- func Migrate(conn xsql.DB, config *Config) error
- func MigrateEventPartitions(conn xsql.DB, cfg *Config, aggregateTypes ...string) error
- func MigrateEventStatePartitions(conn xsql.DB, cfg *Config, handlerNames ...string) error
- func ToSnakeCase(s string) string
- type Config
- type EventStateConfig
- type StateStorage
- func (s *StateStorage) As(dst interface{}) error
- func (s *StateStorage) BeginTx(ctx context.Context) (esstate.TxStorage, error)
- func (s *StateStorage) Config() Config
- func (s *StateStorage) Err(err error) error
- func (s *StateStorage) ErrorCode(err error) cgerrors.ErrorCode
- func (s *StateStorage) FindFailures(ctx context.Context, query eventstate.FindFailureQuery) ([]eventstate.HandleFailure, error)
- func (s *StateStorage) FindUnhandled(ctx context.Context, query eventstate.FindUnhandledQuery) ([]eventstate.Unhandled, error)
- func (s *StateStorage) FinishHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error
- func (s *StateStorage) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*es.Snapshot, error)
- func (s *StateStorage) HandlingFailed(ctx context.Context, failure *eventstate.HandleFailure) error
- func (s *StateStorage) ListEvents(ctx context.Context, aggId, aggType string) ([]*es.Event, error)
- func (s *StateStorage) ListEventsAfterRevision(ctx context.Context, aggId string, aggType string, after int64) ([]*es.Event, error)
- func (s *StateStorage) ListHandlers(ctx context.Context) ([]eventstate.Handler, error)
- func (s *StateStorage) MarkUnhandled(ctx context.Context, eventID, eventType string, timestamp int64) error
- func (s *StateStorage) RegisterHandlers(ctx context.Context, eventHandlers ...eventstate.Handler) error
- func (s *StateStorage) SaveEvents(ctx context.Context, es []*es.Event) error
- func (s *StateStorage) SaveSnapshot(ctx context.Context, snap *es.Snapshot) error
- func (s *StateStorage) StartHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error
- func (s *StateStorage) StreamEvents(ctx context.Context, req *es.StreamEventsRequest) (<-chan *es.Event, error)
- type Storage
- func (s *Storage) As(dst interface{}) error
- func (s *Storage) BeginTx(ctx context.Context) (es.TxStorage, error)
- func (s *Storage) Config() Config
- func (s *Storage) Err(err error) error
- func (s *Storage) ErrorCode(err error) cgerrors.ErrorCode
- func (s *Storage) FindFailures(ctx context.Context, query eventstate.FindFailureQuery) ([]eventstate.HandleFailure, error)
- func (s *Storage) FindUnhandled(ctx context.Context, query eventstate.FindUnhandledQuery) ([]eventstate.Unhandled, error)
- func (s *Storage) FinishHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error
- func (s *Storage) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*es.Snapshot, error)
- func (s *Storage) HandlingFailed(ctx context.Context, failure *eventstate.HandleFailure) error
- func (s *Storage) ListEvents(ctx context.Context, aggId, aggType string) ([]*es.Event, error)
- func (s *Storage) ListEventsAfterRevision(ctx context.Context, aggId string, aggType string, after int64) ([]*es.Event, error)
- func (s *Storage) ListHandlers(ctx context.Context) ([]eventstate.Handler, error)
- func (s *Storage) MarkUnhandled(ctx context.Context, eventID, eventType string, timestamp int64) error
- func (s *Storage) RegisterHandlers(ctx context.Context, eventHandlers ...eventstate.Handler) error
- func (s *Storage) SaveEvents(ctx context.Context, es []*es.Event) error
- func (s *Storage) SaveSnapshot(ctx context.Context, snap *es.Snapshot) error
- func (s *Storage) StartHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error
- func (s *Storage) StreamEvents(ctx context.Context, req *es.StreamEventsRequest) (<-chan *es.Event, error)
- type Transaction
- func (t *Transaction) As(dst interface{}) error
- func (t *Transaction) Commit(ctx context.Context) error
- func (t *Transaction) Done() bool
- func (s *Transaction) Err(err error) error
- func (s *Transaction) ErrorCode(err error) cgerrors.ErrorCode
- func (s *Transaction) FindFailures(ctx context.Context, query eventstate.FindFailureQuery) ([]eventstate.HandleFailure, error)
- func (s *Transaction) FindUnhandled(ctx context.Context, query eventstate.FindUnhandledQuery) ([]eventstate.Unhandled, error)
- func (s *Transaction) FinishHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error
- func (s *Transaction) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*es.Snapshot, error)
- func (s *Transaction) HandlingFailed(ctx context.Context, failure *eventstate.HandleFailure) error
- func (s *Transaction) ListEvents(ctx context.Context, aggId, aggType string) ([]*es.Event, error)
- func (s *Transaction) ListEventsAfterRevision(ctx context.Context, aggId string, aggType string, after int64) ([]*es.Event, error)
- func (s *Transaction) ListHandlers(ctx context.Context) ([]eventstate.Handler, error)
- func (s *Transaction) MarkUnhandled(ctx context.Context, eventID, eventType string, timestamp int64) error
- func (s *Transaction) RegisterHandlers(ctx context.Context, eventHandlers ...eventstate.Handler) error
- func (t *Transaction) Rollback(_ context.Context) error
- func (s *Transaction) SaveEvents(ctx context.Context, es []*es.Event) error
- func (s *Transaction) SaveSnapshot(ctx context.Context, snap *es.Snapshot) error
- func (s *Transaction) StartHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error
- func (s *Transaction) StreamEvents(ctx context.Context, req *es.StreamEventsRequest) (<-chan *es.Event, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Migrate ¶
Migrate executes table and types migration for the event store and snapshot. The table names are taken from the config.
func MigrateEventPartitions ¶ added in v0.0.20
MigrateEventPartitions migrates event partitions.
func MigrateEventStatePartitions ¶ added in v0.0.20
MigrateEventStatePartitions creates partitions on event state by its type.
func ToSnakeCase ¶ added in v0.0.18
Types ¶
type Config ¶
type Config struct { SchemaName string // Optional EventTable string PartitionEventTable bool SnapshotTable string AggregateTable string AggregateTypes []string EventState *EventStateConfig WorkersCount int }
Config is the configuration for the event storage.
func DefaultConfig ¶
DefaultConfig creates a new default config.
func (*Config) WithEventState ¶ added in v0.0.21
func (c *Config) WithEventState(config EventStateConfig) *Config
WithEventState sets up the event state for given config.
type EventStateConfig ¶ added in v0.0.21
type EventStateConfig struct { EventStateTable string PartitionState bool Handlers []eventstate.Handler HandleFailureTable string HandlerTable string }
EventStateConfig is a configuration for the event state part.
func DefaultEventStateConfig ¶ added in v0.0.21
func DefaultEventStateConfig(handlers ...eventstate.Handler) EventStateConfig
DefaultEventStateConfig is a default configuration for the event state tracking.
func (*EventStateConfig) Validate ¶ added in v0.0.21
func (c *EventStateConfig) Validate() error
type StateStorage ¶ added in v0.0.20
type StateStorage struct {
// contains filtered or unexported fields
}
StateStorage is the implementation of the eventstate.Storage interface. It also implements es.StorageBase.
func NewStateStorage ¶ added in v0.0.20
func NewStateStorage(conn *xsql.Conn, cfg *Config) (*StateStorage, error)
NewStateStorage creates a new event storage based on provided sqlx connection.
func (*StateStorage) As ¶ added in v0.0.20
func (s *StateStorage) As(dst interface{}) error
As exposes driver specific implementation.
func (*StateStorage) Config ¶ added in v0.0.20
func (s *StateStorage) Config() Config
Config gets the storage config.
func (*StateStorage) ErrorCode ¶ added in v0.0.20
ErrorCode gets the error code related to given error.
func (*StateStorage) FindFailures ¶ added in v0.0.20
func (s *StateStorage) FindFailures(ctx context.Context, query eventstate.FindFailureQuery) ([]eventstate.HandleFailure, error)
FindFailures implements eventstate.StorageBase.
func (*StateStorage) FindUnhandled ¶ added in v0.0.20
func (s *StateStorage) FindUnhandled(ctx context.Context, query eventstate.FindUnhandledQuery) ([]eventstate.Unhandled, error)
FindUnhandled implements the eventstate.StorageBase interface. It finds all unhandled event states matching the given query.
func (*StateStorage) FinishHandling ¶ added in v0.0.20
func (s *StateStorage) FinishHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error
FinishHandling implements eventstate.StorageBase.
func (*StateStorage) GetSnapshot ¶ added in v0.0.20
func (s *StateStorage) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*es.Snapshot, error)
GetSnapshot gets the latest snapshot for given aggregate. Implements eventsource.Storage interface.
func (*StateStorage) HandlingFailed ¶ added in v0.0.20
func (s *StateStorage) HandlingFailed(ctx context.Context, failure *eventstate.HandleFailure) error
HandlingFailed implements eventstate.StorageBase.
func (*StateStorage) ListEvents ¶ added in v0.0.20
ListEvents gets the event stream for provided aggregate. Implements eventsource.Storage interface.
func (*StateStorage) ListEventsAfterRevision ¶ added in v0.0.20
func (s *StateStorage) ListEventsAfterRevision(ctx context.Context, aggId string, aggType string, after int64) ([]*es.Event, error)
ListEventsAfterRevision gets the event stream for the given aggregate, limited to events whose revision comes after the provided one.
func (*StateStorage) ListHandlers ¶ added in v0.0.20
func (s *StateStorage) ListHandlers(ctx context.Context) ([]eventstate.Handler, error)
ListHandlers implements eventstate.StorageBase.
func (*StateStorage) MarkUnhandled ¶ added in v0.0.20
func (s *StateStorage) MarkUnhandled(ctx context.Context, eventID, eventType string, timestamp int64) error
MarkUnhandled implements eventstate.StorageBase.
func (*StateStorage) RegisterHandlers ¶ added in v0.0.20
func (s *StateStorage) RegisterHandlers(ctx context.Context, eventHandlers ...eventstate.Handler) error
RegisterHandlers implements eventstate.StorageBase.
func (*StateStorage) SaveEvents ¶ added in v0.0.20
SaveEvents stores provided events in the database. Implements eventsource.Storage interface.
func (*StateStorage) SaveSnapshot ¶ added in v0.0.20
SaveSnapshot stores the snapshot in the database. Implements eventsource.Storage interface.
func (*StateStorage) StartHandling ¶ added in v0.0.20
func (s *StateStorage) StartHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error
StartHandling implements eventstate.StorageBase.
func (*StateStorage) StreamEvents ¶ added in v0.0.20
func (s *StateStorage) StreamEvents(ctx context.Context, req *es.StreamEventsRequest) (<-chan *es.Event, error)
StreamEvents opens the channel of the events stream that matches given request. Implements eventsource.Storage.
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage is the implementation of the eventsource.Storage interface for the sqlx driver.
func (*Storage) BeginTx ¶
BeginTx creates and begins a new transaction, which exposes *sqlx.Tx and allows atomic commits.
func (*Storage) FindFailures ¶ added in v0.0.20
func (s *Storage) FindFailures(ctx context.Context, query eventstate.FindFailureQuery) ([]eventstate.HandleFailure, error)
FindFailures implements eventstate.StorageBase.
func (*Storage) FindUnhandled ¶ added in v0.0.20
func (s *Storage) FindUnhandled(ctx context.Context, query eventstate.FindUnhandledQuery) ([]eventstate.Unhandled, error)
FindUnhandled implements the eventstate.StorageBase interface. It finds all unhandled event states matching the given query.
func (*Storage) FinishHandling ¶ added in v0.0.20
func (s *Storage) FinishHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error
FinishHandling implements eventstate.StorageBase.
func (*Storage) GetSnapshot ¶
func (s *Storage) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*es.Snapshot, error)
GetSnapshot gets the latest snapshot for given aggregate. Implements eventsource.Storage interface.
func (*Storage) HandlingFailed ¶ added in v0.0.20
func (s *Storage) HandlingFailed(ctx context.Context, failure *eventstate.HandleFailure) error
HandlingFailed implements eventstate.StorageBase.
func (*Storage) ListEvents ¶
ListEvents gets the event stream for provided aggregate. Implements eventsource.Storage interface.
func (*Storage) ListEventsAfterRevision ¶
func (s *Storage) ListEventsAfterRevision(ctx context.Context, aggId string, aggType string, after int64) ([]*es.Event, error)
ListEventsAfterRevision gets the event stream for the given aggregate, limited to events whose revision comes after the provided one.
func (*Storage) ListHandlers ¶ added in v0.0.20
func (s *Storage) ListHandlers(ctx context.Context) ([]eventstate.Handler, error)
ListHandlers implements eventstate.StorageBase.
func (*Storage) MarkUnhandled ¶ added in v0.0.20
func (s *Storage) MarkUnhandled(ctx context.Context, eventID, eventType string, timestamp int64) error
MarkUnhandled implements eventstate.StorageBase.
func (*Storage) RegisterHandlers ¶ added in v0.0.20
func (s *Storage) RegisterHandlers(ctx context.Context, eventHandlers ...eventstate.Handler) error
RegisterHandlers implements eventstate.StorageBase.
func (*Storage) SaveEvents ¶
SaveEvents stores provided events in the database. Implements eventsource.Storage interface.
func (*Storage) SaveSnapshot ¶
SaveSnapshot stores the snapshot in the database. Implements eventsource.Storage interface.
func (*Storage) StartHandling ¶ added in v0.0.20
func (s *Storage) StartHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error
StartHandling implements eventstate.StorageBase.
func (*Storage) StreamEvents ¶
func (s *Storage) StreamEvents(ctx context.Context, req *es.StreamEventsRequest) (<-chan *es.Event, error)
StreamEvents opens the channel of the events stream that matches given request. Implements eventsource.Storage.
type Transaction ¶
type Transaction struct {
// contains filtered or unexported fields
}
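Transaction is the implementation of the transactional storage interface (presumably es.TxStorage, as returned by Storage.BeginTx; the original sentence is truncated here).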
func (*Transaction) As ¶
func (t *Transaction) As(dst interface{}) error
As sets the destination with the *sqlx.Tx implementation.
func (*Transaction) Commit ¶
func (t *Transaction) Commit(ctx context.Context) error
Commit commits the transaction.
func (*Transaction) Done ¶
func (t *Transaction) Done() bool
Done checks if the transaction is already done.
func (*Transaction) FindFailures ¶ added in v0.0.20
func (s *Transaction) FindFailures(ctx context.Context, query eventstate.FindFailureQuery) ([]eventstate.HandleFailure, error)
FindFailures implements eventstate.StorageBase.
func (*Transaction) FindUnhandled ¶ added in v0.0.20
func (s *Transaction) FindUnhandled(ctx context.Context, query eventstate.FindUnhandledQuery) ([]eventstate.Unhandled, error)
FindUnhandled implements the eventstate.StorageBase interface. It finds all unhandled event states matching the given query.
func (*Transaction) FinishHandling ¶ added in v0.0.20
func (s *Transaction) FinishHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error
FinishHandling implements eventstate.StorageBase.
func (*Transaction) GetSnapshot ¶
func (s *Transaction) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*es.Snapshot, error)
GetSnapshot gets the latest snapshot for given aggregate. Implements eventsource.Storage interface.
func (*Transaction) HandlingFailed ¶ added in v0.0.20
func (s *Transaction) HandlingFailed(ctx context.Context, failure *eventstate.HandleFailure) error
HandlingFailed implements eventstate.StorageBase.
func (*Transaction) ListEvents ¶
ListEvents gets the event stream for provided aggregate. Implements eventsource.Storage interface.
func (*Transaction) ListEventsAfterRevision ¶
func (s *Transaction) ListEventsAfterRevision(ctx context.Context, aggId string, aggType string, after int64) ([]*es.Event, error)
ListEventsAfterRevision gets the event stream for the given aggregate, limited to events whose revision comes after the provided one.
func (*Transaction) ListHandlers ¶ added in v0.0.20
func (s *Transaction) ListHandlers(ctx context.Context) ([]eventstate.Handler, error)
ListHandlers implements eventstate.StorageBase.
func (*Transaction) MarkUnhandled ¶ added in v0.0.20
func (s *Transaction) MarkUnhandled(ctx context.Context, eventID, eventType string, timestamp int64) error
MarkUnhandled implements eventstate.StorageBase.
func (*Transaction) RegisterHandlers ¶ added in v0.0.20
func (s *Transaction) RegisterHandlers(ctx context.Context, eventHandlers ...eventstate.Handler) error
RegisterHandlers implements eventstate.StorageBase.
func (*Transaction) Rollback ¶
func (t *Transaction) Rollback(_ context.Context) error
Rollback rolls back the transaction.
func (*Transaction) SaveEvents ¶
SaveEvents stores provided events in the database. Implements eventsource.Storage interface.
func (*Transaction) SaveSnapshot ¶
SaveSnapshot stores the snapshot in the database. Implements eventsource.Storage interface.
func (*Transaction) StartHandling ¶ added in v0.0.20
func (s *Transaction) StartHandling(ctx context.Context, eventID string, handlerName string, timestamp int64) error
StartHandling implements eventstate.StorageBase.
func (*Transaction) StreamEvents ¶
func (s *Transaction) StreamEvents(ctx context.Context, req *es.StreamEventsRequest) (<-chan *es.Event, error)
StreamEvents opens the channel of the events stream that matches given request. Implements eventsource.Storage.