rater

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 3, 2023 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Overview

Package rater provides the functionality to calculate the processing rate of each vertex partition. The processing rate is calculated based on the metric forwarder_data_read_total.

Index

Constants

View Source
const CountWindow = time.Second * 10

CountWindow is the time window for which we maintain the timestamped counts, currently 10 seconds e.g., if the current time is 12:00:07, the retrieved count will be tracked in the 12:00:00-12:00:10 time window using 12:00:10 as the timestamp

Variables

This section is empty.

Functions

func CalculateRate

func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionName string) float64

CalculateRate calculates the rate of the vertex partition in the last lookback seconds

func UpdateCount

func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podReadCounts *PodReadCount)

UpdateCount updates the count of processed messages for a pod at a given time

Types

type Option

type Option func(*options)

func WithTaskInterval

func WithTaskInterval(n int) Option

func WithWorkers

func WithWorkers(n int) Option

type PodReadCount added in v0.9.0

type PodReadCount struct {
	// contains filtered or unexported fields
}

PodReadCount is a struct to maintain count of messages read from each partition by a pod

func (*PodReadCount) Name added in v0.9.0

func (p *PodReadCount) Name() string

func (*PodReadCount) PartitionReadCounts added in v0.9.0

func (p *PodReadCount) PartitionReadCounts() map[string]float64

type PodTracker

type PodTracker struct {
	// contains filtered or unexported fields
}

PodTracker maintains a set of active pods for a pipeline It periodically sends http requests to pods to check if they are still active

func NewPodTracker

func NewPodTracker(ctx context.Context, p *v1alpha1.Pipeline, opts ...PodTrackerOption) *PodTracker

func (*PodTracker) GetActivePodsCount added in v0.10.0

func (pt *PodTracker) GetActivePodsCount() int

GetActivePodsCount returns the number of active pods.

func (*PodTracker) GetPodInfo added in v1.0.0

func (pt *PodTracker) GetPodInfo(key string) (*podInfo, error)

func (*PodTracker) IsActive added in v0.10.0

func (pt *PodTracker) IsActive(podKey string) bool

IsActive returns true if the pod is active, false otherwise.

func (*PodTracker) LeastRecentlyUsed added in v0.10.0

func (pt *PodTracker) LeastRecentlyUsed() string

LeastRecentlyUsed returns the least recently used pod from the active pod list. if there are no active pods, it returns an empty string.

func (*PodTracker) Start

func (pt *PodTracker) Start(ctx context.Context) error

type PodTrackerOption

type PodTrackerOption func(*PodTracker)

func WithRefreshInterval

func WithRefreshInterval(d time.Duration) PodTrackerOption

WithRefreshInterval sets how often to refresh the rate metrics.

type Ratable added in v0.9.0

type Ratable interface {
	Start(ctx context.Context) error
	GetRates(vertexName, partitionName string) map[string]float64
}

type Rater

type Rater struct {
	// contains filtered or unexported fields
}

Rater is a struct that maintains information about the processing rate of each vertex. It monitors the number of processed messages for each pod in a vertex and calculates the rate.

func NewRater

func NewRater(ctx context.Context, p *v1alpha1.Pipeline, opts ...Option) *Rater

func (*Rater) GetRates

func (r *Rater) GetRates(vertexName, partitionName string) map[string]float64

GetRates returns the processing rates of the vertex partition in the format of lookback second to rate mappings

func (*Rater) Start

func (r *Rater) Start(ctx context.Context) error

type TimestampedCounts

type TimestampedCounts struct {
	// contains filtered or unexported fields
}

TimestampedCounts track the total count of processed messages for a list of pods at a given timestamp

func NewTimestampedCounts

func NewTimestampedCounts(t int64) *TimestampedCounts

func (*TimestampedCounts) PodPartitionCountSnapshot added in v0.9.0

func (tc *TimestampedCounts) PodPartitionCountSnapshot() map[string]map[string]float64

PodPartitionCountSnapshot returns a copy of podPartitionCount it's used to ensure the returned map is not modified by other goroutines

func (*TimestampedCounts) String added in v0.10.0

func (tc *TimestampedCounts) String() string

String returns a string representation of the TimestampedCounts it's used for debugging purpose

func (*TimestampedCounts) Update

func (tc *TimestampedCounts) Update(podReadCount *PodReadCount)

Update updates the count of processed messages for a pod

type UniqueStringList

type UniqueStringList struct {
	// contains filtered or unexported fields
}

UniqueStringList is a list of strings that only allows unique values. the underlying list is a doubly linked list.

func NewUniqueStringList

func NewUniqueStringList() *UniqueStringList

func (*UniqueStringList) Contains

func (l *UniqueStringList) Contains(value string) bool

Contains returns true if the value exists in the list.

func (*UniqueStringList) Front

func (l *UniqueStringList) Front() string

Front returns the first string of list l or empty string if the list is empty.

func (*UniqueStringList) Length

func (l *UniqueStringList) Length() int

Length returns the number of elements of list l.

func (*UniqueStringList) MoveToBack

func (l *UniqueStringList) MoveToBack(value string)

MoveToBack moves the element to the back of the list, if the value exists in the list.

func (*UniqueStringList) PushBack

func (l *UniqueStringList) PushBack(value string)

PushBack adds a value to the back of the list, if the value doesn't exist in the list.

func (*UniqueStringList) Remove

func (l *UniqueStringList) Remove(value string)

Remove removes the string from the list, if it exists in the list.

func (*UniqueStringList) ToString

func (l *UniqueStringList) ToString() string

ToString returns a comma separated string of the list values.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL