kafka

package
v6.0.4
Published: Jun 10, 2024 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package kafka wraps the github.com/segmentio/kafka-go API and adds functionality such as tracing and buffering.

Example
ctx := GetCorrelationContext()
co, _ := kafka.NewPoller(kafka.WithConsumerTopic([]string{"test.topic1"}))
defer co.Close(ctx)
pr, _ := kafka.NewProducer(kafka.WithProducerTopic("test.topic1"))
defer pr.Close(ctx)
ch := make(chan *cKafka.Message, 100)
var wg sync.WaitGroup
totalCount := 100000
wg.Add(1)
uuidVal := "TestKafkaPoll" + uuid.NewString()
go func() {
	defer wg.Done()
	for i := 0; i < totalCount; i++ {
		ctx := GetCorrelationContext()
		pr.ProduceMessage(ctx, strconv.Itoa(i), &utils.Message{
			Event: uuidVal,
		}, nil)
	}
	pr.Flush(ctx)
}()
tCtx, cancel := context.WithTimeout(ctx, time.Second*45)
defer cancel()
st := time.Now()
go co.Poll(tCtx, ch)
count := 0
msgCount := 0
for i := range ch {
	msgCount++
	m, err := kafka.LoadMessage(i)
	if err != nil {
		KafkaTestLogger.Error(ctx, "parse error", err)
		continue
	}
	if m.Event == uuidVal {
		count++
	}
	if totalCount == count {
		cancel()
	}
	KafkaTestLogger.Info(ctx, "Kafka message", m)
}
KafkaTestLogger.Info(ctx, "Total matched", count)
KafkaTestLogger.Info(ctx, "Total received", msgCount)
wg.Wait()
KafkaTestLogger.Notice(ctx, "Time taken in ms", time.Since(st).Milliseconds())
Output:

Example (ConfluentKafka)
ctx := GetCorrelationContext()
cred := kafka.GetDefaultCredConfig()
cred.TLSConfig = &tls.Config{ // For Confluent Kafka
	MinVersion: tls.VersionTLS12,
}
co, _ := kafka.NewPoller(kafka.WithConsumerCredConfig(cred), kafka.WithConsumerTopic([]string{"test.topic1"}))
defer co.Close(ctx)
pr, _ := kafka.NewProducer(kafka.WithProducerCredConfig(cred))
defer pr.Close(ctx)
Output:

Constants

View Source
const (
	ModuleProducer = "KafkaProducer"
	ModuleConsumer = "KafkaConsumer"
)

Variables

View Source
var ErrReaderBufferFull = fmt.Errorf("Reader.StoreMessage: Buffer full")

ErrReaderBufferFull is returned when the buffer is full and a message cannot be stored.

View Source
var ErrWriterBufferFull = fmt.Errorf("Reader.Send: Buffer full")

ErrWriterBufferFull is an error indicating that the writer buffer is full.

Functions

func LoadMessage

func LoadMessage(src *kafka.Message) (*utils.Message, error)

LoadMessage parses the given Kafka message into a utils.Message.

func ValidateConsumerConfig

func ValidateConsumerConfig(config *ConsumerConfig) error

func ValidateProducerConfig

func ValidateProducerConfig(config *ProducerConfig) error
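
A minimal sketch of using the validators to fail fast before constructing clients, assuming each returns a non-nil error when a required field (for example the broker list or topics) is missing:

cCfg := kafka.GetDefaultConsumerConfig()
if err := kafka.ValidateConsumerConfig(cCfg); err != nil {
	panic(err) // e.g. missing brokers or topics
}

pCfg := kafka.GetDefaultProducerConfig()
if err := kafka.ValidateProducerConfig(pCfg); err != nil {
	panic(err) // e.g. missing brokers or topic
}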

Types

type ConsumerConfig

type ConsumerConfig struct {
	*CredConfig                       // Embeds CredConfig for credential and connection details.
	GroupID            string         // Consumer group ID.
	AutoCommit         bool           // Flag to enable auto commit for consumed messages.
	MaxBuffer          uint           // Number of messages to buffer before a batch commit.
	AutoCommitInterval uint64         // Interval in milliseconds to auto commit messages.
	Log                log.Log        // Logger instance.
	Trace              ConsumerTracer // Tracer for consuming messages.
	Reader             *kafka.Reader  // Reader for consuming messages.
	Topics             []string       // Topics to consume.
	ModuleName         string         // Name of the module for logging.
	ClientId           string         // Name of the service, used as the Kafka client ID.
}

ConsumerConfig represents the configuration for a Kafka consumer.

func GetDefaultConsumerConfig

func GetDefaultConsumerConfig() *ConsumerConfig

GetDefaultConsumerConfig creates a new ConsumerConfig with values from environment variables or default values.

Environment Variables
- SERVICE_NAME: Sets [ClientId]
- KAFKA__CONSUMER__GROUP_ID: Sets [GroupID]
- KAFKA__CONSUMER__TOPICS: Sets [Topics]
- KAFKA__CONSUMER__AUTO_COMMIT: Sets [AutoCommit]
- KAFKA__CONSUMER__MAX_BUFFER: Sets [MaxBuffer]
- KAFKA__CONSUMER__AUTO_COMMIT_INTERVAL: Sets [AutoCommitInterval]
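
Since the defaults are read from the environment, a bootstrap or test sketch might set the variables before calling GetDefaultConsumerConfig (variable names as listed above; the exact value formats, such as how booleans and topic lists are parsed, are assumptions):

os.Setenv("SERVICE_NAME", "checkout-service")
os.Setenv("KAFKA__CONSUMER__GROUP_ID", "checkout-consumers")
os.Setenv("KAFKA__CONSUMER__AUTO_COMMIT", "true") // assumed boolean string format
cfg := kafka.GetDefaultConsumerConfig()           // picks up the values above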

type ConsumerOption

type ConsumerOption func(*ConsumerConfig)

ConsumerOption defines a function type that modifies the ConsumerConfig.
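
Because a ConsumerOption is just a function over *ConsumerConfig, callers can define their own options in addition to the built-in ones below. A minimal sketch; WithClientId here is a hypothetical helper, not part of this package:

// WithClientId overrides ConsumerConfig.ClientId (illustrative only).
func WithClientId(id string) kafka.ConsumerOption {
	return func(c *kafka.ConsumerConfig) {
		c.ClientId = id
	}
}

Such an option can then be passed to NewPoller alongside built-in options like WithConsumerTopic.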

func WithAutoCommit

func WithAutoCommit(autoCommit bool) ConsumerOption

WithAutoCommit sets the auto-commit option for the Kafka consumer.

func WithAutoCommitInterval

func WithAutoCommitInterval(intervalInMs uint64) ConsumerOption

WithAutoCommitInterval sets the auto-commit interval for the Kafka consumer.

func WithConsumerBuffer

func WithConsumerBuffer(maxBuffer uint) ConsumerOption

WithConsumerBuffer sets the maximum buffer size for the Kafka consumer.

func WithConsumerCredConfig

func WithConsumerCredConfig(creds *CredConfig) ConsumerOption

WithConsumerCredConfig sets the Kafka credentials configuration.

func WithConsumerLogger

func WithConsumerLogger(logger log.Log) ConsumerOption

WithConsumerLogger sets the logger for the Kafka consumer.

func WithConsumerModuleName

func WithConsumerModuleName(name string) ConsumerOption

WithConsumerModuleName sets the module name for kafka consumer.

func WithConsumerServiceName

func WithConsumerServiceName(name string) ConsumerOption

WithConsumerServiceName sets the service name for kafka consumer.

func WithConsumerTopic

func WithConsumerTopic(topics []string) ConsumerOption

WithConsumerTopic sets the topics for the Kafka consumer.

func WithConsumerTracer

func WithConsumerTracer(tracer ConsumerTracer) ConsumerOption

WithConsumerTracer sets the tracer for the Kafka consumer.

func WithGroupID

func WithGroupID(groupID string) ConsumerOption

WithGroupID sets the group ID for the Kafka consumer.

func WithReader

func WithReader(reader *kafka.Reader) ConsumerOption

WithReader sets the Kafka reader for the consumer.

type ConsumerTracer

type ConsumerTracer interface {
	span.SpanOp
}

ConsumerTracer defines an interface for tracing consumer operations.

type CredConfig

type CredConfig struct {
	Brokers       []string       // List of Kafka broker addresses.
	SASLMechanism sasl.Mechanism // SASL mechanism for authentication.
	TLSConfig     *tls.Config    // TLS configuration for secure connections.
}

CredConfig holds the configuration for Kafka credentials and connection details.

Environment Variables
- KAFAK__BROKER: Sets [Brokers]
- KAFAK__SASL__TYPE: Sets [SASLMechanism]

func GetDefaultCredConfig

func GetDefaultCredConfig() *CredConfig

GetDefaultCredConfig returns a default CredConfig with values from environment variables or default values.
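
A sketch of layering SASL/PLAIN credentials and TLS on top of the defaults, using the plain mechanism from github.com/segmentio/kafka-go/sasl/plain (the username and password are placeholders):

cred := kafka.GetDefaultCredConfig()
cred.SASLMechanism = plain.Mechanism{Username: "user", Password: "secret"}
cred.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}

co, _ := kafka.NewPoller(kafka.WithConsumerCredConfig(cred), kafka.WithConsumerTopic([]string{"test.topic1"}))
pr, _ := kafka.NewProducer(kafka.WithProducerCredConfig(cred), kafka.WithProducerTopic("test.topic1"))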

type Message

type Message struct {
	*kafka.Message
	// contains filtered or unexported fields
}

Message wraps a kafka.Message and provides additional functionalities.

func (*Message) GetBody

func (m *Message) GetBody() string

GetBody returns the message value as a string. It caches the result for subsequent calls.

func (*Message) GetHeaders

func (m *Message) GetHeaders() map[string]string

GetHeaders returns the headers of the Kafka message as a map. It lazily initializes and caches the headers map on first access.

func (*Message) GetKey

func (m *Message) GetKey() string

GetKey returns the key of the Kafka message as a string.

func (*Message) GetMeta

func (m *Message) GetMeta() map[string]any

GetMeta returns a map containing metadata of the Kafka message, including key, headers, partition, offset, topic, and time.

func (*Message) LoadBody

func (m *Message) LoadBody(v any) error

LoadBody unmarshals the message value into the provided interface.
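
For example, a sketch of decoding a received message body into a caller-defined type, where m is a *Message and the body is assumed to be in whatever format the producer wrote (typically JSON for utils.Message payloads):

var evt struct {
	OrderID string `json:"order_id"`
	Amount  int64  `json:"amount"`
}
if err := m.LoadBody(&evt); err != nil {
	// handle decode error
}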

type Poller

type Poller struct {
	*Reader
	// contains filtered or unexported fields
}

Poller is a high-level API that extends Reader with time- and count-based auto commit and implements a shutdown hook.

If Reader is not set in the ConsumerConfig, NewPoller creates a new kafka.Reader from the options passed to it, adding the following additional parameters:

readerConfig := kafka.ReaderConfig{
	Brokers:           config.Brokers,
	GroupID:           config.GroupID,
	GroupTopics:       config.Topics,
	HeartbeatInterval: time.Second,
	QueueCapacity:     int(config.MaxBuffer),
	MaxBytes:          10e6, // 10MB,
	Dialer: &kafka.Dialer{
		Timeout:       10 * time.Second,
		DualStack:     true,
		SASLMechanism: config.SASLMechanism,
		TLS:           config.TLSConfig,
	},
	Logger: &kafkaLogger{
		Log:     logger.NewResourceLogger(config.ModuleName + ":InfoLog"),
		ctx:     ctx,
		isError: false,
	},
	ErrorLogger: &kafkaLogger{
		Log:     logger.NewResourceLogger(config.ModuleName + ":ErrorLog"),
		ctx:     ctx,
		isError: true,
	},
}
config.Reader = kafka.NewReader(readerConfig)

func NewPoller

func NewPoller(options ...ConsumerOption) (*Poller, error)

NewPoller creates a new Poller with the provided consumer options.

func (*Poller) Close

func (k *Poller) Close(ctx context.Context) error

Close closes the Poller and waits for any ongoing operations to complete.

func (*Poller) Poll

func (k *Poller) Poll(ctx context.Context, ch chan<- *kafka.Message) error

Poll fetches messages from the broker and passes them to the provided channel. This function is meant to be run as a goroutine.

type ProduceTracer

type ProduceTracer interface {
	KafkaInject(ctx context.Context, msg *kafka.Message)
	span.SpanOp
}

ProduceTracer is an interface for injecting tracing information into Kafka messages.

type Producer

type Producer struct {
	*Writer
	// contains filtered or unexported fields
}

Producer is a high-level API that extends Writer with time- and count-based auto flush and implements a Shutdown hook.

func NewProducer

func NewProducer(options ...ProducerOption) (*Producer, error)

NewProducer creates a new Producer instance with the provided configuration options.

If Writer is not set in the ProducerConfig, NewProducer creates a new kafka.Writer from the options passed to it, adding the following additional parameters:

kafka.Writer{
	Addr:     kafka.TCP(config.Brokers...),
	Topic:    config.Topic,
	Balancer: &kafka.Hash{},
	Transport: &kafka.Transport{
		SASL: config.SASLMechanism,
		TLS:  config.TLSConfig,
	},
	Completion:   kLog.DeliveryReport,
	RequiredAcks: kafka.RequiredAcks(config.RequiredAcks),
	Async:        config.Async,
	Logger: &kafkaLogger{
		Log:     logger.NewResourceLogger(config.ModuleName + ":InfoLog"),
		ctx:     ctx,
		isError: false,
	},
	ErrorLogger: &kafkaLogger{
		isError: true,
		Log:     logger.NewResourceLogger(config.ModuleName + ":ErrorLog"),
		ctx:     ctx,
	},
}

func (*Producer) Close

func (k *Producer) Close(ctx context.Context) error

Close gracefully closes the Producer, ensuring all messages are flushed.

func (*Producer) Name

func (k *Producer) Name(ctx context.Context) string

Name returns the module name of the Producer.

func (*Producer) Produce

func (k *Producer) Produce(ctx context.Context, topic, key string, message []byte, headers map[string]string) (err error)

Produce writes a message to a specific topic with the given key and headers. It appends correlation and user identity headers.

func (*Producer) ProduceMessage

func (k *Producer) ProduceMessage(ctx context.Context, key string, message *utils.Message, headers map[string]string) (err error)

ProduceMessage writes a message (utils.Message) to the configured topic with the given key and headers. It appends correlation and user identity headers.

func (*Producer) ProduceMessageWithTopic

func (k *Producer) ProduceMessageWithTopic(ctx context.Context, topic, key string, message *utils.Message, headers map[string]string) (err error)

ProduceMessageWithTopic writes a message (utils.Message) to a specific topic with the given key and headers. It appends correlation and user identity headers.

func (*Producer) Shutdown

func (k *Producer) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the Producer, ensuring all messages are flushed.

type ProducerConfig

type ProducerConfig struct {
	*CredConfig                     // Embeds CredConfig for credential and connection details.
	RequiredAcks      int           // Number of acknowledgments required from Kafka.
	MaxBuffer         int           // Maximum buffer size for the producer.
	AutoFlushInterval uint64        // Interval in milliseconds to auto flush messages.
	Async             bool          // Flag to indicate if the producer should work asynchronously.
	Batch             bool          // Flag to indicate if messages should be batched.
	Topic             string        // Kafka topic to produce messages to.
	ModuleName        string        // Name of the module for logging.
	Log               log.Log       // Logger instance.
	Trace             ProduceTracer // Tracer for producing messages.
	Writer            *kafka.Writer // Writer for producing messages.
}

ProducerConfig holds the configuration for a Kafka producer.

func GetDefaultProducerConfig

func GetDefaultProducerConfig() *ProducerConfig

GetDefaultProducerConfig creates a new ProducerConfig with values from environment variables or default values.

Environment Variables
- KAFKA__PRODUCER__ACKNOWLEDGE: Sets [RequiredAcks]
- KAFKA__PRODUCER__MAX_BUFFER: Sets [MaxBuffer]
- KAFKA__PRODUCER__AUTO_FLUSH_INTERVAL: Sets [AutoFlushInterval]
- KAFKA__PRODUCER__ASYNC: Sets [Async]
- KAFKA__PRODUCER__BATCH: Sets [Batch]

type ProducerOption

type ProducerOption func(*ProducerConfig)

ProducerOption defines a function type that modifies the ProducerConfig.

func WithAcknowledge

func WithAcknowledge(ack int) ProducerOption

WithAcknowledge sets the acknowledge value for kafka producer.

func WithAsync

func WithAsync(async bool) ProducerOption

WithAsync sets the async flag for kafka producer.

func WithAutoFlushInterval

func WithAutoFlushInterval(interval uint64) ProducerOption

WithAutoFlushInterval sets the auto flush interval in milliseconds for kafka producer.

func WithBatch

func WithBatch(batch bool) ProducerOption

WithBatch sets the batch flag for kafka producer.

func WithPoducerLogger

func WithPoducerLogger(logger log.Log) ProducerOption

WithPoducerLogger sets the logger for kafka producer.

func WithProducerBuffer

func WithProducerBuffer(buffer int) ProducerOption

WithProducerBuffer sets the batch max buffer value for kafka producer.

func WithProducerCredConfig

func WithProducerCredConfig(credConfig *CredConfig) ProducerOption

WithProducerCredConfig sets the Kafka credentials config for kafka producer.

func WithProducerModuleName

func WithProducerModuleName(name string) ProducerOption

WithProducerModuleName sets the module name for kafka producer.

func WithProducerTopic

func WithProducerTopic(topic string) ProducerOption

WithProducerTopic sets the topic for kafka producer.

func WithProducerTracer

func WithProducerTracer(tracer ProduceTracer) ProducerOption

WithProducerTracer sets the tracer for kafka producer.

func WithWriter

func WithWriter(writer *kafka.Writer) ProducerOption

WithWriter sets the kafka.Writer for kafka producer.

type Reader

type Reader struct {
	*kafka.Reader
	// contains filtered or unexported fields
}

Reader extends kafka.Reader with batch commit, tracing, and a StatusCheck hook.

func NewReader

func NewReader(ctx context.Context, log log.Log, r *kafka.Reader, bufferSize uint, tr ConsumerTracer) *Reader

NewReader creates a new Reader with the provided context, logger, Kafka reader, buffer size, and tracer.

func (*Reader) Close

func (k *Reader) Close(ctx context.Context) error

Close closes the Kafka reader.

func (*Reader) Commit

func (k *Reader) Commit(ctx context.Context) error

Commit commits the messages stored in the buffer.

func (*Reader) StatusCheck

func (k *Reader) StatusCheck(ctx context.Context) (any, error)

StatusCheck returns the current stats of the Kafka reader.

func (*Reader) StoreMessage

func (k *Reader) StoreMessage(ctx context.Context, msg *kafka.Message) error

StoreMessage stores a message in the buffer. Returns an error if the buffer is full.
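
Combined with Commit, this supports a manual batch-commit loop. A minimal sketch, assuming auto commit is disabled, msg is a *cKafka.Message previously fetched (for example via the embedded Reader's FetchMessage), and a successful Commit drains the buffer; Poller performs this bookkeeping automatically:

if err := reader.StoreMessage(ctx, msg); errors.Is(err, kafka.ErrReaderBufferFull) {
	if err := reader.Commit(ctx); err != nil {
		// handle commit failure
	}
	_ = reader.StoreMessage(ctx, msg) // retry once the buffer has been drained
}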

type Writer

type Writer struct {
	*kafka.Writer
	// contains filtered or unexported fields
}

Writer extends kafka.Writer with batch writing, tracing, and a StatusCheck hook.

func NewWriter

func NewWriter(ctx context.Context, w *kafka.Writer, bufferLen int, log log.Log, tr ProduceTracer) *Writer

NewWriter creates a new Writer.

func (*Writer) Close

func (w *Writer) Close(ctx context.Context) error

Close closes the Writer, ensuring all messages are flushed.

func (*Writer) Flush

func (w *Writer) Flush(ctx context.Context) error

Flush writes the message batch to the broker.

func (*Writer) StatusCheck

func (w *Writer) StatusCheck(ctx context.Context) (any, error)

StatusCheck returns the current status of the Writer.

func (*Writer) WriteMessage

func (w *Writer) WriteMessage(ctx context.Context, msg *kafka.Message) error

WriteMessage writes the message to the broker in async mode or batch mode.
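
When the internal batch fills up, WriteMessage is expected to surface ErrWriterBufferFull, which can be handled by flushing and retrying. A minimal sketch, where writer is a *Writer from NewWriter and msg is a *cKafka.Message as in the examples above (Producer performs this flush handling automatically):

if err := writer.WriteMessage(ctx, msg); errors.Is(err, kafka.ErrWriterBufferFull) {
	if err := writer.Flush(ctx); err != nil {
		// handle flush failure
	}
	_ = writer.WriteMessage(ctx, msg) // retry after the batch has been flushed
}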
