Documentation ¶
Index ¶
- Constants
- Variables
- func CWLInputCheck(jobInput *cwl.Job_document, cwlWorkflow *cwl.Workflow, ...) (jobInputNew *cwl.Job_document, err error)
- func DBGetJobACL(jobID string) (_acl acl.Acl, err error)
- func DBGetJobWorkflow_instance_One(q bson.M, options *DefaultQueryOptions, do_init bool) (result interface{}, err error)
- func DBGetJobWorkflow_instances(q bson.M, options *DefaultQueryOptions, do_init bool) (results []interface{}, count int, err error)
- func DbFindDistinct(q bson.M, d string) (results interface{}, err error)
- func DbUpdateJobField(jobID string, key string, value interface{}) (err error)
- func DeleteClientGroup(id string) (err error)
- func Deserialize_b64(encoding string, target interface{}) (err error)
- func FixTimeInMap(original_map map[string]interface{}, field string) (err error)
- func GetAdminView(special string) (data []interface{}, err error)
- func GetJobCount(q bson.M) (count int, err error)
- func GetJobIDByTaskIDDeprecated(taskid string) (jobid string, err error)
- func GetJobIDByWorkIDDeprecated(workid string) (jobid string, err error)
- func GetTaskIDByWorkIDDeprecated(workid string) (taskid string, err error)
- func HasInfoField(a string) bool
- func InitAwfMgr()
- func InitClientGroupDB()
- func InitJobDB()
- func InitProxyWorkChan()
- func InitReaper()
- func InitResMgr(service string)
- func IsFirstTask(taskid string) bool
- func IsValidUUID(uuid string) bool
- func NotifyWorkunitProcessed(work *Workunit, perf *WorkPerf) (err error)
- func PushOutputData(work *Workunit) (size int64, err error)
- func ReloadFromDisk(path string) (err error)
- func RemoveWorkFromClient(client *Client, workid Workunit_Unique_Identifier) (err error)
- func SetClientProfile(profile *Client)
- func String2Date(str string) (t time.Time, err error)
- func UpdateJobStateDeprecated(jobid string, newstate string, oldstates []string) (err error)
- type BaseResponse
- type CQMgr
- func (qm *CQMgr) AddClient(client *Client, lock bool) (err error)
- func (qm *CQMgr) CheckClient(client *Client) (ok bool, err error)
- func (qm *CQMgr) CheckoutWorkunits(reqPolicy string, clientID string, client *Client, availableBytes int64, ...) (workunits []*Workunit, err error)
- func (qm *CQMgr) ClientChecker()
- func (qm *CQMgr) ClientHandle()
- func (qm *CQMgr) ClientHeartBeat(id string, cg *ClientGroup, workerstate WorkerState) (hbmsg HeartbeatInstructions, err error)
- func (qm *CQMgr) ClientStatusChangeDeprecated(client *Client, newStatus string, clientWriteLock bool) (err error)
- func (qm *CQMgr) DeleteClients(deleteClients []string)
- func (qm *CQMgr) EnqueueWorkunit(work *Workunit) (err error)
- func (qm *CQMgr) FetchDataToken(workid string, clientid string) (token string, err error)
- func (qm *CQMgr) GetAllClientsByUser(u *user.User) (clients []*Client, err error)
- func (qm *CQMgr) GetClient(id string, lockClientMap bool) (client *Client, ok bool, err error)
- func (qm *CQMgr) GetClientByUser(id string, u *user.User) (client *Client, err error)
- func (qm *CQMgr) GetClientMap() *ClientMap
- func (qm *CQMgr) GetWorkByID(id Workunit_Unique_Identifier) (workunit *Workunit, err error)
- func (qm *CQMgr) HasClient(id string, lockClientMap bool) (has bool, err error)
- func (qm *CQMgr) ListClients() (ids []string, err error)
- func (qm *CQMgr) NotifyWorkStatus(notice Notice)
- func (qm *CQMgr) ReQueueWorkunitByClient(client *Client, clientWriteLock bool) (err error)
- func (qm *CQMgr) RegisterNewClient(files FormFiles, cg *ClientGroup) (client *Client, err error)
- func (qm *CQMgr) RemoveClient(id string, lock bool) (err error)
- func (qm *CQMgr) ResumeClient(id string) (err error)
- func (qm *CQMgr) ResumeClientByUser(id string, u *user.User) (err error)
- func (qm *CQMgr) ResumeSuspendedClients() (count int, err error)
- func (qm *CQMgr) ResumeSuspendedClientsByUser(u *user.User) (count int)
- func (qm *CQMgr) ShowWorkunits(status string) (workunits []*Workunit, err error)
- func (qm *CQMgr) ShowWorkunitsByUser(status string, u *user.User) (workunits []*Workunit)
- func (qm *CQMgr) SuspendAllClients(reason string) (count int, err error)
- func (qm *CQMgr) SuspendAllClientsByUser(u *user.User, reason string) (count int, err error)
- func (qm *CQMgr) SuspendClient(id string, client *Client, reason string, clientWriteLock bool) (err error)
- func (qm *CQMgr) SuspendClientByUser(id string, u *user.User, reason string) (err error)
- func (qm *CQMgr) UpdateSubClients(id string, count int) (err error)
- func (qm *CQMgr) UpdateSubClientsByUser(id string, count int, u *user.User)
- type CWLWorkunit
- type CheckoutRequest
- type Client
- func (client *Client) Add(workid Workunit_Unique_Identifier) (err error)
- func (client *Client) AppendSkipwork(workid Workunit_Unique_Identifier, writeLock bool) (err error)
- func (client *Client) ContainsSkipWorkNolock(workid string) (c bool)
- func (client *Client) GetAck() (ack CoAck, err error)
- func (client *Client) GetBusy(doReadLock bool) (b bool, err error)
- func (client *Client) GetGroup(doReadLock bool) (g string, err error)
- func (client *Client) GetID(doReadLock bool) (s string, err error)
- func (client *Client) GetLastFailed() (count int, err error)
- func (client *Client) GetNewStatus(doReadLock bool) (s string, err error)
- func (client *Client) GetSuspended(doReadLock bool) (s bool, err error)
- func (client *Client) GetTotalCheckout() (count int, err error)
- func (client *Client) GetTotalCompleted() (count int, err error)
- func (client *Client) GetTotalFailed() (count int, err error)
- func (client *Client) IncrementLastFailed(writeLock bool) (value int, err error)
- func (client *Client) IncrementTotalCheckout(err error)
- func (client *Client) IncrementTotalCompleted() (err error)
- func (client *Client) IncrementTotalFailed(writeLock bool) (err error)
- func (client *Client) Init()
- func (client *Client) Marshal() (result []byte, err error)
- func (client *Client) Resume(writeLock bool) (err error)
- func (client *Client) SetBusy(b bool, writeLock bool) (err error)
- func (client *Client) SetOnline(o bool, writeLock bool) (err error)
- func (client *Client) SetStatusDeprecated(s string, writeLock bool) (err error)
- func (client *Client) SetSuspended(s bool, reason string, writeLock bool) (err error)
- func (client *Client) Suspend(reason string, writeLock bool) (err error)
- func (client *Client) UpdateStatus(writeLock bool) (err error)
- type ClientGroup
- func CreateClientGroup(name string, u *user.User) (cg *ClientGroup, err error)
- func LoadClientGroup(id string) (clientgroup *ClientGroup, err error)
- func LoadClientGroupByName(name string) (clientgroup *ClientGroup, err error)
- func LoadClientGroupByToken(token string) (clientgroup *ClientGroup, err error)
- type ClientGroups
- type ClientMap
- func (cl *ClientMap) Add(client *Client, lock bool) (err error)
- func (cl *ClientMap) Delete(clientID string, lock bool) (err error)
- func (cl *ClientMap) Get(clientID string, lock bool) (client *Client, ok bool, err error)
- func (cl *ClientMap) GetClientIds() (ids []string, err error)
- func (cl *ClientMap) GetClients() (clients []*Client, err error)
- func (cl *ClientMap) Has(clientID string, lock bool) (ok bool, err error)
- type ClientMgr
- type ClientWorkMgr
- type Clients
- type CoAck
- type Command
- type CommandP
- type DefaultQueryOptions
- type EnvironP
- type Envs
- type FilterWorkStats
- type FormFile
- type FormFiles
- type HeartbeatInstructions
- type Helper
- type IO
- type IOmap
- type Info
- type Job
- func CWL2AWE(_user *user.User, files FormFiles, jobInput *cwl.Job_document, ...) (job *Job, err error)
- func CreateJobImport(u *user.User, file FormFile) (job *Job, err error)
- func CreateJobUpload(u *user.User, files FormFiles) (job *Job, err error)
- func GetJob(id string) (job *Job, err error)
- func JobDepToJob(jobDep *JobDep) (job *Job, err error)
- func LoadJob(id string) (job *Job, err error)
- func NewJob() (job *Job)
- func ReadJobFile(filename string) (job *Job, err error)
- func (job *Job) AddTask(task *Task) (err error)
- func (job *Job) AddWorkflowInstance(wi *WorkflowInstance, db_sync bool, writeLock bool) (err error)
- func (job *Job) Delete() (err error)
- func (job *Job) FilePath() (path string, err error)
- func (job *Job) GetDataToken() (token string)
- func (job *Job) GetJobLogs() (jlog *JobLog, err error)
- func (job *Job) GetPrivateEnv(taskid string) (env map[string]string, err error)
- func (job *Job) GetRemainTasks() (remainTasks int, err error)
- func (job *Job) GetState(do_lock bool) (state string, err error)
- func (job *Job) GetTasks() (tasks []*Task, err error)
- func (job *Job) GetWorkflowInstance(id string, do_read_lock bool) (wi *WorkflowInstance, ok bool, err error)
- func (job *Job) IncrementRemainTasks(inc int) (err error)
- func (job *Job) IncrementResumed(inc int) (err error)
- func (job *Job) Init() (changed bool, err error)
- func (job *Job) Mkdir() (err error)
- func (job *Job) NumTask() int
- func (job *Job) Path() (path string, err error)
- func (job *Job) RLockRecursive()
- func (job *Job) RUnlockRecursive()
- func (job *Job) Rmdir() (err error)
- func (job *Job) Save() (err error)
- func (job *Job) SaveToDisk() (err error)
- func (job *Job) SetClientgroups(clientgroups string) (err error)
- func (job *Job) SetDataToken(token string) (err error)
- func (job *Job) SetError(newError *JobError) (err error)
- func (job *Job) SetExpiration(expire string) (err error)
- func (job *Job) SetFile(file FormFile) (err error)
- func (job *Job) SetPipeline(pipeline string) (err error)
- func (job *Job) SetPriority(priority int) (err error)
- func (job *Job) SetRemainTasks(remainTasks int) (err error)
- func (job *Job) SetState(newState string, oldstates []string) (err error)
- func (job *Job) TaskList() []*Task
- func (job *Job) UpdateFile(files FormFiles, field string) (err error)
- type JobACL
- type JobDep
- type JobError
- type JobLog
- type JobMap
- type JobMgr
- type JobMin
- type JobP
- type JobPerf
- type JobRaw
- type JobReaper
- type Jobs
- func (n *Jobs) GetAll(q bson.M, order string, direction string, do_init bool) (err error)
- func (n *Jobs) GetAllLimitOffset(q bson.M, limit int, offset int) (err error)
- func (n *Jobs) GetAllRecent(q bson.M, recent int, do_init bool) (count int, err error)
- func (n *Jobs) GetAllUnsorted(q bson.M) (err error)
- func (n *Jobs) GetJobAt(index int) *Job
- func (n *Jobs) GetPaginated(q bson.M, limit int, offset int, order string, direction string, do_init bool) (count int, err error)
- func (n *Jobs) Init() (changed_count int, err error)
- func (n *Jobs) Length() int
- func (n *Jobs) RLockRecursive()
- func (n *Jobs) RUnlockRecursive()
- type MultipartWriter
- func (m *MultipartWriter) AddDataAsFile(fieldname string, filepath string, data *[]byte) (err error)
- func (m *MultipartWriter) AddFile(fieldname string, filepath string) (err error)
- func (m *MultipartWriter) AddForm(fieldname string, value string) (err error)
- func (m *MultipartWriter) Send(method string, url string, header map[string][]string) (response *http.Response, err error)
- type Notice
- type PartInfo
- type Pipeline
- type ProxyMgr
- func (qm *ProxyMgr) ClientChecker()
- func (qm *ProxyMgr) ClientHandle()
- func (qm *ProxyMgr) DeleteJobByUser(jobid string, u *user.User, full bool) (err error)
- func (qm *ProxyMgr) DeleteSuspendedJobsByUser(u *user.User, full bool) (num int)
- func (qm *ProxyMgr) DeleteZombieJobsByUser(u *user.User, full bool) (num int)
- func (qm *ProxyMgr) EnqueueTasksByJobId(jobid string) (err error)
- func (qm *ProxyMgr) FetchDataToken(workunit *Workunit, clientid string) (token string, err error)
- func (qm *ProxyMgr) FetchPrivateEnv(workid string, clientid string) (env map[string]string, err error)
- func (qm *ProxyMgr) FinalizeWorkPerf(string, string) (err error)
- func (qm *ProxyMgr) GetActiveJobs() map[string]bool
- func (qm *ProxyMgr) GetJsonStatus() (status map[string]map[string]int, err error)
- func (qm *ProxyMgr) GetQueue(name string) interface{}
- func (qm *ProxyMgr) GetReportMsg(string, string) (report string, err error)
- func (qm *ProxyMgr) GetSuspendJobs() map[string]bool
- func (qm *ProxyMgr) GetTextStatus() string
- func (qm *ProxyMgr) IsJobRegistered(id string) bool
- func (qm *ProxyMgr) JobRegister() (jid string, err error)
- func (qm *ProxyMgr) Lock()
- func (qm *ProxyMgr) NoticeHandle()
- func (qm *ProxyMgr) QueueStatus() string
- func (qm *ProxyMgr) RLock()
- func (qm *ProxyMgr) RUnlock()
- func (qm *ProxyMgr) RecomputeJob(jobid string, stage string) (err error)
- func (qm *ProxyMgr) RecoverJob(id string, job *Job) (err error)
- func (qm *ProxyMgr) RecoverJobs() (recovered int, total int, err error)
- func (qm *ProxyMgr) RegisterNewClient(files FormFiles, cg *ClientGroup) (client *Client, err error)
- func (qm *ProxyMgr) ResubmitJob(id string) (err error)
- func (qm *ProxyMgr) ResumeQueue()
- func (qm *ProxyMgr) ResumeSuspendedJobByUser(id string, u *user.User) (recovered bool, err error)
- func (qm *ProxyMgr) ResumeSuspendedJobsByUser(u *user.User) (num int)
- func (qm *ProxyMgr) SaveStdLog(string, string, string) (err error)
- func (qm *ProxyMgr) SuspendJob(jobid string, jerror *JobError) (err error)
- func (qm *ProxyMgr) SuspendQueue()
- func (qm *ProxyMgr) Unlock()
- func (qm *ProxyMgr) UpdateQueueLoop()
- func (qm *ProxyMgr) UpdateQueueToken(job *Job) (err error)
- type RequestQueue
- type ResourceMgr
- type ServerMgr
- func (qm *ServerMgr) ClientHandle()
- func (qm *ServerMgr) CreateAndEnqueueWorkunits(task *Task, job *Job) (err error)
- func (qm *ServerMgr) CreateJobPerf(jobid string)
- func (qm *ServerMgr) CreateTaskPerf(task *Task) (err error)
- func (qm *ServerMgr) CreateWorkPerf(id Workunit_Unique_Identifier) (err error)
- func (qm *ServerMgr) DeleteJobByUser(jobid string, u *user.User, full bool) (err error)
- func (qm *ServerMgr) DeleteSuspendedJobsByUser(u *user.User, full bool) (num int)
- func (qm *ServerMgr) DeleteZombieJobsByUser(u *user.User, full bool) (num int)
- func (qm *ServerMgr) EnqueueTasks(tasks []*Task) (err error)
- func (qm *ServerMgr) EnqueueTasksByJobId(jobid string, caller string) (err error)
- func (qm *ServerMgr) EnqueueWorkflowInstance(wi *WorkflowInstance) (err error)
- func (qm *ServerMgr) FetchDataToken(work_id Workunit_Unique_Identifier, clientid string) (token string, err error)
- func (qm *ServerMgr) FetchPrivateEnv(id Workunit_Unique_Identifier, clientid string) (env map[string]string, err error)
- func (qm *ServerMgr) FinalizeJobPerf(jobid string)
- func (qm *ServerMgr) FinalizeTaskPerf(task *Task) (err error)
- func (qm *ServerMgr) FinalizeWorkPerf(id Workunit_Unique_Identifier, reportfile string) (err error)
- func (qm *ServerMgr) GetActiveJobs() (ajobs map[string]bool)
- func (qm *ServerMgr) GetDependencies(job *Job, workflow_instance *WorkflowInstance, ...) (err error)
- func (qm *ServerMgr) GetJsonStatus() (status map[string]map[string]int, err error)
- func (qm *ServerMgr) GetQueue(name string) interface{}
- func (qm *ServerMgr) GetReportMsg(id Workunit_Unique_Identifier, logname string) (report string, err error)
- func (qm *ServerMgr) GetSourceFromWorkflowInstanceInput(workflow_instance *WorkflowInstance, src string, context *cwl.WorkflowContext, ...) (obj cwl.CWLType, ok bool, reason string, err error)
- func (qm *ServerMgr) GetStepInputObjects(job *Job, workflow_instance *WorkflowInstance, ...) (workunit_input_map cwl.JobDocMap, ok bool, reason string, err error)
- func (qm *ServerMgr) GetSuspendJobs() (sjobs map[string]bool)
- func (qm *ServerMgr) GetTextStatus() string
- func (qm *ServerMgr) IsJobRegistered(id string) bool
- func (qm *ServerMgr) Is_WI_ready(job *Job, wi *WorkflowInstance) (ready bool, reason string, err error)
- func (qm *ServerMgr) Lock()
- func (qm *ServerMgr) LogJobPerf(jobid string)
- func (qm *ServerMgr) NoticeHandle()
- func (qm *ServerMgr) QueueStatus() string
- func (qm *ServerMgr) RLock()
- func (qm *ServerMgr) RUnlock()
- func (qm *ServerMgr) RecomputeJob(jobid string, task_stage string) (err error)
- func (qm *ServerMgr) RecoverJob(id string, job *Job) (recovered bool, err error)
- func (qm *ServerMgr) RecoverJobs() (recovered int, total int, err error)
- func (qm *ServerMgr) ResubmitJob(jobid string) (err error)
- func (qm *ServerMgr) ResumeQueue()
- func (qm *ServerMgr) ResumeSuspendedJobByUser(id string, u *user.User) (err error)
- func (qm *ServerMgr) ResumeSuspendedJobsByUser(u *user.User) (num int)
- func (qm *ServerMgr) SaveStdLog(id Workunit_Unique_Identifier, logname string, tmppath string) (err error)
- func (qm *ServerMgr) SuspendJob(jobid string, jerror *JobError) (err error)
- func (qm *ServerMgr) SuspendQueue()
- func (qm *ServerMgr) Unlock()
- func (qm *ServerMgr) UpdateJobPerfStartTime(jobid string)
- func (qm *ServerMgr) UpdateJobTaskToInProgress(works []*Workunit) (err error)
- func (qm *ServerMgr) UpdateQueueLoop()
- func (qm *ServerMgr) UpdateQueueToken(job *Job) (err error)
- func (qm *ServerMgr) UpdateTaskPerfStartTime(task *Task) (err error)
- type SetCounter
- type StandardResponse
- type StructContainer
- type Task
- func CreateWorkflowTasks(job *Job, namePrefix string, steps []cwl.WorkflowStep, stepPrefix string, ...) (tasks []*Task, err error)
- func NewTask(job *Job, workflow_instance_id string, task_id_str string) (t *Task, err error)
- func NewTaskFromInterface(original interface{}, context *cwl.WorkflowContext) (task *Task, err error)
- func NewTasksFromInterface(original interface{}, context *cwl.WorkflowContext) (tasks []*Task, err error)
- func (task *Task) CollectDependencies() (changed bool, err error)
- func (task *Task) CreateInputIndexes() (err error)
- func (task *Task) CreateOutputIndexes() (err error)
- func (task *Task) CreateWorkunits(qm *ServerMgr, job *Job) (wus []*Workunit, err error)
- func (task *Task) DeleteInput() (modified int)
- func (task *Task) DeleteLogs(logname string, writelock bool) (err error)
- func (task *Task) DeleteOutput() (modified int)
- func (task *Task) GetOutput(filename string) (output *IO, err error)
- func (task *Task) GetOutputs() (outputs []*IO, err error)
- func (task *Task) GetStepOutput(name string) (obj cwl.CWLType, ok bool, reason string, err error)
- func (task *Task) GetStepOutputNames() (names []string, err error)
- func (task *Task) GetTaskLogs() (tlog *TaskLog, err error)
- func (task *Task) IncrementComputeTime(inc int) (err error)
- func (task *Task) IncrementRemainWork(inc int, writelock bool) (remainwork int, err error)
- func (task *Task) Init(job *Job, job_id string) (changed bool, err error)
- func (task *Task) InitPartIndex() (err error)
- func (task *Task) ResetTaskTrue(name string) (err error)
- func (task *Task) SetRemainWork(num int, writelock bool) (err error)
- func (task *Task) SetResetTask(info *Info) (err error)
- func (task *Task) SetResetTaskInputs() (err error)
- func (task *Task) SetResetTaskOutputs() (err error)
- func (task *Task) SetTaskType(type_str string, writelock bool) (err error)
- func (task *Task) ValidateDependants(qm *ServerMgr) (reason string, err error)
- func (task *Task) ValidateInputs(qm *ServerMgr) (err error)
- func (task *Task) ValidateOutputs() (err error)
- func (task *Task) ValidatePredata() (err error)
- type TaskDep
- type TaskLog
- type TaskMap
- func (tm *TaskMap) Add(task *Task, caller string) (err error)
- func (tm *TaskMap) Delete(taskid Task_Unique_Identifier) (task *Task, ok bool, err error)
- func (tm *TaskMap) Get(taskid Task_Unique_Identifier, lock bool) (task *Task, ok bool, err error)
- func (tm *TaskMap) GetTasks() (tasks []*Task, err error)
- func (tm *TaskMap) Has(taskid Task_Unique_Identifier, lock bool) (ok bool, err error)
- func (tm *TaskMap) Len() (length int, err error)
- type TaskP
- type TaskPerf
- type TaskRaw
- func (task *TaskRaw) Finalize() (ok bool, err error)
- func (task *TaskRaw) GetDependsOn() (dep []string, err error)
- func (task *TaskRaw) GetId(me string) (id Task_Unique_Identifier, err error)
- func (task *TaskRaw) GetJob() (job *Job, err error)
- func (task *TaskRaw) GetJobId() (id string, err error)
- func (task *TaskRaw) GetScatterChildren(wi *WorkflowInstance, qm *ServerMgr) (children []*Task, err error)
- func (task *TaskRaw) GetState() (state string, err error)
- func (task *TaskRaw) GetStateNamed(name string) (state string, err error)
- func (task *TaskRaw) GetTaskType() (type_str string, err error)
- func (task *TaskRaw) GetWorkflowInstance() (wi *WorkflowInstance, ok bool, err error)
- func (task *TaskRaw) InitRaw(job *Job, job_id string) (changed bool, err error)
- func (task *TaskRaw) SetCompletedDate(t time.Time, lock bool) (err error)
- func (task *TaskRaw) SetCreatedDate(t time.Time) (err error)
- func (task *TaskRaw) SetScatterChildren(qm *ServerMgr, scatterChildren []string, writelock bool) (err error)
- func (task *TaskRaw) SetStartedDate(t time.Time) (err error)
- func (task *TaskRaw) SetState(wi *WorkflowInstance, new_state string, writeLock bool) (err error)
- func (task *TaskRaw) SetStepOutput(jd *cwl.Job_document, lock bool) (err error)
- type Task_Unique_Identifier
- type WorkList
- type WorkLog
- type WorkMgr
- type WorkPerf
- type WorkQueue
- func (wq *WorkQueue) Add(workunit *Workunit) (err error)
- func (wq *WorkQueue) Clean() (workunits []*Workunit)
- func (wq *WorkQueue) Delete(id Workunit_Unique_Identifier) (err error)
- func (wq *WorkQueue) Get(id Workunit_Unique_Identifier) (w *Workunit, ok bool, err error)
- func (wq *WorkQueue) GetAll() (worklist []*Workunit, err error)
- func (wq *WorkQueue) GetForJob(jobid string) (worklist []*Workunit, err error)
- func (wq *WorkQueue) Has(id Workunit_Unique_Identifier) (has bool, err error)
- func (wq *WorkQueue) Len() (int, error)
- func (wq *WorkQueue) StatusChange(id Workunit_Unique_Identifier, workunit *Workunit, new_status string, ...) (err error)
- type WorkerRuntime
- type WorkerState
- type Workflow
- type WorkflowInstance
- func NewWorkflowInstance(localID string, jobid string, workflowDefinition string, job *Job, ...) (wi *WorkflowInstance, err error)
- func NewWorkflowInstanceArrayFromInterface(original []interface{}, job *Job, context *cwl.WorkflowContext) (wis []*WorkflowInstance, err error)
- func NewWorkflowInstanceFromInterface(original interface{}, job *Job, context *cwl.WorkflowContext, doInit bool) (wi *WorkflowInstance, err error)
- func (wi *WorkflowInstance) AddSubworkflow(job *Job, subworkflow string, writeLock bool) (err error)
- func (wi *WorkflowInstance) AddTask(job *Job, task *Task, dbSync bool, writeLock bool) (err error)
- func (wi *WorkflowInstance) GetID(readLock bool) (id string, err error)
- func (wi *WorkflowInstance) GetJob(readLock bool) (job *Job, err error)
- func (wi *WorkflowInstance) GetOutput(name string, readLock bool) (obj cwl.CWLType, ok bool, err error)
- func (wi *WorkflowInstance) GetParent(readLock bool) (parent *WorkflowInstance, err error)
- func (wi *WorkflowInstance) GetParentID(readLock bool) (parentID string, err error)
- func (wi *WorkflowInstance) GetParentRaw(readLock bool) (parent *WorkflowInstance, err error)
- func (wi *WorkflowInstance) GetParentStep_DEPRECATED(readLock bool) (pstep *cwl.WorkflowStep, err error)
- func (wi *WorkflowInstance) GetParentStep_cached_DEPRECATED() (pstep *cwl.WorkflowStep, err error)
- func (wi *WorkflowInstance) GetRemainSteps(readLock bool) (remain int, err error)
- func (wi *WorkflowInstance) GetState(readLock bool) (state string, err error)
- func (wi *WorkflowInstance) GetTask(taskID Task_Unique_Identifier, readLock bool) (task *Task, ok bool, err error)
- func (wi *WorkflowInstance) GetTaskByName(taskName string, readLock bool) (task *Task, ok bool, err error)
- func (wi *WorkflowInstance) GetTasks(readLock bool) (tasks []*Task, err error)
- func (wi *WorkflowInstance) GetWorkflow(context *cwl.WorkflowContext) (workflow *cwl.Workflow, err error)
- func (wi *WorkflowInstance) IncrementRemainSteps(amount int, writeLock bool) (remain int, err error)
- func (wi *WorkflowInstance) Init(job *Job) (changed bool, err error)
- func (wi *WorkflowInstance) Save(readLock bool) (err error)
- func (wi *WorkflowInstance) SetOutputs(outputs cwl.Job_document, context *cwl.WorkflowContext, writeLock bool) (err error)
- func (wi *WorkflowInstance) SetState(state string, dbSync bool, writeLock bool) (err error)
- func (wi *WorkflowInstance) SetSubworkflows(steps []string, writeLock bool) (err error)
- func (wi *WorkflowInstance) TaskCount() (count int)
- type WorkflowInstanceMap
- type WorkflowMgr
- type Workunit
- func (work *Workunit) CDworkpath() (err error)
- func (w *Workunit) Evaluate(inputs interface{}, context *cwl.WorkflowContext) (err error)
- func (work *Workunit) FetchDataToken() (token string, err error)
- func (w *Workunit) GetID() (id Workunit_Unique_Identifier)
- func (work *Workunit) GetIdBase64() (work_id_b64 string, err error)
- func (work *Workunit) GetNotes() string
- func (work *Workunit) Mkdir() (err error)
- func (work *Workunit) Part() (part string)
- func (work *Workunit) Path() (path string, err error)
- func (work *Workunit) RemoveDir() (err error)
- func (work *Workunit) SetState(new_state string, reason string) (err error)
- type WorkunitList
- func (cl *WorkunitList) Add(workid Workunit_Unique_Identifier) (err error)
- func (cl *WorkunitList) Delete(workid Workunit_Unique_Identifier, writeLock bool) (err error)
- func (cl *WorkunitList) Delete_all(workid string, writeLock bool) (err error)
- func (cl *WorkunitList) FillMap() (err error)
- func (cl *WorkunitList) Get_list(do_read_lock bool) (assigned_work_ids []Workunit_Unique_Identifier, err error)
- func (cl *WorkunitList) Get_string_list(do_read_lock bool) (work_ids []string, err error)
- func (cl *WorkunitList) Has(workid Workunit_Unique_Identifier) (ok bool, err error)
- func (this *WorkunitList) Init(name string)
- func (cl *WorkunitList) Length(lock bool) (clength int, err error)
- type WorkunitMap
- func (wm *WorkunitMap) Delete(id Workunit_Unique_Identifier) (err error)
- func (wm *WorkunitMap) Get(id Workunit_Unique_Identifier) (workunit *Workunit, ok bool, err error)
- func (wm *WorkunitMap) GetWorkunits() (workunits []*Workunit, err error)
- func (wm *WorkunitMap) Len() (length int, err error)
- func (wm *WorkunitMap) Set(workunit *Workunit) (err error)
- type WorkunitState
- type Workunit_Unique_Identifier
- func New_Workunit_Unique_Identifier(task Task_Unique_Identifier, rank int) (wui Workunit_Unique_Identifier)
- func New_Workunit_Unique_Identifier_FromString(old_style_id string) (w Workunit_Unique_Identifier, err error)
- func New_Workunit_Unique_Identifier_from_interface(original interface{}) (wui Workunit_Unique_Identifier, err error)
- type WorkunitsSortby
Constants ¶
const ( // DbSyncTrue _ DbSyncTrue = true // DbSyncFalse _ DbSyncFalse = false )
const ( JOB_STAT_INIT = "init" // initial state JOB_STAT_QUEUING = "queuing" // transition from "init" to "queued" JOB_STAT_QUEUED = "queued" // all tasks have been added to taskmap JOB_STAT_INPROGRESS = "in-progress" // a first task went into state in-progress JOB_STAT_COMPLETED = "completed" JOB_STAT_SUSPEND = "suspend" JOB_STAT_FAILED_PERMANENT = "failed-permanent" // this specific error state can be triggered by the workflow software JOB_STAT_DELETED = "deleted" )
const ( TASK_STAT_INIT = "init" // initial state on creation of a task TASK_STAT_PENDING = "pending" // a task that wants to be enqueued (but dependent tasks are not complete) TASK_STAT_READY = "ready" // a task ready to be enqueued (all dependent tasks are complete, but workunits have not yet been created) TASK_STAT_QUEUED = "queued" // a task for which workunits have been created/queued TASK_STAT_INPROGRESS = "in-progress" // a first workunit has been checked out (this does not guarantee a workunit is running right now) TASK_STAT_SUSPEND = "suspend" TASK_STAT_FAILED = "failed" // deprecated ? TASK_STAT_FAILED_PERMANENT = "failed-permanent" // on exit code 42 TASK_STAT_COMPLETED = "completed" TASK_STAT_SKIPPED = "user_skipped" // deprecated TASK_STAT_FAIL_SKIP = "skipped" // deprecated TASK_STAT_PASSED = "passed" // deprecated ? )
const ( TASK_TYPE_UNKNOWN = "" TASK_TYPE_SCATTER = "scatter" //TASK_TYPE_WORKFLOW = "workflow" TASK_TYPE_NORMAL = "normal" )
const ( // WIStateInit initial state on creation WIStateInit = "init" // WIStatePending wants to be enqueued but may have unresolved dependencies WIStatePending = "pending" // WIStateReady a task ready to be enqueued/evaluated (tasks can be enqueued) WIStateReady = "ready" // WIStateQueued tasks have been created WIStateQueued = "queued" // WIStateCompleted _ WIStateCompleted = "completed" // WIStateSuspended _ WIStateSuspended = "suspend" )
const ( WORK_STAT_INIT = "init" // initial state WORK_STAT_QUEUED = "queued" // after requeue ; after failures below max ; on WorkQueue.Add() WORK_STAT_RESERVED = "reserved" // short lived state between queued and checkout. when a worker checks the workunit out, the state is reserved. WORK_STAT_CHECKOUT = "checkout" // normal work checkout ; client registers that already has a workunit (e.g. after reboot of server) WORK_STAT_SUSPEND = "suspend" // on MAX_FAILURE ; on SuspendJob WORK_STAT_FAILED_PERMANENT = "failed-permanent" // app had exit code 42 WORK_STAT_DONE = "done" // client only: done WORK_STAT_ERROR = "fail" // client only: workunit computation or IO error (variable was renamed to ERROR but not the string fail, to maintain backwards compatibility) WORK_STAT_PREPARED = "prepared" // client only: after argument parsing WORK_STAT_COMPUTED = "computed" // client only: after computation is done, before upload WORK_STAT_DISCARDED = "discarded" // client only: job / task suspended or server UUID changes WORK_STAT_PROXYQUEUED = "proxyqueued" // proxy only )
Variables ¶
var ( //QMgr ResourceMgr QMgr *ServerMgr // Service _ Service = "unknown" // Self _ Self *Client // ProxyWorkChan _ ProxyWorkChan chan bool // ServerUUID _ ServerUUID string // JM _ JM *JobMap // GlobalWorkflowInstanceMap _ GlobalWorkflowInstanceMap *WorkflowInstanceMap // StartTime _ StartTime time.Time )
var ( // CGNameRegex _ CGNameRegex = regexp.MustCompile(`^[A-Za-z0-9\_\-\.]+$`) )
var DocumentMaxByte = 16777216
MongoDB has a hard limit of 16 MB on document size.
var JOB_STATS_ACTIVE = []string{JOB_STAT_QUEUING, JOB_STAT_QUEUED, JOB_STAT_INPROGRESS}
var JOB_STATS_REGISTERED = []string{JOB_STAT_QUEUING, JOB_STAT_QUEUED, JOB_STAT_INPROGRESS, JOB_STAT_SUSPEND}
var JOB_STATS_TO_RECOVER = []string{JOB_STAT_INIT, JOB_STAT_QUEUING, JOB_STAT_QUEUED, JOB_STAT_INPROGRESS, JOB_STAT_SUSPEND}
var JobInfoIndexes = []string{"name", "submittime", "completedtime", "pipeline", "clientgroups", "project", "service", "user", "priority", "userattr.submission"}
JobInfoIndexes _
var TASK_STATS_RESET = []string{TASK_STAT_QUEUED, TASK_STAT_INPROGRESS, TASK_STAT_SUSPEND}
Functions ¶
func CWLInputCheck ¶
func CWLInputCheck(jobInput *cwl.Job_document, cwlWorkflow *cwl.Workflow, context *cwl.WorkflowContext) (jobInputNew *cwl.Job_document, err error)
CWLInputCheck _
func DBGetJobWorkflow_instance_One ¶
func DBGetJobWorkflow_instance_One(q bson.M, options *DefaultQueryOptions, do_init bool) (result interface{}, err error)
func DbFindDistinct ¶ added in v0.9.26
DbFindDistinct _
func DbUpdateJobField ¶ added in v0.9.33
DbUpdateJobField _
func DeleteClientGroup ¶ added in v0.9.3
DeleteClientGroup _
func Deserialize_b64 ¶ added in v0.9.62
func FixTimeInMap ¶
func GetAdminView ¶ added in v0.9.62
GetAdminView patches the admin view data function through from the job controller to db.go.
func GetJobIDByTaskIDDeprecated ¶
GetJobIDByTaskIDDeprecated _
func GetJobIDByWorkIDDeprecated ¶
GetJobIDByWorkIDDeprecated _
func GetTaskIDByWorkIDDeprecated ¶
GetTaskIDByWorkIDDeprecated _
func InitReaper ¶ added in v0.9.26
func InitReaper()
func IsValidUUID ¶ added in v0.9.62
func NotifyWorkunitProcessed ¶
NotifyWorkunitProcessed notifies the AWE server that a workunit is finished with status either "failed" or "done", and with perf statistics if "done".
func PushOutputData ¶
PushOutputData deprecated, see cache.UploadOutputData
func ReloadFromDisk ¶
func RemoveWorkFromClient ¶ added in v0.9.45
func RemoveWorkFromClient(client *Client, workid Workunit_Unique_Identifier) (err error)
Types ¶
type BaseResponse ¶ added in v0.9.62
BaseResponse _
type CQMgr ¶
type CQMgr struct {
// contains filtered or unexported fields
}
CQMgr this struct is embedded in ServerMgr
func (*CQMgr) CheckClient ¶ added in v0.9.33
CheckClient _
func (*CQMgr) CheckoutWorkunits ¶
func (qm *CQMgr) CheckoutWorkunits(reqPolicy string, clientID string, client *Client, availableBytes int64, num int) (workunits []*Workunit, err error)
CheckoutWorkunits _
func (*CQMgr) ClientHeartBeat ¶
func (qm *CQMgr) ClientHeartBeat(id string, cg *ClientGroup, workerstate WorkerState) (hbmsg HeartbeatInstructions, err error)
ClientHeartBeat is invoked on the server side when a client sends a heartbeat
func (*CQMgr) ClientStatusChangeDeprecated ¶
func (qm *CQMgr) ClientStatusChangeDeprecated(client *Client, newStatus string, clientWriteLock bool) (err error)
ClientStatusChangeDeprecated _
func (*CQMgr) DeleteClients ¶ added in v0.9.33
DeleteClients _
func (*CQMgr) EnqueueWorkunit ¶
EnqueueWorkunit _
func (*CQMgr) FetchDataToken ¶
FetchDataToken _
func (*CQMgr) GetAllClientsByUser ¶ added in v0.9.3
GetAllClientsByUser _
func (*CQMgr) GetClientByUser ¶ added in v0.9.3
GetClientByUser _
func (*CQMgr) GetClientMap ¶ added in v0.9.33
GetClientMap _
func (*CQMgr) GetWorkByID ¶
func (qm *CQMgr) GetWorkByID(id Workunit_Unique_Identifier) (workunit *Workunit, err error)
GetWorkByID _
func (*CQMgr) ListClients ¶ added in v0.9.16
ListClients _
func (*CQMgr) NotifyWorkStatus ¶
NotifyWorkStatus _
func (*CQMgr) ReQueueWorkunitByClient ¶
ReQueueWorkunitByClient _
func (*CQMgr) RegisterNewClient ¶
func (qm *CQMgr) RegisterNewClient(files FormFiles, cg *ClientGroup) (client *Client, err error)
RegisterNewClient This can be a new client or an old client that re-registers
func (*CQMgr) RemoveClient ¶ added in v0.9.16
RemoveClient lock is for clientmap
func (*CQMgr) ResumeClientByUser ¶ added in v0.9.3
ResumeClientByUser _
func (*CQMgr) ResumeSuspendedClients ¶
ResumeSuspendedClients _
func (*CQMgr) ResumeSuspendedClientsByUser ¶ added in v0.9.3
ResumeSuspendedClientsByUser _
func (*CQMgr) ShowWorkunits ¶
ShowWorkunits _
func (*CQMgr) ShowWorkunitsByUser ¶ added in v0.9.3
ShowWorkunitsByUser _
func (*CQMgr) SuspendAllClients ¶
SuspendAllClients _
func (*CQMgr) SuspendAllClientsByUser ¶ added in v0.9.3
SuspendAllClientsByUser _
func (*CQMgr) SuspendClient ¶
func (qm *CQMgr) SuspendClient(id string, client *Client, reason string, clientWriteLock bool) (err error)
SuspendClient use id OR client
func (*CQMgr) SuspendClientByUser ¶ added in v0.9.3
SuspendClientByUser _
func (*CQMgr) UpdateSubClients ¶
UpdateSubClients _
type CWLWorkunit ¶
type CWLWorkunit struct { JobInput *cwl.Job_document `bson:"job_input,omitempty" json:"job_input,omitempty" mapstructure:"job_input,omitempty"` JobInputFilename string `bson:"job_input_filename,omitempty" json:"job_input_filename,omitempty" mapstructure:"job_input_filename,omitempty"` //CWL_tool *cwl.CommandLineTool `bson:"cwl_tool,omitempty" json:"cwl_tool,omitempty" mapstructure:"cwl_tool,omitempty"` //CWL_tool_filename string `bson:"cwl_tool_filename,omitempty" json:"cwl_tool_filename,omitempty" mapstructure:"cwl_tool_filename,omitempty"` Tool interface{} `bson:"tool,omitempty" json:"tool,omitempty" mapstructure:"tool,omitempty"` ToolFilename string `bson:"tool_filename,omitempty" json:"tool_filename,omitempty" mapstructure:"tool_filename,omitempty"` Outputs *cwl.Job_document `bson:"outputs,omitempty" json:"outputs,omitempty" mapstructure:"outputs,omitempty"` OutputsExpected *[]cwl.WorkflowStepOutput `bson:"outputs_expected,omitempty" json:"outputs_expected,omitempty" mapstructure:"outputs_expected,omitempty"` // this is the subset of outputs that are needed by the workflow Notice `bson:",inline" json:",inline" mapstructure:",squash"` }
CWLWorkunit _
func NewCWLWorkunitFromInterface ¶
func NewCWLWorkunitFromInterface(native interface{}, context *cwl.WorkflowContext) (workunit *CWLWorkunit, schemata []cwl.CWLType_Type, err error)
NewCWLWorkunitFromInterface _
type CheckoutRequest ¶
type CheckoutRequest struct {
// contains filtered or unexported fields
}
CheckoutRequest Object used by worker to request a workunit
type Client ¶
type Client struct { rwmutex.RWMutex `bson:"-" json:"-"` WorkerRuntime `bson:",inline" json:",inline"` WorkerState `bson:",inline" json:",inline"` RegTime time.Time `bson:"regtime" json:"regtime"` LastCompleted time.Time `bson:"lastcompleted" json:"lastcompleted"` // time of last time a job was completed (can be used to compute idle time) ServeTime string `bson:"serve_time" json:"serve_time"` TotalCheckout int `bson:"total_checkout" json:"total_checkout"` TotalCompleted int `bson:"total_completed" json:"total_completed"` TotalFailed int `bson:"total_failed" json:"total_failed"` SkipWork []string `bson:"skip_work" json:"skip_work"` LastFailed int `bson:"-" json:"-"` Tag bool `bson:"-" json:"-"` Proxy bool `bson:"proxy" json:"proxy"` SubClients int `bson:"subclients" json:"subclients"` Online bool `bson:"online" json:"online"` // a state Suspended bool `bson:"suspended" json:"suspended"` // a state SuspendReason string `bson:"suspend_reason" json:"suspend_reason"` // a state Status string `bson:"Status" json:"Status"` // 0) unhealthy 1) suspended? 2) busy ? 3) online (call is idle) 4) offline AssignedWork *WorkunitList `bson:"assigned_work" json:"assigned_work"` // this is for exporting into json // contains filtered or unexported fields }
Client this is the Worker
func NewProfileClient ¶
NewProfileClient : create Client object from json file
func (*Client) Add ¶ added in v0.9.62
func (client *Client) Add(workid Workunit_Unique_Identifier) (err error)
Add _
func (*Client) AppendSkipwork ¶
func (client *Client) AppendSkipwork(workid Workunit_Unique_Identifier, writeLock bool) (err error)
AppendSkipwork _
func (*Client) ContainsSkipWorkNolock ¶
ContainsSkipWorkNolock _
func (*Client) GetLastFailed ¶
GetLastFailed _
func (*Client) GetNewStatus ¶
GetNewStatus should not be used internally; it exists only for backwards-compatibility and human readability
func (*Client) GetSuspended ¶
GetSuspended _
func (*Client) GetTotalCheckout ¶
GetTotalCheckout _
func (*Client) GetTotalCompleted ¶
GetTotalCompleted _
func (*Client) GetTotalFailed ¶
GetTotalFailed _
func (*Client) IncrementLastFailed ¶
IncrementLastFailed _
func (*Client) IncrementTotalCheckout ¶
IncrementTotalCheckout _
func (*Client) IncrementTotalCompleted ¶
IncrementTotalCompleted _
func (*Client) IncrementTotalFailed ¶
IncrementTotalFailed _
func (*Client) Init ¶ added in v0.9.33
func (client *Client) Init()
Init : invoked by NewClient or manually after unmarshalling
func (*Client) SetStatusDeprecated ¶
SetStatusDeprecated _
func (*Client) SetSuspended ¶
SetSuspended _
func (*Client) UpdateStatus ¶
UpdateStatus _
type ClientGroup ¶ added in v0.9.3
type ClientGroup struct { ID string `bson:"id" json:"id"` IPCidr string `bson:"ip_cidr" json:"ip_cidr"` Name string `bson:"name" json:"name"` Token string `bson:"token" json:"token"` ACL clientGroupAcl.ClientGroupAcl `bson:"acl" json:"-"` CreatedOn time.Time `bson:"created_on" json:"created_on"` Expiration time.Time `bson:"expiration" json:"expiration"` LastModified time.Time `bson:"last_modified" json:"last_modified"` }
ClientGroup _
func CreateClientGroup ¶ added in v0.9.3
func CreateClientGroup(name string, u *user.User) (cg *ClientGroup, err error)
CreateClientGroup _
func LoadClientGroup ¶ added in v0.9.3
func LoadClientGroup(id string) (clientgroup *ClientGroup, err error)
LoadClientGroup _
func LoadClientGroupByName ¶ added in v0.9.3
func LoadClientGroupByName(name string) (clientgroup *ClientGroup, err error)
LoadClientGroupByName _
func LoadClientGroupByToken ¶ added in v0.9.3
func LoadClientGroupByToken(token string) (clientgroup *ClientGroup, err error)
LoadClientGroupByToken _
type ClientMap ¶ added in v0.9.33
ClientMap _
func (*ClientMap) GetClientIds ¶ added in v0.9.33
GetClientIds _
func (*ClientMap) GetClients ¶ added in v0.9.33
GetClients _
type ClientMgr ¶
type ClientMgr interface { RegisterNewClient(FormFiles, *ClientGroup) (*Client, error) ClientHeartBeat(string, *ClientGroup, WorkerState) (HeartbeatInstructions, error) GetClient(string, bool) (*Client, bool, error) GetClientByUser(string, *user.User) (*Client, error) //GetAllClients() []*Client GetClientMap() *ClientMap GetAllClientsByUser(*user.User) ([]*Client, error) //DeleteClient(*Client) error //DeleteClientById(string) error //DeleteClientByUser(string, *user.User) error SuspendClient(string, *Client, string, bool) error SuspendClientByUser(string, *user.User, string) error ResumeClient(string) error ResumeClientByUser(string, *user.User) error ResumeSuspendedClients() (int, error) ResumeSuspendedClientsByUser(*user.User) int SuspendAllClients(string) (int, error) SuspendAllClientsByUser(*user.User, string) (int, error) ClientChecker() UpdateSubClients(string, int) error UpdateSubClientsByUser(string, int, *user.User) }
type ClientWorkMgr ¶
type Clients ¶ added in v0.9.33
type Clients []*Client
Clients _
func (*Clients) RLockRecursive ¶ added in v0.9.33
func (cs *Clients) RLockRecursive()
RLockRecursive _
func (*Clients) RUnlockRecursive ¶ added in v0.9.33
func (cs *Clients) RUnlockRecursive()
RUnlockRecursive _
type Command ¶
type Command struct { Name string `bson:"name" json:"name" mapstructure:"name"` Args string `bson:"args" json:"args" mapstructure:"args"` ArgsArray []string `bson:"args_array" json:"args_array" mapstructure:"args_array"` // use this instead of Args, which is just a string Dockerimage string `bson:"Dockerimage" json:"Dockerimage" mapstructure:"Dockerimage"` // for Shock (TODO rename this !) DockerPull string `bson:"dockerPull" json:"dockerPull" mapstructure:"dockerPull"` // docker pull CmdScript []string `bson:"cmd_script" json:"cmd_script" mapstructure:"cmd_script"` Environ Envs `bson:"environ" json:"environ" mapstructure:"environ"` HasPrivateEnv bool `bson:"has_private_env" json:"has_private_env" mapstructure:"has_private_env"` Description string `bson:"description" json:"description" mapstructure:"description"` ParsedArgs []string `bson:"-" json:"-" mapstructure:"-"` Local bool // indicates local execution, i.e. working directory is same as current working directory (do not delete !) }
Command _
type DefaultQueryOptions ¶
type EnvironP ¶
EnvironP the following special code exists in order to unmarshal the private field Command.Environ.Private,
so it is kept in this file for less confusion
type Envs ¶ added in v0.9.3
type Envs struct { Public map[string]string `bson:"public" json:"public"` Private map[string]string `bson:"private" json:"-"` }
Envs _
type FilterWorkStats ¶
FilterWorkStats _
type HeartbeatInstructions ¶ added in v0.9.62
HeartbeatInstructions response from awe-server to awe-worker, used to issue operation requests to the client, e.g. discard suspended workunits
type IO ¶
type IO struct { FileName string `bson:"filename" json:"filename" mapstructure:"filename"` Name string `bson:"name" json:"name" mapstructure:"name"` // specifies abstract name of output as defined by the app AppPosition int `bson:"appposition" json:"-" mapstructure:"-"` // specifies position in app output array Directory string `bson:"directory" json:"directory" mapstructure:"directory"` Host string `bson:"host" json:"host" mapstructure:"host"` Node string `bson:"node" json:"node" mapstructure:"node"` Url string `bson:"url" json:"url" mapstructure:"url"` // can be shock or any other url Size int64 `bson:"size" json:"size" mapstructure:"size"` MD5 string `bson:"md5" json:"-" mapstructure:"-"` Cache bool `bson:"cache" json:"cache" mapstructure:"cache"` // indicates that this files is "predata"" that needs to be cached Origin string `bson:"origin" json:"origin" mapstructure:"origin"` Path string `bson:"-" json:"-" mapstructure:"-"` Optional bool `bson:"optional" json:"-" mapstructure:"-"` Nonzero bool `bson:"nonzero" json:"nonzero" mapstructure:"nonzero"` DataToken string `bson:"datatoken" json:"-" mapstructure:"-"` Intermediate bool `bson:"Intermediate" json:"-" mapstructure:"-"` Temporary bool `bson:"temporary" json:"temporary" mapstructure:"temporary"` ShockFilename string `bson:"shockfilename" json:"shockfilename" mapstructure:"shockfilename"` ShockIndex string `bson:"shockindex" json:"shockindex" mapstructure:"shockindex"` // on input it indicates that Shock node has to be indexed by AWE server AttrFile string `bson:"attrfile" json:"attrfile" mapstructure:"attrfile"` NoFile bool `bson:"nofile" json:"nofile" mapstructure:"nofile"` Delete bool `bson:"delete" json:"delete" mapstructure:"delete"` // speficies that this is a temorary node, to be deleted from shock on job completion Type string `bson:"type" json:"type" mapstructure:"type"` NodeAttr map[string]interface{} `bson:"nodeattr" json:"nodeattr" mapstructure:"nodeattr"` // specifies attribute data to be stored 
in shock node (output only) FormOptions map[string]string `bson:"formoptions" json:"formoptions" mapstructure:"formoptions"` Uncompress string `bson:"uncompress" json:"uncompress" mapstructure:"uncompress"` // tells AWE client to uncompress this file, e.g. "gzip" Indexes map[string]shock.IdxInfo `bson:"-" json:"-" mapstructure:"-"` // copy of shock node.Indexes, not saved }
func (*IO) DeleteNode ¶ added in v0.9.3
func (*IO) UpdateFileSize ¶ added in v0.9.68
type IOmap ¶
Deprecated JobDep struct uses deprecated TaskDep struct which uses the deprecated IOmap. Maintained for backwards compatibility. Jobs that cannot be parsed into the Job struct, but can be parsed into the JobDep struct will be translated to the new Job struct. (=deprecated=)
type Info ¶
type Info struct { Name string `bson:"name" json:"name" mapstructure:"name"` Xref string `bson:"xref" json:"xref" mapstructure:"xref"` Service string `bson:"service" json:"service" mapstructure:"service"` Project string `bson:"project" json:"project" mapstructure:"project"` User string `bson:"user" json:"user" mapstructure:"user"` Pipeline string `bson:"pipeline" json:"pipeline" mapstructure:"pipeline"` // or workflow ClientGroups string `bson:"clientgroups" json:"clientgroups" mapstructure:"clientgroups"` SubmitTime time.Time `bson:"submittime" json:"submittime" mapstructure:"submittime"` StartedTime time.Time `bson:"startedtime" json:"startedtime" mapstructure:"startedtime"` CompletedTime time.Time `bson:"completedtime" json:"completedtime" mapstructure:"completedtime"` Priority int `bson:"priority" json:"priority" mapstructure:"priority"` Auth bool `bson:"auth" json:"auth" mapstructure:"auth"` DataToken string `bson:"datatoken" json:"-" mapstructure:"-"` NoRetry bool `bson:"noretry" json:"noretry" mapstructure:"noretry"` UserAttr map[string]interface{} `bson:"userattr" json:"userattr" mapstructure:"userattr"` Description string `bson:"description" json:"description" mapstructure:"description"` Tracking bool `bson:"tracking" json:"tracking" mapstructure:"tracking"` StartAt time.Time `bson:"start_at" json:"start_at" mapstructure:"start_at"` // will start tasks at this timepoint or shortly after }
job info
type Job ¶
func CWL2AWE ¶ added in v0.9.33
func CWL2AWE(_user *user.User, files FormFiles, jobInput *cwl.Job_document, cwlWorkflow *cwl.Workflow, context *cwl.WorkflowContext) (job *Job, err error)
CWL2AWE _
func CreateJobImport ¶ added in v0.9.26
CreateJobImport _
func CreateJobUpload ¶
CreateJobUpload _
func JobDepToJob ¶ added in v0.9.16
JobDepToJob Takes the deprecated (version 1) Job struct and returns the version 2 Job struct or an error
func ReadJobFile ¶ added in v0.9.33
ReadJobFile _
func (*Job) AddWorkflowInstance ¶ added in v0.9.62
func (job *Job) AddWorkflowInstance(wi *WorkflowInstance, db_sync bool, writeLock bool) (err error)
func (*Job) GetDataToken ¶
func (*Job) GetJobLogs ¶ added in v0.9.27
func (*Job) GetPrivateEnv ¶ added in v0.9.3
func (*Job) GetRemainTasks ¶ added in v0.9.33
GetRemainTasks _
func (*Job) GetWorkflowInstance ¶ added in v0.9.62
func (*Job) IncrementRemainTasks ¶ added in v0.9.33
IncrementRemainTasks _
func (*Job) IncrementResumed ¶ added in v0.9.33
func (*Job) RLockRecursive ¶ added in v0.9.33
func (job *Job) RLockRecursive()
func (*Job) RUnlockRecursive ¶ added in v0.9.33
func (job *Job) RUnlockRecursive()
func (*Job) SaveToDisk ¶ added in v0.9.33
func (*Job) SetClientgroups ¶ added in v0.9.33
func (*Job) SetDataToken ¶
func (*Job) SetExpiration ¶ added in v0.9.26
func (*Job) SetPipeline ¶ added in v0.9.26
func (*Job) SetPriority ¶ added in v0.9.33
func (*Job) SetRemainTasks ¶ added in v0.9.33
SetRemainTasks _
type JobDep ¶ added in v0.9.16
Deprecated JobDep struct uses deprecated TaskDep struct which uses the deprecated IOmap. Maintained for backwards compatibility. Jobs that cannot be parsed into the Job struct, but can be parsed into the JobDep struct will be translated to the new Job struct. (=deprecated=)
type JobError ¶ added in v0.9.48
type JobError struct { ClientFailed string `bson:"clientfailed" json:"clientfailed,omitempty"` WorkFailed string `bson:"workfailed" json:"workfailed,omitempty"` TaskFailed string `bson:"taskfailed" json:"taskfailed,omitempty"` ServerNotes string `bson:"servernotes" json:"servernotes,omitempty"` WorkNotes string `bson:"worknotes" json:"worknotes,omitempty"` AppError string `bson:"apperror" json:"apperror,omitempty"` Status string `bson:"status" json:"status,omitempty"` }
type JobMap ¶ added in v0.9.33
type JobMgr ¶
type JobMgr interface { EnqueueTasksByJobId(string, string) error GetActiveJobs() map[string]bool IsJobRegistered(string) bool GetSuspendJobs() map[string]bool SuspendJob(string, *JobError) error ResumeSuspendedJobByUser(string, *user.User) error ResumeSuspendedJobsByUser(*user.User) int ResubmitJob(string) error DeleteJobByUser(string, *user.User, bool) error DeleteSuspendedJobsByUser(*user.User, bool) int DeleteZombieJobsByUser(*user.User, bool) int RecoverJob(string, *Job) (bool, error) RecoverJobs() (int, int, error) FinalizeWorkPerf(Workunit_Unique_Identifier, string) error SaveStdLog(Workunit_Unique_Identifier, string, string) error GetReportMsg(Workunit_Unique_Identifier, string) (string, error) RecomputeJob(string, string) error UpdateQueueToken(*Job) error }
type JobMin ¶ added in v0.9.8
type JobMin struct { Id string `bson:"id" json:"id"` Name string `bson:"name" json:"name"` Size int64 `bson:"size" json:"size"` SubmitTime time.Time `bson:"submittime" json:"submittime"` CompletedTime time.Time `bson:"completedtime" json:"completedtime"` ComputeTime int `bson:"computetime" json:"computetime"` Task []int `bson:"task" json:"task"` State []string `bson:"state" json:"state"` UserAttr map[string]interface{} `bson:"userattr" json:"userattr"` }
type JobPerf ¶
type JobPerf struct { Id string `bson:"id" json:"id"` Queued int64 `bson:"queued" json:"queued"` Start int64 `bson:"start" json:"start"` End int64 `bson:"end" json:"end"` Resp int64 `bson:"resp" json:"resp"` //End - Queued Ptasks map[string]*TaskPerf `bson:"task_stats" json:"task_stats"` Pworks map[string]*WorkPerf `bson:"work_stats" json:"work_stats"` }
func NewJobPerf ¶
type JobRaw ¶ added in v0.9.33
type JobRaw struct { rwmutex.RWMutex ID string `bson:"id" json:"id"` // uuid ACL acl.Acl `bson:"acl" json:"-"` Info *Info `bson:"info" json:"info"` Script script `bson:"script" json:"-"` State string `bson:"state" json:"state"` Registered bool `bson:"registered" json:"registered"` RemainTasks int `bson:"remaintasks" json:"remaintasks"` // old-style AWE RemainSteps int `bson:"remainsteps" json:"remainteps"` Expiration time.Time `bson:"expiration" json:"expiration"` // 0 means no expiration UpdateTime time.Time `bson:"updatetime" json:"updatetime"` Error *JobError `bson:"error" json:"error"` // error struct exists when in suspended state Resumed int `bson:"resumed" json:"resumed"` // number of times the job has been resumed from suspension ShockHost string `bson:"shockhost" json:"shockhost"` // this is a fall-back default if not specified at a lower level IsCWL bool `bson:"is_cwl" json:"is_cwl"` CWL_job_input interface{} `bson:"cwl_job_input" json:"cwl_job_input"` // has to be an array for mongo (id as key would not work) CWL_ShockRequirement *cwl.ShockRequirement `bson:"cwl_shock_requirement" json:"cwl_shock_requirement"` CWL_workflow *cwl.Workflow `bson:"-" json:"-" yaml:"-" mapstructure:"-"` WorkflowInstancesMap map[string]*WorkflowInstance `bson:"-" json:"-" yaml:"-" mapstructure:"-"` WorkflowInstancesRemain int `bson:"workflow_instances_remain" json:"workflow_instances_remain"` Entrypoint string `bson:"entrypoint" json:"entrypoint"` // name of main workflow (typically has name #main or #entrypoint) WorkflowContext *cwl.WorkflowContext `bson:"context" json:"context" yaml:"context" mapstructure:"context"` }
type JobReaper ¶ added in v0.9.26
type JobReaper struct{}
var ( Ttl *JobReaper ExpireRegex = regexp.MustCompile(`^(\d+)(M|H|D)$`) )
func NewJobReaper ¶ added in v0.9.26
func NewJobReaper() *JobReaper
type Jobs ¶
type Jobs []*Job
Job array type
func (*Jobs) GetAllLimitOffset ¶
func (*Jobs) GetAllRecent ¶
func (*Jobs) GetPaginated ¶
func (*Jobs) RLockRecursive ¶ added in v0.9.33
func (n *Jobs) RLockRecursive()
func (*Jobs) RUnlockRecursive ¶ added in v0.9.33
func (n *Jobs) RUnlockRecursive()
type MultipartWriter ¶
type MultipartWriter struct {
// contains filtered or unexported fields
}
func NewMultipartWriter ¶
func NewMultipartWriter() *MultipartWriter
func (*MultipartWriter) AddDataAsFile ¶
func (m *MultipartWriter) AddDataAsFile(fieldname string, filepath string, data *[]byte) (err error)
func (*MultipartWriter) AddFile ¶
func (m *MultipartWriter) AddFile(fieldname string, filepath string) (err error)
type Notice ¶
type Notice struct { ID Workunit_Unique_Identifier `bson:"id" json:"id" mapstructure:"id"` // redundant field, for reporting WorkerID string `bson:"worker_id" json:"worker_id" mapstructure:"worker_id"` Results *cwl.Job_document `bson:"results" json:"results" mapstructure:"results"` // subset of tool_results with Shock URLs Status string `bson:"status,omitempty" json:"status,omitempty" mapstructure:"status,omitempty"` // this is redundant as workunit already has state, but this is only used for transfer ComputeTime int `bson:"computetime,omitempty" json:"computetime,omitempty" mapstructure:"computetime,omitempty"` Notes string Stderr string }
Notice _
type PartInfo ¶
type PartInfo struct { Input string `bson:"input" json:"input" mapstructure:"input"` Index string `bson:"index" json:"index" mapstructure:"index"` TotalIndex int `bson:"totalindex" json:"totalindex" mapstructure:"totalindex"` MaxPartSizeMB int `bson:"maxpartsize_mb" json:"maxpartsize_mb" mapstructure:"maxpartsize_mb"` Options string `bson:"options" json:"-" mapstructure:"-"` }
type Pipeline ¶
type Pipeline struct { Info *Info `bson:"info" json:"info"` Tasks []Task `bson:"tasks" json:"tasks"` }
func NewPipeline ¶
func NewPipeline() *Pipeline
type ProxyMgr ¶
type ProxyMgr struct {
CQMgr
}
func NewProxyMgr ¶
func NewProxyMgr() *ProxyMgr
func (*ProxyMgr) ClientChecker ¶
func (qm *ProxyMgr) ClientChecker()
func (*ProxyMgr) ClientHandle ¶ added in v0.9.16
func (qm *ProxyMgr) ClientHandle()
func (*ProxyMgr) DeleteJobByUser ¶ added in v0.9.3
func (*ProxyMgr) DeleteSuspendedJobsByUser ¶ added in v0.9.3
func (*ProxyMgr) DeleteZombieJobsByUser ¶ added in v0.9.3
func (*ProxyMgr) EnqueueTasksByJobId ¶
func (*ProxyMgr) FetchDataToken ¶
func (*ProxyMgr) FetchPrivateEnv ¶ added in v0.9.3
func (*ProxyMgr) FinalizeWorkPerf ¶
func (*ProxyMgr) GetActiveJobs ¶
func (*ProxyMgr) GetJsonStatus ¶ added in v0.9.26
func (*ProxyMgr) GetReportMsg ¶
func (*ProxyMgr) GetSuspendJobs ¶
func (*ProxyMgr) GetTextStatus ¶ added in v0.9.26
func (*ProxyMgr) IsJobRegistered ¶
func (*ProxyMgr) JobRegister ¶
func (*ProxyMgr) NoticeHandle ¶ added in v0.9.33
func (qm *ProxyMgr) NoticeHandle()
func (*ProxyMgr) QueueStatus ¶ added in v0.9.23
func (*ProxyMgr) RecomputeJob ¶
recompute jobs from specified task stage
func (*ProxyMgr) RecoverJob ¶ added in v0.9.26
recover job not in queue
func (*ProxyMgr) RecoverJobs ¶
recover jobs not completed before awe-server restarts
func (*ProxyMgr) RegisterNewClient ¶
func (qm *ProxyMgr) RegisterNewClient(files FormFiles, cg *ClientGroup) (client *Client, err error)
func (*ProxyMgr) ResubmitJob ¶
re-submit a job in db but not in the queue (caused by server restarting)
func (*ProxyMgr) ResumeQueue ¶ added in v0.9.23
func (qm *ProxyMgr) ResumeQueue()
func (*ProxyMgr) ResumeSuspendedJobByUser ¶ added in v0.9.3
resubmit a suspended job if user has rights
func (*ProxyMgr) ResumeSuspendedJobsByUser ¶ added in v0.9.3
func (*ProxyMgr) SuspendJob ¶
func (*ProxyMgr) SuspendQueue ¶ added in v0.9.23
func (qm *ProxyMgr) SuspendQueue()
func (*ProxyMgr) UpdateQueueLoop ¶ added in v0.9.33
func (qm *ProxyMgr) UpdateQueueLoop()
func (*ProxyMgr) UpdateQueueToken ¶ added in v0.9.48
type RequestQueue ¶ added in v0.9.62
func NewRequestQueue ¶ added in v0.9.62
func NewRequestQueue() (q *RequestQueue)
func (*RequestQueue) Pull ¶ added in v0.9.62
func (q *RequestQueue) Pull() (req *CheckoutRequest, err error)
func (*RequestQueue) Push ¶ added in v0.9.62
func (q *RequestQueue) Push(req *CheckoutRequest) (err error)
type ResourceMgr ¶
type ServerMgr ¶
ServerMgr _
func (*ServerMgr) ClientHandle ¶ added in v0.9.16
func (qm *ServerMgr) ClientHandle()
func (*ServerMgr) CreateAndEnqueueWorkunits ¶ added in v0.9.33
func (*ServerMgr) CreateJobPerf ¶
---perf related methods
func (*ServerMgr) CreateTaskPerf ¶
func (*ServerMgr) CreateWorkPerf ¶
func (qm *ServerMgr) CreateWorkPerf(id Workunit_Unique_Identifier) (err error)
func (*ServerMgr) DeleteJobByUser ¶ added in v0.9.3
func (*ServerMgr) DeleteSuspendedJobsByUser ¶ added in v0.9.3
func (*ServerMgr) DeleteZombieJobsByUser ¶ added in v0.9.3
delete jobs in db with "queued" or "in-progress" state but not in the queue (zombie jobs) that user has access to
func (*ServerMgr) EnqueueTasks ¶
func (*ServerMgr) EnqueueTasksByJobId ¶
this is triggered by user action, either job POST or job resume / recover / resubmit
func (*ServerMgr) EnqueueWorkflowInstance ¶
func (qm *ServerMgr) EnqueueWorkflowInstance(wi *WorkflowInstance) (err error)
func (*ServerMgr) FetchDataToken ¶
func (qm *ServerMgr) FetchDataToken(work_id Workunit_Unique_Identifier, clientid string) (token string, err error)
--workunit methods (servermgr implementation)
func (*ServerMgr) FetchPrivateEnv ¶ added in v0.9.3
func (*ServerMgr) FinalizeJobPerf ¶
func (*ServerMgr) FinalizeTaskPerf ¶
TODO evaluate err
func (*ServerMgr) FinalizeWorkPerf ¶
func (qm *ServerMgr) FinalizeWorkPerf(id Workunit_Unique_Identifier, reportfile string) (err error)
func (*ServerMgr) GetActiveJobs ¶
func (*ServerMgr) GetDependencies ¶
func (qm *ServerMgr) GetDependencies(job *Job, workflow_instance *WorkflowInstance, workflow_input_map map[string]cwl.CWLType, workflow_step *cwl.WorkflowStep, context *cwl.WorkflowContext) (err error)
Tasks or Subworkflows
func (*ServerMgr) GetJsonStatus ¶ added in v0.9.26
func (*ServerMgr) GetReportMsg ¶
func (qm *ServerMgr) GetReportMsg(id Workunit_Unique_Identifier, logname string) (report string, err error)
func (*ServerMgr) GetSourceFromWorkflowInstanceInput ¶
func (*ServerMgr) GetStepInputObjects ¶ added in v0.9.62
func (*ServerMgr) GetSuspendJobs ¶
func (*ServerMgr) GetTextStatus ¶ added in v0.9.26
func (*ServerMgr) IsJobRegistered ¶
func (*ServerMgr) Is_WI_ready ¶
func (*ServerMgr) LogJobPerf ¶
func (*ServerMgr) NoticeHandle ¶ added in v0.9.33
func (qm *ServerMgr) NoticeHandle()
func (*ServerMgr) QueueStatus ¶ added in v0.9.23
func (*ServerMgr) RecomputeJob ¶
recompute job from specified task stage
func (*ServerMgr) RecoverJob ¶ added in v0.9.26
recover a job in db that is missing from queue (caused by server restarting)
func (*ServerMgr) RecoverJobs ¶
recover jobs not completed before awe-server restarts
func (*ServerMgr) ResubmitJob ¶
ResubmitJob recompute job from beginning
func (*ServerMgr) ResumeQueue ¶ added in v0.9.23
func (qm *ServerMgr) ResumeQueue()
func (*ServerMgr) ResumeSuspendedJobByUser ¶ added in v0.9.3
resubmit a suspended job if the user is authorized
func (*ServerMgr) ResumeSuspendedJobsByUser ¶ added in v0.9.3
func (*ServerMgr) SaveStdLog ¶
func (qm *ServerMgr) SaveStdLog(id Workunit_Unique_Identifier, logname string, tmppath string) (err error)
func (*ServerMgr) SuspendJob ¶
use for JOB_STAT_SUSPEND and JOB_STAT_FAILED_PERMANENT
func (*ServerMgr) SuspendQueue ¶ added in v0.9.23
func (qm *ServerMgr) SuspendQueue()
func (*ServerMgr) UpdateJobPerfStartTime ¶
func (*ServerMgr) UpdateJobTaskToInProgress ¶
happens when a client checks out a workunit: updates job/task states from "queued" to "in-progress" once the first workunit is checked out
func (*ServerMgr) UpdateQueueLoop ¶ added in v0.9.33
func (qm *ServerMgr) UpdateQueueLoop()
UpdateQueueLoop _
func (*ServerMgr) UpdateQueueToken ¶ added in v0.9.48
update tokens for in-memory data structures
func (*ServerMgr) UpdateTaskPerfStartTime ¶
type SetCounter ¶
func NewSetCounter ¶
func NewSetCounter(numberOfSets int, array []cwl.Array, scatter_type string) (sc *SetCounter)
func (*SetCounter) Increment ¶
func (sc *SetCounter) Increment() (ok bool)
type StandardResponse ¶ added in v0.9.33
type StandardResponse struct { Status int `json:"status"` Data interface{} `json:"data"` Error []string `json:"error"` }
StandardResponse _
func NotifyWorkunitProcessedWithLogs ¶
func NotifyWorkunitProcessedWithLogs(work *Workunit, perf *WorkPerf, sendstdlogs bool) (response *StandardResponse, err error)
NotifyWorkunitProcessedWithLogs worker code to send workunit results
type StructContainer ¶ added in v0.9.43
type StructContainer struct {
Data interface{} `json:"data"`
}
type Task ¶
type Task struct { TaskRaw `bson:",inline" mapstructure:",squash"` Inputs []*IO `bson:"inputs" json:"inputs" mapstructure:"inputs"` Outputs []*IO `bson:"outputs" json:"outputs" mapstructure:"outputs"` Predata []*IO `bson:"predata" json:"predata" mapstructure:"predata"` Comment string }
func CreateWorkflowTasks ¶
func CreateWorkflowTasks(job *Job, namePrefix string, steps []cwl.WorkflowStep, stepPrefix string, parentID *Task_Unique_Identifier) (tasks []*Task, err error)
CreateWorkflowTasks _
func NewTaskFromInterface ¶
func NewTaskFromInterface(original interface{}, context *cwl.WorkflowContext) (task *Task, err error)
func NewTasksFromInterface ¶
func NewTasksFromInterface(original interface{}, context *cwl.WorkflowContext) (tasks []*Task, err error)
func (*Task) CollectDependencies ¶ added in v0.9.62
populate DependsOn
func (*Task) CreateInputIndexes ¶ added in v0.9.67
checks and creates indices on input shock nodes if needed
func (*Task) CreateOutputIndexes ¶ added in v0.9.67
checks and creates indices on output shock nodes if needed; if the worker failed to do so, this will catch it
func (*Task) CreateWorkunits ¶ added in v0.9.33
func (*Task) DeleteInput ¶ added in v0.9.12
func (*Task) DeleteLogs ¶ added in v0.9.67
func (*Task) DeleteOutput ¶ added in v0.9.3
func (*Task) GetOutputs ¶ added in v0.9.33
func (*Task) GetStepOutput ¶
func (*Task) GetStepOutputNames ¶
func (*Task) GetTaskLogs ¶ added in v0.9.27
func (*Task) IncrementComputeTime ¶ added in v0.9.33
func (*Task) IncrementRemainWork ¶ added in v0.9.33
func (*Task) InitPartIndex ¶
get part size based on partition/index info; this resets task.Partition when called; only 1 task.Inputs allowed unless 'partinfo.input' specified on POST; if it fails to get index info, task.TotalWork is set to 1 and task.Partition is set to nil
func (*Task) ResetTaskTrue ¶ added in v0.9.67
func (*Task) SetRemainWork ¶ added in v0.9.33
func (*Task) SetResetTask ¶ added in v0.9.67
func (*Task) SetResetTaskInputs ¶
func (*Task) SetResetTaskOutputs ¶
func (*Task) SetTaskType ¶ added in v0.9.62
func (*Task) ValidateDependants ¶ added in v0.9.67
func (*Task) ValidateInputs ¶ added in v0.9.67
func (*Task) ValidateOutputs ¶ added in v0.9.67
func (*Task) ValidatePredata ¶ added in v0.9.67
type TaskDep ¶ added in v0.9.16
type TaskDep struct { TaskRaw `bson:",inline"` Inputs IOmap `bson:"inputs" json:"inputs"` Outputs IOmap `bson:"outputs" json:"outputs"` Predata IOmap `bson:"predata" json:"predata"` }
Deprecated JobDep struct uses deprecated TaskDep struct which uses the deprecated IOmap. Maintained for backwards compatibility. Jobs that cannot be parsed into the Job struct, but can be parsed into the JobDep struct will be translated to the new Job struct. (=deprecated=)
type TaskMap ¶ added in v0.9.33
func NewTaskMap ¶ added in v0.9.33
func NewTaskMap() (t *TaskMap)
func (*TaskMap) Delete ¶ added in v0.9.33
func (tm *TaskMap) Delete(taskid Task_Unique_Identifier) (task *Task, ok bool, err error)
type TaskPerf ¶
type TaskPerf struct { Queued int64 `bson:"queued" json:"queued"` Start int64 `bson:"start" json:"start"` End int64 `bson:"end" json:"end"` Resp int64 `bson:"resp" json:"resp"` //End -Queued InFileSizes []int64 `bson:"size_infile" json:"size_infile"` OutFileSizes []int64 `bson:"size_outfile" json:"size_outfile"` }
func NewTaskPerf ¶
type TaskRaw ¶ added in v0.9.33
type TaskRaw struct { rwmutex.RWMutex `bson:"-" json:"-" mapstructure:"-"` Task_Unique_Identifier `bson:",inline" mapstructure:",squash"` Id string `bson:"taskid" json:"taskid" mapstructure:"taskid"` // old-style TaskType string `bson:"task_type" json:"task_type" mapstructure:"task_type"` Info *Info `bson:"-" json:"-" mapstructure:"-"` // this is just a pointer to the job.Info Cmd *Command `bson:"cmd" json:"cmd" mapstructure:"cmd"` Partition *PartInfo `bson:"partinfo" json:"-" mapstructure:"partinfo"` DependsOn []string `bson:"dependsOn" json:"dependsOn" mapstructure:"dependsOn"` // only needed if dependency cannot be inferred from Input.Origin TotalWork int `bson:"totalwork" json:"totalwork" mapstructure:"totalwork"` MaxWorkSize int `bson:"maxworksize" json:"maxworksize" mapstructure:"maxworksize"` RemainWork int `bson:"remainwork" json:"remainwork" mapstructure:"remainwork"` ResetTask bool `bson:"resettask" json:"-" mapstructure:"resettask"` // trigged by function - resume, recompute, resubmit State string `bson:"state" json:"state" mapstructure:"state"` CreatedDate time.Time `bson:"createdDate" json:"createddate" mapstructure:"createdDate"` StartedDate time.Time `bson:"startedDate" json:"starteddate" mapstructure:"startedDate"` CompletedDate time.Time `bson:"completedDate" json:"completeddate" mapstructure:"completedDate"` ComputeTime int `bson:"computetime" json:"computetime" mapstructure:"computetime"` UserAttr map[string]interface{} `bson:"userattr" json:"userattr" mapstructure:"userattr"` ClientGroups string `bson:"clientgroups" json:"clientgroups" mapstructure:"clientgroups"` WorkflowStep *cwl.WorkflowStep `bson:"workflowStep" json:"workflowStep" mapstructure:"workflowStep"` // CWL-only StepOutputInterface interface{} `bson:"stepOutput" json:"stepOutput" mapstructure:"stepOutput"` // CWL-only StepInput *cwl.Job_document `bson:"-" json:"-" mapstructure:"-"` // CWL-only StepOutput *cwl.Job_document `bson:"-" json:"-" mapstructure:"-"` // CWL-only 
//Scatter_task bool `bson:"scatter_task" json:"scatter_task" mapstructure:"scatter_task"` // CWL-only, indicates if this is a scatter_task TODO: compare with TaskType ? Scatter_parent *Task_Unique_Identifier `bson:"scatter_parent" json:"scatter_parent" mapstructure:"scatter_parent"` // CWL-only, points to scatter parent ScatterChildren []string `bson:"scatterChildren" json:"scatterChildren" mapstructure:"scatterChildren"` // use simple TaskName , CWL-only, list of all children in a subworkflow task ScatterChildren_ptr []*Task `bson:"-" json:"-" mapstructure:"-"` // caching only, CWL-only Finalizing bool `bson:"-" json:"-" mapstructure:"-"` // CWL-only, a lock mechanism for subworkflows and scatter tasks CwlVersion cwl.CWLVersion `bson:"cwlVersion,omitempty" mapstructure:"cwlVersion,omitempty" mapstructure:"cwlVersion,omitempty"` // CWL-only WorkflowInstanceId string `bson:"workflow_instance_id" json:"workflow_instance_id" mapstructure:"workflow_instance_id"` // CWL-only NotReadyReason string `bson:"-" json:"notReadyReason" mapstructure:"-"` // contains filtered or unexported fields }
func NewTaskRaw ¶ added in v0.9.33
func NewTaskRaw(task_id Task_Unique_Identifier, info *Info) (tr *TaskRaw, err error)
func (*TaskRaw) Finalize ¶ added in v0.9.62
this function prevents a dead-lock when a sub-workflow task finalizes
func (*TaskRaw) GetDependsOn ¶ added in v0.9.33
func (*TaskRaw) GetId ¶ added in v0.9.33
func (task *TaskRaw) GetId(me string) (id Task_Unique_Identifier, err error)
func (*TaskRaw) GetScatterChildren ¶
func (task *TaskRaw) GetScatterChildren(wi *WorkflowInstance, qm *ServerMgr) (children []*Task, err error)
func (*TaskRaw) GetStateNamed ¶ added in v0.9.33
only for debugging purposes
func (*TaskRaw) GetTaskType ¶ added in v0.9.62
func (*TaskRaw) GetWorkflowInstance ¶
func (task *TaskRaw) GetWorkflowInstance() (wi *WorkflowInstance, ok bool, err error)
func (*TaskRaw) SetCompletedDate ¶ added in v0.9.43
func (*TaskRaw) SetCreatedDate ¶ added in v0.9.33
func (*TaskRaw) SetScatterChildren ¶
func (*TaskRaw) SetStartedDate ¶ added in v0.9.33
func (*TaskRaw) SetState ¶ added in v0.9.33
func (task *TaskRaw) SetState(wi *WorkflowInstance, new_state string, writeLock bool) (err error)
also updates wi.RemainTasks, task.SetCompletedDate
func (*TaskRaw) SetStepOutput ¶ added in v0.9.62
func (task *TaskRaw) SetStepOutput(jd *cwl.Job_document, lock bool) (err error)
type Task_Unique_Identifier ¶ added in v0.9.62
type Task_Unique_Identifier struct { TaskName string `bson:"task_name" json:"task_name" mapstructure:"task_name"` // example: #main/filter //Parent string `bson:"parent" json:"parent" mapstructure:"parent"` JobId string `bson:"jobid" json:"jobid" mapstructure:"jobid"` }
func New_Task_Unique_Identifier ¶ added in v0.9.62
func New_Task_Unique_Identifier(jobid string, taskname string) (t Task_Unique_Identifier, err error)
func New_Task_Unique_Identifier_FromString ¶ added in v0.9.62
func New_Task_Unique_Identifier_FromString(old_style_id string) (t Task_Unique_Identifier, err error)
func (Task_Unique_Identifier) String ¶ added in v0.9.62
func (taskid Task_Unique_Identifier) String() (s string, err error)
type WorkLog ¶ added in v0.9.27
type WorkLog struct { Id string `bson:"wuid" json:"wuid"` // TODO change ! Rank int `bson:"rank" json:"rank"` Logs map[string]string `bson:"logs" json:"logs"` }
func NewWorkLog ¶ added in v0.9.27
func NewWorkLog(id Workunit_Unique_Identifier) (wlog *WorkLog, err error)
type WorkMgr ¶
type WorkMgr interface { GetWorkById(Workunit_Unique_Identifier) (*Workunit, error) ShowWorkunits(string) ([]*Workunit, error) ShowWorkunitsByUser(string, *user.User) []*Workunit CheckoutWorkunits(string, string, *Client, int64, int) ([]*Workunit, error) NotifyWorkStatus(Notice) EnqueueWorkunit(*Workunit) error FetchDataToken(Workunit_Unique_Identifier, string) (string, error) FetchPrivateEnv(Workunit_Unique_Identifier, string) (map[string]string, error) }
type WorkPerf ¶
type WorkPerf struct { Queued int64 `bson:"queued" json:"queued"` // WQ (queued at server or client, depending on who creates it) Done int64 `bson:"done" json:"done"` // WD (done at server) Resp int64 `bson:"resp" json:"resp"` // Done - Queued (server metric) Checkout int64 `bson:"checkout" json:"checkout"` // checkout at client Deliver int64 `bson:"deliver" json:"deliver"` // done at client ClientResp int64 `bson:"clientresp" json:"clientresp"` // Deliver - Checkout (client metric) PreDataIn float64 `bson:"time_predata_in" json:"time_predata_in"` // time in seconds for downloading prerequisite data at client DataIn float64 `bson:"time_data_in" json:"time_data_in"` // time in seconds for input data move-in at client DataOut float64 `bson:"time_data_out" json:"time_data_out"` // time in seconds for output data move-out at client Runtime int64 `bson:"runtime" json:"runtime"` // time in seconds for computation at client DockerPrep int64 `bson:"dockerprep" json:"dockerprep"` // time in seconds for docker preparation on client MaxMemUsage int64 `bson:"max_mem_usage" json:"max_mem_usage"` // maxium memory consumption MaxMemoryTotalRss int64 `bson:"max_memory_total_rss" json:"max_memory_total_rss"` MaxMemoryTotalSwap int64 `bson:"max_memory_total_swap" json:"max_memory_total_swap"` ClientId string `bson:"client_id" json:"client_id"` PreDataSize int64 `bson:"size_predata" json:"size_predata"` //predata moved over network InFileSize int64 `bson:"size_infile" json:"size_infile"` //input file moved over network OutFileSize int64 `bson:"size_outfile" json:"size_outfile"` //outpuf file moved over network }
func NewWorkPerf ¶
func NewWorkPerf() *WorkPerf
type WorkQueue ¶ added in v0.9.33
type WorkQueue struct { Queue WorkunitMap // WORK_STAT_QUEUED - waiting workunits Checkout WorkunitMap // WORK_STAT_CHECKOUT - workunits being checked out Suspend WorkunitMap // WORK_STAT_SUSPEND - suspended workunits // contains filtered or unexported fields }
func NewWorkQueue ¶ added in v0.9.33
func NewWorkQueue() *WorkQueue
func (*WorkQueue) Delete ¶ added in v0.9.33
func (wq *WorkQueue) Delete(id Workunit_Unique_Identifier) (err error)
func (*WorkQueue) Get ¶ added in v0.9.33
func (wq *WorkQueue) Get(id Workunit_Unique_Identifier) (w *Workunit, ok bool, err error)
func (*WorkQueue) Has ¶ added in v0.9.33
func (wq *WorkQueue) Has(id Workunit_Unique_Identifier) (has bool, err error)
func (*WorkQueue) StatusChange ¶ added in v0.9.33
type WorkerRuntime ¶ added in v0.9.62
type WorkerRuntime struct { ID string `bson:"id" json:"id"` // this is a uuid (the only relevant identifier) Name string `bson:"name" json:"name"` // this can be anything you want Group string `bson:"group" json:"group"` User string `bson:"user" json:"user"` Domain string `bson:"domain" json:"domain"` InstanceID string `bson:"instance_id" json:"instance_id"` // Openstack specific InstanceType string `bson:"instance_type" json:"instance_type"` // Openstack specific //Host string `bson:"host" json:"host"` // deprecated Hostname string `bson:"hostname" json:"hostname"` HostIP string `bson:"host_ip" json:"host_ip"` // Host can be physical machine or VM, whatever is helpful for management CPUs int `bson:"cores" json:"cores"` Apps []string `bson:"apps" json:"apps"` GitCommitHash string `bson:"git_commit_hash" json:"git_commit_hash"` Version string `bson:"version" json:"version"` }
WorkerRuntime worker info that does not change at runtime
type WorkerState ¶ added in v0.9.62
type WorkerState struct { Healthy bool `bson:"healthy" json:"healthy"` ErrorMessage string `bson:"error_message" json:"error_message"` Busy bool `bson:"busy" json:"busy"` // a state CurrentWork *WorkunitList `bson:"current_work" json:"current_work"` }
WorkerState changes at runtime
func NewWorkerState ¶ added in v0.9.62
func NewWorkerState() (ws *WorkerState)
NewWorkerState creates WorkerState
type Workflow ¶
type Workflow struct { WfInfo awf_info `bson:"workflow_info" json:"workflow_info"` JobInfo awf_jobinfo `bson:"job_info" json:"job_info"` RawInputs map[string]string `bson:"raw_inputs" json:"raw_inputs"` Variables map[string]string `bson:"variables" json:"variables"` DataServer string `bson:"data_server" json:"data_server"` Tasks []*awf_task `bson:"tasks" json:"tasks"` }
type WorkflowInstance ¶ added in v0.9.62
type WorkflowInstance struct { rwmutex.RWMutex `bson:"-" json:"-" mapstructure:"-"` LocalID string `bson:"local_id" json:"id" mapstructure:"local_id"` // workfow id without job id , mongo uses JobId_LocalId to get a globally unique identifier JobID string `bson:"job_id" json:"job_id" mapstructure:"job_id"` //ParentID string `bson:"parent_id" json:"parent_id" mapstructure:"parent_id"` // DEPRECATED!? it can be computed from LocalId ACL *acl.Acl `bson:"acl" json:"-"` State string `bson:"state" json:"state" mapstructure:"state"` // this is unique identifier for the workflow instance WorkflowDefinition string `bson:"workflow_definition" json:"workflow_definition" mapstructure:"workflow_definition"` // name of the workflow this instance is derived from Workflow *cwl.Workflow `bson:"-" json:"-" mapstructure:"-"` // just a cache for the Workflow pointer Inputs cwl.Job_document `bson:"inputs" json:"inputs" mapstructure:"inputs"` Outputs cwl.Job_document `bson:"outputs" json:"outputs" mapstructure:"outputs"` Tasks []*Task `bson:"tasks" json:"tasks" mapstructure:"tasks"` RemainSteps int `bson:"remainsteps" json:"remainsteps" mapstructure:"remainsteps"` TotalTasks int `bson:"totaltasks" json:"totaltasks" mapstructure:"totaltasks"` Subworkflows []string `bson:"subworkflows" json:"subworkflows" mapstructure:"subworkflows"` ParentStep *cwl.WorkflowStep `bson:"-" json:"-" mapstructure:"-"` // cache Parent *WorkflowInstance `bson:"-" json:"-" mapstructure:"-"` // cache for ParentId Job *Job `bson:"-" json:"-" mapstructure:"-"` // cache }
WorkflowInstance _
func NewWorkflowInstance ¶
func NewWorkflowInstance(localID string, jobid string, workflowDefinition string, job *Job, parentWorkflowInstanceID string) (wi *WorkflowInstance, err error)
NewWorkflowInstance _
func NewWorkflowInstanceArrayFromInterface ¶
func NewWorkflowInstanceArrayFromInterface(original []interface{}, job *Job, context *cwl.WorkflowContext) (wis []*WorkflowInstance, err error)
NewWorkflowInstanceArrayFromInterface _
func NewWorkflowInstanceFromInterface ¶ added in v0.9.62
func NewWorkflowInstanceFromInterface(original interface{}, job *Job, context *cwl.WorkflowContext, doInit bool) (wi *WorkflowInstance, err error)
NewWorkflowInstanceFromInterface _
func (*WorkflowInstance) AddSubworkflow ¶
func (wi *WorkflowInstance) AddSubworkflow(job *Job, subworkflow string, writeLock bool) (err error)
AddSubworkflow assumes WorkflowInstance is in mongo already
func (*WorkflowInstance) AddTask ¶
AddTask db_sync is a string because a bool would be misunderstood as a lock indicator ("db_sync_no", "db_sync_yes")
func (*WorkflowInstance) GetID ¶
func (wi *WorkflowInstance) GetID(readLock bool) (id string, err error)
GetID includes JobID
func (*WorkflowInstance) GetJob ¶
func (wi *WorkflowInstance) GetJob(readLock bool) (job *Job, err error)
GetJob _
func (*WorkflowInstance) GetOutput ¶
func (wi *WorkflowInstance) GetOutput(name string, readLock bool) (obj cwl.CWLType, ok bool, err error)
GetOutput _
func (*WorkflowInstance) GetParent ¶
func (wi *WorkflowInstance) GetParent(readLock bool) (parent *WorkflowInstance, err error)
GetParent _
func (*WorkflowInstance) GetParentID ¶
func (wi *WorkflowInstance) GetParentID(readLock bool) (parentID string, err error)
GetParentID returns relative ID of parent workflow
func (*WorkflowInstance) GetParentRaw ¶
func (wi *WorkflowInstance) GetParentRaw(readLock bool) (parent *WorkflowInstance, err error)
GetParentRaw _
func (*WorkflowInstance) GetParentStep_DEPRECATED ¶
func (wi *WorkflowInstance) GetParentStep_DEPRECATED(readLock bool) (pstep *cwl.WorkflowStep, err error)
func (*WorkflowInstance) GetParentStep_cached_DEPRECATED ¶
func (wi *WorkflowInstance) GetParentStep_cached_DEPRECATED() (pstep *cwl.WorkflowStep, err error)
func (*WorkflowInstance) GetRemainSteps ¶
func (wi *WorkflowInstance) GetRemainSteps(readLock bool) (remain int, err error)
GetRemainSteps _
func (*WorkflowInstance) GetState ¶
func (wi *WorkflowInstance) GetState(readLock bool) (state string, err error)
GetState _
func (*WorkflowInstance) GetTask ¶
func (wi *WorkflowInstance) GetTask(taskID Task_Unique_Identifier, readLock bool) (task *Task, ok bool, err error)
GetTask _
func (*WorkflowInstance) GetTaskByName ¶
func (wi *WorkflowInstance) GetTaskByName(taskName string, readLock bool) (task *Task, ok bool, err error)
GetTaskByName _
func (*WorkflowInstance) GetTasks ¶
func (wi *WorkflowInstance) GetTasks(readLock bool) (tasks []*Task, err error)
GetTasks gets tasks from all subworkflows in the job
func (*WorkflowInstance) GetWorkflow ¶
func (wi *WorkflowInstance) GetWorkflow(context *cwl.WorkflowContext) (workflow *cwl.Workflow, err error)
GetWorkflow _
func (*WorkflowInstance) IncrementRemainSteps ¶
func (wi *WorkflowInstance) IncrementRemainSteps(amount int, writeLock bool) (remain int, err error)
IncrementRemainSteps _
func (*WorkflowInstance) Init ¶
func (wi *WorkflowInstance) Init(job *Job) (changed bool, err error)
Init _
func (*WorkflowInstance) SetOutputs ¶
func (wi *WorkflowInstance) SetOutputs(outputs cwl.Job_document, context *cwl.WorkflowContext, writeLock bool) (err error)
SetOutputs _
func (*WorkflowInstance) SetState ¶
func (wi *WorkflowInstance) SetState(state string, dbSync bool, writeLock bool) (err error)
SetState will set state and notify parent WorkflowInstance or Job if completed. DO NOT CALL directly, use wrapper qm.WISetState()
func (*WorkflowInstance) SetSubworkflows ¶
func (wi *WorkflowInstance) SetSubworkflows(steps []string, writeLock bool) (err error)
SetSubworkflows _
func (*WorkflowInstance) TaskCount ¶
func (wi *WorkflowInstance) TaskCount() (count int)
TaskCount _
type WorkflowInstanceMap ¶
func NewWorkflowInstancesMap ¶
func NewWorkflowInstancesMap() (wim *WorkflowInstanceMap)
func (*WorkflowInstanceMap) Add ¶
func (wim *WorkflowInstanceMap) Add(ID string, workflow_instance *WorkflowInstance) (err error)
func (*WorkflowInstanceMap) Get ¶
func (wim *WorkflowInstanceMap) Get(id string) (workflow_instance *WorkflowInstance, ok bool, err error)
func (*WorkflowInstanceMap) GetWorkflowInstances ¶
func (wim *WorkflowInstanceMap) GetWorkflowInstances() (wis []*WorkflowInstance, err error)
type WorkflowMgr ¶
type WorkflowMgr struct {
// contains filtered or unexported fields
}
WorkflowMgr comment
var ( //AwfMgr manager AwfMgr *WorkflowMgr )
func (*WorkflowMgr) AddWorkflow ¶
func (wfm *WorkflowMgr) AddWorkflow(name string, awf *Workflow)
AddWorkflow _
func (*WorkflowMgr) GetAllWorkflows ¶
func (wfm *WorkflowMgr) GetAllWorkflows() (workflows []*Workflow)
GetAllWorkflows _
func (*WorkflowMgr) GetWorkflow ¶
func (wfm *WorkflowMgr) GetWorkflow(name string) (awf *Workflow, err error)
GetWorkflow _
func (*WorkflowMgr) LoadWorkflows ¶
func (wfm *WorkflowMgr) LoadWorkflows() (err error)
LoadWorkflows _
type Workunit ¶
type Workunit struct { Workunit_Unique_Identifier `bson:",inline" json:",inline" mapstructure:",squash"` WorkunitState `bson:",inline" json:",inline" mapstructure:",squash"` Id string `bson:"id,omitempty" json:"id,omitempty" mapstructure:"id,omitempty"` // global identifier: jobid_taskid_rank (for backwards coompatibility only) WuId string `bson:"wuid,omitempty" json:"wuid,omitempty" mapstructure:"wuid,omitempty"` // deprecated ! Info *Info `bson:"info,omitempty" json:"info,omitempty" mapstructure:"info,omitempty"` // *** Inputs []*IO `bson:"inputs,omitempty" json:"inputs,omitempty" mapstructure:"inputs,omitempty"` Outputs []*IO `bson:"outputs,omitempty" json:"outputs,omitempty" mapstructure:"outputs,omitempty"` Predata []*IO `bson:"predata,omitempty" json:"predata,omitempty" mapstructure:"predata,omitempty"` // *** Cmd *Command `bson:"cmd,omitempty" json:"cmd,omitempty" mapstructure:"cmd,omitempty"` // *** TotalWork int `bson:"totalwork,omitempty" json:"totalwork,omitempty" mapstructure:"totalwork,omitempty"` Partition *PartInfo `bson:"part,omitempty" json:"part,omitempty" mapstructure:"part,omitempty"` // *** CheckoutTime time.Time `bson:"checkout_time,omitempty" json:"checkout_time,omitempty" mapstructure:"checkout_time,omitempty"` ComputeTime int `bson:"computetime,omitempty" json:"computetime,omitempty" mapstructure:"computetime,omitempty"` ExitStatus int `bson:"exitstatus,omitempty" json:"exitstatus,omitempty" mapstructure:"exitstatus,omitempty"` // Linux Exit Status Code (0 is success) Notes []string `bson:"notes,omitempty" json:"notes,omitempty" mapstructure:"notes,omitempty"` UserAttr map[string]interface{} `bson:"userattr,omitempty" json:"userattr,omitempty" mapstructure:"userattr,omitempty"` ShockHost string `bson:"shockhost,omitempty" json:"shockhost,omitempty" mapstructure:"shockhost,omitempty"` // specifies default Shock host for outputs CWLWorkunit *CWLWorkunit `bson:"cwl,omitempty" json:"cwl,omitempty" mapstructure:"cwl,omitempty"` WorkPath string 
// this is the working directory. If empty, it will be computed. WorkPerf *WorkPerf Context *cwl.WorkflowContext `bson:"-" json:"-" mapstructure:"-"` }
func NewWorkunit ¶
func (*Workunit) CDworkpath ¶
func (*Workunit) Evaluate ¶
func (w *Workunit) Evaluate(inputs interface{}, context *cwl.WorkflowContext) (err error)
func (*Workunit) FetchDataToken ¶
func (*Workunit) GetID ¶
func (w *Workunit) GetID() (id Workunit_Unique_Identifier)
func (*Workunit) GetIdBase64 ¶
type WorkunitList ¶ added in v0.9.62
type WorkunitList struct { rwmutex.RWMutex `bson:"-" json:"-"` Data []string `json:"data"` // contains filtered or unexported fields }
func NewWorkunitList ¶ added in v0.9.62
func NewWorkunitList() *WorkunitList
func (*WorkunitList) Add ¶ added in v0.9.62
func (cl *WorkunitList) Add(workid Workunit_Unique_Identifier) (err error)
lock always
func (*WorkunitList) Delete ¶ added in v0.9.62
func (cl *WorkunitList) Delete(workid Workunit_Unique_Identifier, writeLock bool) (err error)
func (*WorkunitList) Delete_all ¶ added in v0.9.62
func (cl *WorkunitList) Delete_all(workid string, writeLock bool) (err error)
func (*WorkunitList) FillMap ¶ added in v0.9.62
func (cl *WorkunitList) FillMap() (err error)
opposite of sync; take Data entries and copy them into map
func (*WorkunitList) Get_list ¶ added in v0.9.62
func (cl *WorkunitList) Get_list(do_read_lock bool) (assigned_work_ids []Workunit_Unique_Identifier, err error)
func (*WorkunitList) Get_string_list ¶ added in v0.9.62
func (cl *WorkunitList) Get_string_list(do_read_lock bool) (work_ids []string, err error)
func (*WorkunitList) Has ¶ added in v0.9.62
func (cl *WorkunitList) Has(workid Workunit_Unique_Identifier) (ok bool, err error)
func (*WorkunitList) Init ¶ added in v0.9.62
func (this *WorkunitList) Init(name string)
type WorkunitMap ¶ added in v0.9.33
type WorkunitMap struct { rwmutex.RWMutex Map map[Workunit_Unique_Identifier]*Workunit }
func NewWorkunitMap ¶ added in v0.9.33
func NewWorkunitMap() *WorkunitMap
func (*WorkunitMap) Delete ¶ added in v0.9.33
func (wm *WorkunitMap) Delete(id Workunit_Unique_Identifier) (err error)
func (*WorkunitMap) Get ¶ added in v0.9.33
func (wm *WorkunitMap) Get(id Workunit_Unique_Identifier) (workunit *Workunit, ok bool, err error)
func (*WorkunitMap) GetWorkunits ¶ added in v0.9.33
func (wm *WorkunitMap) GetWorkunits() (workunits []*Workunit, err error)
func (*WorkunitMap) Len ¶ added in v0.9.33
func (wm *WorkunitMap) Len() (length int, err error)
func (*WorkunitMap) Set ¶ added in v0.9.33
func (wm *WorkunitMap) Set(workunit *Workunit) (err error)
type WorkunitState ¶
type WorkunitState struct { State string `bson:"state,omitempty" json:"state,omitempty" mapstructure:"state,omitempty"` Failed int `bson:"failed,omitempty" json:"failed,omitempty" mapstructure:"failed,omitempty"` Client string `bson:"client,omitempty" json:"client,omitempty" mapstructure:"client,omitempty"` }
type Workunit_Unique_Identifier ¶ added in v0.9.62
type Workunit_Unique_Identifier struct { Task_Unique_Identifier `bson:",inline" json:",inline" mapstructure:",squash"` // TaskName, Workflow, JobId Rank int `bson:"rank" json:"rank" mapstructure:"rank"` // this is the local identifier, an abstract identifer for the data chunk }
func New_Workunit_Unique_Identifier ¶ added in v0.9.62
func New_Workunit_Unique_Identifier(task Task_Unique_Identifier, rank int) (wui Workunit_Unique_Identifier)
func New_Workunit_Unique_Identifier_FromString ¶ added in v0.9.62
func New_Workunit_Unique_Identifier_FromString(old_style_id string) (w Workunit_Unique_Identifier, err error)
func New_Workunit_Unique_Identifier_from_interface ¶ added in v0.9.62
func New_Workunit_Unique_Identifier_from_interface(original interface{}) (wui Workunit_Unique_Identifier, err error)
func (Workunit_Unique_Identifier) GetTask ¶ added in v0.9.62
func (w Workunit_Unique_Identifier) GetTask() Task_Unique_Identifier
func (Workunit_Unique_Identifier) String ¶ added in v0.9.62
func (w Workunit_Unique_Identifier) String() (work_str string, err error)
type WorkunitsSortby ¶ added in v0.9.8
func (WorkunitsSortby) Len ¶ added in v0.9.8
func (w WorkunitsSortby) Len() int
func (WorkunitsSortby) Less ¶ added in v0.9.8
func (w WorkunitsSortby) Less(i, j int) bool
func (WorkunitsSortby) Swap ¶ added in v0.9.8
func (w WorkunitsSortby) Swap(i, j int)
Source Files ¶
- awfmgr.go
- checkoutRequest.go
- client.go
- clientgroup.go
- clientgroups.go
- clientmap.go
- clients.go
- command.go
- core.go
- cqmgr.go
- cwl2awe.go
- cwl_workunit.go
- cwl_workunit_result.go
- db.go
- db_clientgroup.go
- db_job.go
- db_task.go
- db_workflow_instances.go
- expire.go
- info.go
- io.go
- job.go
- jobmap.go
- jobs.go
- multipartWriter.go
- perf.go
- pipeline.go
- proxymgr.go
- resmgr.go
- servermgr.go
- setcounter.go
- task.go
- taskUniqueIdentifier.go
- taskmap.go
- utilproxy.go
- workRequestQueue.go
- workflow.go
- workflowInstance.go
- workflowInstanceMap.go
- worklog.go
- workqueue.go
- workunit.go
- workunitUniqueIdentifier.go
- workunitlist.go
- workunitmap.go
- workunitsSortby.go