Documentation ¶
Overview ¶
Package workqueue provides a simple queue that supports the following features:
- Fair: items processed in the order in which they are added.
- Stingy: a single item will not be processed multiple times concurrently, and if an item is added multiple times before it can be processed, it will only be processed once.
- Multiple consumers and producers. In particular, it is allowed for an item to be reenqueued while it is being processed.
- Shutdown notifications.
Index ¶
- func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, ...)
- func SetProvider(metricsProvider MetricsProvider)
- func WithChunkSize(c int) func(*options)
- type BucketRateLimiter
- type CounterMetric
- type DelayingInterface
- type DoWorkPieceFunc
- type GaugeMetric
- type HistogramMetric
- type Interface
- type ItemExponentialFailureRateLimiter
- type ItemFastSlowRateLimiter
- type MaxOfRateLimiter
- type MetricsProvider
- type Options
- type RateLimiter
- func DefaultControllerRateLimiter() RateLimiter
- func DefaultItemBasedRateLimiter() RateLimiter
- func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter
- func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter
- func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter
- func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter
- type RateLimitingInterface
- type SettableGaugeMetric
- type SummaryMetric
- type Type
- type WithMaxWaitRateLimiter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ParallelizeUntil ¶
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, opts ...Options)
ParallelizeUntil is a framework that allows for parallelizing N independent pieces of work until done or the context is canceled.
func SetProvider ¶
func SetProvider(metricsProvider MetricsProvider)
SetProvider sets the metrics provider for all subsequently created work queues. Only the first call has an effect.
func WithChunkSize ¶
func WithChunkSize(c int) func(*options)
WithChunkSize allows to set chunks of work items to the workers, rather than processing one by one. It is recommended to use this option if the number of pieces significantly higher than the number of workers and the work done for each item is small.
Types ¶
type BucketRateLimiter ¶
BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
func (*BucketRateLimiter) Forget ¶
func (r *BucketRateLimiter) Forget(item interface{})
func (*BucketRateLimiter) NumRequeues ¶
func (r *BucketRateLimiter) NumRequeues(item interface{}) int
func (*BucketRateLimiter) When ¶
func (r *BucketRateLimiter) When(item interface{}) time.Duration
type CounterMetric ¶
type CounterMetric interface {
Inc()
}
CounterMetric represents a single numerical value that only ever goes up.
type DelayingInterface ¶
type DelayingInterface interface { Interface // AddAfter adds an item to the workqueue after the indicated duration has passed AddAfter(item interface{}, duration time.Duration) }
DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to requeue items after failures without ending up in a hot-loop.
func NewDelayingQueue ¶
func NewDelayingQueue() DelayingInterface
NewDelayingQueue constructs a new workqueue with delayed queuing ability. NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use NewNamedDelayingQueue instead.
func NewDelayingQueueWithCustomClock ¶
func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface
NewDelayingQueueWithCustomClock constructs a new named workqueue with ability to inject real or fake clock for testing purposes
func NewDelayingQueueWithCustomQueue ¶
func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface
NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to inject custom queue Interface instead of the default one
func NewNamedDelayingQueue ¶
func NewNamedDelayingQueue(name string) DelayingInterface
NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability
type DoWorkPieceFunc ¶
type DoWorkPieceFunc func(piece int)
type GaugeMetric ¶
type GaugeMetric interface { Inc() Dec() }
GaugeMetric represents a single numerical value that can arbitrarily go up and down.
type HistogramMetric ¶
type HistogramMetric interface {
Observe(float64)
}
HistogramMetric counts individual observations.
type ItemExponentialFailureRateLimiter ¶
type ItemExponentialFailureRateLimiter struct {
// contains filtered or unexported fields
}
ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit dealing with max failures and expiration are up to the caller
func (*ItemExponentialFailureRateLimiter) Forget ¶
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{})
func (*ItemExponentialFailureRateLimiter) NumRequeues ¶
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int
func (*ItemExponentialFailureRateLimiter) When ¶
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration
type ItemFastSlowRateLimiter ¶
type ItemFastSlowRateLimiter struct {
// contains filtered or unexported fields
}
ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
func (*ItemFastSlowRateLimiter) Forget ¶
func (r *ItemFastSlowRateLimiter) Forget(item interface{})
func (*ItemFastSlowRateLimiter) NumRequeues ¶
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int
func (*ItemFastSlowRateLimiter) When ¶
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration
type MaxOfRateLimiter ¶
type MaxOfRateLimiter struct {
// contains filtered or unexported fields
}
MaxOfRateLimiter calls every RateLimiter and returns the worst case response When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items were separately delayed a longer time.
func (*MaxOfRateLimiter) Forget ¶
func (r *MaxOfRateLimiter) Forget(item interface{})
func (*MaxOfRateLimiter) NumRequeues ¶
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int
func (*MaxOfRateLimiter) When ¶
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration
type MetricsProvider ¶
type MetricsProvider interface { NewDepthMetric(name string) GaugeMetric NewAddsMetric(name string) CounterMetric NewLatencyMetric(name string) HistogramMetric NewWorkDurationMetric(name string) HistogramMetric NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric NewRetriesMetric(name string) CounterMetric }
MetricsProvider generates various metrics used by the queue.
type RateLimiter ¶
type RateLimiter interface { // When gets an item and gets to decide how long that item should wait When(item interface{}) time.Duration // Forget indicates that an item is finished being retried. Doesn't matter whether it's for failing // or for success, we'll stop tracking it Forget(item interface{}) // NumRequeues returns back how many failures the item has had NumRequeues(item interface{}) int }
func DefaultControllerRateLimiter ¶
func DefaultControllerRateLimiter() RateLimiter
DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
func DefaultItemBasedRateLimiter ¶
func DefaultItemBasedRateLimiter() RateLimiter
func NewItemExponentialFailureRateLimiter ¶
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter
func NewItemFastSlowRateLimiter ¶
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter
func NewMaxOfRateLimiter ¶
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter
func NewWithMaxWaitRateLimiter ¶
func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter
type RateLimitingInterface ¶
type RateLimitingInterface interface { DelayingInterface // AddRateLimited adds an item to the workqueue after the rate limiter says it's ok AddRateLimited(item interface{}) // Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing // or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you // still have to call `Done` on the queue. Forget(item interface{}) // NumRequeues returns back how many times the item was requeued NumRequeues(item interface{}) int }
RateLimitingInterface is an interface that rate limits items being added to the queue.
func NewNamedRateLimitingQueue ¶
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface
func NewRateLimitingQueue ¶
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface
NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability Remember to call Forget! If you don't, you may end up tracking failures forever. NewRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use NewNamedRateLimitingQueue instead.
type SettableGaugeMetric ¶
type SettableGaugeMetric interface {
Set(float64)
}
SettableGaugeMetric represents a single numerical value that can arbitrarily go up and down. (Separate from GaugeMetric to preserve backwards compatibility.)
type SummaryMetric ¶
type SummaryMetric interface {
Observe(float64)
}
SummaryMetric captures individual observations.
type Type ¶
type Type struct {
// contains filtered or unexported fields
}
Type is a work queue (see the package comment).
func (*Type) Done ¶
func (q *Type) Done(item interface{})
Done marks item as done processing, and if it has been marked as dirty again while it was being processed, it will be re-added to the queue for re-processing.
func (*Type) Get ¶
Get blocks until it can return an item to be processed. If shutdown = true, the caller should end their goroutine. You must call Done with item when you have finished processing it.
func (*Type) Len ¶
Len returns the current queue length, for informational purposes only. You shouldn't e.g. gate a call to Add() or Get() on Len() being a particular value, that can't be synchronized properly.
func (*Type) ShutDown ¶
func (q *Type) ShutDown()
ShutDown will cause q to ignore all new items added to it and immediately instruct the worker goroutines to exit.
func (*Type) ShutDownWithDrain ¶
func (q *Type) ShutDownWithDrain()
ShutDownWithDrain will cause q to ignore all new items added to it. As soon as the worker goroutines have "drained", i.e: finished processing and called Done on all existing items in the queue; they will be instructed to exit and ShutDownWithDrain will return. Hence: a strict requirement for using this is; your workers must ensure that Done is called on all items in the queue once the shut down has been initiated, if that is not the case: this will block indefinitely. It is, however, safe to call ShutDown after having called ShutDownWithDrain, as to force the queue shut down to terminate immediately without waiting for the drainage.
func (*Type) ShuttingDown ¶
type WithMaxWaitRateLimiter ¶
type WithMaxWaitRateLimiter struct {
// contains filtered or unexported fields
}
WithMaxWaitRateLimiter have maxDelay which avoids waiting too long
func (WithMaxWaitRateLimiter) Forget ¶
func (w WithMaxWaitRateLimiter) Forget(item interface{})
func (WithMaxWaitRateLimiter) NumRequeues ¶
func (w WithMaxWaitRateLimiter) NumRequeues(item interface{}) int
func (WithMaxWaitRateLimiter) When ¶
func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration