Documentation ¶
Overview ¶
Package proto provides an implementation of the Kafka binary wire protocol.
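A minimal sketch of building a request and serializing it to the wire format (the topic name and client ID are hypothetical; writing the bytes to a broker connection is left out):

	req := &proto.MetadataReq{
		Topics: []string{"my-topic"}, // hypothetical topic name
	}
	req.SetClientID("example-client")
	proto.SetVersion(&req.RequestHeader, proto.KafkaV0)
	proto.SetCorrelationID(&req.RequestHeader, 1)

	b, err := req.Bytes()
	if err != nil {
		// handle serialization error
	}
	// b now holds the request in Kafka wire format, ready to be
	// written to a broker connection.
	_ = b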
Index ¶
- Constants
- Variables
- func ComputeCrc(m *Message, compression Compression) uint32
- func ConfigureParser(c ParserConfig) error
- func NewDecoder(r io.Reader) *decoder
- func NewEncoder(w io.Writer) *encoder
- func ReadReq(r io.Reader) (requestKind int16, b []byte, err error)
- func ReadResp(r io.Reader) (correlationID int32, b []byte, err error)
- func SetCorrelationID(header *RequestHeader, correlationID int32)
- func SetVersion(header *RequestHeader, version int16)
- type APIVersionsReq
- type APIVersionsResp
- type Compression
- type ConfigEntry
- type ConsumerMetadataReq
- type ConsumerMetadataResp
- type CreateTopicsReq
- type CreateTopicsResp
- type DeleteTopicError
- type DeleteTopicsReq
- type DeleteTopicsResp
- type FetchReq
- type FetchReqPartition
- type FetchReqTopic
- type FetchResp
- type FetchRespAbortedTransaction
- type FetchRespPartition
- type FetchRespTopic
- type KafkaError
- type Message
- type MessageVersion
- type MetadataReq
- type MetadataResp
- type MetadataRespBroker
- type MetadataRespPartition
- type MetadataRespTopic
- type OffsetCommitReq
- type OffsetCommitReqPartition
- type OffsetCommitReqTopic
- type OffsetCommitResp
- type OffsetCommitRespPartition
- type OffsetCommitRespTopic
- type OffsetFetchReq
- type OffsetFetchReqTopic
- type OffsetFetchResp
- type OffsetFetchRespPartition
- type OffsetFetchRespTopic
- type OffsetReq
- type OffsetReqPartition
- type OffsetReqTopic
- type OffsetResp
- type OffsetRespPartition
- type OffsetRespTopic
- type ParserConfig
- type ProduceReq
- type ProduceReqPartition
- type ProduceReqTopic
- type ProduceResp
- type ProduceRespPartition
- type ProduceRespTopic
- type Record
- type RecordBatch
- type RecordHeader
- type ReplicaAssignment
- type Request
- type RequestHeader
- type SupportedVersion
- type TopicError
- type TopicInfo
Constants ¶
const (
	KafkaV0 int16 = iota
	KafkaV1
	KafkaV2
	KafkaV3
	KafkaV4
	KafkaV5
)
const (
	ProduceReqKind          = 0
	FetchReqKind            = 1
	OffsetReqKind           = 2
	MetadataReqKind         = 3
	OffsetCommitReqKind     = 8
	OffsetFetchReqKind      = 9
	ConsumerMetadataReqKind = 10
	APIVersionsReqKind      = 18
	CreateTopicsReqKind     = 19
	DeleteTopicReqKind      = 20
)
const (
	// Receive the latest offset (i.e. the offset of the next coming message).
	OffsetReqTimeLatest = -1

	// Receive the earliest available offset. Note that because offsets are
	// pulled in descending order, asking for the earliest offset will always
	// return a single element.
	OffsetReqTimeEarliest = -2

	// Server will not send any response.
	RequiredAcksNone = 0

	// Server will block until the message is committed by all in-sync
	// replicas before sending a response.
	RequiredAcksAll = -1

	// Server will wait until the data is written to the local log before
	// sending a response.
	RequiredAcksLocal = 1
)
const (
	CorrelationTypeGroup       int8 = 0
	CorrelationTypeTransaction      = 1
)
Variables ¶
var (
	ErrUnknown                                 = &KafkaError{-1, "unknown error"}
	ErrOffsetOutOfRange                        = &KafkaError{1, "offset out of range"}
	ErrInvalidMessage                          = &KafkaError{2, "invalid message"}
	ErrUnknownTopicOrPartition                 = &KafkaError{3, "unknown topic or partition"}
	ErrInvalidMessageSize                      = &KafkaError{4, "invalid message size"}
	ErrLeaderNotAvailable                      = &KafkaError{5, "leader not available"}
	ErrNotLeaderForPartition                   = &KafkaError{6, "not leader for partition"}
	ErrRequestTimeout                          = &KafkaError{7, "request timed out"}
	ErrBrokerNotAvailable                      = &KafkaError{8, "broker not available"}
	ErrReplicaNotAvailable                     = &KafkaError{9, "replica not available"}
	ErrMessageSizeTooLarge                     = &KafkaError{10, "message size too large"}
	ErrScaleControllerEpoch                    = &KafkaError{11, "stale controller epoch"}
	ErrOffsetMetadataTooLarge                  = &KafkaError{12, "offset metadata too large"}
	ErrNetwork                                 = &KafkaError{13, "server disconnected before response was received"}
	ErrOffsetLoadInProgress                    = &KafkaError{14, "offsets load in progress"}
	ErrNoCoordinator                           = &KafkaError{15, "consumer coordinator not available"}
	ErrNotCoordinator                          = &KafkaError{16, "not coordinator for consumer"}
	ErrInvalidTopic                            = &KafkaError{17, "operation on an invalid topic"}
	ErrRecordListTooLarge                      = &KafkaError{18, "message batch larger than the configured segment size"}
	ErrNotEnoughReplicas                       = &KafkaError{19, "not enough in-sync replicas"}
	ErrNotEnoughReplicasAfterAppend            = &KafkaError{20, "messages are written to the log, but to fewer in-sync replicas than required"}
	ErrInvalidRequiredAcks                     = &KafkaError{21, "invalid value for required acks"}
	ErrIllegalGeneration                       = &KafkaError{22, "consumer generation id is not valid"}
	ErrInconsistentPartitionAssignmentStrategy = &KafkaError{23, "partition assignment strategy does not match that of the group"}
	ErrUnknownParititonAssignmentStrategy      = &KafkaError{24, "partition assignment strategy is unknown to the broker"}
	ErrUnknownConsumerID                       = &KafkaError{25, "coordinator is not aware of this consumer"}
	ErrInvalidSessionTimeout                   = &KafkaError{26, "invalid session timeout"}
	ErrRebalanceInProgress                     = &KafkaError{27, "group is rebalancing, so a rejoin is needed"}
	ErrInvalidCommitOffsetSize                 = &KafkaError{28, "offset data size is not valid"}
	ErrTopicAuthorizationFailed                = &KafkaError{29, "topic authorization failed"}
	ErrGroupAuthorizationFailed                = &KafkaError{30, "group authorization failed"}
	ErrClusterAuthorizationFailed              = &KafkaError{31, "cluster authorization failed"}
	ErrInvalidTimeStamp                        = &KafkaError{32, "timestamp of the message is out of acceptable range"}
	ErrUnsupportedSaslMechanism                = &KafkaError{33, "The broker does not support the requested SASL mechanism."}
	ErrIllegalSaslState                        = &KafkaError{34, "Request is not valid given the current SASL state."}
	ErrUnsupportedVersion                      = &KafkaError{35, "The version of API is not supported."}
	ErrTopicAlreadyExists                      = &KafkaError{36, "Topic with this name already exists."}
	ErrInvalidPartitions                       = &KafkaError{37, "Number of partitions is invalid."}
	ErrInvalidReplicationFactor                = &KafkaError{38, "Replication-factor is invalid."}
	ErrInvalidReplicaAssignment                = &KafkaError{39, "Replica assignment is invalid."}
	ErrInvalidConfig                           = &KafkaError{40, "Configuration is invalid."}
	ErrNotController                           = &KafkaError{41, "This is not the correct controller for this cluster."}
	ErrInvalidRequest                          = &KafkaError{42, "This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. See the broker logs for more details."}
	ErrUnsupportedForMessageFormat             = &KafkaError{43, "The message format version on the broker does not support the request."}
	ErrPolicyViolation                         = &KafkaError{44, "Request parameters do not satisfy the configured policy."}
	ErrOutOfOrderSequenceNumber                = &KafkaError{45, "The broker received an out of order sequence number"}
	ErrDuplicateSequenceNumber                 = &KafkaError{46, "The broker received a duplicate sequence number"}
	ErrInvalidProducerEpoch                    = &KafkaError{47, "Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker."}
	ErrInvalidTxnState                         = &KafkaError{48, "The producer attempted a transactional operation in an invalid state"}
	ErrInvalidProducerIdMapping                = &KafkaError{49, "The producer attempted to use a producer id which is not currently assigned to its transactional id"}
	ErrInvalidTransactionTimeout               = &KafkaError{50, "The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms)."}
	ErrConcurrentTransactions                  = &KafkaError{51, "The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing"}
	ErrTransactionCoordinatorFenced            = &KafkaError{52, "Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer"}
	ErrTransactionalIdAuthorizationFailed      = &KafkaError{53, "Transactional Id authorization failed"}
	ErrSecurityDisabled                        = &KafkaError{54, "Security features are disabled."}
	ErrOperationNotAttempted                   = &KafkaError{55, "The broker did not attempt to execute this operation. This may happen for batched RPCs where some operations in the batch failed, causing the broker to respond without trying the rest."}
	ErrKafkaStorageError                       = &KafkaError{56, "Disk error when trying to access log file on the disk."}
	ErrLogDirNotFound                          = &KafkaError{57, "The user-specified log directory is not found in the broker config."}
	ErrSaslAuthenticationFailed                = &KafkaError{58, "SASL Authentication failed."}
	ErrUnknownProducerId                       = &KafkaError{59, "This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception."}
	ErrReassignmentInProgress                  = &KafkaError{60, "A partition reassignment is in progress"}
	ErrDelegationTokenAuthDisabled             = &KafkaError{61, "Delegation Token feature is not enabled."}
	ErrDelegationTokenNotFound                 = &KafkaError{62, "Delegation Token is not found on server."}
	ErrDelegationTokenOwnerMismatch            = &KafkaError{63, "Specified Principal is not valid Owner/Renewer."}
	ErrDelegationTokenRequestNotAllowed        = &KafkaError{64, "Delegation Token requests are not allowed on PLAINTEXT/1-way SSL channels and on delegation token authenticated channels."}
	ErrDelegationTokenAuthorizationFailed      = &KafkaError{65, "Delegation Token authorization failed."}
	ErrDelegationTokenExpired                  = &KafkaError{66, "Delegation Token is expired."}
	ErrInvalidPrincipalType                    = &KafkaError{67, "Supplied principalType is not supported"}
	ErrNonEmptyGroup                           = &KafkaError{68, "The group is not empty"}
	ErrGroupIdNotFound                         = &KafkaError{69, "The group id does not exist"}
	ErrFetchSessionIdNotFound                  = &KafkaError{70, "The fetch session ID was not found"}
	ErrInvalidFetchSessionEpoch                = &KafkaError{71, "The fetch session epoch is invalid"}
)
var ErrInvalidArrayLen = errors.New("invalid array length")
var ErrNotEnoughData = errors.New("not enough data")
var SupportedByDriver = map[int16]SupportedVersion{
	ProduceReqKind:          SupportedVersion{MinVersion: KafkaV0, MaxVersion: KafkaV2},
	FetchReqKind:            SupportedVersion{MinVersion: KafkaV0, MaxVersion: KafkaV5},
	OffsetReqKind:           SupportedVersion{MinVersion: KafkaV0, MaxVersion: KafkaV2},
	MetadataReqKind:         SupportedVersion{MinVersion: KafkaV0, MaxVersion: KafkaV5},
	OffsetCommitReqKind:     SupportedVersion{MinVersion: KafkaV0, MaxVersion: KafkaV3},
	OffsetFetchReqKind:      SupportedVersion{MinVersion: KafkaV0, MaxVersion: KafkaV3},
	ConsumerMetadataReqKind: SupportedVersion{MinVersion: KafkaV0, MaxVersion: KafkaV1},
	APIVersionsReqKind:      SupportedVersion{MinVersion: KafkaV0, MaxVersion: KafkaV1},
}
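For example, the map can be consulted before sending a request to check whether the driver supports a given request version (a sketch; the kind and version values are arbitrary):

	var kind int16 = proto.FetchReqKind
	var version int16 = proto.KafkaV4

	if sv, ok := proto.SupportedByDriver[kind]; ok {
		if version >= sv.MinVersion && version <= sv.MaxVersion {
			// this request version is safe to send to the broker
		}
	}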
Functions ¶
func ComputeCrc ¶
func ComputeCrc(m *Message, compression Compression) uint32
ComputeCrc returns the CRC32 hash of the given message content.
func ConfigureParser ¶
func ConfigureParser(c ParserConfig) error
ConfigureParser configures the parser. It must be called prior to parsing any messages, as the structure is currently not prepared for concurrent access.
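A sketch of configuring the parser once at startup, before any messages are parsed:

	cfg := proto.ParserConfig{
		// Skip splitting MessageSets into individual Message structures;
		// faster, but message payloads are not accessible after parsing.
		SimplifiedMessageSetParsing: true,
	}
	if err := proto.ConfigureParser(cfg); err != nil {
		// handle configuration error
	}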
func NewDecoder ¶
func NewDecoder(r io.Reader) *decoder
func NewEncoder ¶
func NewEncoder(w io.Writer) *encoder
func ReadReq ¶
func ReadReq(r io.Reader) (requestKind int16, b []byte, err error)
ReadReq returns the request kind ID and the byte representation of the whole message in wire protocol format.
func ReadResp ¶
func ReadResp(r io.Reader) (correlationID int32, b []byte, err error)
ReadResp returns the message correlation ID and the byte representation of the whole message read from the given stream, in wire protocol format, including the 4 bytes of the message size itself. The byte representation returned by ReadResp can be parsed by all response readers to transform it into a specialized response structure.
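A sketch of a response read loop, assuming imports of bytes, net, and this package; ReadResp pulls one complete response off the stream, and the raw bytes can then be handed to a versioned reader such as ReadVersionedMetadataResp:

	func readMetadata(conn net.Conn) (*proto.MetadataResp, error) {
		// ReadResp returns the whole response, including the 4-byte size prefix.
		correlationID, b, err := proto.ReadResp(conn)
		if err != nil {
			return nil, err
		}
		_ = correlationID // match against the request's correlation ID

		// The raw bytes can be handed to any versioned response reader.
		return proto.ReadVersionedMetadataResp(bytes.NewReader(b), proto.KafkaV0)
	}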
func SetCorrelationID ¶
func SetCorrelationID(header *RequestHeader, correlationID int32)
func SetVersion ¶
func SetVersion(header *RequestHeader, version int16)
Types ¶
type APIVersionsReq ¶
type APIVersionsReq struct {
RequestHeader
}
func ReadAPIVersionsReq ¶
func ReadAPIVersionsReq(r io.Reader) (*APIVersionsReq, error)
func (*APIVersionsReq) Bytes ¶
func (r *APIVersionsReq) Bytes() ([]byte, error)
func (APIVersionsReq) Kind ¶
func (r APIVersionsReq) Kind() int16
type APIVersionsResp ¶
type APIVersionsResp struct {
	Version       int16
	CorrelationID int32
	APIVersions   []SupportedVersion
	ThrottleTime  time.Duration
}
func ReadAPIVersionsResp ¶
func ReadAPIVersionsResp(r io.Reader) (*APIVersionsResp, error)
func ReadVersionedAPIVersionsResp ¶
func ReadVersionedAPIVersionsResp(r io.Reader, version int16) (*APIVersionsResp, error)
func (*APIVersionsResp) Bytes ¶
func (r *APIVersionsResp) Bytes() ([]byte, error)
type Compression ¶
type Compression int8
const (
	CompressionNone   Compression = 0
	CompressionGzip   Compression = 1
	CompressionSnappy Compression = 2
)
type ConfigEntry ¶
type ConsumerMetadataReq ¶
type ConsumerMetadataReq struct {
	RequestHeader
	ConsumerGroup   string
	CoordinatorType int8 // >= KafkaV1
}
func ReadConsumerMetadataReq ¶
func ReadConsumerMetadataReq(r io.Reader) (*ConsumerMetadataReq, error)
func (*ConsumerMetadataReq) Bytes ¶
func (r *ConsumerMetadataReq) Bytes() ([]byte, error)
func (ConsumerMetadataReq) Kind ¶
func (r ConsumerMetadataReq) Kind() int16
type ConsumerMetadataResp ¶
type ConsumerMetadataResp struct {
	Version         int16
	CorrelationID   int32
	ThrottleTime    time.Duration // >= KafkaV1
	Err             error
	ErrMsg          string // >= KafkaV1
	CoordinatorID   int32
	CoordinatorHost string
	CoordinatorPort int32
}
func ReadConsumerMetadataResp ¶
func ReadConsumerMetadataResp(r io.Reader) (*ConsumerMetadataResp, error)
func ReadVersionedConsumerMetadataResp ¶
func ReadVersionedConsumerMetadataResp(r io.Reader, version int16) (*ConsumerMetadataResp, error)
func (*ConsumerMetadataResp) Bytes ¶
func (r *ConsumerMetadataResp) Bytes() ([]byte, error)
type CreateTopicsReq ¶
type CreateTopicsReq struct {
	RequestHeader
	CreateTopicsRequests []TopicInfo
	Timeout              time.Duration
	ValidateOnly         bool
}
func ReadCreateTopicsReq ¶
func ReadCreateTopicsReq(r io.Reader) (*CreateTopicsReq, error)
func (*CreateTopicsReq) Bytes ¶
func (r *CreateTopicsReq) Bytes() ([]byte, error)
func (CreateTopicsReq) Kind ¶
func (r CreateTopicsReq) Kind() int16
type CreateTopicsResp ¶
type CreateTopicsResp struct {
	Version       int16
	CorrelationID int32
	TopicErrors   []TopicError
	ThrottleTime  time.Duration // >= KafkaV2
}
func ReadCreateTopicsResp ¶
func ReadCreateTopicsResp(r io.Reader) (*CreateTopicsResp, error)
func ReadVersionedCreateTopicsResp ¶
func ReadVersionedCreateTopicsResp(r io.Reader, version int16) (*CreateTopicsResp, error)
func (*CreateTopicsResp) Bytes ¶
func (r *CreateTopicsResp) Bytes() ([]byte, error)
type DeleteTopicError ¶
type DeleteTopicsReq ¶
type DeleteTopicsReq struct {
	RequestHeader
	Topics  []string
	Timeout time.Duration
}
func ReadDeleteTopicsReq ¶
func ReadDeleteTopicsReq(r io.Reader) (*DeleteTopicsReq, error)
func (*DeleteTopicsReq) Bytes ¶
func (r *DeleteTopicsReq) Bytes() ([]byte, error)
func (DeleteTopicsReq) Kind ¶
func (r DeleteTopicsReq) Kind() int16
type DeleteTopicsResp ¶
type DeleteTopicsResp struct {
	TopicErrors  []DeleteTopicError
	ThrottleTime time.Duration // >= KafkaV2
}
func ReadDeleteTopicsResp ¶
func ReadDeleteTopicsResp(r io.Reader) (*DeleteTopicsResp, error)
func (*DeleteTopicsResp) Bytes ¶
func (r *DeleteTopicsResp) Bytes() ([]byte, error)
type FetchReq ¶
type FetchReq struct {
	RequestHeader
	ReplicaID      int32
	MaxWaitTime    time.Duration
	MinBytes       int32
	MaxBytes       int32 // >= KafkaV3
	IsolationLevel int8  // >= KafkaV4
	Topics         []FetchReqTopic
}
type FetchReqPartition ¶
type FetchReqTopic ¶
type FetchReqTopic struct {
	Name       string
	Partitions []FetchReqPartition
}
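A sketch of assembling a fetch request. The FetchReqPartition field names used here (ID, FetchOffset, MaxBytes) are assumptions based on the naming pattern of the other partition structs, since that struct is not expanded in this listing:

	req := &proto.FetchReq{
		ReplicaID:   -1, // -1 denotes a regular consumer, not a broker replica
		MaxWaitTime: 500 * time.Millisecond,
		MinBytes:    1,
		Topics: []proto.FetchReqTopic{
			{
				Name: "my-topic", // hypothetical
				Partitions: []proto.FetchReqPartition{
					{ID: 0, FetchOffset: 0, MaxBytes: 1 << 20}, // assumed field names
				},
			},
		},
	}
	_ = req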
type FetchResp ¶
type FetchResp struct {
	Version       int16
	CorrelationID int32
	ThrottleTime  time.Duration
	Topics        []FetchRespTopic
}
func ReadVersionedFetchResp ¶
func ReadVersionedFetchResp(r io.Reader, version int16) (*FetchResp, error)
type FetchRespPartition ¶
type FetchRespPartition struct {
	ID                  int32
	Err                 error
	TipOffset           int64
	LastStableOffset    int64
	LogStartOffset      int64
	AbortedTransactions []FetchRespAbortedTransaction
	Messages            []*Message
	MessageVersion      MessageVersion
	RecordBatches       []*RecordBatch
}
type FetchRespTopic ¶
type FetchRespTopic struct {
	Name       string
	Partitions []FetchRespPartition
}
type KafkaError ¶
type KafkaError struct {
// contains filtered or unexported fields
}
func (*KafkaError) Errno ¶
func (err *KafkaError) Errno() int
func (*KafkaError) Error ¶
func (err *KafkaError) Error() string
type Message ¶
type Message struct {
	Key       []byte
	Value     []byte
	Offset    int64  // set when fetching and after successful producing
	Crc       uint32 // set when fetching, ignored when producing
	Topic     string // set when fetching, ignored when producing
	Partition int32  // set when fetching, ignored when producing
	TipOffset int64  // set when fetching, ignored when producing
}
Message represents a single entity of a message set.
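When producing, only Key and Value need to be populated; the remaining fields are filled in by the fetch path or the broker, per the comments above. A minimal sketch:

	msg := &proto.Message{
		Key:   []byte("user-42"),      // optional; commonly used for partitioning
		Value: []byte("hello, kafka"), // the message payload
	}
	// Offset, Crc, Topic, Partition and TipOffset are left zero: they are
	// set when fetching (or, for Offset, after successful producing).
	_ = msg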
type MessageVersion ¶
type MessageVersion int8
MessageVersion defines which message format is used in a particular Produce request/response. MessageV0 and MessageV1 indicate use of a MessageSet; MessageV2 indicates use of a RecordBatch. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
const MessageV0 MessageVersion = 0
const MessageV1 MessageVersion = 1
const MessageV2 MessageVersion = 2
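Consumers of FetchRespPartition can switch on this value to decide which container holds the data, since that struct carries both a Messages slice and a RecordBatches slice (a sketch; p stands in for a partition obtained from a parsed fetch response):

	var p proto.FetchRespPartition // obtained from a parsed FetchResp
	switch p.MessageVersion {
	case proto.MessageV0, proto.MessageV1:
		for _, m := range p.Messages {
			_ = m.Value // MessageSet format
		}
	case proto.MessageV2:
		for _, rb := range p.RecordBatches {
			_ = rb.Records // RecordBatch format
		}
	}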
type MetadataReq ¶
type MetadataReq struct {
	RequestHeader
	Topics                 []string
	AllowAutoTopicCreation bool // >= KafkaV4 only
}
func ReadMetadataReq ¶
func ReadMetadataReq(r io.Reader) (*MetadataReq, error)
func (*MetadataReq) Bytes ¶
func (r *MetadataReq) Bytes() ([]byte, error)
func (MetadataReq) Kind ¶
func (r MetadataReq) Kind() int16
type MetadataResp ¶
type MetadataResp struct {
	Version       int16
	CorrelationID int32
	ThrottleTime  time.Duration // >= KafkaV3
	Brokers       []MetadataRespBroker
	ClusterID     string // >= KafkaV2
	ControllerID  int32  // >= KafkaV1
	Topics        []MetadataRespTopic
}
func ReadMetadataResp ¶
func ReadMetadataResp(r io.Reader) (*MetadataResp, error)
func ReadVersionedMetadataResp ¶
func ReadVersionedMetadataResp(r io.Reader, version int16) (*MetadataResp, error)
func (*MetadataResp) Bytes ¶
func (r *MetadataResp) Bytes() ([]byte, error)
type MetadataRespBroker ¶
type MetadataRespPartition ¶
type MetadataRespTopic ¶
type MetadataRespTopic struct {
	Name       string
	Err        error
	IsInternal bool // >= KafkaV1
	Partitions []MetadataRespPartition
}
type OffsetCommitReq ¶
type OffsetCommitReq struct {
	RequestHeader
	ConsumerGroup     string
	GroupGenerationID int32  // >= KafkaV1 only
	MemberID          string // >= KafkaV1 only
	RetentionTime     int64  // >= KafkaV2 only
	Topics            []OffsetCommitReqTopic
}
func ReadOffsetCommitReq ¶
func ReadOffsetCommitReq(r io.Reader) (*OffsetCommitReq, error)
func (*OffsetCommitReq) Bytes ¶
func (r *OffsetCommitReq) Bytes() ([]byte, error)
func (OffsetCommitReq) Kind ¶
func (r OffsetCommitReq) Kind() int16
type OffsetCommitReqTopic ¶
type OffsetCommitReqTopic struct {
	Name       string
	Partitions []OffsetCommitReqPartition
}
type OffsetCommitResp ¶
type OffsetCommitResp struct {
	Version       int16
	CorrelationID int32
	ThrottleTime  time.Duration // >= KafkaV3 only
	Topics        []OffsetCommitRespTopic
}
func ReadOffsetCommitResp ¶
func ReadOffsetCommitResp(r io.Reader) (*OffsetCommitResp, error)
func ReadVersionedOffsetCommitResp ¶
func ReadVersionedOffsetCommitResp(r io.Reader, version int16) (*OffsetCommitResp, error)
func (*OffsetCommitResp) Bytes ¶
func (r *OffsetCommitResp) Bytes() ([]byte, error)
type OffsetCommitRespTopic ¶
type OffsetCommitRespTopic struct {
	Name       string
	Partitions []OffsetCommitRespPartition
}
type OffsetFetchReq ¶
type OffsetFetchReq struct {
	RequestHeader
	ConsumerGroup string
	Topics        []OffsetFetchReqTopic
}
func ReadOffsetFetchReq ¶
func ReadOffsetFetchReq(r io.Reader) (*OffsetFetchReq, error)
func (*OffsetFetchReq) Bytes ¶
func (r *OffsetFetchReq) Bytes() ([]byte, error)
func (OffsetFetchReq) Kind ¶
func (r OffsetFetchReq) Kind() int16
type OffsetFetchReqTopic ¶
type OffsetFetchResp ¶
type OffsetFetchResp struct {
	Version       int16
	CorrelationID int32
	ThrottleTime  time.Duration // >= KafkaV3
	Topics        []OffsetFetchRespTopic
	Err           error // >= KafkaV2
}
func ReadOffsetFetchResp ¶
func ReadOffsetFetchResp(r io.Reader) (*OffsetFetchResp, error)
func ReadVersionedOffsetFetchResp ¶
func ReadVersionedOffsetFetchResp(r io.Reader, version int16) (*OffsetFetchResp, error)
func (*OffsetFetchResp) Bytes ¶
func (r *OffsetFetchResp) Bytes() ([]byte, error)
type OffsetFetchRespTopic ¶
type OffsetFetchRespTopic struct {
	Name       string
	Partitions []OffsetFetchRespPartition
}
type OffsetReq ¶
type OffsetReq struct {
	RequestHeader
	ReplicaID      int32
	IsolationLevel int8
	Topics         []OffsetReqTopic
}
type OffsetReqPartition ¶
type OffsetReqTopic ¶
type OffsetReqTopic struct {
	Name       string
	Partitions []OffsetReqPartition
}
type OffsetResp ¶
type OffsetResp struct {
	Version       int16
	CorrelationID int32
	ThrottleTime  time.Duration
	Topics        []OffsetRespTopic
}
func ReadOffsetResp ¶
func ReadOffsetResp(r io.Reader) (*OffsetResp, error)
func ReadVersionedOffsetResp ¶
func ReadVersionedOffsetResp(r io.Reader, version int16) (*OffsetResp, error)
func (*OffsetResp) Bytes ¶
func (r *OffsetResp) Bytes() ([]byte, error)
type OffsetRespPartition ¶
type OffsetRespTopic ¶
type OffsetRespTopic struct {
	Name       string
	Partitions []OffsetRespPartition
}
type ParserConfig ¶
type ParserConfig struct {
	// SimplifiedMessageSetParsing enables a simplified version of the
	// MessageSet parser which will not split a MessageSet into slices of
	// Message structures. Instead, the entire MessageSet will be read
	// over. This mode improves parsing speed due to reduced memory reads,
	// at the cost of not providing access to the message payload after
	// parsing.
	SimplifiedMessageSetParsing bool
}
ParserConfig is optional configuration for the parser. It can be set via ConfigureParser.
type ProduceReq ¶
type ProduceReq struct {
	RequestHeader
	Compression     Compression // only used when sending ProduceReqs
	TransactionalID string
	RequiredAcks    int16
	Timeout         time.Duration
	Topics          []ProduceReqTopic
}
func ReadProduceReq ¶
func ReadProduceReq(r io.Reader) (*ProduceReq, error)
func (*ProduceReq) Bytes ¶
func (r *ProduceReq) Bytes() ([]byte, error)
func (ProduceReq) Kind ¶
func (r ProduceReq) Kind() int16
type ProduceReqPartition ¶
type ProduceReqTopic ¶
type ProduceReqTopic struct {
	Name       string
	Partitions []ProduceReqPartition
}
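A sketch of wiring these pieces into a produce request. The ProduceReqPartition field names used here (ID, Messages) are assumptions, since that struct is not expanded in this listing:

	req := &proto.ProduceReq{
		Compression:  proto.CompressionNone,
		RequiredAcks: proto.RequiredAcksAll, // wait for all in-sync replicas
		Timeout:      5 * time.Second,
		Topics: []proto.ProduceReqTopic{
			{
				Name: "my-topic", // hypothetical
				Partitions: []proto.ProduceReqPartition{
					{
						ID: 0, // assumed field name
						Messages: []*proto.Message{ // assumed field name
							{Value: []byte("hello")},
						},
					},
				},
			},
		},
	}
	b, err := req.Bytes()
	if err != nil {
		// handle serialization error
	}
	_ = b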
type ProduceResp ¶
type ProduceResp struct {
	Version       int16
	CorrelationID int32
	Topics        []ProduceRespTopic
	ThrottleTime  time.Duration
}
func ReadProduceResp ¶
func ReadProduceResp(r io.Reader) (*ProduceResp, error)
func ReadVersionedProduceResp ¶
func ReadVersionedProduceResp(r io.Reader, version int16) (*ProduceResp, error)
func (*ProduceResp) Bytes ¶
func (r *ProduceResp) Bytes() ([]byte, error)
type ProduceRespPartition ¶
type ProduceRespTopic ¶
type ProduceRespTopic struct {
	Name       string
	Partitions []ProduceRespPartition
}
type RecordBatch ¶
type RecordBatch struct {
	FirstOffset          int64
	Length               int32
	PartitionLeaderEpoch int32
	Magic                int8
	CRC                  int32
	Attributes           int16
	LastOffsetDelta      int32
	FirstTimestamp       int64
	MaxTimestamp         int64
	ProducerId           int64
	ProducerEpoch        int16
	FirstSequence        int32
	Records              []*Record
}
func (*RecordBatch) Compression ¶
func (rb *RecordBatch) Compression() Compression
type RecordHeader ¶
type ReplicaAssignment ¶
type RequestHeader ¶
type RequestHeader struct {
	ClientID string
	// contains filtered or unexported fields
}
func (*RequestHeader) GetClientID ¶
func (h *RequestHeader) GetClientID() string
func (*RequestHeader) GetCorrelationID ¶
func (h *RequestHeader) GetCorrelationID() int32
func (*RequestHeader) GetHeader ¶
func (h *RequestHeader) GetHeader() *RequestHeader
func (*RequestHeader) GetVersion ¶
func (h *RequestHeader) GetVersion() int16
func (*RequestHeader) SetClientID ¶
func (h *RequestHeader) SetClientID(clientID string)
type SupportedVersion ¶
type TopicError ¶
type TopicInfo ¶
type TopicInfo struct {
	Topic              string
	NumPartitions      int32
	ReplicationFactor  int16
	ReplicaAssignments []ReplicaAssignment
	ConfigEntries      []ConfigEntry
}
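A sketch of describing a new topic with TopicInfo inside a CreateTopicsReq; the topic name and sizing are hypothetical, and ReplicaAssignments and ConfigEntries are left empty since their fields are not expanded in this listing:

	req := &proto.CreateTopicsReq{
		CreateTopicsRequests: []proto.TopicInfo{
			{
				Topic:             "my-new-topic", // hypothetical
				NumPartitions:     3,
				ReplicationFactor: 2,
			},
		},
		Timeout:      10 * time.Second,
		ValidateOnly: false, // set true to validate without creating
	}
	b, err := req.Bytes()
	if err != nil {
		// handle serialization error
	}
	_ = b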