core

package
v0.0.0-...-f5fddd7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2014 License: BSD-2-Clause Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
// Client status values.
// NOTE(review): presumably the values stored in Client.Status — confirm.
const (
	CLIENT_STAT_ACTIVE  = "active"
	CLIENT_STAT_SUSPEND = "suspend"
)
View Source
// Job state values.
// NOTE(review): presumably the values stored in Job.State — confirm.
const (
	JOB_STAT_SUBMITTED  = "submitted"
	JOB_STAT_INPROGRESS = "in-progress"
	JOB_STAT_COMPLETED  = "completed"
	JOB_STAT_SUSPEND    = "suspend"
	JOB_STAT_DELETED    = "deleted"
)
View Source
// Task state values.
// NOTE(review): presumably the values stored in Task.State — confirm.
const (
	TASK_STAT_INIT      = "init"
	TASK_STAT_QUEUED    = "queued"
	TASK_STAT_PENDING   = "pending"
	TASK_STAT_SUSPEND   = "suspend"
	TASK_STAT_COMPLETED = "completed"
	// Careful: TASK_STAT_SKIPPED is the string "user_skipped", while the plain
	// string "skipped" belongs to TASK_STAT_FAIL_SKIP below.
	TASK_STAT_SKIPPED   = "user_skipped"
	TASK_STAT_FAIL_SKIP = "skipped"
	TASK_STAT_PASSED    = "passed"
)
View Source
// Workunit status values.
// NOTE(review): presumably the values stored in Workunit.State and
// Notice.Status — confirm.
const (
	WORK_STAT_QUEUED      = "queued"
	WORK_STAT_CHECKOUT    = "checkout"
	WORK_STAT_SUSPEND     = "suspend"
	WORK_STAT_DONE        = "done"
	WORK_STAT_FAIL        = "fail"
	WORK_STAT_PREPARED    = "prepared"
	WORK_STAT_COMPUTED    = "computed"
	WORK_STAT_PROXYQUEUED = "proxyqueued"
)

Variables

View Source
// Package-level state shared across the package. Service starts out as
// "unknown"; the other variables start at their zero values.
// NOTE(review): presumably QMgr and Service are set by InitResMgr(service),
// Self by InitClientProfile and ProxyWorkChan by InitProxyWorkChan — confirm.
var (
	QMgr          ResourceMgr
	Service       string = "unknown"
	Self          *Client
	ProxyWorkChan chan bool
)
View Source
// TemplateRe matches "word::word" placeholders, i.e. two runs of word
// characters joined by "::". Example input containing three matches:
//
//	<inputs::i1> <inputs::i2> <outputs::o1>
var TemplateRe = regexp.MustCompile(`\w+::\w+`)

Functions

func GetJobIdByTaskId

func GetJobIdByTaskId(taskid string) (jobid string, err error)

misc

func GetJobIdByWorkId

func GetJobIdByWorkId(workid string) (jobid string, err error)

func InitAwfMgr

func InitAwfMgr()

func InitClientProfile

func InitClientProfile(profile *Client)

func InitJobDB

func InitJobDB()

func InitProxyWorkChan

func InitProxyWorkChan()

func InitResMgr

func InitResMgr(service string)

func IsFirstTask

func IsFirstTask(taskid string) bool

func NotifyWorkunitProcessed

func NotifyWorkunitProcessed(work *Workunit, perf *WorkPerf) (err error)

Functions for REST API communication. NotifyWorkunitProcessed notifies the AWE server that a workunit has finished with status either "failed" or "done", and sends the perf statistics along if the status is "done".

func PostNode

func PostNode(io *IO, numParts int) (nodeid string, err error)

Create a Shock node for output (deprecated).

func PostNodeWithToken

func PostNodeWithToken(io *IO, numParts int, token string) (nodeid string, err error)

func PushOutputData

func PushOutputData(work *Workunit) (err error)

func ReloadFromDisk

func ReloadFromDisk(path string) (err error)

func ShockPutIndex

func ShockPutIndex(host string, nodeid string, indexname string, token string) (err error)

func UpdateJobState

func UpdateJobState(jobid string, newstate string) (err error)

Types

type CQMgr

// CQMgr declares the client-management and workunit-management methods
// (RegisterNewClient, ClientHeartBeat, CheckoutWorkunits, NotifyWorkStatus,
// ...) and is embedded by both ProxyMgr and ServerMgr.
type CQMgr struct {
	// contains filtered or unexported fields
}

func NewCQMgr

func NewCQMgr() *CQMgr

func (*CQMgr) CheckoutWorkunits

func (qm *CQMgr) CheckoutWorkunits(req_policy string, client_id string, num int) (workunits []*Workunit, err error)

func (*CQMgr) ClientChecker

func (qm *CQMgr) ClientChecker()

func (*CQMgr) ClientHeartBeat

func (qm *CQMgr) ClientHeartBeat(id string) (hbmsg HBmsg, err error)

func (*CQMgr) DeleteClient

func (qm *CQMgr) DeleteClient(id string)

func (*CQMgr) EnqueueWorkunit

func (qm *CQMgr) EnqueueWorkunit(work *Workunit) (err error)

func (*CQMgr) FetchDataToken

func (qm *CQMgr) FetchDataToken(workid string, clientid string) (token string, err error)

func (*CQMgr) GetAllClients

func (qm *CQMgr) GetAllClients() []*Client

func (*CQMgr) GetClient

func (qm *CQMgr) GetClient(id string) (client *Client, err error)

func (*CQMgr) GetWorkById

func (qm *CQMgr) GetWorkById(id string) (workunit *Workunit, err error)

func (*CQMgr) Handle

func (qm *CQMgr) Handle()

func (*CQMgr) NotifyWorkStatus

func (qm *CQMgr) NotifyWorkStatus(notice Notice)

func (*CQMgr) RegisterNewClient

func (qm *CQMgr) RegisterNewClient(files FormFiles) (client *Client, err error)

func (*CQMgr) ShowWorkQueue

func (qm *CQMgr) ShowWorkQueue()

show functions used in debug

func (*CQMgr) ShowWorkunits

func (qm *CQMgr) ShowWorkunits(status string) (workunits []*Workunit)

func (*CQMgr) Timer

func (qm *CQMgr) Timer()

func (*CQMgr) UpdateSubClients

func (qm *CQMgr) UpdateSubClients(id string, count int)

type Client

// Client holds the profile and work counters of a client, together with its
// BSON and JSON field names.
//
// Last_failed and Tag carry the "-" tag for both encodings, so they are never
// serialized. Note that Status is the only field whose key is capitalized
// ("Status"); all other keys are lower case.
type Client struct {
	Id              string          `bson:"id" json:"id"`
	Name            string          `bson:"name" json:"name"`
	Group           string          `bson:"group" json:"group"`
	User            string          `bson:"user" json:"user"`
	Host            string          `bson:"host" json:"host"`
	CPUs            int             `bson:"cores" json:"cores"`
	Apps            []string        `bson:"apps" json:"apps"`
	RegTime         time.Time       `bson:"regtime" json:"regtime"`
	Serve_time      string          `bson:"serve_time" json:"serve_time"`
	Idle_time       int             `bson:"idle_time" json:"idle_time"`
	Status          string          `bson:"Status" json:"Status"`
	Total_checkout  int             `bson:"total_checkout" json:"total_checkout"`
	Total_completed int             `bson:"total_completed" json:"total_completed"`
	Total_failed    int             `bson:"total_failed" json:"total_failed"`
	// NOTE(review): presumably keyed by workunit id — confirm.
	Current_work    map[string]bool `bson:"current_work" json:"current_work"`
	Skip_work       []string        `bson:"skip_work" json:"skip_work"`
	Last_failed     int             `bson:"-" json:"-"`
	Tag             bool            `bson:"-" json:"-"`
	Proxy           bool            `bson:"proxy" json:"proxy"`
	SubClients      int             `bson:"subclients" json:"subclients"`
}

func NewClient

func NewClient() (client *Client)

func NewProfileClient

func NewProfileClient(filepath string) (client *Client, err error)

func (*Client) IsBusy

func (cl *Client) IsBusy() bool

type ClientMgr

// ClientMgr is the client-management half of a resource manager: registering,
// looking up, listing, heart-beating and deleting clients. CQMgr declares
// every method listed here.
type ClientMgr interface {
	RegisterNewClient(FormFiles) (*Client, error)
	ClientHeartBeat(string) (HBmsg, error)
	GetClient(string) (*Client, error)
	GetAllClients() []*Client
	DeleteClient(string)
	ClientChecker()
	UpdateSubClients(id string, count int)
}

type ClientWorkMgr

// ClientWorkMgr is the union of ClientMgr and WorkMgr.
type ClientWorkMgr interface {
	ClientMgr
	WorkMgr
}

type CoAck

// CoAck has only unexported fields.
// NOTE(review): presumably the acknowledgement paired with a CoReq (workunit
// checkout) — confirm against the implementation.
type CoAck struct {
	// contains filtered or unexported fields
}

type CoReq

// CoReq has only unexported fields.
// NOTE(review): presumably a workunit checkout request answered by a CoAck —
// confirm against the implementation.
type CoReq struct {
	// contains filtered or unexported fields
}

type Command

// Command describes the command of a task or workunit (see Task.Cmd and
// Workunit.Cmd).
//
// Options and Template are stored in BSON but omitted from JSON; ParsedArgs is
// omitted from both encodings.
type Command struct {
	Name        string   `bson:"name" json:"name"`
	Options     string   `bson:"options" json:"-"`
	Args        string   `bson:"args" json:"args"`
	Template    string   `bson:"template" json:"-"`
	Description string   `bson:"description" json:"description"`
	ParsedArgs  []string `bson:"-" json:"-"`
}

func NewCommand

func NewCommand(name string) *Command

func (*Command) Substitute

func (c *Command) Substitute(inputs *IOmap, outputs *IOmap) (err error)

type FormFile

// FormFile describes one file passed in a FormFiles map: its name, its path
// and a map of checksums.
// NOTE(review): Checksum is presumably keyed by algorithm name — confirm.
type FormFile struct {
	Name     string
	Path     string
	Checksum map[string]string
}

type FormFiles

// FormFiles maps a string key to a FormFile.
// NOTE(review): the key is presumably the form field name — confirm.
type FormFiles map[string]FormFile

type HBmsg

// HBmsg maps an operation name to the objects that operation applies to.
type HBmsg map[string]string //map[op]obj1,obj2 e.g. map[discard]=work1,work2

Heartbeat response from awe-server to awe-client, used to issue operation requests to the client, e.g. to discard suspended workunits.

type IO

// IO describes one input, output or predata entry of a task or workunit (the
// value type of IOmap).
//
// Name, MD5, Cache, Path, Optional and DataToken are stored in BSON but
// omitted from JSON (json:"-").
type IO struct {
	Name      string `bson:"name" json:"-"`
	Host      string `bson:"host" json:"host"`
	Node      string `bson:"node" json:"node"`
	Url       string `bson:"url"  json:"url"`
	Size      int64  `bson:"size" json:"size"`
	MD5       string `bson:"md5" json:"-"`
	Cache     bool   `bson:"cache" json:"-"`
	Origin    string `bson:"origin" json:"origin"`
	Path      string `bson:"path" json:"-"`
	Optional  bool   `bson:"optional" json:"-"`
	Nonzero   bool   `bson:"nonzero"  json:"nonzero"`
	DataToken string `bson:"datatoken"  json:"-"`
}

func NewIO

func NewIO() *IO

func (*IO) DataUrl

func (io *IO) DataUrl() string

func (*IO) GetFileSize

func (io *IO) GetFileSize() int64

func (*IO) GetIndexInfo

func (io *IO) GetIndexInfo() (idxinfo map[string]IdxInfo, err error)

func (*IO) GetIndexUnits

func (io *IO) GetIndexUnits(indextype string) (totalunits int, err error)

func (*IO) GetShockNode

func (io *IO) GetShockNode() (node *ShockNode, err error)

func (*IO) TotalUnits

func (io *IO) TotalUnits(indextype string) (count int, err error)

type IOmap

// IOmap maps a file name to the IO entry describing that file.
type IOmap map[string]*IO // [filename]attributes

func NewIOmap

func NewIOmap() IOmap

func (IOmap) Add

func (i IOmap) Add(name string, host string, node string, md5 string, cache bool)

func (IOmap) Find

func (i IOmap) Find(name string) *IO

func (IOmap) Has

func (i IOmap) Has(name string) bool

type IdxInfo

// IdxInfo describes one index of a Shock node (the value type of
// ShockNode.Indexes). Type is stored in BSON but omitted from JSON.
type IdxInfo struct {
	Type        string `bson:"index_type" json:"-"`
	TotalUnits  int64  `bson:"total_units" json:"total_units"`
	AvgUnitSize int64  `bson:"average_unit_size" json:"average_unit_size"`
}

type Info

// Info is the descriptive metadata block referenced by Job, Task, Workunit and
// Pipeline. Priority and DataToken are stored in BSON but omitted from JSON.
type Info struct {
	Name         string    `bson:"name" json:"name"`
	Xref         string    `bson:"xref" json:"xref"`
	Project      string    `bson:"project" json:"project"`
	User         string    `bson:"user" json:"user"`
	Pipeline     string    `bson:"pipeline" json:"pipeline"`
	ClientGroups string    `bson:"clientgroups" json:"clientgroups"`
	SubmitTime   time.Time `bson:"submittime" json:"submittime"`
	Priority     int       `bson:"priority" json:"-"`
	Auth         bool      `bson:"auth" json:"auth"`
	DataToken    string    `bson:"datatoken" json:"-"`
}

func NewInfo

func NewInfo() *Info

type Job

// Job is a job record: its identifiers, metadata, tasks, state and the count
// of remaining tasks. Script is stored in BSON but omitted from JSON.
type Job struct {
	Id          string    `bson:"id" json:"id"`
	Jid         string    `bson:"jid" json:"jid"`
	Info        *Info     `bson:"info" json:"info"`
	Tasks       []*Task   `bson:"tasks" json:"tasks"`
	Script      script    `bson:"script" json:"-"`
	State       string    `bson:"state" json:"state"`
	RemainTasks int       `bson:"remaintasks" json:"remaintasks"`
	UpdateTime  time.Time `bson:"updatetime" json:"updatetime"`
	Notes       string    `bson:"notes" json:"notes"`
}

func AwfToJob

func AwfToJob(awf *Workflow, jid string) (job *Job, err error)

func CreateJobUpload

func CreateJobUpload(params map[string]string, files FormFiles, jid string) (job *Job, err error)

func LoadJob

func LoadJob(id string) (job *Job, err error)

func ParseAwf

func ParseAwf(filename string, jid string) (job *Job, err error)

Parse .awf.json - pseudo-function only, to be finished.

func ParseJobTasks

func ParseJobTasks(filename string, jid string) (job *Job, err error)

parse job by job script

func (*Job) FilePath

func (job *Job) FilePath() string

func (*Job) GetDataToken

func (job *Job) GetDataToken() (token string)

func (*Job) Mkdir

func (job *Job) Mkdir() (err error)

func (*Job) NumTask

func (job *Job) NumTask() int

func (*Job) Path

func (job *Job) Path() string

---Path functions

func (*Job) Save

func (job *Job) Save() (err error)

func (*Job) SetDataToken

func (job *Job) SetDataToken(token string)

set token

func (*Job) SetFile

func (job *Job) SetFile(file FormFile) (err error)

func (*Job) TaskList

func (job *Job) TaskList() []*Task

---Task functions

func (*Job) UpdateFile

func (job *Job) UpdateFile(params map[string]string, files FormFiles) (err error)

---Script upload

func (*Job) UpdateState

func (job *Job) UpdateState(newState string, notes string) (err error)

---Field update functions

func (*Job) UpdateTask

func (job *Job) UpdateTask(task *Task) (remainTasks int, err error)

Invoked to modify the job info in MongoDB when a task in that job changes to a new status.

type JobMgr

// JobMgr is the job-management half of a resource manager. Both ServerMgr and
// ProxyMgr declare every method listed here; the parameter names on their
// methods (jobid, reason, stage, newgroup, workid, reportfile) show what the
// unnamed string parameters below stand for.
type JobMgr interface {
	JobRegister() (string, error)
	EnqueueTasksByJobId(string, []*Task) error
	GetActiveJobs() map[string]*JobPerf
	GetSuspendJobs() map[string]bool
	SuspendJob(string, string) error
	ResumeSuspendedJob(string) error
	ResubmitJob(string) error
	DeleteJob(string) error
	DeleteSuspendedJobs() int
	InitMaxJid() error
	RecoverJobs() error
	FinalizeWorkPerf(string, string) error
	RecomputeJob(string, string) error
	UpdateGroup(string, string) error
}

type JobPerf

// JobPerf holds performance figures for one job, plus the TaskPerf and
// WorkPerf records that belong to it.
// NOTE(review): Queued/End look like timestamps and the maps look keyed by
// task id / workunit id — confirm units and keys.
type JobPerf struct {
	Id     string
	Queued int64
	//Start  int64
	End    int64
	Resp   int64 //End - Queued
	Ptasks map[string]*TaskPerf
	Pworks map[string]*WorkPerf
}

func NewJobPerf

func NewJobPerf(id string) *JobPerf

type Jobs

// Jobs is a slice of Job values (not pointers).
type Jobs []Job

Job array type

func (*Jobs) GetAll

func (n *Jobs) GetAll(q bson.M) (err error)

func (*Jobs) GetAllLimitOffset

func (n *Jobs) GetAllLimitOffset(q bson.M, limit int, offset int) (err error)

func (*Jobs) GetAllRecent

func (n *Jobs) GetAllRecent(q bson.M, recent int) (count int, err error)

func (*Jobs) GetJobAt

func (n *Jobs) GetJobAt(index int) Job

func (*Jobs) GetPaginated

func (n *Jobs) GetPaginated(q bson.M, limit int, offset int) (count int, err error)

func (*Jobs) Length

func (n *Jobs) Length() int

type Notice

// Notice is the argument of NotifyWorkStatus: it names a workunit, its
// status, a client and free-form notes.
// NOTE(review): Status presumably takes the WORK_STAT_* values — confirm.
type Notice struct {
	WorkId   string
	Status   string
	ClientId string
	Notes    string
}

type Opts

// Opts is a string-to-string option map with HasKey and Value accessors.
type Opts map[string]string

func (*Opts) HasKey

func (o *Opts) HasKey(key string) bool

func (*Opts) Value

func (o *Opts) Value(key string) string

type PartInfo

// PartInfo holds the partitioning settings of a task or workunit (see
// Task.Partition and Workunit.Partition). Options is stored in BSON but
// omitted from JSON.
// NOTE(review): Input presumably names an entry of the task's Inputs and
// Index an index type — confirm.
type PartInfo struct {
	Input         string `bson:"input" json:"input"`
	Index         string `bson:"index" json:"index"`
	TotalIndex    int    `bson:"totalindex" json:"totalindex"`
	MaxPartSizeMB int    `bson:"maxpartsize_mb" json:"maxpartsize_mb"`
	Options       string `bson:"options" json:"-"`
}

type Pipeline

// Pipeline pairs an Info block with a list of tasks. Unlike Job.Tasks, which
// is []*Task, the tasks here are held by value ([]Task).
type Pipeline struct {
	Info  *Info  `bson:"info" json:"info"`
	Tasks []Task `bson:"tasks" json:"tasks"`
}

func NewPipeline

func NewPipeline() *Pipeline

type ProxyMgr

// ProxyMgr embeds CQMgr and additionally declares the JobMgr methods together
// with Handle, ShowStatus and Timer, i.e. the full ResourceMgr method set.
type ProxyMgr struct {
	CQMgr
}

func NewProxyMgr

func NewProxyMgr() *ProxyMgr

func (*ProxyMgr) ClientChecker

func (qm *ProxyMgr) ClientChecker()

func (*ProxyMgr) DeleteJob

func (qm *ProxyMgr) DeleteJob(jobid string) (err error)

func (*ProxyMgr) DeleteSuspendedJobs

func (qm *ProxyMgr) DeleteSuspendedJobs() (num int)

func (*ProxyMgr) EnqueueTasksByJobId

func (qm *ProxyMgr) EnqueueTasksByJobId(jobid string, tasks []*Task) (err error)

func (*ProxyMgr) FetchDataToken

func (qm *ProxyMgr) FetchDataToken(workid string, clientid string) (token string, err error)

func (*ProxyMgr) FinalizeWorkPerf

func (qm *ProxyMgr) FinalizeWorkPerf(string, string) (err error)

func (*ProxyMgr) GetActiveJobs

func (qm *ProxyMgr) GetActiveJobs() map[string]*JobPerf

func (*ProxyMgr) GetSuspendJobs

func (qm *ProxyMgr) GetSuspendJobs() map[string]bool

func (*ProxyMgr) Handle

func (qm *ProxyMgr) Handle()

func (*ProxyMgr) InitMaxJid

func (qm *ProxyMgr) InitMaxJid() (err error)

func (*ProxyMgr) JobRegister

func (qm *ProxyMgr) JobRegister() (jid string, err error)

func (*ProxyMgr) RecomputeJob

func (qm *ProxyMgr) RecomputeJob(jobid string, stage string) (err error)

recompute jobs from specified task stage

func (*ProxyMgr) RecoverJobs

func (qm *ProxyMgr) RecoverJobs() (err error)

recover jobs not completed before awe-server restarts

func (*ProxyMgr) RegisterNewClient

func (qm *ProxyMgr) RegisterNewClient(files FormFiles) (client *Client, err error)

func (*ProxyMgr) ResubmitJob

func (qm *ProxyMgr) ResubmitJob(id string) (err error)

re-submit a job in db but not in the queue (caused by server restarting)

func (*ProxyMgr) ResumeSuspendedJob

func (qm *ProxyMgr) ResumeSuspendedJob(id string) (err error)

resubmit a suspended job

func (*ProxyMgr) ShowStatus

func (qm *ProxyMgr) ShowStatus() string

func (*ProxyMgr) SuspendJob

func (qm *ProxyMgr) SuspendJob(jobid string, reason string) (err error)

func (*ProxyMgr) Timer

func (qm *ProxyMgr) Timer()

func (*ProxyMgr) UpdateGroup

func (qm *ProxyMgr) UpdateGroup(jobid string, newgroup string) (err error)

type ResourceMgr

// ResourceMgr combines ClientWorkMgr and JobMgr with Handle, ShowStatus and
// Timer. It is the type of the package-level QMgr variable.
type ResourceMgr interface {
	ClientWorkMgr
	JobMgr
	Handle()
	ShowStatus() string
	Timer()
}

type ServerMgr

// ServerMgr embeds CQMgr, adds unexported fields of its own, and declares the
// JobMgr methods plus per-job/task/workunit perf bookkeeping methods
// (CreateJobPerf, FinalizeTaskPerf, LogJobPerf, ...).
type ServerMgr struct {
	CQMgr
	// contains filtered or unexported fields
}

func NewServerMgr

func NewServerMgr() *ServerMgr

func (*ServerMgr) CreateJobPerf

func (qm *ServerMgr) CreateJobPerf(jobid string)

---perf related methods

func (*ServerMgr) CreateTaskPerf

func (qm *ServerMgr) CreateTaskPerf(taskid string)

func (*ServerMgr) CreateWorkPerf

func (qm *ServerMgr) CreateWorkPerf(workid string)

func (*ServerMgr) DeleteJob

func (qm *ServerMgr) DeleteJob(jobid string) (err error)

func (*ServerMgr) DeleteJobPerf

func (qm *ServerMgr) DeleteJobPerf(jobid string)

func (*ServerMgr) DeleteSuspendedJobs

func (qm *ServerMgr) DeleteSuspendedJobs() (num int)

func (*ServerMgr) EnqueueTasksByJobId

func (qm *ServerMgr) EnqueueTasksByJobId(jobid string, tasks []*Task) (err error)

func (*ServerMgr) FetchDataToken

func (qm *ServerMgr) FetchDataToken(workid string, clientid string) (token string, err error)

--workunit methods (servermgr implementation)

func (*ServerMgr) FinalizeJobPerf

func (qm *ServerMgr) FinalizeJobPerf(jobid string)

func (*ServerMgr) FinalizeTaskPerf

func (qm *ServerMgr) FinalizeTaskPerf(taskid string)

func (*ServerMgr) FinalizeWorkPerf

func (qm *ServerMgr) FinalizeWorkPerf(workid string, reportfile string) (err error)

func (*ServerMgr) GetActiveJobs

func (qm *ServerMgr) GetActiveJobs() map[string]*JobPerf

func (*ServerMgr) GetSuspendJobs

func (qm *ServerMgr) GetSuspendJobs() map[string]bool

func (*ServerMgr) Handle

func (qm *ServerMgr) Handle()

func (*ServerMgr) InitMaxJid

func (qm *ServerMgr) InitMaxJid() (err error)

func (*ServerMgr) JobRegister

func (qm *ServerMgr) JobRegister() (jid string, err error)

---job methods---

func (*ServerMgr) LogJobPerf

func (qm *ServerMgr) LogJobPerf(jobid string)

func (*ServerMgr) RecomputeJob

func (qm *ServerMgr) RecomputeJob(jobid string, stage string) (err error)

recompute jobs from specified task stage

func (*ServerMgr) RecoverJobs

func (qm *ServerMgr) RecoverJobs() (err error)

recover jobs not completed before awe-server restarts

func (*ServerMgr) ResubmitJob

func (qm *ServerMgr) ResubmitJob(id string) (err error)

re-submit a job in db but not in the queue (caused by server restarting)

func (*ServerMgr) ResumeSuspendedJob

func (qm *ServerMgr) ResumeSuspendedJob(id string) (err error)

resubmit a suspended job

func (*ServerMgr) ShowStatus

func (qm *ServerMgr) ShowStatus() string

func (*ServerMgr) ShowTasks

func (qm *ServerMgr) ShowTasks()

func (*ServerMgr) SuspendJob

func (qm *ServerMgr) SuspendJob(jobid string, reason string) (err error)

func (*ServerMgr) Timer

func (qm *ServerMgr) Timer()

func (*ServerMgr) UpdateGroup

func (qm *ServerMgr) UpdateGroup(jobid string, newgroup string) (err error)

update job group

type ShockNode

// ShockNode is the node record returned by ShockGet and carried in
// ShockResponse.Data. VersionParts is stored in BSON but omitted from JSON.
// Note that Linkages uses different keys per encoding: "linkage" in BSON and
// "linkages" in JSON.
type ShockNode struct {
	Id         string             `bson:"id" json:"id"`
	Version    string             `bson:"version" json:"version"`
	File       shockfile          `bson:"file" json:"file"`
	Attributes interface{}        `bson:"attributes" json:"attributes"`
	Indexes    map[string]IdxInfo `bson:"indexes" json:"indexes"`
	//Acl          Acl                `bson:"acl" json:"-"`
	VersionParts map[string]string `bson:"version_parts" json:"-"`
	Tags         []string          `bson:"tags" json:"tags"`
	//	Revisions    []ShockNode       `bson:"revisions" json:"-"`
	Linkages []linkage `bson:"linkage" json:"linkages"`
}

func ShockGet

func ShockGet(host string, nodeid string, token string) (node *ShockNode, err error)

type ShockResponse

// ShockResponse is a response envelope around a ShockNode: a status code
// (key "status"), the node (key "data") and a list of error strings
// (key "error").
type ShockResponse struct {
	Code int       `bson:"status" json:"status"`
	Data ShockNode `bson:"data" json:"data"`
	Errs []string  `bson:"error" json:"error"`
}

type Task

// Task is one task of a job: its inputs/outputs/predata, command, partition
// settings, dependencies and work counters.
//
// Info, Partition, WorkStatus and Skip are stored in BSON but omitted from
// JSON (json:"-").
type Task struct {
	Id          string    `bson:"taskid" json:"taskid"`
	Info        *Info     `bson:"info" json:"-"`
	Inputs      IOmap     `bson:"inputs" json:"inputs"`
	Outputs     IOmap     `bson:"outputs" json:"outputs"`
	Predata     IOmap     `bson:"predata" json:"predata"`
	Cmd         *Command  `bson:"cmd" json:"cmd"`
	Partition   *PartInfo `bson:"partinfo" json:"-"`
	// NOTE(review): presumably a list of task ids — confirm.
	DependsOn   []string  `bson:"dependsOn" json:"dependsOn"`
	TotalWork   int       `bson:"totalwork" json:"totalwork"`
	MaxWorkSize int       `bson:"maxworksize"   json:"maxworksize"`
	RemainWork  int       `bson:"remainwork" json:"remainwork"`
	WorkStatus  []string  `bson:"workstatus" json:"-"`
	State       string    `bson:"state" json:"state"`
	Skip        int       `bson:"skip" json:"-"`
}

func NewTask

func NewTask(job *Job, rank int) *Task

func (*Task) InitPartIndex

func (task *Task) InitPartIndex() (err error)

Get the part size based on the partition/index info. If the index info cannot be obtained, task.TotalWork falls back to 1 and nil is returned.

func (*Task) InitTask

func (task *Task) InitTask(job *Job, rank int) (err error)

Fill in some info for a task that is lacking in the input JSON.

func (*Task) ParseWorkunit

func (task *Task) ParseWorkunit() (wus []*Workunit, err error)

func (*Task) Skippable

func (task *Task) Skippable() bool

func (*Task) UpdateState

func (task *Task) UpdateState(newState string) string

type TaskPerf

// TaskPerf holds performance figures for one task (the value type of
// JobPerf.Ptasks), including input and output file sizes.
// NOTE(review): units of Size/Queued/End are not visible here — confirm.
type TaskPerf struct {
	Size         int64
	Queued       int64
	End          int64
	Resp         int64 //End -Queued
	InFileSizes  []int64
	OutFileSizes []int64
}

func NewTaskPerf

func NewTaskPerf(id string) *TaskPerf

type WQueue

// WQueue is a collection of workunits addressed by id: its methods add,
// fetch, test for, delete and change the status of a workunit by id, and
// report the queue length.
type WQueue struct {
	// contains filtered or unexported fields
}

func NewWQueue

func NewWQueue() *WQueue

func (*WQueue) Add

func (wq *WQueue) Add(workunit *Workunit) (err error)

func (*WQueue) Delete

func (wq *WQueue) Delete(id string)

func (*WQueue) Get

func (wq *WQueue) Get(id string) (work *Workunit, ok bool)

func (*WQueue) Has

func (wq *WQueue) Has(id string) (has bool)

func (WQueue) Len

func (wq WQueue) Len() int

func (*WQueue) StatusChange

func (wq *WQueue) StatusChange(id string, new_status string) (err error)

type WorkList

// WorkList is a slice of workunit pointers with Len and Swap methods.
// NOTE(review): no Less method is declared on WorkList itself, so ordering is
// presumably supplied by a wrapper type elsewhere — confirm.
type WorkList []*Workunit

queuing/prioritizing related functions

func (WorkList) Len

func (wl WorkList) Len() int

func (WorkList) Swap

func (wl WorkList) Swap(i, j int)

type WorkMgr

// WorkMgr is the workunit-management half of a resource manager. CQMgr
// declares every method listed here; its parameter names show what the
// unnamed parameters stand for, e.g. CheckoutWorkunits(req_policy, client_id,
// num) and FetchDataToken(workid, clientid).
type WorkMgr interface {
	GetWorkById(string) (*Workunit, error)
	ShowWorkunits(string) []*Workunit
	CheckoutWorkunits(string, string, int) ([]*Workunit, error)
	NotifyWorkStatus(Notice)
	EnqueueWorkunit(*Workunit) error
	FetchDataToken(string, string) (string, error)
}

type WorkPerf

// WorkPerf holds performance figures for one workunit, split into
// server-side metrics (Queued, Done, Resp) and client-side metrics (Checkout,
// Deliver, ClientResp, DataIn, DataOut, Runtime), plus the id of the client.
type WorkPerf struct {
	Queued     int64 // WQ (queued at server or client, depending on who creates it)
	Done       int64 // WD (done at server)
	Resp       int64 // Done - Queued (server metric)
	Checkout   int64 // checkout at client
	Deliver    int64 // done at client
	ClientResp int64 // Deliver - Checkout (client metric)
	DataIn     int64 // input data move-in at client
	DataOut    int64 // output data move-out at client
	Runtime    int64 // computing time at client
	ClientId   string
}

func NewWorkPerf

func NewWorkPerf(id string) *WorkPerf

type Workflow

// Workflow is a workflow document: workflow info, job info, raw inputs,
// variables, a data server and a task list. AwfToJob converts a Workflow into
// a Job.
type Workflow struct {
	WfInfo     awf_info          `bson:"workflow_info" json:"workflow_info"`
	JobInfo    awf_jobinfo       `bson:"job_info" json:"job_info"`
	RawInputs  map[string]string `bson:"raw_inputs" json:"raw_inputs"`
	Variables  map[string]string `bson:"variables" json:"variables"`
	DataServer string            `bson:"data_server" json:"data_server"`
	Tasks      []*awf_task       `bson:"tasks" json:"tasks"`
}

type WorkflowMgr

// WorkflowMgr stores workflows by name: its methods add a workflow under a
// name, fetch one by name, list all of them, and load workflows.
type WorkflowMgr struct {
	// contains filtered or unexported fields
}
// AwfMgr is the package-level workflow manager.
// NOTE(review): presumably initialized by InitAwfMgr — confirm.
var (
	AwfMgr *WorkflowMgr
)

func NewWorkflowMgr

func NewWorkflowMgr() *WorkflowMgr

func (*WorkflowMgr) AddWorkflow

func (wfm *WorkflowMgr) AddWorkflow(name string, awf *Workflow)

func (*WorkflowMgr) GetAllWorkflows

func (wfm *WorkflowMgr) GetAllWorkflows() (workflows []*Workflow)

func (*WorkflowMgr) GetWorkflow

func (wfm *WorkflowMgr) GetWorkflow(name string) (awf *Workflow, err error)

func (*WorkflowMgr) LoadWorkflows

func (wfm *WorkflowMgr) LoadWorkflows() (err error)

type Workunit

// Workunit is one unit of work created from a task (see NewWorkunit and
// Task.ParseWorkunit). It carries the inputs/outputs/predata, command and
// partition settings, its rank out of TotalWork, and checkout bookkeeping
// (State, Failed, CheckoutTime, Client). Every field is serialized to both
// BSON and JSON.
type Workunit struct {
	Id           string    `bson:"wuid" json:"wuid"`
	Info         *Info     `bson:"info" json:"info"`
	Inputs       IOmap     `bson:"inputs" json:"inputs"`
	Outputs      IOmap     `bson:"outputs" json:"outputs"`
	Predata      IOmap     `bson:"predata" json:"predata"`
	Cmd          *Command  `bson:"cmd" json:"cmd"`
	Rank         int       `bson:"rank" json:"rank"`
	TotalWork    int       `bson:"totalwork" json:"totalwork"`
	Partition    *PartInfo `bson:"part" json:"part"`
	State        string    `bson:"state" json:"state"`
	Failed       int       `bson:"failed" json:"failed"`
	CheckoutTime time.Time `bson:"checkout_time" json:"checkout_time"`
	Client       string    `bson:"client" json:"client"`
}

func NewWorkunit

func NewWorkunit(task *Task, rank int) *Workunit

func (*Workunit) CDworkpath

func (work *Workunit) CDworkpath() (err error)

func (*Workunit) IndexType

func (work *Workunit) IndexType() (indextype string)

func (*Workunit) Mkdir

func (work *Workunit) Mkdir() (err error)

func (*Workunit) Part

func (work *Workunit) Part() (part string)

Calculate the range of the data part. Algorithm: try to distribute the indexed parts evenly across workunits, e.g. with totalWork=4 and totalParts=10 the workunits get 3, 3, 2 and 2 parts.

func (*Workunit) Path

func (work *Workunit) Path() string

func (*Workunit) RemoveDir

func (work *Workunit) RemoveDir() (err error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL