Versions in this module Expand all Collapse all v0 v0.1.0 Apr 15, 2021 Changes in this version + const FetchReqKind + const GroupCoordinatorReqKind + const MetadataReqKind + const OffsetCommitReqKind + const OffsetFetchReqKind + const OffsetReqKind + const OffsetReqTimeEarliest + const OffsetReqTimeLatest + const ProduceReqKind + const RequiredAcksAll + const RequiredAcksLocal + const RequiredAcksNone + var ErrAuthorizationFailed = &KafkaError + var ErrBrokerNotAvailable = &KafkaError + var ErrCommitingParitionsNotAssigned = &KafkaError + var ErrIllegalGeneration = &KafkaError + var ErrInconsistentPartitionAssignmentStrategy = &KafkaError + var ErrInvalidCommitOffsetSize = &KafkaError + var ErrInvalidMessage = &KafkaError + var ErrInvalidMessageSize = &KafkaError + var ErrInvalidRequiredAcks = &KafkaError + var ErrInvalidSessionTimeout = &KafkaError + var ErrInvalidTopic = &KafkaError + var ErrLeaderNotAvailable = &KafkaError + var ErrMessageSizeTooLarge = &KafkaError + var ErrNoCoordinator = &KafkaError + var ErrNotCoordinator = &KafkaError + var ErrNotEnoughData = errors.New("not enough data") + var ErrNotEnoughReplicas = &KafkaError + var ErrNotEnoughReplicasAfterAppend = &KafkaError + var ErrNotLeaderForPartition = &KafkaError + var ErrOffsetLoadInProgress = &KafkaError + var ErrOffsetMetadataTooLarge = &KafkaError + var ErrOffsetOutOfRange = &KafkaError + var ErrRebalanceInProgress = &KafkaError + var ErrRecordListTooLarge = &KafkaError + var ErrReplicaNotAvailable = &KafkaError + var ErrRequestTimeout = &KafkaError + var ErrScaleControllerEpoch = &KafkaError + var ErrUnknown = &KafkaError + var ErrUnknownConsumerID = &KafkaError + var ErrUnknownParititonAssignmentStrategy = &KafkaError + var ErrUnknownTopicOrPartition = &KafkaError + func ComputeCrc(m *Message, compression Compression) uint32 + func NewDecoder(r io.Reader) *decoder + func NewEncoder(w io.Writer) *encoder + func ReadReq(r io.Reader) (requestKind int16, b []byte, err error) + func ReadResp(r io.Reader) (correlationID int32, b []byte, err error) + type Compression int8 + const CompressionGzip + const CompressionNone + const CompressionSnappy + type FetchReq struct + ClientID string + CorrelationID int32 + MaxWaitTime time.Duration + MinBytes int32 + Topics []FetchReqTopic + func ReadFetchReq(r io.Reader) (*FetchReq, error) + func (r *FetchReq) Bytes() ([]byte, error) + func (r *FetchReq) WriteTo(w io.Writer) (int64, error) + type FetchReqPartition struct + FetchOffset int64 + ID int32 + MaxBytes int32 + type FetchReqTopic struct + Name string + Partitions []FetchReqPartition + type FetchResp struct + CorrelationID int32 + Topics []FetchRespTopic + func ReadFetchResp(r io.Reader) (*FetchResp, error) + func (r *FetchResp) Bytes() ([]byte, error) + type FetchRespPartition struct + Err error + ID int32 + Messages []*Message + TipOffset int64 + type FetchRespTopic struct + Name string + Partitions []FetchRespPartition + type GroupCoordinatorReq struct + ClientID string + ConsumerGroup string + CorrelationID int32 + func ReadGroupCoordinatorReq(r io.Reader) (*GroupCoordinatorReq, error) + func (r *GroupCoordinatorReq) Bytes() ([]byte, error) + func (r *GroupCoordinatorReq) WriteTo(w io.Writer) (int64, error) + type GroupCoordinatorResp struct + CoordinatorHost string + CoordinatorID int32 + CoordinatorPort int32 + CorrelationID int32 + Err error + func ReadGroupCoordinatorResp(r io.Reader) (*GroupCoordinatorResp, error) + func (r *GroupCoordinatorResp) Bytes() ([]byte, error) + type KafkaError struct + func (err *KafkaError) Errno() int + func (err *KafkaError) Error() string + type Message struct + Crc uint32 + Key []byte + Offset int64 + Partition int32 + TipOffset int64 + Topic string + Value []byte + type MetadataReq struct + ClientID string + CorrelationID int32 + Topics []string + func ReadMetadataReq(r io.Reader) (*MetadataReq, error) + func (r *MetadataReq) Bytes() ([]byte, error) + func (r *MetadataReq) WriteTo(w io.Writer) (int64, error) + type MetadataResp struct + Brokers []MetadataRespBroker + CorrelationID int32 + Topics []MetadataRespTopic + func ReadMetadataResp(r io.Reader) (*MetadataResp, error) + func (r *MetadataResp) Bytes() ([]byte, error) + type MetadataRespBroker struct + Host string + NodeID int32 + Port int32 + type MetadataRespPartition struct + Err error + ID int32 + Isrs []int32 + Leader int32 + Replicas []int32 + type MetadataRespTopic struct + Err error + Name string + Partitions []MetadataRespPartition + type OffsetCommitReq struct + ClientID string + ConsumerGroup string + CorrelationID int32 + Topics []OffsetCommitReqTopic + func ReadOffsetCommitReq(r io.Reader) (*OffsetCommitReq, error) + func (r *OffsetCommitReq) Bytes() ([]byte, error) + func (r *OffsetCommitReq) WriteTo(w io.Writer) (int64, error) + type OffsetCommitReqPartition struct + ID int32 + Metadata string + Offset int64 + TimeStamp time.Time + type OffsetCommitReqTopic struct + Name string + Partitions []OffsetCommitReqPartition + type OffsetCommitResp struct + CorrelationID int32 + Topics []OffsetCommitRespTopic + func ReadOffsetCommitResp(r io.Reader) (*OffsetCommitResp, error) + func (r *OffsetCommitResp) Bytes() ([]byte, error) + type OffsetCommitRespPartition struct + Err error + ID int32 + type OffsetCommitRespTopic struct + Name string + Partitions []OffsetCommitRespPartition + type OffsetFetchReq struct + ClientID string + ConsumerGroup string + CorrelationID int32 + Topics []OffsetFetchReqTopic + func ReadOffsetFetchReq(r io.Reader) (*OffsetFetchReq, error) + func (r *OffsetFetchReq) Bytes() ([]byte, error) + func (r *OffsetFetchReq) WriteTo(w io.Writer) (int64, error) + type OffsetFetchReqTopic struct + Name string + Partitions []int32 + type OffsetFetchResp struct + CorrelationID int32 + Topics []OffsetFetchRespTopic + func ReadOffsetFetchResp(r io.Reader) (*OffsetFetchResp, error) + func (r *OffsetFetchResp) Bytes() ([]byte, error) + type OffsetFetchRespPartition struct + Err error + ID int32 + Metadata string + Offset int64 + type OffsetFetchRespTopic struct + Name string + Partitions []OffsetFetchRespPartition + type OffsetReq struct + ClientID string + CorrelationID int32 + ReplicaID int32 + Topics []OffsetReqTopic + func ReadOffsetReq(r io.Reader) (*OffsetReq, error) + func (r *OffsetReq) Bytes() ([]byte, error) + func (r *OffsetReq) WriteTo(w io.Writer) (int64, error) + type OffsetReqPartition struct + ID int32 + MaxOffsets int32 + TimeMs int64 + type OffsetReqTopic struct + Name string + Partitions []OffsetReqPartition + type OffsetResp struct + CorrelationID int32 + Topics []OffsetRespTopic + func ReadOffsetResp(r io.Reader) (*OffsetResp, error) + func (r *OffsetResp) Bytes() ([]byte, error) + type OffsetRespPartition struct + Err error + ID int32 + Offsets []int64 + type OffsetRespTopic struct + Name string + Partitions []OffsetRespPartition + type ProduceReq struct + ClientID string + Compression Compression + CorrelationID int32 + RequiredAcks int16 + Timeout time.Duration + Topics []ProduceReqTopic + func ReadProduceReq(r io.Reader) (*ProduceReq, error) + func (r *ProduceReq) Bytes() ([]byte, error) + func (r *ProduceReq) WriteTo(w io.Writer) (int64, error) + type ProduceReqPartition struct + ID int32 + Messages []*Message + type ProduceReqTopic struct + Name string + Partitions []ProduceReqPartition + type ProduceResp struct + CorrelationID int32 + Topics []ProduceRespTopic + func ReadProduceResp(r io.Reader) (*ProduceResp, error) + func (r *ProduceResp) Bytes() ([]byte, error) + type ProduceRespPartition struct + Err error + ID int32 + Offset int64 + type ProduceRespTopic struct + Name string + Partitions []ProduceRespPartition + type Request interface + WriteTo func(io.Writer) (int64, error)