package limiter
v16.11.10

This package is not in the latest version of its module.
Published: Sep 16, 2024 | License: MIT | Imports: 18 | Imported by: 0

Documentation

Constants

const (
	// MaximumWatcherTimeout is the maximum number of timeouts allowed when polling backoff events from watchers.
	// When this threshold is reached, a polling timeout is treated as a backoff event.
	MaximumWatcherTimeout = 5
	// DefaultCalibrateFrequency is the default time period between two calibrations.
	DefaultCalibrateFrequency = 30 * time.Second
	// DefaultBackoffFactor is the default recommended backoff factor when the concurrency decreases. By default,
	// the factor is 0.5, meaning the limit is cut in half when a backoff event occurs.
	DefaultBackoffFactor = 0.5
)

const (
	// TypePerRPC is a concurrency limiter whose key is the full method of gRPC server. All
	// requests of the same method share the concurrency limit.
	TypePerRPC = "per-rpc"
	// TypePackObjects is a dedicated concurrency limiter for pack-objects. It uses request
	// information (RemoteIP/Repository/User) as the limiting key.
	TypePackObjects = "pack-objects"
)

Variables

var ErrMaxQueueSize = errors.New("maximum queue size reached")

ErrMaxQueueSize indicates the concurrency queue has reached its maximum size

var ErrMaxQueueTime = errors.New("maximum time in concurrency queue reached")

ErrMaxQueueTime indicates a request has reached the maximum time allowed to wait in the concurrency queue.

var ErrRateLimit = errors.New("rate limit reached")

ErrRateLimit is returned when RateLimiter determines that a request has exceeded the request rate limit.

Functions

func NewResizableSemaphore added in v16.4.0

func NewResizableSemaphore(size uint) *resizableSemaphore

NewResizableSemaphore creates a new resizableSemaphore with the specified initial size.

Types

type AdaptiveCalculator added in v16.3.0

type AdaptiveCalculator struct {
	sync.Mutex
	// contains filtered or unexported fields
}

AdaptiveCalculator is responsible for calculating the adaptive limits based on the additive increase/multiplicative decrease (AIMD) algorithm. This method gradually increases the limit during normal operation but quickly reduces it when an issue (backoff event) occurs. It receives a list of AdaptiveLimiter and a list of ResourceWatcher. Although the limits may have different settings (Initial, Min, Max, BackoffFactor), they all move as a whole. The caller accesses the current limits via the AdaptiveLimiter.Current method.

When the calculator starts, each limit value is set to its Initial limit. Periodically, the calculator polls the backoff events from the watchers. The current value of each limit is re-calibrated as follows:

  * limit = limit + 1 if there is no backoff event since the last calibration. The new limit cannot exceed max limit.
  * limit = limit * BackoffFactor otherwise. The new limit cannot be lower than min limit.

A watcher returning an error is treated as no backoff event.
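The calibration rule above can be illustrated with a small, self-contained sketch. It does not use this package; the `setting` type and the `calibrate` function are hypothetical names that only mirror the fields of AdaptiveSetting, and truncating the product toward zero is an assumption, since the rounding behaviour is not stated here.

```go
package main

import "fmt"

// setting is a hypothetical local copy of the AdaptiveSetting fields.
type setting struct {
	Initial       int
	Max           int
	Min           int
	BackoffFactor float64
}

// calibrate returns the next limit for a single calibration round.
// Without a backoff event the limit grows by one, capped at Max.
// With a backoff event it is multiplied by BackoffFactor, floored at Min.
// Truncation of the product is an assumption of this sketch.
func calibrate(current int, s setting, backoff bool) int {
	if backoff {
		next := int(float64(current) * s.BackoffFactor)
		if next < s.Min {
			next = s.Min
		}
		return next
	}
	next := current + 1
	if next > s.Max {
		next = s.Max
	}
	return next
}

func main() {
	s := setting{Initial: 10, Max: 12, Min: 2, BackoffFactor: 0.5}
	limit := s.Initial
	// Three quiet rounds followed by three backoff events.
	for _, backoff := range []bool{false, false, false, true, true, true} {
		limit = calibrate(limit, s, backoff)
		fmt.Println(limit)
	}
}
```

With these example settings the limit climbs to Max, stays there, and then halves on each backoff event until it reaches Min.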

func NewAdaptiveCalculator added in v16.3.0

func NewAdaptiveCalculator(calibration time.Duration, logger log.Logger, limits []AdaptiveLimiter, watchers []ResourceWatcher) *AdaptiveCalculator

NewAdaptiveCalculator constructs an AdaptiveCalculator object. It's the responsibility of the caller to validate the correctness of input AdaptiveLimiter and ResourceWatcher.

func (*AdaptiveCalculator) Collect added in v16.3.0

func (c *AdaptiveCalculator) Collect(metrics chan<- prometheus.Metric)

Collect is used to collect Prometheus metrics.

func (*AdaptiveCalculator) Describe added in v16.3.0

func (c *AdaptiveCalculator) Describe(descs chan<- *prometheus.Desc)

Describe is used to describe Prometheus metrics.

func (*AdaptiveCalculator) Start added in v16.3.0

func (c *AdaptiveCalculator) Start(ctx context.Context) (func(), error)

Start resets the current limit values and starts a goroutine to poll the backoff events. The method returns once that goroutine has started.

type AdaptiveLimit added in v16.3.0

type AdaptiveLimit struct {
	sync.Mutex
	// contains filtered or unexported fields
}

AdaptiveLimit is an implementation of the AdaptiveLimiter interface. It uses a mutex to ensure thread-safe access to the limit value.

func NewAdaptiveLimit added in v16.4.0

func NewAdaptiveLimit(name string, setting AdaptiveSetting) *AdaptiveLimit

NewAdaptiveLimit initializes a new AdaptiveLimit object

func (*AdaptiveLimit) AfterUpdate added in v16.4.0

func (l *AdaptiveLimit) AfterUpdate(hook AfterUpdateHook)

AfterUpdate registers a callback that is invoked when the current limit is updated. Because all updates and hooks are synchronized, calling l.Current() inside the update hook in the same goroutine will cause a deadlock. Hence, the update hook must use the newVal argument instead.
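The deadlock warning is easier to see in a stripped-down model. The sketch below is not this package's implementation; `limit` and `hook` are hypothetical stand-ins that assume hooks run while the same non-reentrant mutex used by Current is still held.

```go
package main

import (
	"fmt"
	"sync"
)

// hook is a hypothetical stand-in for AfterUpdateHook.
type hook func(newVal int)

// limit is a hypothetical, simplified model of a mutex-guarded limit.
type limit struct {
	mu      sync.Mutex
	current int
	hooks   []hook
}

// AfterUpdate registers a hook to run after every update.
func (l *limit) AfterUpdate(h hook) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.hooks = append(l.hooks, h)
}

// Update stores the new value and runs the hooks while holding the lock.
func (l *limit) Update(val int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.current = val
	for _, h := range l.hooks {
		h(val)
	}
}

// Current takes the same lock, so calling it from inside a hook would
// block forever because sync.Mutex is not reentrant.
func (l *limit) Current() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.current
}

func main() {
	l := &limit{current: 10}
	l.AfterUpdate(func(newVal int) {
		// Safe: use the argument rather than calling l.Current().
		fmt.Println("limit changed to", newVal)
	})
	l.Update(5)
	fmt.Println("current:", l.Current())
}
```

Passing the new value as an argument lets the hook observe the update without re-entering the lock.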

func (*AdaptiveLimit) Current added in v16.3.0

func (l *AdaptiveLimit) Current() int

Current returns the current limit. This function can be called without the need for synchronization.

func (*AdaptiveLimit) Initial added in v16.5.0

func (l *AdaptiveLimit) Initial() int

Initial returns the initial limit.

func (*AdaptiveLimit) Name added in v16.3.0

func (l *AdaptiveLimit) Name() string

Name returns the name of the adaptive limit

func (*AdaptiveLimit) Setting added in v16.3.0

func (l *AdaptiveLimit) Setting() AdaptiveSetting

Setting returns the configuration parameters for an adaptive limiter.

func (*AdaptiveLimit) Update added in v16.3.0

func (l *AdaptiveLimit) Update(val int)

Update adjusts the current limit value and executes all registered update hooks.

type AdaptiveLimiter added in v16.3.0

type AdaptiveLimiter interface {
	Name() string
	Current() int
	Update(val int)
	AfterUpdate(AfterUpdateHook)
	Setting() AdaptiveSetting
}

AdaptiveLimiter is an interface for managing and updating adaptive limits. It exposes methods to get the name and current limit value, update the limit value, register update hooks, and access its settings.

type AdaptiveSetting added in v16.3.0

type AdaptiveSetting struct {
	Initial       int
	Max           int
	Min           int
	BackoffFactor float64
}

AdaptiveSetting is a struct that holds the configuration parameters for an adaptive limiter.

type AfterUpdateHook added in v16.4.0

type AfterUpdateHook func(newVal int)

AfterUpdateHook is a function hook that is triggered when the current limit changes. The callers need to register a hook to the AdaptiveLimiter implementation beforehand. They are required to handle errors inside the hook function.

type BackoffEvent added in v16.3.0

type BackoffEvent struct {
	WatcherName   string
	ShouldBackoff bool
	Reason        string
	Stats         map[string]any
}

BackoffEvent is a signal that the current system is under pressure. It's returned by the watchers under the management of the AdaptiveCalculator at calibration points.

type ConcurrencyLimiter

type ConcurrencyLimiter struct {

	// SetWaitTimeoutContext is a function for setting up timeout context. If this is nil, context.WithTimeout is
	// used. This function is for internal testing purposes only.
	SetWaitTimeoutContext func() context.Context
	// contains filtered or unexported fields
}

ConcurrencyLimiter contains rate limiter state.

func NewConcurrencyLimiter

func NewConcurrencyLimiter(limit *AdaptiveLimit, maxQueueLength int, maxQueueWait time.Duration, monitor ConcurrencyMonitor) *ConcurrencyLimiter

NewConcurrencyLimiter creates a new concurrency rate limiter.

func (*ConcurrencyLimiter) Limit

func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f LimitedFunc) (interface{}, error)

Limit will limit the concurrency of the limited function f. There are two distinct mechanisms that limit execution of the function:

  1. First, every call will enter the per-key queue. This queue limits how many callers may try to acquire their per-key semaphore at the same time. If the queue is full, the caller is rejected.
  2. Second, when the caller has successfully entered the queue, they try to acquire their per-key semaphore. If this takes longer than the maximum queueing limit, the caller is dequeued and receives an error.

type ConcurrencyMonitor

type ConcurrencyMonitor interface {
	Queued(ctx context.Context, key string, length int)
	Dequeued(ctx context.Context)
	Enter(ctx context.Context, inProgress int, acquireTime time.Duration)
	Exit(ctx context.Context)
	Dropped(ctx context.Context, key string, queueLength int, inProgress int, acquireTime time.Duration, message string)
}

ConcurrencyMonitor allows the concurrency limiter to be observed.

func NewNoopConcurrencyMonitor

func NewNoopConcurrencyMonitor() ConcurrencyMonitor

NewNoopConcurrencyMonitor returns a noopConcurrencyMonitor

type LimitedFunc

type LimitedFunc func() (resp interface{}, err error)

LimitedFunc represents a function that will be limited

type Limiter

type Limiter interface {
	Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error)
}

Limiter limits incoming requests

type PromMonitor

type PromMonitor struct {
	// contains filtered or unexported fields
}

PromMonitor keeps track of Prometheus metrics for limiters. It conforms to both the ConcurrencyMonitor and prometheus.Collector interfaces.

func NewPackObjectsConcurrencyMonitor

func NewPackObjectsConcurrencyMonitor(latencyBuckets []float64) *PromMonitor

NewPackObjectsConcurrencyMonitor returns a concurrency monitor for use with limiting pack objects processes.

func NewPerRPCPromMonitor

func NewPerRPCPromMonitor(
	system, fullMethod string,
	queuedMetric, inProgressMetric *prometheus.GaugeVec,
	acquiringSecondsVec *prometheus.HistogramVec,
	requestsDroppedMetric *prometheus.CounterVec,
) *PromMonitor

NewPerRPCPromMonitor creates a new ConcurrencyMonitor that tracks limiter activity in Prometheus.

func (*PromMonitor) Collect

func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric)

Collect collects all the metrics that PromMonitor keeps track of.

func (*PromMonitor) Dequeued

func (p *PromMonitor) Dequeued(ctx context.Context)

Dequeued is called when a request has been dequeued.

func (*PromMonitor) Describe

func (p *PromMonitor) Describe(descs chan<- *prometheus.Desc)

Describe describes all the metrics that PromMonitor keeps track of.

func (*PromMonitor) Dropped

func (p *PromMonitor) Dropped(ctx context.Context, key string, queueLength int, inProgress int, acquireTime time.Duration, reason string)

Dropped is called when a request is dropped.

func (*PromMonitor) Enter

func (p *PromMonitor) Enter(ctx context.Context, inProgress int, acquireTime time.Duration)

Enter is called when a request begins to be processed.

func (*PromMonitor) Exit

func (p *PromMonitor) Exit(ctx context.Context)

Exit is called when a request has finished processing.

func (*PromMonitor) Queued

func (p *PromMonitor) Queued(ctx context.Context, key string, queueLength int)

Queued is called when a request has been queued.

type QueueTickerCreator

type QueueTickerCreator func() helper.Ticker

QueueTickerCreator is a function that provides a ticker

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter is an implementation of Limiter that puts a hard limit on the number of requests per second

func NewRateLimiter

func NewRateLimiter(
	refillInterval time.Duration,
	burst int,
	ticker helper.Ticker,
	requestsDroppedMetric prometheus.Counter,
) *RateLimiter

NewRateLimiter creates a new instance of RateLimiter

func (*RateLimiter) Limit

func (r *RateLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error)

Limit rejects an incoming request if the maximum number of requests per second has been reached.

func (*RateLimiter) PruneUnusedLimiters

func (r *RateLimiter) PruneUnusedLimiters(ctx context.Context)

PruneUnusedLimiters enters an infinite loop to periodically check if any limiters can be cleaned up. This is meant to be called in a separate goroutine.

type ResourceWatcher added in v16.3.0

type ResourceWatcher interface {
	// Name returns the name of the resource watcher
	Name() string
	// Poll returns a backoff event when a watcher determines that something has gone wrong with the resource it is
	// monitoring. If everything is fine, it returns `nil`. Watchers are expected to respect the cancellation of
	// the input context.
	Poll(context.Context) (*BackoffEvent, error)
}

ResourceWatcher is an interface of the watchers that monitor the system resources.
