Documentation ¶
Overview ¶
Package workqueue provides a simple queue that supports the following features:
- Fair: items processed in the order in which they are added.
- Stingy: a single item will not be processed multiple times concurrently, and if an item is added multiple times before it can be processed, it will only be processed once.
- Multiple consumers and producers. In particular, it is allowed for an item to be reenqueued while it is being processed.
- Shutdown notifications.
Index ¶
- func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc)
- func SetProvider(metricsProvider MetricsProvider)
- type BucketRateLimiter
- type CounterMetric
- type DelayingInterface
- type DoWorkPieceFunc
- type GaugeMetric
- type HistogramMetric
- type Interface
- type ItemExponentialFailureRateLimiter
- type ItemFastSlowRateLimiter
- type MaxOfRateLimiter
- type MetricsProvider
- type RateLimiter
- func DefaultControllerRateLimiter() RateLimiter
- func DefaultItemBasedRateLimiter() RateLimiter
- func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter
- func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter
- func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter
- type RateLimitingInterface
- type SettableGaugeMetric
- type SummaryMetric
- type Type
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ParallelizeUntil ¶
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc)
ParallelizeUntil is a framework that allows for parallelizing N independent pieces of work until done or the context is canceled.
func SetProvider ¶
func SetProvider(metricsProvider MetricsProvider)
SetProvider sets the metrics provider for all subsequently created work queues. Only the first call has an effect.
Types ¶
type BucketRateLimiter ¶
BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
func (*BucketRateLimiter) Forget ¶
func (r *BucketRateLimiter) Forget(item interface{})
func (*BucketRateLimiter) NumRequeues ¶
func (r *BucketRateLimiter) NumRequeues(item interface{}) int
func (*BucketRateLimiter) When ¶
func (r *BucketRateLimiter) When(item interface{}) time.Duration
type CounterMetric ¶
type CounterMetric interface {
Inc()
}
CounterMetric represents a single numerical value that only ever goes up.
type DelayingInterface ¶
type DelayingInterface interface { Interface // AddAfter adds an item to the workqueue after the indicated duration has passed AddAfter(item interface{}, duration time.Duration) }
DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to requeue items after failures without ending up in a hot-loop.
func NewDelayingQueue ¶
func NewDelayingQueue() DelayingInterface
NewDelayingQueue constructs a new workqueue with delayed queuing ability
func NewDelayingQueueWithCustomClock ¶ added in v0.17.0
func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface
NewDelayingQueueWithCustomClock constructs a new named workqueue with ability to inject real or fake clock for testing purposes
func NewNamedDelayingQueue ¶
func NewNamedDelayingQueue(name string) DelayingInterface
NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability
type DoWorkPieceFunc ¶
type DoWorkPieceFunc func(piece int)
type GaugeMetric ¶
type GaugeMetric interface { Inc() Dec() }
GaugeMetric represents a single numerical value that can arbitrarily go up and down.
type HistogramMetric ¶
type HistogramMetric interface {
Observe(float64)
}
HistogramMetric counts individual observations.
type ItemExponentialFailureRateLimiter ¶
type ItemExponentialFailureRateLimiter struct {
// contains filtered or unexported fields
}
ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit dealing with max failures and expiration are up to the caller
func (*ItemExponentialFailureRateLimiter) Forget ¶
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{})
func (*ItemExponentialFailureRateLimiter) NumRequeues ¶
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int
func (*ItemExponentialFailureRateLimiter) When ¶
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration
type ItemFastSlowRateLimiter ¶
type ItemFastSlowRateLimiter struct {
// contains filtered or unexported fields
}
ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
func (*ItemFastSlowRateLimiter) Forget ¶
func (r *ItemFastSlowRateLimiter) Forget(item interface{})
func (*ItemFastSlowRateLimiter) NumRequeues ¶
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int
func (*ItemFastSlowRateLimiter) When ¶
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration
type MaxOfRateLimiter ¶
type MaxOfRateLimiter struct {
// contains filtered or unexported fields
}
MaxOfRateLimiter calls every RateLimiter and returns the worst case response When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items were separately delayed a longer time.
func (*MaxOfRateLimiter) Forget ¶
func (r *MaxOfRateLimiter) Forget(item interface{})
func (*MaxOfRateLimiter) NumRequeues ¶
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int
func (*MaxOfRateLimiter) When ¶
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration
type MetricsProvider ¶
type MetricsProvider interface { NewDepthMetric(name string) GaugeMetric NewAddsMetric(name string) CounterMetric NewLatencyMetric(name string) HistogramMetric NewWorkDurationMetric(name string) HistogramMetric NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric NewRetriesMetric(name string) CounterMetric }
MetricsProvider generates various metrics used by the queue.
type RateLimiter ¶
type RateLimiter interface { // When gets an item and gets to decide how long that item should wait When(item interface{}) time.Duration // Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing // or for success, we'll stop tracking it Forget(item interface{}) // NumRequeues returns back how many failures the item has had NumRequeues(item interface{}) int }
func DefaultControllerRateLimiter ¶
func DefaultControllerRateLimiter() RateLimiter
DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential
func DefaultItemBasedRateLimiter ¶
func DefaultItemBasedRateLimiter() RateLimiter
func NewItemExponentialFailureRateLimiter ¶
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter
func NewItemFastSlowRateLimiter ¶
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter
func NewMaxOfRateLimiter ¶
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter
type RateLimitingInterface ¶
type RateLimitingInterface interface { DelayingInterface // AddRateLimited adds an item to the workqueue after the rate limiter says it's ok AddRateLimited(item interface{}) // Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing // or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you // still have to call `Done` on the queue. Forget(item interface{}) // NumRequeues returns back how many times the item was requeued NumRequeues(item interface{}) int }
RateLimitingInterface is an interface that rate limits items being added to the queue.
func NewNamedRateLimitingQueue ¶
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface
func NewRateLimitingQueue ¶
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface
NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability Remember to call Forget! If you don't, you may end up tracking failures forever.
type SettableGaugeMetric ¶
type SettableGaugeMetric interface {
Set(float64)
}
SettableGaugeMetric represents a single numerical value that can arbitrarily go up and down. (Separate from GaugeMetric to preserve backwards compatibility.)
type SummaryMetric ¶
type SummaryMetric interface {
Observe(float64)
}
SummaryMetric captures individual observations.
type Type ¶
type Type struct {
// contains filtered or unexported fields
}
Type is a work queue (see the package comment).
func (*Type) Done ¶
func (q *Type) Done(item interface{})
Done marks item as done processing, and if it has been marked as dirty again while it was being processed, it will be re-added to the queue for re-processing.
func (*Type) Get ¶
Get blocks until it can return an item to be processed. If shutdown = true, the caller should end their goroutine. You must call Done with item when you have finished processing it.
func (*Type) Len ¶
Len returns the current queue length, for informational purposes only. You shouldn't e.g. gate a call to Add() or Get() on Len() being a particular value, that can't be synchronized properly.
func (*Type) ShutDown ¶
func (q *Type) ShutDown()
ShutDown will cause q to ignore all new items added to it. As soon as the worker goroutines have drained the existing items in the queue, they will be instructed to exit.