controller

package
v1.3.0-alpha.1 Latest Latest
Published: May 25, 2016 License: Apache-2.0 Imports: 6 Imported by: 0

Documentation

Overview

Package controller provides reusable support for controller implementations.

Functions

func RetryAlways

func RetryAlways(obj interface{}, err error, retries Retry) bool

RetryAlways is a RetryFunc implementation that will always retry.

func RetryNever

func RetryNever(obj interface{}, err error, retries Retry) bool

RetryNever is a RetryFunc implementation that will never retry.

Types

type Queue

type Queue interface {
	Pop() interface{}
}

Queue is a narrow abstraction of a cache.FIFO.

type QueueRetryManager

type QueueRetryManager struct {
	// contains filtered or unexported fields
}

QueueRetryManager retries a resource by re-queueing it into a ReQueue as long as retryFunc returns true.

func NewQueueRetryManager

func NewQueueRetryManager(queue ReQueue, keyFn kcache.KeyFunc, retryFn RetryFunc, limiter flowcontrol.RateLimiter) *QueueRetryManager

NewQueueRetryManager safely creates a new QueueRetryManager.

func (*QueueRetryManager) Forget

func (r *QueueRetryManager) Forget(resource interface{})

Forget resets the retry count for resource.

func (*QueueRetryManager) Retry

func (r *QueueRetryManager) Retry(resource interface{}, err error)

Retry will enqueue resource until retryFunc returns false for that resource, at which point resource will be forgotten and no longer retried. The current retry count will be passed to each invocation of retryFunc.
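The retry bookkeeping described above can be sketched as follows. This is a simplified illustration, not the package's implementation: the types are redeclared locally, memoryQueue and newRetryManager are hypothetical helpers, rate limiting is left out, and the sketch is not safe for concurrent use.

```go
package main

import "fmt"

// Retry and RetryFunc mirror the documented types (StartTimestamp omitted).
type Retry struct{ Count int }

type RetryFunc func(obj interface{}, err error, retries Retry) bool

// memoryQueue is a hypothetical stand-in for a ReQueue.
type memoryQueue struct{ items []interface{} }

// AddIfNotPresent appends obj unless an equal item is already queued.
// Items must be comparable (strings are used below).
func (q *memoryQueue) AddIfNotPresent(obj interface{}) error {
	for _, item := range q.items {
		if item == obj {
			return nil
		}
	}
	q.items = append(q.items, obj)
	return nil
}

// retryManager is a simplified sketch of a queue-backed retry manager.
type retryManager struct {
	queue   *memoryQueue
	keyFn   func(obj interface{}) (string, error)
	retryFn RetryFunc
	retries map[string]Retry
}

// Retry requeues resource while retryFn allows it, otherwise forgets it.
func (r *retryManager) Retry(resource interface{}, err error) {
	key, keyErr := r.keyFn(resource)
	if keyErr != nil {
		return // a resource without a key cannot be tracked
	}
	state := r.retries[key]
	if !r.retryFn(resource, err, state) {
		r.Forget(resource)
		return
	}
	state.Count++
	r.retries[key] = state
	_ = r.queue.AddIfNotPresent(resource)
}

// Forget drops the retry state recorded for resource.
func (r *retryManager) Forget(resource interface{}) {
	if key, err := r.keyFn(resource); err == nil {
		delete(r.retries, key)
	}
}

// newRetryManager builds a manager that allows at most max retries per key.
func newRetryManager(max int) *retryManager {
	return &retryManager{
		queue:   &memoryQueue{},
		keyFn:   func(obj interface{}) (string, error) { return fmt.Sprint(obj), nil },
		retryFn: func(_ interface{}, _ error, retries Retry) bool { return retries.Count < max },
		retries: map[string]Retry{},
	}
}

func main() {
	m := newRetryManager(2)
	for i := 0; i < 3; i++ {
		m.Retry("build-1", fmt.Errorf("attempt %d failed", i))
		fmt.Println("tracked:", len(m.retries), "queued:", len(m.queue.items))
	}
}
```

In this sketch the third call finds a count of 2, the retry function declines, and the state for the resource is dropped.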

type ReQueue

type ReQueue interface {
	Queue
	AddIfNotPresent(interface{}) error
}

ReQueue is a queue that allows an object to be requeued.

type Retry

type Retry struct {
	// Count is the number of retries
	Count int

	// StartTimestamp is the time at which retrying started
	StartTimestamp unversioned.Time
}

Retry provides additional information regarding retries.

type RetryController

type RetryController struct {
	// Queue is where work is retrieved for Handle.
	Queue

	// Handle is expected to process the next resource from the queue.
	Handle func(interface{}) error

	// RetryManager is fed the handled resource if Handle returns a Retryable
	// error. If Handle returns no error, the RetryManager is asked to forget
	// the resource.
	RetryManager
}

RetryController is a RunnableController which delegates resource handling to a function and knows how to safely manage retries of a resource which failed to be successfully handled.

func (*RetryController) Run

func (c *RetryController) Run()

Run begins processing resources from Queue asynchronously.
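One iteration of such a loop might look like the sketch below. It is an assumption-laden illustration rather than the package's code: Queue and RetryManager are redeclared locally, chanQueue, recorder, failOnBad and handleNext are hypothetical names, and every error is treated as retryable for simplicity.

```go
package main

import (
	"errors"
	"fmt"
)

// Queue and RetryManager mirror the documented interfaces.
type Queue interface {
	Pop() interface{}
}

type RetryManager interface {
	Retry(resource interface{}, err error)
	Forget(resource interface{})
}

// chanQueue is a hypothetical Queue; Pop blocks until an item is available.
type chanQueue chan interface{}

func (q chanQueue) Pop() interface{} { return <-q }

// recorder is a hypothetical RetryManager that only records calls.
type recorder struct{ retried, forgotten []interface{} }

func (r *recorder) Retry(resource interface{}, err error) {
	r.retried = append(r.retried, resource)
}

func (r *recorder) Forget(resource interface{}) {
	r.forgotten = append(r.forgotten, resource)
}

var errBad = errors.New("handler failed")

// failOnBad is a sample Handle function that rejects the resource "bad".
func failOnBad(resource interface{}) error {
	if resource == "bad" {
		return errBad
	}
	return nil
}

// handleNext pops one resource and hands it to handle. On error the
// resource goes to Retry (assumption: all errors are retryable here);
// on success the manager is asked to forget it.
func handleNext(q Queue, handle func(interface{}) error, m RetryManager) {
	resource := q.Pop()
	if err := handle(resource); err != nil {
		m.Retry(resource, err)
		return
	}
	m.Forget(resource)
}

func main() {
	q := make(chanQueue, 2)
	q <- "good"
	q <- "bad"
	rec := &recorder{}
	handleNext(q, failOnBad, rec)
	handleNext(q, failOnBad, rec)
	fmt.Println("retried:", rec.retried, "forgotten:", rec.forgotten)
}
```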

func (*RetryController) RunUntil

func (c *RetryController) RunUntil(stopCh <-chan struct{})

RunUntil begins processing resources from Queue asynchronously until stopCh is closed.

type RetryFunc

type RetryFunc func(obj interface{}, err error, retries Retry) bool

RetryFunc should return true if the given object should be retried, given the error it produced and the number of retries already attempted.
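As an illustration, a RetryFunc that gives up after a fixed number of attempts might look like the following. This is a minimal sketch: Retry and RetryFunc are redeclared locally (with StartTimestamp as time.Time rather than unversioned.Time, which is an assumption) so the example is self-contained, and retryUpTo is a hypothetical helper that is not part of this package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Retry mirrors the documented struct; StartTimestamp uses time.Time here
// (an assumption) so the sketch does not depend on the unversioned package.
type Retry struct {
	Count          int
	StartTimestamp time.Time
}

// RetryFunc mirrors the documented function type.
type RetryFunc func(obj interface{}, err error, retries Retry) bool

// retryUpTo is a hypothetical helper returning a RetryFunc that allows a
// retry while fewer than max retries have been recorded.
func retryUpTo(max int) RetryFunc {
	return func(obj interface{}, err error, retries Retry) bool {
		return retries.Count < max
	}
}

func main() {
	fn := retryUpTo(2)
	err := errors.New("transient failure")
	for count := 0; count <= 2; count++ {
		retries := Retry{Count: count, StartTimestamp: time.Now()}
		fmt.Println("count:", count, "retry:", fn("item", err, retries))
	}
}
```

A function of this shape could be passed as the retryFn argument of NewQueueRetryManager in place of RetryAlways or RetryNever.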

type RetryManager

type RetryManager interface {
	// Retry will cause resource processing to be retried (for example, by
	// requeueing resource)
	Retry(resource interface{}, err error)

	// Forget will cause the manager to erase all prior knowledge of resource
	// and reclaim internal resources associated with state tracking of
	// resource.
	Forget(resource interface{})
}

RetryManager knows how to retry processing of a resource, and how to forget a resource it may be tracking the state of.

type RunnableController

type RunnableController interface {
	// Run starts the asynchronous controller loop.
	Run()
}

RunnableController is a controller which implements a Run loop.

type Scheduler added in v1.1.2

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler is a self-balancing, rate-limited, bucketed queue that can periodically invoke an action on all items in a bucket before moving to the next bucket. A rate limiter sets an upper bound on the number of buckets processed per unit time. Each item in the queue has a key and a value, so both uniqueness and equality can be tested (the key must be unique; the value can carry information for the next processing). Items remain in the queue until removed by a call to Remove().

func NewScheduler added in v1.1.2

func NewScheduler(bucketCount int, bucketLimiter flowcontrol.RateLimiter, fn func(key, value interface{})) *Scheduler

NewScheduler creates a scheduler with bucketCount buckets, a rate limiter for restricting the rate at which buckets are processed, and a function to invoke when items are scanned in a bucket. TODO: remove DEBUG statements from this file once this logic has been adequately validated.

func (*Scheduler) Add added in v1.1.2

func (s *Scheduler) Add(key, value interface{})

Add places the key in the bucket with the fewest entries (excluding the current bucket). The key is used to determine uniqueness, while value can be used to associate additional data for later retrieval. An Add removes the previous key and value and places the item in a new bucket. This allows callers to ensure that adding a new item to the queue purges old versions of the item, while Remove can be conditional on removing only the known old version.
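The placement rule can be sketched with a small standalone type. This is only an illustration of the described behavior: bucketed, newBucketed and add are hypothetical names, ties are broken by taking the lowest bucket index (an assumption), and no locking or rate limiting is included.

```go
package main

import "fmt"

// bucketed is a simplified, non-thread-safe sketch of bucket placement.
type bucketed struct {
	buckets  []map[interface{}]interface{}
	position int // index of the bucket currently being processed
}

// newBucketed creates n empty buckets; n must be at least 1.
func newBucketed(n int) *bucketed {
	b := &bucketed{buckets: make([]map[interface{}]interface{}, n)}
	for i := range b.buckets {
		b.buckets[i] = map[interface{}]interface{}{}
	}
	return b
}

// add removes any previous entry for key, then stores key and value in the
// bucket with the fewest entries, skipping the current bucket. It returns
// the index of the bucket that received the item.
func (b *bucketed) add(key, value interface{}) int {
	for _, bucket := range b.buckets {
		delete(bucket, key)
	}
	target := -1
	for i, bucket := range b.buckets {
		if i == b.position {
			continue
		}
		if target == -1 || len(bucket) < len(b.buckets[target]) {
			target = i
		}
	}
	if target == -1 {
		target = b.position // only one bucket exists
	}
	b.buckets[target][key] = value
	return target
}

func main() {
	b := newBucketed(3)
	for _, key := range []string{"a", "b", "c"} {
		fmt.Println(key, "placed in bucket", b.add(key, 1))
	}
	// Re-adding "b" purges the old entry before choosing a bucket.
	fmt.Println("b placed in bucket", b.add("b", 2))
}
```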

func (*Scheduler) Delay added in v1.1.2

func (s *Scheduler) Delay(key interface{})

Delay moves the key to the end of the chain if it exists.

func (*Scheduler) Len added in v1.1.2

func (s *Scheduler) Len() int

Len returns the number of scheduled items.

func (*Scheduler) Map added in v1.1.2

func (s *Scheduler) Map() map[interface{}]interface{}

Map returns a copy of the scheduler contents, but does not copy the keys or values themselves. If values and keys are not immutable, changing the value will affect the value in the queue.
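The consequence of a shallow copy can be shown with a plain map. The sketch below does not use the Scheduler itself; copyMap and item are hypothetical names chosen for illustration.

```go
package main

import "fmt"

// item is a hypothetical mutable value stored by pointer.
type item struct{ Note string }

// copyMap returns a new map holding the same keys and values as src.
// The values themselves are not copied.
func copyMap(src map[interface{}]interface{}) map[interface{}]interface{} {
	out := make(map[interface{}]interface{}, len(src))
	for k, v := range src {
		out[k] = v
	}
	return out
}

func main() {
	original := map[interface{}]interface{}{"a": &item{Note: "old"}}
	snapshot := copyMap(original)

	// Mutating the shared value is visible through both maps.
	snapshot["a"].(*item).Note = "new"
	fmt.Println(original["a"].(*item).Note)

	// Deleting from the copy leaves the original untouched.
	delete(snapshot, "a")
	fmt.Println(len(original), len(snapshot))
}
```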

func (*Scheduler) Remove added in v1.1.2

func (s *Scheduler) Remove(key, value interface{}) bool

Remove takes the key out of all buckets. If value is non-nil, the key will only be removed if it has the same value. Returns true if the key was removed.
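The conditional removal semantics can be sketched against a single map. This is an illustration only: remove is a hypothetical function, it operates on one map rather than a set of buckets, and it assumes values are comparable with ==.

```go
package main

import "fmt"

// remove deletes key from items. When value is non-nil, the key is only
// deleted if the stored value equals value. It reports whether the key
// was removed. Values must be comparable.
func remove(items map[interface{}]interface{}, key, value interface{}) bool {
	old, ok := items[key]
	if !ok {
		return false
	}
	if value != nil && old != value {
		return false
	}
	delete(items, key)
	return true
}

func main() {
	items := map[interface{}]interface{}{"a": 1, "b": 5}
	fmt.Println(remove(items, "a", 2))   // stored value differs
	fmt.Println(remove(items, "a", 1))   // stored value matches
	fmt.Println(remove(items, "b", nil)) // unconditional removal
}
```

Passing the value a caller last saw lets it avoid removing an entry that was replaced by a newer Add in the meantime.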

func (*Scheduler) RunOnce added in v1.1.2

func (s *Scheduler) RunOnce()

RunOnce takes a single item out of the current bucket and processes it. If the bucket is empty, we wait for the rate limiter before returning.

func (*Scheduler) RunUntil added in v1.1.2

func (s *Scheduler) RunUntil(ch <-chan struct{})

RunUntil launches the scheduler until ch is closed.

type StoppableController added in v1.1.2

type StoppableController interface {
	// RunUntil starts the asynchronous controller loop, which runs until
	// ch is closed.
	RunUntil(ch <-chan struct{})
}

StoppableController is a controller which implements a RunUntil loop that runs until the provided channel is closed.
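The stop-channel pattern behind RunUntil can be sketched as below. This is a generic illustration, not the package's implementation: runUntil is a hypothetical helper, and the returned done channel is an addition that lets the caller wait for the loop to exit.

```go
package main

import "fmt"

// runUntil calls step repeatedly in a new goroutine until ch is closed.
// The returned channel is closed once the loop has exited.
func runUntil(step func(), ch <-chan struct{}) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case <-ch:
				return // stop requested
			default:
				step()
			}
		}
	}()
	return done
}

func main() {
	stop := make(chan struct{})
	count := 0
	done := runUntil(func() {
		count++
		if count == 3 {
			close(stop) // ask the loop to stop after three steps
		}
	}, stop)
	<-done
	fmt.Println("steps run:", count)
}
```

Closing the channel, rather than sending on it, lets any number of loops observe the same stop signal.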
