core

package
v0.9.68 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2018 License: BSD-2-Clause Imports: 40 Imported by: 0

Documentation

Index

Constants

View Source
// Job state strings (JOB_STAT_*).
const (
	JOB_STAT_INIT             = "init"        // initial state
	JOB_STAT_QUEUING          = "queuing"     // transition from "init" to "queued"
	JOB_STAT_QUEUED           = "queued"      // all tasks have been added to taskmap
	JOB_STAT_INPROGRESS       = "in-progress" // a first task went into state in-progress
	JOB_STAT_COMPLETED        = "completed"
	JOB_STAT_SUSPEND          = "suspend"
	JOB_STAT_FAILED_PERMANENT = "failed-permanent" // this specific error state can be triggered by the workflow software
	JOB_STAT_DELETED          = "deleted"
)
View Source
// Task state strings (TASK_STAT_*). Several share their string value with the
// corresponding job state (e.g. "init", "queued", "in-progress").
const (
	TASK_STAT_INIT             = "init"        // initial state on creation of a task
	TASK_STAT_PENDING          = "pending"     // a task that wants to be enqueued
	TASK_STAT_READY            = "ready"       // a task ready to be enqueued
	TASK_STAT_QUEUED           = "queued"      // a task for which workunits have been created/queued
	TASK_STAT_INPROGRESS       = "in-progress" // a first workunit has been checked out (this does not guarantee a workunit is running right now)
	TASK_STAT_SUSPEND          = "suspend"
	TASK_STAT_FAILED           = "failed"
	TASK_STAT_FAILED_PERMANENT = "failed-permanent" // on exit code 42
	TASK_STAT_COMPLETED        = "completed"
	TASK_STAT_SKIPPED          = "user_skipped" // deprecated
	TASK_STAT_FAIL_SKIP        = "skipped"      // deprecated
	TASK_STAT_PASSED           = "passed"       // deprecated ?
)
View Source
// Task type strings (TASK_TYPE_*).
const (
	TASK_TYPE_UNKNOWN  = "" // the empty string, i.e. the zero value of a string field
	TASK_TYPE_SCATTER  = "scatter"
	TASK_TYPE_WORKFLOW = "workflow"
	TASK_TYPE_NORMAL   = "normal"
)
View Source
// Workunit state strings (WORK_STAT_*). States marked "client only" or
// "proxy only" are, per the original notes, used only on that side.
const (
	WORK_STAT_INIT             = "init"             // initial state
	WORK_STAT_QUEUED           = "queued"           // after requeue ; after failures below max ; on WorkQueue.Add()
	WORK_STAT_RESERVED         = "reserved"         // short-lived state between queued and checkout. when a worker checks the workunit out, the state is reserved.
	WORK_STAT_CHECKOUT         = "checkout"         // normal work checkout ; client registers that already has a workunit (e.g. after reboot of server)
	WORK_STAT_SUSPEND          = "suspend"          // on MAX_FAILURE ; on SuspendJob
	WORK_STAT_FAILED_PERMANENT = "failed-permanent" // app had exit code 42
	WORK_STAT_DONE             = "done"             // client only: done
	WORK_STAT_ERROR            = "fail"             // client only: workunit computation or IO error (variable was renamed to ERROR but not the string "fail", to maintain backwards compatibility)
	WORK_STAT_PREPARED         = "prepared"         // client only: after argument parsing
	WORK_STAT_COMPUTED         = "computed"         // client only: after computation is done, before upload
	WORK_STAT_DISCARDED        = "discarded"        // client only: job / task suspended or server UUID changes
	WORK_STAT_PROXYQUEUED      = "proxyqueued"      // proxy only
)

Variables

View Source
// Package-level shared state. Where and when these variables are assigned is
// not visible in this listing.
var (
	QMgr          ResourceMgr
	Service       string = "unknown" // defaults to "unknown" until set
	Self          *Client
	ProxyWorkChan chan bool
	Server_UUID   string
	JM            *JobMap
	Start_time    time.Time
)
View Source
// CGNameRegex matches a non-empty string consisting solely of ASCII letters,
// digits, underscores, hyphens and dots.
// NOTE(review): presumably used to validate client group names — confirm at the call sites.
var CGNameRegex = regexp.MustCompile(`^[A-Za-z0-9\_\-\.]+$`)
View Source
// DocumentMaxByte is the maximum document size in bytes: 16 MiB (16777216),
// reflecting MongoDB's hard limit on document size.
var DocumentMaxByte = 16 * 1024 * 1024

MongoDB has a hard limit of 16 MB on document size.

View Source
// JobInfoIndexes lists the job info fields that are indexed for search.
var JobInfoIndexes = []string{
	"name",
	"submittime",
	"completedtime",
	"pipeline",
	"clientgroups",
	"project",
	"service",
	"user",
	"priority",
	"userattr.submission",
}

indexed info fields for search

View Source
// WAIT_TIMEOUT is a three-minute duration.
// NOTE(review): what exactly waits on this timeout is not visible here.
var WAIT_TIMEOUT = 3 * time.Minute

Functions

func CWL_input_check added in v0.9.62

func CWL_input_check(job_input *cwl.Job_document, cwl_workflow *cwl.Workflow) (err error)

func DBGetJobAcl added in v0.9.33

func DBGetJobAcl(job_id string) (_acl acl.Acl, err error)

func DbFindDistinct added in v0.9.26

func DbFindDistinct(q bson.M, d string) (results interface{}, err error)

func DbUpdateJobField added in v0.9.33

func DbUpdateJobField(job_id string, key string, value interface{}) (err error)

func DeleteClientGroup added in v0.9.3

func DeleteClientGroup(id string) (err error)

func Deserialize_b64 added in v0.9.62

func Deserialize_b64(encoding string, target interface{}) (err error)

func GetAdminView added in v0.9.62

func GetAdminView(special string) (data []interface{}, err error)

Passes the admin view data function from the job controller through to db.go.

func GetJobCount added in v0.9.12

func GetJobCount(q bson.M) (count int, err error)

func GetJobIdByTaskId_deprecated added in v0.9.62

func GetJobIdByTaskId_deprecated(taskid string) (jobid string, err error)

func GetJobIdByWorkId_deprecated added in v0.9.62

func GetJobIdByWorkId_deprecated(workid string) (jobid string, err error)

func GetTaskIdByWorkId_deprecated added in v0.9.62

func GetTaskIdByWorkId_deprecated(workid string) (taskid string, err error)

func HasInfoField added in v0.9.26

func HasInfoField(a string) bool

func InitAwfMgr

func InitAwfMgr()

func InitClientGroupDB added in v0.9.3

func InitClientGroupDB()

func InitJobDB

func InitJobDB()

func InitProxyWorkChan

func InitProxyWorkChan()

func InitReaper added in v0.9.26

func InitReaper()

func InitResMgr

func InitResMgr(service string)

func IsFirstTask

func IsFirstTask(taskid string) bool

func IsValidUUID added in v0.9.62

func IsValidUUID(uuid string) bool

func NotifyWorkunitProcessed

func NotifyWorkunitProcessed(work *Workunit, perf *WorkPerf) (err error)

Functions for REST API communication (=deprecated=). NotifyWorkunitProcessed notifies the AWE server that a workunit has finished with status either "failed" or "done", and includes perf statistics if "done".

func PushOutputData

func PushOutputData(work *Workunit) (size int64, err error)

deprecated, see cache.UploadOutputData

func ReloadFromDisk

func ReloadFromDisk(path string) (err error)

func RemoveWorkFromClient added in v0.9.45

func RemoveWorkFromClient(client *Client, workid Workunit_Unique_Identifier) (err error)

func SetClientProfile added in v0.9.33

func SetClientProfile(profile *Client)

func UpdateJobState_deprecated added in v0.9.62

func UpdateJobState_deprecated(jobid string, newstate string, oldstates []string) (err error)

Updates the job state to "newstate" only if the current state is one of the "oldstates". // TODO make this a job.SetState function

Types

type BaseResponse added in v0.9.62

// BaseResponse holds an integer status and a list of error strings, encoded
// in JSON under the keys "status" and "error".
type BaseResponse struct {
	Status int      `json:"status"`
	Error  []string `json:"error"`
}

type CQMgr

// CQMgr is embedded in ServerMgr. It exposes no exported fields.
type CQMgr struct {
	// contains filtered or unexported fields
}

this struct is embedded in ServerMgr

func (*CQMgr) AddClient added in v0.9.33

func (qm *CQMgr) AddClient(client *Client, lock bool) (err error)

func (*CQMgr) CheckClient added in v0.9.33

func (qm *CQMgr) CheckClient(client *Client) (ok bool, err error)

func (*CQMgr) CheckoutWorkunits

func (qm *CQMgr) CheckoutWorkunits(req_policy string, client_id string, client *Client, available_bytes int64, num int) (workunits []*Workunit, err error)

func (*CQMgr) ClientChecker

func (qm *CQMgr) ClientChecker()

func (*CQMgr) ClientHandle added in v0.9.16

func (qm *CQMgr) ClientHandle()

func (*CQMgr) ClientHeartBeat

func (qm *CQMgr) ClientHeartBeat(id string, cg *ClientGroup, workerstate WorkerState) (hbmsg HeartbeatInstructions, err error)

func (*CQMgr) ClientStatusChange_deprecated added in v0.9.62

func (qm *CQMgr) ClientStatusChange_deprecated(client *Client, new_status string, client_write_lock bool) (err error)

func (*CQMgr) DeleteClients added in v0.9.33

func (qm *CQMgr) DeleteClients(delete_clients []string)

func (*CQMgr) EnqueueWorkunit

func (qm *CQMgr) EnqueueWorkunit(work *Workunit) (err error)

func (*CQMgr) FetchDataToken

func (qm *CQMgr) FetchDataToken(workid string, clientid string) (token string, err error)

func (*CQMgr) GetAllClientsByUser added in v0.9.3

func (qm *CQMgr) GetAllClientsByUser(u *user.User) (clients []*Client, err error)

func (*CQMgr) GetClient

func (qm *CQMgr) GetClient(id string, lock_clientmap bool) (client *Client, ok bool, err error)

func (*CQMgr) GetClientByUser added in v0.9.3

func (qm *CQMgr) GetClientByUser(id string, u *user.User) (client *Client, err error)

func (*CQMgr) GetClientMap added in v0.9.33

func (qm *CQMgr) GetClientMap() *ClientMap

func (*CQMgr) GetWorkById

func (qm *CQMgr) GetWorkById(id Workunit_Unique_Identifier) (workunit *Workunit, err error)

func (*CQMgr) HasClient added in v0.9.16

func (qm *CQMgr) HasClient(id string, lock_clientmap bool) (has bool, err error)

func (*CQMgr) ListClients added in v0.9.16

func (qm *CQMgr) ListClients() (ids []string, err error)

func (*CQMgr) NotifyWorkStatus

func (qm *CQMgr) NotifyWorkStatus(notice Notice)

func (*CQMgr) ReQueueWorkunitByClient

func (qm *CQMgr) ReQueueWorkunitByClient(client *Client, client_write_lock bool) (err error)

func (*CQMgr) RegisterNewClient

func (qm *CQMgr) RegisterNewClient(files FormFiles, cg *ClientGroup) (client *Client, err error)

This can be a new client or an old client that re-registers

func (*CQMgr) RemoveClient added in v0.9.16

func (qm *CQMgr) RemoveClient(id string, lock bool) (err error)

lock is for clientmap

func (*CQMgr) ResumeClient

func (qm *CQMgr) ResumeClient(id string) (err error)

func (*CQMgr) ResumeClientByUser added in v0.9.3

func (qm *CQMgr) ResumeClientByUser(id string, u *user.User) (err error)

func (*CQMgr) ResumeSuspendedClients

func (qm *CQMgr) ResumeSuspendedClients() (count int, err error)

func (*CQMgr) ResumeSuspendedClientsByUser added in v0.9.3

func (qm *CQMgr) ResumeSuspendedClientsByUser(u *user.User) (count int)

func (*CQMgr) ShowWorkunits

func (qm *CQMgr) ShowWorkunits(status string) (workunits []*Workunit, err error)

func (*CQMgr) ShowWorkunitsByUser added in v0.9.3

func (qm *CQMgr) ShowWorkunitsByUser(status string, u *user.User) (workunits []*Workunit)

func (*CQMgr) SuspendAllClients

func (qm *CQMgr) SuspendAllClients(reason string) (count int, err error)

func (*CQMgr) SuspendAllClientsByUser added in v0.9.3

func (qm *CQMgr) SuspendAllClientsByUser(u *user.User, reason string) (count int, err error)

func (*CQMgr) SuspendClient

func (qm *CQMgr) SuspendClient(id string, client *Client, reason string, client_write_lock bool) (err error)

use id OR client

func (*CQMgr) SuspendClientByUser added in v0.9.3

func (qm *CQMgr) SuspendClientByUser(id string, u *user.User, reason string) (err error)

func (*CQMgr) UpdateSubClients

func (qm *CQMgr) UpdateSubClients(id string, count int) (err error)

func (*CQMgr) UpdateSubClientsByUser added in v0.9.3

func (qm *CQMgr) UpdateSubClientsByUser(id string, count int, u *user.User)

type CWL_workunit added in v0.9.48

// CWL_workunit bundles the CWL-related data of a workunit: its job input, the
// tool, the outputs, and the subset of outputs expected by the workflow. The
// embedded Notice is inlined/squashed when (de)serialized.
type CWL_workunit struct {
	Job_input          *cwl.Job_document `bson:"job_input,omitempty" json:"job_input,omitempty" mapstructure:"job_input,omitempty"`
	Job_input_filename string            `bson:"job_input_filename,omitempty" json:"job_input_filename,omitempty" mapstructure:"job_input_filename,omitempty"`
	//CWL_tool           *cwl.CommandLineTool      `bson:"cwl_tool,omitempty" json:"cwl_tool,omitempty" mapstructure:"cwl_tool,omitempty"`
	//CWL_tool_filename  string                    `bson:"cwl_tool_filename,omitempty" json:"cwl_tool_filename,omitempty" mapstructure:"cwl_tool_filename,omitempty"`
	Tool            interface{}               `bson:"tool,omitempty" json:"tool,omitempty" mapstructure:"tool,omitempty"` // untyped; NOTE(review): concrete tool types are not visible here
	Tool_filename   string                    `bson:"tool_filename,omitempty" json:"tool_filename,omitempty" mapstructure:"tool_filename,omitempty"`
	Outputs         *cwl.Job_document         `bson:"outputs,omitempty" json:"outputs,omitempty" mapstructure:"outputs,omitempty"`
	OutputsExpected *[]cwl.WorkflowStepOutput `bson:"outputs_expected,omitempty" json:"outputs_expected,omitempty" mapstructure:"outputs_expected,omitempty"` // this is the subset of outputs that are needed by the workflow
	Notice          `bson:",inline" json:",inline" mapstructure:",squash"`
}

func NewCWL_workunit added in v0.9.62

func NewCWL_workunit() *CWL_workunit

func NewCWL_workunit_from_interface added in v0.9.62

func NewCWL_workunit_from_interface(native interface{}) (workunit *CWL_workunit, schemata []cwl.CWLType_Type, err error)

type Client

// Client is the worker. It embeds RWMutex (excluded from bson/json) together
// with WorkerRuntime and WorkerState (both inlined), and adds registration
// time, counters and state flags.
type Client struct {
	RWMutex         `bson:"-" json:"-"`
	WorkerRuntime   `bson:",inline" json:",inline"`
	WorkerState     `bson:",inline" json:",inline"`
	RegTime         time.Time     `bson:"regtime" json:"regtime"`
	LastCompleted   time.Time     `bson:"lastcompleted" json:"lastcompleted"` // time at which a job was last completed (can be used to compute idle time)
	Serve_time      string        `bson:"serve_time" json:"serve_time"`
	Total_checkout  int           `bson:"total_checkout" json:"total_checkout"`
	Total_completed int           `bson:"total_completed" json:"total_completed"`
	Total_failed    int           `bson:"total_failed" json:"total_failed"`
	Skip_work       []string      `bson:"skip_work" json:"skip_work"`
	Last_failed     int           `bson:"-" json:"-"` // not serialized
	Tag             bool          `bson:"-" json:"-"` // not serialized
	Proxy           bool          `bson:"proxy" json:"proxy"`
	SubClients      int           `bson:"subclients" json:"subclients"`
	Online          bool          `bson:"online" json:"online"`                 // a state
	Suspended       bool          `bson:"suspended" json:"suspended"`           // a state
	Suspend_reason  string        `bson:"suspend_reason" json:"suspend_reason"` // a state
	Status          string        `bson:"Status" json:"Status"`                 // 1) suspended? 2) busy ? 3) online (call is idle) 4) offline; note the capitalized key "Status"
	Assigned_work   *WorkunitList `bson:"assigned_work" json:"assigned_work"`   // this is for exporting into json
	// contains filtered or unexported fields
}

this is the Worker

func NewClient

func NewClient() (client *Client)

func NewProfileClient

func NewProfileClient(filepath string) (client *Client, err error)

create Client object from json file

func (*Client) Add added in v0.9.62

func (this *Client) Add(workid Workunit_Unique_Identifier) (err error)

func (*Client) Append_Skip_work added in v0.9.33

func (cl *Client) Append_Skip_work(workid Workunit_Unique_Identifier, write_lock bool) (err error)

func (*Client) Contains_Skip_work_nolock added in v0.9.33

func (cl *Client) Contains_Skip_work_nolock(workid string) (c bool)

func (*Client) Get_Ack added in v0.9.33

func (cl *Client) Get_Ack() (ack CoAck, err error)

func (*Client) Get_Busy added in v0.9.62

func (cl *Client) Get_Busy(do_read_lock bool) (b bool, err error)

func (*Client) Get_Group added in v0.9.62

func (cl *Client) Get_Group(do_read_lock bool) (g string, err error)

func (*Client) Get_Id added in v0.9.45

func (cl *Client) Get_Id(do_read_lock bool) (s string, err error)

func (*Client) Get_Last_failed added in v0.9.33

func (cl *Client) Get_Last_failed() (count int, err error)

func (*Client) Get_New_Status added in v0.9.62

func (cl *Client) Get_New_Status(do_read_lock bool) (s string, err error)

this function should not be used internally, this is only for backwards-compatibility and human readability

func (*Client) Get_Suspended added in v0.9.62

func (cl *Client) Get_Suspended(do_read_lock bool) (s bool, err error)

func (*Client) Get_Total_checkout added in v0.9.33

func (cl *Client) Get_Total_checkout() (count int, err error)

func (*Client) Get_Total_completed added in v0.9.33

func (cl *Client) Get_Total_completed() (count int, err error)

func (*Client) Get_Total_failed added in v0.9.33

func (cl *Client) Get_Total_failed() (count int, err error)

func (*Client) Increment_last_failed added in v0.9.33

func (cl *Client) Increment_last_failed(write_lock bool) (value int, err error)

func (*Client) Increment_total_checkout added in v0.9.33

func (cl *Client) Increment_total_checkout(err error)

func (*Client) Increment_total_completed added in v0.9.33

func (cl *Client) Increment_total_completed() (err error)

func (*Client) Increment_total_failed added in v0.9.33

func (cl *Client) Increment_total_failed(write_lock bool) (err error)

func (*Client) Init added in v0.9.33

func (client *Client) Init()

invoked by NewClient or manually after unmarshalling

func (*Client) Marshal added in v0.9.33

func (cl *Client) Marshal() (result []byte, err error)

func (*Client) Resume added in v0.9.62

func (cl *Client) Resume(write_lock bool) (err error)

func (*Client) Set_Busy added in v0.9.62

func (cl *Client) Set_Busy(b bool, do_write_lock bool) (err error)

func (*Client) Set_Online added in v0.9.62

func (cl *Client) Set_Online(o bool, write_lock bool) (err error)

func (*Client) Set_Status_deprecated added in v0.9.62

func (cl *Client) Set_Status_deprecated(s string, write_lock bool) (err error)

func (*Client) Set_Suspended added in v0.9.62

func (cl *Client) Set_Suspended(s bool, reason string, write_lock bool) (err error)

func (*Client) Suspend added in v0.9.62

func (cl *Client) Suspend(reason string, write_lock bool) (err error)

func (*Client) Update_Status added in v0.9.62

func (cl *Client) Update_Status(write_lock bool) (err error)

type ClientGroup added in v0.9.3

// ClientGroup describes a client group: identifier, IP CIDR, name, token,
// ACL and creation/expiration/modification timestamps. The Acl field is
// stored in bson but excluded from JSON output (json:"-").
type ClientGroup struct {
	Id           string                        `bson:"id" json:"id"`
	IP_CIDR      string                        `bson:"ip_cidr" json:"ip_cidr"`
	Name         string                        `bson:"name" json:"name"`
	Token        string                        `bson:"token" json:"token"`
	Acl          clientGroupAcl.ClientGroupAcl `bson:"acl" json:"-"`
	CreatedOn    time.Time                     `bson:"created_on" json:"created_on"`
	Expiration   time.Time                     `bson:"expiration" json:"expiration"`
	LastModified time.Time                     `bson:"last_modified" json:"last_modified"`
}

func CreateClientGroup added in v0.9.3

func CreateClientGroup(name string, u *user.User) (cg *ClientGroup, err error)

func LoadClientGroup added in v0.9.3

func LoadClientGroup(id string) (clientgroup *ClientGroup, err error)

func LoadClientGroupByName added in v0.9.3

func LoadClientGroupByName(name string) (clientgroup *ClientGroup, err error)

func LoadClientGroupByToken added in v0.9.3

func LoadClientGroupByToken(token string) (clientgroup *ClientGroup, err error)

func (*ClientGroup) Save added in v0.9.3

func (cg *ClientGroup) Save() (err error)

func (*ClientGroup) SetToken added in v0.9.3

func (cg *ClientGroup) SetToken()

type ClientGroups added in v0.9.3

// ClientGroups is a slice of ClientGroup values.
type ClientGroups []ClientGroup

ClientGroup array type

func (*ClientGroups) GetPaginated added in v0.9.3

func (n *ClientGroups) GetPaginated(q bson.M, limit int, offset int, order string, direction string) (count int, err error)

type ClientMap added in v0.9.33

// ClientMap embeds RWMutex; its remaining fields are unexported.
// NOTE(review): presumably a lock-guarded map from client id to *Client,
// judging by its Add/Get/Delete/Has methods — confirm against the source.
type ClientMap struct {
	RWMutex
	// contains filtered or unexported fields
}

func NewClientMap added in v0.9.33

func NewClientMap() *ClientMap

func (*ClientMap) Add added in v0.9.33

func (cl *ClientMap) Add(client *Client, lock bool) (err error)

func (*ClientMap) Delete added in v0.9.33

func (cl *ClientMap) Delete(client_id string, lock bool) (err error)

func (*ClientMap) Get added in v0.9.33

func (cl *ClientMap) Get(client_id string, lock bool) (client *Client, ok bool, err error)

func (*ClientMap) GetClientIds added in v0.9.33

func (cl *ClientMap) GetClientIds() (ids []string, err error)

func (*ClientMap) GetClients added in v0.9.33

func (cl *ClientMap) GetClients() (clients []*Client, err error)

func (*ClientMap) Has added in v0.9.33

func (cl *ClientMap) Has(client_id string, lock bool) (ok bool, err error)

type ClientMgr

// ClientMgr is the interface for managing clients (workers): registration,
// heartbeats, lookup, suspend/resume and sub-client updates. The method set
// below matches the methods of the same names listed on *CQMgr.
type ClientMgr interface {
	RegisterNewClient(FormFiles, *ClientGroup) (*Client, error)
	ClientHeartBeat(string, *ClientGroup, WorkerState) (HeartbeatInstructions, error)
	GetClient(string, bool) (*Client, bool, error)
	GetClientByUser(string, *user.User) (*Client, error)
	//GetAllClients() []*Client
	GetClientMap() *ClientMap
	GetAllClientsByUser(*user.User) ([]*Client, error)
	//DeleteClient(*Client) error
	//DeleteClientById(string) error
	//DeleteClientByUser(string, *user.User) error
	SuspendClient(string, *Client, string, bool) error
	SuspendClientByUser(string, *user.User, string) error
	ResumeClient(string) error
	ResumeClientByUser(string, *user.User) error
	ResumeSuspendedClients() (int, error)
	ResumeSuspendedClientsByUser(*user.User) int
	SuspendAllClients(string) (int, error)
	SuspendAllClientsByUser(*user.User, string) (int, error)
	ClientChecker()
	UpdateSubClients(string, int) error
	UpdateSubClientsByUser(string, int, *user.User)
}

type ClientWorkMgr

// ClientWorkMgr is the union of the ClientMgr and WorkMgr interfaces.
type ClientWorkMgr interface {
	ClientMgr
	WorkMgr
}

type Clients added in v0.9.33

// Clients is a slice of *Client.
type Clients []*Client

func (*Clients) RLockRecursive added in v0.9.33

func (cs *Clients) RLockRecursive()

func (*Clients) RUnlockRecursive added in v0.9.33

func (cs *Clients) RUnlockRecursive()

type CoAck

// CoAck has no exported fields.
// NOTE(review): presumably the acknowledgement paired with a CoReq — confirm against the source.
type CoAck struct {
	// contains filtered or unexported fields
}

type CoReq

// CoReq has no exported fields.
// NOTE(review): presumably a checkout request — confirm against the source.
type CoReq struct {
	// contains filtered or unexported fields
}

type Command

// Command describes a command to execute: its name, arguments, Docker image
// information, script lines, environment and description. ParsedArgs is
// excluded from bson/json/mapstructure; Local carries no struct tags.
type Command struct {
	Name          string   `bson:"name" json:"name" mapstructure:"name"`
	Args          string   `bson:"args" json:"args" mapstructure:"args"`
	ArgsArray     []string `bson:"args_array" json:"args_array" mapstructure:"args_array"`    // use this instead of Args, which is just a string
	Dockerimage   string   `bson:"Dockerimage" json:"Dockerimage" mapstructure:"Dockerimage"` // for Shock (TODO rename this !)
	DockerPull    string   `bson:"dockerPull" json:"dockerPull" mapstructure:"dockerPull"`    // docker pull
	Cmd_script    []string `bson:"cmd_script" json:"cmd_script" mapstructure:"cmd_script"`
	Environ       Envs     `bson:"environ" json:"environ" mapstructure:"environ"`
	HasPrivateEnv bool     `bson:"has_private_env" json:"has_private_env" mapstructure:"has_private_env"`
	Description   string   `bson:"description" json:"description" mapstructure:"description"`
	ParsedArgs    []string `bson:"-" json:"-" mapstructure:"-"`
	Local         bool     // indicates local execution, i.e. working directory is same as current working directory (do not delete !)
}

func NewCommand

func NewCommand(name string) *Command

type Command_p added in v0.9.3

// Command_p is a JSON-only helper exposing just the "environ" key via
// Environ_p, used to unmarshal the private environment that Envs hides from
// JSON (Envs.Private is tagged json:"-").
type Command_p struct {
	Environ *Environ_p `json:"environ"`
}

type Environ_p added in v0.9.3

// Environ_p is a JSON-only helper exposing the "private" environment map,
// used to unmarshal the private field Command.Environ.Private.
type Environ_p struct {
	Private map[string]string `json:"private"`
}

The following special code exists in order to unmarshal the private field Command.Environ.Private; it is kept in this file to reduce confusion.

type Envs added in v0.9.3

// Envs holds public and private environment variable maps. Both are stored in
// bson; Private is excluded from JSON (json:"-").
type Envs struct {
	Public  map[string]string `bson:"public" json:"public"`
	Private map[string]string `bson:"private" json:"-"`
}

type Filter_work_stats added in v0.9.62

// Filter_work_stats holds integer counters.
// NOTE(review): presumably the total number of workunits considered during
// filtering and the number rejected per reason — confirm against the code
// that fills this in.
type Filter_work_stats struct {
	Total             int
	Skip_work         int
	Wrong_clientgroup int
	Wrong_app         int
}

type FormFile

// FormFile describes a file by name, path and a map of checksums.
// NOTE(review): presumably an uploaded form file, with Checksum keyed by
// checksum algorithm — confirm.
type FormFile struct {
	Name     string
	Path     string
	Checksum map[string]string
}

type FormFiles

// FormFiles maps a string key to a FormFile.
// NOTE(review): the key is presumably the form field name — confirm.
type FormFiles map[string]FormFile

type HeartbeatInstructions added in v0.9.62

// HeartbeatInstructions maps an operation name to a comma-separated list of
// objects the operation applies to.
type HeartbeatInstructions map[string]string //map[op]obj1,obj2 e.g. map[discard]=work1,work2

Heartbeat response from awe-server to awe-worker, used to issue operation requests to the client, e.g. discard suspended workunits.

type Helper added in v0.9.33

// Helper exposes AWE_tasks, a pointer to a map from string to *Task; its
// other fields are unexported.
// NOTE(review): the map key is presumably a task id — confirm.
type Helper struct {
	AWE_tasks *map[string]*Task
	// contains filtered or unexported fields
}

type IO

// IO describes a single input or output file: its name, location (host, node
// or URL), size, and handling flags. Fields tagged json:"-" are not included
// in JSON output; fields tagged bson:"-" are not stored.
type IO struct {
	FileName      string                   `bson:"filename" json:"filename" mapstructure:"filename"`
	Name          string                   `bson:"name" json:"name" mapstructure:"name"`  // specifies abstract name of output as defined by the app
	AppPosition   int                      `bson:"appposition" json:"-" mapstructure:"-"` // specifies position in app output array
	Directory     string                   `bson:"directory" json:"directory" mapstructure:"directory"`
	Host          string                   `bson:"host" json:"host" mapstructure:"host"`
	Node          string                   `bson:"node" json:"node" mapstructure:"node"`
	Url           string                   `bson:"url"  json:"url" mapstructure:"url"` // can be shock or any other url
	Size          int64                    `bson:"size" json:"size" mapstructure:"size"`
	MD5           string                   `bson:"md5" json:"-" mapstructure:"-"`
	Cache         bool                     `bson:"cache" json:"cache" mapstructure:"cache"` // indicates that this file is "predata" that needs to be cached
	Origin        string                   `bson:"origin" json:"origin" mapstructure:"origin"`
	Path          string                   `bson:"-" json:"-" mapstructure:"-"`
	Optional      bool                     `bson:"optional" json:"-" mapstructure:"-"`
	Nonzero       bool                     `bson:"nonzero"  json:"nonzero" mapstructure:"nonzero"`
	DataToken     string                   `bson:"datatoken"  json:"-" mapstructure:"-"`
	Intermediate  bool                     `bson:"Intermediate"  json:"-" mapstructure:"-"`
	Temporary     bool                     `bson:"temporary"  json:"temporary" mapstructure:"temporary"`
	ShockFilename string                   `bson:"shockfilename" json:"shockfilename" mapstructure:"shockfilename"`
	ShockIndex    string                   `bson:"shockindex" json:"shockindex" mapstructure:"shockindex"` // on input it indicates that Shock node has to be indexed by AWE server
	AttrFile      string                   `bson:"attrfile" json:"attrfile" mapstructure:"attrfile"`
	NoFile        bool                     `bson:"nofile" json:"nofile" mapstructure:"nofile"`
	Delete        bool                     `bson:"delete" json:"delete" mapstructure:"delete"` // specifies that this is a temporary node, to be deleted from shock on job completion
	Type          string                   `bson:"type" json:"type" mapstructure:"type"`
	NodeAttr      map[string]interface{}   `bson:"nodeattr" json:"nodeattr" mapstructure:"nodeattr"` // specifies attribute data to be stored in shock node (output only)
	FormOptions   map[string]string        `bson:"formoptions" json:"formoptions" mapstructure:"formoptions"`
	Uncompress    string                   `bson:"uncompress" json:"uncompress" mapstructure:"uncompress"` // tells AWE client to uncompress this file, e.g. "gzip"
	Indexes       map[string]shock.IdxInfo `bson:"-" json:"-" mapstructure:"-"`                            // copy of shock node.Indexes, not saved
}

func NewIO

func NewIO() *IO

func (*IO) DataUrl

func (io *IO) DataUrl() (dataurl string, err error)

func (*IO) DeleteNode added in v0.9.3

func (io *IO) DeleteNode() (err error)

func (*IO) IndexFile added in v0.9.68

func (io *IO) IndexFile(indextype string) (idxInfo shock.IdxInfo, err error)

func (*IO) UpdateFileSize added in v0.9.68

func (io *IO) UpdateFileSize() (modified bool, err error)

func (*IO) Url2Shock added in v0.9.62

func (io *IO) Url2Shock() (err error)

type IOmap

// IOmap maps a file name to its IO attributes.
//
// Deprecated: used by the deprecated TaskDep struct; maintained for backwards
// compatibility.
type IOmap map[string]*IO // [filename]attributes

Deprecated JobDep struct uses deprecated TaskDep struct which uses the deprecated IOmap. Maintained for backwards compatibility. Jobs that cannot be parsed into the Job struct, but can be parsed into the JobDep struct will be translated to the new Job struct. (=deprecated=)

func NewIOmap

func NewIOmap() IOmap

(=deprecated=)

func (IOmap) Add

func (i IOmap) Add(name string, host string, node string, md5 string, cache bool)

(=deprecated=)

func (IOmap) Find

func (i IOmap) Find(name string) *IO

(=deprecated=)

func (IOmap) Has

func (i IOmap) Has(name string) bool

(=deprecated=)

type Info

// Info holds job info: naming and ownership fields, timestamps, priority and
// user-supplied attributes. DataToken is stored in bson but excluded from
// JSON and mapstructure.
type Info struct {
	Name          string                 `bson:"name" json:"name" mapstructure:"name"`
	Xref          string                 `bson:"xref" json:"xref" mapstructure:"xref"`
	Service       string                 `bson:"service" json:"service" mapstructure:"service"`
	Project       string                 `bson:"project" json:"project" mapstructure:"project"`
	User          string                 `bson:"user" json:"user" mapstructure:"user"`
	Pipeline      string                 `bson:"pipeline" json:"pipeline" mapstructure:"pipeline"` // or workflow
	ClientGroups  string                 `bson:"clientgroups" json:"clientgroups" mapstructure:"clientgroups"`
	SubmitTime    time.Time              `bson:"submittime" json:"submittime" mapstructure:"submittime"`
	StartedTime   time.Time              `bson:"startedtime" json:"startedtime" mapstructure:"startedtime"`
	CompletedTime time.Time              `bson:"completedtime" json:"completedtime" mapstructure:"completedtime"`
	Priority      int                    `bson:"priority" json:"priority" mapstructure:"priority"`
	Auth          bool                   `bson:"auth" json:"auth" mapstructure:"auth"`
	DataToken     string                 `bson:"datatoken" json:"-" mapstructure:"-"`
	NoRetry       bool                   `bson:"noretry" json:"noretry" mapstructure:"noretry"`
	UserAttr      map[string]interface{} `bson:"userattr" json:"userattr" mapstructure:"userattr"`
	Description   string                 `bson:"description" json:"description" mapstructure:"description"`
	Tracking      bool                   `bson:"tracking" json:"tracking" mapstructure:"tracking"`
	StartAt       time.Time              `bson:"start_at" json:"start_at" mapstructure:"start_at"` // will start tasks at this timepoint or shortly after
}

job info

func NewInfo

func NewInfo() *Info

func (*Info) SetStartedTime added in v0.9.62

func (this *Info) SetStartedTime(jobid string, t time.Time) (err error)

type Job

// Job combines the embedded JobRaw (inlined in bson) with the job's list of
// tasks.
type Job struct {
	JobRaw `bson:",inline"`
	Tasks  []*Task `bson:"tasks" json:"tasks"`
}

func CWL2AWE added in v0.9.33

func CWL2AWE(_user *user.User, files FormFiles, job_input *cwl.Job_document, cwl_workflow *cwl.Workflow, collection *cwl.CWL_collection) (job *Job, err error)

func CreateJobImport added in v0.9.26

func CreateJobImport(u *user.User, file FormFile) (job *Job, err error)

func CreateJobUpload

func CreateJobUpload(u *user.User, files FormFiles) (job *Job, err error)

func GetJob added in v0.9.33

func GetJob(id string) (job *Job, err error)

func JobDepToJob added in v0.9.16

func JobDepToJob(jobDep *JobDep) (job *Job, err error)

Takes the deprecated (version 1) Job struct and returns the version 2 Job struct or an error

func LoadJob

func LoadJob(id string) (job *Job, err error)

func NewJob added in v0.9.33

func NewJob() (job *Job)

func ReadJobFile added in v0.9.33

func ReadJobFile(filename string) (job *Job, err error)

func (*Job) AddTask added in v0.9.62

func (job *Job) AddTask(task *Task) (err error)

func (*Job) AddWorkflowInstance added in v0.9.62

func (job *Job) AddWorkflowInstance(id string, inputs cwl.Job_document, remain_tasks int) (err error)

func (*Job) Decrease_WorkflowInstance_RemainTasks added in v0.9.62

func (job *Job) Decrease_WorkflowInstance_RemainTasks(id string) (remain_tasks int, err error)

func (*Job) Delete added in v0.9.26

func (job *Job) Delete() (err error)

func (*Job) FilePath

func (job *Job) FilePath() (path string, err error)

func (*Job) GetDataToken

func (job *Job) GetDataToken() (token string)

func (*Job) GetJobLogs added in v0.9.27

func (job *Job) GetJobLogs() (jlog *JobLog, err error)

func (*Job) GetPrivateEnv added in v0.9.3

func (job *Job) GetPrivateEnv(taskid string) (env map[string]string, err error)

func (*Job) GetRemainTasks added in v0.9.33

func (job *Job) GetRemainTasks() (remain_tasks int, err error)

func (*Job) GetState added in v0.9.33

func (job *Job) GetState(do_lock bool) (state string, err error)

func (*Job) GetTasks added in v0.9.33

func (job *Job) GetTasks() (tasks []*Task, err error)

func (*Job) GetWorkflowInstance added in v0.9.62

func (job *Job) GetWorkflowInstance(id string, do_read_lock bool) (wi *WorkflowInstance, err error)

func (*Job) GetWorkflowInstanceIndex added in v0.9.62

func (job *Job) GetWorkflowInstanceIndex(id string, do_read_lock bool) (index int, err error)

func (*Job) IncrementRemainTasks added in v0.9.33

func (job *Job) IncrementRemainTasks(inc int) (err error)

func (*Job) IncrementResumed added in v0.9.33

func (job *Job) IncrementResumed(inc int) (err error)

func (*Job) Init added in v0.9.33

func (job *Job) Init() (changed bool, err error)

Init has to be called after unmarshalling from JSON.

func (*Job) Mkdir

func (job *Job) Mkdir() (err error)

func (*Job) NumTask

func (job *Job) NumTask() int

func (*Job) Path

func (job *Job) Path() (path string, err error)

---Path functions

func (*Job) RLockRecursive added in v0.9.33

func (job *Job) RLockRecursive()

func (*Job) RUnlockRecursive added in v0.9.33

func (job *Job) RUnlockRecursive()

func (*Job) Rmdir added in v0.9.26

func (job *Job) Rmdir() (err error)

func (*Job) Save

func (job *Job) Save() (err error)

func (*Job) SaveToDisk added in v0.9.33

func (job *Job) SaveToDisk() (err error)

func (*Job) SetClientgroups added in v0.9.33

func (job *Job) SetClientgroups(clientgroups string) (err error)

func (*Job) SetDataToken

func (job *Job) SetDataToken(token string) (err error)

func (*Job) SetError added in v0.9.48

func (job *Job) SetError(newError *JobError) (err error)

func (*Job) SetExpiration added in v0.9.26

func (job *Job) SetExpiration(expire string) (err error)

func (*Job) SetFile

func (job *Job) SetFile(file FormFile) (err error)

func (*Job) SetPipeline added in v0.9.26

func (job *Job) SetPipeline(pipeline string) (err error)

func (*Job) SetPriority added in v0.9.33

func (job *Job) SetPriority(priority int) (err error)

func (*Job) SetRemainTasks added in v0.9.33

func (job *Job) SetRemainTasks(remain_tasks int) (err error)

func (*Job) SetState added in v0.9.33

func (job *Job) SetState(newState string, oldstates []string) (err error)

func (*Job) Set_WorkflowInstance_Outputs added in v0.9.62

func (job *Job) Set_WorkflowInstance_Outputs(id string, outputs cwl.Job_document) (err error)

func (*Job) TaskList

func (job *Job) TaskList() []*Task

---Task functions

func (*Job) UpdateFile

func (job *Job) UpdateFile(files FormFiles, field string) (err error)

---Script upload (e.g. field="upload")

type JobDep added in v0.9.16

// JobDep is the deprecated (version 1) job layout: the JobRaw fields (inlined
// for BSON) plus a task list that uses the deprecated TaskDep type.
// JobDepToJob converts a JobDep into the current Job struct.
type JobDep struct {
	JobRaw `bson:",inline"`
	Tasks  []*TaskDep `bson:"tasks" json:"tasks"`
}

Deprecated JobDep struct uses deprecated TaskDep struct which uses the deprecated IOmap. Maintained for backwards compatibility. Jobs that cannot be parsed into the Job struct, but can be parsed into the JobDep struct will be translated to the new Job struct. (=deprecated=)

func NewJobDep added in v0.9.33

func NewJobDep() (job *JobDep)

type JobError added in v0.9.48

// JobError carries the error details attached to a job (see JobRaw.Error and
// JobLog.Error). All fields are plain strings serialized under the lowercase
// keys given in the tags.
// NOTE(review): ClientFailed/WorkFailed/TaskFailed presumably hold the ids of
// the failing client, workunit and task — confirm against the code that fills
// them in.
type JobError struct {
	ClientFailed string `bson:"clientfailed" json:"clientfailed"`
	WorkFailed   string `bson:"workfailed" json:"workfailed"`
	TaskFailed   string `bson:"taskfailed" json:"taskfailed"`
	ServerNotes  string `bson:"servernotes" json:"servernotes"`
	WorkNotes    string `bson:"worknotes" json:"worknotes"`
	AppError     string `bson:"apperror" json:"apperror"`
	Status       string `bson:"status" json:"status"`
}

type JobLog added in v0.9.27

// JobLog is the log view of a job as returned by (*Job).GetJobLogs: the job
// id, state, update time, error, resume count and one TaskLog per task.
type JobLog struct {
	Id         string     `bson:"id" json:"id"`
	State      string     `bson:"state" json:"state"`
	UpdateTime time.Time  `bson:"updatetime" json:"updatetime"`
	Error      *JobError  `bson:"error" json:"error"`
	Resumed    int        `bson:"resumed" json:"resumed"`
	Tasks      []*TaskLog `bson:"tasks" json:"tasks"`
}

type JobMap added in v0.9.33

// JobMap embeds RWMutex and keeps its contents in unexported fields; it is
// created with NewJobMap and accessed through Add, Delete, Get, Get_List and
// Len.
// NOTE(review): presumably a lock-guarded map from job id to *Job — the
// backing field is not visible here, confirm in the source.
type JobMap struct {
	RWMutex
	// contains filtered or unexported fields
}

func NewJobMap added in v0.9.33

func NewJobMap() (t *JobMap)

func (*JobMap) Add added in v0.9.33

func (jm *JobMap) Add(job *Job) (err error)

func (*JobMap) Delete added in v0.9.62

func (jm *JobMap) Delete(jobid string, lock bool) (err error)

func (*JobMap) Get added in v0.9.33

func (jm *JobMap) Get(jobid string, lock bool) (job *Job, ok bool, err error)

func (*JobMap) Get_List added in v0.9.62

func (jm *JobMap) Get_List(lock bool) (jobs []*Job, err error)

func (*JobMap) Len added in v0.9.33

func (jm *JobMap) Len() (length int, err error)

type JobMgr

// JobMgr is the job-management method set. It is embedded in ResourceMgr, so
// every ResourceMgr implementation must provide all of these methods with
// exactly these signatures.
type JobMgr interface {
	EnqueueTasksByJobId(string) error
	GetActiveJobs() map[string]bool
	IsJobRegistered(string) bool
	GetSuspendJobs() map[string]bool
	SuspendJob(string, *JobError) error
	ResumeSuspendedJobByUser(string, *user.User) error
	ResumeSuspendedJobsByUser(*user.User) int
	ResubmitJob(string) error
	DeleteJobByUser(string, *user.User, bool) error
	DeleteSuspendedJobsByUser(*user.User, bool) int
	DeleteZombieJobsByUser(*user.User, bool) int
	RecoverJob(string, *Job) (bool, error)
	RecoverJobs() (int, int, error)
	FinalizeWorkPerf(Workunit_Unique_Identifier, string) error
	SaveStdLog(Workunit_Unique_Identifier, string, string) error
	GetReportMsg(Workunit_Unique_Identifier, string) (string, error)
	RecomputeJob(string, string) error
	UpdateQueueToken(*Job) error
}

type JobMin added in v0.9.8

// JobMin is a reduced job record: id, name, size, submit/completion times,
// compute time, user attributes, and Task/State slices.
// NOTE(review): the meaning of the Task ([]int) and State ([]string) slices is
// not visible from this declaration — confirm against the code that builds
// JobMin values.
type JobMin struct {
	Id            string                 `bson:"id" json:"id"`
	Name          string                 `bson:"name" json:"name"`
	Size          int64                  `bson:"size" json:"size"`
	SubmitTime    time.Time              `bson:"submittime" json:"submittime"`
	CompletedTime time.Time              `bson:"completedtime" json:"completedtime"`
	ComputeTime   int                    `bson:"computetime" json:"computetime"`
	Task          []int                  `bson:"task" json:"task"`
	State         []string               `bson:"state" json:"state"`
	UserAttr      map[string]interface{} `bson:"userattr" json:"userattr"`
}

type JobPerf

// JobPerf holds per-job performance data: Queued/Start/End values, the derived
// Resp value, and per-task and per-workunit statistics keyed by string.
// Note that Ptasks and Pworks are serialized as "task_stats" and "work_stats".
// NOTE(review): Queued/Start/End are int64 — presumably Unix timestamps,
// confirm against the code that sets them.
type JobPerf struct {
	Id     string               `bson:"id" json:"id"`
	Queued int64                `bson:"queued" json:"queued"`
	Start  int64                `bson:"start" json:"start"`
	End    int64                `bson:"end" json:"end"`
	Resp   int64                `bson:"resp" json:"resp"` //End - Queued
	Ptasks map[string]*TaskPerf `bson:"task_stats" json:"task_stats"`
	Pworks map[string]*WorkPerf `bson:"work_stats" json:"work_stats"`
}

func LoadJobPerf

func LoadJobPerf(id string) (perf *JobPerf, err error)

func NewJobPerf

func NewJobPerf(id string) *JobPerf

type JobRaw added in v0.9.33

// JobRaw holds the job fields that are independent of the task representation.
// It embeds RWMutex and is itself embedded (inlined for BSON) by the deprecated
// JobDep struct.
//
// Fields tagged `bson:"-"` / `json:"-"` are runtime-only and are not persisted
// or emitted. The json tags of IsCWL, CWL_objects and CWL_job_input must be
// properly terminated: with the closing quote missing, the json key cannot be
// parsed from the struct tag and encoding/json silently falls back to the Go
// field name ("IsCWL", "CWL_objects", "CWL_job_input") instead of the intended
// snake_case key.
type JobRaw struct {
	RWMutex
	Id                   string                       `bson:"id" json:"id"` // uuid
	Acl                  acl.Acl                      `bson:"acl" json:"-"`
	Info                 *Info                        `bson:"info" json:"info"`
	Script               script                       `bson:"script" json:"-"`
	State                string                       `bson:"state" json:"state"`
	Registered           bool                         `bson:"registered" json:"registered"`
	RemainTasks          int                          `bson:"remaintasks" json:"remaintasks"`
	Expiration           time.Time                    `bson:"expiration" json:"expiration"` // 0 means no expiration
	UpdateTime           time.Time                    `bson:"updatetime" json:"updatetime"`
	Error                *JobError                    `bson:"error" json:"error"`         // error struct exists when in suspended state
	Resumed              int                          `bson:"resumed" json:"resumed"`     // number of times the job has been resumed from suspension
	ShockHost            string                       `bson:"shockhost" json:"shockhost"` // this is a fall-back default if not specified at a lower level
	IsCWL                bool                         `bson:"is_cwl" json:"is_cwl"`
	CwlVersion           cwl.CWLVersion               `bson:"cwl_version" json:"cwl_version"`
	CWL_objects          interface{}                  `bson:"cwl_objects" json:"cwl_objects"`
	CWL_job_input        interface{}                  `bson:"cwl_job_input" json:"cwl_job_input"` // has to be an array for mongo (id as key would not work)
	CWL_collection       *cwl.CWL_collection          `bson:"-" json:"-" yaml:"-" mapstructure:"-"`
	CWL_workflow         *cwl.Workflow                `bson:"-" json:"-" yaml:"-" mapstructure:"-"`
	WorkflowInstances    []interface{}                `bson:"workflow_instances" json:"workflow_instances" yaml:"workflow_instances" mapstructure:"workflow_instances"`
	WorkflowInstancesMap map[string]*WorkflowInstance `bson:"-" json:"-" yaml:"-" mapstructure:"-"`
	Entrypoint           string                       `bson:"entrypoint" json:"entrypoint"` // name of main workflow (typically has name #main or #entrypoint)
}

func NewJobRaw added in v0.9.33

func NewJobRaw() (job *JobRaw)

func (*JobRaw) GetId added in v0.9.62

func (job *JobRaw) GetId(do_read_lock bool) (id string, err error)

type JobReaper added in v0.9.26

// JobReaper is a stateless type; it is created with NewJobReaper and its only
// listed method is Handle.
type JobReaper struct{}
// Ttl is the package-level JobReaper instance.
// ExpireRegex matches an expiration string consisting of one or more digits
// followed by exactly one of the letters M, H or D.
// NOTE(review): the unit letters presumably mean minutes/hours/days — confirm
// in (*Job).SetExpiration.
var (
	Ttl         *JobReaper
	ExpireRegex = regexp.MustCompile(`^(\d+)(M|H|D)$`)
)

func NewJobReaper added in v0.9.26

func NewJobReaper() *JobReaper

func (*JobReaper) Handle added in v0.9.26

func (jr *JobReaper) Handle()

type Job_Acl added in v0.9.36

// Job_Acl contains only a job's ACL. The field is stored under the BSON key
// "acl" and is excluded from JSON output.
type Job_Acl struct {
	Acl acl.Acl `bson:"acl" json:"-"`
}

type Job_p added in v0.9.3

// Job_p is a JSON-only wrapper around a list of Task_p values, serialized
// under the key "tasks".
type Job_p struct {
	Tasks []*Task_p `json:"tasks"`
}

type Jobs

// Jobs is a slice of *Job; its query and locking helpers are declared on the
// pointer receiver *Jobs.
type Jobs []*Job

Job array type

func (*Jobs) GetAll

func (n *Jobs) GetAll(q bson.M, order string, direction string, do_init bool) (err error)

func (*Jobs) GetAllLimitOffset

func (n *Jobs) GetAllLimitOffset(q bson.M, limit int, offset int) (err error)

func (*Jobs) GetAllRecent

func (n *Jobs) GetAllRecent(q bson.M, recent int, do_init bool) (count int, err error)

func (*Jobs) GetAllUnsorted added in v0.9.26

func (n *Jobs) GetAllUnsorted(q bson.M) (err error)

func (*Jobs) GetJobAt

func (n *Jobs) GetJobAt(index int) *Job

func (*Jobs) GetPaginated

func (n *Jobs) GetPaginated(q bson.M, limit int, offset int, order string, direction string, do_init bool) (count int, err error)

func (*Jobs) Init added in v0.9.33

func (n *Jobs) Init() (changed_count int, err error)

func (*Jobs) Length

func (n *Jobs) Length() int

func (*Jobs) RLockRecursive added in v0.9.33

func (n *Jobs) RLockRecursive()

func (*Jobs) RUnlockRecursive added in v0.9.33

func (n *Jobs) RUnlockRecursive()

type Notice

// Notice reports a workunit result: the workunit identifier, the worker id,
// optional results, status and compute time, plus free-form notes and stderr
// text. Values can be built from a generic value with NewNotice.
// Notes and Stderr carry no struct tags, so each encoder falls back to its
// default field naming for them.
type Notice struct {
	Id          Workunit_Unique_Identifier `bson:"id" json:"id" mapstructure:"id"` // redundant field, for reporting
	WorkerId    string                     `bson:"worker_id" json:"worker_id" mapstructure:"worker_id"`
	Results     *cwl.Job_document          `bson:"results" json:"results" mapstructure:"results"`                            // subset of tool_results with Shock URLs
	Status      string                     `bson:"status,omitempty" json:"status,omitempty" mapstructure:"status,omitempty"` // this is redundant as workunit already has state, but this is only used for transfer
	ComputeTime int                        `bson:"computetime,omitempty" json:"computetime,omitempty" mapstructure:"computetime,omitempty"`
	Notes       string
	Stderr      string
}

func NewNotice added in v0.9.62

func NewNotice(native interface{}) (workunit_result *Notice, err error)

type PartInfo

// PartInfo describes the partitioning of a task (it is the type of
// TaskRaw.Partition). Options is persisted in BSON only; it is excluded from
// JSON and mapstructure decoding.
// NOTE(review): Input presumably names the task input to partition and Index
// the index type used — confirm in (*Task).InitPartIndex.
type PartInfo struct {
	Input         string `bson:"input" json:"input" mapstructure:"input"`
	Index         string `bson:"index" json:"index" mapstructure:"index"`
	TotalIndex    int    `bson:"totalindex" json:"totalindex" mapstructure:"totalindex"`
	MaxPartSizeMB int    `bson:"maxpartsize_mb" json:"maxpartsize_mb" mapstructure:"maxpartsize_mb"`
	Options       string `bson:"options" json:"-" mapstructure:"-"`
}

type Pipeline

// Pipeline pairs an Info record with a list of tasks. Unlike most other
// containers in this package, Tasks holds Task values rather than pointers.
type Pipeline struct {
	Info  *Info  `bson:"info" json:"info"`
	Tasks []Task `bson:"tasks" json:"tasks"`
}

func NewPipeline

func NewPipeline() *Pipeline

type ProxyMgr

// ProxyMgr is a manager type that embeds CQMgr and adds no fields of its own.
// NOTE(review): several ProxyMgr methods listed in this documentation differ
// from the JobMgr interface — RecoverJob returns only error (JobMgr wants
// (bool, error)), ResumeSuspendedJobByUser returns (bool, error) (JobMgr wants
// error), and FinalizeWorkPerf/SaveStdLog/GetReportMsg take string where
// JobMgr takes Workunit_Unique_Identifier. As listed, *ProxyMgr would not
// satisfy JobMgr/ResourceMgr — confirm whether that is intended.
type ProxyMgr struct {
	CQMgr
}

func NewProxyMgr

func NewProxyMgr() *ProxyMgr

func (*ProxyMgr) ClientChecker

func (qm *ProxyMgr) ClientChecker()

func (*ProxyMgr) ClientHandle added in v0.9.16

func (qm *ProxyMgr) ClientHandle()

func (*ProxyMgr) DeleteJobByUser added in v0.9.3

func (qm *ProxyMgr) DeleteJobByUser(jobid string, u *user.User, full bool) (err error)

func (*ProxyMgr) DeleteSuspendedJobsByUser added in v0.9.3

func (qm *ProxyMgr) DeleteSuspendedJobsByUser(u *user.User, full bool) (num int)

func (*ProxyMgr) DeleteZombieJobsByUser added in v0.9.3

func (qm *ProxyMgr) DeleteZombieJobsByUser(u *user.User, full bool) (num int)

func (*ProxyMgr) EnqueueTasksByJobId

func (qm *ProxyMgr) EnqueueTasksByJobId(jobid string) (err error)

func (*ProxyMgr) FetchDataToken

func (qm *ProxyMgr) FetchDataToken(workunit *Workunit, clientid string) (token string, err error)

func (*ProxyMgr) FetchPrivateEnv added in v0.9.3

func (qm *ProxyMgr) FetchPrivateEnv(workid string, clientid string) (env map[string]string, err error)

func (*ProxyMgr) FinalizeWorkPerf

func (qm *ProxyMgr) FinalizeWorkPerf(string, string) (err error)

func (*ProxyMgr) GetActiveJobs

func (qm *ProxyMgr) GetActiveJobs() map[string]bool

func (*ProxyMgr) GetJsonStatus added in v0.9.26

func (qm *ProxyMgr) GetJsonStatus() (status map[string]map[string]int, err error)

func (*ProxyMgr) GetQueue added in v0.9.26

func (qm *ProxyMgr) GetQueue(name string) interface{}

func (*ProxyMgr) GetReportMsg

func (qm *ProxyMgr) GetReportMsg(string, string) (report string, err error)

func (*ProxyMgr) GetSuspendJobs

func (qm *ProxyMgr) GetSuspendJobs() map[string]bool

func (*ProxyMgr) GetTextStatus added in v0.9.26

func (qm *ProxyMgr) GetTextStatus() string

func (*ProxyMgr) IsJobRegistered

func (qm *ProxyMgr) IsJobRegistered(id string) bool

func (*ProxyMgr) JobRegister

func (qm *ProxyMgr) JobRegister() (jid string, err error)

func (*ProxyMgr) Lock added in v0.9.33

func (qm *ProxyMgr) Lock()

func (*ProxyMgr) NoticeHandle added in v0.9.33

func (qm *ProxyMgr) NoticeHandle()

func (*ProxyMgr) QueueStatus added in v0.9.23

func (qm *ProxyMgr) QueueStatus() string

func (*ProxyMgr) RLock added in v0.9.33

func (qm *ProxyMgr) RLock()

func (*ProxyMgr) RUnlock added in v0.9.33

func (qm *ProxyMgr) RUnlock()

func (*ProxyMgr) RecomputeJob

func (qm *ProxyMgr) RecomputeJob(jobid string, stage string) (err error)

recompute jobs from specified task stage

func (*ProxyMgr) RecoverJob added in v0.9.26

func (qm *ProxyMgr) RecoverJob(id string, job *Job) (err error)

recover job not in queue

func (*ProxyMgr) RecoverJobs

func (qm *ProxyMgr) RecoverJobs() (recovered int, total int, err error)

recover jobs not completed before awe-server restarts

func (*ProxyMgr) RegisterNewClient

func (qm *ProxyMgr) RegisterNewClient(files FormFiles, cg *ClientGroup) (client *Client, err error)

func (*ProxyMgr) ResubmitJob

func (qm *ProxyMgr) ResubmitJob(id string) (err error)

re-submit a job in db but not in the queue (caused by server restarting)

func (*ProxyMgr) ResumeQueue added in v0.9.23

func (qm *ProxyMgr) ResumeQueue()

func (*ProxyMgr) ResumeSuspendedJobByUser added in v0.9.3

func (qm *ProxyMgr) ResumeSuspendedJobByUser(id string, u *user.User) (recovered bool, err error)

resubmit a suspended job if user has rights

func (*ProxyMgr) ResumeSuspendedJobsByUser added in v0.9.3

func (qm *ProxyMgr) ResumeSuspendedJobsByUser(u *user.User) (num int)

func (*ProxyMgr) SaveStdLog

func (qm *ProxyMgr) SaveStdLog(string, string, string) (err error)

func (*ProxyMgr) SuspendJob

func (qm *ProxyMgr) SuspendJob(jobid string, jerror *JobError) (err error)

func (*ProxyMgr) SuspendQueue added in v0.9.23

func (qm *ProxyMgr) SuspendQueue()

func (*ProxyMgr) TaskHandle added in v0.9.16

func (qm *ProxyMgr) TaskHandle()

func (*ProxyMgr) Unlock added in v0.9.33

func (qm *ProxyMgr) Unlock()

func (*ProxyMgr) UpdateQueueLoop added in v0.9.33

func (qm *ProxyMgr) UpdateQueueLoop()

func (*ProxyMgr) UpdateQueueToken added in v0.9.48

func (qm *ProxyMgr) UpdateQueueToken(job *Job) (err error)

type RWMutex added in v0.9.33

// RWMutex is this package's own lock type (distinct from sync.RWMutex). It
// carries a Name, set via Init, that is excluded from BSON and JSON, and it
// offers plain, anonymous and named lock/unlock methods plus RCount/RList.
// NOTE(review): the locking mechanism lives in unexported fields not visible
// here.
type RWMutex struct {
	Name string `bson:"-" json:"-"`
	// contains filtered or unexported fields
}

func (*RWMutex) Init added in v0.9.33

func (m *RWMutex) Init(name string)

func (*RWMutex) Lock added in v0.9.33

func (m *RWMutex) Lock() (err error)

func (*RWMutex) LockNamed added in v0.9.33

func (m *RWMutex) LockNamed(name string) (err error)

func (*RWMutex) RCount added in v0.9.33

func (m *RWMutex) RCount() (c int)

func (*RWMutex) RList added in v0.9.33

func (m *RWMutex) RList() (list []string)

func (*RWMutex) RLock added in v0.9.33

func (m *RWMutex) RLock()

func (*RWMutex) RLockAnon added in v0.9.33

func (m *RWMutex) RLockAnon()

func (*RWMutex) RLockNamed added in v0.9.33

func (m *RWMutex) RLockNamed(name string) (rl ReadLock, err error)

func (*RWMutex) RUnlock added in v0.9.33

func (m *RWMutex) RUnlock()

func (*RWMutex) RUnlockAnon added in v0.9.33

func (m *RWMutex) RUnlockAnon()

func (*RWMutex) RUnlockNamed added in v0.9.33

func (m *RWMutex) RUnlockNamed(rl ReadLock)

func (*RWMutex) Unlock added in v0.9.33

func (m *RWMutex) Unlock()

type ReadLock added in v0.9.33

// ReadLock is the opaque handle returned by (*RWMutex).RLockNamed and passed
// back to (*RWMutex).RUnlockNamed; Get_Id exposes its id as a string.
type ReadLock struct {
	// contains filtered or unexported fields
}

func (*ReadLock) Get_Id added in v0.9.33

func (r *ReadLock) Get_Id() string

type RequestQueue added in v0.9.62

// RequestQueue embeds sync.Mutex and stores its contents in unexported fields;
// it is created with NewRequestQueue and accessed through Push and Pull, which
// operate on *CoReq values.
// NOTE(review): Push and Pull are listed with value receivers (q RequestQueue),
// so each call receives a copy of the struct including the embedded mutex.
// Locking a copied mutex does not synchronize callers (go vet "copylocks") —
// confirm and consider pointer receivers.
type RequestQueue struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewRequestQueue added in v0.9.62

func NewRequestQueue() (q *RequestQueue)

func (RequestQueue) Pull added in v0.9.62

func (q RequestQueue) Pull() (req *CoReq, err error)

func (RequestQueue) Push added in v0.9.62

func (q RequestQueue) Push(req *CoReq) (err error)

type ResourceMgr

// ResourceMgr is the full manager interface: it embeds ClientWorkMgr and
// JobMgr and adds the handle, status, queue-control and lock methods below.
// ServerMgr and ProxyMgr both list methods with these names.
type ResourceMgr interface {
	ClientWorkMgr
	JobMgr
	TaskHandle()
	ClientHandle()
	UpdateQueueLoop()
	NoticeHandle()
	GetJsonStatus() (map[string]map[string]int, error)
	GetTextStatus() string
	QueueStatus() string
	GetQueue(string) interface{}
	SuspendQueue()
	ResumeQueue()
	Lock()
	Unlock()
	RLock()
	RUnlock()
}

type ServerMgr

// ServerMgr is the server-side manager: it embeds CQMgr and adds an exported
// TaskMap plus further unexported fields. TaskMap is held by value and itself
// embeds a lock, so a ServerMgr should be handled through a pointer
// (NewServerMgr returns *ServerMgr).
type ServerMgr struct {
	CQMgr

	TaskMap TaskMap
	// contains filtered or unexported fields
}

func NewServerMgr

func NewServerMgr() *ServerMgr

func (*ServerMgr) ClientHandle added in v0.9.16

func (qm *ServerMgr) ClientHandle()

func (*ServerMgr) CreateAndEnqueueWorkunits added in v0.9.33

func (qm *ServerMgr) CreateAndEnqueueWorkunits(task *Task, job *Job) (err error)

func (*ServerMgr) CreateJobPerf

func (qm *ServerMgr) CreateJobPerf(jobid string)

---perf related methods

func (*ServerMgr) CreateTaskPerf

func (qm *ServerMgr) CreateTaskPerf(task *Task) (err error)

func (*ServerMgr) CreateWorkPerf

func (qm *ServerMgr) CreateWorkPerf(id Workunit_Unique_Identifier) (err error)

func (*ServerMgr) DeleteJobByUser added in v0.9.3

func (qm *ServerMgr) DeleteJobByUser(jobid string, u *user.User, full bool) (err error)

func (*ServerMgr) DeleteSuspendedJobsByUser added in v0.9.3

func (qm *ServerMgr) DeleteSuspendedJobsByUser(u *user.User, full bool) (num int)

func (*ServerMgr) DeleteZombieJobsByUser added in v0.9.3

func (qm *ServerMgr) DeleteZombieJobsByUser(u *user.User, full bool) (num int)

delete jobs in db with "queued" or "in-progress" state but not in the queue (zombie jobs) that user has access to

func (*ServerMgr) EnqueueTasksByJobId

func (qm *ServerMgr) EnqueueTasksByJobId(jobid string) (err error)

---task methods---- this is invoked after a job is uploaded and saved in mongo

func (*ServerMgr) FetchDataToken

func (qm *ServerMgr) FetchDataToken(work_id Workunit_Unique_Identifier, clientid string) (token string, err error)

--workunit methods (servermgr implementation)

func (*ServerMgr) FetchPrivateEnv added in v0.9.3

func (qm *ServerMgr) FetchPrivateEnv(id Workunit_Unique_Identifier, clientid string) (env map[string]string, err error)

func (*ServerMgr) FinalizeJobPerf

func (qm *ServerMgr) FinalizeJobPerf(jobid string)

func (*ServerMgr) FinalizeTaskPerf

func (qm *ServerMgr) FinalizeTaskPerf(task *Task) (err error)

TODO evaluate err

func (*ServerMgr) FinalizeWorkPerf

func (qm *ServerMgr) FinalizeWorkPerf(id Workunit_Unique_Identifier, reportfile string) (err error)

func (*ServerMgr) GetActiveJobs

func (qm *ServerMgr) GetActiveJobs() (ajobs map[string]bool)

func (*ServerMgr) GetJsonStatus added in v0.9.26

func (qm *ServerMgr) GetJsonStatus() (status map[string]map[string]int, err error)

func (*ServerMgr) GetQueue added in v0.9.26

func (qm *ServerMgr) GetQueue(name string) interface{}

func (*ServerMgr) GetReportMsg

func (qm *ServerMgr) GetReportMsg(id Workunit_Unique_Identifier, logname string) (report string, err error)

func (*ServerMgr) GetStepInputObjects added in v0.9.62

func (qm *ServerMgr) GetStepInputObjects(job *Job, task_id Task_Unique_Identifier, workflow_input_map map[string]cwl.CWLType, workflow_step *cwl.WorkflowStep) (workunit_input_map cwl.JobDocMap, err error)

func (*ServerMgr) GetSuspendJobs

func (qm *ServerMgr) GetSuspendJobs() (sjobs map[string]bool)

func (*ServerMgr) GetTextStatus added in v0.9.26

func (qm *ServerMgr) GetTextStatus() string

func (*ServerMgr) IsJobRegistered

func (qm *ServerMgr) IsJobRegistered(id string) bool

func (*ServerMgr) Lock added in v0.9.33

func (qm *ServerMgr) Lock()

func (*ServerMgr) LogJobPerf

func (qm *ServerMgr) LogJobPerf(jobid string)

func (*ServerMgr) NoticeHandle added in v0.9.33

func (qm *ServerMgr) NoticeHandle()

func (*ServerMgr) QueueStatus added in v0.9.23

func (qm *ServerMgr) QueueStatus() string

func (*ServerMgr) RLock added in v0.9.33

func (qm *ServerMgr) RLock()

func (*ServerMgr) RUnlock added in v0.9.33

func (qm *ServerMgr) RUnlock()

func (*ServerMgr) RecomputeJob

func (qm *ServerMgr) RecomputeJob(jobid string, stage string) (err error)

recompute job from specified task stage

func (*ServerMgr) RecoverJob added in v0.9.26

func (qm *ServerMgr) RecoverJob(id string, job *Job) (recovered bool, err error)

recover a job in db that is missing from queue (caused by server restarting)

func (*ServerMgr) RecoverJobs

func (qm *ServerMgr) RecoverJobs() (recovered int, total int, err error)

recover jobs not completed before awe-server restarts

func (*ServerMgr) ResubmitJob

func (qm *ServerMgr) ResubmitJob(jobid string) (err error)

recompute job from beginning

func (*ServerMgr) ResumeQueue added in v0.9.23

func (qm *ServerMgr) ResumeQueue()

func (*ServerMgr) ResumeSuspendedJobByUser added in v0.9.3

func (qm *ServerMgr) ResumeSuspendedJobByUser(id string, u *user.User) (err error)

resubmit a suspended job if the user is authorized

func (*ServerMgr) ResumeSuspendedJobsByUser added in v0.9.3

func (qm *ServerMgr) ResumeSuspendedJobsByUser(u *user.User) (num int)

func (*ServerMgr) SaveStdLog

func (qm *ServerMgr) SaveStdLog(id Workunit_Unique_Identifier, logname string, tmppath string) (err error)

func (*ServerMgr) SuspendJob

func (qm *ServerMgr) SuspendJob(jobid string, jerror *JobError) (err error)

use for JOB_STAT_SUSPEND and JOB_STAT_FAILED_PERMANENT

func (*ServerMgr) SuspendQueue added in v0.9.23

func (qm *ServerMgr) SuspendQueue()

func (*ServerMgr) TaskHandle added in v0.9.16

func (qm *ServerMgr) TaskHandle()

func (*ServerMgr) Unlock added in v0.9.33

func (qm *ServerMgr) Unlock()

func (*ServerMgr) UpdateJobPerfStartTime

func (qm *ServerMgr) UpdateJobPerfStartTime(jobid string)

func (*ServerMgr) UpdateJobTaskToInProgress

func (qm *ServerMgr) UpdateJobTaskToInProgress(works []*Workunit) (err error)

update job/task states from "queued" to "in-progress" once the first workunit is checked out

func (*ServerMgr) UpdateQueueLoop added in v0.9.33

func (qm *ServerMgr) UpdateQueueLoop()

func (*ServerMgr) UpdateQueueToken added in v0.9.48

func (qm *ServerMgr) UpdateQueueToken(job *Job) (err error)

update tokens for in-memory data structures

func (*ServerMgr) UpdateTaskPerfStartTime

func (qm *ServerMgr) UpdateTaskPerfStartTime(task *Task) (err error)

type StandardResponse added in v0.9.33

// StandardResponse is a JSON response envelope with a numeric status, an
// arbitrary data payload and a list of error strings. It is the response type
// returned by NotifyWorkunitProcessedWithLogs.
type StandardResponse struct {
	Status int         `json:"status"`
	Data   interface{} `json:"data"`
	Error  []string    `json:"error"`
}

func NotifyWorkunitProcessedWithLogs

func NotifyWorkunitProcessedWithLogs(work *Workunit, perf *WorkPerf, sendstdlogs bool) (response *StandardResponse, err error)

type StringLocked added in v0.9.33

// StringLocked embeds sync.Mutex and keeps a string in an unexported field
// that is read and written through Get and Set.
// NOTE(review): presumably Get/Set take the embedded mutex around the access —
// their bodies are not visible here.
type StringLocked struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*StringLocked) Get added in v0.9.33

func (s *StringLocked) Get() string

func (*StringLocked) Set added in v0.9.33

func (s *StringLocked) Set(value string)

type StructContainer added in v0.9.43

// StructContainer wraps an arbitrary value under the JSON key "data".
type StructContainer struct {
	Data interface{} `json:"data"`
}

type Task

// Task is the current task representation: the TaskRaw fields (inlined for
// BSON) plus inputs, outputs and predata, each a slice of *IO.
type Task struct {
	TaskRaw `bson:",inline"`
	Inputs  []*IO `bson:"inputs" json:"inputs"`
	Outputs []*IO `bson:"outputs" json:"outputs"`
	Predata []*IO `bson:"predata" json:"predata"`
}

func CreateTasks added in v0.9.62

func CreateTasks(job *Job, workflow string, steps []cwl.WorkflowStep) (tasks []*Task, err error)

func NewTask

func NewTask(job *Job, workflow string, task_id string) (t *Task, err error)

currently this is only used to make a new task from a deprecated task

func (*Task) CollectDependencies added in v0.9.62

func (task *Task) CollectDependencies() (changed bool, err error)

populate DependsOn

func (*Task) CreateInputIndexes added in v0.9.67

func (task *Task) CreateInputIndexes() (err error)

checks and creates indices on input shock nodes if needed

func (*Task) CreateOutputIndexes added in v0.9.67

func (task *Task) CreateOutputIndexes() (err error)

checks and creates indices on output shock nodes if needed; if the worker failed to do so, this will catch it

func (*Task) CreateWorkunits added in v0.9.33

func (task *Task) CreateWorkunits(qm *ServerMgr, job *Job) (wus []*Workunit, err error)

func (*Task) DeleteInput added in v0.9.12

func (task *Task) DeleteInput() (modified int)

func (*Task) DeleteLogs added in v0.9.67

func (task *Task) DeleteLogs(logname string, writelock bool) (err error)

func (*Task) DeleteOutput added in v0.9.3

func (task *Task) DeleteOutput() (modified int)

func (*Task) GetOutput added in v0.9.36

func (task *Task) GetOutput(filename string) (output *IO, err error)

func (*Task) GetOutputs added in v0.9.33

func (task *Task) GetOutputs() (outputs []*IO, err error)

func (*Task) GetTaskLogs added in v0.9.27

func (task *Task) GetTaskLogs() (tlog *TaskLog, err error)

func (*Task) IncrementComputeTime added in v0.9.33

func (task *Task) IncrementComputeTime(inc int) (err error)

func (*Task) IncrementRemainWork added in v0.9.33

func (task *Task) IncrementRemainWork(inc int, writelock bool) (remainwork int, err error)

func (*Task) Init added in v0.9.36

func (task *Task) Init(job *Job) (changed bool, err error)

func (*Task) InitPartIndex

func (task *Task) InitPartIndex() (err error)

get part size based on partition/index info. This resets task.Partition when called. Only 1 task.Inputs is allowed unless 'partinfo.input' is specified on POST. If it fails to get index info, task.TotalWork is set to 1 and task.Partition is set to nil.

func (*Task) ResetTaskTrue added in v0.9.67

func (task *Task) ResetTaskTrue(name string) (err error)

func (*Task) SetRemainWork added in v0.9.33

func (task *Task) SetRemainWork(num int, writelock bool) (err error)

func (*Task) SetResetTask added in v0.9.67

func (task *Task) SetResetTask(info *Info) (err error)

func (*Task) SetTaskType added in v0.9.62

func (task *Task) SetTaskType(type_str string, writelock bool) (err error)

func (*Task) ValidateDependants added in v0.9.67

func (task *Task) ValidateDependants(qm *ServerMgr) (reason string, err error)

func (*Task) ValidateInputs added in v0.9.67

func (task *Task) ValidateInputs(qm *ServerMgr) (err error)

func (*Task) ValidateOutputs added in v0.9.67

func (task *Task) ValidateOutputs() (err error)

func (*Task) ValidatePredata added in v0.9.67

func (task *Task) ValidatePredata() (err error)

type TaskDep added in v0.9.16

// TaskDep is the deprecated task layout used by JobDep: the TaskRaw fields
// (inlined for BSON) with inputs, outputs and predata stored as the deprecated
// IOmap type instead of []*IO.
type TaskDep struct {
	TaskRaw `bson:",inline"`
	Inputs  IOmap `bson:"inputs" json:"inputs"`
	Outputs IOmap `bson:"outputs" json:"outputs"`
	Predata IOmap `bson:"predata" json:"predata"`
}

Deprecated JobDep struct uses deprecated TaskDep struct which uses the deprecated IOmap. Maintained for backwards compatibility. Jobs that cannot be parsed into the Job struct, but can be parsed into the JobDep struct will be translated to the new Job struct. (=deprecated=)

type TaskLog added in v0.9.27

// TaskLog is the log view of a task as returned by (*Task).GetTaskLogs and
// embedded in JobLog.Tasks. Id is serialized as "taskid"; CompletedDate uses
// different key casing for BSON ("completedDate") and JSON ("completeddate").
type TaskLog struct {
	Id            string     `bson:"taskid" json:"taskid"`
	State         string     `bson:"state" json:"state"`
	TotalWork     int        `bson:"totalwork" json:"totalwork"`
	CompletedDate time.Time  `bson:"completedDate" json:"completeddate"`
	Workunits     []*WorkLog `bson:"workunits" json:"workunits"`
}

type TaskMap added in v0.9.33

// TaskMap embeds RWMutex and keeps its contents in unexported fields; it is
// created with NewTaskMap and accessed through Add, Delete, Get, GetTasks, Has
// and Len, which address tasks by Task_Unique_Identifier.
// NOTE(review): presumably a lock-guarded map of *Task — the backing field is
// not visible here.
type TaskMap struct {
	RWMutex
	// contains filtered or unexported fields
}

func NewTaskMap added in v0.9.33

func NewTaskMap() (t *TaskMap)

func (*TaskMap) Add added in v0.9.33

func (tm *TaskMap) Add(task *Task) (modified bool, err error)

func (*TaskMap) Delete added in v0.9.33

func (tm *TaskMap) Delete(taskid Task_Unique_Identifier) (task *Task, ok bool)

func (*TaskMap) Get added in v0.9.33

func (tm *TaskMap) Get(taskid Task_Unique_Identifier, lock bool) (task *Task, ok bool, err error)

func (*TaskMap) GetTasks added in v0.9.33

func (tm *TaskMap) GetTasks() (tasks []*Task, err error)

func (*TaskMap) Has added in v0.9.62

func (tm *TaskMap) Has(taskid Task_Unique_Identifier, lock bool) (ok bool, err error)

func (*TaskMap) Len added in v0.9.33

func (tm *TaskMap) Len() (length int, err error)

type TaskPerf

// TaskPerf holds per-task performance data: Queued/Start/End values, the
// derived Resp value, and the sizes of input and output files (serialized as
// "size_infile" and "size_outfile").
// NOTE(review): Queued/Start/End are int64 — presumably Unix timestamps,
// confirm against the code that sets them.
type TaskPerf struct {
	Queued       int64   `bson:"queued" json:"queued"`
	Start        int64   `bson:"start" json:"start"`
	End          int64   `bson:"end" json:"end"`
	Resp         int64   `bson:"resp" json:"resp"` //End -Queued
	InFileSizes  []int64 `bson:"size_infile" json:"size_infile"`
	OutFileSizes []int64 `bson:"size_outfile" json:"size_outfile"`
}

func NewTaskPerf

func NewTaskPerf(id string) *TaskPerf

type TaskRaw added in v0.9.33

// TaskRaw holds the task fields shared by Task and the deprecated TaskDep,
// both of which embed it inline. It embeds RWMutex (never serialized) and
// Task_Unique_Identifier (inlined for BSON).
//
// Fields tagged `bson:"-" json:"-"` are runtime-only. Partition and ResetTask
// are persisted in BSON but hidden from JSON. The date fields use camelCase
// BSON keys and all-lowercase JSON keys.
type TaskRaw struct {
	RWMutex                `bson:"-" json:"-"`
	Task_Unique_Identifier `bson:",inline"`

	Id                  string                   `bson:"taskid" json:"taskid"` // old-style
	TaskType            string                   `bson:"task_type" json:"task_type"`
	Info                *Info                    `bson:"-" json:"-"` // this is just a pointer to the job.Info
	Cmd                 *Command                 `bson:"cmd" json:"cmd"`
	Partition           *PartInfo                `bson:"partinfo" json:"-"`
	DependsOn           []string                 `bson:"dependsOn" json:"dependsOn"` // only needed if dependency cannot be inferred from Input.Origin
	TotalWork           int                      `bson:"totalwork" json:"totalwork"`
	MaxWorkSize         int                      `bson:"maxworksize"   json:"maxworksize"`
	RemainWork          int                      `bson:"remainwork" json:"remainwork"`
	ResetTask           bool                     `bson:"resettask" json:"-"` // triggered by function - resume, recompute, resubmit
	State               string                   `bson:"state" json:"state"`
	CreatedDate         time.Time                `bson:"createdDate" json:"createddate"`
	StartedDate         time.Time                `bson:"startedDate" json:"starteddate"`
	CompletedDate       time.Time                `bson:"completedDate" json:"completeddate"`
	ComputeTime         int                      `bson:"computetime" json:"computetime"`
	UserAttr            map[string]interface{}   `bson:"userattr" json:"userattr"`
	ClientGroups        string                   `bson:"clientgroups" json:"clientgroups"`
	WorkflowStep        *cwl.WorkflowStep        `bson:"workflowStep" json:"workflowStep"` // CWL-only
	StepOutputInterface interface{}              `bson:"stepOutput" json:"stepOutput"`     // CWL-only
	StepInput           *cwl.Job_document        `bson:"-" json:"-"`                       // CWL-only
	StepOutput          *cwl.Job_document        `bson:"-" json:"-"`                       // CWL-only
	Scatter_task        bool                     `bson:"scatter_task" json:"scatter_task"` // CWL-only, indicates if this is a scatter_task TODO: compare with TaskType ?
	Children            []Task_Unique_Identifier `bson:"children" json:"children"`         // CWL-only, list of all children in a subworkflow task
	Children_ptr        []*Task                  `bson:"-" json:"-"`                       // CWL-only
	Finalizing          bool                     `bson:"-" json:"-"`                       // CWL-only, a lock mechanism
}

func NewTaskRaw added in v0.9.33

func NewTaskRaw(task_id Task_Unique_Identifier, info *Info) (tr TaskRaw, err error)

func (*TaskRaw) Finalize added in v0.9.62

func (task *TaskRaw) Finalize() (ok bool, err error)

This function prevents a deadlock when a sub-workflow task finalizes.

func (*TaskRaw) GetChildren added in v0.9.62

func (task *TaskRaw) GetChildren(qm *ServerMgr) (children []*Task, err error)

func (*TaskRaw) GetDependsOn added in v0.9.33

func (task *TaskRaw) GetDependsOn() (dep []string, err error)

func (*TaskRaw) GetId added in v0.9.33

func (task *TaskRaw) GetId(me string) (id Task_Unique_Identifier, err error)

func (*TaskRaw) GetJobId added in v0.9.33

func (task *TaskRaw) GetJobId() (id string, err error)

func (*TaskRaw) GetParent added in v0.9.62

func (task *TaskRaw) GetParent() (p string, err error)

func (*TaskRaw) GetState added in v0.9.33

func (task *TaskRaw) GetState() (state string, err error)

func (*TaskRaw) GetStateNamed added in v0.9.33

func (task *TaskRaw) GetStateNamed(name string) (state string, err error)

only for debugging purposes

func (*TaskRaw) GetTaskType added in v0.9.62

func (task *TaskRaw) GetTaskType() (type_str string, err error)

func (*TaskRaw) InitRaw added in v0.9.36

func (task *TaskRaw) InitRaw(job *Job) (changed bool, err error)

func (*TaskRaw) SetCompletedDate added in v0.9.43

func (task *TaskRaw) SetCompletedDate(t time.Time, lock bool) (err error)

func (*TaskRaw) SetCreatedDate added in v0.9.33

func (task *TaskRaw) SetCreatedDate(t time.Time) (err error)

func (*TaskRaw) SetStartedDate added in v0.9.33

func (task *TaskRaw) SetStartedDate(t time.Time) (err error)

func (*TaskRaw) SetState added in v0.9.33

func (task *TaskRaw) SetState(new_state string, write_lock bool) (err error)

func (*TaskRaw) SetStepOutput added in v0.9.62

func (task *TaskRaw) SetStepOutput(jd *cwl.Job_document, lock bool) (err error)

type Task_Unique_Identifier added in v0.9.62

// Task_Unique_Identifier identifies a task by the combination of its task
// name, parent and job id. Each field uses the same key for its bson, json
// and mapstructure tags.
type Task_Unique_Identifier struct {
	TaskName string `bson:"task_name" json:"task_name" mapstructure:"task_name"` // example: #main/filter
	Parent   string `bson:"parent" json:"parent" mapstructure:"parent"`          // NOTE(review): presumably the enclosing (sub-)workflow of the task — confirm
	JobId    string `bson:"jobid" json:"jobid" mapstructure:"jobid"`             // id of the job the task belongs to
}

func New_Task_Unique_Identifier added in v0.9.62

func New_Task_Unique_Identifier(jobid string, parent string, taskname string) (t Task_Unique_Identifier, err error)

func New_Task_Unique_Identifier_FromString added in v0.9.62

func New_Task_Unique_Identifier_FromString(old_style_id string) (t Task_Unique_Identifier, err error)

func (Task_Unique_Identifier) String added in v0.9.62

func (taskid Task_Unique_Identifier) String() (s string, err error)

type Task_p added in v0.9.3

// Task_p is a reduced task representation that carries only the command,
// serialized under the JSON key "cmd".
type Task_p struct {
	Cmd *Command_p `json:"cmd"`
}

type WorkList

// WorkList is a slice of workunit pointers used by the queuing/prioritizing
// related functions.
type WorkList []*Workunit

queuing/prioritizing related functions

func (WorkList) Len

func (wl WorkList) Len() int

func (WorkList) Swap

func (wl WorkList) Swap(i, j int)

type WorkLog added in v0.9.27

// WorkLog bundles the logs of a workunit together with its id and rank.
type WorkLog struct {
	Id   string            `bson:"wuid" json:"wuid"` // serialized as "wuid", not "id"; TODO change !
	Rank int               `bson:"rank" json:"rank"`
	Logs map[string]string `bson:"logs" json:"logs"` // NOTE(review): presumably log name -> log content — confirm
}

func NewWorkLog added in v0.9.27

func NewWorkLog(id Workunit_Unique_Identifier) (wlog *WorkLog, err error)

type WorkMgr

// WorkMgr is the interface for managing workunits: looking one up by its
// identifier, listing them, checking them out, enqueuing them, receiving
// work status notices, and fetching a data token or private environment for
// a workunit.
//
// The method signatures do not name their parameters, so the meaning of the
// plain string/int arguments cannot be told from this declaration alone —
// consult the implementations before relying on argument order.
type WorkMgr interface {
	GetWorkById(Workunit_Unique_Identifier) (*Workunit, error)
	ShowWorkunits(string) ([]*Workunit, error)
	ShowWorkunitsByUser(string, *user.User) []*Workunit
	CheckoutWorkunits(string, string, *Client, int64, int) ([]*Workunit, error)
	NotifyWorkStatus(Notice)
	EnqueueWorkunit(*Workunit) error
	FetchDataToken(Workunit_Unique_Identifier, string) (string, error)
	FetchPrivateEnv(Workunit_Unique_Identifier, string) (map[string]string, error)
}

type WorkPerf

// WorkPerf collects performance metrics of a workunit: server-side and
// client-side timestamps, derived response times, data transfer times and
// sizes, runtime, and memory usage.
type WorkPerf struct {
	Queued             int64   `bson:"queued" json:"queued"`                   // WQ (queued at server or client, depending on who creates it)
	Done               int64   `bson:"done" json:"done"`                       // WD (done at server)
	Resp               int64   `bson:"resp" json:"resp"`                       // Done - Queued (server metric)
	Checkout           int64   `bson:"checkout" json:"checkout"`               // checkout at client
	Deliver            int64   `bson:"deliver" json:"deliver"`                 // done at client
	ClientResp         int64   `bson:"clientresp" json:"clientresp"`           // Deliver - Checkout (client metric)
	PreDataIn          float64 `bson:"time_predata_in" json:"time_predata_in"` // time in seconds for downloading prerequisite data at client
	DataIn             float64 `bson:"time_data_in" json:"time_data_in"`       // time in seconds for input data move-in at client
	DataOut            float64 `bson:"time_data_out" json:"time_data_out"`     // time in seconds for output data move-out at client
	Runtime            int64   `bson:"runtime" json:"runtime"`                 // time in seconds for computation at client
	DockerPrep         int64   `bson:"dockerprep" json:"dockerprep"`           // time in seconds for docker preparation on client
	MaxMemUsage        int64   `bson:"max_mem_usage" json:"max_mem_usage"`     // maximum memory consumption
	MaxMemoryTotalRss  int64   `bson:"max_memory_total_rss" json:"max_memory_total_rss"`
	MaxMemoryTotalSwap int64   `bson:"max_memory_total_swap" json:"max_memory_total_swap"`
	ClientId           string  `bson:"client_id" json:"client_id"`
	PreDataSize        int64   `bson:"size_predata" json:"size_predata"` // predata moved over network
	InFileSize         int64   `bson:"size_infile" json:"size_infile"`   // input file moved over network
	OutFileSize        int64   `bson:"size_outfile" json:"size_outfile"` // output file moved over network
}

func NewWorkPerf

func NewWorkPerf() *WorkPerf

type WorkQueue added in v0.9.33

// WorkQueue groups workunits into three maps according to their state:
// queued, checked out, and suspended.
type WorkQueue struct {
	Queue    WorkunitMap // WORK_STAT_QUEUED - waiting workunits
	Checkout WorkunitMap // WORK_STAT_CHECKOUT - workunits being checked out
	Suspend  WorkunitMap // WORK_STAT_SUSPEND - suspended workunits
	// contains filtered or unexported fields
}

func NewWorkQueue added in v0.9.33

func NewWorkQueue() *WorkQueue

func (*WorkQueue) Add added in v0.9.33

func (wq *WorkQueue) Add(workunit *Workunit) (err error)

func (*WorkQueue) Clean added in v0.9.33

func (wq *WorkQueue) Clean() (workunits []*Workunit)

func (*WorkQueue) Delete added in v0.9.33

func (wq *WorkQueue) Delete(id Workunit_Unique_Identifier) (err error)

func (*WorkQueue) Get added in v0.9.33

func (wq *WorkQueue) Get(id Workunit_Unique_Identifier) (w *Workunit, ok bool, err error)

func (*WorkQueue) GetAll added in v0.9.33

func (wq *WorkQueue) GetAll() (worklist []*Workunit, err error)

func (*WorkQueue) GetForJob added in v0.9.33

func (wq *WorkQueue) GetForJob(jobid string) (worklist []*Workunit, err error)

func (*WorkQueue) Has added in v0.9.33

func (wq *WorkQueue) Has(id Workunit_Unique_Identifier) (has bool, err error)

func (*WorkQueue) Len added in v0.9.33

func (wq *WorkQueue) Len() (int, error)

func (*WorkQueue) StatusChange added in v0.9.33

func (wq *WorkQueue) StatusChange(id Workunit_Unique_Identifier, workunit *Workunit, new_status string, reason string) (err error)

type WorkerRuntime added in v0.9.62

