protocol

package
v0.3.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 15, 2024 License: Apache-2.0 Imports: 10 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	TypeBool               = &Bool{}
	TypeInt16              = &Int16{}
	TypeInt32              = &Int32{}
	TypeStr                = &Str{}
	TypeNullableStr        = &NullableStr{}
	TypeCompactStr         = &CompactStr{}
	TypeCompactNullableStr = &CompactNullableStr{}
	TypeUuid               = &Uuid{}
)
View Source
var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")

ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.

View Source
var MaxRequestSize int32 = 100 * 1024 * 1024

MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying to send a request larger than this will result in an PacketEncodingError. The default of 100 MiB is aligned with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt to process.

View Source
var MaxResponseSize int32 = 100 * 1024 * 1024

MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to protect the client from running out of memory. Please note that brokers do not have any natural limit on the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers (see https://issues.apache.org/jira/browse/KAFKA-2063).

Functions

func Decode

func Decode(buf []byte, in decoder) error

Decode takes bytes and a Decoder and fills the fields of the decoder from the bytes, interpreted using Kafka's encoding rules.

func Encode

func Encode(e encoder) ([]byte, error)

Encode takes an Encoder and turns it into bytes while potentially recording metrics.

func EncodeSchema

func EncodeSchema(s *Struct, schema Schema) ([]byte, error)

Types

type Array added in v0.2.9

type Array struct {
	Name string
	Ty   Schema
}

func (*Array) GetName added in v0.2.9

func (f *Array) GetName() string

func (*Array) GetSchema added in v0.2.9

func (f *Array) GetSchema() Schema

type Bool

type Bool struct{}

func (*Bool) GetFields added in v0.2.4

func (f *Bool) GetFields() []boundField

func (*Bool) GetFieldsByName added in v0.2.4

func (f *Bool) GetFieldsByName() map[string]*boundField

func (*Bool) GetName added in v0.2.4

func (f *Bool) GetName() string

type ByteReader added in v0.2.0

type ByteReader struct {
	io.Reader
}

func (ByteReader) ReadByte added in v0.2.0

func (r ByteReader) ReadByte() (byte, error)

type CompactArray added in v0.2.9

type CompactArray struct {
	Name string
	Ty   Schema
}

func (*CompactArray) GetName added in v0.2.9

func (f *CompactArray) GetName() string

func (*CompactArray) GetSchema added in v0.2.9

func (f *CompactArray) GetSchema() Schema

type CompactNullableArray added in v0.2.9

type CompactNullableArray struct {
	Name string
	Ty   EncoderDecoder
}

func (*CompactNullableArray) GetName added in v0.2.9

func (f *CompactNullableArray) GetName() string

type CompactNullableStr added in v0.2.0

type CompactNullableStr struct{}

func (*CompactNullableStr) GetFields added in v0.2.4

func (f *CompactNullableStr) GetFields() []boundField

func (*CompactNullableStr) GetFieldsByName added in v0.2.4

func (f *CompactNullableStr) GetFieldsByName() map[string]*boundField

func (*CompactNullableStr) GetName added in v0.2.4

func (f *CompactNullableStr) GetName() string

type CompactStr added in v0.2.0

type CompactStr struct {
}

func (*CompactStr) GetFields added in v0.2.4

func (f *CompactStr) GetFields() []boundField

func (*CompactStr) GetFieldsByName added in v0.2.4

func (f *CompactStr) GetFieldsByName() map[string]*boundField

func (*CompactStr) GetName added in v0.2.4

func (f *CompactStr) GetName() string

type EncoderDecoder

type EncoderDecoder interface {
	// contains filtered or unexported methods
}

type Field

type Field interface {
	EncoderDecoder
	GetName() string
	GetSchema() Schema
}

type Int16

type Int16 struct{}

func (*Int16) GetFields added in v0.2.4

func (f *Int16) GetFields() []boundField

func (*Int16) GetFieldsByName added in v0.2.4

func (f *Int16) GetFieldsByName() map[string]*boundField

func (*Int16) GetName added in v0.2.4

func (f *Int16) GetName() string

type Int32

type Int32 struct{}

func (*Int32) GetFields added in v0.2.4

func (f *Int32) GetFields() []boundField

func (*Int32) GetFieldsByName added in v0.2.4

func (f *Int32) GetFieldsByName() map[string]*boundField

func (*Int32) GetName added in v0.2.4

func (f *Int32) GetName() string

type KError

type KError int16

KError is the type of error that can be returned directly by the Kafka broker. See http://kafka.apache.org/protocol.html#protocol_error_codes

const (
	ErrNoError                            KError = 0
	ErrUnknown                            KError = -1
	ErrOffsetOutOfRange                   KError = 1
	ErrInvalidMessage                     KError = 2
	ErrUnknownTopicOrPartition            KError = 3
	ErrInvalidMessageSize                 KError = 4
	ErrLeaderNotAvailable                 KError = 5
	ErrNotLeaderForPartition              KError = 6
	ErrRequestTimedOut                    KError = 7
	ErrBrokerNotAvailable                 KError = 8
	ErrReplicaNotAvailable                KError = 9
	ErrMessageSizeTooLarge                KError = 10
	ErrStaleControllerEpochCode           KError = 11
	ErrOffsetMetadataTooLarge             KError = 12
	ErrNetworkException                   KError = 13
	ErrOffsetsLoadInProgress              KError = 14
	ErrConsumerCoordinatorNotAvailable    KError = 15
	ErrNotCoordinatorForConsumer          KError = 16
	ErrInvalidTopic                       KError = 17
	ErrMessageSetSizeTooLarge             KError = 18
	ErrNotEnoughReplicas                  KError = 19
	ErrNotEnoughReplicasAfterAppend       KError = 20
	ErrInvalidRequiredAcks                KError = 21
	ErrIllegalGeneration                  KError = 22
	ErrInconsistentGroupProtocol          KError = 23
	ErrInvalidGroupId                     KError = 24
	ErrUnknownMemberId                    KError = 25
	ErrInvalidSessionTimeout              KError = 26
	ErrRebalanceInProgress                KError = 27
	ErrInvalidCommitOffsetSize            KError = 28
	ErrTopicAuthorizationFailed           KError = 29
	ErrGroupAuthorizationFailed           KError = 30
	ErrClusterAuthorizationFailed         KError = 31
	ErrInvalidTimestamp                   KError = 32
	ErrUnsupportedSASLMechanism           KError = 33
	ErrIllegalSASLState                   KError = 34
	ErrUnsupportedVersion                 KError = 35
	ErrTopicAlreadyExists                 KError = 36
	ErrInvalidPartitions                  KError = 37
	ErrInvalidReplicationFactor           KError = 38
	ErrInvalidReplicaAssignment           KError = 39
	ErrInvalidConfig                      KError = 40
	ErrNotController                      KError = 41
	ErrInvalidRequest                     KError = 42
	ErrUnsupportedForMessageFormat        KError = 43
	ErrPolicyViolation                    KError = 44
	ErrOutOfOrderSequenceNumber           KError = 45
	ErrDuplicateSequenceNumber            KError = 46
	ErrInvalidProducerEpoch               KError = 47
	ErrInvalidTxnState                    KError = 48
	ErrInvalidProducerIDMapping           KError = 49
	ErrInvalidTransactionTimeout          KError = 50
	ErrConcurrentTransactions             KError = 51
	ErrTransactionCoordinatorFenced       KError = 52
	ErrTransactionalIDAuthorizationFailed KError = 53
	ErrSecurityDisabled                   KError = 54
	ErrOperationNotAttempted              KError = 55
	ErrKafkaStorageError                  KError = 56
	ErrLogDirNotFound                     KError = 57
	ErrSASLAuthenticationFailed           KError = 58
	ErrUnknownProducerID                  KError = 59
	ErrReassignmentInProgress             KError = 60
)

Numeric error codes returned by the Kafka server.

func (KError) Error

func (err KError) Error() string

type Mfield added in v0.2.9

type Mfield struct {
	Name string
	Ty   Schema
}

func (*Mfield) GetName added in v0.2.9

func (f *Mfield) GetName() string

func (*Mfield) GetSchema added in v0.2.9

func (f *Mfield) GetSchema() Schema

type NullableStr

type NullableStr struct{}

func (*NullableStr) GetFields added in v0.2.4

func (f *NullableStr) GetFields() []boundField

func (*NullableStr) GetFieldsByName added in v0.2.4

func (f *NullableStr) GetFieldsByName() map[string]*boundField

func (*NullableStr) GetName added in v0.2.4

func (f *NullableStr) GetName() string

type PacketDecodingError

type PacketDecodingError struct {
	Info string
}

PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.

func (PacketDecodingError) Error

func (err PacketDecodingError) Error() string

type PacketEncodingError

type PacketEncodingError struct {
	Info string
}

PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.

func (PacketEncodingError) Error

func (err PacketEncodingError) Error() string

type ProtocolBody

type ProtocolBody interface {
	// contains filtered or unexported methods
}

type Request

type Request struct {
	CorrelationID int32
	ClientID      string
	Body          ProtocolBody
}

type RequestAcksReader added in v0.2.5

type RequestAcksReader struct {
}

func (RequestAcksReader) ReadAndDiscardHeaderV1Part added in v0.2.5

func (r RequestAcksReader) ReadAndDiscardHeaderV1Part(reader io.Reader) (err error)

func (RequestAcksReader) ReadAndDiscardProduceAcks added in v0.2.5

func (r RequestAcksReader) ReadAndDiscardProduceAcks(reader io.Reader) (acks int16, err error)

func (RequestAcksReader) ReadAndDiscardProduceTxnAcks added in v0.2.5

func (r RequestAcksReader) ReadAndDiscardProduceTxnAcks(reader io.Reader) (acks int16, err error)

type RequestKeyVersion

type RequestKeyVersion struct {
	Length     int32
	ApiKey     int16
	ApiVersion int16
}

func (*RequestKeyVersion) ResponseHeaderVersion added in v0.2.0

func (r *RequestKeyVersion) ResponseHeaderVersion() int16

Determine response header version. Function returns -1 for unknown api key. See also public short responseHeaderVersion(short _version) in kafka/clients/src/generated/java/org/apache/kafka/common/message/ApiMessageType.java

type RequestV2 added in v0.2.4

type RequestV2 struct {
	CorrelationID int32
	ClientID      string
	TaggedFields  TaggedFields
	Body          ProtocolBody
}

type ResponseHeader

type ResponseHeader struct {
	Length        int32
	CorrelationID int32
}

type ResponseHeaderTaggedFields added in v0.2.0

type ResponseHeaderTaggedFields struct {
	// contains filtered or unexported fields
}

func NewResponseHeaderTaggedFields added in v0.2.0

func NewResponseHeaderTaggedFields(req *RequestKeyVersion) (*ResponseHeaderTaggedFields, error)

func (*ResponseHeaderTaggedFields) MaybeRead added in v0.2.0

func (r *ResponseHeaderTaggedFields) MaybeRead(reader io.Reader) ([]byte, error)

type ResponseHeaderV1 added in v0.2.4

type ResponseHeaderV1 struct {
	Length          int32
	CorrelationID   int32
	RawTaggedFields []rawTaggedField
}

type ResponseModifier

type ResponseModifier interface {
	Apply(resp []byte) ([]byte, error)
}

func GetResponseModifier

func GetResponseModifier(apiKey int16, apiVersion int16, addressMappingFunc config.NetAddressMappingFunc) (ResponseModifier, error)

type SaslAuthenticateRequestV0 added in v0.0.5

type SaslAuthenticateRequestV0 struct {
	SaslAuthBytes []byte
}

type SaslAuthenticateRequestV1 added in v0.2.4

type SaslAuthenticateRequestV1 struct {
	SaslAuthBytes []byte
}

type SaslAuthenticateRequestV2 added in v0.2.4

type SaslAuthenticateRequestV2 struct {
	SaslAuthBytes []byte
	TaggedFields  TaggedFields
}

type SaslAuthenticateResponseV0 added in v0.0.5

type SaslAuthenticateResponseV0 struct {
	Err           KError
	ErrMsg        *string
	SaslAuthBytes []byte
}

type SaslAuthenticateResponseV1 added in v0.2.4

type SaslAuthenticateResponseV1 struct {
	Err               KError
	ErrMsg            *string
	SaslAuthBytes     []byte
	SessionLifetimeMs int64
}

type SaslAuthenticateResponseV2 added in v0.2.4

type SaslAuthenticateResponseV2 struct {
	Err               KError
	ErrMsg            *string
	SaslAuthBytes     []byte
	SessionLifetimeMs int64
	TaggedFields      TaggedFields
}

type SaslHandshakeRequestV0orV1 added in v0.0.5

type SaslHandshakeRequestV0orV1 struct {
	Version   int16 // not encoded / decoded
	Mechanism string
}

type SaslHandshakeResponseV0orV1 added in v0.0.5

type SaslHandshakeResponseV0orV1 struct {
	Err               KError
	EnabledMechanisms []string
}

type Schema

type Schema interface {
	EncoderDecoder
	GetFields() []boundField
	GetFieldsByName() map[string]*boundField
	GetName() string
}

func NewSchema

func NewSchema(name string, fs ...Field) Schema

NewSchema creates new schema. It panics when a duplicate field is provided

type SchemaDecodingError

type SchemaDecodingError struct {
	Info string
}

SchemaDecodingError is returned from a failure while decoding a schema .

func (SchemaDecodingError) Error

func (err SchemaDecodingError) Error() string

type SchemaEncodingError

type SchemaEncodingError struct {
	Info string
}

SchemaEncodingError is returned from a failure while encoding a schema .

func (SchemaEncodingError) Error

func (err SchemaEncodingError) Error() string

type SchemaTaggedFields added in v0.2.9

type SchemaTaggedFields struct {
	Name string
}

func (SchemaTaggedFields) GetName added in v0.2.9

func (f SchemaTaggedFields) GetName() string

func (SchemaTaggedFields) GetSchema added in v0.2.9

func (f SchemaTaggedFields) GetSchema() Schema

type Str

type Str struct {
}

func (*Str) GetFields added in v0.2.4

func (f *Str) GetFields() []boundField

func (*Str) GetFieldsByName added in v0.2.4

func (f *Str) GetFieldsByName() map[string]*boundField

func (*Str) GetName added in v0.2.4

func (f *Str) GetName() string

type Struct

type Struct struct {
	Schema Schema
	Values []interface{}
}

func DecodeSchema

func DecodeSchema(buf []byte, schema Schema) (*Struct, error)

func (Struct) Get

func (s Struct) Get(name string) interface{}

func (*Struct) GetSchema added in v0.2.4

func (s *Struct) GetSchema() Schema

func (*Struct) Replace

func (s *Struct) Replace(name string, value interface{}) error

func (Struct) String

func (s Struct) String() string

type TaggedFields added in v0.2.4

type TaggedFields struct {
	// contains filtered or unexported fields
}

type Uuid added in v0.2.9

type Uuid struct {
}

func (*Uuid) GetFields added in v0.2.9

func (f *Uuid) GetFields() []boundField

func (*Uuid) GetFieldsByName added in v0.2.9

func (f *Uuid) GetFieldsByName() map[string]*boundField

func (*Uuid) GetName added in v0.2.9

func (f *Uuid) GetName() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL