Documentation ¶
Overview ¶
Package storage - Data storage subsystem. The storage subsystem receives information from the cluster and consumer subsystems and serves that information out to other subsystems on request.
Modules ¶
Currently, only one module is provided:
* inmemory - Store all information in a set of in-memory maps
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Coordinator ¶
type Coordinator struct { // App is a pointer to the application context. This stores the channel to the storage subsystem App *protocol.ApplicationContext // Log is a logger that has been configured for this module to use. Normally, this means it has been set up with // fields that are appropriate to identify this coordinator Log *zap.Logger // contains filtered or unexported fields }
Coordinator (storage) manages a single storage module (only one module is supported at this time), making sure it is configured, started, and stopped at the appropriate time. It is also responsible for listening to the StorageChannel that is provided in the application context and forwarding those requests to the storage module. If no storage module has been configured explicitly, the coordinator starts the inmemory module as a default.
func CoordinatorWithOffsets ¶
func CoordinatorWithOffsets() *Coordinator
CoordinatorWithOffsets sets up a Coordinator with a single inmemory module defined. This module is loaded with offsets for a test cluster and group. This func should never be called in normal code. It is only provided to facilitate testing by other subsystems.
func (*Coordinator) Configure ¶
func (sc *Coordinator) Configure()
Configure is called to create the configured storage module and call its Configure func to validate the configuration and set it up. The coordinator will panic is more than one module is configured, and if no modules have been configured, it will set up a default inmemory storage module. If there are any problems, it is expected that this func will panic with a descriptive error message, as configuration failures are not recoverable errors.
func (*Coordinator) Start ¶
func (sc *Coordinator) Start() error
Start calls the storage module's underlying Start func. If the module Start returns an error, this func stops immediately and returns that error to the caller.
We also start a request forwarder goroutine. This listens to the StorageChannel that is provided in the application context that all modules receive, and forwards those requests to the storage modules. At the present time, the storage subsystem only supports one module, so this is a simple "accept and forward".
func (*Coordinator) Stop ¶
func (sc *Coordinator) Stop() error
Stop calls the configured storage module's underlying Stop func. It is expected that the module Stop will not return until the module has been completely stopped. While an error can be returned, this func always returns no error, as a failure during stopping is not a critical failure
type InMemoryStorage ¶
type InMemoryStorage struct { // App is a pointer to the application context. This stores the channel to the storage subsystem App *protocol.ApplicationContext // Log is a logger that has been configured for this module to use. Normally, this means it has been set up with // fields that are appropriate to identify this coordinator Log *zap.Logger // contains filtered or unexported fields }
InMemoryStorage is a storage module that maintains the entire data set in memory in a series of maps. It has a configurable number of worker goroutines to service requests, and for requests that are group-specific, the group and cluster name are used to hash the request to a consistent worker. This assures that requests for a group are processed in order.
func (*InMemoryStorage) Configure ¶
func (module *InMemoryStorage) Configure(name string, configRoot string)
Configure validates the configuration for the module, creates a channel to receive requests on, and sets up the storage map. If no expiration time for groups is set, a default value of 7 days is used. If no interval count is set, a default of 10 intervals is used. If no worker count is set, a default of 20 workers is used.
func (*InMemoryStorage) GetCommunicationChannel ¶
func (module *InMemoryStorage) GetCommunicationChannel() chan *protocol.StorageRequest
GetCommunicationChannel returns the RequestChannel that has been setup for this module.
func (*InMemoryStorage) Start ¶
func (module *InMemoryStorage) Start() error
Start sets up the rest of the storage map for each configured cluster. It then starts the configured number of worker routines to handle requests. Finally, it starts a main loop which will receive requests and hash them to the correct worker.
func (*InMemoryStorage) Stop ¶
func (module *InMemoryStorage) Stop() error
Stop closes the incoming request channel, which will close the main loop. It then closes each of the worker channels, to close the workers, and waits for all goroutines to exit before returning.
type Module ¶
type Module interface { protocol.Module GetCommunicationChannel() chan *protocol.StorageRequest }
Module (storage) is responsible for maintaining all the broker and consumer offsets for all clusters that Burrow watches. It must accept and respond to all protocol.StorageRequest types. This interface conforms to the overall protocol.Module interface, but it adds a func to fetch the channel that the module is listening on for requests, so that requests can be forwarded to it by the coordinator.