Documentation ¶
Index ¶
- Constants
- Variables
- type AdaptiveCalculator
- type AdaptiveLimit
- type AdaptiveLimiter
- type AdaptiveSetting
- type BackoffEvent
- type ConcurrencyLimiter
- type ConcurrencyMonitor
- type LimitedFunc
- type Limiter
- type PromMonitor
- func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric)
- func (p *PromMonitor) Dequeued(ctx context.Context)
- func (p *PromMonitor) Describe(descs chan<- *prometheus.Desc)
- func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, ...)
- func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration)
- func (p *PromMonitor) Exit(ctx context.Context)
- func (p *PromMonitor) Queued(ctx context.Context, key string, queueLength int)
- type QueueTickerCreator
- type RateLimiter
- type ResourceWatcher
Constants ¶
const (
	// TypePerRPC is a concurrency limiter whose key is the full gRPC method name. All
	// requests to the same method share the concurrency limit.
	TypePerRPC = "per-rpc"
	// TypePackObjects is a dedicated concurrency limiter for pack-objects. It uses request
	// information (RemoteIP/Repository/User) as the limiting key.
	TypePackObjects = "pack-objects"
)
const (
	// MaximumWatcherTimeout is the maximum number of timeouts allowed when polling backoff events from watchers.
	// Once this threshold is reached, a polling timeout is treated as a backoff event.
	MaximumWatcherTimeout = 5
)
Variables ¶
var ErrMaxQueueSize = errors.New("maximum queue size reached")
ErrMaxQueueSize indicates that the concurrency queue has reached its maximum size.
var ErrMaxQueueTime = errors.New("maximum time in concurrency queue reached")
ErrMaxQueueTime indicates a request has reached the maximum time allowed to wait in the concurrency queue.
var ErrRateLimit = errors.New("rate limit reached")
ErrRateLimit is returned when RateLimiter determines that a request has exceeded the request rate limit.
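The three values above are sentinel errors, so a caller can tell them apart with errors.Is. The sketch below redeclares them locally so that it compiles on its own; classify and its labels are hypothetical, and it assumes the error a limiter returns is, or wraps, one of these values.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors, redeclared so the
// sketch compiles without importing the package.
var (
	ErrMaxQueueSize = errors.New("maximum queue size reached")
	ErrMaxQueueTime = errors.New("maximum time in concurrency queue reached")
	ErrRateLimit    = errors.New("rate limit reached")
)

// classify is a hypothetical helper that maps a limiter error to a short label.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrMaxQueueSize):
		return "queue-full"
	case errors.Is(err, ErrMaxQueueTime):
		return "queue-timeout"
	case errors.Is(err, ErrRateLimit):
		return "rate-limited"
	default:
		return "other"
	}
}

func main() {
	// errors.Is also matches when the sentinel has been wrapped with %w.
	wrapped := fmt.Errorf("handling request: %w", ErrMaxQueueTime)
	fmt.Println(classify(wrapped))
	fmt.Println(classify(ErrRateLimit))
}
```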
Functions ¶
This section is empty.
Types ¶
type AdaptiveCalculator ¶ added in v16.3.0
AdaptiveCalculator is responsible for calculating the adaptive limits based on the additive increase/multiplicative decrease (AIMD) algorithm. This method gradually increases the limit while the process is functioning normally and quickly reduces it when an issue (backoff event) occurs. It receives a list of AdaptiveLimiter and a list of ResourceWatcher. Although the limits may have different settings (Initial, Min, Max, BackoffFactor), they all move as a whole. The caller accesses the current limits via the AdaptiveLimiter.Current method.
When the calculator starts, each limit value is set to its Initial limit. Periodically, the calculator polls the backoff events from the watchers. The current value of each limit is re-calibrated as follows:
- limit = limit + 1 if there is no backoff event since the last calibration. The new limit cannot exceed max limit.
- limit = limit * BackoffFactor otherwise. The new limit cannot be lower than min limit.
If a watcher returns an error, that poll is treated as reporting no backoff event.
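The calibration rule can be sketched as a single function. This is an illustration under stated assumptions, not the calculator's actual code: the setting struct and calibrate are hypothetical names, and truncating the product toward zero is an assumption because the text does not say how limit * BackoffFactor is rounded.

```go
package main

import "fmt"

// setting holds the parameters named in the text (Initial, Min, Max,
// BackoffFactor). The struct itself is a hypothetical stand-in.
type setting struct {
	Initial, Min, Max int
	BackoffFactor     float64
}

// calibrate applies one AIMD step to the current limit.
func calibrate(current int, s setting, backoff bool) int {
	if backoff {
		// Multiplicative decrease, clamped to Min. Truncation is an assumption.
		next := int(float64(current) * s.BackoffFactor)
		if next < s.Min {
			next = s.Min
		}
		return next
	}
	// Additive increase, clamped to Max.
	next := current + 1
	if next > s.Max {
		next = s.Max
	}
	return next
}

func main() {
	s := setting{Initial: 10, Min: 2, Max: 12, BackoffFactor: 0.5}
	limit := s.Initial
	// Three quiet calibrations followed by three backoff events.
	for _, backoff := range []bool{false, false, false, true, true, true} {
		limit = calibrate(limit, s, backoff)
		fmt.Println(limit)
	}
}
```

With these example settings the limit climbs one step at a time until it reaches Max, then halves on every backoff event until it reaches Min.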
func NewAdaptiveCalculator ¶ added in v16.3.0
func NewAdaptiveCalculator(calibration time.Duration, logger *logrus.Entry, limits []AdaptiveLimiter, watchers []ResourceWatcher) *AdaptiveCalculator
NewAdaptiveCalculator constructs an AdaptiveCalculator object. It's the responsibility of the caller to validate the input AdaptiveLimiter and ResourceWatcher values.
func (*AdaptiveCalculator) Collect ¶ added in v16.3.0
func (c *AdaptiveCalculator) Collect(metrics chan<- prometheus.Metric)
Collect is used to collect Prometheus metrics.
func (*AdaptiveCalculator) Describe ¶ added in v16.3.0
func (c *AdaptiveCalculator) Describe(descs chan<- *prometheus.Desc)
Describe is used to describe Prometheus metrics.
type AdaptiveLimit ¶ added in v16.3.0
type AdaptiveLimit struct {
// contains filtered or unexported fields
}
AdaptiveLimit is an implementation of the AdaptiveLimiter interface. It uses an atomic Int32 to represent the current limit value, ensuring thread-safe updates.
func (*AdaptiveLimit) Current ¶ added in v16.3.0
func (l *AdaptiveLimit) Current() int
Current returns the current limit. This function can be called without the need for synchronization.
func (*AdaptiveLimit) Name ¶ added in v16.3.0
func (l *AdaptiveLimit) Name() string
Name returns the name of the adaptive limit.
func (*AdaptiveLimit) Setting ¶ added in v16.3.0
func (l *AdaptiveLimit) Setting() AdaptiveSetting
Setting returns the configuration parameters for an adaptive limiter.
func (*AdaptiveLimit) Update ¶ added in v16.3.0
func (l *AdaptiveLimit) Update(val int)
Update adjusts the current limit value.
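The atomic-backed design described for AdaptiveLimit can be illustrated with a small stand-in type. sketchLimit is hypothetical and omits Setting; it only shows why Current and Update need no extra locking when the value lives in an atomic.Int32 (available since Go 1.19).

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// sketchLimit is a hypothetical stand-in for AdaptiveLimit. The current
// value is stored in an atomic.Int32, so reads and writes are safe from
// any goroutine without a mutex.
type sketchLimit struct {
	name    string
	current atomic.Int32
}

func (l *sketchLimit) Name() string   { return l.name }
func (l *sketchLimit) Current() int   { return int(l.current.Load()) }
func (l *sketchLimit) Update(val int) { l.current.Store(int32(val)) }

func main() {
	l := &sketchLimit{name: "demo"}
	l.Update(10)

	// Readers run concurrently with the writer below.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = l.Current()
		}()
	}
	l.Update(5)
	wg.Wait()

	fmt.Println(l.Name(), l.Current())
}
```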
type AdaptiveLimiter ¶ added in v16.3.0
type AdaptiveLimiter interface {
	Name() string
	Current() int
	Update(val int)
	Setting() AdaptiveSetting
}
AdaptiveLimiter is an interface for managing and updating adaptive limits. It exposes methods to get the name, current limit value, update the limit value, and access its settings.
type AdaptiveSetting ¶ added in v16.3.0
AdaptiveSetting is a struct that holds the configuration parameters for an adaptive limiter.
type BackoffEvent ¶ added in v16.3.0
BackoffEvent is a signal that the current system is under pressure. It's returned by the watchers under the management of the AdaptiveCalculator at calibration points.
type ConcurrencyLimiter ¶
type ConcurrencyLimiter struct {
// contains filtered or unexported fields
}
ConcurrencyLimiter contains rate limiter state.
func NewConcurrencyLimiter ¶
func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter
NewConcurrencyLimiter creates a new concurrency rate limiter.
func (*ConcurrencyLimiter) Limit ¶
func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f LimitedFunc) (interface{}, error)
Limit will limit the concurrency of the limited function f. There are two distinct mechanisms that limit execution of the function:
- First, every call will enter the per-key queue. This queue limits how many callers may try to acquire their per-key semaphore at the same time. If the queue is full the caller will be rejected.
- Second, once the caller has successfully entered the queue, it tries to acquire its per-key semaphore. If this takes longer than the maximum queueing time, the caller is dequeued and receives an error.
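The two mechanisms can be sketched with two buffered channels and a timer. This is a simplified, single-key illustration, not the limiter's implementation: keyLimiter, newKeyLimiter, limit, and try are hypothetical names, the error values are local stand-ins, and a plain timer replaces the QueueTickerCreator.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Local stand-ins for the package's sentinel errors.
var (
	errMaxQueueSize = errors.New("maximum queue size reached")
	errMaxQueueTime = errors.New("maximum time in concurrency queue reached")
)

// keyLimiter is a hypothetical single-key limiter. A real limiter would
// keep this kind of state for every limiting key.
type keyLimiter struct {
	queue   chan struct{} // stage 1: callers waiting for the semaphore
	sem     chan struct{} // stage 2: callers currently running f
	maxWait time.Duration // longest time a caller may wait in the queue
}

func newKeyLimiter(maxConcurrency, maxQueueLength int, maxWait time.Duration) *keyLimiter {
	return &keyLimiter{
		queue:   make(chan struct{}, maxQueueLength),
		sem:     make(chan struct{}, maxConcurrency),
		maxWait: maxWait,
	}
}

func (k *keyLimiter) limit(ctx context.Context, f func() (interface{}, error)) (interface{}, error) {
	// Stage 1: enter the queue, or be rejected if it is full.
	select {
	case k.queue <- struct{}{}:
	default:
		return nil, errMaxQueueSize
	}

	// Stage 2: wait for the semaphore, but only for maxWait.
	timer := time.NewTimer(k.maxWait)
	defer timer.Stop()
	select {
	case k.sem <- struct{}{}:
		<-k.queue // acquired: leave the queue
	case <-timer.C:
		<-k.queue
		return nil, errMaxQueueTime
	case <-ctx.Done():
		<-k.queue
		return nil, ctx.Err()
	}
	defer func() { <-k.sem }()

	return f()
}

// try runs a trivial function through the limiter and returns only the error.
func try(k *keyLimiter) error {
	_, err := k.limit(context.Background(), func() (interface{}, error) { return "done", nil })
	return err
}

func main() {
	k := newKeyLimiter(1, 1, 10*time.Millisecond)
	fmt.Println(try(k)) // the semaphore is free, so f runs

	k.sem <- struct{}{} // simulate another caller holding the semaphore
	fmt.Println(try(k)) // waits in the queue until maxWait elapses

	k.queue <- struct{}{} // simulate another caller already waiting
	fmt.Println(try(k))   // rejected without waiting
}
```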
type ConcurrencyMonitor ¶
type ConcurrencyMonitor interface {
	Queued(ctx context.Context, key string, length int)
	Dequeued(ctx context.Context)
	Enter(ctx context.Context, acquireTime time.Duration)
	Exit(ctx context.Context)
	Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, message string)
}
ConcurrencyMonitor allows the concurrency limiter to be observed.
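A custom monitor only needs to implement the five methods. The sketch below repeats the interface so that it compiles on its own and counts each event. countingMonitor and simulate are hypothetical, and the order in which simulate calls the methods is an assumption made for illustration, not a statement about when the limiter invokes them.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// ConcurrencyMonitor repeats the interface from this package so the
// sketch compiles on its own.
type ConcurrencyMonitor interface {
	Queued(ctx context.Context, key string, length int)
	Dequeued(ctx context.Context)
	Enter(ctx context.Context, acquireTime time.Duration)
	Exit(ctx context.Context)
	Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, message string)
}

// countingMonitor is a hypothetical monitor that only counts events.
// The mutex makes it safe to call from concurrent requests.
type countingMonitor struct {
	mu                                         sync.Mutex
	queued, dequeued, entered, exited, dropped int
}

// add increments one of the counters under the lock.
func (m *countingMonitor) add(counter *int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	*counter++
}

func (m *countingMonitor) Queued(context.Context, string, int) { m.add(&m.queued) }
func (m *countingMonitor) Dequeued(context.Context)            { m.add(&m.dequeued) }
func (m *countingMonitor) Enter(context.Context, time.Duration) { m.add(&m.entered) }
func (m *countingMonitor) Exit(context.Context)                 { m.add(&m.exited) }
func (m *countingMonitor) Dropped(context.Context, string, int, time.Duration, string) {
	m.add(&m.dropped)
}

// Compile-time check that countingMonitor satisfies the interface.
var _ ConcurrencyMonitor = (*countingMonitor)(nil)

// simulate replays one admitted request and one dropped request.
// The call order is assumed for illustration only.
func simulate(m ConcurrencyMonitor) {
	ctx := context.Background()
	m.Queued(ctx, "key", 1)
	m.Dequeued(ctx)
	m.Enter(ctx, 5*time.Millisecond)
	m.Exit(ctx)
	m.Queued(ctx, "key", 1)
	m.Dropped(ctx, "key", 1, 20*time.Millisecond, "example reason")
}

func main() {
	m := &countingMonitor{}
	simulate(m)
	fmt.Printf("queued=%d dequeued=%d entered=%d exited=%d dropped=%d\n",
		m.queued, m.dequeued, m.entered, m.exited, m.dropped)
}
```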
func NewNoopConcurrencyMonitor ¶
func NewNoopConcurrencyMonitor() ConcurrencyMonitor
NewNoopConcurrencyMonitor returns a noopConcurrencyMonitor.
type LimitedFunc ¶
type LimitedFunc func() (resp interface{}, err error)
LimitedFunc represents a function that will be limited.
type Limiter ¶
type Limiter interface {
Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error)
}
Limiter limits incoming requests.
type PromMonitor ¶
type PromMonitor struct {
// contains filtered or unexported fields
}
PromMonitor keeps track of prometheus metrics for limiters. It conforms to both the ConcurrencyMonitor, and prometheus.Collector interfaces.
func NewPackObjectsConcurrencyMonitor ¶
func NewPackObjectsConcurrencyMonitor(latencyBuckets []float64) *PromMonitor
NewPackObjectsConcurrencyMonitor returns a concurrency monitor for use with limiting pack objects processes.
func NewPerRPCPromMonitor ¶
func NewPerRPCPromMonitor(
	system, fullMethod string,
	queuedMetric, inProgressMetric *prometheus.GaugeVec,
	acquiringSecondsVec *prometheus.HistogramVec,
	requestsDroppedMetric *prometheus.CounterVec,
) *PromMonitor
NewPerRPCPromMonitor creates a new ConcurrencyMonitor that tracks limiter activity in Prometheus.
func (*PromMonitor) Collect ¶
func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric)
Collect collects all the metrics that PromMonitor keeps track of.
func (*PromMonitor) Dequeued ¶
func (p *PromMonitor) Dequeued(ctx context.Context)
Dequeued is called when a request has been dequeued.
func (*PromMonitor) Describe ¶
func (p *PromMonitor) Describe(descs chan<- *prometheus.Desc)
Describe describes all the metrics that PromMonitor keeps track of.
func (*PromMonitor) Dropped ¶
func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, reason string)
Dropped is called when a request is dropped.
func (*PromMonitor) Enter ¶
func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration)
Enter is called when a request begins to be processed.
func (*PromMonitor) Exit ¶
func (p *PromMonitor) Exit(ctx context.Context)
Exit is called when a request has finished processing.
type QueueTickerCreator ¶
QueueTickerCreator is a function that provides a ticker.
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter is an implementation of Limiter that puts a hard limit on the number of requests per second.
func NewRateLimiter ¶
func NewRateLimiter(
	refillInterval time.Duration,
	burst int,
	ticker helper.Ticker,
	requestsDroppedMetric prometheus.Counter,
) *RateLimiter
NewRateLimiter creates a new instance of RateLimiter.
func (*RateLimiter) Limit ¶
func (r *RateLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error)
Limit rejects an incoming request if the maximum number of requests per second has been reached.
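One common way to enforce such a limit is a token bucket. The sketch below is an illustrative algorithm under assumptions, not necessarily what RateLimiter does internally: it assumes burst is the bucket capacity and refillInterval is the time needed to regain one token. bucket, newBucket, allow, and epoch are hypothetical names, and the clock is passed in explicitly to keep the example deterministic.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errRateLimit is a local stand-in for the package's ErrRateLimit.
var errRateLimit = errors.New("rate limit reached")

// epoch is an arbitrary fixed start time for the example.
var epoch = time.Unix(0, 0)

// bucket is a hypothetical token bucket. It holds at most burst tokens
// and regains one token per refillInterval, which must be positive.
type bucket struct {
	refillInterval time.Duration
	burst          int
	tokens         int
	last           time.Time // time up to which refills have been credited
}

func newBucket(refillInterval time.Duration, burst int, now time.Time) *bucket {
	return &bucket{refillInterval: refillInterval, burst: burst, tokens: burst, last: now}
}

// allow credits the tokens earned since the last refill, then spends one
// token if any is available.
func (b *bucket) allow(now time.Time) error {
	if elapsed := now.Sub(b.last); elapsed >= b.refillInterval {
		n := int(elapsed / b.refillInterval)
		b.tokens += n
		if b.tokens > b.burst {
			b.tokens = b.burst
		}
		b.last = b.last.Add(time.Duration(n) * b.refillInterval)
	}
	if b.tokens == 0 {
		return errRateLimit
	}
	b.tokens--
	return nil
}

func main() {
	b := newBucket(time.Second, 2, epoch)
	fmt.Println(b.allow(epoch))                  // first burst token
	fmt.Println(b.allow(epoch))                  // second burst token
	fmt.Println(b.allow(epoch))                  // bucket is empty
	fmt.Println(b.allow(epoch.Add(time.Second))) // one token has been refilled
}
```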
func (*RateLimiter) PruneUnusedLimiters ¶
func (r *RateLimiter) PruneUnusedLimiters(ctx context.Context)
PruneUnusedLimiters enters an infinite loop to periodically check if any limiters can be cleaned up. This is meant to be called in a separate goroutine.
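Because a pruning loop of this kind blocks its caller, it is normally started with the go statement and stopped through its context. The sketch below shows that pattern with a hypothetical pruneLoop; it assumes the loop returns once the context is cancelled, which the signature suggests but the text does not state. The ticks are delivered by hand so that the example is deterministic.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pruneLoop is a hypothetical stand-in for a blocking clean-up method.
// It calls prune on every tick until ctx is cancelled.
func pruneLoop(ctx context.Context, ticks <-chan time.Time, prune func()) {
	for {
		select {
		case <-ticks:
			prune()
		case <-ctx.Done():
			return
		}
	}
}

// runPruner starts pruneLoop in its own goroutine, delivers n ticks,
// cancels the context, waits for the goroutine to exit, and returns how
// many times prune ran.
func runPruner(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	ticks := make(chan time.Time)
	done := make(chan struct{})
	count := 0

	go func() {
		defer close(done)
		pruneLoop(ctx, ticks, func() { count++ })
	}()

	for i := 0; i < n; i++ {
		ticks <- time.Now() // unbuffered: returns once the loop took the tick
	}
	cancel()
	<-done // after this, reading count is safe
	return count
}

func main() {
	fmt.Println(runPruner(3))
}
```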
type ResourceWatcher ¶ added in v16.3.0
type ResourceWatcher interface {
	// Name returns the name of the resource watcher.
	Name() string
	// Poll returns a backoff event when the watcher determines that something has gone wrong with the
	// resource it is monitoring. If everything is fine, it returns `nil`. Watchers are expected to
	// respect the cancellation of the input context.
	Poll(context.Context) (*BackoffEvent, error)
}
ResourceWatcher is an interface of the watchers that monitor the system resources.
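A watcher can be as small as a threshold check. The sketch below is a hypothetical implementation: the interface is repeated so that the code compiles on its own, BackoffEvent is a stand-in with an invented Reason field because the real type's fields are not shown here, and thresholdWatcher and pollOnce are illustrative names.

```go
package main

import (
	"context"
	"fmt"
)

// BackoffEvent is a hypothetical stand-in. The fields of the real type
// are not shown in this document.
type BackoffEvent struct {
	Reason string
}

// ResourceWatcher repeats the interface from this package.
type ResourceWatcher interface {
	Name() string
	Poll(context.Context) (*BackoffEvent, error)
}

// thresholdWatcher reports a backoff event when the reading returned by
// read exceeds threshold.
type thresholdWatcher struct {
	name      string
	threshold float64
	read      func() (float64, error)
}

func (w *thresholdWatcher) Name() string { return w.name }

func (w *thresholdWatcher) Poll(ctx context.Context) (*BackoffEvent, error) {
	// Respect cancellation before doing any work.
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	usage, err := w.read()
	if err != nil {
		return nil, err
	}
	if usage > w.threshold {
		reason := fmt.Sprintf("%s usage %.2f exceeds %.2f", w.name, usage, w.threshold)
		return &BackoffEvent{Reason: reason}, nil
	}
	return nil, nil // everything is fine
}

// Compile-time check that thresholdWatcher satisfies the interface.
var _ ResourceWatcher = (*thresholdWatcher)(nil)

// pollOnce polls a watcher fed with a fixed reading and reports whether
// it produced a backoff event.
func pollOnce(usage float64) bool {
	w := &thresholdWatcher{
		name:      "memory",
		threshold: 0.9,
		read:      func() (float64, error) { return usage, nil },
	}
	event, err := w.Poll(context.Background())
	return err == nil && event != nil
}

func main() {
	fmt.Println(pollOnce(0.95), pollOnce(0.50))
}
```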