Documentation ¶
Index ¶
- Constants
- Variables
- func FieldsProducer(ctx context.Context, _ error) logrus.Fields
- func InitLimitStats(ctx context.Context) context.Context
- func LimitConcurrencyByRepo(ctx context.Context) string
- func StatsStreamInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, ...) error
- func StatsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, ...) (interface{}, error)
- func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware)
- type ConcurrencyLimiter
- type ConcurrencyMonitor
- type GetLockKey
- type LimitStats
- func (s *LimitStats) AddConcurrencyQueueMs(queueMs int64)
- func (s *LimitStats) Fields() logrus.Fields
- func (s *LimitStats) SetConcurrencyDroppedReason(reason string)
- func (s *LimitStats) SetConcurrencyQueueLength(queueLength int)
- func (s *LimitStats) SetLimitingKey(limitingType string, limitingKey string)
- type LimitedFunc
- type Limiter
- type LimiterMiddleware
- type PromMonitor
- func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric)
- func (p *PromMonitor) Dequeued(ctx context.Context)
- func (p *PromMonitor) Describe(descs chan<- *prometheus.Desc)
- func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, ...)
- func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration)
- func (p *PromMonitor) Exit(ctx context.Context)
- func (p *PromMonitor) Queued(ctx context.Context, key string, queueLength int)
- type QueueTickerCreator
- type RateLimiter
- type SetupFunc
Constants ¶
const (
	// TypePerRPC is a concurrency limiter whose key is the full gRPC method name. All
	// requests of the same method share the concurrency limit.
	TypePerRPC = "per-rpc"

	// TypePackObjects is a dedicated concurrency limiter for pack-objects. It uses request
	// information (RemoteIP/Repository/User) as the limiting key.
	TypePackObjects = "pack-objects"
)
Variables ¶
var ErrMaxQueueSize = errors.New("maximum queue size reached")
ErrMaxQueueSize indicates the concurrency queue has reached its maximum size.
var ErrMaxQueueTime = errors.New("maximum time in concurrency queue reached")
ErrMaxQueueTime indicates a request has reached the maximum time allowed to wait in the concurrency queue.
var ErrRateLimit = errors.New("rate limit reached")
ErrRateLimit is returned when the RateLimiter determines that a request has breached the request rate limit.
Functions ¶
func FieldsProducer ¶
func FieldsProducer(ctx context.Context, _ error) logrus.Fields
FieldsProducer extracts stats info from the context and returns it as logging fields.
func InitLimitStats ¶
func InitLimitStats(ctx context.Context) context.Context
InitLimitStats initializes the context with a per-RPC stats struct.
func LimitConcurrencyByRepo ¶
func LimitConcurrencyByRepo(ctx context.Context) string
LimitConcurrencyByRepo implements GetLockKey by using the repository path as the lock key.
func StatsStreamInterceptor ¶
func StatsStreamInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error
StatsStreamInterceptor returns a Stream Interceptor.
func StatsUnaryInterceptor ¶
func StatsUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error)
StatsUnaryInterceptor returns a Unary Interceptor that initializes the context.
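As a rough sketch of how these pieces fit together (the wiring below is illustrative and not part of this package; newServerWithStats and logWithStats are hypothetical helpers, and this package is assumed to be imported as limithandler): the interceptors seed every request's context with a per-RPC LimitStats, and FieldsProducer later pulls the recorded stats back out as logging fields.
import (
	"context"

	"github.com/sirupsen/logrus"
	"google.golang.org/grpc"
	// plus this package, imported here as limithandler
)

// newServerWithStats wires the stats interceptors into a gRPC server so that
// every request's context carries a per-RPC LimitStats value.
func newServerWithStats() *grpc.Server {
	return grpc.NewServer(
		grpc.ChainUnaryInterceptor(limithandler.StatsUnaryInterceptor),
		grpc.ChainStreamInterceptor(limithandler.StatsStreamInterceptor),
	)
}

// logWithStats attaches the stats recorded for the request to a log entry.
func logWithStats(ctx context.Context, logger *logrus.Entry, rpcErr error) {
	logger.WithFields(limithandler.FieldsProducer(ctx, rpcErr)).Info("request finished")
}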
func WithConcurrencyLimiters ¶
func WithConcurrencyLimiters(cfg config.Cfg, middleware *LimiterMiddleware)
WithConcurrencyLimiters sets up the middleware to limit the concurrency of requests based on RPC and repository.
Types ¶
type ConcurrencyLimiter ¶
type ConcurrencyLimiter struct {
// contains filtered or unexported fields
}
ConcurrencyLimiter contains concurrency limiter state.
func NewConcurrencyLimiter ¶
func NewConcurrencyLimiter(maxConcurrencyLimit, maxQueueLength int, maxQueuedTickerCreator QueueTickerCreator, monitor ConcurrencyMonitor) *ConcurrencyLimiter
NewConcurrencyLimiter creates a new concurrency limiter.
func (*ConcurrencyLimiter) Limit ¶
func (c *ConcurrencyLimiter) Limit(ctx context.Context, limitingKey string, f LimitedFunc) (interface{}, error)
Limit will limit the concurrency of the limited function f. There are two distinct mechanisms that limit execution of the function:
- First, every call will enter the per-key queue. This queue limits how many callers may try to acquire their per-key semaphore at the same time. If the queue is full the caller will be rejected.
- Second, when the caller has successfully entered the queue, they try to acquire their per-key semaphore. If this takes longer than the maximum queueing limit then the caller will be dequeued and receive an error.
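A minimal usage sketch, assuming this package is imported as limithandler; ctx, repoPath, and doExpensiveWork are hypothetical, and passing a nil QueueTickerCreator is assumed here to disable the maximum queueing time:
limiter := limithandler.NewConcurrencyLimiter(
	10,  // at most 10 concurrent calls per limiting key
	100, // at most 100 queued callers per key before ErrMaxQueueSize is returned
	nil, // QueueTickerCreator; nil is assumed here to mean "no maximum queueing time"
	limithandler.NewNoopConcurrencyMonitor(),
)

resp, err := limiter.Limit(ctx, repoPath, func() (interface{}, error) {
	// Runs only once the per-key semaphore has been acquired.
	return doExpensiveWork(ctx)
})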
type ConcurrencyMonitor ¶
type ConcurrencyMonitor interface {
	Queued(ctx context.Context, key string, length int)
	Dequeued(ctx context.Context)
	Enter(ctx context.Context, acquireTime time.Duration)
	Exit(ctx context.Context)
	Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, message string)
}
ConcurrencyMonitor allows observing the behaviour of the concurrency limiter.
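As an illustration only, a custom monitor might log each hook. The type below is hypothetical and not part of this package; it uses logrus purely for the example.
type logMonitor struct {
	logger *logrus.Entry
}

func (m logMonitor) Queued(ctx context.Context, key string, length int) {
	m.logger.WithField("limiting_key", key).WithField("queue_length", length).Info("request queued")
}

func (m logMonitor) Dequeued(ctx context.Context) {
	m.logger.Info("request dequeued")
}

func (m logMonitor) Enter(ctx context.Context, acquireTime time.Duration) {
	m.logger.WithField("acquire_time", acquireTime).Info("request entered")
}

func (m logMonitor) Exit(ctx context.Context) {
	m.logger.Info("request finished")
}

func (m logMonitor) Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, message string) {
	m.logger.WithField("limiting_key", key).WithField("reason", message).Warn("request dropped")
}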
func NewNoopConcurrencyMonitor ¶
func NewNoopConcurrencyMonitor() ConcurrencyMonitor
NewNoopConcurrencyMonitor returns a no-op ConcurrencyMonitor.
type GetLockKey ¶
GetLockKey defines the lock key of an RPC invocation based on its context.
type LimitStats ¶
LimitStats contains info about the concurrency limiter.
func (*LimitStats) AddConcurrencyQueueMs ¶
func (s *LimitStats) AddConcurrencyQueueMs(queueMs int64)
AddConcurrencyQueueMs adds queue time.
func (*LimitStats) Fields ¶
func (s *LimitStats) Fields() logrus.Fields
Fields returns logging info.
func (*LimitStats) SetConcurrencyDroppedReason ¶
func (s *LimitStats) SetConcurrencyDroppedReason(reason string)
SetConcurrencyDroppedReason sets the reason why a call has been dropped from the queue.
func (*LimitStats) SetConcurrencyQueueLength ¶
func (s *LimitStats) SetConcurrencyQueueLength(queueLength int)
SetConcurrencyQueueLength sets the concurrency queue length.
func (*LimitStats) SetLimitingKey ¶
func (s *LimitStats) SetLimitingKey(limitingType string, limitingKey string)
SetLimitingKey sets the limiting type and key.
type LimitedFunc ¶
type LimitedFunc func() (resp interface{}, err error)
LimitedFunc represents a function that will be limited.
type Limiter ¶
type Limiter interface {
Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error)
}
Limiter limits incoming requests.
type LimiterMiddleware ¶
type LimiterMiddleware struct {
// contains filtered or unexported fields
}
LimiterMiddleware contains limiter state used by its gRPC interceptors.
func New ¶
func New(cfg config.Cfg, getLockKey GetLockKey, setupMiddleware SetupFunc) *LimiterMiddleware
New creates a new middleware that limits requests. SetupFunc sets up the middleware with a specific kind of limiter.
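A sketch of constructing the middleware and installing its interceptors on a gRPC server; cfg is assumed to be a config.Cfg loaded elsewhere, and the server wiring shown here is illustrative:
mw := limithandler.New(cfg, limithandler.LimitConcurrencyByRepo, limithandler.WithConcurrencyLimiters)

// LimiterMiddleware implements Collect and Describe, so it can be registered as a Prometheus collector.
prometheus.MustRegister(mw)

srv := grpc.NewServer(
	grpc.ChainUnaryInterceptor(mw.UnaryInterceptor()),
	grpc.ChainStreamInterceptor(mw.StreamInterceptor()),
)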
func (*LimiterMiddleware) Collect ¶
func (c *LimiterMiddleware) Collect(metrics chan<- prometheus.Metric)
Collect is used to collect Prometheus metrics.
func (*LimiterMiddleware) Describe ¶
func (c *LimiterMiddleware) Describe(descs chan<- *prometheus.Desc)
Describe is used to describe Prometheus metrics.
func (*LimiterMiddleware) StreamInterceptor ¶
func (c *LimiterMiddleware) StreamInterceptor() grpc.StreamServerInterceptor
StreamInterceptor returns a Stream Interceptor.
func (*LimiterMiddleware) UnaryInterceptor ¶
func (c *LimiterMiddleware) UnaryInterceptor() grpc.UnaryServerInterceptor
UnaryInterceptor returns a Unary Interceptor.
type PromMonitor ¶
type PromMonitor struct {
// contains filtered or unexported fields
}
PromMonitor keeps track of Prometheus metrics for limithandlers. It conforms to both the ConcurrencyMonitor and prometheus.Collector interfaces.
func NewPackObjectsConcurrencyMonitor ¶
func NewPackObjectsConcurrencyMonitor(latencyBuckets []float64) *PromMonitor
NewPackObjectsConcurrencyMonitor returns a concurrency monitor for use with limiting pack objects processes.
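For instance, a pack-objects monitor could be registered and handed to a concurrency limiter roughly as follows (the bucket choice and limits are illustrative, and the nil QueueTickerCreator is assumed to disable the maximum queueing time):
monitor := limithandler.NewPackObjectsConcurrencyMonitor(prometheus.DefBuckets)
prometheus.MustRegister(monitor)

// The monitor then observes every queue/enter/exit/drop event of this limiter.
limiter := limithandler.NewConcurrencyLimiter(10, 100, nil, monitor)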
func (*PromMonitor) Collect ¶
func (p *PromMonitor) Collect(metrics chan<- prometheus.Metric)
Collect collects all the metrics that PromMonitor keeps track of.
func (*PromMonitor) Dequeued ¶
func (p *PromMonitor) Dequeued(ctx context.Context)
Dequeued is called when a request has been dequeued.
func (*PromMonitor) Describe ¶
func (p *PromMonitor) Describe(descs chan<- *prometheus.Desc)
Describe describes all the metrics that PromMonitor keeps track of.
func (*PromMonitor) Dropped ¶
func (p *PromMonitor) Dropped(ctx context.Context, key string, length int, acquireTime time.Duration, reason string)
Dropped is called when a request is dropped.
func (*PromMonitor) Enter ¶
func (p *PromMonitor) Enter(ctx context.Context, acquireTime time.Duration)
Enter is called when a request begins to be processed.
func (*PromMonitor) Exit ¶
func (p *PromMonitor) Exit(ctx context.Context)
Exit is called when a request has finished processing.
type QueueTickerCreator ¶
QueueTickerCreator is a function that provides a ticker.
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter is an implementation of Limiter that puts a hard limit on the number of requests per second.
func NewRateLimiter ¶
func NewRateLimiter(
	refillInterval time.Duration,
	burst int,
	ticker helper.Ticker,
	requestsDroppedMetric prometheus.Counter,
) *RateLimiter
NewRateLimiter creates a new instance of RateLimiter.
func (*RateLimiter) Limit ¶
func (r *RateLimiter) Limit(ctx context.Context, lockKey string, f LimitedFunc) (interface{}, error)
Limit rejects an incoming request if the maximum number of requests per second has been reached.
func (*RateLimiter) PruneUnusedLimiters ¶
func (r *RateLimiter) PruneUnusedLimiters(ctx context.Context)
PruneUnusedLimiters enters an infinite loop to periodically check if any limiters can be cleaned up. This is meant to be called in a separate goroutine.
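A rough sketch of using RateLimiter inside a unary interceptor; ctx, info, req, and handler are the interceptor's arguments, ticker is assumed to be an existing helper.Ticker implementation provided by the caller, and the metric name and limits are illustrative:
droppedRequests := prometheus.NewCounter(prometheus.CounterOpts{
	Name: "requests_dropped_total", // illustrative metric name
	Help: "Illustrative counter for requests dropped by the rate limiter.",
})

limiter := limithandler.NewRateLimiter(
	time.Second, // refill interval
	100,         // burst
	ticker,      // helper.Ticker, assumed to be provided by the caller
	droppedRequests,
)

// Periodically clean up per-key limiters that are no longer in use.
go limiter.PruneUnusedLimiters(ctx)

resp, err := limiter.Limit(ctx, info.FullMethod, func() (interface{}, error) {
	return handler(ctx, req)
})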
type SetupFunc ¶
type SetupFunc func(cfg config.Cfg, middleware *LimiterMiddleware)
SetupFunc sets up a middleware for limiting requests.
func WithRateLimiters ¶
WithRateLimiters sets up a middleware with limiters that limit requests based on the number of requests per second per RPC.