Documentation ¶
Overview ¶
Package evaluator - Group evaluation subsystem. The evaluator subsystem is responsible for fetching group information from the storage subsystem and calculating the group's status based on that. It responds to EvaluatorRequest objects that are send via a channel, and replies with a ConsumerGroupStatus.
Modules ¶
Currently, only one module is provided:
* caching - Evaluate a consumer group and cache the results in memory for a short period of time
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CachingEvaluator ¶
type CachingEvaluator struct { // App is a pointer to the application context. This stores the channel to the storage subsystem App *protocol.ApplicationContext // Log is a logger that has been configured for this module to use. Normally, this means it has been set up with // fields that are appropriate to identify this coordinator Log *zap.Logger RequestChannel chan *protocol.EvaluatorRequest // contains filtered or unexported fields }
CachingEvaluator is an evaluator module that responds to evaluation requests and checks consumer status using the standard Burrow definitions for stall, stop, and lag. The results are stored in an in-memory cache for a configurable amount of time, in order to avoid duplication of work when multiple modules evaluate the same consumer group.
func (*CachingEvaluator) Configure ¶
func (module *CachingEvaluator) Configure(name string, configRoot string)
Configure validates the configuration for the module, creates a channel to receive requests on, and sets up the cache. If no expiration time for cache entries is set, a default value of 10 seconds is used. If there is any problem starting the goswarm cache, this func panics.
func (*CachingEvaluator) GetCommunicationChannel ¶
func (module *CachingEvaluator) GetCommunicationChannel() chan *protocol.EvaluatorRequest
GetCommunicationChannel returns the RequestChannel that has been setup for this module.
func (*CachingEvaluator) Start ¶
func (module *CachingEvaluator) Start() error
Start instantiates the main loop that listens for evaluation requests and returns the result
func (*CachingEvaluator) Stop ¶
func (module *CachingEvaluator) Stop() error
Stop closes the module's RequestChannel, which also terminates the main loop that responds to requests
type Coordinator ¶
type Coordinator struct { // App is a pointer to the application context. This stores the channel to the storage subsystem App *protocol.ApplicationContext // Log is a logger that has been configured for this module to use. Normally, this means it has been set up with // fields that are appropriate to identify this coordinator Log *zap.Logger // contains filtered or unexported fields }
Coordinator manages a single evaluator module (only one module is supported at this time), making sure it is configured, started, and stopped at the appropriate time. It is also responsible for listening to the EvaluatorChannel that is provided in the application context and forwarding those requests to the evaluator module. If no evaluator module has been configured explicitly, the coordinator starts the caching module as a default.
func StorageAndEvaluatorCoordinatorsWithOffsets ¶
func StorageAndEvaluatorCoordinatorsWithOffsets() (*Coordinator, *storage.Coordinator)
StorageAndEvaluatorCoordinatorsWithOffsets sets up a Coordinator with a single caching module defined. In order to do this, it also calls the storage subsystem fixture to get a configured storage.Coordinator with offsets for a test cluster and group. This func should never be called in normal code. It is only provided to facilitate testing by other subsystems.
func (*Coordinator) Configure ¶
func (ec *Coordinator) Configure()
Configure is called to create the configured evaluator module and call its Configure func to validate the configuration and set it up. The coordinator will panic is more than one module is configured, and if no modules have been configured, it will set up a default caching evaluator module. If there are any problems, it is expected that this func will panic with a descriptive error message, as configuration failures are not recoverable errors.
func (*Coordinator) Start ¶
func (ec *Coordinator) Start() error
Start calls the evaluator module's underlying Start func. If the module Start returns an error, this func stops immediately and returns that error to the caller.
We also start a request forwarder goroutine. This listens to the EvaluatorChannel that is provided in the application context that all modules receive, and forwards those requests to the evaluator modules. At the present time, the evaluator only supports one module, so this is a simple "accept and forward".
func (*Coordinator) Stop ¶
func (ec *Coordinator) Stop() error
Stop calls the configured evaluator module's underlying Stop func. It is expected that the module Stop will not return until the module has been completely stopped. While an error can be returned, this func always returns no error, as a failure during stopping is not a critical failure
type Module ¶
type Module interface { protocol.Module GetCommunicationChannel() chan *protocol.EvaluatorRequest }
Module is responsible for answering requests to evaluate the status of a consumer group. It fetches offset information from the storage subsystem and transforms that into a protocol.ConsumerGroupStatus response. It conforms to the overall protocol.Module interface, but it adds a func to fetch the channel that the module is listening on for requests, so that requests can be forwarded to it by the coordinator