Documentation ¶
Overview ¶
Package protocol - Burrow types and interfaces. The protocol module provides the definitions for most of the common Burrow types and interfaces that are used in the rest of the application. The documentation here is primarily targeted at developers of Burrow modules, and not the end user.
Index ¶
- type ApplicationContext
- type ConsumerGroupStatus
- type ConsumerOffset
- type ConsumerPartition
- type ConsumerPartitions
- type ConsumerTopics
- type Coordinator
- type EvaluatorRequest
- type Module
- type PartitionStatus
- type StatusConstant
- type StorageRequest
- type StorageRequestConstant
- type ZookeeperClient
- type ZookeeperLock
Types ¶
type ApplicationContext ¶
type ApplicationContext struct {
	// Logger is a configured zap.Logger instance. It is to be used by the main routine directly, and the main routine
	// creates loggers for each of the coordinators to use that have fields set to identify that coordinator.
	//
	// This field can be set prior to calling core.Start() in order to pre-configure the logger. If it is not set,
	// core.Start() will set up a default logger using the application config.
	Logger *zap.Logger

	// LogLevel is an AtomicLevel instance that has been used to set the default level of the Logger. It is used to
	// dynamically adjust the logging level (such as via an HTTP call)
	//
	// If Logger has been set prior to calling core.Start(), LogLevel must be set as well.
	LogLevel *zap.AtomicLevel

	// This is used by the main Burrow routines to signal that the configuration is valid. The rest of the code should
	// not care about this, as the application will exit if the configuration is not valid.
	ConfigurationValid bool

	// This is a ZookeeperClient that can be used by any coordinator or module in order to store metadata about their
	// operation, or to create locks. Any module that uses this client must honor the ZookeeperRoot, which is the root
	// path under which all ZNodes should be created.
	Zookeeper     ZookeeperClient
	ZookeeperRoot string

	// This is a boolean flag which is set or unset by the Zookeeper coordinator to signal when the connection is
	// established. It should be used in coordination with the ZookeeperExpired condition to monitor when the session
	// has expired. This indicates locks have been released or watches must be reset.
	ZookeeperConnected bool
	ZookeeperExpired   *sync.Cond

	// This is the channel over which any module should send a consumer group evaluation request. It is serviced by the
	// evaluator Coordinator, and passed to an appropriate evaluator module.
	EvaluatorChannel chan *EvaluatorRequest

	// This is the channel over which any module should send storage requests for storage of offsets and group
	// information, or to fetch the same information. It is serviced by the storage Coordinator.
	StorageChannel chan *StorageRequest
}
ApplicationContext is a structure that holds objects that are used across all coordinators and modules. This is used in lieu of passing individual arguments to all functions.
type ConsumerGroupStatus ¶
type ConsumerGroupStatus struct {
	// The name of the cluster in which the group exists
	Cluster string `json:"cluster"`

	// The name of the consumer group
	Group string `json:"group"`

	// The status of the consumer group. This is either NOTFOUND, OK, WARN, or ERR. It is calculated from the highest
	// Status for the individual partitions
	Status StatusConstant `json:"status"`

	// A number between 0.0 and 1.0 that describes the percentage complete the partition information is for this group.
	// A partition that has a Complete value of less than 1.0 will be treated as zero.
	Complete float32 `json:"complete"`

	// A slice of PartitionStatus objects showing individual partition status. If the request ShowAll field was true,
	// this slice will contain every partition consumed by the group. If ShowAll was false, this slice will only
	// contain the partitions that have a status of WARN or above.
	Partitions []*PartitionStatus `json:"partitions"`

	// A count of the total number of partitions that the group has committed offsets for. Note, this may not be the
	// same as the total number of partitions consumed by the group, if Burrow has not seen commits for all partitions
	// yet.
	TotalPartitions int `json:"partition_count"`

	// A PartitionStatus object for the partition with the highest CurrentLag value
	Maxlag *PartitionStatus `json:"maxlag"`

	// The sum of all partition CurrentLag values for the group
	TotalLag uint64 `json:"totallag"`
}
ConsumerGroupStatus is the response object that is sent in reply to an EvaluatorRequest. It describes the current status of a single consumer group.
type ConsumerOffset ¶
type ConsumerOffset struct {
	// The offset that is stored
	Offset int64 `json:"offset"`

	// The timestamp at which the offset was committed
	Timestamp int64 `json:"timestamp"`

	// The number of messages that the consumer was behind at the time that the offset was committed. This number is
	// not updated after the offset was committed, so it does not represent the current lag of the consumer.
	Lag uint64 `json:"lag"`
}
ConsumerOffset represents a single offset stored. It is used as part of the response to a StorageFetchConsumer request
type ConsumerPartition ¶
type ConsumerPartition struct {
	// A slice containing a ConsumerOffset object for each offset Burrow has stored for this partition. This can be any
	// length up to the number of intervals Burrow has been configured to store, depending on how many offset commits
	// have been seen for this partition
	Offsets []*ConsumerOffset `json:"offsets"`

	// A slice containing the history of broker offsets stored for this partition. This is used for evaluation only,
	// and as such it is not provided when encoding to JSON (for HTTP responses)
	BrokerOffsets []int64 `json:"-"`

	// A string that describes the consumer host that currently owns this partition, if the information is available
	// (for active new consumers)
	Owner string `json:"owner"`

	// A string containing the client_id set by the consumer (for active new consumers)
	ClientID string `json:"client_id"`

	// The current number of messages that the consumer is behind for this partition. This is calculated using the
	// last committed offset and the current broker end offset
	CurrentLag uint64 `json:"current-lag"`
}
ConsumerPartition represents the information stored for a group for a single partition. It is used as part of the response to a StorageFetchConsumer request
type ConsumerPartitions ¶
type ConsumerPartitions []*ConsumerPartition
ConsumerPartitions describes all partitions for a single topic. The index indicates the partition ID, and the value is a pointer to a ConsumerPartition object with the offset information for that partition.
type ConsumerTopics ¶
type ConsumerTopics map[string]ConsumerPartitions
ConsumerTopics is the response that is sent for a StorageFetchConsumer request. It is a map of topic names to ConsumerPartitions objects that describe that topic
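As a rough illustration of how these three types nest, the sketch below walks a ConsumerTopics value. The types are local stand-ins trimmed to two fields so the example compiles without importing Burrow, and totalLag and partitionLag are hypothetical helpers, not functions provided by this package. The nil check assumes a slot can be empty when nothing has been stored for that partition.

```go
package main

// Local stand-ins that mirror the documented types, trimmed to the fields
// used below. They are not the real protocol definitions.
type ConsumerPartition struct {
	Owner      string
	CurrentLag uint64
}

type ConsumerPartitions []*ConsumerPartition

type ConsumerTopics map[string]ConsumerPartitions

// totalLag is a hypothetical helper. It sums CurrentLag over every partition
// of every topic, skipping nil entries (assumed to mean "no data stored").
func totalLag(topics ConsumerTopics) uint64 {
	var sum uint64
	for _, partitions := range topics {
		for _, partition := range partitions {
			if partition == nil {
				continue
			}
			sum += partition.CurrentLag
		}
	}
	return sum
}

// partitionLag is a hypothetical helper. It looks up a single partition,
// relying on the slice index being the partition ID, and reports false when
// the topic or partition is not present.
func partitionLag(topics ConsumerTopics, topic string, id int) (uint64, bool) {
	partitions, ok := topics[topic]
	if !ok || id < 0 || id >= len(partitions) || partitions[id] == nil {
		return 0, false
	}
	return partitions[id].CurrentLag, true
}
```

Because the partition ID is the slice index, a lookup needs a bounds check rather than a map test.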
type Coordinator ¶
type Coordinator interface {
	// Configure is called to initially set up the coordinator. In this func, it should validate any configurations
	// that the coordinator requires, and then call the Configure func for each of its modules. If there are any errors
	// in configuration, it is expected that this call will panic. The coordinator may also set up data structures that
	// are critical for the subsystem, such as communication channels. It must NOT make any connections to resources
	// outside of the coordinator itself, including either the storage or evaluator channels in the application context.
	Configure()

	// Start is called to start the operation of the coordinator. In this func, the coordinator should call the Start
	// func for any of its modules, and then start any additional logic the coordinator needs to run. This func must
	// return (any running code must be started as a goroutine). If there is a problem starting up, the coordinator
	// should stop anything it has already started and return a non-nil error.
	Start() error

	// Stop is called to stop operation of the coordinator. In this func, the coordinator should call the Stop func for
	// any of its modules, and stop any goroutines that it has started. While it can return an error if there is a
	// problem, the errors are mostly ignored.
	Stop() error
}
Coordinator is a common interface for all subsystem coordinators so that the core routine can manage them in a consistent manner. The interface provides a way to configure the coordinator, and then methods to start it and stop it safely. It is expected that when any of these funcs are called, the coordinator will then call the corresponding func on its modules.
The struct that implements this interface is expected to have App and Log fields, at the very least. The App field will contain the protocol.ApplicationContext object with resources that the coordinator and modules may use. The Log field will be set up with a logger that has fields set that identify the coordinator. Both are set up by the core routine before Configure is called. The coordinator can use Log to create the individual loggers for the modules it controls.
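The contract described above (configure every module, start them in turn, and unwind on a failed start) can be sketched as follows. This is a minimal illustration under stated assumptions, not Burrow's implementation: sketchCoordinator, namedModule, and recordingModule are hypothetical names, the App and Log fields are omitted for brevity, and the "root.name" form of the module config root is an assumption.

```go
package main

import "errors"

// Module and Coordinator restate the documented interfaces so that the
// sketch compiles on its own.
type Module interface {
	Configure(name string, configRoot string)
	Start() error
	Stop() error
}

type Coordinator interface {
	Configure()
	Start() error
	Stop() error
}

// namedModule pairs a module with the name it is configured under.
type namedModule struct {
	name   string
	module Module
}

// sketchCoordinator is a hypothetical coordinator that only fans calls out
// to its modules.
type sketchCoordinator struct {
	configRoot string
	modules    []namedModule
	started    []Module // modules whose Start has succeeded
}

// Configure passes each module its name and an assumed config root.
func (c *sketchCoordinator) Configure() {
	for _, m := range c.modules {
		m.module.Configure(m.name, c.configRoot+"."+m.name)
	}
}

// Start starts the modules in order. On the first failure it stops whatever
// is already running and returns a non-nil error.
func (c *sketchCoordinator) Start() error {
	for _, m := range c.modules {
		if err := m.module.Start(); err != nil {
			_ = c.Stop()
			return errors.New("starting module " + m.name + ": " + err.Error())
		}
		c.started = append(c.started, m.module)
	}
	return nil
}

// Stop stops the started modules in reverse order and ignores their errors.
func (c *sketchCoordinator) Stop() error {
	for i := len(c.started) - 1; i >= 0; i-- {
		_ = c.started[i].Stop()
	}
	c.started = nil
	return nil
}

// recordingModule is a trivial Module used to exercise the coordinator.
type recordingModule struct {
	failStart  bool
	configured bool
	running    bool
}

func (m *recordingModule) Configure(name string, configRoot string) { m.configured = true }

func (m *recordingModule) Start() error {
	if m.failStart {
		return errors.New("start failed")
	}
	m.running = true
	return nil
}

func (m *recordingModule) Stop() error {
	m.running = false
	return nil
}
```

Tracking the started modules separately lets a failed Start stop only what is actually running.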
type EvaluatorRequest ¶
type EvaluatorRequest struct {
	// Reply is the channel over which the evaluator will send the status response. The sender should expect to receive
	// only one message over this channel for each request, and the channel will not be closed after the response is
	// sent (to facilitate the notifier, which uses a single channel for all responses)
	Reply chan *ConsumerGroupStatus

	// The name of the cluster in which the group is found
	Cluster string

	// The name of the group to get the status for
	Group string

	// If ShowAll is true, the returned status object contains a partition entry for every partition the group consumes,
	// regardless of the state of that partition. If false (the default), only partitions that have a status of WARN
	// or above are returned in the status object.
	ShowAll bool
}
EvaluatorRequest is sent over the EvaluatorChannel that is stored in the application context. It is a query for the status of a group in a cluster. The response to this query is sent over the reply channel. This request is typically used in the HTTP server and notifier subsystems.
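The request and reply exchange can be sketched as below. The types are local stand-ins trimmed to a few fields, and fetchStatus, fakeEvaluator, and defaultReplyTimeout are hypothetical names introduced only for this illustration. The timeout is an assumption about how a caller might protect itself, not something this package requires.

```go
package main

import (
	"errors"
	"time"
)

// Local stand-ins for the documented types, trimmed to what the sketch needs.
type StatusConstant int

const StatusOK StatusConstant = 1

type ConsumerGroupStatus struct {
	Cluster string
	Group   string
	Status  StatusConstant
}

type EvaluatorRequest struct {
	Reply   chan *ConsumerGroupStatus
	Cluster string
	Group   string
	ShowAll bool
}

// defaultReplyTimeout is an arbitrary value chosen for this sketch.
const defaultReplyTimeout = 2 * time.Second

// fetchStatus is a hypothetical helper. It sends one request over the
// evaluator channel and waits for the single reply. Since the reply channel
// is not closed by the evaluator, the caller reads exactly one message.
func fetchStatus(evaluator chan<- *EvaluatorRequest, cluster, group string, timeout time.Duration) (*ConsumerGroupStatus, error) {
	request := &EvaluatorRequest{
		// Buffered so the evaluator's send cannot block if this caller has
		// already given up waiting.
		Reply:   make(chan *ConsumerGroupStatus, 1),
		Cluster: cluster,
		Group:   group,
	}
	evaluator <- request

	select {
	case status := <-request.Reply:
		return status, nil
	case <-time.After(timeout):
		return nil, errors.New("timed out waiting for evaluator reply")
	}
}

// fakeEvaluator stands in for the evaluator Coordinator. It answers every
// request with StatusOK until the request channel is closed.
func fakeEvaluator(requests <-chan *EvaluatorRequest) {
	for request := range requests {
		request.Reply <- &ConsumerGroupStatus{
			Cluster: request.Cluster,
			Group:   request.Group,
			Status:  StatusOK,
		}
	}
}
```

In an application the first argument would be the EvaluatorChannel from the application context; here a goroutine running fakeEvaluator plays that role.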
type Module ¶
type Module interface {
	// Configure is called to initially set up the module. The name of the module, as well as the root string to be
	// used when looking up configurations with viper, are provided. In this func, the module must completely validate
	// its own configuration, and panic if it is not correct. It may also set up data structures that are critical
	// for the module. It must NOT make any connections to resources outside of the module itself, including either
	// the storage or evaluator channels in the application context.
	Configure(name string, configRoot string)

	// Start is called to start the operation of the module. In this func, the module should make connections to
	// external resources and start operation. This func must return (any running code must be started as a goroutine).
	// If there is a problem starting up, the module should stop anything it has already started and return a non-nil
	// error.
	Start() error

	// Stop is called to stop operation of the module. In this func, the module should clean up any goroutines it has
	// started and close any external connections. While it can return an error if there is a problem, the errors are
	// mostly ignored.
	Stop() error
}
Module is a common interface for all modules so that they can be manipulated by the coordinators in the same way. The interface provides a way to configure the module, and then methods to start it and stop it safely. Each coordinator may have its own Module interface definition, as well, that adds specific requirements for that type of module.
The struct that implements this interface is expected to have App and Log fields, at the very least. The App field will contain the protocol.ApplicationContext object with resources that the module may use. The Log field will be set up with a logger that has fields set that identify the module. Both are set up by the module's coordinator before Configure is called.
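A module's lifecycle might look like the sketch below: Configure validates and panics on bad settings, Start launches a goroutine and returns immediately, and Stop signals that goroutine and waits for it to exit. tickerModule and newTickerModule are hypothetical. The interval is passed to the constructor instead of being read from viper, and the App and Log fields are left out to keep the example short. Start and Stop are assumed to be called from a single goroutine.

```go
package main

import (
	"errors"
	"sync"
	"time"
)

// Module restates the documented interface so the sketch compiles alone.
type Module interface {
	Configure(name string, configRoot string)
	Start() error
	Stop() error
}

// tickerModule is a hypothetical module that does periodic work.
type tickerModule struct {
	name     string
	interval time.Duration
	quit     chan struct{}
	running  sync.WaitGroup
}

// newTickerModule builds a module with the given interval in milliseconds.
func newTickerModule(intervalMillis int) *tickerModule {
	return &tickerModule{interval: time.Duration(intervalMillis) * time.Millisecond}
}

// Configure validates the settings and panics if they are not usable. It
// makes no external connections.
func (m *tickerModule) Configure(name string, configRoot string) {
	m.name = name
	if m.interval <= 0 {
		panic("module " + name + ": interval must be positive")
	}
}

// Start launches the worker goroutine and returns without blocking.
func (m *tickerModule) Start() error {
	if m.quit != nil {
		return errors.New("module " + m.name + " is already started")
	}
	m.quit = make(chan struct{})
	m.running.Add(1)
	go m.loop(m.quit)
	return nil
}

// loop runs until the quit channel is closed.
func (m *tickerModule) loop(quit <-chan struct{}) {
	defer m.running.Done()
	ticker := time.NewTicker(m.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Periodic work would go here.
		case <-quit:
			return
		}
	}
}

// Stop signals the worker goroutine and waits for it to exit.
func (m *tickerModule) Stop() error {
	if m.quit == nil {
		return errors.New("module " + m.name + " is not started")
	}
	close(m.quit)
	m.running.Wait()
	m.quit = nil
	return nil
}
```

Waiting on the WaitGroup in Stop means no goroutine outlives the module, which is the cleanup the interface comment asks for.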
type PartitionStatus ¶
type PartitionStatus struct {
	// The topic name for this partition
	Topic string `json:"topic"`

	// The partition ID
	Partition int32 `json:"partition"`

	// If available (for active new consumers), the consumer host that currently owns this partition
	Owner string `json:"owner"`

	// If available (for active new consumers), the client_id of the consumer that currently owns this partition
	ClientID string `json:"client_id"`

	// The status of the partition
	Status StatusConstant `json:"status"`

	// A ConsumerOffset object that describes the first (oldest) offset that Burrow is storing for this partition
	Start *ConsumerOffset `json:"start"`

	// A ConsumerOffset object that describes the last (latest) offset that Burrow is storing for this partition
	End *ConsumerOffset `json:"end"`

	// The current number of messages that the consumer is behind for this partition. This is calculated using the
	// last committed offset and the current broker end offset
	CurrentLag uint64 `json:"current_lag"`

	// A number between 0.0 and 1.0 that describes the percentage complete the offset information is for this partition.
	// For example, if Burrow has been configured to store 10 offsets, and Burrow has only stored 7 commits for this
	// partition, Complete will be 0.7
	Complete float32 `json:"complete"`
}
PartitionStatus represents the state of a single consumed partition
type StatusConstant ¶
type StatusConstant int
StatusConstant describes the state of a partition or group as a single value. These values are ordered from least to most "bad", with zero being reserved to indicate that a group is not found.
const (
	// StatusNotFound indicates that the consumer group does not exist. It is not used for partition status.
	StatusNotFound StatusConstant = 0

	// StatusOK indicates that a partition is in a good state. For a group, it indicates that all partitions are in a
	// good state.
	StatusOK StatusConstant = 1

	// StatusWarning indicates that a partition is lagging - it is making progress, but falling further behind. For a
	// group, it indicates that one or more partitions are lagging.
	StatusWarning StatusConstant = 2

	// StatusError indicates that a group has one or more partitions that are in the Stop, Stall, or Rewind states. It
	// is not used for partition status.
	StatusError StatusConstant = 3

	// StatusStop indicates that the consumer has not committed an offset for that partition in some time, and the lag
	// is non-zero. It is not used for group status.
	StatusStop StatusConstant = 4

	// StatusStall indicates that the consumer is committing offsets for the partition, but they are not increasing and
	// the lag is non-zero. It is not used for group status.
	StatusStall StatusConstant = 5

	// StatusRewind indicates that the consumer has committed an offset for the partition that is less than the
	// previous offset. It is not used for group status.
	StatusRewind StatusConstant = 6
)
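To make the split between group-level and partition-level values concrete, the sketch below derives a group status from a list of partition statuses, following only the rules stated in the comments above. groupStatus is a hypothetical helper and is not how Burrow's evaluator is necessarily written. It assumes the group exists, so StatusNotFound is handled elsewhere, and it treats an empty list as StatusOK.

```go
package main

// Local copy of the documented type and constants.
type StatusConstant int

const (
	StatusNotFound StatusConstant = 0
	StatusOK       StatusConstant = 1
	StatusWarning  StatusConstant = 2
	StatusError    StatusConstant = 3
	StatusStop     StatusConstant = 4
	StatusStall    StatusConstant = 5
	StatusRewind   StatusConstant = 6
)

// groupStatus is a hypothetical helper that folds partition statuses into a
// group status:
//   - any partition in Stop, Stall, or Rewind makes the group StatusError
//   - otherwise any lagging partition makes the group StatusWarning
//   - otherwise the group is StatusOK
func groupStatus(partitions []StatusConstant) StatusConstant {
	status := StatusOK
	for _, partition := range partitions {
		switch partition {
		case StatusStop, StatusStall, StatusRewind:
			return StatusError
		case StatusWarning:
			status = StatusWarning
		}
	}
	return status
}
```

For example, partitions in the OK, Warning, and Stall states yield a group in the Error state, because Stall maps to StatusError at the group level.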
func (StatusConstant) MarshalJSON ¶
func (c StatusConstant) MarshalJSON() ([]byte, error)
MarshalJSON implements the json.Marshaler interface. The status is the string representation of StatusConstant
func (StatusConstant) MarshalText ¶
func (c StatusConstant) MarshalText() ([]byte, error)
MarshalText implements the encoding.TextMarshaler interface. The status is the string representation of StatusConstant
func (StatusConstant) String ¶
func (c StatusConstant) String() string
String returns a string representation of a StatusConstant
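One way the three methods can fit together is sketched below: String indexes a name table, and both marshalers delegate to it. This is an assumption about the shape of the code, not a copy of the package. The names NOTFOUND, OK, WARN, and ERR appear in the ConsumerGroupStatus comments, while STOP, STALL, REWIND, and the UNKNOWN fallback are assumed here for illustration.

```go
package main

import "encoding/json"

// Local stand-in for the documented type.
type StatusConstant int

const (
	StatusNotFound StatusConstant = 0
	StatusOK       StatusConstant = 1
	StatusWarning  StatusConstant = 2
	StatusError    StatusConstant = 3
	StatusStop     StatusConstant = 4
	StatusStall    StatusConstant = 5
	StatusRewind   StatusConstant = 6
)

// statusNames is indexed by the numeric value of the constant. The last
// three names are assumptions made for this sketch.
var statusNames = [...]string{"NOTFOUND", "OK", "WARN", "ERR", "STOP", "STALL", "REWIND"}

// String returns the name for a known status and a fallback otherwise.
func (c StatusConstant) String() string {
	if c >= 0 && int(c) < len(statusNames) {
		return statusNames[c]
	}
	return "UNKNOWN"
}

// MarshalText implements encoding.TextMarshaler using the string form.
func (c StatusConstant) MarshalText() ([]byte, error) {
	return []byte(c.String()), nil
}

// MarshalJSON implements json.Marshaler, encoding the status as a JSON
// string and not as a number.
func (c StatusConstant) MarshalJSON() ([]byte, error) {
	return json.Marshal(c.String())
}
```

With this arrangement a status embedded in a struct is encoded as a quoted name in JSON output, which matches the string values described for the Status field.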
type StorageRequest ¶
type StorageRequest struct {
	// The type of request that this struct encapsulates
	RequestType StorageRequestConstant

	// If the RequestType is a "Fetch" request, Reply must contain a channel to receive the response on
	Reply chan interface{}

	// The name of the cluster to which the request applies. Required for all request types except StorageFetchClusters
	Cluster string

	// The name of the consumer group to which the request applies
	Group string

	// The name of the topic to which the request applies
	Topic string

	// The ID of the partition to which the request applies
	Partition int32

	// For StorageSetBrokerOffset requests, TopicPartitionCount indicates the total number of partitions for the topic
	TopicPartitionCount int32

	// For StorageSetBrokerOffset and StorageSetConsumerOffset requests, the offset to store
	Offset int64

	// For StorageSetConsumerOffset requests, the timestamp of the offset being stored
	Timestamp int64

	// For StorageSetConsumerOwner requests, a string describing the consumer host that owns the partition
	Owner string

	// For StorageSetConsumerOwner requests, a string containing the client_id set by the consumer
	ClientID string
}
StorageRequest is sent over the StorageChannel that is stored in the application context. It is a query to either send information to the storage subsystem, or retrieve information from it. The RequestType indicates the particular type of request. "Set" and "Clear" requests do not get a response. "Fetch" requests will send a response over the Reply channel supplied in the request.
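The difference between "Set" and "Fetch" requests can be sketched as follows. The types are local stand-ins limited to two request constants, and fetchTopics and fakeStorage are hypothetical. Because Reply carries interface{}, the caller has to assert the concrete type it expects for the request it sent. Treating a closed reply channel as an error is a defensive assumption in this sketch.

```go
package main

import (
	"errors"
	"sort"
)

// Local stand-ins for the documented types, trimmed to what the sketch uses.
type StorageRequestConstant int

const (
	StorageSetBrokerOffset StorageRequestConstant = 0
	StorageFetchTopics     StorageRequestConstant = 7
)

type StorageRequest struct {
	RequestType         StorageRequestConstant
	Reply               chan interface{}
	Cluster             string
	Topic               string
	Partition           int32
	TopicPartitionCount int32
	Offset              int64
}

// fetchTopics is a hypothetical helper. It sends a StorageFetchTopics
// request and converts the untyped reply into a []string.
func fetchTopics(storage chan<- *StorageRequest, cluster string) ([]string, error) {
	request := &StorageRequest{
		RequestType: StorageFetchTopics,
		Reply:       make(chan interface{}, 1),
		Cluster:     cluster,
	}
	storage <- request

	response, ok := <-request.Reply
	if !ok {
		return nil, errors.New("reply channel closed without a response")
	}
	topics, ok := response.([]string)
	if !ok {
		return nil, errors.New("unexpected response type for StorageFetchTopics")
	}
	return topics, nil
}

// fakeStorage stands in for the storage Coordinator. It remembers topic
// names from "Set" requests, which get no response, and answers "Fetch"
// requests over the Reply channel.
func fakeStorage(requests <-chan *StorageRequest) {
	topics := map[string]bool{}
	for request := range requests {
		switch request.RequestType {
		case StorageSetBrokerOffset:
			topics[request.Topic] = true
		case StorageFetchTopics:
			names := make([]string, 0, len(topics))
			for name := range topics {
				names = append(names, name)
			}
			sort.Strings(names)
			request.Reply <- names
		}
	}
}
```

A "Set" request is fire-and-forget: the sender does not supply a Reply channel and never waits.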
type StorageRequestConstant ¶
type StorageRequestConstant int
StorageRequestConstant is used in StorageRequest to indicate the type of request. Numeric ordering is not important
const (
	// StorageSetBrokerOffset is the request type to store a broker offset. Requires Cluster, Topic, Partition,
	// TopicPartitionCount, and Offset fields
	StorageSetBrokerOffset StorageRequestConstant = 0

	// StorageSetConsumerOffset is the request type to store a consumer offset. Requires Cluster, Group, Topic,
	// Partition, Offset, and Timestamp fields
	StorageSetConsumerOffset StorageRequestConstant = 1

	// StorageSetConsumerOwner is the request type to store a consumer owner. Requires Cluster, Group, Topic, Partition,
	// and Owner fields
	StorageSetConsumerOwner StorageRequestConstant = 2

	// StorageSetDeleteTopic is the request type to remove a topic from the broker and all consumers. Requires Cluster,
	// Group, and Topic fields
	StorageSetDeleteTopic StorageRequestConstant = 3

	// StorageSetDeleteGroup is the request type to remove a consumer group. Requires Cluster and Group fields
	StorageSetDeleteGroup StorageRequestConstant = 4

	// StorageFetchClusters is the request type to retrieve a list of clusters. Requires Reply. Returns a []string
	StorageFetchClusters StorageRequestConstant = 5

	// StorageFetchConsumers is the request type to retrieve a list of consumer groups in a cluster. Requires Reply and
	// Cluster fields. Returns a []string
	StorageFetchConsumers StorageRequestConstant = 6

	// StorageFetchTopics is the request type to retrieve a list of topics in a cluster. Requires Reply and Cluster
	// fields. Returns a []string
	StorageFetchTopics StorageRequestConstant = 7

	// StorageFetchConsumer is the request type to retrieve all stored information for a single consumer group. Requires
	// Reply, Cluster, and Group fields. Returns a ConsumerTopics object
	StorageFetchConsumer StorageRequestConstant = 8

	// StorageFetchTopic is the request type to retrieve the current broker offsets (one per partition) for a topic.
	// Requires Reply, Cluster, and Topic fields. Returns a []int64
	StorageFetchTopic StorageRequestConstant = 9

	// StorageClearConsumerOwners is the request type to remove all partition owner information for a single group.
	// Requires Cluster and Group fields
	StorageClearConsumerOwners StorageRequestConstant = 10

	// StorageFetchConsumersForTopic is the request type to obtain a list of all consumer groups consuming from a topic.
	// Returns a []string
	StorageFetchConsumersForTopic StorageRequestConstant = 11
)
func (StorageRequestConstant) MarshalJSON ¶
func (c StorageRequestConstant) MarshalJSON() ([]byte, error)
MarshalJSON implements the json.Marshaler interface. The request type is encoded as the string representation of StorageRequestConstant.
func (StorageRequestConstant) MarshalText ¶
func (c StorageRequestConstant) MarshalText() ([]byte, error)
MarshalText implements the encoding.TextMarshaler interface. The request type is encoded as the string representation of StorageRequestConstant.
func (StorageRequestConstant) String ¶
func (c StorageRequestConstant) String() string
String returns a string representation of a StorageRequestConstant for logging
type ZookeeperClient ¶
type ZookeeperClient interface {
	// Close the connection to Zookeeper
	Close()

	// For the given path in Zookeeper, return a slice of strings which list the immediate child nodes. This method also
	// sets a watch on the children of the specified path, providing an event channel that will receive a message when
	// the watch fires
	ChildrenW(path string) ([]string, *zk.Stat, <-chan zk.Event, error)

	// For the given path in Zookeeper, return the data in the node as a byte slice. This method also sets a watch on
	// the data of the specified node, providing an event channel that will receive a message when the watch fires
	GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error)

	// For the given path in Zookeeper, return a boolean stating whether or not the node exists. This method also sets
	// a watch on the node (exists if it does not currently exist, or a data watch otherwise), providing an event
	// channel that will receive a message when the watch fires
	ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error)

	// Create makes a new ZNode at the specified path with the contents set to the data byte-slice. Flags can be
	// provided to specify that this is an ephemeral or sequence node, and an ACL must be provided. If no ACL is
	// desired, specify
	//   zk.WorldACL(zk.PermAll)
	Create(string, []byte, int32, []zk.ACL) (string, error)

	// NewLock creates a lock using the provided path. Multiple Zookeeper clients, using the same lock path, can
	// synchronize with each other to assure that only one client has the lock at any point.
	NewLock(path string) ZookeeperLock
}
ZookeeperClient is a minimal interface for working with a Zookeeper connection. We provide this interface, rather than using the underlying library directly, as it makes it easier to test code that uses Zookeeper. This interface should be expanded with additional methods as needed.
Note that the interface is specified in the protocol package, rather than in the helper package or the zookeeper coordinator package, as it has to be referenced by ApplicationContext. Moving it elsewhere generates a dependency loop.
type ZookeeperLock ¶
type ZookeeperLock interface {
	// Lock acquires the lock, blocking until it is able to do so, and returns nil. If the lock cannot be acquired, such
	// as if the session has been lost, a non-nil error will be returned instead.
	Lock() error

	// Unlock releases the lock, returning nil. If there is an error releasing the lock, such as if it is not held, an
	// error is returned instead.
	Unlock() error
}
ZookeeperLock is an interface for the operation of a lock in Zookeeper. Multiple Zookeeper clients, using the same lock path, can synchronize with each other to assure that only one client has the lock at any point.