Documentation ¶
Overview ¶
Package rater provides the functionality to calculate the processing rate of each vertex partition. The processing rate is calculated based on the metric forwarder_data_read_total.
Index ¶
- Constants
- func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, ...) float64
- func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, ...)
- type Option
- type PodReadCount
- type PodTracker
- type PodTrackerOption
- type Ratable
- type Rater
- type TimestampedCounts
- type UniqueStringList
- func (l *UniqueStringList) Contains(value string) bool
- func (l *UniqueStringList) Front() string
- func (l *UniqueStringList) Length() int
- func (l *UniqueStringList) MoveToBack(value string)
- func (l *UniqueStringList) PushBack(value string)
- func (l *UniqueStringList) Remove(value string)
- func (l *UniqueStringList) ToString() string
Constants ¶
const CountWindow = time.Second * 10
CountWindow is the time window for which we maintain the timestamped counts, currently 10 seconds e.g., if the current time is 12:00:07, the retrieved count will be tracked in the 12:00:00-12:00:10 time window using 12:00:10 as the timestamp
Variables ¶
This section is empty.
Functions ¶
func CalculateRate ¶
func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionName string) float64
CalculateRate calculates the rate of the vertex partition in the last lookback seconds
func UpdateCount ¶
func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podReadCounts *PodReadCount)
UpdateCount updates the count of processed messages for a pod at a given time
Types ¶
type PodReadCount ¶ added in v0.9.0
type PodReadCount struct {
// contains filtered or unexported fields
}
PodReadCount is a struct to maintain count of messages read from each partition by a pod
func (*PodReadCount) Name ¶ added in v0.9.0
func (p *PodReadCount) Name() string
func (*PodReadCount) PartitionReadCounts ¶ added in v0.9.0
func (p *PodReadCount) PartitionReadCounts() map[string]float64
type PodTracker ¶
type PodTracker struct {
// contains filtered or unexported fields
}
PodTracker maintains a set of active pods for a pipeline It periodically sends http requests to pods to check if they are still active
func NewPodTracker ¶
func NewPodTracker(ctx context.Context, p *v1alpha1.Pipeline, opts ...PodTrackerOption) *PodTracker
func (*PodTracker) GetActivePodsCount ¶ added in v0.10.0
func (pt *PodTracker) GetActivePodsCount() int
GetActivePodsCount returns the number of active pods.
func (*PodTracker) GetPodInfo ¶ added in v1.0.0
func (pt *PodTracker) GetPodInfo(key string) (*podInfo, error)
func (*PodTracker) IsActive ¶ added in v0.10.0
func (pt *PodTracker) IsActive(podKey string) bool
IsActive returns true if the pod is active, false otherwise.
func (*PodTracker) LeastRecentlyUsed ¶ added in v0.10.0
func (pt *PodTracker) LeastRecentlyUsed() string
LeastRecentlyUsed returns the least recently used pod from the active pod list. if there are no active pods, it returns an empty string.
type PodTrackerOption ¶
type PodTrackerOption func(*PodTracker)
func WithRefreshInterval ¶
func WithRefreshInterval(d time.Duration) PodTrackerOption
WithRefreshInterval sets how often to refresh the rate metrics.
type Rater ¶
type Rater struct {
// contains filtered or unexported fields
}
Rater is a struct that maintains information about the processing rate of each vertex. It monitors the number of processed messages for each pod in a vertex and calculates the rate.
type TimestampedCounts ¶
type TimestampedCounts struct {
// contains filtered or unexported fields
}
TimestampedCounts track the total count of processed messages for a list of pods at a given timestamp
func NewTimestampedCounts ¶
func NewTimestampedCounts(t int64) *TimestampedCounts
func (*TimestampedCounts) PodPartitionCountSnapshot ¶ added in v0.9.0
func (tc *TimestampedCounts) PodPartitionCountSnapshot() map[string]map[string]float64
PodPartitionCountSnapshot returns a copy of podPartitionCount it's used to ensure the returned map is not modified by other goroutines
func (*TimestampedCounts) String ¶ added in v0.10.0
func (tc *TimestampedCounts) String() string
String returns a string representation of the TimestampedCounts it's used for debugging purpose
func (*TimestampedCounts) Update ¶
func (tc *TimestampedCounts) Update(podReadCount *PodReadCount)
Update updates the count of processed messages for a pod
type UniqueStringList ¶
type UniqueStringList struct {
// contains filtered or unexported fields
}
UniqueStringList is a list of strings that only allows unique values. the underlying list is a doubly linked list.
func NewUniqueStringList ¶
func NewUniqueStringList() *UniqueStringList
func (*UniqueStringList) Contains ¶
func (l *UniqueStringList) Contains(value string) bool
Contains returns true if the value exists in the list.
func (*UniqueStringList) Front ¶
func (l *UniqueStringList) Front() string
Front returns the first string of list l or empty string if the list is empty.
func (*UniqueStringList) Length ¶
func (l *UniqueStringList) Length() int
Length returns the number of elements of list l.
func (*UniqueStringList) MoveToBack ¶
func (l *UniqueStringList) MoveToBack(value string)
MoveToBack moves the element to the back of the list, if the value exists in the list.
func (*UniqueStringList) PushBack ¶
func (l *UniqueStringList) PushBack(value string)
PushBack adds a value to the back of the list, if the value doesn't exist in the list.
func (*UniqueStringList) Remove ¶
func (l *UniqueStringList) Remove(value string)
Remove removes the string from the list, if it exists in the list.
func (*UniqueStringList) ToString ¶
func (l *UniqueStringList) ToString() string
ToString returns a comma separated string of the list values.