// WorkerRuntime holds worker info that does not change at runtime:
// identity, grouping, host details, core count, apps and build version.
type WorkerRuntime struct {
	Id           string `bson:"id" json:"id"`     // this is a uuid (the only relevant identifier)
	Name         string `bson:"name" json:"name"` // this can be anything you want
	Group        string `bson:"group" json:"group"`
	User         string `bson:"user" json:"user"`
	Domain       string `bson:"domain" json:"domain"`
	InstanceId   string `bson:"instance_id" json:"instance_id"`     // Openstack specific
	InstanceType string `bson:"instance_type" json:"instance_type"` // Openstack specific
	//Host          string   `bson:"host" json:"host"`                   // deprecated
	Hostname      string   `bson:"hostname" json:"hostname"`
	Host_ip       string   `bson:"host_ip" json:"host_ip"` // Host can be physical machine or VM, whatever is helpful for management
	CPUs          int      `bson:"cores" json:"cores"`     // note: serialized under the key "cores"
	Apps          []string `bson:"apps" json:"apps"`
	GitCommitHash string   `bson:"git_commit_hash" json:"git_commit_hash"`
	Version       string   `bson:"version" json:"version"`
}

worker info that does not change at runtime

type WorkerState added in v0.9.62

// WorkerState holds the worker info that changes at runtime: the busy flag
// and the list of current work.
type WorkerState struct {
	Busy         bool          `bson:"busy" json:"busy"` // a state
	Current_work *WorkunitList `bson:"current_work" json:"current_work"`
}

changes at runtime

func NewWorkerState added in v0.9.62

func NewWorkerState() (ws *WorkerState)

type Workflow

// Workflow describes a workflow document: workflow info, job info, raw
// inputs, variables, a data server and the list of tasks.
type Workflow struct {
	WfInfo     awf_info          `bson:"workflow_info" json:"workflow_info"`
	JobInfo    awf_jobinfo       `bson:"job_info" json:"job_info"`
	RawInputs  map[string]string `bson:"raw_inputs" json:"raw_inputs"`
	Variables  map[string]string `bson:"variables" json:"variables"`
	DataServer string            `bson:"data_server" json:"data_server"`
	Tasks      []*awf_task       `bson:"tasks" json:"tasks"`
}

type WorkflowInstance added in v0.9.62

// WorkflowInstance holds the state of one workflow instance: its id, its
// input and output job documents, and a counter of remaining tasks.
type WorkflowInstance struct {
	Id          string           `bson:"id" json:"id" mapstructure:"id"`
	Inputs      cwl.Job_document `bson:"inputs" json:"inputs" mapstructure:"inputs"`
	Outputs     cwl.Job_document `bson:"outputs" json:"outputs" mapstructure:"outputs"`
	RemainTasks int              `bson:"remaintasks" json:"remaintasks" mapstructure:"remaintasks"` // NOTE(review): presumably the number of tasks not yet completed — confirm
}

func NewWorkflowInstanceFromInterface added in v0.9.62

func NewWorkflowInstanceFromInterface(original interface{}) (wi WorkflowInstance, err error)

type WorkflowMgr

// WorkflowMgr manages workflows; it exposes no exported fields and is used
// through its methods.
type WorkflowMgr struct {
	// contains filtered or unexported fields
}
// AwfMgr is a package-level *WorkflowMgr.
// NOTE(review): presumably the shared instance initialized at startup — confirm where it is assigned.
var (
	AwfMgr *WorkflowMgr
)

func NewWorkflowMgr

func NewWorkflowMgr() *WorkflowMgr

func (*WorkflowMgr) AddWorkflow

func (wfm *WorkflowMgr) AddWorkflow(name string, awf *Workflow)

func (*WorkflowMgr) GetAllWorkflows

func (wfm *WorkflowMgr) GetAllWorkflows() (workflows []*Workflow)

func (*WorkflowMgr) GetWorkflow

func (wfm *WorkflowMgr) GetWorkflow(name string) (awf *Workflow, err error)

func (*WorkflowMgr) LoadWorkflows

func (wfm *WorkflowMgr) LoadWorkflows() (err error)

type Workunit

// Workunit describes one unit of work: its unique identifier (embedded),
// inputs/outputs/predata, command, partition info, state and bookkeeping
// such as failure count, checkout time, client, compute time and exit status.
//
// All tagged fields use omitempty. WorkPath and WorkPerf carry no tags, so
// they are serialized under their Go field names by default.
//
// NOTE(review): encoding/json has no "inline" option (embedded struct fields
// are already flattened by default); the json:",inline" tag mirrors the bson
// tag — confirm it is intentional.
type Workunit struct {
	Workunit_Unique_Identifier `bson:",inline" json:",inline" mapstructure:",squash"`
	Id                         string                 `bson:"id,omitempty" json:"id,omitempty" mapstructure:"id,omitempty"`       // global identifier: jobid_taskid_rank (for backwards compatibility only)
	WuId                       string                 `bson:"wuid,omitempty" json:"wuid,omitempty" mapstructure:"wuid,omitempty"` // deprecated !
	Info                       *Info                  `bson:"info,omitempty" json:"info,omitempty" mapstructure:"info,omitempty"`
	Inputs                     []*IO                  `bson:"inputs,omitempty" json:"inputs,omitempty" mapstructure:"inputs,omitempty"`
	Outputs                    []*IO                  `bson:"outputs,omitempty" json:"outputs,omitempty" mapstructure:"outputs,omitempty"`
	Predata                    []*IO                  `bson:"predata,omitempty" json:"predata,omitempty" mapstructure:"predata,omitempty"`
	Cmd                        *Command               `bson:"cmd,omitempty" json:"cmd,omitempty" mapstructure:"cmd,omitempty"`
	TotalWork                  int                    `bson:"totalwork,omitempty" json:"totalwork,omitempty" mapstructure:"totalwork,omitempty"`
	Partition                  *PartInfo              `bson:"part,omitempty" json:"part,omitempty" mapstructure:"part,omitempty"` // note: serialized under the key "part"
	State                      string                 `bson:"state,omitempty" json:"state,omitempty" mapstructure:"state,omitempty"`
	Failed                     int                    `bson:"failed,omitempty" json:"failed,omitempty" mapstructure:"failed,omitempty"`
	CheckoutTime               time.Time              `bson:"checkout_time,omitempty" json:"checkout_time,omitempty" mapstructure:"checkout_time,omitempty"`
	Client                     string                 `bson:"client,omitempty" json:"client,omitempty" mapstructure:"client,omitempty"`
	ComputeTime                int                    `bson:"computetime,omitempty" json:"computetime,omitempty" mapstructure:"computetime,omitempty"`
	ExitStatus                 int                    `bson:"exitstatus,omitempty" json:"exitstatus,omitempty" mapstructure:"exitstatus,omitempty"` // Linux Exit Status Code (0 is success)
	Notes                      []string               `bson:"notes,omitempty" json:"notes,omitempty" mapstructure:"notes,omitempty"`
	UserAttr                   map[string]interface{} `bson:"userattr,omitempty" json:"userattr,omitempty" mapstructure:"userattr,omitempty"`
	ShockHost                  string                 `bson:"shockhost,omitempty" json:"shockhost,omitempty" mapstructure:"shockhost,omitempty"` // specifies default Shock host for outputs
	CWL_workunit               *CWL_workunit          `bson:"cwl,omitempty" json:"cwl,omitempty" mapstructure:"cwl,omitempty"`                   // note: serialized under the key "cwl"
	WorkPath                   string                 // this is the working directory. If empty, it will be computed.
	WorkPerf                   *WorkPerf
}

func NewWorkunit

func NewWorkunit(qm *ServerMgr, task *Task, rank int, job *Job) (workunit *Workunit, err error)

func (*Workunit) CDworkpath

func (work *Workunit) CDworkpath() (err error)

func (*Workunit) GetId added in v0.9.62

func (w *Workunit) GetId() (id Workunit_Unique_Identifier)

func (*Workunit) GetNotes added in v0.9.49

func (work *Workunit) GetNotes() string

func (*Workunit) Mkdir

func (work *Workunit) Mkdir() (err error)

func (*Workunit) Part

func (work *Workunit) Part() (part string)

Part calculates the range of data parts for the workunit. Algorithm: try to distribute the indexed parts evenly across the workunits, e.g. with totalWork=4 and totalParts=10 the workunits get 3, 3, 2 and 2 parts respectively.

func (*Workunit) Path

func (work *Workunit) Path() (path string, err error)

func (*Workunit) RemoveDir

func (work *Workunit) RemoveDir() (err error)

func (*Workunit) SetState added in v0.9.36

func (work *Workunit) SetState(new_state string, reason string) (err error)

type WorkunitList added in v0.9.62

// WorkunitList is a list of workunits guarded by an embedded RWMutex.
// The mutex is excluded from bson and json serialization.
type WorkunitList struct {
	RWMutex `bson:"-" json:"-"`

	// Data is the serialized form of the list (JSON key "data").
	// NOTE(review): presumably holds workunit ids as strings, mirrored in an
	// unexported map — confirm against FillMap.
	Data []string `json:"data"`
	// contains filtered or unexported fields
}

func NewWorkunitList added in v0.9.62

func NewWorkunitList() *WorkunitList

func (*WorkunitList) Add added in v0.9.62

func (cl *WorkunitList) Add(workid Workunit_Unique_Identifier) (err error)

lock always

func (*WorkunitList) Delete added in v0.9.62

func (cl *WorkunitList) Delete(workid Workunit_Unique_Identifier, write_lock bool) (err error)

func (*WorkunitList) Delete_all added in v0.9.62

func (cl *WorkunitList) Delete_all(workid string, write_lock bool) (err error)

func (*WorkunitList) FillMap added in v0.9.62

func (cl *WorkunitList) FillMap() (err error)

FillMap is the opposite of sync: it takes the Data entries and copies them into the map.

func (*WorkunitList) Get_list added in v0.9.62

func (cl *WorkunitList) Get_list(do_read_lock bool) (assigned_work_ids []Workunit_Unique_Identifier, err error)

func (*WorkunitList) Get_string_list added in v0.9.62

func (cl *WorkunitList) Get_string_list(do_read_lock bool) (work_ids []string, err error)

func (*WorkunitList) Has added in v0.9.62

func (cl *WorkunitList) Has(workid Workunit_Unique_Identifier) (ok bool, err error)

func (*WorkunitList) Init added in v0.9.62

func (this *WorkunitList) Init(name string)

func (*WorkunitList) Length added in v0.9.62

func (cl *WorkunitList) Length(lock bool) (clength int, err error)

type WorkunitMap added in v0.9.33

// WorkunitMap maps workunit identifiers to workunits and embeds an RWMutex
// for guarding access to Map.
type WorkunitMap struct {
	RWMutex
	Map map[Workunit_Unique_Identifier]*Workunit
}

func NewWorkunitMap added in v0.9.33

func NewWorkunitMap() *WorkunitMap

func (*WorkunitMap) Delete added in v0.9.33

func (wm *WorkunitMap) Delete(id Workunit_Unique_Identifier) (err error)

func (*WorkunitMap) Get added in v0.9.33

func (wm *WorkunitMap) Get(id Workunit_Unique_Identifier) (workunit *Workunit, ok bool, err error)

func (*WorkunitMap) GetWorkunits added in v0.9.33

func (wm *WorkunitMap) GetWorkunits() (workunits []*Workunit, err error)

func (*WorkunitMap) Len added in v0.9.33

func (wm *WorkunitMap) Len() (length int, err error)

func (*WorkunitMap) Set added in v0.9.33

func (wm *WorkunitMap) Set(workunit *Workunit) (err error)

type Workunit_Unique_Identifier added in v0.9.62

// Workunit_Unique_Identifier identifies a workunit by the identifier of its
// task (embedded and flattened into the same document) plus the workunit's
// rank.
type Workunit_Unique_Identifier struct {
	Task_Unique_Identifier `bson:",inline" json:",inline" mapstructure:",squash"` // TaskName, Parent, JobId
	Rank                   int                                                    `bson:"rank" json:"rank" mapstructure:"rank"` // this is the local identifier

}

func New_Workunit_Unique_Identifier added in v0.9.62

func New_Workunit_Unique_Identifier(task Task_Unique_Identifier, rank int) (wui Workunit_Unique_Identifier)

func New_Workunit_Unique_Identifier_FromString added in v0.9.62

func New_Workunit_Unique_Identifier_FromString(old_style_id string) (w Workunit_Unique_Identifier, err error)

func New_Workunit_Unique_Identifier_from_interface added in v0.9.62

func New_Workunit_Unique_Identifier_from_interface(original interface{}) (wui Workunit_Unique_Identifier, err error)

func (Workunit_Unique_Identifier) GetTask added in v0.9.62

func (Workunit_Unique_Identifier) String added in v0.9.62

func (w Workunit_Unique_Identifier) String() (work_str string, err error)

type WorkunitsSortby added in v0.9.8

// WorkunitsSortby pairs a slice of workunits with the sort criteria to apply
// to it. Its Len, Less and Swap methods match the method set of
// sort.Interface.
type WorkunitsSortby struct {
	Order     string // NOTE(review): presumably the name of the field to sort by — confirm against Less
	Direction string // NOTE(review): presumably ascending/descending selector — confirm accepted values against Less
	Workunits []*Workunit
}

func (WorkunitsSortby) Len added in v0.9.8

func (w WorkunitsSortby) Len() int

func (WorkunitsSortby) Less added in v0.9.8

func (w WorkunitsSortby) Less(i, j int) bool

func (WorkunitsSortby) Swap added in v0.9.8

func (w WorkunitsSortby) Swap(i, j int)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL