protocol

package
v0.1.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 29, 2024 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Marshal

func Marshal(version int16, value interface{}) ([]byte, error)

func ReadAll

func ReadAll(b Bytes) ([]byte, error)

ReadAll is similar to ioutil.ReadAll, but it takes advantage of knowing the length of b to minimize the memory footprint.

The function returns a nil slice if b is nil.

func Register

func Register(req, res Message)

Register is automatically called by sub-packages are imported to install a new pair of request/response message types.

func RegisterOverride

func RegisterOverride(req, res Message, key OverrideTypeKey)

func Unmarshal

func Unmarshal(data []byte, version int16, value interface{}) error

func WriteRequest

func WriteRequest(w io.Writer, apiVersion int16, correlationID int32, clientID string, msg Message) error

func WriteResponse

func WriteResponse(w io.Writer, apiVersion int16, correlationID int32, msg Message) error

Types

type ApiKey

type ApiKey int16
const (
	Produce                      ApiKey = 0
	Fetch                        ApiKey = 1
	ListOffsets                  ApiKey = 2
	Metadata                     ApiKey = 3
	LeaderAndIsr                 ApiKey = 4
	StopReplica                  ApiKey = 5
	UpdateMetadata               ApiKey = 6
	ControlledShutdown           ApiKey = 7
	OffsetCommit                 ApiKey = 8
	OffsetFetch                  ApiKey = 9
	FindCoordinator              ApiKey = 10
	JoinGroup                    ApiKey = 11
	Heartbeat                    ApiKey = 12
	LeaveGroup                   ApiKey = 13
	SyncGroup                    ApiKey = 14
	DescribeGroups               ApiKey = 15
	ListGroups                   ApiKey = 16
	SaslHandshake                ApiKey = 17
	ApiVersions                  ApiKey = 18
	CreateTopics                 ApiKey = 19
	DeleteTopics                 ApiKey = 20
	DeleteRecords                ApiKey = 21
	InitProducerId               ApiKey = 22
	OffsetForLeaderEpoch         ApiKey = 23
	AddPartitionsToTxn           ApiKey = 24
	AddOffsetsToTxn              ApiKey = 25
	EndTxn                       ApiKey = 26
	WriteTxnMarkers              ApiKey = 27
	TxnOffsetCommit              ApiKey = 28
	DescribeAcls                 ApiKey = 29
	CreateAcls                   ApiKey = 30
	DeleteAcls                   ApiKey = 31
	DescribeConfigs              ApiKey = 32
	AlterConfigs                 ApiKey = 33
	AlterReplicaLogDirs          ApiKey = 34
	DescribeLogDirs              ApiKey = 35
	SaslAuthenticate             ApiKey = 36
	CreatePartitions             ApiKey = 37
	CreateDelegationToken        ApiKey = 38
	RenewDelegationToken         ApiKey = 39
	ExpireDelegationToken        ApiKey = 40
	DescribeDelegationToken      ApiKey = 41
	DeleteGroups                 ApiKey = 42
	ElectLeaders                 ApiKey = 43
	IncrementalAlterConfigs      ApiKey = 44
	AlterPartitionReassignments  ApiKey = 45
	ListPartitionReassignments   ApiKey = 46
	OffsetDelete                 ApiKey = 47
	DescribeClientQuotas         ApiKey = 48
	AlterClientQuotas            ApiKey = 49
	DescribeUserScramCredentials ApiKey = 50
	AlterUserScramCredentials    ApiKey = 51
)

func (ApiKey) MaxVersion

func (k ApiKey) MaxVersion() int16

func (ApiKey) MinVersion

func (k ApiKey) MinVersion() int16

func (ApiKey) SelectVersion

func (k ApiKey) SelectVersion(minVersion, maxVersion int16) int16

func (ApiKey) String

func (k ApiKey) String() string

type Attributes

type Attributes int16

Attributes is a bitset representing special attributes set on records.

const (
	Gzip          Attributes = Attributes(compress.Gzip)   // 1
	Snappy        Attributes = Attributes(compress.Snappy) // 2
	Lz4           Attributes = Attributes(compress.Lz4)    // 3
	Zstd          Attributes = Attributes(compress.Zstd)   // 4
	Transactional Attributes = 1 << 4
	Control       Attributes = 1 << 5
)

func (Attributes) Compression

func (a Attributes) Compression() compress.Compression

func (Attributes) Control

func (a Attributes) Control() bool

func (Attributes) String

func (a Attributes) String() string

func (Attributes) Transactional

func (a Attributes) Transactional() bool

type Broker

type Broker struct {
	Rack string
	Host string
	Port int32
	ID   int32
}

func (Broker) Format

func (b Broker) Format(w fmt.State, v rune)

func (Broker) String

func (b Broker) String() string

type BrokerMessage

type BrokerMessage interface {
	// Given a representation of the kafka cluster state as argument, returns
	// the broker that the message should be routed to.
	Broker(Cluster) (Broker, error)
}

BrokerMessage is an extension of the Message interface implemented by some request types to customize the broker assignment logic.

type Bytes

type Bytes interface {
	io.ReadCloser
	// Returns the number of bytes remaining to be read from the payload.
	Len() int
}

Bytes is an interface implemented by types that represent immutable sequences of bytes.

Bytes values are used to abstract the location where record keys and values are read from (e.g. in-memory buffers, network sockets, files).

The Close method should be called to release resources held by the object when the program is done with it.

Bytes values are generally not safe to use concurrently from multiple goroutines.

func NewBytes

func NewBytes(b []byte) Bytes

NewBytes constructs a Bytes value from b.

The returned value references b, it does not make a copy of the backing array.

If b is nil, nil is returned to represent a null BYTES value in the kafka protocol.

type Cluster

type Cluster struct {
	ClusterID  string
	Controller int32
	Brokers    map[int32]Broker
	Topics     map[string]Topic
}

func (Cluster) BrokerIDs

func (c Cluster) BrokerIDs() []int32

func (Cluster) Format

func (c Cluster) Format(w fmt.State, _ rune)

func (Cluster) IsZero

func (c Cluster) IsZero() bool

func (Cluster) TopicNames

func (c Cluster) TopicNames() []string

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

func NewConn

func NewConn(conn net.Conn, clientID string) *Conn

func (*Conn) Close

func (c *Conn) Close() error

func (*Conn) Discard

func (c *Conn) Discard(n int) (int, error)

func (*Conn) LocalAddr

func (c *Conn) LocalAddr() net.Addr

func (*Conn) Peek

func (c *Conn) Peek(n int) ([]byte, error)

func (*Conn) Read

func (c *Conn) Read(b []byte) (int, error)

func (*Conn) RemoteAddr

func (c *Conn) RemoteAddr() net.Addr

func (*Conn) RoundTrip

func (c *Conn) RoundTrip(msg Message) (Message, error)

func (*Conn) SetDeadline

func (c *Conn) SetDeadline(t time.Time) error

func (*Conn) SetReadDeadline

func (c *Conn) SetReadDeadline(t time.Time) error

func (*Conn) SetVersions

func (c *Conn) SetVersions(versions map[ApiKey]int16)

func (*Conn) SetWriteDeadline

func (c *Conn) SetWriteDeadline(t time.Time) error

func (*Conn) String

func (c *Conn) String() string

func (*Conn) Write

func (c *Conn) Write(b []byte) (int, error)

type ControlBatch

type ControlBatch struct {
	Attributes           Attributes
	PartitionLeaderEpoch int32
	BaseOffset           int64
	ProducerID           int64
	ProducerEpoch        int16
	BaseSequence         int32
	Records              RecordReader
}

ControlBatch is an implementation of the RecordReader interface representing control batches returned by kafka brokers.

func NewControlBatch

func NewControlBatch(records ...ControlRecord) *ControlBatch

NewControlBatch constructs a control batch from the list of records passed as arguments.

func (*ControlBatch) Offset

func (c *ControlBatch) Offset() int64

func (*ControlBatch) ReadControlRecord

func (c *ControlBatch) ReadControlRecord() (*ControlRecord, error)

func (*ControlBatch) ReadRecord

