consumer

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 3, 2018 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package consumer - Kafka consumer subsystem. The consumer subsystem is responsible for getting consumer offset information and sending that information to the storage subsystem. This consumer information could be stored in a variety of places, and each module supports a different type of repository.

Modules

Currently, the following modules are provided:

* kafka - Consume a Kafka cluster's __consumer_offsets topic to get consumer information (new consumer)

* kafka_zk - Parse the /consumers tree of a Kafka cluster's metadata to get consumer information (old consumer)

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Coordinator

type Coordinator struct {
	// App is a pointer to the application context. This stores the channel to the storage subsystem
	App *protocol.ApplicationContext

	// Log is a logger that has been configured for this module to use. Normally, this means it has been set up with
	// fields that are appropriate to identify this coordinator
	Log *zap.Logger
	// contains filtered or unexported fields
}

Coordinator manages all consumer modules, making sure they are configured, started, and stopped at the appropriate time.

func (*Coordinator) Configure

func (cc *Coordinator) Configure()

Configure is called to create each of the configured consumer modules and call their Configure funcs to validate their individual configurations and set them up. If there are any problems, it is expected that these funcs will panic with a descriptive error message, as configuration failures are not recoverable errors.

func (*Coordinator) Start

func (cc *Coordinator) Start() error

Start calls each of the configured consumer modules' underlying Start funcs. As the coordinator itself has no ongoing work to do, it does not start any other goroutines. If any module Start returns an error, this func stops immediately and returns that error to the caller. No further modules will be loaded after that.

func (*Coordinator) Stop

func (cc *Coordinator) Stop() error

Stop calls each of the configured consumer modules' underlying Stop funcs. It is expected that the module Stop will not return until the module has been completely stopped. While an error can be returned, this func always returns no error, as a failure during stopping is not a critical failure

type KafkaClient

type KafkaClient struct {
	// App is a pointer to the application context. This stores the channel to the storage subsystem
	App *protocol.ApplicationContext

	// Log is a logger that has been configured for this module to use. Normally, this means it has been set up with
	// fields that are appropriate to identify this coordinator
	Log *zap.Logger
	// contains filtered or unexported fields
}

KafkaClient is a consumer module which connects to a single Apache Kafka cluster and reads consumer group information from the offsets topic in the cluster, which is typically __consumer_offsets. The messages in this topic are decoded and the information is forwarded to the storage subsystem for use in evaluations.

func (*KafkaClient) Configure

func (module *KafkaClient) Configure(name string, configRoot string)

Configure validates the configuration for the consumer. At minimum, there must be a cluster name to which these consumers belong, as well as a list of servers provided for the Kafka cluster, of the form host:port. If not explicitly configured, the offsets topic is set to the default for Kafka, which is __consumer_offsets. If the cluster name is unknown, or if the server list is missing or invalid, this func will panic.

func (*KafkaClient) Start

func (module *KafkaClient) Start() error

Start connects to the Kafka cluster using the Shopify/sarama client. Any error connecting to the cluster is returned to the caller. Once the client is set up, the consumers for the configured offsets topic are started.

func (*KafkaClient) Stop

func (module *KafkaClient) Stop() error

Stop closes the goroutines that listen to the client consumer.

type KafkaZkClient

type KafkaZkClient struct {
	// App is a pointer to the application context. This stores the channel to the storage subsystem
	App *protocol.ApplicationContext

	// Log is a logger that has been configured for this module to use. Normally, this means it has been set up with
	// fields that are appropriate to identify this coordinator
	Log *zap.Logger
	// contains filtered or unexported fields
}

KafkaZkClient is a consumer module which connects to the Zookeeper ensemble where an Apache Kafka cluster maintains metadata, and reads consumer group information from the /consumers tree (older ZK-based consumers). It uses watches to monitor every group and offset, and the information is forwarded to the storage subsystem for use in evaluations.

func (*KafkaZkClient) Configure

func (module *KafkaZkClient) Configure(name string, configRoot string)

Configure validates the configuration for the consumer. At minimum, there must be a cluster name to which these consumers belong, as well as a list of servers provided for the Zookeeper ensemble, of the form host:port. If not explicitly configured, it is assumed that the Kafka cluster metadata is present in the ensemble root path. If the cluster name is unknown, or if the server list is missing or invalid, this func will panic.

func (*KafkaZkClient) Start

func (module *KafkaZkClient) Start() error

Start connects to the Zookeeper ensemble configured. Any error connecting to the cluster is returned to the caller. Once the client is set up, the consumer group list is enumerated and watches are set up for each group, topic, partition, and offset. A goroutine is also started to monitor the Zookeeper connection state, and reset the watches in the case the the session expires.

func (*KafkaZkClient) Stop

func (module *KafkaZkClient) Stop() error

Stop closes the Zookeeper client.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL