Documentation ¶
Overview ¶
Package workqueue provides a simple queue that supports the following features:
- Fair: items are processed in the order in which they are added.
- Stingy: a single item will not be processed multiple times concurrently, and if an item is added multiple times before it can be processed, it will only be processed once.
- Multiple consumers and producers. In particular, it is allowed for an item to be reenqueued while it is being processed.
- Shutdown notifications.
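The "fair" and "stingy" properties above can be modeled with an ordered slice plus two sets. The following is a minimal single-goroutine sketch, not the package's implementation: the `stingyQueue` type and its fields are hypothetical names, and locking, blocking and shutdown handling are left out.

```go
package main

import "fmt"

// stingyQueue is a simplified model of the semantics listed above.
// Assumption: single goroutine only (no mutex, no blocking Get, no shutdown).
type stingyQueue[T comparable] struct {
	queue      []T            // pending items in arrival order (fairness)
	dirty      map[T]struct{} // items that still need processing
	processing map[T]struct{} // items currently handed out to a worker
}

func newStingyQueue[T comparable]() *stingyQueue[T] {
	return &stingyQueue[T]{dirty: map[T]struct{}{}, processing: map[T]struct{}{}}
}

// Add marks item as needing processing; repeated adds collapse into one entry.
func (q *stingyQueue[T]) Add(item T) {
	if _, ok := q.dirty[item]; ok {
		return // already pending
	}
	q.dirty[item] = struct{}{}
	if _, ok := q.processing[item]; ok {
		return // in flight: it is re-queued by Done instead
	}
	q.queue = append(q.queue, item)
}

// Get hands out the oldest pending item, or ok=false if nothing is waiting.
func (q *stingyQueue[T]) Get() (item T, ok bool) {
	if len(q.queue) == 0 {
		return item, false
	}
	item, q.queue = q.queue[0], q.queue[1:]
	q.processing[item] = struct{}{}
	delete(q.dirty, item)
	return item, true
}

// Done finishes item and re-queues it if it was added again in the meantime.
func (q *stingyQueue[T]) Done(item T) {
	delete(q.processing, item)
	if _, ok := q.dirty[item]; ok {
		q.queue = append(q.queue, item)
	}
}

// Len reports how many items are waiting to be handed out.
func (q *stingyQueue[T]) Len() int { return len(q.queue) }

func main() {
	q := newStingyQueue[string]()
	q.Add("a")
	q.Add("b")
	q.Add("a")           // duplicate before processing: collapsed
	fmt.Println(q.Len()) // 2

	item, _ := q.Get()         // "a"
	q.Add("a")                 // re-added while in flight: not queued yet
	fmt.Println(item, q.Len()) // a 1
	q.Done(item)               // "a" now goes to the back of the queue
	fmt.Println(q.Len())       // 2
}
```

Keeping the in-flight set separate from the pending set is what prevents the same item from being processed by two workers at once.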
Index ¶
- func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, ...)
- func SetProvider(metricsProvider MetricsProvider)
- func WithChunkSize(c int) func(*options)
- type BucketRateLimiter (deprecated)
- type CounterMetric
- type DelayingInterface (deprecated)
- func NewDelayingQueue() DelayingInterface (deprecated)
- func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface (deprecated)
- func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface
- func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface
- func NewNamedDelayingQueue(name string) DelayingInterface
- type DelayingQueueConfig (deprecated)
- type DoWorkPieceFunc
- type GaugeMetric
- type HistogramMetric
- type Interface (deprecated)
- type ItemExponentialFailureRateLimiter (deprecated)
- type ItemFastSlowRateLimiter
- type MaxOfRateLimiter (deprecated)
- type MetricsProvider
- type Options
- type Queue
- type QueueConfig
- type RateLimiter (deprecated)
- func DefaultControllerRateLimiter() RateLimiter (deprecated)
- func DefaultItemBasedRateLimiter() RateLimiter (deprecated)
- func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter (deprecated)
- func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter (deprecated)
- func NewMaxOfRateLimiter(limiters ...TypedRateLimiter[any]) RateLimiter (deprecated)
- func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter (deprecated)
- type RateLimitingInterface (deprecated)
- func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface
- func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface (deprecated)
- func NewRateLimitingQueueWithConfig(rateLimiter RateLimiter, config RateLimitingQueueConfig) RateLimitingInterface (deprecated)
- func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter RateLimiter) RateLimitingInterface
- type RateLimitingQueueConfig (deprecated)
- type SettableGaugeMetric
- type SummaryMetric
- type Type
- type Typed
- type TypedBucketRateLimiter
- type TypedDelayingInterface
- type TypedDelayingQueueConfig
- type TypedInterface
- type TypedItemExponentialFailureRateLimiter
- type TypedItemFastSlowRateLimiter
- type TypedMaxOfRateLimiter
- type TypedQueueConfig
- type TypedRateLimiter
- func DefaultTypedControllerRateLimiter[T comparable]() TypedRateLimiter[T]
- func DefaultTypedItemBasedRateLimiter[T comparable]() TypedRateLimiter[T]
- func NewTypedItemExponentialFailureRateLimiter[T comparable](baseDelay time.Duration, maxDelay time.Duration) TypedRateLimiter[T]
- func NewTypedItemFastSlowRateLimiter[T comparable](fastDelay, slowDelay time.Duration, maxFastAttempts int) TypedRateLimiter[T]
- func NewTypedMaxOfRateLimiter[T comparable](limiters ...TypedRateLimiter[T]) TypedRateLimiter[T]
- func NewTypedWithMaxWaitRateLimiter[T comparable](limiter TypedRateLimiter[T], maxDelay time.Duration) TypedRateLimiter[T]
- type TypedRateLimitingInterface
- type TypedRateLimitingQueueConfig
- type TypedWithMaxWaitRateLimiter
- type WithMaxWaitRateLimiter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ParallelizeUntil ¶
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, opts ...Options)
ParallelizeUntil is a framework that allows for parallelizing N independent pieces of work until done or the context is canceled.
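One way to picture this pattern is a fixed pool of goroutines pulling piece indices from a pre-filled channel and checking the context before each piece. The sketch below is an illustration under that assumption, not the package's code; `parallelize` and `sumSquares` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// parallelize runs doWorkPiece for every index in [0, pieces) on up to
// `workers` goroutines and stops picking up new pieces once ctx is canceled.
// Hypothetical stand-in for the pattern described above.
func parallelize(ctx context.Context, workers, pieces int, doWorkPiece func(piece int)) {
	if pieces <= 0 || workers <= 0 {
		return
	}
	if workers > pieces {
		workers = pieces
	}
	// Pre-fill a buffered channel so workers never block on the producer.
	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		toProcess <- i
	}
	close(toProcess)

	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for piece := range toProcess {
				select {
				case <-ctx.Done():
					return // canceled: leave the remaining pieces untouched
				default:
					doWorkPiece(piece)
				}
			}
		}()
	}
	wg.Wait()
}

// sumSquares is example work: each piece contributes piece*piece to a total.
func sumSquares(n int) int64 {
	var total atomic.Int64
	parallelize(context.Background(), 4, n, func(piece int) {
		total.Add(int64(piece * piece))
	})
	return total.Load()
}

func main() {
	fmt.Println(sumSquares(10)) // 285
}
```

Because the pieces run concurrently, the callback has to synchronize any shared state itself; the example uses an atomic counter for that.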
func SetProvider ¶
func SetProvider(metricsProvider MetricsProvider)
SetProvider sets the metrics provider for all subsequently created work queues. Only the first call has an effect.
func WithChunkSize ¶ added in v0.19.0
func WithChunkSize(c int) func(*options)
WithChunkSize allows handing work items to the workers in chunks, rather than one by one. It is recommended to use this option if the number of pieces is significantly higher than the number of workers and the work done for each item is small.
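To make the effect of a chunk size concrete, the sketch below computes the half-open index ranges that workers would receive for a given number of pieces. The `chunkBounds` helper is hypothetical and only illustrates the arithmetic; how the package distributes chunks internally is not shown here.

```go
package main

import "fmt"

// chunkBounds splits the indices [0, pieces) into consecutive half-open
// ranges [start, end) of at most chunkSize items each.
// Assumption: a chunk size below 1 is treated as 1.
func chunkBounds(pieces, chunkSize int) [][2]int {
	if chunkSize < 1 {
		chunkSize = 1
	}
	var out [][2]int
	for start := 0; start < pieces; start += chunkSize {
		end := start + chunkSize
		if end > pieces {
			end = pieces // the last chunk may be shorter
		}
		out = append(out, [2]int{start, end})
	}
	return out
}

func main() {
	fmt.Println(chunkBounds(10, 4)) // [[0 4] [4 8] [8 10]]
}
```

With 10 pieces and a chunk size of 4, workers synchronize three times instead of ten, which is where the saving for small work items comes from.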
Types ¶
type BucketRateLimiter ¶ deprecated
type BucketRateLimiter = TypedBucketRateLimiter[any]
Deprecated: BucketRateLimiter is deprecated, use TypedBucketRateLimiter instead.
type CounterMetric ¶
type CounterMetric interface {
Inc()
}
CounterMetric represents a single numerical value that only ever goes up.
type DelayingInterface ¶ deprecated
type DelayingInterface TypedDelayingInterface[any]
DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to requeue items after failures without ending up in a hot-loop.
Deprecated: use TypedDelayingInterface instead.
func NewDelayingQueue ¶ deprecated
func NewDelayingQueue() DelayingInterface
NewDelayingQueue constructs a new workqueue with delayed queuing ability. NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use NewDelayingQueueWithConfig instead and specify a name.
Deprecated: use TypedNewDelayingQueue instead.
func NewDelayingQueueWithConfig ¶ deprecated, added in v0.27.0
func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface
NewDelayingQueueWithConfig constructs a new workqueue with options to customize different properties.
Deprecated: use NewTypedDelayingQueueWithConfig instead.
func NewDelayingQueueWithCustomClock ¶ added in v0.17.0
func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface
NewDelayingQueueWithCustomClock constructs a new named workqueue with ability to inject real or fake clock for testing purposes.
Deprecated: Use NewDelayingQueueWithConfig instead.
func NewDelayingQueueWithCustomQueue ¶ added in v0.19.0
func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface
NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to inject custom queue Interface instead of the default one.
Deprecated: Use NewDelayingQueueWithConfig instead.
func NewNamedDelayingQueue ¶
func NewNamedDelayingQueue(name string) DelayingInterface
NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability.
Deprecated: Use NewDelayingQueueWithConfig instead.
type DelayingQueueConfig ¶ deprecated, added in v0.27.0
type DelayingQueueConfig = TypedDelayingQueueConfig[any]
DelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
Deprecated: use TypedDelayingQueueConfig instead.
type DoWorkPieceFunc ¶
type DoWorkPieceFunc func(piece int)
type GaugeMetric ¶
type GaugeMetric interface {
	Inc()
	Dec()
}
GaugeMetric represents a single numerical value that can arbitrarily go up and down.
type HistogramMetric ¶
type HistogramMetric interface {
Observe(float64)
}
HistogramMetric counts individual observations.
type Interface ¶ deprecated
type Interface TypedInterface[any]
Deprecated: Interface is deprecated, use TypedInterface instead.
type ItemExponentialFailureRateLimiter ¶ deprecated
type ItemExponentialFailureRateLimiter = TypedItemExponentialFailureRateLimiter[any]
Deprecated: ItemExponentialFailureRateLimiter is deprecated, use TypedItemExponentialFailureRateLimiter instead.
type ItemFastSlowRateLimiter ¶
type ItemFastSlowRateLimiter = TypedItemFastSlowRateLimiter[any]
ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that.
Deprecated: Use TypedItemFastSlowRateLimiter instead.
type MaxOfRateLimiter ¶ deprecated
type MaxOfRateLimiter = TypedMaxOfRateLimiter[any]
MaxOfRateLimiter calls every RateLimiter and returns the worst case response. When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items were separately delayed a longer time.
Deprecated: Use TypedMaxOfRateLimiter instead.
type MetricsProvider ¶
type MetricsProvider interface {
	NewDepthMetric(name string) GaugeMetric
	NewAddsMetric(name string) CounterMetric
	NewLatencyMetric(name string) HistogramMetric
	NewWorkDurationMetric(name string) HistogramMetric
	NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
	NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
	NewRetriesMetric(name string) CounterMetric
}
MetricsProvider generates various metrics used by the queue.
type Queue ¶ added in v0.31.0
type Queue[T comparable] interface {
	// Touch can be hooked when an existing item is added again. This may be
	// useful if the implementation allows priority change for the given item.
	Touch(item T)
	// Push adds a new item.
	Push(item T)
	// Len tells the total number of items.
	Len() int
	// Pop retrieves an item.
	Pop() (item T)
}
Queue is the underlying storage for items. The functions below are always called from the same goroutine.
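As an illustration of what a custom storage could look like, the sketch below implements the four methods with stack (LIFO) ordering. The interface is re-declared locally so the example compiles on its own; `lifoQueue` and `drain` are hypothetical names, and the sketch assumes the caller only calls Pop when Len is greater than zero.

```go
package main

import "fmt"

// Queue mirrors the interface shown above so this sketch is self-contained.
type Queue[T comparable] interface {
	Touch(item T)
	Push(item T)
	Len() int
	Pop() (item T)
}

// lifoQueue is a hypothetical stack-ordered storage. It needs no locking
// because, per the documentation above, all methods are called from the
// same goroutine.
type lifoQueue[T comparable] struct{ items []T }

// Touch is a no-op: this storage has no priorities to adjust.
func (q *lifoQueue[T]) Touch(item T) {}

// Push appends the item to the top of the stack.
func (q *lifoQueue[T]) Push(item T) { q.items = append(q.items, item) }

// Len reports the number of stored items.
func (q *lifoQueue[T]) Len() int { return len(q.items) }

// Pop removes and returns the most recently pushed item.
// Assumption: the caller has checked Len() > 0.
func (q *lifoQueue[T]) Pop() (item T) {
	last := len(q.items) - 1
	item = q.items[last]
	q.items = q.items[:last]
	return item
}

// drain pops items until the queue is empty and returns them in pop order.
func drain[T comparable](q Queue[T]) []T {
	var out []T
	for q.Len() > 0 {
		out = append(out, q.Pop())
	}
	return out
}

func main() {
	var q Queue[string] = &lifoQueue[string]{}
	q.Push("a")
	q.Push("b")
	q.Push("c")
	fmt.Println(drain(q)) // [c b a]
}
```

Note that a LIFO storage gives up the "fair" ordering from the package overview; it is used here only because it is the shortest non-default example.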
func DefaultQueue ¶ added in v0.31.0
func DefaultQueue[T comparable]() Queue[T]
DefaultQueue is a slice based FIFO queue.
type QueueConfig ¶ added in v0.27.0
type QueueConfig = TypedQueueConfig[any]
QueueConfig specifies optional configurations to customize an Interface.
Deprecated: use TypedQueueConfig instead.
type RateLimiter ¶ deprecated
type RateLimiter TypedRateLimiter[any]
Deprecated: RateLimiter is deprecated, use TypedRateLimiter instead.
func DefaultControllerRateLimiter ¶ deprecated
func DefaultControllerRateLimiter() RateLimiter
DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential.
Deprecated: Use DefaultTypedControllerRateLimiter instead.
func DefaultItemBasedRateLimiter ¶ deprecated
func DefaultItemBasedRateLimiter() RateLimiter
Deprecated: DefaultItemBasedRateLimiter is deprecated, use DefaultTypedItemBasedRateLimiter instead.
func NewItemExponentialFailureRateLimiter ¶ deprecated
func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter
Deprecated: NewItemExponentialFailureRateLimiter is deprecated, use NewTypedItemExponentialFailureRateLimiter instead.
func NewItemFastSlowRateLimiter ¶ deprecated
func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter
Deprecated: NewItemFastSlowRateLimiter is deprecated, use NewTypedItemFastSlowRateLimiter instead.
func NewMaxOfRateLimiter ¶ deprecated
func NewMaxOfRateLimiter(limiters ...TypedRateLimiter[any]) RateLimiter
Deprecated: NewMaxOfRateLimiter is deprecated, use NewTypedMaxOfRateLimiter instead.
func NewWithMaxWaitRateLimiter ¶ deprecated, added in v0.23.0
func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter
Deprecated: NewWithMaxWaitRateLimiter is deprecated, use NewTypedWithMaxWaitRateLimiter instead.
type RateLimitingInterface ¶ deprecated
type RateLimitingInterface TypedRateLimitingInterface[any]
RateLimitingInterface is an interface that rate limits items being added to the queue.
Deprecated: Use TypedRateLimitingInterface instead.
func NewNamedRateLimitingQueue ¶
func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface
NewNamedRateLimitingQueue constructs a new named workqueue with rateLimited queuing ability.
Deprecated: Use NewRateLimitingQueueWithConfig instead.
func NewRateLimitingQueue ¶ deprecated
func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface
NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability. Remember to call Forget! If you don't, you may end up tracking failures forever. NewRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use NewRateLimitingQueueWithConfig instead and specify a name.
Deprecated: Use NewTypedRateLimitingQueue instead.
func NewRateLimitingQueueWithConfig ¶ deprecated, added in v0.27.0
func NewRateLimitingQueueWithConfig(rateLimiter RateLimiter, config RateLimitingQueueConfig) RateLimitingInterface
NewRateLimitingQueueWithConfig constructs a new workqueue with rateLimited queuing ability with options to customize different properties. Remember to call Forget! If you don't, you may end up tracking failures forever.
Deprecated: Use NewTypedRateLimitingQueueWithConfig instead.
func NewRateLimitingQueueWithDelayingInterface ¶ added in v0.25.0
func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter RateLimiter) RateLimitingInterface
NewRateLimitingQueueWithDelayingInterface constructs a new named workqueue with rateLimited queuing ability with the option to inject a custom delaying queue instead of the default one.
Deprecated: Use NewRateLimitingQueueWithConfig instead.
type RateLimitingQueueConfig ¶ deprecated, added in v0.27.0
type RateLimitingQueueConfig = TypedRateLimitingQueueConfig[any]
RateLimitingQueueConfig specifies optional configurations to customize a RateLimitingInterface.
Deprecated: Use TypedRateLimitingQueueConfig instead.
type SettableGaugeMetric ¶
type SettableGaugeMetric interface {
Set(float64)
}
SettableGaugeMetric represents a single numerical value that can arbitrarily go up and down. (Separate from GaugeMetric to preserve backwards compatibility.)
type SummaryMetric ¶
type SummaryMetric interface {
Observe(float64)
}
SummaryMetric captures individual observations.
type Type ¶
Type is a work queue (see the package comment).
Deprecated: Use Typed instead.
func NewWithConfig ¶ deprecated, added in v0.27.0
func NewWithConfig(config QueueConfig) *Type
NewWithConfig constructs a new workqueue with ability to customize different properties.
Deprecated: use NewTypedWithConfig instead.
type Typed ¶ added in v0.31.0
type Typed[t comparable] struct {
	// contains filtered or unexported fields
}
func NewTyped ¶ added in v0.31.0
func NewTyped[T comparable]() *Typed[T]
NewTyped constructs a new work queue (see the package comment).
func NewTypedWithConfig ¶ added in v0.31.0
func NewTypedWithConfig[T comparable](config TypedQueueConfig[T]) *Typed[T]
NewTypedWithConfig constructs a new workqueue with ability to customize different properties.
func (*Typed[T]) Add ¶ added in v0.31.0
func (q *Typed[T]) Add(item T)
Add marks item as needing processing.
func (*Typed[T]) Done ¶ added in v0.31.0
func (q *Typed[T]) Done(item T)
Done marks item as done processing, and if it has been marked as dirty again while it was being processed, it will be re-added to the queue for re-processing.
func (*Typed[T]) Get ¶ added in v0.31.0
func (q *Typed[T]) Get() (item T, shutdown bool)
Get blocks until it can return an item to be processed. If shutdown = true, the caller should end their goroutine. You must call Done with item when you have finished processing it.
func (*Typed[T]) Len ¶ added in v0.31.0
func (q *Typed[T]) Len() int
Len returns the current queue length, for informational purposes only. You shouldn't, for example, gate a call to Add() or Get() on Len() being a particular value, because that can't be synchronized properly.
func (*Typed[T]) ShutDown ¶ added in v0.31.0
func (q *Typed[T]) ShutDown()
ShutDown will cause q to ignore all new items added to it and immediately instruct the worker goroutines to exit.
func (*Typed[T]) ShutDownWithDrain ¶ added in v0.31.0
func (q *Typed[T]) ShutDownWithDrain()
ShutDownWithDrain will cause q to ignore all new items added to it. As soon as the worker goroutines have "drained", i.e. finished processing and called Done on all existing items in the queue, they will be instructed to exit and ShutDownWithDrain will return. Hence a strict requirement for using this: your workers must ensure that Done is called on all items in the queue once the shutdown has been initiated. If that is not the case, this will block indefinitely. It is, however, safe to call ShutDown after having called ShutDownWithDrain, so as to force the queue shutdown to terminate immediately without waiting for the drainage.
func (*Typed[T]) ShuttingDown ¶ added in v0.31.0
func (q *Typed[T]) ShuttingDown() bool
type TypedBucketRateLimiter ¶ added in v0.31.0
type TypedBucketRateLimiter[T comparable] struct {
	*rate.Limiter
}
TypedBucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API.
func (*TypedBucketRateLimiter[T]) Forget ¶ added in v0.31.0
func (r *TypedBucketRateLimiter[T]) Forget(item T)
func (*TypedBucketRateLimiter[T]) NumRequeues ¶ added in v0.31.0
func (r *TypedBucketRateLimiter[T]) NumRequeues(item T) int
func (*TypedBucketRateLimiter[T]) When ¶ added in v0.31.0
func (r *TypedBucketRateLimiter[T]) When(item T) time.Duration
type TypedDelayingInterface ¶ added in v0.31.0
type TypedDelayingInterface[T comparable] interface {
	TypedInterface[T]
	// AddAfter adds an item to the workqueue after the indicated duration has passed
	AddAfter(item T, duration time.Duration)
}
TypedDelayingInterface is an Interface that can Add an item at a later time. This makes it easier to requeue items after failures without ending up in a hot-loop.
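Delayed adds can be thought of as a min-heap of (item, ready time) entries that is drained into the ready list as time advances. The sketch below models that idea with an explicit `now` argument so it stays deterministic; it uses no real clock or goroutines, and `delayer`, `entry` and `Advance` are hypothetical names rather than part of the package.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// entry is an item waiting for its ready time (an offset from an arbitrary start).
type entry struct {
	item    string
	readyAt time.Duration
}

// waitHeap orders entries by readyAt; it implements heap.Interface.
type waitHeap []entry

func (h waitHeap) Len() int           { return len(h) }
func (h waitHeap) Less(i, j int) bool { return h[i].readyAt < h[j].readyAt }
func (h waitHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *waitHeap) Push(x any)        { *h = append(*h, x.(entry)) }
func (h *waitHeap) Pop() any {
	old := *h
	n := len(old)
	e := old[n-1]
	*h = old[:n-1]
	return e
}

// delayer is a hypothetical, single-goroutine model of delayed adds.
type delayer struct {
	waiting waitHeap
	ready   []string // items that would be handed to the underlying queue
}

// AddAfter schedules item for now+delay; a non-positive delay is treated as
// "ready immediately" in this sketch.
func (d *delayer) AddAfter(item string, now, delay time.Duration) {
	if delay <= 0 {
		d.ready = append(d.ready, item)
		return
	}
	heap.Push(&d.waiting, entry{item: item, readyAt: now + delay})
}

// Advance moves every entry whose ready time has passed into the ready list.
func (d *delayer) Advance(now time.Duration) {
	for d.waiting.Len() > 0 && d.waiting[0].readyAt <= now {
		d.ready = append(d.ready, heap.Pop(&d.waiting).(entry).item)
	}
}

func main() {
	d := &delayer{}
	d.AddAfter("slow", 0, 3*time.Second)
	d.AddAfter("fast", 0, 1*time.Second)
	d.AddAfter("now", 0, 0)
	d.Advance(2 * time.Second)
	fmt.Println(d.ready) // [now fast]
	d.Advance(3 * time.Second)
	fmt.Println(d.ready) // [now fast slow]
}
```

A failing item re-added with a delay sits in the waiting heap instead of being handed straight back to a worker, which is how the hot-loop is avoided.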
func NewTypedDelayingQueueWithConfig ¶ added in v0.31.0
func NewTypedDelayingQueueWithConfig[T comparable](config TypedDelayingQueueConfig[T]) TypedDelayingInterface[T]
NewTypedDelayingQueueWithConfig constructs a new workqueue with options to customize different properties.
func TypedNewDelayingQueue ¶ added in v0.31.0
func TypedNewDelayingQueue[T comparable]() TypedDelayingInterface[T]
TypedNewDelayingQueue constructs a new workqueue with delayed queuing ability. TypedNewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use NewTypedDelayingQueueWithConfig instead and specify a name.
type TypedDelayingQueueConfig ¶ added in v0.31.0
type TypedDelayingQueueConfig[T comparable] struct {
	// Name for the queue. If unnamed, the metrics will not be registered.
	Name string
	// MetricsProvider optionally allows specifying a metrics provider to use for the queue
	// instead of the global provider.
	MetricsProvider MetricsProvider
	// Clock optionally allows injecting a real or fake clock for testing purposes.
	Clock clock.WithTicker
	// Queue optionally allows injecting custom queue Interface instead of the default one.
	Queue TypedInterface[T]
}
TypedDelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
type TypedInterface ¶ added in v0.31.0
type TypedInterface[T comparable] interface {
	Add(item T)
	Len() int
	Get() (item T, shutdown bool)
	Done(item T)
	ShutDown()
	ShutDownWithDrain()
	ShuttingDown() bool
}
type TypedItemExponentialFailureRateLimiter ¶ added in v0.31.0
type TypedItemExponentialFailureRateLimiter[T comparable] struct {
	// contains filtered or unexported fields
}
TypedItemExponentialFailureRateLimiter applies a simple baseDelay*2^<num-failures> limit. Dealing with max failures and expiration is up to the caller.
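The baseDelay*2^<num-failures> formula can be sketched as a per-item failure counter feeding a capped power-of-two delay. The code below is a minimal model under stated assumptions: `expLimiter`, `newExpLimiter` and `backoff` are hypothetical names, the cap at maxDelay follows the constructor's parameter name, and there is no locking.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoff returns baseDelay*2^failures, capped at maxDelay.
// The float comparison also guards against int64 overflow for large exponents.
func backoff(baseDelay, maxDelay time.Duration, failures int) time.Duration {
	d := float64(baseDelay.Nanoseconds()) * math.Pow(2, float64(failures))
	if d > float64(maxDelay.Nanoseconds()) {
		return maxDelay
	}
	return time.Duration(d)
}

// expLimiter is a hypothetical per-item exponential limiter.
// Assumption: not safe for concurrent use (a real limiter would need a mutex).
type expLimiter[T comparable] struct {
	failures  map[T]int
	baseDelay time.Duration
	maxDelay  time.Duration
}

func newExpLimiter[T comparable](baseDelay, maxDelay time.Duration) *expLimiter[T] {
	return &expLimiter[T]{failures: map[T]int{}, baseDelay: baseDelay, maxDelay: maxDelay}
}

// When records one more failure for item and returns how long it should wait.
func (r *expLimiter[T]) When(item T) time.Duration {
	n := r.failures[item]
	r.failures[item] = n + 1
	return backoff(r.baseDelay, r.maxDelay, n)
}

// NumRequeues reports how many times When has been called for item.
func (r *expLimiter[T]) NumRequeues(item T) int { return r.failures[item] }

// Forget resets the failure count, so the next delay starts at baseDelay again.
func (r *expLimiter[T]) Forget(item T) { delete(r.failures, item) }

func main() {
	r := newExpLimiter[string](5*time.Millisecond, 1000*time.Second)
	for i := 0; i < 4; i++ {
		fmt.Println(r.When("a")) // 5ms, 10ms, 20ms, 40ms
	}
	r.Forget("a")
	fmt.Println(r.When("a")) // 5ms
}
```

Since the limiter never expires entries on its own, an item that is never forgotten keeps its counter indefinitely, which is why the caller is responsible for max failures and expiration.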
func (*TypedItemExponentialFailureRateLimiter[T]) Forget ¶ added in v0.31.0
func (r *TypedItemExponentialFailureRateLimiter[T]) Forget(item T)
func (*TypedItemExponentialFailureRateLimiter[T]) NumRequeues ¶ added in v0.31.0
func (r *TypedItemExponentialFailureRateLimiter[T]) NumRequeues(item T) int
func (*TypedItemExponentialFailureRateLimiter[T]) When ¶ added in v0.31.0
func (r *TypedItemExponentialFailureRateLimiter[T]) When(item T) time.Duration
type TypedItemFastSlowRateLimiter ¶ added in v0.31.0
type TypedItemFastSlowRateLimiter[T comparable] struct {
	// contains filtered or unexported fields
}
TypedItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that.
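A fast-then-slow policy needs only an attempt counter per item and a threshold. The following is a small sketch of that idea; `fastSlow` and `newFastSlow` are hypothetical names, the parameter names are borrowed from the constructor signature, and concurrency is ignored.

```go
package main

import (
	"fmt"
	"time"
)

// fastSlow is a hypothetical model: the first maxFastAttempts retries of an
// item wait fastDelay, every later retry waits slowDelay.
// Assumption: not safe for concurrent use.
type fastSlow[T comparable] struct {
	attempts        map[T]int
	maxFastAttempts int
	fastDelay       time.Duration
	slowDelay       time.Duration
}

func newFastSlow[T comparable](fastDelay, slowDelay time.Duration, maxFastAttempts int) *fastSlow[T] {
	return &fastSlow[T]{
		attempts:        map[T]int{},
		maxFastAttempts: maxFastAttempts,
		fastDelay:       fastDelay,
		slowDelay:       slowDelay,
	}
}

// When counts the attempt and picks the delay for it.
func (r *fastSlow[T]) When(item T) time.Duration {
	r.attempts[item]++
	if r.attempts[item] <= r.maxFastAttempts {
		return r.fastDelay
	}
	return r.slowDelay
}

// Forget clears the attempt counter so the item gets fast retries again.
func (r *fastSlow[T]) Forget(item T) { delete(r.attempts, item) }

func main() {
	r := newFastSlow[string](5*time.Millisecond, 10*time.Second, 2)
	fmt.Println(r.When("a"), r.When("a"), r.When("a")) // 5ms 5ms 10s
}
```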
func (*TypedItemFastSlowRateLimiter[T]) Forget ¶ added in v0.31.0
func (r *TypedItemFastSlowRateLimiter[T]) Forget(item T)
func (*TypedItemFastSlowRateLimiter[T]) NumRequeues ¶ added in v0.31.0
func (r *TypedItemFastSlowRateLimiter[T]) NumRequeues(item T) int
func (*TypedItemFastSlowRateLimiter[T]) When ¶ added in v0.31.0
func (r *TypedItemFastSlowRateLimiter[T]) When(item T) time.Duration
type TypedMaxOfRateLimiter ¶ added in v0.31.0
type TypedMaxOfRateLimiter[T comparable] struct {
	// contains filtered or unexported fields
}
TypedMaxOfRateLimiter calls every RateLimiter and returns the worst case response. When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items were separately delayed a longer time.
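"Worst case response" can be read as taking the longest delay any of the wrapped limiters proposes. The sketch below models only the When side of that with plain functions; `whenFunc`, `constant` and `maxOf` are hypothetical helpers, not the package's types.

```go
package main

import (
	"fmt"
	"time"
)

// whenFunc is a hypothetical stand-in for a limiter's When method.
type whenFunc[T comparable] func(item T) time.Duration

// constant returns a limiter that always proposes the same delay.
func constant[T comparable](d time.Duration) whenFunc[T] {
	return func(T) time.Duration { return d }
}

// maxOf consults every limiter (so each one records the attempt) and
// returns the longest proposed delay.
func maxOf[T comparable](limiters ...whenFunc[T]) whenFunc[T] {
	return func(item T) time.Duration {
		var worst time.Duration
		for _, l := range limiters {
			if d := l(item); d > worst {
				worst = d
			}
		}
		return worst
	}
}

func main() {
	when := maxOf(constant[string](time.Second), constant[string](3*time.Second))
	fmt.Println(when("a")) // 3s
}
```

Every limiter is called even after a longer delay has been found, so stateful limiters such as a token bucket still consume a token on each call.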
func (*TypedMaxOfRateLimiter[T]) Forget ¶ added in v0.31.0
func (r *TypedMaxOfRateLimiter[T]) Forget(item T)
func (*TypedMaxOfRateLimiter[T]) NumRequeues ¶ added in v0.31.0
func (r *TypedMaxOfRateLimiter[T]) NumRequeues(item T) int
func (*TypedMaxOfRateLimiter[T]) When ¶ added in v0.31.0
func (r *TypedMaxOfRateLimiter[T]) When(item T) time.Duration
type TypedQueueConfig ¶ added in v0.31.0
type TypedQueueConfig[T comparable] struct {
	// Name for the queue. If unnamed, the metrics will not be registered.
	Name string
	// MetricsProvider optionally allows specifying a metrics provider to use for the queue
	// instead of the global provider.
	MetricsProvider MetricsProvider
	// Clock ability to inject real or fake clock for testing purposes.
	Clock clock.WithTicker
	// Queue provides the underlying queue to use. It is optional and defaults to slice based FIFO queue.
	Queue Queue[T]
}
type TypedRateLimiter ¶ added in v0.31.0
type TypedRateLimiter[T comparable] interface {
	// When gets an item and gets to decide how long that item should wait
	When(item T) time.Duration
	// Forget indicates that an item is finished being retried. Doesn't matter whether it's for failing
	// or for success, we'll stop tracking it
	Forget(item T)
	// NumRequeues returns back how many failures the item has had
	NumRequeues(item T) int
}
func DefaultTypedControllerRateLimiter ¶ added in v0.31.0
func DefaultTypedControllerRateLimiter[T comparable]() TypedRateLimiter[T]
DefaultTypedControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has both overall and per-item rate limiting. The overall is a token bucket and the per-item is exponential.
func DefaultTypedItemBasedRateLimiter ¶ added in v0.31.0
func DefaultTypedItemBasedRateLimiter[T comparable]() TypedRateLimiter[T]
func NewTypedItemExponentialFailureRateLimiter ¶ added in v0.31.0
func NewTypedItemExponentialFailureRateLimiter[T comparable](baseDelay time.Duration, maxDelay time.Duration) TypedRateLimiter[T]
func NewTypedItemFastSlowRateLimiter ¶ added in v0.31.0
func NewTypedItemFastSlowRateLimiter[T comparable](fastDelay, slowDelay time.Duration, maxFastAttempts int) TypedRateLimiter[T]
func NewTypedMaxOfRateLimiter ¶ added in v0.31.0
func NewTypedMaxOfRateLimiter[T comparable](limiters ...TypedRateLimiter[T]) TypedRateLimiter[T]
func NewTypedWithMaxWaitRateLimiter ¶ added in v0.31.0
func NewTypedWithMaxWaitRateLimiter[T comparable](limiter TypedRateLimiter[T], maxDelay time.Duration) TypedRateLimiter[T]
type TypedRateLimitingInterface ¶ added in v0.31.0
type TypedRateLimitingInterface[T comparable] interface {
	TypedDelayingInterface[T]
	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
	AddRateLimited(item T)
	// Forget indicates that an item is finished being retried. Doesn't matter whether it's for perm failing
	// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you
	// still have to call `Done` on the queue.
	Forget(item T)
	// NumRequeues returns back how many times the item was requeued
	NumRequeues(item T) int
}
TypedRateLimitingInterface is an interface that rate limits items being added to the queue.
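A common way to combine AddRateLimited, Forget and Done in a worker is: always call Done, call Forget on success, and requeue with rate limiting on failure until a retry budget is used up. The sketch below shows that flow against a recording fake; the local `rateLimitingQueue` interface is a trimmed-down copy for self-containment, and `handle`, `fakeQueue`, `errFailed` and the `maxRetries` budget are assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// rateLimitingQueue lists only the methods this sketch needs.
type rateLimitingQueue[T comparable] interface {
	AddRateLimited(item T)
	Forget(item T)
	NumRequeues(item T) int
	Done(item T)
}

var errFailed = errors.New("sync failed")

// handle processes one item that a worker obtained from the queue.
// maxRetries is a hypothetical budget chosen by the caller.
func handle[T comparable](q rateLimitingQueue[T], item T, maxRetries int, sync func(T) error) {
	defer q.Done(item) // Done is required whether or not sync succeeded
	if err := sync(item); err == nil {
		q.Forget(item) // success: stop tracking failures for this item
		return
	}
	if q.NumRequeues(item) < maxRetries {
		q.AddRateLimited(item) // retry later, as decided by the rate limiter
		return
	}
	q.Forget(item) // give up: clear the limiter state so it does not leak
}

// fakeQueue records calls instead of queueing anything.
type fakeQueue[T comparable] struct {
	requeues map[T]int
	done     int
}

func (f *fakeQueue[T]) AddRateLimited(item T)  { f.requeues[item]++ }
func (f *fakeQueue[T]) Forget(item T)          { delete(f.requeues, item) }
func (f *fakeQueue[T]) NumRequeues(item T) int { return f.requeues[item] }
func (f *fakeQueue[T]) Done(item T)            { f.done++ }

func main() {
	q := &fakeQueue[string]{requeues: map[string]int{}}
	fail := func(string) error { return errFailed }
	for i := 0; i < 3; i++ {
		handle[string](q, "key", 2, fail)
		fmt.Println(q.NumRequeues("key")) // 1, then 2, then 0 (gave up)
	}
}
```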
func NewTypedRateLimitingQueue ¶ added in v0.31.0
func NewTypedRateLimitingQueue[T comparable](rateLimiter TypedRateLimiter[T]) TypedRateLimitingInterface[T]
NewTypedRateLimitingQueue constructs a new workqueue with rateLimited queuing ability. Remember to call Forget! If you don't, you may end up tracking failures forever. NewTypedRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use NewTypedRateLimitingQueueWithConfig instead and specify a name.
func NewTypedRateLimitingQueueWithConfig ¶ added in v0.31.0
func NewTypedRateLimitingQueueWithConfig[T comparable](rateLimiter TypedRateLimiter[T], config TypedRateLimitingQueueConfig[T]) TypedRateLimitingInterface[T]
NewTypedRateLimitingQueueWithConfig constructs a new workqueue with rateLimited queuing ability with options to customize different properties. Remember to call Forget! If you don't, you may end up tracking failures forever.
type TypedRateLimitingQueueConfig ¶ added in v0.31.0
type TypedRateLimitingQueueConfig[T comparable] struct {
	// Name for the queue. If unnamed, the metrics will not be registered.
	Name string
	// MetricsProvider optionally allows specifying a metrics provider to use for the queue
	// instead of the global provider.
	MetricsProvider MetricsProvider
	// Clock optionally allows injecting a real or fake clock for testing purposes.
	Clock clock.WithTicker
	// DelayingQueue optionally allows injecting custom delaying queue DelayingInterface instead of the default one.
	DelayingQueue TypedDelayingInterface[T]
}
TypedRateLimitingQueueConfig specifies optional configurations to customize a TypedRateLimitingInterface.
type TypedWithMaxWaitRateLimiter ¶ added in v0.31.0
type TypedWithMaxWaitRateLimiter[T comparable] struct {
	// contains filtered or unexported fields
}
TypedWithMaxWaitRateLimiter has a maxDelay, which avoids waiting too long.
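Capping the wait can be sketched as a wrapper that clamps whatever delay the inner limiter proposes. As with the other sketches here, this models only the When side with plain functions; `whenFunc`, `constant` and `withMaxWait` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// whenFunc is a hypothetical stand-in for a limiter's When method.
type whenFunc[T comparable] func(item T) time.Duration

// constant returns a limiter that always proposes the same delay.
func constant[T comparable](d time.Duration) whenFunc[T] {
	return func(T) time.Duration { return d }
}

// withMaxWait wraps limiter so that no proposed delay exceeds maxDelay.
func withMaxWait[T comparable](limiter whenFunc[T], maxDelay time.Duration) whenFunc[T] {
	return func(item T) time.Duration {
		d := limiter(item)
		if d > maxDelay {
			return maxDelay
		}
		return d
	}
}

func main() {
	long := withMaxWait(constant[string](time.Minute), 10*time.Second)
	short := withMaxWait(constant[string](2*time.Second), 10*time.Second)
	fmt.Println(long("a"), short("a")) // 10s 2s
}
```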
func (TypedWithMaxWaitRateLimiter[T]) Forget ¶ added in v0.31.0
func (w TypedWithMaxWaitRateLimiter[T]) Forget(item T)
func (TypedWithMaxWaitRateLimiter[T]) NumRequeues ¶ added in v0.31.0
func (w TypedWithMaxWaitRateLimiter[T]) NumRequeues(item T) int
func (TypedWithMaxWaitRateLimiter[T]) When ¶ added in v0.31.0
func (w TypedWithMaxWaitRateLimiter[T]) When(item T) time.Duration
type WithMaxWaitRateLimiter ¶ added in v0.23.0
type WithMaxWaitRateLimiter = TypedWithMaxWaitRateLimiter[any]
WithMaxWaitRateLimiter has a maxDelay, which avoids waiting too long.
Deprecated: Use TypedWithMaxWaitRateLimiter instead.