Documentation ¶
Overview ¶
Package builder implements a partitioning record batch builder.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type HashPartitioner ¶
type HashPartitioner struct {
// contains filtered or unexported fields
}
Partitions by hashing the record key with Fnv32a (32-bit FNV-1a).
func (*HashPartitioner) Partition ¶
func (p *HashPartitioner) Partition(key []byte) int32
func (*HashPartitioner) SetNumPartitions ¶
func (p *HashPartitioner) SetNumPartitions(n int32)
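Beyond "Fnv32a", the package does not document the exact mapping from hash to partition. A minimal sketch of FNV-1a-based key partitioning using Go's standard hash/fnv; the function name `partition` and the sign-bit masking are assumptions, not the package's actual implementation:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partition sketches how a hash partitioner typically maps a key to a
// partition: hash the key with 32-bit FNV-1a, then reduce modulo the
// partition count. The sign-bit mask keeps the int32 result non-negative.
func partition(key []byte, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write(key)
	return int32(h.Sum32()&0x7fffffff) % numPartitions
}

func main() {
	p := partition([]byte("order-42"), 8)
	fmt.Println(p >= 0 && p < 8) // always in range [0, numPartitions)
}
```

Because the hash is deterministic, all records with the same key land in the same partition.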
type NopPartitioner ¶
type NopPartitioner struct{}
Always sets partition to -1.
func (*NopPartitioner) Partition ¶
func (*NopPartitioner) Partition([]byte) int32
func (*NopPartitioner) SetNumPartitions ¶
func (*NopPartitioner) SetNumPartitions(int32)
type Partitioner ¶
type Partitioner interface {
	// Partition must be safe for concurrent use. Nil is a valid value
	// for the key. Behavior when the number of partitions set with
	// SetNumPartitions is <1 is undefined.
	Partition(key []byte) int32

	// SetNumPartitions does not need to be safe for concurrent use. The
	// intention is that it be called once as part of builder setup.
	// Behavior on subsequent calls, or on calls while the partitioner is
	// in use, is undefined. The number of partitions should be >0; if
	// <1, behavior is undefined.
	SetNumPartitions(int32)
}
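To illustrate the contract, here is a hypothetical round-robin Partitioner (not part of this package): Partition uses an atomic counter so it is safe for concurrent use as required, while SetNumPartitions is a plain write, relying on the setup-time-only guarantee:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Partitioner mirrors the interface documented above.
type Partitioner interface {
	Partition(key []byte) int32
	SetNumPartitions(int32)
}

// roundRobin ignores the key and cycles through partitions. The atomic
// counter makes Partition safe for concurrent use; SetNumPartitions is
// expected to be called once, before any Partition call.
type roundRobin struct {
	n    int32
	next uint32
}

func (r *roundRobin) Partition(_ []byte) int32 {
	return int32(atomic.AddUint32(&r.next, 1) % uint32(r.n))
}

func (r *roundRobin) SetNumPartitions(n int32) { r.n = n }

func main() {
	var p Partitioner = &roundRobin{}
	p.SetNumPartitions(3)
	fmt.Println(p.Partition(nil), p.Partition(nil), p.Partition(nil))
}
```

Note that Partition must accept a nil key, as the contract requires.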
type SequentialBuilder ¶
type SequentialBuilder struct {
	// Compressor must be safe for concurrent use.
	Compressor batch.Compressor

	// Each batch will have at least this many records. There is no
	// "max": the user can send slices of any size on the input channel.
	// It is up to the user to enforce sanity of input slices.
	MinRecords int

	// Each batch will have an uncompressed payload (sum of uncompressed
	// record values) of at least this many bytes. Combined with
	// MinRecords (both have to be true) this determines when to "flush".
	MinUncompressedBytes int

	// Incoming records are collected into sets, the size of which (the
	// number of records in each set) is determined by MinRecords and
	// MinUncompressedBytes. Each of these sets of records must be built
	// into a batch: records need to be serialized into wire format and
	// then compressed.
	//
	// Each set of records is processed by a worker and results in a
	// single producer.Batch. NumWorkers determines the number of workers
	// doing the serialization and compression. This is most likely the
	// most expensive part of the whole pipeline (especially when
	// compression is enabled), so set it accordingly (though it doesn't
	// make sense for it to exceed the number of available cores). Must
	// be >0.
	NumWorkers int

	// Partitioner to use. The max number of "in flight" batches will be
	// equal to the number of partitions plus NumWorkers. See
	// NopPartitioner.
	Partitioner Partitioner
	// contains filtered or unexported fields
}
Builder for record batches. Make sure to set public field values before calling Start. Do not change them after calling Start. Safe for concurrent use.
The builder collects records from its input channel. It groups (partitions) incoming records. When a group (set of records for given partition) reaches thresholds defined by MinRecords and MinUncompressedBytes, it is sent to a worker that marshals the records into a batch and compresses the batch.
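The threshold rule above can be sketched as a predicate: a group of records for a partition is handed to a worker only when both MinRecords and MinUncompressedBytes are satisfied. The `record` type and `shouldFlush` helper below are illustrative stand-ins, not the package's internals:

```go
package main

import "fmt"

// record is a stand-in for the package's Record type (assumption).
type record struct{ value []byte }

// shouldFlush mirrors the documented rule: BOTH thresholds must be met
// before a group is sent to a worker for serialization and compression.
func shouldFlush(group []record, minRecords, minUncompressedBytes int) bool {
	bytes := 0
	for _, r := range group {
		bytes += len(r.value) // uncompressed payload = sum of record values
	}
	return len(group) >= minRecords && bytes >= minUncompressedBytes
}

func main() {
	g := []record{{value: make([]byte, 512)}, {value: make([]byte, 512)}}
	fmt.Println(shouldFlush(g, 2, 1024)) // both thresholds met
	fmt.Println(shouldFlush(g, 3, 1024)) // enough bytes, too few records
}
```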
func (*SequentialBuilder) Flush ¶
func (b *SequentialBuilder) Flush(d time.Duration)
Flush flushes all batches "older" than d. If the builder is partitioned, it looks through the batches of all partitions; if the builder is not partitioned, there is only one batch. Flushing takes precedence over MinRecords and MinUncompressedBytes. Passing d=0 immediately flushes all batches. Empty batches are never flushed. Flush does not block: if a flush is already enqueued or in progress, the call returns immediately as a no-op.
func (*SequentialBuilder) Start ¶
func (b *SequentialBuilder) Start(input <-chan []Record) <-chan *producer.Batch
Start starts building batches. It returns the channel on which workers deliver completed batches; the depth of that channel equals the number of workers. When the input channel is closed, the workers drain it, output any remaining batches (even if smaller than MinRecords), and exit, and the output channel is closed. It is more efficient to send multiple records at a time on the input channel, but the size of the input slices is independent of MinRecords (and so open to abuse: you could send a huge input slice; it is up to you to ensure slice sanity). Empty slices and nil records within slices are silently dropped, so batches returned on the output channel should always be error free and have >0 records. Call Start only once.
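The overall shape of the pipeline — slices in, batches out, with a final undersized batch on drain and a closed output channel — can be sketched with simplified stand-in types (`batch` here stands in for producer.Batch; the single goroutine and int records are simplifications of the real multi-worker builder):

```go
package main

import "fmt"

// batch is a stand-in for producer.Batch (assumption).
type batch struct{ records int }

// startWorkers sketches the Start contract: records arrive in slices on
// the input channel; full batches are emitted as thresholds are met;
// once input closes, the remainder is flushed as a final (possibly
// undersized) batch and the output channel is closed.
func startWorkers(input <-chan []int, minRecords int) <-chan *batch {
	out := make(chan *batch, 1)
	go func() {
		defer close(out) // closing input eventually closes output
		var pending []int
		for recs := range input {
			pending = append(pending, recs...)
			for len(pending) >= minRecords {
				out <- &batch{records: minRecords}
				pending = pending[minRecords:]
			}
		}
		if len(pending) > 0 { // drain: smaller than minRecords is allowed here
			out <- &batch{records: len(pending)}
		}
	}()
	return out
}

func main() {
	in := make(chan []int)
	out := startWorkers(in, 3)
	go func() {
		in <- []int{1, 2, 3, 4} // one oversized slice is fine
		close(in)
	}()
	for b := range out {
		fmt.Println(b.records)
	}
}
```

Ranging over the returned channel until it closes, as in main above, is the natural way to consume the output.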