producer

Version: v0.0.43
Warning

This package is not in the latest version of its module.

Published: Jan 18, 2022 License: BSD-3-Clause Imports: 9 Imported by: 0

Documentation

Overview

Package producer implements an asynchronous Kafka producer.

Constants

This section is empty.

Variables

var ErrNilResponse = fmt.Errorf("nil response from broker")
var ErrNoProducerForPartition = errors.New("no producer for partition")

ErrNoProducerForPartition is set as batch.ProducerError when the producer is not configured to produce to the partition specified in batch.Partition. This does not necessarily mean that the partition does not exist, only that the producer was not configured to write to it.

Functions

This section is empty.

Types

type Async

type Async struct {
	// Kafka bootstrap address: either host:port or an SRV name.
	Bootstrap string
	TLS       *tls.Config
	Topic     string
	// Produce to these partitions.
	Partitions []int
	// Spin up this many workers. Each worker is synchronous: it processes one batch
	// at a time, trying to send it to a random partition. On error the worker retries,
	// making up to NumAttempts attempts in total, each time trying to send the batch
	// to a different partition. Details are recorded in Exchange structs. After each
	// error the underlying connection to the Kafka topic partition leader is closed,
	// and reopened on the next call to that leader. Because workers are synchronous,
	// NumWorkers determines the maximum number of "in flight" batches. It makes no
	// sense to have more workers than partitions. Setting NumWorkers=1 makes the
	// producer effectively synchronous. Must be >0.
	NumWorkers int
	// 1 means 1 initial attempt and no retries. 2 means 1 initial attempt and 1 more attempt on
	// error. Must be >0.
	NumAttempts int
	// Sleep this long between repeated produce calls for the same batch. This is a
	// trade-off: if produce calls fail because partition leadership has moved, it
	// would make sense to retry immediately (the new leader would be found, etc.).
	// But if there is a problem with leadership or some other kind of slowdown,
	// waiting for a bit is good: it keeps the producer from "needlessly" dropping the
	// batch. This logic will become even more complicated if data is partitioned
	// (right now delivery is to a random partition). Setting this to 0 is fine.
	SleepBetweenAttempts time.Duration
	// StrictPartitioning, when true, attempts to write each batch into the
	// partition which was set for it by the batch builder. This means that
	// if the partition is not available for some reason, the producer will
	// keep trying (up to NumAttempts) and then possibly lose data. If
	// StrictPartitioning is false, then after the first error the
	// partition for the batch will be set to -1 (meaning "first available"
	// round robin). If the batch builder does not partition (all batches
	// have partition set to -1) then this setting does nothing.
	StrictPartitioning bool
	// ConnMaxIdle is passed down to the libkafka client.PartitionClient. See the
	// documentation there. It should be less than connections.max.idle.ms
	// specified in the broker config.
	ConnMaxIdle time.Duration
	// Required acks: 0 means no acks, 1 means leader only, -1 means all in-sync
	// replicas (ISRs), as governed by min.insync.replicas.
	Acks int
	// Timeout to set for kafka produce api calls. This determines how long
	// the broker waits to get all required acks from replicas.
	Timeout time.Duration
	// contains filtered or unexported fields
}

Async is a producer that sends record batches to Kafka asynchronously. Set all exported fields before calling Start and do not change them after calling Start. Async is safe for concurrent use.
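The retry behavior described in the NumAttempts, SleepBetweenAttempts, and StrictPartitioning field comments can be sketched as a single loop. This is an illustration of the documented semantics under simplifying assumptions, not the package's worker code: send is a hypothetical function standing in for one Produce API call, and retryConfig and produceWithRetries are made-up names.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errLeaderNotAvailable is a hypothetical broker-side failure used for illustration.
var errLeaderNotAvailable = errors.New("leader not available")

// retryConfig mirrors the Async fields that control retries.
type retryConfig struct {
	NumAttempts          int           // 1 = one attempt, no retries; must be > 0
	SleepBetweenAttempts time.Duration // 0 retries immediately
	StrictPartitioning   bool          // keep the builder-assigned partition on retry
}

// produceWithRetries calls send up to cfg.NumAttempts times. Unless
// StrictPartitioning is set, the partition is reset to -1 ("first available,
// round robin") after the first failure. It returns the number of attempts
// made and the last error (nil on success).
func produceWithRetries(cfg retryConfig, partition int32, send func(partition int32) error) (int, error) {
	var err error
	for attempt := 1; attempt <= cfg.NumAttempts; attempt++ {
		if err = send(partition); err == nil {
			return attempt, nil
		}
		if !cfg.StrictPartitioning {
			partition = -1
		}
		if attempt < cfg.NumAttempts {
			time.Sleep(cfg.SleepBetweenAttempts)
		}
	}
	return cfg.NumAttempts, err
}

func main() {
	var tried []int32
	// Hypothetical send: partition 3 is unavailable, round robin (-1) succeeds.
	send := func(p int32) error {
		tried = append(tried, p)
		if p == 3 {
			return errLeaderNotAvailable
		}
		return nil
	}
	n, err := produceWithRetries(retryConfig{NumAttempts: 3}, 3, send)
	fmt.Println(n, err, tried) // 2 <nil> [3 -1]
}
```

With StrictPartitioning set, the same failing partition would be retried until NumAttempts is exhausted, which is where the documented risk of data loss comes from.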

func (*Async) Start

func (p *Async) Start(input <-chan *Batch) (<-chan *Batch, error)

Start starts sending batches to Kafka. It returns a channel on which all processed batches (whether or not they were produced successfully) are sent. You must read from that channel, otherwise production will block. When the input channel is closed, the workers drain it, send any remaining batches to Kafka, output the final batches, exit, and close the output channel. Call Start only once.
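The channel contract of Start (workers drain the input, every batch comes out on the output, and the output is closed once all workers exit) follows a common Go worker-pool shape. The sketch below is a hypothetical, simplified model of that contract, not the package's implementation: batch and start are made-up names, and "producing" a batch just sets a flag.

```go
package main

import (
	"fmt"
	"sync"
)

// batch is a hypothetical stand-in for producer.Batch.
type batch struct {
	ID       int
	Produced bool
}

// start launches numWorkers goroutines that read from input, "produce" each
// batch, and forward it to the returned channel. When input is closed and
// drained, every worker exits and the output channel is closed.
func start(numWorkers int, input <-chan *batch) <-chan *batch {
	output := make(chan *batch)
	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for b := range input { // one batch in flight per worker
				b.Produced = true // placeholder for the Produce API call(s)
				output <- b       // blocks until the caller reads it
			}
		}()
	}
	go func() {
		wg.Wait() // all workers have exited
		close(output)
	}()
	return output
}

func main() {
	input := make(chan *batch)
	output := start(2, input)
	go func() {
		for i := 0; i < 5; i++ {
			input <- &batch{ID: i}
		}
		close(input)
	}()
	n := 0
	for b := range output { // the caller must keep reading or workers block
		if b.Produced {
			n++
		}
	}
	fmt.Println("batches produced:", n) // batches produced: 5
}
```

Because the output channel is unbuffered in this sketch, a caller that stops reading stalls every worker, which is the blocking behavior the Start documentation warns about.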

func (*Async) Wait added in v0.0.5

func (p *Async) Wait()

Wait blocks until all outstanding batches have been produced and the producer has cleanly shut down. Calling Wait before Start is a no-op.
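One way to get the "no-op before Start" behavior is to back Wait with a sync.WaitGroup: Wait on a zero-valued WaitGroup returns immediately. The sketch below assumes that design for illustration only; asyncSketch is a hypothetical type, and the source does not say how Async.Wait is implemented.

```go
package main

import (
	"fmt"
	"sync"
)

// asyncSketch is a hypothetical type pairing Start and Wait; not the package's code.
type asyncSketch struct {
	wg        sync.WaitGroup
	processed int
}

// Start launches one worker that consumes input until it is closed.
func (p *asyncSketch) Start(input <-chan int) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		for range input {
			p.processed++ // only this goroutine writes; read it after Wait
		}
	}()
}

// Wait blocks until the worker has exited. Before Start the WaitGroup
// counter is zero, so Wait returns immediately.
func (p *asyncSketch) Wait() { p.wg.Wait() }

func main() {
	var p asyncSketch
	p.Wait() // Start not called yet: returns immediately
	in := make(chan int)
	p.Start(in)
	for i := 0; i < 3; i++ {
		in <- i
	}
	close(in)
	p.Wait()
	fmt.Println("processed:", p.processed) // processed: 3
}
```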

type Batch added in v0.0.8

type Batch struct {
	libkafka.Batch
	// Topic is set by the producer. Batches received from the builder have
	// no topic set; the producer sets it before attempting the first exchange.
	Topic string
	// Partition to which the batch is intended to be produced. Special
	// value -1 means "next partition round robin". Zero value (0) means
	// "produce to partition 0". So be careful with that. To see the
	// partition to which the batch ended up actually being produced (or
	// attempted) see the Exchanges (.Response.Topic).
	Partition         int32
	BuildEnqueued     time.Time
	BuildBegin        time.Time
	BuildComplete     time.Time
	BuildError        error
	CompressComplete  time.Time
	CompressError     error
	UncompressedBytes int32       // batch size can't be more than MaxInt32
	ProducerError     error       // for example ErrNoProducerForPartition
	Exchanges         []*Exchange // each exchange records a Produce api call and response
}

Batch is the unit on which the producer operates. In addition to the fields inherited from libkafka.Batch (related to the wire protocol), the producer Batch records the entire "life cycle" of a batch: build and compression timings and errors, through one or (possibly) several Produce API calls. Batches are created by builders (in the batch package) and are passed along to methods that mutate them, recording additional information.
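The Partition field comment distinguishes -1 ("next partition, round robin") from an explicit partition, and 0 is an explicit partition rather than "any". The sketch below illustrates that distinction together with ErrNoProducerForPartition; partitioner and resolve are hypothetical names, the sentinel is a local copy, and the sketch is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-in for producer.ErrNoProducerForPartition.
var errNoProducerForPartition = errors.New("no producer for partition")

// partitioner is a hypothetical helper that resolves a batch's requested
// Partition against the configured Async.Partitions. Not goroutine-safe.
type partitioner struct {
	partitions []int // mirrors Async.Partitions
	next       int
}

// resolve returns the partition to produce to. -1 means round robin over the
// configured partitions; any other value, including 0, is used as-is, but
// only if the producer was configured for it.
func (p *partitioner) resolve(requested int32) (int32, error) {
	if requested == -1 {
		part := p.partitions[p.next%len(p.partitions)]
		p.next++
		return int32(part), nil
	}
	for _, part := range p.partitions {
		if int32(part) == requested {
			return requested, nil
		}
	}
	return 0, errNoProducerForPartition
}

func main() {
	p := &partitioner{partitions: []int{0, 1, 2}}
	for i := 0; i < 4; i++ {
		part, _ := p.resolve(-1)
		fmt.Print(part, " ") // 0 1 2 0
	}
	fmt.Println()
	_, err := p.resolve(5)
	fmt.Println(err) // no producer for partition
}
```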

func (*Batch) Produced added in v0.0.8

func (b *Batch) Produced() bool

Produced returns true if the batch has been successfully produced (built, sent, and acked by a broker).
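To make the "built, sent, and acked" condition concrete, the sketch below gives one plausible reading of it in terms of the documented Batch fields. This is an assumption, not the package's definition: batch, exchange, and produced are hypothetical stand-ins, and the real Produced may check different or additional conditions.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotLeader is a hypothetical exchange error used for illustration.
var errNotLeader = errors.New("not leader for partition")

// Hypothetical, trimmed-down stand-ins for producer.Exchange and producer.Batch.
type exchange struct{ Error error }

type batch struct {
	BuildError    error
	CompressError error
	ProducerError error
	Exchanges     []*exchange
}

// produced is an ASSUMED approximation of Batch.Produced: no build,
// compression, or producer error, and the last Produce call succeeded.
func (b *batch) produced() bool {
	if b.BuildError != nil || b.CompressError != nil || b.ProducerError != nil {
		return false
	}
	if len(b.Exchanges) == 0 {
		return false // never sent
	}
	return b.Exchanges[len(b.Exchanges)-1].Error == nil
}

func main() {
	retried := &batch{Exchanges: []*exchange{{Error: errNotLeader}, {Error: nil}}}
	fmt.Println(retried.produced())    // true: the second attempt succeeded
	fmt.Println((&batch{}).produced()) // false: no exchanges
}
```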

func (*Batch) String added in v0.0.8

func (b *Batch) String() string

type Exchange

type Exchange struct {
	// Enqueued records the time the producer started "looking" for a
	// worker to make the produce request. Delta between Enqueued and Begin
	// shows how long it took to get a worker. This delta can be reduced by
	// increasing the number of produce workers (unless all requests are to
	// produce to a single partition, in which case the number of produce
	// workers doesn't matter).
	Enqueued time.Time
	// Begin records the time the Kafka produce call began. The delta between
	// that and Complete shows how long it took to send the request over
	// the wire and for Kafka to respond to it. A large delta indicates
	// problems on the Kafka side.
	Begin    time.Time
	Complete time.Time
	Response *producer.Response
	// Error indicates that exchange failed. This could be a "low level"
	// error such as network, or it could be a libkafka.Error, which means
	// response from the broker had ErrorCode other than ERR_NONE (0).
	Error error
}

Exchange records information about a single Produce API call and its response. A batch will have one or more exchanges attached to it. If Response.ErrorCode != libkafka.ERR_NONE, then Error will be set to the corresponding libkafka.Error.
