Versions in this module Expand all Collapse all v0 v0.3.2 Jun 22, 2015 Changes in this version + const OffsetNewest + const OffsetOldest + const ReceiveTime + var ErrAlreadyConnected = errors.New("kafka: broker connection already initiated") + var ErrClosedClient = errors.New("kafka: tried to use a client that was closed") + var ErrIncompleteResponse = errors.New("kafka: response did not contain all the expected topic/partition blocks") + var ErrInsufficientData = errors.New("kafka: insufficient data to decode packet, more bytes expected") + var ErrInvalidPartition = errors.New("kafka: partitioner returned an invalid partition index") + var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max") + var ErrNotConnected = errors.New("kafka: broker not connected") + var ErrOutOfBrokers = errors.New(...) + var ErrShuttingDown = errors.New("kafka: message received by producer in process of shutting down") + var MaxRequestSize int32 = 100 * 1024 * 1024 + var MaxResponseSize int32 = 100 * 1024 * 1024 + var PanicHandler func(interface{}) + type AsyncProducer interface + AsyncClose func() + Close func() error + Errors func() <-chan *ProducerError + Input func() chan<- *ProducerMessage + Successes func() <-chan *ProducerMessage + func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) + func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) + type Broker struct + func NewBroker(addr string) *Broker + func (b *Broker) Addr() string + func (b *Broker) Close() error + func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitResponse, error) + func (b *Broker) Connected() (bool, error) + func (b *Broker) Fetch(request *FetchRequest) (*FetchResponse, error) + func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) + func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, error) + func (b *Broker) GetConsumerMetadata(request *ConsumerMetadataRequest) (*ConsumerMetadataResponse, error) + func (b *Broker) GetMetadata(request *MetadataRequest) (*MetadataResponse, error) + func (b *Broker) ID() int32 + func (b *Broker) Open(conf *Config) error + func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) + type ByteEncoder []byte + func (b ByteEncoder) Encode() ([]byte, error) + func (b ByteEncoder) Length() int + type Client interface + Close func() error + Closed func() bool + Config func() *Config + Coordinator func(consumerGroup string) (*Broker, error) + GetOffset func(topic string, partitionID int32, time int64) (int64, error) + Leader func(topic string, partitionID int32) (*Broker, error) + Partitions func(topic string) ([]int32, error) + RefreshCoordinator func(consumerGroup string) error + RefreshMetadata func(topics ...string) error + Replicas func(topic string, partitionID int32) ([]int32, error) + Topics func() ([]string, error) + WritablePartitions func(topic string) ([]int32, error) + func NewClient(addrs []string, conf *Config) (Client, error) + type CompressionCodec int8 + const CompressionGZIP + const CompressionNone + const CompressionSnappy + type Config struct + ChannelBufferSize int + ClientID string + Consumer struct{ ... } + Metadata struct{ ... } + Net struct{ ... } + Producer struct{ ... } + func NewConfig() *Config + func (c *Config) Validate() error + type ConfigurationError string + func (err ConfigurationError) Error() string + type Consumer interface + Close func() error + ConsumePartition func(topic string, partition int32, offset int64) (PartitionConsumer, error) + Partitions func(topic string) ([]int32, error) + Topics func() ([]string, error) + func NewConsumer(addrs []string, config *Config) (Consumer, error) + func NewConsumerFromClient(client Client) (Consumer, error) + type ConsumerError struct + Err error + Partition int32 + Topic string + func (ce ConsumerError) Error() string + type ConsumerErrors []*ConsumerError + func (ce ConsumerErrors) Error() string + type ConsumerMessage struct + Key []byte + Offset int64 + Partition int32 + Topic string + Value []byte + type ConsumerMetadataRequest struct + ConsumerGroup string + type ConsumerMetadataResponse struct + Coordinator *Broker + CoordinatorHost string + CoordinatorID int32 + CoordinatorPort int32 + Err KError + type Encoder interface + Encode func() ([]byte, error) + Length func() int + type FetchRequest struct + MaxWaitTime int32 + MinBytes int32 + func (f *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) + type FetchResponse struct + Blocks map[string]map[int32]*FetchResponseBlock + func (fr *FetchResponse) AddError(topic string, partition int32, err KError) + func (fr *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) + func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock + type FetchResponseBlock struct + Err KError + HighWaterMarkOffset int64 + MsgSet MessageSet + type KError int16 + const ErrBrokerNotAvailable + const ErrConsumerCoordinatorNotAvailable + const ErrInvalidMessage + const ErrInvalidMessageSize + const ErrInvalidTopic + const ErrLeaderNotAvailable + const ErrMessageSetSizeTooLarge + const ErrMessageSizeTooLarge + const ErrNoError + const ErrNotCoordinatorForConsumer + const ErrNotEnoughReplicas + const ErrNotEnoughReplicasAfterAppend + const ErrNotLeaderForPartition + const ErrOffsetMetadataTooLarge + const ErrOffsetOutOfRange + const ErrOffsetsLoadInProgress + const ErrReplicaNotAvailable + const ErrRequestTimedOut + const ErrStaleControllerEpochCode + const ErrUnknown + const ErrUnknownTopicOrPartition + func (err KError) Error() string + type Message struct + Codec CompressionCodec + Key []byte + Set *MessageSet + Value []byte + type MessageBlock struct + Msg *Message + Offset int64 + func (msb *MessageBlock) Messages() []*MessageBlock + type MessageSet struct + Messages []*MessageBlock + PartialTrailingMessage bool + type MetadataRequest struct + Topics []string + type MetadataResponse struct + Brokers []*Broker + Topics []*TopicMetadata + func (m *MetadataResponse) AddBroker(addr string, id int32) + func (m *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata + func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) + type OffsetCommitRequest struct + ConsumerGroup string + ConsumerGroupGeneration int32 + ConsumerID string + RetentionTime int64 + Version int16 + func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, ...) + type OffsetCommitResponse struct + Errors map[string]map[int32]KError + type OffsetFetchRequest struct + ConsumerGroup string + Version int16 + func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) + type OffsetFetchResponse struct + Blocks map[string]map[int32]*OffsetFetchResponseBlock + type OffsetFetchResponseBlock struct + Err KError + Metadata string + Offset int64 + type OffsetRequest struct + func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) + type OffsetResponse struct + Blocks map[string]map[int32]*OffsetResponseBlock + func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) + func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock + type OffsetResponseBlock struct + Err KError + Offsets []int64 + type PacketDecodingError struct + Info string + func (err PacketDecodingError) Error() string + type PacketEncodingError struct + Info string + func (err PacketEncodingError) Error() string + type PartitionConsumer interface + AsyncClose func() + Close func() error + Errors func() <-chan *ConsumerError + HighWaterMarkOffset func() int64 + Messages func() <-chan *ConsumerMessage + type PartitionMetadata struct + Err KError + ID int32 + Isr []int32 + Leader int32 + Replicas []int32 + type Partitioner interface + Partition func(message *ProducerMessage, numPartitions int32) (int32, error) + RequiresConsistency func() bool + func NewHashPartitioner(topic string) Partitioner + func NewManualPartitioner(topic string) Partitioner + func NewRandomPartitioner(topic string) Partitioner + func NewRoundRobinPartitioner(topic string) Partitioner + type PartitionerConstructor func(topic string) Partitioner + type ProduceRequest struct + RequiredAcks RequiredAcks + Timeout int32 + func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) + func (p *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) + type ProduceResponse struct + Blocks map[string]map[int32]*ProduceResponseBlock + func (pr *ProduceResponse) AddTopicPartition(topic string, partition int32, err KError) + func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock + type ProduceResponseBlock struct + Err KError + Offset int64 + type ProducerError struct + Err error + Msg *ProducerMessage + func (pe ProducerError) Error() string + type ProducerErrors []*ProducerError + func (pe ProducerErrors) Error() string + type ProducerMessage struct + Key Encoder + Metadata interface{} + Offset int64 + Partition int32 + Topic string + Value Encoder + type RequiredAcks int16 + const NoResponse + const WaitForAll + const WaitForLocal + type StdLogger interface + Print func(v ...interface{}) + Printf func(format string, v ...interface{}) + Println func(v ...interface{}) + var Logger StdLogger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags) + type StringEncoder string + func (s StringEncoder) Encode() ([]byte, error) + func (s StringEncoder) Length() int + type SyncProducer interface + Close func() error + SendMessage func(msg *ProducerMessage) (partition int32, offset int64, err error) + func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) + func NewSyncProducerFromClient(client Client) (SyncProducer, error) + type TopicMetadata struct + Err KError + Name string + Partitions []*PartitionMetadata