workqueue

package
v0.19.6
Warning

This package is not in the latest version of its module.

Published: Dec 5, 2024 License: GPL-3.0 Imports: 7 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BucketRateLimiter

type BucketRateLimiter struct {
	*rate.Limiter
}

BucketRateLimiter adapts a standard token bucket (the embedded rate.Limiter) to the workqueue RateLimiter API.

func (*BucketRateLimiter) Forget

func (r *BucketRateLimiter) Forget(_ interface{})

func (*BucketRateLimiter) NumRequeues

func (r *BucketRateLimiter) NumRequeues(_ interface{}) int

func (*BucketRateLimiter) When

func (r *BucketRateLimiter) When(_ interface{}) time.Duration

type DelayingInterface

type DelayingInterface interface {
	Interface
	// AddAfter adds an item to the workqueue after the indicated duration has passed
	AddAfter(item interface{}, duration time.Duration)
}

DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to requeue items after failures without ending up in a hot-loop.

func NewDelayingQueue

func NewDelayingQueue() DelayingInterface

NewDelayingQueue constructs a new workqueue with delayed queuing ability. NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use NewNamedDelayingQueue instead.

func NewDelayingQueueWithCustomClock

func NewDelayingQueueWithCustomClock(clockWithTicker clock.WithTicker, name string) DelayingInterface

NewDelayingQueueWithCustomClock constructs a new named workqueue with the ability to inject a real or fake clock for testing purposes.

func NewDelayingQueueWithCustomQueue

func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface

NewDelayingQueueWithCustomQueue constructs a new workqueue with the ability to inject a custom queue Interface instead of the default one.

func NewNamedDelayingQueue

func NewNamedDelayingQueue(name string) DelayingInterface

NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability.

type Interface

type Interface interface {
	Add(item interface{})
	Len() int
	Get() (item interface{}, shutdown bool)
	Done(item interface{})
	ShutDown()
	ShutDownWithDrain()
	ShuttingDown() bool
}

type ItemExponentialFailureRateLimiter

type ItemExponentialFailureRateLimiter struct {
	// contains filtered or unexported fields
}

ItemExponentialFailureRateLimiter applies a simple baseDelay*2^<num-failures> limit. Dealing with max failures and expiration is up to the caller.
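The formula above can be sketched in a few lines of standard-library Go. This is a standalone illustration, not this package's source: the function name backoff is hypothetical, and capping the result at maxDelay is an assumption based on the maxDelay parameter of NewItemExponentialFailureRateLimiter.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoff returns baseDelay*2^failures, capped at maxDelay.
// Hypothetical helper for illustration; the cap is an assumption.
func backoff(baseDelay, maxDelay time.Duration, failures int) time.Duration {
	// Compute in float64 so that large exponents saturate instead of overflowing.
	d := float64(baseDelay.Nanoseconds()) * math.Pow(2, float64(failures))
	if d >= float64(maxDelay.Nanoseconds()) {
		return maxDelay
	}
	return time.Duration(d)
}

func main() {
	// Print the delay for the first few consecutive failures of one item.
	for n := 0; n < 6; n++ {
		fmt.Println(n, backoff(5*time.Millisecond, 100*time.Millisecond, n))
	}
}
```

Because the limiter only computes a delay, the caller still decides when to give up on an item and when to call Forget.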

func (*ItemExponentialFailureRateLimiter) Forget

func (r *ItemExponentialFailureRateLimiter) Forget(item interface{})

func (*ItemExponentialFailureRateLimiter) NumRequeues

func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int

func (*ItemExponentialFailureRateLimiter) When

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration

type ItemFastSlowRateLimiter

type ItemFastSlowRateLimiter struct {
	// contains filtered or unexported fields
}

ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that.
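A minimal sketch of the fast-then-slow policy, using only the standard library. The type fastSlow and its methods are hypothetical names for illustration and are not this package's implementation; the sketch is also not safe for concurrent use.

```go
package main

import (
	"fmt"
	"time"
)

// fastSlow counts attempts per item and picks a delay from that count.
// Hypothetical type for illustration only.
type fastSlow struct {
	attempts        map[string]int
	fastDelay       time.Duration
	slowDelay       time.Duration
	maxFastAttempts int
}

func newFastSlow(fastDelay, slowDelay time.Duration, maxFastAttempts int) *fastSlow {
	return &fastSlow{
		attempts:        map[string]int{},
		fastDelay:       fastDelay,
		slowDelay:       slowDelay,
		maxFastAttempts: maxFastAttempts,
	}
}

// when records one more attempt and returns the delay for it.
func (f *fastSlow) when(item string) time.Duration {
	f.attempts[item]++
	if f.attempts[item] <= f.maxFastAttempts {
		return f.fastDelay
	}
	return f.slowDelay
}

// forget resets the attempt count so the item starts with fast retries again.
func (f *fastSlow) forget(item string) { delete(f.attempts, item) }

func main() {
	f := newFastSlow(5*time.Millisecond, time.Second, 2)
	for i := 0; i < 4; i++ {
		fmt.Println(f.when("pod-a"))
	}
}
```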

func (*ItemFastSlowRateLimiter) Forget

func (r *ItemFastSlowRateLimiter) Forget(item interface{})

func (*ItemFastSlowRateLimiter) NumRequeues

func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int

func (*ItemFastSlowRateLimiter) When

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration

type MaxOfRateLimiter

type MaxOfRateLimiter struct {
	// contains filtered or unexported fields
}

MaxOfRateLimiter calls every RateLimiter and returns the worst-case response. When used with a token bucket limiter, the burst could apparently be exceeded in cases where particular items were separately delayed for a longer time.
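The "worst case" here is the longest delay any of the combined limiters asks for. A minimal sketch, assuming limiters are modeled as plain functions; the names limiter, fixed, and maxOf are hypothetical and not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// limiter is a simplified stand-in for a rate limiter's When method.
type limiter func(item string) time.Duration

// fixed returns a limiter that always asks for the same delay.
func fixed(d time.Duration) limiter {
	return func(string) time.Duration { return d }
}

// maxOf asks every limiter and returns the longest delay.
func maxOf(item string, limiters ...limiter) time.Duration {
	var worst time.Duration
	for _, l := range limiters {
		if d := l(item); d > worst {
			worst = d
		}
	}
	return worst
}

func main() {
	perItem := fixed(40 * time.Millisecond)  // e.g. a per-item backoff
	overall := fixed(100 * time.Millisecond) // e.g. a token bucket wait
	fmt.Println(maxOf("pod-a", perItem, overall))
}
```

Note that every limiter is consulted on each call, so each one records the attempt even when its answer is not the one returned.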

func (*MaxOfRateLimiter) Forget

func (r *MaxOfRateLimiter) Forget(item interface{})

func (*MaxOfRateLimiter) NumRequeues

func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int

func (*MaxOfRateLimiter) When

func (r *MaxOfRateLimiter) When(item interface{}) time.Duration

type RateLimiter

type RateLimiter interface {
	// When gets an item and gets to decide how long that item should wait
	When(item interface{}) time.Duration
	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for failing
	// or for success, we'll stop tracking it
	Forget(item interface{})
	// NumRequeues returns how many failures the item has had
	NumRequeues(item interface{}) int
}

func DefaultControllerRateLimiter

func DefaultControllerRateLimiter() RateLimiter

DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has both overall and per-item rate limiting. The overall limiter is a token bucket and the per-item limiter is exponential.

func DefaultItemBasedRateLimiter

func DefaultItemBasedRateLimiter() RateLimiter

func NewItemExponentialFailureRateLimiter

func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter

func NewItemFastSlowRateLimiter

func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter

func NewMaxOfRateLimiter

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter

func NewWithMaxWaitRateLimiter

func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter

type RateLimitingInterface

type RateLimitingInterface interface {
	DelayingInterface

	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
	AddRateLimited(item interface{})

	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
	// or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
	// still have to call `Done` on the queue.
	Forget(item interface{})

	// NumRequeues returns how many times the item was requeued
	NumRequeues(item interface{}) int
}

RateLimitingInterface is an interface that rate limits items being added to the queue.

func NewNamedRateLimitingQueue

func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface

func NewRateLimitingQueue

func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface

NewRateLimitingQueue constructs a new workqueue with rate-limited queuing ability. Remember to call Forget! If you don't, you may end up tracking failures forever. NewRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use NewNamedRateLimitingQueue instead.
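A common way to honor the Forget and Done requirements is a worker step like the sketch below. To stay self-contained it does not import this package: the queue interface copies four method signatures from RateLimitingInterface, while processNext, failOn, and fakeQueue are hypothetical helpers. Unlike a real queue, whose Get blocks, the fake reports shutdown when empty so the demo terminates.

```go
package main

import (
	"errors"
	"fmt"
)

// queue is the subset of RateLimitingInterface used by the worker.
type queue interface {
	Get() (item interface{}, shutdown bool)
	Done(item interface{})
	AddRateLimited(item interface{})
	Forget(item interface{})
}

// processNext handles one item and reports whether the worker should continue.
func processNext(q queue, handle func(item interface{}) error) bool {
	item, shutdown := q.Get()
	if shutdown {
		return false
	}
	defer q.Done(item) // always tell the queue that processing has finished
	if err := handle(item); err != nil {
		q.AddRateLimited(item) // retry later, after the limiter's delay
		return true
	}
	q.Forget(item) // success: stop tracking failures for this item
	return true
}

// failOn returns a handler that fails only for the given item.
func failOn(bad interface{}) func(interface{}) error {
	return func(item interface{}) error {
		if item == bad {
			return errors.New("simulated failure")
		}
		return nil
	}
}

// fakeQueue records calls; it is a test double, not a real work queue.
type fakeQueue struct {
	items []interface{}
	calls []string
}

func (f *fakeQueue) Get() (interface{}, bool) {
	if len(f.items) == 0 {
		return nil, true
	}
	item := f.items[0]
	f.items = f.items[1:]
	return item, false
}
func (f *fakeQueue) Done(i interface{})           { f.calls = append(f.calls, fmt.Sprintf("Done(%v)", i)) }
func (f *fakeQueue) AddRateLimited(i interface{}) { f.calls = append(f.calls, fmt.Sprintf("AddRateLimited(%v)", i)) }
func (f *fakeQueue) Forget(i interface{})         { f.calls = append(f.calls, fmt.Sprintf("Forget(%v)", i)) }

func main() {
	q := &fakeQueue{items: []interface{}{"a", "b"}}
	for processNext(q, failOn("b")) {
	}
	fmt.Println(q.calls)
}
```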

type Type

type Type struct {
	// contains filtered or unexported fields
}

Type is a work queue (see the package comment).

func New

func New() *Type

New constructs a new work queue (see the package comment).

func NewNamed

func NewNamed(_ string) *Type

func (*Type) Add

func (q *Type) Add(item interface{})

Add marks item as needing processing.

func (*Type) Done

func (q *Type) Done(item interface{})

Done marks item as done processing. If the item was marked as dirty again while it was being processed, it is re-added to the queue for re-processing.
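One way to get this behavior is to keep a "dirty" set and a "processing" set next to the ordered queue. The sketch below is a single-goroutine, non-blocking illustration with hypothetical names (dedupQueue, add, get, done); it is not taken from this package's source and omits locking and shutdown.

```go
package main

import "fmt"

// dedupQueue illustrates dirty/processing bookkeeping for a work queue.
type dedupQueue struct {
	queue      []string        // items waiting to be handed out, in order
	dirty      map[string]bool // items that need processing
	processing map[string]bool // items currently held by a worker
}

func newDedupQueue() *dedupQueue {
	return &dedupQueue{dirty: map[string]bool{}, processing: map[string]bool{}}
}

// add marks item as needing processing, without queuing duplicates.
func (q *dedupQueue) add(item string) {
	if q.dirty[item] {
		return // already marked
	}
	q.dirty[item] = true
	if q.processing[item] {
		return // done will re-queue it once the current run finishes
	}
	q.queue = append(q.queue, item)
}

// get hands out the next item, or false if nothing is queued.
func (q *dedupQueue) get() (string, bool) {
	if len(q.queue) == 0 {
		return "", false
	}
	item := q.queue[0]
	q.queue = q.queue[1:]
	q.processing[item] = true
	delete(q.dirty, item)
	return item, true
}

// done finishes processing and re-queues the item if it became dirty meanwhile.
func (q *dedupQueue) done(item string) {
	delete(q.processing, item)
	if q.dirty[item] {
		q.queue = append(q.queue, item)
	}
}

func main() {
	q := newDedupQueue()
	q.add("a")
	item, _ := q.get()
	q.add("a") // arrives while "a" is still being processed
	fmt.Println("queued during processing:", len(q.queue))
	q.done(item)
	fmt.Println("queued after done:", len(q.queue))
}
```

With this bookkeeping an item is never handed to two workers at once, yet a change that arrives mid-processing is not lost.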

func (*Type) Get

func (q *Type) Get() (item interface{}, shutdown bool)

Get blocks until it can return an item to be processed. If shutdown = true, the caller should end their goroutine. You must call Done with item when you have finished processing it.

func (*Type) Len

func (q *Type) Len() int

Len returns the current queue length, for informational purposes only. Do not, for example, gate a call to Add() or Get() on Len() being a particular value; that can't be synchronized properly.

func (*Type) ShutDown

func (q *Type) ShutDown()

ShutDown will cause q to ignore all new items added to it and immediately instruct the worker goroutines to exit.

func (*Type) ShutDownWithDrain

func (q *Type) ShutDownWithDrain()

ShutDownWithDrain will cause q to ignore all new items added to it. As soon as the worker goroutines have "drained", i.e. finished processing and called Done on all existing items in the queue, they will be instructed to exit and ShutDownWithDrain will return. Hence a strict requirement for using this: your workers must ensure that Done is called on all items in the queue once the shutdown has been initiated. If that is not the case, this will block indefinitely. It is, however, safe to call ShutDown after having called ShutDownWithDrain, to force the queue shutdown to terminate immediately without waiting for the drain.

func (*Type) ShuttingDown

func (q *Type) ShuttingDown() bool

type WithMaxWaitRateLimiter

type WithMaxWaitRateLimiter struct {
	// contains filtered or unexported fields
}

WithMaxWaitRateLimiter has a maxDelay, which avoids waiting too long.
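A plausible reading of this description is that the wrapper clamps the inner limiter's delay to maxDelay. The sketch below shows that idea under that assumption; limiter, fixed, and withMaxWait are hypothetical names, and the package's actual behavior may differ.

```go
package main

import (
	"fmt"
	"time"
)

// limiter is a simplified stand-in for a rate limiter's When method.
type limiter func(item string) time.Duration

// fixed returns a limiter that always asks for the same delay.
func fixed(d time.Duration) limiter {
	return func(string) time.Duration { return d }
}

// withMaxWait wraps inner so that no returned delay exceeds maxDelay.
// Assumption: clamping is what "avoids waiting too long" means.
func withMaxWait(inner limiter, maxDelay time.Duration) limiter {
	return func(item string) time.Duration {
		if d := inner(item); d < maxDelay {
			return d
		}
		return maxDelay
	}
}

func main() {
	slow := fixed(10 * time.Minute)
	capped := withMaxWait(slow, 30*time.Second)
	fmt.Println(capped("pod-a"))
}
```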

func (WithMaxWaitRateLimiter) Forget

func (w WithMaxWaitRateLimiter) Forget(item interface{})

func (WithMaxWaitRateLimiter) NumRequeues

func (w WithMaxWaitRateLimiter) NumRequeues(item interface{}) int

func (WithMaxWaitRateLimiter) When

func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration
