Documentation ¶
Overview ¶
Package jobqueue provides server/client functions to interact with the queue structure provided by the queue package over a network.
It provides a job queue and running system which guarantees:
- Created jobs are never lost accidentally.
- The same job will not run more than once simultaneously:
  - Duplicate jobs are not created
  - Each job is handled by only a single client
- Jobs are handled in the desired order (user priority and fifo).
- Jobs still get run despite crashing clients.
- Completed jobs are kept forever for historical purposes.
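The "user priority and fifo" ordering guarantee can be pictured with a small, self-contained sketch. It is not the package's implementation: `queuedJob` and `runOrder` are hypothetical names, and the only behaviour taken from this documentation is that higher priority numbers run first and that equal priorities keep insertion order.

```go
package main

import (
	"fmt"
	"sort"
)

// queuedJob is a hypothetical stand-in for a queued job; it is not the
// package's Job type.
type queuedJob struct {
	Cmd      string
	Priority uint8
}

// runOrder returns a copy of jobs sorted by descending priority. The sort is
// stable, so jobs of equal priority keep their insertion (FIFO) order.
func runOrder(jobs []queuedJob) []queuedJob {
	out := make([]queuedJob, len(jobs))
	copy(out, jobs)
	sort.SliceStable(out, func(i, j int) bool {
		return out[i].Priority > out[j].Priority
	})
	return out
}

func main() {
	jobs := []queuedJob{{"a", 0}, {"b", 5}, {"c", 0}, {"d", 5}}
	for _, j := range runOrder(jobs) {
		fmt.Println(j.Cmd, j.Priority)
	}
}
```

A stable sort is used here so that the FIFO part of the guarantee falls out of the input order without a separate timestamp field.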
This file contains all the functions for clients to interact with the server. See server.go for the functions needed to implement a server executable.
Index ¶
- Constants
- Variables
- func CurrentIP() (ip string)
- type Client
- func (c *Client) Add(jobs []*Job) (added int, existed int, err error)
- func (c *Client) Archive(job *Job) (err error)
- func (c *Client) Bury(job *Job, failreason string) (err error)
- func (c *Client) Delete(ccs [][2]string) (deleted int, err error)
- func (c *Client) Disconnect()
- func (c *Client) DrainServer() (running int, etc time.Duration, err error)
- func (c *Client) Ended(job *Job, exitcode int, peakram int, cputime time.Duration, stdout []byte, ...) (err error)
- func (c *Client) Execute(job *Job, shell string) error
- func (c *Client) GetByCmd(cmd string, cwd string, getstd bool, getenv bool) (j *Job, err error)
- func (c *Client) GetByCmds(ccs [][2]string) (out []*Job, err error)
- func (c *Client) GetByRepGroup(repgroup string, limit int, state string, getStd bool, getEnv bool) (jobs []*Job, err error)
- func (c *Client) GetIncomplete(limit int, state string, getStd bool, getEnv bool) (jobs []*Job, err error)
- func (c *Client) Kick(ccs [][2]string) (kicked int, err error)
- func (c *Client) Ping(timeout time.Duration) bool
- func (c *Client) Release(job *Job, failreason string, delay time.Duration) (err error)
- func (c *Client) Reserve(timeout time.Duration) (j *Job, err error)
- func (c *Client) ReserveScheduled(timeout time.Duration, schedulerGroup string) (j *Job, err error)
- func (c *Client) ServerStats() (s *ServerStats, err error)
- func (c *Client) ShutdownServer() bool
- func (c *Client) Started(job *Job, pid int, host string) (err error)
- func (c *Client) Touch(job *Job) (err error)
- type Error
- type Job
- type Server
- type ServerConfig
- type ServerInfo
- type ServerStats
Constants ¶
const (
	FailReasonEnv = "failed to get environment variables"
	FailReasonCwd = "working directory does not exist"
	FailReasonStart = "command failed to start"
	FailReasonCPerm = "command permission problem"
	FailReasonCFound = "command not found"
	FailReasonCExit = "command invalid exit code"
	FailReasonExit = "command exited non-zero"
	FailReasonRAM = "command used too much RAM"
	FailReasonTime = "command used too much time"
	FailReasonAbnormal = "command failed to complete normally"
	FailReasonSignal = "runner received a signal to stop"
	FailReasonResource = "resource requirements cannot be met"
)
FailReason* are the reasons for cmd line failure stored on Jobs
const (
	ErrInternalError = "internal error"
	ErrUnknownCommand = "unknown command"
	ErrBadRequest = "bad request (missing arguments?)"
	ErrBadJob = "bad job (not in queue or correct sub-queue)"
	ErrMissingJob = "corresponding job not found"
	ErrUnknown = "unknown error"
	ErrClosedInt = "queues closed due to SIGINT"
	ErrClosedTerm = "queues closed due to SIGTERM"
	ErrClosedStop = "queues closed due to manual Stop()"
	ErrQueueClosed = "queue closed"
	ErrNoHost = "could not determine the non-loopback ip address of this host"
	ErrNoServer = "could not reach the server"
	ErrMustReserve = "you must Reserve() a Job before passing it to other methods"
	ErrDBError = "failed to use database"
	ServerModeNormal = "started"
	ServerModeDrain = "draining"
)
Err* constants are found in our returned Errors under err.Err, so you can type-assert the returned error and check whether it is a certain type of error. ServerMode* constants are used to report on the status of the server, found inside ServerInfo.
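As a rough illustration of checking for a particular Err* value, the sketch below redeclares a local `Error` struct and two of the constants so that it compiles on its own. The `Error()` method and the assumption that errors arrive as `Error` values (rather than pointers) are guesses made for the example, and `isJobqueueErr` is a hypothetical helper, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of two documented constants, redeclared for illustration.
const (
	ErrNoServer = "could not reach the server"
	ErrBadJob   = "bad job (not in queue or correct sub-queue)"
)

// Error mirrors the documented struct fields.
type Error struct {
	Queue string
	Op    string
	Item  string
	Err   string
}

// Error makes the local type satisfy the error interface. The message
// format is an assumption for this sketch.
func (e Error) Error() string {
	return e.Queue + "(" + e.Op + ", " + e.Item + "): " + e.Err
}

// isJobqueueErr reports whether err is an Error whose Err field equals want.
func isJobqueueErr(err error, want string) bool {
	jqerr, ok := err.(Error)
	return ok && jqerr.Err == want
}

func main() {
	var err error = Error{Queue: "cmds", Op: "Reserve", Err: ErrNoServer}
	fmt.Println(isJobqueueErr(err, ErrNoServer))
	fmt.Println(isJobqueueErr(errors.New("other"), ErrNoServer))
}
```

If the package actually returns pointers, the assertion would need to target `*Error` instead.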
Variables ¶
var (
	ClientTouchInterval = 15 * time.Second
	ClientReleaseDelay = 30 * time.Second
	RAMIncreaseMin float64 = 1000
	RAMIncreaseMultLow = 2.0
	RAMIncreaseMultHigh = 1.3
	RAMIncreaseMultBreakpoint float64 = 8192
)
These global variables are primarily exported for testing purposes; you probably shouldn't change them (*** and they should probably be refactored as fields of a config struct...)
var (
	RecMBRound = 100 // when we recommend amount of memory to reserve for a job, we round up to the nearest RecMBRound MBs
	RecSecRound = 1800 // when we recommend time to reserve for a job, we round up to the nearest RecSecRound seconds
)
Rec* variables are only exported for testing purposes (*** though they should probably be user configurable somewhere...)
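The rounding described for RecMBRound and RecSecRound amounts to rounding up to a multiple of a step. A minimal sketch follows; `roundUp` is a hypothetical helper, and leaving exact multiples unchanged is an assumption rather than documented behaviour.

```go
package main

import "fmt"

// roundUp rounds value up to the nearest multiple of step. Values that are
// already a multiple are returned unchanged (an assumption of this sketch).
// A non-positive step disables rounding.
func roundUp(value, step int) int {
	if step <= 0 {
		return value
	}
	rem := value % step
	if rem == 0 {
		return value
	}
	return value + step - rem
}

func main() {
	fmt.Println(roundUp(1234, 100))  // memory in MB, step like RecMBRound
	fmt.Println(roundUp(1801, 1800)) // time in seconds, step like RecSecRound
}
```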
var (
	ServerInterruptTime = 1 * time.Second
	ServerItemTTR = 60 * time.Second
	ServerReserveTicker = 1 * time.Second
	ServerLogClientErrors = true
)
These global variables are primarily exported for testing purposes; you probably shouldn't change them (*** and they should probably be refactored as fields of a config struct...)
Functions ¶
Types ¶
type Client ¶
Client represents the client side of the socket that the jobqueue server is Serve()ing, specific to a particular queue.
func Connect ¶
Connect creates a connection to the jobqueue server, specific to a single queue. Timeout determines how long to wait for a response from the server, not only while connecting, but for all subsequent interactions with it using the returned Client.
func (*Client) Add ¶
Add adds new jobs to the job queue, but only if those jobs aren't already in there. If any were already there, you will not get an error, but the returned 'existed' count will be > 0. Note that no cross-queue checking is done, so you need to be careful not to add the same job to different queues.
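The added/existed counts can be pictured with an in-memory sketch. It assumes a job is identified by its Cmd and Cwd pair (suggested by GetByCmd(), but not stated here), and `job`, `memQueue` and `add` are hypothetical names, not the server's code.

```go
package main

import "fmt"

// job is a hypothetical stand-in holding only the identifying fields.
type job struct {
	Cmd string
	Cwd string
}

// memQueue remembers which cmd/cwd pairs it has already seen.
type memQueue struct {
	seen map[[2]string]bool
}

func newMemQueue() *memQueue {
	return &memQueue{seen: make(map[[2]string]bool)}
}

// add stores jobs that are not yet present and counts those that were.
// Duplicates are not an error; they only increase the existed count.
func (q *memQueue) add(jobs []job) (added, existed int) {
	for _, j := range jobs {
		key := [2]string{j.Cmd, j.Cwd}
		if q.seen[key] {
			existed++
			continue
		}
		q.seen[key] = true
		added++
	}
	return added, existed
}

func main() {
	q := newMemQueue()
	added, existed := q.add([]job{
		{"echo 1", "/tmp"},
		{"echo 2", "/tmp"},
		{"echo 1", "/tmp"},
	})
	fmt.Println(added, existed)
}
```

Because each `memQueue` has its own map, the same job added to two queues would be accepted by both, which matches the warning about cross-queue checking.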
func (*Client) Archive ¶
Archive removes a job from the jobqueue and adds it to the database of complete jobs, for use after you have run the job successfully. You have to have been the one to Reserve() the supplied Job, and the Job must be marked as having successfully run, or you will get an error.
func (*Client) Bury ¶
Bury marks a job as unrunnable, so it will be ignored (until the user does something to perhaps make it runnable and kicks the job). Note that you must reserve a job before you can bury it.
func (*Client) Delete ¶
Delete removes previously Bury()'d jobs from the queue completely. For use when jobs were created incorrectly or by accident, or when they can never be fixed. It returns a count of jobs that it actually removed. Errors will only be related to not being able to contact the server. The ccs argument is the same as for GetByCmds().
func (*Client) Disconnect ¶
func (c *Client) Disconnect()
Disconnect closes the connection to the jobqueue server. It is CRITICAL that you call Disconnect() before calling Connect() again in the same process.
func (*Client) DrainServer ¶
DrainServer tells the server to stop spawning new runners, stop letting existing runners reserve new jobs, and exit once existing runners stop running. You get back a count of existing runners and an estimated time until completion for the last of those runners.
func (*Client) Ended ¶
func (c *Client) Ended(job *Job, exitcode int, peakram int, cputime time.Duration, stdout []byte, stderr []byte) (err error)
Ended updates a Job on the server with information that you've finished running the Job's Cmd. (The Job's Walltime is handled by the server internally, based on you calling this.) Peakram should be in MB.
func (*Client) Execute ¶
Execute runs the given Job's Cmd and blocks until it exits. Internally it calls Started() and Ended() and keeps track of peak RAM used. It regularly calls Touch() on the Job so that the server knows we are still alive and handling the Job successfully. It also intercepts SIGTERM, SIGINT, SIGQUIT, SIGUSR1 and SIGUSR2, sending SIGKILL to the running Cmd and returning Error.Err(FailReasonSignal) (you should check for this and exit your process). If no error is returned, the Cmd will have run OK, exited with status 0, and been Archive()d from the queue while being placed in the permanent store. Otherwise, it will have been Release()d or Bury()'d as appropriate. The supplied shell is the shell to execute the Cmd under, ideally bash (something that understands the command "set -o pipefail"). You have to have been the one to Reserve() the supplied Job, or this will immediately return an error. NB: the peak RAM tracking assumes we are running on a modern Linux system with /proc/*/smaps.
func (*Client) GetByCmd ¶
GetByCmd gets a Job given its Cmd and Cwd. With the boolean args set to true, this is the only way to get a Job that StdOut() and StdErr() will work on, and one of 2 ways that Env() will work (the other being Reserve()).
func (*Client) GetByCmds ¶
GetByCmds gets multiple Jobs at once given their Cmds and Cwds. You supply a slice of cmd/cwd string tuples like: [][2]string{[2]string{cmd1, cwd1}, [2]string{cmd2, cwd2}, ...}. It is also possible to supply "",key if you know the "key" of the desired job; you can get these keys when you use GetByRepGroup() or GetIncomplete() with a limit.
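Building the ccs argument by hand is repetitive, so a small helper can be convenient. The sketch below only constructs the slice of tuples described above; `cmdsInCwd` and `byKeys` are hypothetical helpers, and no call to the server is made.

```go
package main

import "fmt"

// cmdsInCwd pairs every cmd with the same working directory, producing the
// cmd/cwd tuples described for the ccs argument.
func cmdsInCwd(cwd string, cmds ...string) [][2]string {
	ccs := make([][2]string, 0, len(cmds))
	for _, cmd := range cmds {
		ccs = append(ccs, [2]string{cmd, cwd})
	}
	return ccs
}

// byKeys builds tuples of the form {"", key} for jobs whose key is known.
func byKeys(keys ...string) [][2]string {
	ccs := make([][2]string, 0, len(keys))
	for _, key := range keys {
		ccs = append(ccs, [2]string{"", key})
	}
	return ccs
}

func main() {
	fmt.Println(cmdsInCwd("/tmp", "echo 1", "echo 2"))
	fmt.Println(byKeys("examplekey"))
}
```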
func (*Client) GetByRepGroup ¶
func (c *Client) GetByRepGroup(repgroup string, limit int, state string, getStd bool, getEnv bool) (jobs []*Job, err error)
GetByRepGroup gets multiple Jobs at once given their RepGroup (an arbitrary user-supplied identifier for the purpose of grouping related jobs together for reporting purposes). 'limit', if greater than 0, limits the number of jobs returned that have the same State, FailReason and Exitcode, and on the last job of each State+FailReason group it populates 'Similar' with the number of other excluded jobs there were in that group. Providing 'state' only returns jobs in that State. 'getStd' and 'getEnv', if true, retrieve the stdout, stderr and environment variables for the Jobs, but only if 'limit' is <= 5.
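One way to read the 'limit' and 'Similar' behaviour is sketched below. This is an interpretation, not the server's code: `jobSummary` and `limitPerGroup` are hypothetical, and grouping on State, FailReason and Exitcode together is an assumption drawn from the first half of the description.

```go
package main

import "fmt"

// jobSummary is a hypothetical stand-in with only the fields used here.
type jobSummary struct {
	Cmd        string
	State      string
	FailReason string
	Exitcode   int
	Similar    int
}

// limitPerGroup keeps at most limit jobs per State/FailReason/Exitcode group
// and records, on the last kept job of each group, how many were excluded.
// A limit of 0 or less returns a copy of every job.
func limitPerGroup(jobs []jobSummary, limit int) []jobSummary {
	if limit <= 0 {
		return append([]jobSummary(nil), jobs...)
	}
	type groupKey struct {
		state, reason string
		exitcode      int
	}
	kept := make(map[groupKey]int)     // jobs kept so far per group
	lastIdx := make(map[groupKey]int)  // index in out of the last kept job
	excluded := make(map[groupKey]int) // jobs dropped per group
	var out []jobSummary
	for _, j := range jobs {
		k := groupKey{j.State, j.FailReason, j.Exitcode}
		if kept[k] < limit {
			out = append(out, j)
			kept[k]++
			lastIdx[k] = len(out) - 1
		} else {
			excluded[k]++
		}
	}
	for k, n := range excluded {
		out[lastIdx[k]].Similar = n
	}
	return out
}

func main() {
	jobs := []jobSummary{
		{Cmd: "b1", State: "buried", FailReason: "command exited non-zero", Exitcode: 1},
		{Cmd: "b2", State: "buried", FailReason: "command exited non-zero", Exitcode: 1},
		{Cmd: "b3", State: "buried", FailReason: "command exited non-zero", Exitcode: 1},
		{Cmd: "r1", State: "ready"},
	}
	for _, j := range limitPerGroup(jobs, 2) {
		fmt.Println(j.Cmd, j.State, j.Similar)
	}
}
```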
func (*Client) GetIncomplete ¶
func (c *Client) GetIncomplete(limit int, state string, getStd bool, getEnv bool) (jobs []*Job, err error)
GetIncomplete gets all Jobs that are currently in the jobqueue, ie. excluding those that are complete and have been Archive()d. The args are as in GetByRepGroup().
func (*Client) Kick ¶
Kick makes previously Bury()'d jobs runnable again (they can be Reserve()d in the future). It returns a count of jobs that it actually kicked. Errors will only be related to not being able to contact the server. The ccs argument is the same as for GetByCmds().
func (*Client) Release ¶
Release places a job back on the jobqueue, for use when you can't handle the job right now (eg. there was a suspected transient error) but maybe someone else can later. Note that you must reserve a job before you can release it. The delay arg is the duration to wait after your call to Release() before anyone else can Reserve() this job again - could help you stop immediately Reserve()ing the job again yourself. You can only Release() the same job 3 times if it has been run and failed; a subsequent call to Release() will instead result in a Bury(). (If the job's Cmd was not run, you can Release() an unlimited number of times.)
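The release limit can be modelled as a small counter. The sketch below is only a model of the rule described above: `trackedJob` and `release` are hypothetical, the starting count of 3 comes from the text, and moving a released job to the 'delayed' state is an assumption.

```go
package main

import "fmt"

// trackedJob is a hypothetical stand-in with the two fields the rule needs.
type trackedJob struct {
	State       string
	UntilBuried uint8 // remaining releases allowed after a failed run
}

// release models Release(): a job whose Cmd never ran can always be
// released; a job that ran and failed uses up one allowed release, and once
// none remain the job is buried instead.
func release(j *trackedJob, cmdRan bool) {
	if !cmdRan {
		j.State = "delayed"
		return
	}
	if j.UntilBuried == 0 {
		j.State = "buried"
		return
	}
	j.UntilBuried--
	j.State = "delayed"
}

func main() {
	j := &trackedJob{State: "reserved", UntilBuried: 3}
	for i := 1; i <= 4; i++ {
		release(j, true)
		fmt.Println(i, j.State, j.UntilBuried)
	}
}
```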
func (*Client) Reserve ¶
Reserve takes a job off the jobqueue. If you process the job successfully you should Archive() it. If you can't deal with it right now you should Release() it. If you think it can never be dealt with you should Bury() it. If you die unexpectedly, the job will automatically be released back to the queue after some time. If no job was available in the queue for as long as the timeout argument, nil is returned for both job and error. If your timeout is 0, you will wait indefinitely for a job.
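The reserve, then archive/release/bury decision can be sketched against a fake client. The `reserver` interface copies the method signatures listed in the index, but `Job` here is a cut-down local type, and `fakeClient`, `fakeRun`, `errTransient` and `errPermanent` are all hypothetical. The Started() and Ended() reporting that a real Archive() would depend on is left out for brevity.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Job is a cut-down local stand-in for the package's Job.
type Job struct{ Cmd string }

// reserver lists the client methods this sketch relies on.
type reserver interface {
	Reserve(timeout time.Duration) (*Job, error)
	Archive(job *Job) error
	Release(job *Job, failreason string, delay time.Duration) error
	Bury(job *Job, failreason string) error
}

// Hypothetical error classes used to pick between Release and Bury.
var (
	errTransient = errors.New("transient failure")
	errPermanent = errors.New("permanent failure")
)

// handleOne reserves a single job, runs it, and routes it by outcome.
// It returns "none" when Reserve timed out with a nil job and nil error.
func handleOne(c reserver, timeout time.Duration, run func(*Job) error) (string, error) {
	job, err := c.Reserve(timeout)
	if err != nil {
		return "", err
	}
	if job == nil {
		return "none", nil
	}
	switch runErr := run(job); {
	case runErr == nil:
		return "archived", c.Archive(job)
	case errors.Is(runErr, errTransient):
		return "released", c.Release(job, runErr.Error(), 30*time.Second)
	default:
		return "buried", c.Bury(job, runErr.Error())
	}
}

// fakeClient hands out pending jobs in order and logs what happened to them.
type fakeClient struct {
	pending []*Job
	log     []string
}

func (f *fakeClient) Reserve(time.Duration) (*Job, error) {
	if len(f.pending) == 0 {
		return nil, nil
	}
	j := f.pending[0]
	f.pending = f.pending[1:]
	return j, nil
}

func (f *fakeClient) Archive(j *Job) error {
	f.log = append(f.log, "archive "+j.Cmd)
	return nil
}

func (f *fakeClient) Release(j *Job, _ string, _ time.Duration) error {
	f.log = append(f.log, "release "+j.Cmd)
	return nil
}

func (f *fakeClient) Bury(j *Job, _ string) error {
	f.log = append(f.log, "bury "+j.Cmd)
	return nil
}

// fakeRun pretends to run a job, failing according to its Cmd.
func fakeRun(j *Job) error {
	switch j.Cmd {
	case "ok":
		return nil
	case "flaky":
		return errTransient
	}
	return errPermanent
}

func main() {
	c := &fakeClient{pending: []*Job{{Cmd: "ok"}, {Cmd: "flaky"}, {Cmd: "broken"}}}
	for {
		outcome, err := handleOne(c, time.Second, fakeRun)
		if err != nil || outcome == "none" {
			break
		}
		fmt.Println(outcome)
	}
}
```

In practice Execute() wraps this whole sequence, so a hand-written loop like this is mainly useful for understanding what it does on your behalf.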
func (*Client) ReserveScheduled ¶
ReserveScheduled is like Reserve(), except that it will only return jobs from the specified schedulerGroup. Based on the scheduler the server was configured with, it will group jobs based on their resource requirements and then submit runners to handle them to your system's job scheduler (such as LSF), possibly in different scheduler queues. These runners are told the group they are a part of, and that same group name is applied internally to the Jobs as the "schedulerGroup", so that the runners can reserve only Jobs that they're supposed to. Therefore, it does not make sense for you to call this yourself; it is only for use by runners spawned by the server.
func (*Client) ServerStats ¶
func (c *Client) ServerStats() (s *ServerStats, err error)
ServerStats returns stats of the jobqueue server itself.
func (*Client) ShutdownServer ¶
ShutdownServer tells the server to immediately cease all operations. Its last act will be to back up its internal database. Any existing runners will fail. Because the server gets shut down it can't respond with success/failure, so we indirectly report if the server was shut down successfully.
type Error ¶
type Error struct {
	Queue string // the queue's Name
	Op string // name of the method
	Item string // the item's key
	Err string // one of our Err* vars
}
Error records an error and the operation, item and queue that caused it.
type Job ¶
type Job struct {
	RepGroup string // a name associated with related Jobs to help group them together when reporting on their status etc.
	ReqGroup string
	Cmd string
	Cwd string // the working directory to cd to before running Cmd
	RAM int // the expected peak RAM in MB Cmd will use while running
	Time time.Duration // the expected time Cmd will take to run
	Cores int // how many processor cores the Cmd will use
	Override uint8
	Priority uint8
	Retries uint8 // the number of times to retry running a Cmd if it fails
	PeakRAM int // the actual peak RAM is recorded here (MB)
	Exited bool // true if the Cmd was run and exited
	Exitcode int // if the job ran and exited, its exit code is recorded here, but check Exited because when this is not set it could look like exit code 0
	FailReason string // if the job failed to complete successfully, this will hold one of the FailReason* strings
	Pid int // the pid of the running or ran process is recorded here
	Host string // the host the process is running or did run on is recorded here
	Walltime time.Duration // if the job ran or is running right now, the walltime for the run is recorded here
	CPUtime time.Duration // if the job ran, the CPU time is recorded here
	StdErrC []byte // to read, call job.StdErr() instead; if the job ran, its (truncated) STDERR will be here
	StdOutC []byte // to read, call job.StdOut() instead; if the job ran, its (truncated) STDOUT will be here
	EnvC []byte // to read, call job.Env() instead, to get the environment variables as a []string, where each string is like "key=value"
	State string // the job's state in the queue: 'delayed', 'ready', 'reserved', 'running', 'buried' or 'complete'
	Attempts uint32 // the number of times the job has ever entered 'running' state
	UntilBuried uint8 // the remaining number of Release()s allowed before being buried instead
	ReservedBy uuid.UUID // we note which client reserved this job, for validating if that client has permission to do other stuff to this Job; the server only ever sets this on Reserve(), so clients can't cheat by changing this on their end
	EnvKey string // on the server we don't store EnvC with the job, but look it up in db via this key
	Similar int // when retrieving jobs with a limit, this tells you how many jobs were excluded
	Queue string // the name of the queue the Job was added to
	// contains filtered or unexported fields
}
Job is a struct that represents a command that needs to be run and some associated metadata. ReqGroup is a string that you supply to group together all commands that you expect to have similar RAM and time requirements. RAM and Time are added by the system based on past experience of running jobs with the same ReqGroup. If you supply these yourself, your RAM and time will be used if there is insufficient past experience, or if you also supply Override, which can be 0 to not override, 1 to override past experience if your supplied values are higher, or 2 to always override. Priority is a number between 0 and 255 inclusive - higher numbered jobs will run before lower numbered ones (the default is 0). If you get a Job back from the server (via Reserve() or Get*()), you should treat the properties as read-only: changing them will have no effect.
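The three Override values can be summarised in a few lines. The sketch below is a reading of the paragraph above, not the package's code: `chooseRAM` and its `haveLearned` flag (standing in for "sufficient past experience") are hypothetical, and the same logic would presumably apply to Time.

```go
package main

import "fmt"

// chooseRAM picks between a user-supplied RAM value and one learned from
// past runs of the same ReqGroup, following the documented Override rules:
// 0 never overrides, 1 overrides only when the supplied value is higher,
// 2 always overrides. Without past experience the supplied value is used.
func chooseRAM(supplied, learned int, haveLearned bool, override uint8) int {
	if !haveLearned {
		return supplied
	}
	switch override {
	case 2:
		return supplied
	case 1:
		if supplied > learned {
			return supplied
		}
	}
	return learned
}

func main() {
	fmt.Println(chooseRAM(2000, 1500, true, 0))
	fmt.Println(chooseRAM(2000, 1500, true, 1))
	fmt.Println(chooseRAM(1000, 1500, true, 1))
	fmt.Println(chooseRAM(1000, 1500, true, 2))
}
```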
func NewJob ¶
func NewJob(cmd string, cwd string, group string, ram int, time time.Duration, cores int, override uint8, priority uint8, retries uint8, repgroup string) *Job
NewJob makes it a little easier to make a new Job, for use with Add().
func (*Job) Env ¶
Env decompresses and decodes job.EnvC (the output of compressEnv(), which are the environment variables the Job's Cmd should run/ran under). Note that EnvC is only populated if you got the Job from GetByCmd(_, _, _, true) or Reserve().
func (*Job) StdErr ¶
StdErr returns the decompressed job.StdErrC, which is the head and tail of job.Cmd's STDERR when it ran. If the Cmd hasn't run yet, or if it output nothing to STDERR, you will get an empty string. Note that StdErrC is only populated if you got the Job from GetByCmd(_, _, true, _), and if the Job's Cmd ran but failed.
func (*Job) StdOut ¶
StdOut returns the decompressed job.StdOutC, which is the head and tail of job.Cmd's STDOUT when it ran. If the Cmd hasn't run yet, or if it output nothing to STDOUT, you will get an empty string. Note that StdOutC is only populated if you got the Job from GetByCmd(_, _, true, _), and if the Job's Cmd ran but failed.
type Server ¶
type Server struct {
	ServerInfo *ServerInfo
	sync.Mutex
	// contains filtered or unexported fields
}
Server represents the server side of the socket that clients Connect() to.
func Serve ¶
func Serve(config ServerConfig) (s *Server, msg string, err error)
Serve is for use by a server executable and makes it start listening on localhost at the configured port for Connect()ions from clients, and then handles those clients. It returns a *Server that you will typically call Block() on to block until your executable receives a SIGINT or SIGTERM, or you call Stop(), at which point the queues will be safely closed (you'd probably just exit at that point). The possible errors from Serve() will be related to not being able to start up at the supplied address; errors encountered while dealing with clients are logged but otherwise ignored. If it creates a db file or recreates one from backup, it will say what it did in the returned msg string. It also spawns your runner clients as needed, running them via the configured job scheduler, using the configured shell. It determines the command line to execute for your runner client from the configured RunnerCmd string you supplied.
func (*Server) Block ¶
Block makes you block while the server does the job of serving clients. This will return with an error indicating why it stopped blocking, which will be due to receiving a signal or because you called Stop().
func (*Server) Drain ¶
Drain will stop the server spawning new runners and stop Reserve*() from returning any more Jobs. Once all current runners exit, we Stop().
func (*Server) GetServerStats ¶
func (s *Server) GetServerStats() *ServerStats
GetServerStats returns basic info about the server along with some simple live stats about what's happening in the server's queues.
func (*Server) HasRunners ¶
HasRunners tells you if there are currently runner clients in the job scheduler (either running or pending).
type ServerConfig ¶
type ServerConfig struct {
	// Port for client-server communication.
	Port string

	// Port for the web interface.
	WebPort string

	// Name of the desired scheduler (eg. "local" or "lsf" or "openstack") that
	// jobs will be submitted to.
	SchedulerName string

	// SchedulerConfig should define the config options needed by the chosen
	// scheduler, eg. scheduler.ConfigLocal{Deployment: "production", Shell:
	// "bash"} if using the local scheduler.
	SchedulerConfig interface{}

	// The command line needed to bring up a jobqueue runner client, which
	// should contain 6 placeholders (%s or %d, as in the example) which will
	// be replaced with the queue name, scheduler group, deployment, ip:host
	// address of the server, reservation time out and maximum number of
	// minutes allowed, eg.
	// "my_jobqueue_runner_client --queue %s --group '%s' --deployment %s
	// --server '%s' --reserve_timeout %d --max_mins %d". If you supply an empty
	// string (the default), runner clients will not be spawned; for any work to
	// be done you will have to run your runner client yourself manually.
	RunnerCmd string

	// Absolute path to where the database file should be saved. The database is
	// used to ensure no loss of added commands, to keep a permanent history of
	// all jobs completed, and to keep various stats, amongst other things.
	DBFile string

	// Absolute path to where the database file should be backed up to.
	DBFileBackup string

	// Name of the deployment ("development" or "production"); development
	// databases are deleted and recreated on start up by default.
	Deployment string
}
ServerConfig is supplied to Serve() to configure your jobqueue server. All fields are required with no working default unless otherwise noted.
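To see how a RunnerCmd template expands, the sketch below fills the example string from the field comment with fmt.Sprintf. How the server really performs the substitution is not shown in this documentation, so `runnerCommandLine` is a hypothetical helper and the argument values (including the units of the reserve timeout) are purely illustrative.

```go
package main

import "fmt"

// runnerCmd is the example template from the RunnerCmd field comment.
const runnerCmd = "my_jobqueue_runner_client --queue %s --group '%s' " +
	"--deployment %s --server '%s' --reserve_timeout %d --max_mins %d"

// runnerCommandLine fills the six placeholders in the documented order:
// queue name, scheduler group, deployment, server address, reservation
// timeout and maximum number of minutes.
func runnerCommandLine(template, queue, group, deployment, server string, reserveTimeout, maxMins int) string {
	return fmt.Sprintf(template, queue, group, deployment, server, reserveTimeout, maxMins)
}

func main() {
	fmt.Println(runnerCommandLine(runnerCmd, "cmds", "200:30:1", "production", "192.168.0.1:11301", 2, 30))
}
```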
type ServerInfo ¶
type ServerInfo struct {
	Addr string // ip:port
	Host string // hostname
	Port string // port
	WebPort string // port of the web interface
	PID int // process id of server
	Deployment string // deployment the server is running under
	Scheduler string // the name of the scheduler that jobs are being submitted to
	Mode string // ServerModeNormal if the server is running normally, or ServerModeDrain if draining
}
ServerInfo holds basic addressing info about the server.
type ServerStats ¶
type ServerStats struct {
	ServerInfo *ServerInfo
	Delayed int // how many jobs are waiting following a possibly transient error
	Ready int // how many jobs are ready to begin running
	Running int // how many jobs are currently running
	Buried int // how many jobs are no longer being processed because of seemingly permanent errors
	ETC time.Duration // how long until the slowest of the currently running jobs is expected to complete
}
ServerStats holds information about the jobqueue server for sending to clients.
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
scheduler | Package scheduler lets the jobqueue server interact with the local job scheduler (if any) to submit jobqueue clients and have them run on a compute cluster (or local machine). |