simpletracker

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 5, 2019 License: Apache-2.0 Imports: 12 Imported by: 0

README

OS Process Tacker

Introduction

OS Process Tracker implements the JobTracker interface used by the Go DRMAA2 implementation in order to use standard OS processes as a backend for managing jobs as processes from the DRMAA2 interface.

Functionality

Basic Usage

A JobTemplate requires at least:

  • RemoteCommand -> which is path to the executable
Job Control Mapping
DRMAA2 Job Control OS Process
Suspend SIGTSTP
Resume SIGCONT
Terminate SIGKILL
Hold Unsupported
Release Unsupported
State Mapping
DRMAA2 State Process State
Queued Unsupported
Running PID is found
Suspended
Done
Failed
DeleteJob
Job Template Mapping

A JobTemplate is mapped into the process creation process in the following way:

DRMAA2 JobTemplate OS Process
RemoteCommand Executable to start
JobName
Args Arguments of the executable
WorkingDir Working directory
JobEnvironment Environment variables set

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetNextJobID

func GetNextJobID() string

func KillPid

func KillPid(pid int) error

KillPid terminates a process and all processes belonging to the process group.

func NewJobID

func NewJobID() *lastJobID

func ResumePid

func ResumePid(pid int) error

ResumePid contiues to run a previously suspended process group.

func SetJobID

func SetJobID(jobid int64)

func StartProcess

func StartProcess(jobid string, task int, t drmaa2interface.JobTemplate, finishedJobChannel chan JobEvent) (int, error)

StartProcess creates a new process based on the JobTemplate. It returns the PID or 0 and an error if the process could be created. The given channel is used for communicating back when the job state changed.

func SuspendPid

func SuspendPid(pid int) error

SuspendPid stops a process group from its execution. Note that it sends SIGTSTP which can be caught by the application and hence could be ignored.

func TrackProcess

func TrackProcess(cmd *exec.Cmd, jobid string, startTime time.Time,
	finishedJobChannel chan JobEvent, waitForFiles int, waitCh chan bool)

TrackProcess supervises a running process and sends a notification when the process is finished.

Types

type InternalJob

type InternalJob struct {
	TaskID int
	State  drmaa2interface.JobState
	PID    int
}

InternalJob represents a process.

type JobEvent

type JobEvent struct {
	JobID    string
	JobState drmaa2interface.JobState
	JobInfo  drmaa2interface.JobInfo
	// contains filtered or unexported fields
}

JobEvent is send whenever a job status change is happening to inform all registered listeners.

type JobStore

type JobStore struct {
	// contains filtered or unexported fields
}

JobStore is an internal storage for jobs and job templates processed by the job tracker. Jobs are stored until Reap().

func NewJobStore

func NewJobStore() *JobStore

NewJobStore returns a new in memory job store for jobs.

func (*JobStore) GetPID

func (js *JobStore) GetPID(jobid string) (int, error)

GetPID returns the PID of a job or an array job task. It returns -1 and an error if the job is not known.

func (*JobStore) HasJob added in v0.2.0

func (js *JobStore) HasJob(jobid string) bool

HasJob returns true if the job is saved in the job store.

func (*JobStore) RemoveJob added in v0.2.0

func (js *JobStore) RemoveJob(jobid string)

RemoveJob deletes all occurrences of a job within the job storage. The jobid can be the identifier of a job or a job array. In case of a job array it removes all tasks which belong to the array job.

func (*JobStore) SaveArrayJob

func (js *JobStore) SaveArrayJob(arrayjobid string, pids []int,
	t drmaa2interface.JobTemplate, begin, end, step int)

SaveArrayJob stores all process IDs of the tasks of an array job.

func (*JobStore) SaveArrayJobPID added in v0.2.0

func (js *JobStore) SaveArrayJobPID(arrayjobid string, taskid, pid int) error

SaveArrayJobPID stores the current PID of main process of the job array task.

func (*JobStore) SaveJob

func (js *JobStore) SaveJob(jobid string, t drmaa2interface.JobTemplate, pid int)

SaveJob stores a job, the job submission template, and the process PID of the job in an internal job store.

type JobTracker

type JobTracker struct {
	sync.Mutex
	// contains filtered or unexported fields
}

JobTracker implements the JobTracker interface and treats jobs as OS processes.

func New

func New(jobsession string) *JobTracker

New creates and initializes a JobTracker.

func (*JobTracker) AddArrayJob

func (jt *JobTracker) AddArrayJob(t drmaa2interface.JobTemplate, begin, end, step, maxParallel int) (string, error)

AddArrayJob starts end-begin/step processes based on the given JobTemplate. Note that maxParallel is not yet implemented.

func (*JobTracker) AddJob

AddJob creates a process, fills in the internal job state and saves the job internally.

func (*JobTracker) DeleteJob

func (jt *JobTracker) DeleteJob(jobid string) error

DeleteJob removes a job from the internal job storage but only when the job is in any finished state.

func (*JobTracker) Destroy

func (jt *JobTracker) Destroy() error

Destroy signals the JobTracker to shutdown.

func (*JobTracker) JobControl

func (jt *JobTracker) JobControl(jobid, state string) error

JobControl suspends, resumes, or terminates a job.

func (*JobTracker) JobInfo

func (jt *JobTracker) JobInfo(jobid string) (drmaa2interface.JobInfo, error)

JobInfo returns more detailed information about a job.

func (*JobTracker) JobState

func (jt *JobTracker) JobState(jobid string) drmaa2interface.JobState

JobState returns the current state of the job (running, suspended, done, failed).

func (*JobTracker) ListArrayJobs

func (jt *JobTracker) ListArrayJobs(id string) ([]string, error)

ListArrayJobs returns all job IDs the job array ID is associated with.

func (*JobTracker) ListJobCategories

func (jt *JobTracker) ListJobCategories() ([]string, error)

ListJobCategories returns an empty list as JobCategories are currently not defined for OS processes.

func (*JobTracker) ListJobs

func (jt *JobTracker) ListJobs() ([]string, error)

ListJobs returns a list of all job IDs stored in the job store.

func (*JobTracker) Wait

func (jt *JobTracker) Wait(jobid string, d time.Duration, state ...drmaa2interface.JobState) error

Wait blocks until the job with the given job id is in on of the given states. If the job is after the given duration is still not in any of the states the method returns an error. If the duration is 0 then it waits infitely.

type PubSub

type PubSub struct {
	sync.Mutex
	// contains filtered or unexported fields
}

PubSub distributes job status change events to clients which Register() at PubSub.

func NewPubSub

func NewPubSub() (*PubSub, chan JobEvent)

NewPubSub returns an initialized PubSub structure and the JobEvent channel which is used by the caller to publish job events (i.e. job state transitions).

func (*PubSub) NotifyAndWait added in v0.2.0

func (ps *PubSub) NotifyAndWait(evt JobEvent)

NotifyAndWait sends a job event and waits until it was distributed to all waiting functions.

func (*PubSub) Register

func (ps *PubSub) Register(jobid string, states ...drmaa2interface.JobState) (chan drmaa2interface.JobState, error)

Register returns a channel which emits a job state once the given job transitions in one of the given states. If job is already in the expected state it returns nil as channel and nil as error.

TODO add method for removing specific wait functions.

func (*PubSub) StartBookKeeper

func (ps *PubSub) StartBookKeeper()

StartBookKeeper processes all job state changes from the process trackers and notifies registered wait functions.

func (*PubSub) Unregister

func (ps *PubSub) Unregister(jobid string)

Unregister removes all functions waiting for a specific job and all occurences of the job itself.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL