Documentation ¶
Overview ¶
Package workqueue provides a simple queue that supports the following features:
- Fair: items are processed in the order in which they are added.
- Stingy: a single item will not be processed multiple times concurrently, and if an item is added multiple times before it can be processed, it will only be processed once.
- Multiple consumers and producers. In particular, it is allowed for an item to be reenqueued while it is being processed.
- Shutdown notifications.
Index ¶
- func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc)
- type BucketRateLimiter
- type DelayingInterface
- type DoWorkPieceFunc
- type Interface
- type ItemExponentialFailureRateLimiter
- type ItemFastSlowRateLimiter
- type MaxOfRateLimiter
- type RateLimiter
- func DefaultControllerRateLimiter() RateLimiter
- func DefaultItemBasedRateLimiter() RateLimiter
- func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter
- func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter
- func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter
- type RateLimitingInterface
- type TimedWorkQueue
- type TimedWorkQueueItem
- type Type
Functions ¶
func Parallelize ¶ added in v1.3.0
func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc)
Parallelize is a very simple framework that allows for parallelizing N independent pieces of work.
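To make the contract concrete, here is a minimal sketch of how a function with this signature could behave. It is not the package's implementation: `parallelize` is a hypothetical stand-in written for illustration, and it assumes that each piece index in [0, pieces) is handed to exactly one of at most `workers` goroutines and that the call returns only after all pieces are done.

```go
package main

import (
	"fmt"
	"sync"
)

// DoWorkPieceFunc mirrors the documented callback type: it receives the index of one piece.
type DoWorkPieceFunc func(piece int)

// parallelize is a hypothetical re-implementation for illustration only.
// It runs doWorkPiece once for every index in [0, pieces) on at most `workers` goroutines.
func parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc) {
	// Buffer every index up front so no separate producer goroutine is needed.
	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		toProcess <- i
	}
	close(toProcess)

	// There is no point in starting more workers than there are pieces.
	if pieces < workers {
		workers = pieces
	}

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for piece := range toProcess {
				doWorkPiece(piece)
			}
		}()
	}
	wg.Wait() // return only after every piece has been processed
}

func main() {
	squares := make([]int, 8)
	// Each piece writes to its own slot, so the callback needs no locking.
	parallelize(3, len(squares), func(piece int) {
		squares[piece] = piece * piece
	})
	fmt.Println(squares) // prints [0 1 4 9 16 25 36 49]
}
```

Because the pieces are independent, the callback can write to its own slice slot without synchronization; shared state would need a mutex or channel.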
Types ¶
type BucketRateLimiter ¶ added in v1.3.0
BucketRateLimiter adapts a standard token bucket to the workqueue rate limiter API.
func (*BucketRateLimiter) Forget ¶ added in v1.3.0
func (r *BucketRateLimiter) Forget(item interface{})
func (*BucketRateLimiter) NumRequeues ¶ added in v1.3.0
func (r *BucketRateLimiter) NumRequeues(item interface{}) int
func (*BucketRateLimiter) When ¶ added in v1.3.0
func (r *BucketRateLimiter) When(item interface{}) time.Duration
type DelayingInterface ¶ added in v1.3.0
type DelayingInterface interface {
	Interface
	// AddAfter adds an item to the workqueue after the indicated duration has passed
	AddAfter(item interface{}, duration time.Duration)
}
DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to requeue items after failures without ending up in a hot-loop.
func NewDelayingQueue ¶ added in v1.3.0
func NewDelayingQueue() DelayingInterface
NewDelayingQueue constructs a new workqueue with delayed queuing ability.
func NewNamedDelayingQueue ¶
func NewNamedDelayingQueue(name string) DelayingInterface
type DoWorkPieceFunc ¶ added in v1.3.0
type DoWorkPieceFunc func(piece int)
type ItemExponentialFailureRateLimiter ¶ added in v1.3.0
type ItemExponentialFailureRateLimiter struct {
// contains filtered or unexported fields
}
ItemExponentialFailureRateLimiter does a simple baseDelay*10^<num-failures> limit. Dealing with max failures and expiration is up to the caller.
func (*ItemExponentialFailureRateLimiter) Forget ¶ added in v1.3.0
func (r *ItemExponentialFailureRateLimiter) Forget(item interface{})
func (*ItemExponentialFailureRateLimiter) NumRequeues ¶ added in v1.3.0
func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int
func (*ItemExponentialFailureRateLimiter) When ¶ added in v1.3.0
func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration
type ItemFastSlowRateLimiter ¶ added in v1.3.0
type ItemFastSlowRateLimiter struct {
// contains filtered or unexported fields
}
ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that.
func (*ItemFastSlowRateLimiter) Forget ¶ added in v1.3.0
func (r *ItemFastSlowRateLimiter) Forget(item interface{})
func (*ItemFastSlowRateLimiter) NumRequeues ¶ added in v1.3.0
func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int
func (*ItemFastSlowRateLimiter) When ¶ added in v1.3.0
func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration
type MaxOfRateLimiter ¶ added in v1.3.0
type MaxOfRateLimiter struct {
// contains filtered or unexported fields
}
MaxOfRateLimiter calls every RateLimiter and returns the worst-case response. When used with a token bucket limiter, the burst may appear to be exceeded in cases where particular items were separately delayed for a longer time.
func (*MaxOfRateLimiter) Forget ¶ added in v1.3.0
func (r *MaxOfRateLimiter) Forget(item interface{})
func (*MaxOfRateLimiter) NumRequeues ¶ added in v1.3.0
func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int
func (*MaxOfRateLimiter) When ¶ added in v1.3.0
func (r *MaxOfRateLimiter) When(item interface{}) time.Duration
type RateLimiter ¶ added in v1.3.0
type RateLimiter interface {
	// When gets an item and gets to decide how long that item should wait
	When(item interface{}) time.Duration
	// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
	// or for success, we'll stop tracking it
	Forget(item interface{})
	// NumRequeues returns back how many failures the item has had
	NumRequeues(item interface{}) int
}
func DefaultControllerRateLimiter ¶ added in v1.3.0
func DefaultControllerRateLimiter() RateLimiter
DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has both overall and per-item rate limiting: the overall limiter is a token bucket and the per-item limiter is exponential.
func DefaultItemBasedRateLimiter ¶ added in v1.3.0
func DefaultItemBasedRateLimiter() RateLimiter
func NewItemExponentialFailureRateLimiter ¶ added in v1.3.0
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter
func NewItemFastSlowRateLimiter ¶ added in v1.3.0
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter
func NewMaxOfRateLimiter ¶ added in v1.3.0
func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter
type RateLimitingInterface ¶ added in v1.3.0
type RateLimitingInterface interface {
	DelayingInterface
	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
	AddRateLimited(item interface{})
	// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
	// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
	// still have to call `Done` on the queue.
	Forget(item interface{})
	// NumRequeues returns back how many times the item was requeued
	NumRequeues(item interface{}) int
}
RateLimitingInterface is a DelayingInterface that rate limits items being added to the queue. AddRateLimited adds an item only after the rate limiter allows it, which makes it easier to requeue items after failures without ending up in a hot-loop.
func NewNamedRateLimitingQueue ¶
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface
func NewRateLimitingQueue ¶ added in v1.3.0
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface
NewRateLimitingQueue constructs a new workqueue with rate-limited queuing ability. Remember to call Forget! If you don't, you may end up tracking failures forever.
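The reminder about Forget is easiest to see in a worker loop. The sketch below uses a hypothetical in-memory `fakeQueue` in place of a real rate-limiting queue: it applies no delays, and its Get reports shutdown when empty instead of blocking. Only the call pattern (Get, then Done, then either AddRateLimited or Forget) is the point.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeQueue is a hypothetical stand-in for a rate-limiting queue. It applies no
// real delays, and Get reports shutdown when empty instead of blocking.
type fakeQueue struct {
	items    []interface{}
	failures map[interface{}]int
}

func (q *fakeQueue) Get() (interface{}, bool) {
	if len(q.items) == 0 {
		return nil, true
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, false
}
func (q *fakeQueue) Done(item interface{}) {}
func (q *fakeQueue) AddRateLimited(item interface{}) {
	q.failures[item]++
	q.items = append(q.items, item)
}
func (q *fakeQueue) Forget(item interface{})          { delete(q.failures, item) }
func (q *fakeQueue) NumRequeues(item interface{}) int { return q.failures[item] }

// processNext handles one item and reports whether the worker should keep going.
func processNext(q *fakeQueue, handle func(item interface{}) error) bool {
	item, shutdown := q.Get()
	if shutdown {
		return false
	}
	defer q.Done(item) // always mark the item as done processing
	if err := handle(item); err != nil {
		q.AddRateLimited(item) // retry later; the failure count grows
		return true
	}
	q.Forget(item) // success: stop tracking failures for this item
	return true
}

// run processes one item that fails failTimes times before succeeding.
func run(failTimes int) (attempts, tracked int) {
	q := &fakeQueue{items: []interface{}{"a"}, failures: map[interface{}]int{}}
	for processNext(q, func(interface{}) error {
		attempts++
		if attempts <= failTimes {
			return errors.New("transient failure")
		}
		return nil
	}) {
	}
	return attempts, q.NumRequeues("a")
}

func main() {
	attempts, tracked := run(2)
	fmt.Println(attempts, tracked) // prints 3 0
}
```

If the `q.Forget(item)` call were omitted, the failure count for "a" would stay at 2 after the successful attempt, and any later failure of the same item would start from an already inflated delay.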
type TimedWorkQueue ¶
type TimedWorkQueue struct {
*Type
}
func NewTimedWorkQueue ¶
func NewTimedWorkQueue() *TimedWorkQueue
func (TimedWorkQueue) Add ¶
func (q TimedWorkQueue) Add(timedItem *TimedWorkQueueItem)
Add adds the obj along with the current timestamp to the queue.
func (TimedWorkQueue) Done ¶
func (q TimedWorkQueue) Done(timedItem *TimedWorkQueueItem) error
func (TimedWorkQueue) Get ¶
func (q TimedWorkQueue) Get() (timedItem *TimedWorkQueueItem, shutdown bool)
Get gets the obj along with its timestamp from the queue.
type TimedWorkQueueItem ¶
type Type ¶
type Type struct {
// contains filtered or unexported fields
}
Type is a work queue (see the package comment).
func (*Type) Done ¶
func (q *Type) Done(item interface{})
Done marks item as done processing, and if it has been marked as dirty again while it was being processed, it will be re-added to the queue for re-processing.
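One way to get this behavior is to track two sets alongside the ordered queue: items that still need processing ("dirty") and items currently being processed. The sketch below is a single-goroutine illustration under that assumption; `miniQueue` is a hypothetical type, it has no locking, and its Get returns `ok == false` when empty where the documented Get would block.

```go
package main

import "fmt"

type set map[interface{}]struct{}

// miniQueue is a hypothetical, non-thread-safe sketch of the dirty/processing bookkeeping.
type miniQueue struct {
	queue      []interface{} // processing order
	dirty      set           // items that need processing
	processing set           // items currently being processed
}

func newMiniQueue() *miniQueue {
	return &miniQueue{dirty: set{}, processing: set{}}
}

// Add marks item as needing processing and queues it unless it is already
// pending or is being processed right now.
func (q *miniQueue) Add(item interface{}) {
	if _, pending := q.dirty[item]; pending {
		return // added again before being processed: collapse into one
	}
	q.dirty[item] = struct{}{}
	if _, busy := q.processing[item]; busy {
		return // Done will re-queue it once the current run finishes
	}
	q.queue = append(q.queue, item)
}

// Get pops the next item; ok is false when nothing is queued.
func (q *miniQueue) Get() (item interface{}, ok bool) {
	if len(q.queue) == 0 {
		return nil, false
	}
	item, q.queue = q.queue[0], q.queue[1:]
	q.processing[item] = struct{}{}
	delete(q.dirty, item)
	return item, true
}

// Done marks item as finished and re-queues it if it was re-added meanwhile.
func (q *miniQueue) Done(item interface{}) {
	delete(q.processing, item)
	if _, again := q.dirty[item]; again {
		q.queue = append(q.queue, item)
	}
}

// Len reports how many items are waiting to be handed out.
func (q *miniQueue) Len() int { return len(q.queue) }

func main() {
	q := newMiniQueue()
	q.Add("x")
	q.Add("x")           // duplicate before processing
	fmt.Println(q.Len()) // prints 1
	item, _ := q.Get()
	q.Add("x")           // re-added while being processed
	fmt.Println(q.Len()) // prints 0
	q.Done(item)
	fmt.Println(q.Len()) // prints 1
}
```

Deferring the re-add until Done is what guarantees that the same item is never handed to two workers at once.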
func (*Type) Get ¶
Get blocks until it can return an item to be processed. If shutdown = true, the caller should end their goroutine. You must call Done with item when you have finished processing it.
func (*Type) Len ¶ added in v0.17.0
Len returns the current queue length, for informational purposes only. Do not, for example, gate a call to Add() or Get() on Len() being a particular value; that cannot be synchronized properly.
func (*Type) ShutDown ¶
func (q *Type) ShutDown()
ShutDown will cause q to ignore all new items added to it. As soon as the worker goroutines have drained the existing items in the queue, they will be instructed to exit.