Documentation ¶
Index ¶
- type Set
- type T
- func (p *T) AsyncProduce(topic string, key, message sarama.Encoder)
- func (p *T) Consume(group, topic string) (*consumer.Message, error)
- func (p *T) GetAllTopicConsumers(topic string) (map[string]map[string][]int32, error)
- func (p *T) GetGroupOffsets(group, topic string) ([]admin.PartitionOffset, error)
- func (p *T) GetTopicConsumers(group, topic string) (map[string][]int32, error)
- func (p *T) Produce(topic string, key, message sarama.Encoder) (*sarama.ProducerMessage, error)
- func (p *T) SetGroupOffsets(group, topic string, offsets []admin.PartitionOffset) error
- func (p *T) Stop()
Types ¶
type Set ¶
type Set struct {
// contains filtered or unexported fields
}
Set represents a collection of proxy.T instances with a default value.
type T ¶
type T struct {
// contains filtered or unexported fields
}
T implements a proxy to a particular Kafka/ZooKeeper cluster.
func (*T) AsyncProduce ¶
func (p *T) AsyncProduce(topic string, key, message sarama.Encoder)
AsyncProduce is an asynchronous counterpart of the `Produce` function. Errors are silently ignored.
func (*T) Consume ¶
func (p *T) Consume(group, topic string) (*consumer.Message, error)
Consume consumes a message from the specified topic on behalf of the specified consumer group. If there are no new messages in the topic at the time of the request, it blocks for `Config.Consumer.LongPollingTimeout`. If no new message is produced during that time, `ErrRequestTimeout` is returned.
Note that during state transitions (topic subscribe<->unsubscribe and consumer group register<->deregister) the method may return either `ErrBufferOverflow` or `ErrRequestTimeout` even when there are messages available for consumption. In that case the caller should back off briefly and then repeat the request.
func (*T) GetAllTopicConsumers ¶
func (p *T) GetAllTopicConsumers(topic string) (map[string]map[string][]int32, error)
GetAllTopicConsumers returns a group -> client-id -> consumed-partitions-list mapping for a particular topic. Warning: the function scans all consumer groups registered in ZooKeeper and can therefore take a long time.
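To make the shape of the returned mapping concrete, here is a small sketch that walks a value of type `map[string]map[string][]int32` and counts the partitions held by each group. The helper `partitionsPerGroup` and the sample data are made up for illustration; no proxy call is made.

```go
package main

import (
	"fmt"
	"sort"
)

// partitionsPerGroup counts how many partitions each consumer group holds
// across all of its clients, given a group -> client-id -> partitions map.
func partitionsPerGroup(consumers map[string]map[string][]int32) map[string]int {
	totals := make(map[string]int, len(consumers))
	for group, clients := range consumers {
		total := 0
		for _, partitions := range clients {
			total += len(partitions)
		}
		totals[group] = total
	}
	return totals
}

func main() {
	// Made-up data in the shape returned by GetAllTopicConsumers.
	consumers := map[string]map[string][]int32{
		"billing": {"client-a": {0, 1}, "client-b": {2, 3}},
		"audit":   {"client-c": {0, 1, 2, 3}},
	}
	totals := partitionsPerGroup(consumers)

	// Map iteration order is random in Go, so sort group names for stable output.
	groups := make([]string, 0, len(totals))
	for group := range totals {
		groups = append(groups, group)
	}
	sort.Strings(groups)
	for _, group := range groups {
		fmt.Printf("%s: %d partitions\n", group, totals[group])
	}
}
```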
func (*T) GetGroupOffsets ¶
func (p *T) GetGroupOffsets(group, topic string) ([]admin.PartitionOffset, error)
GetGroupOffsets returns, for every partition of the specified topic, the current offset range along with the latest offset and metadata committed by the specified consumer group.
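One typical use of this information is estimating how far a group is behind. The sketch below assumes a simplified local type, `partitionOffset`, whose field names and meanings (`Begin`, `End`, `Offset`, `Metadata`) are guesses based on the description above; the actual `admin.PartitionOffset` definition is not shown on this page and may differ.

```go
package main

import "fmt"

// partitionOffset is a hypothetical stand-in for admin.PartitionOffset.
// The fields mirror the description (offset range, committed offset,
// metadata), but their names and exact semantics are assumptions.
type partitionOffset struct {
	Partition int32
	Begin     int64  // assumed: oldest offset still available
	End       int64  // assumed: offset the next produced message will get
	Offset    int64  // assumed: latest offset committed by the group
	Metadata  string // assumed: metadata stored with the commit
}

// totalLag sums, over all partitions, the number of messages between the
// committed offset and the end of the partition. The committed offset is
// clamped to [Begin, End] so that a missing or stale commit counts as
// "everything still available".
func totalLag(offsets []partitionOffset) int64 {
	var lag int64
	for _, po := range offsets {
		committed := po.Offset
		if committed < po.Begin {
			committed = po.Begin
		}
		if committed > po.End {
			committed = po.End
		}
		lag += po.End - committed
	}
	return lag
}

func main() {
	// Made-up values in place of a real GetGroupOffsets result.
	offsets := []partitionOffset{
		{Partition: 0, Begin: 0, End: 100, Offset: 40},
		{Partition: 1, Begin: 10, End: 50, Offset: 50},
	}
	fmt.Println("total lag:", totalLag(offsets))
}
```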
func (*T) GetTopicConsumers ¶
func (p *T) GetTopicConsumers(group, topic string) (map[string][]int32, error)
GetTopicConsumers returns a client-id -> consumed-partitions-list mapping for clients from a particular consumer group and a particular topic.
func (*T) Produce ¶
func (p *T) Produce(topic string, key, message sarama.Encoder) (*sarama.ProducerMessage, error)
Produce submits a message to the specified `topic` of the Kafka cluster using `key` to identify a destination partition. The exact algorithm used to map keys to partitions is implementation-specific, but it is guaranteed to return consistent results: the same key always maps to the same partition. If `key` is `nil`, the message is placed into a random partition.
Errors usually indicate a catastrophic failure of the Kafka cluster, or a missing topic if the cluster is not configured to auto-create topics.
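The key-to-partition contract described above (consistent for a given key, random for a `nil` key) can be illustrated with a hash-based mapping. This is not the algorithm the proxy uses, which is implementation-specific; `partitionFor` and the choice of FNV-1a are assumptions made purely for the sketch.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math/rand"
)

// partitionFor maps a key to one of numPartitions partitions.
// A nil key picks a random partition; any other key always maps to the
// same partition for a fixed partition count. numPartitions must be > 0.
// FNV-1a is an illustrative choice, not necessarily what the proxy uses.
func partitionFor(key []byte, numPartitions int32) int32 {
	if key == nil {
		return rand.Int31n(numPartitions)
	}
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	return int32(h.Sum32() % uint32(numPartitions))
}

func main() {
	const partitions = 8
	// The same key lands on the same partition every time.
	fmt.Println(partitionFor([]byte("user-42"), partitions) == partitionFor([]byte("user-42"), partitions))
	// A nil key may land anywhere in [0, partitions).
	fmt.Println(partitionFor(nil, partitions))
}
```

Note that a mapping of this kind is only stable while the number of partitions stays the same; adding partitions to a topic would move some keys.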
func (*T) SetGroupOffsets ¶
func (p *T) SetGroupOffsets(group, topic string, offsets []admin.PartitionOffset) error
SetGroupOffsets commits specific offset values along with metadata for a list of partitions of a particular topic on behalf of the specified group.
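As an example of preparing the `offsets` argument, the sketch below builds a list that rewinds every partition to the start of its available range. It again uses a hypothetical local `partitionOffset` type with assumed field names instead of the real `admin.PartitionOffset`, and `rewindToBegin` is an invented helper; the actual commit call is left out.

```go
package main

import "fmt"

// partitionOffset is a hypothetical stand-in for admin.PartitionOffset;
// the field names and their meanings are assumptions.
type partitionOffset struct {
	Partition int32
	Begin     int64  // assumed: oldest offset still available
	End       int64  // assumed: offset the next produced message will get
	Offset    int64  // assumed: offset to commit for the group
	Metadata  string // assumed: free-form note stored with the commit
}

// rewindToBegin returns a copy of current in which every partition's
// offset is set to the beginning of its range and tagged with note.
// The input slice is left untouched.
func rewindToBegin(current []partitionOffset, note string) []partitionOffset {
	rewound := make([]partitionOffset, len(current))
	for i, po := range current {
		po.Offset = po.Begin
		po.Metadata = note
		rewound[i] = po
	}
	return rewound
}

func main() {
	// Made-up values in place of a real GetGroupOffsets result.
	current := []partitionOffset{
		{Partition: 0, Begin: 0, End: 100, Offset: 40},
		{Partition: 1, Begin: 10, End: 50, Offset: 50},
	}
	for _, po := range rewindToBegin(current, "manual rewind") {
		fmt.Printf("partition %d -> offset %d (%s)\n", po.Partition, po.Offset, po.Metadata)
	}
	// The resulting list is what one would convert and pass as the
	// offsets argument of SetGroupOffsets.
}
```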