Documentation ¶
Index ¶
- type Command
- func (cmd *Command) Broadcast(outChannels map[int]chan Command) bool
- func (cmd *Command) Fail(msg string) bool
- func (cmd *Command) Forward(outChannel chan Command) bool
- func (cmd *Command) SafeReply(reply CommandReply) error
- func (cmd *Command) SafeSend(out chan Command) error
- func (cmd Command) Send(outChannel chan Command) string
- func (cmd Command) String() string
- func (cmd *Command) Success(msg string) bool
- func (cmd Command) ToJSON() []byte
- type CommandReply
- type HTTPHandler
- type JobqueueKeepAliveHandler
- type KeepAlive
- type KeepAliveConf
- type Runner
- type SignalHandler
- type TaskManager
- func (task *TaskManager) CopyFrom(autotask TaskManager, tPath string)
- func (task *TaskManager) ListWorkers() []string
- func (task *TaskManager) MaintainWorkerCardinality() error
- func (task *TaskManager) RunCommand(cmd Command)
- func (task *TaskManager) Set(cmd Command)
- func (task *TaskManager) Start(commands chan Command, cmd Command)
- func (task *TaskManager) StartWorker() error
- func (task *TaskManager) Status() (ret map[string]string)
- func (task *TaskManager) Stop()
- func (task *TaskManager) StopWorker(pid int, ch chan Command)
- func (task *TaskManager) StopWorkers()
- func (task *TaskManager) StopWorkersByPid(pids []int)
- type TaskManagerConf
- type Worker
- type WorkerInfo
Types ¶
type Command ¶
type Command struct {
    Type         string                 `json:"type"`
    TaskName     string                 `json:"taskname,omitempty"`
    Params       map[string]interface{} `json:"params,omitempty"`
    Timeout      int64                  `json:"timeout"`
    ReplyChannel chan CommandReply
}
Command is a message sent on the command channel. It may target a specific task or be generic. Type can be one of 'status', 'set', 'stop', 'listworkers', 'stopworkers' or 'stoppedworkers'
func (*Command) Broadcast ¶
func (cmd *Command) Broadcast(outChannels map[int]chan Command) bool
Broadcast sends the command to the other channels, waits for all the replies, and closes the channel
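The fan-out-and-wait behaviour described for Broadcast can be sketched as follows. This is a minimal illustration, not the package's implementation: the trimmed-down `Command` and `CommandReply` types, the `Reply` and `Err` field names, the `broadcast` and `startEchoWorker` helpers, and the meaning of the boolean result (true when no reply carried an error) are all assumptions made for the example.

```go
package main

import "fmt"

// CommandReply is a stand-in type; the field names are assumed.
type CommandReply struct {
	Reply string
	Err   error
}

// Command is a trimmed-down stand-in for the package's Command type.
type Command struct {
	Type         string
	ReplyChannel chan CommandReply
}

// broadcast (hypothetical) sends a copy of cmd to every output channel,
// waits for one reply per channel, then closes the shared reply channel.
// It returns true when none of the replies carried an error.
func broadcast(cmd Command, outs map[int]chan Command) bool {
	replies := make(chan CommandReply, len(outs))
	for _, out := range outs {
		c := cmd
		c.ReplyChannel = replies
		out <- c
	}
	ok := true
	for i := 0; i < len(outs); i++ {
		if r := <-replies; r.Err != nil {
			ok = false
		}
	}
	close(replies)
	return ok
}

// startEchoWorker (hypothetical) handles exactly one command and replies to it.
func startEchoWorker(id int) chan Command {
	in := make(chan Command)
	go func() {
		cmd := <-in
		cmd.ReplyChannel <- CommandReply{Reply: fmt.Sprintf("worker %d handled %s", id, cmd.Type)}
	}()
	return in
}

func main() {
	outs := map[int]chan Command{}
	for id := 1; id <= 3; id++ {
		outs[id] = startEchoWorker(id)
	}
	fmt.Println(broadcast(Command{Type: "status"}, outs))
}
```

Using a reply channel buffered to the number of receivers means no receiver blocks while replying, so the reply channel can be closed safely once every reply has been read.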
func (*Command) Forward ¶
func (cmd *Command) Forward(outChannel chan Command) bool
Forward sends the command to another channel, waits for the reply, and closes the channel
func (*Command) SafeReply ¶
func (cmd *Command) SafeReply(reply CommandReply) error
SafeReply sends a reply but recovers from the panic if the output channel is closed
func (*Command) SafeSend ¶
func (cmd *Command) SafeSend(out chan Command) error
SafeSend attempts to send a command to an output channel, recovering from the panic if the channel was closed in the meantime
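Sending on a closed channel panics in Go, which is why a recovering send is useful here. The sketch below shows the general pattern with a deferred `recover`; the `safeSend` helper and the trimmed-down `Command` type are hypothetical stand-ins, and the error text is invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// Command is a trimmed-down stand-in for the package's Command type.
type Command struct {
	Type     string
	TaskName string
}

// safeSend (hypothetical) sends cmd on out and converts the
// "send on closed channel" panic into an ordinary error.
func safeSend(out chan Command, cmd Command) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = errors.New("channel closed: command not sent")
		}
	}()
	out <- cmd
	return nil
}

func main() {
	open := make(chan Command, 1)
	fmt.Println(safeSend(open, Command{Type: "status"})) // <nil>

	closed := make(chan Command, 1)
	close(closed)
	fmt.Println(safeSend(closed, Command{Type: "stop"}) != nil) // true
}
```

The named return value is what lets the deferred function replace the result after the panic has been recovered.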
type CommandReply ¶
CommandReply is the type of a reply on the ReplyChannel for a Command. It contains the successful response (string) or an error on command failure
func (CommandReply) String ¶
func (r CommandReply) String() string
String implements the Stringer interface
type HTTPHandler ¶
HTTPHandler holds the configuration for the HTTP handler
func (*HTTPHandler) Run ¶
func (handler *HTTPHandler) Run()
Run the HTTP handler, which exposes an HTTP interface to control tasks
type JobqueueKeepAliveHandler ¶
JobqueueKeepAliveHandler contains the configuration for the Keep-Alive handler
func (*JobqueueKeepAliveHandler) Run ¶
func (handler *JobqueueKeepAliveHandler) Run(keepalives chan<- KeepAlive)
Run listens for keep-alive messages sent via ZeroMQ and forwards them to a channel
type KeepAliveConf ¶
type KeepAliveConf struct {
    InboundPort  int    `json:"inbound_port,omitempty"`
    InternalPort int    `json:"internal_port,omitempty"`
    Host         string `json:"host,omitempty"`
    StallTimeout int64  `json:"stall_timeout,omitempty"`
    GracePeriod  int64  `json:"grace_period,omitempty"`
}
KeepAliveConf contains the configuration for the Keep-Alive handler and the ZeroMQ channel
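The JSON tags on KeepAliveConf determine the key names used in a configuration document. As a rough sketch, the following decodes such a document into a local copy of the struct; the `parseKeepAliveConf` helper and all values in the sample JSON are made up for illustration, and no particular units or defaults are implied.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// KeepAliveConf is a local copy of the documented struct, for illustration.
type KeepAliveConf struct {
	InboundPort  int    `json:"inbound_port,omitempty"`
	InternalPort int    `json:"internal_port,omitempty"`
	Host         string `json:"host,omitempty"`
	StallTimeout int64  `json:"stall_timeout,omitempty"`
	GracePeriod  int64  `json:"grace_period,omitempty"`
}

// parseKeepAliveConf (hypothetical) decodes a JSON document into a KeepAliveConf.
func parseKeepAliveConf(raw []byte) (KeepAliveConf, error) {
	var conf KeepAliveConf
	err := json.Unmarshal(raw, &conf)
	return conf, err
}

func main() {
	// Sample values only; omitted keys keep their zero value.
	raw := []byte(`{"inbound_port": 5555, "host": "127.0.0.1", "stall_timeout": 30000, "grace_period": 2000}`)
	conf, err := parseKeepAliveConf(raw)
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Printf("%+v\n", conf)
}
```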
type Runner ¶
type Runner struct {
    Conf TaskManagerConf
    // contains filtered or unexported fields
}
Runner is a container for Task Managers
func NewRunner ¶
func NewRunner(taskMgrConf TaskManagerConf) (Runner, error)
NewRunner returns an instance of a Task Manager Runner
type SignalHandler ¶
SignalHandler wraps the command channel
func (*SignalHandler) Run ¶
func (handler *SignalHandler) Run()
Run the Signal handler, to intercept interrupts and shut down processes cleanly
type TaskManager ¶
type TaskManager struct {
    Name          string   `json:"name,omitempty"`           // task name
    Cmd           string   `json:"cmd,omitempty"`            // cli command
    Args          []string `json:"args,omitempty"`           // cli args
    Cardinality   int      `json:"cardinality,omitempty"`    // number of workers
    StallTimeout  int64    `json:"stall_timeout,omitempty"`  // consider the worker dead if no keep-alives are received for this period (ms)
    GracePeriod   int64    `json:"grace_period,omitempty"`   // grace period (ms) before killing a worker after being asked to stop
    AutoStart     bool     `json:"autostart,omitempty"`      // whether to start the task automatically
    CaptureOutput bool     `json:"capture_output,omitempty"` // whether to capture the output and send it to stdout
    Active        bool
    // contains filtered or unexported fields
}
A TaskManager is a process manager for a specific task. It keeps the number of worker processes at the desired cardinality and manages keep-alives
func NewTaskManager ¶
func NewTaskManager(name string, keepAliveConf KeepAliveConf, feedback chan Command) TaskManager
NewTaskManager creates a new Task Manager instance
func (*TaskManager) CopyFrom ¶
func (task *TaskManager) CopyFrom(autotask TaskManager, tPath string)
CopyFrom updates the settings of a task manager from another task manager
func (*TaskManager) ListWorkers ¶
func (task *TaskManager) ListWorkers() []string
ListWorkers returns status information from each worker process for this task
func (*TaskManager) MaintainWorkerCardinality ¶
func (task *TaskManager) MaintainWorkerCardinality() error
MaintainWorkerCardinality keeps the number of workers at the desired cardinality
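Keeping a worker pool at a target size comes down to comparing the desired count with the workers currently running. The sketch below shows one possible reconciliation step; the `reconcile` helper and its policy of stopping the surplus workers at the end of the list are assumptions for illustration, not the package's actual logic.

```go
package main

import "fmt"

// reconcile (hypothetical) compares the desired cardinality with the pids of
// the running workers. It returns how many workers to start, or which pids
// to stop when there are too many.
func reconcile(desired int, running []int) (toStart int, toStop []int) {
	if desired < 0 {
		desired = 0
	}
	switch {
	case len(running) < desired:
		return desired - len(running), nil
	case len(running) > desired:
		return 0, running[desired:]
	}
	return 0, nil
}

func main() {
	start, stop := reconcile(3, []int{101})
	fmt.Println(start, stop) // 2 []

	start, stop = reconcile(1, []int{101, 102, 103})
	fmt.Println(start, stop) // 0 [102 103]
}
```

In terms of the documented API, the first result would plausibly translate into repeated StartWorker calls and the second into a StopWorkersByPid call, though that mapping is a guess.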
func (*TaskManager) RunCommand ¶
func (task *TaskManager) RunCommand(cmd Command)
RunCommand runs a command on this task. Results are sent to the reply channel of the command itself
func (*TaskManager) Set ¶
func (task *TaskManager) Set(cmd Command)
Set sets task options (only "cardinality" is currently supported)
func (*TaskManager) Start ¶
func (task *TaskManager) Start(commands chan Command, cmd Command)
Start the workers for this task
func (*TaskManager) StartWorker ¶
func (task *TaskManager) StartWorker() error
StartWorker creates a new worker process
func (*TaskManager) Status ¶
func (task *TaskManager) Status() (ret map[string]string)
Status gets the status for this task (number of active workers, last-alive timestamp, etc.)
func (*TaskManager) Stop ¶
func (task *TaskManager) Stop()
Stop asks all of this task's workers to stop gracefully (or forcefully if they don't terminate in a timely fashion)
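The "graceful first, forceful after a grace period" behaviour can be sketched with a `select` on a completion channel and a timer. Everything here is hypothetical: `stopWithGrace`, `simulate`, and the callbacks stand in for whatever the package does internally (for a real process, the callbacks would plausibly send SIGTERM and SIGKILL).

```go
package main

import (
	"fmt"
	"time"
)

// stopWithGrace (hypothetical) asks a worker to stop, waits up to grace for
// it to finish, and calls forceStop if it has not. It returns true when the
// worker stopped on its own within the grace period.
func stopWithGrace(done <-chan struct{}, askStop, forceStop func(), grace time.Duration) bool {
	askStop()
	select {
	case <-done:
		return true
	case <-time.After(grace):
		forceStop()
		return false
	}
}

// simulate (hypothetical) runs stopWithGrace against a fake worker that
// either honours the stop request or ignores it.
func simulate(cooperative bool) (graceful, forced bool) {
	done := make(chan struct{})
	askStop := func() {
		if cooperative {
			close(done) // the worker exits as soon as it is asked
		}
	}
	forceStop := func() { forced = true }
	graceful = stopWithGrace(done, askStop, forceStop, 50*time.Millisecond)
	return graceful, forced
}

func main() {
	graceful, forced := simulate(true)
	fmt.Println(graceful, forced) // true false

	graceful, forced = simulate(false)
	fmt.Println(graceful, forced) // false true
}
```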
func (*TaskManager) StopWorker ¶
func (task *TaskManager) StopWorker(pid int, ch chan Command)
StopWorker sends a SIGTERM signal to the worker process identified by the given pid
func (*TaskManager) StopWorkers ¶
func (task *TaskManager) StopWorkers()
StopWorkers asks all workers for this task to stop and waits for them to terminate
func (*TaskManager) StopWorkersByPid ¶
func (task *TaskManager) StopWorkersByPid(pids []int)
StopWorkersByPid asks all workers in the pid list to stop and waits for them to terminate
type TaskManagerConf ¶
type TaskManagerConf struct {
    Path         string                 `json:"path,omitempty"`
    FileSuffix   string                 `json:"filesuffix,omitempty"`
    Port         int                    `json:"port,omitempty"`
    Autotasks    map[string]TaskManager `json:"autotasks,omitempty"`
    Keepalives   KeepAliveConf          `json:"keepalives,omitempty"`
    ForceTimeout int64                  `json:"force_timeout"`
    Profiler     profiler.Config        `json:"profiler"`
}
TaskManagerConf contains the configuration for the Task Manager
type Worker ¶
type Worker struct {
    Taskname            string         `json:"taskname"`
    Command             string         `json:"command,omitempty"`
    Args                []string       `json:"args,omitempty"`
    CaptureOutput       bool           `json:"capture_output,omitempty"`
    StallTimeout        int64          `json:"stall_timeout,omitempty"`
    GracePeriod         int64          `json:"grace_period,omitempty"` // grace period (ms) before killing a worker after being asked to stop
    Pid                 int            `json:"pid,omitempty"`
    StartedAt           time.Time      `json:"started_at,omitempty"`
    LastAliveAt         time.Time      `json:"last_alive_at,omitempty"`
    Logger              *log.Logger    `json:"-"` // don't export
    TaskFeedbackChannel chan<- Command `json:"-"` // don't export
    CommandsChannel     chan Command   `json:"-"` // don't export
    // contains filtered or unexported fields
}
Worker manages a single worker process
func (*Worker) HasStalled ¶
HasStalled checks if the worker process is alive and has sent a keep-alive message recently
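A stall check of this kind usually compares the time since the last keep-alive with a timeout. The sketch below assumes the timeout is expressed in milliseconds, as the TaskManager field comment suggests; `hasStalled` and `stalledAfter` are hypothetical helpers, and the real method may also take process liveness into account.

```go
package main

import (
	"fmt"
	"time"
)

// hasStalled (hypothetical) reports whether more than stallTimeoutMs
// milliseconds have passed between the last keep-alive and now.
func hasStalled(lastAliveAt, now time.Time, stallTimeoutMs int64) bool {
	return now.Sub(lastAliveAt) > time.Duration(stallTimeoutMs)*time.Millisecond
}

// stalledAfter (hypothetical) reports whether a worker that has been silent
// for silenceMs milliseconds counts as stalled under the given timeout.
func stalledAfter(silenceMs, stallTimeoutMs int64) bool {
	now := time.Now()
	last := now.Add(-time.Duration(silenceMs) * time.Millisecond)
	return hasStalled(last, now, stallTimeoutMs)
}

func main() {
	fmt.Println(stalledAfter(5000, 1000)) // true: silent for 5s, timeout 1s
	fmt.Println(stalledAfter(100, 1000))  // false: silent for 0.1s, timeout 1s
}
```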
func (*Worker) Info ¶
func (w *Worker) Info(replyChan chan<- CommandReply)
Info returns the status of the current worker process
func (*Worker) IsProcessAlive ¶
IsProcessAlive checks whether the process is still running. See http://stackoverflow.com/questions/15204162/check-if-a-process-exists-in-go-way
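A common way to test for a live process on Unix-like systems is to send it signal 0, which performs the existence and permission checks without delivering anything. The sketch below illustrates that technique; `isProcessAlive` and `selfAlive` are hypothetical helpers, the code is Unix-specific, and it is not necessarily how the package implements the check.

```go
package main

import (
	"fmt"
	"os"
	"syscall"
)

// isProcessAlive (hypothetical) probes pid with signal 0. A nil error means
// the process exists and can be signalled by the caller. A process owned by
// another user yields a permission error and is reported as not alive here.
func isProcessAlive(pid int) bool {
	proc, err := os.FindProcess(pid)
	if err != nil {
		return false
	}
	return proc.Signal(syscall.Signal(0)) == nil
}

// selfAlive (hypothetical) applies the check to the current process.
func selfAlive() bool {
	return isProcessAlive(os.Getpid())
}

func main() {
	fmt.Println(selfAlive()) // true: the current process is running
}
```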
func (*Worker) Start ¶
func (w *Worker) Start() (*WorkerInfo, error)
Start spawns a new worker process
type WorkerInfo ¶
WorkerInfo is a wrapper for the process pid and the channels to communicate with the task manager