package extender

Version: v0.57.0

Warning: This package is not in the latest version of its module.

Published: Mar 27, 2023 | License: Apache-2.0 | Imports: 40 | Imported by: 0

Documentation

Constants

const (

	// SingleAzTightlyPack will attempt to schedule pods within a single AZ.
	// Note that single-az-tightly-pack does not guarantee that ALL pods will be scheduled in the same AZ; please see
	// the SingleAzTightlyPack docs for more information (see also ShouldScheduleDynamicallyAllocatedExecutorsInSameAZ).
	SingleAzTightlyPack string = "single-az-tightly-pack"
	// SingleAzMinimalFragmentation tries to reduce Spark app fragmentation by fitting executors on fewer hosts
	// when possible. Dynamically allocated executors are more challenging, but generally we
	// attempt to schedule them on hosts already running executors that belong to the same app.
	SingleAzMinimalFragmentation string = "single-az-minimal-fragmentation"
)
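The SingleAzMinimalFragmentation comment describes preferring hosts that already run executors of the same application. The sketch below illustrates only that ordering idea, assuming a hypothetical `node` type with a single CPU dimension and an `AppExecutors` count; it is not the package's binpacking implementation, which lives behind `binpack.SparkBinPackFunction`.

```go
package main

import (
	"fmt"
	"sort"
)

// node is a hypothetical, simplified view of a cluster node.
type node struct {
	Name         string
	AppExecutors int // executors of the same application already on this node
	FreeCPU      int // free CPU cores (illustrative unit)
}

// orderForMinimalFragmentation returns the nodes that have room for one executor,
// with nodes already hosting the application's executors first, so that new
// executors tend to land on fewer distinct hosts.
func orderForMinimalFragmentation(nodes []node, executorCPU int) []string {
	var fits []node
	for _, n := range nodes {
		if n.FreeCPU >= executorCPU {
			fits = append(fits, n)
		}
	}
	// Stable sort keeps the input order among nodes with equal executor counts.
	sort.SliceStable(fits, func(i, j int) bool {
		return fits[i].AppExecutors > fits[j].AppExecutors
	})
	names := make([]string, 0, len(fits))
	for _, n := range fits {
		names = append(names, n.Name)
	}
	return names
}

func main() {
	nodes := []node{
		{Name: "a", AppExecutors: 0, FreeCPU: 8},
		{Name: "b", AppExecutors: 2, FreeCPU: 4},
		{Name: "c", AppExecutors: 1, FreeCPU: 1}, // too small for an executor
	}
	fmt.Println(orderForMinimalFragmentation(nodes, 2)) // [b a]
}
```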

Variables

This section is empty.

Functions

func DeleteDemandIfExists

func DeleteDemandIfExists(ctx context.Context, cache *cache.SafeDemandCache, pod *v1.Pod, source string)

DeleteDemandIfExists removes the demand object for the pod if one exists, and emits an event tagged with the source of the deletion.

func StartDemandGC

func StartDemandGC(ctx context.Context, podInformer coreinformers.PodInformer, demandCache *cache.SafeDemandCache)

StartDemandGC initializes the DemandGC, which handles pod events in the background.

Types

type Binpacker

type Binpacker struct {
	Name        string
	BinpackFunc binpack.SparkBinPackFunction
	IsSingleAz  bool
}

Binpacker is a BinpackFunc with a known name.

func SelectBinpacker

func SelectBinpacker(name string) *Binpacker

SelectBinpacker selects the binpack function with the given name.
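A minimal sketch of name-based selection, assuming a hypothetical in-memory registry keyed by the documented constant values. The `BinpackFunc` field here uses a simplified, hypothetical signature, and returning nil for an unknown name is an assumption of this sketch; this page does not say how the real SelectBinpacker handles unknown names.

```go
package main

import "fmt"

// Binpacker mirrors the documented fields; BinpackFunc uses a hypothetical
// simplified signature (free capacity per node, executor count) -> fits?
type Binpacker struct {
	Name        string
	BinpackFunc func(free []int, executors int) bool
	IsSingleAz  bool
}

// fitsAnywhere is a placeholder packing function for illustration only:
// it checks whether enough one-unit executors fit across all nodes.
func fitsAnywhere(free []int, executors int) bool {
	total := 0
	for _, f := range free {
		total += f
	}
	return total >= executors
}

// binpackers is a hypothetical registry keyed by the documented constant values.
var binpackers = map[string]*Binpacker{
	"single-az-tightly-pack":          {Name: "single-az-tightly-pack", BinpackFunc: fitsAnywhere, IsSingleAz: true},
	"single-az-minimal-fragmentation": {Name: "single-az-minimal-fragmentation", BinpackFunc: fitsAnywhere, IsSingleAz: true},
}

// selectBinpacker looks a binpacker up by name; a missing key yields nil.
func selectBinpacker(name string) *Binpacker {
	return binpackers[name]
}

func main() {
	b := selectBinpacker("single-az-tightly-pack")
	fmt.Println(b.Name, b.IsSingleAz, b.BinpackFunc([]int{2, 1}, 3)) // single-az-tightly-pack true true
	fmt.Println(selectBinpacker("unknown") == nil)                   // true
}
```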

type ClusterRequests

type ClusterRequests map[string]NodeRequests

ClusterRequests represents the pod requests in the cluster, indexed by node name.

type DemandGC

type DemandGC struct {
	// contains filtered or unexported fields
}

DemandGC is a background pod event handler that deletes any demand we have previously created for a pod once that pod gets scheduled. We also delete demands elsewhere in the extender when we schedule the pod, but those deletions can miss some demands due to race conditions.
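The garbage-collection idea can be sketched as a pod update handler. In the real package the handler is driven by a client-go `coreinformers.PodInformer` and uses `cache.SafeDemandCache` (per the StartDemandGC signature); here both are replaced by hypothetical plain types, and treating "gets scheduled" as the pod's `NodeName` going from empty to non-empty is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// pod is a hypothetical, trimmed-down pod: NodeName stays empty until the pod is bound.
type pod struct {
	Namespace, Name, NodeName string
}

// demandCache is a hypothetical thread-safe set of demands keyed by "namespace/name".
type demandCache struct {
	mu      sync.Mutex
	demands map[string]bool
}

// delete removes a demand and reports whether it existed.
func (c *demandCache) delete(key string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	_, ok := c.demands[key]
	delete(c.demands, key)
	return ok
}

// onPodUpdate deletes the pod's demand when the pod transitions from
// unscheduled to scheduled. It returns true if a demand was deleted.
func onPodUpdate(c *demandCache, oldPod, newPod pod) bool {
	if oldPod.NodeName == "" && newPod.NodeName != "" {
		return c.delete(newPod.Namespace + "/" + newPod.Name)
	}
	return false
}

func main() {
	c := &demandCache{demands: map[string]bool{"spark/driver-1": true}}
	before := pod{Namespace: "spark", Name: "driver-1"}
	after := before
	after.NodeName = "node-a"
	fmt.Println(onPodUpdate(c, before, after)) // true
	fmt.Println(onPodUpdate(c, before, after)) // false: the demand is already gone
}
```

Because deletion is a no-op when the demand is already gone, this handler can safely race with the extender's own deletions.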

type NodeRequests

type NodeRequests map[types.UID]PodRequestInfo

NodeRequests represents the pod requests currently present on a node, indexed by pod UID.
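ClusterRequests and NodeRequests form a two-level map: node name, then pod UID. A minimal sketch of working with that shape, assuming a hypothetical `podRequestInfo` with CPU and memory fields (the real PodRequestInfo fields are unexported) and plain strings in place of `types.UID`:

```go
package main

import "fmt"

// podRequestInfo is a hypothetical stand-in for PodRequestInfo.
type podRequestInfo struct {
	CPUMillis   int64
	MemoryBytes int64
}

// nodeRequests mirrors NodeRequests, keyed by pod UID (a string here).
type nodeRequests map[string]podRequestInfo

// clusterRequests mirrors ClusterRequests, keyed by node name.
type clusterRequests map[string]nodeRequests

// add records a pod's requests under its node, creating the inner map on demand.
func (c clusterRequests) add(node, uid string, info podRequestInfo) {
	if c[node] == nil {
		c[node] = nodeRequests{}
	}
	c[node][uid] = info
}

// totalCPU sums the CPU requests of all pods on one node (0 for unknown nodes).
func (c clusterRequests) totalCPU(node string) int64 {
	var sum int64
	for _, info := range c[node] {
		sum += info.CPUMillis
	}
	return sum
}

func main() {
	c := clusterRequests{}
	c.add("node-a", "uid-1", podRequestInfo{CPUMillis: 500, MemoryBytes: 1 << 30})
	c.add("node-a", "uid-2", podRequestInfo{CPUMillis: 1500, MemoryBytes: 2 << 30})
	// Same UID again: the entry is overwritten, not double-counted.
	c.add("node-a", "uid-1", podRequestInfo{CPUMillis: 500, MemoryBytes: 1 << 30})
	fmt.Println(c.totalCPU("node-a"), c.totalCPU("node-b")) // 2000 0
}
```

Keying the inner map by pod UID makes repeated updates for the same pod idempotent.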

type OverheadComputer

type OverheadComputer struct {
	// contains filtered or unexported fields
}

OverheadComputer periodically computes the total resources of pods that are not managed by the spark scheduler.

func NewOverheadComputer

func NewOverheadComputer(
	ctx context.Context,
	podInformer coreinformers.PodInformer,
	resourceReservationManager *ResourceReservationManager,
	nodeLister corelisters.NodeLister) *OverheadComputer

NewOverheadComputer creates a new OverheadComputer instance.

func (OverheadComputer) GetNonSchedulableOverhead

func (o OverheadComputer) GetNonSchedulableOverhead(ctx context.Context, nodes []*v1.Node) resources.NodeGroupResources

GetNonSchedulableOverhead fills in non-schedulable overhead information for the given nodes. Non-schedulable overhead is overhead from pods that are running but do not have 'spark-scheduler' as their scheduler name.
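The definition above translates into a per-node filter and sum. This sketch assumes a hypothetical `runningPod` type and tracks CPU only; the real method returns `resources.NodeGroupResources` for `[]*v1.Node`.

```go
package main

import "fmt"

// runningPod is a hypothetical simplification of a Kubernetes pod.
type runningPod struct {
	NodeName      string
	SchedulerName string
	Running       bool
	CPUMillis     int64
}

// nonSchedulableOverhead sums, per requested node, the CPU of running pods
// whose scheduler name is not "spark-scheduler".
func nonSchedulableOverhead(pods []runningPod, nodes []string) map[string]int64 {
	wanted := make(map[string]bool, len(nodes))
	overhead := make(map[string]int64, len(nodes))
	for _, n := range nodes {
		wanted[n] = true
		overhead[n] = 0 // every requested node gets an entry, even with no overhead
	}
	for _, p := range pods {
		if !p.Running || p.SchedulerName == "spark-scheduler" || !wanted[p.NodeName] {
			continue
		}
		overhead[p.NodeName] += p.CPUMillis
	}
	return overhead
}

func main() {
	pods := []runningPod{
		{NodeName: "a", SchedulerName: "default-scheduler", Running: true, CPUMillis: 250},
		{NodeName: "a", SchedulerName: "spark-scheduler", Running: true, CPUMillis: 4000}, // excluded
		{NodeName: "a", SchedulerName: "default-scheduler", Running: false, CPUMillis: 100}, // not running
		{NodeName: "b", SchedulerName: "default-scheduler", Running: true, CPUMillis: 500}, // node not requested
	}
	fmt.Println(nonSchedulableOverhead(pods, []string{"a"})) // map[a:250]
}
```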

func (OverheadComputer) GetOverhead

func (o OverheadComputer) GetOverhead(ctx context.Context, nodes []*v1.Node) resources.NodeGroupResources

GetOverhead fills overhead information for given nodes.

type PodRequestInfo

type PodRequestInfo struct {
	// contains filtered or unexported fields
}

PodRequestInfo holds information about a pod and its requested resources.

type ResourceReservationManager

type ResourceReservationManager struct {
	// contains filtered or unexported fields
}

ResourceReservationManager is a central point that manages the creation and reading of both resource reservations and soft reservations.

func NewResourceReservationManager

func NewResourceReservationManager(
	ctx context.Context,
	resourceReservations *cache.ResourceReservationCache,
	softReservationStore *cache.SoftReservationStore,
	podLister *SparkPodLister,
	informer coreinformers.PodInformer) *ResourceReservationManager

NewResourceReservationManager creates and returns a ResourceReservationManager.

func (*ResourceReservationManager) CompactDynamicAllocationApplications

func (rrm *ResourceReservationManager) CompactDynamicAllocationApplications(ctx context.Context)

CompactDynamicAllocationApplications compacts reservations for executors belonging to dynamic allocation applications by moving any soft reservations into resource reservations occupied by now-dead executors. This keeps resource reservation objects relatively up to date and ensures that reserved usage is reported correctly.
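A minimal sketch of the compaction step, under stated assumptions: reservations are modeled as hypothetical slots (node plus bound executor), soft reservations as a map from live executor name to node, and a moved executor simply takes over the slot's node and executor fields. How the real method chooses which soft reservation to move, and how it updates the ResourceReservation object, is not described here.

```go
package main

import (
	"fmt"
	"sort"
)

// reservation is a hypothetical resource-reservation slot.
type reservation struct {
	Node     string
	Executor string
}

// compact moves live soft-reserved executors into slots whose executor is not
// alive (dead or never bound). softReserved maps executor name -> node. Names
// are sorted so the result is deterministic.
func compact(slots map[string]reservation, softReserved map[string]string, alive map[string]bool) {
	var softNames []string
	for name := range softReserved {
		if alive[name] {
			softNames = append(softNames, name)
		}
	}
	sort.Strings(softNames)

	slotNames := make([]string, 0, len(slots))
	for name := range slots {
		slotNames = append(slotNames, name)
	}
	sort.Strings(slotNames)

	for _, slot := range slotNames {
		if alive[slots[slot].Executor] || len(softNames) == 0 {
			continue
		}
		exec := softNames[0]
		softNames = softNames[1:]
		slots[slot] = reservation{Node: softReserved[exec], Executor: exec}
		delete(softReserved, exec) // now tracked by a resource reservation instead
	}
}

func main() {
	slots := map[string]reservation{
		"executor-1": {Node: "a", Executor: "exec-1"},
		"executor-2": {Node: "b", Executor: "exec-2"}, // exec-2 has died
	}
	soft := map[string]string{"exec-3": "c"}
	alive := map[string]bool{"exec-1": true, "exec-3": true}
	compact(slots, soft, alive)
	fmt.Println(slots["executor-2"], len(soft)) // {c exec-3} 0
}
```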

func (*ResourceReservationManager) CreateReservations

func (rrm *ResourceReservationManager) CreateReservations(
	ctx context.Context,
	driver *v1.Pod,
	applicationResources *sparkApplicationResources,
	driverNode string,
	executorNodes []string) (*v1beta2.ResourceReservation, error)

CreateReservations creates the necessary reservations for an application, whether those are resource reservation objects or in-memory soft reservations for extra executors.

func (*ResourceReservationManager) FindAlreadyBoundReservationNode

func (rrm *ResourceReservationManager) FindAlreadyBoundReservationNode(ctx context.Context, executor *v1.Pod) (string, bool, error)

FindAlreadyBoundReservationNode returns the name of the node previously allocated to this executor, if any; its boolean result is false otherwise. Binding reservations has to be idempotent: binding the pod to the node in kube-scheduler might fail, so a retry for the same executor pod should get the same node.
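Idempotent binding means checking for an existing binding before claiming a new slot. This sketch assumes hypothetical reservation slots and a hypothetical `bind` helper; it illustrates the retry behavior described above, not the package's actual code path.

```go
package main

import "fmt"

// reservation is a hypothetical slot; Executor is empty while the slot is unbound.
type reservation struct {
	Node     string
	Executor string
}

// findAlreadyBoundNode returns the node of the slot already bound to executor, if any.
func findAlreadyBoundNode(slots map[string]*reservation, executor string) (string, bool) {
	for _, r := range slots {
		if r.Executor == executor {
			return r.Node, true
		}
	}
	return "", false
}

// bind returns the node for executor, reusing an existing binding so that a
// retried scheduling attempt for the same pod gets the same node. order fixes
// which unbound slot is claimed first.
func bind(slots map[string]*reservation, order []string, executor string) (string, bool) {
	if node, ok := findAlreadyBoundNode(slots, executor); ok {
		return node, true
	}
	for _, name := range order {
		if r := slots[name]; r != nil && r.Executor == "" {
			r.Executor = executor
			return r.Node, true
		}
	}
	return "", false
}

func main() {
	slots := map[string]*reservation{"s1": {Node: "a"}, "s2": {Node: "b"}}
	order := []string{"s1", "s2"}
	first, _ := bind(slots, order, "exec-1")
	retry, _ := bind(slots, order, "exec-1") // e.g. the kube-scheduler bind failed and was retried
	other, _ := bind(slots, order, "exec-2")
	fmt.Println(first, retry, other) // a a b
}
```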

func (*ResourceReservationManager) FindUnboundReservationNodes

func (rrm *ResourceReservationManager) FindUnboundReservationNodes(ctx context.Context, executor *v1.Pod) ([]string, bool, error)

FindUnboundReservationNodes returns a slice of node names that have unbound reservations for this Spark application. This includes both reservations we have not yet scheduled any executors on as well as reservations that have executors that are now dead. Spark will recreate lost executors, so the replacement executors should be placed on the reserved spaces of dead executors.
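The rule above (a slot is available if no executor was ever scheduled on it, or if its executor is dead) can be sketched as a filter over hypothetical reservation slots. The liveness map and the explicit `order` slice are assumptions for illustration; the real method also returns a boolean and an error.

```go
package main

import "fmt"

// reservation is a hypothetical slot; Executor is empty if no executor was ever scheduled on it.
type reservation struct {
	Node     string
	Executor string
}

// unboundNodes returns the nodes of slots that can take a new executor: slots
// never bound, and slots whose executor is dead. order makes output deterministic.
func unboundNodes(slots map[string]reservation, order []string, alive map[string]bool) []string {
	var nodes []string
	for _, name := range order {
		r, ok := slots[name]
		if !ok {
			continue
		}
		if r.Executor == "" || !alive[r.Executor] {
			nodes = append(nodes, r.Node)
		}
	}
	return nodes
}

func main() {
	slots := map[string]reservation{
		"s1": {Node: "a", Executor: "exec-1"}, // alive
		"s2": {Node: "b", Executor: "exec-2"}, // dead: Spark will recreate it
		"s3": {Node: "c"},                     // never bound
	}
	alive := map[string]bool{"exec-1": true}
	fmt.Println(unboundNodes(slots, []string{"s1", "s2", "s3"}, alive)) // [b c]
}
```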

func (*ResourceReservationManager) GetRemainingAllowedExecutorCount

func (rrm *ResourceReservationManager) GetRemainingAllowedExecutorCount(ctx context.Context, appID string, namespace string) (int, error)

GetRemainingAllowedExecutorCount returns the number of executors the application can still schedule.

func (*ResourceReservationManager) GetReservedResources

func (rrm *ResourceReservationManager) GetReservedResources() resources.NodeGroupResources

GetReservedResources returns the resources per node that are reserved for executors.

func (*ResourceReservationManager) GetResourceReservation

func (rrm *ResourceReservationManager) GetResourceReservation(appID string, namespace string) (*v1beta2.ResourceReservation, bool)

GetResourceReservation returns the resource reservation for the given application ID and namespace, if any.

func (*ResourceReservationManager) GetSoftResourceReservation added in v0.55.0

func (rrm *ResourceReservationManager) GetSoftResourceReservation(appID string) (*cache.SoftReservation, bool)

GetSoftResourceReservation returns the soft resource reservation for the given application ID, if any.

func (*ResourceReservationManager) PodHasReservation

func (rrm *ResourceReservationManager) PodHasReservation(ctx context.Context, pod *v1.Pod) bool

PodHasReservation reports whether the passed pod has any reservation, either a resource reservation or a soft reservation.

func (*ResourceReservationManager) ReserveForExecutorOnRescheduledNode

func (rrm *ResourceReservationManager) ReserveForExecutorOnRescheduledNode(ctx context.Context, executor *v1.Pod, node string) error

ReserveForExecutorOnRescheduledNode creates a reservation for the passed executor on the passed node by replacing another unbound reservation. This reservation could either be a resource reservation, or a soft reservation if dynamic allocation is enabled.

func (*ResourceReservationManager) ReserveForExecutorOnUnboundReservation

func (rrm *ResourceReservationManager) ReserveForExecutorOnUnboundReservation(ctx context.Context, executor *v1.Pod, node string) error

ReserveForExecutorOnUnboundReservation binds a resource reservation already tied to the passed node to the executor. This will only succeed if there are unbound reservations on that node.

type SparkPodLister

type SparkPodLister struct {
	corelisters.PodLister
	// contains filtered or unexported fields
}

SparkPodLister is a PodLister that can also list drivers per node selector.

func NewSparkPodLister

func NewSparkPodLister(delegate corelisters.PodLister, instanceGroupLabel string) *SparkPodLister

NewSparkPodLister creates and initializes a SparkPodLister.

func (SparkPodLister) ListEarlierDrivers

func (s SparkPodLister) ListEarlierDrivers(driver *v1.Pod) ([]*v1.Pod, error)

ListEarlierDrivers lists the drivers that came earlier than the given driver and have the same node selectors.

type SparkSchedulerExtender

type SparkSchedulerExtender struct {
	// contains filtered or unexported fields
}

SparkSchedulerExtender is a Kubernetes scheduler extender responsible for ensuring that a Spark driver and all of its executors can be scheduled together, given the resources currently available across the nodes.
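The "all or nothing" check can be illustrated with a single-resource packing sketch: place the driver, then fill nodes in order with executors, and succeed only if every executor fits. The `tightlyPack` function, the CPU-only model, and the node ordering are assumptions for illustration; the real binpack functions live in a separate binpack package and consider more than one resource.

```go
package main

import "fmt"

// tightlyPack places a driver and all executors at once, or nothing. free maps
// node -> free CPU; names fixes the visiting order. Each node is filled with as
// many executors as fit before moving on to the next.
func tightlyPack(names []string, free map[string]int, driverCPU, executorCPU, executors int) (string, []string, bool) {
	remaining := make(map[string]int, len(free))
	for k, v := range free {
		remaining[k] = v // work on a copy so a failed attempt changes nothing
	}
	driverNode := ""
	for _, n := range names {
		if remaining[n] >= driverCPU {
			driverNode = n
			remaining[n] -= driverCPU
			break
		}
	}
	if driverNode == "" {
		return "", nil, false
	}
	var placed []string
	for _, n := range names {
		for len(placed) < executors && remaining[n] >= executorCPU {
			placed = append(placed, n)
			remaining[n] -= executorCPU
		}
	}
	if len(placed) < executors {
		return "", nil, false // the application cannot be scheduled as a whole
	}
	return driverNode, placed, true
}

func main() {
	names := []string{"a", "b"}
	free := map[string]int{"a": 4, "b": 4}
	d, e, ok := tightlyPack(names, free, 1, 1, 5)
	fmt.Println(d, e, ok) // a [a a a b b] true
	_, _, ok = tightlyPack(names, free, 1, 2, 4)
	fmt.Println(ok) // false
}
```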

func NewExtender

func NewExtender(
	nodeLister corelisters.NodeLister,
	podLister *SparkPodLister,
	resourceReservations *cache.ResourceReservationCache,
	softReservationStore *cache.SoftReservationStore,
	resourceReservationManager *ResourceReservationManager,
	coreClient corev1.CoreV1Interface,
	demands *cache.SafeDemandCache,
	apiExtensionsClient apiextensionsclientset.Interface,
	isFIFO bool,
	fifoConfig config.FifoConfig,
	binpacker *Binpacker,
	shouldScheduleDynamicallyAllocatedExecutorsInSameAZ bool,
	overheadComputer *OverheadComputer,
	instanceGroupLabel string,
	nodeSorter *ns.NodeSorter,
	wasteMetricsReporter *metrics.WasteMetricsReporter) *SparkSchedulerExtender

NewExtender is responsible for creating and initializing a SparkSchedulerExtender.

func (*SparkSchedulerExtender) Predicate

Predicate is responsible for returning a filtered list of nodes that qualify to schedule the pod provided in the ExtenderArgs.

type UnschedulablePodMarker

type UnschedulablePodMarker struct {
	// contains filtered or unexported fields
}

UnschedulablePodMarker checks pending driver pods managed by the spark scheduler to see whether they could fit if the cluster were empty, and marks those that could not with a custom pod condition.

func NewUnschedulablePodMarker

func NewUnschedulablePodMarker(
	nodeLister corelisters.NodeLister,
	podLister corelisters.PodLister,
	coreClient corev1.CoreV1Interface,
	overheadComputer *OverheadComputer,
	binpacker *Binpacker,
	timeoutDuration time.Duration) *UnschedulablePodMarker

NewUnschedulablePodMarker creates a new UnschedulablePodMarker.

func (*UnschedulablePodMarker) DoesPodExceedClusterCapacity

func (u *UnschedulablePodMarker) DoesPodExceedClusterCapacity(ctx context.Context, driver *v1.Pod) (bool, error)

DoesPodExceedClusterCapacity checks whether the provided driver pod could ever fit in the cluster.
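The "could it ever fit" question can be sketched by packing against each node's full usable capacity instead of its current free capacity. This version assumes a hypothetical CPU-only model where usable capacity is allocatable minus overhead, lets executors go on any node, and ignores single-AZ constraints and the configured Binpacker; it is an illustration of the idea, not the marker's actual logic.

```go
package main

import "fmt"

// exceedsEmptyCluster reports whether a driver plus its executors could never
// be placed, even on an otherwise empty cluster. capacity holds each node's
// usable CPU; executorCPU must be positive.
func exceedsEmptyCluster(capacity []int, driverCPU, executorCPU, executors int) bool {
	for i, c := range capacity {
		if c < driverCPU {
			continue // the driver cannot go on this node
		}
		slots := 0
		for j, cj := range capacity {
			if j == i {
				cj -= driverCPU // leave room for the driver
			}
			slots += cj / executorCPU
		}
		if slots >= executors {
			return false // at least one placement exists
		}
	}
	return true
}

func main() {
	capacity := []int{8, 8}
	fmt.Println(exceedsEmptyCluster(capacity, 2, 4, 3), exceedsEmptyCluster(capacity, 2, 4, 4)) // false true
}
```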

func (*UnschedulablePodMarker) Start

func (u *UnschedulablePodMarker) Start(ctx context.Context)

Start starts periodic scanning for unschedulable applications.
