Documentation ¶
Index ¶
- Constants
- Variables
- func NewResizableSemaphore(size uint) *resizableSemaphore
- type AdaptiveCalculator
- type AdaptiveLimit
- type AdaptiveLimiter
- type AdaptiveSetting
- type AfterUpdateHook
- type BackoffEvent
- type ConcurrencyLimiter
- type ConcurrencyMonitor
- type LimitedFunc
- type Limiter
- type PromMonitor
- func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric)
- func (p *PromMonitor) Dequeued(ctx context.Context)
- func (p *PromMonitor) Describe(descs chan<- *prometheus.Desc)
- func (p *PromMonitor) Dropped(ctx context.Context, key string, queueLength int, inProgress int, ...)
- func (p *PromMonitor) Enter(ctx context.Context, inProgress int, acquireTime time.Duration)
- func (p *PromMonitor) Exit(ctx context.Context)
- func (p *PromMonitor) Queued(ctx context.Context, key string, queueLength int)
- type QueueTickerCreator
- type RateLimiter
- type ResourceWatcher
Constants ¶
const (
	// MaximumWatcherTimeout is the maximum number of timeouts allowed when polling backoff events from watchers.
	// When this threshold is reached, a polling timeout is treated as a backoff event.
	MaximumWatcherTimeout = 5
	// DefaultCalibrateFrequency is the default time period between two calibrations.
	DefaultCalibrateFrequency = 30 * time.Second
	// DefaultBackoffFactor is the default recommended backoff factor when the concurrency decreases. By default,
	// the factor is 0.5, meaning the limit is cut in half when a backoff event occurs.
	DefaultBackoffFactor = 0.5
)
const (
	// TypePerRPC is a concurrency limiter whose key is the full method of the gRPC server. All
	// requests of the same method share the concurrency limit.
	TypePerRPC = "per-rpc"
	// TypePackObjects is a dedicated concurrency limiter for pack-objects. It uses request
	// information (RemoteIP/Repository/User) as the limiting key.
	TypePackObjects = "pack-objects"
)
Variables ¶
var ErrMaxQueueSize = errors.New("maximum queue size reached")
ErrMaxQueueSize indicates the concurrency queue has reached its maximum size
var ErrMaxQueueTime = errors.New("maximum time in concurrency queue reached")
ErrMaxQueueTime indicates a request has reached the maximum time allowed to wait in the concurrency queue.
var ErrRateLimit = errors.New("rate limit reached")
ErrRateLimit is returned when RateLimiter determined a request has breached the rate request limit.
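The three sentinel errors above can be told apart with errors.Is. The sketch below is a minimal illustration only: the variables are local copies so that the example compiles on its own, and classify is a hypothetical helper that is not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the sentinel errors listed above, so the sketch compiles on its own.
var (
	ErrMaxQueueSize = errors.New("maximum queue size reached")
	ErrMaxQueueTime = errors.New("maximum time in concurrency queue reached")
	ErrRateLimit    = errors.New("rate limit reached")
)

// classify is a hypothetical helper that maps a limiter error to a short label.
// errors.Is also matches when the sentinel has been wrapped with %w.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, ErrMaxQueueSize):
		return "queue full"
	case errors.Is(err, ErrMaxQueueTime):
		return "queued too long"
	case errors.Is(err, ErrRateLimit):
		return "rate limited"
	default:
		return "other"
	}
}

func main() {
	wrapped := fmt.Errorf("limiting request: %w", ErrMaxQueueTime)
	fmt.Println(classify(wrapped))
}
```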
Functions ¶
func NewResizableSemaphore ¶ added in v16.4.0
func NewResizableSemaphore(size uint) *resizableSemaphore
NewResizableSemaphore creates a new resizableSemaphore with the specified initial size.
Types ¶
type AdaptiveCalculator ¶ added in v16.3.0
AdaptiveCalculator is responsible for calculating the adaptive limits based on additive increase/multiplicative decrease (AIMD) algorithm. This method involves gradually increasing the limit during normal process functioning but quickly reducing it when an issue (backoff event) occurs. It receives a list of AdaptiveLimiter and a list of ResourceWatcher. Although the limits may have different settings (Initial, Min, Max, BackoffFactor), they all move as a whole. The caller accesses the current limits via AdaptiveLimiter.Current method.
When the calculator starts, each limit value is set to its Initial limit. Periodically, the calculator polls the backoff events from the watchers. The current value of each limit is re-calibrated as follows:
- limit = limit + 1 if there is no backoff event since the last calibration. The new limit cannot exceed max limit.
- limit = limit * BackoffFactor otherwise. The new limit cannot be lower than min limit.
A watcher that returns an error is treated as reporting no backoff event.
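The calibration rule described above can be sketched as a single step function. This is an illustration under stated assumptions, not the package's implementation: the setting struct is a hypothetical stand-in that only mirrors the field names mentioned in the text, and truncating the multiplied value toward zero is an assumption about rounding.

```go
package main

import "fmt"

// setting is a hypothetical stand-in mirroring the fields named in the text
// (Initial, Min, Max, BackoffFactor). It is not the package's AdaptiveSetting.
type setting struct {
	Initial, Min, Max int
	BackoffFactor     float64
}

// calibrate applies one AIMD step to the current limit.
func calibrate(current int, s setting, backoff bool) int {
	if backoff {
		// Multiplicative decrease, clamped at Min.
		// Truncation toward zero is an assumption of this sketch.
		next := int(float64(current) * s.BackoffFactor)
		if next < s.Min {
			next = s.Min
		}
		return next
	}
	// Additive increase, clamped at Max.
	next := current + 1
	if next > s.Max {
		next = s.Max
	}
	return next
}

func main() {
	s := setting{Initial: 10, Min: 2, Max: 12, BackoffFactor: 0.5}
	limit := s.Initial
	// Three quiet calibrations followed by three backoff events.
	for _, backoff := range []bool{false, false, false, true, true, true} {
		limit = calibrate(limit, s, backoff)
		fmt.Println(limit)
	}
}
```

With these example values the limit climbs one step at a time until it is capped at Max, then halves on each backoff event until it is held at Min.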
func NewAdaptiveCalculator ¶ added in v16.3.0
func NewAdaptiveCalculator(calibration time.Duration, logger log.Logger, limits []AdaptiveLimiter, watchers []ResourceWatcher) *AdaptiveCalculator
NewAdaptiveCalculator constructs an AdaptiveCalculator object. It's the responsibility of the caller to validate the correctness of the input AdaptiveLimiter and ResourceWatcher values.
func (*AdaptiveCalculator) Collect ¶ added in v16.3.0
func (c *AdaptiveCalculator) Collect(metrics chan<- prometheus.Metric)
Collect is used to collect Prometheus metrics.
func (*AdaptiveCalculator) Describe ¶ added in v16.3.0
func (c *AdaptiveCalculator) Describe(descs chan<- *prometheus.Desc)
Describe is used to describe Prometheus metrics.
type AdaptiveLimit ¶ added in v16.3.0
AdaptiveLimit is an implementation of the AdaptiveLimiter interface. It uses a mutex to ensure thread-safe access to the limit value.
func NewAdaptiveLimit ¶ added in v16.4.0
func NewAdaptiveLimit(name string, setting AdaptiveSetting) *AdaptiveLimit
NewAdaptiveLimit initializes a new AdaptiveLimit object
func (*AdaptiveLimit) AfterUpdate ¶ added in v16.4.0
func (l *AdaptiveLimit) AfterUpdate(hook AfterUpdateHook)
AfterUpdate registers a callback when the current limit is updated. Because all updates and hooks are synchronized, calling l.Current() inside the update hook in the same goroutine will cause deadlock. Hence, the update hook must use the newVal argument instead.
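The deadlock warning is easier to see with a toy model. The sketch below assumes that Update holds the lock while it runs the hooks, which is one way of reading "all updates and hooks are synchronized"; toyLimit is a hypothetical type written for this example and is not the package's AdaptiveLimit.

```go
package main

import (
	"fmt"
	"sync"
)

// toyLimit is a hypothetical mutex-guarded limit with update hooks.
type toyLimit struct {
	mu      sync.RWMutex
	current int
	hooks   []func(newVal int)
}

// Current takes the read lock to return the current value.
func (l *toyLimit) Current() int {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.current
}

// AfterUpdate registers a hook under the write lock.
func (l *toyLimit) AfterUpdate(hook func(newVal int)) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.hooks = append(l.hooks, hook)
}

// Update keeps the write lock while the hooks run. A hook that called
// l.Current() would wait for a read lock that can never be granted.
func (l *toyLimit) Update(val int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.current = val
	for _, hook := range l.hooks {
		hook(val)
	}
}

func main() {
	l := &toyLimit{current: 10}
	var seen []int
	l.AfterUpdate(func(newVal int) {
		// Use newVal here instead of l.Current().
		seen = append(seen, newVal)
	})
	l.Update(5)
	l.Update(6)
	fmt.Println(seen, l.Current())
}
```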
func (*AdaptiveLimit) Current ¶ added in v16.3.0
func (l *AdaptiveLimit) Current() int
Current returns the current limit. This function can be called without the need for synchronization.
func (*AdaptiveLimit) Initial ¶ added in v16.5.0
func (l *AdaptiveLimit) Initial() int
Initial returns the initial limit.
func (*AdaptiveLimit) Name ¶ added in v16.3.0
func (l *AdaptiveLimit) Name() string
Name returns the name of the adaptive limit
func (*AdaptiveLimit) Setting ¶ added in v16.3.0
func (l *AdaptiveLimit) Setting() AdaptiveSetting
Setting returns the configuration parameters for an adaptive limiter.
func (*AdaptiveLimit) Update ¶ added in v16.3.0
func (l *AdaptiveLimit) Update(val int)
Update adjusts the current limit value and executes all registered update hooks.
type AdaptiveLimiter ¶ added in v16.3.0
type AdaptiveLimiter interface {
	Name() string
	Current() int
	Update(val int)
	AfterUpdate(AfterUpdateHook)
	Setting() AdaptiveSetting
}
AdaptiveLimiter is an interface for managing and updating adaptive limits. It exposes methods to get the name, current limit value, update the limit value, and access its settings.
type AdaptiveSetting ¶ added in v16.3.0
AdaptiveSetting is a struct that holds the configuration parameters for an adaptive limiter.
type AfterUpdateHook ¶ added in v16.4.0
type AfterUpdateHook func(newVal int)
AfterUpdateHook is a function hook that is triggered when the current limit changes. The callers need to register a hook to the AdaptiveLimiter implementation beforehand. They are required to handle errors inside the hook function.
type BackoffEvent ¶ added in v16.3.0
type BackoffEvent struct {
	WatcherName   string
	ShouldBackoff bool
	Reason        string
	Stats         map[string]any
}
BackoffEvent is a signal that the current system is under pressure. It's returned by the watchers under the management of the AdaptiveCalculator at calibration points.
type ConcurrencyLimiter ¶
type ConcurrencyLimiter struct {
	// SetWaitTimeoutContext is a function for setting up the timeout context. If this is nil, context.WithTimeout is
	// used. This function is for internal testing purposes only.
	SetWaitTimeoutContext func() context.Context
	// contains filtered or unexported fields
}
ConcurrencyLimiter contains rate limiter state.
func NewConcurrencyLimiter ¶
func NewConcurrencyLimiter(limit *AdaptiveLimit, maxQueueLength int, maxQueueWait time.Duration, monitor ConcurrencyMonitor) *ConcurrencyLimiter
NewConcurrencyLimiter creates a new concurrency rate limiter.
func (*ConcurrencyLimiter) Limit ¶
func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f LimitedFunc) (interface{}, error)
Limit will limit the concurrency of the limited function f. There are two distinct mechanisms that limit execution of the function:
- First, every call will enter the per-key queue. This queue limits how many callers may try to acquire their per-key semaphore at the same time. If the queue is full the caller will be rejected.
- Second, when the caller has successfully entered the queue, they try to acquire their per-key semaphore. If this takes longer than the maximum queueing limit then the caller will be dequeued and will receive an error.
type ConcurrencyMonitor ¶
type ConcurrencyMonitor interface {
	Queued(ctx context.Context, key string, length int)
	Dequeued(ctx context.Context)
	Enter(ctx context.Context, inProgress int, acquireTime time.Duration)
	Exit(ctx context.Context)
	Dropped(ctx context.Context, key string, queueLength int, inProgress int, acquireTime time.Duration, message string)
}
ConcurrencyMonitor allows the concurrency limiter to be observed.
func NewNoopConcurrencyMonitor ¶
func NewNoopConcurrencyMonitor() ConcurrencyMonitor
NewNoopConcurrencyMonitor returns a noopConcurrencyMonitor
type LimitedFunc ¶
type LimitedFunc func() (resp interface{}, err error)
LimitedFunc represents a function that will be limited
type Limiter ¶
type Limiter interface {
Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error)
}
Limiter limits incoming requests
type PromMonitor ¶
type PromMonitor struct {
// contains filtered or unexported fields
}
PromMonitor keeps track of prometheus metrics for limiters. It conforms to both the ConcurrencyMonitor, and prometheus.Collector interfaces.
func NewPackObjectsConcurrencyMonitor ¶
func NewPackObjectsConcurrencyMonitor(latencyBuckets []float64) *PromMonitor
NewPackObjectsConcurrencyMonitor returns a concurrency monitor for use with limiting pack objects processes.
func NewPerRPCPromMonitor ¶
func NewPerRPCPromMonitor(
	system, fullMethod string,
	queuedMetric, inProgressMetric *prometheus.GaugeVec,
	acquiringSecondsVec *prometheus.HistogramVec,
	requestsDroppedMetric *prometheus.CounterVec,
) *PromMonitor
NewPerRPCPromMonitor creates a new ConcurrencyMonitor that tracks limiter activity in Prometheus.
func (*PromMonitor) Collect ¶
func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric)
Collect collects all the metrics that PromMonitor keeps track of.
func (*PromMonitor) Dequeued ¶
func (p *PromMonitor) Dequeued(ctx context.Context)
Dequeued is called when a request has been dequeued.
func (*PromMonitor) Describe ¶
func (p *PromMonitor) Describe(descs chan<- *prometheus.Desc)
Describe describes all the metrics that PromMonitor keeps track of.
func (*PromMonitor) Dropped ¶
func (p *PromMonitor) Dropped(ctx context.Context, key string, queueLength int, inProgress int, acquireTime time.Duration, reason string)
Dropped is called when a request is dropped.
func (*PromMonitor) Exit ¶
func (p *PromMonitor) Exit(ctx context.Context)
Exit is called when a request has finished processing.
type QueueTickerCreator ¶
QueueTickerCreator is a function that provides a ticker
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter is an implementation of Limiter that puts a hard limit on the number of requests per second
func NewRateLimiter ¶
func NewRateLimiter(
	refillInterval time.Duration,
	burst int,
	ticker helper.Ticker,
	requestsDroppedMetric prometheus.Counter,
) *RateLimiter
NewRateLimiter creates a new instance of RateLimiter
func (*RateLimiter) Limit ¶
func (r *RateLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error)
Limit rejects an incoming request if the maximum number of requests per second has been reached.
func (*RateLimiter) PruneUnusedLimiters ¶
func (r *RateLimiter) PruneUnusedLimiters(ctx context.Context)
PruneUnusedLimiters enters an infinite loop to periodically check if any limiters can be cleaned up. This is meant to be called in a separate goroutine.
type ResourceWatcher ¶ added in v16.3.0
type ResourceWatcher interface {
	// Name returns the name of the resource watcher
	Name() string
	// Poll returns a backoff event when a watcher determines that something has gone wrong with the resource it is
	// monitoring. If everything is fine, it returns `nil`. Watchers are expected to respect the cancellation of
	// the input context.
	Poll(context.Context) (*BackoffEvent, error)
}
ResourceWatcher is an interface of the watchers that monitor the system resources.