Documentation ¶
Overview ¶
Package rater provides the functionality to calculate the processing rate of each vertex partition. The processing rate is calculated based on the metric forwarder_data_read_total.
Index ¶
- Constants
- func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, ...) float64
- func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, ...)
- type Option
- type PodReadCount
- type PodTracker
- type PodTrackerOption
- type Ratable
- type Rater
- type TimestampedCounts
Constants ¶
const CountWindow = time.Second * 10
CountWindow is the time window for which we maintain the timestamped counts, currently 10 seconds e.g., if the current time is 12:00:07, the retrieved count will be tracked in the 12:00:00-12:00:10 time window using 12:00:10 as the timestamp
Variables ¶
This section is empty.
Functions ¶
func CalculateRate ¶
func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64, partitionName string) float64
CalculateRate calculates the rate of the vertex partition in the last lookback seconds
func UpdateCount ¶
func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podReadCounts *PodReadCount)
UpdateCount updates the count of processed messages for a pod at a given time
Types ¶
type PodReadCount ¶ added in v0.9.0
type PodReadCount struct {
// contains filtered or unexported fields
}
PodReadCount is a struct to maintain count of messages read from each partition by a pod
func (*PodReadCount) Name ¶ added in v0.9.0
func (p *PodReadCount) Name() string
func (*PodReadCount) PartitionReadCounts ¶ added in v0.9.0
func (p *PodReadCount) PartitionReadCounts() map[string]float64
type PodTracker ¶
type PodTracker struct {
// contains filtered or unexported fields
}
PodTracker maintains a set of active pods for a pipeline It periodically sends http requests to pods to check if they are still active
func NewPodTracker ¶
func NewPodTracker(ctx context.Context, p *v1alpha1.Pipeline, opts ...PodTrackerOption) *PodTracker
func (*PodTracker) GetActivePodsCount ¶ added in v0.10.0
func (pt *PodTracker) GetActivePodsCount() int
GetActivePodsCount returns the number of active pods.
func (*PodTracker) GetPodInfo ¶ added in v1.0.0
func (pt *PodTracker) GetPodInfo(key string) (*podInfo, error)
func (*PodTracker) IsActive ¶ added in v0.10.0
func (pt *PodTracker) IsActive(podKey string) bool
IsActive returns true if the pod is active, false otherwise.
func (*PodTracker) LeastRecentlyUsed ¶ added in v0.10.0
func (pt *PodTracker) LeastRecentlyUsed() string
LeastRecentlyUsed returns the least recently used pod from the active pod list. if there are no active pods, it returns an empty string.
type PodTrackerOption ¶
type PodTrackerOption func(*PodTracker)
func WithRefreshInterval ¶
func WithRefreshInterval(d time.Duration) PodTrackerOption
WithRefreshInterval sets how often to refresh the rate metrics.
type Ratable ¶ added in v0.9.0
type Ratable interface { Start(ctx context.Context) error GetRates(vertexName, partitionName string) map[string]*wrapperspb.DoubleValue }
type Rater ¶
type Rater struct {
// contains filtered or unexported fields
}
Rater is a struct that maintains information about the processing rate of each vertex. It monitors the number of processed messages for each pod in a vertex and calculates the rate.
func (*Rater) GetRates ¶
func (r *Rater) GetRates(vertexName, partitionName string) map[string]*wrapperspb.DoubleValue
GetRates returns the processing rates of the vertex partition in the format of lookback second to rate mappings
type TimestampedCounts ¶
type TimestampedCounts struct {
// contains filtered or unexported fields
}
TimestampedCounts track the total count of processed messages for a list of pods at a given timestamp
func NewTimestampedCounts ¶
func NewTimestampedCounts(t int64) *TimestampedCounts
func (*TimestampedCounts) PodPartitionCountSnapshot ¶ added in v0.9.0
func (tc *TimestampedCounts) PodPartitionCountSnapshot() map[string]map[string]float64
PodPartitionCountSnapshot returns a copy of podPartitionCount it's used to ensure the returned map is not modified by other goroutines
func (*TimestampedCounts) String ¶ added in v0.10.0
func (tc *TimestampedCounts) String() string
String returns a string representation of the TimestampedCounts it's used for debugging purpose
func (*TimestampedCounts) Update ¶
func (tc *TimestampedCounts) Update(podReadCount *PodReadCount)
Update updates the count of processed messages for a pod