proto

package
v0.0.0-...-daa58b7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 21, 2023 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package proto provides kafka binary protocol implementation.

Index

Constants

View Source
const (
	KafkaV0 int16 = iota
	KafkaV1
	KafkaV2
	KafkaV3
	KafkaV4
	KafkaV5
)
View Source
const (
	ProduceReqKind          = 0
	FetchReqKind            = 1
	OffsetReqKind           = 2
	MetadataReqKind         = 3
	OffsetCommitReqKind     = 8
	OffsetFetchReqKind      = 9
	ConsumerMetadataReqKind = 10

	// receive the latest offset (i.e. the offset of the next coming message)
	OffsetReqTimeLatest = -1

	// receive the earliest available offset. Note that because offsets are
	// pulled in descending order, asking for the earliest offset will always
	// return you a single element.
	OffsetReqTimeEarliest = -2

	// Server will not send any response.
	RequiredAcksNone = 0

	// Server will block until the message is committed by all in sync replicas
	// before sending a response.
	RequiredAcksAll = -1

	// Server will wait the data is written to the local log before sending a
	// response.
	RequiredAcksLocal = 1
)
View Source
const (
	CorrelationTypeGroup       int8 = 0
	CorrelationTypeTransaction      = 1
)

Variables

View Source
var (
	ErrUnknown                                 = &KafkaError{-1, "unknown error"}
	ErrOffsetOutOfRange                        = &KafkaError{1, "offset out of range"}
	ErrInvalidMessage                          = &KafkaError{2, "invalid message"}
	ErrUnknownTopicOrPartition                 = &KafkaError{3, "unknown topic or partition"}
	ErrInvalidMessageSize                      = &KafkaError{4, "invalid message size"}
	ErrLeaderNotAvailable                      = &KafkaError{5, "leader not available"}
	ErrNotLeaderForPartition                   = &KafkaError{6, "not leader for partition"}
	ErrRequestTimeout                          = &KafkaError{7, "request timeed out"}
	ErrBrokerNotAvailable                      = &KafkaError{8, "broker not available"}
	ErrReplicaNotAvailable                     = &KafkaError{9, "replica not available"}
	ErrMessageSizeTooLarge                     = &KafkaError{10, "message size too large"}
	ErrScaleControllerEpoch                    = &KafkaError{11, "scale controller epoch"}
	ErrOffsetMetadataTooLarge                  = &KafkaError{12, "offset metadata too large"}
	ErrNetwork                                 = &KafkaError{13, "server disconnected before response was received"}
	ErrOffsetLoadInProgress                    = &KafkaError{14, "offsets load in progress"}
	ErrNoCoordinator                           = &KafkaError{15, "consumer coordinator not available"}
	ErrNotCoordinator                          = &KafkaError{16, "not coordinator for consumer"}
	ErrInvalidTopic                            = &KafkaError{17, "operation on an invalid topic"}
	ErrRecordListTooLarge                      = &KafkaError{18, "message batch larger than the configured segment size"}
	ErrNotEnoughReplicas                       = &KafkaError{19, "not enough in-sync replicas"}
	ErrNotEnoughReplicasAfterAppend            = &KafkaError{20, "messages are written to the log, but to fewer in-sync replicas than required"}
	ErrInvalidRequiredAcks                     = &KafkaError{21, "invalid value for required acks"}
	ErrIllegalGeneration                       = &KafkaError{22, "consumer generation id is not valid"}
	ErrInconsistentPartitionAssignmentStrategy = &KafkaError{23, "partition assignment strategy does not match that of the group"}
	ErrUnknownParititonAssignmentStrategy      = &KafkaError{24, "partition assignment strategy is unknown to the broker"}
	ErrUnknownConsumerID                       = &KafkaError{25, "coordinator is not aware of this consumer"}
	ErrInvalidSessionTimeout                   = &KafkaError{26, "invalid session timeout"}
	ErrRebalanceInProgress                     = &KafkaError{27, "group is rebalancing, so a rejoin is needed"}
	ErrInvalidCommitOffsetSize                 = &KafkaError{28, "offset data size is not valid"}
	ErrTopicAuthorizationFailed                = &KafkaError{29, "topic authorization failed"}
	ErrGroupAuthorizationFailed                = &KafkaError{30, "group authorization failed"}
	ErrClusterAuthorizationFailed              = &KafkaError{31, "cluster authorization failed"}
	ErrInvalidTimeStamp                        = &KafkaError{32, "timestamp of the message is out of acceptable range"}
)
View Source
var ErrInvalidArrayLen = errors.New("invalid array length")
View Source
var ErrNotEnoughData = errors.New("not enough data")

