sarama

v1.43.3
Published: Aug 8, 2024 License: MIT Imports: 52 Imported by: 1,439

README

sarama

Sarama is an MIT-licensed Go client library for Apache Kafka.

Getting started

  • API documentation and examples are available via pkg.go.dev.
  • Mocks for testing are available in the mocks subpackage.
  • The examples directory contains more elaborate example applications.
  • The tools directory contains command line tools that can be useful for testing, diagnostics, and instrumentation.

You might also want to look at the Frequently Asked Questions.

Compatibility and API stability

Sarama provides a "2 releases + 2 months" compatibility guarantee: we support the two latest stable releases of Kafka and Go, and we provide a two-month grace period for older releases. However, older releases of Kafka are still likely to work.

Sarama follows semantic versioning and provides API stability via the standard Go module version numbering scheme.

A changelog is available here.

Contributing

Documentation

Overview

Package sarama is a pure Go client library for dealing with Apache Kafka (versions 0.8 and later). It includes a high-level API for easily producing and consuming messages, and a low-level API for controlling bytes on the wire when the high-level API is insufficient. Usage examples for the high-level APIs are provided inline with their full documentation.

To produce messages, use either the AsyncProducer or the SyncProducer. The AsyncProducer accepts messages on a channel and produces them asynchronously in the background as efficiently as possible; it is preferred in most cases. The SyncProducer provides a method which will block until Kafka acknowledges the message as produced. This can be useful but comes with two caveats: it will generally be less efficient, and the actual durability guarantees depend on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.

To consume messages, use the Consumer or ConsumerGroup API.

For lower-level needs, the Broker and Request/Response objects permit precise control over each connection and message sent on the wire; the Client provides higher-level metadata management that is shared between the producers and the consumer. The Request/Response objects and properties are mostly undocumented, as they line up exactly with the protocol fields documented by Kafka at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

Metrics are exposed through the https://github.com/rcrowley/go-metrics library in a local registry.

Broker related metrics:

+---------------------------------------------------------+------------+---------------------------------------------------------------+
| Name                                                    | Type       | Description                                                   |
+---------------------------------------------------------+------------+---------------------------------------------------------------+
| incoming-byte-rate                                      | meter      | Bytes/second read off all brokers                             |
| incoming-byte-rate-for-broker-<broker-id>               | meter      | Bytes/second read off a given broker                          |
| outgoing-byte-rate                                      | meter      | Bytes/second written to all brokers                           |
| outgoing-byte-rate-for-broker-<broker-id>               | meter      | Bytes/second written to a given broker                        |
| request-rate                                            | meter      | Requests/second sent to all brokers                           |
| request-rate-for-broker-<broker-id>                     | meter      | Requests/second sent to a given broker                        |
| request-size                                            | histogram  | Distribution of the request size in bytes for all brokers     |
| request-size-for-broker-<broker-id>                     | histogram  | Distribution of the request size in bytes for a given broker  |
| request-latency-in-ms                                   | histogram  | Distribution of the request latency in ms for all brokers     |
| request-latency-in-ms-for-broker-<broker-id>            | histogram  | Distribution of the request latency in ms for a given broker  |
| response-rate                                           | meter      | Responses/second received from all brokers                    |
| response-rate-for-broker-<broker-id>                    | meter      | Responses/second received from a given broker                 |
| response-size                                           | histogram  | Distribution of the response size in bytes for all brokers    |
| response-size-for-broker-<broker-id>                    | histogram  | Distribution of the response size in bytes for a given broker |
| requests-in-flight                                      | counter    | The current number of in-flight requests awaiting a response  |
|                                                         |            | for all brokers                                               |
| requests-in-flight-for-broker-<broker-id>               | counter    | The current number of in-flight requests awaiting a response  |
|                                                         |            | for a given broker                                            |
| protocol-requests-rate-<api-key>                        | meter      | Number of api requests sent to the brokers for all brokers    |
|                                                         |            | https://kafka.apache.org/protocol.html#protocol_api_keys      |
| protocol-requests-rate-<api-key>-for-broker-<broker-id> | meter      | Number of packets sent to the brokers by api-key for a given  |
|                                                         |            | broker                                                        |
+---------------------------------------------------------+------------+---------------------------------------------------------------+

Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.
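
The per-broker names in the table above follow a fixed suffix pattern, so a lookup key for a registry can be built mechanically. The helper below is a minimal sketch of that pattern; `brokerMetricName` is a hypothetical name, not a function of this package.

```go
package main

import "fmt"

// brokerMetricName builds the per-broker variant of a metric name, following
// the "<name>-for-broker-<broker-id>" pattern from the table.
func brokerMetricName(name string, brokerID int32) string {
	return fmt.Sprintf("%s-for-broker-%d", name, brokerID)
}

func main() {
	fmt.Println(brokerMetricName("request-rate", 1))
	fmt.Println(brokerMetricName("requests-in-flight", 42))
}
```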

Producer related metrics:

+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
| Name                                      | Type       | Description                                                                          |
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
| batch-size                                | histogram  | Distribution of the number of bytes sent per partition per request for all topics    |
| batch-size-for-topic-<topic>              | histogram  | Distribution of the number of bytes sent per partition per request for a given topic |
| record-send-rate                          | meter      | Records/second sent to all topics                                                    |
| record-send-rate-for-topic-<topic>        | meter      | Records/second sent to a given topic                                                 |
| records-per-request                       | histogram  | Distribution of the number of records sent per request for all topics                |
| records-per-request-for-topic-<topic>     | histogram  | Distribution of the number of records sent per request for a given topic             |
| compression-ratio                         | histogram  | Distribution of the compression ratio times 100 of record batches for all topics     |
| compression-ratio-for-topic-<topic>       | histogram  | Distribution of the compression ratio times 100 of record batches for a given topic  |
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+

Consumer related metrics:

+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
| Name                                      | Type       | Description                                                                          |
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+
| consumer-batch-size                       | histogram  | Distribution of the number of messages in a batch                                    |
| consumer-fetch-rate                       | meter      | Fetch requests/second sent to all brokers                                            |
| consumer-fetch-rate-for-broker-<broker>   | meter      | Fetch requests/second sent to a given broker                                         |
| consumer-fetch-rate-for-topic-<topic>     | meter      | Fetch requests/second sent for a given topic                                         |
| consumer-fetch-response-size              | histogram  | Distribution of the fetch response size in bytes                                     |
| consumer-group-join-total-<GroupID>       | counter    | Total count of consumer group join attempts                                          |
| consumer-group-join-failed-<GroupID>      | counter    | Total count of consumer group join failures                                          |
| consumer-group-sync-total-<GroupID>       | counter    | Total count of consumer group sync attempts                                          |
| consumer-group-sync-failed-<GroupID>      | counter    | Total count of consumer group sync failures                                          |
+-------------------------------------------+------------+--------------------------------------------------------------------------------------+

Constants

const (
	// RangeBalanceStrategyName identifies strategies that use the range partition assignment strategy
	RangeBalanceStrategyName = "range"

	// RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy
	RoundRobinBalanceStrategyName = "roundrobin"

	// StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy
	StickyBalanceStrategyName = "sticky"
)

const (
	// SASLTypeOAuth represents the SASL/OAUTHBEARER mechanism (Kafka 2.0.0+)
	SASLTypeOAuth = "OAUTHBEARER"
	// SASLTypePlaintext represents the SASL/PLAIN mechanism
	SASLTypePlaintext = "PLAIN"
	// SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
	SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
	// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
	SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
	SASLTypeGSSAPI      = "GSSAPI"
	// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL auth using opaque packets.
	SASLHandshakeV0 = int16(0)
	// SASLHandshakeV1 is v1 of the Kafka SASL handshake protocol. Client and
	// server negotiate SASL by wrapping tokens with Kafka protocol headers.
	SASLHandshakeV1 = int16(1)
	// SASLExtKeyAuth is the reserved extension key name sent as part of the
	// SASL/OAUTHBEARER initial client response
	SASLExtKeyAuth = "auth"
)

const (
	// OffsetNewest stands for the log head offset, i.e. the offset that will be
	// assigned to the next message that will be produced to the partition. You
	// can send this to a client's GetOffset method to get this offset, or when
	// calling ConsumePartition to start consuming new messages.
	OffsetNewest int64 = -1
	// OffsetOldest stands for the oldest offset available on the broker for a
	// partition. You can send this to a client's GetOffset method to get this
	// offset, or when calling ConsumePartition to start consuming from the
	// oldest offset that is still available on the broker.
	OffsetOldest int64 = -2
)
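
To make the meaning of the two sentinel offsets concrete, the sketch below resolves a requested offset against the oldest retained offset and the log head of a partition. `resolveOffset` and its parameters are hypothetical and written only for illustration; this is not presented as the library's implementation.

```go
package main

import "fmt"

// Values copied from the constants above.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// resolveOffset maps a requested offset onto a concrete one. oldest is the
// oldest offset still available on the broker; next is the offset that will
// be assigned to the next produced message (the log head).
func resolveOffset(requested, oldest, next int64) (int64, error) {
	switch {
	case requested == offsetNewest:
		return next, nil
	case requested == offsetOldest:
		return oldest, nil
	case requested >= oldest && requested <= next:
		return requested, nil
	default:
		return 0, fmt.Errorf("offset %d is outside the available range [%d, %d]", requested, oldest, next)
	}
}

func main() {
	for _, requested := range []int64{offsetNewest, offsetOldest, 120, 5} {
		resolved, err := resolveOffset(requested, 100, 250)
		fmt.Println(requested, resolved, err)
	}
}
```
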
const (
	TOK_ID_KRB_AP_REQ   = 256
	GSS_API_GENERIC_TAG = 0x60
	KRB5_USER_AUTH      = 1
	KRB5_KEYTAB_AUTH    = 2
	KRB5_CCACHE_AUTH    = 3
	GSS_API_INITIAL     = 1
	GSS_API_VERIFY      = 2
	GSS_API_FINISH      = 3
)

const APIKeySASLAuth = 36

APIKeySASLAuth is the API key for the SaslAuthenticate Kafka API

const GroupGenerationUndefined = -1

GroupGenerationUndefined is a special value for the group generation field of Offset Commit Requests that should be used when a consumer group does not rely on Kafka for partition management.

const MAX_GROUP_INSTANCE_ID_LENGTH = 249

const ReceiveTime int64 = -1

ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the request was received. The timestamp is only used if message version 1 is used, which requires Kafka 0.8.2.

Variables

var (
	// Logger is the instance of a StdLogger interface that Sarama writes connection
	// management events to. By default it is set to discard all log messages via io.Discard,
	// but you can set it to redirect wherever you want.
	Logger StdLogger = log.New(io.Discard, "[Sarama] ", log.LstdFlags)

	// PanicHandler is called for recovering from panics spawned internally to the library (and thus
	// not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
	PanicHandler func(interface{})

	// MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying
	// to send a request larger than this will result in a PacketEncodingError. The default of 100 MiB is aligned
	// with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt
	// to process.
	MaxRequestSize int32 = 100 * 1024 * 1024

	// MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If
	// a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to
	// protect the client from running out of memory. Please note that brokers do not have any natural limit on
	// the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers
	// (see https://issues.apache.org/jira/browse/KAFKA-2063).
	MaxResponseSize int32 = 100 * 1024 * 1024
)
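
Because the default Logger above is built with `log.New`, any standard-library `*log.Logger` can stand in for it. The sketch below shows this with a local `stdLogger` interface, which is an assumption about the three methods such a sink needs; `newPrefixedLogger` and `logLine` are hypothetical helpers.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// stdLogger is a local stand-in (an assumption) for the logging interface
// that the Logger variable is typed as.
type stdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

// newPrefixedLogger returns a *log.Logger, which satisfies stdLogger, that
// writes to buf. Flags are 0 so the output carries no timestamp.
func newPrefixedLogger(buf *bytes.Buffer, prefix string) stdLogger {
	return log.New(buf, prefix, 0)
}

// logLine logs msg through a prefixed logger and returns what was written.
func logLine(prefix, msg string) string {
	var buf bytes.Buffer
	newPrefixedLogger(&buf, prefix).Print(msg)
	return buf.String()
}

func main() {
	fmt.Print(logLine("[Sarama] ", "connected to broker"))
}
```

With the package imported, assigning a logger like this (for example one writing to `os.Stderr`) to the Logger variable would redirect the connection-management messages there.
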
var (
	V0_8_2_0  = newKafkaVersion(0, 8, 2, 0)
	V0_8_2_1  = newKafkaVersion(0, 8, 2, 1)
	V0_8_2_2  = newKafkaVersion(0, 8, 2, 2)
	V0_9_0_0  = newKafkaVersion(0, 9, 0, 0)
	V0_9_0_1  = newKafkaVersion(0, 9, 0, 1)
	V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
	V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
	V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
	V0_10_1_1 = newKafkaVersion(0, 10, 1, 1)
	V0_10_2_0 = newKafkaVersion(0, 10, 2, 0)
	V0_10_2_1 = newKafkaVersion(0, 10, 2, 1)
	V0_10_2_2 = newKafkaVersion(0, 10, 2, 2)
	V0_11_0_0 = newKafkaVersion(0, 11, 0, 0)
	V0_11_0_1 = newKafkaVersion(0, 11, 0, 1)
	V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
	V1_0_0_0  = newKafkaVersion(1, 0, 0, 0)
	V1_0_1_0  = newKafkaVersion(1, 0, 1, 0)
	V1_0_2_0  = newKafkaVersion(1, 0, 2, 0)
	V1_1_0_0  = newKafkaVersion(1, 1, 0, 0)
	V1_1_1_0  = newKafkaVersion(1, 1, 1, 0)
	V2_0_0_0  = newKafkaVersion(2, 0, 0, 0)
	V2_0_1_0  = newKafkaVersion(2, 0, 1, 0)
	V2_1_0_0  = newKafkaVersion(2, 1, 0, 0)
	V2_1_1_0  = newKafkaVersion(2, 1, 1, 0)
	V2_2_0_0  = newKafkaVersion(2, 2, 0, 0)
	V2_2_1_0  = newKafkaVersion(2, 2, 1, 0)
	V2_2_2_0  = newKafkaVersion(2, 2, 2, 0)
	V2_3_0_0  = newKafkaVersion(2, 3, 0, 0)
	V2_3_1_0  = newKafkaVersion(2, 3, 1, 0)
	V2_4_0_0  = newKafkaVersion(2, 4, 0, 0)
	V2_4_1_0  = newKafkaVersion(2, 4, 1, 0)
	V2_5_0_0  = newKafkaVersion(2, 5, 0, 0)
	V2_5_1_0  = newKafkaVersion(2, 5, 1, 0)
	V2_6_0_0  = newKafkaVersion(2, 6, 0, 0)
	V2_6_1_0  = newKafkaVersion(2, 6, 1, 0)
	V2_6_2_0  = newKafkaVersion(2, 6, 2, 0)
	V2_6_3_0  = newKafkaVersion(2, 6, 3, 0)
	V2_7_0_0  = newKafkaVersion(2, 7, 0, 0)
	V2_7_1_0  = newKafkaVersion(2, 7, 1, 0)
	V2_7_2_0  = newKafkaVersion(2, 7, 2, 0)
	V2_8_0_0  = newKafkaVersion(2, 8, 0, 0)
	V2_8_1_0  = newKafkaVersion(2, 8, 1, 0)
	V2_8_2_0  = newKafkaVersion(2, 8, 2, 0)
	V3_0_0_0  = newKafkaVersion(3, 0, 0, 0)
	V3_0_1_0  = newKafkaVersion(3, 0, 1, 0)
	V3_0_2_0  = newKafkaVersion(3, 0, 2, 0)
	V3_1_0_0  = newKafkaVersion(3, 1, 0, 0)
	V3_1_1_0  = newKafkaVersion(3, 1, 1, 0)
	V3_1_2_0  = newKafkaVersion(3, 1, 2, 0)
	V3_2_0_0  = newKafkaVersion(3, 2, 0, 0)
	V3_2_1_0  = newKafkaVersion(3, 2, 1, 0)
	V3_2_2_0  = newKafkaVersion(3, 2, 2, 0)
	V3_2_3_0  = newKafkaVersion(3, 2, 3, 0)
	V3_3_0_0  = newKafkaVersion(3, 3, 0, 0)
	V3_3_1_0  = newKafkaVersion(3, 3, 1, 0)
	V3_3_2_0  = newKafkaVersion(3, 3, 2, 0)
	V3_4_0_0  = newKafkaVersion(3, 4, 0, 0)
	V3_4_1_0  = newKafkaVersion(3, 4, 1, 0)
	V3_5_0_0  = newKafkaVersion(3, 5, 0, 0)
	V3_5_1_0  = newKafkaVersion(3, 5, 1, 0)
	V3_6_0_0  = newKafkaVersion(3, 6, 0, 0)

	SupportedVersions = []KafkaVersion{
		V0_8_2_0,
		V0_8_2_1,
		V0_8_2_2,
		V0_9_0_0,
		V0_9_0_1,
		V0_10_0_0,
		V0_10_0_1,
		V0_10_1_0,
		V0_10_1_1,
		V0_10_2_0,
		V0_10_2_1,
		V0_10_2_2,
		V0_11_0_0,
		V0_11_0_1,
		V0_11_0_2,
		V1_0_0_0,
		V1_0_1_0,
		V1_0_2_0,
		V1_1_0_0,
		V1_1_1_0,
		V2_0_0_0,
		V2_0_1_0,
		V2_1_0_0,
		V2_1_1_0,
		V2_2_0_0,
		V2_2_1_0,
		V2_2_2_0,
		V2_3_0_0,
		V2_3_1_0,
		V2_4_0_0,
		V2_4_1_0,
		V2_5_0_0,
		V2_5_1_0,
		V2_6_0_0,
		V2_6_1_0,
		V2_6_2_0,
		V2_7_0_0,
		V2_7_1_0,
		V2_8_0_0,
		V2_8_1_0,
		V2_8_2_0,
		V3_0_0_0,
		V3_0_1_0,
		V3_0_2_0,
		V3_1_0_0,
		V3_1_1_0,
		V3_1_2_0,
		V3_2_0_0,
		V3_2_1_0,
		V3_2_2_0,
		V3_2_3_0,
		V3_3_0_0,
		V3_3_1_0,
		V3_3_2_0,
		V3_4_0_0,
		V3_4_1_0,
		V3_5_0_0,
		V3_5_1_0,
		V3_6_0_0,
	}
	MinVersion     = V0_8_2_0
	MaxVersion     = V3_6_0_0
	DefaultVersion = V2_1_0_0
)

Effective constants defining the supported Kafka versions.
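
Each version above is a four-part number, so feature gating reduces to a component-wise comparison. The sketch below illustrates that ordering with a local `version` type; the type and its `isAtLeast` method are stand-ins written for this example and are not the package's own definitions.

```go
package main

import "fmt"

// version is a local stand-in for a four-part Kafka version number.
type version [4]uint

// Values mirroring MinVersion, DefaultVersion, and MaxVersion above.
var (
	minVersion     = version{0, 8, 2, 0}
	defaultVersion = version{2, 1, 0, 0}
	maxVersion     = version{3, 6, 0, 0}
)

// isAtLeast reports whether v is greater than or equal to other, comparing
// the four components from most to least significant.
func (v version) isAtLeast(other version) bool {
	for i := range v {
		if v[i] > other[i] {
			return true
		}
		if v[i] < other[i] {
			return false
		}
	}
	return true // all components are equal
}

func main() {
	fmt.Println(defaultVersion.isAtLeast(minVersion)) // true
	fmt.Println(defaultVersion.isAtLeast(maxVersion)) // false
}
```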

var BalanceStrategyRange = NewBalanceStrategyRange()

Deprecated: use NewBalanceStrategyRange to avoid a data race.

var BalanceStrategyRoundRobin = NewBalanceStrategyRoundRobin()

Deprecated: use NewBalanceStrategyRoundRobin to avoid a data race.

var BalanceStrategySticky = NewBalanceStrategySticky()

Deprecated: use NewBalanceStrategySticky to avoid a data race.

var ErrAddPartitionsToTxn = errors.New("transaction manager: failed to send partitions to transaction")

ErrAddPartitionsToTxn is returned when AddPartitionsToTxn failed multiple times

var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")

ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.

var ErrBrokerNotFound = errors.New("kafka: broker for ID is not found")

ErrBrokerNotFound is the error returned when there's no broker found for the requested ID.

var ErrCannotTransitionNilError = errors.New("transaction manager: cannot transition with a nil error")

ErrCannotTransitionNilError is returned when a transition is attempted with a nil error.

var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")

ErrClosedClient is the error returned when a method is called on a client that has been closed.

var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.

var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch")

ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing a RecordBatch.

var ErrControllerNotAvailable = errors.New("kafka: controller is not available")

ErrControllerNotAvailable is returned when the server did not give a correct controller ID. This may be because the Kafka server's version is lower than 0.10.0.0.

var ErrCreateACLs = errors.New("kafka server: failed to create one or more ACL rules")

ErrCreateACLs is the type of error returned when ACL creation failed

var ErrDeleteRecords = errors.New("kafka server: failed to delete records")

ErrDeleteRecords is the type of error returned when the requested records could not be deleted.

var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks")

ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.

var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")

ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.

var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index")

ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).
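
The valid range is inclusive at both ends, which the following minimal sketch spells out. `checkPartition` and the local `errInvalidPartition` sentinel are hypothetical names used only for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalidPartition is a local stand-in for the sentinel error above.
var errInvalidPartition = errors.New("partitioner returned an invalid partition index")

// checkPartition accepts indexes in [0, numPartitions-1] and rejects
// everything else.
func checkPartition(partition, numPartitions int32) error {
	if partition < 0 || partition >= numPartitions {
		return errInvalidPartition
	}
	return nil
}

func main() {
	fmt.Println(checkPartition(0, 3)) // <nil>
	fmt.Println(checkPartition(2, 3)) // <nil>
	fmt.Println(checkPartition(3, 3)) // partitioner returned an invalid partition index
}
```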

var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")

ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max

var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata")

ErrNoTopicsToUpdateMetadata is returned when Metadata.Full is set to false but no specific topics were found to update the metadata.

var ErrNonTransactedProducer = errors.New("transaction manager: you need to add TransactionalID to producer")

ErrNonTransactedProducer is returned when calling BeginTxn, CommitTxn or AbortTxn on a non-transactional producer.

var ErrNotConnected = errors.New("kafka: broker not connected")

ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.

var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to")

ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.

var ErrReassignPartitions = errors.New("failed to reassign partitions for topic")

ErrReassignPartitions is returned when altering partition assignments for a topic fails

var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down")

ErrShuttingDown is returned when a producer receives a message during shutdown.

var ErrTransactionNotReady = errors.New("transaction manager: transaction is not ready")

ErrTransactionNotReady is returned when the transaction status is invalid for the current action.

var ErrTransitionNotAllowed = errors.New("transaction manager: invalid transition attempted")

ErrTransitionNotAllowed is returned when a transaction manager state transition is not valid.

var ErrTxnOffsetCommit = errors.New("transaction manager: failed to send offsets to transaction")

ErrTxnOffsetCommit is returned when TxnOffsetCommit failed multiple times

var ErrTxnUnableToParseResponse = errors.New("transaction manager: unable to parse response")

ErrTxnUnableToParseResponse is returned when the response is nil.

var ErrUnknownScramMechanism = errors.New("kafka: unknown SCRAM mechanism provided")

ErrUnknownScramMechanism is returned when a user calls AlterUserScramCredentials with an unknown SCRAM mechanism.

var GROUP_INSTANCE_ID_REGEXP = regexp.MustCompile(`^[0-9a-zA-Z\._\-]+$`)
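
The pattern above, together with MAX_GROUP_INSTANCE_ID_LENGTH, suggests how a group instance ID could be checked before use. The sketch below combines only those two values; `validGroupInstanceID` is a hypothetical helper, and the package's own validation may apply further rules.

```go
package main

import (
	"fmt"
	"regexp"
)

// Values copied from the declarations in this document.
const maxGroupInstanceIDLength = 249

var groupInstanceIDRegexp = regexp.MustCompile(`^[0-9a-zA-Z\._\-]+$`)

// validGroupInstanceID reports whether id is non-empty, within the length
// limit, and made up only of letters, digits, '.', '_' and '-'.
func validGroupInstanceID(id string) bool {
	return len(id) <= maxGroupInstanceIDLength && groupInstanceIDRegexp.MatchString(id)
}

func main() {
	for _, id := range []string{"consumer-1.a_b", "bad id", ""} {
		fmt.Printf("%q valid: %t\n", id, validGroupInstanceID(id))
	}
}
```
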
var MultiErrorFormat multierror.ErrorFormatFunc = func(es []error) string {
	if len(es) == 1 {
		return es[0].Error()
	}

	points := make([]string, len(es))
	for i, err := range es {
		points[i] = fmt.Sprintf("* %s", err)
	}

	return fmt.Sprintf(
		"%d errors occurred:\n\t%s\n",
		len(es), strings.Join(points, "\n\t"))
}

MultiErrorFormat specifies the formatter applied to format multierrors. The default implementation is a condensed version of the hashicorp/go-multierror default one.
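
To show what this condensed format produces, the sketch below repeats the function body above in a standalone program that needs only the standard library. `formatErrors` and `describe` are hypothetical names used for the demonstration.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// formatErrors repeats the formatting logic shown above: a single error is
// returned as-is, several errors become a counted, bulleted list.
func formatErrors(es []error) string {
	if len(es) == 1 {
		return es[0].Error()
	}

	points := make([]string, len(es))
	for i, err := range es {
		points[i] = fmt.Sprintf("* %s", err)
	}

	return fmt.Sprintf(
		"%d errors occurred:\n\t%s\n",
		len(es), strings.Join(points, "\n\t"))
}

// describe turns plain messages into errors and formats them.
func describe(msgs ...string) string {
	es := make([]error, len(msgs))
	for i, msg := range msgs {
		es[i] = errors.New(msg)
	}
	return formatErrors(es)
}

func main() {
	fmt.Println(describe("broker not connected"))
	fmt.Print(describe("broker not connected", "request timed out"))
}
```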

var NoNode = &Broker{id: -1, addr: ":-1"}

var NullUUID = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}

Functions

func Wrap added in v1.40.0

func Wrap(sentinel error, wrapped ...error) sentinelError
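
Wrap carries no description here, so the following is only a sketch of the general sentinel-wrapping pattern its signature suggests: a sentinel error is paired with underlying causes so that `errors.Is` matches either. The names `wrappedSentinel`, `wrap`, and `matches` are hypothetical, and this is not presented as the library's implementation. It assumes Go 1.20 or later for `errors.Join`.

```go
package main

import (
	"errors"
	"fmt"
)

// wrappedSentinel pairs a sentinel error with its underlying causes.
type wrappedSentinel struct {
	sentinel error
	cause    error // nil when no causes were supplied
}

func (e wrappedSentinel) Error() string {
	if e.cause == nil {
		return e.sentinel.Error()
	}
	return fmt.Sprintf("%s: %v", e.sentinel, e.cause)
}

// Is lets errors.Is match the sentinel.
func (e wrappedSentinel) Is(target error) bool {
	return errors.Is(e.sentinel, target)
}

// Unwrap exposes the causes so errors.Is and errors.As can inspect them.
func (e wrappedSentinel) Unwrap() error { return e.cause }

// wrap combines a sentinel with zero or more causes.
func wrap(sentinel error, causes ...error) wrappedSentinel {
	return wrappedSentinel{sentinel: sentinel, cause: errors.Join(causes...)}
}

// Example errors for the demonstration.
var (
	errOutOfBrokers = errors.New("client has run out of available brokers")
	errTimeout      = errors.New("i/o timeout")
)

// matches reports whether err matches target according to errors.Is.
func matches(err, target error) bool { return errors.Is(err, target) }

func main() {
	err := wrap(errOutOfBrokers, errTimeout)
	fmt.Println(matches(err, errOutOfBrokers)) // true
	fmt.Println(matches(err, errTimeout))      // true
}
```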

Types

type AbortedTransaction added in v1.14.0

type AbortedTransaction struct {
	// ProducerID contains the producer id associated with the aborted transaction.
	ProducerID int64
	// FirstOffset contains the first offset in the aborted transaction.
	FirstOffset int64
}

type AccessToken added in v1.40.0

type AccessToken struct {
	// Token is the access token payload.
	Token string
	// Extensions is an optional map of arbitrary key-value pairs that can be
	// sent with the SASL/OAUTHBEARER initial client response. These values are
	// ignored by the SASL server if they are unexpected. This feature is only
	// supported by Kafka >= 2.1.0.
	Extensions map[string]string
}

AccessToken contains an access token used to authenticate a SASL/OAUTHBEARER client along with associated metadata.

type AccessTokenProvider added in v1.40.0

type AccessTokenProvider interface {
	// Token returns an access token. The implementation should ensure token
	// reuse so that multiple calls at connect time do not create multiple
	// tokens. The implementation should also periodically refresh the token in
	// order to guarantee that each call returns an unexpired token.  This
	// method should not block indefinitely--a timeout error should be returned
	// after a short period of inactivity so that the broker connection logic
	// can log debugging information and retry.
	Token() (*AccessToken, error)
}

AccessTokenProvider is the interface that encapsulates how implementors can generate access tokens for Kafka broker authentication.
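
The Token contract above asks for token reuse and periodic refresh. A minimal sketch of one way to meet it follows, using a local `accessToken` type that mirrors the struct above. The `cachingProvider` type, its `fetch` callback, and the one-minute refresh margin are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// accessToken mirrors the AccessToken struct shown above.
type accessToken struct {
	Token      string
	Extensions map[string]string
}

// cachingProvider reuses a token until shortly before it expires.
type cachingProvider struct {
	mu      sync.Mutex
	fetch   func() (token string, expiry time.Time, err error) // assumed token source
	current *accessToken
	expiry  time.Time
}

// Token returns the cached token, fetching a new one when none is cached or
// the cached one expires within a minute. fetch is expected to enforce its
// own timeout so that this method does not block indefinitely.
func (p *cachingProvider) Token() (*accessToken, error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.current != nil && time.Now().Add(time.Minute).Before(p.expiry) {
		return p.current, nil
	}
	tok, expiry, err := p.fetch()
	if err != nil {
		return nil, fmt.Errorf("fetch access token: %w", err)
	}
	p.current, p.expiry = &accessToken{Token: tok}, expiry
	return p.current, nil
}

// countFetches calls Token n times on a fresh provider and returns how many
// times the token source was contacted, or -1 on error.
func countFetches(n int) int {
	calls := 0
	p := &cachingProvider{
		fetch: func() (string, time.Time, error) {
			calls++
			return fmt.Sprintf("token-%d", calls), time.Now().Add(time.Hour), nil
		},
	}
	for i := 0; i < n; i++ {
		if _, err := p.Token(); err != nil {
			return -1
		}
	}
	return calls
}

func main() {
	fmt.Println(countFetches(3)) // 1: the first token is reused
}
```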

type Acl added in v1.16.0

type Acl struct {
	Principal      string
	Host           string
	Operation      AclOperation
	PermissionType AclPermissionType
}

Acl holds information about an ACL.

type AclCreation added in v1.16.0

type AclCreation struct {
	Resource
	Acl
}

AclCreation is a wrapper around Resource and Acl type

type AclCreationResponse added in v1.16.0

type AclCreationResponse struct {
	Err    KError
	ErrMsg *string
}

AclCreationResponse is an acl creation response type

type AclFilter added in v1.16.0

type AclFilter struct {
	Version                   int
	ResourceType              AclResourceType
	ResourceName              *string
	ResourcePatternTypeFilter AclResourcePatternType
	Principal                 *string
	Host                      *string
	Operation                 AclOperation
	PermissionType            AclPermissionType
}

type AclOperation added in v1.16.0

type AclOperation int
const (
	AclOperationUnknown AclOperation = iota
	AclOperationAny
	AclOperationAll
	AclOperationRead
	AclOperationWrite
	AclOperationCreate
	AclOperationDelete
	AclOperationAlter
	AclOperationDescribe
	AclOperationClusterAction
	AclOperationDescribeConfigs
	AclOperationAlterConfigs
	AclOperationIdempotentWrite
)

ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java

func (*AclOperation) MarshalText added in v1.40.0

func (a *AclOperation) MarshalText() ([]byte, error)

MarshalText returns the text form of the AclOperation (name without prefix)

func (*AclOperation) String added in v1.40.0

func (a *AclOperation) String() string

func (*AclOperation) UnmarshalText added in v1.40.0

func (a *AclOperation) UnmarshalText(text []byte) error

UnmarshalText takes a text representation of the operation and converts it to an AclOperation
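
The text form described above is the constant name without its prefix. The sketch below shows that round trip with a local `operation` type covering a subset of the values. The type, the name table, and the case-insensitive matching are assumptions for illustration rather than the package's own code.

```go
package main

import (
	"fmt"
	"strings"
)

// operation is a local stand-in for an enumerated ACL operation.
type operation int

const (
	operationUnknown operation = iota
	operationAny
	operationAll
	operationRead
	operationWrite
)

// operationNames maps each value to its name without the type prefix.
var operationNames = map[operation]string{
	operationUnknown: "Unknown",
	operationAny:     "Any",
	operationAll:     "All",
	operationRead:    "Read",
	operationWrite:   "Write",
}

// MarshalText returns the text form of the operation.
func (o *operation) MarshalText() ([]byte, error) {
	name, ok := operationNames[*o]
	if !ok {
		return nil, fmt.Errorf("no text form for operation %d", int(*o))
	}
	return []byte(name), nil
}

// UnmarshalText parses a text form, ignoring case (an assumption).
func (o *operation) UnmarshalText(text []byte) error {
	for op, name := range operationNames {
		if strings.EqualFold(name, string(text)) {
			*o = op
			return nil
		}
	}
	return fmt.Errorf("no operation with name %q", text)
}

// roundTrip parses text and returns its canonical text form.
func roundTrip(text string) (string, error) {
	var op operation
	if err := op.UnmarshalText([]byte(text)); err != nil {
		return "", err
	}
	out, err := op.MarshalText()
	return string(out), err
}

func main() {
	fmt.Println(roundTrip("read"))
	fmt.Println(roundTrip("bogus"))
}
```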

type AclPermissionType added in v1.16.0

type AclPermissionType int
const (
	AclPermissionUnknown AclPermissionType = iota
	AclPermissionAny
	AclPermissionDeny
	AclPermissionAllow
)

ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java

func (*AclPermissionType) MarshalText added in v1.40.0

func (a *AclPermissionType) MarshalText() ([]byte, error)

MarshalText returns the text form of the AclPermissionType (name without prefix)

func (*AclPermissionType) String added in v1.40.0

func (a *AclPermissionType) String() string

func (*AclPermissionType) UnmarshalText added in v1.40.0

func (a *AclPermissionType) UnmarshalText(text []byte) error

UnmarshalText takes a text representation of the permission type and converts it to an AclPermissionType

type AclResourcePatternType added in v1.40.0

type AclResourcePatternType int
const (
	AclPatternUnknown AclResourcePatternType = iota
	AclPatternAny
	AclPatternMatch
	AclPatternLiteral
	AclPatternPrefixed
)

ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java

func (*AclResourcePatternType) MarshalText added in v1.40.0

func (a *AclResourcePatternType) MarshalText() ([]byte, error)

MarshalText returns the text form of the AclResourcePatternType (name without prefix)

func (*AclResourcePatternType) String added in v1.40.0

func (a *AclResourcePatternType) String() string

func (*AclResourcePatternType) UnmarshalText added in v1.40.0

func (a *AclResourcePatternType) UnmarshalText(text []byte) error

UnmarshalText takes a text representation of the resource pattern type and converts it to an AclResourcePatternType

type AclResourceType added in v1.16.0

type AclResourceType int
const (
	AclResourceUnknown AclResourceType = iota
	AclResourceAny
	AclResourceTopic
	AclResourceGroup
	AclResourceCluster
	AclResourceTransactionalID
	AclResourceDelegationToken
)

ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java

func (*AclResourceType) MarshalText added in v1.40.0

func (a *AclResourceType) MarshalText() ([]byte, error)

MarshalText returns the text form of the AclResourceType (name without prefix)

func (*AclResourceType) String added in v1.40.0

func (a *AclResourceType) String() string

func (*AclResourceType) UnmarshalText added in v1.40.0

func (a *AclResourceType) UnmarshalText(text []byte) error

UnmarshalText takes a text representation of the resource type and converts it to an AclResourceType

type AddOffsetsToTxnRequest added in v1.16.0

type AddOffsetsToTxnRequest struct {
	Version         int16
	TransactionalID string
	ProducerID      int64
	ProducerEpoch   int16
	GroupID         string
}

AddOffsetsToTxnRequest adds offsets to a transaction request

type AddOffsetsToTxnResponse added in v1.16.0

type AddOffsetsToTxnResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Err          KError
}

AddOffsetsToTxnResponse is a response type for adding offsets to txns

type AddPartitionsToTxnRequest added in v1.16.0

type AddPartitionsToTxnRequest struct {
	Version         int16
	TransactionalID string
	ProducerID      int64
	ProducerEpoch   int16
	TopicPartitions map[string][]int32
}

AddPartitionsToTxnRequest is a request to add partitions to a transaction

type AddPartitionsToTxnResponse added in v1.16.0

type AddPartitionsToTxnResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Errors       map[string][]*PartitionError
}

AddPartitionsToTxnResponse is the response type for adding partitions to a transaction; it reports errors per topic partition

type AlterClientQuotasEntry added in v1.40.0

type AlterClientQuotasEntry struct {
	Entity []QuotaEntityComponent // The quota entity to alter.
	Ops    []ClientQuotasOp       // An individual quota configuration entry to alter.
}

type AlterClientQuotasEntryResponse added in v1.40.0

type AlterClientQuotasEntryResponse struct {
	ErrorCode KError                 // The error code, or `0` if the quota alteration succeeded.
	ErrorMsg  *string                // The error message, or `null` if the quota alteration succeeded.
	Entity    []QuotaEntityComponent // The quota entity altered.
}

type AlterClientQuotasRequest added in v1.40.0

type AlterClientQuotasRequest struct {
	Version      int16
	Entries      []AlterClientQuotasEntry // The quota configuration entries to alter.
	ValidateOnly bool                     // Whether the alteration should be validated, but not performed.
}

type AlterClientQuotasResponse added in v1.40.0

type AlterClientQuotasResponse struct {
	Version      int16
	ThrottleTime time.Duration                    // The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
	Entries      []AlterClientQuotasEntryResponse // The quota configuration entries altered.
}

type AlterConfigError added in v1.42.2

type AlterConfigError struct {
	Err    KError
	ErrMsg string
}

func (*AlterConfigError) Error added in v1.42.2

func (c *AlterConfigError) Error() string

type AlterConfigsRequest added in v1.16.0

type AlterConfigsRequest struct {
	Version      int16
	Resources    []*AlterConfigsResource
	ValidateOnly bool
}

AlterConfigsRequest is an alter config request type

type AlterConfigsResource added in v1.16.0

type AlterConfigsResource struct {
	Type          ConfigResourceType
	Name          string
	ConfigEntries map[string]*string
}

AlterConfigsResource is an alter config resource type

type AlterConfigsResourceResponse added in v1.16.0

type AlterConfigsResourceResponse struct {
	ErrorCode int16
	ErrorMsg  string
	Type      ConfigResourceType
	Name      string
}

AlterConfigsResourceResponse is a response type for alter config resource

type AlterConfigsResponse added in v1.16.0

type AlterConfigsResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Resources    []*AlterConfigsResourceResponse
}

AlterConfigsResponse is a response type for alter config

type AlterPartitionReassignmentsRequest added in v1.40.0

type AlterPartitionReassignmentsRequest struct {
	TimeoutMs int32

	Version int16
	// contains filtered or unexported fields
}

func (*AlterPartitionReassignmentsRequest) AddBlock added in v1.40.0

func (r *AlterPartitionReassignmentsRequest) AddBlock(topic string, partitionID int32, replicas []int32)

type AlterPartitionReassignmentsResponse added in v1.40.0

type AlterPartitionReassignmentsResponse struct {
	Version        int16
	ThrottleTimeMs int32
	ErrorCode      KError
	ErrorMessage   *string
	Errors         map[string]map[int32]*alterPartitionReassignmentsErrorBlock
}

func (*AlterPartitionReassignmentsResponse) AddError added in v1.40.0

func (r *AlterPartitionReassignmentsResponse) AddError(topic string, partition int32, kerror KError, message *string)

type AlterUserScramCredentialsDelete added in v1.40.0

type AlterUserScramCredentialsDelete struct {
	Name      string
	Mechanism ScramMechanismType
}

type AlterUserScramCredentialsRequest added in v1.40.0

type AlterUserScramCredentialsRequest struct {
	Version int16

	// Deletions represent list of SCRAM credentials to remove
	Deletions []AlterUserScramCredentialsDelete

	// Upsertions represent list of SCRAM credentials to update/insert
	Upsertions []AlterUserScramCredentialsUpsert
}

type AlterUserScramCredentialsResponse added in v1.40.0

type AlterUserScramCredentialsResponse struct {
	Version int16

	ThrottleTime time.Duration

	Results []*AlterUserScramCredentialsResult
}

type AlterUserScramCredentialsResult added in v1.40.0

type AlterUserScramCredentialsResult struct {
	User string

	ErrorCode    KError
	ErrorMessage *string
}

type AlterUserScramCredentialsUpsert added in v1.40.0

type AlterUserScramCredentialsUpsert struct {
	Name       string
	Mechanism  ScramMechanismType
	Iterations int32
	Salt       []byte

	// This field is never transmitted over the wire
	// @see: https://tools.ietf.org/html/rfc5802
	Password []byte
	// contains filtered or unexported fields
}

type ApiVersionsRequest added in v1.10.0

type ApiVersionsRequest struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// ClientSoftwareName contains the name of the client.
	ClientSoftwareName string
	// ClientSoftwareVersion contains the version of the client.
	ClientSoftwareVersion string
}

type ApiVersionsResponse added in v1.10.0

type ApiVersionsResponse struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// ErrorCode contains the top-level error code.
	ErrorCode int16
	// ApiKeys contains the APIs supported by the broker.
	ApiKeys []ApiVersionsResponseKey
	// ThrottleTimeMs contains the duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
	ThrottleTimeMs int32
}

type ApiVersionsResponseKey added in v1.40.0

type ApiVersionsResponseKey struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// ApiKey contains the API index.
	ApiKey int16
	// MinVersion contains the minimum supported version, inclusive.
	MinVersion int16
	// MaxVersion contains the maximum supported version, inclusive.
	MaxVersion int16
}

ApiVersionsResponseKey contains the APIs supported by the broker.
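
Because MinVersion and MaxVersion are both inclusive, choosing a version to speak comes down to intersecting two ranges. The following is a minimal sketch of that idea only; `versionRange` and `pickVersion` are hypothetical names introduced here for illustration and are not part of Sarama, which performs its own version selection internally.

```go
package main

import "fmt"

// versionRange is a hypothetical stand-in for the MinVersion/MaxVersion pair
// reported per API key; both bounds are inclusive.
type versionRange struct {
	Min, Max int16
}

// pickVersion returns the highest version contained in both inclusive ranges,
// or false when the ranges do not overlap.
func pickVersion(client, broker versionRange) (int16, bool) {
	lo := client.Min
	if broker.Min > lo {
		lo = broker.Min
	}
	hi := client.Max
	if broker.Max < hi {
		hi = broker.Max
	}
	if lo > hi {
		return 0, false
	}
	return hi, true
}

func main() {
	client := versionRange{Min: 0, Max: 3}
	broker := versionRange{Min: 1, Max: 5}
	if v, ok := pickVersion(client, broker); ok {
		fmt.Println("using version", v)
	} else {
		fmt.Println("no common version")
	}
}
```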

type AsyncProducer

type AsyncProducer interface {
	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
	// when both the Errors and Successes channels have been closed. When calling
	// AsyncClose, you *must* continue to read from those channels in order to
	// drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before the process
	// shuts down, or you may lose messages. You must call this before calling
	// Close on the underlying client.
	Close() error

	// Input is the input channel for the user to write messages to that they
	// wish to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when Return.Successes is
	// enabled. If Return.Successes is true, you MUST read from this channel or the
	// Producer will deadlock. It is suggested that you send and read messages
	// together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this
	// channel or the Producer will deadlock when the channel is full. Alternatively,
	// you can set Producer.Return.Errors in your config to false, which prevents
	// errors from being returned.
	Errors() <-chan *ProducerError

	// IsTransactional returns true when the current producer is transactional.
	IsTransactional() bool

	// TxnStatus returns the current producer transaction status.
	TxnStatus() ProducerTxnStatusFlag

	// BeginTxn marks the current transaction as ready.
	BeginTxn() error

	// CommitTxn commits the current transaction.
	CommitTxn() error

	// AbortTxn aborts the current transaction.
	AbortTxn() error

	// AddOffsetsToTxn adds the associated offsets to the current transaction.
	AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error

	// AddMessageToTxn adds the message's offset to the current transaction.
	AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error
}

AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages to the correct broker for the provided topic-partition, refreshing metadata as appropriate, and parses responses for errors. You must read from the Errors() channel or the producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid leaks and message loss: it will not be garbage-collected automatically when it passes out of scope, and buffered messages may not be flushed.

Example (Goroutines)

This example shows how to use the producer with separate goroutines reading from the Successes and Errors channels. Note that in order for the Successes channel to be populated, you have to set config.Producer.Return.Successes to true.

config := NewTestConfig()
config.Producer.Return.Successes = true
producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
if err != nil {
	panic(err)
}

// Trap SIGINT to trigger a graceful shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

var (
	wg                                  sync.WaitGroup
	enqueued, successes, producerErrors int
)

wg.Add(1)
go func() {
	defer wg.Done()
	for range producer.Successes() {
		successes++
	}
}()

wg.Add(1)
go func() {
	defer wg.Done()
	for err := range producer.Errors() {
		log.Println(err)
		producerErrors++
	}
}()

ProducerLoop:
for {
	message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
	select {
	case producer.Input() <- message:
		enqueued++

	case <-signals:
		producer.AsyncClose() // Trigger a shutdown of the producer.
		break ProducerLoop
	}
}

wg.Wait()

log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors)

Example (Select)

This example shows how to use the producer while simultaneously reading the Errors channel to know about any failures.

producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
	panic(err)
}

defer func() {
	if err := producer.Close(); err != nil {
		log.Fatalln(err)
	}
}()

// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

var enqueued, producerErrors int
ProducerLoop:
for {
	select {
	case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
		enqueued++
	case err := <-producer.Errors():
		log.Println("Failed to produce message", err)
		producerErrors++
	case <-signals:
		break ProducerLoop
	}
}

log.Printf("Enqueued: %d; errors: %d\n", enqueued, producerErrors)

func NewAsyncProducer

func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error)

NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.

func NewAsyncProducerFromClient

func NewAsyncProducerFromClient(client Client) (AsyncProducer, error)

NewAsyncProducerFromClient creates a new Producer using the given client. It is still necessary to call Close() on the underlying client when shutting down this producer.

type BalanceStrategy added in v1.19.0

type BalanceStrategy interface {
	// Name uniquely identifies the strategy.
	Name() string

	// Plan accepts a map of `memberID -> metadata` and a map of `topic -> partitions`
	// and returns a distribution plan.
	Plan(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error)

	// AssignmentData returns the serialized assignment data for the specified
	// memberID
	AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error)
}

BalanceStrategy is used to balance topics and partitions across members of a consumer group

func NewBalanceStrategyRange added in v1.40.0

func NewBalanceStrategyRange() BalanceStrategy

NewBalanceStrategyRange returns a range balance strategy, which is the default and assigns partitions as ranges to consumer group members. This follows the same logic as https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html

Example with two topics T1 and T2 with six partitions each (0..5) and two members (M1, M2):

M1: {T1: [0, 1, 2], T2: [0, 1, 2]}
M2: {T1: [3, 4, 5], T2: [3, 4, 5]}

func NewBalanceStrategyRoundRobin added in v1.40.0

func NewBalanceStrategyRoundRobin() BalanceStrategy

NewBalanceStrategyRoundRobin returns a round-robin balance strategy, which assigns partitions to members in alternating order. Example with two topics (t0, t1), two consumers (m0, m1), and three partitions per topic (p0, p1, p2):

m0: [t0p0, t0p2, t1p1]
m1: [t0p1, t1p0, t1p2]

func NewBalanceStrategySticky added in v1.40.0

func NewBalanceStrategySticky() BalanceStrategy

NewBalanceStrategySticky returns a sticky balance strategy, which assigns partitions to members while attempting to preserve earlier assignments and maintain a balanced partition distribution. Example with topic T with six partitions (0..5) and two members (M1, M2):

M1: {T: [0, 2, 4]}
M2: {T: [1, 3, 5]}

On reassignment with an additional consumer, you might get an assignment plan like:

M1: {T: [0, 2]}
M2: {T: [1, 3]}
M3: {T: [4, 5]}

type BalanceStrategyPlan added in v1.19.0

type BalanceStrategyPlan map[string]map[string][]int32

BalanceStrategyPlan is the result of any BalanceStrategy.Plan attempt. It contains an allocation of topic/partitions by memberID in the form of a `memberID -> topic -> partitions` map.

func (BalanceStrategyPlan) Add added in v1.19.0

func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32)

Add assigns a topic with a number of partitions to a member.

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.

Example
broker := NewBroker("localhost:9092")
err := broker.Open(nil)
if err != nil {
	panic(err)
}

request := MetadataRequest{Topics: []string{"myTopic"}}
response, err := broker.GetMetadata(&request)
if err != nil {
	_ = broker.Close()
	panic(err)
}

fmt.Println("There are", len(response.Topics), "topics active in the cluster.")

if err = broker.Close(); err != nil {
	panic(err)
}

func NewBroker

func NewBroker(addr string) *Broker

NewBroker creates and returns a Broker targeting the given host:port address. This does not attempt to actually connect; you have to call Open() for that.

func (*Broker) AddOffsetsToTxn added in v1.16.0

func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error)

AddOffsetsToTxn sends a request to add offsets to txn and returns a response or error

func (*Broker) AddPartitionsToTxn added in v1.16.0

func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error)

AddPartitionsToTxn sends a request to add partitions to txn and returns a response or error

func (*Broker) Addr

func (b *Broker) Addr() string

Addr returns the broker address as either retrieved from Kafka's metadata or passed to NewBroker.

func (*Broker) AlterClientQuotas added in v1.40.0

func (b *Broker) AlterClientQuotas(request *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error)

AlterClientQuotas sends a request to alter the broker's quotas

func (*Broker) AlterConfigs added in v1.16.0

func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error)

AlterConfigs sends a request to alter config and returns a response or error

func (*Broker) AlterPartitionReassignments added in v1.40.0

func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignmentsRequest) (*AlterPartitionReassignmentsResponse, error)

AlterPartitionReassignments sends an alter partition reassignments request and returns an alter partition reassignments response or error

func (*Broker) AlterUserScramCredentials added in v1.40.0

func (b *Broker) AlterUserScramCredentials(req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error)

func (*Broker) ApiVersions added in v1.12.0

func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error)

ApiVersions returns an API versions response or error

func (*Broker) AsyncProduce added in v1.40.0

func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error

AsyncProduce sends a produce request and eventually calls the provided callback with a produce response or an error.

Unlike Produce, AsyncProduce generally does not block while waiting for the response. If the configured maximum number of in-flight requests is reached, the request blocks until a previous response is received.

When configured with RequiredAcks == NoResponse, the callback will not be invoked. If an error is returned because the request could not be sent then the callback will not be invoked either.

Make sure not to Close the broker in the callback as it will lead to a deadlock.

func (*Broker) Close

func (b *Broker) Close() error

Close closes the broker resources

func (*Broker) CommitOffset

func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error)

CommitOffset returns an offset commit response or error

func (*Broker) Connected

func (b *Broker) Connected() (bool, error)

Connected returns true if the broker is connected and false otherwise. If the broker is not connected but it had tried to connect, the error from that connection attempt is also returned.

func (*Broker) CreateAcls added in v1.16.0

func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error)

CreateAcls sends a create acl request and returns a response or error

func (*Broker) CreatePartitions added in v1.17.0

func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error)

CreatePartitions sends a create partitions request and returns a create partitions response or error

func (*Broker) CreateTopics added in v1.16.0

func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error)

CreateTopics sends a create topics request and returns a create topics response or error

func (*Broker) DeleteAcls added in v1.16.0

func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error)

DeleteAcls sends a delete acl request and returns a response or error

func (*Broker) DeleteGroups added in v1.17.0

func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error)

DeleteGroups sends a request to delete groups and returns a response or error

func (*Broker) DeleteOffsets added in v1.40.0

func (b *Broker) DeleteOffsets(request *DeleteOffsetsRequest) (*DeleteOffsetsResponse, error)

DeleteOffsets sends a request to delete group offsets and returns a response or error

func (*Broker) DeleteRecords added in v1.17.0

func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error)

DeleteRecords sends a request to delete records and returns a delete records response or error

func (*Broker) DeleteTopics added in v1.16.0

func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error)

DeleteTopics sends a delete topics request and returns a delete topics response or error

func (*Broker) DescribeAcls added in v1.16.0

func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error)

DescribeAcls sends a describe acl request and returns a response or error

func (*Broker) DescribeClientQuotas added in v1.40.0

func (b *Broker) DescribeClientQuotas(request *DescribeClientQuotasRequest) (*DescribeClientQuotasResponse, error)

DescribeClientQuotas sends a request to get the broker's quotas

func (*Broker) DescribeConfigs added in v1.16.0

func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error)

DescribeConfigs sends a request to describe config and returns a response or error

func (*Broker) DescribeGroups added in v1.8.0

func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error)

DescribeGroups returns a describe groups response or error

func (*Broker) DescribeLogDirs added in v1.40.0

func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error)

DescribeLogDirs sends a request to get the broker's log dir paths and sizes

func (*Broker) DescribeUserScramCredentials added in v1.40.0

func (b *Broker) DescribeUserScramCredentials(req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error)

DescribeUserScramCredentials sends a request to get SCRAM users

func (*Broker) EndTxn added in v1.16.0

func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error)

EndTxn sends a request to end txn and returns a response or error

func (*Broker) Fetch

func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error)

Fetch returns a FetchResponse or error

func (*Broker) FetchOffset

func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error)

FetchOffset returns an offset fetch response or error

func (*Broker) FindCoordinator added in v1.17.0

func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error)

FindCoordinator sends a find coordinator request and returns a response or error

func (*Broker) GetAvailableOffsets

func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error)

GetAvailableOffsets returns an offset response or error

func (*Broker) GetConsumerMetadata

func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error)

GetConsumerMetadata sends a consumer metadata request and returns a consumer metadata response or error

func (*Broker) GetMetadata

func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error)

GetMetadata sends a metadata request and returns a metadata response or error

func (*Broker) Heartbeat added in v1.8.0

func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error)

Heartbeat returns a heartbeat response or error

func (*Broker) ID

func (b *Broker) ID() int32

ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.

func (*Broker) IncrementalAlterConfigs added in v1.40.0

func (b *Broker) IncrementalAlterConfigs(request *IncrementalAlterConfigsRequest) (*IncrementalAlterConfigsResponse, error)

IncrementalAlterConfigs sends a request to incrementally alter config and returns a response or error

func (*Broker) InitProducerID added in v1.16.0

func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error)

InitProducerID sends an init producer request and returns a response or error

func (*Broker) JoinGroup added in v1.8.0

func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error)

JoinGroup returns a join group response or error

func (*Broker) LeaveGroup added in v1.8.0

func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error)

LeaveGroup returns a leave group response or error

func (*Broker) ListGroups added in v1.8.0

func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error)

ListGroups returns a list groups response or error

func (*Broker) ListPartitionReassignments added in v1.40.0

func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error)

ListPartitionReassignments sends a list partition reassignments request and returns a list partition reassignments response or error

func (*Broker) Open

func (b *Broker) Open(conf *Config) error

Open tries to connect to the Broker if it is not already connected or connecting, but does not block waiting for the connection to complete. This means that any subsequent operations on the broker will block waiting for the connection to succeed or fail. To get the effect of a fully synchronous Open call, follow it by a call to Connected(). The only errors Open will return directly are ConfigurationError or AlreadyConnected. If conf is nil, the result of NewConfig() is used.

func (*Broker) Produce

func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error)

Produce returns a produce response or error

func (*Broker) Rack added in v1.20.0

func (b *Broker) Rack() string

Rack returns the broker's rack as retrieved from Kafka's metadata or the empty string if it is not known. The returned value corresponds to the broker's broker.rack configuration setting. Requires protocol version to be at least v0.10.0.0.

func (*Broker) ResponseSize added in v1.40.0

func (b *Broker) ResponseSize() int

func (*Broker) SyncGroup added in v1.8.0

func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error)

SyncGroup returns a sync group response or error

func (*Broker) TLSConnectionState added in v1.40.0

func (b *Broker) TLSConnectionState() (state tls.ConnectionState, ok bool)

TLSConnectionState returns the client's TLS connection state. The second return value is false if this is not a TLS connection or the connection has not yet been established.

func (*Broker) TxnOffsetCommit added in v1.16.0

func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error)

TxnOffsetCommit sends a request to commit transaction offsets and returns a response or error

type BuildSpnFunc added in v1.43.0

type BuildSpnFunc func(serviceName, host string) string

type ByteEncoder

type ByteEncoder []byte

ByteEncoder implements the Encoder interface for Go byte slices so that they can be used as the Key or Value in a ProducerMessage.

func (ByteEncoder) Encode

func (b ByteEncoder) Encode() ([]byte, error)

func (ByteEncoder) Length

func (b ByteEncoder) Length() int

type Client

type Client interface {
	// Config returns the Config struct of the client. This struct should not be
	// altered after it has been created.
	Config() *Config

	// Controller returns the cluster controller broker. It will return a
	// locally cached value if it's available. You can call RefreshController
	// to update the cached value. Requires Kafka 0.10 or higher.
	Controller() (*Broker, error)

	// RefreshController retrieves the cluster controller from fresh metadata
	// and stores it in the local cache. Requires Kafka 0.10 or higher.
	RefreshController() (*Broker, error)

	// Brokers returns the current set of active brokers as retrieved from cluster metadata.
	Brokers() []*Broker

	// Broker returns the active Broker if available for the broker ID.
	Broker(brokerID int32) (*Broker, error)

	// Topics returns the set of available topics as retrieved from cluster metadata.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	Partitions(topic string) ([]int32, error)

	// WritablePartitions returns the sorted list of all writable partition IDs for
	// the given topic, where "writable" means "having a valid leader accepting
	// writes".
	WritablePartitions(topic string) ([]int32, error)

	// Leader returns the broker object that is the leader of the current
	// topic/partition, as determined by querying the cluster metadata.
	Leader(topic string, partitionID int32) (*Broker, error)

	// LeaderAndEpoch returns the leader and its epoch for the current
	// topic/partition, as determined by querying the cluster metadata.
	LeaderAndEpoch(topic string, partitionID int32) (*Broker, int32, error)

	// Replicas returns the set of all replica IDs for the given partition.
	Replicas(topic string, partitionID int32) ([]int32, error)

	// InSyncReplicas returns the set of all in-sync replica IDs for the given
	// partition. In-sync replicas are replicas which are fully caught up with
	// the partition leader.
	InSyncReplicas(topic string, partitionID int32) ([]int32, error)

	// OfflineReplicas returns the set of all offline replica IDs for the given
	// partition. Offline replicas are replicas which are offline
	OfflineReplicas(topic string, partitionID int32) ([]int32, error)

	// RefreshBrokers takes a list of addresses to be used as seed brokers.
	// Existing broker connections are closed and the updated list of seed brokers
	// will be used for the next metadata fetch.
	RefreshBrokers(addrs []string) error

	// RefreshMetadata takes a list of topics and queries the cluster to refresh the
	// available metadata for those topics. If no topics are provided, it will refresh
	// metadata for all topics.
	RefreshMetadata(topics ...string) error

	// GetOffset queries the cluster to get the most recent available offset at the
	// given time (in milliseconds) on the topic/partition combination.
	// Time should be OffsetOldest for the earliest available offset,
	// OffsetNewest for the offset of the message that will be produced next, or a time.
	GetOffset(topic string, partitionID int32, time int64) (int64, error)

	// Coordinator returns the coordinating broker for a consumer group. It will
	// return a locally cached value if it's available. You can call
	// RefreshCoordinator to update the cached value. This function only works on
	// Kafka 0.8.2 and higher.
	Coordinator(consumerGroup string) (*Broker, error)

	// RefreshCoordinator retrieves the coordinator for a consumer group and stores it
	// in local cache. This function only works on Kafka 0.8.2 and higher.
	RefreshCoordinator(consumerGroup string) error

	// TransactionCoordinator returns the coordinating broker for a transaction id. It will
	// return a locally cached value if it's available. You can call
	// RefreshTransactionCoordinator to update the cached value. This function only works on
	// Kafka 0.11.0.0 and higher.
	TransactionCoordinator(transactionID string) (*Broker, error)

	// RefreshTransactionCoordinator retrieves the coordinator for a transaction id and stores it
	// in local cache. This function only works on Kafka 0.11.0.0 and higher.
	RefreshTransactionCoordinator(transactionID string) error

	// InitProducerID retrieves information required for Idempotent Producer
	InitProducerID() (*InitProducerIDResponse, error)

	// LeastLoadedBroker retrieves broker that has the least responses pending
	LeastLoadedBroker() *Broker

	// Close shuts down all broker connections managed by this client. It is required
	// to call this function before a client object passes out of scope, as it will
	// otherwise leak memory. You must close any Producers or Consumers using a client
	// before you close the client.
	Close() error

	// Closed returns true if the client has already had Close called on it
	Closed() bool
}

Client is a generic Kafka client. It manages connections to one or more Kafka brokers. You MUST call Close() on a client to avoid leaks; it will not be garbage-collected automatically when it passes out of scope. It is safe to share a client amongst many users; however, Kafka will process requests from a single client strictly in serial, so it is generally more efficient to use the default one client per producer/consumer.

func NewClient

func NewClient(addrs []string, conf *Config) (Client, error)

NewClient creates a new Client. It connects to one of the given broker addresses and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot be retrieved from any of the given broker addresses, the client is not created.

type ClientQuotasOp added in v1.40.0

type ClientQuotasOp struct {
	Key    string  // The quota configuration key.
	Value  float64 // The value to set, otherwise ignored if the value is to be removed.
	Remove bool    // Whether the quota configuration value should be removed, otherwise set.
}

type ClusterAdmin added in v1.18.0

type ClusterAdmin interface {
	// Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
	// It may take several seconds after CreateTopic returns success for all the brokers
	// to become aware that the topic has been created. During this time, listTopics
	// may not return information about the new topic. The validateOnly option is supported from version 0.10.2.0.
	CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error

	// List the topics available in the cluster with the default options.
	ListTopics() (map[string]TopicDetail, error)

	// Describe some topics in the cluster.
	DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)

	// Delete a topic. It may take several seconds after DeleteTopic returns success
	// for all the brokers to become aware that the topic is gone.
	// During this time, listTopics may continue to return information about the deleted topic.
	// If delete.topic.enable is false on the brokers, deleteTopic will mark
	// the topic for deletion, but not actually delete it.
	// This operation is supported by brokers with version 0.10.1.0 or higher.
	DeleteTopic(topic string) error

	// Increase the number of partitions of the topic to the given count.
	// If partitions are increased for a topic that has a key, the partition logic or ordering of
	// the messages will be affected. It may take several seconds after this method returns
	// success for all the brokers to become aware that the partitions have been created.
	// During this time, DescribeTopics may not return information about the
	// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
	CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error

	// Alter the replica assignment for partitions.
	// This operation is supported by brokers with version 2.4.0.0 or higher.
	AlterPartitionReassignments(topic string, assignment [][]int32) error

	// Provides info on ongoing partitions replica reassignments.
	// This operation is supported by brokers with version 2.4.0.0 or higher.
	ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)

	// Delete records whose offset is smaller than the given offset of the corresponding partition.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	DeleteRecords(topic string, partitionOffsets map[int32]int64) error

	// Get the configuration for the specified resources.
	// The returned configuration includes default values; entries where Default
	// is true can be distinguished from user-supplied values.
	// Config entries where ReadOnly is true cannot be updated.
	// The value of config entries where Sensitive is true is always empty so
	// sensitive information is not disclosed.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)

	// Update the configuration for the specified resources with the default options.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	// Topic is currently the only resource type with configs that can be updated.
	// Updates are not transactional so they may succeed for some resources while
	// failing for others. The configs for a particular resource are updated atomically.
	AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error

	// IncrementalAlterConfig incrementally updates the configuration for the specified resources with the default options.
	// This operation is supported by brokers with version 2.3.0.0 or higher.
	// Updates are not transactional so they may succeed for some resources while failing for others.
	// The configs for a particular resource are updated atomically.
	IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error

	// Creates an access control list (ACL) which is bound to a specific resource.
	// This operation is not transactional so it may succeed or fail.
	// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
	// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
	// Deprecated: Use CreateACLs instead.
	CreateACL(resource Resource, acl Acl) error

	// Creates access control lists (ACLs) which are bound to specific resources.
	// This operation is not transactional so it may succeed for some ACLs while failing for others.
	// If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
	// no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
	CreateACLs([]*ResourceAcls) error

	// Lists access control lists (ACLs) according to the supplied filter.
	// It may take some time for changes made by CreateACLs or DeleteACL to be reflected in the output of ListAcls.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	ListAcls(filter AclFilter) ([]ResourceAcls, error)

	// Deletes access control lists (ACLs) according to the supplied filters.
	// This operation is not transactional so it may succeed for some ACLs while failing for others.
	// This operation is supported by brokers with version 0.11.0.0 or higher.
	DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)

	// List the consumer groups available in the cluster.
	ListConsumerGroups() (map[string]string, error)

	// Describe the given consumer groups.
	DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)

	// List the consumer group offsets available in the cluster.
	ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)

	// Deletes a consumer group offset
	DeleteConsumerGroupOffset(group string, topic string, partition int32) error

	// Delete a consumer group.
	DeleteConsumerGroup(group string) error

	// Get information about the nodes in the cluster
	DescribeCluster() (brokers []*Broker, controllerID int32, err error)

	// Get information about all log directories on the given set of brokers
	DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)

	// Get information about SCRAM users
	DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error)

	// Delete SCRAM users
	DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error)

	// Upsert SCRAM users
	UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error)

	// Get client quota configurations corresponding to the specified filter.
	// This operation is supported by brokers with version 2.6.0.0 or higher.
	DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error)

	// Alters client quota configurations with the specified alterations.
	// This operation is supported by brokers with version 2.6.0.0 or higher.
	AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error

	// Controller returns the cluster controller broker. It will return a
	// locally cached value if it's available.
	Controller() (*Broker, error)

	// Remove members from the consumer group by given member identities.
	// This operation is supported by brokers with version 2.3 or higher.
	// It is part of the static membership feature (KIP-345).
	RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error)

	// Close shuts down the admin and closes the underlying client.
	Close() error
}

ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics, brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0. Methods with stricter requirements will specify the minimum broker version required. You MUST call Close() on a ClusterAdmin to avoid leaks.

func NewClusterAdmin added in v1.18.0

func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error)

NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.

func NewClusterAdminFromClient added in v1.40.0

func NewClusterAdminFromClient(client Client) (ClusterAdmin, error)

NewClusterAdminFromClient creates a new ClusterAdmin using the given client. Note that the underlying client will also be closed when the admin's Close() is called.

type CompressionCodec

type CompressionCodec int8

CompressionCodec represents the various compression codecs recognized by Kafka in messages.

const (
	// CompressionNone no compression
	CompressionNone CompressionCodec = iota
	// CompressionGZIP compression using GZIP
	CompressionGZIP
	// CompressionSnappy compression using snappy
	CompressionSnappy
	// CompressionLZ4 compression using LZ4
	CompressionLZ4
	// CompressionZSTD compression using ZSTD
	CompressionZSTD

	// CompressionLevelDefault is the constant to use in CompressionLevel
	// to have the default compression level for any codec. The value is chosen
	// so that it does not collide with any existing compression level.
	CompressionLevelDefault = -1000
)

func (CompressionCodec) MarshalText added in v1.40.0

func (cc CompressionCodec) MarshalText() ([]byte, error)

MarshalText transforms a CompressionCodec into its string representation.

func (CompressionCodec) String added in v1.17.0

func (cc CompressionCodec) String() string

func (*CompressionCodec) UnmarshalText added in v1.40.0

func (cc *CompressionCodec) UnmarshalText(text []byte) error

UnmarshalText returns a CompressionCodec from its string representation.

type Config

type Config struct {
	// Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client.
	Admin struct {
		Retry struct {
			// The total number of times to retry sending (retriable) admin requests (default 5).
			// Similar to the `retries` setting of the JVM AdminClientConfig.
			Max int
			// Backoff time between retries of a failed request (default 100ms)
			Backoff time.Duration
		}
		// The maximum duration the administrative Kafka client will wait for ClusterAdmin operations,
		// including topics, brokers, configurations and ACLs (defaults to 3 seconds).
		Timeout time.Duration
	}

	// Net is the namespace for network-level properties used by the Broker, and
	// shared by the Client/Producer/Consumer.
	Net struct {
		// How many outstanding requests a connection is allowed to have before
		// sending on it blocks (default 5).
		// Throughput can improve but message ordering is not guaranteed if Producer.Idempotent is disabled, see:
		// https://kafka.apache.org/protocol#protocol_network
		// https://kafka.apache.org/28/documentation.html#producerconfigs_max.in.flight.requests.per.connection
		MaxOpenRequests int

		// All three of the below configurations are similar to the
		// `socket.timeout.ms` setting in JVM kafka. All of them default
		// to 30 seconds.
		DialTimeout  time.Duration // How long to wait for the initial connection.
		ReadTimeout  time.Duration // How long to wait for a response.
		WriteTimeout time.Duration // How long to wait for a transmit.

		// ResolveCanonicalBootstrapServers turns each bootstrap broker address
		// into a set of IPs, then does a reverse lookup on each one to get its
		// canonical hostname. This list of hostnames then replaces the
		// original address list. Similar to the `client.dns.lookup` option in
		// the JVM client, this is especially useful with GSSAPI, where it
		// allows providing an alias record instead of individual broker
		// hostnames. Defaults to false.
		ResolveCanonicalBootstrapServers bool

		TLS struct {
			// Whether or not to use TLS when connecting to the broker
			// (defaults to false).
			Enable bool
			// The TLS configuration to use for secure connections if
			// enabled (defaults to nil).
			Config *tls.Config
		}

		// SASL based authentication with the broker. The fields below cover the
		// PLAIN, OAUTHBEARER, SCRAM and GSSAPI mechanisms.
		SASL struct {
			// Whether or not to use SASL authentication when connecting to the broker
			// (defaults to false).
			Enable bool
			// SASLMechanism is the name of the enabled SASL mechanism.
			// Possible values: OAUTHBEARER, PLAIN (defaults to PLAIN).
			Mechanism SASLMechanism
			// Version is the SASL Protocol Version to use.
			// Kafka > 1.x should use V1, except on Azure EventHub which uses V0.
			Version int16
			// Whether or not to send the Kafka SASL handshake first if enabled
			// (defaults to true). You should only set this to false if you're using
			// a non-Kafka SASL proxy.
			Handshake bool
			// AuthIdentity is an (optional) authorization identity (authzid) to
			// use for SASL/PLAIN authentication (if different from User) when
			// an authenticated user is permitted to act as the presented
			// alternative user. See RFC4616 for details.
			AuthIdentity string
			// User is the authentication identity (authcid) to present for
			// SASL/PLAIN or SASL/SCRAM authentication
			User string
			// Password for SASL/PLAIN authentication
			Password string
			// authz id used for SASL/SCRAM authentication
			SCRAMAuthzID string
			// SCRAMClientGeneratorFunc is a generator of a user provided implementation of a SCRAM
			// client used to perform the SCRAM exchange with the server.
			SCRAMClientGeneratorFunc func() SCRAMClient
			// TokenProvider is a user-defined callback for generating
			// access tokens for SASL/OAUTHBEARER auth. See the
			// AccessTokenProvider interface docs for proper implementation
			// guidelines.
			TokenProvider AccessTokenProvider

			GSSAPI GSSAPIConfig
		}

		// KeepAlive specifies the keep-alive period for an active network connection (defaults to 0).
		// If zero or positive, keep-alives are enabled.
		// If negative, keep-alives are disabled.
		KeepAlive time.Duration

		// LocalAddr is the local address to use when dialing an
		// address. The address must be of a compatible type for the
		// network being dialed.
		// If nil, a local address is automatically chosen.
		LocalAddr net.Addr

		Proxy struct {
			// Whether or not to use proxy when connecting to the broker
			// (defaults to false).
			Enable bool
			// The proxy dialer to use if enabled (defaults to nil).
			Dialer proxy.Dialer
		}
	}

	// Metadata is the namespace for metadata management properties used by the
	// Client, and shared by the Producer/Consumer.
	Metadata struct {
		Retry struct {
			// The total number of times to retry a metadata request when the
			// cluster is in the middle of a leader election (default 3).
			Max int
			// How long to wait for leader election to occur before retrying
			// (default 250ms). Similar to the JVM's `retry.backoff.ms`.
			Backoff time.Duration
			// Called to compute backoff time dynamically. Useful for implementing
			// more sophisticated backoff strategies. This takes precedence over
			// `Backoff` if set.
			BackoffFunc func(retries, maxRetries int) time.Duration
		}
		// How frequently to refresh the cluster metadata in the background.
		// Defaults to 10 minutes. Set to 0 to disable. Similar to
		// `topic.metadata.refresh.interval.ms` in the JVM version.
		RefreshFrequency time.Duration

		// Whether to maintain a full set of metadata for all topics, or just
		// the minimal set that has been necessary so far. The full set is simpler
		// and usually more convenient, but can take up a substantial amount of
		// memory if you have many topics and partitions. Defaults to true.
		Full bool

		// How long to wait for a successful metadata response.
		// Disabled by default which means a metadata request against an unreachable
		// cluster (all brokers are unreachable or unresponsive) can take up to
		// `Net.[Dial|Read]Timeout * BrokerCount * (Metadata.Retry.Max + 1) + Metadata.Retry.Backoff * Metadata.Retry.Max`
		// to fail.
		Timeout time.Duration

		// Whether to allow auto-create topics in metadata refresh. If set to true,
		// the broker may auto-create topics that we requested which do not already exist,
		// if it is configured to do so (`auto.create.topics.enable` is true). Defaults to true.
		AllowAutoTopicCreation bool
	}

	// Producer is the namespace for configuration related to producing messages,
	// used by the Producer.
	Producer struct {
		// The maximum permitted size of a message (defaults to 1000000). Should be
		// set equal to or smaller than the broker's `message.max.bytes`.
		MaxMessageBytes int
		// The level of acknowledgement reliability needed from the broker (defaults
		// to WaitForLocal). Equivalent to the `request.required.acks` setting of the
		// JVM producer.
		RequiredAcks RequiredAcks
		// The maximum duration the broker will wait for the receipt of the number of
		// RequiredAcks (defaults to 10 seconds). This is only relevant when
		// RequiredAcks is set to WaitForAll or a number > 1. Only supports
		// millisecond resolution, nanoseconds will be truncated. Equivalent to
		// the JVM producer's `request.timeout.ms` setting.
		Timeout time.Duration
		// The type of compression to use on messages (defaults to no compression).
		// Similar to `compression.codec` setting of the JVM producer.
		Compression CompressionCodec
		// The level of compression to use on messages. The meaning depends
		// on the actual compression type used and defaults to default compression
		// level for the codec.
		CompressionLevel int
		// Generates partitioners for choosing the partition to send messages to
		// (defaults to hashing the message key). Similar to the `partitioner.class`
		// setting for the JVM producer.
		Partitioner PartitionerConstructor
		// If enabled, the producer will ensure that exactly one copy of each message is
		// written.
		Idempotent bool
		// Transaction is the namespace for transactional producer settings.
		Transaction struct {
			// Used in transactions to identify an instance of a producer through restarts
			ID string
			// Amount of time a transaction can remain unresolved (neither committed nor aborted)
			// default is 1 min
			Timeout time.Duration

			Retry struct {
				// The total number of times to retry sending a message (default 50).
				// Similar to the `message.send.max.retries` setting of the JVM producer.
				Max int
				// How long to wait for the cluster to settle between retries
				// (default 10ms). Similar to the `retry.backoff.ms` setting of the
				// JVM producer.
				Backoff time.Duration
				// Called to compute backoff time dynamically. Useful for implementing
				// more sophisticated backoff strategies. This takes precedence over
				// `Backoff` if set.
				BackoffFunc func(retries, maxRetries int) time.Duration
			}
		}

		// Return specifies what channels will be populated. If they are set to true,
		// you must read from the respective channels to prevent deadlock. If,
		// however, this config is used to create a `SyncProducer`, both must be set
		// to true and you shall not read from the channels since the producer does
		// this internally.
		Return struct {
			// If enabled, successfully delivered messages will be returned on the
			// Successes channel (default disabled).
			Successes bool

			// If enabled, messages that failed to deliver will be returned on the
			// Errors channel, including error (default enabled).
			Errors bool
		}

		// The following config options control how often messages are batched up and
		// sent to the broker. By default, messages are sent as fast as possible, and
		// all messages received while the current batch is in-flight are placed
		// into the subsequent batch.
		Flush struct {
			// The best-effort number of bytes needed to trigger a flush. Use the
			// global sarama.MaxRequestSize to set a hard upper limit.
			Bytes int
			// The best-effort number of messages needed to trigger a flush. Use
			// `MaxMessages` to set a hard upper limit.
			Messages int
			// The best-effort frequency of flushes. Equivalent to
			// `queue.buffering.max.ms` setting of JVM producer.
			Frequency time.Duration
			// The maximum number of messages the producer will send in a single
			// broker request. Defaults to 0 for unlimited. Similar to
			// `queue.buffering.max.messages` in the JVM producer.
			MaxMessages int
		}

		Retry struct {
			// The total number of times to retry sending a message (default 3).
			// Similar to the `message.send.max.retries` setting of the JVM producer.
			Max int
			// How long to wait for the cluster to settle between retries
			// (default 100ms). Similar to the `retry.backoff.ms` setting of the
			// JVM producer.
			Backoff time.Duration
			// Called to compute backoff time dynamically. Useful for implementing
			// more sophisticated backoff strategies. This takes precedence over
			// `Backoff` if set.
			BackoffFunc func(retries, maxRetries int) time.Duration
		}

		// Interceptors to be called when the producer dispatcher reads the
		// message for the first time. Interceptors allow intercepting and
		// possibly mutating the message before it is published to the Kafka
		// cluster. The *ProducerMessage modified by the first interceptor's
		// OnSend() is passed to the second interceptor's OnSend(), and so on in
		// the interceptor chain.
		Interceptors []ProducerInterceptor
	}

	// Consumer is the namespace for configuration related to consuming messages,
	// used by the Consumer.
	Consumer struct {
		// Group is the namespace for configuring consumer group.
		Group struct {
			Session struct {
				// The timeout used to detect consumer failures when using Kafka's group management facility.
				// The consumer sends periodic heartbeats to indicate its liveness to the broker.
				// If no heartbeats are received by the broker before the expiration of this session timeout,
				// then the broker will remove this consumer from the group and initiate a rebalance.
				// Note that the value must be in the allowable range as configured in the broker configuration
				// by `group.min.session.timeout.ms` and `group.max.session.timeout.ms` (default 10s)
				Timeout time.Duration
			}
			Heartbeat struct {
				// The expected time between heartbeats to the consumer coordinator when using Kafka's group
				// management facilities. Heartbeats are used to ensure that the consumer's session stays active and
				// to facilitate rebalancing when new consumers join or leave the group.
				// The value must be set lower than Consumer.Group.Session.Timeout, but typically should be set no
				// higher than 1/3 of that value.
				// It can be adjusted even lower to control the expected time for normal rebalances (default 3s)
				Interval time.Duration
			}
			Rebalance struct {
				// Strategy for allocating topic partitions to members.
				// Deprecated: Strategy exists for historical compatibility
				// and should not be used. Please use GroupStrategies.
				Strategy BalanceStrategy

				// GroupStrategies is the priority-ordered list of client-side consumer group
				// balancing strategies that will be offered to the coordinator. The first
				// strategy that all group members support will be chosen by the leader.
				// default: [ NewBalanceStrategyRange() ]
				GroupStrategies []BalanceStrategy

				// The maximum allowed time for each worker to join the group once a rebalance has begun.
				// This is basically a limit on the amount of time needed for all tasks to flush any pending
				// data and commit offsets. If the timeout is exceeded, then the worker will be removed from
				// the group, which will cause offset commit failures (default 60s).
				Timeout time.Duration

				Retry struct {
					// When a new consumer joins a consumer group the set of consumers attempt to "rebalance"
					// the load to assign partitions to each consumer. If the set of consumers changes while
					// this assignment is taking place the rebalance will fail and retry. This setting controls
					// the maximum number of attempts before giving up (default 4).
					Max int
					// Backoff time between retries during rebalance (default 2s)
					Backoff time.Duration
				}
			}
			Member struct {
				// Custom metadata to include when joining the group. The user data for all joined members
				// can be retrieved by sending a DescribeGroupRequest to the broker that is the
				// coordinator for the group.
				UserData []byte
			}

			// support KIP-345
			InstanceId string

			// If true, consumer offsets will be automatically reset to configured Initial value
			// if the fetched consumer offset is out of range of available offsets. Out of range
			// can happen if the data has been deleted from the server, or during situations of
			// under-replication where a replica does not have all the data yet. It can be
			// dangerous to reset the offset automatically, particularly in the latter case. Defaults
			// to true to maintain existing behavior.
			ResetInvalidOffsets bool
		}

		Retry struct {
			// How long to wait after failing to read from a partition before
			// trying again (default 2s).
			Backoff time.Duration
			// Called to compute backoff time dynamically. Useful for implementing
			// more sophisticated backoff strategies. This takes precedence over
			// `Backoff` if set.
			BackoffFunc func(retries int) time.Duration
		}

		// Fetch is the namespace for controlling how many bytes are retrieved by any
		// given request.
		Fetch struct {
			// The minimum number of message bytes to fetch in a request - the broker
			// will wait until at least this many are available. The default is 1,
			// as 0 causes the consumer to spin when no messages are available.
			// Equivalent to the JVM's `fetch.min.bytes`.
			Min int32
			// The default number of message bytes to fetch from the broker in each
			// request (default 1MB). This should be larger than the majority of
			// your messages, or else the consumer will spend a lot of time
			// negotiating sizes and not actually consuming. Similar to the JVM's
			// `fetch.message.max.bytes`.
			Default int32
			// The maximum number of message bytes to fetch from the broker in a
			// single request. Messages larger than this will return
			// ErrMessageTooLarge and will not be consumable, so you must be sure
			// this is at least as large as your largest message. Defaults to 0
			// (no limit). Similar to the JVM's `fetch.message.max.bytes`. The
			// global `sarama.MaxResponseSize` still applies.
			Max int32
		}
		// The maximum amount of time the broker will wait for Consumer.Fetch.Min
		// bytes to become available before it returns fewer than that anyways. The
		// default is 250ms, since 0 causes the consumer to spin when no events are
		// available. 100-500ms is a reasonable range for most cases. Kafka only
		// supports precision up to milliseconds; nanoseconds will be truncated.
		// Equivalent to the JVM's `fetch.max.wait.ms`.
		MaxWaitTime time.Duration

		// The maximum amount of time the consumer expects a message to take to
		// process for the user. If writing to the Messages channel takes longer
		// than this, that partition will stop fetching more messages until it
		// can proceed again.
		// Note that, since the Messages channel is buffered, the actual grace time is
		// (MaxProcessingTime * ChannelBufferSize). Defaults to 100ms.
		// If a message is not written to the Messages channel between two ticks
		// of the expiryTicker then a timeout is detected.
		// Using a ticker instead of a timer to detect timeouts should typically
		// result in many fewer calls to Timer functions which may result in a
		// significant performance improvement if many messages are being sent
		// and timeouts are infrequent.
		// The disadvantage of using a ticker instead of a timer is that
		// timeouts will be less accurate. That is, the effective timeout could
		// be between `MaxProcessingTime` and `2 * MaxProcessingTime`. For
		// example, if `MaxProcessingTime` is 100ms then a delay of 180ms
		// between two messages being sent may not be recognized as a timeout.
		MaxProcessingTime time.Duration

		// Return specifies what channels will be populated. If they are set to true,
		// you must read from them to prevent deadlock.
		Return struct {
			// If enabled, any errors that occurred while consuming are returned on
			// the Errors channel (default disabled).
			Errors bool
		}

		// Offsets specifies configuration for how and when to commit consumed
		// offsets. This currently requires the manual use of an OffsetManager
		// but will eventually be automated.
		Offsets struct {
			// Deprecated: CommitInterval exists for historical compatibility
			// and should not be used. Please use Consumer.Offsets.AutoCommit
			CommitInterval time.Duration

			// AutoCommit specifies configuration for committing offsets automatically.
			AutoCommit struct {
				// Whether or not to auto-commit updated offsets back to the broker.
				// (default enabled).
				Enable bool

				// How frequently to commit updated offsets. Ineffective unless
				// auto-commit is enabled (default 1s)
				Interval time.Duration
			}

			// The initial offset to use if no offset was previously committed.
			// Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
			Initial int64

			// The retention duration for committed offsets. If zero, disabled
			// (in which case the `offsets.retention.minutes` option on the
			// broker will be used).  Kafka only supports precision up to
			// milliseconds; nanoseconds will be truncated. Requires Kafka
			// broker version 0.9.0 or later.
			// (default is 0: disabled).
			Retention time.Duration

			Retry struct {
				// The total number of times to retry failing commit
				// requests during OffsetManager shutdown (default 3).
				Max int
			}
		}

		// IsolationLevel supports 2 modes:
		// 	- use `ReadUncommitted` (default) to consume and return all messages in message channel
		//	- use `ReadCommitted` to hide messages that are part of an aborted transaction
		IsolationLevel IsolationLevel

		// Interceptors to be called just before the record is sent to the
		// messages channel. Interceptors allow intercepting and possibly
		// mutating the message before it is returned to the client.
		// The *ConsumerMessage modified by the first interceptor's OnConsume() is
		// passed to the second interceptor's OnConsume(), and so on in the
		// interceptor chain.
		Interceptors []ConsumerInterceptor
	}

	// A user-provided string sent with every request to the brokers for logging,
	// debugging, and auditing purposes. Defaults to "sarama", but you should
	// probably set it to something specific to your application.
	ClientID string
	// A rack identifier for this client. This can be any string value which
	// indicates where this client is physically located.
	// It corresponds with the broker config 'broker.rack'
	RackID string
	// The number of events to buffer in internal and external channels. This
	// permits the producer and consumer to continue processing some messages
	// in the background while user code is working, greatly improving throughput.
	// Defaults to 256.
	ChannelBufferSize int
	// ApiVersionsRequest determines whether Sarama should send an
	// ApiVersionsRequest message to each broker as part of its initial
	// connection. This defaults to `true` to match the official Java client
	// and most third-party ones.
	ApiVersionsRequest bool
	// The version of Kafka that Sarama will assume it is running against.
	// Defaults to the oldest supported stable version. Since Kafka provides
	// backwards-compatibility, setting it to a version older than you have
	// will not break anything, although it may prevent you from using the
	// latest features. Setting it to a version greater than you are actually
	// running may lead to random breakage.
	Version KafkaVersion
	// The registry to define metrics into.
	// Defaults to a local registry.
	// If you want to disable metrics gathering, set "metrics.UseNilMetrics" to "true"
	// prior to starting Sarama.
	// See Examples on how to use the metrics registry.
	MetricRegistry metrics.Registry
}

Config is used to pass multiple configuration options to Sarama's constructors.

Example (Metrics)

This example shows how to integrate with an existing registry as well as publishing metrics on the standard output.

// Our application registry
appMetricRegistry := metrics.NewRegistry()
appGauge := metrics.GetOrRegisterGauge("m1", appMetricRegistry)
appGauge.Update(1)

config := NewTestConfig()
// Use a prefix registry instead of the default local one
config.MetricRegistry = metrics.NewPrefixedChildRegistry(appMetricRegistry, "sarama.")

// Simulate a metric created by sarama without starting a broker
saramaGauge := metrics.GetOrRegisterGauge("m2", config.MetricRegistry)
saramaGauge.Update(2)

metrics.WriteOnce(appMetricRegistry, os.Stdout)
Output:

gauge m1
  value:               1
gauge sarama.m2
  value:               2

func NewConfig

func NewConfig() *Config

NewConfig returns a new configuration instance with sane defaults.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks a Config instance. It will return a ConfigurationError if the specified values don't make sense.

type ConfigEntry added in v1.16.0

type ConfigEntry struct {
	Name      string
	Value     string
	ReadOnly  bool
	Default   bool
	Source    ConfigSource
	Sensitive bool
	Synonyms  []*ConfigSynonym
}

type ConfigResource added in v1.16.0

type ConfigResource struct {
	Type        ConfigResourceType
	Name        string
	ConfigNames []string
}

type ConfigResourceType added in v1.16.0

type ConfigResourceType int8

ConfigResourceType is a type for resources that have configs.

const (
	// UnknownResource constant type
	UnknownResource ConfigResourceType = 0
	// TopicResource constant type
	TopicResource ConfigResourceType = 2
	// BrokerResource constant type
	BrokerResource ConfigResourceType = 4
	// BrokerLoggerResource constant type
	BrokerLoggerResource ConfigResourceType = 8
)

type ConfigSource added in v1.20.1

type ConfigSource int8
const (
	SourceUnknown ConfigSource = iota
	SourceTopic
	SourceDynamicBroker
	SourceDynamicDefaultBroker
	SourceStaticBroker
	SourceDefault
)

func (ConfigSource) String added in v1.20.1

func (s ConfigSource) String() string

type ConfigSynonym added in v1.20.1

type ConfigSynonym struct {
	ConfigName  string
	ConfigValue string
	Source      ConfigSource
}

type ConfigurationError

type ConfigurationError string

ConfigurationError is the type of error returned from a constructor (e.g. NewClient or NewConsumer) when the specified configuration is invalid.

func (ConfigurationError) Error

func (err ConfigurationError) Error() string

type Consumer

type Consumer interface {
	// Topics returns the set of available topics as retrieved from the cluster
	// metadata. This method is the same as Client.Topics(), and is provided for
	// convenience.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	// This method is the same as Client.Partitions(), and is provided for convenience.
	Partitions(topic string) ([]int32, error)

	// ConsumePartition creates a PartitionConsumer on the given topic/partition with
	// the given offset. It will return an error if this Consumer is already consuming
	// on the given topic/partition. Offset can be a literal offset, or OffsetNewest
	// or OffsetOldest
	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)

	// HighWaterMarks returns the current high water marks for each topic and partition.
	// Consistency between partitions is not guaranteed since high water marks are updated separately.
	HighWaterMarks() map[string]map[int32]int64

	// Close shuts down the consumer. It must be called after all child
	// PartitionConsumers have already been closed.
	Close() error

	// Pause suspends fetching from the requested partitions. Future calls to the broker will not return any
	// records from these partitions until they have been resumed using Resume()/ResumeAll().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
	Pause(topicPartitions map[string][]int32)

	// Resume resumes specified partitions which have been paused with Pause()/PauseAll().
	// New calls to the broker will return records from these partitions if there are any to be fetched.
	Resume(topicPartitions map[string][]int32)

	// PauseAll suspends fetching from all partitions. Future calls to the broker will not return any
	// records from these partitions until they have been resumed using Resume()/ResumeAll().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
	PauseAll()

	// ResumeAll resumes all partitions which have been paused with Pause()/PauseAll().
	// New calls to the broker will return records from these partitions if there are any to be fetched.
	ResumeAll()
}

Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close() on a consumer to avoid leaks; it will not be garbage-collected automatically when it passes out of scope.

Example

This example shows how to use the consumer to read messages from a single partition.

consumer, err := NewConsumer([]string{"localhost:9092"}, NewTestConfig())
if err != nil {
	panic(err)
}

defer func() {
	if err := consumer.Close(); err != nil {
		log.Fatalln(err)
	}
}()

partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
if err != nil {
	panic(err)
}

defer func() {
	if err := partitionConsumer.Close(); err != nil {
		log.Fatalln(err)
	}
}()

// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

consumed := 0
ConsumerLoop:
for {
	select {
	case msg := <-partitionConsumer.Messages():
		log.Printf("Consumed message offset %d\n", msg.Offset)
		consumed++
	case <-signals:
		break ConsumerLoop
	}
}

log.Printf("Consumed: %d\n", consumed)

func NewConsumer

func NewConsumer(addrs []string, config *Config) (Consumer, error)

NewConsumer creates a new consumer using the given broker addresses and configuration.

func NewConsumerFromClient

func NewConsumerFromClient(client Client) (Consumer, error)

NewConsumerFromClient creates a new consumer using the given client. It is still necessary to call Close() on the underlying client when shutting down this consumer.

type ConsumerError

type ConsumerError struct {
	Topic     string
	Partition int32
	Err       error
}

ConsumerError is what is provided to the user when an error occurs. It wraps an error and includes the topic and partition.

func (ConsumerError) Error

func (ce ConsumerError) Error() string

func (ConsumerError) Unwrap added in v1.40.0

func (ce ConsumerError) Unwrap() error

type ConsumerErrors

type ConsumerErrors []*ConsumerError

ConsumerErrors is a type that wraps a batch of errors and implements the error interface. It can be returned from the PartitionConsumer's Close method to avoid the need to manually drain errors when stopping.

func (ConsumerErrors) Error

func (ce ConsumerErrors) Error() string

type ConsumerGroup added in v1.19.0

type ConsumerGroup interface {
	// Consume joins a cluster of consumers for a given list of topics and
	// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
	//
	// The life-cycle of a session is represented by the following steps:
	//
	// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
	//    and are assigned their "fair share" of partitions, aka 'claims'.
	// 2. Before processing starts, the handler's Setup() hook is called to notify the user
	//    of the claims and allow any necessary preparation or alteration of state.
	// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
	//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
	//    from concurrent reads/writes.
	// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
	//    parent context is canceled or when a server-side rebalance cycle is initiated.
	// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
	//    to allow the user to perform any final tasks before a rebalance.
	// 6. Finally, marked offsets are committed one last time before claims are released.
	//
	// Please note that once a rebalance is triggered, sessions must be completed within
	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
	// commit failures.
	// This method should be called inside an infinite loop: when a
	// server-side rebalance happens, the consumer session needs to be
	// recreated to get the new claims.
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
	// By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan error

	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
	// this function before the object passes out of scope, as it will otherwise leak memory.
	Close() error

	// Pause suspends fetching from the requested partitions. Future calls to the broker will not return any
	// records from these partitions until they have been resumed using Resume()/ResumeAll().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
	Pause(partitions map[string][]int32)

	// Resume resumes specified partitions which have been paused with Pause()/PauseAll().
	// New calls to the broker will return records from these partitions if there are any to be fetched.
	Resume(partitions map[string][]int32)

	// PauseAll suspends fetching from all partitions. Future calls to the broker will not return any
	// records from these partitions until they have been resumed using Resume()/ResumeAll().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
	PauseAll()

	// ResumeAll resumes all partitions which have been paused with Pause()/PauseAll().
	// New calls to the broker will return records from these partitions if there are any to be fetched.
	ResumeAll()
}

ConsumerGroup is responsible for dividing up processing of topics and partitions over a collection of processes (the members of the consumer group).

Example
package main

import (
	"context"
	"fmt"
)

type exampleConsumerGroupHandler struct{}

func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error   { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	config := NewTestConfig()
	config.Version = V2_0_0_0 // specify appropriate version
	config.Consumer.Return.Errors = true

	group, err := NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
	if err != nil {
		panic(err)
	}
	defer func() { _ = group.Close() }()

	// Track errors
	go func() {
		for err := range group.Errors() {
			fmt.Println("ERROR", err)
		}
	}()

	// Iterate over consumer sessions.
	ctx := context.Background()
	for {
		topics := []string{"my-topic"}
		handler := exampleConsumerGroupHandler{}

		// `Consume` should be called inside an infinite loop: when a
		// server-side rebalance happens, the consumer session needs to be
		// recreated to get the new claims.
		err := group.Consume(ctx, topics, handler)
		if err != nil {
			panic(err)
		}
	}
}

func NewConsumerGroup added in v1.19.0

func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error)

NewConsumerGroup creates a new consumer group using the given broker addresses and configuration.

func NewConsumerGroupFromClient added in v1.19.0

func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error)

NewConsumerGroupFromClient creates a new consumer group using the given client. It is still necessary to call Close() on the underlying client when shutting down this consumer. PLEASE NOTE: consumer groups can only re-use but not share clients.

type ConsumerGroupClaim added in v1.19.0

type ConsumerGroupClaim interface {
	// Topic returns the consumed topic name.
	Topic() string

	// Partition returns the consumed partition.
	Partition() int32

	// InitialOffset returns the initial offset that was used as a starting point for this claim.
	InitialOffset() int64

	// HighWaterMarkOffset returns the high watermark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64

	// Messages returns the read channel for the messages that are returned by
	// the broker. The messages channel will be closed when a new rebalance cycle
	// is due. You must finish processing and mark offsets within
	// Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
	// re-assigned to another group member.
	Messages() <-chan *ConsumerMessage
}

ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
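
The HighWaterMarkOffset comment notes that the value can be used to determine how far behind processing is. A small sketch of that arithmetic, assuming the high watermark is the offset the next produced message will receive, as stated above. The lag helper is hypothetical and not part of the package.

```go
package main

import "fmt"

// lag returns how many messages remain between the last processed offset and
// the high watermark, which is the offset the next produced message will get.
// The result is clamped at zero in case the two values were read at slightly
// different times.
func lag(highWaterMark, lastProcessed int64) int64 {
	remaining := highWaterMark - (lastProcessed + 1)
	if remaining < 0 {
		return 0
	}
	return remaining
}

func main() {
	// The next produced message would get offset 100 and offset 41 was the
	// last one processed, so offsets 42 through 99 are still pending.
	fmt.Println(lag(100, 41))
}
```

In a ConsumeClaim loop the two inputs would presumably come from claim.HighWaterMarkOffset() and msg.Offset.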

type ConsumerGroupHandler added in v1.19.0

type ConsumerGroupHandler interface {
	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(ConsumerGroupSession) error

	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time.
	Cleanup(ConsumerGroupSession) error

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}

ConsumerGroupHandler instances are used to handle individual topic/partition claims. They also provide hooks for your consumer group session life-cycle and allow you to trigger logic before or after the consume loop(s).

PLEASE NOTE that handlers are likely to be called from several goroutines concurrently; ensure that all state is safely protected against race conditions.

type ConsumerGroupMemberAssignment added in v1.8.0

type ConsumerGroupMemberAssignment struct {
	Version  int16
	Topics   map[string][]int32
	UserData []byte
}

ConsumerGroupMemberAssignment holds the member assignment for a consumer group: https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json

type ConsumerGroupMemberMetadata added in v1.8.0

type ConsumerGroupMemberMetadata struct {
	Version         int16
	Topics          []string
	UserData        []byte
	OwnedPartitions []*OwnedPartition
	GenerationID    int32
	RackID          *string
}

ConsumerGroupMemberMetadata holds the metadata for a consumer group: https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json

type ConsumerGroupSession added in v1.19.0

type ConsumerGroupSession interface {
	// Claims returns information about the claimed partitions by topic.
	Claims() map[string][]int32

	// MemberID returns the cluster member ID.
	MemberID() string

	// GenerationID returns the current generation ID.
	GenerationID() int32

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(topic string, partition int32, offset int64, metadata string)

	// Commit the offset to the backend
	//
	// Note: calling Commit performs a blocking synchronous operation.
	Commit()

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting an offset to an earlier or smaller value, whereas MarkOffset only
	// allows incrementing the offset. See MarkOffset for more details.
	ResetOffset(topic string, partition int32, offset int64, metadata string)

	// MarkMessage marks a message as consumed.
	MarkMessage(msg *ConsumerMessage, metadata string)

	// Context returns the session context.
	Context() context.Context
}

ConsumerGroupSession represents a consumer group member session.

type ConsumerInterceptor added in v1.40.0

type ConsumerInterceptor interface {

	// OnConsume is called when the consumed message is intercepted. Please
	// avoid modifying the message until it's safe to do so, as this is _not_ a
	// copy of the message.
	OnConsume(*ConsumerMessage)
}

ConsumerInterceptor allows you to intercept (and possibly mutate) the records received by the consumer before they are sent to the messages channel. https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors#KIP42:AddProducerandConsumerInterceptors-Motivation
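
As a rough illustration of the interceptor idea, the sketch below defines local mirrors of the message type and the single-method interface (consumerMessage and consumerInterceptor, both assumptions made so the example runs on its own) and applies a chain of interceptors to a message before it would be delivered. The trimInterceptor type and the applyInterceptors helper are hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
)

// consumerMessage mirrors a subset of the ConsumerMessage fields.
type consumerMessage struct {
	Key, Value []byte
	Topic      string
}

// consumerInterceptor mirrors the single-method interface shown above.
type consumerInterceptor interface {
	OnConsume(*consumerMessage)
}

// trimInterceptor strips surrounding whitespace from each value in place and
// counts the messages it has seen. The counter is not synchronized, so this
// version is only suitable for use from a single goroutine.
type trimInterceptor struct{ seen int }

// OnConsume mutates the message it is given; it does not receive a copy.
func (i *trimInterceptor) OnConsume(msg *consumerMessage) {
	msg.Value = bytes.TrimSpace(msg.Value)
	i.seen++
}

// applyInterceptors runs every interceptor on msg in order, as a consumer
// would before handing the message to the messages channel.
func applyInterceptors(msg *consumerMessage, chain []consumerInterceptor) {
	for _, ic := range chain {
		ic.OnConsume(msg)
	}
}

func main() {
	counter := &trimInterceptor{}
	msg := &consumerMessage{Topic: "my_topic", Value: []byte("  hello \n")}
	applyInterceptors(msg, []consumerInterceptor{counter})
	fmt.Printf("%q seen=%d\n", msg.Value, counter.seen)
}
```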

type ConsumerMessage

type ConsumerMessage struct {
	Headers        []*RecordHeader // only set if kafka is version 0.11+
	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp

	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
}

ConsumerMessage encapsulates a Kafka message returned by the consumer.

type ConsumerMetadataRequest

type ConsumerMetadataRequest struct {
	Version       int16
	ConsumerGroup string
}

ConsumerMetadataRequest is used for metadata requests

type ConsumerMetadataResponse

type ConsumerMetadataResponse struct {
	Version         int16
	Err             KError
	Coordinator     *Broker
	CoordinatorID   int32  // deprecated: use Coordinator.ID()
	CoordinatorHost string // deprecated: use Coordinator.Addr()
	CoordinatorPort int32  // deprecated: use Coordinator.Addr()
}

ConsumerMetadataResponse holds the response for a consumer group metadata request

type ControlRecord added in v1.40.0

type ControlRecord struct {
	Version          int16
	CoordinatorEpoch int32
	Type             ControlRecordType
}

Control records are returned as records by a FetchRequest. However, unlike "normal" records, they mean nothing application-wise; they only serve the internal logic that supports transactions.

type ControlRecordType added in v1.40.0

type ControlRecordType int

ControlRecordType ...

const (
	// ControlRecordAbort is a control record for abort
	ControlRecordAbort ControlRecordType = iota
	// ControlRecordCommit is a control record for commit
	ControlRecordCommit
	// ControlRecordUnknown is a control record of unknown type
	ControlRecordUnknown
)

type CoordinatorType added in v1.17.0

type CoordinatorType int8
const (
	CoordinatorGroup CoordinatorType = iota
	CoordinatorTransaction
)

type CreateAclsRequest added in v1.16.0

type CreateAclsRequest struct {
	Version      int16
	AclCreations []*AclCreation
}

CreateAclsRequest is an acl creation request

type CreateAclsResponse added in v1.16.0

type CreateAclsResponse struct {
	Version              int16
	ThrottleTime         time.Duration
	AclCreationResponses []*AclCreationResponse
}

CreateAclsResponse is an acl creation response type

type CreatePartitionsRequest added in v1.15.0

type CreatePartitionsRequest struct {
	Version         int16
	TopicPartitions map[string]*TopicPartition
	Timeout         time.Duration
	ValidateOnly    bool
}

type CreatePartitionsResponse added in v1.15.0

type CreatePartitionsResponse struct {
	Version              int16
	ThrottleTime         time.Duration
	TopicPartitionErrors map[string]*TopicPartitionError
}

type CreateTopicsRequest added in v1.16.0

type CreateTopicsRequest struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// TopicDetails contains the topics to create.
	TopicDetails map[string]*TopicDetail
	// Timeout contains how long to wait before timing out the request.
	Timeout time.Duration
	// ValidateOnly if true, check that the topics can be created as specified,
	// but don't create anything.
	ValidateOnly bool
}

type CreateTopicsResponse added in v1.16.0

type CreateTopicsResponse struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// ThrottleTime contains the duration for which the request was throttled due
	// to a quota violation, or zero if the request did not violate any quota.
	ThrottleTime time.Duration
	// TopicErrors contains a map of any errors for the topics we tried to create.
	TopicErrors map[string]*TopicError
}

type DeleteAclsRequest added in v1.16.0

type DeleteAclsRequest struct {
	Version int
	Filters []*AclFilter
}

DeleteAclsRequest is a delete acl request

type DeleteAclsResponse added in v1.16.0

type DeleteAclsResponse struct {
	Version         int16
	ThrottleTime    time.Duration
	FilterResponses []*FilterResponse
}

DeleteAclsResponse is a delete acl response

type DeleteGroupsRequest added in v1.17.0

type DeleteGroupsRequest struct {
	Version int16
	Groups  []string
}

func (*DeleteGroupsRequest) AddGroup added in v1.17.0

func (r *DeleteGroupsRequest) AddGroup(group string)

type DeleteGroupsResponse added in v1.17.0

type DeleteGroupsResponse struct {
	Version         int16
	ThrottleTime    time.Duration
	GroupErrorCodes map[string]KError
}

type DeleteOffsetsRequest added in v1.40.0

type DeleteOffsetsRequest struct {
	Version int16
	Group   string
	// contains filtered or unexported fields
}

func (*DeleteOffsetsRequest) AddPartition added in v1.40.0

func (r *DeleteOffsetsRequest) AddPartition(topic string, partitionID int32)

type DeleteOffsetsResponse added in v1.40.0

type DeleteOffsetsResponse struct {
	Version int16
	// The top-level error code, or 0 if there was no error.
	ErrorCode    KError
	ThrottleTime time.Duration
	// The responses for each partition of the topics.
	Errors map[string]map[int32]KError
}

func (*DeleteOffsetsResponse) AddError added in v1.40.0

func (r *DeleteOffsetsResponse) AddError(topic string, partition int32, errorCode KError)

type DeleteRecordsRequest added in v1.17.0

type DeleteRecordsRequest struct {
	Version int16
	Topics  map[string]*DeleteRecordsRequestTopic
	Timeout time.Duration
}

type DeleteRecordsRequestTopic added in v1.17.0

type DeleteRecordsRequestTopic struct {
	PartitionOffsets map[int32]int64 // partition => offset
}

type DeleteRecordsResponse added in v1.17.0

type DeleteRecordsResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Topics       map[string]*DeleteRecordsResponseTopic
}

type DeleteRecordsResponsePartition added in v1.17.0

type DeleteRecordsResponsePartition struct {
	LowWatermark int64
	Err          KError
}

type DeleteRecordsResponseTopic added in v1.17.0

type DeleteRecordsResponseTopic struct {
	Partitions map[int32]*DeleteRecordsResponsePartition
}

type DeleteTopicsRequest added in v1.16.0

type DeleteTopicsRequest struct {
	Version int16
	Topics  []string
	Timeout time.Duration
}

type DeleteTopicsResponse added in v1.16.0

type DeleteTopicsResponse struct {
	Version         int16
	ThrottleTime    time.Duration
	TopicErrorCodes map[string]KError
}

type DescribeAclsRequest added in v1.16.0

type DescribeAclsRequest struct {
	Version int
	AclFilter
}

DescribeAclsRequest is a describe acl request type

type DescribeAclsResponse added in v1.16.0

type DescribeAclsResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Err          KError
	ErrMsg       *string
	ResourceAcls []*ResourceAcls
}

DescribeAclsResponse is a describe acl response type

type DescribeClientQuotasEntry added in v1.40.0

type DescribeClientQuotasEntry struct {
	Entity []QuotaEntityComponent // The quota entity description.
	Values map[string]float64     // The quota values for the entity.
}

type DescribeClientQuotasRequest added in v1.40.0

type DescribeClientQuotasRequest struct {
	Version    int16
	Components []QuotaFilterComponent
	Strict     bool
}

A filter to be applied to matching client quotas.

Components: the components to filter on.
Strict: whether the filter only includes specified components.

type DescribeClientQuotasResponse added in v1.40.0

type DescribeClientQuotasResponse struct {
	Version      int16
	ThrottleTime time.Duration               // The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
	ErrorCode    KError                      // The error code, or `0` if the quota description succeeded.
	ErrorMsg     *string                     // The error message, or `null` if the quota description succeeded.
	Entries      []DescribeClientQuotasEntry // A result entry.
}

type DescribeConfigError added in v1.42.2

type DescribeConfigError struct {
	Err    KError
	ErrMsg string
}

func (*DescribeConfigError) Error added in v1.42.2

func (c *DescribeConfigError) Error() string

type DescribeConfigsRequest added in v1.16.0

type DescribeConfigsRequest struct {
	Version         int16
	Resources       []*ConfigResource
	IncludeSynonyms bool
}

type DescribeConfigsResponse added in v1.16.0

type DescribeConfigsResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Resources    []*ResourceResponse
}

type DescribeGroupsRequest added in v1.7.0

type DescribeGroupsRequest struct {
	Version                     int16
	Groups                      []string
	IncludeAuthorizedOperations bool
}

func (*DescribeGroupsRequest) AddGroup added in v1.7.0

func (r *DescribeGroupsRequest) AddGroup(group string)

type DescribeGroupsResponse added in v1.7.0

type DescribeGroupsResponse struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// ThrottleTimeMs contains the duration in milliseconds for which the
	// request was throttled due to a quota violation, or zero if the request
	// did not violate any quota.
	ThrottleTimeMs int32
	// Groups contains each described group.
	Groups []*GroupDescription
}

type DescribeLogDirsRequest added in v1.40.0

type DescribeLogDirsRequest struct {
	// Version 0 and 1 are equal
	// The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
	Version int16

	// If this is an empty array, all topics will be queried
	DescribeTopics []DescribeLogDirsRequestTopic
}

DescribeLogDirsRequest is a describe request to get partitions' log size

type DescribeLogDirsRequestTopic added in v1.40.0

type DescribeLogDirsRequestTopic struct {
	Topic        string
	PartitionIDs []int32
}

DescribeLogDirsRequestTopic is a describe request about the log dir of one or more partitions within a Topic

type DescribeLogDirsResponse added in v1.40.0

type DescribeLogDirsResponse struct {
	ThrottleTime time.Duration

	// Version 0 and 1 are equal
	// The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
	Version int16

	LogDirs []DescribeLogDirsResponseDirMetadata
}

type DescribeLogDirsResponseDirMetadata added in v1.40.0

type DescribeLogDirsResponseDirMetadata struct {
	ErrorCode KError

	// The absolute log directory path
	Path   string
	Topics []DescribeLogDirsResponseTopic
}

type DescribeLogDirsResponsePartition added in v1.40.0

type DescribeLogDirsResponsePartition struct {
	PartitionID int32

	// The size of the log segments of the partition in bytes.
	Size int64

	// The lag of the log's LEO w.r.t. partition's HW (if it is the current log for the partition) or
	// current replica's LEO (if it is the future log for the partition)
	OffsetLag int64

	// True if this log is created by AlterReplicaLogDirsRequest and will replace the current log of
	// the replica in the future.
	IsTemporary bool
}

DescribeLogDirsResponsePartition describes a partition's log directory

type DescribeLogDirsResponseTopic added in v1.40.0

type DescribeLogDirsResponseTopic struct {
	Topic      string
	Partitions []DescribeLogDirsResponsePartition
}

DescribeLogDirsResponseTopic contains a topic's partitions descriptions
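
Since the response nests partitions inside topics inside log directories, computing a per-topic size means walking all three levels. A minimal sketch of that aggregation follows, using local mirrors of the relevant fields (dirMetadata, topicInfo, partitionInfo) so that it runs without a broker. The sizeByTopic helper is hypothetical.

```go
package main

import "fmt"

// The three types below are local mirrors of a subset of the fields in
// DescribeLogDirsResponseDirMetadata, DescribeLogDirsResponseTopic and
// DescribeLogDirsResponsePartition.
type partitionInfo struct {
	PartitionID int32
	Size        int64 // size of the partition's log segments in bytes
}

type topicInfo struct {
	Topic      string
	Partitions []partitionInfo
}

type dirMetadata struct {
	Path   string
	Topics []topicInfo
}

// sizeByTopic sums partition log sizes per topic across all log directories.
func sizeByTopic(dirs []dirMetadata) map[string]int64 {
	totals := make(map[string]int64)
	for _, dir := range dirs {
		for _, topic := range dir.Topics {
			for _, part := range topic.Partitions {
				totals[topic.Topic] += part.Size
			}
		}
	}
	return totals
}

func main() {
	dirs := []dirMetadata{
		{Path: "/data/a", Topics: []topicInfo{
			{Topic: "orders", Partitions: []partitionInfo{{0, 1024}, {1, 2048}}},
		}},
		{Path: "/data/b", Topics: []topicInfo{
			{Topic: "orders", Partitions: []partitionInfo{{2, 512}}},
			{Topic: "audit", Partitions: []partitionInfo{{0, 100}}},
		}},
	}
	fmt.Println(sizeByTopic(dirs))
}
```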

type DescribeUserScramCredentialsRequest added in v1.40.0

type DescribeUserScramCredentialsRequest struct {
	// Only version 0 is currently supported
	Version int16

	// If this is an empty array, all users will be queried
	DescribeUsers []DescribeUserScramCredentialsRequestUser
}

DescribeUserScramCredentialsRequest is a request to get list of SCRAM user names

type DescribeUserScramCredentialsRequestUser added in v1.40.0

type DescribeUserScramCredentialsRequestUser struct {
	Name string
}

DescribeUserScramCredentialsRequestUser is a describe request about specific user name

type DescribeUserScramCredentialsResponse added in v1.40.0

type DescribeUserScramCredentialsResponse struct {
	// Only version 0 is currently supported
	Version int16

	ThrottleTime time.Duration

	ErrorCode    KError
	ErrorMessage *string

	Results []*DescribeUserScramCredentialsResult
}

type DescribeUserScramCredentialsResult added in v1.40.0

type DescribeUserScramCredentialsResult struct {
	User string

	ErrorCode    KError
	ErrorMessage *string

	CredentialInfos []*UserScramCredentialsResponseInfo
}

type DynamicConsistencyPartitioner added in v1.18.0

type DynamicConsistencyPartitioner interface {
	Partitioner

	// MessageRequiresConsistency is similar to Partitioner.RequiresConsistency,
	// but takes in the message being partitioned so that the partitioner can
	// make a per-message determination.
	MessageRequiresConsistency(message *ProducerMessage) bool
}

DynamicConsistencyPartitioner can optionally be implemented by Partitioners in order to allow more flexibility than is originally allowed by the RequiresConsistency method in the Partitioner interface. This allows partitioners to require consistency sometimes, but not all times. It's useful for, e.g., the HashPartitioner, which does not require consistency if the message key is nil.
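
To make the per-message decision concrete, here is a sketch of a partitioner that hashes keyed messages and rotates through partitions for unkeyed ones. It is an illustration under simplifying assumptions: producerMessage is a reduced stand-in whose key is a raw byte slice rather than an Encoder, keyedPartitioner is a hypothetical type, and the FNV-1a hashing shown here is not claimed to match the partition choices of the library's own HashPartitioner.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// producerMessage is a reduced stand-in for ProducerMessage; the key is a
// raw byte slice here to keep the sketch short.
type producerMessage struct{ Key []byte }

// keyedPartitioner is a hypothetical partitioner that needs a consistent
// partition only for messages that carry a key.
type keyedPartitioner struct{ next int32 }

// Partition hashes the key when one is present and otherwise rotates through
// the partitions.
func (p *keyedPartitioner) Partition(msg *producerMessage, numPartitions int32) (int32, error) {
	if numPartitions <= 0 {
		return -1, fmt.Errorf("numPartitions must be positive, got %d", numPartitions)
	}
	if msg.Key == nil {
		part := p.next % numPartitions
		p.next = (p.next + 1) % numPartitions
		return part, nil
	}
	h := fnv.New32a()
	_, _ = h.Write(msg.Key) // Write on an FNV hash never returns an error
	return int32(h.Sum32() % uint32(numPartitions)), nil
}

// RequiresConsistency is the static answer of the base Partitioner interface.
func (p *keyedPartitioner) RequiresConsistency() bool { return true }

// MessageRequiresConsistency refines that answer per message: only keyed
// messages must always map to the same partition.
func (p *keyedPartitioner) MessageRequiresConsistency(msg *producerMessage) bool {
	return msg.Key != nil
}

func main() {
	p := &keyedPartitioner{}
	keyed := &producerMessage{Key: []byte("user-1")}
	unkeyed := &producerMessage{}
	part, _ := p.Partition(keyed, 6)
	fmt.Println(part, p.MessageRequiresConsistency(keyed), p.MessageRequiresConsistency(unkeyed))
}
```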

type Encoder

type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message. Length() is provided as an optimization, and must return the same as len() on the result of Encode().
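
One way to satisfy the requirement that Length() matches len() of the Encode() result is to serialize once and cache the bytes. The sketch below does this for JSON. The encoder interface is a local mirror of the one shown above, and jsonEncoder and newJSONEncoder are hypothetical names introduced for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encoder mirrors the two-method Encoder interface shown above.
type encoder interface {
	Encode() ([]byte, error)
	Length() int
}

// jsonEncoder is a hypothetical Encoder that serializes its value once and
// caches the result, so Length always agrees with the bytes Encode returns.
type jsonEncoder struct {
	data []byte
	err  error
}

// newJSONEncoder marshals v eagerly; any marshaling error is reported later
// by Encode.
func newJSONEncoder(v interface{}) *jsonEncoder {
	data, err := json.Marshal(v)
	return &jsonEncoder{data: data, err: err}
}

// Encode returns the cached bytes and the error from marshaling, if any.
func (e *jsonEncoder) Encode() ([]byte, error) { return e.data, e.err }

// Length returns the size of the cached bytes without encoding again.
func (e *jsonEncoder) Length() int { return len(e.data) }

// Compile-time check that jsonEncoder satisfies the interface.
var _ encoder = (*jsonEncoder)(nil)

func main() {
	enc := newJSONEncoder(map[string]int{"id": 7})
	b, err := enc.Encode()
	fmt.Println(string(b), enc.Length(), err)
}
```

Encoding eagerly trades a little memory for a Length() that is cheap and always consistent with Encode(), which is the property the interface asks for.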

type EndTxnRequest added in v1.16.0

type EndTxnRequest struct {
	Version           int16
	TransactionalID   string
	ProducerID        int64
	ProducerEpoch     int16
	TransactionResult bool
}

type EndTxnResponse added in v1.16.0

type EndTxnResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Err          KError
}

type FetchRequest

type FetchRequest struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// ReplicaID contains the broker ID of the follower, or -1 if this request
	// is from a consumer.
	// ReplicaID int32
	// MaxWaitTime contains the maximum time in milliseconds to wait for the response.
	MaxWaitTime int32
	// MinBytes contains the minimum bytes to accumulate in the response.
	MinBytes int32
	// MaxBytes contains the maximum bytes to fetch.  See KIP-74 for cases
	// where this limit may not be honored.
	MaxBytes int32
	// Isolation controls the visibility of
	// transactional records. Using READ_UNCOMMITTED (isolation_level = 0)
	// makes all records visible. With READ_COMMITTED (isolation_level = 1),
	// non-transactional and COMMITTED transactional records are visible. To be
	// more concrete, READ_COMMITTED returns all data from offsets smaller than
	// the current LSO (last stable offset), and enables the inclusion of the
	// list of aborted transactions in the result, which allows consumers to
	// discard ABORTED transactional records
	Isolation IsolationLevel
	// SessionID contains the fetch session ID.
	SessionID int32
	// SessionEpoch contains the epoch of the partition leader as known to the
	// follower replica or a consumer.
	SessionEpoch int32

	// RackID contains a Rack ID of the consumer making this request
	RackID string
	// contains filtered or unexported fields
}

FetchRequest (API key 1) will fetch Kafka messages. Version 3 introduced the MaxBytes field. See https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes

func (*FetchRequest) AddBlock

func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32, leaderEpoch int32)

type FetchResponse

type FetchResponse struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// ThrottleTime contains the duration in milliseconds for which the request
	// was throttled due to a quota violation, or zero if the request did not
	// violate any quota.
	ThrottleTime time.Duration
	// ErrorCode contains the top level response error code.
	ErrorCode int16
	// SessionID contains the fetch session ID, or 0 if this is not part of a fetch session.
	SessionID int32
	// Blocks contains the response topics.
	Blocks map[string]map[int32]*FetchResponseBlock

	LogAppendTime bool
	Timestamp     time.Time
}

func (*FetchResponse) AddControlRecord added in v1.40.0

func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType)

func (*FetchResponse) AddControlRecordWithTimestamp added in v1.40.0

func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time)

func (*FetchResponse) AddError

func (r *FetchResponse) AddError(topic string, partition int32, err KError)

func (*FetchResponse) AddMessage

func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64)

func (*FetchResponse) AddMessageWithTimestamp added in v1.40.0

func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8)

func (*FetchResponse) AddRecord added in v1.14.0

func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64)

func (*FetchResponse) AddRecordBatch added in v1.40.0

func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool)

func (*FetchResponse) AddRecordBatchWithTimestamp added in v1.40.0

func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time)

AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp, but instead of appending one record to a batch, it appends a new batch containing one record to the FetchResponse. Since transactions are handled at the batch level (the whole batch is either committed or aborted), use this to test transactions.

func (*FetchResponse) AddRecordWithTimestamp added in v1.40.0

func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time)

func (*FetchResponse) GetBlock

func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock

func (*FetchResponse) SetLastOffsetDelta added in v1.16.0

func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32)

func (*FetchResponse) SetLastStableOffset added in v1.14.0

func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64)

type FetchResponseBlock

type FetchResponseBlock struct {
	// Err contains the error code, or 0 if there was no fetch error.
	Err KError
	// HighWaterMarkOffset contains the current high water mark.
	HighWaterMarkOffset int64
	// LastStableOffset contains the last stable offset (or LSO) of the
	// partition. This is the last offset such that the state of all
	// transactional records prior to this offset have been decided (ABORTED or
	// COMMITTED)
	LastStableOffset       int64
	LastRecordsBatchOffset *int64
	// LogStartOffset contains the current log start offset.
	LogStartOffset int64
	// AbortedTransactions contains the aborted transactions.
	AbortedTransactions []*AbortedTransaction
	// PreferredReadReplica contains the preferred read replica for the
	// consumer to use on its next fetch request
	PreferredReadReplica int32
	// RecordsSet contains the record data.
	RecordsSet []*Records

	Partial bool
	Records *Records // deprecated: use FetchResponseBlock.RecordsSet
}

type FilterResponse added in v1.16.0

type FilterResponse struct {
	Err          KError
	ErrMsg       *string
	MatchingAcls []*MatchingAcl
}

FilterResponse is a filter response type

type FindCoordinatorRequest added in v1.17.0

type FindCoordinatorRequest struct {
	Version         int16
	CoordinatorKey  string
	CoordinatorType CoordinatorType
}

type FindCoordinatorResponse added in v1.17.0

type FindCoordinatorResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Err          KError
	ErrMsg       *string
	Coordinator  *Broker
}

type GSSAPIConfig added in v1.40.0

type GSSAPIConfig struct {
	AuthType           int
	KeyTabPath         string
	CCachePath         string
	KerberosConfigPath string
	ServiceName        string
	Username           string
	Password           string
	Realm              string
	DisablePAFXFAST    bool
	BuildSpn           BuildSpnFunc
}

type GSSAPIKerberosAuth added in v1.40.0

type GSSAPIKerberosAuth struct {
	Config *GSSAPIConfig

	NewKerberosClientFunc func(config *GSSAPIConfig) (KerberosClient, error)
	// contains filtered or unexported fields
}

func (*GSSAPIKerberosAuth) Authorize added in v1.40.0

func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error

Authorize performs the handshake for authorization.

type GSSApiHandlerFunc added in v1.40.0

type GSSApiHandlerFunc func([]byte) []byte

type GroupData added in v1.41.0

type GroupData struct {
	GroupState string // version 4 or later
}

type GroupDescription added in v1.7.0

type GroupDescription struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// Err contains the describe error as the KError type.
	Err KError
	// ErrorCode contains the describe error, or 0 if there was no error.
	ErrorCode int16
	// GroupId contains the group ID string.
	GroupId string
	// State contains the group state string, or the empty string.
	State string
	// ProtocolType contains the group protocol type, or the empty string.
	ProtocolType string
	// Protocol contains the group protocol data, or the empty string.
	Protocol string
	// Members contains the group members.
	Members map[string]*GroupMemberDescription
	// AuthorizedOperations contains a 32-bit bitfield to represent authorized
	// operations for this group.
	AuthorizedOperations int32
}

GroupDescription contains each described group.

type GroupMember added in v1.40.0

type GroupMember struct {
	// MemberId contains the group member ID.
	MemberId string
	// GroupInstanceId contains the unique identifier of the consumer instance
	// provided by end user.
	GroupInstanceId *string
	// Metadata contains the group member metadata.
	Metadata []byte
}

type GroupMemberDescription added in v1.7.0

type GroupMemberDescription struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// MemberId contains the member ID assigned by the group coordinator.
	MemberId string
	// GroupInstanceId contains the unique identifier of the consumer instance
	// provided by end user.
	GroupInstanceId *string
	// ClientId contains the client ID used in the member's latest join group
	// request.
	ClientId string
	// ClientHost contains the client host.
	ClientHost string
	// MemberMetadata contains the metadata corresponding to the current group
	// protocol in use.
	MemberMetadata []byte
	// MemberAssignment contains the current assignment provided by the group
	// leader.
	MemberAssignment []byte
}

GroupMemberDescription contains the group members.

func (*GroupMemberDescription) GetMemberAssignment added in v1.11.0

func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error)

func (*GroupMemberDescription) GetMemberMetadata added in v1.11.0

func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error)

type GroupProtocol added in v1.12.0

type GroupProtocol struct {
	// Name contains the protocol name.
	Name string
	// Metadata contains the protocol metadata.
	Metadata []byte
}

type HashPartitionerOption added in v1.18.0

type HashPartitionerOption func(*hashPartitioner)

HashPartitionerOption lets you modify default values of the partitioner

func WithAbsFirst added in v1.18.0

func WithAbsFirst() HashPartitionerOption

WithAbsFirst means that the partitioner handles absolute values in the same way as the reference Java implementation

func WithCustomFallbackPartitioner added in v1.18.0

func WithCustomFallbackPartitioner(randomHP Partitioner) HashPartitionerOption

WithCustomFallbackPartitioner lets you specify which Partitioner the HashPartitioner should fall back to when the message key is empty.

func WithCustomHashFunction added in v1.18.0

func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption

WithCustomHashFunction lets you specify what hash function to use for the partitioning

func WithHashUnsigned added in v1.41.0

func WithHashUnsigned() HashPartitionerOption

WithHashUnsigned means the partitioner treats the hashed value as unsigned when partitioning. This is intended to be combined with the crc32 hash algorithm to be compatible with librdkafka's implementation
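The options above differ in how a 32-bit hash is reduced to a partition index. The sketch below shows three plausible reductions side by side; the function names are hypothetical, and the exact arithmetic Sarama uses for each option is an assumption here, not something this page states.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// signedThenAbs treats the hash as signed, takes the remainder, then drops
// the sign. (Assumed to resemble the default behavior.)
func signedThenAbs(h uint32, numPartitions int32) int32 {
	p := int32(h) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

// absFirst clears the sign bit before taking the remainder. (Assumed to
// resemble what WithAbsFirst selects.)
func absFirst(h uint32, numPartitions int32) int32 {
	return (int32(h) & 0x7fffffff) % numPartitions
}

// unsignedMod keeps the hash unsigned throughout. (Assumed to resemble what
// WithHashUnsigned selects.)
func unsignedMod(h uint32, numPartitions int32) int32 {
	return int32(h % uint32(numPartitions))
}

func main() {
	h := crc32.ChecksumIEEE([]byte("user-42"))
	const n = 12
	fmt.Println(signedThenAbs(h, n), absFirst(h, n), unsignedMod(h, n))
}
```

The three reductions agree whenever the hash fits in 31 bits and can disagree otherwise, which is why producers written in different languages need matching settings to land the same key on the same partition.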

type HeartbeatRequest added in v1.7.0

type HeartbeatRequest struct {
	Version         int16
	GroupId         string
	GenerationId    int32
	MemberId        string
	GroupInstanceId *string
}

type HeartbeatResponse added in v1.7.0

type HeartbeatResponse struct {
	Version      int16
	ThrottleTime int32
	Err          KError
}

type IncrementalAlterConfigsEntry added in v1.40.0

type IncrementalAlterConfigsEntry struct {
	Operation IncrementalAlterConfigsOperation
	Value     *string
}

type IncrementalAlterConfigsOperation added in v1.40.0

type IncrementalAlterConfigsOperation int8
const (
	IncrementalAlterConfigsOperationSet IncrementalAlterConfigsOperation = iota
	IncrementalAlterConfigsOperationDelete
	IncrementalAlterConfigsOperationAppend
	IncrementalAlterConfigsOperationSubtract
)

type IncrementalAlterConfigsRequest added in v1.40.0

type IncrementalAlterConfigsRequest struct {
	Version      int16
	Resources    []*IncrementalAlterConfigsResource
	ValidateOnly bool
}

IncrementalAlterConfigsRequest is an incremental alter config request type

type IncrementalAlterConfigsResource added in v1.40.0

type IncrementalAlterConfigsResource struct {
	Type          ConfigResourceType
	Name          string
	ConfigEntries map[string]IncrementalAlterConfigsEntry
}

type IncrementalAlterConfigsResponse added in v1.40.0

type IncrementalAlterConfigsResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Resources    []*AlterConfigsResourceResponse
}

IncrementalAlterConfigsResponse is a response type for incremental alter config

type InitProducerIDRequest added in v1.16.0

type InitProducerIDRequest struct {
	Version            int16
	TransactionalID    *string
	TransactionTimeout time.Duration
	ProducerID         int64
	ProducerEpoch      int16
}

type InitProducerIDResponse added in v1.16.0

type InitProducerIDResponse struct {
	ThrottleTime  time.Duration
	Err           KError
	Version       int16
	ProducerID    int64
	ProducerEpoch int16
}

type IsolationLevel added in v1.14.0

type IsolationLevel int8
const (
	ReadUncommitted IsolationLevel = iota
	ReadCommitted
)

type JoinGroupRequest added in v1.7.0

type JoinGroupRequest struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// GroupId contains the group identifier.
	GroupId string
	// SessionTimeout specifies that the coordinator should consider the consumer
	// dead if it receives no heartbeat after this timeout in milliseconds.
	SessionTimeout int32
	// RebalanceTimeout contains the maximum time in milliseconds that the
	// coordinator will wait for each member to rejoin when rebalancing the
	// group.
	RebalanceTimeout int32
	// MemberId contains the member id assigned by the group coordinator.
	MemberId string
	// GroupInstanceId contains the unique identifier of the consumer instance
	// provided by end user.
	GroupInstanceId *string
	// ProtocolType contains the unique name for the class of protocols
	// implemented by the group we want to join.
	ProtocolType string
	// GroupProtocols contains the list of protocols that the member supports.
	// deprecated; use OrderedGroupProtocols
	GroupProtocols map[string][]byte
	// OrderedGroupProtocols contains an ordered list of protocols that the member
	// supports.
	OrderedGroupProtocols []*GroupProtocol
}

func (*JoinGroupRequest) AddGroupProtocol added in v1.7.0

func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte)

func (*JoinGroupRequest) AddGroupProtocolMetadata added in v1.8.0

func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error

type JoinGroupResponse added in v1.7.0

type JoinGroupResponse struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// ThrottleTime contains the duration for which the request was throttled due
	// to a quota violation, or zero if the request did not violate any quota.
	ThrottleTime int32
	// Err contains the error code, or 0 if there was no error.
	Err KError
	// GenerationId contains the generation ID of the group.
	GenerationId int32
	// GroupProtocol contains the group protocol selected by the coordinator.
	GroupProtocol string
	// LeaderId contains the leader of the group.
	LeaderId string
	// MemberId contains the member ID assigned by the group coordinator.
	MemberId string
	// Members contains the per-group-member information.
	Members []GroupMember
}

func (*JoinGroupResponse) GetMembers added in v1.8.0

type KError

type KError int16

KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes

const (
	ErrUnknown                            KError = -1 // Errors.UNKNOWN_SERVER_ERROR
	ErrNoError                            KError = 0  // Errors.NONE
	ErrOffsetOutOfRange                   KError = 1  // Errors.OFFSET_OUT_OF_RANGE
	ErrInvalidMessage                     KError = 2  // Errors.CORRUPT_MESSAGE
	ErrUnknownTopicOrPartition            KError = 3  // Errors.UNKNOWN_TOPIC_OR_PARTITION
	ErrInvalidMessageSize                 KError = 4  // Errors.INVALID_FETCH_SIZE
	ErrLeaderNotAvailable                 KError = 5  // Errors.LEADER_NOT_AVAILABLE
	ErrNotLeaderForPartition              KError = 6  // Errors.NOT_LEADER_OR_FOLLOWER
	ErrRequestTimedOut                    KError = 7  // Errors.REQUEST_TIMED_OUT
	ErrBrokerNotAvailable                 KError = 8  // Errors.BROKER_NOT_AVAILABLE
	ErrReplicaNotAvailable                KError = 9  // Errors.REPLICA_NOT_AVAILABLE
	ErrMessageSizeTooLarge                KError = 10 // Errors.MESSAGE_TOO_LARGE
	ErrStaleControllerEpochCode           KError = 11 // Errors.STALE_CONTROLLER_EPOCH
	ErrOffsetMetadataTooLarge             KError = 12 // Errors.OFFSET_METADATA_TOO_LARGE
	ErrNetworkException                   KError = 13 // Errors.NETWORK_EXCEPTION
	ErrOffsetsLoadInProgress              KError = 14 // Errors.COORDINATOR_LOAD_IN_PROGRESS
	ErrConsumerCoordinatorNotAvailable    KError = 15 // Errors.COORDINATOR_NOT_AVAILABLE
	ErrNotCoordinatorForConsumer          KError = 16 // Errors.NOT_COORDINATOR
	ErrInvalidTopic                       KError = 17 // Errors.INVALID_TOPIC_EXCEPTION
	ErrMessageSetSizeTooLarge             KError = 18 // Errors.RECORD_LIST_TOO_LARGE
	ErrNotEnoughReplicas                  KError = 19 // Errors.NOT_ENOUGH_REPLICAS
	ErrNotEnoughReplicasAfterAppend       KError = 20 // Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND
	ErrInvalidRequiredAcks                KError = 21 // Errors.INVALID_REQUIRED_ACKS
	ErrIllegalGeneration                  KError = 22 // Errors.ILLEGAL_GENERATION
	ErrInconsistentGroupProtocol          KError = 23 // Errors.INCONSISTENT_GROUP_PROTOCOL
	ErrInvalidGroupId                     KError = 24 // Errors.INVALID_GROUP_ID
	ErrUnknownMemberId                    KError = 25 // Errors.UNKNOWN_MEMBER_ID
	ErrInvalidSessionTimeout              KError = 26 // Errors.INVALID_SESSION_TIMEOUT
	ErrRebalanceInProgress                KError = 27 // Errors.REBALANCE_IN_PROGRESS
	ErrInvalidCommitOffsetSize            KError = 28 // Errors.INVALID_COMMIT_OFFSET_SIZE
	ErrTopicAuthorizationFailed           KError = 29 // Errors.TOPIC_AUTHORIZATION_FAILED
	ErrGroupAuthorizationFailed           KError = 30 // Errors.GROUP_AUTHORIZATION_FAILED
	ErrClusterAuthorizationFailed         KError = 31 // Errors.CLUSTER_AUTHORIZATION_FAILED
	ErrInvalidTimestamp                   KError = 32 // Errors.INVALID_TIMESTAMP
	ErrUnsupportedSASLMechanism           KError = 33 // Errors.UNSUPPORTED_SASL_MECHANISM
	ErrIllegalSASLState                   KError = 34 // Errors.ILLEGAL_SASL_STATE
	ErrUnsupportedVersion                 KError = 35 // Errors.UNSUPPORTED_VERSION
	ErrTopicAlreadyExists                 KError = 36 // Errors.TOPIC_ALREADY_EXISTS
	ErrInvalidPartitions                  KError = 37 // Errors.INVALID_PARTITIONS
	ErrInvalidReplicationFactor           KError = 38 // Errors.INVALID_REPLICATION_FACTOR
	ErrInvalidReplicaAssignment           KError = 39 // Errors.INVALID_REPLICA_ASSIGNMENT
	ErrInvalidConfig                      KError = 40 // Errors.INVALID_CONFIG
	ErrNotController                      KError = 41 // Errors.NOT_CONTROLLER
	ErrInvalidRequest                     KError = 42 // Errors.INVALID_REQUEST
	ErrUnsupportedForMessageFormat        KError = 43 // Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT
	ErrPolicyViolation                    KError = 44 // Errors.POLICY_VIOLATION
	ErrOutOfOrderSequenceNumber           KError = 45 // Errors.OUT_OF_ORDER_SEQUENCE_NUMBER
	ErrDuplicateSequenceNumber            KError = 46 // Errors.DUPLICATE_SEQUENCE_NUMBER
	ErrInvalidProducerEpoch               KError = 47 // Errors.INVALID_PRODUCER_EPOCH
	ErrInvalidTxnState                    KError = 48 // Errors.INVALID_TXN_STATE
	ErrInvalidProducerIDMapping           KError = 49 // Errors.INVALID_PRODUCER_ID_MAPPING
	ErrInvalidTransactionTimeout          KError = 50 // Errors.INVALID_TRANSACTION_TIMEOUT
	ErrConcurrentTransactions             KError = 51 // Errors.CONCURRENT_TRANSACTIONS
	ErrTransactionCoordinatorFenced       KError = 52 // Errors.TRANSACTION_COORDINATOR_FENCED
	ErrTransactionalIDAuthorizationFailed KError = 53 // Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
	ErrSecurityDisabled                   KError = 54 // Errors.SECURITY_DISABLED
	ErrOperationNotAttempted              KError = 55 // Errors.OPERATION_NOT_ATTEMPTED
	ErrKafkaStorageError                  KError = 56 // Errors.KAFKA_STORAGE_ERROR
	ErrLogDirNotFound                     KError = 57 // Errors.LOG_DIR_NOT_FOUND
	ErrSASLAuthenticationFailed           KError = 58 // Errors.SASL_AUTHENTICATION_FAILED
	ErrUnknownProducerID                  KError = 59 // Errors.UNKNOWN_PRODUCER_ID
	ErrReassignmentInProgress             KError = 60 // Errors.REASSIGNMENT_IN_PROGRESS
	ErrDelegationTokenAuthDisabled        KError = 61 // Errors.DELEGATION_TOKEN_AUTH_DISABLED
	ErrDelegationTokenNotFound            KError = 62 // Errors.DELEGATION_TOKEN_NOT_FOUND
	ErrDelegationTokenOwnerMismatch       KError = 63 // Errors.DELEGATION_TOKEN_OWNER_MISMATCH
	ErrDelegationTokenRequestNotAllowed   KError = 64 // Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED
	ErrDelegationTokenAuthorizationFailed KError = 65 // Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED
	ErrDelegationTokenExpired             KError = 66 // Errors.DELEGATION_TOKEN_EXPIRED
	ErrInvalidPrincipalType               KError = 67 // Errors.INVALID_PRINCIPAL_TYPE
	ErrNonEmptyGroup                      KError = 68 // Errors.NON_EMPTY_GROUP
	ErrGroupIDNotFound                    KError = 69 // Errors.GROUP_ID_NOT_FOUND
	ErrFetchSessionIDNotFound             KError = 70 // Errors.FETCH_SESSION_ID_NOT_FOUND
	ErrInvalidFetchSessionEpoch           KError = 71 // Errors.INVALID_FETCH_SESSION_EPOCH
	ErrListenerNotFound                   KError = 72 // Errors.LISTENER_NOT_FOUND
	ErrTopicDeletionDisabled              KError = 73 // Errors.TOPIC_DELETION_DISABLED
	ErrFencedLeaderEpoch                  KError = 74 // Errors.FENCED_LEADER_EPOCH
	ErrUnknownLeaderEpoch                 KError = 75 // Errors.UNKNOWN_LEADER_EPOCH
	ErrUnsupportedCompressionType         KError = 76 // Errors.UNSUPPORTED_COMPRESSION_TYPE
	ErrStaleBrokerEpoch                   KError = 77 // Errors.STALE_BROKER_EPOCH
	ErrOffsetNotAvailable                 KError = 78 // Errors.OFFSET_NOT_AVAILABLE
	ErrMemberIdRequired                   KError = 79 // Errors.MEMBER_ID_REQUIRED
	ErrPreferredLeaderNotAvailable        KError = 80 // Errors.PREFERRED_LEADER_NOT_AVAILABLE
	ErrGroupMaxSizeReached                KError = 81 // Errors.GROUP_MAX_SIZE_REACHED
	ErrFencedInstancedId                  KError = 82 // Errors.FENCED_INSTANCE_ID
	ErrEligibleLeadersNotAvailable        KError = 83 // Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE
	ErrElectionNotNeeded                  KError = 84 // Errors.ELECTION_NOT_NEEDED
	ErrNoReassignmentInProgress           KError = 85 // Errors.NO_REASSIGNMENT_IN_PROGRESS
	ErrGroupSubscribedToTopic             KError = 86 // Errors.GROUP_SUBSCRIBED_TO_TOPIC
	ErrInvalidRecord                      KError = 87 // Errors.INVALID_RECORD
	ErrUnstableOffsetCommit               KError = 88 // Errors.UNSTABLE_OFFSET_COMMIT
	ErrThrottlingQuotaExceeded            KError = 89 // Errors.THROTTLING_QUOTA_EXCEEDED
	ErrProducerFenced                     KError = 90 // Errors.PRODUCER_FENCED
)

Numeric error codes returned by the Kafka server.

func (KError) Error

func (err KError) Error() string
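Because KError implements the error interface, it composes with the standard errors package. The sketch below uses a local stand-in for KError (with a made-up message format) and a hypothetical isRetriable helper; the choice of which codes count as retriable is an illustrative policy, not one prescribed by Sarama.

```go
package main

import (
	"errors"
	"fmt"
)

// KError is a local stand-in mirroring the documented type so the sketch
// runs without importing sarama.
type KError int16

const (
	ErrUnknownTopicOrPartition KError = 3
	ErrLeaderNotAvailable      KError = 5
	ErrNotLeaderForPartition   KError = 6
)

// Error satisfies the error interface. The message text is invented for
// this sketch and differs from the library's.
func (err KError) Error() string {
	return fmt.Sprintf("kafka server: error code %d", int16(err))
}

// wrap adds context while keeping the KError reachable via errors.Is/As.
func wrap(op string, err error) error {
	return fmt.Errorf("%s: %w", op, err)
}

// isRetriable (hypothetical) reports whether err wraps a broker error that
// this example chooses to treat as transient.
func isRetriable(err error) bool {
	var kerr KError
	if !errors.As(err, &kerr) {
		return false
	}
	switch kerr {
	case ErrLeaderNotAvailable, ErrNotLeaderForPartition:
		return true
	}
	return false
}

func main() {
	err := wrap("produce", ErrNotLeaderForPartition)
	fmt.Println(errors.Is(err, ErrNotLeaderForPartition), isRetriable(err))
	fmt.Println(isRetriable(wrap("produce", ErrUnknownTopicOrPartition)))
}
```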

type KafkaGSSAPIHandler added in v1.40.0

type KafkaGSSAPIHandler struct {
	// contains filtered or unexported fields
}

func (*KafkaGSSAPIHandler) MockKafkaGSSAPI added in v1.40.0

func (h *KafkaGSSAPIHandler) MockKafkaGSSAPI(buffer []byte) []byte

type KafkaVersion added in v1.10.0

type KafkaVersion struct {
	// contains filtered or unexported fields
}

KafkaVersion instances represent versions of the upstream Kafka broker.

func ParseKafkaVersion added in v1.15.0

func ParseKafkaVersion(s string) (KafkaVersion, error)

ParseKafkaVersion parses a string and returns the corresponding KafkaVersion, or an error if the string is not a valid version.

func (KafkaVersion) IsAtLeast added in v1.10.0

func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool

IsAtLeast returns true if and only if the version it is called on is greater than or equal to the version passed in:

V1.IsAtLeast(V2) // false
V2.IsAtLeast(V1) // true
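The comparison semantics can be sketched with a small stand-in type. KafkaVersion's fields are unexported, so the four-component array and the isAtLeast method below are assumptions made for illustration rather than the library's actual representation.

```go
package main

import "fmt"

// version is a hypothetical stand-in for KafkaVersion; it holds the
// components of a version number, most significant first.
type version [4]uint

// isAtLeast compares component by component and stops at the first
// difference; equal versions compare as "at least".
func (v version) isAtLeast(other version) bool {
	for i := range v {
		if v[i] > other[i] {
			return true
		}
		if v[i] < other[i] {
			return false
		}
	}
	return true // all components are equal
}

func main() {
	v1 := version{1, 0, 0, 0}
	v2 := version{2, 0, 0, 0}
	fmt.Println(v1.isAtLeast(v2))
	fmt.Println(v2.isAtLeast(v1))
	fmt.Println(v2.isAtLeast(v2))
}
```

A check of this kind is typically used to gate protocol features on the configured broker version.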

func (KafkaVersion) String added in v1.15.0

func (v KafkaVersion) String() string

type KerberosClient added in v1.40.0

type KerberosClient interface {
	Login() error
	GetServiceTicket(spn string) (messages.Ticket, types.EncryptionKey, error)
	Domain() string
	CName() types.PrincipalName
	Destroy()
}

func NewKerberosClient added in v1.40.0

func NewKerberosClient(config *GSSAPIConfig) (KerberosClient, error)

NewKerberosClient creates a Kerberos client used to obtain TGT and TGS tokens. It is a pure Go Kerberos 5 solution (RFC 4121 and RFC 4120) built on the gokrb5 library, a pure Go Kerberos client with some GSS-API capabilities.

type KerberosGoKrb5Client added in v1.40.0

type KerberosGoKrb5Client struct {
	krb5client.Client
}

func (*KerberosGoKrb5Client) CName added in v1.40.0

func (*KerberosGoKrb5Client) Domain added in v1.40.0

func (c *KerberosGoKrb5Client) Domain() string

type LeaveGroupRequest added in v1.7.0

type LeaveGroupRequest struct {
	Version  int16
	GroupId  string
	MemberId string           // Removed in Version 3
	Members  []MemberIdentity // Added in Version 3
}

type LeaveGroupResponse added in v1.7.0

type LeaveGroupResponse struct {
	Version      int16
	ThrottleTime int32
	Err          KError
	Members      []MemberResponse
}

type ListGroupsRequest added in v1.7.0

type ListGroupsRequest struct {
	Version      int16
	StatesFilter []string // version 4 or later
}

type ListGroupsResponse added in v1.7.0

type ListGroupsResponse struct {
	Version      int16
	ThrottleTime int32
	Err          KError
	Groups       map[string]string
	GroupsData   map[string]GroupData // version 4 or later
}

type ListPartitionReassignmentsRequest added in v1.40.0

type ListPartitionReassignmentsRequest struct {
	TimeoutMs int32

	Version int16
	// contains filtered or unexported fields
}

func (*ListPartitionReassignmentsRequest) AddBlock added in v1.40.0

func (r *ListPartitionReassignmentsRequest) AddBlock(topic string, partitionIDs []int32)

type ListPartitionReassignmentsResponse added in v1.40.0

type ListPartitionReassignmentsResponse struct {
	Version        int16
	ThrottleTimeMs int32
	ErrorCode      KError
	ErrorMessage   *string
	TopicStatus    map[string]map[int32]*PartitionReplicaReassignmentsStatus
}

func (*ListPartitionReassignmentsResponse) AddBlock added in v1.40.0

func (r *ListPartitionReassignmentsResponse) AddBlock(topic string, partition int32, replicas, addingReplicas, removingReplicas []int32)

type MatchingAcl added in v1.16.0

type MatchingAcl struct {
	Err    KError
	ErrMsg *string
	Resource
	Acl
}

MatchingAcl is a matching acl type

type MemberIdentity added in v1.40.0

type MemberIdentity struct {
	MemberId        string
	GroupInstanceId *string
}

type MemberResponse added in v1.40.0

type MemberResponse struct {
	MemberId        string
	GroupInstanceId *string
	Err             KError
}

type Message

type Message struct {
	Codec            CompressionCodec // codec used to compress the message contents
	CompressionLevel int              // compression level
	LogAppendTime    bool             // the used timestamp is LogAppendTime
	Key              []byte           // the message key, may be nil
	Value            []byte           // the message contents
	Set              *MessageSet      // the message set a message might wrap
	Version          int8             // v1 requires Kafka 0.10
	Timestamp        time.Time        // the timestamp of the message (version 1+ only)
	// contains filtered or unexported fields
}

Message is a kafka message type

type MessageBlock

type MessageBlock struct {
	Offset int64
	Msg    *Message
}

func (*MessageBlock) Messages

func (msb *MessageBlock) Messages() []*MessageBlock

Messages is a convenience helper that returns either all the messages wrapped in this block or, if the block wraps no message set, the block itself.

type MessageSet

type MessageSet struct {
	PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
	OverflowMessage        bool // whether the set on the wire contained an overflow message
	Messages               []*MessageBlock
}

type MetadataRequest

type MetadataRequest struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// Topics contains the topics to fetch metadata for.
	Topics []string
	// AllowAutoTopicCreation controls topic auto-creation: if true, the broker may auto-create requested topics that do not already exist, if it is configured to do so.
	AllowAutoTopicCreation             bool
	IncludeClusterAuthorizedOperations bool // version 8 and up
	IncludeTopicAuthorizedOperations   bool // version 8 and up
}

func NewMetadataRequest added in v1.40.0

func NewMetadataRequest(version KafkaVersion, topics []string) *MetadataRequest

type MetadataResponse

type MetadataResponse struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// ThrottleTimeMs contains the duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
	ThrottleTimeMs int32
	// Brokers contains each broker in the response.
	Brokers []*Broker
	// ClusterID contains the cluster ID that responding broker belongs to.
	ClusterID *string
	// ControllerID contains the ID of the controller broker.
	ControllerID int32
	// Topics contains each topic in the response.
	Topics                      []*TopicMetadata
	ClusterAuthorizedOperations int32 // Only valid for Version >= 8
}

func (*MetadataResponse) AddBroker

func (r *MetadataResponse) AddBroker(addr string, id int32)

func (*MetadataResponse) AddTopic added in v1.1.0

func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata

func (*MetadataResponse) AddTopicPartition

func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, offline []int32, err KError)

type MockAlterConfigsResponse added in v1.18.0

type MockAlterConfigsResponse struct {
	// contains filtered or unexported fields
}

func NewMockAlterConfigsResponse added in v1.18.0

func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse

func (*MockAlterConfigsResponse) For added in v1.18.0

func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader

type MockAlterConfigsResponseWithErrorCode added in v1.40.0

type MockAlterConfigsResponseWithErrorCode struct {
	// contains filtered or unexported fields
}

func NewMockAlterConfigsResponseWithErrorCode added in v1.40.0

func NewMockAlterConfigsResponseWithErrorCode(t TestReporter) *MockAlterConfigsResponseWithErrorCode

func (*MockAlterConfigsResponseWithErrorCode) For added in v1.40.0

func (mr *MockAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader

type MockAlterPartitionReassignmentsResponse added in v1.40.0

type MockAlterPartitionReassignmentsResponse struct {
	// contains filtered or unexported fields
}

func NewMockAlterPartitionReassignmentsResponse added in v1.40.0

func NewMockAlterPartitionReassignmentsResponse(t TestReporter) *MockAlterPartitionReassignmentsResponse

func (*MockAlterPartitionReassignmentsResponse) For added in v1.40.0

func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader

type MockApiVersionsResponse added in v1.40.0

type MockApiVersionsResponse struct {
	// contains filtered or unexported fields
}

func NewMockApiVersionsResponse added in v1.40.0

func NewMockApiVersionsResponse(t TestReporter) *MockApiVersionsResponse

func (*MockApiVersionsResponse) For added in v1.40.0

func (m *MockApiVersionsResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockApiVersionsResponse) SetApiKeys added in v1.40.0

type MockBroker added in v1.9.0

type MockBroker struct {
	// contains filtered or unexported fields
}

MockBroker is a mock Kafka broker that is used in unit tests. It is exposed to facilitate testing of higher level or specialized consumers and producers built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol, but rather provides a facility to do that. It takes care of the TCP transport, request unmarshalling, and response marshalling, and leaves it to the test writer to program MockBroker behavior that is correct according to the Kafka API protocol.

MockBroker is implemented as a TCP server listening on a kernel-selected localhost port that can accept many connections. It reads Kafka requests from each connection and returns responses programmed by the SetHandlerByMap function. If a MockBroker receives a request that it has no programmed response for, then it returns nothing and the request times out.

Sarama provides a set of MockResponse builders to define the mappings used by MockBroker, but users can develop MockResponses of their own and use them along with or instead of the standard ones.

When running tests with MockBroker it is strongly recommended to specify a timeout to `go test` so that if the broker hangs waiting for a response, the test panics.

It is not necessary to prefix the message length or correlation ID to your response bytes; the server does that automatically as a convenience.
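The framing that the broker adds for you can be pictured with a small sketch. It assumes the simplest Kafka response layout (a 4-byte big-endian size, then a 4-byte correlation ID, then the body); `frameResponse` is a hypothetical helper written for illustration and is not part of Sarama.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// frameResponse is a hypothetical helper (not a Sarama function) that
// prepends the size and correlation ID to an already-encoded response body.
func frameResponse(correlationID int32, body []byte) []byte {
	buf := make([]byte, 8+len(body))
	// The size field counts everything after itself: correlation ID + body.
	binary.BigEndian.PutUint32(buf[0:4], uint32(4+len(body)))
	binary.BigEndian.PutUint32(buf[4:8], uint32(correlationID))
	copy(buf[8:], body)
	return buf
}

func main() {
	framed := frameResponse(7, []byte{0xCA, 0xFE})
	fmt.Printf("% x\n", framed) // prints "00 00 00 06 00 00 00 07 ca fe"
}
```

With MockBroker, a test only supplies the equivalent of `body`; the two leading fields are filled in on the way out.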

func NewMockBroker added in v1.9.0

func NewMockBroker(t TestReporter, brokerID int32) *MockBroker

NewMockBroker launches a fake Kafka broker. It takes a TestReporter as provided by the test framework and the broker ID to assign to the broker. If an error occurs it is simply logged to the TestReporter and the broker exits.

func NewMockBrokerAddr added in v1.9.0

func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker

NewMockBrokerAddr behaves like NewMockBroker but listens on the address you give it rather than on an ephemeral port.

func NewMockBrokerListener added in v1.16.0

func NewMockBrokerListener(t TestReporter, brokerID int32, listener net.Listener) *MockBroker

NewMockBrokerListener behaves like NewMockBrokerAddr but accepts connections on the specified listener.

func (*MockBroker) Addr added in v1.9.0

func (b *MockBroker) Addr() string

Addr returns the broker connection string in the form "<address>:<port>".
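A string in this form can be split back into host and port with the standard library. The sketch below is illustrative only: `splitAddr` is a hypothetical helper, and the address literal is a made-up value rather than something a real MockBroker returned.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// splitAddr is a hypothetical helper that splits an "<address>:<port>"
// string, such as the one returned by Addr, into its two parts.
func splitAddr(addr string) (string, int32, error) {
	host, portStr, err := net.SplitHostPort(addr)
	if err != nil {
		return "", 0, err
	}
	port, err := strconv.ParseInt(portStr, 10, 32)
	if err != nil {
		return "", 0, err
	}
	return host, int32(port), nil
}

func main() {
	// Example value only; a real broker picks its own port.
	host, port, err := splitAddr("127.0.0.1:9092")
	if err != nil {
		fmt.Println("invalid address:", err)
		return
	}
	fmt.Println(host, port) // prints "127.0.0.1 9092"
}
```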

func (*MockBroker) BrokerID added in v1.9.0

func (b *MockBroker) BrokerID() int32

BrokerID returns the broker ID assigned to the broker.

func (*MockBroker) Close added in v1.9.0

func (b *MockBroker) Close()

Close terminates the broker, blocking until it stops its internal goroutines and releases all resources.

func (*MockBroker) History added in v1.9.0

func (b *MockBroker) History() []RequestResponse

History returns a slice of RequestResponse pairs in the order they were processed by the broker. Note that in case of multiple connections to the broker the order expected by a test can be different from the order recorded in the history, unless some synchronization is implemented in the test.

func (*MockBroker) Port added in v1.9.0

func (b *MockBroker) Port() int32

Port returns the TCP port number the broker is listening for requests on.

func (*MockBroker) Returns added in v1.9.0

func (b *MockBroker) Returns(e encoderWithHeader)

func (*MockBroker) SetGSSAPIHandler added in v1.40.0

func (b *MockBroker) SetGSSAPIHandler(handler GSSApiHandlerFunc)

func (*MockBroker) SetHandlerByMap added in v1.9.0

func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse)

SetHandlerByMap defines a mapping of request types to MockResponses. When a request is received by the broker, it looks up the request type in the map and uses the found MockResponse instance to generate an appropriate reply. If the request type is not found in the map then nothing is sent.
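The lookup described here can be modelled in a few lines. This is a simplified sketch, not Sarama's implementation: the request structs and the `handler` and `dispatch` names are hypothetical stand-ins, and it assumes the map is keyed by the request's Go type name (for example "MetadataRequest").

```go
package main

import (
	"fmt"
	"reflect"
)

// Hypothetical stand-ins for decoded request bodies.
type MetadataRequest struct{}
type ProduceRequest struct{}

// handler is a hypothetical stand-in for a MockResponse:
// given a request body, it produces a reply.
type handler func(req interface{}) string

// dispatch looks up a handler by the type name of req, which must be a
// pointer to a struct. The boolean is false when no handler is registered,
// which models the "nothing is sent" case.
func dispatch(handlers map[string]handler, req interface{}) (string, bool) {
	name := reflect.TypeOf(req).Elem().Name()
	h, ok := handlers[name]
	if !ok {
		return "", false
	}
	return h(req), true
}

func main() {
	handlers := map[string]handler{
		"MetadataRequest": func(req interface{}) string { return "metadata reply" },
	}

	reply, ok := dispatch(handlers, &MetadataRequest{})
	fmt.Println(reply, ok) // prints "metadata reply true"

	_, ok = dispatch(handlers, &ProduceRequest{})
	fmt.Println(ok) // prints "false"
}
```

The unmatched case is the one to watch for in tests: because nothing is sent back, the client under test waits until its own timeout expires.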

func (*MockBroker) SetHandlerFuncByMap added in v1.41.2

func (b *MockBroker) SetHandlerFuncByMap(handlerMap map[string]requestHandlerFunc)

SetHandlerFuncByMap defines a mapping of request types to RequestHandlerFunc. When a request is received by the broker, it looks up the request type in the map and invokes the found RequestHandlerFunc instance to generate an appropriate reply.

func (*MockBroker) SetLatency added in v1.9.0

func (b *MockBroker) SetLatency(latency time.Duration)

SetLatency makes the broker pause for the specified period every time before replying.

func (*MockBroker) SetNotifier added in v1.11.0

func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc)

SetNotifier sets a function that will be invoked whenever a request has been processed successfully; it is given the number of bytes read and written.

type MockConsumerMetadataResponse added in v1.9.0

type MockConsumerMetadataResponse struct {
	// contains filtered or unexported fields
}

MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.

func NewMockConsumerMetadataResponse added in v1.9.0

func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse

func (*MockConsumerMetadataResponse) For added in v1.9.0

func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockConsumerMetadataResponse) SetCoordinator added in v1.9.0

func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse

func (*MockConsumerMetadataResponse) SetError added in v1.9.0

type MockCreateAclsResponse added in v1.18.0

type MockCreateAclsResponse struct {
	// contains filtered or unexported fields
}

func NewMockCreateAclsResponse added in v1.18.0

func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse

func (*MockCreateAclsResponse) For added in v1.18.0

func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader

type MockCreateAclsResponseError added in v1.40.0

type MockCreateAclsResponseError struct {
	// contains filtered or unexported fields
}

func NewMockCreateAclsResponseWithError added in v1.40.0

func NewMockCreateAclsResponseWithError(t TestReporter) *MockCreateAclsResponseError

func (*MockCreateAclsResponseError) For added in v1.40.0

func (mr *MockCreateAclsResponseError) For(reqBody versionedDecoder) encoderWithHeader

type MockCreatePartitionsResponse added in v1.18.0

type MockCreatePartitionsResponse struct {
	// contains filtered or unexported fields
}

func NewMockCreatePartitionsResponse added in v1.18.0

func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse

func (*MockCreatePartitionsResponse) For added in v1.18.0

func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader

type MockCreateTopicsResponse added in v1.18.0

type MockCreateTopicsResponse struct {
	// contains filtered or unexported fields
}

func NewMockCreateTopicsResponse added in v1.18.0

func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse

func (*MockCreateTopicsResponse) For added in v1.18.0

func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader

type MockDeleteAclsResponse added in v1.18.0

type MockDeleteAclsResponse struct {
	// contains filtered or unexported fields
}

func NewMockDeleteAclsResponse added in v1.18.0

func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse

func (*MockDeleteAclsResponse) For added in v1.18.0

func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader

type MockDeleteGroupsResponse added in v1.40.0

type MockDeleteGroupsResponse struct {
	// contains filtered or unexported fields
}

func NewMockDeleteGroupsRequest added in v1.40.0

func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse

func (*MockDeleteGroupsResponse) For added in v1.40.0

func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockDeleteGroupsResponse) SetDeletedGroups added in v1.40.0

func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse

type MockDeleteOffsetResponse added in v1.40.0

type MockDeleteOffsetResponse struct {
	// contains filtered or unexported fields
}

func NewMockDeleteOffsetRequest added in v1.40.0

func NewMockDeleteOffsetRequest(t TestReporter) *MockDeleteOffsetResponse

func (*MockDeleteOffsetResponse) For added in v1.40.0

func (m *MockDeleteOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockDeleteOffsetResponse) SetDeletedOffset added in v1.40.0

func (m *MockDeleteOffsetResponse) SetDeletedOffset(errorCode KError, topic string, partition int32, errorPartition KError) *MockDeleteOffsetResponse

type MockDeleteRecordsResponse added in v1.18.0

type MockDeleteRecordsResponse struct {
	// contains filtered or unexported fields
}

func NewMockDeleteRecordsResponse added in v1.18.0

func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse

func (*MockDeleteRecordsResponse) For added in v1.18.0

func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader

type MockDeleteTopicsResponse added in v1.18.0

type MockDeleteTopicsResponse struct {
	// contains filtered or unexported fields
}

func NewMockDeleteTopicsResponse added in v1.18.0

func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse

func (*MockDeleteTopicsResponse) For added in v1.18.0

func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockDeleteTopicsResponse) SetError added in v1.41.0

type MockDescribeConfigsResponse added in v1.18.0

type MockDescribeConfigsResponse struct {
	// contains filtered or unexported fields
}

func NewMockDescribeConfigsResponse added in v1.18.0

func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse

func (*MockDescribeConfigsResponse) For added in v1.18.0

func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader

type MockDescribeConfigsResponseWithErrorCode added in v1.40.0

type MockDescribeConfigsResponseWithErrorCode struct {
	// contains filtered or unexported fields
}

func NewMockDescribeConfigsResponseWithErrorCode added in v1.40.0

func NewMockDescribeConfigsResponseWithErrorCode(t TestReporter) *MockDescribeConfigsResponseWithErrorCode

func (*MockDescribeConfigsResponseWithErrorCode) For added in v1.40.0

func (mr *MockDescribeConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader

type MockDescribeGroupsResponse added in v1.40.0

type MockDescribeGroupsResponse struct {
	// contains filtered or unexported fields
}

func NewMockDescribeGroupsResponse added in v1.40.0

func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse

func (*MockDescribeGroupsResponse) AddGroupDescription added in v1.40.0

func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse

func (*MockDescribeGroupsResponse) For added in v1.40.0

func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader

type MockDescribeLogDirsResponse added in v1.40.0

type MockDescribeLogDirsResponse struct {
	// contains filtered or unexported fields
}

func NewMockDescribeLogDirsResponse added in v1.40.0

func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse

func (*MockDescribeLogDirsResponse) For added in v1.40.0

func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockDescribeLogDirsResponse) SetLogDirs added in v1.40.0

func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse

type MockFetchResponse added in v1.9.0

type MockFetchResponse struct {
	// contains filtered or unexported fields
}

MockFetchResponse is a `FetchResponse` builder.

func NewMockFetchResponse added in v1.9.0

func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse

func (*MockFetchResponse) For added in v1.9.0

func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockFetchResponse) SetHighWaterMark added in v1.9.0

func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse

func (*MockFetchResponse) SetMessage added in v1.9.0

func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse

func (*MockFetchResponse) SetMessageWithKey added in v1.40.0

func (mfr *MockFetchResponse) SetMessageWithKey(topic string, partition int32, offset int64, key, msg Encoder) *MockFetchResponse

type MockFindCoordinatorResponse added in v1.17.0

type MockFindCoordinatorResponse struct {
	// contains filtered or unexported fields
}

MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.

func NewMockFindCoordinatorResponse added in v1.17.0

func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse

func (*MockFindCoordinatorResponse) For added in v1.17.0

func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockFindCoordinatorResponse) SetCoordinator added in v1.17.0

func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse

func (*MockFindCoordinatorResponse) SetError added in v1.17.0

func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse

type MockHeartbeatResponse added in v1.40.0

type MockHeartbeatResponse struct {
	Err KError
	// contains filtered or unexported fields
}

func NewMockHeartbeatResponse added in v1.40.0

func NewMockHeartbeatResponse(t TestReporter) *MockHeartbeatResponse

func (*MockHeartbeatResponse) For added in v1.40.0

func (m *MockHeartbeatResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockHeartbeatResponse) SetError added in v1.40.0

type MockIncrementalAlterConfigsResponse added in v1.40.0

type MockIncrementalAlterConfigsResponse struct {
	// contains filtered or unexported fields
}

func NewMockIncrementalAlterConfigsResponse added in v1.40.0

func NewMockIncrementalAlterConfigsResponse(t TestReporter) *MockIncrementalAlterConfigsResponse

func (*MockIncrementalAlterConfigsResponse) For added in v1.40.0

func (mr *MockIncrementalAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader

type MockIncrementalAlterConfigsResponseWithErrorCode added in v1.40.0

type MockIncrementalAlterConfigsResponseWithErrorCode struct {
	// contains filtered or unexported fields
}

func NewMockIncrementalAlterConfigsResponseWithErrorCode added in v1.40.0

func NewMockIncrementalAlterConfigsResponseWithErrorCode(t TestReporter) *MockIncrementalAlterConfigsResponseWithErrorCode

func (*MockIncrementalAlterConfigsResponseWithErrorCode) For added in v1.40.0

func (mr *MockIncrementalAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader

type MockInitProducerIDResponse added in v1.42.0

type MockInitProducerIDResponse struct {
	// contains filtered or unexported fields
}

MockInitProducerIDResponse is an `InitProducerIDResponse` builder.

func NewMockInitProducerIDResponse added in v1.42.0

func NewMockInitProducerIDResponse(t TestReporter) *MockInitProducerIDResponse

func (*MockInitProducerIDResponse) For added in v1.42.0

func (m *MockInitProducerIDResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockInitProducerIDResponse) SetError added in v1.42.0

func (*MockInitProducerIDResponse) SetProducerEpoch added in v1.42.0

func (m *MockInitProducerIDResponse) SetProducerEpoch(epoch int) *MockInitProducerIDResponse

func (*MockInitProducerIDResponse) SetProducerID added in v1.42.0

type MockJoinGroupResponse added in v1.40.0

type MockJoinGroupResponse struct {
	ThrottleTime  int32
	Err           KError
	GenerationId  int32
	GroupProtocol string
	LeaderId      string
	MemberId      string
	Members       []GroupMember
	// contains filtered or unexported fields
}

func NewMockJoinGroupResponse added in v1.40.0

func NewMockJoinGroupResponse(t TestReporter) *MockJoinGroupResponse

func (*MockJoinGroupResponse) For added in v1.40.0

func (m *MockJoinGroupResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockJoinGroupResponse) SetError added in v1.40.0

func (*MockJoinGroupResponse) SetGenerationId added in v1.40.0

func (m *MockJoinGroupResponse) SetGenerationId(id int32) *MockJoinGroupResponse

func (*MockJoinGroupResponse) SetGroupProtocol added in v1.40.0

func (m *MockJoinGroupResponse) SetGroupProtocol(proto string) *MockJoinGroupResponse

func (*MockJoinGroupResponse) SetLeaderId added in v1.40.0

func (*MockJoinGroupResponse) SetMember added in v1.40.0

func (*MockJoinGroupResponse) SetMemberId added in v1.40.0

func (*MockJoinGroupResponse) SetThrottleTime added in v1.40.0

func (m *MockJoinGroupResponse) SetThrottleTime(t int32) *MockJoinGroupResponse

type MockKerberosClient added in v1.40.0

type MockKerberosClient struct {
	ASRep messages.ASRep
	// contains filtered or unexported fields
}

func (*MockKerberosClient) CName added in v1.40.0

func (*MockKerberosClient) Destroy added in v1.40.0

func (c *MockKerberosClient) Destroy()

func (*MockKerberosClient) Domain added in v1.40.0

func (c *MockKerberosClient) Domain() string

func (*MockKerberosClient) GetServiceTicket added in v1.40.0

func (c *MockKerberosClient) GetServiceTicket(spn string) (messages.Ticket, types.EncryptionKey, error)

func (*MockKerberosClient) Login added in v1.40.0

func (c *MockKerberosClient) Login() error

type MockLeaveGroupResponse added in v1.40.0

type MockLeaveGroupResponse struct {
	Err KError
	// contains filtered or unexported fields
}

func NewMockLeaveGroupResponse added in v1.40.0

func NewMockLeaveGroupResponse(t TestReporter) *MockLeaveGroupResponse

func (*MockLeaveGroupResponse) For added in v1.40.0

func (m *MockLeaveGroupResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockLeaveGroupResponse) SetError added in v1.40.0

type MockListAclsResponse added in v1.18.0

type MockListAclsResponse struct {
	// contains filtered or unexported fields
}

func NewMockListAclsResponse added in v1.18.0

func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse

func (*MockListAclsResponse) For added in v1.18.0

func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader

type MockListGroupsResponse added in v1.40.0

type MockListGroupsResponse struct {
	// contains filtered or unexported fields
}

func NewMockListGroupsResponse added in v1.40.0

func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse

func (*MockListGroupsResponse) AddGroup added in v1.40.0

func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse

func (*MockListGroupsResponse) For added in v1.40.0

func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader

type MockListPartitionReassignmentsResponse added in v1.40.0

type MockListPartitionReassignmentsResponse struct {
	// contains filtered or unexported fields
}

func NewMockListPartitionReassignmentsResponse added in v1.40.0

func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitionReassignmentsResponse

func (*MockListPartitionReassignmentsResponse) For added in v1.40.0

func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader

type MockMetadataResponse added in v1.9.0

type MockMetadataResponse struct {
	// contains filtered or unexported fields
}

MockMetadataResponse is a `MetadataResponse` builder.

func NewMockMetadataResponse added in v1.9.0

func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse

func (*MockMetadataResponse) For added in v1.9.0

func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockMetadataResponse) SetBroker added in v1.9.0

func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse

func (*MockMetadataResponse) SetController added in v1.17.0

func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse

func (*MockMetadataResponse) SetError added in v1.41.0

func (mmr *MockMetadataResponse) SetError(topic string, kerror KError) *MockMetadataResponse

func (*MockMetadataResponse) SetLeader added in v1.9.0

func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse

type MockOffsetCommitResponse added in v1.9.0

type MockOffsetCommitResponse struct {
	// contains filtered or unexported fields
}

MockOffsetCommitResponse is an `OffsetCommitResponse` builder.

func NewMockOffsetCommitResponse added in v1.9.0

func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse

func (*MockOffsetCommitResponse) For added in v1.9.0

func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockOffsetCommitResponse) SetError added in v1.9.0

func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse

type MockOffsetFetchResponse added in v1.9.0

type MockOffsetFetchResponse struct {
	// contains filtered or unexported fields
}

MockOffsetFetchResponse is an `OffsetFetchResponse` builder.

func NewMockOffsetFetchResponse added in v1.9.0

func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse

func (*MockOffsetFetchResponse) For added in v1.9.0

func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockOffsetFetchResponse) SetError added in v1.40.0

func (*MockOffsetFetchResponse) SetOffset added in v1.9.0

func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse

type MockOffsetResponse added in v1.9.0

type MockOffsetResponse struct {
	// contains filtered or unexported fields
}

MockOffsetResponse is an `OffsetResponse` builder.

func NewMockOffsetResponse added in v1.9.0

func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse

func (*MockOffsetResponse) For added in v1.9.0

func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockOffsetResponse) SetOffset added in v1.9.0

func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse

type MockProduceResponse added in v1.9.0

type MockProduceResponse struct {
	// contains filtered or unexported fields
}

MockProduceResponse is a `ProduceResponse` builder.

func NewMockProduceResponse added in v1.9.0

func NewMockProduceResponse(t TestReporter) *MockProduceResponse

func (*MockProduceResponse) For added in v1.9.0

func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockProduceResponse) SetError added in v1.9.0

func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse

func (*MockProduceResponse) SetVersion added in v1.16.0

func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse

type MockResponse added in v1.9.0

type MockResponse interface {
	For(reqBody versionedDecoder) (res encoderWithHeader)
}

MockResponse is a response builder interface. It defines one method that generates a response based on a request body. MockResponses are used to program the behavior of MockBroker in tests.

type MockSaslAuthenticateResponse added in v1.40.0

type MockSaslAuthenticateResponse struct {
	// contains filtered or unexported fields
}

func NewMockSaslAuthenticateResponse added in v1.40.0

func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse

func (*MockSaslAuthenticateResponse) For added in v1.40.0

func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockSaslAuthenticateResponse) SetAuthBytes added in v1.40.0

func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse

func (*MockSaslAuthenticateResponse) SetError added in v1.40.0

func (*MockSaslAuthenticateResponse) SetSessionLifetimeMs added in v1.40.0

func (msar *MockSaslAuthenticateResponse) SetSessionLifetimeMs(sessionLifetimeMs int64) *MockSaslAuthenticateResponse

type MockSaslHandshakeResponse added in v1.40.0

type MockSaslHandshakeResponse struct {
	// contains filtered or unexported fields
}

func NewMockSaslHandshakeResponse added in v1.40.0

func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse

func (*MockSaslHandshakeResponse) For added in v1.40.0

func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockSaslHandshakeResponse) SetEnabledMechanisms added in v1.40.0

func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse

func (*MockSaslHandshakeResponse) SetError added in v1.40.0

type MockSequence added in v1.9.0

type MockSequence struct {
	// contains filtered or unexported fields
}

MockSequence is a mock response builder that is created from a sequence of concrete responses. Every time a `MockBroker` calls its `For` method, the next response from the sequence is returned. Once the end of the sequence is reached, the last element is returned for every further call.
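The "repeat the last element" behavior is easy to get wrong in a test, so here is a minimal model of it. The `sequence` type and its `next` method are hypothetical and use plain strings in place of real responses; they are not Sarama's types.

```go
package main

import "fmt"

// sequence is a hypothetical model of the behavior described above.
// It must be created with at least one response.
type sequence struct {
	responses []string
}

// next returns the current head of the sequence and advances,
// except when only one element is left, which is then returned forever.
func (s *sequence) next() string {
	res := s.responses[0]
	if len(s.responses) > 1 {
		s.responses = s.responses[1:]
	}
	return res
}

func main() {
	seq := &sequence{responses: []string{"first", "second"}}
	for i := 0; i < 4; i++ {
		fmt.Println(seq.next())
	}
	// prints "first", then "second" three times
}
```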

func NewMockSequence added in v1.9.0

func NewMockSequence(responses ...interface{}) *MockSequence

func (*MockSequence) For added in v1.9.0

func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader)

type MockSyncGroupResponse added in v1.40.0

type MockSyncGroupResponse struct {
	Err              KError
	MemberAssignment []byte
	// contains filtered or unexported fields
}

func NewMockSyncGroupResponse added in v1.40.0

func NewMockSyncGroupResponse(t TestReporter) *MockSyncGroupResponse

func (*MockSyncGroupResponse) For added in v1.40.0

func (m *MockSyncGroupResponse) For(reqBody versionedDecoder) encoderWithHeader

func (*MockSyncGroupResponse) SetError added in v1.40.0

func (*MockSyncGroupResponse) SetMemberAssignment added in v1.40.0

func (m *MockSyncGroupResponse) SetMemberAssignment(assignment *ConsumerGroupMemberAssignment) *MockSyncGroupResponse

type MockWrapper added in v1.9.0

type MockWrapper struct {
	// contains filtered or unexported fields
}

MockWrapper is a mock response builder that returns a particular concrete response regardless of the actual request passed to the `For` method.

func NewMockWrapper added in v1.9.0

func NewMockWrapper(res encoderWithHeader) *MockWrapper

func (*MockWrapper) For added in v1.9.0

func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader)

type OffsetCommitRequest

type OffsetCommitRequest struct {
	ConsumerGroup           string
	ConsumerGroupGeneration int32   // v1 or later
	ConsumerID              string  // v1 or later
	GroupInstanceId         *string // v7 or later
	RetentionTime           int64   // v2 or later

	// Version can be:
	// - 0 (kafka 0.8.1 and later)
	// - 1 (kafka 0.8.2 and later)
	// - 2 (kafka 0.9.0 and later)
	// - 3 (kafka 0.11.0 and later)
	// - 4 (kafka 2.0.0 and later)
	// - 5&6 (kafka 2.1.0 and later)
	// - 7 (kafka 2.3.0 and later)
	Version int16
	// contains filtered or unexported fields
}

func (*OffsetCommitRequest) AddBlock

func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string)

func (*OffsetCommitRequest) AddBlockWithLeaderEpoch added in v1.40.0

func (r *OffsetCommitRequest) AddBlockWithLeaderEpoch(topic string, partitionID int32, offset int64, leaderEpoch int32, timestamp int64, metadata string)

func (*OffsetCommitRequest) Offset added in v1.17.0

func (r *OffsetCommitRequest) Offset(topic string, partitionID int32) (int64, string, error)
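The version table in the OffsetCommitRequest struct comment can be read as a lookup from broker release to the highest usable request version. The sketch below encodes only what that comment lists; `kafkaVersion` and `maxOffsetCommitVersion` are hypothetical names for illustration (Sarama has its own KafkaVersion type, which is not used here).

```go
package main

import "fmt"

// kafkaVersion is a hypothetical (major, minor, patch) triple.
type kafkaVersion [3]int

// atLeast reports whether v is the same as or newer than other.
func (v kafkaVersion) atLeast(other kafkaVersion) bool {
	for i := range v {
		if v[i] != other[i] {
			return v[i] > other[i]
		}
	}
	return true
}

// maxOffsetCommitVersion returns the highest request version the table
// lists for broker version v, or -1 if v is older than 0.8.1.
func maxOffsetCommitVersion(v kafkaVersion) int16 {
	table := []struct {
		min     kafkaVersion
		version int16
	}{
		{kafkaVersion{2, 3, 0}, 7},
		{kafkaVersion{2, 1, 0}, 6},
		{kafkaVersion{2, 0, 0}, 4},
		{kafkaVersion{0, 11, 0}, 3},
		{kafkaVersion{0, 9, 0}, 2},
		{kafkaVersion{0, 8, 2}, 1},
		{kafkaVersion{0, 8, 1}, 0},
	}
	for _, row := range table {
		if v.atLeast(row.min) {
			return row.version
		}
	}
	return -1
}

func main() {
	fmt.Println(maxOffsetCommitVersion(kafkaVersion{2, 2, 1})) // prints "6"
	fmt.Println(maxOffsetCommitVersion(kafkaVersion{1, 0, 0})) // prints "3"
}
```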

type OffsetCommitResponse

type OffsetCommitResponse struct {
	Version        int16
	ThrottleTimeMs int32
	Errors         map[string]map[int32]KError
}

func (*OffsetCommitResponse) AddError added in v1.5.0

func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError)

type OffsetFetchRequest

type OffsetFetchRequest struct {
	Version       int16
	ConsumerGroup string
	RequireStable bool // requires v7+
	// contains filtered or unexported fields
}

func NewOffsetFetchRequest added in v1.42.1

func NewOffsetFetchRequest(
	version KafkaVersion,
	group string,
	partitions map[string][]int32,
) *OffsetFetchRequest

func (*OffsetFetchRequest) AddPartition

func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32)

func (*OffsetFetchRequest) ZeroPartitions added in v1.20.0

func (r *OffsetFetchRequest) ZeroPartitions()

type OffsetFetchResponse

type OffsetFetchResponse struct {
	Version        int16
	ThrottleTimeMs int32
	Blocks         map[string]map[int32]*OffsetFetchResponseBlock
	Err            KError
}

func (*OffsetFetchResponse) AddBlock added in v1.5.0

func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock)

func (*OffsetFetchResponse) GetBlock added in v1.5.0

func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock

type OffsetFetchResponseBlock

type OffsetFetchResponseBlock struct {
	Offset      int64
	LeaderEpoch int32
	Metadata    string
	Err         KError
}

type OffsetManager added in v1.6.0

type OffsetManager interface {
	// ManagePartition creates a PartitionOffsetManager on the given topic/partition.
	// It will return an error if this OffsetManager is already managing the given
	// topic/partition.
	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)

	// Close stops the OffsetManager from managing offsets. It is required to call
	// this function before an OffsetManager object passes out of scope, as it
	// will otherwise leak memory. You must call this after all the
	// PartitionOffsetManagers are closed.
	Close() error

	// Commit commits the offsets. This method can be used if AutoCommit.Enable is
	// set to false.
	Commit()
}

OffsetManager uses Kafka to store and fetch consumed partition offsets.

func NewOffsetManagerFromClient added in v1.6.0

func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error)

NewOffsetManagerFromClient creates a new OffsetManager from the given client. It is still necessary to call Close() on the underlying client when finished with the offset manager.

type OffsetRequest

type OffsetRequest struct {
	Version        int16
	IsolationLevel IsolationLevel
	// contains filtered or unexported fields
}

func (*OffsetRequest) AddBlock

func (r *OffsetRequest) AddBlock(topic string, partitionID int32, timestamp int64, maxOffsets int32)

func (*OffsetRequest) ReplicaID added in v1.20.1

func (r *OffsetRequest) ReplicaID() int32

func (*OffsetRequest) SetReplicaID added in v1.20.1

func (r *OffsetRequest) SetReplicaID(id int32)

type OffsetResponse

type OffsetResponse struct {
	Version        int16
	ThrottleTimeMs int32
	Blocks         map[string]map[int32]*OffsetResponseBlock
}

func (*OffsetResponse) AddTopicPartition

func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64)

func (*OffsetResponse) GetBlock

func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock
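Blocks is a two-level map keyed by topic and then by partition, so a lookup has to cope with either level being absent. The sketch below models that with local stand-in types; it assumes a missing topic or partition yields a nil block, which the documentation above does not state, and `response`, `block`, and `getBlock` are hypothetical names.

```go
package main

import "fmt"

// block and response are hypothetical stand-ins with the same map shape
// as OffsetResponse.Blocks.
type block struct {
	Offset int64
}

type response struct {
	Blocks map[string]map[int32]*block
}

// getBlock returns the block for topic/partition, or nil if either is absent.
// Indexing a nil map is safe in Go and yields the zero value, so no explicit
// existence checks are needed.
func (r *response) getBlock(topic string, partition int32) *block {
	return r.Blocks[topic][partition]
}

func main() {
	r := &response{Blocks: map[string]map[int32]*block{
		"my_topic": {0: {Offset: 42}},
	}}

	if b := r.getBlock("my_topic", 0); b != nil {
		fmt.Println(b.Offset) // prints "42"
	}
	fmt.Println(r.getBlock("other_topic", 3) == nil) // prints "true"
}
```

Callers should check the result for nil before reading its fields.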

type OffsetResponseBlock

type OffsetResponseBlock struct {
	Err KError
	// Offsets contains the result offsets (for V0/V1 compatibility)
	Offsets []int64 // Version 0
	// Timestamp contains the timestamp associated with the returned offset.
	Timestamp int64 // Version 1
	// Offset contains the returned offset.
	Offset int64 // Version 1
	// LeaderEpoch contains the current leader epoch of the partition.
	LeaderEpoch int32
}

type OwnedPartition added in v1.40.0

type OwnedPartition struct {
	Topic      string
	Partitions []int32
}

type PacketDecodingError

type PacketDecodingError struct {
	Info string
}

PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.

func (PacketDecodingError) Error

func (err PacketDecodingError) Error() string

type PacketEncodingError

type PacketEncodingError struct {
	Info string
}

PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.

func (PacketEncodingError) Error

func (err PacketEncodingError) Error() string

type PartitionConsumer

type PartitionConsumer interface {
	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
	// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
	// function, or Close before a consumer object passes out of scope, as it will otherwise leak memory. You must call
	// this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain
	// the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service
	// the Messages channel when this function is called, you will be competing with Close for messages; consider
	// calling AsyncClose, instead. It is required to call this function (or AsyncClose) before a consumer object passes
	// out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
	Close() error

	// Messages returns the read channel for the messages that are returned by
	// the broker.
	Messages() <-chan *ConsumerMessage

	// Errors returns a read channel of errors that occurred during consuming, if
	// enabled. By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64

	// Pause suspends fetching from this partition. Future calls to the broker will not return
	// any records from this partition until it has been resumed using Resume().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
	Pause()

	// Resume resumes this partition, which has been paused with Pause().
	// New calls to the broker will return records from this partition if there are any to be fetched.
	// If the partition was not previously paused, this method is a no-op.
	Resume()

	// IsPaused indicates if this partition consumer is paused or not
	IsPaused() bool
}

PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out of scope.

The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported as out of range by the brokers. In this case you should decide what you want to do (try a different offset, notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying. By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.

To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will also drain the Messages channel, harvest all errors & return them once cleanup has completed.
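The shutdown pattern described above can be sketched without a broker. In this illustration `fakePartitionConsumer` is a hypothetical stand-in that models only Messages and AsyncClose; it is not Sarama's implementation, and its messages are plain strings rather than *ConsumerMessage values.

```go
package main

import "fmt"

// fakePartitionConsumer is a hypothetical stand-in for a PartitionConsumer.
type fakePartitionConsumer struct {
	messages chan string
	stop     chan struct{}
}

func newFakePartitionConsumer(values []string) *fakePartitionConsumer {
	c := &fakePartitionConsumer{
		messages: make(chan string),
		stop:     make(chan struct{}),
	}
	go func() {
		// Teardown closes the channel, which ends any for/range loop over it.
		defer close(c.messages)
		for _, v := range values {
			select {
			case c.messages <- v:
			case <-c.stop:
				return
			}
		}
	}()
	return c
}

func (c *fakePartitionConsumer) Messages() <-chan string { return c.messages }

// AsyncClose starts the teardown and returns immediately.
func (c *fakePartitionConsumer) AsyncClose() { close(c.stop) }

// consumeUntil ranges over Messages, requests shutdown once it has seen
// last, and keeps servicing the channel until the teardown closes it.
func consumeUntil(c *fakePartitionConsumer, last string) []string {
	var seen []string
	closing := false
	for msg := range c.Messages() {
		seen = append(seen, msg)
		if msg == last && !closing {
			closing = true
			c.AsyncClose()
		}
	}
	return seen
}

func main() {
	seen := consumeUntil(newFakePartitionConsumer([]string{"a", "b", "c", "d"}), "b")
	fmt.Println(seen[:2])
}
```

Messages that were already in flight may still arrive after AsyncClose is called, which is why the loop keeps reading instead of breaking out.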

type PartitionError added in v1.16.0

type PartitionError struct {
	Partition int32
	Err       KError
}

PartitionError is a partition error type

type PartitionMetadata

type PartitionMetadata struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// Err contains the partition error, or 0 if there was no error.
	Err KError
	// ID contains the partition index.
	ID int32
	// Leader contains the ID of the leader broker.
	Leader int32
	// LeaderEpoch contains the leader epoch of this partition.
	LeaderEpoch int32
	// Replicas contains the set of all nodes that host this partition.
	Replicas []int32
	// Isr contains the set of nodes that are in sync with the leader for this partition.
	Isr []int32
	// OfflineReplicas contains the set of offline replicas of this partition.
	OfflineReplicas []int32
}

PartitionMetadata contains each partition in the topic.

type PartitionOffsetManager added in v1.6.0

type PartitionOffsetManager interface {
	// NextOffset returns the next offset that should be consumed for the managed
	// partition, accompanied by metadata which can be used to reconstruct the state
	// of the partition consumer when it resumes. NextOffset() will return
	// `config.Consumer.Offsets.Initial` and an empty metadata string if no offset
	// was committed for this partition yet.
	NextOffset() (int64, string)

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(offset int64, metadata string)

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting an offset to an earlier or smaller value, whereas MarkOffset only
	// allows incrementing the offset. See MarkOffset for more details.
	ResetOffset(offset int64, metadata string)

	// Errors returns a read channel of errors that occur during offset management, if
	// enabled. By default, errors are logged and not returned over this channel. If
	// you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will
	// return immediately, after which you should wait until the 'errors' channel has
	// been drained and closed. It is required to call this function, or Close before
	// a consumer object passes out of scope, as it will otherwise leak memory. You
	// must call this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionOffsetManager from managing offsets. It is required to
	// call this function (or AsyncClose) before a PartitionOffsetManager object
	// passes out of scope, as it will otherwise leak memory. You must call this
	// before calling Close on the underlying client.
	Close() error
}

PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close() on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it passes out of scope.
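The mark and reset rules documented on the interface can be illustrated with a small in-memory model. `offsetTracker` below is a hypothetical type written for this sketch; it does not talk to Kafka, and the exact comparison used by ResetOffset is an assumption based on the "earlier or smaller value" wording.

```go
package main

import "fmt"

// offsetTracker is a hypothetical in-memory model of the mark/reset rules.
type offsetTracker struct {
	next     int64
	metadata string
}

// MarkOffset only ever moves the stored offset forward.
func (o *offsetTracker) MarkOffset(offset int64, metadata string) {
	if offset > o.next {
		o.next, o.metadata = offset, metadata
	}
}

// ResetOffset moves the stored offset back to an earlier or equal value.
func (o *offsetTracker) ResetOffset(offset int64, metadata string) {
	if offset <= o.next {
		o.next, o.metadata = offset, metadata
	}
}

// NextOffset returns the offset of the next message to read.
func (o *offsetTracker) NextOffset() (int64, string) { return o.next, o.metadata }

func main() {
	tracker := &offsetTracker{}
	for _, consumed := range []int64{0, 1, 2} {
		// Mark the offset of the next message to read: last consumed + 1.
		tracker.MarkOffset(consumed+1, "")
	}
	next, _ := tracker.NextOffset()
	fmt.Println(next) // 3

	tracker.MarkOffset(1, "") // ignored: marking never moves backwards
	tracker.ResetOffset(1, "replay")
	next, meta := tracker.NextOffset()
	fmt.Println(next, meta) // 1 replay
}
```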

type PartitionOffsetMetadata added in v1.16.0

type PartitionOffsetMetadata struct {
	// Partition contains the index of the partition within the topic.
	Partition int32
	// Offset contains the message offset to be committed.
	Offset int64
	// LeaderEpoch contains the leader epoch of the last consumed record.
	LeaderEpoch int32
	// Metadata contains any associated metadata the client wants to keep.
	Metadata *string
}

type PartitionReplicaReassignmentsStatus added in v1.40.0

type PartitionReplicaReassignmentsStatus struct {
	Replicas         []int32
	AddingReplicas   []int32
	RemovingReplicas []int32
}

type Partitioner

type Partitioner interface {
	// Partition takes a message and partition count and chooses a partition
	Partition(message *ProducerMessage, numPartitions int32) (int32, error)

	// RequiresConsistency indicates to the user of the partitioner whether the
	// mapping of key->partition is consistent or not. Specifically, if a
	// partitioner requires consistency then it must be allowed to choose from all
	// partitions (even ones known to be unavailable), and its choice must be
	// respected by the caller. The obvious example is the HashPartitioner.
	RequiresConsistency() bool
}

Partitioner is anything that, given a Kafka message and a number of partitions indexed [0...numPartitions-1], decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided as simple default implementations.

Example (Manual)

This example shows how to assign partitions to your messages manually.

config := NewTestConfig()

// First, we tell the producer that we are going to partition ourselves.
config.Producer.Partitioner = NewManualPartitioner

producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
	log.Println(err)
	return
}
defer func() {
	if err := producer.Close(); err != nil {
		log.Println("Failed to close producer:", err)
	}
}()

// Now, we set the Partition field of the ProducerMessage struct.
msg := &ProducerMessage{Topic: "test", Partition: 6, Value: StringEncoder("test")}

partition, offset, err := producer.SendMessage(msg)
if err != nil {
	log.Println("Failed to produce message to kafka cluster.")
	return
}

if partition != 6 {
	log.Println("Message should have been produced to partition 6!")
	return
}

log.Printf("Produced message to partition %d with offset %d", partition, offset)

Example (Per_topic)

This example shows how to set a different partitioner depending on the topic.

config := NewTestConfig()
config.Producer.Partitioner = func(topic string) Partitioner {
	switch topic {
	case "access_log", "error_log":
		return NewRandomPartitioner(topic)

	default:
		return NewHashPartitioner(topic)
	}
}

// ...

Example (Random)

By default, Sarama uses the message's key to consistently assign a partition to a message using hashing. If no key is set, a random partition will be chosen. This example shows how you can partition messages randomly, even when a key is set, by overriding Config.Producer.Partitioner.

config := NewTestConfig()
config.Producer.Partitioner = NewRandomPartitioner

producer, err := NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
	log.Println(err)
	return
}
defer func() {
	if err := producer.Close(); err != nil {
		log.Println("Failed to close producer:", err)
	}
}()

msg := &ProducerMessage{Topic: "test", Key: StringEncoder("key is set"), Value: StringEncoder("test")}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
	log.Println("Failed to produce message to kafka cluster.")
	return
}

log.Printf("Produced message to partition %d with offset %d", partition, offset)

func NewConsistentCRCHashPartitioner added in v1.41.0

func NewConsistentCRCHashPartitioner(topic string) Partitioner

NewConsistentCRCHashPartitioner is like NewHashPartitioner except that it uses the *unsigned* crc32 hash of the encoded bytes of the message key modulo the number of partitions. This is compatible with librdkafka's `consistent_random` partitioner.

func NewHashPartitioner

func NewHashPartitioner(topic string) Partitioner

NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used, modulus the number of partitions. This ensures that messages with the same key always end up on the same partition.
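As a rough illustration of the behaviour described above, the sketch below hashes the key with FNV-1a and reduces it modulo the partition count, falling back to a random partition for a nil key. `choosePartition` is a hypothetical helper, not Sarama's code. It reduces the hash in unsigned arithmetic for simplicity, so it may not reproduce the exact partition numbers Sarama selects.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math/rand"
)

// choosePartition is a hypothetical helper modelling key-hash partitioning.
func choosePartition(key []byte, numPartitions int32) int32 {
	if key == nil {
		// No key: any partition will do.
		return rand.Int31n(numPartitions)
	}
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error.
	// Unsigned reduction keeps the result in [0, numPartitions).
	// How the real partitioner treats the sign bit is not modelled here.
	return int32(h.Sum32() % uint32(numPartitions))
}

func main() {
	first := choosePartition([]byte("user-42"), 12)
	second := choosePartition([]byte("user-42"), 12)
	fmt.Println(first == second) // the same key maps to the same partition
}
```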

func NewManualPartitioner

func NewManualPartitioner(topic string) Partitioner

NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided ProducerMessage's Partition field as the partition to produce to.

func NewRandomPartitioner

func NewRandomPartitioner(topic string) Partitioner

NewRandomPartitioner returns a Partitioner which chooses a random partition each time.

func NewReferenceHashPartitioner added in v1.18.0

func NewReferenceHashPartitioner(topic string) Partitioner

NewReferenceHashPartitioner is like NewHashPartitioner except that it handles absolute values in the same way as the reference Java implementation. NewHashPartitioner was supposed to do that but it had a mistake and now there are people depending on both behaviors. This will all go away on the next major version bump.
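The difference in absolute-value handling can be shown with two small functions. This is a sketch under assumptions: `legacyAbs` models what the original NewHashPartitioner is understood to do (negate a negative remainder), and `referenceAbs` clears the sign bit before the modulo, which is how the Java client makes a hash positive. Both function names are hypothetical.

```go
package main

import "fmt"

// legacyAbs takes the signed remainder and negates it when negative.
// Assumption: this models the original NewHashPartitioner behaviour.
func legacyAbs(hash uint32, numPartitions int32) int32 {
	p := int32(hash) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

// referenceAbs clears the sign bit first, then takes the remainder.
func referenceAbs(hash uint32, numPartitions int32) int32 {
	return (int32(hash) & 0x7fffffff) % numPartitions
}

func main() {
	// A hash with the high bit set is where the two approaches diverge.
	const hash = 0xFFFFFFFE
	fmt.Println(legacyAbs(hash, 3), referenceAbs(hash, 3)) // 2 0

	// Hashes without the high bit set give the same answer either way.
	fmt.Println(legacyAbs(7, 3), referenceAbs(7, 3)) // 1 1
}
```

Because the two functions disagree for some hashes, switching partitioners on a live topic can move existing keys to different partitions.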

func NewRoundRobinPartitioner

func NewRoundRobinPartitioner(topic string) Partitioner

NewRoundRobinPartitioner returns a Partitioner which walks through the available partitions one at a time.

type PartitionerConstructor

type PartitionerConstructor func(topic string) Partitioner

PartitionerConstructor is the type for a function capable of constructing new Partitioners.

func NewCustomHashPartitioner added in v1.12.0

func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor

NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of a custom hasher. The argument is a function that returns a new instance implementing the hash.Hash32 interface. This ensures that each partition dispatcher gets its own hasher, avoiding the concurrency issues that sharing an instance would cause.
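A factory with the `func() hash.Hash32` shape can be written with the standard library alone. The sketch below uses CRC-32 purely as an example hasher; `newCRC32` and `sumWith` are hypothetical names, and the point is only that every call to the factory yields an independent instance.

```go
package main

import (
	"fmt"
	"hash"
	"hash/crc32"
)

// newCRC32 has the func() hash.Hash32 shape described above.
func newCRC32() hash.Hash32 { return crc32.NewIEEE() }

// sumWith is a hypothetical helper that hashes data with a fresh instance.
func sumWith(newHasher func() hash.Hash32, data []byte) uint32 {
	h := newHasher()
	h.Write(data)
	return h.Sum32()
}

func main() {
	a, b := newCRC32(), newCRC32()
	a.Write([]byte("written to a only"))
	b.Write([]byte("k"))

	// b is unaffected by the bytes written to a.
	fmt.Println(b.Sum32() == sumWith(newCRC32, []byte("k"))) // true
}
```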

func NewCustomPartitioner added in v1.18.0

func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor

NewCustomPartitioner creates a default Partitioner but lets you specify the behavior of each component via options.

type ProduceCallback added in v1.40.0

type ProduceCallback func(*ProduceResponse, error)

ProduceCallback function is called once the produce response has been parsed or could not be read.

type ProduceRequest

type ProduceRequest struct {
	TransactionalID *string
	RequiredAcks    RequiredAcks
	Timeout         int32
	Version         int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10, v3 requires Kafka 0.11
	// contains filtered or unexported fields
}

func (*ProduceRequest) AddBatch added in v1.14.0

func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch)

func (*ProduceRequest) AddMessage

func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message)

func (*ProduceRequest) AddSet

func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet)

type ProduceResponse

type ProduceResponse struct {
	Blocks       map[string]map[int32]*ProduceResponseBlock // v0, responses
	Version      int16
	ThrottleTime time.Duration // v1, throttle_time_ms
}

func (*ProduceResponse) AddTopicPartition

func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError)

func (*ProduceResponse) GetBlock

func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock

type ProduceResponseBlock

type ProduceResponseBlock struct {
	Err         KError    // v0, error_code
	Offset      int64     // v0, base_offset
	Timestamp   time.Time // v2, log_append_time, and the broker is configured with `LogAppendTime`
	StartOffset int64     // v5, log_start_offset
}

ProduceResponseBlock corresponds to partition_responses in the protocol.

type ProducerError

type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

ProducerError is the type of error generated when the producer fails to deliver a message. It contains the original ProducerMessage as well as the actual error value.

func (ProducerError) Error

func (pe ProducerError) Error() string

func (ProducerError) Unwrap added in v1.40.0

func (pe ProducerError) Unwrap() error
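An error type that carries an underlying error and exposes it through Unwrap works with errors.Is and errors.As from the standard library. The sketch below uses a hypothetical `deliveryError` type and `errBrokerDown` sentinel to show the mechanism; it mirrors the shape of ProducerError but does not use Sarama.

```go
package main

import (
	"errors"
	"fmt"
)

// errBrokerDown is a hypothetical sentinel error for this sketch.
var errBrokerDown = errors.New("broker unavailable")

// deliveryError is a hypothetical type shaped like ProducerError:
// some context plus the underlying error.
type deliveryError struct {
	Topic string
	Err   error
}

func (e deliveryError) Error() string {
	return fmt.Sprintf("failed to deliver to %s: %v", e.Topic, e.Err)
}

// Unwrap lets errors.Is and errors.As look through to the cause.
func (e deliveryError) Unwrap() error { return e.Err }

// isBrokerDown reports whether err was caused by errBrokerDown.
func isBrokerDown(err error) bool { return errors.Is(err, errBrokerDown) }

func main() {
	var err error = deliveryError{Topic: "my_topic", Err: errBrokerDown}
	fmt.Println(isBrokerDown(err)) // true

	var de deliveryError
	if errors.As(err, &de) {
		fmt.Println(de.Topic) // my_topic
	}
}
```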

type ProducerErrors

type ProducerErrors []*ProducerError

ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface. It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel when closing a producer.

func (ProducerErrors) Error

func (pe ProducerErrors) Error() string

type ProducerInterceptor added in v1.40.0

type ProducerInterceptor interface {

	// OnSend is called when the producer message is intercepted. Please avoid
	// modifying the message until it's safe to do so, as this is _not_ a copy
	// of the message.
	OnSend(*ProducerMessage)
}

ProducerInterceptor allows you to intercept (and possibly mutate) the records received by the producer before they are published to the Kafka cluster. https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors#KIP42:AddProducerandConsumerInterceptors-Motivation

type ProducerMessage

type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key Encoder
	// The actual message to store in Kafka. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Value Encoder

	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers []RecordHeader

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field; it is only to be used for
	// pass-through data.
	Metadata interface{}

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp can vary in behavior depending on broker configuration, being
	// in either one of the CreateTime or LogAppendTime modes (default CreateTime),
	// and requiring version at least 0.10.0.
	//
	// When configured to CreateTime, the timestamp is specified by the producer
	// either by explicitly setting this field, or when the message is added
	// to a produce set.
	//
	// When configured to LogAppendTime, the timestamp is assigned to the message
	// by the broker. This is only guaranteed to be defined if the message was
	// successfully delivered and RequiredAcks is not NoResponse.
	Timestamp time.Time
	// contains filtered or unexported fields
}

ProducerMessage is the collection of elements passed to the Producer in order to send a message.

func (*ProducerMessage) ByteSize added in v1.40.0

func (m *ProducerMessage) ByteSize(version int) int

type ProducerTxnStatusFlag added in v1.40.0

type ProducerTxnStatusFlag int16

ProducerTxnStatusFlag marks the current transaction status.

const (
	// ProducerTxnFlagUninitialized when txnmgr is created
	ProducerTxnFlagUninitialized ProducerTxnStatusFlag = 1 << iota
	// ProducerTxnFlagInitializing when txnmgr is initializing
	ProducerTxnFlagInitializing
	// ProducerTxnFlagReady when txnmgr is ready to receive transactions
	ProducerTxnFlagReady
	// ProducerTxnFlagInTransaction when transaction is started
	ProducerTxnFlagInTransaction
	// ProducerTxnFlagEndTransaction when transaction will be committed
	ProducerTxnFlagEndTransaction
	// ProducerTxnFlagInError when having abortable or fatal error
	ProducerTxnFlagInError
	// ProducerTxnFlagCommittingTransaction when committing txn
	ProducerTxnFlagCommittingTransaction
	// ProducerTxnFlagAbortingTransaction when aborting txn
	ProducerTxnFlagAbortingTransaction
	// ProducerTxnFlagAbortableError when producer encounters an abortable error.
	// Must call AbortTxn in this case.
	ProducerTxnFlagAbortableError
	// ProducerTxnFlagFatalError when producer encounters a fatal error.
	// Must Close and recreate it.
	ProducerTxnFlagFatalError
)
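Because the constants are declared with `1 << iota`, each one occupies its own bit. The sketch below reproduces that declaration with a hypothetical local type `txnFlag` and shows how a bitwise AND tests for a single flag. Combining flags in one value, as done here, is an assumption about how a status may be composed.

```go
package main

import "fmt"

// txnFlag is a hypothetical local copy of the flag layout shown above.
type txnFlag int16

const (
	flagUninitialized txnFlag = 1 << iota
	flagInitializing
	flagReady
	flagInTransaction
	flagEndTransaction
	flagInError
	flagCommittingTransaction
	flagAbortingTransaction
	flagAbortableError
	flagFatalError
)

// has reports whether the given bit is set in f.
func (f txnFlag) has(bit txnFlag) bool { return f&bit != 0 }

func main() {
	status := flagInError | flagAbortableError
	fmt.Println(status.has(flagAbortableError), status.has(flagFatalError)) // true false
	fmt.Println(int16(flagFatalError))                                      // 512
}
```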

func (ProducerTxnStatusFlag) String added in v1.40.0

func (s ProducerTxnStatusFlag) String() string

type QuotaEntityComponent added in v1.40.0

type QuotaEntityComponent struct {
	EntityType QuotaEntityType
	MatchType  QuotaMatchType
	Name       string
}

type QuotaEntityType added in v1.40.0

type QuotaEntityType string

type QuotaFilterComponent added in v1.40.0

type QuotaFilterComponent struct {
	EntityType QuotaEntityType
	MatchType  QuotaMatchType
	Match      string
}

QuotaFilterComponent describes a component for applying a client quota filter. EntityType is the entity type the filter component applies to ("user", "client-id", "ip"). MatchType is the match type of the filter component (any, exact, default). Match is the name that's matched exactly (used when MatchType is QuotaMatchExact).

type QuotaMatchType added in v1.40.0

type QuotaMatchType int

type Record added in v1.14.0

type Record struct {
	Headers []*RecordHeader

	Attributes     int8
	TimestampDelta time.Duration
	OffsetDelta    int64
	Key            []byte
	Value          []byte
	// contains filtered or unexported fields
}

Record is kafka record type

type RecordBatch added in v1.14.0

type RecordBatch struct {
	FirstOffset           int64
	PartitionLeaderEpoch  int32
	Version               int8
	Codec                 CompressionCodec
	CompressionLevel      int
	Control               bool
	LogAppendTime         bool
	LastOffsetDelta       int32
	FirstTimestamp        time.Time
	MaxTimestamp          time.Time
	ProducerID            int64
	ProducerEpoch         int16
	FirstSequence         int32
	Records               []*Record
	PartialTrailingRecord bool
	IsTransactional       bool
	// contains filtered or unexported fields
}

func (*RecordBatch) LastOffset added in v1.40.0

func (b *RecordBatch) LastOffset() int64

type RecordHeader added in v1.14.0

type RecordHeader struct {
	Key   []byte
	Value []byte
}

RecordHeader stores key and value for a record header

type Records added in v1.14.0

type Records struct {
	MsgSet      *MessageSet
	RecordBatch *RecordBatch
	// contains filtered or unexported fields
}

Records implements a union type containing either a RecordBatch or a legacy MessageSet.

type RequestNotifierFunc added in v1.11.0

type RequestNotifierFunc func(bytesRead, bytesWritten int)

RequestNotifierFunc is invoked when a mock broker processes a request successfully and provides the number of bytes read and written.

type RequestResponse added in v1.9.0

type RequestResponse struct {
	Request  protocolBody
	Response encoder
}

RequestResponse represents a Request/Response pair processed by MockBroker.

type RequiredAcks

type RequiredAcks int16

RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced by setting the `min.insync.replicas` value in the broker's configuration).

const (
	// NoResponse doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal waits for only the local commit to succeed before responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via
	// the `min.insync.replicas` configuration key.
	WaitForAll RequiredAcks = -1
)

type Resource added in v1.16.0

type Resource struct {
	ResourceType        AclResourceType
	ResourceName        string
	ResourcePatternType AclResourcePatternType
}

Resource holds information about acl resource type

type ResourceAcls added in v1.16.0

type ResourceAcls struct {
	Resource
	Acls []*Acl
}

ResourceAcls is an acl resource type

type ResourceResponse added in v1.16.0

type ResourceResponse struct {
	ErrorCode int16
	ErrorMsg  string
	Type      ConfigResourceType
	Name      string
	Configs   []*ConfigEntry
}

type SASLMechanism added in v1.40.0

type SASLMechanism string

SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker

type SCRAMClient added in v1.40.0

type SCRAMClient interface {
	// Begin prepares the client for the SCRAM exchange
	// with the server with a user name and a password
	Begin(userName, password, authzID string) error
	// Step steps client through the SCRAM exchange. It is
	// called repeatedly until it errors or `Done` returns true.
	Step(challenge string) (response string, err error)
	// Done should return true when the SCRAM conversation
	// is over.
	Done() bool
}

SCRAMClient is an interface to a SCRAM client implementation.

type SaslAuthenticateRequest added in v1.40.0

type SaslAuthenticateRequest struct {
	// Version defines the protocol version to use for encode and decode
	Version       int16
	SaslAuthBytes []byte
}

type SaslAuthenticateResponse added in v1.40.0

type SaslAuthenticateResponse struct {
	// Version defines the protocol version to use for encode and decode
	Version           int16
	Err               KError
	ErrorMessage      *string
	SaslAuthBytes     []byte
	SessionLifetimeMs int64
}

type SaslHandshakeRequest added in v1.10.0

type SaslHandshakeRequest struct {
	Mechanism string
	Version   int16
}

type SaslHandshakeResponse added in v1.10.0

type SaslHandshakeResponse struct {
	Version           int16
	Err               KError
	EnabledMechanisms []string
}

type ScramMechanismType added in v1.40.0

type ScramMechanismType int8

const (
	SCRAM_MECHANISM_UNKNOWN ScramMechanismType = iota // 0
	SCRAM_MECHANISM_SHA_256                           // 1
	SCRAM_MECHANISM_SHA_512                           // 2
)

func (ScramMechanismType) String added in v1.40.0

func (s ScramMechanismType) String() string

type StdLogger

type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

StdLogger is used to log error messages.

var DebugLogger StdLogger = &debugLogger{}

DebugLogger is the instance of a StdLogger that Sarama writes more verbose debug information to. By default it is set to redirect all debug output to the default Logger, but you can optionally set it to another StdLogger instance to, for example, discard debug information.

type StickyAssignorUserData added in v1.40.0

type StickyAssignorUserData interface {
	// contains filtered or unexported methods
}

type StickyAssignorUserDataV0 added in v1.40.0

type StickyAssignorUserDataV0 struct {
	Topics map[string][]int32
	// contains filtered or unexported fields
}

StickyAssignorUserDataV0 holds topic partition information for an assignment

type StickyAssignorUserDataV1 added in v1.40.0

type StickyAssignorUserDataV1 struct {
	Topics     map[string][]int32
	Generation int32
	// contains filtered or unexported fields
}

StickyAssignorUserDataV1 holds topic partition information for an assignment

type StringEncoder

type StringEncoder string

StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.

func (StringEncoder) Encode

func (s StringEncoder) Encode() ([]byte, error)

func (StringEncoder) Length

func (s StringEncoder) Length() int
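A custom payload type can provide the same two methods that StringEncoder has. The sketch below declares a local `encoder` interface with those method signatures so that it compiles without Sarama; `jsonEvent` is a hypothetical type that encodes itself as JSON once and caches the result, so Encode and Length always agree.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encoder mirrors the two methods shown on StringEncoder above. It is
// declared locally so the sketch does not depend on Sarama.
type encoder interface {
	Encode() ([]byte, error)
	Length() int
}

// jsonEvent is a hypothetical payload type.
type jsonEvent struct {
	ID int `json:"id"`

	encoded []byte
	err     error
	done    bool
}

// ensureEncoded marshals the event once and caches the outcome.
func (e *jsonEvent) ensureEncoded() {
	if !e.done {
		e.encoded, e.err = json.Marshal(e)
		e.done = true
	}
}

func (e *jsonEvent) Encode() ([]byte, error) {
	e.ensureEncoded()
	return e.encoded, e.err
}

func (e *jsonEvent) Length() int {
	e.ensureEncoded()
	return len(e.encoded)
}

func main() {
	var enc encoder = &jsonEvent{ID: 7}
	b, err := enc.Encode()
	fmt.Println(string(b), enc.Length(), err) // {"id":7} 8 <nil>
}
```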

type SyncGroupRequest added in v1.7.0

type SyncGroupRequest struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// GroupId contains the unique group identifier.
	GroupId string
	// GenerationId contains the generation of the group.
	GenerationId int32
	// MemberId contains the member ID assigned by the group.
	MemberId string
	// GroupInstanceId contains the unique identifier of the consumer instance provided by end user.
	GroupInstanceId *string
	// GroupAssignments contains each assignment.
	GroupAssignments []SyncGroupRequestAssignment
}

func (*SyncGroupRequest) AddGroupAssignment added in v1.7.0

func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte)

func (*SyncGroupRequest) AddGroupAssignmentMember added in v1.8.0

func (r *SyncGroupRequest) AddGroupAssignmentMember(
	memberId string,
	memberAssignment *ConsumerGroupMemberAssignment,
) error

type SyncGroupRequestAssignment added in v1.40.0

type SyncGroupRequestAssignment struct {
	// MemberId contains the ID of the member to assign.
	MemberId string
	// Assignment contains the member assignment.
	Assignment []byte
}

type SyncGroupResponse added in v1.7.0

type SyncGroupResponse struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// ThrottleTime contains the duration in milliseconds for which the
	// request was throttled due to a quota violation, or zero if the request
	// did not violate any quota.
	ThrottleTime int32
	// Err contains the error code, or 0 if there was no error.
	Err KError
	// MemberAssignment contains the member assignment.
	MemberAssignment []byte
}

func (*SyncGroupResponse) GetMemberAssignment added in v1.8.0

func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error)

type SyncProducer

type SyncProducer interface {

	// SendMessage produces a given message, and returns only when it either has
	// succeeded or failed to produce. It will return the partition and the offset
	// of the produced message, or an error if the message failed to produce.
	SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)

	// SendMessages produces a given set of messages, and returns only when all
	// messages in the set have either succeeded or failed. Note that messages
	// can succeed and fail individually; if some succeed and some fail,
	// SendMessages will return an error.
	SendMessages(msgs []*ProducerMessage) error

	// Close shuts down the producer; you must call this function before a producer
	// object passes out of scope, as it may otherwise leak memory.
	// You must call this before calling Close on the underlying client.
	Close() error

	// TxnStatus returns the current producer transaction status.
	TxnStatus() ProducerTxnStatusFlag

	// IsTransactional returns true when the current producer is transactional.
	IsTransactional() bool

	// BeginTxn marks the current transaction as ready.
	BeginTxn() error

	// CommitTxn commits the current transaction.
	CommitTxn() error

	// AbortTxn aborts the current transaction.
	AbortTxn() error

	// AddOffsetsToTxn adds the associated offsets to the current transaction.
	AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error

	// AddMessageToTxn adds the message's offsets to the current transaction.
	AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error
}

SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer to avoid leaks; it may not be garbage-collected automatically when it passes out of scope.

The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual durability guarantee provided when a message is acknowledged depends on the configured value of `Producer.RequiredAcks`. There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.

For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to be set to true in its configuration.

Example

This example shows the basic usage pattern of the SyncProducer.

producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
	log.Fatalln(err)
}
defer func() {
	if err := producer.Close(); err != nil {
		log.Fatalln(err)
	}
}()

msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
	log.Printf("FAILED to send message: %s\n", err)
} else {
	log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
}

func NewSyncProducer

func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error)

NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.

func NewSyncProducerFromClient

func NewSyncProducerFromClient(client Client) (SyncProducer, error)

NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still necessary to call Close() on the underlying client when shutting down this producer.
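
Since the producer must be closed before its underlying client, one convenient arrangement is to defer the two Close calls in creation order, relying on Go running deferred calls last-in, first-out. The sketch below demonstrates only that ordering; `resource` and `recorder` are hypothetical stand-ins, not Sarama types.

```go
package main

import "fmt"

// recorder collects the names of resources in the order they were closed.
type recorder struct{ order []string }

// resource is a hypothetical stand-in for anything with a Close() error method.
type resource struct {
	name string
	rec  *recorder
}

func (r resource) Close() error {
	r.rec.order = append(r.rec.order, r.name)
	return nil
}

// shutdownOrder creates a "client" and then a "producer", deferring Close on
// each, and reports the order in which the deferred calls ran.
func shutdownOrder() []string {
	rec := &recorder{}
	func() {
		client := resource{name: "client", rec: rec}
		defer client.Close() // deferred first, runs last

		producer := resource{name: "producer", rec: rec}
		defer producer.Close() // deferred last, runs first
	}()
	return rec.order
}

func main() {
	fmt.Println(shutdownOrder())
}
```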

type TestReporter added in v1.9.0

type TestReporter interface {
	Error(...interface{})
	Errorf(string, ...interface{})
	Fatal(...interface{})
	Fatalf(string, ...interface{})
	Helper()
}

TestReporter has methods matching Go's testing.T to avoid importing `testing` in the main part of the library.

type Timestamp added in v1.14.0

type Timestamp struct {
	*time.Time
}

type TopicDetail added in v1.16.0

type TopicDetail struct {
	// NumPartitions contains the number of partitions to create in the topic, or
	// -1 if we are either specifying a manual partition assignment or using the
	// default partitions.
	NumPartitions int32
	// ReplicationFactor contains the number of replicas to create for each
	// partition in the topic, or -1 if we are either specifying a manual
	// partition assignment or using the default replication factor.
	ReplicationFactor int16
	// ReplicaAssignment contains the manual partition assignment, or is empty if
	// we are using automatic assignment.
	ReplicaAssignment map[int32][]int32
	// ConfigEntries contains the custom topic configurations to set.
	ConfigEntries map[string]*string
}
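
The field comments above describe two mutually exclusive ways to fill in a TopicDetail: automatic placement with explicit counts, or manual placement with both counts set to -1. The sketch below illustrates both. The struct is copied locally so the example compiles on its own; `strPtr`, `automatic`, and `manual` are hypothetical helpers, the broker IDs and counts are made-up values, and the reading of ReplicaAssignment as partition ID to broker IDs is an assumption based on the Kafka CreateTopics request.

```go
package main

import "fmt"

// TopicDetail is a local copy of the struct declared above.
type TopicDetail struct {
	NumPartitions     int32
	ReplicationFactor int16
	ReplicaAssignment map[int32][]int32
	ConfigEntries     map[string]*string
}

// strPtr returns a pointer to s; ConfigEntries values are *string.
func strPtr(s string) *string { return &s }

// automatic lets the cluster place replicas: counts are given explicitly
// and ReplicaAssignment is left empty.
func automatic() TopicDetail {
	return TopicDetail{
		NumPartitions:     6,
		ReplicationFactor: 3,
		ConfigEntries: map[string]*string{
			"retention.ms": strPtr("86400000"), // one day, in milliseconds
		},
	}
}

// manual pins each partition to specific brokers: both counts are -1 and
// ReplicaAssignment maps a partition ID to the broker IDs holding it.
func manual() TopicDetail {
	return TopicDetail{
		NumPartitions:     -1,
		ReplicationFactor: -1,
		ReplicaAssignment: map[int32][]int32{
			0: {1, 2},
			1: {2, 3},
		},
	}
}

func main() {
	a, m := automatic(), manual()
	fmt.Println(a.NumPartitions, a.ReplicationFactor, *a.ConfigEntries["retention.ms"])
	fmt.Println(m.NumPartitions, m.ReplicationFactor, len(m.ReplicaAssignment))
}
```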

type TopicError added in v1.16.0

type TopicError struct {
	Err    KError
	ErrMsg *string
}

func (*TopicError) Error added in v1.40.0

func (t *TopicError) Error() string

func (*TopicError) Unwrap added in v1.40.0

func (t *TopicError) Unwrap() error

type TopicMetadata

type TopicMetadata struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// Err contains the topic error, or 0 if there was no error.
	Err KError
	// Name contains the topic name.
	Name string
	Uuid Uuid
	// IsInternal is true if the topic is internal.
	IsInternal bool
	// Partitions contains each partition in the topic.
	Partitions                []*PartitionMetadata
	TopicAuthorizedOperations int32 // Only valid for Version >= 8
}

TopicMetadata contains each topic in the response.

type TopicPartition added in v1.15.0

type TopicPartition struct {
	Count      int32
	Assignment [][]int32
}

type TopicPartitionError added in v1.15.0

type TopicPartitionError struct {
	Err    KError
	ErrMsg *string
}

func (*TopicPartitionError) Error added in v1.40.0

func (t *TopicPartitionError) Error() string

func (*TopicPartitionError) Unwrap added in v1.40.0

func (t *TopicPartitionError) Unwrap() error

type TxnOffsetCommitRequest added in v1.16.0

type TxnOffsetCommitRequest struct {
	Version         int16
	TransactionalID string
	GroupID         string
	ProducerID      int64
	ProducerEpoch   int16
	Topics          map[string][]*PartitionOffsetMetadata
}

type TxnOffsetCommitResponse added in v1.16.0

type TxnOffsetCommitResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Topics       map[string][]*PartitionError
}

type UserScramCredentialsResponseInfo added in v1.40.0

type UserScramCredentialsResponseInfo struct {
	Mechanism  ScramMechanismType
	Iterations int32
}

type Uuid added in v1.41.0

type Uuid [16]byte

func (Uuid) String added in v1.41.0

func (u Uuid) String() string

type ZstdDecoderParams added in v1.40.0

type ZstdDecoderParams struct {
}

type ZstdEncoderParams added in v1.40.0

type ZstdEncoderParams struct {
	Level int
}

Directories

Path Synopsis
examples
exactly_once Module
internal
mocks Package mocks provides mocks that can be used for testing applications that use Sarama.
tools
tls
