Documentation ¶
Overview ¶
Package node implements the components for operating a node in Kubernetes. This includes controllers for managing the node object, running scheduled pods, and exporting HTTP endpoints expected by the Kubernetes API server.
There are two primary controllers, the node runner and the pod runner.
	nodeRunner, _ := node.NewNodeController(...)

	// setup other things

	podRunner, _ := node.NewPodController(...)

	go podRunner.Run(ctx)

	select {
	case <-podRunner.Ready():
	case <-podRunner.Done():
	}
	if podRunner.Err() != nil {
		// handle error
	}
Once the controllers are running, cancelling the passed-in context will shut them down. Note this example elides error handling.
Up to this point you have an active node in Kubernetes which can have pods scheduled to it. However the API server expects nodes to implement API endpoints in order to support certain features such as fetching logs or execing a new process. The api package provides some helpers for this: `api.AttachPodRoutes` and `api.AttachMetricsRoutes`.
	mux := http.NewServeMux()
	api.AttachPodRoutes(provider, mux)
You must configure your own HTTP server, but these helpers will add handlers at the correct URI paths to your serve mux. You are not required to use Go's built-in `*http.ServeMux`; any type that implements the `ServeMux` interface defined in this package will work with these helpers.
Note: The metrics routes may need to be attached to a different HTTP server, depending on your configuration.
For more fine-grained control over the API, see the `node/api` package which only implements the HTTP handlers that you can use in whatever way you want.
This package uses OpenCensus to implement tracing (but no internal metrics yet), which is propagated through the context. This is passed on even to the providers.
Index ¶
- Constants
- Variables
- type ErrorHandler
- type NaiveNodeProvider
- type NaiveNodeProviderV2
- type NodeController
- type NodeControllerOpt
- func WithNodeEnableLeaseV1(client coordclientset.LeaseInterface, leaseDurationSeconds int32) NodeControllerOpt
- func WithNodeEnableLeaseV1WithRenewInterval(client coordclientset.LeaseInterface, leaseDurationSeconds int32, ...) NodeControllerOpt
- func WithNodePingInterval(d time.Duration) NodeControllerOpt
- func WithNodePingTimeout(timeout time.Duration) NodeControllerOpt
- func WithNodeStatusUpdateErrorHandler(h ErrorHandler) NodeControllerOpt
- func WithNodeStatusUpdateInterval(d time.Duration) NodeControllerOpt
- type NodeProvider
- type PodController
- func (pc *PodController) DeletePodsFromKubernetesQueueItemsBeingProcessedLen() int
- func (pc *PodController) DeletePodsFromKubernetesQueueLen() int
- func (pc *PodController) DeletePodsFromKubernetesQueueUnprocessedLen() int
- func (pc *PodController) Done() <-chan struct{}
- func (pc *PodController) Err() error
- func (pc *PodController) Ready() <-chan struct{}
- func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error)
- func (pc *PodController) SyncPodStatusFromProviderQueueItemsBeingProcessedLen() int
- func (pc *PodController) SyncPodStatusFromProviderQueueLen() int
- func (pc *PodController) SyncPodStatusFromProviderQueueUnprocessedLen() int
- func (pc *PodController) SyncPodsFromKubernetesQueueItemsBeingProcessedLen() int
- func (pc *PodController) SyncPodsFromKubernetesQueueLen() int
- func (pc *PodController) SyncPodsFromKubernetesQueueUnprocessedLen() int
- type PodControllerConfig
- type PodEventFilterFunc
- type PodLifecycleHandler
- type PodNotifier
- type ShouldRetryFunc
Constants ¶
	const (
		// DefaultRenewIntervalFraction is the fraction of lease duration to renew the lease
		DefaultRenewIntervalFraction = 0.25
		// DefaultLeaseDuration is from upstream kubelet, where the default lease duration is 40 seconds
		DefaultLeaseDuration = 40
	)
	const (
		DefaultPingInterval         = 10 * time.Second
		DefaultStatusUpdateInterval = 1 * time.Minute
	)
The default intervals used for lease and status updates.
Variables ¶
var DefaultRetryFunc = queue.DefaultRetryFunc
DefaultRetryFunc is the default function used for retries by the queue subsystem. Its only policy is that it gives up after MaxRetries, and falls back to the rate limiter for all other retries.
	var (
		// ErrConflictingLeaseControllerConfiguration is returned when the lease controller related options have been
		// specified multiple times
		ErrConflictingLeaseControllerConfiguration = pkgerrors.New("Multiple, conflicting lease configurations have been put into place")
	)
var MaxRetries = queue.MaxRetries
MaxRetries is the number of times we try to process a given key before permanently forgetting it.
Functions ¶
This section is empty.
Types ¶
type ErrorHandler ¶
ErrorHandler is a function type used for error-handling callbacks. If the handler returns a nil error, the error is considered handled and progress can continue (or a retry is possible).
type NaiveNodeProvider ¶
type NaiveNodeProvider struct{}
NaiveNodeProvider is a basic node provider that only uses the passed in context on `Ping` to determine if the node is healthy.
func (NaiveNodeProvider) NotifyNodeStatus ¶
func (n NaiveNodeProvider) NotifyNodeStatus(_ context.Context, _ func(*corev1.Node))
NotifyNodeStatus implements the NodeProvider interface.
This NaiveNodeProvider does not support updating node status and so this function is a no-op.
type NaiveNodeProviderV2 ¶
type NaiveNodeProviderV2 struct {
// contains filtered or unexported fields
}
NaiveNodeProviderV2 is like NaiveNodeProvider except it supports accepting node status updates. It must be used as a pointer and must be created with `NewNaiveNodeProvider`.
func NewNaiveNodeProvider ¶
func NewNaiveNodeProvider() *NaiveNodeProviderV2
NewNaiveNodeProvider creates a new NaiveNodeProviderV2. You must use this constructor if you want to be able to send node status updates to the node controller.
func (*NaiveNodeProviderV2) NotifyNodeStatus ¶
func (n *NaiveNodeProviderV2) NotifyNodeStatus(_ context.Context, f func(*corev1.Node))
NotifyNodeStatus implements the NodeProvider interface.
NaiveNodeProviderV2 does not support updating node status unless created with `NewNaiveNodeProvider`; otherwise this is a no-op.
func (*NaiveNodeProviderV2) Ping ¶
func (*NaiveNodeProviderV2) Ping(ctx context.Context) error
Ping just implements the NodeProvider interface. It returns the error from the passed in context only.
func (*NaiveNodeProviderV2) UpdateStatus ¶
UpdateStatus sends a node status update to the node controller
type NodeController ¶
type NodeController struct {
// contains filtered or unexported fields
}
NodeController deals with creating and managing a node object in Kubernetes. It can register a node with Kubernetes and periodically update its status. NodeController manages a single node entity.
func NewNodeController ¶
func NewNodeController(p NodeProvider, node *corev1.Node, nodes v1.NodeInterface, opts ...NodeControllerOpt) (*NodeController, error)
NewNodeController creates a new node controller. This does not have any side-effects on the system or kubernetes.
Use the node's `Run` method to register and run the loops to update the node in Kubernetes.
Note: if there are multiple NodeControllerOpts which apply to the same underlying option, the last NodeControllerOpt will win.
func (*NodeController) Done ¶
func (n *NodeController) Done() <-chan struct{}
Done signals to the caller when the controller is done and the control loop is exited.
Call n.Err() to find out if there was an error.
func (*NodeController) Err ¶
func (n *NodeController) Err() error
Err returns any errors that have occurred that trigger the control loop to exit.
Err only returns a non-nil error after `<-n.Done()` returns.
func (*NodeController) Ready ¶
func (n *NodeController) Ready() <-chan struct{}
Ready returns a channel that gets closed when the node is fully up and running. Note that if there is an error on startup this channel will never be closed.
func (*NodeController) Run ¶
func (n *NodeController) Run(ctx context.Context) (retErr error)
Run registers the node in kubernetes and starts loops for updating the node status in Kubernetes.
The node status must be updated periodically in Kubernetes to keep the node active. Newer versions of Kubernetes support node leases, which are essentially lightweight pings. Older versions of Kubernetes require updating the node status periodically.
If Kubernetes supports node leases this will use leases with a much slower node status update (because some things still expect the node to be updated periodically), otherwise it will only use node status update with the configured ping interval.
type NodeControllerOpt ¶
type NodeControllerOpt func(*NodeController) error //nolint:revive
NodeControllerOpt are the functional options used for configuring a node
func WithNodeEnableLeaseV1 ¶
func WithNodeEnableLeaseV1(client coordclientset.LeaseInterface, leaseDurationSeconds int32) NodeControllerOpt
WithNodeEnableLeaseV1 enables support for v1 leases. V1 leases share all the same properties as v1beta1 leases, except they do not fall back the way the v1beta1 lease controller does if the API server does not support leases. If the lease duration is not specified (0), DefaultLeaseDuration will be used.
func WithNodeEnableLeaseV1WithRenewInterval ¶
func WithNodeEnableLeaseV1WithRenewInterval(client coordclientset.LeaseInterface, leaseDurationSeconds int32, interval time.Duration) NodeControllerOpt
WithNodeEnableLeaseV1WithRenewInterval enables support for v1 leases, and sets a specific renew interval, as opposed to the standard multiplier specified by DefaultRenewIntervalFraction
func WithNodePingInterval ¶
func WithNodePingInterval(d time.Duration) NodeControllerOpt
WithNodePingInterval sets the interval between node health checks via Ping(). If node leases are not supported (or not enabled), this is the frequency with which the node status will be updated in Kubernetes.
func WithNodePingTimeout ¶
func WithNodePingTimeout(timeout time.Duration) NodeControllerOpt
WithNodePingTimeout limits the amount of time that the virtual kubelet will wait for the node provider to respond to the ping callback. If it does not return within this time, it will be considered an error condition
func WithNodeStatusUpdateErrorHandler ¶
func WithNodeStatusUpdateErrorHandler(h ErrorHandler) NodeControllerOpt
WithNodeStatusUpdateErrorHandler adds an error handler for cases where there is an error when updating the node status. This allows the caller to have some control on how errors are dealt with when updating a node's status.
The error passed to the handler will be the error received from kubernetes when updating node status.
func WithNodeStatusUpdateInterval ¶
func WithNodeStatusUpdateInterval(d time.Duration) NodeControllerOpt
WithNodeStatusUpdateInterval sets the interval for updating node status. This is only used when leases are supported, and only for updating the actual node status, not the node lease. When node leases are not enabled (or are not supported on the cluster) this has no effect and node status is updated on the "ping" interval.
type NodeProvider ¶
	type NodeProvider interface {
		// Ping checks if the node is still active.
		// This is intended to be lightweight as it will be called periodically as a
		// heartbeat to keep the node marked as ready in Kubernetes.
		Ping(context.Context) error

		// NotifyNodeStatus is used to asynchronously monitor the node.
		// The passed in callback should be called any time there is a change to the
		// node's status.
		// This will generally trigger a call to the Kubernetes API server to update
		// the status.
		//
		// NotifyNodeStatus should not block callers.
		NotifyNodeStatus(ctx context.Context, cb func(*corev1.Node))
	}
NodeProvider is the interface used for registering a node and updating its status in Kubernetes.
Note: implementers can choose to manage the node object themselves, in which case an implementation of this interface is not needed.
type PodController ¶
type PodController struct {
// contains filtered or unexported fields
}
PodController is the controller implementation for Pod resources.
func NewPodController ¶
func NewPodController(cfg PodControllerConfig) (*PodController, error)
NewPodController creates a new pod controller with the provided config.
func (*PodController) DeletePodsFromKubernetesQueueItemsBeingProcessedLen ¶
func (pc *PodController) DeletePodsFromKubernetesQueueItemsBeingProcessedLen() int
DeletePodsFromKubernetesQueueItemsBeingProcessedLen returns the length of the items being processed in the DeletePodsFromKubernetes queue
func (*PodController) DeletePodsFromKubernetesQueueLen ¶
func (pc *PodController) DeletePodsFromKubernetesQueueLen() int
DeletePodsFromKubernetesQueueLen returns the length of the DeletePodsFromKubernetes queue, which includes both items being processed and unprocessed items
func (*PodController) DeletePodsFromKubernetesQueueUnprocessedLen ¶
func (pc *PodController) DeletePodsFromKubernetesQueueUnprocessedLen() int
DeletePodsFromKubernetesQueueUnprocessedLen returns the length of the unprocessed items in the DeletePodsFromKubernetes queue
func (*PodController) Done ¶
func (pc *PodController) Done() <-chan struct{}
Done returns a channel receiver which is closed when the pod controller has exited. Once the pod controller has exited you can call `pc.Err()` to see if any error occurred.
func (*PodController) Err ¶
func (pc *PodController) Err() error
Err returns any error that has occurred and caused the pod controller to exit.
func (*PodController) Ready ¶
func (pc *PodController) Ready() <-chan struct{}
Ready returns a channel which gets closed once the PodController is ready to handle scheduled pods. This channel will never close if there is an error on startup. The status of this channel after shutdown is indeterminate.
func (*PodController) Run ¶
func (pc *PodController) Run(ctx context.Context, podSyncWorkers int) (retErr error)
Run will set up the event handlers for types we are interested in, as well as syncing informer caches and starting workers. It will block until the context is cancelled, at which point it will shutdown the work queue and wait for workers to finish processing their current work items prior to returning.
Once this returns, you should not re-use the controller.
func (*PodController) SyncPodStatusFromProviderQueueItemsBeingProcessedLen ¶
func (pc *PodController) SyncPodStatusFromProviderQueueItemsBeingProcessedLen() int
SyncPodStatusFromProviderQueueItemsBeingProcessedLen returns the length of the items being processed in the SyncPodStatusFromProvider queue
func (*PodController) SyncPodStatusFromProviderQueueLen ¶
func (pc *PodController) SyncPodStatusFromProviderQueueLen() int
SyncPodStatusFromProviderQueueLen returns the length of the SyncPodStatusFromProvider queue, which includes both items being processed and unprocessed items
func (*PodController) SyncPodStatusFromProviderQueueUnprocessedLen ¶
func (pc *PodController) SyncPodStatusFromProviderQueueUnprocessedLen() int
SyncPodStatusFromProviderQueueUnprocessedLen returns the length of the unprocessed items in the SyncPodStatusFromProvider queue
func (*PodController) SyncPodsFromKubernetesQueueItemsBeingProcessedLen ¶
func (pc *PodController) SyncPodsFromKubernetesQueueItemsBeingProcessedLen() int
SyncPodsFromKubernetesQueueItemsBeingProcessedLen returns the length of the items being processed in the SyncPodsFromKubernetes queue
func (*PodController) SyncPodsFromKubernetesQueueLen ¶
func (pc *PodController) SyncPodsFromKubernetesQueueLen() int
SyncPodsFromKubernetesQueueLen returns the length of the SyncPodsFromKubernetes queue, which includes both items being processed and unprocessed items
func (*PodController) SyncPodsFromKubernetesQueueUnprocessedLen ¶
func (pc *PodController) SyncPodsFromKubernetesQueueUnprocessedLen() int
SyncPodsFromKubernetesQueueUnprocessedLen returns the length of the unprocessed items in the SyncPodsFromKubernetes queue
type PodControllerConfig ¶
	type PodControllerConfig struct {
		// PodClient is used to perform actions on the k8s API, such as updating pod status
		// This field is required
		PodClient corev1client.PodsGetter

		// PodInformer is used as a local cache for pods
		// This should be configured to only look at pods scheduled to the node which the controller will be managing
		// If the informer does not filter based on node, then you must provide a `PodEventFilterFunc` parameter so event handlers
		// can filter pods not assigned to this node.
		PodInformer corev1informers.PodInformer

		EventRecorder record.EventRecorder

		Provider PodLifecycleHandler

		// Informers used for filling details for things like downward API in pod spec.
		//
		// We are using informers here instead of listers because we'll need the
		// informer for certain features (like notifications for updated ConfigMaps)
		ConfigMapInformer corev1informers.ConfigMapInformer
		SecretInformer    corev1informers.SecretInformer
		ServiceInformer   corev1informers.ServiceInformer

		// SyncPodsFromKubernetesRateLimiter defines the rate limit for the SyncPodsFromKubernetes queue
		SyncPodsFromKubernetesRateLimiter workqueue.RateLimiter
		// SyncPodsFromKubernetesShouldRetryFunc allows for a custom retry policy for the SyncPodsFromKubernetes queue
		SyncPodsFromKubernetesShouldRetryFunc ShouldRetryFunc

		// DeletePodsFromKubernetesRateLimiter defines the rate limit for the DeletePodsFromKubernetes queue
		DeletePodsFromKubernetesRateLimiter workqueue.RateLimiter
		// DeletePodsFromKubernetesShouldRetryFunc allows for a custom retry policy for the DeletePodsFromKubernetes queue
		DeletePodsFromKubernetesShouldRetryFunc ShouldRetryFunc

		// SyncPodStatusFromProviderRateLimiter defines the rate limit for the SyncPodStatusFromProvider queue
		SyncPodStatusFromProviderRateLimiter workqueue.RateLimiter
		// SyncPodStatusFromProviderShouldRetryFunc allows for a custom retry policy for the SyncPodStatusFromProvider queue
		SyncPodStatusFromProviderShouldRetryFunc ShouldRetryFunc

		// PodEventFilterFunc adds custom filtering for pod informer event handlers
		// Use this for cases where the pod informer handles more than pods assigned to this node
		//
		// For example, if the pod informer is not filtering based on pod.Spec.NodeName, you should
		// set that filter here so the pod controller does not handle events for pods assigned to other nodes.
		PodEventFilterFunc PodEventFilterFunc
	}
PodControllerConfig is used to configure a new PodController.
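A hedged construction sketch follows. This fragment will not compile on its own: `client`, `podInformer`, `recorder`, the other informers, `myProvider`, and `myNodeName` are assumed to come from your client-go clientset, informer factory, and provider setup, and the PodEventFilterFunc signature shown is an assumption based on this page.

```go
// Sketch only: identifiers below come from your own client-go wiring.
pc, err := node.NewPodController(node.PodControllerConfig{
	PodClient:         client.CoreV1(),
	PodInformer:       podInformer,
	EventRecorder:     recorder,
	Provider:          myProvider,
	ConfigMapInformer: configMapInformer,
	SecretInformer:    secretInformer,
	ServiceInformer:   serviceInformer,
	// Filter out pods scheduled to other nodes if the informer is not
	// already filtered by spec.nodeName.
	PodEventFilterFunc: func(_ context.Context, pod *corev1.Pod) bool {
		return pod.Spec.NodeName == myNodeName
	},
})
if err != nil {
	// handle error
}
go pc.Run(ctx, 10) // 10 pod sync workers
```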
type PodEventFilterFunc ¶
PodEventFilterFunc is used to filter pod events received from Kubernetes.
Filters that return true mean the event handler will be run; filters that return false mean the handler will *not* be run.
type PodLifecycleHandler ¶
	type PodLifecycleHandler interface {
		// CreatePod takes a Kubernetes Pod and deploys it within the provider.
		CreatePod(ctx context.Context, pod *corev1.Pod) error

		// UpdatePod takes a Kubernetes Pod and updates it within the provider.
		UpdatePod(ctx context.Context, pod *corev1.Pod) error

		// DeletePod takes a Kubernetes Pod and deletes it from the provider. Once a pod is deleted, the provider is
		// expected to call the NotifyPods callback with a terminal pod status where all the containers are in a terminal
		// state, as well as the pod. DeletePod may be called multiple times for the same pod.
		DeletePod(ctx context.Context, pod *corev1.Pod) error

		// GetPod retrieves a pod by name from the provider (can be cached).
		// The Pod returned is expected to be immutable, and may be accessed
		// concurrently outside of the calling goroutine. Therefore it is recommended
		// to return a version after DeepCopy.
		GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error)

		// GetPodStatus retrieves the status of a pod by name from the provider.
		// The PodStatus returned is expected to be immutable, and may be accessed
		// concurrently outside of the calling goroutine. Therefore it is recommended
		// to return a version after DeepCopy.
		GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error)

		// GetPods retrieves a list of all pods running on the provider (can be cached).
		// The Pods returned are expected to be immutable, and may be accessed
		// concurrently outside of the calling goroutine. Therefore it is recommended
		// to return a version after DeepCopy.
		GetPods(context.Context) ([]*corev1.Pod, error)
	}
PodLifecycleHandler defines the interface used by the PodController to react to new and changed pods scheduled to the node that is being managed.
Errors produced by these methods should implement an interface from github.com/surax98/virtual-kubelet/errdefs package in order for the core logic to be able to understand the type of failure.
type PodNotifier ¶
	type PodNotifier interface {
		// NotifyPods instructs the notifier to call the passed in function when
		// the pod status changes. It should be called when a pod's status changes.
		//
		// The provided pointer to a Pod is guaranteed to be used in a read-only
		// fashion. The provided pod's PodStatus should be up to date when
		// this function is called.
		//
		// NotifyPods must not block the caller since it is only used to register the callback.
		// The callback passed into `NotifyPods` may block when called.
		NotifyPods(context.Context, func(*corev1.Pod))
	}
PodNotifier is used as an extension to PodLifecycleHandler to support async updates of pod statuses.
type ShouldRetryFunc ¶
type ShouldRetryFunc = queue.ShouldRetryFunc
ShouldRetryFunc is a mechanism to have a custom retry policy.

It is passed metadata about the work item when the handler returns an error:
- the key
- the number of attempts that this item has already had (and failed)
- the (potentially wrapped) error from the queue handler
The return value is an error, and optionally an amount to delay the work. If an error is returned, the work will be aborted, and the returned error is bubbled up. It can be the error that was passed in or that error can be wrapped.
If the work item is to be retried, a delay duration may be specified. The delay is used to schedule when the item should begin processing relative to now; it does not necessarily dictate when the item will start work. Items are processed in the order they are scheduled. If the delay is nil, the queue falls back to its default behaviour and uses the configured rate limiter to determine when to start work.
If the delay is negative, the item will be scheduled "earlier" than now. This will result in the item being executed earlier than other items in the FIFO work order.
Source Files ¶
Directories ¶
Path | Synopsis
---|---
api | Package api implements HTTP handlers for handling requests that the kubelet would normally implement, such as pod logs, exec, etc.