group

package
v0.60.0-beta
Warning

This package is not in the latest version of its module.

Published: Oct 15, 2021 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package group provides kafka consumer group component implementation.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Component

type Component struct {
	// contains filtered or unexported fields
}

Component is a kafka consumer implementation that processes messages in batches.

func New

func New(name, group string, brokers, topics []string, proc kafka.BatchProcessorFunc, oo ...OptionFunc) (*Component, error)

New initializes a new kafka consumer component with support for functional configuration. The default failure strategy is the ExitStrategy. The default batch size is 1 and the batch timeout is 100ms. The default number of retries is 0 and the retry wait is 0.

func (*Component) Run

func (c *Component) Run(ctx context.Context) error

Run starts the consumer processing loop to process messages from Kafka.

type OptionFunc

type OptionFunc func(*Component) error

OptionFunc definition for configuring the component in a functional way.

func BatchSize

func BatchSize(size uint) OptionFunc

BatchSize sets the message batch size the component should process at once.

func BatchTimeout

func BatchTimeout(timeout time.Duration) OptionFunc

BatchTimeout sets the message batch timeout. If the desired batch size is not reached and the timeout elapses without new messages coming in, the messages in the buffer are processed as a batch.

func CommitSync

func CommitSync() OptionFunc

CommitSync instructs the consumer to commit offsets in a blocking operation after processing every batch of messages.

func FailureStrategy

func FailureStrategy(fs kafka.FailStrategy) OptionFunc

FailureStrategy sets the strategy the component follows when it encounters an error. With kafka.ExitStrategy the component fails on error; if Retries > 0, it first reconnects and retries the failed message. With kafka.SkipStrategy the message is skipped on failure. A client that wants to retry a message before failing must handle this in the kafka.BatchProcessorFunc.

func Retries

func Retries(count uint) OptionFunc

Retries sets the number of times the component should retry in case of an error. Retries are consumed in the following cases:

* there are temporary connection issues
* a message batch fails in the user-defined processing function and the failure strategy is set to kafka.ExitStrategy
* the component needs to reconnect for any other reason

func RetryWait

func RetryWait(interval time.Duration) OptionFunc

RetryWait sets the wait period between component retries.

func SaramaConfig

func SaramaConfig(cfg *sarama.Config) OptionFunc

SaramaConfig specifies a sarama consumer config. Use this to set consumer configuration at the sarama level. Check the sarama config documentation for more config options.
