protocol

package
v0.0.0-...-16c1efd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 2, 2024 License: MIT Imports: 24 Imported by: 0

Documentation

Overview

Package protocol is a pure Go library for dealing with Apache Kafka (versions 0.8 and later). It includes a low-level API for controlling bytes on the wire and the ability to encode and decode both requests and responses.

Example (EncodeAndDecodeRequest)

The following example shows how to use the package to encode and subsequently decode a Request.

package main

import (
	"bytes"
	"fmt"

	"github.com/MadsRC/gofka/protocol"
)

func main() {
	request := protocol.Request{
		CorrelationID: 123,
		ClientID:      "anID",
		Body: &protocol.ApiVersionsRequest{
			Version:               3,
			ClientSoftwareName:    "gofka",
			ClientSoftwareVersion: "0.10.0",
		},
	}

	packet, err := protocol.Encode(&request)
	if err != nil {
		panic(err)
	}
	fmt.Printf("On-wire request: %v\n", packet)

	decoded, _, err := protocol.DecodeRequest(bytes.NewReader(packet))
	if err != nil {
		panic(err)
	}

	body := decoded.Body.(*protocol.ApiVersionsRequest)
	fmt.Printf("Correlation ID: %d, Client ID: %s\n", decoded.CorrelationID, decoded.ClientID)
	fmt.Printf("Body: Version %d, ClientSoftwareName %s, ClientSoftwareVersion %s\n", body.Version, body.ClientSoftwareName, body.ClientSoftwareVersion)
}
Output:

On-wire request: [0 0 0 29 0 18 0 3 0 0 0 123 0 4 97 110 73 68 0 6 103 111 102 107 97 7 48 46 49 48 46 48 0]
Correlation ID: 123, Client ID: anID
Body: Version 3, ClientSoftwareName gofka, ClientSoftwareVersion 0.10.0

Index

Examples

Constants

View Source
const (
	// SASLTypeSCRAMSHA256 represents the SCRAM-SHA-256 mechanism.
	SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
	// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
	SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
)
View Source
const (
	SCRAM_MECHANISM_UNKNOWN = iota // 0
	SCRAM_MECHANISM_SHA_256        // 1
	SCRAM_MECHANISM_SHA_512        // 2
)
View Source
const APIKeySASLAuth = 36

APIKeySASLAuth is the API key for the SaslAuthenticate Kafka API

View Source
const ReceiveTime int64 = -1

ReceiveTime is a special value for the timestamp field of Offset Commit Requests which tells the broker to set the timestamp to the time at which the Request was received. The timestamp is only used if message version 1 is used, which requires kafka 0.8.2.

Variables

View Source
var (
	// Logger is the instance of a StdLogger interface that Sarama writes connection
	// management events to. By default it is set to discard all log messages via io.Discard,
	// but you can set it to redirect wherever you want.
	Logger StdLogger = log.New(io.Discard, "[Sarama] ", log.LstdFlags)

	// MaxRequestSize is the maximum size (in bytes) of any Request that Sarama will attempt to send. Trying
	// to send a Request larger than this will result in an PacketEncodingError. The default of 100 MiB is aligned
	// with Kafka's default `socket.Request.max.bytes`, which is the largest Request the broker will attempt
	// to process.
	MaxRequestSize int32 = 100 * 1024 * 1024

	// MaxResponseSize is the maximum size (in bytes) of any response that Sarama will attempt to parse. If
	// a broker returns a response message larger than this value, Sarama will return a PacketDecodingError to
	// protect the client from running out of memory. Please note that brokers do not have any natural limit on
	// the size of responses they send. In particular, they can send arbitrarily large fetch responses to consumers
	// (see https://issues.apache.org/jira/browse/KAFKA-2063).
	MaxResponseSize int32 = 100 * 1024 * 1024
)
View Source
var (
	V0_8_2_0  = newKafkaVersion(0, 8, 2, 0)
	V0_8_2_1  = newKafkaVersion(0, 8, 2, 1)
	V0_8_2_2  = newKafkaVersion(0, 8, 2, 2)
	V0_9_0_0  = newKafkaVersion(0, 9, 0, 0)
	V0_9_0_1  = newKafkaVersion(0, 9, 0, 1)
	V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
	V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
	V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
	V0_10_1_1 = newKafkaVersion(0, 10, 1, 1)
	V0_10_2_0 = newKafkaVersion(0, 10, 2, 0)
	V0_10_2_1 = newKafkaVersion(0, 10, 2, 1)
	V0_10_2_2 = newKafkaVersion(0, 10, 2, 2)
	V0_11_0_0 = newKafkaVersion(0, 11, 0, 0)
	V0_11_0_1 = newKafkaVersion(0, 11, 0, 1)
	V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
	V1_0_0_0  = newKafkaVersion(1, 0, 0, 0)
	V1_0_1_0  = newKafkaVersion(1, 0, 1, 0)
	V1_0_2_0  = newKafkaVersion(1, 0, 2, 0)
	V1_1_0_0  = newKafkaVersion(1, 1, 0, 0)
	V1_1_1_0  = newKafkaVersion(1, 1, 1, 0)
	V2_0_0_0  = newKafkaVersion(2, 0, 0, 0)
	V2_0_1_0  = newKafkaVersion(2, 0, 1, 0)
	V2_1_0_0  = newKafkaVersion(2, 1, 0, 0)
	V2_1_1_0  = newKafkaVersion(2, 1, 1, 0)
	V2_2_0_0  = newKafkaVersion(2, 2, 0, 0)
	V2_2_1_0  = newKafkaVersion(2, 2, 1, 0)
	V2_2_2_0  = newKafkaVersion(2, 2, 2, 0)
	V2_3_0_0  = newKafkaVersion(2, 3, 0, 0)
	V2_3_1_0  = newKafkaVersion(2, 3, 1, 0)
	V2_4_0_0  = newKafkaVersion(2, 4, 0, 0)
	V2_4_1_0  = newKafkaVersion(2, 4, 1, 0)
	V2_5_0_0  = newKafkaVersion(2, 5, 0, 0)
	V2_5_1_0  = newKafkaVersion(2, 5, 1, 0)
	V2_6_0_0  = newKafkaVersion(2, 6, 0, 0)
	V2_6_1_0  = newKafkaVersion(2, 6, 1, 0)
	V2_6_2_0  = newKafkaVersion(2, 6, 2, 0)
	V2_6_3_0  = newKafkaVersion(2, 6, 3, 0)
	V2_7_0_0  = newKafkaVersion(2, 7, 0, 0)
	V2_7_1_0  = newKafkaVersion(2, 7, 1, 0)
	V2_7_2_0  = newKafkaVersion(2, 7, 2, 0)
	V2_8_0_0  = newKafkaVersion(2, 8, 0, 0)
	V2_8_1_0  = newKafkaVersion(2, 8, 1, 0)
	V2_8_2_0  = newKafkaVersion(2, 8, 2, 0)
	V3_0_0_0  = newKafkaVersion(3, 0, 0, 0)
	V3_0_1_0  = newKafkaVersion(3, 0, 1, 0)
	V3_0_2_0  = newKafkaVersion(3, 0, 2, 0)
	V3_1_0_0  = newKafkaVersion(3, 1, 0, 0)
	V3_1_1_0  = newKafkaVersion(3, 1, 1, 0)
	V3_1_2_0  = newKafkaVersion(3, 1, 2, 0)
	V3_2_0_0  = newKafkaVersion(3, 2, 0, 0)
	V3_2_1_0  = newKafkaVersion(3, 2, 1, 0)
	V3_2_2_0  = newKafkaVersion(3, 2, 2, 0)
	V3_2_3_0  = newKafkaVersion(3, 2, 3, 0)
	V3_3_0_0  = newKafkaVersion(3, 3, 0, 0)
	V3_3_1_0  = newKafkaVersion(3, 3, 1, 0)
	V3_3_2_0  = newKafkaVersion(3, 3, 2, 0)
	V3_4_0_0  = newKafkaVersion(3, 4, 0, 0)
	V3_4_1_0  = newKafkaVersion(3, 4, 1, 0)
	V3_5_0_0  = newKafkaVersion(3, 5, 0, 0)
	V3_5_1_0  = newKafkaVersion(3, 5, 1, 0)
	V3_6_0_0  = newKafkaVersion(3, 6, 0, 0)

	SupportedVersions = []KafkaVersion{
		V0_8_2_0,
		V0_8_2_1,
		V0_8_2_2,
		V0_9_0_0,
		V0_9_0_1,
		V0_10_0_0,
		V0_10_0_1,
		V0_10_1_0,
		V0_10_1_1,
		V0_10_2_0,
		V0_10_2_1,
		V0_10_2_2,
		V0_11_0_0,
		V0_11_0_1,
		V0_11_0_2,
		V1_0_0_0,
		V1_0_1_0,
		V1_0_2_0,
		V1_1_0_0,
		V1_1_1_0,
		V2_0_0_0,
		V2_0_1_0,
		V2_1_0_0,
		V2_1_1_0,
		V2_2_0_0,
		V2_2_1_0,
		V2_2_2_0,
		V2_3_0_0,
		V2_3_1_0,
		V2_4_0_0,
		V2_4_1_0,
		V2_5_0_0,
		V2_5_1_0,
		V2_6_0_0,
		V2_6_1_0,
		V2_6_2_0,
		V2_7_0_0,
		V2_7_1_0,
		V2_8_0_0,
		V2_8_1_0,
		V2_8_2_0,
		V3_0_0_0,
		V3_0_1_0,
		V3_0_2_0,
		V3_1_0_0,
		V3_1_1_0,
		V3_1_2_0,
		V3_2_0_0,
		V3_2_1_0,
		V3_2_2_0,
		V3_2_3_0,
		V3_3_0_0,
		V3_3_1_0,
		V3_3_2_0,
		V3_4_0_0,
		V3_4_1_0,
		V3_5_0_0,
		V3_5_1_0,
		V3_6_0_0,
	}
	MinVersion     = V0_8_2_0
	MaxVersion     = V3_6_0_0
	DefaultVersion = V2_1_0_0
)

Effective constants defining the supported kafka versions.

View Source
var ErrAddPartitionsToTxn = errors.New("transaction manager: failed to send partitions to transaction")

ErrAddPartitionsToTxn is returned when AddPartitionsToTxn failed multiple times

View Source
var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated")

ErrAlreadyConnected is the error returned when calling Open() on a Broker that is already connected or connecting.

View Source
var ErrBrokerNotFound = errors.New("kafka: broker for ID is not found")

ErrBrokerNotFound is the error returned when there's no broker found for the requested ID.

View Source
var ErrCannotTransitionNilError = errors.New("transaction manager: cannot transition with a nil error")

ErrCannotTransitionNilError when transition is attempted with an nil error.

View Source
var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")

ErrClosedClient is the error returned when a method is called on a client that has been closed.

View Source
var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch")

ErrConsumerOffsetNotAdvanced is returned when a partition consumer didn't advance its offset after parsing a RecordBatch.

View Source
var ErrControllerNotAvailable = errors.New("kafka: controller is not available")

ErrControllerNotAvailable is returned when server didn't give correct controller id. May be kafka server's version is lower than 0.10.0.0.

View Source
var ErrCreateACLs = errors.New("kafka server: failed to create one or more ACL rules")

ErrCreateACLs is the type of error returned when ACL creation failed

View Source
var ErrDeleteRecords = errors.New("kafka server: failed to delete Records")

ErrDeleteRecords is the type of error returned when fail to delete the required Records

View Source
var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks")

ErrIncompleteResponse is the error returned when the server returns a syntactically valid response, but it does not contain the expected information.

View Source
var ErrInsufficientData = errors.New("kafka: insufficient data to Decode packet, more bytes expected")

ErrInsufficientData is returned when decoding and the packet is truncated. This can be expected when requesting messages, since as an optimization the server is allowed to return a partial message at the end of the message set.

View Source
var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index")

ErrInvalidPartition is the error returned when a partitioner returns an invalid partition index (meaning one outside of the range [0...numPartitions-1]).

View Source
var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")

ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max

View Source
var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata")

ErrNoTopicsToUpdateMetadata is returned when Meta.Full is set to false but no specific topics were found to update the metadata.

View Source
var ErrNonTransactedProducer = errors.New("transaction manager: you need to add TransactionalID to producer")

ErrNonTransactedProducer when calling BeginTxn, CommitTxn or AbortTxn on a non transactional producer.

View Source
var ErrNotConnected = errors.New("kafka: broker not connected")

ErrNotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.

View Source
var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to")

ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored or otherwise failed to respond.

View Source
var ErrReassignPartitions = errors.New("failed to reassign partitions for topic")

ErrReassignPartitions is returned when altering partition assignments for a topic fails

View Source
var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down")

ErrShuttingDown is returned when a producer receives a message during shutdown.

View Source
var ErrTransactionNotReady = errors.New("transaction manager: transaction is not ready")

ErrTransactionNotReady when transaction status is invalid for the current action.

View Source
var ErrTransitionNotAllowed = errors.New("transaction manager: invalid transition attempted")

ErrTransitionNotAllowed when txnmgr state transition is not valid.

View Source
var ErrTxnOffsetCommit = errors.New("transaction manager: failed to send offsets to transaction")

ErrTxnOffsetCommit is returned when TxnOffsetCommit failed multiple times

View Source
var ErrTxnUnableToParseResponse = errors.New("transaction manager: unable to parse response")

ErrTxnUnableToParseResponse when response is nil

View Source
var ErrUnknownScramMechanism = errors.New("kafka: unknown SCRAM mechanism provided")

ErrUnknownScramMechanism is returned when user tries to AlterUserScramCredentials with unknown SCRAM mechanism

View Source
var NoNode = &Broker{id: -1, addr: ":-1"}
View Source
var NullUUID = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}

Functions

func Decode

func Decode(buf []byte, in decoder) error

Decode takes bytes and a decoder and fills the fields of the decoder from the bytes, interpreted using Kafka's encoding rules.

Example (Request)

ExampleDecode_request shows how to decode a Request from raw bytes. Note how, when we call Decode, that the function does not expect the payload to be prefixed with the length of the payload.

If you're reading directly from a byte stream, an alternative is to use DecodeRequest which will read the length of the payload and then call Decode on the payload.

length := int32(binary.BigEndian.Uint32(rawRequest[0:4]))
bytesRead := 4

encodedReq := make([]byte, length)
encodedReq = rawRequest[4:]

bytesRead += len(encodedReq)

req := &Request{}
err := Decode(encodedReq, req)
if err != nil {
	panic(err)
}

fmt.Println("On-wire request:", rawRequest)
fmt.Printf("Correlation ID: %d, Client ID: %s\n", req.CorrelationID, req.ClientID)
fmt.Printf("Body: Version %d, ClientSoftwareName %s, ClientSoftwareVersion %s\n", req.Body.version(), req.Body.(*ApiVersionsRequest).ClientSoftwareName, req.Body.(*ApiVersionsRequest).ClientSoftwareVersion)
Output:

On-wire request: [0 0 0 29 0 18 0 3 0 0 0 123 0 4 97 110 73 68 0 6 103 111 102 107 97 7 48 46 49 48 46 48 0]
Correlation ID: 123, Client ID: anID
Body: Version 3, ClientSoftwareName gofka, ClientSoftwareVersion 0.10.0

func Encode

func Encode(e encoder) ([]byte, error)

Encode takes an Encoder and turns it into bytes.

Example (Request)

ExampleEncode_request shows how to encode a Request into raw bytes.

req := &Request{
	CorrelationID: 123,
	ClientID:      "anID",
	Body: &ApiVersionsRequest{
		Version:               3,
		ClientSoftwareName:    "gofka",
		ClientSoftwareVersion: "0.10.0",
	},
}

encodedReq, err := Encode(req)

if err != nil {
	panic(err)
}

fmt.Println("On-wire request:", encodedReq)
Output:

On-wire request: [0 0 0 29 0 18 0 3 0 0 0 123 0 4 97 110 73 68 0 6 103 111 102 107 97 7 48 46 49 48 46 48 0]
Example (RequestBody)

ExampleEncode_requestBody shows how to encode a particular request body into raw bytes.

avr := &ApiVersionsRequest{
	Version:               3,
	ClientSoftwareName:    "gofka",
	ClientSoftwareVersion: "0.10.0",
}

encodedReq, err := Encode(avr)
if err != nil {
	panic(err)
}

fmt.Println("On-wire request:", encodedReq)
Output:

On-wire request: [6 103 111 102 107 97 7 48 46 49 48 46 48 0]

Types

type AbortedTransaction

type AbortedTransaction struct {
	// ProducerID contains the producer id associated with the aborted transaction.
	ProducerID int64
	// FirstOffset contains the first offset in the aborted transaction.
	FirstOffset int64
}

type Acl

type Acl struct {
	Principal      string
	Host           string
	Operation      AclOperation
	PermissionType AclPermissionType
}

Acl holds information about acl type

type AclCreation

type AclCreation struct {
	Resource
	Acl
}

AclCreation is a wrapper around Resource and Acl type

type AclCreationResponse

type AclCreationResponse struct {
	Err    KError
	ErrMsg *string
}

AclCreationResponse is an acl creation response type

type AclFilter

type AclFilter struct {
	Version                   int
	ResourceType              AclResourceType
	ResourceName              *string
	ResourcePatternTypeFilter AclResourcePatternType
	Principal                 *string
	Host                      *string
	Operation                 AclOperation
	PermissionType            AclPermissionType
}

type AclOperation

type AclOperation int
const (
	AclOperationUnknown AclOperation = iota
	AclOperationAny
	AclOperationAll
	AclOperationRead
	AclOperationWrite
	AclOperationCreate
	AclOperationDelete
	AclOperationAlter
	AclOperationDescribe
	AclOperationClusterAction
	AclOperationDescribeConfigs
	AclOperationAlterConfigs
	AclOperationIdempotentWrite
)

ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java

func (*AclOperation) MarshalText

func (a *AclOperation) MarshalText() ([]byte, error)

MarshalText returns the text form of the AclOperation (name without prefix)

func (*AclOperation) String

func (a *AclOperation) String() string

func (*AclOperation) UnmarshalText

func (a *AclOperation) UnmarshalText(text []byte) error

UnmarshalText takes a text representation of the operation and converts it to an AclOperation

type AclPermissionType

type AclPermissionType int
const (
	AclPermissionUnknown AclPermissionType = iota
	AclPermissionAny
	AclPermissionDeny
	AclPermissionAllow
)

ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java

func (*AclPermissionType) MarshalText

func (a *AclPermissionType) MarshalText() ([]byte, error)

MarshalText returns the text form of the AclPermissionType (name without prefix)

func (*AclPermissionType) String

func (a *AclPermissionType) String() string

func (*AclPermissionType) UnmarshalText

func (a *AclPermissionType) UnmarshalText(text []byte) error

UnmarshalText takes a text representation of the permission type and converts it to an AclPermissionType

type AclResourcePatternType

type AclResourcePatternType int
const (
	AclPatternUnknown AclResourcePatternType = iota
	AclPatternAny
	AclPatternMatch
	AclPatternLiteral
	AclPatternPrefixed
)

ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java

func (*AclResourcePatternType) MarshalText

func (a *AclResourcePatternType) MarshalText() ([]byte, error)

MarshalText returns the text form of the AclResourcePatternType (name without prefix)

func (*AclResourcePatternType) String

func (a *AclResourcePatternType) String() string

func (*AclResourcePatternType) UnmarshalText

func (a *AclResourcePatternType) UnmarshalText(text []byte) error

UnmarshalText takes a text representation of the resource pattern type and converts it to an AclResourcePatternType

type AclResourceType

type AclResourceType int
const (
	AclResourceUnknown AclResourceType = iota
	AclResourceAny
	AclResourceTopic
	AclResourceGroup
	AclResourceCluster
	AclResourceTransactionalID
	AclResourceDelegationToken
)

ref: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java

func (*AclResourceType) MarshalText

func (a *AclResourceType) MarshalText() ([]byte, error)

MarshalText returns the text form of the AclResourceType (name without prefix)

func (*AclResourceType) String

func (a *AclResourceType) String() string

func (*AclResourceType) UnmarshalText

func (a *AclResourceType) UnmarshalText(text []byte) error

UnmarshalText takes a text representation of the resource type and converts it to an AclResourceType

type AddOffsetsToTxnRequest

type AddOffsetsToTxnRequest struct {
	Version         int16
	TransactionalID string
	ProducerID      int64
	ProducerEpoch   int16
	GroupID         string
}

AddOffsetsToTxnRequest adds offsets to a transaction Request

type AddOffsetsToTxnResponse

type AddOffsetsToTxnResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Err          KError
}

AddOffsetsToTxnResponse is a response type for adding offsets to txns

type AddPartitionsToTxnRequest

type AddPartitionsToTxnRequest struct {
	Version         int16
	TransactionalID string
	ProducerID      int64
	ProducerEpoch   int16
	TopicPartitions map[string][]int32
}

AddPartitionsToTxnRequest is a add partition Request

type AddPartitionsToTxnResponse

type AddPartitionsToTxnResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Errors       map[string][]*PartitionError
}

AddPartitionsToTxnResponse is a partition errors to transaction type

type AlterClientQuotasEntry

type AlterClientQuotasEntry struct {
	Entity []QuotaEntityComponent // The quota entity to alter.
	Ops    []ClientQuotasOp       // An individual quota configuration entry to alter.
}

type AlterClientQuotasEntryResponse

type AlterClientQuotasEntryResponse struct {
	ErrorCode KError                 // The error code, or `0` if the quota alteration succeeded.
	ErrorMsg  *string                // The error message, or `null` if the quota alteration succeeded.
	Entity    []QuotaEntityComponent // The quota entity altered.
}

type AlterClientQuotasRequest

type AlterClientQuotasRequest struct {
	Version      int16
	Entries      []AlterClientQuotasEntry // The quota configuration entries to alter.
	ValidateOnly bool                     // Whether the alteration should be validated, but not performed.
}

type AlterClientQuotasResponse

type AlterClientQuotasResponse struct {
	Version      int16
	ThrottleTime time.Duration                    // The duration in milliseconds for which the Request was throttled due to a quota violation, or zero if the Request did not violate any quota.
	Entries      []AlterClientQuotasEntryResponse // The quota configuration entries altered.
}

type AlterConfigError

type AlterConfigError struct {
	Err    KError
	ErrMsg string
}

func (*AlterConfigError) Error

func (c *AlterConfigError) Error() string

type AlterConfigsRequest

type AlterConfigsRequest struct {
	Version      int16
	Resources    []*AlterConfigsResource
	ValidateOnly bool
}

AlterConfigsRequest is an alter config Request type

type AlterConfigsResource

type AlterConfigsResource struct {
	Type          ConfigResourceType
	Name          string
	ConfigEntries map[string]*string
}

AlterConfigsResource is an alter config resource type

type AlterConfigsResourceResponse

type AlterConfigsResourceResponse struct {
	ErrorCode int16
	ErrorMsg  string
	Type      ConfigResourceType
	Name      string
}

AlterConfigsResourceResponse is a response type for alter config resource

type AlterConfigsResponse

type AlterConfigsResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Resources    []*AlterConfigsResourceResponse
}

AlterConfigsResponse is a response type for alter config

type AlterPartitionReassignmentsRequest

type AlterPartitionReassignmentsRequest struct {
	TimeoutMs int32

	Version int16
	// contains filtered or unexported fields
}

func (*AlterPartitionReassignmentsRequest) AddBlock

func (r *AlterPartitionReassignmentsRequest) AddBlock(topic string, partitionID int32, replicas []int32)

type AlterPartitionReassignmentsResponse

type AlterPartitionReassignmentsResponse struct {
	Version        int16
	ThrottleTimeMs int32
	ErrorCode      KError
	ErrorMessage   *string
	Errors         map[string]map[int32]*alterPartitionReassignmentsErrorBlock
}

func (*AlterPartitionReassignmentsResponse) AddError

func (r *AlterPartitionReassignmentsResponse) AddError(topic string, partition int32, kerror KError, message *string)

type AlterUserScramCredentialsDelete

type AlterUserScramCredentialsDelete struct {
	Name      string
	Mechanism ScramMechanismType
}

type AlterUserScramCredentialsRequest

type AlterUserScramCredentialsRequest struct {
	Version int16

	// Deletions represent list of SCRAM credentials to remove
	Deletions []AlterUserScramCredentialsDelete

	// Upsertions represent list of SCRAM credentials to update/insert
	Upsertions []AlterUserScramCredentialsUpsert
}

type AlterUserScramCredentialsResponse

type AlterUserScramCredentialsResponse struct {
	Version int16

	ThrottleTime time.Duration

	Results []*AlterUserScramCredentialsResult
}

type AlterUserScramCredentialsResult

type AlterUserScramCredentialsResult struct {
	User string

	ErrorCode    KError
	ErrorMessage *string
}

type AlterUserScramCredentialsUpsert

type AlterUserScramCredentialsUpsert struct {
	Name       string
	Mechanism  ScramMechanismType
	Iterations int32
	Salt       []byte

	// This field is never transmitted over the wire
	// @see: https://tools.ietf.org/html/rfc5802
	Password []byte
	// contains filtered or unexported fields
}

type ApiVersionsRequest

type ApiVersionsRequest struct {
	// Version defines the protocol version to use for Encode and Decode
	Version int16
	// ClientSoftwareName contains the name of the client.
	ClientSoftwareName string
	// ClientSoftwareVersion contains the version of the client.
	ClientSoftwareVersion string
}

type ApiVersionsResponse

type ApiVersionsResponse struct {
	// Version defines the protocol version to use for Encode and Decode
	Version int16
	// ErrorCode contains the top-level error code.
	ErrorCode int16
	// ApiKeys contains the APIs supported by the broker.
	ApiKeys []ApiVersionsResponseKey
	// ThrottleTimeMs contains the duration in milliseconds for which the Request was throttled due to a quota violation, or zero if the Request did not violate any quota.
	ThrottleTimeMs int32
}

type ApiVersionsResponseKey

type ApiVersionsResponseKey struct {
	// Version defines the protocol version to use for Encode and Decode
	Version int16
	// ApiKey contains the API index.
	ApiKey int16
	// MinVersion contains the minimum supported version, inclusive.
	MinVersion int16
	// MaxVersion contains the maximum supported version, inclusive.
	MaxVersion int16
}

ApiVersionsResponseKey contains the APIs supported by the broker.

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.

type ByteEncoder

type ByteEncoder []byte

ByteEncoder implements the Encoder interface for Go byte slices so that they can be used as the Key or Value in a ProducerMessage.

func (ByteEncoder) Encode

func (b ByteEncoder) Encode() ([]byte, error)

func (ByteEncoder) Length

func (b ByteEncoder) Length() int

type ClientQuotasOp

type ClientQuotasOp struct {
	Key    string  // The quota configuration key.
	Value  float64 // The value to set, otherwise ignored if the value is to be removed.
	Remove bool    // Whether the quota configuration value should be removed, otherwise set.
}

type CompressionCodec

type CompressionCodec int8

CompressionCodec represents the various compression codecs recognized by Kafka in messages.

const (
	// CompressionNone no compression
	CompressionNone CompressionCodec = iota
	// CompressionGZIP compression using GZIP
	CompressionGZIP
	// CompressionSnappy compression using snappy
	CompressionSnappy
	// CompressionLZ4 compression using LZ4
	CompressionLZ4
	// CompressionZSTD compression using ZSTD
	CompressionZSTD

	// CompressionLevelDefault is the constant to use in CompressionLevel
	// to have the default compression level for any codec. The value is picked
	// that we don't use any existing compression levels.
	CompressionLevelDefault = -1000
)

func (CompressionCodec) MarshalText

func (cc CompressionCodec) MarshalText() ([]byte, error)

MarshalText transforms a CompressionCodec into its string representation.

func (CompressionCodec) String

func (cc CompressionCodec) String() string

func (*CompressionCodec) UnmarshalText

func (cc *CompressionCodec) UnmarshalText(text []byte) error

UnmarshalText returns a CompressionCodec from its string representation.

type ConfigEntry

type ConfigEntry struct {
	Name      string
	Value     string
	ReadOnly  bool
	Default   bool
	Source    ConfigSource
	Sensitive bool
	Synonyms  []*ConfigSynonym
}

type ConfigResource

type ConfigResource struct {
	Type        ConfigResourceType
	Name        string
	ConfigNames []string
}

type ConfigResourceType

type ConfigResourceType int8

ConfigResourceType is a type for resources that have configs.

const (

	// TopicResource constant type
	TopicResource ConfigResourceType = 2
)

type ConfigSource

type ConfigSource int8
const (
	SourceUnknown ConfigSource = iota
	SourceTopic
	SourceDynamicBroker
	SourceDynamicDefaultBroker
	SourceStaticBroker
	SourceDefault
)

func (ConfigSource) String

func (s ConfigSource) String() string

type ConfigSynonym

type ConfigSynonym struct {
	ConfigName  string
	ConfigValue string
	Source      ConfigSource
}

type ConfigurationError

type ConfigurationError string

ConfigurationError is the type of error returned from a constructor (e.g. NewClient, or NewConsumer) when the specified configuration is invalid.

func (ConfigurationError) Error

func (err ConfigurationError) Error() string

type ConsumerGroupMemberAssignment

type ConsumerGroupMemberAssignment struct {
	Version  int16
	Topics   map[string][]int32
	UserData []byte
}

ConsumerGroupMemberAssignment holds the member assignment for a consume group https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolAssignment.json

type ConsumerGroupMemberMetadata

type ConsumerGroupMemberMetadata struct {
	Version         int16
	Topics          []string
	UserData        []byte
	OwnedPartitions []*OwnedPartition
	GenerationID    int32
	RackID          *string
}

ConsumerGroupMemberMetadata holds the metadata for consumer group https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/ConsumerProtocolSubscription.json

type ConsumerMetadataRequest

type ConsumerMetadataRequest struct {
	Version       int16
	ConsumerGroup string
}

ConsumerMetadataRequest is used for metadata requests

type ConsumerMetadataResponse

type ConsumerMetadataResponse struct {
	Version         int16
	Err             KError
	Coordinator     *Broker
	CoordinatorID   int32  // deprecated: use Coordinator.ID()
	CoordinatorHost string // deprecated: use Coordinator.Addr()
	CoordinatorPort int32  // deprecated: use Coordinator.Addr()
}

ConsumerMetadataResponse holds the response for a consumer group meta data requests

type ControlRecord

type ControlRecord struct {
	Version          int16
	CoordinatorEpoch int32
	Type             ControlRecordType
}

Control Records are returned as a record by fetchRequest However unlike "normal" Records, they mean nothing application wise. They only serve internal logic for supporting transactions.

type ControlRecordType

type ControlRecordType int

ControlRecordType ...

const (
	// ControlRecordAbort is a control record for abort
	ControlRecordAbort ControlRecordType = iota
	// ControlRecordCommit is a control record for commit
	ControlRecordCommit
	// ControlRecordUnknown is a control record of unknown type
	ControlRecordUnknown
)

type CoordinatorType

type CoordinatorType int8
const (
	CoordinatorGroup CoordinatorType = iota
	CoordinatorTransaction
)

type CreateAclsRequest

type CreateAclsRequest struct {
	Version      int16
	AclCreations []*AclCreation
}

CreateAclsRequest is an acl creation Request

type CreateAclsResponse

type CreateAclsResponse struct {
	Version              int16
	ThrottleTime         time.Duration
	AclCreationResponses []*AclCreationResponse
}

CreateAclsResponse is a an acl response creation type

type CreatePartitionsRequest

type CreatePartitionsRequest struct {
	Version         int16
	TopicPartitions map[string]*TopicPartition
	Timeout         time.Duration
	ValidateOnly    bool
}

type CreatePartitionsResponse

type CreatePartitionsResponse struct {
	Version              int16
	ThrottleTime         time.Duration
	TopicPartitionErrors map[string]*TopicPartitionError
}

type CreateTopicsRequest

type CreateTopicsRequest struct {
	// Version defines the protocol version to use for Encode and Decode
	Version int16
	// TopicDetails contains the topics to create.
	TopicDetails map[string]*TopicDetail
	// Timeout contains how long to wait before timing out the Request.
	Timeout time.Duration
	// ValidateOnly if true, check that the topics can be created as specified,
	// but don't create anything.
	ValidateOnly bool
}

type CreateTopicsResponse

type CreateTopicsResponse struct {
	// Version defines the protocol version to use for Encode and Decode
	Version int16
	// ThrottleTime contains the duration for which the Request was throttled due
	// to a quota violation, or zero if the Request did not violate any quota.
	ThrottleTime time.Duration
	// TopicErrors contains a map of any errors for the topics we tried to create.
	TopicErrors map[string]*TopicError
}

type DeleteAclsRequest

type DeleteAclsRequest struct {
	Version int
	Filters []*AclFilter
}

DeleteAclsRequest is a delete acl Request

type DeleteAclsResponse

type DeleteAclsResponse struct {
	Version         int16
	ThrottleTime    time.Duration
	FilterResponses []*FilterResponse
}

DeleteAclsResponse is a delete acl response

type DeleteGroupsRequest

type DeleteGroupsRequest struct {
	Version int16
	Groups  []string
}

func (*DeleteGroupsRequest) AddGroup

func (r *DeleteGroupsRequest) AddGroup(group string)

type DeleteGroupsResponse

type DeleteGroupsResponse struct {
	Version         int16
	ThrottleTime    time.Duration
	GroupErrorCodes map[string]KError
}

type DeleteOffsetsRequest

type DeleteOffsetsRequest struct {
	Version int16
	Group   string
	// contains filtered or unexported fields
}

func (*DeleteOffsetsRequest) AddPartition

func (r *DeleteOffsetsRequest) AddPartition(topic string, partitionID int32)

type DeleteOffsetsResponse

type DeleteOffsetsResponse struct {
	Version int16
	// The top-level error code, or 0 if there was no error.
	ErrorCode    KError
	ThrottleTime time.Duration
	// The responses for each partition of the topics.
	Errors map[string]map[int32]KError
}

func (*DeleteOffsetsResponse) AddError

func (r *DeleteOffsetsResponse) AddError(topic string, partition int32, errorCode KError)

type DeleteRecordsRequest

type DeleteRecordsRequest struct {
	Version int16
	Topics  map[string]*DeleteRecordsRequestTopic
	Timeout time.Duration
}

type DeleteRecordsRequestTopic

type DeleteRecordsRequestTopic struct {
	PartitionOffsets map[int32]int64 // partition => offset
}

type DeleteRecordsResponse

type DeleteRecordsResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Topics       map[string]*DeleteRecordsResponseTopic
}

type DeleteRecordsResponsePartition

type DeleteRecordsResponsePartition struct {
	LowWatermark int64
	Err          KError
}

type DeleteRecordsResponseTopic

type DeleteRecordsResponseTopic struct {
	Partitions map[int32]*DeleteRecordsResponsePartition
}

type DeleteTopicsRequest

type DeleteTopicsRequest struct {
	Version int16
	Topics  []string
	Timeout time.Duration
}

type DeleteTopicsResponse

type DeleteTopicsResponse struct {
	Version         int16
	ThrottleTime    time.Duration
	TopicErrorCodes map[string]KError
}

type DescribeAclsRequest

type DescribeAclsRequest struct {
	Version int
	AclFilter
}

DescribeAclsRequest is a describe acl Request type

type DescribeAclsResponse

type DescribeAclsResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Err          KError
	ErrMsg       *string
	ResourceAcls []*ResourceAcls
}

DescribeAclsResponse is a describe acl response type

type DescribeClientQuotasEntry

type DescribeClientQuotasEntry struct {
	Entity []QuotaEntityComponent // The quota entity description.
	Values map[string]float64     // The quota values for the entity.
}

type DescribeClientQuotasRequest

type DescribeClientQuotasRequest struct {
	Version    int16
	Components []QuotaFilterComponent
	Strict     bool
}

A filter to be applied to matching client quotas. Components: the components to filter on Strict: whether the filter only includes specified components

type DescribeClientQuotasResponse

type DescribeClientQuotasResponse struct {
	Version      int16
	ThrottleTime time.Duration               // The duration in milliseconds for which the Request was throttled due to a quota violation, or zero if the Request did not violate any quota.
	ErrorCode    KError                      // The error code, or `0` if the quota description succeeded.
	ErrorMsg     *string                     // The error message, or `null` if the quota description succeeded.
	Entries      []DescribeClientQuotasEntry // A result entry.
}

type DescribeConfigError

type DescribeConfigError struct {
	Err    KError
	ErrMsg string
}

func (*DescribeConfigError) Error

func (c *DescribeConfigError) Error() string

type DescribeConfigsRequest

type DescribeConfigsRequest struct {
	Version         int16
	Resources       []*ConfigResource
	IncludeSynonyms bool
}

type DescribeConfigsResponse

type DescribeConfigsResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Resources    []*ResourceResponse
}

type DescribeGroupsRequest

type DescribeGroupsRequest struct {
	Version                     int16
	Groups                      []string
	IncludeAuthorizedOperations bool
}

func (*DescribeGroupsRequest) AddGroup

func (r *DescribeGroupsRequest) AddGroup(group string)

type DescribeGroupsResponse

type DescribeGroupsResponse struct {
	// Version defines the protocol version to use for Encode and Decode
	Version int16
	// ThrottleTimeMs contains the duration in milliseconds for which the
	// Request was throttled due to a quota violation, or zero if the Request
	// did not violate any quota.
	ThrottleTimeMs int32
	// Groups contains each described group.
	Groups []*GroupDescription
}

type DescribeLogDirsRequest

type DescribeLogDirsRequest struct {
	// Version 0 and 1 are equal
	// The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
	Version int16

	// If this is an empty array, all topics will be queried
	DescribeTopics []DescribeLogDirsRequestTopic
}

DescribeLogDirsRequest is a describe Request to get partitions' log size

type DescribeLogDirsRequestTopic

type DescribeLogDirsRequestTopic struct {
	Topic        string
	PartitionIDs []int32
}

DescribeLogDirsRequestTopic is a describe Request about the log dir of one or more partitions within a Topic

type DescribeLogDirsResponse

type DescribeLogDirsResponse struct {
	ThrottleTime time.Duration

	// Version 0 and 1 are equal
	// The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
	Version int16

	LogDirs []DescribeLogDirsResponseDirMetadata
}

type DescribeLogDirsResponseDirMetadata

type DescribeLogDirsResponseDirMetadata struct {
	ErrorCode KError

	// The absolute log directory path
	Path   string
	Topics []DescribeLogDirsResponseTopic
}

type DescribeLogDirsResponsePartition

type DescribeLogDirsResponsePartition struct {
	PartitionID int32

	// The size of the log segments of the partition in bytes.
	Size int64

	// The lag of the log's LEO w.r.t. partition's HW (if it is the current log for the partition) or
	// current replica's LEO (if it is the future log for the partition)
	OffsetLag int64

	// True if this log is created by AlterReplicaLogDirsRequest and will replace the current log of
	// the replica in the future.
	IsTemporary bool
}

DescribeLogDirsResponsePartition describes a partition's log directory

type DescribeLogDirsResponseTopic

type DescribeLogDirsResponseTopic struct {
	Topic      string
	Partitions []DescribeLogDirsResponsePartition
}

DescribeLogDirsResponseTopic contains a topic's partitions descriptions

type DescribeUserScramCredentialsRequest

type DescribeUserScramCredentialsRequest struct {
	// Version 0 is currently only supported
	Version int16

	// If this is an empty array, all users will be queried
	DescribeUsers []DescribeUserScramCredentialsRequestUser
}

DescribeUserScramCredentialsRequest is a Request to get list of SCRAM user names

type DescribeUserScramCredentialsRequestUser

type DescribeUserScramCredentialsRequestUser struct {
	Name string
}

DescribeUserScramCredentialsRequestUser is a describe Request about specific user name

type DescribeUserScramCredentialsResponse

type DescribeUserScramCredentialsResponse struct {
	// Version 0 is currently only supported
	Version int16

	ThrottleTime time.Duration

	ErrorCode    KError
	ErrorMessage *string

	Results []*DescribeUserScramCredentialsResult
}

type DescribeUserScramCredentialsResult

type DescribeUserScramCredentialsResult struct {
	User string

	ErrorCode    KError
	ErrorMessage *string

	CredentialInfos []*UserScramCredentialsResponseInfo
}

type Encoder

type Encoder interface {
	Encode() ([]byte, error)
	Length() int
}

Encoder is a simple interface for any type that can be encoded as an array of bytes in order to be sent as the key or value of a Kafka message. Length() is provided as an optimization, and must return the same as len() on the result of Encode().

type EndTxnRequest

type EndTxnRequest struct {
	Version           int16
	TransactionalID   string
	ProducerID        int64
	ProducerEpoch     int16
	TransactionResult bool
}

type EndTxnResponse

type EndTxnResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Err          KError
}

type FetchRequest

type FetchRequest struct {
	// Version defines the protocol version to use for Encode and Decode
	Version int16
	// ReplicaID contains the broker ID of the follower, of -1 if this Request
	// is from a consumer.
	// ReplicaID int32
	// MaxWaitTime contains the maximum time in milliseconds to wait for the response.
	MaxWaitTime int32
	// MinBytes contains the minimum bytes to accumulate in the response.
	MinBytes int32
	// MaxBytes contains the maximum bytes to fetch.  See KIP-74 for cases
	// where this limit may not be honored.
	MaxBytes int32
	// Isolation contains a This setting controls the visibility of
	// transactional Records. Using READ_UNCOMMITTED (isolation_level = 0)
	// makes all Records visible. With READ_COMMITTED (isolation_level = 1),
	// non-transactional and COMMITTED transactional Records are visible. To be
	// more concrete, READ_COMMITTED returns all data from offsets smaller than
	// the current LSO (last stable offset), and enables the inclusion of the
	// list of aborted transactions in the result, which allows consumers to
	// discard ABORTED transactional Records
	Isolation IsolationLevel
	// SessionID contains the fetch session ID.
	SessionID int32
	// SessionEpoch contains the epoch of the partition leader as known to the
	// follower replica or a consumer.
	SessionEpoch int32

	// RackID contains a Rack ID of the consumer making this Request
	RackID string
	// contains filtered or unexported fields
}

FetchRequest (API key 1) will fetch Kafka messages. Version 3 introduced the MaxBytes field. See https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes

func (*FetchRequest) AddBlock

func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32, leaderEpoch int32)

type FetchResponse

type FetchResponse struct {
	// Version defines the protocol version to use for Encode and Decode
	Version int16
	// ThrottleTime contains the duration in milliseconds for which the Request
	// was throttled due to a quota violation, or zero if the Request did not
	// violate any quota.
	ThrottleTime time.Duration
	// ErrorCode contains the top level response error code.
	ErrorCode int16
	// SessionID contains the fetch session ID, or 0 if this is not part of a fetch session.
	SessionID int32
	// Blocks contains the response topics.
	Blocks map[string]map[int32]*FetchResponseBlock

	LogAppendTime bool
	Timestamp     time.Time
}

func (*FetchResponse) AddControlRecord

func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType)

func (*FetchResponse) AddControlRecordWithTimestamp

func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time)

func (*FetchResponse) AddError

func (r *FetchResponse) AddError(topic string, partition int32, err KError)

func (*FetchResponse) AddMessage

func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64)

func (*FetchResponse) AddMessageWithTimestamp

func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8)

func (*FetchResponse) AddRecord

func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64)

func (*FetchResponse) AddRecordBatch

func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool)

func (*FetchResponse) AddRecordBatchWithTimestamp

func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time)

AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp But instead of appending 1 record to a batch, it append a new batch containing 1 record to the fetchResponse Since transaction are handled on batch level (the whole batch is either committed or aborted), use this to test transactions

func (*FetchResponse) AddRecordWithTimestamp

func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time)

func (*FetchResponse) GetBlock

func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock

func (*FetchResponse) SetLastOffsetDelta

func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32)

func (*FetchResponse) SetLastStableOffset

func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64)

type FetchResponseBlock

type FetchResponseBlock struct {
	// Err contains the error code, or 0 if there was no fetch error.
	Err KError
	// HighWatermarkOffset contains the current high water mark.
	HighWaterMarkOffset int64
	// LastStableOffset contains the last stable offset (or LSO) of the
	// partition. This is the last offset such that the state of all
	// transactional Records prior to this offset have been decided (ABORTED or
	// COMMITTED)
	LastStableOffset       int64
	LastRecordsBatchOffset *int64
	// LogStartOffset contains the current log start offset.
	LogStartOffset int64
	// AbortedTransactions contains the aborted transactions.
	AbortedTransactions []*AbortedTransaction
	// PreferredReadReplica contains the preferred read replica for the
	// consumer to use on its next fetch Request
	PreferredReadReplica int32
	// RecordsSet contains the record data.
	RecordsSet []*Records

	Partial bool
	Records *Records // deprecated: use FetchResponseBlock.RecordsSet
}

type FilterResponse

type FilterResponse struct {
	Err          KError
	ErrMsg       *string
	MatchingAcls []*MatchingAcl
}

FilterResponse is a filter response type

type FindCoordinatorRequest

type FindCoordinatorRequest struct {
	Version         int16
	CoordinatorKey  string
	CoordinatorType CoordinatorType
}

type FindCoordinatorResponse

type FindCoordinatorResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Err          KError
	ErrMsg       *string
	Coordinator  *Broker
}

type GroupData

type GroupData struct {
	GroupState string // version 4 or later
}

type GroupDescription

type GroupDescription struct {
	// Version defines the protocol version to use for Encode and Decode
	Version int16
	// Err contains the describe error as the KError type.
	Err KError
	// ErrorCode contains the describe error, or 0 if there was no error.
	ErrorCode int16
	// GroupId contains the group ID string.
	GroupId string
	// State contains the group state string, or the empty string.
	State string
	// ProtocolType contains the group protocol type, or the empty string.
	ProtocolType string
	// Protocol contains the group protocol data, or the empty string.
	Protocol string
	// Members contains the group members.
	Members map[string]*GroupMemberDescription
	// AuthorizedOperations contains a 32-bit bitfield to represent authorized
	// operations for this group.
	AuthorizedOperations int32
}

GroupDescription contains each described group.

type GroupMember

type GroupMember struct {
	// MemberId contains the group member ID.
	MemberId string
	// GroupInstanceId contains the unique identifier of the consumer instance
	// provided by end user.
	GroupInstanceId *string
	// Metadata contains the group member metadata.
	Metadata []byte
}

type GroupMemberDescription

type GroupMemberDescription struct {
	// Version defines the protocol version to use for Encode and Decode
	Version int16
	// MemberId contains the member ID assigned by the group coordinator.
	MemberId string
	// GroupInstanceId contains the unique identifier of the consumer instance
	// provided by end user.
	GroupInstanceId *string
	// ClientId contains the client ID used in the member's latest join group
	// Request.
	ClientId string
	// ClientHost contains the client host.
	ClientHost string
	// MemberMetadata contains the metadata corresponding to the current group
	// protocol in use.
	MemberMetadata []byte
	// MemberAssignment contains the current assignment provided by the group
	// leader.
	MemberAssignment []byte
}

GroupMemberDescription contains the group members.

func (*GroupMemberDescription) GetMemberAssignment

func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error)

func (*GroupMemberDescription) GetMemberMetadata

func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error)

type GroupProtocol

type GroupProtocol struct {
	// Name contains the protocol name.
	Name string
	// Metadata contains the protocol metadata.
	Metadata []byte
}

type HeartbeatRequest

type HeartbeatRequest struct {
	Version         int16
	GroupId         string
	GenerationId    int32
	MemberId        string
	GroupInstanceId *string
}

type HeartbeatResponse

type HeartbeatResponse struct {
	Version      int16
	ThrottleTime int32
	Err          KError
}

type IncrementalAlterConfigsEntry

type IncrementalAlterConfigsEntry struct {
	Operation IncrementalAlterConfigsOperation
	Value     *string
}

type IncrementalAlterConfigsOperation

type IncrementalAlterConfigsOperation int8
const (
	IncrementalAlterConfigsOperationSet IncrementalAlterConfigsOperation = iota
	IncrementalAlterConfigsOperationDelete
	IncrementalAlterConfigsOperationAppend
	IncrementalAlterConfigsOperationSubtract
)

type IncrementalAlterConfigsRequest

type IncrementalAlterConfigsRequest struct {
	Version      int16
	Resources    []*IncrementalAlterConfigsResource
	ValidateOnly bool
}

IncrementalAlterConfigsRequest is an incremental alter config Request type

type IncrementalAlterConfigsResource

type IncrementalAlterConfigsResource struct {
	Type          ConfigResourceType
	Name          string
	ConfigEntries map[string]IncrementalAlterConfigsEntry
}

type IncrementalAlterConfigsResponse

type IncrementalAlterConfigsResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Resources    []*AlterConfigsResourceResponse
}

IncrementalAlterConfigsResponse is a response type for incremental alter config

type InitProducerIDRequest

type InitProducerIDRequest struct {
	Version            int16
	TransactionalID    *string
	TransactionTimeout time.Duration
	ProducerID         int64
	ProducerEpoch      int16
}

type InitProducerIDResponse

type InitProducerIDResponse struct {
	ThrottleTime  time.Duration
	Err           KError
	Version       int16
	ProducerID    int64
	ProducerEpoch int16
}

type IsolationLevel

type IsolationLevel int8
const (
	ReadUncommitted IsolationLevel = iota
	ReadCommitted
)

type JoinGroupRequest

type JoinGroupRequest struct {
	// Version defines the protocol version to use for Encode and Decode
	Version int16
	// GroupId contains the group identifier.
	GroupId string
	// SessionTimeout specifies that the coordinator should consider the consumer
	// dead if it receives no heartbeat after this timeout in milliseconds.
	SessionTimeout int32
	// RebalanceTimeout contains the maximum time in milliseconds that the
	// coordinator will wait for each member to rejoin when rebalancing the
	// group.
	RebalanceTimeout int32
	// MemberId contains the member id assigned by the group coordinator.
	MemberId string
	// GroupInstanceId contains the unique identifier of the consumer instance
	// provided by end user.
	GroupInstanceId *string
	// ProtocolType contains the unique name the for class of protocols
	// implemented by the group we want to join.
	ProtocolType string
	// GroupProtocols contains the list of protocols that the member supports.
	// deprecated; use OrderedGroupProtocols
	GroupProtocols map[string][]byte
	// OrderedGroupProtocols contains an ordered list of protocols that the member
	// supports.
	OrderedGroupProtocols []*GroupProtocol
}

func (*JoinGroupRequest) AddGroupProtocol

func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte)

func (*JoinGroupRequest) AddGroupProtocolMetadata

func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error

type JoinGroupResponse

type JoinGroupResponse struct {
	// Version defines the protocol version to use for Encode and Decode
	Version int16
	// ThrottleTime contains the duration for which the Request was throttled due
	// to a quota violation, or zero if the Request did not violate any quota.
	ThrottleTime int32
	// Err contains the error code, or 0 if there was no error.
	Err KError
	// GenerationId contains the generation ID of the group.
	GenerationId int32
	// GroupProtocol contains the group protocol selected by the coordinator.
	GroupProtocol string
	// LeaderId contains the leader of the group.
	LeaderId string
	// MemberId contains the member ID assigned by the group coordinator.
	MemberId string
	// Members contains the per-group-member information.
	Members []GroupMember
}

func (*JoinGroupResponse) GetMembers

type KError

type KError int16

KError is the type of error that can be returned directly by the Kafka broker. See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes

const (
	ErrUnknown                            KError = -1 // Errors.UNKNOWN_SERVER_ERROR
	ErrNoError                            KError = 0  // Errors.NONE
	ErrOffsetOutOfRange                   KError = 1  // Errors.OFFSET_OUT_OF_RANGE
	ErrInvalidMessage                     KError = 2  // Errors.CORRUPT_MESSAGE
	ErrUnknownTopicOrPartition            KError = 3  // Errors.UNKNOWN_TOPIC_OR_PARTITION
	ErrInvalidMessageSize                 KError = 4  // Errors.INVALID_FETCH_SIZE
	ErrLeaderNotAvailable                 KError = 5  // Errors.LEADER_NOT_AVAILABLE
	ErrNotLeaderForPartition              KError = 6  // Errors.NOT_LEADER_OR_FOLLOWER
	ErrRequestTimedOut                    KError = 7  // Errors.REQUEST_TIMED_OUT
	ErrBrokerNotAvailable                 KError = 8  // Errors.BROKER_NOT_AVAILABLE
	ErrReplicaNotAvailable                KError = 9  // Errors.REPLICA_NOT_AVAILABLE
	ErrMessageSizeTooLarge                KError = 10 // Errors.MESSAGE_TOO_LARGE
	ErrStaleControllerEpochCode           KError = 11 // Errors.STALE_CONTROLLER_EPOCH
	ErrOffsetMetadataTooLarge             KError = 12 // Errors.OFFSET_METADATA_TOO_LARGE
	ErrNetworkException                   KError = 13 // Errors.NETWORK_EXCEPTION
	ErrOffsetsLoadInProgress              KError = 14 // Errors.COORDINATOR_LOAD_IN_PROGRESS
	ErrConsumerCoordinatorNotAvailable    KError = 15 // Errors.COORDINATOR_NOT_AVAILABLE
	ErrNotCoordinatorForConsumer          KError = 16 // Errors.NOT_COORDINATOR
	ErrInvalidTopic                       KError = 17 // Errors.INVALID_TOPIC_EXCEPTION
	ErrMessageSetSizeTooLarge             KError = 18 // Errors.RECORD_LIST_TOO_LARGE
	ErrNotEnoughReplicas                  KError = 19 // Errors.NOT_ENOUGH_REPLICAS
	ErrNotEnoughReplicasAfterAppend       KError = 20 // Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND
	ErrInvalidRequiredAcks                KError = 21 // Errors.INVALID_REQUIRED_ACKS
	ErrIllegalGeneration                  KError = 22 // Errors.ILLEGAL_GENERATION
	ErrInconsistentGroupProtocol          KError = 23 // Errors.INCONSISTENT_GROUP_PROTOCOL
	ErrInvalidGroupId                     KError = 24 // Errors.INVALID_GROUP_ID
	ErrUnknownMemberId                    KError = 25 // Errors.UNKNOWN_MEMBER_ID
	ErrInvalidSessionTimeout              KError = 26 // Errors.INVALID_SESSION_TIMEOUT
	ErrRebalanceInProgress                KError = 27 // Errors.REBALANCE_IN_PROGRESS
	ErrInvalidCommitOffsetSize            KError = 28 // Errors.INVALID_COMMIT_OFFSET_SIZE
	ErrTopicAuthorizationFailed           KError = 29 // Errors.TOPIC_AUTHORIZATION_FAILED
	ErrGroupAuthorizationFailed           KError = 30 // Errors.GROUP_AUTHORIZATION_FAILED
	ErrClusterAuthorizationFailed         KError = 31 // Errors.CLUSTER_AUTHORIZATION_FAILED
	ErrInvalidTimestamp                   KError = 32 // Errors.INVALID_TIMESTAMP
	ErrUnsupportedSASLMechanism           KError = 33 // Errors.UNSUPPORTED_SASL_MECHANISM
	ErrIllegalSASLState                   KError = 34 // Errors.ILLEGAL_SASL_STATE
	ErrUnsupportedVersion                 KError = 35 // Errors.UNSUPPORTED_VERSION
	ErrTopicAlreadyExists                 KError = 36 // Errors.TOPIC_ALREADY_EXISTS
	ErrInvalidPartitions                  KError = 37 // Errors.INVALID_PARTITIONS
	ErrInvalidReplicationFactor           KError = 38 // Errors.INVALID_REPLICATION_FACTOR
	ErrInvalidReplicaAssignment           KError = 39 // Errors.INVALID_REPLICA_ASSIGNMENT
	ErrInvalidConfig                      KError = 40 // Errors.INVALID_CONFIG
	ErrNotController                      KError = 41 // Errors.NOT_CONTROLLER
	ErrInvalidRequest                     KError = 42 // Errors.INVALID_REQUEST
	ErrUnsupportedForMessageFormat        KError = 43 // Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT
	ErrPolicyViolation                    KError = 44 // Errors.POLICY_VIOLATION
	ErrOutOfOrderSequenceNumber           KError = 45 // Errors.OUT_OF_ORDER_SEQUENCE_NUMBER
	ErrDuplicateSequenceNumber            KError = 46 // Errors.DUPLICATE_SEQUENCE_NUMBER
	ErrInvalidProducerEpoch               KError = 47 // Errors.INVALID_PRODUCER_EPOCH
	ErrInvalidTxnState                    KError = 48 // Errors.INVALID_TXN_STATE
	ErrInvalidProducerIDMapping           KError = 49 // Errors.INVALID_PRODUCER_ID_MAPPING
	ErrInvalidTransactionTimeout          KError = 50 // Errors.INVALID_TRANSACTION_TIMEOUT
	ErrConcurrentTransactions             KError = 51 // Errors.CONCURRENT_TRANSACTIONS
	ErrTransactionCoordinatorFenced       KError = 52 // Errors.TRANSACTION_COORDINATOR_FENCED
	ErrTransactionalIDAuthorizationFailed KError = 53 // Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
	ErrSecurityDisabled                   KError = 54 // Errors.SECURITY_DISABLED
	ErrOperationNotAttempted              KError = 55 // Errors.OPERATION_NOT_ATTEMPTED
	ErrKafkaStorageError                  KError = 56 // Errors.KAFKA_STORAGE_ERROR
	ErrLogDirNotFound                     KError = 57 // Errors.LOG_DIR_NOT_FOUND
	ErrSASLAuthenticationFailed           KError = 58 // Errors.SASL_AUTHENTICATION_FAILED
	ErrUnknownProducerID                  KError = 59 // Errors.UNKNOWN_PRODUCER_ID
	ErrReassignmentInProgress             KError = 60 // Errors.REASSIGNMENT_IN_PROGRESS
	ErrDelegationTokenAuthDisabled        KError = 61 // Errors.DELEGATION_TOKEN_AUTH_DISABLED
	ErrDelegationTokenNotFound            KError = 62 // Errors.DELEGATION_TOKEN_NOT_FOUND
	ErrDelegationTokenOwnerMismatch       KError = 63 // Errors.DELEGATION_TOKEN_OWNER_MISMATCH
	ErrDelegationTokenRequestNotAllowed   KError = 64 // Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED
	ErrDelegationTokenAuthorizationFailed KError = 65 // Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED
	ErrDelegationTokenExpired             KError = 66 // Errors.DELEGATION_TOKEN_EXPIRED
	ErrInvalidPrincipalType               KError = 67 // Errors.INVALID_PRINCIPAL_TYPE
	ErrNonEmptyGroup                      KError = 68 // Errors.NON_EMPTY_GROUP
	ErrGroupIDNotFound                    KError = 69 // Errors.GROUP_ID_NOT_FOUND
	ErrFetchSessionIDNotFound             KError = 70 // Errors.FETCH_SESSION_ID_NOT_FOUND
	ErrInvalidFetchSessionEpoch           KError = 71 // Errors.INVALID_FETCH_SESSION_EPOCH
	ErrListenerNotFound                   KError = 72 // Errors.LISTENER_NOT_FOUND
	ErrTopicDeletionDisabled              KError = 73 // Errors.TOPIC_DELETION_DISABLED
	ErrFencedLeaderEpoch                  KError = 74 // Errors.FENCED_LEADER_EPOCH
	ErrUnknownLeaderEpoch                 KError = 75 // Errors.UNKNOWN_LEADER_EPOCH
	ErrUnsupportedCompressionType         KError = 76 // Errors.UNSUPPORTED_COMPRESSION_TYPE
	ErrStaleBrokerEpoch                   KError = 77 // Errors.STALE_BROKER_EPOCH
	ErrOffsetNotAvailable                 KError = 78 // Errors.OFFSET_NOT_AVAILABLE
	ErrMemberIdRequired                   KError = 79 // Errors.MEMBER_ID_REQUIRED
	ErrPreferredLeaderNotAvailable        KError = 80 // Errors.PREFERRED_LEADER_NOT_AVAILABLE
	ErrGroupMaxSizeReached                KError = 81 // Errors.GROUP_MAX_SIZE_REACHED
	ErrFencedInstancedId                  KError = 82 // Errors.FENCED_INSTANCE_ID
	ErrEligibleLeadersNotAvailable        KError = 83 // Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE
	ErrElectionNotNeeded                  KError = 84 // Errors.ELECTION_NOT_NEEDED
	ErrNoReassignmentInProgress           KError = 85 // Errors.NO_REASSIGNMENT_IN_PROGRESS
	ErrGroupSubscribedToTopic             KError = 86 // Errors.GROUP_SUBSCRIBED_TO_TOPIC
	ErrInvalidRecord                      KError = 87 // Errors.INVALID_RECORD
	ErrUnstableOffsetCommit               KError = 88 // Errors.UNSTABLE_OFFSET_COMMIT

)

Numeric error codes returned by the Kafka server.

func (KError) Error

func (err KError) Error() string

type KafkaVersion

type KafkaVersion struct {
	// contains filtered or unexported fields
}

KafkaVersion instances represent versions of the upstream Kafka broker.

func ParseKafkaVersion

func ParseKafkaVersion(s string) (KafkaVersion, error)

ParseKafkaVersion parses and returns kafka version or error from a string

func (KafkaVersion) IsAtLeast

func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool

IsAtLeast return true if and only if the version it is called on is greater than or equal to the version passed in:

V1.IsAtLeast(V2) // false
V2.IsAtLeast(V1) // true

func (KafkaVersion) String

func (v KafkaVersion) String() string

type LeaveGroupRequest

type LeaveGroupRequest struct {
	Version  int16
	GroupId  string
	MemberId string           // Removed in Version 3
	Members  []MemberIdentity // Added in Version 3
}

type LeaveGroupResponse

type LeaveGroupResponse struct {
	Version      int16
	ThrottleTime int32
	Err          KError
	Members      []MemberResponse
}

type ListGroupsRequest

type ListGroupsRequest struct {
	Version      int16
	StatesFilter []string // version 4 or later
}

type ListGroupsResponse

type ListGroupsResponse struct {
	Version      int16
	ThrottleTime int32
	Err          KError
	Groups       map[string]string
	GroupsData   map[string]GroupData // version 4 or later
}

type ListPartitionReassignmentsRequest

type ListPartitionReassignmentsRequest struct {
	TimeoutMs int32

	Version int16
	// contains filtered or unexported fields
}

func (*ListPartitionReassignmentsRequest) AddBlock

func (r *ListPartitionReassignmentsRequest) AddBlock(topic string, partitionIDs []int32)

type ListPartitionReassignmentsResponse

type ListPartitionReassignmentsResponse struct {
	Version        int16
	ThrottleTimeMs int32
	ErrorCode      KError
	ErrorMessage   *string
	TopicStatus    map[string]map[int32]*PartitionReplicaReassignmentsStatus
}

func (*ListPartitionReassignmentsResponse) AddBlock

func (r *ListPartitionReassignmentsResponse) AddBlock(topic string, partition int32, replicas, addingReplicas, removingReplicas []int32)

type MatchingAcl

type MatchingAcl struct {
	Err    KError
	ErrMsg *string
	Resource
	Acl
}

MatchingAcl is a matching acl type

type MemberIdentity

type MemberIdentity struct {
	MemberId        string
	GroupInstanceId *string
}

type MemberResponse

type MemberResponse struct {
	MemberId        string
	GroupInstanceId *string
	Err             KError
}

type Message

type Message struct {
	Codec            CompressionCodec // codec used to compress the message contents
	CompressionLevel int              // compression level
	LogAppendTime    bool             // the used timestamp is LogAppendTime
	Key              []byte           // the message key, may be nil
	Value            []byte           // the message contents
	Set              *MessageSet      // the message set a message might wrap
	Version          int8             // v1 requires Kafka 0.10
	Timestamp        time.Time        // the timestamp of the message (version 1+ only)
	// contains filtered or unexported fields
}

Message is a kafka message type

type MessageBlock

type MessageBlock struct {
	Offset int64
	Msg    *Message
}

func (*MessageBlock) Messages

func (msb *MessageBlock) Messages() []*MessageBlock

Messages convenience helper which returns either all the messages that are wrapped in this block

type MessageSet

type MessageSet struct {
	PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
	OverflowMessage        bool // whether the set on the wire contained an overflow message
	Messages               []*MessageBlock
}

type MetadataRequest

type MetadataRequest struct {
	// Version defines the protocol version to use for Encode and Decode
	Version int16
	// Topics contains the topics to fetch metadata for.
	Topics []string
	// AllowAutoTopicCreation contains a If this is true, the broker may auto-create topics that we requested which do not already exist, if it is configured to do so.
	AllowAutoTopicCreation             bool
	IncludeClusterAuthorizedOperations bool // version 8 and up
	IncludeTopicAuthorizedOperations   bool // version 8 and up
}

type MetadataResponse

type MetadataResponse struct {
	// Version defines the protocol version to use for Encode and Decode
	Version int16
	// ThrottleTimeMs contains the duration in milliseconds for which the Request was throttled due to a quota violation, or zero if the Request did not violate any quota.
	ThrottleTimeMs int32
	// Brokers contains each broker in the response.
	Brokers []*Broker
	// ClusterID contains the cluster ID that responding broker belongs to.
	ClusterID *string
	// ControllerID contains the ID of the controller broker.
	ControllerID int32
	// Topics contains each topic in the response.
	Topics                      []*TopicMetadata
	ClusterAuthorizedOperations int32 // Only valid for Version >= 8
}

func (*MetadataResponse) AddBroker

func (r *MetadataResponse) AddBroker(addr string, id int32)

func (*MetadataResponse) AddTopic

func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata

func (*MetadataResponse) AddTopicPartition

func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, offline []int32, err KError)

type OffsetCommitRequest

type OffsetCommitRequest struct {
	ConsumerGroup           string
	ConsumerGroupGeneration int32   // v1 or later
	ConsumerID              string  // v1 or later
	GroupInstanceId         *string // v7 or later
	RetentionTime           int64   // v2 or later

	// Version can be:
	// - 0 (kafka 0.8.1 and later)
	// - 1 (kafka 0.8.2 and later)
	// - 2 (kafka 0.9.0 and later)
	// - 3 (kafka 0.11.0 and later)
	// - 4 (kafka 2.0.0 and later)
	// - 5&6 (kafka 2.1.0 and later)
	// - 7 (kafka 2.3.0 and later)
	Version int16
	// contains filtered or unexported fields
}

func (*OffsetCommitRequest) AddBlock

func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string)

func (*OffsetCommitRequest) AddBlockWithLeaderEpoch

func (r *OffsetCommitRequest) AddBlockWithLeaderEpoch(topic string, partitionID int32, offset int64, leaderEpoch int32, timestamp int64, metadata string)

func (*OffsetCommitRequest) Offset

func (r *OffsetCommitRequest) Offset(topic string, partitionID int32) (int64, string, error)

type OffsetCommitResponse

type OffsetCommitResponse struct {
	Version        int16
	ThrottleTimeMs int32
	Errors         map[string]map[int32]KError
}

func (*OffsetCommitResponse) AddError

func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError)

type OffsetFetchRequest

type OffsetFetchRequest struct {
	Version       int16
	ConsumerGroup string
	RequireStable bool // requires v7+
	// contains filtered or unexported fields
}

func (*OffsetFetchRequest) AddPartition

func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32)

func (*OffsetFetchRequest) ZeroPartitions

func (r *OffsetFetchRequest) ZeroPartitions()

type OffsetFetchResponse

type OffsetFetchResponse struct {
	Version        int16
	ThrottleTimeMs int32
	Blocks         map[string]map[int32]*OffsetFetchResponseBlock
	Err            KError
}

func (*OffsetFetchResponse) AddBlock

func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock)

func (*OffsetFetchResponse) GetBlock

func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock

type OffsetFetchResponseBlock

type OffsetFetchResponseBlock struct {
	Offset      int64
	LeaderEpoch int32
	Metadata    string
	Err         KError
}

type OffsetRequest

type OffsetRequest struct {
	Version        int16
	IsolationLevel IsolationLevel
	// contains filtered or unexported fields
}

func (*OffsetRequest) AddBlock

func (r *OffsetRequest) AddBlock(topic string, partitionID int32, timestamp int64, maxOffsets int32)

func (*OffsetRequest) ReplicaID

func (r *OffsetRequest) ReplicaID() int32

func (*OffsetRequest) SetReplicaID

func (r *OffsetRequest) SetReplicaID(id int32)

type OffsetResponse

type OffsetResponse struct {
	Version        int16
	ThrottleTimeMs int32
	Blocks         map[string]map[int32]*OffsetResponseBlock
}

func (*OffsetResponse) AddTopicPartition

func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64)

func (*OffsetResponse) GetBlock

func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock

type OffsetResponseBlock

type OffsetResponseBlock struct {
	Err KError
	// Offsets contains the result offsets (for V0/V1 compatibility)
	Offsets []int64 // Version 0
	// Timestamp contains the timestamp associated with the returned offset.
	Timestamp int64 // Version 1
	// Offset contains the returned offset.
	Offset int64 // Version 1
	// LeaderEpoch contains the current leader epoch of the partition.
	LeaderEpoch int32
}

type OwnedPartition

type OwnedPartition struct {
	Topic      string
	Partitions []int32
}

type PacketDecodingError

type PacketDecodingError struct {
	Info string
}

PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response. This can be a bad CRC or length field, or any other invalid value.

func (PacketDecodingError) Error

func (err PacketDecodingError) Error() string

type PacketEncodingError

type PacketEncodingError struct {
	Info string
}

PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, if you try to Encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.

func (PacketEncodingError) Error

func (err PacketEncodingError) Error() string

type PartitionError

type PartitionError struct {
	Partition int32
	Err       KError
}

PartitionError is a partition error type

type PartitionMetadata

type PartitionMetadata struct {
	// Version defines the protocol version to use for Encode and Decode
	Version int16
	// Err contains the partition error, or 0 if there was no error.
	Err KError
	// ID contains the partition index.
	ID int32
	// Leader contains the ID of the leader broker.
	Leader int32
	// LeaderEpoch contains the leader epoch of this partition.
	LeaderEpoch int32
	// Replicas contains the set of all nodes that host this partition.
	Replicas []int32
	// Isr contains the set of nodes that are in sync with the leader for this partition.
	Isr []int32
	// OfflineReplicas contains the set of offline replicas of this partition.
	OfflineReplicas []int32
}

PartitionMetadata contains each partition in the topic.

type PartitionOffsetMetadata

type PartitionOffsetMetadata struct {
	// Partition contains the index of the partition within the topic.
	Partition int32
	// Offset contains the message offset to be committed.
	Offset int64
	// LeaderEpoch contains the leader epoch of the last consumed record.
	LeaderEpoch int32
	// Metadata contains any associated metadata the client wants to keep.
	Metadata *string
}

type PartitionReplicaReassignmentsStatus

type PartitionReplicaReassignmentsStatus struct {
	Replicas         []int32
	AddingReplicas   []int32
	RemovingReplicas []int32
}

type ProduceRequest

type ProduceRequest struct {
	TransactionalID *string
	RequiredAcks    RequiredAcks
	Timeout         int32
	Version         int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10, v3 requires Kafka 0.11
	Records         map[string]map[int32]Records
}

func (*ProduceRequest) AddBatch

func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch)

func (*ProduceRequest) AddMessage

func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message)

func (*ProduceRequest) AddSet

func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet)

type ProduceResponse

type ProduceResponse struct {
	Blocks       map[string]map[int32]*ProduceResponseBlock // v0, responses
	Version      int16
	ThrottleTime time.Duration // v1, throttle_time_ms
}

func (*ProduceResponse) AddTopicPartition

func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError)

func (*ProduceResponse) GetBlock

func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock

type ProduceResponseBlock

type ProduceResponseBlock struct {
	Err         KError    // v0, error_code
	Offset      int64     // v0, base_offset
	Timestamp   time.Time // v2, log_append_time, and the broker is configured with `LogAppendTime`
	StartOffset int64     // v5, log_start_offset
}

partition_responses in protocol

type QuotaEntityComponent

type QuotaEntityComponent struct {
	EntityType QuotaEntityType
	MatchType  QuotaMatchType
	Name       string
}

type QuotaEntityType

type QuotaEntityType string
const (
	QuotaEntityUser     QuotaEntityType = "user"
	QuotaEntityClientID QuotaEntityType = "client-id"
)

type QuotaFilterComponent

type QuotaFilterComponent struct {
	EntityType QuotaEntityType
	MatchType  QuotaMatchType
	Match      string
}

Describe a component for applying a client quota filter. EntityType: the entity type the filter component applies to ("user", "client-id", "ip") MatchType: the match type of the filter component (any, exact, default) Match: the name that's matched exactly (used when MatchType is QuotaMatchExact)

type Record

type Record struct {
	Headers []*RecordHeader

	Attributes     int8
	TimestampDelta time.Duration
	OffsetDelta    int64
	Key            []byte
	Value          []byte
	// contains filtered or unexported fields
}

Record is kafka record type

type RecordBatch

type RecordBatch struct {
	FirstOffset           int64
	PartitionLeaderEpoch  int32
	Version               int8
	Codec                 CompressionCodec
	CompressionLevel      int
	Control               bool
	LogAppendTime         bool
	LastOffsetDelta       int32
	FirstTimestamp        time.Time
	MaxTimestamp          time.Time
	ProducerID            int64
	ProducerEpoch         int16
	FirstSequence         int32
	Records               []*Record
	PartialTrailingRecord bool
	IsTransactional       bool
	// contains filtered or unexported fields
}

func (*RecordBatch) LastOffset

func (b *RecordBatch) LastOffset() int64

type RecordHeader

type RecordHeader struct {
	Key   []byte
	Value []byte
}

RecordHeader stores key and value for a record header

type Records

type Records struct {
	MsgSet      *MessageSet
	RecordBatch *RecordBatch
	// contains filtered or unexported fields
}

Records implements a union type containing either a RecordBatch or a legacy MessageSet.

type Request

type Request struct {
	CorrelationID int32
	ClientID      string
	Body          protocolBody
}

func DecodeRequest

func DecodeRequest(r io.Reader) (*Request, int, error)

type RequiredAcks

type RequiredAcks int16

RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid. On broker versions prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced by setting the `min.isr` value in the brokers configuration).

type Resource

type Resource struct {
	ResourceType        AclResourceType
	ResourceName        string
	ResourcePatternType AclResourcePatternType
}

Resource holds information about acl resource type

type ResourceAcls

type ResourceAcls struct {
	Resource
	Acls []*Acl
}

ResourceAcls is an acl resource type

type ResourceResponse

type ResourceResponse struct {
	ErrorCode int16
	ErrorMsg  string
	Type      ConfigResourceType
	Name      string
	Configs   []*ConfigEntry
}

type SaslAuthenticateRequest

type SaslAuthenticateRequest struct {
	// Version defines the protocol version to use for Encode and Decode
	Version       int16
	SaslAuthBytes []byte
}

type SaslAuthenticateResponse

type SaslAuthenticateResponse struct {
	// Version defines the protocol version to use for Encode and Decode
	Version           int16
	Err               KError
	ErrorMessage      *string
	SaslAuthBytes     []byte
	SessionLifetimeMs int64
}

type SaslHandshakeRequest

type SaslHandshakeRequest struct {
	Mechanism string
	Version   int16
}

type SaslHandshakeResponse

type SaslHandshakeResponse struct {
	Version           int16
	Err               KError
	EnabledMechanisms []string
}

type ScramMechanismType

type ScramMechanismType int8

func (ScramMechanismType) String

func (s ScramMechanismType) String() string

type StdLogger

type StdLogger interface {
	Print(v ...interface{})
	Printf(format string, v ...interface{})
	Println(v ...interface{})
}

StdLogger is used to log error messages.

var DebugLogger StdLogger = &debugLogger{}

DebugLogger is the instance of a StdLogger that Sarama writes more verbose debug information to. By default it is set to redirect all debug to the default Logger above, but you can optionally set it to another StdLogger instance to (e.g.,) discard debug information

type StringEncoder

type StringEncoder string

StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.

func (StringEncoder) Encode

func (s StringEncoder) Encode() ([]byte, error)

func (StringEncoder) Length

func (s StringEncoder) Length() int

type SyncGroupRequest

type SyncGroupRequest struct {
	// Version defines the protocol version to use for Encode and Decode
	Version int16
	// GroupId contains the unique group identifier.
	GroupId string
	// GenerationId contains the generation of the group.
	GenerationId int32
	// MemberId contains the member ID assigned by the group.
	MemberId string
	// GroupInstanceId contains the unique identifier of the consumer instance provided by end user.
	GroupInstanceId *string
	// GroupAssignments contains each assignment.
	GroupAssignments []SyncGroupRequestAssignment
}

func (*SyncGroupRequest) AddGroupAssignment

func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte)

func (*SyncGroupRequest) AddGroupAssignmentMember

func (r *SyncGroupRequest) AddGroupAssignmentMember(
	memberId string,
	memberAssignment *ConsumerGroupMemberAssignment,
) error

type SyncGroupRequestAssignment

type SyncGroupRequestAssignment struct {
	// MemberId contains the ID of the member to assign.
	MemberId string
	// Assignment contains the member assignment.
	Assignment []byte
}

type SyncGroupResponse

type SyncGroupResponse struct {
	// Version defines the protocol version to use for Encode and Decode
	Version int16
	// ThrottleTime contains the duration in milliseconds for which the
	// Request was throttled due to a quota violation, or zero if the Request
	// did not violate any quota.
	ThrottleTime int32
	// Err contains the error code, or 0 if there was no error.
	Err KError
	// MemberAssignment contains the member assignment.
	MemberAssignment []byte
}

func (*SyncGroupResponse) GetMemberAssignment

func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error)

type Timestamp

type Timestamp struct {
	*time.Time
}

type TopicDetail

type TopicDetail struct {
	// NumPartitions contains the number of partitions to create in the topic, or
	// -1 if we are either specifying a manual partition assignment or using the
	// default partitions.
	NumPartitions int32
	// ReplicationFactor contains the number of replicas to create for each
	// partition in the topic, or -1 if we are either specifying a manual
	// partition assignment or using the default replication factor.
	ReplicationFactor int16
	// ReplicaAssignment contains the manual partition assignment, or the empty
	// array if we are using automatic assignment.
	ReplicaAssignment map[int32][]int32
	// ConfigEntries contains the custom topic configurations to set.
	ConfigEntries map[string]*string
}

type TopicError

type TopicError struct {
	Err    KError
	ErrMsg *string
}

func (*TopicError) Error

func (t *TopicError) Error() string

func (*TopicError) Unwrap

func (t *TopicError) Unwrap() error

type TopicMetadata

type TopicMetadata struct {
	// Version defines the protocol version to use for Encode and Decode
	Version int16
	// Err contains the topic error, or 0 if there was no error.
	Err KError
	// Name contains the topic name.
	Name string
	Uuid Uuid
	// IsInternal contains a True if the topic is internal.
	IsInternal bool
	// Partitions contains each partition in the topic.
	Partitions                []*PartitionMetadata
	TopicAuthorizedOperations int32 // Only valid for Version >= 8
}

TopicMetadata contains each topic in the response.

type TopicPartition

type TopicPartition struct {
	Count      int32
	Assignment [][]int32
}

type TopicPartitionError

type TopicPartitionError struct {
	Err    KError
	ErrMsg *string
}

func (*TopicPartitionError) Error

func (t *TopicPartitionError) Error() string

func (*TopicPartitionError) Unwrap

func (t *TopicPartitionError) Unwrap() error

type TxnOffsetCommitRequest

type TxnOffsetCommitRequest struct {
	Version         int16
	TransactionalID string
	GroupID         string
	ProducerID      int64
	ProducerEpoch   int16
	Topics          map[string][]*PartitionOffsetMetadata
}

type TxnOffsetCommitResponse

type TxnOffsetCommitResponse struct {
	Version      int16
	ThrottleTime time.Duration
	Topics       map[string][]*PartitionError
}

type UserScramCredentialsResponseInfo

type UserScramCredentialsResponseInfo struct {
	Mechanism  ScramMechanismType
	Iterations int32
}

type Uuid

type Uuid [16]byte

func (Uuid) String

func (u Uuid) String() string

type ZstdDecoderParams

type ZstdDecoderParams struct {
}

type ZstdEncoderParams

type ZstdEncoderParams struct {
	Level int
}

Source Files

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL