core

package
v0.9.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 1, 2014 License: BSD-2-Clause Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CLIENT_STAT_ACTIVE_BUSY = "active-busy"
	CLIENT_STAT_ACTIVE_IDLE = "active-idle"
	CLIENT_STAT_SUSPEND     = "suspend"
	CLIENT_STAT_DELETED     = "deleted"
)
View Source
const (
	JOB_STAT_INIT       = "init"
	JOB_STAT_QUEUED     = "queued"
	JOB_STAT_INPROGRESS = "in-progress"
	JOB_STAT_COMPLETED  = "completed"
	JOB_STAT_SUSPEND    = "suspend"
	JOB_STAT_DELETED    = "deleted"
)
View Source
const (
	TASK_STAT_INIT       = "init"
	TASK_STAT_QUEUED     = "queued"
	TASK_STAT_INPROGRESS = "in-progress"
	TASK_STAT_PENDING    = "pending"
	TASK_STAT_SUSPEND    = "suspend"
	TASK_STAT_COMPLETED  = "completed"
	TASK_STAT_SKIPPED    = "user_skipped"
	TASK_STAT_FAIL_SKIP  = "skipped"
	TASK_STAT_PASSED     = "passed"
)
View Source
const (
	WORK_STAT_QUEUED      = "queued"
	WORK_STAT_CHECKOUT    = "checkout"
	WORK_STAT_SUSPEND     = "suspend"
	WORK_STAT_DONE        = "done"
	WORK_STAT_FAIL        = "fail"
	WORK_STAT_PREPARED    = "prepared"
	WORK_STAT_COMPUTED    = "computed"
	WORK_STAT_PROXYQUEUED = "proxyqueued"
)

Variables

View Source
var (
	QMgr          ResourceMgr
	Service       string = "unknown"
	Self          *Client
	ProxyWorkChan chan bool
)

Functions

func GetJobIdByTaskId

func GetJobIdByTaskId(taskid string) (jobid string, err error)

misc

func GetJobIdByWorkId

func GetJobIdByWorkId(workid string) (jobid string, err error)

func GetTaskIdByWorkId

func GetTaskIdByWorkId(workid string) (taskid string, err error)

func InitAwfMgr

func InitAwfMgr()

func InitClientProfile

func InitClientProfile(profile *Client)

func InitJobDB

func InitJobDB()

func InitProxyWorkChan

func InitProxyWorkChan()

func InitResMgr

func InitResMgr(service string)

func IsFirstTask

func IsFirstTask(taskid string) bool

func NotifyWorkunitProcessed

func NotifyWorkunitProcessed(work *Workunit, perf *WorkPerf) (err error)

functions for REST API communication (=deprecated=) notify AWE server a workunit is finished with status either "failed" or "done", and with perf statistics if "done"

func NotifyWorkunitProcessedWithLogs

func NotifyWorkunitProcessedWithLogs(work *Workunit, perf *WorkPerf, sendstdlogs bool) (err error)

func PostNode

func PostNode(io *IO, numParts int) (nodeid string, err error)

create a shock node for output (=deprecated=)

func PostNodeWithToken

func PostNodeWithToken(io *IO, numParts int, token string) (nodeid string, err error)

func PushOutputData

func PushOutputData(work *Workunit) (size int64, err error)

func PutFileToShock

func PutFileToShock(filename string, host string, nodeid string, rank int, token string, attrfile string) (err error)

func ReloadFromDisk

func ReloadFromDisk(path string) (err error)

func ShockPutIndex

func ShockPutIndex(host string, nodeid string, indexname string, token string) (err error)

func UpdateJobState

func UpdateJobState(jobid string, newstate string, oldstates []string) (err error)

update job state to "newstate" only if the current state is in one of the "oldstates"

Types

type CQMgr

type CQMgr struct {
	// contains filtered or unexported fields
}

func NewCQMgr

func NewCQMgr() *CQMgr

func (*CQMgr) CheckoutWorkunits

func (qm *CQMgr) CheckoutWorkunits(req_policy string, client_id string, num int) (workunits []*Workunit, err error)

func (*CQMgr) ClientChecker

func (qm *CQMgr) ClientChecker()

func (*CQMgr) ClientHeartBeat

func (qm *CQMgr) ClientHeartBeat(id string) (hbmsg HBmsg, err error)

func (*CQMgr) DeleteClient

func (qm *CQMgr) DeleteClient(id string) (err error)

func (*CQMgr) EnqueueWorkunit

func (qm *CQMgr) EnqueueWorkunit(work *Workunit) (err error)

func (*CQMgr) FetchDataToken

func (qm *CQMgr) FetchDataToken(workid string, clientid string) (token string, err error)

func (*CQMgr) GetAllClients

func (qm *CQMgr) GetAllClients() []*Client

func (*CQMgr) GetClient

func (qm *CQMgr) GetClient(id string) (client *Client, err error)

func (*CQMgr) GetWorkById

func (qm *CQMgr) GetWorkById(id string) (workunit *Workunit, err error)

func (*CQMgr) Handle

func (qm *CQMgr) Handle()

func (*CQMgr) NotifyWorkStatus

func (qm *CQMgr) NotifyWorkStatus(notice Notice)

func (*CQMgr) ReQueueWorkunitByClient

func (qm *CQMgr) ReQueueWorkunitByClient(clientid string) (err error)

func (*CQMgr) RegisterNewClient

func (qm *CQMgr) RegisterNewClient(files FormFiles) (client *Client, err error)

func (*CQMgr) ResumeClient

func (qm *CQMgr) ResumeClient(id string) (err error)

func (*CQMgr) ResumeSuspendedClients

func (qm *CQMgr) ResumeSuspendedClients() (count int)

func (*CQMgr) ShowWorkQueue

func (qm *CQMgr) ShowWorkQueue()

show functions used in debug

func (*CQMgr) ShowWorkunits

func (qm *CQMgr) ShowWorkunits(status string) (workunits []*Workunit)

func (*CQMgr) SuspendAllClients

func (qm *CQMgr) SuspendAllClients() (count int)

func (*CQMgr) SuspendClient

func (qm *CQMgr) SuspendClient(id string) (err error)

func (*CQMgr) Timer

func (qm *CQMgr) Timer()

func (*CQMgr) UpdateSubClients

func (qm *CQMgr) UpdateSubClients(id string, count int)

type Client

type Client struct {
	Id              string          `bson:"id" json:"id"`
	Name            string          `bson:"name" json:"name"`
	Group           string          `bson:"group" json:"group"`
	User            string          `bson:"user" json:"user"`
	Domain          string          `bson:"domain" json:"domain"`
	InstanceId      string          `bson:"instance_id" json:"instance_id"`
	InstanceType    string          `bson:"instance_type" json:"instance_type"`
	Host            string          `bson:"host" json:"host"`
	CPUs            int             `bson:"cores" json:"cores"`
	Apps            []string        `bson:"apps" json:"apps"`
	RegTime         time.Time       `bson:"regtime" json:"regtime"`
	Serve_time      string          `bson:"serve_time" json:"serve_time"`
	Idle_time       int             `bson:"idle_time" json:"idle_time"`
	Status          string          `bson:"Status" json:"Status"`
	Total_checkout  int             `bson:"total_checkout" json:"total_checkout"`
	Total_completed int             `bson:"total_completed" json:"total_completed"`
	Total_failed    int             `bson:"total_failed" json:"total_failed"`
	Current_work    map[string]bool `bson:"current_work" json:"current_work"`
	Skip_work       []string        `bson:"skip_work" json:"skip_work"`
	Last_failed     int             `bson:"-" json:"-"`
	Tag             bool            `bson:"-" json:"-"`
	Proxy           bool            `bson:"proxy" json:"proxy"`
	SubClients      int             `bson:"subclients" json:"subclients"`
}

func NewClient

func NewClient() (client *Client)

func NewProfileClient

func NewProfileClient(filepath string) (client *Client, err error)

func (*Client) IsBusy

func (cl *Client) IsBusy() bool

type ClientMgr

type ClientMgr interface {
	RegisterNewClient(FormFiles) (*Client, error)
	ClientHeartBeat(string) (HBmsg, error)
	GetClient(string) (*Client, error)
	GetAllClients() []*Client
	DeleteClient(string) (err error)
	SuspendClient(string) (err error)
	ResumeClient(string) (err error)
	ResumeSuspendedClients() (count int)
	SuspendAllClients() (count int)
	ClientChecker()
	UpdateSubClients(id string, count int)
}

type ClientWorkMgr

type ClientWorkMgr interface {
	ClientMgr
	WorkMgr
}

type CoAck

type CoAck struct {
	// contains filtered or unexported fields
}

type CoReq

type CoReq struct {
	// contains filtered or unexported fields
}

type Command

type Command struct {
	Name        string            `bson:"name" json:"name"`
	Args        string            `bson:"args" json:"args"`
	Dockerimage string            `bson:"Dockerimage" json:"Dockerimage"`
	Environ     map[string]string `bson:"environ" json:"environ"`
	Description string            `bson:"description" json:"description"`
	ParsedArgs  []string          `bson:"-" json:"-"`
}

func NewCommand

func NewCommand(name string) *Command

type FormFile

type FormFile struct {
	Name     string
	Path     string
	Checksum map[string]string
}

type FormFiles

type FormFiles map[string]FormFile

type HBmsg

type HBmsg map[string]string //map[op]obj1,obj2 e.g. map[discard]=work1,work2

heartbeat response from awe-server to awe-client used for issue operation request to client, e.g. discard suspended workunits

type IO

type IO struct {
	Name          string `bson:"name" json:"-"`
	Directory     string `bson:"directory" json:"directory"`
	Host          string `bson:"host" json:"host"`
	Node          string `bson:"node" json:"node"`
	Url           string `bson:"url"  json:"url"`
	Size          int64  `bson:"size" json:"size"`
	MD5           string `bson:"md5" json:"-"`
	Cache         bool   `bson:"cache" json:"-"`
	Origin        string `bson:"origin" json:"origin"`
	Path          string `bson:"path" json:"-"`
	Optional      bool   `bson:"optional" json:"-"`
	Nonzero       bool   `bson:"nonzero"  json:"nonzero"`
	DataToken     string `bson:"datatoken"  json:"-"`
	Intermediate  bool   `bson:"Intermediate"  json:"-"`
	ShockFilename string `bson:"shockfilename" json:"shockfilename"`
	AttrFile      string `bson:"attrfile" json:"attrfile"`
}

func NewIO

func NewIO() *IO

func (*IO) DataUrl

func (io *IO) DataUrl() string

func (*IO) GetFileSize

func (io *IO) GetFileSize() int64

func (*IO) GetIndexInfo

func (io *IO) GetIndexInfo() (idxinfo map[string]IdxInfo, err error)

func (*IO) GetIndexUnits

func (io *IO) GetIndexUnits(indextype string) (totalunits int, err error)

func (*IO) GetShockNode

func (io *IO) GetShockNode() (node *ShockNode, err error)

func (*IO) TotalUnits

func (io *IO) TotalUnits(indextype string) (count int, err error)

type IOmap

type IOmap map[string]*IO // [filename]attributes

func NewIOmap

func NewIOmap() IOmap

func (IOmap) Add

func (i IOmap) Add(name string, host string, node string, md5 string, cache bool)

func (IOmap) Find

func (i IOmap) Find(name string) *IO

func (IOmap) Has

func (i IOmap) Has(name string) bool

type IdxInfo

type IdxInfo struct {
	Type        string `bson:"index_type" json:"-"`
	TotalUnits  int64  `bson:"total_units" json:"total_units"`
	AvgUnitSize int64  `bson:"average_unit_size" json:"average_unit_size"`
}

type Info

type Info struct {
	Name          string    `bson:"name" json:"name"`
	Xref          string    `bson:"xref" json:"xref"`
	Project       string    `bson:"project" json:"project"`
	User          string    `bson:"user" json:"user"`
	Pipeline      string    `bson:"pipeline" json:"pipeline"`
	ClientGroups  string    `bson:"clientgroups" json:"clientgroups"`
	SubmitTime    time.Time `bson:"submittime" json:"submittime"`
	StartedTme    time.Time `bson:"startedtime" json:"startedtime"`
	CompletedTime time.Time `bson:"completedtime" json:"completedtime"`
	Priority      int       `bson:"priority" json:"-"`
	Auth          bool      `bson:"auth" json:"auth"`
	DataToken     string    `bson:"datatoken" json:"-"`
	NoRetry       bool      `bson:"noretry" json:"noretry"`
}

job info

func NewInfo

func NewInfo() *Info

type Job

type Job struct {
	Id          string    `bson:"id" json:"id"`
	Jid         string    `bson:"jid" json:"jid"`
	Info        *Info     `bson:"info" json:"info"`
	Tasks       []*Task   `bson:"tasks" json:"tasks"`
	Script      script    `bson:"script" json:"-"`
	State       string    `bson:"state" json:"state"`
	Registered  bool      `bson:"-" json:"registered"` //job in memory (not only in mongodb) (assigned on the fly, db value meaningless)
	RemainTasks int       `bson:"remaintasks" json:"remaintasks"`
	UpdateTime  time.Time `bson:"updatetime" json:"updatetime"`
	Notes       string    `bson:"notes" json:"notes"`
	Resumed     int       `bson:"resumed" json:"resumed"` //number of times the job has been resumed from suspension
}

func AwfToJob

func AwfToJob(awf *Workflow, jid string) (job *Job, err error)

func CreateJobUpload

func CreateJobUpload(params map[string]string, files FormFiles, jid string) (job *Job, err error)

func LoadJob

func LoadJob(id string) (job *Job, err error)

func ParseAwf

func ParseAwf(filename string, jid string) (job *Job, err error)

parse .awf.json - sudo-function only, to be finished

func ParseJobTasks

func ParseJobTasks(filename string, jid string) (job *Job, err error)

parse job by job script

func (*Job) FilePath

func (job *Job) FilePath() string

func (*Job) GetDataToken

func (job *Job) GetDataToken() (token string)

func (*Job) Mkdir

func (job *Job) Mkdir() (err error)

func (*Job) NumTask

func (job *Job) NumTask() int

func (*Job) Path

func (job *Job) Path() string

---Path functions

func (*Job) Save

func (job *Job) Save() (err error)

func (*Job) SetDataToken

func (job *Job) SetDataToken(token string)

set token

func (*Job) SetFile

func (job *Job) SetFile(file FormFile) (err error)

func (*Job) TaskList

func (job *Job) TaskList() []*Task

---Task functions

func (*Job) UpdateFile

func (job *Job) UpdateFile(params map[string]string, files FormFiles) (err error)

---Script upload

func (*Job) UpdateState

func (job *Job) UpdateState(newState string, notes string) (err error)

---Field update functions

func (*Job) UpdateTask

func (job *Job) UpdateTask(task *Task) (remainTasks int, err error)

invoked to modify job info in mongodb when a task in that job changed to the new status

type JobMgr

type JobMgr interface {
	JobRegister() (string, error)
	EnqueueTasksByJobId(string, []*Task) error
	GetActiveJobs() map[string]*JobPerf
	IsJobRegistered(string) bool
	GetSuspendJobs() map[string]bool
	SuspendJob(string, string) error
	ResumeSuspendedJob(string) error
	ResumeSuspendedJobs() int
	ResubmitJob(string) error
	DeleteJob(string) error
	DeleteSuspendedJobs() int
	DeleteZombieJobs() int
	InitMaxJid() error
	RecoverJobs() error
	FinalizeWorkPerf(string, string) error
	SaveStdLog(string, string, string) error
	GetReportMsg(string, string) (string, error)
	RecomputeJob(string, string) error
	UpdateGroup(string, string) error
}

type JobPerf

type JobPerf struct {
	Id     string               `bson:"id" json:"id"`
	Queued int64                `bson:"queued" json:"queued"`
	Start  int64                `bson:"start" json:"start"`
	End    int64                `bson:"end" json:"end"`
	Resp   int64                `bson:"resp" json:"resp"` //End - Queued
	Ptasks map[string]*TaskPerf `bson:"task_stats" json:"task_stats"`
	Pworks map[string]*WorkPerf `bson:"work_stats" json:"work_stats"`
}

func LoadJobPerf

func LoadJobPerf(id string) (perf *JobPerf, err error)

func NewJobPerf

func NewJobPerf(id string) *JobPerf

type Jobs

type Jobs []Job

Job array type

func (*Jobs) GetAll

func (n *Jobs) GetAll(q bson.M) (err error)

func (*Jobs) GetAllLimitOffset

func (n *Jobs) GetAllLimitOffset(q bson.M, limit int, offset int) (err error)

func (*Jobs) GetAllRecent

func (n *Jobs) GetAllRecent(q bson.M, recent int) (count int, err error)

func (*Jobs) GetJobAt

func (n *Jobs) GetJobAt(index int) Job

func (*Jobs) GetPaginated

func (n *Jobs) GetPaginated(q bson.M, limit int, offset int) (count int, err error)

func (*Jobs) Length

func (n *Jobs) Length() int

type Notice

type Notice struct {
	WorkId      string
	Status      string
	ClientId    string
	ComputeTime int
	Notes       string
}

type Opts

type Opts map[string]string

func (*Opts) HasKey

func (o *Opts) HasKey(key string) bool

func (*Opts) Value

func (o *Opts) Value(key string) string

type PartInfo

type PartInfo struct {
	Input         string `bson:"input" json:"input"`
	Index         string `bson:"index" json:"index"`
	TotalIndex    int    `bson:"totalindex" json:"totalindex"`
	MaxPartSizeMB int    `bson:"maxpartsize_mb" json:"maxpartsize_mb"`
	Options       string `bson:"options" json:"-"`
}

type Pipeline

type Pipeline struct {
	Info  *Info  `bson:"info" json:"info"`
	Tasks []Task `bson:"tasks" json:"tasks"`
}

func NewPipeline

func NewPipeline() *Pipeline

type ProxyMgr

type ProxyMgr struct {
	CQMgr
}

func NewProxyMgr

func NewProxyMgr() *ProxyMgr

func (*ProxyMgr) ClientChecker

func (qm *ProxyMgr) ClientChecker()

func (*ProxyMgr) DeleteJob

func (qm *ProxyMgr) DeleteJob(jobid string) (err error)

func (*ProxyMgr) DeleteSuspendedJobs

func (qm *ProxyMgr) DeleteSuspendedJobs() (num int)

func (*ProxyMgr) DeleteZombieJobs

func (qm *ProxyMgr) DeleteZombieJobs() (num int)

func (*ProxyMgr) EnqueueTasksByJobId

func (qm *ProxyMgr) EnqueueTasksByJobId(jobid string, tasks []*Task) (err error)

func (*ProxyMgr) FetchDataToken

func (qm *ProxyMgr) FetchDataToken(workid string, clientid string) (token string, err error)

func (*ProxyMgr) FinalizeWorkPerf

func (qm *ProxyMgr) FinalizeWorkPerf(string, string) (err error)

func (*ProxyMgr) GetActiveJobs

func (qm *ProxyMgr) GetActiveJobs() map[string]*JobPerf

func (*ProxyMgr) GetReportMsg

func (qm *ProxyMgr) GetReportMsg(string, string) (report string, err error)

func (*ProxyMgr) GetSuspendJobs

func (qm *ProxyMgr) GetSuspendJobs() map[string]bool

func (*ProxyMgr) Handle

func (qm *ProxyMgr) Handle()

func (*ProxyMgr) InitMaxJid

func (qm *ProxyMgr) InitMaxJid() (err error)

func (*ProxyMgr) IsJobRegistered

func (qm *ProxyMgr) IsJobRegistered(id string) bool

func (*ProxyMgr) JobRegister

func (qm *ProxyMgr) JobRegister() (jid string, err error)

func (*ProxyMgr) RecomputeJob

func (qm *ProxyMgr) RecomputeJob(jobid string, stage string) (err error)

recompute jobs from specified task stage

func (*ProxyMgr) RecoverJobs

func (qm *ProxyMgr) RecoverJobs() (err error)

recover jobs not completed before awe-server restarts

func (*ProxyMgr) RegisterNewClient

func (qm *ProxyMgr) RegisterNewClient(files FormFiles) (client *Client, err error)

func (*ProxyMgr) ResubmitJob

func (qm *ProxyMgr) ResubmitJob(id string) (err error)

re-submit a job in db but not in the queue (caused by server restarting)

func (*ProxyMgr) ResumeSuspendedJob

func (qm *ProxyMgr) ResumeSuspendedJob(id string) (err error)

resubmit a suspended job

func (*ProxyMgr) ResumeSuspendedJobs

func (qm *ProxyMgr) ResumeSuspendedJobs() (num int)

func (*ProxyMgr) SaveStdLog

func (qm *ProxyMgr) SaveStdLog(string, string, string) (err error)

func (*ProxyMgr) ShowStatus

func (qm *ProxyMgr) ShowStatus() string

func (*ProxyMgr) SuspendJob

func (qm *ProxyMgr) SuspendJob(jobid string, reason string) (err error)

func (*ProxyMgr) Timer

func (qm *ProxyMgr) Timer()

func (*ProxyMgr) UpdateGroup

func (qm *ProxyMgr) UpdateGroup(jobid string, newgroup string) (err error)

type ResourceMgr

type ResourceMgr interface {
	ClientWorkMgr
	JobMgr
	Handle()
	ShowStatus() string
	Timer()
}

type ServerMgr

type ServerMgr struct {
	CQMgr
	// contains filtered or unexported fields
}

func NewServerMgr

func NewServerMgr() *ServerMgr

func (*ServerMgr) CreateJobPerf

func (qm *ServerMgr) CreateJobPerf(jobid string)

---perf related methods

func (*ServerMgr) CreateTaskPerf

func (qm *ServerMgr) CreateTaskPerf(taskid string)

func (*ServerMgr) CreateWorkPerf

func (qm *ServerMgr) CreateWorkPerf(workid string)

func (*ServerMgr) DeleteJob

func (qm *ServerMgr) DeleteJob(jobid string) (err error)

func (*ServerMgr) DeleteJobPerf

func (qm *ServerMgr) DeleteJobPerf(jobid string)

func (*ServerMgr) DeleteSuspendedJobs

func (qm *ServerMgr) DeleteSuspendedJobs() (num int)

func (*ServerMgr) DeleteZombieJobs

func (qm *ServerMgr) DeleteZombieJobs() (num int)

delete jobs in db with "queued" or "in-progress" state but not in the queue (zombie jobs)

func (*ServerMgr) EnqueueTasksByJobId

func (qm *ServerMgr) EnqueueTasksByJobId(jobid string, tasks []*Task) (err error)

func (*ServerMgr) FetchDataToken

func (qm *ServerMgr) FetchDataToken(workid string, clientid string) (token string, err error)

--workunit methds (servermgr implementation)

func (*ServerMgr) FinalizeJobPerf

func (qm *ServerMgr) FinalizeJobPerf(jobid string)

func (*ServerMgr) FinalizeTaskPerf

func (qm *ServerMgr) FinalizeTaskPerf(taskid string)

func (*ServerMgr) FinalizeWorkPerf

func (qm *ServerMgr) FinalizeWorkPerf(workid string, reportfile string) (err error)

func (*ServerMgr) GetActiveJobs

func (qm *ServerMgr) GetActiveJobs() map[string]*JobPerf

func (*ServerMgr) GetReportMsg

func (qm *ServerMgr) GetReportMsg(workid string, logname string) (report string, err error)

func (*ServerMgr) GetSuspendJobs

func (qm *ServerMgr) GetSuspendJobs() map[string]bool

func (*ServerMgr) Handle

func (qm *ServerMgr) Handle()

func (*ServerMgr) InitMaxJid

func (qm *ServerMgr) InitMaxJid() (err error)

func (*ServerMgr) IsJobRegistered

func (qm *ServerMgr) IsJobRegistered(id string) bool

func (*ServerMgr) JobRegister

func (qm *ServerMgr) JobRegister() (jid string, err error)

---job methods---

func (*ServerMgr) LogJobPerf

func (qm *ServerMgr) LogJobPerf(jobid string)

func (*ServerMgr) RecomputeJob

func (qm *ServerMgr) RecomputeJob(jobid string, stage string) (err error)

recompute jobs from specified task stage

func (*ServerMgr) RecoverJobs

func (qm *ServerMgr) RecoverJobs() (err error)

recover jobs not completed before awe-server restarts

func (*ServerMgr) ResubmitJob

func (qm *ServerMgr) ResubmitJob(id string) (err error)

re-submit a job in db but not in the queue (caused by server restarting)

func (*ServerMgr) ResumeSuspendedJob

func (qm *ServerMgr) ResumeSuspendedJob(id string) (err error)

resubmit a suspended job

func (*ServerMgr) ResumeSuspendedJobs

func (qm *ServerMgr) ResumeSuspendedJobs() (num int)

func (*ServerMgr) SaveStdLog

func (qm *ServerMgr) SaveStdLog(workid string, logname string, tmppath string) (err error)

func (*ServerMgr) ShowStatus

func (qm *ServerMgr) ShowStatus() string

func (*ServerMgr) ShowTasks

func (qm *ServerMgr) ShowTasks()

func (*ServerMgr) SuspendJob

func (qm *ServerMgr) SuspendJob(jobid string, reason string) (err error)

func (*ServerMgr) Timer

func (qm *ServerMgr) Timer()

func (*ServerMgr) UpdateGroup

func (qm *ServerMgr) UpdateGroup(jobid string, newgroup string) (err error)

update job group

func (*ServerMgr) UpdateJobPerfStartTime

func (qm *ServerMgr) UpdateJobPerfStartTime(jobid string)

func (*ServerMgr) UpdateJobTaskToInProgress

func (qm *ServerMgr) UpdateJobTaskToInProgress(works []*Workunit)

update job/task states from "queued" to "in-progress" once the first workunit is checked out

func (*ServerMgr) UpdateTaskPerfStartTime

func (qm *ServerMgr) UpdateTaskPerfStartTime(taskid string)

type ShockClient

type ShockClient struct {
	Host  string
	Token string
}

TODO use Token

func (*ShockClient) Get_node_download_url

func (sc *ShockClient) Get_node_download_url(node ShockNode) (download_url string, err error)

func (*ShockClient) Get_request

func (sc *ShockClient) Get_request(resource string, query url.Values, response interface{}) (err error)

func (*ShockClient) Query

func (sc *ShockClient) Query(query url.Values) (sqr_p *ShockQueryResponse, err error)

example: query_response_p, err := sc.Shock_query(host, url.Values{"docker": {"1"}, "docker_image_name" : {"wgerlach/bowtie2:2.2.0"}});

type ShockNode

type ShockNode struct {
	Id         string             `bson:"id" json:"id"`
	Version    string             `bson:"version" json:"version"`
	File       shockfile          `bson:"file" json:"file"`
	Attributes interface{}        `bson:"attributes" json:"attributes"`
	Indexes    map[string]IdxInfo `bson:"indexes" json:"indexes"`
	//Acl          Acl                `bson:"acl" json:"-"`
	VersionParts map[string]string `bson:"version_parts" json:"-"`
	Tags         []string          `bson:"tags" json:"tags"`
	//	Revisions    []ShockNode       `bson:"revisions" json:"-"`
	Linkages []linkage `bson:"linkage" json:"linkages"`
}

