wfl

package module
v1.2.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 5, 2023 License: BSD-2-Clause Imports: 22 Imported by: 25

README

💙💛

wfl - A Simple and Pluggable Workflow Language for Go

Don't mix wfl with WFL.

CircleCI codecov

Creating process, container, pod, task, or job workflows based on raw interfaces of operating systems, Docker, Google Batch, Kubernetes, Cloud Foundry, and HPC job schedulers can be a tedious. Lots of repeating code is required. All workload management systems have a different API.

wfl abstracts away from the underlying details of the processes, containers, and workload management systems. wfl provides a simple, unified interface which allows to quickly define and execute a job workflow and change between different execution backends without changing the workflow itself.

wfl does not come with many features but is simple to use and enough to define and run jobs and job workflows with inter-job dependencies.

In its simplest form a process can be started and waited for:

    wfl.NewWorkflow(wfl.NewProcessContext()).Run("convert", "image.jpg", "image.png").Wait()

If the output of the command needs to be displayed on the terminal you can set the out path in the default JobTemplate (see below) configuration:

 template := drmaa2interface.JobTemplate{
  ErrorPath:  "/dev/stderr",
  OutputPath: "/dev/stdout",
 }
 flow := wfl.NewWorkflow(wfl.NewProcessContextByCfg(wfl.ProcessConfig{
  DefaultTemplate: template,
 }))
 flow.Run("echo", "hello").Wait()

Running a job as a Docker container requires a different context (and the image already pulled before).

    import (
 "github.com/dgruber/drmaa2interface"
 "github.com/dgruber/wfl"
 "github.com/dgruber/wfl/pkg/context/docker"
    )
    
    ctx := docker.NewDockerContextByCfg(docker.Config{DefaultDockerImage: "golang:latest"})
    wfl.NewWorkflow(ctx).Run("sleep", "60").Wait()

Starting a Docker container without a run command which exposes ports requires more configuration which can be provided by using a JobTemplate together with the RunT() method.

    jt := drmaa2interface.JobTemplate{
        JobCategory: "swaggerapi/swagger-editor",
    }
    jt.ExtensionList = map[string]string{"exposedPorts": "80:8080/tcp"}
    
    wfl.NewJob(wfl.NewWorkflow(docker.NewDockerContext())).RunT(jt).Wait()

Starting a Kubernetes batch job and waiting for its end is not much different.

    wfl.NewWorkflow(kubernetes.NewKubernetesContext()).Run("sleep", "60").Wait()

wfl also supports submitting jobs into HPC schedulers like SLURM, Grid Engine and so on.

    wfl.NewWorkflow(libdrmaa.NewLibDRMAAContext()).Run("sleep", "60").Wait()

wfl aims to work for any kind of workload. It works on a Mac and Raspberry Pi the same way as on a high-performance compute cluster. Things missing: On small scale you probably miss data management - moving results from one job to another. That's deliberately not implemented.

There is now support for getting the job output as a string back with the Output() method. It is a convenience wrapper which just reads the job output from a file which must be set before with OutputPath. Note that when having multiple tasks, they need to have different output paths set (hence use RunT(), or different flows, or try the new "{{.ID}}" replacement in the OutputPath). Output() is currently implemented for the OS, Docker, and Kubernetes backend.

Some backend implementations (like for Kubernetes) support basic file transfer in the JobTemplate (when using RunT()) using the StageInFiles and StageOutFiles maps. On large scale you are missing checkpoint and restart functionality or HA of the workflow process itself. Here the idea is not to require any complicated runtime environment for the workflow applications rather keeping workflows small and repeatably executable from other workflows.

wfl works with simple primitives: context, workflow, job, and jobtemplate

Experimental: Jobs can also be processed in job control streams.

First support for logging is also available. Log levels can be controlled by environment variables (export WFL_LOGLEVEL=DEBUG or INFO/WARNING/ERROR/NONE). Applications can use the same logging facility by getting the logger from the workflow (workflow.Logger()) or registering your own logger in a workflow (workflow.SetLogger(Logger interface)). Default is set to ERROR.

Getting Started

Dependencies of wfl (like drmaa2) are vendored in. The only external package required to be installed manually is the drmaa2interface.

    go get github.com/dgruber/drmaa2interface

Context

A context defines the execution backend for the workflow. Contexts can be easily created with the New functions which are defined in the context.go file or in the separate packages found in pkg/context.

For creating a context which executes the jobs of a workflow in operating system processes use:

    wfl.NewProcessContext()

If the workflow needs to be executed in containers the DockerContext can be used:

    docker.NewDockerContext()

If the Docker context needs to be configured with a default Docker image (when Run() is used or RunT() without a configured JobCategory (which is the Docker image)) then the ContextByCfg() can be called.

    docker.NewDockerContextByCfg(docker.Config{DefaultDockerImage: "golang:latest"})

For running jobs either in VMs or in containers in Google Batch the GoogleBatchContext needs to be allocated:

    googlebatch.NewGoogleBatchContextByCfg(
  googlebatch.Config{
   DefaultJobCategory: googlebatch.JobCategoryScript, // default container image Run() is using or script if cmd runs as script
   GoogleProjectID:    "google-project",
   Region:             "europe-north1",
   DefaultTemplate: drmaa2interface.JobTemplate{
    MinSlots: 1, // for MPI set MinSlots = MaxSlots and > 1
    MaxSlots: 1, // for just a bunch of tasks MinSlots = 1 (parallelism) and MaxSlots = <tasks>
   },
  })

When you want to run the workflow as Cloud Foundry tasks the CloudFoundryContext can be used:

    cloudfoundry.NewCloudFoundryContext()

Without a config it uses following environment variables to access the Cloud Foundry cloud controller API:

For submitting Kubernetes batch jobs a Kubernetes context exists.

   ctx := kubernetes.NewKubernetesContext()

Note that each job requires a container image specified which can be done by using the JobTemplate's JobCategory. When the same container image is used within the whole job workflow it makes sense to use the Kubernetes config.

   ctx := kubernetes.NewKubernetesContextByCfg(kubernetes.Config{DefaultImage: "busybox:latest"})

Singularity containers can be executed within the Singularity context. When setting the DefaultImage (like in the Kubernetes Context) then then Run() methods can be used otherwise the Container image must be specified in the JobTemplate's JobCategory field separately for each job. The DefaultImage can always be overridden by the JobCategory. Note that each task / job executes a separate Singularity container process.

   ctx := wfl.NewSingularityContextByCfg(wfl.SingularityConfig{DefaultImage: ""}))

For working with HPC schedulers the libdrmaa context can be used. This context requires libdrmaa.so available in the library path at runtime. Grid Engine ships libdrmaa.so but the LD_LIBRARY_PATH needs to be typically set. For SLURM libdrmaa.so often needs to be build.

Since C go is used under the hood (drmaa2os which uses go drmaa) some compiler flags needs to be set during build time. Those flags depend on the workload manager used. Best check out the go drmaa project for finding the right flags. immeadiately For building SLURM requires:

    export CGO_LDFLAGS="-L$SLURM_DRMAA_ROOT/lib"
    export CGO_CFLAGS="-DSLURM -I$SLURM_DRMAA_ROOT/include"

If all set a libdrmaa context can be created by importing:

   ctx := libdrmaa.NewLibDRMAAContext()

The JobCategory is whatever the workload-manager associates with it. Typically it is a set of submission parameters. A basic example is here.

Workflow

A workflow encapsulates a set of jobs/tasks using the same backend (context). Depending on the execution backend it can be seen as a namespace.

It can be created by using:

    wf := wfl.NewWorkflow(ctx)

Errors during creation can be catched with

    wf := wfl.NewWorkflow(ctx).OnError(func(e error) {panic(e)})

or with

    if wf.HasError() {
        panic(wf.Error())
    }

Job

Jobs are the main objects in wfl. A job defines helper methods for dealing with the workload. Many of those methods return the job object itself to allow chaining calls in an easy way. Errors are stored internally and can be fetched with special methods. A job is as a container and control unit for tasks. Tasks are mapped in most cases to jobs of the underlying workload manager (like in Kubernetes, HPC schedulers etc.) or raw processes or containers.

The Run() method submits a new task and returns immediately, i.e. not waiting for the job to be started or finished. When the Run() method errors the job submission has failed. The Wait() method waits until the task has been finished. If multiple Run() methods are called in a chain, multiple tasks might be executed in parallel (depending on the backend). When the same task should be executed multiple times the RunArray() method might be convenient. When using a HPC workload manager using the LibDRMAA implementation it gets translated to an array job, which is used for submitting and running 10s of thousands of tasks in an HPC clusters (like for bioinformatics or for electronic design automation workloads). Each task gets an unique task number set as environment variable. This is used for accessing specific data sets.

The method RunMatrixT() allows to submit and run multiple tasks based on a job template with placeholders. Those placeholders get replaced with defined values before jobs get submitted. That allows to submit many tasks using different job templates in a convenient way (like for executing a range of commands in a set of different container images for testing).

In some systems it is required to delete job related resources after the job is finished and no more information needs to be queried about its execution. This functionality is implemented in the DRMAA2 Reap() method which can be executed by ReapAll() for each task in the job object. Afterwards the job object should not be used anymore as some information might not be available anymore. In a Kubernetes environment it removes the job objects and potentially related objects like configmaps.

Methods can be classified in blocking, non-blocking, job template based, function based, and error handlers.

Job Submission
Function Name Purpose Blocking Examples
Run() Starts a process, container, or submits a task and comes back immediately no
RunT() Like above but with a JobTemplate as parameter no
RunArray() Submits a bulk job which runs many iterations of the same command no
Resubmit() Submits a job n-times (Run().Run().Run()...) no
RunEvery() Submits a task every d time.Duration yes
RunEveryT() Like RunEvery() but with JobTemplate as param yes
RunMatrixT() Replaces placeholders in the job template and submits combinations no
Job Control
Function Name Purpose Blocking Examples
Suspend() Stops a task from execution (e.g. sending SIGTSTP to the process group)...
Resume() Continues a task (e.g. sending SIGCONT)...
Kill() Stops process (SIGKILL), container, task, job immediately.
Function Execution
Function Name Purpose Blocking Examples
Do() Executes a Go function yes
Then() Waits for end of process and executes a Go function yes
OnSuccess() Executes a function if the task run successfully (exit code 0) yes
OnFailure() Executes a function if the task failed (exit code != 0) yes
OnError() Executes a function if the task could not be created yes
ForAll(f, interface{}) Executes a user defined function on all tasks no
Blocker
Function Name Purpose Blocking Examples
After() Blocks a specific amount of time and continues yes
Wait() Waits until the task submitted latest finished yes
Synchronize() Waits until all submitted tasks finished yes
Output() Waits until the last submitted task is finished and returns the output as string yes Only for process, Docker, and K8s currently.
Job Flow Control
Function Name Purpose Blocking Examples
ThenRun() Wait() (last task finished) followed by an async Run() partially
ThenRunT() ThenRun() with template partially
OnSuccessRun() Wait() if Success() then Run() partially  
OnSuccessRunT() OnSuccessRun() but with template as param partially
OnFailureRun() Wait() if Failed() then Run() partially  
OnFailureRunT() OnFailureRun() but with template as param partially
Retry() wait() + !success() + resubmit() + wait() + !success() yes  
AnyFailed() Cchecks if one of the tasks in the job failed yes  
Job Status and General Checks
Function Name Purpose Blocking Examples
JobID() Returns the ID of the submitted job no  
JobInfo() Returns the DRMAA2 JobInfo of the job no  
Template() no  
State() no  
LastError() no  
Failed() no  
Success() no  
ExitStatus() no  
ReapAll() Cleans up all job related resources from the workload manager. Do not
use the job object afterwards. Calls DRMAA2 Reap() on all tasks. no  
ListAllFailed() Waits for all tasks and returns the failed tasks as DRMAA2 jobs yes
ListAll() Returns all tasks as a slice of DRMAA2 jobs no

JobTemplate

JobTemplates are specifying the details about a job. In the simplest case the job is specified by the application name and its arguments like it is typically done in the OS shell. In that case the Run() methods (ThenRun(), OnSuccessRun(), OnFailureRun()) can be used. Job template based methods (like RunT()) can be completely avoided by providing a default template when creating the context (...ByConfig()). Then each Run() inherits the settings (like JobCategory for the container image name and OutputPath for redirecting output to stdout). If more details for specifying the jobs are required the RunT() methods needs to be used. I'm using currently the DRMAA2 Go JobTemplate. In most cases only RemoteCommand, Args, WorkingDirectory, JobCategory, JobEnvironment, StageInFiles are evaluated. Functionality and semantic is up to the underlying drmaa2os job tracker.

The Template object provides helper functions for job templates and required as generators of job streams. For an example see here.

Examples

For examples please have a look into the examples directory. template is a canonical example of a pre-processing job, followed by parallel execution, followed by a post-processing job.

test is an use case for testing. It compiles all examples with the local go compiler and then within a Docker container using the golang:latest image and reports errors.

cloudfoundry demonstrates how a Cloud Foundry tasks can be created.

Singularity containers can also be created which is helpful when managing a simple Singularity wfl container workflow within a single HPC job either to fully exploit all resources and reduce the amount of HPC jobs.

Creating a Workflow which is Executed as OS Processes

The allocated context defines which workload management system / job execution backend is used.

    ctx := wfl.NewProcessContext()

Different contexts can be used within a single program. That way multi-clustering potentially over different cloud solutions is supported.

Using a context a workflow can be established.

    wfl.NewWorkflow(wfl.NewProcessContext())

Handling an error during workflow generation can be done by specifying a function which is only called in the case of an error.

    wfl.NewWorkflow(wfl.NewProcessContext()).OnError(func(e error) {
  panic(e)
 })

The workflow is used in order to instantiate the first job using the Run() method.

    wfl.NewWorkflow(wfl.NewProcessContext()).Run("sleep", "123")

But you can also create an initial job like that:

    job := wfl.NewJob(wfl.NewWorkflow(wfl.NewProcessContext()))

For more detailed settings (like resource limits) the DRMAA2 job template can be used as parameter for RunT().

Jobs allow the execution of workload as well as expressing dependencies.

    wfl.NewWorkflow(wfl.NewProcessContext()).Run("sleep", "2").ThenRun("sleep", "1").Wait()

The line above executes two OS processes sequentially and waits until the last job in chain is finished.

In the following example the two sleep processes are executed in parallel. Wait() only waits for the sleep 1 job. Hence sleep 2 still runs after the wait call comes back!

    wfl.NewWorkflow(wfl.NewProcessContext()).Run("sleep", "2").Run("sleep", "1").Wait()

Running two jobs in parallel and waiting until all jobs finished can be done with Synchronize().

    wfl.NewWorkflow(wfl.NewProcessContext()).Run("sleep", "2").Run("sleep", "1").Synchronize()

Jobs can also be suspended (stopped) and resumed (continued) - if supported by the execution backend (like OS, Docker).

    wf.Run("sleep", "1").After(time.Millisecond * 100).Suspend().After(time.Millisecond * 100).Resume().Wait()

The exit status is available as well. ExitStatus() blocks until the previously submitted job is finished.

    wfl.NewWorkflow(ctx).Run("echo", "hello").ExitStatus()

In order to run jobs depending on the exit status the OnFailure and OnSuccess methods can be used:

    wf.Run("false").OnFailureRun("true").OnSuccessRun("false")

For executing a function on a submission error OnError() can be used.

For running multiple jobs on a similar job template (like for test workflows) the RunMatrixT() can be used. It expects a JobTemplate with self-defined placeholders (can be any string). Those placeholders are getting replaced by the lists specified in the Replacements structs. Then any combination of the replacement lists are evaluated and new job templates are generated and submitted.

The following example submits and waits for 4 tasks:

  • sleep 0.1
  • echo 0.1
  • sleep 0.2
  • echo 0.2

If only a list of replacements is required then the second replacement can just left empty (wfl.Replacement{}). For JobTemplate fields with numbers the replacement strings are automatically converted to numbers.

job := flow.NewJob().RunMatrixT(
    drmaa2interface.JobTemplate{
     RemoteCommand: "{{cmd}}",
     Args:          []string{"{{arg}}"},
    },
    wfl.Replacement{
     Fields:       []wfl.JobTemplateField{{wfl.RemoteCommand},

     Pattern:      "{{cmd}}",
     Replacements: []string{"sleep", "echo"},
    },
    wfl.Replacement{
     Fields:       []wfl.JobTemplateField{{wfl.Args},

     Pattern:      "{{arg}}",
     Replacements: []string{"0.1", "0.2"},
    },
   )
job.Synchronize()

More methods can be found in the sources.

Basic Workflow Patterns

Sequence

The successor task runs after the completion of the predecessor task.

    flow := wfl.NewWorkflow(ctx)
    flow.Run("echo", "first task").ThenRun("echo", "second task")
    ...

or

    flow := wfl.NewWorkflow(ctx)
    job := flow.Run("echo", "first task")
    job.Wait()
    job.Run("echo", "second task")
    ...
Parallel Split

After completion of a task run multiple branches of tasks.


    flow := wfl.NewWorkflow(ctx)
    flow.Run("echo", "first task").Wait()

    notifier := wfl.NewNotifier()

    go func() {
        wfl.NewJob(wfl.NewWorkflow(ctx)).
            TagWith("BranchA").
            Run("sleep", "1").
            ThenRun("sleep", "3").
            Synchronize().
            Notify(notifier)
    }

    go func() {
        wfl.NewJob(wfl.NewWorkflow(ctx)).
            TagWith("BranchB").
            Run("sleep", "1").
            ThenRun("sleep", "3").
            Synchronize().
            Notify(notifier)
    }

    notifier.ReceiveJob()
    notifier.ReceiveJob()

    ...
Synchronization of Tasks

Wait until all tasks of a job which are running in parallel are finished.

    flow := wfl.NewWorkflow(ctx)
    flow.Run("echo", "first task").
        Run("echo", "second task").
        Run("echo", "third task").
        Synchronize()

Synchronization of Branches

Wait until all branches of a workflow are finished.


    notifier := wfl.NewNotifier()

    go func() {
        wfl.NewJob(wfl.NewWorkflow(ctx)).
            TagWith("BranchA").
            Run("sleep", "1").
            Wait().
   Notify(notifier)
    }

    go func() {
        wfl.NewJob(wfl.NewWorkflow(ctx)).
            TagWith("BranchB").
            Run("sleep", "1").
            Wait().
   Notify(notifier)
    }

    notifier.ReceiveJob()
    notifier.ReceiveJob()

    ...
Exclusive Choice
    flow := wfl.NewWorkflow(ctx)
    job := flow.Run("echo", "first task")
    job.Wait()

    if job.Success() {
        // do something
    } else {
        // do something else
    }
    ...
Fork Pattern

When a task is finished n tasks needs to be started in parallel.

    job := wfl.NewWorkflow(ctx).Run("echo", "first task").
        ThenRun("echo", "parallel task 1").
        Run("echo", "parallel task 2").
        Run("echo", "parallel task 3")
    ...

or

    flow := wfl.NewWorkflow(ctx)
    
    job := flow.Run("echo", "first task")
    job.Wait()
    for i := 1; i <= 3; i++ {
        job.Run("echo", fmt.Sprintf("parallel task %d", i))
    }
    ...

For missing functionality or bugs please open an issue on github. Contributions welcome!

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func TmpFile

func TmpFile() string

TmpFile returns a path to a tmp file in the tmp dir which does not exist yet.

Types

type Context

type Context struct {
	CtxCreationErr     error
	SM                 drmaa2interface.SessionManager
	SMType             SessionManagerType
	DefaultDockerImage string
	// DefaultTemplate contains all default settings for job submission
	// which are copied (if not set) to Run() or RunT() methods
	DefaultTemplate drmaa2interface.JobTemplate
	// ContextTaskID is a number which is incremented for each submitted
	// task. After incrementing and before submitting the task
	// all occurencies of the "{{.ID}}" string in the job template
	// are replaced by the current task ID. Following fields are
	// evaluated: OuputPath, ErrorPath. The workflow can be started
	// with an offset by setting the ContextTaskID to a value > 0.
	ContextTaskID int64
	// Mutext is used for protecting the ContextTaskID
	sync.Mutex
	// JobSessionName is set to "wfl" by default. It can be changed
	// to a custom name. The name is used to create a DRMAA2 session.
	JobSessionName string
}

Context contains a pointer to execution backend and configuration for it.

func DRMAA2SessionManagerContext

func DRMAA2SessionManagerContext(sm drmaa2interface.SessionManager) *Context

DRMAA2SessionManagerContext creates a new Context using any given DRMAA2 Session manager (implementing the drmaa2interface).

func ErrorTestContext

func ErrorTestContext() *Context

ErrorTestContext always returns an error.

func NewProcessContext

func NewProcessContext() *Context

NewProcessContext returns a new *Context which manages processes.

func NewProcessContextByCfg

func NewProcessContextByCfg(cfg ProcessConfig) *Context

NewProcessContextByCfg returns a new *Context which manages processes which is configured by the ProcessConfig.

func NewProcessContextByCfgWithInitParams added in v1.2.7

func NewProcessContextByCfgWithInitParams(cfg ProcessConfig, initParams simpletracker.SimpleTrackerInitParams) *Context

NewProcessContextByCfgWithInitParams returns a new *Context which manages processes which is configured by the ProcessConfig.

func NewRemoteContext added in v1.2.8

func NewRemoteContext(cfg RemoteConfig, initParams *client.ClientTrackerParams) *Context

NewRemoteContext creates a wfl Context for executing jobs through a remote connection. The details of the server must be provided in the initParams.

func NewSingularityContext added in v1.0.1

func NewSingularityContext() *Context

NewSingularityContext creates a new Context which allows to run the jobs in Singularity containers. It only works with JobTemplate based run methods (like RunT()) as it requires the JobCategory set to the the Singularity container image.

func NewSingularityContextByCfg added in v1.0.1

func NewSingularityContextByCfg(cfg SingularityConfig) *Context

NewSingularityContextByCfg creates a new Context which allows to run the jobs in Singularit containers. If the given SingularityConfig has set the DefaultImage to valid Singularity image then the Run() methods are using that container image. That image can be overridden by the RunT() method when setting the JobCategory.

func (*Context) Error

func (c *Context) Error() error

Error returns the error occurred during context creation.

func (*Context) GetNextContextTaskID added in v1.2.12

func (c *Context) GetNextContextTaskID() int64

func (*Context) HasError

func (c *Context) HasError() bool

HasError returns true if an error during context creation happened.

func (*Context) OnError

func (c *Context) OnError(f func(e error)) *Context

OnError executes a function when an error occurred during context creation with the error as parameter.

func (*Context) WithDefaultDockerImage added in v1.2.12

func (c *Context) WithDefaultDockerImage(image string) *Context

func (*Context) WithDefaultJobTemplate added in v1.2.12

func (c *Context) WithDefaultJobTemplate(t drmaa2interface.JobTemplate) *Context

func (*Context) WithSessionName added in v1.2.12

func (c *Context) WithSessionName(jobSessionName string) *Context

type Iterator

Iterator is a function which transforms a JobTemplate when called.

func NewEnvSequenceIterator

func NewEnvSequenceIterator(env string, start, incr int) Iterator

NewEnvSequenceIterator returns an iterator which increments the environment variable env each time when called.

func NewTimeIterator

func NewTimeIterator(d time.Duration) Iterator

NewTimeIterator returns a template iterator which return a job template every d time.

type Job

type Job struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Job defines methods for job life-cycle management. A job is always bound to a workflow which defines the context and job session (logical separation of jobs) of the underlying backend. The Job object allows to create an manage tasks.

func EmptyJob

func EmptyJob() *Job

EmptyJob creates an empty job.

func NewJob

func NewJob(wfl *Workflow) *Job

NewJob creates the initial empty job with the given workflow.

func (*Job) After

func (j *Job) After(d time.Duration) *Job

After blocks the given duration and continues by returning the same job.

func (*Job) AnyFailed

func (j *Job) AnyFailed() bool

AnyFailed returns true when at least job in the whole chain failed.

func (*Job) Do

func (j *Job) Do(f func(job drmaa2interface.Job)) *Job

Do executes a function which gets the DRMAA2 job object as parameter. This allows working with the low-level DRMAA2 job object. In case of an array job submit the function is called on each job in the job array.

func (*Job) Errored added in v1.0.1

func (j *Job) Errored() bool

Errored returns if an error occurred at the last operation.

func (*Job) ExitStatus

func (j *Job) ExitStatus() int

ExitStatus waits until the previously submitted task is finished and returns the exit status of the task. In case of an internal error it returns -1.

func (*Job) ForAll added in v1.2.9

func (j *Job) ForAll(f func(drmaa2interface.Job, interface{}) error, params interface{}) error

ForAll executes a user defined function for each task of the job. The function has an interface as input parameter which can be used to pass additional data into or out of the function as a result (like a pointer to a struct or pointer to an output slice).

func (*Job) HasAnyFailed

func (j *Job) HasAnyFailed() bool

HasAnyFailed returns true if there is any failed task in the chain. Note that the functions implicitly waits until all tasks finsihed.

func (*Job) JobID

func (j *Job) JobID() string

JobID returns the job ID of the previously submitted job.

func (*Job) JobInfo

func (j *Job) JobInfo() drmaa2interface.JobInfo

JobInfo returns information about the last task/job. Which values are actually set depends on the DRMAA2 implementation of the backend specified in the context. TODO job array support

func (*Job) JobInfos

func (j *Job) JobInfos() []drmaa2interface.JobInfo

JobInfos returns all JobInfo objects of all tasks/job run in the workflow. JobInfo contains run-time details of the jobs. The availability of the values depends on the underlying DRMAA2 implementation of the execution Context. TODO job array support

func (*Job) Kill

func (j *Job) Kill() *Job

Kill stops the job from execution.

func (*Job) LastError

func (j *Job) LastError() error

LastError returns the error if occurred during last job operation. Don't use LastError() to find the reason why a job was failing! Check exit code / stderr output etc.

func (*Job) ListAll added in v1.2.9

func (j *Job) ListAll() []drmaa2interface.Job

ListAll returns all tasks as slice of DRMAA2 jobs. If there is no task the function returns an empty slice.

func (*Job) ListAllFailed

func (j *Job) ListAllFailed() []drmaa2interface.Job

ListAllFailed returns all tasks which failed as array of DRMAA2 jobs. Note that it implicitly blocks and waits until all tasks are finished.

func (*Job) Notify

func (j *Job) Notify(n *Notifier) *Job

Notify send the job to a notifier.

func (*Job) Observe

func (j *Job) Observe(o Observer) *Job

Observe executes the functions defined in the Observer when task submission errors, the task failed, and when the job finished successfully. Note that this is a blocking call.

func (*Job) OnError

func (j *Job) OnError(f func(err error)) *Job

OnError executes the given function if the last Job operation resulted in an error (like a job submission failure).

func (*Job) OnFailure

func (j *Job) OnFailure(f func(job drmaa2interface.Job)) *Job

OnFailure executes the given function when the previous task in the list failed. Fails mean the job was started successfully by the system but then existed with an exit code != 0.

When running the task resulted in an error (i.e. the job run function errored), then the function is not executed.

func (*Job) OnFailureRun

func (j *Job) OnFailureRun(cmd string, args ...string) *Job

OnFailureRun submits a task when the previous task ended in a state different than drmaa2interface.Done.

func (*Job) OnFailureRunT

func (j *Job) OnFailureRunT(jt drmaa2interface.JobTemplate) *Job

OnFailureRunT submits a task when the previous job ended in a state different than drmaa2interface.Done.

func (*Job) OnSuccess

func (j *Job) OnSuccess(f func(job drmaa2interface.Job)) *Job

OnSuccess executes the given function after the previously submitted task finished in the drmaa2interface.Done state.

func (*Job) OnSuccessRun

func (j *Job) OnSuccessRun(cmd string, args ...string) *Job

OnSuccessRun submits a task when the previous task ended in the state drmaa2interface.Done.

func (*Job) OnSuccessRunT

func (j *Job) OnSuccessRunT(jt drmaa2interface.JobTemplate) *Job

OnSuccessRunT submits a task when the previous task ended in the state drmaa2interface.Done.

func (*Job) Output added in v1.2.12

func (j *Job) Output() string

Output returns the output of the last task if it is in an end state. In case of OS process backend the output is read from the file specified in the JobTemplate.OutputPath. In case of other backends the output is not available.

func (*Job) ReapAll added in v1.2.3

func (j *Job) ReapAll() *Job

ReapAll removes all job resources from the workload manager. It calls the DRMAA2 Reap() method for all tasks. The behavior is backend specific. After the ReapAll() call the job object should not be used anymore. Reap() must be called only when all tasks are in a terminated state.

func (*Job) Resubmit

func (j *Job) Resubmit(r int) *Job

Resubmit starts the previously submitted task n-times. All tasks are executed in parallel.

func (*Job) Resume

func (j *Job) Resume() *Job

Resume continues a suspended job to continue execution.

func (*Job) Retry

func (j *Job) Retry(r int) *Job

Retry waits until the last task in chain (not for the previous ones) is finished. When it failed it resubmits it and waits again for a successful end.

func (*Job) RetryAnyFailed

func (j *Job) RetryAnyFailed(amount int) *Job

RetryAnyFailed reruns any failed tasks and replaces them with a new task incarnation.

func (*Job) Run

func (j *Job) Run(cmd string, args ...string) *Job

Run submits a task which executes the given command and args. The command needs to be available on the execution backend.

func (*Job) RunArray added in v1.2.1

func (j *Job) RunArray(begin, end, step, maxParallel int, cmd string, args ...string) *Job

RunArray executes the given command multiple times. If begin is set to 1 end to 10, and step to 1, it executes the command 10 times. Each job run gets a different internal array job task ID environment variable set which depends on the backend. The maxParallel parameter is respected only by some backends. It restricts the parallel execution to that amount of commands at any given time. If set to 1 it forces sequential execution. If not required it should be set to the total amount of tasks specified.

func (*Job) RunArrayT added in v1.2.6

func (j *Job) RunArrayT(begin, end, step, maxParallel int, jt drmaa2interface.JobTemplate) *Job

RunArrayT executes the job defined in a JobTemplate multiple times. See also RunArray().

func (*Job) RunEvery

func (j *Job) RunEvery(d time.Duration, end time.Time, cmd string, args ...string) error

RunEvery provides the same functionally like RunEveryT but the job is created based on the given command with the arguments.

func (*Job) RunEveryT

func (j *Job) RunEveryT(d time.Duration, end time.Time, jt drmaa2interface.JobTemplate) error

RunEveryT submits a job every d time.Duration regardless if the previously job is still running or finished or failed. The method only aborts and returns an error if an error during job submission happened and the job could not be submitted.

func (*Job) RunMatrixT added in v1.2.9

func (j *Job) RunMatrixT(jt drmaa2interface.JobTemplate, x, y Replacement) *Job

RunMatrixT executes the job defined in a JobTemplate exactly len(x.Replacement)*len(y.Replacement) times. It generates a new job template for each combination of replacements executed from x and y on the given JobTemplate.

Example: Submit two different commands (sleep 1 and sleep 2) in two different container images, having 4 jobs in total submitted.

j.RunMatrixT(drmaa2interface.JobTemplate{
		JobCategory: "{{image}}",
		RemoteCommand: "sleep",
		Args: []string{"{{arg}}",
	}, wfl.Replacement{
		Fields: []string{"JobCategory"},
		Pattern: "{{image}}",
		Replacements: []string{"busybox:latest", "golang:latest"},
	}, wfl.Replacement{
		Fields: []string{"Args"},
		Pattern: "{{arg}}",
		Replacements: []string{"1", "2"},
	}).WaitAll()

func (*Job) RunT

func (j *Job) RunT(t drmaa2interface.JobTemplate) *Job

RunT submits a task given specified with the JobTemplate.

func (*Job) State

func (j *Job) State() drmaa2interface.JobState

State returns the current state of the job previously submitted.

func (*Job) Success

func (j *Job) Success() bool

Success returns true in case the current task stated equals drmaa2interface.Done and the job exit status is 0.

func (*Job) Suspend

func (j *Job) Suspend() *Job

Suspend stops the last task of the job from execution. How this is done depends on the Context. Typically a signal (like SIGTSTP) is sent to the tasks of the job.

func (*Job) Synchronize

func (j *Job) Synchronize() *Job

Synchronize waits until the tasks of the job are finished. All jobs are terminated when the call returns.

func (*Job) Tag

func (j *Job) Tag() string

Tag returns the tag of the job.

func (*Job) TagWith

func (j *Job) TagWith(tag string) *Job

TagWith tags a job with a string for identification. Global for all tasks of the job.

func (*Job) Template

func (j *Job) Template() *drmaa2interface.JobTemplate

Template returns the JobTemplate of the previous job submission.

func (*Job) Then

func (j *Job) Then(f func(job drmaa2interface.Job)) *Job

Then waits until the previous task is terminated and executes the given function by providing the DRMAA2 job interface which gives access to the low-level DRMAA2 job methods.

func (*Job) ThenRun

func (j *Job) ThenRun(cmd string, args ...string) *Job

ThenRun waits until the previous task is terminated and then executes the given command as new task.

func (*Job) ThenRunArray added in v1.2.1

func (j *Job) ThenRunArray(begin, end, step, maxParallel int, cmd string, args ...string) *Job

ThenRunArray waits until the previous task is terminated and then executes a new task based on the given JobTemplate.

func (*Job) ThenRunT

func (j *Job) ThenRunT(jt drmaa2interface.JobTemplate) *Job

ThenRunT waits until the previous task is terminated and then executes a new task based on the given JobTemplate.

func (*Job) Wait

func (j *Job) Wait() *Job

Wait until the most recent task is finished. In case of a job array it waits for all tasks of the array.

type JobTemplateField added in v1.2.9

type JobTemplateField string
const (
	RemoteCommand     JobTemplateField = "RemoteCommand"
	Args              JobTemplateField = "Args"
	SubmitAsHold      JobTemplateField = "SubmitAsHold"
	ReRunnable        JobTemplateField = "ReRunnable"
	JobEnvironment    JobTemplateField = "JobEnvironment"
	WorkingDirectory  JobTemplateField = "WorkingDirectory"
	JobCategory       JobTemplateField = "JobCategory"
	Email             JobTemplateField = "Email"
	EmailOnStarted    JobTemplateField = "EmailOnStarted"
	EmailOnTerminated JobTemplateField = "EmailOnTerminated"
	JobName           JobTemplateField = "JobName"
	InputPath         JobTemplateField = "InputPath"
	OutputPath        JobTemplateField = "OutputPath"
	ErrorPath         JobTemplateField = "ErrorPath"
	JoinFiles         JobTemplateField = "JoinFiles"
	ReservationID     JobTemplateField = "ReservationID"
	QueueName         JobTemplateField = "QueueName"
	MinSlots          JobTemplateField = "MinSlots"
	MaxSlots          JobTemplateField = "MaxSlots"
	Priority          JobTemplateField = "Priority"
	CandidateMachines JobTemplateField = "CandidateMachines"
	MinPhysMemory     JobTemplateField = "MinPhysMemory"
	MachineOS         JobTemplateField = "MachineOS"
	MachineArch       JobTemplateField = "MachineArch"
	StartTime         JobTemplateField = "StartTime"
	DeadlineTime      JobTemplateField = "DeadlineTime"
	StageInFiles      JobTemplateField = "StageInFiles"
	StageOutFiles     JobTemplateField = "StageOutFiles"
	ResourceLimits    JobTemplateField = "ResourceLimits"
	AccountingID      JobTemplateField = "AccountingID"
)

type Notifier

type Notifier struct {
	// contains filtered or unexported fields
}

func NewNotifier

func NewNotifier() *Notifier

NewNotifier creates a job notifier which allows to synchronize multiple job workflows executed concurrently in go functions. Note that there is an internal buffer of 1024 jobs which causes SendJob() to block if the buffer is full.

func (*Notifier) Destroy

func (n *Notifier) Destroy()

Destroy closes the job channel inside the notfier.

func (*Notifier) ReceiveJob

func (n *Notifier) ReceiveJob() *Job

ReceiveJob returns a job sent to the notifier.

func (*Notifier) SendJob

func (n *Notifier) SendJob(job *Job)

SendJob sends a job to the notifier.

type Observer

type Observer struct {
	ErrorHandler   func(error)
	FailedHandler  func(drmaa2interface.Job)
	SuccessHandler func(drmaa2interface.Job)
}

Observer is a collection of functions which implements behavior which should be executed when a task submission failed, when the task failed, or when then the job was running successfully.

func NewDefaultObserver

func NewDefaultObserver() Observer

NewDefaultObserver returns an Observer which panics when a task submission error occurred, prints a message and exits the application when the task exits with error code != 0, and prints a message and continues when a task was running successfully.

type ProcessConfig

type ProcessConfig struct {
	// DBFile is the local file which contains the internal state DB.
	DBFile string
	// DefaultTemplate contains the default job submission settings if
	// not overridden by the RunT() like methods.
	DefaultTemplate drmaa2interface.JobTemplate
	// PersistentJobStorage keeps job state on disk. This slows down
	// job submission but prevents waiting forever for processes which
	// disappeared
	PersistentJobStorage bool
	// JobDBFile is used when PersistentJobStorage is set to true. It must
	// be different from DBFile.
	JobDBFile string
}

ProcessConfig contains the configuration for the process context.

type RemoteConfig added in v1.2.8

type RemoteConfig struct {
	LocalDBFile string // job session DB file
	// DefaultTemplate contains the default job submission settings if
	// not overridden by the RunT() like methods.
	DefaultTemplate drmaa2interface.JobTemplate
}

type Replacement added in v1.2.9

type Replacement struct {
	// Fields are JobTemplate field names which are evaluated for
	// the pattern to get replaced. Special fields are:
	// - allStrings - all fields which are strings, string slices,
	// or string maps are going to be searched for the pattern
	// which is then replaced by one of the replacements.
	Fields []JobTemplateField
	// Pattern defines a string in the job template which is going to be
	// replaced by the value of the replacement string.
	Pattern string
	// Replacements defines all values the Pattern is going to be replaced
	// in the job template. For each replacement a new job template is
	// created and submitted.
	Replacements []string
}

Replacement defines the fields and the values to be replaced in the workflow JobTemplate. The len(Replacement) defines how many job templates are generated for this replacement instruction.

type SessionManagerType added in v1.2.12

type SessionManagerType int
const (
	// DefaultSessionManager handles jobs as processes
	DefaultSessionManager SessionManagerType = iota
	// DockerSessionManager manages Docker containers
	DockerSessionManager
	// CloudFoundrySessionManager manages Cloud Foundry application tasks
	CloudFoundrySessionManager
	// KubernetesSessionManager creates Kubernetes jobs
	KubernetesSessionManager
	// SingularitySessionManager manages Singularity containers
	SingularitySessionManager
	// SlurmSessionManager manages slurm jobs as cli commands
	SlurmSessionManager
	// LibDRMAASessionManager manages jobs through libdrmaa.so
	LibDRMAASessionManager
	// PodmanSessionManager manages jobs as podman containers either locally or remote
	PodmanSessionManager
	// RemoteSessionManager manages jobs over the network through a remote server
	RemoteSessionManager
	// ExternalSessionManager can be used by external JobTracker implementations
	// during development time before they get added here
	ExternalSessionManager
	// GoogleBatchSessionManager manages Google Cloud Batch jobs
	GoogleBatchSessionManager
	// MPIOperatorSessionManager manages jobs as MPI operator jobs on Kubernetes
	MPIOperatorSessionManager
)

type SingularityConfig added in v1.0.1

type SingularityConfig struct {
	DefaultImage    string
	DBFile          string
	DefaultTemplate drmaa2interface.JobTemplate
}

SingularityConfig contains the default settings for the Singularity containers.

type Template

type Template struct {
	Jt drmaa2interface.JobTemplate
	// contains filtered or unexported fields
}

Template is a higher level job template for simplifying creating dynamically JobTemplates.

func NewTemplate

func NewTemplate(jt drmaa2interface.JobTemplate) *Template

NewTemplate creates a Template out of a drmaa2interface.JobTemplate

func (*Template) AddIterator

func (t *Template) AddIterator(name string, itr Iterator) *Template

AddIterator registers an interation function which transforms the internal JobTemplate into another JobTemplate. The function is called each time when Next() is called. Multiple Iterators can be registered. The execution order or the Iterators is undefined and does not depend on the registration order.

func (*Template) AddMap

func (t *Template) AddMap(name string, f Iterator) *Template

AddMap registers a mapping function (same as Iterator) which converts the underlying DRMAA2 JobTemplate into a specific form. In difference to the iterator functions it does not make any persistent changes to the job template. Its intention is to cover the differencens required in the job template so that a job can run on different backends.

func (*Template) MapTo

func (t *Template) MapTo(system string) drmaa2interface.JobTemplate

MapTo transforms the JobTemplate and returns it. It does not make changes to the underlying Template.

func (*Template) Next

Next applies all registered Iterators to the internal job template and returns the next version of the job template.

func (*Template) NextMap

func (t *Template) NextMap(name string) drmaa2interface.JobTemplate

NextMap applies all registered Iterators to the internal job template and finally does a temporary mapping of the job template with the mapping function specified.

type Workflow

type Workflow struct {
	// contains filtered or unexported fields
}

Workflow contains the backend context and a job session. The DRMAA2 job session provides typically logical isolation between jobs.

func NewWorkflow

func NewWorkflow(context *Context) *Workflow

NewWorkflow creates a new Workflow based on the given execution context. Internally it creates a DRMAA2 JobSession which is used for separating jobs.

func (*Workflow) Error

func (w *Workflow) Error() error

Error returns the error if happened during creating a job session or opening a job session.

func (*Workflow) HasError

func (w *Workflow) HasError() bool

HasError returns true if there was an error during creating a job session or opening a job session.

func (*Workflow) ListJobs added in v1.2.4

func (w *Workflow) ListJobs() []*Job

ListJobs returns all jobs visible in the workflow (i.e. available in the underlying drmaa2session). It may wrap one task in one Job object and return multiple Job objects even when only one Job with many tasks was submitted.

func (*Workflow) Logger added in v1.0.1

func (w *Workflow) Logger() log.Logger

Logger return the current logger of the workflow.

func (*Workflow) NewJob added in v1.2.9

func (w *Workflow) NewJob() *Job

NewJob creates a new empty Job object for the given workflow. Equivalent to NewJob(*Workflow).

func (*Workflow) OnError

func (w *Workflow) OnError(f func(e error)) *Workflow

OnError executes a function if happened during creating a job session or opening a job session.

func (*Workflow) Run

func (w *Workflow) Run(cmd string, args ...string) *Job

Run submits the first task in the workflow and returns the Job object. Same as NewJob(w).Run().

func (*Workflow) RunArrayJob added in v1.2.1

func (w *Workflow) RunArrayJob(begin, end, step, maxParallel int, cmd string, args ...string) *Job

RunArrayJob executes the given command multiple times as specified with begin, end, and step. To run a command 10 times, begin can be set to 1, end to 10 and step to 1. maxParallel can limit the amount of executions which are running in parallel if supported by the context. The process context sets the TASK_ID env variable to the task ID.

func (*Workflow) RunArrayJobT added in v1.2.6

func (w *Workflow) RunArrayJobT(begin, end, step, maxParallel int, jt drmaa2interface.JobTemplate) *Job

RunArrayJob executes the given job defined in the JobTemplate multiple times. See RunArrayJob().

func (*Workflow) RunMatrixT added in v1.2.9

func (w *Workflow) RunMatrixT(jt drmaa2interface.JobTemplate, x, y Replacement) *Job

func (*Workflow) RunT

func (w *Workflow) RunT(jt drmaa2interface.JobTemplate) *Job

RunT submits the first task in the workflow and returns the Job object. Same as NewJob(w).RunT().

func (*Workflow) SetLogLevel added in v1.2.9

func (w *Workflow) SetLogLevel(logLevel log.LogLevel) *Workflow

SetLogLevel changes the log level filter for the workflow. The default log level is log.Warning.

func (*Workflow) SetLogger added in v1.0.1

func (w *Workflow) SetLogger(log log.Logger) *Workflow

SetLogger sets a new logger for the workflow which writes processes internal log messages. Note that nil loggers are not accepted.

Example: w.SetLogger(log.NewKlogLogger("INFO"))

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL