Documentation ¶
Overview ¶
Package watcher is a library for computing the status of Kubernetes resource objects by watching their state in a cluster. It keeps watching until it is cancelled through the provided context. Status updates are streamed back to the caller through a channel.
Watching Resources ¶
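To watch a set of resource objects, create a StatusWatcher and pass the set of object identifiers to its Watch function.
import (
	"context"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
	"sigs.k8s.io/cli-utils/pkg/object"
)

// dynamicClient (dynamic.Interface) and mapper (meta.RESTMapper) are
// assumed to have been constructed by the caller.
ids := object.ObjMetadataSet{
	{
		GroupKind: schema.GroupKind{
			Group: "apps",
			Kind:  "Deployment",
		},
		Name:      "dep",
		Namespace: "default",
	},
}

statusWatcher := watcher.NewDefaultStatusWatcher(dynamicClient, mapper)
ctx, cancelFunc := context.WithCancel(context.Background())
eventCh := statusWatcher.Watch(ctx, ids, watcher.Options{})
for e := range eventCh {
	// Handle event
	if e.Type == event.ErrorEvent {
		// Cancel the context so the watcher stops, then report the error.
		cancelFunc()
		return e.Error
	}
}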
Index ¶
- func DefaultIndexers() cache.Indexers
- func NewFilteredListWatchFromDynamicClient(ctx context.Context, client dynamic.Interface, ...) *cache.ListWatch
- func NewModifiedListWatchFromDynamicClient(ctx context.Context, client dynamic.Interface, ...) *cache.ListWatch
- type AllowListObjectFilter
- type BlindStatusWatcher
- type DefaultStatusWatcher
- type DynamicInformerFactory
- type EventFunnelClosedError
- type Filters
- type GroupKindNamespace
- type ObjectFilter
- type ObjectStatusReporter
- type Options
- type RESTScopeStrategy
- type StatusWatcher
Functions ¶
func DefaultIndexers ¶ added in v0.37.0
func DefaultIndexers() cache.Indexers
DefaultIndexers returns the default set of cache indexers, namely the namespace indexer.
func NewFilteredListWatchFromDynamicClient ¶ added in v0.37.0
func NewFilteredListWatchFromDynamicClient( ctx context.Context, client dynamic.Interface, resource schema.GroupVersionResource, namespace string, filters *Filters, ) *cache.ListWatch
NewFilteredListWatchFromDynamicClient creates a new ListWatch from the specified client, resource, namespace, and optional filters.
func NewModifiedListWatchFromDynamicClient ¶ added in v0.37.0
func NewModifiedListWatchFromDynamicClient( ctx context.Context, client dynamic.Interface, resource schema.GroupVersionResource, namespace string, optionsModifier func(*metav1.ListOptions) error, ) *cache.ListWatch
NewModifiedListWatchFromDynamicClient creates a new ListWatch from the specified client, resource, namespace, and options modifier. The options modifier is a function that takes a ListOptions and modifies it in place. Provide a custom modifier function to set field selectors, label selectors, or any other desired list options.
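The options-modifier pattern described above can be sketched with the standard library alone. In this sketch, `listOptions` is a local stand-in for `metav1.ListOptions` (only two selector fields are modeled), and `byLabel` and `applyModifier` are hypothetical helpers, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// listOptions is a hypothetical stand-in for metav1.ListOptions,
// reduced to the two selector fields used in this sketch.
type listOptions struct {
	LabelSelector string
	FieldSelector string
}

// optionsModifier mirrors the shape of the optionsModifier parameter:
// it mutates the options in place and may return an error.
type optionsModifier func(*listOptions) error

// byLabel returns a modifier that restricts a list or watch call to
// objects matching the given label selector.
func byLabel(selector string) optionsModifier {
	return func(opts *listOptions) error {
		if selector == "" {
			return errors.New("label selector must not be empty")
		}
		opts.LabelSelector = selector
		return nil
	}
}

// applyModifier is a hypothetical helper showing where a ListWatch
// implementation might invoke the modifier before each request.
// base is passed by value, so the caller's copy is left untouched.
func applyModifier(base listOptions, modify optionsModifier) (listOptions, error) {
	if modify == nil {
		return base, nil
	}
	if err := modify(&base); err != nil {
		return listOptions{}, err
	}
	return base, nil
}

func main() {
	opts, err := applyModifier(listOptions{}, byLabel("app=web"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(opts.LabelSelector)
}
```

With the real API, a function of type `func(*metav1.ListOptions) error` would be passed as the last argument of NewModifiedListWatchFromDynamicClient.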
Types ¶
type AllowListObjectFilter ¶
type AllowListObjectFilter struct {
AllowList object.ObjMetadataSet
}
AllowListObjectFilter filters out objects that are not in the allow list. AllowListObjectFilter implements ObjectFilter.
func (*AllowListObjectFilter) Filter ¶
func (f *AllowListObjectFilter) Filter(obj *unstructured.Unstructured) bool
Filter returns true if the object should be skipped, because it is NOT in the AllowList.
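Because Filter returns true for objects to skip, the sense of the boolean is easy to invert by accident. The following is a minimal sketch of the allow-list semantics, using a local `objID` struct as a stand-in for `object.ObjMetadata` and taking an identifier directly rather than an `*unstructured.Unstructured`. All names in it are hypothetical.

```go
package main

import "fmt"

// objID is a hypothetical stand-in for object.ObjMetadata.
type objID struct {
	Group, Kind, Namespace, Name string
}

// allowListFilter is a stand-in for AllowListObjectFilter; it holds
// the set of identifiers that should NOT be skipped.
type allowListFilter struct {
	allow map[objID]struct{}
}

// Filter returns true when the object should be skipped, that is,
// when its identifier is absent from the allow list.
func (f *allowListFilter) Filter(id objID) bool {
	_, ok := f.allow[id]
	return !ok
}

// keep returns the identifiers that pass the filter (Filter == false).
func keep(f *allowListFilter, ids []objID) []objID {
	var kept []objID
	for _, id := range ids {
		if !f.Filter(id) {
			kept = append(kept, id)
		}
	}
	return kept
}

func main() {
	dep := objID{Group: "apps", Kind: "Deployment", Namespace: "default", Name: "dep"}
	other := objID{Group: "apps", Kind: "Deployment", Namespace: "default", Name: "other"}
	f := &allowListFilter{allow: map[objID]struct{}{dep: {}}}
	// dep is allowed (not skipped); other is not in the list (skipped).
	fmt.Println(f.Filter(dep), f.Filter(other))
}
```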
type BlindStatusWatcher ¶
type BlindStatusWatcher struct{}
BlindStatusWatcher sees nothing. BlindStatusWatcher sends no update or error events. BlindStatusWatcher waits patiently to be cancelled. BlindStatusWatcher implements the StatusWatcher interface.
func (BlindStatusWatcher) Watch ¶
func (w BlindStatusWatcher) Watch(ctx context.Context, _ object.ObjMetadataSet, _ Options) <-chan event.Event
Watch nothing. See no changes.
type DefaultStatusWatcher ¶
type DefaultStatusWatcher struct {
	// DynamicClient is used to watch resource objects.
	DynamicClient dynamic.Interface

	// Mapper is used to map from GroupKind to GroupVersionKind.
	Mapper meta.RESTMapper

	// ResyncPeriod is how often the objects are retrieved to re-synchronize,
	// in case any events were missed.
	ResyncPeriod time.Duration

	// StatusReader specifies a custom implementation of the
	// engine.StatusReader interface that will be used to compute reconcile
	// status for resource objects.
	StatusReader engine.StatusReader

	// ClusterReader is used to look up generated objects on-demand.
	// Generated objects (ex: Deployment > ReplicaSet > Pod) are sometimes
	// required for computing parent object status, to compensate for
	// controllers that aren't following status conventions.
	ClusterReader engine.ClusterReader

	// Indexers control how the watch cache is indexed, allowing namespace
	// filtering and field selectors. If you watch at namespace scope, you must
	// provide the namespace indexer. If you specify a field selector filter,
	// you must also provide an indexer for that field.
	Indexers cache.Indexers

	// Filters allows filtering the objects being watched.
	Filters *Filters
}
DefaultStatusWatcher reports on status updates to a set of objects.
Use NewDefaultStatusWatcher to build a DefaultStatusWatcher with default settings.
func NewDefaultStatusWatcher ¶
func NewDefaultStatusWatcher(dynamicClient dynamic.Interface, mapper meta.RESTMapper) *DefaultStatusWatcher
NewDefaultStatusWatcher constructs a DefaultStatusWatcher with defaults chosen for general use. If you need different settings, consider building a DefaultStatusWatcher directly.
func (*DefaultStatusWatcher) Watch ¶
func (w *DefaultStatusWatcher) Watch(ctx context.Context, ids object.ObjMetadataSet, opts Options) <-chan event.Event
Watch the cluster for changes made to the specified objects. Returns an event channel on which these updates (and errors) will be reported. Each update event includes the computed status of the object.
type DynamicInformerFactory ¶
type DynamicInformerFactory struct {
	Client       dynamic.Interface
	ResyncPeriod time.Duration
	Indexers     cache.Indexers
	Filters      *Filters
}
func NewDynamicInformerFactory ¶
func NewDynamicInformerFactory(client dynamic.Interface, resyncPeriod time.Duration) *DynamicInformerFactory
func (*DynamicInformerFactory) NewInformer ¶
func (f *DynamicInformerFactory) NewInformer(ctx context.Context, mapping *meta.RESTMapping, namespace string) cache.SharedIndexInformer
type EventFunnelClosedError ¶
type EventFunnelClosedError struct {
ContextError error
}
func (*EventFunnelClosedError) Error ¶
func (e *EventFunnelClosedError) Error() string
func (*EventFunnelClosedError) Is ¶
func (e *EventFunnelClosedError) Is(err error) bool
func (*EventFunnelClosedError) Unwrap ¶
func (e *EventFunnelClosedError) Unwrap() error
type GroupKindNamespace ¶
GroupKindNamespace identifies an informer target. When used as an informer target, the namespace is optional. When the namespace is empty for namespaced resources, all namespaces are watched.
func (GroupKindNamespace) GroupKind ¶
func (gkn GroupKindNamespace) GroupKind() schema.GroupKind
func (GroupKindNamespace) String ¶
func (gkn GroupKindNamespace) String() string
String returns a serialized form suitable for logging.
type ObjectFilter ¶
type ObjectFilter interface {
	// Filter returns true if the object should be skipped.
	Filter(obj *unstructured.Unstructured) bool
}
ObjectFilter allows for filtering objects.
type ObjectStatusReporter ¶
type ObjectStatusReporter struct {
	// InformerFactory is used to build informers.
	InformerFactory *DynamicInformerFactory

	// Mapper is used to map from GroupKind to GroupVersionKind.
	Mapper meta.RESTMapper

	// StatusReader specifies a custom implementation of the
	// engine.StatusReader interface that will be used to compute reconcile
	// status for resource objects.
	StatusReader engine.StatusReader

	// ClusterReader is used to look up generated objects on-demand.
	// Generated objects (ex: Deployment > ReplicaSet > Pod) are sometimes
	// required for computing parent object status, to compensate for
	// controllers that aren't following status conventions.
	ClusterReader engine.ClusterReader

	// Targets is the list of informer targets (GroupKind plus optional
	// namespace) to watch.
	Targets []GroupKindNamespace

	// ObjectFilter is used to decide which objects to ignore.
	ObjectFilter ObjectFilter

	// RESTScope specifies whether to ListAndWatch resources at the namespace
	// or cluster (root) level. Using root scope is more efficient, but
	// namespace scope may require fewer permissions.
	RESTScope meta.RESTScope

	// contains filtered or unexported fields
}
ObjectStatusReporter reports on updates to objects (instances) using a network of informers to watch one or more resources (types).
Unlike SharedIndexInformer, ObjectStatusReporter...
- Reports object status.
- Can watch multiple resource types simultaneously.
- Can ignore specific objects for efficiency when an ObjectFilter is specified.
- Resolves GroupKinds into Resources at runtime, to pick up newly added resources.
- Starts and stops individual watches automatically to reduce errors when a CRD or Namespace is deleted.
- Can watch resources in root-scope mode or namespace-scope mode, allowing the caller to optimize for efficiency or least-privilege.
- Gives unschedulable Pods (and objects that generate them) a 15s grace period before reporting them as Failed.
- Resets the RESTMapper cache automatically when CRDs are modified.
ObjectStatusReporter is NOT repeatable. It will panic if started more than once. If you need a repeatable factory, use DefaultStatusWatcher.
TODO: support detection of added/removed api extensions at runtime
TODO: Watch CRDs & Namespaces, even if not in the set of IDs.
TODO: Retry with backoff if in namespace-scoped mode, to allow CRDs & namespaces to be created asynchronously
func (*ObjectStatusReporter) HasSynced ¶
func (w *ObjectStatusReporter) HasSynced() bool
HasSynced returns true if all the started informers have been synced.
Use the following to block waiting for synchronization:
synced := cache.WaitForCacheSync(stopCh, informer.HasSynced)
func (*ObjectStatusReporter) Start ¶
func (w *ObjectStatusReporter) Start(ctx context.Context) <-chan event.Event
func (*ObjectStatusReporter) Stop ¶ added in v0.31.1
func (w *ObjectStatusReporter) Stop()
Stop triggers the cancellation of the reporter context, and closure of the event channel without sending an error event.
type Options ¶
type Options struct {
	// RESTScopeStrategy specifies which strategy to use when listing and
	// watching resources. By default, the strategy is selected automatically.
	RESTScopeStrategy RESTScopeStrategy
}
Options can be provided when creating a new StatusWatcher to customize the behavior.
type RESTScopeStrategy ¶
type RESTScopeStrategy int
const (
	RESTScopeAutomatic RESTScopeStrategy = iota // automatic
	RESTScopeRoot                               // root
	RESTScopeNamespace                          // namespace
)
func (RESTScopeStrategy) String ¶
func (i RESTScopeStrategy) String() string
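The trailing comments on the constants above suggest the names that String returns, and the zero value corresponds to the automatic strategy. The sketch below shows that pattern with a hand-written String method on a hypothetical `scopeStrategy` type. The exact output of the real method is an assumption here, including the fallback for unknown values.

```go
package main

import "fmt"

// scopeStrategy is a hypothetical stand-in for RESTScopeStrategy.
type scopeStrategy int

const (
	scopeAutomatic scopeStrategy = iota // automatic
	scopeRoot                           // root
	scopeNamespace                      // namespace
)

// String returns the name from the trailing comment of each constant.
// This hand-written version is an assumption about the output format.
func (s scopeStrategy) String() string {
	switch s {
	case scopeAutomatic:
		return "automatic"
	case scopeRoot:
		return "root"
	case scopeNamespace:
		return "namespace"
	default:
		return fmt.Sprintf("scopeStrategy(%d)", int(s))
	}
}

func main() {
	// The zero value is the automatic strategy, so an empty options
	// struct selects it without any explicit configuration.
	var zero scopeStrategy
	fmt.Println(zero, scopeRoot, scopeNamespace)
}
```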
type StatusWatcher ¶
type StatusWatcher interface {
	// Watch a set of objects for status updates.
	// Watching should stop if the context is cancelled.
	// Events should only be sent for the specified objects.
	// The event channel should be closed when the watching stops.
	Watch(context.Context, object.ObjMetadataSet, Options) <-chan event.Event
}
StatusWatcher watches a set of objects for status updates.