Documentation ¶
Index ¶
- Constants
- Variables
- func GetJobIdByTaskId(taskid string) (jobid string, err error)
- func GetJobIdByWorkId(workid string) (jobid string, err error)
- func InitAwfMgr()
- func InitClientProfile(profile *Client)
- func InitJobDB()
- func InitProxyWorkChan()
- func InitResMgr(service string)
- func IsFirstTask(taskid string) bool
- func NotifyWorkunitProcessed(work *Workunit, perf *WorkPerf) (err error)
- func PostNode(io *IO, numParts int) (nodeid string, err error)
- func PostNodeWithToken(io *IO, numParts int, token string) (nodeid string, err error)
- func PushOutputData(work *Workunit) (err error)
- func ReloadFromDisk(path string) (err error)
- func ShockPutIndex(host string, nodeid string, indexname string, token string) (err error)
- func UpdateJobState(jobid string, newstate string) (err error)
- type CQMgr
- func (qm *CQMgr) CheckoutWorkunits(req_policy string, client_id string, num int) (workunits []*Workunit, err error)
- func (qm *CQMgr) ClientChecker()
- func (qm *CQMgr) ClientHeartBeat(id string) (hbmsg HBmsg, err error)
- func (qm *CQMgr) DeleteClient(id string)
- func (qm *CQMgr) EnqueueWorkunit(work *Workunit) (err error)
- func (qm *CQMgr) FetchDataToken(workid string, clientid string) (token string, err error)
- func (qm *CQMgr) GetAllClients() []*Client
- func (qm *CQMgr) GetClient(id string) (client *Client, err error)
- func (qm *CQMgr) GetWorkById(id string) (workunit *Workunit, err error)
- func (qm *CQMgr) Handle()
- func (qm *CQMgr) NotifyWorkStatus(notice Notice)
- func (qm *CQMgr) RegisterNewClient(files FormFiles) (client *Client, err error)
- func (qm *CQMgr) ShowWorkQueue()
- func (qm *CQMgr) ShowWorkunits(status string) (workunits []*Workunit)
- func (qm *CQMgr) Timer()
- func (qm *CQMgr) UpdateSubClients(id string, count int)
- type Client
- type ClientMgr
- type ClientWorkMgr
- type CoAck
- type CoReq
- type Command
- type FormFile
- type FormFiles
- type HBmsg
- type IO
- func (io *IO) DataUrl() string
- func (io *IO) GetFileSize() int64
- func (io *IO) GetIndexInfo() (idxinfo map[string]IdxInfo, err error)
- func (io *IO) GetIndexUnits(indextype string) (totalunits int, err error)
- func (io *IO) GetShockNode() (node *ShockNode, err error)
- func (io *IO) TotalUnits(indextype string) (count int, err error)
- type IOmap
- type IdxInfo
- type Info
- type Job
- func AwfToJob(awf *Workflow, jid string) (job *Job, err error)
- func CreateJobUpload(params map[string]string, files FormFiles, jid string) (job *Job, err error)
- func LoadJob(id string) (job *Job, err error)
- func ParseAwf(filename string, jid string) (job *Job, err error)
- func ParseJobTasks(filename string, jid string) (job *Job, err error)
- func (job *Job) FilePath() string
- func (job *Job) GetDataToken() (token string)
- func (job *Job) Mkdir() (err error)
- func (job *Job) NumTask() int
- func (job *Job) Path() string
- func (job *Job) Save() (err error)
- func (job *Job) SetDataToken(token string)
- func (job *Job) SetFile(file FormFile) (err error)
- func (job *Job) TaskList() []*Task
- func (job *Job) UpdateFile(params map[string]string, files FormFiles) (err error)
- func (job *Job) UpdateState(newState string, notes string) (err error)
- func (job *Job) UpdateTask(task *Task) (remainTasks int, err error)
- type JobMgr
- type JobPerf
- type Jobs
- func (n *Jobs) GetAll(q bson.M) (err error)
- func (n *Jobs) GetAllLimitOffset(q bson.M, limit int, offset int) (err error)
- func (n *Jobs) GetAllRecent(q bson.M, recent int) (count int, err error)
- func (n *Jobs) GetJobAt(index int) Job
- func (n *Jobs) GetPaginated(q bson.M, limit int, offset int) (count int, err error)
- func (n *Jobs) Length() int
- type Notice
- type Opts
- type PartInfo
- type Pipeline
- type ProxyMgr
- func (qm *ProxyMgr) ClientChecker()
- func (qm *ProxyMgr) DeleteJob(jobid string) (err error)
- func (qm *ProxyMgr) DeleteSuspendedJobs() (num int)
- func (qm *ProxyMgr) EnqueueTasksByJobId(jobid string, tasks []*Task) (err error)
- func (qm *ProxyMgr) FetchDataToken(workid string, clientid string) (token string, err error)
- func (qm *ProxyMgr) FinalizeWorkPerf(string, string) (err error)
- func (qm *ProxyMgr) GetActiveJobs() map[string]*JobPerf
- func (qm *ProxyMgr) GetSuspendJobs() map[string]bool
- func (qm *ProxyMgr) Handle()
- func (qm *ProxyMgr) InitMaxJid() (err error)
- func (qm *ProxyMgr) JobRegister() (jid string, err error)
- func (qm *ProxyMgr) RecomputeJob(jobid string, stage string) (err error)
- func (qm *ProxyMgr) RecoverJobs() (err error)
- func (qm *ProxyMgr) RegisterNewClient(files FormFiles) (client *Client, err error)
- func (qm *ProxyMgr) ResubmitJob(id string) (err error)
- func (qm *ProxyMgr) ResumeSuspendedJob(id string) (err error)
- func (qm *ProxyMgr) ShowStatus() string
- func (qm *ProxyMgr) SuspendJob(jobid string, reason string) (err error)
- func (qm *ProxyMgr) Timer()
- func (qm *ProxyMgr) UpdateGroup(jobid string, newgroup string) (err error)
- type ResourceMgr
- type ServerMgr
- func (qm *ServerMgr) CreateJobPerf(jobid string)
- func (qm *ServerMgr) CreateTaskPerf(taskid string)
- func (qm *ServerMgr) CreateWorkPerf(workid string)
- func (qm *ServerMgr) DeleteJob(jobid string) (err error)
- func (qm *ServerMgr) DeleteJobPerf(jobid string)
- func (qm *ServerMgr) DeleteSuspendedJobs() (num int)
- func (qm *ServerMgr) EnqueueTasksByJobId(jobid string, tasks []*Task) (err error)
- func (qm *ServerMgr) FetchDataToken(workid string, clientid string) (token string, err error)
- func (qm *ServerMgr) FinalizeJobPerf(jobid string)
- func (qm *ServerMgr) FinalizeTaskPerf(taskid string)
- func (qm *ServerMgr) FinalizeWorkPerf(workid string, reportfile string) (err error)
- func (qm *ServerMgr) GetActiveJobs() map[string]*JobPerf
- func (qm *ServerMgr) GetSuspendJobs() map[string]bool
- func (qm *ServerMgr) Handle()
- func (qm *ServerMgr) InitMaxJid() (err error)
- func (qm *ServerMgr) JobRegister() (jid string, err error)
- func (qm *ServerMgr) LogJobPerf(jobid string)
- func (qm *ServerMgr) RecomputeJob(jobid string, stage string) (err error)
- func (qm *ServerMgr) RecoverJobs() (err error)
- func (qm *ServerMgr) ResubmitJob(id string) (err error)
- func (qm *ServerMgr) ResumeSuspendedJob(id string) (err error)
- func (qm *ServerMgr) ShowStatus() string
- func (qm *ServerMgr) ShowTasks()
- func (qm *ServerMgr) SuspendJob(jobid string, reason string) (err error)
- func (qm *ServerMgr) Timer()
- func (qm *ServerMgr) UpdateGroup(jobid string, newgroup string) (err error)
- type ShockNode
- type ShockResponse
- type Task
- type TaskPerf
- type WQueue
- type WorkList
- type WorkMgr
- type WorkPerf
- type Workflow
- type WorkflowMgr
- type Workunit
Constants ¶
const ( CLIENT_STAT_ACTIVE = "active" CLIENT_STAT_SUSPEND = "suspend" )
const ( JOB_STAT_SUBMITTED = "submitted" JOB_STAT_INPROGRESS = "in-progress" JOB_STAT_COMPLETED = "completed" JOB_STAT_SUSPEND = "suspend" JOB_STAT_DELETED = "deleted" )
const ( TASK_STAT_INIT = "init" TASK_STAT_QUEUED = "queued" TASK_STAT_PENDING = "pending" TASK_STAT_SUSPEND = "suspend" TASK_STAT_COMPLETED = "completed" TASK_STAT_SKIPPED = "user_skipped" TASK_STAT_FAIL_SKIP = "skipped" TASK_STAT_PASSED = "passed" )
const ( WORK_STAT_QUEUED = "queued" WORK_STAT_CHECKOUT = "checkout" WORK_STAT_SUSPEND = "suspend" WORK_STAT_DONE = "done" WORK_STAT_FAIL = "fail" WORK_STAT_PREPARED = "prepared" WORK_STAT_COMPUTED = "computed" WORK_STAT_PROXYQUEUED = "proxyqueued" )
Variables ¶
var ( QMgr ResourceMgr Service string = "unknown" Self *Client ProxyWorkChan chan bool )
var ( //<inputs::i1> <inputs::i2> <outputs::o1> TemplateRe = regexp.MustCompile("\\w+::\\w+") )
Functions ¶
func GetJobIdByWorkId ¶
func InitAwfMgr ¶
func InitAwfMgr()
func InitClientProfile ¶
func InitClientProfile(profile *Client)
func InitProxyWorkChan ¶
func InitProxyWorkChan()
func InitResMgr ¶
func InitResMgr(service string)
func IsFirstTask ¶
func NotifyWorkunitProcessed ¶
functions for REST API communication notify AWE server a workunit is finished with status either "failed" or "done", and with perf statistics if "done"
func PostNodeWithToken ¶
func PushOutputData ¶
func ReloadFromDisk ¶
func ShockPutIndex ¶
func UpdateJobState ¶
Types ¶
type CQMgr ¶
type CQMgr struct {
// contains filtered or unexported fields
}
func (*CQMgr) CheckoutWorkunits ¶
func (*CQMgr) ClientChecker ¶
func (qm *CQMgr) ClientChecker()
func (*CQMgr) ClientHeartBeat ¶
func (*CQMgr) DeleteClient ¶
func (*CQMgr) EnqueueWorkunit ¶
func (*CQMgr) FetchDataToken ¶
func (*CQMgr) GetAllClients ¶
func (*CQMgr) NotifyWorkStatus ¶
func (*CQMgr) RegisterNewClient ¶
func (*CQMgr) ShowWorkunits ¶
func (*CQMgr) UpdateSubClients ¶
type Client ¶
type Client struct { Id string `bson:"id" json:"id"` Name string `bson:"name" json:"name"` Group string `bson:"group" json:"group"` User string `bson:"user" json:"user"` Host string `bson:"host" json:"host"` CPUs int `bson:"cores" json:"cores"` Apps []string `bson:"apps" json:"apps"` RegTime time.Time `bson:"regtime" json:"regtime"` Serve_time string `bson:"serve_time" json:"serve_time"` Idle_time int `bson:"idle_time" json:"idle_time"` Status string `bson:"Status" json:"Status"` Total_checkout int `bson:"total_checkout" json:"total_checkout"` Total_completed int `bson:"total_completed" json:"total_completed"` Total_failed int `bson:"total_failed" json:"total_failed"` Current_work map[string]bool `bson:"current_work" json:"current_work"` Skip_work []string `bson:"skip_work" json:"skip_work"` Last_failed int `bson:"-" json:"-"` Tag bool `bson:"-" json:"-"` Proxy bool `bson:"proxy" json:"proxy"` SubClients int `bson:"subclients" json:"subclients"` }
func NewProfileClient ¶
type ClientWorkMgr ¶
type Command ¶
type Command struct { Name string `bson:"name" json:"name"` Options string `bson:"options" json:"-"` Args string `bson:"args" json:"args"` Template string `bson:"template" json:"-"` Description string `bson:"description" json:"description"` ParsedArgs []string `bson:"-" json:"-"` }
func NewCommand ¶
type HBmsg ¶
heartbeat response from awe-server to awe-client used for issue operation request to client, e.g. discard suspended workunits
type IO ¶
type IO struct { Name string `bson:"name" json:"-"` Host string `bson:"host" json:"host"` Node string `bson:"node" json:"node"` Url string `bson:"url" json:"url"` Size int64 `bson:"size" json:"size"` MD5 string `bson:"md5" json:"-"` Cache bool `bson:"cache" json:"-"` Origin string `bson:"origin" json:"origin"` Path string `bson:"path" json:"-"` Optional bool `bson:"optional" json:"-"` Nonzero bool `bson:"nonzero" json:"nonzero"` DataToken string `bson:"datatoken" json:"-"` }
func (*IO) GetFileSize ¶
func (*IO) GetIndexUnits ¶
func (*IO) GetShockNode ¶
type Info ¶
type Info struct { Name string `bson:"name" json:"name"` Xref string `bson:"xref" json:"xref"` Project string `bson:"project" json:"project"` User string `bson:"user" json:"user"` Pipeline string `bson:"pipeline" json:"pipeline"` ClientGroups string `bson:"clientgroups" json:"clientgroups"` SubmitTime time.Time `bson:"submittime" json:"submittime"` Priority int `bson:"priority" json:"-"` Auth bool `bson:"auth" json:"auth"` DataToken string `bson:"datatoken" json:"-"` }
type Job ¶
type Job struct { Id string `bson:"id" json:"id"` Jid string `bson:"jid" json:"jid"` Info *Info `bson:"info" json:"info"` Tasks []*Task `bson:"tasks" json:"tasks"` Script script `bson:"script" json:"-"` State string `bson:"state" json:"state"` RemainTasks int `bson:"remaintasks" json:"remaintasks"` UpdateTime time.Time `bson:"updatetime" json:"updatetime"` Notes string `bson:"notes" json:"notes"` }
func CreateJobUpload ¶
func ParseJobTasks ¶
parse job by job script
func (*Job) GetDataToken ¶
func (*Job) UpdateFile ¶
---Script upload
func (*Job) UpdateState ¶
---Field update functions
type JobMgr ¶
type JobMgr interface { JobRegister() (string, error) EnqueueTasksByJobId(string, []*Task) error GetActiveJobs() map[string]*JobPerf GetSuspendJobs() map[string]bool SuspendJob(string, string) error ResumeSuspendedJob(string) error ResubmitJob(string) error DeleteJob(string) error DeleteSuspendedJobs() int InitMaxJid() error RecoverJobs() error FinalizeWorkPerf(string, string) error RecomputeJob(string, string) error UpdateGroup(string, string) error }
type JobPerf ¶
type JobPerf struct { Id string Queued int64 //Start int64 End int64 Resp int64 //End - Queued Ptasks map[string]*TaskPerf Pworks map[string]*WorkPerf }
func NewJobPerf ¶
type Jobs ¶
type Jobs []Job
Job array type
func (*Jobs) GetAllLimitOffset ¶
func (*Jobs) GetAllRecent ¶
func (*Jobs) GetPaginated ¶
type Pipeline ¶
type Pipeline struct { Info *Info `bson:"info" json:"info"` Tasks []Task `bson:"tasks" json:"tasks"` }
func NewPipeline ¶
func NewPipeline() *Pipeline
type ProxyMgr ¶
type ProxyMgr struct {
CQMgr
}
func NewProxyMgr ¶
func NewProxyMgr() *ProxyMgr
func (*ProxyMgr) ClientChecker ¶
func (qm *ProxyMgr) ClientChecker()
func (*ProxyMgr) DeleteSuspendedJobs ¶
func (*ProxyMgr) EnqueueTasksByJobId ¶
func (*ProxyMgr) FetchDataToken ¶
func (*ProxyMgr) FinalizeWorkPerf ¶
func (*ProxyMgr) GetActiveJobs ¶
func (*ProxyMgr) GetSuspendJobs ¶
func (*ProxyMgr) InitMaxJid ¶
func (*ProxyMgr) JobRegister ¶
func (*ProxyMgr) RecomputeJob ¶
recompute jobs from specified task stage
func (*ProxyMgr) RecoverJobs ¶
recover jobs not completed before awe-server restarts
func (*ProxyMgr) RegisterNewClient ¶
func (*ProxyMgr) ResubmitJob ¶
re-submit a job in db but not in the queue (caused by server restarting)
func (*ProxyMgr) ResumeSuspendedJob ¶
resubmit a suspended job
func (*ProxyMgr) ShowStatus ¶
func (*ProxyMgr) SuspendJob ¶
type ResourceMgr ¶
type ResourceMgr interface { ClientWorkMgr JobMgr Handle() ShowStatus() string Timer() }
type ServerMgr ¶
type ServerMgr struct { CQMgr // contains filtered or unexported fields }
func NewServerMgr ¶
func NewServerMgr() *ServerMgr
func (*ServerMgr) CreateJobPerf ¶
---perf related methods
func (*ServerMgr) CreateTaskPerf ¶
func (*ServerMgr) CreateWorkPerf ¶
func (*ServerMgr) DeleteJobPerf ¶
func (*ServerMgr) DeleteSuspendedJobs ¶
func (*ServerMgr) EnqueueTasksByJobId ¶
func (*ServerMgr) FetchDataToken ¶
--workunit methds (servermgr implementation)
func (*ServerMgr) FinalizeJobPerf ¶
func (*ServerMgr) FinalizeTaskPerf ¶
func (*ServerMgr) FinalizeWorkPerf ¶
func (*ServerMgr) GetActiveJobs ¶
func (*ServerMgr) GetSuspendJobs ¶
func (*ServerMgr) InitMaxJid ¶
func (*ServerMgr) JobRegister ¶
---job methods---
func (*ServerMgr) LogJobPerf ¶
func (*ServerMgr) RecomputeJob ¶
recompute jobs from specified task stage
func (*ServerMgr) RecoverJobs ¶
recover jobs not completed before awe-server restarts
func (*ServerMgr) ResubmitJob ¶
re-submit a job in db but not in the queue (caused by server restarting)
func (*ServerMgr) ResumeSuspendedJob ¶
resubmit a suspended job
func (*ServerMgr) ShowStatus ¶
func (*ServerMgr) SuspendJob ¶
type ShockNode ¶
type ShockNode struct { Id string `bson:"id" json:"id"` Version string `bson:"version" json:"version"` File shockfile `bson:"file" json:"file"` Attributes interface{} `bson:"attributes" json:"attributes"` Indexes map[string]IdxInfo `bson:"indexes" json:"indexes"` //Acl Acl `bson:"acl" json:"-"` VersionParts map[string]string `bson:"version_parts" json:"-"` Tags []string `bson:"tags" json:"tags"` // Revisions []ShockNode `bson:"revisions" json:"-"` Linkages []linkage `bson:"linkage" json:"linkages"` }
type ShockResponse ¶
type Task ¶
type Task struct { Id string `bson:"taskid" json:"taskid"` Info *Info `bson:"info" json:"-"` Inputs IOmap `bson:"inputs" json:"inputs"` Outputs IOmap `bson:"outputs" json:"outputs"` Predata IOmap `bson:"predata" json:"predata"` Cmd *Command `bson:"cmd" json:"cmd"` Partition *PartInfo `bson:"partinfo" json:"-"` DependsOn []string `bson:"dependsOn" json:"dependsOn"` TotalWork int `bson:"totalwork" json:"totalwork"` MaxWorkSize int `bson:"maxworksize" json:"maxworksize"` RemainWork int `bson:"remainwork" json:"remainwork"` WorkStatus []string `bson:"workstatus" json:"-"` State string `bson:"state" json:"state"` Skip int `bson:"skip" json:"-"` }
func (*Task) InitPartIndex ¶
get part size based on partition/index info if fail to get index info, task.TotalWork fall back to 1 and return nil
func (*Task) ParseWorkunit ¶
func (*Task) UpdateState ¶
type TaskPerf ¶
type TaskPerf struct { Size int64 Queued int64 End int64 Resp int64 //End -Queued InFileSizes []int64 OutFileSizes []int64 }
func NewTaskPerf ¶
type WorkPerf ¶
type WorkPerf struct { Queued int64 // WQ (queued at server or client, depending on who creates it) Done int64 // WD (done at server) Resp int64 // Done - Queued (server metric) Checkout int64 // checkout at client Deliver int64 // done at client ClientResp int64 // Deliver - Checkout (client metric) DataIn int64 // input data move-in at client DataOut int64 // output data move-out at client Runtime int64 // computing time at client ClientId string }
func NewWorkPerf ¶
type Workflow ¶
type Workflow struct { WfInfo awf_info `bson:"workflow_info" json:"workflow_info"` JobInfo awf_jobinfo `bson:"job_info" json:"job_info"` RawInputs map[string]string `bson:"raw_inputs" json:"raw_inputs"` Variables map[string]string `bson:"variables" json:"variables"` DataServer string `bson:"data_server" json:"data_server"` Tasks []*awf_task `bson:"tasks" json:"tasks"` }
type WorkflowMgr ¶
type WorkflowMgr struct {
// contains filtered or unexported fields
}
var (
AwfMgr *WorkflowMgr
)
func NewWorkflowMgr ¶
func NewWorkflowMgr() *WorkflowMgr
func (*WorkflowMgr) AddWorkflow ¶
func (wfm *WorkflowMgr) AddWorkflow(name string, awf *Workflow)
func (*WorkflowMgr) GetAllWorkflows ¶
func (wfm *WorkflowMgr) GetAllWorkflows() (workflows []*Workflow)
func (*WorkflowMgr) GetWorkflow ¶
func (wfm *WorkflowMgr) GetWorkflow(name string) (awf *Workflow, err error)
func (*WorkflowMgr) LoadWorkflows ¶
func (wfm *WorkflowMgr) LoadWorkflows() (err error)
type Workunit ¶
type Workunit struct { Id string `bson:"wuid" json:"wuid"` Info *Info `bson:"info" json:"info"` Inputs IOmap `bson:"inputs" json:"inputs"` Outputs IOmap `bson:"outputs" json:"outputs"` Predata IOmap `bson:"predata" json:"predata"` Cmd *Command `bson:"cmd" json:"cmd"` Rank int `bson:"rank" json:"rank"` TotalWork int `bson:"totalwork" json:"totalwork"` Partition *PartInfo `bson:"part" json:"part"` State string `bson:"state" json:"state"` Failed int `bson:"failed" json:"failed"` CheckoutTime time.Time `bson:"checkout_time" json:"checkout_time"` Client string `bson:"client" json:"client"` }