queue

package
v0.19.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 30, 2024 License: GPL-3.0 Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrFIFOClosed = xerrors.New("DeltaFIFO: manipulating with closed queue")

ErrFIFOClosed used when FIFO is closed

View Source
var (
	// ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
	// object with zero length is encountered (should be impossible,
	// but included for completeness).
	ErrZeroLengthDeltasObject = xerrors.New("0 length Deltas object; can't get key")
)

Functions

func MetaNamespaceKeyFunc

func MetaNamespaceKeyFunc(obj any) (string, error)

func Pop

func Pop(queue Queue) any

Pop is helper function for popping from Queue. WARNING: Do NOT use this function in non-test code to avoid races unless you really really really really know what you are doing.

Types

type DeletedFinalStateUnknown

type DeletedFinalStateUnknown struct {
	Key string
	Obj any
}

DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object was deleted but the watch deletion event was missed while disconnected from apiserver. In this case we don't know the final "resting" state of the object, so there's a chance the included `Obj` is stale.

type Delta

type Delta struct {
	Type   DeltaType
	Object any
}

Delta is a member of Deltas (a list of Delta objects) which in its turn is the type stored by a DeltaFIFO. It tells you what change happened, and the object's state after* that change.

[*] Unless the change is a deletion, and then you'll get the final state of the object before it was deleted.

type DeltaFIFO

type DeltaFIFO struct {
	// contains filtered or unexported fields
}

DeltaFIFO is like FIFO, but differs in two ways. One is that the accumulator associated with a given object's key is not that object but rather a Deltas, which is a slice of Delta values for that object. Applying an object to a Deltas means to append a Delta except when the potentially appended Delta is a Deleted and the Deltas already ends with a Deleted. In that case the Deltas does not grow, although the terminal Deleted will be replaced by the new Deleted if the older Deleted's object is a DeletedFinalStateUnknown.

The other difference is that DeltaFIFO has two additional ways that an object can be applied to an accumulator: Replaced and Sync. If EmitDeltaTypeReplaced is not set to true, Sync will be used in replace events for backwards compatibility. Sync is used for periodic resync events.

DeltaFIFO is a producer-consumer queue, where a Reflector is intended to be the producer, and the consumer is whatever calls the Pop() method.

DeltaFIFO solves this use case:

  • You want to process every object change (delta) at most once.
  • When you process an object, you want to see everything that's happened to it since you last processed it.
  • You want to process the deletion of some of the objects.
  • You might want to periodically reprocess objects.

DeltaFIFO's Pop(), Get(), and GetByKey() methods return any to satisfy the Store/Queue interfaces, but they will always return an object of type Deltas. List() returns the newest object from each accumulator in the FIFO.

A DeltaFIFO's knownObjects KeyListerGetter provides the abilities to list Store keys and to get objects by Store key. The objects in question are called "known objects" and this set of objects modifies the behavior of the Delete, Replace, and Resync methods (each in a different way).

A note on threading: If you call Pop() in parallel from multiple threads, you could end up with multiple threads processing slightly different versions of the same object.

func NewDeltaFIFO deprecated

func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO

NewDeltaFIFO returns a Queue which can be used to process changes to items.

keyFunc is used to figure out what key an object should have. (It is exposed in the returned DeltaFIFO's KeyOf() method, with additional handling around deleted objects and queue state).

'knownObjects' may be supplied to modify the behavior of Delete, Replace, and Resync. It may be nil if you do not need those modifications.

consider merging keyLister with this object, tracking a list of "known" keys when Pop() is called. Have to think about how that affects error retrying.

NOTE: It is possible to misuse this and cause a race when using an
external known object source.
Whether there is a potential race depends on how the consumer
modifies knownObjects. In Pop(), process function is called under
lock, so it is safe to update data structures in it that need to be
in sync with the queue (e.g. knownObjects).

Example:
In case of sharedIndexInformer being a consumer
there is no race as knownObjects (s.indexer) is modified safely
under DeltaFIFO's lock. The only exceptions are GetStore() and
GetIndexer() methods, which expose ways to modify the underlying
storage. Currently these two methods are used for creating Lister
and internal tests.

Also see the comment on DeltaFIFO.

Warning: This constructs a DeltaFIFO that does not differentiate between events caused by a call to Replace (e.g., from a relist, which may contain object updates), and synthetic events caused by a periodic resync (which just emit the existing object). See https://issue.k8s.io/86015 for details.

Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})` instead to receive a `Replaced` event depending on the type.

Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects})

func NewDeltaFIFOWithOptions

func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO

NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to items. See also the comment on DeltaFIFO.

func (*DeltaFIFO) Add

func (f *DeltaFIFO) Add(obj any) error

Add inserts an item, and puts it in the queue. The item is only enqueued if it doesn't already exist in the set.

func (*DeltaFIFO) AddIfNotPresent

func (f *DeltaFIFO) AddIfNotPresent(obj any) error

AddIfNotPresent inserts an item, and puts it in the queue. If the item is already present in the set, it is neither enqueued nor added to the set.

This is useful in a single producer/consumer scenario so that the consumer can safely retry items without contending with the producer and potentially enqueueing stale items.

Important: obj must be a Deltas (the output of the Pop() function). Yes, this is different from the Add/Update/Delete functions.

func (*DeltaFIFO) Close

func (f *DeltaFIFO) Close()

Close the queue.

func (*DeltaFIFO) Delete

func (f *DeltaFIFO) Delete(obj any) error

Delete is just like Add, but makes a Deleted Delta. If the given object does not already exist, it will be ignored. (It may have already been deleted by a Replace (re-list), for example.) In this method `f.knownObjects`, if not nil, provides (via GetByKey) _additional_ objects that are considered to already exist.

func (*DeltaFIFO) Get

func (f *DeltaFIFO) Get(obj any) (item any, exists bool, err error)

Get returns the complete list of deltas for the requested item, or sets exists=false. You should treat the items returned inside the deltas as immutable.

func (*DeltaFIFO) GetByKey

func (f *DeltaFIFO) GetByKey(key string) (item any, exists bool, err error)

GetByKey returns the complete list of deltas for the requested item, setting exists=false if that list is empty. You should treat the items returned inside the deltas as immutable.

func (*DeltaFIFO) HasSynced

func (f *DeltaFIFO) HasSynced() bool

HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first, or the first batch of items inserted by Replace() has been popped.

func (*DeltaFIFO) IsClosed

func (f *DeltaFIFO) IsClosed() bool

IsClosed checks if the queue is closed

func (*DeltaFIFO) KeyOf

func (f *DeltaFIFO) KeyOf(obj any) (string, error)

KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or DeletedFinalStateUnknown objects.

func (*DeltaFIFO) List

func (f *DeltaFIFO) List() []any

List returns a list of all the items; it returns the object from the most recent Delta. You should treat the items returned inside the deltas as immutable.

func (*DeltaFIFO) ListKeys

func (f *DeltaFIFO) ListKeys() []string

ListKeys returns a list of all the keys of the objects currently in the FIFO.

func (*DeltaFIFO) Pop

func (f *DeltaFIFO) Pop(process PopProcessFunc) (any, error)

Pop blocks until the queue has some items, and then returns one. If multiple items are ready, they are returned in the order in which they were added/updated. The item is removed from the queue (and the store) before it is returned, so if you don't successfully process it, you need to add it back with AddIfNotPresent(). process function is called under lock, so it is safe to update data structures in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc may return an instance of ErrRequeue with a nested error to indicate the current item should be requeued (equivalent to calling AddIfNotPresent under the lock). process should avoid expensive I/O operation so that other queue operations, i.e. Add() and Get(), won't be blocked for too long.

Pop returns a 'Deltas', which has a complete list of all the things that happened to the object (deltas) while it was sitting in the queue.

func (*DeltaFIFO) Replace

func (f *DeltaFIFO) Replace(list []any, _ string) error

Replace atomically does two things: (1) it adds the given objects using the Sync or Replace DeltaType and then (2) it does some deletions. In particular: for every pre-existing key K that is not the key of an object in `list` there is the effect of `Delete(DeletedFinalStateUnknown{K, O})` where O is current object of K. If `f.knownObjects == nil` then the pre-existing keys are those in `f.items` and the current object of K is the `.Newest()` of the Deltas associated with K. Otherwise the pre-existing keys are those listed by `f.knownObjects` and the current object of K is what `f.knownObjects.GetByKey(K)` returns.

func (*DeltaFIFO) Resync

func (f *DeltaFIFO) Resync() error

Resync adds, with a Sync type of Delta, every object listed by `f.knownObjects` whose key is not already queued for processing. If `f.knownObjects` is `nil` then Resync does nothing.

func (*DeltaFIFO) Update

func (f *DeltaFIFO) Update(obj any) error

Update is just like Add, but makes an Updated Delta.

type DeltaFIFOOptions

type DeltaFIFOOptions struct {

	// KeyFunction is used to figure out what key an object should have. (It's
	// exposed in the returned DeltaFIFO's KeyOf() method, with additional
	// handling around deleted objects and queue state).
	// Optional, the default is MetaNamespaceKeyFunc.
	KeyFunction KeyFunc

	// KnownObjects is expected to return a list of keys that the consumer of
	// this queue "knows about". It is used to decide which items are missing
	// when Replace() is called; 'Deleted' deltas are produced for the missing items.
	// KnownObjects may be nil if you can tolerate missing deletions on Replace().
	KnownObjects KeyListerGetter

	// EmitDeltaTypeReplaced indicates that the queue consumer
	// understands the Replaced DeltaType. Before the `Replaced` event type was
	// added, calls to Replace() were handled the same as Sync(). For
	// backwards-compatibility purposes, this is false by default.
	// When true, `Replaced` events will be sent for items passed to a Replace() call.
	// When false, `Sync` events will be sent instead.
	EmitDeltaTypeReplaced bool
}

DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are optional.

type DeltaType

type DeltaType string

DeltaType is the type of a change (addition, deletion, etc)

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	// Replaced is emitted when we encountered watch errors and had to do a
	// relist. We don't know if the replaced object has changed.
	//
	// NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
	// as well. Hence, Replaced is only emitted when the option
	// EmitDeltaTypeReplaced is true.
	Replaced DeltaType = "Replaced"
	// Sync is for synthetic events during a periodic resync.
	Sync DeltaType = "Sync"
)

Change type definition

type Deltas

type Deltas []Delta

Deltas is a list of one or more 'Delta's to an individual object. The oldest delta is at index 0, the newest delta is the last one.

func (Deltas) Newest

func (d Deltas) Newest() *Delta

Newest is a convenience function that returns the newest delta, or nil if there are no deltas.

func (Deltas) Oldest

func (d Deltas) Oldest() *Delta

Oldest is a convenience function that returns the oldest delta, or nil if there are no deltas.

type ErrRequeue

type ErrRequeue struct {
	// Err is returned by the Pop function
	Err error
}

ErrRequeue may be returned by a PopProcessFunc to safely requeue the current item. The value of Err will be returned from Pop.

func (ErrRequeue) Error

func (e ErrRequeue) Error() string

type ExplicitKey

type ExplicitKey string

ExplicitKey can be passed to MetaNamespaceKeyFunc if you have the key for the object but not the object itself.

type FIFO

type FIFO struct {
	// contains filtered or unexported fields
}

FIFO is a Queue in which (a) each accumulator is simply the most recently provided object and (b) the collection of keys to process is a FIFO. The accumulators all start out empty, and deleting an object from its accumulator empties the accumulator. The Resync operation is a no-op.

Thus: if multiple adds/updates of a single object happen while that object's key is in the queue before it has been processed then it will only be processed once, and when it is processed the most recent version will be processed. This can't be done with a channel

FIFO solves this use case:

  • You want to process every object (exactly) once.
  • You want to process the most recent version of the object when you process it.
  • You do not want to process deleted objects, they should be removed from the queue.
  • You do not want to periodically reprocess objects.

Compare with DeltaFIFO for other use cases.

func NewFIFO

func NewFIFO(keyFunc KeyFunc) *FIFO

NewFIFO returns a Store which can be used to queue up items to process.

func (*FIFO) Add

func (f *FIFO) Add(obj any) error

Add inserts an item, and puts it in the queue. The item is only enqueued if it doesn't already exist in the set.

func (*FIFO) AddIfNotPresent

func (f *FIFO) AddIfNotPresent(obj any) error

AddIfNotPresent inserts an item, and puts it in the queue. If the item is already present in the set, it is neither enqueued nor added to the set.

This is useful in a single producer/consumer scenario so that the consumer can safely retry items without contending with the producer and potentially enqueueing stale items.

func (*FIFO) Close

func (f *FIFO) Close()

Close the queue.

func (*FIFO) Delete

func (f *FIFO) Delete(obj any) error

Delete removes an item. It doesn't add it to the queue, because this implementation assumes the consumer only cares about the objects, not the order in which they were created/added.

func (*FIFO) Get

func (f *FIFO) Get(obj any) (item any, exists bool, err error)

Get returns the requested item, or sets exists=false.

func (*FIFO) GetByKey

func (f *FIFO) GetByKey(key string) (item any, exists bool, err error)

GetByKey returns the requested item, or sets exists=false.

func (*FIFO) HasSynced

func (f *FIFO) HasSynced() bool

HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first, or the first batch of items inserted by Replace() has been popped.

func (*FIFO) IsClosed

func (f *FIFO) IsClosed() bool

IsClosed checks if the queue is closed

func (*FIFO) List

func (f *FIFO) List() []any

List returns a list of all the items.

func (*FIFO) ListKeys

func (f *FIFO) ListKeys() []string

ListKeys returns a list of all the keys of the objects currently in the FIFO.

func (*FIFO) Pop

func (f *FIFO) Pop(process PopProcessFunc) (any, error)

Pop waits until an item is ready and processes it. If multiple items are ready, they are returned in the order in which they were added/updated. The item is removed from the queue (and the store) before it is processed, so if you don't successfully process it, it should be added back with AddIfNotPresent(). process function is called under lock, so it is safe update data structures in it that need to be in sync with the queue.

func (*FIFO) Replace

func (f *FIFO) Replace(list []any, _ string) error

Replace will delete the contents of 'f', using instead the given map. 'f' takes ownership of the map, you should not reference the map again after calling this function. f's queue is reset, too; upon return, it will contain the items in the map, in no particular order.

func (*FIFO) Resync

func (f *FIFO) Resync() error

Resync will ensure that every object in the Store has its key in the queue. This should be a no-op, because that property is maintained by all operations.

func (*FIFO) Update

func (f *FIFO) Update(obj any) error

Update is the same as Add in this implementation.

type Index

type Index map[string]sets.String

Index maps the indexed value to a set of keys in the store that match on that value

type IndexFunc

type IndexFunc func(obj any) ([]string, error)

IndexFunc knows how to compute the set of indexed values for an object.

type Indexer

type Indexer interface {
	Store
	// Index returns the stored objects whose set of indexed values
	// intersects the set of indexed values of the given object, for
	// the named index
	Index(indexName string, obj any) ([]any, error)
	// IndexKeys returns the storage keys of the stored objects whose
	// set of indexed values for the named index includes the given
	// indexed value
	IndexKeys(indexName, indexedValue string) ([]string, error)
	// ListIndexFuncValues returns all the indexed values of the given index
	ListIndexFuncValues(indexName string) []string
	// ByIndex returns the stored objects whose set of indexed values
	// for the named index includes the given indexed value
	ByIndex(indexName, indexedValue string) ([]any, error)
	// GetIndexers return the indexers
	GetIndexers() Indexers

	// AddIndexers adds more indexers to this store.  If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
}

Indexer extends Store with multiple indices and restricts each accumulator to simply hold the current object (and be empty after Delete).

There are three kinds of strings here:

  1. a storage key, as defined in the Store interface,
  2. a name of an index, and
  3. an "indexed value", which is produced by an IndexFunc and can be a field value or any other string computed from the object.

func NewIndexer

func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer

NewIndexer returns an Indexer implemented simply with a map and a lock.

type Indexers

type Indexers map[string]IndexFunc

Indexers maps a name to an IndexFunc

type Indices

type Indices map[string]Index

Indices maps a name to an Index

type KeyError

type KeyError struct {
	Obj any
	Err error
}

KeyError will be returned any time a KeyFunc gives an error; it includes the object at fault.

func (KeyError) Error

func (k KeyError) Error() string

Error gives a human-readable description of the error.

func (KeyError) Unwrap

func (k KeyError) Unwrap() error

Unwrap implements errors.Unwrap

type KeyFunc

type KeyFunc func(obj any) (string, error)

KeyFunc knows how to make a key from an object. Implementations should be deterministic.

type KeyGetter

type KeyGetter interface {
	// GetByKey returns the value associated with the key, or sets exists=false.
	GetByKey(key string) (value any, exists bool, err error)
}

A KeyGetter is anything that knows how to get the value stored under a given key.

type KeyLister

type KeyLister interface {
	ListKeys() []string
}

A KeyLister is anything that knows how to list its keys.

type KeyListerGetter

type KeyListerGetter interface {
	KeyLister
	KeyGetter
}

A KeyListerGetter is anything that knows how to list its keys and look up by key.

type PopProcessFunc

type PopProcessFunc func(any) error

PopProcessFunc is passed to Pop() method of Queue interface. It is supposed to process the accumulator popped from the queue.

type Queue

type Queue interface {
	Store

	// Pop blocks until there is at least one key to process or the
	// Queue is closed.  In the latter case Pop returns with an error.
	// In the former case Pop atomically picks one key to process,
	// removes that (key, accumulator) association from the Store, and
	// processes the accumulator.  Pop returns the accumulator that
	// was processed and the result of processing.  The PopProcessFunc
	// may return an ErrRequeue{inner} and in this case Pop will (a)
	// return that (key, accumulator) association to the Queue as part
	// of the atomic processing and (b) return the inner error from
	// Pop.
	Pop(PopProcessFunc) (any, error)

	// AddIfNotPresent puts the given accumulator into the Queue (in
	// association with the accumulator's key) if and only if that key
	// is not already associated with a non-empty accumulator.
	AddIfNotPresent(any) error

	// HasSynced returns true if the first batch of keys have all been
	// popped.  The first batch of keys are those of the first Replace
	// operation if that happened before any Add, AddIfNotPresent,
	// Update, or Delete; otherwise the first batch is empty.
	HasSynced() bool

	// Close the queue
	Close()
}

Queue extends Store with a collection of Store keys to "process". Every Add, Update, or Delete may put the object's key in that collection. A Queue has a way to derive the corresponding key given an accumulator. A Queue can be accessed concurrently from multiple goroutines. A Queue can be "closed", after which Pop operations return an error.

type Store

type Store interface {

	// Add adds the given object to the accumulator associated with the given object's key
	Add(obj any) error

	// Update updates the given object in the accumulator associated with the given object's key
	Update(obj any) error

	// Delete deletes the given object from the accumulator associated with the given object's key
	Delete(obj any) error

	// List returns a list of all the currently non-empty accumulators
	List() []any

	// ListKeys returns a list of all the keys currently associated with non-empty accumulators
	ListKeys() []string

	// Get returns the accumulator associated with the given object's key
	Get(obj any) (item any, exists bool, err error)

	// GetByKey returns the accumulator associated with the given key
	GetByKey(key string) (item any, exists bool, err error)

	// Replace will delete the contents of the store, using instead the
	// given list. Store takes ownership of the list, you should not reference
	// it after calling this function.
	Replace([]any, string) error

	// Resync is meaningless in the terms appearing here but has
	// meaning in some implementations that have non-trivial
	// additional behavior (e.g., DeltaFIFO).
	Resync() error
}

Store is a generic object storage and processing interface. A Store holds a map from string keys to accumulators, and has operations to add, update, and delete a given object to/from the accumulator currently associated with a given key. A Store also knows how to extract the key from a given object, so many operations are given only the object.

In the simplest Store implementations each accumulator is simply the last given object, or empty after Delete, and thus the Store's behavior is simple storage.

Reflector knows how to watch a server and update a Store. This package provides a variety of implementations of Store.

func NewStore

func NewStore(keyFunc KeyFunc) Store

NewStore returns a Store implemented simply with a map and a lock.

type ThreadSafeStore

type ThreadSafeStore interface {
	Add(key string, obj any)
	Update(key string, obj any)
	Delete(key string)
	Get(key string) (item any, exists bool)
	List() []any
	ListKeys() []string
	Replace(map[string]any, string)
	Index(indexName string, obj any) ([]any, error)
	IndexKeys(indexName, indexedValue string) ([]string, error)
	ListIndexFuncValues(name string) []string
	ByIndex(indexName, indexedValue string) ([]any, error)
	GetIndexers() Indexers

	// AddIndexers adds more indexers to this store.  If you call this after you already have data
	// in the store, the results are undefined.
	AddIndexers(newIndexers Indexers) error
	// Resync is a no-op and is deprecated
	Resync() error
}

ThreadSafeStore is an interface that allows concurrent indexed access to a storage backend. It is like Indexer but does not (necessarily) know how to extract the Store key from a given object.

TL;DR caveats: you must not modify anything returned by Get or List as it will break the indexing feature in addition to not being thread safe.

The guarantees of thread safety provided by List/Get are only valid if the caller treats returned items as read-only. For example, a pointer inserted in the store through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get` on the same key and modify the pointer in a non-thread-safe way. Also note that modifying objects stored by the indexers (if any) will *not* automatically lead to a re-index. So it's not a good idea to directly modify the objects returned by Get/List, in general.

func NewThreadSafeStore

func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore

NewThreadSafeStore creates a new instance of ThreadSafeStore.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL