Documentation ¶
Overview ¶
Package consumer provides a high level API for consuming messages from a Kafka topic.
Consumer ¶
A single consumer is the most basic use case when consuming messages from a topic.
c := consumer.NewConsumer(dialer, config) err := c.Run(ctx, handler)
Consumer Group ¶
Multiple consumers can be created as Group to consume messages at a higher rate.
g := consumer.Group(dialer, config) errCh := g.Run(ctx, handler)
for err := range errCh { if err != nil { panic(err) } }
The consumer count and for the group is specified in the config parameter, which determines the number of goroutines to spawn. Each goroutine has their own Consumer and is set up so that individual consumer errors are reported back via the error channel returned from Group.Run.
It is important to note that Kafka is ultimately responsible for managing group members. As a result, a consumer group can easily be spread across multiple instances by simply using the same group ID for each Group. Kafka will then take care of re-balancing the group if members are added/removed.
Index ¶
- func DialerSCRAM512(username string, password string) (*kafka.Dialer, error)
- func NonStopExponentialBackOff() backoff.BackOff
- type Config
- type Consumer
- type DebugLogger
- type Group
- type GroupConfig
- type Handler
- type HandlerRetryBackOffConstructor
- type LoggerFunc
- type Message
- type Metadata
- type NotifyError
- type Option
- func WithDataDogTracing() Option
- func WithExplicitCommit() Option
- func WithGroupBalancers(groupBalancers ...kafka.GroupBalancer) Option
- func WithHandlerBackOffRetry(backOffConstructor HandlerRetryBackOffConstructor) Option
- func WithKafkaReader(readerFn func() Reader) Option
- func WithNotifyError(notify NotifyError) Option
- func WithReaderErrorLogger(logger kafka.LoggerFunc) Option
- func WithReaderLogger(logger kafka.LoggerFunc) Option
- type Reader
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DialerSCRAM512 ¶
DialerSCRAM512 returns a Kafka dialer configured with SASL authentication to securely transmit the provided credentials to Kafka using SCRAM-SHA-512.
func NonStopExponentialBackOff ¶
func NonStopExponentialBackOff() backoff.BackOff
NonStopExponentialBackOff is the suggested backoff retry strategy for consumers groups handling messages where ordering matters e.g. a data-capture stream from a database. This results in endless retries to prevent Kafka from re-balancing the group so that each consumer does not eventually experience the same error.
Retry intervals: 500ms, 4s, 32s, 4m, 34m, 4.5h, 5h (max).
The max interval of 5 hours is intended to leave enough time for manual intervention if necessary.
Types ¶
type Config ¶
type Config struct { ID string // Default: UUID Brokers []string Topic string MinBytes int // Default: 1MB MaxBytes int // Default: 10MB MaxWait time.Duration // Default: 250ms QueueCapacity int // Default: 100 DebugLogger DebugLogger // contains filtered or unexported fields }
Config is a configuration object used to create a new Consumer.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer provides a high level API for consuming and handling messages from a Kafka topic.
It is worth noting that publishing failed messages to a dead letter queue is not supported and instead would need to be included in your handler implementation.
func NewConsumer ¶
NewConsumer returns a new Consumer configured with the provided dialer and config.
type DebugLogger ¶
DebugLogger interface for consumer debug logging.
type Group ¶
type Group struct { ID string // contains filtered or unexported fields }
Group groups consumers together to concurrently consume and handle messages from a Kafka topic. Many groups with the same group ID are safe to use, which is particularly useful for groups across separate instances.
It is worth noting that publishing failed messages to a dead letter queue is not supported and instead would need to be included in your handler implementation.
func NewGroup ¶
func NewGroup(dialer *kafka.Dialer, config GroupConfig, opts ...Option) *Group
NewGroup returns a new Group configured with the provided dialer and config.
func (*Group) Run ¶
Run concurrently consumes and handles messages from the topic across all consumers in the group. The method call returns an error channel that is used to receive any consumer errors. The run process is only stopped if the context is canceled, the consumer has been closed, or all consumers in the group have errored.
type GroupConfig ¶
type GroupConfig struct { Count int Brokers []string Topic string GroupID string MinBytes int // Default: 1MB MaxBytes int // Default: 10MB MaxWait time.Duration // Default: 250ms QueueCapacity int // Default: 100 DebugLogger DebugLogger }
GroupConfig is a configuration object used to create a new Group. The default consumer count in a group is 1 unless specified otherwise.
type HandlerRetryBackOffConstructor ¶
type HandlerRetryBackOffConstructor func() backoff.BackOff
type LoggerFunc ¶
LoggerFunc is a bridge between DebugLogger and any third party logger. Usage:
l := NewLogger() // some logger c consumer.GroupConfig{ DebugLogger: consumer.LoggerFunc(func(msg string, keyvals ...any) { logger.Debug().Fields(keyvals).Msg(msg) }), }
func (LoggerFunc) Print ¶
func (f LoggerFunc) Print(msg string, keyvals ...any)
type NotifyError ¶
NotifyError is a notify-on-error function used to report consumer handler errors.
type Option ¶
type Option func(consumer *Consumer)
func WithDataDogTracing ¶
func WithDataDogTracing() Option
WithDataDogTracing adds Data Dog tracing to the consumer.
A span is started each time a Kafka message is read and finished when the offset is committed. The consumer span can also be retrieved from within your handler using tracer.SpanFromContext.
func WithExplicitCommit ¶
func WithExplicitCommit() Option
WithExplicitCommit enables offset commit only after a message is successfully handled.
Do not use this option if the default behaviour of auto committing offsets on initial read (before handling the message) is required.
func WithGroupBalancers ¶
func WithGroupBalancers(groupBalancers ...kafka.GroupBalancer) Option
WithGroupBalancers adds a priority-ordered list of client-side consumer group balancing strategies that will be offered to the coordinator. The first strategy that all group members support will be chosen by the leader.
Default: [Range, RoundRobin]
Only used by consumer group.
func WithHandlerBackOffRetry ¶
func WithHandlerBackOffRetry(backOffConstructor HandlerRetryBackOffConstructor) Option
WithHandlerBackOffRetry adds a back off retry policy on the consumer handler.
func WithKafkaReader ¶
WithKafkaReader allows a custom reader to be injected into the Consumer/Group. Using this will ignore any other reader specific options passed in.
It is highly recommended to not use this option unless injecting a mock reader implementation for testing.
func WithNotifyError ¶
func WithNotifyError(notify NotifyError) Option
WithNotifyError adds the NotifyError function to the consumer for it to be invoked on each consumer handler error.
func WithReaderErrorLogger ¶
func WithReaderErrorLogger(logger kafka.LoggerFunc) Option
WithReaderErrorLogger specifies a logger used to report internal consumer reader errors.
func WithReaderLogger ¶
func WithReaderLogger(logger kafka.LoggerFunc) Option
WithReaderLogger specifies a logger used to report internal consumer reader changes.