dynamodb

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2023 License: MIT Imports: 18 Imported by: 3

Documentation

Index

Constants

View Source
const (
	HashKey            string = "_pk"
	RangeKey           string = "_sk"
	TTLAttribute       string = "_ttl"
	GVerAttribute      string = "_gver"
	LocalIndexRangeKey string = "_lsik"
	LocalIndex         string = "_lsi"
)
View Source
const (
	EventSizeLimit = 256000 // 250 KB
)

Variables

View Source
var (
	ErrIndexingRecordFailed = errors.New("indexing record failed")
)
View Source
var (
	ErrTxAlreadyStarted = errors.New("transaction already started")
)

Functions

func ContextWithSession

func ContextWithSession(ctx context.Context, s Session) context.Context

func CreateTable

func CreateTable(ctx context.Context, svc AdminAPI, table string) error

func DeleteTable

func DeleteTable(ctx context.Context, svc AdminAPI, table string) error

func IsConditionCheckFailure

func IsConditionCheckFailure(err error) bool

IsConditionCheckFailure checks if the given error is an aws error that expresses a conditional failure exception. It works seamlessly in both single write and within a transaction operation.

func NewIndexer

func NewIndexer(dbsvc ClientAPI, table string, opts ...func(cfg *IndexerConfig)) *indexer

NewIndexer returns a dynamodb global stream indexer. It keeps track of the global stream current version and increments sequence accordingly. Indexer has an in-memory cache that works effectively only if the instance receives records that belong to the same global stream, otherwise the cache is cleared and synced with the new global stream current state.

Note that Dynamodb change stream Lambda source mapper allows the use of a such cache mechanism: It does not concurrently distribute a partition-related events to different Lambda instances. At the moment the partition key and global stream have a one-to-one relation. Changing this in the future requires a draining logic to allow a safe transition to the next partition (for a given global stream).

func UnmarshalRecord

func UnmarshalRecord(ctx context.Context, r Record, serializer event.Serializer) ([]event.Envelope, error)

Types

type AdminAPI

type AdminAPI interface {
	ClientAPI

	dynamodb.DescribeTableAPIClient

	CreateTable(ctx context.Context, params *dynamodb.CreateTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.CreateTableOutput, error)
	DeleteTable(ctx context.Context, params *dynamodb.DeleteTableInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteTableOutput, error)
	UpdateTimeToLive(ctx context.Context, params *dynamodb.UpdateTimeToLiveInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateTimeToLiveOutput, error)
}

AdminAPI presents a sub part of Dynamodb Client Operations especially the ones related table.

type ClientAPI

type ClientAPI interface {
	dynamodb.QueryAPIClient

	GetItem(ctx context.Context, params *dynamodb.GetItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error)
	PutItem(ctx context.Context, params *dynamodb.PutItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.PutItemOutput, error)
	UpdateItem(ctx context.Context, params *dynamodb.UpdateItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.UpdateItemOutput, error)
	DeleteItem(ctx context.Context, params *dynamodb.DeleteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.DeleteItemOutput, error)
	TransactWriteItems(ctx context.Context, params *dynamodb.TransactWriteItemsInput, optFns ...func(*dynamodb.Options)) (*dynamodb.TransactWriteItemsOutput, error)
}

ClientAPI presents a sub part of Dynamodb Client Operations especially the ones related to table items.

func NewClient

func NewClient(cfg aws.Config) ClientAPI

type ContextKey

type ContextKey string

type Indexer

type Indexer interface {
	Index(ctx context.Context, rec Record) (err error)
}

type IndexerConfig

type IndexerConfig struct{}

type Item

type Item struct {
	HashKey     string `dynamodbav:"_pk"`
	RangeKey    string `dynamodbav:"_sk"`
	LSIRangeKey string `dynamodbav:"_lsik,omitempty" localIndex:"_lsi,range"`
	TTL         int64  `dynamodbav:"_ttl,omitempty"`
}

type Record

type Record struct {
	Item
	Events []byte `dynamodbav:"_evts"`

	Since    int64  `dynamodbav:"_since"`
	Until    int64  `dynamodbav:"_until"`
	Version  string `dynamodbav:"_ver,omitempty"`
	GID      string `dynamodbav:"_gid"`
	GVersion string `dynamodbav:"_gver,omitempty"`
}

type Session

type Session interface {
	CommitTx(ctx context.Context) error
	StartTx() error
	CloseTx() error
	HasTx() bool

	Put(ctx context.Context, p *dynamodb.PutItemInput) error
	Update(ctx context.Context, u *dynamodb.UpdateItemInput) error
	Check(ctx context.Context, c *types.ConditionCheck) error
	Delete(ctx context.Context, d *dynamodb.DeleteItemInput) error
}

func NewSession

func NewSession(db ClientAPI) Session

func SessionFrom

func SessionFrom(ctx context.Context) (Session, bool)

type Store

type Store struct {
	*StoreConfig
	// contains filtered or unexported fields
}

Store implements event.Store, sourcing.Store, and event.Streamer interfaces

func NewEventStore

func NewEventStore(svc ClientAPI, table string, opts ...func(*StoreConfig)) *Store

NewEventStore returns an implementation of event store interfaces. It panics if ClientAPI or table are empty.

func (*Store) Append

func (s *Store) Append(ctx context.Context, id event.StreamID, events []event.Envelope, optFns ...func(*event.AppendOptions)) error

AppendToStream implements event.Store interface

func (*Store) AppendToStream

func (s *Store) AppendToStream(ctx context.Context, stm sourcing.Stream, optFns ...func(opt *event.AppendOptions)) (err error)

AppendToStream implements sourcing.Store interface

func (*Store) Load

func (s *Store) Load(ctx context.Context, id event.StreamID, trange ...time.Time) ([]event.Envelope, error)

AppendToStream implements event.Store interface

func (*Store) LoadStream

func (s *Store) LoadStream(ctx context.Context, id event.StreamID, vrange ...event.Version) (stm *sourcing.Stream, err error)

AppendToStream implements sourcing.Store interface

func (*Store) Replay

type StoreConfig

type StoreConfig struct {
	Serializer event.Serializer
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL