Documentation
Index
- func GenExpectationPodsKey(jobKey, replicaType string) string
- func GenExpectationServicesKey(jobKey, replicaType string) string
- func GenGeneralName(jobName, rtype, index string) string
- func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() error
- type ControllerInterface
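- type JobController
- func NewJobController(controllerImpl ControllerInterface, reconcilerSyncPeriod metav1.Duration, enableGangScheduling bool, kubeClientSet kubeclientset.Interface, kubeInformerFactory kubeinformers.SharedInformerFactory, workQueueName string) JobController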
- func (jc *JobController) AddPod(obj interface{})
- func (jc *JobController) AddService(obj interface{})
- func (jc *JobController) DeletePdb(job metav1.Object) error
- func (jc *JobController) DeletePod(obj interface{})
- func (jc *JobController) DeleteService(obj interface{})
- func (jc *JobController) FilterPodsForReplicaType(pods []*v1.Pod, replicaType string) ([]*v1.Pod, error)
- func (jc *JobController) FilterServicesForReplicaType(services []*v1.Service, replicaType string) ([]*v1.Service, error)
- func (jc *JobController) GenLabels(jobName string) map[string]string
- func (jc *JobController) GenOwnerReference(obj metav1.Object) *metav1.OwnerReference
- func (jc *JobController) GetPodSlices(pods []*v1.Pod, replicas int, logger *log.Entry) [][]*v1.Pod
- func (jc *JobController) GetPodsForJob(job metav1.Object) ([]*v1.Pod, error)
- func (jc *JobController) GetServiceSlices(services []*v1.Service, replicas int, logger *log.Entry) [][]*v1.Service
- func (jc *JobController) GetServicesForJob(job metav1.Object) ([]*v1.Service, error)
- func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32) (*v1beta1.PodDisruptionBudget, error)
- func (jc *JobController) UpdatePod(old, cur interface{})
- func (jc *JobController) UpdateService(old, cur interface{})
- type JobControllerConfiguration
Constants
This section is empty.
Variables
This section is empty.
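Functions
func GenExpectationPodsKey
func GenExpectationPodsKey(jobKey, replicaType string) string
func GenExpectationServicesKey
func GenExpectationServicesKey(jobKey, replicaType string) string
func GenGeneralName
func GenGeneralName(jobName, rtype, index string) string
func RecheckDeletionTimestamp
func RecheckDeletionTimestamp(getObject func() (metav1.Object, error)) func() error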
RecheckDeletionTimestamp returns a CanAdopt() function to recheck deletion.
The CanAdopt() function calls getObject() to fetch the latest value, and denies adoption attempts if that object has a non-nil DeletionTimestamp.
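A minimal sketch of the CanAdopt closure described above. To stay self-contained it uses a hypothetical `object` interface in place of `metav1.Object` (only the three getters the check needs) and `*time.Time` in place of `*metav1.Time`; the error messages are illustrative, not the package's own.

```go
package main

import (
	"fmt"
	"time"
)

// object is a hypothetical stand-in for the subset of metav1.Object used here.
type object interface {
	GetNamespace() string
	GetName() string
	GetDeletionTimestamp() *time.Time
}

// recheckDeletionTimestamp returns a CanAdopt-style function. Each call fetches
// the latest copy of the object and refuses adoption if it is being deleted.
func recheckDeletionTimestamp(getObject func() (object, error)) func() error {
	return func() error {
		obj, err := getObject()
		if err != nil {
			return fmt.Errorf("can't recheck DeletionTimestamp: %w", err)
		}
		if ts := obj.GetDeletionTimestamp(); ts != nil {
			return fmt.Errorf("%s/%s has just been deleted at %v", obj.GetNamespace(), obj.GetName(), *ts)
		}
		return nil
	}
}

// fakeJob is a hypothetical object used to exercise the check.
type fakeJob struct {
	namespace, name string
	deleted         bool // when true, the fake reports a DeletionTimestamp
}

func (j fakeJob) GetNamespace() string { return j.namespace }
func (j fakeJob) GetName() string      { return j.name }
func (j fakeJob) GetDeletionTimestamp() *time.Time {
	if !j.deleted {
		return nil
	}
	now := time.Now()
	return &now
}
```

Because the closure calls getObject on every invocation, the check sees the latest state rather than a value captured when the closure was built.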
Types
type ControllerInterface
type ControllerInterface interface {
	// Returns the Controller name
	ControllerName() string
	// Returns the GroupVersionKind of the API
	GetAPIGroupVersionKind() schema.GroupVersionKind
	// Returns the GroupVersion of the API
	GetAPIGroupVersion() schema.GroupVersion
	// Returns the Group Name (key) in the labels of the job
	GetGroupNameLabelKey() string
	// Returns the Job Name (key) in the labels of the job
	GetJobNameLabelKey() string
	// Returns the Group Name (value) in the labels of the job
	GetGroupNameLabelValue() string
	// Returns the Replica Type (key) in the labels of the job
	GetReplicaTypeLabelKey() string
	// Returns the Replica Index (key) in the labels of the job
	GetReplicaIndexLabelKey() string
	// Returns the Job from the Informer Cache
	GetJobFromInformerCache(namespace, name string) (metav1.Object, error)
	// Returns the Job from the API server
	GetJobFromAPIClient(namespace, name string) (metav1.Object, error)
}
ControllerInterface is the common interface to be implemented by all operators.
type JobController
type JobController struct {
	Controller ControllerInterface
	Config     JobControllerConfiguration
	// PodControl is used to add or delete pods.
	PodControl controller.PodControlInterface
	// ServiceControl is used to add or delete services.
	ServiceControl control.ServiceControlInterface
	// KubeClientSet is a standard kubernetes clientset.
	KubeClientSet kubeclientset.Interface
	// PodLister can list/get pods from the shared informer's store.
	PodLister corelisters.PodLister
	// ServiceLister can list/get services from the shared informer's store.
	ServiceLister corelisters.ServiceLister
	// PodInformerSynced returns true if the pod store has been synced at least once.
	PodInformerSynced cache.InformerSynced
	// ServiceInformerSynced returns true if the service store has been synced at least once.
	ServiceInformerSynced cache.InformerSynced
	// A TTLCache of pod/service creates/deletes each job expects to see.
	// We use Job namespace/name + ReplicaType + pods/services as an expectation key.
	// For example, given a TFJob with namespace "tf-operator" and name "tfjob-abc":
	// {
	//     "PS": {
	//         "Replicas": 2,
	//     },
	//     "Worker": {
	//         "Replicas": 4,
	//     }
	// }
	// we will create 4 expectations:
	// - "tf-operator/tfjob-abc/ps/services", expects 2 adds.
	// - "tf-operator/tfjob-abc/ps/pods", expects 2 adds.
	// - "tf-operator/tfjob-abc/worker/services", expects 4 adds.
	// - "tf-operator/tfjob-abc/worker/pods", expects 4 adds.
	Expectations controller.ControllerExpectationsInterface
	// WorkQueue is a rate limited work queue. This is used to queue work to be
	// processed instead of performing it as soon as a change happens. This
	// means we can ensure we only process a fixed amount of resources at a
	// time, and makes it easy to ensure we are never processing the same item
	// simultaneously in two different workers.
	WorkQueue workqueue.RateLimitingInterface
	// Recorder is an event recorder for recording Event resources to the
	// Kubernetes API.
	Recorder record.EventRecorder
}
JobController implements the logic shared by operators for managing the lifecycle of Jobs.
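The expectation-key scheme in the Expectations comment can be sketched as follows. The helpers are hypothetical: they assume the keys join the job key, the lower-cased replica type and a "pods"/"services" suffix with "/", which matches the example keys in the comment but is not confirmed elsewhere on this page. The counter is a simplified stand-in for controller.ControllerExpectationsInterface (no TTL, no deletion tracking).

```go
package main

import "strings"

// genExpectationKey is a hypothetical helper producing keys such as
// "tf-operator/tfjob-abc/ps/pods".
func genExpectationKey(jobKey, replicaType, resource string) string {
	return jobKey + "/" + strings.ToLower(replicaType) + "/" + resource
}

func genExpectationPodsKey(jobKey, replicaType string) string {
	return genExpectationKey(jobKey, replicaType, "pods")
}

func genExpectationServicesKey(jobKey, replicaType string) string {
	return genExpectationKey(jobKey, replicaType, "services")
}

// expectations tracks only how many adds are still outstanding per key.
type expectations struct {
	pendingAdds map[string]int
}

func newExpectations() *expectations {
	return &expectations{pendingAdds: map[string]int{}}
}

// expectCreations records that n creations are expected under key.
func (e *expectations) expectCreations(key string, n int) { e.pendingAdds[key] = n }

// creationObserved is what an AddPod/AddService-style handler would call
// when the informer reports a newly created object for key.
func (e *expectations) creationObserved(key string) {
	if e.pendingAdds[key] > 0 {
		e.pendingAdds[key]--
	}
}

// satisfied reports whether every expected add for key has been observed.
func (e *expectations) satisfied(key string) bool { return e.pendingAdds[key] <= 0 }
```

With the TFJob example, four keys would be registered (2, 2, 4 and 4 expected adds), and a job is considered settled for a key only once all of its expected adds have been observed.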
func NewJobController
func NewJobController(
	controllerImpl ControllerInterface,
	reconcilerSyncPeriod metav1.Duration,
	enableGangScheduling bool,
	kubeClientSet kubeclientset.Interface,
	kubeInformerFactory kubeinformers.SharedInformerFactory,
	workQueueName string) JobController
func (*JobController) AddPod
func (jc *JobController) AddPod(obj interface{})
AddPod is called when a pod is created. It enqueues the job that manages the pod and updates that job's expectations.
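func (*JobController) AddService
func (jc *JobController) AddService(obj interface{})
AddService is called when a service is created. It enqueues the job that manages the service and updates that job's expectations.
func (*JobController) DeletePdb
func (jc *JobController) DeletePdb(job metav1.Object) error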
func (*JobController) DeletePod
func (jc *JobController) DeletePod(obj interface{})
DeletePod is called when a pod is deleted. It enqueues the job that manages the pod and updates that job's expectations. obj may be a *v1.Pod or a cache.DeletedFinalStateUnknown marker item.
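Delete handlers cannot assume obj is a pod: when an informer misses the actual delete event it delivers a tombstone instead. The sketch below unwraps both cases using hypothetical stand-in types; deletedFinalStateUnknown mirrors the Key/Obj shape of client-go's cache.DeletedFinalStateUnknown, and pod stands in for v1.Pod.

```go
package main

// pod is a hypothetical stand-in for v1.Pod.
type pod struct {
	Namespace, Name string
}

// deletedFinalStateUnknown mirrors client-go's cache.DeletedFinalStateUnknown:
// a tombstone whose Obj may hold a stale copy of the deleted object.
type deletedFinalStateUnknown struct {
	Key string
	Obj interface{}
}

// podFromDeleteEvent unwraps the obj argument of a DeletePod-style handler.
// It returns false when obj is neither a pod nor a tombstone wrapping one.
func podFromDeleteEvent(obj interface{}) (*pod, bool) {
	if p, ok := obj.(*pod); ok {
		return p, true
	}
	tombstone, ok := obj.(deletedFinalStateUnknown)
	if !ok {
		return nil, false
	}
	p, ok := tombstone.Obj.(*pod)
	return p, ok
}
```

The same unwrapping applies to DeleteService with services in place of pods.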
func (*JobController) DeleteService
func (jc *JobController) DeleteService(obj interface{})
DeleteService is called when a service is deleted. It enqueues the job that manages the service and updates that job's expectations. obj may be a *v1.Service or a cache.DeletedFinalStateUnknown marker item.
func (*JobController) FilterPodsForReplicaType
func (jc *JobController) FilterPodsForReplicaType(pods []*v1.Pod, replicaType string) ([]*v1.Pod, error)
FilterPodsForReplicaType returns the pods that belong to the given replicaType.
func (*JobController) FilterServicesForReplicaType
func (jc *JobController) FilterServicesForReplicaType(services []*v1.Service, replicaType string) ([]*v1.Service, error)
FilterServicesForReplicaType returns the services that belong to the given replicaType.
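Both filters select objects by replica type, which the job records in a label whose key comes from ControllerInterface.GetReplicaTypeLabelKey(). A generic sketch (Go 1.18+) with a hypothetical `labeled` interface and label key; unlike the real methods it cannot fail, and it compares label values exactly, since whether the real methods normalize case is not stated here.

```go
package main

// labeled is a hypothetical stand-in for anything exposing ObjectMeta labels.
type labeled interface {
	GetLabels() map[string]string
}

// filterForReplicaType keeps the objects whose replica-type label equals
// replicaType. labelKey plays the role of GetReplicaTypeLabelKey().
func filterForReplicaType[T labeled](objs []T, labelKey, replicaType string) []T {
	var out []T
	for _, o := range objs {
		if o.GetLabels()[labelKey] == replicaType {
			out = append(out, o)
		}
	}
	return out
}

// fakeObject is a hypothetical labeled object used for the example.
type fakeObject struct {
	name   string
	labels map[string]string
}

func (o fakeObject) GetLabels() map[string]string { return o.labels }
```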
func (*JobController) GenLabels
func (jc *JobController) GenLabels(jobName string) map[string]string
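GenLabels has no description on this page. Given the label-key methods on ControllerInterface, one plausible reading is that it maps the group-name key to the group-name value and the job-name key to jobName. The sketch assumes exactly that; the real method may add or transform labels, and the interface subset, operator type and label values below are all hypothetical.

```go
package main

// labelKeys is a hypothetical subset of ControllerInterface.
type labelKeys interface {
	GetGroupNameLabelKey() string
	GetJobNameLabelKey() string
	GetGroupNameLabelValue() string
}

// genLabels builds labels identifying the operator group and the job.
func genLabels(c labelKeys, jobName string) map[string]string {
	return map[string]string{
		c.GetGroupNameLabelKey(): c.GetGroupNameLabelValue(),
		c.GetJobNameLabelKey():   jobName,
	}
}

// exampleOperator is a hypothetical implementation with made-up label keys.
type exampleOperator struct{}

func (exampleOperator) GetGroupNameLabelKey() string   { return "group-name" }
func (exampleOperator) GetJobNameLabelKey() string     { return "job-name" }
func (exampleOperator) GetGroupNameLabelValue() string { return "kubeflow.org" }
```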
func (*JobController) GenOwnerReference
func (jc *JobController) GenOwnerReference(obj metav1.Object) *metav1.OwnerReference
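func (*JobController) GetPodSlices
func (jc *JobController) GetPodSlices(pods []*v1.Pod, replicas int, logger *log.Entry) [][]*v1.Pod
GetPodSlices returns a slice whose elements are slices of pods. If the returned value is podSlices, then podSlices[i] is an array of pointers to the pods for replica i.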
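GetPodSlices and GetServiceSlices both bucket objects by replica index. A sketch under stated assumptions: the index is read from a label (key hypothetical here, in the role of GetReplicaIndexLabelKey()), and objects with a missing, non-numeric or out-of-range index are skipped silently. The real methods also take a logger, which this sketch omits.

```go
package main

import "strconv"

// indexed is a hypothetical stand-in for a pod or service.
type indexed struct {
	name   string
	labels map[string]string
}

// sliceByReplicaIndex groups objects so that slices[i] holds those for replica i.
func sliceByReplicaIndex(objs []indexed, replicas int, indexLabelKey string) [][]indexed {
	slices := make([][]indexed, replicas)
	for _, o := range objs {
		v, ok := o.labels[indexLabelKey]
		if !ok {
			continue // no index label
		}
		i, err := strconv.Atoi(v)
		if err != nil || i < 0 || i >= replicas {
			continue // unparseable or out of range
		}
		slices[i] = append(slices[i], o)
	}
	return slices
}
```

An empty slices[i] shows that replica i currently has no object, which a caller could use to decide which replicas still need to be created.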
func (*JobController) GetPodsForJob
func (jc *JobController) GetPodsForJob(job metav1.Object) ([]*v1.Pod, error)
GetPodsForJob returns the set of pods that this job should manage. It also reconciles ControllerRef by adopting/orphaning. Note that the returned Pods are pointers into the cache.
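The adopt/orphan reconciliation in GetPodsForJob (and GetServicesForJob) follows the ControllerRef pattern of Kubernetes controllers. The sketch below is a simplified illustration with hypothetical types: ownership is a plain UID field, changes are made in memory rather than persisted through the API server, and canAdopt would be a function like the one RecheckDeletionTimestamp returns.

```go
package main

// ownedPod is a hypothetical stand-in for a pod: its labels plus the UID of
// its controller owner reference ("" when it has no controller).
type ownedPod struct {
	name          string
	labels        map[string]string
	controllerUID string
}

// matches reports whether labels contain every key/value in selector.
func matches(selector, labels map[string]string) bool {
	for k, v := range selector {
		if labels[k] != v {
			return false
		}
	}
	return true
}

// claimPods returns the pods the job (jobUID) should manage. It adopts matching
// orphans when canAdopt allows and releases owned pods that no longer match.
// Pods controlled by something else are left alone.
func claimPods(pods []*ownedPod, jobUID string, selector map[string]string, canAdopt func() error) []*ownedPod {
	var claimed []*ownedPod
	for _, p := range pods {
		match := matches(selector, p.labels)
		switch {
		case p.controllerUID == jobUID && match:
			claimed = append(claimed, p) // already ours
		case p.controllerUID == jobUID:
			p.controllerUID = "" // orphan: labels no longer match
		case p.controllerUID == "" && match && canAdopt() == nil:
			p.controllerUID = jobUID // adopt the orphan
			claimed = append(claimed, p)
		}
	}
	return claimed
}
```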
func (*JobController) GetServiceSlices
func (jc *JobController) GetServiceSlices(services []*v1.Service, replicas int, logger *log.Entry) [][]*v1.Service
GetServiceSlices returns a slice whose elements are slices of services. If the returned value is serviceSlices, then serviceSlices[i] is an array of pointers to the services for replica i.
func (*JobController) GetServicesForJob
func (jc *JobController) GetServicesForJob(job metav1.Object) ([]*v1.Service, error)
GetServicesForJob returns the set of services that this job should manage. It also reconciles ControllerRef by adopting/orphaning. Note that the returned services are pointers into the cache.
func (*JobController) SyncPdb
func (jc *JobController) SyncPdb(job metav1.Object, minAvailableReplicas int32) (*v1beta1.PodDisruptionBudget, error)
SyncPdb creates a PodDisruptionBudget (PDB) for the job, used by kube-batch for gang scheduling.
func (*JobController) UpdatePod
func (jc *JobController) UpdatePod(old, cur interface{})
UpdatePod is called when a pod is updated. It figures out which job(s) manage the pod and wakes them up. If the labels of the pod have changed, both the old and the new job need to be awakened. old and cur must be of type *v1.Pod.
func (*JobController) UpdateService
func (jc *JobController) UpdateService(old, cur interface{})
UpdateService is called when a service is updated. It figures out which job(s) manage the service and wakes them up. If the labels of the service have changed, both the old and the new job need to be awakened. old and cur must be of type *v1.Service.
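The "wake both the old and the new job" rule in UpdatePod and UpdateService can be sketched as a function from an old/current pair to the job keys to enqueue. The `managed` type is hypothetical and assumes the managing job has already been resolved to a "namespace/name" key; how the real handlers resolve it is not described here.

```go
package main

// managed is a hypothetical stand-in for a pod or service, reduced to the
// key of the job that manages it ("" if none).
type managed struct {
	ownerJobKey string
}

// jobsToEnqueue returns the job keys to wake for an old -> cur update.
func jobsToEnqueue(old, cur managed) []string {
	var keys []string
	// If the managing job changed, the previous job must also re-sync.
	if old.ownerJobKey != "" && old.ownerJobKey != cur.ownerJobKey {
		keys = append(keys, old.ownerJobKey)
	}
	if cur.ownerJobKey != "" {
		keys = append(keys, cur.ownerJobKey)
	}
	return keys
}
```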
type JobControllerConfiguration
type JobControllerConfiguration struct {
	// ReconcilerSyncLoopPeriod is the amount of time the reconciler sync states loop
	// waits between two reconciler syncs.
	// It is set to 15 sec by default.
	// TODO(cph): maybe we can let it grow by multiples in the future,
	// up to 5 minutes, to reduce idle loops,
	// e.g. 15s, 30s, 60s, 120s...
	ReconcilerSyncLoopPeriod metav1.Duration

	// EnableGangScheduling enables gang scheduling by kube-batch.
	EnableGangScheduling bool
}
JobControllerConfiguration contains the configuration of the operator.
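The TODO(cph) note proposes letting the sync period grow from the 15 s default up to 5 minutes. This is not implemented by JobControllerConfiguration; the sketch below only illustrates one reading of that idea (doubling after each idle loop, capped), with hypothetical names.

```go
package main

import "time"

const (
	defaultSyncPeriod = 15 * time.Second // default stated in the comment
	maxSyncPeriod     = 5 * time.Minute  // cap suggested by the TODO
)

// nextSyncPeriod doubles the current period, never exceeding maxPeriod.
func nextSyncPeriod(current, maxPeriod time.Duration) time.Duration {
	next := current * 2
	if next > maxPeriod {
		return maxPeriod
	}
	return next
}
```

Starting from defaultSyncPeriod, successive idle loops would wait 15s, 30s, 1m0s, 2m0s, 4m0s and then stay at 5m0s.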