Documentation ¶
Overview ¶
Package kgo provides an efficient, pure Go Kafka client for Kafka 0.8.0+ with support for transactions, regex topic consuming, the latest partition strategies, and more. This client supports all client related KIPs.
This client aims to be simple to use while still interacting with Kafka in a near ideal way. For a broader overview of the client itself, please see the README on the project's GitHub page.
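The sketch below shows a minimal produce-and-consume round trip. The broker address, group, and topic names are placeholders, the module path is assumed to be github.com/twmb/franz-go/pkg/kgo, and error handling is abbreviated.
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	ctx := context.Background()

	// Placeholder seed broker, group, and topic.
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumerGroup("my-group"),
		kgo.ConsumeTopics("my-topic"),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Synchronously produce one record, then poll it back.
	if err := cl.ProduceSync(ctx, &kgo.Record{Topic: "my-topic", Value: []byte("hello")}).FirstErr(); err != nil {
		panic(err)
	}

	fetches := cl.PollFetches(ctx)
	fetches.EachRecord(func(r *kgo.Record) {
		fmt.Println(string(r.Value))
	})
}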
Index ¶
- Variables
- func ParseConsumerSyncAssignment(assignment []byte) (map[string][]int32, error)
- type Acks
- type BalancePlan
- func (p *BalancePlan) AddPartition(member *kmsg.JoinGroupResponseMember, topic string, partition int32)
- func (p *BalancePlan) AddPartitions(member *kmsg.JoinGroupResponseMember, topic string, partitions []int32)
- func (p *BalancePlan) AdjustCooperative(b *ConsumerBalancer)
- func (p *BalancePlan) IntoSyncAssignment() []kmsg.SyncGroupRequestGroupAssignment
- func (p *BalancePlan) String() string
- type Broker
- type BrokerE2E
- type BrokerMetadata
- type Client
- func (cl *Client) AbortBufferedRecords(ctx context.Context) error
- func (cl *Client) BeginTransaction() error
- func (cl *Client) Broker(id int) *Broker
- func (cl *Client) BufferedFetchRecords() int64
- func (cl *Client) BufferedProduceRecords() int64
- func (cl *Client) Close()
- func (cl *Client) CommitOffsets(ctx context.Context, uncommitted map[string]map[int32]EpochOffset, ...)
- func (cl *Client) CommitOffsetsSync(ctx context.Context, uncommitted map[string]map[int32]EpochOffset, ...)
- func (cl *Client) CommitRecords(ctx context.Context, rs ...*Record) error
- func (cl *Client) CommitUncommittedOffsets(ctx context.Context) error
- func (cl *Client) CommittedOffsets() map[string]map[int32]EpochOffset
- func (cl *Client) DiscoveredBrokers() []*Broker
- func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry) error
- func (cl *Client) Flush(ctx context.Context) error
- func (cl *Client) ForceRebalance()
- func (cl *Client) LeaveGroup()
- func (cl *Client) MarkCommitRecords(rs ...*Record)
- func (cl *Client) PauseFetchPartitions(topicPartitions map[string][]int32) map[string][]int32
- func (cl *Client) PauseFetchTopics(topics ...string) []string
- func (cl *Client) PollFetches(ctx context.Context) Fetches
- func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches
- func (cl *Client) Produce(ctx context.Context, r *Record, promise func(*Record, error))
- func (cl *Client) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults
- func (cl *Client) Request(ctx context.Context, req kmsg.Request) (kmsg.Response, error)
- func (cl *Client) RequestSharded(ctx context.Context, req kmsg.Request) []ResponseShard
- func (cl *Client) ResumeFetchPartitions(topicPartitions map[string][]int32)
- func (cl *Client) ResumeFetchTopics(topics ...string)
- func (cl *Client) SeedBrokers() []*Broker
- func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset)
- func (cl *Client) UncommittedOffsets() map[string]map[int32]EpochOffset
- type CompressionCodec
- type ConsumerBalancer
- func (b *ConsumerBalancer) Balance(topics map[string]int32) IntoSyncAssignment
- func (b *ConsumerBalancer) EachMember(fn func(member *kmsg.JoinGroupResponseMember, meta *kmsg.GroupMemberMetadata))
- func (b *ConsumerBalancer) MemberAt(n int) (*kmsg.JoinGroupResponseMember, *kmsg.GroupMemberMetadata)
- func (b *ConsumerBalancer) MemberTopics() map[string]struct{}
- func (b *ConsumerBalancer) Members() []kmsg.JoinGroupResponseMember
- func (b *ConsumerBalancer) NewPlan() *BalancePlan
- type ConsumerBalancerBalance
- type ConsumerOpt
- func ConsumePartitions(partitions map[string]map[int32]Offset) ConsumerOpt
- func ConsumeRegex() ConsumerOpt
- func ConsumeResetOffset(offset Offset) ConsumerOpt
- func ConsumeTopics(topics ...string) ConsumerOpt
- func FetchIsolationLevel(level IsolationLevel) ConsumerOpt
- func FetchMaxBytes(b int32) ConsumerOpt
- func FetchMaxPartitionBytes(b int32) ConsumerOpt
- func FetchMaxWait(wait time.Duration) ConsumerOpt
- func FetchMinBytes(b int32) ConsumerOpt
- func KeepControlRecords() ConsumerOpt
- func MaxConcurrentFetches(n int) ConsumerOpt
- func Rack(rack string) ConsumerOpt
- type EpochOffset
- type ErrDataLoss
- type Fetch
- type FetchBatchMetrics
- type FetchError
- type FetchPartition
- type FetchTopic
- type FetchTopicPartition
- type Fetches
- func (fs Fetches) EachError(fn func(string, int32, error))
- func (fs Fetches) EachPartition(fn func(FetchTopicPartition))
- func (fs Fetches) EachRecord(fn func(*Record))
- func (fs Fetches) EachTopic(fn func(FetchTopic))
- func (fs Fetches) Errors() []FetchError
- func (fs Fetches) IsClientClosed() bool
- func (fs Fetches) RecordIter() *FetchesRecordIter
- func (fs Fetches) Records() []*Record
- type FetchesRecordIter
- type FirstErrPromise
- type GroupBalancer
- type GroupMemberBalancer
- type GroupOpt
- func AutoCommitCallback(fn func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error)) GroupOpt
- func AutoCommitInterval(interval time.Duration) GroupOpt
- func AutoCommitMarks() GroupOpt
- func Balancers(balancers ...GroupBalancer) GroupOpt
- func ConsumerGroup(group string) GroupOpt
- func DisableAutoCommit() GroupOpt
- func GreedyAutoCommit() GroupOpt
- func GroupProtocol(protocol string) GroupOpt
- func HeartbeatInterval(interval time.Duration) GroupOpt
- func InstanceID(id string) GroupOpt
- func OnPartitionsAssigned(onAssigned func(context.Context, *Client, map[string][]int32)) GroupOpt
- func OnPartitionsLost(onLost func(context.Context, *Client, map[string][]int32)) GroupOpt
- func OnPartitionsRevoked(onRevoked func(context.Context, *Client, map[string][]int32)) GroupOpt
- func RebalanceTimeout(timeout time.Duration) GroupOpt
- func RequireStableFetchOffsets() GroupOpt
- func SessionTimeout(timeout time.Duration) GroupOpt
- type GroupTransactSession
- func (s *GroupTransactSession) Begin() error
- func (s *GroupTransactSession) Client() *Client
- func (s *GroupTransactSession) Close()
- func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry) (bool, error)
- func (s *GroupTransactSession) PollFetches(ctx context.Context) Fetches
- func (s *GroupTransactSession) PollRecords(ctx context.Context, maxPollRecords int) Fetches
- func (s *GroupTransactSession) Produce(ctx context.Context, r *Record, promise func(*Record, error))
- func (s *GroupTransactSession) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults
- type Hook
- type HookBrokerConnect
- type HookBrokerDisconnect
- type HookBrokerE2E
- type HookBrokerRead
- type HookBrokerThrottle
- type HookBrokerWrite
- type HookFetchBatchRead
- type HookFetchRecordBuffered
- type HookFetchRecordUnbuffered
- type HookGroupManageError
- type HookNewClient
- type HookProduceBatchWritten
- type HookProduceRecordBuffered
- type HookProduceRecordUnbuffered
- type IntoSyncAssignment
- type IsolationLevel
- type LogLevel
- type Logger
- type Offset
- type Opt
- func AllowAutoTopicCreation() Opt
- func BrokerMaxReadBytes(v int32) Opt
- func BrokerMaxWriteBytes(v int32) Opt
- func ClientID(id string) Opt
- func ConnIdleTimeout(timeout time.Duration) Opt
- func DialTLSConfig(c *tls.Config) Opt
- func Dialer(fn func(ctx context.Context, network, host string) (net.Conn, error)) Opt
- func MaxVersions(versions *kversion.Versions) Opt
- func MetadataMaxAge(age time.Duration) Opt
- func MetadataMinAge(age time.Duration) Opt
- func MinVersions(versions *kversion.Versions) Opt
- func RequestRetries(n int) Opt
- func RequestTimeoutOverhead(overhead time.Duration) Opt
- func RetryBackoffFn(backoff func(int) time.Duration) Opt
- func RetryTimeout(t time.Duration) Opt
- func RetryTimeoutFn(t func(int16) time.Duration) Opt
- func SASL(sasls ...sasl.Mechanism) Opt
- func SeedBrokers(seeds ...string) Opt
- func SoftwareNameAndVersion(name, version string) Opt
- func WithHooks(hooks ...Hook) Opt
- func WithLogger(l Logger) Opt
- type Partitioner
- func BasicConsistentPartitioner(partition func(string) func(r *Record, n int) int) Partitioner
- func LeastBackupPartitioner() Partitioner
- func ManualPartitioner() Partitioner
- func RoundRobinPartitioner() Partitioner
- func StickyKeyPartitioner(overrideHasher PartitionerHasher) Partitioner
- func StickyPartitioner() Partitioner
- type PartitionerHasher
- type ProduceBatchMetrics
- type ProduceResult
- type ProduceResults
- type ProducerOpt
- func DefaultProduceTopic(t string) ProducerOpt
- func DisableIdempotentWrite() ProducerOpt
- func ManualFlushing() ProducerOpt
- func MaxBufferedRecords(n int) ProducerOpt
- func ProduceRequestTimeout(limit time.Duration) ProducerOpt
- func ProducerBatchCompression(preference ...CompressionCodec) ProducerOpt
- func ProducerBatchMaxBytes(v int32) ProducerOpt
- func ProducerLinger(linger time.Duration) ProducerOpt
- func ProducerOnDataLossDetected(fn func(string, int32)) ProducerOpt
- func RecordDeliveryTimeout(timeout time.Duration) ProducerOpt
- func RecordPartitioner(partitioner Partitioner) ProducerOpt
- func RecordRetries(n int) ProducerOpt
- func RequiredAcks(acks Acks) ProducerOpt
- func StopProducerOnDataLossDetected() ProducerOpt
- func TransactionTimeout(timeout time.Duration) ProducerOpt
- func TransactionalID(id string) ProducerOpt
- type Record
- type RecordAttrs
- type RecordHeader
- type ResponseShard
- type TopicBackupIter
- type TopicBackupPartitioner
- type TopicPartitioner
- type TopicPartitionerOnNewBatch
- type TransactionEndTry
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrRecordTimeout is passed to produce promises when records are
	// unable to be produced within the RecordDeliveryTimeout.
	ErrRecordTimeout = errors.New("records have timed out before they were able to be produced")

	// ErrRecordRetries is passed to produce promises when records are
	// unable to be produced after RecordRetries attempts.
	ErrRecordRetries = errors.New("record failed after being retried too many times")

	// ErrMaxBuffered is returned when producing with manual flushing
	// enabled and the maximum amount of records are buffered.
	ErrMaxBuffered = errors.New("manual flushing is enabled and the maximum amount of records are buffered, cannot buffer more")

	// ErrAborting is returned for all buffered records while
	// AbortBufferedRecords is being called.
	ErrAborting = errors.New("client is aborting buffered records")

	// ErrClientClosed is returned in various places when the client's
	// Close function has been called.
	//
	// For producing, records are failed with this error.
	//
	// For consuming, a fake partition is injected into a poll response
	// that has this error.
	//
	// For any request, the request is failed with this error.
	ErrClientClosed = errors.New("client closed")
)
Functions ¶
Types ¶
type Acks ¶
type Acks struct {
// contains filtered or unexported fields
}
Acks represents the number of acks a broker leader must have before a produce request is considered complete.
This controls the durability of written records and corresponds to "acks" in Kafka's Producer Configuration documentation.
The default is LeaderAck.
func AllISRAcks ¶
func AllISRAcks() Acks
AllISRAcks ensures that all in-sync replicas have acknowledged they wrote a record before the leader replies success.
type BalancePlan ¶ added in v0.7.0
type BalancePlan struct {
// contains filtered or unexported fields
}
BalancePlan is a helper type to build the result of balancing topics and partitions among group members.
func (*BalancePlan) AddPartition ¶ added in v0.7.0
func (p *BalancePlan) AddPartition(member *kmsg.JoinGroupResponseMember, topic string, partition int32)
AddPartition assigns a partition for the topic to a given member.
func (*BalancePlan) AddPartitions ¶ added in v0.7.0
func (p *BalancePlan) AddPartitions(member *kmsg.JoinGroupResponseMember, topic string, partitions []int32)
AddPartitions assigns many partitions for a topic to a given member.
func (*BalancePlan) AdjustCooperative ¶ added in v0.7.0
func (p *BalancePlan) AdjustCooperative(b *ConsumerBalancer)
AdjustCooperative performs the final adjustment to a plan for cooperative balancing.
Over the plan, we remove all partitions that migrated from one member (where it was assigned) to a new member (where it is now planned).
This allows members that had partitions removed to revoke and rejoin, which will then do another rebalance, and in that new rebalance, the planned partitions are now on the free list to be assigned.
func (*BalancePlan) IntoSyncAssignment ¶ added in v0.7.0
func (p *BalancePlan) IntoSyncAssignment() []kmsg.SyncGroupRequestGroupAssignment
IntoSyncAssignment satisfies the IntoSyncAssignment interface.
func (*BalancePlan) String ¶ added in v0.7.0
func (p *BalancePlan) String() string
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker pairs a broker ID with a client to directly issue requests to a specific broker.
func (*Broker) Request ¶
Request issues a request to a broker. If the broker does not exist in the client, this returns an unknown broker error. Requests are not retried.
The passed context can be used to cancel a request and return early. Note that if the request is not canceled before it is written to Kafka, you may just end up canceling and not receiving the response to what Kafka inevitably does.
It is more beneficial to always use RetriableRequest.
func (*Broker) RetriableRequest ¶ added in v0.6.2
RetriableRequest issues a request to a broker just as Request does, but retries in the face of retriable broker connection errors. This does not retry on response internal errors.
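As a sketch, issuing a broker-direct request might look like the following. The broker ID is a placeholder, kmsg.ApiVersionsRequest is used only as an example request, and RetriableRequest is assumed to mirror Request's (context, kmsg.Request) signature.
// br is a handle to broker 1; if that broker does not exist in the
// client, the request fails with an unknown broker error.
br := cl.Broker(1)

// RetriableRequest retries retriable broker connection errors for us.
resp, err := br.RetriableRequest(ctx, &kmsg.ApiVersionsRequest{})
if err != nil {
	// handle error
}
_ = resp.(*kmsg.ApiVersionsResponse)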
type BrokerE2E ¶ added in v0.8.0
type BrokerE2E struct {
	// BytesWritten is the number of bytes written for this request.
	//
	// This may not be the whole request if there was an error while writing.
	BytesWritten int

	// BytesRead is the number of bytes read for this request's response.
	//
	// This may not be the whole response if there was an error while
	// reading, and this will be zero if there was a write error.
	BytesRead int

	// WriteWait is the time spent waiting from when this request was
	// generated internally in the client to just before the request is
	// written to the connection. This number is not included in the
	// DurationE2E method.
	WriteWait time.Duration

	// TimeToWrite is how long a request took to be written on the wire.
	// This specifically tracks only how long conn.Write takes.
	TimeToWrite time.Duration

	// ReadWait tracks the span of time immediately following conn.Write
	// until conn.Read begins.
	ReadWait time.Duration

	// TimeToRead tracks how long conn.Read takes for this request to be
	// entirely read. This includes the time it takes to allocate a buffer
	// for the response after the initial four size bytes are read.
	TimeToRead time.Duration

	// WriteErr is any error encountered during writing. If a write error
	// is encountered, no read will be attempted.
	WriteErr error

	// ReadErr is any error encountered during reading.
	ReadErr error
}
BrokerE2E tracks complete information for a write of a request followed by a read of that request's response.
Note that if this is for a produce request with no acks, there will be no read wait / time to read.
func (*BrokerE2E) DurationE2E ¶ added in v0.8.0
DurationE2E returns the e2e time from the start of when a request is written to the end of when the response for that request was fully read. If a write or read error occurs, this hook is called with all information possible at the time (e.g., if a write error occurs, all write info is specified).
Kerberos SASL does not cause this hook, since it directly reads from the connection.
type BrokerMetadata ¶
type BrokerMetadata struct {
	// NodeID is the broker node ID.
	//
	// Seed brokers will have very negative IDs; kgo does not try to map
	// seed brokers to loaded brokers.
	NodeID int32

	// Port is the port of the broker.
	Port int32

	// Host is the hostname of the broker.
	Host string

	// Rack is an optional rack of the broker. It is invalid to modify this
	// field.
	//
	// Seed brokers will not have a rack.
	Rack *string

	// contains filtered or unexported fields
}
BrokerMetadata is metadata for a broker.
This struct mirrors kmsg.MetadataResponseBroker.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client issues requests and handles responses to a Kafka cluster.
func NewClient ¶
NewClient returns a new Kafka client with the given options or an error if the options are invalid. Connections to brokers are lazily created only when requests are written to them.
By default, the client uses the latest stable request versions when talking to Kafka. If you use a broker older than 0.10.0, then you need to manually set a MaxVersions option. Otherwise, there is usually no harm in defaulting to the latest API versions, although occasionally Kafka introduces new required parameters that do not have zero value defaults.
NewClient also launches a goroutine which periodically updates the cached topic metadata.
func (*Client) AbortBufferedRecords ¶
AbortBufferedRecords fails all unflushed records with ErrAborting and waits for there to be no buffered records.
This accepts a context to quit the wait early, but it is strongly recommended to always wait for all records to be flushed. Waits should not occur. The only case where this function returns an error is if the context is canceled while flushing.
The intent of this function is to provide a way to clear the client's production backlog. For example, before aborting a transaction and beginning a new one, it would be erroneous to not wait for the backlog to clear before beginning a new transaction. Anything not cleared may be a part of the new transaction.
Records produced during or after a call to this function may not be failed, thus it is incorrect to concurrently produce with this function.
This function is safe to call multiple times concurrently, and safe to call concurrent with Flush.
func (*Client) BeginTransaction ¶
BeginTransaction sets the client to a transactional state, erroring if there is no transactional ID, or if the producer is currently in a fatal (unrecoverable) state, or if the client is already in a transaction.
This must not be called concurrently with other client functions.
func (*Client) Broker ¶
Broker returns a handle to a specific broker to directly issue requests to. Note that there is no guarantee that this broker exists; if it does not, requests will fail with an unknown broker error.
func (*Client) BufferedFetchRecords ¶ added in v0.8.7
BufferedFetchRecords returns the number of records currently buffered from fetching within the client.
This can be used as a gauge to determine how far behind your application is for processing records the client has fetched. Note that it is perfectly normal to see a spike of buffered records, which would correspond to a fetch response being processed just before a call to this function. It is only problematic for you if this function is consistently returning large values.
func (*Client) BufferedProduceRecords ¶ added in v0.8.7
BufferedProduceRecords returns the number of records currently buffered for producing within the client.
This can be used as a gauge to determine how far behind the client is for flushing records produced by your client (which can help determine network / cluster health).
func (*Client) Close ¶
func (cl *Client) Close()
Close leaves any group and closes all connections and goroutines.
If you are group consuming and have overridden the default OnRevoked, you must manually commit offsets before closing the client.
func (*Client) CommitOffsets ¶
func (cl *Client) CommitOffsets(
	ctx context.Context,
	uncommitted map[string]map[int32]EpochOffset,
	onDone func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error),
)
CommitOffsets commits the given offsets for a group, calling onDone with the commit request and either the response or an error if the response was not issued. If uncommitted is empty or the client is not consuming as a group, onDone is called with (nil, nil, nil) and this function returns immediately. It is OK if onDone is nil, but you will not know if your commit succeeded.
This is an advanced function and is difficult to use correctly. For simpler, more easily understandable committing, see CommitRecords and CommitUncommittedOffsets.
This function itself does not wait for the commit to finish. By default, this function is an asynchronous commit. You can use onDone to make it sync. If autocommitting is enabled, this function blocks autocommitting until this function is complete and the onDone has returned.
It is invalid to use this function to commit offsets for a transaction.
Note that this function ensures absolute ordering of commit requests by canceling prior requests and ensuring they are done before executing a new one. This means, for absolute control, you can use this function to periodically commit async and then issue a final sync commit before quitting (this is the behavior of autocommitting and using the default revoke). This differs from the Java async commit, which does not retry requests to avoid trampling on future commits.
It is highly recommended to check the response's partition's error codes if the response is non-nil. While unlikely, individual partitions can error. This is most likely to happen if a commit occurs too late in a rebalance event.
Do not use this async CommitOffsets in OnRevoked, instead use CommitOffsetsSync. If you commit async, the rebalance will proceed before this function executes, and you will commit offsets for partitions that have moved to a different consumer.
func (*Client) CommitOffsetsSync ¶ added in v0.8.1
func (cl *Client) CommitOffsetsSync(
	ctx context.Context,
	uncommitted map[string]map[int32]EpochOffset,
	onDone func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error),
)
CommitOffsetsSync cancels any active CommitOffsets, begins a commit that cannot be canceled, and waits for that commit to complete. This function will not return until the commit is done and the onDone callback is complete.
The purpose of this function is for use in OnRevoke or committing before leaving a group, because you do not want to have a commit issued in OnRevoked canceled.
This is an advanced function, and for simpler, more easily understandable committing, see CommitRecords and CommitUncommittedOffsets.
For more information about committing and committing asynchronously, see CommitOffsets.
func (*Client) CommitRecords ¶ added in v0.8.1
CommitRecords issues a synchronous offset commit for the offsets contained within rs. Retriable errors are retried up to the configured retry limit, and any unretriable error is returned.
This function is useful as a simple way to commit offsets if you have disabled autocommitting. As an alternative if you always want to commit everything, see CommitUncommittedOffsets.
Simple usage of this function may lead to duplicate records if a consumer group rebalance occurs before or while this function is being executed. You can avoid this scenario by calling CommitRecords in a custom OnRevoked, but for most workloads, a small bit of potential duplicate processing is fine. See the documentation on DisableAutoCommit for more details.
It is recommended to always commit records in order (per partition). If you call this function twice, first with a record for partition 0 at offset 999 and then with a record for partition 0 at offset 4, you will rewind your commit.
A use case for this function may be to partially process a batch of records, commit, and then continue to process the rest of the records. It is not recommended to call this for every record processed in a high throughput scenario, because you do not want to unnecessarily increase load on Kafka.
If you do not want to wait for this function to complete before continuing processing records, you can call this function in a goroutine.
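A sketch of the partial-process-then-commit pattern described above, assuming autocommitting has been disabled and that processing each record is a placeholder:
fetches := cl.PollFetches(ctx)

var processed []*kgo.Record
fetches.EachRecord(func(r *kgo.Record) {
	// ... process r ...
	processed = append(processed, r)
})

// Commit only what was actually processed; retriable errors are retried
// internally up to the configured limit.
if err := cl.CommitRecords(ctx, processed...); err != nil {
	// handle unretriable commit error
}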
func (*Client) CommitUncommittedOffsets ¶ added in v0.8.1
CommitUncommittedOffsets issues a synchronous offset commit for any partition that has been consumed from that has uncommitted offsets. Retriable errors are retried up to the configured retry limit, and any unretriable error is returned.
This function is useful as a simple way to commit offsets if you have disabled autocommitting. As an alternative if you want to commit specific records, see CommitRecords.
Simple usage of this function may lead to duplicate records if a consumer group rebalance occurs before or while this function is being executed. You can avoid this scenario by calling CommitRecords in a custom OnRevoked, but for most workloads, a small bit of potential duplicate processing is fine. See the documentation on DisableAutoCommit for more details.
The recommended pattern for using this function is to have a poll / process / commit loop. First PollFetches, then process every record, then call CommitUncommittedOffsets.
If you do not want to wait for this function to complete before continuing processing records, you can call this function in a goroutine.
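The poll / process / commit loop recommended above might be sketched as follows, assuming the group was configured with DisableAutoCommit and that record processing is a placeholder:
for {
	fetches := cl.PollFetches(ctx)
	if fetches.IsClientClosed() {
		return
	}
	fetches.EachRecord(func(r *kgo.Record) {
		// ... process r ...
	})
	// Commit everything consumed so far.
	if err := cl.CommitUncommittedOffsets(ctx); err != nil {
		// handle unretriable commit error
	}
}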
func (*Client) CommittedOffsets ¶
func (cl *Client) CommittedOffsets() map[string]map[int32]EpochOffset
CommittedOffsets returns the latest committed offsets. Committed offsets are updated from commits or from joining a group and fetching offsets.
If there are no committed offsets, this returns nil.
func (*Client) DiscoveredBrokers ¶
DiscoveredBrokers returns all brokers that were discovered from prior metadata responses. This does not actually issue a metadata request to load brokers; if you wish to ensure this returns all brokers, be sure to manually issue a metadata request before this. This also does not include seed brokers, which are internally saved under special internal broker IDs (but, it does include those brokers under their normal IDs as returned from a metadata response).
func (*Client) EndTransaction ¶
func (cl *Client) EndTransaction(ctx context.Context, commit TransactionEndTry) error
EndTransaction ends a transaction and resets the client's internal state to not be in a transaction.
Flush and CommitOffsetsForTransaction must be called before this function; this function does not flush and does not itself ensure that all buffered records are flushed. If no record yet has caused a partition to be added to the transaction, this function does nothing and returns nil. Alternatively, AbortBufferedRecords should be called before aborting a transaction to ensure that any buffered records not yet flushed will not be a part of a new transaction.
If the producer ID has an error and you are trying to commit, this will return with kerr.OperationNotAttempted. If this happens, retry EndTransaction with TryAbort. No other error is retriable, and you should not retry with TryAbort.
If records failed with UnknownProducerID and your Kafka version is at least 2.5.0, then aborting here will potentially allow the client to recover for more production.
Note that canceling the context will likely leave the client in an undesirable state, because canceling the context may cancel the in-flight EndTransaction request, making it impossible to know whether the commit or abort was successful. It is recommended to not cancel the context.
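A hedged sketch of a transactional produce, assuming the client was configured with TransactionalID and that TryCommit and TryAbort are the TransactionEndTry values; the topic name is a placeholder:
if err := cl.BeginTransaction(); err != nil {
	// no transactional ID, already in a transaction, or fatal producer state
	return err
}

// Produce into the transaction (synchronously here for brevity).
if err := cl.ProduceSync(ctx, &kgo.Record{Topic: "my-topic", Value: []byte("txn")}).FirstErr(); err != nil {
	// Clear the backlog so nothing leaks into the next transaction,
	// then abort.
	cl.AbortBufferedRecords(ctx)
	return cl.EndTransaction(ctx, kgo.TryAbort)
}

// Everything produced successfully: flush, then commit.
if err := cl.Flush(ctx); err != nil {
	return err
}
return cl.EndTransaction(ctx, kgo.TryCommit)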
func (*Client) Flush ¶
Flush hangs waiting for all buffered records to be flushed, stopping all lingers if necessary.
If the context finishes (Done), this returns the context's error.
This function is safe to call multiple times concurrently, and safe to call concurrent with Flush.
func (*Client) ForceRebalance ¶ added in v0.7.0
func (cl *Client) ForceRebalance()
ForceRebalance quits a group member's heartbeat loop so that the member rejoins with a JoinGroupRequest.
This function is only useful if you either (a) know that the group member is a leader, and want to force a rebalance for any particular reason, or (b) are using a custom group balancer, and have changed the metadata that will be returned from its JoinGroupMetadata method. This function has no other use; see KIP-568 for more details around this function's motivation.
If neither of the cases above are true (this member is not a leader, and the join group metadata has not changed), then Kafka will not actually trigger a rebalance and will instead reply to the member with its current assignment.
func (*Client) LeaveGroup ¶ added in v0.6.4
func (cl *Client) LeaveGroup()
LeaveGroup leaves a group if in one. Calling the client's Close function also leaves a group, so this is only necessary to call if you plan to leave the group and continue using the client.
If you have overridden the default revoke, you must manually commit offsets before leaving the group.
If you have configured the group with an InstanceID, this does not leave the group. With instance IDs, it is expected that clients will restart and re-use the same instance ID. To leave a group using an instance ID, you must manually issue a kmsg.LeaveGroupRequest or use an external tool (kafka scripts or kcl).
func (*Client) MarkCommitRecords ¶ added in v0.10.3
MarkCommitRecords marks records to be available for autocommitting. This function is only useful if you use the AutoCommitMarks config option, see the documentation on that option for more details.
func (*Client) PauseFetchPartitions ¶ added in v0.11.1
PauseFetchPartitions sets the client to no longer fetch the given partitions and returns all currently paused partitions. Paused partitions persist until resumed. You can call this function with no partitions to simply receive the list of currently paused partitions.
In contrast to the canonical Java client, this function does not clear anything currently buffered. Buffered fetches containing paused partitions are still returned from polling.
Pausing individual partitions is independent from pausing topics with the PauseFetchTopics method. If you pause partitions for a topic with PauseFetchPartitions, and then pause that same topic with PauseFetchTopics, the individually paused partitions will not be unpaused if you only call ResumeFetchTopics.
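A small sketch of pausing and later resuming specific partitions; the topic name and partition numbers are placeholders:
// Stop fetching partitions 0 and 1 of "my-topic"; the return value is
// the full set of currently paused partitions.
paused := cl.PauseFetchPartitions(map[string][]int32{"my-topic": {0, 1}})
_ = paused

// ... later, resume them; resuming partitions that are not paused is a no-op.
cl.ResumeFetchPartitions(map[string][]int32{"my-topic": {0, 1}})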
func (*Client) PauseFetchTopics ¶ added in v0.11.1
PauseFetchTopics sets the client to no longer fetch the given topics and returns all currently paused topics. Paused topics persist until resumed. You can call this function with no topics to simply receive the list of currently paused topics.
In contrast to the canonical Java client, this function does not clear anything currently buffered. Buffered fetches containing paused topics are still returned from polling.
Pausing topics is independent from pausing individual partitions with the PauseFetchPartitions method. If you pause partitions for a topic with PauseFetchPartitions, and then pause that same topic with PauseFetchTopics, the individually paused partitions will not be unpaused if you only call ResumeFetchTopics.
func (*Client) PollFetches ¶
PollFetches waits for fetches to be available, returning as soon as any broker returns a fetch. If the context quits, this function quits. If the context is nil or is already canceled, this function will return immediately with any currently buffered records.
It is important to check all partition errors in the returned fetches. If any partition has a fatal error and actually had no records, a fake fetch will be injected with the error.
If the client is closing or has closed, a fake fetch will be injected that has no topic, a partition of 0, and a partition error of ErrClientClosed. This can be used to detect if the client is closing and to break out of a poll loop.
func (*Client) PollRecords ¶ added in v0.6.10
PollRecords waits for records to be available, returning as soon as any broker returns records in a fetch. If the context quits, this function quits. If the context is nil or is already canceled, this function will return immediately with any currently buffered records.
This returns a maximum of maxPollRecords total across all fetches, or returns all buffered records if maxPollRecords is <= 0.
It is important to check all partition errors in the returned fetches. If any partition has a fatal error and actually had no records, a fake fetch will be injected with the error.
If the client is closing or has closed, a fake fetch will be injected that has no topic, a partition of 0, and a partition error of ErrClientClosed. This can be used to detect if the client is closing and to break out of a poll loop.
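A typical poll loop, sketched with PollRecords and the error checks recommended above; the per-poll limit and the processing body are placeholders:
for {
	fetches := cl.PollRecords(ctx, 1000) // at most 1000 records per poll
	if fetches.IsClientClosed() {
		return
	}
	fetches.EachError(func(topic string, partition int32, err error) {
		// Inspect per-partition errors; data loss and fatal errors
		// surface here even when no records were returned.
	})
	fetches.EachRecord(func(r *kgo.Record) {
		// ... process r ...
	})
}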
func (*Client) Produce ¶
Produce sends a Kafka record to the topic in the record's Topic field, calling promise with the record or an error when Kafka replies. For a synchronous produce, see ProduceSync.
The promise is optional, but not using it means you will not know if Kafka recorded a record properly. Records are produced in per-partition order and promises are called in per-partition order if the record is buffered to a topic that has loaded successfully. Topics that fail loading, or records that cannot be buffered, may not have their promises called in order. Promises may be called concurrently.
If a record is produced successfully, the record's attrs / offset / etc. fields are updated appropriately before a promise is called.
If the record has an empty Topic field, the client will use a default topic if the client was configured with one via DefaultProduceTopic; otherwise the record will be failed immediately. The Partition field is ignored (setting it does not set which partition will be produced to), but, because the field is set only when finishing a record successfully, you can set the Partition field yourself and use the ManualPartitioner to obey the Partition field.
If the record is too large to fit in a batch on its own in a produce request, the promise will be called with kerr.MessageTooLarge and there will be no attempt to produce the record.
The context is used if the client currently has the max amount of buffered records. If so, the client waits for some records to complete or for the context or client to quit. If the context / client quits, the promise is called with ctx.Err().
The context is also used on a per-partition basis to abort buffered records. If the context is done for the first record buffered in a partition, and if it is valid to abort records (i.e., we can avoid invalid sequence numbers), then all buffered records for a partition are aborted. The context checked for doneness is always the first buffered record's context. The context is evaluated before or after writing a request.
The first buffered record for an unknown topic begins a timeout for the configured record timeout limit; all records buffered within the wait will expire with the same timeout if the topic does not load in time. For simplicity, any time spent waiting for the topic to load is not persisted through once the topic loads, meaning the record may further wait once buffered. This may be changed in the future if necessary, however, the only reason for a topic to not load promptly is if it does not exist.
If manual flushing is configured and there are already MaxBufferedRecords buffered, the promise is immediately called with ErrMaxBuffered.
If the client is transactional and a transaction has not been begun, the promise is immediately called with an error corresponding to not being in a transaction.
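An asynchronous produce with a promise, sketched with a sync.WaitGroup so the caller can wait for delivery; the topic name is a placeholder:
var wg sync.WaitGroup
wg.Add(1)
cl.Produce(ctx, &kgo.Record{Topic: "my-topic", Value: []byte("hello")}, func(r *kgo.Record, err error) {
	defer wg.Done()
	if err != nil {
		// record failed: timed out, retries exhausted, aborted, etc.
	}
})
wg.Wait()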
func (*Client) ProduceSync ¶ added in v0.7.0
func (cl *Client) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults
ProduceSync is a synchronous produce. Please see the Produce documentation for an in depth description of how producing works.
This function produces all records in one range loop and waits for them all to be produced before returning.
func (*Client) Request ¶
Request issues a request to Kafka, waiting for and returning the response. If a retriable network error occurs, or if a retriable group / transaction coordinator error occurs, the request is retried. All other errors are returned.
If the request is an admin request, this will issue it to the Kafka controller. If the controller ID is unknown, this will attempt to fetch it. If the fetch errors, this will return an unknown controller error.
If the request is a group or transaction coordinator request, this will issue the request to the appropriate group or transaction coordinator.
For transaction requests, the request is issued to the transaction coordinator. However, if the request is an init producer ID request and the request has no transactional ID, the request goes to any broker.
Some requests need to be split and sent to many brokers. For these requests, it is *highly* recommended to use RequestSharded. Not all responses from many brokers can be cleanly merged. However, for the requests that are split, this does attempt to merge them in a sane way.
The following requests are split:
ListOffsets
OffsetFetch (if using v8+ for Kafka 3.0+)
DescribeGroups
ListGroups
DeleteRecords
OffsetForLeaderEpoch
DescribeConfigs
AlterConfigs
AlterReplicaLogDirs
DescribeLogDirs
DeleteGroups
IncrementalAlterConfigs
DescribeProducers
DescribeTransactions
ListTransactions
Kafka 3.0 introduced batch OffsetFetch and batch FindCoordinator requests. This function is forward-compatible for the old, singular OffsetFetch and FindCoordinator requests, but is not backward-compatible for batched requests. It is recommended to only use the old format unless you know you are speaking to Kafka 3.0+.
In short, this method tries to do the correct thing depending on what type of request is being issued.
The passed context can be used to cancel a request and return early. Note that if the request was written to Kafka but the context canceled before a response is received, Kafka may still operate on the received request.
If using this function to issue kmsg.ProduceRequest's, you must configure the client with the same RequiredAcks option that you use in the request. If you are issuing produce requests with 0 acks, you must configure the client with the same timeout you use in the request. The client will internally rewrite the incoming request's acks to match the client's configuration, and it will rewrite the timeout millis if the acks is 0. It is strongly recommended to not issue raw kmsg.ProduceRequest's.
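As a sketch of issuing a raw request, here a metadata request for all topics; the response type assertion follows the kmsg naming convention, and the field names inspected in the final comment are assumptions:
// A nil Topics field requests metadata for all topics.
req := &kmsg.MetadataRequest{}
resp, err := cl.Request(ctx, req)
if err != nil {
	// retriable errors were already retried; this is terminal
	return err
}
meta := resp.(*kmsg.MetadataResponse)
for _, t := range meta.Topics {
	// inspect t.Topic, t.Partitions, per-topic error codes, ...
	_ = t
}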
func (*Client) RequestSharded ¶ added in v0.6.2
RequestSharded performs the same logic as Request, but returns all responses from any broker that the request was split to. This always returns at least one shard. If the request does not need to be issued (describing no groups), this issues the request to a random broker just to ensure that one shard exists.
There are only a few requests that are strongly recommended to explicitly use RequestSharded; the rest can by default use Request. These few requests are mentioned in the documentation for Request.
If, in the process of splitting a request, some topics or partitions are found to not exist, or Kafka replies that a request should go to a broker that does not exist, all those non-existent pieces are grouped into one request to the first seed broker. This will show up as a seed broker node ID (min int32) and the response will likely contain purely errors.
The response shards are ordered by broker metadata.
func (*Client) ResumeFetchPartitions ¶ added in v0.11.1
ResumeFetchPartitions resumes fetching the input partitions if they were previously paused. Resuming partitions that are not currently paused is a per-topic no-op. See the documentation on PauseFetchPartitions for more details.
func (*Client) ResumeFetchTopics ¶ added in v0.11.1
ResumeFetchTopics resumes fetching the input topics if they were previously paused. Resuming topics that are not currently paused is a per-topic no-op. See the documentation on PauseFetchTopics for more details.
func (*Client) SeedBrokers ¶
SeedBrokers returns all seed brokers.
func (*Client) SetOffsets ¶
func (cl *Client) SetOffsets(setOffsets map[string]map[int32]EpochOffset)
SetOffsets, for consumer groups, sets any matching offsets in setOffsets to the given epoch/offset. Partitions that are not specified are not set. It is invalid to set topics that were not yet returned from a PollFetches.
If using transactions, it is advised to just use a GroupTransactSession and avoid this function entirely.
It is strongly recommended to use this function outside of the context of a PollFetches loop and only when you know the group is not revoked (i.e., block any concurrent revoke while issuing this call). Any other usage is prone to odd interactions.
func (*Client) UncommittedOffsets ¶
func (cl *Client) UncommittedOffsets() map[string]map[int32]EpochOffset
UncommittedOffsets returns the latest uncommitted offsets. Uncommitted offsets are always updated on calls to PollFetches.
If there are no uncommitted offsets, this returns nil.
Note that, if manually committing, you should be careful with committing during group rebalances. You must ensure you commit before the group's session timeout is reached, otherwise this client will be kicked from the group and the commit will fail.
If using a cooperative balancer, commits while consuming during rebalancing may fail with REBALANCE_IN_PROGRESS.
type CompressionCodec ¶
type CompressionCodec struct {
// contains filtered or unexported fields
}
CompressionCodec configures how records are compressed before being sent.
Records are compressed within individual topics and partitions, inside of a RecordBatch. All records in a RecordBatch are compressed into one record for that batch.
func GzipCompression ¶
func GzipCompression() CompressionCodec
GzipCompression enables gzip compression with the default compression level.
func Lz4Compression ¶
func Lz4Compression() CompressionCodec
Lz4Compression enables lz4 compression with the fastest compression level.
func NoCompression ¶
func NoCompression() CompressionCodec
NoCompression is a compression option that avoids compression. This can always be used as a fallback compression.
func SnappyCompression ¶
func SnappyCompression() CompressionCodec
SnappyCompression enables snappy compression.
func ZstdCompression ¶
func ZstdCompression() CompressionCodec
ZstdCompression enables zstd compression with the default compression level.
func (CompressionCodec) WithLevel ¶
func (c CompressionCodec) WithLevel(level int) CompressionCodec
WithLevel changes the compression codec's "level", effectively allowing for higher or lower compression ratios at the expense of CPU speed.
For the zstd package, the level is a typed int; simply convert the type back to an int for this function.
If the level is invalid, compressors just use a default level.
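A sketch of configuring a compression preference with an explicit level, falling back to no compression; the seed broker is a placeholder:
cl, err := kgo.NewClient(
	kgo.SeedBrokers("localhost:9092"),
	// Prefer gzip at a high level, fall back to no compression.
	kgo.ProducerBatchCompression(kgo.GzipCompression().WithLevel(9), kgo.NoCompression()),
)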
type ConsumerBalancer ¶ added in v0.7.0
type ConsumerBalancer struct {
// contains filtered or unexported fields
}
ConsumerBalancer is a helper type for writing balance plans that use the "consumer" protocol, such that each member uses a kmsg.GroupMemberMetadata in its join group request.
func NewConsumerBalancer ¶ added in v0.7.0
func NewConsumerBalancer(balance ConsumerBalancerBalance, members []kmsg.JoinGroupResponseMember) (*ConsumerBalancer, error)
NewConsumerBalancer parses each member's metadata as a kmsg.GroupMemberMetadata and returns a ConsumerBalancer to use in balancing.
If any metadata parsing fails, this returns an error.
func (*ConsumerBalancer) Balance ¶ added in v0.7.0
func (b *ConsumerBalancer) Balance(topics map[string]int32) IntoSyncAssignment
Balance satisfies the GroupMemberBalancer interface.
func (*ConsumerBalancer) EachMember ¶ added in v0.7.0
func (b *ConsumerBalancer) EachMember(fn func(member *kmsg.JoinGroupResponseMember, meta *kmsg.GroupMemberMetadata))
EachMember calls fn for each member and its corresponding metadata in the consumer group being balanced.
func (*ConsumerBalancer) MemberAt ¶ added in v0.7.0
func (b *ConsumerBalancer) MemberAt(n int) (*kmsg.JoinGroupResponseMember, *kmsg.GroupMemberMetadata)
MemberAt returns the nth member and its corresponding metadata.
func (*ConsumerBalancer) MemberTopics ¶ added in v0.7.0
func (b *ConsumerBalancer) MemberTopics() map[string]struct{}
MemberTopics returns the unique set of topics that all members are interested in.
This can safely be called if the balancer is nil; if so, this will return nil.
func (*ConsumerBalancer) Members ¶ added in v0.7.0
func (b *ConsumerBalancer) Members() []kmsg.JoinGroupResponseMember
Members returns the list of input members for this group balancer.
func (*ConsumerBalancer) NewPlan ¶ added in v0.7.0
func (b *ConsumerBalancer) NewPlan() *BalancePlan
NewPlan returns a type that can be used to build a balance plan. The return satisfies the IntoSyncAssignment interface.
type ConsumerBalancerBalance ¶ added in v0.7.0
type ConsumerBalancerBalance interface {
Balance(*ConsumerBalancer, map[string]int32) IntoSyncAssignment
}
ConsumerBalancerBalance is what the ConsumerBalancer invokes to balance a group.
This is a complicated interface, but in short, this interface has one function that implements the actual balancing logic: using the input balancer, balance the input topics and partitions.
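A heavily simplified sketch of the interface: a balance function that assigns every partition of every topic to the first member. Real balancers spread partitions across members; this only illustrates the plumbing between ConsumerBalancer, BalancePlan, and IntoSyncAssignment, and the type name is hypothetical.
type firstMemberBalance struct{}

func (firstMemberBalance) Balance(b *kgo.ConsumerBalancer, topics map[string]int32) kgo.IntoSyncAssignment {
	plan := b.NewPlan()
	members := b.Members()
	if len(members) == 0 {
		return plan
	}
	first := &members[0]
	for topic, numPartitions := range topics {
		partitions := make([]int32, 0, numPartitions)
		for p := int32(0); p < numPartitions; p++ {
			partitions = append(partitions, p)
		}
		plan.AddPartitions(first, topic, partitions)
	}
	return plan
}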
type ConsumerOpt ¶
type ConsumerOpt interface {
	Opt
	// contains filtered or unexported methods
}
ConsumerOpt is a consumer specific option to configure a client. This is simply a namespaced Opt.
func ConsumePartitions ¶
func ConsumePartitions(partitions map[string]map[int32]Offset) ConsumerOpt
ConsumePartitions sets partitions to consume from directly and the offsets to start consuming those partitions from.
This option is basically a way to explicitly consume from subsets of partitions in topics, or to consume at exact offsets. Offsets from this option have higher precedence than the ConsumeResetOffset.
This option is not compatible with group consuming and regex consuming.
func ConsumeRegex ¶ added in v0.8.0
func ConsumeRegex() ConsumerOpt
ConsumeRegex sets the client to parse all topics passed to ConsumeTopics as regular expressions.
When consuming via regex, every metadata request loads *all* topics, so that all topics can be passed to any regular expressions. Every topic is evaluated only once ever across all regular expressions; either it permanently is known to match, or is permanently known to not match.
func ConsumeResetOffset ¶
func ConsumeResetOffset(offset Offset) ConsumerOpt
ConsumeResetOffset sets the offset to restart consuming from when a partition has no commits (for groups) or when beginning to consume a partition (for direct partition consuming), or when a fetch sees an OffsetOutOfRange error, overriding the default ConsumeStartOffset.
Defaults to: NewOffset().AtStart() / Earliest Offset
func ConsumeTopics ¶
func ConsumeTopics(topics ...string) ConsumerOpt
ConsumeTopics adds topics to use for consuming.
By default, consuming will start at the beginning of partitions. To change this, use the ConsumeResetOffset option.
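For example, to consume two topics starting from the end of each partition rather than the beginning (assuming NewOffset().AtEnd() as the counterpart to the default AtStart(); the broker and topic names are placeholders):
cl, err := kgo.NewClient(
	kgo.SeedBrokers("localhost:9092"),
	kgo.ConsumeTopics("orders", "payments"),
	kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()),
)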
func FetchIsolationLevel ¶
func FetchIsolationLevel(level IsolationLevel) ConsumerOpt
FetchIsolationLevel sets the "isolation level" used for fetching records, overriding the default ReadUncommitted.
func FetchMaxBytes ¶
func FetchMaxBytes(b int32) ConsumerOpt
FetchMaxBytes sets the maximum amount of bytes a broker will try to send during a fetch, overriding the default 50MiB. Note that brokers may not obey this limit if they have records larger than this limit. Also note that this client sends a fetch to each broker concurrently, meaning the client will buffer up to <brokers * max bytes> worth of memory.
This corresponds to the Java fetch.max.bytes setting.
If bumping this, consider bumping BrokerMaxReadBytes.
If what you are consuming is compressed, and compressed well, it is strongly recommended to set this option so that decompression does not eat all of your RAM.
func FetchMaxPartitionBytes ¶
func FetchMaxPartitionBytes(b int32) ConsumerOpt
FetchMaxPartitionBytes sets the maximum amount of bytes that will be consumed for a single partition in a fetch request, overriding the default 10MiB. Note that if a single batch is larger than this number, that batch will still be returned so the client can make progress.
This corresponds to the Java max.partition.fetch.bytes setting.
func FetchMaxWait ¶
func FetchMaxWait(wait time.Duration) ConsumerOpt
FetchMaxWait sets the maximum amount of time a broker will wait for a fetch response to hit the minimum number of required bytes before returning, overriding the default 5s.
This corresponds to the Java fetch.max.wait.ms setting.
func FetchMinBytes ¶ added in v0.6.2
func FetchMinBytes(b int32) ConsumerOpt
FetchMinBytes sets the minimum amount of bytes a broker will try to send during a fetch, overriding the default 1 byte.
With the default of 1, data is sent as soon as it is available. By bumping this, the broker will try to wait for more data, which may improve server throughput at the expense of added latency.
This corresponds to the Java fetch.min.bytes setting.
func KeepControlRecords ¶
func KeepControlRecords() ConsumerOpt
KeepControlRecords sets the client to keep control messages and return them with fetches, overriding the default that discards them.
Generally, control messages are not useful.
func MaxConcurrentFetches ¶ added in v0.10.3
func MaxConcurrentFetches(n int) ConsumerOpt
MaxConcurrentFetches sets the maximum number of fetch requests to allow in flight or buffered at once, overriding the unbounded (i.e. number of brokers) default.
This setting, paired with FetchMaxBytes, can upper bound the maximum amount of memory that the client can use for consuming.
Requests are issued to brokers in a FIFO order: once the client is ready to issue a request to a broker, it registers that request and issues it in order with other registrations.
If Kafka replies with any data, the client does not track the fetch as completed until the user has polled the buffered fetch. Thus, a concurrent fetch is not considered complete until all data from it is done being processed and out of the client itself.
Note that brokers are allowed to hang for up to FetchMaxWait before replying to a request, so if this option is too constrained and you are consuming a low throughput topic, the client may take a long time before requesting a broker that has new data. For high throughput topics, or if the allowed concurrent fetches is large enough, this should not be a concern.
A value of 0 implies the allowed concurrency is unbounded and will be limited only by the number of brokers in the cluster.
func Rack ¶
func Rack(rack string) ConsumerOpt
Rack specifies where the client is physically located and changes fetch requests to consume from the closest replica as opposed to the leader replica.
Consuming from a preferred replica can increase latency but can decrease cross datacenter costs. See KIP-392 for more information.
type EpochOffset ¶
EpochOffset combines a record offset with the leader epoch the broker was at when the record was written.
type ErrDataLoss ¶
type ErrDataLoss struct {
	// Topic is the topic data loss was detected on.
	Topic string

	// Partition is the partition data loss was detected on.
	Partition int32

	// ConsumedTo is what the client had consumed to for this partition
	// before data loss was detected.
	ConsumedTo int64

	// ResetTo is what the client reset the partition to; everything from
	// ResetTo to ConsumedTo was lost.
	ResetTo int64
}
ErrDataLoss is returned for Kafka >=2.1.0 when data loss is detected and the client is able to reset to the last valid offset.
func (*ErrDataLoss) Error ¶
func (e *ErrDataLoss) Error() string
type Fetch ¶
type Fetch struct {
	// Topics are all topics being responded to from a fetch to a broker.
	Topics []FetchTopic
}
Fetch is an individual response from a broker.
type FetchBatchMetrics ¶ added in v0.8.1
type FetchBatchMetrics struct {
	// NumRecords is the number of records that were fetched in this batch.
	//
	// Note that this number includes transaction markers, which are not
	// actually returned to the user.
	//
	// If the batch has an encoding error, this will be 0.
	NumRecords int

	// UncompressedBytes is the number of bytes the records deserialized
	// into after decompression.
	//
	// For record batches (Kafka v0.11.0+), this is the size of the records
	// in a batch, and does not include record batch overhead.
	//
	// For message sets, this size includes message set overhead.
	//
	// Note that this number may be higher than the corresponding number
	// when producing, because as an "optimization", Kafka can return
	// partial batches when fetching.
	UncompressedBytes int

	// CompressedBytes is the number of bytes actually read for this batch,
	// before decompression. If the batch was not compressed, this will be
	// equal to UncompressedBytes.
	//
	// For record batches, this is the size of the compressed records, and
	// does not include record batch overhead.
	//
	// For message sets, this is the size of the compressed message set.
	CompressedBytes int

	// CompressionType signifies which algorithm the batch was compressed
	// with.
	//
	// 0 is no compression, 1 is gzip, 2 is snappy, 3 is lz4, and 4 is
	// zstd.
	CompressionType uint8
}
FetchBatchMetrics tracks information about fetches of batches.
type FetchError ¶
FetchError is an error in a fetch along with the topic and partition that the error was on.
type FetchPartition ¶
type FetchPartition struct {
	// Partition is the partition this is for.
	Partition int32

	// Err is an error for this partition in the fetch.
	//
	// Note that if this is a fatal error, such as data loss or non
	// retriable errors, this partition will never be fetched again.
	Err error

	// HighWatermark is the current high watermark for this partition, that
	// is, the current offset that is on all in sync replicas.
	HighWatermark int64

	// LastStableOffset is the offset at which all prior offsets have been
	// "decided". Non transactional records are always decided immediately,
	// but transactional records are only decided once they are committed
	// or aborted.
	//
	// The LastStableOffset will always be at or under the HighWatermark.
	LastStableOffset int64

	// LogStartOffset is the low watermark of this partition, otherwise
	// known as the earliest offset in the partition.
	LogStartOffset int64

	// Records contains fetched records for this partition.
	Records []*Record
}
FetchPartition is a response for a partition in a fetched topic from a broker.
func (*FetchPartition) EachRecord ¶ added in v1.1.0
func (p *FetchPartition) EachRecord(fn func(*Record))
EachRecord calls fn for each record in the partition.
type FetchTopic ¶
type FetchTopic struct {
	// Topic is the topic this is for.
	Topic string

	// Partitions contains individual partitions in the topic that were
	// fetched.
	Partitions []FetchPartition
}
FetchTopic is a response for a fetched topic from a broker.
func (*FetchTopic) EachPartition ¶ added in v1.1.0
func (t *FetchTopic) EachPartition(fn func(FetchPartition))
EachPartition calls fn for each partition in the topic.
type FetchTopicPartition ¶ added in v0.6.10
type FetchTopicPartition struct {
	// Topic is the topic this is for.
	Topic string

	// FetchPartition is an individual partition within this topic.
	FetchPartition
}
FetchTopicPartition is similar to FetchTopic, but for an individual partition.
func (*FetchTopicPartition) EachRecord ¶ added in v0.6.10
func (r *FetchTopicPartition) EachRecord(fn func(*Record))
EachRecord calls fn for each record in the topic's partition.
type Fetches ¶
type Fetches []Fetch
Fetches is a group of fetches from brokers.
func (Fetches) EachError ¶ added in v0.8.0
EachError calls fn for every partition that had a fetch error with the topic, partition, and error.
This function has the same semantics as the Errors function; refer to the documentation on that function for what types of errors are possible.
func (Fetches) EachPartition ¶ added in v0.6.10
func (fs Fetches) EachPartition(fn func(FetchTopicPartition))
EachPartition calls fn for each partition in Fetches.
Partitions are not visited in any specific order, and a topic may be visited multiple times if it is spread across fetches.
func (Fetches) EachRecord ¶ added in v0.7.0
EachRecord calls fn for each record in Fetches.
This is very similar to using a record iter, and is solely a convenience function depending on which style you prefer.
func (Fetches) EachTopic ¶ added in v0.7.0
func (fs Fetches) EachTopic(fn func(FetchTopic))
EachTopic calls fn for each topic in Fetches.
This is a convenience function that groups all partitions for the same topic from many fetches into one FetchTopic. A map is internally allocated to group partitions per topic before calling fn.
func (Fetches) Errors ¶
func (fs Fetches) Errors() []FetchError
Errors returns all errors in a fetch with the topic and partition that errored.
There are four classes of errors possible:
- a normal kerr.Error; these are usually the non-retriable kerr.Errors, but theoretically a non-retriable error can be fixed at runtime (auth error? fix auth). It is worth restarting the client for these errors if you do not intend to fix this problem at runtime.
- an injected *ErrDataLoss; these are informational, the client automatically resets consuming to where it should and resumes. This error is worth logging and investigating, but not worth restarting the client for.
- an untyped batch parse failure; these are usually unrecoverable by restarts, and it may be best to just let the client continue. However, restarting is an option, but you may need to manually repair your partition.
- an injected ErrClientClosed; this is a fatal informational error that is returned from every Poll call if the client has been closed. A corresponding helper function IsClientClosed can be used to detect this error.
func (Fetches) IsClientClosed ¶ added in v0.8.1
IsClientClosed returns whether the fetches includes an error indicating that the client is closed.
This function is useful to break out of a poll loop; you likely want to call this function before calling Errors.
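As a rough sketch (assuming a client cl created with NewClient, a context ctx, and the standard log package), a poll loop might break on client closure and surface per-partition errors like so:

for {
    fetches := cl.PollFetches(ctx)
    if fetches.IsClientClosed() {
        return // the client was closed; stop polling
    }
    // Surface every partition error; whether to log, restart, or continue
    // depends on which class of error it is (see Errors above).
    fetches.EachError(func(topic string, partition int32, err error) {
        log.Printf("fetch error on %s[%d]: %v", topic, partition, err)
    })
    // ... process fetches ...
}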
func (Fetches) RecordIter ¶
func (fs Fetches) RecordIter() *FetchesRecordIter
RecordIter returns an iterator over all records in a fetch.
Note that errors should be inspected as well.
type FetchesRecordIter ¶
type FetchesRecordIter struct {
// contains filtered or unexported fields
}
FetchesRecordIter iterates over records in a fetch.
func (*FetchesRecordIter) Done ¶
func (i *FetchesRecordIter) Done() bool
Done returns whether there are any more records to iterate over.
func (*FetchesRecordIter) Next ¶
func (i *FetchesRecordIter) Next() *Record
Next returns the next record from a fetch.
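For example, a minimal sketch of draining all records from a poll with the iterator (assuming fetches came from a prior PollFetches call):

iter := fetches.RecordIter()
for !iter.Done() {
    record := iter.Next()
    // ... use record.Topic, record.Partition, record.Key, record.Value ...
}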
type FirstErrPromise ¶ added in v0.7.0
type FirstErrPromise struct {
// contains filtered or unexported fields
}
FirstErrPromise is a helper type to capture only the first failing error when producing a batch of records with this type's Promise function.
This is useful for when you only care about any record failing, and can use that as a signal (i.e., to abort a batch). The AbortingFirstErrPromise function can be used to abort all records as soon as the first error is encountered. If you do not need to abort, you can use this type with no constructor.
This is similar to using ProduceResult's FirstErr function.
func AbortingFirstErrPromise ¶ added in v0.7.4
func AbortingFirstErrPromise(cl *Client) *FirstErrPromise
AbortingFirstErrPromise returns a FirstErrPromise that will call the client's AbortBufferedRecords function if an error is encountered.
This can be used to quickly exit when any error is encountered, rather than waiting while flushing only to discover things errored.
func (*FirstErrPromise) Err ¶ added in v0.7.0
func (f *FirstErrPromise) Err() error
Err waits for all promises to complete and then returns any stored error.
func (*FirstErrPromise) Promise ¶ added in v0.7.0
func (f *FirstErrPromise) Promise() func(*Record, error)
Promise returns a promise for producing that will store the first error encountered.
The returned promise must eventually be called, because a FirstErrPromise does not return from 'Err' until all promises are completed.
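As a sketch of how these pieces fit together (assuming an existing client cl, a context ctx, and a hypothetical records slice), you might produce a batch and abort on the first failure like so:

fep := kgo.AbortingFirstErrPromise(cl)
for _, rec := range records {
    cl.Produce(ctx, rec, fep.Promise())
}
// Err waits for every promise to complete and returns the first error, if any.
if err := fep.Err(); err != nil {
    // handle or log the failed batch
}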
type GroupBalancer ¶
type GroupBalancer interface {
    // ProtocolName returns the name of the protocol, e.g. roundrobin,
    // range, sticky.
    ProtocolName() string

    // JoinGroupMetadata returns the metadata to use in JoinGroup, given
    // the topic interests and the current assignment and group generation.
    //
    // It is safe to modify the input topics and currentAssignment. The
    // input topics are guaranteed to be sorted, as are the partitions for
    // each topic in currentAssignment. It is recommended for your output
    // to be ordered by topic and partitions. Since Kafka uses the output
    // from this function to determine whether a rebalance is needed, a
    // deterministic output will avoid accidental rebalances.
    JoinGroupMetadata(
        topicInterests []string,
        currentAssignment map[string][]int32,
        generation int32,
    ) []byte

    // ParseSyncAssignment returns assigned topics and partitions from an
    // encoded SyncGroupResponse's MemberAssignment.
    ParseSyncAssignment(assignment []byte) (map[string][]int32, error)

    // MemberBalancer returns a GroupMemberBalancer for the given group
    // members, as well as the topics that all the members are interested
    // in. If the client does not have some topics in the returned topics,
    // the client issues a metadata request to load the number of
    // partitions in those topics before calling the GroupMemberBalancer's
    // Balance function.
    //
    // The input group members are guaranteed to be sorted first by
    // instance ID, if non-nil, and then by member ID.
    //
    // It is up to the user to decide how to decode each member's
    // ProtocolMetadata field. The default client group protocol "consumer"
    // uses join group metadata of type kmsg.GroupMemberMetadata. If this
    // is the case for you, it may be useful to use the ConsumerBalancer
    // type to help parse the metadata and balance.
    //
    // If the member metadata cannot be deserialized correctly, this should
    // return a relevant error.
    MemberBalancer(members []kmsg.JoinGroupResponseMember) (b GroupMemberBalancer, topics map[string]struct{}, err error)

    // IsCooperative returns if this is a cooperative balance strategy.
    IsCooperative() bool
}
GroupBalancer balances topics and partitions among group members.
A GroupBalancer is roughly equivalent to Kafka's PartitionAssignor.
func CooperativeStickyBalancer ¶
func CooperativeStickyBalancer() GroupBalancer
CooperativeStickyBalancer performs the sticky balancing strategy, but additionally opts the consumer group into "cooperative" rebalancing.
Cooperative rebalancing differs from "eager" (the original) rebalancing in that group members do not stop processing partitions during the rebalance. Instead, once they receive their new assignment, each member determines which partitions it needs to revoke. If any, they send a new join request (before syncing), and the process starts over. This should ultimately end up in only two join rounds, with the major benefit being that processing never needs to stop.
NOTE once a group is collectively using cooperative balancing, it is unsafe to have a member join the group that does not support cooperative balancing. If the only-eager member is elected leader, it will not know of the new multiple join strategy and things will go awry. Thus, once a group is entirely on cooperative rebalancing, it cannot go back.
Migrating an eager group to cooperative balancing requires two rolling bounce deploys. The first deploy should add the cooperative-sticky strategy as an option (that is, each member goes from using one balance strategy to two). During this deploy, Kafka will tell leaders to continue using the old eager strategy, since the old eager strategy is the only one in common among all members. The second rolling deploy removes the old eager strategy. At this point, Kafka will tell the leader to use cooperative-sticky balancing. During this roll, all members in the group that still have both strategies continue to be eager and give up all of their partitions every rebalance. However, once a member only has cooperative-sticky, it can begin using this new strategy and things will work correctly. See KIP-429 for more details.
func RangeBalancer ¶
func RangeBalancer() GroupBalancer
RangeBalancer returns a group balancer that, per topic, maps partitions to group members. Since this works on a per-topic level, an uneven number of partitions per topic relative to the number of members can lead to slight disparities in partition consumption.
Suppose there are two members M0 and M1, two topics t0 and t1, and each topic has three partitions p0, p1, and p2. The partition balancing will be
M0: [t0p0, t0p1, t1p0, t1p1]
M1: [t0p2, t1p2]
This is equivalent to the Java range balancer.
func RoundRobinBalancer ¶
func RoundRobinBalancer() GroupBalancer
RoundRobinBalancer returns a group balancer that evenly maps topics and partitions to group members.
Suppose there are two members M0 and M1, two topics t0 and t1, and each topic has three partitions p0, p1, and p2. The partition balancing will be
M0: [t0p0, t0p2, t1p1]
M1: [t0p1, t1p0, t1p2]
If all members subscribe to all topics equally, the roundrobin balancer will give a perfect balance. However, if topic subscriptions are quite unequal, the roundrobin balancer may lead to a bad balance. See KIP-49 for one example (note that the fair strategy mentioned in KIP-49 does not exist).
This is equivalent to the Java roundrobin balancer.
func StickyBalancer ¶
func StickyBalancer() GroupBalancer
StickyBalancer returns a group balancer that ensures minimal partition movement on group changes while also ensuring optimal balancing.
Suppose there are three members M0, M1, and M2, and three topics t0, t1, and t2, each with three partitions p0, p1, and p2. If the initial balance plan looks like
M0: [t0p0, t0p1, t0p2]
M1: [t1p0, t1p1, t1p2]
M2: [t2p0, t2p1, t2p2]
If M2 disappears, both roundrobin and range would have mostly destructive reassignments.
Range would result in
M0: [t0p0, t0p1, t1p0, t1p1, t2p0, t2p1]
M1: [t0p2, t1p2, t2p2]
which is imbalanced and has 3 partitions move from members that did not need to move (t0p2, t1p0, t1p1).
RoundRobin would result in
M0: [t0p0, t0p2, t1p1, t2p0, t2p2]
M1: [t0p1, t1p0, t1p2, t2p1]
which is balanced, but has 2 partitions move when they do not need to (t0p1, t1p1).
Sticky balancing results in
M0: [t0p0, t0p1, t0p2, t2p0, t2p2]
M1: [t1p0, t1p1, t1p2, t2p1]
which is balanced and does not cause any unnecessary partition movement. The actual t2 partitions may not be in that exact combination, but they will be balanced.
An advantage of the sticky consumer is that it allows API users to potentially avoid some cleanup until after the consumer knows which partitions it is losing when it gets its new assignment. Users can then only cleanup state for partitions that changed, which will be minimal (see KIP-54; this client also includes the KIP-351 bugfix).
Note that this API implements the sticky partitioning quite differently from the Java implementation. The Java implementation is difficult to reason about and has many edge cases that result in non-optimal balancing (albeit, you likely have to be trying to hit those edge cases). This API uses a different algorithm to ensure optimal balancing while being an order of magnitude faster.
Since the new strategy is a strict improvement over the Java strategy, it is entirely compatible. Any Go client sharing a group with a Java client will not have its decisions undone on leadership change from a Go consumer to a Java one. Java balancers do not apply the strategy they come up with if they deem the balance score equal to or worse than the original score (the score being effectively equal to the standard deviation of the mean number of assigned partitions). This Go sticky balancer is optimal and extra sticky. Thus, the Java balancer will never back out of a strategy from this balancer.
type GroupMemberBalancer ¶ added in v0.7.0
type GroupMemberBalancer interface {
    // Balance balances topics and partitions among group members, where
    // the int32 in the topics map corresponds to the number of partitions
    // known to be in each topic.
    Balance(topics map[string]int32) IntoSyncAssignment
}
GroupMemberBalancer balances topics amongst group members.
type GroupOpt ¶
type GroupOpt interface {
    Opt
    // contains filtered or unexported methods
}
GroupOpt is a consumer group specific option to configure a client. This is simply a namespaced Opt.
func AutoCommitCallback ¶ added in v0.11.0
func AutoCommitCallback(fn func(*Client, *kmsg.OffsetCommitRequest, *kmsg.OffsetCommitResponse, error)) GroupOpt
AutoCommitCallback sets the callback to use if autocommitting is enabled. This overrides the default callback that logs errors and continues.
func AutoCommitInterval ¶
AutoCommitInterval sets how long to go between autocommits, overriding the default 5s.
func AutoCommitMarks ¶ added in v0.10.3
func AutoCommitMarks() GroupOpt
AutoCommitMarks switches the autocommitting behavior to only commit "marked" records, which can be done with the MarkCommitRecords method.
This option is basically a halfway point between autocommitting and manually committing. If you have slow batch processing of polls, then you can manually mark records to be autocommitted before you poll again. This way, if you usually take a long time between polls, your partial work can still be automatically checkpointed through autocommitting.
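A rough sketch of marked autocommitting (assuming the client was created with the AutoCommitMarks group option, and process is a hypothetical per-record handler):

fetches := cl.PollFetches(ctx)
fetches.EachRecord(func(r *kgo.Record) {
    process(r) // hypothetical slow per-record work
    cl.MarkCommitRecords(r)
})
// Only marked records are eligible for the next autocommit.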
func Balancers ¶
func Balancers(balancers ...GroupBalancer) GroupOpt
Balancers sets the group balancers to use for dividing topic partitions among group members, overriding the current default [cooperative-sticky]. This option is equivalent to Kafka's partition.assignment.strategies option.
For balancing, Kafka chooses the first protocol that all group members agree to support.
Note that cooperative-sticky group balancing is incompatible with eager (classical) rebalancing; opting in to it requires a careful rollout strategy (see KIP-429).
func ConsumerGroup ¶ added in v0.8.0
ConsumerGroup sets the consumer group for the client to join and consume in. This option is required if using any other group options.
Note that when group consuming, the default is to autocommit every 5s. To be safe, autocommitting only commits what is *previously* polled. If you poll once, nothing will be committed. If you poll again, the first poll is available to be committed. This ensures at-least-once processing, but does mean there is likely some duplicate processing during rebalances. When your client shuts down, you should issue one final synchronous commit before leaving the group (because you will not be polling again, and you are not waiting for an autocommit).
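As a sketch, a group consumer is typically created with a group and topics to consume (ConsumeTopics is a consumer option documented elsewhere in this package), and issues one final blocking commit before closing:

cl, err := kgo.NewClient(
    kgo.SeedBrokers("localhost:9092"),
    kgo.ConsumerGroup("my-group"),
    kgo.ConsumeTopics("my-topic"),
)
if err != nil {
    // handle error
}
defer cl.Close()

// ... poll and process ...

// Before shutting down, synchronously commit whatever has been polled but
// not yet autocommitted.
if err := cl.CommitUncommittedOffsets(context.Background()); err != nil {
    // log the failed final commit
}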
func DisableAutoCommit ¶
func DisableAutoCommit() GroupOpt
DisableAutoCommit disables auto committing.
If you disable autocommitting, you may want to use a custom OnPartitionsRevoked, otherwise you may end up doubly processing records (which is fine, just leads to duplicate processing). Consider the scenario: you, member A, are processing partition 0, and previously committed offset 4 and have now locally processed through offset 30. A rebalance happens, and partition 0 moves to member B. If you use OnPartitionsRevoked, you can detect that you are losing this partition and commit your work through offset 30, so that member B can start processing at offset 30. If you do not commit (i.e. you do not use a custom OnPartitionsRevoked), the other member will start processing at offset 4. It may process through offset 50, leading to double processing of offsets 4 through 29. Worse, you, member A, can rewind member B's commit, because member B may commit offset 50 and you may finally eventually commit offset 30. If a rebalance happens, then even more duplicate processing will occur of offsets 30 through 49.
Again, OnPartitionsRevoked is not necessary, and not using it just means double processing, which for most workloads is fine since a simple group consumer is not EOS / transactional, only at-least-once. But, this is something to be aware of.
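A minimal sketch of pairing DisableAutoCommit with a revoke callback that commits synchronously (seed brokers, group, and topics are placeholders):

cl, err := kgo.NewClient(
    kgo.SeedBrokers("localhost:9092"),
    kgo.ConsumerGroup("my-group"),
    kgo.ConsumeTopics("my-topic"),
    kgo.DisableAutoCommit(),
    kgo.OnPartitionsRevoked(func(ctx context.Context, cl *kgo.Client, revoked map[string][]int32) {
        // Commit work done so far so the next owner of these partitions
        // starts where we left off.
        if err := cl.CommitUncommittedOffsets(ctx); err != nil {
            // log; commits can fail if the group has already moved on
        }
    }),
)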
func GreedyAutoCommit ¶ added in v0.10.2
func GreedyAutoCommit() GroupOpt
GreedyAutoCommit opts in to committing everything that has been polled when autocommitting (the dirty offsets), rather than committing what has previously been polled. This option may result in message loss if your application crashes.
func GroupProtocol ¶ added in v0.6.10
GroupProtocol sets the group's join protocol, overriding the default value "consumer". The only reason to override this is if you are implementing custom join and sync group logic.
func HeartbeatInterval ¶
HeartbeatInterval sets how long a group member goes between heartbeats to Kafka, overriding the default 3,000ms.
Kafka uses heartbeats to ensure that a group member's session stays active. This value can be any value lower than the session timeout, but should be no higher than 1/3rd the session timeout.
This corresponds to Kafka's heartbeat.interval.ms.
func InstanceID ¶
InstanceID sets the group consumer's instance ID, switching the group member from "dynamic" to "static".
Prior to Kafka 2.3.0, joining a group gave a group member a new member ID. The group leader could not tell if this was a rejoining member. Thus, any join caused the group to rebalance.
Kafka 2.3.0 introduced the concept of an instance ID, which can persist across restarts. This allows for avoiding many costly rebalances and allows for stickier rebalancing for rejoining members (since the ID for balancing stays the same). The main downsides are that you, the user of a client, have to manage instance IDs properly, and that it may take longer to rebalance in the event that a client legitimately dies.
When using an instance ID, the client does NOT send a leave group request when closing. This allows for the client to restart with the same instance ID and rejoin the group to avoid a rebalance. It is strongly recommended to increase the session timeout enough to allow time for the restart (remember that the default session timeout is 10s).
To actually leave the group, you must use an external admin command that issues a leave group request on behalf of this instance ID (see kcl), or you can manually use the kmsg package with a proper LeaveGroupRequest.
NOTE: Leaving a group with an instance ID is only supported in Kafka 2.4.0+.
func OnPartitionsAssigned ¶ added in v0.11.0
OnPartitionsAssigned sets the function to be called when a group is joined, after partitions are assigned and before fetches for those partitions begin.
This function combined with OnPartitionsRevoked should not exceed the rebalance interval. It is possible for the group, immediately after finishing a balance, to re-enter a new balancing session.
The OnPartitionsAssigned function is passed the client's context, which is only canceled if the client is closed.
This function is not called concurrent with any other On callback, and this function is given a new map that the user is free to modify.
func OnPartitionsLost ¶ added in v0.11.0
OnPartitionsLost sets the function to be called on "fatal" group errors, such as IllegalGeneration, UnknownMemberID, and authentication failures. This function differs from OnPartitionsRevoked in that it is unlikely that commits will succeed when partitions are outright lost, whereas commits likely will succeed when revoking partitions.
If this is not set, you will not know when a group error occurs that forcefully loses all partitions. If you wish to use the same callback for lost and revoked, you can use OnPartitionsLostAsRevoked as a shortcut.
This function is not called concurrent with any other On callback, and this function is given a new map that the user is free to modify.
func OnPartitionsRevoked ¶ added in v0.11.0
OnPartitionsRevoked sets the function to be called once this group member has partitions revoked.
This function combined with OnPartitionsAssigned should not exceed the rebalance interval. It is possible for the group, immediately after finishing a balance, to re-enter a new balancing session.
If autocommit is enabled, the default OnPartitionsRevoked is a blocking commit all non-dirty offsets (where dirty is the most recent poll). The reason for a blocking commit is so that no later commit cancels the blocking commit. If the commit in OnPartitionsRevoked were canceled, then the rebalance would proceed immediately, the commit that canceled the blocking commit would fail, and duplicates could be consumed after the rebalance completes.
The OnPartitionsRevoked function is passed the client's context, which is only canceled if the client is closed. The OnPartitionsRevoked function is called at the end of a group session even if there are no partitions being revoked. If you are committing offsets manually (have disabled autocommitting), it is highly recommended to do a proper blocking commit in OnPartitionsRevoked.
This function is not called concurrent with any other On callback, and this function is given a new map that the user is free to modify.
func RebalanceTimeout ¶
RebalanceTimeout sets how long group members are allowed to take when a rebalance has begun, overriding the default 60,000ms. This timeout is how long all members are allowed to complete work and commit offsets, minus the time it took to detect the rebalance (from a heartbeat).
Kafka uses the largest rebalance timeout of all members in the group. If a member does not rejoin within this timeout, Kafka will kick that member from the group.
This corresponds to Kafka's rebalance.timeout.ms.
func RequireStableFetchOffsets ¶
func RequireStableFetchOffsets() GroupOpt
RequireStableFetchOffsets sets the group consumer to require "stable" fetch offsets before consuming from the group. Proposed in KIP-447 and introduced in Kafka 2.5.0, stable offsets are important when consuming from partitions that a transactional producer could be committing to.
With this option, Kafka will block group consumers from fetching offsets for partitions that are in an active transaction.
Because this can block consumption, it is strongly recommended to set transactional timeouts to a small value (10s) rather than the default 60s. Lowering the transactional timeout will reduce the chance that consumers are entirely blocked.
func SessionTimeout ¶
SessionTimeout sets how long a member in the group can go between heartbeats, overriding the default 45,000ms. If a member does not heartbeat in this timeout, the broker will remove the member from the group and initiate a rebalance.
If you are using a GroupTransactSession for EOS, wish to lower this, and are talking to a Kafka cluster pre 2.5.0, consider lowering the TransactionTimeout. If you do not, you risk a transaction finishing after a group has rebalanced, which could lead to duplicate processing. If you are talking to a Kafka 2.5.0+ cluster, you can safely use the RequireStableFetchOffsets group option and prevent any problems.
This option corresponds to Kafka's session.timeout.ms setting and must be within the broker's group.min.session.timeout.ms and group.max.session.timeout.ms.
type GroupTransactSession ¶
type GroupTransactSession struct {
// contains filtered or unexported fields
}
GroupTransactSession abstracts away the proper way to begin a transaction and more importantly how to end a transaction when consuming in a group, modifying records, and producing (EOS transaction).
func NewGroupTransactSession ¶ added in v0.8.0
func NewGroupTransactSession(opts ...Opt) (*GroupTransactSession, error)
NewGroupTransactSession is exactly the same as NewClient, but wraps the client's OnRevoked / OnLost to ensure that transactions are correctly aborted whenever necessary so as to properly provide EOS.
When ETLing in a group in a transaction, if a rebalance happens before the transaction is ended, you either (a) must block the rebalance from finishing until you are done producing, and then commit before unblocking, or (b) allow the rebalance to happen, but abort any work you did.
The problem with (a) is that if your ETL work loop is slow, you run the risk of exceeding the rebalance timeout and being kicked from the group. You will try to commit, and depending on the Kafka version, the commit may even be erroneously successful (pre Kafka 2.5.0). This will lead to duplicates.
Instead, for safety, a GroupTransactSession favors (b). If a rebalance occurs at any time before ending a transaction with a commit, this will abort the transaction.
This leaves the risk that ending the transaction itself exceeds the rebalance timeout, but this is just one request with no cpu logic. With a proper rebalance timeout, this single request will not fail and the commit will succeed properly.
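A rough consume-transform-produce sketch (assuming a transactional ID set via the TransactionalID producer option, and placeholder brokers, group, and topics; per-record promises are omitted for brevity):

sess, err := kgo.NewGroupTransactSession(
    kgo.SeedBrokers("localhost:9092"),
    kgo.TransactionalID("my-transactional-id"),
    kgo.ConsumerGroup("my-group"),
    kgo.ConsumeTopics("input-topic"),
)
if err != nil {
    // handle error
}
defer sess.Close()

for {
    fetches := sess.PollFetches(ctx)
    if fetches.IsClientClosed() {
        return
    }
    if err := sess.Begin(); err != nil {
        // no transactional ID, or already in a transaction
    }
    fetches.EachRecord(func(r *kgo.Record) {
        // nil promise: a real application would capture produce errors here
        sess.Produce(ctx, &kgo.Record{Topic: "output-topic", Value: r.Value}, nil)
    })
    committed, err := sess.End(ctx, kgo.TryCommit)
    if err != nil {
        // fatal; see End's documentation
    }
    _ = committed // false means the transaction was aborted (e.g. a rebalance occurred)
}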
func (*GroupTransactSession) Begin ¶
func (s *GroupTransactSession) Begin() error
Begin begins a transaction, returning an error if the client has no transactional id or is already in a transaction.
Begin must be called before producing records in a transaction.
Note that a revoke of any partitions sets the session's revoked state, even if the session has not begun. This state is only reset on EndTransaction. Thus, it is safe to begin transactions after a poll (but still before you produce).
func (*GroupTransactSession) Client ¶ added in v0.8.0
func (s *GroupTransactSession) Client() *Client
Client returns the underlying client that this transact session wraps. This can be useful for functions that require a client, such as raw requests. The returned client should not be used to manage transactions (leave that to the GroupTransactSession).
func (*GroupTransactSession) Close ¶ added in v0.8.0
func (s *GroupTransactSession) Close()
Close is a wrapper around Client.Close, with the exact same semantics. Please refer to that function's documentation.
This function must be called to leave the group before shutting down.
func (*GroupTransactSession) End ¶
func (s *GroupTransactSession) End(ctx context.Context, commit TransactionEndTry) (bool, error)
End ends a transaction, committing if commit is true, if the group did not rebalance since the transaction began, and if committing offsets is successful. If commit is false, the group has rebalanced, or any partition in committing offsets fails, this aborts.
This function calls Flush or AbortBufferedRecords depending on the commit status. If you are flushing, it is strongly recommended to Flush yourself before calling this, so that you can then determine if you need to abort.
This returns whether the transaction committed or any error that occurred. No returned error is retriable. Either the transactional ID has entered a failed state, or the client retried so much that the retry limit was hit, and odds are you should not continue.
Note that canceling the context will likely leave the client in an undesirable state, because canceling the context cancels in flight requests and prevents new requests (multiple requests are issued at the end of a transact session). Thus, while a context is allowed, it is strongly recommended to not cancel it.
func (*GroupTransactSession) PollFetches ¶ added in v0.6.10
func (s *GroupTransactSession) PollFetches(ctx context.Context) Fetches
PollFetches is a wrapper around Client.PollFetches, with the exact same semantics. Please refer to that function's documentation.
It is invalid to call PollFetches concurrently with Begin or End.
func (*GroupTransactSession) PollRecords ¶ added in v0.6.10
func (s *GroupTransactSession) PollRecords(ctx context.Context, maxPollRecords int) Fetches
PollRecords is a wrapper around Client.PollRecords, with the exact same semantics. Please refer to that function's documentation.
It is invalid to call PollRecords concurrently with Begin or End.
func (*GroupTransactSession) Produce ¶ added in v0.6.10
func (s *GroupTransactSession) Produce(ctx context.Context, r *Record, promise func(*Record, error))
Produce is a wrapper around Client.Produce, with the exact same semantics. Please refer to that function's documentation.
It is invalid to call Produce concurrently with Begin or End.
func (*GroupTransactSession) ProduceSync ¶ added in v0.7.0
func (s *GroupTransactSession) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults
ProduceSync is a wrapper around Client.ProduceSync, with the exact same semantics. Please refer to that function's documentation.
It is invalid to call ProduceSync concurrently with Begin or End.
type Hook ¶
type Hook interface{}
Hook is a hook to be called when something happens in kgo.
The base Hook interface is useless, but wherever a hook can occur in kgo, the client checks if your hook implements an appropriate interface. If so, your hook is called.
This allows you to only hook in to behavior you care about, and it allows the client to add more hooks in the future.
All hook interfaces in this package have Hook in the name. Hooks must be safe for concurrent use. It is expected that hooks are fast; if a hook needs to take time, then copy what you need and ensure the hook is async.
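For example, a minimal sketch of a hook that observes broker connections, registered with WithHooks (assuming the standard log, net, and time packages; brokerConnects is a hypothetical counter from whatever metrics system you use):

type connectHook struct{}

// OnBrokerConnect implements HookBrokerConnect.
func (connectHook) OnBrokerConnect(meta kgo.BrokerMetadata, dialDur time.Duration, _ net.Conn, err error) {
    if err != nil {
        log.Printf("failed dialing %s after %v: %v", meta.Host, dialDur, err)
        return
    }
    brokerConnects.Inc() // hypothetical counter
}

cl, err := kgo.NewClient(
    kgo.SeedBrokers("localhost:9092"),
    kgo.WithHooks(connectHook{}),
)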
type HookBrokerConnect ¶ added in v0.7.0
type HookBrokerConnect interface {
    // OnBrokerConnect is passed the broker metadata, how long it took to
    // dial, and either the dial's resulting net.Conn or error.
    OnBrokerConnect(meta BrokerMetadata, dialDur time.Duration, conn net.Conn, err error)
}
HookBrokerConnect is called after a connection to a broker is opened.
type HookBrokerDisconnect ¶ added in v0.7.0
type HookBrokerDisconnect interface {
    // OnBrokerDisconnect is passed the broker metadata and the connection
    // that is closing.
    OnBrokerDisconnect(meta BrokerMetadata, conn net.Conn)
}
HookBrokerDisconnect is called when a connection to a broker is closed.
type HookBrokerE2E ¶ added in v0.7.7
type HookBrokerE2E interface {
    // OnBrokerE2E is passed the broker metadata, the key for the
    // request/response that was written/read, and the e2e info for the
    // request and response.
    OnBrokerE2E(meta BrokerMetadata, key int16, e2e BrokerE2E)
}
HookBrokerE2E is called after a write to a broker that errors, or after a read to a broker.
This differs from HookBrokerRead and HookBrokerWrite by tracking all E2E info for a write and a read, which allows for easier e2e metrics. This hook can replace both the read and write hook.
type HookBrokerRead ¶ added in v0.7.0
type HookBrokerRead interface {
    // OnBrokerRead is passed the broker metadata, the key for the response
    // that was read, the number of bytes read (may not be the whole read
    // if there was an error), how long the client waited before reading
    // the response, how long it took to read the response, and any error.
    //
    // The bytes read does not count any tls overhead.
    OnBrokerRead(meta BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error)
}
HookBrokerRead is called after a read from a broker.
Kerberos SASL does not cause read hooks, since it directly reads from the connection.
type HookBrokerThrottle ¶ added in v0.7.0
type HookBrokerThrottle interface {
    // OnBrokerThrottle is passed the broker metadata, the imposed
    // throttling interval, and whether the throttle was applied before
    // Kafka responded to the request or after.
    //
    // For Kafka < 2.0.0, the throttle is applied before issuing a response.
    // For Kafka >= 2.0.0, the throttle is applied after issuing a response.
    //
    // If throttledAfterResponse is false, then Kafka already applied the
    // throttle. If it is true, the client internally will not send another
    // request until the throttle deadline has passed.
    OnBrokerThrottle(meta BrokerMetadata, throttleInterval time.Duration, throttledAfterResponse bool)
}
HookBrokerThrottle is called after a response to a request is read from a broker, and the response identifies throttling in effect.
type HookBrokerWrite ¶ added in v0.7.0
type HookBrokerWrite interface {
    // OnBrokerWrite is passed the broker metadata, the key for the request
    // that was written, the number of bytes that were written (may not be
    // the whole request if there was an error), how long the request
    // waited before being written (including throttling waiting), how long
    // it took to write the request, and any error.
    //
    // The bytes written does not count any tls overhead.
    OnBrokerWrite(meta BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error)
}
HookBrokerWrite is called after a write to a broker.
Kerberos SASL does not cause write hooks, since it directly writes to the connection.
type HookFetchBatchRead ¶ added in v0.8.1
type HookFetchBatchRead interface {
    // OnFetchBatchRead is called per batch read from a topic partition.
    OnFetchBatchRead(meta BrokerMetadata, topic string, partition int32, metrics FetchBatchMetrics)
}
HookFetchBatchRead is called whenever a batch is read within the client.
Note that this hook is called when processing, but a batch may be internally discarded after processing in some uncommon specific circumstances.
If the client reads v0 or v1 message sets, and they are not compressed, then this hook will be called per record.
type HookFetchRecordBuffered ¶ added in v0.8.7
type HookFetchRecordBuffered interface {
    // OnFetchRecordBuffered is passed a record that is now buffered, ready
    // to be polled.
    OnFetchRecordBuffered(*Record)
}
HookFetchRecordBuffered is called when a record is internally buffered after fetching, ready to be polled.
This hook can be used to write gauge metrics regarding the number of records or bytes buffered, or to write interceptors that modify a record before being returned from polling. If you just want a metric for the number of records buffered, use the client's BufferedFetchRecords method, as it is faster.
Note that this hook may slow down high-volume consuming a bit.
type HookFetchRecordUnbuffered ¶ added in v0.8.7
type HookFetchRecordUnbuffered interface {
    // OnFetchRecordUnbuffered is passed a record that is being
    // "unbuffered" within the client, and whether the record is being
    // returned from polling.
    OnFetchRecordUnbuffered(r *Record, polled bool)
}
HookFetchRecordUnbuffered is called when a fetched record is unbuffered.
A record can be internally discarded in some scenarios without ever being polled, such as when the internal assignment changes.
As an example, if using HookFetchRecordBuffered for a gauge of how many record bytes are buffered ready to be polled, this hook can be used to decrement the gauge.
Note that this hook may slow down high-volume consuming a bit.
type HookGroupManageError ¶ added in v0.7.0
type HookGroupManageError interface {
    // OnGroupManageError is passed the error that killed a group session.
    // This can be used to detect potentially fatal errors and act on them
    // at runtime to recover (such as group auth errors, or group max size
    // reached).
    OnGroupManageError(error)
}
HookGroupManageError is called after every error that causes the client, operating as a group member, to break out of the group managing loop and backoff temporarily.
Specifically, any error that would result in OnLost being called will result in this hook being called.
type HookNewClient ¶ added in v0.8.7
type HookNewClient interface {
    // OnNewClient is passed the newly initialized client, before any
    // client goroutines are started.
    OnNewClient(*Client)
}
HookNewClient is called in NewClient after a client is initialized. This hook can be used to perform final setup work in your hooks.
type HookProduceBatchWritten ¶ added in v0.8.1
type HookProduceBatchWritten interface {
    // OnProduceBatchWritten is called per successful batch written to a
    // topic partition.
    OnProduceBatchWritten(meta BrokerMetadata, topic string, partition int32, metrics ProduceBatchMetrics)
}
HookProduceBatchWritten is called whenever a batch is known to be successfully produced.
type HookProduceRecordBuffered ¶ added in v0.8.7
type HookProduceRecordBuffered interface {
    // OnProduceRecordBuffered is passed a record that is buffered.
    //
    // This hook is called immediately after Produce is called, after the
    // function potentially sets the default topic.
    OnProduceRecordBuffered(*Record)
}
HookProduceRecordBuffered is called when a record is buffered internally in the client from a call to Produce.
This hook can be used to write metrics that gather the number of records or bytes buffered, or the hook can be used to write interceptors that modify a record's key / value / headers before being produced. If you just want a metric for the number of records buffered, use the client's BufferedProduceRecords method, as it is faster.
Note that this hook may slow down high-volume producing a bit.
type HookProduceRecordUnbuffered ¶ added in v0.8.7
type HookProduceRecordUnbuffered interface {
    // OnProduceRecordUnbuffered is passed a record that is just about to
    // have its produce promise called, as well as the error that the
    // promise will be called with.
    OnProduceRecordUnbuffered(*Record, error)
}
HookProduceRecordUnbuffered is called just before a record's promise is finished; this is effectively a mirror of a record promise.
As an example, if using HookProduceRecordBuffered for a gauge of how many record bytes are buffered, this hook can be used to decrement the gauge.
Note that this hook may slow down high-volume producing a bit.
type IntoSyncAssignment ¶ added in v0.7.0
type IntoSyncAssignment interface {
IntoSyncAssignment() []kmsg.SyncGroupRequestGroupAssignment
}
IntoSyncAssignment takes a balance plan and returns a list of assignments to use in a kmsg.SyncGroupRequest.
It is recommended to ensure the output is deterministic and ordered by member / topic / partitions.
type IsolationLevel ¶
type IsolationLevel struct {
// contains filtered or unexported fields
}
IsolationLevel controls whether uncommitted or only committed records are returned from fetch requests.
func ReadCommitted ¶
func ReadCommitted() IsolationLevel
ReadCommitted is an isolation level to only fetch committed records.
func ReadUncommitted ¶
func ReadUncommitted() IsolationLevel
ReadUncommitted (the default) is an isolation level that returns the latest produced records, be they committed or not.
type LogLevel ¶
type LogLevel int8
LogLevel designates which level the logger should log at.
const (
    // LogLevelNone disables logging.
    LogLevelNone LogLevel = iota

    // LogLevelError logs all errors. Generally, these should not happen.
    LogLevelError

    // LogLevelWarn logs all warnings, such as request failures.
    LogLevelWarn

    // LogLevelInfo logs informational messages, such as requests. This is
    // usually the default log level.
    LogLevelInfo

    // LogLevelDebug logs verbose information, and is usually not used in
    // production.
    LogLevelDebug
)
type Logger ¶
type Logger interface {
    // Level returns the log level to log at.
    //
    // Implementations can change their log level on the fly, but this
    // function must be safe to call concurrently.
    Level() LogLevel

    // Log logs a message with key, value pair arguments for the given log
    // level. Keys are always strings, while values can be any type.
    //
    // This must be safe to call concurrently.
    Log(level LogLevel, msg string, keyvals ...interface{})
}
Logger is used to log informational messages.
func BasicLogger ¶
BasicLogger returns a logger that will print to dst in the following format:
prefix [LEVEL] message; key: val, key: val
prefixFn is optional; if non-nil, it is called for a per-message prefix.
Writes to dst are not checked for errors.
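For example (assuming os.Stderr as the destination and no per-message prefix):

cl, err := kgo.NewClient(
    kgo.SeedBrokers("localhost:9092"),
    kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelInfo, nil)),
)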
type Offset ¶
type Offset struct {
// contains filtered or unexported fields
}
Offset is a message offset in a partition.
func NewOffset ¶
func NewOffset() Offset
NewOffset creates and returns an offset to use in ConsumePartitions or ConsumeResetOffset.
The default offset begins at the end.
func (Offset) At ¶
At returns a copy of the calling offset, changing the returned offset to begin at exactly the requested offset.
There are two potential special offsets to use: -2 allows for consuming at the start, and -1 allows for consuming at the end. These two offsets are equivalent to calling AtStart or AtEnd.
If the offset is less than -2, the client bounds it to -2 to consume at the start.
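For example (a sketch; ConsumeResetOffset is a consumer option documented elsewhere in this package), to begin consuming at exactly offset 100:

kgo.ConsumeResetOffset(kgo.NewOffset().At(100))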
func (Offset) AtEnd ¶
AtEnd returns a copy of the calling offset, changing the returned offset to begin at the end of a partition.
func (Offset) AtStart ¶
AtStart returns a copy of the calling offset, changing the returned offset to begin at the beginning of a partition.
func (Offset) MarshalJSON ¶ added in v0.10.3
type Opt ¶
type Opt interface {
// contains filtered or unexported methods
}
Opt is an option to configure a client.
func AllowAutoTopicCreation ¶ added in v0.8.0
func AllowAutoTopicCreation() Opt
AllowAutoTopicCreation enables topics to be auto created if they do not exist when fetching their metadata.
func BrokerMaxReadBytes ¶ added in v0.6.2
BrokerMaxReadBytes sets the maximum response size that can be read from Kafka, overriding the default 100MiB.
This is a safety measure to avoid OOMing on invalid responses. This is roughly double FetchMaxBytes; if bumping that, consider bumping this as well. No other response should run the risk of hitting this limit.
func BrokerMaxWriteBytes ¶
BrokerMaxWriteBytes upper bounds the number of bytes written to a broker connection in a single write, overriding the default 100MiB.
This number corresponds to a broker's socket.request.max.bytes, which defaults to 100MiB.
The only Kafka requests that could come reasonably close to hitting this limit are produce requests, and thus this limit is only enforced for produce requests.
func ClientID ¶
ClientID uses id for all requests sent to Kafka brokers, overriding the default "kgo".
func ConnIdleTimeout ¶ added in v0.6.11
ConnIdleTimeout is a rough amount of time to allow connections to idle before they are closed, overriding the default 20s.
In the worst case, a connection can be allowed to idle for up to 2x this time, while the average is expected to be 1.5x (essentially, a uniform distribution from this interval to 2x the interval).
It is possible that a connection can be reaped just as it is about to be written to, but the client internally retries in these cases.
Connections are not reaped if they are actively being written to or read from; thus, a request can take a really long time itself and not be reaped (however, this may lead to the RequestTimeoutOverhead).
func DialTLSConfig ¶ added in v0.10.0
DialTLSConfig opts in to dialing brokers with the given TLS config with a 10s dial timeout. This is a shortcut for manually specifying a tls dialer using the Dialer option.
Every dial, the input config is cloned. If the config's ServerName is not specified, this function uses net.SplitHostPort to extract the host from the broker being dialed and sets the ServerName. In short, it is not necessary to set the ServerName.
func Dialer ¶
Dialer uses fn to dial addresses, overriding the default dialer that uses a 10s dial timeout and no TLS.
The context passed to the dial function is the context used in the request that caused the dial. If the request is a client-internal request, the context is the context on the client itself (which is canceled when the client is closed).
This function has the same signature as net.Dialer's DialContext and tls.Dialer's DialContext, meaning you can use this function like so:
kgo.Dialer((&net.Dialer{Timeout: 10*time.Second}).DialContext)
or
kgo.Dialer((&tls.Dialer{...}).DialContext)
func MaxVersions ¶
MaxVersions sets the maximum Kafka version to try, overriding the internal unbounded (latest stable) versions.
Note that specific max version pinning is required if trying to interact with versions pre 0.10.0. Otherwise, unless using more complicated requests that this client itself does not natively use, it is generally safe to opt for the latest version. If using the kmsg package directly to issue requests, it is recommended to pin versions so that new fields on requests do not get invalid default zero values before you update your usage.
func MetadataMaxAge ¶
MetadataMaxAge sets the maximum age for the client's cached metadata, overriding the default 5m, to allow detection of new topics, partitions, etc.
This corresponds to Kafka's metadata.max.age.ms.
func MetadataMinAge ¶
MetadataMinAge sets the minimum time between metadata queries, overriding the default 10s. You may want to raise or lower this to reduce the number of metadata queries the client will make. Notably, if metadata detects an error in any topic or partition, it triggers itself to update as soon as allowed. Additionally, any connection failures causing backoff while producing or consuming trigger metadata updates, because the client must assume that maybe the connection died due to a broker dying.
func MinVersions ¶ added in v0.6.2
MinVersions sets the minimum Kafka version a request can be downgraded to, overriding the default of the lowest version.
This option is useful if you are issuing requests that you absolutely do not want to be downgraded; that is, if you are relying on features in newer requests, and you are not sure if your brokers can handle those features. By setting a min version, if the client detects it needs to downgrade past the version, it will instead avoid issuing the request.
Unlike MaxVersions, if a request is issued that is unknown to the min versions, the request is allowed. It is assumed that there is no lower bound for that request.
func RequestRetries ¶
RequestRetries sets the number of tries that retriable requests are allowed, overriding the default of 20.
This option does not apply to produce requests; to limit produce request retries / record retries, see RecordRetries.
func RequestTimeoutOverhead ¶ added in v0.11.0
RequestTimeoutOverhead uses the given time as overhead while deadlining requests, overriding the default overhead of 20s.
For most requests, the overhead will simply be this timeout. However, for any request with a TimeoutMillis field, the overhead is added on top of the request's TimeoutMillis. This ensures that we give Kafka enough time to actually process the request given the timeout, while still having a deadline on the connection as a whole to ensure it does not hang.
For writes, the timeout is always the overhead. We buffer writes in our client before one quick flush, so we always expect the write to be fast.
Note that hitting the timeout kills a connection, which will fail any other active writes or reads on the connection.
This option is roughly equivalent to request.timeout.ms, but grants additional time to requests that have timeout fields.
func RetryBackoffFn ¶ added in v0.10.0
RetryBackoffFn sets the backoff strategy for how long to backoff for a given amount of retries, overriding the default jittery exponential backoff that ranges from 100ms min to 1s max.
This (roughly) corresponds to Kafka's retry.backoff.ms setting and retry.backoff.max.ms (which is being introduced with KIP-580).
func RetryTimeout ¶
RetryTimeout sets the upper limit on how long we allow requests to retry, overriding the default of 5m for EndTxn requests, 1m for all others.
This timeout applies to any request issued through a client's Request function. It does not apply to fetches nor produces.
A value of zero indicates no request timeout.
The timeout is evaluated after a request is issued. If a retry backoff places the next request past the retry timeout deadline, the request will still be tried once more once the backoff expires.
func RetryTimeoutFn ¶ added in v0.10.0
RetryTimeoutFn sets the per-request upper limit on how long we allow requests to retry, overriding the default of 5m for EndTxn requests, 1m for all others.
This timeout applies to any request issued through a client's Request function. It does not apply to fetches nor produces.
The function is called with the request key that is being retried. While it is not expected that the request key will be used, including it gives users the opportunity to have different retry timeouts for different keys.
If the function returns zero, there is no retry timeout.
The timeout is evaluated after a request is issued. If a retry backoff places the next request past the retry timeout deadline, the request will still be tried once more once the backoff expires.
func SASL ¶
SASL appends sasl authentication options to use for all connections.
SASL is tried in order; if the broker supports the first mechanism, all connections will use that mechanism. If the first mechanism fails, the client will pick the first supported mechanism. If the broker does not support any client mechanisms, connections will fail.
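For example, a sketch using the plain SASL mechanism from this project's sasl/plain subpackage (swap in whichever mechanism your brokers require; the broker address and credentials are placeholders):

cl, err := kgo.NewClient(
    kgo.SeedBrokers("broker:9092"),
    kgo.SASL(plain.Auth{
        User: "user",
        Pass: "pass",
    }.AsMechanism()),
)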
func SeedBrokers ¶
SeedBrokers sets the seed brokers for the client to use, overriding the default 127.0.0.1:9092.
Any seeds that are missing a port use the default Kafka port 9092.
func SoftwareNameAndVersion ¶
SoftwareNameAndVersion sets the client software name and version that will be sent to Kafka as part of the ApiVersions request as of Kafka 2.4.0, overriding the default "kgo" and internal version number.
Kafka exposes this through metrics to help operators understand the impact of clients.
It is generally not recommended to set this. As well, if you do, the name and version must match the following regular expression:
[a-zA-Z0-9](?:[a-zA-Z0-9\.-]*[a-zA-Z0-9])?
Note this means neither the name nor version can be empty.
func WithHooks ¶
WithHooks sets hooks to call whenever relevant.
Hooks can be used to layer in metrics (such as Prometheus hooks) or anything else. The client will call all hooks in order. See the Hooks interface for more information, as well as any interface that contains "Hook" in the name to know the available hooks. A single hook can implement zero or all hook interfaces, and only the hooks that it implements will be called.
func WithLogger ¶
WithLogger sets the client to use the given logger, overriding the default to not use a logger.
It is invalid to use a nil logger; doing so will cause panics.
type Partitioner ¶
type Partitioner interface {
    // ForTopic returns a partitioner for an individual topic. It is
    // guaranteed that only one record will use an individual topic's
    // topicPartitioner at a time, meaning partitioning within a topic does
    // not require locks.
    ForTopic(string) TopicPartitioner
}
Partitioner creates topic partitioners to determine which partition messages should be sent to.
Note that a record struct is unmodified (minus a potential default topic) from producing through partitioning, so you can set fields in the record struct before producing to aid in partitioning with a custom partitioner.
func BasicConsistentPartitioner ¶ added in v0.8.1
func BasicConsistentPartitioner(partition func(string) func(r *Record, n int) int) Partitioner
BasicConsistentPartitioner wraps a single function to provide a Partitioner and TopicPartitioner (that function is essentially a combination of Partitioner.ForTopic and TopicPartitioner.Partition).
As a minimal example, if you do not care about the topic and you set the partition before producing:
kgo.BasicConsistentPartitioner(func(topic string) func(*Record, int) int {
    return func(r *Record, n int) int {
        return int(r.Partition)
    }
})
func LeastBackupPartitioner ¶ added in v0.10.1
func LeastBackupPartitioner() Partitioner
LeastBackupPartitioner prioritizes partitioning by three factors, in order:
- pin to the current pick until there is a new batch
- on new batch, choose the least backed up partition (the partition with the fewest amount of buffered records)
- if multiple partitions are equally least-backed-up, choose one at random
This algorithm prioritizes least-backed-up throughput, which may result in unequal partitioning. It is likely that this algorithm will talk most to the broker that it has the best connection to.
This algorithm is resilient to brokers going down: if a few brokers die, it is possible your throughput will be so high that the maximum buffered records will be reached in the now-offline partitions before metadata responds that the broker is offline. With the standard partitioning algorithms, the only recovery is if the partition is remapped or if the broker comes back online. With the least backup partitioner, downed partitions will see slight backup, but then the other partitions that are still accepting writes will get all of the writes and your client will not be blocked.
Under ideal scenarios (no broker / connection issues), StickyPartitioner is equivalent to LeastBackupPartitioner. This partitioner is only recommended if you are a producer consistently dealing with flaky connections or problematic brokers and do not mind uneven load on your brokers.
func ManualPartitioner ¶ added in v0.8.1
func ManualPartitioner() Partitioner
ManualPartitioner is a partitioner that simply returns the Partition field that is already set on any record.
Any record with an invalid partition will be immediately failed. This partitioner is simply the partitioner that is demonstrated in the BasicConsistentPartitioner documentation.
func RoundRobinPartitioner ¶ added in v0.10.2
func RoundRobinPartitioner() Partitioner
RoundRobinPartitioner is a partitioner that round-robins through all available partitions. This algorithm has lower throughput and causes higher CPU load on brokers, but can be useful if you want to ensure an even distribution of records to partitions.
func StickyKeyPartitioner ¶
func StickyKeyPartitioner(overrideHasher PartitionerHasher) Partitioner
StickyKeyPartitioner mirrors the default Java partitioner from Kafka's 2.4.0 release (see KAFKA-8601).
This is the same "hash the key consistently, if no key, choose random partition" strategy that the Java partitioner has always used, but rather than always choosing a random partition, the partitioner pins a partition to produce to until that partition rolls over to a new batch. Only when rolling to new batches does this partitioner switch partitions.
The benefit with this pinning is less CPU utilization on Kafka brokers. Over time, the random distribution is the same, but the brokers are handling on average larger batches.
overrideHasher is optional; if nil, this will return a partitioner that partitions exactly how Kafka does. Specifically, the partitioner will use murmur2 to hash keys, will mask out the 32nd bit, and then will mod by the number of potential partitions.
func StickyPartitioner ¶
func StickyPartitioner() Partitioner
StickyPartitioner is the same as StickyKeyPartitioner, but with no logic to consistently hash keys. That is, this only partitions according to the sticky partition strategy.
type PartitionerHasher ¶
PartitionerHasher returns a partition to use given the input data and number of partitions.
func KafkaHasher ¶
func KafkaHasher(hashFn func([]byte) uint32) PartitionerHasher
KafkaHasher returns a PartitionerHasher using hashFn that mirrors how Kafka partitions after hashing data.
func SaramaHasher ¶
func SaramaHasher(hashFn func([]byte) uint32) PartitionerHasher
SaramaHasher returns a PartitionerHasher using hashFn that mirrors how Sarama partitions after hashing data.
type ProduceBatchMetrics ¶ added in v0.8.1
type ProduceBatchMetrics struct {
    // NumRecords is the number of records that were produced in this
    // batch.
    NumRecords int

    // UncompressedBytes is the number of bytes the records serialized as
    // before compression.
    //
    // For record batches (Kafka v0.11.0+), this is the size of the records
    // in a batch, and does not include record batch overhead.
    //
    // For message sets, this size includes message set overhead.
    UncompressedBytes int

    // CompressedBytes is the number of bytes actually written for this
    // batch, after compression. If compression is not used, this will be
    // equal to UncompressedBytes.
    //
    // For record batches, this is the size of the compressed records, and
    // does not include record batch overhead.
    //
    // For message sets, this is the size of the compressed message set.
    CompressedBytes int

    // CompressionType signifies which algorithm the batch was compressed
    // with.
    //
    // 0 is no compression, 1 is gzip, 2 is snappy, 3 is lz4, and 4 is
    // zstd.
    CompressionType uint8
}
ProduceBatchMetrics tracks information about successful produces to partitions.
type ProduceResult ¶ added in v0.7.0
type ProduceResult struct {
    // Record is the produced record. It is always non-nil.
    //
    // If this record was produced successfully, its attrs / offset / id /
    // epoch / etc. fields are filled in on return if possible (i.e. when
    // producing with acks required).
    Record *Record

    // Err is a potential produce error. If this is non-nil, the record was
    // not produced successfully.
    Err error
}
ProduceResult is the result of producing a record in a synchronous manner.
type ProduceResults ¶ added in v0.7.0
type ProduceResults []ProduceResult
ProduceResults is a collection of produce results.
func (ProduceResults) First ¶ added in v0.7.0
func (rs ProduceResults) First() (*Record, error)
First returns the first record and error in the produce results.
This function is useful if you only passed one record to ProduceSync.
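For example, a sketch of producing a single record synchronously and inspecting the result (assuming an existing client cl and context ctx; the topic is a placeholder):

rec := &kgo.Record{Topic: "my-topic", Value: []byte("hello")}
produced, err := cl.ProduceSync(ctx, rec).First()
if err != nil {
    // the record was not produced
}
_ = produced // on success, the record's offset / partition fields are filled in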
func (ProduceResults) FirstErr ¶ added in v0.7.0
func (rs ProduceResults) FirstErr() error
FirstErr returns the first erroring result, if any.
type ProducerOpt ¶
type ProducerOpt interface {
    Opt
    // contains filtered or unexported methods
}
ProducerOpt is a producer specific option to configure a client. This is simply a namespaced Opt.
func DefaultProduceTopic ¶ added in v0.8.0
func DefaultProduceTopic(t string) ProducerOpt
DefaultProduceTopic sets the default topic to produce to if the topic field is empty in a Record.
If this option is not used, if a record has an empty topic, the record cannot be produced and will be failed immediately.
func DisableIdempotentWrite ¶ added in v0.6.10
func DisableIdempotentWrite() ProducerOpt
DisableIdempotentWrite disables idempotent produce requests, opting out of Kafka server-side deduplication in the face of reissued requests due to transient network problems.
Idempotent production is strictly a win, but does require the IDEMPOTENT_WRITE permission on CLUSTER (pre Kafka 3.0), and not all clients can have that permission.
This option is incompatible with specifying a transactional id.
func ManualFlushing ¶
func ManualFlushing() ProducerOpt
ManualFlushing disables auto-flushing when producing. While you can still set lingering, it would be useless to do so.
With manual flushing, producing while MaxBufferedRecords have already been produced and not flushed will return ErrMaxBuffered.
func MaxBufferedRecords ¶
func MaxBufferedRecords(n int) ProducerOpt
MaxBufferedRecords sets the max amount of records the client will buffer, blocking produces until records are finished if this limit is reached. This overrides the unbounded default.
func ProduceRequestTimeout ¶
func ProduceRequestTimeout(limit time.Duration) ProducerOpt
ProduceRequestTimeout sets how long Kafka brokers are allowed to take to respond to produce requests, overriding the default 30s. If a broker exceeds this duration, it replies with a request timeout error.
This corresponds to Kafka's request.timeout.ms setting, but only applies to produce requests.
func ProducerBatchCompression ¶ added in v0.11.0
func ProducerBatchCompression(preference ...CompressionCodec) ProducerOpt
ProducerBatchCompression sets the compression codec to use for producing records.
Compression is chosen in the order preferred based on broker support. For example, zstd compression was introduced in Kafka 2.1.0, so the preference can be first zstd, fallback snappy, fallback none.
The default preference is [snappy, none], which should be fine for all old consumers since snappy compression has existed since Kafka 0.8.0. To use zstd, your brokers must be at least 2.1.0 and all consumers must be upgraded to support decoding zstd records.
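For example, preferring zstd with fallbacks for older brokers or consumers (a sketch using the CompressionCodec constructors):

cl, err := kgo.NewClient(
    kgo.SeedBrokers("localhost:9092"),
    // Prefer zstd (requires Kafka 2.1.0+ and zstd-aware consumers),
    // fall back to snappy, and finally to no compression.
    kgo.ProducerBatchCompression(
        kgo.ZstdCompression(),
        kgo.SnappyCompression(),
        kgo.NoCompression(),
    ),
)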
func ProducerBatchMaxBytes ¶ added in v0.11.0
func ProducerBatchMaxBytes(v int32) ProducerOpt
ProducerBatchMaxBytes upper bounds the size of a record batch, overriding the default 1MB.
This corresponds to Kafka's max.message.bytes, which defaults to 1,000,012 bytes (just over 1MB).
Record batches are independent of a ProduceRequest: a record batch is specific to a topic and partition, whereas the produce request can contain many record batches for many topics.
If a single record encodes larger than this number (before compression), it will not be written and its callback will receive the appropriate error.
Note that this is the maximum size of a record batch before compression. If a batch compresses poorly and actually grows the batch, the uncompressed form will be used.
func ProducerLinger ¶ added in v0.11.0
func ProducerLinger(linger time.Duration) ProducerOpt
ProducerLinger sets how long individual topic partitions will linger waiting for more records before triggering a request to be built.
Note that this option should only be used in low volume producers. The only benefit of lingering is to potentially build a larger batch to reduce cpu usage on the brokers if you have many producers all producing small amounts.
If a produce request is triggered by any topic partition, all partitions with a possible batch to be sent are used and all lingers are reset.
As mentioned, the linger is specific to topic partition. A high volume producer will likely be producing to many partitions; it is both unnecessary to linger in this case and inefficient because the client will have many timers running (and stopping and restarting) unnecessarily.
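A short sketch for a low volume producer that trades a little latency for larger batches:

cl, err := kgo.NewClient(
    kgo.SeedBrokers("localhost:9092"),
    // Wait up to 50ms per partition for more records before building a
    // produce request; only worthwhile when producing small volumes.
    kgo.ProducerLinger(50*time.Millisecond),
)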
func ProducerOnDataLossDetected ¶ added in v0.11.0
func ProducerOnDataLossDetected(fn func(string, int32)) ProducerOpt
ProducerOnDataLossDetected sets a function to call if data loss is detected when producing records while the client is configured to continue on data loss. Thus, this option is mutually exclusive with StopProducerOnDataLossDetected.
The passed function will be called with the topic and partition that data loss was detected on.
func RecordDeliveryTimeout ¶ added in v0.11.0
func RecordDeliveryTimeout(timeout time.Duration) ProducerOpt
RecordDeliveryTimeout sets a rough time of how long a record can sit around in a batch before timing out, overriding the unlimited default.
If idempotency is enabled (as it is by default), this option is only enforced if it is safe to do so without creating invalid sequence numbers. It is safe to enforce if a record was never issued in a request to Kafka, or if it was requested and received a response.
All records in a batch inherit the timeout of the first record in that batch. That is, once the first record's timeout expires, all records in the batch are expired. This is generally a non-issue unless using this option with lingering; in that case, simply add the linger to the record timeout to avoid problems.
If a record times out, all records buffered in the same partition are failed as well. This ensures gapless ordering: the client will not fail one record only to produce a later one successfully. This also allows for easier sequence number ordering internally.
The timeout is only evaluated before writing a request or after receiving a produce response. Thus, a sink backoff may delay a record's timeout slightly.
This option is roughly equivalent to delivery.timeout.ms.
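A sketch combining a delivery timeout with lingering and a bounded retry count, adding the linger to the timeout as recommended above:

linger := 100 * time.Millisecond
cl, err := kgo.NewClient(
    kgo.SeedBrokers("localhost:9092"),
    kgo.ProducerLinger(linger),
    // Give each record ten seconds plus the linger to be delivered before
    // failing it (and everything buffered after it in the same partition).
    kgo.RecordDeliveryTimeout(10*time.Second+linger),
    // Also bound produce retries, overriding the unlimited default.
    kgo.RecordRetries(10),
)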
func RecordPartitioner ¶
func RecordPartitioner(partitioner Partitioner) ProducerOpt
RecordPartitioner uses the given partitioner to partition records, overriding the default StickyKeyPartitioner.
func RecordRetries ¶ added in v0.10.0
func RecordRetries(n int) ProducerOpt
RecordRetries sets the number of tries for producing records, overriding the unlimited default.
If idempotency is enabled (as it is by default), this option is only enforced if it is safe to do so without creating invalid sequence numbers. It is safe to enforce if a record was never issued in a request to Kafka, or if it was requested and received a response.
If a record fails due to retries, all records buffered in the same partition are failed as well. This ensures gapless ordering: the client will not fail one record only to produce a later one successfully. This also allows for easier sequence number ordering internally.
If a topic repeatedly fails to load with UNKNOWN_TOPIC_OR_PARTITION, it has a different, internal retry limit. All records for a topic that repeatedly cannot be loaded are failed when the internal limit is hit.
This option is different from RequestRetries to allow finer grained control of when to fail when producing records.
func RequiredAcks ¶
func RequiredAcks(acks Acks) ProducerOpt
RequiredAcks sets the required acks for produced records, overriding the default RequireAllISRAcks.
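For example, a latency-sensitive producer that only waits for the partition leader (a sketch assuming the LeaderAck constructor on the Acks type; note that idempotent production requires all-ISR acks, so lowering acks also means opting out of idempotency):

cl, err := kgo.NewClient(
    kgo.SeedBrokers("localhost:9092"),
    kgo.RequiredAcks(kgo.LeaderAck()), // only the leader must ack
    kgo.DisableIdempotentWrite(),      // idempotency needs all-ISR acks
)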
func StopProducerOnDataLossDetected ¶ added in v0.11.0
func StopProducerOnDataLossDetected() ProducerOpt
StopProducerOnDataLossDetected sets the client to stop producing if data loss is detected, overriding the default false.
Note that if using this option, it is strongly recommended to not have a retry limit. Doing so may lead to errors where the client fails a batch on a recoverable error, which internally bumps the idempotent sequence number used for producing, which may then later cause an inadvertent out of order sequence number and false "data loss" detection.
func TransactionTimeout ¶
func TransactionTimeout(timeout time.Duration) ProducerOpt
TransactionTimeout sets the allowed time for a transaction, overriding the default 40s. It is a good idea to keep this less than a group's session timeout, so that a group member will always be alive for the duration of a transaction even if connectivity dies. This helps prevent a transaction finishing after a rebalance, which is problematic pre-Kafka 2.5.0. If you are on Kafka 2.5.0+, then you can use the RequireStableFetchOffsets option when assigning the group, and you can set this to whatever you would like.
Transaction timeouts begin when the first record is produced within a transaction, not when a transaction begins.
func TransactionalID ¶
func TransactionalID(id string) ProducerOpt
TransactionalID sets a transactional ID for the client, ensuring that records are produced transactionally under this ID (exactly once semantics).
For Kafka-to-Kafka transactions, the transactional ID is only one half of the equation. You must also assign a group to consume from.
To produce transactionally, you first BeginTransaction, then produce records consumed from a group, then you EndTransaction. All records produced outside of a transaction will fail immediately with an error.
After producing a batch, you must commit what you consumed. Auto committing offsets is disabled during transactional consuming / producing.
Note that unless using Kafka 2.5.0, a consumer group rebalance may be problematic. Production should finish and be committed before the client rejoins the group. It may be safer to use an eager group balancer and just abort the transaction. Alternatively, any time a partition is revoked, you could abort the transaction and reset offsets being consumed.
If the client detects an unrecoverable error, all records produced thereafter will fail.
Lastly, the default read level is READ_UNCOMMITTED. Be sure to use the ReadIsolationLevel option if you want to only read committed.
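A heavily abridged sketch of that flow; the group consuming and offset committing steps are elided, and the transactional ID, topics, and broker are placeholders:

cl, err := kgo.NewClient(
    kgo.SeedBrokers("localhost:9092"),
    kgo.TransactionalID("my-txn-id"),
    kgo.DefaultProduceTopic("output"),
    // ... group consuming options for the input topics ...
)
if err != nil {
    // handle error
}
defer cl.Close()

if err := cl.BeginTransaction(); err != nil {
    // handle error
}

// ... poll records from the group and transform them ...
results := cl.ProduceSync(context.Background(), kgo.StringRecord("transformed"))

// ... commit the consumed offsets as part of the transaction ...

// Commit on success, abort otherwise.
end := kgo.TryCommit
if results.FirstErr() != nil {
    end = kgo.TryAbort
}
if err := cl.EndTransaction(context.Background(), end); err != nil {
    // handle error
}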
type Record ¶
type Record struct {
    // Key is an optional field that can be used for partition assignment.
    //
    // This is generally used with a hash partitioner to cause all records
    // with the same key to go to the same partition.
    Key []byte

    // Value is a blob of data to write to Kafka.
    Value []byte

    // Headers are optional key/value pairs that are passed along with
    // records.
    //
    // These are purely for producers and consumers; Kafka does not look at
    // this field and only writes it to disk.
    Headers []RecordHeader

    // Timestamp is the timestamp that will be used for this record.
    //
    // Record batches are always written with "CreateTime", meaning that
    // timestamps are generated by clients rather than brokers.
    //
    // This field is always set in Produce.
    Timestamp time.Time

    // Topic is the topic that a record is written to.
    //
    // This must be set for producing.
    Topic string

    // Partition is the partition that a record is written to.
    //
    // For producing, this is left unset. This will be set by the client as
    // appropriate. Alternatively, you can use the ManualPartitioner, which
    // makes it such that this field is always the field chosen when
    // partitioning (i.e., you partition manually ahead of time).
    Partition int32

    // Attrs specifies what attributes were on this record.
    Attrs RecordAttrs

    // ProducerEpoch is the producer epoch of this message if it was
    // produced with a producer ID. An epoch and ID of 0 means it was not.
    //
    // For producing, this is left unset. This will be set by the client
    // as appropriate.
    ProducerEpoch int16

    // ProducerID is the producer ID of this message if it was produced
    // with a producer ID. An epoch and ID of 0 means it was not.
    //
    // For producing, this is left unset. This will be set by the client
    // as appropriate.
    ProducerID int64

    // LeaderEpoch is the leader epoch of the broker at the time this
    // record was written, or -1 if on message sets.
    LeaderEpoch int32

    // Offset is the offset that a record is written as.
    //
    // For producing, this is left unset. This will be set by the client as
    // appropriate. If you are producing with no acks, this will just be
    // the offset used in the produce request and does not mirror the
    // offset actually stored within Kafka.
    Offset int64
}
Record is a record to write to Kafka.
func KeySliceRecord ¶ added in v0.7.0
func KeySliceRecord(key, value []byte) *Record
KeySliceRecord returns a Record with the Key and Value fields set to the input key and value slices. For producing, this function is useful in tandem with the client-level DefaultProduceTopic option.
func KeyStringRecord ¶ added in v0.7.0
func KeyStringRecord(key, value string) *Record
KeyStringRecord returns a Record with the Key and Value fields set to the input key and value strings. For producing, this function is useful in tandem with the client-level DefaultProduceTopic option.
This function uses the 'unsafe' package to avoid copying value into a slice.
NOTE: It is NOT SAFE to modify the record's value. This function should only be used if you only ever read record fields. This function can safely be used for producing; the client never modifies a record's key nor value fields.
func SliceRecord ¶ added in v0.7.0
func SliceRecord(value []byte) *Record
SliceRecord returns a Record with the Value field set to the input value slice. For producing, this function is useful in tandem with the client-level DefaultProduceTopic option.
func StringRecord ¶ added in v0.7.0
func StringRecord(value string) *Record
StringRecord returns a Record with the Value field set to the input value string. For producing, this function is useful in tandem with the client-level DefaultProduceTopic option.
This function uses the 'unsafe' package to avoid copying value into a slice.
NOTE: It is NOT SAFE to modify the record's value. This function should only be used if you only ever read record fields. This function can safely be used for producing; the client never modifies a record's key nor value fields.
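A short sketch of these constructors in use; cl, ctx, and payloadBytes are assumed from surrounding setup, and none of these records set Topic, so pair them with DefaultProduceTopic or set Topic yourself:

// The string variants avoid a copy, so their values must never be modified.
keyed := kgo.KeyStringRecord("user-123", `{"action":"login"}`)
plain := kgo.StringRecord("heartbeat")

// The slice variants simply reference the given slices.
raw := kgo.KeySliceRecord([]byte("user-123"), payloadBytes)

if err := cl.ProduceSync(ctx, keyed, plain, raw).FirstErr(); err != nil {
    // handle error
}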
type RecordAttrs ¶
type RecordAttrs struct {
    // contains filtered or unexported fields
}
RecordAttrs contains additional meta information about a record, such as its compression or timestamp type.
func (RecordAttrs) CompressionType ¶
func (a RecordAttrs) CompressionType() uint8
CompressionType signifies with which algorithm this record was compressed.
0 is no compression, 1 is gzip, 2 is snappy, 3 is lz4, and 4 is zstd.
func (RecordAttrs) IsControl ¶
func (a RecordAttrs) IsControl() bool
IsControl returns whether a record is a "control" record (ABORT or COMMIT). These are generally not visible unless explicitly opted into.
func (RecordAttrs) IsTransactional ¶
func (a RecordAttrs) IsTransactional() bool
IsTransactional returns whether a record is a part of a transaction.
func (RecordAttrs) TimestampType ¶
func (a RecordAttrs) TimestampType() int8
TimestampType specifies how Timestamp was determined.
The default, 0, means that the timestamp was determined in a client when the record was produced.
An alternative is 1, which is when the Timestamp is set in Kafka.
Records written before Kafka 0.10.0 did not have timestamps; for those records, this returns -1.
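A small sketch reading these attributes off a record returned from a successful synchronous produce (cl and ctx are assumed from surrounding setup):

rec, err := cl.ProduceSync(ctx, kgo.StringRecord("hello")).First()
if err != nil {
    // handle error
}

// 0 = none, 1 = gzip, 2 = snappy, 3 = lz4, 4 = zstd.
fmt.Println("compression:", rec.Attrs.CompressionType())

// 0 = client CreateTime, 1 = broker LogAppendTime, -1 = pre-0.10.0 message.
fmt.Println("timestamp type:", rec.Attrs.TimestampType())
fmt.Println("transactional:", rec.Attrs.IsTransactional())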
type RecordHeader ¶
type RecordHeader struct {
    Key   string
    Value []byte
}
RecordHeader contains extra information that can be sent with Records.
type ResponseShard ¶ added in v0.6.2
type ResponseShard struct {
    // Meta contains the broker that this request was issued to, or an
    // unknown (node ID -1) metadata if the request could not be issued.
    //
    // Requests can fail to even be issued if an appropriate broker cannot
    // be loaded or if the client cannot understand the request.
    Meta BrokerMetadata

    // Req is the request that was issued to this broker.
    Req kmsg.Request

    // Resp is the response received from the broker, if any.
    Resp kmsg.Response

    // Err, if non-nil, is the error that prevented a response from being
    // received or the request from being issued.
    Err error
}
ResponseShard ties together a request with either the response it received or an error that prevented a response from being received.
type TopicBackupIter ¶ added in v0.10.2
type TopicBackupIter interface {
    // Next returns the next partition index and the total buffered records
    // for the partition. If Rem returns 0, calling this function again
    // will panic.
    Next() (int, int64)

    // Rem returns the number of elements left to iterate through.
    Rem() int
}
TopicBackupIter is an iterator over partition indices.
type TopicBackupPartitioner ¶ added in v0.10.1
type TopicBackupPartitioner interface {
    TopicPartitioner

    // PartitionByBackup is similar to Partition, but has an additional
    // backupIter. This iterator will return the number of buffered records
    // per partition index. The iterator's Next function can only be called
    // up to n times; calling it any more will panic.
    PartitionByBackup(r *Record, n int, backupIter TopicBackupIter) int
}
TopicBackupPartitioner is an optional extension interface to TopicPartitioner that can partition by the number of records buffered.
If a partitioner implements this interface, the Partition function will never be called.
type TopicPartitioner ¶
type TopicPartitioner interface {
    // RequiresConsistency returns true if a record must hash to the same
    // partition even if a partition is down.
    // If true, a record may hash to a partition that cannot be written to
    // and will error until the partition comes back.
    RequiresConsistency(*Record) bool

    // Partition determines, among a set of n partitions, which index should
    // be chosen to use for the partition for r.
    Partition(r *Record, n int) int
}
TopicPartitioner partitions records in an individual topic.
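As an illustration only (not a partitioner shipped with this package), a TopicPartitioner that cycles keyless records across partitions and hashes keyed records might look like the following; it would be plugged in through a Partitioner passed to RecordPartitioner:

// illustrativePartitioner sends keyless records round-robin and keyed
// records by an FNV-1a hash of the key. Requires importing "hash/fnv".
type illustrativePartitioner struct {
    next int
}

// Only keyed records must map to the same partition consistently.
func (*illustrativePartitioner) RequiresConsistency(r *kgo.Record) bool {
    return len(r.Key) > 0
}

// Partition picks an index in [0, n).
func (p *illustrativePartitioner) Partition(r *kgo.Record, n int) int {
    if len(r.Key) == 0 {
        idx := p.next % n
        p.next++
        return idx
    }
    h := fnv.New32a()
    h.Write(r.Key)
    return int(h.Sum32() % uint32(n))
}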
type TopicPartitionerOnNewBatch ¶ added in v0.10.2
type TopicPartitionerOnNewBatch interface {
    // OnNewBatch is called when producing a record if that record would
    // trigger a new batch on its current partition.
    OnNewBatch()
}
TopicPartitionerOnNewBatch is an optional extension interface to TopicPartitioner whose OnNewBatch function is called before any new batch is created: if buffering a record would trigger a new batch, OnNewBatch is called first.
This interface allows for partitioner implementations that effectively pin to a partition until a new batch is created, after which the partitioner can choose which next partition to use.
type TransactionEndTry ¶
type TransactionEndTry bool
TransactionEndTry is simply a named bool.
const (
    // TryAbort attempts to end a transaction with an abort.
    TryAbort TransactionEndTry = false

    // TryCommit attempts to end a transaction with a commit.
    TryCommit TransactionEndTry = true
)
Source Files ¶
Directories ¶
Path | Synopsis
---|---
internal |
internal/sticky | Package sticky provides sticky partitioning strategy for Kafka, with a complete overhaul to be faster, more understandable, and optimal.