batch

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 11, 2020 License: BSD-3-Clause Imports: 10 Imported by: 2

Documentation

Overview

Package batch implements functions for building, marshaling, and unmarshaling Kafka record batches.

Producing

When producting messages, call NewBuilder, and Add records to it. Call Builder.Build and pass the returned Batch to the producer. Set the Builder to nil when done with it to release references to added records.

Consuming

Fetch result (if successful) will contain RecordSet. Call its Batches method to get byte slices containing individual batches. Unmarshal each batch individually. To get individual records, call Batch.Records and then record.Unmarshal. Passing around batches is much more efficient than passing individual records, so save record unmarshaling until the very end.

Index

Constants

View Source
const (
	TimestampCreate    = 0b0000
	TimestampLogAppend = 0b1000
)

Variables

View Source
var (
	CorruptedBatchError = errors.New("batch crc does not match bytes")
)
View Source
var ErrEmpty = errors.New("empty batch")

Functions

This section is empty.

Types

type Batch

type Batch struct {
	BaseOffset           int64
	BatchLengthBytes     int32
	PartitionLeaderEpoch int32
	Magic                int8 // this should be =2
	Crc                  uint32
	Attributes           int16
	LastOffsetDelta      int32
	FirstTimestamp       int64 // ms since epoch
	MaxTimestamp         int64 // ms since epoch
	ProducerId           int64 // for transactions only see KIP-360
	ProducerEpoch        int16 // for transactions only see KIP-360
	BaseSequence         int32
	NumRecords           int32
	//
	MarshaledRecords []byte `wire:"omit" json:"-"`
}

Batch defines Kafka record batch in wire format. Not safe for concurrent use.

func Unmarshal

func Unmarshal(b []byte) (*Batch, error)

func (*Batch) CompressionType

func (batch *Batch) CompressionType() int16

func (*Batch) LastOffset added in v0.0.3

func (batch *Batch) LastOffset() int64

func (*Batch) Marshal

func (batch *Batch) Marshal() []byte

Marshal batch. Mutates the Crc.

func (*Batch) Records

func (batch *Batch) Records(d Decompressor) ([][]byte, error)

Records returns byte slices containing decompressed records. It is up to the user to provide appropriate Decompressor based on the value of Batch.CompressionType.

func (*Batch) TimestampType

func (batch *Batch) TimestampType() int16

type Builder

type Builder struct {
	// contains filtered or unexported fields
}

Builder is used for building record batches. There is no limit on the number of records (up to the user). Not safe for concurrent use.

func NewBuilder

func NewBuilder(now time.Time) *Builder

func (*Builder) Add

func (b *Builder) Add(records ...*record.Record)

Add record to the batch. References to added records are not released on call to Build.

func (*Builder) AddStrings

func (b *Builder) AddStrings(values ...string) *Builder

func (*Builder) Build

func (b *Builder) Build(now time.Time, c Compressor) (*Batch, error)

Build a record batch. Call this after adding records to the batch. Serializes and compresses records. Does not set the Crc (this is done by Batch.Marshal). Returns ErrEmpty if batch has no records. Build is idempotent.

func (*Builder) NumRecords

func (b *Builder) NumRecords() int

NumRecords that have been added to the builder.

type Compressor

type Compressor interface {
	Compress([]byte) ([]byte, error)
	Type() int16
}

Compressor implementations are supplied to the Builder by the user. This gives the user flexibility (say in picking zstd Data Dog or Klaus Post implementations).

type Decompressor

type Decompressor interface {
	Decompress([]byte) ([]byte, error)
	Type() int16
}

Ditto Compressor.

type RecordSet

type RecordSet []byte

RecordSet is composed of 1 or more record batches. Fetch API calls respond with record sets. Byte representation of a record set with only one record batch is identical to the record batch.

func (RecordSet) Batches added in v0.0.3

func (b RecordSet) Batches() [][]byte

Batches returns the batches in the record set. Because Kafka limits response byte sizes, the last record batch in the set may be truncated (bytes will be missing from the end). In such case the last batch is discarded.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL