limiter

package
v16.3.0

Published: Aug 21, 2023 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// TypePerRPC is a concurrency limiter whose key is the full method name of the gRPC server. All
	// requests of the same method share the concurrency limit.
	TypePerRPC = "per-rpc"
	// TypePackObjects is a dedicated concurrency limiter for pack-objects. It uses request
	// information (RemoteIP/Repository/User) as the limiting key.
	TypePackObjects = "pack-objects"
)
View Source
const (
	// MaximumWatcherTimeout is the maximum number of timeouts allowed when polling backoff events from watchers.
	// When this threshold is reached, a polling timeout is treated as a backoff event.
	MaximumWatcherTimeout = 5
)

Variables

View Source
var ErrMaxQueueSize = errors.New("maximum queue size reached")

ErrMaxQueueSize indicates the concurrency queue has reached its maximum size.

View Source
var ErrMaxQueueTime = errors.New("maximum time in concurrency queue reached")

ErrMaxQueueTime indicates a request has reached the maximum time allowed to wait in the concurrency queue.

View Source
var ErrRateLimit = errors.New("rate limit reached")

ErrRateLimit is returned when the RateLimiter determines a request has breached the request rate limit.
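
Which sentinel is returned depends on the Limiter implementation in use. As a minimal sketch (written as if inside this package, with an illustrative key and log messages), callers can distinguish them with errors.Is:

package limiter

import (
	"context"
	"errors"
	"log"
)

// callWithLimiter is an illustrative helper: it runs f under lim and tells
// the sentinel errors apart with errors.Is. The key and log messages are
// examples only.
func callWithLimiter(ctx context.Context, lim Limiter, f LimitedFunc) (interface{}, error) {
	resp, err := lim.Limit(ctx, "example-key", f)
	switch {
	case errors.Is(err, ErrMaxQueueSize):
		log.Print("rejected: concurrency queue is full")
	case errors.Is(err, ErrMaxQueueTime):
		log.Print("rejected: waited too long in the concurrency queue")
	case errors.Is(err, ErrRateLimit):
		log.Print("rejected: request rate limit reached")
	}
	return resp, err
}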

Functions

This section is empty.

Types

type AdaptiveCalculator added in v16.3.0

type AdaptiveCalculator struct {
	sync.Mutex
	// contains filtered or unexported fields
}

AdaptiveCalculator is responsible for calculating adaptive limits based on the additive increase/multiplicative decrease (AIMD) algorithm. This method gradually increases the limit while the process is functioning normally but quickly reduces it when an issue (backoff event) occurs. The calculator receives a list of AdaptiveLimiter and a list of ResourceWatcher. Although the limits may have different settings (Initial, Min, Max, BackoffFactor), they are all re-calibrated together. The caller accesses the current limits via the AdaptiveLimiter.Current method.

When the calculator starts, each limit value is set to its Initial limit. Periodically, the calculator polls the backoff events from the watchers. The current value of each limit is then re-calibrated as follows:

  * limit = limit + 1 if there has been no backoff event since the last calibration. The new limit cannot exceed the Max limit.
  * limit = limit * BackoffFactor otherwise. The new limit cannot be lower than the Min limit.

A watcher returning an error is treated as if no backoff event occurred.
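
Restated as a Go sketch of one calibration step (this mirrors the description above and is not the calculator's internal code):

// calibrate applies one AIMD step to a single limit, following the rules
// described above. This is a sketch, not the calculator's implementation.
func calibrate(l AdaptiveLimiter, backoffOccurred bool) {
	setting := l.Setting()
	if backoffOccurred {
		// Multiplicative decrease, clamped to the configured Min.
		newLimit := int(float64(l.Current()) * setting.BackoffFactor)
		if newLimit < setting.Min {
			newLimit = setting.Min
		}
		l.Update(newLimit)
		return
	}
	// Additive increase, clamped to the configured Max.
	newLimit := l.Current() + 1
	if newLimit > setting.Max {
		newLimit = setting.Max
	}
	l.Update(newLimit)
}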

func NewAdaptiveCalculator added in v16.3.0

func NewAdaptiveCalculator(calibration time.Duration, logger *logrus.Entry, limits []AdaptiveLimiter, watchers []ResourceWatcher) *AdaptiveCalculator

NewAdaptiveCalculator constructs an AdaptiveCalculator object. It is the caller's responsibility to validate the correctness of the input AdaptiveLimiter and ResourceWatcher values.

func (*AdaptiveCalculator) Collect added in v16.3.0

func (c *AdaptiveCalculator) Collect(metrics chan<- prometheus.Metric)

Collect is used to collect Prometheus metrics.

func (*AdaptiveCalculator) Describe added in v16.3.0

func (c *AdaptiveCalculator) Describe(descs chan<- *prometheus.Desc)

Describe is used to describe Prometheus metrics.

func (*AdaptiveCalculator) Start added in v16.3.0

func (c *AdaptiveCalculator) Start(ctx context.Context) (func(), error)

Start resets the current limit values and starts a goroutine to poll the backoff events. This method returns after that goroutine has started.
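
A hedged wiring sketch, written as if inside this package. The 30-second calibration interval is illustrative, and building the limits and watchers slices is left to the caller, since this page documents no constructor for AdaptiveLimit:

package limiter

import (
	"context"
	"time"

	"github.com/sirupsen/logrus"
)

// wireCalculator constructs the calculator and starts its polling goroutine.
// The limits and watchers are assumed to have been built elsewhere.
func wireCalculator(ctx context.Context, logger *logrus.Entry, limits []AdaptiveLimiter, watchers []ResourceWatcher) (func(), error) {
	calculator := NewAdaptiveCalculator(30*time.Second, logger, limits, watchers)

	// Start resets each limit to its Initial value and begins polling the
	// watchers in the background; the returned function stops the polling.
	stop, err := calculator.Start(ctx)
	if err != nil {
		return nil, err
	}
	return stop, nil
}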

type AdaptiveLimit added in v16.3.0

type AdaptiveLimit struct {
	// contains filtered or unexported fields
}

AdaptiveLimit is an implementation of the AdaptiveLimiter interface. It uses an atomic Int32 to represent the current limit value, ensuring thread-safe updates.

func (*AdaptiveLimit) Current added in v16.3.0

func (l *AdaptiveLimit) Current() int

Current returns the current limit. This function can be called without the need for synchronization.

func (*AdaptiveLimit) Name added in v16.3.0

func (l *AdaptiveLimit) Name() string

Name returns the name of the adaptive limit.

func (*AdaptiveLimit) Setting added in v16.3.0

func (l *AdaptiveLimit) Setting() AdaptiveSetting

Setting returns the configuration parameters for an adaptive limiter.

func (*AdaptiveLimit) Update added in v16.3.0

func (l *AdaptiveLimit) Update(val int)

Update adjusts the current limit value.

type AdaptiveLimiter added in v16.3.0

type AdaptiveLimiter interface {
	Name() string
	Current() int
	Update(val int)
	Setting() AdaptiveSetting
}

AdaptiveLimiter is an interface for managing and updating adaptive limits. It exposes methods to get the name, current limit value, update the limit value, and access its settings.
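
As a small illustration of consuming the interface (the floor of one slot is an assumption for the example, not package behaviour):

// targetCapacity is an illustrative consumer of the interface: it reads the
// current adaptive value and never reports fewer than one slot.
func targetCapacity(l AdaptiveLimiter) int {
	if limit := l.Current(); limit > 1 {
		return limit
	}
	return 1
}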

type AdaptiveSetting added in v16.3.0

type AdaptiveSetting struct {
	Initial        int
	Max            int
	Min            int
	BackoffFactor  float64
}

AdaptiveSetting is a struct that holds the configuration parameters for an adaptive limiter.
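
For illustration only, a setting that starts at 20 concurrent requests, may grow to 40, never drops below 5, and halves on backoff could be declared as follows (written as if inside this package):

// Example values only: start at 20 concurrent requests, grow to at most 40,
// never drop below 5, and halve the limit on a backoff event.
var exampleSetting = AdaptiveSetting{
	Initial:       20,
	Max:           40,
	Min:           5,
	BackoffFactor: 0.5,
}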

type BackoffEvent added in v16.3.0

type BackoffEvent struct {
	WatcherName   string
	ShouldBackoff bool
	Reason        string
}

BackoffEvent is a signal that the current system is under pressure. It is returned at calibration points by the watchers managed by the AdaptiveCalculator.

type ConcurrencyLimiter

type ConcurrencyLimiter struct {
	// contains filtered or unexported fields
}

ConcurrencyLimiter contains the state of a concurrency limiter.

func NewConcurrencyLimiter

func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter

NewConcurrencyLimiter creates a new concurrency limiter.

func (*ConcurrencyLimiter) Limit

func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f LimitedFunc) (interface{}, error)

Limit will limit the concurrency of the limited function f. There are two distinct mechanisms that limit execution of the function (see the usage sketch after this list):

  1. First, every call will enter the per-key queue. This queue limits how many callers may try to acquire their per-key semaphore at the same time. If the queue is full, the caller is rejected.
  2. Second, when the caller has successfully entered the queue, they try to acquire their per-key semaphore. If this takes longer than the maximum queueing limit, the caller is dequeued and receives an error.
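
A hedged usage sketch, written as if inside this package; the limits, the key, and the assumption that a nil QueueTickerCreator forgoes the queue-wait timeout are illustrative:

package limiter

import "context"

// newExampleLimiter allows at most 10 concurrent calls per key and at most
// 100 queued callers per key. Passing a nil QueueTickerCreator to forgo the
// queue-wait timeout is an assumption for this sketch.
func newExampleLimiter() *ConcurrencyLimiter {
	return NewConcurrencyLimiter(10, 100, nil, NewNoopConcurrencyMonitor())
}

// limitedCall runs the expensive work under the limiter, keyed per repository.
func limitedCall(ctx context.Context, lim *ConcurrencyLimiter, repoKey string) (interface{}, error) {
	return lim.Limit(ctx, repoKey, func() (interface{}, error) {
		// The per-repository work goes here.
		return "done", nil
	})
}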

type ConcurrencyMonitor

type ConcurrencyMonitor interface {
	Queued(ctx context.Context, key string, length int)
	Dequeued(ctx context.Context)
	Enter(ctx context.Context, acquireTime time.Duration)
	Exit(ctx context.Context)
	Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, message string)
}

ConcurrencyMonitor allows the behavior of the concurrency limiter to be observed.
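
As a sketch, a monitor that merely logs each callback could look like this (not part of the package):

package limiter

import (
	"context"
	"log"
	"time"
)

// logMonitor is an illustrative ConcurrencyMonitor that only logs events.
type logMonitor struct{}

var _ ConcurrencyMonitor = logMonitor{}

func (logMonitor) Queued(ctx context.Context, key string, length int) {
	log.Printf("queued key=%s queue_length=%d", key, length)
}

func (logMonitor) Dequeued(ctx context.Context) { log.Print("dequeued") }

func (logMonitor) Enter(ctx context.Context, acquireTime time.Duration) {
	log.Printf("entered after waiting %v", acquireTime)
}

func (logMonitor) Exit(ctx context.Context) { log.Print("exited") }

func (logMonitor) Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, message string) {
	log.Printf("dropped key=%s queue_length=%d wait=%v reason=%s", key, length, acquireTime, message)
}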

func NewNoopConcurrencyMonitor

func NewNoopConcurrencyMonitor() ConcurrencyMonitor

NewNoopConcurrencyMonitor returns a ConcurrencyMonitor whose methods do nothing.

type LimitedFunc

type LimitedFunc func() (resp interface{}, err error)

LimitedFunc represents a function that will be limited.

type Limiter

type Limiter interface {
	Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error)
}

Limiter limits incoming requests.

type PromMonitor

type PromMonitor struct {
	// contains filtered or unexported fields
}

PromMonitor keeps track of Prometheus metrics for limiters. It conforms to both the ConcurrencyMonitor and prometheus.Collector interfaces.

func NewPackObjectsConcurrencyMonitor

func NewPackObjectsConcurrencyMonitor(latencyBuckets []float64) *PromMonitor

NewPackObjectsConcurrencyMonitor returns a concurrency monitor for use when limiting pack-objects processes.
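
A short usage sketch; prometheus.DefBuckets is used only as a placeholder for real latency buckets:

package limiter

import "github.com/prometheus/client_golang/prometheus"

// registerPackObjectsMonitor builds the pack-objects monitor and registers
// it, since PromMonitor is also a prometheus.Collector.
func registerPackObjectsMonitor(reg prometheus.Registerer) *PromMonitor {
	monitor := NewPackObjectsConcurrencyMonitor(prometheus.DefBuckets)
	reg.MustRegister(monitor)
	return monitor
}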

func NewPerRPCPromMonitor

func NewPerRPCPromMonitor(
	system, fullMethod string,
	queuedMetric, inProgressMetric *prometheus.GaugeVec,
	acquiringSecondsVec *prometheus.HistogramVec,
	requestsDroppedMetric *prometheus.CounterVec,
) *PromMonitor

NewPerRPCPromMonitor creates a new ConcurrencyMonitor that tracks limiter activity in Prometheus.

func (*PromMonitor) Collect

func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric)

Collect collects all the metrics that PromMonitor keeps track of.

func (*PromMonitor) Dequeued

func (p *PromMonitor) Dequeued(ctx context.Context)

Dequeued is called when a request has been dequeued.

func (*PromMonitor) Describe

func (p *PromMonitor) Describe(descs chan<- *prometheus.Desc)

Describe describes all the metrics that PromMonitor keeps track of.

func (*PromMonitor) Dropped

func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, reason string)

Dropped is called when a request is dropped.

func (*PromMonitor) Enter

func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration)

Enter is called when a request begins to be processed.

func (*PromMonitor) Exit

func (p *PromMonitor) Exit(ctx context.Context)

Exit is called when a request has finished processing.

func (*PromMonitor) Queued

func (p *PromMonitor) Queued(ctx context.Context, key string, queueLength int)

Queued is called when a request has been queued.

type QueueTickerCreator

type QueueTickerCreator func() helper.Ticker

QueueTickerCreator is a function that provides a ticker.

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter is an implementation of Limiter that puts a hard limit on the number of requests per second.

func NewRateLimiter

func NewRateLimiter(
	refillInterval time.Duration,
	burst int,
	ticker helper.Ticker,
	requestsDroppedMetric prometheus.Counter,
) *RateLimiter

NewRateLimiter creates a new instance of RateLimiter.

func (*RateLimiter) Limit

func (r *RateLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error)

Limit rejects an incoming request if the maximum number of requests per second has been reached.

func (*RateLimiter) PruneUnusedLimiters

func (r *RateLimiter) PruneUnusedLimiters(ctx context.Context)

PruneUnusedLimiters enters an infinite loop to periodically check if any limiters can be cleaned up. This is meant to be called in a separate goroutine.
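
A hedged usage sketch, written as if inside this package. The helper import path, the one-second refill interval, and the burst of 100 are assumptions; the ticker is taken as a parameter because constructing one is outside this package:

package limiter

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
)

// newExampleRateLimiter allows bursts of up to 100 requests, refilled every
// second, and counts rejected requests in droppedMetric. The interval and
// burst values are examples only.
func newExampleRateLimiter(ticker helper.Ticker, droppedMetric prometheus.Counter) *RateLimiter {
	return NewRateLimiter(time.Second, 100, ticker, droppedMetric)
}

// runRateLimited starts the cleanup loop and then limits a single call.
func runRateLimited(ctx context.Context, r *RateLimiter) (interface{}, error) {
	// PruneUnusedLimiters loops until the context is cancelled, so it runs
	// in its own goroutine.
	go r.PruneUnusedLimiters(ctx)

	return r.Limit(ctx, "example-key", func() (interface{}, error) {
		return "done", nil
	})
}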

type ResourceWatcher added in v16.3.0

type ResourceWatcher interface {
	// Name returns the name of the resource watcher
	Name() string
	// Poll returns a backoff event when a watcher determines something has gone wrong with the resource it is
	// monitoring. If everything is fine, it returns `nil`. Watchers are expected to respect the cancellation of
	// the input context.
	Poll(context.Context) (*BackoffEvent, error)
}

ResourceWatcher is the interface implemented by watchers that monitor system resources.
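
A sketch of a custom watcher; the probe function is hypothetical and stands in for whatever resource check a real watcher would perform:

package limiter

import "context"

// thresholdWatcher is an illustrative ResourceWatcher that reports a backoff
// event whenever its probe says the resource is overloaded.
type thresholdWatcher struct {
	name  string
	probe func(context.Context) (overloaded bool, reason string, err error)
}

var _ ResourceWatcher = &thresholdWatcher{}

func (w *thresholdWatcher) Name() string { return w.name }

func (w *thresholdWatcher) Poll(ctx context.Context) (*BackoffEvent, error) {
	overloaded, reason, err := w.probe(ctx)
	if err != nil {
		// Per the calculator's contract, an error is treated as no backoff.
		return nil, err
	}
	if !overloaded {
		return nil, nil
	}
	return &BackoffEvent{
		WatcherName:   w.name,
		ShouldBackoff: true,
		Reason:        reason,
	}, nil
}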

