Documentation ¶
Index ¶
- Constants
- func AsSlice(stream Stream) (values []map[string]*dynamodb.AttributeValue, err error)
- func BufferedWriteItems(db DynamoBatchWriteItem, requests []*dynamodb.WriteRequest, table string, ...) error
- func CreateOrUpdateTable(ctx context.Context, input *CreateOrUpdateTableInput) error
- type CreateOrUpdateTableInput
- type DynamoBatchGetItem
- type DynamoBatchWriteItem
- type DynamoGetStreamExecutor
- type DynamoQuery
- type GetStreamAdapter
- type ScanStreamExecutor
- type Stream
- func NewArrayStream(results []map[string]*dynamodb.AttributeValue) Stream
- func NewGetStream(executor GetStreamAdapter, keys []map[string]*dynamodb.AttributeValue, ...) Stream
- func NewQueryStream(executor DynamoQuery, queries []*dynamodb.QueryInput) Stream
- func NewScanStream(executor ScanStreamExecutor, tableName string) Stream
Constants ¶
const ( DynamoWriteBatchSize = 25 DynamoReadBatchSize = 100 )
Variables ¶
This section is empty.
Functions ¶
func AsSlice ¶
func AsSlice(stream Stream) (values []map[string]*dynamodb.AttributeValue, err error)
AsSlice reads the stream fully and returns its elements as a slice.
func BufferedWriteItems ¶
func BufferedWriteItems(db DynamoBatchWriteItem, requests []*dynamodb.WriteRequest, table string, dynamoWriteBatchSize int) error
BufferedWriteItems writes items in batches, with backoff, and replays unprocessed items.
func CreateOrUpdateTable ¶
func CreateOrUpdateTable(ctx context.Context, input *CreateOrUpdateTableInput) error
CreateOrUpdateTable creates the table if it doesn't exist, or updates it otherwise. The implementation is not mature and is subject to many limitations (for example, the order in which fields and indexes are deleted).
Types ¶
type CreateOrUpdateTableInput ¶
type CreateOrUpdateTableInput struct { Client *dynamodb.DynamoDB TableName string Definition *dynamodb.CreateTableInput }
type DynamoBatchGetItem ¶
type DynamoBatchGetItem interface {
BatchGetItem(*dynamodb.BatchGetItemInput) (*dynamodb.BatchGetItemOutput, error)
}
DynamoBatchGetItem is implemented by the client returned by dynamodb.New().
type DynamoBatchWriteItem ¶
type DynamoBatchWriteItem interface {
BatchWriteItem(*dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error)
}
DynamoBatchWriteItem is implemented by the client returned by dynamodb.New().
type DynamoGetStreamExecutor ¶
type DynamoGetStreamExecutor struct {
// contains filtered or unexported fields
}
DynamoGetStreamExecutor creates the query and executes it on DynamoDB.
type DynamoQuery ¶
type DynamoQuery interface {
Query(d *dynamodb.QueryInput) (*dynamodb.QueryOutput, error)
}
DynamoQuery is implemented by the client returned by dynamodb.New().
type GetStreamAdapter ¶
type GetStreamAdapter interface {
BatchGet([]map[string]*dynamodb.AttributeValue) (*dynamodb.BatchGetItemOutput, error)
}
GetStreamAdapter builds the batch query before passing it to DynamoDB.
func NewGetBatchItem ¶
func NewGetBatchItem(delegate DynamoBatchGetItem, tableName string, projectionExpression string) GetStreamAdapter
type ScanStreamExecutor ¶
type ScanStreamExecutor interface {
Scan(input *dynamodb.ScanInput) (*dynamodb.ScanOutput, error)
}
type Stream ¶
type Stream interface {
HasNext() bool // HasNext returns true if it has another element and no error has been raised
Next() map[string]*dynamodb.AttributeValue // Next returns the current element and moves the cursor forward
Error() error // Error returns the error that interrupted the Stream
Count() int64 // Count returns the number of elements found so far
}
Stream is inspired by Java streams and allows chaining transformations in a functional programming style.
func NewArrayStream ¶
func NewArrayStream(results []map[string]*dynamodb.AttributeValue) Stream
NewArrayStream creates a stream from a slice; the slice will be updated.
func NewGetStream ¶
func NewGetStream(executor GetStreamAdapter, keys []map[string]*dynamodb.AttributeValue, bufferSize int64) Stream
NewGetStream batches requests to get each item by its natural key.
func NewQueryStream ¶
func NewQueryStream(executor DynamoQuery, queries []*dynamodb.QueryInput) Stream
NewQueryStream creates a stream that will execute the list of queries. If a query is paginated, all the pages will be requested before moving on to the next query.
func NewScanStream ¶
func NewScanStream(executor ScanStreamExecutor, tableName string) Stream