Documentation ¶
Overview ¶
Copyright 2019 The Kubeflow Authors
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Variables
- func AddResourceList(list, req, limit v1.ResourceList)
- func CalcPGMinResources(minMember int32, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec, ...) *v1.ResourceList
- func GenGeneralName(jobName string, rtype string, index string) string
- func MaxInt(x, y int) int
- func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() error
- type FillPodGroupSpecFunc
- type GangScheduler
- type GangSchedulingSetupFunc
- type JobController
- func (jc *JobController) AddPod(obj interface{})
- func (jc *JobController) AddService(obj interface{})
- func (jc *JobController) CleanupJob(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus, job interface{}) error
- func (jc *JobController) CreateNewService(job metav1.Object, rtype apiv1.ReplicaType, spec *apiv1.ReplicaSpec, ...) error
- func (jc *JobController) DeletePdb(job metav1.Object) error
- func (jc *JobController) DeletePod(obj interface{})
- func (jc *JobController) DeletePodGroup(job metav1.Object) error
- func (jc *JobController) DeletePodsAndServices(runPolicy *apiv1.RunPolicy, job interface{}, pods []*corev1.Pod) error
- func (jc *JobController) DeleteService(obj interface{})
- func (jc *JobController) FilterPodsForReplicaType(pods []*v1.Pod, replicaType string) ([]*v1.Pod, error)
- func (jc *JobController) FilterServicesForReplicaType(services []*v1.Service, replicaType string) ([]*v1.Service, error)
- func (jc *JobController) GenLabels(jobName string) map[string]string
- func (jc *JobController) GenOwnerReference(obj metav1.Object) *metav1.OwnerReference
- func (jc *JobController) GetPodSlices(pods []*v1.Pod, replicas int, logger *log.Entry) [][]*v1.Pod
- func (jc *JobController) GetPodsForJob(jobObject interface{}) ([]*v1.Pod, error)
- func (jc *JobController) GetPortsFromJob(spec *apiv1.ReplicaSpec) (map[string]int32, error)
- func (jc *JobController) GetServiceSlices(services []*v1.Service, replicas int, logger *log.Entry) [][]*v1.Service
- func (jc *JobController) GetServicesForJob(jobObject interface{}) ([]*v1.Service, error)
- func (jc *JobController) PastActiveDeadline(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus) bool
- func (jc *JobController) PastBackoffLimit(jobName string, runPolicy *apiv1.RunPolicy, ...) (bool, error)
- func (jc *JobController) ReconcileJobs(job interface{}, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec, ...) error
- func (jc *JobController) ReconcilePods(job interface{}, jobStatus *apiv1.JobStatus, pods []*v1.Pod, ...) error
- func (jc *JobController) ReconcileServices(job metav1.Object, services []*v1.Service, rtype apiv1.ReplicaType, ...) error
- func (jc *JobController) ResetExpectations(jobKey string, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec)
- func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32) (*policyapi.PodDisruptionBudget, error)
- func (jc *JobController) SyncPodGroup(job metav1.Object, specFunc FillPodGroupSpecFunc) (metav1.Object, error)
- func (jc *JobController) UpdatePod(old, cur interface{})
- func (jc *JobController) UpdateService(old, cur interface{})
- type JobControllerConfiguration
- type PriorityClassGetFunc
- type ReplicaPriority
- type ReplicasPriority
Constants ¶
This section is empty.
Variables ¶
var GenNonGangSchedulerSetupFunc = func() GangSchedulingSetupFunc { return func(jc *JobController) { jc.Config.GangScheduling = GangSchedulerNone jc.PodGroupControl = nil } }
var GenSchedulerPluginsSetupFunc = func(c client.Client) GangSchedulingSetupFunc { return func(jc *JobController) { jc.Config.GangScheduling = GangSchedulerSchedulerPlugins jc.PodGroupControl = control.NewSchedulerPluginsControl(c) } }
var GenVolcanoSetupFunc = func(vci volcanoclient.Interface) GangSchedulingSetupFunc { return func(jc *JobController) { jc.Config.GangScheduling = GangSchedulerVolcano jc.PodGroupControl = control.NewVolcanoControl(vci) } }
var ( // KeyFunc is the short name to DeletionHandlingMetaNamespaceKeyFunc. // IndexerInformer uses a delta queue, therefore for deletes we have to use this // key function but it should be just fine for non delete events. KeyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc )
Functions ¶
func AddResourceList ¶
func AddResourceList(list, req, limit v1.ResourceList)
func CalcPGMinResources ¶
func CalcPGMinResources(minMember int32, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec, pcGetFunc PriorityClassGetFunc) *v1.ResourceList
func RecheckDeletionTimestamp ¶
RecheckDeletionTimestamp returns a CanAdopt() function to recheck deletion.
The CanAdopt() function calls getObject() to fetch the latest value, and denies adoption attempts if that object has a non-nil DeletionTimestamp.
Types ¶
type FillPodGroupSpecFunc ¶
type GangScheduler ¶
type GangScheduler string
const ( GangSchedulerNone GangScheduler = "None" GangSchedulerVolcano GangScheduler = "volcano" GangSchedulerSchedulerPlugins GangScheduler = "scheduler-plugins" GangSchedulerSchedulerPluginsSecond GangScheduler = "scheduler-plugins" GangSchedulerSchedulerPluginsDefault GangScheduler = "default-scheduler" )
type GangSchedulingSetupFunc ¶
type GangSchedulingSetupFunc func(jc *JobController)
type JobController ¶
type JobController struct { Controller apiv1.ControllerInterface Config JobControllerConfiguration // PodControl is used to add or delete pods. PodControl control.PodControlInterface // ServiceControl is used to add or delete services. ServiceControl control.ServiceControlInterface // KubeClientSet is a standard kubernetes clientset. KubeClientSet kubeclientset.Interface // PodGroupControl is used to add or delete PodGroup. PodGroupControl control.PodGroupControlInterface // PodLister can list/get pods from the shared informer's store. PodLister corelisters.PodLister // ServiceLister can list/get services from the shared informer's store. ServiceLister corelisters.ServiceLister // PriorityClassLister can list/get priorityClasses from the shared informer's store. PriorityClassLister schedulinglisters.PriorityClassLister // PodInformerSynced returns true if the pod store has been synced at least once. PodInformerSynced cache.InformerSynced // ServiceInformerSynced returns true if the service store has been synced at least once. ServiceInformerSynced cache.InformerSynced // PriorityClassInformerSynced returns true if the priority class store has been synced at least once. PriorityClassInformerSynced cache.InformerSynced // A TTLCache of pod/services creates/deletes each job expects to see // We use Job namespace/name + ReplicaType + pods/services as an expectation key, // For example, there is a TFJob with namespace "tf-operator" and name "tfjob-abc": // { // "PS": { // "Replicas": 2, // }, // "Worker": { // "Replicas": 4, // } // } // We will create 4 expectations: // - "tf-operator/tfjob-abc/ps/services", expects 2 adds. // - "tf-operator/tfjob-abc/ps/pods", expects 2 adds. // - "tf-operator/tfjob-abc/worker/services", expects 4 adds. // - "tf-operator/tfjob-abc/worker/pods", expects 4 adds. Expectations expectation.ControllerExpectationsInterface // WorkQueue is a rate limited work queue. This is used to queue work to be // processed instead of performing it as soon as a change happens. This // means we can ensure we only process a fixed amount of resources at a // time, and makes it easy to ensure we are never processing the same item // simultaneously in two different workers. WorkQueue workqueue.RateLimitingInterface // Recorder is an event recorder for recording Event resources to the // Kubernetes API. Recorder record.EventRecorder }
JobController abstracts other operators to manage the lifecycle of Jobs. User need to first implement the ControllerInterface(objectA) and then initialize a JobController(objectB) struct with objectA as the parameter. And then call objectB.ReconcileJobs as mentioned below, the ReconcileJobs method is the entrypoint to trigger the reconcile logic of the job controller
ReconcileJobs(
job interface{}, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec, jobStatus apiv1.JobStatus, runPolicy *apiv1.RunPolicy) error
func NewJobController ¶
func NewJobController( controllerImpl apiv1.ControllerInterface, reconcilerSyncPeriod metav1.Duration, kubeClientSet kubeclientset.Interface, setupPodGroup GangSchedulingSetupFunc, kubeInformerFactory kubeinformers.SharedInformerFactory, workQueueName string) JobController
func (*JobController) AddPod ¶
func (jc *JobController) AddPod(obj interface{})
When a pod is created, enqueue the job that manages it and update its expectations.
func (*JobController) AddService ¶
func (jc *JobController) AddService(obj interface{})
When a service is created, enqueue the controller that manages it and update its expectations.
func (*JobController) CleanupJob ¶
func (*JobController) CreateNewService ¶
func (jc *JobController) CreateNewService(job metav1.Object, rtype apiv1.ReplicaType, spec *apiv1.ReplicaSpec, index string) error
createNewService creates a new service for the given index and type.
func (*JobController) DeletePod ¶
func (jc *JobController) DeletePod(obj interface{})
When a pod is deleted, enqueue the job that manages the pod and update its expectations. obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
func (*JobController) DeletePodGroup ¶
func (jc *JobController) DeletePodGroup(job metav1.Object) error
func (*JobController) DeletePodsAndServices ¶
func (*JobController) DeleteService ¶
func (jc *JobController) DeleteService(obj interface{})
When a service is deleted, enqueue the job that manages the service and update its expectations. obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item.
func (*JobController) FilterPodsForReplicaType ¶
func (jc *JobController) FilterPodsForReplicaType(pods []*v1.Pod, replicaType string) ([]*v1.Pod, error)
FilterPodsForReplicaType returns pods belong to a replicaType.
func (*JobController) FilterServicesForReplicaType ¶
func (jc *JobController) FilterServicesForReplicaType(services []*v1.Service, replicaType string) ([]*v1.Service, error)
FilterServicesForReplicaType returns service belong to a replicaType.
func (*JobController) GenLabels ¶
func (jc *JobController) GenLabels(jobName string) map[string]string
func (*JobController) GenOwnerReference ¶
func (jc *JobController) GenOwnerReference(obj metav1.Object) *metav1.OwnerReference
func (*JobController) GetPodSlices ¶
getPodSlices returns a slice, which element is the slice of pod. It gives enough information to caller to make decision to up/down scale resources.
func (*JobController) GetPodsForJob ¶
func (jc *JobController) GetPodsForJob(jobObject interface{}) ([]*v1.Pod, error)
getPodsForJob returns the set of pods that this job should manage. It also reconciles ControllerRef by adopting/orphaning. Note that the returned Pods are pointers into the cache.
func (*JobController) GetPortsFromJob ¶
func (jc *JobController) GetPortsFromJob(spec *apiv1.ReplicaSpec) (map[string]int32, error)
GetPortsFromJob gets the ports of job container. Port could be nil, if distributed communication strategy doesn't need and no other ports that need to be exposed.
func (*JobController) GetServiceSlices ¶
func (jc *JobController) GetServiceSlices(services []*v1.Service, replicas int, logger *log.Entry) [][]*v1.Service
GetServiceSlices returns a slice, which element is the slice of service. Assume the return object is serviceSlices, then serviceSlices[i] is an array of pointers to services corresponding to Services for replica i.
func (*JobController) GetServicesForJob ¶
func (jc *JobController) GetServicesForJob(jobObject interface{}) ([]*v1.Service, error)
getServicesForJob returns the set of services that this job should manage. It also reconciles ControllerRef by adopting/orphaning. Note that the returned services are pointers into the cache.
func (*JobController) PastActiveDeadline ¶
func (jc *JobController) PastActiveDeadline(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus) bool
PastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
func (*JobController) PastBackoffLimit ¶
func (jc *JobController) PastBackoffLimit(jobName string, runPolicy *apiv1.RunPolicy, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec, pods []*corev1.Pod) (bool, error)
PastBackoffLimit checks if container restartCounts sum exceeds BackoffLimit this method applies only to pods when restartPolicy is one of OnFailure, Always or ExitCode
func (*JobController) ReconcileJobs ¶
func (jc *JobController) ReconcileJobs( job interface{}, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec, jobStatus apiv1.JobStatus, runPolicy *apiv1.RunPolicy) error
ReconcileJobs checks and updates replicas for each given ReplicaSpec. It will requeue the job in case of an error while creating/deleting pods/services.
func (*JobController) ReconcilePods ¶
func (jc *JobController) ReconcilePods( job interface{}, jobStatus *apiv1.JobStatus, pods []*v1.Pod, rType apiv1.ReplicaType, spec *apiv1.ReplicaSpec, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) error
ReconcilePods checks and updates pods for each given ReplicaSpec. It will requeue the job in case of an error while creating/deleting pods.
func (*JobController) ReconcileServices ¶
func (jc *JobController) ReconcileServices( job metav1.Object, services []*v1.Service, rtype apiv1.ReplicaType, spec *apiv1.ReplicaSpec) error
reconcileServices checks and updates services for each given ReplicaSpec. It will requeue the job in case of an error while creating/deleting services.
func (*JobController) ResetExpectations ¶
func (jc *JobController) ResetExpectations(jobKey string, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec)
ResetExpectations reset the expectation for creates and deletes of pod/service to zero.
func (*JobController) SyncPdb ¶
func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32) (*policyapi.PodDisruptionBudget, error)
SyncPdb will create a PDB for gang scheduling.
func (*JobController) SyncPodGroup ¶
func (jc *JobController) SyncPodGroup(job metav1.Object, specFunc FillPodGroupSpecFunc) (metav1.Object, error)
func (*JobController) UpdatePod ¶
func (jc *JobController) UpdatePod(old, cur interface{})
When a pod is updated, figure out what job is managing it and wake it up. If the labels of the pod have changed we need to awaken both the old and new replica set. old and cur must be *v1.Pod types.
func (*JobController) UpdateService ¶
func (jc *JobController) UpdateService(old, cur interface{})
When a service is updated, figure out what job/s manage it and wake them up. If the labels of the service have changed we need to awaken both the old and new replica set. old and cur must be *v1.Service types.
type JobControllerConfiguration ¶
type JobControllerConfiguration struct { // GangScheduling choice: None, volcano and scheduler-plugins GangScheduling GangScheduler }
JobControllerConfiguration contains configuration of operator.
func (*JobControllerConfiguration) EnableGangScheduling ¶
func (c *JobControllerConfiguration) EnableGangScheduling() bool
type PriorityClassGetFunc ¶
type PriorityClassGetFunc func(string) (*schedulingv1.PriorityClass, error)
type ReplicaPriority ¶
type ReplicaPriority struct { apiv1.ReplicaSpec // contains filtered or unexported fields }
type ReplicasPriority ¶
type ReplicasPriority []ReplicaPriority
ReplicasPriority is a slice of ReplicaPriority.
func (ReplicasPriority) Len ¶
func (p ReplicasPriority) Len() int
func (ReplicasPriority) Less ¶
func (p ReplicasPriority) Less(i, j int) bool
func (ReplicasPriority) Swap ¶
func (p ReplicasPriority) Swap(i, j int)