operator

package
v0.9.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 12, 2023 License: Apache-2.0 Imports: 15 Imported by: 6

Documentation

Index

Constants

View Source
const (
	ResourceActionCreate = ResourceAction("CREATE")
	ResourceActionUpdate = ResourceAction("UPDATE")
	ResourceActionDelete = ResourceAction("DELETE")
)

Variables

View Source
var DefaultRetryPolicy = ExponentialBackoffRetryPolicy(5*time.Second, 5)

DefaultRetryPolicy is an Exponential Backoff RetryPolicy with an initial 5-second delay and a max of 5 attempts

View Source
var EmptyLabelFilters []string
View Source
var ErrInformerAlreadyAdded = errors.New("informer for resource kind already added")

ErrInformerAlreadyAdded indicates that there is already an informer for the resource kind mapped

View Source
var OpinionatedRetryDequeuePolicy = func(newAction ResourceAction, newObject resource.Object, retryAction ResourceAction, retryObject resource.Object, retryError error) bool {
	if newAction == ResourceActionDelete {
		return true
	}
	if newAction != retryAction {
		return false
	}
	if getGeneration(newObject) == getGeneration(retryObject) {
		return false
	}
	return true
}

OpinionatedRetryDequeuePolicy is a RetryDequeuePolicy which has the following logic: 1. If the newAction is a delete, dequeue the retry 2. If the newAction and retryAction are different, keep the retry (for example, a queued create retry, and a received update action) 3. If the generation of newObject and retryObject is the same, keep the retry 4. Otherwise, dequeue the retry

Functions

func DefaultFinalizerSupplier

func DefaultFinalizerSupplier(sch resource.Schema) string

DefaultFinalizerSupplier crates finalizer following to pattern `operator.{version}.{kind}.{group}`.

Types

type Controller

type Controller interface {
	Run(<-chan struct{}) error
}

Controller is an interface that describes a controller which can be run as part of an operator

type ConvertableIntoResourceObject

type ConvertableIntoResourceObject interface {
	Into(object resource.Object) error
}

ConvertableIntoResourceObject describes any object which can be marshaled into a resource.Object. This is specifically useful for objects which may wrap underlying data which can be marshaled into a resource.Object, but need the exact implementation provided to them (by `into`).

type FinalizerSupplier

type FinalizerSupplier func(sch resource.Schema) string

FinalizerSupplier represents a function that creates string finalizer from provider schema.

type Informer

type Informer interface {
	AddEventHandler(handler ResourceWatcher) error
	Run(stopCh <-chan struct{}) error
}

Informer is an interface describing an informer which can be managed by InformerController

type InformerController

type InformerController struct {
	// ErrorHandler is a user-specified error handling function. This is typically for logging/metrics use,
	// as retry logic is covered by the RetryPolicy.
	ErrorHandler func(error)
	// RetryPolicy is a user-specified retry logic function which will be used when ResourceWatcher function calls fail.
	RetryPolicy RetryPolicy
	// RetryDequeuePolicy is a user-specified retry dequeue logic function which will be used for new informer actions
	// when one or more retries for the object are still pending. If not present, existing retries are always dequeued.
	RetryDequeuePolicy RetryDequeuePolicy
	// contains filtered or unexported fields
}

InformerController is an object that handles coordinating informers and observers. Unlike adding a Watcher directly to an Informer with AddEventHandler, the InformerController guarantees sequential execution of watchers, based on add order.

func NewInformerController

func NewInformerController() *InformerController

NewInformerController creates a new controller

func (*InformerController) AddInformer

func (c *InformerController) AddInformer(informer Informer, resourceKind string) error

AddInformer adds an informer for a specific resourceKind. The `resourceKind` string is used for internal tracking and correlation to observers, and does not necessarily need to match the informer's type.

Multiple informers may be added for the same resource kind, and each will trigger all watchers for that resource kind. The most common usage of this is to have informers partitioned by namespace or labels for the same resource kind, which share a watcher.

func (*InformerController) AddWatcher

func (c *InformerController) AddWatcher(watcher ResourceWatcher, resourceKind string) error

AddWatcher adds an observer to an informer with a matching `resourceKind`. Any time the informer sees an add, update, or delete, it will call the observer's corresponding method. Multiple watchers can exist for the same resource kind. They will be run in the order they were added to the informer.

func (*InformerController) RemoveAllWatchersForResource

func (c *InformerController) RemoveAllWatchersForResource(resourceKind string)

RemoveAllWatchersForResource removes all watchers for a specific resourceKind

func (*InformerController) RemoveWatcher

func (c *InformerController) RemoveWatcher(watcher ResourceWatcher, resourceKind string)

RemoveWatcher removes the given ResourceWatcher from the list for the resourceKind, provided it exists in the list.

func (*InformerController) Run

func (c *InformerController) Run(stopCh <-chan struct{}) error

Run runs the controller, which starts all informers, until stopCh is closed

type KubernetesBasedInformer

type KubernetesBasedInformer struct {
	ErrorHandler        func(error)
	SharedIndexInformer cache.SharedIndexInformer
	// contains filtered or unexported fields
}

KubernetesBasedInformer is a k8s apimachinery-based informer. It wraps a k8s cache.SharedIndexInformer, and works most optimally with a client that has a Watch response that implements KubernetesCompatibleWatch.

func NewKubernetesBasedInformer

func NewKubernetesBasedInformer(sch resource.Schema, client ListWatchClient, namespace string) (
	*KubernetesBasedInformer, error)

NewKubernetesBasedInformer creates a new KubernetesBasedInformer for the provided schema and namespace, using the ListWatchClient provided to do its List and Watch requests.

func NewKubernetesBasedInformerWithFilters

func NewKubernetesBasedInformerWithFilters(sch resource.Schema, client ListWatchClient, namespace string, labelFilters []string) (
	*KubernetesBasedInformer, error)

NewKubernetesBasedInformerWithFilters creates a new KubernetesBasedInformer for the provided schema and namespace, using the ListWatchClient provided to do its List and Watch requests applying provided labelFilters if it is not empty.

func (*KubernetesBasedInformer) AddEventHandler

func (k *KubernetesBasedInformer) AddEventHandler(handler ResourceWatcher) error

AddEventHandler adds a ResourceWatcher as an event handler for watch events from the informer. Event handlers are not guaranteed to be executed in parallel or in any particular order by the underlying kubernetes apimachinery code. If you want to coordinate ResourceWatchers, use am InformerController.

func (*KubernetesBasedInformer) Run

func (k *KubernetesBasedInformer) Run(stopCh <-chan struct{}) error

Run starts the informer and blocks until stopCh receives a message

func (*KubernetesBasedInformer) Schema

Schema returns the resource.Schema this informer is set up for

type KubernetesCompatibleWatch

type KubernetesCompatibleWatch interface {
	KubernetesWatch() watch.Interface
}

KubernetesCompatibleWatch describes a watch response that either is wrapping a kubernetes watch.Interface, or can return a compatibility layer that implements watch.Interface.

type ListMap

type ListMap[K comparable, V any] struct {
	// contains filtered or unexported fields
}

ListMap is a map of lists which is thread-safe, with read and write distinction. The underlying map and slice(s) are not directly accessible, as it would prevent the read/write safety.

func NewListMap

func NewListMap[T any]() *ListMap[string, T]

NewListMap returns a pointer to a new properly-initialized ListMap. The type parameter is the type of elements in the lists

func (*ListMap[K, V]) AddItem

func (l *ListMap[K, V]) AddItem(key K, items ...V)

AddItem adds one or more items to the list for a key. If the key does not exist, it will be added. AddItem locks the particular list for writing, so simultaneous AddItem calls for the same key will be sequential, and simultaneous AddItem calls for different keys will not impact each other.

func (*ListMap[K, V]) ItemAt

func (l *ListMap[K, V]) ItemAt(key K, index int) (V, bool)

ItemAt returns the item at index `index` in the list for the map key `key`. If the index or key do not exist, it will return an empty value, and a false.

func (*ListMap[K, V]) KeySize

func (l *ListMap[K, V]) KeySize(key K) int

KeySize returns the current length of the list for a key.

func (*ListMap[K, V]) Keys

func (l *ListMap[K, V]) Keys() []K

Keys returns a list of all keys in the map at the time of the call. It does not lock the map for writing, so new keys may be added during this call which may not be present in the result.

func (*ListMap[K, V]) Range

func (l *ListMap[K, V]) Range(key K, rangeFunc func(index int, value V))

Range performs a range operation for a given key's list. It consumes a rangeFunc, which takes arguments identical to a traditional go `range` function: `index` is the index within the list of the current item, and `value` is the reused pointer to the item.

func (*ListMap[K, V]) RangeAll

func (l *ListMap[K, V]) RangeAll(rangeFunc func(key K, index int, value V))

RangeAll ranges through all keys in the map, and all items in each key's list. It calls rangeFunc for each item in all lists. `key` is the list's map key, `index` is the index within the list, and `value` is the value pointer, as in a normal `range` operation.

func (*ListMap[K, V]) RemoveItem

func (l *ListMap[K, V]) RemoveItem(key K, match func(V) bool) bool

RemoveItem removes the first item in the list for a key which satisfies the `match` function. If the key does not exist, or no item in the list satisfies the `match` function, it is a no-op. The function returns `true` if an item was deleted, and `false` otherwise. The delete preserves list order after delete, meaning that all items subsequent to the index are left-shifted.

func (*ListMap[K, V]) RemoveItemAt

func (l *ListMap[K, V]) RemoveItemAt(key K, index int)

RemoveItemAt removes a specific index from the list for a key. If the key or index does not exist, it is a no-op. The delete preserves list order after delete, meaning that all items subsequent to the index are left-shifted.

func (*ListMap[K, V]) RemoveItems added in v0.9.9

func (l *ListMap[K, V]) RemoveItems(key K, match func(V) bool, limit int) int

RemoveItems removes the first N (`limit`) items in the list for a key which satisfies the `match` function. If `limit` is less than 1, there is no limit to the number of items which can be removed. If the key does not exist, or no item in the list satisfies the `match` function, it is a no-op. The function returns the number of removed items. The delete preserves list order after delete, meaning that all items subsequent to the index are left-shifted.

func (*ListMap[K, V]) RemoveKey

func (l *ListMap[K, V]) RemoveKey(key K)

RemoveKey removes the key from the map. If the key has a list, the list is deleted.

func (*ListMap[K, V]) Size

func (l *ListMap[K, V]) Size() int

Size returns the current size of the map.

type ListWatchClient

type ListWatchClient interface {
	ListInto(ctx context.Context, namespace string, options resource.ListOptions, into resource.ListObject) error
	Watch(ctx context.Context, namespace string, options resource.WatchOptions) (resource.WatchResponse, error)
}

ListWatchClient describes a client which can do ListInto and Watch requests.

type Operator

type Operator struct {
	// contains filtered or unexported fields
}

Operator is the highest-level construct of the `operator` package, and contains one or more controllers which can be run. Operator handles scaling and error propagation for its underlying controllers

func New

func New() *Operator

New creates a new Operator

func (*Operator) AddController

func (o *Operator) AddController(c Controller)

AddController adds a new controller to the operator. If called after `Run`, it will not be added to the currently-running controllers.

func (*Operator) Run

func (o *Operator) Run(stopCh <-chan struct{}) error

Run runs the operator until an unrecoverable error occurs or the stopCh is closed/receives a message.

type OpinionatedWatcher

type OpinionatedWatcher struct {
	AddFunc    func(ctx context.Context, object resource.Object) error
	UpdateFunc func(ctx context.Context, old resource.Object, new resource.Object) error
	DeleteFunc func(ctx context.Context, object resource.Object) error
	SyncFunc   func(ctx context.Context, object resource.Object) error
	// contains filtered or unexported fields
}

OpinionatedWatcher is a ResourceWatcher implementation that handles extra state logic, ensuring that downtime and restarts will not result in missed events. It does this via a few mechanisms is transparently handles for the user:

It adds a finalizer for all newly-created resources. This ensures that deletes cannot complete until the finalizer is removed, so the event will not be missed if the operator is down.

It only removes the finalizer after a successful call to `DeleteFunc`, which ensures that the resource is only deleted once the handler has succeeded.

On startup, it is able to differentiate between `Add` events, which are newly-created resources the operator has not yet handled, and `Add` events which are previously-created resources that have already been handled by the operator. Fully new resources call the `AddFunc` handler, and previously-created call the `SyncFunc` handler.

`Update` events which do not update anything in the spec or significant parts of the metadata are ignored.

OpinionatedWatcher contains unexported fields, and must be created with NewOpinionatedWatcher

func NewOpinionatedWatcher

func NewOpinionatedWatcher(sch resource.Schema, client PatchClient) (*OpinionatedWatcher, error)

NewOpinionatedWatcher sets up a new OpinionatedWatcher and returns a pointer to it.

func NewOpinionatedWatcherWithFinalizer

func NewOpinionatedWatcherWithFinalizer(sch resource.Schema, client PatchClient, supplier FinalizerSupplier) (*OpinionatedWatcher, error)

NewOpinionatedWatcherWithFinalizer sets up a new OpinionatedWatcher with finalizer from provided supplier and returns a pointer to it.

func (*OpinionatedWatcher) Add

func (o *OpinionatedWatcher) Add(ctx context.Context, object resource.Object) error

Add is part of implementing ResourceWatcher, and calls the underlying AddFunc, SyncFunc, or DeleteFunc based upon internal logic. When the object is first added, AddFunc is called and a finalizer is attached to it. Subsequent calls to Add will check the finalizer list and call SyncFunc if the finalizer is already attached, or if ObjectMetadata.DeletionTimestamp is non-nil, they will call DeleteFunc and remove the finalizer (the finalizer prevents the resource from being hard deleted until it is removed).

func (*OpinionatedWatcher) Delete

Delete exists to implement ResourceWatcher, but, due to deletes only happening after the finalizer is removed, this function does nothing.

func (*OpinionatedWatcher) Update

Update is part of implementing ResourceWatcher and calls the underlying UpdateFunc or DeleteFunc based on internal logic. If the new object has a non-nil ObjectMetadata.DeletionTimestamp in its metadata, DeleteFunc will be called, and the object's finalizer will be removed to allow kubernetes to hard delete it. Otherwise, UpdateFunc is called, provided the update is non-trivial (that is, the metadata.Generation has changed).

func (*OpinionatedWatcher) Wrap

func (o *OpinionatedWatcher) Wrap(watcher ResourceWatcher, syncToAdd bool)

Wrap wraps the Add, Update, and Delete calls in another ResourceWatcher by having the AddFunc call watcher. Add, UpdateFunc call watcher.Update, and DeleteFunc call watcher.Delete. If syncToAdd is true, SyncFunc will also call resource.Add. If it is false, SyncFunc will not be assigned.

type PatchClient

PatchClient is a Client capable of making PatchInto requests. This is used by OpinionatedWatch to update finalizers.

type ResourceAction added in v0.9.9

type ResourceAction string

type ResourceObjectWrapper

type ResourceObjectWrapper interface {
	ResourceObject() resource.Object
}

ResourceObjectWrapper describes anything which wraps a resource.Object, such that it can be extracted.

type ResourceWatcher

type ResourceWatcher interface {
	Add(context.Context, resource.Object) error
	Update(ctx context.Context, old, new resource.Object) error
	Delete(context.Context, resource.Object) error
}

ResourceWatcher describes an object which handles Add/Update/Delete actions for a resource

type RetryDequeuePolicy added in v0.9.9

type RetryDequeuePolicy func(newAction ResourceAction, newObject resource.Object, retryAction ResourceAction, retryObject resource.Object, retryError error) bool

RetryDequeuePolicy is a function that defines when a retry should be dequeued when a new action is taken on a resource. It accepts information about the new action being taken, and information about the current queued retry, and returns `true` if the retry should be dequeued. A RetryDequeuePolicy may be called multiple times for the same action, depending on the number of pending retries for the object.

type RetryPolicy

type RetryPolicy func(err error, attempt int) (bool, time.Duration)

RetryPolicy is a function that defines whether an event should be retried, based on the error and number of attempts. It returns a boolean indicating whether another attempt should be made, and a time.Duration after which that attempt should be made again.

func ExponentialBackoffRetryPolicy

func ExponentialBackoffRetryPolicy(initialDelay time.Duration, maxAttempts int) RetryPolicy

ExponentialBackoffRetryPolicy returns an Exponential Backoff RetryPolicy function, which follows the following formula: retry time = initialDelay * (2^attempt). If maxAttempts is exceeded, it will return false for the retry.

type SimpleWatcher

type SimpleWatcher struct {
	AddFunc    func(context.Context, resource.Object) error
	UpdateFunc func(context.Context, resource.Object, resource.Object) error
	DeleteFunc func(context.Context, resource.Object) error
}

SimpleWatcher is a struct that implements ResourceWatcher, but takes no action on its own. For each method in (Add, Update, Delete) the corresponding exported function field is called, if non-nil.

func (*SimpleWatcher) Add

func (w *SimpleWatcher) Add(ctx context.Context, object resource.Object) error

Add calls AddFunc, if non-nil

func (*SimpleWatcher) Delete

func (w *SimpleWatcher) Delete(ctx context.Context, object resource.Object) error

Delete calls DeleteFunc, if non-nil

func (*SimpleWatcher) Update

func (w *SimpleWatcher) Update(ctx context.Context, old resource.Object, new resource.Object) error

Update calls UpdateFunc, if non-nil

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL