Documentation ¶
Overview ¶
Package cluster - Kafka cluster subsystem. The cluster subsystem is responsible for getting topic and partition information, as well as current broker offsets, from Kafka clusters and sending that information to the storage subsystem. It does not handle any consumer group information.
Modules ¶
Currently, the following modules are provided:
* kafka - Fetch topic, partition, and offset information from a Kafka cluster
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Coordinator ¶
type Coordinator struct {
    // App is a pointer to the application context. This stores the channel to the storage subsystem
    App *protocol.ApplicationContext

    // Log is a logger that has been configured for this module to use. Normally, this means it has been set up with
    // fields that are appropriate to identify this coordinator
    Log *zap.Logger
    // contains filtered or unexported fields
}
Coordinator manages all cluster modules, making sure they are configured, started, and stopped at the appropriate time.
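Every module the coordinator manages is driven through the same small lifecycle of Configure, Start, and Stop documented below. As a point of reference for the method descriptions that follow, here is a minimal sketch of that shape; the Module name is hypothetical, and the real interface is defined in the protocol package and may differ:

// Module is a hypothetical sketch of the lifecycle a cluster module implements.
// The actual interface lives in the protocol package and may have a different shape.
type Module interface {
    // Configure validates the module's configuration, panicking on fatal problems.
    Configure(name string, configRoot string)
    // Start begins the module's background work, returning any startup error.
    Start() error
    // Stop shuts the module down, returning only after it has fully stopped.
    Stop() error
}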
func (*Coordinator) Configure ¶
func (bc *Coordinator) Configure()
Configure is called to create each of the configured cluster modules and call their Configure funcs to validate their individual configurations and set them up. If there are any problems, it is expected that these funcs will panic with a descriptive error message, as configuration failures are not recoverable errors.
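As an illustration of this contract, the configuration step for a set of modules might look roughly like the following. The helper name, the modules map, and the configuration path format are assumptions for illustration (reusing the hypothetical Module interface from the sketch above), not the actual Burrow code:

// configureModules is a hypothetical sketch: iterate over the configured modules
// and let each one validate itself. A module that finds a fatal configuration
// problem panics, which aborts startup with a descriptive message.
func configureModules(modules map[string]Module) {
    for name, module := range modules {
        // The second argument names the configuration subtree for this module;
        // the exact path format here is an assumption.
        module.Configure(name, "cluster."+name)
    }
}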
func (*Coordinator) Start ¶
func (bc *Coordinator) Start() error
Start calls each of the configured cluster modules' underlying Start funcs. As the coordinator itself has no ongoing work to do, it does not start any other goroutines. If any module Start returns an error, this func stops immediately and returns that error to the caller. No further modules will be started after that.
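The fail-fast behavior described here can be sketched as follows, again with a hypothetical helper reusing the Module interface from the sketch above rather than the actual implementation:

// startModules is a hypothetical sketch: start each module in turn and stop at
// the first error. Modules that were started before the failure are not stopped here.
func startModules(modules map[string]Module) error {
    for _, module := range modules {
        if err := module.Start(); err != nil {
            return err
        }
    }
    return nil
}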
func (*Coordinator) Stop ¶
func (bc *Coordinator) Stop() error
Stop calls each of the configured cluster modules' underlying Stop funcs. It is expected that the module Stop will not return until the module has been completely stopped. While an error can be returned, this func always returns no error, as a failure during stopping is not a critical failure.
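A corresponding sketch of the stop behavior, where per-module errors are deliberately discarded because a failure while stopping is not treated as critical (again hypothetical, reusing the Module interface above):

// stopModules is a hypothetical sketch: stop every module, ignoring any
// individual errors, and always report success to the caller.
func stopModules(modules map[string]Module) error {
    for _, module := range modules {
        _ = module.Stop()
    }
    return nil
}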
type KafkaCluster ¶
type KafkaCluster struct {
    // App is a pointer to the application context. This stores the channel to the storage subsystem
    App *protocol.ApplicationContext

    // Log is a logger that has been configured for this module to use. Normally, this means it has been set up with
    // fields that are appropriate to identify this module
    Log *zap.Logger
    // contains filtered or unexported fields
}
KafkaCluster is a cluster module which connects to a single Apache Kafka cluster and manages the broker topic and partition information. It periodically updates a list of all topics and partitions, and also fetches the broker end offset (latest) for each partition. This information is forwarded to the storage module for use in consumer evaluations.
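To make the refresh loop concrete, the following sketch fetches the latest (end) offset for every topic partition using the Shopify/sarama client. The sarama calls are real APIs, but the surrounding function, the brokerOffset type, and the output channel are assumptions for illustration and are not the actual Burrow code, which forwards this information to the storage subsystem via the application context:

import (
    "github.com/Shopify/sarama"
)

// brokerOffset is a hypothetical carrier for one partition's end offset.
type brokerOffset struct {
    Topic     string
    Partition int32
    Offset    int64
}

// fetchEndOffsets is a sketch: list every topic and partition known to the
// cluster, then ask the brokers for the newest offset of each partition.
func fetchEndOffsets(client sarama.Client, out chan<- brokerOffset) error {
    topics, err := client.Topics()
    if err != nil {
        return err
    }
    for _, topic := range topics {
        partitions, err := client.Partitions(topic)
        if err != nil {
            return err
        }
        for _, partition := range partitions {
            // OffsetNewest requests the offset of the next message to be produced,
            // i.e. the current end of the partition.
            offset, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
            if err != nil {
                return err
            }
            out <- brokerOffset{Topic: topic, Partition: partition, Offset: offset}
        }
    }
    return nil
}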
func (*KafkaCluster) Configure ¶
func (module *KafkaCluster) Configure(name string, configRoot string)
Configure validates the configuration for the cluster. At minimum, there must be a list of servers provided for the Kafka cluster, of the form host:port. Default values will be set for the intervals to use for refreshing offsets (10 seconds) and topics (60 seconds). A missing or bad list of servers will cause this func to panic.
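The validation this describes amounts to checking the server list and filling in the interval defaults. A rough sketch, where the config struct, helper name, and error messages are assumptions rather than the actual code:

import (
    "fmt"
    "net"
    "time"
)

// clusterConfig is a hypothetical view of the per-cluster configuration.
type clusterConfig struct {
    Servers       []string
    OffsetRefresh time.Duration
    TopicRefresh  time.Duration
}

// validateClusterConfig is a sketch of the checks described above: the server
// list must be non-empty, every entry must be host:port, and missing refresh
// intervals fall back to the documented defaults.
func validateClusterConfig(cfg *clusterConfig) {
    if len(cfg.Servers) == 0 {
        panic("cluster module requires at least one server of the form host:port")
    }
    for _, server := range cfg.Servers {
        if _, _, err := net.SplitHostPort(server); err != nil {
            panic(fmt.Sprintf("bad server address %q: %v", server, err))
        }
    }
    if cfg.OffsetRefresh == 0 {
        cfg.OffsetRefresh = 10 * time.Second // default offset refresh interval
    }
    if cfg.TopicRefresh == 0 {
        cfg.TopicRefresh = 60 * time.Second // default topic refresh interval
    }
}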
func (*KafkaCluster) Start ¶
func (module *KafkaCluster) Start() error
Start connects to the Kafka cluster using the Shopify/sarama client. Any error connecting to the cluster is returned to the caller. Once the client is set up, tickers are started to periodically refresh topics and offsets.
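A sketch of the startup sequence described here, using real sarama calls for the client but hypothetical names for everything else; shutdown handling is omitted for brevity:

import (
    "time"

    "github.com/Shopify/sarama"
)

// startCluster is a sketch: connect a sarama client to the listed brokers, then
// start tickers that drive the periodic topic and offset refreshes.
func startCluster(servers []string, topicRefresh, offsetRefresh time.Duration) (sarama.Client, error) {
    client, err := sarama.NewClient(servers, sarama.NewConfig())
    if err != nil {
        // Any connection error is returned to the caller, as documented above.
        return nil, err
    }

    topicTicker := time.NewTicker(topicRefresh)
    offsetTicker := time.NewTicker(offsetRefresh)
    go func() {
        for {
            select {
            case <-topicTicker.C:
                // Refresh the topic and partition metadata for the whole cluster.
                _ = client.RefreshMetadata()
            case <-offsetTicker.C:
                // Fetch the newest offset for each partition (see the sketch above).
            }
        }
    }()
    return client, nil
}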
func (*KafkaCluster) Stop ¶
func (module *KafkaCluster) Stop() error
Stop causes both the topic and offset refresh tickers to be stopped, and then it closes the Kafka client.
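The corresponding teardown can be sketched as follows, with hypothetical names: stop both tickers first so no new refreshes fire, then close the sarama client.

// stopCluster is a sketch of the shutdown order described above.
func stopCluster(topicTicker, offsetTicker *time.Ticker, client sarama.Client) error {
    topicTicker.Stop()
    offsetTicker.Stop()
    return client.Close()
}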