Documentation ¶
Index ¶
- Constants
- func DeleteDemandIfExists(ctx context.Context, cache *cache.SafeDemandCache, pod *v1.Pod, source string)
- func StartDemandGC(ctx context.Context, podInformer coreinformers.PodInformer, ...)
- type Binpacker
- type ClusterRequests
- type DemandGC
- type NodeRequests
- type OverheadComputer
- type PodRequestInfo
- type ResourceReservationManager
- func (rrm *ResourceReservationManager) CompactDynamicAllocationApplications(ctx context.Context)
- func (rrm *ResourceReservationManager) CreateReservations(ctx context.Context, driver *v1.Pod, ...) (*v1beta2.ResourceReservation, error)
- func (rrm *ResourceReservationManager) FindAlreadyBoundReservationNode(ctx context.Context, executor *v1.Pod) (string, bool, error)
- func (rrm *ResourceReservationManager) FindUnboundReservationNodes(ctx context.Context, executor *v1.Pod) ([]string, bool, error)
- func (rrm *ResourceReservationManager) GetRemainingAllowedExecutorCount(ctx context.Context, appID string, namespace string) (int, error)
- func (rrm *ResourceReservationManager) GetReservedResources() resources.NodeGroupResources
- func (rrm *ResourceReservationManager) GetResourceReservation(appID string, namespace string) (*v1beta2.ResourceReservation, bool)
- func (rrm *ResourceReservationManager) GetSoftResourceReservation(appID string) (*cache.SoftReservation, bool)
- func (rrm *ResourceReservationManager) PodHasReservation(ctx context.Context, pod *v1.Pod) bool
- func (rrm *ResourceReservationManager) ReserveForExecutorOnRescheduledNode(ctx context.Context, executor *v1.Pod, node string) error
- func (rrm *ResourceReservationManager) ReserveForExecutorOnUnboundReservation(ctx context.Context, executor *v1.Pod, node string) error
- type SparkPodLister
- type SparkSchedulerExtender
- type UnschedulablePodMarker
Constants ¶
const (
	// SingleAzTightlyPack will attempt to schedule pods within a single AZ.
	// Note that single-az-tightly-pack does not guarantee that ALL pods will be scheduled in the same AZ; please see
	// the SingleAzTightlyPack docs for more information (see also ShouldScheduleDynamicallyAllocatedExecutorsInSameAZ).
	SingleAzTightlyPack string = "single-az-tightly-pack"

	// SingleAzMinimalFragmentation tries to reduce spark app fragmentation by fitting executors on fewer hosts
	// when possible. Dynamically allocated executors are a bit more challenging, but generally speaking we will
	// attempt to schedule them on hosts already running executors belonging to the same app.
	SingleAzMinimalFragmentation string = "single-az-minimal-fragmentation"
)
Variables ¶
This section is empty.
Functions ¶
func DeleteDemandIfExists ¶
func DeleteDemandIfExists(ctx context.Context, cache *cache.SafeDemandCache, pod *v1.Pod, source string)
DeleteDemandIfExists removes a demand object if it exists, and emits an event tagged by the source of the deletion
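A minimal usage sketch, assuming this package is imported as extender and that the demand cache and pod come from the surrounding scheduler code; the helper name and the "pod-scheduled" source tag are illustrative:

// onPodScheduled is a hypothetical hook that cleans up the demand created for
// a pod once that pod has been bound to a node.
func onPodScheduled(ctx context.Context, demands *cache.SafeDemandCache, pod *v1.Pod) {
	// The source string only tags the deletion event that gets emitted.
	extender.DeleteDemandIfExists(ctx, demands, pod, "pod-scheduled")
}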
func StartDemandGC ¶
func StartDemandGC(ctx context.Context, podInformer coreinformers.PodInformer, demandCache *cache.SafeDemandCache)
StartDemandGC initializes the DemandGC which handles events in the background
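A hedged wiring sketch, assuming the pod informer and demand cache are built during scheduler startup and that StartDemandGC only registers its event handlers before returning:

// startDemandGC is a hypothetical startup step that launches the background
// demand garbage collector alongside the shared informers.
func startDemandGC(ctx context.Context, podInformer coreinformers.PodInformer, demands *cache.SafeDemandCache) {
	extender.StartDemandGC(ctx, podInformer, demands)
}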
Types ¶
type Binpacker ¶
type Binpacker struct {
	Name        string
	BinpackFunc binpack.SparkBinPackFunction
	IsSingleAz  bool
}
Binpacker is a BinpackFunc with a known name
func SelectBinpacker ¶
SelectBinpacker selects the binpack function from the given name
type ClusterRequests ¶
type ClusterRequests map[string]NodeRequests
ClusterRequests represents the pod requests in the cluster, indexed by node name
type DemandGC ¶
type DemandGC struct {
// contains filtered or unexported fields
}
DemandGC is a background pod event handler which deletes any demand we previously created for a pod once that pod gets scheduled. We also delete demands elsewhere in the extender when we schedule a pod, but those code paths can miss some demands due to race conditions.
type NodeRequests ¶
type NodeRequests map[types.UID]PodRequestInfo
NodeRequests represents the currently present pod requests on this node, indexed by pod uid
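Because PodRequestInfo's fields are unexported, callers mostly treat these maps as an index; a small sketch (the helper name is hypothetical) that counts tracked pod requests per node:

// countTrackedRequests is a hypothetical helper that tallies how many pod
// requests are currently tracked on each node.
func countTrackedRequests(cluster extender.ClusterRequests) map[string]int {
	counts := make(map[string]int, len(cluster))
	for nodeName, nodeRequests := range cluster {
		// nodeRequests is an extender.NodeRequests map keyed by pod UID.
		counts[nodeName] = len(nodeRequests)
	}
	return counts
}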
type OverheadComputer ¶
type OverheadComputer struct {
// contains filtered or unexported fields
}
OverheadComputer periodically computes the total resources used by pods that are not managed by the spark scheduler
func NewOverheadComputer ¶
func NewOverheadComputer(
	ctx context.Context,
	podInformer coreinformers.PodInformer,
	resourceReservationManager *ResourceReservationManager,
	nodeLister corelisters.NodeLister) *OverheadComputer
NewOverheadComputer creates a new OverheadComputer instance
func (OverheadComputer) GetNonSchedulableOverhead ¶
func (o OverheadComputer) GetNonSchedulableOverhead(ctx context.Context, nodes []*v1.Node) resources.NodeGroupResources
GetNonSchedulableOverhead fills non-schedulable overhead information for the given nodes. Non-schedulable overhead is overhead from pods that are running but do not have 'spark-scheduler' as their scheduler name.
func (OverheadComputer) GetOverhead ¶
func (o OverheadComputer) GetOverhead(ctx context.Context, nodes []*v1.Node) resources.NodeGroupResources
GetOverhead fills overhead information for given nodes.
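A sketch of how the two views might be combined, assuming an already constructed computer and node list; the helper name is hypothetical:

// overheadSummary is a hypothetical helper returning both the total overhead
// and the non-schedulable subset for the given nodes.
func overheadSummary(ctx context.Context, computer *extender.OverheadComputer, nodes []*v1.Node) (total, nonSchedulable resources.NodeGroupResources) {
	total = computer.GetOverhead(ctx, nodes)
	nonSchedulable = computer.GetNonSchedulableOverhead(ctx, nodes)
	return total, nonSchedulable
}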
type PodRequestInfo ¶
type PodRequestInfo struct {
// contains filtered or unexported fields
}
PodRequestInfo holds information about a pod and its requested resources
type ResourceReservationManager ¶
type ResourceReservationManager struct {
// contains filtered or unexported fields
}
ResourceReservationManager is a central point which manages the creation and reading of both resource reservations and soft reservations
func NewResourceReservationManager ¶
func NewResourceReservationManager(
	ctx context.Context,
	resourceReservations *cache.ResourceReservationCache,
	softReservationStore *cache.SoftReservationStore,
	podLister *SparkPodLister,
	informer coreinformers.PodInformer) *ResourceReservationManager
NewResourceReservationManager creates and returns a ResourceReservationManager
func (*ResourceReservationManager) CompactDynamicAllocationApplications ¶
func (rrm *ResourceReservationManager) CompactDynamicAllocationApplications(ctx context.Context)
CompactDynamicAllocationApplications compacts reservations for executors belonging to dynamic allocation applications by moving any soft reservations to resource reservations occupied by now-dead executors. This ensures we have relatively up-to-date resource reservation objects and report correctly on reserved usage.
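Compaction is something a caller would typically run on an interval; a hedged sketch (the loop and one-minute period are illustrative, not what the scheduler itself uses):

// runCompaction is a hypothetical loop that compacts reservations on an
// interval until the context is cancelled.
func runCompaction(ctx context.Context, rrm *extender.ResourceReservationManager) {
	ticker := time.NewTicker(time.Minute) // illustrative period
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			rrm.CompactDynamicAllocationApplications(ctx)
		}
	}
}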
func (*ResourceReservationManager) CreateReservations ¶
func (rrm *ResourceReservationManager) CreateReservations(
	ctx context.Context,
	driver *v1.Pod,
	applicationResources *sparkApplicationResources,
	driverNode string,
	executorNodes []string) (*v1beta2.ResourceReservation, error)
CreateReservations creates the necessary reservations for an application whether those are resource reservation objects or in-memory soft reservations for extra executors.
func (*ResourceReservationManager) FindAlreadyBoundReservationNode ¶
func (rrm *ResourceReservationManager) FindAlreadyBoundReservationNode(ctx context.Context, executor *v1.Pod) (string, bool, error)
FindAlreadyBoundReservationNode returns the name of a node previously allocated to this executor, if any, or false otherwise. Reservation binding has to be idempotent: binding the pod to the node in kube-scheduler might fail, so on a retry we want to place the same executor pod on the same node.
func (*ResourceReservationManager) FindUnboundReservationNodes ¶
func (rrm *ResourceReservationManager) FindUnboundReservationNodes(ctx context.Context, executor *v1.Pod) ([]string, bool, error)
FindUnboundReservationNodes returns a slice of node names that have unbound reservations for this Spark application. This includes both reservations we have not yet scheduled any executors on as well as reservations that have executors that are now dead. Spark will recreate lost executors, so the replacement executors should be placed on the reserved spaces of dead executors.
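Together these two lookups suggest the retry-safe order in which an executor is placed; a hedged sketch of that decision (the helper name is hypothetical and error handling is simplified):

// pickNodeForExecutor is a hypothetical helper that prefers the node an
// executor was already bound to (keeping retries idempotent) and otherwise
// falls back to any node holding an unbound reservation for the application.
func pickNodeForExecutor(ctx context.Context, rrm *extender.ResourceReservationManager, executor *v1.Pod) (string, bool) {
	if node, ok, err := rrm.FindAlreadyBoundReservationNode(ctx, executor); err == nil && ok {
		return node, true
	}
	if nodes, ok, err := rrm.FindUnboundReservationNodes(ctx, executor); err == nil && ok && len(nodes) > 0 {
		// A real caller would intersect these with the nodes kube-scheduler offered.
		return nodes[0], true
	}
	return "", false
}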
func (*ResourceReservationManager) GetRemainingAllowedExecutorCount ¶
func (rrm *ResourceReservationManager) GetRemainingAllowedExecutorCount(ctx context.Context, appID string, namespace string) (int, error)
GetRemainingAllowedExecutorCount returns the number of executors the application can still schedule.
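For example, a caller deciding whether to admit one more dynamically allocated executor might gate on this count; a minimal sketch with a hypothetical helper name:

// canAdmitExecutor reports whether the application may still schedule another executor.
func canAdmitExecutor(ctx context.Context, rrm *extender.ResourceReservationManager, appID, namespace string) (bool, error) {
	remaining, err := rrm.GetRemainingAllowedExecutorCount(ctx, appID, namespace)
	if err != nil {
		return false, err
	}
	return remaining > 0, nil
}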
func (*ResourceReservationManager) GetReservedResources ¶
func (rrm *ResourceReservationManager) GetReservedResources() resources.NodeGroupResources
GetReservedResources returns the resources per node that are reserved for executors.
func (*ResourceReservationManager) GetResourceReservation ¶
func (rrm *ResourceReservationManager) GetResourceReservation(appID string, namespace string) (*v1beta2.ResourceReservation, bool)
GetResourceReservation returns the resource reservation for the given application, if any.
func (*ResourceReservationManager) GetSoftResourceReservation ¶ added in v0.55.0
func (rrm *ResourceReservationManager) GetSoftResourceReservation(appID string) (*cache.SoftReservation, bool)
GetSoftResourceReservation returns the soft resource reservation for this appId
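A small sketch combining the two lookups, assuming an already constructed manager; the helper name is hypothetical:

// reservationKinds is a hypothetical helper reporting whether an application
// currently holds a resource reservation, a soft reservation, or both.
func reservationKinds(rrm *extender.ResourceReservationManager, appID, namespace string) (hasResourceReservation, hasSoftReservation bool) {
	_, hasResourceReservation = rrm.GetResourceReservation(appID, namespace)
	_, hasSoftReservation = rrm.GetSoftResourceReservation(appID)
	return hasResourceReservation, hasSoftReservation
}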
func (*ResourceReservationManager) PodHasReservation ¶
func (rrm *ResourceReservationManager) PodHasReservation(ctx context.Context, pod *v1.Pod) bool
PodHasReservation returns whether the passed pod has any reservation, whether it is a resource reservation or a soft reservation
func (*ResourceReservationManager) ReserveForExecutorOnRescheduledNode ¶
func (rrm *ResourceReservationManager) ReserveForExecutorOnRescheduledNode(ctx context.Context, executor *v1.Pod, node string) error
ReserveForExecutorOnRescheduledNode creates a reservation for the passed executor on the passed node by replacing another unbound reservation. This reservation could either be a resource reservation, or a soft reservation if dynamic allocation is enabled.
func (*ResourceReservationManager) ReserveForExecutorOnUnboundReservation ¶
func (rrm *ResourceReservationManager) ReserveForExecutorOnUnboundReservation(ctx context.Context, executor *v1.Pod, node string) error
ReserveForExecutorOnUnboundReservation binds a resource reservation already tied to the passed node to the executor. This will only succeed if there are unbound reservations on that node.
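Combined with the lookups above, a hedged sketch of how the reserve step might follow the node choice; the helper name and the boolean flag are hypothetical, and error handling is simplified:

// reserveExecutor is a hypothetical helper that reserves space for an executor
// on its chosen node, reusing an unbound reservation on that node when one
// exists and otherwise replacing an unbound reservation elsewhere.
func reserveExecutor(ctx context.Context, rrm *extender.ResourceReservationManager, executor *v1.Pod, node string, nodeHasUnboundReservation bool) error {
	if nodeHasUnboundReservation {
		return rrm.ReserveForExecutorOnUnboundReservation(ctx, executor, node)
	}
	return rrm.ReserveForExecutorOnRescheduledNode(ctx, executor, node)
}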
type SparkPodLister ¶
type SparkPodLister struct {
	corelisters.PodLister
	// contains filtered or unexported fields
}
SparkPodLister is a PodLister which can also list drivers per node selector
func NewSparkPodLister ¶
func NewSparkPodLister(delegate corelisters.PodLister, instanceGroupLabel string) *SparkPodLister
NewSparkPodLister creates and initializes a SparkPodLister
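Construction is a thin wrapper around an existing pod lister; a sketch assuming the delegate comes from a shared informer factory, with an illustrative (not real) instance-group label key:

// newSparkPodLister is a hypothetical construction helper.
func newSparkPodLister(delegate corelisters.PodLister) *extender.SparkPodLister {
	// The label key below is illustrative; use the cluster's instance-group label.
	return extender.NewSparkPodLister(delegate, "example.com/instance-group")
}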
func (SparkPodLister) ListEarlierDrivers ¶
ListEarlierDrivers lists drivers that are earlier than the given driver and have the same node selectors
type SparkSchedulerExtender ¶
type SparkSchedulerExtender struct {
// contains filtered or unexported fields
}
SparkSchedulerExtender is a kubernetes scheduler extender responsible for ensuring that a spark driver and all of its executors can be scheduled together given the current resources available across the nodes
func NewExtender ¶
func NewExtender(
	nodeLister corelisters.NodeLister,
	podLister *SparkPodLister,
	resourceReservations *cache.ResourceReservationCache,
	softReservationStore *cache.SoftReservationStore,
	resourceReservationManager *ResourceReservationManager,
	coreClient corev1.CoreV1Interface,
	demands *cache.SafeDemandCache,
	apiExtensionsClient apiextensionsclientset.Interface,
	isFIFO bool,
	fifoConfig config.FifoConfig,
	binpacker *Binpacker,
	shouldScheduleDynamicallyAllocatedExecutorsInSameAZ bool,
	overheadComputer *OverheadComputer,
	instanceGroupLabel string,
	nodeSorter *ns.NodeSorter,
	wasteMetricsReporter *metrics.WasteMetricsReporter) *SparkSchedulerExtender
NewExtender is responsible for creating and initializing a SparkSchedulerExtender
func (*SparkSchedulerExtender) Predicate ¶
func (s *SparkSchedulerExtender) Predicate(ctx context.Context, args schedulerapi.ExtenderArgs) *schedulerapi.ExtenderFilterResult
Predicate is responsible for returning a filtered list of nodes that qualify to schedule the pod provided in the ExtenderArgs
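The extender is meant to be called by kube-scheduler over HTTP; a hedged sketch of a filter endpoint that decodes ExtenderArgs and returns the filter result (the handler name and JSON wiring are illustrative, not the scheduler's actual server setup):

// filterHandler is a hypothetical HTTP handler exposing Predicate to kube-scheduler.
func filterHandler(s *extender.SparkSchedulerExtender) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var args schedulerapi.ExtenderArgs
		if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}
		result := s.Predicate(r.Context(), args)
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(result)
	}
}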
type UnschedulablePodMarker ¶
type UnschedulablePodMarker struct {
// contains filtered or unexported fields
}
UnschedulablePodMarker scans for pending driver pods managed by the spark scheduler, checks whether they could fit if the cluster were empty, and marks those that could not with a custom pod condition.
func NewUnschedulablePodMarker ¶
func NewUnschedulablePodMarker(
	nodeLister corelisters.NodeLister,
	podLister corelisters.PodLister,
	coreClient corev1.CoreV1Interface,
	overheadComputer *OverheadComputer,
	binpacker *Binpacker,
	timeoutDuration time.Duration) *UnschedulablePodMarker
NewUnschedulablePodMarker creates a new UnschedulablePodMarker
func (*UnschedulablePodMarker) DoesPodExceedClusterCapacity ¶
func (u *UnschedulablePodMarker) DoesPodExceedClusterCapacity(ctx context.Context, driver *v1.Pod) (bool, error)
DoesPodExceedClusterCapacity checks if the provided driver pod could ever fit to the cluster
func (*UnschedulablePodMarker) Start ¶
func (u *UnschedulablePodMarker) Start(ctx context.Context)
Start starts periodic scanning for unschedulable applications
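A sketch of wiring the marker into scheduler startup, assuming the listers, client, overhead computer, and binpacker are built elsewhere; the helper name and the time.Minute value passed as timeoutDuration are illustrative:

// startUnschedulableMarker is a hypothetical startup step: build the marker
// and run its periodic scan in the background.
func startUnschedulableMarker(
	ctx context.Context,
	nodeLister corelisters.NodeLister,
	podLister corelisters.PodLister,
	coreClient corev1.CoreV1Interface,
	overhead *extender.OverheadComputer,
	binpacker *extender.Binpacker) {
	marker := extender.NewUnschedulablePodMarker(nodeLister, podLister, coreClient, overhead, binpacker, time.Minute)
	go marker.Start(ctx)
}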