Documentation
¶
Index ¶
- Constants
- Variables
- func ContextWithSession(ctx context.Context, s Session) context.Context
- func CreateTable(ctx context.Context, svc AdminAPI, table string) error
- func DeleteTable(ctx context.Context, svc AdminAPI, table string) error
- func IsConditionCheckFailure(err error) bool
- func NewIndexer(dbsvc ClientAPI, table string, opts ...func(cfg *IndexerConfig)) *indexer
- func UnmarshalRecord(ctx context.Context, r Record, serializer event.Serializer) ([]event.Envelope, error)
- type AdminAPI
- type ClientAPI
- type ContextKey
- type Indexer
- type IndexerConfig
- type Item
- type Record
- type Session
- type Store
- func (s *Store) Append(ctx context.Context, id event.StreamID, events []event.Envelope, ...) error
- func (s *Store) AppendToStream(ctx context.Context, stm sourcing.Stream, ...) (err error)
- func (s *Store) Load(ctx context.Context, id event.StreamID, trange ...time.Time) ([]event.Envelope, error)
- func (s *Store) LoadStream(ctx context.Context, id event.StreamID, vrange ...event.Version) (stm *sourcing.Stream, err error)
- func (s *Store) Replay(ctx context.Context, id event.StreamID, q event.StreamerQuery, ...) error
- type StoreConfig
Constants ¶
const ( HashKey string = "_pk" RangeKey string = "_sk" TTLAttribute string = "_ttl" GVerAttribute string = "_gver" LocalIndexRangeKey string = "_lsik" LocalIndex string = "_lsi" )
const (
EventSizeLimit = 256000 // 250 KB
)
Variables ¶
var (
ErrIndexingRecordFailed = errors.New("indexing record failed")
)
var (
ErrTxAlreadyStarted = errors.New("transaction already started")
)
Functions ¶
func IsConditionCheckFailure ¶
IsConditionCheckFailure reports whether the given error is an AWS error that represents a conditional check failure. It works both for single write operations and for writes performed within a transaction.
func NewIndexer ¶
func NewIndexer(dbsvc ClientAPI, table string, opts ...func(cfg *IndexerConfig)) *indexer
NewIndexer returns a dynamodb global stream indexer. It keeps track of the global stream current version and increments sequence accordingly. Indexer has an in-memory cache that works effectively only if the instance receives records that belong to the same global stream, otherwise the cache is cleared and synced with the new global stream current state.
Note that the DynamoDB change stream Lambda source mapper allows the use of such a cache mechanism: it does not concurrently distribute events belonging to the same partition to different Lambda instances. At the moment the partition key and the global stream have a one-to-one relation. Changing this in the future requires draining logic to allow a safe transition to the next partition (for a given global stream).
func UnmarshalRecord ¶
Types ¶
type AdminAPI ¶
type AdminAPI interface { ClientAPI dynamodb.DescribeTableAPIClient CreateTable(ctx context.Context, params *dynamodb.CreateTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.CreateTableOutput, error) DeleteTable(ctx context.Context, params *dynamodb.DeleteTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteTableOutput, error) UpdateTimeToLive(ctx context.Context, params *dynamodb.UpdateTimeToLiveInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateTimeToLiveOutput, error) }
AdminAPI presents a subset of the DynamoDB client operations, especially the ones related to tables.
type ClientAPI ¶
type ClientAPI interface { dynamodb.QueryAPIClient GetItem(ctx context.Context, params *dynamodb.GetItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) PutItem(ctx context.Context, params *dynamodb.PutItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error) UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error) DeleteItem(ctx context.Context, params *dynamodb.DeleteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error) TransactWriteItems(ctx context.Context, params *dynamodb.TransactWriteItemsInput, optFns ...func(*dynamodb.Options)) (*dynamodb.TransactWriteItemsOutput, error) }
ClientAPI presents a subset of the DynamoDB client operations, especially the ones related to table items.
type ContextKey ¶
type ContextKey string
type IndexerConfig ¶
type IndexerConfig struct{}
type Session ¶
type Session interface { CommitTx(ctx context.Context) error StartTx() error CloseTx() error HasTx() bool Put(ctx context.Context, p *dynamodb.PutItemInput) error Update(ctx context.Context, u *dynamodb.UpdateItemInput) error Check(ctx context.Context, c *types.ConditionCheck) error Delete(ctx context.Context, d *dynamodb.DeleteItemInput) error }
func NewSession ¶
type Store ¶
type Store struct { *StoreConfig // contains filtered or unexported fields }
Store implements event.Store, sourcing.Store, and event.Streamer interfaces
func NewEventStore ¶
func NewEventStore(svc ClientAPI, table string, opts ...func(*StoreConfig)) *Store
NewEventStore returns an implementation of event store interfaces. It panics if ClientAPI or table are empty.
func (*Store) Append ¶
func (s *Store) Append(ctx context.Context, id event.StreamID, events []event.Envelope, optFns ...func(*event.AppendOptions)) error
Append implements event.Store interface
func (*Store) AppendToStream ¶
func (s *Store) AppendToStream(ctx context.Context, stm sourcing.Stream, optFns ...func(opt *event.AppendOptions)) (err error)
AppendToStream implements sourcing.Store interface
func (*Store) Load ¶
func (s *Store) Load(ctx context.Context, id event.StreamID, trange ...time.Time) ([]event.Envelope, error)
Load implements event.Store interface
type StoreConfig ¶
type StoreConfig struct {
Serializer event.Serializer
}