Documentation ¶
Index ¶
- Variables
- func MetaNamespaceKeyFunc(obj any) (string, error)
- func Pop(queue Queue) any
- type DeletedFinalStateUnknown
- type Delta
- type DeltaFIFO
- func (f *DeltaFIFO) Add(obj any) error
- func (f *DeltaFIFO) AddIfNotPresent(obj any) error
- func (f *DeltaFIFO) Close()
- func (f *DeltaFIFO) Delete(obj any) error
- func (f *DeltaFIFO) Get(obj any) (item any, exists bool, err error)
- func (f *DeltaFIFO) GetByKey(key string) (item any, exists bool, err error)
- func (f *DeltaFIFO) HasSynced() bool
- func (f *DeltaFIFO) IsClosed() bool
- func (f *DeltaFIFO) KeyOf(obj any) (string, error)
- func (f *DeltaFIFO) List() []any
- func (f *DeltaFIFO) ListKeys() []string
- func (f *DeltaFIFO) Pop(process PopProcessFunc) (any, error)
- func (f *DeltaFIFO) Replace(list []any, _ string) error
- func (f *DeltaFIFO) Resync() error
- func (f *DeltaFIFO) Update(obj any) error
- type DeltaFIFOOptions
- type DeltaType
- type Deltas
- type ErrRequeue
- type ExplicitKey
- type FIFO
- func (f *FIFO) Add(obj any) error
- func (f *FIFO) AddIfNotPresent(obj any) error
- func (f *FIFO) Close()
- func (f *FIFO) Delete(obj any) error
- func (f *FIFO) Get(obj any) (item any, exists bool, err error)
- func (f *FIFO) GetByKey(key string) (item any, exists bool, err error)
- func (f *FIFO) HasSynced() bool
- func (f *FIFO) IsClosed() bool
- func (f *FIFO) List() []any
- func (f *FIFO) ListKeys() []string
- func (f *FIFO) Pop(process PopProcessFunc) (any, error)
- func (f *FIFO) Replace(list []any, _ string) error
- func (f *FIFO) Resync() error
- func (f *FIFO) Update(obj any) error
- type Index
- type IndexFunc
- type Indexer
- type Indexers
- type Indices
- type KeyError
- type KeyFunc
- type KeyGetter
- type KeyLister
- type KeyListerGetter
- type PopProcessFunc
- type Queue
- type Store
- type ThreadSafeStore
Constants ¶
This section is empty.
Variables ¶
var ErrFIFOClosed = xerrors.New("DeltaFIFO: manipulating with closed queue")
ErrFIFOClosed used when FIFO is closed
var ( // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas // object with zero length is encountered (should be impossible, // but included for completeness). ErrZeroLengthDeltasObject = xerrors.New("0 length Deltas object; can't get key") )
Functions ¶
func MetaNamespaceKeyFunc ¶
Types ¶
type DeletedFinalStateUnknown ¶
DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object was deleted but the watch deletion event was missed while disconnected from apiserver. In this case we don't know the final "resting" state of the object, so there's a chance the included `Obj` is stale.
type Delta ¶
Delta is a member of Deltas (a list of Delta objects) which in its turn is the type stored by a DeltaFIFO. It tells you what change happened, and the object's state after* that change.
[*] Unless the change is a deletion, and then you'll get the final state of the object before it was deleted.
type DeltaFIFO ¶
type DeltaFIFO struct {
// contains filtered or unexported fields
}
DeltaFIFO is like FIFO, but differs in two ways. One is that the accumulator associated with a given object's key is not that object but rather a Deltas, which is a slice of Delta values for that object. Applying an object to a Deltas means to append a Delta except when the potentially appended Delta is a Deleted and the Deltas already ends with a Deleted. In that case the Deltas does not grow, although the terminal Deleted will be replaced by the new Deleted if the older Deleted's object is a DeletedFinalStateUnknown.
The other difference is that DeltaFIFO has two additional ways that an object can be applied to an accumulator: Replaced and Sync. If EmitDeltaTypeReplaced is not set to true, Sync will be used in replace events for backwards compatibility. Sync is used for periodic resync events.
DeltaFIFO is a producer-consumer queue, where a Reflector is intended to be the producer, and the consumer is whatever calls the Pop() method.
DeltaFIFO solves this use case:
- You want to process every object change (delta) at most once.
- When you process an object, you want to see everything that's happened to it since you last processed it.
- You want to process the deletion of some of the objects.
- You might want to periodically reprocess objects.
DeltaFIFO's Pop(), Get(), and GetByKey() methods return any to satisfy the Store/Queue interfaces, but they will always return an object of type Deltas. List() returns the newest object from each accumulator in the FIFO.
A DeltaFIFO's knownObjects KeyListerGetter provides the abilities to list Store keys and to get objects by Store key. The objects in question are called "known objects" and this set of objects modifies the behavior of the Delete, Replace, and Resync methods (each in a different way).
A note on threading: If you call Pop() in parallel from multiple threads, you could end up with multiple threads processing slightly different versions of the same object.
func NewDeltaFIFO
deprecated
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO
NewDeltaFIFO returns a Queue which can be used to process changes to items.
keyFunc is used to figure out what key an object should have. (It is exposed in the returned DeltaFIFO's KeyOf() method, with additional handling around deleted objects and queue state).
'knownObjects' may be supplied to modify the behavior of Delete, Replace, and Resync. It may be nil if you do not need those modifications.
consider merging keyLister with this object, tracking a list of "known" keys when Pop() is called. Have to think about how that affects error retrying.
NOTE: It is possible to misuse this and cause a race when using an external known object source. Whether there is a potential race depends on how the consumer modifies knownObjects. In Pop(), process function is called under lock, so it is safe to update data structures in it that need to be in sync with the queue (e.g. knownObjects). Example: In case of sharedIndexInformer being a consumer there is no race as knownObjects (s.indexer) is modified safely under DeltaFIFO's lock. The only exceptions are GetStore() and GetIndexer() methods, which expose ways to modify the underlying storage. Currently these two methods are used for creating Lister and internal tests.
Also see the comment on DeltaFIFO.
Warning: This constructs a DeltaFIFO that does not differentiate between events caused by a call to Replace (e.g., from a relist, which may contain object updates), and synthetic events caused by a periodic resync (which just emit the existing object). See https://issue.k8s.io/86015 for details.
Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})` instead to receive a `Replaced` event depending on the type.
Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects})
func NewDeltaFIFOWithOptions ¶
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO
NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to items. See also the comment on DeltaFIFO.
func (*DeltaFIFO) Add ¶
Add inserts an item, and puts it in the queue. The item is only enqueued if it doesn't already exist in the set.
func (*DeltaFIFO) AddIfNotPresent ¶
AddIfNotPresent inserts an item, and puts it in the queue. If the item is already present in the set, it is neither enqueued nor added to the set.
This is useful in a single producer/consumer scenario so that the consumer can safely retry items without contending with the producer and potentially enqueueing stale items.
Important: obj must be a Deltas (the output of the Pop() function). Yes, this is different from the Add/Update/Delete functions.
func (*DeltaFIFO) Delete ¶
Delete is just like Add, but makes a Deleted Delta. If the given object does not already exist, it will be ignored. (It may have already been deleted by a Replace (re-list), for example.) In this method `f.knownObjects`, if not nil, provides (via GetByKey) _additional_ objects that are considered to already exist.
func (*DeltaFIFO) Get ¶
Get returns the complete list of deltas for the requested item, or sets exists=false. You should treat the items returned inside the deltas as immutable.
func (*DeltaFIFO) GetByKey ¶
GetByKey returns the complete list of deltas for the requested item, setting exists=false if that list is empty. You should treat the items returned inside the deltas as immutable.
func (*DeltaFIFO) HasSynced ¶
HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first, or the first batch of items inserted by Replace() has been popped.
func (*DeltaFIFO) KeyOf ¶
KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or DeletedFinalStateUnknown objects.
func (*DeltaFIFO) List ¶
List returns a list of all the items; it returns the object from the most recent Delta. You should treat the items returned inside the deltas as immutable.
func (*DeltaFIFO) ListKeys ¶
ListKeys returns a list of all the keys of the objects currently in the FIFO.
func (*DeltaFIFO) Pop ¶
func (f *DeltaFIFO) Pop(process PopProcessFunc) (any, error)
Pop blocks until the queue has some items, and then returns one. If multiple items are ready, they are returned in the order in which they were added/updated. The item is removed from the queue (and the store) before it is returned, so if you don't successfully process it, you need to add it back with AddIfNotPresent(). process function is called under lock, so it is safe to update data structures in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc may return an instance of ErrRequeue with a nested error to indicate the current item should be requeued (equivalent to calling AddIfNotPresent under the lock). process should avoid expensive I/O operation so that other queue operations, i.e. Add() and Get(), won't be blocked for too long.
Pop returns a 'Deltas', which has a complete list of all the things that happened to the object (deltas) while it was sitting in the queue.
func (*DeltaFIFO) Replace ¶
Replace atomically does two things: (1) it adds the given objects using the Sync or Replace DeltaType and then (2) it does some deletions. In particular: for every pre-existing key K that is not the key of an object in `list` there is the effect of `Delete(DeletedFinalStateUnknown{K, O})` where O is current object of K. If `f.knownObjects == nil` then the pre-existing keys are those in `f.items` and the current object of K is the `.Newest()` of the Deltas associated with K. Otherwise the pre-existing keys are those listed by `f.knownObjects` and the current object of K is what `f.knownObjects.GetByKey(K)` returns.
type DeltaFIFOOptions ¶
type DeltaFIFOOptions struct { // KeyFunction is used to figure out what key an object should have. (It's // exposed in the returned DeltaFIFO's KeyOf() method, with additional // handling around deleted objects and queue state). // Optional, the default is MetaNamespaceKeyFunc. KeyFunction KeyFunc // KnownObjects is expected to return a list of keys that the consumer of // this queue "knows about". It is used to decide which items are missing // when Replace() is called; 'Deleted' deltas are produced for the missing items. // KnownObjects may be nil if you can tolerate missing deletions on Replace(). KnownObjects KeyListerGetter // EmitDeltaTypeReplaced indicates that the queue consumer // understands the Replaced DeltaType. Before the `Replaced` event type was // added, calls to Replace() were handled the same as Sync(). For // backwards-compatibility purposes, this is false by default. // When true, `Replaced` events will be sent for items passed to a Replace() call. // When false, `Sync` events will be sent instead. EmitDeltaTypeReplaced bool }
DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are optional.
type DeltaType ¶
type DeltaType string
DeltaType is the type of a change (addition, deletion, etc)
const ( Added DeltaType = "Added" Updated DeltaType = "Updated" Deleted DeltaType = "Deleted" // Replaced is emitted when we encountered watch errors and had to do a // relist. We don't know if the replaced object has changed. // // NOTE: Previous versions of DeltaFIFO would use Sync for Replace events // as well. Hence, Replaced is only emitted when the option // EmitDeltaTypeReplaced is true. Replaced DeltaType = "Replaced" // Sync is for synthetic events during a periodic resync. Sync DeltaType = "Sync" )
Change type definition
type Deltas ¶
type Deltas []Delta
Deltas is a list of one or more 'Delta's to an individual object. The oldest delta is at index 0, the newest delta is the last one.
type ErrRequeue ¶
type ErrRequeue struct { // Err is returned by the Pop function Err error }
ErrRequeue may be returned by a PopProcessFunc to safely requeue the current item. The value of Err will be returned from Pop.
func (ErrRequeue) Error ¶
func (e ErrRequeue) Error() string
type ExplicitKey ¶
type ExplicitKey string
ExplicitKey can be passed to MetaNamespaceKeyFunc if you have the key for the object but not the object itself.
type FIFO ¶
type FIFO struct {
// contains filtered or unexported fields
}
FIFO is a Queue in which (a) each accumulator is simply the most recently provided object and (b) the collection of keys to process is a FIFO. The accumulators all start out empty, and deleting an object from its accumulator empties the accumulator. The Resync operation is a no-op.
Thus: if multiple adds/updates of a single object happen while that object's key is in the queue before it has been processed then it will only be processed once, and when it is processed the most recent version will be processed. This can't be done with a channel
FIFO solves this use case:
- You want to process every object (exactly) once.
- You want to process the most recent version of the object when you process it.
- You do not want to process deleted objects, they should be removed from the queue.
- You do not want to periodically reprocess objects.
Compare with DeltaFIFO for other use cases.
func (*FIFO) Add ¶
Add inserts an item, and puts it in the queue. The item is only enqueued if it doesn't already exist in the set.
func (*FIFO) AddIfNotPresent ¶
AddIfNotPresent inserts an item, and puts it in the queue. If the item is already present in the set, it is neither enqueued nor added to the set.
This is useful in a single producer/consumer scenario so that the consumer can safely retry items without contending with the producer and potentially enqueueing stale items.
func (*FIFO) Delete ¶
Delete removes an item. It doesn't add it to the queue, because this implementation assumes the consumer only cares about the objects, not the order in which they were created/added.
func (*FIFO) HasSynced ¶
HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first, or the first batch of items inserted by Replace() has been popped.
func (*FIFO) ListKeys ¶
ListKeys returns a list of all the keys of the objects currently in the FIFO.
func (*FIFO) Pop ¶
func (f *FIFO) Pop(process PopProcessFunc) (any, error)
Pop waits until an item is ready and processes it. If multiple items are ready, they are returned in the order in which they were added/updated. The item is removed from the queue (and the store) before it is processed, so if you don't successfully process it, it should be added back with AddIfNotPresent(). process function is called under lock, so it is safe update data structures in it that need to be in sync with the queue.
func (*FIFO) Replace ¶
Replace will delete the contents of 'f', using instead the given map. 'f' takes ownership of the map, you should not reference the map again after calling this function. f's queue is reset, too; upon return, it will contain the items in the map, in no particular order.
type Indexer ¶
type Indexer interface { Store // Index returns the stored objects whose set of indexed values // intersects the set of indexed values of the given object, for // the named index Index(indexName string, obj any) ([]any, error) // IndexKeys returns the storage keys of the stored objects whose // set of indexed values for the named index includes the given // indexed value IndexKeys(indexName, indexedValue string) ([]string, error) // ListIndexFuncValues returns all the indexed values of the given index ListIndexFuncValues(indexName string) []string // ByIndex returns the stored objects whose set of indexed values // for the named index includes the given indexed value ByIndex(indexName, indexedValue string) ([]any, error) // GetIndexers return the indexers GetIndexers() Indexers // AddIndexers adds more indexers to this store. If you call this after you already have data // in the store, the results are undefined. AddIndexers(newIndexers Indexers) error }
Indexer extends Store with multiple indices and restricts each accumulator to simply hold the current object (and be empty after Delete).
There are three kinds of strings here:
- a storage key, as defined in the Store interface,
- a name of an index, and
- an "indexed value", which is produced by an IndexFunc and can be a field value or any other string computed from the object.
func NewIndexer ¶
NewIndexer returns an Indexer implemented simply with a map and a lock.
type KeyError ¶
KeyError will be returned any time a KeyFunc gives an error; it includes the object at fault.
type KeyFunc ¶
KeyFunc knows how to make a key from an object. Implementations should be deterministic.
type KeyGetter ¶
type KeyGetter interface { // GetByKey returns the value associated with the key, or sets exists=false. GetByKey(key string) (value any, exists bool, err error) }
A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyLister ¶
type KeyLister interface {
ListKeys() []string
}
A KeyLister is anything that knows how to list its keys.
type KeyListerGetter ¶
A KeyListerGetter is anything that knows how to list its keys and look up by key.
type PopProcessFunc ¶
PopProcessFunc is passed to Pop() method of Queue interface. It is supposed to process the accumulator popped from the queue.
type Queue ¶
type Queue interface { Store // Pop blocks until there is at least one key to process or the // Queue is closed. In the latter case Pop returns with an error. // In the former case Pop atomically picks one key to process, // removes that (key, accumulator) association from the Store, and // processes the accumulator. Pop returns the accumulator that // was processed and the result of processing. The PopProcessFunc // may return an ErrRequeue{inner} and in this case Pop will (a) // return that (key, accumulator) association to the Queue as part // of the atomic processing and (b) return the inner error from // Pop. Pop(PopProcessFunc) (any, error) // AddIfNotPresent puts the given accumulator into the Queue (in // association with the accumulator's key) if and only if that key // is not already associated with a non-empty accumulator. AddIfNotPresent(any) error // HasSynced returns true if the first batch of keys have all been // popped. The first batch of keys are those of the first Replace // operation if that happened before any Add, AddIfNotPresent, // Update, or Delete; otherwise the first batch is empty. HasSynced() bool // Close the queue Close() }
Queue extends Store with a collection of Store keys to "process". Every Add, Update, or Delete may put the object's key in that collection. A Queue has a way to derive the corresponding key given an accumulator. A Queue can be accessed concurrently from multiple goroutines. A Queue can be "closed", after which Pop operations return an error.
type Store ¶
type Store interface { // Add adds the given object to the accumulator associated with the given object's key Add(obj any) error // Update updates the given object in the accumulator associated with the given object's key Update(obj any) error // Delete deletes the given object from the accumulator associated with the given object's key Delete(obj any) error // List returns a list of all the currently non-empty accumulators List() []any // ListKeys returns a list of all the keys currently associated with non-empty accumulators ListKeys() []string // Get returns the accumulator associated with the given object's key Get(obj any) (item any, exists bool, err error) // GetByKey returns the accumulator associated with the given key GetByKey(key string) (item any, exists bool, err error) // Replace will delete the contents of the store, using instead the // given list. Store takes ownership of the list, you should not reference // it after calling this function. Replace([]any, string) error // Resync is meaningless in the terms appearing here but has // meaning in some implementations that have non-trivial // additional behavior (e.g., DeltaFIFO). Resync() error }
Store is a generic object storage and processing interface. A Store holds a map from string keys to accumulators, and has operations to add, update, and delete a given object to/from the accumulator currently associated with a given key. A Store also knows how to extract the key from a given object, so many operations are given only the object.
In the simplest Store implementations each accumulator is simply the last given object, or empty after Delete, and thus the Store's behavior is simple storage.
Reflector knows how to watch a server and update a Store. This package provides a variety of implementations of Store.
type ThreadSafeStore ¶
type ThreadSafeStore interface { Add(key string, obj any) Update(key string, obj any) Delete(key string) Get(key string) (item any, exists bool) List() []any ListKeys() []string Replace(map[string]any, string) Index(indexName string, obj any) ([]any, error) IndexKeys(indexName, indexedValue string) ([]string, error) ListIndexFuncValues(name string) []string ByIndex(indexName, indexedValue string) ([]any, error) GetIndexers() Indexers // AddIndexers adds more indexers to this store. If you call this after you already have data // in the store, the results are undefined. AddIndexers(newIndexers Indexers) error // Resync is a no-op and is deprecated Resync() error }
ThreadSafeStore is an interface that allows concurrent indexed access to a storage backend. It is like Indexer but does not (necessarily) know how to extract the Store key from a given object.
TL;DR caveats: you must not modify anything returned by Get or List as it will break the indexing feature in addition to not being thread safe.
The guarantees of thread safety provided by List/Get are only valid if the caller treats returned items as read-only. For example, a pointer inserted in the store through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get` on the same key and modify the pointer in a non-thread-safe way. Also note that modifying objects stored by the indexers (if any) will *not* automatically lead to a re-index. So it's not a good idea to directly modify the objects returned by Get/List, in general.
func NewThreadSafeStore ¶
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore
NewThreadSafeStore creates a new instance of ThreadSafeStore.