cluster

package module
v2.1.7+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2017 License: MIT Imports: 8 Imported by: 1

README

Sarama Cluster

GoDoc Build Status Go Report Card License

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 (and later).

Documentation

Documentation and examples are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster

Example

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

// main joins a consumer group, prints every message it receives, and exits
// cleanly when the process is interrupted.
func main() {
	// Start from the default cluster config and opt in to receiving errors
	// and rebalance notifications on their respective channels.
	cfg := cluster.NewConfig()
	cfg.Consumer.Return.Errors = true
	cfg.Group.Return.Notifications = true

	// Create the group consumer for the topics of interest.
	var (
		addrs      = []string{"127.0.0.1:9092"}
		subscribed = []string{"my_topic", "other_topic"}
	)
	consumer, err := cluster.NewConsumer(addrs, "my-consumer-group", subscribed, cfg)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// An interrupt (SIGINT) ends the loop below.
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)

	// Multiplex over messages, errors, notifications and the interrupt signal.
	for {
		select {
		case <-interrupt:
			return
		case m, ok := <-consumer.Messages():
			if !ok {
				continue
			}
			fmt.Printf("%s/%d/%d\t%s\t%s\n", m.Topic, m.Partition, m.Offset, m.Key, m.Value)
			// Mark the message as processed so its offset can be committed.
			consumer.MarkOffset(m, "")
		case cerr, ok := <-consumer.Errors():
			if !ok {
				continue
			}
			log.Printf("Error: %s\n", cerr.Error())
		case note, ok := <-consumer.Notifications():
			if !ok {
				continue
			}
			log.Printf("Rebalanced: %+v\n", note)
		}
	}
}

Running tests

You need to install Ginkgo & Gomega to run tests. Please see http://onsi.github.io/ginkgo for more details.

To run tests, call:

$ make test

Troubleshooting

Consumer not receiving any messages?

By default, sarama's Config.Consumer.Offsets.Initial is set to sarama.OffsetNewest. This means that in the event that a brand new consumer is created, and it has never committed any offsets to kafka, it will only receive messages starting from the message after the current one that was written.

If you wish to receive all messages (from the start of all messages in the topic) in the event that a consumer does not have any offsets committed to kafka, you need to set Config.Consumer.Offsets.Initial to sarama.OffsetOldest.

Documentation

Overview

Package cluster provides cluster extensions for Sarama, enabling users to consume topics across multiple, balanced nodes.

It requires Kafka v0.9+ and follows the step-by-step guide described in: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	sarama.Client
	// contains filtered or unexported fields
}

Client is a group client

func NewClient

func NewClient(addrs []string, config *Config) (*Client, error)

NewClient creates a new client instance

type Config

// Config extends sarama.Config with a Group-specific namespace.
type Config struct {
	sarama.Config

	// Group is the namespace for group management properties.
	Group struct {
		// The strategy to use for the allocation of partitions to consumers (defaults to StrategyRange).
		PartitionStrategy Strategy
		Offsets           struct {
			Retry struct {
				// The number of retries when committing offsets (defaults to 3).
				Max int
			}
		}
		Session struct {
			// The allowed session timeout for registered consumers (defaults to 30s).
			// Must be within the allowed server range.
			Timeout time.Duration
		}
		Heartbeat struct {
			// Interval between each heartbeat (defaults to 3s). It should be no more
			// than 1/3rd of the Group.Session.Timeout setting.
			Interval time.Duration
		}
		// Return specifies which group channels will be populated. If they are set to true,
		// you must read from the respective channels to prevent deadlock.
		Return struct {
			// If enabled, rebalance notifications will be returned on the
			// Notifications channel (default disabled).
			Notifications bool
		}

		Topics struct {
			// An additional whitelist of topics to subscribe to.
			Whitelist *regexp.Regexp
			// An additional blacklist of topics to avoid. If set, this will take
			// precedence over the Whitelist setting.
			Blacklist *regexp.Regexp
		}

		Member struct {
			// Custom metadata to include when joining the group. The user data for all joined members
			// can be retrieved by sending a DescribeGroupRequest to the broker that is the
			// coordinator for the group.
			UserData []byte
		}
	}
}

Config extends sarama.Config with Group specific namespace

func NewConfig

func NewConfig() *Config

NewConfig returns a new configuration instance with sane defaults.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks a Config instance. It will return a sarama.ConfigurationError if the specified values don't make sense.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a cluster group consumer

Example

This example shows how to use the consumer to read messages from multiple topics.

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

// main joins a consumer group, prints every message it receives, and exits
// cleanly when the process is interrupted.
func main() {
	// Start from the default cluster config and opt in to receiving errors
	// and rebalance notifications on their respective channels.
	cfg := cluster.NewConfig()
	cfg.Consumer.Return.Errors = true
	cfg.Group.Return.Notifications = true

	// Create the group consumer for the topics of interest.
	var (
		addrs      = []string{"127.0.0.1:9092"}
		subscribed = []string{"my_topic", "other_topic"}
	)
	consumer, err := cluster.NewConsumer(addrs, "my-consumer-group", subscribed, cfg)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// An interrupt (SIGINT) ends the loop below.
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)

	// Multiplex over messages, errors, notifications and the interrupt signal.
	for {
		select {
		case <-interrupt:
			return
		case m, ok := <-consumer.Messages():
			if !ok {
				continue
			}
			fmt.Printf("%s/%d/%d\t%s\t%s\n", m.Topic, m.Partition, m.Offset, m.Key, m.Value)
			// Mark the message as processed so its offset can be committed.
			consumer.MarkOffset(m, "")
		case cerr, ok := <-consumer.Errors():
			if !ok {
				continue
			}
			log.Printf("Error: %s\n", cerr.Error())
		case note, ok := <-consumer.Notifications():
			if !ok {
				continue
			}
			log.Printf("Rebalanced: %+v\n", note)
		}
	}
}
Output:

func NewConsumer

func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error)

NewConsumer initializes a new consumer

func NewConsumerFromClient

func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error)

NewConsumerFromClient initializes a new consumer from an existing client

func (*Consumer) Close

func (c *Consumer) Close() (err error)

Close safely closes the consumer and releases all resources

func (*Consumer) CommitOffsets

func (c *Consumer) CommitOffsets() error

CommitOffsets manually commits marked offsets

func (*Consumer) Errors

func (c *Consumer) Errors() <-chan error

Errors returns a read channel of errors that occur during offset management, if enabled. By default, errors are logged and not returned over this channel. If you want to implement any custom error handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.

func (*Consumer) HighWaterMarks

func (c *Consumer) HighWaterMarks() map[string]map[int32]int64

HighWaterMarks returns the current high water marks for each topic and partition. Consistency between partitions is not guaranteed since high water marks are updated separately.

func (*Consumer) MarkOffset

func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string)

MarkOffset marks the provided message as processed, alongside a metadata string that represents the state of the partition consumer at that point in time. The metadata string can be used by another consumer to restore that state, so it can resume consumption.

Note: calling MarkOffset does not necessarily commit the offset to the backend store immediately for efficiency reasons, and it may never be committed if your application crashes. This means that you may end up processing the same message twice, and your processing should ideally be idempotent.

func (*Consumer) MarkOffsets

func (c *Consumer) MarkOffsets(s *OffsetStash)

MarkOffsets marks stashed offsets as processed. See MarkOffset for additional explanation.

func (*Consumer) MarkPartitionOffset

func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)

MarkPartitionOffset marks an offset of the provided topic/partition as processed. See MarkOffset for additional explanation.

func (*Consumer) Messages

func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage

Messages returns the read channel for the messages that are returned by the broker.

func (*Consumer) Notifications

func (c *Consumer) Notifications() <-chan *Notification

Notifications returns a channel of Notifications that occur during consumer rebalancing. Notifications will only be emitted over this channel if your config's Group.Return.Notifications setting is set to true.

func (*Consumer) Subscriptions

func (c *Consumer) Subscriptions() map[string][]int32

Subscriptions returns the consumed topics and partitions

type Error

type Error struct {
	Ctx string
	// contains filtered or unexported fields
}

Error instances are wrappers for internal errors with a context and may be returned through the consumer's Errors() channel

type Notification

// Notification events are emitted by the consumers on rebalancing.
type Notification struct {
	// Claimed contains topic/partitions that were claimed by this rebalance cycle
	Claimed map[string][]int32

	// Released contains topic/partitions that were released as part of this rebalance cycle
	Released map[string][]int32

	// Current contains topic/partitions that are currently claimed by the consumer
	Current map[string][]int32
}

Notification events are emitted by the consumers on rebalancing

type OffsetStash

type OffsetStash struct {
	// contains filtered or unexported fields
}

OffsetStash allows accumulating offsets and marking them as processed in bulk

func NewOffsetStash

func NewOffsetStash() *OffsetStash

NewOffsetStash inits a blank stash

func (*OffsetStash) MarkOffset

func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string)

MarkOffset stashes the provided message offset

func (*OffsetStash) MarkPartitionOffset

func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)

MarkPartitionOffset stashes the offset for the provided topic/partition combination

func (*OffsetStash) Offsets

func (s *OffsetStash) Offsets() map[string]int64

Offsets returns the latest stashed offsets by topic-partition

type Strategy

// Strategy names the policy used for assigning partitions to consumers.
type Strategy string

Strategy for partition to consumer assignment

// Partition assignment strategies.
const (
	// StrategyRange is the default and assigns partition ranges to consumers.
	// Example with six partitions and two consumers:
	//   C1: [0, 1, 2]
	//   C2: [3, 4, 5]
	StrategyRange Strategy = "range"

	// StrategyRoundRobin assigns partitions by alternating over consumers.
	// Example with six partitions and two consumers:
	//   C1: [0, 2, 4]
	//   C2: [1, 3, 5]
	StrategyRoundRobin Strategy = "roundrobin"
)

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL