controller

package
v1.15.2
Warning

This package is not in the latest version of its module.

Published: Mar 30, 2023 License: MPL-2.0 Imports: 14 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func RequeueAfter

func RequeueAfter(after time.Duration) error

RequeueAfter constructs a RequeueAfterError with the given duration setting.

func RequeueNow

func RequeueNow() error

RequeueNow constructs a RequeueAfterError that reschedules the Request immediately.
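As an illustration of how these constructors relate to RequeueAfterError, here is a minimal self-contained sketch. It redeclares the type locally rather than importing the package; the error message format, the choice of a zero duration for "immediately", and the requeueDelay helper are assumptions made for the example, not details confirmed by this page.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// RequeueAfterError mirrors the documented declaration: a duration that
// also satisfies the error interface.
type RequeueAfterError time.Duration

// Error implements the error interface. The message format is an assumption.
func (r RequeueAfterError) Error() string {
	return fmt.Sprintf("requeue after %s", time.Duration(r))
}

// RequeueAfter mirrors the documented constructor.
func RequeueAfter(after time.Duration) error { return RequeueAfterError(after) }

// RequeueNow mirrors the documented constructor. Modeling "immediately" as
// a zero duration is an assumption of this sketch.
func RequeueNow() error { return RequeueAfterError(0) }

// requeueDelay is a hypothetical helper: it reports whether err (or any
// error it wraps) is a RequeueAfterError and, if so, the requested delay.
func requeueDelay(err error) (time.Duration, bool) {
	var r RequeueAfterError
	if errors.As(err, &r) {
		return time.Duration(r), true
	}
	return 0, false
}

func main() {
	wrapped := fmt.Errorf("reconcile failed: %w", RequeueAfter(30*time.Second))
	if d, ok := requeueDelay(wrapped); ok {
		fmt.Println("requeue in", d)
	}
	if d, ok := requeueDelay(RequeueNow()); ok {
		fmt.Println("requeue in", d)
	}
	_, ok := requeueDelay(errors.New("plain failure"))
	fmt.Println("plain error requests requeue:", ok)
}
```

Because detection goes through errors.As, a Reconciler in this sketch can wrap the requeue error with extra context without hiding it from the caller.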

Types

type Controller

type Controller interface {
	// Run begins the Controller's main processing loop. When the given
	// context is canceled, the Controller stops processing any remaining work.
	// The Run function should only ever be called once.
	Run(ctx context.Context) error
	// Subscribe tells the controller to subscribe to updates for config entries based
	// on the given request. Optional transformation functions can also be passed in
	// to Subscribe, allowing a controller to map a config entry to a different type of
	// request under the hood (e.g. watching a dependency and triggering a Reconcile on
	// the dependent resource). This should only ever be called prior to calling Run.
	Subscribe(request *stream.SubscribeRequest, transformers ...Transformer) Controller
	// WithBackoff changes the base and maximum backoff values for the Controller's
	// Request retry rate limiter. This should only ever be called prior to
	// calling Run.
	WithBackoff(base, max time.Duration) Controller
	// WithLogger sets the logger for the controller. It should be called prior to
	// calling Run.
	WithLogger(logger hclog.Logger) Controller
	// WithWorkers sets the number of worker goroutines used to process the queue.
	// This defaults to 1 goroutine.
	WithWorkers(i int) Controller
	// WithQueueFactory allows a Controller to replace its underlying work queue
	// implementation. This is most useful for testing. This should only ever be called
	// prior to calling Run.
	WithQueueFactory(fn func(ctx context.Context, baseBackoff time.Duration, maxBackoff time.Duration) WorkQueue) Controller
	// AddTrigger allows for triggering a reconciliation request when a
	// triggering function returns. When the passed-in context is canceled,
	// the trigger must return.
	AddTrigger(request Request, trigger func(ctx context.Context) error)
	// RemoveTrigger removes the triggering function associated with the Request object
	RemoveTrigger(request Request)
	// Enqueue adds all of the given requests into the work queue.
	Enqueue(requests ...Request)
}

Controller subscribes to a set of watched resources from the state store and delegates processing them to a given Reconciler. If a Reconciler errors while processing a Request, then the Controller handles rescheduling the Request to be re-processed.

func New

func New(publisher state.EventPublisher, reconciler Reconciler) Controller

New returns a new Controller associated with the given event publisher and reconciler.

type DeferQueue

type DeferQueue interface {
	// Defer defers processing a Request until a given time. When
	// the timeout is hit, the request will be processed by the
	// callback given in the Process loop. If the given context
	// is canceled, the item is not deferred.
	Defer(ctx context.Context, item Request, until time.Time)
	// Process processes all items in the defer queue with the
	// given callback, blocking until the given context is canceled.
	// Callers should only ever call Process once, likely in a
	// long-lived goroutine.
	Process(ctx context.Context, callback func(item Request))
}

DeferQueue is a generic priority queue implementation that allows for deferring and later processing Requests.

func NewDeferQueue

func NewDeferQueue(tick time.Duration) DeferQueue

NewDeferQueue returns a priority queue for deferred Requests.
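To make the "priority queue for deferred Requests" idea concrete, the sketch below orders items by due time with the standard container/heap package. It is not the package's implementation: the deferHeap type, the popDue helper, and the trimmed Request struct are hypothetical, and the sketch leaves out the context handling and the ticking Process loop described above.

```go
package main

import (
	"container/heap"
	"fmt"
	"time"
)

// Request is a trimmed stand-in for the package's Request (Meta omitted).
type Request struct {
	Kind string
	Name string
}

// deferredItem pairs a Request with the time at which it becomes due.
type deferredItem struct {
	req   Request
	until time.Time
}

// deferHeap is a min-heap ordered by due time; it implements heap.Interface.
type deferHeap []deferredItem

func (h deferHeap) Len() int           { return len(h) }
func (h deferHeap) Less(i, j int) bool { return h[i].until.Before(h[j].until) }
func (h deferHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *deferHeap) Push(x any)        { *h = append(*h, x.(deferredItem)) }
func (h *deferHeap) Pop() any {
	old := *h
	item := old[len(old)-1]
	*h = old[:len(old)-1]
	return item
}

// popDue removes and returns every Request whose due time is not after now,
// earliest first. Items that are not yet due stay in the heap.
func popDue(h *deferHeap, now time.Time) []Request {
	var due []Request
	for h.Len() > 0 && !(*h)[0].until.After(now) {
		due = append(due, heap.Pop(h).(deferredItem).req)
	}
	return due
}

// dueNames defers three Requests and returns the names of those that are
// due two seconds after the start time.
func dueNames() []string {
	start := time.Date(2023, 3, 30, 0, 0, 0, 0, time.UTC)
	h := &deferHeap{}
	heap.Push(h, deferredItem{req: Request{Kind: "service-defaults", Name: "web"}, until: start.Add(2 * time.Second)})
	heap.Push(h, deferredItem{req: Request{Kind: "service-defaults", Name: "db"}, until: start.Add(5 * time.Second)})
	heap.Push(h, deferredItem{req: Request{Kind: "service-defaults", Name: "api"}, until: start.Add(1 * time.Second)})

	var names []string
	for _, r := range popDue(h, start.Add(2*time.Second)) {
		names = append(names, r.Name)
	}
	return names
}

func main() {
	fmt.Println(dueNames()) // prints [api web]
}
```

In a design like this, a Process-style loop could call popDue on every tick and hand each returned Request to its callback.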

type Limiter

type Limiter interface {
	// NextRetry returns the remaining time until the queue should
	// reprocess a Request.
	NextRetry(request Request) time.Duration
	// Forget causes the Limiter to reset the backoff for the Request.
	Forget(request Request)
}

Limiter is an interface for a rate limiter that can limit the number of retries processed in the work queue.

func NewRateLimiter

func NewRateLimiter(base, max time.Duration) Limiter

NewRateLimiter returns a Limiter that does per-item exponential backoff.
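Per-item exponential backoff can be sketched as follows. This is an assumption-laden illustration rather than the package's limiter: the backoffLimiter type is hypothetical, the delay is assumed to be base doubled once per earlier failure and capped at max, and each NextRetry call is assumed to count as one failure for that Request.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Request is a trimmed stand-in for the package's Request (Meta omitted).
type Request struct{ Kind, Name string }

// backoffLimiter is a hypothetical limiter that tracks failures per Request.
type backoffLimiter struct {
	mu        sync.Mutex
	base, max time.Duration
	failures  map[Request]int
}

func newBackoffLimiter(base, max time.Duration) *backoffLimiter {
	return &backoffLimiter{base: base, max: max, failures: make(map[Request]int)}
}

// NextRetry records one more failure for r and returns the delay before the
// next attempt: base doubled once per earlier failure, capped at max.
func (l *backoffLimiter) NextRetry(r Request) time.Duration {
	l.mu.Lock()
	defer l.mu.Unlock()
	n := l.failures[r]
	l.failures[r] = n + 1
	delay := l.base
	// Stop doubling once the cap is reached, which also avoids overflow.
	for i := 0; i < n && delay < l.max; i++ {
		delay *= 2
	}
	if delay > l.max {
		delay = l.max
	}
	return delay
}

// Forget resets the backoff for r.
func (l *backoffLimiter) Forget(r Request) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.failures, r)
}

// demoDelays returns, in milliseconds, five consecutive delays for one
// Request followed by the delay after Forget.
func demoDelays() []int64 {
	l := newBackoffLimiter(100*time.Millisecond, time.Second)
	web := Request{Kind: "service-defaults", Name: "web"}
	var out []int64
	for i := 0; i < 5; i++ {
		out = append(out, l.NextRetry(web).Milliseconds())
	}
	l.Forget(web)
	return append(out, l.NextRetry(web).Milliseconds())
}

func main() {
	fmt.Println(demoDelays()) // prints [100 200 400 800 1000 100]
}
```

Because the failure count is keyed by Request, one failing config entry backs off without slowing retries for any other entry.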

type Reconciler

type Reconciler interface {
	// Reconcile performs a reconciliation on the config entry referred to by the Request.
	// The Controller will requeue the Request to be processed again if the returned error
	// is non-nil. If no error is returned, the Request will be removed from the working queue.
	Reconcile(context.Context, Request) error
}

Reconciler is the main implementation interface for Controllers. A Reconciler receives any change notifications for config entries that the controller is subscribed to and processes them with its Reconcile function.
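A Reconciler implementation might look like the sketch below. The interface is redeclared locally so the example stands alone; storeReconciler, its two maps, and the "empty configuration" failure are hypothetical and exist only to show the nil and non-nil return paths described in the interface comment.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Request is a trimmed stand-in for the package's Request (Meta omitted).
type Request struct{ Kind, Name string }

// Reconciler repeats the documented interface.
type Reconciler interface {
	Reconcile(context.Context, Request) error
}

// storeReconciler is a hypothetical Reconciler that copies desired
// configuration into an observed map.
type storeReconciler struct {
	desired  map[Request]string
	observed map[Request]string
}

// Compile-time check that storeReconciler satisfies Reconciler.
var _ Reconciler = (*storeReconciler)(nil)

// Reconcile looks up the entry by Request and converges observed state
// toward desired state. A non-nil error signals that the Request should be
// retried; nil signals that it is finished.
func (s *storeReconciler) Reconcile(ctx context.Context, req Request) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	want, ok := s.desired[req]
	if !ok {
		// The entry no longer exists, so remove anything built from it.
		delete(s.observed, req)
		return nil
	}
	if want == "" {
		return errors.New("empty configuration")
	}
	s.observed[req] = want
	return nil
}

// demo reconciles one valid and one invalid entry and reports what was
// applied and whether the invalid entry was rejected.
func demo() (applied string, rejected bool) {
	web := Request{Kind: "service-defaults", Name: "web"}
	bad := Request{Kind: "service-defaults", Name: "bad"}
	r := &storeReconciler{
		desired:  map[Request]string{web: "protocol=http", bad: ""},
		observed: map[Request]string{},
	}
	ctx := context.Background()
	_ = r.Reconcile(ctx, web)
	err := r.Reconcile(ctx, bad)
	return r.observed[web], err != nil
}

func main() {
	applied, rejected := demo()
	fmt.Println(applied, rejected) // prints protocol=http true
}
```

Since a Request carries only identifying fields, the sketch re-reads the current state on every call, which keeps Reconcile safe to run repeatedly for the same Request.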

type Request

type Request struct {
	Kind string
	Name string
	Meta *acl.EnterpriseMeta
}

Request contains the information necessary to reconcile a config entry. This includes only the information required to uniquely identify the config entry.

type RequeueAfterError

type RequeueAfterError time.Duration

RequeueAfterError is an error that allows a Reconciler to override the exponential backoff behavior of the Controller. Rather than applying the backoff algorithm, returning a RequeueAfterError will cause the Controller to reschedule the Request at a given time in the future.

func (RequeueAfterError) Error

func (r RequeueAfterError) Error() string

Error implements the error interface.

type Transformer

type Transformer func(entry structs.ConfigEntry) []Request

Transformer is a function that takes one type of config entry that has changed and transforms that into a set of reconciliation requests to enqueue.
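The mapping from a changed dependency to its dependents can be sketched like this. The configEntry interface is a two-method stand-in for structs.ConfigEntry, and the dependentsOf helper, its index map, and the kind names used in main are illustrative assumptions.

```go
package main

import "fmt"

// Request is a trimmed stand-in for the package's Request (Meta omitted).
type Request struct{ Kind, Name string }

// configEntry is a minimal stand-in for structs.ConfigEntry, reduced to the
// two accessors this sketch needs.
type configEntry interface {
	GetKind() string
	GetName() string
}

// Transformer has the documented shape, with configEntry substituted for
// structs.ConfigEntry.
type Transformer func(entry configEntry) []Request

// entry is a hypothetical config entry used only for the demo.
type entry struct{ kind, name string }

func (e entry) GetKind() string { return e.kind }
func (e entry) GetName() string { return e.name }

// dependentsOf builds a Transformer from an index that maps a dependency's
// name to the names of the resources that depend on it. Each dependent
// becomes a Request of the given kind.
func dependentsOf(kind string, index map[string][]string) Transformer {
	return func(e configEntry) []Request {
		names := index[e.GetName()]
		reqs := make([]Request, 0, len(names))
		for _, n := range names {
			reqs = append(reqs, Request{Kind: kind, Name: n})
		}
		return reqs
	}
}

func main() {
	// A certificate change should trigger reconciliation of both gateways.
	t := dependentsOf("api-gateway", map[string][]string{
		"cert": {"gw-1", "gw-2"},
	})
	fmt.Println(t(entry{kind: "inline-certificate", name: "cert"}))
	// prints [{api-gateway gw-1} {api-gateway gw-2}]
}
```

A function of this shape is what would be passed as one of the transformers arguments to Subscribe, so that an update to the watched kind enqueues Requests for a different kind.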

type WorkQueue

type WorkQueue interface {
	// Get retrieves the next Request in the queue, blocking until a Request is
	// available. If shutdown is true, then the queue is shutting down and should
	// no longer be used by the caller.
	Get() (item Request, shutdown bool)
	// Add immediately adds a Request to the work queue.
	Add(item Request)
	// AddAfter adds a Request to the work queue after a given amount of time.
	AddAfter(item Request, duration time.Duration)
	// AddRateLimited adds a Request to the work queue after the amount of time
	// specified by applying the queue's rate limiter.
	AddRateLimited(item Request)
	// Forget signals the queue to reset the rate-limiting for the given Request.
	Forget(item Request)
	// Done tells the work queue that the Request has been successfully processed
	// and can be deleted from the queue.
	Done(item Request)
}

WorkQueue is an interface for a work queue with semantics to help with retries and rate limiting.

func RunWorkQueue

func RunWorkQueue(ctx context.Context, baseBackoff, maxBackoff time.Duration) WorkQueue

RunWorkQueue returns a started WorkQueue that has per-Request exponential backoff rate-limiting. When the passed in context is canceled, the queue shuts down.
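The sketch below shows one plausible way a worker could drive the WorkQueue methods around a reconcile call. It is an assumption about usage, not the Controller's actual loop: the drain function and the recordingQueue test double are hypothetical, and the double never blocks, reporting shutdown as soon as it is empty.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Request is a trimmed stand-in for the package's Request (Meta omitted).
type Request struct{ Kind, Name string }

// RequeueAfterError mirrors the documented type; the message is an assumption.
type RequeueAfterError time.Duration

func (r RequeueAfterError) Error() string { return "requeue after " + time.Duration(r).String() }

// WorkQueue repeats the documented method set.
type WorkQueue interface {
	Get() (item Request, shutdown bool)
	Add(item Request)
	AddAfter(item Request, duration time.Duration)
	AddRateLimited(item Request)
	Forget(item Request)
	Done(item Request)
}

// recordingQueue is a hypothetical single-goroutine test double. It logs
// each call instead of scheduling anything.
type recordingQueue struct {
	pending []Request
	log     []string
}

var _ WorkQueue = (*recordingQueue)(nil)

func (q *recordingQueue) Get() (Request, bool) {
	if len(q.pending) == 0 {
		return Request{}, true
	}
	item := q.pending[0]
	q.pending = q.pending[1:]
	return item, false
}
func (q *recordingQueue) Add(item Request) { q.pending = append(q.pending, item) }
func (q *recordingQueue) AddAfter(item Request, d time.Duration) {
	q.log = append(q.log, fmt.Sprintf("after(%s,%s)", item.Name, d))
}
func (q *recordingQueue) AddRateLimited(item Request) {
	q.log = append(q.log, "ratelimited("+item.Name+")")
}
func (q *recordingQueue) Forget(item Request) { q.log = append(q.log, "forget("+item.Name+")") }
func (q *recordingQueue) Done(item Request)   { q.log = append(q.log, "done("+item.Name+")") }

// drain processes Requests until the queue reports shutdown. Success resets
// the backoff, a RequeueAfterError schedules a fixed delay, and any other
// error falls back to the rate limiter.
func drain(ctx context.Context, q WorkQueue, reconcile func(context.Context, Request) error) {
	for {
		req, shutdown := q.Get()
		if shutdown {
			return
		}
		err := reconcile(ctx, req)
		var after RequeueAfterError
		switch {
		case err == nil:
			q.Forget(req)
		case errors.As(err, &after):
			q.Forget(req)
			q.AddAfter(req, time.Duration(after))
		default:
			q.AddRateLimited(req)
		}
		q.Done(req)
	}
}

// demoLog runs drain over three Requests with different outcomes.
func demoLog() []string {
	q := &recordingQueue{}
	for _, name := range []string{"ok", "later", "broken"} {
		q.Add(Request{Kind: "service-defaults", Name: name})
	}
	drain(context.Background(), q, func(_ context.Context, r Request) error {
		switch r.Name {
		case "later":
			return RequeueAfterError(time.Minute)
		case "broken":
			return errors.New("boom")
		}
		return nil
	})
	return q.log
}

func main() {
	for _, line := range demoLog() {
		fmt.Println(line)
	}
}
```

Calling Done on every path, including the failure paths, is a choice made in this sketch so that the queue always learns that processing of the item has ended before any retry is delivered.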
