Documentation ¶
Overview ¶
Package quotapool provides an abstract implementation of a pool of resources to be distributed among concurrent clients.
The library also offers a concrete implementation of such a quota pool for single-dimension integer quota. This IntPool acts like a weighted semaphore that additionally offers FIFO ordering for serving requests.
Index ¶
- Variables
- func HasErrClosed(err error) bool
- func LogSlowAcquisition(ctx context.Context, poolName string, r Request, start time.Time) func()
- type AbstractPool
- type AcquisitionFunc
- type ErrClosed
- type IntAlloc
- type IntPool
- func (p *IntPool) Acquire(ctx context.Context, v uint64) (*IntAlloc, error)
- func (p *IntPool) AcquireFunc(ctx context.Context, f IntRequestFunc) (*IntAlloc, error)
- func (p *IntPool) ApproximateQuota() (q uint64)
- func (p *IntPool) Capacity() uint64
- func (p *IntPool) Close(reason string)
- func (p *IntPool) Closer(reason string) IntPoolCloser
- func (p *IntPool) Full() bool
- func (p *IntPool) Len() int
- func (p *IntPool) Release(allocs ...*IntAlloc)
- func (p *IntPool) TryAcquire(ctx context.Context, v uint64) (*IntAlloc, error)
- func (p *IntPool) TryAcquireFunc(ctx context.Context, f IntRequestFunc) (*IntAlloc, error)
- func (p *IntPool) UpdateCapacity(newCapacity uint64)
- type IntPoolCloser
- type IntRequestFunc
- type Limit
- type OnWaitStartFunc
- type Option
- func OnAcquisition(f AcquisitionFunc) Option
- func OnSlowAcquisition(threshold time.Duration, f SlowAcquisitionFunc) Option
- func OnWaitFinish(onFinish AcquisitionFunc) Option
- func OnWaitStart(onStart OnWaitStartFunc) Option
- func WithCloser(closer <-chan struct{}) Option
- func WithMinimumWait(duration time.Duration) Option
- func WithTimeSource(ts timeutil.TimeSource) Option
- type PoolInfo
- type RateAlloc
- type RateLimiter
- type Request
- type Resource
- type SlowAcquisitionFunc
- type TokenBucket
- func (tb *TokenBucket) Adjust(delta Tokens)
- func (tb *TokenBucket) Init(rate TokensPerSecond, burst Tokens, timeSource timeutil.TimeSource)
- func (tb *TokenBucket) TryToFulfill(amount Tokens) (fulfilled bool, tryAgainAfter time.Duration)
- func (tb *TokenBucket) UpdateConfig(rate TokensPerSecond, burst Tokens)
- type Tokens
- type TokensPerSecond
- type UpdateFunc
Constants ¶
This section is empty.
Variables ¶
var ErrNotEnoughQuota = fmt.Errorf("not enough quota available")
ErrNotEnoughQuota is returned by IntRequestFuncs when they want to be called again once there are new resources.
Functions ¶
func HasErrClosed ¶
func HasErrClosed(err error) bool
HasErrClosed returns true if this error is or contains an ErrClosed error.
Types ¶
type AbstractPool ¶
type AbstractPool struct {
// contains filtered or unexported fields
}
AbstractPool is an abstract implementation of a pool that stores some unit of Resource. The basic idea is that it allows requests to acquire a quantity of Resource from the pool in FIFO order in a way that interacts well with context cancelation.
func New ¶
func New(name string, initialResource Resource, options ...Option) *AbstractPool
New returns a new quota pool initialized with a given quota. The quota is capped at this amount, meaning that callers may return more quota than they acquired without ever making more than the quota capacity available.
func (*AbstractPool) Acquire ¶
func (qp *AbstractPool) Acquire(ctx context.Context, r Request) (err error)
Acquire attempts to fulfill the Request with Resource from the qp. Requests are serviced in a FIFO order; only a single request is ever being offered resources at a time. A Request will be offered the pool's current quantity of Resource until it is fulfilled or its context is canceled.
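The FIFO rule above means a small request can wait behind a large one even when enough quota exists for the small one. The sketch below simulates that ordering without any concurrency; request and serveHead are hypothetical names used only for illustration and are not part of the package.

```go
package main

import "fmt"

// request is a hypothetical stand-in for a queued acquisition.
type request struct {
	name string
	want int
}

// serveHead hands quota to requests strictly in queue order. It stops at
// the first request that cannot be fulfilled, even if a later one would
// fit, because only the head of the queue is ever offered resources.
func serveHead(available int, queue []request) (served []string, left int, waiting []request) {
	for len(queue) > 0 && queue[0].want <= available {
		available -= queue[0].want
		served = append(served, queue[0].name)
		queue = queue[1:]
	}
	return served, available, queue
}

func main() {
	queue := []request{{"a", 3}, {"b", 6}, {"c", 1}}
	served, left, waiting := serveHead(5, queue)
	// "c" would fit in the remaining quota but stays queued behind "b".
	fmt.Println(served, left, len(waiting))
}
```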
Safe for concurrent use.
func (*AbstractPool) Close ¶
func (qp *AbstractPool) Close(reason string)
Close signals to all ongoing and subsequent acquisitions that they are free to return to their callers. They will receive an *ErrClosed which contains this reason.
Safe for concurrent use.
func (*AbstractPool) Len ¶
func (qp *AbstractPool) Len() int
Len returns the current length of the queue for this AbstractPool.
func (*AbstractPool) TimeSource ¶
func (qp *AbstractPool) TimeSource() timeutil.TimeSource
TimeSource returns the TimeSource associated with this AbstractPool.
func (*AbstractPool) Update ¶
func (qp *AbstractPool) Update(f UpdateFunc)
Update updates the underlying resource with the provided value, notifying the head of the queue if the Resource indicates that it should.
Safe for concurrent use.
type AcquisitionFunc ¶
AcquisitionFunc is used to configure a quotapool to call a function after an acquisition has occurred.
type ErrClosed ¶
type ErrClosed struct {
// contains filtered or unexported fields
}
ErrClosed is returned from Acquire after Close has been called.
type IntAlloc ¶
type IntAlloc struct {
// contains filtered or unexported fields
}
IntAlloc is an allocated quantity which should be released.
func (*IntAlloc) Freeze ¶
func (ia *IntAlloc) Freeze()
Freeze informs the quota pool that this allocation will never be Release()ed. Releasing it later is illegal and will lead to panics.
Using Freeze and UpdateCapacity on the same pool may require explicit coordination. It is illegal to freeze allocated capacity which is no longer available - specifically it is illegal to make the capacity of an IntPool negative. Imagine the case where the capacity of an IntPool is initially 10. An allocation of 10 is acquired. Then, while it is held, the pool's capacity is updated to be 9. Then the outstanding allocation is frozen. This would make the total capacity of the IntPool negative which is not allowed and will lead to a panic. In general, it's a bad idea to freeze allocated quota from a pool which will ever have its capacity decreased.
AcquireFunc() requests will be woken up with an updated Capacity, and Alloc() requests will be trimmed accordingly.
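The scenario described above comes down to simple arithmetic. The sketch below assumes that freezing permanently subtracts the allocation from the pool's capacity; freeze is a hypothetical helper, and it returns an error where the documentation says the real pool panics.

```go
package main

import "fmt"

// freeze models, under the assumption stated above, how freezing an
// allocation removes it from a pool's capacity. It refuses to let the
// capacity go negative.
func freeze(capacity, alloc uint64) (uint64, error) {
	if alloc > capacity {
		return capacity, fmt.Errorf("cannot freeze %d with capacity %d", alloc, capacity)
	}
	return capacity - alloc, nil
}

func main() {
	// Capacity 10, allocation of 10 frozen before any capacity change.
	c, err := freeze(10, 10)
	fmt.Println(c, err)

	// Capacity lowered to 9 while the allocation of 10 is still held:
	// freezing would make the capacity negative, so it is rejected.
	c, err = freeze(9, 10)
	fmt.Println(c, err)
}
```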
func (*IntAlloc) Merge ¶
Merge adds the acquired resources in other to ia. Other may not be used after it has been merged. It is illegal to merge allocs from different pools and doing so will result in a panic.
type IntPool ¶
type IntPool struct {
// contains filtered or unexported fields
}
IntPool manages allocating integer units of quota to clients. Clients may acquire quota in two ways, using Acquire which requires the client to specify the quantity of quota at call time and AcquireFunc which allows the client to provide a function which will be used to determine whether a quantity of quota is sufficient when it becomes available.
func NewIntPool ¶
NewIntPool creates a new named IntPool.
capacity is the amount of quota initially available. The maximum capacity is math.MaxInt64. If the capacity argument exceeds that value, this function will panic.
func (*IntPool) Acquire ¶
func (p *IntPool) Acquire(ctx context.Context, v uint64) (*IntAlloc, error)
Acquire acquires the specified amount of quota from the pool. On success, a non-nil alloc is returned and Release() must be called on it to return the quota to the pool.
If 'v' is greater than the total capacity of the pool, we instead try to acquire quota equal to the maximum capacity. If the maximum capacity is decreased while this request is ongoing, the request is again truncated to the maximum capacity.
Acquisitions of 0 return immediately with no error, even if the IntPool is closed.
Acquisitions of more than 0 from a pool with 0 capacity always return an ErrNotEnoughQuota.
Safe for concurrent use.
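The truncation rule for oversized requests can be illustrated with a small helper. effectiveRequest is a hypothetical name used only for this sketch; it shows the clamping arithmetic, not how the pool implements it internally.

```go
package main

import "fmt"

// effectiveRequest clamps a requested amount v to the pool's capacity,
// as described for requests that exceed the total capacity.
func effectiveRequest(v, capacity uint64) uint64 {
	if v > capacity {
		return capacity
	}
	return v
}

func main() {
	// A request for 15 from a pool with capacity 10 is truncated.
	fmt.Println(effectiveRequest(15, 10))

	// A request within the capacity is left unchanged.
	fmt.Println(effectiveRequest(4, 10))

	// If the capacity drops to 3 while the request is waiting, the
	// already truncated request is truncated again.
	fmt.Println(effectiveRequest(effectiveRequest(15, 10), 3))
}
```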
func (*IntPool) AcquireFunc ¶
func (p *IntPool) AcquireFunc(ctx context.Context, f IntRequestFunc) (*IntAlloc, error)
AcquireFunc acquires a quantity of quota determined by a function which is called with a quantity of available quota.
Example ¶
An example use case for AcquireFunc is a pool of workers attempting to acquire resources to run a heterogeneous set of jobs. Imagine for example we have a set of workers and a list of jobs which need to be run. The function might be used to choose the largest job which can be run by the existing quantity of quota.
const quota = 7
const workers = 3
qp := NewIntPool("work units", quota)
type job struct {
	name string
	cost uint64
}
jobs := []*job{
	{name: "foo", cost: 3},
	{name: "bar", cost: 2},
	{name: "baz", cost: 4},
	{name: "qux", cost: 6},
	{name: "quux", cost: 3},
	{name: "quuz", cost: 3},
}
// sortJobs sorts the jobs in highest-to-lowest order with nil last.
sortJobs := func() {
	sort.Slice(jobs, func(i, j int) bool {
		ij, jj := jobs[i], jobs[j]
		if ij != nil && jj != nil {
			return ij.cost > jj.cost
		}
		return ij != nil
	})
}
// getJob finds the largest job which can be run with the current quota.
getJob := func(
	ctx context.Context, qp *IntPool,
) (j *job, alloc *IntAlloc, err error) {
	alloc, err = qp.AcquireFunc(ctx, func(
		ctx context.Context, pi PoolInfo,
	) (took uint64, err error) {
		sortJobs()
		// There are no more jobs, take 0 and return.
		if jobs[0] == nil {
			return 0, nil
		}
		// Find the largest jobs which can be run.
		for i := range jobs {
			if jobs[i] == nil {
				break
			}
			if jobs[i].cost <= pi.Available {
				j, jobs[i] = jobs[i], nil
				return j.cost, nil
			}
		}
		return 0, ErrNotEnoughQuota
	})
	return j, alloc, err
}
runWorker := func(workerNum int) func(ctx context.Context) error {
	return func(ctx context.Context) error {
		for {
			j, alloc, err := getJob(ctx, qp)
			if err != nil {
				return err
			}
			if j == nil {
				return nil
			}
			alloc.Release()
		}
	}
}
g := ctxgroup.WithContext(context.Background())
for i := 0; i < workers; i++ {
	g.GoCtx(runWorker(i))
}
if err := g.Wait(); err != nil {
	panic(err)
}
Output:
func (*IntPool) ApproximateQuota ¶
func (p *IntPool) ApproximateQuota() (q uint64)
ApproximateQuota will report approximately the amount of quota available in the pool. It's "approximate" because, if there's an acquisition in progress, this might return an "intermediate" value - one that does not fully reflect the capacity either before that acquisition started or after it will have finished.
func (*IntPool) Close ¶
func (p *IntPool) Close(reason string)
Close signals to all ongoing and subsequent acquisitions that the pool is closed and that an error should be returned.
Safe for concurrent use.
func (*IntPool) Closer ¶
func (p *IntPool) Closer(reason string) IntPoolCloser
Closer returns a struct which implements stop.Closer.
func (*IntPool) Release ¶
func (p *IntPool) Release(allocs ...*IntAlloc)
Release will release allocs back to their pool. Allocs which are from p are merged into a single alloc before being added to avoid synchronizing on p multiple times. Allocs which did not come from p are released one at a time. It is legal to pass nil values in allocs.
func (*IntPool) TryAcquire ¶
func (p *IntPool) TryAcquire(ctx context.Context, v uint64) (*IntAlloc, error)
TryAcquire is like Acquire but if there is insufficient quota to acquire immediately the method will return ErrNotEnoughQuota.
func (*IntPool) TryAcquireFunc ¶
func (p *IntPool) TryAcquireFunc(ctx context.Context, f IntRequestFunc) (*IntAlloc, error)
TryAcquireFunc is like AcquireFunc but if insufficient quota exists the method will return ErrNotEnoughQuota rather than waiting for quota to become available.
func (*IntPool) UpdateCapacity ¶
func (p *IntPool) UpdateCapacity(newCapacity uint64)
UpdateCapacity sets the capacity to newCapacity. If the current capacity is higher than the new capacity, currently running requests will not be affected. When the capacity is increased, new quota will be added. The total quantity of outstanding quota will never exceed the maximum value of the capacity which existed when any outstanding quota was acquired.
type IntPoolCloser ¶
type IntPoolCloser struct {
// contains filtered or unexported fields
}
IntPoolCloser implements stop.Closer.
func (IntPoolCloser) Close ¶
func (ipc IntPoolCloser) Close()
Close makes the IntPoolCloser a stop.Closer.
type IntRequestFunc ¶
IntRequestFunc is used to request a quantity of quota determined when quota is available rather than before requesting.
If the request is satisfied, the function returns the amount of quota consumed and no error. If the request is not satisfied because there's not enough quota currently available, ErrNotEnoughQuota is returned to cause the function to be called again when more quota becomes available. In that case, took has to be 0 (i.e. it is not allowed for the request to save some quota for later use). If any other error is returned, took again has to be 0. The function will not be called any more and the error will be returned from IntPool.AcquireFunc().
type OnWaitStartFunc ¶
OnWaitStartFunc is the prototype for functions called to notify the start or finish of a waiting period when a request is blocked.
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option is used to configure a quotapool.
func OnAcquisition ¶
func OnAcquisition(f AcquisitionFunc) Option
OnAcquisition creates an Option to configure a callback upon acquisition. It is often useful for recording metrics.
func OnSlowAcquisition ¶
func OnSlowAcquisition(threshold time.Duration, f SlowAcquisitionFunc) Option
OnSlowAcquisition creates an Option to configure a callback upon slow acquisitions. Only one OnSlowAcquisition may be used. If multiple are specified only the last will be used.
func OnWaitFinish ¶
func OnWaitFinish(onFinish AcquisitionFunc) Option
OnWaitFinish creates an Option to configure a callback which is called when a previously blocked request acquires resources.
func OnWaitStart ¶
func OnWaitStart(onStart OnWaitStartFunc) Option
OnWaitStart creates an Option to configure a callback which is called when a request blocks and has to wait for quota.
func WithCloser ¶
func WithCloser(closer <-chan struct{}) Option
WithCloser allows the client to provide a channel which will lead to the AbstractPool being closed.
func WithMinimumWait ¶
func WithMinimumWait(duration time.Duration) Option
WithMinimumWait is used with the RateLimiter to control the minimum duration which a goroutine will sleep waiting for quota to accumulate. This can help avoid expensive spinning when the workload consists of many small acquisitions. If used with a regular (not rate limiting) quotapool, this option has no effect.
func WithTimeSource ¶
func WithTimeSource(ts timeutil.TimeSource) Option
WithTimeSource is used to configure a quotapool to use the provided TimeSource.
type PoolInfo ¶
type PoolInfo struct {
	// Available is the amount of quota available to be consumed. This is the
	// maximum value that the `took` return value from IntRequestFunc can be set
	// to.
	// Note that Available() can be 0. This happens when the IntRequestFunc() is
	// called as a result of the pool's capacity decreasing.
	Available uint64

	// Capacity returns the maximum capacity available in the pool. This can
	// decrease over time. It can be used to determine that the resources required
	// by a request will never be available.
	Capacity uint64
}
PoolInfo represents the information that the IntRequestFunc gets about the current quota pool conditions.
type RateAlloc ¶
type RateAlloc struct {
// contains filtered or unexported fields
}
RateAlloc is an allocated quantity of quota which can be released back into the token-bucket RateLimiter.
type RateLimiter ¶
type RateLimiter struct {
// contains filtered or unexported fields
}
RateLimiter implements a token-bucket style rate limiter. It has the added feature that quota acquired from the pool can be returned in the case that it ends up not getting used.
func NewRateLimiter ¶
func NewRateLimiter(name string, rate Limit, burst int64, options ...Option) *RateLimiter
NewRateLimiter defines a new RateLimiter. The limiter is implemented as a token bucket which has a maximum capacity of burst. If a request attempts to acquire more than burst, it will block until the bucket is full and then put the token bucket in debt.
func (*RateLimiter) Acquire ¶
Acquire acquires n quota from the RateLimiter. This acquired quota may be released back into the token bucket or it may be consumed.
func (*RateLimiter) AdmitN ¶
func (rl *RateLimiter) AdmitN(n int64) bool
AdmitN acquires n quota from the RateLimiter if it succeeds. It will return false and not block if there is currently insufficient quota or the pool is closed.
func (*RateLimiter) UpdateLimit ¶
func (rl *RateLimiter) UpdateLimit(rate Limit, burst int64)
UpdateLimit updates the rate and burst limits. The change in burst will be applied to the current quantity of quota. For example, if the RateLimiter currently had a quota of 5 available with a burst of 10 and the burst is updated to 20, the quota will increase to 15. Similarly, if the burst is decreased by 10, the current quota will decrease accordingly, potentially putting the limiter into debt.
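The burst adjustment described above is a plain delta applied to the available quota. The sketch below reproduces the numbers from the example; applyBurstChange is a hypothetical helper written for illustration, not a function of this package.

```go
package main

import "fmt"

// applyBurstChange shifts the currently available quota by the change
// in burst. A negative result represents debt.
func applyBurstChange(available, oldBurst, newBurst int64) int64 {
	return available + (newBurst - oldBurst)
}

func main() {
	// 5 available, burst raised from 10 to 20: quota becomes 15.
	fmt.Println(applyBurstChange(5, 10, 20))

	// 5 available, burst lowered by 10: quota becomes -5 (debt).
	fmt.Println(applyBurstChange(5, 20, 10))
}
```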
type Request ¶
type Request interface {
	// Acquire decides whether a Request can be fulfilled by a given quantity of
	// Resource.
	//
	// If it is not fulfilled it must not modify or retain the passed alloc.
	// If it is fulfilled, it should modify the Resource value accordingly.
	//
	// If tryAgainAfter is positive, acquisition will be attempted again after
	// the specified duration. This is critical for the implementation of
	// rate limiters on top of this package.
	Acquire(context.Context, Resource) (fulfilled bool, tryAgainAfter time.Duration)

	// ShouldWait indicates whether this request should be queued. If this method
	// returns false and there is insufficient capacity in the pool when the
	// request is queued then ErrNotEnoughQuota will be returned from calls to
	// Acquire.
	ShouldWait() bool
}
Request is an interface used to acquire quota from the pool. Request is responsible for subdividing a resource into the portion which is retained when the Request is fulfilled and the remainder.
type Resource ¶
type Resource interface{}
Resource is an interface that represents a quantity which is being pooled and allocated. The Resource will be modified by a Request or a call to Update.
type SlowAcquisitionFunc ¶
type SlowAcquisitionFunc func(
	ctx context.Context, poolName string, r Request, start time.Time,
) (onAcquire func())
SlowAcquisitionFunc is used to configure a quotapool to call a function when quota acquisition is slow. The returned callback is called when the acquisition occurs.
type TokenBucket ¶
type TokenBucket struct {
// contains filtered or unexported fields
}
TokenBucket implements the basic accounting for a token bucket.
A token bucket has a rate of replenishment and a burst limit. Tokens are replenished over time, up to the burst limit.
The token bucket keeps track of the current amount and updates it as time passes. The bucket can go into debt (i.e. negative current amount).
func (*TokenBucket) Adjust ¶
func (tb *TokenBucket) Adjust(delta Tokens)
Adjust returns tokens to the bucket (positive delta) or accounts for a debt of tokens (negative delta).
func (*TokenBucket) Init ¶
func (tb *TokenBucket) Init(rate TokensPerSecond, burst Tokens, timeSource timeutil.TimeSource)
Init initializes the token bucket.
func (*TokenBucket) TryToFulfill ¶
func (tb *TokenBucket) TryToFulfill(amount Tokens) (fulfilled bool, tryAgainAfter time.Duration)
TryToFulfill either removes the given amount if it is available, or returns a time after which the request should be retried.
func (*TokenBucket) UpdateConfig ¶
func (tb *TokenBucket) UpdateConfig(rate TokensPerSecond, burst Tokens)
UpdateConfig updates the rate and burst limits. The change in burst will be applied to the current token quantity. For example, if the token bucket currently had 5 available tokens and the burst is updated from 10 to 20, the amount will increase to 15. Similarly, if the burst is decreased by 10, the current amount will decrease accordingly, potentially putting the bucket into debt.
type TokensPerSecond ¶
type TokensPerSecond float64
TokensPerSecond is the rate of token replenishment.
type UpdateFunc ¶
UpdateFunc is used to update a resource.