Documentation ¶
Index ¶
- Variables
- type ErrorAction
- type ErrorHandler
- type Event
- type EventKind
- type EventsOpt
- type Key
- type KeyIter
- type Resource
- type ResourceOption
- func WithCRDSync(crdSyncPromise promise.Promise[synced.CRDSync]) ResourceOption
- func WithIndexers(indexers cache.Indexers) ResourceOption
- func WithLazyTransform(sourceObj func() k8sRuntime.Object, transform cache.TransformFunc) ResourceOption
- func WithMetric(scope string) ResourceOption
- func WithName(name string) ResourceOption
- func WithStoppableInformer() ResourceOption
- func WithTransform[From, To k8sRuntime.Object](transform func(From) (To, error)) ResourceOption
- type Store
Constants ¶
This section is empty.
Variables ¶
var AddToScheme = localSchemeBuilder.AddToScheme
Functions ¶
This section is empty.
Types ¶
type ErrorAction ¶
type ErrorAction string
var (
	// ErrorActionRetry instructs to retry the processing. The key is requeued after
	// rate limiting.
	ErrorActionRetry ErrorAction = "retry"

	// ErrorActionIgnore instructs to ignore the error.
	ErrorActionIgnore ErrorAction = "ignore"

	// ErrorActionStop instructs to stop the processing for this subscriber.
	ErrorActionStop ErrorAction = "stop"
)
func AlwaysRetry ¶
func AlwaysRetry(Key, int, error) ErrorAction
AlwaysRetry is an error handler that always retries the error.
type ErrorHandler ¶
type ErrorHandler func(key Key, numRetries int, err error) ErrorAction
ErrorHandler is a function that takes the key of the failing object (zero key if event was sync), the number of times the key has been retried and the error that occurred. The function returns the action that should be taken.
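As an illustration of the ErrorHandler contract described above, here is a minimal sketch of a custom handler. To keep it compilable on its own it redeclares Key, ErrorAction and ErrorHandler as local stand-ins that mirror the documented declarations (the action values are written as constants here); errGone, errTransient and ignoreGoneRetryUpTo are hypothetical names invented for this example and are not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented declarations, so that this
// sketch compiles without importing the resource package.
type Key struct {
	Name      string
	Namespace string
}

type ErrorAction string

const (
	ErrorActionRetry  ErrorAction = "retry"
	ErrorActionIgnore ErrorAction = "ignore"
	ErrorActionStop   ErrorAction = "stop"
)

type ErrorHandler func(key Key, numRetries int, err error) ErrorAction

// Hypothetical errors used only for this illustration.
var (
	errGone      = errors.New("object no longer relevant")
	errTransient = errors.New("transient failure")
)

// ignoreGoneRetryUpTo (hypothetical) returns a handler that ignores errGone,
// retries any other error while numRetries is below maxRetries, and stops
// processing for the subscriber once the limit is reached.
func ignoreGoneRetryUpTo(maxRetries int) ErrorHandler {
	return func(key Key, numRetries int, err error) ErrorAction {
		switch {
		case errors.Is(err, errGone):
			return ErrorActionIgnore
		case numRetries < maxRetries:
			return ErrorActionRetry
		default:
			return ErrorActionStop
		}
	}
}

func main() {
	h := ignoreGoneRetryUpTo(3)
	key := Key{Namespace: "default", Name: "pod-a"}

	fmt.Println(h(key, 0, errTransient))
	fmt.Println(h(key, 3, errTransient))
	fmt.Println(h(key, 0, fmt.Errorf("wrapped: %w", errGone)))
}
```

With the real package, a handler of this shape would presumably be passed to Events() through WithErrorHandler.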
func RetryUpTo ¶
func RetryUpTo(n int) ErrorHandler
RetryUpTo is an error handler that retries a key up to the specified number of times before stopping.
type Event ¶
type Event[T k8sRuntime.Object] struct {
	Kind   EventKind
	Key    Key
	Object T

	// Done marks the event as processed. If err is non-nil, the
	// key of the object is requeued and the processing retried at
	// a later time with a potentially new version of the object.
	//
	// If this method is not called after the references to the event
	// are gone, the finalizer will panic.
	Done func(err error)
}
Event emitted from resource.
type EventsOpt ¶
type EventsOpt func(*eventsOpts)
func WithErrorHandler ¶
func WithErrorHandler(h ErrorHandler) EventsOpt
WithErrorHandler specifies the error handling strategy for failed events. By default the strategy is to always requeue the processing of a failed event.
func WithRateLimiter ¶
func WithRateLimiter(r workqueue.RateLimiter) EventsOpt
WithRateLimiter sets the rate limiting algorithm to be used when requeueing failed events.
type Key ¶
type Key struct {
	// Name is the name of the object
	Name string

	// Namespace is the namespace, or empty if object is not namespaced.
	Namespace string
}
Key of a K8s object, i.e. name and optional namespace.
type Resource ¶
type Resource[T k8sRuntime.Object] interface {
	// Resource can be observed either via Observe() or via Events(). The observable
	// is implemented in terms of Events() and same semantics apply.
	stream.Observable[Event[T]]

	// Events returns a channel of events. Each event must be marked as handled
	// with a call to Done() which marks the key processed. No new events for this key
	// will be emitted before Done() is called.
	//
	// A missing Done() will lead to an eventual panic (via finalizer on Event[T]).
	// Panic on this situation is needed as otherwise no new events would be emitted
	// and thus this needs to be enforced.
	//
	// A stream of Upsert events are emitted first to replay the current state of the
	// store after which incremental upserts and deletes follow until the underlying
	// store is synchronized after which a Sync event is emitted and further incremental
	// updates:
	//
	//	(start observing), Upsert, Upsert, Upsert, (done replaying store contents), Upsert, Upsert,
	//	  (store synchronized with API server), Sync, Upsert, Delete, Upsert, ...
	//
	// The emitting of the Sync event does not depend on whether or not Upsert events have
	// all been marked Done() without an error. The sync event solely signals that the underlying
	// store has synchronized and that Upsert events for objects in a synchronized store have been
	// sent to the observer.
	//
	// When Done() is called with non-nil error the error handler is invoked, which
	// can ignore, requeue the event (by key) or close the channel. The default error handler
	// will requeue.
	//
	// If an Upsert is retried and the object has been deleted, a Delete event will be emitted instead.
	// Conversely if a Delete event is retried and the object has been recreated with the same key,
	// an Upsert will be emitted instead.
	//
	// If an object is created and immediately deleted, then a slow observer may not observe this at
	// all. In all cases a Delete event is only emitted if the observer has seen an Upsert. Whether or
	// not it had been successfully handled (via Done(nil)) does not affect this property.
	Events(ctx context.Context, opts ...EventsOpt) <-chan Event[T]

	// Store retrieves the read-only store for the resource. Blocks until
	// the store has been synchronized or the context cancelled.
	// Returns a non-nil error if context is cancelled or the resource
	// has been stopped before store has synchronized.
	Store(context.Context) (Store[T], error)
}
Resource provides access to a Kubernetes resource through either a stream of events or a read-only store.
Observing of the events can be done from a constructor as subscriber registration is non-blocking.
Store() however should only be called from a start hook, or from a goroutine forked from the start hook as it blocks until the store has been synchronized.
The subscriber can process the events from Events() asynchronously and in parallel, but Done() must be called for each event to mark it as handled. Until Done() is called, no new events are emitted for that key. If the handling of an event is marked as failed, the configured error handler is called (see WithErrorHandler). The default error handler requeues the event (by its key) so that processing is retried later. Requeueing is rate limited, and the rate limiter can be configured with the WithRateLimiter option to Events().
The resource is lazy, i.e. it will not start the informer until a call has been made to Events() or Store().
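The event semantics described above (replay of Upserts, a Sync marker, then incremental updates, with Done() called for every event) can be sketched without the real package. The example below uses local stand-ins for Event, EventKind and Key, and feeds a fixed sequence through a buffered channel. The tracker type, the demo function and the string payloads are assumptions made for illustration; a real subscriber would range over the channel returned by Events().

```go
package main

import "fmt"

// Local stand-ins mirroring the documented types. The concrete values of
// the event kinds are assumptions for this sketch.
type EventKind string

const (
	Sync   EventKind = "sync"
	Upsert EventKind = "upsert"
	Delete EventKind = "delete"
)

type Key struct {
	Name      string
	Namespace string
}

type Event[T any] struct {
	Kind   EventKind
	Key    Key
	Object T
	Done   func(err error)
}

// tracker (hypothetical) keeps a local view of the objects seen so far and
// remembers whether the Sync event has been observed.
type tracker struct {
	objects map[Key]string
	synced  bool
}

// handle applies one event to the local view and always calls Done, since
// every event, including Sync, has to be marked as handled.
func (t *tracker) handle(ev Event[string]) {
	switch ev.Kind {
	case Upsert:
		t.objects[ev.Key] = ev.Object
	case Delete:
		delete(t.objects, ev.Key)
	case Sync:
		// The Upserts received so far form a coherent snapshot.
		t.synced = true
	}
	ev.Done(nil)
}

// demo replays a fixed event sequence and reports the resulting view and
// the number of Done calls.
func demo() (*tracker, int) {
	doneCalls := 0
	done := func(err error) { doneCalls++ }

	events := make(chan Event[string], 4)
	events <- Event[string]{Kind: Upsert, Key: Key{Name: "a"}, Object: "a-v1", Done: done}
	events <- Event[string]{Kind: Upsert, Key: Key{Name: "b"}, Object: "b-v1", Done: done}
	events <- Event[string]{Kind: Sync, Done: done}
	events <- Event[string]{Kind: Delete, Key: Key{Name: "a"}, Object: "a-v1", Done: done}
	close(events)

	t := &tracker{objects: map[Key]string{}}
	for ev := range events {
		t.handle(ev)
	}
	return t, doneCalls
}

func main() {
	t, doneCalls := demo()
	fmt.Println(len(t.objects), t.synced, doneCalls)
}
```

Calling Done in one place at the end of the handler, rather than in each case, makes it harder to forget for a newly added event kind.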
func New ¶
func New[T k8sRuntime.Object](lc cell.Lifecycle, lw cache.ListerWatcher, opts ...ResourceOption) Resource[T]
New creates a new Resource[T]. Use with cell.Provide:
var exampleCell = hive.Module(
	"example",
	cell.Provide(
		// Provide `Resource[*slim_corev1.Pod]` to the hive:
		func(lc cell.Lifecycle, c k8sClient.Clientset) resource.Resource[*slim_corev1.Pod] {
			lw := utils.ListerWatcherFromTyped[*slim_corev1.PodList](
				c.Slim().CoreV1().Pods(""),
			)
			// The type argument cannot be inferred from the arguments,
			// so it is given explicitly.
			return resource.New[*slim_corev1.Pod](lc, lw)
		},
	),
	...
)

func usePods(ctx context.Context, pods resource.Resource[*slim_corev1.Pod]) {
	// Subscribing is non-blocking; the events are processed in a goroutine.
	go func() {
		for ev := range pods.Events(ctx) {
			onPodEvent(ev)
		}
	}()
}

func onPodEvent(event resource.Event[*slim_corev1.Pod]) {
	// Must always call event.Done(error) to mark the event as processed.
	switch event.Kind {
	case resource.Sync:
		// Pods have now been synced and the set of Upsert events
		// received thus far forms a coherent snapshot.
		event.Done(nil)
	case resource.Upsert:
		event.Done(onPodUpsert(event.Object))
	case resource.Delete:
		event.Done(onPodDelete(event.Object))
	}
}
See also pkg/k8s/resource/example/main.go for a runnable example.
type ResourceOption ¶
type ResourceOption func(o *options)
func WithCRDSync ¶ added in v1.16.0
func WithCRDSync(crdSyncPromise promise.Promise[synced.CRDSync]) ResourceOption
func WithIndexers ¶
func WithIndexers(indexers cache.Indexers) ResourceOption
WithIndexers sets additional custom indexers on the resource store.
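To show what a custom indexer looks like, the sketch below defines an index function that indexes pods by node name and a naive lookup that mimics the documented Store.ByIndex semantics. The indexFunc and indexers types are local stand-ins shaped like client-go's cache.IndexFunc and cache.Indexers. The pod type, the "by-node" index name and the byIndex helper are hypothetical and exist only for this illustration; the real store maintains its indexes incrementally rather than scanning.

```go
package main

import "fmt"

// Local stand-ins shaped like client-go's cache.IndexFunc and cache.Indexers.
type indexFunc func(obj any) ([]string, error)
type indexers map[string]indexFunc

// pod is a hypothetical, heavily simplified object type.
type pod struct {
	Name      string
	Namespace string
	NodeName  string
}

// byNodeIndex is a hypothetical index name.
const byNodeIndex = "by-node"

// podByNode returns the indexed values of a pod for the "by-node" index:
// the name of the node it is scheduled on.
func podByNode(obj any) ([]string, error) {
	p, ok := obj.(*pod)
	if !ok {
		return nil, fmt.Errorf("unexpected object type %T", obj)
	}
	return []string{p.NodeName}, nil
}

// byIndex emulates ByIndex with a linear scan: it returns the objects whose
// set of indexed values for the named index includes the given value.
func byIndex(ix indexers, objs []*pod, indexName, value string) ([]*pod, error) {
	fn, ok := ix[indexName]
	if !ok {
		return nil, fmt.Errorf("index %q does not exist", indexName)
	}
	var out []*pod
	for _, o := range objs {
		vals, err := fn(o)
		if err != nil {
			return nil, err
		}
		for _, v := range vals {
			if v == value {
				out = append(out, o)
				break
			}
		}
	}
	return out, nil
}

func main() {
	ix := indexers{byNodeIndex: podByNode}
	pods := []*pod{
		{Name: "a", Namespace: "default", NodeName: "node-1"},
		{Name: "b", Namespace: "default", NodeName: "node-2"},
		{Name: "c", Namespace: "kube-system", NodeName: "node-1"},
	}
	onNode1, err := byIndex(ix, pods, byNodeIndex, "node-1")
	if err != nil {
		panic(err)
	}
	for _, p := range onNode1 {
		fmt.Println(p.Namespace + "/" + p.Name)
	}
}
```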
func WithLazyTransform ¶
func WithLazyTransform(sourceObj func() k8sRuntime.Object, transform cache.TransformFunc) ResourceOption
WithLazyTransform sets the function to transform the object before storing it. Unlike "WithTransform", this defers the resolving of the source object type until the resource is needed. Use this in situations where the source object depends on api-server capabilities.
func WithMetric ¶
func WithMetric(scope string) ResourceOption
WithMetric enables metrics collection for the resource using the provided scope.
func WithName ¶ added in v1.15.0
func WithName(name string) ResourceOption
WithName sets the name of the resource. Used for workqueue metrics.
func WithStoppableInformer ¶ added in v1.15.0
func WithStoppableInformer() ResourceOption
WithStoppableInformer marks the resource as releasable. A releasable resource stops the underlying informer if the last active subscriber cancels its subscription. In this case the resource is stopped and prepared again for a subsequent call to either Events() or Store(). A subscriber is a consumer who has taken a reference to the store with Store() or that is listening to the events stream channel with Events(). This option is meant to be used for very specific cases of resources with a high rate of updates that can potentially hinder scalability in very large clusters, like CiliumNode and CiliumEndpoint. For these cases, stopping the informer is required when switching to other data sources that scale better.
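The "stop when the last subscriber goes away, start again on the next use" behaviour described above is essentially reference counting. The sketch below models only that idea. The releasable type and its acquire and release methods are hypothetical and are not how the package is implemented; the running flag stands in for the informer.

```go
package main

import (
	"fmt"
	"sync"
)

// releasable (hypothetical) models a resource whose background worker runs
// only while at least one subscriber holds a reference.
type releasable struct {
	mu      sync.Mutex
	refs    int
	running bool
	starts  int // how many times the worker was started
	stops   int // how many times the worker was stopped
}

// acquire registers a subscriber and starts the worker if it is the first.
func (r *releasable) acquire() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.refs == 0 {
		r.running = true
		r.starts++
	}
	r.refs++
}

// release unregisters a subscriber and stops the worker if it was the last.
// Releasing without a matching acquire is a no-op.
func (r *releasable) release() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.refs == 0 {
		return
	}
	r.refs--
	if r.refs == 0 {
		r.running = false
		r.stops++
	}
}

func main() {
	r := &releasable{}
	r.acquire() // first subscriber starts the worker
	r.acquire()
	r.release()
	fmt.Println("running with one subscriber left:", r.running)
	r.release() // last subscriber stops the worker
	fmt.Println("running with no subscribers:", r.running)
	r.acquire() // a later subscriber starts it again
	fmt.Println("running after re-subscribing:", r.running)
}
```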
func WithTransform ¶
func WithTransform[From, To k8sRuntime.Object](transform func(From) (To, error)) ResourceOption
WithTransform sets the function to transform the object before storing it.
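As a sketch of what a typed transform of the shape func(From) (To, error) might look like, the example below trims an object down to the fields a consumer needs, and shows one possible way such a typed function could be adapted to an untyped func(any) (any, error), the shape of client-go's cache.TransformFunc. The fullPod and slimPod types and the slimDown and adapt functions are hypothetical; whether the package adapts transforms this way is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// transformFunc is a local stand-in shaped like client-go's cache.TransformFunc.
type transformFunc func(any) (any, error)

// fullPod and slimPod are hypothetical object types for this illustration.
type fullPod struct {
	Name          string
	Namespace     string
	Annotations   map[string]string
	ManagedFields []string
}

type slimPod struct {
	Name      string
	Namespace string
}

// slimDown is a typed transform that keeps only the fields the consumer
// needs, which reduces what ends up being stored.
func slimDown(p *fullPod) (*slimPod, error) {
	if p == nil {
		return nil, errors.New("nil pod")
	}
	return &slimPod{Name: p.Name, Namespace: p.Namespace}, nil
}

// adapt wraps a typed transform into an untyped one, rejecting objects that
// are not of the expected source type.
func adapt[From, To any](transform func(From) (To, error)) transformFunc {
	return func(obj any) (any, error) {
		from, ok := obj.(From)
		if !ok {
			return nil, fmt.Errorf("unexpected object type %T", obj)
		}
		return transform(from)
	}
}

func main() {
	tf := adapt(slimDown)

	out, err := tf(&fullPod{
		Name:          "a",
		Namespace:     "default",
		ManagedFields: []string{"kubectl"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", *out.(*slimPod))

	_, err = tf("not a pod")
	fmt.Println(err)
}
```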
type Store ¶
type Store[T k8sRuntime.Object] interface {
	// List returns all items currently in the store.
	List() []T

	// IterKeys returns a key iterator.
	IterKeys() KeyIter

	// Get returns the latest version by deriving the key from the given object.
	Get(obj T) (item T, exists bool, err error)

	// GetByKey returns the latest version of the object with given key.
	GetByKey(key Key) (item T, exists bool, err error)

	// IndexKeys returns the keys of the stored objects whose set of indexed values
	// for the index includes the given indexed value.
	IndexKeys(indexName, indexedValue string) ([]string, error)

	// ByIndex returns the stored objects whose set of indexed values for the index
	// includes the given indexed value.
	ByIndex(indexName, indexedValue string) ([]T, error)

	// CacheStore returns the underlying cache.Store instance. Use for temporary
	// compatibility purposes only!
	CacheStore() cache.Store

	// Release releases the store and allows the associated resource to stop its
	// informer if this is the last reference to it.
	// This is a no-op if the resource is not releasable.
	Release()
}
Store is a read-only typed wrapper for cache.Store.