Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var Enabled bool
Functions ¶
func ConfigProcess ¶
func ConfigProcess(instance string)
func ConfigSetup ¶
func ConfigSetup()
Types ¶
type Explanation ¶
type KafkaMdm ¶
func (*KafkaMdm) ExplainPriority ¶
func (k *KafkaMdm) ExplainPriority() interface{}
func (*KafkaMdm) MaintainPriority ¶
func (k *KafkaMdm) MaintainPriority()
type LagMonitor ¶
LagMonitor determines how upToDate this node is. For each partition, we periodically collect: * the consumption lag (we keep the last N measurements) * ingest rate We then combine this data into a score, see the Metric() method.
func NewLagMonitor ¶
func NewLagMonitor(size int, partitions []int32) *LagMonitor
func (*LagMonitor) Explain ¶
func (l *LagMonitor) Explain() interface{}
func (*LagMonitor) GetPartitionPriority ¶ added in v0.13.1
func (l *LagMonitor) GetPartitionPriority(partition int32) int
func (*LagMonitor) Metric ¶
func (l *LagMonitor) Metric() int
Metric computes the overall score of up-to-date-ness of this node, as an estimated number of seconds behind kafka. We first compute the score for each partition like so: (minimum lag seen in last N measurements) / input rate. example: lag (in messages/metrics) input rate ---> score (seconds behind)
10k 1k/second 10 200 1k/second 0 (less than 1s behind) 0 * 0 (perfectly in sync) anything 0 (after startup) same as lag
The returned total score for the node is the max of the scores of individual partitions. Note that one or more StoreOffset() (rate) calls may have been made but no StoreLag(). This can happen in 3 cases: - we're not consuming yet - trouble querying the partition for latest offset - consumePartition() has called StoreOffset() but the code hasn't advanced yet to StoreLag()
func (*LagMonitor) StoreOffsets ¶ added in v0.12.0
func (l *LagMonitor) StoreOffsets(partition int32, readOffset, highWaterMark int64, ts time.Time)