Documentation ¶
Overview ¶
Package batch implements functions for building, marshaling, and unmarshaling Kafka record batches.
Producing ¶
When producting messages, call NewBuilder, and Add records to it. Call Builder.Build and pass the returned Batch to the producer. Release the reference to Builder when done with it to release references to added records.
Fetching ("consuming")
Fetch result (if successful) will contain RecordSet. Call its Batches method to get byte slices containing individual batches. Unmarshal each batch individually. To get individual records, call Batch.Records and then record.Unmarshal. Passing around batches is much more efficient than passing individual records, so save record unmarshaling until the very end.
Index ¶
- Constants
- Variables
- type Batch
- func (batch *Batch) Compress(c Compressor) error
- func (batch *Batch) CompressionType() int16
- func (batch *Batch) Decompress(d Decompressor) error
- func (batch *Batch) LastOffset() int64
- func (batch *Batch) Marshal() RecordSet
- func (batch *Batch) Records() [][]byte
- func (batch *Batch) TimestampType() int16
- type Builder
- type Compressor
- type Decompressor
- type RecordSet
Constants ¶
const ( TimestampCreate = 0b0000 TimestampLogAppend = 0b1000 )
Variables ¶
var ( ErrEmpty = errors.New("empty batch") ErrNilRecord = errors.New("nil record in batch") )
var ( CorruptedBatchError = errors.New("batch crc does not match bytes") UnsupportedMagicError = errors.New("magic value is not 2") )
Functions ¶
This section is empty.
Types ¶
type Batch ¶
type Batch struct { BaseOffset int64 BatchLengthBytes int32 PartitionLeaderEpoch int32 Magic int8 // this should be =2 Crc uint32 Attributes int16 LastOffsetDelta int32 // NumRecords-1 // TODO: is this always true? FirstTimestamp int64 // ms since epoch MaxTimestamp int64 // ms since epoch ProducerId int64 // for transactions only see KIP-360 ProducerEpoch int16 // for transactions only see KIP-360 BaseSequence int32 NumRecords int32 // LastOffsetDelta+1 // MarshaledRecords []byte `wire:"omit" json:"-"` }
Batch defines Kafka record batch in wire format. Not safe for concurrent use.
func Unmarshal ¶
Unmarshal the batch. On error batch is nil. If there is an error, it is most likely because the crc failed. In that case there is no way to tell how many records there were in the batch (and to adjust offsets accordingly).
func (*Batch) Compress ¶ added in v0.0.5
func (batch *Batch) Compress(c Compressor) error
Compress batch records with supplied compressor. Mutates batch on success only. Call before Marshal. Not idempotent (on success).
func (*Batch) CompressionType ¶
func (*Batch) Decompress ¶ added in v0.0.5
func (batch *Batch) Decompress(d Decompressor) error
Decompress batch with supplied decompressor. Mutates batch. Call after Unmarshal and before Records. Not idempotent.
func (*Batch) LastOffset ¶ added in v0.0.3
func (*Batch) Marshal ¶
Marshal batch header and append marshaled records. If you want the batch to be compressed call Compress before Marshal. Mutates the batch Crc.
func (*Batch) Records ¶
Records retrieves individual records from the batch. If batch records are compressed you must call Decompress first.
func (*Batch) TimestampType ¶
type Builder ¶
type Builder struct {
// contains filtered or unexported fields
}
Builder is used for building record batches. There is no limit on the number of records (up to the user). Not safe for concurrent use.
func NewBuilder ¶
func (*Builder) Add ¶
Add records to the batch. References to added records are not released on call to Build. This means you can add more records and call Build again. Don't know why you would want to, but you can.
func (*Builder) AddStrings ¶
func (*Builder) Build ¶
Build a record batch (marshal individual records and set batch metadata). Call this after adding records to the batch. Returns ErrEmpty if batch has no records. Returns ErrNilRecord if any of the records is nil. Marshaled records are not compressed (call Batch.Compress). Batch FirstTimestamp is set to the time when the builder was created (with NewBuilder) and the MaxTimestamp is set to the time passed to Build. Within the batch, each record's TimestampDelta is 0, meaning that all records will appear to have been produced at the time the builder was created (TODO: change? how?) Idempotent.
func (*Builder) NumRecords ¶
NumRecords that have been added to the builder.