rater

package
v1.3.0
Warning

This package is not in the latest version of its module.
Published: Aug 19, 2024 | License: Apache-2.0 | Imports: 16 | Imported by: 0

Documentation

Constants

const CountWindow = time.Second * 10
const MonoVtxReadMetricName = "monovtx_read_total"

Variables

This section is empty.

Functions

func CalculateRate

func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64) float64

CalculateRate calculates the rate of a MonoVertex for a given lookback period.
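As a rough illustration of what computing a rate over a lookback period involves, the sketch below uses a plain slice of hypothetical sample values in place of sharedqueue.OverflowQueue[*TimestampedCounts]. The sample type, the rateOver function, and the delta-over-elapsed-time formula are all assumptions made for illustration; the package's actual algorithm may differ, for example in how it treats pod restarts or counter resets.

```go
package main

import "fmt"

// sample is a hypothetical stand-in for one TimestampedCounts entry:
// a Unix timestamp in seconds and the total read count across pods at that time.
type sample struct {
	ts    int64
	total float64
}

// rateOver returns the average per-second increase in total over roughly the
// last lookbackSeconds, using the oldest sample still inside the window as the
// starting point. It returns 0 when fewer than two samples are usable.
func rateOver(samples []sample, lookbackSeconds int64) float64 {
	if len(samples) < 2 {
		return 0
	}
	end := samples[len(samples)-1]
	cutoff := end.ts - lookbackSeconds
	// Walk backwards to find the oldest sample that is still within the window.
	start := end
	for i := len(samples) - 2; i >= 0; i-- {
		if samples[i].ts < cutoff {
			break
		}
		start = samples[i]
	}
	elapsed := end.ts - start.ts
	if elapsed <= 0 {
		return 0
	}
	return (end.total - start.total) / float64(elapsed)
}

func main() {
	// Samples spaced 10 seconds apart, matching the CountWindow constant.
	samples := []sample{{0, 0}, {10, 100}, {20, 250}, {30, 400}}
	fmt.Println(rateOver(samples, 20)) // (400-100)/20
}
```

The samples are assumed to be sorted by ascending timestamp, which is what a queue appended to once per CountWindow would produce.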

func UpdateCount

func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podReadCounts *PodReadCount)

UpdateCount updates the count for a given timestamp in the queue.

Types

type MonoVtxRatable

type MonoVtxRatable interface {
	Start(ctx context.Context) error
	GetRates() map[string]*wrapperspb.DoubleValue
}

MonoVtxRatable is the interface for the Rater struct.

type Option

type Option func(*options)

func WithTaskInterval

func WithTaskInterval(n int) Option

func WithWorkers

func WithWorkers(n int) Option
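Option has the shape of Go's functional options pattern: each constructor returns a closure that mutates the unexported options struct. The sketch below shows the general shape only. The field names workers and taskInterval, the default values, and the applyOptions helper are hypothetical and are not taken from the package source.

```go
package main

import "fmt"

// options is a hypothetical stand-in for the package's unexported struct.
type options struct {
	workers      int
	taskInterval int
}

// Option mirrors the documented signature: a function that mutates *options.
type Option func(*options)

// WithWorkers sets the worker count (the field name is assumed).
func WithWorkers(n int) Option {
	return func(o *options) { o.workers = n }
}

// WithTaskInterval sets the task interval (the field name and unit are assumed).
func WithTaskInterval(n int) Option {
	return func(o *options) { o.taskInterval = n }
}

// applyOptions starts from illustrative defaults and applies each option in order,
// so a later option overrides an earlier one that touches the same field.
func applyOptions(opts ...Option) options {
	o := options{workers: 1, taskInterval: 1000}
	for _, opt := range opts {
		if opt != nil {
			opt(&o)
		}
	}
	return o
}

func main() {
	o := applyOptions(WithWorkers(4))
	fmt.Println(o.workers, o.taskInterval) // workers overridden, taskInterval left at its default
}
```

A constructor such as NewRater would typically run a loop like applyOptions over its variadic opts argument before building the struct.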

type PodReadCount

type PodReadCount struct {
	// contains filtered or unexported fields
}

PodReadCount is a struct that maintains the count of messages read by a pod of a MonoVertex.

func (*PodReadCount) Name

func (p *PodReadCount) Name() string

Name returns the pod name.

func (*PodReadCount) ReadCount

func (p *PodReadCount) ReadCount() float64

ReadCount returns the number of messages read by the pod.

type PodTracker

type PodTracker struct {
	// contains filtered or unexported fields
}

PodTracker maintains a set of active pods for a MonoVertex. It periodically sends HTTP requests to pods to check whether they are still active.

func NewPodTracker

func NewPodTracker(ctx context.Context, mv *v1alpha1.MonoVertex, opts ...PodTrackerOption) *PodTracker

func (*PodTracker) GetActivePodsCount

func (pt *PodTracker) GetActivePodsCount() int

GetActivePodsCount returns the number of active pods.

func (*PodTracker) GetPodInfo

func (pt *PodTracker) GetPodInfo(key string) (*podInfo, error)

func (*PodTracker) IsActive

func (pt *PodTracker) IsActive(podKey string) bool

IsActive returns true if the pod is active, false otherwise.

func (*PodTracker) LeastRecentlyUsed

func (pt *PodTracker) LeastRecentlyUsed() string

LeastRecentlyUsed returns the least recently used pod from the active pod list. If there are no active pods, it returns an empty string.
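One common way to get this behavior is an ordered set built from a doubly linked list plus a map. The sketch below is a minimal version of that idea; activeSet, touch, and the choice to move the returned key to the back of the list are assumptions for illustration, not the PodTracker implementation.

```go
package main

import (
	"container/list"
	"fmt"
	"sync"
)

// activeSet is a hypothetical LRU-ordered set of pod keys.
type activeSet struct {
	mu    sync.Mutex
	order *list.List               // front = least recently used
	index map[string]*list.Element // key -> its element in order
}

func newActiveSet() *activeSet {
	return &activeSet{order: list.New(), index: make(map[string]*list.Element)}
}

// touch inserts key if it is absent and marks it as most recently used.
func (s *activeSet) touch(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if e, ok := s.index[key]; ok {
		s.order.MoveToBack(e)
		return
	}
	s.index[key] = s.order.PushBack(key)
}

// leastRecentlyUsed returns the key at the front of the list and marks it as
// used by moving it to the back. It returns "" when the set is empty.
func (s *activeSet) leastRecentlyUsed() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	e := s.order.Front()
	if e == nil {
		return ""
	}
	s.order.MoveToBack(e)
	return e.Value.(string)
}

func main() {
	s := newActiveSet()
	fmt.Printf("%q\n", s.leastRecentlyUsed()) // empty set yields ""
	s.touch("pod-0")
	s.touch("pod-1")
	fmt.Println(s.leastRecentlyUsed()) // pod-0
	fmt.Println(s.leastRecentlyUsed()) // pod-1
}
```

Moving the returned key to the back means repeated calls cycle through the pods in turn, which suits a worker pool that wants to visit each pod equally often.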

func (*PodTracker) Start

func (pt *PodTracker) Start(ctx context.Context) error

type PodTrackerOption

type PodTrackerOption func(*PodTracker)

func WithRefreshInterval

func WithRefreshInterval(d time.Duration) PodTrackerOption

WithRefreshInterval sets how often to refresh the rate metrics.
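A periodic refresh of this kind is usually driven by a time.Ticker inside a select loop that also watches the context. The sketch below shows that loop in isolation; runEvery and demo are hypothetical helpers, the intervals are arbitrary, and the actual pod probe is left as a placeholder callback.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runEvery calls fn once per interval until ctx is cancelled and returns
// the number of times fn was invoked.
func runEvery(ctx context.Context, interval time.Duration, fn func()) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	calls := 0
	for {
		select {
		case <-ctx.Done():
			return calls
		case <-ticker.C:
			fn()
			calls++
		}
	}
}

// demo runs the loop for a short, fixed time and reports how often it fired.
func demo() int {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	return runEvery(ctx, 10*time.Millisecond, func() {
		// A real tracker would probe each pod here.
	})
}

func main() {
	fmt.Println("refreshes:", demo())
}
```

Stopping the ticker with defer releases its resources as soon as the context ends, so a cancelled Start call does not leave a timer behind.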

type Rater

type Rater struct {
	// contains filtered or unexported fields
}

Rater is a struct that maintains information about the processing rate of the MonoVertex. It monitors the number of processed messages for each pod in a MonoVertex and calculates the rate.

func NewRater

func NewRater(ctx context.Context, mv *v1alpha1.MonoVertex, opts ...Option) *Rater

func (*Rater) GetRates

func (r *Rater) GetRates() map[string]*wrapperspb.DoubleValue

GetRates returns the rate metrics for the MonoVertex. It calculates the rate metrics for the given lookback seconds.

func (*Rater) Start

func (r *Rater) Start(ctx context.Context) error

type TimestampedCounts

type TimestampedCounts struct {
	// contains filtered or unexported fields
}

TimestampedCounts tracks the total count of processed messages for a list of pods at a given timestamp.

func NewTimestampedCounts

func NewTimestampedCounts(t int64) *TimestampedCounts

func (*TimestampedCounts) PodCountSnapshot

func (tc *TimestampedCounts) PodCountSnapshot() map[string]float64

PodCountSnapshot returns a copy of podReadCounts. The copy ensures that the returned map is not modified by other goroutines.
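The copy-on-read idea can be sketched with a read-write mutex around a map. The counts type and its update and snapshot methods below are hypothetical stand-ins, not the TimestampedCounts implementation; only the field name podReadCounts comes from the description above.

```go
package main

import (
	"fmt"
	"sync"
)

// counts is a hypothetical stand-in for TimestampedCounts.
type counts struct {
	mu            sync.RWMutex
	podReadCounts map[string]float64
}

// update records the latest read count for a pod under the write lock.
func (c *counts) update(pod string, n float64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.podReadCounts[pod] = n
}

// snapshot copies the map under the read lock, so the caller owns the result
// and later updates cannot change it.
func (c *counts) snapshot() map[string]float64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	out := make(map[string]float64, len(c.podReadCounts))
	for k, v := range c.podReadCounts {
		out[k] = v
	}
	return out
}

func main() {
	c := &counts{podReadCounts: map[string]float64{}}
	c.update("pod-0", 10)
	snap := c.snapshot()
	c.update("pod-0", 25)
	fmt.Println(snap["pod-0"], c.snapshot()["pod-0"]) // old snapshot is unchanged
}
```

Returning the internal map directly would let a reader iterate while a writer mutates it, which is a data race in Go; the copy avoids that at the cost of one allocation per call.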

func (*TimestampedCounts) String

func (tc *TimestampedCounts) String() string

String returns a string representation of the TimestampedCounts. It is used for debugging purposes.

func (*TimestampedCounts) Update

func (tc *TimestampedCounts) Update(podReadCount *PodReadCount)

Update updates the count of processed messages for a pod.
