Documentation ¶
Index ¶
- Variables
- func Decode(buf []byte, in decoder) error
- func Encode(e encoder) ([]byte, error)
- func EncodeSchema(s *Struct, schema Schema) ([]byte, error)
- type Array
- type Bool
- type ByteReader
- type CompactArray
- type CompactNullableArray
- type CompactNullableStr
- type CompactStr
- type EncoderDecoder
- type Field
- type Int16
- type Int32
- type KError
- type Mfield
- type NullableStr
- type PacketDecodingError
- type PacketEncodingError
- type ProtocolBody
- type Request
- type RequestAcksReader
- type RequestKeyVersion
- type RequestV2
- type ResponseHeader
- type ResponseHeaderTaggedFields
- type ResponseHeaderV1
- type ResponseModifier
- type SaslAuthenticateRequestV0
- type SaslAuthenticateRequestV1
- type SaslAuthenticateRequestV2
- type SaslAuthenticateResponseV0
- type SaslAuthenticateResponseV1
- type SaslAuthenticateResponseV2
- type SaslHandshakeRequestV0orV1
- type SaslHandshakeResponseV0orV1
- type Schema
- type SchemaDecodingError
- type SchemaEncodingError
- type SchemaTaggedFields
- type Str
- type Struct
- type TaggedFields
- type Uuid
Constants ¶
This section is empty.
Variables ¶
var (
    TypeBool               = &Bool{}
    TypeInt16              = &Int16{}
    TypeInt32              = &Int32{}
    TypeStr                = &Str{}
    TypeNullableStr        = &NullableStr{}
    TypeCompactStr         = &CompactStr{}
    TypeCompactNullableStr = &CompactNullableStr{}
    TypeUuid               = &Uuid{}
)
var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")
ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.
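In practice ErrInsufficientData is a signal to keep reading rather than a fatal failure. The sketch below is illustrative only: readFullPacket and tryDecode are hypothetical names, the import path is assumed, and tryDecode stands in for a call such as Decode with a concrete type from this package.

package example

import (
    "errors"
    "io"

    "github.com/grepplabs/kafka-proxy/proxy/protocol" // assumed import path
)

// readFullPacket keeps reading from r until tryDecode stops reporting
// ErrInsufficientData, i.e. until a complete packet has been buffered.
func readFullPacket(r io.Reader, tryDecode func([]byte) error) ([]byte, error) {
    var buf []byte
    tmp := make([]byte, 4096)
    for {
        n, readErr := r.Read(tmp)
        buf = append(buf, tmp[:n]...)
        err := tryDecode(buf)
        if err == nil {
            return buf, nil // complete packet decoded
        }
        if !errors.Is(err, protocol.ErrInsufficientData) {
            return nil, err // corrupt packet or some other failure
        }
        if readErr != nil {
            return nil, readErr // stream ended while the packet was still truncated
        }
    }
}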
var MaxRequestSize int32 = 100 * 1024 * 1024
MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying to send a request larger than this will result in a PacketEncodingError. The default of 100 MiB is aligned with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt to process.
var MaxResponseSize int32 = 100 * 1024 * 1024
MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to protect the client from running out of memory. Please note that brokers do not have any natural limit on the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers (see https://issues.apache.org/jira/browse/KAFKA-2063).
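Both limits are ordinary package-level variables, so a client that expects unusually large requests or fetch responses can raise them before any traffic flows. A minimal sketch, assuming the import path below; the 200 MiB values are illustrative, not recommendations:

package example

import "github.com/grepplabs/kafka-proxy/proxy/protocol" // assumed import path

func init() {
    // Raise the 100 MiB defaults before any encoding or decoding happens.
    protocol.MaxRequestSize = 200 * 1024 * 1024
    protocol.MaxResponseSize = 200 * 1024 * 1024
}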
Functions ¶
func Decode ¶
Decode takes bytes and a decoder and fills the fields of the decoder from the bytes, interpreted using Kafka's encoding rules.
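The value passed to Decode must satisfy the package's unexported decoder interface, which this excerpt does not show; the round trip below is therefore a sketch that assumes *Request implements both the encoder and decoder interfaces used by Encode and Decode.

package example

import "github.com/grepplabs/kafka-proxy/proxy/protocol" // assumed import path

// roundTrip encodes a request with Kafka's encoding rules and decodes the
// resulting bytes back into a fresh Request value.
func roundTrip(req *protocol.Request) (*protocol.Request, error) {
    buf, err := protocol.Encode(req) // a failure here is documented to surface as a PacketEncodingError
    if err != nil {
        return nil, err
    }
    out := &protocol.Request{}
    if err := protocol.Decode(buf, out); err != nil {
        return nil, err
    }
    return out, nil
}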
Types ¶
type ByteReader ¶ added in v0.2.0
func (ByteReader) ReadByte ¶ added in v0.2.0
func (r ByteReader) ReadByte() (byte, error)
type CompactArray ¶ added in v0.2.9
func (*CompactArray) GetName ¶ added in v0.2.9
func (f *CompactArray) GetName() string
func (*CompactArray) GetSchema ¶ added in v0.2.9
func (f *CompactArray) GetSchema() Schema
type CompactNullableArray ¶ added in v0.2.9
type CompactNullableArray struct {
    Name string
    Ty   EncoderDecoder
}
func (*CompactNullableArray) GetName ¶ added in v0.2.9
func (f *CompactNullableArray) GetName() string
type CompactNullableStr ¶ added in v0.2.0
type CompactNullableStr struct{}
func (*CompactNullableStr) GetFields ¶ added in v0.2.4
func (f *CompactNullableStr) GetFields() []boundField
func (*CompactNullableStr) GetFieldsByName ¶ added in v0.2.4
func (f *CompactNullableStr) GetFieldsByName() map[string]*boundField
func (*CompactNullableStr) GetName ¶ added in v0.2.4
func (f *CompactNullableStr) GetName() string
type CompactStr ¶ added in v0.2.0
type CompactStr struct { }
func (*CompactStr) GetFields ¶ added in v0.2.4
func (f *CompactStr) GetFields() []boundField
func (*CompactStr) GetFieldsByName ¶ added in v0.2.4
func (f *CompactStr) GetFieldsByName() map[string]*boundField
func (*CompactStr) GetName ¶ added in v0.2.4
func (f *CompactStr) GetName() string
type EncoderDecoder ¶
type EncoderDecoder interface {
// contains filtered or unexported methods
}
type Field ¶
type Field interface {
    EncoderDecoder
    GetName() string
    GetSchema() Schema
}
type KError ¶
type KError int16
KError is the type of error that can be returned directly by the Kafka broker. See http://kafka.apache.org/protocol.html#protocol_error_codes
const (
    ErrNoError KError = 0
    ErrUnknown KError = -1
    ErrOffsetOutOfRange KError = 1
    ErrInvalidMessage KError = 2
    ErrUnknownTopicOrPartition KError = 3
    ErrInvalidMessageSize KError = 4
    ErrLeaderNotAvailable KError = 5
    ErrNotLeaderForPartition KError = 6
    ErrRequestTimedOut KError = 7
    ErrBrokerNotAvailable KError = 8
    ErrReplicaNotAvailable KError = 9
    ErrMessageSizeTooLarge KError = 10
    ErrStaleControllerEpochCode KError = 11
    ErrOffsetMetadataTooLarge KError = 12
    ErrNetworkException KError = 13
    ErrOffsetsLoadInProgress KError = 14
    ErrConsumerCoordinatorNotAvailable KError = 15
    ErrNotCoordinatorForConsumer KError = 16
    ErrInvalidTopic KError = 17
    ErrMessageSetSizeTooLarge KError = 18
    ErrNotEnoughReplicas KError = 19
    ErrNotEnoughReplicasAfterAppend KError = 20
    ErrInvalidRequiredAcks KError = 21
    ErrIllegalGeneration KError = 22
    ErrInconsistentGroupProtocol KError = 23
    ErrInvalidGroupId KError = 24
    ErrUnknownMemberId KError = 25
    ErrInvalidSessionTimeout KError = 26
    ErrRebalanceInProgress KError = 27
    ErrInvalidCommitOffsetSize KError = 28
    ErrTopicAuthorizationFailed KError = 29
    ErrGroupAuthorizationFailed KError = 30
    ErrClusterAuthorizationFailed KError = 31
    ErrInvalidTimestamp KError = 32
    ErrUnsupportedSASLMechanism KError = 33
    ErrIllegalSASLState KError = 34
    ErrUnsupportedVersion KError = 35
    ErrTopicAlreadyExists KError = 36
    ErrInvalidPartitions KError = 37
    ErrInvalidReplicationFactor KError = 38
    ErrInvalidReplicaAssignment KError = 39
    ErrInvalidConfig KError = 40
    ErrNotController KError = 41
    ErrInvalidRequest KError = 42
    ErrUnsupportedForMessageFormat KError = 43
    ErrPolicyViolation KError = 44
    ErrOutOfOrderSequenceNumber KError = 45
    ErrDuplicateSequenceNumber KError = 46
    ErrInvalidProducerEpoch KError = 47
    ErrInvalidTxnState KError = 48
    ErrInvalidProducerIDMapping KError = 49
    ErrInvalidTransactionTimeout KError = 50
    ErrConcurrentTransactions KError = 51
    ErrTransactionCoordinatorFenced KError = 52
    ErrTransactionalIDAuthorizationFailed KError = 53
    ErrSecurityDisabled KError = 54
    ErrOperationNotAttempted KError = 55
    ErrKafkaStorageError KError = 56
    ErrLogDirNotFound KError = 57
    ErrSASLAuthenticationFailed KError = 58
    ErrUnknownProducerID KError = 59
    ErrReassignmentInProgress KError = 60
)
Numeric error codes returned by the Kafka server.
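Broker error codes arrive on the wire as plain int16 values; converting one to a KError lets it be compared against the named constants. A hedged sketch (the import path is assumed and the choice of retriable codes is illustrative, not exhaustive):

package example

import "github.com/grepplabs/kafka-proxy/proxy/protocol" // assumed import path

// isRetriable reports whether a raw broker error code is one of a few
// transient conditions a client might retry.
func isRetriable(code int16) bool {
    switch protocol.KError(code) {
    case protocol.ErrLeaderNotAvailable,
        protocol.ErrNotLeaderForPartition,
        protocol.ErrRequestTimedOut,
        protocol.ErrNetworkException:
        return true
    default:
        return false
    }
}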
type NullableStr ¶
type NullableStr struct{}
func (*NullableStr) GetFields ¶ added in v0.2.4
func (f *NullableStr) GetFields() []boundField
func (*NullableStr) GetFieldsByName ¶ added in v0.2.4
func (f *NullableStr) GetFieldsByName() map[string]*boundField
func (*NullableStr) GetName ¶ added in v0.2.4
func (f *NullableStr) GetName() string
type PacketDecodingError ¶
type PacketDecodingError struct {
Info string
}
PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.
func (PacketDecodingError) Error ¶
func (err PacketDecodingError) Error() string
type PacketEncodingError ¶
type PacketEncodingError struct {
Info string
}
PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
func (PacketEncodingError) Error ¶
func (err PacketEncodingError) Error() string
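Because both packet error types carry their detail in the Info field and ErrInsufficientData is a plain sentinel, the standard errors package can tell the cases apart. A hypothetical helper, assuming the package returns these error types by value as documented here and the import path below:

package example

import (
    "errors"

    "github.com/grepplabs/kafka-proxy/proxy/protocol" // assumed import path
)

// describeProtocolError classifies a failure returned from Encode or Decode.
func describeProtocolError(err error) string {
    var dec protocol.PacketDecodingError
    var enc protocol.PacketEncodingError
    switch {
    case err == nil:
        return "ok"
    case errors.Is(err, protocol.ErrInsufficientData):
        return "truncated packet: more bytes expected"
    case errors.As(err, &dec):
        return "invalid packet: " + dec.Info
    case errors.As(err, &enc):
        return "unencodable packet: " + enc.Info
    default:
        return err.Error()
    }
}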
type ProtocolBody ¶
type ProtocolBody interface {
// contains filtered or unexported methods
}
type Request ¶
type Request struct {
    CorrelationID int32
    ClientID      string
    Body          ProtocolBody
}
type RequestAcksReader ¶ added in v0.2.5
type RequestAcksReader struct { }
func (RequestAcksReader) ReadAndDiscardHeaderV1Part ¶ added in v0.2.5
func (r RequestAcksReader) ReadAndDiscardHeaderV1Part(reader io.Reader) (err error)
func (RequestAcksReader) ReadAndDiscardProduceAcks ¶ added in v0.2.5
func (r RequestAcksReader) ReadAndDiscardProduceAcks(reader io.Reader) (acks int16, err error)
func (RequestAcksReader) ReadAndDiscardProduceTxnAcks ¶ added in v0.2.5
func (r RequestAcksReader) ReadAndDiscardProduceTxnAcks(reader io.Reader) (acks int16, err error)
type RequestKeyVersion ¶
func (*RequestKeyVersion) ResponseHeaderVersion ¶ added in v0.2.0
func (r *RequestKeyVersion) ResponseHeaderVersion() int16
ResponseHeaderVersion determines the response header version for this request; it returns -1 for an unknown api key. See also public short responseHeaderVersion(short _version) in kafka/clients/src/generated/java/org/apache/kafka/common/message/ApiMessageType.java.
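A short sketch of picking the response header flavour for an incoming request; it assumes RequestKeyVersion exposes ApiKey and ApiVersion fields, which this excerpt does not show, and an assumed import path:

package example

import "github.com/grepplabs/kafka-proxy/proxy/protocol" // assumed import path

// headerVersionFor returns the response header version for the given api key
// and version, or -1 when the api key is unknown to this package.
func headerVersionFor(apiKey, apiVersion int16) int16 {
    rkv := &protocol.RequestKeyVersion{ApiKey: apiKey, ApiVersion: apiVersion}
    return rkv.ResponseHeaderVersion()
}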
type RequestV2 ¶ added in v0.2.4
type RequestV2 struct {
    CorrelationID int32
    ClientID      string
    TaggedFields  TaggedFields
    Body          ProtocolBody
}
type ResponseHeader ¶
type ResponseHeaderTaggedFields ¶ added in v0.2.0
type ResponseHeaderTaggedFields struct {
// contains filtered or unexported fields
}
func NewResponseHeaderTaggedFields ¶ added in v0.2.0
func NewResponseHeaderTaggedFields(req *RequestKeyVersion) (*ResponseHeaderTaggedFields, error)
type ResponseHeaderV1 ¶ added in v0.2.4
type ResponseModifier ¶
func GetResponseModifier ¶
func GetResponseModifier(apiKey int16, apiVersion int16, addressMappingFunc config.NetAddressMappingFunc) (ResponseModifier, error)
type SaslAuthenticateRequestV0 ¶ added in v0.0.5
type SaslAuthenticateRequestV0 struct {
SaslAuthBytes []byte
}
type SaslAuthenticateRequestV1 ¶ added in v0.2.4
type SaslAuthenticateRequestV1 struct {
SaslAuthBytes []byte
}
type SaslAuthenticateRequestV2 ¶ added in v0.2.4
type SaslAuthenticateRequestV2 struct {
    SaslAuthBytes []byte
    TaggedFields  TaggedFields
}
type SaslAuthenticateResponseV0 ¶ added in v0.0.5
type SaslAuthenticateResponseV1 ¶ added in v0.2.4
type SaslAuthenticateResponseV2 ¶ added in v0.2.4
type SaslAuthenticateResponseV2 struct {
    Err               KError
    ErrMsg            *string
    SaslAuthBytes     []byte
    SessionLifetimeMs int64
    TaggedFields      TaggedFields
}
type SaslHandshakeRequestV0orV1 ¶ added in v0.0.5
type SaslHandshakeResponseV0orV1 ¶ added in v0.0.5
type Schema ¶
type Schema interface {
    EncoderDecoder
    GetFields() []boundField
    GetFieldsByName() map[string]*boundField
    GetName() string
}
type SchemaDecodingError ¶
type SchemaDecodingError struct {
Info string
}
SchemaDecodingError is returned from a failure while decoding a schema.
func (SchemaDecodingError) Error ¶
func (err SchemaDecodingError) Error() string
type SchemaEncodingError ¶
type SchemaEncodingError struct {
Info string
}
SchemaEncodingError is returned from a failure while encoding a schema.
func (SchemaEncodingError) Error ¶
func (err SchemaEncodingError) Error() string
type SchemaTaggedFields ¶ added in v0.2.9
type SchemaTaggedFields struct {
Name string
}
func (SchemaTaggedFields) GetName ¶ added in v0.2.9
func (f SchemaTaggedFields) GetName() string
func (SchemaTaggedFields) GetSchema ¶ added in v0.2.9
func (f SchemaTaggedFields) GetSchema() Schema
type TaggedFields ¶ added in v0.2.4
type TaggedFields struct {
// contains filtered or unexported fields
}
Source Files ¶
- encoder_decoder.go
- errors.go
- packet_decoder.go
- packet_encoder.go
- prep_encoder.go
- real_decoder.go
- real_encoder.go
- request_key_version.go
- request_produce_reader.go
- request_v1.go
- request_v2.go
- response_header.go
- response_header_v1.go
- responses.go
- sasl_authenticate_v0.go
- sasl_authenticate_v1.go
- sasl_authenticate_v2.go
- sasl_handshake.go
- schema.go
- tagged_fields.go