Documentation ¶
Index ¶
- func Migrate(config *Config, conn *sqlx.DB) error
- type Config
- type Storage
- func (s *Storage) As(dst interface{}) error
- func (s *Storage) BeginTx(ctx context.Context) (*Transaction, error)
- func (s *Storage) Err(err error) error
- func (s *Storage) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*eventsource.Snapshot, error)
- func (s *Storage) ListEvents(ctx context.Context, aggId, aggType string) ([]*eventsource.Event, error)
- func (s *Storage) ListEventsFromRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*eventsource.Event, error)
- func (s *Storage) NewCursor(ctx context.Context, aggType string, aggVersion int64) (eventsource.Cursor, error)
- func (s *Storage) SaveEvents(ctx context.Context, es []*eventsource.Event) error
- func (s *Storage) SaveSnapshot(ctx context.Context, snap *eventsource.Snapshot) error
- func (s *Storage) StreamEvents(ctx context.Context, req *eventsource.StreamEventsRequest) (<-chan *eventsource.Event, error)
- type Transaction
- func (t *Transaction) As(dst interface{}) error
- func (t *Transaction) Commit() error
- func (t *Transaction) Done() bool
- func (s *Transaction) Err(err error) error
- func (s *Transaction) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*eventsource.Snapshot, error)
- func (s *Transaction) ListEvents(ctx context.Context, aggId, aggType string) ([]*eventsource.Event, error)
- func (s *Transaction) ListEventsFromRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*eventsource.Event, error)
- func (s *Transaction) NewCursor(ctx context.Context, aggType string, aggVersion int64) (eventsource.Cursor, error)
- func (t *Transaction) Rollback() error
- func (s *Transaction) SaveEvents(ctx context.Context, es []*eventsource.Event) error
- func (s *Transaction) SaveSnapshot(ctx context.Context, snap *eventsource.Snapshot) error
- func (s *Transaction) StreamEvents(ctx context.Context, req *eventsource.StreamEventsRequest) (<-chan *eventsource.Event, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Config ¶
type Config struct { EventTable string SnapshotTable string SchemaName string // Optional AggregateTable string WorkersCount int }
Config is the configuration for the event storage.
type Storage ¶
type Storage struct {
// contains filtered or unexported fields
}
Storage is the implementation of the eventsource.Storage interface for the sqlx driver.
func (*Storage) BeginTx ¶ added in v0.0.9
func (s *Storage) BeginTx(ctx context.Context) (*Transaction, error)
BeginTx creates and begins a new transaction, which exposes *sqlx.Tx and allows atomic commits.
func (*Storage) GetSnapshot ¶
func (s *Storage) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*eventsource.Snapshot, error)
GetSnapshot gets the latest snapshot for given aggregate. Implements eventsource.Storage interface.
func (*Storage) ListEvents ¶ added in v0.0.9
func (s *Storage) ListEvents(ctx context.Context, aggId, aggType string) ([]*eventsource.Event, error)
ListEvents gets the event stream for provided aggregate. Implements eventsource.Storage interface.
func (*Storage) ListEventsFromRevision ¶ added in v0.0.9
func (s *Storage) ListEventsFromRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*eventsource.Event, error)
ListEventsFromRevision gets the event stream for given aggregate where the revision is subsequent from provided.
func (*Storage) NewCursor ¶
func (s *Storage) NewCursor(ctx context.Context, aggType string, aggVersion int64) (eventsource.Cursor, error)
NewCursor creates a new cursor.
func (*Storage) SaveEvents ¶
func (s *Storage) SaveEvents(ctx context.Context, es []*eventsource.Event) error
SaveEvents stores provided events in the database. Implements eventsource.Storage interface.
func (*Storage) SaveSnapshot ¶
func (s *Storage) SaveSnapshot(ctx context.Context, snap *eventsource.Snapshot) error
SaveSnapshot stores the snapshot in the database. Implements eventsource.Storage interface.
func (*Storage) StreamEvents ¶
func (s *Storage) StreamEvents(ctx context.Context, req *eventsource.StreamEventsRequest) (<-chan *eventsource.Event, error)
StreamEvents opens the channel of the events stream that matches given request. Implements eventsource.Storage.
type Transaction ¶
type Transaction struct {
// contains filtered or unexported fields
}
Transaction is the implementation of the
func (*Transaction) As ¶ added in v0.0.9
func (t *Transaction) As(dst interface{}) error
As sets the destination with the *sqlx.Tx implementation.
func (*Transaction) Done ¶ added in v0.0.11
func (t *Transaction) Done() bool
Done checks if the transaction is already done.
func (*Transaction) GetSnapshot ¶
func (s *Transaction) GetSnapshot(ctx context.Context, aggId string, aggType string, aggVersion int64) (*eventsource.Snapshot, error)
GetSnapshot gets the latest snapshot for given aggregate. Implements eventsource.Storage interface.
func (*Transaction) ListEvents ¶ added in v0.0.9
func (s *Transaction) ListEvents(ctx context.Context, aggId, aggType string) ([]*eventsource.Event, error)
ListEvents gets the event stream for provided aggregate. Implements eventsource.Storage interface.
func (*Transaction) ListEventsFromRevision ¶ added in v0.0.9
func (s *Transaction) ListEventsFromRevision(ctx context.Context, aggId string, aggType string, from int64) ([]*eventsource.Event, error)
ListEventsFromRevision gets the event stream for given aggregate where the revision is subsequent from provided.
func (*Transaction) NewCursor ¶
func (s *Transaction) NewCursor(ctx context.Context, aggType string, aggVersion int64) (eventsource.Cursor, error)
NewCursor creates a new cursor.
func (*Transaction) Rollback ¶ added in v0.0.11
func (t *Transaction) Rollback() error
Rollback the transaction.
func (*Transaction) SaveEvents ¶
func (s *Transaction) SaveEvents(ctx context.Context, es []*eventsource.Event) error
SaveEvents stores provided events in the database. Implements eventsource.Storage interface.
func (*Transaction) SaveSnapshot ¶
func (s *Transaction) SaveSnapshot(ctx context.Context, snap *eventsource.Snapshot) error
SaveSnapshot stores the snapshot in the database. Implements eventsource.Storage interface.
func (*Transaction) StreamEvents ¶
func (s *Transaction) StreamEvents(ctx context.Context, req *eventsource.StreamEventsRequest) (<-chan *eventsource.Event, error)
StreamEvents opens the channel of the events stream that matches given request. Implements eventsource.Storage.