Documentation ¶
Overview ¶
Package cluster provides cluster extensions for Sarama, enabling users to consume topics across multiple, balanced nodes.
It requires Kafka v0.9+ and follows the steps described in: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
Index ¶
- type Client
- type Config
- type Consumer
- func (c *Consumer) Close() (err error)
- func (c *Consumer) CommitOffsets() error
- func (c *Consumer) Errors() <-chan error
- func (c *Consumer) HighWaterMarks() map[string]map[int32]int64
- func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string)
- func (c *Consumer) MarkOffsets(s *OffsetStash)
- func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
- func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage
- func (c *Consumer) Notifications() <-chan *Notification
- func (c *Consumer) Subscriptions() map[string][]int32
- type Error
- type Notification
- type OffsetStash
- type Strategy
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	sarama.Config

	// Group is the namespace for group management properties
	Group struct {
		// The strategy to use for the allocation of partitions to consumers (defaults to StrategyRange)
		PartitionStrategy Strategy

		Offsets struct {
			Retry struct {
				// The number of retries when committing offsets (defaults to 3).
				Max int
			}
		}

		Session struct {
			// The allowed session timeout for registered consumers (defaults to 30s).
			// Must be within the allowed server range.
			Timeout time.Duration
		}

		Heartbeat struct {
			// Interval between each heartbeat (defaults to 3s). It should be no more
			// than 1/3rd of the Group.Session.Timeout setting
			Interval time.Duration
		}

		// Return specifies which group channels will be populated. If they are set to true,
		// you must read from the respective channels to prevent deadlock.
		Return struct {
			// If enabled, rebalance notifications will be returned on the
			// Notifications channel (default disabled).
			Notifications bool
		}

		Topics struct {
			// An additional whitelist of topics to subscribe to.
			Whitelist *regexp.Regexp
			// An additional blacklist of topics to avoid. If set, this will take precedence over
			// the Whitelist setting.
			Blacklist *regexp.Regexp
		}

		Member struct {
			// Custom metadata to include when joining the group. The user data for all joined members
			// can be retrieved by sending a DescribeGroupRequest to the broker that is the
			// coordinator for the group.
			UserData []byte
		}
	}
}
Config extends sarama.Config with a Group-specific namespace.
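The interaction between Group.Topics.Whitelist and Group.Topics.Blacklist can be easier to see in code. The following is a minimal, standard-library-only sketch and not the package's implementation: the type topicFilter, its constructor newTopicFilter, and the method allowsExtra are hypothetical names introduced for illustration. It assumes that a topic outside the explicitly subscribed list is picked up only when it matches the whitelist and does not match the blacklist.

```go
package main

import (
	"fmt"
	"regexp"
)

// topicFilter is a hypothetical stand-in for the Group.Topics settings.
type topicFilter struct {
	Whitelist *regexp.Regexp
	Blacklist *regexp.Regexp
}

// newTopicFilter compiles the given patterns; an empty pattern leaves the
// corresponding field unset (nil). It panics on an invalid pattern.
func newTopicFilter(whitelist, blacklist string) topicFilter {
	var f topicFilter
	if whitelist != "" {
		f.Whitelist = regexp.MustCompile(whitelist)
	}
	if blacklist != "" {
		f.Blacklist = regexp.MustCompile(blacklist)
	}
	return f
}

// allowsExtra reports whether a topic that was not explicitly subscribed to
// should be consumed in addition. The blacklist is checked first, so it
// wins whenever both patterns match (assumed behaviour).
func (f topicFilter) allowsExtra(topic string) bool {
	if f.Blacklist != nil && f.Blacklist.MatchString(topic) {
		return false
	}
	return f.Whitelist != nil && f.Whitelist.MatchString(topic)
}

func main() {
	f := newTopicFilter(`^events\.`, `\.internal$`)
	for _, topic := range []string{"events.orders", "events.audit.internal", "metrics"} {
		fmt.Printf("%-22s extra=%v\n", topic, f.allowsExtra(topic))
	}
}
```

In this sketch "events.audit.internal" matches both patterns and is rejected, which mirrors the documented rule that the blacklist takes precedence.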
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a cluster group consumer
Example ¶
This example shows how to use the consumer to read messages from multiple topics.
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {
	// init (custom) config, enable errors and notifications
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true

	// init consumer
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my_topic", "other_topic"}
	consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// watch errors
	go func() {
		for err := range consumer.Errors() {
			log.Printf("Error: %s\n", err.Error())
		}
	}()

	// watch notifications
	go func() {
		for note := range consumer.Notifications() {
			log.Printf("Rebalanced: %+v\n", note)
		}
	}()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume messages
	for {
		select {
		case msg := <-consumer.Messages():
			fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
			consumer.MarkOffset(msg, "") // mark message as processed
		case <-signals:
			return
		}
	}
}
func NewConsumer ¶
func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error)
NewConsumer initializes a new consumer
func NewConsumerFromClient ¶
NewConsumerFromClient initializes a new consumer from an existing client
func (*Consumer) CommitOffsets ¶
CommitOffsets manually commits marked offsets
func (*Consumer) Errors ¶
Errors returns a read channel of errors that occur during offset management, if enabled. By default, errors are logged and not returned over this channel. If you want to implement any custom error handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.
func (*Consumer) HighWaterMarks ¶
HighWaterMarks returns the current high water marks for each topic and partition. Consistency between partitions is not guaranteed since high water marks are updated separately.
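One common use of high water marks is estimating consumer lag. The sketch below is illustrative only and uses plain maps shaped like the return value of HighWaterMarks. The function lag and the nextOffsets argument are hypothetical. It assumes that a high water mark is the offset the next produced message will receive, and that nextOffsets holds the offset of the next message to consume (last processed offset plus one).

```go
package main

import "fmt"

// lag returns, per topic and partition, how many messages lie between the
// next offset to consume and the high water mark. Partitions without a
// known position in nextOffsets are skipped.
func lag(highWaterMarks, nextOffsets map[string]map[int32]int64) map[string]map[int32]int64 {
	out := make(map[string]map[int32]int64)
	for topic, partitions := range highWaterMarks {
		for partition, hwm := range partitions {
			next, ok := nextOffsets[topic][partition]
			if !ok {
				continue
			}
			if out[topic] == nil {
				out[topic] = make(map[int32]int64)
			}
			out[topic][partition] = hwm - next
		}
	}
	return out
}

func main() {
	// Sample values; in a real program the first map would come from
	// consumer.HighWaterMarks() and the second from your own bookkeeping.
	hwm := map[string]map[int32]int64{"my_topic": {0: 120, 1: 80}}
	next := map[string]map[int32]int64{"my_topic": {0: 100, 1: 80}}
	fmt.Println(lag(hwm, next))
}
```

Because the marks for different partitions are updated separately, the per-partition values are best read as estimates rather than a consistent snapshot.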
func (*Consumer) MarkOffset ¶
func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string)
MarkOffset marks the provided message as processed, alongside a metadata string that represents the state of the partition consumer at that point in time. The metadata string can be used by another consumer to restore that state, so it can resume consumption.
Note: calling MarkOffset does not necessarily commit the offset to the backend store immediately for efficiency reasons, and it may never be committed if your application crashes. This means that you may end up processing the same message twice, and your processing should ideally be idempotent.
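Since a message may be delivered again after a crash, one way to keep processing idempotent is to remember the highest offset already applied per partition and skip anything at or below it. The following is a minimal sketch under that assumption. The types message, partitionKey and dedup are hypothetical stand-ins and not part of this package, and the state is kept in memory only. A real application would need to store it durably together with the processing results.

```go
package main

import "fmt"

// message is a hypothetical, minimal stand-in for sarama.ConsumerMessage.
type message struct {
	Topic     string
	Partition int32
	Offset    int64
}

// partitionKey identifies a single partition of a topic.
type partitionKey struct {
	Topic     string
	Partition int32
}

// dedup remembers the highest offset applied per partition so that a
// redelivered message can be detected and skipped.
type dedup struct {
	applied map[partitionKey]int64
}

func newDedup() *dedup {
	return &dedup{applied: make(map[partitionKey]int64)}
}

// apply runs fn for msg unless the same or a later offset of that partition
// was applied before. It reports whether fn was run.
func (d *dedup) apply(msg message, fn func(message)) bool {
	key := partitionKey{Topic: msg.Topic, Partition: msg.Partition}
	if last, ok := d.applied[key]; ok && msg.Offset <= last {
		return false
	}
	fn(msg)
	d.applied[key] = msg.Offset
	return true
}

func main() {
	d := newDedup()
	total := 0
	// Offset 1 arrives twice, as could happen when the process crashed
	// after processing it but before the offset was committed.
	for _, off := range []int64{0, 1, 1, 2} {
		msg := message{Topic: "my_topic", Partition: 0, Offset: off}
		ran := d.apply(msg, func(_ message) { total++ })
		fmt.Printf("offset=%d applied=%v\n", off, ran)
	}
	fmt.Println("messages applied:", total)
}
```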
func (*Consumer) MarkOffsets ¶
func (c *Consumer) MarkOffsets(s *OffsetStash)
MarkOffsets marks stashed offsets as processed. See MarkOffset for additional explanation.
func (*Consumer) MarkPartitionOffset ¶
func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
MarkPartitionOffset marks an offset of the provided topic/partition as processed. See MarkOffset for additional explanation.
func (*Consumer) Messages ¶
func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage
Messages returns the read channel for the messages that are returned by the broker.
func (*Consumer) Notifications ¶
func (c *Consumer) Notifications() <-chan *Notification
Notifications returns a channel of Notifications that occur during consumer rebalancing. Notifications will only be emitted over this channel if your config's Group.Return.Notifications setting is set to true.
func (*Consumer) Subscriptions ¶
Subscriptions returns the consumed topics and partitions
type Error ¶
type Error struct {
	Ctx string
	// contains filtered or unexported fields
}
Error instances are wrappers for internal errors with a context and may be returned through the consumer's Errors() channel
type Notification ¶
type Notification struct {
	// Claimed contains topic/partitions that were claimed by this rebalance cycle
	Claimed map[string][]int32

	// Released contains topic/partitions that were released as part of this rebalance cycle
	Released map[string][]int32

	// Current are topic/partitions that are currently claimed to the consumer
	Current map[string][]int32
}
Notification events are emitted by the consumers on rebalancing
type OffsetStash ¶
type OffsetStash struct {
// contains filtered or unexported fields
}
OffsetStash allows you to accumulate offsets and mark them as processed in bulk.
func (*OffsetStash) MarkOffset ¶
func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string)
MarkOffset stashes the provided message offset
func (*OffsetStash) MarkPartitionOffset ¶
func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
MarkPartitionOffset stashes the offset for the provided topic/partition combination
func (*OffsetStash) Offsets ¶
func (s *OffsetStash) Offsets() map[string]int64
Offsets returns the latest stashed offsets by topic-partition
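To illustrate the idea of stashing offsets and handing them over in bulk, here is a small self-contained sketch. It is not the OffsetStash implementation: the type stash and its methods mark and drain are hypothetical. The "topic-partition" key format and the choice to keep only the highest offset per partition are assumptions made for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// stash accumulates the highest offset seen per topic/partition.
type stash struct {
	mu      sync.Mutex
	offsets map[string]int64
}

func newStash() *stash {
	return &stash{offsets: make(map[string]int64)}
}

// mark records offset for the topic/partition pair unless a higher offset
// is already stashed for it.
func (s *stash) mark(topic string, partition int32, offset int64) {
	s.mu.Lock()
	defer s.mu.Unlock()

	key := fmt.Sprintf("%s-%d", topic, partition) // assumed key format
	if cur, ok := s.offsets[key]; !ok || offset > cur {
		s.offsets[key] = offset
	}
}

// drain returns everything stashed so far and resets the stash, so the
// caller can mark the whole batch as processed in one step.
func (s *stash) drain() map[string]int64 {
	s.mu.Lock()
	defer s.mu.Unlock()

	out := s.offsets
	s.offsets = make(map[string]int64)
	return out
}

func main() {
	s := newStash()
	s.mark("my_topic", 0, 41)
	s.mark("my_topic", 0, 42)
	s.mark("other_topic", 3, 7)

	for key, offset := range s.drain() {
		fmt.Printf("%s -> %d\n", key, offset)
	}
}
```

With the package itself, the corresponding flow would be to call MarkOffset on an OffsetStash for each processed message and then pass the stash to the consumer's MarkOffsets once the batch is complete.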
type Strategy ¶
type Strategy string
Strategy for partition-to-consumer assignment
const (
	// StrategyRange is the default and assigns partition ranges to consumers.
	// Example with six partitions and two consumers:
	//   C1: [0, 1, 2]
	//   C2: [3, 4, 5]
	StrategyRange Strategy = "range"

	// StrategyRoundRobin assigns partitions by alternating over consumers.
	// Example with six partitions and two consumers:
	//   C1: [0, 2, 4]
	//   C2: [1, 3, 5]
	StrategyRoundRobin Strategy = "roundrobin"
)
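The difference between the two strategies can be sketched with two small functions that reproduce the six-partition, two-consumer examples from the comments above. This is an illustration only, not the package's balancing code. The names assignRange and assignRoundRobin are hypothetical, and the way leftover partitions are distributed when the count does not divide evenly is a choice made for this sketch.

```go
package main

import "fmt"

// assignRange splits partitions into contiguous chunks, one per consumer.
func assignRange(partitions []int32, consumers int) [][]int32 {
	if consumers <= 0 {
		return nil
	}
	out := make([][]int32, consumers)
	n := len(partitions)
	for i := 0; i < consumers; i++ {
		lo := i * n / consumers
		hi := (i + 1) * n / consumers
		out[i] = append([]int32{}, partitions[lo:hi]...)
	}
	return out
}

// assignRoundRobin deals partitions out to consumers one at a time.
func assignRoundRobin(partitions []int32, consumers int) [][]int32 {
	if consumers <= 0 {
		return nil
	}
	out := make([][]int32, consumers)
	for i, p := range partitions {
		out[i%consumers] = append(out[i%consumers], p)
	}
	return out
}

func main() {
	partitions := []int32{0, 1, 2, 3, 4, 5}
	fmt.Println("range:     ", assignRange(partitions, 2))      // [[0 1 2] [3 4 5]]
	fmt.Println("roundrobin:", assignRoundRobin(partitions, 2)) // [[0 2 4] [1 3 5]]
}
```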