Documentation ¶
Index ¶
- type AllGroup
- type Consumer
- type CoordinatorStore
- type Group
- type Option
- func WithClient(client kinesisiface.KinesisAPI) Option
- func WithGroup(group Group) Option
- func WithMaxRecords(n int64) Option
- func WithScanInterval(d time.Duration) Option
- func WithShardClosedHandler(h ShardClosedHandler) Option
- func WithShardIteratorType(t string) Option
- func WithStore(store Store) Option
- func WithTimestamp(t time.Time) Option
- type Record
- type ScanFunc
- type ShardClosedHandler
- type Store
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AllGroup ¶
type AllGroup struct { Store // contains filtered or unexported fields }
AllGroup is used to consume all shards from a single consumer. It caches a local list of the shards we are already processing and routinely polls the stream looking for new shards to process.
func NewAllGroup ¶
func NewAllGroup(ksis kinesisiface.KinesisAPI, store Store, streamName string, logger log.Logger) *AllGroup
NewAllGroup returns an intitialized AllGroup for consuming all shards on a stream
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func (*Consumer) Scan ¶
Scan launches a goroutine to process each of the shards in the stream. The ScanFunc is passed through to each of the goroutines and called with each message pulled from the stream.
func (*Consumer) ScanShard ¶
ScanShard loops over records on a specific shard, calls the callback func for each record and checkpoints the progress of scan.
func (*Consumer) SetCheckpoint ¶
type CoordinatorStore ¶
type CoordinatorStore struct {
// contains filtered or unexported fields
}
func NewCoordinatorStore ¶
func NewCoordinatorStore(cp coordinator.TransferState, transferID string) *CoordinatorStore
func (*CoordinatorStore) GetCheckpoint ¶
func (c *CoordinatorStore) GetCheckpoint(streamName, shardID string) (string, error)
func (*CoordinatorStore) SetCheckpoint ¶
func (c *CoordinatorStore) SetCheckpoint(streamName, shardID, sequenceNumber string) error
type Group ¶
type Group interface { Start(ctx context.Context, shardc chan *kinesis.Shard) GetCheckpoint(streamName, shardID string) (string, error) SetCheckpoint(streamName, shardID, sequenceNumber string) error }
Group interface used to manage which shard to process
type Option ¶
type Option func(*Consumer)
Option is used to override defaults when creating a new Consumer
func WithClient ¶
func WithClient(client kinesisiface.KinesisAPI) Option
WithClient overrides the default client
func WithMaxRecords ¶
WithMaxRecords overrides the maximum number of records to be returned in a single GetRecords call for the consumer (specify a value of up to 10,000)
func WithScanInterval ¶
WithScanInterval overrides the scan interval for the consumer
func WithShardClosedHandler ¶
func WithShardClosedHandler(h ShardClosedHandler) Option
func WithShardIteratorType ¶
WithShardIteratorType overrides the starting point for the consumer
func WithTimestamp ¶
WithTimestamp overrides the starting point for the consumer
type Record ¶
Record wraps the record returned from the Kinesis library and extends to include the shard id.
type ShardClosedHandler ¶
ShardClosedHandler is a handler that will be called when the consumer has reached the end of a closed shard. No more records for that shard will be provided by the consumer. An error can be returned to stop the consumer.