Documentation
¶
Index ¶
- Constants
- Variables
- func DeleteClientGroup(id string) (err error)
- func Expand_app_variables(app_variables AppVariables, cmd_script []string) (err error)
- func GetJobCount(q bson.M) (count int, err error)
- func GetJobIdByTaskId(taskid string) (jobid string, err error)
- func GetJobIdByWorkId(workid string) (jobid string, err error)
- func GetTaskIdByWorkId(workid string) (taskid string, err error)
- func InitAwfMgr()
- func InitClientGroupDB()
- func InitClientProfile(profile *Client)
- func InitJobDB()
- func InitProxyWorkChan()
- func InitResMgr(service string)
- func IsFirstTask(taskid string) bool
- func NotifyWorkunitProcessed(work *Workunit, perf *WorkPerf) (err error)
- func NotifyWorkunitProcessedWithLogs(work *Workunit, perf *WorkPerf, sendstdlogs bool) (err error)
- func ParseResource(input_arg AppResource, app_variables AppVariables, job *Job, task *Task, ...) (err error)
- func PostNode(io *IO, numParts int) (nodeid string, err error)
- func PostNodeWithToken(io *IO, numParts int, token string) (nodeid string, err error)
- func PushOutputData(work *Workunit) (size int64, err error)
- func PutFileToShock(filename string, host string, nodeid string, rank int, token string, ...) (err error)
- func ReloadFromDisk(path string) (err error)
- func ShockPutIndex(host string, nodeid string, indexname string, token string) (err error)
- func UpdateJobState(jobid string, newstate string, oldstates []string) (err error)
- type App
- type AppCommandMode
- func (acm AppCommandMode) Get_app_variables(app_variables AppVariables) (err error)
- func (acm AppCommandMode) Get_default_app_variables() (app_variables AppVariables, err error)
- func (acm AppCommandMode) ParseAppInput(app_variables AppVariables, args_array []AppResource, job *Job, task *Task, ...) (err error)
- type AppInput
- type AppInputType
- type AppPackage
- type AppRegistry
- func (apr AppRegistry) GetAppPackage(app_package string) (ap *AppPackage, err error)
- func (appr AppRegistry) Get_cmd_mode_object(app_package_name string, app_command_name string, app_cmd_mode_name string) (app_cmd_mode_object_ref *AppCommandMode, err error)
- func (appr AppRegistry) Get_dockerimage(app_package_name string) (dockerimage string, err error)
- type AppResource
- type AppVariable
- type AppVariables
- type CQMgr
- func (qm *CQMgr) CheckoutWorkunits(req_policy string, client_id string, num int) (workunits []*Workunit, err error)
- func (qm *CQMgr) ClientChecker()
- func (qm *CQMgr) ClientHeartBeat(id string, cg *ClientGroup) (hbmsg HBmsg, err error)
- func (qm *CQMgr) DeleteClient(id string) (err error)
- func (qm *CQMgr) DeleteClientByUser(id string, u *user.User) (err error)
- func (qm *CQMgr) EnqueueWorkunit(work *Workunit) (err error)
- func (qm *CQMgr) FetchDataToken(workid string, clientid string) (token string, err error)
- func (qm *CQMgr) GetAllClients() []*Client
- func (qm *CQMgr) GetAllClientsByUser(u *user.User) []*Client
- func (qm *CQMgr) GetClient(id string) (client *Client, err error)
- func (qm *CQMgr) GetClientByUser(id string, u *user.User) (client *Client, err error)
- func (qm *CQMgr) GetWorkById(id string) (workunit *Workunit, err error)
- func (qm *CQMgr) Handle()
- func (qm *CQMgr) LockSemaphore()
- func (qm *CQMgr) NotifyWorkStatus(notice Notice)
- func (qm *CQMgr) ReQueueWorkunitByClient(clientid string) (err error)
- func (qm *CQMgr) RegisterNewClient(files FormFiles, cg *ClientGroup) (client *Client, err error)
- func (qm *CQMgr) ResumeClient(id string) (err error)
- func (qm *CQMgr) ResumeClientByUser(id string, u *user.User) (err error)
- func (qm *CQMgr) ResumeSuspendedClients() (count int)
- func (qm *CQMgr) ResumeSuspendedClientsByUser(u *user.User) (count int)
- func (qm *CQMgr) ShowWorkQueue()
- func (qm *CQMgr) ShowWorkunits(status string) (workunits []*Workunit)
- func (qm *CQMgr) ShowWorkunitsByUser(status string, u *user.User) (workunits []*Workunit)
- func (qm *CQMgr) SuspendAllClients() (count int)
- func (qm *CQMgr) SuspendAllClientsByUser(u *user.User) (count int)
- func (qm *CQMgr) SuspendClient(id string) (err error)
- func (qm *CQMgr) SuspendClientByUser(id string, u *user.User) (err error)
- func (qm *CQMgr) Timer()
- func (qm *CQMgr) UnlockSemaphore()
- func (qm *CQMgr) UpdateSubClients(id string, count int)
- func (qm *CQMgr) UpdateSubClientsByUser(id string, count int, u *user.User)
- type Client
- type ClientGroup
- func CreateClientGroup(name string, u *user.User) (cg *ClientGroup, err error)
- func LoadClientGroup(id string) (clientgroup *ClientGroup, err error)
- func LoadClientGroupByName(name string) (clientgroup *ClientGroup, err error)
- func LoadClientGroupByToken(token string) (clientgroup *ClientGroup, err error)
- type ClientGroups
- type ClientMgr
- type ClientWorkMgr
- type CoAck
- type CoReq
- type Command
- type Command_p
- type Environ_p
- type Envs
- type FormFile
- type FormFiles
- type HBmsg
- type IO
- func (io *IO) DataUrl() (dataurl string, err error)
- func (io *IO) DeleteNode() (nodeid string, err error)
- func (io *IO) GetFileSize() int64
- func (io *IO) GetIndexInfo() (idxinfo map[string]shock.IdxInfo, err error)
- func (io *IO) GetIndexUnits(indextype string) (totalunits int, err error)
- func (io *IO) GetShockNode() (node *shock.ShockNode, err error)
- func (io *IO) TotalUnits(indextype string) (count int, err error)
- type IOmap
- type Info
- type Job
- func AwfToJob(awf *Workflow, jid string) (job *Job, err error)
- func CreateJobUpload(u *user.User, params map[string]string, files FormFiles, jid string) (job *Job, err error)
- func LoadJob(id string) (job *Job, err error)
- func ParseAwf(filename string, jid string) (job *Job, err error)
- func ParseJobTasks(filename string, jid string) (job *Job, err error)
- func (job *Job) FilePath() string
- func (job *Job) GetDataToken() (token string)
- func (job *Job) GetPrivateEnv(taskid string) (env map[string]string)
- func (job *Job) Mkdir() (err error)
- func (job *Job) NumTask() int
- func (job *Job) Path() string
- func (job *Job) Save() (err error)
- func (job *Job) SetDataToken(token string)
- func (job *Job) SetFile(file FormFile) (err error)
- func (job *Job) TaskList() []*Task
- func (job *Job) UpdateFile(params map[string]string, files FormFiles) (err error)
- func (job *Job) UpdateState(newState string, notes string) (err error)
- func (job *Job) UpdateTask(task *Task) (remainTasks int, err error)
- type JobMgr
- type JobMin
- type JobPerf
- type Job_p
- type Jobs
- func (n *Jobs) GetAll(q bson.M, order string, direction string) (err error)
- func (n *Jobs) GetAllLimitOffset(q bson.M, limit int, offset int) (err error)
- func (n *Jobs) GetAllRecent(q bson.M, recent int) (count int, err error)
- func (n *Jobs) GetJobAt(index int) Job
- func (n *Jobs) GetPaginated(q bson.M, limit int, offset int, order string, direction string) (count int, err error)
- func (n *Jobs) Length() int
- type Notice
- type Opts
- type PartInfo
- type Pipeline
- type ProxyMgr
- func (qm *ProxyMgr) ClientChecker()
- func (qm *ProxyMgr) DeleteJob(jobid string) (err error)
- func (qm *ProxyMgr) DeleteJobByUser(jobid string, u *user.User) (err error)
- func (qm *ProxyMgr) DeleteSuspendedJobs() (num int)
- func (qm *ProxyMgr) DeleteSuspendedJobsByUser(u *user.User) (num int)
- func (qm *ProxyMgr) DeleteZombieJobs() (num int)
- func (qm *ProxyMgr) DeleteZombieJobsByUser(u *user.User) (num int)
- func (qm *ProxyMgr) EnqueueTasksByJobId(jobid string, tasks []*Task) (err error)
- func (qm *ProxyMgr) FetchDataToken(workid string, clientid string) (token string, err error)
- func (qm *ProxyMgr) FetchPrivateEnv(workid string, clientid string) (env map[string]string, err error)
- func (qm *ProxyMgr) FinalizeWorkPerf(string, string) (err error)
- func (qm *ProxyMgr) GetActiveJobs() map[string]*JobPerf
- func (qm *ProxyMgr) GetReportMsg(string, string) (report string, err error)
- func (qm *ProxyMgr) GetSuspendJobs() map[string]bool
- func (qm *ProxyMgr) Handle()
- func (qm *ProxyMgr) InitMaxJid() (err error)
- func (qm *ProxyMgr) IsJobRegistered(id string) bool
- func (qm *ProxyMgr) JobRegister() (jid string, err error)
- func (qm *ProxyMgr) RecomputeJob(jobid string, stage string) (err error)
- func (qm *ProxyMgr) RecoverJobs() (err error)
- func (qm *ProxyMgr) RegisterNewClient(files FormFiles, cg *ClientGroup) (client *Client, err error)
- func (qm *ProxyMgr) ResubmitJob(id string) (err error)
- func (qm *ProxyMgr) ResumeSuspendedJob(id string) (err error)
- func (qm *ProxyMgr) ResumeSuspendedJobByUser(id string, u *user.User) (err error)
- func (qm *ProxyMgr) ResumeSuspendedJobs() (num int)
- func (qm *ProxyMgr) ResumeSuspendedJobsByUser(u *user.User) (num int)
- func (qm *ProxyMgr) SaveStdLog(string, string, string) (err error)
- func (qm *ProxyMgr) ShowStatus() string
- func (qm *ProxyMgr) SuspendJob(jobid string, reason string, id string) (err error)
- func (qm *ProxyMgr) Timer()
- func (qm *ProxyMgr) UpdateGroup(jobid string, newgroup string) (err error)
- func (qm *ProxyMgr) UpdatePriority(jobid string, priority int) (err error)
- type ResourceMgr
- type ServerMgr
- func (qm *ServerMgr) CreateJobPerf(jobid string)
- func (qm *ServerMgr) CreateTaskPerf(taskid string)
- func (qm *ServerMgr) CreateWorkPerf(workid string)
- func (qm *ServerMgr) DeleteJob(jobid string) (err error)
- func (qm *ServerMgr) DeleteJobByUser(jobid string, u *user.User) (err error)
- func (qm *ServerMgr) DeleteJobPerf(jobid string)
- func (qm *ServerMgr) DeleteSuspendedJobs() (num int)
- func (qm *ServerMgr) DeleteSuspendedJobsByUser(u *user.User) (num int)
- func (qm *ServerMgr) DeleteZombieJobs() (num int)
- func (qm *ServerMgr) DeleteZombieJobsByUser(u *user.User) (num int)
- func (qm *ServerMgr) EnqueueTasksByJobId(jobid string, tasks []*Task) (err error)
- func (qm *ServerMgr) FetchDataToken(workid string, clientid string) (token string, err error)
- func (qm *ServerMgr) FetchPrivateEnv(workid string, clientid string) (env map[string]string, err error)
- func (qm *ServerMgr) FetchPrivateEnvs(workid string, clientid string) (envs map[string]string, err error)
- func (qm *ServerMgr) FinalizeJobPerf(jobid string)
- func (qm *ServerMgr) FinalizeTaskPerf(taskid string)
- func (qm *ServerMgr) FinalizeWorkPerf(workid string, reportfile string) (err error)
- func (qm *ServerMgr) GetActiveJobs() map[string]*JobPerf
- func (qm *ServerMgr) GetReportMsg(workid string, logname string) (report string, err error)
- func (qm *ServerMgr) GetSuspendJobs() map[string]bool
- func (qm *ServerMgr) Handle()
- func (qm *ServerMgr) InitMaxJid() (err error)
- func (qm *ServerMgr) IsJobRegistered(id string) bool
- func (qm *ServerMgr) JobRegister() (jid string, err error)
- func (qm *ServerMgr) LogJobPerf(jobid string)
- func (qm *ServerMgr) RecomputeJob(jobid string, stage string) (err error)
- func (qm *ServerMgr) RecoverJobs() (err error)
- func (qm *ServerMgr) ResubmitJob(id string) (err error)
- func (qm *ServerMgr) ResumeSuspendedJob(id string) (err error)
- func (qm *ServerMgr) ResumeSuspendedJobByUser(id string, u *user.User) (err error)
- func (qm *ServerMgr) ResumeSuspendedJobs() (num int)
- func (qm *ServerMgr) ResumeSuspendedJobsByUser(u *user.User) (num int)
- func (qm *ServerMgr) SaveStdLog(workid string, logname string, tmppath string) (err error)
- func (qm *ServerMgr) ShowStatus() string
- func (qm *ServerMgr) ShowTasks()
- func (qm *ServerMgr) SuspendJob(jobid string, reason string, id string) (err error)
- func (qm *ServerMgr) Timer()
- func (qm *ServerMgr) UpdateGroup(jobid string, newgroup string) (err error)
- func (qm *ServerMgr) UpdateJobPerfStartTime(jobid string)
- func (qm *ServerMgr) UpdateJobTaskToInProgress(works []*Workunit)
- func (qm *ServerMgr) UpdatePriority(jobid string, priority int) (err error)
- func (qm *ServerMgr) UpdateTaskPerfStartTime(taskid string)
- type Task
- func (task *Task) CreateIndex() (err error)
- func (task *Task) DeleteInput()
- func (task *Task) DeleteOutput()
- func (task *Task) InitPartIndex() (err error)
- func (task *Task) InitTask(job *Job) (err error)
- func (task *Task) ParseWorkunit() (wus []*Workunit, err error)
- func (task *Task) Skippable() bool
- func (task *Task) UpdateState(newState string) string
- type TaskPerf
- type Task_p
- type VariableExpander
- type WQueue
- type WorkList
- type WorkMgr
- type WorkPerf
- type Workflow
- type WorkflowMgr
- type Workunit
- type WorkunitsSortby
Constants ¶
const ( CLIENT_STAT_ACTIVE_BUSY = "active-busy" CLIENT_STAT_ACTIVE_IDLE = "active-idle" CLIENT_STAT_SUSPEND = "suspend" CLIENT_STAT_DELETED = "deleted" )
const ( JOB_STAT_INIT = "init" JOB_STAT_QUEUED = "queued" JOB_STAT_INPROGRESS = "in-progress" JOB_STAT_COMPLETED = "completed" JOB_STAT_SUSPEND = "suspend" JOB_STAT_DELETED = "deleted" )
const ( TASK_STAT_INIT = "init" TASK_STAT_QUEUED = "queued" TASK_STAT_INPROGRESS = "in-progress" TASK_STAT_PENDING = "pending" TASK_STAT_SUSPEND = "suspend" TASK_STAT_COMPLETED = "completed" TASK_STAT_SKIPPED = "user_skipped" TASK_STAT_FAIL_SKIP = "skipped" TASK_STAT_PASSED = "passed" )
const ( WORK_STAT_QUEUED = "queued" WORK_STAT_CHECKOUT = "checkout" WORK_STAT_SUSPEND = "suspend" WORK_STAT_DONE = "done" WORK_STAT_FAIL = "fail" WORK_STAT_PREPARED = "prepared" WORK_STAT_COMPUTED = "computed" WORK_STAT_DISCARDED = "discarded" WORK_STAT_PROXYQUEUED = "proxyqueued" )
Variables ¶
var ( QMgr ResourceMgr Service string = "unknown" Self *Client ProxyWorkChan chan bool )
var (
CGNameRegex = regexp.MustCompile(`^[A-Za-z0-9\_\-\.]+$`)
)
var JOB_STATS_ACTIVE = []string{JOB_STAT_QUEUED, JOB_STAT_INPROGRESS}
var JOB_STATS_REGISTERED = []string{JOB_STAT_QUEUED, JOB_STAT_INPROGRESS, JOB_STAT_SUSPEND}
var JOB_STATS_TO_RECOVER = []string{JOB_STAT_QUEUED, JOB_STAT_INPROGRESS, JOB_STAT_SUSPEND}
Functions ¶
func DeleteClientGroup ¶ added in v0.9.3
func Expand_app_variables ¶ added in v0.9.3
func Expand_app_variables(app_variables AppVariables, cmd_script []string) (err error)
func GetJobIdByWorkId ¶
func GetTaskIdByWorkId ¶
func InitAwfMgr ¶
func InitAwfMgr()
func InitClientGroupDB ¶ added in v0.9.3
func InitClientGroupDB()
func InitClientProfile ¶
func InitClientProfile(profile *Client)
func InitProxyWorkChan ¶
func InitProxyWorkChan()
func InitResMgr ¶
func InitResMgr(service string)
func IsFirstTask ¶
func NotifyWorkunitProcessed ¶
Functions for REST API communication (deprecated). Notifies the AWE server that a workunit is finished, with status either "failed" or "done", and with perf statistics if "done".
func ParseResource ¶ added in v0.9.13
func ParseResource(input_arg AppResource, app_variables AppVariables, job *Job, task *Task, taskid2task map[string]*Task) (err error)
func PostNodeWithToken ¶
func PushOutputData ¶
Deprecated: see cache.UploadOutputData.
func PutFileToShock ¶
func ReloadFromDisk ¶
func ShockPutIndex ¶
Types ¶
type App ¶ added in v0.9.13
type App struct { Name string `bson:"name" json:"name"` App_args []AppResource `bson:"app_args" json:"app_args"` AppDef *AppCommandMode `bson:"appdef" json:"appdef"` // App definition }
type AppCommandMode ¶ added in v0.9.3
type AppCommandMode struct { Input []AppInput `bson:"input" json:"input"` Output_array []string `bson:"output_array" json:"output_array"` Outputs []IO `bson:"outputs" json:"outputs"` Predata IOmap `bson:"predata" json:"predata"` Cmd string `bson:"cmd" json:"cmd"` Cmd_interpreter string `bson:"cmd_interpreter" json:"cmd_interpreter"` Cmd_script []string `bson:"cmd_script" json:"cmd_script"` Variables []map[string]string `bson:"variables" json:"variables"` Dockerimage string // just for convenience }
func (AppCommandMode) Get_app_variables ¶ added in v0.9.3
func (acm AppCommandMode) Get_app_variables(app_variables AppVariables) (err error)
func (AppCommandMode) Get_default_app_variables ¶ added in v0.9.3
func (acm AppCommandMode) Get_default_app_variables() (app_variables AppVariables, err error)
func (AppCommandMode) ParseAppInput ¶ added in v0.9.3
func (acm AppCommandMode) ParseAppInput(app_variables AppVariables, args_array []AppResource, job *Job, task *Task, taskid2task map[string]*Task) (err error)
Reads variables and (optionally) populates them with input nodes; also transfers information from app definitions to app inputs. 1) For reading variables, it needs only acm.Get_default_app_variables(); job and task will be nil. 2) For populating input nodes, it needs output of 2 (this is done server-side).
type AppInput ¶ added in v0.9.4
type AppInput struct { Type string `bson:"type" json:"type"` Name string `bson:"name" json:"name"` DefaultValue string `bson:"default_value" json:"default_value"` Required bool `bson:"required" json:"required"` // or use optional // TODO remove Optional bool `bson:"optional" json:"optional"` Option string `bson:"option" json:"option"` // this is the name used by the command line program, e.g. "--input=" Cache bool `bson:"cache" json:"cache"` // specifies that input has to be cached (predata) ShockIndex string `bson:"shockindex" json:"shockindex"` // specifies that (shock) input has to be indexed in Shock by the AWE server }
part of the app-definition
type AppInputType ¶ added in v0.9.3
type AppInputType int
const ( Ait_undefined AppInputType = iota Ait_file Ait_string Ait_shock Ait_url Ait_task Ait_list )
func String2apptype ¶ added in v0.9.13
func String2apptype(type_string string) (ait AppInputType, err error)
func (AppInputType) HasType ¶ added in v0.9.3
func (this_ait AppInputType) HasType(ait AppInputType) bool
type AppPackage ¶ added in v0.9.3
type AppPackage struct { Dockerimage string `bson:"dockerimage" json:"dockerimage"` Commands map[string]map[string]*AppCommandMode // package_command, package_mode }
type AppRegistry ¶ added in v0.9.3
type AppRegistry map[string]*AppPackage
func MakeAppRegistry ¶ added in v0.9.3
func MakeAppRegistry() (new_instance AppRegistry, err error)
generator function for app registry
func (AppRegistry) GetAppPackage ¶ added in v0.9.13
func (apr AppRegistry) GetAppPackage(app_package string) (ap *AppPackage, err error)
func (AppRegistry) Get_cmd_mode_object ¶ added in v0.9.3
func (appr AppRegistry) Get_cmd_mode_object(app_package_name string, app_command_name string, app_cmd_mode_name string) (app_cmd_mode_object_ref *AppCommandMode, err error)
func (AppRegistry) Get_dockerimage ¶ added in v0.9.3
func (appr AppRegistry) Get_dockerimage(app_package_name string) (dockerimage string, err error)
type AppResource ¶ added in v0.9.3
type AppResource struct { Resource string `bson:"resource" json:"resource"` Host string `bson:"host" json:"host"` Node string `bson:"node" json:"node"` Url string `bson:"url" json:"url"` Filename string `bson:"filename" json:"filename"` Key string `bson:"key" json:"key"` Value string `bson:"value" json:"value"` Task string `bson:"task" json:"task"` OutputPosition *int `bson:"position" json:"position"` OutputName string `bson:"name" json:"name"` Uncompress string `bson:"uncompress" json:"uncompress"` // tells AWE client to uncompress this file, e.g. "gzip" List []AppResource `bson:"list" json:"list"` Cache bool `bson:"cache" json:"cache"` ShockIndex string `bson:"shockindex" json:"shockindex"` // specifies that (shock) input has to be indexed in Shock by the AWE server }
Part of the workflow document, used in "Command". Defines input: shock, task, string. Those can generate IO structs (see io.go).
type AppVariable ¶ added in v0.9.3
type AppVariable struct { Value string Var_type AppInputType Option string // a flag that is needed to activate an argument on the command line, e.g. "--input ", mainly used for optional arguments Optional bool // indicates that an empty value is ok and not an error }
type AppVariables ¶ added in v0.9.3
type AppVariables map[string]AppVariable
Part of the (internal-only) workflow document, used in "Task".
type CQMgr ¶
type CQMgr struct {
// contains filtered or unexported fields
}
func (*CQMgr) CheckoutWorkunits ¶
func (*CQMgr) ClientChecker ¶
func (qm *CQMgr) ClientChecker()
func (*CQMgr) ClientHeartBeat ¶
func (qm *CQMgr) ClientHeartBeat(id string, cg *ClientGroup) (hbmsg HBmsg, err error)
func (*CQMgr) DeleteClient ¶
func (*CQMgr) DeleteClientByUser ¶ added in v0.9.3
func (*CQMgr) EnqueueWorkunit ¶
func (*CQMgr) FetchDataToken ¶
func (*CQMgr) GetAllClients ¶
func (*CQMgr) GetAllClientsByUser ¶ added in v0.9.3
func (*CQMgr) GetClientByUser ¶ added in v0.9.3
func (*CQMgr) LockSemaphore ¶ added in v0.9.11
func (qm *CQMgr) LockSemaphore()
func (*CQMgr) NotifyWorkStatus ¶
func (*CQMgr) ReQueueWorkunitByClient ¶
func (*CQMgr) RegisterNewClient ¶
func (qm *CQMgr) RegisterNewClient(files FormFiles, cg *ClientGroup) (client *Client, err error)
func (*CQMgr) ResumeClient ¶
func (*CQMgr) ResumeClientByUser ¶ added in v0.9.3
func (*CQMgr) ResumeSuspendedClients ¶
func (*CQMgr) ResumeSuspendedClientsByUser ¶ added in v0.9.3
func (*CQMgr) ShowWorkunits ¶
func (*CQMgr) ShowWorkunitsByUser ¶ added in v0.9.3
func (*CQMgr) SuspendAllClients ¶
func (*CQMgr) SuspendAllClientsByUser ¶ added in v0.9.3
func (*CQMgr) SuspendClient ¶
func (*CQMgr) SuspendClientByUser ¶ added in v0.9.3
func (*CQMgr) UnlockSemaphore ¶ added in v0.9.11
func (qm *CQMgr) UnlockSemaphore()
func (*CQMgr) UpdateSubClients ¶
type Client ¶
type Client struct { Id string `bson:"id" json:"id"` Name string `bson:"name" json:"name"` Group string `bson:"group" json:"group"` User string `bson:"user" json:"user"` Domain string `bson:"domain" json:"domain"` InstanceId string `bson:"instance_id" json:"instance_id"` InstanceType string `bson:"instance_type" json:"instance_type"` Host string `bson:"host" json:"host"` CPUs int `bson:"cores" json:"cores"` Apps []string `bson:"apps" json:"apps"` RegTime time.Time `bson:"regtime" json:"regtime"` Serve_time string `bson:"serve_time" json:"serve_time"` Idle_time int `bson:"idle_time" json:"idle_time"` Status string `bson:"Status" json:"Status"` Total_checkout int `bson:"total_checkout" json:"total_checkout"` Total_completed int `bson:"total_completed" json:"total_completed"` Total_failed int `bson:"total_failed" json:"total_failed"` Current_work map[string]bool `bson:"current_work" json:"current_work"` Skip_work []string `bson:"skip_work" json:"skip_work"` Last_failed int `bson:"-" json:"-"` Tag bool `bson:"-" json:"-"` Proxy bool `bson:"proxy" json:"proxy"` SubClients int `bson:"subclients" json:"subclients"` GitCommitHash string `bson:"git_commit_hash" json:"git_commit_hash"` Version string `bson:"version" json:"version"` }
func NewProfileClient ¶
type ClientGroup ¶ added in v0.9.3
type ClientGroup struct { Id string `bson:"id" json:"id"` IP_CIDR string `bson:"ip_cidr" json:"ip_cidr"` Name string `bson:"name" json:"name"` Token string `bson:"token" json:"token"` Acl clientGroupAcl.ClientGroupAcl `bson:"acl" json:"-"` CreatedOn time.Time `bson:"created_on" json:"created_on"` Expiration time.Time `bson:"expiration" json:"expiration"` LastModified time.Time `bson:"last_modified" json:"last_modified"` }
func CreateClientGroup ¶ added in v0.9.3
func CreateClientGroup(name string, u *user.User) (cg *ClientGroup, err error)
func LoadClientGroup ¶ added in v0.9.3
func LoadClientGroup(id string) (clientgroup *ClientGroup, err error)
func LoadClientGroupByName ¶ added in v0.9.3
func LoadClientGroupByName(name string) (clientgroup *ClientGroup, err error)
func LoadClientGroupByToken ¶ added in v0.9.3
func LoadClientGroupByToken(token string) (clientgroup *ClientGroup, err error)
func (*ClientGroup) Save ¶ added in v0.9.3
func (cg *ClientGroup) Save() (err error)
func (*ClientGroup) SetToken ¶ added in v0.9.3
func (cg *ClientGroup) SetToken()
type ClientMgr ¶
type ClientMgr interface { RegisterNewClient(FormFiles, *ClientGroup) (*Client, error) ClientHeartBeat(string, *ClientGroup) (HBmsg, error) GetClient(string) (*Client, error) GetClientByUser(string, *user.User) (*Client, error) GetAllClients() []*Client GetAllClientsByUser(*user.User) []*Client DeleteClient(string) error DeleteClientByUser(string, *user.User) error SuspendClient(string) error SuspendClientByUser(string, *user.User) error ResumeClient(string) error ResumeClientByUser(string, *user.User) error ResumeSuspendedClients() int ResumeSuspendedClientsByUser(*user.User) int SuspendAllClients() int SuspendAllClientsByUser(*user.User) int ClientChecker() UpdateSubClients(string, int) UpdateSubClientsByUser(string, int, *user.User) }
type ClientWorkMgr ¶
type Command ¶
type Command struct { Name string `bson:"name" json:"name"` Args string `bson:"args" json:"args"` Dockerimage string `bson:"Dockerimage" json:"Dockerimage"` Cmd_script []string `bson:"cmd_script" json:"cmd_script"` Environ Envs `bson:"environ" json:"environ"` HasPrivateEnv bool `bson:"has_private_env" json:"has_private_env"` Description string `bson:"description" json:"description"` ParsedArgs []string `bson:"-" json:"-"` }
func NewCommand ¶
type Environ_p ¶ added in v0.9.3
The following special code exists in order to unmarshal the private field Command.Environ.Private; it is placed in this file for less confusion.
type HBmsg ¶
Heartbeat response from awe-server to awe-client, used for issuing operation requests to the client, e.g. discard suspended workunits.
type IO ¶
type IO struct { FileName string `bson:"filename" json:"filename"` Name string `bson:"name" json:"name"` // specifies abstract name of output as defined by the app AppPosition int `bson:"appposition" json:"-"` // specifies position in app output array Directory string `bson:"directory" json:"directory"` Host string `bson:"host" json:"host"` Node string `bson:"node" json:"node"` Url string `bson:"url" json:"url"` // can be shock or any other url Size int64 `bson:"size" json:"size"` MD5 string `bson:"md5" json:"-"` Cache bool `bson:"cache" json:"cache"` // indicates that this files is "predata"" that needs to be cached Origin string `bson:"origin" json:"origin"` Path string `bson:"path" json:"-"` Optional bool `bson:"optional" json:"-"` Nonzero bool `bson:"nonzero" json:"nonzero"` DataToken string `bson:"datatoken" json:"-"` Intermediate bool `bson:"Intermediate" json:"-"` Temporary bool `bson:"temporary" json:"temporary"` ShockFilename string `bson:"shockfilename" json:"shockfilename"` ShockIndex string `bson:"shockindex" json:"shockindex"` // on input it indicates that Shock node has to be indexed by AWE server AttrFile string `bson:"attrfile" json:"attrfile"` NoFile bool `bson:"nofile" json:"nofile"` Delete bool `bson:"delete" json:"delete"` Type string `bson:"type" json:"type"` NodeAttr map[string]interface{} `bson:"nodeattr" json:"nodeattr"` // specifies attribute data to be stored in shock node (output only) FormOptions map[string]string `bson:"formoptions" json:"formoptions"` Uncompress string `bson:"uncompress" json:"uncompress"` // tells AWE client to uncompress this file, e.g. "gzip" }
func (*IO) DeleteNode ¶ added in v0.9.3
func (*IO) GetFileSize ¶
func (*IO) GetIndexInfo ¶
func (*IO) GetIndexUnits ¶
type Info ¶
type Info struct { Name string `bson:"name" json:"name"` Xref string `bson:"xref" json:"xref"` Service string `bson:"service" json:"service"` Project string `bson:"project" json:"project"` User string `bson:"user" json:"user"` Pipeline string `bson:"pipeline" json:"pipeline"` ClientGroups string `bson:"clientgroups" json:"clientgroups"` SubmitTime time.Time `bson:"submittime" json:"submittime"` StartedTme time.Time `bson:"startedtime" json:"startedtime"` CompletedTime time.Time `bson:"completedtime" json:"completedtime"` Priority int `bson:"priority" json:"priority"` Auth bool `bson:"auth" json:"auth"` DataToken string `bson:"datatoken" json:"-"` NoRetry bool `bson:"noretry" json:"noretry"` UserAttr map[string]string `bson:"userattr" json:"userattr"` Description string `bson:"description" json:"description"` Tracking bool `bson:"tracking" json:"tracking"` }
job info
type Job ¶
type Job struct { Id string `bson:"id" json:"id"` Jid string `bson:"jid" json:"jid"` Acl acl.Acl `bson:"acl" json:"-"` Info *Info `bson:"info" json:"info"` Tasks []*Task `bson:"tasks" json:"tasks"` Script script `bson:"script" json:"-"` State string `bson:"state" json:"state"` Registered bool `bson:"registered" json:"registered"` RemainTasks int `bson:"remaintasks" json:"remaintasks"` UpdateTime time.Time `bson:"updatetime" json:"updatetime"` Notes string `bson:"notes" json:"notes"` LastFailed string `bson:"lastfailed" json:"lastfailed"` Resumed int `bson:"resumed" json:"resumed"` //number of times the job has been resumed from suspension ShockHost string `bson:"shockhost" json:"shockhost"` // this is a fall-back default if not specified at a lower level }
func CreateJobUpload ¶
func ParseJobTasks ¶
parse job by job script
func (*Job) GetDataToken ¶
func (*Job) GetPrivateEnv ¶ added in v0.9.3
func (*Job) UpdateFile ¶
---Script upload
func (*Job) UpdateState ¶
---Field update functions
type JobMgr ¶
type JobMgr interface { JobRegister() (string, error) EnqueueTasksByJobId(string, []*Task) error GetActiveJobs() map[string]*JobPerf IsJobRegistered(string) bool GetSuspendJobs() map[string]bool SuspendJob(string, string, string) error ResumeSuspendedJob(string) error ResumeSuspendedJobByUser(string, *user.User) error ResumeSuspendedJobs() int ResumeSuspendedJobsByUser(*user.User) int ResubmitJob(string) error DeleteJob(string) error DeleteJobByUser(string, *user.User) error DeleteSuspendedJobs() int DeleteSuspendedJobsByUser(*user.User) int DeleteZombieJobs() int DeleteZombieJobsByUser(*user.User) int InitMaxJid() error RecoverJobs() error FinalizeWorkPerf(string, string) error SaveStdLog(string, string, string) error GetReportMsg(string, string) (string, error) RecomputeJob(string, string) error UpdateGroup(string, string) error UpdatePriority(string, int) error }
type JobMin ¶ added in v0.9.8
type JobMin struct { Id string `bson:"id" json:"id"` Name string `bson:"name" json:"name"` Size int64 `bson:"size" json:"size"` SubmitTime time.Time `bson:"submittime" json:"submittime"` CompletedTime time.Time `bson:"completedtime" json:"completedtime"` Task []int `bson:"task" json:"task"` State []string `bson:"state" json:"state"` UserAttr map[string]string `bson:"userattr" json:"userattr"` }
type JobPerf ¶
type JobPerf struct { Id string `bson:"id" json:"id"` Queued int64 `bson:"queued" json:"queued"` Start int64 `bson:"start" json:"start"` End int64 `bson:"end" json:"end"` Resp int64 `bson:"resp" json:"resp"` //End - Queued Ptasks map[string]*TaskPerf `bson:"task_stats" json:"task_stats"` Pworks map[string]*WorkPerf `bson:"work_stats" json:"work_stats"` }
func LoadJobPerf ¶
func NewJobPerf ¶
type Jobs ¶
type Jobs []Job
Job array type
func (*Jobs) GetAllLimitOffset ¶
func (*Jobs) GetAllRecent ¶
func (*Jobs) GetPaginated ¶
type Pipeline ¶
type Pipeline struct { Info *Info `bson:"info" json:"info"` Tasks []Task `bson:"tasks" json:"tasks"` }
func NewPipeline ¶
func NewPipeline() *Pipeline
type ProxyMgr ¶
type ProxyMgr struct {
CQMgr
}
func NewProxyMgr ¶
func NewProxyMgr() *ProxyMgr
func (*ProxyMgr) ClientChecker ¶
func (qm *ProxyMgr) ClientChecker()
func (*ProxyMgr) DeleteJobByUser ¶ added in v0.9.3
func (*ProxyMgr) DeleteSuspendedJobs ¶
func (*ProxyMgr) DeleteSuspendedJobsByUser ¶ added in v0.9.3
func (*ProxyMgr) DeleteZombieJobs ¶
func (*ProxyMgr) DeleteZombieJobsByUser ¶ added in v0.9.3
func (*ProxyMgr) EnqueueTasksByJobId ¶
func (*ProxyMgr) FetchDataToken ¶
func (*ProxyMgr) FetchPrivateEnv ¶ added in v0.9.3
func (*ProxyMgr) FinalizeWorkPerf ¶
func (*ProxyMgr) GetActiveJobs ¶
func (*ProxyMgr) GetReportMsg ¶
func (*ProxyMgr) GetSuspendJobs ¶
func (*ProxyMgr) InitMaxJid ¶
func (*ProxyMgr) IsJobRegistered ¶
func (*ProxyMgr) JobRegister ¶
func (*ProxyMgr) RecomputeJob ¶
recompute jobs from specified task stage
func (*ProxyMgr) RecoverJobs ¶
recover jobs not completed before awe-server restarts
func (*ProxyMgr) RegisterNewClient ¶
func (qm *ProxyMgr) RegisterNewClient(files FormFiles, cg *ClientGroup) (client *Client, err error)
func (*ProxyMgr) ResubmitJob ¶
re-submit a job in db but not in the queue (caused by server restarting)
func (*ProxyMgr) ResumeSuspendedJob ¶
resubmit a suspended job
func (*ProxyMgr) ResumeSuspendedJobByUser ¶ added in v0.9.3
resubmit a suspended job if user has rights
func (*ProxyMgr) ResumeSuspendedJobs ¶
func (*ProxyMgr) ResumeSuspendedJobsByUser ¶ added in v0.9.3
func (*ProxyMgr) ShowStatus ¶
func (*ProxyMgr) SuspendJob ¶
func (*ProxyMgr) UpdateGroup ¶
type ResourceMgr ¶
// ResourceMgr combines the ClientWorkMgr and JobMgr interfaces and adds
// Handle, ShowStatus and Timer.
// NOTE(review): Handle and Timer look like long-running loops meant to be
// started once — confirm against the implementations.
type ResourceMgr interface {
	ClientWorkMgr
	JobMgr
	Handle()
	ShowStatus() string
	Timer()
}
type ServerMgr ¶
type ServerMgr struct { CQMgr // contains filtered or unexported fields }
func NewServerMgr ¶
func NewServerMgr() *ServerMgr
func (*ServerMgr) CreateJobPerf ¶
---perf related methods
func (*ServerMgr) CreateTaskPerf ¶
func (*ServerMgr) CreateWorkPerf ¶
func (*ServerMgr) DeleteJobByUser ¶ added in v0.9.3
func (*ServerMgr) DeleteJobPerf ¶
func (*ServerMgr) DeleteSuspendedJobs ¶
func (*ServerMgr) DeleteSuspendedJobsByUser ¶ added in v0.9.3
func (*ServerMgr) DeleteZombieJobs ¶
delete jobs in db with "queued" or "in-progress" state but not in the queue (zombie jobs)
func (*ServerMgr) DeleteZombieJobsByUser ¶ added in v0.9.3
delete jobs in db with "queued" or "in-progress" state but not in the queue (zombie jobs) that user has access to
func (*ServerMgr) EnqueueTasksByJobId ¶
func (*ServerMgr) FetchDataToken ¶
--workunit methods (servermgr implementation)
func (*ServerMgr) FetchPrivateEnv ¶ added in v0.9.3
func (*ServerMgr) FetchPrivateEnvs ¶ added in v0.9.3
func (*ServerMgr) FinalizeJobPerf ¶
func (*ServerMgr) FinalizeTaskPerf ¶
func (*ServerMgr) FinalizeWorkPerf ¶
func (*ServerMgr) GetActiveJobs ¶
func (*ServerMgr) GetReportMsg ¶
func (*ServerMgr) GetSuspendJobs ¶
func (*ServerMgr) InitMaxJid ¶
func (*ServerMgr) IsJobRegistered ¶
func (*ServerMgr) JobRegister ¶
---job methods---
func (*ServerMgr) LogJobPerf ¶
func (*ServerMgr) RecomputeJob ¶
recompute jobs from specified task stage
func (*ServerMgr) RecoverJobs ¶
recover jobs not completed before awe-server restarts
func (*ServerMgr) ResubmitJob ¶
re-submit a job in db but not in the queue (caused by server restarting)
func (*ServerMgr) ResumeSuspendedJob ¶
resubmit a suspended job
func (*ServerMgr) ResumeSuspendedJobByUser ¶ added in v0.9.3
resubmit a suspended job if the user is authorized
func (*ServerMgr) ResumeSuspendedJobs ¶
func (*ServerMgr) ResumeSuspendedJobsByUser ¶ added in v0.9.3
func (*ServerMgr) SaveStdLog ¶
func (*ServerMgr) ShowStatus ¶
func (*ServerMgr) SuspendJob ¶
func (*ServerMgr) UpdateGroup ¶
update job group
func (*ServerMgr) UpdateJobPerfStartTime ¶
func (*ServerMgr) UpdateJobTaskToInProgress ¶
update job/task states from "queued" to "in-progress" once the first workunit is checked out
func (*ServerMgr) UpdatePriority ¶ added in v0.9.3
func (*ServerMgr) UpdateTaskPerfStartTime ¶
type Task ¶
// Task is one task record.
//
// Info, Partition, WorkStatus and Skip carry the json:"-" tag, so they have
// a BSON key but are skipped by encoding/json. AppVariables has no struct
// tags at all, so each encoder applies its own default naming to it.
// The three date fields use camelCase BSON keys but all-lowercase JSON keys;
// the two spellings are intentional parts of the stored/wire format as
// declared here and must not be "normalized".
type Task struct {
	Id      string   `bson:"taskid" json:"taskid"`
	Info    *Info    `bson:"info" json:"-"`
	Inputs  IOmap    `bson:"inputs" json:"inputs"`
	Outputs IOmap    `bson:"outputs" json:"outputs"`
	Predata IOmap    `bson:"predata" json:"predata"`
	Cmd     *Command `bson:"cmd" json:"cmd"`
	App     *App     `bson:"app" json:"app"`
	// not in App as workunit does not need AppVariables and I want to pass App
	AppVariables  AppVariables
	Partition     *PartInfo         `bson:"partinfo" json:"-"`
	DependsOn     []string          `bson:"dependsOn" json:"dependsOn"`
	TotalWork     int               `bson:"totalwork" json:"totalwork"`
	MaxWorkSize   int               `bson:"maxworksize" json:"maxworksize"`
	RemainWork    int               `bson:"remainwork" json:"remainwork"`
	WorkStatus    []string          `bson:"workstatus" json:"-"`
	State         string            `bson:"state" json:"state"`
	Skip          int               `bson:"skip" json:"-"`
	CreatedDate   time.Time         `bson:"createdDate" json:"createddate"`
	StartedDate   time.Time         `bson:"startedDate" json:"starteddate"`
	CompletedDate time.Time         `bson:"completedDate" json:"completeddate"`
	ComputeTime   int               `bson:"computetime" json:"computetime"`
	UserAttr      map[string]string `bson:"userattr" json:"userattr"`
	ClientGroups  string            `bson:"clientgroups" json:"clientgroups"`
}
func (*Task) CreateIndex ¶ added in v0.9.4
func (*Task) DeleteInput ¶ added in v0.9.12
func (task *Task) DeleteInput()
func (*Task) DeleteOutput ¶ added in v0.9.3
func (task *Task) DeleteOutput()
func (*Task) InitPartIndex ¶
Gets the part size based on partition/index info. If the index info cannot be obtained, task.TotalWork falls back to 1 and nil is returned.
func (*Task) ParseWorkunit ¶
func (*Task) UpdateState ¶
type TaskPerf ¶
// TaskPerf holds four int64 timing values for a task plus two lists of
// input and output file sizes.
// NOTE(review): the unit of the timing values and of the sizes is not
// visible here — confirm.
type TaskPerf struct {
	Queued int64 `bson:"queued" json:"queued"`
	Start  int64 `bson:"start" json:"start"`
	End    int64 `bson:"end" json:"end"`
	// Resp is End - Queued.
	Resp int64 `bson:"resp" json:"resp"`
	// The size lists are serialized under "size_infile" and "size_outfile".
	InFileSizes  []int64 `bson:"size_infile" json:"size_infile"`
	OutFileSizes []int64 `bson:"size_outfile" json:"size_outfile"`
}
func NewTaskPerf ¶
type VariableExpander ¶ added in v0.9.3
type VariableExpander struct {
// contains filtered or unexported fields
}
func NewVariableExpander ¶ added in v0.9.3
func NewVariableExpander(app_variables AppVariables) VariableExpander
type WorkMgr ¶
// WorkMgr is the set of workunit operations: lookup by id, listing
// (optionally per user), checkout, status notification, enqueueing, and
// fetching a data token or private environment.
// NOTE(review): the parameters are unnamed in this declaration; the meaning
// of each string/int argument (e.g. client id, workunit id, count) must be
// confirmed against the implementations.
type WorkMgr interface {
	GetWorkById(string) (*Workunit, error)
	ShowWorkunits(string) []*Workunit
	ShowWorkunitsByUser(string, *user.User) []*Workunit
	CheckoutWorkunits(string, string, int) ([]*Workunit, error)
	NotifyWorkStatus(Notice)
	EnqueueWorkunit(*Workunit) error
	FetchDataToken(string, string) (string, error)
	FetchPrivateEnv(string, string) (map[string]string, error)
}
type WorkPerf ¶
// WorkPerf holds the performance record of a single workunit: server-side
// and client-side timestamps, data-transfer and compute durations, memory
// figures, the client id, and the sizes of data moved over the network.
type WorkPerf struct {
	// WQ (queued at server or client, depending on who creates it)
	Queued int64 `bson:"queued" json:"queued"`
	// WD (done at server)
	Done int64 `bson:"done" json:"done"`
	// Done - Queued (server metric)
	Resp int64 `bson:"resp" json:"resp"`
	// checkout at client
	Checkout int64 `bson:"checkout" json:"checkout"`
	// done at client
	Deliver int64 `bson:"deliver" json:"deliver"`
	// Deliver - Checkout (client metric)
	ClientResp int64 `bson:"clientresp" json:"clientresp"`
	// time in seconds for downloading prerequisite data at client
	PreDataIn float64 `bson:"time_predata_in" json:"time_predata_in"`
	// time in seconds for input data move-in at client
	DataIn float64 `bson:"time_data_in" json:"time_data_in"`
	// time in seconds for output data move-out at client
	DataOut float64 `bson:"time_data_out" json:"time_data_out"`
	// time in seconds for computation at client
	Runtime int64 `bson:"runtime" json:"runtime"`
	// time in seconds for docker preparation on client
	DockerPrep int64 `bson:"dockerprep" json:"dockerprep"`
	// maximum memory consumption
	// NOTE(review): unit (bytes vs. KiB) is not visible here — confirm.
	MaxMemUsage        int64  `bson:"max_mem_usage" json:"max_mem_usage"`
	MaxMemoryTotalRss  int64  `bson:"max_memory_total_rss" json:"max_memory_total_rss"`
	MaxMemoryTotalSwap int64  `bson:"max_memory_total_swap" json:"max_memory_total_swap"`
	ClientId           string `bson:"client_id" json:"client_id"`
	// predata moved over network
	PreDataSize int64 `bson:"size_predata" json:"size_predata"`
	// input file moved over network
	InFileSize int64 `bson:"size_infile" json:"size_infile"`
	// output file moved over network
	OutFileSize int64 `bson:"size_outfile" json:"size_outfile"`
}
func NewWorkPerf ¶
type Workflow ¶
// Workflow is a workflow document: workflow and job info sections, raw
// inputs and variables as string maps, a data server string, and a list of
// task pointers. Each field is serialized under the same snake_case key in
// BSON and JSON.
// NOTE(review): DataServer is presumably a URL — confirm.
type Workflow struct {
	WfInfo     awf_info          `bson:"workflow_info" json:"workflow_info"`
	JobInfo    awf_jobinfo       `bson:"job_info" json:"job_info"`
	RawInputs  map[string]string `bson:"raw_inputs" json:"raw_inputs"`
	Variables  map[string]string `bson:"variables" json:"variables"`
	DataServer string            `bson:"data_server" json:"data_server"`
	Tasks      []*awf_task       `bson:"tasks" json:"tasks"`
}
type WorkflowMgr ¶
type WorkflowMgr struct {
// contains filtered or unexported fields
}
var (
	// AwfMgr is a package-level *WorkflowMgr.
	// NOTE(review): presumably assigned by InitAwfMgr and nil before that
	// call — confirm.
	AwfMgr *WorkflowMgr
)
func NewWorkflowMgr ¶
func NewWorkflowMgr() *WorkflowMgr
func (*WorkflowMgr) AddWorkflow ¶
func (wfm *WorkflowMgr) AddWorkflow(name string, awf *Workflow)
func (*WorkflowMgr) GetAllWorkflows ¶
func (wfm *WorkflowMgr) GetAllWorkflows() (workflows []*Workflow)
func (*WorkflowMgr) GetWorkflow ¶
func (wfm *WorkflowMgr) GetWorkflow(name string) (awf *Workflow, err error)
func (*WorkflowMgr) LoadWorkflows ¶
func (wfm *WorkflowMgr) LoadWorkflows() (err error)
type Workunit ¶
// Workunit is one workunit record. It carries the same Info, Inputs,
// Outputs, Predata, Cmd and App field types as Task, plus its rank, state,
// failure count, checkout time, client, compute time, notes and user
// attributes. Every field is serialized under the same key in BSON and JSON;
// note that Id is stored as "wuid" and Partition as "part".
// NOTE(review): Rank is presumably this unit's index among TotalWork units
// of its task — confirm.
type Workunit struct {
	Id           string            `bson:"wuid" json:"wuid"`
	Info         *Info             `bson:"info" json:"info"`
	Inputs       IOmap             `bson:"inputs" json:"inputs"`
	Outputs      IOmap             `bson:"outputs" json:"outputs"`
	Predata      IOmap             `bson:"predata" json:"predata"`
	Cmd          *Command          `bson:"cmd" json:"cmd"`
	App          *App              `bson:"app" json:"app"`
	Rank         int               `bson:"rank" json:"rank"`
	TotalWork    int               `bson:"totalwork" json:"totalwork"`
	Partition    *PartInfo         `bson:"part" json:"part"`
	State        string            `bson:"state" json:"state"`
	Failed       int               `bson:"failed" json:"failed"`
	CheckoutTime time.Time         `bson:"checkout_time" json:"checkout_time"`
	Client       string            `bson:"client" json:"client"`
	ComputeTime  int               `bson:"computetime" json:"computetime"`
	Notes        string            `bson:"notes" json:"notes"`
	UserAttr     map[string]string `bson:"userattr" json:"userattr"`
}
func NewWorkunit ¶
func (*Workunit) CDworkpath ¶
type WorkunitsSortby ¶ added in v0.9.8
func (WorkunitsSortby) Len ¶ added in v0.9.8
func (w WorkunitsSortby) Len() int
func (WorkunitsSortby) Less ¶ added in v0.9.8
func (w WorkunitsSortby) Less(i, j int) bool
func (WorkunitsSortby) Swap ¶ added in v0.9.8
func (w WorkunitsSortby) Swap(i, j int)