Documentation ¶
Overview ¶
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- func CanIgnoreAPIError(err error) bool
- func FilterPods(pods []v1.Pod, phase v1.PodPhase) int
- func GenerateAppWrapperCondition(condType arbv1.AppWrapperConditionType, condStatus corev1.ConditionStatus, ...) arbv1.AppWrapperCondition
- func GeneratePodFailedCondition(podName string, podCondition []v1.PodCondition) arbv1.PendingPodSpec
- func GetPodResourcesByPhase(phase v1.PodPhase, pods []v1.Pod) *clusterstateapi.Resource
- func GetQueueJobKey(obj interface{}) (string, error)
- func GetXQJFullName(qj *arbv1.AppWrapper) string
- func HigherSystemPriorityQJ(qj1, qj2 *arbv1.AppWrapper) bool
- func IsJsonSyntaxError(err error) bool
- func PendingPodsFailedSchd(pods []v1.Pod) map[string][]v1.PodCondition
- type EtcdErrorClassifier
- type Heap
- func (h *Heap) Add(obj *arbv1.AppWrapper) error
- func (h *Heap) AddIfNotPresent(obj *arbv1.AppWrapper) error
- func (h *Heap) BulkAdd(list []*arbv1.AppWrapper) error
- func (h *Heap) Delete(obj interface{}) error
- func (h *Heap) Get(obj interface{}) (interface{}, bool, error)
- func (h *Heap) GetByKey(key string) (interface{}, bool, error)
- func (h *Heap) List() []interface{}
- func (h *Heap) Pop() (interface{}, error)
- func (h *Heap) Update(obj *arbv1.AppWrapper) error
- type JobAndClusterAgent
- type KeyFunc
- type LessFunc
- type PriorityQueue
- func (p *PriorityQueue) Add(qj *qjobv1.AppWrapper) error
- func (p *PriorityQueue) AddIfNotPresent(qj *qjobv1.AppWrapper) error
- func (p *PriorityQueue) AddUnschedulableIfNotPresent(qj *qjobv1.AppWrapper) error
- func (p *PriorityQueue) Delete(qj *qjobv1.AppWrapper) error
- func (p *PriorityQueue) IfExist(qj *qjobv1.AppWrapper) bool
- func (p *PriorityQueue) IfExistActiveQ(qj *qjobv1.AppWrapper) bool
- func (p *PriorityQueue) IfExistUnschedulableQ(qj *qjobv1.AppWrapper) bool
- func (p *PriorityQueue) Length() int
- func (p *PriorityQueue) MoveAllToActiveQueue()
- func (p *PriorityQueue) MoveToActiveQueueIfExists(aw *qjobv1.AppWrapper) error
- func (p *PriorityQueue) Pop() (*qjobv1.AppWrapper, error)
- func (p *PriorityQueue) Update(oldQJ, newQJ *qjobv1.AppWrapper) error
- type SchedulingQueue
- type UnschedulableQJMap
- type UnschedulableQJs
- type UnschedulableQueueJobs
- type XController
- func (cc *XController) Cleanup(ctx context.Context, appwrapper *arbv1.AppWrapper) error
- func (qjm *XController) GetAggregatedResources(cqj *arbv1.AppWrapper) *clusterstateapi.Resource
- func (qjm *XController) GetAggregatedResourcesPerGenericItem(cqj *arbv1.AppWrapper) []*clusterstateapi.Resource
- func (qjm *XController) GetQueueJobEligibleForPreemption(value *arbv1.AppWrapper) *arbv1.AppWrapper
- func (cc *XController) IsActiveAppWrapper(name, namespace string) bool
- func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper)
- func (cc *XController) Run(stopCh <-chan struct{})
- func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper)
- func (qjm *XController) UpdateAgent()
- func (qjm *XController) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) error
- func (qjm *XController) UpdateQueueJobs(newjob *arbv1.AppWrapper)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CanIgnoreAPIError ¶ added in v1.33.0
func FilterPods ¶ added in v1.34.0
filterPods returns pods based on their phase.
func GenerateAppWrapperCondition ¶
func GenerateAppWrapperCondition(condType arbv1.AppWrapperConditionType, condStatus corev1.ConditionStatus, condReason string, condMsg string) arbv1.AppWrapperCondition
GenerateAppWrapperCondition returns condition of a AppWrapper condition.
func GeneratePodFailedCondition ¶ added in v1.34.0
func GeneratePodFailedCondition(podName string, podCondition []v1.PodCondition) arbv1.PendingPodSpec
GeneratePodFailedCondition returns condition of a AppWrapper condition.
func GetPodResourcesByPhase ¶ added in v1.34.0
GetPodResourcesByPhase returns pods based on their phase.
func GetQueueJobKey ¶
func GetXQJFullName ¶
func GetXQJFullName(qj *arbv1.AppWrapper) string
func HigherSystemPriorityQJ ¶
func HigherSystemPriorityQJ(qj1, qj2 *arbv1.AppWrapper) bool
func IsJsonSyntaxError ¶ added in v1.33.0
func PendingPodsFailedSchd ¶ added in v1.34.0
func PendingPodsFailedSchd(pods []v1.Pod) map[string][]v1.PodCondition
PendingPodsFailedSchd checks if pods pending have failed scheduling
Types ¶
type EtcdErrorClassifier ¶ added in v1.33.0
type EtcdErrorClassifier struct { }
type Heap ¶
type Heap struct {
// contains filtered or unexported fields
}
Heap is a thread-safe producer/consumer queue that implements a heap data structure. It can be used to implement priority queues and similar data structures.
func (*Heap) Add ¶
func (h *Heap) Add(obj *arbv1.AppWrapper) error
Add inserts an item, and puts it in the queue. The item is updated if it already exists.
func (*Heap) AddIfNotPresent ¶
func (h *Heap) AddIfNotPresent(obj *arbv1.AppWrapper) error
AddIfNotPresent inserts an item, and puts it in the queue. If an item with the key is present in the map, no changes is made to the item.
func (*Heap) BulkAdd ¶
func (h *Heap) BulkAdd(list []*arbv1.AppWrapper) error
BulkAdd adds all the items in the list to the queue.
type JobAndClusterAgent ¶
type JobAndClusterAgent struct {
// contains filtered or unexported fields
}
type LessFunc ¶
type LessFunc func(qj1, qj2 *arbv1.AppWrapper) bool
type PriorityQueue ¶
type PriorityQueue struct {
// contains filtered or unexported fields
}
PriorityQueue implements a scheduling queue. It is an alternative to FIFO. The head of PriorityQueue is the highest priority pending QJ. This structure has two sub queues. One sub-queue holds QJ that are being considered for scheduling. This is called activeQ and is a Heap. Another queue holds pods that are already tried and are determined to be unschedulable. The latter is called unschedulableQ. Heap is already thread safe, but we need to acquire another lock here to ensure atomicity of operations on the two data structures..
func NewPriorityQueue ¶
func NewPriorityQueue() *PriorityQueue
func (*PriorityQueue) Add ¶
func (p *PriorityQueue) Add(qj *qjobv1.AppWrapper) error
Add adds a QJ to the active queue. It should be called only when a new QJ is added so there is no chance the QJ is already in either queue.
func (*PriorityQueue) AddIfNotPresent ¶
func (p *PriorityQueue) AddIfNotPresent(qj *qjobv1.AppWrapper) error
AddIfNotPresent adds a pod to the active queue if it is not present in any of the two queues. If it is present in any, it doesn't do any thing. used by queuejob_controller_ex.go
func (*PriorityQueue) AddUnschedulableIfNotPresent ¶
func (p *PriorityQueue) AddUnschedulableIfNotPresent(qj *qjobv1.AppWrapper) error
AddUnschedulableIfNotPresent does nothing if the pod is present in either queue. Otherwise it adds the pod to the unschedulable queue if p.receivedMoveRequest is false, and to the activeQ if p.receivedMoveRequest is true. used by queuejob_controller_ex.go
func (*PriorityQueue) Delete ¶
func (p *PriorityQueue) Delete(qj *qjobv1.AppWrapper) error
Delete deletes the item from either of the two queues. It assumes the pod is only in one queue. used by queuejob_controller_ex.go
func (*PriorityQueue) IfExist ¶
func (p *PriorityQueue) IfExist(qj *qjobv1.AppWrapper) bool
func (*PriorityQueue) IfExistActiveQ ¶
func (p *PriorityQueue) IfExistActiveQ(qj *qjobv1.AppWrapper) bool
used by queuejob_controller_ex.go
func (*PriorityQueue) IfExistUnschedulableQ ¶
func (p *PriorityQueue) IfExistUnschedulableQ(qj *qjobv1.AppWrapper) bool
used by queuejob_controller_ex.go
func (*PriorityQueue) Length ¶
func (p *PriorityQueue) Length() int
func (*PriorityQueue) MoveAllToActiveQueue ¶
func (p *PriorityQueue) MoveAllToActiveQueue()
MoveAllToActiveQueue moves all pods from unschedulableQ to activeQ. This function adds all pods and then signals the condition variable to ensure that if Pop() is waiting for an item, it receives it after all the pods are in the queue and the head is the highest priority pod. TODO(bsalamat): We should add a back-off mechanism here so that a high priority pod which is unschedulable does not go to the head of the queue frequently. For example in a cluster where a lot of pods being deleted, such a high priority pod can deprive other pods from getting scheduled.
func (*PriorityQueue) MoveToActiveQueueIfExists ¶
func (p *PriorityQueue) MoveToActiveQueueIfExists(aw *qjobv1.AppWrapper) error
Move QJ from unschedulableQ to activeQ if exists used by queuejob_controller_ex.go
func (*PriorityQueue) Pop ¶
func (p *PriorityQueue) Pop() (*qjobv1.AppWrapper, error)
Pop removes the head of the active queue and returns it. It blocks if the activeQ is empty and waits until a new item is added to the queue. It also clears receivedMoveRequest to mark the beginning of a new scheduling cycle. used by queuejob_controller_ex.go
func (*PriorityQueue) Update ¶
func (p *PriorityQueue) Update(oldQJ, newQJ *qjobv1.AppWrapper) error
Update updates a pod in the active queue if present. Otherwise, it removes the item from the unschedulable queue and adds the updated one to the active queue.
type SchedulingQueue ¶
type SchedulingQueue interface { Add(qj *qjobv1.AppWrapper) error AddIfNotPresent(qj *qjobv1.AppWrapper) error AddUnschedulableIfNotPresent(qj *qjobv1.AppWrapper) error Pop() (*qjobv1.AppWrapper, error) Update(oldQJ, newQJ *qjobv1.AppWrapper) error Delete(QJ *qjobv1.AppWrapper) error MoveToActiveQueueIfExists(QJ *qjobv1.AppWrapper) error MoveAllToActiveQueue() IfExist(QJ *qjobv1.AppWrapper) bool IfExistActiveQ(QJ *qjobv1.AppWrapper) bool IfExistUnschedulableQ(QJ *qjobv1.AppWrapper) bool Length() int }
SchedulingQueue is an interface for a queue to store pods waiting to be scheduled. The interface follows a pattern similar to cache.FIFO and cache.Heap and makes it easy to use those data structures as a SchedulingQueue.
func NewSchedulingQueue ¶
func NewSchedulingQueue() SchedulingQueue
NewSchedulingQueue initializes a new scheduling queue. If pod priority is enabled a priority queue is returned. If it is disabled, a FIFO is returned.
type UnschedulableQJMap ¶
type UnschedulableQJMap struct {
// contains filtered or unexported fields
}
UnschedulablePodsMap holds pods that cannot be scheduled. This data structure is used to implement unschedulableQ.
func (*UnschedulableQJMap) Add ¶
func (u *UnschedulableQJMap) Add(pod *qjobv1.AppWrapper)
Add adds a pod to the unschedulable pods.
func (*UnschedulableQJMap) Clear ¶
func (u *UnschedulableQJMap) Clear()
Clear removes all the entries from the unschedulable maps.
func (*UnschedulableQJMap) Delete ¶
func (u *UnschedulableQJMap) Delete(pod *qjobv1.AppWrapper)
Delete deletes a pod from the unschedulable pods.
func (*UnschedulableQJMap) Get ¶
func (u *UnschedulableQJMap) Get(pod *qjobv1.AppWrapper) *qjobv1.AppWrapper
Get returns the pod if a pod with the same key as the key of the given "pod" is found in the map. It returns nil otherwise.
func (*UnschedulableQJMap) Update ¶
func (u *UnschedulableQJMap) Update(pod *qjobv1.AppWrapper)
Update updates a pod in the unschedulable pods.
type UnschedulableQJs ¶
type UnschedulableQJs interface { Add(p *qjobv1.AppWrapper) Delete(p *qjobv1.AppWrapper) Update(p *qjobv1.AppWrapper) Get(p *qjobv1.AppWrapper) *qjobv1.AppWrapper Clear() }
UnschedulablePods is an interface for a queue that is used to keep unschedulable pods. These pods are not actively reevaluated for scheduling. They are moved to the active scheduling queue on certain events, such as termination of a pod in the cluster, addition of nodes, etc.
type UnschedulableQueueJobs ¶
type UnschedulableQueueJobs interface { Add(pod *qjobv1.AppWrapper) Delete(pod *qjobv1.AppWrapper) Update(pod *qjobv1.AppWrapper) Get(pod *qjobv1.AppWrapper) *qjobv1.AppWrapper Clear() }
type XController ¶
type XController struct {
// contains filtered or unexported fields
}
XController the AppWrapper Controller type
func NewJobController ¶
func NewJobController(restConfig *rest.Config, mcadConfig *config.MCADConfiguration, extConfig *config.MCADConfigurationExtended) *XController
NewJobController create new AppWrapper Controller
func (*XController) Cleanup ¶
func (cc *XController) Cleanup(ctx context.Context, appwrapper *arbv1.AppWrapper) error
Cleanup function
func (*XController) GetAggregatedResources ¶
func (qjm *XController) GetAggregatedResources(cqj *arbv1.AppWrapper) *clusterstateapi.Resource
func (*XController) GetAggregatedResourcesPerGenericItem ¶
func (qjm *XController) GetAggregatedResourcesPerGenericItem(cqj *arbv1.AppWrapper) []*clusterstateapi.Resource
func (*XController) GetQueueJobEligibleForPreemption ¶ added in v1.34.1
func (qjm *XController) GetQueueJobEligibleForPreemption(value *arbv1.AppWrapper) *arbv1.AppWrapper
func (*XController) IsActiveAppWrapper ¶ added in v1.33.0
func (cc *XController) IsActiveAppWrapper(name, namespace string) bool
func (*XController) PreemptQueueJobs ¶
func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper)
TODO: We can use informer to filter AWs that do not meet the minScheduling spec. we still need a thread for dispatch duration but minScheduling spec can definetly be moved to an informer
func (*XController) Run ¶
func (cc *XController) Run(stopCh <-chan struct{})
Run starts AppWrapper Controller
func (*XController) ScheduleNext ¶
func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper)
Thread to find queue-job(QJ) for next schedule
func (*XController) UpdateAgent ¶
func (qjm *XController) UpdateAgent()
func (*XController) UpdateQueueJobStatus ¶ added in v1.34.0
func (qjm *XController) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) error
UpdateQueueJobStatus was part of pod informer, this is now a method of queuejob_controller file. This change is done in an effort to simplify the controller and enable to move to controller runtime.
func (*XController) UpdateQueueJobs ¶
func (qjm *XController) UpdateQueueJobs(newjob *arbv1.AppWrapper)
Move AW from Running to Completed or RunningHoldCompletion Do not use event queues! Running AWs move to Completed, from which it will never transition to any other state. State transition: Running->RunningHoldCompletion->Completed