consumer

package module
v0.3.0
Published: Nov 8, 2018 License: MIT Imports: 9 Imported by: 0

README

Golang Kinesis Consumer

Kinesis consumer applications written in Go. This library is intended to be a lightweight wrapper around the Kinesis API to read records, save checkpoints (with swappable backends), and gracefully recover from service timeouts/errors.

Alternate serverless options:

Installation

Get the package source:

$ go get github.com/harlow/kinesis-consumer

Overview

The consumer leverages a handler func that accepts a Kinesis record. The Scan method will consume all shards concurrently and call the callback func as it receives records from the stream.

Important: The default Log, Counter, and Checkpoint are no-ops, which means no logs, counts, or checkpoints will be emitted when scanning the stream. See the options below to override these defaults.

import (
	"context"
	"flag"
	"fmt"
	"log"

	consumer "github.com/harlow/kinesis-consumer"
)

func main() {
	var stream = flag.String("stream", "", "Stream name")
	flag.Parse()

	// consumer
	c, err := consumer.New(*stream)
	if err != nil {
		log.Fatalf("consumer error: %v", err)
	}

	// start scan
	err = c.Scan(context.TODO(), func(r *consumer.Record) consumer.ScanStatus {
		fmt.Println(string(r.Data))

		return consumer.ScanStatus{
			StopScan:       false,  // true to stop scan
			SkipCheckpoint: false,  // true to skip checkpoint
		}
	})
	if err != nil {
		log.Fatalf("scan error: %v", err)
	}

	// Note: If you need to aggregate based on a specific shard, use the
	// `ScanShard` method instead.
}

Scan status

The scan func returns a consumer.ScanStatus. The struct allows some basic flow control.

// continue scanning
return consumer.ScanStatus{}

// continue scanning, skip saving checkpoint
return consumer.ScanStatus{SkipCheckpoint: true}

// stop scanning, return nil
return consumer.ScanStatus{StopScan: true}

// stop scanning, return error
return consumer.ScanStatus{Error: err}
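
The return values above can be combined in a single callback. The following is a minimal sketch, not the library's code: ScanStatus here is a local stand-in that copies the three documented fields so the example compiles on its own, and newHandler, errEmpty, and the '#' convention are hypothetical names invented for illustration. A real callback would receive a *consumer.Record and read r.Data.

```go
package main

import (
	"errors"
	"fmt"
)

// ScanStatus is a local stand-in mirroring the fields of consumer.ScanStatus
// so that this sketch compiles without the library.
type ScanStatus struct {
	Error          error
	StopScan       bool
	SkipCheckpoint bool
}

// errEmpty is a hypothetical error used only in this sketch.
var errEmpty = errors.New("empty record")

// newHandler returns a callback that reports an error for empty payloads,
// skips the checkpoint for payloads starting with '#', and asks the scan to
// stop once max non-empty records have been seen.
func newHandler(max int) func(data []byte) ScanStatus {
	seen := 0
	return func(data []byte) ScanStatus {
		if len(data) == 0 {
			return ScanStatus{Error: errEmpty}
		}
		seen++
		return ScanStatus{
			StopScan:       seen >= max,
			SkipCheckpoint: data[0] == '#',
		}
	}
}

func main() {
	handle := newHandler(2)
	for _, rec := range []string{"#comment", "payload"} {
		fmt.Printf("%q -> %+v\n", rec, handle([]byte(rec)))
	}
}
```

Keeping the counter inside a closure means the callback carries its own state, so no package-level variables are needed.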

Checkpoint

To record the consumer's progress in the stream, a checkpoint stores the last sequence number the consumer has read from a particular shard. The boolean SkipCheckpoint field of consumer.ScanStatus determines whether the checkpoint is saved for a record. ScanStatus is returned by the record-processing callback.

This will allow consumers to re-launch and pick up at the position in the stream where they left off.

The unique identifier for a consumer is [appName, streamName, shardID].


Note: The default checkpoint is a no-op, which means the scan will not persist any state and the consumer will start from the beginning of the stream each time it is restarted.

To persist scan progress choose one of the following checkpoints:

Redis Checkpoint

The Redis checkpoint requires App Name and Stream Name:

import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/redis"

// redis checkpoint
ck, err := checkpoint.New(appName)
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}

DynamoDB Checkpoint

The DynamoDB checkpoint requires Table Name, App Name, and Stream Name:

import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/ddb"

// ddb checkpoint
ck, err := checkpoint.New(appName, tableName)
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}

// Override the DynamoDB client if the session needs special configuration (e.g. assume role)
myDynamoDbClient := dynamodb.New(session.New(aws.NewConfig()))

// For versions of AWS sdk that fixed config being picked up properly, the example of
// setting region should work.
//    myDynamoDbClient := dynamodb.New(session.New(aws.NewConfig()), &aws.Config{
//        Region: aws.String("us-west-2"),
//    })

ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient))
if err != nil {
  log.Fatalf("new checkpoint error: %v", err)
}

// Or you can provide your own Retryer to customize what triggers a retry inside the checkpoint
// See code in examples
// ck, err := checkpoint.New(*app, *table, checkpoint.WithDynamoClient(myDynamoDbClient), checkpoint.WithRetryer(&MyRetryer{}))

To leverage the DDB checkpoint we'll also need to create a table:

Partition key: namespace
Sort key: shard_id

Postgres Checkpoint

The Postgres checkpoint requires Table Name, App Name, Stream Name and ConnectionString:

import checkpoint "github.com/harlow/kinesis-consumer/checkpoint/postgres"

// postgres checkpoint
ck, err := checkpoint.New(app, table, connStr)
if err != nil {
  log.Fatalf("new checkpoint error: %v", err)
}

To leverage the Postgres checkpoint we'll also need to create a table:

CREATE TABLE kinesis_consumer (
	namespace text NOT NULL,
	shard_id text NOT NULL,
	sequence_number numeric NOT NULL,
	CONSTRAINT kinesis_consumer_pk PRIMARY KEY (namespace, shard_id)
);

The table name must match the one you specify when creating the checkpoint. The primary key composed of namespace and shard_id is mandatory, both for the checkpoint to run without issues and to ensure data integrity.

Options

The consumer allows the following optional overrides.

Kinesis Client

Override the Kinesis client if there is any special config needed:

// client
client := kinesis.New(session.New(aws.NewConfig()))

// consumer
c, err := consumer.New(streamName, consumer.WithClient(client))

Metrics

Add an optional counter to expose counts of checkpoints and records processed:

// counter
counter := expvar.NewMap("counters")

// consumer
c, err := consumer.New(streamName, consumer.WithCounter(counter))

The expvar package will display consumer counts:

"counters": {
    "checkpoints": 3,
    "records": 13005
},

Logging

Logging supports the built-in logging library or a third-party one, so long as it implements the Logger interface.

For example, to use the built-in logging package, wrap it in a myLogger struct.

// A myLogger provides a minimalistic logger satisfying the Logger interface.
type myLogger struct {
	logger *log.Logger
}

// Log logs the parameters to the stdlib logger. See log.Println.
func (l *myLogger) Log(args ...interface{}) {
	l.logger.Println(args...)
}

The package defaults to ioutil.Discard, so all logs are swallowed. This can be customized with the preferred logging strategy:

// logger (named logger rather than log to avoid shadowing the log package)
logger := &myLogger{
	logger: log.New(os.Stdout, "consumer-example: ", log.LstdFlags),
}

// consumer
c, err := consumer.New(streamName, consumer.WithLogger(logger))

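To use a more complicated logging library, e.g. apex log (imported here as alog):

// myLogger adapts an apex log Logger to the consumer's Logger interface.
type myLogger struct {
	logger *alog.Logger
}

// Log joins the arguments into one message and logs it at info level.
func (l *myLogger) Log(args ...interface{}) {
	l.logger.Info(fmt.Sprint(args...))
}

func main() {
	logger := &myLogger{
		logger: &alog.Logger{
			Handler: text.New(os.Stderr),
			Level:   alog.DebugLevel,
		},
	}

	// pass logger to consumer.New with consumer.WithLogger(logger)
	_ = logger
}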

Contributing

Please see CONTRIBUTING.md for more information. Thank you, contributors!

License

Copyright (c) 2015 Harlow Ward. It is free software, and may be redistributed under the terms specified in the LICENSE file.

www.hward.com  ·  GitHub @harlow  ·  Twitter @harlow_ward

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Checkpoint

type Checkpoint interface {
	Get(streamName, shardID string) (string, error)
	Set(streamName, shardID, sequenceNumber string) error
}

The Checkpoint interface is used to track consumer progress in the stream.
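
Any type with these two methods can serve as a checkpoint backend. As an illustration, here is a minimal in-memory sketch that might be useful in tests. The interface is copied from the definition above so the example compiles on its own; memoryCheckpoint, newMemoryCheckpoint, and the "stream/shard" key layout are assumptions made for this example and are not part of the package. State is lost when the process exits, so this does not replace the Redis, DynamoDB, or Postgres backends.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Checkpoint is copied from the package documentation so this sketch is
// self-contained.
type Checkpoint interface {
	Get(streamName, shardID string) (string, error)
	Set(streamName, shardID, sequenceNumber string) error
}

// memoryCheckpoint is a hypothetical in-memory backend guarded by a mutex,
// since shards may be scanned concurrently.
type memoryCheckpoint struct {
	mu   sync.Mutex
	seqs map[string]string
}

func newMemoryCheckpoint() *memoryCheckpoint {
	return &memoryCheckpoint{seqs: make(map[string]string)}
}

// key builds the map key; the layout is an assumption of this sketch.
func key(streamName, shardID string) string {
	return streamName + "/" + shardID
}

// Get returns the stored sequence number, or "" if none has been saved.
func (m *memoryCheckpoint) Get(streamName, shardID string) (string, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.seqs[key(streamName, shardID)], nil
}

// Set stores the sequence number and rejects empty values.
func (m *memoryCheckpoint) Set(streamName, shardID, sequenceNumber string) error {
	if sequenceNumber == "" {
		return errors.New("sequence number should not be empty")
	}
	m.mu.Lock()
	defer m.mu.Unlock()
	m.seqs[key(streamName, shardID)] = sequenceNumber
	return nil
}

// Compile-time check that memoryCheckpoint satisfies Checkpoint.
var _ Checkpoint = (*memoryCheckpoint)(nil)

func main() {
	ck := newMemoryCheckpoint()
	if err := ck.Set("my-stream", "shard-0", "1001"); err != nil {
		fmt.Println("set error:", err)
		return
	}
	seq, _ := ck.Get("my-stream", "shard-0")
	fmt.Println("last sequence number:", seq)
}
```

A value like this would be passed to the consumer through WithCheckpoint, documented below.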

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer wraps the interaction with the Kinesis stream

func New added in v0.2.0

func New(streamName string, opts ...Option) (*Consumer, error)

New creates a kinesis consumer with default settings. Use Option to override any of the optional attributes.

func (*Consumer) Scan added in v0.2.0

func (c *Consumer) Scan(ctx context.Context, fn func(*Record) ScanStatus) error

Scan scans each of the shards of the stream and calls the callback func with each of the kinesis records.

func (*Consumer) ScanShard added in v0.2.0

func (c *Consumer) ScanShard(
	ctx context.Context,
	shardID string,
	fn func(*Record) ScanStatus,
) error

ScanShard loops over records on a specific shard, calls the callback func for each record, and checkpoints the progress of the scan.

type Counter added in v0.2.0

type Counter interface {
	Add(string, int64)
}

Counter interface is used for exposing basic metrics from the scanner
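
Because the interface has a single Add method, the standard library's *expvar.Map already satisfies it, which is why the README example can pass one directly. The sketch below checks that at compile time and also shows a hypothetical hand-rolled counter; mapCounter, newMapCounter, and Value are names invented for this example, and the interface is copied from the definition above so the code stands alone.

```go
package main

import (
	"expvar"
	"fmt"
	"sync"
)

// Counter is copied from the package documentation so this sketch is
// self-contained.
type Counter interface {
	Add(string, int64)
}

// Compile-time check: *expvar.Map has Add(key string, delta int64).
var _ Counter = (*expvar.Map)(nil)

// mapCounter is a hypothetical counter that keeps totals in a plain map,
// for cases where publishing through expvar is not wanted.
type mapCounter struct {
	mu     sync.Mutex
	counts map[string]int64
}

func newMapCounter() *mapCounter {
	return &mapCounter{counts: make(map[string]int64)}
}

// Add increments the named counter by delta.
func (c *mapCounter) Add(name string, delta int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.counts[name] += delta
}

// Value returns the current total for name (0 if never incremented).
func (c *mapCounter) Value(name string) int64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.counts[name]
}

// Compile-time check that mapCounter satisfies Counter.
var _ Counter = (*mapCounter)(nil)

func main() {
	c := newMapCounter()
	c.Add("records", 2)
	c.Add("records", 3)
	c.Add("checkpoints", 1)
	fmt.Println("records:", c.Value("records"), "checkpoints:", c.Value("checkpoints"))
}
```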

type Logger added in v0.2.0

type Logger interface {
	Log(...interface{})
}

A Logger is a minimal interface that acts as an adapter between an external logging library and the consumer.

type Option added in v0.2.0

type Option func(*Consumer)

Option is used to override defaults when creating a new Consumer
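
An Option of this shape follows the functional-options pattern: the constructor sets defaults, then applies each option in order so later options win. The sketch below shows the pattern in isolation and is not the library's implementation. consumerSketch, its fields, the default values, and the withLogPrefix and withBatchSize helpers are all hypothetical stand-ins chosen for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// consumerSketch is a hypothetical stand-in for Consumer; the real struct's
// fields are unexported and not shown in the documentation.
type consumerSketch struct {
	streamName string
	logPrefix  string
	batchSize  int
}

// option mirrors the shape of Option: a func that mutates the consumer.
type option func(*consumerSketch)

// withLogPrefix overrides the default (empty) log prefix.
func withLogPrefix(prefix string) option {
	return func(c *consumerSketch) { c.logPrefix = prefix }
}

// withBatchSize overrides the default batch size.
func withBatchSize(n int) option {
	return func(c *consumerSketch) { c.batchSize = n }
}

// newConsumerSketch sets defaults first, then applies the options in order.
func newConsumerSketch(streamName string, opts ...option) (*consumerSketch, error) {
	if streamName == "" {
		return nil, errors.New("must provide stream name")
	}
	c := &consumerSketch{streamName: streamName, batchSize: 100}
	for _, opt := range opts {
		opt(c)
	}
	return c, nil
}

func main() {
	c, err := newConsumerSketch("my-stream", withBatchSize(25), withLogPrefix("demo: "))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", *c)
}
```

The appeal of this design is that new settings can be added later without changing the constructor's signature or breaking existing callers.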

func WithCheckpoint added in v0.2.0

func WithCheckpoint(checkpoint Checkpoint) Option

WithCheckpoint overrides the default checkpoint

func WithClient added in v0.2.0

func WithClient(client kinesisiface.KinesisAPI) Option

WithClient overrides the default client

func WithCounter added in v0.2.0

func WithCounter(counter Counter) Option

WithCounter overrides the default counter

func WithLogger added in v0.2.0

func WithLogger(logger Logger) Option

WithLogger overrides the default logger

type Record added in v0.2.0

type Record = kinesis.Record

Record is an alias of the record type returned by the kinesis library

type ScanStatus added in v0.3.0

type ScanStatus struct {
	Error          error
	StopScan       bool
	SkipCheckpoint bool
}

ScanStatus signals the consumer if we should continue scanning for next record and whether to checkpoint.

Directories

Path Synopsis
checkpoint
ddb
examples
