package core

v0.10.0
Published: Nov 1, 2018 License: GPL-3.0 Imports: 20 Imported by: 4

Documentation

Index

Constants

const (

	// HotsubHostRoot is a mount point mounted by containers on this host machine.
	HotsubHostRoot = "/tmp"

	// HotsubContainerRoot is a root directory inside each container.
	// This path is passed as "HOTSUB_ROOT" to workflows,
	// though hopefully you don't need to refer to "HOTSUB_ROOT" inside your workflow.
	HotsubContainerRoot = "/tmp"

	// HotsubSharedDirectoryPath is a path-prefix from ContainerRoot,
	// in which all the shared data are located.
	HotsubSharedDirectoryPath = "__shared"

	// HotsubSharedInstanceMountPoint is a mount point mounted by containers on computing instances.
	HotsubSharedInstanceMountPoint = "/tmp"

	// CreateMaxRetry represents max count for retrying `docker-machine create`
	CreateMaxRetry = 6

	// ContainerMaxRetry represents max count for retrying operations inside docker containers,
	// such as "pull image" and "exec create".
	// See https://github.com/otiai10/daap/commit/8b5dfbd93d169c0ae30ce30ea23f81e97f009f7f
	// for more information.
	ContainerMaxRetry = 4
)
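
A minimal sketch of how these constants can be combined (assuming the package imports as github.com/otiai10/hotsub/core; adjust to the actual module path):

package main

import (
	"fmt"
	"path/filepath"

	"github.com/otiai10/hotsub/core" // assumed import path
)

func main() {
	// The shared data directory visible inside each container is presumably
	// the shared-directory prefix joined onto the container root.
	shared := filepath.Join(core.HotsubContainerRoot, core.HotsubSharedDirectoryPath)
	fmt.Println(shared) // /tmp/__shared
}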

Variables

This section is empty.

Functions

This section is empty.

Types

type Component

type Component struct {

	// Identity specifies the unique identity of this component.
	Identity Identity

	// Jobs represents the specific set of jobs which should be executed on this component.
	Jobs []*Job

	// CommonParameters represents the common env (and TODO: input, output) for all the workflow containers.
	CommonParameters *Parameters

	// SharedData ...
	SharedData *SharedData

	// Machine represents the spec of machines on which each job is executed.
	Machine *Machine

	Runtime struct {
		Image  *Image
		Script *Script
	}

	// JobLoggerFactory is an interface to specify logger for each job.
	JobLoggerFactory LoggerFactory

	// Concurrency for creating machines.
	// We estimate that creating machines is the most costly process in the job lifecycle,
	// so this "Concurrency" field throttles how many machines are created at once.
	Concurrency int64
}

Component represents an independent workflow component, handling only one input set.

func RootComponentTemplate

func RootComponentTemplate(name string) *Component

RootComponentTemplate ...

func (*Component) Destroy

func (component *Component) Destroy() error

Destroy ...

func (*Component) Prepare

func (component *Component) Prepare() error

Prepare ...

func (*Component) Run

func (component *Component) Run(ctx context.Context) error

Run executes all the jobs recursively. The concurrency of creating machines is managed here.
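
A minimal usage sketch of the Component lifecycle, under the assumption that RootComponentTemplate fills in sensible defaults and the caller sets the remaining fields (import path and field values are illustrative):

package main

import (
	"context"
	"log"

	"github.com/otiai10/hotsub/core" // assumed import path
)

func main() {
	component := core.RootComponentTemplate("example")
	component.Concurrency = 8 // throttle machine creation

	if err := component.Prepare(); err != nil {
		log.Fatal(err)
	}
	defer component.Destroy()

	if err := component.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}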

type Driver

type Driver struct {
	// contains filtered or unexported fields
}

Driver represents Job Driver.

func (*Driver) Attach

func (d *Driver) Attach() error

Attach [lifecycle] mounts onto the SharedDataInstance.

func (*Driver) Create

func (d *Driver) Create() error

Create [lifecycle] creates an instance.

func (*Driver) Destroy

func (d *Driver) Destroy() error

Destroy [lifecycle] deletes this instance.

func (*Driver) Exec

func (d *Driver) Exec() error

Exec [lifecycle] executes the user-defined process specified with Image and Script.

func (*Driver) Fetch

func (d *Driver) Fetch() error

Fetch [lifecycle] downloads input files and translates URLs to local file paths.

func (*Driver) Init

func (d *Driver) Init() error

Init [lifecycle] initializes containers inside the instance.

func (*Driver) Push

func (d *Driver) Push() error

Push [lifecycle] uploads output files to the specified URL.
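
The exported API does not show how a *Driver is obtained, but the method set suggests an ordering that mirrors the Lifecycle constants below. The helper here is a hypothetical sketch, not the package's own sequencing (it assumes the import path github.com/otiai10/hotsub/core):

package example

import "github.com/otiai10/hotsub/core" // assumed import path

// runLifecycle illustrates one plausible ordering of the Driver lifecycle methods.
func runLifecycle(d *core.Driver) error {
	steps := []func() error{
		d.Create, // create the instance
		d.Attach, // mount onto the SharedDataInstance
		d.Init,   // initialize containers inside the instance
		d.Fetch,  // download inputs and translate URLs to local paths
		d.Exec,   // run the user-defined Image/Script
		d.Push,   // upload outputs
	}
	for _, step := range steps {
		if err := step(); err != nil {
			d.Destroy() // best-effort cleanup so the instance does not leak
			return err
		}
	}
	return d.Destroy()
}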

type Env

type Env struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}

Env represents an environment variable.

func (Env) Pair

func (env Env) Pair() string

Pair returns key=value pair string. TODO: better name
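
For example (the exact output is whatever Pair produces; "NAME=VALUE" is the natural reading of "key=value pair"):

package main

import (
	"fmt"

	"github.com/otiai10/hotsub/core" // assumed import path
)

func main() {
	env := core.Env{Name: "SAMPLE_ID", Value: "S001"}
	fmt.Println(env.Pair()) // presumably "SAMPLE_ID=S001"
}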

type Identity

type Identity struct {
	Timestamp int64
	Name      string
	Prefix    string
	Index     int
}

Identity ...

func NewIdentity

func NewIdentity(prefix string) Identity

NewIdentity ...

type Image

type Image struct {

	// Name is the container image name with format `IMAGE[:TAG|@DIGEST]`.
	// IMAGE is described as "[registry/]owner/name".
	Name string

	// Executable indicates whether this image has its own ENTRYPOINT or CMD
	// and doesn't need to Exec any additional script.
	Executable bool
}

Image ...

type Include added in v0.4.0

type Include struct {

	// Because the `Include` model represents a client-local path,
	// the `URL` of `Include` does NOT have any meaning.
	Resource `json:",inline"`

	// LocalPath represents a file path in the client machine,
	// where the `hotsub` command is issued.
	// This file is copied to the VM with `docker cp` and translated to `DeployedPath`.
	LocalPath string
}

Include represents a file which should be included and transferred to the VM.

type Includes added in v0.4.0

type Includes []*Include

Includes represents local files which should be transferred to the VM.

type Input

type Input struct {
	Resource `json:",inline"`
}

Input represents an input specified as a cell of the tasks file.

type Inputs

type Inputs []*Input

Inputs represents the given input set.

type Job

type Job struct {

	// Identity specifies the identity of this job.
	Identity Identity

	// Parameters specifies the parameters assigned to this job.
	// It is exactly what the corresponding row in tasks file is parsed to.
	Parameters *Parameters

	// Container specifies the settings used by the real execution runtime.
	Container *JobContainer

	Machine struct {
		Spec     *dkmachine.CreateOptions
		Instance *dkmachine.Machine
	}

	// Report ...
	Report *Report

	// Type represents the type of workflows,
	// MUST be either of ['Script','CWL']
	Type JobType
}

Job represents an input/output/env set specified as an independent row of the tasks file.

func NewJob

func NewJob(index int, prefix string) *Job

NewJob ...

func (*Job) Commit

func (job *Job) Commit() error

Commit runs the main process of this job, which consists of Fetch, Exec, and Push.

func (*Job) Construct

func (job *Job) Construct(shared *SharedData) (err error)

Construct creates containers inside job instance.

func (*Job) Create

func (job *Job) Create() error

Create creates the physical machine and wakes the required containers up. In most cases, the hotsub/routine container and a container for the user-defined image are required.

func (*Job) Destroy

func (job *Job) Destroy() error

Destroy ...

func (*Job) Exec

func (job *Job) Exec() error

Exec executes user-defined script inside the workflow container.

func (*Job) Fetch

func (job *Job) Fetch() error

Fetch downloads resources from cloud storage services, localizes those URLs to local paths, and additionally ensures that the output directories exist.

func (*Job) Lifetime

func (job *Job) Lifetime(lifecycle Lifecycle, format string, v ...interface{})

Lifetime ...

func (*Job) Push

func (job *Job) Push() error

Push uploads result files to cloud storage services according to the "output" URLs specified in the tasks file.

func (*Job) Run

func (job *Job) Run(ctx context.Context, shared *SharedData, sem *semaphore.Weighted) error

Run ... sem is a semaphore that limits run concurrency.
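
A sketch of driving multiple jobs with a shared concurrency limit, roughly what Component.Run is documented to do; jobs and shared are assumed to be prepared elsewhere, and errgroup is used here only for brevity:

package example

import (
	"context"

	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"

	"github.com/otiai10/hotsub/core" // assumed import path
)

func runJobs(ctx context.Context, jobs []*core.Job, shared *core.SharedData, limit int64) error {
	sem := semaphore.NewWeighted(limit) // presumably acquired inside (*Job).Run
	eg, ctx := errgroup.WithContext(ctx)
	for _, job := range jobs {
		job := job // capture range variable
		eg.Go(func() error {
			return job.Run(ctx, shared, sem)
		})
	}
	return eg.Wait()
}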

func (*Job) Stdio

func (job *Job) Stdio(streamtype daap.HijackedStreamType, lifecycle Lifecycle, text string)

Stdio logs stdout/stderr.

type JobContainer

type JobContainer struct {
	// Envs should have everything translated from Parameters.
	Envs   []Env
	Image  *Image
	Script *Script

	// container ...
	Routine  *daap.Container
	Workflow *daap.Container
}

JobContainer ...

type JobType added in v0.4.0

type JobType string

JobType represents the type of the workflow.

const (

	// ShellScriptJob ...
	// Recommended
	ShellScriptJob JobType = "Script"

	// CommonWorkflowLanguageJob ...
	// See https://github.com/common-workflow-language/common-workflow-language
	CommonWorkflowLanguageJob JobType = "CWL"

	// WorkflowDescriptionLanguageJob ...
	// See https://github.com/openwdl/wdl
	WorkflowDescriptionLanguageJob JobType = "WDL"
)

type Lifecycle

type Lifecycle string

Lifecycle represents the lifecycles of Component and Job.

const (
	// CREATE is creating computing instances, physically.
	CREATE Lifecycle = "CREATE"
	// CONSTRUCT is creating containers inside the instances.
	CONSTRUCT Lifecycle = "CONSTRUCT"
	// FETCH is downloading specified input files from remote storage service.
	FETCH Lifecycle = "FETCH"
	// EXECUTE is executing the specified script inside user-workflow container.
	EXECUTE Lifecycle = "EXECUTE"
	// PUSH is uploading the result files to remote storage service.
	PUSH Lifecycle = "PUSH"
	// DESTROY is deleting the physical instances which are no longer used.
	DESTROY Lifecycle = "DESTROY"
)
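
These constants pair naturally with (*Job).Lifetime above; a hypothetical logging call might look like this (message text is illustrative):

package example

import "github.com/otiai10/hotsub/core" // assumed import path

func announce(job *core.Job) {
	// Report progress for the FETCH phase of this job.
	job.Lifetime(core.FETCH, "downloading %d input files", len(job.Parameters.Inputs))
}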

type Logger

type Logger interface {
	Lifetime(string, string, ...interface{})
	Stdio(int, string, string)
	Close() error
}

Logger is an interface for log writers.

type LoggerFactory

type LoggerFactory interface {
	Logger(*Job) (Logger, error)
}

LoggerFactory can generate a Logger struct corresponding to the specified Job.
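
A minimal sketch of a custom factory that writes to standard output; the meaning of the Lifetime and Stdio arguments is inferred from the (*Job).Lifetime and (*Job).Stdio signatures above, and the import path is assumed:

package example

import (
	"fmt"

	"github.com/otiai10/hotsub/core" // assumed import path
)

type stdoutLogger struct{ prefix string }

func (l *stdoutLogger) Lifetime(lifecycle, format string, v ...interface{}) {
	fmt.Printf("[%s][%s] "+format+"\n", append([]interface{}{l.prefix, lifecycle}, v...)...)
}

func (l *stdoutLogger) Stdio(streamtype int, lifecycle, text string) {
	fmt.Printf("[%s][%s][stream %d] %s\n", l.prefix, lifecycle, streamtype, text)
}

func (l *stdoutLogger) Close() error { return nil }

type stdoutLoggerFactory struct{}

func (stdoutLoggerFactory) Logger(job *core.Job) (core.Logger, error) {
	return &stdoutLogger{prefix: job.Identity.Name}, nil
}

A stdoutLoggerFactory value can then be assigned to Component.JobLoggerFactory.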

type Machine

type Machine struct {

	// {{{ TODO: Not used so far
	// Provider specifies machine provider, either of [ec2] ~~[gce, k8s, local]~~
	Provider string
	// CPU specifies how many CPU cores are required for the "Job"
	CPU int
	// Memory specifies how much memory is required (in GB) for the "Job"
	Memory string

	// Spec represent options to create instance.
	Spec *dkmachine.CreateOptions
}

Machine ...

func (*Machine) Instantiate

func (m *Machine) Instantiate() (*dkmachine.Machine, error)

Instantiate ...

type Output

type Output struct {
	Resource `json:",inline"`
}

Output represents an output specified as a cell of the tasks file.

type Outputs

type Outputs []*Output

Outputs represents the given output set.

type Parameters

type Parameters struct {
	Inputs  Inputs
	Outputs Outputs
	Envs    []Env

	// Includes represents **local** files
	// which should be transferred to the VM from the local machine.
	Includes Includes
}

Parameters specifies the parameters assigned to this job. It is exactly what the corresponding row in tasks file is parsed to.
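
An illustrative Parameters value, roughly what a single row of a tasks file might be parsed into (all names and URLs are made up; the import path is assumed):

package example

import "github.com/otiai10/hotsub/core" // assumed import path

func exampleParameters() *core.Parameters {
	return &core.Parameters{
		Inputs: core.Inputs{
			{Resource: core.Resource{Name: "reference", URL: "s3://example-bucket/ref.fa"}},
		},
		Outputs: core.Outputs{
			{Resource: core.Resource{Name: "result", URL: "s3://example-bucket/out/", Recursive: true}},
		},
		Envs: []core.Env{{Name: "THREADS", Value: "4"}},
	}
}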

type Report

type Report struct {
	Log     Logger
	Metrics struct {
		Writer io.Writer
	}
}

Report ...

type Resource

type Resource struct {
	// Name is a name label for this resource
	Name string `json:"name"`

	// Recursive specifies whether this resource is a directory or just a file.
	Recursive bool `json:"recursive"  yaml:"recursive"`

	// URL is (in most cases) a resource location on the Internet,
	// for example s3://... or gs://... .
	// The output location specified by this URL is translated to a
	// local file path (== DeployedPath) on the computing node, and pushed back to this URL after the job.
	URL string `json:"url"        yaml:"url"`
	// DeployedPath is a path (in VM) which is translated from URL.
	DeployedPath string `json:"deployed_path" yaml:"deployed_path"`
}

Resource represents a common struct for both Input and Output.

func (*Resource) Env

func (resource *Resource) Env() Env

Env ...

func (*Resource) EnvForFetch

func (resource *Resource) EnvForFetch() []string

EnvForFetch ...

func (*Resource) Localize

func (resource *Resource) Localize(rootdir string) error

Localize converts the given resource URL to a local file path inside the container.
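
A sketch of the apparent flow: the URL is mapped to a path under the given root directory and recorded in DeployedPath (the exact mapping is internal to the package; the import path and URL are assumptions):

package main

import (
	"fmt"
	"log"

	"github.com/otiai10/hotsub/core" // assumed import path
)

func main() {
	res := &core.Resource{Name: "sample", URL: "s3://example-bucket/data/sample.bam"}
	if err := res.Localize(core.HotsubContainerRoot); err != nil {
		log.Fatal(err)
	}
	fmt.Println(res.DeployedPath) // a local path under /tmp derived from the URL
}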

type Script

type Script struct {

	// Path is a file path to the script which should be executed on the container.
	Path string

	// Inline is an inline command which should be executed on the container.
	// If "Path" is specified, Inline is ignored.
	Inline []string
}

Script specifies what to do inside the container. If Image.Executable is true, the whole "Script" is ignored.
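
An illustrative Runtime setting for a non-executable image plus an inline script; how Inline lines are joined and executed is internal to the package, and the image name and command are made up (HOTSUB_ROOT comes from the constants above):

package example

import "github.com/otiai10/hotsub/core" // assumed import path

func exampleRuntime(component *core.Component) {
	// Executable is left false, so the Script below is what actually runs.
	component.Runtime.Image = &core.Image{Name: "ubuntu:18.04"}
	component.Runtime.Script = &core.Script{
		Inline: []string{"echo hello > ${HOTSUB_ROOT}/result.txt"},
	}
}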

type SharedData

type SharedData struct {
	Spec *dkmachine.CreateOptions

	// {{{ TODO: multiple SharedDataInstances design
	Instance *dkmachine.Machine

	Inputs    Inputs
	Root      string
	Container struct {
		Routine   *daap.Container
		NFSServer *daap.Container
	}
}

SharedData ...

func (*SharedData) Create

func (sd *SharedData) Create() error

Create ...

func (*SharedData) CreateNFSVolumesOn added in v0.1.2

func (sd *SharedData) CreateNFSVolumesOn(m *dkmachine.Machine) ([]*daap.Volume, error)

CreateNFSVolumesOn creates volumes from `*SharedData` for specified computing machine. **This must not mutate `*SharedData` struct itself.**

func (*SharedData) Envs

func (sd *SharedData) Envs() (envs []Env)

Envs ...
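
A sketch of the flow these methods suggest: create the shared-data instance, expose it to a computing machine as NFS volumes, and feed its Envs to the job containers. The dkmachine import path is assumed to be github.com/otiai10/dkmachine, and whether the caller or the package attaches the volumes is not shown here:

package example

import (
	"github.com/otiai10/dkmachine"   // assumed import path
	"github.com/otiai10/hotsub/core" // assumed import path
)

func shareWith(sd *core.SharedData, m *dkmachine.Machine) ([]core.Env, error) {
	if err := sd.Create(); err != nil {
		return nil, err
	}
	// CreateNFSVolumesOn must not mutate sd itself (see the doc comment above).
	volumes, err := sd.CreateNFSVolumesOn(m)
	if err != nil {
		return nil, err
	}
	_ = volumes // presumably attached to the job containers running on m
	return sd.Envs(), nil
}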
