Documentation ¶
Index ¶
- Constants
- Variables
- func ContextWithSession(ctx context.Context, s *Session) context.Context
- func CreateTable(ctx context.Context, svc AdminAPI, table string) error
- func DeleteTable(ctx context.Context, svc AdminAPI, table string) error
- func IsConditionCheckFailure(err error) bool
- func IsConditionCheckFailureWithItem(err error, hashKey, rangeKey string) (bool, bool)
- func NewIndexer(api ClientAPI, table string, opts ...func(cfg *IndexerConfig)) *indexer
- func UnpackRecord(ctx context.Context, r Record, serializer event.Serializer) ([]event.Envelope, error)
- type AdminAPI
- type ClientAPI
- type ContextKey
- type Indexer
- type IndexerConfig
- type Item
- type Record
- type Session
- func (s *Session) AddToTx(ctx context.Context, ops []any) error
- func (s *Session) Check(ctx context.Context, c *types.ConditionCheck) error
- func (s *Session) CloseTx() error
- func (s *Session) CommitTx(ctx context.Context) (err error)
- func (s *Session) ConsumedCapacity() *consumedCapacity
- func (s *Session) Delete(ctx context.Context, d *dynamodb.DeleteItemInput) error
- func (s *Session) HasTx() bool
- func (s *Session) Put(ctx context.Context, p *dynamodb.PutItemInput) error
- func (s *Session) StartTx() error
- func (s *Session) Update(ctx context.Context, u *dynamodb.UpdateItemInput) error
- type Store
- func (s Store) Append(ctx context.Context, id event.StreamID, events []event.Envelope, ...) (err error)
- func (s Store) AppendToStream(ctx context.Context, stm sourcing.Stream, ...) (err error)
- func (s Store) Load(ctx context.Context, id event.StreamID, trange ...time.Time) (events []event.Envelope, err error)
- func (s Store) LoadStream(ctx context.Context, id event.StreamID, vrange ...event.Version) (stm *sourcing.Stream, err error)
- func (s Store) Replay(ctx context.Context, id event.StreamID, q event.StreamerQuery, ...) (err error)
- type StoreConfig
Constants ¶
const ( VerAttribute string = "ver" GVerAttribute string = "gver" GIDAttribute string = "gid" )
Variables ¶
var ( ErrTxAlreadyStarted = errors.New("transaction already started") ErrTxInvalidItemType = errors.New("invalid transaction item type") )
var (
ErrIndexingRecordFailed = errors.New("indexing record has failed")
)
Functions ¶
func IsConditionCheckFailure ¶
IsConditionCheckFailure checks if the given error is an AWS error that expresses a conditional check failure exception. It works seamlessly for both single write operations and operations within a transaction.
func IsConditionCheckFailureWithItem ¶ added in v0.0.2
func NewIndexer ¶
func NewIndexer(api ClientAPI, table string, opts ...func(cfg *IndexerConfig)) *indexer
NewIndexer returns a DynamoDB global stream indexer. It keeps track of the global stream's current version and increments the sequence accordingly by using a DynamoDB LSI.
It has an in-memory cache that works effectively only if the instance keeps receiving records from the same global stream. Otherwise the cache is cleared and synced with the new global stream's current state.
The use of such a cache is only possible because the DynamoDB Lambda event source mapping allows it:
- It does not concurrently distribute a partition-related record to different Lambda instances.
- The partition key and global stream have a one-to-one relation.
The latter one-to-one point might change in the future to overcome the 10GB size limit. This may require a draining logic to allow a safer transition to the next partition.
func UnpackRecord ¶ added in v0.0.2
func UnpackRecord(ctx context.Context, r Record, serializer event.Serializer) ([]event.Envelope, error)
UnpackRecord unmarshals the events contained in the record and sets their respective global stream sequence. It fails if the record is not indexed, and it panics if an event envelope does not implement the event.GlobalVersionSetter interface.
Types ¶
type AdminAPI ¶
type AdminAPI interface { ClientAPI dynamodb.DescribeTableAPIClient CreateTable(ctx context.Context, params *dynamodb.CreateTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.CreateTableOutput, error) DeleteTable(ctx context.Context, params *dynamodb.DeleteTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteTableOutput, error) UpdateTimeToLive(ctx context.Context, params *dynamodb.UpdateTimeToLiveInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateTimeToLiveOutput, error) }
AdminAPI presents a subset of the DynamoDB client operations, especially the ones related to tables.
type ClientAPI ¶
type ClientAPI interface { dynamodb.QueryAPIClient GetItem(ctx context.Context, params *dynamodb.GetItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) PutItem(ctx context.Context, params *dynamodb.PutItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error) UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) DeleteItem(ctx context.Context, params *dynamodb.DeleteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error) TransactWriteItems(ctx context.Context, params *dynamodb.TransactWriteItemsInput, optFns ...func(*dynamodb.Options)) (*dynamodb.TransactWriteItemsOutput, error) }
ClientAPI presents a subset of the DynamoDB client operations, especially the ones related to table items.
type ContextKey ¶
type ContextKey string
type IndexerConfig ¶
type IndexerConfig struct{}
type Record ¶
type Session ¶
type Session struct {
// contains filtered or unexported fields
}
func NewSession ¶
func (*Session) ConsumedCapacity ¶ added in v0.0.2
func (s *Session) ConsumedCapacity() *consumedCapacity
ConsumedCapacity implements Session.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store implements event.Store, sourcing.Store, and event.Streamer interfaces
func NewEventStore ¶
func NewEventStore(api ClientAPI, table string, opts ...func(*StoreConfig)) *Store
NewEventStore returns an implementation of the event store interfaces. It panics if ClientAPI or table is empty.
func (Store) Append ¶
func (s Store) Append(ctx context.Context, id event.StreamID, events []event.Envelope, opts ...func(*event.AppendConfig)) (err error)
Append implements event.Store interface
func (Store) AppendToStream ¶
func (s Store) AppendToStream(ctx context.Context, stm sourcing.Stream, optFns ...func(opt *event.AppendConfig)) (err error)
AppendToStream implements sourcing.Store interface
func (Store) Load ¶
func (s Store) Load(ctx context.Context, id event.StreamID, trange ...time.Time) (events []event.Envelope, err error)
Load implements event.Store interface
type StoreConfig ¶
type StoreConfig struct {
	// Serializer presents the event serializer. By default, the store uses the JSON event serializer.
	Serializer event.Serializer
	// AppendEventOptions presents the default pre-append options applied in every events' Append call.
	AppendEventOptions []func(*event.AppendConfig)
	// RecordSizeLimit describes the size limit in bytes of events' appended to the stream.
	// It's ignored if empty.
	RecordSizeLimit int
}
StoreConfig presents the config of the DynamoDB-based event store implementation.