Documentation ¶
Overview ¶
Package workerqueue extends client-go's workqueue functionality into an opinionated queue + worker model that is reusable
Index ¶
- func FastRateLimiter(maxDelay time.Duration) workqueue.TypedRateLimiter[any]
- func NewTraceError(err error) error
- type Handler
- type WorkerQueue
- func (wq *WorkerQueue) Enqueue(obj interface{})
- func (wq *WorkerQueue) EnqueueAfter(obj interface{}, duration time.Duration)
- func (wq *WorkerQueue) EnqueueImmediately(obj interface{})
- func (wq *WorkerQueue) Healthy() error
- func (wq *WorkerQueue) Run(ctx context.Context, workers int)
- func (wq *WorkerQueue) RunCount() int
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FastRateLimiter ¶ added in v1.1.0
func FastRateLimiter(maxDelay time.Duration) workqueue.TypedRateLimiter[any]
FastRateLimiter returns a rate limiter without exponential back-off, with specified maximum per-item retry delay.
func NewTraceError ¶ added in v1.44.0
NewTraceError returns a traceError wrapper around an error.
Types ¶
type Handler ¶
Handler is the handler for processing the work queue This is usually a syncronisation handler for a controller or related
type WorkerQueue ¶
type WorkerQueue struct { // SyncHandler is exported to make testing easier (hack) SyncHandler Handler // contains filtered or unexported fields }
WorkerQueue is an opinionated queue + worker for use with controllers and related and processing Kubernetes watched events and synchronising resources
func NewWorkerQueue ¶
func NewWorkerQueue(handler Handler, logger *logrus.Entry, keyName logfields.ResourceType, queueName string) *WorkerQueue
NewWorkerQueue returns a new worker queue for a given name
func NewWorkerQueueWithRateLimiter ¶ added in v0.8.0
func NewWorkerQueueWithRateLimiter(handler Handler, logger *logrus.Entry, keyName logfields.ResourceType, queueName string, rateLimiter workqueue.TypedRateLimiter[any]) *WorkerQueue
NewWorkerQueueWithRateLimiter returns a new worker queue for a given name and a custom rate limiter.
func (*WorkerQueue) Enqueue ¶
func (wq *WorkerQueue) Enqueue(obj interface{})
Enqueue puts the name of the runtime.Object in the queue to be processed. If you need to send through an explicit key, use an cache.ExplicitKey
func (*WorkerQueue) EnqueueAfter ¶ added in v0.11.0
func (wq *WorkerQueue) EnqueueAfter(obj interface{}, duration time.Duration)
EnqueueAfter delays an enqueue operation by duration
func (*WorkerQueue) EnqueueImmediately ¶ added in v0.8.0
func (wq *WorkerQueue) EnqueueImmediately(obj interface{})
EnqueueImmediately performs Enqueue but without rate-limiting. This should be used to continue partially completed work after giving other items in the queue a chance of running.
func (*WorkerQueue) Healthy ¶
func (wq *WorkerQueue) Healthy() error
Healthy reports whether all the worker goroutines are running.
func (*WorkerQueue) Run ¶
func (wq *WorkerQueue) Run(ctx context.Context, workers int)
Run the WorkerQueue processing via the Handler. Will block until stop is closed. Runs a certain number workers to process the rate limited queue
func (*WorkerQueue) RunCount ¶
func (wq *WorkerQueue) RunCount() int
RunCount reports the number of running worker goroutines started by Run.