kgo

package v0.6.8
Published: Feb 20, 2021 License: BSD-3-Clause Imports: 31 Imported by: 364

Documentation

Overview

Package kgo provides a pure Go efficient Kafka client for Kafka 0.8.0+ with support for transactions, regex topic consuming, the latest partition strategies, and more. This client aims to support all KIPs.

This client aims to be simple to use while still interacting with Kafka in a near ideal way. If any of this client is confusing, please raise GitHub issues so we can make this clearer.

For more overview of the entire client itself, please see the package source's README.

Note that the default group consumer balancing strategy is "cooperative-sticky", which is incompatible with the historical (pre 2.4.0) balancers. If you are planning to work with an older Kafka or in an existing consumer group that uses eager balancers, be sure to use the Balancers option when assigning a group. See the documentation on balancers for more information.
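For example, the following sketch (assuming an already-constructed *kgo.Client named cl, with placeholder group and topic names) joins a group that still uses the eager range and roundrobin balancers:

cl.AssignGroup("existing-group",
	kgo.GroupTopics("my-topic"),
	// Override the default cooperative-sticky balancer with the eager
	// balancers this group already uses.
	kgo.Balancers(kgo.RangeBalancer(), kgo.RoundRobinBalancer()),
)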

Constants

This section is empty.

Variables

var (
	// ErrUnknownRequestKey is returned when using a kmsg.Request with a
	// key larger than kmsg.MaxKey.
	ErrUnknownRequestKey = errors.New("request key is unknown")

	// ErrClientTooOld is returned when issuing requests that are unknown or
	// use an unknown version.
	ErrClientTooOld = errors.New("client is too old; this client does not know what to do with this request")

	// ErrBrokerTooOld is returned if a connection has loaded broker
	// ApiVersions and knows that a broker cannot handle the request that
	// is attempting to be issued.
	ErrBrokerTooOld = errors.New("broker is too old; the broker has already indicated it will not know how to handle the request")

	// ErrNoResp is the error used if Kafka does not reply to a topic or
	// partition in a produce request. This error should never be seen.
	ErrNoResp = errors.New("message was not replied to in a response")

	// ErrUnknownBroker is returned when issuing a request to a broker that
	// the client does not know about.
	ErrUnknownBroker = errors.New("unknown broker")

	// ErrBrokerDead is a temporary error returned when a broker chosen for
	// a request is stopped due to a concurrent metadata response.
	ErrBrokerDead = errors.New("broker has died - the broker id either migrated or no longer exists")

	// ErrNoDial is a temporary error returned when a dial to a broker
	// errors.
	ErrNoDial = errors.New("unable to dial the broker")

	// ErrConnDead is a temporary error returned when any read or write to
	// a broker connection errors.
	ErrConnDead = errors.New("connection is dead")

	// ErrInvalidRespSize is a potentially temporary error returned when
	// the client reads an invalid message response size from Kafka.
	//
	// If this error happens, the client closes the broker connection.
	// This error is potentially retriable; maybe the broker will send
	// less data next time, but it is unlikely.
	ErrInvalidRespSize = errors.New("invalid response size less than zero")

	// ErrInvalidResp is a generic error used when Kafka responded
	// unexpectedly.
	ErrInvalidResp = errors.New("invalid response")

	// ErrCorrelationIDMismatch is a temporary error returned when Kafka
	// replies with a different correlation ID than we were expecting for
	// the request the client issued.
	//
	// If this error happens, the client closes the broker connection.
	ErrCorrelationIDMismatch = errors.New("correlation ID mismatch")

	// ErrNoPartitionsAvailable is returned immediately when producing a
	// non-consistent record to a topic that has no writable partitions.
	ErrNoPartitionsAvailable = errors.New("no partitions available")

	// ErrPartitionDeleted is returned when a partition that was being
	// written to disappears in a metadata update.
	//
	// Kafka does not allow downsizing partition counts in Kafka, so this
	// error should generally not appear. This will only appear if a topic
	// is deleted and recreated with fewer partitions.
	ErrPartitionDeleted = errors.New("partition no longer exists")

	// ErrInvalidPartition is returned if the partitioner chooses a
	// partition that does not exist (returns a partition larger than what
	// was available).
	ErrInvalidPartition = errors.New("invalid partition chosen from partitioner")

	// ErrRecordTimeout is returned when records are unable to be produced
	// and they hit the configured record timeout limit.
	ErrRecordTimeout = errors.New("records have timed out before they were able to be produced")

	// ErrMaxBuffered is returned when producing with manual flushing
	// enabled and the maximum amount of records are buffered.
	ErrMaxBuffered = errors.New("manual flushing is enabled and the maximum amount of records are buffered, cannot buffer more")

	// ErrNotGroup is returned when trying to call group functions when the
	// client is not assigned a group.
	ErrNotGroup = errors.New("invalid group function call when not assigned a group")

	// ErrNotTransactional is returned when trying to begin a transaction
	// with a client that does not have a transactional ID.
	ErrNotTransactional = errors.New("invalid attempt to begin a transaction with a non-transactional client")

	// ErrAlreadyInTransaction is returned if trying to begin a transaction
	// while the producer is already in a transaction.
	ErrAlreadyInTransaction = errors.New("invalid attempt to begin a transaction while already in a transaction")

	// ErrNotInTransaction is returned when trying to produce a record
	// outside of a transaction.
	ErrNotInTransaction = errors.New("cannot produce record transactionally if not in a transaction")

	// ErrAborting is returned for all buffered records while
	// AbortBufferedRecords is being called.
	ErrAborting = errors.New("client is aborting buffered records")

	// ErrCommitWithFatalID is returned when trying to commit in
	// EndTransaction with a producer ID that has failed.
	ErrCommitWithFatalID = errors.New("cannot commit with a fatal producer id; retry with an abort")
)

Functions

This section is empty.

Types

type Acks

type Acks struct {
	// contains filtered or unexported fields
}

Acks represents the number of acks a broker leader must have before a produce request is considered complete.

This controls the durability of written records and corresponds to "acks" in Kafka's Producer Configuration documentation.

The default is LeaderAck.

func AllISRAcks

func AllISRAcks() Acks

AllISRAcks ensures that all in-sync replicas have acknowledged they wrote a record before the leader replies success.

func LeaderAck

func LeaderAck() Acks

LeaderAck causes Kafka to reply that a record is written after only the leader has written a message. The leader does not wait for in-sync replica replies.

func NoAck

func NoAck() Acks

NoAck considers records sent as soon as they are written on the wire. The leader does not reply to records.
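As a sketch, the acks level is chosen at client construction time. The kgo.RequiredAcks and kgo.SeedBrokers option names below are assumed from the full option list and do not appear in this excerpt; the broker address is a placeholder:

cl, err := kgo.NewClient(
	kgo.SeedBrokers("localhost:9092"),  // assumed option; placeholder address
	kgo.RequiredAcks(kgo.AllISRAcks()), // assumed option; wait for all in-sync replicas
)
if err != nil {
	panic(err)
}
defer cl.Close()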

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker pairs a broker ID with a client to directly issue requests to a specific broker.

func (*Broker) Request

func (b *Broker) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error)

Request issues a request to a broker. If the broker does not exist in the client, this returns ErrUnknownBroker. Requests are not retried.

The passed context can be used to cancel a request and return early. Note that if the request is not canceled before it is written to Kafka, you may just end up canceling and not receiving the response to what Kafka inevitably does.

It is more beneficial to always use RetriableRequest.

func (*Broker) RetriableRequest added in v0.6.2

func (b *Broker) RetriableRequest(ctx context.Context, req kmsg.Request) (kmsg.Response, error)

RetriableRequest issues a request to a broker the same as Broker, but retries in the face of retriable broker connection errors. This does not retry on response internal errors.
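A rough sketch of a direct broker request using the kmsg package follows; the package name, broker ID, and function name are illustrative, and error handling is minimal:

package kafkautil

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

// RequestMetadata issues a MetadataRequest directly to broker 1 (a
// placeholder ID), retrying on retriable connection errors.
func RequestMetadata(ctx context.Context, cl *kgo.Client) (*kmsg.MetadataResponse, error) {
	b := cl.Broker(1) // fails with ErrUnknownBroker if the broker is unknown
	resp, err := b.RetriableRequest(ctx, &kmsg.MetadataRequest{})
	if err != nil {
		return nil, err
	}
	return resp.(*kmsg.MetadataResponse), nil
}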

type BrokerConnectHook

type BrokerConnectHook interface {
	// OnConnect is passed the broker metadata, how long it took to dial,
	// and either the dial's resulting net.Conn or error.
	OnConnect(meta BrokerMetadata, dialDur time.Duration, conn net.Conn, err error)
}

BrokerConnectHook is called after a connection to a broker is opened.

type BrokerDisconnectHook

type BrokerDisconnectHook interface {
	// OnDisconnect is passed the broker metadata and the connection that
	// is closing.
	OnDisconnect(meta BrokerMetadata, conn net.Conn)
}

BrokerDisconnectHook is called when a connection to a broker is closed.

type BrokerMetadata

type BrokerMetadata struct {
	// NodeID is the broker node ID.
	//
	// Seed brokers will have very negative IDs; kgo does not try to map
	// seed brokers to loaded brokers.
	NodeID int32

	// Port is the port of the broker.
	Port int32

	// Host is the hostname of the broker.
	Host string

	// Rack is an optional rack of the broker. It is invalid to modify this
	// field.
	//
	// Seed brokers will not have a rack.
	Rack *string
	// contains filtered or unexported fields
}

BrokerMetadata is metadata for a broker.

This struct mirrors kmsg.MetadataResponseBroker.

type BrokerReadHook

type BrokerReadHook interface {
	// OnRead is passed the broker metadata, the key for the response that
	// was read, the number of bytes read (may not be the whole read if
	// there was an error), how long the client waited before reading the
	// response, how long it took to read the response, and any error.
	//
	// The bytes read does not count any tls overhead.
	OnRead(meta BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error)
}

BrokerReadHook is called after a read from a broker.

Kerberos SASL does not cause read hooks, since it directly reads from the connection. This may change in the future such that the sasl authenticate key is used (even though sasl authenticate requests are not being issued).

type BrokerThrottleHook added in v0.6.2

type BrokerThrottleHook interface {
	// OnThrottle is passed the broker metadata, the imposed throttling
	// interval, and whether the throttle was applied before Kafka
	// responded to the request or after.
	//
	// For Kafka < 2.0.0, the throttle is applied before issuing a response.
	// For Kafka >= 2.0.0, the throttle is applied after issuing a response.
	//
	// If throttledAfterResponse is false, then Kafka already applied the
	// throttle. If it is true, the client internally will not send another
	// request until the throttle deadline has passed.
	OnThrottle(meta BrokerMetadata, throttleInterval time.Duration, throttledAfterResponse bool)
}

BrokerThrottleHook is called after a response to a request is read from a broker, and the response identifies throttling in effect.

type BrokerWriteHook

type BrokerWriteHook interface {
	// OnWrite is passed the broker metadata, the key for the request that
	// was written, the number of bytes that were written (may not be the
	// whole request if there was an error), how long the request waited
	// before being written (including throttling waiting), how long it
	// took to write the request, and any error.
	//
	// The bytes written does not count any tls overhead.
	OnWrite(meta BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error)
}

BrokerWriteHook is called after a write to a broker.

Kerberos SASL does not cause write hooks, since it directly writes to the connection. This may change in the future such that the sasl authenticate key is used (even though sasl authenticate requests are not being issued).

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client issues requests and handles responses to a Kafka cluster.

func NewClient

func NewClient(opts ...Opt) (*Client, error)

NewClient returns a new Kafka client with the given options or an error if the options are invalid. Connections to brokers are lazily created only when requests are written to them.

By default, the client uses the latest stable request versions when talking to Kafka. If you use a broker older than 0.10.0, then you need to manually set a MaxVersions option. Otherwise, there is usually no harm in defaulting to the latest API versions, although occasionally Kafka introduces new required parameters that do not have zero value defaults.

NewClient also launches a goroutine which periodically updates the cached topic metadata.
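A minimal sketch of client construction; the kgo.SeedBrokers option is assumed from the full option list (it does not appear in this excerpt) and the address is a placeholder:

package main

import "github.com/twmb/franz-go/pkg/kgo"

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // assumed option; placeholder address
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Connections to brokers are created lazily, only once requests are
	// issued; metadata is refreshed periodically in the background.
}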

func (*Client) AbortBufferedRecords

func (cl *Client) AbortBufferedRecords(ctx context.Context) error

AbortBufferedRecords fails all unflushed records with ErrAborting and waits for there to be no buffered records.

This accepts a context to quit the wait early, but it is strongly recommended to always wait for all records to be flushed; the wait itself should not take long. The only case where this function returns an error is if the context is canceled while flushing.

The intent of this function is to provide a way to clear the client's production backlog.

For example, before aborting a transaction and beginning a new one, it would be erroneous to not wait for the backlog to clear before beginning the new transaction; anything not cleared may become part of the new transaction.

Records produced during or after a call to this function may not be failed, thus it is incorrect to concurrently produce with this function.

func (*Client) AssignGroup

func (cl *Client) AssignGroup(group string, opts ...GroupOpt)

AssignGroup assigns a group to consume from, overriding any prior assignment.

To leave a group, you can AssignGroup with an empty group, or just close the client. If you are using instance IDs, the client does not explicitly leave the group and instead you must issue a `kmsg.LeaveGroupRequest` manually (as expected when using instance IDs).

It is recommended to do one final blocking commit before leaving a group.

func (*Client) AssignGroupTransactSession

func (cl *Client) AssignGroupTransactSession(group string, opts ...GroupOpt) *GroupTransactSession

AssignGroupTransactSession is exactly the same as AssignGroup, but wraps the group consumer's OnRevoke with a function that will ensure a transaction session is correctly aborted.

When ETLing in a group in a transaction, if a rebalance happens before the transaction is ended, you either (a) must block the rebalance from finishing until you are done producing, and then commit before unblocking, or (b) allow the rebalance to happen, but abort any work you did.

The problem with (a) is that if your ETL work loop is slow, you run the risk of exceeding the rebalance timeout and being kicked from the group. You will try to commit, and depending on the Kafka version, the commit may even be erroneously successful (pre Kafka 2.5.0). This will lead to duplicates.

Instead, for safety, a GroupTransactSession favors (b). If a rebalance occurs at any time before ending a transaction with a commit, this will abort the transaction.

This leaves the risk that ending the transaction itself exceeds the rebalance timeout, but this is just one request with no cpu logic. With a proper rebalance timeout, this single request will not fail and the commit will succeed properly.

func (*Client) AssignPartitions

func (cl *Client) AssignPartitions(opts ...DirectConsumeOpt)

AssignPartitions assigns an exact set of partitions for the client to consume from. Any prior direct assignment or group assignment is invalidated.

This takes ownership of any assignments.

func (*Client) BeginTransaction

func (cl *Client) BeginTransaction() error

BeginTransaction sets the client to a transactional state, erroring if there is no transactional ID or if the client is already in a transaction.

func (*Client) BlockingCommitOffsets

func (cl *Client) BlockingCommitOffsets(
	ctx context.Context,
	uncommitted map[string]map[int32]EpochOffset,
	onDone func(*kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error),
)

BlockingCommitOffsets cancels any active CommitOffsets, begins a commit that cannot be canceled, and waits for that commit to complete. This function will not return until the commit is done and the onDone callback is complete.

The purpose of this function is for use in OnRevoke or committing before leaving a group.

For OnRevoke, you do not want to have a commit in OnRevoke canceled, because once the commit is done, rebalancing will continue. If you cancel an OnRevoke commit and commit after the revoke, you will be committing for a stale session, the commit will be dropped, and you will likely doubly process records.

For more information about committing, see the documentation for CommitOffsets.

func (*Client) Broker

func (cl *Client) Broker(id int) *Broker

Broker returns a handle to a specific broker to directly issue requests to. Note that there is no guarantee that this broker exists; if it does not, requests will fail with ErrUnknownBroker.

func (*Client) Close

func (cl *Client) Close()

Close leaves any group and closes all connections and goroutines.

func (*Client) CommitOffsets

func (cl *Client) CommitOffsets(
	ctx context.Context,
	uncommitted map[string]map[int32]EpochOffset,
	onDone func(*kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error),
)

CommitOffsets commits the given offsets for a group, calling onDone with the commit request and either the response or an error if the response was not issued. If uncommitted is empty or the client is not consuming as a group, onDone is called with (nil, nil, nil) and this function returns immediately. It is OK if onDone is nil, but you will not know if your commit succeeded.

If autocommitting is enabled, this function blocks autocommitting until this function is complete and the onDone has returned.

This function itself does not wait for the commit to finish. By default, this function is an asynchronous commit. You can use onDone to make it sync.

Note that this function ensures absolute ordering of commit requests by canceling prior requests and ensuring they are done before executing a new one. This means, for absolute control, you can use this function to periodically commit async and then issue a final sync commit before quitting (this is the behavior of autocommitting and using the default revoke). This differs from the Java async commit, which does not retry requests to avoid trampling on future commits.

If using autocommitting, autocommitting will resume once this is complete.

It is invalid to use this function to commit offsets for a transaction.

It is highly recommended to check the response's partition's error codes if the response is non-nil. While unlikely, individual partitions can error. This is most likely to happen if a commit occurs too late in a rebalance event.

If manually committing, you want to set OnRevoked to commit synchronously using BlockingCommitOffsets. Otherwise, if committing asynchronously, OnRevoked may return and a new group session may start before the commit is issued, leading to the commit being ignored and to duplicate messages.
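A minimal sketch of manual committing, assuming an already-constructed *kgo.Client named cl and the context import; group and topic names are placeholders:

cl.AssignGroup("my-group",
	kgo.GroupTopics("my-topic"),
	kgo.DisableAutoCommit(),
	// Commit synchronously on revoke so the commit completes before the
	// rebalance continues and a new group session begins.
	kgo.OnRevoked(func(ctx context.Context, _ map[string][]int32) {
		cl.BlockingCommitOffsets(ctx, cl.UncommittedOffsets(), nil)
	}),
)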

func (*Client) CommittedOffsets

func (cl *Client) CommittedOffsets() map[string]map[int32]EpochOffset

CommittedOffsets returns the latest committed offsets. Committed offsets are updated from commits or from joining a group and fetching offsets.

If there are no committed offsets, this returns nil.

func (*Client) DiscoveredBrokers

func (cl *Client) DiscoveredBrokers() []*Broker

DiscoveredBrokers returns all brokers that were discovered from prior metadata responses. This does not actually issue a metadata request to load brokers; if you wish to ensure this returns all brokers, be sure to manually issue a metadata request before this. This also does not include seed brokers, which are internally saved under special internal broker IDs (but, it does include those brokers under their normal IDs as returned from a metadata response).

func (*Client) EndTransaction

func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry) error

EndTransaction ends a transaction and resets the client's internal state to not be in a transaction.

Flush and CommitOffsetsForTransaction must be called before this function; this function does not flush and does not itself ensure that all buffered records are flushed. If no record yet has caused a partition to be added to the transaction, this function does nothing and returns nil. Alternatively, AbortBufferedRecords should be called before aborting a transaction to ensure that any buffered records not yet flushed will not be a part of a new transaction.

If records failed with UnknownProducerID and your Kafka version is at least 2.5.0, then aborting here will potentially allow the client to recover for more production.
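A rough sketch of the produce-side flow, assuming a client cl configured with a transactional ID and a context ctx; kgo.TryCommit and the Record fields come from parts of the package not shown in this excerpt:

if err := cl.BeginTransaction(); err != nil {
	panic(err)
}

// Produce within the transaction; the topic name is a placeholder.
if err := cl.Produce(ctx, &kgo.Record{Topic: "my-topic", Value: []byte("hello")}, nil); err != nil {
	panic(err)
}

// Flush first: EndTransaction does not flush buffered records itself.
if err := cl.Flush(ctx); err != nil {
	panic(err)
}

// TryCommit is assumed from the TransactionEndTry docs; per
// ErrCommitWithFatalID, a failed commit should be retried with an abort.
if err := cl.EndTransaction(ctx, kgo.TryCommit); err != nil {
	panic(err)
}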

func (*Client) Flush

func (cl *Client) Flush(ctx context.Context) error

Flush hangs waiting for all buffered records to be flushed, stopping all lingers if necessary.

If the context finishes (Done), this returns the context's error.

func (*Client) LeaveGroup added in v0.6.4

func (cl *Client) LeaveGroup()

LeaveGroup leaves a group if in one. Calling the client's Close function also leaves a group, so this is only necessary to call if you plan to leave the group and continue using the client.

If you have configured the group with an InstanceID, this does not leave the group. With instance IDs, it is expected that clients will restart and re-use the same instance ID. To leave a group using an instance ID, you must manually issue a kmsg.LeaveGroupRequest or use an external tool (kafka scripts or kcl).

func (*Client) PollFetches

func (cl *Client) PollFetches(ctx context.Context) Fetches

PollFetches waits for fetches to be available, returning as soon as any broker returns a fetch. If the ctx quits, this function quits.

It is important to check all partition errors in the returned fetches. If any partition has a fatal error and actually had no records, a fake fetch will be injected with the error.

It is invalid to call this multiple times concurrently.
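A minimal polling loop might look like the sketch below, assuming an already-constructed and assigned *kgo.Client named cl, a context ctx, and the fmt import; the Record fields come from the full Record docs, which are not shown in this excerpt:

for {
	fetches := cl.PollFetches(ctx)

	// Check partition errors before iterating; fatal errors mean the
	// partition will not be fetched again.
	for _, fe := range fetches.Errors() {
		fmt.Printf("fetch error on %s[%d]: %v\n", fe.Topic, fe.Partition, fe.Err)
	}

	iter := fetches.RecordIter()
	for !iter.Done() {
		r := iter.Next()
		fmt.Printf("%s: %s\n", r.Topic, r.Value)
	}
}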

func (*Client) Produce

func (cl *Client) Produce(
	ctx context.Context,
	r *Record,
	promise func(*Record, error),
) error

Produce sends a Kafka record to the topic in the record's Topic field, calling promise with the record or an error when Kafka replies.

The promise is optional, but not using it means you will not know if Kafka recorded a record properly.

If the record is too large to fit in a batch on its own in a produce request, the promise is called immediately before this function returns with kerr.MessageTooLarge.

The context is used if the client currently has the max amount of buffered records. If so, the client waits for some records to complete or for the context or client to quit. If the context / client quits, this returns an error.

The first buffered record for an unknown topic begins a timeout for the configured record timeout limit; all records buffered within the wait will expire with the same timeout if the topic does not load in time. For simplicity, any time spent waiting for the topic to load is not persisted through once the topic loads, meaning the record may further wait once buffered. This may be changed in the future if necessary, however, the only reason for a topic to not load promptly is if it does not exist.

If manually flushing and there are already MaxBufferedRecords buffered, this will return ErrMaxBuffered.

If the client is transactional and a transaction has not been begun, this returns ErrNotInTransaction.

Thus, there are only three possible errors: ErrNotInTransaction, and then either a context error or ErrMaxBuffered.
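As a sketch, an asynchronous produce with a promise, assuming an existing client cl, a context ctx, and the fmt import; the Record fields come from the full Record docs, which are not shown in this excerpt:

err := cl.Produce(ctx, &kgo.Record{Topic: "my-topic", Value: []byte("hello")},
	func(_ *kgo.Record, err error) {
		if err != nil {
			fmt.Printf("record failed to produce: %v\n", err)
		}
	},
)
if err != nil {
	// Only ErrNotInTransaction, a context error, or ErrMaxBuffered can
	// be returned here.
	panic(err)
}

// Flush waits for all buffered records to be flushed before returning.
if err := cl.Flush(ctx); err != nil {
	panic(err)
}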

func (*Client) Request

func (cl *Client) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error)

Request issues a request to Kafka, waiting for and returning the response. If a retriable network error occurs, or if a retriable group / transaction coordinator error occurs, the request is retried. All other errors are returned.

If the request is an admin request, this will issue it to the Kafka controller. If the controller ID is unknown, this will attempt to fetch it. If the fetch errors, this will return an unknown controller error.

If the request is a group or transaction coordinator request, this will issue the request to the appropriate group or transaction coordinator.

For transaction requests, the request is issued to the transaction coordinator. However, if the request is an init producer ID request and the request has no transactional ID, the request goes to any broker.

Some requests need to be split and sent to many brokers. For these requests, it is *highly* recommended to use RequestSharded. Not all responses from many brokers can be cleanly merged. However, for the requests that are split, this does attempt to merge them in a sane way.

The following requests are split:

ListOffsets
DescribeGroups
ListGroups
DeleteRecords
OffsetForLeaderEpoch
DescribeConfigs
AlterConfigs
AlterReplicaLogDirs
DescribeLogDirs
DeleteGroups
IncrementalAlterConfigs

In short, this method tries to do the correct thing depending on what type of request is being issued.

The passed context can be used to cancel a request and return early. Note that if the request was written to Kafka but the context canceled before a response is received, Kafka may still operate on the received request.

func (*Client) RequestSharded added in v0.6.2

func (cl *Client) RequestSharded(ctx context.Context, req kmsg.Request) []ResponseShard

RequestSharded performs the same logic as Request, but returns all responses from any broker that the request was split to. This always returns at least one shard. If the request does not need to be issued (describing no groups), this issues the request to a random broker just to ensure that one shard exists.

There are only a few requests that are strongly recommended to explicitly use RequestSharded; the rest can by default use Request. These few requests are mentioned in the documentation for Request.

If, in the process of splitting a request, some topics or partitions are found to not exist, or Kafka replies that a request should go to a broker that does not exist, all those non-existent pieces are grouped into one request to the first seed broker. This will show up as a seed broker node ID (min int32) and the response will likely contain purely errors.

The response shards are ordered by broker metadata.

func (*Client) SeedBrokers

func (cl *Client) SeedBrokers() []*Broker

SeedBrokers returns all seed brokers.

func (*Client) SetOffsets

func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset)

SetOffsets sets any matching offsets in setOffsets to the given epoch/offset. Partitions that are not specified are not set.

If using transactions, it is advised to just use a GroupTransactSession and avoid this function entirely.

func (*Client) UncommittedOffsets

func (cl *Client) UncommittedOffsets() map[string]map[int32]EpochOffset

UncommittedOffsets returns the latest uncommitted offsets. Uncommitted offsets are always updated on calls to PollFetches.

If there are no uncommitted offsets, this returns nil.

Note that, if manually committing, you should be careful with committing during group rebalances. You must ensure you commit before the group's session timeout is reached, otherwise this client will be kicked from the group and the commit will fail.

If using a cooperative balancer, commits while consuming during rebalancing may fail with REBALANCE_IN_PROGRESS.

type CompressionCodec

type CompressionCodec struct {
	// contains filtered or unexported fields
}

CompressionCodec configures how records are compressed before being sent.

Records are compressed within individual topics and partitions, inside of a RecordBatch. All records in a RecordBatch are compressed into one record for that batch.

func GzipCompression

func GzipCompression() CompressionCodec

GzipCompression enables gzip compression with the default compression level.

func Lz4Compression

func Lz4Compression() CompressionCodec

Lz4Compression enables lz4 compression with the fastest compression level.

func NoCompression

func NoCompression() CompressionCodec

NoCompression is the default compression used for messages and can be used as a fallback compression option.

func SnappyCompression

func SnappyCompression() CompressionCodec

SnappyCompression enables snappy compression.

func ZstdCompression

func ZstdCompression() CompressionCodec

ZstdCompression enables zstd compression with the default compression level.

func (CompressionCodec) WithLevel

func (c CompressionCodec) WithLevel(level int) CompressionCodec

WithLevel changes the compression codec's "level", effectively allowing for higher or lower compression ratios at the expense of CPU speed.

For the zstd package, the level is a typed int; simply convert the type back to an int for this function.

If the level is invalid, compressors just use a default level.
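For example, codecs with explicit levels can be built as below; the kgo.BatchCompression and kgo.SeedBrokers option names are assumed from the full option list and do not appear in this excerpt:

cl, err := kgo.NewClient(
	kgo.SeedBrokers("localhost:9092"), // assumed option; placeholder address
	// Prefer zstd, fall back to gzip level 6, then to no compression.
	kgo.BatchCompression( // assumed option
		kgo.ZstdCompression(),
		kgo.GzipCompression().WithLevel(6),
		kgo.NoCompression(),
	),
)
if err != nil {
	panic(err)
}
defer cl.Close()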

type ConsumerOpt

type ConsumerOpt interface {
	Opt
	// contains filtered or unexported methods
}

ConsumerOpt is a consumer-specific option to configure a client. This is simply a namespaced Opt.

func AllowedConcurrentFetches added in v0.6.4

func AllowedConcurrentFetches(n int) ConsumerOpt

AllowedConcurrentFetches sets the maximum number of fetch requests to allow in flight or buffered at once, overriding the unbounded (i.e. number of brokers) default.

This setting, paired with FetchMaxBytes, can upper bound the maximum amount of memory that the client can use for consuming.

Requests are issued to brokers in a FIFO order: once the client is ready to issue a request to a broker, it registers that request and issues it in order with other registrations.

If Kafka replies with any data, the client does not track the fetch as completed until the user has polled the buffered fetch. Thus, a concurrent fetch is not considered complete until all data from it is done being processed and out of the client itself.

Note that brokers are allowed to hang for up to FetchMaxWait before replying to a request, so if this option is too constrained and you are consuming a low throughput topic, the client may take a long time before requesting a broker that has new data. For high throughput topics, or if the allowed concurrent fetches is large enough, this should not be a concern.

A value of 0 implies the allowed concurrency is unbounded and will be limited only by the number of brokers in the cluster.

func ConsumeResetOffset

func ConsumeResetOffset(offset Offset) ConsumerOpt

ConsumeResetOffset sets the offset to restart consuming from when a partition has no commits (for groups) or when a fetch sees an OffsetOutOfRange error, overriding the default ConsumeStartOffset.

func FetchIsolationLevel

func FetchIsolationLevel(level IsolationLevel) ConsumerOpt

FetchIsolationLevel sets the "isolation level" used for fetching records, overriding the default ReadUncommitted.

func FetchMaxBytes

func FetchMaxBytes(b int32) ConsumerOpt

FetchMaxBytes sets the maximum amount of bytes a broker will try to send during a fetch, overriding the default 50MiB. Note that brokers may not obey this limit if they have messages larger than this limit. Also note that this client sends a fetch to each broker concurrently, meaning the client will buffer up to <brokers * max bytes> worth of memory.

This corresponds to the Java fetch.max.bytes setting.

If bumping this, consider bumping BrokerMaxReadBytes.

func FetchMaxPartitionBytes

func FetchMaxPartitionBytes(b int32) ConsumerOpt

FetchMaxPartitionBytes sets the maximum amount of bytes that will be consumed for a single partition in a fetch request, overriding the default 10MiB. Note that if a single batch is larger than this number, that batch will still be returned so the client can make progress.

This corresponds to the Java max.partition.fetch.bytes setting.

func FetchMaxWait

func FetchMaxWait(wait time.Duration) ConsumerOpt

FetchMaxWait sets the maximum amount of time a broker will wait for a fetch response to hit the minimum number of required bytes before returning, overriding the default 5s.

This corresponds to the Java fetch.max.wait.ms setting.

func FetchMinBytes added in v0.6.2

func FetchMinBytes(b int32) ConsumerOpt

FetchMinBytes sets the minimum amount of bytes a broker will try to send during a fetch, overriding the default 1 byte.

With the default of 1, data is sent as soon as it is available. By bumping this, the broker will try to wait for more data, which may improve server throughput at the expense of added latency.

This corresponds to the Java fetch.min.bytes setting.

func KeepControlRecords

func KeepControlRecords() ConsumerOpt

KeepControlRecords sets the client to keep control messages and return them with fetches, overriding the default that discards them.

Generally, control messages are not useful.

func Rack

func Rack(rack string) ConsumerOpt

Rack specifies where the client is physically located and changes fetch requests to consume from the closest replica as opposed to the leader replica.

Consuming from a preferred replica can increase latency but can decrease cross datacenter costs. See KIP-392 for more information.

type DirectConsumeOpt

type DirectConsumeOpt interface {
	// contains filtered or unexported methods
}

DirectConsumeOpt is an option to configure direct topic / partition consuming.

func ConsumePartitions

func ConsumePartitions(partitions map[string]map[int32]Offset) DirectConsumeOpt

ConsumePartitions sets partitions to consume from directly and the offsets to start consuming those partitions from.

Offsets from this option have higher precedence than ConsumeTopics. If a topic's partition is set in this option and that topic is also set in ConsumeTopics, offsets on partitions in this option are used in favor of the more general topic offset from ConsumeTopics.

func ConsumeTopics

func ConsumeTopics(offset Offset, topics ...string) DirectConsumeOpt

ConsumeTopics sets topics to consume directly and the offsets to start consuming partitions from in those topics.

If a metadata update sees partitions added to a topic, the client will automatically begin consuming from those new partitions.

func ConsumeTopicsRegex

func ConsumeTopicsRegex() DirectConsumeOpt

ConsumeTopicsRegex sets all topics in ConsumeTopics to be parsed as regular expressions.
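As a sketch, assuming an already-constructed *kgo.Client named cl, a direct assignment can mix topic-level and partition-level offsets (topic names are placeholders):

cl.AssignPartitions(
	// Consume every partition of "events" from the start.
	kgo.ConsumeTopics(kgo.NewOffset().AtStart(), "events"),
	// Start partition 0 of "audit" at exactly offset 100 (partition
	// offsets take precedence if the topic is also listed in
	// ConsumeTopics).
	kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{
		"audit": {0: kgo.NewOffset().At(100)},
	}),
)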

type EpochOffset

type EpochOffset struct {
	Epoch  int32
	Offset int64
}

EpochOffset combines a record offset with the leader epoch the broker was at when the record was written.

type ErrDataLoss

type ErrDataLoss struct {
	// Topic is the topic data loss was detected on.
	Topic string
	// Partition is the partition data loss was detected on.
	Partition int32
	// ConsumedTo is what the client had consumed to for this partition before
	// data loss was detected.
	ConsumedTo int64
	// ResetTo is what the client reset the partition to; everything from
	// ResetTo to ConsumedTo was lost.
	ResetTo int64
}

ErrDataLoss is returned for Kafka >=2.1.0 when data loss is detected and the client is able to reset to the last valid offset.

func (*ErrDataLoss) Error

func (e *ErrDataLoss) Error() string

type Fetch

type Fetch struct {
	// Topics are all topics being responded to from a fetch to a broker.
	Topics []FetchTopic
}

Fetch is an individual response from a broker.

type FetchError

type FetchError struct {
	Topic     string
	Partition int32
	Err       error
}

FetchError is an error in a fetch along with the topic and partition that the error was on.

type FetchPartition

type FetchPartition struct {
	// Partition is the partition this is for.
	Partition int32
	// Err is an error for this partition in the fetch.
	//
	// Note that if this is a fatal error, such as data loss or non
	// retriable errors, this partition will never be fetched again.
	Err error
	// HighWatermark is the current high watermark for this partition, that
	// is, the current offset that is on all in sync replicas.
	HighWatermark int64
	// LastStableOffset is the offset at which all prior offsets have been
	// "decided". Non transactional records are always decided immediately,
	// but transactional records are only decided once they are committed or
	// aborted.
	//
	// The LastStableOffset will always be at or under the HighWatermark.
	LastStableOffset int64
	// LogStartOffset is the low watermark of this partition, otherwise
	// known as the earliest offset in the partition.
	LogStartOffset int64
	// Records contains fetched records for this partition.
	Records []*Record
}

FetchPartition is a response for a partition in a fetched topic from a broker.

type FetchTopic

type FetchTopic struct {
	// Topic is the topic this is for.
	Topic string
	// Partitions contains individual partitions in the topic that were
	// fetched.
	Partitions []FetchPartition
}

FetchTopic is a response for a fetched topic from a broker.

type Fetches

type Fetches []Fetch

Fetches is a group of fetches from brokers.

func (Fetches) Errors

func (fs Fetches) Errors() []FetchError

Errors returns all errors in a fetch with the topic and partition that errored.

func (Fetches) RecordIter

func (fs Fetches) RecordIter() *FetchesRecordIter

RecordIter returns an iterator over all records in a fetch.

Note that errors should be inspected as well.

type FetchesRecordIter

type FetchesRecordIter struct {
	// contains filtered or unexported fields
}

FetchesRecordIter iterates over records in a fetch.

func (*FetchesRecordIter) Done

func (i *FetchesRecordIter) Done() bool

Done returns true when there are no more records to iterate over.

func (*FetchesRecordIter) Next

func (i *FetchesRecordIter) Next() *Record

Next returns the next record from a fetch.

type GroupBalancer

type GroupBalancer interface {
	// contains filtered or unexported methods
}

GroupBalancer balances topics and partitions among group members.

func CooperativeStickyBalancer

func CooperativeStickyBalancer() GroupBalancer

CooperativeStickyBalancer performs the sticky balancing strategy, but additionally opts the consumer group into "cooperative" rebalancing.

Cooperative rebalancing differs from "eager" (the original) rebalancing in that group members do not stop processing partitions during the rebalance. Instead, once they receive their new assignment, each member determines which partitions it needs to revoke. If any, they send a new join request (before syncing), and the process starts over. This should ultimately end up in only two join rounds, with the major benefit being that processing never needs to stop.

NOTE once a group is collectively using cooperative balancing, it is unsafe to have a member join the group that does not support cooperative balancing. If the only-eager member is elected leader, it will not know of the new multiple join strategy and things will go awry. Thus, once a group is entirely on cooperative rebalancing, it cannot go back.

Migrating an eager group to cooperative balancing requires two rolling bounce deploys. The first deploy should add the cooperative-sticky strategy as an option (that is, each member goes from using one balance strategy to two). During this deploy, Kafka will tell leaders to continue using the old eager strategy, since the old eager strategy is the only one in common among all members. The second rolling deploy removes the old eager strategy. At this point, Kafka will tell the leader to use cooperative-sticky balancing. During this roll, all members in the group that still have both strategies continue to be eager and give up all of their partitions every rebalance. However, once a member only has cooperative-sticky, it can begin using this new strategy and things will work correctly. See KIP-429 for more details.
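As a sketch of the two rolling deploys (assuming an already-constructed *kgo.Client named cl, with placeholder names; the eager StickyBalancer below stands in for whatever eager strategy the group already uses):

// First rolling deploy: advertise cooperative-sticky alongside the old
// eager strategy. Kafka keeps choosing the eager strategy until every
// member also advertises cooperative-sticky.
cl.AssignGroup("my-group",
	kgo.GroupTopics("my-topic"),
	kgo.Balancers(kgo.CooperativeStickyBalancer(), kgo.StickyBalancer()),
)

// Second rolling deploy: drop the eager strategy entirely.
cl.AssignGroup("my-group",
	kgo.GroupTopics("my-topic"),
	kgo.Balancers(kgo.CooperativeStickyBalancer()),
)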

func RangeBalancer

func RangeBalancer() GroupBalancer

RangeBalancer returns a group balancer that, per topic, maps partitions to group members. Since this works on a topic level, an uneven number of partitions per topic relative to the number of members can lead to slight partition consumption disparities.

Suppose there are two members M0 and M1, two topics t0 and t1, and each topic has three partitions p0, p1, and p2. The partition balancing will be

M0: [t0p0, t0p1, t1p0, t1p1]
M1: [t0p2, t1p2]

This is equivalent to the Java range balancer.

func RoundRobinBalancer

func RoundRobinBalancer() GroupBalancer

RoundRobinBalancer returns a group balancer that evenly maps topics and partitions to group members.

Suppose there are two members M0 and M1, two topics t0 and t1, and each topic has three partitions p0, p1, and p2. The partition balancing will be

M0: [t0p0, t0p2, t1p1]
M1: [t0p1, t1p0, t1p2]

If all members subscribe to all topics equally, the roundrobin balancer will give a perfect balance. However, if topic subscriptions are quite unequal, the roundrobin balancer may lead to a bad balance. See KIP-49 for one example (note that the fair strategy mentioned in KIP-49 does not exist).

This is equivalent to the Java roundrobin balancer.

func StickyBalancer

func StickyBalancer() GroupBalancer

StickyBalancer returns a group balancer that ensures minimal partition movement on group changes while also ensuring optimal balancing.

Suppose there are three members M0, M1, and M2, and three topics t0, t1, and t2, each with three partitions p0, p1, and p2. If the initial balance plan looks like

M0: [t0p0, t0p1, t0p2]
M1: [t1p0, t1p1, t1p2]
M2: [t2p0, t2p1, t2p2]

If M2 disappears, both roundrobin and range would have mostly destructive reassignments.

Range would result in

M0: [t0p0, t0p1, t1p0, t1p1, t2p0, t2p1]
M1: [t0p2, t1p2, t2p2]

which is imbalanced and has 3 partitions move from members that did not need to move (t0p2, t1p0, t1p1).

RoundRobin would result in

M0: [t0p0, t0p2, t1p1, t2p0, t2p2]
M1: [t0p1, t1p0, t1p2, t2p1]

which is balanced, but has 2 partitions move when they do not need to (t0p1, t1p1).

Sticky balancing results in

M0: [t0p0, t0p1, t0p2, t2p0, t2p2]
M1: [t1p0, t1p1, t1p2, t2p1]

which is balanced and does not cause any unnecessary partition movement. The actual t2 partitions may not be in that exact combination, but they will be balanced.

An advantage of the sticky consumer is that it allows API users to potentially avoid some cleanup until after the consumer knows which partitions it is losing when it gets its new assignment. Users can then only cleanup state for partitions that changed, which will be minimal (see KIP-54; this client also includes the KIP-351 bugfix).

Note that this API implements the sticky partitioning quite differently from the Java implementation. The Java implementation is difficult to reason about and has many edge cases that result in non-optimal balancing (albeit, you likely have to be trying to hit those edge cases). This API uses a different algorithm (A*) to ensure optimal balancing while being an order of magnitude faster.

Since the new strategy is a strict improvement over the Java strategy, it is entirely compatible. Any Go client sharing a group with a Java client will not have its decisions undone on leadership change from a Go consumer to a Java one. A Java balancer does not apply the strategy it comes up with if it deems the balance score equal to or worse than the original score (the score being effectively equal to the standard deviation of the mean number of assigned partitions). This Go sticky balancer is optimal and extra sticky. Thus, the Java balancer will never back out of a strategy from this balancer.

type GroupOpt

type GroupOpt interface {
	// contains filtered or unexported methods
}

GroupOpt is an option to configure group consuming.

func AutoCommitInterval

func AutoCommitInterval(interval time.Duration) GroupOpt

AutoCommitInterval sets how long to go between autocommits, overriding the default 5s.

func Balancers

func Balancers(balancers ...GroupBalancer) GroupOpt

Balancers sets the group balancers to use for dividing topic partitions among group members, overriding the defaults.

The current default is [cooperative-sticky].

For balancing, Kafka chooses the first protocol that all group members agree to support.

Note that if you want to opt in to cooperative-sticky rebalancing, be aware that cooperative group balancing is incompatible with eager (classical) rebalancing and requires a careful rollout strategy (see KIP-429).

func DisableAutoCommit

func DisableAutoCommit() GroupOpt

DisableAutoCommit disables auto committing.

func GroupTopics

func GroupTopics(topics ...string) GroupOpt

GroupTopics adds topics to use for group consuming.

func GroupTopicsRegex

func GroupTopicsRegex() GroupOpt

GroupTopicsRegex sets all topics in GroupTopics to be parsed as regular expressions.

func HeartbeatInterval

func HeartbeatInterval(interval time.Duration) GroupOpt

HeartbeatInterval sets how long a group member goes between heartbeats to Kafka, overriding the default 3,000ms.

Kafka uses heartbeats to ensure that a group member's session stays active. This value can be any value lower than the session timeout, but should be no higher than 1/3rd the session timeout.

This corresponds to Kafka's heartbeat.interval.ms.

func InstanceID

func InstanceID(id string) GroupOpt

InstanceID sets the group consumer's instance ID, switching the group member from "dynamic" to "static".

Prior to Kafka 2.3.0, joining a group gave a group member a new member ID. The group leader could not tell if this was a rejoining member. Thus, any join caused the group to rebalance.

Kafka 2.3.0 introduced the concept of an instance ID, which can persist across restarts. This allows for avoiding many costly rebalances and allows for stickier rebalancing for rejoining members (since the ID for balancing stays the same). The main downsides are that you, the user of a client, have to manage instance IDs properly, and that it may take longer to rebalance in the event that a client legitimately dies.

When using an instance ID, the client does NOT send a leave group request when closing. This allows for the client to restart with the same instance ID and rejoin the group to avoid a rebalance. It is strongly recommended to increase the session timeout enough to allow time for the restart (remember that the default session timeout is 10s).

To actually leave the group, you must use an external admin command that issues a leave group request on behalf of this instance ID (see kcl), or you can manually use the kmsg package with a proper LeaveGroupRequest.

NOTE: Leaving a group with an instance ID is only supported in Kafka 2.4.0+.

func OnAssigned

func OnAssigned(onAssigned func(context.Context, map[string][]int32)) GroupOpt

OnAssigned sets the function to be called when a group is joined, after partitions are assigned but before fetches for those partitions begin.

This function combined with OnRevoked should not exceed the rebalance interval. It is possible for the group, immediately after finishing a balance, to re-enter a new balancing session.

The OnAssigned function is passed the group's context, which is only canceled if the group is left or the client is closed.

func OnLost

func OnLost(onLost func(context.Context, map[string][]int32)) GroupOpt

OnLost sets the function to be called on "fatal" group errors, such as IllegalGeneration, UnknownMemberID, and authentication failures. This function differs from OnRevoked in that it is unlikely that commits will succeed when partitions are outright lost, whereas commits likely will succeed when revoking partitions.

If not set, OnRevoked is used.

func OnRevoked

func OnRevoked(onRevoked func(context.Context, map[string][]int32)) GroupOpt

OnRevoked sets the function to be called once this group member has partitions revoked.

This function combined with OnAssigned should not exceed the rebalance interval. It is possible for the group, immediately after finishing a balance, to re-enter a new balancing session.

If autocommit is enabled, the default OnRevoked is a blocking commit all offsets. The reason for a blocking commit is so that no later commit cancels the blocking commit. If the commit in OnRevoked were canceled, then the rebalance would proceed immediately, the commit that canceled the blocking commit would fail, and duplicates could be consumed after the rebalance completes.

The OnRevoked function is passed the group's context, which is only canceled if the group is left or the client is closed.

The OnRevoked function is called at the end of a group session even if there are no partitions being revoked.

If you are committing offsets manually (have disabled autocommitting), it is highly recommended to do a proper blocking commit in OnRevoked.

func RebalanceTimeout

func RebalanceTimeout(timeout time.Duration) GroupOpt

RebalanceTimeout sets how long group members are allowed to take when a rebalance has begun, overriding the default 60,000ms. This timeout is how long all members are allowed to complete work and commit offsets, minus the time it took to detect the rebalance (from a heartbeat).

Kafka uses the largest rebalance timeout of all members in the group. If a member does not rejoin within this timeout, Kafka will kick that member from the group.

This corresponds to Kafka's rebalance.timeout.ms.

func RequireStableFetchOffsets

func RequireStableFetchOffsets() GroupOpt

RequireStableFetchOffsets sets the group consumer to require "stable" fetch offsets before consuming from the group. Proposed in KIP-447 and introduced in Kafka 2.5.0, stable offsets are important when consuming from partitions that a transactional producer could be committing to.

With this option, Kafka will block group consumers from fetching offsets for partitions that are in an active transaction.

Because this can block consumption, it is strongly recommended to set transactional timeouts to a small value (10s) rather than the default 60s. Lowering the transactional timeout will reduce the chance that consumers are entirely blocked.

func SessionTimeout

func SessionTimeout(timeout time.Duration) GroupOpt

SessionTimeout sets how long a member of the group can go between heartbeats, overriding the default 10,000ms. If a member does not heartbeat in this timeout, the broker will remove the member from the group and initiate a rebalance.

This corresponds to Kafka's session.timeout.ms setting and must be within the broker's group.min.session.timeout.ms and group.max.session.timeout.ms.
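As a sketch combining several of these options, assuming an already-constructed *kgo.Client named cl and the time import; group, topic, and instance ID values are placeholders:

cl.AssignGroup("my-group",
	kgo.GroupTopics("my-topic"),
	// Static membership: reuse this instance ID across restarts.
	kgo.InstanceID("worker-1"),
	// Leave room for a restart before the broker evicts the member.
	kgo.SessionTimeout(30*time.Second),
	// Keep the heartbeat at no more than 1/3rd of the session timeout.
	kgo.HeartbeatInterval(5*time.Second),
	kgo.RebalanceTimeout(2*time.Minute),
)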

type GroupTransactSession

type GroupTransactSession struct {
	// contains filtered or unexported fields
}

GroupTransactSession abstracts away the proper way to begin a transaction and more importantly how to end a transaction when consuming in a group, modifying records, and producing (EOS transaction).

func (*GroupTransactSession) Begin

func (s *GroupTransactSession) Begin() error

Begin begins a transaction, returning an error if the client has no transactional id or is already in a transaction.

Begin must be called before producing records in a transaction.

Note that a revoke of any partitions sets the session's revoked state, even if the session has not begun. This state is only reset on EndTransaction. Thus, it is safe to begin transactions after a poll (but still before you produce).

func (*GroupTransactSession) End

End ends a transaction, committing if commit is true, if the group did not rebalance since the transaction began, and if committing offsets is successful. If commit is false, the group has rebalanced, or any partition in committing offsets fails, this aborts.

This returns whether the transaction committed or any error that occurred.

type Hook

type Hook interface{}

Hook is a hook to be called when something happens in kgo.

The base Hook interface is useless, but wherever a hook can occur in kgo, the client checks if your hook implements an appropriate interface. If so, your hook is called.

This allows you to only hook in to behavior you care about, and it allows the client to add more hooks in the future.

All hook interfaces in this package have Hook in the name. Hooks must be safe for concurrent use. It is expected that hooks are fast; if a hook needs to take time, then copy what you need and ensure the hook is async.

type IsolationLevel

type IsolationLevel struct {
	// contains filtered or unexported fields
}

IsolationLevel controls whether uncommitted or only committed records are returned from fetch requests.

func ReadCommitted

func ReadCommitted() IsolationLevel

ReadCommitted is an isolation level to only fetch committed records.

func ReadUncommitted

func ReadUncommitted() IsolationLevel

ReadUncommitted (the default) is an isolation level that returns the latest produced records, be they committed or not.
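For example, when consuming topics written to by transactional producers, read-committed can be set at construction time (kgo.SeedBrokers is assumed from the full option list; the address is a placeholder):

cl, err := kgo.NewClient(
	kgo.SeedBrokers("localhost:9092"), // assumed option; placeholder address
	kgo.FetchIsolationLevel(kgo.ReadCommitted()),
)
if err != nil {
	panic(err)
}
defer cl.Close()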

type LogLevel

type LogLevel int8

LogLevel designates which level the logger should log at.

const (
	// LogLevelNone disables logging.
	LogLevelNone LogLevel = iota
	// LogLevelError logs all errors. Generally, these should not happen.
	LogLevelError
	// LogLevelWarn logs all warnings, such as request failures.
	LogLevelWarn
	// LogLevelInfo logs informational messages, such as requests. This is
	// usually the default log level.
	LogLevelInfo
	// LogLevelDebug logs verbose information, and is usually not used in
	// production.
	LogLevelDebug
)

func (LogLevel) String

func (l LogLevel) String() string

type Logger

type Logger interface {
	// Level returns the log level to log at.
	//
	// Implementations can change their log level on the fly, but this
	// function must be safe to call concurrently.
	Level() LogLevel

	// Log logs a message with key, value pair arguments for the given log
	// level.
	//
	// This must be safe to call concurrently.
	Log(level LogLevel, msg string, keyvals ...interface{})
}

Logger is used to log informational messages.

func BasicLogger

func BasicLogger(dst io.Writer, level LogLevel, prefixFn func() string) Logger

BasicLogger returns a logger that will print to dst in the following format:

prefix [LEVEL] message; key: val, key: val

prefixFn is optional; if non-nil, it is called for a per-message prefix.

Writes to dst are not checked for errors.
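As a sketch using the os and time imports, a basic logger can be built and handed to the client; the kgo.WithLogger and kgo.SeedBrokers option names are assumed from the full option list and do not appear in this excerpt:

logger := kgo.BasicLogger(os.Stderr, kgo.LogLevelInfo, func() string {
	return time.Now().Format("15:04:05.000 ")
})

cl, err := kgo.NewClient(
	kgo.SeedBrokers("localhost:9092"), // assumed option; placeholder address
	kgo.WithLogger(logger),            // assumed option
)
if err != nil {
	panic(err)
}
defer cl.Close()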

type Offset

type Offset struct {
	// contains filtered or unexported fields
}

Offset is a message offset in a partition.

func NewOffset

func NewOffset() Offset

NewOffset creates and returns an offset to use in AssignPartitions.

The default offset begins at the end.

func (Offset) At

func (o Offset) At(at int64) Offset

At returns a copy of the calling offset, changing the returned offset to begin at exactly the requested offset.

There are two potential special offsets to use: -2 allows for consuming at the start, and -1 allows for consuming at the end. These two offsets are equivalent to calling AtStart or AtEnd.

If the offset is less than -2, the client bounds it to -2 to consume at the start.

func (Offset) AtEnd

func (o Offset) AtEnd() Offset

AtEnd returns a copy of the calling offset, changing the returned offset to begin at the end of a partition.

func (Offset) AtStart

func (o Offset) AtStart() Offset

AtStart returns a copy of the calling offset, changing the returned offset to begin at the beginning of a partition.

func (Offset) Relative

func (o Offset) Relative(n int64) Offset

Relative returns a copy of the calling offset, changing the returned offset to be n relative to what it currently is. If the offset is beginning at the end, Relative(-100) will begin 100 before the end.

func (Offset) WithEpoch

func (o Offset) WithEpoch(e int32) Offset

WithEpoch returns a copy of the calling offset, changing the returned offset to use the given epoch. This epoch is used for truncation detection; the default of -1 implies no truncation detection.
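A few sketches of offsets built with the methods above:

start := kgo.NewOffset().AtStart()                // earliest offset in the partition
end := kgo.NewOffset().AtEnd()                    // end of the partition (the default)
exact := kgo.NewOffset().At(100)                  // exactly offset 100
nearEnd := kgo.NewOffset().AtEnd().Relative(-100) // 100 before the end
epoched := kgo.NewOffset().At(100).WithEpoch(3)   // offset 100 with leader epoch 3
_ = []kgo.Offset{start, end, exact, nearEnd, epoched}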

type Opt

type Opt interface {
	// contains filtered or unexported methods
}

Opt is an option to configure a client.

func AutoTopicCreation

func AutoTopicCreation() Opt

AutoTopicCreation enables topics to be auto created if they do not exist when fetching their metadata.

func BrokerConnDeadRetries

func BrokerConnDeadRetries(n int) Opt

BrokerConnDeadRetries sets the number of tries that are allowed for requests that fail before a response is even received, overriding the default 20.

These retries are for cases where a connection to a broker fails during a request. Generally, you just must retry these requests anyway. However, if Kafka cannot understand our request, it closes a connection deliberately. Since the client cannot differentiate this case, we upper bound the number of times we allow a connection to be cut before we call it quits on the request, assuming that if a connection is cut *so many times* repeatedly, then it is likely not the network but instead Kafka indicating a problem.

The only error to trigger this is ErrConnDead, indicating Kafka closed the connection (or the connection actually just died).

This setting applies to all but internally generated fetch and produce requests.

func BrokerMaxReadBytes added in v0.6.2

func BrokerMaxReadBytes(v int32) Opt

BrokerMaxReadBytes sets the maximum response size that can be read from Kafka, overriding the default 100MiB.

This is a safety measure to avoid OOMing on invalid responses. This is roughly double FetchMaxBytes; if bumping that, consider bumping this as well. No other response should run the risk of hitting this limit.

func BrokerMaxWriteBytes

func BrokerMaxWriteBytes(v int32) Opt

BrokerMaxWriteBytes upper bounds the number of bytes written to a broker connection in a single write, overriding the default 100MiB.

This number corresponds to a broker's socket.request.max.bytes, which defaults to 100MiB.

The only Kafka request that could come reasonably close to hitting this limit should be produce requests, and thus this limit is only enforced for produce requests.

func ClientID

func ClientID(id string) Opt

ClientID uses id for all requests sent to Kafka brokers, overriding the default "kgo".

func ConnTimeoutOverhead

func ConnTimeoutOverhead(overhead time.Duration) Opt

ConnTimeoutOverhead uses the given time as overhead while deadlining requests, overriding the default overhead of 20s.

For most requests, the overhead will simply be this timeout. However, for any request with a TimeoutMillis field, the overhead is added on top of the request's TimeoutMillis. This ensures that we give Kafka enough time to actually process the request given the timeout, while still having a deadline on the connection as a whole to ensure it does not hang.

For writes, the timeout is always the overhead. We buffer writes in our client before one quick flush, so we always expect the write to be fast.

Using 0 as the overhead disables timeouts.

func Dialer

func Dialer(fn func(ctx context.Context, network, host string) (net.Conn, error)) Opt

Dialer uses fn to dial addresses, overriding the default dialer that uses a 10s dial timeout and no TLS.

The context passed to the dial function is the context used in the request that caused the dial. If the request is a client-internal request, the context is the context on the client itself (which is canceled when the client is closed).

This function has the same signature as net.Dialer's DialContext and tls.Dialer's DialContext, meaning you can use this function like so:

kgo.Dialer((&net.Dialer{Timeout: 10*time.Second}).DialContext)

or

kgo.Dialer((&tls.Dialer{...}).DialContext)

func DisableClientID

func DisableClientID() Opt

DisableClientID sets the client ID to null for all requests sent to Kafka brokers, overriding the default "kgo".

func MaxVersions

func MaxVersions(versions *kversion.Versions) Opt

MaxVersions sets the maximum Kafka version to try, overriding the internal unbounded (latest stable) versions.

Note that specific max version pinning is required if trying to interact with versions pre 0.10.0. Otherwise, unless using more complicated requests that this client itself does not natively use, it is generally safe to opt for the latest version. If using the kmsg package directly to issue requests, it is recommended to pin versions so that new fields on requests do not get invalid default zero values before you update your usage.
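
For example, to cap requests at Kafka 2.4.0's versions (a sketch; this assumes the kversion package exposes a constructor such as kversion.V2_4_0):

kgo.MaxVersions(kversion.V2_4_0())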

func MetadataMaxAge

func MetadataMaxAge(age time.Duration) Opt

MetadataMaxAge sets the maximum age for the client's cached metadata, overriding the default 5m, to allow detection of new topics, partitions, etc.

This corresponds to Kafka's metadata.max.age.ms.

func MetadataMinAge

func MetadataMinAge(age time.Duration) Opt

MetadataMinAge sets the minimum time between metadata queries, overriding the default 10s. You may want to raise or lower this to reduce the number of metadata queries the client will make. Notably, if metadata detects an error in any topic or partition, it triggers itself to update as soon as allowed. Additionally, any connection failures causing backoff while producing or consuming trigger metadata updates, because the client must assume that maybe the connection died due to a broker dying.

func MinVersions added in v0.6.2

func MinVersions(versions *kversion.Versions) Opt

MinVersions sets the minimum Kafka version a request can be downgraded to, overriding the default of the lowest version.

This option is useful if you are issuing requests that you absolutely do not want to be downgraded; that is, if you are relying on features in newer requests, and you are not sure if your brokers can handle those features. By setting a min version, if the client detects it needs to downgrade past the version, it will instead avoid issuing the request.

Unlike MaxVersions, if a request is issued that is unknown to the min versions, the request is allowed. It is assumed that there is no lower bound for that request.

func RequestRetries

func RequestRetries(n int) Opt

RequestRetries sets the number of tries that retriable requests are allowed, overriding the unlimited default.

This setting applies to all types of requests.

func RetryBackoff

func RetryBackoff(backoff func(int) time.Duration) Opt

RetryBackoff sets the backoff strategy for how long to backoff for a given amount of retries, overriding the default exponential backoff that ranges from 100ms min to 1s max.

This (roughly) corresponds to Kafka's retry.backoff.ms setting and retry.backoff.max.ms (which is being introduced with KIP-580).
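
As an illustration, a custom backoff that grows by 100ms per try and caps at 2s could be set like so (a sketch, not the client's default implementation):

kgo.RetryBackoff(func(tries int) time.Duration {
	backoff := time.Duration(tries) * 100 * time.Millisecond
	if backoff > 2*time.Second {
		backoff = 2 * time.Second
	}
	return backoff
})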

func RetryTimeout

func RetryTimeout(t func(int16) time.Duration) Opt

RetryTimeout sets the upper limit on how long we allow requests to retry, overriding the default of 5m for EndTxn requests, 1m for all others.

This timeout applies to any request issued through a client's Request function. It does not apply to fetches nor produces.

The function is called with the request key that is being retried. While it is not expected that the request key will be used, including it gives users the opportunity to have different retry timeouts for different keys.

If the function returns zero, there is no retry timeout.

The timeout is evaluated after a request is issued. If a retry backoff places the next attempt past the retry timeout deadline, the request will still be tried one final time once the backoff expires.
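
For example, to allow every retried request up to ten minutes of retries regardless of its key (a minimal sketch):

kgo.RetryTimeout(func(int16) time.Duration {
	return 10 * time.Minute
})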

func SASL

func SASL(sasls ...sasl.Mechanism) Opt

SASL appends sasl authentication options to use for all connections.

Mechanisms are tried in order; if the broker supports the first mechanism, all connections will use it. If the first mechanism is not supported, the client picks the first mechanism the broker does support. If the broker supports none of the client's mechanisms, connections fail.
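
For example, SCRAM-SHA-256 could be configured roughly as below (a sketch; it assumes the companion sasl/scram package with an Auth type and AsSha256Mechanism method, and placeholder credentials):

kgo.SASL(scram.Auth{
	User: "myuser",
	Pass: "mypass",
}.AsSha256Mechanism())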

func SeedBrokers

func SeedBrokers(seeds ...string) Opt

SeedBrokers sets the seed brokers for the client to use, overriding the default 127.0.0.1:9092.

Any seeds that are missing a port use the default Kafka port 9092.
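
For example, a minimal client against a local broker (a small sketch using the client constructor):

cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
if err != nil {
	// handle error
}
defer cl.Close()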

func SoftwareNameAndVersion

func SoftwareNameAndVersion(name, version string) Opt

SoftwareNameAndVersion sets the client software name and version that will be sent to Kafka as part of the ApiVersions request as of Kafka 2.4.0, overriding the default "kgo" and internal version number.

Kafka exposes this through metrics to help operators understand the impact of clients.

It is generally not recommended to set this. As well, if you do, the name and version must match the following regular expression:

[a-zA-Z0-9](?:[a-zA-Z0-9\\-.]*[a-zA-Z0-9])?

Note this means neither the name nor version can be empty.

func WithHooks

func WithHooks(hooks ...Hook) Opt

WithHooks sets hooks to call whenever relevant.

Hooks can be used to layer in metrics (such as Prometheus hooks) or anything else. The client will call all hooks in order. See the Hooks interface for more information, as well as any interface that contains "Hook" in the name to know the available hooks. A single hook can implement zero or all hook interfaces, and only the hooks that it implements will be called.

func WithLogger

func WithLogger(l Logger) Opt

WithLogger sets the client to use the given logger, overriding the default to not use a logger.

It is invalid to use a nil logger; doing so will cause panics.

type Partitioner

type Partitioner interface {
	// ForTopic returns a partitioner for an individual topic. It is
	// guaranteed that only one record will use an individual topic's
	// topicPartitioner at a time, meaning partitioning within a topic does
	// not require locks.
	ForTopic(string) TopicPartitioner
}

Partitioner creates topic partitioners to determine which partition messages should be sent to.

func StickyKeyPartitioner

func StickyKeyPartitioner(overrideHasher PartitionerHasher) Partitioner

StickyKeyPartitioner mirrors the default Java partitioner from Kafka's 2.4.0 release (see KAFKA-8601).

This is the same "hash the key consistently, if no key, choose random partition" strategy that the Java partitioner has always used, but rather than always choosing a random partition, the partitioner pins a partition to produce to until that partition rolls over to a new batch. Only when rolling to new batches does this partitioner switch partitions.

The benefit with this pinning is less CPU utilization on Kafka brokers. Over time, the random distribution is the same, but the brokers are handling on average larger batches.

overrideHasher is optional; if nil, this will return a partitioner that partitions exactly how Kafka does. Specifically, the partitioner will use murmur2 to hash keys, will mask out the 32nd bit, and then will mod by the number of potential partitions.

func StickyPartitioner

func StickyPartitioner() Partitioner

StickyPartitioner is the same as StickyKeyPartitioner, but with no logic to consistently hash keys. That is, this only partitions according to the sticky partition strategy.

type PartitionerHasher

type PartitionerHasher func([]byte, int) int

PartitionerHasher returns a partition to use given the input data and number of partitions.

func KafkaHasher

func KafkaHasher(hashFn func([]byte) uint32) PartitionerHasher

KafkaHasher returns a PartitionerHasher using hashFn that mirrors how Kafka partitions after hashing data.

func SaramaHasher

func SaramaHasher(hashFn func([]byte) uint32) PartitionerHasher

SaramaHasher returns a PartitionerHasher using hashFn that mirrors how Sarama partitions after hashing data.

type ProducerOpt

type ProducerOpt interface {
	Opt
	// contains filtered or unexported methods
}

ProducerOpt is a producer-specific option to configure a client. This is simply a namespaced Opt.

func BatchCompression

func BatchCompression(preference ...CompressionCodec) ProducerOpt

BatchCompression sets the compression codec to use for producing records.

Compression is chosen in preference order based on broker support. For example, zstd compression was introduced in Kafka 2.1.0, so the preference can be first zstd, then fallback snappy, then fallback none.

The default preference is [snappy, none], which should be fine for all old consumers since snappy compression has existed since Kafka 0.8.0. To use zstd, your brokers must be at least 2.1.0 and all consumers must be upgraded to support decoding zstd records.
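
For example, a zstd-first preference could be set roughly like below (a sketch; this assumes the package exposes codec constructors named ZstdCompression, SnappyCompression, and NoCompression):

kgo.BatchCompression(kgo.ZstdCompression(), kgo.SnappyCompression(), kgo.NoCompression())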

func BatchMaxBytes

func BatchMaxBytes(v int32) ProducerOpt

BatchMaxBytes upper bounds the size of a record batch, overriding the default 1MB.

This corresponds to Kafka's max.message.bytes, which defaults to 1,000,012 bytes (just over 1MB).

Record batches are independent of a ProduceRequest: a record batch is specific to a topic and partition, whereas the produce request can contain many record batches for many topics.

If a single record encodes larger than this number (before compression), it will not be written and a callback will have the appropriate error.

Note that this is the maximum size of a record batch before compression. If a batch compresses poorly and actually grows the batch, the uncompressed form will be used.

func Linger

func Linger(linger time.Duration) ProducerOpt

Linger sets how long individual topic partitions will linger waiting for more records before triggering a request to be built.

Note that this option should only be used in low-volume producers. The only benefit of lingering is to potentially build a larger batch to reduce CPU usage on the brokers if you have many producers all producing small amounts.

If a produce request is triggered by any topic partition, all partitions with a possible batch to be sent are used and all lingers are reset.

As mentioned, the linger is specific to topic partition. A high volume producer will likely be producing to many partitions; it is both unnecessary to linger in this case and inefficient because the client will have many timers running (and stopping and restarting) unnecessarily.

func ManualFlushing

func ManualFlushing() ProducerOpt

ManualFlushing disables auto-flushing when producing. While you can still set lingering, it would be useless to do so.

With manual flushing, producing while MaxBufferedRecords have already been produced and not flushed will return ErrMaxBuffered.
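
A rough sketch of manual flushing (this assumes the client exposes a Flush method that blocks until buffered records are sent or the context is canceled):

cl, err := kgo.NewClient(kgo.ManualFlushing())
if err != nil {
	// handle error
}
defer cl.Close()

// ... produce records; they are buffered rather than sent ...

// Explicitly send everything buffered so far.
if err := cl.Flush(context.Background()); err != nil {
	// handle error
}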

func MaxBufferedRecords

func MaxBufferedRecords(n int) ProducerOpt

MaxBufferedRecords sets the maximum number of records the client will buffer, blocking produces until records are finished if this limit is reached. This overrides the unbounded default.

func OnDataLoss

func OnDataLoss(fn func(string, int32)) ProducerOpt

OnDataLoss sets a function to call if data loss is detected when producing records while the client is configured to continue on data loss. Thus, this option is mutually exclusive with StopOnDataLoss.

The passed function will be called with the topic and partition that data loss was detected on.

func ProduceRequestTimeout

func ProduceRequestTimeout(limit time.Duration) ProducerOpt

ProduceRequestTimeout sets how long Kafka brokers are allowed to respond to produce requests, overriding the default 30s. If a broker exceeds this duration, it will reply with a request timeout error.

This corresponds to Kafka's request.timeout.ms setting, but only applies to produce requests.

func RecordPartitioner

func RecordPartitioner(partitioner Partitioner) ProducerOpt

RecordPartitioner uses the given partitioner to partition records, overriding the default StickyKeyPartitioner.
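
For example, to use the non-key-hashing sticky strategy described above (a one-line sketch):

kgo.RecordPartitioner(kgo.StickyPartitioner())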

func RecordTimeout

func RecordTimeout(timeout time.Duration) ProducerOpt

RecordTimeout sets a rough time of how long a record can sit around in a batch before timing out.

Note that all records in a batch inherit the timeout of the first record in that batch. That is, once the first record's timeout expires, all records in the batch are expired. This generally is a non-issue unless using this option with lingering. In that case, simply add the linger to the record timeout to avoid problems.

Also note that the timeout is only evaluated after a produce response, and only for batches that need to be retried. Thus, a sink backoff may delay record timeout slightly. As with lingering, this also should generally be a non-issue.

func RequiredAcks

func RequiredAcks(acks Acks) ProducerOpt

RequiredAcks sets the required acks for produced records, overriding the default RequireAllISRAcks.

func StopOnDataLoss

func StopOnDataLoss() ProducerOpt

StopOnDataLoss sets the client to stop producing if data loss is detected, overriding the default false.

Note that if using this option, it is strongly recommended to not have a retry limit. Doing so may lead to errors where the client fails a batch on a recoverable error, which internally bumps the idempotent sequence number used for producing, which may then later cause an inadvertent out of order sequence number and false "data loss" detection.

func TransactionTimeout

func TransactionTimeout(timeout time.Duration) ProducerOpt

TransactionTimeout sets the timeout allowed for a transaction, overriding the default 60s. It may be a good idea to set this under the rebalance timeout for a group, so that a produce will not complete successfully after the consumer group has already moved the partitions the consumer/producer is working on from one group member to another.

Transaction timeouts begin when the first record is produced within a transaction, not when a transaction begins.

func TransactionalID

func TransactionalID(id string) ProducerOpt

TransactionalID sets a transactional ID for the client, ensuring that records are produced transactionally under this ID (exactly once semantics).

For Kafka-to-Kafka transactions, the transactional ID is only one half of the equation. You must also assign a group to consume from.

To produce transactionally, you first BeginTransaction, then produce records consumed from a group, then you EndTransaction. All records produced outside of a transaction will fail immediately with an error.

After producing a batch, you must commit what you consumed. Auto committing offsets is disabled during transactional consuming / producing.

Note that, unless using Kafka 2.5.0, a consumer group rebalance may be problematic. Production should finish and be committed before the client rejoins the group. It may be safer to use an eager group balancer and just abort the transaction. Alternatively, any time a partition is revoked, you could abort the transaction and reset offsets being consumed.

If the client detects an unrecoverable error, all records produced thereafter will fail.

Lastly, the default read level is READ_UNCOMMITTED. Be sure to use the ReadIsolationLevel option if you want to only read committed.
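
A rough sketch of the flow described above, assuming an existing transactional client cl and a context ctx; producing and offset committing are elided, and the Client's transaction-related methods should be consulted for exact signatures:

if err := cl.BeginTransaction(); err != nil {
	// handle error
}

// ... produce records consumed from the group, then commit the consumed
// offsets (elided) ...

// End the transaction with a commit, or with kgo.TryAbort on failure.
if err := cl.EndTransaction(ctx, kgo.TryCommit); err != nil {
	// handle error
}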

type Record

type Record struct {
	// Key is an optional field that can be used for partition assignment.
	//
	// This is generally used with a hash partitioner to cause all records
	// with the same key to go to the same partition.
	Key []byte
	// Value is a blob of data to write to Kafka.
	Value []byte

	// Headers are optional key/value pairs that are passed along with
	// records.
	//
	// These are purely for producers and consumers; Kafka does not look at
	// this field and only writes it to disk.
	Headers []RecordHeader

	// Timestamp is the timestamp that will be used for this record.
	//
	// Record batches are always written with "CreateTime", meaning that
	// timestamps are generated by clients rather than brokers.
	//
	// This field is always set in Produce.
	Timestamp time.Time

	// Topic is the topic that a record is written to.
	//
	// This must be set for producing.
	Topic string

	// Partition is the partition that a record is written to.
	//
	// For producing, this is left unset. This will be set by the client
	// as appropriate.
	Partition int32

	// Attrs specifies what attributes were on this record.
	Attrs RecordAttrs

	// ProducerEpoch is the producer epoch of this message if it was
	// produced with a producer ID. An epoch and ID of 0 means it was not.
	//
	// For producing, this is left unset. This will be set by the client
	// as appropriate.
	ProducerEpoch int16

	// ProducerID is the producer ID of this message if it was produced
	// with a producer ID. An epoch and ID of 0 means it was not.
	//
	// For producing, this is left unset. This will be set by the client
	// as appropriate.
	ProducerID int64

	// LeaderEpoch is the leader epoch of the broker at the time this
	// record was written, or -1 if on message sets.
	LeaderEpoch int32

	// Offset is the offset that a record is written as.
	//
	// For producing, this is left unset. This will be set by the client as
	// appropriate. If you are producing with no acks, this will just be
	// the offset used in the produce request and does not mirror the
	// offset actually stored within Kafka.
	Offset int64
}

Record is a record to write to Kafka.
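
For producing, only Topic is required; typically a Value is set as well, and optionally a Key for partition assignment. For example:

rec := &kgo.Record{
	Topic: "events",         // required for producing
	Key:   []byte("user-1"), // optional; keeps this user's records on one partition
	Value: []byte("hello"),
}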

type RecordAttrs

type RecordAttrs struct {
	// contains filtered or unexported fields
}

RecordAttrs contains additional meta information about a record, such as its compression or timestamp type.

func (RecordAttrs) CompressionType

func (a RecordAttrs) CompressionType() uint8

CompressionType signifies with which algorithm this record was compressed.

0 is no compression, 1 is gzip, 2 is snappy, 3 is lz4, and 4 is zstd.

func (RecordAttrs) IsControl

func (a RecordAttrs) IsControl() bool

IsControl returns whether a record is a "control" record (ABORT or COMMIT). These are generally not visible unless explicitly opted into.

func (RecordAttrs) IsTransactional

func (a RecordAttrs) IsTransactional() bool

IsTransactional returns whether a record is a part of a transaction.

func (RecordAttrs) TimestampType

func (a RecordAttrs) TimestampType() int8

TimestampType specifies how Timestamp was determined.

The default, 0, means that the timestamp was determined in a client when the record was produced.

An alternative is 1, which is when the Timestamp is set in Kafka.

Records from before Kafka 0.10.0 did not have timestamps; for those, this returns -1.

type RecordHeader

type RecordHeader struct {
	Key   string
	Value []byte
}

RecordHeader contains extra information that can be sent with Records.

type ResponseShard added in v0.6.2

type ResponseShard struct {
	// Meta contains the broker that this request was issued to, or an
	// unknown (node ID -1) metadata if the request could not be issued.
	//
	// Requests can fail to even be issued if an appropriate broker cannot
	// be loaded or if the client cannot understand the request.
	Meta BrokerMetadata

	// Req is the request that was issued to this broker.
	Req kmsg.Request

	// Resp is the response received from the broker, if any.
	Resp kmsg.Response

	// Err, if non-nil, is the error that prevented a response from being
	// received or the request from being issued.
	Err error
}

ResponseShard ties together a request with either the response it received or an error that prevented a response from being received.
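
Iterating shards typically looks like below (a sketch; the shards variable is assumed to come from the client's sharded request method):

for _, shard := range shards {
	if shard.Err != nil {
		// The request could not be issued to shard.Meta's broker, or no
		// response was received.
		continue
	}
	// shard.Resp contains this broker's portion of the overall response.
	_ = shard.Resp
}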

type TopicPartitioner

type TopicPartitioner interface {
	// OnNewBatch is called when producing a record if that record would
	// trigger a new batch on its current partition.
	OnNewBatch()
	// RequiresConsistency returns true if a record must hash to the same
	// partition even if a partition is down.
	// If true, a record may hash to a partition that cannot be written to
	// and will error until the partition comes back.
	RequiresConsistency(*Record) bool
	// Partition determines, among a set of n partitions, which index
	// should be chosen for the record r.
	Partition(r *Record, n int) int
}

TopicPartitioner partitions records in an individual topic.
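
As an illustration, a sketch of a custom partitioner that round-robins records within each topic (ignoring partition availability) could look like:

type roundRobinPartitioner struct{}

func (roundRobinPartitioner) ForTopic(string) kgo.TopicPartitioner { return new(roundRobin) }

type roundRobin struct{ next int }

func (*roundRobin) OnNewBatch() {}

func (*roundRobin) RequiresConsistency(*kgo.Record) bool { return false }

func (r *roundRobin) Partition(_ *kgo.Record, n int) int {
	p := r.next % n
	r.next++
	return p
}

This could then be installed with kgo.RecordPartitioner(roundRobinPartitioner{}).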

type TransactionEndTry

type TransactionEndTry bool

TransactionEndTry is simply a named bool.

const (
	// TryAbort attempts to end a transaction with an abort.
	TryAbort TransactionEndTry = false

	// TryCommit attempts to end a transaction with a commit.
	TryCommit TransactionEndTry = true
)

Directories

Path Synopsis
internal
sticky
Package sticky provides sticky partitioning strategy for Kafka, with a complete overhaul to be faster, more understandable, and optimal.
