Versions in this module Expand all Collapse all v1 v1.1.1 Feb 4, 2020 v1.1.0 Dec 16, 2019 Changes in this version + const APIKeySASLAuth + const GroupGenerationUndefined + const OffsetNewest + const OffsetOldest + const ReceiveTime + const SASLExtKeyAuth + const SASLHandshakeV0 + const SASLHandshakeV1 + const SASLTypeOAuth + const SASLTypePlaintext + const SASLTypeSCRAMSHA256 + const SASLTypeSCRAMSHA512 + var BalanceStrategyRange = &balanceStrategy + var BalanceStrategyRoundRobin = &balanceStrategy + var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated") + var ErrClosedClient = errors.New("kafka: tried to use a client that was closed") + var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed") + var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch") + var ErrControllerNotAvailable = errors.New("kafka: controller is not available") + var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks") + var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected") + var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index") + var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max") + var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata") + var ErrNotConnected = errors.New("kafka: broker not connected") + var ErrOutOfBrokers = errors.New(...) + var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down") + var Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags) + var MaxRequestSize int32 = 100 * 1024 * 1024 + var MaxResponseSize int32 = 100 * 1024 * 1024 + var MaxVersion = V2_2_0_0 + var MinVersion = V0_8_2_0 + var NoNode = &Broker + var PanicHandler func(interface{}) + var SupportedVersions = []KafkaVersion + var V0_10_0_0 = newKafkaVersion(0, 10, 0, 0) + var V0_10_0_1 = newKafkaVersion(0, 10, 0, 1) + var V0_10_1_0 = newKafkaVersion(0, 10, 1, 0) + var V0_10_1_1 = newKafkaVersion(0, 10, 1, 1) + var V0_10_2_0 = newKafkaVersion(0, 10, 2, 0) + var V0_10_2_1 = newKafkaVersion(0, 10, 2, 1) + var V0_11_0_0 = newKafkaVersion(0, 11, 0, 0) + var V0_11_0_1 = newKafkaVersion(0, 11, 0, 1) + var V0_11_0_2 = newKafkaVersion(0, 11, 0, 2) + var V0_8_2_0 = newKafkaVersion(0, 8, 2, 0) + var V0_8_2_1 = newKafkaVersion(0, 8, 2, 1) + var V0_8_2_2 = newKafkaVersion(0, 8, 2, 2) + var V0_9_0_0 = newKafkaVersion(0, 9, 0, 0) + var V0_9_0_1 = newKafkaVersion(0, 9, 0, 1) + var V1_0_0_0 = newKafkaVersion(1, 0, 0, 0) + var V1_1_0_0 = newKafkaVersion(1, 1, 0, 0) + var V1_1_1_0 = newKafkaVersion(1, 1, 1, 0) + var V2_0_0_0 = newKafkaVersion(2, 0, 0, 0) + var V2_0_1_0 = newKafkaVersion(2, 0, 1, 0) + var V2_1_0_0 = newKafkaVersion(2, 1, 0, 0) + var V2_2_0_0 = newKafkaVersion(2, 2, 0, 0) + type ACL struct + Host string + Operation ACLOperation + PermissionType ACLPermissionType + Principal string + type ACLCreation struct + type ACLCreationResponse struct + Err KError + ErrMsg *string + type ACLFilter struct + Host *string + Operation ACLOperation + PermissionType ACLPermissionType + Principal *string + ResourceName *string + ResourcePatternTypeFilter ACLResourcePatternType + ResourceType ACLResourceType + Version int + type ACLOperation int + const ACLOperationWrite + type ACLPermissionType int + const ACLPermissionAllow + type ACLResourcePatternType int + const ACLPatternLiteral + const ACLPatternUnknown + type ACLResourceType int + const ACLResourceTopic + type APIVersionsRequest struct + type APIVersionsResponse struct + APIVersions []*APIVersionsResponseBlock + Err KError + type APIVersionsResponseBlock struct + APIKey int16 + MaxVersion int16 + MinVersion int16 + type AbortedTransaction struct + FirstOffset int64 + ProducerID int64 + type AccessToken struct + Extensions map[string]string + Token string + type AccessTokenProvider interface + Token func() (*AccessToken, error) + type AddOffsetsToTxnRequest struct + GroupID string + ProducerEpoch int16 + ProducerID int64 + TransactionalID string + type AddOffsetsToTxnResponse struct + Err KError + ThrottleTime time.Duration + type AddPartitionsToTxnRequest struct + ProducerEpoch int16 + ProducerID int64 + TopicPartitions map[string][]int32 + TransactionalID string + type AddPartitionsToTxnResponse struct + Errors map[string][]*PartitionError + ThrottleTime time.Duration + type AlterConfigsRequest struct + Resources []*AlterConfigsResource + ValidateOnly bool + type AlterConfigsResource struct + ConfigEntries map[string]*string + Name string + Type ConfigResourceType + type AlterConfigsResourceResponse struct + ErrorCode int16 + ErrorMsg string + Name string + Type ConfigResourceType + type AlterConfigsResponse struct + Resources []*AlterConfigsResourceResponse + ThrottleTime time.Duration + type AsyncProducer interface + AsyncClose func() + Close func() error + Errors func() <-chan *ProducerError + Input func() chan<- *ProducerMessage + Successes func() <-chan *ProducerMessage + func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) + func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) + type BalanceStrategy interface + Name func() string + Plan func(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) + type BalanceStrategyPlan map[string]map[string][]int32 + func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) + type Broker struct + func NewBroker(addr string) *Broker + func (b *Broker) APIVersions(request *APIVersionsRequest) (*APIVersionsResponse, error) + func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) + func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) + func (b *Broker) Addr() string + func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) + func (b *Broker) Close() error + func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) + func (b *Broker) Connected() (bool, error) + func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) + func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) + func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) + func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) + func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) + func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) + func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) + func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) + func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) + func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) + func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) + func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) + func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) + func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) + func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) + func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) + func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) + func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) + func (b *Broker) ID() int32 + func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) + func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) + func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) + func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) + func (b *Broker) Open(conf *Config) error + func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) + func (b *Broker) Rack() string + func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) + func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) + type ByteEncoder []byte + func (b ByteEncoder) Encode() ([]byte, error) + func (b ByteEncoder) Length() int + type Client interface + Brokers func() []*Broker + Close func() error + Closed func() bool + Config func() *Config + Controller func() (*Broker, error) + Coordinator func(consumerGroup string) (*Broker, error) + GetOffset func(topic string, partitionID int32, time int64) (int64, error) + InSyncReplicas func(topic string, partitionID int32) ([]int32, error) + InitProducerID func() (*InitProducerIDResponse, error) + Leader func(topic string, partitionID int32) (*Broker, error) + OfflineReplicas func(topic string, partitionID int32) ([]int32, error) + Partitions func(topic string) ([]int32, error) + RefreshCoordinator func(consumerGroup string) error + RefreshMetadata func(topics ...string) error + Replicas func(topic string, partitionID int32) ([]int32, error) + Topics func() ([]string, error) + WritablePartitions func(topic string) ([]int32, error) + func NewClient(addrs []string, conf *Config) (Client, error) + type ClusterAdmin interface + AlterConfig func(resourceType ConfigResourceType, name string, entries map[string]*string, ...) error + Close func() error + CreateACL func(resource Resource, acl ACL) error + CreatePartitions func(topic string, count int32, assignment [][]int32, validateOnly bool) error + CreateTopic func(topic string, detail *TopicDetail, validateOnly bool) error + DeleteACL func(filter ACLFilter, validateOnly bool) ([]MatchingACL, error) + DeleteRecords func(topic string, partitionOffsets map[int32]int64) error + DeleteTopic func(topic string) error + DescribeCluster func() (brokers []*Broker, controllerID int32, err error) + DescribeConfig func(resource ConfigResource) ([]ConfigEntry, error) + DescribeConsumerGroups func(groups []string) ([]*GroupDescription, error) + DescribeTopics func(topics []string) (metadata []*TopicMetadata, err error) + ListAcls func(filter ACLFilter) ([]ResourceAcls, error) + ListConsumerGroupOffsets func(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) + ListConsumerGroups func() (map[string]string, error) + ListTopics func() (map[string]TopicDetail, error) + func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) + type CompressionCodec int8 + const CompressionGZIP + const CompressionLZ4 + const CompressionLevelDefault + const CompressionNone + const CompressionSnappy + const CompressionZSTD + func (cc CompressionCodec) String() string + type Config struct + Admin struct{ ... } + ChannelBufferSize int + ClientID string + Consumer struct{ ... } + Metadata struct{ ... } + MetricRegistry metrics.Registry + Net struct{ ... } + Producer struct{ ... } + Version KafkaVersion + func NewConfig() *Config + func (c *Config) Validate() error + type ConfigEntry struct + Default bool + Name string + ReadOnly bool + Sensitive bool + Source ConfigSource + Synonyms []*ConfigSynonym + Value string + type ConfigResource struct + ConfigNames []string + Name string + Type ConfigResourceType + type ConfigResourceType int8 + const AnyResource + const BrokerResource + const ClusterResource + const GroupResource + const TopicResource + const UnknownResource + type ConfigSource int8 + const SourceDefault + const SourceDynamicBroker + const SourceDynamicDefaultBroker + const SourceStaticBroker + const SourceTopic + const SourceUnknown + func (s ConfigSource) String() string + type ConfigSynonym struct + ConfigName string + ConfigValue string + Source ConfigSource + type ConfigurationError string + func (err ConfigurationError) Error() string + type Consumer interface + Close func() error + ConsumePartition func(topic string, partition int32, offset int64) (PartitionConsumer, error) + HighWaterMarks func() map[string]map[int32]int64 + Partitions func(topic string) ([]int32, error) + Topics func() ([]string, error) + func NewConsumer(addrs []string, config *Config) (Consumer, error) + func NewConsumerFromClient(client Client) (Consumer, error) + type ConsumerError struct + Err error + Partition int32 + Topic string + func (ce ConsumerError) Error() string + type ConsumerErrors []*ConsumerError + func (ce ConsumerErrors) Error() string + type ConsumerGroup interface + Close func() error + Consume func(ctx context.Context, topics []string, handler ConsumerGroupHandler) error + Errors func() <-chan error + func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) + func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) + type ConsumerGroupClaim interface + HighWaterMarkOffset func() int64 + InitialOffset func() int64 + Messages func() <-chan *ConsumerMessage + Partition func() int32 + Topic func() string + type ConsumerGroupHandler interface + Cleanup func(ConsumerGroupSession) error + ConsumeClaim func(ConsumerGroupSession, ConsumerGroupClaim) error + Setup func(ConsumerGroupSession) error + type ConsumerGroupMemberAssignment struct + Topics map[string][]int32 + UserData []byte + Version int16 + type ConsumerGroupMemberMetadata struct + Topics []string + UserData []byte + Version int16 + type ConsumerGroupSession interface + Claims func() map[string][]int32 + Context func() context.Context + GenerationID func() int32 + MarkMessage func(msg *ConsumerMessage, metadata string) + MarkOffset func(topic string, partition int32, offset int64, metadata string) + MemberID func() string + ResetOffset func(topic string, partition int32, offset int64, metadata string) + type ConsumerMessage struct + BlockTimestamp time.Time + Headers []*RecordHeader + Key []byte + Offset int64 + Partition int32 + Timestamp time.Time + Topic string + Value []byte + type ConsumerMetadataRequest struct + ConsumerGroup string + type ConsumerMetadataResponse struct + Coordinator *Broker + CoordinatorHost string + CoordinatorID int32 + CoordinatorPort int32 + Err KError + type ControlRecord struct + CoordinatorEpoch int32 + Type ControlRecordType + Version int16 + type ControlRecordType int + const ControlRecordAbort + const ControlRecordCommit + const ControlRecordUnknown + type CoordinatorType int8 + const CoordinatorGroup + const CoordinatorTransaction + type CreateAclsRequest struct + ACLCreations []*ACLCreation + Version int16 + type CreateAclsResponse struct + ACLCreationResponses []*ACLCreationResponse + ThrottleTime time.Duration + type CreatePartitionsRequest struct + Timeout time.Duration + TopicPartitions map[string]*TopicPartition + ValidateOnly bool + type CreatePartitionsResponse struct + ThrottleTime time.Duration + TopicPartitionErrors map[string]*TopicPartitionError + type CreateTopicsRequest struct + Timeout time.Duration + TopicDetails map[string]*TopicDetail + ValidateOnly bool + Version int16 + type CreateTopicsResponse struct + ThrottleTime time.Duration + TopicErrors map[string]*TopicError + Version int16 + type DeleteAclsRequest struct + Filters []*ACLFilter + Version int + type DeleteAclsResponse struct + FilterResponses []*FilterResponse + ThrottleTime time.Duration + Version int16 + type DeleteGroupsRequest struct + Groups []string + func (r *DeleteGroupsRequest) AddGroup(group string) + type DeleteGroupsResponse struct + GroupErrorCodes map[string]KError + ThrottleTime time.Duration + type DeleteRecordsRequest struct + Timeout time.Duration + Topics map[string]*DeleteRecordsRequestTopic + type DeleteRecordsRequestTopic struct + PartitionOffsets map[int32]int64 + type DeleteRecordsResponse struct + ThrottleTime time.Duration + Topics map[string]*DeleteRecordsResponseTopic + Version int16 + type DeleteRecordsResponsePartition struct + Err KError + LowWatermark int64 + type DeleteRecordsResponseTopic struct + Partitions map[int32]*DeleteRecordsResponsePartition + type DeleteTopicsRequest struct + Timeout time.Duration + Topics []string + Version int16 + type DeleteTopicsResponse struct + ThrottleTime time.Duration + TopicErrorCodes map[string]KError + Version int16 + type DescribeAclsRequest struct + Version int + type DescribeAclsResponse struct + Err KError + ErrMsg *string + ResourceAcls []*ResourceAcls + ThrottleTime time.Duration + Version int16 + type DescribeConfigsRequest struct + IncludeSynonyms bool + Resources []*ConfigResource + Version int16 + type DescribeConfigsResponse struct + Resources []*ResourceResponse + ThrottleTime time.Duration + Version int16 + type DescribeGroupsRequest struct + Groups []string + func (r *DescribeGroupsRequest) AddGroup(group string) + type DescribeGroupsResponse struct + Groups []*GroupDescription + type DynamicConsistencyPartitioner interface + MessageRequiresConsistency func(message *ProducerMessage) bool + type Encoder interface + Encode func() ([]byte, error) + Length func() int + type EndTxnRequest struct + ProducerEpoch int16 + ProducerID int64 + TransactionResult bool + TransactionalID string + type EndTxnResponse struct + Err KError + ThrottleTime time.Duration + type FetchRequest struct + Isolation IsolationLevel + MaxBytes int32 + MaxWaitTime int32 + MinBytes int32 + Version int16 + func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) + type FetchResponse struct + Blocks map[string]map[int32]*FetchResponseBlock + LogAppendTime bool + ThrottleTime time.Duration + Timestamp time.Time + Version int16 + func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, ...) + func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, ...) + func (r *FetchResponse) AddError(topic string, partition int32, err KError) + func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) + func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, ...) + func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) + func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, ...) + func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, ...) + func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, ...) + func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock + func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) + func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) + type FetchResponseBlock struct + AbortedTransactions []*AbortedTransaction + Err KError + HighWaterMarkOffset int64 + LastStableOffset int64 + Partial bool + Records *Records + RecordsSet []*Records + type FilterResponse struct + Err KError + ErrMsg *string + MatchingAcls []*MatchingACL + type FindCoordinatorRequest struct + CoordinatorKey string + CoordinatorType CoordinatorType + Version int16 + type FindCoordinatorResponse struct + Coordinator *Broker + Err KError + ErrMsg *string + ThrottleTime time.Duration + Version int16 + type GroupDescription struct + Err KError + GroupID string + Members map[string]*GroupMemberDescription + Protocol string + ProtocolType string + State string + type GroupMemberDescription struct + ClientHost string + ClientID string + MemberAssignment []byte + MemberMetadata []byte + func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) + func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error) + type GroupProtocol struct + Metadata []byte + Name string + type HashPartitionerOption func(*hashPartitioner) + func WithAbsFirst() HashPartitionerOption + func WithCustomFallbackPartitioner(randomHP *hashPartitioner) HashPartitionerOption + func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption + type HeartbeatRequest struct + GenerationID int32 + GroupID string + MemberID string + type HeartbeatResponse struct + Err KError + type InitProducerIDRequest struct + TransactionTimeout time.Duration + TransactionalID *string + type InitProducerIDResponse struct + Err KError + ProducerEpoch int16 + ProducerID int64 + ThrottleTime time.Duration + type IsolationLevel int8 + const ReadCommitted + const ReadUncommitted + type JoinGroupRequest struct + GroupID string + GroupProtocols map[string][]byte + MemberID string + OrderedGroupProtocols []*GroupProtocol + ProtocolType string + RebalanceTimeout int32 + SessionTimeout int32 + Version int16 + func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) + func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error + type JoinGroupResponse struct + Err KError + GenerationID int32 + GroupProtocol string + LeaderID string + MemberID string + Members map[string][]byte + ThrottleTime int32 + Version int16 + func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) + type KError int16 + const ErrBrokerNotAvailable + const ErrClusterAuthorizationFailed + const ErrConcurrentTransactions + const ErrConsumerCoordinatorNotAvailable + const ErrDelegationTokenAuthDisabled + const ErrDelegationTokenAuthorizationFailed + const ErrDelegationTokenExpired + const ErrDelegationTokenNotFound + const ErrDelegationTokenOwnerMismatch + const ErrDelegationTokenRequestNotAllowed + const ErrDuplicateSequenceNumber + const ErrFencedLeaderEpoch + const ErrFetchSessionIDNotFound + const ErrGroupAuthorizationFailed + const ErrGroupIDNotFound + const ErrIllegalGeneration + const ErrIllegalSASLState + const ErrInconsistentGroupProtocol + const ErrInvalidCommitOffsetSize + const ErrInvalidConfig + const ErrInvalidFetchSessionEpoch + const ErrInvalidGroupID + const ErrInvalidMessage + const ErrInvalidMessageSize + const ErrInvalidPartitions + const ErrInvalidPrincipalType + const ErrInvalidProducerEpoch + const ErrInvalidProducerIDMapping + const ErrInvalidReplicaAssignment + const ErrInvalidReplicationFactor + const ErrInvalidRequest + const ErrInvalidRequiredAcks + const ErrInvalidSessionTimeout + const ErrInvalidTimestamp + const ErrInvalidTopic + const ErrInvalidTransactionTimeout + const ErrInvalidTxnState + const ErrKafkaStorageError + const ErrLeaderNotAvailable + const ErrListenerNotFound + const ErrLogDirNotFound + const ErrMessageSetSizeTooLarge + const ErrMessageSizeTooLarge + const ErrNetworkException + const ErrNoError + const ErrNonEmptyGroup + const ErrNotController + const ErrNotCoordinatorForConsumer + const ErrNotEnoughReplicas + const ErrNotEnoughReplicasAfterAppend + const ErrNotLeaderForPartition + const ErrOffsetMetadataTooLarge + const ErrOffsetOutOfRange + const ErrOffsetsLoadInProgress + const ErrOperationNotAttempted + const ErrOutOfOrderSequenceNumber + const ErrPolicyViolation + const ErrReassignmentInProgress + const ErrRebalanceInProgress + const ErrReplicaNotAvailable + const ErrRequestTimedOut + const ErrSASLAuthenticationFailed + const ErrSecurityDisabled + const ErrStaleControllerEpochCode + const ErrTopicAlreadyExists + const ErrTopicAuthorizationFailed + const ErrTopicDeletionDisabled + const ErrTransactionCoordinatorFenced + const ErrTransactionalIDAuthorizationFailed + const ErrUnknown + const ErrUnknownLeaderEpoch + const ErrUnknownMemberID + const ErrUnknownProducerID + const ErrUnknownTopicOrPartition + const ErrUnsupportedCompressionType + const ErrUnsupportedForMessageFormat + const ErrUnsupportedSASLMechanism + const ErrUnsupportedVersion + func (err KError) Error() string + type KafkaVersion struct + func ParseKafkaVersion(s string) (KafkaVersion, error) + func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool + func (v KafkaVersion) String() string + type LeaveGroupRequest struct + GroupID string + MemberID string + type LeaveGroupResponse struct + Err KError + type ListGroupsRequest struct + type ListGroupsResponse struct + Err KError + Groups map[string]string + type MatchingACL struct + Err KError + ErrMsg *string + type Message struct + Codec CompressionCodec + CompressionLevel int + Key []byte + LogAppendTime bool + Set *MessageSet + Timestamp time.Time + Value []byte + Version int8 + type MessageBlock struct + Msg *Message + Offset int64 + func (msb *MessageBlock) Messages() []*MessageBlock + type MessageSet struct + Messages []*MessageBlock + OverflowMessage bool + PartialTrailingMessage bool + type MetadataRequest struct + AllowAutoTopicCreation bool + Topics []string + Version int16 + type MetadataResponse struct + Brokers []*Broker + ClusterID *string + ControllerID int32 + ThrottleTimeMs int32 + Topics []*TopicMetadata + Version int16 + func (r *MetadataResponse) AddBroker(addr string, id int32) + func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata + func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, ...) + type OffsetCommitRequest struct + ConsumerGroup string + ConsumerGroupGeneration int32 + ConsumerID string + RetentionTime int64 + Version int16 + func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, ...) + func (r *OffsetCommitRequest) Offset(topic string, partitionID int32) (int64, string, error) + type OffsetCommitResponse struct + Errors map[string]map[int32]KError + ThrottleTimeMs int32 + Version int16 + func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError) + type OffsetFetchRequest struct + ConsumerGroup string + Version int16 + func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) + func (r *OffsetFetchRequest) ZeroPartitions() + type OffsetFetchResponse struct + Blocks map[string]map[int32]*OffsetFetchResponseBlock + Err KError + ThrottleTimeMs int32 + Version int16 + func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock) + func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock + type OffsetFetchResponseBlock struct + Err KError + LeaderEpoch int32 + Metadata string + Offset int64 + type OffsetManager interface + Close func() error + ManagePartition func(topic string, partition int32) (PartitionOffsetManager, error) + func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) + type OffsetRequest struct + Version int16 + func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) + func (r *OffsetRequest) ReplicaID() int32 + func (r *OffsetRequest) SetReplicaID(id int32) + type OffsetResponse struct + Blocks map[string]map[int32]*OffsetResponseBlock + Version int16 + func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) + func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock + type OffsetResponseBlock struct + Err KError + Offset int64 + Offsets []int64 + Timestamp int64 + type PacketDecodingError struct + Info string + func (err PacketDecodingError) Error() string + type PacketEncodingError struct + Info string + func (err PacketEncodingError) Error() string + type PartitionConsumer interface + AsyncClose func() + Close func() error + Errors func() <-chan *ConsumerError + HighWaterMarkOffset func() int64 + Messages func() <-chan *ConsumerMessage + type PartitionError struct + Err KError + Partition int32 + type PartitionMetadata struct + Err KError + ID int32 + Isr []int32 + Leader int32 + OfflineReplicas []int32 + Replicas []int32 + type PartitionOffsetManager interface + AsyncClose func() + Close func() error + Errors func() <-chan *ConsumerError + MarkOffset func(offset int64, metadata string) + NextOffset func() (int64, string) + ResetOffset func(offset int64, metadata string) + type PartitionOffsetMetadata struct + Metadata *string + Offset int64 + Partition int32 + type Partitioner interface + Partition func(message *ProducerMessage, numPartitions int32) (int32, error) + RequiresConsistency func() bool + func NewHashPartitioner(topic string) Partitioner + func NewManualPartitioner(topic string) Partitioner + func NewRandomPartitioner(topic string) Partitioner + func NewReferenceHashPartitioner(topic string) Partitioner + func NewRoundRobinPartitioner(topic string) Partitioner + type PartitionerConstructor func(topic string) Partitioner + func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor + func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor + type ProduceRequest struct + RequiredAcks RequiredAcks + Timeout int32 + TransactionalID *string + Version int16 + func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch) + func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) + func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) + type ProduceResponse struct + Blocks map[string]map[int32]*ProduceResponseBlock + ThrottleTime time.Duration + Version int16 + func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError) + func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock + type ProduceResponseBlock struct + Err KError + Offset int64 + Timestamp time.Time + type ProducerError struct + Err error + Msg *ProducerMessage + func (pe ProducerError) Error() string + type ProducerErrors []*ProducerError + func (pe ProducerErrors) Error() string + type ProducerMessage struct + Headers []RecordHeader + Key Encoder + Metadata interface{} + Offset int64 + Partition int32 + Timestamp time.Time + Topic string + Value Encoder + type Record struct + Attributes int8 + Headers []*RecordHeader + Key []byte + OffsetDelta int64 + TimestampDelta time.Duration + Value []byte + type RecordBatch struct + Codec CompressionCodec + CompressionLevel int + Control bool + FirstOffset int64 + FirstSequence int32 + FirstTimestamp time.Time + IsTransactional bool + LastOffsetDelta int32 + LogAppendTime bool + MaxTimestamp time.Time + PartialTrailingRecord bool + PartitionLeaderEpoch int32 + ProducerEpoch int16 + ProducerID int64 + Records []*Record + Version int8 + func (b *RecordBatch) LastOffset() int64 + type RecordHeader struct + Key []byte + Value []byte + type Records struct + MsgSet *MessageSet + RecordBatch *RecordBatch + type RequiredAcks int16 + const NoResponse + const WaitForAll + const WaitForLocal + type Resource struct + ResoucePatternType ACLResourcePatternType + ResourceName string + ResourceType ACLResourceType + type ResourceAcls struct + Acls []*ACL + type ResourceResponse struct + Configs []*ConfigEntry + ErrorCode int16 + ErrorMsg string + Name string + Type ConfigResourceType + type SASLMechanism string + type SCRAMClient interface + Begin func(userName, password, authzID string) error + Done func() bool + Step func(challenge string) (response string, err error) + type SaslAuthenticateRequest struct + SaslAuthBytes []byte + type SaslAuthenticateResponse struct + Err KError + ErrorMessage *string + SaslAuthBytes []byte + type SaslHandshakeRequest struct + Mechanism string + Version int16 + type SaslHandshakeResponse struct + EnabledMechanisms []string + Err KError + type StdLogger interface + Print func(v ...interface{}) + Printf func(format string, v ...interface{}) + Println func(v ...interface{}) + type StringEncoder string + func (s StringEncoder) Encode() ([]byte, error) + func (s StringEncoder) Length() int + type SyncGroupRequest struct + GenerationID int32 + GroupAssignments map[string][]byte + GroupID string + MemberID string + func (r *SyncGroupRequest) AddGroupAssignment(memberID string, memberAssignment []byte) + func (r *SyncGroupRequest) AddGroupAssignmentMember(memberID string, memberAssignment *ConsumerGroupMemberAssignment) error + type SyncGroupResponse struct + Err KError + MemberAssignment []byte + func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) + type SyncProducer interface + Close func() error + SendMessage func(msg *ProducerMessage) (partition int32, offset int64, err error) + SendMessages func(msgs []*ProducerMessage) error + func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) + func NewSyncProducerFromClient(client Client) (SyncProducer, error) + type Timestamp struct + type TopicDetail struct + ConfigEntries map[string]*string + NumPartitions int32 + ReplicaAssignment map[int32][]int32 + ReplicationFactor int16 + type TopicError struct + Err KError + ErrMsg *string + func (t *TopicError) Error() string + type TopicMetadata struct + Err KError + IsInternal bool + Name string + Partitions []*PartitionMetadata + type TopicPartition struct + Assignment [][]int32 + Count int32 + type TopicPartitionError struct + Err KError + ErrMsg *string + func (t *TopicPartitionError) Error() string + type TxnOffsetCommitRequest struct + GroupID string + ProducerEpoch int16 + ProducerID int64 + Topics map[string][]*PartitionOffsetMetadata + TransactionalID string + type TxnOffsetCommitResponse struct + ThrottleTime time.Duration + Topics map[string][]*PartitionError