Documentation ¶
Overview ¶
Package limit provides several useful limit implementations.
Index ¶
- Constants
- type AIMDLimit
- type BuiltinLimitLogger
- type FixedLimit
- type Gradient2Limit
- type GradientLimit
- type Logger
- type NoopLimitLogger
- type SettableLimit
- type TracedLimit
- type VegasLimit
- func NewDefaultVegasLimit(name string, logger Logger, registry core.MetricRegistry, tags ...string) *VegasLimit
- func NewDefaultVegasLimitWithLimit(name string, initialLimit int, logger Logger, registry core.MetricRegistry, ...) *VegasLimit
- func NewVegasLimitWithRegistry(name string, initialLimit int, rttNoLoad core.MeasurementInterface, ...) *VegasLimit
- type WindowedLimit
Constants ¶
const ProbeDisabled = -1
ProbeDisabled represents the disabled value for probing.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AIMDLimit ¶
type AIMDLimit struct {
// contains filtered or unexported fields
}
AIMDLimit implements a loss-based dynamic Limit that increases the limit additively as long as there are no errors and decreases it multiplicatively when there is an error.
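The additive-increase/multiplicative-decrease rule described above can be sketched in a few lines. This is a simplified illustration, not the package's implementation: aimdUpdate is a hypothetical helper, the parameter names are borrowed from NewAIMDLimit, and the floor of 1 is an assumption.

```go
package main

import "fmt"

// aimdUpdate is a hypothetical helper, not part of the package API.
// It returns the next limit given the current one and whether the
// latest sample was dropped.
func aimdUpdate(limit int, didDrop bool, backOffRatio float64, increaseBy int) int {
	if didDrop {
		// Multiplicative decrease; the floor of 1 is an assumption of this sketch.
		next := int(float64(limit) * backOffRatio)
		if next < 1 {
			next = 1
		}
		return next
	}
	// Additive increase while no errors are observed.
	return limit + increaseBy
}

func main() {
	limit := 10
	for i := 0; i < 5; i++ {
		limit = aimdUpdate(limit, false, 0.9, 1)
	}
	fmt.Println("after 5 successes:", limit)
	limit = aimdUpdate(limit, true, 0.9, 1)
	fmt.Println("after 1 drop:", limit)
}
```

Growth is linear while recovery from an error is proportional to the current limit, so a large limit backs off faster in absolute terms than a small one.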
func NewAIMDLimit ¶
func NewAIMDLimit(
	name string,
	initialLimit int,
	backOffRatio float64,
	increaseBy int,
	registry core.MetricRegistry,
	tags ...string,
) *AIMDLimit
NewAIMDLimit will create a new AIMDLimit.
func NewDefaultAIMDLimit ¶ added in v0.6.0
func NewDefaultAIMDLimit(
	name string,
	registry core.MetricRegistry,
	tags ...string,
) *AIMDLimit
NewDefaultAIMDLimit will create a default AIMDLimit.
func (*AIMDLimit) BackOffRatio ¶
BackOffRatio returns the current back-off ratio for the AIMDLimit.
func (*AIMDLimit) EstimatedLimit ¶
EstimatedLimit returns the current estimated limit.
func (*AIMDLimit) NotifyOnChange ¶
func (l *AIMDLimit) NotifyOnChange(consumer core.LimitChangeListener)
NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.
type BuiltinLimitLogger ¶
type BuiltinLimitLogger struct{}
BuiltinLimitLogger implements a STDOUT limit logger.
func (BuiltinLimitLogger) Debugf ¶
func (l BuiltinLimitLogger) Debugf(msg string, params ...interface{})
Debugf debug formatted log
func (BuiltinLimitLogger) IsDebugEnabled ¶
func (l BuiltinLimitLogger) IsDebugEnabled() bool
IsDebugEnabled will return true if debug is enabled. For BuiltinLimitLogger this is always `true`.
func (BuiltinLimitLogger) String ¶
func (l BuiltinLimitLogger) String() string
type FixedLimit ¶
type FixedLimit struct {
// contains filtered or unexported fields
}
FixedLimit is a non-dynamic limit with a fixed value.
func NewFixedLimit ¶
func NewFixedLimit(name string, limit int, registry core.MetricRegistry, tags ...string) *FixedLimit
NewFixedLimit will return a new FixedLimit
func (*FixedLimit) EstimatedLimit ¶
func (l *FixedLimit) EstimatedLimit() int
EstimatedLimit will return the current limit.
func (*FixedLimit) NotifyOnChange ¶
func (l *FixedLimit) NotifyOnChange(consumer core.LimitChangeListener)
NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.
func (*FixedLimit) OnSample ¶
func (l *FixedLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop bool)
OnSample will update the limit with the sample.
func (FixedLimit) String ¶
func (l FixedLimit) String() string
type Gradient2Limit ¶
type Gradient2Limit struct {
// contains filtered or unexported fields
}
Gradient2Limit implements a concurrency limit algorithm that adjusts the limit based on the gradient of change between the current average RTT and a long-term exponentially smoothed average RTT. Unlike traditional congestion control algorithms, we use the average instead of the minimum since RPC methods can be very bursty due to various factors such as non-homogeneous request processing complexity as well as a wide distribution of data size. We have also found that using the minimum can result in a bias towards an impractically low base RTT, resulting in excessive load shedding. An exponential decay is applied to the base RTT so that the value is kept stable yet is allowed to adapt to long-term changes in latency characteristics.
The core algorithm recalculates the limit every sampling window (e.g. 1 second) using the formula:
	// Calculate the gradient limiting to the range [0.5, 1.0] to filter outliers
	gradient = max(0.5, min(1.0, longtermRtt / currentRtt));

	// Calculate the new limit by applying the gradient and allowing for some queuing
	newLimit = gradient * currentLimit + queueSize;

	// Update the limit using a smoothing factor (default 0.2)
	newLimit = currentLimit * (1-smoothing) + newLimit * smoothing
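As a worked illustration, the three formula lines translate directly into Go. The helper name gradient2Step and the use of float64 throughout are assumptions made for this sketch; the package itself may round, clamp to min/max limits, or sequence the steps differently.

```go
package main

import (
	"fmt"
	"math"
)

// gradient2Step is a hypothetical helper that applies the three formula
// lines once. All values are float64 to keep the arithmetic visible.
func gradient2Step(currentLimit, longtermRtt, currentRtt, queueSize, smoothing float64) float64 {
	// Clamp the gradient to [0.5, 1.0] to filter outliers.
	gradient := math.Max(0.5, math.Min(1.0, longtermRtt/currentRtt))
	// Apply the gradient and allow for some queuing.
	newLimit := gradient*currentLimit + queueSize
	// Blend the old and new limits using the smoothing factor.
	return currentLimit*(1-smoothing) + newLimit*smoothing
}

func main() {
	// Steady latency: the gradient is 1.0, so the limit grows by smoothing*queueSize.
	fmt.Println(gradient2Step(100, 10, 10, 4, 0.2))
	// Latency doubled: the gradient is 0.5, so the limit shrinks.
	fmt.Println(gradient2Step(100, 10, 20, 4, 0.2))
}
```

With a limit of 100, a queue size of 4 and smoothing of 0.2, steady latency moves the limit to roughly 100.8, while a doubled RTT moves it to roughly 90.8. The clamp means even an extreme latency spike cannot cut the candidate limit by more than half in one window.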
The limit can be in one of three main states ¶
- Steady state: In this state the average RTT is very stable and the current measurement whipsaws around this value, sometimes reducing the limit, sometimes increasing it.
- Transition from steady state to load: In this state either the RPS or the latency has spiked. The gradient is < 1.0 due to a growing request queue that cannot be handled by the system. Excess requests are rejected due to the low limit. The baseline RTT grows using exponential decay but lags the current measurement, which keeps the gradient < 1.0 and the limit low.
- Transition from load to steady state: In this state the system goes back to steady state after a prolonged period of excessive load. Requests aren't rejected and the sample RTT remains low. During this state the long-term RTT may take some time to go back to normal and could potentially be several multiples higher than the current RTT.
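The three states can be made concrete with a small simulation. This is only a sketch: the ema type below is a hypothetical exponentially weighted average standing in for the long-term RTT tracker, and its factor of 0.1 and the RTT values are made up for illustration.

```go
package main

import (
	"fmt"
	"math"
)

// ema is a hypothetical exponentially weighted average standing in for the
// long-term RTT tracker; the package's own implementation may differ.
type ema struct {
	value  float64
	factor float64 // weight given to each new sample, in (0, 1]
	seeded bool
}

// add folds one sample into the average and returns the updated value.
func (e *ema) add(sample float64) float64 {
	if !e.seeded {
		e.value, e.seeded = sample, true
		return e.value
	}
	e.value = e.value*(1-e.factor) + sample*e.factor
	return e.value
}

// gradientFor returns the clamped gradient for one sampling window.
func gradientFor(longtermRtt, currentRtt float64) float64 {
	return math.Max(0.5, math.Min(1.0, longtermRtt/currentRtt))
}

type phase struct {
	name string
	rtt  float64
}

func main() {
	longterm := &ema{factor: 0.1}
	// Five windows each of steady latency, load, and recovery.
	phases := []phase{{"steady", 10}, {"load", 40}, {"recovery", 10}}
	for _, p := range phases {
		for i := 0; i < 5; i++ {
			lt := longterm.add(p.rtt)
			fmt.Printf("%-8s rtt=%4.1f longterm=%5.2f gradient=%.2f\n",
				p.name, p.rtt, lt, gradientFor(lt, p.rtt))
		}
	}
}
```

During the load phase the long-term value trails the current RTT, so the gradient stays below 1.0. During recovery the long-term value is still elevated, so the ratio exceeds 1.0 and the gradient is clamped to 1.0 while the average decays back.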
func NewDefaultGradient2Limit ¶
func NewDefaultGradient2Limit(
	name string,
	logger Logger,
	registry core.MetricRegistry,
	tags ...string,
) *Gradient2Limit
NewDefaultGradient2Limit creates a default Gradient2Limit.
func NewGradient2Limit ¶
func NewGradient2Limit(
	name string,
	initialLimit int,
	maxConurrency int,
	minLimit int,
	queueSizeFunc func(limit int) int,
	smoothing float64,
	longWindow int,
	logger Logger,
	registry core.MetricRegistry,
	tags ...string,
) (*Gradient2Limit, error)
NewGradient2Limit will create a new Gradient2Limit.
@param initialLimit: Initial limit used by the limiter.
@param maxConcurrency: Maximum allowable concurrency. Any estimated concurrency will be capped at this value.
@param minLimit: Minimum concurrency limit allowed. The minimum helps prevent the algorithm from adjusting the limit too far down. Note that this limit is not desirable when used as backpressure for batch apps.
@param queueSizeFunc: Function to dynamically determine the amount the estimated limit can grow while latencies remain low, as a function of the current limit.
@param smoothing: Smoothing factor to limit how aggressively the estimated limit can shrink when queuing has been detected. Value of 0.0 to 1.0, where 1.0 means the limit is completely replaced by the new estimate.
@param longWindow: Long time window for the exponential average recordings.
@param registry: Metric registry to publish metrics.
func (*Gradient2Limit) EstimatedLimit ¶
func (l *Gradient2Limit) EstimatedLimit() int
EstimatedLimit returns the current estimated limit.
func (*Gradient2Limit) NotifyOnChange ¶
func (l *Gradient2Limit) NotifyOnChange(consumer core.LimitChangeListener)
NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.
func (*Gradient2Limit) OnSample ¶
func (l *Gradient2Limit) OnSample(startTime int64, rtt int64, inFlight int, didDrop bool)
OnSample updates the concurrency limit using a new RTT sample.
func (*Gradient2Limit) String ¶
func (l *Gradient2Limit) String() string
type GradientLimit ¶
type GradientLimit struct {
// contains filtered or unexported fields
}
GradientLimit implements a concurrency limit algorithm that adjusts the limit based on the gradient of change between the sample's minimum RTT and the absolute minimum RTT, allowing for a queue of the square root of the current limit. Why square root? Because it's better than a fixed queue size that becomes too small for large limits, but still prevents the limit from growing too much by slowing down growth as the limit grows.
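To see why a square-root queue scales better than a fixed one, the sketch below compares the two. Both functions are hypothetical; they only share the func(estimatedLimit int) int shape of the queueSizeFunc parameter of NewGradientLimitWithRegistry, and the fixed value of 4 and the floor of 1 are assumptions.

```go
package main

import (
	"fmt"
	"math"
)

// sqrtQueueSize is a hypothetical queueSizeFunc-shaped helper: it returns the
// square root of the estimated limit, rounded down, with a floor of 1.
func sqrtQueueSize(estimatedLimit int) int {
	q := int(math.Sqrt(float64(estimatedLimit)))
	if q < 1 {
		return 1
	}
	return q
}

// fixedQueueSize returns the same allowance regardless of the limit.
func fixedQueueSize(estimatedLimit int) int {
	return 4
}

func main() {
	for _, limit := range []int{4, 16, 100, 400, 1600} {
		fmt.Printf("limit=%4d fixed=%d sqrt=%d\n",
			limit, fixedQueueSize(limit), sqrtQueueSize(limit))
	}
}
```

At a limit of 1600 the fixed allowance is 0.25% of the limit, while the square-root allowance of 40 is 2.5%. The allowance keeps growing with the limit, but as a shrinking fraction of it, which is the slowing-growth behavior described above.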
func NewGradientLimitWithRegistry ¶
func NewGradientLimitWithRegistry(
	name string,
	initialLimit int,
	minLimit int,
	maxConcurrency int,
	smoothing float64,
	queueSizeFunc func(estimatedLimit int) int,
	rttTolerance float64,
	probeInterval int,
	logger Logger,
	registry core.MetricRegistry,
	tags ...string,
) *GradientLimit
NewGradientLimitWithRegistry will create a new GradientLimit.
func (*GradientLimit) EstimatedLimit ¶
func (l *GradientLimit) EstimatedLimit() int
EstimatedLimit returns the current estimated limit.
func (*GradientLimit) NotifyOnChange ¶
func (l *GradientLimit) NotifyOnChange(consumer core.LimitChangeListener)
NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.
func (*GradientLimit) OnSample ¶
func (l *GradientLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop bool)
OnSample updates the concurrency limit using a new RTT sample.
func (*GradientLimit) RTTNoLoad ¶
func (l *GradientLimit) RTTNoLoad() int64
RTTNoLoad returns the current RTT No Load value.
func (*GradientLimit) String ¶
func (l *GradientLimit) String() string
type Logger ¶
type Logger interface {
	// Log a Debug statement already formatted.
	Debugf(msg string, params ...interface{})
	// Check if debug is enabled
	IsDebugEnabled() bool
}
Logger defines the basic logging dependency. Implementations are free to report stats as well.
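A custom logger only needs the two methods of the interface. The sketch below redeclares Logger locally so it compiles on its own; bufferLogger is a hypothetical implementation that collects messages in memory, which can be handy in tests.

```go
package main

import (
	"fmt"
	"strings"
)

// Logger mirrors the interface documented above so the sketch compiles on its own.
type Logger interface {
	Debugf(msg string, params ...interface{})
	IsDebugEnabled() bool
}

// bufferLogger is a hypothetical Logger that collects formatted lines in memory.
type bufferLogger struct {
	enabled bool
	lines   []string
}

// Debugf formats the message and stores it when debug logging is enabled.
func (b *bufferLogger) Debugf(msg string, params ...interface{}) {
	if !b.enabled {
		return
	}
	b.lines = append(b.lines, fmt.Sprintf(msg, params...))
}

// IsDebugEnabled reports whether Debugf will record anything.
func (b *bufferLogger) IsDebugEnabled() bool { return b.enabled }

// Compile-time check that bufferLogger satisfies Logger.
var _ Logger = (*bufferLogger)(nil)

func main() {
	logger := &bufferLogger{enabled: true}
	if logger.IsDebugEnabled() {
		logger.Debugf("limit changed from %d to %d", 10, 12)
	}
	fmt.Println(strings.Join(logger.lines, "\n"))
}
```

Guarding calls with IsDebugEnabled lets callers skip building expensive log arguments when debug output is off.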
type NoopLimitLogger ¶
type NoopLimitLogger struct{}
NoopLimitLogger implements a NO-OP logger, it does nothing.
func (NoopLimitLogger) Debugf ¶
func (l NoopLimitLogger) Debugf(msg string, params ...interface{})
Debugf debug formatted log
func (NoopLimitLogger) IsDebugEnabled ¶
func (l NoopLimitLogger) IsDebugEnabled() bool
IsDebugEnabled will return true if debug is enabled. For NoopLimitLogger this is always `false`.
func (NoopLimitLogger) String ¶
func (l NoopLimitLogger) String() string
type SettableLimit ¶
type SettableLimit struct {
// contains filtered or unexported fields
}
SettableLimit is a fixed limit that can be changed. Note: to be used mostly for testing where the limit can be manually adjusted.
func NewSettableLimit ¶
func NewSettableLimit(name string, limit int, registry core.MetricRegistry, tags ...string) *SettableLimit
NewSettableLimit will create a new SettableLimit.
func (*SettableLimit) EstimatedLimit ¶
func (l *SettableLimit) EstimatedLimit() int
EstimatedLimit will return the estimated limit.
func (*SettableLimit) NotifyOnChange ¶
func (l *SettableLimit) NotifyOnChange(consumer core.LimitChangeListener)
NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.
func (*SettableLimit) OnSample ¶
func (l *SettableLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop bool)
OnSample will update the limit with the given sample.
func (*SettableLimit) SetLimit ¶
func (l *SettableLimit) SetLimit(limit int)
SetLimit will update the current limit.
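The interplay between SetLimit and NotifyOnChange can be sketched with a self-contained stand-in. Everything here is hypothetical: settableLimit is not the package's type, and LimitChangeListener is assumed to be a plain callback taking the new limit, so check the core package for the real definition.

```go
package main

import (
	"fmt"
	"sync"
)

// LimitChangeListener is assumed here to be a plain callback taking the new
// limit; the core package holds the real definition.
type LimitChangeListener func(limit int)

// settableLimit is a hypothetical stand-in for a manually adjustable limit.
type settableLimit struct {
	mu        sync.RWMutex
	limit     int
	listeners []LimitChangeListener
}

// EstimatedLimit returns the current limit.
func (s *settableLimit) EstimatedLimit() int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.limit
}

// NotifyOnChange registers a callback for future limit changes.
func (s *settableLimit) NotifyOnChange(consumer LimitChangeListener) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.listeners = append(s.listeners, consumer)
}

// SetLimit stores the new limit and notifies every registered listener.
func (s *settableLimit) SetLimit(limit int) {
	s.mu.Lock()
	s.limit = limit
	// Copy the slice so callbacks run without holding the lock.
	listeners := append([]LimitChangeListener(nil), s.listeners...)
	s.mu.Unlock()
	for _, notify := range listeners {
		notify(limit)
	}
}

func main() {
	l := &settableLimit{limit: 10}
	l.NotifyOnChange(func(n int) { fmt.Println("limit is now", n) })
	l.SetLimit(25)
	fmt.Println(l.EstimatedLimit())
}
```

Running the callbacks outside the lock is a design choice of this sketch: it lets a listener call EstimatedLimit without deadlocking.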
func (*SettableLimit) String ¶
func (l *SettableLimit) String() string
type TracedLimit ¶
type TracedLimit struct {
// contains filtered or unexported fields
}
TracedLimit implements core.Limit but adds some additional logging
func NewTracedLimit ¶
func NewTracedLimit(limit core.Limit, logger Logger) *TracedLimit
NewTracedLimit returns a new wrapped Limit with TracedLimit.
func (*TracedLimit) EstimatedLimit ¶
func (l *TracedLimit) EstimatedLimit() int
EstimatedLimit returns the estimated limit.
func (*TracedLimit) NotifyOnChange ¶
func (l *TracedLimit) NotifyOnChange(consumer core.LimitChangeListener)
NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.
func (*TracedLimit) OnSample ¶
func (l *TracedLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop bool)
OnSample will log the sample and delegate the update to the wrapped limit.
func (*TracedLimit) String ¶
func (l *TracedLimit) String() string
type VegasLimit ¶
type VegasLimit struct {
// contains filtered or unexported fields
}
VegasLimit implements a Limiter based on TCP Vegas, where the limit increases by alpha if the queue_use is small (< alpha) and decreases by alpha if the queue_use is large (> beta).
Queue size is calculated using the formula,
queue_use = limit − BWE×RTTnoLoad = limit × (1 − RTTnoLoad/RTTactual)
For traditional TCP Vegas alpha is typically 2-3 and beta is typically 4-6. To allow for better growth and stability at higher limits we set alpha=Max(3, 10% of the current limit) and beta=Max(6, 20% of the current limit).
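The rule and the alpha/beta defaults described above can be sketched as a single step function. vegasStep is a hypothetical helper written for illustration; the package's constructor also accepts custom alpha, beta, threshold, increase and decrease functions plus smoothing, none of which are modeled here.

```go
package main

import (
	"fmt"
	"math"
)

// vegasStep is a hypothetical helper applying the documented rule once.
func vegasStep(limit, rttNoLoad, rttActual float64) float64 {
	// queue_use = limit × (1 − RTTnoLoad/RTTactual)
	queueUse := limit * (1 - rttNoLoad/rttActual)
	// alpha = Max(3, 10% of the limit), beta = Max(6, 20% of the limit)
	alpha := math.Max(3, 0.10*limit)
	beta := math.Max(6, 0.20*limit)
	switch {
	case queueUse < alpha:
		// Little queuing: grow the limit.
		return limit + alpha
	case queueUse > beta:
		// Heavy queuing: shrink the limit.
		return limit - alpha
	default:
		// Between alpha and beta: hold steady.
		return limit
	}
}

func main() {
	fmt.Println(vegasStep(20, 10, 10))   // no queuing
	fmt.Println(vegasStep(20, 10, 12.5)) // moderate queuing
	fmt.Println(vegasStep(20, 10, 20))   // heavy queuing
}
```

With a limit of 20, alpha is 3 and beta is 6. An RTT equal to the no-load RTT gives a queue_use of 0 and the limit grows to 23; an RTT of 12.5 gives a queue_use of about 4 and the limit holds; an RTT of 20 gives a queue_use of 10 and the limit drops to 17.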
func NewDefaultVegasLimit ¶
func NewDefaultVegasLimit(
	name string,
	logger Logger,
	registry core.MetricRegistry,
	tags ...string,
) *VegasLimit
NewDefaultVegasLimit returns a new default VegasLimit.
func NewDefaultVegasLimitWithLimit ¶
func NewDefaultVegasLimitWithLimit(
	name string,
	initialLimit int,
	logger Logger,
	registry core.MetricRegistry,
	tags ...string,
) *VegasLimit
NewDefaultVegasLimitWithLimit creates a new default VegasLimit with the given initial limit.
func NewVegasLimitWithRegistry ¶
func NewVegasLimitWithRegistry(
	name string,
	initialLimit int,
	rttNoLoad core.MeasurementInterface,
	maxConcurrency int,
	smoothing float64,
	alphaFunc func(estimatedLimit int) int,
	betaFunc func(estimatedLimit int) int,
	thresholdFunc func(estimatedLimit int) int,
	increaseFunc func(estimatedLimit float64) float64,
	decreaseFunc func(estimatedLimit float64) float64,
	probeMultiplier int,
	logger Logger,
	registry core.MetricRegistry,
	tags ...string,
) *VegasLimit
NewVegasLimitWithRegistry will create a new VegasLimit.
func (*VegasLimit) EstimatedLimit ¶
func (l *VegasLimit) EstimatedLimit() int
EstimatedLimit returns the current estimated limit.
func (*VegasLimit) NotifyOnChange ¶
func (l *VegasLimit) NotifyOnChange(consumer core.LimitChangeListener)
NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.
func (*VegasLimit) OnSample ¶
func (l *VegasLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop bool)
OnSample updates the concurrency limit using a new RTT sample.
func (*VegasLimit) RTTNoLoad ¶
func (l *VegasLimit) RTTNoLoad() int64
RTTNoLoad returns the current RTT No Load value.
func (*VegasLimit) String ¶
func (l *VegasLimit) String() string
type WindowedLimit ¶
type WindowedLimit struct {
// contains filtered or unexported fields
}
WindowedLimit implements a windowed limit that wraps a delegate core.Limit.
func NewDefaultWindowedLimit ¶
func NewDefaultWindowedLimit(
	name string,
	delegate core.Limit,
	registry core.MetricRegistry,
	tags ...string,
) *WindowedLimit
NewDefaultWindowedLimit will create a new default WindowedLimit
func NewWindowedLimit ¶
func NewWindowedLimit(
	name string,
	minWindowTime int64,
	maxWindowTime int64,
	windowSize int32,
	minRTTThreshold int64,
	delegate core.Limit,
	registry core.MetricRegistry,
	tags ...string,
) (*WindowedLimit, error)
NewWindowedLimit will create a new WindowedLimit
func (*WindowedLimit) EstimatedLimit ¶
func (l *WindowedLimit) EstimatedLimit() int
EstimatedLimit returns the current estimated limit.
func (*WindowedLimit) NotifyOnChange ¶
func (l *WindowedLimit) NotifyOnChange(consumer core.LimitChangeListener)
NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.
func (*WindowedLimit) OnSample ¶
func (l *WindowedLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop bool)
OnSample updates the concurrency limit using a new RTT sample.
func (*WindowedLimit) String ¶
func (l *WindowedLimit) String() string