resource

package
v1.13.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 15, 2023 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DeleteEvent added in v1.13.0

type DeleteEvent[T k8sRuntime.Object] struct {
	Key    Key
	Object T
	// contains filtered or unexported fields
}

DeleteEvent is emitted when an object has been deleted

func (*DeleteEvent) Done added in v1.13.0

func (b *DeleteEvent) Done(err error)

func (*DeleteEvent[T]) Handle added in v1.13.0

func (ev *DeleteEvent[T]) Handle(onSync func() error, onUpdate func(Key, T) error, onDelete func(Key, T) error)

type ErrorAction

type ErrorAction string
var (
	// ErrorActionRetry instructs to retry the processing. The key is requeued after
	// rate limiting.
	ErrorActionRetry ErrorAction = "retry"

	// ErrorActionIgnore instructs to ignore the error.
	ErrorActionIgnore ErrorAction = "ignore"

	// ErrorActionStop instructs to stop the processing for this subscriber.
	// The stream is completed with the error leading to this action.
	ErrorActionStop ErrorAction = "stop"
)

func AlwaysRetry

func AlwaysRetry(Key, int, error) ErrorAction

AlwaysRetry is an error handler that always retries the error.

type ErrorHandler

type ErrorHandler func(key Key, numRetries int, err error) ErrorAction

func RetryUpTo

func RetryUpTo(n int) ErrorHandler

RetryUpTo is an error handler that retries a key up to specified number of times before stopping.

type Event

type Event[T k8sRuntime.Object] interface {
	// Handle the event by invoking the right handler. The handlers can be
	// nil in which case the event is just marked as processed successfully.
	//
	// On error from the event handlers the event is requeued by key for later retrying.
	// The retry behaviour can be configured with the WithErrorHandler option.
	//
	// If you use Handle(), then Done() should not be called.
	// If you need to process the events in parallel/asynchronously,
	// then do a type-switch on Event[T] and call Done() after the
	// event has been processed.
	Handle(
		onSync func() error,
		onUpdate func(Key, T) error,
		onDelete func(Key, T) error,
	)

	// Done marks the event as processed.  If err is non-nil, the
	// key of the object is requeued and the processing retried at
	// a later time with a potentially new version of the object.
	//
	// If you choose not to use Handle(), then this must always be called after the
	// event has been processed.
	Done(err error)
}

Event emitted from resource. One of SyncEvent, UpdateEvent or DeleteEvent.

type Key

type Key struct {
	// Name is the name of the object
	Name string

	// Namespace is the namespace, or empty if object is not namespaced.
	Namespace string
}

Key of an K8s object, e.g. name and optional namespace.

func NewKey

func NewKey(obj any) Key

func (Key) String

func (k Key) String() string

type KeyIter

type KeyIter interface {
	// Next returns true if there is a key, false if iteration has finished.
	Next() bool
	Key() Key
}

type Option added in v1.13.0

type Option func(*options)

func WithErrorHandler

func WithErrorHandler(h ErrorHandler) Option

WithErrorHandler sets the function that decides how to handle an error from event processing.

func WithRateLimiter

func WithRateLimiter(newLimiter func() workqueue.RateLimiter) Option

WithRateLimiter sets the rate limiter to use with the resource.

type Resource

type Resource[T k8sRuntime.Object] interface {
	stream.Observable[Event[T]]

	// Store retrieves the read-only store for the resource. Blocks until
	// the store has been synchronized or the context cancelled.
	// Returns a non-nil error if context is cancelled or the resource
	// has been stopped before store has synchronized.
	Store(context.Context) (Store[T], error)
}

Resource provides access to a Kubernetes resource through either a stream of events or a read-only store.

Observing of the events can be done from a constructor as subscriber registration is non-blocking.

Store() however should only be called from a start hook, or from a goroutine forked from the start hook as it blocks until the store has been synchronized.

The subscriber can process the event synchronously with the Event[T].Handle function, or asynchronously by calling Event[T].Done once the event has been processed. On errors the object's key is requeued for later processing. Once maximum number of retries is reached the subscriber's event stream will be completed with the error from the last retry attempt.

The resource is lazy, e.g. it will not start the informer until a call has been made to Observe() or Store().

func New

func New[T k8sRuntime.Object](lc hive.Lifecycle, lw cache.ListerWatcher, opts ...Option) Resource[T]

New creates a new Resource[T]. Use with hive.Provide:

var exampleCell = hive.Module(
	"example",
 	cell.Provide(
	 	// Provide `Resource[*slim_corev1.Pod]` to the hive:
	 	func(lc hive.Lifecycle, c k8sClient.Clientset) resource.Resource[*slim_corev1.Pod] {
			lw := utils.ListerWatcherFromTyped[*slim_corev1.PodList](
				c.Slim().CoreV1().Pods(""),
			)
	 		return resource.New(lc, lw)
	 	}
		// Use the resource:
		newExample,
	}),
	...
)
func newExample(pods resource.Resource[*slim_corev1.Pod]) *Example {
	e := &Example{...}
	pods.Observe(e.ctx, e.onPodUpdated, e.onPodsComplete)
	return e
}
func (e *Example) onPodUpdated(key resource.Key, pod *slim_core.Pod) error {
	// Process event ...
}
func (e *Example) onPodsComplete(err error) {
	// Handle error ...
}

See also pkg/k8s/resource/example/main.go for a runnable example.

type Store

type Store[T k8sRuntime.Object] interface {
	// List returns all items currently in the store.
	List() []T

	// IterKeys returns a key iterator.
	IterKeys() KeyIter

	// Get returns the latest version by deriving the key from the given object.
	Get(obj T) (item T, exists bool, err error)

	// GetByKey returns the latest version of the object with given key.
	GetByKey(key Key) (item T, exists bool, err error)

	// CacheStore returns the underlying cache.Store instance. Use for temporary
	// compatibility purposes only!
	CacheStore() cache.Store
}

Store is a read-only typed wrapper for cache.Store.

type SyncEvent added in v1.13.0

type SyncEvent[T k8sRuntime.Object] struct {
	// contains filtered or unexported fields
}

SyncEvent is emitted after a set of initial objects has been emitted as UpdateEvents. At this point the subscriber will have a consistent snapshot of the state of this resource and can perform e.g. garbage collection operations.

func (*SyncEvent) Done added in v1.13.0

func (b *SyncEvent) Done(err error)

func (*SyncEvent[T]) Handle added in v1.13.0

func (ev *SyncEvent[T]) Handle(onSync func() error, onUpdate func(Key, T) error, onDelete func(Key, T) error)

type UpdateEvent added in v1.13.0

type UpdateEvent[T k8sRuntime.Object] struct {
	Key    Key
	Object T
	// contains filtered or unexported fields
}

UpdateEvent is emitted when an object has been added or updated

func (*UpdateEvent) Done added in v1.13.0

func (b *UpdateEvent) Done(err error)

func (*UpdateEvent[T]) Handle added in v1.13.0

func (ev *UpdateEvent[T]) Handle(onSync func() error, onUpdate func(Key, T) error, onDelete func(Key, T) error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL