cluster

package
v0.0.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2024 License: MIT, MIT Imports: 9 Imported by: 0

README

Sarama Cluster

GoDoc Build Status Go Report Card License

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 (and later).

Documentation

Documentation and example are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster

Examples

Consumers have two modes of operation. In the default multiplexed mode messages (and errors) of multiple topics and partitions are all passed to the single channel:

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {

	// init (custom) config, enable errors and notifications
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true

	// init consumer
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my_topic", "other_topic"}
	consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume errors
	go func() {
		for err := range consumer.Errors() {
			log.Printf("Error: %s\n", err.Error())
		}
	}()

	// consume notifications
	go func() {
		for ntf := range consumer.Notifications() {
			log.Printf("Rebalanced: %+v\n", ntf)
		}
	}()

	// consume messages, watch signals
	for {
		select {
		case msg, ok := <-consumer.Messages():
			if ok {
				fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
				consumer.MarkOffset(msg, "")	// mark message as processed
			}
		case <-signals:
			return
		}
	}
}

Users who require access to individual partitions can use the partitioned mode which exposes access to partition-level consumers:

package main

import (
  "fmt"
  "log"
  "os"
  "os/signal"

  cluster "github.com/bsm/sarama-cluster"
)

func main() {

	// init (custom) config, set mode to ConsumerModePartitions
	config := cluster.NewConfig()
	config.Group.Mode = cluster.ConsumerModePartitions

	// init consumer
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my_topic", "other_topic"}
	consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume partitions
	for {
		select {
		case part, ok := <-consumer.Partitions():
			if !ok {
				return
			}

			// start a separate goroutine to consume messages
			go func(pc cluster.PartitionConsumer) {
				for msg := range pc.Messages() {
					fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
					consumer.MarkOffset(msg, "")	// mark message as processed
				}
			}(part)
		case <-signals:
			return
		}
	}
}

Running tests

You need to install Ginkgo & Gomega to run tests. Please see http://onsi.github.io/ginkgo for more details.

To run tests, call:

$ make test

Troubleshooting

Consumer not receiving any messages?

By default, sarama's Config.Consumer.Offsets.Initial is set to sarama.OffsetNewest. This means that in the event that a brand new consumer is created, and it has never committed any offsets to kafka, it will only receive messages starting from the message after the current one that was written.

If you wish to receive all messages (from the start of all messages in the topic) in the event that a consumer does not have any offsets committed to kafka, you need to set Config.Consumer.Offsets.Initial to sarama.OffsetOldest.

Documentation

Overview

Package cluster provides cluster extensions for Sarama, enabing users to consume topics across from multiple, balanced nodes.

It requires Kafka v0.9+ and follows the steps guide, described in: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	sarama.Client
	// contains filtered or unexported fields
}

Client is a group client

func NewClient

func NewClient(addrs []string, config *Config) (*Client, error)

NewClient creates a new client instance

func (*Client) ClusterConfig

func (c *Client) ClusterConfig() *Config

ClusterConfig returns the cluster configuration.

type Config

type Config struct {
	sarama.Config

	// Group is the namespace for group management properties
	Group struct {

		// The strategy to use for the allocation of partitions to consumers (defaults to StrategyRange)
		PartitionStrategy Strategy

		// By default, messages and errors from the subscribed topics and partitions are all multiplexed and
		// made available through the consumer's Messages() and Errors() channels.
		//
		// Users who require low-level access can enable ConsumerModePartitions where individual partitions
		// are exposed on the Partitions() channel. Messages and errors must then be consumed on the partitions
		// themselves.
		Mode ConsumerMode

		Offsets struct {
			Retry struct {
				// The numer retries when committing offsets (defaults to 3).
				Max int
			}
			Synchronization struct {
				// The duration allowed for other clients to commit their offsets before resumption in this client, e.g. during a rebalance
				// NewConfig sets this to the Consumer.MaxProcessingTime duration of the Sarama configuration
				DwellTime time.Duration
			}
		}

		Session struct {
			// The allowed session timeout for registered consumers (defaults to 30s).
			// Must be within the allowed server range.
			Timeout time.Duration
		}

		Heartbeat struct {
			// Interval between each heartbeat (defaults to 3s). It should be no more
			// than 1/3rd of the Group.Session.Timout setting
			Interval time.Duration
		}

		// Return specifies which group channels will be populated. If they are set to true,
		// you must read from the respective channels to prevent deadlock.
		Return struct {
			// If enabled, rebalance notification will be returned on the
			// Notifications channel (default disabled).
			Notifications bool
		}

		Topics struct {
			// An additional whitelist of topics to subscribe to.
			Whitelist *regexp.Regexp
			// An additional blacklist of topics to avoid. If set, this will precede over
			// the Whitelist setting.
			Blacklist *regexp.Regexp
		}

		Member struct {
			// Custom metadata to include when joining the group. The user data for all joined members
			// can be retrieved by sending a DescribeGroupRequest to the broker that is the
			// coordinator for the group.
			UserData []byte
		}
	}
}

Config extends sarama.Config with Group specific namespace

Example (Whitelist)

This example shows how to use the consumer with topic whitelists.

package main

import (
	"fmt"
	"os"
	"regexp"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {

	// init (custom) config, enable errors and notifications
	config := cluster.NewConfig()
	config.Group.Topics.Whitelist = regexp.MustCompile(`myservice.*`)

	// init consumer
	consumer, err := cluster.NewConsumer([]string{"127.0.0.1:9092"}, "my-consumer-group", nil, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// consume messages
	msg := <-consumer.Messages()
	fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
}
Output:

func NewConfig

func NewConfig() *Config

NewConfig returns a new configuration instance with sane defaults.

func (*Config) Validate

func (c *Config) Validate() error

Validate checks a Config instance. It will return a sarama.ConfigurationError if the specified values don't make sense.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a cluster group consumer

Example

This example shows how to use the consumer to read messages from a multiple topics through a multiplexed channel.

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {

	// init (custom) config, enable errors and notifications
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true

	// init consumer
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my_topic", "other_topic"}
	consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume errors
	go func() {
		for err := range consumer.Errors() {
			log.Printf("Error: %s\n", err.Error())
		}
	}()

	// consume notifications
	go func() {
		for ntf := range consumer.Notifications() {
			log.Printf("Rebalanced: %+v\n", ntf)
		}
	}()

	// consume messages, watch signals
	for {
		select {
		case msg, ok := <-consumer.Messages():
			if ok {
				fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
				consumer.MarkOffset(msg, "") // mark message as processed
			}
		case <-signals:
			return
		}
	}
}
Output:

func NewConsumer

func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error)

NewConsumer initializes a new consumer

func NewConsumerFromClient

func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error)

NewConsumerFromClient initializes a new consumer from an existing client.

Please note that clients cannot be shared between consumers (due to Kafka internals), they can only be re-used which requires the user to call Close() on the first consumer before using this method again to initialize another one. Attempts to use a client with more than one consumer at a time will return errors.

func (*Consumer) Close

func (c *Consumer) Close() (err error)

Close safely closes the consumer and releases all resources

func (*Consumer) CommitOffsets

func (c *Consumer) CommitOffsets() error

CommitOffsets allows to manually commit previously marked offsets. By default there is no need to call this function as the consumer will commit offsets automatically using the Config.Consumer.Offsets.CommitInterval setting.

Please be aware that calling this function during an internal rebalance cycle may return broker errors (e.g. sarama.ErrUnknownMemberId or sarama.ErrIllegalGeneration).

func (*Consumer) Errors

func (c *Consumer) Errors() <-chan error

Errors returns a read channel of errors that occur during offset management, if enabled. By default, errors are logged and not returned over this channel. If you want to implement any custom error handling, set your config's Consumer.Return.Errors setting to true, and read from this channel.

func (*Consumer) HighWaterMarks

func (c *Consumer) HighWaterMarks() map[string]map[int32]int64

HighWaterMarks returns the current high water marks for each topic and partition Consistency between partitions is not guaranteed since high water marks are updated separately.

func (*Consumer) MarkOffset

func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string)

MarkOffset marks the provided message as processed, alongside a metadata string that represents the state of the partition consumer at that point in time. The metadata string can be used by another consumer to restore that state, so it can resume consumption.

Note: calling MarkOffset does not necessarily commit the offset to the backend store immediately for efficiency reasons, and it may never be committed if your application crashes. This means that you may end up processing the same message twice, and your processing should ideally be idempotent.

func (*Consumer) MarkOffsets

func (c *Consumer) MarkOffsets(s *OffsetStash)

MarkOffsets marks stashed offsets as processed. See MarkOffset for additional explanation.

func (*Consumer) MarkPartitionOffset

func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)

MarkPartitionOffset marks an offset of the provided topic/partition as processed. See MarkOffset for additional explanation.

func (*Consumer) Messages

func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage

Messages returns the read channel for the messages that are returned by the broker.

This channel will only return if Config.Group.Mode option is set to ConsumerModeMultiplex (default).

func (*Consumer) Notifications

func (c *Consumer) Notifications() <-chan *Notification

Notifications returns a channel of Notifications that occur during consumer rebalancing. Notifications will only be emitted over this channel, if your config's Group.Return.Notifications setting to true.

func (*Consumer) Partitions

func (c *Consumer) Partitions() <-chan PartitionConsumer

Partitions returns the read channels for individual partitions of this broker.

This will channel will only return if Config.Group.Mode option is set to ConsumerModePartitions.

The Partitions() channel must be listened to for the life of this consumer; when a rebalance happens old partitions will be closed (naturally come to completion) and new ones will be emitted. The returned channel will only close when the consumer is completely shut down.

Example

This example shows how to use the consumer to read messages through individual partitions.

package main

import (
	"fmt"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {

	// init (custom) config, set mode to ConsumerModePartitions
	config := cluster.NewConfig()
	config.Group.Mode = cluster.ConsumerModePartitions

	// init consumer
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my_topic", "other_topic"}
	consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume partitions
	for {
		select {
		case part, ok := <-consumer.Partitions():
			if !ok {
				return
			}

			// start a separate goroutine to consume messages
			go func(pc cluster.PartitionConsumer) {
				for msg := range pc.Messages() {
					fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
					consumer.MarkOffset(msg, "") // mark message as processed
				}
			}(part)
		case <-signals:
			return
		}
	}
}
Output:

func (*Consumer) ResetOffset

func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string)

ResetOffsets marks the provided message as processed, alongside a metadata string that represents the state of the partition consumer at that point in time. The metadata string can be used by another consumer to restore that state, so it can resume consumption.

Difference between ResetOffset and MarkOffset is that it allows to rewind to an earlier offset

func (*Consumer) ResetOffsets

func (c *Consumer) ResetOffsets(s *OffsetStash)

ResetOffsets marks stashed offsets as processed. See ResetOffset for additional explanation.

func (*Consumer) ResetPartitionOffset

func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string)

ResetPartitionOffset marks an offset of the provided topic/partition as processed. See ResetOffset for additional explanation.

func (*Consumer) Subscriptions

func (c *Consumer) Subscriptions() map[string][]int32

Subscriptions returns the consumed topics and partitions

type ConsumerMode

type ConsumerMode uint8
const (
	ConsumerModeMultiplex ConsumerMode = iota
	ConsumerModePartitions
)

type Error

type Error struct {
	Ctx string
	// contains filtered or unexported fields
}

Error instances are wrappers for internal errors with a context and may be returned through the consumer's Errors() channel

type Notification

type Notification struct {
	// Type exposes the notification type
	Type NotificationType

	// Claimed contains topic/partitions that were claimed by this rebalance cycle
	Claimed map[string][]int32

	// Released contains topic/partitions that were released as part of this rebalance cycle
	Released map[string][]int32

	// Current are topic/partitions that are currently claimed to the consumer
	Current map[string][]int32
}

Notification are state events emitted by the consumers on rebalance

type NotificationType

type NotificationType uint8

NotificationType defines the type of notification

const (
	UnknownNotification NotificationType = iota
	RebalanceStart
	RebalanceOK
	RebalanceError
)

func (NotificationType) String

func (t NotificationType) String() string

String describes the notification type

type OffsetStash

type OffsetStash struct {
	// contains filtered or unexported fields
}

OffsetStash allows to accumulate offsets and mark them as processed in a bulk

func NewOffsetStash

func NewOffsetStash() *OffsetStash

NewOffsetStash inits a blank stash

func (*OffsetStash) MarkOffset

func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string)

MarkOffset stashes the provided message offset

func (*OffsetStash) MarkPartitionOffset

func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)

MarkPartitionOffset stashes the offset for the provided topic/partition combination

func (*OffsetStash) Offsets

func (s *OffsetStash) Offsets() map[string]int64

Offsets returns the latest stashed offsets by topic-partition

func (*OffsetStash) ResetOffset

func (s *OffsetStash) ResetOffset(msg *sarama.ConsumerMessage, metadata string)

ResetOffset stashes the provided message offset See ResetPartitionOffset for explanation

func (*OffsetStash) ResetPartitionOffset

func (s *OffsetStash) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string)

ResetPartitionOffset stashes the offset for the provided topic/partition combination. Difference between ResetPartitionOffset and MarkPartitionOffset is that, ResetPartitionOffset supports earlier offsets

type PartitionConsumer

type PartitionConsumer interface {
	sarama.PartitionConsumer

	// Topic returns the consumed topic name
	Topic() string

	// Partition returns the consumed partition
	Partition() int32

	// InitialOffset returns the offset used for creating the PartitionConsumer instance.
	// The returned offset can be a literal offset, or OffsetNewest, or OffsetOldest
	InitialOffset() int64

	// MarkOffset marks the offset of a message as preocessed.
	MarkOffset(offset int64, metadata string)

	// ResetOffset resets the offset to a previously processed message.
	ResetOffset(offset int64, metadata string)
}

PartitionConsumer allows code to consume individual partitions from the cluster.

See docs for Consumer.Partitions() for more on how to implement this.

type Strategy

type Strategy string

Strategy for partition to consumer assignement

const (
	// StrategyRange is the default and assigns partition ranges to consumers.
	// Example with six partitions and two consumers:
	//   C1: [0, 1, 2]
	//   C2: [3, 4, 5]
	StrategyRange Strategy = "range"

	// StrategyRoundRobin assigns partitions by alternating over consumers.
	// Example with six partitions and two consumers:
	//   C1: [0, 2, 4]
	//   C2: [1, 3, 5]
	StrategyRoundRobin Strategy = "roundrobin"
)

Directories

Path Synopsis
cmd

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL