Documentation ¶
Index ¶
Constants ¶
const ( ShardIteratorAfterSequenceNumber = "AFTER_SEQUENCE_NUMBER" ShardIteratorAtSequenceNumber = "AT_SEQUENCE_NUMBER" ShardIteratorAtTimestamp = "AT_TIMESTAMP" ShardIteratorLatest = "LATEST" ShardIteratorTrimHorizon = "TRIM_HORIZON" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Buffer ¶
type Buffer struct { MaxRecordCount int // contains filtered or unexported fields }
Buffer holds records and answers questions on when it should be periodically flushed.
func (*Buffer) FirstSeq ¶
FirstSequenceNumber returns the sequence number of the first record in the buffer.
func (*Buffer) Flush ¶
func (b *Buffer) Flush()
Flush empties the buffer and resets the sequence counter.
func (*Buffer) GetRecords ¶
GetRecords returns the records in the buffer.
func (*Buffer) RecordCount ¶
RecordCount returns the number of records in the buffer.
func (*Buffer) ShouldFlush ¶
ShouldFlush determines if the buffer has reached its target size.
type Checkpoint ¶
type Checkpoint interface { CheckpointExists(string) bool SequenceNumber(string) string SetCheckpoint(string, string) }
Checkpoint interface for functions that checkpoints need to implement in order to track consumer progress.
type Config ¶
type Config struct { // AppName is the application name and checkpoint namespace. AppName string // StreamName is the Kinesis stream. StreamName string // FlushInterval is a regular interval for flushing the buffer. Defaults to 1s. FlushInterval time.Duration // BufferSize determines the batch request size. Must not exceed 500. Defaults to 500. BufferSize int // MaxRetries sets the Kinesis client's retry limit. Deafults to 10. MaxRetries int // Logger is the logger used. Defaults to log.Log. Logger log.Interface // Checkpoint for tracking progress of consumer. Checkpoint Checkpoint // Shard Iterator if not checkpoint ShardIteratorType string // URL for Redis Checkpoint for tracking progress of consumer. Defaults to 127.0.0.1:6379 RedisURL string // URL for Kinesis, used for local development. Defaults to empty and is discovered by the Kinesis client. KinesisURL string }
Config vars for the application
type Consumer ¶
type Consumer struct { Config // contains filtered or unexported fields }
func NewConsumer ¶
NewConsumer creates a new consumer with initialied kinesis connection
type HandlerFunc ¶
type HandlerFunc func(b Buffer)
HandlerFunc is a convenience type to avoid having to declare a struct to implement the Handler interface, it can be used like this:
consumer.AddHandler(connector.HandlerFunc(func(b Buffer) { // ... }))
func (HandlerFunc) HandleRecords ¶
func (h HandlerFunc) HandleRecords(b Buffer)
HandleRecords implements the Handler interface
type RedisCheckpoint ¶
type RedisCheckpoint struct { AppName string StreamName string // contains filtered or unexported fields }
RedisCheckpoint implements the Checkpont interface. Used to enable the Pipeline.ProcessShard to checkpoint it's progress while reading records from Kinesis stream.
func (*RedisCheckpoint) CheckpointExists ¶
func (c *RedisCheckpoint) CheckpointExists(shardID string) bool
CheckpointExists determines if a checkpoint for a particular Shard exists. Typically used to determine whether we should start processing the shard with TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
func (*RedisCheckpoint) SequenceNumber ¶
func (c *RedisCheckpoint) SequenceNumber(shardID string) string
SequenceNumber returns the current checkpoint stored for the specified shard.
func (*RedisCheckpoint) SetCheckpoint ¶
func (c *RedisCheckpoint) SetCheckpoint(shardID string, sequenceNumber string)
SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). Upon failover, record processing is resumed from this point.