Documentation ¶
Overview ¶
Package controller provides reusable support for controller implementations.
Index ¶
- func RetryAlways(obj interface{}, err error, retries Retry) bool
- func RetryNever(obj interface{}, err error, retries Retry) bool
- type Queue
- type QueueRetryManager
- type ReQueue
- type Retry
- type RetryController
- type RetryFunc
- type RetryManager
- type RunnableController
- type Scheduler
- func (s *Scheduler) Add(key, value interface{})
- func (s *Scheduler) Delay(key interface{})
- func (s *Scheduler) Len() int
- func (s *Scheduler) Map() map[interface{}]interface{}
- func (s *Scheduler) Remove(key, value interface{}) bool
- func (s *Scheduler) RunOnce()
- func (s *Scheduler) RunUntil(ch <-chan struct{})
- type StoppableController
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RetryAlways ¶
RetryNever is a RetryFunc implementation that will always retry
func RetryNever ¶
RetryNever is a RetryFunc implementation that will never retry
Types ¶
type Queue ¶
type Queue interface {
Pop() interface{}
}
Queue is a narrow abstraction of a cache.FIFO.
type QueueRetryManager ¶
type QueueRetryManager struct {
// contains filtered or unexported fields
}
QueueRetryManager retries a resource by re-queueing it into a ReQueue as long as retryFunc returns true.
func NewQueueRetryManager ¶
func NewQueueRetryManager(queue ReQueue, keyFn kcache.KeyFunc, retryFn RetryFunc, limiter flowcontrol.RateLimiter) *QueueRetryManager
NewQueueRetryManager safely creates a new QueueRetryManager.
func (*QueueRetryManager) Forget ¶
func (r *QueueRetryManager) Forget(resource interface{})
Forget resets the retry count for resource.
func (*QueueRetryManager) Retry ¶
func (r *QueueRetryManager) Retry(resource interface{}, err error)
Retry will enqueue resource until retryFunc returns false for that resource has been exceeded, at which point resource will be forgotten and no longer retried. The current retry count will be passed to each invocation of retryFunc.
type Retry ¶
type Retry struct { // Count is the number of retries Count int // StartTimestamp is retry start timestamp StartTimestamp unversioned.Time }
Retry describes provides additional information regarding retries.
type RetryController ¶
type RetryController struct { // Queue is where work is retrieved for Handle. Queue // Handle is expected to process the next resource from the queue. Handle func(interface{}) error // RetryManager is fed the handled resource if Handle returns a Retryable // error. If Handle returns no error, the RetryManager is asked to forget // the resource. RetryManager }
RetryController is a RunnableController which delegates resource handling to a function and knows how to safely manage retries of a resource which failed to be successfully handled.
func (*RetryController) Run ¶
func (c *RetryController) Run()
Run begins processing resources from Queue asynchronously.
func (*RetryController) RunUntil ¶
func (c *RetryController) RunUntil(stopCh <-chan struct{})
RunUntil begins processing resources from Queue asynchronously until stopCh is closed.
type RetryFunc ¶
RetryFunc should return true if the given object and error should be retried after the provided number of times.
type RetryManager ¶
type RetryManager interface { // Retry will cause resource processing to be retried (for example, by // requeueing resource) Retry(resource interface{}, err error) // Forget will cause the manager to erase all prior knowledge of resource // and reclaim internal resources associated with state tracking of // resource. Forget(resource interface{}) }
RetryManager knows how to retry processing of a resource, and how to forget a resource it may be tracking the state of.
type RunnableController ¶
type RunnableController interface {
// Run starts the asynchronous controller loop.
Run()
}
RunnableController is a controller which implements a Run loop.
type Scheduler ¶ added in v1.1.2
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler is a self-balancing, rate-limited, bucketed queue that can periodically invoke an action on all items in a bucket before moving to the next bucket. A ratelimiter sets an upper bound on the number of buckets processed per unit time. The queue has a key and a value, so both uniqueness and equality can be tested (key must be unique, value can carry info for the next processing). Items remain in the queue until removed by a call to Remove().
func NewScheduler ¶ added in v1.1.2
func NewScheduler(bucketCount int, bucketLimiter flowcontrol.RateLimiter, fn func(key, value interface{})) *Scheduler
NewScheduler creates a scheduler with bucketCount buckets, a rate limiter for restricting the rate at which buckets are processed, and a function to invoke when items are scanned in a bucket. TODO: remove DEBUG statements from this file once this logic has been adequately validated.
func (*Scheduler) Add ¶ added in v1.1.2
func (s *Scheduler) Add(key, value interface{})
Add places the key in the bucket with the least entries (except the current bucket). The key is used to determine uniqueness, while value can be used to associate additional data for later retrieval. An Add removes the previous key and value and will place the item in a new bucket. This allows callers to ensure that Add'ing a new item to the queue purges old versions of the item, while Remove can be conditional on removing only the known old version.
func (*Scheduler) Delay ¶ added in v1.1.2
func (s *Scheduler) Delay(key interface{})
Delay moves the key to the end of the chain if it exists.
func (*Scheduler) Map ¶ added in v1.1.2
func (s *Scheduler) Map() map[interface{}]interface{}
Map returns a copy of the scheduler contents, but does not copy the keys or values themselves. If values and keys are not immutable, changing the value will affect the value in the queue.
func (*Scheduler) Remove ¶ added in v1.1.2
Remove takes the key out of all buckets. If value is non-nil, the key will only be removed if it has the same value. Returns true if the key was removed.
type StoppableController ¶ added in v1.1.2
type StoppableController interface { // RunUntil starts the asynchronous controller loop, which runs until // ch is closed. RunUntil(ch <-chan struct{}) }
StoppableController is a controller which implements a Run loop.