Documentation ¶
Overview ¶
Package group provides a Kafka consumer group component implementation.
Index ¶
- type Component
- type OptionFunc
- func BatchMessageDeduplication() OptionFunc
- func BatchSize(size uint) OptionFunc
- func BatchTimeout(timeout time.Duration) OptionFunc
- func CheckTopic() OptionFunc
- func CommitSync() OptionFunc
- func FailureStrategy(fs kafka.FailStrategy) OptionFunc
- func Retries(count uint) OptionFunc
- func RetryWait(interval time.Duration) OptionFunc
Types ¶
type Component ¶
type Component struct {
// contains filtered or unexported fields
}
Component is a Kafka consumer implementation that processes messages in batches.
func New ¶
func New(name, group string, brokers, topics []string, proc kafka.BatchProcessorFunc, saramaCfg *sarama.Config, oo ...OptionFunc) (*Component, error)
New initializes a new Kafka consumer component with support for functional configuration. The default failure strategy is kafka.ExitStrategy. The default batch size is 1 and the default batch timeout is 100ms. The default number of retries is 0 and the default retry wait is 0.
type OptionFunc ¶
OptionFunc defines an option for configuring the component in a functional way.
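The functional configuration style can be illustrated with a minimal, self-contained sketch. It does not use this package: the component struct, the optionFunc signature, the lower-case option names and the zero-size validation are all hypothetical stand-ins, since the real OptionFunc definition is not shown on this page. Only the default values are taken from the documentation of New.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// component is a hypothetical stand-in for the package's Component type.
type component struct {
	batchSize    uint
	batchTimeout time.Duration
	retries      uint
	retryWait    time.Duration
}

// optionFunc is an assumed signature for a functional option.
type optionFunc func(*component) error

// batchSize returns an option that sets the batch size.
// Rejecting zero is an assumption made for this sketch.
func batchSize(size uint) optionFunc {
	return func(c *component) error {
		if size == 0 {
			return errors.New("batch size must be greater than zero")
		}
		c.batchSize = size
		return nil
	}
}

// retries returns an option that sets the retry count.
func retries(count uint) optionFunc {
	return func(c *component) error {
		c.retries = count
		return nil
	}
}

// newComponent starts from the defaults documented for New and applies
// each option in order, stopping at the first error.
func newComponent(oo ...optionFunc) (*component, error) {
	c := &component{batchSize: 1, batchTimeout: 100 * time.Millisecond}
	for _, o := range oo {
		if err := o(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	c, err := newComponent(batchSize(50), retries(3))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(c.batchSize, c.batchTimeout, c.retries, c.retryWait)
}
```

Options that are not passed leave the corresponding default untouched, which is why callers only need to name the settings they want to change.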
func BatchMessageDeduplication ¶ added in v0.64.0
func BatchMessageDeduplication() OptionFunc
BatchMessageDeduplication enables the deduplication of messages based on the message's key. This implementation does not do additional sorting, but instead relies on the ordering guarantees that Kafka gives within the partitions of a topic. Do not use this functionality if you have changed your producer's partition hashing behaviour to be nondeterministic.
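As a rough illustration of key-based deduplication without sorting, the sketch below keeps one message per key and preserves the arrival order of the survivors. It is not the package's implementation: the message type is hypothetical, and the choice to keep the latest message for each key is an assumption of this sketch.

```go
package main

import "fmt"

// message is a hypothetical, simplified Kafka message.
type message struct {
	key   string
	value string
}

// dedupe keeps only the last message seen for each key (assumed policy).
// It relies on the input already being in partition order and does not sort.
func dedupe(batch []message) []message {
	// First pass: remember the index of the last occurrence of every key.
	last := make(map[string]int, len(batch))
	for i, m := range batch {
		last[m.key] = i
	}
	// Second pass: keep a message only if it is the last one for its key.
	out := make([]message, 0, len(last))
	for i, m := range batch {
		if last[m.key] == i {
			out = append(out, m)
		}
	}
	return out
}

func main() {
	batch := []message{
		{key: "a", value: "1"},
		{key: "b", value: "1"},
		{key: "a", value: "2"},
	}
	for _, m := range dedupe(batch) {
		fmt.Println(m.key, m.value)
	}
}
```

The sketch is only meaningful when messages with the same key arrive in a stable order, which is why nondeterministic partitioning on the producer side undermines it.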
func BatchSize ¶
func BatchSize(size uint) OptionFunc
BatchSize sets the message batch size the component should process at once.
func BatchTimeout ¶
func BatchTimeout(timeout time.Duration) OptionFunc
BatchTimeout sets the message batch timeout. If the desired batch size is not reached and the timeout elapses without new messages coming in, the messages in the buffer are processed as a batch.
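The interaction between batch size and batch timeout can be sketched with a channel and a timer. This is a simplified model rather than the component's code: collect, its parameters and the flush callback are hypothetical names, and flushing the remaining buffer when the input channel closes is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"time"
)

// collect buffers messages from in and calls flush when either the buffer
// reaches size, or timeout elapses with no new message while the buffer is
// non-empty. It returns when in is closed.
func collect(in <-chan string, size int, timeout time.Duration, flush func([]string)) {
	buf := make([]string, 0, size)
	for {
		select {
		case m, ok := <-in:
			if !ok {
				// Input closed: hand over what is left (assumed behaviour).
				if len(buf) > 0 {
					flush(buf)
				}
				return
			}
			buf = append(buf, m)
			if len(buf) >= size {
				flush(buf)
				buf = make([]string, 0, size)
			}
		case <-time.After(timeout):
			// A fresh timer is created on every loop iteration, so this
			// fires only after a full timeout without incoming messages.
			if len(buf) > 0 {
				flush(buf)
				buf = make([]string, 0, size)
			}
		}
	}
}

func main() {
	in := make(chan string, 5)
	for _, m := range []string{"a", "b", "c", "d", "e"} {
		in <- m
	}
	close(in)
	collect(in, 2, time.Second, func(b []string) { fmt.Println(b) })
}
```

With a batch size of 2 and five queued messages, the sketch emits two full batches followed by one partial batch.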
func CheckTopic ¶ added in v0.62.0
func CheckTopic() OptionFunc
CheckTopic checks whether the component-configured topics exist in the broker.
func CommitSync ¶
func CommitSync() OptionFunc
CommitSync instructs the consumer to commit offsets in a blocking operation after processing every batch of messages.
func FailureStrategy ¶
func FailureStrategy(fs kafka.FailStrategy) OptionFunc
FailureStrategy sets the strategy the component follows when it encounters an error. With kafka.ExitStrategy the component fails; if Retries > 0, the component reconnects and retries the failed message. With kafka.SkipStrategy the component skips the message on failure. If a client wants to retry a message before failing, this must be handled in the kafka.BatchProcessorFunc.
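The difference between the two strategies can be modelled in a few lines. The sketch below is self-contained and does not use the kafka package: failStrategy, exitStrategy, skipStrategy, run and failOnBad are hypothetical names that only mimic the behaviour described above, and reconnection and retries are left out.

```go
package main

import (
	"errors"
	"fmt"
)

// failStrategy is a hypothetical stand-in for kafka.FailStrategy.
type failStrategy int

const (
	exitStrategy failStrategy = iota // stop on the first processing error
	skipStrategy                     // drop the failing batch and continue
)

var errBad = errors.New("bad message")

// failOnBad is a sample processor that rejects batches containing "bad".
func failOnBad(batch []string) error {
	for _, m := range batch {
		if m == "bad" {
			return errBad
		}
	}
	return nil
}

// run feeds batches to proc and applies the failure strategy on errors.
// It returns the number of batches processed successfully.
func run(batches [][]string, proc func([]string) error, fs failStrategy) (int, error) {
	done := 0
	for _, b := range batches {
		if err := proc(b); err != nil {
			if fs == skipStrategy {
				continue
			}
			return done, fmt.Errorf("processing batch failed: %w", err)
		}
		done++
	}
	return done, nil
}

func main() {
	batches := [][]string{{"ok"}, {"bad"}, {"ok"}}
	fmt.Println(run(batches, failOnBad, exitStrategy))
	fmt.Println(run(batches, failOnBad, skipStrategy))
}
```

Under the exit strategy the loop stops at the failing batch and reports the error; under the skip strategy the failing batch is dropped and the remaining batches are still processed.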
func Retries ¶
func Retries(count uint) OptionFunc
Retries sets the number of times a component should retry in case of an error. These retries are depleted in the following cases:
- when there are temporary connection issues
- when a message batch fails to be processed through the user-defined processing function and the failure strategy is set to kafka.ExitStrategy
- for any other reason that requires the component to reconnect.
func RetryWait ¶
func RetryWait(interval time.Duration) OptionFunc
RetryWait sets the wait period between component retries.
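How a retry count and a retry wait typically combine can be sketched as a bounded loop. This is an illustration under assumptions, not the component's actual control flow: runWithRetries, failNTimes and errTemporary are hypothetical, and the sketch treats a count of 0 as a single attempt with no retry, in line with the documented defaults.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errTemporary = errors.New("temporary failure")

// runWithRetries calls run once and then up to retries more times,
// sleeping for wait between attempts. It returns nil on the first success.
func runWithRetries(retries uint, wait time.Duration, run func() error) error {
	var err error
	for attempt := uint(0); attempt <= retries; attempt++ {
		if err = run(); err == nil {
			return nil
		}
		// Do not sleep after the final attempt.
		if attempt < retries {
			time.Sleep(wait)
		}
	}
	return fmt.Errorf("retries exhausted: %w", err)
}

// failNTimes returns a function that fails n times before succeeding,
// together with a pointer to its call counter.
func failNTimes(n int) (func() error, *int) {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errTemporary
		}
		return nil
	}, &calls
}

func main() {
	f, calls := failNTimes(2)
	err := runWithRetries(2, 10*time.Millisecond, f)
	fmt.Println(err, *calls)
}
```

A longer wait gives a broker more time to recover between attempts, at the cost of delaying the point at which the component finally gives up.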