kafkamdm

package
v1.0.0
Published: Jul 30, 2020 License: AGPL-3.0 Imports: 18 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var Enabled bool

Functions

func ConfigProcess

func ConfigProcess(instance string)

func ConfigSetup

func ConfigSetup()

Types

type Explanation

type Explanation struct {
	Status   map[int32]Status
	Priority int
	Updated  time.Time
}

type KafkaMdm

type KafkaMdm struct {
	input.Handler
	// contains filtered or unexported fields
}

func New

func New() *KafkaMdm

func (*KafkaMdm) ExplainPriority

func (k *KafkaMdm) ExplainPriority() interface{}

func (*KafkaMdm) MaintainPriority

func (k *KafkaMdm) MaintainPriority()

func (*KafkaMdm) Name

func (k *KafkaMdm) Name() string

func (*KafkaMdm) Start

func (k *KafkaMdm) Start(handler input.Handler, cancel context.CancelFunc) error

func (*KafkaMdm) Stop

func (k *KafkaMdm) Stop()

Stop initiates a graceful, permanent stop of the Consumer and blocks until it has stopped.

type LagMonitor

type LagMonitor struct {
	sync.Mutex
	// contains filtered or unexported fields
}

LagMonitor determines how up to date this node is. For each partition, we periodically collect:

  * the consumption lag (we keep the last N measurements)
  * the ingest rate

We then combine this data into a score; see the Metric() method.
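The "last N measurements" bookkeeping described above can be pictured as a small fixed-size window. The following is only a sketch under assumptions: the names lagWindow, newLagWindow, Store and Min are hypothetical and are not part of this package, whose internal fields are unexported and not shown here.

```go
package main

import "fmt"

// lagWindow is a hypothetical fixed-size window holding the most recent
// lag measurements for one partition. It is an illustration only and is
// not the package's actual (unexported) implementation.
type lagWindow struct {
	vals []int // stored measurements, at most cap(vals) of them
	next int   // index of the oldest value once the window is full
}

// newLagWindow returns a window that keeps the last size measurements.
func newLagWindow(size int) *lagWindow {
	return &lagWindow{vals: make([]int, 0, size)}
}

// Store records a measurement, overwriting the oldest one when full.
func (w *lagWindow) Store(lag int) {
	if cap(w.vals) == 0 {
		return // a zero-sized window keeps nothing
	}
	if len(w.vals) < cap(w.vals) {
		w.vals = append(w.vals, lag)
		return
	}
	w.vals[w.next] = lag
	w.next = (w.next + 1) % len(w.vals)
}

// Min returns the smallest stored measurement, or false if none exist.
func (w *lagWindow) Min() (int, bool) {
	if len(w.vals) == 0 {
		return 0, false
	}
	m := w.vals[0]
	for _, v := range w.vals[1:] {
		if v < m {
			m = v
		}
	}
	return m, true
}

func main() {
	w := newLagWindow(3)
	for _, lag := range []int{5, 3, 9, 7, 8} {
		w.Store(lag)
	}
	// Only the last three values (9, 7, 8) remain in the window.
	fmt.Println(w.Min()) // prints "7 true"
}
```

Returning a boolean from Min makes the "no measurement stored yet" case explicit, which matters because a partition may not have reported any lag at startup.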

func NewLagMonitor

func NewLagMonitor(size int, partitions []int32) *LagMonitor

func (*LagMonitor) Explain

func (l *LagMonitor) Explain() interface{}

func (*LagMonitor) GetPartitionPriority added in v0.13.1

func (l *LagMonitor) GetPartitionPriority(partition int32) int

func (*LagMonitor) Metric

func (l *LagMonitor) Metric() int

Metric computes the overall score of up-to-date-ness of this node, as an estimated number of seconds behind kafka. We first compute the score for each partition like so: (minimum lag seen in last N measurements) / input rate. Example:

    lag (in messages/metrics)   input rate          score (seconds behind)
    10k                         1k/second           10
    200                         1k/second           0 (less than 1s behind)
    0                           *                   0 (perfectly in sync)
    anything                    0 (after startup)   same as lag

The returned total score for the node is the max of the scores of individual partitions. Note that one or more StoreOffset() (rate) calls may have been made but no StoreLag(). This can happen in 3 cases:

  - we're not consuming yet
  - trouble querying the partition for latest offset
  - consumePartition() has called StoreOffset() but the code hasn't advanced yet to StoreLag()
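The scoring rule described above can be sketched in a few lines. This is an illustration of the documented formula only, not the package's code: partitionScore and nodeScore are hypothetical helper names, and the use of integer division is an assumption chosen so that the results match the example table.

```go
package main

import "fmt"

// partitionScore is a hypothetical helper that applies the documented rule:
// (minimum lag seen in the last N measurements) / input rate.
// With a rate of 0 (for example right after startup) the score is the lag
// itself. Integer division is an assumption of this sketch; it makes a lag
// of 200 at 1000/second come out as 0, as in the table.
func partitionScore(minLag, rate int) int {
	if rate <= 0 {
		return minLag
	}
	return minLag / rate
}

// nodeScore is a hypothetical helper returning the max of the
// per-partition scores, which the documentation gives as the node's score.
func nodeScore(scores []int) int {
	max := 0
	for _, s := range scores {
		if s > max {
			max = s
		}
	}
	return max
}

func main() {
	scores := []int{
		partitionScore(10000, 1000), // 10 seconds behind
		partitionScore(200, 1000),   // 0, less than 1s behind
		partitionScore(0, 1000),     // 0, perfectly in sync
		partitionScore(5000, 0),     // 5000, same as lag
	}
	fmt.Println(scores)            // prints "[10 0 0 5000]"
	fmt.Println(nodeScore(scores)) // prints "5000"
}
```

Taking the maximum means a single lagging partition determines the node's score, even when every other partition is in sync.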

func (*LagMonitor) StoreOffsets added in v0.12.0

func (l *LagMonitor) StoreOffsets(partition int32, readOffset, highWaterMark int64, ts time.Time)

type Status

type Status struct {
	Lag      int
	Rate     int
	Priority int
}
