Documentation ¶
Overview ¶
package limiter provides primatives for limiting the number of concurrent operations in-flight.
Index ¶
Constants ¶
const Unlimited uint32 = 0
Unlimited can be used to allow an unlimited number of concurrent sessions.
Variables ¶
var ErrCapacityReached = errors.New("active session limit reached")
ErrCapacityReached is returned when there is no capacity for additional sessions.
Functions ¶
This section is empty.
Types ¶
type Session ¶
type Session interface { // End the session. // // This MUST be called when the session-holder is done (e.g. the gRPC stream // is closed). End() // Terminated is a channel that is closed when the session is terminated. // // The session-holder MUST receive on it and exit (e.g. close the gRPC stream) // when it is closed. Terminated() SessionTerminatedChan }
Session allows its holder to perform an operation (e.g. serve a gRPC stream) concurrenly with other session-holders. Sessions may be terminated abruptly by the SessionLimiter, so it is the responsibility of the holder to receive on the Terminated channel and halt the operation when it is closed.
type SessionLimiter ¶
type SessionLimiter struct {
// contains filtered or unexported fields
}
SessionLimiter is a session-based concurrency limiter, it provides the basis of gRPC/xDS load balancing.
Stream handlers obtain a session with BeginSession before they begin serving resources - if the server has reached capacity ErrCapacityReached is returned, otherwise a Session is returned.
It is the session-holder's responsibility to:
- Call End on the session when finished.
- Receive on the session's Terminated channel and exit (e.g. close the gRPC stream) when it is closed.
The maximum number of concurrent sessions is controlled with SetMaxSessions. If there are more than the given maximum sessions already in-flight, SessionLimiter will drain randomly-selected sessions at a rate controlled by SetDrainRateLimit.
func NewSessionLimiter ¶
func NewSessionLimiter() *SessionLimiter
NewSessionLimiter creates a new SessionLimiter.
func (*SessionLimiter) BeginSession ¶
func (l *SessionLimiter) BeginSession() (Session, error)
BeginSession begins a new session, or returns ErrCapacityReached if the concurrent session limit has been reached.
It is the session-holder's responsibility to:
- Call End on the session when finished.
- Receive on the session's Terminated channel and exit (e.g. close the gRPC stream) when it is closed.
func (*SessionLimiter) Run ¶
func (l *SessionLimiter) Run(ctx context.Context)
Run the SessionLimiter's drain loop, which terminates excess sessions if the limit is lowered. It will exit when the given context is canceled or reaches its deadline.
func (*SessionLimiter) SetDrainRateLimit ¶
func (l *SessionLimiter) SetDrainRateLimit(limit rate.Limit)
SetDrainRateLimit controls the rate at which excess sessions will be drained.
func (*SessionLimiter) SetMaxSessions ¶
func (l *SessionLimiter) SetMaxSessions(max uint32)
SetMaxSessions controls the maximum number of concurrent sessions. If it is lower, randomly-selected sessions will be drained.
type SessionTerminatedChan ¶ added in v1.14.4
type SessionTerminatedChan <-chan struct{}
SessionTerminatedChan is a channel that will be closed to notify session- holders that a session has been terminated.