kubernetestracker

Version: v0.3.31
Warning

This package is not in the latest version of its module.

Published: Dec 27, 2023 License: Apache-2.0 Imports: 27 Imported by: 3

README

Kubernetes Tracker

Implements the JobTracker interface for Kubernetes batch jobs. The JobTracker is a building block for implementing the DRMAA2 interface for Kubernetes.

Introduction

The Kubernetes tracker provides methods for managing sets of grouped batch jobs (within JobSessions). JobSessions are implemented with a label ("drmaa2jobsession") that is attached to the batch job objects and whose value refers to the JobSession name.

Namespaces other than "default" can be used by initializing NewKubernetesSessionManager with KubernetesTrackerParameters instead of just a ClientSet or nil.
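
The two conventions above can be sketched in plain Go. This is a minimal illustration, not the package's code: sessionSelector and namespaceOrDefault are hypothetical helpers. The first builds a standard Kubernetes label selector ("key=value") matching all jobs of one JobSession via the "drmaa2jobsession" label. The second mirrors the documented fallback to the "default" namespace.

```go
package main

import "fmt"

// sessionLabel is the label key named in the README that ties a batch job
// to its DRMAA2 JobSession.
const sessionLabel = "drmaa2jobsession"

// sessionSelector (hypothetical helper) builds a Kubernetes label selector
// in "key=value" syntax that matches all jobs of one JobSession.
func sessionSelector(session string) string {
	return sessionLabel + "=" + session
}

// namespaceOrDefault (hypothetical helper) mirrors the documented fallback:
// an empty namespace becomes "default".
func namespaceOrDefault(ns string) string {
	if ns == "" {
		return "default"
	}
	return ns
}

func main() {
	fmt.Println(sessionSelector("mysession")) // drmaa2jobsession=mysession
	fmt.Println(namespaceOrDefault(""))       // default
	fmt.Println(namespaceOrDefault("batch"))  // batch
}
```

A selector string like this is what a Kubernetes list call accepts in its label selector option.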

Functionality

Notes

In the past, Kubernetes batch jobs did not play well with sidecars. When using older Kubernetes versions together with sidecars such as Istio, you might run into state issues (the sidecar container keeps running after the batch job has finished).

Job Control Mapping
DRMAA2 Job Control Kubernetes
Suspend Unsupported
Resume Unsupported
Terminate Delete() - leads to Undetermined state
Hold Unsupported
Release Unsupported

Job.Reap() removes the Kubernetes job and related objects from Kubernetes.

State Mapping

Based on JobStatus

DRMAA2 State Kubernetes Job State
Done status.Succeeded >= 1
Failed status.Failed >= 1
Suspended -
Running status.Active >= 1
Queued -
Undetermined other / Terminate()
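
The state table can be expressed as a function over the three job status counters. This is a sketch under stated assumptions. jobStatus and drmaa2State are hypothetical stand-ins, not the package's DRMAA2State function. The order of the checks (Succeeded, then Failed, then Active) is this sketch's choice, because the table does not say which counter wins when several are non-zero.

```go
package main

import "fmt"

// jobStatus holds the three counters from a Kubernetes batch job's status
// that the state mapping table refers to (a simplified, hypothetical type).
type jobStatus struct {
	Active, Succeeded, Failed int32
}

// drmaa2State (hypothetical helper) applies the state mapping table. The
// check order is an assumption of this sketch.
func drmaa2State(s jobStatus) string {
	switch {
	case s.Succeeded >= 1:
		return "Done"
	case s.Failed >= 1:
		return "Failed"
	case s.Active >= 1:
		return "Running"
	default:
		return "Undetermined"
	}
}

func main() {
	fmt.Println(drmaa2State(jobStatus{Active: 1}))    // Running
	fmt.Println(drmaa2State(jobStatus{Succeeded: 1})) // Done
	fmt.Println(drmaa2State(jobStatus{}))             // Undetermined
}
```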

Job Template Mapping
DRMAA2 JobTemplate Kubernetes Batch Job
RemoteCommand v1.Container.Command[0]
Args v1.Container.Args
CandidateMachines[0] v1.Container.Hostname
JobCategory v1.Container.Image
WorkingDir v1.Container.WorkingDir
JobName metadata.Name (Note: if set and a job with the same name exists in history, submission will fail.)
DeadlineTime AbsoluteTime converted to relative time (v1.Container.ActiveDeadlineSeconds)
JobEnvironment v1.EnvVar

Using the ExtensionList key "env-from-secrets" (or "env-from-secret") maps the ":"-separated secrets listed in the map's value as environment variables into the job container. The secrets must exist. (Use extension.JobTemplateK8sEnvFromSecret as the key.)

Using the ExtensionList key "env-from-configmaps" (or "env-from-configmap") maps the ":"-separated configmaps listed in the map's value as environment variables into the job container. The configmaps must exist. (Use extension.JobTemplateK8sEnvFromConfigMap as the key.)
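
The ":"-separated value format can be illustrated with a small splitting helper. This is a sketch only: splitSourceNames is hypothetical, and skipping empty entries (e.g. from a trailing ":") is an assumption rather than documented behavior. The object names are placeholders. In real code the map keys would be the extension constants named above rather than string literals.

```go
package main

import (
	"fmt"
	"strings"
)

// splitSourceNames (hypothetical helper) splits the ":"-separated value of
// the "env-from-secrets" / "env-from-configmaps" extensions into individual
// object names, skipping empty entries such as a trailing ":".
func splitSourceNames(value string) []string {
	var names []string
	for _, n := range strings.Split(value, ":") {
		if n = strings.TrimSpace(n); n != "" {
			names = append(names, n)
		}
	}
	return names
}

func main() {
	// Placeholder names; the referenced secrets and configmaps must already
	// exist in the job's namespace.
	ext := map[string]string{
		"env-from-secrets":    "db-credentials:api-token",
		"env-from-configmaps": "app-settings",
	}
	fmt.Println(splitSourceNames(ext["env-from-secrets"]))    // [db-credentials api-token]
	fmt.Println(splitSourceNames(ext["env-from-configmaps"])) // [app-settings]
}
```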

The job's terminal output is available when the job is in a finished state (failed or done) through the JobInfo extension key "output" (extension.JobInfoK8sJSessionJobOutput):

 if jobInfo.ExtensionList != nil {
  jobOutput, exists := jobInfo.ExtensionList[extension.JobInfoK8sJSessionJobOutput]
  if exists {
   fmt.Printf("Output of the job: %s\n", jobOutput)
  }
 }

File staging using the Job Template

Data movement is not a core focus of the DRMAA2 standard, but the standard nevertheless defines two string-based maps for file staging. In HPC systems, data movement is usually done through parallel or network filesystems; cloud-based systems often use services like S3 or GCS.

To simplify data movement between pods, the StageInFiles and StageOutFiles maps defined in the Job Template are extended for smoother Kubernetes integration. Both maps can specify moving data from the DRMAA2 process (or workflow host) to Kubernetes jobs, moving data between two Kubernetes jobs, or transferring data back from a Kubernetes job to the local host. Note that some of the mechanisms have limitations of their own (for example, ConfigMaps are stored in Kubernetes etcd, which has its own storage limits).

StageInFiles and StageOutFiles use the following scheme:

  • Map key specifies the target (in the container), as the target is unique.
  • Map value specifies the data source.

Output can also be fetched through JobInfo when the job is in a terminated state. Here the container logs are made accessible in the JobInfo extension "output".

The following source definitions for StageInFiles are currently implemented:

  • "configmap-data:base64encodedstring" can be used to pass a byte array from the workflow process to the job. Internally a ConfigMap with the data is created in the target cluster. The ConfigMap is deleted with the job.Reap() (Delete()) call. The ConfigMap is mounted to the file path specified in the target definition.
  • "secret-data:base64encodedstring" can be used to pass a byte array from the workflow process to the job. Internally, a Kubernetes Secret with the data is created in the target cluster. The Secret is removed with the job.Reap() (Delete()) call. Note that the content of the Secret is not stored in the JobTemplate ConfigMap in the cluster.
  • "hostpath:/path/to/host/directory" can be used to mount a directory from the host where the Kubernetes job is executed into the job's pod. This requires root privileges for the job, which can be requested with the JobTemplate's extension "privileged".
  • "configmap:name" mounts an existing ConfigMap into the directory specified as target.
  • "pvc:name" mounts an existing PVC into the directory specified as target in the map.
  • There are more, like "gce-disk", "gce-disk-read", "storageclass", and "nfs" (for Google Filestore), which work similarly; please check the convert.go file. They can also be used for staging out data or as shared storage between multiple jobs.

Target definitions of StageInFiles:

  • /path/to/file - the path of the file in which the data from the source definition is available (for configmap-data and secret-data)
  • /path/to/directory - a path to a directory is required when using a "hostpath" directory as data source or a pre-existing ConfigMap or Secret.

The value in the map is the type of the volume (like secret-data or configmap-data), followed by a colon and the Go base64-encoded content. The key of the map must contain the path where the file is mounted inside the job's pod. Because map keys are unique, each target path can receive only one data source.

Example:

jobtemplate.StageInFiles = map[string]string{
    "/path/file.txt": "configmap-data:"+base64.StdEncoding.EncodeToString([]byte("content")),
    "/path/password.txt": "secret-data:"+base64.StdEncoding.EncodeToString([]byte("secret")),
    "/container/local/dir": "hostpath:/some/directory",
}

Job Template extensions
Extension key Extension value
"labels" "key=value,key2=value2" v1.Labels
"scheduler" poseidon, kube-batch or any other k8s scheduler
"privileged" "true" or "TRUE"; runs container in privileged mode
"pullpolicy" overrides the image pull policy: "always", "never", or "ifnotpresent" (case-insensitive)
"distribution" Required for accelerators: "aks", "gke", or "eks"
"accelerator" GPU (or other type) request, e.g. "1*nvidia-tesla-v100". For aks, a number prefix is required; the type string can be arbitrary but must not be empty. Sets resource limits, node selector, and tolerations.

Example:

jobtemplate.ExtensionList = map[string]string{
    "labels": "key=value",
    "privileged": "true",
}
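
The extension values are plain strings with small formats of their own. The sketch below parses three of them as the table describes them. parseLabels, parseAccelerator and pullPolicy are hypothetical helpers, not functions of this package. Treating a count below 1 as invalid is this sketch's assumption. The pull policy strings on the right-hand side are the spellings used by the Kubernetes API.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseLabels (hypothetical helper) turns "key=value,key2=value2" into a map.
func parseLabels(s string) (map[string]string, error) {
	labels := map[string]string{}
	for _, pair := range strings.Split(s, ",") {
		k, v, ok := strings.Cut(pair, "=")
		if !ok || k == "" {
			return nil, fmt.Errorf("invalid label %q", pair)
		}
		labels[k] = v
	}
	return labels, nil
}

// parseAccelerator (hypothetical helper) splits "1*nvidia-tesla-v100" into a
// device count and a non-empty accelerator type.
func parseAccelerator(s string) (int, string, error) {
	countStr, kind, ok := strings.Cut(s, "*")
	if !ok || kind == "" {
		return 0, "", fmt.Errorf("expected <count>*<type>, got %q", s)
	}
	n, err := strconv.Atoi(countStr)
	if err != nil || n < 1 {
		return 0, "", fmt.Errorf("invalid accelerator count in %q", s)
	}
	return n, kind, nil
}

// pullPolicy (hypothetical helper) maps the case-insensitive "pullpolicy"
// value to the Kubernetes ImagePullPolicy spelling.
func pullPolicy(s string) (string, error) {
	switch strings.ToLower(s) {
	case "always":
		return "Always", nil
	case "never":
		return "Never", nil
	case "ifnotpresent":
		return "IfNotPresent", nil
	}
	return "", fmt.Errorf("unknown pull policy %q", s)
}

func main() {
	labels, _ := parseLabels("key=value,key2=value2")
	n, kind, _ := parseAccelerator("1*nvidia-tesla-v100")
	policy, _ := pullPolicy("IFNOTPRESENT")
	fmt.Println(labels, n, kind, policy)
}
```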

Required for JobTemplate:

  • RemoteCommand
  • JobCategory as it specifies the image

Other implicit settings:

  • Parallelism: 1
  • Completions: 1
  • BackoffLimit: 1
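
The required fields and implicit settings can be summarized as a validation step. This sketch is hypothetical. template and jobDefaults are trimmed-down stand-ins, not drmaa2interface.JobTemplate or batchv1.JobSpec. The function rejects a template without a command or image and otherwise returns the fixed values listed above.

```go
package main

import (
	"errors"
	"fmt"
)

// template is a hypothetical, trimmed-down stand-in for the two DRMAA2
// JobTemplate fields this package requires.
type template struct {
	RemoteCommand string
	JobCategory   string // container image
}

// jobDefaults holds the implicit batch job settings listed above.
type jobDefaults struct {
	Parallelism, Completions, BackoffLimit int32
}

// validate (hypothetical helper) rejects templates missing a required field
// and otherwise returns the implicit settings.
func validate(t template) (jobDefaults, error) {
	if t.RemoteCommand == "" {
		return jobDefaults{}, errors.New("RemoteCommand is required")
	}
	if t.JobCategory == "" {
		return jobDefaults{}, errors.New("JobCategory (container image) is required")
	}
	return jobDefaults{Parallelism: 1, Completions: 1, BackoffLimit: 1}, nil
}

func main() {
	d, err := validate(template{RemoteCommand: "/bin/sleep", JobCategory: "busybox:latest"})
	fmt.Printf("%+v %v\n", d, err)
}
```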

Job Info Mapping
DRMAA2 JobInfo Kubernetes
ExitStatus 0 or 1 (any exit code from 1 to 255 is reported as 1; the exact code is not supported in the job Status)
SubmissionTime job.CreationTimestamp.Time
DispatchTime job.Status.StartTime.Time
FinishTime job.Status.CompletionTime.Time
State see above
JobID v1.Job.UID

Documentation

Constants

const K8S_JT_EXTENSION_LABELS = "labels"
const K8S_JT_EXTENSION_NAMESPACE = "namespace"

Variables

This section is empty.

Functions

func DRMAA2State

func DRMAA2State(jc batchv1.JobInterface, jobid string) drmaa2interface.JobState

func GetExitStatusOfJobContainer added in v0.3.20

func GetExitStatusOfJobContainer(cs kubernetes.Interface, namespace, podName string) (int32, int32, string, error)

GetExitStatusOfJobContainer returns the exit status of a job container.

func GetFirstPod added in v0.3.20

func GetFirstPod(pods []corev1.Pod) corev1.Pod

GetFirstPod is required when a job is deleted because of its deadline time. In that case we have no control over restarts (we have seen 3), because the deadline takes precedence over backoff limits in Kubernetes. So when a deadline time is used, there is no guarantee that the job has only one pod.

func GetJobOutput added in v0.3.8

func GetJobOutput(cs kubernetes.Interface, namespace string, jobID, podName string) ([]byte, error)

GetJobOutput returns the output of a job pod after it has finished.

func GetLastStartedPod added in v0.3.20

func GetLastStartedPod(pods []corev1.Pod) corev1.Pod

func GetMachineNameForPod added in v0.3.20

func GetMachineNameForPod(cs kubernetes.Interface, namespace, podName string) (string, error)

func GetPodsForJob added in v0.3.20

func GetPodsForJob(cs kubernetes.Interface, namespace, jobID string) ([]corev1.Pod, error)

func IsComplete added in v0.3.24

func IsComplete(c *[]v1.JobCondition) bool

func IsDeadlineTimeException added in v0.3.24

func IsDeadlineTimeException(c []v1.JobCondition) bool
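
IsComplete and IsDeadlineTimeException inspect job conditions. The sketch below shows the kind of check involved, based on standard Kubernetes behavior. A finished job carries a "Complete" condition with status "True". A job stopped by activeDeadlineSeconds gets a "Failed" condition with reason "DeadlineExceeded". condition, isComplete and isDeadlineExceeded are hypothetical stand-ins, and whether the package checks exactly these fields is not documented here.

```go
package main

import "fmt"

// condition is a hypothetical stand-in for batchv1.JobCondition.
type condition struct {
	Type, Status, Reason string
}

// isComplete reports whether a "Complete" condition with status "True" exists.
func isComplete(conds []condition) bool {
	for _, c := range conds {
		if c.Type == "Complete" && c.Status == "True" {
			return true
		}
	}
	return false
}

// isDeadlineExceeded reports whether the job failed because its active
// deadline passed ("Failed" condition with reason "DeadlineExceeded").
func isDeadlineExceeded(conds []condition) bool {
	for _, c := range conds {
		if c.Type == "Failed" && c.Status == "True" && c.Reason == "DeadlineExceeded" {
			return true
		}
	}
	return false
}

func main() {
	conds := []condition{{Type: "Failed", Status: "True", Reason: "DeadlineExceeded"}}
	fmt.Println(isComplete(conds), isDeadlineExceeded(conds)) // false true
}
```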

func JobToJobInfo

func JobToJobInfo(jc batchv1.JobInterface, jobid string) (drmaa2interface.JobInfo, error)

JobToJobInfo converts a Kubernetes job to a DRMAA2 JobInfo representation.

func NewAllocator added in v0.3.0

func NewAllocator() *allocator

func NewClientSet

func NewClientSet() (*kubernetes.Clientset, error)

NewClientSet creates a new clientset by parsing the .kube/config file in the home directory.

Types

type KubernetesTracker

type KubernetesTracker struct {
	// contains filtered or unexported fields
}

func New

func New(jobsession string, namespace string, cs *kubernetes.Clientset) (*KubernetesTracker, error)

New creates a new KubernetesTracker, either using the given Kubernetes Clientset or allocating a new one (if the parameter is nil).

func (*KubernetesTracker) AddArrayJob

func (kt *KubernetesTracker) AddArrayJob(jt drmaa2interface.JobTemplate, begin int, end int, step int, maxParallel int) (string, error)

func (*KubernetesTracker) AddJob

AddJob converts the given DRMAA2 job template into a batchv1.Job and creates the job within Kubernetes.

func (*KubernetesTracker) CloseMonitoringSession added in v0.3.16

func (kt *KubernetesTracker) CloseMonitoringSession(name string) error

func (*KubernetesTracker) DeleteJob

func (kt *KubernetesTracker) DeleteJob(jobid string) error

DeleteJob removes a finished job and the objects created along with it (like ConfigMaps and Secrets) from Kubernetes.

func (*KubernetesTracker) GetAllJobIDs added in v0.3.16

func (kt *KubernetesTracker) GetAllJobIDs(filter *drmaa2interface.JobInfo) ([]string, error)

func (*KubernetesTracker) GetAllMachines added in v0.3.16

func (kt *KubernetesTracker) GetAllMachines(filter []string) ([]drmaa2interface.Machine, error)

func (*KubernetesTracker) GetAllQueueNames added in v0.3.16

func (kt *KubernetesTracker) GetAllQueueNames(filter []string) ([]string, error)

GetAllQueueNames returns all namespaces. If filter is not nil, it returns only the existing namespaces listed in the filter.

func (*KubernetesTracker) JobControl

func (kt *KubernetesTracker) JobControl(jobid, state string) error

JobControl changes the state of the given job by executing the given action (suspend, resume, hold, release, terminate).

func (*KubernetesTracker) JobInfo

func (kt *KubernetesTracker) JobInfo(jobID string) (drmaa2interface.JobInfo, error)

func (*KubernetesTracker) JobInfoFromMonitor added in v0.3.16

func (kt *KubernetesTracker) JobInfoFromMonitor(id string) (ji drmaa2interface.JobInfo, err error)

JobInfoFromMonitor might collect job state and job info differently than a JobSession with persistent storage does.

func (*KubernetesTracker) JobState

func (kt *KubernetesTracker) JobState(jobID string) (drmaa2interface.JobState, string, error)

func (*KubernetesTracker) ListArrayJobs

func (kt *KubernetesTracker) ListArrayJobs(id string) ([]string, error)

func (*KubernetesTracker) ListJobCategories

func (kt *KubernetesTracker) ListJobCategories() ([]string, error)

ListJobCategories returns all container images currently found in the cluster. Other container images can still be used.

func (*KubernetesTracker) ListJobs

func (kt *KubernetesTracker) ListJobs() ([]string, error)

ListJobs returns a list of job IDs associated with the current DRMAA2 job session.

func (*KubernetesTracker) OpenMonitoringSession added in v0.3.16

func (kt *KubernetesTracker) OpenMonitoringSession(name string) error

func (*KubernetesTracker) Wait

func (kt *KubernetesTracker) Wait(jobid string, timeout time.Duration, states ...drmaa2interface.JobState) error

Wait returns when the job is in one of the given states or when a timeout occurs (errors then).

type KubernetesTrackerParameters added in v0.3.7

type KubernetesTrackerParameters struct {
	Namespace string // if not set it will become "default"
	ClientSet *kubernetes.Clientset
}

KubernetesTrackerParameters can be used as a parameter in NewKubernetesSessionManager. Note that the namespace, if set, must exist. If it is not set, the "default" namespace is used.
