Versions in this module Expand all Collapse all v1 v1.11.0 Dec 20, 2016 Changes in this version + const GroupGenerationUndefined + const OffsetNewest + const OffsetOldest + const ReceiveTime + var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated") + var ErrClosedClient = errors.New("kafka: tried to use a client that was closed") + var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks") + var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected") + var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index") + var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max") + var ErrNotConnected = errors.New("kafka: broker not connected") + var ErrOutOfBrokers = errors.New(...) + var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down") + var MaxRequestSize int32 = 100 * 1024 * 1024 + var MaxResponseSize int32 = 100 * 1024 * 1024 + var PanicHandler func(interface{}) + var V0_10_0_0 = newKafkaVersion(0, 10, 0, 0) + var V0_10_0_1 = newKafkaVersion(0, 10, 0, 1) + var V0_10_1_0 = newKafkaVersion(0, 10, 1, 0) + var V0_8_2_0 = newKafkaVersion(0, 8, 2, 0) + var V0_8_2_1 = newKafkaVersion(0, 8, 2, 1) + var V0_8_2_2 = newKafkaVersion(0, 8, 2, 2) + var V0_9_0_0 = newKafkaVersion(0, 9, 0, 0) + var V0_9_0_1 = newKafkaVersion(0, 9, 0, 1) + type ApiVersionsRequest struct + type ApiVersionsResponse struct + ApiVersions []*ApiVersionsResponseBlock + Err KError + type ApiVersionsResponseBlock struct + ApiKey int16 + MaxVersion int16 + MinVersion int16 + type AsyncProducer interface + AsyncClose func() + Close func() error + Errors func() <-chan *ProducerError + Input func() chan<- *ProducerMessage + Successes func() <-chan *ProducerMessage + func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) + func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) + type Broker struct + func NewBroker(addr string) *Broker + func (b *Broker) Addr() string + func (b *Broker) Close() error + func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) + func (b *Broker) Connected() (bool, error) + func (b *Broker) DescribeGroups(request *DescribeGroupsRequest) (*DescribeGroupsResponse, error) + func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) + func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) + func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) + func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) + func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) + func (b *Broker) Heartbeat(request *HeartbeatRequest) (*HeartbeatResponse, error) + func (b *Broker) ID() int32 + func (b *Broker) JoinGroup(request *JoinGroupRequest) (*JoinGroupResponse, error) + func (b *Broker) LeaveGroup(request *LeaveGroupRequest) (*LeaveGroupResponse, error) + func (b *Broker) ListGroups(request *ListGroupsRequest) (*ListGroupsResponse, error) + func (b *Broker) Open(conf *Config) error + func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) + func (b *Broker) SyncGroup(request *SyncGroupRequest) (*SyncGroupResponse, error) + type ByteEncoder []byte + func (b ByteEncoder) Encode() ([]byte, error) + func (b ByteEncoder) Length() int + type Client interface + Close func() error + Closed func() bool + Config func() *Config + Coordinator func(consumerGroup string) (*Broker, error) + GetOffset func(topic string, partitionID int32, time int64) (int64, error) + Leader func(topic string, partitionID int32) (*Broker, error) + Partitions func(topic string) ([]int32, error) + RefreshCoordinator func(consumerGroup string) error + RefreshMetadata func(topics ...string) error + Replicas func(topic string, partitionID int32) ([]int32, error) + Topics func() ([]string, error) + WritablePartitions func(topic string) ([]int32, error) + func NewClient(addrs []string, conf *Config) (Client, error) + type CompressionCodec int8 + const CompressionGZIP + const CompressionLZ4 + const CompressionNone + const CompressionSnappy + type Config struct + ChannelBufferSize int + ClientID string + Consumer struct{ ... } + Metadata struct{ ... } + MetricRegistry metrics.Registry + Net struct{ ... } + Producer struct{ ... } + Version KafkaVersion + func NewConfig() *Config + func (c *Config) Validate() error + type ConfigurationError string + func (err ConfigurationError) Error() string + type Consumer interface + Close func() error + ConsumePartition func(topic string, partition int32, offset int64) (PartitionConsumer, error) + HighWaterMarks func() map[string]map[int32]int64 + Partitions func(topic string) ([]int32, error) + Topics func() ([]string, error) + func NewConsumer(addrs []string, config *Config) (Consumer, error) + func NewConsumerFromClient(client Client) (Consumer, error) + type ConsumerError struct + Err error + Partition int32 + Topic string + func (ce ConsumerError) Error() string + type ConsumerErrors []*ConsumerError + func (ce ConsumerErrors) Error() string + type ConsumerGroupMemberAssignment struct + Topics map[string][]int32 + UserData []byte + Version int16 + type ConsumerGroupMemberMetadata struct + Topics []string + UserData []byte + Version int16 + type ConsumerMessage struct + Key []byte + Offset int64 + Partition int32 + Timestamp time.Time + Topic string + Value []byte + type ConsumerMetadataRequest struct + ConsumerGroup string + type ConsumerMetadataResponse struct + Coordinator *Broker + CoordinatorHost string + CoordinatorID int32 + CoordinatorPort int32 + Err KError + type DescribeGroupsRequest struct + Groups []string + func (r *DescribeGroupsRequest) AddGroup(group string) + type DescribeGroupsResponse struct + Groups []*GroupDescription + type Encoder interface + Encode func() ([]byte, error) + Length func() int + type FetchRequest struct + MaxWaitTime int32 + MinBytes int32 + Version int16 + func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) + type FetchResponse struct + Blocks map[string]map[int32]*FetchResponseBlock + ThrottleTime time.Duration + Version int16 + func (r *FetchResponse) AddError(topic string, partition int32, err KError) + func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) + func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock + type FetchResponseBlock struct + Err KError + HighWaterMarkOffset int64 + MsgSet MessageSet + type GroupDescription struct + Err KError + GroupId string + Members map[string]*GroupMemberDescription + Protocol string + ProtocolType string + State string + type GroupMemberDescription struct + ClientHost string + ClientId string + MemberAssignment []byte + MemberMetadata []byte + func (gmd *GroupMemberDescription) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) + func (gmd *GroupMemberDescription) GetMemberMetadata() (*ConsumerGroupMemberMetadata, error) + type HeartbeatRequest struct + GenerationId int32 + GroupId string + MemberId string + type HeartbeatResponse struct + Err KError + type JoinGroupRequest struct + GroupId string + GroupProtocols map[string][]byte + MemberId string + ProtocolType string + SessionTimeout int32 + func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) + func (r *JoinGroupRequest) AddGroupProtocolMetadata(name string, metadata *ConsumerGroupMemberMetadata) error + type JoinGroupResponse struct + Err KError + GenerationId int32 + GroupProtocol string + LeaderId string + MemberId string + Members map[string][]byte + func (r *JoinGroupResponse) GetMembers() (map[string]ConsumerGroupMemberMetadata, error) + type KError int16 + const ErrBrokerNotAvailable + const ErrClusterAuthorizationFailed + const ErrConsumerCoordinatorNotAvailable + const ErrGroupAuthorizationFailed + const ErrIllegalGeneration + const ErrIllegalSASLState + const ErrInconsistentGroupProtocol + const ErrInvalidCommitOffsetSize + const ErrInvalidGroupId + const ErrInvalidMessage + const ErrInvalidMessageSize + const ErrInvalidRequiredAcks + const ErrInvalidSessionTimeout + const ErrInvalidTimestamp + const ErrInvalidTopic + const ErrLeaderNotAvailable + const ErrMessageSetSizeTooLarge + const ErrMessageSizeTooLarge + const ErrNetworkException + const ErrNoError + const ErrNotCoordinatorForConsumer + const ErrNotEnoughReplicas + const ErrNotEnoughReplicasAfterAppend + const ErrNotLeaderForPartition + const ErrOffsetMetadataTooLarge + const ErrOffsetOutOfRange + const ErrOffsetsLoadInProgress + const ErrRebalanceInProgress + const ErrReplicaNotAvailable + const ErrRequestTimedOut + const ErrStaleControllerEpochCode + const ErrTopicAuthorizationFailed + const ErrUnknown + const ErrUnknownMemberId + const ErrUnknownTopicOrPartition + const ErrUnsupportedForMessageFormat + const ErrUnsupportedSASLMechanism + const ErrUnsupportedVersion + func (err KError) Error() string + type KafkaVersion struct + func (v KafkaVersion) IsAtLeast(other KafkaVersion) bool + type LeaveGroupRequest struct + GroupId string + MemberId string + type LeaveGroupResponse struct + Err KError + type ListGroupsRequest struct + type ListGroupsResponse struct + Err KError + Groups map[string]string + type Message struct + Codec CompressionCodec + Key []byte + Set *MessageSet + Timestamp time.Time + Value []byte + Version int8 + type MessageBlock struct + Msg *Message + Offset int64 + func (msb *MessageBlock) Messages() []*MessageBlock + type MessageSet struct + Messages []*MessageBlock + PartialTrailingMessage bool + type MetadataRequest struct + Topics []string + type MetadataResponse struct + Brokers []*Broker + Topics []*TopicMetadata + func (r *MetadataResponse) AddBroker(addr string, id int32) + func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata + func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) + type MockBroker struct + func NewMockBroker(t TestReporter, brokerID int32) *MockBroker + func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker + func (b *MockBroker) Addr() string + func (b *MockBroker) BrokerID() int32 + func (b *MockBroker) Close() + func (b *MockBroker) History() []RequestResponse + func (b *MockBroker) Port() int32 + func (b *MockBroker) Returns(e encoder) + func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) + func (b *MockBroker) SetLatency(latency time.Duration) + func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc) + type MockConsumerMetadataResponse struct + func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse + func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder + func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse + func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse + type MockFetchResponse struct + func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse + func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder + func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse + func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse + type MockMetadataResponse struct + func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse + func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder + func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse + func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse + type MockOffsetCommitResponse struct + func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse + func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder + func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse + type MockOffsetFetchResponse struct + func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse + func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder + func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, ...) *MockOffsetFetchResponse + type MockOffsetResponse struct + func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse + func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder + func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse + type MockProduceResponse struct + func NewMockProduceResponse(t TestReporter) *MockProduceResponse + func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder + func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse + type MockResponse interface + For func(reqBody versionedDecoder) (res encoder) + type MockSequence struct + func NewMockSequence(responses ...interface{}) *MockSequence + func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) + type MockWrapper struct + func NewMockWrapper(res encoder) *MockWrapper + func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) + type OffsetCommitRequest struct + ConsumerGroup string + ConsumerGroupGeneration int32 + ConsumerID string + RetentionTime int64 + Version int16 + func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, ...) + type OffsetCommitResponse struct + Errors map[string]map[int32]KError + func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError) + type OffsetFetchRequest struct + ConsumerGroup string + Version int16 + func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) + type OffsetFetchResponse struct + Blocks map[string]map[int32]*OffsetFetchResponseBlock + func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock) + func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock + type OffsetFetchResponseBlock struct + Err KError + Metadata string + Offset int64 + type OffsetManager interface + Close func() error + ManagePartition func(topic string, partition int32) (PartitionOffsetManager, error) + func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) + type OffsetRequest struct + Version int16 + func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) + type OffsetResponse struct + Blocks map[string]map[int32]*OffsetResponseBlock + Version int16 + func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) + func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock + type OffsetResponseBlock struct + Err KError + Offset int64 + Offsets []int64 + Timestamp int64 + type PacketDecodingError struct + Info string + func (err PacketDecodingError) Error() string + type PacketEncodingError struct + Info string + func (err PacketEncodingError) Error() string + type PartitionConsumer interface + AsyncClose func() + Close func() error + Errors func() <-chan *ConsumerError + HighWaterMarkOffset func() int64 + Messages func() <-chan *ConsumerMessage + type PartitionMetadata struct + Err KError + ID int32 + Isr []int32 + Leader int32 + Replicas []int32 + type PartitionOffsetManager interface + AsyncClose func() + Close func() error + Errors func() <-chan *ConsumerError + MarkOffset func(offset int64, metadata string) + NextOffset func() (int64, string) + type Partitioner interface + Partition func(message *ProducerMessage, numPartitions int32) (int32, error) + RequiresConsistency func() bool + func NewHashPartitioner(topic string) Partitioner + func NewManualPartitioner(topic string) Partitioner + func NewRandomPartitioner(topic string) Partitioner + func NewRoundRobinPartitioner(topic string) Partitioner + type PartitionerConstructor func(topic string) Partitioner + type ProduceRequest struct + RequiredAcks RequiredAcks + Timeout int32 + Version int16 + func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) + func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) + type ProduceResponse struct + Blocks map[string]map[int32]*ProduceResponseBlock + ThrottleTime time.Duration + Version int16 + func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError) + func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock + type ProduceResponseBlock struct + Err KError + Offset int64 + Timestamp time.Time + type ProducerError struct + Err error + Msg *ProducerMessage + func (pe ProducerError) Error() string + type ProducerErrors []*ProducerError + func (pe ProducerErrors) Error() string + type ProducerMessage struct + Key Encoder + Metadata interface{} + Offset int64 + Partition int32 + Timestamp time.Time + Topic string + Value Encoder + type RequestNotifierFunc func(bytesRead, bytesWritten int) + type RequestResponse struct + Request protocolBody + Response encoder + type RequiredAcks int16 + const NoResponse + const WaitForAll + const WaitForLocal + type SaslHandshakeRequest struct + Mechanism string + type SaslHandshakeResponse struct + EnabledMechanisms []string + Err KError + type StdLogger interface + Print func(v ...interface{}) + Printf func(format string, v ...interface{}) + Println func(v ...interface{}) + var Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags) + type StringEncoder string + func (s StringEncoder) Encode() ([]byte, error) + func (s StringEncoder) Length() int + type SyncGroupRequest struct + GenerationId int32 + GroupAssignments map[string][]byte + GroupId string + MemberId string + func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte) + func (r *SyncGroupRequest) AddGroupAssignmentMember(memberId string, memberAssignment *ConsumerGroupMemberAssignment) error + type SyncGroupResponse struct + Err KError + MemberAssignment []byte + func (r *SyncGroupResponse) GetMemberAssignment() (*ConsumerGroupMemberAssignment, error) + type SyncProducer interface + Close func() error + SendMessage func(msg *ProducerMessage) (partition int32, offset int64, err error) + SendMessages func(msgs []*ProducerMessage) error + func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) + func NewSyncProducerFromClient(client Client) (SyncProducer, error) + type TestReporter interface + Error func(...interface{}) + Errorf func(string, ...interface{}) + Fatal func(...interface{}) + Fatalf func(string, ...interface{}) + type TopicMetadata struct + Err KError + Name string + Partitions []*PartitionMetadata