kinesumer

package module
v0.4.3
Warning

This package is not in the latest version of its module.

Published: Apr 23, 2024 License: Apache-2.0 Imports: 21 Imported by: 0

README

Kinesumer

Kinesumer is a Go client that implements a client-side distributed consumer group for Amazon Kinesis. It supports the following features:

  • Implements a client-side distributed consumer group for Kinesis.
  • A client can consume messages from multiple Kinesis streams.
  • Each client is automatically assigned a shard id range for each stream.
  • Shard id ranges are rebalanced when the set of clients or the upstream shards change (for example, after a restart or a scaling event).
  • A checkpoint is kept for each shard, so that clients can resume consuming from the last checkpoint.
  • A client can consume from a Kinesis stream in a different AWS account.
  • All consumer group client state is kept in a DynamoDB table (called the state store).

[Figure: Kinesumer architecture]

Setup

Kinesumer manages the state of the distributed clients in a database called the "state store". It uses DynamoDB as the state store, so you need to create a DynamoDB table first. Create the table with the LSI schema; see the project documentation for the schema details.

The current state store implementation supports multiple applications (you pass the app name when you initialize the client). So, if you already have a Kinesumer state store, you don't need to create another state store table.

If your Kinesis stream is in a different account

If you want to connect to Kinesis in a different account, you need to set up an IAM role that can access the target account, and pass the role ARN (kinesumer.Config.RoleARN) when you initialize the Kinesumer client. Reference: https://docs.aws.amazon.com/kinesisanalytics/latest/java/examples-cross.html.

Usage

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/tazapay/kinesumer"
)

func main() {
    ctx := context.Background()

    // NewKinesumer takes a context as its first argument.
    client, err := kinesumer.NewKinesumer(
        ctx,
        &kinesumer.Config{
            App:            "myapp",
            KinesisRegion:  "ap-southeast-1",
            DynamoDBRegion: "ap-southeast-1",
            DynamoDBTable:  "kinesumer-state-store",
            ScanLimit:      1500,
            ScanTimeout:    2 * time.Second,
        },
    )
    if err != nil {
        log.Fatalf("create kinesumer client: %v", err)
    }

    // Drain the error channel in the background.
    go func() {
        for err := range client.Errors() {
            log.Printf("kinesumer error: %v", err)
        }
    }()

    // Consume multiple streams.
    // You can refresh the streams with `client.Refresh()` method.
    records, err := client.Consume(ctx, []string{"stream1", "stream2"})
    if err != nil {
        log.Fatalf("consume: %v", err)
    }

    for record := range records {
        fmt.Printf("record: %v\n", record)
    }
}

How it works

Kinesumer implements a client-side distributed consumer group without any communication between clients. How, then, do clients learn the state of the entire system? The answer is a distributed key-value store.

To distribute the shard ranges evenly among clients, Kinesumer relies on a centralized database called the state store. The state store holds the state of the distributed clients, the shard cache, and the checkpoints.

This is the overview architecture of Kinesumer:

[Figure: how Kinesumer works]

The following explains how Kinesumer works:

  • Leader election: Each client registers itself in the state store and derives its index by sorting the ids of all active clients. The client with index zero is the leader, so the leader can change whenever clients are scaled or restarted.
  • Shard rebalancing: Each client fetches the full shard id list and the client list from the state store, divides the shard id list by the number of clients, and takes the range of shard ids that corresponds to its index. All clients repeat this process periodically.
  • Synchronization: The leader periodically syncs the shard cache with the latest shard list and prunes outdated clients from the client list (to prevent orphaned shard ranges).
  • Offset checkpoint: Whenever a client consumes messages from its assigned shards, it updates a per-shard checkpoint with the sequence number of the last message read from each shard.
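
The leader election and shard rebalancing steps can be illustrated with a small standalone sketch. It is not Kinesumer's actual code: the helper names clientIndex and shardRange are hypothetical, and splitting the shard list with ceiling division is an assumption, since the text only says the list is divided by the number of clients.

```go
package main

import (
	"fmt"
	"sort"
)

// clientIndex returns the position of id in the sorted list of client ids,
// or -1 if id is not present. Index 0 is treated as the leader.
func clientIndex(clientIDs []string, id string) int {
	sorted := append([]string(nil), clientIDs...) // copy so the input is not reordered
	sort.Strings(sorted)
	for i, c := range sorted {
		if c == id {
			return i
		}
	}
	return -1
}

// shardRange returns the contiguous range of shard ids owned by the client
// at the given index. Ceiling division is an assumption of this sketch.
func shardRange(shardIDs []string, numClients, index int) []string {
	if numClients <= 0 || index < 0 || index >= numClients {
		return nil
	}
	size := (len(shardIDs) + numClients - 1) / numClients
	start := index * size
	if start >= len(shardIDs) {
		return nil // more clients than shards: this client stays idle
	}
	end := start + size
	if end > len(shardIDs) {
		end = len(shardIDs)
	}
	return shardIDs[start:end]
}

func main() {
	clients := []string{"client-c", "client-a", "client-b"}
	shards := []string{"shard-0", "shard-1", "shard-2", "shard-3", "shard-4"}
	for _, id := range clients {
		idx := clientIndex(clients, id)
		fmt.Printf("%s index=%d leader=%t shards=%v\n",
			id, idx, idx == 0, shardRange(shards, len(clients), idx))
	}
}
```

Because every client computes the same sorted order from the same state store contents, each one can derive its own range without talking to the others.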

License

See LICENSE.

Documentation

Overview

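Package kinesumer implements a client-side distributed consumer group client for Amazon Kinesis. It also contains a generated GoMock mock (MockStateStore) of the StateStore interface.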

Index

Constants

This section is empty.

Variables

var (
	ErrEmptySequenceNumber = errors.New("kinesumer: sequence number can't be empty")
	ErrInvalidStream       = errors.New("kinesumer: invalid stream")
)

Error codes.

var (
	ErrNoShardCache  = errors.New("kinesumer: shard cache not found")
	ErrEmptyShardIDs = errors.New("kinesumer: empty shard ids given")
)

Error codes.

Functions

This section is empty.

Types

type CommitConfig

type CommitConfig struct {
	// Whether to auto-commit updated sequence number. (default is true)
	Auto bool

	// How frequently to commit updated sequence numbers. (default is 5s)
	Interval time.Duration

	// A Timeout config for commit per stream. (default is 2s)
	Timeout time.Duration
}

CommitConfig holds options for how offsets are committed.

func NewDefaultCommitConfig

func NewDefaultCommitConfig() *CommitConfig

NewDefaultCommitConfig returns a new default offset management configuration.

type Config

type Config struct {
	App      string // Application name.
	Region   string // Region name. (optional)
	ClientID string // Consumer group client id. (optional)

	// Kinesis configs.
	KinesisRegion   string
	KinesisEndpoint string // Only for local server.
	// If you want to consume messages from Kinesis in a different account,
	// you need to set up the IAM role to access to target account, and pass the role arn here.
	// Reference: https://docs.aws.amazon.com/kinesisanalytics/latest/java/examples-cross.html.
	RoleARN string

	// State store configs.
	StateStore       *StateStore
	DynamoDBRegion   string
	DynamoDBTable    string
	DynamoDBEndpoint string // Only for local server.

	// These configs are not used in EFO mode.
	ScanLimit    int32
	ScanTimeout  time.Duration
	ScanInterval time.Duration

	EFOMode bool // On/off the Enhanced Fan-Out feature.

	// This config is used for how to manage sequence number.
	Commit *CommitConfig
}

Config defines configs for the Kinesumer client.

type Kinesumer

type Kinesumer struct {
	// contains filtered or unexported fields
}

Kinesumer implements an auto-rebalancing consumer group for Kinesis. TODO(mingrammer): export Prometheus metrics.

func NewKinesumer

func NewKinesumer(ctx context.Context, cfg *Config) (*Kinesumer, error)

NewKinesumer initializes and returns a new Kinesumer client.

func (*Kinesumer) Close

func (k *Kinesumer) Close()

Close stops the consuming and sync jobs.

func (*Kinesumer) Commit

func (k *Kinesumer) Commit()

Commit updates the stored checkpoints using the current checkpoints.

func (*Kinesumer) Consume

func (k *Kinesumer) Consume(
	ctx context.Context,
	streams []string,
) (<-chan *Record, error)

Consume consumes messages from Kinesis.

func (*Kinesumer) Errors

func (k *Kinesumer) Errors() <-chan error

Errors returns the error channel.

func (*Kinesumer) MarkRecord

func (k *Kinesumer) MarkRecord(record *Record)

MarkRecord marks the provided record as consumed.

func (*Kinesumer) Refresh

func (k *Kinesumer) Refresh(ctx context.Context, streams []string)

Refresh refreshes the consuming streams.

type MockStateStore

type MockStateStore struct {
	// contains filtered or unexported fields
}

MockStateStore is a mock of StateStore interface.

func NewMockStateStore

func NewMockStateStore(ctrl *gomock.Controller) *MockStateStore

NewMockStateStore creates a new mock instance.

func (*MockStateStore) DeregisterClient

func (m *MockStateStore) DeregisterClient(ctx context.Context, clientID string) error

DeregisterClient mocks base method.

func (*MockStateStore) EXPECT

func (m *MockStateStore) EXPECT() *MockStateStoreMockRecorder

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockStateStore) GetShards

func (m *MockStateStore) GetShards(ctx context.Context, stream string) (Shards, error)

GetShards mocks base method.

func (*MockStateStore) ListAllAliveClientIDs

func (m *MockStateStore) ListAllAliveClientIDs(ctx context.Context) ([]string, error)

ListAllAliveClientIDs mocks base method.

func (*MockStateStore) ListCheckPoints

func (m *MockStateStore) ListCheckPoints(ctx context.Context, stream string, shardIDs []string) (map[string]string, error)

ListCheckPoints mocks base method.

func (*MockStateStore) PingClientAliveness

func (m *MockStateStore) PingClientAliveness(ctx context.Context, clientID string) error

PingClientAliveness mocks base method.

func (*MockStateStore) PruneClients

func (m *MockStateStore) PruneClients(ctx context.Context) error

PruneClients mocks base method.

func (*MockStateStore) RegisterClient

func (m *MockStateStore) RegisterClient(ctx context.Context, clientID string) error

RegisterClient mocks base method.

func (*MockStateStore) UpdateCheckPoints

func (m *MockStateStore) UpdateCheckPoints(ctx context.Context, checkpoints []*ShardCheckPoint) error

UpdateCheckPoints mocks base method.

func (*MockStateStore) UpdateShards

func (m *MockStateStore) UpdateShards(ctx context.Context, stream string, shards Shards) error

UpdateShards mocks base method.

type MockStateStoreMockRecorder

type MockStateStoreMockRecorder struct {
	// contains filtered or unexported fields
}

MockStateStoreMockRecorder is the mock recorder for MockStateStore.

func (*MockStateStoreMockRecorder) DeregisterClient

func (mr *MockStateStoreMockRecorder) DeregisterClient(ctx, clientID interface{}) *gomock.Call

DeregisterClient indicates an expected call of DeregisterClient.

func (*MockStateStoreMockRecorder) GetShards

func (mr *MockStateStoreMockRecorder) GetShards(ctx, stream interface{}) *gomock.Call

GetShards indicates an expected call of GetShards.

func (*MockStateStoreMockRecorder) ListAllAliveClientIDs

func (mr *MockStateStoreMockRecorder) ListAllAliveClientIDs(ctx interface{}) *gomock.Call

ListAllAliveClientIDs indicates an expected call of ListAllAliveClientIDs.

func (*MockStateStoreMockRecorder) ListCheckPoints

func (mr *MockStateStoreMockRecorder) ListCheckPoints(ctx, stream, shardIDs interface{}) *gomock.Call

ListCheckPoints indicates an expected call of ListCheckPoints.

func (*MockStateStoreMockRecorder) PingClientAliveness

func (mr *MockStateStoreMockRecorder) PingClientAliveness(ctx, clientID interface{}) *gomock.Call

PingClientAliveness indicates an expected call of PingClientAliveness.

func (*MockStateStoreMockRecorder) PruneClients

func (mr *MockStateStoreMockRecorder) PruneClients(ctx interface{}) *gomock.Call

PruneClients indicates an expected call of PruneClients.

func (*MockStateStoreMockRecorder) RegisterClient

func (mr *MockStateStoreMockRecorder) RegisterClient(ctx, clientID interface{}) *gomock.Call

RegisterClient indicates an expected call of RegisterClient.

func (*MockStateStoreMockRecorder) UpdateCheckPoints

func (mr *MockStateStoreMockRecorder) UpdateCheckPoints(ctx, checkpoints interface{}) *gomock.Call

UpdateCheckPoints indicates an expected call of UpdateCheckPoints.

func (*MockStateStoreMockRecorder) UpdateShards

func (mr *MockStateStoreMockRecorder) UpdateShards(ctx, stream, shards interface{}) *gomock.Call

UpdateShards indicates an expected call of UpdateShards.

type Record

type Record struct {
	Stream  string
	ShardID string
	types.Record
}

Record wraps a Kinesis record (types.Record) with its stream name and shard id.

type Shard

type Shard struct {
	ID     string
	Closed bool
}

Shard holds shard id and a flag of "CLOSED" state.

type ShardCheckPoint

type ShardCheckPoint struct {
	Stream         string
	ShardID        string
	SequenceNumber string
	UpdatedAt      time.Time
}

ShardCheckPoint manages a shard check point.

type Shards

type Shards []*Shard

Shards is a collection of Shard.

type StateStore

type StateStore interface {
	GetShards(ctx context.Context, stream string) (Shards, error)
	UpdateShards(ctx context.Context, stream string, shards Shards) error
	ListAllAliveClientIDs(ctx context.Context) ([]string, error)
	RegisterClient(ctx context.Context, clientID string) error
	DeregisterClient(ctx context.Context, clientID string) error
	PingClientAliveness(ctx context.Context, clientID string) error
	PruneClients(ctx context.Context) error
	ListCheckPoints(ctx context.Context, stream string, shardIDs []string) (map[string]string, error)
	UpdateCheckPoints(ctx context.Context, checkpoints []*ShardCheckPoint) error
}

StateStore is a distributed key-value store for managing states.

Directories

Path Synopsis
pkg