Documentation ¶
Index ¶
- Variables
- type AllGroup
- type Consumer
- type Counter
- type Group
- type Option
- func WithAggregation(a bool) Option
- func WithClient(client kinesisClient) Option
- func WithCounter(counter Counter) Option
- func WithGroup(group Group) Option
- func WithLogger(logger *slog.Logger) Option
- func WithMaxRecords(n int64) Option
- func WithMetricRegistry(registry prometheus.Registerer) Option
- func WithParallelProcessing(numWorkers int) Option
- func WithScanInterval(d time.Duration) Option
- func WithShardClosedHandler(h ShardClosedHandler) Option
- func WithShardIteratorType(t string) Option
- func WithStore(store Store) Option
- func WithTimestamp(t time.Time) Option
- type Record
- type Result
- type ScanFunc
- type ShardClosedHandler
- type Store
- type WorkerPool
Constants ¶
This section is empty.
Variables ¶
var ErrSkipCheckpoint = errors.New("skip checkpoint")
ErrSkipCheckpoint is used as a return value from ScanFunc to indicate that the current checkpoint should be skipped. It is not returned as an error by any function.
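For illustration, a sketch of a ScanFunc that uses ErrSkipCheckpoint. The import path, the func(*Record) error shape of ScanFunc, and the Data field on Record are assumptions; only the skip/stop behaviour comes from this page.

package example

import (
	"fmt"

	consumer "github.com/harlow/kinesis-consumer" // assumed import path
)

// handle is a sketch of a ScanFunc (assumed shape: func(*consumer.Record) error).
func handle(r *consumer.Record) error {
	if len(r.Data) == 0 { // Data field assumed from the embedded Kinesis record
		// Tell the scanner to keep going but not to checkpoint this record.
		return consumer.ErrSkipCheckpoint
	}
	if err := process(r.Data); err != nil {
		// Any other non-nil error stops scanning.
		return fmt.Errorf("process record: %w", err)
	}
	return nil // nil lets the consumer checkpoint as usual
}

func process(data []byte) error { return nil } // placeholder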
Functions ¶
This section is empty.
Types ¶
type AllGroup ¶ added in v0.3.7
type AllGroup struct {
	Store
	// contains filtered or unexported fields
}
AllGroup is used to consume all shards from a single consumer. It caches a local list of the shards we are already processing and routinely polls the stream looking for new shards to process.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer wraps the interaction with the Kinesis stream
func New ¶ added in v0.2.0
New creates a kinesis consumer with default settings. Use Option to override any of the optional attributes.
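A minimal sketch of building and running a consumer with a few options. Only the option constructors and the ErrSkipCheckpoint behaviour come from this page; the import path, the New(streamName string, opts ...Option) signature, the Scan method, and the Record fields used below are assumptions.

package main

import (
	"context"
	"log"
	"time"

	consumer "github.com/harlow/kinesis-consumer" // assumed import path
)

func main() {
	// New is assumed to take the stream name followed by functional options.
	c, err := consumer.New(
		"my-stream",
		consumer.WithShardIteratorType("TRIM_HORIZON"),
		consumer.WithScanInterval(5*time.Second),
		consumer.WithMaxRecords(1000),
	)
	if err != nil {
		log.Fatalf("create consumer: %v", err)
	}

	// Scan (assumed method) invokes the ScanFunc for every record until the
	// context is cancelled or the ScanFunc returns an error other than
	// ErrSkipCheckpoint.
	ctx := context.Background()
	if err := c.Scan(ctx, func(r *consumer.Record) error {
		log.Printf("shard=%s data=%s", r.ShardID, r.Data) // fields assumed
		return nil
	}); err != nil {
		log.Fatalf("scan: %v", err)
	}
}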
type Counter ¶ added in v0.2.0
Counter interface is used for exposing basic metrics from the scanner. Deprecated: will be removed in favor of prometheus in a future release.
type Group ¶ added in v0.3.7
type Group interface {
	Start(ctx context.Context, shardc chan types.Shard)
	GetCheckpoint(ctx context.Context, streamName, shardID string) (string, error)
	SetCheckpoint(ctx context.Context, streamName, shardID, sequenceNumber string) error
}
Group interface used to manage which shard to process
type Option ¶ added in v0.2.0
type Option func(*Consumer)
Option is used to override defaults when creating a new Consumer
func WithAggregation ¶ added in v0.3.7
func WithAggregation(a bool) Option
WithAggregation overrides the default option for aggregating records
func WithClient ¶ added in v0.2.0
func WithClient(client kinesisClient) Option
WithClient overrides the default client
func WithCounter ¶ added in v0.2.0
func WithCounter(counter Counter) Option
WithCounter overrides the default counter. Deprecated: will be removed in favor of WithMetricRegistry in a future release.
func WithLogger ¶ added in v0.2.0
func WithLogger(logger *slog.Logger) Option
WithLogger overrides the default logger
func WithMaxRecords ¶ added in v0.3.7
func WithMaxRecords(n int64) Option
WithMaxRecords overrides the maximum number of records to be returned in a single GetRecords call for the consumer (specify a value of up to 10,000)
func WithMetricRegistry ¶ added in v0.3.11
func WithMetricRegistry(registry prometheus.Registerer) Option
WithMetricRegistry specifies a registry to add the prometheus metrics to. Defaults to nil.
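A sketch of supplying a Prometheus registry and exposing it for scraping. WithMetricRegistry is taken from this page; the consumer import path, the New signature, and the metric names the consumer registers are assumptions.

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	consumer "github.com/harlow/kinesis-consumer" // assumed import path
)

func main() {
	// *prometheus.Registry satisfies prometheus.Registerer.
	reg := prometheus.NewRegistry()

	c, err := consumer.New("my-stream", consumer.WithMetricRegistry(reg)) // New signature assumed
	if err != nil {
		log.Fatal(err)
	}
	_ = c // start scanning elsewhere

	// Serve the registry's metrics for scraping.
	http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
	log.Fatal(http.ListenAndServe(":9090", nil))
}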
func WithParallelProcessing ¶ added in v0.3.15
func WithParallelProcessing(numWorkers int) Option
WithParallelProcessing sets the size of the WorkerPool that processes incoming records. Defaults to 1.
func WithScanInterval ¶ added in v0.3.7
func WithScanInterval(d time.Duration) Option
WithScanInterval overrides the scan interval for the consumer
func WithShardClosedHandler ¶ added in v0.3.7
func WithShardClosedHandler(h ShardClosedHandler) Option
WithShardClosedHandler defines a custom handler for closed shards.
func WithShardIteratorType ¶ added in v0.3.1
func WithShardIteratorType(t string) Option
WithShardIteratorType overrides the starting point for the consumer
func WithTimestamp ¶ added in v0.3.7
func WithTimestamp(t time.Time) Option
WithTimestamp overrides the starting point for the consumer
type Record ¶ added in v0.2.0
Record wraps the record returned from the Kinesis library and extends to include the shard id.
type Result ¶ added in v0.3.15
Result is the output of a worker. It contains the ID of the worker that processed it, the record itself (mainly to preserve the record's offset), and the processing error to propagate up.
type ScanFunc ¶ added in v0.3.7
ScanFunc is the type of the function called for each message read from the stream. The record argument contains the original record returned from the AWS Kinesis library. If an error is returned, scanning stops. The sole exception is when the function returns the special value ErrSkipCheckpoint.
type ShardClosedHandler ¶ added in v0.3.7
ShardClosedHandler is a handler that will be called when the consumer has reached the end of a closed shard. No more records for that shard will be provided by the consumer. An error can be returned to stop the consumer.
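A sketch of a custom handler registered with WithShardClosedHandler. The func(streamName, shardID string) error shape is an assumption; the stop-on-error behaviour is described above.

package example

import "log"

// logClosedShard is a sketch of a ShardClosedHandler
// (assumed shape: func(streamName, shardID string) error).
func logClosedShard(streamName, shardID string) error {
	log.Printf("stream %s: shard %s is closed, no more records will arrive", streamName, shardID)
	// Returning a non-nil error here would stop the consumer.
	return nil
}

It would be registered via consumer.WithShardClosedHandler(logClosedShard) when building the consumer, assuming the New signature sketched earlier.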
type Store ¶ added in v0.3.7
type Store interface {
	GetCheckpoint(ctx context.Context, streamName, shardID string) (string, error)
	SetCheckpoint(ctx context.Context, streamName, shardID, sequenceNumber string) error
}
Store interface used to persist scan progress
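Because Store is a small interface, a test double is easy to sketch. The in-memory implementation below is not part of the package; it simply satisfies the two methods shown above.

package example

import (
	"context"
	"sync"
)

// memStore is an in-memory checkpoint store that satisfies the Store
// interface. Checkpoints are lost on restart, so it is only suitable for
// tests or local experiments.
type memStore struct {
	mu          sync.Mutex
	checkpoints map[string]string // keyed by streamName + "/" + shardID
}

func newMemStore() *memStore {
	return &memStore{checkpoints: make(map[string]string)}
}

func (s *memStore) GetCheckpoint(ctx context.Context, streamName, shardID string) (string, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// An empty string means no checkpoint has been recorded yet.
	return s.checkpoints[streamName+"/"+shardID], nil
}

func (s *memStore) SetCheckpoint(ctx context.Context, streamName, shardID, sequenceNumber string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.checkpoints[streamName+"/"+shardID] = sequenceNumber
	return nil
}

Wiring it into the consumer would use the WithStore option listed in the index, e.g. consumer.WithStore(newMemStore()).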
type WorkerPool ¶ added in v0.3.15
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool allows records to be processed in parallel
func NewWorkerPool ¶ added in v0.3.15
func NewWorkerPool(name string, numWorkers int, fn ScanFunc) *WorkerPool
NewWorkerPool returns an instance of WorkerPool
func (*WorkerPool) Result ¶ added in v0.3.15
func (wp *WorkerPool) Result() (*Result, error)
Result returns the Result for a submitted Record after it has been processed.
func (*WorkerPool) Start ¶ added in v0.3.15
func (wp *WorkerPool) Start(ctx context.Context)
Start spawns the number of workers specified by numWorkers and starts them.
func (*WorkerPool) Stop ¶ added in v0.3.15
func (wp *WorkerPool) Stop()
Stop stops the WorkerPool by closing the channels used for processing.
func (*WorkerPool) Submit ¶ added in v0.3.15
func (wp *WorkerPool) Submit(r Record)
Submit a new Record for processing
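A sketch that exercises the WorkerPool API listed above end to end. The constructor and methods are taken from this page; the import path, the blocking behaviour of Result, and the zero-value Record used for illustration are assumptions, and in practice records would come from the consumer's scan loop.

package main

import (
	"context"
	"log"

	consumer "github.com/harlow/kinesis-consumer" // assumed import path
)

func main() {
	// ScanFunc run by each worker for every submitted record.
	process := func(r *consumer.Record) error {
		// ... handle the record ...
		return nil
	}

	wp := consumer.NewWorkerPool("example", 4, process)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	wp.Start(ctx) // spawns 4 workers

	// Submit a record for processing. A zero-value Record is used purely for
	// illustration; real records would come from the consumer's scan loop.
	wp.Submit(consumer.Record{})

	// Result returns the outcome of the submitted record once a worker has
	// processed it (blocking behaviour assumed).
	if _, err := wp.Result(); err != nil {
		log.Printf("worker error: %v", err)
	}

	wp.Stop() // closes the pool's processing channels
}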