core

package
v0.9.15 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 5, 2015 License: BSD-2-Clause Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
// String constants grouped under the CLIENT_STAT_ prefix.
// NOTE(review): presumably the values stored in Client.Status — confirm against callers.
const (
	CLIENT_STAT_ACTIVE_BUSY = "active-busy"
	CLIENT_STAT_ACTIVE_IDLE = "active-idle"
	CLIENT_STAT_SUSPEND     = "suspend"
	CLIENT_STAT_DELETED     = "deleted"
)
View Source
// String constants grouped under the JOB_STAT_ prefix.
// NOTE(review): presumably the values stored in Job.State — confirm against callers.
const (
	JOB_STAT_INIT       = "init"
	JOB_STAT_QUEUED     = "queued"
	JOB_STAT_INPROGRESS = "in-progress"
	JOB_STAT_COMPLETED  = "completed"
	JOB_STAT_SUSPEND    = "suspend"
	JOB_STAT_DELETED    = "deleted"
)
View Source
// String constants grouped under the TASK_STAT_ prefix.
//
// Beware of the naming of the two "skip" values: TASK_STAT_SKIPPED carries the
// string "user_skipped", while the plain string "skipped" belongs to
// TASK_STAT_FAIL_SKIP.
const (
	TASK_STAT_INIT       = "init"
	TASK_STAT_QUEUED     = "queued"
	TASK_STAT_INPROGRESS = "in-progress"
	TASK_STAT_PENDING    = "pending"
	TASK_STAT_SUSPEND    = "suspend"
	TASK_STAT_COMPLETED  = "completed"
	TASK_STAT_SKIPPED    = "user_skipped"
	TASK_STAT_FAIL_SKIP  = "skipped"
	TASK_STAT_PASSED     = "passed"
)
View Source
// String constants grouped under the WORK_STAT_ prefix.
// NOTE(review): presumably the status values of a Workunit — confirm against callers.
const (
	WORK_STAT_QUEUED      = "queued"
	WORK_STAT_CHECKOUT    = "checkout"
	WORK_STAT_SUSPEND     = "suspend"
	WORK_STAT_DONE        = "done"
	WORK_STAT_FAIL        = "fail"
	WORK_STAT_PREPARED    = "prepared"
	WORK_STAT_COMPUTED    = "computed"
	WORK_STAT_DISCARDED   = "discarded"
	WORK_STAT_PROXYQUEUED = "proxyqueued"
)

Variables

View Source
// Package-level state shared across the package. All variables except Service
// start out as their zero value (nil).
var (
	// QMgr is the package-wide ResourceMgr; nil until assigned.
	QMgr ResourceMgr

	// Service is a string that defaults to "unknown".
	Service = "unknown"

	// Self is a package-wide *Client; nil until assigned.
	Self *Client

	// ProxyWorkChan is a channel of bool; nil until created.
	ProxyWorkChan chan bool
)
View Source
// CGNameRegex matches a non-empty string made up solely of ASCII letters,
// digits, underscore, hyphen and dot. It is compiled once at package
// initialization.
var CGNameRegex = regexp.MustCompile(`^[A-Za-z0-9\_\-\.]+$`)

Functions

func DeleteClientGroup added in v0.9.3

func DeleteClientGroup(id string) (err error)

func Expand_app_variables added in v0.9.3

func Expand_app_variables(app_variables AppVariables, cmd_script []string) (err error)

func GetJobCount added in v0.9.12

func GetJobCount(q bson.M) (count int, err error)

func GetJobIdByTaskId

func GetJobIdByTaskId(taskid string) (jobid string, err error)

misc

func GetJobIdByWorkId

func GetJobIdByWorkId(workid string) (jobid string, err error)

func GetTaskIdByWorkId

func GetTaskIdByWorkId(workid string) (taskid string, err error)

func InitAwfMgr

func InitAwfMgr()

func InitClientGroupDB added in v0.9.3

func InitClientGroupDB()

func InitClientProfile

func InitClientProfile(profile *Client)

func InitJobDB

func InitJobDB()

func InitProxyWorkChan

func InitProxyWorkChan()

func InitResMgr

func InitResMgr(service string)

func IsFirstTask

func IsFirstTask(taskid string) bool

func NotifyWorkunitProcessed

func NotifyWorkunitProcessed(work *Workunit, perf *WorkPerf) (err error)

Functions for REST API communication (=deprecated=). Notifies the AWE server that a workunit is finished, with status either "failed" or "done", and with perf statistics if "done".

func NotifyWorkunitProcessedWithLogs

func NotifyWorkunitProcessedWithLogs(work *Workunit, perf *WorkPerf, sendstdlogs bool) (err error)

func ParseResource added in v0.9.13

func ParseResource(input_arg AppResource, app_variables AppVariables, job *Job, task *Task, taskid2task map[string]*Task) (err error)

func PostNode

func PostNode(io *IO, numParts int) (nodeid string, err error)

create a shock node for output (=deprecated=)

func PostNodeWithToken

func PostNodeWithToken(io *IO, numParts int, token string) (nodeid string, err error)

func PushOutputData

func PushOutputData(work *Workunit) (size int64, err error)

deprecated, see cache.UploadOutputData

func PutFileToShock

func PutFileToShock(filename string, host string, nodeid string, rank int, token string, attrfile string, ntype string, formopts map[string]string, nodeattr map[string]interface{}) (err error)

func ReloadFromDisk

func ReloadFromDisk(path string) (err error)

func ShockPutIndex

func ShockPutIndex(host string, nodeid string, indexname string, token string) (err error)

func UpdateJobState

func UpdateJobState(jobid string, newstate string, oldstates []string) (err error)

update job state to "newstate" only if the current state is in one of the "oldstates"

Types

type App added in v0.9.13

// App pairs an app name and its argument list (App_args) with a pointer to
// the AppCommandMode that defines it (AppDef).
type App struct {
	Name     string        `bson:"name" json:"name"`
	App_args []AppResource `bson:"app_args" json:"app_args"`

	AppDef *AppCommandMode `bson:"appdef" json:"appdef"` // App definition

}

type AppCommandMode added in v0.9.3

// AppCommandMode holds one command mode of an app package (see
// AppPackage.Commands): its declared inputs and outputs, predata, the
// command line / script to run, and variable maps.
type AppCommandMode struct {
	Input           []AppInput          `bson:"input" json:"input"`
	Output_array    []string            `bson:"output_array" json:"output_array"`
	Outputs         []IO                `bson:"outputs" json:"outputs"`
	Predata         IOmap               `bson:"predata" json:"predata"`
	Cmd             string              `bson:"cmd" json:"cmd"`
	Cmd_interpreter string              `bson:"cmd_interpreter" json:"cmd_interpreter"`
	Cmd_script      []string            `bson:"cmd_script" json:"cmd_script"`
	Variables       []map[string]string `bson:"variables" json:"variables"`
	Dockerimage     string              // just for convenience
}

func (AppCommandMode) Get_app_variables added in v0.9.3

func (acm AppCommandMode) Get_app_variables(app_variables AppVariables) (err error)

func (AppCommandMode) Get_default_app_variables added in v0.9.3

func (acm AppCommandMode) Get_default_app_variables() (app_variables AppVariables, err error)

func (AppCommandMode) ParseAppInput added in v0.9.3

func (acm AppCommandMode) ParseAppInput(app_variables AppVariables, args_array []AppResource, job *Job, task *Task, taskid2task map[string]*Task) (err error)

Reads variables and (optionally) populates them with input nodes; also transfers information from app definitions to app inputs. 1) For reading variables, it needs only acm.Get_default_app_variables(); job and task will be nil. 2) For populating input nodes, it needs output of 2. This is done server-side!

type AppInput added in v0.9.4

// AppInput describes one input of an app; it is part of the app definition
// (see AppCommandMode.Input).
type AppInput struct {
	Type         string `bson:"type" json:"type"`
	Name         string `bson:"name" json:"name"`
	DefaultValue string `bson:"default_value" json:"default_value"`
	Required     bool   `bson:"required" json:"required"` // or use optional // TODO remove
	Optional     bool   `bson:"optional" json:"optional"`
	Option       string `bson:"option" json:"option"`         // this is the name used by the command line program, e.g. "--input="
	Cache        bool   `bson:"cache" json:"cache"`           // specifies that input has to be cached (predata)
	ShockIndex   string `bson:"shockindex" json:"shockindex"` // specifies that (shock) input has to be indexed in Shock by the AWE server

}

part of the app-definition

type AppInputType added in v0.9.3

// AppInputType enumerates the kinds of app input as an int.
type AppInputType int

// AppInputType values. They are assigned sequentially by iota, from
// Ait_undefined = 0 through Ait_list = 6 (consecutive integers, not bit masks).
const (
	Ait_undefined AppInputType = iota
	Ait_file
	Ait_string
	Ait_shock
	Ait_url
	Ait_task
	Ait_list
)

func String2apptype added in v0.9.13

func String2apptype(type_string string) (ait AppInputType, err error)

func (AppInputType) HasType added in v0.9.3

func (this_ait AppInputType) HasType(ait AppInputType) bool

type AppPackage added in v0.9.3

// AppPackage holds a docker image name together with a two-level map of
// command modes. Commands carries no bson/json tags.
type AppPackage struct {
	Dockerimage string                                `bson:"dockerimage" json:"dockerimage"`
	Commands    map[string]map[string]*AppCommandMode // package_command, package_mode
}

type AppRegistry added in v0.9.3

// AppRegistry maps a string key to its *AppPackage.
// NOTE(review): the key is presumably the app package name (cf. GetAppPackage's app_package parameter) — confirm.
type AppRegistry map[string]*AppPackage

func MakeAppRegistry added in v0.9.3

func MakeAppRegistry() (new_instance AppRegistry, err error)

generator function for app registry

func (AppRegistry) GetAppPackage added in v0.9.13

func (apr AppRegistry) GetAppPackage(app_package string) (ap *AppPackage, err error)

func (AppRegistry) Get_cmd_mode_object added in v0.9.3

func (appr AppRegistry) Get_cmd_mode_object(app_package_name string, app_command_name string, app_cmd_mode_name string) (app_cmd_mode_object_ref *AppCommandMode, err error)

func (AppRegistry) Get_dockerimage added in v0.9.3

func (appr AppRegistry) Get_dockerimage(app_package_name string) (dockerimage string, err error)

type AppResource added in v0.9.3

// AppResource describes one input resource of an app invocation.
//
// OutputPosition is a pointer, so an absent position (nil) can be told apart
// from position 0. Note the serialized keys of OutputPosition and OutputName
// are "position" and "name". List allows resources to nest.
type AppResource struct {
	Resource       string        `bson:"resource" json:"resource"`
	Host           string        `bson:"host" json:"host"`
	Node           string        `bson:"node" json:"node"`
	Url            string        `bson:"url" json:"url"`
	Filename       string        `bson:"filename" json:"filename"`
	Key            string        `bson:"key" json:"key"`
	Value          string        `bson:"value" json:"value"`
	Task           string        `bson:"task" json:"task"`
	OutputPosition *int          `bson:"position" json:"position"`
	OutputName     string        `bson:"name" json:"name"`
	Uncompress     string        `bson:"uncompress" json:"uncompress"` // tells AWE client to uncompress this file, e.g. "gzip"
	List           []AppResource `bson:"list" json:"list"`
	Cache          bool          `bson:"cache" json:"cache"`
	ShockIndex     string        `bson:"shockindex" json:"shockindex"` // specifies that (shock) input has to be indexed in Shock by the AWE server
}

Part of the workflow document, used in "Command". Defines input: shock, task, string; those can generate IO structs (see io.go).

type AppVariable added in v0.9.3

// AppVariable is a single app variable: its string value, its input type,
// and how it is passed on the command line. It is the value type of
// AppVariables. The fields carry no bson/json tags.
type AppVariable struct {
	Value    string
	Var_type AppInputType
	Option   string // a flag that is needed to activate an argument on the command line, e.g. "--input ", mainly used for optional arguments
	Optional bool   // indicates that an empty value is ok and not an error
}

type AppVariables added in v0.9.3

// AppVariables maps a string key to an AppVariable (stored by value).
// NOTE(review): the key is presumably the variable name — confirm.
type AppVariables map[string]AppVariable

part of the (internal-only) workflow document, used in "Task"

type CQMgr

// CQMgr has only unexported fields; obtain one with NewCQMgr. It is embedded
// by both ProxyMgr and ServerMgr.
type CQMgr struct {
	// contains filtered or unexported fields
}

func NewCQMgr

func NewCQMgr() *CQMgr

func (*CQMgr) CheckoutWorkunits

func (qm *CQMgr) CheckoutWorkunits(req_policy string, client_id string, num int) (workunits []*Workunit, err error)

func (*CQMgr) ClientChecker

func (qm *CQMgr) ClientChecker()

func (*CQMgr) ClientHeartBeat

func (qm *CQMgr) ClientHeartBeat(id string, cg *ClientGroup) (hbmsg HBmsg, err error)

func (*CQMgr) DeleteClient

func (qm *CQMgr) DeleteClient(id string) (err error)

func (*CQMgr) DeleteClientByUser added in v0.9.3

func (qm *CQMgr) DeleteClientByUser(id string, u *user.User) (err error)

func (*CQMgr) EnqueueWorkunit

func (qm *CQMgr) EnqueueWorkunit(work *Workunit) (err error)

func (*CQMgr) FetchDataToken

func (qm *CQMgr) FetchDataToken(workid string, clientid string) (token string, err error)

func (*CQMgr) GetAllClients

func (qm *CQMgr) GetAllClients() []*Client

func (*CQMgr) GetAllClientsByUser added in v0.9.3

func (qm *CQMgr) GetAllClientsByUser(u *user.User) []*Client

func (*CQMgr) GetClient

func (qm *CQMgr) GetClient(id string) (client *Client, err error)

func (*CQMgr) GetClientByUser added in v0.9.3

func (qm *CQMgr) GetClientByUser(id string, u *user.User) (client *Client, err error)

func (*CQMgr) GetWorkById

func (qm *CQMgr) GetWorkById(id string) (workunit *Workunit, err error)

func (*CQMgr) Handle

func (qm *CQMgr) Handle()

func (*CQMgr) LockSemaphore added in v0.9.11

func (qm *CQMgr) LockSemaphore()

func (*CQMgr) NotifyWorkStatus

func (qm *CQMgr) NotifyWorkStatus(notice Notice)

func (*CQMgr) ReQueueWorkunitByClient

func (qm *CQMgr) ReQueueWorkunitByClient(clientid string) (err error)

func (*CQMgr) RegisterNewClient

func (qm *CQMgr) RegisterNewClient(files FormFiles, cg *ClientGroup) (client *Client, err error)

func (*CQMgr) ResumeClient

func (qm *CQMgr) ResumeClient(id string) (err error)

func (*CQMgr) ResumeClientByUser added in v0.9.3

func (qm *CQMgr) ResumeClientByUser(id string, u *user.User) (err error)

func (*CQMgr) ResumeSuspendedClients

func (qm *CQMgr) ResumeSuspendedClients() (count int)

func (*CQMgr) ResumeSuspendedClientsByUser added in v0.9.3

func (qm *CQMgr) ResumeSuspendedClientsByUser(u *user.User) (count int)

func (*CQMgr) ShowWorkQueue

func (qm *CQMgr) ShowWorkQueue()

show functions used in debug

func (*CQMgr) ShowWorkunits

func (qm *CQMgr) ShowWorkunits(status string) (workunits []*Workunit)

func (*CQMgr) ShowWorkunitsByUser added in v0.9.3

func (qm *CQMgr) ShowWorkunitsByUser(status string, u *user.User) (workunits []*Workunit)

func (*CQMgr) SuspendAllClients

func (qm *CQMgr) SuspendAllClients() (count int)

func (*CQMgr) SuspendAllClientsByUser added in v0.9.3

func (qm *CQMgr) SuspendAllClientsByUser(u *user.User) (count int)

func (*CQMgr) SuspendClient

func (qm *CQMgr) SuspendClient(id string) (err error)

func (*CQMgr) SuspendClientByUser added in v0.9.3

func (qm *CQMgr) SuspendClientByUser(id string, u *user.User) (err error)

func (*CQMgr) Timer

func (qm *CQMgr) Timer()

func (*CQMgr) UnlockSemaphore added in v0.9.11

func (qm *CQMgr) UnlockSemaphore()

func (*CQMgr) UpdateSubClients

func (qm *CQMgr) UpdateSubClients(id string, count int)

func (*CQMgr) UpdateSubClientsByUser added in v0.9.3

func (qm *CQMgr) UpdateSubClientsByUser(id string, count int, u *user.User)

type Client

// Client is the record describing one client: identity, host information,
// status, and work counters.
//
// Serialization notes visible from the tags:
//   - CPUs is stored under the key "cores".
//   - Status uses the capitalized key "Status", unlike the other (lower-case) keys.
//   - Last_failed and Tag are tagged "-" for both bson and json, so they are
//     not serialized.
type Client struct {
	Id              string          `bson:"id" json:"id"`
	Name            string          `bson:"name" json:"name"`
	Group           string          `bson:"group" json:"group"`
	User            string          `bson:"user" json:"user"`
	Domain          string          `bson:"domain" json:"domain"`
	InstanceId      string          `bson:"instance_id" json:"instance_id"`
	InstanceType    string          `bson:"instance_type" json:"instance_type"`
	Host            string          `bson:"host" json:"host"`
	CPUs            int             `bson:"cores" json:"cores"`
	Apps            []string        `bson:"apps" json:"apps"`
	RegTime         time.Time       `bson:"regtime" json:"regtime"`
	Serve_time      string          `bson:"serve_time" json:"serve_time"`
	Idle_time       int             `bson:"idle_time" json:"idle_time"`
	Status          string          `bson:"Status" json:"Status"`
	Total_checkout  int             `bson:"total_checkout" json:"total_checkout"`
	Total_completed int             `bson:"total_completed" json:"total_completed"`
	Total_failed    int             `bson:"total_failed" json:"total_failed"`
	Current_work    map[string]bool `bson:"current_work" json:"current_work"`
	Skip_work       []string        `bson:"skip_work" json:"skip_work"`
	Last_failed     int             `bson:"-" json:"-"`
	Tag             bool            `bson:"-" json:"-"`
	Proxy           bool            `bson:"proxy" json:"proxy"`
	SubClients      int             `bson:"subclients" json:"subclients"`
	GitCommitHash   string          `bson:"git_commit_hash" json:"git_commit_hash"`
	Version         string          `bson:"version" json:"version"`
}

func NewClient

func NewClient() (client *Client)

func NewProfileClient

func NewProfileClient(filepath string) (client *Client, err error)

func (*Client) IsBusy

func (cl *Client) IsBusy() bool

type ClientGroup added in v0.9.3

// ClientGroup is the record describing a client group: its id, name, token,
// IP range (IP_CIDR), ACL and timestamps.
//
// Acl is stored in bson but tagged json:"-", so it is left out of JSON output.
type ClientGroup struct {
	Id           string                        `bson:"id" json:"id"`
	IP_CIDR      string                        `bson:"ip_cidr" json:"ip_cidr"`
	Name         string                        `bson:"name" json:"name"`
	Token        string                        `bson:"token" json:"token"`
	Acl          clientGroupAcl.ClientGroupAcl `bson:"acl" json:"-"`
	CreatedOn    time.Time                     `bson:"created_on" json:"created_on"`
	Expiration   time.Time                     `bson:"expiration" json:"expiration"`
	LastModified time.Time                     `bson:"last_modified" json:"last_modified"`
}

func CreateClientGroup added in v0.9.3

func CreateClientGroup(name string, u *user.User) (cg *ClientGroup, err error)

func LoadClientGroup added in v0.9.3

func LoadClientGroup(id string) (clientgroup *ClientGroup, err error)

func LoadClientGroupByName added in v0.9.3

func LoadClientGroupByName(name string) (clientgroup *ClientGroup, err error)

func LoadClientGroupByToken added in v0.9.3

func LoadClientGroupByToken(token string) (clientgroup *ClientGroup, err error)

func (*ClientGroup) Save added in v0.9.3

func (cg *ClientGroup) Save() (err error)

func (*ClientGroup) SetToken added in v0.9.3

func (cg *ClientGroup) SetToken()

type ClientGroups added in v0.9.3

// ClientGroups is a slice of ClientGroup values (not pointers).
type ClientGroups []ClientGroup

ClientGroup array type

func (*ClientGroups) GetPaginated added in v0.9.3

func (n *ClientGroups) GetPaginated(q bson.M, limit int, offset int, order string, direction string) (count int, err error)

type ClientMgr

// ClientMgr is the set of client-management operations: registration,
// heartbeat, lookup, deletion, suspend/resume, and sub-client updates.
//
// Most operations come in pairs: a plain form and a "ByUser" form that takes
// an additional *user.User. The string parameter of the per-client methods is
// named id in the CQMgr method set (e.g. GetClient(id string)).
type ClientMgr interface {
	RegisterNewClient(FormFiles, *ClientGroup) (*Client, error)
	ClientHeartBeat(string, *ClientGroup) (HBmsg, error)
	GetClient(string) (*Client, error)
	GetClientByUser(string, *user.User) (*Client, error)
	GetAllClients() []*Client
	GetAllClientsByUser(*user.User) []*Client
	DeleteClient(string) error
	DeleteClientByUser(string, *user.User) error
	SuspendClient(string) error
	SuspendClientByUser(string, *user.User) error
	ResumeClient(string) error
	ResumeClientByUser(string, *user.User) error
	// The int results of the four bulk methods below are named count in the
	// CQMgr method set.
	ResumeSuspendedClients() int
	ResumeSuspendedClientsByUser(*user.User) int
	SuspendAllClients() int
	SuspendAllClientsByUser(*user.User) int
	ClientChecker()
	// The parameters are (id, count) and (id, count, u) in the CQMgr method set.
	UpdateSubClients(string, int)
	UpdateSubClientsByUser(string, int, *user.User)
}

type ClientWorkMgr

// ClientWorkMgr is the union of the ClientMgr and WorkMgr interfaces; it adds
// no methods of its own.
type ClientWorkMgr interface {
	ClientMgr
	WorkMgr
}

type CoAck

// CoAck has only unexported fields.
// NOTE(review): the name suggests a checkout acknowledgement paired with CoReq — confirm in the source.
type CoAck struct {
	// contains filtered or unexported fields
}

type CoReq

// CoReq has only unexported fields.
// NOTE(review): the name suggests a checkout request paired with CoAck — confirm in the source.
type CoReq struct {
	// contains filtered or unexported fields
}

type Command

// Command describes a command to run: its name, argument string, optional
// script lines, docker image and environment.
//
// Serialization notes visible from the tags:
//   - Dockerimage uses the capitalized key "Dockerimage", unlike the other
//     (lower-case) keys.
//   - ParsedArgs is tagged "-" for both bson and json, so it is not serialized.
type Command struct {
	Name          string   `bson:"name" json:"name"`
	Args          string   `bson:"args" json:"args"`
	Dockerimage   string   `bson:"Dockerimage" json:"Dockerimage"`
	Cmd_script    []string `bson:"cmd_script" json:"cmd_script"`
	Environ       Envs     `bson:"environ" json:"environ"`
	HasPrivateEnv bool     `bson:"has_private_env" json:"has_private_env"`
	Description   string   `bson:"description" json:"description"`
	ParsedArgs    []string `bson:"-" json:"-"`
}

func NewCommand

func NewCommand(name string) *Command

type Command_p added in v0.9.3

// Command_p is a JSON-only view of a command that exposes just the "environ"
// key as an *Environ_p.
type Command_p struct {
	Environ *Environ_p `json:"environ"`
}

type Environ_p added in v0.9.3

// Environ_p is a JSON-only view of an environment that exposes the "private"
// map. In contrast, Envs.Private is tagged json:"-".
type Environ_p struct {
	Private map[string]string `json:"private"`
}

The following special code exists in order to unmarshal the private field Command.Environ.Private; it is placed in this file for less confusion.

type Envs added in v0.9.3

// Envs holds a public and a private string-to-string map.
//
// Both maps are stored in bson, but Private is tagged json:"-" and is
// therefore left out of JSON encoding and decoding of this type.
type Envs struct {
	Public  map[string]string `bson:"public" json:"public"`
	Private map[string]string `bson:"private" json:"-"`
}

type FormFile

// FormFile describes one file by name and path, plus a string-to-string
// Checksum map. It is the value type of FormFiles.
// NOTE(review): Checksum is presumably keyed by checksum algorithm — confirm.
type FormFile struct {
	Name     string
	Path     string
	Checksum map[string]string
}

type FormFiles

// FormFiles maps a string key to a FormFile (stored by value).
// NOTE(review): the key is presumably the form field name — confirm.
type FormFiles map[string]FormFile

type HBmsg

// HBmsg is a heartbeat message: a map from an operation name to a
// comma-separated list of the objects it applies to.
type HBmsg map[string]string //map[op]obj1,obj2 e.g. map[discard]=work1,work2

Heartbeat response from awe-server to awe-client, used for issuing operation requests to the client, e.g. discard suspended workunits.

type IO

// IO describes one input or output file of a task/workunit and where it
// lives (host, node, url).
//
// Serialization notes visible from the tags:
//   - AppPosition, MD5, Path, Optional, DataToken and Intermediate are stored
//     in bson but tagged json:"-", so they are left out of JSON.
//   - Intermediate uses the capitalized bson key "Intermediate", unlike the
//     other (lower-case) keys.
type IO struct {
	FileName      string                 `bson:"filename" json:"filename"`
	Name          string                 `bson:"name" json:"name"`     // specifies abstract name of output as defined by the app
	AppPosition   int                    `bson:"appposition" json:"-"` // specifies position in app output array
	Directory     string                 `bson:"directory" json:"directory"`
	Host          string                 `bson:"host" json:"host"`
	Node          string                 `bson:"node" json:"node"`
	Url           string                 `bson:"url"  json:"url"` // can be shock or any other url
	Size          int64                  `bson:"size" json:"size"`
	MD5           string                 `bson:"md5" json:"-"`
	Cache         bool                   `bson:"cache" json:"cache"` // indicates that this file is "predata" that needs to be cached
	Origin        string                 `bson:"origin" json:"origin"`
	Path          string                 `bson:"path" json:"-"`
	Optional      bool                   `bson:"optional" json:"-"`
	Nonzero       bool                   `bson:"nonzero"  json:"nonzero"`
	DataToken     string                 `bson:"datatoken"  json:"-"`
	Intermediate  bool                   `bson:"Intermediate"  json:"-"`
	Temporary     bool                   `bson:"temporary"  json:"temporary"`
	ShockFilename string                 `bson:"shockfilename" json:"shockfilename"`
	ShockIndex    string                 `bson:"shockindex" json:"shockindex"` // on input it indicates that Shock node has to be indexed by AWE server
	AttrFile      string                 `bson:"attrfile" json:"attrfile"`
	NoFile        bool                   `bson:"nofile" json:"nofile"`
	Delete        bool                   `bson:"delete" json:"delete"`
	Type          string                 `bson:"type" json:"type"`
	NodeAttr      map[string]interface{} `bson:"nodeattr" json:"nodeattr"` // specifies attribute data to be stored in shock node (output only)
	FormOptions   map[string]string      `bson:"formoptions" json:"formoptions"`
	Uncompress    string                 `bson:"uncompress" json:"uncompress"` // tells AWE client to uncompress this file, e.g. "gzip"
}

func NewIO

func NewIO() *IO

func (*IO) DataUrl

func (io *IO) DataUrl() (dataurl string, err error)

func (*IO) DeleteNode added in v0.9.3

func (io *IO) DeleteNode() (nodeid string, err error)

func (*IO) GetFileSize

func (io *IO) GetFileSize() int64

func (*IO) GetIndexInfo

func (io *IO) GetIndexInfo() (idxinfo map[string]shock.IdxInfo, err error)

func (*IO) GetIndexUnits

func (io *IO) GetIndexUnits(indextype string) (totalunits int, err error)

func (*IO) GetShockNode

func (io *IO) GetShockNode() (node *shock.ShockNode, err error)

func (*IO) TotalUnits

func (io *IO) TotalUnits(indextype string) (count int, err error)

type IOmap

// IOmap maps a file name to a pointer to its IO description.
type IOmap map[string]*IO // [filename]attributes

func NewIOmap

func NewIOmap() IOmap

func (IOmap) Add

func (i IOmap) Add(name string, host string, node string, md5 string, cache bool)

func (IOmap) Find

func (i IOmap) Find(name string) *IO

func (IOmap) Has

func (i IOmap) Has(name string) bool

type Info

// Info holds job-level information (see Job.Info and Pipeline.Info).
//
// Notes visible from the declaration:
//   - The field StartedTme is spelled without the "i"; its serialized key is
//     "startedtime".
//   - DataToken is stored in bson but tagged json:"-", so it is left out of JSON.
type Info struct {
	Name          string            `bson:"name" json:"name"`
	Xref          string            `bson:"xref" json:"xref"`
	Service       string            `bson:"service" json:"service"`
	Project       string            `bson:"project" json:"project"`
	User          string            `bson:"user" json:"user"`
	Pipeline      string            `bson:"pipeline" json:"pipeline"`
	ClientGroups  string            `bson:"clientgroups" json:"clientgroups"`
	SubmitTime    time.Time         `bson:"submittime" json:"submittime"`
	StartedTme    time.Time         `bson:"startedtime" json:"startedtime"`
	CompletedTime time.Time         `bson:"completedtime" json:"completedtime"`
	Priority      int               `bson:"priority" json:"priority"`
	Auth          bool              `bson:"auth" json:"auth"`
	DataToken     string            `bson:"datatoken" json:"-"`
	NoRetry       bool              `bson:"noretry" json:"noretry"`
	UserAttr      map[string]string `bson:"userattr" json:"userattr"`
	Description   string            `bson:"description" json:"description"`
	Tracking      bool              `bson:"tracking" json:"tracking"`
}

job info

func NewInfo

func NewInfo() *Info

type Job

// Job is the record describing one job: identifiers, job info, its tasks
// (held as pointers), state and bookkeeping fields.
//
// Acl and Script are stored in bson but tagged json:"-", so they are left
// out of JSON.
type Job struct {
	Id          string    `bson:"id" json:"id"`
	Jid         string    `bson:"jid" json:"jid"`
	Acl         acl.Acl   `bson:"acl" json:"-"`
	Info        *Info     `bson:"info" json:"info"`
	Tasks       []*Task   `bson:"tasks" json:"tasks"`
	Script      script    `bson:"script" json:"-"`
	State       string    `bson:"state" json:"state"`
	Registered  bool      `bson:"registered" json:"registered"`
	RemainTasks int       `bson:"remaintasks" json:"remaintasks"`
	UpdateTime  time.Time `bson:"updatetime" json:"updatetime"`
	Notes       string    `bson:"notes" json:"notes"`
	LastFailed  string    `bson:"lastfailed" json:"lastfailed"`
	Resumed     int       `bson:"resumed" json:"resumed"`     //number of times the job has been resumed from suspension
	ShockHost   string    `bson:"shockhost" json:"shockhost"` // this is a fall-back default if not specified at a lower level
}

func AwfToJob

func AwfToJob(awf *Workflow, jid string) (job *Job, err error)

func CreateJobUpload

func CreateJobUpload(u *user.User, params map[string]string, files FormFiles, jid string) (job *Job, err error)

func LoadJob

func LoadJob(id string) (job *Job, err error)

func ParseAwf

func ParseAwf(filename string, jid string) (job *Job, err error)

parse .awf.json - pseudo-function only, to be finished

func ParseJobTasks

func ParseJobTasks(filename string, jid string) (job *Job, err error)

parse job by job script

func (*Job) FilePath

func (job *Job) FilePath() string

func (*Job) GetDataToken

func (job *Job) GetDataToken() (token string)

func (*Job) GetPrivateEnv added in v0.9.3

func (job *Job) GetPrivateEnv(taskid string) (env map[string]string)

func (*Job) Mkdir

func (job *Job) Mkdir() (err error)

func (*Job) NumTask

func (job *Job) NumTask() int

func (*Job) Path

func (job *Job) Path() string

---Path functions

func (*Job) Save

func (job *Job) Save() (err error)

func (*Job) SetDataToken

func (job *Job) SetDataToken(token string)

set token

func (*Job) SetFile

func (job *Job) SetFile(file FormFile) (err error)

func (*Job) TaskList

func (job *Job) TaskList() []*Task

---Task functions

func (*Job) UpdateFile

func (job *Job) UpdateFile(params map[string]string, files FormFiles) (err error)

---Script upload

func (*Job) UpdateState

func (job *Job) UpdateState(newState string, notes string) (err error)

---Field update functions

func (*Job) UpdateTask

func (job *Job) UpdateTask(task *Task) (remainTasks int, err error)

invoked to modify job info in mongodb when a task in that job changed to the new status

type JobMgr

// JobMgr is the set of job-management operations: registration, enqueueing,
// suspend/resume, deletion, recovery and per-job updates.
//
// The interface leaves its parameters unnamed; the names quoted in the
// comments below are taken from the ProxyMgr method set.
type JobMgr interface {
	// Returns (jid, err).
	JobRegister() (string, error)
	// Parameters: (jobid, tasks).
	EnqueueTasksByJobId(string, []*Task) error
	GetActiveJobs() map[string]*JobPerf
	IsJobRegistered(string) bool
	GetSuspendJobs() map[string]bool
	// Parameters: (jobid, reason, id).
	SuspendJob(string, string, string) error
	ResumeSuspendedJob(string) error
	ResumeSuspendedJobByUser(string, *user.User) error
	ResumeSuspendedJobs() int
	ResumeSuspendedJobsByUser(*user.User) int
	ResubmitJob(string) error
	DeleteJob(string) error
	DeleteJobByUser(string, *user.User) error
	DeleteSuspendedJobs() int
	DeleteSuspendedJobsByUser(*user.User) int
	DeleteZombieJobs() int
	DeleteZombieJobsByUser(*user.User) int
	InitMaxJid() error
	RecoverJobs() error
	FinalizeWorkPerf(string, string) error
	SaveStdLog(string, string, string) error
	GetReportMsg(string, string) (string, error)
	// Parameters: (jobid, stage).
	RecomputeJob(string, string) error
	// Parameters: (jobid, newgroup).
	UpdateGroup(string, string) error
	// Parameters: (jobid, priority).
	UpdatePriority(string, int) error
}

type JobMin added in v0.9.8

// JobMin is a reduced job record. Unlike Job, its Task field is a slice of
// ints and its State field is a slice of strings.
// NOTE(review): presumably a compact summary of a Job for listings — confirm against callers.
type JobMin struct {
	Id            string            `bson:"id" json:"id"`
	Name          string            `bson:"name" json:"name"`
	Size          int64             `bson:"size" json:"size"`
	SubmitTime    time.Time         `bson:"submittime" json:"submittime"`
	CompletedTime time.Time         `bson:"completedtime" json:"completedtime"`
	Task          []int             `bson:"task" json:"task"`
	State         []string          `bson:"state" json:"state"`
	UserAttr      map[string]string `bson:"userattr" json:"userattr"`
}

type JobPerf

// JobPerf holds performance data for one job: int64 timing values plus maps
// of per-task and per-workunit statistics.
//
// Ptasks and Pworks are serialized under the keys "task_stats" and
// "work_stats".
// NOTE(review): Queued/Start/End are presumably timestamps — units are not visible here; confirm.
type JobPerf struct {
	Id     string               `bson:"id" json:"id"`
	Queued int64                `bson:"queued" json:"queued"`
	Start  int64                `bson:"start" json:"start"`
	End    int64                `bson:"end" json:"end"`
	Resp   int64                `bson:"resp" json:"resp"` //End - Queued
	Ptasks map[string]*TaskPerf `bson:"task_stats" json:"task_stats"`
	Pworks map[string]*WorkPerf `bson:"work_stats" json:"work_stats"`
}

func LoadJobPerf

func LoadJobPerf(id string) (perf *JobPerf, err error)

func NewJobPerf

func NewJobPerf(id string) *JobPerf

type Job_p added in v0.9.3

// Job_p is a JSON-only view of a job that exposes just the "tasks" key as a
// slice of *Task_p.
type Job_p struct {
	Tasks []*Task_p `json:"tasks"`
}

type Jobs

// Jobs is a slice of Job values (not pointers).
type Jobs []Job

Job array type

func (*Jobs) GetAll

func (n *Jobs) GetAll(q bson.M, order string, direction string) (err error)

func (*Jobs) GetAllLimitOffset

func (n *Jobs) GetAllLimitOffset(q bson.M, limit int, offset int) (err error)

func (*Jobs) GetAllRecent

func (n *Jobs) GetAllRecent(q bson.M, recent int) (count int, err error)

func (*Jobs) GetJobAt

func (n *Jobs) GetJobAt(index int) Job

func (*Jobs) GetPaginated

func (n *Jobs) GetPaginated(q bson.M, limit int, offset int, order string, direction string) (count int, err error)

func (*Jobs) Length

func (n *Jobs) Length() int

type Notice

// Notice reports the status of a workunit; it is the argument of
// CQMgr.NotifyWorkStatus. It names the workunit and client by id and carries
// a status string, a compute time and free-form notes.
// NOTE(review): the unit of ComputeTime is not visible here — confirm.
type Notice struct {
	WorkId      string
	Status      string
	ClientId    string
	ComputeTime int
	Notes       string
}

type Opts

// Opts is a string-to-string map of options.
type Opts map[string]string

func (*Opts) HasKey

func (o *Opts) HasKey(key string) bool

func (*Opts) Value

func (o *Opts) Value(key string) string

type PartInfo

// PartInfo holds partitioning settings: an input name, an index name, a
// total index count and a maximum part size in MB.
//
// Options is stored in bson but tagged json:"-", so it is left out of JSON.
type PartInfo struct {
	Input         string `bson:"input" json:"input"`
	Index         string `bson:"index" json:"index"`
	TotalIndex    int    `bson:"totalindex" json:"totalindex"`
	MaxPartSizeMB int    `bson:"maxpartsize_mb" json:"maxpartsize_mb"`
	Options       string `bson:"options" json:"-"`
}

type Pipeline

// Pipeline bundles an Info with a list of tasks. Unlike Job.Tasks, which
// holds pointers ([]*Task), Tasks here holds Task values.
type Pipeline struct {
	Info  *Info  `bson:"info" json:"info"`
	Tasks []Task `bson:"tasks" json:"tasks"`
}

func NewPipeline

func NewPipeline() *Pipeline

type ProxyMgr

// ProxyMgr embeds CQMgr, so CQMgr's methods are promoted onto it unless
// ProxyMgr declares its own method of the same name. Create one with
// NewProxyMgr.
type ProxyMgr struct {
	CQMgr
}

func NewProxyMgr

func NewProxyMgr() *ProxyMgr

func (*ProxyMgr) ClientChecker

func (qm *ProxyMgr) ClientChecker()

func (*ProxyMgr) DeleteJob

func (qm *ProxyMgr) DeleteJob(jobid string) (err error)

func (*ProxyMgr) DeleteJobByUser added in v0.9.3

func (qm *ProxyMgr) DeleteJobByUser(jobid string, u *user.User) (err error)

func (*ProxyMgr) DeleteSuspendedJobs

func (qm *ProxyMgr) DeleteSuspendedJobs() (num int)

func (*ProxyMgr) DeleteSuspendedJobsByUser added in v0.9.3

func (qm *ProxyMgr) DeleteSuspendedJobsByUser(u *user.User) (num int)

func (*ProxyMgr) DeleteZombieJobs

func (qm *ProxyMgr) DeleteZombieJobs() (num int)

func (*ProxyMgr) DeleteZombieJobsByUser added in v0.9.3

func (qm *ProxyMgr) DeleteZombieJobsByUser(u *user.User) (num int)

func (*ProxyMgr) EnqueueTasksByJobId

func (qm *ProxyMgr) EnqueueTasksByJobId(jobid string, tasks []*Task) (err error)

func (*ProxyMgr) FetchDataToken

func (qm *ProxyMgr) FetchDataToken(workid string, clientid string) (token string, err error)

func (*ProxyMgr) FetchPrivateEnv added in v0.9.3

func (qm *ProxyMgr) FetchPrivateEnv(workid string, clientid string) (env map[string]string, err error)

func (*ProxyMgr) FinalizeWorkPerf

func (qm *ProxyMgr) FinalizeWorkPerf(string, string) (err error)

func (*ProxyMgr) GetActiveJobs

func (qm *ProxyMgr) GetActiveJobs() map[string]*JobPerf

func (*ProxyMgr) GetReportMsg

func (qm *ProxyMgr) GetReportMsg(string, string) (report string, err error)

func (*ProxyMgr) GetSuspendJobs

func (qm *ProxyMgr) GetSuspendJobs() map[string]bool

func (*ProxyMgr) Handle

func (qm *ProxyMgr) Handle()

func (*ProxyMgr) InitMaxJid

func (qm *ProxyMgr) InitMaxJid() (err error)

func (*ProxyMgr) IsJobRegistered

func (qm *ProxyMgr) IsJobRegistered(id string) bool

func (*ProxyMgr) JobRegister

func (qm *ProxyMgr) JobRegister() (jid string, err error)

func (*ProxyMgr) RecomputeJob

func (qm *ProxyMgr) RecomputeJob(jobid string, stage string) (err error)

recompute jobs from specified task stage

func (*ProxyMgr) RecoverJobs

func (qm *ProxyMgr) RecoverJobs() (err error)

recover jobs not completed before awe-server restarts

func (*ProxyMgr) RegisterNewClient

func (qm *ProxyMgr) RegisterNewClient(files FormFiles, cg *ClientGroup) (client *Client, err error)

func (*ProxyMgr) ResubmitJob

func (qm *ProxyMgr) ResubmitJob(id string) (err error)

re-submit a job in db but not in the queue (caused by server restarting)

func (*ProxyMgr) ResumeSuspendedJob

func (qm *ProxyMgr) ResumeSuspendedJob(id string) (err error)

resubmit a suspended job

func (*ProxyMgr) ResumeSuspendedJobByUser added in v0.9.3

func (qm *ProxyMgr) ResumeSuspendedJobByUser(id string, u *user.User) (err error)

resubmit a suspended job if user has rights

func (*ProxyMgr) ResumeSuspendedJobs

func (qm *ProxyMgr) ResumeSuspendedJobs() (num int)

func (*ProxyMgr) ResumeSuspendedJobsByUser added in v0.9.3

func (qm *ProxyMgr) ResumeSuspendedJobsByUser(u *user.User) (num int)

func (*ProxyMgr) SaveStdLog

func (qm *ProxyMgr) SaveStdLog(string, string, string) (err error)

func (*ProxyMgr) ShowStatus

func (qm *ProxyMgr) ShowStatus() string

func (*ProxyMgr) SuspendJob

func (qm *ProxyMgr) SuspendJob(jobid string, reason string, id string) (err error)

func (*ProxyMgr) Timer

func (qm *ProxyMgr) Timer()

func (*ProxyMgr) UpdateGroup

func (qm *ProxyMgr) UpdateGroup(jobid string, newgroup string) (err error)

func (*ProxyMgr) UpdatePriority added in v0.9.3

func (qm *ProxyMgr) UpdatePriority(jobid string, priority int) (err error)

type ResourceMgr

// ResourceMgr is the type of the package-level QMgr variable. It combines the
// ClientWorkMgr and JobMgr interfaces with three additional methods. ProxyMgr
// and ServerMgr both declare Handle, ShowStatus and Timer with these
// signatures.
type ResourceMgr interface {
	ClientWorkMgr
	JobMgr
	Handle()
	ShowStatus() string
	Timer()
}

type ServerMgr

// ServerMgr embeds CQMgr and adds job, task and perf management methods on
// top of it. Create one with NewServerMgr.
type ServerMgr struct {
	CQMgr
	// contains filtered or unexported fields
}

func NewServerMgr

func NewServerMgr() *ServerMgr

func (*ServerMgr) CreateJobPerf

func (qm *ServerMgr) CreateJobPerf(jobid string)

---perf related methods

func (*ServerMgr) CreateTaskPerf

func (qm *ServerMgr) CreateTaskPerf(taskid string)

func (*ServerMgr) CreateWorkPerf

func (qm *ServerMgr) CreateWorkPerf(workid string)

func (*ServerMgr) DeleteJob

func (qm *ServerMgr) DeleteJob(jobid string) (err error)

func (*ServerMgr) DeleteJobByUser added in v0.9.3

func (qm *ServerMgr) DeleteJobByUser(jobid string, u *user.User) (err error)

func (*ServerMgr) DeleteJobPerf

func (qm *ServerMgr) DeleteJobPerf(jobid string)

func (*ServerMgr) DeleteSuspendedJobs

func (qm *ServerMgr) DeleteSuspendedJobs() (num int)

func (*ServerMgr) DeleteSuspendedJobsByUser added in v0.9.3

func (qm *ServerMgr) DeleteSuspendedJobsByUser(u *user.User) (num int)

func (*ServerMgr) DeleteZombieJobs

func (qm *ServerMgr) DeleteZombieJobs() (num int)

delete jobs in db with "queued" or "in-progress" state but not in the queue (zombie jobs)

func (*ServerMgr) DeleteZombieJobsByUser added in v0.9.3

func (qm *ServerMgr) DeleteZombieJobsByUser(u *user.User) (num int)

delete jobs in db with "queued" or "in-progress" state but not in the queue (zombie jobs) that user has access to

func (*ServerMgr) EnqueueTasksByJobId

func (qm *ServerMgr) EnqueueTasksByJobId(jobid string, tasks []*Task) (err error)

func (*ServerMgr) FetchDataToken

func (qm *ServerMgr) FetchDataToken(workid string, clientid string) (token string, err error)

--workunit methods (servermgr implementation)

func (*ServerMgr) FetchPrivateEnv added in v0.9.3

func (qm *ServerMgr) FetchPrivateEnv(workid string, clientid string) (env map[string]string, err error)

func (*ServerMgr) FetchPrivateEnvs added in v0.9.3

func (qm *ServerMgr) FetchPrivateEnvs(workid string, clientid string) (envs map[string]string, err error)

func (*ServerMgr) FinalizeJobPerf

func (qm *ServerMgr) FinalizeJobPerf(jobid string)

func (*ServerMgr) FinalizeTaskPerf

func (qm *ServerMgr) FinalizeTaskPerf(taskid string)

func (*ServerMgr) FinalizeWorkPerf

func (qm *ServerMgr) FinalizeWorkPerf(workid string, reportfile string) (err error)

func (*ServerMgr) GetActiveJobs

func (qm *ServerMgr) GetActiveJobs() map[string]*JobPerf

func (*ServerMgr) GetReportMsg

func (qm *ServerMgr) GetReportMsg(workid string, logname string) (report string, err error)

func (*ServerMgr) GetSuspendJobs

func (qm *ServerMgr) GetSuspendJobs() map[string]bool

func (*ServerMgr) Handle

func (qm *ServerMgr) Handle()

func (*ServerMgr) InitMaxJid

func (qm *ServerMgr) InitMaxJid() (err error)

func (*ServerMgr) IsJobRegistered

func (qm *ServerMgr) IsJobRegistered(id string) bool

func (*ServerMgr) JobRegister

func (qm *ServerMgr) JobRegister() (jid string, err error)

---job methods---

func (*ServerMgr) LogJobPerf

func (qm *ServerMgr) LogJobPerf(jobid string)

func (*ServerMgr) RecomputeJob

func (qm *ServerMgr) RecomputeJob(jobid string, stage string) (err error)

recompute jobs from specified task stage

func (*ServerMgr) RecoverJobs

func (qm *ServerMgr) RecoverJobs() (err error)

recover jobs not completed before awe-server restarts

func (*ServerMgr) ResubmitJob

func (qm *ServerMgr) ResubmitJob(id string) (err error)

re-submit a job in db but not in the queue (caused by server restarting)

func (*ServerMgr) ResumeSuspendedJob

func (qm *ServerMgr) ResumeSuspendedJob(id string) (err error)

resubmit a suspended job

func (*ServerMgr) ResumeSuspendedJobByUser added in v0.9.3

func (qm *ServerMgr) ResumeSuspendedJobByUser(id string, u *user.User) (err error)

resubmit a suspended job if the user is authorized

func (*ServerMgr) ResumeSuspendedJobs

func (qm *ServerMgr) ResumeSuspendedJobs() (num int)

func (*ServerMgr) ResumeSuspendedJobsByUser added in v0.9.3

func (qm *ServerMgr) ResumeSuspendedJobsByUser(u *user.User) (num int)

func (*ServerMgr) SaveStdLog

func (qm *ServerMgr) SaveStdLog(workid string, logname string, tmppath string) (err error)

func (*ServerMgr) ShowStatus

func (qm *ServerMgr) ShowStatus() string

func (*ServerMgr) ShowTasks

func (qm *ServerMgr) ShowTasks()

func (*ServerMgr) SuspendJob

func (qm *ServerMgr) SuspendJob(jobid string, reason string, id string) (err error)

func (*ServerMgr) Timer

func (qm *ServerMgr) Timer()

func (*ServerMgr) UpdateGroup

func (qm *ServerMgr) UpdateGroup(jobid string, newgroup string) (err error)

update job group

func (*ServerMgr) UpdateJobPerfStartTime

func (qm *ServerMgr) UpdateJobPerfStartTime(jobid string)

func (*ServerMgr) UpdateJobTaskToInProgress

func (qm *ServerMgr) UpdateJobTaskToInProgress(works []*Workunit)

update job/task states from "queued" to "in-progress" once the first workunit is checked out

func (*ServerMgr) UpdatePriority added in v0.9.3

func (qm *ServerMgr) UpdatePriority(jobid string, priority int) (err error)

func (*ServerMgr) UpdateTaskPerfStartTime

func (qm *ServerMgr) UpdateTaskPerfStartTime(taskid string)

type Task

// Task describes one task of a job (see NewTask, which takes the owning *Job
// and a rank).
//
// Info, Partition, WorkStatus and Skip carry BSON keys but are tagged
// `json:"-"`, so they are left out of JSON encoding. AppVariables has no
// struct tags at all.
//
// NOTE(review): State presumably holds one of the TASK_STAT_* constants —
// confirm against callers of UpdateState.
type Task struct {
	Id            string            `bson:"taskid" json:"taskid"`
	Info          *Info             `bson:"info" json:"-"`
	Inputs        IOmap             `bson:"inputs" json:"inputs"`
	Outputs       IOmap             `bson:"outputs" json:"outputs"`
	Predata       IOmap             `bson:"predata" json:"predata"`
	Cmd           *Command          `bson:"cmd" json:"cmd"`
	App           *App              `bson:"app" json:"app"`
	AppVariables  AppVariables      // not in App as workunit does not need AppVariables and I want to pass App
	Partition     *PartInfo         `bson:"partinfo" json:"-"`
	DependsOn     []string          `bson:"dependsOn" json:"dependsOn"`
	TotalWork     int               `bson:"totalwork" json:"totalwork"`
	MaxWorkSize   int               `bson:"maxworksize"   json:"maxworksize"`
	RemainWork    int               `bson:"remainwork" json:"remainwork"`
	WorkStatus    []string          `bson:"workstatus" json:"-"`
	State         string            `bson:"state" json:"state"`
	Skip          int               `bson:"skip" json:"-"`
	CreatedDate   time.Time         `bson:"createdDate" json:"createddate"`
	StartedDate   time.Time         `bson:"startedDate" json:"starteddate"`
	CompletedDate time.Time         `bson:"completedDate" json:"completeddate"`
	ComputeTime   int               `bson:"computetime" json:"computetime"`
	UserAttr      map[string]string `bson:"userattr" json:"userattr"`
	ClientGroups  string            `bson:"clientgroups" json:"clientgroups"`
}

func NewTask

func NewTask(job *Job, rank int) *Task

func (*Task) CreateIndex added in v0.9.4

func (task *Task) CreateIndex() (err error)

func (*Task) DeleteInput added in v0.9.12

func (task *Task) DeleteInput()

func (*Task) DeleteOutput added in v0.9.3

func (task *Task) DeleteOutput()

func (*Task) InitPartIndex

func (task *Task) InitPartIndex() (err error)

Get the part size based on partition/index info. If the index info cannot be obtained, task.TotalWork falls back to 1 and nil is returned.

func (*Task) InitTask

func (task *Task) InitTask(job *Job) (err error)

Fill in information for a task that is missing from the input JSON.

func (*Task) ParseWorkunit

func (task *Task) ParseWorkunit() (wus []*Workunit, err error)

func (*Task) Skippable

func (task *Task) Skippable() bool

func (*Task) UpdateState

func (task *Task) UpdateState(newState string) string

type TaskPerf

// TaskPerf holds per-task timing values and input/output file sizes. Every
// field uses the same key for BSON and JSON. Create one with NewTaskPerf.
//
// NOTE(review): the units of the int64 time fields are not shown here —
// confirm before doing arithmetic on them.
type TaskPerf struct {
	Queued       int64   `bson:"queued" json:"queued"`
	Start        int64   `bson:"start" json:"start"`
	End          int64   `bson:"end" json:"end"`
	Resp         int64   `bson:"resp" json:"resp"` // End - Queued
	InFileSizes  []int64 `bson:"size_infile" json:"size_infile"`
	OutFileSizes []int64 `bson:"size_outfile" json:"size_outfile"`
}

func NewTaskPerf

func NewTaskPerf(id string) *TaskPerf

type Task_p added in v0.9.3

// Task_p wraps a single *Command_p, encoded under the JSON key "cmd". It has
// no BSON tag.
//
// NOTE(review): the _p suffix presumably stands for "private" (compare
// FetchPrivateEnv) — confirm.
type Task_p struct {
	Cmd *Command_p `json:"cmd"`
}

type VariableExpander added in v0.9.3

// VariableExpander has only unexported fields. Build one from an AppVariables
// value with NewVariableExpander, then call Expand on a line of text.
type VariableExpander struct {
	// contains filtered or unexported fields
}

func NewVariableExpander added in v0.9.3

func NewVariableExpander(app_variables AppVariables) VariableExpander

func (VariableExpander) Expand added in v0.9.3

func (va VariableExpander) Expand(line string) (expanded string, err error)

type WQueue

// WQueue has only unexported fields; create one with NewWQueue. Its methods
// add a workunit and look up, delete, or change the status of a workunit
// addressed by its string id.
type WQueue struct {
	// contains filtered or unexported fields
}

func NewWQueue

func NewWQueue() *WQueue

func (*WQueue) Add

func (wq *WQueue) Add(workunit *Workunit) (err error)

func (*WQueue) Delete

func (wq *WQueue) Delete(id string)

func (*WQueue) Get

func (wq *WQueue) Get(id string) (work *Workunit, ok bool)

func (*WQueue) Has

func (wq *WQueue) Has(id string) (has bool)

func (WQueue) Len

func (wq WQueue) Len() int

func (*WQueue) StatusChange

func (wq *WQueue) StatusChange(id string, new_status string) (err error)

type WorkList

// WorkList is a slice of workunit pointers. This listing shows Len and Swap
// methods on it but no Less.
type WorkList []*Workunit

queuing/prioritizing related functions

func (WorkList) Len

func (wl WorkList) Len() int

func (WorkList) Swap

func (wl WorkList) Swap(i, j int)

type WorkMgr

// WorkMgr is the workunit-facing manager interface. Its method parameters are
// unnamed here; the ProxyMgr and ServerMgr methods named FetchDataToken and
// FetchPrivateEnv name their two string parameters (workid, clientid).
//
// NOTE(review): the meaning of the string and int arguments of the other
// methods is not visible here — check the implementations.
type WorkMgr interface {
	GetWorkById(string) (*Workunit, error)
	ShowWorkunits(string) []*Workunit
	ShowWorkunitsByUser(string, *user.User) []*Workunit
	CheckoutWorkunits(string, string, int) ([]*Workunit, error)
	NotifyWorkStatus(Notice)
	EnqueueWorkunit(*Workunit) error
	FetchDataToken(string, string) (string, error)
	FetchPrivateEnv(string, string) (map[string]string, error)
}

type WorkPerf

// WorkPerf holds per-workunit performance measurements: server-side queue and
// completion values, client-side checkout and delivery values, data-transfer
// and compute times, memory usage, and bytes moved over the network. Every
// field uses the same key for BSON and JSON. Create one with NewWorkPerf.
type WorkPerf struct {
	Queued             int64   `bson:"queued" json:"queued"`                   // WQ (queued at server or client, depending on who creates it)
	Done               int64   `bson:"done" json:"done"`                       // WD (done at server)
	Resp               int64   `bson:"resp" json:"resp"`                       // Done - Queued (server metric)
	Checkout           int64   `bson:"checkout" json:"checkout"`               // checkout at client
	Deliver            int64   `bson:"deliver" json:"deliver"`                 // done at client
	ClientResp         int64   `bson:"clientresp" json:"clientresp"`           // Deliver - Checkout (client metric)
	PreDataIn          float64 `bson:"time_predata_in" json:"time_predata_in"` // time in seconds for downloading prerequisite data at client
	DataIn             float64 `bson:"time_data_in" json:"time_data_in"`       // time in seconds for input data move-in at client
	DataOut            float64 `bson:"time_data_out" json:"time_data_out"`     // time in seconds for output data move-out at client
	Runtime            int64   `bson:"runtime" json:"runtime"`                 // time in seconds for computation at client
	DockerPrep         int64   `bson:"dockerprep" json:"dockerprep"`           // time in seconds for docker preparation on client
	MaxMemUsage        int64   `bson:"max_mem_usage" json:"max_mem_usage"`     // maximum memory consumption
	MaxMemoryTotalRss  int64   `bson:"max_memory_total_rss" json:"max_memory_total_rss"`
	MaxMemoryTotalSwap int64   `bson:"max_memory_total_swap" json:"max_memory_total_swap"`
	ClientId           string  `bson:"client_id" json:"client_id"`
	PreDataSize        int64   `bson:"size_predata" json:"size_predata"` // predata moved over network
	InFileSize         int64   `bson:"size_infile" json:"size_infile"`   // input file moved over network
	OutFileSize        int64   `bson:"size_outfile" json:"size_outfile"` // output file moved over network
}

func NewWorkPerf

func NewWorkPerf(id string) *WorkPerf

type Workflow

// Workflow is the document form of a workflow: its info, job info, raw
// inputs, variables, data server and task list. Every field uses the same key
// for BSON and JSON. WorkflowMgr stores and returns *Workflow values
// (AddWorkflow, GetWorkflow, GetAllWorkflows).
type Workflow struct {
	WfInfo     awf_info          `bson:"workflow_info" json:"workflow_info"`
	JobInfo    awf_jobinfo       `bson:"job_info" json:"job_info"`
	RawInputs  map[string]string `bson:"raw_inputs" json:"raw_inputs"`
	Variables  map[string]string `bson:"variables" json:"variables"`
	DataServer string            `bson:"data_server" json:"data_server"`
	Tasks      []*awf_task       `bson:"tasks" json:"tasks"`
}

type WorkflowMgr

// WorkflowMgr has only unexported fields; create one with NewWorkflowMgr. Its
// methods add a workflow under a name, fetch one by name, list all of them,
// and load workflows.
type WorkflowMgr struct {
	// contains filtered or unexported fields
}
var (
	// AwfMgr is the package-level *WorkflowMgr.
	// NOTE(review): where it is assigned is not visible here — confirm it is
	// initialised before use.
	AwfMgr *WorkflowMgr
)

func NewWorkflowMgr

func NewWorkflowMgr() *WorkflowMgr

func (*WorkflowMgr) AddWorkflow

func (wfm *WorkflowMgr) AddWorkflow(name string, awf *Workflow)

func (*WorkflowMgr) GetAllWorkflows

func (wfm *WorkflowMgr) GetAllWorkflows() (workflows []*Workflow)

func (*WorkflowMgr) GetWorkflow

func (wfm *WorkflowMgr) GetWorkflow(name string) (awf *Workflow, err error)

func (*WorkflowMgr) LoadWorkflows

func (wfm *WorkflowMgr) LoadWorkflows() (err error)

type Workunit

// Workunit is one unit of work built from a Task and a rank (see NewWorkunit).
// Every field is included in both BSON and JSON encoding; Info and Partition
// appear in JSON under the keys "info" and "part".
//
// NOTE(review): State presumably holds one of the WORK_STAT_* constants —
// confirm against callers.
type Workunit struct {
	Id           string            `bson:"wuid" json:"wuid"`
	Info         *Info             `bson:"info" json:"info"`
	Inputs       IOmap             `bson:"inputs" json:"inputs"`
	Outputs      IOmap             `bson:"outputs" json:"outputs"`
	Predata      IOmap             `bson:"predata" json:"predata"`
	Cmd          *Command          `bson:"cmd" json:"cmd"`
	App          *App              `bson:"app" json:"app"`
	Rank         int               `bson:"rank" json:"rank"`
	TotalWork    int               `bson:"totalwork" json:"totalwork"`
	Partition    *PartInfo         `bson:"part" json:"part"`
	State        string            `bson:"state" json:"state"`
	Failed       int               `bson:"failed" json:"failed"`
	CheckoutTime time.Time         `bson:"checkout_time" json:"checkout_time"`
	Client       string            `bson:"client" json:"client"`
	ComputeTime  int               `bson:"computetime" json:"computetime"`
	Notes        string            `bson:"notes" json:"notes"`
	UserAttr     map[string]string `bson:"userattr" json:"userattr"`
}

func NewWorkunit

func NewWorkunit(task *Task, rank int) *Workunit

func (*Workunit) CDworkpath

func (work *Workunit) CDworkpath() (err error)

func (*Workunit) IndexType

func (work *Workunit) IndexType() (indextype string)

func (*Workunit) Mkdir

func (work *Workunit) Mkdir() (err error)

func (*Workunit) Part

func (work *Workunit) Part() (part string)

Calculate the range of the data part. Algorithm: try to distribute the indexed parts evenly among the workunits, e.g. with totalWork=4 and totalParts=10 the workunits get 3, 3, 2 and 2 parts respectively.

func (*Workunit) Path

func (work *Workunit) Path() string

func (*Workunit) RemoveDir

func (work *Workunit) RemoveDir() (err error)

type WorkunitsSortby added in v0.9.8

// WorkunitsSortby bundles a slice of workunits with an Order and a Direction
// string. It declares Len, Less and Swap, which is the method set of
// sort.Interface.
//
// NOTE(review): the accepted values of Order and Direction are not visible
// here — check the Less implementation.
type WorkunitsSortby struct {
	Order     string
	Direction string
	Workunits []*Workunit
}

func (WorkunitsSortby) Len added in v0.9.8

func (w WorkunitsSortby) Len() int

func (WorkunitsSortby) Less added in v0.9.8

func (w WorkunitsSortby) Less(i, j int) bool

func (WorkunitsSortby) Swap added in v0.9.8

func (w WorkunitsSortby) Swap(i, j int)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL