Kafka C[r]onsumer
Description
Kafka Cronsumer is mainly used for retry/exception strategy management.
It works based on cron expression and consumes messages in a timely manner
with the power of auto pause and concurrency.
For details check our blog post
If you need a whole consumer lifecycle with exception management, check Kafka Konsumer
How Kafka Cronsumer Works
When to use it?
- Iteration-based back-off strategies are applicable
- Messages could be processed in an eventually consistent state
- Max retry exceeded messages could be ignored and send to dead letter topic
- To increase consumer resiliency
- To increase consumer performance with concurrency
When to avoid?
- Messages should be processed in order
- Messages should be certainly processed (we discard messages if max retry is exceeded)
- Messages should be committed (we use auto-commit interval for increasing performance)
- Messages with TTL (Time to Live)
Guide
Installation
go get github.com/Trendyol/kafka-cronsumer@latest
Examples
You can find a number of ready-to-run examples at this directory.
After running docker-compose up
command, you can run any application you want.
Don't forget its cron based :)
Single Consumer
func main() {
// ...
var consumeFn kafka.ConsumeFn = func (message kafka.Message) error {
fmt.Printf("consumer > Message received: %s\n", string(message.Value))
return nil
}
c := cronsumer.New(kafkaConfig, consumeFn)
c.Run()
}
Single Consumer With Dead Letter
func main() {
// ...
var consumeFn kafka.ConsumeFn = func (message kafka.Message) error {
fmt.Printf("consumer > Message received: %s\n", string(message.Value))
return errors.New("error occurred")
}
c := cronsumer.New(kafkaConfig, consumeFn)
c.Run()
}
Multiple Consumers
func main() {
// ...
var firstConsumerFn kafka.ConsumeFn = func (message kafka.Message) error {
fmt.Printf("First consumer > Message received: %s\n", string(message.Value))
return nil
}
first := cronsumer.New(firstCfg, firstConsumerFn)
first.Start()
var secondConsumerFn kafka.ConsumeFn = func (message kafka.Message) error {
fmt.Printf("Second consumer > Message received: %s\n", string(message.Value))
return nil
}
second := cronsumer.New(secondCfg, secondConsumerFn)
second.Start()
// ...
}
Single Consumer With Metric collector
func main() {
// ...
var consumeFn kafka.ConsumeFn = func(message kafka.Message) error {
return errors.New("err occurred")
}
c := cronsumer.New(config, consumeFn)
StartAPI(*config, c.GetMetricCollectors()...)
c.Start()
// ...
}
func StartAPI(cfg kafka.Config, metricCollectors ...prometheus.Collector) {
// ...
f := fiber.New(
fiber.Config{},
)
metricMiddleware, err := NewMetricMiddleware(cfg, f, metricCollectors...)
f.Use(metricMiddleware)
// ...
}
Configurations
config |
description |
default |
example |
logLevel |
Describes log level, valid options are debug , info , warn , and error |
info |
|
metricPrefix |
MetricPrefix is used for prometheus fq name prefix. If not provided, default metric prefix value is kafka_cronsumer . Currently, there are two exposed prometheus metrics. retried_messages_total_current and discarded_messages_total_current . So, if default metric prefix used, metrics names are kafka_cronsumer_retried_messages_total_current and kafka_cronsumer_discarded_messages_total_current |
kafka_cronsumer |
|
consumer.clientId |
see doc |
|
|
consumer.cron |
Cron expression when exception consumer starts to work at |
|
*/1 * * * * |
consumer.backOffStrategy |
Define consumer backoff strategy for retry topics |
fixed |
exponential, linear |
consumer.duration |
Work duration exception consumer actively consuming messages |
NonStopWork (zero duration) |
20s, 15m, 1h, NonStopWork (zero duration) |
consumer.brokers |
broker address |
|
|
consumer.topic |
Exception topic names |
|
exception-topic |
consumer.groupId |
Exception consumer group id |
|
exception-consumer-group |
consumer.maxRetry |
Maximum retry value for attempting to retry a message |
3 |
|
consumer.concurrency |
Number of goroutines used at listeners |
1 |
|
consumer.verifyTopicOnStartup |
it checks existence of the given retry topic on the kafka cluster. |
false |
|
consumer.minBytes |
see doc |
1 |
|
consumer.maxBytes |
see doc |
1 MB |
|
consumer.maxWait |
see doc |
10s |
|
consumer.commitInterval |
see doc |
1s |
|
consumer.heartbeatInterval |
see doc |
3s |
|
consumer.sessionTimeout |
see doc |
30s |
|
consumer.rebalanceTimeout |
see doc |
30s |
|
consumer.startOffset |
see doc |
earliest |
|
consumer.retentionTime |
see doc |
24h |
|
consumer.queueCapacity |
see doc |
100 |
|
consumer.skipMessageByHeaderFn |
Function to filter messages based on headers, return true if you want to skip the message |
nil |
|
producer.clientId |
see doc |
|
|
producer.brokers |
Broker address if it is not given, uses consumer.Brokers addr |
consumer.Brokers addr |
|
producer.batchSize |
see doc |
100 |
|
producer.batchTimeout |
see doc |
1s |
|
producer.balancer |
see doc |
leastBytes |
|
sasl.enabled |
It enables sasl authentication mechanism |
false |
|
sasl.authType |
Currently we only support SCRAM |
"" |
|
sasl.username |
SCRAM username |
"" |
|
sasl.password |
SCRAM password |
"" |
|
sasl.rootCAPath |
see doc |
"" |
|
sasl.intermediateCAPath |
|
"" |
|
sasl.rack |
see doc |
"" |
|
Exposed Metrics
Metric Name |
Description |
Value Type |
kafka_cronsumer_retried_messages_total |
Total number of retried messages. |
Counter |
kafka_cronsumer_discarded_messages_total |
Total number of discarded messages. |
Counter |
Contribute
Use issues for everything
- For a small change, just send a PR.
- For bigger changes open an issue for discussion before sending a PR.
- PR should have:
- Test case
- Documentation
- Example (If it makes sense)
- You can also contribute by:
- Reporting issues
- Suggesting new features or enhancements
- Improve/fix documentation
Please adhere to this project's code of conduct
.
Maintainers
Code of Conduct
Contributor Code of Conduct. By participating in this project you agree to abide by its terms.
Libraries Used For This Project
Additional References