Functions

func ComputeCrc

func ComputeCrc(m *Message, compression Compression) uint32

ComputeCrc returns crc32 hash for given message content.

func ConfigureParser

func ConfigureParser(c ParserConfig) error

ConfigureParser configures the parser. It must be called prior to parsing any messages as the structure is currently not prepared for concurrent access.

func NewDecoder

func NewDecoder(r io.Reader) *decoder

func NewEncoder

func NewEncoder(w io.Writer) *encoder

func ReadReq

func ReadReq(r io.Reader) (requestKind int16, b []byte, err error)

ReadReq returns request kind ID and byte representation of the whole message in wire protocol format.

func ReadResp

func ReadResp(r io.Reader) (correlationID int32, b []byte, err error)

ReadResp returns message correlation ID and byte representation of the whole message in wire protocol that is returned when reading from given stream, including 4 bytes of message size itself. Byte representation returned by ReadResp can be parsed by all response reeaders to transform it into specialized response structure.

Types

type Compression

type Compression int8
const (
	CompressionNone   Compression = 0
	CompressionGzip   Compression = 1
	CompressionSnappy Compression = 2
)

type ConsumerMetadataReq

type ConsumerMetadataReq struct {
	Version         int16
	CorrelationID   int32
	ClientID        string
	ConsumerGroup   string
	CoordinatorType int8 // >= KafkaV1
}

func ReadConsumerMetadataReq

func ReadConsumerMetadataReq(r io.Reader) (*ConsumerMetadataReq, error)

func (*ConsumerMetadataReq) Bytes

func (r *ConsumerMetadataReq) Bytes(version int16) ([]byte, error)

func (*ConsumerMetadataReq) WriteTo

func (r *ConsumerMetadataReq) WriteTo(w io.Writer, version int16) (int64, error)

type ConsumerMetadataResp

type ConsumerMetadataResp struct {
	CorrelationID   int32
	ThrottleTime    time.Duration // >= KafkaV1
	Err             error
	ErrMsg          string // >= KafkaV1
	CoordinatorID   int32
	CoordinatorHost string
	CoordinatorPort int32
}

func ReadConsumerMetadataResp

func ReadConsumerMetadataResp(r io.Reader) (*ConsumerMetadataResp, error)

func (*ConsumerMetadataResp) Bytes

func (r *ConsumerMetadataResp) Bytes(version int16) ([]byte, error)

type FetchReq

type FetchReq struct {
	Version        int16
	CorrelationID  int32
	ClientID       string
	ReplicaID      int32
	MaxWaitTime    time.Duration
	MinBytes       int32
	MaxBytes       int32 // >= KafkaV3
	IsolationLevel int8  // >= KafkaV4

	Topics []FetchReqTopic
}

func ReadFetchReq

func ReadFetchReq(r io.Reader) (*FetchReq, error)

func (*FetchReq) Bytes

func (r *FetchReq) Bytes(version int16) ([]byte, error)

func (*FetchReq) WriteTo

func (r *FetchReq) WriteTo(w io.Writer, version int16) (int64, error)

type FetchReqPartition

type FetchReqPartition struct {
	ID             int32
	FetchOffset    int64
	LogStartOffset int64 // >= KafkaV5
	MaxBytes       int32
}

type FetchReqTopic

type FetchReqTopic struct {
	Name       string
	Partitions []FetchReqPartition
}

type FetchResp

type FetchResp struct {
	CorrelationID int32
	ThrottleTime  time.Duration
	Topics        []FetchRespTopic
}

func ReadFetchResp

func ReadFetchResp(r io.Reader) (*FetchResp, error)

func (*FetchResp) Bytes

func (r *FetchResp) Bytes(version int16) ([]byte, error)

type FetchRespAbortedTransaction

type FetchRespAbortedTransaction struct {
	ProducerID  int64
	FirstOffset int64
}

type FetchRespPartition

type FetchRespPartition struct {
	ID                  int32
	Err                 error
	TipOffset           int64
	LastStableOffset    int64
	LogStartOffset      int64
	AbortedTransactions []FetchRespAbortedTransaction
	Messages            []*Message
}

type FetchRespTopic

type FetchRespTopic struct {
	Name       string
	Partitions []FetchRespPartition
}

type KafkaError

type KafkaError struct {
	// contains filtered or unexported fields
}

func (*KafkaError) Errno

func (err *KafkaError) Errno() int

func (*KafkaError) Error

func (err *KafkaError) Error() string

type Message

type Message struct {
	Key       []byte
	Value     []byte
	Offset    int64  // set when fetching and after successful producing
	Crc       uint32 // set when fetching, ignored when producing
	Topic     string // set when fetching, ignored when producing
	Partition int32  // set when fetching, ignored when producing
	TipOffset int64  // set when fetching, ignored when processing
}

Message represents single entity of message set.

type MetadataReq

type MetadataReq struct {
	Version                int16
	CorrelationID          int32
	ClientID               string
	Topics                 []string
	AllowAutoTopicCreation bool // >= KafkaV4 only
}

func ReadMetadataReq

func ReadMetadataReq(r io.Reader) (*MetadataReq, error)

func (*MetadataReq) Bytes

func (r *MetadataReq) Bytes(version int16) ([]byte, error)

func (*MetadataReq) WriteTo

func (r *MetadataReq) WriteTo(w io.Writer, version int16) (int64, error)

type MetadataResp

type MetadataResp struct {
	CorrelationID int32
	ThrottleTime  time.Duration // >= KafkaV3
	Brokers       []MetadataRespBroker
	ClusterID     string // >= KafkaV2
	ControllerID  int32  // >= KafkaV1
	Topics        []MetadataRespTopic
}

func ReadMetadataResp

func ReadMetadataResp(r io.Reader) (*MetadataResp, error)

func (*MetadataResp) Bytes

func (r *MetadataResp) Bytes(version int16) ([]byte, error)

type MetadataRespBroker

type MetadataRespBroker struct {
	NodeID int32
	Host   string
	Port   int32
	Rack   string // >= KafkaV1
}

type MetadataRespPartition

type MetadataRespPartition struct {
	Err      error
	ID       int32
	Leader   int32
	Replicas []int32
	Isrs     []int32
}

type MetadataRespTopic

type MetadataRespTopic struct {
	Name       string
	Err        error
	IsInternal bool // >= KafkaV1
	Partitions []MetadataRespPartition
}

type OffsetCommitReq

type OffsetCommitReq struct {
	Version           int16
	CorrelationID     int32
	ClientID          string
	ConsumerGroup     string
	GroupGenerationID int32  // >= KafkaV1 only
	MemberID          string // >= KafkaV1 only
	RetentionTime     int64  // >= KafkaV2 only
	Topics            []OffsetCommitReqTopic
}

func ReadOffsetCommitReq

func ReadOffsetCommitReq(r io.Reader) (*OffsetCommitReq, error)

func (*OffsetCommitReq) Bytes

func (r *OffsetCommitReq) Bytes(version int16) ([]byte, error)

func (*OffsetCommitReq) WriteTo

func (r *OffsetCommitReq) WriteTo(w io.Writer, version int16) (int64, error)

type OffsetCommitReqPartition

type OffsetCommitReqPartition struct {
	ID        int32
	Offset    int64
	TimeStamp time.Time // == KafkaV1 only
	Metadata  string
}

type OffsetCommitReqTopic

type OffsetCommitReqTopic struct {
	Name       string
	Partitions []OffsetCommitReqPartition
}

type OffsetCommitResp

type OffsetCommitResp struct {
	CorrelationID int32
	ThrottleTime  time.Duration // >= KafkaV3 only
	Topics        []OffsetCommitRespTopic
}

func ReadOffsetCommitResp

func ReadOffsetCommitResp(r io.Reader) (*OffsetCommitResp, error)

func (*OffsetCommitResp) Bytes

func (r *OffsetCommitResp) Bytes(version int16) ([]byte, error)

type OffsetCommitRespPartition

type OffsetCommitRespPartition struct {
	ID  int32
	Err error
}

type OffsetCommitRespTopic

type OffsetCommitRespTopic struct {
	Name       string
	Partitions []OffsetCommitRespPartition
}

type OffsetFetchReq

type OffsetFetchReq struct {
	Version       int16
	CorrelationID int32
	ClientID      string
	ConsumerGroup string
	Topics        []OffsetFetchReqTopic
}

func ReadOffsetFetchReq

func ReadOffsetFetchReq(r io.Reader) (*OffsetFetchReq, error)

func (*OffsetFetchReq) Bytes

func (r *OffsetFetchReq) Bytes(version int16) ([]byte, error)

func (*OffsetFetchReq) WriteTo

func (r *OffsetFetchReq) WriteTo(w io.Writer, version int16) (int64, error)

