watcher

package
v0.36.0-flux.1
Warning

This package is not in the latest version of its module.

Published: Nov 20, 2023 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Overview

Package watcher is a library for computing the status of Kubernetes resource objects by watching object state in a cluster. It keeps watching until it is cancelled through the provided context. Status updates are streamed back to the caller through a channel.

Watching Resources

To watch a set of resource objects, create a StatusWatcher and pass the list of object identifiers to its Watch function.

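import (
  "context"

  "github.com/fluxcd/cli-utils/pkg/kstatus/polling/event"
  "github.com/fluxcd/cli-utils/pkg/kstatus/watcher"
  "github.com/fluxcd/cli-utils/pkg/object"
  "k8s.io/apimachinery/pkg/runtime/schema"
)

// Identify the objects to watch by GroupKind, name, and namespace.
ids := object.ObjMetadataSet{
  {
    GroupKind: schema.GroupKind{
      Group: "apps",
      Kind:  "Deployment",
    },
    Name:      "dep",
    Namespace: "default",
  },
}

// dynamicClient (dynamic.Interface) and mapper (meta.RESTMapper) are
// assumed to have been created by the caller.
statusWatcher := watcher.NewDefaultStatusWatcher(dynamicClient, mapper)
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

// Watch returns a channel of update and error events.
eventCh := statusWatcher.Watch(ctx, ids, watcher.Options{})
for e := range eventCh {
  // Handle event
  if e.Type == event.ErrorEvent {
    cancelFunc()
    return e.Error
  }
}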

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AllowListObjectFilter

type AllowListObjectFilter struct {
	AllowList object.ObjMetadataSet
}

AllowListObjectFilter filters objects not in the allow list. AllowListObjectFilter implements ObjectFilter.

func (*AllowListObjectFilter) Filter

Filter returns true if the object should be skipped, because it is NOT in the AllowList.
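
The inverted return value (true means "skip") is easy to misread, so here is a minimal, self-contained sketch of the same contract. It does not use the real package: objID, allowListFilter, and newAllowListFilter are hypothetical stand-ins, and the stand-in Filter takes an identifier directly, whereas the ObjectFilter interface takes an *unstructured.Unstructured.

```go
package main

import "fmt"

// objID is a hypothetical stand-in for object.ObjMetadata.
type objID struct {
	Group, Kind, Namespace, Name string
}

// allowListFilter is a hypothetical stand-in that mirrors the documented
// contract of AllowListObjectFilter.
type allowListFilter struct {
	allow map[objID]struct{}
}

// newAllowListFilter builds a filter from the identifiers to keep.
func newAllowListFilter(ids ...objID) *allowListFilter {
	m := make(map[objID]struct{}, len(ids))
	for _, id := range ids {
		m[id] = struct{}{}
	}
	return &allowListFilter{allow: m}
}

// Filter returns true if the object should be skipped,
// that is, when it is NOT in the allow list.
func (f *allowListFilter) Filter(id objID) bool {
	_, ok := f.allow[id]
	return !ok
}

func main() {
	dep := objID{Group: "apps", Kind: "Deployment", Namespace: "default", Name: "dep"}
	other := objID{Group: "apps", Kind: "Deployment", Namespace: "default", Name: "other"}

	f := newAllowListFilter(dep)
	fmt.Println(f.Filter(dep))   // false: listed, so it is kept
	fmt.Println(f.Filter(other)) // true: not listed, so it is skipped
}
```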

type BlindStatusWatcher

type BlindStatusWatcher struct{}

BlindStatusWatcher sees nothing. BlindStatusWatcher sends no update or error events. BlindStatusWatcher waits patiently to be cancelled. BlindStatusWatcher implements the StatusWatcher interface.

func (BlindStatusWatcher) Watch

Watch nothing. See no changes.
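
As an illustration of the behavior described above, the following sketch shows one way a watcher can send nothing and still honor the StatusWatcher contract of closing its channel when the context is cancelled. It is not the package's implementation: event, blindWatcher, and countEventsUntilCancelled are hypothetical names, and the stand-in Watch omits the ids and Options parameters.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// event is a hypothetical stand-in for event.Event.
type event struct{}

// blindWatcher is a hypothetical stand-in: it sends no events and closes
// its channel once the context is done.
type blindWatcher struct{}

// Watch returns a channel that never carries an event.
func (blindWatcher) Watch(ctx context.Context) <-chan event {
	ch := make(chan event)
	go func() {
		defer close(ch) // closing unblocks any caller ranging over ch
		<-ctx.Done()    // wait patiently to be cancelled
	}()
	return ch
}

// countEventsUntilCancelled drains the channel and returns how many
// events arrived before it was closed.
func countEventsUntilCancelled() int {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()

	ch := blindWatcher{}.Watch(ctx)
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	fmt.Println("events received:", countEventsUntilCancelled())
}
```

A watcher like this can be useful as a no-op placeholder in tests, where the caller's event loop should simply end when the context does.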

type DefaultStatusWatcher

type DefaultStatusWatcher struct {
	// DynamicClient is used to watch resource objects.
	DynamicClient dynamic.Interface

	// Mapper is used to map from GroupKind to GroupVersionKind.
	Mapper meta.RESTMapper

	// ResyncPeriod is how often the objects are retrieved to re-synchronize,
	// in case any events were missed.
	ResyncPeriod time.Duration

	// StatusReader specifies a custom implementation of the
	// engine.StatusReader interface that will be used to compute reconcile
	// status for resource objects.
	StatusReader engine.StatusReader

	// ClusterReader is used to look up generated objects on-demand.
	// Generated objects (ex: Deployment > ReplicaSet > Pod) are sometimes
	// required for computing parent object status, to compensate for
	// controllers that aren't following status conventions.
	ClusterReader engine.ClusterReader
}

DefaultStatusWatcher reports on status updates to a set of objects.

Use NewDefaultStatusWatcher to build a DefaultStatusWatcher with default settings.

func NewDefaultStatusWatcher

func NewDefaultStatusWatcher(dynamicClient dynamic.Interface, mapper meta.RESTMapper) *DefaultStatusWatcher

NewDefaultStatusWatcher constructs a DefaultStatusWatcher with defaults chosen for general use. If you need different settings, consider building a DefaultStatusWatcher directly.

func (*DefaultStatusWatcher) Watch

func (w *DefaultStatusWatcher) Watch(ctx context.Context, ids object.ObjMetadataSet, opts Options) <-chan event.Event

Watch the cluster for changes made to the specified objects. Returns an event channel on which these updates (and errors) will be reported. Each update event includes the computed status of the object.

type DynamicInformerFactory

type DynamicInformerFactory struct {
	Client       dynamic.Interface
	ResyncPeriod time.Duration
	Indexers     cache.Indexers
}

func NewDynamicInformerFactory

func NewDynamicInformerFactory(client dynamic.Interface, resyncPeriod time.Duration) *DynamicInformerFactory

func (*DynamicInformerFactory) NewInformer

func (f *DynamicInformerFactory) NewInformer(ctx context.Context, mapping *meta.RESTMapping, namespace string) cache.SharedIndexInformer

type EventFunnelClosedError

type EventFunnelClosedError struct {
	ContextError error
}

func (*EventFunnelClosedError) Error

func (e *EventFunnelClosedError) Error() string

func (*EventFunnelClosedError) Is

func (e *EventFunnelClosedError) Is(err error) bool

func (*EventFunnelClosedError) Unwrap

func (e *EventFunnelClosedError) Unwrap() error
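
The Error, Is, and Unwrap methods suggest that this type is meant to work with the standard errors package. The sketch below shows how such a trio typically behaves with errors.Is. It is an assumption about usage, not the package's code: funnelClosedError, its message format, its Is semantics, classify, and cancelledFunnelError are all hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// funnelClosedError is a hypothetical stand-in that mirrors the shape of
// EventFunnelClosedError.
type funnelClosedError struct {
	ContextError error
}

// Error uses an assumed message format.
func (e *funnelClosedError) Error() string {
	return fmt.Sprintf("event funnel closed: %v", e.ContextError)
}

// Is matches any *funnelClosedError, regardless of the wrapped error
// (assumed semantics).
func (e *funnelClosedError) Is(target error) bool {
	_, ok := target.(*funnelClosedError)
	return ok
}

// Unwrap exposes the context error so errors.Is can see through it.
func (e *funnelClosedError) Unwrap() error { return e.ContextError }

// classify shows how a caller could branch on the error chain.
func classify(err error) string {
	switch {
	case errors.Is(err, context.Canceled):
		return "cancelled"
	case errors.Is(err, context.DeadlineExceeded):
		return "timed out"
	case errors.Is(err, &funnelClosedError{}):
		return "funnel closed"
	default:
		return "other"
	}
}

// cancelledFunnelError builds a sample error wrapping context.Canceled.
func cancelledFunnelError() error {
	return &funnelClosedError{ContextError: context.Canceled}
}

func main() {
	wrapped := fmt.Errorf("watch failed: %w", cancelledFunnelError())
	fmt.Println(classify(wrapped))
	fmt.Println(classify(&funnelClosedError{}))
}
```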

type GroupKindNamespace

type GroupKindNamespace struct {
	Group     string
	Kind      string
	Namespace string
}

GroupKindNamespace identifies an informer target. When used as an informer target, the namespace is optional. When the namespace is empty for namespaced resources, all namespaces are watched.

func (GroupKindNamespace) GroupKind

func (gkn GroupKindNamespace) GroupKind() schema.GroupKind

func (GroupKindNamespace) String

func (gkn GroupKindNamespace) String() string

String returns a serialized form suitable for logging.
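
To illustrate the optional namespace, the sketch below derives a deduplicated list of informer targets from a set of object identifiers, either one target per namespace or one target with an empty namespace that covers all namespaces. The types objID and groupKindNamespace and the functions targetsFor and sampleIDs are hypothetical stand-ins, and the package may derive its targets differently.

```go
package main

import "fmt"

// objID is a hypothetical stand-in for object.ObjMetadata.
type objID struct {
	Group, Kind, Namespace, Name string
}

// groupKindNamespace is a hypothetical stand-in for GroupKindNamespace.
// All fields are strings, so the struct is comparable and usable as a map key.
type groupKindNamespace struct {
	Group, Kind, Namespace string
}

// targetsFor returns one target per distinct group, kind, and namespace,
// in first-seen order. If allNamespaces is true, the namespace is left
// empty so that a single target covers every namespace for that kind.
func targetsFor(ids []objID, allNamespaces bool) []groupKindNamespace {
	seen := make(map[groupKindNamespace]struct{})
	var out []groupKindNamespace
	for _, id := range ids {
		t := groupKindNamespace{Group: id.Group, Kind: id.Kind, Namespace: id.Namespace}
		if allNamespaces {
			t.Namespace = ""
		}
		if _, dup := seen[t]; dup {
			continue
		}
		seen[t] = struct{}{}
		out = append(out, t)
	}
	return out
}

// sampleIDs returns three Deployments spread over two namespaces.
func sampleIDs() []objID {
	return []objID{
		{Group: "apps", Kind: "Deployment", Namespace: "a", Name: "one"},
		{Group: "apps", Kind: "Deployment", Namespace: "b", Name: "two"},
		{Group: "apps", Kind: "Deployment", Namespace: "a", Name: "three"},
	}
}

func main() {
	fmt.Println(len(targetsFor(sampleIDs(), false))) // one target per namespace
	fmt.Println(len(targetsFor(sampleIDs(), true)))  // one target for all namespaces
}
```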

type ObjectFilter

type ObjectFilter interface {
	// Filter returns true if the object should be skipped.
	Filter(obj *unstructured.Unstructured) bool
}

ObjectFilter allows for filtering objects.

type ObjectStatusReporter

type ObjectStatusReporter struct {
	// InformerFactory is used to build informers
	InformerFactory *DynamicInformerFactory

	// Mapper is used to map from GroupKind to GroupVersionKind.
	Mapper meta.RESTMapper

	// StatusReader specifies a custom implementation of the
	// engine.StatusReader interface that will be used to compute reconcile
	// status for resource objects.
	StatusReader engine.StatusReader

	// ClusterReader is used to look up generated objects on-demand.
	// Generated objects (ex: Deployment > ReplicaSet > Pod) are sometimes
	// required for computing parent object status, to compensate for
	// controllers that aren't following status conventions.
	ClusterReader engine.ClusterReader

	// Targets is the list of GroupKindNamespaces to watch.
	Targets []GroupKindNamespace

	// ObjectFilter is used to decide which objects to ignore.
	ObjectFilter ObjectFilter

	// RESTScope specifies whether to ListAndWatch resources at the namespace
	// or cluster (root) level. Using root scope is more efficient, but
	// namespace scope may require fewer permissions.
	RESTScope meta.RESTScope
	// contains filtered or unexported fields
}

ObjectStatusReporter reports on updates to objects (instances) using a network of informers to watch one or more resources (types).

Unlike SharedIndexInformer, ObjectStatusReporter...

  • Reports object status.
  • Can watch multiple resource types simultaneously.
  • Can ignore specific objects for efficiency when an ObjectFilter is specified.
  • Resolves GroupKinds into Resources at runtime, to pick up newly added resources.
  • Starts and stops individual watches automatically to reduce errors when a CRD or Namespace is deleted.
  • Can watch resources in root-scope mode or namespace-scope mode, allowing the caller to optimize for efficiency or least privilege.
  • Gives unschedulable Pods (and objects that generate them) a 15s grace period before reporting them as Failed.
  • Resets the RESTMapper cache automatically when CRDs are modified.

ObjectStatusReporter is NOT repeatable. It will panic if started more than once. If you need a repeatable factory, use DefaultStatusWatcher.

TODO: support detection of added/removed api extensions at runtime
TODO: Watch CRDs & Namespaces, even if not in the set of IDs.
TODO: Retry with backoff if in namespace-scoped mode, to allow CRDs & namespaces to be created asynchronously

func (*ObjectStatusReporter) HasSynced

func (w *ObjectStatusReporter) HasSynced() bool

HasSynced returns true if all the started informers have been synced.

Use the following to block waiting for synchronization:

synced := cache.WaitForCacheSync(stopCh, informer.HasSynced)
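
For readers unfamiliar with the pattern, the sketch below shows the general idea of blocking on a HasSynced-style predicate: poll it until it reports true or a stop channel is closed. This is a standard-library approximation for illustration only, not the client-go implementation. The functions waitForSync, syncedAfter, and demoSync are hypothetical names, and the polling interval is an arbitrary choice.

```go
package main

import (
	"fmt"
	"time"
)

// waitForSync polls hasSynced until it returns true (result: true)
// or stopCh is closed first (result: false).
func waitForSync(stopCh <-chan struct{}, interval time.Duration, hasSynced func() bool) bool {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		if hasSynced() {
			return true
		}
		select {
		case <-stopCh:
			return false
		case <-ticker.C:
			// poll again
		}
	}
}

// syncedAfter returns a predicate that reports true from the nth call on.
func syncedAfter(n int) func() bool {
	calls := 0
	return func() bool {
		calls++
		return calls >= n
	}
}

// demoSync exercises both outcomes: a successful sync, and a stop
// request that arrives before synchronization.
func demoSync() (synced, stopped bool) {
	open := make(chan struct{})
	synced = waitForSync(open, time.Millisecond, syncedAfter(3))

	closed := make(chan struct{})
	close(closed)
	stopped = !waitForSync(closed, time.Millisecond, func() bool { return false })
	return synced, stopped
}

func main() {
	synced, stopped := demoSync()
	fmt.Println(synced, stopped)
}
```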

func (*ObjectStatusReporter) Start

func (w *ObjectStatusReporter) Start(ctx context.Context) <-chan event.Event

func (*ObjectStatusReporter) Stop

func (w *ObjectStatusReporter) Stop()

Stop triggers the cancellation of the reporter context, and closure of the event channel without sending an error event.

type Options

type Options struct {
	// RESTScopeStrategy specifies which strategy to use when listing and
	// watching resources. By default, the strategy is selected automatically.
	RESTScopeStrategy RESTScopeStrategy
}

Options can be passed to a StatusWatcher's Watch method to customize its behavior.

type RESTScopeStrategy

type RESTScopeStrategy int

const (
	RESTScopeAutomatic RESTScopeStrategy = iota // automatic
	RESTScopeRoot                               // root
	RESTScopeNamespace                          // namespace
)

func (RESTScopeStrategy) String

func (i RESTScopeStrategy) String() string
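
The line comments on the constants (automatic, root, namespace) look like the names String returns, although that is an inference from the declaration rather than something this page states. The sketch below shows an equivalent hand-written String method on a hypothetical stand-in type, restScopeStrategy. It also shows why a zero-valued Options selects the automatic strategy: the first iota constant is the zero value.

```go
package main

import "fmt"

// restScopeStrategy is a hypothetical stand-in for RESTScopeStrategy.
type restScopeStrategy int

const (
	restScopeAutomatic restScopeStrategy = iota // zero value
	restScopeRoot
	restScopeNamespace
)

// String returns the assumed name for each strategy and a numeric
// fallback for values outside the declared range.
func (s restScopeStrategy) String() string {
	switch s {
	case restScopeAutomatic:
		return "automatic"
	case restScopeRoot:
		return "root"
	case restScopeNamespace:
		return "namespace"
	default:
		return fmt.Sprintf("restScopeStrategy(%d)", int(s))
	}
}

// options is a hypothetical stand-in for Options.
type options struct {
	RESTScopeStrategy restScopeStrategy
}

// defaultStrategyName reports the strategy selected by a zero-valued options.
func defaultStrategyName() string {
	return options{}.RESTScopeStrategy.String()
}

func main() {
	fmt.Println(defaultStrategyName()) // the zero value is the automatic strategy
	fmt.Println(restScopeNamespace)    // fmt uses the String method
}
```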

type StatusWatcher

type StatusWatcher interface {

	// Watch a set of objects for status updates.
	// Watching should stop if the context is cancelled.
	// Events should only be sent for the specified objects.
	// The event channel should be closed when the watching stops.
	Watch(context.Context, object.ObjMetadataSet, Options) <-chan event.Event
}

StatusWatcher watches a set of objects for status updates.
