Documentation ¶
Overview ¶
Package proto provides kafka binary protocol implementation.
Index ¶
- Constants
- Variables
- func ComputeCrc(m *Message, compression Compression) uint32
- func NewDecoder(r io.Reader) *decoder
- func NewEncoder(w io.Writer) *encoder
- func ReadReq(r io.Reader) (requestKind int16, b []byte, err error)
- func ReadResp(r io.Reader) (correlationID int32, b []byte, err error)
- type Compression
- type FetchReq
- type FetchReqPartition
- type FetchReqTopic
- type FetchResp
- type FetchRespPartition
- type FetchRespTopic
- type GroupCoordinatorReq
- type GroupCoordinatorResp
- type KafkaError
- type Message
- type MetadataReq
- type MetadataResp
- type MetadataRespBroker
- type MetadataRespPartition
- type MetadataRespTopic
- type OffsetCommitReq
- type OffsetCommitReqPartition
- type OffsetCommitReqTopic
- type OffsetCommitResp
- type OffsetCommitRespPartition
- type OffsetCommitRespTopic
- type OffsetFetchReq
- type OffsetFetchReqTopic
- type OffsetFetchResp
- type OffsetFetchRespPartition
- type OffsetFetchRespTopic
- type OffsetReq
- type OffsetReqPartition
- type OffsetReqTopic
- type OffsetResp
- type OffsetRespPartition
- type OffsetRespTopic
- type ProduceReq
- type ProduceReqPartition
- type ProduceReqTopic
- type ProduceResp
- type ProduceRespPartition
- type ProduceRespTopic
- type Request
Constants ¶
View Source
const ( ProduceReqKind = 0 FetchReqKind = 1 OffsetReqKind = 2 MetadataReqKind = 3 OffsetCommitReqKind = 8 OffsetFetchReqKind = 9 GroupCoordinatorReqKind = 10 // receive the latest offset (i.e. the offset of the next coming message) OffsetReqTimeLatest = -1 // receive the earliest available offset. Note that because offsets are // pulled in descending order, asking for the earliest offset will always // return you a single element. OffsetReqTimeEarliest = -2 // Server will not send any response. RequiredAcksNone = 0 // Server will block until the message is committed by all in sync replicas // before sending a response. RequiredAcksAll = -1 // Server will wait the data is written to the local log before sending a // response. RequiredAcksLocal = 1 )
Variables ¶
View Source
var ( ErrUnknown = &KafkaError{-1, "unknown error"} ErrOffsetOutOfRange = &KafkaError{1, "offset out of range"} ErrInvalidMessage = &KafkaError{2, "invalid message"} ErrUnknownTopicOrPartition = &KafkaError{3, "[transient] unknown topic or partition"} ErrInvalidMessageSize = &KafkaError{4, "invalid message size"} ErrLeaderNotAvailable = &KafkaError{5, "[transient] leader not available"} ErrNotLeaderForPartition = &KafkaError{6, "[transient] not leader for partition"} ErrRequestTimeout = &KafkaError{7, "[transient] request timed out"} ErrBrokerNotAvailable = &KafkaError{8, "broker not available"} ErrReplicaNotAvailable = &KafkaError{9, "replica not available"} ErrMessageSizeTooLarge = &KafkaError{10, "message size too large"} ErrScaleControllerEpoch = &KafkaError{11, "scale controller epoch"} ErrOffsetMetadataTooLarge = &KafkaError{12, "offset metadata too large"} ErrOffsetLoadInProgress = &KafkaError{14, "[transient] offsets load in progress"} ErrNoCoordinator = &KafkaError{15, "[transient] consumer coordinator not available"} ErrNotCoordinator = &KafkaError{16, "[transient] not coordinator for consumer"} ErrInvalidTopic = &KafkaError{17, "operation on an invalid topic"} ErrRecordListTooLarge = &KafkaError{18, "message batch larger than the configured segment size"} ErrNotEnoughReplicas = &KafkaError{19, "[transient] not enough in-sync replicas"} ErrNotEnoughReplicasAfterAppend = &KafkaError{20, "[transient] messages are written to the log, but to fewer in-sync replicas than required"} ErrInvalidRequiredAcks = &KafkaError{21, "invalid value for required acks"} ErrIllegalGeneration = &KafkaError{22, "consumer generation id is not valid"} ErrInconsistentPartitionAssignmentStrategy = &KafkaError{23, "partition assignment strategy does not match that of the group"} ErrUnknownParititonAssignmentStrategy = &KafkaError{24, "partition assignment strategy is unknown to the broker"} ErrUnknownConsumerID = &KafkaError{25, "coordinator is not aware of this consumer"} ErrInvalidSessionTimeout = &KafkaError{26, "invalid session timeout"} ErrCommitingParitionsNotAssigned = &KafkaError{27, "committing partitions are not assigned the committer"} ErrInvalidCommitOffsetSize = &KafkaError{28, "offset data size is not valid"} ErrAuthorizationFailed = &KafkaError{29, "not authorized"} ErrRebalanceInProgress = &KafkaError{30, "group is rebalancing, rejoin is needed"} )
View Source
var ErrNotEnoughData = errors.New("not enough data")
Functions ¶
func ComputeCrc ¶
func ComputeCrc(m *Message, compression Compression) uint32
ComputeCrc returns crc32 hash for given message content.
func NewDecoder ¶
func NewEncoder ¶
func ReadReq ¶
ReadReq returns request kind ID and byte representation of the whole message in wire protocol format.
func ReadResp ¶
ReadResp returns message correlation ID and byte representation of the whole message in wire protocol that is returned when reading from given stream, including 4 bytes of message size itself. Byte representation returned by ReadResp can be parsed by all response reeaders to transform it into specialized response structure.
Types ¶
type Compression ¶
type Compression int8
const ( CompressionNone Compression = 0 CompressionGzip Compression = 1 CompressionSnappy Compression = 2 )
type FetchReq ¶
type FetchReqPartition ¶
type FetchReqTopic ¶
type FetchReqTopic struct { Name string Partitions []FetchReqPartition }
type FetchResp ¶
type FetchResp struct { CorrelationID int32 Topics []FetchRespTopic }
type FetchRespPartition ¶
type FetchRespTopic ¶
type FetchRespTopic struct { Name string Partitions []FetchRespPartition }
type GroupCoordinatorReq ¶
func ReadGroupCoordinatorReq ¶
func ReadGroupCoordinatorReq(r io.Reader) (*GroupCoordinatorReq, error)
func (*GroupCoordinatorReq) Bytes ¶
func (r *GroupCoordinatorReq) Bytes() ([]byte, error)
type GroupCoordinatorResp ¶
type GroupCoordinatorResp struct { CorrelationID int32 Err error CoordinatorID int32 CoordinatorHost string CoordinatorPort int32 }
func ReadGroupCoordinatorResp ¶
func ReadGroupCoordinatorResp(r io.Reader) (*GroupCoordinatorResp, error)
func (*GroupCoordinatorResp) Bytes ¶
func (r *GroupCoordinatorResp) Bytes() ([]byte, error)
type KafkaError ¶
type KafkaError struct {
// contains filtered or unexported fields
}
func (*KafkaError) Errno ¶
func (err *KafkaError) Errno() int
func (*KafkaError) Error ¶
func (err *KafkaError) Error() string
type Message ¶
type Message struct { Key []byte Value []byte Offset int64 // set when fetching and after successful producing Crc uint32 // set when fetching, ignored when producing Topic string // set when fetching, ignored when producing Partition int32 // set when fetching, ignored when producing TipOffset int64 // set when fetching, ignored when processing }
Message represents single entity of message set.
type MetadataReq ¶
func ReadMetadataReq ¶
func ReadMetadataReq(r io.Reader) (*MetadataReq, error)
func (*MetadataReq) Bytes ¶
func (r *MetadataReq) Bytes() ([]byte, error)
type MetadataResp ¶
type MetadataResp struct { CorrelationID int32 Brokers []MetadataRespBroker Topics []MetadataRespTopic }
func ReadMetadataResp ¶
func ReadMetadataResp(r io.Reader) (*MetadataResp, error)
func (*MetadataResp) Bytes ¶
func (r *MetadataResp) Bytes() ([]byte, error)
type MetadataRespBroker ¶
type MetadataRespPartition ¶
type MetadataRespTopic ¶
type MetadataRespTopic struct { Name string Err error Partitions []MetadataRespPartition }
type OffsetCommitReq ¶
type OffsetCommitReq struct { CorrelationID int32 ClientID string ConsumerGroup string Topics []OffsetCommitReqTopic }
func ReadOffsetCommitReq ¶
func ReadOffsetCommitReq(r io.Reader) (*OffsetCommitReq, error)
func (*OffsetCommitReq) Bytes ¶
func (r *OffsetCommitReq) Bytes() ([]byte, error)
type OffsetCommitReqTopic ¶
type OffsetCommitReqTopic struct { Name string Partitions []OffsetCommitReqPartition }
type OffsetCommitResp ¶
type OffsetCommitResp struct { CorrelationID int32 Topics []OffsetCommitRespTopic }
func ReadOffsetCommitResp ¶
func ReadOffsetCommitResp(r io.Reader) (*OffsetCommitResp, error)
func (*OffsetCommitResp) Bytes ¶
func (r *OffsetCommitResp) Bytes() ([]byte, error)
type OffsetCommitRespTopic ¶
type OffsetCommitRespTopic struct { Name string Partitions []OffsetCommitRespPartition }
type OffsetFetchReq ¶
type OffsetFetchReq struct { CorrelationID int32 ClientID string ConsumerGroup string Topics []OffsetFetchReqTopic }
func ReadOffsetFetchReq ¶
func ReadOffsetFetchReq(r io.Reader) (*OffsetFetchReq, error)
func (*OffsetFetchReq) Bytes ¶
func (r *OffsetFetchReq) Bytes() ([]byte, error)
type OffsetFetchReqTopic ¶
type OffsetFetchResp ¶
type OffsetFetchResp struct { CorrelationID int32 Topics []OffsetFetchRespTopic }
func ReadOffsetFetchResp ¶
func ReadOffsetFetchResp(r io.Reader) (*OffsetFetchResp, error)
func (*OffsetFetchResp) Bytes ¶
func (r *OffsetFetchResp) Bytes() ([]byte, error)
type OffsetFetchRespTopic ¶
type OffsetFetchRespTopic struct { Name string Partitions []OffsetFetchRespPartition }
type OffsetReq ¶
type OffsetReq struct { CorrelationID int32 ClientID string ReplicaID int32 Topics []OffsetReqTopic }
type OffsetReqPartition ¶
type OffsetReqTopic ¶
type OffsetReqTopic struct { Name string Partitions []OffsetReqPartition }
type OffsetResp ¶
type OffsetResp struct { CorrelationID int32 Topics []OffsetRespTopic }
func ReadOffsetResp ¶
func ReadOffsetResp(r io.Reader) (*OffsetResp, error)
func (*OffsetResp) Bytes ¶
func (r *OffsetResp) Bytes() ([]byte, error)
type OffsetRespPartition ¶
type OffsetRespTopic ¶
type OffsetRespTopic struct { Name string Partitions []OffsetRespPartition }
type ProduceReq ¶
type ProduceReq struct { CorrelationID int32 ClientID string Compression Compression // only used when sending ProduceReqs RequiredAcks int16 Timeout time.Duration Topics []ProduceReqTopic }
func ReadProduceReq ¶
func ReadProduceReq(r io.Reader) (*ProduceReq, error)
func (*ProduceReq) Bytes ¶
func (r *ProduceReq) Bytes() ([]byte, error)
type ProduceReqPartition ¶
type ProduceReqTopic ¶
type ProduceReqTopic struct { Name string Partitions []ProduceReqPartition }
type ProduceResp ¶
type ProduceResp struct { CorrelationID int32 Topics []ProduceRespTopic }
func ReadProduceResp ¶
func ReadProduceResp(r io.Reader) (*ProduceResp, error)
func (*ProduceResp) Bytes ¶
func (r *ProduceResp) Bytes() ([]byte, error)
type ProduceRespPartition ¶
type ProduceRespTopic ¶
type ProduceRespTopic struct { Name string Partitions []ProduceRespPartition }
Click to show internal directories.
Click to hide internal directories.