type OffsetFetchReqTopic

type OffsetFetchReqTopic struct {
	Name       string
	Partitions []int32
}

type OffsetFetchResp

type OffsetFetchResp struct {
	CorrelationID int32
	ThrottleTime  time.Duration // >= KafkaV3
	Topics        []OffsetFetchRespTopic
	Err           error // >= KafkaV2
}

func ReadOffsetFetchResp

func ReadOffsetFetchResp(r io.Reader) (*OffsetFetchResp, error)

func (*OffsetFetchResp) Bytes

func (r *OffsetFetchResp) Bytes(version int16) ([]byte, error)

type OffsetFetchRespPartition

type OffsetFetchRespPartition struct {
	ID       int32
	Offset   int64
	Metadata string
	Err      error
}

type OffsetFetchRespTopic

type OffsetFetchRespTopic struct {
	Name       string
	Partitions []OffsetFetchRespPartition
}

type OffsetReq

type OffsetReq struct {
	Version        int16
	CorrelationID  int32
	ClientID       string
	ReplicaID      int32
	IsolationLevel int8
	Topics         []OffsetReqTopic
}

func ReadOffsetReq

func ReadOffsetReq(r io.Reader) (*OffsetReq, error)

func (*OffsetReq) Bytes

func (r *OffsetReq) Bytes(version int16) ([]byte, error)

func (*OffsetReq) WriteTo

func (r *OffsetReq) WriteTo(w io.Writer, version int16) (int64, error)

type OffsetReqPartition

type OffsetReqPartition struct {
	ID         int32
	TimeMs     int64 // cannot be time.Time because of negative values
	MaxOffsets int32 // == KafkaV0 only
}

type OffsetReqTopic

type OffsetReqTopic struct {
	Name       string
	Partitions []OffsetReqPartition
}

type OffsetResp

type OffsetResp struct {
	CorrelationID int32
	ThrottleTime  time.Duration
	Topics        []OffsetRespTopic
}

func ReadOffsetResp

func ReadOffsetResp(r io.Reader) (*OffsetResp, error)

func (*OffsetResp) Bytes

func (r *OffsetResp) Bytes(version int16) ([]byte, error)

type OffsetRespPartition

type OffsetRespPartition struct {
	ID        int32
	Err       error
	TimeStamp time.Time // >= KafkaV1 only
	Offsets   []int64
}

type OffsetRespTopic

type OffsetRespTopic struct {
	Name       string
	Partitions []OffsetRespPartition
}

type ParserConfig

type ParserConfig struct {
	// SimplifiedMessageSetParsing enables a simplified version of the
	// MessageSet parser which will not split MessageSet into slices of
	// Message structures. Instead, the entire MessageSet will be read
	// over. This mode improves parsing speed due to reduce memory read at
	// the cost of not providing access to the message payload after
	// parsing.
	SimplifiedMessageSetParsing bool
}

ParserConfig is optional configuration for the parser. It can be configured via SetParserConfig

type ProduceReq

type ProduceReq struct {
	Version         int16
	CorrelationID   int32
	ClientID        string
	Compression     Compression // only used when sending ProduceReqs
	TransactionalID string
	RequiredAcks    int16
	Timeout         time.Duration
	Topics          []ProduceReqTopic
}

func ReadProduceReq

func ReadProduceReq(r io.Reader) (*ProduceReq, error)

func (*ProduceReq) Bytes

func (r *ProduceReq) Bytes(version int16) ([]byte, error)

func (*ProduceReq) WriteTo

func (r *ProduceReq) WriteTo(w io.Writer, version int16) (int64, error)

type ProduceReqPartition

type ProduceReqPartition struct {
	ID       int32
	Messages []*Message
}

type ProduceReqTopic

type ProduceReqTopic struct {
	Name       string
	Partitions []ProduceReqPartition
}

type ProduceResp

type ProduceResp struct {
	CorrelationID int32
	Topics        []ProduceRespTopic
	ThrottleTime  time.Duration
}

func ReadProduceResp

func ReadProduceResp(r io.Reader) (*ProduceResp, error)

func (*ProduceResp) Bytes

func (r *ProduceResp) Bytes(version int16) ([]byte, error)

type ProduceRespPartition

type ProduceRespPartition struct {
	ID            int32
	Err           error
	Offset        int64
	LogAppendTime int64
}

type ProduceRespTopic

type ProduceRespTopic struct {
	Name       string
	Partitions []ProduceRespPartition
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL