Documentation ¶
Index ¶
- Variables
- func FieldsProducer(ctx context.Context, _ error) logrus.Fields
- func InitLimitStats(ctx context.Context) context.Context
- func LimitConcurrencyByRepo(ctx context.Context) string
- func StatsStreamInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, ...) error
- func StatsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, ...) (interface{}, error)
- func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware)
- type ConcurrencyLimiter
- type ConcurrencyMonitor
- type GetLockKey
- type LimitStats
- type LimitedFunc
- type Limiter
- type LimiterMiddleware
- type PromMonitor
- func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric)
- func (p *PromMonitor) Dequeued(ctx context.Context)
- func (p *PromMonitor) Describe(descs chan<- *prometheus.Desc)
- func (p *PromMonitor) Dropped(ctx context.Context, reason string)
- func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration)
- func (p *PromMonitor) Exit(ctx context.Context)
- func (p *PromMonitor) Queued(ctx context.Context)
- type QueueTickerCreator
- type RateLimiter
- type SetupFunc
Constants ¶
This section is empty.
Variables ¶
var ErrMaxQueueSize = errors.New("maximum queue size reached")
ErrMaxQueueSize indicates the concurrency queue has reached its maximum size
var ErrMaxQueueTime = errors.New("maximum time in concurrency queue reached")
ErrMaxQueueTime indicates a request has reached the maximum time allowed to wait in the concurrency queue.
var ErrRateLimit = errors.New("rate limit reached")
ErrRateLimit is returned when RateLimiter determines that a request has exceeded the request rate limit.
Functions ¶
func FieldsProducer ¶ added in v15.9.0
FieldsProducer extracts stats info from the context and returns it as logging fields.
func InitLimitStats ¶ added in v15.9.0
InitLimitStats initializes context with a per-RPC stats struct
func LimitConcurrencyByRepo ¶
LimitConcurrencyByRepo implements GetLockKey by using the repository path as the lock key.
func StatsStreamInterceptor ¶ added in v15.9.0
func StatsStreamInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error
StatsStreamInterceptor is a stream server interceptor.
func StatsUnaryInterceptor ¶ added in v15.9.0
func StatsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error)
StatsUnaryInterceptor is a unary server interceptor that initializes the context.
func WithConcurrencyLimiters ¶
func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware)
WithConcurrencyLimiters sets up middleware to limit the concurrency of requests based on RPC and repository
Types ¶
type ConcurrencyLimiter ¶
type ConcurrencyLimiter struct {
// contains filtered or unexported fields
}
ConcurrencyLimiter contains rate limiter state.
func NewConcurrencyLimiter ¶
func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter
NewConcurrencyLimiter creates a new concurrency limiter.
func (*ConcurrencyLimiter) Limit ¶
func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f LimitedFunc) (interface{}, error)
Limit will limit the concurrency of the limited function f. There are two distinct mechanisms that limit execution of the function:
- First, every call will enter the per-key queue. This queue limits how many callers may try to acquire their per-key semaphore at the same time. If the queue is full the caller will be rejected.
- Second, once the caller has entered the queue, it tries to acquire its per-key semaphore. If this takes longer than the maximum queueing time, the caller is dequeued and receives an error.
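The two mechanisms described above can be sketched with buffered channels. This is a minimal illustration under stated assumptions, not the package's implementation: `sketchLimiter`, `keyState`, and the lowercase error variables are hypothetical stand-ins, the sketch assumes a queue length of at least 1, and it never cleans up per-key state.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// Local stand-ins for ErrMaxQueueSize and ErrMaxQueueTime.
var (
	errMaxQueueSize = errors.New("maximum queue size reached")
	errMaxQueueTime = errors.New("maximum time in concurrency queue reached")
)

// keyState is hypothetical per-key state: queue slots plus a semaphore.
type keyState struct{ queue, sem chan struct{} }

// sketchLimiter is an illustration, not the package's ConcurrencyLimiter.
// It assumes maxQueue >= 1 and never prunes per-key state.
type sketchLimiter struct {
	mu                sync.Mutex
	keys              map[string]*keyState
	maxConc, maxQueue int
	maxWait           time.Duration
}

func (l *sketchLimiter) state(key string) *keyState {
	l.mu.Lock()
	defer l.mu.Unlock()
	if _, ok := l.keys[key]; !ok {
		l.keys[key] = &keyState{
			queue: make(chan struct{}, l.maxQueue),
			sem:   make(chan struct{}, l.maxConc),
		}
	}
	return l.keys[key]
}

func (l *sketchLimiter) Limit(ctx context.Context, key string, f func() (interface{}, error)) (interface{}, error) {
	s := l.state(key)
	// Mechanism 1: take a queue slot or be rejected immediately.
	select {
	case s.queue <- struct{}{}:
	default:
		return nil, errMaxQueueSize
	}
	// Mechanism 2: wait for the semaphore, but no longer than maxWait.
	timer := time.NewTimer(l.maxWait)
	defer timer.Stop()
	select {
	case s.sem <- struct{}{}:
		<-s.queue // semaphore acquired: leave the queue
	case <-timer.C:
		<-s.queue
		return nil, errMaxQueueTime
	case <-ctx.Done():
		<-s.queue
		return nil, ctx.Err()
	}
	defer func() { <-s.sem }()
	return f()
}

// demo returns the errors seen by a caller that finds the queue full and by
// a caller that waits too long for the semaphore.
func demo() (queueFull, timedOut error) {
	l := &sketchLimiter{keys: map[string]*keyState{}, maxConc: 1, maxQueue: 1, maxWait: 200 * time.Millisecond}
	ctx := context.Background()
	noop := func() (interface{}, error) { return nil, nil }
	started, release := make(chan struct{}), make(chan struct{})
	go func() { // caller A holds the only semaphore slot for "repo-a"
		_, _ = l.Limit(ctx, "repo-a", func() (interface{}, error) {
			close(started)
			<-release
			return nil, nil
		})
	}()
	<-started
	waited := make(chan error, 1)
	go func() { // caller B takes the only queue slot and waits
		_, err := l.Limit(ctx, "repo-a", noop)
		waited <- err
	}()
	for len(l.state("repo-a").queue) == 0 { // wait until B is queued
		time.Sleep(time.Millisecond)
	}
	_, queueFull = l.Limit(ctx, "repo-a", noop) // caller C: queue is full
	timedOut = <-waited
	close(release)
	return queueFull, timedOut
}

func main() {
	fmt.Println(demo())
}
```

In this sketch, caller C is rejected at once because the single queue slot is taken, while caller B is dequeued after waiting 200 ms for a semaphore that caller A never releases in time. Calls with a different key would use separate channels and would not be affected.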
type ConcurrencyMonitor ¶
type ConcurrencyMonitor interface {
	Queued(ctx context.Context)
	Dequeued(ctx context.Context)
	Enter(ctx context.Context, acquireTime time.Duration)
	Exit(ctx context.Context)
	Dropped(ctx context.Context, message string)
}
ConcurrencyMonitor allows the concurrency limiter to be observed.
func NewNoopConcurrencyMonitor ¶ added in v15.6.0
func NewNoopConcurrencyMonitor() ConcurrencyMonitor
NewNoopConcurrencyMonitor returns a noopConcurrencyMonitor
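As an illustration of how the ConcurrencyMonitor hooks fit together, the following sketch implements a local copy of the interface with an in-memory counter. `countingMonitor` and the `"max_time"` reason string are hypothetical, and the order in which the hooks are called in `demo` is an assumption about how a limiter might drive a monitor, not a statement about the package's behavior.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// concurrencyMonitor mirrors the ConcurrencyMonitor interface listed above.
type concurrencyMonitor interface {
	Queued(ctx context.Context)
	Dequeued(ctx context.Context)
	Enter(ctx context.Context, acquireTime time.Duration)
	Exit(ctx context.Context)
	Dropped(ctx context.Context, message string)
}

// countingMonitor is a hypothetical in-memory monitor, e.g. for tests.
type countingMonitor struct {
	mu       sync.Mutex
	queued   int            // requests currently waiting
	inFlight int            // requests currently being processed
	dropped  map[string]int // dropped requests, keyed by reason
}

// update runs f while holding the mutex.
func (m *countingMonitor) update(f func()) {
	m.mu.Lock()
	defer m.mu.Unlock()
	f()
}

func (m *countingMonitor) Queued(context.Context)   { m.update(func() { m.queued++ }) }
func (m *countingMonitor) Dequeued(context.Context) { m.update(func() { m.queued-- }) }
func (m *countingMonitor) Exit(context.Context)     { m.update(func() { m.inFlight-- }) }

func (m *countingMonitor) Enter(context.Context, time.Duration) {
	m.update(func() { m.inFlight++ })
}

func (m *countingMonitor) Dropped(_ context.Context, reason string) {
	m.update(func() { m.dropped[reason]++ })
}

// Compile-time check that countingMonitor satisfies the interface.
var _ concurrencyMonitor = (*countingMonitor)(nil)

// demo simulates two requests (assumed call order) and reports the counters.
func demo() string {
	m := &countingMonitor{dropped: map[string]int{}}
	ctx := context.Background()

	// Request 1: queued, dequeued, processed, finished.
	m.Queued(ctx)
	m.Dequeued(ctx)
	m.Enter(ctx, 5*time.Millisecond)
	m.Exit(ctx)

	// Request 2: queued, then dequeued and dropped for a hypothetical reason.
	m.Queued(ctx)
	m.Dequeued(ctx)
	m.Dropped(ctx, "max_time")

	return fmt.Sprintf("queued=%d inFlight=%d dropped=%v", m.queued, m.inFlight, m.dropped)
}

func main() {
	fmt.Println(demo()) // prints "queued=0 inFlight=0 dropped=map[max_time:1]"
}
```

A monitor like this can be handy in unit tests, where inspecting plain counters is simpler than scraping Prometheus metrics.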
type GetLockKey ¶
GetLockKey function defines the lock key of an RPC invocation based on its context
type LimitStats ¶ added in v15.9.0
type LimitStats struct {
// contains filtered or unexported fields
}
LimitStats contains info about the concurrency limiter.
func (*LimitStats) AddConcurrencyQueueMs ¶ added in v15.9.0
func (s *LimitStats) AddConcurrencyQueueMs(queueMs int64)
AddConcurrencyQueueMs adds queue time.
func (*LimitStats) Fields ¶ added in v15.9.0
func (s *LimitStats) Fields() logrus.Fields
Fields returns logging info.
type LimitedFunc ¶
type LimitedFunc func() (resp interface{}, err error)
LimitedFunc represents a function that will be limited
type Limiter ¶
type Limiter interface {
Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error)
}
Limiter limits incoming requests
type LimiterMiddleware ¶
type LimiterMiddleware struct {
// contains filtered or unexported fields
}
LimiterMiddleware contains rate limiter state
func New ¶
func New(cfg config.Cfg, getLockKey GetLockKey, setupMiddleware SetupFunc) *LimiterMiddleware
New creates a new middleware that limits requests. SetupFunc sets up the middleware with a specific kind of limiter.
func (*LimiterMiddleware) Collect ¶
func (c *LimiterMiddleware) Collect(metrics chan<- prometheus.Metric)
Collect is used to collect Prometheus metrics.
func (*LimiterMiddleware) Describe ¶
func (c *LimiterMiddleware) Describe(descs chan<- *prometheus.Desc)
Describe is used to describe Prometheus metrics.
func (*LimiterMiddleware) StreamInterceptor ¶
func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor
StreamInterceptor returns a Stream Interceptor
func (*LimiterMiddleware) UnaryInterceptor ¶
func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor
UnaryInterceptor returns a Unary Interceptor
type PromMonitor ¶ added in v15.6.0
type PromMonitor struct {
// contains filtered or unexported fields
}
PromMonitor keeps track of Prometheus metrics for limithandlers. It conforms to both the ConcurrencyMonitor and prometheus.Collector interfaces.
func NewPackObjectsConcurrencyMonitor ¶ added in v15.6.0
func NewPackObjectsConcurrencyMonitor(keyType string, latencyBuckets []float64) *PromMonitor
NewPackObjectsConcurrencyMonitor returns a concurrency monitor for use with limiting pack objects processes
func (*PromMonitor) Collect ¶ added in v15.6.0
func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric)
Collect collects all the metrics that PromMonitor keeps track of
func (*PromMonitor) Dequeued ¶ added in v15.6.0
func (p *PromMonitor) Dequeued(ctx context.Context)
Dequeued is called when a request has been dequeued
func (*PromMonitor) Describe ¶ added in v15.6.0
func (p *PromMonitor) Describe(descs chan<- *prometheus.Desc)
Describe describes all the metrics that PromMonitor keeps track of
func (*PromMonitor) Dropped ¶ added in v15.6.0
func (p *PromMonitor) Dropped(ctx context.Context, reason string)
Dropped is called when a request is dropped.
func (*PromMonitor) Enter ¶ added in v15.6.0
func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration)
Enter is called when a request begins to be processed
func (*PromMonitor) Exit ¶ added in v15.6.0
func (p *PromMonitor) Exit(ctx context.Context)
Exit is called when a request has finished processing
func (*PromMonitor) Queued ¶ added in v15.6.0
func (p *PromMonitor) Queued(ctx context.Context)
Queued is called when a request has been queued
type QueueTickerCreator ¶
QueueTickerCreator is a function that provides a ticker
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter is an implementation of Limiter that puts a hard limit on the number of requests per second
func NewRateLimiter ¶
func NewRateLimiter(
	refillInterval time.Duration,
	burst int,
	ticker helper.Ticker,
	requestsDroppedMetric prometheus.Counter,
) *RateLimiter
NewRateLimiter creates a new instance of RateLimiter
func (*RateLimiter) Limit ¶
func (r *RateLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error)
Limit rejects an incoming request if the maximum number of requests per second has been reached
func (*RateLimiter) PruneUnusedLimiters ¶
func (r *RateLimiter) PruneUnusedLimiters(ctx context.Context)
PruneUnusedLimiters enters an infinite loop to periodically check if any limiters can be cleaned up. This is meant to be called in a separate goroutine.
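The interplay of a refill interval, a burst size, per-key limiters, and pruning can be sketched as a small token bucket. This is a simplified illustration, not the package's RateLimiter: `sketchRateLimiter`, `bucket`, and `prune` are hypothetical names, the current time is passed in explicitly to keep the example deterministic, and `prune` performs a single pass where PruneUnusedLimiters is documented to loop.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Local stand-in for ErrRateLimit.
var errRateLimit = errors.New("rate limit reached")

// bucket is the hypothetical per-key token bucket.
type bucket struct {
	tokens   int
	lastFill time.Time
	lastUsed time.Time
}

// sketchRateLimiter is an illustration, not the package's RateLimiter.
type sketchRateLimiter struct {
	mu             sync.Mutex
	buckets        map[string]*bucket
	refillInterval time.Duration // one token is added per interval
	burst          int           // bucket capacity
}

// Limit runs f if the bucket for key holds a token at time now.
func (r *sketchRateLimiter) Limit(key string, now time.Time, f func() (interface{}, error)) (interface{}, error) {
	r.mu.Lock()
	b, ok := r.buckets[key]
	if !ok {
		b = &bucket{tokens: r.burst, lastFill: now}
		r.buckets[key] = b
	}
	// Add one token per elapsed refill interval, capped at burst.
	if n := int(now.Sub(b.lastFill) / r.refillInterval); n > 0 {
		b.tokens += n
		if b.tokens > r.burst {
			b.tokens = r.burst
		}
		b.lastFill = b.lastFill.Add(time.Duration(n) * r.refillInterval)
	}
	b.lastUsed = now
	allowed := b.tokens > 0
	if allowed {
		b.tokens--
	}
	r.mu.Unlock()
	if !allowed {
		return nil, errRateLimit
	}
	return f()
}

// prune removes buckets idle for longer than maxIdle and returns the count.
func (r *sketchRateLimiter) prune(now time.Time, maxIdle time.Duration) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	removed := 0
	for key, b := range r.buckets {
		if now.Sub(b.lastUsed) > maxIdle {
			delete(r.buckets, key)
			removed++
		}
	}
	return removed
}

// demo sends five requests for one key and then prunes idle buckets.
func demo() string {
	r := &sketchRateLimiter{buckets: map[string]*bucket{}, refillInterval: time.Second, burst: 2}
	t0 := time.Unix(0, 0)
	noop := func() (interface{}, error) { return nil, nil }
	var accepted []bool
	for _, offset := range []time.Duration{0, 0, 0, time.Second, time.Second} {
		_, err := r.Limit("/some.Service/SomeRPC", t0.Add(offset), noop)
		accepted = append(accepted, err == nil)
	}
	pruned := r.prune(t0.Add(time.Hour), 10*time.Minute)
	return fmt.Sprintf("%v pruned=%d", accepted, pruned)
}

func main() {
	fmt.Println(demo()) // prints "[true true false true false] pruned=1"
}
```

With a burst of 2 and a one-second refill interval, the first two requests pass, the third is rejected, and one more request passes after a second has elapsed. Pruning matters because every distinct key allocates state that would otherwise never be released.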
type SetupFunc ¶
type SetupFunc func(cfg config.Cfg, middleware *LimiterMiddleware)
SetupFunc sets up a middleware to limit requests
func WithRateLimiters ¶
WithRateLimiters sets up a middleware with limiters that limit requests based on their rate per second per RPC