Documentation ¶
Index ¶
- func DialerSCRAM512(username string, password string) (*kafka.Dialer, error)
- func NonStopExponentialBackOff() backoff.BackOff
- type ClientLogger
- type Config
- type Consumer
- type Group
- type GroupConfig
- type Handler
- type HandlerRetryBackOffConstructor
- type Message
- type Metadata
- type NotifyError
- type Option
- func WithDataDogTracing() Option
- func WithExplicitCommit() Option
- func WithGroupBalancers(groupBalancers ...kafka.GroupBalancer) Option
- func WithHandlerBackOffRetry(backOffConstructor HandlerRetryBackOffConstructor) Option
- func WithKafkaDialer(dialer *kafka.Dialer) Option
- func WithKafkaReader(readerFn func() Reader) Option
- func WithLogger(logger ClientLogger) Option
- func WithNotifyError(notifier NotifyError) Option
- func WithQueueCapacity(queueCapacity int) Option
- type Reader
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DialerSCRAM512 ¶
func DialerSCRAM512(username string, password string) (*kafka.Dialer, error)
DialerSCRAM512 returns a Kafka dialer configured with SASL authentication to securely transmit the provided credentials to Kafka using SCRAM-SHA-512.
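A minimal sketch of creating a SCRAM dialer and passing it to a consumer. The broker address and credentials are placeholders, and the argument order of NewConsumer is assumed from its doc comment ("configured with the provided dialer and config"):

```go
// Build a dialer that authenticates with SCRAM-SHA-512.
dialer, err := DialerSCRAM512("my-username", "my-password")
if err != nil {
	log.Fatalf("creating SCRAM dialer: %v", err)
}

// Assumed argument order: dialer first, then the Config.
consumer := NewConsumer(dialer, Config{
	Brokers: []string{"localhost:9092"},
	Topic:   "example-topic",
})
_ = consumer
```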
func NonStopExponentialBackOff ¶
func NonStopExponentialBackOff() backoff.BackOff
NonStopExponentialBackOff is the suggested backoff retry strategy for consumer groups handling messages where ordering matters, e.g. a data-capture stream from a database. It retries endlessly to prevent Kafka from re-balancing the group, which would otherwise cause each consumer to eventually experience the same error.
Retry intervals: 500ms, 4s, 32s, 4m, 34m, 4.5h, 5h (max).
The max interval of 5 hours is intended to leave enough time for manual intervention if necessary.
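Since NonStopExponentialBackOff already has the signature func() backoff.BackOff, it can be passed directly as a HandlerRetryBackOffConstructor. A sketch using the NewGroup constructor documented below (broker address, topic, and group ID are placeholders):

```go
// NonStopExponentialBackOff matches HandlerRetryBackOffConstructor
// (func() backoff.BackOff), so it can be passed as-is; each handler
// retry loop then gets its own backoff instance.
group := NewGroup(
	GroupConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "example-topic",
		GroupID: "example-group",
	},
	WithHandlerBackOffRetry(NonStopExponentialBackOff),
)
_ = group
```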
Types ¶
type ClientLogger ¶
type Config ¶
type Config struct {
	ID            string // Default: UUID
	Brokers       []string
	Topic         string
	MinBytes      int           // Default: 1MB
	MaxBytes      int           // Default: 10MB
	MaxWait       time.Duration // Default: 250ms
	QueueCapacity int           // Default: 100
	// contains filtered or unexported fields
}
Config is a configuration object used to create a new Consumer.
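A sketch of a Config that sets only the required fields and relies on the documented defaults for the rest (broker address and topic are placeholders):

```go
cfg := Config{
	ID:      "orders-consumer", // optional; defaults to a UUID
	Brokers: []string{"localhost:9092"},
	Topic:   "orders",
	// MinBytes, MaxBytes, MaxWait, and QueueCapacity fall back to their
	// documented defaults: 1MB, 10MB, 250ms, and 100 respectively.
}
_ = cfg
```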
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer provides a high level API for consuming and handling messages from a Kafka topic.
It is worth noting that publishing failed messages to a dead letter queue is not supported and instead would need to be included in your handler implementation.
func NewConsumer ¶
NewConsumer returns a new Consumer configured with the provided dialer and config.
type Group ¶
type Group struct {
	ID string
	// contains filtered or unexported fields
}
Group coordinates a set of consumers to concurrently consume and handle messages from a Kafka topic. Multiple groups with the same group ID are safe to use, which is particularly useful for spreading a group across separate instances.
It is worth noting that publishing failed messages to a dead letter queue is not supported and instead would need to be included in your handler implementation.
func NewGroup ¶
func NewGroup(config GroupConfig, opts ...Option) *Group
NewGroup returns a new Group configured with the provided dialer and config.
func (*Group) Run ¶
Run concurrently consumes and handles messages from the topic across all consumers in the group. The method returns an error channel that is used to receive any consumer errors. The run process stops only if the context is canceled, the consumer has been closed, or all consumers in the group have errored.
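The Run signature is not shown on this page; assuming it accepts a context and returns a receive-only error channel, as its doc comment implies, a typical loop might look like:

```go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Assumed shape: Run(ctx) returning a receive-only error channel.
// Consumer errors arrive on the channel until the group stops.
errCh := group.Run(ctx)
for err := range errCh {
	log.Printf("consumer error: %v", err)
}
```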
type GroupConfig ¶
type GroupConfig struct {
	Count         int
	Brokers       []string
	Topic         string
	GroupID       string
	MinBytes      int           // Default: 1MB
	MaxBytes      int           // Default: 10MB
	MaxWait       time.Duration // Default: 250ms
	QueueCapacity int           // Default: 100
}
GroupConfig is a configuration object used to create a new Group. The default consumer count in a group is 1 unless specified otherwise.
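A sketch using the NewGroup constructor above (broker address, topic, and group ID are placeholders):

```go
group := NewGroup(GroupConfig{
	Count:   3, // run three consumers in this instance; defaults to 1
	Brokers: []string{"localhost:9092"},
	Topic:   "orders",
	GroupID: "orders-processor",
})
_ = group
```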
type HandlerRetryBackOffConstructor ¶
type HandlerRetryBackOffConstructor func() backoff.BackOff
type NotifyError ¶
NotifyError is a notify-on-error function used to report consumer handler errors.
type Option ¶
type Option func(consumer *Consumer)
func WithDataDogTracing ¶
func WithDataDogTracing() Option
WithDataDogTracing adds Datadog tracing to the consumer.
A span is started each time a Kafka message is read and finished when the offset is committed. The consumer span can also be retrieved from within your handler using tracer.SpanFromContext.
func WithExplicitCommit ¶
func WithExplicitCommit() Option
WithExplicitCommit enables offset commit only after a message is successfully handled.
Do not use this option if the default behaviour of auto committing offsets on initial read (before handling the message) is required.
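For example, to commit an offset only after the handler has processed the message (placeholder broker, topic, and group ID):

```go
// With explicit commits enabled, an offset is committed only after the
// handler returns successfully, so a message whose handling failed can
// be re-read after a restart rather than silently skipped.
group := NewGroup(
	GroupConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   "payments",
		GroupID: "payments-processor",
	},
	WithExplicitCommit(),
)
_ = group
```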
func WithGroupBalancers ¶
func WithGroupBalancers(groupBalancers ...kafka.GroupBalancer) Option
WithGroupBalancers adds a priority-ordered list of client-side consumer group balancing strategies that will be offered to the coordinator. The first strategy that all group members support will be chosen by the leader.
Default: [Range, RoundRobin]
Only used by consumer group.
func WithHandlerBackOffRetry ¶
func WithHandlerBackOffRetry(backOffConstructor HandlerRetryBackOffConstructor) Option
WithHandlerBackOffRetry adds a back off retry policy on the consumer handler.
func WithKafkaDialer ¶
func WithKafkaDialer(dialer *kafka.Dialer) Option
func WithKafkaReader ¶
func WithKafkaReader(readerFn func() Reader) Option
WithKafkaReader allows a custom reader to be injected into the Consumer/Group. Using this option causes any other reader-specific options to be ignored.
It is highly recommended not to use this option except to inject a mock reader implementation for testing.
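In tests, a stub reader can be injected through NewGroup's options. Here stubReader is a hypothetical type assumed to satisfy the package's Reader interface, whose method set is not shown on this page:

```go
// stubReader is a hypothetical implementation of the Reader interface
// used only for testing; its methods are elided here.
var stub Reader = stubReader{}

group := NewGroup(
	GroupConfig{Topic: "test-topic", GroupID: "test-group"},
	// Every consumer in the group receives the stub instead of a real
	// Kafka reader; other reader-specific options are ignored.
	WithKafkaReader(func() Reader { return stub }),
)
_ = group
```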
func WithLogger ¶
func WithLogger(logger ClientLogger) Option
WithLogger specifies a logger used to report internal consumer reader changes.
func WithNotifyError ¶
func WithNotifyError(notifier NotifyError) Option
WithNotifyError adds the NotifyError function to the consumer; it is invoked on each consumer handler error.
func WithQueueCapacity ¶
func WithQueueCapacity(queueCapacity int) Option
WithQueueCapacity sets the internal message queue capacity. Defaults to 100 if none is set.