queue

package
v0.3.7 Latest
Published: Jan 14, 2025 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Constants

const (
	// MaxRetries is the number of times we try to process a given key before permanently forgetting it.
	MaxRetries = 20
)

Variables

This section is empty.

Functions

func DefaultRetryFunc

func DefaultRetryFunc(_ context.Context, _ string, timesTried int, _ time.Time, err error) (*time.Duration, error)

DefaultRetryFunc is the default function used for retries by the queue subsystem.

Types

type ItemHandler

type ItemHandler func(ctx context.Context, key string) error

ItemHandler is a callback that handles a single key on the Queue

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue implements a wrapper around workqueue with native Virtual Kubelet (VK) instrumentation.

func New

func New(ratelimiter workqueue.RateLimiter, name string, handler ItemHandler, retryFunc ShouldRetryFunc) *Queue

New creates a queue

It expects an item rate limiter and a friendly name, which is used in logs and in the internal Kubernetes metrics. If retryFunc is nil, the default retry function is used.

func (*Queue) Empty

func (q *Queue) Empty() bool

Empty reports whether the queue has no items in it.

It should only be used for debugging.

func (*Queue) Enqueue

func (q *Queue) Enqueue(ctx context.Context, key string)

Enqueue enqueues the key in a rate limited fashion

func (*Queue) EnqueueWithoutRateLimit

func (q *Queue) EnqueueWithoutRateLimit(ctx context.Context, key string)

EnqueueWithoutRateLimit enqueues the key without a rate limit

func (*Queue) EnqueueWithoutRateLimitWithDelay

func (q *Queue) EnqueueWithoutRateLimitWithDelay(ctx context.Context, key string, after time.Duration)

EnqueueWithoutRateLimitWithDelay enqueues the key without rate limiting, but work on it will not start until the given delay has elapsed.

func (*Queue) Forget

func (q *Queue) Forget(ctx context.Context, key string)

Forget forgets the key

func (*Queue) ItemsBeingProcessedLen

func (q *Queue) ItemsBeingProcessedLen() int

ItemsBeingProcessedLen returns the count of items that are currently being processed.

func (*Queue) Len

func (q *Queue) Len() int

Len returns the number of items in the queue, including items that are currently being processed.

func (*Queue) Run

func (q *Queue) Run(ctx context.Context, workers int)

Run starts the workers

It blocks until context is cancelled, and all of the workers exit.

func (*Queue) String

func (q *Queue) String() string

func (*Queue) UnprocessedLen

func (q *Queue) UnprocessedLen() int

UnprocessedLen returns the count of items yet to be processed in the queue

type ShouldRetryFunc

type ShouldRetryFunc func(ctx context.Context, key string, timesTried int, originallyAdded time.Time, err error) (*time.Duration, error)

ShouldRetryFunc is a mechanism to have a custom retry policy
