Versions in this module Expand all Collapse all v2 v2.2.1 Oct 19, 2020 Changes in this version + const APIVersionsReqKind + const ConsumerMetadataReqKind + const CorrelationTypeGroup + const CorrelationTypeTransaction + const CreateTopicsReqKind + const DeleteTopicsReqKind + const FetchReqKind + const KafkaV0 + const KafkaV1 + const KafkaV2 + const KafkaV3 + const KafkaV4 + const KafkaV5 + const MetadataReqKind + const OffsetCommitReqKind + const OffsetFetchReqKind + const OffsetReqKind + const OffsetReqTimeEarliest + const OffsetReqTimeLatest + const ProduceReqKind + const RequiredAcksAll + const RequiredAcksLocal + const RequiredAcksNone + var ErrBrokerNotAvailable = &KafkaError + var ErrClusterAuthorizationFailed = &KafkaError + var ErrConcurrentTransactions = &KafkaError + var ErrDelegationTokenAuthDisabled = &KafkaError + var ErrDelegationTokenAuthorizationFailed = &KafkaError + var ErrDelegationTokenExpired = &KafkaError + var ErrDelegationTokenNotFound = &KafkaError + var ErrDelegationTokenOwnerMismatch = &KafkaError + var ErrDelegationTokenRequestNotAllowed = &KafkaError + var ErrDuplicateSequenceNumber = &KafkaError + var ErrFetchSessionIdNotFound = &KafkaError + var ErrGroupAuthorizationFailed = &KafkaError + var ErrGroupIdNotFound = &KafkaError + var ErrIllegalGeneration = &KafkaError + var ErrIllegalSaslState = &KafkaError + var ErrInconsistentPartitionAssignmentStrategy = &KafkaError + var ErrInvalidArrayLen = errors.New("invalid array length") + var ErrInvalidCommitOffsetSize = &KafkaError + var ErrInvalidConfig = &KafkaError + var ErrInvalidFetchSessionEpoch = &KafkaError + var ErrInvalidMessage = &KafkaError + var ErrInvalidMessageSize = &KafkaError + var ErrInvalidPartitions = &KafkaError + var ErrInvalidPrincipalType = &KafkaError + var ErrInvalidProducerEpoch = &KafkaError + var ErrInvalidProducerIdMapping = &KafkaError + var ErrInvalidReplicaAssignment = &KafkaError + var ErrInvalidReplicationFactor = &KafkaError + var ErrInvalidRequest = &KafkaError + var ErrInvalidRequiredAcks = &KafkaError + var ErrInvalidSessionTimeout = &KafkaError + var ErrInvalidTimeStamp = &KafkaError + var ErrInvalidTopic = &KafkaError + var ErrInvalidTransactionTimeout = &KafkaError + var ErrInvalidTxnState = &KafkaError + var ErrKafkaStorageError = &KafkaError + var ErrLeaderNotAvailable = &KafkaError + var ErrLogDirNotFound = &KafkaError + var ErrMessageSizeTooLarge = &KafkaError + var ErrNetwork = &KafkaError + var ErrNoCoordinator = &KafkaError + var ErrNonEmptyGroup = &KafkaError + var ErrNotController = &KafkaError + var ErrNotCoordinator = &KafkaError + var ErrNotEnoughData = errors.New("not enough data") + var ErrNotEnoughReplicas = &KafkaError + var ErrNotEnoughReplicasAfterAppend = &KafkaError + var ErrNotLeaderForPartition = &KafkaError + var ErrOffsetLoadInProgress = &KafkaError + var ErrOffsetMetadataTooLarge = &KafkaError + var ErrOffsetOutOfRange = &KafkaError + var ErrOperationNotAttempted = &KafkaError + var ErrOutOfOrderSequenceNumber = &KafkaError + var ErrPolicyViolation = &KafkaError + var ErrReassignmentInProgress = &KafkaError + var ErrRebalanceInProgress = &KafkaError + var ErrRecordListTooLarge = &KafkaError + var ErrReplicaNotAvailable = &KafkaError + var ErrRequestTimeout = &KafkaError + var ErrSaslAuthenticationFailed = &KafkaError + var ErrScaleControllerEpoch = &KafkaError + var ErrSecurityDisabled = &KafkaError + var ErrTopicAlreadyExists = &KafkaError + var ErrTopicAuthorizationFailed = &KafkaError + var ErrTransactionCoordinatorFenced = &KafkaError + var ErrTransactionalIdAuthorizationFailed = &KafkaError + var ErrUnknown = &KafkaError + var ErrUnknownConsumerID = &KafkaError + var ErrUnknownParititonAssignmentStrategy = &KafkaError + var ErrUnknownProducerId = &KafkaError + var ErrUnknownTopicOrPartition = &KafkaError + var ErrUnsupportedForMessageFormat = &KafkaError + var ErrUnsupportedSaslMechanism = &KafkaError + var ErrUnsupportedVersion = &KafkaError + var SupportedByDriver = map[int16]SupportedVersion + func ComputeCrc(m *Message, compression Compression) uint32 + func ConfigureParser(c ParserConfig) error + func NewDecoder(r io.Reader) *decoder + func NewEncoder(w io.Writer) *encoder + func ReadReq(r io.Reader) (requestKind int16, b []byte, err error) + func ReadResp(r io.Reader) (correlationID int32, b []byte, err error) + func SetCorrelationID(header *RequestHeader, correlationID int32) + func SetVersion(header *RequestHeader, version int16) + type APIVersionsReq struct + func ReadAPIVersionsReq(r io.Reader) (*APIVersionsReq, error) + func (r *APIVersionsReq) Bytes() ([]byte, error) + func (r *APIVersionsReq) WriteTo(w io.Writer) (int64, error) + func (r APIVersionsReq) Kind() int16 + type APIVersionsResp struct + APIVersions []SupportedVersion + CorrelationID int32 + ThrottleTime time.Duration + Version int16 + func ReadAPIVersionsResp(r io.Reader) (*APIVersionsResp, error) + func ReadVersionedAPIVersionsResp(r io.Reader, version int16) (*APIVersionsResp, error) + func (r *APIVersionsResp) Bytes() ([]byte, error) + type Compression int8 + const CompressionGzip + const CompressionNone + const CompressionSnappy + type ConfigEntry struct + ConfigName string + ConfigValue string + type ConsumerMetadataReq struct + ConsumerGroup string + CoordinatorType int8 + func ReadConsumerMetadataReq(r io.Reader) (*ConsumerMetadataReq, error) + func (r *ConsumerMetadataReq) Bytes() ([]byte, error) + func (r *ConsumerMetadataReq) WriteTo(w io.Writer) (int64, error) + func (r ConsumerMetadataReq) Kind() int16 + type ConsumerMetadataResp struct + CoordinatorHost string + CoordinatorID int32 + CoordinatorPort int32 + CorrelationID int32 + Err error + ErrMsg string + ThrottleTime time.Duration + Version int16 + func ReadConsumerMetadataResp(r io.Reader) (*ConsumerMetadataResp, error) + func ReadVersionedConsumerMetadataResp(r io.Reader, version int16) (*ConsumerMetadataResp, error) + func (r *ConsumerMetadataResp) Bytes() ([]byte, error) + type CreateTopicsReq struct + CreateTopicsRequests []TopicInfo + Timeout time.Duration + ValidateOnly bool + func ReadCreateTopicsReq(r io.Reader) (*CreateTopicsReq, error) + func (r *CreateTopicsReq) Bytes() ([]byte, error) + func (r *CreateTopicsReq) WriteTo(w io.Writer) (int64, error) + func (r CreateTopicsReq) Kind() int16 + type CreateTopicsResp struct + CorrelationID int32 + ThrottleTime time.Duration + TopicErrors []TopicError + Version int16 + func ReadCreateTopicsResp(r io.Reader) (*CreateTopicsResp, error) + func ReadVersionedCreateTopicsResp(r io.Reader, version int16) (*CreateTopicsResp, error) + func (r *CreateTopicsResp) Bytes() ([]byte, error) + type DeleteTopicError struct + ErrorCode int16 + Topic string + type DeleteTopicsReq struct + Timeout time.Duration + Topics []string + func ReadDeleteTopicsReq(r io.Reader) (*DeleteTopicsReq, error) + func (r *DeleteTopicsReq) Bytes() ([]byte, error) + func (r *DeleteTopicsReq) WriteTo(w io.Writer) (int64, error) + func (r DeleteTopicsReq) Kind() int16 + type DeleteTopicsResp struct + ThrottleTime time.Duration + TopicErrors []DeleteTopicError + func ReadDeleteTopicsResp(r io.Reader) (*DeleteTopicsResp, error) + func (r *DeleteTopicsResp) Bytes() ([]byte, error) + type FetchReq struct + IsolationLevel int8 + MaxBytes int32 + MaxWaitTime time.Duration + MinBytes int32 + ReplicaID int32 + Topics []FetchReqTopic + func ReadFetchReq(r io.Reader) (*FetchReq, error) + func (r *FetchReq) Bytes() ([]byte, error) + func (r *FetchReq) WriteTo(w io.Writer) (int64, error) + func (r FetchReq) Kind() int16 + type FetchReqPartition struct + FetchOffset int64 + ID int32 + LogStartOffset int64 + MaxBytes int32 + type FetchReqTopic struct + Name string + Partitions []FetchReqPartition + type FetchResp struct + CorrelationID int32 + ThrottleTime time.Duration + Topics []FetchRespTopic + Version int16 + func ReadFetchResp(r io.Reader) (*FetchResp, error) + func ReadVersionedFetchResp(r io.Reader, version int16) (*FetchResp, error) + func (r *FetchResp) Bytes() ([]byte, error) + type FetchRespAbortedTransaction struct + FirstOffset int64 + ProducerID int64 + type FetchRespPartition struct + AbortedTransactions []FetchRespAbortedTransaction + Err error + ID int32 + LastStableOffset int64 + LogStartOffset int64 + MessageVersion MessageVersion + Messages []*Message + RecordBatches []*RecordBatch + TipOffset int64 + type FetchRespTopic struct + Name string + Partitions []FetchRespPartition + type KafkaError struct + func (err *KafkaError) Errno() int + func (err *KafkaError) Error() string + type Message struct + Crc uint32 + Key []byte + Offset int64 + Partition int32 + TipOffset int64 + Topic string + Value []byte + type MessageVersion int8 + const MessageV0 + const MessageV1 + const MessageV2 + type MetadataReq struct + AllowAutoTopicCreation bool + Topics []string + func ReadMetadataReq(r io.Reader) (*MetadataReq, error) + func (r *MetadataReq) Bytes() ([]byte, error) + func (r *MetadataReq) WriteTo(w io.Writer) (int64, error) + func (r MetadataReq) Kind() int16 + type MetadataResp struct + Brokers []MetadataRespBroker + ClusterID string + ControllerID int32 + CorrelationID int32 + ThrottleTime time.Duration + Topics []MetadataRespTopic + Version int16 + func ReadMetadataResp(r io.Reader) (*MetadataResp, error) + func ReadVersionedMetadataResp(r io.Reader, version int16) (*MetadataResp, error) + func (r *MetadataResp) Bytes() ([]byte, error) + type MetadataRespBroker struct + Host string + NodeID int32 + Port int32 + Rack string + type MetadataRespPartition struct + Err error + ID int32 + Isrs []int32 + Leader int32 + OfflineReplicas []int32 + Replicas []int32 + type MetadataRespTopic struct + Err error + IsInternal bool + Name string + Partitions []MetadataRespPartition + type OffsetCommitReq struct + ConsumerGroup string + GroupGenerationID int32 + MemberID string + RetentionTime int64 + Topics []OffsetCommitReqTopic + func ReadOffsetCommitReq(r io.Reader) (*OffsetCommitReq, error) + func (r *OffsetCommitReq) Bytes() ([]byte, error) + func (r *OffsetCommitReq) WriteTo(w io.Writer) (int64, error) + func (r OffsetCommitReq) Kind() int16 + type OffsetCommitReqPartition struct + ID int32 + Metadata string + Offset int64 + TimeStamp time.Time + type OffsetCommitReqTopic struct + Name string + Partitions []OffsetCommitReqPartition + type OffsetCommitResp struct + CorrelationID int32 + ThrottleTime time.Duration + Topics []OffsetCommitRespTopic + Version int16 + func ReadOffsetCommitResp(r io.Reader) (*OffsetCommitResp, error) + func ReadVersionedOffsetCommitResp(r io.Reader, version int16) (*OffsetCommitResp, error) + func (r *OffsetCommitResp) Bytes() ([]byte, error) + type OffsetCommitRespPartition struct + Err error + ID int32 + type OffsetCommitRespTopic struct + Name string + Partitions []OffsetCommitRespPartition + type OffsetFetchReq struct + ConsumerGroup string + Topics []OffsetFetchReqTopic + func ReadOffsetFetchReq(r io.Reader) (*OffsetFetchReq, error) + func (r *OffsetFetchReq) Bytes() ([]byte, error) + func (r *OffsetFetchReq) WriteTo(w io.Writer) (int64, error) + func (r OffsetFetchReq) Kind() int16 + type OffsetFetchReqTopic struct + Name string + Partitions []int32 + type OffsetFetchResp struct + CorrelationID int32 + Err error + ThrottleTime time.Duration + Topics []OffsetFetchRespTopic + Version int16 + func ReadOffsetFetchResp(r io.Reader) (*OffsetFetchResp, error) + func ReadVersionedOffsetFetchResp(r io.Reader, version int16) (*OffsetFetchResp, error) + func (r *OffsetFetchResp) Bytes() ([]byte, error) + type OffsetFetchRespPartition struct + Err error + ID int32 + Metadata string + Offset int64 + type OffsetFetchRespTopic struct + Name string + Partitions []OffsetFetchRespPartition + type OffsetReq struct + IsolationLevel int8 + ReplicaID int32 + Topics []OffsetReqTopic + func ReadOffsetReq(r io.Reader) (*OffsetReq, error) + func (r *OffsetReq) Bytes() ([]byte, error) + func (r *OffsetReq) WriteTo(w io.Writer) (int64, error) + func (r OffsetReq) Kind() int16 + type OffsetReqPartition struct + ID int32 + MaxOffsets int32 + TimeMs int64 + type OffsetReqTopic struct + Name string + Partitions []OffsetReqPartition + type OffsetResp struct + CorrelationID int32 + ThrottleTime time.Duration + Topics []OffsetRespTopic + Version int16 + func ReadOffsetResp(r io.Reader) (*OffsetResp, error) + func ReadVersionedOffsetResp(r io.Reader, version int16) (*OffsetResp, error) + func (r *OffsetResp) Bytes() ([]byte, error) + type OffsetRespPartition struct + Err error + ID int32 + Offsets []int64 + TimeStamp time.Time + type OffsetRespTopic struct + Name string + Partitions []OffsetRespPartition + type ParserConfig struct + SimplifiedMessageSetParsing bool + type ProduceReq struct + Compression Compression + RequiredAcks int16 + Timeout time.Duration + Topics []ProduceReqTopic + TransactionalID string + func ReadProduceReq(r io.Reader) (*ProduceReq, error) + func (r *ProduceReq) Bytes() ([]byte, error) + func (r *ProduceReq) WriteTo(w io.Writer) (int64, error) + func (r ProduceReq) Kind() int16 + type ProduceReqPartition struct + ID int32 + Messages []*Message + type ProduceReqTopic struct + Name string + Partitions []ProduceReqPartition + type ProduceResp struct + CorrelationID int32 + ThrottleTime time.Duration + Topics []ProduceRespTopic + Version int16 + func ReadProduceResp(r io.Reader) (*ProduceResp, error) + func ReadVersionedProduceResp(r io.Reader, version int16) (*ProduceResp, error) + func (r *ProduceResp) Bytes() ([]byte, error) + type ProduceRespPartition struct + Err error + ID int32 + LogAppendTime int64 + Offset int64 + type ProduceRespTopic struct + Name string + Partitions []ProduceRespPartition + type Record struct + Attributes int8 + Headers []RecordHeader + Key []byte + Length int64 + OffsetDelta int64 + TimestampDelta int64 + Value []byte + type RecordBatch struct + Attributes int16 + CRC int32 + FirstOffset int64 + FirstSequence int32 + FirstTimestamp int64 + LastOffsetDelta int32 + Length int32 + Magic int8 + MaxTimestamp int64 + PartitionLeaderEpoch int32 + ProducerEpoch int16 + ProducerId int64 + Records []*Record + func (rb *RecordBatch) Compression() Compression + type RecordHeader struct + Key string + Value []byte + type ReplicaAssignment struct + Partition int32 + Replicas []int32 + type Request interface + Bytes func() ([]byte, error) + GetClientID func() string + GetCorrelationID func() int32 + GetHeader func() *RequestHeader + GetVersion func() int16 + Kind func() int16 + SetClientID func(cliendID string) + type RequestHeader struct + ClientID string + func (h *RequestHeader) GetClientID() string + func (h *RequestHeader) GetCorrelationID() int32 + func (h *RequestHeader) GetHeader() *RequestHeader + func (h *RequestHeader) GetVersion() int16 + func (h *RequestHeader) SetClientID(cliendID string) + type SupportedVersion struct + APIKey int16 + MaxVersion int16 + MinVersion int16 + type TopicError struct + Err error + ErrorCode int16 + ErrorMessage string + Topic string + type TopicInfo struct + ConfigEntries []ConfigEntry + NumPartitions int32 + ReplicaAssignments []ReplicaAssignment + ReplicationFactor int16 + Topic string