quotapool

package
v0.0.2
Warning

This package is not in the latest version of its module.

Published: Mar 20, 2022 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package quotapool provides an abstract implementation of a pool of resources to be distributed among concurrent clients.

The library also offers a concrete implementation of such a quota pool for single-dimension integer quota. This IntPool acts like a weighted semaphore that additionally offers FIFO ordering for serving requests.

Constants

This section is empty.

Variables

var ErrNotEnoughQuota = fmt.Errorf("not enough quota available")

ErrNotEnoughQuota is returned by IntRequestFuncs when they want to be called again once new resources are available.

Functions

func HasErrClosed

func HasErrClosed(err error) bool

HasErrClosed returns true if this error is or contains an ErrClosed error.

func LogSlowAcquisition

func LogSlowAcquisition(ctx context.Context, poolName string, r Request, start time.Time) func()

LogSlowAcquisition is a SlowAcquisitionFunc.

Types

type AbstractPool

type AbstractPool struct {
	// contains filtered or unexported fields
}

AbstractPool is an abstract implementation of a pool that stores some unit of Resource. The basic idea is that it allows requests to acquire a quantity of Resource from the pool in FIFO order in a way that interacts well with context cancelation.

func New

func New(name string, initialResource Resource, options ...Option) *AbstractPool

New returns a new quota pool initialized with a given quota. The quota is capped at this amount, meaning that callers may return more quota than they acquired without ever making more than the quota capacity available.

func (*AbstractPool) Acquire

func (qp *AbstractPool) Acquire(ctx context.Context, r Request) (err error)

Acquire attempts to fulfill the Request with Resource from the qp. Requests are serviced in a FIFO order; only a single request is ever being offered resources at a time. A Request will be offered the pool's current quantity of Resource until it is fulfilled or its context is canceled.

Safe for concurrent use.

func (*AbstractPool) Close

func (qp *AbstractPool) Close(reason string)

Close signals to all ongoing and subsequent acquisitions that they are free to return to their callers. They will receive an *ErrClosed which contains this reason.

Safe for concurrent use.

func (*AbstractPool) Len

func (qp *AbstractPool) Len() int

Len returns the current length of the queue for this AbstractPool.

func (*AbstractPool) TimeSource

func (qp *AbstractPool) TimeSource() timeutil.TimeSource

TimeSource returns the TimeSource associated with this AbstractPool.

func (*AbstractPool) Update

func (qp *AbstractPool) Update(f UpdateFunc)

Update updates the underlying resource with the provided value, notifying the head of the queue if the Resource indicates that it should.

Safe for concurrent use.

type AcquisitionFunc

type AcquisitionFunc func(
	ctx context.Context, poolName string, r Request, start time.Time,
)

AcquisitionFunc is used to configure a quotapool to call a function after an acquisition has occurred.

type ErrClosed

type ErrClosed struct {
	// contains filtered or unexported fields
}

ErrClosed is returned from Acquire after Close has been called.

func (*ErrClosed) Error

func (ec *ErrClosed) Error() string

Error implements error.

type IntAlloc

type IntAlloc struct {
	// contains filtered or unexported fields
}

IntAlloc is an allocated quantity which should be released.

func (*IntAlloc) Acquired

func (ia *IntAlloc) Acquired() uint64

Acquired returns the quantity that this alloc has acquired.

func (*IntAlloc) Freeze

func (ia *IntAlloc) Freeze()

Freeze informs the quota pool that this allocation will never be Release()ed. Releasing it later is illegal and will lead to panics.

Using Freeze and UpdateCapacity on the same pool may require explicit coordination. It is illegal to freeze allocated capacity which is no longer available - specifically it is illegal to make the capacity of an IntPool negative. Imagine the case where the capacity of an IntPool is initially 10. An allocation of 10 is acquired. Then, while it is held, the pool's capacity is updated to be 9. Then the outstanding allocation is frozen. This would make the total capacity of the IntPool negative which is not allowed and will lead to a panic. In general, it's a bad idea to freeze allocated quota from a pool which will ever have its capacity decreased.

AcquireFunc() requests will be woken up with an updated Capacity, and Alloc() requests will be trimmed accordingly.
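The capacity arithmetic behind the Freeze restriction can be sketched as follows. This is a simplified model, assuming that freezing an allocation permanently subtracts its quantity from the pool's capacity; capacityAfterFreeze is a hypothetical helper that returns an error in the case where, per the description above, the real pool would panic.

```go
package main

import (
	"errors"
	"fmt"
)

// capacityAfterFreeze models freezing an allocation: the frozen quantity is
// removed from the pool's capacity for good. An error stands in for the
// panic described in the documentation.
func capacityAfterFreeze(capacity, frozen uint64) (uint64, error) {
	if frozen > capacity {
		return 0, errors.New("freeze would make capacity negative")
	}
	return capacity - frozen, nil
}

func main() {
	// Capacity was 10, an allocation of 10 is held, capacity is then
	// lowered to 9, and the allocation of 10 is frozen.
	_, err := capacityAfterFreeze(9, 10)
	fmt.Println(err != nil) // true

	// Without the capacity decrease, freezing leaves a capacity of 0.
	c, err := capacityAfterFreeze(10, 10)
	fmt.Println(c, err) // 0 <nil>
}
```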

func (*IntAlloc) Merge

func (ia *IntAlloc) Merge(other *IntAlloc)

Merge adds the acquired resources in other to ia. Other may not be used after it has been merged. It is illegal to merge allocs from different pools and doing so will result in a panic.

func (*IntAlloc) Release

func (ia *IntAlloc) Release()

Release releases an IntAlloc back into the IntPool. It is safe to release into a closed pool.

func (*IntAlloc) String

func (ia *IntAlloc) String() string

String formats an IntAlloc as a string.

type IntPool

type IntPool struct {
	// contains filtered or unexported fields
}

IntPool manages allocating integer units of quota to clients. Clients may acquire quota in two ways, using Acquire which requires the client to specify the quantity of quota at call time and AcquireFunc which allows the client to provide a function which will be used to determine whether a quantity of quota is sufficient when it becomes available.

func NewIntPool

func NewIntPool(name string, capacity uint64, options ...Option) *IntPool

NewIntPool creates a new named IntPool.

capacity is the amount of quota initially available. The maximum capacity is math.MaxInt64. If the capacity argument exceeds that value, this function will panic.

func (*IntPool) Acquire

func (p *IntPool) Acquire(ctx context.Context, v uint64) (*IntAlloc, error)

Acquire acquires the specified amount of quota from the pool. On success, a non-nil alloc is returned and Release() must be called on it to return the quota to the pool.

If 'v' is greater than the total capacity of the pool, we instead try to acquire quota equal to the maximum capacity. If the maximum capacity is decreased while this request is ongoing, the request is again truncated to the maximum capacity.

Acquisitions of 0 return immediately with no error, even if the IntPool is closed.

Acquisitions of more than 0 from a pool with 0 capacity always return an ErrNotEnoughQuota.

Safe for concurrent use.
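The truncation rule for oversized requests amounts to taking the minimum of the requested amount and the current capacity. The helper below, truncateRequest, is a hypothetical illustration of that rule only; it does not model queuing or blocking.

```go
package main

import "fmt"

// truncateRequest models the rule that a request for more than the pool's
// capacity is reduced to the capacity.
func truncateRequest(v, capacity uint64) uint64 {
	if v > capacity {
		return capacity
	}
	return v
}

func main() {
	fmt.Println(truncateRequest(15, 10)) // 10
	fmt.Println(truncateRequest(4, 10))  // 4

	// If the capacity drops to 8 while the request is still waiting,
	// the already truncated request is truncated again.
	fmt.Println(truncateRequest(truncateRequest(15, 10), 8)) // 8
}
```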

func (*IntPool) AcquireFunc

func (p *IntPool) AcquireFunc(ctx context.Context, f IntRequestFunc) (*IntAlloc, error)

AcquireFunc acquires a quantity of quota determined by a function which is called with a quantity of available quota.

Example

An example use case for AcquireFunc is a pool of workers attempting to acquire resources to run a heterogeneous set of jobs. Imagine for example we have a set of workers and a list of jobs which need to be run. The function might be used to choose the largest job which can be run by the existing quantity of quota.

const quota = 7
const workers = 3
qp := NewIntPool("work units", quota)
type job struct {
	name string
	cost uint64
}
jobs := []*job{
	{name: "foo", cost: 3},
	{name: "bar", cost: 2},
	{name: "baz", cost: 4},
	{name: "qux", cost: 6},
	{name: "quux", cost: 3},
	{name: "quuz", cost: 3},
}
// sortJobs sorts the jobs in highest-to-lowest order with nil last.
sortJobs := func() {
	sort.Slice(jobs, func(i, j int) bool {
		ij, jj := jobs[i], jobs[j]
		if ij != nil && jj != nil {
			return ij.cost > jj.cost
		}
		return ij != nil
	})
}
// getJob finds the largest job which can be run with the current quota.
getJob := func(
	ctx context.Context, qp *IntPool,
) (j *job, alloc *IntAlloc, err error) {
	alloc, err = qp.AcquireFunc(ctx, func(
		ctx context.Context, pi PoolInfo,
	) (took uint64, err error) {
		sortJobs()
		// There are no more jobs, take 0 and return.
		if jobs[0] == nil {
			return 0, nil
		}
		// Find the largest job which can be run.
		for i := range jobs {
			if jobs[i] == nil {
				break
			}
			if jobs[i].cost <= pi.Available {
				j, jobs[i] = jobs[i], nil
				return j.cost, nil
			}
		}
		return 0, ErrNotEnoughQuota
	})
	return j, alloc, err
}
runWorker := func(workerNum int) func(ctx context.Context) error {
	return func(ctx context.Context) error {
		for {
			j, alloc, err := getJob(ctx, qp)
			if err != nil {
				return err
			}
			if j == nil {
				return nil
			}
			alloc.Release()
		}
	}
}
g := ctxgroup.WithContext(context.Background())
for i := 0; i < workers; i++ {
	g.GoCtx(runWorker(i))
}
if err := g.Wait(); err != nil {
	panic(err)
}
Output:

func (*IntPool) ApproximateQuota

func (p *IntPool) ApproximateQuota() (q uint64)

ApproximateQuota will report approximately the amount of quota available in the pool. It's "approximate" because, if there's an acquisition in progress, this might return an "intermediate" value - one that does not fully reflect the capacity either before that acquisition started or after it will have finished.

func (*IntPool) Capacity

func (p *IntPool) Capacity() uint64

Capacity returns the amount of quota managed by this pool.

func (*IntPool) Close

func (p *IntPool) Close(reason string)

Close signals to all ongoing and subsequent acquisitions that the pool is closed and that an error should be returned.

Safe for concurrent use.

func (*IntPool) Closer

func (p *IntPool) Closer(reason string) IntPoolCloser

Closer returns a struct which implements stop.Closer.

func (*IntPool) Full

func (p *IntPool) Full() bool

Full returns true if no quota is outstanding.

func (*IntPool) Len

func (p *IntPool) Len() int

Len returns the current length of the queue for this IntPool.

func (*IntPool) Release

func (p *IntPool) Release(allocs ...*IntAlloc)

Release will release allocs back to their pool. Allocs which are from p are merged into a single alloc before being added to avoid synchronizing on p multiple times. Allocs which did not come from p are released one at a time. It is legal to pass nil values in allocs.

func (*IntPool) TryAcquire

func (p *IntPool) TryAcquire(ctx context.Context, v uint64) (*IntAlloc, error)

TryAcquire is like Acquire but if there is insufficient quota to acquire immediately the method will return ErrNotEnoughQuota.

func (*IntPool) TryAcquireFunc

func (p *IntPool) TryAcquireFunc(ctx context.Context, f IntRequestFunc) (*IntAlloc, error)

TryAcquireFunc is like AcquireFunc but if insufficient quota exists the method will return ErrNotEnoughQuota rather than waiting for quota to become available.

func (*IntPool) UpdateCapacity

func (p *IntPool) UpdateCapacity(newCapacity uint64)

UpdateCapacity sets the capacity to newCapacity. If the current capacity is higher than the new capacity, currently running requests will not be affected. When the capacity is increased, new quota will be added. The total quantity of outstanding quota will never exceed the maximum value of the capacity which existed when any outstanding quota was acquired.

type IntPoolCloser

type IntPoolCloser struct {
	// contains filtered or unexported fields
}

IntPoolCloser implements stop.Closer.

func (IntPoolCloser) Close

func (ipc IntPoolCloser) Close()

Close makes the IntPoolCloser a stop.Closer.

type IntRequestFunc

type IntRequestFunc func(ctx context.Context, p PoolInfo) (took uint64, err error)

IntRequestFunc is used to request a quantity of quota determined when quota is available rather than before requesting.

If the request is satisfied, the function returns the amount of quota consumed and no error. If the request is not satisfied because there is not enough quota currently available, the function returns ErrNotEnoughQuota, which causes it to be called again when more quota becomes available; in that case took has to be 0 (i.e. the request is not allowed to save some quota for later use). If any other error is returned, took again has to be 0; the function will not be called any more and the error will be returned from IntPool.AcquireFunc().

type Limit

type Limit float64

Limit defines a rate in terms of quota per second.

type OnWaitStartFunc

type OnWaitStartFunc func(
	ctx context.Context, poolName string, r Request,
)

OnWaitStartFunc is the prototype for functions called to notify the start or finish of a waiting period when a request is blocked.

type Option

type Option interface {
	// contains filtered or unexported methods
}

Option is used to configure a quotapool.

func OnAcquisition

func OnAcquisition(f AcquisitionFunc) Option

OnAcquisition creates an Option to configure a callback upon acquisition. It is often useful for recording metrics.

func OnSlowAcquisition

func OnSlowAcquisition(threshold time.Duration, f SlowAcquisitionFunc) Option

OnSlowAcquisition creates an Option to configure a callback upon slow acquisitions. Only one OnSlowAcquisition may be used. If multiple are specified only the last will be used.

func OnWaitFinish

func OnWaitFinish(onFinish AcquisitionFunc) Option

OnWaitFinish creates an Option to configure a callback which is called when a previously blocked request acquires resources.

func OnWaitStart

func OnWaitStart(onStart OnWaitStartFunc) Option

OnWaitStart creates an Option to configure a callback which is called when a request blocks and has to wait for quota.

func WithCloser

func WithCloser(closer <-chan struct{}) Option

WithCloser allows the client to provide a channel which will lead to the AbstractPool being closed.

func WithMinimumWait

func WithMinimumWait(duration time.Duration) Option

WithMinimumWait is used with the RateLimiter to control the minimum duration which a goroutine will sleep waiting for quota to accumulate. This can help avoid expensive spinning when the workload consists of many small acquisitions. If used with a regular (not rate limiting) quotapool, this option has no effect.

func WithTimeSource

func WithTimeSource(ts timeutil.TimeSource) Option

WithTimeSource is used to configure a quotapool to use the provided TimeSource.

type PoolInfo

type PoolInfo struct {
	// Available is the amount of quota available to be consumed. This is the
	// maximum value that the `took` return value from IntRequestFunc can be set
	// to.
	// Note that Available can be 0. This happens when the IntRequestFunc is
	// called as a result of the pool's capacity decreasing.
	Available uint64

	// Capacity is the maximum capacity available in the pool. This can
	// decrease over time. It can be used to determine that the resources required
	// by a request will never be available.
	Capacity uint64
}

PoolInfo represents the information that the IntRequestFunc gets about the current quota pool conditions.

type RateAlloc

type RateAlloc struct {
	// contains filtered or unexported fields
}

RateAlloc is an allocated quantity of quota which can be released back into the token-bucket RateLimiter.

func (*RateAlloc) Consume

func (ra *RateAlloc) Consume()

Consume destroys the RateAlloc. It is not safe to call any methods on the RateAlloc after this call.

func (*RateAlloc) Return

func (ra *RateAlloc) Return()

Return returns the RateAlloc to the RateLimiter. It is not safe to call any methods on the RateAlloc after this call.

type RateLimiter

type RateLimiter struct {
	// contains filtered or unexported fields
}

RateLimiter implements a token-bucket style rate limiter. It has the added feature that quota acquired from the pool can be returned in the case that they end up not getting used.

func NewRateLimiter

func NewRateLimiter(name string, rate Limit, burst int64, options ...Option) *RateLimiter

NewRateLimiter defines a new RateLimiter. The limiter is implemented as a token bucket which has a maximum capacity of burst. If a request attempts to acquire more than burst, it will block until the bucket is full and then put the token bucket in debt.

func (*RateLimiter) Acquire

func (rl *RateLimiter) Acquire(ctx context.Context, n int64) (*RateAlloc, error)

Acquire acquires n quota from the RateLimiter. This acquired quota may be released back into the token bucket or it may be consumed.

func (*RateLimiter) AdmitN

func (rl *RateLimiter) AdmitN(n int64) bool

AdmitN acquires n quota from the RateLimiter if enough quota is available. It will return false and not block if there is currently insufficient quota or the pool is closed.

func (*RateLimiter) UpdateLimit

func (rl *RateLimiter) UpdateLimit(rate Limit, burst int64)

UpdateLimit updates the rate and burst limits. The change in burst will be applied to the current quantity of quota. For example, if the RateLimiter currently has a quota of 5 available with a burst of 10 and the burst is updated to 20, the quota will increase to 15. Similarly, if the burst is decreased by 10, the current quota will decrease accordingly, potentially putting the limiter into debt.
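The burst adjustment described above is a simple shift of the current quota by the change in burst. The function applyBurstChange below is a hypothetical helper that reproduces only this arithmetic, under the assumption that no time-based replenishment happens during the update.

```go
package main

import "fmt"

// applyBurstChange shifts the current quota by the difference between the
// new and the old burst. A negative result means the limiter is in debt.
func applyBurstChange(current, oldBurst, newBurst float64) float64 {
	return current + (newBurst - oldBurst)
}

func main() {
	// Quota 5, burst raised from 10 to 20.
	fmt.Println(applyBurstChange(5, 10, 20)) // 15

	// Quota 5, burst lowered from 20 to 10: the limiter goes into debt.
	fmt.Println(applyBurstChange(5, 20, 10)) // -5
}
```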

func (*RateLimiter) WaitN

func (rl *RateLimiter) WaitN(ctx context.Context, n int64) error

WaitN acquires n quota from the RateLimiter. This acquisition cannot be released.

type Request

type Request interface {

	// Acquire decides whether a Request can be fulfilled by a given quantity of
	// Resource.
	//
	// If it is not fulfilled it must not modify or retain the passed alloc.
	// If it is fulfilled, it should modify the Resource value accordingly.
	//
	// If tryAgainAfter is positive, acquisition will be attempted again after
	// the specified duration. This is critical for the implementation of
	// rate limiters on top of this package.
	Acquire(context.Context, Resource) (fulfilled bool, tryAgainAfter time.Duration)

	// ShouldWait indicates whether this request should be queued. If this method
	// returns false and there is insufficient capacity in the pool when the
	// request is queued then ErrNotEnoughQuota will be returned from calls to
	// Acquire.
	ShouldWait() bool
}

Request is an interface used to acquire quota from the pool. Request is responsible for subdividing a resource into the portion which is retained when the Request is fulfilled and the remainder.

type Resource

type Resource interface{}

Resource is an interface that represents a quantity which is being pooled and allocated. The Resource will be modified by a Request or a call to Update.

type SlowAcquisitionFunc

type SlowAcquisitionFunc func(
	ctx context.Context, poolName string, r Request, start time.Time,
) (onAcquire func())

SlowAcquisitionFunc is used to configure a quotapool to call a function when quota acquisition is slow. The returned callback is called when the acquisition occurs.

type TokenBucket

type TokenBucket struct {
	// contains filtered or unexported fields
}

TokenBucket implements the basic accounting for a token bucket.

A token bucket has a rate of replenishment and a burst limit. Tokens are replenished over time, up to the burst limit.

The token bucket keeps track of the current amount and updates it as time passes. The bucket can go into debt (i.e. negative current amount).

func (*TokenBucket) Adjust

func (tb *TokenBucket) Adjust(delta Tokens)

Adjust returns tokens to the bucket (positive delta) or accounts for a debt of tokens (negative delta).

func (*TokenBucket) Init

func (tb *TokenBucket) Init(rate TokensPerSecond, burst Tokens, timeSource timeutil.TimeSource)

Init initializes the token bucket.

func (*TokenBucket) TryToFulfill

func (tb *TokenBucket) TryToFulfill(amount Tokens) (fulfilled bool, tryAgainAfter time.Duration)

TryToFulfill either removes the given amount if it is available, or returns a time after which the request should be retried.

func (*TokenBucket) UpdateConfig

func (tb *TokenBucket) UpdateConfig(rate TokensPerSecond, burst Tokens)

UpdateConfig updates the rate and burst limits. The change in burst will be applied to the current token quantity. For example, if the token bucket currently has 5 available tokens and the burst is updated from 10 to 20, the amount will increase to 15. Similarly, if the burst is decreased by 10, the current amount will decrease accordingly, potentially putting the bucket into debt.

type Tokens

type Tokens float64

Tokens are abstract units (usually units of work).

type TokensPerSecond

type TokensPerSecond float64

TokensPerSecond is the rate of token replenishment.

type UpdateFunc

type UpdateFunc func(resource Resource) (shouldNotify bool)

UpdateFunc is used to update a resource.
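A function of this shape might add quota to a resource and ask for the head of the queue to be notified only when something was actually added. The sketch below assumes a hypothetical intResource type and a local updateFunc copy of the declaration above; addQuota is an illustrative name, not part of the package.

```go
package main

import "fmt"

// intResource is a hypothetical resource holding a number of units.
type intResource struct{ available uint64 }

// updateFunc is a local copy of the package's UpdateFunc declaration.
type updateFunc func(resource interface{}) (shouldNotify bool)

// addQuota returns an update function that adds n units and requests a
// notification only if the resource grew.
func addQuota(n uint64) updateFunc {
	return func(resource interface{}) bool {
		r := resource.(*intResource)
		r.available += n
		return n > 0
	}
}

func main() {
	res := &intResource{available: 2}

	notify := addQuota(3)(res)
	fmt.Println(notify, res.available) // true 5

	notify = addQuota(0)(res)
	fmt.Println(notify, res.available) // false 5
}
```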