func (c *ControlBatch) ReadRecord() (*Record, error)

func (*ControlBatch) Version

func (c *ControlBatch) Version() int

type ControlRecord

type ControlRecord struct {
	Offset  int64
	Time    time.Time
	Version int16
	Type    int16
	Data    []byte
	Headers []Header
}

ControlRecord represents a record read from a control batch.

func ReadControlRecord

func ReadControlRecord(r *Record) (*ControlRecord, error)

func (*ControlRecord) Key

func (cr *ControlRecord) Key() Bytes

func (*ControlRecord) Record

func (cr *ControlRecord) Record() Record

func (*ControlRecord) Value

func (cr *ControlRecord) Value() Bytes

type Error

type Error string

Error represents client-side protocol errors.

const (
	// ErrNoTopic is returned when a request needs to be sent to a specific.
	ErrNoTopic Error = "topic not found"

	// ErrNoPartition is returned when a request needs to be sent to a specific
	// partition, but the client did not find it in the cluster metadata.
	ErrNoPartition Error = "topic partition not found"

	// ErrNoLeader is returned when a request needs to be sent to a partition
	// leader, but the client could not determine what the leader was at this
	// time.
	ErrNoLeader Error = "topic partition has no leader"

	// ErrNoRecord is returned when attempting to write a message containing an
	// empty record set (which kafka forbids).
	//
	// We handle this case client-side because kafka will close the connection
	// that it received an empty produce request on, causing all concurrent
	// requests to be aborted.
	ErrNoRecord Error = "record set contains no records"

	// ErrNoReset is returned by ResetRecordReader when the record reader does
	// not support being reset.
	ErrNoReset Error = "record sequence does not support reset"
)

func Errorf

func Errorf(msg string, args ...interface{}) Error

func (Error) Error

func (e Error) Error() string

type GroupMessage

type GroupMessage interface {
	// Returns the group configured on the message.
	Group() string
}

GroupMessage is an extension of the Message interface implemented by some request types to inform the program that they should be routed to a group coordinator.

type Header struct {
	Key   string
	Value []byte
}

Header represents a single entry in a list of record headers.

type Merger

type Merger interface {
	// Given a list of message and associated results, merge them back into a
	// response (or an error). The results must be either Message or error
	// values, other types should trigger a panic.
	Merge(messages []Message, results []interface{}) (Message, error)
}

Merger is an interface implemented by messages which can merge multiple results into one response.

type Message

type Message interface {
	ApiKey() ApiKey
}

Message is an interface implemented by all request and response types of the kafka protocol.

This interface is used mostly as a safe-guard to provide a compile-time check for values passed to functions dealing kafka message types.

func ReadRequest

func ReadRequest(r io.Reader) (apiVersion int16, correlationID int32, clientID string, msg Message, err error)

func ReadResponse

func ReadResponse(r io.Reader, apiKey ApiKey, apiVersion int16) (correlationID int32, msg Message, err error)

func Result

func Result(r interface{}) (Message, error)

Result converts r to a Message or an error, or panics if r could not be converted to these types.

func RoundTrip

func RoundTrip(rw io.ReadWriter, apiVersion int16, correlationID int32, clientID string, req Message) (Message, error)

RoundTrip sends a request to a kafka broker and returns the response.

type MessageSet

type MessageSet struct {
	Attributes Attributes
	BaseOffset int64
	Records    RecordReader
}

MessageSet is an implementation of the RecordReader interface representing regular message sets (v1).

func (*MessageSet) Offset

func (m *MessageSet) Offset() int64

func (*MessageSet) ReadRecord

func (m *MessageSet) ReadRecord() (*Record, error)

func (*MessageSet) Version

func (m *MessageSet) Version() int

type OverrideTypeKey

type OverrideTypeKey int16
const (
	RawProduceOverride OverrideTypeKey = 0
)

type OverrideTypeMessage

type OverrideTypeMessage interface {
	TypeKey() OverrideTypeKey
}

OverrideTypeMessage is an interface implemented by messages that want to override the standard request/response types for a given API.

type Partition

type Partition struct {
	ID       int32
	Error    int16
	Leader   int32
	Replicas []int32
	ISR      []int32
	Offline  []int32
}

type PreparedMessage

type PreparedMessage interface {
	// Prepares the message before being sent to a kafka broker using the API
	// version passed as argument.
	Prepare(apiVersion int16)
}

PreparedMessage is an extension of the Message interface implemented by some request types which may need to run some pre-processing on their state before being sent.

type RawExchanger

type RawExchanger interface {
	// Required should return true when a RawExchange is needed.
	// The passed in versions are the negotiated versions for the connection
	// performing the request.
	Required(versions map[ApiKey]int16) bool
	// RawExchange is given the raw connection to the broker and the Message
	// is responsible for writing itself to the connection as well as reading
	// the response.
	RawExchange(rw io.ReadWriter) (Message, error)
}

RawExchanger is an extention to the Message interface to allow messages to control the request response cycle for the message. This is currently only used to facilitate v0 SASL Authenticate requests being written in a non-standard fashion when the SASL Handshake was done at v0 but not when done at v1.

type RawRecordSet

type RawRecordSet struct {
	// Reader exposes the raw sequence of record set bytes.
	Reader io.Reader
}

RawRecordSet represents a record set for a RawProduce request. The record set is represented as a raw sequence of pre-encoded record set bytes.

func (*RawRecordSet) ReadFrom

func (rrs *RawRecordSet) ReadFrom(r io.Reader) (int64, error)

ReadFrom reads the representation of a record set from r into rrs. It re-uses the existing RecordSet.ReadFrom implementation to first read/decode data into a RecordSet, then writes/encodes the RecordSet to a buffer referenced by the RawRecordSet.

Note: re-using the RecordSet.ReadFrom implementation makes this suboptimal from a performance standpoint as it require an extra copy of the record bytes. Holding off on optimizing, as this code path is only invoked in tests.

func (*RawRecordSet) WriteTo

func (rrs *RawRecordSet) WriteTo(w io.Writer) (int64, error)

WriteTo writes the RawRecordSet to an io.Writer. Since this is a raw record set representation, all that is done here is copying bytes from the underlying reader to the specified writer.

type Record

type Record struct {
	// The offset at which the record exists in a topic partition. This value
	// is ignored in produce requests.
	Offset int64

	// Returns the time of the record. This value may be omitted in produce
	// requests to let kafka set the time when it saves the record.
	Time time.Time

	// Returns a byte sequence containing the key of this record. The returned
	// sequence may be nil to indicate that the record has no key. If the record
	// is part of a RecordSet, the content of the key must remain valid at least
	// until the record set is closed (or until the key is closed).
	Key Bytes

	// Returns a byte sequence containing the value of this record. The returned
	// sequence may be nil to indicate that the record has no value. If the
	// record is part of a RecordSet, the content of the value must remain valid
	// at least until the record set is closed (or until the value is closed).
	Value Bytes

	// Returns the list of headers associated with this record. The returned
	// slice may be reused across calls, the program should use it as an
	// immutable value.
	Headers []Header
}

Record is an interface representing a single kafka record.

Record values are not safe to use concurrently from multiple goroutines.

type RecordBatch

type RecordBatch struct {
	Attributes           Attributes
	PartitionLeaderEpoch int32
	BaseOffset           int64
	ProducerID           int64
	ProducerEpoch        int16
	BaseSequence         int32
	Records              RecordReader
}

RecordBatch is an implementation of the RecordReader interface representing regular record batches (v2).

func (*RecordBatch) Offset

func (r *RecordBatch) Offset() int64

func (*RecordBatch) ReadRecord

func (r *RecordBatch) ReadRecord() (*Record, error)

func (*RecordBatch) Version

func (r *RecordBatch) Version() int

type RecordReader

type RecordReader interface {
	// Returns the next record in the set, or io.EOF if the end of the sequence
	// has been reached.
	//
	// The returned Record is guaranteed to be valid until the next call to
	// ReadRecord. If the program needs to retain the Record value it must make
	// a copy.
	ReadRecord() (*Record, error)
}

RecordReader is an interface representing a sequence of records. Record sets are used in both produce and fetch requests to represent the sequence of records that are sent to or receive from kafka brokers.

RecordSet values are not safe to use concurrently from multiple goroutines.

func MultiRecordReader

func MultiRecordReader(batches ...RecordReader) RecordReader

MultiRecordReader merges multiple record batches into one.

func NewRecordReader

func NewRecordReader(records ...Record) RecordReader

NewRecordReader constructs a reader exposing the records passed as arguments.

type RecordSet

type RecordSet struct {
	// The message version that this record set will be represented as, valid
	// values are 1, or 2.
	//
	// When reading, this is the value of the highest version used in the
	// batches that compose the record set.
	//
	// When writing, this value dictates the format that the records will be
	// encoded in.
	Version int8

	// Attributes set on the record set.
	//
	// When reading, the attributes are the combination of all attributes in
	// the batches that compose the record set.
	//
	// When writing, the attributes apply to the whole sequence of records in
	// the set.
	Attributes Attributes

	// A reader exposing the sequence of records.
	//
	// When reading a RecordSet from an io.Reader, the Records field will be a
	// *RecordStream. If the program needs to access the details of each batch
	// that compose the stream, it may use type assertions to access the
	// underlying types of each batch.
	Records RecordReader
}

RecordSet represents a sequence of records in Produce requests and Fetch responses. All v0, v1, and v2 formats are supported.

func (*RecordSet) ReadFrom

func (rs *RecordSet) ReadFrom(r io.Reader) (int64, error)

ReadFrom reads the representation of a record set from r into rs, returning the number of bytes consumed from r, and an non-nil error if the record set could not be read.

func (*RecordSet) WriteTo

func (rs *RecordSet) WriteTo(w io.Writer) (int64, error)

WriteTo writes the representation of rs into w. The value of rs.Version dictates which format that the record set will be represented as.

The error will be ErrNoRecord if rs contained no records.

Note: since this package is only compatible with kafka 0.10 and above, the method never produces messages in version 0. If rs.Version is zero, the method defaults to producing messages in version 1.

type RecordStream

type RecordStream struct {
	Records []RecordReader
	// contains filtered or unexported fields
}

RecordStream is an implementation of the RecordReader interface which combines multiple underlying RecordReader and only expose records that are not from control batches.

func (*RecordStream) ReadRecord

func (s *RecordStream) ReadRecord() (*Record, error)

type Splitter

type Splitter interface {
	// For a given cluster layout, returns the list of messages constructed
	// from the receiver for each requests that should be sent to the cluster.
	// The second return value is a Merger which can be used to merge back the
	// results of each request into a single message (or an error).
	Split(Cluster) ([]Message, Merger, error)
}

Splitter is an interface implemented by messages that can be split into multiple requests and have their results merged back by a Merger.

type Topic

type Topic struct {
	Name       string
	Error      int16
	Partitions map[int32]Partition
}

type TopicError

type TopicError struct {
	Topic string
	Err   error
}

func NewErrNoTopic

func NewErrNoTopic(topic string) *TopicError

func NewTopicError

func NewTopicError(topic string, err error) *TopicError

func (*TopicError) Error

func (e *TopicError) Error() string

func (*TopicError) Unwrap

func (e *TopicError) Unwrap() error

type TopicPartitionError

type TopicPartitionError struct {
	Topic     string
	Partition int32
	Err       error
}

func NewErrNoLeader

func NewErrNoLeader(topic string, partition int32) *TopicPartitionError

func NewErrNoPartition

func NewErrNoPartition(topic string, partition int32) *TopicPartitionError

func NewTopicPartitionError

func NewTopicPartitionError(topic string, partition int32, err error) *TopicPartitionError

func (*TopicPartitionError) Error

func (e *TopicPartitionError) Error() string

func (*TopicPartitionError) Unwrap

func (e *TopicPartitionError) Unwrap() error

type TransactionalMessage

type TransactionalMessage interface {
	// Returns the transactional id configured on the message.
	Transaction() string
}

TransactionalMessage is an extension of the Message interface implemented by some request types to inform the program that they should be routed to a transaction coordinator.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL