Documentation ¶
Types ¶
type DeleteEvent ¶ added in v1.13.0
type DeleteEvent[T k8sRuntime.Object] struct {
	Key    Key
	Object T
	// contains filtered or unexported fields
}
DeleteEvent is emitted when an object has been deleted.
type ErrorAction ¶
type ErrorAction string
var (
	// ErrorActionRetry instructs to retry the processing. The key is requeued after
	// rate limiting.
	ErrorActionRetry ErrorAction = "retry"

	// ErrorActionIgnore instructs to ignore the error.
	ErrorActionIgnore ErrorAction = "ignore"

	// ErrorActionStop instructs to stop the processing for this subscriber.
	// The stream is completed with the error leading to this action.
	ErrorActionStop ErrorAction = "stop"
)
func AlwaysRetry ¶
func AlwaysRetry(Key, int, error) ErrorAction
AlwaysRetry is an error handler that always requests a retry, whatever the error.
type ErrorHandler ¶
type ErrorHandler func(key Key, numRetries int, err error) ErrorAction
func RetryUpTo ¶
func RetryUpTo(n int) ErrorHandler
RetryUpTo returns an error handler that retries a key up to the specified number of times before stopping.
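As a rough illustration of how an ErrorHandler can choose between the three actions, the sketch below re-declares Key, ErrorAction and ErrorHandler locally so that it compiles on its own. The helpers retryUpTo and ignoreNotFound and the errors errNotFound and errBoom are hypothetical and are not the package's implementation. It also assumes that numRetries counts the retries already attempted for the key, which this page does not spell out.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the declarations documented on this page.
type Key struct {
	Name      string
	Namespace string
}

type ErrorAction string

const (
	ErrorActionRetry  ErrorAction = "retry"
	ErrorActionIgnore ErrorAction = "ignore"
	ErrorActionStop   ErrorAction = "stop"
)

type ErrorHandler func(key Key, numRetries int, err error) ErrorAction

// Hypothetical errors used only for this illustration.
var (
	errNotFound = errors.New("not found")
	errBoom     = errors.New("boom")
)

// retryUpTo is a hypothetical handler in the spirit of RetryUpTo: it asks
// for a retry until numRetries reaches n, then stops the subscriber.
// Assumption: numRetries is the number of retries already made for the key.
func retryUpTo(n int) ErrorHandler {
	return func(_ Key, numRetries int, _ error) ErrorAction {
		if numRetries >= n {
			return ErrorActionStop
		}
		return ErrorActionRetry
	}
}

// ignoreNotFound wraps another handler and ignores errNotFound, delegating
// every other error to the wrapped handler.
func ignoreNotFound(next ErrorHandler) ErrorHandler {
	return func(key Key, numRetries int, err error) ErrorAction {
		if errors.Is(err, errNotFound) {
			return ErrorActionIgnore
		}
		return next(key, numRetries, err)
	}
}

func main() {
	h := ignoreNotFound(retryUpTo(3))
	key := Key{Name: "pod-a", Namespace: "default"}
	fmt.Println(h(key, 0, errBoom))
	fmt.Println(h(key, 3, errBoom))
	fmt.Println(h(key, 0, errNotFound))
}
```

With the real package, a handler of this shape would be passed to WithErrorHandler when constructing the resource.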
type Event ¶
type Event[T k8sRuntime.Object] interface {
	// Handle the event by invoking the right handler. The handlers can be
	// nil in which case the event is just marked as processed successfully.
	//
	// On error from the event handlers the event is requeued by key for later retrying.
	// The retry behaviour can be configured with the WithErrorHandler option.
	//
	// If you use Handle(), then Done() should not be called.
	// If you need to process the events in parallel/asynchronously,
	// then do a type-switch on Event[T] and call Done() after the
	// event has been processed.
	Handle(
		onSync func() error,
		onUpdate func(Key, T) error,
		onDelete func(Key, T) error,
	)

	// Done marks the event as processed. If err is non-nil, the
	// key of the object is requeued and the processing retried at
	// a later time with a potentially new version of the object.
	//
	// If you choose not to use Handle(), then this must always be called after the
	// event has been processed.
	Done(err error)
}
Event is emitted from a resource. It is one of SyncEvent, UpdateEvent or DeleteEvent.
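The two processing styles described in the interface comments can be sketched with simplified local stand-ins. Everything below is a model for illustration only: the type parameter uses `any` instead of k8sRuntime.Object, the `base` helper is hypothetical, and delivering events as pointers is an assumption rather than something this page states.

```go
package main

import "fmt"

// Key mirrors the Key type documented on this page.
type Key struct{ Name, Namespace string }

// Event is a simplified stand-in for Event[T].
type Event[T any] interface {
	Handle(onSync func() error, onUpdate func(Key, T) error, onDelete func(Key, T) error)
	Done(err error)
}

// base is a hypothetical helper that forwards Done to a callback.
type base struct{ done func(error) }

func (b base) Done(err error) { b.done(err) }

type SyncEvent[T any] struct{ base }

type UpdateEvent[T any] struct {
	base
	Key    Key
	Object T
}

type DeleteEvent[T any] struct {
	base
	Key    Key
	Object T
}

// Each Handle invokes the matching handler (if non-nil) and then calls Done.
func (e *SyncEvent[T]) Handle(onSync func() error, _, _ func(Key, T) error) {
	var err error
	if onSync != nil {
		err = onSync()
	}
	e.Done(err)
}

func (e *UpdateEvent[T]) Handle(_ func() error, onUpdate, _ func(Key, T) error) {
	var err error
	if onUpdate != nil {
		err = onUpdate(e.Key, e.Object)
	}
	e.Done(err)
}

func (e *DeleteEvent[T]) Handle(_ func() error, _, onDelete func(Key, T) error) {
	var err error
	if onDelete != nil {
		err = onDelete(e.Key, e.Object)
	}
	e.Done(err)
}

// handleWithCallbacks uses Handle, so it must not call Done itself.
func handleWithCallbacks(ev Event[string], state map[Key]string) {
	ev.Handle(
		nil, // a nil handler just marks the event as processed
		func(k Key, obj string) error { state[k] = obj; return nil },
		func(k Key, _ string) error { delete(state, k); return nil },
	)
}

// handleWithSwitch type-switches on the event and calls Done on every path.
func handleWithSwitch(ev Event[string], state map[Key]string) {
	switch ev := ev.(type) {
	case *SyncEvent[string]:
		ev.Done(nil)
	case *UpdateEvent[string]:
		state[ev.Key] = ev.Object
		ev.Done(nil)
	case *DeleteEvent[string]:
		delete(state, ev.Key)
		ev.Done(nil)
	}
}

// run feeds four events through both styles and reports the final state
// and how many times Done was called.
func run() (map[Key]string, int) {
	state := map[Key]string{}
	doneCalls := 0
	b := base{done: func(error) { doneCalls++ }}
	k := Key{Name: "pod-a", Namespace: "default"}

	handleWithCallbacks(&UpdateEvent[string]{base: b, Key: k, Object: "v1"}, state)
	handleWithSwitch(&UpdateEvent[string]{base: b, Key: k, Object: "v2"}, state)
	handleWithSwitch(&SyncEvent[string]{base: b}, state)
	handleWithCallbacks(&DeleteEvent[string]{base: b, Key: k}, state)
	return state, doneCalls
}

func main() {
	state, doneCalls := run()
	fmt.Println(len(state), doneCalls)
}
```

In both styles Done ends up being called exactly once per event, which is the invariant the interface comments ask subscribers to preserve.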
type Key ¶
type Key struct {
	// Name is the name of the object
	Name string

	// Namespace is the namespace, or empty if object is not namespaced.
	Namespace string
}
Key of a K8s object, i.e. its name and optional namespace.
type Option ¶ added in v1.13.0
type Option func(*options)
func WithErrorHandler ¶
func WithErrorHandler(h ErrorHandler) Option
WithErrorHandler sets the function that decides how to handle an error from event processing.
func WithRateLimiter ¶
func WithRateLimiter(newLimiter func() workqueue.RateLimiter) Option
WithRateLimiter sets the rate limiter to use with the resource.
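Option follows the functional-options pattern: each option is a function that mutates an unexported options struct. The sketch below shows the mechanics with local stand-ins only. The fields of `options`, the `rateLimiter` interface (a cut-down substitute for workqueue.RateLimiter), the defaults, and the helpers withRateLimiter, withRetryLimit and applyOptions are all assumptions made for illustration, not the package's actual code.

```go
package main

import (
	"fmt"
	"time"
)

// rateLimiter is a hypothetical, cut-down stand-in for workqueue.RateLimiter.
type rateLimiter interface {
	When(item any) time.Duration
}

// fixedDelay is a trivial limiter that always returns the same delay.
type fixedDelay time.Duration

func (d fixedDelay) When(any) time.Duration { return time.Duration(d) }

// options holds the settings that Option functions mutate. The fields are
// assumptions; the real struct is unexported and not shown on this page.
type options struct {
	newLimiter func() rateLimiter
	retryLimit int
}

// Option has the same shape as the documented Option type.
type Option func(*options)

// withRateLimiter mirrors the shape of WithRateLimiter: it stores a factory
// rather than a limiter instance.
func withRateLimiter(newLimiter func() rateLimiter) Option {
	return func(o *options) { o.newLimiter = newLimiter }
}

// withRetryLimit is a hypothetical second option.
func withRetryLimit(n int) Option {
	return func(o *options) { o.retryLimit = n }
}

func defaultLimiter() rateLimiter { return fixedDelay(time.Second) }

func slowLimiter() rateLimiter { return fixedDelay(2 * time.Second) }

// applyOptions starts from hypothetical defaults and applies each option in order.
func applyOptions(opts ...Option) options {
	o := options{newLimiter: defaultLimiter, retryLimit: 5}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOptions(withRateLimiter(slowLimiter))
	fmt.Println(o.newLimiter().When("default/pod-a"), o.retryLimit)
}
```

Options that are not supplied keep their defaults, so callers only name the settings they want to change.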
type Resource ¶
type Resource[T k8sRuntime.Object] interface {
	stream.Observable[Event[T]]

	// Store retrieves the read-only store for the resource. Blocks until
	// the store has been synchronized or the context cancelled.
	// Returns a non-nil error if context is cancelled or the resource
	// has been stopped before store has synchronized.
	Store(context.Context) (Store[T], error)
}
Resource provides access to a Kubernetes resource through either a stream of events or a read-only store.
Events can be observed from a constructor, as subscriber registration is non-blocking.
Store(), however, should only be called from a start hook, or from a goroutine forked from the start hook, as it blocks until the store has been synchronized.
The subscriber can process the event synchronously with the Event[T].Handle function, or asynchronously by calling Event[T].Done once the event has been processed. On errors the object's key is requeued for later processing. Once the maximum number of retries is reached, the subscriber's event stream will be completed with the error from the last retry attempt.
The resource is lazy, i.e. it will not start the informer until a call has been made to Observe() or Store().
func New ¶
func New[T k8sRuntime.Object](lc hive.Lifecycle, lw cache.ListerWatcher, opts ...Option) Resource[T]
New creates a new Resource[T]. Use with hive.Provide:
var exampleCell = hive.Module(
	"example",
	cell.Provide(
		// Provide `Resource[*slim_corev1.Pod]` to the hive:
		func(lc hive.Lifecycle, c k8sClient.Clientset) resource.Resource[*slim_corev1.Pod] {
			lw := utils.ListerWatcherFromTyped[*slim_corev1.PodList](
				c.Slim().CoreV1().Pods(""),
			)
			// T cannot be inferred from the arguments, so it is given explicitly.
			return resource.New[*slim_corev1.Pod](lc, lw)
		},
		// Use the resource:
		newExample,
	),
	...
)

func newExample(pods resource.Resource[*slim_corev1.Pod]) *Example {
	e := &Example{...}
	pods.Observe(e.ctx, e.onPodEvent, e.onPodsComplete)
	return e
}

// onPodEvent dispatches the event to the typed handlers. Nil handlers
// just mark the event as processed.
func (e *Example) onPodEvent(ev resource.Event[*slim_corev1.Pod]) {
	ev.Handle(nil, e.onPodUpdated, nil)
}

func (e *Example) onPodUpdated(key resource.Key, pod *slim_corev1.Pod) error {
	// Process event ...
}

func (e *Example) onPodsComplete(err error) {
	// Handle error ...
}
See also pkg/k8s/resource/example/main.go for a runnable example.
type Store ¶
type Store[T k8sRuntime.Object] interface {
	// List returns all items currently in the store.
	List() []T

	// IterKeys returns a key iterator.
	IterKeys() KeyIter

	// Get returns the latest version by deriving the key from the given object.
	Get(obj T) (item T, exists bool, err error)

	// GetByKey returns the latest version of the object with given key.
	GetByKey(key Key) (item T, exists bool, err error)

	// CacheStore returns the underlying cache.Store instance. Use for temporary
	// compatibility purposes only!
	CacheStore() cache.Store
}
Store is a read-only typed wrapper for cache.Store.
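Lookups return three values, so callers need to tell "lookup failed" apart from "object not present". The sketch below shows one way to handle that, using a map-backed stand-in that implements only List and GetByKey. The `pod` struct, `mapStore` and `phaseOf` are hypothetical names introduced for this example, and `any` replaces the k8sRuntime.Object constraint to keep it self-contained.

```go
package main

import "fmt"

// Key mirrors the Key type documented on this page.
type Key struct{ Name, Namespace string }

// pod is a hypothetical stand-in for a Kubernetes object.
type pod struct {
	Name, Namespace, Phase string
}

// Store is a cut-down stand-in for Store[T] with two of its methods.
type Store[T any] interface {
	List() []T
	GetByKey(key Key) (item T, exists bool, err error)
}

// mapStore is a hypothetical in-memory implementation used for illustration.
type mapStore[T any] struct{ items map[Key]T }

// List returns all items; the order is unspecified because it ranges over a map.
func (s mapStore[T]) List() []T {
	out := make([]T, 0, len(s.items))
	for _, item := range s.items {
		out = append(out, item)
	}
	return out
}

// GetByKey reports whether the key is present; this stand-in never fails.
func (s mapStore[T]) GetByKey(key Key) (T, bool, error) {
	item, ok := s.items[key]
	return item, ok, nil
}

// phaseOf looks up a pod and distinguishes a failed lookup from a missing object.
func phaseOf(s Store[pod], key Key) (string, error) {
	p, exists, err := s.GetByKey(key)
	switch {
	case err != nil:
		return "", fmt.Errorf("lookup %s/%s: %w", key.Namespace, key.Name, err)
	case !exists:
		return "", fmt.Errorf("pod %s/%s not in store", key.Namespace, key.Name)
	}
	return p.Phase, nil
}

// newExampleStore builds a store holding a single pod.
func newExampleStore() Store[pod] {
	k := Key{Name: "pod-a", Namespace: "default"}
	return mapStore[pod]{items: map[Key]pod{
		k: {Name: "pod-a", Namespace: "default", Phase: "Running"},
	}}
}

func main() {
	s := newExampleStore()
	fmt.Println(phaseOf(s, Key{Name: "pod-a", Namespace: "default"}))
	fmt.Println(phaseOf(s, Key{Name: "missing", Namespace: "default"}))
	fmt.Println(len(s.List()))
}
```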
type SyncEvent ¶ added in v1.13.0
type SyncEvent[T k8sRuntime.Object] struct {
	// contains filtered or unexported fields
}
SyncEvent is emitted after a set of initial objects has been emitted as UpdateEvents. At this point the subscriber will have a consistent snapshot of the state of this resource and can perform e.g. garbage collection operations.
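The garbage collection mentioned above could look roughly like the following sketch: record every key seen in the initial updates, then on sync remove any previously installed state whose key was never seen. The `reconciler` type, its fields and the idea of state "installed" by an earlier run are hypothetical. The three methods have the same signatures as the handlers accepted by Event[T].Handle, with `string` standing in for the object type.

```go
package main

import "fmt"

// Key mirrors the Key type documented on this page.
type Key struct{ Name, Namespace string }

// reconciler is a hypothetical subscriber that keeps derived state per object.
type reconciler struct {
	installed map[Key]string // derived state, possibly left over from a previous run
	seen      map[Key]bool   // keys observed through update events
}

// onUpdate records the object and marks its key as seen.
func (r *reconciler) onUpdate(k Key, obj string) error {
	r.installed[k] = obj
	r.seen[k] = true
	return nil
}

// onDelete removes the state belonging to a deleted object.
func (r *reconciler) onDelete(k Key, _ string) error {
	delete(r.installed, k)
	delete(r.seen, k)
	return nil
}

// onSync runs once the initial snapshot has been delivered. Any installed
// entry whose key was never seen is stale and is garbage collected.
func (r *reconciler) onSync() error {
	for k := range r.installed {
		if !r.seen[k] {
			delete(r.installed, k)
		}
	}
	return nil
}

// run simulates a restart: one stale entry exists before the initial
// snapshot (a single update followed by sync) is replayed.
func run() map[Key]string {
	live := Key{Name: "pod-a", Namespace: "default"}
	stale := Key{Name: "pod-gone", Namespace: "default"}
	r := &reconciler{
		installed: map[Key]string{live: "old", stale: "old"},
		seen:      map[Key]bool{},
	}
	_ = r.onUpdate(live, "new")
	_ = r.onSync()
	return r.installed
}

func main() {
	fmt.Println(len(run()))
}
```

Running the cleanup only in the sync handler matters: before that point the subscriber has seen just part of the snapshot and could delete state for objects that simply have not been delivered yet.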
type UpdateEvent ¶ added in v1.13.0
type UpdateEvent[T k8sRuntime.Object] struct {
	Key    Key
	Object T
	// contains filtered or unexported fields
}
UpdateEvent is emitted when an object has been added or updated.