Documentation ¶
Index ¶
- Constants
- func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64) float64
- func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, ...)
- type MonoVtxRatable
- type Option
- type PodReadCount
- type PodTracker
- type PodTrackerOption
- type Rater
- type TimestampedCounts
Constants ¶
const CountWindow = time.Second * 10
Variables ¶
This section is empty.
Functions ¶
func CalculateRate ¶
func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSeconds int64) float64
CalculateRate calculates the rate of a MonoVertex for a given lookback period.
func UpdateCount ¶
func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podReadCounts *PodReadCount)
UpdateCount updates the count for a given timestamp in the queue.
Types ¶
type MonoVtxRatable ¶
type MonoVtxRatable interface { Start(ctx context.Context) error GetRates() map[string]*wrapperspb.DoubleValue }
MonoVtxRatable is the interface for the Rater struct.
type PodReadCount ¶
type PodReadCount struct {
// contains filtered or unexported fields
}
PodReadCount is a struct to maintain count of messages read by a pod of MonoVertex
func (*PodReadCount) ReadCount ¶
func (p *PodReadCount) ReadCount() float64
ReadCount returns the value of the messages read by the Pod
type PodTracker ¶
type PodTracker struct {
// contains filtered or unexported fields
}
PodTracker maintains a set of active pods for a MonoVertex It periodically sends http requests to pods to check if they are still active
func NewPodTracker ¶
func NewPodTracker(ctx context.Context, mv *v1alpha1.MonoVertex, opts ...PodTrackerOption) *PodTracker
func (*PodTracker) GetActivePodsCount ¶
func (pt *PodTracker) GetActivePodsCount() int
GetActivePodsCount returns the number of active pods.
func (*PodTracker) GetPodInfo ¶
func (pt *PodTracker) GetPodInfo(key string) (*podInfo, error)
func (*PodTracker) IsActive ¶
func (pt *PodTracker) IsActive(podKey string) bool
IsActive returns true if the pod is active, false otherwise.
func (*PodTracker) LeastRecentlyUsed ¶
func (pt *PodTracker) LeastRecentlyUsed() string
LeastRecentlyUsed returns the least recently used pod from the active pod list. if there are no active pods, it returns an empty string.
type PodTrackerOption ¶
type PodTrackerOption func(*PodTracker)
func WithRefreshInterval ¶
func WithRefreshInterval(d time.Duration) PodTrackerOption
WithRefreshInterval sets how often to refresh the rate metrics.
type Rater ¶
type Rater struct {
// contains filtered or unexported fields
}
Rater is a struct that maintains information about the processing rate of the MonoVertex. It monitors the number of processed messages for each pod in a MonoVertex and calculates the rate.
func (*Rater) GetRates ¶
func (r *Rater) GetRates() map[string]*wrapperspb.DoubleValue
GetRates returns the rate metrics for the MonoVertex. It calculates the rate metrics for the given lookback seconds.
type TimestampedCounts ¶
type TimestampedCounts struct {
// contains filtered or unexported fields
}
TimestampedCounts track the total count of processed messages for a list of pods at a given timestamp
func NewTimestampedCounts ¶
func NewTimestampedCounts(t int64) *TimestampedCounts
func (*TimestampedCounts) PodCountSnapshot ¶
func (tc *TimestampedCounts) PodCountSnapshot() map[string]float64
PodCountSnapshot returns a copy of podReadCounts it's used to ensure the returned map is not modified by other goroutines
func (*TimestampedCounts) String ¶
func (tc *TimestampedCounts) String() string
String returns a string representation of the TimestampedCounts it's used for debugging purpose
func (*TimestampedCounts) Update ¶
func (tc *TimestampedCounts) Update(podReadCount *PodReadCount)
Update updates the count of processed messages for a pod