Documentation ¶
Overview ¶
Package proto provides kafka binary protocol implementation.
Index ¶
- Constants
- Variables
- func ComputeCrc(m *Message, compression Compression) uint32
- func ConfigureParser(c ParserConfig) error
- func NewDecoder(r io.Reader) *decoder
- func NewEncoder(w io.Writer) *encoder
- func ReadReq(r io.Reader) (requestKind int16, b []byte, err error)
- func ReadResp(r io.Reader) (correlationID int32, b []byte, err error)
- type Compression
- type ConsumerMetadataReq
- type ConsumerMetadataResp
- type FetchReq
- type FetchReqPartition
- type FetchReqTopic
- type FetchResp
- type FetchRespAbortedTransaction
- type FetchRespPartition
- type FetchRespTopic
- type KafkaError
- type Message
- type MetadataReq
- type MetadataResp
- type MetadataRespBroker
- type MetadataRespPartition
- type MetadataRespTopic
- type OffsetCommitReq
- type OffsetCommitReqPartition
- type OffsetCommitReqTopic
- type OffsetCommitResp
- type OffsetCommitRespPartition
- type OffsetCommitRespTopic
- type OffsetFetchReq
- type OffsetFetchReqTopic
- type OffsetFetchResp
- type OffsetFetchRespPartition
- type OffsetFetchRespTopic
- type OffsetReq
- type OffsetReqPartition
- type OffsetReqTopic
- type OffsetResp
- type OffsetRespPartition
- type OffsetRespTopic
- type ParserConfig
- type ProduceReq
- type ProduceReqPartition
- type ProduceReqTopic
- type ProduceResp
- type ProduceRespPartition
- type ProduceRespTopic
Constants ¶
const ( KafkaV0 int16 = iota KafkaV1 KafkaV2 KafkaV3 KafkaV4 KafkaV5 )
const ( ProduceReqKind = 0 FetchReqKind = 1 OffsetReqKind = 2 MetadataReqKind = 3 OffsetCommitReqKind = 8 OffsetFetchReqKind = 9 ConsumerMetadataReqKind = 10 // receive the latest offset (i.e. the offset of the next coming message) OffsetReqTimeLatest = -1 // receive the earliest available offset. Note that because offsets are // pulled in descending order, asking for the earliest offset will always // return you a single element. OffsetReqTimeEarliest = -2 // Server will not send any response. RequiredAcksNone = 0 // Server will block until the message is committed by all in sync replicas // before sending a response. RequiredAcksAll = -1 // Server will wait the data is written to the local log before sending a // response. RequiredAcksLocal = 1 )
const ( CorrelationTypeGroup int8 = 0 CorrelationTypeTransaction = 1 )
Variables ¶
var ( ErrUnknown = &KafkaError{-1, "unknown error"} ErrOffsetOutOfRange = &KafkaError{1, "offset out of range"} ErrInvalidMessage = &KafkaError{2, "invalid message"} ErrUnknownTopicOrPartition = &KafkaError{3, "unknown topic or partition"} ErrInvalidMessageSize = &KafkaError{4, "invalid message size"} ErrLeaderNotAvailable = &KafkaError{5, "leader not available"} ErrNotLeaderForPartition = &KafkaError{6, "not leader for partition"} ErrRequestTimeout = &KafkaError{7, "request timeed out"} ErrBrokerNotAvailable = &KafkaError{8, "broker not available"} ErrReplicaNotAvailable = &KafkaError{9, "replica not available"} ErrMessageSizeTooLarge = &KafkaError{10, "message size too large"} ErrScaleControllerEpoch = &KafkaError{11, "scale controller epoch"} ErrOffsetMetadataTooLarge = &KafkaError{12, "offset metadata too large"} ErrNetwork = &KafkaError{13, "server disconnected before response was received"} ErrOffsetLoadInProgress = &KafkaError{14, "offsets load in progress"} ErrNoCoordinator = &KafkaError{15, "consumer coordinator not available"} ErrNotCoordinator = &KafkaError{16, "not coordinator for consumer"} ErrInvalidTopic = &KafkaError{17, "operation on an invalid topic"} ErrRecordListTooLarge = &KafkaError{18, "message batch larger than the configured segment size"} ErrNotEnoughReplicas = &KafkaError{19, "not enough in-sync replicas"} ErrNotEnoughReplicasAfterAppend = &KafkaError{20, "messages are written to the log, but to fewer in-sync replicas than required"} ErrInvalidRequiredAcks = &KafkaError{21, "invalid value for required acks"} ErrIllegalGeneration = &KafkaError{22, "consumer generation id is not valid"} ErrInconsistentPartitionAssignmentStrategy = &KafkaError{23, "partition assignment strategy does not match that of the group"} ErrUnknownParititonAssignmentStrategy = &KafkaError{24, "partition assignment strategy is unknown to the broker"} ErrUnknownConsumerID = &KafkaError{25, "coordinator is not aware of this consumer"} ErrInvalidSessionTimeout = &KafkaError{26, "invalid session timeout"} ErrRebalanceInProgress = &KafkaError{27, "group is rebalancing, so a rejoin is needed"} ErrInvalidCommitOffsetSize = &KafkaError{28, "offset data size is not valid"} ErrTopicAuthorizationFailed = &KafkaError{29, "topic authorization failed"} ErrGroupAuthorizationFailed = &KafkaError{30, "group authorization failed"} ErrClusterAuthorizationFailed = &KafkaError{31, "cluster authorization failed"} ErrInvalidTimeStamp = &KafkaError{32, "timestamp of the message is out of acceptable range"} )
var ErrInvalidArrayLen = errors.New("invalid array length")
var ErrNotEnoughData = errors.New("not enough data")
Functions ¶
func ComputeCrc ¶
func ComputeCrc(m *Message, compression Compression) uint32
ComputeCrc returns crc32 hash for given message content.
func ConfigureParser ¶
func ConfigureParser(c ParserConfig) error
ConfigureParser configures the parser. It must be called prior to parsing any messages as the structure is currently not prepared for concurrent access.
func NewDecoder ¶
func NewEncoder ¶
func ReadReq ¶
ReadReq returns request kind ID and byte representation of the whole message in wire protocol format.
func ReadResp ¶
ReadResp returns message correlation ID and byte representation of the whole message in wire protocol that is returned when reading from given stream, including 4 bytes of message size itself. Byte representation returned by ReadResp can be parsed by all response reeaders to transform it into specialized response structure.
Types ¶
type Compression ¶
type Compression int8
const ( CompressionNone Compression = 0 CompressionGzip Compression = 1 CompressionSnappy Compression = 2 )
type ConsumerMetadataReq ¶
type ConsumerMetadataReq struct { Version int16 CorrelationID int32 ClientID string ConsumerGroup string CoordinatorType int8 // >= KafkaV1 }
func ReadConsumerMetadataReq ¶
func ReadConsumerMetadataReq(r io.Reader) (*ConsumerMetadataReq, error)
func (*ConsumerMetadataReq) Bytes ¶
func (r *ConsumerMetadataReq) Bytes() ([]byte, error)
type ConsumerMetadataResp ¶
type ConsumerMetadataResp struct { Version int16 CorrelationID int32 ThrottleTime time.Duration // >= KafkaV1 Err error ErrMsg string // >= KafkaV1 CoordinatorID int32 CoordinatorHost string CoordinatorPort int32 }
func ReadConsumerMetadataResp ¶
func ReadConsumerMetadataResp(r io.Reader) (*ConsumerMetadataResp, error)
func (*ConsumerMetadataResp) Bytes ¶
func (r *ConsumerMetadataResp) Bytes() ([]byte, error)
type FetchReq ¶
type FetchReqPartition ¶
type FetchReqTopic ¶
type FetchReqTopic struct { Name string Partitions []FetchReqPartition }
type FetchResp ¶
type FetchResp struct { Version int16 CorrelationID int32 ThrottleTime time.Duration Topics []FetchRespTopic }
type FetchRespPartition ¶
type FetchRespTopic ¶
type FetchRespTopic struct { Name string Partitions []FetchRespPartition }
type KafkaError ¶
type KafkaError struct {
// contains filtered or unexported fields
}
func (*KafkaError) Errno ¶
func (err *KafkaError) Errno() int
func (*KafkaError) Error ¶
func (err *KafkaError) Error() string
type Message ¶
type Message struct { Key []byte Value []byte Offset int64 // set when fetching and after successful producing Crc uint32 // set when fetching, ignored when producing Topic string // set when fetching, ignored when producing Partition int32 // set when fetching, ignored when producing TipOffset int64 // set when fetching, ignored when processing }
Message represents single entity of message set.
type MetadataReq ¶
type MetadataReq struct { Version int16 CorrelationID int32 ClientID string Topics []string AllowAutoTopicCreation bool // >= KafkaV4 only }
func ReadMetadataReq ¶
func ReadMetadataReq(r io.Reader) (*MetadataReq, error)
func (*MetadataReq) Bytes ¶
func (r *MetadataReq) Bytes() ([]byte, error)
type MetadataResp ¶
type MetadataResp struct { Version int16 CorrelationID int32 ThrottleTime time.Duration // >= KafkaV3 Brokers []MetadataRespBroker ClusterID string // >= KafkaV2 ControllerID int32 // >= KafkaV1 Topics []MetadataRespTopic }
func ReadMetadataResp ¶
func ReadMetadataResp(r io.Reader) (*MetadataResp, error)
func (*MetadataResp) Bytes ¶
func (r *MetadataResp) Bytes() ([]byte, error)
type MetadataRespBroker ¶
type MetadataRespPartition ¶
type MetadataRespTopic ¶
type MetadataRespTopic struct { Name string Err error IsInternal bool // >= KafkaV1 Partitions []MetadataRespPartition }
type OffsetCommitReq ¶
type OffsetCommitReq struct { Version int16 CorrelationID int32 ClientID string ConsumerGroup string GroupGenerationID int32 // >= KafkaV1 only MemberID string // >= KafkaV1 only RetentionTime int64 // >= KafkaV2 only Topics []OffsetCommitReqTopic }
func ReadOffsetCommitReq ¶
func ReadOffsetCommitReq(r io.Reader) (*OffsetCommitReq, error)
func (*OffsetCommitReq) Bytes ¶
func (r *OffsetCommitReq) Bytes() ([]byte, error)
type OffsetCommitReqTopic ¶
type OffsetCommitReqTopic struct { Name string Partitions []OffsetCommitReqPartition }
type OffsetCommitResp ¶
type OffsetCommitResp struct { Version int16 CorrelationID int32 ThrottleTime time.Duration // >= KafkaV3 only Topics []OffsetCommitRespTopic }
func ReadOffsetCommitResp ¶
func ReadOffsetCommitResp(r io.Reader) (*OffsetCommitResp, error)
func (*OffsetCommitResp) Bytes ¶
func (r *OffsetCommitResp) Bytes() ([]byte, error)
type OffsetCommitRespTopic ¶
type OffsetCommitRespTopic struct { Name string Partitions []OffsetCommitRespPartition }
type OffsetFetchReq ¶
type OffsetFetchReq struct { Version int16 CorrelationID int32 ClientID string ConsumerGroup string Topics []OffsetFetchReqTopic }
func ReadOffsetFetchReq ¶
func ReadOffsetFetchReq(r io.Reader) (*OffsetFetchReq, error)
func (*OffsetFetchReq) Bytes ¶
func (r *OffsetFetchReq) Bytes() ([]byte, error)
type OffsetFetchReqTopic ¶
type OffsetFetchResp ¶
type OffsetFetchResp struct { Version int16 CorrelationID int32 ThrottleTime time.Duration // >= KafkaV3 Topics []OffsetFetchRespTopic Err error // >= KafkaV2 }
func ReadOffsetFetchResp ¶
func ReadOffsetFetchResp(r io.Reader) (*OffsetFetchResp, error)
func (*OffsetFetchResp) Bytes ¶
func (r *OffsetFetchResp) Bytes() ([]byte, error)
type OffsetFetchRespTopic ¶
type OffsetFetchRespTopic struct { Name string Partitions []OffsetFetchRespPartition }
type OffsetReq ¶
type OffsetReqPartition ¶
type OffsetReqTopic ¶
type OffsetReqTopic struct { Name string Partitions []OffsetReqPartition }
type OffsetResp ¶
type OffsetResp struct { Version int16 CorrelationID int32 ThrottleTime time.Duration Topics []OffsetRespTopic }
func ReadOffsetResp ¶
func ReadOffsetResp(r io.Reader) (*OffsetResp, error)
func (*OffsetResp) Bytes ¶
func (r *OffsetResp) Bytes() ([]byte, error)
type OffsetRespPartition ¶
type OffsetRespTopic ¶
type OffsetRespTopic struct { Name string Partitions []OffsetRespPartition }
type ParserConfig ¶
type ParserConfig struct { // SimplifiedMessageSetParsing enables a simplified version of the // MessageSet parser which will not split MessageSet into slices of // Message structures. Instead, the entire MessageSet will be read // over. This mode improves parsing speed due to reduce memory read at // the cost of not providing access to the message payload after // parsing. SimplifiedMessageSetParsing bool }
ParserConfig is optional configuration for the parser. It can be configured via SetParserConfig
type ProduceReq ¶
type ProduceReq struct { Version int16 CorrelationID int32 ClientID string Compression Compression // only used when sending ProduceReqs TransactionalID string RequiredAcks int16 Timeout time.Duration Topics []ProduceReqTopic }
func ReadProduceReq ¶
func ReadProduceReq(r io.Reader) (*ProduceReq, error)
func (*ProduceReq) Bytes ¶
func (r *ProduceReq) Bytes() ([]byte, error)
type ProduceReqPartition ¶
type ProduceReqTopic ¶
type ProduceReqTopic struct { Name string Partitions []ProduceReqPartition }
type ProduceResp ¶
type ProduceResp struct { Version int16 CorrelationID int32 Topics []ProduceRespTopic ThrottleTime time.Duration }
func ReadProduceResp ¶
func ReadProduceResp(r io.Reader) (*ProduceResp, error)
func (*ProduceResp) Bytes ¶
func (r *ProduceResp) Bytes() ([]byte, error)
type ProduceRespPartition ¶
type ProduceRespTopic ¶
type ProduceRespTopic struct { Name string Partitions []ProduceRespPartition }