Documentation ¶
Index ¶
- Constants
- Variables
- func KinesisAPI(api kinesisiface.KinesisAPI) (fn func(*KinesisConsumer) error)
- type Checkpointer
- type DynamoCheckpoint
- func (c *DynamoCheckpoint) CheckpointSequence(shard *shardStatus) error
- func (c *DynamoCheckpoint) ClaimShard(shard *shardStatus, claimID string) error
- func (c *DynamoCheckpoint) FetchCheckpoint(shard *shardStatus) error
- func (c *DynamoCheckpoint) GetLease(shard *shardStatus, newAssignTo string) error
- func (c *DynamoCheckpoint) Init() error
- func (c *DynamoCheckpoint) ListActiveWorkers() (map[string][]string, error)
- type KinesisConsumer
- type KinesisConsumerConfig
- type MonitoringConfiguration
- type RecordConsumer
- type Records
- type Worker
Examples ¶
Constants ¶
const ( // ErrLeaseNotAquired is returned when we failed to get a lock on the shard ErrLeaseNotAquired = "Lease is already held by another node" // ErrInvalidDynamoDBSchema is returned when there are one or more fields missing from the table ErrInvalidDynamoDBSchema = "The DynamoDB schema is invalid and may need to be re-created" )
const ( // ErrCodeKMSThrottlingException is defined in the API Reference https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.GetRecords // But it's not a constant? ErrCodeKMSThrottlingException = "KMSThrottlingException" )
Variables ¶
var ErrSequenceIDNotFound = errors.New("SequenceIDNotFoundForShard")
ErrSequenceIDNotFound is returned by FetchCheckpoint when no SequenceID is found
Functions ¶
func KinesisAPI ¶
func KinesisAPI(api kinesisiface.KinesisAPI) (fn func(*KinesisConsumer) error)
KinesisAPI is used as an option in NewKinesisConsumer
Types ¶
type Checkpointer ¶
type Checkpointer interface { Init() error GetLease(*shardStatus, string) error CheckpointSequence(*shardStatus) error FetchCheckpoint(*shardStatus) error ListActiveWorkers() (map[string][]string, error) ClaimShard(*shardStatus, string) error }
Checkpointer handles checkpointing when a record has been processed
type DynamoCheckpoint ¶
type DynamoCheckpoint struct { TableName string LeaseDuration int Retries int ReadCapacityUnits *int64 WriteCapacityUnits *int64 BillingMode *string Session *session.Session // contains filtered or unexported fields }
DynamoCheckpoint implements the Checkpointer interface using DynamoDB as a backend
func (*DynamoCheckpoint) CheckpointSequence ¶
func (c *DynamoCheckpoint) CheckpointSequence(shard *shardStatus) error
CheckpointSequence writes a checkpoint at the designated sequence ID
func (*DynamoCheckpoint) ClaimShard ¶
func (c *DynamoCheckpoint) ClaimShard(shard *shardStatus, claimID string) error
func (*DynamoCheckpoint) FetchCheckpoint ¶
func (c *DynamoCheckpoint) FetchCheckpoint(shard *shardStatus) error
FetchCheckpoint retrieves the checkpoint for the given shard
func (*DynamoCheckpoint) GetLease ¶
func (c *DynamoCheckpoint) GetLease(shard *shardStatus, newAssignTo string) error
GetLease attempts to gain a lock on the given shard
func (*DynamoCheckpoint) Init ¶
func (c *DynamoCheckpoint) Init() error
Init initialises the DynamoDB Checkpoint
func (*DynamoCheckpoint) ListActiveWorkers ¶
func (c *DynamoCheckpoint) ListActiveWorkers() (map[string][]string, error)
type KinesisConsumer ¶
type KinesisConsumer struct { KinesisConsumerConfig sync.WaitGroup // contains filtered or unexported fields }
KinesisConsumer contains all the configuration and functions necessary to start the Kinesis Consumer
func NewKinesisConsumer ¶
func NewKinesisConsumer(config KinesisConsumerConfig, options ...func(*KinesisConsumer) error) (kc *KinesisConsumer, err error)
NewKinesisConsumer is the constructor for KinesisConsumer
func (*KinesisConsumer) Checkpoint ¶
func (kc *KinesisConsumer) Checkpoint(shardID string, sequenceNumber string) error
Checkpoint records the sequence number for the given shard ID as being processed
func (*KinesisConsumer) Shutdown ¶
func (kc *KinesisConsumer) Shutdown()
Shutdown stops consuming records gracefully
func (*KinesisConsumer) StartConsumer ¶
func (kc *KinesisConsumer) StartConsumer() error
StartConsumer starts the RecordConsumer, calls Init and starts sending records to ProcessRecords
type KinesisConsumerConfig ¶
type KinesisConsumerConfig struct { StreamName string ShardIteratorType string RecordConsumer RecordConsumer EmptyRecordBackoffMs int LeaseDuration int Monitoring MonitoringConfiguration DisableAutomaticCheckpoints bool Retries *int IgnoreShardOrdering bool TableName string DynamoReadCapacityUnits *int64 DynamoWriteCapacityUnits *int64 DynamoBillingMode *string Session *session.Session // Setting session means Retries is ignored }
type MonitoringConfiguration ¶
type MonitoringConfiguration struct { MonitoringService string // Type of monitoring to expose. Supported types are "prometheus" Prometheus prometheusMonitoringService CloudWatch cloudWatchMonitoringService // contains filtered or unexported fields }
MonitoringConfiguration allows you to configure how record processing metrics are exposed
type RecordConsumer ¶
type RecordConsumer interface { Init(string) error ProcessRecords([]*Records, *KinesisConsumer) Shutdown() }
RecordConsumer is the interface consumers will implement
Example ¶
package main import ( "fmt" "time" ) type PrintRecordConsumer struct { shardID string } func (p *PrintRecordConsumer) Init(shardID string) error { fmt.Printf("Checkpointer initializing\n") p.shardID = shardID return nil } func (p *PrintRecordConsumer) ProcessRecords(records []*Records, consumer *KinesisConsumer) { if len(records) > 0 { fmt.Printf("%s\n", records[0].Data) } } func (p *PrintRecordConsumer) Shutdown() { fmt.Print("PrintRecordConsumer Shutdown\n") } func main() { // An implementation of the RecordConsumer interface that prints out records rc := &PrintRecordConsumer{} kc := &KinesisConsumer{ StreamName: "KINESIS_STREAM", ShardIteratorType: "TRIM_HORIZON", RecordConsumer: rc, TableName: "gokini", EmptyRecordBackoffMs: 1000, } // Send records to our kinesis stream so we have something to process pushRecordToKinesis("KINESIS_STREAM", []byte("foo"), true) defer deleteStream("KINESIS_STREAM") defer deleteTable("gokini") err := kc.StartConsumer() if err != nil { fmt.Printf("Failed to start consumer: %s", err) } // Wait for it to do it's thing time.Sleep(200 * time.Millisecond) kc.Shutdown() }
Output: Checkpointer initializing foo PrintRecordConsumer Shutdown