cache

package
v0.27.5
Published: Aug 24, 2023 License: Apache-2.0 Imports: 36 Imported by: 49,632

Documentation

Overview

Package cache is a client-side caching mechanism. It is useful for reducing the number of server calls you'd otherwise need to make. Reflector watches a server and updates a Store. Two stores are provided; one that simply caches objects (for example, to allow a scheduler to list currently available nodes), and one that additionally acts as a FIFO queue (for example, to allow a scheduler to process incoming pods).

Example
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()

// This will hold the downstream state, as we know it.
downstream := NewStore(DeletionHandlingMetaNamespaceKeyFunc)

// This will hold incoming changes. Note how we pass downstream in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
	KeyFunction:  MetaNamespaceKeyFunc,
	KnownObjects: downstream,
})

// Let's do threadsafe output to get predictable test results.
deletionCounter := make(chan string, 1000)

cfg := &Config{
	Queue:            fifo,
	ListerWatcher:    source,
	ObjectType:       &v1.Pod{},
	FullResyncPeriod: time.Millisecond * 100,
	RetryOnError:     false,

	// Let's implement a simple controller that just deletes
	// everything that comes in.
	Process: func(obj interface{}, isInInitialList bool) error {
		// Obj is from the Pop method of the Queue we make above.
		newest := obj.(Deltas).Newest()

		if newest.Type != Deleted {
			// Update our downstream store.
			err := downstream.Add(newest.Object)
			if err != nil {
				return err
			}

			// Delete this object.
			source.Delete(newest.Object.(runtime.Object))
		} else {
			// Update our downstream store.
			err := downstream.Delete(newest.Object)
			if err != nil {
				return err
			}

			// fifo's KeyOf is easiest, because it handles
			// DeletedFinalStateUnknown markers.
			key, err := fifo.KeyOf(newest.Object)
			if err != nil {
				return err
			}

			// Report this deletion.
			deletionCounter <- key
		}
		return nil
	},
}

// Create the controller and run it until we close stop.
stop := make(chan struct{})
defer close(stop)
go New(cfg).Run(stop)

// Let's add a few objects to the source.
testIDs := []string{"a-hello", "b-controller", "c-framework"}
for _, name := range testIDs {
	// Note that these pods are not valid-- the fake source doesn't
	// call validation or anything.
	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name}})
}

// Let's wait for the controller to process the things we just added.
outputSet := sets.String{}
for i := 0; i < len(testIDs); i++ {
	outputSet.Insert(<-deletionCounter)
}

for _, key := range outputSet.List() {
	fmt.Println(key)
}
Output:

a-hello
b-controller
c-framework

Constants

const (
	// NamespaceIndex is the lookup name for the most common index function, which is to index by the namespace field.
	NamespaceIndex string = "namespace"
)

Variables

var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue")

ErrFIFOClosed is used when the FIFO is closed.

var (
	// ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
	// object with zero length is encountered (should be impossible,
	// but included for completeness).
	ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
)

Functions

func DefaultWatchErrorHandler added in v0.19.0

func DefaultWatchErrorHandler(r *Reflector, err error)

DefaultWatchErrorHandler is the default implementation of WatchErrorHandler

func DeletionHandlingMetaNamespaceKeyFunc

func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error)

DeletionHandlingMetaNamespaceKeyFunc checks for DeletedFinalStateUnknown objects before calling MetaNamespaceKeyFunc.

func ListAll

func ListAll(store Store, selector labels.Selector, appendFn AppendFunc) error

ListAll calls appendFn with each value retrieved from store which matches the selector.

func ListAllByNamespace

func ListAllByNamespace(indexer Indexer, namespace string, selector labels.Selector, appendFn AppendFunc) error

ListAllByNamespace lists the items in the Indexer that belong to the given namespace.

func MetaNamespaceIndexFunc

func MetaNamespaceIndexFunc(obj interface{}) ([]string, error)

MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace
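
As a rough illustration of what indexing by namespace means, the sketch below groups objects under the values an index function returns. It is a standalone approximation, not this package's implementation: `object`, `indexFunc`, `byNamespace`, and `buildIndex` are hypothetical names introduced only for this example.

```go
package main

import "fmt"

// object is a hypothetical stand-in for an API object's metadata.
type object struct {
	Namespace string
	Name      string
}

// indexFunc mirrors the shape of an index function: it maps an object to the
// index values it should be filed under.
type indexFunc func(obj object) ([]string, error)

// byNamespace files every object under its namespace.
func byNamespace(obj object) ([]string, error) {
	return []string{obj.Namespace}, nil
}

// buildIndex groups object names by the values the index function returns.
func buildIndex(objs []object, fn indexFunc) (map[string][]string, error) {
	index := make(map[string][]string)
	for _, o := range objs {
		values, err := fn(o)
		if err != nil {
			return nil, err
		}
		for _, v := range values {
			index[v] = append(index[v], o.Name)
		}
	}
	return index, nil
}

func main() {
	objs := []object{
		{Namespace: "default", Name: "web"},
		{Namespace: "kube-system", Name: "coredns"},
		{Namespace: "default", Name: "db"},
	}
	index, err := buildIndex(objs, byNamespace)
	if err != nil {
		panic(err)
	}
	fmt.Println(index["default"]) // [web db]
}
```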

func MetaNamespaceKeyFunc

func MetaNamespaceKeyFunc(obj interface{}) (string, error)

MetaNamespaceKeyFunc is a convenient default KeyFunc which knows how to make keys for API objects which implement meta.Interface. The key uses the format <namespace>/<name> unless <namespace> is empty, then it's just <name>.

TODO: replace key-as-string with a key-as-struct so that this packing/unpacking won't be necessary.
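
The key format described above can be sketched without any Kubernetes types. This is a minimal approximation under stated assumptions: `objectMeta` and `namespaceKey` are hypothetical names, and the real function works on objects that implement meta.Interface rather than on a plain struct.

```go
package main

import "fmt"

// objectMeta is a hypothetical stand-in for the metadata of an API object.
type objectMeta struct {
	Namespace string
	Name      string
}

// namespaceKey builds a key in the <namespace>/<name> format, falling back to
// just <name> when the namespace is empty (cluster-scoped objects).
func namespaceKey(m objectMeta) string {
	if m.Namespace == "" {
		return m.Name
	}
	return m.Namespace + "/" + m.Name
}

func main() {
	fmt.Println(namespaceKey(objectMeta{Namespace: "kube-system", Name: "coredns"})) // kube-system/coredns
	fmt.Println(namespaceKey(objectMeta{Name: "node-1"}))                            // node-1
}
```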

func NewIndexerInformer

func NewIndexerInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	indexers Indexers,
) (Indexer, Controller)

NewIndexerInformer returns an Indexer and a Controller for populating the index while also providing event notifications. You should only use the returned Indexer for Get/List operations; Add/Modify/Delete operations will cause the event notifications to be faulty.

Parameters:

  • lw is list and watch functions for the source of the resource you want to be informed of.
  • objType is an object of the type that you expect to receive.
  • resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate calls, even if nothing changed). Otherwise, re-list will be delayed as long as possible (until the upstream source closes the watch or times out, or you stop the controller).
  • h is the object you want notifications sent to.
  • indexers is the indexer for the received object type.

func NewInformer

func NewInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
) (Store, Controller)

NewInformer returns a Store and a Controller for populating the store while also providing event notifications. You should only use the returned Store for Get/List operations; Add/Modify/Delete operations will cause the event notifications to be faulty.

Parameters:

  • lw is list and watch functions for the source of the resource you want to be informed of.
  • objType is an object of the type that you expect to receive.
  • resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate calls, even if nothing changed). Otherwise, re-list will be delayed as long as possible (until the upstream source closes the watch or times out, or you stop the controller).
  • h is the object you want notifications sent to.
Example
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()

// Let's do threadsafe output to get predictable test results.
deletionCounter := make(chan string, 1000)

// Make a controller that immediately deletes anything added to it, and
// logs anything deleted.
_, controller := NewInformer(
	source,
	&v1.Pod{},
	time.Millisecond*100,
	ResourceEventHandlerDetailedFuncs{
		AddFunc: func(obj interface{}, isInInitialList bool) {
			source.Delete(obj.(runtime.Object))
		},
		DeleteFunc: func(obj interface{}) {
			key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err != nil {
				key = "oops something went wrong with the key"
			}

			// Report this deletion.
			deletionCounter <- key
		},
	},
)

// Run the controller until we close stop.
stop := make(chan struct{})
defer close(stop)
go controller.Run(stop)

// Let's add a few objects to the source.
testIDs := []string{"a-hello", "b-controller", "c-framework"}
for _, name := range testIDs {
	// Note that these pods are not valid-- the fake source doesn't
	// call validation or anything.
	source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: name}})
}

// Let's wait for the controller to process the things we just added.
outputSet := sets.String{}
for i := 0; i < len(testIDs); i++ {
	outputSet.Insert(<-deletionCounter)
}

for _, key := range outputSet.List() {
	fmt.Println(key)
}
Output:

a-hello
b-controller
c-framework

func NewNamespaceKeyedIndexerAndReflector

func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector)

NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector. The indexer is configured to key on namespace.

func NewTransformingIndexerInformer added in v0.23.0

func NewTransformingIndexerInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	indexers Indexers,
	transformer TransformFunc,
) (Indexer, Controller)

NewTransformingIndexerInformer returns an Indexer and a Controller for populating the index while also providing event notifications. You should only use the returned Indexer for Get/List operations; Add/Modify/Delete operations will cause the event notifications to be faulty. The given transform function will be called on all objects before they are put into the Indexer and before the corresponding Add/Modify/Delete handlers are invoked for them.

func NewTransformingInformer added in v0.23.0

func NewTransformingInformer(
	lw ListerWatcher,
	objType runtime.Object,
	resyncPeriod time.Duration,
	h ResourceEventHandler,
	transformer TransformFunc,
) (Store, Controller)

NewTransformingInformer returns a Store and a Controller for populating the store while also providing event notifications. You should only use the returned Store for Get/List operations; Add/Modify/Delete operations will cause the event notifications to be faulty. The given transform function will be called on all objects before they are put into the Store and before the corresponding Add/Modify/Delete handlers are invoked for them.

func Pop

func Pop(queue Queue) interface{}

Pop is a helper function for popping from Queue. WARNING: Do NOT use this function in non-test code to avoid races unless you really really really really know what you are doing.

NOTE: This function is deprecated and may be removed in the future without additional warning.

func SetReflectorMetricsProvider

func SetReflectorMetricsProvider(metricsProvider MetricsProvider)

SetReflectorMetricsProvider sets the metrics provider

func SplitMetaNamespaceKey

func SplitMetaNamespaceKey(key string) (namespace, name string, err error)

SplitMetaNamespaceKey returns the namespace and name that MetaNamespaceKeyFunc encoded into key.

TODO: replace key-as-string with a key-as-struct so that this packing/unpacking won't be necessary.
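
Unpacking a key is the inverse of packing it: a key with no slash is a bare name, and a key with one slash is a namespace followed by a name. The sketch below shows that rule using only the standard library; `splitKey` is a hypothetical name for this example, and treating anything with more than one slash as an error is an assumption of the sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// splitKey separates a <namespace>/<name> key into its parts. A key without a
// slash is treated as a name with an empty namespace.
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

func main() {
	ns, name, err := splitKey("kube-system/coredns")
	fmt.Println(ns, name, err) // kube-system coredns <nil>

	_, _, err = splitKey("a/b/c")
	fmt.Println(err != nil) // true
}
```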

func WaitForCacheSync

func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool

WaitForCacheSync waits for caches to populate. It returns true if it was successful, and false if the controller should shut down. Callers should prefer WaitForNamedCacheSync().

func WaitForNamedCacheSync added in v0.16.4

func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool

WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages indicating that the caller identified by name is waiting for syncs, followed by either a successful or failed sync.
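
The wait-for-sync contract (true once every cache reports synced, false if the stop channel closes first) can be approximated with a polling loop. This is only a sketch of the idea: `syncedFunc` and `waitForSync` are hypothetical names, and the polling interval parameter is an assumption of this example rather than part of the package's signatures.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// syncedFunc is a hypothetical stand-in for InformerSynced.
type syncedFunc func() bool

// waitForSync polls until every syncedFunc returns true, or returns false as
// soon as stopCh is closed.
func waitForSync(stopCh <-chan struct{}, interval time.Duration, syncs ...syncedFunc) bool {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		allSynced := true
		for _, synced := range syncs {
			if !synced() {
				allSynced = false
				break
			}
		}
		if allSynced {
			return true
		}
		select {
		case <-stopCh:
			return false
		case <-ticker.C:
			// Check again on the next tick.
		}
	}
}

func main() {
	var ready atomic.Bool
	go func() {
		time.Sleep(20 * time.Millisecond)
		ready.Store(true) // the "cache" finishes populating
	}()

	stop := make(chan struct{})
	fmt.Println(waitForSync(stop, 5*time.Millisecond, ready.Load)) // true
}
```

A controller would typically return early when such a wait reports false, since that means it was asked to stop before its caches were usable.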

Types

type AppendFunc

type AppendFunc func(interface{})

AppendFunc is used to add a matching item to whatever list the caller is using

type Config

type Config struct {
	// The queue for your objects - has to be a DeltaFIFO due to
	// assumptions in the implementation. Your Process() function
	// should accept the output of this Queue's Pop() method.
	Queue

	// Something that can list and watch your objects.
	ListerWatcher

	// Something that can process a popped Deltas.
	Process ProcessFunc

	// ObjectType is an example object of the type this controller is
	// expected to handle.
	ObjectType runtime.Object

	// ObjectDescription is the description to use when logging type-specific information about this controller.
	ObjectDescription string

	// FullResyncPeriod is the period at which ShouldResync is considered.
	FullResyncPeriod time.Duration

	// ShouldResync is periodically used by the reflector to determine
	// whether to Resync the Queue. If ShouldResync is `nil` or
	// returns true, it means the reflector should proceed with the
	// resync.
	ShouldResync ShouldResyncFunc

	// If true, when Process() returns an error, re-enqueue the object.
	// TODO: add interface to let you inject a delay/backoff or drop
	//       the object completely if desired. Pass the object in
	//       question to this interface as a parameter.  This is probably moot
	//       now that this functionality appears at a higher level.
	RetryOnError bool

	// Called whenever the ListAndWatch drops the connection with an error.
	WatchErrorHandler WatchErrorHandler

	// WatchListPageSize is the requested chunk size of initial and relist watch lists.
	WatchListPageSize int64
}

Config contains all the settings for one of these low-level controllers.

type Controller

type Controller interface {
	// Run does two things.  One is to construct and run a Reflector
	// to pump objects/notifications from the Config's ListerWatcher
	// to the Config's Queue and possibly invoke the occasional Resync
	// on that Queue.  The other is to repeatedly Pop from the Queue
	// and process with the Config's ProcessFunc.  Both of these
	// continue until `stopCh` is closed.
	Run(stopCh <-chan struct{})

	// HasSynced delegates to the Config's Queue
	HasSynced() bool

	// LastSyncResourceVersion delegates to the Reflector when there
	// is one, otherwise returns the empty string
	LastSyncResourceVersion() string
}

Controller is a low-level controller that is parameterized by a Config and used in sharedIndexInformer.

func New

func New(c *Config) Controller

New makes a new Controller from the given Config.

type CounterMetric

type CounterMetric interface {
	Inc()
}

CounterMetric represents a single numerical value that only ever goes up.

type DeletedFinalStateUnknown

type DeletedFinalStateUnknown struct {
	Key string
	Obj interface{}
}

DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object was deleted but the watch deletion event was missed while disconnected from apiserver. In this case we don't know the final "resting" state of the object, so there's a chance the included `Obj` is stale.
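
A delete handler therefore has to accept either the object itself or this marker wrapping a possibly stale copy. The sketch below shows the usual type-switch shape with local stand-in types: `tombstone`, `pod`, and `podFromDeleteEvent` are hypothetical names, and a real handler would switch on DeletedFinalStateUnknown and its own API type instead.

```go
package main

import "fmt"

// tombstone is a hypothetical stand-in for DeletedFinalStateUnknown.
type tombstone struct {
	Key string
	Obj interface{}
}

// pod is a hypothetical stand-in for an API object.
type pod struct{ Name string }

// podFromDeleteEvent unwraps the object carried by a delete notification,
// whether it arrives directly or inside a tombstone.
func podFromDeleteEvent(obj interface{}) (*pod, bool) {
	switch t := obj.(type) {
	case *pod:
		return t, true
	case tombstone:
		// The wrapped object may be stale; only its identity is reliable.
		p, ok := t.Obj.(*pod)
		return p, ok
	}
	return nil, false
}

func main() {
	direct, _ := podFromDeleteEvent(&pod{Name: "web"})
	fmt.Println(direct.Name) // web

	wrapped, _ := podFromDeleteEvent(tombstone{Key: "default/db", Obj: &pod{Name: "db"}})
	fmt.Println(wrapped.Name) // db

	_, ok := podFromDeleteEvent("not a pod")
	fmt.Println(ok) // false
}
```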

type Delta

type Delta struct {
	Type   DeltaType
	Object interface{}
}

Delta is a member of Deltas (a list of Delta objects) which in its turn is the type stored by a DeltaFIFO. It tells you what change happened, and the object's state after* that change.

[*] Unless the change is a deletion, and then you'll get the final state of the object before it was deleted.

type DeltaFIFO

type DeltaFIFO struct {
	// contains filtered or unexported fields
}

DeltaFIFO is like FIFO, but differs in two ways. One is that the accumulator associated with a given object's key is not that object but rather a Deltas, which is a slice of Delta values for that object. Applying an object to a Deltas means to append a Delta except when the potentially appended Delta is a Deleted and the Deltas already ends with a Deleted. In that case the Deltas does not grow, although the terminal Deleted will be replaced by the new Deleted if the older Deleted's object is a DeletedFinalStateUnknown.

The other difference is that DeltaFIFO has two additional ways that an object can be applied to an accumulator: Replaced and Sync. If EmitDeltaTypeReplaced is not set to true, Sync will be used in replace events for backwards compatibility. Sync is used for periodic resync events.

DeltaFIFO is a producer-consumer queue, where a Reflector is intended to be the producer, and the consumer is whatever calls the Pop() method.

DeltaFIFO solves this use case:

  • You want to process every object change (delta) at most once.
  • When you process an object, you want to see everything that's happened to it since you last processed it.
  • You want to process the deletion of some of the objects.
  • You might want to periodically reprocess objects.

DeltaFIFO's Pop(), Get(), and GetByKey() methods return interface{} to satisfy the Store/Queue interfaces, but they will always return an object of type Deltas. List() returns the newest object from each accumulator in the FIFO.

A DeltaFIFO's knownObjects KeyListerGetter provides the abilities to list Store keys and to get objects by Store key. The objects in question are called "known objects" and this set of objects modifies the behavior of the Delete, Replace, and Resync methods (each in a different way).

A note on threading: If you call Pop() in parallel from multiple threads, you could end up with multiple threads processing slightly different versions of the same object.
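
The accumulation rule described above (append each Delta, but never stack a Deleted on top of a Deleted) can be sketched in a few lines. This is a simplified model, not the package's code: `delta`, `finalStateUnknown`, and `applyDelta` are hypothetical stand-ins, and locking, key functions, and the queue itself are left out.

```go
package main

import "fmt"

type deltaType string

const (
	added   deltaType = "Added"
	updated deltaType = "Updated"
	deleted deltaType = "Deleted"
)

// finalStateUnknown is a hypothetical stand-in for DeletedFinalStateUnknown.
type finalStateUnknown struct {
	Key string
	Obj interface{}
}

// delta is a hypothetical stand-in for Delta.
type delta struct {
	Type   deltaType
	Object interface{}
}

// applyDelta appends d to the accumulator unless both d and the current last
// entry are deletions. In that case the accumulator does not grow, and the old
// deletion is replaced only if it carried a final-state-unknown marker.
func applyDelta(acc []delta, d delta) []delta {
	if n := len(acc); n > 0 && d.Type == deleted && acc[n-1].Type == deleted {
		if _, stale := acc[n-1].Object.(finalStateUnknown); stale {
			acc[n-1] = d
		}
		return acc
	}
	return append(acc, d)
}

func main() {
	var acc []delta
	acc = applyDelta(acc, delta{Type: added, Object: "v1"})
	acc = applyDelta(acc, delta{Type: updated, Object: "v2"})
	acc = applyDelta(acc, delta{Type: deleted, Object: finalStateUnknown{Key: "default/web", Obj: "v1"}})
	acc = applyDelta(acc, delta{Type: deleted, Object: "v2"})

	fmt.Println(len(acc))                // 3
	fmt.Println(acc[len(acc)-1].Object) // v2
}
```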

func NewDeltaFIFO deprecated

func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO

NewDeltaFIFO returns a Queue which can be used to process changes to items.

keyFunc is used to figure out what key an object should have. (It is exposed in the returned DeltaFIFO's KeyOf() method, with additional handling around deleted objects and queue state).

'knownObjects' may be supplied to modify the behavior of Delete, Replace, and Resync. It may be nil if you do not need those modifications.

TODO: consider merging keyLister with this object, tracking a list of "known" keys when Pop() is called. Have to think about how that affects error retrying.

NOTE: It is possible to misuse this and cause a race when using an
external known object source.
Whether there is a potential race depends on how the consumer
modifies knownObjects. In Pop(), process function is called under
lock, so it is safe to update data structures in it that need to be
in sync with the queue (e.g. knownObjects).

Example:
In case of sharedIndexInformer being a consumer
(https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
there is no race as knownObjects (s.indexer) is modified safely
under DeltaFIFO's lock. The only exceptions are GetStore() and
GetIndexer() methods, which expose ways to modify the underlying
storage. Currently these two methods are used for creating Lister
and internal tests.

Also see the comment on DeltaFIFO.

Warning: This constructs a DeltaFIFO that does not differentiate between events caused by a call to Replace (e.g., from a relist, which may contain object updates), and synthetic events caused by a periodic resync (which just emit the existing object). See https://issue.k8s.io/86015 for details.

Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})` instead to receive a `Replaced` event depending on the type.

Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects})

func NewDeltaFIFOWithOptions added in v0.18.0

func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO

NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to items. See also the comment on DeltaFIFO.

func (*DeltaFIFO) Add

func (f *DeltaFIFO) Add(obj interface{}) error

Add inserts an item, and puts it in the queue. The item is only enqueued if it doesn't already exist in the set.

func (*DeltaFIFO) AddIfNotPresent

func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error

AddIfNotPresent inserts an item, and puts it in the queue. If the item is already present in the set, it is neither enqueued nor added to the set.

This is useful in a single producer/consumer scenario so that the consumer can safely retry items without contending with the producer and potentially enqueueing stale items.

Important: obj must be a Deltas (the output of the Pop() function). Yes, this is different from the Add/Update/Delete functions.

func (*DeltaFIFO) Close

func (f *DeltaFIFO) Close()

Close the queue.

func (*DeltaFIFO) Delete

func (f *DeltaFIFO) Delete(obj interface{}) error

Delete is just like Add, but makes a Deleted Delta. If the given object does not already exist, it will be ignored. (It may have already been deleted by a Replace (re-list), for example.) In this method `f.knownObjects`, if not nil, provides (via GetByKey) _additional_ objects that are considered to already exist.

func (*DeltaFIFO) Get

func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error)

Get returns the complete list of deltas for the requested item, or sets exists=false. You should treat the items returned inside the deltas as immutable.

func (*DeltaFIFO) GetByKey

func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error)

GetByKey returns the complete list of deltas for the requested item, setting exists=false if that list is empty. You should treat the items returned inside the deltas as immutable.

func (*DeltaFIFO) HasSynced

func (f *DeltaFIFO) HasSynced() bool

HasSynced returns true if an Add/Update/Delete/AddIfNotPresent is called first, or the first batch of items inserted by Replace() has been popped.

func (*DeltaFIFO) IsClosed

func (f *DeltaFIFO) IsClosed() bool

IsClosed checks if the queue is closed

func (*DeltaFIFO) KeyOf

func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error)

KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or DeletedFinalStateUnknown objects.

func (*DeltaFIFO) List

func (f *DeltaFIFO) List() []interface{}

List returns a list of all the items; it returns the object from the most recent Delta. You should treat the items returned inside the deltas as immutable.

func (*DeltaFIFO) ListKeys

func (f *DeltaFIFO) ListKeys() []string

ListKeys returns a list of all the keys of the objects currently in the FIFO.

func (*DeltaFIFO) Pop

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error)

Pop blocks until the queue has some items, and then returns one. If multiple items are ready, they are returned in the order in which they were added/updated. The item is removed from the queue (and the store) before it is returned, so if you don't successfully process it, you need to add it back with AddIfNotPresent(). The process function is called under lock, so it is safe to update data structures in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc may return an instance of ErrRequeue with a nested error to indicate the current item should be requeued (equivalent to calling AddIfNotPresent under the lock). process should avoid expensive I/O operations so that other queue operations, e.g. Add() and Get(), won't be blocked for too long.

Pop returns a 'Deltas', which has a complete list of all the things that happened to the object (deltas) while it was sitting in the queue.

func (*DeltaFIFO) Replace

func (f *DeltaFIFO) Replace(list []interface{}, _ string) error

Replace atomically does two things: (1) it adds the given objects using the Sync or Replaced DeltaType and then (2) it does some deletions. In particular: for every pre-existing key K that is not the key of an object in `list` there is the effect of `Delete(DeletedFinalStateUnknown{K, O})` where O is the latest known object of K. The pre-existing keys are those in the union set of the keys in `f.items` and `f.knownObjects` (if not nil). The last known object for key K is the one present in the last delta in `f.items`. If there is no delta for K in `f.items`, it is the object in `f.knownObjects`.
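
The deletion half of this rule is a set difference: take the union of queued keys and known keys, and keep those missing from the replacement list. The sketch below computes only that key set, under stated assumptions: `missingKeys` is a hypothetical helper, keys are passed as plain string slices, and the result is sorted purely to make the output predictable.

```go
package main

import (
	"fmt"
	"sort"
)

// missingKeys returns every pre-existing key (the union of queued and known)
// that does not appear in the replacement list. Each such key would receive a
// synthetic deletion.
func missingKeys(queued, known, replacement []string) []string {
	inList := make(map[string]bool, len(replacement))
	for _, k := range replacement {
		inList[k] = true
	}

	seen := make(map[string]bool)
	var missing []string
	for _, keys := range [][]string{queued, known} {
		for _, k := range keys {
			if inList[k] || seen[k] {
				continue
			}
			seen[k] = true
			missing = append(missing, k)
		}
	}
	sort.Strings(missing)
	return missing
}

func main() {
	queued := []string{"ns/a", "ns/b"}
	known := []string{"ns/b", "ns/c"}
	replacement := []string{"ns/b"}
	fmt.Println(missingKeys(queued, known, replacement)) // [ns/a ns/c]
}
```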

func (*DeltaFIFO) Resync

func (f *DeltaFIFO) Resync() error

Resync adds, with a Sync type of Delta, every object listed by `f.knownObjects` whose key is not already queued for processing. If `f.knownObjects` is `nil` then Resync does nothing.

func (*DeltaFIFO) Update

func (f *DeltaFIFO) Update(obj interface{}) error

Update is just like Add, but makes an Updated Delta.

type DeltaFIFOOptions added in v0.18.0

type DeltaFIFOOptions struct {

	// KeyFunction is used to figure out what key an object should have. (It's
	// exposed in the returned DeltaFIFO's KeyOf() method, with additional
	// handling around deleted objects and queue state).
	// Optional, the default is MetaNamespaceKeyFunc.
	KeyFunction KeyFunc

	// KnownObjects is expected to return a list of keys that the consumer of
	// this queue "knows about". It is used to decide which items are missing
	// when Replace() is called; 'Deleted' deltas are produced for the missing items.
	// KnownObjects may be nil if you can tolerate missing deletions on Replace().
	KnownObjects KeyListerGetter

	// EmitDeltaTypeReplaced indicates that the queue consumer
	// understands the Replaced DeltaType. Before the `Replaced` event type was
	// added, calls to Replace() were handled the same as Sync(). For
	// backwards-compatibility purposes, this is false by default.
	// When true, `Replaced` events will be sent for items passed to a Replace() call.
	// When false, `Sync` events will be sent instead.
	EmitDeltaTypeReplaced bool

	// If set, will be called for objects before enqueueing them. Please
	// see the comment on TransformFunc for details.
	Transformer TransformFunc
}

DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are optional.

type DeltaType

type DeltaType string

DeltaType is the type of a change (addition, deletion, etc)

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	// Replaced is emitted when we encountered watch errors and had to do a
	// relist. We don't know if the replaced object has changed.
	//
	// NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
	// as well. Hence, Replaced is only emitted when the option
	// EmitDeltaTypeReplaced is true.
	Replaced DeltaType = "Replaced"
	// Sync is for synthetic events during a periodic resync.
	Sync DeltaType = "Sync"
)

Change type definition

type Deltas

type Deltas []Delta

Deltas is a list of one or more 'Delta's to an individual object. The oldest delta is at index 0, the newest delta is the last one.

func (Deltas) Newest

func (d Deltas) Newest() *Delta

Newest is a convenience function that returns the newest delta, or nil if there are no deltas.

func (Deltas) Oldest

func (d Deltas) Oldest() *Delta

Oldest is a convenience function that returns the oldest delta, or nil if there are no deltas.
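
Because the slice is ordered oldest first, both accessors reduce to bounds-checked indexing. The sketch below mirrors that behavior with local stand-in types: `delta` and `deltas` and their lowercase methods are hypothetical names for this example.

```go
package main

import "fmt"

type deltaType string

// delta and deltas are hypothetical stand-ins for Delta and Deltas.
type delta struct {
	Type   deltaType
	Object interface{}
}

type deltas []delta

// oldest returns the first delta, or nil if there are none.
func (d deltas) oldest() *delta {
	if len(d) > 0 {
		return &d[0]
	}
	return nil
}

// newest returns the last delta, or nil if there are none.
func (d deltas) newest() *delta {
	if n := len(d); n > 0 {
		return &d[n-1]
	}
	return nil
}

func main() {
	history := deltas{
		{Type: "Added", Object: "v1"},
		{Type: "Updated", Object: "v2"},
		{Type: "Deleted", Object: "v2"},
	}
	fmt.Println(history.oldest().Type, history.newest().Type) // Added Deleted
	fmt.Println(deltas{}.newest() == nil)                     // true
}
```

Callers that only care about the current state usually look at the newest entry, which is also what the package-level example does inside its Process function.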

type ErrRequeue

type ErrRequeue struct {
	// Err is returned by the Pop function
	Err error
}

ErrRequeue may be returned by a PopProcessFunc to safely requeue the current item. The value of Err will be returned from Pop.
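
The requeue contract can be modeled on a plain slice: if the process function returns the requeue error, the item goes back onto the queue and the nested error is what the caller sees. Everything here is a hypothetical stand-in (`requeueError`, `popOnce`, `errTransient`, `errEmpty`), the sketch does not block or lock, and it returns an error on an empty queue instead of waiting.

```go
package main

import (
	"errors"
	"fmt"
)

// requeueError is a hypothetical stand-in for ErrRequeue.
type requeueError struct{ Err error }

func (e requeueError) Error() string {
	if e.Err == nil {
		return "item should be requeued"
	}
	return e.Err.Error()
}

var (
	errEmpty     = errors.New("queue is empty")
	errTransient = errors.New("transient failure")
)

// popOnce removes the first item, runs process on it, and puts the item back
// at the end of the queue if process asks for a requeue. It returns the
// updated queue and the error the caller should see.
func popOnce(queue []string, process func(item string) error) ([]string, error) {
	if len(queue) == 0 {
		return queue, errEmpty
	}
	item, rest := queue[0], queue[1:]
	err := process(item)

	var rq requeueError
	if errors.As(err, &rq) {
		return append(rest, item), rq.Err
	}
	return rest, err
}

func main() {
	queue := []string{"a", "b"}

	queue, err := popOnce(queue, func(string) error {
		return requeueError{Err: errTransient}
	})
	fmt.Println(queue, err) // [b a] transient failure

	queue, err = popOnce(queue, func(string) error { return nil })
	fmt.Println(queue, err) // [a] <nil>
}
```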

func (ErrRequeue) Error

func (e ErrRequeue) Error() string

type ExpirationCache

type ExpirationCache struct {
	// contains filtered or unexported fields
}

ExpirationCache implements the store interface

  1. All entries are automatically time stamped on insert
     a. The key is computed based off the original item/keyFunc
     b. The value inserted under that key is the timestamped item
  2. Expiration happens lazily on read based on the expiration policy
     a. No item can be inserted into the store while we're expiring *any* item in the cache.
  3. Time-stamps are stripped off unexpired entries before return

Note that the ExpirationCache is inherently slower than a normal threadSafeStore because it takes a write lock every time it checks if an item has expired.
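
The three properties above (timestamp on insert, lazy expiry on read, timestamps stripped before return) fit in a small map-backed sketch. This is not the ExpirationCache implementation: `ttlCache`, `timestamped`, and `fakeClock` are hypothetical names, a fixed TTL stands in for the expiration policy, and the injectable clock exists only so the example does not depend on the system clock.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// timestamped pairs a stored value with its insertion time.
type timestamped struct {
	value      interface{}
	insertedAt time.Time
}

// ttlCache is a hypothetical, simplified expiring store.
type ttlCache struct {
	mu    sync.Mutex
	ttl   time.Duration
	now   func() time.Time
	items map[string]timestamped
}

func newTTLCache(ttl time.Duration, now func() time.Time) *ttlCache {
	return &ttlCache{ttl: ttl, now: now, items: make(map[string]timestamped)}
}

// add timestamps the value and overwrites any entry under the same key.
func (c *ttlCache) add(key string, value interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items[key] = timestamped{value: value, insertedAt: c.now()}
}

// get returns the bare value if it has not expired. Expired entries are
// purged here, on read, under the same lock that guards inserts.
func (c *ttlCache) get(key string) (interface{}, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	entry, ok := c.items[key]
	if !ok {
		return nil, false
	}
	if c.now().Sub(entry.insertedAt) > c.ttl {
		delete(c.items, key)
		return nil, false
	}
	return entry.value, true
}

// fakeClock is a manually advanced clock for deterministic examples.
type fakeClock struct{ t time.Time }

func newFakeClock() *fakeClock                   { return &fakeClock{t: time.Unix(0, 0)} }
func (f *fakeClock) now() time.Time              { return f.t }
func (f *fakeClock) advance(d time.Duration)     { f.t = f.t.Add(d) }

func main() {
	clock := newFakeClock()
	cache := newTTLCache(time.Minute, clock.now)

	cache.add("default/web", "pod spec")
	_, ok := cache.get("default/web")
	fmt.Println(ok) // true

	clock.advance(2 * time.Minute)
	_, ok = cache.get("default/web")
	fmt.Println(ok) // false
}
```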

func (*ExpirationCache) Add

func (c *ExpirationCache) Add(obj interface{}) error

Add timestamps an item and inserts it into the cache, overwriting entries that might exist under the same key.

func (*ExpirationCache) Delete

func (c *ExpirationCache) Delete(obj interface{}) error

Delete removes an item from the cache.

func (*ExpirationCache) Get

func (c *ExpirationCache) Get(obj interface{}) (interface{}, bool, error)

Get returns unexpired items. It purges the cache of expired items in the process.

func (*ExpirationCache) GetByKey

func (c *ExpirationCache) GetByKey(key string) (interface{}, bool, error)

GetByKey returns the item stored under the key, or sets exists=false.

func (*ExpirationCache) List

func (c *ExpirationCache) List() []interface{}

List retrieves a list of unexpired items. It purges the cache of expired items in the process.

func (*ExpirationCache) ListKeys

func (c *ExpirationCache) ListKeys() []string

ListKeys returns a list of all keys in the expiration cache.

func (*ExpirationCache) Replace

func (c *ExpirationCache) Replace(list []interface{}, resourceVersion string) error

Replace will convert all items in the given list to TimestampedEntries before attempting the replace operation. The replace operation will delete the contents of the ExpirationCache `c`.

func (*ExpirationCache) Resync

func (c *ExpirationCache) Resync() error

Resync is a no-op for an ExpirationCache.

func (*ExpirationCache) Update

func (c *ExpirationCache) Update(obj interface{}) error

Update has not been implemented yet for lack of a use case, so this method simply calls `Add`. This effectively refreshes the timestamp.

type ExpirationPolicy

type ExpirationPolicy interface {
	IsExpired(obj *TimestampedEntry) bool
}

ExpirationPolicy dictates when an object expires. Currently only abstracted out so unittests don't rely on the system clock.

type ExplicitKey

type ExplicitKey string

ExplicitKey can be passed to MetaNamespaceKeyFunc if you have the key for the object but not the object itself.

type FIFO

type FIFO struct {
	// contains filtered or unexported fields
}

FIFO is a Queue in which (a) each accumulator is simply the most recently provided object and (b) the collection of keys to process is a FIFO. The accumulators all start out empty, and deleting an object from its accumulator empties the accumulator. The Resync operation is a no-op.

Thus: if multiple adds/updates of a single object happen while that object's key is in the queue before it has been processed then it will only be processed once, and when it is processed the most recent version will be processed. This can't be done with a channel.

FIFO solves this use case:

  • You want to process every object (exactly) once.
  • You want to process the most recent version of the object when you process it.
  • You do not want to process deleted objects, they should be removed from the queue.
  • You do not want to periodically reprocess objects.

Compare with DeltaFIFO for other use cases.
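
The coalescing behavior listed above comes from keeping two structures: a map from key to latest value, and a slice of keys in arrival order. The sketch below is a non-blocking, single-goroutine model of that idea, not the FIFO type itself: `coalescingQueue` and its methods are hypothetical names, and `pop` returns `false` on an empty queue instead of waiting.

```go
package main

import "fmt"

// coalescingQueue is a hypothetical, simplified model of a keyed FIFO.
type coalescingQueue struct {
	items map[string]string // key -> most recent value
	order []string          // keys in the order they were first queued
}

func newCoalescingQueue() *coalescingQueue {
	return &coalescingQueue{items: make(map[string]string)}
}

// add records the latest value. The key is enqueued only if it is not already
// waiting, so repeated updates collapse into one pending entry.
func (q *coalescingQueue) add(key, value string) {
	if _, exists := q.items[key]; !exists {
		q.order = append(q.order, key)
	}
	q.items[key] = value
}

// remove drops the pending value; the stale key is skipped later by pop.
func (q *coalescingQueue) remove(key string) {
	delete(q.items, key)
}

// pop returns the oldest key that still has a pending value.
func (q *coalescingQueue) pop() (key, value string, ok bool) {
	for len(q.order) > 0 {
		key = q.order[0]
		q.order = q.order[1:]
		value, ok = q.items[key]
		if !ok {
			continue // deleted while it was waiting
		}
		delete(q.items, key)
		return key, value, true
	}
	return "", "", false
}

func main() {
	q := newCoalescingQueue()
	q.add("default/web", "v1")
	q.add("default/db", "v1")
	q.add("default/web", "v2") // coalesces with the pending entry
	q.remove("default/db")     // never reaches the consumer

	fmt.Println(q.pop()) // default/web v2 true
	fmt.Println(q.pop()) //   false
}
```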

func NewFIFO

func NewFIFO(keyFunc KeyFunc) *FIFO

NewFIFO returns a Store which can be used to queue up items to process.

func (*FIFO) Add

func (f *FIFO) Add(obj interface{}) error

Add inserts an item, and puts it in the queue. The item is only enqueued if it doesn't already exist in the set.

func (*FIFO) AddIfNotPresent

func (f *FIFO) AddIfNotPresent(obj interface{}) error

AddIfNotPresent inserts an item, and puts it in the queue. If the item is already present in the set, it is neither enqueued nor added to the set.

This is useful in a single producer/consumer scenario so that the consumer can safely retry items without contending with the producer and potentially enqueueing stale items.

func (*FIFO) Close

func (f *FIFO) Close()

Close the queue.

func (*FIFO) Delete

func (f *FIFO) Delete(obj interface{}) error

Delete removes an item. It doesn't add it to the queue, because this implementation assumes the consumer only cares about the objects, not the order in which they were created/added.

func (*FIFO) Get

func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error)

Get returns the requested item, or sets exists=false.

func (*FIFO) GetByKey

func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error)

GetByKey returns the requested item, or sets exists=false.

func (*FIFO) HasSynced

func (f *FIFO) HasSynced() bool

HasSynced returns true if an Add/Update/Delete/AddIfNotPresent is called first, or the first batch of items inserted by Replace() has been popped.

func (*FIFO) IsClosed

func (f *FIFO) IsClosed() bool

IsClosed checks if the queue is closed

func (*FIFO) List

func (f *FIFO) List() []interface{}

List returns a list of all the items.

func (*FIFO) ListKeys

func (f *FIFO) ListKeys() []string

ListKeys returns a list of all the keys of the objects currently in the FIFO.

func (*FIFO) Pop

func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error)

Pop waits until an item is ready and processes it. If multiple items are ready, they are returned in the order in which they were added/updated. The item is removed from the queue (and the store) before it is processed, so if you don't successfully process it, it should be added back with AddIfNotPresent(). The process function is called under lock, so it is safe to update data structures in it that need to be in sync with the queue.

func (*FIFO) Replace

func (f *FIFO) Replace(list []interface{}, resourceVersion string) error

Replace will delete the contents of 'f', using instead the given list. 'f' takes ownership of the list; you should not reference the list again after calling this function. f's queue is reset, too; upon return, it will contain the items in the list, in no particular order.

func (*FIFO) Resync

func (f *FIFO) Resync() error

Resync will ensure that every object in the Store has its key in the queue. This should be a no-op, because that property is maintained by all operations.

func (*FIFO) Update

func (f *FIFO) Update(obj interface{}) error

Update is the same as Add in this implementation.

type FakeCustomStore

type FakeCustomStore struct {
	AddFunc      func(obj interface{}) error
	UpdateFunc   func(obj interface{}) error
	DeleteFunc   func(obj interface{}) error
	ListFunc     func() []interface{}
	ListKeysFunc func() []string
	GetFunc      func(obj interface{}) (item interface{}, exists bool, err error)
	GetByKeyFunc func(key string) (item interface{}, exists bool, err error)
	ReplaceFunc  func(list []interface{}, resourceVersion string) error
	ResyncFunc   func() error
}

FakeCustomStore lets you define custom functions for store operations.

func (*FakeCustomStore) Add

func (f *FakeCustomStore) Add(obj interface{}) error

Add calls the custom Add function if defined

func (*FakeCustomStore) Delete

func (f *FakeCustomStore) Delete(obj interface{}) error

Delete calls the custom Delete function if defined

func (*FakeCustomStore) Get

func (f *FakeCustomStore) Get(obj interface{}) (item interface{}, exists bool, err error)

Get calls the custom Get function if defined

func (*FakeCustomStore) GetByKey

func (f *FakeCustomStore) GetByKey(key string) (item interface{}, exists bool, err error)

GetByKey calls the custom GetByKey function if defined

func (*FakeCustomStore) List

func (f *FakeCustomStore) List() []interface{}

List calls the custom List function if defined

func (*FakeCustomStore) ListKeys

func (f *FakeCustomStore) ListKeys() []string

ListKeys calls the custom ListKeys function if defined

func (*FakeCustomStore) Replace

func (f *FakeCustomStore) Replace(list []interface{}, resourceVersion string) error

Replace calls the custom Replace function if defined

func (*FakeCustomStore) Resync

func (f *FakeCustomStore) Resync() error

Resync calls the custom Resync function if defined

func (*FakeCustomStore) Update

func (f *FakeCustomStore) Update(obj interface{}) error

Update calls the custom Update function if defined

type FakeExpirationPolicy

type FakeExpirationPolicy struct {
	NeverExpire     sets.String
	RetrieveKeyFunc KeyFunc
}

FakeExpirationPolicy keeps the list of keys that never expire.

func (*FakeExpirationPolicy) IsExpired

func (p *FakeExpirationPolicy) IsExpired(obj *TimestampedEntry) bool

IsExpired is used to check whether an object is expired.

type FilteringResourceEventHandler

type FilteringResourceEventHandler struct {
	FilterFunc func(obj interface{}) bool
	Handler    ResourceEventHandler
}

FilteringResourceEventHandler applies the provided filter to all events coming in, ensuring the appropriate nested handler method is invoked. An object that starts passing the filter after an update is considered an add, and an object that stops passing the filter after an update is considered a delete. Like the handlers, the filter MUST NOT modify the objects it is given.
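The way an update is reclassified by the filter can be sketched as a small decision function. This is a simplified model, not the package's code: classifyUpdate and inProd are hypothetical names, and objects are reduced to strings.

```go
package main

import (
	"fmt"
	"strings"
)

// inProd is a hypothetical filter: it passes objects whose key starts
// with "prod/". Like a real FilterFunc, it does not modify its argument.
func inProd(obj string) bool {
	return strings.HasPrefix(obj, "prod/")
}

// classifyUpdate reports which nested handler method an update would
// reach, following the rule described above.
func classifyUpdate(filter func(string) bool, oldObj, newObj string) string {
	older, newer := filter(oldObj), filter(newObj)
	switch {
	case newer && older:
		return "update" // both versions pass the filter
	case newer && !older:
		return "add" // the object started passing the filter
	case !newer && older:
		return "delete" // the object stopped passing the filter
	default:
		return "ignore" // neither version passes; nothing is delivered
	}
}

func main() {
	fmt.Println(classifyUpdate(inProd, "prod/web", "prod/web"))
	fmt.Println(classifyUpdate(inProd, "dev/web", "prod/web"))
	fmt.Println(classifyUpdate(inProd, "prod/web", "dev/web"))
	fmt.Println(classifyUpdate(inProd, "dev/web", "dev/api"))
}
```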

func (FilteringResourceEventHandler) OnAdd

func (r FilteringResourceEventHandler) OnAdd(obj interface{}, isInInitialList bool)

OnAdd calls the nested handler only if the filter succeeds

func (FilteringResourceEventHandler) OnDelete

func (r FilteringResourceEventHandler) OnDelete(obj interface{})

OnDelete calls the nested handler only if the filter succeeds

func (FilteringResourceEventHandler) OnUpdate

func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{})

OnUpdate ensures the proper handler is called depending on whether the filter matches

type GaugeMetric

type GaugeMetric interface {
	Set(float64)
}

GaugeMetric represents a single numerical value that can arbitrarily go up and down.

type GenericLister

type GenericLister interface {
	// List will return all objects across namespaces
	List(selector labels.Selector) (ret []runtime.Object, err error)
	// Get will attempt to retrieve assuming that name==key
	Get(name string) (runtime.Object, error)
	// ByNamespace will give you a GenericNamespaceLister for one namespace
	ByNamespace(namespace string) GenericNamespaceLister
}

GenericLister is a lister skin on a generic Indexer

func NewGenericLister

func NewGenericLister(indexer Indexer, resource schema.GroupResource) GenericLister

NewGenericLister creates a new instance for the genericLister.

type GenericNamespaceLister

type GenericNamespaceLister interface {
	// List will return all objects in this namespace
	List(selector labels.Selector) (ret []runtime.Object, err error)
	// Get will attempt to retrieve by namespace and name
	Get(name string) (runtime.Object, error)
}

GenericNamespaceLister is a lister skin on a generic Indexer

type Getter

type Getter interface {
	Get() *restclient.Request
}

Getter interface knows how to access Get method from RESTClient.

type Heap

type Heap struct {
	// contains filtered or unexported fields
}

Heap is a thread-safe producer/consumer queue that implements a heap data structure. It can be used to implement priority queues and similar data structures.
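As a rough illustration of a keyed priority queue of this kind, the sketch below builds one on the standard library's container/heap. It is an assumption-laden model rather than this package's implementation: keyedHeap, task, and popAll are hypothetical names, the ordering is fixed to an integer priority instead of a caller-supplied LessFunc, and no locking or blocking Pop is included.

```go
package main

import (
	"container/heap"
	"fmt"
)

// task is a hypothetical item; its name plays the role of the key.
type task struct {
	name     string
	priority int
}

// keyedHeap pairs a binary heap with a key index so that adding an item
// whose key already exists updates it in place.
type keyedHeap struct {
	items []task
	index map[string]int // key -> position in items
}

func (h *keyedHeap) Len() int           { return len(h.items) }
func (h *keyedHeap) Less(i, j int) bool { return h.items[i].priority < h.items[j].priority }
func (h *keyedHeap) Swap(i, j int) {
	h.items[i], h.items[j] = h.items[j], h.items[i]
	h.index[h.items[i].name] = i
	h.index[h.items[j].name] = j
}

// Push and Pop are the hooks required by container/heap.
func (h *keyedHeap) Push(x interface{}) {
	t := x.(task)
	h.index[t.name] = len(h.items)
	h.items = append(h.items, t)
}

func (h *keyedHeap) Pop() interface{} {
	last := h.items[len(h.items)-1]
	h.items = h.items[:len(h.items)-1]
	delete(h.index, last.name)
	return last
}

// add inserts t, or replaces the existing item with the same key and
// restores the heap ordering.
func (h *keyedHeap) add(t task) {
	if i, ok := h.index[t.name]; ok {
		h.items[i] = t
		heap.Fix(h, i)
		return
	}
	heap.Push(h, t)
}

// popAll drains the heap and returns the keys in pop order.
func popAll(h *keyedHeap) []string {
	var order []string
	for h.Len() > 0 {
		order = append(order, heap.Pop(h).(task).name)
	}
	return order
}

func main() {
	h := &keyedHeap{index: map[string]int{}}
	h.add(task{"b", 2})
	h.add(task{"a", 1})
	h.add(task{"c", 3})
	h.add(task{"c", 0}) // same key: updated, not duplicated
	fmt.Println(popAll(h))
}
```

The second add of "c" changes its priority rather than creating a duplicate, so "c" is popped first and only once.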

func NewHeap

func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap

NewHeap returns a Heap which can be used to queue up items to process.

func (*Heap) Add

func (h *Heap) Add(obj interface{}) error

Add inserts an item, and puts it in the queue. The item is updated if it already exists.

func (*Heap) AddIfNotPresent

func (h *Heap) AddIfNotPresent(obj interface{}) error

AddIfNotPresent inserts an item, and puts it in the queue. If an item with the key is present in the map, no change is made to the item.

This is useful in a single producer/consumer scenario so that the consumer can safely retry items without contending with the producer and potentially enqueueing stale items.

func (*Heap) BulkAdd

func (h *Heap) BulkAdd(list []interface{}) error

BulkAdd adds all the items in the list to the queue and then signals the condition variable. It is useful when the caller would like to add all of the items to the queue before consumer starts processing them.

func (*Heap) Close

func (h *Heap) Close()

Close closes the Heap and signals condition variables that may be waiting to pop items from the heap.

func (*Heap) Delete

func (h *Heap) Delete(obj interface{}) error

Delete removes an item.

func (*Heap) Get

func (h *Heap) Get(obj interface{}) (interface{}, bool, error)

Get returns the requested item, or sets exists=false.

func (*Heap) GetByKey

func (h *Heap) GetByKey(key string) (interface{}, bool, error)

GetByKey returns the requested item, or sets exists=false.

func (*Heap) IsClosed

func (h *Heap) IsClosed() bool

IsClosed returns true if the queue is closed.

func (*Heap) List

func (h *Heap) List() []interface{}

List returns a list of all the items.

func (*Heap) ListKeys

func (h *Heap) ListKeys() []string

ListKeys returns a list of all the keys of the objects currently in the Heap.

func (*Heap) Pop

func (h *Heap) Pop() (interface{}, error)

Pop waits until an item is ready. If multiple items are ready, they are returned in the order given by Heap.data.lessFunc.

func (*Heap) Update

func (h *Heap) Update(obj interface{}) error

Update is the same as Add in this implementation. When the item does not exist, it is added.

type Index

type Index map[string]sets.String

Index maps the indexed value to a set of keys in the store that match on that value

type IndexFunc

type IndexFunc func(obj interface{}) ([]string, error)

IndexFunc knows how to compute the set of indexed values for an object.

type Indexer

type Indexer interface {
	Store
	// Index returns the stored objects whose set of indexed values
	// intersects the set of indexed values of the given object, for
	// the named index
	Index(indexName string, obj interface{}) ([]interface{}, error)
	// IndexKeys returns the storage keys of the stored objects whose
	// set of indexed values for the named index includes the given
	// indexed value
	IndexKeys(indexName, indexedValue string) ([]string, error)
	// ListIndexFuncValues returns all the indexed values of the given index
	ListIndexFuncValues(indexName string) []string
	// ByIndex returns the stored objects whose set of indexed values
	// for the named index includes the given indexed value
	ByIndex(indexName, indexedValue string) ([]interface{}, error)
	// GetIndexers returns the indexers
	GetIndexers() Indexers

	// AddIndexers adds more indexers to this store.  If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
}

Indexer extends Store with multiple indices and restricts each accumulator to simply hold the current object (and be empty after Delete).

There are three kinds of strings here:

  1. a storage key, as defined in the Store interface,
  2. a name of an index, and
  3. an "indexed value", which is produced by an IndexFunc and can be a field value or any other string computed from the object.
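To make the three kinds of strings concrete, here is a minimal standalone model of a single named index. Everything in it is hypothetical (the pod struct, storageKey, byNode, buildIndex, keysFor); it only mirrors the shape of the Index and Indices map types and does not use this package.

```go
package main

import (
	"fmt"
	"sort"
)

// pod is a hypothetical object with just enough fields for the example.
type pod struct{ namespace, name, node string }

// storageKey plays the role of a KeyFunc: it yields the storage key.
func storageKey(p pod) string { return p.namespace + "/" + p.name }

// byNode plays the role of an IndexFunc: it yields the indexed values.
func byNode(p pod) []string { return []string{p.node} }

// buildIndex maps each indexed value to the set of storage keys of the
// objects that produced that value.
func buildIndex(pods []pod, indexFunc func(pod) []string) map[string]map[string]struct{} {
	index := map[string]map[string]struct{}{}
	for _, p := range pods {
		for _, value := range indexFunc(p) {
			if index[value] == nil {
				index[value] = map[string]struct{}{}
			}
			index[value][storageKey(p)] = struct{}{}
		}
	}
	return index
}

// keysFor returns the sorted storage keys recorded under one indexed value.
func keysFor(index map[string]map[string]struct{}, indexedValue string) []string {
	keys := make([]string, 0, len(index[indexedValue]))
	for k := range index[indexedValue] {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	pods := []pod{
		{"default", "web", "node-1"},
		{"default", "db", "node-2"},
		{"kube-system", "dns", "node-1"},
	}
	// "byNode" is the index name, "node-1" is an indexed value, and the
	// results are storage keys.
	indices := map[string]map[string]map[string]struct{}{
		"byNode": buildIndex(pods, byNode),
	}
	fmt.Println(keysFor(indices["byNode"], "node-1"))
}
```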

func NewIndexer

func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer

NewIndexer returns an Indexer implemented simply with a map and a lock.

type Indexers

type Indexers map[string]IndexFunc

Indexers maps a name to an IndexFunc

type Indices

type Indices map[string]Index

Indices maps a name to an Index

type InformerSynced

type InformerSynced func() bool

InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.

type KeyError

type KeyError struct {
	Obj interface{}
	Err error
}

KeyError will be returned any time a KeyFunc gives an error; it includes the object at fault.

func (KeyError) Error

func (k KeyError) Error() string

Error gives a human-readable description of the error.

func (KeyError) Unwrap added in v0.22.0

func (k KeyError) Unwrap() error

Unwrap implements errors.Unwrap

type KeyFunc

type KeyFunc func(obj interface{}) (string, error)

KeyFunc knows how to make a key from an object. Implementations should be deterministic.

func IndexFuncToKeyFuncAdapter

func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc

IndexFuncToKeyFuncAdapter adapts an indexFunc to a keyFunc. This is only useful if your index function returns unique values for every object. This conversion can create errors when more than one key is found. You should prefer to make proper key and index functions.
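The caveat about non-unique values can be seen in a small standalone sketch of such an adapter. The local types indexFunc and keyFunc only mimic the shapes of IndexFunc and KeyFunc, and adaptIndexFunc, ownersOf, and the error messages are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins with the same shapes as IndexFunc and KeyFunc.
type indexFunc func(obj interface{}) ([]string, error)
type keyFunc func(obj interface{}) (string, error)

// adaptIndexFunc returns a key function that succeeds only when the
// index function yields exactly one value for the object.
func adaptIndexFunc(f indexFunc) keyFunc {
	return func(obj interface{}) (string, error) {
		values, err := f(obj)
		if err != nil {
			return "", err
		}
		switch len(values) {
		case 0:
			return "", errors.New("index function returned no values")
		case 1:
			return values[0], nil
		default:
			return "", fmt.Errorf("index function returned %d values, want 1", len(values))
		}
	}
}

// ownersOf is a hypothetical index function over []string objects.
func ownersOf(obj interface{}) ([]string, error) {
	owners, ok := obj.([]string)
	if !ok {
		return nil, errors.New("object is not a []string")
	}
	return owners, nil
}

func main() {
	key := adaptIndexFunc(ownersOf)
	fmt.Println(key([]string{"alice"}))        // one value: usable as a key
	fmt.Println(key([]string{"alice", "bob"})) // two values: an error
}
```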

type KeyGetter

type KeyGetter interface {
	// GetByKey returns the value associated with the key, or sets exists=false.
	GetByKey(key string) (value interface{}, exists bool, err error)
}

A KeyGetter is anything that knows how to get the value stored under a given key.

type KeyLister

type KeyLister interface {
	ListKeys() []string
}

A KeyLister is anything that knows how to list its keys.

type KeyListerGetter

type KeyListerGetter interface {
	KeyLister
	KeyGetter
}

A KeyListerGetter is anything that knows how to list its keys and look up by key.

type LessFunc

type LessFunc func(interface{}, interface{}) bool

LessFunc is used to compare two objects in the heap.

type ListFunc

type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

ListFunc knows how to list resources

type ListWatch

type ListWatch struct {
	ListFunc  ListFunc
	WatchFunc WatchFunc
	// DisableChunking requests no chunking for this list watcher.
	DisableChunking bool
}

ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface. It is a convenience type for users of NewReflector, etc. ListFunc and WatchFunc must not be nil.

func NewFilteredListWatchFromClient

func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch

NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier. The option modifier is a function that takes a ListOptions and modifies it in place. Provide a custom modifier function to set a field selector, a label selector, or any other desired options on the ListOptions.

func NewListWatchFromClient

func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch

NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.

func (*ListWatch) List

func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error)

List a set of apiserver resources

func (*ListWatch) Watch

func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error)

Watch a set of apiserver resources

type Lister

type Lister interface {
	// List should return a list type object; the Items field will be extracted, and the
	// ResourceVersion field will be used to start the watch in the right place.
	List(options metav1.ListOptions) (runtime.Object, error)
}

Lister is any object that knows how to perform an initial list.

type ListerWatcher

type ListerWatcher interface {
	Lister
	Watcher
}

ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.

type MetricsProvider

type MetricsProvider interface {
	NewListsMetric(name string) CounterMetric
	NewListDurationMetric(name string) SummaryMetric
	NewItemsInListMetric(name string) SummaryMetric

	NewWatchesMetric(name string) CounterMetric
	NewShortWatchesMetric(name string) CounterMetric
	NewWatchDurationMetric(name string) SummaryMetric
	NewItemsInWatchMetric(name string) SummaryMetric

	NewLastResourceVersionMetric(name string) GaugeMetric
}

MetricsProvider generates various metrics used by the reflector.

type MutationCache

type MutationCache interface {
	GetByKey(key string) (interface{}, bool, error)
	ByIndex(indexName, indexKey string) ([]interface{}, error)
	Mutation(interface{})
}

MutationCache is able to take the results of update operations and store them in an LRU that can be used to provide a more current view of a requested object. It requires interpreting resourceVersions for comparisons. Implementations must be thread-safe. TODO find a way to layer this into an informer/lister

func NewIntegerResourceVersionMutationCache

func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache

NewIntegerResourceVersionMutationCache returns a MutationCache that understands how to deal with objects that have a resource version that:

  • is an integer
  • increases when updated
  • is comparable across the same resource in a namespace

Most backends will have these semantics. Indexer may be nil. ttl controls how long an item remains in the mutation cache before it is removed.

If includeAdds is true, objects in the mutation cache will be returned even if they don't exist in the underlying store. This is only safe if your use of the cache can handle mutation entries remaining in the cache for up to ttl when mutations and deletes occur very closely in time.
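The reason integer, increasing resource versions matter can be sketched as a comparison between the copy in the backing store and the copy recorded after a local mutation. This is a simplified model with hypothetical names (versioned, newerOf); in particular, falling back to the backing copy when a version does not parse is a choice made for this sketch, not documented behavior.

```go
package main

import (
	"fmt"
	"strconv"
)

// versioned is a hypothetical stand-in for an API object.
type versioned struct {
	name            string
	resourceVersion string
}

// newerOf returns whichever copy has the higher integer resource version.
// The backing copy wins ties, and (as an assumption of this sketch) also
// wins when either version is not an integer.
func newerOf(backing, mutated versioned) versioned {
	b, errB := strconv.ParseInt(backing.resourceVersion, 10, 64)
	m, errM := strconv.ParseInt(mutated.resourceVersion, 10, 64)
	if errB != nil || errM != nil {
		return backing
	}
	if m > b {
		return mutated
	}
	return backing
}

func main() {
	backing := versioned{name: "cfg", resourceVersion: "41"}
	mutated := versioned{name: "cfg", resourceVersion: "42"}

	// The local write is newer than what the informer has seen so far.
	fmt.Println(newerOf(backing, mutated).resourceVersion)

	// Once the informer catches up, the backing copy is preferred.
	backing.resourceVersion = "42"
	fmt.Println(newerOf(backing, mutated).resourceVersion)
}
```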

type MutationDetector added in v0.16.4

type MutationDetector interface {
	// AddObject adds the given object to the set being monitored for a while from now
	AddObject(obj interface{})

	// Run starts the monitoring and does not return until the monitoring is stopped.
	Run(stopCh <-chan struct{})
}

MutationDetector is able to monitor objects for mutation within a limited window of time

func NewCacheMutationDetector

func NewCacheMutationDetector(name string) MutationDetector

NewCacheMutationDetector creates a new instance for the defaultCacheMutationDetector.

type PopProcessFunc

type PopProcessFunc func(obj interface{}, isInInitialList bool) error

PopProcessFunc is passed to Pop() method of Queue interface. It is supposed to process the accumulator popped from the queue.

type ProcessFunc

type ProcessFunc func(obj interface{}, isInInitialList bool) error

ProcessFunc processes a single object.

type Queue

type Queue interface {
	Store

	// Pop blocks until there is at least one key to process or the
	// Queue is closed.  In the latter case Pop returns with an error.
	// In the former case Pop atomically picks one key to process,
	// removes that (key, accumulator) association from the Store, and
	// processes the accumulator.  Pop returns the accumulator that
	// was processed and the result of processing.  The PopProcessFunc
	// may return an ErrRequeue{inner} and in this case Pop will (a)
	// return that (key, accumulator) association to the Queue as part
	// of the atomic processing and (b) return the inner error from
	// Pop.
	Pop(PopProcessFunc) (interface{}, error)

	// AddIfNotPresent puts the given accumulator into the Queue (in
	// association with the accumulator's key) if and only if that key
	// is not already associated with a non-empty accumulator.
	AddIfNotPresent(interface{}) error

	// HasSynced returns true if the first batch of keys have all been
	// popped.  The first batch of keys are those of the first Replace
	// operation if that happened before any Add, AddIfNotPresent,
	// Update, or Delete; otherwise the first batch is empty.
	HasSynced() bool

	// Close the queue
	Close()
}

Queue extends Store with a collection of Store keys to "process". Every Add, Update, or Delete may put the object's key in that collection. A Queue has a way to derive the corresponding key given an accumulator. A Queue can be accessed concurrently from multiple goroutines. A Queue can be "closed", after which Pop operations return an error.

type Reflector

type Reflector struct {

	// WatchListPageSize is the requested chunk size of initial and resync watch lists.
	// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
	// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
	// it will turn off pagination to allow serving them from watch cache.
	// NOTE: It should be used carefully as paginated lists are always served directly from
	// etcd, which is significantly less efficient and may lead to serious performance and
	// scalability problems.
	WatchListPageSize int64
	// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
	ShouldResync func() bool
	// MaxInternalErrorRetryDuration defines how long we should retry internal errors returned by watch.
	MaxInternalErrorRetryDuration time.Duration
	// UseWatchList if turned on instructs the reflector to open a stream to bring data from the API server.
	// Streaming has the primary advantage of using fewer server's resources to fetch data.
	//
	// The old behaviour establishes a LIST request which gets data in chunks.
	// Paginated list is less efficient and depending on the actual size of objects
	// might result in an increased memory consumption of the APIServer.
	//
	// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details
	UseWatchList bool
	// contains filtered or unexported fields
}

Reflector watches a specified resource and causes all changes to be reflected in the given store.

func NewNamedReflector

func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector

NewNamedReflector creates a new Reflector with the specified name. See NewReflectorWithOptions for further information.

func NewReflector

func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector

NewReflector creates a new Reflector with its name defaulted to the closest source_file.go:line in the call stack that is outside this package. See NewReflectorWithOptions for further information.

func NewReflectorWithOptions added in v0.27.0

func NewReflectorWithOptions(lw ListerWatcher, expectedType interface{}, store Store, options ReflectorOptions) *Reflector

NewReflectorWithOptions creates a new Reflector object which will keep the given store up to date with the server's contents for the given resource. Reflector promises to only put things in the store that have the type of expectedType, unless expectedType is nil. If resyncPeriod is non-zero, then the reflector will periodically consult its ShouldResync function to determine whether to invoke the Store's Resync operation; `ShouldResync==nil` means always "yes". This enables you to use reflectors to periodically process everything as well as incrementally processing the things that change.

func (*Reflector) LastSyncResourceVersion

func (r *Reflector) LastSyncResourceVersion() string

LastSyncResourceVersion is the resource version observed when last synced with the underlying store. The value returned is not synchronized with access to the underlying store and is not thread-safe.

func (*Reflector) ListAndWatch

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error

ListAndWatch first lists all items and gets the resource version at the moment of the call, and then uses that resource version to watch. It returns an error if ListAndWatch didn't even try to initialize the watch.

func (*Reflector) Run

func (r *Reflector) Run(stopCh <-chan struct{})

Run repeatedly uses the reflector's ListAndWatch to fetch all the objects and subsequent deltas. Run will exit when stopCh is closed.

type ReflectorOptions added in v0.27.0

type ReflectorOptions struct {
	// Name is the Reflector's name. If unset/unspecified, the name defaults to the closest source_file.go:line
	// in the call stack that is outside this package.
	Name string

	// TypeDescription is the Reflector's type description. If unset/unspecified, the type description is defaulted
	// using the following rules: if the expectedType passed to NewReflectorWithOptions was nil, the type description is
	// "<unspecified>". If the expectedType is an instance of *unstructured.Unstructured and its apiVersion and kind fields
	// are set, the type description is the string encoding of those. Otherwise, the type description is set to the
	// Go type of expectedType.
	TypeDescription string

	// ResyncPeriod is the Reflector's resync period. If unset/unspecified, the resync period defaults to 0
	// (do not resync).
	ResyncPeriod time.Duration

	// Clock allows tests to control time. If unset defaults to clock.RealClock{}
	Clock clock.Clock
}

ReflectorOptions configures a Reflector.

type ResourceEventHandler

type ResourceEventHandler interface {
	OnAdd(obj interface{}, isInInitialList bool)
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}

ResourceEventHandler can handle notifications for events that happen to a resource. The events are informational only, so you can't return an error. The handlers MUST NOT modify the objects received; this concerns not only the top level of structure but all the data structures reachable from it.

  • OnAdd is called when an object is added.
  • OnUpdate is called when an object is modified. Note that oldObj is the last known state of the object-- it is possible that several changes were combined together, so you can't use this to see every single change. OnUpdate is also called when a re-list happens, and it will get called even if nothing changed. This is useful for periodically evaluating or syncing something.
  • OnDelete will get the final state of the item if it is known, otherwise it will get an object of type DeletedFinalStateUnknown. This can happen if the watch is closed and misses the delete event and we don't notice the deletion until the subsequent re-list.
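Because OnDelete may receive either the object itself or a tombstone, a delete handler usually has to unwrap its argument before using it. The sketch below models that with local stand-in types. The pod struct and podFromDelete are hypothetical, and deletedFinalStateUnknown is a local type modeled on DeletedFinalStateUnknown, with Key and Obj fields assumed for illustration.

```go
package main

import "fmt"

// pod is a hypothetical object type delivered to the handler.
type pod struct{ namespace, name string }

// deletedFinalStateUnknown is a local stand-in for the tombstone type
// mentioned above; the Key and Obj fields are assumptions of this sketch.
type deletedFinalStateUnknown struct {
	Key string
	Obj interface{}
}

// podFromDelete extracts the pod from whatever a delete notification
// carries. It reports false if no pod can be recovered.
func podFromDelete(obj interface{}) (pod, bool) {
	switch v := obj.(type) {
	case pod:
		return v, true // the final state was observed directly
	case deletedFinalStateUnknown:
		p, ok := v.Obj.(pod) // last known state, possibly stale
		return p, ok
	default:
		return pod{}, false
	}
}

func main() {
	direct := pod{"default", "web"}
	tombstone := deletedFinalStateUnknown{Key: "default/db", Obj: pod{"default", "db"}}

	for _, obj := range []interface{}{direct, tombstone, "unexpected"} {
		if p, ok := podFromDelete(obj); ok {
			fmt.Println("deleted:", p.namespace+"/"+p.name)
		} else {
			fmt.Println("could not decode delete notification")
		}
	}
}
```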

type ResourceEventHandlerDetailedFuncs added in v0.27.0

type ResourceEventHandlerDetailedFuncs struct {
	AddFunc    func(obj interface{}, isInInitialList bool)
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

ResourceEventHandlerDetailedFuncs is exactly like ResourceEventHandlerFuncs except its AddFunc accepts the isInInitialList parameter, for propagating HasSynced.

func (ResourceEventHandlerDetailedFuncs) OnAdd added in v0.27.0

func (r ResourceEventHandlerDetailedFuncs) OnAdd(obj interface{}, isInInitialList bool)

OnAdd calls AddFunc if it's not nil.

func (ResourceEventHandlerDetailedFuncs) OnDelete added in v0.27.0

func (r ResourceEventHandlerDetailedFuncs) OnDelete(obj interface{})

OnDelete calls DeleteFunc if it's not nil.

func (ResourceEventHandlerDetailedFuncs) OnUpdate added in v0.27.0

func (r ResourceEventHandlerDetailedFuncs) OnUpdate(oldObj, newObj interface{})

OnUpdate calls UpdateFunc if it's not nil.

type ResourceEventHandlerFuncs

type ResourceEventHandlerFuncs struct {
	AddFunc    func(obj interface{})
	UpdateFunc func(oldObj, newObj interface{})
	DeleteFunc func(obj interface{})
}

ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or as few of the notification functions as you want while still implementing ResourceEventHandler. This adapter does not remove the prohibition against modifying the objects.

See ResourceEventHandlerDetailedFuncs if your use needs to propagate HasSynced.

func (ResourceEventHandlerFuncs) OnAdd

func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}, isInInitialList bool)

OnAdd calls AddFunc if it's not nil.

func (ResourceEventHandlerFuncs) OnDelete

func (r ResourceEventHandlerFuncs) OnDelete(obj interface{})

OnDelete calls DeleteFunc if it's not nil.

func (ResourceEventHandlerFuncs) OnUpdate

func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{})

OnUpdate calls UpdateFunc if it's not nil.

type ResourceEventHandlerRegistration added in v0.26.0

type ResourceEventHandlerRegistration interface {
	// HasSynced reports if both the parent has synced and all pre-sync
	// events have been delivered.
	HasSynced() bool
}

Opaque interface representing the registration of ResourceEventHandler for a SharedInformer. Must be supplied back to the same SharedInformer's `RemoveEventHandler` to unregister the handlers.

Also used to tell if the handler is synced (has had all items in the initial list delivered).

type ResourceVersionComparator

type ResourceVersionComparator interface {
	CompareResourceVersion(lhs, rhs runtime.Object) int
}

ResourceVersionComparator is able to compare object versions.

type ResourceVersionUpdater added in v0.20.0

type ResourceVersionUpdater interface {
	// UpdateResourceVersion is called each time current resource version of the reflector
	// is updated.
	UpdateResourceVersion(resourceVersion string)
}

ResourceVersionUpdater is an interface that allows store implementation to track the current resource version of the reflector. This is especially important if storage bookmarks are enabled.

type RetryWithDeadline added in v0.25.0

type RetryWithDeadline interface {
	After(error)
	ShouldRetry() bool
}

func NewRetryWithDeadline added in v0.25.0

func NewRetryWithDeadline(maxRetryDuration, minResetPeriod time.Duration, isRetryable func(error) bool, clock clock.Clock) RetryWithDeadline

type SharedIndexInformer

type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers adds indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	GetIndexer() Indexer
}

SharedIndexInformer extends SharedInformer with the ability to add and get Indexers.

func NewSharedIndexInformer

func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer

NewSharedIndexInformer creates a new instance for the ListerWatcher and specified Indexers. See NewSharedIndexInformerWithOptions for full details.

func NewSharedIndexInformerWithOptions added in v0.27.0

func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer

NewSharedIndexInformerWithOptions creates a new instance for the ListerWatcher. The created informer will not do resyncs if options.ResyncPeriod is zero. Otherwise: for each handler with a non-zero requested resync period, whether added before or after the informer starts, the nominal resync period is the requested resync period rounded up to a multiple of the informer's resync checking period. Such an informer's resync checking period is established when the informer starts running, and is the maximum of (a) the minimum of the resync periods requested before the informer starts and the options.ResyncPeriod given here and (b) the constant `minimumResyncPeriod` defined in this file.

type SharedIndexInformerOptions added in v0.27.0

type SharedIndexInformerOptions struct {
	// ResyncPeriod is the default event handler resync period and resync check
	// period. If unset/unspecified, these are defaulted to 0 (do not resync).
	ResyncPeriod time.Duration

	// Indexers is the sharedIndexInformer's indexers. If unset/unspecified, no indexers are configured.
	Indexers Indexers

	// ObjectDescription is the sharedIndexInformer's object description. This is passed through to the
	// underlying Reflector's type description.
	ObjectDescription string
}

SharedIndexInformerOptions configures a sharedIndexInformer.

type SharedInformer

type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using
	// the shared informer's resync period.  Events to a single handler are
	// delivered sequentially, but there is no coordination between
	// different handlers.
	// It returns a registration handle for the handler that can be used to
	// remove the handler again, or to tell if the handler is synced (has
	// seen every item in the initial list).
	AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error)
	// AddEventHandlerWithResyncPeriod adds an event handler to the
	// shared informer with the requested resync period; zero means
	// this handler does not care about resyncs.  The resync operation
	// consists of delivering to the handler an update notification
	// for every object in the informer's local cache; it does not add
	// any interactions with the authoritative storage.  Some
	// informers do no resyncs at all, not even for handlers added
	// with a non-zero resyncPeriod.  For an informer that does
	// resyncs, and for each handler that requests resyncs, that
	// informer develops a nominal resync period that is no shorter
	// than the requested period but may be longer.  The actual time
	// between any two resyncs may be longer than the nominal period
	// because the implementation takes time to do work and there may
	// be competing load and scheduling noise.
	// It returns a registration handle for the handler that can be used to remove
	// the handler again and an error if the handler cannot be added.
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error)
	// RemoveEventHandler removes a formerly added event handler given by
	// its registration handle.
	// This function is guaranteed to be idempotent, and thread-safe.
	RemoveEventHandler(handle ResourceEventHandlerRegistration) error
	// GetStore returns the informer's local cache as a Store.
	GetStore() Store
	// GetController is deprecated; it does nothing useful.
	GetController() Controller
	// Run starts and runs the shared informer, returning after it stops.
	// The informer will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has been
	// informed by at least one full LIST of the authoritative state
	// of the informer's object collection.  This is unrelated to "resync".
	//
	// Note that this doesn't tell you if an individual handler is synced!!
	// For that, please call HasSynced on the handle returned by
	// AddEventHandler.
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string

	// The WatchErrorHandler is called whenever ListAndWatch drops the
	// connection with an error. After calling this handler, the informer
	// will back off and retry.
	//
	// The default implementation looks at the error type and tries to log
	// the error message at an appropriate level.
	//
	// There's only one handler, so if you call this multiple times, last one
	// wins; calling after the informer has been started returns an error.
	//
	// The handler is intended for visibility, not to e.g. pause the consumers.
	// The handler should return quickly - any expensive processing should be
	// offloaded.
	SetWatchErrorHandler(handler WatchErrorHandler) error

	// The TransformFunc is called for each object which is about to be stored.
	//
	// This function is intended for you to take the opportunity to
	// remove, transform, or normalize fields. One use case is to strip unused
	// metadata fields out of objects to save on RAM cost.
	//
	// Must be set before starting the informer.
	//
	// Please see the comment on TransformFunc for more details.
	SetTransform(handler TransformFunc) error

	// IsStopped reports whether the informer has already been stopped.
	// Adding event handlers to already stopped informers is not possible.
	// An informer already stopped will never be started again.
	IsStopped() bool
}

SharedInformer provides eventually consistent linkage of its clients to the authoritative state of a given collection of objects. An object is identified by its API group, kind/resource, namespace (if any), and name; the `ObjectMeta.UID` is not part of an object's ID as far as this contract is concerned. One SharedInformer provides linkage to objects of a particular API group and kind/resource. The linked object collection of a SharedInformer may be further restricted to one namespace (if applicable) and/or by label selector and/or field selector.

The authoritative state of an object is what apiservers provide access to, and an object goes through a strict sequence of states. An object state is either (1) present with a ResourceVersion and other appropriate content or (2) "absent".

A SharedInformer maintains a local cache --- exposed by GetStore(), by GetIndexer() in the case of an indexed informer, and possibly by machinery involved in creating and/or accessing the informer --- of the state of each relevant object. This cache is eventually consistent with the authoritative state. This means that, unless prevented by persistent communication problems, if ever a particular object ID X is authoritatively associated with a state S then for every SharedInformer I whose collection includes (X, S) eventually either (1) I's cache associates X with S or a later state of X, (2) I is stopped, or (3) the authoritative state service for X terminates. To be formally complete, we say that the absent state meets any restriction by label selector or field selector.

For a given informer and relevant object ID X, the sequence of states that appears in the informer's cache is a subsequence of the states authoritatively associated with X. That is, some states might never appear in the cache but ordering among the appearing states is correct. Note, however, that there is no promise about ordering between states seen for different objects.

The local cache starts out empty, and gets populated and updated during `Run()`.

As a simple example, if a collection of objects is henceforth unchanging, a SharedInformer is created that links to that collection, and that SharedInformer is `Run()` then that SharedInformer's cache eventually holds an exact copy of that collection (unless it is stopped too soon, the authoritative state service ends, or communication problems between the two persistently thwart achievement).

As another simple example, if the local cache ever holds a non-absent state for some object ID and the object is eventually removed from the authoritative state then eventually the object is removed from the local cache (unless the SharedInformer is stopped too soon, the authoritative state service ends, or communication problems persistently thwart the desired result).

The keys in the Store are of the form namespace/name for namespaced objects, and are simply the name for non-namespaced objects. Clients can use `MetaNamespaceKeyFunc(obj)` to extract the key for a given object, and `SplitMetaNamespaceKey(key)` to split a key into its constituent parts.
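The key format described above can be illustrated with a minimal, standard-library-only sketch. The helpers `makeKey` and `splitKey` are hypothetical stand-ins written for this example; they follow the documented namespace/name form but are not the package's `MetaNamespaceKeyFunc` or `SplitMetaNamespaceKey`.

```go
package main

import (
	"fmt"
	"strings"
)

// makeKey builds a key in the documented form: "namespace/name" for
// namespaced objects and just "name" for non-namespaced ones.
// Hypothetical helper for illustration only.
func makeKey(namespace, name string) string {
	if namespace == "" {
		return name
	}
	return namespace + "/" + name
}

// splitKey reverses makeKey and rejects keys with more than one separator.
// Hypothetical helper for illustration only.
func splitKey(key string) (namespace, name string, err error) {
	parts := strings.Split(key, "/")
	switch len(parts) {
	case 1:
		return "", parts[0], nil
	case 2:
		return parts[0], parts[1], nil
	}
	return "", "", fmt.Errorf("unexpected key format: %q", key)
}

func main() {
	keys := []string{makeKey("kube-system", "coredns"), makeKey("", "node-1")}
	for _, key := range keys {
		ns, name, err := splitKey(key)
		fmt.Printf("key=%q namespace=%q name=%q err=%v\n", key, ns, name, err)
	}
}
```

In real code, prefer the package's own key functions so that keys stay consistent with what the informer stores.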

Every query against the local cache is answered entirely from one snapshot of the cache's state. Thus, the result of a `List` call will not contain two entries with the same namespace and name.

A client is identified here by a ResourceEventHandler. For every update to the SharedInformer's local cache and for every client added before `Run()`, eventually either the SharedInformer is stopped or the client is notified of the update. A client added after `Run()` starts gets a startup batch of notifications of additions of the objects existing in the cache at the time that client was added; also, for every update to the SharedInformer's local cache after that client was added, eventually either the SharedInformer is stopped or that client is notified of that update. Client notifications happen after the corresponding cache update and, in the case of a SharedIndexInformer, after the corresponding index updates. It is possible that additional cache and index updates happen before such a prescribed notification. For a given SharedInformer and client, the notifications are delivered sequentially. For a given SharedInformer, client, and object ID, the notifications are delivered in order. Because `ObjectMeta.UID` has no role in identifying objects, it is possible that when (1) object O1 with ID (e.g. namespace and name) X and `ObjectMeta.UID` U1 in the SharedInformer's local cache is deleted and later (2) another object O2 with ID X and ObjectMeta.UID U2 is created the informer's clients are not notified of (1) and (2) but rather are notified only of an update from O1 to O2. Clients that need to detect such cases might do so by comparing the `ObjectMeta.UID` field of the old and the new object in the code that handles update notifications (i.e. `OnUpdate` method of ResourceEventHandler).
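The UID comparison suggested above might look like the following sketch. The `object` type and the `wasReplaced` and `onUpdate` functions are hypothetical names invented for this example; a real handler would receive API objects and read `ObjectMeta.UID` from them.

```go
package main

import "fmt"

// object is a hypothetical, stripped-down stand-in for an API object.
type object struct {
	Namespace, Name string
	UID             string
}

// wasReplaced reports whether an update notification hides a delete followed
// by a re-create: the ID (namespace and name) is the same but the UID differs.
func wasReplaced(oldObj, newObj object) bool {
	return oldObj.UID != newObj.UID
}

// onUpdate sketches the body of an update handler that distinguishes an
// in-place update from a replacement.
func onUpdate(oldObj, newObj object) string {
	if wasReplaced(oldObj, newObj) {
		return fmt.Sprintf("%s/%s was replaced (UID %s -> %s)",
			newObj.Namespace, newObj.Name, oldObj.UID, newObj.UID)
	}
	return fmt.Sprintf("%s/%s was updated in place", newObj.Namespace, newObj.Name)
}

func main() {
	o1 := object{Namespace: "default", Name: "web", UID: "u1"}
	o2 := object{Namespace: "default", Name: "web", UID: "u2"}
	fmt.Println(onUpdate(o1, o1))
	fmt.Println(onUpdate(o1, o2))
}
```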

A client must process each notification promptly; a SharedInformer is not engineered to deal well with a large backlog of notifications to deliver. Lengthy processing should be passed off to something else, for example through a `client-go/util/workqueue`.

A delete notification exposes the last locally known non-absent state, except that its ResourceVersion is replaced with a ResourceVersion in which the object is actually absent.

func NewSharedInformer

func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer

NewSharedInformer creates a new instance for the ListerWatcher. See NewSharedIndexInformerWithOptions for full details.

type ShouldResyncFunc

type ShouldResyncFunc func() bool

ShouldResyncFunc is a type of function that indicates if a reflector should perform a resync or not. It can be used by a shared informer to support multiple event handlers with custom resync periods.

type Store

type Store interface {

	// Add adds the given object to the accumulator associated with the given object's key
	Add(obj interface{}) error

	// Update updates the given object in the accumulator associated with the given object's key
	Update(obj interface{}) error

	// Delete deletes the given object from the accumulator associated with the given object's key
	Delete(obj interface{}) error

	// List returns a list of all the currently non-empty accumulators
	List() []interface{}

	// ListKeys returns a list of all the keys currently associated with non-empty accumulators
	ListKeys() []string

	// Get returns the accumulator associated with the given object's key
	Get(obj interface{}) (item interface{}, exists bool, err error)

	// GetByKey returns the accumulator associated with the given key
	GetByKey(key string) (item interface{}, exists bool, err error)

	// Replace will delete the contents of the store, using instead the
	// given list. Store takes ownership of the list, you should not reference
	// it after calling this function.
	Replace([]interface{}, string) error

	// Resync is meaningless in the terms appearing here but has
	// meaning in some implementations that have non-trivial
	// additional behavior (e.g., DeltaFIFO).
	Resync() error
}

Store is a generic object storage and processing interface. A Store holds a map from string keys to accumulators, and has operations to add, update, and delete a given object to/from the accumulator currently associated with a given key. A Store also knows how to extract the key from a given object, so many operations are given only the object.

In the simplest Store implementations each accumulator is simply the last given object, or empty after Delete, and thus the Store's behavior is simple storage.

Reflector knows how to watch a server and update a Store. This package provides a variety of implementations of Store.

func NewExpirationStore

func NewExpirationStore(keyFunc KeyFunc, expirationPolicy ExpirationPolicy) Store

NewExpirationStore creates and returns an ExpirationCache for a given policy.

func NewFakeExpirationStore

func NewFakeExpirationStore(keyFunc KeyFunc, deletedKeys chan<- string, expirationPolicy ExpirationPolicy, cacheClock clock.Clock) Store

NewFakeExpirationStore creates a new instance for the ExpirationCache.

func NewStore

func NewStore(keyFunc KeyFunc) Store

NewStore returns a Store implemented simply with a map and a lock.
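To make "a map and a lock" concrete, here is a minimal sketch of such a store. `simpleStore`, `pod`, and `podKey` are hypothetical names for this example and cover only a few of the Store operations; this is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// pod is a hypothetical object type used only for this example.
type pod struct{ Namespace, Name, Phase string }

// podKey plays the role of a KeyFunc: it derives the store key from the object.
func podKey(obj interface{}) (string, error) {
	p, ok := obj.(pod)
	if !ok {
		return "", fmt.Errorf("unexpected type %T", obj)
	}
	return p.Namespace + "/" + p.Name, nil
}

// simpleStore is a hypothetical map-plus-lock store; each "accumulator" is
// simply the last object given for a key.
type simpleStore struct {
	mu      sync.RWMutex
	items   map[string]interface{}
	keyFunc func(interface{}) (string, error)
}

func newSimpleStore(keyFunc func(interface{}) (string, error)) *simpleStore {
	return &simpleStore{items: map[string]interface{}{}, keyFunc: keyFunc}
}

// Add stores obj under its key, replacing any previous object.
func (s *simpleStore) Add(obj interface{}) error {
	key, err := s.keyFunc(obj)
	if err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items[key] = obj
	return nil
}

// Delete removes whatever is stored under obj's key, if anything.
func (s *simpleStore) Delete(obj interface{}) error {
	key, err := s.keyFunc(obj)
	if err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.items, key)
	return nil
}

// GetByKey returns the object stored under key and whether it exists.
func (s *simpleStore) GetByKey(key string) (interface{}, bool, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	item, ok := s.items[key]
	return item, ok, nil
}

func main() {
	s := newSimpleStore(podKey)
	_ = s.Add(pod{"default", "web", "Pending"})
	_ = s.Add(pod{"default", "web", "Running"}) // same key: the last object wins
	item, exists, _ := s.GetByKey("default/web")
	fmt.Println(item, exists)
}
```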

func NewTTLStore

func NewTTLStore(keyFunc KeyFunc, ttl time.Duration) Store

NewTTLStore creates and returns an ExpirationCache with a TTLPolicy.

type SummaryMetric

type SummaryMetric interface {
	Observe(float64)
}

SummaryMetric captures individual observations.

type TTLPolicy

type TTLPolicy struct {
	//	 >0: Expire entries with an age > ttl
	//	<=0: Don't expire any entry
	TTL time.Duration

	// Clock used to calculate ttl expiration
	Clock clock.Clock
}

TTLPolicy implements a ttl based ExpirationPolicy.

func (*TTLPolicy) IsExpired

func (p *TTLPolicy) IsExpired(obj *TimestampedEntry) bool

IsExpired returns true if the given object is older than the ttl, or it can't determine its age.

type ThreadSafeStore

type ThreadSafeStore interface {
	Add(key string, obj interface{})
	Update(key string, obj interface{})
	Delete(key string)
	Get(key string) (item interface{}, exists bool)
	List() []interface{}
	ListKeys() []string
	Replace(map[string]interface{}, string)
	Index(indexName string, obj interface{}) ([]interface{}, error)
	IndexKeys(indexName, indexedValue string) ([]string, error)
	ListIndexFuncValues(name string) []string
	ByIndex(indexName, indexedValue string) ([]interface{}, error)
	GetIndexers() Indexers

	// AddIndexers adds more indexers to this store.  If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
	// Resync is a no-op and is deprecated
	Resync() error
}

ThreadSafeStore is an interface that allows concurrent indexed access to a storage backend. It is like Indexer but does not (necessarily) know how to extract the Store key from a given object.

TL;DR caveats: you must not modify anything returned by Get or List as it will break the indexing feature in addition to not being thread safe.

The guarantees of thread safety provided by List/Get are only valid if the caller treats returned items as read-only. For example, a pointer inserted in the store through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get` on the same key and modify the pointer in a non-thread-safe way. Also note that modifying objects stored by the indexers (if any) will *not* automatically lead to a re-index. So it's not a good idea to directly modify the objects returned by Get/List, in general.
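A common way to respect the read-only rule is to copy an object before changing it. In the sketch below, a plain map stands in for the store, and `pod` and `deepCopy` are hypothetical names for this example; the point is only that the pointer held by the store is never mutated.

```go
package main

import "fmt"

// pod is a hypothetical object type used only for this example.
type pod struct {
	Name   string
	Labels map[string]string
}

// deepCopy returns an independent copy, including a fresh Labels map.
func (p *pod) deepCopy() *pod {
	out := &pod{Name: p.Name, Labels: make(map[string]string, len(p.Labels))}
	for k, v := range p.Labels {
		out.Labels[k] = v
	}
	return out
}

func main() {
	// store stands in for a ThreadSafeStore: looking up a key hands back the
	// same pointer that was added.
	store := map[string]*pod{
		"default/web": {Name: "web", Labels: map[string]string{"app": "web"}},
	}

	cached := store["default/web"]
	mine := cached.deepCopy() // work on a private copy, never on the cached pointer
	mine.Labels["app"] = "changed"

	fmt.Println("cached:", store["default/web"].Labels["app"])
	fmt.Println("copy:  ", mine.Labels["app"])
}
```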

func NewThreadSafeStore

func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore

NewThreadSafeStore creates a new instance of ThreadSafeStore.

type TimestampedEntry

type TimestampedEntry struct {
	Obj       interface{}
	Timestamp time.Time
	// contains filtered or unexported fields
}

TimestampedEntry is the only type allowed in an ExpirationCache. Keep in mind that it is not safe to share timestamps between computers. Behavior may be inconsistent if you get a timestamp from the API Server and use it on the client machine as part of your ExpirationCache.

type TransformFunc added in v0.23.0

type TransformFunc func(interface{}) (interface{}, error)

TransformFunc allows for transforming an object before it will be processed. TransformFunc (similarly to ResourceEventHandler functions) should be able to correctly handle the tombstone of type cache.DeletedFinalStateUnknown.

New in v1.27: In such cases, the contained object will already have gone through the TransformFunc separately (when it was added or updated prior to the delete), so the TransformFunc can likely safely ignore such objects (i.e., just return the input object).

The most common usage pattern is to clean-up some parts of the object to reduce component memory usage if a given component doesn't care about them.

New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc sees the object before any other actor, and it is now safe to mutate the object in place instead of making a copy.

Note that TransformFunc is called while inserting objects into the notification queue and is therefore extremely performance sensitive; please do not do anything that will take a long time.
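A transform that follows the guidance above might look like this sketch. It has the same shape as TransformFunc, but `pod`, `tombstone` (standing in for cache.DeletedFinalStateUnknown), and `stripManagedFields` are hypothetical names for this example. It assumes the v1.27 behavior described above, where mutating the object in place is safe and tombstones can be returned unchanged.

```go
package main

import "fmt"

// pod is a hypothetical object type; ManagedFields stands in for bulky
// metadata that this component never reads.
type pod struct {
	Name          string
	ManagedFields []string
}

// tombstone is a hypothetical stand-in for cache.DeletedFinalStateUnknown.
type tombstone struct {
	Key string
	Obj interface{}
}

// stripManagedFields has the same signature as TransformFunc. It drops the
// unused field to save memory and does no expensive work.
func stripManagedFields(obj interface{}) (interface{}, error) {
	switch o := obj.(type) {
	case tombstone:
		// The contained object was already transformed when it was added or
		// updated, so the tombstone is passed through untouched.
		return o, nil
	case *pod:
		o.ManagedFields = nil // in-place mutation, no copy
		return o, nil
	default:
		// Unknown types are returned unchanged in this sketch.
		return obj, nil
	}
}

func main() {
	p := &pod{Name: "web", ManagedFields: []string{"kubectl", "controller"}}
	out, err := stripManagedFields(p)
	fmt.Println(out.(*pod).Name, len(out.(*pod).ManagedFields), err)
}
```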

type UndeltaStore

type UndeltaStore struct {
	Store
	PushFunc func([]interface{})
}

UndeltaStore listens to incremental updates and sends complete state on every change. It implements the Store interface so that it can receive a stream of mirrored objects from Reflector. Whenever it receives any complete (Store.Replace) or incremental change (Store.Add, Store.Update, Store.Delete), it sends the complete state by calling PushFunc. It is thread-safe. It guarantees that every change (Add, Update, Replace, Delete) results in one call to PushFunc, but sometimes PushFunc may be called twice with the same values. PushFunc should be thread safe.
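The "complete state on every change" behavior can be modeled with a short sketch. The `undelta` type below is a hypothetical, simplified stand-in that stores strings and sorts each snapshot for predictable output (an addition made only for this example); it is not the package's UndeltaStore.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// undelta is a hypothetical model: every change is followed by one call to
// push with the full current contents.
type undelta struct {
	mu    sync.Mutex
	items map[string]string
	push  func(state []string)
}

func newUndelta(push func(state []string)) *undelta {
	return &undelta{items: map[string]string{}, push: push}
}

// snapshot returns the complete current state, sorted for determinism.
func (u *undelta) snapshot() []string {
	u.mu.Lock()
	defer u.mu.Unlock()
	out := make([]string, 0, len(u.items))
	for _, v := range u.items {
		out = append(out, v)
	}
	sort.Strings(out)
	return out
}

// Add stores val under key, then pushes the complete state. The lock is
// released between the write and the snapshot, so concurrent callers can end
// up pushing identical states.
func (u *undelta) Add(key, val string) {
	u.mu.Lock()
	u.items[key] = val
	u.mu.Unlock()
	u.push(u.snapshot())
}

// Delete removes key, then pushes the complete state.
func (u *undelta) Delete(key string) {
	u.mu.Lock()
	delete(u.items, key)
	u.mu.Unlock()
	u.push(u.snapshot())
}

func main() {
	u := newUndelta(func(state []string) { fmt.Println("push:", state) })
	u.Add("a", "a")
	u.Add("b", "b")
	u.Delete("a")
}
```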

func NewUndeltaStore

func NewUndeltaStore(pushFunc func([]interface{}), keyFunc KeyFunc) *UndeltaStore

NewUndeltaStore returns an UndeltaStore implemented with a Store.

func (*UndeltaStore) Add

func (u *UndeltaStore) Add(obj interface{}) error

Add inserts an object into the store and sends complete state by calling PushFunc.

Note about thread safety: the Store implementation (cache.cache) uses a lock for all methods. In the functions below, the lock is released and reacquired between the {Add,Delete,etc} call and the List. So the following can happen, resulting in two identical calls to PushFunc:

	time  thread 1                 thread 2
	0     UndeltaStore.Add(a)
	1                              UndeltaStore.Add(b)
	2     Store.Add(a)
	3                              Store.Add(b)
	4     Store.List() -> [a,b]
	5                              Store.List() -> [a,b]

func (*UndeltaStore) Delete

func (u *UndeltaStore) Delete(obj interface{}) error

Delete removes an item from the cache and sends complete state by calling PushFunc.

func (*UndeltaStore) Replace

func (u *UndeltaStore) Replace(list []interface{}, resourceVersion string) error

Replace will delete the contents of the current store, using the given list instead. 'u' takes ownership of the list; you should not reference the list again after calling this function. After the replacement, the complete state of the new contents is sent by calling PushFunc.

func (*UndeltaStore) Update

func (u *UndeltaStore) Update(obj interface{}) error

Update sets an item in the cache to its updated state and sends complete state by calling PushFunc.

type WatchErrorHandler added in v0.19.0

type WatchErrorHandler func(r *Reflector, err error)

The WatchErrorHandler is called whenever ListAndWatch drops the connection with an error. After calling this handler, the informer will back off and retry.

The default implementation looks at the error type and tries to log the error message at an appropriate level.

Implementations of this handler may display the error message in other ways. Implementations should return quickly - any expensive processing should be offloaded.

type WatchFunc

type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)

WatchFunc knows how to watch resources.

type Watcher

type Watcher interface {
	// Watch should begin a watch at the specified version.
	Watch(options metav1.ListOptions) (watch.Interface, error)
}

Watcher is any object that knows how to start a watch on a resource.

Directories

Path Synopsis
Package synctrack contains utilities for helping controllers track whether they are "synced" or not, that is, whether they have processed all items from the informer's initial list.
