common

package
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 26, 2021 License: Apache-2.0 Imports: 35 Imported by: 3

Documentation

Overview

Copyright 2019 The Kubeflow Authors

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

This section is empty.

Variables

View Source
var (
	// KeyFunc is the short name to DeletionHandlingMetaNamespaceKeyFunc.
	// IndexerInformer uses a delta queue, therefore for deletes we have to use this
	// key function but it should be just fine for non delete events.
	KeyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc
)

Functions

func GenGeneralName

func GenGeneralName(jobName, rtype, index string) string

func MaxInt

func MaxInt(x, y int) int

func RecheckDeletionTimestamp

func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() error

RecheckDeletionTimestamp returns a CanAdopt() function to recheck deletion.

The CanAdopt() function calls getObject() to fetch the latest value, and denies adoption attempts if that object has a non-nil DeletionTimestamp.

Types

type JobController

type JobController struct {
	Controller apiv1.ControllerInterface

	Config JobControllerConfiguration

	// podControl is used to add or delete pods.
	PodControl control.PodControlInterface

	// serviceControl is used to add or delete services.
	ServiceControl control.ServiceControlInterface

	// KubeClientSet is a standard kubernetes clientset.
	KubeClientSet kubeclientset.Interface

	// VolcanoClientSet is a standard volcano clientset.
	VolcanoClientSet volcanoclient.Interface

	// PodLister can list/get pods from the shared informer's store.
	PodLister corelisters.PodLister

	// ServiceLister can list/get services from the shared informer's store.
	ServiceLister corelisters.ServiceLister

	// PodInformerSynced returns true if the pod store has been synced at least once.
	PodInformerSynced cache.InformerSynced

	// ServiceInformerSynced returns true if the service store has been synced at least once.
	ServiceInformerSynced cache.InformerSynced

	// A TTLCache of pod/services creates/deletes each job expects to see
	// We use Job namespace/name + ReplicaType + pods/services as an expectation key,
	// For example, there is a TFJob with namespace "tf-operator" and name "tfjob-abc":
	// {
	//     "PS": {
	//         "Replicas": 2,
	//     },
	//     "Worker": {
	//         "Replicas": 4,
	//     }
	// }
	// We will create 4 expectations:
	// - "tf-operator/tfjob-abc/ps/services", expects 2 adds.
	// - "tf-operator/tfjob-abc/ps/pods", expects 2 adds.
	// - "tf-operator/tfjob-abc/worker/services", expects 4 adds.
	// - "tf-operator/tfjob-abc/worker/pods", expects 4 adds.
	Expectations expectation.ControllerExpectationsInterface

	// WorkQueue is a rate limited work queue. This is used to queue work to be
	// processed instead of performing it as soon as a change happens. This
	// means we can ensure we only process a fixed amount of resources at a
	// time, and makes it easy to ensure we are never processing the same item
	// simultaneously in two different workers.
	WorkQueue workqueue.RateLimitingInterface

	// Recorder is an event recorder for recording Event resources to the
	// Kubernetes API.
	Recorder record.EventRecorder
}

JobController abstracts other operators to manage the lifecycle of Jobs. User need to first implement the ControllerInterface(objectA) and then initialize a JobController(objectB) struct with objectA as the parameter. And then call objectB.ReconcileJobs as mentioned below, the ReconcileJobs method is the entrypoint to trigger the reconcile logic of the job controller

ReconcileJobs(

job interface{},
replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec,
jobStatus apiv1.JobStatus,
runPolicy *apiv1.RunPolicy) error

func NewJobController

func NewJobController(
	controllerImpl apiv1.ControllerInterface,
	reconcilerSyncPeriod metav1.Duration,
	enableGangScheduling bool,
	kubeClientSet kubeclientset.Interface,
	volcanoClientSet volcanoclient.Interface,
	kubeInformerFactory kubeinformers.SharedInformerFactory,
	workQueueName string) JobController

func (*JobController) AddPod

func (jc *JobController) AddPod(obj interface{})

When a pod is created, enqueue the job that manages it and update its expectations.

func (*JobController) AddService

func (jc *JobController) AddService(obj interface{})

When a service is created, enqueue the controller that manages it and update its expectations.

func (*JobController) CleanupJob added in v0.3.1

func (jc *JobController) CleanupJob(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus, job interface{}) error

func (*JobController) CreateNewService

func (jc *JobController) CreateNewService(job metav1.Object, rtype apiv1.ReplicaType,
	spec *apiv1.ReplicaSpec, index string) error

createNewService creates a new service for the given index and type.

func (*JobController) DeletePdb

func (jc *JobController) DeletePdb(job metav1.Object) error

func (*JobController) DeletePod

func (jc *JobController) DeletePod(obj interface{})

When a pod is deleted, enqueue the job that manages the pod and update its expectations. obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.

func (*JobController) DeletePodGroup

func (jc *JobController) DeletePodGroup(job metav1.Object) error

func (*JobController) DeletePodsAndServices added in v0.3.1

func (jc *JobController) DeletePodsAndServices(runPolicy *apiv1.RunPolicy, job interface{}, pods []*v1.Pod) error

func (*JobController) DeleteService

func (jc *JobController) DeleteService(obj interface{})

When a service is deleted, enqueue the job that manages the service and update its expectations. obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item.

func (*JobController) FilterPodsForReplicaType

func (jc *JobController) FilterPodsForReplicaType(pods []*v1.Pod, replicaType string) ([]*v1.Pod, error)

FilterPodsForReplicaType returns pods belong to a replicaType.

func (*JobController) FilterServicesForReplicaType

func (jc *JobController) FilterServicesForReplicaType(services []*v1.Service, replicaType string) ([]*v1.Service, error)

FilterServicesForReplicaType returns service belong to a replicaType.

func (*JobController) GenLabels

func (jc *JobController) GenLabels(jobName string) map[string]string

func (*JobController) GenOwnerReference

func (jc *JobController) GenOwnerReference(obj metav1.Object) *metav1.OwnerReference

func (*JobController) GetPodSlices

func (jc *JobController) GetPodSlices(pods []*v1.Pod, replicas int, logger *log.Entry) [][]*v1.Pod

getPodSlices returns a slice, which element is the slice of pod. It gives enough information to caller to make decision to up/down scale resources.

func (*JobController) GetPodsForJob added in v0.3.0

func (jc *JobController) GetPodsForJob(jobObject interface{}) ([]*v1.Pod, error)

getPodsForJob returns the set of pods that this job should manage. It also reconciles ControllerRef by adopting/orphaning. Note that the returned Pods are pointers into the cache.

func (*JobController) GetPortFromJob

func (jc *JobController) GetPortFromJob(spec *apiv1.ReplicaSpec) (*int32, error)

GetPortFromJob gets the port of job container. Port could be nil depending on different distributed communication strategy

func (*JobController) GetServiceSlices

func (jc *JobController) GetServiceSlices(services []*v1.Service, replicas int, logger *log.Entry) [][]*v1.Service

getServiceSlices returns a slice, which element is the slice of service. Assume the return object is serviceSlices, then serviceSlices[i] is an array of pointers to services corresponding to Services for replica i.

func (*JobController) GetServicesForJob added in v0.3.0

func (jc *JobController) GetServicesForJob(jobObject interface{}) ([]*v1.Service, error)

getServicesForJob returns the set of services that this job should manage. It also reconciles ControllerRef by adopting/orphaning. Note that the returned services are pointers into the cache.

func (*JobController) PastActiveDeadline added in v0.3.1

func (jc *JobController) PastActiveDeadline(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus) bool

PastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.

func (*JobController) PastBackoffLimit added in v0.3.1

func (jc *JobController) PastBackoffLimit(jobName string, runPolicy *apiv1.RunPolicy,
	replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec, pods []*v1.Pod) (bool, error)

PastBackoffLimit checks if container restartCounts sum exceeds BackoffLimit this method applies only to pods with restartPolicy == OnFailure or Always

func (*JobController) ReconcileJobs

func (jc *JobController) ReconcileJobs(
	job interface{},
	replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec,
	jobStatus apiv1.JobStatus,
	runPolicy *apiv1.RunPolicy) error

ReconcileJobs checks and updates replicas for each given ReplicaSpec. It will requeue the job in case of an error while creating/deleting pods/services.

func (*JobController) ReconcilePods

func (jc *JobController) ReconcilePods(
	job interface{},
	jobStatus *apiv1.JobStatus,
	pods []*v1.Pod,
	rtype apiv1.ReplicaType,
	spec *apiv1.ReplicaSpec,
	replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) error

ReconcilePods checks and updates pods for each given ReplicaSpec. It will requeue the job in case of an error while creating/deleting pods.

func (*JobController) ReconcileServices

func (jc *JobController) ReconcileServices(
	job metav1.Object,
	services []*v1.Service,
	rtype apiv1.ReplicaType,
	spec *apiv1.ReplicaSpec) error

reconcileServices checks and updates services for each given ReplicaSpec. It will requeue the job in case of an error while creating/deleting services.

func (*JobController) SyncPdb

func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32) (*policyapi.PodDisruptionBudget, error)

SyncPdb will create a PDB for gang scheduling by volcano.

func (*JobController) SyncPodGroup

func (jc *JobController) SyncPodGroup(job metav1.Object, minAvailableReplicas int32) (*v1beta1.PodGroup, error)

func (*JobController) UpdatePod

func (jc *JobController) UpdatePod(old, cur interface{})

When a pod is updated, figure out what job is managing it and wake it up. If the labels of the pod have changed we need to awaken both the old and new replica set. old and cur must be *v1.Pod types.

func (*JobController) UpdateService

func (jc *JobController) UpdateService(old, cur interface{})

When a service is updated, figure out what job/s manage it and wake them up. If the labels of the service have changed we need to awaken both the old and new replica set. old and cur must be *v1.Service types.

type JobControllerConfiguration

type JobControllerConfiguration struct {
	// ReconcilerSyncLoopPeriod is the amount of time the reconciler sync states loop
	// wait between two reconciler sync.
	// It is set to 15 sec by default.
	// TODO(cph): maybe we can let it grows by multiple in the future
	// and up to 5 minutes to reduce idle loop.
	// e.g. 15s, 30s, 60s, 120s...
	ReconcilerSyncLoopPeriod metav1.Duration

	// Enable gang scheduling by volcano
	EnableGangScheduling bool
}

JobControllerConfiguration contains configuration of operator.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL