Versions in this module Expand all Collapse all v1 v1.37.2 Jan 6, 2023 Changes in this version + const APIKeySASLAuth + const GSS_API_FINISH + const GSS_API_GENERIC_TAG + const GSS_API_INITIAL + const GSS_API_VERIFY + const GroupGenerationUndefined + const KRB5_KEYTAB_AUTH + const KRB5_USER_AUTH + const MAX_GROUP_INSTANCE_ID_LENGTH + const OffsetNewest + const OffsetOldest + const RangeBalanceStrategyName + const ReceiveTime + const RoundRobinBalanceStrategyName + const SASLExtKeyAuth + const SASLHandshakeV0 + const SASLHandshakeV1 + const SASLTypeGSSAPI + const SASLTypeOAuth + const SASLTypePlaintext + const SASLTypeSCRAMSHA256 + const SASLTypeSCRAMSHA512 + const StickyBalanceStrategyName + const TOK_ID_KRB_AP_REQ + var BalanceStrategyRange = &balanceStrategy + var BalanceStrategyRoundRobin = new(roundRobinBalancer) + var BalanceStrategySticky = &stickyBalanceStrategy + var DefaultVersion = V1_0_0_0 + var ErrAddPartitionsToTxn = errors.New("transaction manager: failed to send partitions to transaction") + var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated") + var ErrBrokerNotFound = errors.New("kafka: broker for ID is not found") + var ErrCannotTransitionNilError = errors.New("transaction manager: cannot transition with a nil error") + var ErrClosedClient = errors.New("kafka: tried to use a client that was closed") + var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed") + var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch") + var ErrControllerNotAvailable = errors.New("kafka: controller is not available") + var ErrCreateACLs = errors.New("kafka server: failed to create one or more ACL rules") + var ErrDeleteRecords = errors.New("kafka server: failed to delete records") + var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks") + var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected") + var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index") + var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max") + var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata") + var ErrNonTransactedProducer = errors.New("transaction manager: you need to add TransactionalID to producer") + var ErrNotConnected = errors.New("kafka: broker not connected") + var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to") + var ErrReassignPartitions = errors.New("failed to reassign partitions for topic") + var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down") + var ErrTransactionNotReady = errors.New("transaction manager: transaction is not ready") + var ErrTransitionNotAllowed = errors.New("transaction manager: invalid transition attempted") + var ErrTxnOffsetCommit = errors.New("transaction manager: failed to send offsets to transaction") + var ErrTxnUnableToParseResponse = errors.New("transaction manager: unable to parse response") + var ErrUnknownScramMechanism = errors.New("kafka: unknown SCRAM mechanism provided") + var GROUP_INSTANCE_ID_REGEXP = regexp.MustCompile(`^[0-9a-zA-Z\._\-]+$`) + var Logger StdLogger = log.New(io.Discard, "[Sarama] ", log.LstdFlags) + var MaxRequestSize int32 = 100 * 1024 * 1024 + var MaxResponseSize int32 = 100 * 1024 * 1024 + var MaxVersion = V3_2_3_0 + var MinVersion = V0_8_2_0 + var MultiErrorFormat multierror.ErrorFormatFunc = func(es []error) string + var NoNode = &Broker + var PanicHandler func(interface{}) + var SupportedVersions = []KafkaVersion + var V0_10_0_0 = newKafkaVersion(0, 10, 0, 0) + var V0_10_0_1 = newKafkaVersion(0, 10, 0, 1) + var V0_10_1_0 = newKafkaVersion(0, 10, 1, 0) + var V0_10_1_1 = newKafkaVersion(0, 10, 1, 1) + var V0_10_2_0 = newKafkaVersion(0, 10, 2, 0) + var V0_10_2_1 = newKafkaVersion(0, 10, 2, 1) + var V0_10_2_2 = newKafkaVersion(0, 10, 2, 2) + var V0_11_0_0 = newKafkaVersion(0, 11, 0, 0) + var V0_11_0_1 = newKafkaVersion(0, 11, 0, 1) + var V0_11_0_2 = newKafkaVersion(0, 11, 0, 2) + var V0_8_2_0 = newKafkaVersion(0, 8, 2, 0) + var V0_8_2_1 = newKafkaVersion(0, 8, 2, 1) + var V0_8_2_2 = newKafkaVersion(0, 8, 2, 2) + var V0_9_0_0 = newKafkaVersion(0, 9, 0, 0) + var V0_9_0_1 = newKafkaVersion(0, 9, 0, 1) + var V1_0_0_0 = newKafkaVersion(1, 0, 0, 0) + var V1_0_1_0 = newKafkaVersion(1, 0, 1, 0) + var V1_0_2_0 = newKafkaVersion(1, 0, 2, 0) + var V1_1_0_0 = newKafkaVersion(1, 1, 0, 0) + var V1_1_1_0 = newKafkaVersion(1, 1, 1, 0) + var V2_0_0_0 = newKafkaVersion(2, 0, 0, 0) + var V2_0_1_0 = newKafkaVersion(2, 0, 1, 0) + var V2_1_0_0 = newKafkaVersion(2, 1, 0, 0) + var V2_1_1_0 = newKafkaVersion(2, 1, 1, 0) + var V2_2_0_0 = newKafkaVersion(2, 2, 0, 0) + var V2_2_1_0 = newKafkaVersion(2, 2, 1, 0) + var V2_2_2_0 = newKafkaVersion(2, 2, 2, 0) + var V2_3_0_0 = newKafkaVersion(2, 3, 0, 0) + var V2_3_1_0 = newKafkaVersion(2, 3, 1, 0) + var V2_4_0_0 = newKafkaVersion(2, 4, 0, 0) + var V2_4_1_0 = newKafkaVersion(2, 4, 1, 0) + var V2_5_0_0 = newKafkaVersion(2, 5, 0, 0) + var V2_5_1_0 = newKafkaVersion(2, 5, 1, 0) + var V2_6_0_0 = newKafkaVersion(2, 6, 0, 0) + var V2_6_1_0 = newKafkaVersion(2, 6, 1, 0) + var V2_6_2_0 = newKafkaVersion(2, 6, 2, 0) + var V2_6_3_0 = newKafkaVersion(2, 6, 3, 0) + var V2_7_0_0 = newKafkaVersion(2, 7, 0, 0) + var V2_7_1_0 = newKafkaVersion(2, 7, 1, 0) + var V2_7_2_0 = newKafkaVersion(2, 7, 2, 0) + var V2_8_0_0 = newKafkaVersion(2, 8, 0, 0) + var V2_8_1_0 = newKafkaVersion(2, 8, 1, 0) + var V2_8_2_0 = newKafkaVersion(2, 8, 2, 0) + var V3_0_0_0 = newKafkaVersion(3, 0, 0, 0) + var V3_0_1_0 = newKafkaVersion(3, 0, 1, 0) + var V3_0_2_0 = newKafkaVersion(3, 0, 2, 0) + var V3_1_0_0 = newKafkaVersion(3, 1, 0, 0) + var V3_1_1_0 = newKafkaVersion(3, 1, 1, 0) + var V3_1_2_0 = newKafkaVersion(3, 1, 2, 0) + var V3_2_0_0 = newKafkaVersion(3, 2, 0, 0) + var V3_2_1_0 = newKafkaVersion(3, 2, 1, 0) + var V3_2_2_0 = newKafkaVersion(3, 2, 2, 0) + var V3_2_3_0 = newKafkaVersion(3, 2, 3, 0) + func Wrap(sentinel error, wrapped ...error) sentinelError + type AbortedTransaction struct + FirstOffset int64 + ProducerID int64 + type AccessToken struct + Extensions map[string]string + Token string + type AccessTokenProvider interface + Token func() (*AccessToken, error) + type Acl struct + Host string + Operation AclOperation + PermissionType AclPermissionType + Principal string + type AclCreation struct + type AclCreationResponse struct + Err KError + ErrMsg *string + type AclFilter struct + Host *string + Operation AclOperation + PermissionType AclPermissionType + Principal *string + ResourceName *string + ResourcePatternTypeFilter AclResourcePatternType + ResourceType AclResourceType + Version int + type AclOperation int + const AclOperationAll + const AclOperationAlter + const AclOperationAlterConfigs + const AclOperationAny + const AclOperationClusterAction + const AclOperationCreate + const AclOperationDelete + const AclOperationDescribe + const AclOperationDescribeConfigs + const AclOperationIdempotentWrite + const AclOperationRead + const AclOperationUnknown + const AclOperationWrite + func (a *AclOperation) MarshalText() ([]byte, error) + func (a *AclOperation) String() string + func (a *AclOperation) UnmarshalText(text []byte) error + type AclPermissionType int + const AclPermissionAllow + const AclPermissionAny + const AclPermissionDeny + const AclPermissionUnknown + func (a *AclPermissionType) MarshalText() ([]byte, error) + func (a *AclPermissionType) String() string + func (a *AclPermissionType) UnmarshalText(text []byte) error + type AclResourcePatternType int + const AclPatternAny + const AclPatternLiteral + const AclPatternMatch + const AclPatternPrefixed + const AclPatternUnknown + func (a *AclResourcePatternType) MarshalText() ([]byte, error) + func (a *AclResourcePatternType) String() string + func (a *AclResourcePatternType) UnmarshalText(text []byte) error + type AclResourceType int + const AclResourceAny + const AclResourceCluster + const AclResourceDelegationToken + const AclResourceGroup + const AclResourceTopic + const AclResourceTransactionalID + const AclResourceUnknown + func (a *AclResourceType) MarshalText() ([]byte, error) + func (a *AclResourceType) String() string + func (a *AclResourceType) UnmarshalText(text []byte) error + type AddOffsetsToTxnRequest struct + GroupID string + ProducerEpoch int16 + ProducerID int64 + TransactionalID string + type AddOffsetsToTxnResponse struct + Err KError + ThrottleTime time.Duration + type AddPartitionsToTxnRequest struct + ProducerEpoch int16 + ProducerID int64 + TopicPartitions map[string][]int32 + TransactionalID string + type AddPartitionsToTxnResponse struct + Errors map[string][]*PartitionError + ThrottleTime time.Duration + type AlterClientQuotasEntry struct + Entity []QuotaEntityComponent + Ops []ClientQuotasOp + type AlterClientQuotasEntryResponse struct + Entity []QuotaEntityComponent + ErrorCode KError + ErrorMsg *string + type AlterClientQuotasRequest struct + Entries []AlterClientQuotasEntry + ValidateOnly bool + type AlterClientQuotasResponse struct + Entries []AlterClientQuotasEntryResponse + ThrottleTime time.Duration + type AlterConfigsRequest struct + Resources []*AlterConfigsResource + ValidateOnly bool + type AlterConfigsResource struct + ConfigEntries map[string]*string + Name string + Type ConfigResourceType + type AlterConfigsResourceResponse struct + ErrorCode int16 + ErrorMsg string + Name string + Type ConfigResourceType + type AlterConfigsResponse struct + Resources []*AlterConfigsResourceResponse + ThrottleTime time.Duration + type AlterPartitionReassignmentsRequest struct + TimeoutMs int32 + Version int16 + func (r *AlterPartitionReassignmentsRequest) AddBlock(topic string, partitionID int32, replicas []int32) + type AlterPartitionReassignmentsResponse struct + ErrorCode KError + ErrorMessage *string + Errors map[string]map[int32]*alterPartitionReassignmentsErrorBlock + ThrottleTimeMs int32 + Version int16 + func (r *AlterPartitionReassignmentsResponse) AddError(topic string, partition int32, kerror KError, message *string) + type AlterUserScramCredentialsDelete struct + Mechanism ScramMechanismType + Name string + type AlterUserScramCredentialsRequest struct + Deletions []AlterUserScramCredentialsDelete + Upsertions []AlterUserScramCredentialsUpsert + Version int16 + type AlterUserScramCredentialsResponse struct + Results []*AlterUserScramCredentialsResult + ThrottleTime time.Duration + Version int16 + type AlterUserScramCredentialsResult struct + ErrorCode KError + ErrorMessage *string + User string + type AlterUserScramCredentialsUpsert struct + Iterations int32 + Mechanism ScramMechanismType + Name string + Password []byte + Salt []byte + type ApiVersionsRequest struct + ClientSoftwareName string + ClientSoftwareVersion string + Version int16 + type ApiVersionsResponse struct + ApiKeys []ApiVersionsResponseKey + ErrorCode int16 + ThrottleTimeMs int32 + Version int16 + type ApiVersionsResponseKey struct + ApiKey int16 + MaxVersion int16 + MinVersion int16 + Version int16 + type AsyncProducer interface + AbortTxn func() error + AddMessageToTxn func(msg *ConsumerMessage, groupId string, metadata *string) error + AddOffsetsToTxn func(offsets map[string][]*PartitionOffsetMetadata, groupId string) error + AsyncClose func() + BeginTxn func() error + Close func() error + CommitTxn func() error + Errors func() <-chan *ProducerError + Input func() chan<- *ProducerMessage + IsTransactional func() bool + Successes func() <-chan *ProducerMessage + TxnStatus func() ProducerTxnStatusFlag + func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) + func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) + type BalanceStrategy interface + AssignmentData func(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) + Name func() string + Plan func(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) + type BalanceStrategyPlan map[string]map[string][]int32 + func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) + type Broker struct + func NewBroker(addr string) *Broker + func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) + func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) + func (b *Broker) Addr() string + func (b *Broker) AlterClientQuotas(request *AlterClientQuotasRequest) (*AlterClientQuotasResponse, error) + func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) + func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignmentsRequest) (*AlterPartitionReassignmentsResponse, error) + func (b *Broker) AlterUserScramCredentials(req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error) + func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) + func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error + func (b *Broker) Close() error + func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) + func (b *Broker) Connected() (bool, error) + func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) + func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) + func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) + func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) + func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) + func (b *Broker) DeleteOffsets(request *DeleteOffsetsRequest) (*DeleteOffsetsResponse, error) + func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) + func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) + func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) + func (b *Broker) DescribeClientQuotas(request *DescribeClientQuotasRequest) (*DescribeClientQuotasResponse, error) + func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) + func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) + func (b *Broker) DescribeLogDirs(request *DescribeLogDirsRequest) (*DescribeLogDirsResponse, error) + func (b *Broker) DescribeUserScramCredentials(req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error) + func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) + func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) + func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) + func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) + func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) + func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) + func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) + func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) + func (b *Broker) ID() int32 + func (b *Broker) IncrementalAlterConfigs(request *IncrementalAlterConfigsRequest) (*IncrementalAlterConfigsResponse, error) + func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) + func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) + func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) + func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) + func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error) + func (b *Broker) Open(conf *Config) error + func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) + func (b *Broker) Rack() string + func (b *Broker) ResponseSize() int + func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) + func (b *Broker) TLSConnectionState() (state tls.ConnectionState, ok bool) + func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) + type ByteEncoder []byte + func (b ByteEncoder) Encode() ([]byte, error) + func (b ByteEncoder) Length() int + type Client interface + Broker func(brokerID int32) (*Broker, error) + Brokers func() []*Broker + Close func() error + Closed func() bool + Config func() *Config + Controller func() (*Broker, error) + Coordinator func(consumerGroup string) (*Broker, error) + GetOffset func(topic string, partitionID int32, time int64) (int64, error) + InSyncReplicas func(topic string, partitionID int32) ([]int32, error) + InitProducerID func() (*InitProducerIDResponse, error) + Leader func(topic string, partitionID int32) (*Broker, error) + LeastLoadedBroker func() *Broker + OfflineReplicas func(topic string, partitionID int32) ([]int32, error) + Partitions func(topic string) ([]int32, error) + RefreshBrokers func(addrs []string) error + RefreshController func() (*Broker, error) + RefreshCoordinator func(consumerGroup string) error + RefreshMetadata func(topics ...string) error + RefreshTransactionCoordinator func(transactionID string) error + Replicas func(topic string, partitionID int32) ([]int32, error) + Topics func() ([]string, error) + TransactionCoordinator func(transactionID string) (*Broker, error) + WritablePartitions func(topic string) ([]int32, error) + func NewClient(addrs []string, conf *Config) (Client, error) + type ClientQuotasOp struct + Key string + Remove bool + Value float64 + type ClusterAdmin interface + AlterClientQuotas func(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error + AlterConfig func(resourceType ConfigResourceType, name string, entries map[string]*string, ...) error + AlterPartitionReassignments func(topic string, assignment [][]int32) error + Close func() error + Controller func() (*Broker, error) + CreateACL func(resource Resource, acl Acl) error + CreateACLs func([]*ResourceAcls) error + CreatePartitions func(topic string, count int32, assignment [][]int32, validateOnly bool) error + CreateTopic func(topic string, detail *TopicDetail, validateOnly bool) error + DeleteACL func(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) + DeleteConsumerGroup func(group string) error + DeleteConsumerGroupOffset func(group string, topic string, partition int32) error + DeleteRecords func(topic string, partitionOffsets map[int32]int64) error + DeleteTopic func(topic string) error + DeleteUserScramCredentials func(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) + DescribeClientQuotas func(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error) + DescribeCluster func() (brokers []*Broker, controllerID int32, err error) + DescribeConfig func(resource ConfigResource) ([]ConfigEntry, error) + DescribeConsumerGroups func(groups []string) ([]*GroupDescription, error) + DescribeLogDirs func(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error) + DescribeTopics func(topics []string) (metadata []*TopicMetadata, err error) + DescribeUserScramCredentials func(users []string) ([]*DescribeUserScramCredentialsResult, error) + IncrementalAlterConfig func(resourceType ConfigResourceType, name string, ...) error + ListAcls func(filter AclFilter) ([]ResourceAcls, error) + ListConsumerGroupOffsets func(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) + ListConsumerGroups func() (map[string]string, error) + ListPartitionReassignments func(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, ...) + ListTopics func() (map[string]TopicDetail, error) + RemoveMemberFromConsumerGroup func(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error) + UpsertUserScramCredentials func(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error) + func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) + func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) + type CompressionCodec int8 + const CompressionGZIP + const CompressionLZ4 + const CompressionLevelDefault + const CompressionNone + const CompressionSnappy + const CompressionZSTD + func (cc *CompressionCodec) UnmarshalText(text []byte) error + func (cc CompressionCodec) MarshalText() ([]byte, error) + func (cc CompressionCodec) String() string + type Config struct + Admin struct{ ... } + ApiVersionsRequest bool + ChannelBufferSize int + ClientID string + Consumer struct{ ... } + Metadata struct{ ... } + MetricRegistry metrics.Registry + Net struct{ ... } + Producer struct{ ... } + RackID string + Version KafkaVersion + func NewConfig() *Config + func (c *Config) Validate() error + type ConfigEntry struct + Default bool + Name string + ReadOnly bool + Sensitive bool + Source ConfigSource + Synonyms []*ConfigSynonym + Value string + type ConfigResource struct + ConfigNames []string + Name string + Type ConfigResourceType + type ConfigResourceType int8 + const BrokerLoggerResource + const BrokerResource + const TopicResource + const UnknownResource + type ConfigSource int8 + const SourceDefault + const SourceDynamicBroker + const SourceDynamicDefaultBroker + const SourceStaticBroker + const SourceTopic + const SourceUnknown + func (s ConfigSource) String() string + type ConfigSynonym struct + ConfigName string + ConfigValue string + Source ConfigSource + type ConfigurationError string + func (err ConfigurationError) Error() string + type Consumer interface + Close func() error + ConsumePartition func(topic string, partition int32, offset int64) (PartitionConsumer, error) + HighWaterMarks func() map[string]map[int32]int64 + Partitions func(topic string) ([]int32, error) + Pause func(topicPartitions map[string][]int32) + PauseAll func() + Resume func(topicPartitions map[string][]int32) + ResumeAll func() + Topics func() ([]string, error) + func NewConsumer(addrs []string, config *Config) (Consumer, error) + func NewConsumerFromClient(client Client) (Consumer, error) + type ConsumerError struct + Err error + Partition int32 + Topic string + func (ce ConsumerError) Error() string + func (ce ConsumerError) Unwrap() error + type ConsumerErrors []*ConsumerError + func (ce ConsumerErrors) Error() string + type ConsumerGroup interface + Close func() error + Consume func(ctx context.Context, topics []string, handler ConsumerGroupHandler) error + Errors func() <-chan error + Pause func(partitions map[string][]int32) + PauseAll func() + Resume func(partitions map[string][]int32) + ResumeAll func() + func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) + func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) + type ConsumerGroupClaim interface + HighWaterMarkOffset func() int64 + InitialOffset func() int64 + Messages func() <-chan *ConsumerMessage + Partition func() int32 + Topic func() string + type ConsumerGroupHandler interface + Cleanup func(ConsumerGroupSession) error + ConsumeClaim func(ConsumerGroupSession, ConsumerGroupClaim) error + Setup func(ConsumerGroupSession) error + type ConsumerGroupMemberAssignment struct + Topics map[string][]int32 + UserData []byte + Version int16 + type ConsumerGroupMemberMetadata struct + OwnedPartitions []*OwnedPartition + Topics []string + UserData []byte + Version int16 + type ConsumerGroupSession interface + Claims func() map[string][]int32 + Commit func() + Context func() context.Context + GenerationID func() int32 + MarkMessage func(msg *ConsumerMessage, metadata string) + MarkOffset func(topic string, partition int32, offset int64, metadata string) + MemberID func() string + ResetOffset func(topic string, partition int32, offset int64, metadata string) + type ConsumerInterceptor interface + OnConsume func(*ConsumerMessage) + type ConsumerMessage struct + BlockTimestamp time.Time + Headers []*RecordHeader + Key []byte + Offset int64 + Partition int32 + Timestamp time.Time + Topic string + Value []byte + type ConsumerMetadataRequest struct + ConsumerGroup string + type ConsumerMetadataResponse struct + Coordinator *Broker + CoordinatorHost string + CoordinatorID int32 + CoordinatorPort int32 + Err KError + type ControlRecord struct + CoordinatorEpoch int32 + Type ControlRecordType + Version int16 + type ControlRecordType int + const ControlRecordAbort + const ControlRecordCommit + const ControlRecordUnknown + type CoordinatorType int8 + const CoordinatorGroup + const CoordinatorTransaction + type CreateAclsRequest struct + AclCreations []*AclCreation + Version int16 + type CreateAclsResponse struct + AclCreationResponses []*AclCreationResponse + ThrottleTime time.Duration + type CreatePartitionsRequest struct + Timeout time.Duration + TopicPartitions map[string]*TopicPartition + ValidateOnly bool + type CreatePartitionsResponse struct + ThrottleTime time.Duration + TopicPartitionErrors map[string]*TopicPartitionError + type CreateTopicsRequest struct + Timeout time.Duration + TopicDetails map[string]*TopicDetail + ValidateOnly bool + Version int16 + type CreateTopicsResponse struct + ThrottleTime time.Duration + TopicErrors map[string]*TopicError + Version int16 + type DeleteAclsRequest struct + Filters []*AclFilter + Version int + type DeleteAclsResponse struct + FilterResponses []*FilterResponse + ThrottleTime time.Duration + Version int16 + type DeleteGroupsRequest struct + Groups []string + func (r *DeleteGroupsRequest) AddGroup(group string) + type DeleteGroupsResponse struct + GroupErrorCodes map[string]KError + ThrottleTime time.Duration + type DeleteOffsetsRequest struct + Group string + func (r *DeleteOffsetsRequest) AddPartition(topic string, partitionID int32) + type DeleteOffsetsResponse struct + ErrorCode KError + Errors map[string]map[int32]KError + ThrottleTime time.Duration + func (r *DeleteOffsetsResponse) AddError(topic string, partition int32, errorCode KError) + type DeleteRecordsRequest struct + Timeout time.Duration + Topics map[string]*DeleteRecordsRequestTopic + type DeleteRecordsRequestTopic struct + PartitionOffsets map[int32]int64 + type DeleteRecordsResponse struct + ThrottleTime time.Duration + Topics map[string]*DeleteRecordsResponseTopic + Version int16 + type DeleteRecordsResponsePartition struct + Err KError + LowWatermark int64 + type DeleteRecordsResponseTopic struct + Partitions map[int32]*DeleteRecordsResponsePartition + type DeleteTopicsRequest struct + Timeout time.Duration + Topics []string + Version int16 + type DeleteTopicsResponse struct + ThrottleTime time.Duration + TopicErrorCodes map[string]KError + Version int16 + type DescribeAclsRequest struct + Version int + type DescribeAclsResponse struct + Err KError + ErrMsg *string + ResourceAcls []*ResourceAcls + ThrottleTime time.Duration + Version int16 + type DescribeClientQuotasEntry struct + Entity []QuotaEntityComponent + Values map[string]float64 + type DescribeClientQuotasRequest struct + Components []QuotaFilterComponent + Strict bool + type DescribeClientQuotasResponse struct + Entries []DescribeClientQuotasEntry + ErrorCode KError + ErrorMsg *string + ThrottleTime time.Duration + type DescribeConfigsRequest struct + IncludeSynonyms bool + Resources []*ConfigResource + Version int16 + type DescribeConfigsResponse struct + Resources []*ResourceResponse + ThrottleTime time.Duration + Version int16 + type DescribeGroupsRequest struct + Groups []string + IncludeAuthorizedOperations bool + Version int16 + func (r *DescribeGroupsRequest) AddGroup(group string) + type DescribeGroupsResponse struct + Groups []*GroupDescription + ThrottleTimeMs int32 + Version int16 + type DescribeLogDirsRequest struct + DescribeTopics []DescribeLogDirsRequestTopic + Version int16 + type DescribeLogDirsRequestTopic struct + PartitionIDs []int32 + Topic string + type DescribeLogDirsResponse struct + LogDirs []DescribeLogDirsResponseDirMetadata + ThrottleTime time.Duration + Version int16 + type DescribeLogDirsResponseDirMetadata struct + ErrorCode KError + Path string + Topics []DescribeLogDirsResponseTopic + type DescribeLogDirsResponsePartition struct + IsTemporary bool + OffsetLag int64 + PartitionID int32 + Size int64 + type DescribeLogDirsResponseTopic struct + Partitions []DescribeLogDirsResponsePartition + Topic string + type DescribeUserScramCredentialsRequest struct + DescribeUsers []DescribeUserScramCredentialsRequestUser + Version int16 + type DescribeUserScramCredentialsRequestUser struct + Name string + type DescribeUserScramCredentialsResponse struct + ErrorCode KError + ErrorMessage *string + Results []*DescribeUserScramCredentialsResult + ThrottleTime time.Duration + Version int16 + type DescribeUserScramCredentialsResult struct + CredentialInfos []*UserScramCredentialsResponseInfo + ErrorCode KError + ErrorMessage *string + User string + type DynamicConsistencyPartitioner interface + MessageRequiresConsistency func(message *ProducerMessage) bool + type Encoder interface + Encode func() ([]byte, error) + Length func() int + type EndTxnRequest struct + ProducerEpoch int16 + ProducerID int64 + TransactionResult bool + TransactionalID string + type EndTxnResponse struct + Err KError + ThrottleTime time.Duration + type FetchRequest struct + Isolation IsolationLevel + MaxBytes int32 + MaxWaitTime int32 + MinBytes int32 + RackID string + SessionEpoch int32 + SessionID int32 + Version int16 + func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) + type FetchResponse struct + Blocks map[string]map[int32]*FetchResponseBlock + ErrorCode int16 + LogAppendTime bool + SessionID int32 + ThrottleTime time.Duration + Timestamp time.Time + Version int16 + func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, ...) + func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, ...) + func (r *FetchResponse) AddError(topic string, partition int32, err KError) + func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) + func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, ...) + func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) + func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, ...) + func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, ...) + func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, ...) + func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock + func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) + func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) + type FetchResponseBlock struct + AbortedTransactions []*AbortedTransaction + Err KError + HighWaterMarkOffset int64 + LastRecordsBatchOffset *int64 + LastStableOffset int64 + LogStartOffset int64 + Partial bool + PreferredReadReplica int32 + Records *Records + RecordsSet []*Records + type FilterResponse struct + Err KError + ErrMsg *string + MatchingAcls []*MatchingAcl + type FindCoordinatorRequest struct + CoordinatorKey string + CoordinatorType CoordinatorType + Version int16 + type FindCoordinatorResponse struct + Coordinator *Broker + Err KError + ErrMsg *string + ThrottleTime time.Duration + Version int16 + type GSSAPIConfig struct + AuthType int + DisablePAFXFAST bool + KerberosConfigPath string + KeyTabPath string + Password string + Realm string + ServiceName string + Username string + type GSSAPIKerberosAuth struct + Config *GSSAPIConfig + NewKerberosClientFunc func(config *GSSAPIConfig) (KerberosClient, error) + func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error + type GSSApiHandlerFunc func([]byte) []byte + type GroupDescription struct + AuthorizedOperations int32 + Err KError + ErrorCode int16 + GroupId string + Members map[string]*GroupMemberDescription + Protocol string + ProtocolType string + State string + Version int16 + type GroupMember struct + GroupInstanceId *string + MemberId string + Metadata []byte + type GroupMemberDescription struct + ClientHost string + ClientId string + GroupInstanceId *string + MemberAssignment []byte + MemberId string + MemberMetadata []byte + Version int16 + func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) + func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error) + type GroupProtocol struct + Metadata []byte + Name string + type HashPartitionerOption func(*hashPartitioner) + func WithAbsFirst() HashPartitionerOption + func WithCustomFallbackPartitioner(randomHP Partitioner) HashPartitionerOption + func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption + type HeartbeatRequest struct + GenerationId int32 + GroupId string + GroupInstanceId *string + MemberId string + Version int16 + type HeartbeatResponse struct + Err KError + ThrottleTime int32 + Version int16 + type IncrementalAlterConfigsEntry struct + Operation IncrementalAlterConfigsOperation + Value *string + type IncrementalAlterConfigsOperation int8 + const IncrementalAlterConfigsOperationAppend + const IncrementalAlterConfigsOperationDelete + const IncrementalAlterConfigsOperationSet + const IncrementalAlterConfigsOperationSubtract + type IncrementalAlterConfigsRequest struct + Resources []*IncrementalAlterConfigsResource + ValidateOnly bool + type IncrementalAlterConfigsResource struct + ConfigEntries map[string]IncrementalAlterConfigsEntry + Name string + Type ConfigResourceType + type IncrementalAlterConfigsResponse struct + Resources []*AlterConfigsResourceResponse + ThrottleTime time.Duration + type InitProducerIDRequest struct + ProducerEpoch int16 + ProducerID int64 + TransactionTimeout time.Duration + TransactionalID *string + Version int16 + type InitProducerIDResponse struct + Err KError + ProducerEpoch int16 + ProducerID int64 + ThrottleTime time.Duration + Version int16 + type IsolationLevel int8 + const ReadCommitted + const ReadUncommitted + type JoinGroupRequest struct + GroupId string + GroupInstanceId *string + GroupProtocols map[string][]byte + MemberId string + OrderedGroupProtocols []*GroupProtocol + ProtocolType string + RebalanceTimeout int32 + SessionTimeout int32 + Version int16 + func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) + func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error + type JoinGroupResponse struct + Err KError + GenerationId int32 + GroupProtocol string + LeaderId string + MemberId string + Members []GroupMember + ThrottleTime int32 + Version int16 + func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) + type KError int16 + const ErrBrokerNotAvailable + const ErrClusterAuthorizationFailed + const ErrConcurrentTransactions + const ErrConsumerCoordinatorNotAvailable + const ErrDelegationTokenAuthDisabled + const ErrDelegationTokenAuthorizationFailed + const ErrDelegationTokenExpired + const ErrDelegationTokenNotFound + const ErrDelegationTokenOwnerMismatch + const ErrDelegationTokenRequestNotAllowed + const ErrDuplicateSequenceNumber + const ErrElectionNotNeeded + const ErrEligibleLeadersNotAvailable + const ErrFencedInstancedId + const ErrFencedLeaderEpoch + const ErrFetchSessionIDNotFound + const ErrGroupAuthorizationFailed + const ErrGroupIDNotFound + const ErrGroupMaxSizeReached + const ErrGroupSubscribedToTopic + const ErrIllegalGeneration + const ErrIllegalSASLState + const ErrInconsistentGroupProtocol + const ErrInvalidCommitOffsetSize + const ErrInvalidConfig + const ErrInvalidFetchSessionEpoch + const ErrInvalidGroupId + const ErrInvalidMessage + const ErrInvalidMessageSize + const ErrInvalidPartitions + const ErrInvalidPrincipalType + const ErrInvalidProducerEpoch + const ErrInvalidProducerIDMapping + const ErrInvalidRecord + const ErrInvalidReplicaAssignment + const ErrInvalidReplicationFactor + const ErrInvalidRequest + const ErrInvalidRequiredAcks + const ErrInvalidSessionTimeout + const ErrInvalidTimestamp + const ErrInvalidTopic + const ErrInvalidTransactionTimeout + const ErrInvalidTxnState + const ErrKafkaStorageError + const ErrLeaderNotAvailable + const ErrListenerNotFound + const ErrLogDirNotFound + const ErrMemberIdRequired + const ErrMessageSetSizeTooLarge + const ErrMessageSizeTooLarge + const ErrNetworkException + const ErrNoError + const ErrNoReassignmentInProgress + const ErrNonEmptyGroup + const ErrNotController + const ErrNotCoordinatorForConsumer + const ErrNotEnoughReplicas + const ErrNotEnoughReplicasAfterAppend + const ErrNotLeaderForPartition + const ErrOffsetMetadataTooLarge + const ErrOffsetNotAvailable + const ErrOffsetOutOfRange + const ErrOffsetsLoadInProgress + const ErrOperationNotAttempted + const ErrOutOfOrderSequenceNumber + const ErrPolicyViolation + const ErrPreferredLeaderNotAvailable + const ErrProducerFenced + const ErrReassignmentInProgress + const ErrRebalanceInProgress + const ErrReplicaNotAvailable + const ErrRequestTimedOut + const ErrSASLAuthenticationFailed + const ErrSecurityDisabled + const ErrStaleBrokerEpoch + const ErrStaleControllerEpochCode + const ErrThrottlingQuotaExceeded + const ErrTopicAlreadyExists + const ErrTopicAuthorizationFailed + const ErrTopicDeletionDisabled + const ErrTransactionCoordinatorFenced + const ErrTransactionalIDAuthorizationFailed + const ErrUnknown + const ErrUnknownLeaderEpoch + const ErrUnknownMemberId + const ErrUnknownProducerID + const ErrUnknownTopicOrPartition + const ErrUnstableOffsetCommit + const ErrUnsupportedCompressionType + const ErrUnsupportedForMessageFormat + const ErrUnsupportedSASLMechanism + const ErrUnsupportedVersion + func (err KError) Error() string + type KafkaGSSAPIHandler struct + func (h *KafkaGSSAPIHandler) MockKafkaGSSAPI(buffer []byte) []byte + type KafkaVersion struct + func ParseKafkaVersion(s string) (KafkaVersion, error) + func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool + func (v KafkaVersion) String() string + type KerberosClient interface + CName func() types.PrincipalName + Destroy func() + Domain func() string + GetServiceTicket func(spn string) (messages.Ticket, types.EncryptionKey, error) + Login func() error + func NewKerberosClient(config *GSSAPIConfig) (KerberosClient, error) + type KerberosGoKrb5Client struct + func (c *KerberosGoKrb5Client) CName() types.PrincipalName + func (c *KerberosGoKrb5Client) Domain() string + type LeaveGroupRequest struct + GroupId string + MemberId string + Members []MemberIdentity + Version int16 + type LeaveGroupResponse struct + Err KError + Members []MemberResponse + ThrottleTime int32 + Version int16 + type ListGroupsRequest struct + type ListGroupsResponse struct + Err KError + Groups map[string]string + type ListPartitionReassignmentsRequest struct + TimeoutMs int32 + Version int16 + func (r *ListPartitionReassignmentsRequest) AddBlock(topic string, partitionIDs []int32) + type ListPartitionReassignmentsResponse struct + ErrorCode KError + ErrorMessage *string + ThrottleTimeMs int32 + TopicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus + Version int16 + func (r *ListPartitionReassignmentsResponse) AddBlock(topic string, partition int32, ...) + type MatchingAcl struct + Err KError + ErrMsg *string + type MemberIdentity struct + GroupInstanceId *string + MemberId string + type MemberResponse struct + Err KError + GroupInstanceId *string + MemberId string + type Message struct + Codec CompressionCodec + CompressionLevel int + Key []byte + LogAppendTime bool + Set *MessageSet + Timestamp time.Time + Value []byte + Version int8 + type MessageBlock struct + Msg *Message + Offset int64 + func (msb *MessageBlock) Messages() []*MessageBlock + type MessageSet struct + Messages []*MessageBlock + OverflowMessage bool + PartialTrailingMessage bool + type MetadataRequest struct + AllowAutoTopicCreation bool + Topics []string + Version int16 + type MetadataResponse struct + Brokers []*Broker + ClusterID *string + ControllerID int32 + ThrottleTimeMs int32 + Topics []*TopicMetadata + Version int16 + func (r *MetadataResponse) AddBroker(addr string, id int32) + func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata + func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, ...) + type MockAlterConfigsResponse struct + func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse + func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader + type MockAlterConfigsResponseWithErrorCode struct + func NewMockAlterConfigsResponseWithErrorCode(t TestReporter) *MockAlterConfigsResponseWithErrorCode + func (mr *MockAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader + type MockAlterPartitionReassignmentsResponse struct + func NewMockAlterPartitionReassignmentsResponse(t TestReporter) *MockAlterPartitionReassignmentsResponse + func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader + type MockApiVersionsResponse struct + func NewMockApiVersionsResponse(t TestReporter) *MockApiVersionsResponse + func (m *MockApiVersionsResponse) For(reqBody versionedDecoder) encoderWithHeader + func (m *MockApiVersionsResponse) SetApiKeys(apiKeys []ApiVersionsResponseKey) *MockApiVersionsResponse + type MockBroker struct + func NewMockBroker(t TestReporter, brokerID int32) *MockBroker + func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker + func NewMockBrokerListener(t TestReporter, brokerID int32, listener net.Listener) *MockBroker + func (b *MockBroker) Addr() string + func (b *MockBroker) BrokerID() int32 + func (b *MockBroker) Close() + func (b *MockBroker) History() []RequestResponse + func (b *MockBroker) Port() int32 + func (b *MockBroker) Returns(e encoderWithHeader) + func (b *MockBroker) SetGSSAPIHandler(handler GSSApiHandlerFunc) + func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) + func (b *MockBroker) SetLatency(latency time.Duration) + func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc) + type MockConsumerMetadataResponse struct + func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse + func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader + func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse + func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse + type MockCreateAclsResponse struct + func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse + func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader + type MockCreateAclsResponseError struct + func NewMockCreateAclsResponseWithError(t TestReporter) *MockCreateAclsResponseError + func (mr *MockCreateAclsResponseError) For(reqBody versionedDecoder) encoderWithHeader + type MockCreatePartitionsResponse struct + func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse + func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader + type MockCreateTopicsResponse struct + func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse + func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader + type MockDeleteAclsResponse struct + func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse + func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader + type MockDeleteGroupsResponse struct + func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse + func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader + func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse + type MockDeleteOffsetResponse struct + func NewMockDeleteOffsetRequest(t TestReporter) *MockDeleteOffsetResponse + func (m *MockDeleteOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader + func (m *MockDeleteOffsetResponse) SetDeletedOffset(errorCode KError, topic string, partition int32, errorPartition KError) *MockDeleteOffsetResponse + type MockDeleteRecordsResponse struct + func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse + func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader + type MockDeleteTopicsResponse struct + func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse + func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader + type MockDescribeConfigsResponse struct + func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse + func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader + type MockDescribeConfigsResponseWithErrorCode struct + func NewMockDescribeConfigsResponseWithErrorCode(t TestReporter) *MockDescribeConfigsResponseWithErrorCode + func (mr *MockDescribeConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader + type MockDescribeGroupsResponse struct + func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse + func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse + func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader + type MockDescribeLogDirsResponse struct + func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse + func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader + func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse + type MockFetchResponse struct + func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse + func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader + func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse + func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse + func (mfr *MockFetchResponse) SetMessageWithKey(topic string, partition int32, offset int64, key, msg Encoder) *MockFetchResponse + type MockFindCoordinatorResponse struct + func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse + func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader + func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse + func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse + type MockHeartbeatResponse struct + Err KError + func NewMockHeartbeatResponse(t TestReporter) *MockHeartbeatResponse + func (m *MockHeartbeatResponse) For(reqBody versionedDecoder) encoderWithHeader + func (m *MockHeartbeatResponse) SetError(kerr KError) *MockHeartbeatResponse + type MockIncrementalAlterConfigsResponse struct + func NewMockIncrementalAlterConfigsResponse(t TestReporter) *MockIncrementalAlterConfigsResponse + func (mr *MockIncrementalAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader + type MockIncrementalAlterConfigsResponseWithErrorCode struct + func NewMockIncrementalAlterConfigsResponseWithErrorCode(t TestReporter) *MockIncrementalAlterConfigsResponseWithErrorCode + func (mr *MockIncrementalAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader + type MockJoinGroupResponse struct + Err KError + GenerationId int32 + GroupProtocol string + LeaderId string + MemberId string + Members []GroupMember + ThrottleTime int32 + func NewMockJoinGroupResponse(t TestReporter) *MockJoinGroupResponse + func (m *MockJoinGroupResponse) For(reqBody versionedDecoder) encoderWithHeader + func (m *MockJoinGroupResponse) SetError(kerr KError) *MockJoinGroupResponse + func (m *MockJoinGroupResponse) SetGenerationId(id int32) *MockJoinGroupResponse + func (m *MockJoinGroupResponse) SetGroupProtocol(proto string) *MockJoinGroupResponse + func (m *MockJoinGroupResponse) SetLeaderId(id string) *MockJoinGroupResponse + func (m *MockJoinGroupResponse) SetMember(id string, meta *ConsumerGroupMemberMetadata) *MockJoinGroupResponse + func (m *MockJoinGroupResponse) SetMemberId(id string) *MockJoinGroupResponse + func (m *MockJoinGroupResponse) SetThrottleTime(t int32) *MockJoinGroupResponse + type MockKerberosClient struct + ASRep messages.ASRep + func (c *MockKerberosClient) CName() types.PrincipalName + func (c *MockKerberosClient) Destroy() + func (c *MockKerberosClient) Domain() string + func (c *MockKerberosClient) GetServiceTicket(spn string) (messages.Ticket, types.EncryptionKey, error) + func (c *MockKerberosClient) Login() error + type MockLeaveGroupResponse struct + Err KError + func NewMockLeaveGroupResponse(t TestReporter) *MockLeaveGroupResponse + func (m *MockLeaveGroupResponse) For(reqBody versionedDecoder) encoderWithHeader + func (m *MockLeaveGroupResponse) SetError(kerr KError) *MockLeaveGroupResponse + type MockListAclsResponse struct + func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse + func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader + type MockListGroupsResponse struct + func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse + func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse + func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader + type MockListPartitionReassignmentsResponse struct + func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitionReassignmentsResponse + func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader + type MockMetadataResponse struct + func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse + func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader + func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse + func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse + func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse + type MockOffsetCommitResponse struct + func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse + func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader + func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse + type MockOffsetFetchResponse struct + func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse + func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoderWithHeader + func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse + func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, ...) *MockOffsetFetchResponse + type MockOffsetResponse struct + func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse + func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader + func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse + type MockProduceResponse struct + func NewMockProduceResponse(t TestReporter) *MockProduceResponse + func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader + func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse + func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse + type MockResponse interface + For func(reqBody versionedDecoder) (res encoderWithHeader) + type MockSaslAuthenticateResponse struct + func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse + func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader + func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse + func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse + func (msar *MockSaslAuthenticateResponse) SetSessionLifetimeMs(sessionLifetimeMs int64) *MockSaslAuthenticateResponse + type MockSaslHandshakeResponse struct + func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse + func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader + func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse + func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse + type MockSequence struct + func NewMockSequence(responses ...interface{}) *MockSequence + func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader) + type MockSyncGroupResponse struct + Err KError + MemberAssignment []byte + func NewMockSyncGroupResponse(t TestReporter) *MockSyncGroupResponse + func (m *MockSyncGroupResponse) For(reqBody versionedDecoder) encoderWithHeader + func (m *MockSyncGroupResponse) SetError(kerr KError) *MockSyncGroupResponse + func (m *MockSyncGroupResponse) SetMemberAssignment(assignment *ConsumerGroupMemberAssignment) *MockSyncGroupResponse + type MockWrapper struct + func NewMockWrapper(res encoderWithHeader) *MockWrapper + func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader) + type OffsetCommitRequest struct + ConsumerGroup string + ConsumerGroupGeneration int32 + ConsumerID string + GroupInstanceId *string + RetentionTime int64 + Version int16 + func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, leaderEpoch int32, ...) + func (r *OffsetCommitRequest) Offset(topic string, partitionID int32) (int64, string, error) + type OffsetCommitResponse struct + Errors map[string]map[int32]KError + ThrottleTimeMs int32 + Version int16 + func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError) + type OffsetFetchRequest struct + ConsumerGroup string + RequireStable bool + Version int16 + func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) + func (r *OffsetFetchRequest) ZeroPartitions() + type OffsetFetchResponse struct + Blocks map[string]map[int32]*OffsetFetchResponseBlock + Err KError + ThrottleTimeMs int32 + Version int16 + func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock) + func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock + type OffsetFetchResponseBlock struct + Err KError + LeaderEpoch int32 + Metadata string + Offset int64 + type OffsetManager interface + Close func() error + Commit func() + ManagePartition func(topic string, partition int32) (PartitionOffsetManager, error) + func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) + type OffsetRequest struct + IsolationLevel IsolationLevel + Version int16 + func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) + func (r *OffsetRequest) ReplicaID() int32 + func (r *OffsetRequest) SetReplicaID(id int32) + type OffsetResponse struct + Blocks map[string]map[int32]*OffsetResponseBlock + ThrottleTimeMs int32 + Version int16 + func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) + func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock + type OffsetResponseBlock struct + Err KError + Offset int64 + Offsets []int64 + Timestamp int64 + type OwnedPartition struct + Partitions []int32 + Topic string + type PacketDecodingError struct + Info string + func (err PacketDecodingError) Error() string + type PacketEncodingError struct + Info string + func (err PacketEncodingError) Error() string + type PartitionConsumer interface + AsyncClose func() + Close func() error + Errors func() <-chan *ConsumerError + HighWaterMarkOffset func() int64 + IsPaused func() bool + Messages func() <-chan *ConsumerMessage + Pause func() + Resume func() + type PartitionError struct + Err KError + Partition int32 + type PartitionMetadata struct + Err KError + ID int32 + Isr []int32 + Leader int32 + OfflineReplicas []int32 + Replicas []int32 + type PartitionOffsetManager interface + AsyncClose func() + Close func() error + Errors func() <-chan *ConsumerError + MarkOffset func(offset int64, metadata string) + NextOffset func() (int64, string) + ResetOffset func(offset int64, metadata string) + type PartitionOffsetMetadata struct + Metadata *string + Offset int64 + Partition int32 + type PartitionReplicaReassignmentsStatus struct + AddingReplicas []int32 + RemovingReplicas []int32 + Replicas []int32 + type Partitioner interface + Partition func(message *ProducerMessage, numPartitions int32) (int32, error) + RequiresConsistency func() bool + func NewHashPartitioner(topic string) Partitioner + func NewManualPartitioner(topic string) Partitioner + func NewRandomPartitioner(topic string) Partitioner + func NewReferenceHashPartitioner(topic string) Partitioner + func NewRoundRobinPartitioner(topic string) Partitioner + type PartitionerConstructor func(topic string) Partitioner + func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor + func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor + type ProduceCallback func(*ProduceResponse, error) + type ProduceRequest struct + RequiredAcks RequiredAcks + Timeout int32 + TransactionalID *string + Version int16 + func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch) + func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) + func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) + type ProduceResponse struct + Blocks map[string]map[int32]*ProduceResponseBlock + ThrottleTime time.Duration + Version int16 + func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError) + func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock + type ProduceResponseBlock struct + Err KError + Offset int64 + StartOffset int64 + Timestamp time.Time + type ProducerError struct + Err error + Msg *ProducerMessage + func (pe ProducerError) Error() string + func (pe ProducerError) Unwrap() error + type ProducerErrors []*ProducerError + func (pe ProducerErrors) Error() string + type ProducerInterceptor interface + OnSend func(*ProducerMessage) + type ProducerMessage struct + Headers []RecordHeader + Key Encoder + Metadata interface{} + Offset int64 + Partition int32 + Timestamp time.Time + Topic string + Value Encoder + func (m *ProducerMessage) ByteSize(version int) int + type ProducerTxnStatusFlag int16 + const ProducerTxnFlagAbortableError + const ProducerTxnFlagAbortingTransaction + const ProducerTxnFlagCommittingTransaction + const ProducerTxnFlagEndTransaction + const ProducerTxnFlagFatalError + const ProducerTxnFlagInError + const ProducerTxnFlagInTransaction + const ProducerTxnFlagInitializing + const ProducerTxnFlagReady + const ProducerTxnFlagUninitialized + func (s ProducerTxnStatusFlag) String() string + type QuotaEntityComponent struct + EntityType QuotaEntityType + MatchType QuotaMatchType + Name string + type QuotaEntityType string + const QuotaEntityClientID + const QuotaEntityIP + const QuotaEntityUser + type QuotaFilterComponent struct + EntityType QuotaEntityType + Match string + MatchType QuotaMatchType + type QuotaMatchType int + const QuotaMatchAny + const QuotaMatchDefault + const QuotaMatchExact + type Record struct + Attributes int8 + Headers []*RecordHeader + Key []byte + OffsetDelta int64 + TimestampDelta time.Duration + Value []byte + type RecordBatch struct + Codec CompressionCodec + CompressionLevel int + Control bool + FirstOffset int64 + FirstSequence int32 + FirstTimestamp time.Time + IsTransactional bool + LastOffsetDelta int32 + LogAppendTime bool + MaxTimestamp time.Time + PartialTrailingRecord bool + PartitionLeaderEpoch int32 + ProducerEpoch int16 + ProducerID int64 + Records []*Record + Version int8 + func (b *RecordBatch) LastOffset() int64 + type RecordHeader struct + Key []byte + Value []byte + type Records struct + MsgSet *MessageSet + RecordBatch *RecordBatch + type RequestNotifierFunc func(bytesRead, bytesWritten int) + type RequestResponse struct + Request protocolBody + Response encoder + type RequiredAcks int16 + const NoResponse + const WaitForAll + const WaitForLocal + type Resource struct + ResourceName string + ResourcePatternType AclResourcePatternType + ResourceType AclResourceType + type ResourceAcls struct + Acls []*Acl + type ResourceResponse struct + Configs []*ConfigEntry + ErrorCode int16 + ErrorMsg string + Name string + Type ConfigResourceType + type SASLMechanism string + type SCRAMClient interface + Begin func(userName, password, authzID string) error + Done func() bool + Step func(challenge string) (response string, err error) + type SaslAuthenticateRequest struct + SaslAuthBytes []byte + Version int16 + type SaslAuthenticateResponse struct + Err KError + ErrorMessage *string + SaslAuthBytes []byte + SessionLifetimeMs int64 + Version int16 + type SaslHandshakeRequest struct + Mechanism string + Version int16 + type SaslHandshakeResponse struct + EnabledMechanisms []string + Err KError + type ScramMechanismType int8 + const SCRAM_MECHANISM_SHA_256 + const SCRAM_MECHANISM_SHA_512 + const SCRAM_MECHANISM_UNKNOWN + func (s ScramMechanismType) String() string + type StdLogger interface + Print func(v ...interface{}) + Printf func(format string, v ...interface{}) + Println func(v ...interface{}) + var DebugLogger StdLogger = &debugLogger{} + type StickyAssignorUserData interface + type StickyAssignorUserDataV0 struct + Topics map[string][]int32 + type StickyAssignorUserDataV1 struct + Generation int32 + Topics map[string][]int32 + type StringEncoder string + func (s StringEncoder) Encode() ([]byte, error) + func (s StringEncoder) Length() int + type SyncGroupRequest struct + GenerationId int32 + GroupAssignments []SyncGroupRequestAssignment + GroupId string + GroupInstanceId *string + MemberId string + Version int16 + func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte) + func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error + type SyncGroupRequestAssignment struct + Assignment []byte + MemberId string + type SyncGroupResponse struct + Err KError + MemberAssignment []byte + ThrottleTime int32 + Version int16 + func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) + type SyncProducer interface + AbortTxn func() error + AddMessageToTxn func(msg *ConsumerMessage, groupId string, metadata *string) error + AddOffsetsToTxn func(offsets map[string][]*PartitionOffsetMetadata, groupId string) error + BeginTxn func() error + Close func() error + CommitTxn func() error + IsTransactional func() bool + SendMessage func(msg *ProducerMessage) (partition int32, offset int64, err error) + SendMessages func(msgs []*ProducerMessage) error + TxnStatus func() ProducerTxnStatusFlag + func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) + func NewSyncProducerFromClient(client Client) (SyncProducer, error) + type TestReporter interface + Error func(...interface{}) + Errorf func(string, ...interface{}) + Fatal func(...interface{}) + Fatalf func(string, ...interface{}) + type Timestamp struct + type TopicDetail struct + ConfigEntries map[string]*string + NumPartitions int32 + ReplicaAssignment map[int32][]int32 + ReplicationFactor int16 + type TopicError struct + Err KError + ErrMsg *string + func (t *TopicError) Error() string + func (t *TopicError) Unwrap() error + type TopicMetadata struct + Err KError + IsInternal bool + Name string + Partitions []*PartitionMetadata + type TopicPartition struct + Assignment [][]int32 + Count int32 + type TopicPartitionError struct + Err KError + ErrMsg *string + func (t *TopicPartitionError) Error() string + func (t *TopicPartitionError) Unwrap() error + type TxnOffsetCommitRequest struct + GroupID string + ProducerEpoch int16 + ProducerID int64 + Topics map[string][]*PartitionOffsetMetadata + TransactionalID string + type TxnOffsetCommitResponse struct + ThrottleTime time.Duration + Topics map[string][]*PartitionError + type UserScramCredentialsResponseInfo struct + Iterations int32 + Mechanism ScramMechanismType + type ZstdDecoderParams struct + type ZstdEncoderParams struct + Level int