limit

package
v0.8.0
Published: Aug 12, 2024 License: Apache-2.0 Imports: 10 Imported by: 20

Documentation

Overview

Package limit provides several useful limit implementations.

Constants

const ProbeDisabled = -1

ProbeDisabled represents the disabled value for probing.

Variables

This section is empty.

Functions

This section is empty.

Types

type AIMDLimit

type AIMDLimit struct {
	// contains filtered or unexported fields
}

AIMDLimit implements a loss-based dynamic Limit that applies an additive increase as long as there are no errors and a multiplicative decrease when there is an error.
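The additive-increase/multiplicative-decrease rule described above can be sketched as a single update step. This is a minimal illustration, not the package's implementation: `aimdStep` is a hypothetical helper, the floor of 1 is an assumption, and the values 0.9 and 1 are illustrative rather than the package defaults.

```go
package main

import (
	"fmt"
	"math"
)

// aimdStep returns the next limit after one sample.
// Hypothetical helper for illustration; not part of the limit package.
func aimdStep(limit int, backOffRatio float64, increaseBy int, didDrop bool) int {
	if didDrop {
		// Multiplicative decrease: scale the limit by the back-off ratio.
		next := int(math.Floor(float64(limit) * backOffRatio))
		if next < 1 {
			next = 1 // assumption: never let the limit fall below 1
		}
		return next
	}
	// Additive increase: grow by a fixed amount while there are no errors.
	return limit + increaseBy
}

func main() {
	limit := 10
	// Two successful samples, one drop, then one more success.
	for _, dropped := range []bool{false, false, true, false} {
		limit = aimdStep(limit, 0.9, 1, dropped)
		fmt.Println(limit) // prints 11, 12, 10, 11 on separate lines
	}
}
```

The asymmetry is the point of the design: the limit creeps up slowly while requests succeed and backs off quickly once an error is observed.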

func NewAIMDLimit

func NewAIMDLimit(
	name string,
	initialLimit int,
	backOffRatio float64,
	increaseBy int,
	registry core.MetricRegistry,
	tags ...string,
) *AIMDLimit

NewAIMDLimit will create a new AIMDLimit.

func NewDefaultAIMDLimit added in v0.6.0

func NewDefaultAIMDLimit(
	name string,
	registry core.MetricRegistry,
	tags ...string,
) *AIMDLimit

NewDefaultAIMDLimit will create a default AIMDLimit.

func (*AIMDLimit) BackOffRatio

func (l *AIMDLimit) BackOffRatio() float64

BackOffRatio returns the current back-off ratio for the AIMDLimit.

func (*AIMDLimit) EstimatedLimit

func (l *AIMDLimit) EstimatedLimit() int

EstimatedLimit returns the current estimated limit.

func (*AIMDLimit) NotifyOnChange

func (l *AIMDLimit) NotifyOnChange(consumer core.LimitChangeListener)

NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.

func (*AIMDLimit) OnSample

func (l *AIMDLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop bool)

OnSample updates the concurrency limit using a new RTT sample.

func (*AIMDLimit) String

func (l *AIMDLimit) String() string

type BuiltinLimitLogger

type BuiltinLimitLogger struct{}

BuiltinLimitLogger implements a STDOUT limit logger.

func (BuiltinLimitLogger) Debugf

func (l BuiltinLimitLogger) Debugf(msg string, params ...interface{})

Debugf logs a formatted debug message.

func (BuiltinLimitLogger) IsDebugEnabled

func (l BuiltinLimitLogger) IsDebugEnabled() bool

IsDebugEnabled will return true if debug is enabled. For BuiltinLimitLogger this is always `true`.

func (BuiltinLimitLogger) String

func (l BuiltinLimitLogger) String() string

type FixedLimit

type FixedLimit struct {
	// contains filtered or unexported fields
}

FixedLimit is a non-dynamic limit with a fixed value.

func NewFixedLimit

func NewFixedLimit(name string, limit int, registry core.MetricRegistry, tags ...string) *FixedLimit

NewFixedLimit will return a new FixedLimit

func (*FixedLimit) EstimatedLimit

func (l *FixedLimit) EstimatedLimit() int

EstimatedLimit will return the current limit.

func (*FixedLimit) NotifyOnChange

func (l *FixedLimit) NotifyOnChange(consumer core.LimitChangeListener)

NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.

func (*FixedLimit) OnSample

func (l *FixedLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop bool)

OnSample will update the limit with the sample.

func (FixedLimit) String

func (l FixedLimit) String() string

type Gradient2Limit

type Gradient2Limit struct {
	// contains filtered or unexported fields
}

Gradient2Limit implements a concurrency limit algorithm that adjusts the limit based on the gradient of change between the current average RTT and a long-term exponentially smoothed average RTT. Unlike traditional congestion control algorithms, we use the average instead of the minimum, since RPC methods can be very bursty due to factors such as non-homogeneous request processing complexity and a wide distribution of data sizes. We have also found that using the minimum can result in a bias towards an impractically low base RTT, resulting in excessive load shedding. An exponential decay is applied to the base RTT so that the value is kept stable yet is allowed to adapt to long-term changes in latency characteristics.

The core algorithm recalculates the limit every sampling window (e.g. 1 second) using the formula:

// Calculate the gradient limiting to the range [0.5, 1.0] to filter outliers
gradient = max(0.5, min(1.0, longtermRtt / currentRtt));

// Calculate the new limit by applying the gradient and allowing for some queuing
newLimit = gradient * currentLimit + queueSize;

// Update the limit using a smoothing factor (default 0.2)
newLimit = currentLimit * (1-smoothing) + newLimit * smoothing

The limit can be in one of three main states

  1. Steady state: the average RTT is very stable and the current measurement whipsaws around this value, sometimes reducing the limit, sometimes increasing it.
  2. Transition from steady state to load: either the RPS or the latency has spiked. The gradient is < 1.0 due to a growing request queue that cannot be handled by the system. Excess requests are rejected due to the low limit. The baseline RTT grows using exponential decay but lags the current measurement, which keeps the gradient < 1.0 and the limit low.
  3. Transition from load to steady state: the system goes back to steady state after a prolonged period of excessive load. Requests aren't rejected and the sample RTT remains low. During this state the long-term RTT may take some time to go back to normal and could potentially be several multiples higher than the current RTT.
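To make the three formula lines concrete, the sketch below applies them once to plain numbers. It is a direct transcription of the formula as written above, not the package's code: `gradient2Step` is a hypothetical helper, and the inputs (a limit of 100, a queue size of 4) are made-up values.

```go
package main

import (
	"fmt"
	"math"
)

// gradient2Step applies the documented formula once and returns the smoothed limit.
// Hypothetical helper for illustration; not part of the limit package.
func gradient2Step(currentLimit, longtermRtt, currentRtt, queueSize, smoothing float64) float64 {
	// Clamp the gradient to [0.5, 1.0] to filter outliers.
	gradient := math.Max(0.5, math.Min(1.0, longtermRtt/currentRtt))
	// Apply the gradient and allow for some queuing.
	newLimit := gradient*currentLimit + queueSize
	// Blend the old limit and the new estimate with the smoothing factor.
	return currentLimit*(1-smoothing) + newLimit*smoothing
}

func main() {
	// Current RTT is twice the long-term RTT: the gradient clamps to 0.5 and the limit shrinks.
	fmt.Printf("under load: %.1f\n", gradient2Step(100, 50, 100, 4, 0.2))
	// Current RTT equals the long-term RTT: the gradient is 1.0 and the limit grows by a share of the queue size.
	fmt.Printf("steady:     %.1f\n", gradient2Step(100, 50, 50, 4, 0.2))
}
```

With a smoothing factor of 0.2, only a fifth of the gap between the old limit and the new estimate is applied per window, which is why a single bad window moves the limit from 100 to about 90.8 rather than straight to 54.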

func NewDefaultGradient2Limit

func NewDefaultGradient2Limit(
	name string,
	logger Logger,
	registry core.MetricRegistry,
	tags ...string,
) *Gradient2Limit

NewDefaultGradient2Limit creates a default Gradient2Limit.

func NewGradient2Limit

func NewGradient2Limit(
	name string,
	initialLimit int,
	maxConurrency int,
	minLimit int,
	queueSizeFunc func(limit int) int,
	smoothing float64,
	longWindow int,
	logger Logger,
	registry core.MetricRegistry,
	tags ...string,
) (*Gradient2Limit, error)

NewGradient2Limit will create a new Gradient2Limit.

  - initialLimit: Initial limit used by the limiter.
  - maxConcurrency: Maximum allowable concurrency. Any estimated concurrency will be capped at this value.
  - minLimit: Minimum concurrency limit allowed. The minimum helps prevent the algorithm from adjusting the limit too far down. Note that this limit is not desirable when used as backpressure for batch apps.
  - queueSizeFunc: Function to dynamically determine the amount the estimated limit can grow while latencies remain low, as a function of the current limit.
  - smoothing: Smoothing factor to limit how aggressively the estimated limit can shrink when queuing has been detected. Value of 0.0 to 1.0, where 1.0 means the limit is completely replaced by the new estimate.
  - longWindow: Long time window for the exponential average recordings.
  - registry: Metric registry to publish metrics.

func (*Gradient2Limit) EstimatedLimit

func (l *Gradient2Limit) EstimatedLimit() int

EstimatedLimit returns the current estimated limit.

func (*Gradient2Limit) NotifyOnChange

func (l *Gradient2Limit) NotifyOnChange(consumer core.LimitChangeListener)

NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.

func (*Gradient2Limit) OnSample

func (l *Gradient2Limit) OnSample(startTime int64, rtt int64, inFlight int, didDrop bool)

OnSample updates the concurrency limit using a new RTT sample.

func (*Gradient2Limit) String

func (l *Gradient2Limit) String() string

type GradientLimit

type GradientLimit struct {
	// contains filtered or unexported fields
}

GradientLimit implements a concurrency limit algorithm that adjusts the limit based on the gradient of change between the sample's minimum RTT and the absolute minimum RTT, allowing for a queue of the square root of the current limit. Why square root? Because it's better than a fixed queue size, which becomes too small for large limits, but it still prevents the limit from growing too much by slowing down growth as the limit grows.
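The square-root argument is easy to see with numbers. The sketch below uses a hypothetical `sqrtQueueSize` function with the same shape as the `queueSizeFunc func(estimatedLimit int) int` parameter; rounding up with `math.Ceil` is an assumption made for the example, not necessarily what the package does.

```go
package main

import (
	"fmt"
	"math"
)

// sqrtQueueSize returns the allowed queue for a given limit.
// Hypothetical helper for illustration; the rounding choice is an assumption.
func sqrtQueueSize(estimatedLimit int) int {
	return int(math.Ceil(math.Sqrt(float64(estimatedLimit))))
}

func main() {
	// The queue grows with the limit, but its share of the limit shrinks.
	for _, limit := range []int{4, 100, 10000} {
		q := sqrtQueueSize(limit)
		fmt.Printf("limit=%d queue=%d share=%.0f%%\n", limit, q, 100*float64(q)/float64(limit))
	}
}
```

At a limit of 4 the queue is half the limit, at 100 it is a tenth, and at 10000 it is a hundredth, so headroom keeps growing in absolute terms while relative growth slows.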

func NewGradientLimitWithRegistry

func NewGradientLimitWithRegistry(
	name string,
	initialLimit int,
	minLimit int,
	maxConcurrency int,
	smoothing float64,
	queueSizeFunc func(estimatedLimit int) int,
	rttTolerance float64,
	probeInterval int,
	logger Logger,
	registry core.MetricRegistry,
	tags ...string,
) *GradientLimit

NewGradientLimitWithRegistry will create a new GradientLimit.

func (*GradientLimit) EstimatedLimit

func (l *GradientLimit) EstimatedLimit() int

EstimatedLimit returns the current estimated limit.

func (*GradientLimit) NotifyOnChange

func (l *GradientLimit) NotifyOnChange(consumer core.LimitChangeListener)

NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.

func (*GradientLimit) OnSample

func (l *GradientLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop bool)

OnSample updates the concurrency limit using a new RTT sample.

func (*GradientLimit) RTTNoLoad

func (l *GradientLimit) RTTNoLoad() int64

RTTNoLoad returns the current RTT No Load value.

func (*GradientLimit) String

func (l *GradientLimit) String() string

type Logger

type Logger interface {
	// Log a Debug statement already formatted.
	Debugf(msg string, params ...interface{})
	// Check if debug is enabled
	IsDebugEnabled() bool
}

Logger implements a basic dependency to log. Feel free to report stats as well.
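Any type with these two methods satisfies Logger. As a minimal sketch, the example below redeclares the interface locally so that it compiles on its own, and defines a hypothetical `memoryLogger` that keeps formatted lines in memory, which can be handy in tests. Neither the local copy nor `memoryLogger` is part of the package.

```go
package main

import "fmt"

// Logger mirrors the interface shown above so this sketch is self-contained.
type Logger interface {
	Debugf(msg string, params ...interface{})
	IsDebugEnabled() bool
}

// memoryLogger is a hypothetical Logger that records formatted lines in memory.
type memoryLogger struct {
	enabled bool
	lines   []string
}

// Debugf formats and stores the message when debug logging is enabled.
func (m *memoryLogger) Debugf(msg string, params ...interface{}) {
	if !m.enabled {
		return
	}
	m.lines = append(m.lines, fmt.Sprintf(msg, params...))
}

// IsDebugEnabled reports whether Debugf will record anything.
func (m *memoryLogger) IsDebugEnabled() bool { return m.enabled }

func main() {
	m := &memoryLogger{enabled: true}
	var l Logger = m
	// Guarding with IsDebugEnabled avoids building arguments that would be discarded.
	if l.IsDebugEnabled() {
		l.Debugf("limit changed to %d", 20)
	}
	fmt.Println(m.lines)
}
```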

type NoopLimitLogger

type NoopLimitLogger struct{}

NoopLimitLogger implements a NO-OP logger, it does nothing.

func (NoopLimitLogger) Debugf

func (l NoopLimitLogger) Debugf(msg string, params ...interface{})

Debugf logs a formatted debug message.

func (NoopLimitLogger) IsDebugEnabled

func (l NoopLimitLogger) IsDebugEnabled() bool

IsDebugEnabled will return true if debug is enabled. For NoopLimitLogger this is always `false`.

func (NoopLimitLogger) String

func (l NoopLimitLogger) String() string

type SettableLimit

type SettableLimit struct {
	// contains filtered or unexported fields
}

SettableLimit is a fixed limit that can be changed. Note: to be used mostly for testing where the limit can be manually adjusted.

func NewSettableLimit

func NewSettableLimit(name string, limit int, registry core.MetricRegistry, tags ...string) *SettableLimit

NewSettableLimit will create a new SettableLimit.

func (*SettableLimit) EstimatedLimit

func (l *SettableLimit) EstimatedLimit() int

EstimatedLimit will return the estimated limit.

func (*SettableLimit) NotifyOnChange

func (l *SettableLimit) NotifyOnChange(consumer core.LimitChangeListener)

NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.

func (*SettableLimit) OnSample

func (l *SettableLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop bool)

OnSample will update the limit with the given sample.

func (*SettableLimit) SetLimit

func (l *SettableLimit) SetLimit(limit int)

SetLimit will update the current limit.

func (*SettableLimit) String

func (l *SettableLimit) String() string

type TracedLimit

type TracedLimit struct {
	// contains filtered or unexported fields
}

TracedLimit implements core.Limit but adds some additional logging

func NewTracedLimit

func NewTracedLimit(limit core.Limit, logger Logger) *TracedLimit

NewTracedLimit returns a new wrapped Limit with TracedLimit.

func (*TracedLimit) EstimatedLimit

func (l *TracedLimit) EstimatedLimit() int

EstimatedLimit returns the estimated limit.

func (*TracedLimit) NotifyOnChange

func (l *TracedLimit) NotifyOnChange(consumer core.LimitChangeListener)

NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.

func (*TracedLimit) OnSample

func (l *TracedLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop bool)

OnSample will log and delegate the update of the sample.

func (*TracedLimit) String

func (l *TracedLimit) String() string

type VegasLimit

type VegasLimit struct {
	// contains filtered or unexported fields
}

VegasLimit implements a Limiter based on TCP Vegas where the limit increases by alpha if the queue_use is small (< alpha) and decreases by alpha if the queue_use is large (> beta).

Queue size is calculated using the formula,

queue_use = limit − BWE×RTTnoLoad = limit × (1 − RTTnoLoad/RTTactual)

For traditional TCP Vegas alpha is typically 2-3 and beta is typically 4-6. To allow for better growth and stability at higher limits we set alpha=Max(3, 10% of the current limit) and beta=Max(6, 20% of the current limit).
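The queue_use formula and the alpha/beta thresholds can be sketched together as a single decision. This is an illustration of the description above, not the package's implementation: `queueUse`, `alpha`, `beta`, and `vegasDecision` are hypothetical names, and using integer division for the 10% and 20% figures is an assumption.

```go
package main

import "fmt"

// maxInt returns the larger of a and b.
func maxInt(a, b int) int {
	if a > b {
		return a
	}
	return b
}

// queueUse computes limit × (1 − RTTnoLoad/RTTactual).
func queueUse(limit int, rttNoLoad, rttActual float64) float64 {
	return float64(limit) * (1 - rttNoLoad/rttActual)
}

// alpha and beta follow the thresholds described above (integer division is an assumption).
func alpha(limit int) int { return maxInt(3, limit/10) }
func beta(limit int) int  { return maxInt(6, limit/5) }

// vegasDecision reports which way the limit would move for one sample.
// Hypothetical helper for illustration; not part of the limit package.
func vegasDecision(limit int, rttNoLoad, rttActual float64) string {
	q := queueUse(limit, rttNoLoad, rttActual)
	switch {
	case q < float64(alpha(limit)):
		return "increase"
	case q > float64(beta(limit)):
		return "decrease"
	default:
		return "hold"
	}
}

func main() {
	// With a limit of 100, alpha is 10 and beta is 20.
	fmt.Println(vegasDecision(100, 10, 10)) // no queuing
	fmt.Println(vegasDecision(100, 10, 12)) // moderate queuing
	fmt.Println(vegasDecision(100, 10, 20)) // heavy queuing
}
```

Between alpha and beta the limit is left alone, which gives the algorithm a dead band so it does not oscillate on every sample.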

func NewDefaultVegasLimit

func NewDefaultVegasLimit(
	name string,
	logger Logger,
	registry core.MetricRegistry,
	tags ...string,
) *VegasLimit

NewDefaultVegasLimit returns a new default VegasLimit.

func NewDefaultVegasLimitWithLimit

func NewDefaultVegasLimitWithLimit(
	name string,
	initialLimit int,
	logger Logger,
	registry core.MetricRegistry,
	tags ...string,
) *VegasLimit

NewDefaultVegasLimitWithLimit creates a new VegasLimit.

func NewVegasLimitWithRegistry

func NewVegasLimitWithRegistry(
	name string,
	initialLimit int,
	rttNoLoad core.MeasurementInterface,
	maxConcurrency int,
	smoothing float64,
	alphaFunc func(estimatedLimit int) int,
	betaFunc func(estimatedLimit int) int,
	thresholdFunc func(estimatedLimit int) int,
	increaseFunc func(estimatedLimit float64) float64,
	decreaseFunc func(estimatedLimit float64) float64,
	probeMultiplier int,
	logger Logger,
	registry core.MetricRegistry,
	tags ...string,
) *VegasLimit

NewVegasLimitWithRegistry will create a new VegasLimit.

func (*VegasLimit) EstimatedLimit

func (l *VegasLimit) EstimatedLimit() int

EstimatedLimit returns the current estimated limit.

func (*VegasLimit) NotifyOnChange

func (l *VegasLimit) NotifyOnChange(consumer core.LimitChangeListener)

NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.

func (*VegasLimit) OnSample

func (l *VegasLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop bool)

OnSample updates the concurrency limit using a new RTT sample.

func (*VegasLimit) RTTNoLoad

func (l *VegasLimit) RTTNoLoad() int64

RTTNoLoad returns the current RTT No Load value.

func (*VegasLimit) String

func (l *VegasLimit) String() string

type WindowedLimit

type WindowedLimit struct {
	// contains filtered or unexported fields
}

WindowedLimit implements a windowed limit

func NewDefaultWindowedLimit

func NewDefaultWindowedLimit(
	name string,
	delegate core.Limit,
	registry core.MetricRegistry,
	tags ...string,
) *WindowedLimit

NewDefaultWindowedLimit will create a new default WindowedLimit

func NewWindowedLimit

func NewWindowedLimit(
	name string,
	minWindowTime int64,
	maxWindowTime int64,
	windowSize int32,
	minRTTThreshold int64,
	delegate core.Limit,
	registry core.MetricRegistry,
	tags ...string,
) (*WindowedLimit, error)

NewWindowedLimit will create a new WindowedLimit

func (*WindowedLimit) EstimatedLimit

func (l *WindowedLimit) EstimatedLimit() int

EstimatedLimit returns the current estimated limit.

func (*WindowedLimit) NotifyOnChange

func (l *WindowedLimit) NotifyOnChange(consumer core.LimitChangeListener)

NotifyOnChange will register a callback to receive notification whenever the limit is updated to a new value.

func (*WindowedLimit) OnSample

func (l *WindowedLimit) OnSample(startTime int64, rtt int64, inFlight int, didDrop bool)

OnSample updates the concurrency limit using a new RTT sample.

func (*WindowedLimit) String

func (l *WindowedLimit) String() string

Directories

Path Synopsis
Package functions provides additional helper functions to the limit package.
