watch

package
v0.39.1
Warning

This package is not in the latest version of its module.

Published: Jan 15, 2025 License: Apache-2.0 Imports: 15 Imported by: 0

README

util/watch explainer

Here's a high-level overview of .../util/watch - our abstraction for the Kubernetes Watch API.

Our implementation is similar to the client-go watch implementation, with some notable differences:

  1. client-go's is type-erased; ours is generic
  2. client-go's periodically "resyncs" when necessary, but gives callers no way to trigger a resync or to be notified when one completes, both of which we need. Ours provides explicit control over "relisting" (fundamentally the same as "resyncing", just a different name), with notification when relisting has finished.
  3. Ours has some additional features:
    1. Support for custom indexes
    2. The ability to choose whether the handlers for the initial listing are called before or after Watch returns (see watch.InitMode)
    3. Metrics describing the API calls, their results, and the current state of the watch (e.g. whether it's healthy)

How watching generally works

At a high level, watching generally works by first calling List to fetch the current state of the items, and then calling Watch with the resource version from listing to stream the changes to the objects since the initial fetch.

If the Watch stream closes, we create a new one from the latest resource version; if the stream reports an error, we call List again ("relist") and then retry the Watch. Errors most commonly occur when the resource version we're watching from is too old.

So, our general flow looks like:

items, rv := List()
for item in items {
    update store with item
}
stream := Watch(rv)
loop {
    // stream events...
    loop {
        event, ok := <-stream
        if !ok {
            // stream closed; watch ended.
            goto rewatch
        }

        if event is error {
            goto relist
        }

        update store with event
        rv = event's resource version
    }

  relist:
    items, rv = List()
    for item in items {
        update store with item
    }

  rewatch:
    stream = Watch(rv)
}

Note that resource versions are opaque strings (even if they happen to look a lot like monotonically increasing integers), so we must start a new Watch every time we call List, because we have no way to tell if an event represents the state before or after the List.

The basic interface

Calls to watch.Watch internally produce an event stream. These events are exposed through watch.HandlerFuncs, a set of callbacks invoked on every event, whether it comes from updating the store with List or from the Watch events themselves.

Our changes: Relisting

Relisting was originally motivated by the scheduler: There, we needed the ability to associate an incoming Pod with the VirtualMachine that owns it. Fundamentally, this is racy if we're only using watch: it's possible to get an event about the Pod before we get any events about the VirtualMachine.

To allow establishing a happens-after relationship between an external event and the contents of the store, we support externally triggering a "relist", which guarantees that the contents of the store are at least as recent as the request to relist.

Concretely, this allows the scheduler to always fetch the VirtualMachine associated with a Pod by relisting if the VM is not immediately found (the store is probably just out of date!).

The details of this implementation can be found in the relisting portion of the watch.Watch function, and in (*watch.Store[T]).Relist().
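A minimal sketch of how relist-with-notification could provide the happens-after guarantee, assuming a heavily simplified store (`store`, `relistLoop`, and `getOrRelist` are hypothetical; only the `Relist() <-chan struct{}` signature comes from this page). The key step is that the watcher snapshots the pending requests *before* it calls List, so every request it releases was made before that List started.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical, heavily simplified stand-in that only models relisting.
type store struct {
	mu      sync.Mutex
	items   map[string]string
	waiters []chan struct{} // Relist requests not yet picked up
	wake    chan struct{}   // capacity 1: "at least one request is pending"
}

func newStore() *store {
	return &store{items: map[string]string{}, wake: make(chan struct{}, 1)}
}

// Relist returns a channel that is closed once a List that started after
// this call has been applied to the store.
func (s *store) Relist() <-chan struct{} {
	done := make(chan struct{})
	s.mu.Lock()
	s.waiters = append(s.waiters, done)
	s.mu.Unlock()
	select {
	case s.wake <- struct{}{}:
	default: // a wake-up is already pending and will pick this request up too
	}
	return done
}

func (s *store) get(key string) (string, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.items[key]
	return v, ok
}

// relistLoop is the watcher side: take the pending waiters, List, apply the
// result, and only then release exactly those waiters.
func (s *store) relistLoop(list func() map[string]string, stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		case <-s.wake:
		}
		s.mu.Lock()
		waiters := s.waiters
		s.waiters = nil
		s.mu.Unlock()

		fresh := list() // starts after every request in waiters was made

		s.mu.Lock()
		s.items = fresh
		s.mu.Unlock()
		for _, done := range waiters {
			close(done)
		}
	}
}

// getOrRelist is the scheduler-style lookup: a miss may just mean the store
// is behind, so relist once and look again.
func getOrRelist(s *store, key string) (string, bool) {
	if v, ok := s.get(key); ok {
		return v, true
	}
	<-s.Relist()
	return s.get(key)
}

func demo() (string, bool) {
	server := map[string]string{"vm-1": "ready"} // the API server already has vm-1
	s := newStore()                              // ...but our store hasn't seen it yet
	stop := make(chan struct{})
	defer close(stop)
	go s.relistLoop(func() map[string]string {
		out := make(map[string]string, len(server))
		for k, v := range server {
			out[k] = v
		}
		return out
	}, stop)
	return getOrRelist(s, "vm-1")
}

func main() {
	fmt.Println(demo()) // ready true
}
```

Because the wake-up channel has capacity 1, many concurrent `Relist` calls coalesce into a single List.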

Our changes: Custom indexes

In various places in the codebase, it's been useful to have mappings that are updated for each watch event, so that we can amortize the cost of lookups (rather than incurring a linear scan over all items every time we want to find a particular one, or set of them).

While indexes could be implemented inside of the callbacks, there's explicit support provided for them by the watch.Store, which allows lookups into the indexes to share the same lock as fetches from the full list of items in the watch.Store.

Some examples of indexes:

  • Indexing by namespace+name, for efficient lookup of a single item (watch.NameIndex)
  • Indexing by the node a VirtualMachine is on, to efficiently fetch all VMs on a node (pkg/agent/billing.VMNodeIndex)

... and a couple others used elsewhere :)

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Accessors

type Accessors[L any, T any] struct {
	Items func(L) []T
}

Accessors provides the "glue" functions for Watch to go from a list L (returned by the client's List) to the underlying slice of items []T
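A sketch of why this glue is needed: generic code over an arbitrary list type `L` cannot name its `Items` field, so it calls the accessor instead. `Pod`, `PodList`, and `itemPointers` are hypothetical stand-ins for types like `corev1.PodList`, and the real `L` is additionally constrained to `metav1.ListMetaAccessor`, which is dropped here to keep the example self-contained.

```go
package main

import "fmt"

// Hypothetical stand-ins for a Kubernetes list type and its item.
type Pod struct{ Name string }

type PodList struct {
	ResourceVersion string
	Items           []Pod
}

// Accessors mirrors the documented shape of watch.Accessors.
type Accessors[L any, T any] struct {
	Items func(L) []T
}

// itemPointers works for any list type by going through the accessor. It
// returns pointers into the list's backing array rather than copies.
func itemPointers[L any, T any](list L, acc Accessors[L, T]) []*T {
	items := acc.Items(list)
	out := make([]*T, len(items))
	for i := range items {
		out[i] = &items[i]
	}
	return out
}

func main() {
	list := &PodList{Items: []Pod{{Name: "a"}, {Name: "b"}}}
	acc := Accessors[*PodList, Pod]{Items: func(l *PodList) []Pod { return l.Items }}
	for _, p := range itemPointers(list, acc) {
		fmt.Println(p.Name)
	}
}
```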

type Client

type Client[L any] interface {
	List(context.Context, metav1.ListOptions) (L, error)
	Watch(context.Context, metav1.ListOptions) (watch.Interface, error)
}

Client is implemented by the specific interfaces of kubernetes clients, like `Clientset.CoreV1().Pods(namespace)` or `..Nodes()`

This interface should be *already implemented* by whatever the correct client is.

type Config

type Config struct {
	// ObjectNameLogField determines the key given to the logger to use when describing the type
	// being watched -- for example, "pod" or "virtualmachine"
	//
	// This can help with standardizing keys between the watcher and everything else using it.
	ObjectNameLogField string

	// Metrics will be used by the Watch call to report some information about its internal
	// operations
	//
	// Refer to the Metrics and MetricsConfig types for more information.
	Metrics MetricsConfig

	// RetryRelistAfter gives a retry interval when a re-list fails. If left nil, then Watch will
	// not retry.
	RetryRelistAfter *util.TimeRange
	// RetryWatchAfter gives a retry interval when a non-initial watch fails. If left nil, then
	// Watch will not retry.
	RetryWatchAfter *util.TimeRange
}

Config is the miscellaneous configuration used by Watch

type FlatNameIndex added in v0.12.0

type FlatNameIndex[T any] struct {
	// contains filtered or unexported fields
}

func NewFlatNameIndex added in v0.12.0

func NewFlatNameIndex[T any]() *FlatNameIndex[T]

func (*FlatNameIndex[T]) Add added in v0.12.0

func (i *FlatNameIndex[T]) Add(obj *T)

func (*FlatNameIndex[T]) Delete added in v0.12.0

func (i *FlatNameIndex[T]) Delete(obj *T)

func (*FlatNameIndex[T]) Get added in v0.12.0

func (i *FlatNameIndex[T]) Get(name string) (obj *T, ok bool)

func (*FlatNameIndex[T]) Update added in v0.12.0

func (i *FlatNameIndex[T]) Update(oldObj, newObj *T)

type HandlerFuncs

type HandlerFuncs[P any] struct {
	AddFunc    func(obj P, preexisting bool)
	UpdateFunc func(oldObj P, newObj P)
	DeleteFunc func(obj P, mayBeStale bool)
}

HandlerFuncs provides the set of callbacks to use for events from Watch

type Index

type Index[T any] interface {
	Add(obj *T)
	Update(oldObj, newObj *T)
	Delete(obj *T)
}

Index represents types that provide some kind of additional index on top of the base listing

Indexing is functionally implemented in the same way as HandlerFuncs, with the main difference being that Index does more of the work for you. In particular, indexes can be added and removed after the Watch has already started, and the locking behavior is explicit.

type IndexedStore

type IndexedStore[T any, I Index[T]] struct {
	*Store[T]
	// contains filtered or unexported fields
}

IndexedStore represents a Store, wrapped with a privileged Index that can be used to efficiently answer queries.

func NewIndexedStore

func NewIndexedStore[T any, I Index[T]](store *Store[T], index I) IndexedStore[T, I]

NewIndexedStore creates a new IndexedStore from the Store and the index to use.

Note: the index type is assumed to have reference semantics; i.e. any shallow copy of the value will affect any other shallow copy.

For more information, refer to IndexedStore.

func (IndexedStore[T, I]) GetIndexed

func (w IndexedStore[T, I]) GetIndexed(f func(I) (*T, bool)) (obj *T, ok bool)

func (IndexedStore[T, I]) ListIndexed

func (w IndexedStore[T, I]) ListIndexed(f func(I) []*T) (list []*T)

func (IndexedStore[T, I]) WithIndex

func (w IndexedStore[T, I]) WithIndex(f func(I))

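WithIndex calls a function with the current state of the index, locking the Store around it.

It is almost certainly an error to leak the index out of this function (for example, by assigning it to a variable captured from the enclosing scope), because any later use would happen without the lock held.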
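A sketch of the query methods over a hypothetical store, assuming they simply run the callback under the store's read lock (the method names and signatures follow this page; the struct layout, `item`, and `byName` are invented). Because the methods have value receivers and copy `index`, the index type here is a map, which matches the reference-semantics note on `NewIndexedStore`.

```go
package main

import (
	"fmt"
	"sync"
)

// IndexedStore mirrors the documented method set over a hypothetical store
// whose lock is shared with the code applying watch events.
type IndexedStore[T any, I any] struct {
	mu    *sync.RWMutex
	index I // copied by value receivers, so I must have reference semantics
}

func (w IndexedStore[T, I]) GetIndexed(f func(I) (*T, bool)) (*T, bool) {
	w.mu.RLock()
	defer w.mu.RUnlock()
	return f(w.index)
}

func (w IndexedStore[T, I]) ListIndexed(f func(I) []*T) []*T {
	w.mu.RLock()
	defer w.mu.RUnlock()
	return f(w.index)
}

func (w IndexedStore[T, I]) WithIndex(f func(I)) {
	w.mu.RLock()
	defer w.mu.RUnlock()
	f(w.index)
}

type item struct {
	Name  string
	Value int
}

// byName is a map, so every copy of it refers to the same underlying data.
type byName map[string]*item

func demo() (int, bool, int, int) {
	var mu sync.RWMutex
	s := IndexedStore[item, byName]{mu: &mu, index: byName{
		"a": {Name: "a", Value: 1},
		"b": {Name: "b", Value: 2},
	}}

	a, ok := s.GetIndexed(func(i byName) (*item, bool) {
		v, ok := i["a"]
		return v, ok
	})

	large := s.ListIndexed(func(i byName) []*item {
		var out []*item
		for _, v := range i {
			if v.Value > 1 {
				out = append(out, v)
			}
		}
		return out
	})

	// Fine: compute the answer inside the callback, while the lock is held.
	total := 0
	s.WithIndex(func(i byName) {
		for _, v := range i {
			total += v.Value
		}
	})

	// Almost certainly a bug: `s.WithIndex(func(i byName) { leaked = i })`
	// lets later reads of `leaked` race with writers once the lock is released.
	return a.Value, ok, len(large), total
}

func main() {
	fmt.Println(demo()) // 1 true 1 3
}
```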

type InitMode

type InitMode string

InitMode dictates the behavior of Watch with respect to any initial calls to handlers.AddFunc before returning

If set to InitModeSync, then AddFunc will be called while processing the initial listing, meaning that the returned Store is guaranteed to contain the state of the cluster (although it may update before any access).

Otherwise, if set to InitModeDefer, then AddFunc will not be called until after Watch returns. Correspondingly, the Store will not update until then either.

const (
	InitModeSync  InitMode = "sync"
	InitModeDefer InitMode = "defer"
)
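A simplified sketch of the two modes, assuming a hypothetical `start` function in place of `Watch`: in sync mode the initial adds run before `start` returns, while in defer mode they run on the background goroutine, gated so that they cannot begin before `start` is returning.

```go
package main

import (
	"fmt"
	"sync"
)

type InitMode string

const (
	InitModeSync  InitMode = "sync"
	InitModeDefer InitMode = "defer"
)

// start is a hypothetical, much-simplified Watch. It calls add for each item
// of the initial listing and returns a channel closed when its background
// goroutine finishes.
func start(mode InitMode, initial []string, add func(string)) <-chan struct{} {
	done := make(chan struct{})
	returned := make(chan struct{})
	defer close(returned)

	processInitial := func() {
		for _, it := range initial {
			add(it)
		}
	}
	if mode == InitModeSync {
		processInitial() // the store is populated before the caller sees it
	}
	go func() {
		defer close(done)
		<-returned // don't run handlers until start has returned
		if mode == InitModeDefer {
			processInitial()
		}
		// ... a real watcher would now stream Watch events ...
	}()
	return done
}

// count reports how many adds had happened when start returned, and in total.
func count(mode InitMode) (atReturn, atEnd int) {
	var mu sync.Mutex
	n := 0
	add := func(string) {
		mu.Lock()
		n++
		mu.Unlock()
	}
	done := start(mode, []string{"a", "b", "c"}, add)
	mu.Lock()
	atReturn = n
	mu.Unlock()
	<-done
	mu.Lock()
	atEnd = n
	mu.Unlock()
	return atReturn, atEnd
}

func main() {
	fmt.Println(count(InitModeSync)) // 3 3
	_, atEnd := count(InitModeDefer)
	fmt.Println(atEnd) // 3
}
```

Only sync mode gives a deterministic count at the moment `start` returns; in defer mode, how many adds have run by the caller's first read is timing-dependent.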

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics holds some common prometheus collectors that are used by Watch

The metrics used are:

- client_calls_total (number of calls to k8s client.{Watch,List}, labeled by method)
- relist_requests_total (number of "relist" requests from the Store)
- events_total (number of K8s watch.Events that have occurred, including errors)
- errors_total (number of errors, either error events or re-List errors, labeled by source: ["List", "Watch", "Watch.Event"])
- alive_current (1 iff the watcher is currently running or failing, else 0)
- failing_current (1 iff the watcher's last request failed *and* it's waiting to retry, else 0)

Prefixes are typically of the form "COMPONENT_watchers" (e.g. "autoscaling_agent_watchers"). Separate reporting per call to Watch is automatically done with the "watcher_instance" label attached to the metrics, using MetricsConfig.

A brief note about "alive" and "failing": Reading from a pair of collectors is fundamentally racy. It may be possible to temporarily view "failing" but not "alive".

func NewMetrics

func NewMetrics(prefix string) Metrics

NewMetrics creates a new set of metrics for one or many Watch calls

All metrics' names will be prefixed with the provided string.

func (*Metrics) MustRegister

func (m *Metrics) MustRegister(reg *prometheus.Registry)

MustRegister registers all the collectors in the Metrics

type MetricsConfig

type MetricsConfig struct {
	Metrics
	// Instance provides the value of the "watcher_instance" label that will be applied to all
	// metrics collected for the Watch call
	Instance string
}

type NameIndex

type NameIndex[T any] struct {
	// contains filtered or unexported fields
}

NameIndex is an Index that provides efficient lookup for a value with a particular namespace and name

func NewNameIndex

func NewNameIndex[T any]() *NameIndex[T]

func (*NameIndex[T]) Add

func (i *NameIndex[T]) Add(obj *T)

func (*NameIndex[T]) Delete

func (i *NameIndex[T]) Delete(obj *T)

func (*NameIndex[T]) Get

func (i *NameIndex[T]) Get(namespace string, name string) (obj *T, ok bool)

func (*NameIndex[T]) Update

func (i *NameIndex[T]) Update(oldObj, newObj *T)
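A sketch of a namespace+name index, keyed by a comparable struct so lookups don't build strings. `nameIndex` and `pod` are hypothetical, and unlike the real `NewNameIndex`, which takes no arguments, this version takes a key function so it can stay self-contained.

```go
package main

import "fmt"

// nsName is a comparable map key.
type nsName struct{ namespace, name string }

// nameIndex is a hypothetical re-creation of the idea behind NameIndex.
type nameIndex[T any] struct {
	keyOf func(*T) nsName
	objs  map[nsName]*T
}

func newNameIndex[T any](keyOf func(*T) nsName) *nameIndex[T] {
	return &nameIndex[T]{keyOf: keyOf, objs: make(map[nsName]*T)}
}

func (i *nameIndex[T]) Add(obj *T) { i.objs[i.keyOf(obj)] = obj }

func (i *nameIndex[T]) Update(oldObj, newObj *T) {
	// Kubernetes objects can't be renamed, so this is normally an overwrite;
	// removing the old key first is just defensive.
	delete(i.objs, i.keyOf(oldObj))
	i.objs[i.keyOf(newObj)] = newObj
}

func (i *nameIndex[T]) Delete(obj *T) { delete(i.objs, i.keyOf(obj)) }

func (i *nameIndex[T]) Get(namespace, name string) (*T, bool) {
	obj, ok := i.objs[nsName{namespace, name}]
	return obj, ok
}

type pod struct{ Namespace, Name, Phase string }

func demo() *nameIndex[pod] {
	idx := newNameIndex(func(p *pod) nsName { return nsName{p.Namespace, p.Name} })
	web := &pod{"default", "web", "Pending"}
	idx.Add(web)
	idx.Add(&pod{"kube-system", "web", "Running"}) // same name, different namespace
	idx.Update(web, &pod{"default", "web", "Running"})
	idx.Delete(&pod{"kube-system", "web", "Running"})
	return idx
}

func main() {
	idx := demo()
	p, ok := idx.Get("default", "web")
	fmt.Println(p.Phase, ok) // Running true
	_, ok = idx.Get("kube-system", "web")
	fmt.Println(ok) // false
}
```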

type Object

type Object[T any] interface {
	~*T
	runtime.Object
	metav1.ObjectMetaAccessor
}

Object is implemented by pointers to T, where T is typically the resource that we're actually watching.

Example implementers: *corev1.Pod, *corev1.Node

type Store

type Store[T any] struct {
	// contains filtered or unexported fields
}

Store provides an interface for getting information about a list of Ts using the event listener from a previous call to Watch

func Watch

func Watch[C Client[L], L metav1.ListMetaAccessor, T any, P Object[T]](
	ctx context.Context,
	logger *zap.Logger,
	client C,
	config Config,
	accessors Accessors[L, T],
	mode InitMode,
	listOpts metav1.ListOptions,
	handlers HandlerFuncs[P],
) (*Store[T], error)

Watch starts a goroutine for watching events, using the provided HandlerFuncs as the callbacks for each type of event.

C is the Kubernetes client used to fetch the objects, L is the list type it returns, T is the object type, and P is a pointer to T.

func (*Store[T]) Failing added in v0.14.1

func (w *Store[T]) Failing() bool

func (*Store[T]) Items

func (w *Store[T]) Items() []*T

func (*Store[T]) Relist

func (w *Store[T]) Relist() <-chan struct{}

Relist triggers re-listing the Store, returning a channel that will be closed once the re-list is complete

func (*Store[T]) Stop

func (w *Store[T]) Stop()

func (*Store[T]) Stopped

func (w *Store[T]) Stopped() bool
