consumer

package module
v0.3.16
Published: Sep 20, 2024 License: MIT Imports: 14 Imported by: 0

README

Golang Kinesis Consumer

Kinesis consumer applications written in Go. This library is intended to be a lightweight wrapper around the Kinesis API to read records, save checkpoints (with swappable backends), and gracefully recover from service timeouts/errors.

Installation

Get the package source:

$ go get github.com/harlow/kinesis-consumer

Note: This repo now requires the AWS SDK V2 package. If you are still using AWS SDK V1 then use: https://github.com/harlow/kinesis-consumer/releases/tag/v0.3.5

Overview

The consumer leverages a handler func that accepts a Kinesis record. The Scan method will consume all shards concurrently and call the callback func as it receives records from the stream.

Important 1: The Scan func also polls the stream to check for new shards; it will automatically start consuming any new shards added to the stream.

Important 2: The default Log, Counter, and Checkpoint are no-ops, which means no logs, counts, or checkpoints will be emitted when scanning the stream. See the options below to override these defaults.

import (
	"context"
	"flag"
	"fmt"
	"log"

	consumer "github.com/harlow/kinesis-consumer"
)

func main() {
	var stream = flag.String("stream", "", "Stream name")
	flag.Parse()

	// consumer
	c, err := consumer.New(*stream)
	if err != nil {
		log.Fatalf("consumer error: %v", err)
	}

	// start scan
	err = c.Scan(context.TODO(), func(r *consumer.Record) error {
		fmt.Println(string(r.Data))
		return nil // continue scanning
	})
	if err != nil {
		log.Fatalf("scan error: %v", err)
	}

	// Note: If you need to aggregate based on a specific shard
	// the `ScanShard` function should be used instead.
}

ScanFunc

ScanFunc is the type of the function called for each message read from the stream. The record argument contains the original record returned from the AWS Kinesis library.

type ScanFunc func(r *Record) error

If an error is returned, scanning stops. The sole exception is when the function returns the special value ErrSkipCheckpoint.

// continue scanning
return nil

// continue scanning, skip checkpoint
return consumer.ErrSkipCheckpoint

// stop scanning, return error
return errors.New("my error, exit all scans")

Use context cancellation to signal the scan to exit without error. For example, to gracefully exit the scan on interrupt:

// trap SIGINT, wait to trigger shutdown
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

// context with cancel
ctx, cancel := context.WithCancel(context.Background())

go func() {
	<-signals
	cancel() // call cancellation
}()

err := c.Scan(ctx, func(r *consumer.Record) error {
	fmt.Println(string(r.Data))
	return nil // continue scanning
})
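
On Go 1.16+ the same shutdown behavior can be expressed more compactly with the standard library's signal.NotifyContext; a minimal sketch:

// cancel the scan context on SIGINT
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()

err := c.Scan(ctx, func(r *consumer.Record) error {
	fmt.Println(string(r.Data))
	return nil // continue scanning
})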

Options

The consumer allows the following optional overrides.

Store

To record the progress of the consumer in the stream (checkpointing) we use a storage layer to persist the last sequence number the consumer has read from a particular shard. Returning ErrSkipCheckpoint from the record-processing callback (ScanFunc) skips the checkpoint for that record; any other non-nil error stops the scan.

This will allow consumers to re-launch and pick up at the position in the stream where they left off.

The unique identifier for a consumer is [appName, streamName, shardID].

Note: The default storage is in-memory (no-op), which means the scan will not persist any state and the consumer will start from the beginning of the stream each time it is restarted.

The consumer accepts a WithStore option to set the storage layer:

c, err := consumer.New(*stream, consumer.WithStore(db))
if err != nil {
	log.Fatalf("consumer error: %v", err)
}
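
Any type satisfying the Store interface (documented under Types below) can be used as a backend. A minimal in-memory sketch, purely for illustration (assumes the sync and context imports; a real backend should persist state across restarts):

// minimalStore is a hypothetical Store implementation backed by a map.
// State is lost on restart, so it is only useful as an illustration.
type minimalStore struct {
	mu  sync.Mutex
	seq map[string]string // keyed by streamName:shardID
}

func (s *minimalStore) GetCheckpoint(ctx context.Context, streamName, shardID string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.seq[streamName+":"+shardID], nil
}

func (s *minimalStore) SetCheckpoint(ctx context.Context, streamName, shardID, sequenceNumber string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.seq[streamName+":"+shardID] = sequenceNumber
	return nil
}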

To persist scan progress choose one of the following storage layers:

Redis

The Redis checkpoint requires App Name and Stream Name:

import store "github.com/harlow/kinesis-consumer/store/redis"

// redis checkpoint
db, err := store.New(appName)
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}

DynamoDB

The DynamoDB checkpoint requires Table Name, App Name, and Stream Name:

import store "github.com/harlow/kinesis-consumer/store/ddb"

// ddb checkpoint
db, err := store.New(appName, tableName)
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}

// Override the DynamoDB client if any special config is needed (e.g. assume role).
// Since this repo requires AWS SDK V2, build the client from an aws.Config
// (config is github.com/aws/aws-sdk-go-v2/config):
cfg, err := config.LoadDefaultConfig(context.TODO(),
	config.WithRegion("us-west-2"),
)
if err != nil {
	log.Fatalf("load config error: %v", err)
}
myDynamoDbClient := dynamodb.NewFromConfig(cfg)

db, err := store.New(*app, *table, store.WithDynamoClient(myDynamoDbClient))
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}

// Or provide your own Retryer to customize what triggers a retry inside the
// checkpoint. See the code in examples:
// db, err := store.New(*app, *table, store.WithDynamoClient(myDynamoDbClient), store.WithRetryer(&MyRetryer{}))

To leverage the DDB checkpoint we'll also need to create a table:

Partition key: namespace
Sort key: shard_id
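
Alternatively the table can be created programmatically; a sketch using the AWS SDK V2 DynamoDB client from above (the table name is illustrative and must match the one passed to store.New):

// assumes: ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
_, err := myDynamoDbClient.CreateTable(context.TODO(), &dynamodb.CreateTableInput{
	TableName:   aws.String("kinesis-consumer-checkpoint"), // illustrative name
	BillingMode: ddbtypes.BillingModePayPerRequest,
	AttributeDefinitions: []ddbtypes.AttributeDefinition{
		{AttributeName: aws.String("namespace"), AttributeType: ddbtypes.ScalarAttributeTypeS},
		{AttributeName: aws.String("shard_id"), AttributeType: ddbtypes.ScalarAttributeTypeS},
	},
	KeySchema: []ddbtypes.KeySchemaElement{
		{AttributeName: aws.String("namespace"), KeyType: ddbtypes.KeyTypeHash},
		{AttributeName: aws.String("shard_id"), KeyType: ddbtypes.KeyTypeRange},
	},
})
if err != nil {
	log.Fatalf("create table error: %v", err)
}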

Postgres

The Postgres checkpoint requires Table Name, App Name, Stream Name and ConnectionString:

import store "github.com/harlow/kinesis-consumer/store/postgres"

// postgres checkpoint
db, err := store.New(app, table, connStr)
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}
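
The connection string uses the standard Postgres URL form; for example (hypothetical credentials and database name):

connStr := "postgres://user:password@localhost:5432/mydb?sslmode=disable"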

To leverage the Postgres checkpoint we'll also need to create a table:

CREATE TABLE kinesis_consumer (
	namespace text NOT NULL,
	shard_id text NOT NULL,
	sequence_number numeric NOT NULL,
	CONSTRAINT kinesis_consumer_pk PRIMARY KEY (namespace, shard_id)
);

The table name must match the one you specify when creating the checkpoint. The composite primary key on namespace and shard_id is required for the checkpoint to run correctly and to ensure data integrity.

MySQL

The MySQL checkpoint requires Table Name, App Name, Stream Name and ConnectionString (just like the Postgres checkpoint!):

import store "github.com/harlow/kinesis-consumer/store/mysql"

// mysql checkpoint
db, err := store.New(app, table, connStr)
if err != nil {
	log.Fatalf("new checkpoint error: %v", err)
}
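
The connection string uses the usual go-sql-driver/mysql DSN form; for example (hypothetical credentials and database name):

connStr := "user:password@tcp(localhost:3306)/mydb"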

To leverage the MySQL checkpoint we'll also need to create a table:

CREATE TABLE kinesis_consumer (
	namespace varchar(255) NOT NULL,
	shard_id varchar(255) NOT NULL,
	sequence_number numeric(65,0) NOT NULL,
	CONSTRAINT kinesis_consumer_pk PRIMARY KEY (namespace, shard_id)
);

The table name must match the one you specify when creating the checkpoint. The composite primary key on namespace and shard_id is required for the checkpoint to run correctly and to ensure data integrity.

Kinesis Client

Override the Kinesis client if there is any special config needed:

// client (AWS SDK V2; config is github.com/aws/aws-sdk-go-v2/config)
cfg, err := config.LoadDefaultConfig(context.TODO())
if err != nil {
	log.Fatalf("load config error: %v", err)
}
client := kinesis.NewFromConfig(cfg)

// consumer
c, err := consumer.New(streamName, consumer.WithClient(client))

Metrics

Add optional counter for exposing counts for checkpoints and records processed:

// registry
registry, ok := prometheus.DefaultRegisterer.(*prometheus.Registry)
if !ok {
    // handle error
}

// consumer
c, err := consumer.New(streamName, consumer.WithMetricRegistry(registry))
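
To make the metrics available for scraping, serve the registry over HTTP; a minimal sketch using github.com/prometheus/client_golang/prometheus/promhttp (the port is arbitrary):

// expose the default registry on /metrics
http.Handle("/metrics", promhttp.Handler())
go func() {
	log.Fatal(http.ListenAndServe(":2112", nil))
}()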

Prometheus will then have the following metrics available.

# TYPE net_kinesis_checkpoints_written_count_total counter
net_kinesis_checkpoints_written_count_total{shard_id="***",stream_name="***"} 1
# TYPE net_kinesis_events_consumed_count_total counter
net_kinesis_events_consumed_count_total{shard_id="***",stream_name="***"} 1
# TYPE net_kinesis_milliseconds_behind_latest summary
net_kinesis_milliseconds_behind_latest_sum{shard_id="***",stream_name="***"} 1.0
net_kinesis_milliseconds_behind_latest_count{shard_id="***",stream_name="***"} 1

Consumer starting point

Kinesis allows consumers to specify where on the stream they'd like to start consuming from. The default in this library is LATEST (Start reading just after the most recent record in the shard).

This can be adjusted by using the WithShardIteratorType option in the library:

// override starting place on stream to use TRIM_HORIZON
// (types is github.com/aws/aws-sdk-go-v2/service/kinesis/types)
c, err := consumer.New(
	*stream,
	consumer.WithShardIteratorType(string(types.ShardIteratorTypeTrimHorizon)),
)
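
To start from a specific point in time instead, WithShardIteratorType can be combined with the WithTimestamp option; a sketch (the one-hour lookback is arbitrary):

c, err := consumer.New(
	*stream,
	consumer.WithShardIteratorType(string(types.ShardIteratorTypeAtTimestamp)),
	consumer.WithTimestamp(time.Now().Add(-1*time.Hour)),
)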

See AWS Docs for more options.

Logging

The consumer logs through the standard library's log/slog package: the WithLogger option accepts any *slog.Logger (see the Index below), so both the built-in handlers and third-party slog handlers can be used.

The package discards all log output by default. This can be customized with the preferred logging strategy:

// logger
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

// consumer
c, err := consumer.New(streamName, consumer.WithLogger(logger))

To use a third-party logging library, wrap it in an slog.Handler adapter (many libraries provide one, e.g. zap's zapslog) and pass the resulting *slog.Logger to WithLogger.

Examples

There are examples of a producer and a consumer in the /examples directory. These give end-to-end examples of setting up consumers with different checkpoint strategies.

The examples run locally against kinesalite, a local Kinesis implementation.

$ kinesalite &

Produce data to the stream:

$ cat examples/producer/users.txt  | go run examples/producer/main.go --stream myStream

Consume data from the stream:

$ go run examples/consumer/main.go --stream myStream

Contributing

Please see CONTRIBUTING.md for more information. Thank you, contributors!

License

Copyright (c) 2015 Harlow Ward. It is free software, and may be redistributed under the terms specified in the LICENSE file.

www.hward.com  ·  GitHub @harlow  ·  Twitter @harlow_ward

Documentation

Index

Constants

This section is empty.

Variables

var ErrSkipCheckpoint = errors.New("skip checkpoint")

ErrSkipCheckpoint is used as a return value from ScanFunc to indicate that the current checkpoint should be skipped. It is not returned as an error by any function.

Functions

This section is empty.

Types

type AllGroup added in v0.3.7

type AllGroup struct {
	Store
	// contains filtered or unexported fields
}

AllGroup is used to consume all shards from a single consumer. It caches a local list of the shards we are already processing and routinely polls the stream looking for new shards to process.

func NewAllGroup added in v0.3.7

func NewAllGroup(kinesis kinesisClient, store Store, streamName string, logger *slog.Logger) *AllGroup

NewAllGroup returns an initialized AllGroup for consuming all shards on a stream

func (*AllGroup) Start added in v0.3.7

func (g *AllGroup) Start(ctx context.Context, shardc chan types.Shard)

Start is a blocking operation which will loop and attempt to find new shards on a regular cadence.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer wraps the interaction with the Kinesis stream

func New added in v0.2.0

func New(streamName string, opts ...Option) (*Consumer, error)

New creates a kinesis consumer with default settings. Use Option to override any of the optional attributes.

func (*Consumer) Scan added in v0.2.0

func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error

Scan launches a goroutine to process each of the shards in the stream. The ScanFunc is passed through to each of the goroutines and called with each message pulled from the stream.

func (*Consumer) ScanShard added in v0.2.0

func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) error

ScanShard loops over records on a specific shard, calls the callback func for each record and checkpoints the progress of scan.
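
For example (the shard ID shown uses the standard Kinesis format and is purely illustrative):

err := c.ScanShard(ctx, "shardId-000000000000", func(r *consumer.Record) error {
	fmt.Println(string(r.Data))
	return nil // continue scanning
})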

type Counter added in v0.2.0

type Counter interface {
	Add(string, int64)
}

Counter interface is used for exposing basic metrics from the scanner. Deprecated: will be removed in favor of prometheus in a future release.

type Group added in v0.3.7

type Group interface {
	Start(ctx context.Context, shardc chan types.Shard)
	GetCheckpoint(ctx context.Context, streamName, shardID string) (string, error)
	SetCheckpoint(ctx context.Context, streamName, shardID, sequenceNumber string) error
}

Group interface used to manage which shard to process

type Option added in v0.2.0

type Option func(*Consumer)

Option is used to override defaults when creating a new Consumer

func WithAggregation added in v0.3.7

func WithAggregation(a bool) Option

WithAggregation overrides the default option for aggregating records

func WithClient added in v0.2.0

func WithClient(client kinesisClient) Option

WithClient overrides the default client

func WithCounter added in v0.2.0

func WithCounter(counter Counter) Option

WithCounter overrides the default counter. Deprecated. Will be removed in favor of WithMetricRegistry in a future release.

func WithGroup added in v0.3.7

func WithGroup(group Group) Option

WithGroup overrides the default group used to manage shard consumption

func WithLogger added in v0.2.0

func WithLogger(logger *slog.Logger) Option

WithLogger overrides the default logger

func WithMaxRecords added in v0.3.7

func WithMaxRecords(n int64) Option

WithMaxRecords overrides the maximum number of records to be returned in a single GetRecords call for the consumer (specify a value of up to 10,000)

func WithMetricRegistry added in v0.3.11

func WithMetricRegistry(registry prometheus.Registerer) Option

WithMetricRegistry specifies a registry to add the prometheus metrics to. Defaults to nil.

func WithParallelProcessing added in v0.3.15

func WithParallelProcessing(numWorkers int) Option

WithParallelProcessing sets the size of the Worker Pool that processes incoming requests. Defaults to 1

func WithScanInterval added in v0.3.7

func WithScanInterval(d time.Duration) Option

WithScanInterval overrides the scan interval for the consumer

func WithShardClosedHandler added in v0.3.7

func WithShardClosedHandler(h ShardClosedHandler) Option

WithShardClosedHandler defines a custom handler for closed shards.

func WithShardIteratorType added in v0.3.1

func WithShardIteratorType(t string) Option

WithShardIteratorType overrides the starting point for the consumer

func WithStore added in v0.3.7

func WithStore(store Store) Option

WithStore overrides the default storage

func WithTimestamp added in v0.3.7

func WithTimestamp(t time.Time) Option

WithTimestamp overrides the starting point for the consumer

type Record added in v0.2.0

type Record struct {
	types.Record
	ShardID            string
	MillisBehindLatest *int64
}

Record wraps the record returned from the Kinesis library and extends it to include the shard ID and MillisBehindLatest.

type Result added in v0.3.15

type Result struct {
	Record
	WorkerName string
	// contains filtered or unexported fields
}

Result is the output of the worker. It contains the name of the worker that processed it, the record itself (mainly to preserve the record's offset), and the processing error to propagate up.

type ScanFunc added in v0.3.7

type ScanFunc func(*Record) error

ScanFunc is the type of the function called for each message read from the stream. The record argument contains the original record returned from the AWS Kinesis library. If an error is returned, scanning stops. The sole exception is when the function returns the special value ErrSkipCheckpoint.

type ShardClosedHandler added in v0.3.7

type ShardClosedHandler = func(streamName, shardID string) error

ShardClosedHandler is a handler that will be called when the consumer has reached the end of a closed shard. No more records for that shard will be provided by the consumer. An error can be returned to stop the consumer.
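
A sketch of registering a handler via the WithShardClosedHandler option (the logging is illustrative):

c, err := consumer.New(streamName, consumer.WithShardClosedHandler(
	func(streamName, shardID string) error {
		log.Printf("shard %s of stream %s closed", shardID, streamName)
		return nil // return an error to stop the consumer
	},
))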

type Store added in v0.3.7

type Store interface {
	GetCheckpoint(ctx context.Context, streamName, shardID string) (string, error)
	SetCheckpoint(ctx context.Context, streamName, shardID, sequenceNumber string) error
}

Store interface used to persist scan progress

type WorkerPool added in v0.3.15

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool allows records to be processed in parallel

func NewWorkerPool added in v0.3.15

func NewWorkerPool(name string, numWorkers int, fn ScanFunc) *WorkerPool

NewWorkerPool returns an instance of WorkerPool

func (*WorkerPool) Result added in v0.3.15

func (wp *WorkerPool) Result() (*Result, error)

Result returns the Result of a Record submitted via Submit after it has been processed.

func (*WorkerPool) Start added in v0.3.15

func (wp *WorkerPool) Start(ctx context.Context)

Start spawns the amount of workers specified in numWorkers and starts them.

func (*WorkerPool) Stop added in v0.3.15

func (wp *WorkerPool) Stop()

Stop stops the WorkerPool by closing the channels used for processing.

func (*WorkerPool) Submit added in v0.3.15

func (wp *WorkerPool) Submit(r Record)

Submit a new Record for processing
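
A minimal sketch of the WorkerPool lifecycle, assuming records []consumer.Record was obtained elsewhere (name, worker count, and record source are all illustrative):

wp := consumer.NewWorkerPool("example", 4, func(r *consumer.Record) error {
	fmt.Println(string(r.Data))
	return nil
})
wp.Start(ctx)

go func() {
	for _, r := range records {
		wp.Submit(r)
	}
}()

for range records {
	if _, err := wp.Result(); err != nil {
		log.Printf("process error: %v", err)
	}
}
wp.Stop()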

Directories

Path	Synopsis
examples
store
	ddb	Package store
	memory	Package store
