Documentation ¶
Overview ¶
Package kafka wraps github.com/segmentio/kafka-go API and adds additional functionalities such as tracing and buffering.
Example ¶
ctx := GetCorrelationContext()
co, _ := kafka.NewPoller(kafka.WithConsumerTopic([]string{"test.topic1"}))
defer co.Close(ctx)
pr, _ := kafka.NewProducer(kafka.WithProducerTopic("test.topic1"))
defer pr.Close(ctx)

ch := make(chan *cKafka.Message, 100)
var wg sync.WaitGroup
totalCount := 100000
wg.Add(1)
uuidVal := "TestKafkaPoll" + uuid.NewString()

// Produce totalCount messages in the background, then flush the buffer.
go func() {
	defer wg.Done()
	for i := 0; i < totalCount; i++ {
		ctx := GetCorrelationContext()
		pr.ProduceMessage(ctx, strconv.Itoa(i), &utils.Message{
			Event: uuidVal,
		}, nil)
	}
	pr.Flush(ctx)
}()

tCtx, cancel := context.WithTimeout(ctx, time.Second*45)
defer cancel()
st := time.Now()

// Poll delivers consumed messages on ch until tCtx is done.
go co.Poll(tCtx, ch)

count := 0
msgCount := 0
for i := range ch {
	m, err := kafka.LoadMessage(i)
	msgCount++
	if err != nil {
		KafkaTestLogger.Error(ctx, "parse error", err)
		continue
	}
	if m.Event == uuidVal {
		count++
	}
	if count == totalCount {
		cancel()
	}
	KafkaTestLogger.Info(ctx, "Kafka message", m)
}
KafkaTestLogger.Info(ctx, "Total matched", count)
KafkaTestLogger.Info(ctx, "Total received", msgCount)
wg.Wait()
KafkaTestLogger.Notice(ctx, "Time taken in ms", time.Since(st).Milliseconds())
Output:
Example (ConfluentKafka) ¶
ctx := GetCorrelationContext()
cred := kafka.GetDefaultCredConfig()
cred.TLSConfig = &tls.Config{ // For Confluent Kafka
	MinVersion: tls.VersionTLS12,
}
co, _ := kafka.NewPoller(kafka.WithConsumerCredConfig(cred), kafka.WithConsumerTopic([]string{"test.topic1"}))
defer co.Close(ctx)
pr, _ := kafka.NewProducer(kafka.WithProducerCredConfig(cred))
defer pr.Close(ctx)
Output:
Index ¶
- Constants
- Variables
- func LoadMessage(src *kafka.Message) (*utils.Message, error)
- func ValidateConsumerConfig(config *ConsumerConfig) error
- func ValidateProducerConfig(config *ProducerConfig) error
- type ConsumerConfig
- type ConsumerOption
- func WithAutoCommit(autoCommit bool) ConsumerOption
- func WithAutoCommitInterval(intervalInMs uint64) ConsumerOption
- func WithConsumerBuffer(maxBuffer uint) ConsumerOption
- func WithConsumerCredConfig(creds *CredConfig) ConsumerOption
- func WithConsumerLogger(logger log.Log) ConsumerOption
- func WithConsumerModuleName(name string) ConsumerOption
- func WithConsumerServiceName(name string) ConsumerOption
- func WithConsumerTopic(topics []string) ConsumerOption
- func WithConsumerTracer(tracer ConsumerTracer) ConsumerOption
- func WithGroupID(groupID string) ConsumerOption
- func WithReader(reader *kafka.Reader) ConsumerOption
- type ConsumerTracer
- type CredConfig
- type Message
- type Poller
- type ProduceTracer
- type Producer
- func (k *Producer) Close(ctx context.Context) error
- func (k *Producer) Name(ctx context.Context) string
- func (k *Producer) Produce(ctx context.Context, topic, key string, message []byte, ...) (err error)
- func (k *Producer) ProduceMessage(ctx context.Context, key string, message *utils.Message, ...) (err error)
- func (k *Producer) ProduceMessageWithTopic(ctx context.Context, topic, key string, message *utils.Message, ...) (err error)
- func (k *Producer) Shutdown(ctx context.Context) error
- type ProducerConfig
- type ProducerOption
- func WithAcknowledge(ack int) ProducerOption
- func WithAsync(async bool) ProducerOption
- func WithAutoFlushInterval(interval uint64) ProducerOption
- func WithBatch(batch bool) ProducerOption
- func WithPoducerLogger(logger log.Log) ProducerOption
- func WithProducerBuffer(buffer int) ProducerOption
- func WithProducerCredConfig(credConfig *CredConfig) ProducerOption
- func WithProducerModuleName(name string) ProducerOption
- func WithProducerTopic(topic string) ProducerOption
- func WithProducerTracer(tracer ProduceTracer) ProducerOption
- func WithWriter(writer *kafka.Writer) ProducerOption
- type Reader
- type Writer
Examples ¶
Constants ¶
const (
	ModuleProducer = "KafkaProducer"
	ModuleConsumer = "KafkaConsumer"
)
Variables ¶
var ErrReaderBufferFull = fmt.Errorf("Reader.StoreMessage: Buffer full")
ErrReaderBufferFull is returned when the buffer is full and a message cannot be stored.
var ErrWriterBufferFull = fmt.Errorf("Reader.Send: Buffer full")
ErrWriterBufferFull is an error indicating that the writer buffer is full.
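A rough sketch of how these sentinels might be handled, assuming pr and ctx are set up as in the overview example, and assuming ProduceMessage surfaces ErrWriterBufferFull when the internal buffer is full:

err := pr.ProduceMessage(ctx, "key-1", &utils.Message{Event: "demo"}, nil)
if errors.Is(err, kafka.ErrWriterBufferFull) {
	// Assumed recovery: flush the buffered messages before producing more.
	pr.Flush(ctx)
}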
Functions ¶
func LoadMessage ¶
func LoadMessage(src *kafka.Message) (*utils.Message, error)

LoadMessage parses the given Kafka message into a utils.Message.
func ValidateConsumerConfig ¶
func ValidateConsumerConfig(config *ConsumerConfig) error
func ValidateProducerConfig ¶
func ValidateProducerConfig(config *ProducerConfig) error
Types ¶
type ConsumerConfig ¶
type ConsumerConfig struct {
	*CredConfig                       // Embeds CredConfig for credential and connection details.
	GroupID            string         // Consumer group ID.
	AutoCommit         bool           // Flag to enable auto commit for consumed messages.
	MaxBuffer          uint           // Count of messages for batch commit.
	AutoCommitInterval uint64         // Interval in milliseconds to auto commit messages.
	Log                log.Log        // Logger instance.
	Trace              ConsumerTracer // Tracer for consuming messages.
	Reader             *kafka.Reader  // Reader for consuming messages.
	Topics             []string       // Topics to consume.
	ModuleName         string         // Name of the module for logging.
	ClientId           string         // Name of the service, used as the client ID.
}
ConsumerConfig represents the configuration for a Kafka consumer.
func GetDefaultConsumerConfig ¶
func GetDefaultConsumerConfig() *ConsumerConfig
GetDefaultConsumerConfig creates a new ConsumerConfig with values from environment variables or default values.
Environment Variables

- SERVICE_NAME: Sets [ClientId]
- KAFKA__CONSUMER__GROUP_ID: Sets [GroupID]
- KAFKA__CONSUMER__TOPICS: Sets [Topics]
- KAFKA__CONSUMER__AUTO_COMMIT: Sets [AutoCommit]
- KAFKA__CONSUMER__MAX_BUFFER: Sets [MaxBuffer]
- KAFKA__CONSUMER__AUTO_COMMIT_INTERVAL: Sets [AutoCommitInterval]
type ConsumerOption ¶
type ConsumerOption func(*ConsumerConfig)
ConsumerOption defines a function type that modifies the ConsumerConfig.
func WithAutoCommit ¶
func WithAutoCommit(autoCommit bool) ConsumerOption
WithAutoCommit sets the auto-commit option for the Kafka consumer.
func WithAutoCommitInterval ¶
func WithAutoCommitInterval(intervalInMs uint64) ConsumerOption
WithAutoCommitInterval sets the auto-commit interval for the Kafka consumer.
func WithConsumerBuffer ¶
func WithConsumerBuffer(maxBuffer uint) ConsumerOption
WithConsumerBuffer sets the maximum buffer size for the Kafka consumer.
func WithConsumerCredConfig ¶
func WithConsumerCredConfig(creds *CredConfig) ConsumerOption
WithConsumerCredConfig sets the Kafka credentials configuration.
func WithConsumerLogger ¶
func WithConsumerLogger(logger log.Log) ConsumerOption
WithConsumerLogger sets the logger for the Kafka consumer.
func WithConsumerModuleName ¶
func WithConsumerModuleName(name string) ConsumerOption
WithConsumerModuleName sets the module name for kafka consumer.
func WithConsumerServiceName ¶
func WithConsumerServiceName(name string) ConsumerOption
WithConsumerServiceName sets the service name for kafka consumer.
func WithConsumerTopic ¶
func WithConsumerTopic(topics []string) ConsumerOption
WithConsumerTopic sets the topics for the Kafka consumer.
func WithConsumerTracer ¶
func WithConsumerTracer(tracer ConsumerTracer) ConsumerOption
WithConsumerTracer sets the tracer for the Kafka consumer.
func WithGroupID ¶
func WithGroupID(groupID string) ConsumerOption
WithGroupID sets the group ID for the Kafka consumer.
func WithReader ¶
func WithReader(reader *kafka.Reader) ConsumerOption
WithReader sets the Kafka reader for the consumer.
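A minimal sketch combining several consumer options, assuming broker details come from the default CredConfig (environment variables) and ctx comes from GetCorrelationContext as in the overview example; the topic and group ID below are placeholders:

co, err := kafka.NewPoller(
	kafka.WithConsumerTopic([]string{"orders.events"}), // placeholder topic
	kafka.WithGroupID("orders-consumer"),               // placeholder group ID
	kafka.WithAutoCommit(true),
	kafka.WithAutoCommitInterval(5000), // auto commit every 5 seconds
	kafka.WithConsumerBuffer(500),      // batch commit after 500 messages
)
if err != nil {
	// handle invalid configuration
}
defer co.Close(ctx)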
type ConsumerTracer ¶
ConsumerTracer defines an interface for tracing consumer operations.
type CredConfig ¶
type CredConfig struct {
	Brokers       []string       // List of Kafka broker addresses.
	SASLMechanism sasl.Mechanism // SASL mechanism for authentication.
	TLSConfig     *tls.Config    // TLS configuration for secure connections.
}
CredConfig holds the configuration for Kafka credentials and connection details.
Environment Variables

- KAFAK__BROKER: Sets [Brokers]
- KAFAK__SASL__TYPE: Sets [SASLType]
func GetDefaultCredConfig ¶
func GetDefaultCredConfig() *CredConfig
GetDefaultCredConfig returns a default CredConfig with values from environment variables or default values.
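A sketch of supplying explicit credentials, assuming SASL/PLAIN over TLS and ctx as in the overview example; plain.Mechanism is from github.com/segmentio/kafka-go/sasl/plain, and the broker address, username, and password are placeholders:

cred := kafka.GetDefaultCredConfig()
cred.Brokers = []string{"broker-1:9092"} // placeholder broker
cred.SASLMechanism = plain.Mechanism{
	Username: "user",     // placeholder credentials
	Password: "password",
}
cred.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}

pr, _ := kafka.NewProducer(
	kafka.WithProducerCredConfig(cred),
	kafka.WithProducerTopic("test.topic1"),
)
defer pr.Close(ctx)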
type Message ¶
type Message struct {
	*kafka.Message
	// contains filtered or unexported fields
}
Message wraps a kafka.Message and provides additional functionalities.
func (*Message) GetBody ¶
GetBody returns the message value as a string. It caches the result for subsequent calls.
func (*Message) GetHeaders ¶
GetHeaders returns the headers of the Kafka message as a map. It lazily initializes and caches the headers map on first access.
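A short sketch of both accessors, assuming msg is a *Message value obtained from this package (hypothetical variable):

headers := msg.GetHeaders() // header map, built lazily and cached
body := msg.GetBody()       // message value as a string, cached on first call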
type Poller ¶
type Poller struct {
	*Reader
	// contains filtered or unexported fields
}
Poller is a high-level API that extends Reader with time- and count-based auto commit and implements a shutdown hook.

If Reader is not set in ConsumerConfig, a new kafka.Reader is created from the options passed to the function, with the following additional parameters:
readerConfig := kafka.ReaderConfig{
	Brokers:           config.Brokers,
	GroupID:           config.GroupID,
	GroupTopics:       config.Topics,
	HeartbeatInterval: time.Second,
	QueueCapacity:     int(config.MaxBuffer),
	MaxBytes:          10e6, // 10MB
	Dialer: &kafka.Dialer{
		Timeout:       10 * time.Second,
		DualStack:     true,
		SASLMechanism: config.SASLMechanism,
		TLS:           config.TLSConfig,
	},
	Logger: &kafkaLogger{
		Log:     logger.NewResourceLogger(config.ModuleName + ":InfoLog"),
		ctx:     ctx,
		isError: false,
	},
	ErrorLogger: &kafkaLogger{
		Log:     logger.NewResourceLogger(config.ModuleName + ":ErrorLog"),
		ctx:     ctx,
		isError: true,
	},
}
config.Reader = kafka.NewReader(readerConfig)
func NewPoller ¶
func NewPoller(options ...ConsumerOption) (*Poller, error)
NewPoller creates a new Poller with the provided consumer options.
type ProduceTracer ¶
ProduceTracer is an interface for injecting tracing information into Kafka messages.
type Producer ¶
type Producer struct {
	*Writer
	// contains filtered or unexported fields
}
Producer is a high-level API that extends Writer with time- and count-based auto flush and implements a shutdown hook.
func NewProducer ¶
func NewProducer(options ...ProducerOption) (*Producer, error)
NewProducer creates a new Producer instance with the provided configuration options.
If Writer is not set in ProducerConfig, a new kafka.Writer is created from the options passed to the function, with the following additional parameters:
kafka.Writer{
	Addr:     kafka.TCP(config.Brokers...),
	Topic:    config.Topic,
	Balancer: &kafka.Hash{},
	Transport: &kafka.Transport{
		SASL: config.SASLMechanism,
		TLS:  config.TLSConfig,
	},
	Completion:   kLog.DeliveryReport,
	RequiredAcks: kafka.RequiredAcks(config.RequiredAcks),
	Async:        config.Async,
	Logger: &kafkaLogger{
		Log:     logger.NewResourceLogger(config.ModuleName + ":InfoLog"),
		ctx:     ctx,
		isError: false,
	},
	ErrorLogger: &kafkaLogger{
		isError: true,
		Log:     logger.NewResourceLogger(config.ModuleName + ":ErrorLog"),
		ctx:     ctx,
	},
}
func (*Producer) Produce ¶
func (k *Producer) Produce(ctx context.Context, topic, key string, message []byte, headers map[string]string) (err error)
Produce writes a raw message to a specific topic with the given key and headers. It appends the correlation and user-identity headers.
func (*Producer) ProduceMessage ¶
func (k *Producer) ProduceMessage(ctx context.Context, key string, message *utils.Message, headers map[string]string) (err error)
ProduceMessage writes a message (utils.Message) to the producer's configured topic with the given key and headers. It appends the correlation and user-identity headers.
func (*Producer) ProduceMessageWithTopic ¶
func (k *Producer) ProduceMessageWithTopic(ctx context.Context, topic, key string, message *utils.Message, headers map[string]string) (err error)
ProduceMessageWithTopic writes a message (utils.Message) to a specific topic with the given key and headers. It appends the correlation and user-identity headers.
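A sketch contrasting the three produce calls, assuming pr and ctx are set up as in the overview example; the topics, keys, payloads, and header values are placeholders:

// Raw bytes to an explicit topic, with a custom header.
pr.Produce(ctx, "audit.log", "user-42", []byte(`{"action":"login"}`),
	map[string]string{"source": "auth-service"})

// utils.Message to the producer's configured topic.
pr.ProduceMessage(ctx, "user-42", &utils.Message{Event: "UserLoggedIn"}, nil)

// utils.Message to an explicit topic.
pr.ProduceMessageWithTopic(ctx, "audit.log", "user-42", &utils.Message{Event: "UserLoggedIn"}, nil)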
type ProducerConfig ¶
type ProducerConfig struct {
	*CredConfig                     // Embeds CredConfig for credential and connection details.
	RequiredAcks      int           // Number of acknowledgments required from Kafka.
	MaxBuffer         int           // Maximum buffer size for the producer.
	AutoFlushInterval uint64        // Interval in milliseconds to auto flush messages.
	Async             bool          // Flag to indicate if the producer should work asynchronously.
	Batch             bool          // Flag to indicate if messages should be batched.
	Topic             string        // Kafka topic to produce messages to.
	ModuleName        string        // Name of the module for logging.
	Log               log.Log       // Logger instance.
	Trace             ProduceTracer // Tracer for producing messages.
	Writer            *kafka.Writer // Writer for producing messages.
}
ProducerConfig holds the configuration for a Kafka producer.
func GetDefaultProducerConfig ¶
func GetDefaultProducerConfig() *ProducerConfig
GetDefaultProducerConfig creates a new ProducerConfig with values from environment variables or default values.
Environment Variables

- KAFKA__PRODUCER__ACKNOWLEDGE: Sets [RequiredAcks]
- KAFKA__PRODUCER__MAX_BUFFER: Sets [MaxBuffer]
- KAFKA__PRODUCER__AUTO_FLUSH_INTERVAL: Sets [AutoFlushInterval]
- KAFKA__PRODUCER__ASYNC: Sets [Async]
- KAFKA__PRODUCER__BATCH: Sets [Batch]
type ProducerOption ¶
type ProducerOption func(*ProducerConfig)
ProducerOption defines a function signature for applying options for kafka producer.
func WithAcknowledge ¶
func WithAcknowledge(ack int) ProducerOption
WithAcknowledge sets the acknowledge value for kafka producer.
func WithAsync ¶
func WithAsync(async bool) ProducerOption
WithAsync sets the async flag for kafka producer.
func WithAutoFlushInterval ¶
func WithAutoFlushInterval(interval uint64) ProducerOption
WithAutoFlushInterval sets the auto flush interval in milliseconds for kafka producer.
func WithBatch ¶
func WithBatch(batch bool) ProducerOption
WithBatch sets the batch flag for kafka producer.
func WithPoducerLogger ¶
func WithPoducerLogger(logger log.Log) ProducerOption
WithPoducerLogger sets the logger for kafka producer.
func WithProducerBuffer ¶
func WithProducerBuffer(buffer int) ProducerOption
WithProducerBuffer sets the batch max buffer value for kafka producer.
func WithProducerCredConfig ¶
func WithProducerCredConfig(credConfig *CredConfig) ProducerOption
WithProducerCredConfig sets the Kafka credentials config for kafka producer.
func WithProducerModuleName ¶
func WithProducerModuleName(name string) ProducerOption
WithProducerModuleName sets the module name for kafka producer.
func WithProducerTopic ¶
func WithProducerTopic(topic string) ProducerOption
WithProducerTopic sets the topic for kafka producer.
func WithProducerTracer ¶
func WithProducerTracer(tracer ProduceTracer) ProducerOption
WithProducerTracer sets the tracer for kafka producer.
func WithWriter ¶
func WithWriter(writer *kafka.Writer) ProducerOption
WithWriter sets the kafka.Writer for kafka producer.
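A minimal sketch combining common producer options, assuming brokers come from the default CredConfig and ctx is set up as in the overview example; the topic and numeric values are placeholders:

pr, err := kafka.NewProducer(
	kafka.WithProducerTopic("orders.events"), // placeholder topic
	kafka.WithBatch(true),                    // buffer messages and write them in batches
	kafka.WithProducerBuffer(500),            // batch max buffer of 500 messages
	kafka.WithAutoFlushInterval(2000),        // auto flush every 2 seconds
	kafka.WithAcknowledge(-1),                // -1 requires acks from all in-sync replicas
)
if err != nil {
	// handle invalid configuration
}
defer pr.Close(ctx)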
type Reader ¶
type Reader struct {
	*kafka.Reader
	// contains filtered or unexported fields
}
Reader extends kafka.Reader with batch commit, tracing, and a StatusCheck hook.
func NewReader ¶
func NewReader(ctx context.Context, log log.Log, r *kafka.Reader, bufferSize uint, tr ConsumerTracer) *Reader
NewReader creates a new Reader with the provided context, logger, Kafka reader, buffer size, and tracer.
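A sketch of wiring the lower-level Reader directly, assuming segKafka is an alias for github.com/segmentio/kafka-go, appLogger is an existing log.Log, and a nil tracer is acceptable; the broker, group ID, and topic values are placeholders:

r := segKafka.NewReader(segKafka.ReaderConfig{
	Brokers: []string{"broker-1:9092"}, // placeholder broker
	GroupID: "orders-consumer",         // placeholder group ID
	Topic:   "orders.events",           // placeholder topic
})
reader := kafka.NewReader(ctx, appLogger, r, 100, nil) // buffer of 100 messages, no tracer
defer reader.Close()                                   // Close is promoted from the embedded kafka.Reader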
func (*Reader) StatusCheck ¶
StatusCheck returns the current stats of the Kafka reader.
type Writer ¶
type Writer struct {
	*kafka.Writer
	// contains filtered or unexported fields
}
Writer extends kafka.Writer with batch writing, tracing, and StatusCheck hook.
func NewWriter ¶
func NewWriter(ctx context.Context, w *kafka.Writer, bufferLen int, log log.Log, tr ProduceTracer) *Writer
NewWriter creates a new Writer.
func (*Writer) StatusCheck ¶
StatusCheck returns the current status of the Writer.