scheduler

package
v0.42.3
Published: Oct 10, 2024 License: Apache-2.0 Imports: 10 Imported by: 3

README

Knative Eventing Multi-Tenant Scheduler with High-Availability

An eventing source instance (for example, KafkaSource, RedisStreamSource, etc.) gets materialized as a virtual pod (vpod) and can be scaled up and down by increasing or decreasing the number of virtual pod replicas (vreplicas). A vreplica corresponds to a resource in the source that can be replicated for maximum distributed processing (for example, the number of consumers running in a consumer group).

The vpod multi-tenant scheduler is responsible for placing vreplicas onto real Kubernetes pods. Each pod is limited in capacity and can hold a maximum number of vreplicas. The scheduler takes a list of (source, # of vreplicas) tuples and computes a set of Placements. Placement information is added to the source status.

Scheduling strategies rely on pods having a sticky identity (StatefulSet replicas) and the current State of the cluster.

When a vreplica cannot be scheduled, it is added to the list of pending vreplicas. The Autoscaler monitors this list and allocates more pods for placing them.

To support high-availability the scheduler distributes vreplicas uniformly across failure domains such as zones/nodes/pods containing replicas from a StatefulSet.

General Scheduler Requirements

  1. High Availability: Vreplicas for a source must be evenly spread across domains to reduce the impact of a failure when a zone/node/pod becomes unavailable for scheduling.*

  2. Equal event consumption: Vreplicas for a source must be evenly spread across adapter pods to provide an equal rate of processing events. For example, the Kafka broker spreads partitions equally across pods, so if vreplicas aren't equally spread, pods with fewer vreplicas will consume events more slowly than others.

  3. Pod spread not more than available resources: Vreplicas for a source must be evenly spread across pods such that the total number of pods with placements does not exceed the number of resources available from the source (for example, number of Kafka partitions for the topic it's consuming from). Else, the additional pods have no resources (Kafka partitions) to consume events from and could waste Kubernetes resources.

  • Note: StatefulSet anti-affinity rules guarantee that new pods are scheduled on a new zone and node.

Components:

1. Scheduler

The scheduling framework has a pluggable architecture where plugins are registered and compiled into the scheduler. It allows many scheduling features to be implemented as plugins, while keeping the scheduling "core" simple and maintainable.

Scheduling happens in a series of stages:

  1. Filter: These plugins (predicates) are used to filter out pods where a vreplica cannot be placed. If any filter plugin marks the pod as infeasible, the remaining plugins will not be called for that pod. A vreplica is marked as unschedulable if no pods pass all the filters.

  2. Score: These plugins (priorities) provide a score to each pod that has passed the filtering phase. The scheduler then selects the pod with the highest weighted sum of scores.

The scheduler must be Knative generic, with its core functionality implemented as core plugins. Anything specific to an eventing source is implemented as separate plugins (for example, the number of Kafka partitions).

It allocates one vreplica at a time by filtering and scoring schedulable pods.

A vreplica can be unschedulable for several reasons such as pods not having enough capacity, constraints cannot be fulfilled, etc.
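
The two stages can be illustrated with a minimal sketch. It is not the package's plugin API: the `Pod`, `Filter`, and `Score` types and the `pick` function are hypothetical names introduced here, and the scoring function is only an illustration of "lower ordinals score higher".

```go
package main

import "fmt"

// Pod is a simplified, hypothetical stand-in for a StatefulSet replica.
type Pod struct {
	Ordinal int
	Free    int // remaining vreplica capacity
}

// Filter reports whether a vreplica may be placed on the pod.
type Filter func(p Pod) bool

// Score rates a feasible pod; higher is better.
type Score struct {
	Weight uint64
	Fn     func(p Pod) uint64
}

// pick returns the feasible pod with the highest weighted score sum.
// It returns false when no pod passes every filter.
func pick(pods []Pod, filters []Filter, scores []Score) (Pod, bool) {
	var best Pod
	var bestScore uint64
	found := false
	for _, p := range pods {
		feasible := true
		for _, f := range filters {
			if !f(p) {
				feasible = false
				break // remaining filters are skipped for this pod
			}
		}
		if !feasible {
			continue
		}
		var total uint64
		for _, s := range scores {
			total += s.Weight * s.Fn(p)
		}
		if !found || total > bestScore {
			best, bestScore, found = p, total, true
		}
	}
	return best, found
}

// hasCapacity is an illustrative filter: the pod must have room left.
func hasCapacity(p Pod) bool { return p.Free > 0 }

// lowestOrdinal is an illustrative score that favours smaller ordinals.
func lowestOrdinal(p Pod) uint64 {
	if p.Ordinal < 0 || p.Ordinal >= 100 {
		return 0
	}
	return uint64(100 - p.Ordinal)
}

func main() {
	pods := []Pod{{Ordinal: 0, Free: 0}, {Ordinal: 1, Free: 3}, {Ordinal: 2, Free: 5}}
	p, ok := pick(pods, []Filter{hasCapacity}, []Score{{Weight: 2, Fn: lowestOrdinal}})
	fmt.Println(p.Ordinal, ok)
}
```

In this run pod 0 is filtered out because it is full, and pod 1 wins over pod 2 because its ordinal is lower.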

2. Descheduler

Similar to the scheduler, but with its own set of priorities (no predicates today).

3. Autoscaler

The autoscaler scales up pod replicas of the statefulset adapter when there are vreplicas pending to be scheduled, and scales down if there are unused pods. It takes into consideration a scaling factor that is based on number of domains for HA.
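
As a rough illustration of the scale-up and scale-down decision, the sketch below computes how many pods are needed to hold all vreplicas. It assumes every pod has the same capacity and deliberately ignores the HA scaling factor, whose formula is not given here. `desiredReplicas` is a hypothetical helper, not a function of this package.

```go
package main

import "fmt"

// desiredReplicas returns the number of pods needed to hold all placed and
// pending vreplicas, assuming a uniform per-pod capacity.
// Assumption: the HA scaling factor based on domains is not modelled.
func desiredReplicas(placed, pending, podCapacity int32) int32 {
	if podCapacity <= 0 {
		return 0
	}
	total := placed + pending
	return (total + podCapacity - 1) / podCapacity // ceiling division
}

func main() {
	// 18 placed + 5 pending vreplicas at 10 per pod need 3 pods.
	fmt.Println(desiredReplicas(18, 5, 10))
}
```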

4. State Collector

Current state information about the cluster is collected after placing each vreplica and at regular intervals. Cluster information includes, among other things, the free capacity of each pod; the list of schedulable pods (unschedulable pods are pods that are marked for eviction for compacting, and pods that are on unschedulable nodes, i.e. cordoned or unreachable nodes); the number of pods (StatefulSet replicas); the number of available nodes; the number of zones; a node-to-zone map; and the total number of vreplicas in each pod, in each node, and in each zone for each vpod (spread).

5. Reservation

The scheduler also tracks vreplicas that have been placed (i.e. scheduled) but haven't been committed yet to the vpod status. These reserved vreplicas are taken into consideration when computing the cluster's state for scheduling the next vreplica.

6. Evictor

The autoscaler periodically attempts to compact vreplicas into a smaller number of free replicas with lower ordinals. Vreplicas placed on higher ordinal pods are evicted and rescheduled to pods with a lower ordinal using the same scheduling strategies.
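
One way to picture compaction is sketched below. It assumes a uniform per-pod capacity and selects for eviction every placement on a pod whose ordinal is at or above the number of pods actually needed. The `Placement` type and `toEvict` function are hypothetical stand-ins, and the real evictor may choose candidates differently.

```go
package main

import "fmt"

// Placement is a hypothetical stand-in: vreplicas held by the pod with the
// given ordinal.
type Placement struct {
	Ordinal   int
	VReplicas int32
}

// toEvict returns the placements on pods whose ordinal is at or above the
// number of pods needed to hold all vreplicas at the given capacity.
func toEvict(placements []Placement, podCapacity int32) []Placement {
	if podCapacity <= 0 {
		return nil
	}
	var total int32
	for _, p := range placements {
		total += p.VReplicas
	}
	needed := int((total + podCapacity - 1) / podCapacity)
	var out []Placement
	for _, p := range placements {
		if p.Ordinal >= needed && p.VReplicas > 0 {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	ps := []Placement{{Ordinal: 0, VReplicas: 3}, {Ordinal: 1, VReplicas: 2}, {Ordinal: 3, VReplicas: 4}}
	// 9 vreplicas fit into one pod of capacity 10, so ordinals 1 and 3 are candidates.
	for _, p := range toEvict(ps, 10) {
		fmt.Println("evict from ordinal", p.Ordinal)
	}
}
```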

Scheduler Profile

Predicates:
  1. PodFitsResources: check if a pod has enough capacity [CORE]

  2. NoMaxResourceCount: check if the total number of placement pods exceeds the available resources [KAFKA]. It has an argument NumPartitions to configure the plugin with the total number of Kafka partitions.

  3. EvenPodSpread: check if resources are evenly spread across pods [CORE]. It has an argument MaxSkew to configure the plugin with an allowed skew factor.

Priorities:
  1. AvailabilityNodePriority: make sure resources are evenly spread across nodes [CORE]. It has an argument MaxSkew to configure the plugin with an allowed skew factor.

  2. AvailabilityZonePriority: make sure resources are evenly spread across zones [CORE]. It has an argument MaxSkew to configure the plugin with an allowed skew factor.

  3. LowestOrdinalPriority: make sure vreplicas are placed on free smaller ordinal pods to minimize resource usage [CORE]
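
The MaxSkew argument can be read as a bound on how uneven the spread may become. The sketch below shows one plausible interpretation: a placement is acceptable when, after adding one vreplica to the target domain, the gap between the most and least loaded domains stays within MaxSkew. `withinSkew` is a hypothetical helper, and the actual plugins may compute skew differently.

```go
package main

import "fmt"

// withinSkew reports whether adding one vreplica to the target domain keeps
// the difference between the most and least loaded domains within maxSkew.
// counts maps a domain name (pod, node, or zone) to its current vreplicas.
func withinSkew(counts map[string]int32, target string, maxSkew int32) bool {
	if _, ok := counts[target]; !ok {
		return false // unknown domain
	}
	var lo, hi int32
	first := true
	for domain, c := range counts {
		if domain == target {
			c++ // simulate the new placement
		}
		if first {
			lo, hi, first = c, c, false
			continue
		}
		if c < lo {
			lo = c
		}
		if c > hi {
			hi = c
		}
	}
	return hi-lo <= maxSkew
}

func main() {
	counts := map[string]int32{"zone-a": 3, "zone-b": 1, "zone-c": 1}
	fmt.Println(withinSkew(counts, "zone-a", 2)) // gap would become 3
	fmt.Println(withinSkew(counts, "zone-b", 2)) // gap would become 2
}
```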

Example ConfigMap for config-scheduler:

data:
  predicates: |+
                  [
                    {"Name": "PodFitsResources"},
                    {"Name": "NoMaxResourceCount",
                    "Args": "{\"NumPartitions\": 100}"},
                    {"Name": "EvenPodSpread",
                    "Args": "{\"MaxSkew\": 2}"}
                  ]
  priorities: |+
                  [
                    {"Name": "AvailabilityZonePriority",
                    "Weight": 10,
                    "Args":  "{\"MaxSkew\": 2}"},
                    {"Name": "LowestOrdinalPriority",
                    "Weight": 2}
                  ]
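
Note that each Args value in the ConfigMap is itself a JSON document encoded as a string. A minimal sketch of decoding the priorities entry with encoding/json follows. `PriorityPolicy` mirrors the struct documented for this package, while `skewArgs`, `parsePriorities`, and `parseSkew` are hypothetical names used only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// PriorityPolicy mirrors the struct documented for this package.
type PriorityPolicy struct {
	Name   string      `json:"name"`
	Weight uint64      `json:"weight"`
	Args   interface{} `json:"args"`
}

// skewArgs is a hypothetical type for the MaxSkew argument.
type skewArgs struct {
	MaxSkew int32
}

// priorities holds the same JSON as the ConfigMap example above.
const priorities = `[
  {"Name": "AvailabilityZonePriority", "Weight": 10, "Args": "{\"MaxSkew\": 2}"},
  {"Name": "LowestOrdinalPriority", "Weight": 2}
]`

// parsePriorities decodes the JSON list; encoding/json matches the
// capitalized keys to the lowercase tags case-insensitively.
func parsePriorities(raw string) ([]PriorityPolicy, error) {
	var out []PriorityPolicy
	err := json.Unmarshal([]byte(raw), &out)
	return out, err
}

// parseSkew decodes the nested JSON string stored in Args.
func parseSkew(args interface{}) (skewArgs, error) {
	var a skewArgs
	s, ok := args.(string)
	if !ok {
		return a, fmt.Errorf("args is %T, want a JSON string", args)
	}
	err := json.Unmarshal([]byte(s), &a)
	return a, err
}

func main() {
	ps, err := parsePriorities(priorities)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for _, p := range ps {
		if a, err := parseSkew(p.Args); err == nil {
			fmt.Println(p.Name, p.Weight, "MaxSkew:", a.MaxSkew)
		} else {
			fmt.Println(p.Name, p.Weight, "no args")
		}
	}
}
```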

Descheduler Profile:

Priorities:
  1. RemoveWithAvailabilityNodePriority: make sure resources are evenly spread across nodes [CORE]

  2. RemoveWithAvailabilityZonePriority: make sure resources are evenly spread across zones [CORE]

  3. RemoveWithHighestOrdinalPriority: make sure vreplicas are removed from higher ordinal pods to minimize resource usage [CORE]

Example ConfigMap for config-descheduler:

data:
  priorities: |+
                  [
                    {"Name": "RemoveWithEvenPodSpreadPriority",
                    "Weight": 10,
                    "Args": "{\"MaxSkew\": 2}"},
                    {"Name": "RemoveWithAvailabilityZonePriority",
                    "Weight": 10,
                    "Args":  "{\"MaxSkew\": 2}"},
                    {"Name": "RemoveWithHighestOrdinalPriority",
                    "Weight": 2}
                  ]

Normal Operation

  1. Busy scheduler:

The scheduler can be very busy allocating the best placements for multiple eventing sources at a time using the configured scheduler predicates and priorities. During this time, the cluster could see StatefulSet replicas increasing, as the autoscaler computes how many more pods are needed to complete scheduling successfully. The replicas could also be decreasing during idle time, caused by fewer events flowing through the system, by the evictor compacting vreplica placements into a smaller number of pods, or by the deletion of event sources. The current placements are stored in the eventing source's status field for observability.

  2. Software upgrades:

We can expect periodic software version upgrades or fixes to be performed on the Kubernetes cluster running the scheduler or on the Knative framework installed. Either of these scenarios could involve graceful rebooting of nodes and/or reapplying of controllers, adapters and other resources.

All existing vreplica placements will still be valid and no rebalancing will be done by the vreplica scheduler. (For Kafka, its broker may trigger a rebalancing of partitions due to consumer group member changes.)

TODO: Measure latencies in events processing using a performance tool (KPerf eventing).

  3. No more cluster resources:

When there are no resources available on existing nodes in the cluster to schedule more pods and the autoscaler continues to scale up replicas, the new pods are left in a Pending state till cluster size is increased. Nothing to do for the scheduler until then.

Disaster Recovery

Some failure scenarios are described below:

  1. Pod failure:

When a pod/replica in a StatefulSet goes down due to some reason (but its node and zone are healthy), a new replica is spun up by the StatefulSet with the same pod identity (pod can come up on a different node) almost immediately.

All existing vreplica placements will still be valid and no rebalancing will be done by the vreplica scheduler. (For Kafka, its broker may trigger a rebalancing of partitions due to consumer group member changes.)

TODO: Measure latencies in events processing using a performance tool (KPerf eventing).

  2. Node failure (graceful):

When a node is rebooted for upgrades etc., running pods on the node will be evicted (drained), gracefully terminated, and rescheduled on a different node. The drained node will be marked as unschedulable by Kubernetes (node.Spec.Unschedulable = true) after it is cordoned.

kubectl describe node knative-worker4
Name:               knative-worker4
CreationTimestamp:  Mon, 30 Aug 2021 11:13:11 -0400
Taints:            none
Unschedulable:      true

All existing vreplica placements will still be valid and no rebalancing will be done by the vreplica scheduler. (For Kafka, its broker may trigger a rebalancing of partitions due to consumer group member changes.)

TODO: Measure latencies in events processing using a performance tool (KPerf eventing).

New vreplicas will not be scheduled on pods running on this cordoned node.

  3. Node failure (abrupt):

When a node goes down unexpectedly due to some physical machine failure (network isolation/loss, CPU issue, power loss, etc.), the node controller performs the following steps.

Pods running on the failed node receive a NodeNotReady Warning event:

kubectl describe pod kafkasource-mt-adapter-5 -n knative-eventing
Name:         kafkasource-mt-adapter-5
Namespace:    knative-eventing
Priority:     0
Node:         knative-worker4/172.18.0.3
Tolerations:  node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
              node.kubernetes.io/unreachable:NoExecute op=Exists for 300s

Events:
  Type     Reason        Age    From               Message
  ----     ------        ----   ----               -------
  Normal   Scheduled     11m    default-scheduler  Successfully assigned knative-eventing/kafkasource-mt-adapter-5 to knative-worker4
  Normal   Pulled        11m    kubelet            Container image
  Normal   Created       11m    kubelet            Created container receive-adapter
  Normal   Started       11m    kubelet            Started container receive-adapter
  Warning  NodeNotReady  3m48s  node-controller    Node is not ready

If the node controller has not heard from the node within the last node-monitor-grace-period (default is 40 seconds), it taints the failing node with the following taints:

kubectl describe node knative-worker4
Name:               knative-worker4
Taints:             node.kubernetes.io/unreachable:NoExecute
                    node.kubernetes.io/unreachable:NoSchedule
Unschedulable:      false
Events:
  Type    Reason              Age    From     Message
  ----    ------              ----   ----     -------
  Normal  NodeNotSchedulable  5m42s  kubelet  Node knative-worker4 status is now: NodeNotSchedulable
  Normal  NodeSchedulable     2m31s  kubelet  Node knative-worker4 status is now: NodeSchedulable

kubectl get nodes
NAME                    STATUS     ROLES                  AGE     VERSION
knative-control-plane   Ready      control-plane,master   7h23m   v1.21.1
knative-worker          Ready      <none>                 7h23m   v1.21.1
knative-worker2         Ready      <none>                 7h23m   v1.21.1
knative-worker3         Ready      <none>                 7h23m   v1.21.1
knative-worker4         NotReady   <none>                 7h23m   v1.21.1

After a timeout period (pod-eviction-timeout, 5 minutes by default), the pods move to the Terminating state.

Since statefulset now has a terminationGracePeriodSeconds: 0 setting, the terminating pods are immediately restarted on another functioning Node. A new replica is spun up with the same ordinal.

While the failing node is unreachable (about 5 minutes), vreplicas placed on its pods aren't available to process work from the eventing source. (Theory) The consumption rate goes down and Kafka eventually triggers a rebalancing of partitions. KEDA will also scale up the number of consumers to resolve the processing lag. A scale-up will cause the Eventing scheduler to rebalance the total vreplicas for that source across the available running pods.

  4. Zone failure:

All nodes running in the failing zone will be unavailable for scheduling. Nodes will either be tainted as unreachable or have Spec.Unschedulable set. See the node failure scenarios above for what happens to vreplica placements.
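
The two conditions described in the node failure scenarios (a cordoned node, or a node tainted as unreachable) can be combined into a single check. The sketch below uses simplified, hypothetical `Node` and `Taint` types rather than the Kubernetes corev1 types, and `isSchedulable` is an illustrative helper, not a function of this package.

```go
package main

import "fmt"

// Taint and Node are simplified, hypothetical stand-ins for the
// corresponding Kubernetes API types.
type Taint struct {
	Key    string
	Effect string
}

type Node struct {
	Name          string
	Unschedulable bool // set when the node is cordoned
	Taints        []Taint
}

const unreachableTaintKey = "node.kubernetes.io/unreachable"

// isSchedulable reports whether new vreplicas may be placed on pods running
// on the node: it must be neither cordoned nor tainted as unreachable.
func isSchedulable(n Node) bool {
	if n.Unschedulable {
		return false
	}
	for _, t := range n.Taints {
		if t.Key == unreachableTaintKey {
			return false
		}
	}
	return true
}

func main() {
	nodes := []Node{
		{Name: "knative-worker"},
		{Name: "knative-worker3", Unschedulable: true},
		{Name: "knative-worker4", Taints: []Taint{{Key: unreachableTaintKey, Effect: "NoSchedule"}}},
	}
	for _, n := range nodes {
		fmt.Println(n.Name, isSchedulable(n))
	}
}
```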

To learn more about Knative, please visit the /docs repository.

This repo falls under the Knative Code of Conduct

Documentation

Overview

The scheduler is responsible for placing virtual pod (VPod) replicas within real pods.

Constants

const (
	ZoneLabel = "topology.kubernetes.io/zone"

	UnknownZone = "unknown"
)
const (
	// MaxWeight is the maximum weight that can be assigned for a priority.
	MaxWeight uint64 = 10
	// MinWeight is the minimum weight that can be assigned for a priority.
	MinWeight uint64 = 0
)
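
A policy loader would typically reject weights outside this range. The following is a minimal sketch of such a check. The constants are copied from above, while `validateWeight` is a hypothetical helper and not part of this package.

```go
package main

import "fmt"

const (
	// MaxWeight is the maximum weight that can be assigned for a priority.
	MaxWeight uint64 = 10
	// MinWeight is the minimum weight that can be assigned for a priority.
	MinWeight uint64 = 0
)

// validateWeight returns an error when the weight of the named priority
// falls outside [MinWeight, MaxWeight].
func validateWeight(name string, weight uint64) error {
	if weight < MinWeight || weight > MaxWeight {
		return fmt.Errorf("priority %q: weight %d not in [%d, %d]", name, weight, MinWeight, MaxWeight)
	}
	return nil
}

func main() {
	fmt.Println(validateWeight("LowestOrdinalPriority", 2))
	fmt.Println(validateWeight("AvailabilityZonePriority", 11))
}
```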

Variables

This section is empty.

Functions

func GetPlacementForPod

func GetPlacementForPod(placements []duckv1alpha1.Placement, podName string) *duckv1alpha1.Placement

GetPlacementForPod returns the placement corresponding to podName

func GetPodCount

func GetPodCount(placements []duckv1alpha1.Placement) int

GetPodCount returns the number of pods with the given placements

func GetTotalVReplicas

func GetTotalVReplicas(placements []duckv1alpha1.Placement) int32

GetTotalVReplicas returns the total number of placed virtual replicas
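
To make the three helpers concrete, the sketch below re-implements their described behavior over a local `Placement` stand-in for `duckv1alpha1.Placement`. The lowercase function names are hypothetical, and counting only pods that hold at least one vreplica in `podCount` is an assumption about how "pods with the given placements" is defined.

```go
package main

import "fmt"

// Placement is a local stand-in for duckv1alpha1.Placement.
type Placement struct {
	PodName   string
	VReplicas int32
}

// totalVReplicas sums the vreplicas over all placements.
func totalVReplicas(placements []Placement) int32 {
	var total int32
	for _, p := range placements {
		total += p.VReplicas
	}
	return total
}

// podCount counts distinct pods holding at least one vreplica (assumption).
func podCount(placements []Placement) int {
	seen := map[string]struct{}{}
	for _, p := range placements {
		if p.VReplicas > 0 {
			seen[p.PodName] = struct{}{}
		}
	}
	return len(seen)
}

// placementForPod returns the placement for podName, or nil if there is none.
func placementForPod(placements []Placement, podName string) *Placement {
	for i := range placements {
		if placements[i].PodName == podName {
			return &placements[i]
		}
	}
	return nil
}

func main() {
	ps := []Placement{
		{PodName: "kafkasource-mt-adapter-0", VReplicas: 4},
		{PodName: "kafkasource-mt-adapter-1", VReplicas: 2},
		{PodName: "kafkasource-mt-adapter-2", VReplicas: 0},
	}
	fmt.Println(totalVReplicas(ps), podCount(ps))
	fmt.Println(placementForPod(ps, "kafkasource-mt-adapter-1").VReplicas)
}
```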

Types

type Evictor

type Evictor func(pod *corev1.Pod, vpod VPod, from *duckv1alpha1.Placement) error

Evictor allows for vreplicas to be evicted. For instance, the evictor is used by the statefulset scheduler to move vreplicas to pod with a lower ordinal.

type PredicatePolicy

type PredicatePolicy struct {
	// Identifier of the predicate policy
	Name string `json:"name"`
	// Holds the parameters to configure the given predicate
	Args interface{} `json:"args"`
}

PredicatePolicy describes a struct of a predicate policy.

type PriorityPolicy

type PriorityPolicy struct {
	// Identifier of the priority policy
	Name string `json:"name"`
	// The numeric multiplier for the pod scores that the priority function generates
	// The weight should be a positive integer
	Weight uint64 `json:"weight"`
	// Holds the parameters to configure the given priority function
	Args interface{} `json:"args"`
}

PriorityPolicy describes a struct of a priority policy.

type ScaleCache added in v0.41.0

type ScaleCache struct {
	// contains filtered or unexported fields
}

func NewScaleCache added in v0.41.0

func NewScaleCache(ctx context.Context, namespace string, scaleClient ScaleClient, config ScaleCacheConfig) *ScaleCache

func (*ScaleCache) GetScale added in v0.41.0

func (sc *ScaleCache) GetScale(ctx context.Context, statefulSetName string, options metav1.GetOptions) (*autoscalingv1.Scale, error)

func (*ScaleCache) Reset added in v0.41.0

func (sc *ScaleCache) Reset()

func (*ScaleCache) UpdateScale added in v0.41.0

func (sc *ScaleCache) UpdateScale(ctx context.Context, statefulSetName string, scale *autoscalingv1.Scale, opts metav1.UpdateOptions) (*autoscalingv1.Scale, error)

type ScaleCacheConfig added in v0.41.0

type ScaleCacheConfig struct {
	RefreshPeriod time.Duration `json:"refreshPeriod"`
}

type ScaleClient added in v0.41.0

type ScaleClient interface {
	GetScale(ctx context.Context, name string, options metav1.GetOptions) (*autoscalingv1.Scale, error)
	UpdateScale(ctx context.Context, name string, scale *autoscalingv1.Scale, options metav1.UpdateOptions) (*autoscalingv1.Scale, error)
}

type Scheduler

type Scheduler interface {
	// Schedule computes the new set of placements for vpod.
	Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error)
}

Scheduler is responsible for placing VPods into real Kubernetes pods

type SchedulerFunc added in v0.29.0

type SchedulerFunc func(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error)

SchedulerFunc type is an adapter to allow the use of ordinary functions as Schedulers. If f is a function with the appropriate signature, SchedulerFunc(f) is a Scheduler that calls f.

func (SchedulerFunc) Schedule added in v0.29.0

func (f SchedulerFunc) Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error)

Schedule implements the Scheduler interface.

type SchedulerPolicy

type SchedulerPolicy struct {
	// Holds the information to configure the fit predicate functions.
	Predicates []PredicatePolicy `json:"predicates"`
	// Holds the information to configure the priority functions.
	Priorities []PriorityPolicy `json:"priorities"`
}

SchedulerPolicy describes a struct of a policy resource.

type SchedulerPolicyType

type SchedulerPolicyType string
const (
	// MAXFILLUP policy type adds vreplicas to existing pods to fill them up before adding to new pods
	MAXFILLUP SchedulerPolicyType = "MAXFILLUP"

	// PodAnnotationKey is an annotation used by the scheduler to be informed of pods
	// being evicted and not use it for placing vreplicas
	PodAnnotationKey = "eventing.knative.dev/unschedulable"
)

type VPod

type VPod interface {
	// GetKey returns the VPod key (namespace/name).
	GetKey() types.NamespacedName

	// GetVReplicas returns the number of expected virtual replicas
	GetVReplicas() int32

	// GetPlacements returns the current list of placements
	// Do not mutate!
	GetPlacements() []duckv1alpha1.Placement

	GetResourceVersion() string
}

VPod represents virtual replicas placed into real Kubernetes pods. The scheduler is responsible for placing VPods.

type VPodLister

type VPodLister func() ([]VPod, error)

VPodLister is the function signature for returning a list of VPods