func ShockGet

func ShockGet(host string, nodeid string, token string) (node *ShockNode, err error)

type ShockQueryResponse

type ShockQueryResponse struct {
	Code       int         `bson:"status" json:"status"`
	Data       []ShockNode `bson:"data" json:"data"`
	Errs       []string    `bson:"error" json:"error"`
	Limit      int         `bson:"limit" json:"limit"`
	Offset     int         `bson:"offset" json:"offset"`
	TotalCount int         `bson:"total_count" json:"total_count"`
}

type ShockResponse

type ShockResponse struct {
	Code int       `bson:"status" json:"status"`
	Data ShockNode `bson:"data" json:"data"`
	Errs []string  `bson:"error" json:"error"`
}

type Task

type Task struct {
	Id            string    `bson:"taskid" json:"taskid"`
	Info          *Info     `bson:"info" json:"-"`
	Inputs        IOmap     `bson:"inputs" json:"inputs"`
	Outputs       IOmap     `bson:"outputs" json:"outputs"`
	Predata       IOmap     `bson:"predata" json:"predata"`
	Cmd           *Command  `bson:"cmd" json:"cmd"`
	Partition     *PartInfo `bson:"partinfo" json:"-"`
	DependsOn     []string  `bson:"dependsOn" json:"dependsOn"`
	TotalWork     int       `bson:"totalwork" json:"totalwork"`
	MaxWorkSize   int       `bson:"maxworksize"   json:"maxworksize"`
	RemainWork    int       `bson:"remainwork" json:"remainwork"`
	WorkStatus    []string  `bson:"workstatus" json:"-"`
	State         string    `bson:"state" json:"state"`
	Skip          int       `bson:"skip" json:"-"`
	CreatedDate   time.Time `bson:"createdDate" json:"createddate"`
	StartedDate   time.Time `bson:"startedDate" json:"starteddate"`
	CompletedDate time.Time `bson:"completedDate" json:"completeddate"`
	ComputeTime   int       `bson:"computetime" json:"computetime"`
}

func NewTask

func NewTask(job *Job, rank int) *Task

func (*Task) InitPartIndex

func (task *Task) InitPartIndex() (err error)

get part size based on partition/index info if fail to get index info, task.TotalWork fall back to 1 and return nil

func (*Task) InitTask

func (task *Task) InitTask(job *Job, rank int) (err error)

fill some info (lacked in input json) for a task

func (*Task) ParseWorkunit

func (task *Task) ParseWorkunit() (wus []*Workunit, err error)

func (*Task) Skippable

func (task *Task) Skippable() bool

func (*Task) UpdateState

func (task *Task) UpdateState(newState string) string

type TaskPerf

type TaskPerf struct {
	Queued       int64   `bson:"queued" json:"queued"`
	Start        int64   `bson:"start" json:"start"`
	End          int64   `bson:"end" json:"end"`
	Resp         int64   `bson:"resp" json:"resp"` //End -Queued
	InFileSizes  []int64 `bson:"size_infile" json:"size_infile"`
	OutFileSizes []int64 `bson:"size_outfile" json:"size_outfile"`
}

func NewTaskPerf

func NewTaskPerf(id string) *TaskPerf

type WQueue

type WQueue struct {
	// contains filtered or unexported fields
}

func NewWQueue

func NewWQueue() *WQueue

func (*WQueue) Add

func (wq *WQueue) Add(workunit *Workunit) (err error)

func (*WQueue) Delete

func (wq *WQueue) Delete(id string)

func (*WQueue) Get

func (wq *WQueue) Get(id string) (work *Workunit, ok bool)

func (*WQueue) Has

func (wq *WQueue) Has(id string) (has bool)

func (WQueue) Len

func (wq WQueue) Len() int

func (*WQueue) StatusChange

func (wq *WQueue) StatusChange(id string, new_status string) (err error)

type WorkList

type WorkList []*Workunit

queuing/prioritizing related functions

func (WorkList) Len

func (wl WorkList) Len() int

func (WorkList) Swap

func (wl WorkList) Swap(i, j int)

type WorkMgr

type WorkMgr interface {
	GetWorkById(string) (*Workunit, error)
	ShowWorkunits(string) []*Workunit
	CheckoutWorkunits(string, string, int) ([]*Workunit, error)
	NotifyWorkStatus(Notice)
	EnqueueWorkunit(*Workunit) error
	FetchDataToken(string, string) (string, error)
}

type WorkPerf

type WorkPerf struct {
	Queued      int64   `bson:"queued" json:"queued"`                   // WQ (queued at server or client, depending on who creates it)
	Done        int64   `bson:"done" json:"done"`                       // WD (done at server)
	Resp        int64   `bson:"resp" json:"resp"`                       // Done - Queued (server metric)
	Checkout    int64   `bson:"checkout" json:"checkout"`               // checkout at client
	Deliver     int64   `bson:"deliver" json:"deliver"`                 // done at client
	ClientResp  int64   `bson:"clientresp" json:"clientresp"`           // Deliver - Checkout (client metric)
	PreDataIn   float64 `bson:"time_predata_in" json:"time_predata_in"` // time in seconds for downloading prerequisite data at client
	DataIn      float64 `bson:"time_data_in" json:"time_data_in"`       // time in seconds for input data move-in at client
	DataOut     float64 `bson:"time_data_out" json:"time_data_out"`     // time in seconds for output data move-out at client
	Runtime     int64   `bson:"runtime" json:"runtime"`                 // time in seconds for computation at client
	MaxMemUsage uint64  `bson:"max_mem_usage" json:"max_mem_usage"`     // maxium memery consumption
	ClientId    string  `bson:"client_id" json:"client_id"`
	PreDataSize int64   `bson:"size_predata" json:"size_predata"` //predata moved over network
	InFileSize  int64   `bson:"size_infile" json:"size_infile"`   //input file moved over network
	OutFileSize int64   `bson:"size_outfile" json:"size_outfile"` //outpuf file moved over network
}

func NewWorkPerf

func NewWorkPerf(id string) *WorkPerf

type Workflow

type Workflow struct {
	WfInfo     awf_info          `bson:"workflow_info" json:"workflow_info"`
	JobInfo    awf_jobinfo       `bson:"job_info" json:"job_info"`
	RawInputs  map[string]string `bson:"raw_inputs" json:"raw_inputs"`
	Variables  map[string]string `bson:"variables" json:"variables"`
	DataServer string            `bson:"data_server" json:"data_server"`
	Tasks      []*awf_task       `bson:"tasks" json:"tasks"`
}

type WorkflowMgr

type WorkflowMgr struct {
	// contains filtered or unexported fields
}
var (
	AwfMgr *WorkflowMgr
)

func NewWorkflowMgr

func NewWorkflowMgr() *WorkflowMgr

func (*WorkflowMgr) AddWorkflow

func (wfm *WorkflowMgr) AddWorkflow(name string, awf *Workflow)

func (*WorkflowMgr) GetAllWorkflows

func (wfm *WorkflowMgr) GetAllWorkflows() (workflows []*Workflow)

func (*WorkflowMgr) GetWorkflow

func (wfm *WorkflowMgr) GetWorkflow(name string) (awf *Workflow, err error)

func (*WorkflowMgr) LoadWorkflows

func (wfm *WorkflowMgr) LoadWorkflows() (err error)

type Workunit

type Workunit struct {
	Id           string    `bson:"wuid" json:"wuid"`
	Info         *Info     `bson:"info" json:"info"`
	Inputs       IOmap     `bson:"inputs" json:"inputs"`
	Outputs      IOmap     `bson:"outputs" json:"outputs"`
	Predata      IOmap     `bson:"predata" json:"predata"`
	Cmd          *Command  `bson:"cmd" json:"cmd"`
	Rank         int       `bson:"rank" json:"rank"`
	TotalWork    int       `bson:"totalwork" json:"totalwork"`
	Partition    *PartInfo `bson:"part" json:"part"`
	State        string    `bson:"state" json:"state"`
	Failed       int       `bson:"failed" json:"failed"`
	CheckoutTime time.Time `bson:"checkout_time" json:"checkout_time"`
	Client       string    `bson:"client" json:"client"`
	ComputeTime  int       `bson:"computetime" json:"computetime"`
	Notes        string    `bson:"-" json:"-"`
}

func NewWorkunit

func NewWorkunit(task *Task, rank int) *Workunit

func (*Workunit) CDworkpath

func (work *Workunit) CDworkpath() (err error)

func (*Workunit) IndexType

func (work *Workunit) IndexType() (indextype string)

func (*Workunit) Mkdir

func (work *Workunit) Mkdir() (err error)

func (*Workunit) Part

func (work *Workunit) Part() (part string)

calculate the range of data part algorithm: try to evenly distribute indexed parts to workunits e.g. totalWork=4, totalParts=10, then each workunits have parts 3,3,2,2

func (*Workunit) Path

func (work *Workunit) Path() string

func (*Workunit) RemoveDir

func (work *Workunit) RemoveDir() (err error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL