Documentation ¶
Index ¶
- Variables
- type AllGroup
- type Consumer
- func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) (initializedScanChan <-chan bool, errChan <-chan error)
- func (c *Consumer) ScanShard(ctx context.Context, shardID string, initializedScanChan chan bool, ...) error
- func (c *Consumer) SubscribeShard(ctx context.Context, consumerARN, shardID string, ...) error
- type Counter
- type ErrorValue
- type Group
- type KeyValueString
- type LogFunc
- type LogState
- type Logger
- type Option
- func WithAggregation(a bool) Option
- func WithClient(client kinesisiface.KinesisAPI) Option
- func WithConsumerARN(a string) Option
- func WithConsumerName(a string) Option
- func WithCounter(counter Counter) Option
- func WithGroup(group Group) Option
- func WithLogger(logger Logger) Option
- func WithMaxRecords(n int64) Option
- func WithRefreshSubscribeInterval(d time.Duration) Option
- func WithRetrySubscribeInterval(d time.Duration) Option
- func WithScanInterval(d time.Duration) Option
- func WithScanMethod(a ScanMethod) Option
- func WithShardClosedHandler(h ShardClosedHandler) Option
- func WithShardIteratorType(t string) Option
- func WithStore(store Store) Option
- func WithTimestamp(t time.Time) Option
- type Record
- type ScanFunc
- type ScanMethod
- type ShardClosedHandler
- type Store
Constants ¶
This section is empty.
Variables ¶
var ( LogConsumer LogFunc = "CONSUMER" LogGroup LogFunc = "GROUP" LogError LogState = "ERROR" LogWarn LogState = "WARNING" LogInfo LogState = "INFO" LogDebug LogState = "DEBUG" )
var ErrSkipCheckpoint = errors.New("skip checkpoint")
ErrSkipCheckpoint is used as a return value from ScanFunc to indicate that the current checkpoint should be skipped skipped. It is not returned as an error by any function.
Functions ¶
This section is empty.
Types ¶
type AllGroup ¶
type AllGroup struct { Store // contains filtered or unexported fields }
AllGroup is used to consume all shards from a single consumer. It caches a local list of the shards we are already processing and routinely polls the stream looking for new shards to process.
func NewAllGroup ¶
func NewAllGroup(ksis kinesisiface.KinesisAPI, store Store, streamName string, logger Logger) *AllGroup
NewAllGroup returns an intitialized AllGroup for consuming all shards on a stream
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer wraps the interaction with the Kinesis stream
func New ¶
New creates a kinesis consumer with default settings. Use Option to override any of the optional attributes.
func (*Consumer) Scan ¶
func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) (initializedScanChan <-chan bool, errChan <-chan error)
Scan launches a goroutine to process each of the shards in the stream. The ScanFunc is passed through to each of the goroutines and called with each message pulled from the stream.
type ErrorValue ¶
type ErrorValue struct {
Err error
}
func Error ¶
func Error(err error) *ErrorValue
func (*ErrorValue) Error ¶
func (l *ErrorValue) Error() string
func (*ErrorValue) String ¶
func (l *ErrorValue) String() string
type Group ¶
type Group interface { Start(ctx context.Context, shardc chan *kinesis.Shard) GetCheckpoint(streamName, shardID string) (string, error) SetCheckpoint(streamName, shardID, sequenceNumber string) error }
Group interface used to manage which shard to process
type KeyValueString ¶
func LogString ¶
func LogString(key, value string) *KeyValueString
func (*KeyValueString) String ¶
func (l *KeyValueString) String() string
type Logger ¶
type Logger interface {
Log(...interface{})
}
A Logger is a minimal interface to as a adaptor for external logging library to consumer
type Option ¶
type Option func(*Consumer)
Option is used to override defaults when creating a new Consumer
func WithAggregation ¶
func WithClient ¶
func WithClient(client kinesisiface.KinesisAPI) Option
WithClient overrides the default client
func WithConsumerName ¶
WithConsumerName set the consumer name
func WithCounter ¶
WithCounter overrides the default counter
func WithMaxRecords ¶
WithMaxRecords overrides the maximum number of records to be returned in a single GetRecords call for the consumer (specify a value of up to 10,000)
func WithRefreshSubscribeInterval ¶
WithRefreshSubscribeInterval overrides the subscribe refreshing interval for the consumer
func WithRetrySubscribeInterval ¶
WithRetrySubscribeInterval overrides the subscribe API recall interval
func WithScanInterval ¶
WithScanInterval overrides the scan interval for the consumer
func WithScanMethod ¶
func WithScanMethod(a ScanMethod) Option
WithScanMethod overrides the scan method
func WithShardClosedHandler ¶
func WithShardClosedHandler(h ShardClosedHandler) Option
func WithShardIteratorType ¶
WithShardIteratorType overrides the starting point for the consumer
func WithTimestamp ¶
WithTimestamp overrides the starting point for the consumer
type Record ¶
Record wraps the record returned from the Kinesis library and extends to include the shard id.
type ScanFunc ¶
ScanFunc is the type of the function called for each message read from the stream. The record argument contains the original record returned from the AWS Kinesis library. If an error is returned, scanning stops. The sole exception is when the function returns the special value ErrSkipCheckpoint.
type ScanMethod ¶
type ScanMethod string
ScanMethod is scanning type.
var ( ScanWithGetRecords ScanMethod = "GetRecords" // scan using GetRecords ScanWithSubscribeToShard ScanMethod = "SubscribeToShard" // scan using SubscribeToShard )
type ShardClosedHandler ¶
ShardClosedHandler is a handler that will be called when the consumer has reached the end of a closed shard. No more records for that shard will be provided by the consumer. An error can be returned to stop the consumer.