Documentation ¶
Index ¶
- Constants
- Variables
- type ConsumerGroup
- func (cg *ConsumerGroup) Checkout(callback func(*PartitionConsumer) error) error
- func (cg *ConsumerGroup) Claims() []int32
- func (cg *ConsumerGroup) Close() error
- func (cg *ConsumerGroup) Commit(partition int32, offset int64) error
- func (cg *ConsumerGroup) EventsBehindLatest() (map[int32]int64, error)
- func (cg *ConsumerGroup) Offset(partition int32) (int64, error)
- func (cg *ConsumerGroup) Stream() <-chan *sarama.ConsumerEvent
- type ConsumerGroupConfig
- type EventBatch
- type EventStream
- type Notification
- type PartitionConsumer
- type ZK
- func (z *ZK) Brokers() ([]string, error)
- func (z *ZK) Claim(group, topic string, partition int32, id string) (err error)
- func (z *ZK) Commit(group, topic string, partition int32, offset int64) (err error)
- func (z *ZK) Consumers(group string) ([]string, <-chan zk.Event, error)
- func (z *ZK) Create(node string, value []byte, ephemeral bool) (err error)
- func (z *ZK) DeleteAll(node string) (err error)
- func (z *ZK) Exists(node string) (ok bool, err error)
- func (z *ZK) MkdirAll(node string) (err error)
- func (z *ZK) Offset(group, topic string, partition int32) (int64, error)
- func (z *ZK) RegisterConsumer(group, id, topic string) error
- func (z *ZK) RegisterGroup(group string) error
- func (z *ZK) Release(group, topic string, partition int32, id string) error
Examples ¶
Constants ¶
const (
	REBALANCE_START uint8 = iota + 1
	REBALANCE_OK
	REBALANCE_ERROR
)
Variables ¶
var (
	DiscardCommit = errors.New("sarama: commit discarded")
	NoCheckout    = errors.New("sarama: not checkout")
)
Functions ¶
This section is empty.
Types ¶
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
A ConsumerGroup operates on all partitions of a single topic. The goal is to ensure each topic message is consumed only once, regardless of the number of consumer instances within a cluster, as described in: http://kafka.apache.org/documentation.html#distributionimpl.
The ConsumerGroup internally creates multiple Consumer instances. It uses Zookeeper and follows a simple consumer rebalancing algorithm which allows all the consumers in a group to come into consensus on which consumer is consuming which partitions. Each ConsumerGroup can 'claim' 0-n partitions and will consume their messages until another ConsumerGroup instance with the same name joins or leaves the cluster.
Contrary to what the Kafka documentation states, consumer rebalancing is *only* triggered when consumers are added to or removed from the same group; the addition of broker nodes and/or partitions *does not currently trigger* a rebalancing cycle.
Example ¶
consumerGroupName := "my_consumer_group_name"
kafkaTopic := "my_topic"
zookeeper := []string{"localhost:2181"}

consumer, consumerErr := JoinConsumerGroup(consumerGroupName, kafkaTopic, zookeeper, nil)
if consumerErr != nil {
	log.Fatalln(consumerErr)
}

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
	<-c
	consumer.Close()
}()

eventCount := 0
stream := consumer.Stream()
for {
	event, ok := <-stream
	if !ok {
		break
	}

	// Process event
	log.Println(string(event.Value))

	eventCount += 1
}

log.Printf("Processed %d events.", eventCount)
Output:
func JoinConsumerGroup ¶
func JoinConsumerGroup(name string, topic string, zookeeper []string, config *ConsumerGroupConfig) (cg *ConsumerGroup, err error)
JoinConsumerGroup connects to a consumer group, using Zookeeper for auto-discovery.
func NewConsumerGroup ¶
func NewConsumerGroup(client *sarama.Client, zoo *ZK, name string, topic string, listener chan *Notification, config *ConsumerGroupConfig) (group *ConsumerGroup, err error)
NewConsumerGroup creates a new consumer group for a given topic.
You MUST call Close() on a consumer to avoid leaks; it will not be garbage-collected automatically when it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
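A minimal sketch of the required cleanup pattern, using JoinConsumerGroup (documented above) since it needs no pre-built client; the group name, topic, and Zookeeper address are placeholders:

cg, err := JoinConsumerGroup("my-group", "my-topic", []string{"localhost:2181"}, nil)
if err != nil {
	log.Fatalln(err)
}
// Close MUST be called to release the claimed partitions and avoid leaks.
defer cg.Close()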
func (*ConsumerGroup) Checkout ¶
func (cg *ConsumerGroup) Checkout(callback func(*PartitionConsumer) error) error
Checkout applies a callback function to a single partition consumer. The latest consumer offset is automatically committed to Zookeeper if the callback succeeds. The callback may return a DiscardCommit error to skip the commit silently. Checkout returns any error encountered, including a NoCheckout error to indicate that no partition was available; in that case, you should add an artificial delay before retrying to keep your CPU cool.
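A hedged sketch of a typical Checkout loop under these semantics; the sleep duration is an arbitrary placeholder:

for {
	err := cg.Checkout(func(pc *PartitionConsumer) error {
		// Process events from the checked-out partition here.
		// Return DiscardCommit to silently skip the offset commit.
		return nil
	})
	if err == NoCheckout {
		// No partition was available; back off to keep the CPU cool.
		time.Sleep(100 * time.Millisecond)
		continue
	}
	if err != nil {
		log.Println("checkout failed:", err)
	}
}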
func (*ConsumerGroup) Claims ¶
func (cg *ConsumerGroup) Claims() []int32
Claims returns the claimed partitions
func (*ConsumerGroup) Close ¶
func (cg *ConsumerGroup) Close() error
Close closes the consumer group
func (*ConsumerGroup) Commit ¶
func (cg *ConsumerGroup) Commit(partition int32, offset int64) error
Commit manually commits an offset for a partition
func (*ConsumerGroup) EventsBehindLatest ¶
func (cg *ConsumerGroup) EventsBehindLatest() (map[int32]int64, error)
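This method carries no doc comment; judging by its name and signature, the returned map presumably keys partition IDs to the number of events the consumer lags behind the latest available offset. A sketch under that assumption:

lag, err := cg.EventsBehindLatest()
if err != nil {
	log.Fatalln(err)
}
for partition, behind := range lag {
	// Assumption: 'behind' counts events between the current and latest offset.
	log.Printf("partition %d: %d events behind latest", partition, behind)
}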
func (*ConsumerGroup) Offset ¶
func (cg *ConsumerGroup) Offset(partition int32) (int64, error)
Offset manually retrieves an offset for a partition
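A brief sketch combining the two manual calls for partition 0; whether Commit expects the last-processed offset or the next one is not stated here, so the committed value below is illustrative only:

// Read the committed offset for partition 0.
offset, err := cg.Offset(0)
if err != nil {
	log.Fatalln(err)
}
log.Printf("partition 0 committed offset: %d", offset)

// After processing more events, commit the new position manually.
// (Illustrative; exact offset semantics are not specified in this doc.)
if err := cg.Commit(0, offset); err != nil {
	log.Println("commit failed:", err)
}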
func (*ConsumerGroup) Stream ¶
func (cg *ConsumerGroup) Stream() <-chan *sarama.ConsumerEvent
type ConsumerGroupConfig ¶
type ConsumerGroupConfig struct {
	// The Zookeeper read timeout
	ZookeeperTimeout time.Duration

	// Zookeeper chroot to use. Should not include a trailing slash.
	// Leave this empty to not set a chroot.
	ZookeeperChroot string

	// The preempt interval when listening to a single partition of a topic.
	// After this interval, the current offset will be committed to Zookeeper,
	// and a different partition will be checked out to consume next.
	CheckoutInterval time.Duration

	KafkaClientConfig   *sarama.ClientConfig   // This will be passed to Sarama when creating a new Client
	KafkaConsumerConfig *sarama.ConsumerConfig // This will be passed to Sarama when creating a new Consumer
}
func NewConsumerGroupConfig ¶
func NewConsumerGroupConfig() *ConsumerGroupConfig
func (*ConsumerGroupConfig) Validate ¶
func (cgc *ConsumerGroupConfig) Validate() error
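A minimal sketch of building and validating a config before joining a group; all values are placeholders, and the defaults set by NewConsumerGroupConfig are not specified here:

config := NewConsumerGroupConfig()
config.ZookeeperTimeout = 5 * time.Second // placeholder value
config.ZookeeperChroot = "/kafka"         // no trailing slash
config.CheckoutInterval = 2 * time.Second // placeholder value

if err := config.Validate(); err != nil {
	log.Fatalln(err)
}

cg, err := JoinConsumerGroup("my-group", "my-topic", []string{"localhost:2181"}, config)
if err != nil {
	log.Fatalln(err)
}
defer cg.Close()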
type EventBatch ¶
type EventBatch struct {
	Topic     string
	Partition int32
	Events    []sarama.ConsumerEvent
}
EventBatch is a batch of events from a single topic/partition
type EventStream ¶
type EventStream interface {
	Events() <-chan *sarama.ConsumerEvent
	Close() error
}
EventStream is an abstraction of a sarama.Consumer
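Because EventStream is a small interface, it is easy to stub out in tests; a hypothetical stub, not part of this package:

type stubStream struct {
	events chan *sarama.ConsumerEvent
}

func (s *stubStream) Events() <-chan *sarama.ConsumerEvent { return s.events }
func (s *stubStream) Close() error                         { close(s.events); return nil }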
type Notification ¶
type Notification struct {
	Type uint8
	Src  *ConsumerGroup
	Err  error
}
A subscribable notification
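A hedged sketch of consuming notifications via the listener channel accepted by NewConsumerGroup, using the REBALANCE_* constants above:

notifications := make(chan *Notification)
go func() {
	for n := range notifications {
		switch n.Type {
		case REBALANCE_START:
			log.Println("rebalance started")
		case REBALANCE_OK:
			log.Println("rebalance finished, claims:", n.Src.Claims())
		case REBALANCE_ERROR:
			log.Println("rebalance failed:", n.Err)
		}
	}
}()
// Pass 'notifications' as the listener argument to NewConsumerGroup.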
type PartitionConsumer ¶
type PartitionConsumer struct {
// contains filtered or unexported fields
}
PartitionConsumer can consume a single partition of a single topic
func NewPartitionConsumer ¶
func NewPartitionConsumer(group *ConsumerGroup, partition int32) (*PartitionConsumer, error)
NewPartitionConsumer creates a new partition consumer instance
func (*PartitionConsumer) Close ¶
func (p *PartitionConsumer) Close() error
Close closes a partition consumer
func (*PartitionConsumer) Fetch ¶
func (p *PartitionConsumer) Fetch(stream chan *sarama.ConsumerEvent, duration time.Duration, stopper chan bool) error
Fetch returns a batch of events. WARNING: may return nil if no events are available
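A hedged sketch of driving Fetch manually; pc is assumed to be a *PartitionConsumer obtained from NewPartitionConsumer, and the duration is a placeholder:

stream := make(chan *sarama.ConsumerEvent)
stopper := make(chan bool)

go func() {
	for event := range stream {
		if event == nil {
			continue // per the warning above, nil may be delivered
		}
		log.Println(string(event.Value))
	}
}()

if err := pc.Fetch(stream, 500*time.Millisecond, stopper); err != nil {
	log.Println("fetch failed:", err)
}
close(stopper)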
type ZK ¶
ZK wraps a Zookeeper connection
func (*ZK) RegisterConsumer ¶
RegisterConsumer registers a new consumer within a group
func (*ZK) RegisterGroup ¶
RegisterGroup creates/updates a group directory
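A hedged sketch composing the ZK helpers from the index above; z is assumed to be an already-connected *ZK (no constructor appears in this doc), and the group, consumer, and topic names are placeholders:

if err := z.RegisterGroup("my-group"); err != nil {
	log.Fatalln(err)
}
if err := z.RegisterConsumer("my-group", "consumer-1", "my-topic"); err != nil {
	log.Fatalln(err)
}
// Claim partition 0 for this consumer, and release it when done.
if err := z.Claim("my-group", "my-topic", 0, "consumer-1"); err != nil {
	log.Fatalln(err)
}
defer z.Release("my-group", "my-topic", 0, "consumer-1")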