Documentation
Overview
Package controller provides a Kubernetes controller for a TFJob resource.
Index
- Constants
- func ContainsChiefOrMasterSpec(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec) bool
- func GetPortFromTFJob(tfJob *kubeflowv1.TFJob, rtype commonv1.ReplicaType) (int32, error)
- type ClusterSpec
- type SparseClusterSpec
- type SparseTFConfig
- type TFConfig
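- type TFJobReconciler
- func NewReconciler(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc) *TFJobReconciler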
- func (r *TFJobReconciler) ControllerName() string
- func (r *TFJobReconciler) DeleteJob(job interface{}) error
- func (r *TFJobReconciler) GetAPIGroupVersion() schema.GroupVersion
- func (r *TFJobReconciler) GetAPIGroupVersionKind() schema.GroupVersionKind
- func (r *TFJobReconciler) GetDefaultContainerName() string
- func (r *TFJobReconciler) GetDefaultContainerPortName() string
- func (r *TFJobReconciler) GetGroupNameLabelValue() string
- func (r *TFJobReconciler) GetJobFromAPIClient(namespace, name string) (metav1.Object, error)
- func (r *TFJobReconciler) GetJobFromInformerCache(namespace, name string) (metav1.Object, error)
- func (r *TFJobReconciler) GetPodsForJob(jobObject interface{}) ([]*corev1.Pod, error)
- func (r *TFJobReconciler) GetServicesForJob(jobObject interface{}) ([]*corev1.Service, error)
- func (r *TFJobReconciler) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, ...) bool
- func (r *TFJobReconciler) IsWorker0Completed(tfJob *kubeflowv1.TFJob, ...) (bool, error)
- func (r *TFJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
- func (r *TFJobReconciler) ReconcilePods(job interface{}, jobStatus *commonv1.JobStatus, pods []*v1.Pod, ...) error
- func (r *TFJobReconciler) SetClusterSpec(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype, index string) error
- func (r *TFJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads int) error
- func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, ...) error
- func (r *TFJobReconciler) UpdateJobStatusInApiServer(job interface{}, jobStatus *commonv1.JobStatus) error
- type TaskSpec
Constants
const (
	FailedDeleteJobReason     = "FailedDeleteJob"
	SuccessfulDeleteJobReason = "SuccessfulDeleteJob"
)
const (
	// EnvCustomClusterDomain is the custom-defined cluster domain, such as "svc.cluster.local".
	// Ref: https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/#a-records
	EnvCustomClusterDomain = "CUSTOM_CLUSTER_DOMAIN"
)
Functions
func ContainsChiefOrMasterSpec
func ContainsChiefOrMasterSpec(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec) bool
ContainsChiefOrMasterSpec returns true if the replica specs of the TFJob include a Chief or Master replica spec.
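The check amounts to a key lookup in the replica map. This is a minimal sketch using simplified stand-ins for commonv1.ReplicaType and commonv1.ReplicaSpec; the type definitions and the replica type names ("Chief", "Master", "Worker", "PS") are assumptions for illustration, not the real API package.

```go
package main

import "fmt"

// ReplicaType and ReplicaSpec are simplified stand-ins for commonv1.ReplicaType
// and commonv1.ReplicaSpec; the real types carry many more fields.
type ReplicaType string

type ReplicaSpec struct {
	Replicas *int32
}

// Assumed replica type names; the real constants live in the kubeflow API package.
const (
	ReplicaTypeChief  ReplicaType = "Chief"
	ReplicaTypeMaster ReplicaType = "Master"
	ReplicaTypeWorker ReplicaType = "Worker"
	ReplicaTypePS     ReplicaType = "PS"
)

// containsChiefOrMasterSpec reports whether the replica map has an entry
// for either the Chief or the Master replica type.
func containsChiefOrMasterSpec(replicas map[ReplicaType]*ReplicaSpec) bool {
	_, hasChief := replicas[ReplicaTypeChief]
	_, hasMaster := replicas[ReplicaTypeMaster]
	return hasChief || hasMaster
}

func main() {
	n := int32(2)
	workersOnly := map[ReplicaType]*ReplicaSpec{ReplicaTypeWorker: {Replicas: &n}, ReplicaTypePS: {Replicas: &n}}
	withChief := map[ReplicaType]*ReplicaSpec{ReplicaTypeChief: {}, ReplicaTypeWorker: {Replicas: &n}}
	fmt.Println(containsChiefOrMasterSpec(workersOnly)) // false
	fmt.Println(containsChiefOrMasterSpec(withChief))   // true
}
```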
func GetPortFromTFJob
func GetPortFromTFJob(tfJob *kubeflowv1.TFJob, rtype commonv1.ReplicaType) (int32, error)
GetPortFromTFJob returns the port of the TensorFlow container for the given replica type of the TFJob.
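A port lookup of this kind can be sketched as a search over the replica's pod template containers for a named container and a named port. In this sketch the container name "tensorflow", the port name "tfjob-port", the -1 return value, and the error text are assumptions; the reconciler reports its actual defaults through GetDefaultContainerName and GetDefaultContainerPortName.

```go
package main

import (
	"errors"
	"fmt"
)

// Minimal stand-ins for the corev1 container types; only the fields needed
// for the lookup are modeled.
type ContainerPort struct {
	Name          string
	ContainerPort int32
}

type Container struct {
	Name  string
	Ports []ContainerPort
}

// Hypothetical default names used only by this sketch.
const (
	defaultContainerName = "tensorflow"
	defaultPortName      = "tfjob-port"
)

var errPortNotFound = errors.New("failed to find the port")

// portFromContainers returns the port named defaultPortName on the container
// named defaultContainerName, or errPortNotFound if either is missing.
func portFromContainers(containers []Container) (int32, error) {
	for _, c := range containers {
		if c.Name != defaultContainerName {
			continue
		}
		for _, p := range c.Ports {
			if p.Name == defaultPortName {
				return p.ContainerPort, nil
			}
		}
	}
	return -1, errPortNotFound
}

func main() {
	containers := []Container{
		{Name: "sidecar"},
		{Name: "tensorflow", Ports: []ContainerPort{{Name: "tfjob-port", ContainerPort: 2222}}},
	}
	port, err := portFromContainers(containers)
	fmt.Println(port, err) // 2222 <nil>
}
```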
Types
type ClusterSpec
ClusterSpec represents a TensorFlow cluster specification: a map from job names to network addresses. See https://www.tensorflow.org/deploy/distributed#create_a_tftrainclusterspec_to_describe_the_cluster
type SparseClusterSpec
SparseClusterSpec enables a server to be configured without needing to know the identity of (for example) all other worker tasks. https://www.tensorflow.org/api_docs/python/tf/train/ClusterSpec
type SparseTFConfig
type SparseTFConfig struct {
	Cluster SparseClusterSpec `json:"sparseCluster"`
	Task    TaskSpec          `json:"task"`
}
type TFConfig
type TFConfig struct {
	// Cluster represents a TensorFlow ClusterSpec.
	// See: https://www.tensorflow.org/api_docs/python/tf/train/ClusterSpec
	Cluster ClusterSpec `json:"cluster"`
	Task    TaskSpec    `json:"task"`
	// Environment is used by tensorflow.contrib.learn.python.learn in versions <= 1.3
	// TODO(jlewi): I don't think it is used in versions TF >= 1.4. So we can eventually get rid of it.
	Environment string `json:"environment"`
}
TFConfig is a struct representing the distributed TensorFlow config. It is serialized to JSON and exposed to TensorFlow processes as the TF_CONFIG environment variable, which they read to configure themselves. See https://www.tensorflow.org/api_docs/python/tf/estimator/RunConfig#methods and https://cloud.google.com/ml-engine/docs/tensorflow/distributed-training-details
type TFJobReconciler
type TFJobReconciler struct {
	common.JobController
	client.Client
	Scheme *runtime.Scheme
	Log    logr.Logger
	// contains filtered or unexported fields
}
TFJobReconciler reconciles a TFJob object.
func NewReconciler
func NewReconciler(mgr manager.Manager, gangSchedulingSetupFunc common.GangSchedulingSetupFunc) *TFJobReconciler
func (*TFJobReconciler) ControllerName
func (r *TFJobReconciler) ControllerName() string
func (*TFJobReconciler) DeleteJob
func (r *TFJobReconciler) DeleteJob(job interface{}) error
func (*TFJobReconciler) GetAPIGroupVersion
func (r *TFJobReconciler) GetAPIGroupVersion() schema.GroupVersion
func (*TFJobReconciler) GetAPIGroupVersionKind
func (r *TFJobReconciler) GetAPIGroupVersionKind() schema.GroupVersionKind
func (*TFJobReconciler) GetDefaultContainerName
func (r *TFJobReconciler) GetDefaultContainerName() string
func (*TFJobReconciler) GetDefaultContainerPortName
func (r *TFJobReconciler) GetDefaultContainerPortName() string
func (*TFJobReconciler) GetGroupNameLabelValue
func (r *TFJobReconciler) GetGroupNameLabelValue() string
func (*TFJobReconciler) GetJobFromAPIClient
func (r *TFJobReconciler) GetJobFromAPIClient(namespace, name string) (metav1.Object, error)
func (*TFJobReconciler) GetJobFromInformerCache
func (r *TFJobReconciler) GetJobFromInformerCache(namespace, name string) (metav1.Object, error)
func (*TFJobReconciler) GetPodsForJob
func (r *TFJobReconciler) GetPodsForJob(jobObject interface{}) ([]*corev1.Pod, error)
GetPodsForJob returns the set of pods that this job should manage. It also reconciles ControllerRef by adopting/orphaning. Note that the returned Pods are pointers into the cache.
func (*TFJobReconciler) GetServicesForJob
func (r *TFJobReconciler) GetServicesForJob(jobObject interface{}) ([]*corev1.Service, error)
GetServicesForJob returns the set of services that this job should manage. It also reconciles ControllerRef by adopting/orphaning. Note that the returned services are pointers into the cache.
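Because GetPodsForJob and GetServicesForJob return pointers into the shared cache, the usual Kubernetes convention is to copy an object before changing it. This sketch uses a toy Pod type with a hand-written DeepCopy to show the pattern; real corev1 types come with a generated DeepCopy method that serves the same purpose, and the label values here are hypothetical.

```go
package main

import "fmt"

// Pod is a toy stand-in for corev1.Pod, used only for this sketch.
type Pod struct {
	Name   string
	Labels map[string]string
}

// DeepCopy returns an independent copy, including a fresh Labels map, so
// writes to the copy never reach the original.
func (p *Pod) DeepCopy() *Pod {
	out := &Pod{Name: p.Name, Labels: make(map[string]string, len(p.Labels))}
	for k, v := range p.Labels {
		out.Labels[k] = v
	}
	return out
}

func main() {
	cached := &Pod{Name: "mnist-worker-0", Labels: map[string]string{"app": "mnist"}}
	pods := []*Pod{cached} // pretend this slice came from the cache

	// Copy before mutating so the shared cache entry is left untouched.
	updated := pods[0].DeepCopy()
	updated.Labels["patched"] = "true"

	fmt.Println(len(cached.Labels), len(updated.Labels)) // 1 2
}
```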
func (*TFJobReconciler) IsMasterRole
func (r *TFJobReconciler) IsMasterRole(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rtype commonv1.ReplicaType, index int) bool
func (*TFJobReconciler) IsWorker0Completed
func (r *TFJobReconciler) IsWorker0Completed(tfJob *kubeflowv1.TFJob, replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec) (bool, error)
IsWorker0Completed returns true if the pod of worker 0 succeeded and exited with code 0.
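The completion rule for worker 0 can be sketched as two checks on the pod status: the phase is Succeeded, and the relevant container terminated with exit code 0. The status types below are simplified stand-ins for the corev1 ones (the real exit code lives under the container's terminated state), and the container name "tensorflow" is an assumption.

```go
package main

import "fmt"

// Simplified stand-ins for corev1 pod status types.
type PodPhase string

const PodSucceeded PodPhase = "Succeeded"

type ContainerStatus struct {
	Name     string
	ExitCode *int32 // nil while the container has not terminated
}

type PodStatus struct {
	Phase             PodPhase
	ContainerStatuses []ContainerStatus
}

// worker0Completed applies the documented rule under these simplifications:
// the pod must be in phase Succeeded and the named container must have
// terminated with exit code 0.
func worker0Completed(status PodStatus, containerName string) bool {
	if status.Phase != PodSucceeded {
		return false
	}
	for _, cs := range status.ContainerStatuses {
		if cs.Name == containerName && cs.ExitCode != nil && *cs.ExitCode == 0 {
			return true
		}
	}
	return false
}

func main() {
	zero := int32(0)
	status := PodStatus{
		Phase:             PodSucceeded,
		ContainerStatuses: []ContainerStatus{{Name: "tensorflow", ExitCode: &zero}},
	}
	fmt.Println(worker0Completed(status, "tensorflow")) // true
}
```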
func (*TFJobReconciler) Reconcile
func (r *TFJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error)
Reconcile is part of the main Kubernetes reconciliation loop, which aims to move the current state of the cluster closer to the desired state.
func (*TFJobReconciler) ReconcilePods
func (r *TFJobReconciler) ReconcilePods(
	job interface{},
	jobStatus *commonv1.JobStatus,
	pods []*v1.Pod,
	rtype commonv1.ReplicaType,
	spec *commonv1.ReplicaSpec,
	replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec,
) error
ReconcilePods checks and updates pods for each given TFReplicaSpec. It requeues the TFJob if an error occurs while creating or deleting pods. To minimize changes, TFController's logic is copied here to override the kubeflow/common reconcile logic; this override should be removed later unless TF needs specific logic here.
func (*TFJobReconciler) SetClusterSpec
func (r *TFJobReconciler) SetClusterSpec(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype, index string) error
SetClusterSpec is the same as func (tc *TFController) SetClusterSpec(...) in pod.go.
func (*TFJobReconciler) SetupWithManager
func (r *TFJobReconciler) SetupWithManager(mgr ctrl.Manager, controllerThreads int) error
SetupWithManager sets up the controller with the Manager.
func (*TFJobReconciler) UpdateJobStatus
func (r *TFJobReconciler) UpdateJobStatus(job interface{}, replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, jobStatus *commonv1.JobStatus) error
func (*TFJobReconciler) UpdateJobStatusInApiServer
func (r *TFJobReconciler) UpdateJobStatusInApiServer(job interface{}, jobStatus *commonv1.JobStatus) error