kafka

package module
v1.4.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 1, 2023 License: MIT Imports: 20 Imported by: 0

README

Kafka Konsumer

Description

Kafka Konsumer provides an easy implementation of Kafka consumer with a built-in retry/exception manager (kafka-cronsumer).

Guide

Installation
go get github.com/Trendyol/kafka-konsumer@latest
Examples

You can find a number of ready-to-run examples at this directory.

After running docker-compose up command, you can run any application you want.

Simple Consumer
func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        },
        ConsumeFn:    consumeFn,
        RetryEnabled: false,
    }

    consumer, _ := kafka.NewConsumer(consumerCfg)
    defer consumer.Stop()
    
    consumer.Consume()
}

func consumeFn(message kafka.Message) error {
    fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
    return nil
}
Simple Consumer With Retry/Exception Option
func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        },
        RetryEnabled: true,
        RetryConfiguration: kafka.RetryConfiguration{
            Topic:         "retry-topic",
            StartTimeCron: "*/1 * * * *",
            WorkDuration:  50 * time.Second,
            MaxRetry:      3,
        },
        ConsumeFn: consumeFn,
    }

    consumer, _ := kafka.NewConsumer(consumerCfg)
    defer consumer.Stop()
    
    consumer.Consume()
}

func consumeFn(message kafka.Message) error {
    fmt.Printf("Message From %s with value %s", message.Topic, string(message.Value))
    return nil
}
With Batch Option
func main() {
    consumerCfg := &kafka.ConsumerConfig{
        Reader: kafka.ReaderConfig{
            Brokers: []string{"localhost:29092"},
            Topic:   "standart-topic",
            GroupID: "standart-cg",
        },
        LogLevel:     kafka.LogLevelDebug,
        RetryEnabled: true,
        RetryConfiguration: kafka.RetryConfiguration{
            Brokers:       []string{"localhost:29092"},
            Topic:         "retry-topic",
            StartTimeCron: "*/1 * * * *",
            WorkDuration:  50 * time.Second,
            MaxRetry:      3,
        },
        BatchConfiguration: kafka.BatchConfiguration{
            MessageGroupLimit:    1000,
            MessageGroupDuration: time.Second,
            BatchConsumeFn:       batchConsumeFn,
        },
    }

    consumer, _ := kafka.NewConsumer(consumerCfg)
    defer consumer.Stop()

    consumer.Consume()
}

func batchConsumeFn(messages []kafka.Message) error {
    fmt.Printf("%d\n comes first %s", len(messages), messages[0].Value)
    return nil
}
With Grafana & Prometheus

In this example, we are demonstrating how to create Grafana dashboard and how to define alerts in Prometheus. You can see the example by going to the with-grafana folder in the examples folder and running the infrastructure with docker compose up and then the application.

grafana

With SASL-PLAINTEXT Authentication

Under the examples - with-sasl-plaintext folder, you can find an example of a consumer integration with SASL/PLAIN mechanism. To try the example, you can run the command docker compose up under the specified folder and then start the application.

Configurations

config description default
reader Describes all segmentio kafka reader configurations
consumeFn Kafka consumer function, if retry enabled it, is also used to consume retriable messages
logLevel Describes log level; valid options are debug, info, warn, and error info
concurrency Number of goroutines used at listeners 1
retryEnabled Retry/Exception consumer is working or not false
rack see doc
clientId see doc
dial.Timeout see doc no timeout
dial.KeepAlive see doc not enabled
transport.DialTimeout see doc 5s
transport.IdleTimeout see doc 30s
transport.MetadataTTL see doc 6s
transport.MetadataTopics see doc all topics in cluster
retryConfiguration.clientId see doc
retryConfiguration.startTimeCron Cron expression when retry consumer (kafka-cronsumer) starts to work at
retryConfiguration.workDuration Work duration exception consumer actively consuming messages
retryConfiguration.topic Retry/Exception topic names
retryConfiguration.brokers Retry topic brokers urls
retryConfiguration.maxRetry Maximum retry value for attempting to retry a message 3
retryConfiguration.tls.rootCAPath see doc ""
retryConfiguration.tls.intermediateCAPath Same with rootCA, if you want to specify two rootca you can use it with rootCAPath ""
retryConfiguration.sasl.authType SCRAM or PLAIN
retryConfiguration.sasl.username SCRAM OR PLAIN username
retryConfiguration.sasl.password SCRAM OR PLAIN password
batchConfiguration.messageGroupLimit Maximum number of messages in a batch
batchConfiguration.messageGroupDuration Maximum time to wait for a batch
tls.rootCAPath see doc ""
tls.intermediateCAPath Same with rootCA, if you want to specify two rootca you can use it with rootCAPath ""
sasl.authType SCRAM or PLAIN
sasl.username SCRAM OR PLAIN username
sasl.password SCRAM OR PLAIN password
logger If you want to custom logger info
apiEnabled Enabled metrics false
apiConfiguration.port Set API port 8090
apiConfiguration.healtCheckPath Set Health check path healthcheck
metricConfiguration.path Set metric endpoint path /metrics

Monitoring

Kafka Konsumer offers an API that handles exposing several metrics.

Exposed Metrics
Metric Name Description Value Type
kafka_konsumer_processed_messages_total Total number of processed messages. Counter
kafka_konsumer_processed_batch_messages_total Total number of processed batch messages. Counter
kafka_konsumer_unprocessed_messages_total Total number of unprocessed messages. Counter
kafka_konsumer_unprocessed_batch_messages_total Total number of unprocessed batch messages. Counter

Documentation

Index

Constants

View Source
const (
	MechanismScram = "scram"
	MechanismPlain = "plain"
)
View Source
const Name = "kafka_konsumer"

Variables

This section is empty.

Functions

func NewMetricMiddleware added in v1.3.3

func NewMetricMiddleware(cfg *ConsumerConfig,
	app *fiber.App,
	consumerMetric *ConsumerMetric,
	metricCollectors ...prometheus.Collector,
) (func(ctx *fiber.Ctx) error, error)

Types

type API

type API interface {
	Start()
	Stop()
}

func NewAPI

func NewAPI(cfg *ConsumerConfig, consumerMetric *ConsumerMetric, metricCollectors ...prometheus.Collector) API

type APIConfiguration

type APIConfiguration struct {
	// Port default is 8090
	Port *int

	// HealthCheckPath default is /healthcheck
	HealthCheckPath *string
}

type BatchConfiguration

type BatchConfiguration struct {
	BatchConsumeFn       BatchConsumeFn
	MessageGroupLimit    int
	MessageGroupDuration time.Duration
}

type BatchConsumeFn

type BatchConsumeFn func([]Message) error

type ConsumeFn

type ConsumeFn func(Message) error

type Consumer

type Consumer interface {
	Consume()
	WithLogger(logger LoggerInterface)
	Stop() error
}

func NewConsumer

func NewConsumer(cfg *ConsumerConfig) (Consumer, error)

type ConsumerConfig

type ConsumerConfig struct {
	APIConfiguration    APIConfiguration
	Logger              LoggerInterface
	MetricConfiguration MetricConfiguration
	SASL                *SASLConfig
	TLS                 *TLSConfig
	Dial                *DialConfig
	BatchConfiguration  *BatchConfiguration
	ConsumeFn           ConsumeFn
	ClientID            string
	Rack                string
	LogLevel            LogLevel
	Reader              ReaderConfig
	RetryConfiguration  RetryConfiguration
	Concurrency         int
	RetryEnabled        bool
	APIEnabled          bool
}

type ConsumerMetric added in v1.3.3

type ConsumerMetric struct {
	TotalUnprocessedMessagesCounter      int64
	TotalProcessedMessagesCounter        int64
	TotalUnprocessedBatchMessagesCounter int64
	TotalProcessedBatchMessagesCounter   int64
}

type DialConfig added in v1.4.6

type DialConfig struct {
	Timeout   time.Duration
	KeepAlive time.Duration
}

type Dialer

type Dialer struct {
	*kafka.Dialer
}

func (*Dialer) SetSASL

func (t *Dialer) SetSASL(mechanism sasl.Mechanism)

func (*Dialer) SetTLSConfig

func (t *Dialer) SetTLSConfig(config *tls.Config)

type Layer

type Layer interface {
	SetTLSConfig(config *tls.Config)
	SetSASL(mechanism sasl.Mechanism)
}

type LogLevel

type LogLevel string
const (
	LogLevelDebug LogLevel = "debug"
	LogLevelInfo  LogLevel = "info"
	LogLevelWarn  LogLevel = "warn"
	LogLevelError LogLevel = "error"
)

type LoggerInterface

type LoggerInterface interface {
	// With returns a logger based off the root logger and decorates it with the given context and arguments.
	With(args ...interface{}) LoggerInterface

	// Debug uses fmt.Sprint to construct and log a message at DEBUG level
	Debug(args ...interface{})
	// Info uses fmt.Sprint to construct and log a message at INFO level
	Info(args ...interface{})
	// Warn uses fmt.Sprint to construct and log a message at ERROR level
	Warn(args ...interface{})
	// Error uses fmt.Sprint to construct and log a message at ERROR level
	Error(args ...interface{})

	// Debugf uses fmt.Sprintf to construct and log a message at DEBUG level
	Debugf(format string, args ...interface{})
	// Infof uses fmt.Sprintf to construct and log a message at INFO level
	Infof(format string, args ...interface{})
	// Warnf uses fmt.Sprintf to construct and log a message at WARN level
	Warnf(format string, args ...interface{})
	// Errorf uses fmt.Sprintf to construct and log a message at ERROR level
	Errorf(format string, args ...interface{})

	Infow(msg string, keysAndValues ...interface{})
	Errorw(msg string, keysAndValues ...interface{})
	Warnw(msg string, keysAndValues ...interface{})
}

LoggerInterface is a logger that supports log levels, context and structured logging.

func NewZapLogger

func NewZapLogger(level LogLevel) LoggerInterface

type Mechanism

type Mechanism string

type Message

type Message kafka.Message

func (*Message) AddHeader

func (m *Message) AddHeader(header ...kafka.Header)

func (*Message) Header

func (m *Message) Header(key string) *kafka.Header

func (*Message) RemoveHeader

func (m *Message) RemoveHeader(header kafka.Header)

type MetricConfiguration

type MetricConfiguration struct {
	// Path default is /metrics
	Path *string
}

type Producer

type Producer interface {
	Produce(ctx context.Context, message Message) error
	ProduceBatch(ctx context.Context, messages []Message) error
	Close() error
}

func NewProducer

func NewProducer(cfg ProducerConfig) (Producer, error)

type ProducerConfig

type ProducerConfig struct {
	Transport *TransportConfig
	SASL      *SASLConfig
	TLS       *TLSConfig
	ClientID  string
	Writer    WriterConfig
}

type ReaderConfig

type ReaderConfig kafka.ReaderConfig

type RetryConfiguration

type RetryConfiguration struct {
	SASL          *SASLConfig
	TLS           *TLSConfig
	ClientID      string
	StartTimeCron string
	Topic         string
	Rack          string
	Brokers       []string
	MaxRetry      int
	WorkDuration  time.Duration
	LogLevel      LogLevel
}

type SASLConfig

type SASLConfig struct {
	Type     Mechanism
	Username string
	Password string
}

func (*SASLConfig) IsEmpty

func (s *SASLConfig) IsEmpty() bool

func (*SASLConfig) Mechanism

func (s *SASLConfig) Mechanism() (sasl.Mechanism, error)

type TLSConfig

type TLSConfig struct {
	RootCAPath         string
	IntermediateCAPath string
}

func (*TLSConfig) IsEmpty

func (c *TLSConfig) IsEmpty() bool

func (*TLSConfig) TLSConfig

func (c *TLSConfig) TLSConfig() (*tls.Config, error)

type Transport

type Transport struct {
	*kafka.Transport
}

func (*Transport) SetSASL

func (t *Transport) SetSASL(mechanism sasl.Mechanism)

func (*Transport) SetTLSConfig

func (t *Transport) SetTLSConfig(config *tls.Config)

type TransportConfig added in v1.4.6

type TransportConfig struct {
	DialTimeout    time.Duration
	IdleTimeout    time.Duration
	MetadataTTL    time.Duration
	MetadataTopics []string
}

type WriterConfig

type WriterConfig struct {
	ErrorLogger            kafka.Logger
	Logger                 kafka.Logger
	Balancer               kafka.Balancer
	Completion             func(messages []kafka.Message, err error)
	Topic                  string
	Brokers                []string
	ReadTimeout            time.Duration
	BatchTimeout           time.Duration
	BatchBytes             int64
	WriteTimeout           time.Duration
	RequiredAcks           kafka.RequiredAcks
	BatchSize              int
	WriteBackoffMax        time.Duration
	WriteBackoffMin        time.Duration
	MaxAttempts            int
	Async                  bool
	Compression            kafka.Compression
	AllowAutoTopicCreation bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL