kafka

package
v0.12.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 7, 2024 License: AGPL-3.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
// MaxRequestSize is 100 * 1024 * 1024 (104857600).
// NOTE(review): presumably the upper bound, in bytes, on the size of a
// request this package will handle — confirm against its users.
const MaxRequestSize int32 = 100 * 1024 * 1024
View Source
// MaxResponseSize is 100 * 1024 * 1024 (104857600).
// NOTE(review): presumably the upper bound, in bytes, on the size of a
// response this package will handle — confirm against its users.
const MaxResponseSize int32 = 100 * 1024 * 1024

Variables

View Source
var (
	// Known Kafka broker release versions, declared oldest to newest.
	V0_8_2_0  = newKafkaVersion(0, 8, 2, 0)
	V0_8_2_1  = newKafkaVersion(0, 8, 2, 1)
	V0_8_2_2  = newKafkaVersion(0, 8, 2, 2)
	V0_9_0_0  = newKafkaVersion(0, 9, 0, 0)
	V0_9_0_1  = newKafkaVersion(0, 9, 0, 1)
	V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
	V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
	V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
	V0_10_1_1 = newKafkaVersion(0, 10, 1, 1)
	V0_10_2_0 = newKafkaVersion(0, 10, 2, 0)
	V0_10_2_1 = newKafkaVersion(0, 10, 2, 1)
	V0_10_2_2 = newKafkaVersion(0, 10, 2, 2)
	V0_11_0_0 = newKafkaVersion(0, 11, 0, 0)
	V0_11_0_1 = newKafkaVersion(0, 11, 0, 1)
	V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
	V1_0_0_0  = newKafkaVersion(1, 0, 0, 0)
	V1_0_1_0  = newKafkaVersion(1, 0, 1, 0)
	V1_0_2_0  = newKafkaVersion(1, 0, 2, 0)
	V1_1_0_0  = newKafkaVersion(1, 1, 0, 0)
	V1_1_1_0  = newKafkaVersion(1, 1, 1, 0)
	V2_0_0_0  = newKafkaVersion(2, 0, 0, 0)
	V2_0_1_0  = newKafkaVersion(2, 0, 1, 0)
	V2_1_0_0  = newKafkaVersion(2, 1, 0, 0)
	V2_1_1_0  = newKafkaVersion(2, 1, 1, 0)
	V2_2_0_0  = newKafkaVersion(2, 2, 0, 0)
	V2_2_1_0  = newKafkaVersion(2, 2, 1, 0)
	V2_2_2_0  = newKafkaVersion(2, 2, 2, 0)
	V2_3_0_0  = newKafkaVersion(2, 3, 0, 0)
	V2_3_1_0  = newKafkaVersion(2, 3, 1, 0)
	V2_4_0_0  = newKafkaVersion(2, 4, 0, 0)
	V2_4_1_0  = newKafkaVersion(2, 4, 1, 0)
	V2_5_0_0  = newKafkaVersion(2, 5, 0, 0)
	V2_5_1_0  = newKafkaVersion(2, 5, 1, 0)
	V2_6_0_0  = newKafkaVersion(2, 6, 0, 0)
	V2_6_1_0  = newKafkaVersion(2, 6, 1, 0)
	V2_6_2_0  = newKafkaVersion(2, 6, 2, 0)
	V2_6_3_0  = newKafkaVersion(2, 6, 3, 0)
	V2_7_0_0  = newKafkaVersion(2, 7, 0, 0)
	V2_7_1_0  = newKafkaVersion(2, 7, 1, 0)
	V2_7_2_0  = newKafkaVersion(2, 7, 2, 0)
	V2_8_0_0  = newKafkaVersion(2, 8, 0, 0)
	V2_8_1_0  = newKafkaVersion(2, 8, 1, 0)
	V2_8_2_0  = newKafkaVersion(2, 8, 2, 0)
	V3_0_0_0  = newKafkaVersion(3, 0, 0, 0)
	V3_0_1_0  = newKafkaVersion(3, 0, 1, 0)
	V3_0_2_0  = newKafkaVersion(3, 0, 2, 0)
	V3_1_0_0  = newKafkaVersion(3, 1, 0, 0)
	V3_1_1_0  = newKafkaVersion(3, 1, 1, 0)
	V3_1_2_0  = newKafkaVersion(3, 1, 2, 0)
	V3_2_0_0  = newKafkaVersion(3, 2, 0, 0)
	V3_2_1_0  = newKafkaVersion(3, 2, 1, 0)
	V3_2_2_0  = newKafkaVersion(3, 2, 2, 0)
	V3_2_3_0  = newKafkaVersion(3, 2, 3, 0)
	V3_3_0_0  = newKafkaVersion(3, 3, 0, 0)
	V3_3_1_0  = newKafkaVersion(3, 3, 1, 0)
	V3_3_2_0  = newKafkaVersion(3, 3, 2, 0)
	V3_4_0_0  = newKafkaVersion(3, 4, 0, 0)
	V3_4_1_0  = newKafkaVersion(3, 4, 1, 0)
	V3_5_0_0  = newKafkaVersion(3, 5, 0, 0)
	V3_5_1_0  = newKafkaVersion(3, 5, 1, 0)
	V3_6_0_0  = newKafkaVersion(3, 6, 0, 0)

	// SupportedVersions lists every version declared above, in ascending
	// order. It must be kept in sync with the declarations: V2_6_3_0 and
	// V2_7_2_0 were previously declared but missing from this list.
	SupportedVersions = []KafkaVersion{
		V0_8_2_0,
		V0_8_2_1,
		V0_8_2_2,
		V0_9_0_0,
		V0_9_0_1,
		V0_10_0_0,
		V0_10_0_1,
		V0_10_1_0,
		V0_10_1_1,
		V0_10_2_0,
		V0_10_2_1,
		V0_10_2_2,
		V0_11_0_0,
		V0_11_0_1,
		V0_11_0_2,
		V1_0_0_0,
		V1_0_1_0,
		V1_0_2_0,
		V1_1_0_0,
		V1_1_1_0,
		V2_0_0_0,
		V2_0_1_0,
		V2_1_0_0,
		V2_1_1_0,
		V2_2_0_0,
		V2_2_1_0,
		V2_2_2_0,
		V2_3_0_0,
		V2_3_1_0,
		V2_4_0_0,
		V2_4_1_0,
		V2_5_0_0,
		V2_5_1_0,
		V2_6_0_0,
		V2_6_1_0,
		V2_6_2_0,
		V2_6_3_0,
		V2_7_0_0,
		V2_7_1_0,
		V2_7_2_0,
		V2_8_0_0,
		V2_8_1_0,
		V2_8_2_0,
		V3_0_0_0,
		V3_0_1_0,
		V3_0_2_0,
		V3_1_0_0,
		V3_1_1_0,
		V3_1_2_0,
		V3_2_0_0,
		V3_2_1_0,
		V3_2_2_0,
		V3_2_3_0,
		V3_3_0_0,
		V3_3_1_0,
		V3_3_2_0,
		V3_4_0_0,
		V3_4_1_0,
		V3_5_0_0,
		V3_5_1_0,
		V3_6_0_0,
	}
	// MinVersion and MaxVersion are the first and last entries of
	// SupportedVersions. DefaultVersion is V2_1_0_0.
	// NOTE(review): presumably DefaultVersion is what is assumed when no
	// version is configured — confirm against its users.
	MinVersion     = V0_8_2_0
	MaxVersion     = V3_6_0_0
	DefaultVersion = V2_1_0_0
)

Effective constants defining the supported kafka versions.

View Source
var ErrAddPartitionsToTxn = errors.New("transaction manager: failed to send partitions to transaction")

ErrAddPartitionsToTxn is returned when AddPartitionsToTxn failed multiple times

View Source
var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")

ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.

View Source
var ErrBrokerNotFound = errors.New("kafka: broker for ID is not found")

ErrBrokerNotFound is the error returned when there's no broker found for the requested ID.

View Source
var ErrCannotTransitionNilError = errors.New("transaction manager: cannot transition with a nil error")

ErrCannotTransitionNilError is returned when a transition is attempted with a nil error.

View Source
var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")

ErrClosedClient is the error returned when a method is called on a client that has been closed.

View Source
var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch")

ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing a RecordBatch.

View Source
var ErrControllerNotAvailable = errors.New("kafka: controller is not available")

ErrControllerNotAvailable is returned when the server did not give a correct controller ID. This may be because the Kafka server's version is lower than 0.10.0.0.

View Source
var ErrCreateACLs = errors.New("kafka server: failed to create one or more ACL rules")

ErrCreateACLs is the type of error returned when ACL creation failed

View Source
var ErrDeleteRecords = errors.New("kafka server: failed to delete records")

ErrDeleteRecords is the type of error returned when deleting the required records fails.

View Source
var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks")

ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.

View Source
var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected")

ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.

View Source
var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index")

ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).

View Source
var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")

ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max

View Source
var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata")

ErrNoTopicsToUpdateMetadata is returned when Meta.Full is set to false but no specific topics were found to update the metadata.

View Source
var ErrNonTransactedProducer = errors.New("transaction manager: you need to add TransactionalID to producer")

ErrNonTransactedProducer is returned when calling BeginTxn, CommitTxn or AbortTxn on a non-transactional producer.

View Source
var ErrNotConnected = errors.New("kafka: broker not connected")

ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.

View Source
var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to")

ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.

View Source
var ErrReassignPartitions = errors.New("failed to reassign partitions for topic")

ErrReassignPartitions is returned when altering partition assignments for a topic fails

View Source
var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down")

ErrShuttingDown is returned when a producer receives a message during shutdown.

View Source
var ErrTransactionNotReady = errors.New("transaction manager: transaction is not ready")

ErrTransactionNotReady is returned when the transaction status is invalid for the current action.

View Source
var ErrTransitionNotAllowed = errors.New("transaction manager: invalid transition attempted")

ErrTransitionNotAllowed is returned when a transaction manager state transition is not valid.

View Source
var ErrTxnOffsetCommit = errors.New("transaction manager: failed to send offsets to transaction")

ErrTxnOffsetCommit is returned when TxnOffsetCommit failed multiple times

View Source
var ErrTxnUnableToParseResponse = errors.New("transaction manager: unable to parse response")

ErrTxnUnableToParseResponse is returned when the response is nil.

View Source
var ErrUnknownScramMechanism = errors.New("kafka: unknown SCRAM mechanism provided")

ErrUnknownScramMechanism is returned when user tries to AlterUserScramCredentials with unknown SCRAM mechanism

Functions

func ResponseHeaderVersion

func ResponseHeaderVersion(apiKey, apiVersion int16) int16

ResponseHeaderVersion derives the response header version from the request API key and request API version.

func VersionedDecode

func VersionedDecode(buf []byte, in versionedDecoder, version int16) (int, error)

Types

type AbortedTransaction

// AbortedTransaction describes one aborted transaction: the producer it is
// associated with and the first offset it covers.
type AbortedTransaction struct {
	// ProducerID contains the producer id associated with the aborted transaction.
	ProducerID int64
	// FirstOffset contains the first offset in the aborted transaction.
	FirstOffset int64
}

type ByteEncoder

type ByteEncoder []byte

ByteEncoder implements the Encoder interface for Go byte slices so that they can be used as the Key or Value in a ProducerMessage.

func (ByteEncoder) Encode

func (b ByteEncoder) Encode() ([]byte, error)

func (ByteEncoder) Length

func (b ByteEncoder) Length() int

type CompressionCodec

type CompressionCodec int8

CompressionCodec represents the various compression codecs recognized by Kafka in messages.

// Compression codec identifiers. The values are assigned by iota, so
// CompressionNone is 0 and each following codec is one greater (GZIP 1,
// Snappy 2, LZ4 3, ZSTD 4).
const (
	// CompressionNone means no compression.
	CompressionNone CompressionCodec = iota
	// CompressionGZIP selects compression using GZIP.
	CompressionGZIP
	// CompressionSnappy selects compression using snappy.
	CompressionSnappy
	// CompressionLZ4 selects compression using LZ4.
	CompressionLZ4
	// CompressionZSTD selects compression using ZSTD.
	CompressionZSTD

	// CompressionLevelDefault is the constant to use in CompressionLevel
	// to have the default compression level for any codec. The value is
	// picked so that it does not coincide with any existing compression
	// level. Note that it is an untyped integer constant, not a
	// CompressionCodec: it has its own explicit value and no declared type.
	CompressionLevelDefault = -1000
)

func (CompressionCodec) MarshalText

func (cc CompressionCodec) MarshalText() ([]byte, error)

MarshalText transforms a CompressionCodec into its string representation.

func (CompressionCodec) String

func (cc CompressionCodec) String() string

func (*CompressionCodec) UnmarshalText

func (cc *CompressionCodec) UnmarshalText(text []byte) error

UnmarshalText returns a CompressionCodec from its string representation.

type ConfigurationError

type ConfigurationError string

ConfigurationError is the type of error returned from a constructor (e.g. NewClient, or NewConsumer) when the specified configuration is invalid.

func (ConfigurationError) Error

func (err ConfigurationError) Error() string

type Encoder

// Encoder is implemented by any type that can be encoded as a byte slice.
type Encoder interface {
	// Encode returns the encoded bytes, or an error.
	Encode() ([]byte, error)
	// Length is provided as an optimization and must return the same value
	// as len() applied to the result of Encode().
	Length() int
}

Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message. Length() is provided as an optimization, and must return the same as len() on the result of Encode().

type FetchResponse

// FetchResponse holds a fetch response: top-level status fields plus the
// per-topic, per-partition response blocks.
type FetchResponse struct {
	// Version defines the protocol version to use for encode and decode
	Version int16
	// ThrottleTime contains the duration in milliseconds for which the request
	// was throttled due to a quota violation, or zero if the request did not
	// violate any quota.
	// NOTE(review): the field is a time.Duration; "in milliseconds"
	// presumably describes the wire representation — confirm.
	ThrottleTime time.Duration
	// ErrorCode contains the top level response error code.
	ErrorCode int16
	// SessionID contains the fetch session ID, or 0 if this is not part of a fetch session.
	SessionID int32
	// Blocks contains the response topics.
	// NOTE(review): presumably keyed by topic name, then partition ID — confirm.
	Blocks map[string]map[int32]*FetchResponseBlock

	// LogAppendTime and Timestamp are undocumented in the original.
	// NOTE(review): presumably they select the timestamp type and value
	// applied to records in this response — confirm before relying on them.
	LogAppendTime bool
	Timestamp     time.Time
}

type FetchResponseBlock

// FetchResponseBlock holds the fetch result for a single partition: its
// error code, offsets, aborted transactions and record data.
type FetchResponseBlock struct {
	// Err contains the error code, or 0 if there was no fetch error.
	Err KError
	// HighWaterMarkOffset contains the current high water mark.
	HighWaterMarkOffset int64
	// LastStableOffset contains the last stable offset (or LSO) of the
	// partition. This is the last offset such that the state of all
	// transactional records prior to this offset have been decided (ABORTED or
	// COMMITTED)
	LastStableOffset       int64
	LastRecordsBatchOffset *int64
	// LogStartOffset contains the current log start offset.
	LogStartOffset int64
	// AbortedTransactions contains the aborted transactions.
	AbortedTransactions []*AbortedTransaction
	// PreferredReadReplica contains the preferred read replica for the
	// consumer to use on its next fetch request
	PreferredReadReplica int32
	// RecordsSet contains the record data.
	RecordsSet []*Records

	// NOTE(review): Partial is undocumented; presumably it marks a block
	// whose record data was truncated — confirm.
	Partial bool
	// Deprecated: use FetchResponseBlock.RecordsSet instead.
	Records *Records
}

type KError

type KError int16

KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes

// Numeric error codes returned by the Kafka server. Each trailing comment
// names the corresponding upstream Errors.* constant; where the Go name and
// the upstream name differ (for example ErrNotLeaderForPartition vs.
// NOT_LEADER_OR_FOLLOWER, or ErrFencedInstancedId vs. FENCED_INSTANCE_ID),
// the Go identifier is kept as-is for API compatibility.
const (
	ErrUnknown                            KError = -1 // Errors.UNKNOWN_SERVER_ERROR
	ErrNoError                            KError = 0  // Errors.NONE
	ErrOffsetOutOfRange                   KError = 1  // Errors.OFFSET_OUT_OF_RANGE
	ErrInvalidMessage                     KError = 2  // Errors.CORRUPT_MESSAGE
	ErrUnknownTopicOrPartition            KError = 3  // Errors.UNKNOWN_TOPIC_OR_PARTITION
	ErrInvalidMessageSize                 KError = 4  // Errors.INVALID_FETCH_SIZE
	ErrLeaderNotAvailable                 KError = 5  // Errors.LEADER_NOT_AVAILABLE
	ErrNotLeaderForPartition              KError = 6  // Errors.NOT_LEADER_OR_FOLLOWER
	ErrRequestTimedOut                    KError = 7  // Errors.REQUEST_TIMED_OUT
	ErrBrokerNotAvailable                 KError = 8  // Errors.BROKER_NOT_AVAILABLE
	ErrReplicaNotAvailable                KError = 9  // Errors.REPLICA_NOT_AVAILABLE
	ErrMessageSizeTooLarge                KError = 10 // Errors.MESSAGE_TOO_LARGE
	ErrStaleControllerEpochCode           KError = 11 // Errors.STALE_CONTROLLER_EPOCH
	ErrOffsetMetadataTooLarge             KError = 12 // Errors.OFFSET_METADATA_TOO_LARGE
	ErrNetworkException                   KError = 13 // Errors.NETWORK_EXCEPTION
	ErrOffsetsLoadInProgress              KError = 14 // Errors.COORDINATOR_LOAD_IN_PROGRESS
	ErrConsumerCoordinatorNotAvailable    KError = 15 // Errors.COORDINATOR_NOT_AVAILABLE
	ErrNotCoordinatorForConsumer          KError = 16 // Errors.NOT_COORDINATOR
	ErrInvalidTopic                       KError = 17 // Errors.INVALID_TOPIC_EXCEPTION
	ErrMessageSetSizeTooLarge             KError = 18 // Errors.RECORD_LIST_TOO_LARGE
	ErrNotEnoughReplicas                  KError = 19 // Errors.NOT_ENOUGH_REPLICAS
	ErrNotEnoughReplicasAfterAppend       KError = 20 // Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND
	ErrInvalidRequiredAcks                KError = 21 // Errors.INVALID_REQUIRED_ACKS
	ErrIllegalGeneration                  KError = 22 // Errors.ILLEGAL_GENERATION
	ErrInconsistentGroupProtocol          KError = 23 // Errors.INCONSISTENT_GROUP_PROTOCOL
	ErrInvalidGroupId                     KError = 24 // Errors.INVALID_GROUP_ID
	ErrUnknownMemberId                    KError = 25 // Errors.UNKNOWN_MEMBER_ID
	ErrInvalidSessionTimeout              KError = 26 // Errors.INVALID_SESSION_TIMEOUT
	ErrRebalanceInProgress                KError = 27 // Errors.REBALANCE_IN_PROGRESS
	ErrInvalidCommitOffsetSize            KError = 28 // Errors.INVALID_COMMIT_OFFSET_SIZE
	ErrTopicAuthorizationFailed           KError = 29 // Errors.TOPIC_AUTHORIZATION_FAILED
	ErrGroupAuthorizationFailed           KError = 30 // Errors.GROUP_AUTHORIZATION_FAILED
	ErrClusterAuthorizationFailed         KError = 31 // Errors.CLUSTER_AUTHORIZATION_FAILED
	ErrInvalidTimestamp                   KError = 32 // Errors.INVALID_TIMESTAMP
	ErrUnsupportedSASLMechanism           KError = 33 // Errors.UNSUPPORTED_SASL_MECHANISM
	ErrIllegalSASLState                   KError = 34 // Errors.ILLEGAL_SASL_STATE
	ErrUnsupportedVersion                 KError = 35 // Errors.UNSUPPORTED_VERSION
	ErrTopicAlreadyExists                 KError = 36 // Errors.TOPIC_ALREADY_EXISTS
	ErrInvalidPartitions                  KError = 37 // Errors.INVALID_PARTITIONS
	ErrInvalidReplicationFactor           KError = 38 // Errors.INVALID_REPLICATION_FACTOR
	ErrInvalidReplicaAssignment           KError = 39 // Errors.INVALID_REPLICA_ASSIGNMENT
	ErrInvalidConfig                      KError = 40 // Errors.INVALID_CONFIG
	ErrNotController                      KError = 41 // Errors.NOT_CONTROLLER
	ErrInvalidRequest                     KError = 42 // Errors.INVALID_REQUEST
	ErrUnsupportedForMessageFormat        KError = 43 // Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT
	ErrPolicyViolation                    KError = 44 // Errors.POLICY_VIOLATION
	ErrOutOfOrderSequenceNumber           KError = 45 // Errors.OUT_OF_ORDER_SEQUENCE_NUMBER
	ErrDuplicateSequenceNumber            KError = 46 // Errors.DUPLICATE_SEQUENCE_NUMBER
	ErrInvalidProducerEpoch               KError = 47 // Errors.INVALID_PRODUCER_EPOCH
	ErrInvalidTxnState                    KError = 48 // Errors.INVALID_TXN_STATE
	ErrInvalidProducerIDMapping           KError = 49 // Errors.INVALID_PRODUCER_ID_MAPPING
	ErrInvalidTransactionTimeout          KError = 50 // Errors.INVALID_TRANSACTION_TIMEOUT
	ErrConcurrentTransactions             KError = 51 // Errors.CONCURRENT_TRANSACTIONS
	ErrTransactionCoordinatorFenced       KError = 52 // Errors.TRANSACTION_COORDINATOR_FENCED
	ErrTransactionalIDAuthorizationFailed KError = 53 // Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
	ErrSecurityDisabled                   KError = 54 // Errors.SECURITY_DISABLED
	ErrOperationNotAttempted              KError = 55 // Errors.OPERATION_NOT_ATTEMPTED
	ErrKafkaStorageError                  KError = 56 // Errors.KAFKA_STORAGE_ERROR
	ErrLogDirNotFound                     KError = 57 // Errors.LOG_DIR_NOT_FOUND
	ErrSASLAuthenticationFailed           KError = 58 // Errors.SASL_AUTHENTICATION_FAILED
	ErrUnknownProducerID                  KError = 59 // Errors.UNKNOWN_PRODUCER_ID
	ErrReassignmentInProgress             KError = 60 // Errors.REASSIGNMENT_IN_PROGRESS
	ErrDelegationTokenAuthDisabled        KError = 61 // Errors.DELEGATION_TOKEN_AUTH_DISABLED
	ErrDelegationTokenNotFound            KError = 62 // Errors.DELEGATION_TOKEN_NOT_FOUND
	ErrDelegationTokenOwnerMismatch       KError = 63 // Errors.DELEGATION_TOKEN_OWNER_MISMATCH
	ErrDelegationTokenRequestNotAllowed   KError = 64 // Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED
	ErrDelegationTokenAuthorizationFailed KError = 65 // Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED
	ErrDelegationTokenExpired             KError = 66 // Errors.DELEGATION_TOKEN_EXPIRED
	ErrInvalidPrincipalType               KError = 67 // Errors.INVALID_PRINCIPAL_TYPE
	ErrNonEmptyGroup                      KError = 68 // Errors.NON_EMPTY_GROUP
	ErrGroupIDNotFound                    KError = 69 // Errors.GROUP_ID_NOT_FOUND
	ErrFetchSessionIDNotFound             KError = 70 // Errors.FETCH_SESSION_ID_NOT_FOUND
	ErrInvalidFetchSessionEpoch           KError = 71 // Errors.INVALID_FETCH_SESSION_EPOCH
	ErrListenerNotFound                   KError = 72 // Errors.LISTENER_NOT_FOUND
	ErrTopicDeletionDisabled              KError = 73 // Errors.TOPIC_DELETION_DISABLED
	ErrFencedLeaderEpoch                  KError = 74 // Errors.FENCED_LEADER_EPOCH
	ErrUnknownLeaderEpoch                 KError = 75 // Errors.UNKNOWN_LEADER_EPOCH
	ErrUnsupportedCompressionType         KError = 76 // Errors.UNSUPPORTED_COMPRESSION_TYPE
	ErrStaleBrokerEpoch                   KError = 77 // Errors.STALE_BROKER_EPOCH
	ErrOffsetNotAvailable                 KError = 78 // Errors.OFFSET_NOT_AVAILABLE
	ErrMemberIdRequired                   KError = 79 // Errors.MEMBER_ID_REQUIRED
	ErrPreferredLeaderNotAvailable        KError = 80 // Errors.PREFERRED_LEADER_NOT_AVAILABLE
	ErrGroupMaxSizeReached                KError = 81 // Errors.GROUP_MAX_SIZE_REACHED
	ErrFencedInstancedId                  KError = 82 // Errors.FENCED_INSTANCE_ID
	ErrEligibleLeadersNotAvailable        KError = 83 // Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE
	ErrElectionNotNeeded                  KError = 84 // Errors.ELECTION_NOT_NEEDED
	ErrNoReassignmentInProgress           KError = 85 // Errors.NO_REASSIGNMENT_IN_PROGRESS
	ErrGroupSubscribedToTopic             KError = 86 // Errors.GROUP_SUBSCRIBED_TO_TOPIC
	ErrInvalidRecord                      KError = 87 // Errors.INVALID_RECORD
	ErrUnstableOffsetCommit               KError = 88 // Errors.UNSTABLE_OFFSET_COMMIT
	ErrThrottlingQuotaExceeded            KError = 89 // Errors.THROTTLING_QUOTA_EXCEEDED
	ErrProducerFenced                     KError = 90 // Errors.PRODUCER_FENCED
)

Numeric error codes returned by the Kafka server.

func (KError) Error

func (err KError) Error() string

type KafkaVersion

// KafkaVersion represents a version of the upstream Kafka broker. All of its
// fields are unexported; use ParseKafkaVersion or one of the predeclared
// V* variables to obtain a value.
type KafkaVersion struct {
	// contains filtered or unexported fields
}

KafkaVersion instances represent versions of the upstream Kafka broker.

func ParseKafkaVersion

func ParseKafkaVersion(s string) (KafkaVersion, error)

ParseKafkaVersion parses and returns kafka version or error from a string

func (KafkaVersion) IsAtLeast

func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool

IsAtLeast returns true if and only if the version it is called on is greater than or equal to the version passed in:

V1.IsAtLeast(V2) // false
V2.IsAtLeast(V1) // true

func (KafkaVersion) String

func (v KafkaVersion) String() string

type Message

// Message is a single Kafka message: an optional key, a value, a timestamp,
// its compression settings, and optionally a wrapped MessageSet.
type Message struct {
	Codec            CompressionCodec // codec used to compress the message contents
	CompressionLevel int              // compression level
	LogAppendTime    bool             // the used timestamp is LogAppendTime
	Key              []byte           // the message key, may be nil
	Value            []byte           // the message contents
	Set              *MessageSet      // the message set a message might wrap
	Version          int8             // v1 requires Kafka 0.10
	Timestamp        time.Time        // the timestamp of the message (version 1+ only)

}

Message is a kafka message type

type MessageBlock

// MessageBlock pairs a Message with an offset.
// NOTE(review): presumably Offset is the message's offset within its
// partition — confirm.
type MessageBlock struct {
	Offset int64
	Msg    *Message
}

func (*MessageBlock) Messages

func (msb *MessageBlock) Messages() []*MessageBlock

Messages is a convenience helper which returns all the messages that are wrapped in this block.

type MessageSet

// MessageSet is a list of MessageBlocks together with flags recording how
// the set appeared on the wire.
type MessageSet struct {
	PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
	OverflowMessage        bool // whether the set on the wire contained an overflow message
	Messages               []*MessageBlock
}

type PacketDecodingError

// PacketDecodingError reports a failure to decode a packet.
type PacketDecodingError struct {
	// Info carries a textual description of the failure.
	// NOTE(review): presumably included in the Error() message — confirm.
	Info string
}

PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.

func (PacketDecodingError) Error

func (err PacketDecodingError) Error() string

type PacketEncodingError

// PacketEncodingError reports a failure to encode a packet.
type PacketEncodingError struct {
	// Info carries a textual description of the failure.
	// NOTE(review): presumably included in the Error() message — confirm.
	Info string
}

PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.

func (PacketEncodingError) Error

func (err PacketEncodingError) Error() string

type ProduceRequest

// ProduceRequest holds the fields of a produce request: an optional
// transactional ID, the required acknowledgement level, a timeout, the
// protocol version, and the records to send.
// NOTE(review): Records is presumably keyed by topic name, then partition
// ID, and the unit of Timeout is not visible here — confirm both.
type ProduceRequest struct {
	TransactionalID *string
	RequiredAcks    RequiredAcks
	Timeout         int32
	Version         int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10, v3 requires Kafka 0.11
	Records         map[string]map[int32]Records
}

type ProtocolBody

// ProtocolBody is the type of Request.Body. Its methods are not exported, so
// it cannot be implemented directly by types outside this package.
type ProtocolBody interface {
	// contains filtered or unexported methods
}

type Record

// Record is a single Kafka record: headers, attributes, key and value, plus
// timestamp and offset deltas.
type Record struct {
	// Headers holds the record's headers.
	Headers []*RecordHeader

	// NOTE(review): TimestampDelta and OffsetDelta are presumably relative
	// to the enclosing RecordBatch's FirstTimestamp and FirstOffset — confirm.
	Attributes     int8
	TimestampDelta time.Duration
	OffsetDelta    int64
	Key            []byte
	Value          []byte
	// contains filtered or unexported fields
}

Record is kafka record type

type RecordBatch

// RecordBatch is a batch of Records together with batch-level metadata:
// offsets, timestamps, producer identity, compression settings and flags.
// NOTE(review): LastOffset presumably derives from FirstOffset and
// LastOffsetDelta — confirm against its implementation.
type RecordBatch struct {
	FirstOffset           int64
	PartitionLeaderEpoch  int32
	Version               int8
	Codec                 CompressionCodec
	CompressionLevel      int
	Control               bool
	LogAppendTime         bool
	LastOffsetDelta       int32
	FirstTimestamp        time.Time
	MaxTimestamp          time.Time
	ProducerID            int64
	ProducerEpoch         int16
	FirstSequence         int32
	Records               []*Record
	PartialTrailingRecord bool
	IsTransactional       bool
	// contains filtered or unexported fields
}

func (*RecordBatch) LastOffset

func (b *RecordBatch) LastOffset() int64

type RecordHeader

// RecordHeader stores the key and value of a single record header, both as
// raw bytes.
type RecordHeader struct {
	Key   []byte
	Value []byte
}

RecordHeader stores key and value for a record header

type Records

// Records is a union type: it contains either a RecordBatch or a legacy
// MessageSet.
type Records struct {
	// MsgSet is the legacy MessageSet alternative; RecordBatch is the
	// RecordBatch alternative.
	// NOTE(review): presumably only one of the two is non-nil at a time — confirm.
	MsgSet      *MessageSet
	RecordBatch *RecordBatch
	// contains filtered or unexported fields
}

Records implements a union type containing either a RecordBatch or a legacy MessageSet.

type Request

// Request is a protocol request: a correlation ID, the client ID, and the
// request body. DecodeRequest returns values of this type.
type Request struct {
	CorrelationID int32
	ClientID      string
	Body          ProtocolBody
}

func DecodeRequest

func DecodeRequest(r io.Reader) (*Request, int, error)

type RequiredAcks

type RequiredAcks int16

RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced by setting the `min.insync.replicas` value in the broker's configuration).

// Acknowledgement levels accepted in a ProduceRequest's RequiredAcks field.
const (
	// NoResponse (0) doesn't send any response, the TCP ACK is all you get.
	NoResponse RequiredAcks = 0
	// WaitForLocal (1) waits for only the local commit to succeed before
	// responding.
	WaitForLocal RequiredAcks = 1
	// WaitForAll (-1) waits for all in-sync replicas to commit before
	// responding. The minimum number of in-sync replicas is configured on
	// the broker via the `min.insync.replicas` configuration key.
	WaitForAll RequiredAcks = -1
)

type ResponseHeader

// ResponseHeader holds the length and correlation ID of a response.
// NOTE(review): presumably Length is the size in bytes of the response
// payload and CorrelationID echoes the originating Request's CorrelationID
// — confirm both.
type ResponseHeader struct {
	Length        int32
	CorrelationID int32
}

type StringEncoder

type StringEncoder string

StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.

func (StringEncoder) Encode

func (s StringEncoder) Encode() ([]byte, error)

func (StringEncoder) Length

func (s StringEncoder) Length() int

type Timestamp

// Timestamp wraps a *time.Time by embedding, so the methods of time.Time are
// promoted onto Timestamp. The zero Timestamp holds a nil pointer; set the
// embedded Time before calling promoted methods.
type Timestamp struct {
	*time.Time
}

type ZstdDecoderParams

// ZstdDecoderParams holds parameters for ZSTD decoding. It currently has no
// fields.
type ZstdDecoderParams struct {
}

type ZstdEncoderParams

// ZstdEncoderParams holds parameters for ZSTD encoding.
type ZstdEncoderParams struct {
	// Level is presumably the ZSTD compression level — TODO confirm, including
	// whether CompressionLevelDefault is accepted here.
	Level int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL