protocol

package
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 23, 2020 License: Apache-2.0 Imports: 10 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")

ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.

View Source
var MaxRequestSize int32 = 100 * 1024 * 1024

MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying to send a request larger than this will result in an PacketEncodingError. The default of 100 MiB is aligned with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt to process.

View Source
var MaxResponseSize int32 = 100 * 1024 * 1024

MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to protect the client from running out of memory. Please note that brokers do not have any natural limit on the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers (see https://issues.apache.org/jira/browse/KAFKA-2063).

Functions

func Decode

func Decode(buf []byte, in decoder) error

Decode takes bytes and a Decoder and fills the fields of the decoder from the bytes, interpreted using Kafka's encoding rules.

func Encode

func Encode(e encoder) ([]byte, error)

Encode takes an Encoder and turns it into bytes while potentially recording metrics.

func EncodeSchema

func EncodeSchema(s *Struct, schema Schema) ([]byte, error)

Types

type Bool

type Bool struct{}

type ByteReader added in v0.2.0

type ByteReader struct {
	io.Reader
}

func (ByteReader) ReadByte added in v0.2.0

func (r ByteReader) ReadByte() (byte, error)

type CompactNullableStr added in v0.2.0

type CompactNullableStr struct{}

type CompactStr added in v0.2.0

type CompactStr struct {
}

type EncoderDecoder

type EncoderDecoder interface {
	// contains filtered or unexported methods
}

type Field

type Field interface {
	EncoderDecoder
	GetName() string
}

type Int16

type Int16 struct{}

type Int32

type Int32 struct{}

type KError

type KError int16

KError is the type of error that can be returned directly by the Kafka broker. See http://kafka.apache.org/protocol.html#protocol_error_codes

const (
	ErrNoError                            KError = 0
	ErrUnknown                            KError = -1
	ErrOffsetOutOfRange                   KError = 1
	ErrInvalidMessage                     KError = 2
	ErrUnknownTopicOrPartition            KError = 3
	ErrInvalidMessageSize                 KError = 4
	ErrLeaderNotAvailable                 KError = 5
	ErrNotLeaderForPartition              KError = 6
	ErrRequestTimedOut                    KError = 7
	ErrBrokerNotAvailable                 KError = 8
	ErrReplicaNotAvailable                KError = 9
	ErrMessageSizeTooLarge                KError = 10
	ErrStaleControllerEpochCode           KError = 11
	ErrOffsetMetadataTooLarge             KError = 12
	ErrNetworkException                   KError = 13
	ErrOffsetsLoadInProgress              KError = 14
	ErrConsumerCoordinatorNotAvailable    KError = 15
	ErrNotCoordinatorForConsumer          KError = 16
	ErrInvalidTopic                       KError = 17
	ErrMessageSetSizeTooLarge             KError = 18
	ErrNotEnoughReplicas                  KError = 19
	ErrNotEnoughReplicasAfterAppend       KError = 20
	ErrInvalidRequiredAcks                KError = 21
	ErrIllegalGeneration                  KError = 22
	ErrInconsistentGroupProtocol          KError = 23
	ErrInvalidGroupId                     KError = 24
	ErrUnknownMemberId                    KError = 25
	ErrInvalidSessionTimeout              KError = 26
	ErrRebalanceInProgress                KError = 27
	ErrInvalidCommitOffsetSize            KError = 28
	ErrTopicAuthorizationFailed           KError = 29
	ErrGroupAuthorizationFailed           KError = 30
	ErrClusterAuthorizationFailed         KError = 31
	ErrInvalidTimestamp                   KError = 32
	ErrUnsupportedSASLMechanism           KError = 33
	ErrIllegalSASLState                   KError = 34
	ErrUnsupportedVersion                 KError = 35
	ErrTopicAlreadyExists                 KError = 36
	ErrInvalidPartitions                  KError = 37
	ErrInvalidReplicationFactor           KError = 38
	ErrInvalidReplicaAssignment           KError = 39
	ErrInvalidConfig                      KError = 40
	ErrNotController                      KError = 41
	ErrInvalidRequest                     KError = 42
	ErrUnsupportedForMessageFormat        KError = 43
	ErrPolicyViolation                    KError = 44
	ErrOutOfOrderSequenceNumber           KError = 45
	ErrDuplicateSequenceNumber            KError = 46
	ErrInvalidProducerEpoch               KError = 47
	ErrInvalidTxnState                    KError = 48
	ErrInvalidProducerIDMapping           KError = 49
	ErrInvalidTransactionTimeout          KError = 50
	ErrConcurrentTransactions             KError = 51
	ErrTransactionCoordinatorFenced       KError = 52
	ErrTransactionalIDAuthorizationFailed KError = 53
	ErrSecurityDisabled                   KError = 54
	ErrOperationNotAttempted              KError = 55
	ErrKafkaStorageError                  KError = 56
	ErrLogDirNotFound                     KError = 57
	ErrSASLAuthenticationFailed           KError = 58
	ErrUnknownProducerID                  KError = 59
	ErrReassignmentInProgress             KError = 60
)

Numeric error codes returned by the Kafka server.

func (KError) Error

func (err KError) Error() string

type NullableStr

type NullableStr struct{}

type PacketDecodingError

type PacketDecodingError struct {
	Info string
}

PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.

func (PacketDecodingError) Error

func (err PacketDecodingError) Error() string

type PacketEncodingError

type PacketEncodingError struct {
	Info string
}

PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.

func (PacketEncodingError) Error

func (err PacketEncodingError) Error() string

type ProtocolBody

type ProtocolBody interface {
	// contains filtered or unexported methods
}

type Request

type Request struct {
	CorrelationID int32
	ClientID      string
	Body          ProtocolBody
}

type RequestKeyVersion

type RequestKeyVersion struct {
	Length     int32
	ApiKey     int16
	ApiVersion int16
}

func (*RequestKeyVersion) ResponseHeaderVersion added in v0.2.0

func (r *RequestKeyVersion) ResponseHeaderVersion() int16

Determine response header version. Function returns -1 for unknown api key. See also public short responseHeaderVersion(short _version) in kafka/clients/src/generated/java/org/apache/kafka/common/message/ApiMessageType.java

type ResponseHeader

type ResponseHeader struct {
	Length        int32
	CorrelationID int32
}

type ResponseHeaderTaggedFields added in v0.2.0

type ResponseHeaderTaggedFields struct {
	// contains filtered or unexported fields
}

func NewResponseHeaderTaggedFields added in v0.2.0

func NewResponseHeaderTaggedFields(req *RequestKeyVersion) (*ResponseHeaderTaggedFields, error)

func (*ResponseHeaderTaggedFields) MaybeRead added in v0.2.0

func (r *ResponseHeaderTaggedFields) MaybeRead(reader io.Reader) ([]byte, error)

type ResponseModifier

type ResponseModifier interface {
	Apply(resp []byte) ([]byte, error)
}

func GetResponseModifier

func GetResponseModifier(apiKey int16, apiVersion int16, addressMappingFunc config.NetAddressMappingFunc) (ResponseModifier, error)

type SaslAuthenticateRequestV0 added in v0.0.5

type SaslAuthenticateRequestV0 struct {
	SaslAuthBytes []byte
}

type SaslAuthenticateResponseV0 added in v0.0.5

type SaslAuthenticateResponseV0 struct {
	Err           KError
	ErrMsg        *string
	SaslAuthBytes []byte
}

type SaslHandshakeRequestV0orV1 added in v0.0.5

type SaslHandshakeRequestV0orV1 struct {
	Version   int16 // not encoded / decoded
	Mechanism string
}

type SaslHandshakeResponseV0orV1 added in v0.0.5

type SaslHandshakeResponseV0orV1 struct {
	Err               KError
	EnabledMechanisms []string
}

type Schema

type Schema interface {
	EncoderDecoder
}

func NewSchema

func NewSchema(name string, fs ...Field) Schema

NewSchema creates new schema. It panics when a duplicate field is provided

type SchemaDecodingError

type SchemaDecodingError struct {
	Info string
}

SchemaDecodingError is returned from a failure while decoding a schema .

func (SchemaDecodingError) Error

func (err SchemaDecodingError) Error() string

type SchemaEncodingError

type SchemaEncodingError struct {
	Info string
}

SchemaEncodingError is returned from a failure while encoding a schema .

func (SchemaEncodingError) Error

func (err SchemaEncodingError) Error() string

type Str

type Str struct {
}

type Struct

type Struct struct {
	// contains filtered or unexported fields
}

func DecodeSchema

func DecodeSchema(buf []byte, schema Schema) (*Struct, error)

func (Struct) Get

func (s Struct) Get(name string) interface{}

func (*Struct) Replace

func (s *Struct) Replace(name string, value interface{}) error

func (Struct) String

func (s Struct) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL