Documentation ¶
Index ¶
- Constants
- func BatchJobToJobInfo(project string, job *batchpb.Job) (drmaa2interface.JobInfo, error)
- func ConvertJobState(job *batchpb.Job) (drmaa2interface.JobState, string, error)
- func ConvertJobTemplateToJobRequest(session, project, location string, jt drmaa2interface.JobTemplate) (*batchpb.CreateJobRequest, error)
- func CopyFileFromBucket(bucket string, file string, localFile string) error
- func CopyFileToBucket(bucket string, file string, localFile string) error
- func CreateMissingStageOutBuckets(project string, stageOutFiles map[string]string) error
- func CreateRunnables(barriers bool, prolog string) []*batchpb.Runnable
- func DefaultCPUMilli(machine string) int64
- func DeleteBucket(bucket string) error
- func DeleteFileInBucket(bucket, file string) error
- func GetAcceleratorsExtension(jt drmaa2interface.JobTemplate) (string, int64, bool)
- func GetDockerOptionsExtension(jt drmaa2interface.JobTemplate) (string, bool)
- func GetJobOutput(projectID, jobUid string, limit int64) ([]string, error)
- func GetJobTemplateExtensionFromJobInfo(ji drmaa2interface.JobInfo) (drmaa2interface.JobTemplate, bool)
- func GetJobTemplateFromBase64(base64encondedJT string) (drmaa2interface.JobTemplate, error)
- func GetMachineEpilogExtension(jt drmaa2interface.JobTemplate) (string, bool)
- func GetMachinePrologExtension(jt drmaa2interface.JobTemplate) (string, bool)
- func GetSecretEnvironmentVariables(jt drmaa2interface.JobTemplate) (map[string]string, bool)
- func GetSpotExtension(jt drmaa2interface.JobTemplate) (bool, bool)
- func GetTasksPerNodeExtension(jt drmaa2interface.JobTemplate) (int64, bool)
- func GetUIDExtensionFromJobInfo(ji drmaa2interface.JobInfo) (string, bool)
- func IsInDRMAA2Session(client *batch.Client, session string, jobID string) bool
- func IsInJobSession(session string, job *batchpb.Job) bool
- func JobTemplateToEnv(jt drmaa2interface.JobTemplate) (string, error)
- func MountBucket(jobRequest *batchpb.CreateJobRequest, execPosition int, ...) *batchpb.CreateJobRequest
- func NewAllocator() *allocator
- func ReadFromBucket(bucket string, file string) ([]byte, error)
- func SetAcceleratorsExtension(jt drmaa2interface.JobTemplate, count int64, accelerator string) drmaa2interface.JobTemplate
- func SetDockerOptionsExtension(jt drmaa2interface.JobTemplate, dockerOptions string) drmaa2interface.JobTemplate
- func SetMachineEpilogExtension(jt drmaa2interface.JobTemplate, epilog string) drmaa2interface.JobTemplate
- func SetMachinePrologExtension(jt drmaa2interface.JobTemplate, prolog string) drmaa2interface.JobTemplate
- func SetSecretEnvironmentVariables(jt drmaa2interface.JobTemplate, secretEnv map[string]string) (drmaa2interface.JobTemplate, error)
- func SetSpotExtension(jt drmaa2interface.JobTemplate, isSpot bool) drmaa2interface.JobTemplate
- func SetTasksPerNodeExtension(jt drmaa2interface.JobTemplate, count int64) drmaa2interface.JobTemplate
- func TimesFromStatusEvents(events []*batchpb.StatusEvent) (dispatchTime, finishTime time.Time)
- func ValidateJobTemplate(jt drmaa2interface.JobTemplate) (drmaa2interface.JobTemplate, error)
- func WriteToBucket(bucket string, file string, content []byte) error
- type GCPBatchTracker
- func (t *GCPBatchTracker) AddArrayJob(jt drmaa2interface.JobTemplate, begin int, end int, step int, maxParallel int) (string, error)
- func (t *GCPBatchTracker) AddJob(jt drmaa2interface.JobTemplate) (string, error)
- func (t *GCPBatchTracker) CloseMonitoringSession(name string) error
- func (t *GCPBatchTracker) DeleteJob(jobID string) error
- func (t *GCPBatchTracker) GetAllJobIDs(filter *drmaa2interface.JobInfo) ([]string, error)
- func (t *GCPBatchTracker) GetAllMachines(filter []string) ([]drmaa2interface.Machine, error)
- func (t *GCPBatchTracker) GetAllQueueNames(filter []string) ([]string, error)
- func (t *GCPBatchTracker) JobControl(jobID string, action string) error
- func (t *GCPBatchTracker) JobInfo(jobID string) (drmaa2interface.JobInfo, error)
- func (t *GCPBatchTracker) JobInfoFromMonitor(jobID string) (drmaa2interface.JobInfo, error)
- func (t *GCPBatchTracker) JobOutput(jobID string, lastNLines int64) ([]string, error)
- func (t *GCPBatchTracker) JobState(jobID string) (drmaa2interface.JobState, string, error)
- func (t *GCPBatchTracker) JobTemplate(jobID string) (drmaa2interface.JobTemplate, error)
- func (t *GCPBatchTracker) ListArrayJobs(arrayjobID string) ([]string, error)
- func (t *GCPBatchTracker) ListJobCategories() ([]string, error)
- func (t *GCPBatchTracker) ListJobs() ([]string, error)
- func (t *GCPBatchTracker) OpenMonitoringSession(name string) error
- func (t *GCPBatchTracker) Wait(jobID string, timeout time.Duration, state ...drmaa2interface.JobState) error
- type GoogleBatchTrackerParams
Constants ¶
const (
	// ExtensionJobInfoJobTemplate is the job template stored in the job info
	// extension list as a base64-encoded string.
	ExtensionJobInfoJobTemplate = "jobtemplate_base64"
	// ExtensionJobInfoJobUID is the Google Batch internal job UID.
	ExtensionJobInfoJobUID = "uid"
)
const (
	// job categories (otherwise it is a container image)
	JobCategoryScriptPath = "$scriptpath$" // treats RemoteCommand as path to script and ignores args
	JobCategoryScript     = "$script$"     // treats RemoteCommand as script and ignores args
	// Name of the environment variable that carries the job template into the container
	EnvJobTemplate = "DRMAA2_JOB_TEMPLATE"
)
const (
	// ResourceLimitRuntime is the key for the ResourceLimits
	// map which defines the maximum runtime of a job. The
	// value is a string which can be parsed by time.ParseDuration,
	// like "1m30s" for 1 minute 30 seconds.
	ResourceLimitRuntime = "runtime"
	// ResourceLimitBootDisk is the key for the ResourceLimits
	// map which defines the boot disk size of a job. The
	// value is a string which can be parsed by strconv.ParseInt
	// with base 10. The unit is MiB.
	ResourceLimitBootDisk = "bootdiskmib"
	// ResourceLimitCPUMilli is the key for the ResourceLimits
	// map which defines the CPU milli of a job. The
	// value is a string which can be parsed by strconv.ParseInt
	// with base 10. The unit is milli cores, like 8000 for 8 cores.
	ResourceLimitCPUMilli = "cpumilli"
)
const (
	ExtensionProlog          = "prolog"
	ExtensionEpilog          = "epilog"
	ExtensionSpot            = "spot"
	ExtensionAccelerators    = "accelerators"
	ExtensionTasksPerNode    = "tasks_per_node"
	ExtensionDockerOptions   = "docker_options"
	ExtensionGoogleSecretEnv = "secret_env"
)
const (
BatchTaskLogs = "batch_task_logs"
)
Variables ¶
This section is empty.
Functions ¶
func BatchJobToJobInfo ¶
func ConvertJobState ¶
func ConvertJobTemplateToJobRequest ¶
func ConvertJobTemplateToJobRequest(session, project, location string, jt drmaa2interface.JobTemplate) (*batchpb.CreateJobRequest, error)
func CopyFileFromBucket ¶ added in v0.1.2
CopyFileFromBucket reads the content of an object from a bucket and writes it to a local file. It expects the bucket name to be prefixed with gs:// and not contain any other slashes.
func CopyFileToBucket ¶ added in v0.1.2
CopyFileToBucket writes the content of a local file to a bucket. It expects the bucket name to be prefixed with gs:// and not contain any other slashes.
func DefaultCPUMilli ¶ added in v0.2.3
DefaultCPUMilli returns the CPU resource limit in milli cores that fits the given machine type.
func DeleteBucket ¶ added in v0.1.2
DeleteBucket deletes a bucket. It expects the bucket name prefixed with gs://. The bucket must be empty to be deleted.
func DeleteFileInBucket ¶ added in v0.1.2
DeleteFileInBucket deletes a file in a bucket. It expects the bucket name prefixed with gs://. The file is the name of the file in the bucket (could be like testpath/testfile.txt).
func GetAcceleratorsExtension ¶
func GetAcceleratorsExtension(jt drmaa2interface.JobTemplate) (string, int64, bool)
func GetDockerOptionsExtension ¶
func GetDockerOptionsExtension(jt drmaa2interface.JobTemplate) (string, bool)
func GetJobOutput ¶ added in v0.2.3
func GetJobTemplateExtensionFromJobInfo ¶ added in v0.2.3
func GetJobTemplateExtensionFromJobInfo(ji drmaa2interface.JobInfo) (drmaa2interface.JobTemplate, bool)
GetJobTemplateExtensionFromJobInfo returns the job template which is stored in the job info extension list. If the job info does not contain a job template extension it returns false.
func GetJobTemplateFromBase64 ¶ added in v0.1.1
func GetJobTemplateFromBase64(base64encondedJT string) (drmaa2interface.JobTemplate, error)
func GetMachineEpilogExtension ¶
func GetMachineEpilogExtension(jt drmaa2interface.JobTemplate) (string, bool)
func GetMachinePrologExtension ¶
func GetMachinePrologExtension(jt drmaa2interface.JobTemplate) (string, bool)
func GetSecretEnvironmentVariables ¶ added in v0.2.1
func GetSecretEnvironmentVariables(jt drmaa2interface.JobTemplate) (map[string]string, bool)
func GetSpotExtension ¶
func GetSpotExtension(jt drmaa2interface.JobTemplate) (bool, bool)
func GetTasksPerNodeExtension ¶
func GetTasksPerNodeExtension(jt drmaa2interface.JobTemplate) (int64, bool)
func GetUIDExtensionFromJobInfo ¶ added in v0.2.3
func GetUIDExtensionFromJobInfo(ji drmaa2interface.JobInfo) (string, bool)
GetUIDExtensionFromJobInfo returns the Google Batch Job UID which is stored in the job info extension list. If the job info does not contain a UID extension it returns false.
func IsInDRMAA2Session ¶
func JobTemplateToEnv ¶ added in v0.1.1
func JobTemplateToEnv(jt drmaa2interface.JobTemplate) (string, error)
func MountBucket ¶
func MountBucket(jobRequest *batchpb.CreateJobRequest, execPosition int, destination, source string) *batchpb.CreateJobRequest
MountBucket mounts a bucket into the job request. The source is the bucket name prefixed with gs:// and the destination is the mount path inside the host or container.
func NewAllocator ¶
func NewAllocator() *allocator
func ReadFromBucket ¶ added in v0.1.1
ReadFromBucket reads the content of an object from a bucket. This is a convenience function to read files, like output files from a bucket. The bucket name must be prefixed with gs:// and must not contain any other slashes.
func SetAcceleratorsExtension ¶
func SetAcceleratorsExtension(jt drmaa2interface.JobTemplate, count int64, accelerator string) drmaa2interface.JobTemplate
func SetDockerOptionsExtension ¶
func SetDockerOptionsExtension(jt drmaa2interface.JobTemplate, dockerOptions string) drmaa2interface.JobTemplate
func SetMachineEpilogExtension ¶
func SetMachineEpilogExtension(jt drmaa2interface.JobTemplate, epilog string) drmaa2interface.JobTemplate
func SetMachinePrologExtension ¶
func SetMachinePrologExtension(jt drmaa2interface.JobTemplate, prolog string) drmaa2interface.JobTemplate
func SetSecretEnvironmentVariables ¶ added in v0.2.1
func SetSecretEnvironmentVariables(jt drmaa2interface.JobTemplate, secretEnv map[string]string) (drmaa2interface.JobTemplate, error)
SetSecretEnvironmentVariables sets environment variables which are retrieved from Google Secret Manager as a JobTemplate extension. The map key is the environment variable name and the value is the path to the secret (like "projects/dev/secrets/secret_message/versions/1").
func SetSpotExtension ¶
func SetSpotExtension(jt drmaa2interface.JobTemplate, isSpot bool) drmaa2interface.JobTemplate
func SetTasksPerNodeExtension ¶
func SetTasksPerNodeExtension(jt drmaa2interface.JobTemplate, count int64) drmaa2interface.JobTemplate
func TimesFromStatusEvents ¶ added in v0.2.3
func TimesFromStatusEvents(events []*batchpb.StatusEvent) (dispatchTime, finishTime time.Time)
func ValidateJobTemplate ¶
func ValidateJobTemplate(jt drmaa2interface.JobTemplate) (drmaa2interface.JobTemplate, error)
Types ¶
type GCPBatchTracker ¶
type GCPBatchTracker struct {
// contains filtered or unexported fields
}
GCPBatchTracker implements the JobTracker interface so that it can be used as a backend in the drmaa2os project.
func NewGCPBatchTracker ¶
func NewGCPBatchTracker(drmaa2session string, project, location string) (*GCPBatchTracker, error)
NewGCPBatchTracker returns a new GCPBatchTracker instance which is used for managing jobs in Google Batch. The project and location parameters define the Google Cloud project and the location (like "us-central1"). The drmaa2session parameter is optional and can be used to filter for jobs which are in the same job session. If the job session is "" then all jobs are made visible. GCPBatchTracker implements the JobTracker interface so that it can be used as backend in drmaa2os project and wfl.
func (*GCPBatchTracker) AddArrayJob ¶
func (t *GCPBatchTracker) AddArrayJob(jt drmaa2interface.JobTemplate, begin int, end int, step int, maxParallel int) (string, error)
AddArrayJob makes a mass submission of jobs defined by the same job template. Many HPC workload managers support job arrays for submitting tens of thousands of similar jobs with one call. The additional parameters define how many jobs are submitted by defining a TASK_ID range. Begin is the first task ID (like 1), end is the last task ID (like 10), and step is a positive integer which defines the increment from one task ID to the next (like 1). maxParallel is an argument representing optional functionality which instructs the backend to limit the number of tasks of this job array running in parallel to maxParallel. Note that jobs use the TASK_ID environment variable to identify which task they are and, based on that, determine what to do (like which data set is accessed).
With Google Batch, job arrays can be created by using MinSlots and MaxSlots in AddJob(). MaxSlots defines the number of tasks in the job array. MinSlots defines the number of tasks which are run in parallel (for MPI).
func (*GCPBatchTracker) AddJob ¶
func (t *GCPBatchTracker) AddJob(jt drmaa2interface.JobTemplate) (string, error)
AddJob creates a Google Batch job which is defined by the DRMAA2 job template. Job names must be unique in Google Batch hence it is automatically created by the backend. The CandidateMachines field is used to define the machine type (like "n2-standard-2") to be used. Exactly one machine type must be specified. The ResourceLimits field is used to define the CPU and runtime limits. On success the job ID (job name) is returned.
func (*GCPBatchTracker) CloseMonitoringSession ¶
func (t *GCPBatchTracker) CloseMonitoringSession(name string) error
func (*GCPBatchTracker) DeleteJob ¶
func (t *GCPBatchTracker) DeleteJob(jobID string) error
DeleteJob removes a job from a potential internal database. It does not stop a job. A job must be in an end state (terminated, failed) in order to call DeleteJob. If an error occurs or the job is not in an end state, an error must be returned. If the backend does not support cleaning up resources for a finished job, nil should be returned.
func (*GCPBatchTracker) GetAllJobIDs ¶
func (t *GCPBatchTracker) GetAllJobIDs(filter *drmaa2interface.JobInfo) ([]string, error)
func (*GCPBatchTracker) GetAllMachines ¶
func (t *GCPBatchTracker) GetAllMachines(filter []string) ([]drmaa2interface.Machine, error)
func (*GCPBatchTracker) GetAllQueueNames ¶
func (t *GCPBatchTracker) GetAllQueueNames(filter []string) ([]string, error)
func (*GCPBatchTracker) JobControl ¶
func (t *GCPBatchTracker) JobControl(jobID string, action string) error
JobControl sends a request to the backend to either "terminate", "suspend", "resume", "hold", or "release" a job. The strings are fixed and are defined by the JobControl constants. This could change in the future to be limited only to constants representing the actions. When the request is not accepted by the system the function must return an error.
func (*GCPBatchTracker) JobInfo ¶
func (t *GCPBatchTracker) JobInfo(jobID string) (drmaa2interface.JobInfo, error)
JobInfo returns the job status of a job in form of a JobInfo struct or an error.
func (*GCPBatchTracker) JobInfoFromMonitor ¶
func (t *GCPBatchTracker) JobInfoFromMonitor(jobID string) (drmaa2interface.JobInfo, error)
JobInfoFromMonitor might collect job state and job info in a different way than a JobSession with persistent storage does.
func (*GCPBatchTracker) JobOutput ¶ added in v0.2.3
func (t *GCPBatchTracker) JobOutput(jobID string, lastNLines int64) ([]string, error)
JobOutput is not part of the JobTracker interface, but it could become a future JobOutputer interface extension. This would also be useful for k8s, Docker, and other JobTrackers that currently store the output as a JobInfo extension. If lastNLines is 0, all lines are returned.
func (*GCPBatchTracker) JobState ¶
func (t *GCPBatchTracker) JobState(jobID string) (drmaa2interface.JobState, string, error)
JobState returns the DRMAA2 state and substate (free form string) of the job.
func (*GCPBatchTracker) JobTemplate ¶ added in v0.1.1
func (t *GCPBatchTracker) JobTemplate(jobID string) (drmaa2interface.JobTemplate, error)
func (*GCPBatchTracker) ListArrayJobs ¶
func (t *GCPBatchTracker) ListArrayJobs(arrayjobID string) ([]string, error)
ListArrayJobs returns all job IDs that a job array ID (or array job ID) represents, or an error.
func (*GCPBatchTracker) ListJobCategories ¶
func (t *GCPBatchTracker) ListJobCategories() ([]string, error)
ListJobCategories returns a list of job categories which can be used in the JobCategory field of the job template. The list is informational. An example is returning a list of supported container images. AddJob() and AddArrayJob() process a JobTemplate and hence also the JobCategory field.
The JobCategories supported by Google Batch are all container images which can be used by the service. If "$script$" is used as JobCategory, the RemoteCommand field of the JobTemplate is used as a script. If "$scriptpath$" is used as JobCategory, the RemoteCommand field of the JobTemplate is used as the path to a script which is executed.
func (*GCPBatchTracker) ListJobs ¶
func (t *GCPBatchTracker) ListJobs() ([]string, error)
ListJobs returns all visible job IDs or an error.
func (*GCPBatchTracker) OpenMonitoringSession ¶
func (t *GCPBatchTracker) OpenMonitoringSession(name string) error
func (*GCPBatchTracker) Wait ¶
func (t *GCPBatchTracker) Wait(jobID string, timeout time.Duration, state ...drmaa2interface.JobState) error
Wait blocks until the job is in one of the given states, the maximum waiting time (specified by timeout) is reached, or another internal error occurred (like the job was not found). In case of a timeout an error must also be returned.
type GoogleBatchTrackerParams ¶
GoogleBatchTrackerParams provides parameters which can be passed to the SessionManager in order to pass things like the Google project or region into the job tracker. This indirection is needed so that the tracker can be used by the SessionManager of the DRMAA2OS project without being tightly integrated with it, so that not all dependencies have to be compiled in.