Documentation ¶
Index ¶
- Variables
- type AllGroup
- type BatchRecord
- type Consumer
- func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error
- func (c *Consumer) ScanBatch(ctx context.Context, fn ScanBatchFunc, ec chan<- error)
- func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) error
- func (c *Consumer) ScanShardBatch(ctx context.Context, shardID string, fn ScanBatchFunc) error
- type Counter
- type Group
- type Logger
- type Option
- func WithAggregation(a bool) Option
- func WithClient(client kinesisClient) Option
- func WithCounter(counter Counter) Option
- func WithGroup(group Group) Option
- func WithLogger(logger Logger) Option
- func WithMaxRecords(n int64) Option
- func WithScanInterval(d time.Duration) Option
- func WithShardClosedHandler(h ShardClosedHandler) Option
- func WithShardIteratorType(t string) Option
- func WithStore(store Store) Option
- func WithTimestamp(t time.Time) Option
- type Record
- type ScanBatchFunc
- type ScanFunc
- type ShardClosedHandler
- type Store
Constants ¶
This section is empty.
Variables ¶
var ErrSkipCheckpoint = errors.New("skip checkpoint")
ErrSkipCheckpoint is used as a return value from ScanFunc to indicate that the current checkpoint should be skipped. It is not returned as an error by any function.
Functions ¶
This section is empty.
Types ¶
type AllGroup ¶ added in v0.4.0
type AllGroup struct {
	Store
	// contains filtered or unexported fields
}
AllGroup is used to consume all shards from a single consumer. It caches a local list of the shards we are already processing and routinely polls the stream looking for new shards to process.
func NewAllGroup ¶ added in v0.4.0
NewAllGroup returns an initialized AllGroup for consuming all shards on a stream.
type BatchRecord ¶ added in v0.4.0
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer wraps the interaction with the Kinesis stream
func New ¶ added in v0.2.0
New creates a kinesis consumer with default settings. Use Option to override any of the optional attributes.
func (*Consumer) Scan ¶ added in v0.2.0
Scan launches a goroutine to process each of the shards in the stream. The ScanFunc is passed through to each of the goroutines and called with each message pulled from the stream.
func (*Consumer) ScanBatch ¶ added in v0.4.0
func (c *Consumer) ScanBatch(ctx context.Context, fn ScanBatchFunc, ec chan<- error)
func (*Consumer) ScanShard ¶ added in v0.2.0
ScanShard loops over the records on a specific shard, calls the callback func for each record, and checkpoints the progress of the scan.
func (*Consumer) ScanShardBatch ¶ added in v0.4.0
type Counter ¶ added in v0.2.0
Counter interface is used for exposing basic metrics from the scanner
type Group ¶ added in v0.4.0
type Group interface {
	Start(ctx context.Context, shardc chan types.Shard)
	GetCheckpoint(streamName, shardID string) (string, error)
	SetCheckpoint(streamName, shardID, sequenceNumber string) error
}
Group interface used to manage which shard to process
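As a rough illustration of the checkpoint half of the Group interface, the sketch below keeps checkpoints in memory. The `memoryStore` type, its key format, and the `newMemoryStore` helper are hypothetical; the `Start` method is left out because it depends on the AWS SDK's `types.Shard`, so this type does not satisfy Group on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// memoryStore is a hypothetical in-memory checkpoint store. It only covers
// the GetCheckpoint and SetCheckpoint methods listed in the Group interface.
type memoryStore struct {
	mu          sync.Mutex
	checkpoints map[string]string
}

func newMemoryStore() *memoryStore {
	return &memoryStore{checkpoints: make(map[string]string)}
}

// key combines stream and shard into one map key (an assumed format).
func key(streamName, shardID string) string {
	return streamName + ":" + shardID
}

// GetCheckpoint returns the last stored sequence number, or "" if none.
func (m *memoryStore) GetCheckpoint(streamName, shardID string) (string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.checkpoints[key(streamName, shardID)], nil
}

// SetCheckpoint records the sequence number for a shard.
func (m *memoryStore) SetCheckpoint(streamName, shardID, sequenceNumber string) error {
	if sequenceNumber == "" {
		return fmt.Errorf("sequence number must not be empty")
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.checkpoints[key(streamName, shardID)] = sequenceNumber
	return nil
}

func main() {
	store := newMemoryStore()
	if err := store.SetCheckpoint("my-stream", "shard-0", "42"); err != nil {
		fmt.Println("set failed:", err)
		return
	}
	seq, _ := store.GetCheckpoint("my-stream", "shard-0")
	fmt.Println(seq)
}
```

An in-memory store like this loses its checkpoints on restart, so it is mainly useful for tests or local experiments.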
type Logger ¶ added in v0.2.0
type Logger interface {
Log(...interface{})
}
Logger is a minimal interface that serves as an adapter between an external logging library and the consumer.
type Option ¶ added in v0.2.0
type Option func(*Consumer)
Option is used to override defaults when creating a new Consumer
func WithAggregation ¶ added in v0.4.0
func WithClient ¶ added in v0.2.0
func WithClient(client kinesisClient) Option
WithClient overrides the default client
func WithCounter ¶ added in v0.2.0
WithCounter overrides the default counter
func WithLogger ¶ added in v0.2.0
WithLogger overrides the default logger
func WithMaxRecords ¶ added in v0.4.0
WithMaxRecords overrides the maximum number of records to be returned in a single GetRecords call for the consumer (specify a value of up to 10,000)
func WithScanInterval ¶ added in v0.4.0
WithScanInterval overrides the scan interval for the consumer
func WithShardClosedHandler ¶ added in v0.4.0
func WithShardClosedHandler(h ShardClosedHandler) Option
func WithShardIteratorType ¶ added in v0.3.1
WithShardIteratorType overrides the starting point for the consumer
func WithTimestamp ¶ added in v0.4.0
WithTimestamp overrides the starting point for the consumer
type Record ¶ added in v0.2.0
Record wraps the record returned from the Kinesis library and extends it to include the shard ID.
type ScanBatchFunc ¶ added in v0.4.0
type ScanBatchFunc func(*BatchRecord) error
ScanBatchFunc is an additional callback type for scanning multiple records at once.
type ScanFunc ¶ added in v0.4.0
ScanFunc is the type of the function called for each message read from the stream. The record argument contains the original record returned from the AWS Kinesis library. If an error is returned, scanning stops. The sole exception is when the function returns the special value ErrSkipCheckpoint.
type ShardClosedHandler ¶ added in v0.4.0
ShardClosedHandler is a handler that will be called when the consumer has reached the end of a closed shard. No more records for that shard will be provided by the consumer. An error can be returned to stop the consumer.