Documentation ¶
Index ¶
- Constants
- Variables
- func ResponseHeaderVersion(apiKey, apiVersion int16) int16
- func VersionedDecode(buf []byte, in versionedDecoder, version int16) (int, error)
- type AbortedTransaction
- type ByteEncoder
- type CompressionCodec
- type ConfigurationError
- type Encoder
- type FetchResponse
- type FetchResponseBlock
- type KError
- type KafkaVersion
- type Message
- type MessageBlock
- type MessageSet
- type PacketDecodingError
- type PacketEncodingError
- type ProduceRequest
- type ProtocolBody
- type Record
- type RecordBatch
- type RecordHeader
- type Records
- type Request
- type RequiredAcks
- type ResponseHeader
- type StringEncoder
- type Timestamp
- type ZstdDecoderParams
- type ZstdEncoderParams
Constants ¶
const MaxRequestSize int32 = 100 * 1024 * 1024
const MaxResponseSize int32 = 100 * 1024 * 1024
Variables ¶
var ( V0_8_2_0 = newKafkaVersion(0, 8, 2, 0) V0_8_2_1 = newKafkaVersion(0, 8, 2, 1) V0_8_2_2 = newKafkaVersion(0, 8, 2, 2) V0_9_0_0 = newKafkaVersion(0, 9, 0, 0) V0_9_0_1 = newKafkaVersion(0, 9, 0, 1) V0_10_0_0 = newKafkaVersion(0, 10, 0, 0) V0_10_0_1 = newKafkaVersion(0, 10, 0, 1) V0_10_1_0 = newKafkaVersion(0, 10, 1, 0) V0_10_1_1 = newKafkaVersion(0, 10, 1, 1) V0_10_2_0 = newKafkaVersion(0, 10, 2, 0) V0_10_2_1 = newKafkaVersion(0, 10, 2, 1) V0_10_2_2 = newKafkaVersion(0, 10, 2, 2) V0_11_0_0 = newKafkaVersion(0, 11, 0, 0) V0_11_0_1 = newKafkaVersion(0, 11, 0, 1) V0_11_0_2 = newKafkaVersion(0, 11, 0, 2) V1_0_0_0 = newKafkaVersion(1, 0, 0, 0) V1_0_1_0 = newKafkaVersion(1, 0, 1, 0) V1_0_2_0 = newKafkaVersion(1, 0, 2, 0) V1_1_0_0 = newKafkaVersion(1, 1, 0, 0) V1_1_1_0 = newKafkaVersion(1, 1, 1, 0) V2_0_0_0 = newKafkaVersion(2, 0, 0, 0) V2_0_1_0 = newKafkaVersion(2, 0, 1, 0) V2_1_0_0 = newKafkaVersion(2, 1, 0, 0) V2_1_1_0 = newKafkaVersion(2, 1, 1, 0) V2_2_0_0 = newKafkaVersion(2, 2, 0, 0) V2_2_1_0 = newKafkaVersion(2, 2, 1, 0) V2_2_2_0 = newKafkaVersion(2, 2, 2, 0) V2_3_0_0 = newKafkaVersion(2, 3, 0, 0) V2_3_1_0 = newKafkaVersion(2, 3, 1, 0) V2_4_0_0 = newKafkaVersion(2, 4, 0, 0) V2_4_1_0 = newKafkaVersion(2, 4, 1, 0) V2_5_0_0 = newKafkaVersion(2, 5, 0, 0) V2_5_1_0 = newKafkaVersion(2, 5, 1, 0) V2_6_0_0 = newKafkaVersion(2, 6, 0, 0) V2_6_1_0 = newKafkaVersion(2, 6, 1, 0) V2_6_2_0 = newKafkaVersion(2, 6, 2, 0) V2_6_3_0 = newKafkaVersion(2, 6, 3, 0) V2_7_0_0 = newKafkaVersion(2, 7, 0, 0) V2_7_1_0 = newKafkaVersion(2, 7, 1, 0) V2_7_2_0 = newKafkaVersion(2, 7, 2, 0) V2_8_0_0 = newKafkaVersion(2, 8, 0, 0) V2_8_1_0 = newKafkaVersion(2, 8, 1, 0) V2_8_2_0 = newKafkaVersion(2, 8, 2, 0) V3_0_0_0 = newKafkaVersion(3, 0, 0, 0) V3_0_1_0 = newKafkaVersion(3, 0, 1, 0) V3_0_2_0 = newKafkaVersion(3, 0, 2, 0) V3_1_0_0 = newKafkaVersion(3, 1, 0, 0) V3_1_1_0 = newKafkaVersion(3, 1, 1, 0) V3_1_2_0 = newKafkaVersion(3, 1, 2, 0) V3_2_0_0 = newKafkaVersion(3, 2, 0, 0) V3_2_1_0 = 
newKafkaVersion(3, 2, 1, 0) V3_2_2_0 = newKafkaVersion(3, 2, 2, 0) V3_2_3_0 = newKafkaVersion(3, 2, 3, 0) V3_3_0_0 = newKafkaVersion(3, 3, 0, 0) V3_3_1_0 = newKafkaVersion(3, 3, 1, 0) V3_3_2_0 = newKafkaVersion(3, 3, 2, 0) V3_4_0_0 = newKafkaVersion(3, 4, 0, 0) V3_4_1_0 = newKafkaVersion(3, 4, 1, 0) V3_5_0_0 = newKafkaVersion(3, 5, 0, 0) V3_5_1_0 = newKafkaVersion(3, 5, 1, 0) V3_6_0_0 = newKafkaVersion(3, 6, 0, 0) SupportedVersions = []KafkaVersion{ V0_8_2_0, V0_8_2_1, V0_8_2_2, V0_9_0_0, V0_9_0_1, V0_10_0_0, V0_10_0_1, V0_10_1_0, V0_10_1_1, V0_10_2_0, V0_10_2_1, V0_10_2_2, V0_11_0_0, V0_11_0_1, V0_11_0_2, V1_0_0_0, V1_0_1_0, V1_0_2_0, V1_1_0_0, V1_1_1_0, V2_0_0_0, V2_0_1_0, V2_1_0_0, V2_1_1_0, V2_2_0_0, V2_2_1_0, V2_2_2_0, V2_3_0_0, V2_3_1_0, V2_4_0_0, V2_4_1_0, V2_5_0_0, V2_5_1_0, V2_6_0_0, V2_6_1_0, V2_6_2_0, V2_7_0_0, V2_7_1_0, V2_8_0_0, V2_8_1_0, V2_8_2_0, V3_0_0_0, V3_0_1_0, V3_0_2_0, V3_1_0_0, V3_1_1_0, V3_1_2_0, V3_2_0_0, V3_2_1_0, V3_2_2_0, V3_2_3_0, V3_3_0_0, V3_3_1_0, V3_3_2_0, V3_4_0_0, V3_4_1_0, V3_5_0_0, V3_5_1_0, V3_6_0_0, } MinVersion = V0_8_2_0 MaxVersion = V3_6_0_0 DefaultVersion = V2_1_0_0 )
Effective constants defining the supported kafka versions.
var ErrAddPartitionsToTxn = errors.New("transaction manager: failed to send partitions to transaction")
ErrAddPartitionsToTxn is returned when AddPartitionsToTxn failed multiple times
var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")
ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.
var ErrBrokerNotFound = errors.New("kafka: broker for ID is not found")
ErrBrokerNotFound is the error returned when there's no broker found for the requested ID.
var ErrCannotTransitionNilError = errors.New("transaction manager: cannot transition with a nil error")
ErrCannotTransitionNilError is returned when a transition is attempted with a nil error.
var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")
ErrClosedClient is the error returned when a method is called on a client that has been closed.
var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch")
ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing a RecordBatch.
var ErrControllerNotAvailable = errors.New("kafka: controller is not available")
ErrControllerNotAvailable is returned when the server didn't give a correct controller ID. This may be because the Kafka server's version is lower than 0.10.0.0.
var ErrCreateACLs = errors.New("kafka server: failed to create one or more ACL rules")
ErrCreateACLs is the type of error returned when ACL creation failed
var ErrDeleteRecords = errors.New("kafka server: failed to delete records")
ErrDeleteRecords is the type of error returned when deleting the required records fails.
var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks")
ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.
var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")
ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.
var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index")
ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).
var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")
ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata")
ErrNoTopicsToUpdateMetadata is returned when Meta.Full is set to false but no specific topics were found to update the metadata.
var ErrNonTransactedProducer = errors.New("transaction manager: you need to add TransactionalID to producer")
ErrNonTransactedProducer is returned when calling BeginTxn, CommitTxn or AbortTxn on a non-transactional producer.
var ErrNotConnected = errors.New("kafka: broker not connected")
ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to")
ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.
var ErrReassignPartitions = errors.New("failed to reassign partitions for topic")
ErrReassignPartitions is returned when altering partition assignments for a topic fails
var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down")
ErrShuttingDown is returned when a producer receives a message during shutdown.
var ErrTransactionNotReady = errors.New("transaction manager: transaction is not ready")
ErrTransactionNotReady is returned when the transaction status is invalid for the current action.
var ErrTransitionNotAllowed = errors.New("transaction manager: invalid transition attempted")
ErrTransitionNotAllowed is returned when a txnmgr state transition is not valid.
var ErrTxnOffsetCommit = errors.New("transaction manager: failed to send offsets to transaction")
ErrTxnOffsetCommit is returned when TxnOffsetCommit failed multiple times
var ErrTxnUnableToParseResponse = errors.New("transaction manager: unable to parse response")
ErrTxnUnableToParseResponse is returned when the response is nil.
var ErrUnknownScramMechanism = errors.New("kafka: unknown SCRAM mechanism provided")
ErrUnknownScramMechanism is returned when user tries to AlterUserScramCredentials with unknown SCRAM mechanism
Functions ¶
func ResponseHeaderVersion ¶
ResponseHeaderVersion derives the header version from the request API key and request API version.
Types ¶
type AbortedTransaction ¶
type ByteEncoder ¶
type ByteEncoder []byte
ByteEncoder implements the Encoder interface for Go byte slices so that they can be used as the Key or Value in a ProducerMessage.
func (ByteEncoder) Encode ¶
func (b ByteEncoder) Encode() ([]byte, error)
func (ByteEncoder) Length ¶
func (b ByteEncoder) Length() int
type CompressionCodec ¶
type CompressionCodec int8
CompressionCodec represents the various compression codecs recognized by Kafka in messages.
const ( // CompressionNone no compression CompressionNone CompressionCodec = iota // CompressionGZIP compression using GZIP CompressionGZIP // CompressionSnappy compression using snappy CompressionSnappy // CompressionLZ4 compression using LZ4 CompressionLZ4 // CompressionZSTD compression using ZSTD CompressionZSTD // CompressionLevelDefault is the constant to use in CompressionLevel // to have the default compression level for any codec. The value is picked // that we don't use any existing compression levels. CompressionLevelDefault = -1000 )
func (CompressionCodec) MarshalText ¶
func (cc CompressionCodec) MarshalText() ([]byte, error)
MarshalText transforms a CompressionCodec into its string representation.
func (CompressionCodec) String ¶
func (cc CompressionCodec) String() string
func (*CompressionCodec) UnmarshalText ¶
func (cc *CompressionCodec) UnmarshalText(text []byte) error
UnmarshalText returns a CompressionCodec from its string representation.
type ConfigurationError ¶
type ConfigurationError string
ConfigurationError is the type of error returned from a constructor (e.g. NewClient, or NewConsumer) when the specified configuration is invalid.
func (ConfigurationError) Error ¶
func (err ConfigurationError) Error() string
type Encoder ¶
Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message. Length() is provided as an optimization, and must return the same as len() on the result of Encode().
type FetchResponse ¶
type FetchResponse struct { // Version defines the protocol version to use for encode and decode Version int16 // ThrottleTime contains the duration in milliseconds for which the request // was throttled due to a quota violation, or zero if the request did not // violate any quota. ThrottleTime time.Duration // ErrorCode contains the top level response error code. ErrorCode int16 // SessionID contains the fetch session ID, or 0 if this is not part of a fetch session. SessionID int32 // Blocks contains the response topics. Blocks map[string]map[int32]*FetchResponseBlock LogAppendTime bool Timestamp time.Time }
type FetchResponseBlock ¶
type FetchResponseBlock struct { // Err contains the error code, or 0 if there was no fetch error. Err KError // HighWaterMarkOffset contains the current high water mark. HighWaterMarkOffset int64 // LastStableOffset contains the last stable offset (or LSO) of the // partition. This is the last offset such that the state of all // transactional records prior to this offset have been decided (ABORTED or // COMMITTED) LastStableOffset int64 LastRecordsBatchOffset *int64 // LogStartOffset contains the current log start offset. LogStartOffset int64 // AbortedTransactions contains the aborted transactions. AbortedTransactions []*AbortedTransaction // PreferredReadReplica contains the preferred read replica for the // consumer to use on its next fetch request PreferredReadReplica int32 // RecordsSet contains the record data. RecordsSet []*Records Partial bool Records *Records // deprecated: use FetchResponseBlock.RecordsSet }
type KError ¶
type KError int16
KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
const ( ErrUnknown KError = -1 // Errors.UNKNOWN_SERVER_ERROR ErrNoError KError = 0 // Errors.NONE ErrOffsetOutOfRange KError = 1 // Errors.OFFSET_OUT_OF_RANGE ErrInvalidMessage KError = 2 // Errors.CORRUPT_MESSAGE ErrUnknownTopicOrPartition KError = 3 // Errors.UNKNOWN_TOPIC_OR_PARTITION ErrInvalidMessageSize KError = 4 // Errors.INVALID_FETCH_SIZE ErrLeaderNotAvailable KError = 5 // Errors.LEADER_NOT_AVAILABLE ErrNotLeaderForPartition KError = 6 // Errors.NOT_LEADER_OR_FOLLOWER ErrRequestTimedOut KError = 7 // Errors.REQUEST_TIMED_OUT ErrBrokerNotAvailable KError = 8 // Errors.BROKER_NOT_AVAILABLE ErrReplicaNotAvailable KError = 9 // Errors.REPLICA_NOT_AVAILABLE ErrMessageSizeTooLarge KError = 10 // Errors.MESSAGE_TOO_LARGE ErrStaleControllerEpochCode KError = 11 // Errors.STALE_CONTROLLER_EPOCH ErrOffsetMetadataTooLarge KError = 12 // Errors.OFFSET_METADATA_TOO_LARGE ErrNetworkException KError = 13 // Errors.NETWORK_EXCEPTION ErrOffsetsLoadInProgress KError = 14 // Errors.COORDINATOR_LOAD_IN_PROGRESS ErrConsumerCoordinatorNotAvailable KError = 15 // Errors.COORDINATOR_NOT_AVAILABLE ErrNotCoordinatorForConsumer KError = 16 // Errors.NOT_COORDINATOR ErrInvalidTopic KError = 17 // Errors.INVALID_TOPIC_EXCEPTION ErrMessageSetSizeTooLarge KError = 18 // Errors.RECORD_LIST_TOO_LARGE ErrNotEnoughReplicas KError = 19 // Errors.NOT_ENOUGH_REPLICAS ErrNotEnoughReplicasAfterAppend KError = 20 // Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND ErrInvalidRequiredAcks KError = 21 // Errors.INVALID_REQUIRED_ACKS ErrIllegalGeneration KError = 22 // Errors.ILLEGAL_GENERATION ErrInconsistentGroupProtocol KError = 23 // Errors.INCONSISTENT_GROUP_PROTOCOL ErrInvalidGroupId KError = 24 // Errors.INVALID_GROUP_ID ErrUnknownMemberId KError = 25 // Errors.UNKNOWN_MEMBER_ID ErrInvalidSessionTimeout KError = 26 // Errors.INVALID_SESSION_TIMEOUT ErrRebalanceInProgress KError = 27 // Errors.REBALANCE_IN_PROGRESS ErrInvalidCommitOffsetSize KError = 28 // Errors.INVALID_COMMIT_OFFSET_SIZE 
ErrTopicAuthorizationFailed KError = 29 // Errors.TOPIC_AUTHORIZATION_FAILED ErrGroupAuthorizationFailed KError = 30 // Errors.GROUP_AUTHORIZATION_FAILED ErrClusterAuthorizationFailed KError = 31 // Errors.CLUSTER_AUTHORIZATION_FAILED ErrInvalidTimestamp KError = 32 // Errors.INVALID_TIMESTAMP ErrUnsupportedSASLMechanism KError = 33 // Errors.UNSUPPORTED_SASL_MECHANISM ErrIllegalSASLState KError = 34 // Errors.ILLEGAL_SASL_STATE ErrUnsupportedVersion KError = 35 // Errors.UNSUPPORTED_VERSION ErrTopicAlreadyExists KError = 36 // Errors.TOPIC_ALREADY_EXISTS ErrInvalidPartitions KError = 37 // Errors.INVALID_PARTITIONS ErrInvalidReplicationFactor KError = 38 // Errors.INVALID_REPLICATION_FACTOR ErrInvalidReplicaAssignment KError = 39 // Errors.INVALID_REPLICA_ASSIGNMENT ErrInvalidConfig KError = 40 // Errors.INVALID_CONFIG ErrNotController KError = 41 // Errors.NOT_CONTROLLER ErrInvalidRequest KError = 42 // Errors.INVALID_REQUEST ErrUnsupportedForMessageFormat KError = 43 // Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT ErrPolicyViolation KError = 44 // Errors.POLICY_VIOLATION ErrOutOfOrderSequenceNumber KError = 45 // Errors.OUT_OF_ORDER_SEQUENCE_NUMBER ErrDuplicateSequenceNumber KError = 46 // Errors.DUPLICATE_SEQUENCE_NUMBER ErrInvalidProducerEpoch KError = 47 // Errors.INVALID_PRODUCER_EPOCH ErrInvalidTxnState KError = 48 // Errors.INVALID_TXN_STATE ErrInvalidProducerIDMapping KError = 49 // Errors.INVALID_PRODUCER_ID_MAPPING ErrInvalidTransactionTimeout KError = 50 // Errors.INVALID_TRANSACTION_TIMEOUT ErrConcurrentTransactions KError = 51 // Errors.CONCURRENT_TRANSACTIONS ErrTransactionCoordinatorFenced KError = 52 // Errors.TRANSACTION_COORDINATOR_FENCED ErrTransactionalIDAuthorizationFailed KError = 53 // Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ErrSecurityDisabled KError = 54 // Errors.SECURITY_DISABLED ErrOperationNotAttempted KError = 55 // Errors.OPERATION_NOT_ATTEMPTED ErrKafkaStorageError KError = 56 // Errors.KAFKA_STORAGE_ERROR ErrLogDirNotFound KError = 
57 // Errors.LOG_DIR_NOT_FOUND ErrSASLAuthenticationFailed KError = 58 // Errors.SASL_AUTHENTICATION_FAILED ErrUnknownProducerID KError = 59 // Errors.UNKNOWN_PRODUCER_ID ErrReassignmentInProgress KError = 60 // Errors.REASSIGNMENT_IN_PROGRESS ErrDelegationTokenAuthDisabled KError = 61 // Errors.DELEGATION_TOKEN_AUTH_DISABLED ErrDelegationTokenNotFound KError = 62 // Errors.DELEGATION_TOKEN_NOT_FOUND ErrDelegationTokenOwnerMismatch KError = 63 // Errors.DELEGATION_TOKEN_OWNER_MISMATCH ErrDelegationTokenRequestNotAllowed KError = 64 // Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED ErrDelegationTokenAuthorizationFailed KError = 65 // Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED ErrDelegationTokenExpired KError = 66 // Errors.DELEGATION_TOKEN_EXPIRED ErrInvalidPrincipalType KError = 67 // Errors.INVALID_PRINCIPAL_TYPE ErrNonEmptyGroup KError = 68 // Errors.NON_EMPTY_GROUP ErrGroupIDNotFound KError = 69 // Errors.GROUP_ID_NOT_FOUND ErrFetchSessionIDNotFound KError = 70 // Errors.FETCH_SESSION_ID_NOT_FOUND ErrInvalidFetchSessionEpoch KError = 71 // Errors.INVALID_FETCH_SESSION_EPOCH ErrListenerNotFound KError = 72 // Errors.LISTENER_NOT_FOUND ErrTopicDeletionDisabled KError = 73 // Errors.TOPIC_DELETION_DISABLED ErrFencedLeaderEpoch KError = 74 // Errors.FENCED_LEADER_EPOCH ErrUnknownLeaderEpoch KError = 75 // Errors.UNKNOWN_LEADER_EPOCH ErrUnsupportedCompressionType KError = 76 // Errors.UNSUPPORTED_COMPRESSION_TYPE ErrStaleBrokerEpoch KError = 77 // Errors.STALE_BROKER_EPOCH ErrOffsetNotAvailable KError = 78 // Errors.OFFSET_NOT_AVAILABLE ErrMemberIdRequired KError = 79 // Errors.MEMBER_ID_REQUIRED ErrPreferredLeaderNotAvailable KError = 80 // Errors.PREFERRED_LEADER_NOT_AVAILABLE ErrGroupMaxSizeReached KError = 81 // Errors.GROUP_MAX_SIZE_REACHED ErrFencedInstancedId KError = 82 // Errors.FENCED_INSTANCE_ID ErrEligibleLeadersNotAvailable KError = 83 // Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE ErrElectionNotNeeded KError = 84 // Errors.ELECTION_NOT_NEEDED 
ErrNoReassignmentInProgress KError = 85 // Errors.NO_REASSIGNMENT_IN_PROGRESS ErrGroupSubscribedToTopic KError = 86 // Errors.GROUP_SUBSCRIBED_TO_TOPIC ErrInvalidRecord KError = 87 // Errors.INVALID_RECORD ErrUnstableOffsetCommit KError = 88 // Errors.UNSTABLE_OFFSET_COMMIT ErrThrottlingQuotaExceeded KError = 89 // Errors.THROTTLING_QUOTA_EXCEEDED ErrProducerFenced KError = 90 // Errors.PRODUCER_FENCED )
Numeric error codes returned by the Kafka server.
type KafkaVersion ¶
type KafkaVersion struct {
// contains filtered or unexported fields
}
KafkaVersion instances represent versions of the upstream Kafka broker.
func ParseKafkaVersion ¶
func ParseKafkaVersion(s string) (KafkaVersion, error)
ParseKafkaVersion parses and returns kafka version or error from a string
func (KafkaVersion) IsAtLeast ¶
func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool
IsAtLeast returns true if and only if the version it is called on is greater than or equal to the version passed in:
V1.IsAtLeast(V2) // false
V2.IsAtLeast(V1) // true
func (KafkaVersion) String ¶
func (v KafkaVersion) String() string
type Message ¶
type Message struct { Codec CompressionCodec // codec used to compress the message contents CompressionLevel int // compression level LogAppendTime bool // the used timestamp is LogAppendTime Key []byte // the message key, may be nil Value []byte // the message contents Set *MessageSet // the message set a message might wrap Version int8 // v1 requires Kafka 0.10 Timestamp time.Time // the timestamp of the message (version 1+ only) }
Message is a kafka message type
type MessageBlock ¶
func (*MessageBlock) Messages ¶
func (msb *MessageBlock) Messages() []*MessageBlock
Messages is a convenience helper which returns the messages that are wrapped in this block.
type MessageSet ¶
type MessageSet struct { PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock OverflowMessage bool // whether the set on the wire contained an overflow message Messages []*MessageBlock }
type PacketDecodingError ¶
type PacketDecodingError struct {
Info string
}
PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.
func (PacketDecodingError) Error ¶
func (err PacketDecodingError) Error() string
type PacketEncodingError ¶
type PacketEncodingError struct {
Info string
}
PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
func (PacketEncodingError) Error ¶
func (err PacketEncodingError) Error() string
type ProduceRequest ¶
type ProtocolBody ¶
type ProtocolBody interface {
// contains filtered or unexported methods
}
type Record ¶
type Record struct { Headers []*RecordHeader Attributes int8 TimestampDelta time.Duration OffsetDelta int64 Key []byte Value []byte // contains filtered or unexported fields }
Record is kafka record type
type RecordBatch ¶
type RecordBatch struct { FirstOffset int64 PartitionLeaderEpoch int32 Version int8 Codec CompressionCodec CompressionLevel int Control bool LogAppendTime bool LastOffsetDelta int32 FirstTimestamp time.Time MaxTimestamp time.Time ProducerID int64 ProducerEpoch int16 FirstSequence int32 Records []*Record PartialTrailingRecord bool IsTransactional bool // contains filtered or unexported fields }
func (*RecordBatch) LastOffset ¶
func (b *RecordBatch) LastOffset() int64
type RecordHeader ¶
RecordHeader stores key and value for a record header
type Records ¶
type Records struct { MsgSet *MessageSet RecordBatch *RecordBatch // contains filtered or unexported fields }
Records implements a union type containing either a RecordBatch or a legacy MessageSet.
type Request ¶
type Request struct { CorrelationID int32 ClientID string Body ProtocolBody }
type RequiredAcks ¶
type RequiredAcks int16
RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced by setting the `min.insync.replicas` value in the broker's configuration).
const ( // NoResponse doesn't send any response, the TCP ACK is all you get. NoResponse RequiredAcks = 0 // WaitForLocal waits for only the local commit to succeed before responding. WaitForLocal RequiredAcks = 1 // WaitForAll waits for all in-sync replicas to commit before responding. // The minimum number of in-sync replicas is configured on the broker via // the `min.insync.replicas` configuration key. WaitForAll RequiredAcks = -1 )
type ResponseHeader ¶
type StringEncoder ¶
type StringEncoder string
StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.
func (StringEncoder) Encode ¶
func (s StringEncoder) Encode() ([]byte, error)
func (StringEncoder) Length ¶
func (s StringEncoder) Length() int
type ZstdDecoderParams ¶
type ZstdDecoderParams struct { }
type ZstdEncoderParams ¶
type ZstdEncoderParams struct {
Level int
}