Documentation ¶
Index ¶
- Constants
- func FlushAll(p kafkaProducer)
- func LagForTopicPartition(client MetadataClient, topic string, partition int) (int64, error)
- func MessageCount(client MetadataClient, topic string, partition int) (int64, error)
- func ReadTopicPartitions(ctx context.Context, opts ReaderOptions) error
- func SyncProduce(ctx context.Context, p kafkaProducer, topic string, key, value []byte) error
- func SyncProduceMessage(ctx context.Context, p kafkaProducer, msg *kafka.Message) error
- type CancelFunc
- type Commit
- type Consumer
- type ConsumerOptions
- type ErrorCallback
- type LogLevel
- type Logger
- type LoggerFunc
- type MarshallFunc
- type MessageHandler
- type MessageHandlerFunc
- type MetadataClient
- type OffsetWatermarks
- type PartitionEOFCallback
- type Producer
- func (p *Producer) Close()
- func (p *Producer) Flush(timeout time.Duration) int
- func (p *Producer) Produce(topic string, key any, val any, deliveryChan chan kafka.Event) error
- func (p *Producer) ProduceMessage(msg *kafka.Message, deliveryChan chan kafka.Event) error
- func (p *Producer) SyncProduce(ctx context.Context, topic string, key any, val any) error
- func (p *Producer) SyncProduceMessage(ctx context.Context, msg *kafka.Message) error
- type ProducerOptions
- type Reader
- type ReaderOptions
- type UnmarshallFunc
Constants ¶
const (
    // LastOffset specifies to use the most recent offset available for a partition
    LastOffset kafka.Offset = -1

    // FirstOffset specifies to use the least recent offset available for a partition
    FirstOffset kafka.Offset = -2
)
const (
    // DefaultPollTimeout is the default timeout when polling events from Kafka.
    DefaultPollTimeout = time.Second * 10
)
Variables ¶
This section is empty.
Functions ¶
func FlushAll ¶
func FlushAll(p kafkaProducer)
FlushAll will continuously call Flush on the Kafka Producer until there are zero messages awaiting delivery.
This function is blocking and should only be called when the internal queue needs to be forcibly drained, for example when an application is exiting.
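For illustration, a graceful-shutdown sketch. It assumes a Confluent *kafka.Producer satisfies the unexported kafkaProducer interface, which this documentation implies but does not state outright:

    // p is a *kafka.Producer. Drain the internal queue before closing so no
    // buffered messages are lost. (That p satisfies kafkaProducer is assumed.)
    kefka.FlushAll(p)
    p.Close()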
func LagForTopicPartition ¶
func LagForTopicPartition(client MetadataClient, topic string, partition int) (int64, error)
LagForTopicPartition fetches the current lag for a given consumer, topic and partition.
Lag is only meaningful when working in a consumer group. To fetch how many messages are in a given topic/partition use MessageCount instead.
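The Confluent *kafka.Consumer provides the Assignment, Committed, and QueryWatermarkOffsets methods listed on MetadataClient, so it should satisfy the interface. A sketch, with illustrative broker address, group id, topic, and partition:

    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "group.id":          "example-group",
    })
    if err != nil {
        log.Fatal(err)
    }

    // Lag of the consumer group on orders/0.
    lag, err := kefka.LagForTopicPartition(consumer, "orders", 0)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("lag:", lag)

    // Total number of messages currently in orders/0, independent of any group.
    count, err := kefka.MessageCount(consumer, "orders", 0)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("messages:", count)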
func MessageCount ¶
func MessageCount(client MetadataClient, topic string, partition int) (int64, error)
MessageCount returns the count of messages for a given topic/partition.
func ReadTopicPartitions ¶
func ReadTopicPartitions(ctx context.Context, opts ReaderOptions) error
ReadTopicPartitions consumes messages from Kafka outside a consumer group. A new Confluent Kafka consumer is created from the provided configuration, but ReadTopicPartitions will automatically add or overwrite specific values to guarantee certain behaviors. The following Kafka configuration properties cannot be overridden.
    enable.partition.eof -> true
    enable.auto.commit -> false
    group.id -> kefkareader
ReadTopicPartitions is blocking and will run forever unless the context is cancelled or its deadline is exceeded. In most use cases you'll want to call ReadTopicPartitions from a new goroutine.
ReadTopicPartitions is capable of consuming multiple topics/partitions, but throughput will likely be higher if this function is used with a single topic and partition.
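A sketch of reading a single topic/partition with a one minute deadline. The MessageHandlerFunc shape used here, func(*kafka.Message, kefka.Commit) error, is an assumption based on the MessageHandler description later in this page; the broker address and topic are illustrative:

    ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
    defer cancel()

    topic := "orders"
    err := kefka.ReadTopicPartitions(ctx, kefka.ReaderOptions{
        KafkaConfig: &kafka.ConfigMap{"bootstrap.servers": "localhost:9092"},
        MessageHandler: kefka.MessageHandlerFunc(func(msg *kafka.Message, _ kefka.Commit) error {
            // The Commit func is a noop outside a consumer group, so it is ignored.
            fmt.Printf("offset %v: %s\n", msg.TopicPartition.Offset, msg.Value)
            return nil
        }),
        TopicPartitions: kafka.TopicPartitions{
            {Topic: &topic, Partition: 0, Offset: kefka.FirstOffset},
        },
        ErrorCallback: func(err error) { log.Println("read error:", err) },
    })
    if err != nil {
        log.Fatal(err)
    }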
func SyncProduce ¶
func SyncProduce(ctx context.Context, p kafkaProducer, topic string, key, value []byte) error
SyncProduce is a convenience function for producing messages synchronously. Technically the Confluent Kafka Producer doesn't support producing events synchronously. Instead, this function creates a delivery channel and waits on it for the delivery report/acknowledgement.
SyncProduce accepts a context so that this operation can be cancelled or time out. However, it is very important to note this does not cancel producing the message. It simply cancels waiting on the delivery report. The message may still be delivered.
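A sketch with a five second delivery deadline. Here p is assumed to be a value satisfying the unexported kafkaProducer interface, for example a Confluent *kafka.Producer; the topic and payload are illustrative:

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    err := kefka.SyncProduce(ctx, p, "orders", []byte("order-123"), []byte(`{"qty":2}`))
    if err != nil {
        // The context may have expired before the delivery report arrived; the
        // message may still have been delivered.
        log.Println("produce failed or unconfirmed:", err)
    }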
func SyncProduceMessage ¶
func SyncProduceMessage(ctx context.Context, p kafkaProducer, msg *kafka.Message) error
SyncProduceMessage is a convenience function for producing messages synchronously. Technically the Confluent Kafka Producer doesn't support producing events synchronously. Instead, this function creates a delivery channel and waits on it for the delivery report/acknowledgement.
SyncProduceMessage accepts a context so that this operation can be cancelled or time out. However, it is very important to note this does not cancel producing the message. It simply cancels waiting on the delivery report. The message may still be delivered.
Types ¶
type CancelFunc ¶
type CancelFunc func()
CancelFunc is a function used to cancel Consumer operations
func Consume ¶
func Consume(consumer *kafka.Consumer, handler MessageHandler, errCb ErrorCallback) CancelFunc
Consume uses the provided Consumer and reads messages from Kafka in a separate goroutine. A CancelFunc is returned to cancel/stop consumption of messages from Kafka.
The consumer and handler parameters are mandatory, while the ErrorCallback is optional. Providing an ErrorCallback is highly recommended, otherwise errors will end up in the void.
Calling Close on the kafka.Consumer while Consume is still running will cause all hell to break loose. Ensure the CancelFunc is called first, and then the kafka.Consumer can be safely closed.
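A sketch of the required ordering. The broker address and group id are illustrative, and handler stands in for any MessageHandler implementation:

    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "group.id":          "example-group",
    })
    if err != nil {
        log.Fatal(err)
    }

    cancel := kefka.Consume(consumer, handler, func(err error) {
        log.Println("consume error:", err)
    })

    // During shutdown: stop consumption first, then close the consumer.
    cancel()
    consumer.Close()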
type Commit ¶
type Commit func()
Commit is a function that commits the offsets to Kafka synchronously. Using synchronous writes can significantly impact throughput and should be used sparingly.
If using auto commit there is no need to invoke this function.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a type for consuming messages from Kafka.
Under the hood Consumer uses the Confluent Kafka consumer type. Consumer is in essence a wrapper around the Confluent Kafka Go library.
The zero value of Consumer is not usable. Instances of Consumer should be created using the NewConsumer function.
The Consumer type is only meant to be used with Kafka consumer groups. If not using consumer groups use the Reader type instead. The `group.id` property must be set in the Kafka ConfigMap.
The Kafka configuration provided is very important to the behavior of the Consumer. The Consumer type does not support all the features of the underlying Confluent Kafka client. It does, however, support manual commits if that behavior is required.
The MessageHandler accepts a Kafka message and a Commit func. If auto commits are disabled the caller must invoke the Commit function when they want to commit offsets back to Kafka. If auto commits are on the Commit function can be ignored; in fact, it should not be called at all when using auto commits.
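For example, a handler that commits after each successfully processed message when auto commits are disabled. The MessageHandlerFunc shape is assumed from the description above, and process stands in for application logic:

    handler := kefka.MessageHandlerFunc(func(msg *kafka.Message, commit kefka.Commit) error {
        if err := process(msg.Value); err != nil {
            // Do not commit; the uncommitted offset can be re-processed after a
            // restart or rebalance.
            return err
        }
        commit() // synchronous commit, only needed when enable.auto.commit is false
        return nil
    })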
func NewConsumer ¶
func NewConsumer(opts ConsumerOptions) (*Consumer, error)
NewConsumer creates and initializes a new instance of the Consumer type.
If the API is misused (missing required fields, not setting group.id in the Kafka config, etc.) this function will panic. If the Consumer cannot be created with the provided configuration, or subscribing to the provided topic fails, a non-nil error value will be returned.
func (*Consumer) Assignments ¶
func (c *Consumer) Assignments() ([]kafka.TopicPartition, error)
Assignments fetches and returns the currently assigned topics and partitions for the Consumer.
func (*Consumer) Close ¶
Close stops polling messages/events from Kafka and cleans up resources including calling Close on the underlying Confluent Kafka consumer. Once Close has been called the instance of Consumer is no longer usable.
This function should only be called once.
func (*Consumer) Consume ¶
func (c *Consumer) Consume()
Consume begins polling Kafka for messages/events passing the read messages off to the provided MessageHandler.
This function is blocking and in most use cases it should be called in a separate goroutine. It will continue to run until Close is called or the program exits.
type ConsumerOptions ¶
type ConsumerOptions struct {
    // The Kafka configuration that is used to create the underlying Confluent Kafka
    // Consumer type. This is a required field. A zero value (nil) will cause a panic.
    KafkaConfig *kafka.ConfigMap

    // The handler that will be handed the message from Kafka and process it. This is
    // a required field. A zero value (nil) will cause a panic.
    Handler MessageHandler

    // The topic to consume. This is a required field. A zero value ("") will cause
    // a panic.
    Topic string

    // Configures the timeout polling messages from Kafka. This field is optional.
    // If the zero-value is provided DefaultPollTimeout will be used.
    PollTimeout time.Duration

    // An optional callback that is invoked when an error occurs polling/reading
    // messages from Kafka.
    //
    // While optional, it is highly recommended to provide an ErrorCallback.
    // Otherwise, errors from the underlying Confluent Kafka Consumer are discarded.
    // Ideally, these errors should be logged and/or capture metrics.
    ErrorHandler ErrorCallback

    // Allows plugging in a third party Logger. By default, the Logger from the
    // standard library will be used if one is not provided at INFO level.
    Logger Logger
}
ConsumerOptions contains the configuration options to instantiate and initialize a Consumer.
Note: Some of the fields are required and, if not provided, will cause a panic. Take note of the GoDoc comments for each of the fields in the struct.
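Putting it together, a sketch of constructing and running a Consumer, reusing the manual-commit handler sketched earlier. The broker address, group id, and topic are illustrative:

    c, err := kefka.NewConsumer(kefka.ConsumerOptions{
        KafkaConfig: &kafka.ConfigMap{
            "bootstrap.servers":  "localhost:9092",
            "group.id":           "example-group",
            "enable.auto.commit": false,
        },
        Handler:      handler,
        Topic:        "orders",
        ErrorHandler: func(err error) { log.Println("consumer error:", err) },
    })
    if err != nil {
        log.Fatal(err)
    }

    go c.Consume() // Consume blocks, so run it on its own goroutine

    // ... later, during shutdown:
    c.Close()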
type ErrorCallback ¶
type ErrorCallback func(err error)
ErrorCallback is a function that is invoked with an error value when an error occurs processing messages from Kafka.
type LogLevel ¶
type LogLevel uint
LogLevel represents the level/importance of a log message
const (
    // DebugLevel is for logs that are useful for debugging or tracing the behavior
    // of the code.
    DebugLevel LogLevel = iota

    // InfoLevel is for informational log messages that do not need to be actioned
    // but are still useful.
    InfoLevel

    // WarnLevel is for log messages that may need to be actioned/addressed but
    // aren't necessarily errors.
    WarnLevel

    // ErrorLevel is for logs regarding errors.
    ErrorLevel

    // FatalLevel is for catastrophic errors from which the system cannot recover
    // or continue.
    FatalLevel
)
type Logger ¶
Logger is an interface type that defines the logging behavior of Kefka. This interface can be implemented to allow any third party logger to integrate into Kefka.
type LoggerFunc ¶
LoggerFunc is a convenient type for implementing the Logger interface without needing to create a new type.
type MarshallFunc ¶
func JsonMarshaller ¶
func JsonMarshaller() MarshallFunc
func MsgpackMarshaller ¶
func MsgpackMarshaller() MarshallFunc
func StringMarshaller ¶
func StringMarshaller() MarshallFunc
StringMarshaller returns a MarshallFunc that is only capable of marshalling a string or any type that implements the fmt.Stringer interface. If any other type is provided an error will be returned.
StringMarshaller is useful for marshalling keys which are typically a string.
type MessageHandler ¶
MessageHandler is a type that handles/processes messages from Kafka.
Implementations of MessageHandler are expected to perform all processing and business logic around the received message. Implementations of MessageHandler should handle any and all retry logic, error handling, etc. A MessageHandler can return an error, but the error is ignored by the underlying Consumer and Reader types. Returning an error does, however, allow a MessageHandler to be wrapped to handle errors and retries in an elegant way.
Implementations of MessageHandler should call the Commit function when using manual/synchronous commits with Kafka. Otherwise, it should not be called as it can have negative impacts on throughput/performance.
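As an illustration of wrapping, a retry decorator built on the assumed MessageHandlerFunc shape. The attempt count and backoff are arbitrary choices, not part of Kefka:

    // withRetry retries the wrapped handler a fixed number of times before
    // giving up and returning the last error.
    func withRetry(next kefka.MessageHandlerFunc, maxAttempts int) kefka.MessageHandlerFunc {
        return func(msg *kafka.Message, commit kefka.Commit) error {
            var err error
            for attempt := 1; attempt <= maxAttempts; attempt++ {
                if err = next(msg, commit); err == nil {
                    return nil
                }
                time.Sleep(time.Duration(attempt) * 100 * time.Millisecond)
            }
            return err
        }
    }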
type MessageHandlerFunc ¶
MessageHandlerFunc is a convenient way to satisfy the MessageHandler interface without creating a type.
See MessageHandler for more details.
type MetadataClient ¶
type MetadataClient interface {
    Assignment() (partitions []kafka.TopicPartition, err error)
    Committed(partitions []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error)
    QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low int64, high int64, err error)
}
type OffsetWatermarks ¶
type PartitionEOFCallback ¶
PartitionEOFCallback is a function type that is invoked when the end of the partition is reached.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer is a type that supports producing messages to a Kafka cluster. Producer wraps and uses the Confluent Kafka producer type under the hood. Both asynchronous and synchronous message production are supported.
Producer can automatically handle marshalling of keys and values.
The zero-value is not usable and a Producer should be instantiated with the NewProducer function.
func NewProducer ¶
func NewProducer(opts ProducerOptions) (*Producer, error)
NewProducer creates and initializes a new Producer.
If an error occurs while creating the Confluent Kafka producer from the provided configuration a non-nil error value will be returned. If the ProducerOptions is missing required properties (misusing the API) this function will panic.
func (*Producer) Close ¶
func (p *Producer) Close()
Close delegates to the Close method on the underlying Confluent Kafka producer and closes all channels. After Close has been called the instance of Producer is no longer usable.
As a best practice, Flush or FlushAll should be called before calling Close to ensure all produced messages/events have been transmitted to the Kafka brokers before closing.
func (*Producer) Flush ¶
func (p *Producer) Flush(timeout time.Duration) int
Flush waits for outstanding messages and requests to complete delivery, including messages on the ProduceChannel. It runs until the number of outstanding messages reaches zero or the timeout expires, and returns the number of outstanding events still un-flushed.
func (*Producer) Produce ¶
func (p *Producer) Produce(topic string, key any, val any, deliveryChan chan kafka.Event) error
Produce produces a single message asynchronously. Messages are enqueued into an internal transmit queue, and Produce returns immediately. If the message could not be enqueued a non-nil error value is returned.
Delivery reports are sent on the deliveryChan if one is provided. If a nil delivery channel is provided, delivery reports are polled from the producer's events but will not be available to the caller. In essence, if a delivery chan is not provided this method is fire and forget: it ensures the message was enqueued but does not verify it was delivered. If the caller needs to verify messages were successfully delivered to the Kafka brokers, a delivery chan should be provided and the delivery reports read from it to confirm delivery.
Produce automatically handles marshalling the key and value to binary using the provided MarshallFunc.
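A sketch of an asynchronous produce with delivery confirmation. It assumes p is a *Producer configured with StringMarshaller keys and JsonMarshaller values; the topic and payload are illustrative:

    deliveryChan := make(chan kafka.Event, 1)
    if err := p.Produce("orders", "order-123", map[string]int{"qty": 2}, deliveryChan); err != nil {
        log.Fatal(err) // the message could not be enqueued
    }

    // Wait for the delivery report to confirm the broker accepted the message.
    e := <-deliveryChan
    if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
        log.Println("delivery failed:", m.TopicPartition.Error)
    }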
func (*Producer) ProduceMessage ¶
func (p *Producer) ProduceMessage(msg *kafka.Message, deliveryChan chan kafka.Event) error
ProduceMessage produces a single message asynchronously. Messages are enqueued into an internal transmit queue, and ProduceMessage returns immediately. If the message could not be enqueued a non-nil error value is returned.
Delivery reports are sent on the deliveryChan if one is provided. If a nil delivery channel is provided, delivery reports are polled from the producer's events but will not be available to the caller. In essence, if a delivery chan is not provided this method is fire and forget: it ensures the message was enqueued but does not verify it was delivered. If the caller needs to verify messages were successfully delivered to the Kafka brokers, a delivery chan should be provided and the delivery reports read from it to confirm delivery.
ProduceMessage, in contrast to Produce, takes a kafka.Message and doesn't handle marshalling the key and value. However, it provides more control over the message if you need to produce to a specific partition, set headers, etc.
func (*Producer) SyncProduce ¶
func (p *Producer) SyncProduce(ctx context.Context, topic string, key any, val any) error
SyncProduce produces a single message synchronously.
Technically the underlying Confluent Kafka library doesn't directly support producing events synchronously. Instead, SyncProduce produces the message asynchronously and awaits notification of delivery on a channel. If the context is done before receiving on the delivery chan this method will return with an error but there is a possibility the message may still be delivered.
func (*Producer) SyncProduceMessage ¶
func (p *Producer) SyncProduceMessage(ctx context.Context, msg *kafka.Message) error
SyncProduceMessage produces a single message synchronously. SyncProduceMessage accepts a raw kafka.Message and is meant to complement SyncProduce when the caller needs more control over the message.
Technically the underlying Confluent Kafka library doesn't directly support producing events synchronously. Instead, SyncProduceMessage produces the message asynchronously and awaits notification of delivery on a channel. If the context is done before receiving on the delivery chan this method will return with an error but there is a possibility the message may still be delivered.
type ProducerOptions ¶
type ProducerOptions struct {
    // The Kafka configuration that is used to create the underlying Confluent Kafka
    // Producer type. This is a required field. A zero value (nil) will cause a panic.
    KafkaConfig *kafka.ConfigMap

    // Configures how the Producer will marshall the key into []byte. This is a
    // required field. A zero value (nil) will cause a panic.
    KeyMarshaller MarshallFunc

    // Configures how the Producer will marshall the value into []byte. This is
    // a required field. A zero value (nil) will cause a panic.
    ValueMarshaller MarshallFunc

    // Allows plugging in a third party Logger. By default, the Logger from the
    // standard library will be used if one is not provided at INFO level.
    Logger Logger
}
ProducerOptions is a type containing all the configuration options to instantiate and initialize a Producer.
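For example, a sketch of constructing a Producer and producing one message synchronously. The broker address, topic, and payload are illustrative:

    p, err := kefka.NewProducer(kefka.ProducerOptions{
        KafkaConfig:     &kafka.ConfigMap{"bootstrap.servers": "localhost:9092"},
        KeyMarshaller:   kefka.StringMarshaller(),
        ValueMarshaller: kefka.JsonMarshaller(),
    })
    if err != nil {
        log.Fatal(err)
    }
    defer p.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    if err := p.SyncProduce(ctx, "orders", "order-123", map[string]int{"qty": 2}); err != nil {
        log.Println("produce failed or unconfirmed:", err)
    }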
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader is a type for reading messages from Kafka topics/partitions.
Under the hood Reader uses the Confluent Kafka consumer type. Reader is in essence a wrapper around the Confluent Kafka Go library for reading messages outside a consumer group.
The zero value of Reader is not usable. Instances of Reader should be created using the NewReader function.
In contrast to Consumer, Reader is meant to serve use cases where messages are read from Kafka but not consumed. In other words, offsets are not committed back to Kafka, and it is safe for multiple instances to read the same messages, possibly over and over. As messages are read from Kafka they are passed to a provided MessageHandler to handle/process the message. A noop Commit is passed to the MessageHandler, so even if it is called it has no effect.
In order to guarantee the behavior of Reader certain Kafka configuration properties are overridden and cannot be altered.
    enable.partition.eof -> true
    enable.auto.commit -> false
    group.id -> kefkareader
func NewReader ¶
func NewReader(opts ReaderOptions) (*Reader, error)
NewReader creates and initializes a new ready to use instance of Reader.
If the API is misused (missing required fields, not setting group.id in the Kafka config, etc.) this function will panic. If the Reader cannot be created with the provided configuration, or assigning the topics/partitions fails, a non-nil error value will be returned.
func (*Reader) Close ¶
Close stops the Reader. After calling Close the Reader is no longer usable.
func (*Reader) QueryWatermarkOffsets ¶
func (r *Reader) QueryWatermarkOffsets() map[string]OffsetWatermarks
QueryWatermarkOffsets queries Kafka to get the starting and ending offsets for all the topics/partitions specified in the TopicPartitions field of ReaderOptions, returning them as a map where the key is topic|partition and the value is the corresponding OffsetWatermarks.
Because each topic and partition is queried individually the caller must check the Error field of the OffsetWatermarks type to ensure the operation succeeded.
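A sketch of checking each result. Only the documented Error field is relied on; the remaining OffsetWatermarks fields are printed generically since their names are not shown on this page:

    for key, wm := range r.QueryWatermarkOffsets() {
        if wm.Error != nil {
            log.Printf("querying watermarks for %s failed: %v", key, wm.Error)
            continue
        }
        log.Printf("%s: %+v", key, wm)
    }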
func (*Reader) Read ¶
func (r *Reader) Read()
Read begins polling Kafka for messages/events and passing the messages to the configured MessageHandler.
This method is blocking and will run until Close is called. In most cases this method should be called on a new goroutine.
This method should never be called more than once.
type ReaderOptions ¶
type ReaderOptions struct {
    // The Kafka configuration that is used to create the underlying Confluent Kafka
    // Consumer type. This is a required field. A zero value (nil) will cause a panic.
    KafkaConfig *kafka.ConfigMap

    // The handler that will be handed the message from Kafka and process it. This is
    // a required field. A zero value (nil) will cause a panic.
    MessageHandler MessageHandler

    // An optional callback that is invoked when an error occurs polling/reading
    // messages from Kafka.
    //
    // While optional, it is highly recommended to provide an ErrorCallback.
    // Otherwise, errors from the underlying Confluent Kafka Consumer are discarded.
    // Ideally, these errors should be logged and/or capture metrics.
    ErrorCallback ErrorCallback

    // An optional callback that is invoked when the Reader reaches the end of a
    // topic/partition.
    PartitionEOFCallback PartitionEOFCallback

    // The topics and partitions to be read. This is a required field and at least
    // one TopicPartition must be supplied. Each TopicPartition should provide the
    // topic, partition, and starting offset. It is important to note the starting
    // offset of 0 will default to the latest offset if 0 is not a valid offset.
    // Optionally the values FirstOffset and LastOffset can be passed to start at
    // the beginning or end of the partition respectively.
    //
    // Example:
    //    topic := "test"
    //    TopicPartitions: []kafka.TopicPartition{
    //        {
    //            Topic:     &topic,
    //            Partition: 0,
    //            Offset:    kefka.FirstOffset,
    //        },
    //    },
    TopicPartitions kafka.TopicPartitions

    // Configures the timeout polling messages from Kafka. This field is optional.
    // If the zero-value is provided DefaultPollTimeout will be used.
    PollTimeout time.Duration

    // Allows plugging in a third party Logger. By default, the Logger from the
    // standard library will be used if one is not provided at INFO level.
    Logger Logger
}
ReaderOptions is a type representing the configuration options to instantiate and initialize a Reader.
Note: There are fields that are mandatory and if not provided will result in a panic.
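A sketch of the Reader lifecycle using the options above. The broker address and topic are illustrative, and handler stands in for any MessageHandler implementation, such as a MessageHandlerFunc:

    topic := "orders"
    r, err := kefka.NewReader(kefka.ReaderOptions{
        KafkaConfig:    &kafka.ConfigMap{"bootstrap.servers": "localhost:9092"},
        MessageHandler: handler,
        TopicPartitions: kafka.TopicPartitions{
            {Topic: &topic, Partition: 0, Offset: kefka.FirstOffset},
        },
        ErrorCallback: func(err error) { log.Println("reader error:", err) },
    })
    if err != nil {
        log.Fatal(err)
    }

    go r.Read() // Read blocks until Close is called

    // ... later, during shutdown:
    r.Close()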
type UnmarshallFunc ¶
func JsonUnmarshaller ¶
func JsonUnmarshaller() UnmarshallFunc
func MsgpackUnmarshaller ¶
func MsgpackUnmarshaller() UnmarshallFunc