Documentation ¶
Overview ¶
Package kafkaconsumer is a clone of github.com/wvanbergen/kafka/kafkaconsumer in which the sarama references point to gopkg.in links instead of the "raw" GitHub repository.
Constants ¶
This section is empty.
Variables ¶
var Logger sarama.StdLogger
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	*sarama.Config

	Zookeeper *kazoo.Config

	MaxProcessingTime time.Duration // Time to wait for all the offsets for a partition to be processed after stopping to consume from it. Defaults to 1 minute.

	Offsets struct {
		Initial int64 // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetOldest (default) or sarama.OffsetNewest.
	}
}
type Consumer ¶
type Consumer interface {
	// Interrupt will initiate the shutdown procedure of the consumer, and return immediately.
	// When you are done using the consumer, you must either call Close or Interrupt to prevent leaking memory.
	Interrupt()

	// Close will start the shutdown procedure for the consumer and wait for it to complete.
	// When you are done using the consumer, you must either call Close or Interrupt to prevent leaking memory.
	Close() error

	// Messages returns a channel that you can read to obtain messages from Kafka to process.
	// Every message that you receive from this channel should be sent to Ack after it has been processed.
	Messages() <-chan *sarama.ConsumerMessage

	// Errors returns a channel that you can read to obtain errors that occur.
	Errors() <-chan error

	// Ack marks a message as processed, indicating that the message offset can be committed
	// for the message's partition by the offset manager. Note that the offset manager may decide
	// not to commit every offset immediately for efficiency reasons. Calling Close or Interrupt
	// will make sure that the last offset provided to this function will be flushed to storage.
	// You have to provide the messages in the same order as you received them from the Messages
	// channel.
	Ack(*sarama.ConsumerMessage)
}
Consumer represents a consumer instance and is the main interface to work with as a consumer of this library.
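Because Ack must be called in the same order in which messages were received, processing messages concurrently needs some care. The sketch below shows one way to process in parallel while still acknowledging in receive order. It uses only the standard library: the `message` type is a hypothetical stand-in for *sarama.ConsumerMessage, and `processInOrder` is an illustrative helper, not something this package provides.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for *sarama.ConsumerMessage.
type message struct {
	Partition int32
	Offset    int64
}

// processInOrder runs process on every message concurrently, but calls ack
// strictly in the order in which the messages were received.
func processInOrder(messages <-chan message, process func(message), ack func(message)) {
	type pending struct {
		msg  message
		done chan struct{}
	}
	queue := make(chan pending, 16) // bounds the number of in-flight messages

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for p := range queue {
			<-p.done   // wait until this message has been processed
			ack(p.msg) // then acknowledge it, preserving receive order
		}
	}()

	for msg := range messages {
		p := pending{msg: msg, done: make(chan struct{})}
		queue <- p
		go func() {
			defer close(p.done)
			process(p.msg)
		}()
	}
	close(queue)
	wg.Wait()
}

func main() {
	in := make(chan message, 3)
	for i := int64(0); i < 3; i++ {
		in <- message{Partition: 0, Offset: i}
	}
	close(in)

	var acked []int64
	processInOrder(in, func(message) {}, func(m message) { acked = append(acked, m.Offset) })
	fmt.Println(acked) // prints [0 1 2]
}
```

With a real consumer, `ack` would be consumer.Ack and `messages` would be fed from consumer.Messages(); the queue capacity of 16 is an arbitrary choice for the sketch.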
Example ¶
This example sets up a consumer instance that consumes two topics, processes and commits the messages that are consumed, and properly shuts down the consumer when the process is interrupted.
consumer, err := Join(
	"ExampleConsumerGroup",                      // name of the consumer group
	TopicSubscription("access_log", "audit_log"), // topics to subscribe to
	"zk1:2181,zk2:2181,zk3:2181/chroot",         // zookeeper connection string
	nil)                                         // Set this to a *Config instance to override defaults

if err != nil {
	log.Fatalln(err)
}

// Trap the interrupt signal to cleanly shut down the consumer
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
	<-c
	consumer.Interrupt()
}()

eventCount := 0
for message := range consumer.Messages() {
	// Process message
	log.Println(string(message.Value))
	eventCount++

	// Acknowledge that the message has been processed
	consumer.Ack(message)
}

log.Printf("Processed %d events.", eventCount)
func Join ¶
func Join(group string, subscription Subscription, zookeeper string, config *Config) (Consumer, error)
Join joins a Kafka consumer group, and returns a Consumer instance.
- `group` is the name of the group this consumer instance will join. All instances that form a consumer group should use the same name. A group name must be unique per Kafka cluster.
- `subscription` is an object that describes what partitions the group wants to consume. A single instance may end up consuming anywhere from none of them to all of them. Every running instance in a group should use the same subscription; the behavior is undefined if that is not the case.
- `zookeeper` is the Zookeeper connection string, e.g. "zk1:2181,zk2:2181,zk3:2181/chroot".
- `config` specifies the configuration. If it is nil, a default configuration is used.
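To make the shape of the `zookeeper` argument concrete, the sketch below splits a connection string into its comma-separated host list and its optional chroot path. It is purely illustrative: `splitZookeeper` is a hypothetical helper written with the standard library, and the parsing that the package and its Zookeeper client actually perform may differ in detail.

```go
package main

import (
	"fmt"
	"strings"
)

// splitZookeeper splits a connection string such as
// "zk1:2181,zk2:2181,zk3:2181/chroot" into its host list and chroot path.
// The chroot is empty when the string contains no "/".
// (Hypothetical helper; not part of the kafkaconsumer package.)
func splitZookeeper(conn string) (nodes []string, chroot string) {
	hosts := conn
	if i := strings.Index(conn, "/"); i >= 0 {
		hosts, chroot = conn[:i], conn[i:]
	}
	for _, n := range strings.Split(hosts, ",") {
		if n = strings.TrimSpace(n); n != "" {
			nodes = append(nodes, n)
		}
	}
	return nodes, chroot
}

func main() {
	nodes, chroot := splitZookeeper("zk1:2181,zk2:2181,zk3:2181/chroot")
	fmt.Println(nodes, chroot) // prints [zk1:2181 zk2:2181 zk3:2181] /chroot
}
```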
type Subscription ¶
type Subscription interface {
	// WatchPartitions returns a list of partitions that the consumer group should
	// consume, and a channel that will be fired if this list has changed.
	WatchPartitions(kazoo *kazoo.Kazoo) (kazoo.PartitionList, <-chan zk.Event, error)

	// JSON returns a JSON-encoded representation of the subscription, which will be
	// stored in Zookeeper alongside every running instance registration.
	JSON() ([]byte, error)
}
Subscription describes what topics/partitions a consumer instance is subscribed to. This can be a static list of topics, or a regular expression that acts as a whitelist or blacklist of topics.
The subscription is responsible for watching Zookeeper for changes in the list of topics or partitions, and for notifying the consumer so it can trigger a rebalance.
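The watch-and-notify contract described above can be sketched as a loop that reads the current partition list, waits for the change channel to fire, and then reads the list again. The code below is a simplified illustration using only the standard library. `watchFunc` is a hypothetical stand-in for WatchPartitions (it uses string slices and an empty-struct channel instead of kazoo.PartitionList and zk.Event), and `watchLoop` is not the package's actual rebalance logic.

```go
package main

import "fmt"

// watchFunc is a hypothetical stand-in for Subscription.WatchPartitions: it
// returns the current partition list and a channel that fires once on change.
type watchFunc func() (partitions []string, changed <-chan struct{})

// watchLoop calls rebalance with the current list, waits for the change
// channel to fire, and then re-reads the list. It returns when stop is closed.
func watchLoop(watch watchFunc, rebalance func([]string), stop <-chan struct{}) {
	for {
		partitions, changed := watch()
		rebalance(partitions)
		select {
		case <-changed:
			// The list changed: loop around to fetch it again and re-arm the watch.
		case <-stop:
			return
		}
	}
}

func main() {
	lists := [][]string{{"access_log/0"}, {"access_log/0", "access_log/1"}}
	stop := make(chan struct{})
	calls := 0
	watch := func() ([]string, <-chan struct{}) {
		ch := make(chan struct{})
		current := lists[calls]
		calls++
		if calls < len(lists) {
			close(ch) // simulate an immediate change notification
		} else {
			close(stop) // no more changes: end the demo
		}
		return current, ch
	}
	watchLoop(watch, func(p []string) { fmt.Println(len(p), "partitions") }, stop)
}
```

Calling the watch function again on every iteration reflects the fact that Zookeeper watches are one-time triggers and have to be re-registered after they fire.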
func BlacklistSubscription ¶
func BlacklistSubscription(pattern *regexp.Regexp) Subscription
BlacklistSubscription creates a subscription on topics that do not match a given regular expression. It will automatically subscribe to new topics that don't match the pattern when they are created.
func StaticSubscription ¶
func StaticSubscription(subscription map[string]int) Subscription
StaticSubscription creates a static subscription from a map of topics to the number of streams that will be used to consume each topic.
func TopicSubscription ¶
func TopicSubscription(topics ...string) Subscription
TopicSubscription creates a static subscription for a list of topics.
func WhitelistSubscription ¶
func WhitelistSubscription(pattern *regexp.Regexp) Subscription
WhitelistSubscription creates a subscription on topics that match a given regular expression. It will automatically subscribe to new topics that match the pattern when they are created.
type SubscriptionPattern ¶
type SubscriptionPattern string
const (
	SubscriptionPatternStatic    SubscriptionPattern = "static"
	SubscriptionPatternWhiteList SubscriptionPattern = "white_list"
	SubscriptionPatternBlackList SubscriptionPattern = "black_list"
)