Documentation
¶
Overview ¶
Package protocol is a pure Go library for dealing with Apache Kafka (versions 0.8 and later). It includes a low-level API for controlling bytes on the wire and the ability to encode and decode both requests and responses.
Example (EncodeAndDecodeRequest) ¶
The following example shows how to use the package to encode and subsequently decode a Request.
package main import ( "bytes" "fmt" "github.com/MadsRC/gofka/protocol" ) func main() { request := protocol.Request{ CorrelationID: 123, ClientID: "anID", Body: &protocol.ApiVersionsRequest{ Version: 3, ClientSoftwareName: "gofka", ClientSoftwareVersion: "0.10.0", }, } packet, err := protocol.Encode(&request) if err != nil { panic(err) } fmt.Printf("On-wire request: %v\n", packet) decoded, _, err := protocol.DecodeRequest(bytes.NewReader(packet)) if err != nil { panic(err) } body := decoded.Body.(*protocol.ApiVersionsRequest) fmt.Printf("Correlation ID: %d, Client ID: %s\n", decoded.CorrelationID, decoded.ClientID) fmt.Printf("Body: Version %d, ClientSoftwareName %s, ClientSoftwareVersion %s\n", body.Version, body.ClientSoftwareName, body.ClientSoftwareVersion) }
Output: On-wire request: [0 0 0 29 0 18 0 3 0 0 0 123 0 4 97 110 73 68 0 6 103 111 102 107 97 7 48 46 49 48 46 48 0] Correlation ID: 123, Client ID: anID Body: Version 3, ClientSoftwareName gofka, ClientSoftwareVersion 0.10.0
Index ¶
- Constants
- Variables
- func Decode(buf []byte, in decoder) error
- func Encode(e encoder) ([]byte, error)
- type AbortedTransaction
- type Acl
- type AclCreation
- type AclCreationResponse
- type AclFilter
- type AclOperation
- type AclPermissionType
- type AclResourcePatternType
- type AclResourceType
- type AddOffsetsToTxnRequest
- type AddOffsetsToTxnResponse
- type AddPartitionsToTxnRequest
- type AddPartitionsToTxnResponse
- type AlterClientQuotasEntry
- type AlterClientQuotasEntryResponse
- type AlterClientQuotasRequest
- type AlterClientQuotasResponse
- type AlterConfigError
- type AlterConfigsRequest
- type AlterConfigsResource
- type AlterConfigsResourceResponse
- type AlterConfigsResponse
- type AlterPartitionReassignmentsRequest
- type AlterPartitionReassignmentsResponse
- type AlterUserScramCredentialsDelete
- type AlterUserScramCredentialsRequest
- type AlterUserScramCredentialsResponse
- type AlterUserScramCredentialsResult
- type AlterUserScramCredentialsUpsert
- type ApiVersionsRequest
- type ApiVersionsResponse
- type ApiVersionsResponseKey
- type Broker
- type ByteEncoder
- type ClientQuotasOp
- type CompressionCodec
- type ConfigEntry
- type ConfigResource
- type ConfigResourceType
- type ConfigSource
- type ConfigSynonym
- type ConfigurationError
- type ConsumerGroupMemberAssignment
- type ConsumerGroupMemberMetadata
- type ConsumerMetadataRequest
- type ConsumerMetadataResponse
- type ControlRecord
- type ControlRecordType
- type CoordinatorType
- type CreateAclsRequest
- type CreateAclsResponse
- type CreatePartitionsRequest
- type CreatePartitionsResponse
- type CreateTopicsRequest
- type CreateTopicsResponse
- type DeleteAclsRequest
- type DeleteAclsResponse
- type DeleteGroupsRequest
- type DeleteGroupsResponse
- type DeleteOffsetsRequest
- type DeleteOffsetsResponse
- type DeleteRecordsRequest
- type DeleteRecordsRequestTopic
- type DeleteRecordsResponse
- type DeleteRecordsResponsePartition
- type DeleteRecordsResponseTopic
- type DeleteTopicsRequest
- type DeleteTopicsResponse
- type DescribeAclsRequest
- type DescribeAclsResponse
- type DescribeClientQuotasEntry
- type DescribeClientQuotasRequest
- type DescribeClientQuotasResponse
- type DescribeConfigError
- type DescribeConfigsRequest
- type DescribeConfigsResponse
- type DescribeGroupsRequest
- type DescribeGroupsResponse
- type DescribeLogDirsRequest
- type DescribeLogDirsRequestTopic
- type DescribeLogDirsResponse
- type DescribeLogDirsResponseDirMetadata
- type DescribeLogDirsResponsePartition
- type DescribeLogDirsResponseTopic
- type DescribeUserScramCredentialsRequest
- type DescribeUserScramCredentialsRequestUser
- type DescribeUserScramCredentialsResponse
- type DescribeUserScramCredentialsResult
- type Encoder
- type EndTxnRequest
- type EndTxnResponse
- type FetchRequest
- type FetchResponse
- func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, ...)
- func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, ...)
- func (r *FetchResponse) AddError(topic string, partition int32, err KError)
- func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64)
- func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, ...)
- func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64)
- func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, ...)
- func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, ...)
- func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, ...)
- func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock
- func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32)
- func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64)
- type FetchResponseBlock
- type FilterResponse
- type FindCoordinatorRequest
- type FindCoordinatorResponse
- type GroupData
- type GroupDescription
- type GroupMember
- type GroupMemberDescription
- type GroupProtocol
- type HeartbeatRequest
- type HeartbeatResponse
- type IncrementalAlterConfigsEntry
- type IncrementalAlterConfigsOperation
- type IncrementalAlterConfigsRequest
- type IncrementalAlterConfigsResource
- type IncrementalAlterConfigsResponse
- type InitProducerIDRequest
- type InitProducerIDResponse
- type IsolationLevel
- type JoinGroupRequest
- type JoinGroupResponse
- type KError
- type KafkaVersion
- type LeaveGroupRequest
- type LeaveGroupResponse
- type ListGroupsRequest
- type ListGroupsResponse
- type ListPartitionReassignmentsRequest
- type ListPartitionReassignmentsResponse
- type MatchingAcl
- type MemberIdentity
- type MemberResponse
- type Message
- type MessageBlock
- type MessageSet
- type MetadataRequest
- type MetadataResponse
- type OffsetCommitRequest
- func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, ...)
- func (r *OffsetCommitRequest) AddBlockWithLeaderEpoch(topic string, partitionID int32, offset int64, leaderEpoch int32, ...)
- func (r *OffsetCommitRequest) Offset(topic string, partitionID int32) (int64, string, error)
- type OffsetCommitResponse
- type OffsetFetchRequest
- type OffsetFetchResponse
- type OffsetFetchResponseBlock
- type OffsetRequest
- type OffsetResponse
- type OffsetResponseBlock
- type OwnedPartition
- type PacketDecodingError
- type PacketEncodingError
- type PartitionError
- type PartitionMetadata
- type PartitionOffsetMetadata
- type PartitionReplicaReassignmentsStatus
- type ProduceRequest
- type ProduceResponse
- type ProduceResponseBlock
- type QuotaEntityComponent
- type QuotaEntityType
- type QuotaFilterComponent
- type QuotaMatchType
- type Record
- type RecordBatch
- type RecordHeader
- type Records
- type Request
- type RequiredAcks
- type Resource
- type ResourceAcls
- type ResourceResponse
- type SaslAuthenticateRequest
- type SaslAuthenticateResponse
- type SaslHandshakeRequest
- type SaslHandshakeResponse
- type ScramMechanismType
- type StdLogger
- type StringEncoder
- type SyncGroupRequest
- type SyncGroupRequestAssignment
- type SyncGroupResponse
- type Timestamp
- type TopicDetail
- type TopicError
- type TopicMetadata
- type TopicPartition
- type TopicPartitionError
- type TxnOffsetCommitRequest
- type TxnOffsetCommitResponse
- type UserScramCredentialsResponseInfo
- type Uuid
- type ZstdDecoderParams
- type ZstdEncoderParams
Examples ¶
Constants ¶
const ( // SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism. SASLTypeSCRAMSHA256 = "SCRAM-SHA-256" // SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism. SASLTypeSCRAMSHA512 = "SCRAM-SHA-512" )
const ( SCRAM_MECHANISM_UNKNOWN = iota // 0 SCRAM_MECHANISM_SHA_256 // 1 SCRAM_MECHANISM_SHA_512 // 2 )
const APIKeySASLAuth = 36
APIKeySASLAuth is the API key for the SaslAuthenticate Kafka API
const ReceiveTime int64 = -1
ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the Request was received. The timestamp is only used if message version 1 is used, which requires kafka 0.8.2.
Variables ¶
var ( // Logger is the instance of a StdLogger interface that Sarama writes connection // management events to. By default it is set to discard all log messages via io.Discard, // but you can set it to redirect wherever you want. Logger StdLogger = log.New(io.Discard, "[Sarama] ", log.LstdFlags) // MaxRequestSize is the maximum size (in bytes) of any Request that Sarama will attempt to send. Trying // to send a Request larger than this will result in an PacketEncodingError. The default of 100 MiB is aligned // with Kafka's default `socket.Request.max.bytes`, which is the largest Request the broker will attempt // to process. MaxRequestSize int32 = 100 * 1024 * 1024 // MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If // a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to // protect the client from running out of memory. Please note that brokers do not have any natural limit on // the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers // (see https://issues.apache.org/jira/browse/KAFKA-2063). MaxResponseSize int32 = 100 * 1024 * 1024 )
var ( V0_8_2_0 = newKafkaVersion(0, 8, 2, 0) V0_8_2_1 = newKafkaVersion(0, 8, 2, 1) V0_8_2_2 = newKafkaVersion(0, 8, 2, 2) V0_9_0_0 = newKafkaVersion(0, 9, 0, 0) V0_9_0_1 = newKafkaVersion(0, 9, 0, 1) V0_10_0_0 = newKafkaVersion(0, 10, 0, 0) V0_10_0_1 = newKafkaVersion(0, 10, 0, 1) V0_10_1_0 = newKafkaVersion(0, 10, 1, 0) V0_10_1_1 = newKafkaVersion(0, 10, 1, 1) V0_10_2_0 = newKafkaVersion(0, 10, 2, 0) V0_10_2_1 = newKafkaVersion(0, 10, 2, 1) V0_10_2_2 = newKafkaVersion(0, 10, 2, 2) V0_11_0_0 = newKafkaVersion(0, 11, 0, 0) V0_11_0_1 = newKafkaVersion(0, 11, 0, 1) V0_11_0_2 = newKafkaVersion(0, 11, 0, 2) V1_0_0_0 = newKafkaVersion(1, 0, 0, 0) V1_0_1_0 = newKafkaVersion(1, 0, 1, 0) V1_0_2_0 = newKafkaVersion(1, 0, 2, 0) V1_1_0_0 = newKafkaVersion(1, 1, 0, 0) V1_1_1_0 = newKafkaVersion(1, 1, 1, 0) V2_0_0_0 = newKafkaVersion(2, 0, 0, 0) V2_0_1_0 = newKafkaVersion(2, 0, 1, 0) V2_1_0_0 = newKafkaVersion(2, 1, 0, 0) V2_1_1_0 = newKafkaVersion(2, 1, 1, 0) V2_2_0_0 = newKafkaVersion(2, 2, 0, 0) V2_2_1_0 = newKafkaVersion(2, 2, 1, 0) V2_2_2_0 = newKafkaVersion(2, 2, 2, 0) V2_3_0_0 = newKafkaVersion(2, 3, 0, 0) V2_3_1_0 = newKafkaVersion(2, 3, 1, 0) V2_4_0_0 = newKafkaVersion(2, 4, 0, 0) V2_4_1_0 = newKafkaVersion(2, 4, 1, 0) V2_5_0_0 = newKafkaVersion(2, 5, 0, 0) V2_5_1_0 = newKafkaVersion(2, 5, 1, 0) V2_6_0_0 = newKafkaVersion(2, 6, 0, 0) V2_6_1_0 = newKafkaVersion(2, 6, 1, 0) V2_6_2_0 = newKafkaVersion(2, 6, 2, 0) V2_6_3_0 = newKafkaVersion(2, 6, 3, 0) V2_7_0_0 = newKafkaVersion(2, 7, 0, 0) V2_7_1_0 = newKafkaVersion(2, 7, 1, 0) V2_7_2_0 = newKafkaVersion(2, 7, 2, 0) V2_8_0_0 = newKafkaVersion(2, 8, 0, 0) V2_8_1_0 = newKafkaVersion(2, 8, 1, 0) V2_8_2_0 = newKafkaVersion(2, 8, 2, 0) V3_0_0_0 = newKafkaVersion(3, 0, 0, 0) V3_0_1_0 = newKafkaVersion(3, 0, 1, 0) V3_0_2_0 = newKafkaVersion(3, 0, 2, 0) V3_1_0_0 = newKafkaVersion(3, 1, 0, 0) V3_1_1_0 = newKafkaVersion(3, 1, 1, 0) V3_1_2_0 = newKafkaVersion(3, 1, 2, 0) V3_2_0_0 = newKafkaVersion(3, 2, 0, 0) V3_2_1_0 = 
newKafkaVersion(3, 2, 1, 0) V3_2_2_0 = newKafkaVersion(3, 2, 2, 0) V3_2_3_0 = newKafkaVersion(3, 2, 3, 0) V3_3_0_0 = newKafkaVersion(3, 3, 0, 0) V3_3_1_0 = newKafkaVersion(3, 3, 1, 0) V3_3_2_0 = newKafkaVersion(3, 3, 2, 0) V3_4_0_0 = newKafkaVersion(3, 4, 0, 0) V3_4_1_0 = newKafkaVersion(3, 4, 1, 0) V3_5_0_0 = newKafkaVersion(3, 5, 0, 0) V3_5_1_0 = newKafkaVersion(3, 5, 1, 0) V3_6_0_0 = newKafkaVersion(3, 6, 0, 0) SupportedVersions = []KafkaVersion{ V0_8_2_0, V0_8_2_1, V0_8_2_2, V0_9_0_0, V0_9_0_1, V0_10_0_0, V0_10_0_1, V0_10_1_0, V0_10_1_1, V0_10_2_0, V0_10_2_1, V0_10_2_2, V0_11_0_0, V0_11_0_1, V0_11_0_2, V1_0_0_0, V1_0_1_0, V1_0_2_0, V1_1_0_0, V1_1_1_0, V2_0_0_0, V2_0_1_0, V2_1_0_0, V2_1_1_0, V2_2_0_0, V2_2_1_0, V2_2_2_0, V2_3_0_0, V2_3_1_0, V2_4_0_0, V2_4_1_0, V2_5_0_0, V2_5_1_0, V2_6_0_0, V2_6_1_0, V2_6_2_0, V2_7_0_0, V2_7_1_0, V2_8_0_0, V2_8_1_0, V2_8_2_0, V3_0_0_0, V3_0_1_0, V3_0_2_0, V3_1_0_0, V3_1_1_0, V3_1_2_0, V3_2_0_0, V3_2_1_0, V3_2_2_0, V3_2_3_0, V3_3_0_0, V3_3_1_0, V3_3_2_0, V3_4_0_0, V3_4_1_0, V3_5_0_0, V3_5_1_0, V3_6_0_0, } MinVersion = V0_8_2_0 MaxVersion = V3_6_0_0 DefaultVersion = V2_1_0_0 )
Effective constants defining the supported kafka versions.
var ErrAddPartitionsToTxn = errors.New("transaction manager: failed to send partitions to transaction")
ErrAddPartitionsToTxn is returned when AddPartitionsToTxn failed multiple times
var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")
ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.
var ErrBrokerNotFound = errors.New("kafka: broker for ID is not found")
ErrBrokerNotFound is the error returned when there's no broker found for the requested ID.
var ErrCannotTransitionNilError = errors.New("transaction manager: cannot transition with a nil error")
ErrCannotTransitionNilError is returned when a transition is attempted with a nil error.
var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")
ErrClosedClient is the error returned when a method is called on a client that has been closed.
var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch")
ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing a RecordBatch.
var ErrControllerNotAvailable = errors.New("kafka: controller is not available")
ErrControllerNotAvailable is returned when the server didn't give a correct controller ID. Possibly the Kafka server's version is lower than 0.10.0.0.
var ErrCreateACLs = errors.New("kafka server: failed to create one or more ACL rules")
ErrCreateACLs is the type of error returned when ACL creation failed
var ErrDeleteRecords = errors.New("kafka server: failed to delete Records")
ErrDeleteRecords is the type of error returned when the required Records fail to be deleted.
var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks")
ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.
var ErrInsufficientData = errors.New("kafka: insufficient data to Decode packet, more bytes expected")
ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.
var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index")
ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).
var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")
ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata")
ErrNoTopicsToUpdateMetadata is returned when Meta.Full is set to false but no specific topics were found to update the metadata.
var ErrNonTransactedProducer = errors.New("transaction manager: you need to add TransactionalID to producer")
ErrNonTransactedProducer is returned when calling BeginTxn, CommitTxn or AbortTxn on a non-transactional producer.
var ErrNotConnected = errors.New("kafka: broker not connected")
ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to")
ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.
var ErrReassignPartitions = errors.New("failed to reassign partitions for topic")
ErrReassignPartitions is returned when altering partition assignments for a topic fails
var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down")
ErrShuttingDown is returned when a producer receives a message during shutdown.
var ErrTransactionNotReady = errors.New("transaction manager: transaction is not ready")
ErrTransactionNotReady is returned when the transaction status is invalid for the current action.
var ErrTransitionNotAllowed = errors.New("transaction manager: invalid transition attempted")
ErrTransitionNotAllowed is returned when a txnmgr state transition is not valid.
var ErrTxnOffsetCommit = errors.New("transaction manager: failed to send offsets to transaction")
ErrTxnOffsetCommit is returned when TxnOffsetCommit failed multiple times
var ErrTxnUnableToParseResponse = errors.New("transaction manager: unable to parse response")
ErrTxnUnableToParseResponse is returned when the response is nil.
var ErrUnknownScramMechanism = errors.New("kafka: unknown SCRAM mechanism provided")
ErrUnknownScramMechanism is returned when user tries to AlterUserScramCredentials with unknown SCRAM mechanism
var NoNode = &Broker{id: -1, addr: ":-1"}
var NullUUID = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
Functions ¶
func Decode ¶
Decode takes bytes and a decoder and fills the fields of the decoder from the bytes, interpreted using Kafka's encoding rules.
Example (Request) ¶
ExampleDecode_request shows how to decode a Request from raw bytes. Note that Decode does not expect the payload to be prefixed with the length of the payload.
If you're reading directly from a byte stream, an alternative is to use DecodeRequest which will read the length of the payload and then call Decode on the payload.
length := int32(binary.BigEndian.Uint32(rawRequest[0:4])) bytesRead := 4 encodedReq := make([]byte, length) encodedReq = rawRequest[4:] bytesRead += len(encodedReq) req := &Request{} err := Decode(encodedReq, req) if err != nil { panic(err) } fmt.Println("On-wire request:", rawRequest) fmt.Printf("Correlation ID: %d, Client ID: %s\n", req.CorrelationID, req.ClientID) fmt.Printf("Body: Version %d, ClientSoftwareName %s, ClientSoftwareVersion %s\n", req.Body.version(), req.Body.(*ApiVersionsRequest).ClientSoftwareName, req.Body.(*ApiVersionsRequest).ClientSoftwareVersion)
Output: On-wire request: [0 0 0 29 0 18 0 3 0 0 0 123 0 4 97 110 73 68 0 6 103 111 102 107 97 7 48 46 49 48 46 48 0] Correlation ID: 123, Client ID: anID Body: Version 3, ClientSoftwareName gofka, ClientSoftwareVersion 0.10.0
func Encode ¶
Encode takes an Encoder and turns it into bytes.
Example (Request) ¶
ExampleEncode_request shows how to encode a Request into raw bytes.
req := &Request{ CorrelationID: 123, ClientID: "anID", Body: &ApiVersionsRequest{ Version: 3, ClientSoftwareName: "gofka", ClientSoftwareVersion: "0.10.0", }, } encodedReq, err := Encode(req) if err != nil { panic(err) } fmt.Println("On-wire request:", encodedReq)
Output: On-wire request: [0 0 0 29 0 18 0 3 0 0 0 123 0 4 97 110 73 68 0 6 103 111 102 107 97 7 48 46 49 48 46 48 0]
Example (RequestBody) ¶
ExampleEncode_requestBody shows how to encode a particular request body into raw bytes.
avr := &ApiVersionsRequest{ Version: 3, ClientSoftwareName: "gofka", ClientSoftwareVersion: "0.10.0", } encodedReq, err := Encode(avr) if err != nil { panic(err) } fmt.Println("On-wire request:", encodedReq)
Output: On-wire request: [6 103 111 102 107 97 7 48 46 49 48 46 48 0]
Types ¶
type AbortedTransaction ¶
type Acl ¶
type Acl struct { Principal string Host string Operation AclOperation PermissionType AclPermissionType }
Acl holds information about acl type
type AclCreation ¶
AclCreation is a wrapper around Resource and Acl type
type AclCreationResponse ¶
AclCreationResponse is an acl creation response type
type AclFilter ¶
type AclFilter struct { Version int ResourceType AclResourceType ResourceName *string ResourcePatternTypeFilter AclResourcePatternType Principal *string Host *string Operation AclOperation PermissionType AclPermissionType }
type AclOperation ¶
type AclOperation int
const ( AclOperationUnknown AclOperation = iota AclOperationAny AclOperationAll AclOperationRead AclOperationWrite AclOperationCreate AclOperationDelete AclOperationAlter AclOperationDescribe AclOperationClusterAction AclOperationDescribeConfigs AclOperationAlterConfigs AclOperationIdempotentWrite )
func (*AclOperation) MarshalText ¶
func (a *AclOperation) MarshalText() ([]byte, error)
MarshalText returns the text form of the AclOperation (name without prefix)
func (*AclOperation) String ¶
func (a *AclOperation) String() string
func (*AclOperation) UnmarshalText ¶
func (a *AclOperation) UnmarshalText(text []byte) error
UnmarshalText takes a text representation of the operation and converts it to an AclOperation
type AclPermissionType ¶
type AclPermissionType int
const ( AclPermissionUnknown AclPermissionType = iota AclPermissionAny AclPermissionDeny AclPermissionAllow )
func (*AclPermissionType) MarshalText ¶
func (a *AclPermissionType) MarshalText() ([]byte, error)
MarshalText returns the text form of the AclPermissionType (name without prefix)
func (*AclPermissionType) String ¶
func (a *AclPermissionType) String() string
func (*AclPermissionType) UnmarshalText ¶
func (a *AclPermissionType) UnmarshalText(text []byte) error
UnmarshalText takes a text representation of the permission type and converts it to an AclPermissionType
type AclResourcePatternType ¶
type AclResourcePatternType int
const ( AclPatternUnknown AclResourcePatternType = iota AclPatternAny AclPatternMatch AclPatternLiteral AclPatternPrefixed )
func (*AclResourcePatternType) MarshalText ¶
func (a *AclResourcePatternType) MarshalText() ([]byte, error)
MarshalText returns the text form of the AclResourcePatternType (name without prefix)
func (*AclResourcePatternType) String ¶
func (a *AclResourcePatternType) String() string
func (*AclResourcePatternType) UnmarshalText ¶
func (a *AclResourcePatternType) UnmarshalText(text []byte) error
UnmarshalText takes a text representation of the resource pattern type and converts it to an AclResourcePatternType
type AclResourceType ¶
type AclResourceType int
const ( AclResourceUnknown AclResourceType = iota AclResourceAny AclResourceTopic AclResourceGroup AclResourceCluster AclResourceTransactionalID AclResourceDelegationToken )
func (*AclResourceType) MarshalText ¶
func (a *AclResourceType) MarshalText() ([]byte, error)
MarshalText returns the text form of the AclResourceType (name without prefix)
func (*AclResourceType) String ¶
func (a *AclResourceType) String() string
func (*AclResourceType) UnmarshalText ¶
func (a *AclResourceType) UnmarshalText(text []byte) error
UnmarshalText takes a text representation of the resource type and converts it to an AclResourceType
type AddOffsetsToTxnRequest ¶
type AddOffsetsToTxnRequest struct { Version int16 TransactionalID string ProducerID int64 ProducerEpoch int16 GroupID string }
AddOffsetsToTxnRequest adds offsets to a transaction Request
type AddOffsetsToTxnResponse ¶
AddOffsetsToTxnResponse is a response type for adding offsets to txns
type AddPartitionsToTxnRequest ¶
type AddPartitionsToTxnRequest struct { Version int16 TransactionalID string ProducerID int64 ProducerEpoch int16 TopicPartitions map[string][]int32 }
AddPartitionsToTxnRequest is an add partition Request
type AddPartitionsToTxnResponse ¶
type AddPartitionsToTxnResponse struct { Version int16 ThrottleTime time.Duration Errors map[string][]*PartitionError }
AddPartitionsToTxnResponse is a partition errors to transaction type
type AlterClientQuotasEntry ¶
type AlterClientQuotasEntry struct { Entity []QuotaEntityComponent // The quota entity to alter. Ops []ClientQuotasOp // An individual quota configuration entry to alter. }
type AlterClientQuotasEntryResponse ¶
type AlterClientQuotasEntryResponse struct { ErrorCode KError // The error code, or `0` if the quota alteration succeeded. ErrorMsg *string // The error message, or `null` if the quota alteration succeeded. Entity []QuotaEntityComponent // The quota entity altered. }
type AlterClientQuotasRequest ¶
type AlterClientQuotasRequest struct { Version int16 Entries []AlterClientQuotasEntry // The quota configuration entries to alter. ValidateOnly bool // Whether the alteration should be validated, but not performed. }
type AlterClientQuotasResponse ¶
type AlterClientQuotasResponse struct { Version int16 ThrottleTime time.Duration // The duration in milliseconds for which the Request was throttled due to a quota violation, or zero if the Request did not violate any quota. Entries []AlterClientQuotasEntryResponse // The quota configuration entries altered. }
type AlterConfigError ¶
func (*AlterConfigError) Error ¶
func (c *AlterConfigError) Error() string
type AlterConfigsRequest ¶
type AlterConfigsRequest struct { Version int16 Resources []*AlterConfigsResource ValidateOnly bool }
AlterConfigsRequest is an alter config Request type
type AlterConfigsResource ¶
type AlterConfigsResource struct { Type ConfigResourceType Name string ConfigEntries map[string]*string }
AlterConfigsResource is an alter config resource type
type AlterConfigsResourceResponse ¶
type AlterConfigsResourceResponse struct { ErrorCode int16 ErrorMsg string Type ConfigResourceType Name string }
AlterConfigsResourceResponse is a response type for alter config resource
type AlterConfigsResponse ¶
type AlterConfigsResponse struct { Version int16 ThrottleTime time.Duration Resources []*AlterConfigsResourceResponse }
AlterConfigsResponse is a response type for alter config
type AlterPartitionReassignmentsRequest ¶
type AlterPartitionReassignmentsResponse ¶
type AlterUserScramCredentialsDelete ¶
type AlterUserScramCredentialsDelete struct { Name string Mechanism ScramMechanismType }
type AlterUserScramCredentialsRequest ¶
type AlterUserScramCredentialsRequest struct { Version int16 // Deletions represent list of SCRAM credentials to remove Deletions []AlterUserScramCredentialsDelete // Upsertions represent list of SCRAM credentials to update/insert Upsertions []AlterUserScramCredentialsUpsert }
type AlterUserScramCredentialsResponse ¶
type AlterUserScramCredentialsResponse struct { Version int16 ThrottleTime time.Duration Results []*AlterUserScramCredentialsResult }
type AlterUserScramCredentialsUpsert ¶
type AlterUserScramCredentialsUpsert struct { Name string Mechanism ScramMechanismType Iterations int32 Salt []byte // This field is never transmitted over the wire // @see: https://tools.ietf.org/html/rfc5802 Password []byte // contains filtered or unexported fields }
type ApiVersionsRequest ¶
type ApiVersionsResponse ¶
type ApiVersionsResponse struct { // Version defines the protocol version to use for Encode and Decode Version int16 // ErrorCode contains the top-level error code. ErrorCode int16 // ApiKeys contains the APIs supported by the broker. ApiKeys []ApiVersionsResponseKey // ThrottleTimeMs contains the duration in milliseconds for which the Request was throttled due to a quota violation, or zero if the Request did not violate any quota. ThrottleTimeMs int32 }
type ApiVersionsResponseKey ¶
type ApiVersionsResponseKey struct { // Version defines the protocol version to use for Encode and Decode Version int16 // ApiKey contains the API index. ApiKey int16 // MinVersion contains the minimum supported version, inclusive. MinVersion int16 // MaxVersion contains the maximum supported version, inclusive. MaxVersion int16 }
ApiVersionsResponseKey contains the APIs supported by the broker.
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
type ByteEncoder ¶
type ByteEncoder []byte
ByteEncoder implements the Encoder interface for Go byte slices so that they can be used as the Key or Value in a ProducerMessage.
func (ByteEncoder) Encode ¶
func (b ByteEncoder) Encode() ([]byte, error)
func (ByteEncoder) Length ¶
func (b ByteEncoder) Length() int
type ClientQuotasOp ¶
type CompressionCodec ¶
type CompressionCodec int8
CompressionCodec represents the various compression codecs recognized by Kafka in messages.
const ( // CompressionNone no compression CompressionNone CompressionCodec = iota // CompressionGZIP compression using GZIP CompressionGZIP // CompressionSnappy compression using snappy CompressionSnappy // CompressionLZ4 compression using LZ4 CompressionLZ4 // CompressionZSTD compression using ZSTD CompressionZSTD // CompressionLevelDefault is the constant to use in CompressionLevel // to have the default compression level for any codec. The value is picked // that we don't use any existing compression levels. CompressionLevelDefault = -1000 )
func (CompressionCodec) MarshalText ¶
func (cc CompressionCodec) MarshalText() ([]byte, error)
MarshalText transforms a CompressionCodec into its string representation.
func (CompressionCodec) String ¶
func (cc CompressionCodec) String() string
func (*CompressionCodec) UnmarshalText ¶
func (cc *CompressionCodec) UnmarshalText(text []byte) error
UnmarshalText returns a CompressionCodec from its string representation.
type ConfigEntry ¶
type ConfigEntry struct { Name string Value string ReadOnly bool Default bool Source ConfigSource Sensitive bool Synonyms []*ConfigSynonym }
type ConfigResource ¶
type ConfigResource struct { Type ConfigResourceType Name string ConfigNames []string }
type ConfigResourceType ¶
type ConfigResourceType int8
ConfigResourceType is a type for resources that have configs.
const ( // TopicResource constant type TopicResource ConfigResourceType = 2 )
type ConfigSource ¶
type ConfigSource int8
const ( SourceUnknown ConfigSource = iota SourceTopic SourceDynamicBroker SourceDynamicDefaultBroker SourceStaticBroker SourceDefault )
func (ConfigSource) String ¶
func (s ConfigSource) String() string
type ConfigSynonym ¶
type ConfigSynonym struct { ConfigName string ConfigValue string Source ConfigSource }
type ConfigurationError ¶
type ConfigurationError string
ConfigurationError is the type of error returned from a constructor (e.g. NewClient, or NewConsumer) when the specified configuration is invalid.
func (ConfigurationError) Error ¶
func (err ConfigurationError) Error() string
type ConsumerGroupMemberAssignment ¶
type ConsumerGroupMemberAssignment struct { Version int16 Topics map[string][]int32 UserData []byte }
ConsumerGroupMemberAssignment holds the member assignment for a consumer group. See https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json
type ConsumerGroupMemberMetadata ¶
type ConsumerGroupMemberMetadata struct { Version int16 Topics []string UserData []byte OwnedPartitions []*OwnedPartition GenerationID int32 RackID *string }
ConsumerGroupMemberMetadata holds the metadata for a consumer group. See https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json
type ConsumerMetadataRequest ¶
ConsumerMetadataRequest is used for metadata requests
type ConsumerMetadataResponse ¶
type ConsumerMetadataResponse struct { Version int16 Err KError Coordinator *Broker CoordinatorID int32 // deprecated: use Coordinator.ID() CoordinatorHost string // deprecated: use Coordinator.Addr() CoordinatorPort int32 // deprecated: use Coordinator.Addr() }
ConsumerMetadataResponse holds the response for a consumer group metadata request.
type ControlRecord ¶
type ControlRecord struct { Version int16 CoordinatorEpoch int32 Type ControlRecordType }
Control records are returned as records by a FetchRequest. However, unlike "normal" Records, they mean nothing application-wise; they only serve internal logic for supporting transactions.
type ControlRecordType ¶
type ControlRecordType int
ControlRecordType identifies the kind of a control record: abort, commit, or unknown.
const ( // ControlRecordAbort is a control record for abort ControlRecordAbort ControlRecordType = iota // ControlRecordCommit is a control record for commit ControlRecordCommit // ControlRecordUnknown is a control record of unknown type ControlRecordUnknown )
type CoordinatorType ¶
type CoordinatorType int8
const ( CoordinatorGroup CoordinatorType = iota CoordinatorTransaction )
type CreateAclsRequest ¶
type CreateAclsRequest struct { Version int16 AclCreations []*AclCreation }
CreateAclsRequest is an acl creation Request
type CreateAclsResponse ¶
type CreateAclsResponse struct { Version int16 ThrottleTime time.Duration AclCreationResponses []*AclCreationResponse }
CreateAclsResponse is an acl creation response type
type CreatePartitionsRequest ¶
type CreatePartitionsResponse ¶
type CreatePartitionsResponse struct { Version int16 ThrottleTime time.Duration TopicPartitionErrors map[string]*TopicPartitionError }
type CreateTopicsRequest ¶
type CreateTopicsRequest struct { // Version defines the protocol version to use for Encode and Decode Version int16 // TopicDetails contains the topics to create. TopicDetails map[string]*TopicDetail // Timeout contains how long to wait before timing out the Request. Timeout time.Duration // ValidateOnly if true, check that the topics can be created as specified, // but don't create anything. ValidateOnly bool }
type CreateTopicsResponse ¶
type CreateTopicsResponse struct { // Version defines the protocol version to use for Encode and Decode Version int16 // ThrottleTime contains the duration for which the Request was throttled due // to a quota violation, or zero if the Request did not violate any quota. ThrottleTime time.Duration // TopicErrors contains a map of any errors for the topics we tried to create. TopicErrors map[string]*TopicError }
type DeleteAclsRequest ¶
DeleteAclsRequest is a delete acl Request
type DeleteAclsResponse ¶
type DeleteAclsResponse struct { Version int16 ThrottleTime time.Duration FilterResponses []*FilterResponse }
DeleteAclsResponse is a delete acl response
type DeleteGroupsRequest ¶
func (*DeleteGroupsRequest) AddGroup ¶
func (r *DeleteGroupsRequest) AddGroup(group string)
type DeleteGroupsResponse ¶
type DeleteOffsetsRequest ¶
type DeleteOffsetsRequest struct { Version int16 Group string // contains filtered or unexported fields }
func (*DeleteOffsetsRequest) AddPartition ¶
func (r *DeleteOffsetsRequest) AddPartition(topic string, partitionID int32)
type DeleteOffsetsResponse ¶
type DeleteRecordsRequest ¶
type DeleteRecordsRequest struct { Version int16 Topics map[string]*DeleteRecordsRequestTopic Timeout time.Duration }
type DeleteRecordsResponse ¶
type DeleteRecordsResponse struct { Version int16 ThrottleTime time.Duration Topics map[string]*DeleteRecordsResponseTopic }
type DeleteRecordsResponseTopic ¶
type DeleteRecordsResponseTopic struct {
Partitions map[int32]*DeleteRecordsResponsePartition
}
type DeleteTopicsRequest ¶
type DeleteTopicsResponse ¶
type DescribeAclsRequest ¶
DescribeAclsRequest is a describe acl Request type
type DescribeAclsResponse ¶
type DescribeAclsResponse struct { Version int16 ThrottleTime time.Duration Err KError ErrMsg *string ResourceAcls []*ResourceAcls }
DescribeAclsResponse is a describe acl response type
type DescribeClientQuotasEntry ¶
type DescribeClientQuotasEntry struct { Entity []QuotaEntityComponent // The quota entity description. Values map[string]float64 // The quota values for the entity. }
type DescribeClientQuotasRequest ¶
type DescribeClientQuotasRequest struct { Version int16 Components []QuotaFilterComponent Strict bool }
A filter to be applied to matching client quotas. Components: the components to filter on. Strict: whether the filter only includes specified components.
type DescribeClientQuotasResponse ¶
type DescribeClientQuotasResponse struct { Version int16 ThrottleTime time.Duration // The duration in milliseconds for which the Request was throttled due to a quota violation, or zero if the Request did not violate any quota. ErrorCode KError // The error code, or `0` if the quota description succeeded. ErrorMsg *string // The error message, or `null` if the quota description succeeded. Entries []DescribeClientQuotasEntry // A result entry. }
type DescribeConfigError ¶
func (*DescribeConfigError) Error ¶
func (c *DescribeConfigError) Error() string
type DescribeConfigsRequest ¶
type DescribeConfigsRequest struct { Version int16 Resources []*ConfigResource IncludeSynonyms bool }
type DescribeConfigsResponse ¶
type DescribeConfigsResponse struct { Version int16 ThrottleTime time.Duration Resources []*ResourceResponse }
type DescribeGroupsRequest ¶
type DescribeGroupsRequest struct { Version int16 Groups []string IncludeAuthorizedOperations bool }
func (*DescribeGroupsRequest) AddGroup ¶
func (r *DescribeGroupsRequest) AddGroup(group string)
type DescribeGroupsResponse ¶
type DescribeGroupsResponse struct { // Version defines the protocol version to use for Encode and Decode Version int16 // ThrottleTimeMs contains the duration in milliseconds for which the // Request was throttled due to a quota violation, or zero if the Request // did not violate any quota. ThrottleTimeMs int32 // Groups contains each described group. Groups []*GroupDescription }
type DescribeLogDirsRequest ¶
type DescribeLogDirsRequest struct { // Version 0 and 1 are equal // The version number is bumped to indicate that on quota violation brokers send out responses before throttling. Version int16 // If this is an empty array, all topics will be queried DescribeTopics []DescribeLogDirsRequestTopic }
DescribeLogDirsRequest is a describe Request to get partitions' log size
type DescribeLogDirsRequestTopic ¶
DescribeLogDirsRequestTopic is a describe Request about the log dir of one or more partitions within a Topic
type DescribeLogDirsResponse ¶
type DescribeLogDirsResponse struct { ThrottleTime time.Duration // Version 0 and 1 are equal // The version number is bumped to indicate that on quota violation brokers send out responses before throttling. Version int16 LogDirs []DescribeLogDirsResponseDirMetadata }
type DescribeLogDirsResponseDirMetadata ¶
type DescribeLogDirsResponseDirMetadata struct { ErrorCode KError // The absolute log directory path Path string Topics []DescribeLogDirsResponseTopic }
type DescribeLogDirsResponsePartition ¶
type DescribeLogDirsResponsePartition struct { PartitionID int32 // The size of the log segments of the partition in bytes. Size int64 // The lag of the log's LEO w.r.t. partition's HW (if it is the current log for the partition) or // current replica's LEO (if it is the future log for the partition) OffsetLag int64 // True if this log is created by AlterReplicaLogDirsRequest and will replace the current log of // the replica in the future. IsTemporary bool }
DescribeLogDirsResponsePartition describes a partition's log directory
type DescribeLogDirsResponseTopic ¶
type DescribeLogDirsResponseTopic struct { Topic string Partitions []DescribeLogDirsResponsePartition }
DescribeLogDirsResponseTopic contains a topic's partitions descriptions
type DescribeUserScramCredentialsRequest ¶
type DescribeUserScramCredentialsRequest struct { // Version 0 is currently only supported Version int16 // If this is an empty array, all users will be queried DescribeUsers []DescribeUserScramCredentialsRequestUser }
DescribeUserScramCredentialsRequest is a Request to get the list of SCRAM user names
type DescribeUserScramCredentialsRequestUser ¶
type DescribeUserScramCredentialsRequestUser struct {
Name string
}
DescribeUserScramCredentialsRequestUser is a describe Request about a specific user name
type DescribeUserScramCredentialsResult ¶
type DescribeUserScramCredentialsResult struct { User string ErrorCode KError ErrorMessage *string CredentialInfos []*UserScramCredentialsResponseInfo }
type Encoder ¶
Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message. Length() is provided as an optimization, and must return the same as len() on the result of Encode().
type EndTxnRequest ¶
type EndTxnResponse ¶
type FetchRequest ¶
type FetchRequest struct { // Version defines the protocol version to use for Encode and Decode Version int16 // ReplicaID contains the broker ID of the follower, of -1 if this Request // is from a consumer. // ReplicaID int32 // MaxWaitTime contains the maximum time in milliseconds to wait for the response. MaxWaitTime int32 // MinBytes contains the minimum bytes to accumulate in the response. MinBytes int32 // MaxBytes contains the maximum bytes to fetch. See KIP-74 for cases // where this limit may not be honored. MaxBytes int32 // Isolation contains a This setting controls the visibility of // transactional Records. Using READ_UNCOMMITTED (isolation_level = 0) // makes all Records visible. With READ_COMMITTED (isolation_level = 1), // non-transactional and COMMITTED transactional Records are visible. To be // more concrete, READ_COMMITTED returns all data from offsets smaller than // the current LSO (last stable offset), and enables the inclusion of the // list of aborted transactions in the result, which allows consumers to // discard ABORTED transactional Records Isolation IsolationLevel // SessionID contains the fetch session ID. SessionID int32 // SessionEpoch contains the epoch of the partition leader as known to the // follower replica or a consumer. SessionEpoch int32 // RackID contains a Rack ID of the consumer making this Request RackID string // contains filtered or unexported fields }
FetchRequest (API key 1) will fetch Kafka messages. Version 3 introduced the MaxBytes field. See https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
type FetchResponse ¶
type FetchResponse struct { // Version defines the protocol version to use for Encode and Decode Version int16 // ThrottleTime contains the duration in milliseconds for which the Request // was throttled due to a quota violation, or zero if the Request did not // violate any quota. ThrottleTime time.Duration // ErrorCode contains the top level response error code. ErrorCode int16 // SessionID contains the fetch session ID, or 0 if this is not part of a fetch session. SessionID int32 // Blocks contains the response topics. Blocks map[string]map[int32]*FetchResponseBlock LogAppendTime bool Timestamp time.Time }
func (*FetchResponse) AddControlRecord ¶
func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType)
func (*FetchResponse) AddControlRecordWithTimestamp ¶
func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time)
func (*FetchResponse) AddError ¶
func (r *FetchResponse) AddError(topic string, partition int32, err KError)
func (*FetchResponse) AddMessage ¶
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64)
func (*FetchResponse) AddMessageWithTimestamp ¶
func (*FetchResponse) AddRecord ¶
func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64)
func (*FetchResponse) AddRecordBatch ¶
func (*FetchResponse) AddRecordBatchWithTimestamp ¶
func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time)
AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp, but instead of appending 1 record to a batch, it appends a new batch containing 1 record to the FetchResponse. Since transactions are handled at the batch level (the whole batch is either committed or aborted), use this to test transactions.
func (*FetchResponse) AddRecordWithTimestamp ¶
func (*FetchResponse) GetBlock ¶
func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock
func (*FetchResponse) SetLastOffsetDelta ¶
func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32)
func (*FetchResponse) SetLastStableOffset ¶
func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64)
type FetchResponseBlock ¶
type FetchResponseBlock struct { // Err contains the error code, or 0 if there was no fetch error. Err KError // HighWatermarkOffset contains the current high water mark. HighWaterMarkOffset int64 // LastStableOffset contains the last stable offset (or LSO) of the // partition. This is the last offset such that the state of all // transactional Records prior to this offset have been decided (ABORTED or // COMMITTED) LastStableOffset int64 LastRecordsBatchOffset *int64 // LogStartOffset contains the current log start offset. LogStartOffset int64 // AbortedTransactions contains the aborted transactions. AbortedTransactions []*AbortedTransaction // PreferredReadReplica contains the preferred read replica for the // consumer to use on its next fetch Request PreferredReadReplica int32 // RecordsSet contains the record data. RecordsSet []*Records Partial bool Records *Records // deprecated: use FetchResponseBlock.RecordsSet }
type FilterResponse ¶
type FilterResponse struct { Err KError ErrMsg *string MatchingAcls []*MatchingAcl }
FilterResponse is a filter response type
type FindCoordinatorRequest ¶
type FindCoordinatorRequest struct { Version int16 CoordinatorKey string CoordinatorType CoordinatorType }
type FindCoordinatorResponse ¶
type GroupDescription ¶
type GroupDescription struct { // Version defines the protocol version to use for Encode and Decode Version int16 // Err contains the describe error as the KError type. Err KError // ErrorCode contains the describe error, or 0 if there was no error. ErrorCode int16 // GroupId contains the group ID string. GroupId string // State contains the group state string, or the empty string. State string // ProtocolType contains the group protocol type, or the empty string. ProtocolType string // Protocol contains the group protocol data, or the empty string. Protocol string // Members contains the group members. Members map[string]*GroupMemberDescription // AuthorizedOperations contains a 32-bit bitfield to represent authorized // operations for this group. AuthorizedOperations int32 }
GroupDescription contains each described group.
type GroupMember ¶
type GroupMemberDescription ¶
type GroupMemberDescription struct { // Version defines the protocol version to use for Encode and Decode Version int16 // MemberId contains the member ID assigned by the group coordinator. MemberId string // GroupInstanceId contains the unique identifier of the consumer instance // provided by end user. GroupInstanceId *string // ClientId contains the client ID used in the member's latest join group // Request. ClientId string // ClientHost contains the client host. ClientHost string // MemberMetadata contains the metadata corresponding to the current group // protocol in use. MemberMetadata []byte // MemberAssignment contains the current assignment provided by the group // leader. MemberAssignment []byte }
GroupMemberDescription contains the group members.
func (*GroupMemberDescription) GetMemberAssignment ¶
func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error)
func (*GroupMemberDescription) GetMemberMetadata ¶
func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error)
type GroupProtocol ¶
type HeartbeatRequest ¶
type HeartbeatResponse ¶
type IncrementalAlterConfigsEntry ¶
type IncrementalAlterConfigsEntry struct { Operation IncrementalAlterConfigsOperation Value *string }
type IncrementalAlterConfigsOperation ¶
type IncrementalAlterConfigsOperation int8
const ( IncrementalAlterConfigsOperationSet IncrementalAlterConfigsOperation = iota IncrementalAlterConfigsOperationDelete IncrementalAlterConfigsOperationAppend IncrementalAlterConfigsOperationSubtract )
type IncrementalAlterConfigsRequest ¶
type IncrementalAlterConfigsRequest struct { Version int16 Resources []*IncrementalAlterConfigsResource ValidateOnly bool }
IncrementalAlterConfigsRequest is an incremental alter config Request type
type IncrementalAlterConfigsResource ¶
type IncrementalAlterConfigsResource struct { Type ConfigResourceType Name string ConfigEntries map[string]IncrementalAlterConfigsEntry }
type IncrementalAlterConfigsResponse ¶
type IncrementalAlterConfigsResponse struct { Version int16 ThrottleTime time.Duration Resources []*AlterConfigsResourceResponse }
IncrementalAlterConfigsResponse is a response type for incremental alter config
type InitProducerIDRequest ¶
type InitProducerIDResponse ¶
type IsolationLevel ¶
type IsolationLevel int8
const ( ReadUncommitted IsolationLevel = iota ReadCommitted )
type JoinGroupRequest ¶
type JoinGroupRequest struct { // Version defines the protocol version to use for Encode and Decode Version int16 // GroupId contains the group identifier. GroupId string // SessionTimeout specifies that the coordinator should consider the consumer // dead if it receives no heartbeat after this timeout in milliseconds. SessionTimeout int32 // RebalanceTimeout contains the maximum time in milliseconds that the // coordinator will wait for each member to rejoin when rebalancing the // group. RebalanceTimeout int32 // MemberId contains the member id assigned by the group coordinator. MemberId string // GroupInstanceId contains the unique identifier of the consumer instance // provided by end user. GroupInstanceId *string // ProtocolType contains the unique name the for class of protocols // implemented by the group we want to join. ProtocolType string // GroupProtocols contains the list of protocols that the member supports. // deprecated; use OrderedGroupProtocols GroupProtocols map[string][]byte // OrderedGroupProtocols contains an ordered list of protocols that the member // supports. OrderedGroupProtocols []*GroupProtocol }
func (*JoinGroupRequest) AddGroupProtocol ¶
func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte)
func (*JoinGroupRequest) AddGroupProtocolMetadata ¶
func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error
type JoinGroupResponse ¶
type JoinGroupResponse struct { // Version defines the protocol version to use for Encode and Decode Version int16 // ThrottleTime contains the duration for which the Request was throttled due // to a quota violation, or zero if the Request did not violate any quota. ThrottleTime int32 // Err contains the error code, or 0 if there was no error. Err KError // GenerationId contains the generation ID of the group. GenerationId int32 // GroupProtocol contains the group protocol selected by the coordinator. GroupProtocol string // LeaderId contains the leader of the group. LeaderId string // MemberId contains the member ID assigned by the group coordinator. MemberId string // Members contains the per-group-member information. Members []GroupMember }
func (*JoinGroupResponse) GetMembers ¶
func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error)
type KError ¶
type KError int16
KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
const ( ErrUnknown KError = -1 // Errors.UNKNOWN_SERVER_ERROR ErrNoError KError = 0 // Errors.NONE ErrOffsetOutOfRange KError = 1 // Errors.OFFSET_OUT_OF_RANGE ErrInvalidMessage KError = 2 // Errors.CORRUPT_MESSAGE ErrUnknownTopicOrPartition KError = 3 // Errors.UNKNOWN_TOPIC_OR_PARTITION ErrInvalidMessageSize KError = 4 // Errors.INVALID_FETCH_SIZE ErrLeaderNotAvailable KError = 5 // Errors.LEADER_NOT_AVAILABLE ErrNotLeaderForPartition KError = 6 // Errors.NOT_LEADER_OR_FOLLOWER ErrRequestTimedOut KError = 7 // Errors.REQUEST_TIMED_OUT ErrBrokerNotAvailable KError = 8 // Errors.BROKER_NOT_AVAILABLE ErrReplicaNotAvailable KError = 9 // Errors.REPLICA_NOT_AVAILABLE ErrMessageSizeTooLarge KError = 10 // Errors.MESSAGE_TOO_LARGE ErrStaleControllerEpochCode KError = 11 // Errors.STALE_CONTROLLER_EPOCH ErrOffsetMetadataTooLarge KError = 12 // Errors.OFFSET_METADATA_TOO_LARGE ErrNetworkException KError = 13 // Errors.NETWORK_EXCEPTION ErrOffsetsLoadInProgress KError = 14 // Errors.COORDINATOR_LOAD_IN_PROGRESS ErrConsumerCoordinatorNotAvailable KError = 15 // Errors.COORDINATOR_NOT_AVAILABLE ErrNotCoordinatorForConsumer KError = 16 // Errors.NOT_COORDINATOR ErrInvalidTopic KError = 17 // Errors.INVALID_TOPIC_EXCEPTION ErrMessageSetSizeTooLarge KError = 18 // Errors.RECORD_LIST_TOO_LARGE ErrNotEnoughReplicas KError = 19 // Errors.NOT_ENOUGH_REPLICAS ErrNotEnoughReplicasAfterAppend KError = 20 // Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND ErrInvalidRequiredAcks KError = 21 // Errors.INVALID_REQUIRED_ACKS ErrIllegalGeneration KError = 22 // Errors.ILLEGAL_GENERATION ErrInconsistentGroupProtocol KError = 23 // Errors.INCONSISTENT_GROUP_PROTOCOL ErrInvalidGroupId KError = 24 // Errors.INVALID_GROUP_ID ErrUnknownMemberId KError = 25 // Errors.UNKNOWN_MEMBER_ID ErrInvalidSessionTimeout KError = 26 // Errors.INVALID_SESSION_TIMEOUT ErrRebalanceInProgress KError = 27 // Errors.REBALANCE_IN_PROGRESS ErrInvalidCommitOffsetSize KError = 28 // Errors.INVALID_COMMIT_OFFSET_SIZE 
ErrTopicAuthorizationFailed KError = 29 // Errors.TOPIC_AUTHORIZATION_FAILED ErrGroupAuthorizationFailed KError = 30 // Errors.GROUP_AUTHORIZATION_FAILED ErrClusterAuthorizationFailed KError = 31 // Errors.CLUSTER_AUTHORIZATION_FAILED ErrInvalidTimestamp KError = 32 // Errors.INVALID_TIMESTAMP ErrUnsupportedSASLMechanism KError = 33 // Errors.UNSUPPORTED_SASL_MECHANISM ErrIllegalSASLState KError = 34 // Errors.ILLEGAL_SASL_STATE ErrUnsupportedVersion KError = 35 // Errors.UNSUPPORTED_VERSION ErrTopicAlreadyExists KError = 36 // Errors.TOPIC_ALREADY_EXISTS ErrInvalidPartitions KError = 37 // Errors.INVALID_PARTITIONS ErrInvalidReplicationFactor KError = 38 // Errors.INVALID_REPLICATION_FACTOR ErrInvalidReplicaAssignment KError = 39 // Errors.INVALID_REPLICA_ASSIGNMENT ErrInvalidConfig KError = 40 // Errors.INVALID_CONFIG ErrNotController KError = 41 // Errors.NOT_CONTROLLER ErrInvalidRequest KError = 42 // Errors.INVALID_REQUEST ErrUnsupportedForMessageFormat KError = 43 // Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT ErrPolicyViolation KError = 44 // Errors.POLICY_VIOLATION ErrOutOfOrderSequenceNumber KError = 45 // Errors.OUT_OF_ORDER_SEQUENCE_NUMBER ErrDuplicateSequenceNumber KError = 46 // Errors.DUPLICATE_SEQUENCE_NUMBER ErrInvalidProducerEpoch KError = 47 // Errors.INVALID_PRODUCER_EPOCH ErrInvalidTxnState KError = 48 // Errors.INVALID_TXN_STATE ErrInvalidProducerIDMapping KError = 49 // Errors.INVALID_PRODUCER_ID_MAPPING ErrInvalidTransactionTimeout KError = 50 // Errors.INVALID_TRANSACTION_TIMEOUT ErrConcurrentTransactions KError = 51 // Errors.CONCURRENT_TRANSACTIONS ErrTransactionCoordinatorFenced KError = 52 // Errors.TRANSACTION_COORDINATOR_FENCED ErrTransactionalIDAuthorizationFailed KError = 53 // Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ErrSecurityDisabled KError = 54 // Errors.SECURITY_DISABLED ErrOperationNotAttempted KError = 55 // Errors.OPERATION_NOT_ATTEMPTED ErrKafkaStorageError KError = 56 // Errors.KAFKA_STORAGE_ERROR ErrLogDirNotFound KError = 
57 // Errors.LOG_DIR_NOT_FOUND ErrSASLAuthenticationFailed KError = 58 // Errors.SASL_AUTHENTICATION_FAILED ErrUnknownProducerID KError = 59 // Errors.UNKNOWN_PRODUCER_ID ErrReassignmentInProgress KError = 60 // Errors.REASSIGNMENT_IN_PROGRESS ErrDelegationTokenAuthDisabled KError = 61 // Errors.DELEGATION_TOKEN_AUTH_DISABLED ErrDelegationTokenNotFound KError = 62 // Errors.DELEGATION_TOKEN_NOT_FOUND ErrDelegationTokenOwnerMismatch KError = 63 // Errors.DELEGATION_TOKEN_OWNER_MISMATCH ErrDelegationTokenRequestNotAllowed KError = 64 // Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED ErrDelegationTokenAuthorizationFailed KError = 65 // Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED ErrDelegationTokenExpired KError = 66 // Errors.DELEGATION_TOKEN_EXPIRED ErrInvalidPrincipalType KError = 67 // Errors.INVALID_PRINCIPAL_TYPE ErrNonEmptyGroup KError = 68 // Errors.NON_EMPTY_GROUP ErrGroupIDNotFound KError = 69 // Errors.GROUP_ID_NOT_FOUND ErrFetchSessionIDNotFound KError = 70 // Errors.FETCH_SESSION_ID_NOT_FOUND ErrInvalidFetchSessionEpoch KError = 71 // Errors.INVALID_FETCH_SESSION_EPOCH ErrListenerNotFound KError = 72 // Errors.LISTENER_NOT_FOUND ErrTopicDeletionDisabled KError = 73 // Errors.TOPIC_DELETION_DISABLED ErrFencedLeaderEpoch KError = 74 // Errors.FENCED_LEADER_EPOCH ErrUnknownLeaderEpoch KError = 75 // Errors.UNKNOWN_LEADER_EPOCH ErrUnsupportedCompressionType KError = 76 // Errors.UNSUPPORTED_COMPRESSION_TYPE ErrStaleBrokerEpoch KError = 77 // Errors.STALE_BROKER_EPOCH ErrOffsetNotAvailable KError = 78 // Errors.OFFSET_NOT_AVAILABLE ErrMemberIdRequired KError = 79 // Errors.MEMBER_ID_REQUIRED ErrPreferredLeaderNotAvailable KError = 80 // Errors.PREFERRED_LEADER_NOT_AVAILABLE ErrGroupMaxSizeReached KError = 81 // Errors.GROUP_MAX_SIZE_REACHED ErrFencedInstancedId KError = 82 // Errors.FENCED_INSTANCE_ID ErrEligibleLeadersNotAvailable KError = 83 // Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE ErrElectionNotNeeded KError = 84 // Errors.ELECTION_NOT_NEEDED 
ErrNoReassignmentInProgress KError = 85 // Errors.NO_REASSIGNMENT_IN_PROGRESS ErrGroupSubscribedToTopic KError = 86 // Errors.GROUP_SUBSCRIBED_TO_TOPIC ErrInvalidRecord KError = 87 // Errors.INVALID_RECORD ErrUnstableOffsetCommit KError = 88 // Errors.UNSTABLE_OFFSET_COMMIT )
Numeric error codes returned by the Kafka server.
type KafkaVersion ¶
type KafkaVersion struct {
// contains filtered or unexported fields
}
KafkaVersion instances represent versions of the upstream Kafka broker.
func ParseKafkaVersion ¶
func ParseKafkaVersion(s string) (KafkaVersion, error)
ParseKafkaVersion parses a string and returns the corresponding Kafka version, or an error.
func (KafkaVersion) IsAtLeast ¶
func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool
IsAtLeast returns true if and only if the version it is called on is greater than or equal to the version passed in:
V1.IsAtLeast(V2) // false V2.IsAtLeast(V1) // true
func (KafkaVersion) String ¶
func (v KafkaVersion) String() string
type LeaveGroupRequest ¶
type LeaveGroupRequest struct { Version int16 GroupId string MemberId string // Removed in Version 3 Members []MemberIdentity // Added in Version 3 }
type LeaveGroupResponse ¶
type LeaveGroupResponse struct { Version int16 ThrottleTime int32 Err KError Members []MemberResponse }
type ListGroupsRequest ¶
type ListGroupsResponse ¶
type ListPartitionReassignmentsRequest ¶
type ListPartitionReassignmentsRequest struct { TimeoutMs int32 Version int16 // contains filtered or unexported fields }
func (*ListPartitionReassignmentsRequest) AddBlock ¶
func (r *ListPartitionReassignmentsRequest) AddBlock(topic string, partitionIDs []int32)
type ListPartitionReassignmentsResponse ¶
type MatchingAcl ¶
MatchingAcl is a matching acl type
type MemberIdentity ¶
type MemberResponse ¶
type Message ¶
type Message struct { Codec CompressionCodec // codec used to compress the message contents CompressionLevel int // compression level LogAppendTime bool // the used timestamp is LogAppendTime Key []byte // the message key, may be nil Value []byte // the message contents Set *MessageSet // the message set a message might wrap Version int8 // v1 requires Kafka 0.10 Timestamp time.Time // the timestamp of the message (version 1+ only) // contains filtered or unexported fields }
Message is a kafka message type
type MessageBlock ¶
func (*MessageBlock) Messages ¶
func (msb *MessageBlock) Messages() []*MessageBlock
Messages convenience helper which returns either all the messages that are wrapped in this block
type MessageSet ¶
type MessageSet struct { PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock OverflowMessage bool // whether the set on the wire contained an overflow message Messages []*MessageBlock }
type MetadataRequest ¶
type MetadataRequest struct { // Version defines the protocol version to use for Encode and Decode Version int16 // Topics contains the topics to fetch metadata for. Topics []string // AllowAutoTopicCreation contains a If this is true, the broker may auto-create topics that we requested which do not already exist, if it is configured to do so. AllowAutoTopicCreation bool IncludeClusterAuthorizedOperations bool // version 8 and up IncludeTopicAuthorizedOperations bool // version 8 and up }
type MetadataResponse ¶
type MetadataResponse struct { // Version defines the protocol version to use for Encode and Decode Version int16 // ThrottleTimeMs contains the duration in milliseconds for which the Request was throttled due to a quota violation, or zero if the Request did not violate any quota. ThrottleTimeMs int32 // Brokers contains each broker in the response. Brokers []*Broker // ClusterID contains the cluster ID that responding broker belongs to. ClusterID *string // ControllerID contains the ID of the controller broker. ControllerID int32 // Topics contains each topic in the response. Topics []*TopicMetadata ClusterAuthorizedOperations int32 // Only valid for Version >= 8 }
func (*MetadataResponse) AddBroker ¶
func (r *MetadataResponse) AddBroker(addr string, id int32)
func (*MetadataResponse) AddTopic ¶
func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata
func (*MetadataResponse) AddTopicPartition ¶
type OffsetCommitRequest ¶
type OffsetCommitRequest struct { ConsumerGroup string ConsumerGroupGeneration int32 // v1 or later ConsumerID string // v1 or later GroupInstanceId *string // v7 or later RetentionTime int64 // v2 or later // Version can be: // - 0 (kafka 0.8.1 and later) // - 1 (kafka 0.8.2 and later) // - 2 (kafka 0.9.0 and later) // - 3 (kafka 0.11.0 and later) // - 4 (kafka 2.0.0 and later) // - 5&6 (kafka 2.1.0 and later) // - 7 (kafka 2.3.0 and later) Version int16 // contains filtered or unexported fields }
func (*OffsetCommitRequest) AddBlockWithLeaderEpoch ¶
type OffsetCommitResponse ¶
type OffsetFetchRequest ¶
type OffsetFetchRequest struct { Version int16 ConsumerGroup string RequireStable bool // requires v7+ // contains filtered or unexported fields }
func (*OffsetFetchRequest) AddPartition ¶
func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32)
func (*OffsetFetchRequest) ZeroPartitions ¶
func (r *OffsetFetchRequest) ZeroPartitions()
type OffsetFetchResponse ¶
type OffsetFetchResponse struct { Version int16 ThrottleTimeMs int32 Blocks map[string]map[int32]*OffsetFetchResponseBlock Err KError }
func (*OffsetFetchResponse) AddBlock ¶
func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock)
func (*OffsetFetchResponse) GetBlock ¶
func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock
type OffsetRequest ¶
type OffsetRequest struct { Version int16 IsolationLevel IsolationLevel // contains filtered or unexported fields }
func (*OffsetRequest) AddBlock ¶
func (r *OffsetRequest) AddBlock(topic string, partitionID int32, timestamp int64, maxOffsets int32)
func (*OffsetRequest) ReplicaID ¶
func (r *OffsetRequest) ReplicaID() int32
func (*OffsetRequest) SetReplicaID ¶
func (r *OffsetRequest) SetReplicaID(id int32)
type OffsetResponse ¶
type OffsetResponse struct { Version int16 ThrottleTimeMs int32 Blocks map[string]map[int32]*OffsetResponseBlock }
func (*OffsetResponse) AddTopicPartition ¶
func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64)
func (*OffsetResponse) GetBlock ¶
func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock
type OffsetResponseBlock ¶
type OffsetResponseBlock struct { Err KError // Offsets contains the result offsets (for V0/V1 compatibility) Offsets []int64 // Version 0 // Timestamp contains the timestamp associated with the returned offset. Timestamp int64 // Version 1 // Offset contains the returned offset. Offset int64 // Version 1 // LeaderEpoch contains the current leader epoch of the partition. LeaderEpoch int32 }
type OwnedPartition ¶
type PacketDecodingError ¶
type PacketDecodingError struct {
Info string
}
PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.
func (PacketDecodingError) Error ¶
func (err PacketDecodingError) Error() string
type PacketEncodingError ¶
type PacketEncodingError struct {
Info string
}
PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to Encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
func (PacketEncodingError) Error ¶
func (err PacketEncodingError) Error() string
type PartitionError ¶
PartitionError is a partition error type
type PartitionMetadata ¶
type PartitionMetadata struct { // Version defines the protocol version to use for Encode and Decode Version int16 // Err contains the partition error, or 0 if there was no error. Err KError // ID contains the partition index. ID int32 // Leader contains the ID of the leader broker. Leader int32 // LeaderEpoch contains the leader epoch of this partition. LeaderEpoch int32 // Replicas contains the set of all nodes that host this partition. Replicas []int32 // Isr contains the set of nodes that are in sync with the leader for this partition. Isr []int32 // OfflineReplicas contains the set of offline replicas of this partition. OfflineReplicas []int32 }
PartitionMetadata contains each partition in the topic.
type PartitionOffsetMetadata ¶
type PartitionOffsetMetadata struct { // Partition contains the index of the partition within the topic. Partition int32 // Offset contains the message offset to be committed. Offset int64 // LeaderEpoch contains the leader epoch of the last consumed record. LeaderEpoch int32 // Metadata contains any associated metadata the client wants to keep. Metadata *string }
type ProduceRequest ¶
type ProduceRequest struct { TransactionalID *string RequiredAcks RequiredAcks Timeout int32 Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10, v3 requires Kafka 0.11 Records map[string]map[int32]Records }
func (*ProduceRequest) AddBatch ¶
func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch)
func (*ProduceRequest) AddMessage ¶
func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message)
func (*ProduceRequest) AddSet ¶
func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet)
type ProduceResponse ¶
type ProduceResponse struct { Blocks map[string]map[int32]*ProduceResponseBlock // v0, responses Version int16 ThrottleTime time.Duration // v1, throttle_time_ms }
func (*ProduceResponse) AddTopicPartition ¶
func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError)
func (*ProduceResponse) GetBlock ¶
func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock
type ProduceResponseBlock ¶
type ProduceResponseBlock struct { Err KError // v0, error_code Offset int64 // v0, base_offset Timestamp time.Time // v2, log_append_time, and the broker is configured with `LogAppendTime` StartOffset int64 // v5, log_start_offset }
ProduceResponseBlock corresponds to an entry of partition_responses in the Kafka protocol.
type QuotaEntityComponent ¶
type QuotaEntityComponent struct { EntityType QuotaEntityType MatchType QuotaMatchType Name string }
type QuotaEntityType ¶
type QuotaEntityType string
const ( QuotaEntityUser QuotaEntityType = "user" QuotaEntityClientID QuotaEntityType = "client-id" )
type QuotaFilterComponent ¶
type QuotaFilterComponent struct { EntityType QuotaEntityType MatchType QuotaMatchType Match string }
QuotaFilterComponent describes a component for applying a client quota filter. EntityType: the entity type the filter component applies to ("user", "client-id", "ip"). MatchType: the match type of the filter component (any, exact, default). Match: the name that's matched exactly (used when MatchType is QuotaMatchExact).
type QuotaMatchType ¶
type QuotaMatchType int
const ( QuotaMatchExact QuotaMatchType = iota QuotaMatchDefault QuotaMatchAny )
type Record ¶
type Record struct { Headers []*RecordHeader Attributes int8 TimestampDelta time.Duration OffsetDelta int64 Key []byte Value []byte // contains filtered or unexported fields }
Record is a Kafka record type.
type RecordBatch ¶
type RecordBatch struct { FirstOffset int64 PartitionLeaderEpoch int32 Version int8 Codec CompressionCodec CompressionLevel int Control bool LogAppendTime bool LastOffsetDelta int32 FirstTimestamp time.Time MaxTimestamp time.Time ProducerID int64 ProducerEpoch int16 FirstSequence int32 Records []*Record PartialTrailingRecord bool IsTransactional bool // contains filtered or unexported fields }
func (*RecordBatch) LastOffset ¶
func (b *RecordBatch) LastOffset() int64
type RecordHeader ¶
RecordHeader stores key and value for a record header
type Records ¶
type Records struct { MsgSet *MessageSet RecordBatch *RecordBatch // contains filtered or unexported fields }
Records implements a union type containing either a RecordBatch or a legacy MessageSet.
type RequiredAcks ¶
type RequiredAcks int16
RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many acknowledgements), but in 0.8.2.0 and later this will raise an exception (it has been replaced by setting the `min.insync.replicas` value, i.e. the minimum in-sync replica count, in the broker's configuration).
type Resource ¶
type Resource struct { ResourceType AclResourceType ResourceName string ResourcePatternType AclResourcePatternType }
Resource holds information about an ACL resource: its type, name, and pattern type.
type ResourceAcls ¶
ResourceAcls is an acl resource type
type ResourceResponse ¶
type ResourceResponse struct { ErrorCode int16 ErrorMsg string Type ConfigResourceType Name string Configs []*ConfigEntry }
type SaslAuthenticateRequest ¶
type SaslHandshakeRequest ¶
type SaslHandshakeResponse ¶
type ScramMechanismType ¶
type ScramMechanismType int8
func (ScramMechanismType) String ¶
func (s ScramMechanismType) String() string
type StdLogger ¶
type StdLogger interface { Print(v ...interface{}) Printf(format string, v ...interface{}) Println(v ...interface{}) }
StdLogger is used to log error messages.
var DebugLogger StdLogger = &debugLogger{}
DebugLogger is the instance of a StdLogger to which this library writes more verbose debug information. By default it is set to redirect all debug output to the default Logger, but you can optionally set it to another StdLogger instance to, for example, discard debug information.
type StringEncoder ¶
type StringEncoder string
StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.
func (StringEncoder) Encode ¶
func (s StringEncoder) Encode() ([]byte, error)
func (StringEncoder) Length ¶
func (s StringEncoder) Length() int
type SyncGroupRequest ¶
type SyncGroupRequest struct { // Version defines the protocol version to use for Encode and Decode Version int16 // GroupId contains the unique group identifier. GroupId string // GenerationId contains the generation of the group. GenerationId int32 // MemberId contains the member ID assigned by the group. MemberId string // GroupInstanceId contains the unique identifier of the consumer instance provided by end user. GroupInstanceId *string // GroupAssignments contains each assignment. GroupAssignments []SyncGroupRequestAssignment }
func (*SyncGroupRequest) AddGroupAssignment ¶
func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte)
func (*SyncGroupRequest) AddGroupAssignmentMember ¶
func (r *SyncGroupRequest) AddGroupAssignmentMember( memberId string, memberAssignment *ConsumerGroupMemberAssignment, ) error
type SyncGroupResponse ¶
type SyncGroupResponse struct { // Version defines the protocol version to use for Encode and Decode Version int16 // ThrottleTime contains the duration in milliseconds for which the // Request was throttled due to a quota violation, or zero if the Request // did not violate any quota. ThrottleTime int32 // Err contains the error code, or 0 if there was no error. Err KError // MemberAssignment contains the member assignment. MemberAssignment []byte }
func (*SyncGroupResponse) GetMemberAssignment ¶
func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error)
type TopicDetail ¶
type TopicDetail struct { // NumPartitions contains the number of partitions to create in the topic, or // -1 if we are either specifying a manual partition assignment or using the // default partitions. NumPartitions int32 // ReplicationFactor contains the number of replicas to create for each // partition in the topic, or -1 if we are either specifying a manual // partition assignment or using the default replication factor. ReplicationFactor int16 // ReplicaAssignment contains the manual partition assignment, or the empty // array if we are using automatic assignment. ReplicaAssignment map[int32][]int32 // ConfigEntries contains the custom topic configurations to set. ConfigEntries map[string]*string }
type TopicError ¶
func (*TopicError) Error ¶
func (t *TopicError) Error() string
func (*TopicError) Unwrap ¶
func (t *TopicError) Unwrap() error
type TopicMetadata ¶
type TopicMetadata struct { // Version defines the protocol version to use for Encode and Decode Version int16 // Err contains the topic error, or 0 if there was no error. Err KError // Name contains the topic name. Name string Uuid Uuid // IsInternal contains a True if the topic is internal. IsInternal bool // Partitions contains each partition in the topic. Partitions []*PartitionMetadata TopicAuthorizedOperations int32 // Only valid for Version >= 8 }
TopicMetadata contains each topic in the response.
type TopicPartition ¶
type TopicPartitionError ¶
func (*TopicPartitionError) Error ¶
func (t *TopicPartitionError) Error() string
func (*TopicPartitionError) Unwrap ¶
func (t *TopicPartitionError) Unwrap() error
type TxnOffsetCommitRequest ¶
type TxnOffsetCommitResponse ¶
type TxnOffsetCommitResponse struct { Version int16 ThrottleTime time.Duration Topics map[string][]*PartitionError }
type UserScramCredentialsResponseInfo ¶
type UserScramCredentialsResponseInfo struct { Mechanism ScramMechanismType Iterations int32 }
type ZstdDecoderParams ¶
type ZstdDecoderParams struct { }
type ZstdEncoderParams ¶
type ZstdEncoderParams struct {
Level int
}
Source Files
¶
- acl_bindings.go
- acl_create_request.go
- acl_create_response.go
- acl_delete_request.go
- acl_delete_response.go
- acl_describe_request.go
- acl_describe_response.go
- acl_filter.go
- acl_types.go
- add_offsets_to_txn_request.go
- add_offsets_to_txn_response.go
- add_partitions_to_txn_request.go
- add_partitions_to_txn_response.go
- alter_client_quotas_request.go
- alter_client_quotas_response.go
- alter_configs_request.go
- alter_configs_response.go
- alter_partition_reassignments_request.go
- alter_partition_reassignments_response.go
- alter_user_scram_credentials_request.go
- alter_user_scram_credentials_response.go
- api_versions_request.go
- api_versions_response.go
- broker.go
- compress.go
- config_resource_type.go
- consumer_group_members.go
- consumer_metadata_request.go
- consumer_metadata_response.go
- control_record.go
- crc32_field.go
- create_partitions_request.go
- create_partitions_response.go
- create_topics_request.go
- create_topics_response.go
- decompress.go
- delete_groups_request.go
- delete_groups_response.go
- delete_offsets_request.go
- delete_offsets_response.go
- delete_records_request.go
- delete_records_response.go
- delete_topics_request.go
- delete_topics_response.go
- describe_client_quotas_request.go
- describe_client_quotas_response.go
- describe_configs_request.go
- describe_configs_response.go
- describe_groups_request.go
- describe_groups_response.go
- describe_log_dirs_request.go
- describe_log_dirs_response.go
- describe_user_scram_credentials_request.go
- describe_user_scram_credentials_response.go
- encoder_decoder.go
- end_txn_request.go
- end_txn_response.go
- errors.go
- fetch_request.go
- fetch_response.go
- find_coordinator_request.go
- find_coordinator_response.go
- gofka.go
- heartbeat_request.go
- heartbeat_response.go
- incremental_alter_configs_request.go
- incremental_alter_configs_response.go
- init_producer_id_request.go
- init_producer_id_response.go
- join_group_request.go
- join_group_response.go
- leave_group_request.go
- leave_group_response.go
- length_field.go
- list_groups_request.go
- list_groups_response.go
- list_partition_reassignments_request.go
- list_partition_reassignments_response.go
- message.go
- message_set.go
- metadata_request.go
- metadata_response.go
- offset_commit_request.go
- offset_commit_response.go
- offset_fetch_request.go
- offset_fetch_response.go
- offset_request.go
- offset_response.go
- packet_decoder.go
- packet_encoder.go
- prep_encoder.go
- produce_request.go
- produce_response.go
- quota_types.go
- real_decoder.go
- real_encoder.go
- record.go
- record_batch.go
- records.go
- request.go
- response_header.go
- sasl_authenticate_request.go
- sasl_authenticate_response.go
- sasl_handshake_request.go
- sasl_handshake_response.go
- scram_formatter.go
- sync_group_request.go
- sync_group_response.go
- timestamp.go
- txn_offset_commit_request.go
- txn_offset_commit_response.go
- utils.go
- zstd.go