Documentation ¶
Index ¶
- Constants
- func AddHostInfo(machine drmaa2interface.Machine, hostInfo *host.InfoStat) drmaa2interface.Machine
- func AddMemory(machine drmaa2interface.Machine, mem *mem.VirtualMemoryStat) drmaa2interface.Machine
- func CollectSocketCoreThreads(cpuInfo []cpu.InfoStat) (int64, int64, int64, error)
- func GetAllProcesses() ([]string, error)
- func GetJobInfo(id int32) (drmaa2interface.JobInfo, error)
- func GetLocalMachineInfo() (drmaa2interface.Machine, error)
- func GetNextJobID() string
- func IsPidRunning(pid int) (bool, error)
- func KillPid(pid int) error
- func NewAllocator() *allocator
- func NewJobID() *lastJobID
- func ProcessToJobInfo(proc *process.Process) drmaa2interface.JobInfo
- func ResumePid(pid int) error
- func SetJobID(jobid int64)
- func StartProcess(jobid string, task int, t drmaa2interface.JobTemplate, ...) (int, error)
- func SuspendPid(pid int) error
- func TrackProcess(cmd *exec.Cmd, proc *os.Process, jobID string, startTime time.Time, ...)
- type InternalJob
- type JobEvent
- type JobStore
- func (js *JobStore) GetArrayJobTaskIDs(arrayjobID string) []string
- func (js *JobStore) GetJobIDs() []string
- func (js *JobStore) GetJobInfo(jobid string) (drmaa2interface.JobInfo, error)
- func (js *JobStore) GetJobTemplate(jobID string) (drmaa2interface.JobTemplate, error)
- func (js *JobStore) GetPID(jobid string) (int, error)
- func (js *JobStore) HasJob(jobid string) bool
- func (js *JobStore) IsArrayJob(jobid string) bool
- func (js *JobStore) NewJobID() string
- func (js *JobStore) RemoveJob(jobid string)
- func (js *JobStore) SaveArrayJob(arrayjobid string, pids []int, t drmaa2interface.JobTemplate, ...)
- func (js *JobStore) SaveArrayJobPID(arrayjobid string, taskid, pid int) error
- func (js *JobStore) SaveJob(jobid string, t drmaa2interface.JobTemplate, pid int)
- func (js *JobStore) SaveJobInfo(jobid string, jobInfo drmaa2interface.JobInfo) error
- type JobStorer
- type JobTracker
- func (jt *JobTracker) AddArrayJob(t drmaa2interface.JobTemplate, begin, end, step, maxParallel int) (string, error)
- func (jt *JobTracker) AddJob(t drmaa2interface.JobTemplate) (string, error)
- func (jt *JobTracker) Close() error
- func (m *JobTracker) CloseMonitoringSession(name string) error
- func (jt *JobTracker) DeleteJob(jobid string) error
- func (jt *JobTracker) Destroy() error
- func (m *JobTracker) GetAllJobIDs(filter *drmaa2interface.JobInfo) ([]string, error)
- func (m *JobTracker) GetAllMachines(names []string) ([]drmaa2interface.Machine, error)
- func (m *JobTracker) GetAllQueueNames(names []string) ([]string, error)
- func (jt *JobTracker) JobControl(jobid, state string) error
- func (jt *JobTracker) JobInfo(jobid string) (drmaa2interface.JobInfo, error)
- func (m *JobTracker) JobInfoFromMonitor(id string) (drmaa2interface.JobInfo, error)
- func (jt *JobTracker) JobState(jobid string) (drmaa2interface.JobState, string, error)
- func (jt *JobTracker) JobTemplate(jobID string) (drmaa2interface.JobTemplate, error)
- func (jt *JobTracker) ListArrayJobs(id string) ([]string, error)
- func (jt *JobTracker) ListJobCategories() ([]string, error)
- func (jt *JobTracker) ListJobs() ([]string, error)
- func (m *JobTracker) OpenMonitoringSession(name string) error
- func (jt *JobTracker) Wait(jobid string, d time.Duration, state ...drmaa2interface.JobState) error
- type PersistentJobStorage
- func (js *PersistentJobStorage) Close() error
- func (js *PersistentJobStorage) GetArrayJobTaskIDs(arrayjobID string) []string
- func (js *PersistentJobStorage) GetJobIDs() []string
- func (js *PersistentJobStorage) GetJobInfo(jobid string) (drmaa2interface.JobInfo, error)
- func (js *PersistentJobStorage) GetJobTemplate(jobid string) (drmaa2interface.JobTemplate, error)
- func (js *PersistentJobStorage) GetPID(jobid string) (int, error)
- func (js *PersistentJobStorage) HasJob(jobid string) bool
- func (js *PersistentJobStorage) IsArrayJob(jobid string) bool
- func (js *PersistentJobStorage) NewJobID() string
- func (js *PersistentJobStorage) RemoveJob(jobid string)
- func (js *PersistentJobStorage) SaveArrayJob(arrayjobid string, pids []int, t drmaa2interface.JobTemplate, ...)
- func (js *PersistentJobStorage) SaveArrayJobPID(arrayjobid string, taskid, pid int) error
- func (js *PersistentJobStorage) SaveJob(jobid string, t drmaa2interface.JobTemplate, pid int)
- func (js *PersistentJobStorage) SaveJobInfo(jobid string, jobinfo drmaa2interface.JobInfo) error
- type PubSub
- func (ps *PubSub) GetJobInfo(jobID string) (drmaa2interface.JobInfo, error)
- func (ps *PubSub) NotifyAndWait(evt JobEvent)
- func (ps *PubSub) Register(jobid string, states ...drmaa2interface.JobState) (chan drmaa2interface.JobState, error)
- func (ps *PubSub) StartBookKeeper()
- func (ps *PubSub) Unregister(jobid string)
- type SimpleTrackerInitParams
- type StoreCloser
Constants ¶
const HighestJobIDStorageKey string = "HighestJobIDStorageKey"
const IsArrayJobStorageKey string = "IsArrayJobStorageKey"
const JobIDsStorageKey string = "JobIDsStorageKey"
const JobInfoStorageKey string = "JobInfoStorageKey"
const JobStorageKey string = "JobStorageKey"
const JobTemplatesStorageKey string = "JobTemplatesStorageKey"
Variables ¶
This section is empty.
Functions ¶
func AddHostInfo ¶ added in v0.3.16
func AddHostInfo(machine drmaa2interface.Machine, hostInfo *host.InfoStat) drmaa2interface.Machine
func AddMemory ¶ added in v0.3.16
func AddMemory(machine drmaa2interface.Machine, mem *mem.VirtualMemoryStat) drmaa2interface.Machine
func CollectSocketCoreThreads ¶ added in v0.3.16
CollectSocketCoreThreads returns the number of sockets, cores per socket, and threads per core.
func GetAllProcesses ¶ added in v0.3.16
func GetJobInfo ¶ added in v0.3.16
func GetJobInfo(id int32) (drmaa2interface.JobInfo, error)
func GetLocalMachineInfo ¶ added in v0.3.16
func GetLocalMachineInfo() (drmaa2interface.Machine, error)
GetLocalMachineInfo collects information about the local machine and returns a current DRMAA2 machine info struct.
func GetNextJobID ¶
func GetNextJobID() string
func IsPidRunning ¶ added in v0.3.14
IsPidRunning returns true if the process is still alive.
func NewAllocator ¶ added in v0.3.0
func NewAllocator() *allocator
func ProcessToJobInfo ¶ added in v0.3.16
func ProcessToJobInfo(proc *process.Process) drmaa2interface.JobInfo
func StartProcess ¶
func StartProcess(jobid string, task int, t drmaa2interface.JobTemplate, finishedJobChannel chan JobEvent) (int, error)
StartProcess creates a new process based on the JobTemplate. It returns the PID, or 0 and an error if the process could not be created. The given channel is used for communicating back when the job state changed.
func SuspendPid ¶
SuspendPid stops a process group from its execution. Note that it sends SIGTSTP which can be caught by the application and hence could be ignored.
func TrackProcess ¶
func TrackProcess(cmd *exec.Cmd, proc *os.Process, jobID string, startTime time.Time, finishedJobChannel chan JobEvent, waitForFiles int, waitCh chan bool)
TrackProcess supervises a running process and sends a notification when the process is finished. If the process was started from this process, cmd is given; otherwise, when re-attaching to an already existing process, proc is given.
Types ¶
type InternalJob ¶
type InternalJob struct { TaskID int State drmaa2interface.JobState PID int }
InternalJob represents a process.
type JobEvent ¶
type JobEvent struct { JobID string JobState drmaa2interface.JobState JobInfo drmaa2interface.JobInfo // contains filtered or unexported fields }
JobEvent is sent whenever a job status change happens, to inform all registered listeners.
type JobStore ¶
JobStore is an internal storage for jobs and job templates processed by the job tracker. Jobs are stored until Reap(). Locking must be done externally.
func NewJobStore ¶
func NewJobStore() *JobStore
NewJobStore returns a new in memory job store for jobs.
func (*JobStore) GetArrayJobTaskIDs ¶ added in v0.2.6
GetArrayJobTaskIDs returns the IDs of all tasks of a job array.
func (*JobStore) GetJobInfo ¶ added in v0.3.16
func (js *JobStore) GetJobInfo(jobid string) (drmaa2interface.JobInfo, error)
func (*JobStore) GetJobTemplate ¶ added in v0.3.14
func (js *JobStore) GetJobTemplate(jobID string) (drmaa2interface.JobTemplate, error)
func (*JobStore) GetPID ¶
GetPID returns the PID of a job or an array job task. It returns -1 and an error if the job is not known.
func (*JobStore) IsArrayJob ¶ added in v0.3.14
func (*JobStore) RemoveJob ¶ added in v0.2.0
RemoveJob deletes all occurrences of a job within the job storage. The jobid can be the identifier of a job or a job array. In case of a job array it removes all tasks which belong to the array job.
func (*JobStore) SaveArrayJob ¶
func (js *JobStore) SaveArrayJob(arrayjobid string, pids []int, t drmaa2interface.JobTemplate, begin, end, step int)
SaveArrayJob stores all process IDs of the tasks of an array job.
func (*JobStore) SaveArrayJobPID ¶ added in v0.2.0
SaveArrayJobPID stores the current PID of the main process of the job array task.
func (*JobStore) SaveJob ¶
func (js *JobStore) SaveJob(jobid string, t drmaa2interface.JobTemplate, pid int)
SaveJob stores a job, the job submission template, and the process PID of the job in an internal job store.
func (*JobStore) SaveJobInfo ¶ added in v0.3.16
func (js *JobStore) SaveJobInfo(jobid string, jobInfo drmaa2interface.JobInfo) error
type JobStorer ¶ added in v0.3.13
type JobStorer interface { SaveJob(jobid string, t drmaa2interface.JobTemplate, pid int) HasJob(jobid string) bool RemoveJob(jobid string) IsArrayJob(jobid string) bool SaveArrayJob(arrayjobid string, pids []int, t drmaa2interface.JobTemplate, begin, end, step int) SaveArrayJobPID(arrayjobid string, taskid, pid int) error GetPID(jobid string) (int, error) GetJobIDs() []string GetArrayJobTaskIDs(arrayjobID string) []string // NewJobID returns a new unique job ID NewJobID() string // Require job template GetJobTemplate(jobid string) (drmaa2interface.JobTemplate, error) SaveJobInfo(jobid string, jobInfo drmaa2interface.JobInfo) error GetJobInfo(jobid string) (drmaa2interface.JobInfo, error) }
JobStorer has all methods required for storing job related information.
type JobTracker ¶
JobTracker implements the JobTracker interface and treats jobs as OS processes.
func EnableCheckpointRestart ¶ added in v0.3.16
func EnableCheckpointRestart(jobtracker *JobTracker) *JobTracker
EnableCheckpointRestart turns a job tracker which handles suspend / resume with signals into a job tracker which does suspend and resume with CRIU.
func NewWithJobStore ¶ added in v0.3.14
func NewWithJobStore(jobsession string, jobstore JobStorer, persistent bool) (*JobTracker, error)
func (*JobTracker) AddArrayJob ¶
func (jt *JobTracker) AddArrayJob(t drmaa2interface.JobTemplate, begin, end, step, maxParallel int) (string, error)
AddArrayJob starts end-begin/step processes based on the given JobTemplate. Note that maxParallel is not yet implemented.
func (*JobTracker) AddJob ¶
func (jt *JobTracker) AddJob(t drmaa2interface.JobTemplate) (string, error)
AddJob creates a process, fills in the internal job state and saves the job internally.
func (*JobTracker) Close ¶ added in v0.3.14
func (jt *JobTracker) Close() error
Close implements the jobtracker.Closer interface to disengage from a DB or the DRM when the job session gets closed.
func (*JobTracker) CloseMonitoringSession ¶ added in v0.3.16
func (m *JobTracker) CloseMonitoringSession(name string) error
func (*JobTracker) DeleteJob ¶
func (jt *JobTracker) DeleteJob(jobid string) error
DeleteJob removes a job from the internal job storage but only when the job is in any finished state.
func (*JobTracker) Destroy ¶
func (jt *JobTracker) Destroy() error
Destroy signals the JobTracker to shutdown.
func (*JobTracker) GetAllJobIDs ¶ added in v0.3.16
func (m *JobTracker) GetAllJobIDs(filter *drmaa2interface.JobInfo) ([]string, error)
func (*JobTracker) GetAllMachines ¶ added in v0.3.16
func (m *JobTracker) GetAllMachines(names []string) ([]drmaa2interface.Machine, error)
func (*JobTracker) GetAllQueueNames ¶ added in v0.3.16
func (m *JobTracker) GetAllQueueNames(names []string) ([]string, error)
func (*JobTracker) JobControl ¶
func (jt *JobTracker) JobControl(jobid, state string) error
JobControl suspends, resumes, or terminates a job.
func (*JobTracker) JobInfo ¶
func (jt *JobTracker) JobInfo(jobid string) (drmaa2interface.JobInfo, error)
JobInfo returns more detailed information about a job.
func (*JobTracker) JobInfoFromMonitor ¶ added in v0.3.16
func (m *JobTracker) JobInfoFromMonitor(id string) (drmaa2interface.JobInfo, error)
func (*JobTracker) JobState ¶
func (jt *JobTracker) JobState(jobid string) (drmaa2interface.JobState, string, error)
JobState returns the current state of the job (running, suspended, done, failed).
func (*JobTracker) JobTemplate ¶ added in v0.3.14
func (jt *JobTracker) JobTemplate(jobID string) (drmaa2interface.JobTemplate, error)
JobTemplate returns the stored job template of the job. This job tracker implements the JobTemplater interface in addition to the JobTracker interface.
func (*JobTracker) ListArrayJobs ¶
func (jt *JobTracker) ListArrayJobs(id string) ([]string, error)
ListArrayJobs returns all job IDs the job array ID is associated with.
func (*JobTracker) ListJobCategories ¶
func (jt *JobTracker) ListJobCategories() ([]string, error)
ListJobCategories returns an empty list as JobCategories are currently not defined for OS processes.
func (*JobTracker) ListJobs ¶
func (jt *JobTracker) ListJobs() ([]string, error)
ListJobs returns a list of all job IDs stored in the job store.
func (*JobTracker) OpenMonitoringSession ¶ added in v0.3.16
func (m *JobTracker) OpenMonitoringSession(name string) error
func (*JobTracker) Wait ¶
func (jt *JobTracker) Wait(jobid string, d time.Duration, state ...drmaa2interface.JobState) error
Wait blocks until the job with the given job id is in one of the given states. If the job is still not in any of the states after the given duration, the method returns an error. If the duration is 0 then it waits indefinitely.
type PersistentJobStorage ¶ added in v0.3.13
type PersistentJobStorage struct {
// contains filtered or unexported fields
}
PersistentJobStorage is an internal storage for jobs and job templates processed by the job tracker. Jobs are stored until Reap(). Locking must be done externally.
func NewPersistentJobStore ¶ added in v0.3.13
func NewPersistentJobStore(path string) (*PersistentJobStorage, error)
NewPersistentJobStore returns a new job store which uses a file based DB to be persistent over process restarts. The PersistentJobStore implements the JobStorer interface.
func (*PersistentJobStorage) Close ¶ added in v0.3.14
func (js *PersistentJobStorage) Close() error
func (*PersistentJobStorage) GetArrayJobTaskIDs ¶ added in v0.3.13
func (js *PersistentJobStorage) GetArrayJobTaskIDs(arrayjobID string) []string
GetArrayJobTaskIDs returns the IDs of all tasks of a job array.
func (*PersistentJobStorage) GetJobIDs ¶ added in v0.3.13
func (js *PersistentJobStorage) GetJobIDs() []string
GetJobIDs returns the IDs of all jobs.
func (*PersistentJobStorage) GetJobInfo ¶ added in v0.3.16
func (js *PersistentJobStorage) GetJobInfo(jobid string) (drmaa2interface.JobInfo, error)
func (*PersistentJobStorage) GetJobTemplate ¶ added in v0.3.14
func (js *PersistentJobStorage) GetJobTemplate(jobid string) (drmaa2interface.JobTemplate, error)
func (*PersistentJobStorage) GetPID ¶ added in v0.3.13
func (js *PersistentJobStorage) GetPID(jobid string) (int, error)
GetPID returns the PID of a job or an array job task. It returns -1 and an error if the job is not known.
func (*PersistentJobStorage) HasJob ¶ added in v0.3.13
func (js *PersistentJobStorage) HasJob(jobid string) bool
HasJob returns true if the job is saved in the job store.
func (*PersistentJobStorage) IsArrayJob ¶ added in v0.3.14
func (js *PersistentJobStorage) IsArrayJob(jobid string) bool
func (*PersistentJobStorage) NewJobID ¶ added in v0.3.14
func (js *PersistentJobStorage) NewJobID() string
func (*PersistentJobStorage) RemoveJob ¶ added in v0.3.13
func (js *PersistentJobStorage) RemoveJob(jobid string)
RemoveJob deletes all occurrences of a job within the job storage. The jobid can be the identifier of a job or a job array. In case of a job array it removes all tasks which belong to the array job.
func (*PersistentJobStorage) SaveArrayJob ¶ added in v0.3.13
func (js *PersistentJobStorage) SaveArrayJob(arrayjobid string, pids []int, t drmaa2interface.JobTemplate, begin, end, step int)
SaveArrayJob stores all process IDs of the tasks of an array job.
func (*PersistentJobStorage) SaveArrayJobPID ¶ added in v0.3.13
func (js *PersistentJobStorage) SaveArrayJobPID(arrayjobid string, taskid, pid int) error
SaveArrayJobPID stores the current PID of the main process of the job array task.
func (*PersistentJobStorage) SaveJob ¶ added in v0.3.13
func (js *PersistentJobStorage) SaveJob(jobid string, t drmaa2interface.JobTemplate, pid int)
SaveJob stores a job, the job submission template, and the process PID of the job in an internal job store.
func (*PersistentJobStorage) SaveJobInfo ¶ added in v0.3.16
func (js *PersistentJobStorage) SaveJobInfo(jobid string, jobinfo drmaa2interface.JobInfo) error
type PubSub ¶
PubSub distributes job status change events to clients which Register() at PubSub.
func NewPubSub ¶
NewPubSub returns an initialized PubSub structure and the JobEvent channel which is used by the caller to publish job events (i.e. job state transitions).
func (*PubSub) GetJobInfo ¶ added in v0.3.18
func (ps *PubSub) GetJobInfo(jobID string) (drmaa2interface.JobInfo, error)
func (*PubSub) NotifyAndWait ¶ added in v0.2.0
NotifyAndWait sends a job event and waits until it was distributed to all waiting functions.
func (*PubSub) Register ¶
func (ps *PubSub) Register(jobid string, states ...drmaa2interface.JobState) (chan drmaa2interface.JobState, error)
Register returns a channel which emits a job state once the given job transitions into one of the given states. If the job is already in the expected state it returns nil as channel and nil as error.
TODO add method for removing specific wait functions.
func (*PubSub) StartBookKeeper ¶
func (ps *PubSub) StartBookKeeper()
StartBookKeeper processes all job state changes from the process trackers and notifies registered wait functions.
func (*PubSub) Unregister ¶
Unregister removes all functions waiting for a specific job and all occurrences of the job itself.
type SimpleTrackerInitParams ¶ added in v0.3.14
type StoreCloser ¶ added in v0.3.14
type StoreCloser interface {
Close() error
}
StoreCloser closes any DB handle when called so that a new JobStorer can be created.