Versions in this module Expand all Collapse all v0 v0.3.2 Nov 9, 2018 Changes in this version + const CompressionLevelDefault + const GroupGenerationUndefined + const OffsetNewest + const OffsetOldest + const ReceiveTime + var BalanceStrategyRange = &balanceStrategy + var BalanceStrategyRoundRobin = &balanceStrategy + var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated") + var ErrClosedClient = errors.New("kafka: tried to use a client that was closed") + var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed") + var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch") + var ErrControllerNotAvailable = errors.New("kafka: controller is not available") + var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks") + var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected") + var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index") + var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max") + var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata") + var ErrNotConnected = errors.New("kafka: broker not connected") + var ErrOutOfBrokers = errors.New(...) + var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down") + var MaxRequestSize int32 = 100 * 1024 * 1024 + var MaxResponseSize int32 = 100 * 1024 * 1024 + var MaxVersion = V2_0_0_0 + var MinVersion = V0_8_2_0 + var NoNode = &Broker + var PanicHandler func(interface{}) + var SupportedVersions = []KafkaVersion + var V0_10_0_0 = newKafkaVersion(0, 10, 0, 0) + var V0_10_0_1 = newKafkaVersion(0, 10, 0, 1) + var V0_10_1_0 = newKafkaVersion(0, 10, 1, 0) + var V0_10_1_1 = newKafkaVersion(0, 10, 1, 1) + var V0_10_2_0 = newKafkaVersion(0, 10, 2, 0) + var V0_10_2_1 = newKafkaVersion(0, 10, 2, 1) + var V0_11_0_0 = newKafkaVersion(0, 11, 0, 0) + var V0_11_0_1 = newKafkaVersion(0, 11, 0, 1) + var V0_11_0_2 = newKafkaVersion(0, 11, 0, 2) + var V0_8_2_0 = newKafkaVersion(0, 8, 2, 0) + var V0_8_2_1 = newKafkaVersion(0, 8, 2, 1) + var V0_8_2_2 = newKafkaVersion(0, 8, 2, 2) + var V0_9_0_0 = newKafkaVersion(0, 9, 0, 0) + var V0_9_0_1 = newKafkaVersion(0, 9, 0, 1) + var V1_0_0_0 = newKafkaVersion(1, 0, 0, 0) + var V1_1_0_0 = newKafkaVersion(1, 1, 0, 0) + var V2_0_0_0 = newKafkaVersion(2, 0, 0, 0) + type AbortedTransaction struct + FirstOffset int64 + ProducerID int64 + type Acl struct + Host string + Operation AclOperation + PermissionType AclPermissionType + Principal string + type AclCreation struct + type AclCreationResponse struct + Err KError + ErrMsg *string + type AclFilter struct + Host *string + Operation AclOperation + PermissionType AclPermissionType + Principal *string + ResourceName *string + ResourceType AclResourceType + type AclOperation int + const AclOperationAll + const AclOperationAlter + const AclOperationAlterConfigs + const AclOperationAny + const AclOperationClusterAction + const AclOperationCreate + const AclOperationDelete + const AclOperationDescribe + const AclOperationDescribeConfigs + const AclOperationIdempotentWrite + const AclOperationRead + const AclOperationUnknown + const AclOperationWrite + type AclPermissionType int + const AclPermissionAllow + const AclPermissionAny + const AclPermissionDeny + const AclPermissionUnknown + type AclResourceType int + const AclResourceAny + const AclResourceCluster + const AclResourceGroup + const AclResourceTopic + const AclResourceTransactionalID + const AclResourceUnknown + type AddOffsetsToTxnRequest struct + GroupID string + ProducerEpoch int16 + ProducerID int64 + TransactionalID string + type AddOffsetsToTxnResponse struct + Err KError + ThrottleTime time.Duration + type AddPartitionsToTxnRequest struct + ProducerEpoch int16 + ProducerID int64 + TopicPartitions map[string][]int32 + TransactionalID string + type AddPartitionsToTxnResponse struct + Errors map[string][]*PartitionError + ThrottleTime time.Duration + type AlterConfigsRequest struct + Resources []*AlterConfigsResource + ValidateOnly bool + type AlterConfigsResource struct + ConfigEntries map[string]*string + Name string + Type ConfigResourceType + type AlterConfigsResourceResponse struct + ErrorCode int16 + ErrorMsg string + Name string + Type ConfigResourceType + type AlterConfigsResponse struct + Resources []*AlterConfigsResourceResponse + ThrottleTime time.Duration + type ApiVersionsRequest struct + type ApiVersionsResponse struct + ApiVersions []*ApiVersionsResponseBlock + Err KError + type ApiVersionsResponseBlock struct + ApiKey int16 + MaxVersion int16 + MinVersion int16 + type AsyncProducer interface + AsyncClose func() + Close func() error + Errors func() <-chan *ProducerError + Input func() chan<- *ProducerMessage + Successes func() <-chan *ProducerMessage + func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) + func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) + type BalanceStrategy interface + Name func() string + Plan func(members map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) + type BalanceStrategyPlan map[string]map[string][]int32 + func (p BalanceStrategyPlan) Add(memberID, topic string, partitions ...int32) + type Broker struct + func NewBroker(addr string) *Broker + func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) + func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) + func (b *Broker) Addr() string + func (b *Broker) AlterConfigs(request *AlterConfigsRequest) (*AlterConfigsResponse, error) + func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse, error) + func (b *Broker) Close() error + func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) + func (b *Broker) Connected() (bool, error) + func (b *Broker) CreateAcls(request *CreateAclsRequest) (*CreateAclsResponse, error) + func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) + func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) + func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, error) + func (b *Broker) DeleteGroups(request *DeleteGroupsRequest) (*DeleteGroupsResponse, error) + func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) + func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) + func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) + func (b *Broker) DescribeConfigs(request *DescribeConfigsRequest) (*DescribeConfigsResponse, error) + func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) + func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) + func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) + func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) + func (b *Broker) FindCoordinator(request *FindCoordinatorRequest) (*FindCoordinatorResponse, error) + func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) + func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) + func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) + func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) + func (b *Broker) ID() int32 + func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) + func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) + func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) + func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) + func (b *Broker) Open(conf *Config) error + func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) + func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) + func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) + type ByteEncoder []byte + func (b ByteEncoder) Encode() ([]byte, error) + func (b ByteEncoder) Length() int + type Client interface + Brokers func() []*Broker + Close func() error + Closed func() bool + Config func() *Config + Controller func() (*Broker, error) + Coordinator func(consumerGroup string) (*Broker, error) + GetOffset func(topic string, partitionID int32, time int64) (int64, error) + InSyncReplicas func(topic string, partitionID int32) ([]int32, error) + Leader func(topic string, partitionID int32) (*Broker, error) + Partitions func(topic string) ([]int32, error) + RefreshCoordinator func(consumerGroup string) error + RefreshMetadata func(topics ...string) error + Replicas func(topic string, partitionID int32) ([]int32, error) + Topics func() ([]string, error) + WritablePartitions func(topic string) ([]int32, error) + func NewClient(addrs []string, conf *Config) (Client, error) + type ClusterAdmin interface + AlterConfig func(resourceType ConfigResourceType, name string, entries map[string]*string, ...) error + Close func() error + CreateACL func(resource Resource, acl Acl) error + CreatePartitions func(topic string, count int32, assignment [][]int32, validateOnly bool) error + CreateTopic func(topic string, detail *TopicDetail, validateOnly bool) error + DeleteACL func(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) + DeleteRecords func(topic string, partitionOffsets map[int32]int64) error + DeleteTopic func(topic string) error + DescribeConfig func(resource ConfigResource) ([]ConfigEntry, error) + ListAcls func(filter AclFilter) ([]ResourceAcls, error) + func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) + type CompressionCodec int8 + const CompressionGZIP + const CompressionLZ4 + const CompressionNone + const CompressionSnappy + func (cc CompressionCodec) String() string + type Config struct + Admin struct{ ... } + ChannelBufferSize int + ClientID string + Consumer struct{ ... } + Metadata struct{ ... } + MetricRegistry metrics.Registry + Net struct{ ... } + Producer struct{ ... } + Version KafkaVersion + func NewConfig() *Config + func (c *Config) Validate() error + type ConfigEntry struct + Default bool + Name string + ReadOnly bool + Sensitive bool + Value string + type ConfigResource struct + ConfigNames []string + Name string + Type ConfigResourceType + type ConfigResourceType int8 + const AnyResource + const BrokerResource + const ClusterResource + const GroupResource + const TopicResource + const UnknownResource + type ConfigurationError string + func (err ConfigurationError) Error() string + type Consumer interface + Close func() error + ConsumePartition func(topic string, partition int32, offset int64) (PartitionConsumer, error) + HighWaterMarks func() map[string]map[int32]int64 + Partitions func(topic string) ([]int32, error) + Topics func() ([]string, error) + func NewConsumer(addrs []string, config *Config) (Consumer, error) + func NewConsumerFromClient(client Client) (Consumer, error) + type ConsumerError struct + Err error + Partition int32 + Topic string + func (ce ConsumerError) Error() string + type ConsumerErrors []*ConsumerError + func (ce ConsumerErrors) Error() string + type ConsumerGroup interface + Close func() error + Consume func(ctx context.Context, topics []string, handler ConsumerGroupHandler) error + Errors func() <-chan error + func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) + func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) + type ConsumerGroupClaim interface + HighWaterMarkOffset func() int64 + InitialOffset func() int64 + Messages func() <-chan *ConsumerMessage + Partition func() int32 + Topic func() string + type ConsumerGroupHandler interface + Cleanup func(ConsumerGroupSession) error + ConsumeClaim func(ConsumerGroupSession, ConsumerGroupClaim) error + Setup func(ConsumerGroupSession) error + type ConsumerGroupMemberAssignment struct + Topics map[string][]int32 + UserData []byte + Version int16 + type ConsumerGroupMemberMetadata struct + Topics []string + UserData []byte + Version int16 + type ConsumerGroupSession interface + Claims func() map[string][]int32 + Context func() context.Context + GenerationID func() int32 + MarkMessage func(msg *ConsumerMessage, metadata string) + MarkOffset func(topic string, partition int32, offset int64, metadata string) + MemberID func() string + ResetOffset func(topic string, partition int32, offset int64, metadata string) + type ConsumerMessage struct + BlockTimestamp time.Time + Headers []*RecordHeader + Key []byte + Offset int64 + Partition int32 + Timestamp time.Time + Topic string + Value []byte + type ConsumerMetadataRequest struct + ConsumerGroup string + type ConsumerMetadataResponse struct + Coordinator *Broker + CoordinatorHost string + CoordinatorID int32 + CoordinatorPort int32 + Err KError + type CoordinatorType int8 + const CoordinatorGroup + const CoordinatorTransaction + type CreateAclsRequest struct + AclCreations []*AclCreation + type CreateAclsResponse struct + AclCreationResponses []*AclCreationResponse + ThrottleTime time.Duration + type CreatePartitionsRequest struct + Timeout time.Duration + TopicPartitions map[string]*TopicPartition + ValidateOnly bool + type CreatePartitionsResponse struct + ThrottleTime time.Duration + TopicPartitionErrors map[string]*TopicPartitionError + type CreateTopicsRequest struct + Timeout time.Duration + TopicDetails map[string]*TopicDetail + ValidateOnly bool + Version int16 + type CreateTopicsResponse struct + ThrottleTime time.Duration + TopicErrors map[string]*TopicError + Version int16 + type DeleteAclsRequest struct + Filters []*AclFilter + type DeleteAclsResponse struct + FilterResponses []*FilterResponse + ThrottleTime time.Duration + type DeleteGroupsRequest struct + Groups []string + func (r *DeleteGroupsRequest) AddGroup(group string) + type DeleteGroupsResponse struct + GroupErrorCodes map[string]KError + ThrottleTime time.Duration + type DeleteRecordsRequest struct + Timeout time.Duration + Topics map[string]*DeleteRecordsRequestTopic + type DeleteRecordsRequestTopic struct + PartitionOffsets map[int32]int64 + type DeleteRecordsResponse struct + ThrottleTime time.Duration + Topics map[string]*DeleteRecordsResponseTopic + Version int16 + type DeleteRecordsResponsePartition struct + Err KError + LowWatermark int64 + type DeleteRecordsResponseTopic struct + Partitions map[int32]*DeleteRecordsResponsePartition + type DeleteTopicsRequest struct + Timeout time.Duration + Topics []string + Version int16 + type DeleteTopicsResponse struct + ThrottleTime time.Duration + TopicErrorCodes map[string]KError + Version int16 + type DescribeAclsRequest struct + type DescribeAclsResponse struct + Err KError + ErrMsg *string + ResourceAcls []*ResourceAcls + ThrottleTime time.Duration + type DescribeConfigsRequest struct + Resources []*ConfigResource + type DescribeConfigsResponse struct + Resources []*ResourceResponse + ThrottleTime time.Duration + type DescribeGroupsRequest struct + Groups []string + func (r *DescribeGroupsRequest) AddGroup(group string) + type DescribeGroupsResponse struct + Groups []*GroupDescription + type DynamicConsistencyPartitioner interface + MessageRequiresConsistency func(message *ProducerMessage) bool + type Encoder interface + Encode func() ([]byte, error) + Length func() int + type EndTxnRequest struct + ProducerEpoch int16 + ProducerID int64 + TransactionResult bool + TransactionalID string + type EndTxnResponse struct + Err KError + ThrottleTime time.Duration + type FetchRequest struct + Isolation IsolationLevel + MaxBytes int32 + MaxWaitTime int32 + MinBytes int32 + Version int16 + func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) + type FetchResponse struct + Blocks map[string]map[int32]*FetchResponseBlock + ThrottleTime time.Duration + Version int16 + func (r *FetchResponse) AddError(topic string, partition int32, err KError) + func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) + func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) + func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock + func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) + func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) + type FetchResponseBlock struct + AbortedTransactions []*AbortedTransaction + Err KError + HighWaterMarkOffset int64 + LastStableOffset int64 + Partial bool + Records *Records + RecordsSet []*Records + type FilterResponse struct + Err KError + ErrMsg *string + MatchingAcls []*MatchingAcl + type FindCoordinatorRequest struct + CoordinatorKey string + CoordinatorType CoordinatorType + Version int16 + type FindCoordinatorResponse struct + Coordinator *Broker + Err KError + ErrMsg *string + ThrottleTime time.Duration + Version int16 + type GroupDescription struct + Err KError + GroupId string + Members map[string]*GroupMemberDescription + Protocol string + ProtocolType string + State string + type GroupMemberDescription struct + ClientHost string + ClientId string + MemberAssignment []byte + MemberMetadata []byte + func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) + func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error) + type GroupProtocol struct + Metadata []byte + Name string + type HashPartitionerOption func(*hashPartitioner) + func WithAbsFirst() HashPartitionerOption + func WithCustomFallbackPartitioner(randomHP *hashPartitioner) HashPartitionerOption + func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption + type HeartbeatRequest struct + GenerationId int32 + GroupId string + MemberId string + type HeartbeatResponse struct + Err KError + type InitProducerIDRequest struct + TransactionTimeout time.Duration + TransactionalID *string + type InitProducerIDResponse struct + Err KError + ProducerEpoch int16 + ProducerID int64 + ThrottleTime time.Duration + type IsolationLevel int8 + const ReadCommitted + const ReadUncommitted + type JoinGroupRequest struct + GroupId string + GroupProtocols map[string][]byte + MemberId string + OrderedGroupProtocols []*GroupProtocol + ProtocolType string + RebalanceTimeout int32 + SessionTimeout int32 + Version int16 + func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) + func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error + type JoinGroupResponse struct + Err KError + GenerationId int32 + GroupProtocol string + LeaderId string + MemberId string + Members map[string][]byte + ThrottleTime int32 + Version int16 + func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) + type KError int16 + const ErrBrokerNotAvailable + const ErrClusterAuthorizationFailed + const ErrConcurrentTransactions + const ErrConsumerCoordinatorNotAvailable + const ErrDuplicateSequenceNumber + const ErrGroupAuthorizationFailed + const ErrIllegalGeneration + const ErrIllegalSASLState + const ErrInconsistentGroupProtocol + const ErrInvalidCommitOffsetSize + const ErrInvalidConfig + const ErrInvalidGroupId + const ErrInvalidMessage + const ErrInvalidMessageSize + const ErrInvalidPartitions + const ErrInvalidProducerEpoch + const ErrInvalidProducerIDMapping + const ErrInvalidReplicaAssignment + const ErrInvalidReplicationFactor + const ErrInvalidRequest + const ErrInvalidRequiredAcks + const ErrInvalidSessionTimeout + const ErrInvalidTimestamp + const ErrInvalidTopic + const ErrInvalidTransactionTimeout + const ErrInvalidTxnState + const ErrKafkaStorageError + const ErrLeaderNotAvailable + const ErrLogDirNotFound + const ErrMessageSetSizeTooLarge + const ErrMessageSizeTooLarge + const ErrNetworkException + const ErrNoError + const ErrNotController + const ErrNotCoordinatorForConsumer + const ErrNotEnoughReplicas + const ErrNotEnoughReplicasAfterAppend + const ErrNotLeaderForPartition + const ErrOffsetMetadataTooLarge + const ErrOffsetOutOfRange + const ErrOffsetsLoadInProgress + const ErrOperationNotAttempted + const ErrOutOfOrderSequenceNumber + const ErrPolicyViolation + const ErrReassignmentInProgress + const ErrRebalanceInProgress + const ErrReplicaNotAvailable + const ErrRequestTimedOut + const ErrSASLAuthenticationFailed + const ErrSecurityDisabled + const ErrStaleControllerEpochCode + const ErrTopicAlreadyExists + const ErrTopicAuthorizationFailed + const ErrTransactionCoordinatorFenced + const ErrTransactionalIDAuthorizationFailed + const ErrUnknown + const ErrUnknownMemberId + const ErrUnknownProducerID + const ErrUnknownTopicOrPartition + const ErrUnsupportedForMessageFormat + const ErrUnsupportedSASLMechanism + const ErrUnsupportedVersion + func (err KError) Error() string + type KafkaVersion struct + func ParseKafkaVersion(s string) (KafkaVersion, error) + func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool + func (v KafkaVersion) String() string + type LeaveGroupRequest struct + GroupId string + MemberId string + type LeaveGroupResponse struct + Err KError + type ListGroupsRequest struct + type ListGroupsResponse struct + Err KError + Groups map[string]string + type MatchingAcl struct + Err KError + ErrMsg *string + type Message struct + Codec CompressionCodec + CompressionLevel int + Key []byte + Set *MessageSet + Timestamp time.Time + Value []byte + Version int8 + type MessageBlock struct + Msg *Message + Offset int64 + func (msb *MessageBlock) Messages() []*MessageBlock + type MessageSet struct + Messages []*MessageBlock + OverflowMessage bool + PartialTrailingMessage bool + type MetadataRequest struct + AllowAutoTopicCreation bool + Topics []string + Version int16 + type MetadataResponse struct + Brokers []*Broker + ClusterID *string + ControllerID int32 + ThrottleTimeMs int32 + Topics []*TopicMetadata + Version int16 + func (r *MetadataResponse) AddBroker(addr string, id int32) + func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata + func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) + type MockAlterConfigsResponse struct + func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse + func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder + type MockBroker struct + func NewMockBroker(t TestReporter, brokerID int32) *MockBroker + func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker + func NewMockBrokerListener(t TestReporter, brokerID int32, listener net.Listener) *MockBroker + func (b *MockBroker) Addr() string + func (b *MockBroker) BrokerID() int32 + func (b *MockBroker) Close() + func (b *MockBroker) History() []RequestResponse + func (b *MockBroker) Port() int32 + func (b *MockBroker) Returns(e encoder) + func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) + func (b *MockBroker) SetLatency(latency time.Duration) + func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc) + type MockConsumerMetadataResponse struct + func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse + func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder + func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse + func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse + type MockCreateAclsResponse struct + func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse + func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoder + type MockCreatePartitionsResponse struct + func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse + func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder + type MockCreateTopicsResponse struct + func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse + func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder + type MockDeleteAclsResponse struct + func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse + func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder + type MockDeleteRecordsResponse struct + func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse + func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoder + type MockDeleteTopicsResponse struct + func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse + func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoder + type MockDescribeConfigsResponse struct + func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse + func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder + type MockFetchResponse struct + func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse + func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder + func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse + func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse + func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse + type MockFindCoordinatorResponse struct + func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse + func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder + func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse + func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse + type MockListAclsResponse struct + func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse + func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder + type MockMetadataResponse struct + func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse + func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder + func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse + func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse + func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse + type MockOffsetCommitResponse struct + func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse + func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder + func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse + type MockOffsetFetchResponse struct + func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse + func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder + func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, ...) *MockOffsetFetchResponse + type MockOffsetResponse struct + func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse + func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder + func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse + func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse + type MockProduceResponse struct + func NewMockProduceResponse(t TestReporter) *MockProduceResponse + func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder + func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse + func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse + type MockResponse interface + For func(reqBody versionedDecoder) (res encoder) + type MockSequence struct + func NewMockSequence(responses ...interface{}) *MockSequence + func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) + type MockWrapper struct + func NewMockWrapper(res encoder) *MockWrapper + func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) + type OffsetCommitRequest struct + ConsumerGroup string + ConsumerGroupGeneration int32 + ConsumerID string + RetentionTime int64 + Version int16 + func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, ...) + func (r *OffsetCommitRequest) Offset(topic string, partitionID int32) (int64, string, error) + type OffsetCommitResponse struct + Errors map[string]map[int32]KError + func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError) + type OffsetFetchRequest struct + ConsumerGroup string + Version int16 + func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) + type OffsetFetchResponse struct + Blocks map[string]map[int32]*OffsetFetchResponseBlock + func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock) + func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock + type OffsetFetchResponseBlock struct + Err KError + Metadata string + Offset int64 + type OffsetManager interface + Close func() error + ManagePartition func(topic string, partition int32) (PartitionOffsetManager, error) + func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) + type OffsetRequest struct + Version int16 + func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) + type OffsetResponse struct + Blocks map[string]map[int32]*OffsetResponseBlock + Version int16 + func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) + func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock + type OffsetResponseBlock struct + Err KError + Offset int64 + Offsets []int64 + Timestamp int64 + type PacketDecodingError struct + Info string + func (err PacketDecodingError) Error() string + type PacketEncodingError struct + Info string + func (err PacketEncodingError) Error() string + type PartitionConsumer interface + AsyncClose func() + Close func() error + Errors func() <-chan *ConsumerError + HighWaterMarkOffset func() int64 + Messages func() <-chan *ConsumerMessage + type PartitionError struct + Err KError + Partition int32 + type PartitionMetadata struct + Err KError + ID int32 + Isr []int32 + Leader int32 + OfflineReplicas []int32 + Replicas []int32 + type PartitionOffsetManager interface + AsyncClose func() + Close func() error + Errors func() <-chan *ConsumerError + MarkOffset func(offset int64, metadata string) + NextOffset func() (int64, string) + ResetOffset func(offset int64, metadata string) + type PartitionOffsetMetadata struct + Metadata *string + Offset int64 + Partition int32 + type Partitioner interface + Partition func(message *ProducerMessage, numPartitions int32) (int32, error) + RequiresConsistency func() bool + func NewHashPartitioner(topic string) Partitioner + func NewManualPartitioner(topic string) Partitioner + func NewRandomPartitioner(topic string) Partitioner + func NewReferenceHashPartitioner(topic string) Partitioner + func NewRoundRobinPartitioner(topic string) Partitioner + type PartitionerConstructor func(topic string) Partitioner + func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor + func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor + type ProduceRequest struct + RequiredAcks RequiredAcks + Timeout int32 + TransactionalID *string + Version int16 + func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch) + func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) + func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) + type ProduceResponse struct + Blocks map[string]map[int32]*ProduceResponseBlock + ThrottleTime time.Duration + Version int16 + func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError) + func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock + type ProduceResponseBlock struct + Err KError + Offset int64 + Timestamp time.Time + type ProducerError struct + Err error + Msg *ProducerMessage + func (pe ProducerError) Error() string + type ProducerErrors []*ProducerError + func (pe ProducerErrors) Error() string + type ProducerMessage struct + Headers []RecordHeader + Key Encoder + Metadata interface{} + Offset int64 + Partition int32 + Timestamp time.Time + Topic string + Value Encoder + type Record struct + Attributes int8 + Headers []*RecordHeader + Key []byte + OffsetDelta int64 + TimestampDelta time.Duration + Value []byte + type RecordBatch struct + Codec CompressionCodec + CompressionLevel int + Control bool + FirstOffset int64 + FirstSequence int32 + FirstTimestamp time.Time + LastOffsetDelta int32 + MaxTimestamp time.Time + PartialTrailingRecord bool + PartitionLeaderEpoch int32 + ProducerEpoch int16 + ProducerID int64 + Records []*Record + Version int8 + type RecordHeader struct + Key []byte + Value []byte + type Records struct + MsgSet *MessageSet + RecordBatch *RecordBatch + type RequestNotifierFunc func(bytesRead, bytesWritten int) + type RequestResponse struct + Request protocolBody + Response encoder + type RequiredAcks int16 + const NoResponse + const WaitForAll + const WaitForLocal + type Resource struct + ResourceName string + ResourceType AclResourceType + type ResourceAcls struct + Acls []*Acl + type ResourceResponse struct + Configs []*ConfigEntry + ErrorCode int16 + ErrorMsg string + Name string + Type ConfigResourceType + type SaslHandshakeRequest struct + Mechanism string + type SaslHandshakeResponse struct + EnabledMechanisms []string + Err KError + type StdLogger interface + Print func(v ...interface{}) + Printf func(format string, v ...interface{}) + Println func(v ...interface{}) + var Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags) + type StringEncoder string + func (s StringEncoder) Encode() ([]byte, error) + func (s StringEncoder) Length() int + type SyncGroupRequest struct + GenerationId int32 + GroupAssignments map[string][]byte + GroupId string + MemberId string + func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte) + func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error + type SyncGroupResponse struct + Err KError + MemberAssignment []byte + func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) + type SyncProducer interface + Close func() error + SendMessage func(msg *ProducerMessage) (partition int32, offset int64, err error) + SendMessages func(msgs []*ProducerMessage) error + func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) + func NewSyncProducerFromClient(client Client) (SyncProducer, error) + type TestReporter interface + Error func(...interface{}) + Errorf func(string, ...interface{}) + Fatal func(...interface{}) + Fatalf func(string, ...interface{}) + type Timestamp struct + type TopicDetail struct + ConfigEntries map[string]*string + NumPartitions int32 + ReplicaAssignment map[int32][]int32 + ReplicationFactor int16 + type TopicError struct + Err KError + ErrMsg *string + type TopicMetadata struct + Err KError + IsInternal bool + Name string + Partitions []*PartitionMetadata + type TopicPartition struct + Assignment [][]int32 + Count int32 + type TopicPartitionError struct + Err KError + ErrMsg *string + type TxnOffsetCommitRequest struct + GroupID string + ProducerEpoch int16 + ProducerID int64 + Topics map[string][]*PartitionOffsetMetadata + TransactionalID string + type TxnOffsetCommitResponse struct + ThrottleTime time.Duration + Topics map[string][]*PartitionError