README
GO-KAFKA
Go-kafka provides an easy way to use kafka listeners and producers with only a few lines of code. The listener is able to consume from multiple topics, and will execute a separate handler for each topic.
📘 Important note for v3 upgrade:
- The library now relies on the IBM/sarama library instead of Shopify/sarama, which is no longer maintained.
- The kafka.Handler type has been changed to a struct containing both the function to execute and the handler's optional configuration.
- The global variable PushConsumerErrorsToTopic has been replaced by the PushConsumerErrorsToRetryTopic and PushConsumerErrorsToDeadletterTopic properties on the handler.

The last two changes should be the only breaking changes in the v3 release. The rest of the library should be compatible with the previous version.
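For illustration, here is a minimal sketch of the new handler shape. The Processor and Config field names come from the Handler type documented below; the per-handler options mentioned in the comment are described in the sections that follow.

// v2: kafka.Handler was a plain function.
// v3: kafka.Handler is a struct holding the processing function plus optional configuration.
handler := kafka.Handler{
	Processor: func(ctx context.Context, msg *sarama.ConsumerMessage) error {
		// handle the message
		return nil
	},
	// Config (a kafka.HandlerConfig) is optional and can override global settings such as
	// ConsumerMaxRetries, DurationBeforeRetry, RetryTopic or DeadLetterTopic for this handler only.
}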
Quick start
Simple consumer
// topic-specific handlers
var handler1 kafka.Handler
var handler2 kafka.Handler
// map your topics to their handlers
handlers := map[string]kafka.Handler{
"topic-1": handler1,
"topic-2": handler2,
}
// define your listener
kafka.Brokers = []string{"localhost:9092"}
listener, _ := kafka.NewListener("my-consumer-group", handlers)
defer listener.Close()
// listen and enjoy
errc <- listener.Listen(ctx)
Simple producer
// define your producer
kafka.Brokers = []string{"localhost:9092"}
producer, _ := kafka.NewProducer()
// send your message
message := &sarama.ProducerMessage{
Topic: "my-topic",
Value: sarama.StringEncoder("my-message"),
}
_ = producer.Produce(message)
Features
- Create a listener on multiple topics
- Retry policy on message handling
- Create a producer
- Prometheus instrumenting
Consumer error handling
You can customize the error handling of the consumer, using various patterns:
- Blocking retries of the same event (max number and delay are configurable per handler)
- Forward to retry topic for automatic retry without blocking the consumer
- Forward to deadletter topic for manual investigation
Here is the overall logic applied to handle errors:
stateDiagram-v2
init: Error processing an event
state is_omitable_err <<choice>>
skipWithoutCounting: Skip the event without impacting counters
state is_retriable_err <<choice>>
state is_deadletter_configured <<choice>>
skip: Skip the event
forwardDL: Forward to deadletter topic
state should_retry <<choice>>
blocking_retry : Blocking Retry of this event
state is_retry_topic_configured <<choice>>
state is_deadletter_configured2 <<choice>>
forwardRQ: Forward to Retry topic
skip2: Skip the event
defaultDL: Forward to Deadletter topic
init --> is_omitable_err
is_omitable_err --> skipWithoutCounting: Error is of type ErrEventOmitted
is_omitable_err --> is_retriable_err: Error is not an ErrEventOmitted
is_retriable_err --> is_deadletter_configured: Error is of type ErrEventUnretriable
is_retriable_err --> should_retry: Error is retriable
should_retry --> blocking_retry: There are some retries left
should_retry --> is_retry_topic_configured : No more blocking retry
is_deadletter_configured --> skip: No Deadletter topic configured
is_deadletter_configured --> forwardDL: Deadletter topic configured
is_retry_topic_configured --> forwardRQ: Retry Topic Configured
is_retry_topic_configured --> is_deadletter_configured2: No Retry Topic Configured
is_deadletter_configured2 --> skip2: No Deadletter topic configured
is_deadletter_configured2 --> defaultDL: Deadletter topic configured
Error types
Two types of errors are introduced, so that application code can return them whenever relevant:
- kafka.ErrEventUnretriable - errors that should not be retried
- kafka.ErrEventOmitted - errors that should lead to the event being omitted without impacting metrics

All other errors will be considered "retriable" errors.
Depending on the Retry topic/Deadletter topic/Max retries configuration, the event will be retried, forwarded to a retry topic, or forwarded to a deadletter topic.
Blocking Retries
By default, failed event consumptions will be retried 3 times (each attempt is separated by 2 seconds). This can be configured globally through the following properties:
- ConsumerMaxRetries
- DurationBeforeRetry

These properties can also be configured on a per-topic basis by setting the ConsumerMaxRetries and DurationBeforeRetry properties on the handler.

If you want to achieve a blocking retry pattern (i.e. continuously retrying until the event is successfully consumed), you can set ConsumerMaxRetries to InfiniteRetries (-1).
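For example, the global defaults can be tuned through the ConsumerMaxRetries and DurationBeforeRetry package variables (the values below are only illustrative):

// Retry each failed event up to 10 times, waiting 5 seconds between attempts.
kafka.ConsumerMaxRetries = 10
kafka.DurationBeforeRetry = 5 * time.Second
// Or block on every failed event until it is eventually processed.
kafka.ConsumerMaxRetries = kafka.InfiniteRetries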
If you do not want specific errors to be retried, you can wrap them in a kafka.ErrEventUnretriable error before returning them, or return a kafka.ErrEventUnretriable directly.
// This error will not be retried
err := errors.New("my error")
return errors.Wrap(kafka.ErrEventUnretriable, err.Error())

// This error will also not be retried
return kafka.ErrEventUnretriable
Deadletter And Retry topics
By default, events that have exceeded the maximum number of blocking retries will be pushed to a retry topic or dead letter topic.
This behaviour can be disabled through the PushConsumerErrorsToRetryTopic and PushConsumerErrorsToDeadletterTopic properties.

PushConsumerErrorsToRetryTopic = false
PushConsumerErrorsToDeadletterTopic = false
If these switches are on, the names of the deadletter and retry topics are dynamically generated based on the original topic name and the consumer group. For example, if the original topic is my-topic and the consumer group is my-consumer-group, the deadletter topic will be my-consumer-group-my-topic-deadletter.

These patterns can be overridden through the RetryTopicPattern and DeadletterTopicPattern properties.
The retry and deadletter topic names can also be overridden through the RetryTopic and DeadLetterTopic properties on the handler.

Note that if the global PushConsumerErrorsToRetryTopic or PushConsumerErrorsToDeadletterTopic properties are false, but you configure the RetryTopic or DeadLetterTopic properties on a handler, then events in error will be forwarded to the error topics only for this handler.
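As a sketch of that per-handler opt-in (the RetryTopic and DeadLetterTopic field names come from the description above; the exact HandlerConfig layout and field types are assumptions made for this example, and processMessage is a hypothetical processing function):

// Globally disable forwarding of consumer errors...
kafka.PushConsumerErrorsToRetryTopic = false
kafka.PushConsumerErrorsToDeadletterTopic = false

// ...and re-enable it for a single handler by naming its error topics explicitly.
handler := kafka.Handler{
	Processor: processMessage,
	Config: kafka.HandlerConfig{
		RetryTopic:      "my-consumer-group-my-topic-retry",      // assumed to be a plain string field
		DeadLetterTopic: "my-consumer-group-my-topic-deadletter", // assumed to be a plain string field
	},
}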
Omitting specific errors
In certain scenarios, you might want to omit some errors. For example, you might want to discard outdated events that are not relevant anymore. Such events would increase a separate, dedicated metric instead of the error one, and would not be retried. To do so, wrap the errors that should lead to omitted events in a kafka.ErrEventOmitted, or return a kafka.ErrEventOmitted directly.
// This error will be omitted
err := errors.New("my error")
return errors.Wrap(kafka.ErrEventOmitted, err.Error())
// This error will also be omitted
return kafka.ErrEventOmitted
Instrumenting
Metrics for the listener and the producer can be exported to Prometheus. The following metrics are available:
| Metric name | Labels | Description |
|---|---|---|
| kafka_consumer_record_consumed_total | kafka_topic, consumer_group | Number of messages consumed |
| kafka_consumer_record_latency_seconds | kafka_topic, consumer_group | Latency of consuming a message |
| kafka_consumer_record_omitted_total | kafka_topic, consumer_group | Number of messages omitted |
| kafka_consumer_record_error_total | kafka_topic, consumer_group | Number of errors when consuming a message |
| kafka_consumergroup_current_message_timestamp | kafka_topic, consumer_group, partition, type | Timestamp of the current message being processed. Type can be either LogAppendTime or CreateTime. |
| kafka_producer_record_send_total | kafka_topic | Number of messages sent |
| kafka_producer_dead_letter_created_total | kafka_topic | Number of messages sent to a dead letter topic |
| kafka_producer_record_error_total | kafka_topic | Number of errors when sending a message |
To activate the Prometheus instrumenting on go-kafka:

// define your listener with the instrumenting option
kafka.Brokers = []string{"localhost:9092"}
listener, _ := kafka.NewListener("my-consumer-group", handlers, kafka.WithInstrumenting())
defer listener.Close()

// expose the metrics over HTTP using the prometheus handler
go func() {
	httpAddr := ":8080"
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())
	errc <- http.ListenAndServe(httpAddr, mux)
}()
Default configuration
Configuration of the consumer/producer is opinionated. It aims to simply solve problems that have taken us by surprise in the past. For this reason:
- the default partitioner is based on murmur2 instead of the one sarama uses by default
- offset retention is set to 30 days
- initial offset is oldest
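If these defaults do not fit your use case, the underlying sarama configuration is exposed through the Config package variable (see the Variables section below) and can be adjusted before creating a listener or producer; the values here are only illustrative:

// Start from the newest offset instead of the oldest.
kafka.Config.Consumer.Offsets.Initial = sarama.OffsetNewest
// Keep committed offsets for 7 days instead of 30.
kafka.Config.Consumer.Offsets.Retention = 7 * 24 * time.Hour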
License
go-kafka is licensed under the MIT license. (http://opensource.org/licenses/MIT)
Contributing
Pull requests are the way to help us here. We will be really grateful.
Documentation
Overview
Package kafka. The partitioner code is copied from https://github.com/burdiyan/kafkautil/blob/master/partitioner.go to ensure it stays available.
Index
- Constants
- Variables
- func DefaultTracing(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context)
- func DeserializeContextFromKafkaHeaders(ctx context.Context, kafkaheaders string) (context.Context, error)
- func GetContextFromKafkaMessage(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context)
- func GetKafkaHeadersFromContext(ctx context.Context) []sarama.RecordHeader
- func MurmurHasher() hash.Hash32
- func NewJVMCompatiblePartitioner(topic string) sarama.Partitioner
- func Ptr[T any](v T) *T
- func SerializeKafkaHeadersFromContext(ctx context.Context) (string, error)
- type ConsumerMetricsService
- type Handler
- type HandlerConfig
- type Handlers
- type Listener
- type ListenerOption
- type Producer
- type ProducerHandler
- type ProducerMetricsService
- type ProducerOption
- type StdLogger
- type TracingFunc
Constants ¶
const ( TimestampTypeLogAppendTime = "LogAppendTime" TimestampTypeCreateTime = "CreateTime" )
const InfiniteRetries = -1
InfiniteRetries is a constant to define infinite retries. It is used to set the ConsumerMaxRetries to a blocking retry process.
Variables
var (
	ErrEventUnretriable = errors.New("the event will not be retried")
	ErrEventOmitted     = errors.New("the event will be omitted")
)
var Brokers []string
Brokers is the list of Kafka brokers to connect to.
var Config = sarama.NewConfig()
Config is the sarama (cluster) config used for the consumer and producer.
var ConsumerMaxRetries = 3
ConsumerMaxRetries is the maximum number of times we want to retry processing an event before throwing the error. By default, 3 times.
var DeadletterTopicPattern = "$$CG$$-$$T$$-deadletter"
DeadletterTopicPattern is the deadletter topic name pattern. By default "consumergroup-topicname-deadletter". Use $$CG$$ as the consumer group placeholder and $$T$$ as the original topic name placeholder.
var DurationBeforeRetry = 2 * time.Second
DurationBeforeRetry is the duration we wait between process retries. By default 2 seconds.
var PushConsumerErrorsToDeadletterTopic = true
PushConsumerErrorsToDeadletterTopic is a boolean to define if messages in error have to be pushed to a deadletter topic.
var PushConsumerErrorsToRetryTopic = true
PushConsumerErrorsToRetryTopic is a boolean to define if messages in error have to be pushed to a retry topic.
var RetryTopicPattern = "$$CG$$-$$T$$-retry"
RetryTopicPattern is the retry topic name pattern. By default "consumergroup-topicname-retry". Use $$CG$$ as the consumer group placeholder and $$T$$ as the original topic name placeholder.
Functions
func DefaultTracing
func DefaultTracing(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context)
DefaultTracing implements TracingFunc. It fetches opentracing headers from the kafka message headers, then creates a span using opentracing.GlobalTracer(). Usage: `listener, err = kafka.NewListener(appName, handlers, kafka.WithTracing(kafka.DefaultTracing))`
func DeserializeContextFromKafkaHeaders
func DeserializeContextFromKafkaHeaders(ctx context.Context, kafkaheaders string) (context.Context, error)
DeserializeContextFromKafkaHeaders fetches tracing headers from a JSON-encoded carrier and returns the resulting context.
func GetContextFromKafkaMessage
func GetContextFromKafkaMessage(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context)
GetContextFromKafkaMessage fetches tracing headers from the kafka message
func GetKafkaHeadersFromContext
func GetKafkaHeadersFromContext(ctx context.Context) []sarama.RecordHeader
GetKafkaHeadersFromContext fetches tracing metadata from the context and returns it as a []sarama.RecordHeader slice.
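For example, a hedged sketch of propagating the tracing context onto an outgoing message (producer and ctx come from the surrounding application code; topic and payload are placeholders):

// Attach the tracing metadata from the current context to the message headers.
msg := &sarama.ProducerMessage{
	Topic:   "my-topic",
	Value:   sarama.StringEncoder("my-message"),
	Headers: kafka.GetKafkaHeadersFromContext(ctx),
}
_ = producer.Produce(msg)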
func MurmurHasher
func MurmurHasher() hash.Hash32
MurmurHasher creates a murmur2 hasher implementing the hash.Hash32 interface. The implementation is not complete and does not support streaming; it only implements the interface to comply with the sarama.NewCustomHashPartitioner signature. Sarama only uses the Write method once, when writing keys and values of the message, so streaming support is not necessary.
func NewJVMCompatiblePartitioner
func NewJVMCompatiblePartitioner(topic string) sarama.Partitioner
NewJVMCompatiblePartitioner creates a Sarama partitioner that uses the same hashing algorithm as JVM Kafka clients.
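go-kafka already uses a murmur2-based partitioner by default (see Default configuration above); as a sketch, if you manage the sarama configuration yourself, the constructor can be wired in like this:

// Hash keys to the same partitions as JVM Kafka clients.
kafka.Config.Producer.Partitioner = kafka.NewJVMCompatiblePartitioner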
Types
type ConsumerMetricsService
type ConsumerMetricsService struct {
// contains filtered or unexported fields
}
ConsumerMetricsService object represents consumer metrics
func NewConsumerMetricsService
func NewConsumerMetricsService(groupID string) *ConsumerMetricsService
NewConsumerMetricsService creates a service layer that adds metrics capability.
func (*ConsumerMetricsService) Instrumentation
func (c *ConsumerMetricsService) Instrumentation(next Handler) Handler
Instrumentation middleware used to add metrics
type Handler
type Handler struct {
	Processor func(ctx context.Context, msg *sarama.ConsumerMessage) error
	Config    HandlerConfig
}
Handler is a processor that handles received kafka messages. Its Config can be used to override the global configuration for a specific handler.
type HandlerConfig
type Listener
Listener is able to listen to multiple topics, with one handler per topic.
func NewListener
func NewListener(groupID string, handlers Handlers, options ...ListenerOption) (Listener, error)
NewListener creates a new instance of Listener
type ListenerOption
type ListenerOption func(l *listener)
ListenerOption adds an option to the listener.
func WithInstrumenting
func WithInstrumenting() ListenerOption
WithInstrumenting adds the instrumenting layer on a listener.
func WithTracing
func WithTracing(tracer TracingFunc) ListenerOption
WithTracing accepts a TracingFunc to execute before each message
type Producer
type Producer interface {
Produce(msg *sarama.ProducerMessage) error
}
func NewProducer
func NewProducer(options ...ProducerOption) (Producer, error)
NewProducer creates a new producer that uses the default sarama client.
type ProducerHandler
type ProducerHandler func(p *producer, msg *sarama.ProducerMessage) error
ProducerHandler is a function that handles the production of a message. It is exposed to allow for easy middleware building.
type ProducerMetricsService
type ProducerMetricsService struct {
// contains filtered or unexported fields
}
ProducerMetricsService is a service that provides metrics for the producer.
func NewDeadletterProducerMetricsService
func NewDeadletterProducerMetricsService() *ProducerMetricsService
func NewProducerMetricsService
func NewProducerMetricsService() *ProducerMetricsService
func (*ProducerMetricsService) DeadletterInstrumentation
func (p *ProducerMetricsService) DeadletterInstrumentation(next ProducerHandler) ProducerHandler
DeadletterInstrumentation is a middleware that provides metrics for a deadletter producer.
func (*ProducerMetricsService) Instrumentation
func (p *ProducerMetricsService) Instrumentation(next ProducerHandler) ProducerHandler
Instrumentation is a middleware that provides metrics for the producer.
type ProducerOption
type ProducerOption func(p *producer)
ProducerOption is a function that is passed to the producer constructor to configure it.
func WithDeadletterProducerInstrumenting
func WithDeadletterProducerInstrumenting() ProducerOption
WithDeadletterProducerInstrumenting adds the instrumenting layer on a deadletter producer.
func WithProducerInstrumenting
func WithProducerInstrumenting() ProducerOption
WithProducerInstrumenting adds the instrumenting layer on a producer.
type StdLogger
type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}
StdLogger is used to log messages.
ErrorLogger is an instance of the StdLogger interface. By default it outputs all log messages to stderr, but you can redirect it wherever you want.
type TracingFunc
type TracingFunc func(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context)
TracingFunc is used to create tracing and/or propagate the tracing context from each message to the Go context.
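A minimal sketch of a custom TracingFunc, assuming the github.com/opentracing/opentracing-go package is imported as opentracing and handlers is defined as in the Quick start (the span name is illustrative):

// customTracing starts a span named after the consumed topic using the global tracer.
customTracing := func(ctx context.Context, msg *sarama.ConsumerMessage) (opentracing.Span, context.Context) {
	return opentracing.StartSpanFromContext(ctx, "consume-"+msg.Topic)
}
listener, _ := kafka.NewListener("my-consumer-group", handlers, kafka.WithTracing(customTracing))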