Documentation ¶
Overview ¶
Package batch implements functions for building, marshaling, and unmarshaling Kafka record batches.
Producing ¶
When producting messages, call NewBuilder, and Add records to it. Call Builder.Build and pass the returned Batch to the producer. Set the Builder to nil when done with it to release references to added records.
Consuming ¶
Fetch result (if successful) will contain RecordSet. Call its Batches method to get byte slices containing individual batches. Unmarshal each batch individually. To get individual records, call Batch.Records and then record.Unmarshal. Passing around batches is much more efficient than passing individual records, so save record unmarshaling until the very end.
Index ¶
Constants ¶
const ( TimestampCreate = 0b0000 TimestampLogAppend = 0b1000 )
Variables ¶
var (
CorruptedBatchError = errors.New("batch crc does not match bytes")
)
var ErrEmpty = errors.New("empty batch")
Functions ¶
This section is empty.
Types ¶
type Batch ¶
type Batch struct { BaseOffset int64 BatchLengthBytes int32 PartitionLeaderEpoch int32 Magic int8 // this should be =2 Crc uint32 Attributes int16 LastOffsetDelta int32 FirstTimestamp int64 // ms since epoch MaxTimestamp int64 // ms since epoch ProducerId int64 // for transactions only see KIP-360 ProducerEpoch int16 // for transactions only see KIP-360 BaseSequence int32 NumRecords int32 // MarshaledRecords []byte `wire:"omit" json:"-"` }
Batch defines Kafka record batch in wire format. Not safe for concurrent use.
func (*Batch) CompressionType ¶
func (*Batch) LastOffset ¶ added in v0.0.3
func (*Batch) Records ¶
func (batch *Batch) Records(d Decompressor) ([][]byte, error)
Records returns byte slices containing decompressed records. It is up to the user to provide appropriate Decompressor based on the value of Batch.CompressionType.
func (*Batch) TimestampType ¶
type Builder ¶
type Builder struct {
// contains filtered or unexported fields
}
Builder is used for building record batches. There is no limit on the number of records (up to the user). Not safe for concurrent use.
func NewBuilder ¶
func (*Builder) Add ¶
Add record to the batch. References to added records are not released on call to Build.
func (*Builder) AddStrings ¶
func (*Builder) Build ¶
Build a record batch. Call this after adding records to the batch. Serializes and compresses records. Does not set the Crc (this is done by Batch.Marshal). Returns ErrEmpty if batch has no records. Build is idempotent.
func (*Builder) NumRecords ¶
NumRecords that have been added to the builder.
type Compressor ¶
Compressor implementations are supplied to the Builder by the user. This gives the user flexibility (say in picking zstd Data Dog or Klaus Post implementations).
type Decompressor ¶
Ditto Compressor.