Versions in this module Expand all Collapse all v0 v0.22.2 Mar 15, 2016 v0.22.1 Mar 9, 2016 v0.22.0 Mar 1, 2016 v0.21.1 Feb 6, 2016 v0.21.0 Feb 4, 2016 Changes in this version + const OffsetNewest + const OffsetOldest + const ReceiveTime + var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated") + var ErrClosedClient = errors.New("kafka: tried to use a client that was closed") + var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks") + var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected") + var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index") + var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max") + var ErrNotConnected = errors.New("kafka: broker not connected") + var ErrOutOfBrokers = errors.New(...) + var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down") + var MaxRequestSize int32 = 100 * 1024 * 1024 + var MaxResponseSize int32 = 100 * 1024 * 1024 + var PanicHandler func(interface{}) + type AsyncProducer interface + AsyncClose func() + Close func() error + Errors func() <-chan *ProducerError + Input func() chan<- *ProducerMessage + Successes func() <-chan *ProducerMessage + func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) + func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) + type Broker struct + func NewBroker(addr string) *Broker + func (b *Broker) Addr() string + func (b *Broker) Close() error + func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) + func (b *Broker) Connected() (bool, error) + func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) + func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) + func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) + func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) + func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) + func (b *Broker) ID() int32 + func (b *Broker) Open(conf *Config) error + func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) + type ByteEncoder []byte + func (b ByteEncoder) Encode() ([]byte, error) + func (b ByteEncoder) Length() int + type Client interface + Close func() error + Closed func() bool + Config func() *Config + Coordinator func(consumerGroup string) (*Broker, error) + GetOffset func(topic string, partitionID int32, time int64) (int64, error) + Leader func(topic string, partitionID int32) (*Broker, error) + Partitions func(topic string) ([]int32, error) + RefreshCoordinator func(consumerGroup string) error + RefreshMetadata func(topics ...string) error + Replicas func(topic string, partitionID int32) ([]int32, error) + Topics func() ([]string, error) + WritablePartitions func(topic string) ([]int32, error) + func NewClient(addrs []string, conf *Config) (Client, error) + type CompressionCodec int8 + const CompressionGZIP + const CompressionNone + const CompressionSnappy + type Config struct + ChannelBufferSize int + ClientID string + Consumer struct{ ... } + Metadata struct{ ... } + Net struct{ ... } + Producer struct{ ... } + func NewConfig() *Config + func (c *Config) Validate() error + type ConfigurationError string + func (err ConfigurationError) Error() string + type Consumer interface + Close func() error + ConsumePartition func(topic string, partition int32, offset int64) (PartitionConsumer, error) + Partitions func(topic string) ([]int32, error) + Topics func() ([]string, error) + func NewConsumer(addrs []string, config *Config) (Consumer, error) + func NewConsumerFromClient(client Client) (Consumer, error) + type ConsumerError struct + Err error + Partition int32 + Topic string + func (ce ConsumerError) Error() string + type ConsumerErrors []*ConsumerError + func (ce ConsumerErrors) Error() string + type ConsumerMessage struct + Key []byte + Offset int64 + Partition int32 + Topic string + Value []byte + type ConsumerMetadataRequest struct + ConsumerGroup string + type ConsumerMetadataResponse struct + Coordinator *Broker + CoordinatorHost string + CoordinatorID int32 + CoordinatorPort int32 + Err KError + type DescribeGroupsRequest struct + Groups []string + func (r *DescribeGroupsRequest) AddGroup(group string) + type DescribeGroupsResponse struct + Groups []*GroupDescription + type Encoder interface + Encode func() ([]byte, error) + Length func() int + type FetchRequest struct + MaxWaitTime int32 + MinBytes int32 + func (f *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) + type FetchResponse struct + Blocks map[string]map[int32]*FetchResponseBlock + func (fr *FetchResponse) AddError(topic string, partition int32, err KError) + func (fr *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) + func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock + type FetchResponseBlock struct + Err KError + HighWaterMarkOffset int64 + MsgSet MessageSet + type GroupDescription struct + Err KError + GroupId string + Members map[string]*GroupMemberDescription + Protocol string + ProtocolType string + State string + type GroupMemberDescription struct + ClientHost string + ClientId string + MemberAssignment []byte + MemberMetadata []byte + type HeartbeatRequest struct + GenerationId int32 + GroupId string + MemberId string + type HeartbeatResponse struct + Err KError + type JoinGroupRequest struct + GroupId string + GroupProtocols map[string][]byte + MemberId string + ProtocolType string + SessionTimeout int32 + func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) + type JoinGroupResponse struct + Err KError + GenerationId int32 + GroupProtocol string + LeaderId string + MemberId string + Members map[string][]byte + type KError int16 + const ErrBrokerNotAvailable + const ErrClusterAuthorizationFailed + const ErrConsumerCoordinatorNotAvailable + const ErrGroupAuthorizationFailed + const ErrIllegalGeneration + const ErrInconsistentGroupProtocol + const ErrInvalidCommitOffsetSize + const ErrInvalidGroupId + const ErrInvalidMessage + const ErrInvalidMessageSize + const ErrInvalidRequiredAcks + const ErrInvalidSessionTimeout + const ErrInvalidTopic + const ErrLeaderNotAvailable + const ErrMessageSetSizeTooLarge + const ErrMessageSizeTooLarge + const ErrNoError + const ErrNotCoordinatorForConsumer + const ErrNotEnoughReplicas + const ErrNotEnoughReplicasAfterAppend + const ErrNotLeaderForPartition + const ErrOffsetMetadataTooLarge + const ErrOffsetOutOfRange + const ErrOffsetsLoadInProgress + const ErrRebalanceInProgress + const ErrReplicaNotAvailable + const ErrRequestTimedOut + const ErrStaleControllerEpochCode + const ErrTopicAuthorizationFailed + const ErrUnknown + const ErrUnknownMemberId + const ErrUnknownTopicOrPartition + func (err KError) Error() string + type LeaveGroupRequest struct + GroupId string + MemberId string + type LeaveGroupResponse struct + Err KError + type ListGroupsRequest struct + type ListGroupsResponse struct + Err KError + Groups map[string]string + type Message struct + Codec CompressionCodec + Key []byte + Set *MessageSet + Value []byte + type MessageBlock struct + Msg *Message + Offset int64 + func (msb *MessageBlock) Messages() []*MessageBlock + type MessageSet struct + Messages []*MessageBlock + PartialTrailingMessage bool + type MetadataRequest struct + Topics []string + type MetadataResponse struct + Brokers []*Broker + Topics []*TopicMetadata + func (m *MetadataResponse) AddBroker(addr string, id int32) + func (m *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata + func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) + type OffsetCommitRequest struct + ConsumerGroup string + ConsumerGroupGeneration int32 + ConsumerID string + RetentionTime int64 + Version int16 + func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, ...) + type OffsetCommitResponse struct + Errors map[string]map[int32]KError + func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError) + type OffsetFetchRequest struct + ConsumerGroup string + Version int16 + func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) + type OffsetFetchResponse struct + Blocks map[string]map[int32]*OffsetFetchResponseBlock + func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock) + func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock + type OffsetFetchResponseBlock struct + Err KError + Metadata string + Offset int64 + type OffsetManager interface + Close func() error + ManagePartition func(topic string, partition int32) (PartitionOffsetManager, error) + func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) + type OffsetRequest struct + func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) + type OffsetResponse struct + Blocks map[string]map[int32]*OffsetResponseBlock + func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) + func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock + type OffsetResponseBlock struct + Err KError + Offsets []int64 + type PacketDecodingError struct + Info string + func (err PacketDecodingError) Error() string + type PacketEncodingError struct + Info string + func (err PacketEncodingError) Error() string + type PartitionConsumer interface + AsyncClose func() + Close func() error + Errors func() <-chan *ConsumerError + HighWaterMarkOffset func() int64 + Messages func() <-chan *ConsumerMessage + type PartitionMetadata struct + Err KError + ID int32 + Isr []int32 + Leader int32 + Replicas []int32 + type PartitionOffsetManager interface + AsyncClose func() + Close func() error + Errors func() <-chan *ConsumerError + MarkOffset func(offset int64, metadata string) + NextOffset func() (int64, string) + type Partitioner interface + Partition func(message *ProducerMessage, numPartitions int32) (int32, error) + RequiresConsistency func() bool + func NewHashPartitioner(topic string) Partitioner + func NewManualPartitioner(topic string) Partitioner + func NewRandomPartitioner(topic string) Partitioner + func NewRoundRobinPartitioner(topic string) Partitioner + type PartitionerConstructor func(topic string) Partitioner + type ProduceRequest struct + RequiredAcks RequiredAcks + Timeout int32 + func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) + func (p *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) + type ProduceResponse struct + Blocks map[string]map[int32]*ProduceResponseBlock + func (pr *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError) + func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock + type ProduceResponseBlock struct + Err KError + Offset int64 + type ProducerError struct + Err error + Msg *ProducerMessage + func (pe ProducerError) Error() string + type ProducerErrors []*ProducerError + func (pe ProducerErrors) Error() string + type ProducerMessage struct + Key Encoder + Metadata interface{} + Offset int64 + Partition int32 + Topic string + Value Encoder + type RequiredAcks int16 + const NoResponse + const WaitForAll + const WaitForLocal + type StdLogger interface + Print func(v ...interface{}) + Printf func(format string, v ...interface{}) + Println func(v ...interface{}) + var Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags) + type StringEncoder string + func (s StringEncoder) Encode() ([]byte, error) + func (s StringEncoder) Length() int + type SyncGroupRequest struct + GenerationId int32 + GroupAssignments map[string][]byte + GroupId string + MemberId string + func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte) + type SyncGroupResponse struct + Err KError + MemberAssignment []byte + type SyncProducer interface + Close func() error + SendMessage func(msg *ProducerMessage) (partition int32, offset int64, err error) + func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) + func NewSyncProducerFromClient(client Client) (SyncProducer, error) + type TopicMetadata struct + Err KError + Name string + Partitions []*PartitionMetadata