limithandler

package
v16.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 8, 2023 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// TypePerRPC is a concurrency limiter whose key is the full method of gRPC server. All
	// requests of the same method shares the concurrency limit.
	TypePerRPC = "per-rpc"
	// TypePackObjects is a dedicated concurrency limiter for pack-objects. It uses request
	// information (RemoteIP/Repository/User) as the limiting key.
	TypePackObjects = "pack-objects"
)

Variables

View Source
var ErrMaxQueueSize = errors.New("maximum queue size reached")

ErrMaxQueueSize indicates the concurrency queue has reached its maximum size

View Source
var ErrMaxQueueTime = errors.New("maximum time in concurrency queue reached")

ErrMaxQueueTime indicates a request has reached the maximum time allowed to wait in the concurrency queue.

View Source
var ErrRateLimit = errors.New("rate limit reached")

ErrRateLimit is returned when RateLimiter determined a request has breached the rate request limit.

Functions

func FieldsProducer

func FieldsProducer(ctx context.Context, _ error) logrus.Fields

FieldsProducer extracts stats info from the context and returns it as a logging fields.

func InitLimitStats

func InitLimitStats(ctx context.Context) context.Context

InitLimitStats initializes context with a per-RPC stats struct.

func LimitConcurrencyByRepo

func LimitConcurrencyByRepo(ctx context.Context) string

LimitConcurrencyByRepo implements GetLockKey by using the repository path as lock.

func StatsStreamInterceptor

func StatsStreamInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error

StatsStreamInterceptor returns a Stream Interceptor.

func StatsUnaryInterceptor

func StatsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error)

StatsUnaryInterceptor returns a Unary Interceptor that initializes the context.

func WithConcurrencyLimiters

func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware)

WithConcurrencyLimiters sets up middleware to limit the concurrency of requests based on RPC and repository

Types

type ConcurrencyLimiter

type ConcurrencyLimiter struct {
	// contains filtered or unexported fields
}

ConcurrencyLimiter contains rate limiter state.

func NewConcurrencyLimiter

func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter

NewConcurrencyLimiter creates a new concurrency rate limiter.

func (*ConcurrencyLimiter) Limit

func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f LimitedFunc) (interface{}, error)

Limit will limit the concurrency of the limited function f. There are two distinct mechanisms that limit execution of the function:

  1. First, every call will enter the per-key queue. This queue limits how many callers may try to acquire their per-key semaphore at the same time. If the queue is full the caller will be rejected.
  2. Second, when the caller has successfully entered the queue, they try to acquire their per-key semaphore. If this takes longer than the maximum queueing limit then the caller will be dequeued and gets an error.

type ConcurrencyMonitor

type ConcurrencyMonitor interface {
	Queued(ctx context.Context, key string, length int)
	Dequeued(ctx context.Context)
	Enter(ctx context.Context, acquireTime time.Duration)
	Exit(ctx context.Context)
	Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, message string)
}

ConcurrencyMonitor allows the concurrency monitor to be observed.

func NewNoopConcurrencyMonitor

func NewNoopConcurrencyMonitor() ConcurrencyMonitor

NewNoopConcurrencyMonitor returns a noopConcurrencyMonitor

type GetLockKey

type GetLockKey func(context.Context) string

GetLockKey function defines the lock key of an RPC invocation based on its context

type LimitStats

type LimitStats struct {
	sync.Mutex
	// contains filtered or unexported fields
}

LimitStats contains info about the concurrency limiter.

func (*LimitStats) AddConcurrencyQueueMs

func (s *LimitStats) AddConcurrencyQueueMs(queueMs int64)

AddConcurrencyQueueMs adds queue time.

func (*LimitStats) Fields

func (s *LimitStats) Fields() logrus.Fields

Fields returns logging info.

func (*LimitStats) SetConcurrencyDroppedReason

func (s *LimitStats) SetConcurrencyDroppedReason(reason string)

SetConcurrencyDroppedReason sets the reason why a call has been dropped from the queue.

func (*LimitStats) SetConcurrencyQueueLength

func (s *LimitStats) SetConcurrencyQueueLength(queueLength int)

SetConcurrencyQueueLength set concurrency queue length.

func (*LimitStats) SetLimitingKey

func (s *LimitStats) SetLimitingKey(limitingType string, limitingKey string)

SetLimitingKey set limiting key.

type LimitedFunc

type LimitedFunc func() (resp interface{}, err error)

LimitedFunc represents a function that will be limited

type Limiter

type Limiter interface {
	Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error)
}

Limiter limits incoming requests

type LimiterMiddleware

type LimiterMiddleware struct {
	// contains filtered or unexported fields
}

LimiterMiddleware contains rate limiter state

func New

func New(cfg config.Cfg, getLockKey GetLockKey, setupMiddleware SetupFunc) *LimiterMiddleware

New creates a new middleware that limits requests. SetupFunc sets up the middlware with a specific kind of limiter.

func (*LimiterMiddleware) Collect

func (c *LimiterMiddleware) Collect(metrics chan<- prometheus.Metric)

Collect is used to collect Prometheus metrics.

func (*LimiterMiddleware) Describe

func (c *LimiterMiddleware) Describe(descs chan<- *prometheus.Desc)

Describe is used to describe Prometheus metrics.

func (*LimiterMiddleware) StreamInterceptor

func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor

StreamInterceptor returns a Stream Interceptor

func (*LimiterMiddleware) UnaryInterceptor

func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor

UnaryInterceptor returns a Unary Interceptor

type PromMonitor

type PromMonitor struct {
	// contains filtered or unexported fields
}

PromMonitor keeps track of prometheus metrics for limithandlers. It conforms to both the ConcurrencyMonitor, and prometheus.Collector interfaces.

func NewPackObjectsConcurrencyMonitor

func NewPackObjectsConcurrencyMonitor(latencyBuckets []float64) *PromMonitor

NewPackObjectsConcurrencyMonitor returns a concurrency monitor for use with limiting pack objects processes.

func (*PromMonitor) Collect

func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric)

Collect collects all the metrics that PromMonitor keeps track of.

func (*PromMonitor) Dequeued

func (p *PromMonitor) Dequeued(ctx context.Context)

Dequeued is called when a request has been dequeued.

func (*PromMonitor) Describe

func (p *PromMonitor) Describe(descs chan<- *prometheus.Desc)

Describe describes all the metrics that PromMonitor keeps track of.

func (*PromMonitor) Dropped

func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, reason string)

Dropped is called when a request is dropped.

func (*PromMonitor) Enter

func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration)

Enter is called when a request begins to be processed.

func (*PromMonitor) Exit

func (p *PromMonitor) Exit(ctx context.Context)

Exit is called when a request has finished processing.

func (*PromMonitor) Queued

func (p *PromMonitor) Queued(ctx context.Context, key string, queueLength int)

Queued is called when a request has been queued.

type QueueTickerCreator

type QueueTickerCreator func() helper.Ticker

QueueTickerCreator is a function that provides a ticker

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter is an implementation of Limiter that puts a hard limit on the number of requests per second

func NewRateLimiter

func NewRateLimiter(
	refillInterval time.Duration,
	burst int,
	ticker helper.Ticker,
	requestsDroppedMetric prometheus.Counter,
) *RateLimiter

NewRateLimiter creates a new instance of RateLimiter

func (*RateLimiter) Limit

func (r *RateLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error)

Limit rejects an incoming reequest if the maximum number of requests per second has been reached

func (*RateLimiter) PruneUnusedLimiters

func (r *RateLimiter) PruneUnusedLimiters(ctx context.Context)

PruneUnusedLimiters enters an infinite loop to periodically check if any limiters can be cleaned up. This is meant to be called in a separate goroutine.

type SetupFunc

type SetupFunc func(cfg config.Cfg, middleware *LimiterMiddleware)

SetupFunc set up a middleware to limiting requests

func WithRateLimiters

func WithRateLimiters(ctx context.Context) SetupFunc

WithRateLimiters sets up a middleware with limiters that limit requests based on its rate per second per RPC

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL