Documentation
Overview
Package consumer - Kafka consumer subsystem. The consumer subsystem is responsible for getting consumer offset information and sending that information to the storage subsystem. This consumer information could be stored in a variety of places, and each module supports a different type of repository.
Modules
Currently, the following modules are provided:
* kafka - Consume a Kafka cluster's __consumer_offsets topic to get consumer information (new consumer)
* kafka_zk - Parse the /consumers tree of a Kafka cluster's metadata to get consumer information (old consumer)
Index
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type Coordinator
type Coordinator struct {
	// App is a pointer to the application context. This stores the channel to the storage subsystem
	App *protocol.ApplicationContext

	// Log is a logger that has been configured for this module to use. Normally, this means it has been set up with
	// fields that are appropriate to identify this coordinator
	Log *zap.Logger
	// contains filtered or unexported fields
}
Coordinator manages all consumer modules, making sure they are configured, started, and stopped at the appropriate time.
func (*Coordinator) Configure
func (cc *Coordinator) Configure()
Configure is called to create each of the configured consumer modules and call their Configure funcs to validate their individual configurations and set them up. If there are any problems, it is expected that these funcs will panic with a descriptive error message, as configuration failures are not recoverable errors.
func (*Coordinator) Start
func (cc *Coordinator) Start() error
Start calls each of the configured consumer modules' underlying Start funcs. As the coordinator itself has no ongoing work to do, it does not start any other goroutines. If any module Start returns an error, this func stops immediately and returns that error to the caller. No further modules will be started after that.
func (*Coordinator) Stop
func (cc *Coordinator) Stop() error
Stop calls each of the configured consumer modules' underlying Stop funcs. It is expected that the module Stop will not return until the module has been completely stopped. Although the signature allows an error to be returned, this func always returns nil, as a failure during stopping is not a critical failure.
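The Start and Stop semantics described above can be illustrated with a minimal, self-contained sketch. It is not the Coordinator's actual implementation: the module interface, the fakeModule type, and the startAll and stopAll helpers are hypothetical names introduced here, and the use of an ordered slice is an assumption made so the example is deterministic.

```go
package main

import (
	"errors"
	"fmt"
)

// module is a hypothetical stand-in for a consumer module. Only the
// Start and Stop signatures are taken from the documentation.
type module interface {
	Start() error
	Stop() error
}

// fakeModule records lifecycle calls and can be told to fail on Start.
type fakeModule struct {
	name      string
	failStart bool
	started   bool
	stopped   bool
}

func (m *fakeModule) Start() error {
	if m.failStart {
		return errors.New(m.name + ": start failed")
	}
	m.started = true
	return nil
}

func (m *fakeModule) Stop() error {
	m.stopped = true
	return nil
}

// startAll starts modules in order and returns the first error.
// Modules after the failing one are never started.
func startAll(mods []module) error {
	for _, m := range mods {
		if err := m.Start(); err != nil {
			return err
		}
	}
	return nil
}

// stopAll stops every module, discards individual errors, and
// always returns nil.
func stopAll(mods []module) error {
	for _, m := range mods {
		_ = m.Stop()
	}
	return nil
}

func main() {
	a := &fakeModule{name: "a"}
	b := &fakeModule{name: "b", failStart: true}
	c := &fakeModule{name: "c"}
	mods := []module{a, b, c}

	fmt.Println("start error:", startAll(mods))
	fmt.Println("started:", a.started, b.started, c.started)
	fmt.Println("stop error:", stopAll(mods))
}
```

In this sketch the third module is never started because the second one fails, and stopAll reports no error regardless of what the individual modules return.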
type KafkaClient
type KafkaClient struct {
	// App is a pointer to the application context. This stores the channel to the storage subsystem
	App *protocol.ApplicationContext

	// Log is a logger that has been configured for this module to use. Normally, this means it has been set up with
	// fields that are appropriate to identify this coordinator
	Log *zap.Logger
	// contains filtered or unexported fields
}
KafkaClient is a consumer module which connects to a single Apache Kafka cluster and reads consumer group information from the offsets topic in the cluster, which is typically __consumer_offsets. The messages in this topic are decoded and the information is forwarded to the storage subsystem for use in evaluations.
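To give a sense of what decoding a message from the offsets topic involves, here is a rough sketch that parses an offset commit key. It assumes the commonly documented layout for key versions 0 and 1 (a big-endian int16 version, a length-prefixed group string, a length-prefixed topic string, and an int32 partition). The offsetKey type and the helper names are hypothetical and are not part of this package; the module's real decoder may differ.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// offsetKey is a hypothetical holder for the decoded key fields.
type offsetKey struct {
	Group     string
	Topic     string
	Partition int32
}

// readString reads an int16 length followed by that many bytes.
func readString(r *bytes.Reader) (string, error) {
	var n int16
	if err := binary.Read(r, binary.BigEndian, &n); err != nil {
		return "", err
	}
	if n < 0 {
		return "", errors.New("negative string length")
	}
	b := make([]byte, n)
	if _, err := io.ReadFull(r, b); err != nil {
		return "", err
	}
	return string(b), nil
}

// decodeOffsetKey parses a version 0 or 1 key (assumed layout).
func decodeOffsetKey(key []byte) (offsetKey, error) {
	var k offsetKey
	r := bytes.NewReader(key)
	var version int16
	if err := binary.Read(r, binary.BigEndian, &version); err != nil {
		return k, err
	}
	if version != 0 && version != 1 {
		return k, fmt.Errorf("key version %d is not an offset commit", version)
	}
	var err error
	if k.Group, err = readString(r); err != nil {
		return k, err
	}
	if k.Topic, err = readString(r); err != nil {
		return k, err
	}
	err = binary.Read(r, binary.BigEndian, &k.Partition)
	return k, err
}

// encodeOffsetKey builds a key in the same assumed layout, for the demo.
func encodeOffsetKey(version int16, k offsetKey) []byte {
	var buf bytes.Buffer
	_ = binary.Write(&buf, binary.BigEndian, version)
	_ = binary.Write(&buf, binary.BigEndian, int16(len(k.Group)))
	buf.WriteString(k.Group)
	_ = binary.Write(&buf, binary.BigEndian, int16(len(k.Topic)))
	buf.WriteString(k.Topic)
	_ = binary.Write(&buf, binary.BigEndian, k.Partition)
	return buf.Bytes()
}

func main() {
	raw := encodeOffsetKey(1, offsetKey{Group: "billing", Topic: "orders", Partition: 3})
	k, err := decodeOffsetKey(raw)
	fmt.Println(k.Group, k.Topic, k.Partition, err)
}
```

The message value (offset, metadata, timestamps) is deliberately left out; its layout varies by version and is best taken from the Kafka documentation rather than from this sketch.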
func (*KafkaClient) Configure
func (module *KafkaClient) Configure(name string, configRoot string)
Configure validates the configuration for the consumer. At minimum, there must be a cluster name to which these consumers belong, as well as a list of servers provided for the Kafka cluster, of the form host:port. If not explicitly configured, the offsets topic is set to the default for Kafka, which is __consumer_offsets. If the cluster name is unknown, or if the server list is missing or invalid, this func will panic.
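As an illustration of the host:port requirement, the following sketch checks a server list and panics with a descriptive message when it is empty or malformed. The validHostPort and validateServers helpers are hypothetical names chosen for this example; the module's own validation may apply different rules.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// validHostPort reports whether s has the form host:port with a
// non-empty host and a numeric port between 1 and 65535.
func validHostPort(s string) bool {
	host, port, err := net.SplitHostPort(s)
	if err != nil || host == "" {
		return false
	}
	n, err := strconv.Atoi(port)
	return err == nil && n > 0 && n <= 65535
}

// validateServers panics if the list is empty or any entry is
// malformed, mirroring the "configuration failures panic" convention.
func validateServers(servers []string) {
	if len(servers) == 0 {
		panic("no servers configured")
	}
	for _, s := range servers {
		if !validHostPort(s) {
			panic(fmt.Sprintf("invalid server %q: expected host:port", s))
		}
	}
}

func main() {
	validateServers([]string{"kafka01.example.com:9092", "kafka02.example.com:9092"})
	fmt.Println("server list ok")
}
```

The hostnames in main are placeholders. Panicking here follows the documented convention that configuration failures are not recoverable.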
func (*KafkaClient) Start
func (module *KafkaClient) Start() error
Start connects to the Kafka cluster using the Shopify/sarama client. Any error connecting to the cluster is returned to the caller. Once the client is set up, the consumers for the configured offsets topic are started.
func (*KafkaClient) Stop
func (module *KafkaClient) Stop() error
Stop shuts down the goroutines that listen to the client consumer.
type KafkaZkClient
type KafkaZkClient struct {
	// App is a pointer to the application context. This stores the channel to the storage subsystem
	App *protocol.ApplicationContext

	// Log is a logger that has been configured for this module to use. Normally, this means it has been set up with
	// fields that are appropriate to identify this coordinator
	Log *zap.Logger
	// contains filtered or unexported fields
}
KafkaZkClient is a consumer module which connects to the Zookeeper ensemble where an Apache Kafka cluster maintains metadata, and reads consumer group information from the /consumers tree (older ZK-based consumers). It uses watches to monitor every group and offset, and the information is forwarded to the storage subsystem for use in evaluations.
func (*KafkaZkClient) Configure
func (module *KafkaZkClient) Configure(name string, configRoot string)
Configure validates the configuration for the consumer. At minimum, there must be a cluster name to which these consumers belong, as well as a list of servers provided for the Zookeeper ensemble, of the form host:port. If not explicitly configured, it is assumed that the Kafka cluster metadata is present in the ensemble root path. If the cluster name is unknown, or if the server list is missing or invalid, this func will panic.
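To show how the ensemble root path relates to the /consumers tree, here is a small sketch that builds the node path for one committed offset. It assumes the conventional layout used by the older ZK-based consumers, /consumers/<group>/offsets/<topic>/<partition>, placed under a configurable root. The offsetPath helper is a hypothetical name and is not part of this package.

```go
package main

import (
	"fmt"
	"path"
	"strconv"
)

// offsetPath returns the Zookeeper node path for a group's committed
// offset on one partition, relative to the given root path.
// An empty root or "/" means the metadata lives at the ensemble root.
func offsetPath(root, group, topic string, partition int32) string {
	return path.Join("/", root, "consumers", group, "offsets", topic,
		strconv.Itoa(int(partition)))
}

func main() {
	// Metadata at the ensemble root (the default described above).
	fmt.Println(offsetPath("/", "billing", "orders", 0))
	// Metadata under a hypothetical chroot-style path.
	fmt.Println(offsetPath("/kafka/cluster1", "billing", "orders", 0))
}
```

With these inputs the sketch prints /consumers/billing/offsets/orders/0 and /kafka/cluster1/consumers/billing/offsets/orders/0.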
func (*KafkaZkClient) Start
func (module *KafkaZkClient) Start() error
Start connects to the configured Zookeeper ensemble. Any error connecting to the ensemble is returned to the caller. Once the client is set up, the consumer group list is enumerated and watches are set up for each group, topic, partition, and offset. A goroutine is also started to monitor the Zookeeper connection state and reset the watches if the session expires.
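The connection-monitoring goroutine can be pictured with the sketch below. It is a simplified model rather than the module's code: the connState type, its constants, and monitorSession are hypothetical names, and a plain channel stands in for the event stream that a real Zookeeper client would provide.

```go
package main

import "fmt"

// connState is a hypothetical stand-in for a Zookeeper client's
// connection state events.
type connState int

const (
	stateConnected connState = iota
	stateDisconnected
	stateExpired
)

// monitorSession consumes state events until the channel is closed and
// calls resetWatches each time the session is reported as expired.
func monitorSession(events <-chan connState, resetWatches func()) {
	for ev := range events {
		if ev == stateExpired {
			resetWatches()
		}
	}
}

func main() {
	events := make(chan connState)
	done := make(chan struct{})
	resets := 0

	// Run the monitor in its own goroutine, as Start is described to do.
	go func() {
		monitorSession(events, func() { resets++ })
		close(done)
	}()

	events <- stateConnected
	events <- stateDisconnected // a plain disconnect keeps the session
	events <- stateExpired      // expiry loses the watches, so reset them
	close(events)
	<-done

	fmt.Println("watch resets:", resets) // prints "watch resets: 1"
}
```

Only expiry triggers a reset in this model, because Zookeeper watches are tied to the session and do not survive its expiry, whereas a transient disconnect within the session timeout leaves them in place.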
func (*KafkaZkClient) Stop
func (module *KafkaZkClient) Stop() error
Stop closes the Zookeeper client.