Documentation ¶
Index ¶
- Variables
- type Config
- func (c Config) WithBufferSize(bufferSize int) Config
- func (c Config) WithCommitFrequency(commitFrequency time.Duration) Config
- func (c Config) WithDynamoReadCapacity(readCapacity int64) Config
- func (c Config) WithDynamoWaiterDelay(delay time.Duration) Config
- func (c Config) WithDynamoWriteCapacity(writeCapacity int64) Config
- func (c Config) WithLeaderActionFrequency(leaderActionFrequency time.Duration) Config
- func (c Config) WithShardCheckFrequency(shardCheckFrequency time.Duration) Config
- func (c Config) WithThrottleDelay(delay time.Duration) Config
- type Kinsumer
- func New(streamName, applicationName, clientName string, config Config, role string, ...) (*Kinsumer, error)
- func NewWithInterfaces(kinesisAPI kinesisiface.KinesisAPI, dynamodb dynamodbiface.DynamoDBAPI, ...) (*Kinsumer, error)
- func NewWithSession(session *session.Session, streamName, applicationName, clientName string, ...) (*Kinsumer, error)
Constants ¶
This section is empty.
Variables ¶
var ( // ErrRunTwice - Run() can only ever be run once ErrRunTwice = errors.New("run() can only ever be run once") // ErrNoKinesisInterface - Need a kinesis instance ErrNoKinesisInterface = errors.New("need a kinesis instance") // ErrNoDynamoInterface - Need a dynamodb instance ErrNoDynamoInterface = errors.New("need a dynamodb instance") // ErrNoStreamName - Need a kinesis stream name ErrNoStreamName = errors.New("need a kinesis stream name") // ErrNoApplicationName - Need an application name for the dynamo table names ErrNoApplicationName = errors.New("need an application name for the dynamo table names") // ErrThisClientNotInDynamo - Unable to find this client in the client list ErrThisClientNotInDynamo = errors.New("unable to find this client in the client list") // ErrNoShardsAssigned - We found shards, but got assigned none ErrNoShardsAssigned = errors.New("we found shards, but got assigned none") // ErrConfigInvalidThrottleDelay - ThrottleDelay config value must be at least 200ms ErrConfigInvalidThrottleDelay = errors.New("throttleDelay config value must be at least 200ms (preferably 250ms)") // ErrConfigInvalidCommitFrequency - CommitFrequency config value is mandatory ErrConfigInvalidCommitFrequency = errors.New("commitFrequency config value is mandatory") // ErrConfigInvalidShardCheckFrequency - ShardCheckFrequency config value is mandatory ErrConfigInvalidShardCheckFrequency = errors.New("shardCheckFrequency config value is mandatory") // ErrConfigInvalidLeaderActionFrequency - LeaderActionFrequency config value is mandatory ErrConfigInvalidLeaderActionFrequency = errors.New("leaderActionFrequency config value is mandatory and must be at least as long as ShardCheckFrequency") // ErrConfigInvalidBufferSize - BufferSize config value is mandatory ErrConfigInvalidBufferSize = errors.New("bufferSize config value is mandatory") // ErrConfigInvalidDynamoCapacity - Dynamo read/write capacity cannot be 0 ErrConfigInvalidDynamoCapacity = errors.New("dynamo read/write capacity cannot be 0") // ErrStreamBusy - Stream is busy ErrStreamBusy = errors.New("stream is busy") // ErrNoSuchStream - No such stream ErrNoSuchStream = errors.New("no such stream") )
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// contains filtered or unexported fields
}
Config holds all configuration values for a single Kinsumer instance
func (Config) WithBufferSize ¶
WithBufferSize returns a Config with a modified buffer size
func (Config) WithCommitFrequency ¶
WithCommitFrequency returns a Config with a modified commit frequency
func (Config) WithDynamoReadCapacity ¶
WithDynamoReadCapacity returns a Config with a modified dynamo read capacity
func (Config) WithDynamoWaiterDelay ¶
WithDynamoWaiterDelay returns a Config with a modified dynamo waiter delay
func (Config) WithDynamoWriteCapacity ¶
WithDynamoWriteCapacity returns a Config with a modified dynamo write capacity
func (Config) WithLeaderActionFrequency ¶
WithLeaderActionFrequency returns a Config with a modified leader action frequency
func (Config) WithShardCheckFrequency ¶
WithShardCheckFrequency returns a Config with a modified shard check frequency
type Kinsumer ¶
type Kinsumer struct {
// contains filtered or unexported fields
}
Kinsumer is a Kinesis Consumer that tries to reduce duplicate reads while allowing for multiple clients each processing multiple shards
func New ¶
func New(streamName, applicationName, clientName string, config Config, role string, consumerARNvalue string) (*Kinsumer, error)
New returns a Kinsumer Interface with default kinesis and dynamodb instances, to be used in ec2 instances to get default auth and config
func NewWithInterfaces ¶
func NewWithInterfaces(kinesisAPI kinesisiface.KinesisAPI, dynamodb dynamodbiface.DynamoDBAPI, s3manager s3manageriface.UploaderAPI, streamName, applicationName, clientName string, config Config, role string, consumerARNvalue string) (*Kinsumer, error)
NewWithInterfaces allows you to override the Kinesis and Dynamo instances for mocking or using a local set of servers
func NewWithSession ¶
func NewWithSession(session *session.Session, streamName, applicationName, clientName string, config Config, role string, consumerARNvalue string) (*Kinsumer, error)
NewWithSession should be used if you want to override the Kinesis and Dynamo instances with a non-default aws session
func (*Kinsumer) CreateRequiredTables ¶
CreateRequiredTables will create the required dynamodb tables based on the applicationName
func (*Kinsumer) DeleteTables ¶
DeleteTables will delete the dynamodb tables that were created based on the applicationName
func (*Kinsumer) GetClientName ¶
func (*Kinsumer) Run ¶
Run runs the main kinesis consumer process. This is a non-blocking call, use Stop() to force it to return. This goroutine is responsible for startin/stopping consumers, aggregating all consumers' records, updating checkpointers as records are consumed, and refreshing our shard/client list and leadership TODO: Can we unit test this at all?