Documentation
¶
Index ¶
- Constants
- Variables
- func GetJobIdByTaskId(taskid string) (jobid string, err error)
- func GetJobIdByWorkId(workid string) (jobid string, err error)
- func GetTaskIdByWorkId(workid string) (taskid string, err error)
- func InitAwfMgr()
- func InitClientProfile(profile *Client)
- func InitJobDB()
- func InitProxyWorkChan()
- func InitResMgr(service string)
- func IsFirstTask(taskid string) bool
- func NotifyWorkunitProcessed(work *Workunit, perf *WorkPerf) (err error)
- func NotifyWorkunitProcessedWithLogs(work *Workunit, perf *WorkPerf, sendstdlogs bool) (err error)
- func PostNode(io *IO, numParts int) (nodeid string, err error)
- func PostNodeWithToken(io *IO, numParts int, token string) (nodeid string, err error)
- func PushOutputData(work *Workunit) (size int64, err error)
- func PutFileToShock(filename string, host string, nodeid string, rank int, token string, ...) (err error)
- func ReloadFromDisk(path string) (err error)
- func ShockPutIndex(host string, nodeid string, indexname string, token string) (err error)
- func UpdateJobState(jobid string, newstate string, oldstates []string) (err error)
- type CQMgr
- func (qm *CQMgr) CheckoutWorkunits(req_policy string, client_id string, num int) (workunits []*Workunit, err error)
- func (qm *CQMgr) ClientChecker()
- func (qm *CQMgr) ClientHeartBeat(id string) (hbmsg HBmsg, err error)
- func (qm *CQMgr) DeleteClient(id string) (err error)
- func (qm *CQMgr) EnqueueWorkunit(work *Workunit) (err error)
- func (qm *CQMgr) FetchDataToken(workid string, clientid string) (token string, err error)
- func (qm *CQMgr) GetAllClients() []*Client
- func (qm *CQMgr) GetClient(id string) (client *Client, err error)
- func (qm *CQMgr) GetWorkById(id string) (workunit *Workunit, err error)
- func (qm *CQMgr) Handle()
- func (qm *CQMgr) NotifyWorkStatus(notice Notice)
- func (qm *CQMgr) ReQueueWorkunitByClient(clientid string) (err error)
- func (qm *CQMgr) RegisterNewClient(files FormFiles) (client *Client, err error)
- func (qm *CQMgr) ResumeClient(id string) (err error)
- func (qm *CQMgr) ResumeSuspendedClients() (count int)
- func (qm *CQMgr) ShowWorkQueue()
- func (qm *CQMgr) ShowWorkunits(status string) (workunits []*Workunit)
- func (qm *CQMgr) SuspendAllClients() (count int)
- func (qm *CQMgr) SuspendClient(id string) (err error)
- func (qm *CQMgr) Timer()
- func (qm *CQMgr) UpdateSubClients(id string, count int)
- type Client
- type ClientMgr
- type ClientWorkMgr
- type CoAck
- type CoReq
- type Command
- type FormFile
- type FormFiles
- type HBmsg
- type IO
- func (io *IO) DataUrl() string
- func (io *IO) GetFileSize() int64
- func (io *IO) GetIndexInfo() (idxinfo map[string]IdxInfo, err error)
- func (io *IO) GetIndexUnits(indextype string) (totalunits int, err error)
- func (io *IO) GetShockNode() (node *ShockNode, err error)
- func (io *IO) TotalUnits(indextype string) (count int, err error)
- type IOmap
- type IdxInfo
- type Info
- type Job
- func AwfToJob(awf *Workflow, jid string) (job *Job, err error)
- func CreateJobUpload(params map[string]string, files FormFiles, jid string) (job *Job, err error)
- func LoadJob(id string) (job *Job, err error)
- func ParseAwf(filename string, jid string) (job *Job, err error)
- func ParseJobTasks(filename string, jid string) (job *Job, err error)
- func (job *Job) FilePath() string
- func (job *Job) GetDataToken() (token string)
- func (job *Job) Mkdir() (err error)
- func (job *Job) NumTask() int
- func (job *Job) Path() string
- func (job *Job) Save() (err error)
- func (job *Job) SetDataToken(token string)
- func (job *Job) SetFile(file FormFile) (err error)
- func (job *Job) TaskList() []*Task
- func (job *Job) UpdateFile(params map[string]string, files FormFiles) (err error)
- func (job *Job) UpdateState(newState string, notes string) (err error)
- func (job *Job) UpdateTask(task *Task) (remainTasks int, err error)
- type JobMgr
- type JobPerf
- type Jobs
- func (n *Jobs) GetAll(q bson.M) (err error)
- func (n *Jobs) GetAllLimitOffset(q bson.M, limit int, offset int) (err error)
- func (n *Jobs) GetAllRecent(q bson.M, recent int) (count int, err error)
- func (n *Jobs) GetJobAt(index int) Job
- func (n *Jobs) GetPaginated(q bson.M, limit int, offset int) (count int, err error)
- func (n *Jobs) Length() int
- type Notice
- type Opts
- type PartInfo
- type Pipeline
- type ProxyMgr
- func (qm *ProxyMgr) ClientChecker()
- func (qm *ProxyMgr) DeleteJob(jobid string) (err error)
- func (qm *ProxyMgr) DeleteSuspendedJobs() (num int)
- func (qm *ProxyMgr) DeleteZombieJobs() (num int)
- func (qm *ProxyMgr) EnqueueTasksByJobId(jobid string, tasks []*Task) (err error)
- func (qm *ProxyMgr) FetchDataToken(workid string, clientid string) (token string, err error)
- func (qm *ProxyMgr) FinalizeWorkPerf(string, string) (err error)
- func (qm *ProxyMgr) GetActiveJobs() map[string]*JobPerf
- func (qm *ProxyMgr) GetReportMsg(string, string) (report string, err error)
- func (qm *ProxyMgr) GetSuspendJobs() map[string]bool
- func (qm *ProxyMgr) Handle()
- func (qm *ProxyMgr) InitMaxJid() (err error)
- func (qm *ProxyMgr) IsJobRegistered(id string) bool
- func (qm *ProxyMgr) JobRegister() (jid string, err error)
- func (qm *ProxyMgr) RecomputeJob(jobid string, stage string) (err error)
- func (qm *ProxyMgr) RecoverJobs() (err error)
- func (qm *ProxyMgr) RegisterNewClient(files FormFiles) (client *Client, err error)
- func (qm *ProxyMgr) ResubmitJob(id string) (err error)
- func (qm *ProxyMgr) ResumeSuspendedJob(id string) (err error)
- func (qm *ProxyMgr) ResumeSuspendedJobs() (num int)
- func (qm *ProxyMgr) SaveStdLog(string, string, string) (err error)
- func (qm *ProxyMgr) ShowStatus() string
- func (qm *ProxyMgr) SuspendJob(jobid string, reason string) (err error)
- func (qm *ProxyMgr) Timer()
- func (qm *ProxyMgr) UpdateGroup(jobid string, newgroup string) (err error)
- type ResourceMgr
- type ServerMgr
- func (qm *ServerMgr) CreateJobPerf(jobid string)
- func (qm *ServerMgr) CreateTaskPerf(taskid string)
- func (qm *ServerMgr) CreateWorkPerf(workid string)
- func (qm *ServerMgr) DeleteJob(jobid string) (err error)
- func (qm *ServerMgr) DeleteJobPerf(jobid string)
- func (qm *ServerMgr) DeleteSuspendedJobs() (num int)
- func (qm *ServerMgr) DeleteZombieJobs() (num int)
- func (qm *ServerMgr) EnqueueTasksByJobId(jobid string, tasks []*Task) (err error)
- func (qm *ServerMgr) FetchDataToken(workid string, clientid string) (token string, err error)
- func (qm *ServerMgr) FinalizeJobPerf(jobid string)
- func (qm *ServerMgr) FinalizeTaskPerf(taskid string)
- func (qm *ServerMgr) FinalizeWorkPerf(workid string, reportfile string) (err error)
- func (qm *ServerMgr) GetActiveJobs() map[string]*JobPerf
- func (qm *ServerMgr) GetReportMsg(workid string, logname string) (report string, err error)
- func (qm *ServerMgr) GetSuspendJobs() map[string]bool
- func (qm *ServerMgr) Handle()
- func (qm *ServerMgr) InitMaxJid() (err error)
- func (qm *ServerMgr) IsJobRegistered(id string) bool
- func (qm *ServerMgr) JobRegister() (jid string, err error)
- func (qm *ServerMgr) LogJobPerf(jobid string)
- func (qm *ServerMgr) RecomputeJob(jobid string, stage string) (err error)
- func (qm *ServerMgr) RecoverJobs() (err error)
- func (qm *ServerMgr) ResubmitJob(id string) (err error)
- func (qm *ServerMgr) ResumeSuspendedJob(id string) (err error)
- func (qm *ServerMgr) ResumeSuspendedJobs() (num int)
- func (qm *ServerMgr) SaveStdLog(workid string, logname string, tmppath string) (err error)
- func (qm *ServerMgr) ShowStatus() string
- func (qm *ServerMgr) ShowTasks()
- func (qm *ServerMgr) SuspendJob(jobid string, reason string) (err error)
- func (qm *ServerMgr) Timer()
- func (qm *ServerMgr) UpdateGroup(jobid string, newgroup string) (err error)
- func (qm *ServerMgr) UpdateJobPerfStartTime(jobid string)
- func (qm *ServerMgr) UpdateJobTaskToInProgress(works []*Workunit)
- func (qm *ServerMgr) UpdateTaskPerfStartTime(taskid string)
- type ShockClient
- type ShockNode
- type ShockQueryResponse
- type ShockResponse
- type Task
- type TaskPerf
- type WQueue
- type WorkList
- type WorkMgr
- type WorkPerf
- type Workflow
- type WorkflowMgr
- type Workunit
Constants ¶
const ( CLIENT_STAT_ACTIVE_BUSY = "active-busy" CLIENT_STAT_ACTIVE_IDLE = "active-idle" CLIENT_STAT_SUSPEND = "suspend" CLIENT_STAT_DELETED = "deleted" )
const ( JOB_STAT_INIT = "init" JOB_STAT_QUEUED = "queued" JOB_STAT_INPROGRESS = "in-progress" JOB_STAT_COMPLETED = "completed" JOB_STAT_SUSPEND = "suspend" JOB_STAT_DELETED = "deleted" )
const ( TASK_STAT_INIT = "init" TASK_STAT_QUEUED = "queued" TASK_STAT_INPROGRESS = "in-progress" TASK_STAT_PENDING = "pending" TASK_STAT_SUSPEND = "suspend" TASK_STAT_COMPLETED = "completed" TASK_STAT_SKIPPED = "user_skipped" TASK_STAT_FAIL_SKIP = "skipped" TASK_STAT_PASSED = "passed" )
const ( WORK_STAT_QUEUED = "queued" WORK_STAT_CHECKOUT = "checkout" WORK_STAT_SUSPEND = "suspend" WORK_STAT_DONE = "done" WORK_STAT_FAIL = "fail" WORK_STAT_PREPARED = "prepared" WORK_STAT_COMPUTED = "computed" WORK_STAT_PROXYQUEUED = "proxyqueued" )
Variables ¶
var ( QMgr ResourceMgr Service string = "unknown" Self *Client ProxyWorkChan chan bool )
var JOB_STATS_ACTIVE = []string{JOB_STAT_QUEUED, JOB_STAT_INPROGRESS}
Functions ¶
func GetJobIdByWorkId ¶
func GetTaskIdByWorkId ¶
func InitAwfMgr ¶
func InitAwfMgr()
func InitClientProfile ¶
func InitClientProfile(profile *Client)
func InitProxyWorkChan ¶
func InitProxyWorkChan()
func InitResMgr ¶
func InitResMgr(service string)
func IsFirstTask ¶
func NotifyWorkunitProcessed ¶
Functions for REST API communication (deprecated). Notifies the AWE server that a workunit has finished with status either "failed" or "done", and with perf statistics if "done".
func PostNodeWithToken ¶
func PushOutputData ¶
func PutFileToShock ¶
func ReloadFromDisk ¶
func ShockPutIndex ¶
Types ¶
type CQMgr ¶
type CQMgr struct {
// contains filtered or unexported fields
}
func (*CQMgr) CheckoutWorkunits ¶
func (*CQMgr) ClientChecker ¶
func (qm *CQMgr) ClientChecker()
func (*CQMgr) ClientHeartBeat ¶
func (*CQMgr) DeleteClient ¶
func (*CQMgr) EnqueueWorkunit ¶
func (*CQMgr) FetchDataToken ¶
func (*CQMgr) GetAllClients ¶
func (*CQMgr) NotifyWorkStatus ¶
func (*CQMgr) ReQueueWorkunitByClient ¶
func (*CQMgr) RegisterNewClient ¶
func (*CQMgr) ResumeClient ¶
func (*CQMgr) ResumeSuspendedClients ¶
func (*CQMgr) ShowWorkunits ¶
func (*CQMgr) SuspendAllClients ¶
func (*CQMgr) SuspendClient ¶
func (*CQMgr) UpdateSubClients ¶
type Client ¶
type Client struct { Id string `bson:"id" json:"id"` Name string `bson:"name" json:"name"` Group string `bson:"group" json:"group"` User string `bson:"user" json:"user"` Domain string `bson:"domain" json:"domain"` InstanceId string `bson:"instance_id" json:"instance_id"` InstanceType string `bson:"instance_type" json:"instance_type"` Host string `bson:"host" json:"host"` CPUs int `bson:"cores" json:"cores"` Apps []string `bson:"apps" json:"apps"` RegTime time.Time `bson:"regtime" json:"regtime"` Serve_time string `bson:"serve_time" json:"serve_time"` Idle_time int `bson:"idle_time" json:"idle_time"` Status string `bson:"Status" json:"Status"` Total_checkout int `bson:"total_checkout" json:"total_checkout"` Total_completed int `bson:"total_completed" json:"total_completed"` Total_failed int `bson:"total_failed" json:"total_failed"` Current_work map[string]bool `bson:"current_work" json:"current_work"` Skip_work []string `bson:"skip_work" json:"skip_work"` Last_failed int `bson:"-" json:"-"` Tag bool `bson:"-" json:"-"` Proxy bool `bson:"proxy" json:"proxy"` SubClients int `bson:"subclients" json:"subclients"` }
func NewProfileClient ¶
type ClientMgr ¶
type ClientMgr interface { RegisterNewClient(FormFiles) (*Client, error) ClientHeartBeat(string) (HBmsg, error) GetClient(string) (*Client, error) GetAllClients() []*Client DeleteClient(string) (err error) SuspendClient(string) (err error) ResumeClient(string) (err error) ResumeSuspendedClients() (count int) SuspendAllClients() (count int) ClientChecker() UpdateSubClients(id string, count int) }
type ClientWorkMgr ¶
type Command ¶
type Command struct { Name string `bson:"name" json:"name"` Args string `bson:"args" json:"args"` Dockerimage string `bson:"Dockerimage" json:"Dockerimage"` Environ map[string]string `bson:"environ" json:"environ"` Description string `bson:"description" json:"description"` ParsedArgs []string `bson:"-" json:"-"` }
func NewCommand ¶
type HBmsg ¶
Heartbeat response from awe-server to awe-client, used to issue operation requests to the client, e.g. discard suspended workunits.
type IO ¶
type IO struct { Name string `bson:"name" json:"-"` Directory string `bson:"directory" json:"directory"` Host string `bson:"host" json:"host"` Node string `bson:"node" json:"node"` Url string `bson:"url" json:"url"` Size int64 `bson:"size" json:"size"` MD5 string `bson:"md5" json:"-"` Cache bool `bson:"cache" json:"-"` Origin string `bson:"origin" json:"origin"` Path string `bson:"path" json:"-"` Optional bool `bson:"optional" json:"-"` Nonzero bool `bson:"nonzero" json:"nonzero"` DataToken string `bson:"datatoken" json:"-"` Intermediate bool `bson:"Intermediate" json:"-"` ShockFilename string `bson:"shockfilename" json:"shockfilename"` AttrFile string `bson:"attrfile" json:"attrfile"` }
func (*IO) GetFileSize ¶
func (*IO) GetIndexUnits ¶
func (*IO) GetShockNode ¶
type Info ¶
type Info struct { Name string `bson:"name" json:"name"` Xref string `bson:"xref" json:"xref"` Project string `bson:"project" json:"project"` User string `bson:"user" json:"user"` Pipeline string `bson:"pipeline" json:"pipeline"` ClientGroups string `bson:"clientgroups" json:"clientgroups"` SubmitTime time.Time `bson:"submittime" json:"submittime"` StartedTme time.Time `bson:"startedtime" json:"startedtime"` CompletedTime time.Time `bson:"completedtime" json:"completedtime"` Priority int `bson:"priority" json:"-"` Auth bool `bson:"auth" json:"auth"` DataToken string `bson:"datatoken" json:"-"` NoRetry bool `bson:"noretry" json:"noretry"` }
job info
type Job ¶
type Job struct { Id string `bson:"id" json:"id"` Jid string `bson:"jid" json:"jid"` Info *Info `bson:"info" json:"info"` Tasks []*Task `bson:"tasks" json:"tasks"` Script script `bson:"script" json:"-"` State string `bson:"state" json:"state"` Registered bool `bson:"-" json:"registered"` //job in memory (not only in mongodb) (assigned on the fly, db value meaningless) RemainTasks int `bson:"remaintasks" json:"remaintasks"` UpdateTime time.Time `bson:"updatetime" json:"updatetime"` Notes string `bson:"notes" json:"notes"` Resumed int `bson:"resumed" json:"resumed"` //number of times the job has been resumed from suspension }
func CreateJobUpload ¶
func ParseJobTasks ¶
parse job by job script
func (*Job) GetDataToken ¶
func (*Job) UpdateFile ¶
---Script upload
func (*Job) UpdateState ¶
---Field update functions
type JobMgr ¶
type JobMgr interface { JobRegister() (string, error) EnqueueTasksByJobId(string, []*Task) error GetActiveJobs() map[string]*JobPerf IsJobRegistered(string) bool GetSuspendJobs() map[string]bool SuspendJob(string, string) error ResumeSuspendedJob(string) error ResumeSuspendedJobs() int ResubmitJob(string) error DeleteJob(string) error DeleteSuspendedJobs() int DeleteZombieJobs() int InitMaxJid() error RecoverJobs() error FinalizeWorkPerf(string, string) error SaveStdLog(string, string, string) error GetReportMsg(string, string) (string, error) RecomputeJob(string, string) error UpdateGroup(string, string) error }
type JobPerf ¶
type JobPerf struct { Id string `bson:"id" json:"id"` Queued int64 `bson:"queued" json:"queued"` Start int64 `bson:"start" json:"start"` End int64 `bson:"end" json:"end"` Resp int64 `bson:"resp" json:"resp"` //End - Queued Ptasks map[string]*TaskPerf `bson:"task_stats" json:"task_stats"` Pworks map[string]*WorkPerf `bson:"work_stats" json:"work_stats"` }
func LoadJobPerf ¶
func NewJobPerf ¶
type Jobs ¶
type Jobs []Job
Job array type
func (*Jobs) GetAllLimitOffset ¶
func (*Jobs) GetAllRecent ¶
func (*Jobs) GetPaginated ¶
type Pipeline ¶
type Pipeline struct { Info *Info `bson:"info" json:"info"` Tasks []Task `bson:"tasks" json:"tasks"` }
func NewPipeline ¶
func NewPipeline() *Pipeline
type ProxyMgr ¶
type ProxyMgr struct {
CQMgr
}
func NewProxyMgr ¶
func NewProxyMgr() *ProxyMgr
func (*ProxyMgr) ClientChecker ¶
func (qm *ProxyMgr) ClientChecker()
func (*ProxyMgr) DeleteSuspendedJobs ¶
func (*ProxyMgr) DeleteZombieJobs ¶
func (*ProxyMgr) EnqueueTasksByJobId ¶
func (*ProxyMgr) FetchDataToken ¶
func (*ProxyMgr) FinalizeWorkPerf ¶
func (*ProxyMgr) GetActiveJobs ¶
func (*ProxyMgr) GetReportMsg ¶
func (*ProxyMgr) GetSuspendJobs ¶
func (*ProxyMgr) InitMaxJid ¶
func (*ProxyMgr) IsJobRegistered ¶
func (*ProxyMgr) JobRegister ¶
func (*ProxyMgr) RecomputeJob ¶
recompute jobs from specified task stage
func (*ProxyMgr) RecoverJobs ¶
recover jobs not completed before awe-server restarts
func (*ProxyMgr) RegisterNewClient ¶
func (*ProxyMgr) ResubmitJob ¶
re-submit a job in db but not in the queue (caused by server restarting)
func (*ProxyMgr) ResumeSuspendedJob ¶
resubmit a suspended job
func (*ProxyMgr) ResumeSuspendedJobs ¶
func (*ProxyMgr) ShowStatus ¶
func (*ProxyMgr) SuspendJob ¶
type ResourceMgr ¶
type ResourceMgr interface { ClientWorkMgr JobMgr Handle() ShowStatus() string Timer() }
type ServerMgr ¶
type ServerMgr struct { CQMgr // contains filtered or unexported fields }
func NewServerMgr ¶
func NewServerMgr() *ServerMgr
func (*ServerMgr) CreateJobPerf ¶
---perf related methods
func (*ServerMgr) CreateTaskPerf ¶
func (*ServerMgr) CreateWorkPerf ¶
func (*ServerMgr) DeleteJobPerf ¶
func (*ServerMgr) DeleteSuspendedJobs ¶
func (*ServerMgr) DeleteZombieJobs ¶
delete jobs in db with "queued" or "in-progress" state but not in the queue (zombie jobs)
func (*ServerMgr) EnqueueTasksByJobId ¶
func (*ServerMgr) FetchDataToken ¶
--workunit methods (servermgr implementation)
func (*ServerMgr) FinalizeJobPerf ¶
func (*ServerMgr) FinalizeTaskPerf ¶
func (*ServerMgr) FinalizeWorkPerf ¶
func (*ServerMgr) GetActiveJobs ¶
func (*ServerMgr) GetReportMsg ¶
func (*ServerMgr) GetSuspendJobs ¶
func (*ServerMgr) InitMaxJid ¶
func (*ServerMgr) IsJobRegistered ¶
func (*ServerMgr) JobRegister ¶
---job methods---
func (*ServerMgr) LogJobPerf ¶
func (*ServerMgr) RecomputeJob ¶
recompute jobs from specified task stage
func (*ServerMgr) RecoverJobs ¶
recover jobs not completed before awe-server restarts
func (*ServerMgr) ResubmitJob ¶
re-submit a job in db but not in the queue (caused by server restarting)
func (*ServerMgr) ResumeSuspendedJob ¶
resubmit a suspended job
func (*ServerMgr) ResumeSuspendedJobs ¶
func (*ServerMgr) SaveStdLog ¶
func (*ServerMgr) ShowStatus ¶
func (*ServerMgr) SuspendJob ¶
func (*ServerMgr) UpdateGroup ¶
update job group
func (*ServerMgr) UpdateJobPerfStartTime ¶
func (*ServerMgr) UpdateJobTaskToInProgress ¶
update job/task states from "queued" to "in-progress" once the first workunit is checked out
func (*ServerMgr) UpdateTaskPerfStartTime ¶
type ShockClient ¶
TODO use Token
func (*ShockClient) Get_node_download_url ¶
func (sc *ShockClient) Get_node_download_url(node ShockNode) (download_url string, err error)
func (*ShockClient) Get_request ¶
func (sc *ShockClient) Get_request(resource string, query url.Values, response interface{}) (err error)
func (*ShockClient) Query ¶
func (sc *ShockClient) Query(query url.Values) (sqr_p *ShockQueryResponse, err error)
example: query_response_p, err := sc.Shock_query(host, url.Values{"docker": {"1"}, "docker_image_name" : {"wgerlach/bowtie2:2.2.0"}});
type ShockNode ¶
type ShockNode struct { Id string `bson:"id" json:"id"` Version string `bson:"version" json:"version"` File shockfile `bson:"file" json:"file"` Attributes interface{} `bson:"attributes" json:"attributes"` Indexes map[string]IdxInfo `bson:"indexes" json:"indexes"` //Acl Acl `bson:"acl" json:"-"` VersionParts map[string]string `bson:"version_parts" json:"-"` Tags []string `bson:"tags" json:"tags"` // Revisions []ShockNode `bson:"revisions" json:"-"` Linkages []linkage `bson:"linkage" json:"linkages"` }
type ShockQueryResponse ¶
type ShockResponse ¶
type Task ¶
type Task struct { Id string `bson:"taskid" json:"taskid"` Info *Info `bson:"info" json:"-"` Inputs IOmap `bson:"inputs" json:"inputs"` Outputs IOmap `bson:"outputs" json:"outputs"` Predata IOmap `bson:"predata" json:"predata"` Cmd *Command `bson:"cmd" json:"cmd"` Partition *PartInfo `bson:"partinfo" json:"-"` DependsOn []string `bson:"dependsOn" json:"dependsOn"` TotalWork int `bson:"totalwork" json:"totalwork"` MaxWorkSize int `bson:"maxworksize" json:"maxworksize"` RemainWork int `bson:"remainwork" json:"remainwork"` WorkStatus []string `bson:"workstatus" json:"-"` State string `bson:"state" json:"state"` Skip int `bson:"skip" json:"-"` CreatedDate time.Time `bson:"createdDate" json:"createddate"` StartedDate time.Time `bson:"startedDate" json:"starteddate"` CompletedDate time.Time `bson:"completedDate" json:"completeddate"` ComputeTime int `bson:"computetime" json:"computetime"` }
func (*Task) InitPartIndex ¶
Get part size based on partition/index info. If it fails to get index info, task.TotalWork falls back to 1 and nil is returned.
func (*Task) ParseWorkunit ¶
func (*Task) UpdateState ¶
type TaskPerf ¶
type TaskPerf struct { Queued int64 `bson:"queued" json:"queued"` Start int64 `bson:"start" json:"start"` End int64 `bson:"end" json:"end"` Resp int64 `bson:"resp" json:"resp"` //End -Queued InFileSizes []int64 `bson:"size_infile" json:"size_infile"` OutFileSizes []int64 `bson:"size_outfile" json:"size_outfile"` }
func NewTaskPerf ¶
type WorkPerf ¶
type WorkPerf struct { Queued int64 `bson:"queued" json:"queued"` // WQ (queued at server or client, depending on who creates it) Done int64 `bson:"done" json:"done"` // WD (done at server) Resp int64 `bson:"resp" json:"resp"` // Done - Queued (server metric) Checkout int64 `bson:"checkout" json:"checkout"` // checkout at client Deliver int64 `bson:"deliver" json:"deliver"` // done at client ClientResp int64 `bson:"clientresp" json:"clientresp"` // Deliver - Checkout (client metric) PreDataIn float64 `bson:"time_predata_in" json:"time_predata_in"` // time in seconds for downloading prerequisite data at client DataIn float64 `bson:"time_data_in" json:"time_data_in"` // time in seconds for input data move-in at client DataOut float64 `bson:"time_data_out" json:"time_data_out"` // time in seconds for output data move-out at client Runtime int64 `bson:"runtime" json:"runtime"` // time in seconds for computation at client MaxMemUsage uint64 `bson:"max_mem_usage" json:"max_mem_usage"` // maximum memory consumption ClientId string `bson:"client_id" json:"client_id"` PreDataSize int64 `bson:"size_predata" json:"size_predata"` //predata moved over network InFileSize int64 `bson:"size_infile" json:"size_infile"` //input file moved over network OutFileSize int64 `bson:"size_outfile" json:"size_outfile"` //output file moved over network }
func NewWorkPerf ¶
type Workflow ¶
type Workflow struct { WfInfo awf_info `bson:"workflow_info" json:"workflow_info"` JobInfo awf_jobinfo `bson:"job_info" json:"job_info"` RawInputs map[string]string `bson:"raw_inputs" json:"raw_inputs"` Variables map[string]string `bson:"variables" json:"variables"` DataServer string `bson:"data_server" json:"data_server"` Tasks []*awf_task `bson:"tasks" json:"tasks"` }
type WorkflowMgr ¶
type WorkflowMgr struct {
// contains filtered or unexported fields
}
var (
AwfMgr *WorkflowMgr
)
func NewWorkflowMgr ¶
func NewWorkflowMgr() *WorkflowMgr
func (*WorkflowMgr) AddWorkflow ¶
func (wfm *WorkflowMgr) AddWorkflow(name string, awf *Workflow)
func (*WorkflowMgr) GetAllWorkflows ¶
func (wfm *WorkflowMgr) GetAllWorkflows() (workflows []*Workflow)
func (*WorkflowMgr) GetWorkflow ¶
func (wfm *WorkflowMgr) GetWorkflow(name string) (awf *Workflow, err error)
func (*WorkflowMgr) LoadWorkflows ¶
func (wfm *WorkflowMgr) LoadWorkflows() (err error)
type Workunit ¶
type Workunit struct { Id string `bson:"wuid" json:"wuid"` Info *Info `bson:"info" json:"info"` Inputs IOmap `bson:"inputs" json:"inputs"` Outputs IOmap `bson:"outputs" json:"outputs"` Predata IOmap `bson:"predata" json:"predata"` Cmd *Command `bson:"cmd" json:"cmd"` Rank int `bson:"rank" json:"rank"` TotalWork int `bson:"totalwork" json:"totalwork"` Partition *PartInfo `bson:"part" json:"part"` State string `bson:"state" json:"state"` Failed int `bson:"failed" json:"failed"` CheckoutTime time.Time `bson:"checkout_time" json:"checkout_time"` Client string `bson:"client" json:"client"` ComputeTime int `bson:"computetime" json:"computetime"` Notes string `bson:"-" json:"-"` }