workerqueue

package
v1.45.0 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 19, 2024 | License: Apache-2.0 | Imports: 12 | Imported by: 19

Documentation

Overview

Package workerqueue extends client-go's workqueue functionality into an opinionated, reusable queue + worker model.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func FastRateLimiter added in v1.1.0

func FastRateLimiter(maxDelay time.Duration) workqueue.TypedRateLimiter[any]

FastRateLimiter returns a rate limiter without exponential back-off, with the specified maximum per-item retry delay.
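One plausible way to get "no exponential back-off, capped per-item delay" is a fast-slow scheme: the first few retries of an item use a short fixed delay, and every later retry uses the maximum delay. The standard-library-only sketch below illustrates that idea. The type `fastSlowLimiter`, its methods, and the delay and attempt values are hypothetical and are not taken from the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fastSlowLimiter is a hypothetical per-item limiter. The first
// maxFastAttempts retries of an item wait fastDelay; every later retry
// waits slowDelay. The delay never grows exponentially.
type fastSlowLimiter struct {
	mu              sync.Mutex
	failures        map[string]int
	fastDelay       time.Duration
	slowDelay       time.Duration
	maxFastAttempts int
}

func newFastSlowLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) *fastSlowLimiter {
	return &fastSlowLimiter{
		failures:        make(map[string]int),
		fastDelay:       fastDelay,
		slowDelay:       slowDelay,
		maxFastAttempts: maxFastAttempts,
	}
}

// When records one more failure for key and returns how long to wait
// before retrying it.
func (l *fastSlowLimiter) When(key string) time.Duration {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.failures[key]++
	if l.failures[key] <= l.maxFastAttempts {
		return l.fastDelay
	}
	return l.slowDelay
}

// Forget clears the failure history for key, for example after a
// successful sync.
func (l *fastSlowLimiter) Forget(key string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.failures, key)
}

func main() {
	// Illustrative values only: two fast retries, then the capped delay.
	l := newFastSlowLimiter(200*time.Millisecond, 2*time.Second, 2)
	for i := 0; i < 4; i++ {
		fmt.Println(l.When("default/example"))
	}
}
```

The point of the design is that a persistently failing item settles at a bounded retry interval instead of backing off further and further.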

func NewTraceError added in v1.44.0

func NewTraceError(err error) error

NewTraceError returns a traceError wrapper around an error.
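The traceError type itself is unexported, so its details are not shown here. As a general illustration of the wrapper pattern, the sketch below wraps an error in a marker type so that receiving code can tell it apart from ordinary errors while the original error stays reachable. The names `markedError`, `newMarkedError`, and `isMarked` are hypothetical, and how the package treats a wrapped error is not something this sketch claims to reproduce.

```go
package main

import (
	"errors"
	"fmt"
)

// markedError is a hypothetical marker wrapper around another error.
type markedError struct {
	err error
}

// Error reports the wrapped error's message unchanged.
func (m *markedError) Error() string { return m.err.Error() }

// Unwrap exposes the wrapped error to errors.Is and errors.As.
func (m *markedError) Unwrap() error { return m.err }

// newMarkedError wraps err in the marker type. A nil error stays nil.
func newMarkedError(err error) error {
	if err == nil {
		return nil
	}
	return &markedError{err: err}
}

// isMarked reports whether err, or any error it wraps, is a markedError.
func isMarked(err error) bool {
	var m *markedError
	return errors.As(err, &m)
}

var errNotReady = errors.New("resource not ready")

func main() {
	err := newMarkedError(errNotReady)
	fmt.Println(isMarked(err))               // the wrapper is detected
	fmt.Println(errors.Is(err, errNotReady)) // the original error is still reachable
	fmt.Println(isMarked(errNotReady))       // an unwrapped error is not marked
}
```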

Types

type Handler

type Handler func(context.Context, string) error

Handler is the handler for processing the work queue. This is usually a synchronisation handler for a controller or a related component.
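A function with this signature might look like the sketch below. It assumes the string argument is a key in client-go's conventional "namespace/name" form, which this page does not state explicitly. The local `handler` type mirrors the Handler signature so the example compiles with only the standard library. `splitKey` and `syncResource` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// handler mirrors the Handler signature shown above.
type handler func(context.Context, string) error

// splitKey splits a "namespace/name" key. A key without a slash is
// treated as a cluster-scoped name with an empty namespace.
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

// syncResource is a hypothetical synchronisation handler. A real one
// would look the resource up and reconcile it. Returning a non-nil
// error signals that the key was not processed successfully.
func syncResource(ctx context.Context, key string) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	namespace, name, err := splitKey(key)
	if err != nil {
		return err
	}
	fmt.Printf("syncing %q in namespace %q\n", name, namespace)
	return nil
}

func main() {
	var h handler = syncResource
	if err := h(context.Background(), "default/example"); err != nil {
		fmt.Println("sync failed:", err)
	}
}
```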

type WorkerQueue

type WorkerQueue struct {
	// SyncHandler is exported to make testing easier (hack)
	SyncHandler Handler
	// contains filtered or unexported fields
}

WorkerQueue is an opinionated queue + worker for use with controllers and related components that process Kubernetes watch events and synchronise resources.

func NewWorkerQueue

func NewWorkerQueue(handler Handler, logger *logrus.Entry, keyName logfields.ResourceType, queueName string) *WorkerQueue

NewWorkerQueue returns a new worker queue for a given name.

func NewWorkerQueueWithRateLimiter added in v0.8.0

func NewWorkerQueueWithRateLimiter(handler Handler, logger *logrus.Entry, keyName logfields.ResourceType, queueName string, rateLimiter workqueue.TypedRateLimiter[any]) *WorkerQueue

NewWorkerQueueWithRateLimiter returns a new worker queue for a given name and a custom rate limiter.

func (*WorkerQueue) Enqueue

func (wq *WorkerQueue) Enqueue(obj interface{})

Enqueue puts the name of the runtime.Object in the queue to be processed. If you need to send through an explicit key, use a cache.ExplicitKey.

func (*WorkerQueue) EnqueueAfter added in v0.11.0

func (wq *WorkerQueue) EnqueueAfter(obj interface{}, duration time.Duration)

EnqueueAfter delays an enqueue operation by duration.
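The general shape of a delayed enqueue can be sketched with a timer that adds the key once the delay has elapsed, without blocking the caller. This is a standard-library illustration only. The helper `enqueueAfter` and the channel standing in for the queue are hypothetical, and the package's own implementation may differ.

```go
package main

import (
	"fmt"
	"time"
)

// enqueueAfter is a hypothetical helper. It returns immediately and adds
// key to queue once delay has elapsed. The returned timer can be stopped
// to cancel the pending enqueue.
func enqueueAfter(queue chan<- string, key string, delay time.Duration) *time.Timer {
	return time.AfterFunc(delay, func() { queue <- key })
}

func main() {
	queue := make(chan string, 1)
	start := time.Now()

	enqueueAfter(queue, "default/example", 50*time.Millisecond)

	key := <-queue // blocks until the delayed key arrives
	fmt.Printf("received %s, waited at least 50ms: %v\n",
		key, time.Since(start) >= 50*time.Millisecond)
}
```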

func (*WorkerQueue) EnqueueImmediately added in v0.8.0

func (wq *WorkerQueue) EnqueueImmediately(obj interface{})

EnqueueImmediately performs Enqueue but without rate-limiting. This should be used to continue partially completed work after giving other items in the queue a chance of running.

func (*WorkerQueue) Healthy

func (wq *WorkerQueue) Healthy() error

Healthy reports whether all the worker goroutines are running.

func (*WorkerQueue) Run

func (wq *WorkerQueue) Run(ctx context.Context, workers int)

Run starts WorkerQueue processing via the Handler. It blocks until ctx is cancelled, and runs the given number of workers to process the rate-limited queue.
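The blocking, multi-worker behaviour described for Run can be pictured with the simplified sketch below: a fixed number of goroutines take keys from a shared queue and pass them to a handler, and the call returns only after the context is cancelled and every worker has exited. It uses a plain channel in place of a rate-limited queue. `runWorkers` and `processAll` are hypothetical names, not part of the package.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// runWorkers starts the given number of goroutines that pass keys from
// the channel to handle. It blocks until ctx is cancelled and every
// worker has returned.
func runWorkers(ctx context.Context, workers int, keys <-chan string, handle func(context.Context, string) error) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case key := <-keys:
					if err := handle(ctx, key); err != nil {
						fmt.Println("handler error:", err)
					}
				}
			}
		}()
	}
	wg.Wait()
}

// processAll feeds keys to a pool of workers, cancels the context once
// all keys are handled, and returns how many keys were processed.
func processAll(keys []string, workers int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	queue := make(chan string)
	var processed atomic.Int64
	var handled sync.WaitGroup
	handled.Add(len(keys))

	handle := func(_ context.Context, _ string) error {
		processed.Add(1)
		handled.Done()
		return nil
	}

	finished := make(chan struct{})
	go func() {
		runWorkers(ctx, workers, queue, handle)
		close(finished)
	}()

	for _, k := range keys {
		queue <- k
	}
	handled.Wait() // every key has been handled
	cancel()       // ask the workers to stop
	<-finished     // runWorkers returns only after all workers exit
	return int(processed.Load())
}

func main() {
	fmt.Println(processAll([]string{"default/a", "default/b", "default/c"}, 2))
}
```

Because the call blocks, a caller would typically start it in its own goroutine and cancel the context to shut the workers down.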

func (*WorkerQueue) RunCount

func (wq *WorkerQueue) RunCount() int

RunCount reports the number of running worker goroutines started by Run.
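A health check of this kind can be built on a counter of live worker goroutines compared against the number that was requested. The sketch below shows one minimal way to do that with the standard library. The `pool` type and its methods are hypothetical and only illustrate the relationship between a running count and a health result.

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a hypothetical tracker for worker goroutines.
type pool struct {
	mu      sync.Mutex
	workers int // number of workers that were requested
	running int // number of workers currently alive
}

// workerStarted is called by a worker goroutine when it begins.
func (p *pool) workerStarted() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.running++
}

// workerStopped is called (usually via defer) when a worker exits.
func (p *pool) workerStopped() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.running--
}

// runCount reports the number of workers currently alive.
func (p *pool) runCount() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.running
}

// healthy returns nil when every requested worker is alive, and an
// error describing the mismatch otherwise.
func (p *pool) healthy() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.running != p.workers {
		return fmt.Errorf("want %d worker goroutine(s), got %d", p.workers, p.running)
	}
	return nil
}

func main() {
	p := &pool{workers: 2}
	p.workerStarted()
	p.workerStarted()
	fmt.Println(p.runCount(), p.healthy())

	p.workerStopped() // simulate a worker exiting unexpectedly
	fmt.Println(p.runCount(), p.healthy())
}
```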
