Documentation ¶
Index ¶
- Variables
- type BasicTokenBucket
- func (btb *BasicTokenBucket) GetFillRate() float64
- func (btb *BasicTokenBucket) GetPassThrough() bool
- func (btb *BasicTokenBucket) PreprocessRequest(_ context.Context, request *Request) bool
- func (btb *BasicTokenBucket) Return(_ context.Context, tokens float64, _ string)
- func (btb *BasicTokenBucket) SetFillRate(fillRate float64)
- func (btb *BasicTokenBucket) SetPassThrough(passThrough bool)
- func (btb *BasicTokenBucket) Take(ctx context.Context, tokens float64) (bool, time.Duration, float64, float64, string)
- func (btb *BasicTokenBucket) TakeIfAvailable(_ context.Context, tokens float64) (bool, time.Duration, float64, float64, string)
- type GlobalTokenBucket
- func (gtb *GlobalTokenBucket) GetPassThrough() bool
- func (gtb *GlobalTokenBucket) PreprocessRequest(_ context.Context, request *Request) bool
- func (gtb *GlobalTokenBucket) Return(ctx context.Context, tokens float64, _ string)
- func (gtb *GlobalTokenBucket) SetPassThrough(passthrough bool)
- func (gtb *GlobalTokenBucket) Take(ctx context.Context, tokens float64) (bool, time.Duration, float64, float64, string)
- func (gtb *GlobalTokenBucket) TakeIfAvailable(ctx context.Context, tokens float64) (bool, time.Duration, float64, float64, string)
- type GlobalTokenCounter
- func (gtc *GlobalTokenCounter) GetPassThrough() bool
- func (gtc *GlobalTokenCounter) PreprocessRequest(_ context.Context, request *Request) bool
- func (gtc *GlobalTokenCounter) Return(ctx context.Context, tokens float64, reqID string)
- func (gtc *GlobalTokenCounter) SetPassThrough(passthrough bool)
- func (gtc *GlobalTokenCounter) Take(ctx context.Context, tokens float64) (bool, time.Duration, float64, float64, string)
- func (gtc *GlobalTokenCounter) TakeIfAvailable(ctx context.Context, tokens float64) (bool, time.Duration, float64, float64, string)
- type LoadMultiplierTokenBucket
- func (tbls *LoadMultiplierTokenBucket) GetPassThrough() bool
- func (tbls *LoadMultiplierTokenBucket) LoadMultiplier() float64
- func (tbls *LoadMultiplierTokenBucket) PreprocessRequest(_ context.Context, request *Request) bool
- func (tbls *LoadMultiplierTokenBucket) Return(_ context.Context, tokens float64, _ string)
- func (tbls *LoadMultiplierTokenBucket) SetContinuousTracking(continuousTracking bool)
- func (tbls *LoadMultiplierTokenBucket) SetLoadDecisionValues(loadDecision *policysyncv1.LoadDecision)
- func (tbls *LoadMultiplierTokenBucket) SetPassThrough(passThrough bool)
- func (tbls *LoadMultiplierTokenBucket) Take(ctx context.Context, tokens float64) (bool, time.Duration, float64, float64, string)
- func (tbls *LoadMultiplierTokenBucket) TakeIfAvailable(_ context.Context, tokens float64) (bool, time.Duration, float64, float64, string)
- type PreemptionMetrics
- type Request
- type Scheduler
- type TokenBucketMetrics
- type TokenManager
- type WFQMetrics
- type WFQScheduler
- func (sched *WFQScheduler) GetPendingFlows() int
- func (sched *WFQScheduler) GetPendingRequests() int
- func (sched *WFQScheduler) Identifiers(workloadLabel, fairnessLabel string, priority float64, generation uint64) (string, string)
- func (sched *WFQScheduler) Info() (time.Time, int)
- func (sched *WFQScheduler) Schedule(ctx context.Context, request *Request) (bool, float64, float64, string)
- type WindowedCounter
- type WorkloadState
Constants ¶
This section is empty.
Variables ¶
var (
NumFairnessQueues = 1 << 8
)
Memory pool for heapRequest(s).
Functions ¶
This section is empty.
Types ¶
type BasicTokenBucket ¶
type BasicTokenBucket struct {
// contains filtered or unexported fields
}
BasicTokenBucket is a basic token bucket implementation.
func NewBasicTokenBucket ¶
func NewBasicTokenBucket(clk clockwork.Clock, fillRate float64, metrics *TokenBucketMetrics) *BasicTokenBucket
NewBasicTokenBucket creates a new BasicTokenBucket with adjusted fill rate.
func (*BasicTokenBucket) GetFillRate ¶
func (btb *BasicTokenBucket) GetFillRate() float64
GetFillRate returns the fill rate of the BasicTokenBucket.
func (*BasicTokenBucket) GetPassThrough ¶
func (btb *BasicTokenBucket) GetPassThrough() bool
GetPassThrough returns the passThrough flag of the BasicTokenBucket.
func (*BasicTokenBucket) PreprocessRequest ¶
func (btb *BasicTokenBucket) PreprocessRequest(_ context.Context, request *Request) bool
PreprocessRequest decides whether to proactively accept a request.
func (*BasicTokenBucket) Return ¶
func (btb *BasicTokenBucket) Return(_ context.Context, tokens float64, _ string)
Return returns tokens to the basic token bucket.
func (*BasicTokenBucket) SetFillRate ¶
func (btb *BasicTokenBucket) SetFillRate(fillRate float64)
SetFillRate adjusts the fill rate of the BasicTokenBucket.
func (*BasicTokenBucket) SetPassThrough ¶
func (btb *BasicTokenBucket) SetPassThrough(passThrough bool)
SetPassThrough sets the passThrough flag of the BasicTokenBucket.
func (*BasicTokenBucket) Take ¶
func (btb *BasicTokenBucket) Take(ctx context.Context, tokens float64) (bool, time.Duration, float64, float64, string)
Take takes tokens from the basic token bucket even if available tokens are less than asked. If tokens are not available at the moment, it will return amount of wait time and checks whether the operation was successful or not.
type GlobalTokenBucket ¶
type GlobalTokenBucket struct {
// contains filtered or unexported fields
}
GlobalTokenBucket is a distributed rate-limiter token bucket implementation.
func NewGlobalTokenBucket ¶
func NewGlobalTokenBucket(key string, limiter ratelimiter.RateLimiter) *GlobalTokenBucket
NewGlobalTokenBucket creates a new instance of GlobalTokenBucket.
func (*GlobalTokenBucket) GetPassThrough ¶
func (gtb *GlobalTokenBucket) GetPassThrough() bool
GetPassThrough returns the passthrough value.
func (*GlobalTokenBucket) PreprocessRequest ¶
func (gtb *GlobalTokenBucket) PreprocessRequest(_ context.Context, request *Request) bool
PreprocessRequest is a no-op.
func (*GlobalTokenBucket) Return ¶
func (gtb *GlobalTokenBucket) Return(ctx context.Context, tokens float64, _ string)
Return returns tokens.
func (*GlobalTokenBucket) SetPassThrough ¶
func (gtb *GlobalTokenBucket) SetPassThrough(passthrough bool)
SetPassThrough sets the passthrough value.
type GlobalTokenCounter ¶ added in v2.29.0
type GlobalTokenCounter struct {
// contains filtered or unexported fields
}
GlobalTokenCounter is a distributed rate-limiter token bucket implementation.
func NewGlobalTokenCounter ¶ added in v2.29.0
func NewGlobalTokenCounter(key string, limiter concurrencylimiter.ConcurrencyLimiter) *GlobalTokenCounter
NewGlobalTokenCounter creates a new instance of GlobalTokenCounter.
func (*GlobalTokenCounter) GetPassThrough ¶ added in v2.29.0
func (gtc *GlobalTokenCounter) GetPassThrough() bool
GetPassThrough returns the passthrough value.
func (*GlobalTokenCounter) PreprocessRequest ¶ added in v2.29.0
func (gtc *GlobalTokenCounter) PreprocessRequest(_ context.Context, request *Request) bool
PreprocessRequest is a no-op.
func (*GlobalTokenCounter) Return ¶ added in v2.29.0
func (gtc *GlobalTokenCounter) Return(ctx context.Context, tokens float64, reqID string)
Return returns tokens.
func (*GlobalTokenCounter) SetPassThrough ¶ added in v2.29.0
func (gtc *GlobalTokenCounter) SetPassThrough(passthrough bool)
SetPassThrough sets the passthrough value.
type LoadMultiplierTokenBucket ¶
type LoadMultiplierTokenBucket struct {
// contains filtered or unexported fields
}
LoadMultiplierTokenBucket is a token bucket with load multiplier.
func NewLoadMultiplierTokenBucket ¶
func NewLoadMultiplierTokenBucket( clk clockwork.Clock, slotCount uint8, slotDuration time.Duration, lmGauge prometheus.Gauge, tbMetrics *TokenBucketMetrics, ) *LoadMultiplierTokenBucket
NewLoadMultiplierTokenBucket creates a new TokenBucketLoadMultiplier.
func (*LoadMultiplierTokenBucket) GetPassThrough ¶
func (tbls *LoadMultiplierTokenBucket) GetPassThrough() bool
GetPassThrough gets value of passThrough flag.
func (*LoadMultiplierTokenBucket) LoadMultiplier ¶
func (tbls *LoadMultiplierTokenBucket) LoadMultiplier() float64
LoadMultiplier returns the current load multiplier.
func (*LoadMultiplierTokenBucket) PreprocessRequest ¶
func (tbls *LoadMultiplierTokenBucket) PreprocessRequest(_ context.Context, request *Request) bool
PreprocessRequest preprocesses a request and makes decision whether to pro-actively accept a request.
func (*LoadMultiplierTokenBucket) Return ¶
func (tbls *LoadMultiplierTokenBucket) Return(_ context.Context, tokens float64, _ string)
Return returns tokens to the token bucket.
func (*LoadMultiplierTokenBucket) SetContinuousTracking ¶
func (tbls *LoadMultiplierTokenBucket) SetContinuousTracking(continuousTracking bool)
SetContinuousTracking sets whether to continuously track the token rate and adjust the fill rate based on load multiplier.
func (*LoadMultiplierTokenBucket) SetLoadDecisionValues ¶ added in v2.5.0
func (tbls *LoadMultiplierTokenBucket) SetLoadDecisionValues(loadDecision *policysyncv1.LoadDecision)
SetLoadDecisionValues sets the load multiplier number --> 0 = no load accepted, 1 = accept up to 100% of current load, 2 = accept up to 200% of current load.
func (*LoadMultiplierTokenBucket) SetPassThrough ¶
func (tbls *LoadMultiplierTokenBucket) SetPassThrough(passThrough bool)
SetPassThrough sets PassThrough flag which decides whether to pass through requests.
func (*LoadMultiplierTokenBucket) Take ¶
func (tbls *LoadMultiplierTokenBucket) Take(ctx context.Context, tokens float64) (bool, time.Duration, float64, float64, string)
Take takes tokens from the token bucket even if available tokens are less than asked. If tokens are not available at the moment, it will return amount of wait time and checks whether the operation was successful or not.
type PreemptionMetrics ¶ added in v2.31.0
type PreemptionMetrics struct {
// contains filtered or unexported fields
}
PreemptionMetrics holds metrics related to preemption and delay for a queuing system.
func NewPreemptionMetrics ¶ added in v2.31.0
func NewPreemptionMetrics( metricsLabels prometheus.Labels, workloadPreemptedTokensSummary *prometheus.SummaryVec, workloadDelayedTokensSummary *prometheus.SummaryVec, workloadOnTimeCounter *prometheus.CounterVec, ) *PreemptionMetrics
NewPreemptionMetrics creates a new PreemptionMetrics object.
type Request ¶
type Request struct { WorkloadLabel string // for identifying workload FairnessLabel string // for enforcing fairness Tokens float64 // tokens (e.g. expected latency or complexity) for this request InvPriority float64 // larger values represent higher priority }
Request is metadata for request in a flow that is to be allowed or dropped based on controlled delay and queue limits.
type Scheduler ¶
type Scheduler interface { // Schedule sends RequestContext to the underlying scheduler and returns a boolean value, // where true means accept and false means reject. Schedule(ctx context.Context, request *Request) (accept bool, remaining float64, current float64, reqID string) // Info returns the last access time and number of requests that are currently in the queue. Info() (time.Time, int) // Identifiers returns flowID and workloadID for the given request Identifiers(workloadLabel, fairnessLabel string, priority float64, generation uint64) (string, string) }
Scheduler : Interface for schedulers.
func NewWFQScheduler ¶
func NewWFQScheduler(clk clockwork.Clock, tokenManger TokenManager, metrics *WFQMetrics, metricsLabels prometheus.Labels) Scheduler
NewWFQScheduler creates a new weighted fair queue scheduler.
type TokenBucketMetrics ¶
type TokenBucketMetrics struct { FillRateGauge prometheus.Gauge BucketCapacityGauge prometheus.Gauge AvailableTokensGauge prometheus.Gauge }
TokenBucketMetrics holds metrics related to internals of TokenBucket.
type TokenManager ¶
type TokenManager interface { // Take tokens if available, otherwise return false TakeIfAvailable(ctx context.Context, tokens float64) (allowed bool, waitTime time.Duration, remaining float64, current float64, requestID string) // Take tokens even if available tokens are less than asked - returns wait time if tokens are not available immediately. The other return value conveys whether the operation was successful or not. Take(ctx context.Context, tokens float64) (allowed bool, waitTime time.Duration, remaining float64, current float64, requestID string) // Return tokens, useful when requests choose to drop themselves on timeout or cancellation Return(ctx context.Context, tokens float64, requestID string) // Provides TokenManager the request that the scheduler processing -- some TokenManager implementations use this level of visibility for their algorithms. Return value decides whether the request has to be accepted right away in case TokenManger is not yet ready or configured to accept all traffic (short circuit). PreprocessRequest(ctx context.Context, request *Request) (accept bool) }
TokenManager : Interface for token managers.
type WFQMetrics ¶
type WFQMetrics struct { IncomingTokensCounter prometheus.Counter AcceptedTokensCounter prometheus.Counter RejectedTokensCounter prometheus.Counter RequestInQueueDurationSummary *prometheus.SummaryVec WorkloadPreemptedTokensSummary *prometheus.SummaryVec WorkloadDelayedTokensSummary *prometheus.SummaryVec WorkloadOnTimeCounter *prometheus.CounterVec FairnessPreemptedTokensSummary *prometheus.SummaryVec FairnessDelayedTokensSummary *prometheus.SummaryVec FairnessOnTimeCounter *prometheus.CounterVec }
WFQMetrics holds metrics related to internal workings of WFQScheduler.
type WFQScheduler ¶
type WFQScheduler struct {
// contains filtered or unexported fields
}
WFQScheduler : Weighted Fair Queue Scheduler.
func (*WFQScheduler) GetPendingFlows ¶
func (sched *WFQScheduler) GetPendingFlows() int
GetPendingFlows returns the number of flows in the scheduler.
func (*WFQScheduler) GetPendingRequests ¶
func (sched *WFQScheduler) GetPendingRequests() int
GetPendingRequests returns the number of requests in the scheduler.
func (*WFQScheduler) Identifiers ¶ added in v2.31.0
func (sched *WFQScheduler) Identifiers(workloadLabel, fairnessLabel string, priority float64, generation uint64) (string, string)
Identifiers computes fairnessQueueID by hashing fairnessLabel and doing a bit-wise AND with number of fairness queues. Constructs workloadID by appending workloadLabel, Priority and Generation. Constructs flowID by appending workloadID and fairnessQueueID.
type WindowedCounter ¶
type WindowedCounter struct {
// contains filtered or unexported fields
}
WindowedCounter is a token bucket with a windowed counter.
func NewWindowedCounter ¶
func NewWindowedCounter(clk clockwork.Clock, totalSlots uint8, slotDuration time.Duration) *WindowedCounter
NewWindowedCounter creates a new WindowedCounter with extra slot for the current window.
func (*WindowedCounter) AddTokens ¶
func (counter *WindowedCounter) AddTokens(request *Request) bool
AddTokens to the counter. Return value is true when counter shifted slots and the all the slots in the counter is valid.
func (*WindowedCounter) CalculateTokenRate ¶
func (counter *WindowedCounter) CalculateTokenRate() float64
CalculateTokenRate returns the calculated token rate in the current window.
func (*WindowedCounter) IsBootstrapping ¶
func (counter *WindowedCounter) IsBootstrapping() bool
IsBootstrapping checks whether the counter is in bootstrapping mode.
type WorkloadState ¶ added in v2.31.0
type WorkloadState struct {
// contains filtered or unexported fields
}
WorkloadState holds the state of a workload.