workqueue

package
v1.4.0-alpha.3
Warning

This package is not in the latest version of its module.
Published: Aug 25, 2016 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package workqueue provides a simple queue that supports the following features:

  • Fair: items processed in the order in which they are added.
  • Stingy: a single item will not be processed multiple times concurrently, and if an item is added multiple times before it can be processed, it will only be processed once.
  • Multiple consumers and producers. In particular, it is allowed for an item to be reenqueued while it is being processed.
  • Shutdown notifications.
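The "stingy" and re-enqueue guarantees above can be sketched with two sets: one for items that still need processing and one for items currently handed out. The `miniQueue` type below is a hypothetical, single-goroutine stand-in written for illustration only. It uses string items, does no locking or blocking, and is not the package's implementation.

```go
package main

import "fmt"

// miniQueue is a hypothetical stand-in for the package's queue.
// It is not safe for concurrent use and Get never blocks.
type miniQueue struct {
	order      []string        // items waiting, in arrival order (fair)
	dirty      map[string]bool // items that still need processing
	processing map[string]bool // items handed out by Get and not yet Done
}

func newMiniQueue() *miniQueue {
	return &miniQueue{dirty: map[string]bool{}, processing: map[string]bool{}}
}

// Add marks item as needing processing; repeated Adds collapse into one.
func (q *miniQueue) Add(item string) {
	if q.dirty[item] {
		return // already marked: stingy
	}
	q.dirty[item] = true
	if q.processing[item] {
		return // being processed: Done will re-queue it
	}
	q.order = append(q.order, item)
}

// Get hands out the oldest waiting item, or ok=false if nothing is waiting.
func (q *miniQueue) Get() (item string, ok bool) {
	if len(q.order) == 0 {
		return "", false
	}
	item, q.order = q.order[0], q.order[1:]
	q.processing[item] = true
	delete(q.dirty, item)
	return item, true
}

// Done finishes item and re-queues it if it was added again meanwhile.
func (q *miniQueue) Done(item string) {
	delete(q.processing, item)
	if q.dirty[item] {
		q.order = append(q.order, item)
	}
}

// Len reports how many items are waiting.
func (q *miniQueue) Len() int { return len(q.order) }

func main() {
	q := newMiniQueue()
	q.Add("a")
	q.Add("a") // collapsed with the first Add
	q.Add("b")
	fmt.Println(q.Len()) // 2

	item, _ := q.Get()   // "a"
	q.Add(item)          // re-enqueued while it is being processed
	fmt.Println(q.Len()) // 1: only "b" is waiting
	q.Done(item)
	fmt.Println(q.Len()) // 2: "b", then "a" again
}
```

Deferring the re-queue until Done is what keeps a single item from being processed by two workers at once.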

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Parallelize added in v1.3.0

func Parallelize(workers, pieces int, doWorkPiece DoWorkPieceFunc)

Parallelize is a very simple framework that allows for parallelizing N independent pieces of work.

Types

type BucketRateLimiter added in v1.3.0

type BucketRateLimiter struct {
	*ratelimit.Bucket
}

BucketRateLimiter adapts a standard token bucket to the workqueue RateLimiter API.

func (*BucketRateLimiter) Forget added in v1.3.0

func (r *BucketRateLimiter) Forget(item interface{})

func (*BucketRateLimiter) NumRequeues added in v1.3.0

func (r *BucketRateLimiter) NumRequeues(item interface{}) int

func (*BucketRateLimiter) When added in v1.3.0

func (r *BucketRateLimiter) When(item interface{}) time.Duration

type DelayingInterface added in v1.3.0

type DelayingInterface interface {
	Interface
	// AddAfter adds an item to the workqueue after the indicated duration has passed
	AddAfter(item interface{}, duration time.Duration)
}

DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to requeue items after failures without ending up in a hot-loop.
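The idea behind AddAfter can be illustrated with the standard library's `time.AfterFunc`. This is a minimal sketch under the assumption that a buffered channel stands in for the queue; `addAfter` and `firstReady` are hypothetical helpers, not part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// addAfter delivers item to ready once delay has elapsed, without blocking
// the caller in the meantime.
func addAfter(ready chan<- string, item string, delay time.Duration) {
	time.AfterFunc(delay, func() { ready <- item })
}

// firstReady schedules two items with different delays and returns whichever
// becomes available first.
func firstReady() string {
	ready := make(chan string, 2) // buffered so the timer callbacks never block
	addAfter(ready, "retry-later", 100*time.Millisecond)
	addAfter(ready, "retry-soon", 5*time.Millisecond)
	return <-ready
}

func main() {
	fmt.Println(firstReady()) // retry-soon
}
```

A failed item scheduled this way stays out of the workers' reach until its delay expires, which is what prevents the hot-loop.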

func NewDelayingQueue added in v1.3.0

func NewDelayingQueue() DelayingInterface

NewDelayingQueue constructs a new workqueue with delayed queuing ability.

func NewNamedDelayingQueue

func NewNamedDelayingQueue(name string) DelayingInterface

type DoWorkPieceFunc added in v1.3.0

type DoWorkPieceFunc func(piece int)

type Interface added in v1.3.0

type Interface interface {
	Add(item interface{})
	Len() int
	Get() (item interface{}, shutdown bool)
	Done(item interface{})
	ShutDown()
	ShuttingDown() bool
}

type ItemExponentialFailureRateLimiter added in v1.3.0

type ItemExponentialFailureRateLimiter struct {
	// contains filtered or unexported fields
}

ItemExponentialFailureRateLimiter does a simple baseDelay*10^<num-failures> limit. Dealing with max failures and expiration is up to the caller.
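To make the formula concrete, the hypothetical `backoff` function below computes baseDelay*10^failures and caps the result at maxDelay, the second argument accepted by NewItemExponentialFailureRateLimiter. It is a sketch of the arithmetic only. Whether the limiter counts the current failure before or after computing the delay is not stated here, so the failure count is passed in explicitly.

```go
package main

import (
	"fmt"
	"time"
)

// backoff returns baseDelay*10^failures, capped at maxDelay.
func backoff(baseDelay, maxDelay time.Duration, failures int) time.Duration {
	delay := baseDelay
	for i := 0; i < failures; i++ {
		// Stop early if another factor of 10 would pass the cap; this also
		// keeps the multiplication from overflowing.
		if delay > maxDelay/10 {
			return maxDelay
		}
		delay *= 10
	}
	if delay > maxDelay {
		return maxDelay
	}
	return delay
}

func main() {
	for failures := 0; failures <= 3; failures++ {
		fmt.Println(failures, backoff(5*time.Millisecond, time.Second, failures))
	}
	// 0 5ms
	// 1 50ms
	// 2 500ms
	// 3 1s
}
```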

func (*ItemExponentialFailureRateLimiter) Forget added in v1.3.0

func (r *ItemExponentialFailureRateLimiter) Forget(item interface{})

func (*ItemExponentialFailureRateLimiter) NumRequeues added in v1.3.0

func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int

func (*ItemExponentialFailureRateLimiter) When added in v1.3.0

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration

type ItemFastSlowRateLimiter added in v1.3.0

type ItemFastSlowRateLimiter struct {
	// contains filtered or unexported fields
}

ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that.
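A minimal sketch of that policy, assuming attempts are counted per item and the first `maxFastAttempts` calls get the fast delay. The `fastSlowLimiter` type is a hypothetical illustration with the same method names as RateLimiter; it omits the locking a real limiter would need.

```go
package main

import (
	"fmt"
	"time"
)

// fastSlowLimiter is a hypothetical, non-thread-safe illustration.
type fastSlowLimiter struct {
	fastDelay, slowDelay time.Duration
	maxFastAttempts      int
	attempts             map[interface{}]int
}

func newFastSlowLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) *fastSlowLimiter {
	return &fastSlowLimiter{
		fastDelay:       fastDelay,
		slowDelay:       slowDelay,
		maxFastAttempts: maxFastAttempts,
		attempts:        map[interface{}]int{},
	}
}

// When counts the attempt and picks the fast or slow delay accordingly.
func (r *fastSlowLimiter) When(item interface{}) time.Duration {
	r.attempts[item]++
	if r.attempts[item] <= r.maxFastAttempts {
		return r.fastDelay
	}
	return r.slowDelay
}

// Forget drops the attempt count, so the item starts with fast retries again.
func (r *fastSlowLimiter) Forget(item interface{}) { delete(r.attempts, item) }

// NumRequeues reports how many attempts have been counted for item.
func (r *fastSlowLimiter) NumRequeues(item interface{}) int { return r.attempts[item] }

func main() {
	r := newFastSlowLimiter(5*time.Millisecond, 10*time.Second, 2)
	for i := 0; i < 3; i++ {
		fmt.Println(r.When("pod-a")) // 5ms, 5ms, 10s
	}
	r.Forget("pod-a")
	fmt.Println(r.When("pod-a")) // 5ms
}
```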

func (*ItemFastSlowRateLimiter) Forget added in v1.3.0

func (r *ItemFastSlowRateLimiter) Forget(item interface{})

func (*ItemFastSlowRateLimiter) NumRequeues added in v1.3.0

func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int

func (*ItemFastSlowRateLimiter) When added in v1.3.0

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration

type MaxOfRateLimiter added in v1.3.0

type MaxOfRateLimiter struct {
	// contains filtered or unexported fields
}

MaxOfRateLimiter calls every RateLimiter and returns the worst case response. When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items were separately delayed a longer time.
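"Worst case" here means the longest delay any of the wrapped limiters asks for. The sketch below shows that selection with a hypothetical one-method `limiter` interface and a `fixed` limiter that always returns the same delay; neither is part of this package.

```go
package main

import (
	"fmt"
	"time"
)

// limiter is the single RateLimiter method this sketch needs.
type limiter interface {
	When(item interface{}) time.Duration
}

// fixed is a hypothetical limiter that always asks for the same delay.
type fixed time.Duration

func (f fixed) When(interface{}) time.Duration { return time.Duration(f) }

// maxOf asks every limiter and returns the longest delay requested.
// Every limiter is consulted, so each one still records the attempt.
func maxOf(item interface{}, limiters ...limiter) time.Duration {
	var worst time.Duration
	for _, l := range limiters {
		if d := l.When(item); d > worst {
			worst = d
		}
	}
	return worst
}

func main() {
	d := maxOf("pod-a",
		fixed(10*time.Millisecond),
		fixed(2*time.Second),
		fixed(50*time.Millisecond),
	)
	fmt.Println(d) // 2s
}
```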

func (*MaxOfRateLimiter) Forget added in v1.3.0

func (r *MaxOfRateLimiter) Forget(item interface{})

func (*MaxOfRateLimiter) NumRequeues added in v1.3.0

func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int

func (*MaxOfRateLimiter) When added in v1.3.0

func (r *MaxOfRateLimiter) When(item interface{}) time.Duration

type RateLimiter added in v1.3.0

type RateLimiter interface {
	// When gets an item and gets to decide how long that item should wait
	When(item interface{}) time.Duration
	// Forget indicates that an item is finished being retried. It doesn't matter whether it's for permanent failure
	// or for success; we'll stop tracking it.
	Forget(item interface{})
	// NumRequeues returns how many failures the item has had
	NumRequeues(item interface{}) int
}

func DefaultControllerRateLimiter added in v1.3.0

func DefaultControllerRateLimiter() RateLimiter

DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has both overall and per-item rate limiting. The overall limiter is a token bucket and the per-item limiter is exponential.

func DefaultItemBasedRateLimiter added in v1.3.0

func DefaultItemBasedRateLimiter() RateLimiter

func NewItemExponentialFailureRateLimiter added in v1.3.0

func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter

func NewItemFastSlowRateLimiter added in v1.3.0

func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter

func NewMaxOfRateLimiter added in v1.3.0

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter

type RateLimitingInterface added in v1.3.0

type RateLimitingInterface interface {
	DelayingInterface
	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
	AddRateLimited(item interface{})

	// Forget indicates that an item is finished being retried. It doesn't matter whether it's for permanent failure
	// or for success; we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`; you
	// still have to call `Done` on the queue.
	Forget(item interface{})
	// NumRequeues returns how many times the item was requeued
	NumRequeues(item interface{}) int
}

RateLimitingInterface is a DelayingInterface that can also Add an item once a rate limiter says it is ok to do so. This makes it easier to requeue items after failures without ending up in a hot-loop.

func NewNamedRateLimitingQueue

func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface

func NewRateLimitingQueue added in v1.3.0

func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface

NewRateLimitingQueue constructs a new workqueue with rate-limited queuing ability. Remember to call Forget! If you don't, you may end up tracking failures forever.
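The reminder about Forget suggests a per-item handling pattern along these lines: requeue with rate limiting on failure, Forget on success, and Done in both cases. The sketch uses a hypothetical `retryQueue` interface holding just the three documented methods it needs, plus a `recordingQueue` fake that only logs calls, so it runs without the real queue.

```go
package main

import (
	"errors"
	"fmt"
)

// retryQueue is the subset of RateLimitingInterface used by handle.
type retryQueue interface {
	AddRateLimited(item interface{})
	Forget(item interface{})
	Done(item interface{})
}

// recordingQueue is a hypothetical fake that records which methods were called.
type recordingQueue struct{ calls []string }

func (q *recordingQueue) record(method string, item interface{}) {
	q.calls = append(q.calls, fmt.Sprintf("%s(%v)", method, item))
}
func (q *recordingQueue) AddRateLimited(item interface{}) { q.record("AddRateLimited", item) }
func (q *recordingQueue) Forget(item interface{})         { q.record("Forget", item) }
func (q *recordingQueue) Done(item interface{})           { q.record("Done", item) }

// handle processes one item that a worker previously obtained from Get.
func handle(q retryQueue, item interface{}, process func(interface{}) error) {
	defer q.Done(item) // always tell the queue we are finished with the item
	if err := process(item); err != nil {
		q.AddRateLimited(item) // failure: retry later, as the limiter allows
		return
	}
	q.Forget(item) // success: stop tracking this item's failures
}

var errFailed = errors.New("processing failed")

func succeed(interface{}) error { return nil }
func fail(interface{}) error    { return errFailed }

func main() {
	q := &recordingQueue{}
	handle(q, "ok", succeed)
	handle(q, "bad", fail)
	fmt.Println(q.calls)
	// [Forget(ok) Done(ok) AddRateLimited(bad) Done(bad)]
}
```

A caller that gives up on an item after too many failures would call Forget on that path as well, since expiration is left to the caller.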

type TimedWorkQueue

type TimedWorkQueue struct {
	*Type
}

func NewTimedWorkQueue

func NewTimedWorkQueue() *TimedWorkQueue

func (TimedWorkQueue) Add

func (q TimedWorkQueue) Add(timedItem *TimedWorkQueueItem)

Add adds the obj along with the current timestamp to the queue.

func (TimedWorkQueue) Done

func (q TimedWorkQueue) Done(timedItem *TimedWorkQueueItem) error

func (TimedWorkQueue) Get

func (q TimedWorkQueue) Get() (timedItem *TimedWorkQueueItem, shutdown bool)

Get gets the obj along with its timestamp from the queue.

type TimedWorkQueueItem

type TimedWorkQueueItem struct {
	StartTime time.Time
	Object    interface{}
}

type Type

type Type struct {
	// contains filtered or unexported fields
}

Type is a work queue (see the package comment).

func New

func New() *Type

New constructs a new workqueue (see the package comment).

func NewNamed

func NewNamed(name string) *Type

func (*Type) Add

func (q *Type) Add(item interface{})

Add marks item as needing processing.

func (*Type) Done

func (q *Type) Done(item interface{})

Done marks item as done processing, and if it has been marked as dirty again while it was being processed, it will be re-added to the queue for re-processing.

func (*Type) Get

func (q *Type) Get() (item interface{}, shutdown bool)

Get blocks until it can return an item to be processed. If shutdown = true, the caller should end their goroutine. You must call Done with item when you have finished processing it.

func (*Type) Len added in v0.17.0

func (q *Type) Len() int

Len returns the current queue length, for informational purposes only. For example, you shouldn't gate a call to Add() or Get() on Len() being a particular value, because that can't be synchronized properly.

func (*Type) ShutDown

func (q *Type) ShutDown()

ShutDown will cause q to ignore all new items added to it. As soon as the worker goroutines have drained the existing items in the queue, they will be instructed to exit.
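The interplay between a blocking Get and shutdown can be sketched with a mutex and a condition variable. The `drainQueue` type below is a hypothetical illustration, not this package's code: it uses string items and leaves out Done and de-duplication to keep the shutdown path visible.

```go
package main

import (
	"fmt"
	"sync"
)

// drainQueue is a hypothetical queue showing only the shutdown behavior.
type drainQueue struct {
	mu           sync.Mutex
	cond         *sync.Cond
	items        []string
	shuttingDown bool
}

func newDrainQueue() *drainQueue {
	q := &drainQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Add appends item unless the queue is shutting down.
func (q *drainQueue) Add(item string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.shuttingDown {
		return // new items are ignored after ShutDown
	}
	q.items = append(q.items, item)
	q.cond.Signal()
}

// Get blocks until an item is available or the queue is shut down and empty.
func (q *drainQueue) Get() (item string, shutdown bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.items) == 0 {
		return "", true // drained: the worker should exit
	}
	item, q.items = q.items[0], q.items[1:]
	return item, false
}

// ShutDown stops accepting items and wakes every blocked Get.
func (q *drainQueue) ShutDown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.shuttingDown = true
	q.cond.Broadcast()
}

// drain is a worker loop: it processes items until Get reports shutdown.
func drain(q *drainQueue) []string {
	var got []string
	for {
		item, shutdown := q.Get()
		if shutdown {
			return got
		}
		got = append(got, item)
	}
}

func main() {
	q := newDrainQueue()
	q.Add("a")
	q.Add("b")
	q.ShutDown()
	q.Add("c")            // ignored
	fmt.Println(drain(q)) // [a b]
}
```

Broadcast rather than Signal matters in ShutDown, because every worker blocked in Get has to wake up and observe the flag.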

func (*Type) ShuttingDown added in v1.3.0

func (q *Type) ShuttingDown() bool
