Documentation ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ErrorAction ¶
type ErrorAction string
var (
	// ErrorActionRetry instructs to retry the processing. The key is requeued after
	// rate limiting.
	ErrorActionRetry ErrorAction = "retry"

	// ErrorActionIgnore instructs to ignore the error.
	ErrorActionIgnore ErrorAction = "ignore"

	// ErrorActionStop instructs to stop the processing for this subscriber.
	ErrorActionStop ErrorAction = "stop"
)
func AlwaysRetry ¶
func AlwaysRetry(Key, int, error) ErrorAction
AlwaysRetry is an error handler that always returns ErrorActionRetry, regardless of the key, the retry count, or the error.
type ErrorHandler ¶
type ErrorHandler func(key Key, numRetries int, err error) ErrorAction
ErrorHandler is a function that takes the key of the failing object (the zero Key if the event was a sync event), the number of times the key has been retried, and the error that occurred. The function returns the action that should be taken.
func RetryUpTo ¶
func RetryUpTo(n int) ErrorHandler
RetryUpTo returns an error handler that retries a key up to the specified number of times before stopping.
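A custom handler can combine the three actions. The following is a minimal sketch, not the package's implementation: Key, ErrorAction, and ErrorHandler are local stand-ins that mirror the declarations above so the program compiles on its own, and errNotReady, errMalformed, and retryTransient are hypothetical names introduced only for illustration. Whether numRetries starts at zero for the first failure is an assumption here.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins mirroring the documented declarations, so that the
// sketch compiles without importing the resource package.
type Key struct {
	Name      string
	Namespace string
}

type ErrorAction string

const (
	ErrorActionRetry  ErrorAction = "retry"
	ErrorActionIgnore ErrorAction = "ignore"
	ErrorActionStop   ErrorAction = "stop"
)

type ErrorHandler func(key Key, numRetries int, err error) ErrorAction

// Hypothetical sentinel errors used only in this sketch.
var (
	errNotReady  = errors.New("dependency not ready")
	errMalformed = errors.New("malformed object")
)

// retryTransient returns a handler that retries errNotReady until
// numRetries reaches maxRetries, then stops the subscriber. Every other
// error is ignored.
func retryTransient(maxRetries int) ErrorHandler {
	return func(key Key, numRetries int, err error) ErrorAction {
		if !errors.Is(err, errNotReady) {
			return ErrorActionIgnore
		}
		if numRetries >= maxRetries {
			return ErrorActionStop
		}
		return ErrorActionRetry
	}
}

func main() {
	h := retryTransient(2)
	key := Key{Name: "web-0", Namespace: "default"}
	for n := 0; n <= 2; n++ {
		fmt.Println(n, h(key, n, errNotReady))
	}
	fmt.Println(h(key, 0, errMalformed))
}
```

With the real package, a handler of this shape would be passed to Events() through WithErrorHandler.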
type Event ¶
type Event[T k8sRuntime.Object] struct {
	Kind   EventKind
	Key    Key
	Object T

	// Done marks the event as processed. If err is non-nil, the
	// key of the object is requeued and the processing retried at
	// a later time with a potentially new version of the object.
	//
	// If this method is not called after the references to the event
	// are gone, the finalizer will panic.
	Done func(err error)
}
Event is an event emitted from a Resource.
type EventsOpt ¶
type EventsOpt func(*eventsOpts)
func WithErrorHandler ¶
func WithErrorHandler(h ErrorHandler) EventsOpt
WithErrorHandler specifies the error handling strategy for failed events. By default the strategy is to always requeue the processing of a failed event.
func WithRateLimiter ¶
func WithRateLimiter(r workqueue.RateLimiter) EventsOpt
WithRateLimiter sets the rate limiting algorithm to be used when requeueing failed events.
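EventsOpt follows the functional options pattern: each option is a function that mutates an unexported options struct. The sketch below illustrates the pattern only. The real eventsOpts fields are not shown in this documentation, so the errorHandler field, applyOpts, and stopImmediately are hypothetical, and the other types are local stand-ins. WithRateLimiter is left out because it depends on workqueue.RateLimiter from client-go.

```go
package main

import "fmt"

// Local stand-ins mirroring the documented declarations.
type Key struct {
	Name      string
	Namespace string
}

type ErrorAction string

const (
	ErrorActionRetry ErrorAction = "retry"
	ErrorActionStop  ErrorAction = "stop"
)

type ErrorHandler func(key Key, numRetries int, err error) ErrorAction

// AlwaysRetry mirrors the documented default handler.
func AlwaysRetry(Key, int, error) ErrorAction { return ErrorActionRetry }

// eventsOpts is a hypothetical options struct; the real one is unexported
// and its fields are an assumption here.
type eventsOpts struct {
	errorHandler ErrorHandler
}

type EventsOpt func(*eventsOpts)

// WithErrorHandler returns an option that replaces the error handler.
func WithErrorHandler(h ErrorHandler) EventsOpt {
	return func(o *eventsOpts) { o.errorHandler = h }
}

// applyOpts (hypothetical) starts from the documented default of always
// retrying and applies each option in order.
func applyOpts(opts ...EventsOpt) eventsOpts {
	o := eventsOpts{errorHandler: AlwaysRetry}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

// stopImmediately is a hypothetical handler that never retries.
func stopImmediately(Key, int, error) ErrorAction { return ErrorActionStop }

func main() {
	def := applyOpts()
	custom := applyOpts(WithErrorHandler(stopImmediately))
	fmt.Println(def.errorHandler(Key{}, 0, nil))
	fmt.Println(custom.errorHandler(Key{}, 0, nil))
}
```

The pattern lets Events() accept zero or more options while keeping the defaults in one place.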
type Key ¶
type Key struct {
	// Name is the name of the object
	Name string

	// Namespace is the namespace, or empty if object is not namespaced.
	Namespace string
}
Key identifies a K8s object by its name and optional namespace.
type Resource ¶
type Resource[T k8sRuntime.Object] interface {
	// Events returns a channel of events. Each event must be marked as handled
	// with a call to Done(), otherwise no new events for this key will be emitted.
	//
	// When Done() is called with non-nil error the error handler is invoked, which
	// can ignore, requeue the event or close the channel. The default error handler
	// will requeue.
	Events(ctx context.Context, opts ...EventsOpt) <-chan Event[T]

	// Store retrieves the read-only store for the resource. Blocks until
	// the store has been synchronized or the context cancelled.
	// Returns a non-nil error if context is cancelled or the resource
	// has been stopped before store has synchronized.
	Store(context.Context) (Store[T], error)
}
Resource provides access to a Kubernetes resource through either a stream of events or a read-only store.
Events can be observed from a constructor, since subscriber registration is non-blocking.
Store(), however, should only be called from a start hook, or from a goroutine forked from the start hook, as it blocks until the store has been synchronized.
The subscriber can process the events from Events() asynchronously and in parallel, but Done() must be called for each event to mark it as handled. Until Done() is called, no new events are emitted for that key. If the handling of an event is marked as failed, the configured error handler is called (see WithErrorHandler). The default error handler requeues the event (by its key) so that processing is retried later. Requeueing is rate limited, and the rate limiter can be configured with the WithRateLimiter option to Events().
The resource is lazy, i.e. it will not start the informer until a call has been made to Events() or Store().
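The Done() contract described above can be sketched with stand-in types. This is an illustration under stated assumptions, not the package's code: Event here is constrained by any instead of k8sRuntime.Object, the EventKind values are assumed, and fakeEvents is a hypothetical replacement for Resource.Events() that records what each Done() call received. The point is that every branch of the handler calls Done() exactly once, with an error only when handling failed.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the documented types; the EventKind values are assumed.
type EventKind string

const (
	Sync   EventKind = "sync"
	Upsert EventKind = "upsert"
	Delete EventKind = "delete"
)

type Key struct {
	Name      string
	Namespace string
}

type Event[T any] struct {
	Kind   EventKind
	Key    Key
	Object T
	Done   func(err error)
}

// handle processes one event and calls Done exactly once on every path.
func handle(ev Event[string], seen map[Key]string) {
	switch ev.Kind {
	case Sync:
		ev.Done(nil)
	case Upsert:
		if ev.Object == "" {
			// A non-nil error asks for the key to be handled by the error handler.
			ev.Done(errors.New("empty object"))
			return
		}
		seen[ev.Key] = ev.Object
		ev.Done(nil)
	case Delete:
		delete(seen, ev.Key)
		ev.Done(nil)
	default:
		ev.Done(fmt.Errorf("unknown event kind %q", ev.Kind))
	}
}

// fakeEvents is a hypothetical stand-in for Resource.Events(). Each Done
// call appends its argument to results so the outcome can be inspected.
func fakeEvents(results *[]error) <-chan Event[string] {
	evs := []Event[string]{
		{Kind: Upsert, Key: Key{Name: "a", Namespace: "default"}, Object: "v1"},
		{Kind: Upsert, Key: Key{Name: "b", Namespace: "default"}, Object: "v1"},
		{Kind: Sync},
		{Kind: Upsert, Key: Key{Name: "c", Namespace: "default"}, Object: ""},
		{Kind: Delete, Key: Key{Name: "a", Namespace: "default"}},
	}
	ch := make(chan Event[string], len(evs))
	for _, ev := range evs {
		ev.Done = func(err error) { *results = append(*results, err) }
		ch <- ev
	}
	close(ch)
	return ch
}

func main() {
	var results []error
	seen := map[Key]string{}
	for ev := range fakeEvents(&results) {
		handle(ev, seen)
	}
	fmt.Println("done calls:", len(results), "objects tracked:", len(seen))
}
```

In the real package a failed Done() would trigger the configured error handler and, by default, a rate-limited requeue of the key; the fake source above only records the error.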
func New ¶
func New[T k8sRuntime.Object](lc hive.Lifecycle, lw cache.ListerWatcher) Resource[T]
New creates a new Resource[T]. Use with hive.Provide:
var exampleCell = hive.Module(
	"example",
	cell.Provide(
		// Provide `Resource[*slim_corev1.Pod]` to the hive:
		func(lc hive.Lifecycle, c k8sClient.Clientset) resource.Resource[*slim_corev1.Pod] {
			lw := utils.ListerWatcherFromTyped[*slim_corev1.PodList](
				c.Slim().CoreV1().Pods(""),
			)
			// T appears only in the result type, so it must be given explicitly.
			return resource.New[*slim_corev1.Pod](lc, lw)
		},
	),
	...
)

func usePods(ctx context.Context, pods resource.Resource[*slim_corev1.Pod]) {
	// Subscribing is non-blocking; the events are consumed in a goroutine.
	podEvents := pods.Events(ctx)
	go func() {
		for ev := range podEvents {
			onPodEvent(ev)
		}
	}()
}

func onPodEvent(event resource.Event[*slim_corev1.Pod]) {
	switch event.Kind {
	case resource.Sync:
		// Pods have now been synced and the set of Upsert events
		// received thus far forms a coherent snapshot.

		// Must always call event.Done(error) to mark the event as processed.
		event.Done(nil)
	case resource.Upsert:
		event.Done(onPodUpsert(event.Object))
	case resource.Delete:
		event.Done(onPodDelete(event.Object))
	}
}
See also pkg/k8s/resource/example/main.go for a runnable example.
type Store ¶
type Store[T k8sRuntime.Object] interface {
	// List returns all items currently in the store.
	List() []T

	// IterKeys returns a key iterator.
	IterKeys() KeyIter

	// Get returns the latest version by deriving the key from the given object.
	Get(obj T) (item T, exists bool, err error)

	// GetByKey returns the latest version of the object with given key.
	GetByKey(key Key) (item T, exists bool, err error)

	// CacheStore returns the underlying cache.Store instance. Use for temporary
	// compatibility purposes only!
	CacheStore() cache.Store
}
Store is a read-only typed wrapper for cache.Store.
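Lookups return three values, so callers need to distinguish an error, an absent object, and a present object. The sketch below shows one way to handle that, assuming a hypothetical in-memory memStore and pod type in place of a synchronized store; podStore lists only the subset of Store methods the sketch uses, and phaseOf and newMemStore are illustrative helpers that do not exist in the package.

```go
package main

import (
	"fmt"
	"sort"
)

// Key mirrors the documented Key type.
type Key struct {
	Name      string
	Namespace string
}

// pod is a hypothetical object type used only in this sketch.
type pod struct {
	Name      string
	Namespace string
	Phase     string
}

// podStore lists the subset of the documented Store methods used below.
type podStore interface {
	List() []pod
	GetByKey(key Key) (item pod, exists bool, err error)
}

// memStore is a hypothetical in-memory stand-in for a synchronized store.
type memStore struct {
	items map[Key]pod
}

func newMemStore(pods ...pod) memStore {
	s := memStore{items: make(map[Key]pod, len(pods))}
	for _, p := range pods {
		s.items[Key{Name: p.Name, Namespace: p.Namespace}] = p
	}
	return s
}

// List returns all items, sorted by name to keep the output stable.
func (s memStore) List() []pod {
	out := make([]pod, 0, len(s.items))
	for _, p := range s.items {
		out = append(out, p)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Name < out[j].Name })
	return out
}

func (s memStore) GetByKey(key Key) (pod, bool, error) {
	p, ok := s.items[key]
	return p, ok, nil
}

// phaseOf separates the three lookup outcomes: error, absent, present.
func phaseOf(s podStore, key Key) (string, error) {
	p, exists, err := s.GetByKey(key)
	if err != nil {
		return "", fmt.Errorf("lookup %s/%s: %w", key.Namespace, key.Name, err)
	}
	if !exists {
		return "unknown", nil
	}
	return p.Phase, nil
}

func main() {
	s := newMemStore(pod{Name: "web-0", Namespace: "default", Phase: "Running"})
	for _, name := range []string{"web-0", "web-1"} {
		phase, err := phaseOf(s, Key{Name: name, Namespace: "default"})
		fmt.Println(name, phase, err)
	}
}
```

With the real package, the store would come from Resource.Store(ctx), which blocks until the store has synchronized.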