proto

package
v2.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 17, 2020 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package proto provides an implementation of the Kafka binary protocol.

Index

Constants

View Source
// Version numbers. Every constant is an int16 whose value equals the number
// in its name; they are used as the MinVersion/MaxVersion bounds in
// SupportedByDriver.
const (
	KafkaV0 int16 = 0
	KafkaV1 int16 = 1
	KafkaV2 int16 = 2
	KafkaV3 int16 = 3
	KafkaV4 int16 = 4
	KafkaV5 int16 = 5
)
View Source
// Request kind identifiers (untyped integer constants). They are the keys of
// SupportedByDriver, which has no entry for CreateTopicsReqKind or
// DeleteTopicReqKind. Presumably these are also the values reported by
// ReadReq and by each request type's Kind method — TODO confirm.
//
// NOTE(review): DeleteTopicReqKind is singular while the corresponding
// request type is named DeleteTopicsReq.
const (
	ProduceReqKind          = 0
	FetchReqKind            = 1
	OffsetReqKind           = 2
	MetadataReqKind         = 3
	OffsetCommitReqKind     = 8
	OffsetFetchReqKind      = 9
	ConsumerMetadataReqKind = 10
	APIVersionsReqKind      = 18
	CreateTopicsReqKind     = 19
	DeleteTopicReqKind      = 20
)
View Source
// Special offset-request times (presumably for OffsetReqPartition.TimeMs —
// TODO confirm).
const (
	// OffsetReqTimeLatest asks for the latest offset, i.e. the offset of the
	// next coming message.
	OffsetReqTimeLatest = -1

	// OffsetReqTimeEarliest asks for the earliest available offset. Because
	// offsets are pulled in descending order, asking for the earliest offset
	// always returns a single element.
	OffsetReqTimeEarliest = -2
)

// Acknowledgement levels (presumably for ProduceReq.RequiredAcks — TODO
// confirm).
const (
	// RequiredAcksAll makes the server block until the message is committed
	// by all in-sync replicas before sending a response.
	RequiredAcksAll = -1

	// RequiredAcksNone means the server will not send any response.
	RequiredAcksNone = 0

	// RequiredAcksLocal makes the server wait until the data is written to
	// the local log before sending a response.
	RequiredAcksLocal = 1
)
View Source
// Coordinator type values, presumably intended for
// ConsumerMetadataReq.CoordinatorType (an int8) — TODO confirm.
//
// NOTE(review): only CorrelationTypeGroup is typed int8. Because
// CorrelationTypeTransaction has its own expression and no explicit type, it
// is an untyped integer constant, so the two constants do not share a type.
const (
	CorrelationTypeGroup       int8 = 0
	CorrelationTypeTransaction      = 1
)

Variables

View Source
// Predefined protocol errors, one *KafkaError per numeric error code. Each
// literal gives the code followed by the human-readable message (presumably
// what Errno and Error report — TODO confirm against KafkaError).
//
// ErrScaleControllerEpoch (code 11, the stale-controller-epoch error) and
// ErrUnknownParititonAssignmentStrategy (code 24) keep their misspelled
// exported names so that existing callers continue to compile.
var (
	ErrUnknown                                 = &KafkaError{-1, "unknown error"}
	ErrOffsetOutOfRange                        = &KafkaError{1, "offset out of range"}
	ErrInvalidMessage                          = &KafkaError{2, "invalid message"}
	ErrUnknownTopicOrPartition                 = &KafkaError{3, "unknown topic or partition"}
	ErrInvalidMessageSize                      = &KafkaError{4, "invalid message size"}
	ErrLeaderNotAvailable                      = &KafkaError{5, "leader not available"}
	ErrNotLeaderForPartition                   = &KafkaError{6, "not leader for partition"}
	ErrRequestTimeout                          = &KafkaError{7, "request timed out"}
	ErrBrokerNotAvailable                      = &KafkaError{8, "broker not available"}
	ErrReplicaNotAvailable                     = &KafkaError{9, "replica not available"}
	ErrMessageSizeTooLarge                     = &KafkaError{10, "message size too large"}
	ErrScaleControllerEpoch                    = &KafkaError{11, "stale controller epoch"}
	ErrOffsetMetadataTooLarge                  = &KafkaError{12, "offset metadata too large"}
	ErrNetwork                                 = &KafkaError{13, "server disconnected before response was received"}
	ErrOffsetLoadInProgress                    = &KafkaError{14, "offsets load in progress"}
	ErrNoCoordinator                           = &KafkaError{15, "consumer coordinator not available"}
	ErrNotCoordinator                          = &KafkaError{16, "not coordinator for consumer"}
	ErrInvalidTopic                            = &KafkaError{17, "operation on an invalid topic"}
	ErrRecordListTooLarge                      = &KafkaError{18, "message batch larger than the configured segment size"}
	ErrNotEnoughReplicas                       = &KafkaError{19, "not enough in-sync replicas"}
	ErrNotEnoughReplicasAfterAppend            = &KafkaError{20, "messages are written to the log, but to fewer in-sync replicas than required"}
	ErrInvalidRequiredAcks                     = &KafkaError{21, "invalid value for required acks"}
	ErrIllegalGeneration                       = &KafkaError{22, "consumer generation id is not valid"}
	ErrInconsistentPartitionAssignmentStrategy = &KafkaError{23, "partition assignment strategy does not match that of the group"}
	ErrUnknownParititonAssignmentStrategy      = &KafkaError{24, "partition assignment strategy is unknown to the broker"}
	ErrUnknownConsumerID                       = &KafkaError{25, "coordinator is not aware of this consumer"}
	ErrInvalidSessionTimeout                   = &KafkaError{26, "invalid session timeout"}
	ErrRebalanceInProgress                     = &KafkaError{27, "group is rebalancing, so a rejoin is needed"}
	ErrInvalidCommitOffsetSize                 = &KafkaError{28, "offset data size is not valid"}
	ErrTopicAuthorizationFailed                = &KafkaError{29, "topic authorization failed"}
	ErrGroupAuthorizationFailed                = &KafkaError{30, "group authorization failed"}
	ErrClusterAuthorizationFailed              = &KafkaError{31, "cluster authorization failed"}
	ErrInvalidTimeStamp                        = &KafkaError{32, "timestamp of the message is out of acceptable range"}
	ErrUnsupportedSaslMechanism                = &KafkaError{33, "The broker does not support the requested SASL mechanism."}
	ErrIllegalSaslState                        = &KafkaError{34, "Request is not valid given the current SASL state."}
	ErrUnsupportedVersion                      = &KafkaError{35, "The version of API is not supported."}
	ErrTopicAlreadyExists                      = &KafkaError{36, "Topic with this name already exists."}
	ErrInvalidPartitions                       = &KafkaError{37, "Number of partitions is invalid."}
	ErrInvalidReplicationFactor                = &KafkaError{38, "Replication-factor is invalid."}
	ErrInvalidReplicaAssignment                = &KafkaError{39, "Replica assignment is invalid."}
	ErrInvalidConfig                           = &KafkaError{40, "Configuration is invalid."}
	ErrNotController                           = &KafkaError{41, "This is not the correct controller for this cluster."}
	ErrInvalidRequest                          = &KafkaError{42, "This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. See the broker logs for more details."}
	ErrUnsupportedForMessageFormat             = &KafkaError{43, "The message format version on the broker does not support the request."}
	ErrPolicyViolation                         = &KafkaError{44, "Request parameters do not satisfy the configured policy."}
	ErrOutOfOrderSequenceNumber                = &KafkaError{45, "The broker received an out of order sequence number"}
	ErrDuplicateSequenceNumber                 = &KafkaError{46, "The broker received a duplicate sequence number"}
	ErrInvalidProducerEpoch                    = &KafkaError{47, "Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker."}
	ErrInvalidTxnState                         = &KafkaError{48, "The producer attempted a transactional operation in an invalid state"}
	ErrInvalidProducerIdMapping                = &KafkaError{49, "The producer attempted to use a producer id which is not currently assigned to its transactional id"}
	ErrInvalidTransactionTimeout               = &KafkaError{50, "The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms)."}
	ErrConcurrentTransactions                  = &KafkaError{51, "The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing"}
	ErrTransactionCoordinatorFenced            = &KafkaError{52, "Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer"}
	ErrTransactionalIdAuthorizationFailed      = &KafkaError{53, "Transactional Id authorization failed"}
	ErrSecurityDisabled                        = &KafkaError{54, "Security features are disabled."}
	ErrOperationNotAttempted                   = &KafkaError{55, "The broker did not attempt to execute this operation. This may happen for batched RPCs where some operations in the batch failed, causing the broker to respond without trying the rest."}
	ErrKafkaStorageError                       = &KafkaError{56, "Disk error when trying to access log file on the disk."}
	ErrLogDirNotFound                          = &KafkaError{57, "The user-specified log directory is not found in the broker config."}
	ErrSaslAuthenticationFailed                = &KafkaError{58, "SASL Authentication failed."}
	ErrUnknownProducerId                       = &KafkaError{59, "This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception."}
	ErrReassignmentInProgress                  = &KafkaError{60, "A partition reassignment is in progress"}
	ErrDelegationTokenAuthDisabled             = &KafkaError{61, "Delegation Token feature is not enabled."}
	ErrDelegationTokenNotFound                 = &KafkaError{62, "Delegation Token is not found on server."}
	ErrDelegationTokenOwnerMismatch            = &KafkaError{63, "Specified Principal is not valid Owner/Renewer."}
	ErrDelegationTokenRequestNotAllowed        = &KafkaError{64, "Delegation Token requests are not allowed on PLAINTEXT/1-way SSL channels and on delegation token authenticated channels."}
	ErrDelegationTokenAuthorizationFailed      = &KafkaError{65, "Delegation Token authorization failed."}
	ErrDelegationTokenExpired                  = &KafkaError{66, "Delegation Token is expired."}
	ErrInvalidPrincipalType                    = &KafkaError{67, "Supplied principalType is not supported"}
	ErrNonEmptyGroup                           = &KafkaError{68, "The group is not empty"}
	ErrGroupIdNotFound                         = &KafkaError{69, "The group id does not exist"}
	ErrFetchSessionIdNotFound                  = &KafkaError{70, "The fetch session ID was not found"}
	ErrInvalidFetchSessionEpoch                = &KafkaError{71, "The fetch session epoch is invalid"}
)
View Source
// ErrInvalidArrayLen is a sentinel error carrying the message
// "invalid array length".
var ErrInvalidArrayLen = errors.New("invalid array length")
View Source
// ErrNotEnoughData is a sentinel error carrying the message
// "not enough data".
var ErrNotEnoughData = errors.New("not enough data")
View Source
// SupportedByDriver maps a request kind to the minimum and maximum version
// recorded for it. Only MinVersion and MaxVersion are set; APIKey is left at
// its zero value in every entry.
var SupportedByDriver = map[int16]SupportedVersion{
	ProduceReqKind:          {MinVersion: KafkaV0, MaxVersion: KafkaV2},
	FetchReqKind:            {MinVersion: KafkaV0, MaxVersion: KafkaV5},
	OffsetReqKind:           {MinVersion: KafkaV0, MaxVersion: KafkaV2},
	MetadataReqKind:         {MinVersion: KafkaV0, MaxVersion: KafkaV5},
	OffsetCommitReqKind:     {MinVersion: KafkaV0, MaxVersion: KafkaV3},
	OffsetFetchReqKind:      {MinVersion: KafkaV0, MaxVersion: KafkaV3},
	ConsumerMetadataReqKind: {MinVersion: KafkaV0, MaxVersion: KafkaV1},
	APIVersionsReqKind:      {MinVersion: KafkaV0, MaxVersion: KafkaV1},
}

Functions

func ComputeCrc

func ComputeCrc(m *Message, compression Compression) uint32

ComputeCrc returns crc32 hash for given message content.

func ConfigureParser

func ConfigureParser(c ParserConfig) error

ConfigureParser configures the parser. It must be called prior to parsing any messages as the structure is currently not prepared for concurrent access.

func NewDecoder

func NewDecoder(r io.Reader) *decoder

func NewEncoder

func NewEncoder(w io.Writer) *encoder

func ReadReq

func ReadReq(r io.Reader) (requestKind int16, b []byte, err error)

ReadReq returns request kind ID and byte representation of the whole message in wire protocol format.

func ReadResp

func ReadResp(r io.Reader) (correlationID int32, b []byte, err error)

ReadResp returns the message correlation ID and the byte representation of the whole message in wire protocol format, as read from the given stream, including the 4 bytes of the message size itself. The byte representation returned by ReadResp can be parsed by all response readers to transform it into a specialized response structure.

func SetCorrelationID

func SetCorrelationID(header *RequestHeader, correlationID int32)

func SetVersion

func SetVersion(header *RequestHeader, version int16)

Types

type APIVersionsReq

type APIVersionsReq struct {
	RequestHeader
}

func ReadAPIVersionsReq

func ReadAPIVersionsReq(r io.Reader) (*APIVersionsReq, error)

func (*APIVersionsReq) Bytes

func (r *APIVersionsReq) Bytes() ([]byte, error)

func (APIVersionsReq) Kind

func (r APIVersionsReq) Kind() int16

func (*APIVersionsReq) WriteTo

func (r *APIVersionsReq) WriteTo(w io.Writer) (int64, error)

type APIVersionsResp

type APIVersionsResp struct {
	Version       int16
	CorrelationID int32
	APIVersions   []SupportedVersion
	ThrottleTime  time.Duration
}

func ReadAPIVersionsResp

func ReadAPIVersionsResp(r io.Reader) (*APIVersionsResp, error)

func ReadVersionedAPIVersionsResp

func ReadVersionedAPIVersionsResp(r io.Reader, version int16) (*APIVersionsResp, error)

func (*APIVersionsResp) Bytes

func (r *APIVersionsResp) Bytes() ([]byte, error)

type Compression

// Compression is an int8 code identifying a compression setting.
type Compression int8

// Compression codes: none is 0, gzip is 1, snappy is 2.
const (
	CompressionNone Compression = iota
	CompressionGzip
	CompressionSnappy
)

type ConfigEntry

type ConfigEntry struct {
	ConfigName  string
	ConfigValue string
}

type ConsumerMetadataReq

type ConsumerMetadataReq struct {
	RequestHeader
	ConsumerGroup   string
	CoordinatorType int8 // >= KafkaV1
}

func ReadConsumerMetadataReq

func ReadConsumerMetadataReq(r io.Reader) (*ConsumerMetadataReq, error)

func (*ConsumerMetadataReq) Bytes

func (r *ConsumerMetadataReq) Bytes() ([]byte, error)

func (ConsumerMetadataReq) Kind

func (r ConsumerMetadataReq) Kind() int16

func (*ConsumerMetadataReq) WriteTo

func (r *ConsumerMetadataReq) WriteTo(w io.Writer) (int64, error)

type ConsumerMetadataResp

type ConsumerMetadataResp struct {
	Version         int16
	CorrelationID   int32
	ThrottleTime    time.Duration // >= KafkaV1
	Err             error
	ErrMsg          string // >= KafkaV1
	CoordinatorID   int32
	CoordinatorHost string
	CoordinatorPort int32
}

func ReadConsumerMetadataResp

func ReadConsumerMetadataResp(r io.Reader) (*ConsumerMetadataResp, error)

func ReadVersionedConsumerMetadataResp

func ReadVersionedConsumerMetadataResp(r io.Reader, version int16) (*ConsumerMetadataResp, error)

func (*ConsumerMetadataResp) Bytes

func (r *ConsumerMetadataResp) Bytes() ([]byte, error)

type CreateTopicsReq

type CreateTopicsReq struct {
	RequestHeader
	CreateTopicsRequests []TopicInfo
	Timeout              time.Duration
	ValidateOnly         bool
}

func ReadCreateTopicsReq

func ReadCreateTopicsReq(r io.Reader) (*CreateTopicsReq, error)

func (*CreateTopicsReq) Bytes

func (r *CreateTopicsReq) Bytes() ([]byte, error)

func (CreateTopicsReq) Kind

func (r CreateTopicsReq) Kind() int16

func (*CreateTopicsReq) WriteTo

func (r *CreateTopicsReq) WriteTo(w io.Writer) (int64, error)

type CreateTopicsResp

type CreateTopicsResp struct {
	Version       int16
	CorrelationID int32
	TopicErrors   []TopicError
	ThrottleTime  time.Duration // >= KafkaV2
}

func ReadCreateTopicsResp

func ReadCreateTopicsResp(r io.Reader) (*CreateTopicsResp, error)

func ReadVersionedCreateTopicsResp

func ReadVersionedCreateTopicsResp(r io.Reader, version int16) (*CreateTopicsResp, error)

func (*CreateTopicsResp) Bytes

func (r *CreateTopicsResp) Bytes() ([]byte, error)

type DeleteTopicError

type DeleteTopicError struct {
	Topic     string
	ErrorCode int16
}

type DeleteTopicsReq

type DeleteTopicsReq struct {
	RequestHeader
	Topics  []string
	Timeout time.Duration
}

func ReadDeleteTopicsReq

func ReadDeleteTopicsReq(r io.Reader) (*DeleteTopicsReq, error)

func (*DeleteTopicsReq) Bytes

func (r *DeleteTopicsReq) Bytes() ([]byte, error)

func (DeleteTopicsReq) Kind

func (r DeleteTopicsReq) Kind() int16

func (*DeleteTopicsReq) WriteTo

func (r *DeleteTopicsReq) WriteTo(w io.Writer) (int64, error)

type DeleteTopicsResp

// DeleteTopicsResp holds the per-topic DeleteTopicError entries of a response
// together with a throttle time.
//
// NOTE(review): unlike the other *Resp structs in this listing, this one has
// no Version or CorrelationID field, and no ReadVersioned reader is listed
// for it — confirm how the ">= KafkaV2" ThrottleTime is selected.
type DeleteTopicsResp struct {
	TopicErrors  []DeleteTopicError
	ThrottleTime time.Duration // >= KafkaV2
}

func ReadDeleteTopicsResp

func ReadDeleteTopicsResp(r io.Reader) (*DeleteTopicsResp, error)

func (*DeleteTopicsResp) Bytes

func (r *DeleteTopicsResp) Bytes() ([]byte, error)

type FetchReq

type FetchReq struct {
	RequestHeader
	ReplicaID      int32
	MaxWaitTime    time.Duration
	MinBytes       int32
	MaxBytes       int32 // >= KafkaV3
	IsolationLevel int8  // >= KafkaV4

	Topics []FetchReqTopic
}

func ReadFetchReq

func ReadFetchReq(r io.Reader) (*FetchReq, error)

func (*FetchReq) Bytes

func (r *FetchReq) Bytes() ([]byte, error)

func (FetchReq) Kind

func (r FetchReq) Kind() int16

func (*FetchReq) WriteTo

func (r *FetchReq) WriteTo(w io.Writer) (int64, error)

type FetchReqPartition

type FetchReqPartition struct {
	ID             int32
	FetchOffset    int64
	LogStartOffset int64 // >= KafkaV5
	MaxBytes       int32
}

type FetchReqTopic

type FetchReqTopic struct {
	Name       string
	Partitions []FetchReqPartition
}

type FetchResp

type FetchResp struct {
	Version       int16
	CorrelationID int32
	ThrottleTime  time.Duration
	Topics        []FetchRespTopic
}

func ReadFetchResp

func ReadFetchResp(r io.Reader) (*FetchResp, error)

func ReadVersionedFetchResp

func ReadVersionedFetchResp(r io.Reader, version int16) (*FetchResp, error)

func (*FetchResp) Bytes

func (r *FetchResp) Bytes() ([]byte, error)

type FetchRespAbortedTransaction

type FetchRespAbortedTransaction struct {
	ProducerID  int64
	FirstOffset int64
}

type FetchRespPartition

// FetchRespPartition is the element type of FetchRespTopic.Partitions.
//
// MessageVersion records the message format. The MessageVersion documentation
// states that MessageV0 and MessageV1 indicate a MessageSet and the newest
// version indicates a RecordBatch; presumably Messages is populated in the
// former case and RecordBatches in the latter — TODO confirm.
type FetchRespPartition struct {
	ID                  int32
	Err                 error
	TipOffset           int64
	LastStableOffset    int64
	LogStartOffset      int64
	AbortedTransactions []FetchRespAbortedTransaction
	Messages            []*Message
	MessageVersion      MessageVersion
	RecordBatches       []*RecordBatch
}

type FetchRespTopic

type FetchRespTopic struct {
	Name       string
	Partitions []FetchRespPartition
}

type KafkaError

type KafkaError struct {
	// contains filtered or unexported fields
}

func (*KafkaError) Errno

func (err *KafkaError) Errno() int

func (*KafkaError) Error

func (err *KafkaError) Error() string

type Message

// Message is a single entry of a message set. The per-field comments state
// which fields are filled in when fetching and which are ignored when
// producing.
type Message struct {
	Key       []byte
	Value     []byte
	Offset    int64  // set when fetching and after successful producing
	Crc       uint32 // set when fetching, ignored when producing
	Topic     string // set when fetching, ignored when producing
	Partition int32  // set when fetching, ignored when producing
	TipOffset int64  // set when fetching, ignored when producing
}

Message represents a single entry of a message set.

type MessageVersion

type MessageVersion int8

MessageVersion defines which message format is used in a particular produce request or fetch response. MessageV0 and MessageV1 indicate usage of MessageSet; MessageV2 indicates usage of RecordBatch. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets

// Message format versions, numbered 0 through 2.
const (
	MessageV0 MessageVersion = 0
	MessageV1 MessageVersion = 1
	MessageV2 MessageVersion = 2
)

type MetadataReq

type MetadataReq struct {
	RequestHeader
	Topics                 []string
	AllowAutoTopicCreation bool // >= KafkaV4 only
}

func ReadMetadataReq

func ReadMetadataReq(r io.Reader) (*MetadataReq, error)

func (*MetadataReq) Bytes

func (r *MetadataReq) Bytes() ([]byte, error)

func (MetadataReq) Kind

func (r MetadataReq) Kind() int16

func (*MetadataReq) WriteTo

func (r *MetadataReq) WriteTo(w io.Writer) (int64, error)

type MetadataResp

type MetadataResp struct {
	Version       int16
	CorrelationID int32
	ThrottleTime  time.Duration // >= KafkaV3
	Brokers       []MetadataRespBroker
	ClusterID     string // >= KafkaV2
	ControllerID  int32  // >= KafkaV1
	Topics        []MetadataRespTopic
}

func ReadMetadataResp

func ReadMetadataResp(r io.Reader) (*MetadataResp, error)

func ReadVersionedMetadataResp

func ReadVersionedMetadataResp(r io.Reader, version int16) (*MetadataResp, error)

func (*MetadataResp) Bytes

func (r *MetadataResp) Bytes() ([]byte, error)

type MetadataRespBroker

type MetadataRespBroker struct {
	NodeID int32
	Host   string
	Port   int32
	Rack   string // >= KafkaV1
}

type MetadataRespPartition

type MetadataRespPartition struct {
	Err             error
	ID              int32
	Leader          int32
	Replicas        []int32
	Isrs            []int32
	OfflineReplicas []int32
}

type MetadataRespTopic

type MetadataRespTopic struct {
	Name       string
	Err        error
	IsInternal bool // >= KafkaV1
	Partitions []MetadataRespPartition
}

type OffsetCommitReq

type OffsetCommitReq struct {
	RequestHeader
	ConsumerGroup     string
	GroupGenerationID int32  // >= KafkaV1 only
	MemberID          string // >= KafkaV1 only
	RetentionTime     int64  // >= KafkaV2 only
	Topics            []OffsetCommitReqTopic
}

func ReadOffsetCommitReq

func ReadOffsetCommitReq(r io.Reader) (*OffsetCommitReq, error)

func (*OffsetCommitReq) Bytes

func (r *OffsetCommitReq) Bytes() ([]byte, error)

func (OffsetCommitReq) Kind

func (r OffsetCommitReq) Kind() int16

func (*OffsetCommitReq) WriteTo

func (r *OffsetCommitReq) WriteTo(w io.Writer) (int64, error)

type OffsetCommitReqPartition

type OffsetCommitReqPartition struct {
	ID        int32
	Offset    int64
	TimeStamp time.Time // == KafkaV1 only
	Metadata  string
}

type OffsetCommitReqTopic

type OffsetCommitReqTopic struct {
	Name       string
	Partitions []OffsetCommitReqPartition
}

type OffsetCommitResp

type OffsetCommitResp struct {
	Version       int16
	CorrelationID int32
	ThrottleTime  time.Duration // >= KafkaV3 only
	Topics        []OffsetCommitRespTopic
}

func ReadOffsetCommitResp

func ReadOffsetCommitResp(r io.Reader) (*OffsetCommitResp, error)

func ReadVersionedOffsetCommitResp

func ReadVersionedOffsetCommitResp(r io.Reader, version int16) (*OffsetCommitResp, error)

func (*OffsetCommitResp) Bytes

func (r *OffsetCommitResp) Bytes() ([]byte, error)

type OffsetCommitRespPartition

type OffsetCommitRespPartition struct {
	ID  int32
	Err error
}

type OffsetCommitRespTopic

type OffsetCommitRespTopic struct {
	Name       string
	Partitions []OffsetCommitRespPartition
}

type OffsetFetchReq

type OffsetFetchReq struct {
	RequestHeader
	ConsumerGroup string
	Topics        []OffsetFetchReqTopic
}

func ReadOffsetFetchReq

func ReadOffsetFetchReq(r io.Reader) (*OffsetFetchReq, error)

func (*OffsetFetchReq) Bytes

func (r *OffsetFetchReq) Bytes() ([]byte, error)

func (OffsetFetchReq) Kind

func (r OffsetFetchReq) Kind() int16

func (*OffsetFetchReq) WriteTo

func (r *OffsetFetchReq) WriteTo(w io.Writer) (int64, error)

type OffsetFetchReqTopic

type OffsetFetchReqTopic struct {
	Name       string
	Partitions []int32
}

type OffsetFetchResp

type OffsetFetchResp struct {
	Version       int16
	CorrelationID int32
	ThrottleTime  time.Duration // >= KafkaV3
	Topics        []OffsetFetchRespTopic
	Err           error // >= KafkaV2
}

func ReadOffsetFetchResp

func ReadOffsetFetchResp(r io.Reader) (*OffsetFetchResp, error)

func ReadVersionedOffsetFetchResp

func ReadVersionedOffsetFetchResp(r io.Reader, version int16) (*OffsetFetchResp, error)

func (*OffsetFetchResp) Bytes

func (r *OffsetFetchResp) Bytes() ([]byte, error)

type OffsetFetchRespPartition

type OffsetFetchRespPartition struct {
	ID       int32
	Offset   int64
	Metadata string
	Err      error
}

type OffsetFetchRespTopic

type OffsetFetchRespTopic struct {
	Name       string
	Partitions []OffsetFetchRespPartition
}

type OffsetReq

type OffsetReq struct {
	RequestHeader
	ReplicaID      int32
	IsolationLevel int8
	Topics         []OffsetReqTopic
}

func ReadOffsetReq

func ReadOffsetReq(r io.Reader) (*OffsetReq, error)

func (*OffsetReq) Bytes

func (r *OffsetReq) Bytes() ([]byte, error)

func (OffsetReq) Kind

func (r OffsetReq) Kind() int16

func (*OffsetReq) WriteTo

func (r *OffsetReq) WriteTo(w io.Writer) (int64, error)

type OffsetReqPartition

// OffsetReqPartition is the element type of OffsetReqTopic.Partitions.
//
// TimeMs is a plain int64 because negative values must be representable;
// presumably these are the OffsetReqTimeLatest (-1) and OffsetReqTimeEarliest
// (-2) constants — TODO confirm.
type OffsetReqPartition struct {
	ID         int32
	TimeMs     int64 // cannot be time.Time because of negative values
	MaxOffsets int32 // == KafkaV0 only
}

type OffsetReqTopic

type OffsetReqTopic struct {
	Name       string
	Partitions []OffsetReqPartition
}

type OffsetResp

type OffsetResp struct {
	Version       int16
	CorrelationID int32
	ThrottleTime  time.Duration
	Topics        []OffsetRespTopic
}

func ReadOffsetResp

func ReadOffsetResp(r io.Reader) (*OffsetResp, error)

func ReadVersionedOffsetResp

func ReadVersionedOffsetResp(r io.Reader, version int16) (*OffsetResp, error)

func (*OffsetResp) Bytes

func (r *OffsetResp) Bytes() ([]byte, error)

type OffsetRespPartition

type OffsetRespPartition struct {
	ID        int32
	Err       error
	TimeStamp time.Time // >= KafkaV1 only
	Offsets   []int64   // used in KafkaV0
}

type OffsetRespTopic

type OffsetRespTopic struct {
	Name       string
	Partitions []OffsetRespPartition
}

type ParserConfig

// ParserConfig holds optional parser settings; it is the argument type of
// ConfigureParser.
type ParserConfig struct {
	// SimplifiedMessageSetParsing enables a simplified version of the
	// MessageSet parser which does not split a MessageSet into slices of
	// Message structures. Instead, the entire MessageSet is read over. This
	// mode improves parsing speed through reduced memory reads, at the cost
	// of not providing access to the message payload after parsing.
	SimplifiedMessageSetParsing bool
}

ParserConfig is the optional configuration for the parser. It is applied by passing it to ConfigureParser.

type ProduceReq

type ProduceReq struct {
	RequestHeader
	Compression     Compression // only used when sending ProduceReqs
	TransactionalID string
	RequiredAcks    int16
	Timeout         time.Duration
	Topics          []ProduceReqTopic
}

func ReadProduceReq

func ReadProduceReq(r io.Reader) (*ProduceReq, error)

func (*ProduceReq) Bytes

func (r *ProduceReq) Bytes() ([]byte, error)

func (ProduceReq) Kind

func (r ProduceReq) Kind() int16

func (*ProduceReq) WriteTo

func (r *ProduceReq) WriteTo(w io.Writer) (int64, error)

type ProduceReqPartition

type ProduceReqPartition struct {
	ID       int32
	Messages []*Message
}

type ProduceReqTopic

type ProduceReqTopic struct {
	Name       string
	Partitions []ProduceReqPartition
}

type ProduceResp

type ProduceResp struct {
	Version       int16
	CorrelationID int32
	Topics        []ProduceRespTopic
	ThrottleTime  time.Duration
}

func ReadProduceResp

func ReadProduceResp(r io.Reader) (*ProduceResp, error)

func ReadVersionedProduceResp

func ReadVersionedProduceResp(r io.Reader, version int16) (*ProduceResp, error)

func (*ProduceResp) Bytes

func (r *ProduceResp) Bytes() ([]byte, error)

type ProduceRespPartition

type ProduceRespPartition struct {
	ID            int32
	Err           error
	Offset        int64
	LogAppendTime int64
}

type ProduceRespTopic

type ProduceRespTopic struct {
	Name       string
	Partitions []ProduceRespPartition
}

type Record

type Record struct {
	Length         int64
	Attributes     int8
	TimestampDelta int64
	OffsetDelta    int64
	Key            []byte
	Value          []byte
	Headers        []RecordHeader
}

type RecordBatch

type RecordBatch struct {
	FirstOffset          int64
	Length               int32
	PartitionLeaderEpoch int32
	Magic                int8
	CRC                  int32
	Attributes           int16
	LastOffsetDelta      int32
	FirstTimestamp       int64
	MaxTimestamp         int64
	ProducerId           int64
	ProducerEpoch        int16
	FirstSequence        int32
	Records              []*Record
}

func (*RecordBatch) Compression

func (rb *RecordBatch) Compression() Compression

type RecordHeader

type RecordHeader struct {
	Key   string
	Value []byte
}

type ReplicaAssignment

type ReplicaAssignment struct {
	Partition int32
	Replicas  []int32
}

type Request

// Request is the common interface of request messages.
//
// Kind returns the request kind as an int16, and Bytes returns the request as
// a byte slice or an error. The interface also embeds io.WriterTo. The
// GetHeader, GetVersion, GetCorrelationID, GetClientID and SetClientID
// methods have the same signatures as the methods listed on *RequestHeader,
// which the request structs embed.
type Request interface {
	Kind() int16
	GetHeader() *RequestHeader
	GetVersion() int16
	GetCorrelationID() int32
	GetClientID() string
	SetClientID(clientID string)
	io.WriterTo
	Bytes() ([]byte, error)
}

type RequestHeader

type RequestHeader struct {
	ClientID string
	// contains filtered or unexported fields
}

func (*RequestHeader) GetClientID

func (h *RequestHeader) GetClientID() string

func (*RequestHeader) GetCorrelationID

func (h *RequestHeader) GetCorrelationID() int32

func (*RequestHeader) GetHeader

func (h *RequestHeader) GetHeader() *RequestHeader

func (*RequestHeader) GetVersion

func (h *RequestHeader) GetVersion() int16

func (*RequestHeader) SetClientID

func (h *RequestHeader) SetClientID(cliendID string)

type SupportedVersion

// SupportedVersion holds a minimum and a maximum version for one API key. It
// is the element type of APIVersionsResp.APIVersions and the value type of
// SupportedByDriver (whose entries set only MinVersion and MaxVersion).
type SupportedVersion struct {
	APIKey     int16
	MinVersion int16
	MaxVersion int16
}

type TopicError

type TopicError struct {
	Topic        string
	ErrorCode    int16
	ErrorMessage string // >= KafkaV1
	Err          error
}

type TopicInfo

type TopicInfo struct {
	Topic              string
	NumPartitions      int32
	ReplicationFactor  int16
	ReplicaAssignments []ReplicaAssignment
	ConfigEntries      []ConfigEntry
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL