Documentation ¶
Overview ¶
Package producer implements an asynchronous Kafka producer.
Variables ¶
var ErrNilResponse = fmt.Errorf("nil response from broker")
var ErrNoProducerForPartition = errors.New("no producer for partition")
ErrNoProducerForPartition is set for batch.ProducerError when the producer is not configured to produce to the partition specified in batch.Partition. This does not necessarily mean that the partition does not exist, just that the producer was not configured to write to it.
Types ¶
type Async ¶
type Async struct {
	// Kafka bootstrap either host:port or SRV
	Bootstrap string
	TLS       *tls.Config
	Topic     string
	// Produce to these partitions.
	Partitions []int
	// Spin up this many workers. Each worker is synchronous. Each worker
	// processes one batch at a time trying to send it to a random partition.
	// On error the worker retries (up to NumAttempts attempts in total), each
	// time trying to send the batch to a different partition. Details are
	// returned in Exchange structs. After each error the underlying
	// connection to the kafka topic partition leader is closed, and reopened
	// on the next call to that leader. Because workers are synchronous
	// NumWorkers determines the maximum number of "in flight" batches. It
	// makes no sense to have more workers than partitions. Setting
	// NumWorkers=1 results in Producer being synchronous. Must be >0.
	NumWorkers int
	// 1 means 1 initial attempt and no retries. 2 means 1 initial attempt and
	// 1 more attempt on error. Must be >0.
	NumAttempts int
	// Sleep this long between repeated produce calls for the same batch. This
	// is a mixed thing: if errors to produce are because partition leadership
	// has moved, then it would make sense to make next call immediately
	// (because the leader would be found etc). But if there is a problem with
	// leadership / some other kind of slow down, then waiting for a bit is
	// good: keeps the producer from "needlessly" dropping the batch. This
	// logic will become even more complicated if data is partitioned (right
	// now delivery is to random partition). Anyway. Feel free to set to 0.
	SleepBetweenAttempts time.Duration
	// StrictPartitioning, when true, attempts to write each batch into the
	// partition which was set for it by the batch builder. This means that
	// if the partition is not available for some reason, the producer will
	// keep trying (up to NumAttempts) and then possibly lose data. If
	// StrictPartitioning is false, then after the first error the
	// partition for the batch will be set to -1 (meaning "first available"
	// round robin). If the batch builder does not partition (all batches
	// have partition set to -1) then this setting does nothing.
	StrictPartitioning bool
	// ConnMaxIdle is passed down to libkafka client.PartitionClient. See
	// documentation there. Should be less than connections.max.idle.ms
	// specified in broker config.
	ConnMaxIdle time.Duration
	// Acks required. 0 no acks, 1 leader only, -1 all ISRs (as specified
	// by min.insync.replicas).
	Acks int
	// Timeout to set for kafka produce api calls. This determines how long
	// the broker waits to get all required acks from replicas.
	Timeout time.Duration
	// contains filtered or unexported fields
}
Async producer sends record batches to Kafka. Make sure to set public field values before calling Start. Do not change them after calling Start. Safe for concurrent use.
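Because the fields must be final before Start is called, it can help to check them up front. The sketch below validates the constraints stated in the field comments (NumWorkers and NumAttempts must be >0, and more workers than partitions makes no sense). `asyncConfig` and `validate` are hypothetical names introduced for illustration; whether Start performs similar checks itself is not stated here, and treating "more workers than partitions" as an error is a choice made for this example.

```go
package main

import (
	"errors"
	"fmt"
)

// asyncConfig is a hypothetical mirror of a few public Async fields.
type asyncConfig struct {
	Bootstrap   string
	Topic       string
	Partitions  []int
	NumWorkers  int
	NumAttempts int
}

// validate checks the constraints described in the Async field comments.
func validate(c asyncConfig) error {
	if c.NumWorkers <= 0 {
		return errors.New("NumWorkers must be >0")
	}
	if c.NumAttempts <= 0 {
		return errors.New("NumAttempts must be >0")
	}
	// Each worker handles one batch at a time, so workers beyond the
	// number of partitions add nothing (flagged as an error in this sketch).
	if c.NumWorkers > len(c.Partitions) {
		return fmt.Errorf("NumWorkers (%d) exceeds number of partitions (%d)",
			c.NumWorkers, len(c.Partitions))
	}
	return nil
}

func main() {
	good := asyncConfig{
		Bootstrap:   "localhost:9092", // placeholder address
		Topic:       "events",         // placeholder topic
		Partitions:  []int{0, 1, 2},
		NumWorkers:  3,
		NumAttempts: 2, // 1 initial attempt plus 1 retry
	}
	fmt.Println("good config error:", validate(good))

	bad := good
	bad.NumWorkers = 5
	fmt.Println("bad config error:", validate(bad))
}
```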
func (*Async) Start ¶
Start sending batches to Kafka. Returns a channel on which every batch is sent once production has been attempted, whether it succeeded or not. You must read from that channel, or production will block. When the input channel is closed, the workers drain it, send any remaining batches to Kafka, output the final batches, exit, and close the output channel. Call Start only once.
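The channel contract described above (read the output or block; close the input to drain and shut down) follows a common Go worker-pool pattern. The sketch below illustrates that pattern only; `startWorkers` is a hypothetical function operating on strings, not this package's Start, and no Kafka calls are made.

```go
package main

import (
	"fmt"
	"sync"
)

// startWorkers is a hypothetical illustration of the contract: numWorkers
// goroutines drain in, report every item on the returned channel, and the
// returned channel is closed once in is closed and fully drained.
func startWorkers(in <-chan string, numWorkers int) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for b := range in { // loop ends when in is closed and empty
				out <- b // blocks until the caller reads from out
			}
		}()
	}
	go func() {
		wg.Wait() // all workers have exited
		close(out)
	}()
	return out
}

func main() {
	in := make(chan string)
	out := startWorkers(in, 2)

	// Feed batches from a separate goroutine, then signal "no more input".
	go func() {
		for _, b := range []string{"a", "b", "c"} {
			in <- b
		}
		close(in)
	}()

	// The caller must keep reading, otherwise the workers block on out.
	n := 0
	for range out {
		n++
	}
	fmt.Println("batches reported:", n)
}
```

Ranging over the output channel until it is closed is the simplest way to be sure every batch, successful or not, has been reported before the program exits.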
type Batch ¶ added in v0.0.8
type Batch struct {
	libkafka.Batch
	// Topic is set by the producer. Batches received from the builder have
	// no topic set, producer sets it before attempting first exchange.
	Topic string
	// Partition to which the batch is intended to be produced. Special
	// value -1 means "next partition round robin". Zero value (0) means
	// "produce to partition 0". So be careful with that. To see the
	// partition to which the batch ended up actually being produced (or
	// attempted) see the Exchanges (.Response.Topic).
	Partition         int32
	BuildEnqueued     time.Time
	BuildBegin        time.Time
	BuildComplete     time.Time
	BuildError        error
	CompressComplete  time.Time
	CompressError     error
	UncompressedBytes int32 // batch size can't be more than MaxInt32
	ProducerError     error // for example ErrNoProducerForPartition
	Exchanges         []*Exchange // each exchange records a Produce api call and response
}
Batch is the unit on which the producer operates. In addition to fields inherited from libkafka.Batch (related to the wire protocol), the producer Batch records the entire "life cycle" of a batch: build and compression timings, errors, and the (possibly multiple) Produce api calls made for it. Batches are created by builders (in the batch package) and are passed along to methods that mutate them, recording additional information.
type Exchange ¶
type Exchange struct {
	// Enqueued records the time the producer started "looking" for a
	// worker to make the produce request. Delta between Enqueued and Begin
	// shows how long it took to get a worker. This delta can be reduced by
	// increasing the number of produce workers (unless all requests are to
	// produce to a single partition, in which case the number of produce
	// workers doesn't matter).
	Enqueued time.Time
	// Begin records the time the kafka produce call began. Delta between
	// that and Complete shows how long it took to send the request over
	// the wire and for kafka to respond to it. Large delta indicates
	// problems on kafka side.
	Begin    time.Time
	Complete time.Time
	Response *producer.Response
	// Error indicates that exchange failed. This could be a "low level"
	// error such as network, or it could be a libkafka.Error, which means
	// response from the broker had ErrorCode other than ERR_NONE (0).
	Error error
}
Exchange records information about a single Produce api call and response. A batch will have one or more exchanges attached to it. If Response.ErrorCode != libkafka.ERR_NONE then Error will be set to the corresponding libkafka.Error.