jobqueue

package
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 21, 2016 License: GPL-3.0 Imports: 35 Imported by: 7

Documentation

Overview

Package jobqueue provides server/client functions to interact with the queue structure provided by the queue package over a network.

It provides a job queue and running system which guarantees: # Created jobs are never lost accidentally. # The same job will not run more than once simultaneously:

  • Duplicate jobs are not created
  • Each job is handled by only a single client

# Jobs are handled in the desired order (user priority and fifo). # Jobs still get run despite crashing clients. # Completed jobs are kept forever for historical purposes.

This file contains all the functions for clients to interact with the server. See server.go for the functions needed to implement a server executable.

Index

Constants

View Source
const (
	FailReasonEnv      = "failed to get environment variables"
	FailReasonCwd      = "working directory does not exist"
	FailReasonStart    = "command failed to start"
	FailReasonCPerm    = "command permission problem"
	FailReasonCFound   = "command not found"
	FailReasonCExit    = "command invalid exit code"
	FailReasonExit     = "command exited non-zero"
	FailReasonRAM      = "command used too much RAM"
	FailReasonTime     = "command used too much time"
	FailReasonAbnormal = "command failed to complete normally"
	FailReasonSignal   = "runner received a signal to stop"
	FailReasonResource = "resource requirements cannot be met"
)

FailReason* are the reasons for cmd line failure stored on Jobs

View Source
const (
	ErrInternalError  = "internal error"
	ErrUnknownCommand = "unknown command"
	ErrBadRequest     = "bad request (missing arguments?)"
	ErrBadJob         = "bad job (not in queue or correct sub-queue)"
	ErrMissingJob     = "corresponding job not found"
	ErrUnknown        = "unknown error"
	ErrClosedInt      = "queues closed due to SIGINT"
	ErrClosedTerm     = "queues closed due to SIGTERM"
	ErrClosedStop     = "queues closed due to manual Stop()"
	ErrQueueClosed    = "queue closed"
	ErrNoHost         = "could not determine the non-loopback ip address of this host"
	ErrNoServer       = "could not reach the server"
	ErrMustReserve    = "you must Reserve() a Job before passing it to other methods"
	ErrDBError        = "failed to use database"
	ServerModeNormal  = "started"
	ServerModeDrain   = "draining"
)

Err* constants are found in the our returned Errors under err.Err, so you can cast and check if it's a certain type of error. ServerMode* constants are used to report on the status of the server, found inside ServerInfo.

Variables

View Source
var (
	ClientTouchInterval               = 15 * time.Second
	ClientReleaseDelay                = 30 * time.Second
	RAMIncreaseMin            float64 = 1000
	RAMIncreaseMultLow                = 2.0
	RAMIncreaseMultHigh               = 1.3
	RAMIncreaseMultBreakpoint float64 = 8192
)

these global variables are primarily exported for testing purposes; you probably shouldn't change them (*** and they should probably be re-factored as fields of a config struct...)

View Source
var (
	RecMBRound  = 100  // when we recommend amount of memory to reserve for a job, we round up to the nearest RecMBRound MBs
	RecSecRound = 1800 // when we recommend time to reserve for a job, we round up to the nearest RecSecRound seconds
)

Rec* variables are only exported for testing purposes (*** though they should probably be user configurable somewhere...)

View Source
var (
	ServerInterruptTime   = 1 * time.Second
	ServerItemTTR         = 60 * time.Second
	ServerReserveTicker   = 1 * time.Second
	ServerLogClientErrors = true
)

these global variables are primarily exported for testing purposes; you probably shouldn't change them (*** and they should probably be re-factored as fields of a config struct...)

Functions

func CurrentIP

func CurrentIP() (ip string)

CurrentIP returns the IP address of the machine we're running on right now

Types

type Client

type Client struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Client represents the client side of the socket that the jobqueue server is Serve()ing, specific to a particular queue.

func Connect

func Connect(addr string, queue string, timeout time.Duration) (c *Client, err error)

Connect creates a connection to the jobqueue server, specific to a single queue. Timeout determines how long to wait for a response from the server, not only while connecting, but for all subsequent interactions with it using the returned Client.

func (*Client) Add

func (c *Client) Add(jobs []*Job) (added int, existed int, err error)

Add adds new jobs to the job queue, but only if those jobs aren't already in there. If any were already there, you will not get an error, but the returned 'existed' count will be > 0. Note that no cross-queue checking is done, so you need to be careful not to add the same job to different queues.

func (*Client) Archive

func (c *Client) Archive(job *Job) (err error)

Archive removes a job from the jobqueue and adds it to the database of complete jobs, for use after you have run the job successfully. You have to have been the one to Reserve() the supplied Job, and the Job must be marked as having successfully run, or you will get an error.

func (*Client) Bury

func (c *Client) Bury(job *Job, failreason string) (err error)

Bury marks a job as unrunnable, so it will be ignored (until the user does something to perhaps make it runnable and kicks the job). Note that you must reserve a job before you can bury it.

func (*Client) Delete

func (c *Client) Delete(ccs [][2]string) (deleted int, err error)

Delete removes previously Bury()'d jobs from the queue completely. For use when jobs were created incorrectly/ by accident, or they can never be fixed. It returns a count of jobs that it actually removed. Errors will only be related to not being able to contact the server. The ccs argument is the same as for GetByCmds().

func (*Client) Disconnect

func (c *Client) Disconnect()

Disconnect closes the connection to the jobqueue server. It is CRITICAL that you call Disconnect() before calling Connect() again in the same process.

func (*Client) DrainServer

func (c *Client) DrainServer() (running int, etc time.Duration, err error)

DrainServer tells the server to stop spawning new runners, stop letting existing runners reserve new jobs, and exit once existing runners stop running. You get back a count of existing runners and and an estimated time until completion for the last of those runners.

func (*Client) Ended

func (c *Client) Ended(job *Job, exitcode int, peakram int, cputime time.Duration, stdout []byte, stderr []byte) (err error)

Ended updates a Job on the server with information that you've finished running the Job's Cmd. (The Job's Walltime is handled by the server internally, based on you calling this.) Peakram should be in MB.

func (*Client) Execute

func (c *Client) Execute(job *Job, shell string) error

Execute runs the given Job's Cmd and blocks until it exits. Internally it calls Started() and Ended() and keeps track of peak RAM used. It regularly calls Touch() on the Job so that the server knows we are still alive and handling the Job successfully. It also intercepts SIGTERM, SIGINT, SIGQUIT, SIGUSR1 and SIGUSR2, sending SIGKILL to the running Cmd and returning Error.Err(FailReasonSignal); you should check for this and exit your process). If no error is returned, the Cmd will have run OK, exited with status 0, and been Archive()d from the queue while being placed in the permanent store. Otherwise, it will have been Release()d or Bury()ied as appropriate. The supplied shell is the shell to execute the Cmd under, ideally bash (something that understand the command "set -o pipefail"). You have to have been the one to Reserve() the supplied Job, or this will immediately return an error. NB: the peak RAM tracking assumes we are running on a modern linux system with /proc/*/smaps.

func (*Client) GetByCmd

func (c *Client) GetByCmd(cmd string, cwd string, getstd bool, getenv bool) (j *Job, err error)

GetByCmd gets a Job given its Cmd and Cwd. With the boolean args set to true, this is the only way to get a Job that StdOut() and StdErr() will work on, and one of 2 ways that Env() will work (the other being Reserve()).

func (*Client) GetByCmds

func (c *Client) GetByCmds(ccs [][2]string) (out []*Job, err error)

GetByCmds gets multiple Jobs at once given their Cmds and Cwds. You supply a slice of cmd/cwd string tuples like: [][2]string{[2]string{cmd1, cwd1}, [2]string{cmd2, cwd2}, ...}. It is also possible to supply "",key if you know the "key" of the desired job; you can get these keys when you use GetByRepGroup() or GetIncomplete() with a limit.

func (*Client) GetByRepGroup

func (c *Client) GetByRepGroup(repgroup string, limit int, state string, getStd bool, getEnv bool) (jobs []*Job, err error)

GetByRepGroup gets multiple Jobs at once given their RepGroup (an arbitrary user-supplied identifier for the purpose of grouping related jobs together for reporting purposes). 'limit', if greater than 0, limits the number of jobs returned that have the same State, FailReason and Exitcode, and on the the last job of each State+FailReason group it populates 'Similar' with the number of other excluded jobs there were in that group. Providing 'state' only returns jobs in that State. 'getStd' and 'getEnv', if true, retrieve the stdout, stderr and environement variables for the Jobs, but only if 'limit' is <= 5.

func (*Client) GetIncomplete

func (c *Client) GetIncomplete(limit int, state string, getStd bool, getEnv bool) (jobs []*Job, err error)

GetIncomplete gets all Jobs that are currently in the jobqueue, ie. excluding those that are complete and have been Archive()d. The args are as in GetByRepGroup().

func (*Client) Kick

func (c *Client) Kick(ccs [][2]string) (kicked int, err error)

Kick makes previously Bury()'d jobs runnable again (it can be Reserve()d in the future). It returns a count of jobs that it actually kicked. Errors will only be related to not being able to contact the server. The ccs argument is the same as for GetByCmds()

func (*Client) Ping

func (c *Client) Ping(timeout time.Duration) bool

Ping tells you if your connection to the server is working.

func (*Client) Release

func (c *Client) Release(job *Job, failreason string, delay time.Duration) (err error)

Release places a job back on the jobqueue, for use when you can't handle the job right now (eg. there was a suspected transient error) but maybe someone else can later. Note that you must reserve a job before you can release it. The delay arg is the duration to wait after your call to Release() before anyone else can Reserve() this job again - could help you stop immediately Reserve()ing the job again yourself. You can only Release() the same job 3 times if it has been run and failed; a subsequent call to Release() will instead result in a Bury(). (If the job's Cmd was not run, you can Release() an unlimited number of times.)

func (*Client) Reserve

func (c *Client) Reserve(timeout time.Duration) (j *Job, err error)

Reserve takes a job off the jobqueue. If you process the job successfully you should Archive() it. If you can't deal with it right now you should Release() it. If you think it can never be dealt with you should Bury() it. If you die unexpectedly, the job will automatically be released back to the queue after some time. If no job was available in the queue for as long as the timeout argument, nil is returned for both job and error. If your timeout is 0, you will wait indefinitely for a job.

func (*Client) ReserveScheduled

func (c *Client) ReserveScheduled(timeout time.Duration, schedulerGroup string) (j *Job, err error)

ReserveScheduled is like Reserve(), except that it will only return jobs from the specified schedulerGroup. Based on the scheduler the server was configured with, it will group jobs based on their resource requirements and then submit runners to handle them to your system's job scheduler (such as LSF), possibly in different scheduler queues. These runners are told the group they are a part of, and that same group name is applied internally to the Jobs as the "schedulerGroup", so that the runners can reserve only Jobs that they're supposed to. Therefore, it does not make sense for you to call this yourself; it is only for use by runners spawned by the server.

func (*Client) ServerStats

func (c *Client) ServerStats() (s *ServerStats, err error)

ServerStats returns stats of the jobqueue server itself.

func (*Client) ShutdownServer

func (c *Client) ShutdownServer() bool

ShutdownServer tells the server to immediately cease all operations. Its last act will be to backup its internal database. Any existing runners will fail. Because the server gets shut down it can't respond with success/failure, so we indirectly report if the server was shut down successfully.

func (*Client) Started

func (c *Client) Started(job *Job, pid int, host string) (err error)

Started updates a Job on the server with information that you've started running the Job's Cmd. (The Job's Walltime is handled by the server internally, based on you calling this.)

func (*Client) Touch

func (c *Client) Touch(job *Job) (err error)

Touch adds to a job's ttr, allowing you more time to work on it. Note that you must have reserved the job before you can touch it.

type Error

type Error struct {
	Queue string // the queue's Name
	Op    string // name of the method
	Item  string // the item's key
	Err   string // one of our Err* vars
}

Error records an error and the operation, item and queue that caused it.

func (Error) Error

func (e Error) Error() string

type Job

type Job struct {
	RepGroup    string // a name associated with related Jobs to help group them together when reporting on their status etc.
	ReqGroup    string
	Cmd         string
	Cwd         string        // the working directory to cd to before running Cmd
	RAM         int           // the expected peak RAM in MB Cmd will use while running
	Time        time.Duration // the expected time Cmd will take to run
	Cores       int           // how many processor cores the Cmd will use
	Override    uint8
	Priority    uint8
	Retries     uint8         // the number of times to retry running a Cmd if it fails
	PeakRAM     int           // the actual peak RAM is recorded here (MB)
	Exited      bool          // true if the Cmd was run and exited
	Exitcode    int           // if the job ran and exited, its exit code is recorded here, but check Exited because when this is not set it could like like exit code 0
	FailReason  string        // if the job failed to complete successfully, this will hold one of the FailReason* strings
	Pid         int           // the pid of the running or ran process is recorded here
	Host        string        // the host the process is running or did run on is recorded here
	Walltime    time.Duration // if the job ran or is running right now, the walltime for the run is recorded here
	CPUtime     time.Duration // if the job ran, the CPU time is recorded here
	StdErrC     []byte        // to read, call job.StdErr() instead; if the job ran, its (truncated) STDERR will be here
	StdOutC     []byte        // to read, call job.StdOut() instead; if the job ran, its (truncated) STDOUT will be here
	EnvC        []byte        // to read, call job.Env() instead, to get the environment variables as a []string, where each string is like "key=value"
	State       string        // the job's state in the queue: 'delayed', 'ready', 'reserved', 'running', 'buried' or 'complete'
	Attempts    uint32        // the number of times the job had ever entered 'running' state
	UntilBuried uint8         // the remaining number of Release()s allowed before being buried instead

	ReservedBy uuid.UUID // we note which client reserved this job, for validating if that client has permission to do other stuff to this Job; the server only ever sets this on Reserve(), so clients can't cheat by changing this on their end
	EnvKey     string    // on the server we don't store EnvC with the job, but look it up in db via this key
	Similar    int       // when retrieving jobs with a limit, this tells you how many jobs were excluded
	Queue      string    // the name of the queue the Job was added to
	// contains filtered or unexported fields
}

Job is a struct that represents a command that needs to be run and some associated metadata. ReqGroup is a string that you supply to group together all commands that you expect to have similar RAM and time requirements. RAM and Time are added by the system based on past experience of running jobs with the same ReqGroup. If you supply these yourself, your RAM and time will be used if there is insufficient past experience, or if you also supply Override, which can be 0 to not override, 1 to override past experience if your supplied values are higher, or 2 to always override. Priority is a number between 0 and 255 inclusive - higher numbered jobs will run before lower numbered ones (the default is 0). If you get a Job back from the server (via Reserve() or Get*()), you should treat the properties as read-only: changing them will have no effect.

func NewJob

func NewJob(cmd string, cwd string, group string, ram int, time time.Duration, cores int, override uint8, priority uint8, retries uint8, repgroup string) *Job

NewJob makes it a little easier to make a new Job, for use with Add().

func (*Job) Env

func (j *Job) Env() (env []string, err error)

Env decompresses and decodes job.EnvC (the output of compressEnv(), which are the environment variables the Job's Cmd should run/ran under). Note that EnvC is only populated if you got the Job from GetByCmd(_, _, true) or Reserve().

func (*Job) StdErr

func (j *Job) StdErr() (stderr string, err error)

StdErr returns the decompressed job.StdErrC, which is the head and tail of job.Cmd's STDERR when it ran. If the Cmd hasn't run yet, or if it output nothing to STDERR, you will get an empty string. Note that StdErrC is only populated if you got the Job from GetByCmd(_, true), and if the Job's Cmd ran but failed.

func (*Job) StdOut

func (j *Job) StdOut() (stdout string, err error)

StdOut returns the decompressed job.StdOutC, which is the head and tail of job.Cmd's STDOUT when it ran. If the Cmd hasn't run yet, or if it output nothing to STDOUT, you will get an empty string. Note that StdOutC is only populated if you got the Job from GetByCmd(_, true), and if the Job's Cmd ran but failed.

type Server

type Server struct {
	ServerInfo *ServerInfo

	sync.Mutex
	// contains filtered or unexported fields
}

Server represents the server side of the socket that clients Connect() to.

func Serve

func Serve(config ServerConfig) (s *Server, msg string, err error)

Serve is for use by a server executable and makes it start listening on localhost at the configured port for Connect()ions from clients, and then handles those clients. It returns a *Server that you will typically call Block() on to block until until your executable receives a SIGINT or SIGTERM, or you call Stop(), at which point the queues will be safely closed (you'd probably just exit at that point). The possible errors from Serve() will be related to not being able to start up at the supplied address; errors encountered while dealing with clients are logged but otherwise ignored. If it creates a db file or recreates one from backup, it will say what it did in the returned msg string. It also spawns your runner clients as needed, running them via the configured job scheduler, using the configured shell. It determines the command line to execute for your runner client from the configured RunnerCmd string you supplied.

func (*Server) Block

func (s *Server) Block() (err error)

Block makes you block while the server does the job of serving clients. This will return with an error indicating why it stopped blocking, which will be due to receiving a signal or because you called Stop()

func (*Server) Drain

func (s *Server) Drain() (err error)

Drain will stop the server spawning new runners and stop Reserve*() from returning any more Jobs. Once all current runners exit, we Stop().

func (*Server) GetServerStats

func (s *Server) GetServerStats() *ServerStats

GetServerStats returns basic info about the server along with some simple live stats about what's happening in the server's queues.

func (*Server) HasRunners

func (s *Server) HasRunners() bool

HasRunners tells you if there are currently runner clients in the job scheduler (either running or pending).

func (*Server) Stop

func (s *Server) Stop() (err error)

Stop will cause a graceful shut down of the server.

type ServerConfig

type ServerConfig struct {
	// Port for client-server communication.
	Port string

	// Port for the web interface.
	WebPort string

	// Name of the desired scheduler (eg. "local" or "lsf" or "openstack") that
	// jobs will be submitted to.
	SchedulerName string

	// SchedulerConfig should define the config options needed by the chosen
	// scheduler, eg. scheduler.ConfigLocal{Deployment: "production", Shell:
	// "bash"} if using the local scheduler.
	SchedulerConfig interface{}

	// The command line needed to bring up a jobqueue runner client, which
	// should contain 6 %s parts which will be replaced with the queue name,
	// scheduler group, deployment ip:host address of the server, reservation
	// time out and maximum number of minutes allowed, eg.
	// "my_jobqueue_runner_client --queue %s --group '%s' --deployment %s
	// --server '%s' --reserve_timeout %d --max_mins %d". If you supply an empty
	// string (the default), runner clients will not be spawned; for any work to
	// be done you will have to run your runner client yourself manually.
	RunnerCmd string

	// Absolute path to where the database file should be saved. The database is
	// used to ensure no loss of added commands, to keep a permanent history of
	// all jobs completed, and to keep various stats, amongst other things.
	DBFile string

	// Absolute path to where the database file should be backed up to.
	DBFileBackup string

	// Name of the deployment ("development" or "production"); development
	// databases are deleted and recreated on start up by default.
	Deployment string
}

ServerConfig is supplied to Serve() to configure your jobqueue server. All fields are required with no working default unless otherwise noted.

type ServerInfo

type ServerInfo struct {
	Addr       string // ip:port
	Host       string // hostname
	Port       string // port
	WebPort    string // port of the web interface
	PID        int    // process id of server
	Deployment string // deployment the server is running under
	Scheduler  string // the name of the scheduler that jobs are being submitted to
	Mode       string // ServerModeNormal if the server is running normally, or ServerModeDrain if draining
}

ServerInfo holds basic addressing info about the server.

type ServerStats

type ServerStats struct {
	ServerInfo *ServerInfo
	Delayed    int           // how many jobs are waiting following a possibly transient error
	Ready      int           // how many jobs are ready to begin running
	Running    int           // how many jobs are currently running
	Buried     int           // how many jobs are no longer being processed because of seemingly permanent errors
	ETC        time.Duration // how long until the the slowest of the currently running jobs is expected to complete
}

ServerStats holds information about the jobqueue server for sending to clients.

Directories

Path Synopsis
Package scheduler lets the jobqueue server interact with the local job scheduler (if any) to submit jobqueue clients and have them run on a compute cluster (or local machine).
Package scheduler lets the jobqueue server interact with the local job scheduler (if any) to submit jobqueue clients and have them run on a compute cluster (or local machine).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL