consumer

package
v0.0.0-rc4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AllGroup

type AllGroup struct {
	Store
	// contains filtered or unexported fields
}

AllGroup is used to consume all shards from a single consumer. It caches a local list of the shards we are already processing and routinely polls the stream looking for new shards to process.

func NewAllGroup

func NewAllGroup(ksis kinesisiface.KinesisAPI, store Store, streamName string, logger log.Logger) *AllGroup

NewAllGroup returns an intitialized AllGroup for consuming all shards on a stream

func (*AllGroup) Start

func (g *AllGroup) Start(ctx context.Context, shardc chan *kinesis.Shard)

Start is a blocking operation which will loop and attempt to find new shards on a regular cadence.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func New

func New(streamName string, opts ...Option) (*Consumer, error)

func (*Consumer) Scan

func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error

Scan launches a goroutine to process each of the shards in the stream. The ScanFunc is passed through to each of the goroutines and called with each message pulled from the stream.

func (*Consumer) ScanShard

func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) error

ScanShard loops over records on a specific shard, calls the callback func for each record and checkpoints the progress of scan.

func (*Consumer) SetCheckpoint

func (c *Consumer) SetCheckpoint(shardID, sequenceNumber string) error

type CoordinatorStore

type CoordinatorStore struct {
	// contains filtered or unexported fields
}

func NewCoordinatorStore

func NewCoordinatorStore(cp coordinator.TransferState, transferID string) *CoordinatorStore

func (*CoordinatorStore) GetCheckpoint

func (c *CoordinatorStore) GetCheckpoint(streamName, shardID string) (string, error)

func (*CoordinatorStore) SetCheckpoint

func (c *CoordinatorStore) SetCheckpoint(streamName, shardID, sequenceNumber string) error

type Group

type Group interface {
	Start(ctx context.Context, shardc chan *kinesis.Shard)
	GetCheckpoint(streamName, shardID string) (string, error)
	SetCheckpoint(streamName, shardID, sequenceNumber string) error
}

Group interface used to manage which shard to process

type Option

type Option func(*Consumer)

Option is used to override defaults when creating a new Consumer

func WithClient

func WithClient(client kinesisiface.KinesisAPI) Option

WithClient overrides the default client

func WithGroup

func WithGroup(group Group) Option

WithGroup overrides the default storage

func WithMaxRecords

func WithMaxRecords(n int64) Option

WithMaxRecords overrides the maximum number of records to be returned in a single GetRecords call for the consumer (specify a value of up to 10,000)

func WithScanInterval

func WithScanInterval(d time.Duration) Option

WithScanInterval overrides the scan interval for the consumer

func WithShardClosedHandler

func WithShardClosedHandler(h ShardClosedHandler) Option

func WithShardIteratorType

func WithShardIteratorType(t string) Option

WithShardIteratorType overrides the starting point for the consumer

func WithStore

func WithStore(store Store) Option

WithStore overrides the default storage

func WithTimestamp

func WithTimestamp(t time.Time) Option

WithTimestamp overrides the starting point for the consumer

type Record

type Record struct {
	*kinesis.Record
	ShardID            string
	MillisBehindLatest *int64
}

Record wraps the record returned from the Kinesis library and extends to include the shard id.

type ScanFunc

type ScanFunc func([]*Record) error

type ShardClosedHandler

type ShardClosedHandler = func(streamName, shardID string) error

ShardClosedHandler is a handler that will be called when the consumer has reached the end of a closed shard. No more records for that shard will be provided by the consumer. An error can be returned to stop the consumer.

type Store

type Store interface {
	GetCheckpoint(streamName, shardID string) (string, error)
	SetCheckpoint(streamName, shardID, sequenceNumber string) error
}

Store interface used to persist scan progress

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL