rater

package
v1.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2024 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package rater provides the functionality to calculate the processing rate of each vertex partition. The processing rate is calculated based on the metric forwarder_data_read_total.

Index

Constants

View Source
const CountWindow = time.Second * 10

CountWindow is the time window for which we maintain the timestamped counts, currently 10 seconds e.g., if the current time is 12:00:07, the retrieved count will be tracked in the 12:00:00-12:00:10 time window using 12:00:10 as the timestamp

Variables

This section is empty.

Functions

func CalculateRate

func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionName string) float64

CalculateRate calculates the rate of the vertex partition in the last lookback seconds

func UpdateCount

func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podReadCounts *PodReadCount)

UpdateCount updates the count of processed messages for a pod at a given time

Types

type Option

type Option func(*options)

func WithTaskInterval

func WithTaskInterval(n int) Option

func WithWorkers

func WithWorkers(n int) Option

type PodReadCount added in v0.9.0

type PodReadCount struct {
	// contains filtered or unexported fields
}

PodReadCount is a struct to maintain count of messages read from each partition by a pod

func (*PodReadCount) Name added in v0.9.0

func (p *PodReadCount) Name() string

func (*PodReadCount) PartitionReadCounts added in v0.9.0

func (p *PodReadCount) PartitionReadCounts() map[string]float64

type PodTracker

type PodTracker struct {
	// contains filtered or unexported fields
}

PodTracker maintains a set of active pods for a pipeline It periodically sends http requests to pods to check if they are still active

func NewPodTracker

func NewPodTracker(ctx context.Context, p *v1alpha1.Pipeline, opts ...PodTrackerOption) *PodTracker

func (*PodTracker) GetActivePodsCount added in v0.10.0

func (pt *PodTracker) GetActivePodsCount() int

GetActivePodsCount returns the number of active pods.

func (*PodTracker) GetPodInfo added in v1.0.0

func (pt *PodTracker) GetPodInfo(key string) (*podInfo, error)

func (*PodTracker) IsActive added in v0.10.0

func (pt *PodTracker) IsActive(podKey string) bool

IsActive returns true if the pod is active, false otherwise.

func (*PodTracker) LeastRecentlyUsed added in v0.10.0

func (pt *PodTracker) LeastRecentlyUsed() string

LeastRecentlyUsed returns the least recently used pod from the active pod list. if there are no active pods, it returns an empty string.

func (*PodTracker) Start

func (pt *PodTracker) Start(ctx context.Context) error

type PodTrackerOption

type PodTrackerOption func(*PodTracker)

func WithRefreshInterval

func WithRefreshInterval(d time.Duration) PodTrackerOption

WithRefreshInterval sets how often to refresh the rate metrics.

type Ratable added in v0.9.0

type Ratable interface {
	Start(ctx context.Context) error
	GetRates(vertexName, partitionName string) map[string]*wrapperspb.DoubleValue
}

type Rater

type Rater struct {
	// contains filtered or unexported fields
}

Rater is a struct that maintains information about the processing rate of each vertex. It monitors the number of processed messages for each pod in a vertex and calculates the rate.

func NewRater

func NewRater(ctx context.Context, p *v1alpha1.Pipeline, opts ...Option) *Rater

func (*Rater) GetRates

func (r *Rater) GetRates(vertexName, partitionName string) map[string]*wrapperspb.DoubleValue

GetRates returns the processing rates of the vertex partition in the format of lookback second to rate mappings

func (*Rater) Start

func (r *Rater) Start(ctx context.Context) error

type TimestampedCounts

type TimestampedCounts struct {
	// contains filtered or unexported fields
}

TimestampedCounts track the total count of processed messages for a list of pods at a given timestamp

func NewTimestampedCounts

func NewTimestampedCounts(t int64) *TimestampedCounts

func (*TimestampedCounts) PodPartitionCountSnapshot added in v0.9.0

func (tc *TimestampedCounts) PodPartitionCountSnapshot() map[string]map[string]float64

PodPartitionCountSnapshot returns a copy of podPartitionCount it's used to ensure the returned map is not modified by other goroutines

func (*TimestampedCounts) String added in v0.10.0

func (tc *TimestampedCounts) String() string

String returns a string representation of the TimestampedCounts it's used for debugging purpose

func (*TimestampedCounts) Update

func (tc *TimestampedCounts) Update(podReadCount *PodReadCount)

Update updates the count of processed messages for a pod

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL