dynamoutils

package
v1.5.39 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 6, 2024 License: AGPL-3.0 Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DynamoWriteBatchSize = 25
	DynamoReadBatchSize  = 100
)

Variables

View Source
var (
	EmptyString = &types.AttributeValueMemberS{Value: ""}
)

Functions

func AsSlice

func AsSlice(stream Stream) (values []map[string]types.AttributeValue, err error)

AsSlice reads the stream fully and returns its elements as a slice.

func AttributeValueMemberS added in v1.5.26

func AttributeValueMemberS(value string) types.AttributeValue

func BufferedPutItems added in v1.5.26

func BufferedPutItems(ctx context.Context, db DynamoBatchWriteItem, items []interface{}, table string, dynamoWriteBatchSize int) error

func BufferedWriteItems

func BufferedWriteItems(ctx context.Context, db DynamoBatchWriteItem, requests []types.WriteRequest, table string, dynamoWriteBatchSize int) error

BufferedWriteItems writes items in batches, with backoff, and replays unprocessed items.

func CreateOrUpdateTable

func CreateOrUpdateTable(ctx context.Context, input *CreateOrUpdateTableInput) error

CreateOrUpdateTable creates the table if it doesn't exist, or updates it otherwise. The implementation is not mature and is subject to many limitations (for example, the order in which fields and indexes are deleted).

Types

type CreateOrUpdateTableInput

type CreateOrUpdateTableInput struct {
	Client     *dynamodb.Client
	TableName  string
	Definition *dynamodb.CreateTableInput
}

type DynamoBatchGetItem

type DynamoBatchGetItem interface {
	BatchGetItem(ctx context.Context, params *dynamodb.BatchGetItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.BatchGetItemOutput, error)
}

DynamoBatchGetItem is implemented by dynamodb.New()

type DynamoBatchWriteItem

type DynamoBatchWriteItem interface {
	BatchWriteItem(ctx context.Context, params *dynamodb.BatchWriteItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.BatchWriteItemOutput, error)
}

DynamoBatchWriteItem is implemented by dynamodb.New()

type DynamoGetStreamExecutor

type DynamoGetStreamExecutor struct {
	// contains filtered or unexported fields
}

DynamoGetStreamExecutor creates the query and executes it on DynamoDB.

type DynamoQuery

type DynamoQuery interface {
	Query(ctx context.Context, d *dynamodb.QueryInput, optFns ...func(*dynamodb.Options)) (*dynamodb.QueryOutput, error)
}

DynamoQuery is implemented by dynamodb.New()

type GetStreamAdapter

type GetStreamAdapter interface {
	BatchGet(ctx context.Context, key []map[string]types.AttributeValue) (*dynamodb.BatchGetItemOutput, error)
}

GetStreamAdapter builds the batch query before passing it to DynamoDB.

func NewGetBatchItem

func NewGetBatchItem(delegate DynamoBatchGetItem, tableName string, projectionExpression string) GetStreamAdapter

type ScanStreamExecutor

type ScanStreamExecutor interface {
	Scan(ctx context.Context, params *dynamodb.ScanInput, optFns ...func(*dynamodb.Options)) (*dynamodb.ScanOutput, error)
}

type Stream

type Stream interface {
	HasNext() bool                         // HasNext returns true if it has another element and no error has been raised
	Next() map[string]types.AttributeValue // Next return current element and move forward the cursor
	Error() error                          // Error returns the error that interrupted the Stream
	Count() int64                          // Count return the number of element found so far
}

Stream is inspired by Java streams and allows chaining transformations in a functional programming style.

func NewArrayStream

func NewArrayStream(results []map[string]types.AttributeValue) Stream

NewArrayStream creates a stream from a slice; the slice will be updated.

func NewGetStream

func NewGetStream(ctx context.Context, executor GetStreamAdapter, keys []map[string]types.AttributeValue, bufferSize int64) Stream

NewGetStream batches requests to get each item by its natural key.

func NewQueryStream

func NewQueryStream(ctx context.Context, executor DynamoQuery, queries []*dynamodb.QueryInput) Stream

NewQueryStream creates a stream that will execute the list of queries. If a query is paginated, all the pages will be requested before moving on to the next query.

func NewScanStream

func NewScanStream(ctx context.Context, executor ScanStreamExecutor, tableName string) Stream

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL