resource

package
v1.17.0-pre.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 1, 2024 License: Apache-2.0 Imports: 30 Imported by: 5

Documentation

Index

Constants

This section is empty.

Variables

View Source
var AddToScheme = localSchemeBuilder.AddToScheme

Functions

func EventStreamFromFiles

func EventStreamFromFiles[T runtime.Object](paths []string) func() stream.Observable[Event[T]]

EventStreamFromFiles returns an observable stream of events created from decoding the given slice of files and emitted as Upserts.

Types

type ErrorAction

type ErrorAction string
var (
	// ErrorActionRetry instructs to retry the processing. The key is requeued after
	// rate limiting.
	ErrorActionRetry ErrorAction = "retry"

	// ErrorActionIgnore instructs to ignore the error.
	ErrorActionIgnore ErrorAction = "ignore"

	// ErrorActionStop instructs to stop the processing for this subscriber.
	ErrorActionStop ErrorAction = "stop"
)

func AlwaysRetry

func AlwaysRetry(Key, int, error) ErrorAction

AlwaysRetry is an error handler that always retries the error.

type ErrorHandler

type ErrorHandler func(key Key, numRetries int, err error) ErrorAction

ErrorHandler is a function that takes the key of the failing object (zero key if event was sync), the number of times the key has been retried and the error that occurred. The function returns the action that should be taken.

func RetryUpTo

func RetryUpTo(n int) ErrorHandler

RetryUpTo is an error handler that retries a key up to specified number of times before stopping.

type Event

type Event[T k8sRuntime.Object] struct {
	Kind   EventKind
	Key    Key
	Object T

	// Done marks the event as processed.  If err is non-nil, the
	// key of the object is requeued and the processing retried at
	// a later time with a potentially new version of the object.
	//
	// If this method is not called after the references to the event
	// are gone, the finalizer will panic.
	Done func(err error)
}

Event emitted from resource.

type EventKind

type EventKind string
const (
	Sync   EventKind = "sync"
	Upsert EventKind = "upsert"
	Delete EventKind = "delete"
)

type EventsOpt

type EventsOpt func(*eventsOpts)

func WithErrorHandler

func WithErrorHandler(h ErrorHandler) EventsOpt

WithErrorHandler specifies the error handling strategy for failed events. By default the strategy is to always requeue the processing of a failed event.

func WithRateLimiter

func WithRateLimiter(r workqueue.RateLimiter) EventsOpt

WithRateLimiter sets the rate limiting algorithm to be used when requeueing failed events.

type Key

type Key struct {
	// Name is the name of the object
	Name string

	// Namespace is the namespace, or empty if object is not namespaced.
	Namespace string
}

Key of an K8s object, e.g. name and optional namespace.

func NewKey

func NewKey(obj any) Key

func (Key) String

func (k Key) String() string

type KeyIter

type KeyIter interface {
	// Next returns true if there is a key, false if iteration has finished.
	Next() bool
	Key() Key
}

type Resource

type Resource[T k8sRuntime.Object] interface {
	// Resource can be observed either via Observe() or via Events(). The observable
	// is implemented in terms of Events() and same semantics apply.
	stream.Observable[Event[T]]

	// Events returns a channel of events. Each event must be marked as handled
	// with a call to Done() which marks the key processed. No new events for this key
	// will be emitted before Done() is called.
	//
	// A missing Done() will lead to an eventual panic (via finalizer on Event[T]).
	// Panic on this situation is needed as otherwise no new events would be emitted
	// and thus this needs to be enforced.
	//
	// A stream of Upsert events are emitted first to replay the current state of the
	// store after which incremental upserts and deletes follow until the underlying
	// store is synchronized after which a Sync event is emitted and further incremental
	// updates:
	//
	//	(start observing), Upsert, Upsert, Upsert, (done replaying store contents), Upsert, Upsert,
	//	  (store synchronized with API server), Sync, Upsert, Delete, Upsert, ...
	//
	// The emitting of the Sync event does not depend on whether or not Upsert events have
	// all been marked Done() without an error. The sync event solely signals that the underlying
	// store has synchronized and that Upsert events for objects in a synchronized store have been
	// sent to the observer.
	//
	// When Done() is called with non-nil error the error handler is invoked, which
	// can ignore, requeue the event (by key) or close the channel. The default error handler
	// will requeue.
	//
	// If an Upsert is retried and the object has been deleted, a Delete event will be emitted instead.
	// Conversely if a Delete event is retried and the object has been recreated with the same key,
	// an Upsert will be emitted instead.
	//
	// If an objects is created and immediately deleted, then a slow observer may not observe this at
	// all. In all cases a Delete event is only emitted if the observer has seen an Upsert. Whether or
	// not it had been successfully handled (via Done(nil)) does not affect this property.
	Events(ctx context.Context, opts ...EventsOpt) <-chan Event[T]

	// Store retrieves the read-only store for the resource. Blocks until
	// the store has been synchronized or the context cancelled.
	// Returns a non-nil error if context is cancelled or the resource
	// has been stopped before store has synchronized.
	Store(context.Context) (Store[T], error)
}

Resource provides access to a Kubernetes resource through either a stream of events or a read-only store.

Observing of the events can be done from a constructor as subscriber registration is non-blocking.

Store() however should only be called from a start hook, or from a goroutine forked from the start hook as it blocks until the store has been synchronized.

The subscriber can process the events from Events() asynchronously and in parallel, but for each event the Done() function must be called to mark the event as handled. If not done no new events will be emitted for this key. If an event handling is marked as failed the configured error handler is called (WithErrorHandler). The default error handler will requeue the event (by its key) for later retried processing. The requeueing is rate limited and can be configured with WithRateLimiter option to Events().

The resource is lazy, e.g. it will not start the informer until a call has been made to Events() or Store().

func New

New creates a new Resource[T]. Use with hive.Provide:

var exampleCell = hive.Module(
	"example",
 	cell.Provide(
	 	// Provide `Resource[*slim_corev1.Pod]` to the hive:
	 	func(lc cell.Lifecycle, c k8sClient.Clientset) resource.Resource[*slim_corev1.Pod] {
			lw := utils.ListerWatcherFromTyped[*slim_corev1.PodList](
				c.Slim().CoreV1().Pods(""),
			)
	 		return resource.New(lc, lw)
	 	}
	}),
	...
)

func usePods(pods resource.Resource[*slim_corev1.Pod]) {
	go func() {
		for ev := range podEvents {
	   		onPodEvent(ev)
	   	}
	}
	return e
}
func onPodEvent(event resource.Event[*slim_core.Pod]) {
	switch event.Kind {
	case resource.Sync:
		// Pods have now been synced and the set of Upsert events
		// received thus far forms a coherent snapshot.

		// Must always call event.Done(error) to mark the event as processed.
		event.Done(nil)
	case resource.Upsert:
		event.Done(onPodUpsert(event.Object))
	case resource.Delete:
		event.Done(onPodDelete(event.Object))
	}
}

See also pkg/k8s/resource/example/main.go for a runnable example.

type ResourceOption

type ResourceOption func(o *options)

func WithCRDSync added in v1.16.0

func WithCRDSync(crdSyncPromise promise.Promise[synced.CRDSync]) ResourceOption

func WithIndexers

func WithIndexers(indexers cache.Indexers) ResourceOption

WithIndexers sets additional custom indexers on the resource store.

func WithLazyTransform

func WithLazyTransform(sourceObj func() k8sRuntime.Object, transform cache.TransformFunc) ResourceOption

WithLazyTransform sets the function to transform the object before storing it. Unlike "WithTransform", this defers the resolving of the source object type until the resource is needed. Use this in situations where the source object depends on api-server capabilities.

func WithMetric

func WithMetric(scope string) ResourceOption

WithMetric enables metrics collection for the resource using the provided scope.

func WithName added in v1.15.0

func WithName(name string) ResourceOption

WithName sets the name of the resource. Used for workqueue metrics.

func WithStoppableInformer added in v1.15.0

func WithStoppableInformer() ResourceOption

WithStoppableInformer marks the resource as releasable. A releasable resource stops the underlying informer if the last active subscriber cancels its subscription. In this case the resource is stopped and prepared again for a subsequent call to either Events() or Store(). A subscriber is a consumer who has taken a reference to the store with Store() or that is listening to the events stream channel with Events(). This option is meant to be used for very specific cases of resources with a high rate of updates that can potentially hinder scalability in very large clusters, like CiliumNode and CiliumEndpoint. For this cases, stopping the informer is required when switching to other data sources that scale better.

func WithTransform

func WithTransform[From, To k8sRuntime.Object](transform func(From) (To, error)) ResourceOption

WithTransform sets the function to transform the object before storing it.

type Store

type Store[T k8sRuntime.Object] interface {
	// List returns all items currently in the store.
	List() []T

	// IterKeys returns a key iterator.
	IterKeys() KeyIter

	// Get returns the latest version by deriving the key from the given object.
	Get(obj T) (item T, exists bool, err error)

	// GetByKey returns the latest version of the object with given key.
	GetByKey(key Key) (item T, exists bool, err error)

	// IndexKeys returns the keys of the stored objects whose set of indexed values
	// for the index includes the given indexed value.
	IndexKeys(indexName, indexedValue string) ([]string, error)

	// ByIndex returns the stored objects whose set of indexed values for the index
	// includes the given indexed value.
	ByIndex(indexName, indexedValue string) ([]T, error)

	// CacheStore returns the underlying cache.Store instance. Use for temporary
	// compatibility purposes only!
	CacheStore() cache.Store

	// Release the store and allows the associated resource to stop its informer if
	// this is the last reference to it.
	// This is a no-op if the resource is not releasable.
	Release()
}

Store is a read-only typed wrapper for cache.Store.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL