kefka

package module
v0.3.0
Published: Dec 16, 2022 License: MIT Imports: 10 Imported by: 0

README

Kefka

Kefka is a simple library that wraps the official Confluent Go Kafka library. My motivation for creating it came from working with many code bases that needed to consume messages from Kafka, produce messages to Kafka, or both. I found myself writing the same code over and over and decided to move it into an open source library. Hopefully other people find it useful.

Kefka is named after Kefka Palazzo, the main antagonist of the critically acclaimed video game Final Fantasy VI.

Usage

At the heart of Kefka are three types:

  1. Producer - Produces messages to Kafka asynchronously, although it has semantics to emulate producing synchronously. The Producer type also supports automatic marshalling of keys and values.

  2. Consumer - Consumes messages from Kafka when using a consumer group. As messages are read/consumed they are handed off to a MessageHandler.

  3. Reader - Similar to the Consumer type, but the Reader does not support consumer groups. Reader is meant for use cases where you want to read through specific topics and partitions from a specified offset without any need to coordinate between different consumers, i.e. it's okay for multiple readers to read the same messages.

Kefka also provides some convenient helper functions for handling common tasks for Kafka.

For examples please refer to the examples directory in this repo.

Documentation

Constants

const (
	// LastOffset specifies to use the most recent offset available for a partition
	LastOffset kafka.Offset = -1
	// FirstOffset specifies to use the least recent offset available for a partition
	FirstOffset kafka.Offset = -2
)
const (
	// DefaultPollTimeout is the default timeout when polling events from Kafka.
	DefaultPollTimeout = time.Second * 10
)

Variables

This section is empty.

Functions

func FlushAll

func FlushAll(p kafkaProducer)

FlushAll will continuously call Flush on the Kafka Producer until there are zero messages awaiting delivery.

This function is blocking and should only be called when you need to force the internal queue to drain, for example when an application is exiting.
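
The loop described above can be sketched as follows. This is a minimal illustration, not the library's implementation: the `flusher` interface, the `fakeProducer` type, and the 100 ms per-call timeout are assumptions made so the example runs without a broker. The `Flush(timeoutMs int) int` shape follows the Confluent producer's Flush method, which returns the number of events still outstanding.

```go
package main

import "fmt"

// flusher is a hypothetical stand-in for the part of a Kafka producer that
// this sketch needs. Flush returns the number of events still outstanding.
type flusher interface {
	Flush(timeoutMs int) int
}

// fakeProducer simulates a producer whose queue drains three messages per call.
type fakeProducer struct {
	pending int
	calls   int
}

func (f *fakeProducer) Flush(timeoutMs int) int {
	f.calls++
	f.pending -= 3
	if f.pending < 0 {
		f.pending = 0
	}
	return f.pending
}

// flushAll keeps calling Flush until nothing is left awaiting delivery.
func flushAll(p flusher) {
	for p.Flush(100) > 0 {
		// Each call blocks for at most the timeout, then we try again.
	}
}

func main() {
	p := &fakeProducer{pending: 7}
	flushAll(p)
	fmt.Println(p.pending, p.calls) // prints "0 3"
}
```

Because the loop has no upper bound, it only returns once the queue is empty, which is why it suits shutdown paths rather than hot paths.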

func LagForTopicPartition

func LagForTopicPartition(client MetadataClient, topic string, partition int) (int64, error)

LagForTopicPartition fetches the current lag for a given consumer, topic and partition.

Lag is meant to be used when working in a consumer group. To fetch how many messages are in a given topic/partition use MessageCount instead.
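
To make the difference between lag and message count concrete, here is a small arithmetic sketch. It assumes the conventional definitions (lag is the high watermark minus the group's committed offset, message count is the high watermark minus the low watermark); the source does not show how Kefka computes either value, and the function names and the handling of a missing commit are choices made for this example only.

```go
package main

import "fmt"

// lag returns how many messages a consumer group still has to process on a
// partition: the high watermark minus the group's committed offset.
// A negative committed offset (nothing committed yet) is treated here as
// "start from the low watermark"; that is an assumption of this sketch.
func lag(low, high, committed int64) int64 {
	if committed < 0 {
		committed = low
	}
	if committed > high {
		return 0
	}
	return high - committed
}

// messageCount approximates the number of messages retained in a partition
// as the distance between its watermarks.
func messageCount(low, high int64) int64 {
	return high - low
}

func main() {
	fmt.Println(lag(100, 250, 220))     // prints 30
	fmt.Println(lag(100, 250, -1001))   // prints 150
	fmt.Println(messageCount(100, 250)) // prints 150
}
```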

func MessageCount

func MessageCount(client MetadataClient, topic string, partition int) (int64, error)

MessageCount returns the count of messages for a given topic/partition.

func ReadTopicPartitions

func ReadTopicPartitions(ctx context.Context, opts ReaderOptions) error

ReadTopicPartitions consumes messages from Kafka outside a consumer group. A new Confluent Kafka consumer is created from the configuration provided, but ReadTopicPartitions will automatically add or overwrite specific values to guarantee certain behaviors. The following Kafka configuration properties cannot be overridden:

enable.partition.eof -> true
enable.auto.commit -> false
group.id -> kefkareader

ReadTopicPartitions is blocking and will run forever unless the context is either cancelled or exceeds a deadline. In most use cases you'll want to call ReadTopicPartitions from a new goroutine.

ReadTopicPartitions is capable of consuming multiple topics/partitions, but the throughput will likely be higher if this function is used with one topic and one partition.

func SyncProduce

func SyncProduce(ctx context.Context, p kafkaProducer, topic string, key, value []byte) error

SyncProduce is a convenience function for producing messages synchronously. Technically the Confluent Kafka Producer doesn't support producing events synchronously. Instead, this function creates a delivery channel and waits on it for the delivery report/acknowledgement.

SyncProduce accepts a context so that this operation can be cancelled or time out. However, it is very important to note that this does not cancel producing the message. It only cancels waiting on the delivery report. The message may still be delivered.

func SyncProduceMessage

func SyncProduceMessage(ctx context.Context, p kafkaProducer, msg *kafka.Message) error

SyncProduceMessage is a convenience function for producing messages synchronously. Technically the Confluent Kafka Producer doesn't support producing events synchronously. Instead, this function creates a delivery channel and waits on it for the delivery report/acknowledgement.

SyncProduceMessage accepts a context so that this operation can be cancelled or time out. However, it is very important to note that this does not cancel producing the message. It only cancels waiting on the delivery report. The message may still be delivered.

Types

type CancelFunc

type CancelFunc func()

CancelFunc is a function used to cancel Consumer operations

func Consume

func Consume(consumer *kafka.Consumer, handler MessageHandler, errCb ErrorCallback) CancelFunc

Consume uses the provided Consumer and reads messages from Kafka in a separate goroutine. A CancelFunc is returned to cancel/stop consumption of messages from Kafka.

The consumer and handler parameters are mandatory, while the ErrorCallback is optional. Providing an ErrorCallback is highly recommended; otherwise errors are silently discarded.

Calling Close on the kafka.Consumer before cancelling will cause all hell to break loose. Call the CancelFunc first; only then can the kafka.Consumer be safely closed.
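
The shutdown ordering above can be illustrated with a small stand-alone sketch. Everything here except the `CancelFunc` type is hypothetical: the `poller` interface and `fakeConsumer` replace the Kafka consumer, and the choice to make the cancel function wait for the polling goroutine to exit is an assumption of this example, since the source does not say whether Kefka's CancelFunc blocks.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// CancelFunc mirrors the documented type: calling it stops consumption.
type CancelFunc func()

// poller is a hypothetical stand-in for a Kafka consumer's poll call.
type poller interface {
	Poll() (msg string, ok bool)
}

// fakeConsumer serves messages from a channel and times out when idle.
type fakeConsumer struct {
	msgs chan string
}

func (f *fakeConsumer) Poll() (string, bool) {
	select {
	case m := <-f.msgs:
		return m, true
	case <-time.After(5 * time.Millisecond):
		return "", false // poll timeout, nothing to read
	}
}

// consume polls in a separate goroutine and hands messages to handle.
// The returned CancelFunc signals the loop to stop and waits for it to
// exit, after which the underlying consumer can be closed safely.
func consume(p poller, handle func(string)) CancelFunc {
	stop := make(chan struct{})
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case <-stop:
				return
			default:
			}
			if msg, ok := p.Poll(); ok {
				handle(msg)
			}
		}
	}()
	var once sync.Once
	return func() {
		once.Do(func() { close(stop) })
		<-done
	}
}

func main() {
	c := &fakeConsumer{msgs: make(chan string, 2)}
	c.msgs <- "a"
	c.msgs <- "b"

	got := make(chan string, 2)
	cancel := consume(c, func(m string) { got <- m })

	fmt.Println(<-got, <-got) // prints "a b"
	cancel()                  // 1. stop the polling goroutine
	// 2. only now close the underlying consumer (omitted in this sketch)
}
```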

type Commit

type Commit func()

Commit is a function that commits the offsets to Kafka synchronously. Using synchronous writes can significantly impact throughput and should be used sparingly.

If using auto commit there is no need to invoke this function.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a type for consuming messages from Kafka.

Under the hood Consumer uses the Confluent Kafka consumer type. Consumer is in essence a wrapper around the Confluent Kafka Go library.

The zero value of Consumer is not usable. Instances of Consumer should be created using the NewConsumer function.

The Consumer type is only meant to be used with Kafka consumer groups. If you are not using consumer groups, use the Reader type instead. The `group.id` property must be set in the Kafka ConfigMap.

The Kafka configuration provided is very important to the behavior of the Consumer. The Consumer type does not support all the features of the underlying Confluent Kafka client. It does, however, support manual commits if that behavior is required. The MessageHandler accepts a Kafka message and a Commit func. If auto commits are disabled, the caller must invoke the Commit function whenever offsets should be committed back to Kafka. If auto commits are enabled, the Commit function should not be called at all.

func NewConsumer

func NewConsumer(opts ConsumerOptions) (*Consumer, error)

NewConsumer creates and initializes a new instance of the Consumer type.

If the API is misused (missing required fields, not setting group.id in the Kafka config, etc.) this function will panic. If the Consumer cannot be created with the provided configuration, or subscribing to the provided topic fails, a non-nil error value will be returned.

func (*Consumer) Assignments

func (c *Consumer) Assignments() ([]kafka.TopicPartition, error)

Assignments fetches and returns the currently assigned topics and partitions for the Consumer.

func (*Consumer) Close

func (c *Consumer) Close() error

Close stops polling messages/events from Kafka and cleans up resources including calling Close on the underlying Confluent Kafka consumer. Once Close has been called the instance of Consumer is no longer usable.

This function should only be called once.

func (*Consumer) Consume

func (c *Consumer) Consume()

Consume begins polling Kafka for messages/events, passing each message read to the provided MessageHandler.

This function is blocking and in most use cases it should be called in a separate goroutine. It will continue to run until Close is called or the program exits.

func (*Consumer) Lag

func (c *Consumer) Lag() (map[string]int64, error)

Lag calculates the lag for each topic/partition assigned to the Consumer.

Note: It does not calculate the lag for the entire consumer group if there are other consumers in the group.

func (*Consumer) LagForTopicPartition

func (c *Consumer) LagForTopicPartition(topic string, partition int) (int64, error)

LagForTopicPartition returns the current lag of the Consumer for a specific topic/partition.

type ConsumerOptions

type ConsumerOptions struct {

	// The Kafka configuration that is used to create the underlying Confluent Kafka
	// Consumer type. This is a required field. A zero value (nil) will cause a panic.
	KafkaConfig *kafka.ConfigMap

	// The handler that will be handed the message from Kafka and process it. This is
	// a required field. A zero value (nil) will cause a panic.
	Handler MessageHandler

	// The topic to consume. This is a required field. A zero value ("") will cause
	// a panic.
	Topic string

	// Configures the timeout for polling messages from Kafka. This field is optional.
	// If the zero-value is provided DefaultPollTimeout will be used.
	PollTimeout time.Duration

	// An optional callback that is invoked when an error occurs polling/reading
	// messages from Kafka.
	//
	// While optional, it is highly recommended to provide an ErrorCallback.
	// Otherwise, errors from the underlying Confluent Kafka Consumer are discarded.
	// Ideally, these errors should be logged and/or captured as metrics.
	ErrorHandler ErrorCallback

	// Allows plugging in a third party Logger. If one is not provided, the Logger
	// from the standard library is used at INFO level.
	Logger Logger
}

ConsumerOptions contains the configuration options to instantiate and initialize a Consumer.

Note: Some of the fields are required and will cause a panic if left unset. Take note of the GoDoc comments for each of the fields in the struct.

type ErrorCallback

type ErrorCallback func(err error)

ErrorCallback is a function that is invoked with an error value when an error occurs processing messages from Kafka.

type LogLevel

type LogLevel uint

LogLevel represents the level/importance of a log message

const (
	// DebugLevel is for logs that are useful for debugging or tracing the behavior
	// of the code.
	DebugLevel LogLevel = iota
	// InfoLevel is for information log messages that do not need to be actioned but
	// are still useful.
	InfoLevel
	// WarnLevel is for log messages that may need to be actioned/addressed but aren't
	// necessarily errors.
	WarnLevel
	// ErrorLevel is for logs regarding errors
	ErrorLevel
	// FatalLevel is for catastrophic errors from which the system cannot recover or
	// continue
	FatalLevel
)

type Logger

type Logger interface {
	Printf(lvl LogLevel, format string, args ...any)
}

Logger is an interface type that defines the logging behavior of Kefka. This interface can be implemented to allow any third party logger to integrate into Kefka.

type LoggerFunc

type LoggerFunc func(lvl LogLevel, format string, args ...any)

LoggerFunc is a convenience type for implementing the Logger interface without needing to create a new type.

func (LoggerFunc) Printf

func (l LoggerFunc) Printf(level LogLevel, msg string, args ...any)

type MarshallFunc

type MarshallFunc func(v any) ([]byte, error)

func JsonMarshaller

func JsonMarshaller() MarshallFunc

func MsgpackMarshaller

func MsgpackMarshaller() MarshallFunc

func StringMarshaller

func StringMarshaller() MarshallFunc

StringMarshaller returns a MarshallFunc that is only capable of marshalling a string or any type that implements the fmt.Stringer interface. If any other type is provided an error will be returned.

StringMarshaller is useful for marshalling keys which are typically a string.

type MessageHandler

type MessageHandler interface {
	Handle(msg *kafka.Message, ack Commit) error
}

MessageHandler is a type that handles/processes messages from Kafka.

Implementations of MessageHandler are expected to perform all processing and business logic around the received message. Implementations of MessageHandler should handle any and all retry logic, error handling, etc. A MessageHandler can return an error but the error is ignored by the underlying Consumer and Reader types. However, it does allow a MessageHandler to be wrapped to handle errors and retries in an elegant way.

Implementations of MessageHandler should call the Commit function when using manual/synchronous commits with Kafka. Otherwise, it should not be called as it can have negative impacts on throughput/performance.

type MessageHandlerFunc

type MessageHandlerFunc func(msg *kafka.Message, ack Commit) error

MessageHandlerFunc is a convenient way to satisfy the MessageHandler interface without creating a type.

See MessageHandler for more details.

func (MessageHandlerFunc) Handle

func (m MessageHandlerFunc) Handle(msg *kafka.Message, ack Commit) error

type MetadataClient

type MetadataClient interface {
	Assignment() (partitions []kafka.TopicPartition, err error)
	Committed(partitions []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error)
	QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low int64, high int64, err error)
}

type OffsetWatermarks

type OffsetWatermarks struct {
	Low   int64
	High  int64
	Error error
}

type PartitionEOFCallback

type PartitionEOFCallback func(topic string, partition int, offset int64)

PartitionEOFCallback is a function type that is invoked when the end of the partition is reached.

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is a type that supports producing messages to a Kafka cluster. Producer wraps and uses the Confluent Kafka producer type under the hood. Both asynchronous and synchronous production are supported.

Producer can automatically handle marshalling of keys and values.

The zero-value is not usable and a Producer should be instantiated with the NewProducer function.

func NewProducer

func NewProducer(opts ProducerOptions) (*Producer, error)

NewProducer creates and initializes a new Producer.

If an error occurs while creating the Confluent Kafka producer from the provided configuration a non-nil error value will be returned. If the ProducerOptions is missing required properties (misusing the API) this function will panic.

func (*Producer) Close

func (p *Producer) Close()

Close delegates to the Close method on the underlying Confluent Kafka producer and closes all channels. After Close has been called the instance of Producer is no longer usable.

As a best practice, Flush or FlushAll should be called before calling Close to ensure all produced messages/events have been transmitted to the Kafka brokers.

func (*Producer) Flush

func (p *Producer) Flush(timeout time.Duration) int

Flush waits for outstanding messages and requests to complete delivery, including messages on ProduceChannel. It runs until the number of outstanding events reaches zero or the timeout elapses, and returns the number of events still un-flushed.

func (*Producer) Produce

func (p *Producer) Produce(topic string, key any, val any, deliveryChan chan kafka.Event) error

Produce produces a single message asynchronously. Messages are enqueued into an internal transmit queue, so the call returns immediately. If the message could not be enqueued a non-nil error value is returned.

Delivery reports are sent on deliveryChan if one is provided. If a nil delivery channel is provided, delivery reports are polled from the producer's events but will not be available to the caller. In essence, without a delivery chan this method is fire and forget: it ensures the message was enqueued but does not verify it was delivered. If the caller needs to verify that messages were successfully delivered to the Kafka brokers, a delivery chan should be provided and listened on for the delivery reports.

Produce automatically handles marshalling the key and value to binary using the provided MarshallFunc.

func (*Producer) ProduceMessage

func (p *Producer) ProduceMessage(msg *kafka.Message, deliveryChan chan kafka.Event) error

ProduceMessage produces a single message asynchronously. Messages are enqueued into an internal transmit queue, so the call returns immediately. If the message could not be enqueued a non-nil error value is returned.

Delivery reports are sent on deliveryChan if one is provided. If a nil delivery channel is provided, delivery reports are polled from the producer's events but will not be available to the caller. In essence, without a delivery chan this method is fire and forget: it ensures the message was enqueued but does not verify it was delivered. If the caller needs to verify that messages were successfully delivered to the Kafka brokers, a delivery chan should be provided and listened on for the delivery reports.

ProduceMessage in contrast to Produce takes a kafka.Message and doesn't handle marshalling the key and values. However, it provides more control over the message if you need to produce to a specific partition, set headers, etc.

func (*Producer) SyncProduce

func (p *Producer) SyncProduce(ctx context.Context, topic string, key any, val any) error

SyncProduce produces a single message synchronously.

Technically the underlying Confluent Kafka library doesn't directly support producing events synchronously. Instead, SyncProduce produces the message asynchronously and awaits notification of delivery on a channel. If the context is done before receiving on the delivery chan this method will return with an error but there is a possibility the message may still be delivered.

func (*Producer) SyncProduceMessage

func (p *Producer) SyncProduceMessage(ctx context.Context, msg *kafka.Message) error

SyncProduceMessage produces a single message synchronously. SyncProduceMessage accepts a raw kafka.Message and is meant to complement SyncProduce when the caller needs more control over the message.

Technically the underlying Confluent Kafka library doesn't directly support producing events synchronously. Instead, SyncProduceMessage produces the message asynchronously and awaits notification of delivery on a channel. If the context is done before receiving on the delivery chan this method will return with an error but there is a possibility the message may still be delivered.

type ProducerOptions

type ProducerOptions struct {
	// The Kafka configuration that is used to create the underlying Confluent Kafka
	// Producer type. This is a required field. A zero value (nil) will cause a panic.
	KafkaConfig *kafka.ConfigMap
	// Configures how the Producer will marshall the key into []byte. This is a
	// required field. A zero value (nil) will cause a panic.
	KeyMarshaller MarshallFunc
	// Configures how the Producer will marshall the value into []byte. This is
	// a required field. A zero value (nil) will cause a panic.
	ValueMarshaller MarshallFunc
	// Allows plugging in a third party Logger. If one is not provided, the Logger
	// from the standard library is used at INFO level.
	Logger Logger
}

ProducerOptions is a type containing all the configuration options to instantiate and initialize a Producer.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader is a type for reading messages from Kafka topics/partitions.

Under the hood Reader uses the Confluent Kafka consumer type. Reader is in essence a wrapper around the Confluent Kafka Go library for reading messages outside a consumer group.

The zero value of Reader is not usable. Instances of Reader should be created using the NewReader function.

In contrast to Consumer, Reader is meant to serve use cases where messages are read from Kafka but not consumed. In other words, offsets are not committed back to Kafka, and it is safe for multiple instances to read the same messages, possibly over and over. As messages are read from Kafka they are passed to the provided MessageHandler for processing. A noop Commit is passed to the MessageHandler, so even if it's called it has no effect.

In order to guarantee the behavior of Reader certain Kafka configuration properties are overridden and cannot be altered.

enable.partition.eof -> true
enable.auto.commit -> false
group.id -> kefkareader

func NewReader

func NewReader(opts ReaderOptions) (*Reader, error)

NewReader creates and initializes a new ready to use instance of Reader.

If the API is misused (missing required fields, etc.) this function will panic. If the Reader cannot be created with the provided configuration, or assigning the topics/partitions fails, a non-nil error value will be returned.

func (*Reader) Close

func (r *Reader) Close() error

Close stops the Reader. After calling Close the Reader is no longer usable.

func (*Reader) QueryWatermarkOffsets

func (r *Reader) QueryWatermarkOffsets() map[string]OffsetWatermarks

QueryWatermarkOffsets queries Kafka for the starting and ending offsets of every topic/partition specified in the TopicPartitions field of ReaderOptions. It returns them as a map keyed by topic|partition, with an OffsetWatermarks value for each entry.

Because each topic and partition is queried individually the caller must check the Error field of the OffsetWatermarks type to ensure the operation succeeded.

func (*Reader) Read

func (r *Reader) Read()

Read begins polling Kafka for messages/events and passing the messages to the configured MessageHandler.

This method is blocking and will run until Close is called. In most cases this method should be called on a new goroutine.

This method should never be called more than once.

type ReaderOptions

type ReaderOptions struct {
	// The Kafka configuration that is used to create the underlying Confluent Kafka
	// Consumer type. This is a required field. A zero value (nil) will cause a panic.
	KafkaConfig *kafka.ConfigMap
	// The handler that will be handed the message from Kafka and process it. This is
	// a required field. A zero value (nil) will cause a panic.
	MessageHandler MessageHandler
	// An optional callback that is invoked when an error occurs polling/reading
	// messages from Kafka.
	//
	// While optional, it is highly recommended to provide an ErrorCallback.
	// Otherwise, errors from the underlying Confluent Kafka Consumer are discarded.
	// Ideally, these errors should be logged and/or captured as metrics.
	ErrorCallback ErrorCallback
	// An optional callback that is invoked when the Reader reaches the end of a
	// topic/partition.
	PartitionEOFCallback PartitionEOFCallback
	// The topics and partitions to be read. This is a required field and at least
	// one TopicPartition must be supplied. Each TopicPartition should provide the
	// topic, partition, and starting offset. It is important to note the starting
	// offset of 0 will default to the latest offset if 0 is not a valid offset.
	// Optionally the values FirstOffset and LastOffset can be passed to start at
	// the beginning or end of the partition respectively.
	//
	// Example:
	//	topic := "test"
	//	TopicPartitions: []kafka.TopicPartition{
	//		{
	//			Topic:     &topic,
	//			Partition: 0,
	//			Offset:    kefka.FirstOffset,
	//		},
	//	},
	TopicPartitions kafka.TopicPartitions
	// Configures the timeout for polling messages from Kafka. This field is optional.
	// If the zero-value is provided DefaultPollTimeout will be used.
	PollTimeout time.Duration
	// Allows plugging in a third party Logger. If one is not provided, the Logger
	// from the standard library is used at INFO level.
	Logger Logger
}

ReaderOptions is a type representing the configuration options to instantiate and initialize a Reader.

Note: There are fields that are mandatory and if not provided will result in a panic.

type UnmarshallFunc

type UnmarshallFunc func(data []byte, v any) error

func JsonUnmarshaller

func JsonUnmarshaller() UnmarshallFunc

func MsgpackUnmarshaller

func MsgpackUnmarshaller() UnmarshallFunc

Directories

Path Synopsis
examples
