zkafka

Install

go get -u github.com/zillow/zkafka/v2

About

zkafka is built to simplify message processing in Kafka. This library aims to minimize boilerplate code, allowing the developer to focus on writing the business logic for each Kafka message. zkafka takes care of various responsibilities, including:

  1. Reading from the worker's configured topics
  2. Managing message offsets reliably - Kafka offset management can be complex, but zkafka handles it. Developers only need to write code to process a single message and indicate whether or not it encountered an error.
  3. Distributing messages to virtual partitions (details will be explained later)
  4. Implementing dead lettering for failed messages
  5. Providing inspectable and customizable behavior through lifecycle functions (Callbacks) - Developers can add metrics or logging at specific points in the message processing lifecycle.

zkafka provides stateless message processing semantics (sometimes called lambda message processing). This is a churched-up way of saying, "You write code which executes on each message individually (without knowledge of other messages)." It is purpose-built with this type of usage in mind. Additionally, the worker implementation guarantees at-least-once processing (details of how that's achieved are shown in the Commit Strategy section).

NOTE: zkafka is built on top of confluent-kafka-go which is a CGO module. Therefore, so is zkafka. When building with zkafka, make sure to set CGO_ENABLED=1.

Features

The following subsections detail some useful features. To make them more accessible, there are runnable examples in the ./examples directory. The best way to learn is to experiment with the examples. Dive in!

Stateless Message Processing

zkafka makes stateless message processing easy. All you have to do is write a concrete processor implementation and wire it up (shown below).

type processor interface {
    Process(ctx context.Context, message *zkafka.Message) error
}
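For illustration only, a minimal concrete implementation (the names here are hypothetical) might decode the message into a struct and act on it; a returned error signals a processing failure to the worker:

    // Processor is a hypothetical implementation of the processor interface above.
    type Processor struct{}

    func (p *Processor) Process(ctx context.Context, msg *zkafka.Message) error {
        var evt struct {
            ID   string `json:"id"`
            Name string `json:"name"`
        }
        // Decode unmarshals the message value using the topic's configured formatter (json by default).
        if err := msg.Decode(&evt); err != nil {
            return err
        }
        // Business logic goes here. Returning nil marks the message as successfully processed.
        return nil
    }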

If you want to skip ahead and see a working processor, check out the examples, specifically example/worker/main.go.

The anatomy of that example is described here:

A zkafka.Client needs to be created which can connect to the kafka broker. Typically, authentication information must also be specified at this point (today that would include username/password).

    client := zkafka.NewClient(zkafka.Config{ BootstrapServers: []string{"localhost:29092"} })

Next, this client should be passed to create a zkafka.WorkFactory instance. The factory design used by this library adds a little boilerplate, but it allows default policies to be injected and proliferated to all instantiated work instances. We find that useful at Zillow for transparently injecting the nuts and bolts of components that address cross-cutting concerns (typically those revolving around telemetry).

    wf := zkafka.NewWorkFactory(client)

Next we create the work instance. This is finally where the dots begin to connect. zkafka.Work objects are responsible for continually polling the topics they've been instructed to listen to (the set of which is specified in the config object) and executing the specified code (defined in the user-controlled processor and lifecycle functions, not shown here).

   topicConfig := zkafka.ConsumerTopicConfig{Topic: "my-topic", GroupID: "mygroup", ClientID: "myclient"}
   // this implements the interface specified above and will be executed for each read message
   processor := &Processor{}
   work := wf.Create(topicConfig, processor)

All that's left now is to kick off the run loop (this will connect to the Kafka broker, create a Kafka consumer group, undergo consumer group assignment, and, after the assignment, begin polling for messages). The run loop executes a single reader (Kafka consumer) which reads messages and then fans those messages out to N processors (sized by the virtual partition pool size, described later). It's a processing pipeline with a reader at the front and processors at the back.

The run loop takes two arguments, both responsible for signaling that the run loop should exit.

  1. context.Context object. When this object is canceled, the internal work loop will begin to abruptly shut down. This involves exiting the reader loop and processor loops immediately.

  2. signal channel. This channel should be closed, and tells zkafka to begin a graceful shutdown. Graceful shutdown means the reader stops reading new messages, and the processors attempt to finish their in-flight work.

At Zillow, we deploy to a Kubernetes cluster and use a strategy that uses both mechanisms. When k8s indicates shutdown is imminent, we close the shutdown channel. Graceful shutdown is time-boxed, and if the deadline is reached, the outer context object is canceled, signaling a more aggressive teardown. The example below passes in a nil shutdown signal (which is valid). That's done for brevity in the README; production use cases should take advantage of both (see examples).

   err = work.Run(context.Background(), nil)
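A sketch of the two-signal shutdown described above is shown below; the wiring to OS signals and the 30-second grace period are illustrative assumptions, not part of zkafka:

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    shutdown := make(chan struct{})
    go func() {
        sigs := make(chan os.Signal, 1)
        signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
        <-sigs
        // Graceful shutdown: stop reading new messages, let in-flight work finish.
        close(shutdown)
        // Time-box the graceful period; canceling the context forces an aggressive teardown.
        time.AfterFunc(30*time.Second, cancel)
    }()

    err = work.Run(ctx, shutdown)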
Hyper Scalability

zkafka.Work supports a concept called virtual partitions. This extends the Kafka partition concept. Message ordering is guaranteed within a Kafka partition, and the same holds true for a virtual partition. Every zkafka.Work object manages a pool of goroutines called processors (1 by default and controlled by the zkafka.Speedup(n int) option). Each processor reads from a goroutine channel called a virtual partition. When a message is read by the reader, it is assigned to one of the virtual partitions based on hash(message.Key) % virtual partition count. This follows the same mechanism used by Kafka. With this strategy, a message with the same key will be assigned to the same virtual partition.

This allows for another layer of scalability. To increase throughput and maintain the same message ordering guarantees, there is no longer a need to increase the Kafka partition count (which can be operationally challenging). Instead, you can use zkafka.Speedup() to increase the virtual partition count.
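For example (the value 5 is illustrative), a worker with five virtual partitions per topic could be created as follows:

    // 5 virtual partitions (and 5 processor goroutines) per Work instance.
    // Messages with the same key still map to the same virtual partition, preserving per-key order.
    work := wf.Create(topicConfig, processor, zkafka.Speedup(5))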

// sets up Kafka broker locally
make setup;
// terminal 1. Starts producing messages. To juice up the production rate, remove the time.Sleep() in the producer and turn acks off.
make example-producer
// terminal 2. Starts a worker with speedup=5. 
make example-worker
Configurable Dead Letter Topics

A zkafka.Work instance can be configured to write to a Dead Letter Topic (DLT) when message processing fails. This can be accomplished with the zkafka.WithDeadLetterTopic() option or, more conveniently, by setting a non-nil value on the zkafka.ConsumerTopicConfig DeadLetterTopicConfig field. At a minimum, the dead letter topic name must be specified (when specified via configuration, no ClientID needs to be provided; the enclosing consumer topic config's ClientID is used).

     zkafka.ConsumerTopicConfig{
        ...
       // When DeadLetterTopicConfig is specified a dead letter topic will be configured and written to
       // when a processing error occurs.
       DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{
          Topic:    "zkafka-example-deadletter-topic",
       },
    }

With the above configuration, messages that fail processing are written to zkafka-example-deadletter-topic. Whether a particular processing error triggers the DLT write is controllable; see zkafka.ProcessError below.
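Alternatively, the DLT can be specified with the zkafka.WithDeadLetterTopic() work option; a minimal sketch (topic and client names are placeholders):

    work := wf.Create(topicConfig, processor,
       zkafka.WithDeadLetterTopic(zkafka.ProducerTopicConfig{
          ClientID: "myclient",
          Topic:    "zkafka-example-deadletter-topic",
       }),
    )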

To execute a local example of this pattern:

// sets up kafka broker locally
make setup;
// terminal 1. Starts producing messages (1 per second)
make example-producer
// terminal 2. Starts a worker which fails processing and writes to a DLT. Log statements show when messages
// are written to a DLT
make example-deadletter-worker

The returned processor error determines whether a message is written to a dead letter topic. In some situations, you might not want to route an error to a DLT. An example might be malformed data.

You have control over this behavior by way of the zkafka.ProcessError:

    return zkafka.ProcessError{
       Err:                 err,
       DisableDLTWrite:     true,
    }
Process Delay Workers

Process Delay Workers can be an important piece of an automated retry policy. A simple example of this would be 2 workers daisy-chained together as follows:

     workerConfig1 := zkafka.ConsumerTopicConfig{
       ClientID: "svc1",
       GroupID: "grp1",
        Topic: "topicA",
       // When DeadLetterTopicConfig is specified a dead letter topic will be configured and written to
       // when a processing error occurs.
       DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{
          Topic:    "topicB",
       },
    }

    workerConfig2 := zkafka.ConsumerTopicConfig{
      ClientID: "svc1",
      GroupID: "grp1",
      Topic: "topicB",
      // When DeadLetterTopicConfig is specified a dead letter topic will be configured and written to
      // when a processing error occurs.
      DeadLetterTopicConfig: &zkafka.ProducerTopicConfig{
         Topic:    "topicC",
      },
    }

With the above worker configurations, messages would flow as follows:

  1. Worker1 reads from topicA
  2. If message processing fails, the message is written to topicB via the DLT configuration
  3. Worker2 reads from topicB
  4. If message processing fails, the message is written to topicC via the DLT configuration

This creates a retry pipeline. The issue is that worker2 would ideally process on a delay (giving whatever transient error is occurring a chance to resolve). Luckily, zkafka supports such a pattern. By specifying ProcessDelayMillis in the config, a worker is created which delays processing of a read message until at least the configured delay duration has elapsed.

    topicConfig := zkafka.ConsumerTopicConfig{
        ... 
       // This value instructs the kafka worker to inspect the message timestamp, and not call the processor callback until
       // at least the process delay duration has passed
       ProcessDelayMillis: &processDelayMillis,
    }

The time awaited by the worker varies. If the message is very old (perhaps the worker had been stopped previously), the worker will detect that the time elapsed since the message was written already exceeds the delay. In that case, it won't delay any further.

To execute a local example of this pattern:

// sets up kafka broker locally
make setup;
// terminal 1. Starts producing messages (1 per second)
make example-producer
// terminal 2. Starts the delay processor. Prints out the duration since msg.Timestamp, i.e. how long the delay
// is between when the message was written and when the process callback is executed.
make example-delay-worker
Commit Strategy

The zkafka.Work commit strategy allows for at least once message processing.

Two quick definitions are important to understanding the commit strategy:

  1. Commit - communicating with the kafka broker and durably persisting offsets there.
  2. Store - updating a local store of message offsets, which will be persisted during the next commit action.

The zkafka.Work instance stores message offsets as message processing concludes. Because the worker manages storing offsets, the library sets enable.auto.offset.store=false. The actual committing of offsets is offloaded to a background process managed by librdkafka (the frequency at which commits are communicated to the broker is controlled by auto.commit.interval.ms, default=5s). Additionally, explicit commits are executed during rebalance events.

This strategy is based on Kafka Docs - Offset Management, where a combination of asynchronous and synchronous commits is suggested to reduce duplicate messages.

The above results in the following algorithm:

  1. Before message processing starts, the message is tracked in an internal heap of in-flight messages.
  2. After message processing concludes, a second heap managed by zkafka marks the message as complete (regardless of whether processing errored or not).
  3. The in-flight heap and the completed heap are compared. Since offsets increase incrementally (by 1), it can be determined whether message processing finished out of order. If the in-flight heap's lowest offset matches the completed heap's lowest offset, that message is safe to be stored. This is repeated until the in-flight heap is empty or its lowest offset hasn't yet been marked complete (a simplified sketch of this check is shown below).
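The sketch below is illustrative only (not zkafka's internals) and uses a sorted slice in place of a heap:

    package main

    import "fmt"

    // commitTracker illustrates the rule described above: an offset is only safe to
    // store once every lower in-flight offset has also completed, which preserves
    // at-least-once semantics even when processing finishes out of order.
    type commitTracker struct {
        inFlight  []int64        // offsets handed to processors, in read (ascending) order
        completed map[int64]bool // offsets whose processing has concluded
    }

    func (t *commitTracker) start(offset int64)    { t.inFlight = append(t.inFlight, offset) }
    func (t *commitTracker) complete(offset int64) { t.completed[offset] = true }

    // storable pops every leading in-flight offset that has completed; those offsets
    // are the ones that would be stored for the next background commit.
    func (t *commitTracker) storable() []int64 {
        var ready []int64
        for len(t.inFlight) > 0 && t.completed[t.inFlight[0]] {
            ready = append(ready, t.inFlight[0])
            delete(t.completed, t.inFlight[0])
            t.inFlight = t.inFlight[1:]
        }
        return ready
    }

    func main() {
        t := &commitTracker{completed: map[int64]bool{}}
        for _, o := range []int64{10, 11, 12} {
            t.start(o)
        }
        t.complete(11)            // finished out of order
        fmt.Println(t.storable()) // [] - offset 10 is still in flight
        t.complete(10)
        fmt.Println(t.storable()) // [10 11] - now safe to store through offset 11
    }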

The remaining steps are implicitly handled by librdkafka:

  1. Commit messages whose offsets have been stored at configurable intervals (auto.commit.interval.ms).
  2. Commit messages whose offsets have been stored when partitions are revoked (to observe this, add debug=cgrp to ConsumerTopicConfig and COMMIT logs will appear after a rebalance; if running this experiment, set auto.commit.interval.ms to a large value to avoid confusing interval commits with the rebalance commit).
  3. Commit messages whose offsets have been stored on close of the reader (to observe this, add debug=cgrp to ConsumerTopicConfig and COMMIT logs will appear after the client is closed, but before it is destroyed).

Errors returned during processing are still stored. This avoids issues caused by poison pill messages (messages that can never be processed without error), as well as transient errors blocking future message processing. Use dead lettering to sequester these failed messages, or use the WithOnDone() option to register a callback for special processing of them.

SchemaRegistry Support

zkafka supports schema registry. It extends zfmt to enable this, adding three zfmt.FormatterType values:

    AvroSchemaRegistry zfmt.FormatterType = "avro_schema_registry"
	ProtoSchemaRegistry zfmt.FormatterType = "proto_schema_registry"
	JSONSchemaRegistry zfmt.FormatterType = "json_schema_registry"

These can be used in ProducerTopicConfig/ConsumerTopicConfig just like the other formatter types. Examples have been added (example/producer_avro and example/worker_avro) which demonstrate the additional configuration (mostly there to enable the required schema registry communication).
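A producer-side sketch (the registry URL, topic, and schema variable are placeholders) might look like the following:

    pcfg := zkafka.ProducerTopicConfig{
       ClientID:  "myclient",
       Topic:     "zkafka-example-avro-topic",
       Formatter: zkafka.AvroSchemaRegistry,
       SchemaRegistry: zkafka.SchemaRegistryConfig{
          URL: "http://localhost:8081",
          Serialization: zkafka.SerializationConfig{
             // Convenient for local development; typically disabled for production workloads.
             AutoRegisterSchemas: true,
             // avroSchemaString is a placeholder for the writer's avro schema definition.
             Schema: avroSchemaString,
          },
       },
    }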

Consumer/Producer Configuration

See the following for descriptions of configuration options and their defaults:

  1. Librdkafka Configuration
  2. Consumer Configuration
  3. Producer Configurations

These are primarily specified through the TopicConfig structs (ProducerTopicConfig and ConsumerTopicConfig). The TopicConfigs include strongly typed fields that translate to librdkafka config values (to see the translation, see config.go). An escape hatch is provided for ad hoc config properties via the AdditionalProps map, where config values that don't have a strongly typed version in TopicConfig may be specified. Not all specified config values will work (for example, enable.auto.commit=false would not work with this client, because that value is explicitly set to true after the AdditionalProps map is read).

deliveryTimeoutMS := 100
enableIdempotence := false
requiredAcks := "0"

pcfg := ProducerTopicConfig{
   ClientID:            "myclientid",
   Topic: "mytopic",
   DeliveryTimeoutMs:   &deliveryTimeoutMS,
   EnableIdempotence:   &enableIdempotence,
   RequestRequiredAcks: &requiredAcks,
   AdditionalProps: map[string]any{
      "linger.ms":               float64(5),
   },
}

ccfg := ConsumerTopicConfig{
   ClientID: "myclientid2",
   GroupID:  "mygroup",
   Topic: "mytopic",
   AdditionalProps: map[string]any{
      "auto.commit.interval.ms": float32(20),
   },
}

Documentation

Constants

const (
	// CustomFmt indicates that the user would pass in their own Formatter later
	CustomFmt zfmt.FormatterType = "custom"
	// AvroSchemaRegistry uses confluent's schema registry. It encodes a schemaID as the first 5 bytes and then avro serializes (binary)
	// for the remaining part of the payload. It is the successor to `avro_schema` which ships with zfmt,
	AvroSchemaRegistry zfmt.FormatterType = "avro_schema_registry"

	// ProtoSchemaRegistry uses confluent's schema registry. It encodes a schemaID as well as the message types as
	// a payload prefix and then proto serializes (binary) for the remaining part of the payload.
	// zfmt.ProtoSchemaDeprecatedFmt had a bug in its implementation and didn't work properly with confluent
	ProtoSchemaRegistry zfmt.FormatterType = "proto_schema_registry"

	// JSONSchemaRegistry uses confluent's schema registry. It encodes a schemaID as the first 5 bytes and then json serializes (human readable)
	// for the remaining part of the payload. It is the successor to `json_schema` which ships with zfmt,
	JSONSchemaRegistry zfmt.FormatterType = "json_schema_registry"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Assignment

type Assignment struct {
	Partition int32
	Topic     string
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client helps instantiate usable readers and writers

func NewClient

func NewClient(conf Config, opts ...Option) *Client

NewClient instantiates a kafka client to get readers and writers

func (*Client) Close

func (c *Client) Close() error

Close terminates all cached readers and writers gracefully.

func (*Client) Reader

func (c *Client) Reader(_ context.Context, topicConfig ConsumerTopicConfig, opts ...ReaderOption) (Reader, error)

Reader gets a kafka consumer from the provided config, either from cache or from a new instance

func (*Client) Writer

func (c *Client) Writer(_ context.Context, topicConfig ProducerTopicConfig, opts ...WriterOption) (Writer, error)

Writer gets a kafka producer from the provided config, either from cache or from a new instance
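For illustration (topic, key, and value are placeholders, and ctx/client are assumed to exist), a writer obtained from the client might be used as follows:

writer, err := client.Writer(ctx, zkafka.ProducerTopicConfig{
	ClientID: "myclient",
	Topic:    "my-topic",
})
if err != nil {
	return err
}
// WriteKey marshals the value with the configured formatter (json by default)
// and produces it to the topic using the provided key.
resp, err := writer.WriteKey(ctx, "user-123", map[string]string{"hello": "world"})
if err != nil {
	return err
}
log.Printf("wrote partition=%d offset=%d", resp.Partition, resp.Offset)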

type ClientProvider

type ClientProvider interface {
	Reader(ctx context.Context, topicConfig ConsumerTopicConfig, opts ...ReaderOption) (Reader, error)
	Writer(ctx context.Context, topicConfig ProducerTopicConfig, opts ...WriterOption) (Writer, error)
	Close() error
}

ClientProvider is the convenient interface for kafka Client

type Config

type Config struct {
	// BootstrapServers is a list of broker addresses
	BootstrapServers []string

	// SaslUsername and SaslPassword for accessing Kafka Cluster
	SaslUsername *string
	SaslPassword *string

	// CAFile, KeyFile, CertFile are used to enable TLS with valid configuration
	// If not defined, TLS with InsecureSkipVerify=false is used.
	CAFile   string
	KeyFile  string
	CertFile string
}

Config holds configuration to create underlying kafka client

type ConsumerTopicConfig

type ConsumerTopicConfig struct {
	// ClientID is required and should be unique. This is used as a cache key for the client
	ClientID string

	// GroupID is required for observability per ZG Kafka Best Practices
	// http://analytics.pages.zgtools.net/data-engineering/data-infra/streamz/docs/#/guides/kafka-guidelines?id=observability
	// The convention is [team_name]/[service]/[group], e.g. concierge/search/index-reader
	GroupID string

	// Topic is the name of the topic to be consumed. At least one should be specified between the Topic and Topics attributes
	Topic string

	// Topics are the names of the topics to be consumed. At least one should be specified between the Topic and Topics attributes
	Topics []string

	// BootstrapServers are the addresses of the possible brokers to be connected to.
	// If not defined, Reader and Writer will attempt to use the brokers defined by the client
	BootstrapServers []string

	// AutoCommitIntervalMs is a setting which indicates how often offsets will be committed to the kafka broker.
	AutoCommitIntervalMs *int

	// AdditionalProps is defined as an escape hatch to specify properties not specified as strongly typed fields.
	// The values here will be overwritten by the values of TopicConfig fields if specified there as well.
	AdditionalProps map[string]interface{}

	// Formatter is json if not defined
	Formatter zfmt.FormatterType

	SchemaRegistry SchemaRegistryConfig

	// SchemaID defines the schema registered with Confluent schema Registry
	// Default value is 0, and it implies that both Writer and Reader do not care about schema validation
	// and should encode/decode the message based on data type provided.
	// Currently, this only works with SchematizedAvroFormatter
	SchemaID int

	// Enable kafka transaction, default to false
	Transaction bool

	// ReadTimeoutMillis specifies how much time, in milliseconds, before a kafka read times out (and error is returned)
	ReadTimeoutMillis *int

	// ProcessTimeoutMillis specifies how much time, in milliseconds,
	// is given to process a particular message before cancellation is called.
	// Default to 1 minute
	ProcessTimeoutMillis *int

	// SessionTimeoutMillis specifies how much time, in milliseconds,
	// is given by the broker, where in the absence of a heartbeat being successfully received from the consumer
	// group member, the member is considered failed (and a rebalance is initiated).
	// Defaults to 1 minute 1 second (just over default `ProcessTimeoutMillis`)
	SessionTimeoutMillis *int

	// MaxPollIntervalMillis specifies how much time, in milliseconds,
	// is given by the broker, where in the absence of `Read`/`Poll` being called by a consumer, the member is considered failed (and a rebalance is initiated).
	// Defaults to 1 minute 1 second (just over default `ProcessTimeoutMillis`)
	MaxPollIntervalMillis *int

	// ProcessDelayMillis specifies how much time, in milliseconds,
	// a virtual partition processor should pause prior to calling processor.
	// The specified duration represents the maximum pause a processor will execute. The virtual partition processor
	// uses the message's timestamp and its local estimate of `now` to determine
	// the observed delay. If the observed delay is less than the amount configured here,
	// an additional pause is executed.
	ProcessDelayMillis *int

	// SaslUsername and SaslPassword for accessing Kafka Cluster
	SaslUsername *string
	SaslPassword *string

	// DeadLetterTopicConfig allows you to specify a topic for which to write messages which failed during processing to
	DeadLetterTopicConfig *ProducerTopicConfig
}

ConsumerTopicConfig holds configuration to create reader for a kafka topic

func (ConsumerTopicConfig) GetFormatter

func (p ConsumerTopicConfig) GetFormatter() zfmt.FormatterType

func (ConsumerTopicConfig) GetSchemaID

func (p ConsumerTopicConfig) GetSchemaID() int

type DeserializationConfig

type DeserializationConfig struct {
	// Schema is used exclusively by the avro schema registry formatter today. It's necessary to provide proper schema evolution properties
	// expected by typical use cases.
	Schema string
}

type FakeClient

type FakeClient struct {
	R Reader
	W Writer
}

FakeClient is a convenience struct for testing purposes. It allows the specification of your own Reader/Writer while implementing the `ClientProvider` interface, which makes it compatible with a work factory.

func (FakeClient) Close

func (f FakeClient) Close() error

func (FakeClient) Reader

func (FakeClient) Writer

type FakeMessage

type FakeMessage struct {
	Key   *string
	Value []byte
	// ValueData allows the specification of a serializable instance which is run through the provided formatter
	// to create the message value. Any error during serialization is ignored.
	ValueData any
	DoneFunc  func(ctx context.Context)
	Headers   map[string][]byte
	Offset    int64
	Partition int32
	Topic     string
	GroupID   string
	TimeStamp time.Time
	Fmt       zfmt.Formatter
}

FakeMessage can be used during testing to construct Message objects. The Message object has private fields which might need to be tested

type Formatter

type Formatter interface {
	Marshall(v any) ([]byte, error)
	Unmarshal(b []byte, v any) error
}

Formatter allows the user to extend formatting capability to unsupported data types

type KReader

type KReader struct {
	// contains filtered or unexported fields
}

KReader is a Reader implementation which allows for the subscription to multiple topics. It provides methods for consuming messages from its subscribed topics and assigned partitions.

func (*KReader) Assignments

func (r *KReader) Assignments(_ context.Context) ([]Assignment, error)

Assignments returns the current partition assignments for the kafka consumer

func (*KReader) Close

func (r *KReader) Close() error

Close terminates the consumer. This will gracefully unsubscribe the consumer from the kafka topic (which includes properly revoking the assigned partitions)

func (*KReader) Read

func (r *KReader) Read(ctx context.Context) (*Message, error)

Read consumes a single message at a time. Blocks until a message is returned or some non-fatal error occurs in which case a nil message is returned

type KWriter

type KWriter struct {
	// contains filtered or unexported fields
}

KWriter is a kafka producer. KWriter should be initialized from the Client to be usable

func (*KWriter) Close

func (w *KWriter) Close()

Close terminates the writer gracefully and marks it as closed

func (*KWriter) Write

func (w *KWriter) Write(ctx context.Context, value any, opts ...WriteOption) (Response, error)

Write sends messages to kafka with message key set as nil. The value arg passed to this method is marshalled by the configured formatter and used as the kafka message's value

func (*KWriter) WriteKey

func (w *KWriter) WriteKey(ctx context.Context, key string, value any, opts ...WriteOption) (Response, error)

WriteKey sends a message to kafka with a defined key. The value arg passed to this method is marshalled by the configured formatter and used as the kafka message's value

func (*KWriter) WriteRaw

func (w *KWriter) WriteRaw(ctx context.Context, key *string, value []byte, opts ...WriteOption) (Response, error)

WriteRaw allows you to write messages using a lower level API than Write and WriteKey. WriteRaw doesn't use a formatter to marshall the value data; instead it takes the bytes as is and places them as the value of the kafka message. It's convenient for forwarding messages in dead letter operations.

type KafkaConsumer

type KafkaConsumer interface {
	SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) error
	ReadMessage(timeout time.Duration) (*kafka.Message, error)
	Commit() ([]kafka.TopicPartition, error)
	StoreOffsets(offsets []kafka.TopicPartition) (storedOffsets []kafka.TopicPartition, err error)
	Close() error
	Assignment() (partitions []kafka.TopicPartition, err error)
	AssignmentLost() bool
}

type KafkaProducer

type KafkaProducer interface {
	Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error
	Close()
}

type LifecycleHooks

type LifecycleHooks struct {
	// Called by work after reading a message (guaranteed non-nil); offers the ability to customize the context object (the resulting context object is passed to the work processor)
	PostRead func(ctx context.Context, meta LifecyclePostReadMeta) (context.Context, error)

	// Called by work immediately after an attempt to read a message. Msg might be nil, if there was an error
	// or no available messages.
	PostReadImmediate func(ctx context.Context, meta LifecyclePostReadImmediateMeta)

	// Called after receiving a message and before processing it.
	PreProcessing func(ctx context.Context, meta LifecyclePreProcessingMeta) (context.Context, error)

	// Called after processing a message
	PostProcessing func(ctx context.Context, meta LifecyclePostProcessingMeta) error

	// Called after sending a message to the queue
	PostAck func(ctx context.Context, meta LifecyclePostAckMeta) error

	// Called prior to executing write operation
	PreWrite func(ctx context.Context, meta LifecyclePreWriteMeta) (LifecyclePreWriteResp, error)

	// Called after the reader attempts a fanOut call.
	PostFanout func(ctx context.Context)
}

func ChainLifecycleHooks

func ChainLifecycleHooks(hooks ...LifecycleHooks) LifecycleHooks

ChainLifecycleHooks chains multiple lifecycle hooks into one. The hooks are called in the order they are passed. All hooks are called, even when errors occur. Errors are accumulated in a wrapper error and returned to the caller.
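An illustrative chaining of two hook sets (the logging and metrics bodies are placeholders, and client is assumed to exist), wired into a work factory:

hooks := zkafka.ChainLifecycleHooks(
	zkafka.LifecycleHooks{
		PostProcessing: func(ctx context.Context, meta zkafka.LifecyclePostProcessingMeta) error {
			log.Printf("processed topic=%s took=%s err=%v", meta.Topic, meta.ProcessingTime, meta.ResponseErr)
			return nil
		},
	},
	zkafka.LifecycleHooks{
		PostProcessing: func(ctx context.Context, meta zkafka.LifecyclePostProcessingMeta) error {
			// e.g. record meta.ProcessingTime in a metrics histogram here
			return nil
		},
	},
)
wf := zkafka.NewWorkFactory(client, zkafka.WithWorkLifecycleHooks(hooks))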

type LifecycleHooksOption

type LifecycleHooksOption struct {
	// contains filtered or unexported fields
}

type LifecyclePostAckMeta

type LifecyclePostAckMeta struct {
	Topic string
	// Time when the message was published to the queue
	ProduceTime time.Time
}

type LifecyclePostProcessingMeta

type LifecyclePostProcessingMeta struct {
	Topic                 string
	GroupID               string
	VirtualPartitionIndex int
	// Time taken to process the message
	ProcessingTime time.Duration
	// Message processed
	Msg *Message
	// Response code returned by the processor
	ResponseErr error
}

type LifecyclePostReadImmediateMeta

type LifecyclePostReadImmediateMeta struct {
	// Message that was read (could be nil)
	Message *Message
	Err     error
}

type LifecyclePostReadMeta

type LifecyclePostReadMeta struct {
	Topic   string
	GroupID string
	// Message that was read (will be non nil)
	Message *Message
}

type LifecyclePreProcessingMeta

type LifecyclePreProcessingMeta struct {
	Topic                 string
	GroupID               string
	VirtualPartitionIndex int
	// Time since the message was sent to the topic
	TopicLag time.Duration
	// Message being processed
	Message *Message
}

type LifecyclePreWriteMeta

type LifecyclePreWriteMeta struct{}

type LifecyclePreWriteResp

type LifecyclePreWriteResp struct {
	Headers map[string][]byte
}

type Logger

type Logger interface {
	Debugw(ctx context.Context, msg string, keysAndValues ...any)
	Infow(ctx context.Context, msg string, keysAndValues ...any)
	Warnw(ctx context.Context, msg string, keysAndValues ...any)
	Errorw(ctx context.Context, msg string, keysAndValues ...any)
}

Logger is the interface that wraps basic logging functions

type Message

type Message struct {
	Key string

	Headers   map[string][]byte
	Offset    int64
	Partition int32
	Topic     string
	GroupID   string
	TimeStamp time.Time
	// contains filtered or unexported fields
}

Message is a container for kafka message

func GetFakeMessage deprecated

func GetFakeMessage(key string, value any, fmt zfmt.Formatter, doneFunc func()) *Message

GetFakeMessage is a helper method for creating a *Message instance.

Deprecated: As of v1.0.0, Prefer `GetMsgFromFake()`

func GetMsgFromFake

func GetMsgFromFake(input *FakeMessage) *Message

GetMsgFromFake allows the construction of a Message object (allowing the specification of some private fields).
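A minimal testing sketch (field values are placeholders; Processor refers to a user-defined processor such as the one sketched in the README above):

key := "user-123"
msg := zkafka.GetMsgFromFake(&zkafka.FakeMessage{
	Key:       &key,
	Value:     []byte(`{"id":"1","name":"example"}`),
	Topic:     "my-topic",
	Partition: 1,
	Offset:    42,
})
// If the processor calls msg.Decode, also set Fmt to an appropriate zfmt.Formatter.
// Exercise the processor directly with the constructed message.
if err := (&Processor{}).Process(context.Background(), msg); err != nil {
	// assert on the error in your test
}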

func (*Message) Decode

func (m *Message) Decode(v any) error

Decode reads message data and stores it in the value pointed to by v.

func (*Message) Done

func (m *Message) Done()

Done is used to alert that message processing has completed. This marks the message offset to be committed

func (*Message) DoneWithContext

func (m *Message) DoneWithContext(ctx context.Context)

DoneWithContext is used to alert that message processing has completed. This marks the message offset to be committed

func (*Message) Value

func (m *Message) Value() []byte

Value returns a copy of the current value byte array. Useful for debugging

type NoopLogger

type NoopLogger struct{}

func (NoopLogger) Debugw

func (l NoopLogger) Debugw(_ context.Context, _ string, _ ...any)

func (NoopLogger) Errorw

func (l NoopLogger) Errorw(_ context.Context, _ string, _ ...any)

func (NoopLogger) Infow

func (l NoopLogger) Infow(_ context.Context, _ string, _ ...any)

func (NoopLogger) Warnw

func (l NoopLogger) Warnw(_ context.Context, _ string, _ ...any)

type Option

type Option func(*Client)

Option is a function that modifies the client configurations

func KafkaGroupPrefixOption

func KafkaGroupPrefixOption(prefix string) Option

KafkaGroupPrefixOption creates a groupPrefix which will be added to all client and producer groupID strings if created after this option is added

func LoggerOption

func LoggerOption(logger Logger) Option

LoggerOption applies logger to the client and to all writers/readers which are created after this call.

func WithClientLifecycleHooks

func WithClientLifecycleHooks(h LifecycleHooks) Option

func WithClientTextMapPropagator

func WithClientTextMapPropagator(p propagation.TextMapPropagator) Option

WithClientTextMapPropagator applies an otel propagator to the client and to all writers/readers which are created

func WithClientTracerProviderOption

func WithClientTracerProviderOption(tp trace.TracerProvider) Option

WithClientTracerProviderOption applies an otel tracer provider to the client and to all writers/readers which are created

func WithConsumerProvider

func WithConsumerProvider(provider func(config map[string]any) (KafkaConsumer, error)) Option

WithConsumerProvider allows for the specification of a factory which is responsible for returning a KafkaConsumer given a config map.

func WithProducerProvider

func WithProducerProvider(provider func(config map[string]any) (KafkaProducer, error)) Option

WithProducerProvider allows for the specification of a factory which is responsible for returning a KafkaProducer given a config map.

type ProcessError

type ProcessError struct {
	// Err is the actual error that the processor encountered.
	Err error
	// DisableCircuitBreak indicates that this error should be ignored for
	// purposes of managing the circuit breaker. Any returned errors where
	// this is set to true will not cause the processing of messages to slow.
	DisableCircuitBreak bool
	// DisableDLTWrite indicates that this message should not be written to
	// a dead letter topic (if one is configured) as it cannot be retried
	// successfully.
	DisableDLTWrite bool
}

ProcessError wraps an error that a processor encounters, while also exposing controls that allow for specifying how the error should be handled.

func (ProcessError) Error

func (p ProcessError) Error() string

func (ProcessError) Unwrap

func (p ProcessError) Unwrap() error

type ProducerTopicConfig

type ProducerTopicConfig struct {
	// ClientID is required and should be unique. This is used as a cache key for the client
	ClientID string

	// Topic is required
	Topic string

	// BootstrapServers are the addresses of the possible brokers to be connected to.
	// If not defined, Reader and Writer will attempt to use the brokers defined by the client
	BootstrapServers []string

	// DeliveryTimeoutMs is a librdkafka setting. Local message timeout.
	// This value is only enforced locally and limits the time a produced message waits for successful delivery.
	// A time of 0 is infinite. This is the maximum time librdkafka may use to deliver a message (including retries).
	// Delivery error occurs when either the retry count or the message timeout are exceeded.
	// See defaults in librdkafka configuration
	DeliveryTimeoutMs *int

	// AdditionalProps is defined as an escape hatch to specify properties not specified as strongly typed fields.
	// The values here will be overwritten by the values of TopicConfig fields if specified there as well.
	AdditionalProps map[string]interface{}

	// Formatter is json if not defined
	Formatter zfmt.FormatterType

	// SchemaRegistry provides details about connecting to a schema registry including URL
	// as well as others.
	SchemaRegistry SchemaRegistryConfig

	// SchemaID defines the schema registered with Confluent schema Registry
	// Default value is 0, and it implies that both Writer and Reader do not care about schema validation
	// and should encode/decode the message based on data type provided.
	// Currently, this only works with SchematizedAvroFormatter
	SchemaID int

	// Enable kafka transaction, default to false
	Transaction bool

	// RequestRequiredAcks indicates the number of acknowledgments the leader broker must receive from In Sync Replica (ISR) brokers before responding
	// to the request (0=Broker does not send any response to client, -1 or all=broker blocks until all ISRs commit)
	RequestRequiredAcks *string

	// EnableIdempotence When set to true, the producer will ensure that messages are successfully produced exactly once and in the original produce order.
	// Default to true
	EnableIdempotence *bool

	// LingerMillis specifies the delay, in milliseconds, to wait for messages in the producer to accumulate before constructing message batches.
	LingerMillis int

	NagleDisable *bool

	// SaslUsername and SaslPassword for accessing Kafka Cluster
	SaslUsername *string
	SaslPassword *string
}

ProducerTopicConfig holds configuration to create writer to kafka topic

func (ProducerTopicConfig) GetFormatter

func (p ProducerTopicConfig) GetFormatter() zfmt.FormatterType

func (ProducerTopicConfig) GetSchemaID

func (p ProducerTopicConfig) GetSchemaID() int

type Reader

type Reader interface {
	Read(ctx context.Context) (*Message, error)
	Close() error
}

Reader is the convenient interface for kafka KReader

type ReaderOption

type ReaderOption func(*ReaderSettings)

ReaderOption is a function that modifies the KReader configurations

func RFormatterOption

func RFormatterOption(formatter Formatter) ReaderOption

RFormatterOption sets the formatter for this reader

type ReaderSettings

type ReaderSettings struct {
	// contains filtered or unexported fields
}

type Response

type Response struct {
	Partition int32
	Offset    int64
}

Response is a kafka response with the Partition where message was sent to along with its assigned Offset

type SchemaRegistryConfig

type SchemaRegistryConfig struct {
	// URL is the schema registry URL. During serialization and deserialization
	// the schema registry is checked to confirm schema compatibility.
	URL string
	// Serialization provides additional information used by schema registry formatters during serialization (data write)
	Serialization SerializationConfig
	// Deserialization provides additional information used by schema registry formatters during deserialization (data read)
	Deserialization DeserializationConfig
	// SubjectName allows the specification of the SubjectName. If not specified defaults to [topic name strategy](https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#subject-name-strategy)
	SubjectName string
}

type SerializationConfig

type SerializationConfig struct {
	// AutoRegisterSchemas indicates whether new schemas (those that evolve existing schemas or are brand new) should be registered
	// with schema registry dynamically. This feature is typically not used for production workloads
	AutoRegisterSchemas bool
	// Schema is used exclusively by the avro schema registry formatter today. It's necessary to provide proper schema evolution properties
	// expected by typical use cases.
	Schema string
}

type TopicPartition

type TopicPartition struct {
	Partition int32
	Offset    int64
}

type Work

type Work struct {
	// contains filtered or unexported fields
}

Work has a single public method `Run()` which continuously reads and processes messages from the topic (or topics) it is registered to listen to. `Run()` executes the following steps:

  1. Read a kafka.Message using the provided reader.
  2. Select the virtual partition pool allocated for a specific topic
  3. Select and write the `kafka.Message` to the pool's virtual partition based on a hash of the `kafka.Message.Key` (virtual partition selection)
  4. A goroutine is assigned for each virtual partition. Its responsibility is to continuously read from its virtual partition, call the Process callback function, and then store the offset of the message.

Additional responsibilities include:

  1. Logging
  2. Executing callbacks of special lifecycle events (useful for observability like tracing/metrics)
  3. Tripping circuit breaker for error conditions
  4. Writing to dead letter topic, when configured.

func (*Work) Run

func (w *Work) Run(ctx context.Context, shutdown <-chan struct{}) error

Run executes a pipeline with a single reader (possibly subscribed to multiple topics) fanning out read messages to virtual partitions (preserving message order) and subsequently being processed by the registered processor (user code which executes per kafka message).

It returns either after context.Context cancellation or after receiving a shutdown signal from settings (both of which will cause the awaited execReader and execProcessor methods to return).

type WorkFactory

type WorkFactory struct {
	// contains filtered or unexported fields
}

WorkFactory creates a work object which reads messages from kafka topic and processes messages concurrently.

func NewWorkFactory

func NewWorkFactory(
	kafkaProvider ClientProvider,
	options ...WorkFactoryOption,
) WorkFactory

NewWorkFactory initializes a new WorkFactory

func (WorkFactory) Create

func (f WorkFactory) Create(topicConfig ConsumerTopicConfig, processor processor, options ...WorkOption) *Work

Create creates a new Work instance.

func (WorkFactory) CreateWithFunc

func (f WorkFactory) CreateWithFunc(topicConfig ConsumerTopicConfig, p func(_ context.Context, msg *Message) error, options ...WorkOption) *Work

CreateWithFunc creates a new Work instance, but allows for the processor to be specified as a callback function instead of an interface

type WorkFactoryOption

type WorkFactoryOption interface {
	// contains filtered or unexported methods
}

WorkFactoryOption interface to identify functional options

func WithLogger

func WithLogger(l Logger) WorkFactoryOption

WithLogger provides option to override the logger to use. default is noop

func WithTextMapPropagator

func WithTextMapPropagator(p propagation.TextMapPropagator) WorkFactoryOption

WithTextMapPropagator provides option to specify the otel text map propagator used by the created work object. Defaults to nil (which means no propagation of trace context across transports)

func WithTracerProvider

func WithTracerProvider(tp trace.TracerProvider) WorkFactoryOption

WithTracerProvider provides option to specify the otel tracer provider used by the created work object. Defaults to nil (which means no tracing)
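For example, assuming an OpenTelemetry setup elsewhere in the application (the propagator choice is illustrative and client is assumed to exist):

wf := zkafka.NewWorkFactory(
	client,
	zkafka.WithTracerProvider(otel.GetTracerProvider()),
	zkafka.WithTextMapPropagator(propagation.TraceContext{}),
)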

func WithWorkLifecycleHooks

func WithWorkLifecycleHooks(h LifecycleHooks) WorkFactoryOption

WithWorkLifecycleHooks provides option to override the lifecycle hooks. Default is noop.

type WorkOption

type WorkOption interface {
	// contains filtered or unexported methods
}

WorkOption interface to identify functional options

func CircuitBreakAfter

func CircuitBreakAfter(times uint32) WorkOption

CircuitBreakAfter breaks the circuit after this many consecutive failures

func CircuitBreakFor

func CircuitBreakFor(duration time.Duration) WorkOption

CircuitBreakFor sets the duration for which to keep the circuit open once broken

func DisableBusyLoopBreaker deprecated

func DisableBusyLoopBreaker() WorkOption

Deprecated: DisableBusyLoopBreaker disables the busy loop breaker, which would otherwise block subsequent read calls until the circuit re-closes. Without the busy loop breaker, increased CPU usage is observed when the circuit is open

func DisableCircuitBreaker deprecated

func DisableCircuitBreaker() WorkOption

Deprecated: DisableCircuitBreaker disables the circuit breaker so that it never breaks

func Speedup

func Speedup(times uint16) WorkOption

Speedup increases the concurrencyFactor for a worker. concurrencyFactor is how many goroutines can be running in parallel. NOTE: it's strongly recommended to add more worker instances rather than using this option to speed up each worker.

func WithDeadLetterTopic

func WithDeadLetterTopic(deadLetterTopicConfig ProducerTopicConfig) WorkOption

WithDeadLetterTopic allows you to specify a dead letter topic to forward messages to when work processing fails

func WithDisableBusyLoopBreaker

func WithDisableBusyLoopBreaker(isDisabled bool) WorkOption

WithDisableBusyLoopBreaker disables the busy loop breaker, which would otherwise block subsequent read calls until the circuit re-closes. Without the busy loop breaker, increased CPU usage is observed when the circuit is open

func WithDisableCircuitBreaker

func WithDisableCircuitBreaker(isDisabled bool) WorkOption

WithDisableCircuitBreaker allows the user to control whether circuit breaker is disabled or not

func WithLifecycleHooks

func WithLifecycleHooks(h LifecycleHooks) WorkOption

func WithOnDone

func WithOnDone(f func(ctx context.Context, message *Message, err error)) WorkOption

WithOnDone allows you to specify a callback function executed after processing of a kafka message

func WithWorkTextMapPropagator added in v2.2.0

func WithWorkTextMapPropagator(p propagation.TextMapPropagator) WorkOption

WithWorkTextMapPropagator enables the specification of a propagator at the moment a work instance is instantiated

type WriteOption

type WriteOption interface {
	// contains filtered or unexported methods
}

WriteOption is a function that modifies the kafka.Message to be transmitted

func WithHeaders

func WithHeaders(headers map[string]string) WriteOption

WithHeaders allows for the specification of headers. Specified headers will override collisions.

type Writer

type Writer interface {
	// Write sends messages to kafka with message key set as nil.
	// The value arg passed to this method is marshalled by
	// the configured formatter and used as the kafka message's value
	Write(ctx context.Context, value any, opts ...WriteOption) (Response, error)
	// WriteKey send message to kafka with a defined keys.
	// The value arg passed to this method is marshalled by
	// the configured formatter and used as the kafka message's value
	WriteKey(ctx context.Context, key string, value any, opts ...WriteOption) (Response, error)
	// WriteRaw sends messages to kafka. The caller is responsible for marshalling the data themselves.
	WriteRaw(ctx context.Context, key *string, value []byte, opts ...WriteOption) (Response, error)
	Close()
}

Writer is the convenient interface for kafka KWriter

type WriterOption

type WriterOption func(*WriterSettings)

WriterOption is a function that modifies the writer configurations

func WFormatterOption

func WFormatterOption(f Formatter) WriterOption

WFormatterOption sets the formatter for this writer

type WriterSettings

type WriterSettings struct {
	DisableTracePropagation bool
	// contains filtered or unexported fields
}

Directories

Path Synopsis
example
Package mock_zkafka is a generated GoMock package.
confluent
Package mock_confluent is a generated GoMock package.
test
evolution/avro1
Code generated by avro/gen.
evolution/avro2
Code generated by avro/gen.
