kafka

package module
v0.4.3
Published: May 8, 2020 License: MIT Imports: 38 Imported by: 0

README

kafka-go

Motivations

We rely on both Go and Kafka a lot at Segment. Unfortunately, the state of the Go client libraries for Kafka at the time of this writing was not ideal. The available options were:

  • sarama, which is by far the most popular but is quite difficult to work with. It is poorly documented, the API exposes low level concepts of the Kafka protocol, and it doesn't support recent Go features like contexts. It also passes all values as pointers which causes large numbers of dynamic memory allocations, more frequent garbage collections, and higher memory usage.

  • confluent-kafka-go is a cgo based wrapper around librdkafka, which means it introduces a dependency on a C library in all Go code that uses the package. It has much better documentation than sarama but still lacks support for Go contexts.

  • goka is a more recent Kafka client for Go which focuses on a specific usage pattern. It provides abstractions for using Kafka as a message passing bus between services rather than an ordered log of events, but this is not the typical use case of Kafka for us at Segment. The package also depends on sarama for all interactions with Kafka.

This is where kafka-go comes into play. It provides both low and high level APIs for interacting with Kafka, mirroring concepts and implementing interfaces of the Go standard library to make it easy to use and integrate with existing software.

Migrating to 0.4

Version 0.4 introduces a few breaking changes to the repository structure which should have minimal impact on programs and should only manifest at compile time (the runtime behavior should remain unchanged).

  • Programs do not need to import compression packages anymore in order to read compressed messages from kafka. All compression codecs are supported by default.

  • Programs that used the compression codecs directly must be adapted. Compression codecs are now exposed in the compress sub-package.

  • The experimental kafka.Client API has been updated and slightly modified: the kafka.NewClient function and kafka.ClientConfig type were removed. Programs now configure the client values directly through exported fields.

  • The kafka.(*Client).ConsumerOffsets method is now deprecated (along with the kafka.TopicAndGroup type) and will be removed when we release version 1.0. Programs should use the kafka.(*Client).OffsetFetch API instead.

With 0.4, we know that we are starting to introduce a bit more complexity in the code, but the plan is to eventually converge towards a simpler and more effective API, allowing us to keep up with Kafka's ever growing feature set, and bringing a more efficient implementation to programs depending on kafka-go.

We truly appreciate everyone's input and contributions, which have made this project way more than what it was when we started it, and we're looking forward to receiving more feedback on where we should take it.

Kafka versions

kafka-go is currently compatible with Kafka versions from 0.10.1.0 to 2.1.0. While later versions should work, some features available in the Kafka API may not be implemented yet.

Golang version

kafka-go is currently compatible with Go 1.12 and later. To use it with older versions of Go, use release v0.2.5.

Connection

The Conn type is the core of the kafka-go package. It wraps around a raw network connection to expose a low-level API to a Kafka server.

Here are some examples showing typical use of a connection object:

// to produce messages
topic := "my-topic"
partition := 0

conn, _ := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)

conn.SetWriteDeadline(time.Now().Add(10*time.Second))
conn.WriteMessages(
    kafka.Message{Value: []byte("one!")},
    kafka.Message{Value: []byte("two!")},
    kafka.Message{Value: []byte("three!")},
)

conn.Close()

// to consume messages
topic := "my-topic"
partition := 0

conn, _ := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)

conn.SetReadDeadline(time.Now().Add(10*time.Second))
batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

b := make([]byte, 10e3) // 10KB max per message
for {
    n, err := batch.Read(b)
    if err != nil {
        break
    }
    fmt.Println(string(b[:n])) // print only the bytes that were read
}

batch.Close()
conn.Close()

Because it is low level, the Conn type turns out to be a great building block for higher level abstractions, like the Reader for example.

Reader

A Reader is another concept exposed by the kafka-go package, which intends to make it simpler to implement the typical use case of consuming from a single topic-partition pair. A Reader also automatically handles reconnections and offset management, and exposes an API that supports asynchronous cancellations and timeouts using Go contexts.

// make a new reader that consumes from topic-A, partition 0, at offset 42
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"localhost:9092"},
    Topic:     "topic-A",
    Partition: 0,
    MinBytes:  10e3, // 10KB
    MaxBytes:  10e6, // 10MB
})
r.SetOffset(42)

for {
    m, err := r.ReadMessage(context.Background())
    if err != nil {
        break
    }
    fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}

r.Close()

Consumer Groups

kafka-go also supports Kafka consumer groups including broker managed offsets. To enable consumer groups, simply specify the GroupID in the ReaderConfig.

ReadMessage automatically commits offsets when using consumer groups.

// make a new reader that consumes from topic-A
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"localhost:9092"},
    GroupID:   "consumer-group-id",
    Topic:     "topic-A",
    MinBytes:  10e3, // 10KB
    MaxBytes:  10e6, // 10MB
})

for {
    m, err := r.ReadMessage(context.Background())
    if err != nil {
        break
    }
    fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
}

r.Close()

There are a number of limitations when using consumer groups:

  • (*Reader).SetOffset will return an error when GroupID is set
  • (*Reader).Offset will always return -1 when GroupID is set
  • (*Reader).Lag will always return -1 when GroupID is set
  • (*Reader).ReadLag will return an error when GroupID is set
  • (*Reader).Stats will return a partition of -1 when GroupID is set

Explicit Commits

kafka-go also supports explicit commits. Instead of calling ReadMessage, call FetchMessage followed by CommitMessages.

ctx := context.Background()
for {
    m, err := r.FetchMessage(ctx)
    if err != nil {
        break
    }
    fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
    r.CommitMessages(ctx, m)
}
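The fetch-then-commit pattern above can be sketched without a broker. The version below is a minimal illustration, assuming a hypothetical in-memory fakeReader and a trimmed-down Message type (neither is part of the package); the fetchCommitter interface only lists the two Reader methods used in the loop. It shows that a message whose handler fails is left uncommitted.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Message is a trimmed-down stand-in for kafka.Message (assumption).
type Message struct {
	Offset int64
	Value  []byte
}

// fetchCommitter lists the two Reader methods used by the loop above.
type fetchCommitter interface {
	FetchMessage(ctx context.Context) (Message, error)
	CommitMessages(ctx context.Context, msgs ...Message) error
}

// fakeReader is a hypothetical in-memory implementation used only to make
// the sketch runnable.
type fakeReader struct {
	pending   []Message
	committed []int64
}

func (r *fakeReader) FetchMessage(ctx context.Context) (Message, error) {
	if len(r.pending) == 0 {
		return Message{}, errors.New("no more messages")
	}
	m := r.pending[0]
	r.pending = r.pending[1:]
	return m, nil
}

func (r *fakeReader) CommitMessages(ctx context.Context, msgs ...Message) error {
	for _, m := range msgs {
		r.committed = append(r.committed, m.Offset)
	}
	return nil
}

// consume commits a message only after handle succeeded for it.
func consume(ctx context.Context, r fetchCommitter, handle func(Message) error) error {
	for {
		m, err := r.FetchMessage(ctx)
		if err != nil {
			return err
		}
		if err := handle(m); err != nil {
			return err // m is left uncommitted
		}
		if err := r.CommitMessages(ctx, m); err != nil {
			return err
		}
	}
}

// committedBeforeFailure runs consume over offsets 0..3 with a handler that
// fails at offset 2 and returns the offsets that were committed.
func committedBeforeFailure() []int64 {
	r := &fakeReader{}
	for i := int64(0); i < 4; i++ {
		r.pending = append(r.pending, Message{Offset: i})
	}
	consume(context.Background(), r, func(m Message) error {
		if m.Offset == 2 {
			return errors.New("handler failed")
		}
		return nil
	})
	return r.committed
}

func main() {
	fmt.Println(committedBeforeFailure())
}
```

Unlike the loop above, the sketch checks the error returned by CommitMessages, which is usually worth doing in real programs.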
Managing Commits

By default, CommitMessages will synchronously commit offsets to Kafka. For improved performance, you can instead periodically commit offsets to Kafka by setting CommitInterval on the ReaderConfig.

// make a new reader that consumes from topic-A
r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9092"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    MinBytes:       10e3, // 10KB
    MaxBytes:       10e6, // 10MB
    CommitInterval: time.Second, // flushes commits to Kafka every second
})

Writer

To produce messages to Kafka, a program may use the low-level Conn API, but the package also provides a higher level Writer type which is more appropriate to use in most cases as it provides additional features:

  • Automatic retries and reconnections on errors.
  • Configurable distribution of messages across available partitions.
  • Synchronous or asynchronous writes of messages to Kafka.
  • Asynchronous cancellation using contexts.
  • Flushing of pending messages on close to support graceful shutdowns.

// make a writer that produces to topic-A, using the least-bytes distribution
w := kafka.NewWriter(kafka.WriterConfig{
	Brokers: []string{"localhost:9092"},
	Topic:   "topic-A",
	Balancer: &kafka.LeastBytes{},
})

w.WriteMessages(context.Background(),
	kafka.Message{
		Key:   []byte("Key-A"),
		Value: []byte("Hello World!"),
	},
	kafka.Message{
		Key:   []byte("Key-B"),
		Value: []byte("One!"),
	},
	kafka.Message{
		Key:   []byte("Key-C"),
		Value: []byte("Two!"),
	},
)

w.Close()

Note: Even though kafka.Message contains Topic and Partition fields, they MUST NOT be set when writing messages. They are intended for read use only.

Compatibility with other clients

Sarama

If you're switching from Sarama and need/want to use the same algorithm for message partitioning, you can use the kafka.Hash balancer. kafka.Hash routes messages to the same partitions that Sarama's default partitioner would route to.

w := kafka.NewWriter(kafka.WriterConfig{
	Brokers:  []string{"localhost:9092"},
	Topic:    "topic-A",
	Balancer: &kafka.Hash{},
})
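To make the idea of key-based partitioning concrete, here is a minimal standalone sketch. It assumes that Sarama's default partitioner hashes the key with 32-bit FNV-1a, converts the sum to a signed 32-bit integer, and takes the absolute remainder; that is an assumption about Sarama's behavior, and fnvPartition is a hypothetical helper, not the implementation of kafka.Hash.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// fnvPartition is a hypothetical helper that picks a partition for a key
// using 32-bit FNV-1a. The signed conversion and the absolute value are
// assumptions about how a Sarama-compatible partitioner behaves.
// partitions must not be empty.
func fnvPartition(key []byte, partitions ...int) int {
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	idx := int32(h.Sum32()) % int32(len(partitions))
	if idx < 0 {
		idx = -idx
	}
	return partitions[idx]
}

func main() {
	// The same key always maps to the same partition.
	fmt.Println(fnvPartition([]byte("Key-A"), 0, 1, 2, 3))
	fmt.Println(fnvPartition([]byte("Key-A"), 0, 1, 2, 3))
}
```

The signed conversion matters: a client that used the unsigned sum directly would pick different partitions for some keys, which is why matching the other client's exact arithmetic is the point of a compatibility balancer.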
librdkafka and confluent-kafka-go

Use the kafka.CRC32Balancer balancer to get the same behaviour as librdkafka's default consistent_random partition strategy.

w := kafka.NewWriter(kafka.WriterConfig{
	Brokers:  []string{"localhost:9092"},
	Topic:    "topic-A",
	Balancer: kafka.CRC32Balancer{},
})

Java

Use the kafka.Murmur2Balancer balancer to get the same behaviour as the canonical Java client's default partitioner. Note: the Java class allows you to directly specify the partition, which is not permitted here.

w := kafka.NewWriter(kafka.WriterConfig{
	Brokers:  []string{"localhost:9092"},
	Topic:    "topic-A",
	Balancer: kafka.Murmur2Balancer{},
})

Compression

Compression can be enabled on the Writer by configuring the CompressionCodec:

import (
    "github.com/DarkMristov/kafka-go/compress/snappy"
)

...

w := kafka.NewWriter(kafka.WriterConfig{
	Brokers: []string{"localhost:9092"},
	Topic:   "topic-A",
	CompressionCodec: snappy.NewCompressionCodec(),
})

The Reader determines whether the consumed messages are compressed by examining the message attributes.

Note: in versions prior to 0.4, programs had to import the packages for all expected codecs in order to install them and support reading compressed messages from kafka. This is no longer the case: all compression codecs are supported by default, and imports of the compression packages are now no-ops.

TLS Support

For a bare bones Conn type or in the Reader/Writer configs you can specify a dialer option for TLS support. If the TLS field is nil, it will not connect with TLS.

Connection

dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{...tls config...},
}

conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")

Reader

dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{...tls config...},
}

r := kafka.NewReader(kafka.ReaderConfig{
    Brokers:        []string{"localhost:9093"},
    GroupID:        "consumer-group-id",
    Topic:          "topic-A",
    Dialer:         dialer,
})

Writer

dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
    TLS:       &tls.Config{...tls config...},
}

w := kafka.NewWriter(kafka.WriterConfig{
	Brokers: []string{"localhost:9093"},
	Topic:   "topic-A",
	Balancer: &kafka.Hash{},
	Dialer:   dialer,
})

Documentation

Constants

const (
	SeekStart    = 0 // Seek relative to the first offset available in the partition.
	SeekAbsolute = 1 // Seek to an absolute offset.
	SeekEnd      = 2 // Seek relative to the last offset available in the partition.
	SeekCurrent  = 3 // Seek relative to the current offset.

	// This flag may be combined to any of the SeekAbsolute and SeekCurrent
	// constants to skip the bound check that the connection would do otherwise.
	// Programs can use this flag to avoid making a metadata request to the kafka
	// broker to read the current first and last offsets of the partition.
	SeekDontCheck = 1 << 30
)
const (
	LastOffset  int64 = -1 // The most recent offset available for a partition.
	FirstOffset int64 = -2 // The least recent offset available for a partition.
)
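As a rough illustration of how the Seek constants could be interpreted, the sketch below resolves an (offset, whence) pair into an absolute offset. The resolveOffset helper and the lowercase constants are hypothetical local copies, not part of the package. The sketch assumes that SeekEnd subtracts the offset from the last offset and that the bound check accepts any offset between the first and last offsets inclusive.

```go
package main

import "fmt"

// Local copies of the whence values documented above, so that the sketch
// compiles without importing the package.
const (
	seekStart     = 0
	seekAbsolute  = 1
	seekEnd       = 2
	seekCurrent   = 3
	seekDontCheck = 1 << 30
)

// resolveOffset is a hypothetical helper that turns an (offset, whence) pair
// into an absolute offset. It reports false when whence is unknown or when
// the result falls outside [first, last] and the check was not skipped.
func resolveOffset(first, last, current, offset int64, whence int) (int64, bool) {
	skipCheck := whence&seekDontCheck != 0

	switch whence &^ seekDontCheck {
	case seekStart:
		offset = first + offset
	case seekAbsolute:
		// offset is already absolute
	case seekEnd:
		offset = last - offset // assumption: counted back from the last offset
	case seekCurrent:
		offset = current + offset
	default:
		return 0, false
	}

	if !skipCheck && (offset < first || offset > last) {
		return 0, false
	}
	return offset, true
}

func main() {
	// A partition holding offsets 100..200, with the connection at 150.
	fmt.Println(resolveOffset(100, 200, 150, 5, seekStart))
	fmt.Println(resolveOffset(100, 200, 150, 5, seekEnd))
	fmt.Println(resolveOffset(100, 200, 150, 500, seekAbsolute))
	fmt.Println(resolveOffset(100, 200, 150, 500, seekAbsolute|seekDontCheck))
}
```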

Variables

var (
	// DefaultClientID is the default value used as ClientID of kafka
	// connections.
	DefaultClientID string
)
var DefaultDialer = &Dialer{
	Timeout:   10 * time.Second,
	DualStack: true,
}

DefaultDialer is the default dialer used when none is specified.

var ErrGenerationEnded = errors.New("consumer group generation has ended")

ErrGenerationEnded is returned by the context.Context issued by the Generation's Start function when the context has been closed.

var ErrGroupClosed = errors.New("consumer group is closed")

ErrGroupClosed is returned by ConsumerGroup.Next when the group has already been closed.

Functions

func Addr added in v0.4.2

func Addr(network, address string) net.Addr

Addr returns a net.Addr from a pair of network and address.

func TCP added in v0.4.2

func TCP(address string) net.Addr

TCP constructs an address with the network set to "tcp".

func TLS added in v0.4.2

func TLS(address string) net.Addr

TLS constructs an address with the network set to "tls".

Types

type ApiVersion added in v0.4.2

type ApiVersion struct {
	ApiKey     int16
	MinVersion int16
	MaxVersion int16
}

func (ApiVersion) Format added in v0.4.2

func (v ApiVersion) Format(w fmt.State, r rune)

type Balancer

type Balancer interface {
	// Balance receives a message and a set of available partitions and
	// returns the partition number that the message should be routed to.
	//
	// An application should refrain from using a balancer to manage multiple
	// sets of partitions (from different topics, for example). Use one balancer
	// instance for each partition set, so the balancer can detect when the
	// partitions change and assume that the kafka topic has been rebalanced.
	Balance(msg Message, partitions ...int) (partition int)
}

The Balancer interface provides an abstraction of the message distribution logic used by Writer instances to route messages to the partitions available on a kafka cluster.

Instances of Balancer do not have to be safe to use concurrently by multiple goroutines; the Writer implementation ensures that calls to Balance are synchronized.

type BalancerFunc

type BalancerFunc func(Message, ...int) int

BalancerFunc is an implementation of the Balancer interface that makes it possible to use regular functions to distribute messages across partitions.

func (BalancerFunc) Balance

func (f BalancerFunc) Balance(msg Message, partitions ...int) int

Balance calls f, satisfies the Balancer interface.
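As an illustration of the Balancer interface and the BalancerFunc adapter, here is a minimal round-robin balancer. The Message, Balancer and BalancerFunc declarations are local copies made so the sketch compiles on its own, and newRoundRobin is a hypothetical helper. It keeps unsynchronized state, relying on the guarantee stated above that calls to Balance are synchronized by the Writer, and it assumes partitions is never empty.

```go
package main

import "fmt"

// Message is a trimmed-down stand-in for kafka.Message (assumption).
type Message struct {
	Key   []byte
	Value []byte
}

// Balancer and BalancerFunc mirror the declarations shown above.
type Balancer interface {
	Balance(msg Message, partitions ...int) (partition int)
}

type BalancerFunc func(Message, ...int) int

func (f BalancerFunc) Balance(msg Message, partitions ...int) int {
	return f(msg, partitions...)
}

// newRoundRobin is a hypothetical helper returning a balancer that cycles
// through the available partitions, ignoring message keys. The counter is
// not protected by a lock.
func newRoundRobin() Balancer {
	next := 0
	return BalancerFunc(func(_ Message, partitions ...int) int {
		p := partitions[next%len(partitions)]
		next++
		return p
	})
}

func main() {
	b := newRoundRobin()
	for i := 0; i < 4; i++ {
		fmt.Println(b.Balance(Message{}, 3, 5, 7))
	}
}
```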

type Batch

type Batch struct {
	// contains filtered or unexported fields
}

A Batch is an iterator over a sequence of messages fetched from a kafka server.

Batches are created by calling (*Conn).ReadBatch. They hold an internal lock on the connection, which is released when the batch is closed. Failing to call a batch's Close method will likely result in a dead-lock when trying to use the connection.

Batches are safe to use concurrently from multiple goroutines.

func (*Batch) Close

func (batch *Batch) Close() error

Close closes the batch, releasing the connection lock and returning an error if reading the batch failed for any reason.

func (*Batch) Err added in v0.4.2

func (batch *Batch) Err() error

Err returns a non-nil error if the batch is broken. This is the same error that would be returned by Read, ReadMessage or Close (except in the case of io.EOF which is never returned by Close).

This method is useful when building retry mechanisms for (*Conn).ReadBatch: the program can check whether the batch carried an error before attempting to read the first message.

Note that checking errors on a batch is optional; calling Read or ReadMessage is always valid and can be used to either read a message or an error in cases where that's convenient.

func (*Batch) HighWaterMark

func (batch *Batch) HighWaterMark() int64

HighWaterMark returns the current highest watermark in a partition.

func (*Batch) Offset

func (batch *Batch) Offset() int64

Offset returns the offset of the next message in the batch.

func (*Batch) Read

func (batch *Batch) Read(b []byte) (int, error)

Read reads the value of the next message from the batch into b, returning the number of bytes read, or an error if the next message couldn't be read.

If an error is returned the batch cannot be used anymore and calling Read again will keep returning that error. All errors except io.EOF (indicating that the program consumed all messages from the batch) are also returned by Close.

The method fails with io.ErrShortBuffer if the buffer passed as argument is too small to hold the message value.
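The error contract described above can be exercised with a small stand-in. In the sketch below, fakeBatch is a hypothetical type that imitates the documented behavior of Read (one message value per call, a sticky error, io.EOF at the end, io.ErrShortBuffer when the buffer is too small); it is not the package's Batch. The drain helper, also hypothetical, treats io.EOF as the normal end of the batch and copies only the n bytes that were read.

```go
package main

import (
	"fmt"
	"io"
)

// fakeBatch is a hypothetical stand-in for a batch of message values.
type fakeBatch struct {
	values [][]byte
	err    error
}

// Read copies the next value into p. Once an error has been returned, every
// later call returns the same error.
func (b *fakeBatch) Read(p []byte) (int, error) {
	if b.err != nil {
		return 0, b.err
	}
	if len(b.values) == 0 {
		b.err = io.EOF
		return 0, b.err
	}
	v := b.values[0]
	if len(v) > len(p) {
		b.err = io.ErrShortBuffer
		return 0, b.err
	}
	b.values = b.values[1:]
	return copy(p, v), nil
}

// drain reads every value through buf. It returns nil when the batch was
// fully consumed, or the error that interrupted the iteration.
func drain(r io.Reader, buf []byte) ([]string, error) {
	var out []string
	for {
		n, err := r.Read(buf)
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, string(buf[:n])) // keep only the bytes read
	}
}

func main() {
	values := [][]byte{[]byte("one!"), []byte("two!")}
	fmt.Println(drain(&fakeBatch{values: values}, make([]byte, 8)))
	fmt.Println(drain(&fakeBatch{values: values}, make([]byte, 2)))
}
```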

func (*Batch) ReadMessage

func (batch *Batch) ReadMessage() (Message, error)

ReadMessage reads and returns the next message from the batch.

Because this method allocates memory buffers for the message key and value, it is less memory-efficient than Read, but has the advantage of never failing with io.ErrShortBuffer.

func (*Batch) Throttle

func (batch *Batch) Throttle() time.Duration

Throttle gives the throttling duration applied by the kafka server on the connection.

type Broker

type Broker struct {
	Host string
	Port int
	ID   int
	Rack string
}

Broker represents a kafka broker in a kafka cluster.

type Bytes added in v0.4.2

type Bytes = protocol.ByteSequence

Bytes is an interface representing a sequence of bytes. This abstraction makes it possible for programs to inject data into produce requests without having to load it into an intermediary buffer, or read record keys and values from a fetch response directly from internal buffers.

Bytes are not safe to use concurrently from multiple goroutines.

type CRC32Balancer added in v0.4.2

type CRC32Balancer struct {
	Consistent bool
	// contains filtered or unexported fields
}

CRC32Balancer is a Balancer that uses the CRC32 hash function to determine which partition to route messages to. This ensures that messages with the same key are routed to the same partition. This balancer is compatible with the built-in hash partitioners in librdkafka and the language bindings that are built on top of it, including the github.com/confluentinc/confluent-kafka-go Go package.

With the Consistent field false (default), this partitioner is equivalent to the "consistent_random" setting in librdkafka. When Consistent is true, this partitioner is equivalent to the "consistent" setting. The latter will hash empty or nil keys into the same partition.

Unless you are absolutely certain that all your messages will have keys, it's best to leave the Consistent flag off. Otherwise, you run the risk of creating a very hot partition.
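The hot-partition risk can be seen in a minimal sketch of CRC32-based routing. It assumes the partition is chosen as the IEEE CRC32 of the key modulo the number of partitions; crc32Partition is a hypothetical helper, not the package's implementation, and it covers only the hashing path, not the random fallback used for empty keys when Consistent is false.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// crc32Partition is a hypothetical helper that routes a key to a partition
// using the IEEE CRC32 checksum (assumed formula: checksum mod partition
// count). partitions must not be empty.
func crc32Partition(key []byte, partitions ...int) int {
	idx := crc32.ChecksumIEEE(key) % uint32(len(partitions))
	return partitions[idx]
}

func main() {
	// Keyed messages are spread according to their checksum.
	fmt.Println(crc32Partition([]byte("123456789"), 0, 1, 2, 3))

	// Every empty or nil key has the same checksum, so when empty keys are
	// hashed they all land on one partition.
	fmt.Println(crc32Partition(nil, 0, 1, 2, 3))
	fmt.Println(crc32Partition([]byte{}, 0, 1, 2, 3))
}
```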

func (CRC32Balancer) Balance added in v0.4.2

func (b CRC32Balancer) Balance(msg Message, partitions ...int) (partition int)

type Client added in v0.4.2

type Client struct {
	// Address of the kafka cluster (or specific broker) that the client will be
	// sending requests to.
	//
	// This field is optional, the address may be provided in each request
	// instead. The request address takes precedence if both were specified.
	Addr net.Addr

	// Time limit for requests sent by this client.
	//
	// If zero, no timeout is applied.
	Timeout time.Duration

	// A transport used to communicate with the kafka brokers.
	//
	// If nil, DefaultTransport is used.
	Transport RoundTripper
}

Client is a high-level API to interact with kafka brokers.

All methods of the Client type accept a context as first argument, which may be used to asynchronously cancel the requests.

Clients are safe to use concurrently from multiple goroutines, as long as their configuration is not changed after it was first used.

func (*Client) ConsumerOffsets added in v0.4.2

func (c *Client) ConsumerOffsets(ctx context.Context, tg TopicAndGroup) (map[int]int64, error)

ConsumerOffsets returns a map[int]int64 of partition to committed offset for a consumer group id and topic.

DEPRECATED: this method will be removed in version 1.0, programs should migrate to use kafka.(*Client).OffsetFetch instead.

func (*Client) CreateTopics added in v0.4.2

func (c *Client) CreateTopics(ctx context.Context, req *CreateTopicsRequest) (*CreateTopicsResponse, error)

CreateTopics sends a topic creation request to a kafka broker and returns the response.

func (*Client) DeleteTopics added in v0.4.2

func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*DeleteTopicsResponse, error)

DeleteTopics sends a topic deletion request to a kafka broker and returns the response.

func (*Client) Fetch added in v0.4.2

func (c *Client) Fetch(ctx context.Context, req *FetchRequest) (*FetchResponse, error)

Fetch sends a fetch request to a kafka broker and returns the response.

func (*Client) ListOffsets added in v0.4.2

func (c *Client) ListOffsets(ctx context.Context, req *ListOffsetsRequest) (*ListOffsetsResponse, error)

ListOffsets sends an offset request to a kafka broker and returns the response.

func (*Client) Metadata added in v0.4.2

func (c *Client) Metadata(ctx context.Context, req *MetadataRequest) (*MetadataResponse, error)

Metadata sends a metadata request to a kafka broker and returns the response.

func (*Client) OffsetFetch added in v0.4.2

func (c *Client) OffsetFetch(ctx context.Context, req *OffsetFetchRequest) (*OffsetFetchResponse, error)

OffsetFetch sends an offset fetch request to a kafka broker and returns the response.

func (*Client) Produce added in v0.4.2

func (c *Client) Produce(ctx context.Context, req *ProduceRequest) (*ProduceResponse, error)

Produce sends a produce request to a kafka broker and returns the response.

type CompressionCodec

type CompressionCodec = compress.Codec

type ConfigEntry

type ConfigEntry struct {
	ConfigName  string
	ConfigValue string
	// Only used in CreateTopicResponse, ignored if set in request.
	ReadOnly    bool
	IsSensitive bool
}

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn represents a connection to a kafka broker.

Instances of Conn are safe to use concurrently from multiple goroutines.

func Dial

func Dial(network string, address string) (*Conn, error)

Dial is a convenience wrapper for DefaultDialer.Dial.

func DialContext

func DialContext(ctx context.Context, network string, address string) (*Conn, error)

DialContext is a convenience wrapper for DefaultDialer.DialContext.

func DialLeader

func DialLeader(ctx context.Context, network string, address string, topic string, partition int) (*Conn, error)

DialLeader is a convenience wrapper for DefaultDialer.DialLeader.

func DialPartition

func DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error)

DialPartition is a convenience wrapper for DefaultDialer.DialPartition.

func NewConn

func NewConn(conn net.Conn, topic string, partition int) *Conn

NewConn returns a new kafka connection for the given topic and partition.

func NewConnWith

func NewConnWith(conn net.Conn, config ConnConfig) *Conn

NewConnWith returns a new kafka connection configured with config. The offset is initialized to FirstOffset.

func (*Conn) ApiVersions added in v0.4.2

func (c *Conn) ApiVersions() ([]ApiVersion, error)

func (*Conn) Brokers added in v0.4.2

func (c *Conn) Brokers() ([]Broker, error)

Brokers retrieves the broker list from the Kafka metadata.

func (*Conn) Close

func (c *Conn) Close() error

Close closes the kafka connection.

func (*Conn) Controller added in v0.4.2

func (c *Conn) Controller() (broker Broker, err error)

Controller asks kafka for the current controller and returns it as a Broker value.

func (*Conn) CreateTopics

func (c *Conn) CreateTopics(topics ...TopicConfig) error

CreateTopics creates one topic per provided configuration with idempotent operational semantics. In other words, if CreateTopics is invoked with a configuration for an existing topic, it will have no effect.

func (*Conn) DeleteTopics

func (c *Conn) DeleteTopics(topics ...string) error

DeleteTopics deletes the specified topics.

func (*Conn) LocalAddr

func (c *Conn) LocalAddr() net.Addr

LocalAddr returns the local network address.

func (*Conn) Offset

func (c *Conn) Offset() (offset int64, whence int)

Offset returns the current offset of the connection as pair of integers, where the first one is an offset value and the second one indicates how to interpret it.

See Seek for more details about the offset and whence values.

func (*Conn) Read

func (c *Conn) Read(b []byte) (int, error)

Read reads the message at the current offset from the connection, advancing the offset on success so the next call to a read method will produce the next message. The method returns the number of bytes read, or an error if something went wrong.

While it is safe to call Read concurrently from multiple goroutines it may be hard for the program to predict the results as the connection offset will be read and written by multiple goroutines, they could read duplicates, or messages may be seen by only some of the goroutines.

The method fails with io.ErrShortBuffer if the buffer passed as argument is too small to hold the message value.

This method is provided to satisfy the net.Conn interface but is much less efficient than using the more general purpose ReadBatch method.

func (*Conn) ReadBatch

func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch

ReadBatch reads a batch of messages from the kafka server. The method always returns a non-nil Batch value. If an error occurred, either sending the fetch request or reading the response, the error will be made available by the returned value of the batch's Close method.

While it is safe to call ReadBatch concurrently from multiple goroutines it may be hard for the program to predict the results as the connection offset will be read and written by multiple goroutines, they could read duplicates, or messages may be seen by only some of the goroutines.

A program doesn't specify the number of messages it wants from a batch, but gives the minimum and maximum number of bytes that it wants to receive from the kafka server.

func (*Conn) ReadBatchWith added in v0.4.2

func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch

ReadBatchWith is similar to ReadBatch in every way. ReadBatch is configured with the default values in ReadBatchConfig except for minBytes and maxBytes.

func (*Conn) ReadFirstOffset

func (c *Conn) ReadFirstOffset() (int64, error)

ReadFirstOffset returns the first offset available on the connection.

func (*Conn) ReadLastOffset

func (c *Conn) ReadLastOffset() (int64, error)

ReadLastOffset returns the last offset available on the connection.

func (*Conn) ReadMessage

func (c *Conn) ReadMessage(maxBytes int) (Message, error)

ReadMessage reads the message at the current offset from the connection, advancing the offset on success so the next call to a read method will produce the next message.

Because this method allocates memory buffers for the message key and value, it is less memory-efficient than Read, but has the advantage of never failing with io.ErrShortBuffer.

While it is safe to call ReadMessage concurrently from multiple goroutines, it may be hard for the program to predict the results as the connection offset will be read and written by multiple goroutines: they could read duplicates, or messages may be seen by only some of the goroutines.

This method is provided for convenience purposes but is much less efficient than using the more general purpose ReadBatch method.

func (*Conn) ReadOffset

func (c *Conn) ReadOffset(t time.Time) (int64, error)

ReadOffset returns the offset of the first message with a timestamp greater than or equal to t.

func (*Conn) ReadOffsets

func (c *Conn) ReadOffsets() (first, last int64, err error)

ReadOffsets returns the absolute first and last offsets of the topic used by the connection.

func (*Conn) ReadPartitions

func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error)

ReadPartitions returns the list of available partitions for the given list of topics.

If the method is called with no topic, it uses the topic configured on the connection. If there are none, the method fetches all partitions of the kafka cluster.

func (*Conn) RemoteAddr

func (c *Conn) RemoteAddr() net.Addr

RemoteAddr returns the remote network address.

func (*Conn) Seek

func (c *Conn) Seek(offset int64, whence int) (int64, error)

Seek sets the offset for the next read or write operation according to whence, which should be one of SeekStart, SeekAbsolute, SeekEnd, or SeekCurrent. When seeking relative to the end, the offset is subtracted from the current offset. Note that for historical reasons, these do not align with the usual whence constants as in lseek(2) or os.Seek. The method returns the new absolute offset of the connection.

func (*Conn) SetDeadline

func (c *Conn) SetDeadline(t time.Time) error

SetDeadline sets the read and write deadlines associated with the connection. It is equivalent to calling both SetReadDeadline and SetWriteDeadline.

A deadline is an absolute time after which I/O operations fail with a timeout (see type Error) instead of blocking. The deadline applies to all future and pending I/O, not just the immediately following call to Read or Write. After a deadline has been exceeded, the connection may be closed if it was found to be in an unrecoverable state.

A zero value for t means I/O operations will not time out.
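The deadline semantics described above follow the net.Conn convention. The sketch below demonstrates that convention with the standard library's net.Pipe rather than a kafka connection, so it only illustrates the general behavior under that assumption; readTimesOut is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

// readTimesOut sets a read deadline on an idle in-memory connection and
// reports whether the blocked Read fails with a timeout error once the
// deadline has passed.
func readTimesOut() bool {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()

	// The deadline is an absolute time, not a duration.
	client.SetReadDeadline(time.Now().Add(20 * time.Millisecond))

	// Nothing is ever written to the other end, so Read blocks until the
	// deadline expires.
	_, err := client.Read(make([]byte, 1))

	var nerr net.Error
	return errors.As(err, &nerr) && nerr.Timeout()
}

func main() {
	fmt.Println(readTimesOut())
}
```

Because the deadline applies to all future I/O, a program that reuses the connection typically sets a fresh deadline before each operation, as the examples in the README do.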

func (*Conn) SetReadDeadline

func (c *Conn) SetReadDeadline(t time.Time) error

SetReadDeadline sets the deadline for future Read calls and any currently-blocked Read call. A zero value for t means Read will not time out.

func (*Conn) SetRequiredAcks

func (c *Conn) SetRequiredAcks(n int) error

SetRequiredAcks sets the number of acknowledgements from replicas that the connection requests when producing messages.

func (*Conn) SetWriteDeadline

func (c *Conn) SetWriteDeadline(t time.Time) error

SetWriteDeadline sets the deadline for future Write calls and any currently-blocked Write call. Even if write times out, it may return n > 0, indicating that some of the data was successfully written. A zero value for t means Write will not time out.

func (*Conn) Write

func (c *Conn) Write(b []byte) (int, error)

Write writes a message to the kafka broker that this connection was established to. The method returns the number of bytes written, or an error if something went wrong.

The operation either succeeds or fails; it never partially writes the message.

This method is exposed to satisfy the net.Conn interface but is less efficient than the more general purpose WriteMessages method.

func (*Conn) WriteCompressedMessages

func (c *Conn) WriteCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, err error)

WriteCompressedMessages writes a batch of messages to the connection's topic and partition, returning the number of bytes written. The write is an atomic operation, it either fully succeeds or fails.

If the compression codec is not nil, the messages will be compressed.

func (*Conn) WriteCompressedMessagesAt added in v0.4.2

func (c *Conn) WriteCompressedMessagesAt(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error)

WriteCompressedMessagesAt writes a batch of messages to the connection's topic and partition, returning the number of bytes written, partition and offset numbers and timestamp assigned by the kafka broker to the message set. The write is an atomic operation, it either fully succeeds or fails.

If the compression codec is not nil, the messages will be compressed.

func (*Conn) WriteMessages

func (c *Conn) WriteMessages(msgs ...Message) (int, error)

WriteMessages writes a batch of messages to the connection's topic and partition, returning the number of bytes written. The write is atomic: it either fully succeeds or fails.

type ConnConfig

type ConnConfig struct {
	ClientID  string
	Topic     string
	Partition int

	// The transactional id to use for transactional delivery. Idempotent
	// delivery should be enabled if transactional id is configured.
	// For more details look at transactional.id description here: http://kafka.apache.org/documentation.html#producerconfigs
	// Empty string means that this connection can't be transactional.
	TransactionalID string
}

ConnConfig is a configuration object used to create new instances of Conn.

type ConsumerGroup added in v0.4.2

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

ConsumerGroup models a Kafka consumer group. A caller doesn't interact with the group directly. Rather, they interact with a Generation. Every time a member enters or exits the group, it results in a new Generation. The Generation is where partition assignments and offset management occur. Callers will use Next to get a handle to the Generation.

func NewConsumerGroup added in v0.4.2

func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error)

NewConsumerGroup creates a new ConsumerGroup. It returns an error if the provided configuration is invalid. It does not attempt to connect to the Kafka cluster. That happens asynchronously, and any errors will be reported by Next.

func (*ConsumerGroup) Close added in v0.4.2

func (cg *ConsumerGroup) Close() error

Close terminates the current generation by causing this member to leave and releases all local resources used to participate in the consumer group. Close will also end the current generation if it is still active.

func (*ConsumerGroup) Next added in v0.4.2

func (cg *ConsumerGroup) Next(ctx context.Context) (*Generation, error)

Next waits for the next consumer group generation. There will never be two active generations. Next will never return a new generation until the previous one has completed.

If there are errors setting up the next generation, they will be surfaced here.

If the ConsumerGroup has been closed, then Next will return ErrGroupClosed.

type ConsumerGroupConfig added in v0.4.2

type ConsumerGroupConfig struct {
	// ID is the consumer group ID.  It must not be empty.
	ID string

	// The list of broker addresses used to connect to the kafka cluster.  It
	// must not be empty.
	Brokers []string

	// A dialer used to open connections to the kafka server. This field is
	// optional, if nil, the default dialer is used instead.
	Dialer *Dialer

	// Topics is the list of topics that will be consumed by this group.  It
	// will usually have a single value, but it is permitted to have multiple
	// for more complex use cases.
	Topics []string

	// GroupBalancers is the priority-ordered list of client-side consumer group
	// balancing strategies that will be offered to the coordinator.  The first
	// strategy that all group members support will be chosen by the leader.
	//
	// Default: [Range, RoundRobin]
	GroupBalancers []GroupBalancer

	// HeartbeatInterval sets the optional frequency at which the reader sends the consumer
	// group heartbeat update.
	//
	// Default: 3s
	HeartbeatInterval time.Duration

	// PartitionWatchInterval indicates how often a reader checks for partition changes.
	// If a reader sees a partition change (such as a partition add) it will rebalance the group
	// picking up new partitions.
	//
	// Default: 5s
	PartitionWatchInterval time.Duration

	// WatchPartitionChanges is used to inform kafka-go that a consumer group should be
	// polling the brokers and rebalancing if any partition changes happen to the topic.
	WatchPartitionChanges bool

	// SessionTimeout optionally sets the length of time that may pass without a heartbeat
	// before the coordinator considers the consumer dead and initiates a rebalance.
	//
	// Default: 30s
	SessionTimeout time.Duration

	// RebalanceTimeout optionally sets the length of time the coordinator will wait
	// for members to join as part of a rebalance.  For kafka servers under higher
	// load, it may be useful to set this value higher.
	//
	// Default: 30s
	RebalanceTimeout time.Duration

	// JoinGroupBackoff optionally sets the length of time to wait before re-joining
	// the consumer group after an error.
	//
	// Default: 5s
	JoinGroupBackoff time.Duration

	// RetentionTime optionally sets the length of time the consumer group will
	// be saved by the broker.  -1 will disable the setting and leave the
	// retention up to the broker's offsets.retention.minutes property.  By
	// default, that setting is 1 day for kafka < 2.0 and 7 days for kafka >=
	// 2.0.
	//
	// Default: -1
	RetentionTime time.Duration

	// StartOffset determines from whence the consumer group should begin
	// consuming when it finds a partition without a committed offset.  If
	// non-zero, it must be set to one of FirstOffset or LastOffset.
	//
	// Default: FirstOffset
	StartOffset int64

	// If not nil, specifies a logger used to report internal changes within the
	// reader.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the reader falls
	// back to using Logger instead.
	ErrorLogger Logger
	// contains filtered or unexported fields
}

ConsumerGroupConfig is a configuration object used to create new instances of ConsumerGroup.

func (*ConsumerGroupConfig) Validate added in v0.4.2

func (config *ConsumerGroupConfig) Validate() error

Validate method validates ConsumerGroupConfig properties and sets relevant defaults.

type CreateTopicStatus added in v0.4.2

type CreateTopicStatus struct {
	// Name of the topic that the status is being reported for.
	Topic string

	// An error that may have occurred while attempting to create the topic.
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker. Programs may use the standard errors.Is
	// function to test the error against kafka error codes.
	Error error

	// Number of partitions in the topic.
	//
	// This field will be zero if the kafka broker did not support the
	// CreateTopics API in version 5 or above.
	NumPartitions int

	// Replication factor of the topic.
	//
	// This field will be zero if the kafka broker did not support the
	// CreateTopics API in version 5 or above.
	ReplicationFactor int

	// Configuration of the topic, including read-only and potentially sensitive
	// entries.
	//
	// This field will be zero if the kafka broker did not support the
	// CreateTopics API in version 5 or above.
	ConfigEntries []ConfigEntry
}

CreateTopicStatus contains the detailed results of a topic creation request for a specific topic.

type CreateTopicsRequest added in v0.4.2

type CreateTopicsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// List of topics to create and their configuration.
	Topics []TopicConfig

	// When set to true, topics are not created but the configuration is
	// validated as if they were.
	//
	// This field will be ignored if the kafka broker did not support the
	// CreateTopics API in version 1 or above.
	ValidateOnly bool
}

CreateTopicsRequest represents a request sent to a kafka broker to create new topics.

type CreateTopicsResponse added in v0.4.2

type CreateTopicsResponse struct {
	// The amount of time that the broker throttled the request.
	//
	// This field will be zero if the kafka broker did not support the
	// CreateTopics API in version 2 or above.
	Throttle time.Duration

	// Details about the results of each topic creation request received by the
	// kafka broker.
	Topics []CreateTopicStatus
}

CreateTopicsResponse represents a response from a kafka broker to a topic creation request.

type DeleteTopicsRequest added in v0.4.2

type DeleteTopicsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// Names of topics to delete.
	Topics []string
}

DeleteTopicsRequest represents a request sent to a kafka broker to delete topics.

type DeleteTopicsResponse added in v0.4.2

type DeleteTopicsResponse struct {
	// The amount of time that the broker throttled the request.
	//
	// This field will be zero if the kafka broker did not support the
	// DeleteTopics API in version 1 or above.
	Throttle time.Duration

	// Mapping of topic names to errors that occurred while attempting to delete
	// the topics.
	//
	// The errors contain the kafka error code. Programs may use the standard
	// errors.Is function to test the error against kafka error codes.
	Errors map[string]error
}

DeleteTopicsResponse represents a response from a kafka broker to a topic deletion request.

type Dialer

type Dialer struct {
	// Unique identifier for client connections established by this Dialer.
	ClientID string

	// Timeout is the maximum amount of time a dial will wait for a connect to
	// complete. If Deadline is also set, it may fail earlier.
	//
	// The default is no timeout.
	//
	// When dialing a name with multiple IP addresses, the timeout may be
	// divided between them.
	//
	// With or without a timeout, the operating system may impose its own
	// earlier timeout. For instance, TCP timeouts are often around 3 minutes.
	Timeout time.Duration

	// Deadline is the absolute point in time after which dials will fail.
	// If Timeout is set, it may fail earlier.
	// Zero means no deadline, or dependent on the operating system as with the
	// Timeout option.
	Deadline time.Time

	// LocalAddr is the local address to use when dialing an address.
	// The address must be of a compatible type for the network being dialed.
	// If nil, a local address is automatically chosen.
	LocalAddr net.Addr

	// DualStack enables RFC 6555-compliant "Happy Eyeballs" dialing when the
	// network is "tcp" and the destination is a host name with both IPv4 and
	// IPv6 addresses. This allows a client to tolerate networks where one
	// address family is silently broken.
	DualStack bool

	// FallbackDelay specifies the length of time to wait before spawning a
	// fallback connection, when DualStack is enabled.
	// If zero, a default delay of 300ms is used.
	FallbackDelay time.Duration

	// KeepAlive specifies the keep-alive period for an active network
	// connection.
	// If zero, keep-alives are not enabled. Network protocols that do not
	// support keep-alives ignore this field.
	KeepAlive time.Duration

	// Resolver optionally specifies an alternate resolver to use.
	Resolver Resolver

	// TLS enables Dialer to open secure connections.  If nil, standard net.Conn
	// will be used.
	TLS *tls.Config

	// SASLMechanism configures the Dialer to use SASL authentication.  If nil,
	// no authentication will be performed.
	SASLMechanism sasl.Mechanism

	// The transactional id to use for transactional delivery. Idempotent
	// delivery should be enabled if transactional id is configured.
	// For more details look at transactional.id description here: http://kafka.apache.org/documentation.html#producerconfigs
	// Empty string means that the connection will be non-transactional.
	TransactionalID string
}

The Dialer type mirrors the net.Dialer API but is designed to open kafka connections instead of raw network connections.

func (*Dialer) Dial

func (d *Dialer) Dial(network string, address string) (*Conn, error)

Dial connects to the address on the named network.

func (*Dialer) DialContext

func (d *Dialer) DialContext(ctx context.Context, network string, address string) (*Conn, error)

DialContext connects to the address on the named network using the provided context.

The provided Context must be non-nil. If the context expires before the connection is complete, an error is returned. Once successfully connected, any expiration of the context will not affect the connection.

When using TCP, and the host in the address parameter resolves to multiple network addresses, any dial timeout (from d.Timeout or ctx) is spread over each consecutive dial, such that each is given an appropriate fraction of the time to connect. For example, if a host has 4 IP addresses and the timeout is 1 minute, the connect to each single address will be given 15 seconds to complete before trying the next one.

func (*Dialer) DialLeader

func (d *Dialer) DialLeader(ctx context.Context, network string, address string, topic string, partition int) (*Conn, error)

DialLeader opens a connection to the leader of the partition for a given topic.

The address given to this method may not be the one that the connection ends up being established to, because the dialer will look up the partition leader for the topic and return a connection to that server. The original address is only used as a mechanism to discover the configuration of the kafka cluster that we're connecting to.

func (*Dialer) DialPartition

func (d *Dialer) DialPartition(ctx context.Context, network string, address string, partition Partition) (*Conn, error)

DialPartition opens a connection to the leader of the partition specified by the partition descriptor. It is strongly advised to use a partition descriptor returned by LookupPartition or LookupPartitions.

func (*Dialer) LookupLeader

func (d *Dialer) LookupLeader(ctx context.Context, network string, address string, topic string, partition int) (Broker, error)

LookupLeader searches for the kafka broker that is the leader of the partition for a given topic, returning a Broker value representing it.

func (*Dialer) LookupPartition

func (d *Dialer) LookupPartition(ctx context.Context, network string, address string, topic string, partition int) (Partition, error)

LookupPartition searches for the description of the specified partition ID.

func (*Dialer) LookupPartitions

func (d *Dialer) LookupPartitions(ctx context.Context, network string, address string, topic string) ([]Partition, error)

LookupPartitions returns the list of partitions that exist for the given topic.

type DurationStats

type DurationStats struct {
	Avg time.Duration `metric:"avg" type:"gauge"`
	Min time.Duration `metric:"min" type:"gauge"`
	Max time.Duration `metric:"max" type:"gauge"`
}

DurationStats is a data structure that carries a summary of observed duration values. The average, minimum, and maximum are reported.

type Error

type Error int

Error represents the different error codes that may be returned by kafka. https://kafka.apache.org/protocol#protocol_error_codes

const (
	Unknown                            Error = -1
	OffsetOutOfRange                   Error = 1
	InvalidMessage                     Error = 2
	UnknownTopicOrPartition            Error = 3
	InvalidMessageSize                 Error = 4
	LeaderNotAvailable                 Error = 5
	NotLeaderForPartition              Error = 6
	RequestTimedOut                    Error = 7
	BrokerNotAvailable                 Error = 8
	ReplicaNotAvailable                Error = 9
	MessageSizeTooLarge                Error = 10
	StaleControllerEpoch               Error = 11
	OffsetMetadataTooLarge             Error = 12
	NetworkException                   Error = 13
	GroupLoadInProgress                Error = 14
	GroupCoordinatorNotAvailable       Error = 15
	NotCoordinatorForGroup             Error = 16
	InvalidTopic                       Error = 17
	RecordListTooLarge                 Error = 18
	NotEnoughReplicas                  Error = 19
	NotEnoughReplicasAfterAppend       Error = 20
	InvalidRequiredAcks                Error = 21
	IllegalGeneration                  Error = 22
	InconsistentGroupProtocol          Error = 23
	InvalidGroupId                     Error = 24
	UnknownMemberId                    Error = 25
	InvalidSessionTimeout              Error = 26
	RebalanceInProgress                Error = 27
	InvalidCommitOffsetSize            Error = 28
	TopicAuthorizationFailed           Error = 29
	GroupAuthorizationFailed           Error = 30
	ClusterAuthorizationFailed         Error = 31
	InvalidTimestamp                   Error = 32
	UnsupportedSASLMechanism           Error = 33
	IllegalSASLState                   Error = 34
	UnsupportedVersion                 Error = 35
	TopicAlreadyExists                 Error = 36
	InvalidPartitionNumber             Error = 37
	InvalidReplicationFactor           Error = 38
	InvalidReplicaAssignment           Error = 39
	InvalidConfiguration               Error = 40
	NotController                      Error = 41
	InvalidRequest                     Error = 42
	UnsupportedForMessageFormat        Error = 43
	PolicyViolation                    Error = 44
	OutOfOrderSequenceNumber           Error = 45
	DuplicateSequenceNumber            Error = 46
	InvalidProducerEpoch               Error = 47
	InvalidTransactionState            Error = 48
	InvalidProducerIDMapping           Error = 49
	InvalidTransactionTimeout          Error = 50
	ConcurrentTransactions             Error = 51
	TransactionCoordinatorFenced       Error = 52
	TransactionalIDAuthorizationFailed Error = 53
	SecurityDisabled                   Error = 54
	BrokerAuthorizationFailed          Error = 55
	KafkaStorageError                  Error = 56
	LogDirNotFound                     Error = 57
	SASLAuthenticationFailed           Error = 58
	UnknownProducerId                  Error = 59
	ReassignmentInProgress             Error = 60
	DelegationTokenAuthDisabled        Error = 61
	DelegationTokenNotFound            Error = 62
	DelegationTokenOwnerMismatch       Error = 63
	DelegationTokenRequestNotAllowed   Error = 64
	DelegationTokenAuthorizationFailed Error = 65
	DelegationTokenExpired             Error = 66
	InvalidPrincipalType               Error = 67
	NonEmptyGroup                      Error = 68
	GroupIdNotFound                    Error = 69
	FetchSessionIDNotFound             Error = 70
	InvalidFetchSessionEpoch           Error = 71
	ListenerNotFound                   Error = 72
	TopicDeletionDisabled              Error = 73
	FencedLeaderEpoch                  Error = 74
	UnknownLeaderEpoch                 Error = 75
	UnsupportedCompressionType         Error = 76
	StaleBrokerEpoch                   Error = 77
	OffsetNotAvailable                 Error = 78
	MemberIDRequired                   Error = 79
	PreferredLeaderNotAvailable        Error = 80
	GroupMaxSizeReached                Error = 81
	FencedInstanceID                   Error = 82
)

func (Error) Description

func (e Error) Description() string

Description returns a human readable description of the cause of the error.

func (Error) Error

func (e Error) Error() string

Error satisfies the error interface.

func (Error) Temporary

func (e Error) Temporary() bool

Temporary returns true if the operation that generated the error may succeed if retried at a later time. The Kafka error documentation describes these errors as "retriable": https://kafka.apache.org/protocol#protocol_error_codes

func (Error) Timeout

func (e Error) Timeout() bool

Timeout returns true if the error was due to a timeout.

func (Error) Title

func (e Error) Title() string

Title returns a human readable title for the error.

type FetchRequest added in v0.4.2

type FetchRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// Topic, partition, and offset to retrieve records from.
	Topic     string
	Partition int
	Offset    int64

	// Size and time limits of the response returned by the broker.
	MinBytes int64
	MaxBytes int64
	MaxWait  time.Duration

	// The isolation level for the request.
	//
	// Defaults to ReadUncommitted.
	//
	// This field requires the kafka broker to support the Fetch API in version
	// 4 or above (otherwise the value is ignored).
	IsolationLevel IsolationLevel
}

FetchRequest represents a request sent to a kafka broker to retrieve records from a topic partition.

type FetchResponse added in v0.4.2

type FetchResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// The topic and partition that the response came for (will match the values
	// in the request).
	Topic     string
	Partition int

	// Information about the topic partition layout returned from the broker.
	//
	// LastStableOffset requires the kafka broker to support the Fetch API in
	// version 4 or above (otherwise the value is zero).
	//
	// LogStartOffset requires the kafka broker to support the Fetch API in
	// version 5 or above (otherwise the value is zero).
	HighWatermark    int64
	LastStableOffset int64
	LogStartOffset   int64

	// An error that may have occurred while attempting to fetch the records.
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker. Programs may use the standard errors.Is
	// function to test the error against kafka error codes.
	Error error

	// The set of records returned in the response.
	//
	// The program is expected to call the RecordSet's Close method when it
	// has finished reading the records.
	Records RecordSet

	// Indicates whether the response returned by the broker was transactional
	// or a control batch.
	Transactional bool
	ControlBatch  bool
}

FetchResponse represents a response from a kafka broker to a fetch request.

type Generation added in v0.4.2

type Generation struct {
	// ID is the generation ID as assigned by the consumer group coordinator.
	ID int32

	// GroupID is the name of the consumer group.
	GroupID string

	// MemberID is the ID assigned to this consumer by the consumer group
	// coordinator.
	MemberID string

	// Assignments is the initial state of this Generation.  The partition
	// assignments are grouped by topic.
	Assignments map[string][]PartitionAssignment
	// contains filtered or unexported fields
}

Generation represents a single consumer group generation. The generation carries the topic+partition assignments for this group member. It also provides facilities for committing offsets and for running functions whose lifecycles are bound to the generation.

func (*Generation) CommitOffsets added in v0.4.2

func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error

CommitOffsets commits the provided topic+partition+offset combos to the consumer group coordinator. This can be used to reset the consumer to explicit offsets.

func (*Generation) Start added in v0.4.2

func (g *Generation) Start(fn func(ctx context.Context))

Start launches the provided function in a goroutine and adds accounting such that when the function exits, it stops the current generation (if not already in the process of doing so).

The provided function MUST support cancellation via the ctx argument and exit in a timely manner once the ctx is done. When the context is closed, the context's Err() method will return ErrGenerationEnded.

When closing out a generation, the consumer group will wait for all functions launched by Start to exit before the group can move on and join the next generation. If the function does not exit promptly, it will stop forward progress for this consumer and potentially cause consumer group membership churn.

type GroupBalancer

type GroupBalancer interface {
	// ProtocolName of the GroupBalancer
	ProtocolName() string

	// UserData provides the GroupBalancer an opportunity to embed custom
	// UserData into the metadata.
	//
	// Will be used by JoinGroup to begin the consumer group handshake.
	//
	// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-JoinGroupRequest
	UserData() ([]byte, error)

	// AssignGroups returns which members will be consuming
	// which topic partitions.
	AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments
}

GroupBalancer encapsulates the client-side rebalancing logic.

type GroupMember

type GroupMember struct {
	// ID is the unique ID for this member as taken from the JoinGroup response.
	ID string

	// Topics is a list of topics that this member is consuming.
	Topics []string

	// UserData contains any information that the GroupBalancer sent to the
	// consumer group coordinator.
	UserData []byte
}

GroupMember describes a single participant in a consumer group.

type GroupMemberAssignments

type GroupMemberAssignments map[string]map[string][]int

GroupMemberAssignments holds MemberID => topic => partitions
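To make the nesting concrete, the sketch below builds a value of the same shape (member ID, then topic, then partition IDs) by dealing partitions out to members in turn. The memberAssignments type and the assignRoundRobin function are hypothetical and only illustrate the data structure; they are not the package's balancer implementations.

```go
package main

import "fmt"

// memberAssignments mirrors the shape of GroupMemberAssignments:
// member ID => topic => partition IDs.
type memberAssignments map[string]map[string][]int

// assignRoundRobin deals the partitions of a single topic out to the members
// in order, wrapping around when there are more partitions than members.
func assignRoundRobin(members []string, topic string, partitions []int) memberAssignments {
	out := memberAssignments{}
	if len(members) == 0 {
		return out
	}
	for i, p := range partitions {
		m := members[i%len(members)]
		if out[m] == nil {
			out[m] = map[string][]int{}
		}
		out[m][topic] = append(out[m][topic], p)
	}
	return out
}

func main() {
	a := assignRoundRobin([]string{"member-a", "member-b"}, "events", []int{0, 1, 2})
	fmt.Println(a["member-a"]["events"], a["member-b"]["events"])
}
```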

type Hash

type Hash struct {
	Hasher hash.Hash32
	// contains filtered or unexported fields
}

Hash is a Balancer that uses the provided hash function to determine which partition to route messages to. This ensures that messages with the same key are routed to the same partition.

The logic to calculate the partition is:

hasher.Sum32() % len(partitions) => partition

By default, Hash uses the FNV-1a algorithm. This is the same algorithm used by the Sarama producer and ensures that messages produced by kafka-go will be delivered to the same partitions that the Sarama producer would deliver them to.
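The formula above can be sketched with the standard library's FNV-1a implementation. The partitionFor function is a hypothetical helper for this illustration; it assumes a non-empty partition list and uses an unsigned modulo, so the exact partition chosen by the package for a given key may differ.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor hashes the key with 32-bit FNV-1a and maps the hash onto the
// list of partitions. The partitions slice must not be empty.
func partitionFor(key []byte, partitions []int) int {
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	return partitions[h.Sum32()%uint32(len(partitions))]
}

func main() {
	partitions := []int{0, 1, 2, 3}

	// The same key always maps to the same partition.
	fmt.Println(partitionFor([]byte("user-42"), partitions) ==
		partitionFor([]byte("user-42"), partitions))
}
```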

func (*Hash) Balance

func (h *Hash) Balance(msg Message, partitions ...int) int

type Header

type Header = protocol.Header

Header is a key/value pair type representing headers set on records.

type IsolationLevel added in v0.4.2

type IsolationLevel int8

const (
	ReadUncommitted IsolationLevel = 0
	ReadCommitted   IsolationLevel = 1
)

type LeastBytes

type LeastBytes struct {
	// contains filtered or unexported fields
}

LeastBytes is a Balancer implementation that routes messages to the partition that has received the least amount of data.

Note that no coordination is done between multiple producers. Good balancing relies on each producer that uses a LeastBytes balancer producing well-balanced messages.

func (*LeastBytes) Balance

func (lb *LeastBytes) Balance(msg Message, partitions ...int) int

Balance satisfies the Balancer interface.
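The idea behind this strategy can be sketched as a counter of bytes sent per partition, with each message routed to the partition whose counter is lowest. The leastBytes type below is a hypothetical, simplified stand-in that takes a message size rather than a Message and is not safe for concurrent use; it is not the package's implementation.

```go
package main

import "fmt"

// leastBytes tracks how many bytes this producer has routed to each partition.
type leastBytes struct {
	sent map[int]int
}

// balance returns the partition that has received the fewest bytes so far and
// records msgSize against it. Ties go to the partition listed first. At least
// one partition must be given.
func (lb *leastBytes) balance(msgSize int, partitions ...int) int {
	if lb.sent == nil {
		lb.sent = map[int]int{}
	}
	best := partitions[0]
	for _, p := range partitions[1:] {
		if lb.sent[p] < lb.sent[best] {
			best = p
		}
	}
	lb.sent[best] += msgSize
	return best
}

func main() {
	lb := &leastBytes{}
	// One large message followed by two small ones.
	fmt.Println(lb.balance(100, 0, 1), lb.balance(10, 0, 1), lb.balance(10, 0, 1))
}
```

Because each producer only sees its own counters, two producers running this logic independently can still send most of their data to the same partition, which is the lack of coordination noted above.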

type ListOffsetsRequest added in v0.4.2

type ListOffsetsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// A mapping of topic names to list of partitions that the program wishes to
	// get the offsets for.
	Topics map[string][]OffsetRequest

	// The isolation level for the request.
	//
	// Defaults to ReadUncommitted.
	//
	// This field requires the kafka broker to support the ListOffsets API in
	// version 2 or above (otherwise the value is ignored).
	IsolationLevel IsolationLevel
}

ListOffsetsRequest represents a request sent to a kafka broker to list the offsets of topic partitions.

type ListOffsetsResponse added in v0.4.2

type ListOffsetsResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// Mapping of topic names to partition offsets; there will be one entry
	// for each topic in the request.
	Topics map[string][]PartitionOffsets
}

ListOffsetsResponse represents a response from a kafka broker to an offset listing request.

type Logger added in v0.4.2

type Logger interface {
	Printf(string, ...interface{})
}

Logger is an interface with the Printf method of the standard library's log.Logger, so a *log.Logger can be used directly.

type LoggerFunc added in v0.4.2

type LoggerFunc func(string, ...interface{})

LoggerFunc is a bridge between Logger and any third-party logger. Usage:

l := NewLogger() // some logger
r := kafka.NewReader(kafka.ReaderConfig{
  Logger:      kafka.LoggerFunc(l.Infof),
  ErrorLogger: kafka.LoggerFunc(l.Errorf),
})

func (LoggerFunc) Printf added in v0.4.2

func (f LoggerFunc) Printf(msg string, args ...interface{})
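The bridge works because a function type can carry methods. The sketch below reproduces the pattern with hypothetical lowercase stand-ins (logger, loggerFunc) so that it runs without the package; it assumes the Printf method simply forwards its arguments to the wrapped function, which the documentation above does not state explicitly.

```go
package main

import "fmt"

// logger and loggerFunc are hypothetical stand-ins for Logger and LoggerFunc.
type logger interface {
	Printf(string, ...interface{})
}

type loggerFunc func(string, ...interface{})

// Printf makes loggerFunc satisfy logger by calling the wrapped function.
func (f loggerFunc) Printf(msg string, args ...interface{}) { f(msg, args...) }

// report logs through the interface without knowing the concrete logger.
func report(l logger, offset int) {
	l.Printf("committed offset %d", offset)
}

// collect adapts a plain function into a logger and returns what it captured.
func collect(offset int) []string {
	var lines []string
	sink := func(format string, args ...interface{}) {
		lines = append(lines, fmt.Sprintf(format, args...))
	}
	report(loggerFunc(sink), offset)
	return lines
}

func main() {
	fmt.Println(collect(42))
}
```

Any third-party logging method with a compatible signature can be converted the same way, as in the usage shown above.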

type Message

type Message struct {
	// Topic is read-only and MUST NOT be set when writing messages
	Topic string

	// Partition is read-only and MUST NOT be set when writing messages
	Partition int
	Offset    int64
	Key       []byte
	Value     []byte
	Headers   []Header

	// If not set at the creation, Time will be automatically set when
	// writing the message.
	Time time.Time
}

Message is a data structure representing kafka messages.

type MessageTooLargeError added in v0.4.2

type MessageTooLargeError struct {
	Message   Message
	Remaining []Message
}

func (MessageTooLargeError) Error added in v0.4.2

func (e MessageTooLargeError) Error() string

type MetadataRequest added in v0.4.2

type MetadataRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// The list of topics to retrieve metadata for.
	Topics []string
}

MetadataRequest represents a request sent to a kafka broker to retrieve its cluster metadata.

type MetadataResponse added in v0.4.2

type MetadataResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// Name of the kafka cluster that client retrieved metadata from.
	ClusterID string

	// The broker which is currently the controller for the cluster.
	Controller Broker

	// The list of brokers registered to the cluster.
	Brokers []Broker

	// The list of topics available on the cluster.
	Topics []Topic
}

MetadataResponse represents a response from a kafka broker to a metadata request.

type Murmur2Balancer added in v0.4.2

type Murmur2Balancer struct {
	Consistent bool
	// contains filtered or unexported fields
}

Murmur2Balancer is a Balancer that uses the Murmur2 hash function to determine which partition to route messages to. This ensures that messages with the same key are routed to the same partition. This balancer is compatible with the partitioner used by the Java library and by librdkafka's "murmur2" and "murmur2_random" partitioners.

With the Consistent field false (default), this partitioner is equivalent to the "murmur2_random" setting in librdkafka. When Consistent is true, this partitioner is equivalent to the "murmur2" setting. The latter will hash nil keys into the same partition. Empty, non-nil keys are always hashed to the same partition regardless of configuration.

Unless you are absolutely certain that all your messages will have keys, it's best to leave the Consistent flag off. Otherwise, you run the risk of creating a very hot partition.

Note that the librdkafka documentation states that "murmur2_random" is functionally equivalent to the default Java partitioner. It is only functionally equivalent because the Java partitioner uses a round robin balancer instead of a random one on nil keys. We choose librdkafka's implementation because it arguably has a larger install base.

func (Murmur2Balancer) Balance added in v0.4.2

func (b Murmur2Balancer) Balance(msg Message, partitions ...int) (partition int)

type OffsetFetchPartition added in v0.4.2

type OffsetFetchPartition struct {
	// ID of the partition.
	Partition int

	// Last committed offsets on the partition when the request was served by
	// the kafka broker.
	CommittedOffset int64

	// Consumer group metadata for this partition.
	Metadata string

	// An error that may have occurred while attempting to retrieve consumer
	// group offsets for this partition.
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker. Programs may use the standard errors.Is
	// function to test the error against kafka error codes.
	Error error
}

OffsetFetchPartition represents the state of a single partition in a consumer group.

type OffsetFetchRequest added in v0.4.2

type OffsetFetchRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// ID of the consumer group to retrieve the offsets for.
	GroupID string

	// Set of topic partitions to retrieve the offsets for.
	Topics map[string][]int

	// When set to true, the broker holds back unstable offsets and reports a
	// retriable error for the affected partitions instead.
	//
	// This field requires the kafka broker to support the OffsetFetch API in
	// version 7 or above (otherwise it is ignored).
	RequireStable bool
}

OffsetFetchRequest represents a request sent to a kafka broker to read the currently committed offsets of topic partitions.

type OffsetFetchResponse added in v0.4.2

type OffsetFetchResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// Set of topic partitions that the kafka broker has returned offsets for.
	Topics map[string][]OffsetFetchPartition

	// An error that may have occurred while attempting to retrieve consumer
	// group offsets.
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker. Programs may use the standard errors.Is
	// function to test the error against kafka error codes.
	Error error
}

OffsetFetchResponse represents a response from a kafka broker to an offset fetch request.
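As a rough sketch of the errors.Is pattern mentioned in the Error field comments, the stdlib-only program below walks a response-shaped map and collects the partitions whose error matches a given code. The errorCode type, the unknownTopicOrPartition value, and the partitionResult struct are hypothetical stand-ins defined here for illustration; they are not the package's own types.

```go
package main

import (
	"errors"
	"fmt"
)

// errorCode is a hypothetical stand-in for a kafka error code type.
type errorCode int

// unknownTopicOrPartition is an illustrative value used only by this sketch.
const unknownTopicOrPartition errorCode = 3

func (e errorCode) Error() string { return fmt.Sprintf("kafka error code %d", int(e)) }

// partitionResult mirrors the shape of OffsetFetchPartition for this sketch.
type partitionResult struct {
	Partition       int
	CommittedOffset int64
	Error           error
}

// failedWith returns, per topic, the IDs of the partitions whose error
// matches code. Wrapped errors match as well because errors.Is unwraps them.
func failedWith(topics map[string][]partitionResult, code errorCode) map[string][]int {
	failed := make(map[string][]int)
	for topic, partitions := range topics {
		for _, p := range partitions {
			if errors.Is(p.Error, code) {
				failed[topic] = append(failed[topic], p.Partition)
			}
		}
	}
	return failed
}

func main() {
	topics := map[string][]partitionResult{
		"events": {
			{Partition: 0, CommittedOffset: 42},
			{Partition: 1, Error: fmt.Errorf("fetching offsets: %w", unknownTopicOrPartition)},
		},
	}
	fmt.Println(failedWith(topics, unknownTopicOrPartition))
}
```

Checking the per-partition Error values separately from the response-level Error matters because a request can succeed overall while individual partitions fail.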

type OffsetRequest added in v0.4.2

type OffsetRequest struct {
	Partition int
	Timestamp int64
}

OffsetRequest represents a request to retrieve a single partition offset.

func FirstOffsetOf added in v0.4.2

func FirstOffsetOf(partition int) OffsetRequest

FirstOffsetOf constructs an OffsetRequest which asks for the first offset of the partition given as argument.

func LastOffsetOf added in v0.4.2

func LastOffsetOf(partition int) OffsetRequest

LastOffsetOf constructs an OffsetRequest which asks for the last offset of the partition given as argument.

func TimeOffsetOf added in v0.4.2

func TimeOffsetOf(partition int, at time.Time) OffsetRequest

TimeOffsetOf constructs an OffsetRequest which asks for a partition offset at a given time.
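The three constructors can be pictured with the following stdlib-only sketch. It assumes that the sentinel timestamps follow the Kafka ListOffsets convention (-2 for the earliest offset, -1 for the latest) and that times are encoded as milliseconds since the Unix epoch; the lowercase names are local stand-ins, not the package's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// Sentinel timestamps, assumed to follow the Kafka ListOffsets convention.
const (
	lastOffset  int64 = -1 // latest offset of the partition
	firstOffset int64 = -2 // earliest offset of the partition
)

// offsetRequest mirrors the OffsetRequest struct shown above.
type offsetRequest struct {
	Partition int
	Timestamp int64
}

func firstOffsetOf(partition int) offsetRequest {
	return offsetRequest{Partition: partition, Timestamp: firstOffset}
}

func lastOffsetOf(partition int) offsetRequest {
	return offsetRequest{Partition: partition, Timestamp: lastOffset}
}

// timeOffsetOf assumes the timestamp is sent as Unix milliseconds.
func timeOffsetOf(partition int, at time.Time) offsetRequest {
	return offsetRequest{
		Partition: partition,
		Timestamp: at.UnixNano() / int64(time.Millisecond),
	}
}

func main() {
	at := time.Date(2020, time.May, 8, 0, 0, 0, 0, time.UTC)
	fmt.Println(firstOffsetOf(0), lastOffsetOf(0), timeOffsetOf(0, at))
}
```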

type Partition

type Partition struct {
	// Name of the topic that the partition belongs to, and its index in the
	// topic.
	Topic string
	ID    int

	// Leader, replicas, and ISR for the partition.
	Leader   Broker
	Replicas []Broker
	Isr      []Broker

	// An error that may have occurred while attempting to read the partition
	// metadata.
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker. Programs may use the standard errors.Is
	// function to test the error against kafka error codes.
	Error error
}

Partition carries the metadata associated with a kafka partition.

func LookupPartition

func LookupPartition(ctx context.Context, network string, address string, topic string, partition int) (Partition, error)

LookupPartition is a convenience wrapper for DefaultDialer.LookupPartition.

func LookupPartitions

func LookupPartitions(ctx context.Context, network string, address string, topic string) ([]Partition, error)

LookupPartitions is a convenience wrapper for DefaultDialer.LookupPartitions.

type PartitionAssignment added in v0.4.2

type PartitionAssignment struct {
	// ID is the partition ID.
	ID int

	// Offset is the initial offset at which this assignment begins.  It will
	// either be an absolute offset if one has previously been committed for
	// the consumer group or a relative offset such as FirstOffset when this
	// is the first time the partition has been assigned to a member of the
	// group.
	Offset int64
}

PartitionAssignment represents the starting state of a partition that has been assigned to a consumer.

type PartitionOffsets added in v0.4.2

type PartitionOffsets struct {
	Partition   int
	FirstOffset int64
	LastOffset  int64
	Offsets     map[int64]time.Time
	Error       error
}

PartitionOffsets carries information about offsets available in a topic partition.

type ProduceRequest added in v0.4.2

type ProduceRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// The topic to produce the records to.
	Topic string

	// The partition to produce the records to.
	Partition int

	// The level of required acknowledgements to ask the kafka broker for.
	RequiredAcks int

	// The message format version used when encoding the records.
	//
	// By default, the client automatically determines which version should be
	// used based on the version of the Produce API supported by the server.
	MessageVersion int

	// An optional transaction id when producing to the kafka broker is part of
	// a transaction.
	TransactionalID string

	// The sequence of records to produce to the topic partition.
	Records RecordSet

	// An optional compression algorithm to apply to the batch of records sent
	// to the kafka broker.
	Compression compress.Compression
}

ProduceRequest represents a request sent to a kafka broker to produce records to a topic partition.

type ProduceResponse added in v0.4.2

type ProduceResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// An error that may have occurred while attempting to produce the records.
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker. Programs may use the standard errors.Is
	// function to test the error against kafka error codes.
	Error error

	// Offset of the first record that was written to the topic partition.
	//
	// This field will be zero if the kafka broker did not support the Produce
	// API in version 3 or above.
	BaseOffset int64

	// Time at which the broker wrote the records to the topic partition.
	//
	// This field will be zero if the kafka broker did not support the Produce
	// API in version 2 or above.
	LogAppendTime time.Time

	// First offset in the topic partition that the records were written to.
	//
	// This field will be zero if the kafka broker did not support the Produce
	// API in version 5 or above (or if the first offset is zero).
	LogStartOffset int64

	// If errors occurred writing specific records, they will be reported in this
	// map.
	//
	// This field will always be empty if the kafka broker did not support the
	// Produce API in version 8 or above.
	RecordErrors map[int]error
}

ProduceResponse represents a response from a kafka broker to a produce request.

type RackAffinityGroupBalancer added in v0.4.2

type RackAffinityGroupBalancer struct {
	// Rack is the name of the rack where this consumer is running.  It will be
	// communicated to the consumer group leader via the UserData so that
	// assignments can be made with affinity to the partition leader.
	Rack string
}

RackAffinityGroupBalancer makes a best effort to pair up consumers with partitions whose leader is in the same rack. This strategy can have performance benefits by minimizing round trip latency between the consumer and the broker. In environments where network traffic across racks incurs charges (such as cross AZ data transfer in AWS), this strategy is also a cost optimization measure because it keeps network traffic within the local rack where possible.

The primary objective is to spread partitions evenly across consumers with a secondary focus on maximizing the number of partitions where the leader and the consumer are in the same rack. For best affinity, it's recommended to have a balanced spread of consumers and partition leaders across racks.

This balancer requires Kafka version 0.10.0.0 or later. Earlier versions do not return the brokers' racks in the metadata request.
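To make the two objectives concrete, here is a deliberately simplified, stdlib-only sketch of a rack-aware assignment. It is not the package's algorithm: the consumer and partition types and the two-pass greedy strategy are assumptions chosen for illustration. The first pass pairs partitions with same-rack consumers while capping each consumer at an even share; the second pass hands the remaining partitions to the least loaded consumer, regardless of rack.

```go
package main

import "fmt"

// consumer and partition are hypothetical types used only by this sketch.
type consumer struct {
	ID   string
	Rack string
}

type partition struct {
	ID         int
	LeaderRack string
}

// assignWithRackAffinity spreads partitions evenly across consumers and
// prefers same-rack pairs where the even spread allows it. Consumer IDs are
// assumed to be unique.
func assignWithRackAffinity(consumers []consumer, partitions []partition) map[string][]int {
	assigned := make(map[string][]int, len(consumers))
	if len(consumers) == 0 {
		return assigned
	}
	base := len(partitions) / len(consumers) // even share per consumer

	// pick returns the index of the least loaded consumer accepted by ok,
	// or -1 if no consumer is accepted.
	pick := func(ok func(c consumer) bool) int {
		best := -1
		for i, c := range consumers {
			if !ok(c) {
				continue
			}
			if best < 0 || len(assigned[c.ID]) < len(assigned[consumers[best].ID]) {
				best = i
			}
		}
		return best
	}

	var leftover []partition
	// Pass 1: same-rack pairs, capped at the even share.
	for _, p := range partitions {
		i := pick(func(c consumer) bool {
			return c.Rack == p.LeaderRack && len(assigned[c.ID]) < base
		})
		if i < 0 {
			leftover = append(leftover, p)
			continue
		}
		assigned[consumers[i].ID] = append(assigned[consumers[i].ID], p.ID)
	}
	// Pass 2: remaining partitions go to the least loaded consumer.
	for _, p := range leftover {
		i := pick(func(consumer) bool { return true })
		assigned[consumers[i].ID] = append(assigned[consumers[i].ID], p.ID)
	}
	return assigned
}

func main() {
	consumers := []consumer{{ID: "c0", Rack: "a"}, {ID: "c1", Rack: "b"}}
	partitions := []partition{{0, "b"}, {1, "a"}, {2, "b"}, {3, "a"}}
	fmt.Println(assignWithRackAffinity(consumers, partitions))
}
```

Capping the first pass at the even share is what keeps balance as the primary objective: a rack with many partition leaders but few consumers cannot overload those consumers.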

func (RackAffinityGroupBalancer) AssignGroups added in v0.4.2

func (r RackAffinityGroupBalancer) AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments

func (RackAffinityGroupBalancer) ProtocolName added in v0.4.2

func (r RackAffinityGroupBalancer) ProtocolName() string

func (RackAffinityGroupBalancer) UserData added in v0.4.2

func (r RackAffinityGroupBalancer) UserData() ([]byte, error)

type RangeGroupBalancer

type RangeGroupBalancer struct{}

RangeGroupBalancer assigns each consumer a contiguous range of partitions.

Example: 5 partitions, 2 consumers

C0: [0, 1, 2]
C1: [3, 4]

Example: 6 partitions, 3 consumers

C0: [0, 1]
C1: [2, 3]
C2: [4, 5]
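The two examples above can be reproduced with a short stdlib-only sketch. The rangeAssign function is a hypothetical helper that works on plain partition indexes for a single topic; it is meant to illustrate the range strategy, not to mirror the package's AssignGroups code.

```go
package main

import "fmt"

// rangeAssign gives each consumer a contiguous block of partitions. When the
// partitions do not divide evenly, the first (partitions % consumers)
// consumers receive one extra partition.
func rangeAssign(consumers, partitions int) [][]int {
	if consumers <= 0 {
		return nil
	}
	assignments := make([][]int, consumers)
	base, extra := partitions/consumers, partitions%consumers
	next := 0
	for c := 0; c < consumers; c++ {
		n := base
		if c < extra {
			n++
		}
		for i := 0; i < n; i++ {
			assignments[c] = append(assignments[c], next)
			next++
		}
	}
	return assignments
}

func main() {
	fmt.Println(rangeAssign(2, 5)) // [[0 1 2] [3 4]]
	fmt.Println(rangeAssign(3, 6)) // [[0 1] [2 3] [4 5]]
}
```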

func (RangeGroupBalancer) AssignGroups

func (r RangeGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments

func (RangeGroupBalancer) ProtocolName

func (r RangeGroupBalancer) ProtocolName() string

func (RangeGroupBalancer) UserData

func (r RangeGroupBalancer) UserData() ([]byte, error)

type ReadBatchConfig added in v0.4.2

type ReadBatchConfig struct {
	MinBytes int
	MaxBytes int

	// IsolationLevel controls the visibility of transactional records.
	// ReadUncommitted makes all records visible. With ReadCommitted only
	// non-transactional and committed records are visible.
	IsolationLevel IsolationLevel
}

ReadBatchConfig is a configuration object used for reading batches of messages.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader provides a high-level API for consuming messages from kafka.

A Reader automatically manages reconnections to a kafka server, and blocking methods have context support for asynchronous cancellations.

func NewReader

func NewReader(config ReaderConfig) *Reader

NewReader creates and returns a new Reader configured with config. The offset is initialized to FirstOffset.

func (*Reader) Close

func (r *Reader) Close() error

Close closes the stream, preventing the program from reading any more messages from it.

func (*Reader) CommitMessages

func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error

CommitMessages commits the list of messages passed as argument. The program may pass a context to asynchronously cancel the commit operation when it was configured to be blocking.

func (*Reader) Config

func (r *Reader) Config() ReaderConfig

Config returns the reader's configuration.

func (*Reader) FetchMessage

func (r *Reader) FetchMessage(ctx context.Context) (Message, error)

FetchMessage reads and returns the next message from r. The method call blocks until a message becomes available, or an error occurs. The program may also specify a context to asynchronously cancel the blocking operation.

The method returns io.EOF to indicate that the reader has been closed.

FetchMessage does not commit offsets automatically when using consumer groups. Use CommitMessages to commit the offset.
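A common way to combine FetchMessage and CommitMessages is to commit only after a message has been processed, which gives at-least-once delivery. The sketch below shows that loop against a small local interface whose method signatures mirror the ones documented here; the message type, the fetcher interface, and fakeReader are hypothetical stand-ins so that the program runs without a broker.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// message and fetcher are local stand-ins for this sketch.
type message struct {
	Offset int64
	Value  []byte
}

type fetcher interface {
	FetchMessage(ctx context.Context) (message, error)
	CommitMessages(ctx context.Context, msgs ...message) error
}

// consume handles each message and commits it only after handle succeeds.
func consume(ctx context.Context, r fetcher, handle func(message) error) error {
	for {
		m, err := r.FetchMessage(ctx)
		if errors.Is(err, io.EOF) {
			return nil // the reader has been closed
		}
		if err != nil {
			return err
		}
		if err := handle(m); err != nil {
			return err // not committed, so the message may be delivered again
		}
		if err := r.CommitMessages(ctx, m); err != nil {
			return err
		}
	}
}

// fakeReader is an in-memory fetcher used only to exercise consume.
type fakeReader struct {
	pending   []message
	committed []int64
}

func (f *fakeReader) FetchMessage(ctx context.Context) (message, error) {
	if len(f.pending) == 0 {
		return message{}, io.EOF
	}
	m := f.pending[0]
	f.pending = f.pending[1:]
	return m, nil
}

func (f *fakeReader) CommitMessages(ctx context.Context, msgs ...message) error {
	for _, m := range msgs {
		f.committed = append(f.committed, m.Offset)
	}
	return nil
}

// runDemo consumes two in-memory messages and returns the committed offsets.
func runDemo() ([]int64, error) {
	r := &fakeReader{pending: []message{{Offset: 0}, {Offset: 1}}}
	err := consume(context.Background(), r, func(m message) error {
		fmt.Println("processed offset", m.Offset)
		return nil
	})
	return r.committed, err
}

func main() {
	fmt.Println(runDemo())
}
```

If the handler fails, the loop returns before committing, so a restarted consumer in the same group would see the message again.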

func (*Reader) Lag

func (r *Reader) Lag() int64

Lag returns the lag of the last message returned by ReadMessage, or -1 if r is backed by a consumer group.

func (*Reader) Offset

func (r *Reader) Offset() int64

Offset returns the current absolute offset of the reader, or -1 if r is backed by a consumer group.

func (*Reader) ReadLag

func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error)

ReadLag returns the current lag of the reader by fetching the last offset of the topic and partition and computing the difference between that value and the offset of the last message returned by ReadMessage.

This method is intended to be used in cases where a program may be unable to call ReadMessage to update the value returned by Lag, but still needs to get an up-to-date estimate of how far behind the reader is, for example when the consumer is not ready to process the next message.

The function returns a lag of zero when the reader's current offset is negative.

func (*Reader) ReadMessage

func (r *Reader) ReadMessage(ctx context.Context) (Message, error)

ReadMessage reads and returns the next message from r. The method call blocks until a message becomes available, or an error occurs. The program may also specify a context to asynchronously cancel the blocking operation.

The method returns io.EOF to indicate that the reader has been closed.

If consumer groups are used, ReadMessage will automatically commit the offset when called.

func (*Reader) SetOffset

func (r *Reader) SetOffset(offset int64) error

SetOffset changes the offset from which the next batch of messages will be read. The method fails with io.ErrClosedPipe if the reader has already been closed.

From version 0.2.0, FirstOffset and LastOffset can be used to indicate the first or last available offset in the partition. Please note while -1 and -2 were accepted to indicate the first or last offset in previous versions, the meanings of the numbers were swapped in 0.2.0 to match the meanings in other libraries and the Kafka protocol specification.

func (*Reader) SetOffsetAt added in v0.4.2

func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error

SetOffsetAt changes the offset from which the next batch of messages will be read given the timestamp t.

The method fails if it is unable to connect to the partition leader, if it is unable to read the offset for the given timestamp, or if the reader has been closed.

func (*Reader) Stats

func (r *Reader) Stats() ReaderStats

Stats returns a snapshot of the reader stats since the last time the method was called, or since the reader was created if it is called for the first time.

A typical use of this method is to spawn a goroutine that will periodically call Stats on a kafka reader and report the metrics to a stats collection system.
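The "since the last time the method was called" behavior can be pictured as a counter that is reset every time it is read. The following stdlib-only sketch shows one way to get that behavior with an atomic swap; the counter type is a hypothetical illustration, not the package's internal representation.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// counter accumulates a value between snapshots and is safe for concurrent use.
type counter struct{ n int64 }

func (c *counter) add(delta int64) { atomic.AddInt64(&c.n, delta) }

// snapshot returns the value accumulated since the previous call and resets
// the counter to zero in the same atomic step.
func (c *counter) snapshot() int64 { return atomic.SwapInt64(&c.n, 0) }

func main() {
	var messages counter
	messages.add(3)
	messages.add(2)
	fmt.Println(messages.snapshot()) // 5
	fmt.Println(messages.snapshot()) // 0
}
```

A reporting goroutine would typically call the snapshot on each tick of a time.Ticker and forward the deltas to the metrics system, which is why two consumers of the same snapshot would each see only part of the data.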

type ReaderConfig

type ReaderConfig struct {
	// The list of broker addresses used to connect to the kafka cluster.
	Brokers []string

	// GroupID holds the optional consumer group id.  If GroupID is specified, then
	// Partition should NOT be specified, i.e. it must be left at 0.
	GroupID string

	// The topic to read messages from.
	Topic string

	// Partition to read messages from.  Either Partition or GroupID may
	// be assigned, but not both
	Partition int

	// A dialer used to open connections to the kafka server. This field is
	// optional, if nil, the default dialer is used instead.
	Dialer *Dialer

	// The capacity of the internal message queue, defaults to 100 if none is
	// set.
	QueueCapacity int

	// Min and max number of bytes to fetch from kafka in each request.
	MinBytes int
	MaxBytes int

	// Maximum amount of time to wait for new data to come when fetching batches
	// of messages from kafka.
	MaxWait time.Duration

	// ReadLagInterval sets the frequency at which the reader lag is updated.
	// Setting this field to a negative value disables lag reporting.
	ReadLagInterval time.Duration

	// GroupBalancers is the priority-ordered list of client-side consumer group
	// balancing strategies that will be offered to the coordinator.  The first
	// strategy that all group members support will be chosen by the leader.
	//
	// Default: [Range, RoundRobin]
	//
	// Only used when GroupID is set
	GroupBalancers []GroupBalancer

	// HeartbeatInterval sets the optional frequency at which the reader sends the consumer
	// group heartbeat update.
	//
	// Default: 3s
	//
	// Only used when GroupID is set
	HeartbeatInterval time.Duration

	// CommitInterval indicates the interval at which offsets are committed to
	// the broker.  If 0, commits will be handled synchronously.
	//
	// Default: 0
	//
	// Only used when GroupID is set
	CommitInterval time.Duration

	// PartitionWatchInterval indicates how often a reader checks for partition changes.
	// If a reader sees a partition change (such as a partition add) it will rebalance the group
	// picking up new partitions.
	//
	// Default: 5s
	//
	// Only used when GroupID is set and WatchPartitionChanges is set.
	PartitionWatchInterval time.Duration

	// WatchPartitionChanges is used to inform kafka-go that a consumer group should be
	// polling the brokers and rebalancing if any partition changes happen to the topic.
	WatchPartitionChanges bool

	// SessionTimeout optionally sets the length of time that may pass without a heartbeat
	// before the coordinator considers the consumer dead and initiates a rebalance.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	SessionTimeout time.Duration

	// RebalanceTimeout optionally sets the length of time the coordinator will wait
	// for members to join as part of a rebalance.  For kafka servers under higher
	// load, it may be useful to set this value higher.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	RebalanceTimeout time.Duration

	// JoinGroupBackoff optionally sets the length of time to wait between re-joining
	// the consumer group after an error.
	//
	// Default: 5s
	JoinGroupBackoff time.Duration

	// RetentionTime optionally sets the length of time the consumer group will be saved
	// by the broker
	//
	// Default: 24h
	//
	// Only used when GroupID is set
	RetentionTime time.Duration

	// StartOffset determines from whence the consumer group should begin
	// consuming when it finds a partition without a committed offset.  If
	// non-zero, it must be set to one of FirstOffset or LastOffset.
	//
	// Default: FirstOffset
	//
	// Only used when GroupID is set
	StartOffset int64

	// ReadBackoffMin optionally sets the smallest amount of time the reader will wait before
	// polling for new messages
	//
	// Default: 100ms
	ReadBackoffMin time.Duration

	// ReadBackoffMax optionally sets the maximum amount of time the reader will wait before
	// polling for new messages
	//
	// Default: 1s
	ReadBackoffMax time.Duration

	// If not nil, specifies a logger used to report internal changes within the
	// reader.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the reader falls
	// back to using Logger instead.
	ErrorLogger Logger

	// IsolationLevel controls the visibility of transactional records.
	// ReadUncommitted makes all records visible. With ReadCommitted only
	// non-transactional and committed records are visible.
	IsolationLevel IsolationLevel

	// Limit of how many attempts will be made before delivering the error.
	//
	// The default is to try 3 times.
	MaxAttempts int
}

ReaderConfig is a configuration object used to create new instances of Reader.

func (*ReaderConfig) Validate added in v0.4.2

func (config *ReaderConfig) Validate() error

Validate method validates ReaderConfig properties.
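The exact checks performed by Validate are not listed here, but some constraints are stated in the field comments of ReaderConfig. The stdlib-only sketch below encodes those on a trimmed-down copy of the struct. The readerConfig type and the sentinel values of firstOffset and lastOffset are assumptions, and the MinBytes/MaxBytes check is an extra illustrative rule that the comments do not state.

```go
package main

import (
	"errors"
	"fmt"
)

// Sentinel offsets, assumed to follow the Kafka protocol convention.
const (
	lastOffset  int64 = -1
	firstOffset int64 = -2
)

// readerConfig copies a few ReaderConfig fields for illustration.
type readerConfig struct {
	GroupID     string
	Partition   int
	MinBytes    int
	MaxBytes    int
	StartOffset int64
}

// validate applies the constraints described in the field comments.
func (c readerConfig) validate() error {
	// Either Partition or GroupID may be assigned, but not both.
	if c.GroupID != "" && c.Partition != 0 {
		return errors.New("either Partition or GroupID may be set, but not both")
	}
	// If non-zero, StartOffset must be one of FirstOffset or LastOffset.
	if c.StartOffset != 0 && c.StartOffset != firstOffset && c.StartOffset != lastOffset {
		return fmt.Errorf("StartOffset must be FirstOffset or LastOffset, got %d", c.StartOffset)
	}
	// Assumption for this sketch: the minimum cannot exceed the maximum.
	if c.MaxBytes != 0 && c.MinBytes > c.MaxBytes {
		return fmt.Errorf("MinBytes (%d) exceeds MaxBytes (%d)", c.MinBytes, c.MaxBytes)
	}
	return nil
}

func main() {
	fmt.Println(readerConfig{GroupID: "workers"}.validate())
	fmt.Println(readerConfig{GroupID: "workers", Partition: 2}.validate())
	fmt.Println(readerConfig{Partition: 2, StartOffset: 7}.validate())
}
```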

type ReaderStats

type ReaderStats struct {
	Dials      int64 `metric:"kafka.reader.dial.count"      type:"counter"`
	Fetches    int64 `metric:"kafka.reader.fetch.count"     type:"counter"`
	Messages   int64 `metric:"kafka.reader.message.count"   type:"counter"`
	Bytes      int64 `metric:"kafka.reader.message.bytes"   type:"counter"`
	Rebalances int64 `metric:"kafka.reader.rebalance.count" type:"counter"`
	Timeouts   int64 `metric:"kafka.reader.timeout.count"   type:"counter"`
	Errors     int64 `metric:"kafka.reader.error.count"     type:"counter"`

	DialTime   DurationStats `metric:"kafka.reader.dial.seconds"`
	ReadTime   DurationStats `metric:"kafka.reader.read.seconds"`
	WaitTime   DurationStats `metric:"kafka.reader.wait.seconds"`
	FetchSize  SummaryStats  `metric:"kafka.reader.fetch.size"`
	FetchBytes SummaryStats  `metric:"kafka.reader.fetch.bytes"`

	Offset        int64         `metric:"kafka.reader.offset"          type:"gauge"`
	Lag           int64         `metric:"kafka.reader.lag"             type:"gauge"`
	MinBytes      int64         `metric:"kafka.reader.fetch_bytes.min" type:"gauge"`
	MaxBytes      int64         `metric:"kafka.reader.fetch_bytes.max" type:"gauge"`
	MaxWait       time.Duration `metric:"kafka.reader.fetch_wait.max"  type:"gauge"`
	QueueLength   int64         `metric:"kafka.reader.queue.length"    type:"gauge"`
	QueueCapacity int64         `metric:"kafka.reader.queue.capacity"  type:"gauge"`

	ClientID  string `tag:"client_id"`
	Topic     string `tag:"topic"`
	Partition string `tag:"partition"`

	// The original `Fetches` field had a typo where the metric name was called
	// "kafak..." instead of "kafka...", in order to offer time to fix monitors
	// that may be relying on this mistake we are temporarily introducing this
	// field.
	DeprecatedFetchesWithTypo int64 `metric:"kafak.reader.fetch.count" type:"counter"`
}

ReaderStats is a data structure returned by a call to Reader.Stats that exposes details about the behavior of the reader.

type Record added in v0.4.2

type Record interface {
	// The offset at which the record exists in a topic partition. This value
	// is ignored in produce requests.
	Offset() int64

	// Returns the time of the record. This value may be omitted in produce
	// requests to let kafka set the time when it saves the record.
	Time() time.Time

	// Returns a byte sequence containing the key of this record. The returned
	// sequence may be nil to indicate that the record has no key. If the record
	// is part of a RecordSet, the content of the key must remain valid at least
	// until the record set is closed (or until the key is closed).
	Key() Bytes

	// Returns a byte sequence containing the value of this record. The returned
	// sequence may be nil to indicate that the record has no value. If the
	// record is part of a RecordSet, the content of the value must remain valid
	// at least until the record set is closed (or until the value is closed).
	Value() Bytes

	// Returns the list of headers associated with this record. The returned
	// slice may be reused across calls, the program should use it as an
	// immutable value.
	Headers() []Header
}

Record is an interface representing a single kafka record.

Record values are not safe to use concurrently from multiple goroutines.

func NewRecord added in v0.4.2

func NewRecord(offset int64, time time.Time, key, value []byte, headers ...Header) Record

NewRecord constructs a new record from the arguments passed to the function.

type RecordSet added in v0.4.2

type RecordSet interface {
	io.Closer
	// Returns the next record in the set, or nil if all records of the sequence
	// have already been returned.
	Next() Record
}

RecordSet is an interface representing a sequence of records. Record sets are used in both produce and fetch requests to represent the sequence of records that are sent to or received from kafka brokers.

RecordSet values are not safe to use concurrently from multiple goroutines.

func NewRecordSet added in v0.4.2

func NewRecordSet(records ...Record) RecordSet

NewRecordSet constructs a RecordSet which exposes the sequence of records passed as arguments.
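The contract of RecordSet (Next returns nil once the sequence is exhausted, and the set is closed when the program is done with it) suggests a simple consumption loop. The sketch below uses trimmed-down local copies of the interfaces and a hypothetical in-memory implementation, so it runs with the standard library only.

```go
package main

import (
	"fmt"
	"io"
)

// record and recordSet are reduced local versions of the interfaces above.
type record interface {
	Offset() int64
}

type recordSet interface {
	io.Closer
	Next() record
}

type simpleRecord struct{ offset int64 }

func (r simpleRecord) Offset() int64 { return r.offset }

// sliceRecordSet is a hypothetical in-memory recordSet.
type sliceRecordSet struct{ records []record }

func (s *sliceRecordSet) Next() record {
	if len(s.records) == 0 {
		return nil
	}
	r := s.records[0]
	s.records = s.records[1:]
	return r
}

func (s *sliceRecordSet) Close() error {
	s.records = nil
	return nil
}

// drain reads every record of the set, then closes it.
func drain(set recordSet) (offsets []int64, err error) {
	defer func() {
		if cerr := set.Close(); err == nil {
			err = cerr
		}
	}()
	for rec := set.Next(); rec != nil; rec = set.Next() {
		offsets = append(offsets, rec.Offset())
	}
	return offsets, nil
}

func main() {
	set := &sliceRecordSet{records: []record{simpleRecord{7}, simpleRecord{8}}}
	fmt.Println(drain(set))
}
```

Because record sets are not safe for concurrent use, a loop like this should run in a single goroutine per set.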

type ReplicaAssignment

type ReplicaAssignment struct {
	Partition int
	// The list of brokers where the partition should be allocated. There must
	// be as many entries in this list as there are replicas of the partition.
	// The first entry represents the broker that will be the preferred leader
	// for the partition.
	//
	// This field changed in 0.4 from `int` to `[]int`. It was invalid to pass
	// a single integer as this is supposed to be a list. While this introduces
	// a breaking change, it probably never worked before.
	Replicas []int
}

type Resolver

type Resolver interface {
	// LookupHost looks up the given host using the local resolver.
	// It returns a slice of that host's addresses.
	LookupHost(ctx context.Context, host string) (addrs []string, err error)
}

The Resolver interface is used as an abstraction to provide service discovery of the hosts of a kafka cluster.

type RoundRobin

type RoundRobin struct {
	// contains filtered or unexported fields
}

RoundRobin is a Balancer implementation that equally distributes messages across all available partitions.

func (*RoundRobin) Balance

func (rr *RoundRobin) Balance(msg Message, partitions ...int) int

Balance satisfies the Balancer interface.
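As an illustration of the round-robin policy, the stdlib-only sketch below cycles through the available partitions with an atomic counter so that concurrent callers each get the next slot. The roundRobin type here is a hypothetical stand-in that drops the Message argument; it is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin cycles through partitions; the zero value is ready to use.
type roundRobin struct{ offset uint32 }

// balance returns the next partition in turn. The partitions list must not
// be empty.
func (rr *roundRobin) balance(partitions ...int) int {
	n := atomic.AddUint32(&rr.offset, 1) - 1
	return partitions[int(n%uint32(len(partitions)))]
}

func main() {
	rr := &roundRobin{}
	for i := 0; i < 4; i++ {
		fmt.Println(rr.balance(0, 1, 2)) // 0, 1, 2, then 0 again
	}
}
```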

type RoundRobinGroupBalancer

type RoundRobinGroupBalancer struct{}

RoundRobinGroupBalancer divides partitions evenly among consumers.

Example: 5 partitions, 2 consumers

C0: [0, 2, 4]
C1: [1, 3]

Example: 6 partitions, 3 consumers

C0: [0, 3]
C1: [1, 4]
C2: [2, 5]
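The examples above correspond to assigning partition i to consumer i modulo the number of consumers. A minimal stdlib-only sketch follows; roundRobinAssign is a hypothetical helper that works on plain indexes for a single topic rather than on the package's GroupMember and Partition types.

```go
package main

import "fmt"

// roundRobinAssign deals partitions to consumers one at a time, in order.
func roundRobinAssign(consumers, partitions int) [][]int {
	if consumers <= 0 {
		return nil
	}
	assignments := make([][]int, consumers)
	for p := 0; p < partitions; p++ {
		c := p % consumers
		assignments[c] = append(assignments[c], p)
	}
	return assignments
}

func main() {
	fmt.Println(roundRobinAssign(2, 5)) // [[0 2 4] [1 3]]
	fmt.Println(roundRobinAssign(3, 6)) // [[0 3] [1 4] [2 5]]
}
```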

func (RoundRobinGroupBalancer) AssignGroups

func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments

func (RoundRobinGroupBalancer) ProtocolName

func (r RoundRobinGroupBalancer) ProtocolName() string

func (RoundRobinGroupBalancer) UserData

func (r RoundRobinGroupBalancer) UserData() ([]byte, error)

type RoundTripper added in v0.4.2

type RoundTripper interface {
	// RoundTrip sends a request to a kafka broker and returns the response that
	// was received, or a non-nil error.
	//
	// The context passed as first argument can be used to asynchronously abort
	// the call if needed.
	RoundTrip(context.Context, net.Addr, protocol.Message) (protocol.Message, error)
}

RoundTripper is an interface implemented by types which support interacting with kafka brokers.

var DefaultTransport RoundTripper = &Transport{
	Dial: (&net.Dialer{
		Timeout:   3 * time.Second,
		DualStack: true,
	}).DialContext,
}

DefaultTransport is the default transport used by kafka clients in this package.

type SummaryStats

type SummaryStats struct {
	Avg int64 `metric:"avg" type:"gauge"`
	Min int64 `metric:"min" type:"gauge"`
	Max int64 `metric:"max" type:"gauge"`
}

SummaryStats is a data structure that carries a summary of observed values. The average, minimum, and maximum are reported.

type Topic added in v0.4.2

type Topic struct {
	// Name of the topic.
	Name string

	// True if the topic is internal.
	Internal bool

	// The list of partitions currently available on this topic.
	Partitions []Partition

	// An error that may have occurred while attempting to read the topic
	// metadata.
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker. Programs may use the standard errors.Is
	// function to test the error against kafka error codes.
	Error error
}

Topic represents a topic in a kafka cluster.

type TopicAndGroup added in v0.4.2

type TopicAndGroup struct {
	Topic   string
	GroupId string
}

TopicAndGroup pairs a topic with a consumer group. Because both are strings, a dedicated type is defined for clarity when passing them to the Client as a function argument.

N.B. TopicAndGroup is currently experimental! Therefore, it is subject to change, including breaking changes between MINOR and PATCH releases.

DEPRECATED: this type will be removed in version 1.0, programs should migrate to use kafka.(*Client).OffsetFetch instead.

type TopicConfig

type TopicConfig struct {
	// Topic name
	Topic string

	// NumPartitions created. -1 indicates unset.
	NumPartitions int

	// ReplicationFactor for the topic. -1 indicates unset.
	ReplicationFactor int

	// ReplicaAssignments among kafka brokers for this topic partitions. If this
	// is set num_partitions and replication_factor must be unset.
	ReplicaAssignments []ReplicaAssignment

	// ConfigEntries holds topic level configuration for topic to be set.
	ConfigEntries []ConfigEntry
}

type Transport added in v0.4.2

type Transport struct {
	// A function used to establish connections to the kafka cluster.
	Dial func(context.Context, string, string) (net.Conn, error)

	// Time limit set for establishing connections to the kafka cluster. This
	// limit includes all round trips done to establish the connections (TLS
	// handshake, SASL negotiation, etc...).
	//
	// Defaults to 5s.
	DialTimeout time.Duration

	// Maximum amount of time that the connections will wait for responses from
	// the kafka cluster. Be mindful that this value should be longer than the
	// duration communicated in some messages (like the fetch MaxWait value for
	// example).
	//
	// Defaults to 10s.
	ReadTimeout time.Duration

	// Maximum amount of time that connections will remain open and unused.
	// The transport will manage to automatically close connections that have
	// been idle for too long, and re-open them on demand when the transport is
	// used again.
	//
	// Defaults to 30s.
	IdleTimeout time.Duration

	// TTL for the metadata cached by this transport. Note that the value
	// configured here is an upper bound, the transport randomizes the TTLs to
	// avoid getting into states where multiple clients end up synchronized and
	// cause bursts of requests to the kafka broker.
	//
	// Defaults to 6s.
	MetadataTTL time.Duration

	// Unique identifier that the transport communicates to the brokers when it
	// sends requests.
	ClientID string

	// An optional configuration for TLS connections established by this
	// transport.
	//
	// If the Server
	TLS *tls.Config

	// SASL configures the Transport to use SASL authentication.
	SASL sasl.Mechanism
	// contains filtered or unexported fields
}

Transport is an implementation of the RoundTripper interface.

Transport values manage a pool of connections and automatically discover the cluster's layout to route requests to the appropriate brokers.

Transport values are safe to use concurrently from multiple goroutines.

Note: The intent is for the Transport to become the underlying layer of the kafka.Reader and kafka.Writer types.

func (*Transport) CloseIdleConnections added in v0.4.2

func (t *Transport) CloseIdleConnections()

CloseIdleConnections closes all idle connections immediately, and marks all connections that are in use to be closed when they become idle again.

func (*Transport) RoundTrip added in v0.4.2

func (t *Transport) RoundTrip(ctx context.Context, addr net.Addr, req protocol.Message) (protocol.Message, error)

RoundTrip sends a request to a kafka cluster and returns the response, or an error if no responses were received.

Message types are available in sub-packages of the protocol package. Each kafka API is implemented in a different sub-package. For example, the request and response types for the Fetch API are available in the protocol/fetch package.

The type of the response message will match the type of the request. For example, if RoundTrip was called with a *fetch.Request as argument, the value returned will be of type *fetch.Response. It is safe for the program to do a type assertion after checking that no error was returned.

This example illustrates the way this method is expected to be used:

r, err := transport.RoundTrip(ctx, addr, &fetch.Request{ ... })
if err != nil {
	...
} else {
	res := r.(*fetch.Response)
	...
}

The transport automatically selects the highest version of the API that is supported by both the kafka-go package and the kafka broker. The negotiation happens transparently once when connections are established.

This API was introduced in version 0.4 as a way to leverage the lower-level features of the kafka protocol, but also provide a more efficient way of managing connections to kafka brokers.

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

The Writer type provides the implementation of a producer of kafka messages that automatically distributes messages across partitions of a single topic using a configurable balancing policy.

Instances of Writer are safe to use concurrently from multiple goroutines.

Example
w := kafka.NewWriter(kafka.WriterConfig{
	Brokers: []string{"localhost:9092"},
	Topic:   "Topic-1",
})

w.WriteMessages(context.Background(),
	kafka.Message{
		Key:   []byte("Key-A"),
		Value: []byte("Hello World!"),
	},
)

w.Close()

func NewWriter

func NewWriter(config WriterConfig) *Writer

NewWriter creates and returns a new Writer configured with config.

func (*Writer) Close

func (w *Writer) Close() (err error)

Close flushes all buffered messages and closes the writer. The call to Close aborts any concurrent calls to WriteMessages, which then return with the io.ErrClosedPipe error.

func (*Writer) Stats

func (w *Writer) Stats() WriterStats

Stats returns a snapshot of the writer stats since the last time the method was called, or since the writer was created if it is called for the first time.

A typical use of this method is to spawn a goroutine that will periodically call Stats on a kafka writer and report the metrics to a stats collection system.

func (*Writer) WriteMessages

func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error

WriteMessages writes a batch of messages to the kafka topic configured on this writer.

Unless the writer was configured to write messages asynchronously, the method blocks until all messages have been written, or until the maximum number of attempts was reached.

When sending synchronously and the writer's batch size is configured to be greater than 1, this method blocks until either a full batch can be assembled or the batch timeout is reached. The batch size and timeouts are evaluated per partition, so the choice of Balancer can also influence the flushing behavior. For example, the Hash balancer will require on average N * batch size messages to trigger a flush where N is the number of partitions. The best way to achieve good batching behavior is to share one Writer amongst multiple goroutines.

When the method returns an error, there's no way to know yet which messages have succeeded or failed.

The context passed as first argument may also be used to asynchronously cancel the operation. Note that in this case there are no guarantees made on whether messages were written to kafka. The program should assume that the whole batch failed and re-write the messages later (which could then cause duplicates).

type WriterConfig

type WriterConfig struct {
	// The list of brokers used to discover the partitions available on the
	// kafka cluster.
	//
	// This field is required, attempting to create a writer with an empty list
	// of brokers will panic.
	Brokers []string

	// The topic that the writer will produce messages to.
	//
	// This field is required, attempting to create a writer with an empty topic
	// will panic.
	Topic string

	// The dialer used by the writer to establish connections to the kafka
	// cluster.
	//
	// If nil, the default dialer is used instead.
	Dialer *Dialer

	// The balancer used to distribute messages across partitions.
	//
	// The default is to use a round-robin distribution.
	Balancer Balancer

	// Limit on how many attempts will be made to deliver a message.
	//
	// The default is to try at most 10 times.
	MaxAttempts int

	// A hint on the capacity of the writer's internal message queue.
	//
	// The default is to use a queue capacity of 100 messages.
	QueueCapacity int

	// Limit on how many messages will be buffered before being sent to a
	// partition.
	//
	// The default is to use a target batch size of 100 messages.
	BatchSize int

	// Limit the maximum size of a request in bytes before being sent to
	// a partition.
	//
	// The default is to use a kafka default value of 1048576.
	BatchBytes int

	// Time limit on how often incomplete message batches will be flushed to
	// kafka.
	//
	// The default is to flush at least every second.
	BatchTimeout time.Duration

	// Timeout for read operations performed by the Writer.
	//
	// Defaults to 10 seconds.
	ReadTimeout time.Duration

	// Timeout for write operation performed by the Writer.
	//
	// Defaults to 10 seconds.
	WriteTimeout time.Duration

	// This interval defines how often the list of partitions is refreshed from
	// kafka. It allows the writer to automatically handle when new partitions
	// are added to a topic.
	//
	// The default is to refresh partitions every 15 seconds.
	RebalanceInterval time.Duration

	// Connections that were idle for this duration will not be reused.
	//
	// Defaults to 9 minutes.
	IdleConnTimeout time.Duration

	// Number of acknowledges from partition replicas required before receiving
	// a response to a produce request (default to -1, which means to wait for
	// all replicas).
	RequiredAcks int

	// Setting this flag to true causes the WriteMessages method to never block.
	// It also means that errors are ignored since the caller will not receive
	// the returned value. Use this only if you don't care about guarantees of
	// whether the messages were written to kafka.
	Async bool

	// CompressionCodec sets the codec to be used to compress Kafka messages.
	// Note that individual messages are allowed to override the compression codec.
	CompressionCodec

	// If not nil, specifies a logger used to report internal changes within the
	// writer.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the writer falls
	// back to using Logger instead.
	ErrorLogger Logger
	// contains filtered or unexported fields
}

WriterConfig is a configuration type used to create new instances of Writer.

func (*WriterConfig) Validate added in v0.4.2

func (config *WriterConfig) Validate() error

Validate method validates WriterConfig properties.
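Since the field comments on WriterConfig state that an empty broker list or topic makes writer creation panic, calling Validate first lets a program report the problem as an ordinary error. The sketch below assumes a broker at localhost:9092 and a topic named Topic-1, as in the package example; the specific batch and retry values are arbitrary illustrations, not recommendations.

```go
package main

import (
	"log"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	cfg := kafka.WriterConfig{
		Brokers:      []string{"localhost:9092"}, // assumed local broker
		Topic:        "Topic-1",
		Balancer:     &kafka.Hash{}, // route messages by key
		BatchSize:    50,
		BatchTimeout: 200 * time.Millisecond,
		MaxAttempts:  3,
	}

	// Surface configuration mistakes as an error instead of a panic.
	if err := cfg.Validate(); err != nil {
		log.Fatalf("invalid writer config: %v", err)
	}

	w := kafka.NewWriter(cfg)
	defer func() {
		if err := w.Close(); err != nil {
			log.Printf("closing writer: %v", err)
		}
	}()
}
```

Fields left at their zero value fall back to the defaults documented on WriterConfig.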

type WriterStats

type WriterStats struct {
	Dials      int64 `metric:"kafka.writer.dial.count"      type:"counter"`
	Writes     int64 `metric:"kafka.writer.write.count"     type:"counter"`
	Messages   int64 `metric:"kafka.writer.message.count"   type:"counter"`
	Bytes      int64 `metric:"kafka.writer.message.bytes"   type:"counter"`
	Rebalances int64 `metric:"kafka.writer.rebalance.count" type:"counter"`
	Errors     int64 `metric:"kafka.writer.error.count"     type:"counter"`

	DialTime   DurationStats `metric:"kafka.writer.dial.seconds"`
	WriteTime  DurationStats `metric:"kafka.writer.write.seconds"`
	WaitTime   DurationStats `metric:"kafka.writer.wait.seconds"`
	Retries    SummaryStats  `metric:"kafka.writer.retries.count"`
	BatchSize  SummaryStats  `metric:"kafka.writer.batch.size"`
	BatchBytes SummaryStats  `metric:"kafka.writer.batch.bytes"`

	MaxAttempts       int64         `metric:"kafka.writer.attempts.max"       type:"gauge"`
	MaxBatchSize      int64         `metric:"kafka.writer.batch.max"          type:"gauge"`
	BatchTimeout      time.Duration `metric:"kafka.writer.batch.timeout"      type:"gauge"`
	ReadTimeout       time.Duration `metric:"kafka.writer.read.timeout"       type:"gauge"`
	WriteTimeout      time.Duration `metric:"kafka.writer.write.timeout"      type:"gauge"`
	RebalanceInterval time.Duration `metric:"kafka.writer.rebalance.interval" type:"gauge"`
	RequiredAcks      int64         `metric:"kafka.writer.acks.required"      type:"gauge"`
	Async             bool          `metric:"kafka.writer.async"              type:"gauge"`
	QueueLength       int64         `metric:"kafka.writer.queue.length"       type:"gauge"`
	QueueCapacity     int64         `metric:"kafka.writer.queue.capacity"     type:"gauge"`

	ClientID string `tag:"client_id"`
	Topic    string `tag:"topic"`
}

WriterStats is a data structure returned by a call to Writer.Stats that exposes details about the behavior of the writer.

Directories

Path Synopsis
lz4
zstd
Package zstd implements Zstandard compression.
Package gzip does nothing, it's kept for backward compatibility to avoid breaking the majority of programs that imported it to install the compression codec, which is now always included.
Package lz4 does nothing, it's kept for backward compatibility to avoid breaking the majority of programs that imported it to install the compression codec, which is now always included.
Package snappy does nothing, it's kept for backward compatibility to avoid breaking the majority of programs that imported it to install the compression codec, which is now always included.
Package zstd does nothing, it's kept for backward compatibility to avoid breaking the majority of programs that imported it to install the compression codec, which is now always included.
