consumergroup

package
v0.0.0-...-7c36d94 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 2, 2015 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// AlreadyClosing is the error value whose message reports that the
	// consumer group is already shutting down.
	AlreadyClosing = errors.New("The consumer group is already shutting down.")
)
View Source
var (
	// UncleanClose is the error value whose message reports that not all
	// offsets were committed before shutdown was completed.
	UncleanClose = errors.New("Not all offsets were committed before shutdown was completed")
)

Functions

This section is empty.

Types

type Config

// Config holds the settings for a consumer group. It embeds *sarama.Config
// and adds a kazoo (Zookeeper) configuration plus offset-handling options.
type Config struct {
	// Embedded sarama configuration; its fields are promoted onto Config.
	*sarama.Config

	Zookeeper *kazoo.Config

	Offsets struct {
		Initial           int64         // The initial offset method to use if the consumer has no previously stored offset. Must be either sarama.OffsetOldest (default) or sarama.OffsetNewest.
		ProcessingTimeout time.Duration // Time to wait for all the offsets for a partition to be processed after stopping to consume from it. Defaults to 1 minute.
		CommitInterval    time.Duration // The interval between which the processed offsets are committed.
		ResetOffsets      bool          // Resets the offsets for the consumergroup so that it won't resume from where it left off previously.
	}
}

func NewConfig

func NewConfig() *Config

func (*Config) Validate

func (cgc *Config) Validate() error

type ConsumerGroup

type ConsumerGroup struct {
	// contains filtered or unexported fields
}

The ConsumerGroup type holds all the information for a consumer that is part of a consumer group. Call JoinConsumerGroup to start a consumer.

Example
// Join the consumer group; nil is passed as the config argument.
consumer, consumerErr := JoinConsumerGroup(
	"ExampleConsumerGroup",
	[]string{TopicWithSinglePartition, TopicWithMultiplePartitions},
	zookeeperPeers,
	nil)

if consumerErr != nil {
	log.Fatalln(consumerErr)
}

// Close the consumer group when the process receives an interrupt signal.
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
	<-c
	// Close returns an error; report it instead of silently dropping it.
	if err := consumer.Close(); err != nil {
		log.Println("closing consumer group:", err)
	}
}()

eventCount := 0

for event := range consumer.Messages() {
	// Process event
	log.Println(string(event.Value))
	eventCount++

	// Ack event. CommitUpto returns an error, so log it and keep consuming
	// rather than discarding it.
	if err := consumer.CommitUpto(event); err != nil {
		log.Println("committing offset:", err)
	}
}

log.Printf("Processed %d events.", eventCount)
Output:

func JoinConsumerGroup

func JoinConsumerGroup(name string, topics []string, zookeeper []string, config *Config) (cg *ConsumerGroup, err error)

Connects to a consumer group, using Zookeeper for auto-discovery

func (*ConsumerGroup) Close

func (cg *ConsumerGroup) Close() error

func (*ConsumerGroup) Closed

func (cg *ConsumerGroup) Closed() bool

func (*ConsumerGroup) CommitUpto

func (cg *ConsumerGroup) CommitUpto(message *sarama.ConsumerMessage) error

func (*ConsumerGroup) Errors

func (cg *ConsumerGroup) Errors() <-chan *sarama.ConsumerError

Returns a channel that you can read to obtain the consumer errors (*sarama.ConsumerError) reported by the consumer group.

func (*ConsumerGroup) Instance

func (cg *ConsumerGroup) Instance() *kazoo.ConsumergroupInstance

Returns Kazoo ConsumergroupInstance

func (*ConsumerGroup) InstanceRegistered

func (cg *ConsumerGroup) InstanceRegistered() (bool, error)

func (*ConsumerGroup) Logf

func (cg *ConsumerGroup) Logf(format string, args ...interface{})

func (*ConsumerGroup) Messages

func (cg *ConsumerGroup) Messages() <-chan *sarama.ConsumerMessage

Returns a channel that you can read to obtain events from Kafka to process.

func (*ConsumerGroup) OffsetManager

func (cg *ConsumerGroup) OffsetManager() OffsetManager

Returns OffsetManager

type OffsetManager

// OffsetManager is the main interface consumergroup requires to manage offsets
// of the consumergroup.
type OffsetManager interface {

	// InitializePartition is called when the consumergroup is starting to consume a
	// partition. It should return the last processed offset for this partition. Note:
	// the same partition can be initialized multiple times during a single run of a
	// consumer group due to other consumer instances coming online and offline.
	InitializePartition(topic string, partition int32) (int64, error)

	// MarkAsProcessed tells the offset manager that a certain message has been successfully
	// processed by the consumer, and should be committed. The implementation does not have
	// to store this offset right away, but should return true if it intends to do this at
	// some point.
	//
	// Offsets should generally be increasing if the consumer
	// processes events serially, but this cannot be guaranteed if the consumer does any
	// asynchronous processing. This can be handled in various ways, e.g. by only accepting
	// offsets that are higher than the offsets seen before for the same partition.
	MarkAsProcessed(topic string, partition int32, offset int64) bool

	// FinalizePartition is called when the consumergroup is done consuming a
	// partition. In this method, the offset manager can flush any remaining offsets to its
	// backend store. It should return an error if it was not able to commit the offset.
	// Note: it's possible that the consumergroup instance will start to consume the same
	// partition again after this function is called.
	FinalizePartition(topic string, partition int32, lastOffset int64, timeout time.Duration) error

	// Close is called when the consumergroup is shutting down. In normal circumstances, all
	// offsets are committed because FinalizePartition is called for all the running partition
	// consumers. You may want to check for this to be true, and try to commit any outstanding
	// offsets. If this doesn't succeed, it should return an error.
	Close() error

	// Offsets returns, for the given topic, a map of int64 values keyed by int32, or an error.
	// NOTE(review): presumably partition number -> offset; confirm against the implementation.
	Offsets(topic string) (map[int32]int64, error)
}

OffsetManager is the main interface consumergroup requires to manage offsets of the consumergroup.

func NewZookeeperOffsetManager

func NewZookeeperOffsetManager(cg *ConsumerGroup, config *OffsetManagerConfig) OffsetManager

NewZookeeperOffsetManager returns an offset manager that uses Zookeeper to store offsets.

type OffsetManagerConfig

// OffsetManagerConfig holds configuration settings on how the offset manager
// should behave.
type OffsetManagerConfig struct {
	CommitInterval time.Duration // Interval between offset flushes to the backend store.
	VerboseLogging bool          // Whether to enable verbose logging.
}

OffsetManagerConfig holds configuration settings on how the offset manager should behave.

func NewOffsetManagerConfig

func NewOffsetManagerConfig() *OffsetManagerConfig

NewOffsetManagerConfig returns a new OffsetManagerConfig with sane defaults.

type OffsetsMap

// OffsetsMap maps a string key to the TopicOffsets stored under it.
// NOTE(review): the key is presumably a topic name — confirm against the implementation.
type OffsetsMap map[string]TopicOffsets

type PartitionOffsetTracker

type PartitionOffsetTracker struct {
	// contains filtered or unexported fields
}

type TopicOffsets

// TopicOffsets maps an int32 key to a *PartitionOffsetTracker.
// NOTE(review): the key is presumably a partition number — confirm against the implementation.
type TopicOffsets map[int32]*PartitionOffsetTracker

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL