Documentation ¶
Index ¶
- Constants
- type Attacher
- type CommandProducer
- type Config
- type Controllable
- type Controller
- type Factory
- type JobError
- type Payload
- type PipeFactory
- type Pool
- type Server
- func (s *Server) Attach(c Controller)
- func (s *Server) Exec(rqs *Payload) (rsp *Payload, err error)
- func (s *Server) Listen(l func(event int, ctx interface{}))
- func (s *Server) Pool() Pool
- func (s *Server) Reconfigure(cfg *ServerConfig) error
- func (s *Server) Reset() error
- func (s *Server) Start() (err error)
- func (s *Server) Stop()
- func (s *Server) Workers() (workers []*Worker)
- type ServerConfig
- type SocketFactory
- type State
- type StaticPool
- func (p *StaticPool) Config() Config
- func (p *StaticPool) Destroy()
- func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error)
- func (p *StaticPool) Listen(l func(event int, ctx interface{}))
- func (p *StaticPool) Remove(w *Worker, err error) bool
- func (p *StaticPool) Workers() (workers []*Worker)
- type Worker
- type WorkerError
Constants ¶
const (
	// EventStderrOutput - is triggered when worker sends data into stderr. The context
	// is error message ([]byte).
	EventStderrOutput = 1900

	// WaitDuration - for how long error buffer should attempt to aggregate error messages
	// before merging output together since lastError update (required to keep error update together).
	WaitDuration = 100 * time.Millisecond
)
const (
	// EventWorkerConstruct thrown when new worker is spawned.
	EventWorkerConstruct = iota + 100

	// EventWorkerDestruct thrown after worker destruction.
	EventWorkerDestruct

	// EventWorkerKill thrown after worker is being forcefully killed.
	EventWorkerKill

	// EventWorkerError thrown when any worker-related event happens (passed with WorkerError).
	EventWorkerError

	// EventWorkerDead thrown when worker stops working for any reason.
	EventWorkerDead

	// EventPoolError caused on pool-wide errors.
	EventPoolError
)
const (
	// EventServerStart triggered when server creates new pool.
	EventServerStart = iota + 200

	// EventServerStop triggered when server is stopped.
	EventServerStop

	// EventServerFailure triggered when server is unable to replace dead pool.
	EventServerFailure

	// EventPoolConstruct triggered when server creates new pool.
	EventPoolConstruct

	// EventPoolDestruct triggered when server destroys an existing pool.
	EventPoolDestruct
)
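The event codes above are what a listener of the shape `func(event int, ctx interface{})` receives. The following is a minimal sketch of such a listener, assuming local copies of the constants so that it runs on its own. The `describe` helper and its grouping of events are hypothetical and not part of the package.

```go
package main

import "fmt"

// Local mirrors of the worker and pool event constants listed above.
const (
	EventWorkerConstruct = iota + 100
	EventWorkerDestruct
	EventWorkerKill
	EventWorkerError
	EventWorkerDead
	EventPoolError
)

// Local mirrors of the server event constants listed above.
const (
	EventServerStart = iota + 200
	EventServerStop
	EventServerFailure
	EventPoolConstruct
	EventPoolDestruct
)

// describe is a hypothetical helper that turns an event code and its
// context into a log line. The grouping is illustrative only.
func describe(event int, ctx interface{}) string {
	switch event {
	case EventWorkerError, EventPoolError, EventServerFailure:
		return fmt.Sprintf("failure event %d: %v", event, ctx)
	case EventWorkerConstruct, EventPoolConstruct, EventServerStart:
		return fmt.Sprintf("startup event %d", event)
	default:
		return fmt.Sprintf("event %d", event)
	}
}

func main() {
	// A listener has the same shape as the argument of Listen.
	listener := func(event int, ctx interface{}) {
		fmt.Println(describe(event, ctx))
	}
	listener(EventWorkerConstruct, nil)
	listener(EventWorkerError, "worker exited unexpectedly")
}
```

Because both blocks use `iota` with an offset, worker and pool events fall in the 100 range and server events in the 200 range, so one listener can handle both without collisions.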
const (
	// StateInactive - no associated process
	StateInactive int64 = iota

	// StateReady - ready for job.
	StateReady

	// StateWorking - working on given payload.
	StateWorking

	// StateInvalid - indicates that worker is being disabled and will be removed.
	StateInvalid

	// StateStopping - process is being softly stopped.
	StateStopping

	// StateStopped - process has been terminated.
	StateStopped

	// StateErrored - error state (can't be used).
	StateErrored
)
const (
// StopRequest can be sent by worker to indicate that restart is required.
StopRequest = "{\"stop\":true}"
)
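As a rough illustration of how a receiver might recognise this marker, the sketch below compares raw bytes against `StopRequest`. Where the marker actually arrives (for example, in a response context) is an assumption here, and `isStopRequest` is a hypothetical helper, not a function of this package.

```go
package main

import (
	"bytes"
	"fmt"
)

// StopRequest mirrors the constant documented above.
const StopRequest = "{\"stop\":true}"

// isStopRequest is a hypothetical helper: it reports whether the given
// bytes are exactly the stop marker. An exact byte comparison is an
// assumption; the real check may differ.
func isStopRequest(context []byte) bool {
	return bytes.Equal(context, []byte(StopRequest))
}

func main() {
	fmt.Println(isStopRequest([]byte(`{"stop":true}`)))
	fmt.Println(isStopRequest([]byte(`{"stop":false}`)))
}
```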
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Attacher ¶ added in v1.6.3
type Attacher interface {
	// Attach attaches controller to the service.
	Attach(c Controller)
}
Attacher defines the ability to attach rr controller.
type CommandProducer ¶ added in v1.5.3
type CommandProducer func(cfg *ServerConfig) func() *exec.Cmd
CommandProducer can produce commands.
type Config ¶
type Config struct {
	// NumWorkers defines how many sub-processes can be run at once. This value
	// might be doubled by Swapper while hot-swap.
	NumWorkers int64

	// MaxJobs defines how many executions are allowed for the worker until
	// its destruction. Set 1 to create a new process for each new task, 0 to let
	// the worker handle as many tasks as it can.
	MaxJobs int64

	// AllocateTimeout defines for how long pool will be waiting for a worker to
	// be freed to handle the task.
	AllocateTimeout time.Duration

	// DestroyTimeout defines for how long pool should be waiting for worker to
	// properly stop, if timeout reached worker will be killed.
	DestroyTimeout time.Duration
}
Config defines basic behaviour of worker creation and handling process.
func (*Config) InitDefaults ¶ added in v1.2.2
InitDefaults initializes a blank config with a pre-defined set of default values.
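To make the idea of initializing a blank config concrete, here is a minimal sketch that works on a local copy of the `Config` struct. The default values (one worker per CPU, one-minute timeouts) and the choice to fill only zero-valued fields are placeholders chosen for illustration, not the library's documented defaults. `initDefaults` is a hypothetical free function.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// Config is a local copy of the struct documented above.
type Config struct {
	NumWorkers      int64
	MaxJobs         int64
	AllocateTimeout time.Duration
	DestroyTimeout  time.Duration
}

// initDefaults is a hypothetical helper that fills zero-valued fields.
// The concrete values are illustrative assumptions. MaxJobs is left
// untouched because 0 already means "no limit".
func initDefaults(cfg *Config) {
	if cfg.NumWorkers == 0 {
		cfg.NumWorkers = int64(runtime.NumCPU())
	}
	if cfg.AllocateTimeout == 0 {
		cfg.AllocateTimeout = time.Minute
	}
	if cfg.DestroyTimeout == 0 {
		cfg.DestroyTimeout = time.Minute
	}
}

func main() {
	cfg := Config{NumWorkers: 4}
	initDefaults(&cfg)
	fmt.Printf("%+v\n", cfg)
}
```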
type Controllable ¶ added in v1.6.3
type Controllable interface {
	// Server represents RR server
	Server() *Server
}
Controllable defines the ability to expose the underlying RR server, to which a controller can be attached.
type Controller ¶ added in v1.4.0
type Controller interface {
	// Lock controller on given pool instance.
	Attach(p Pool) Controller

	// Detach pool watching.
	Detach()
}
Controller observes pool state and decides if any worker must be destroyed.
type Factory ¶
type Factory interface {
	// SpawnWorker creates new worker process based on given command.
	// Process must not be started.
	SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)

	// Close the factory and underlying connections.
	Close() error
}
Factory is responsible for wrapping the given command into a task worker.
type JobError ¶
type JobError []byte
JobError is a job-level error (no worker halt); it wraps the error context.
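Since a job-level error does not halt the worker, callers usually want to tell it apart from other failures. The sketch below shows one way to do that with `errors.As`, assuming `JobError` implements the `error` interface by returning its bytes as a string (the `Error` method is not shown in this listing, so it is an assumption). `classify` and `errExample` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// JobError mirrors the type documented above.
type JobError []byte

// Error is assumed here: it returns the error context as a string.
func (je JobError) Error() string { return string(je) }

// errExample stands in for any non-job failure.
var errExample = errors.New("worker crashed")

// classify is a hypothetical helper that separates job-level errors
// from everything else, including wrapped errors.
func classify(err error) string {
	if err == nil {
		return "none"
	}
	var je JobError
	if errors.As(err, &je) {
		return "job"
	}
	return "other"
}

func main() {
	fmt.Println(classify(JobError("invalid payload")))
	fmt.Println(classify(fmt.Errorf("exec failed: %w", JobError("bad input"))))
	fmt.Println(classify(errExample))
}
```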
type Payload ¶
type Payload struct {
	// Context represent payload context, might be omitted.
	Context []byte

	// Body contains binary payload to be processed by worker.
	Body []byte
}
Payload carries binary header and body to workers and back to the server.
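Both fields are plain byte slices, so the caller decides how to encode them. As a sketch, the hypothetical helper `newJSONPayload` below encodes a header value as JSON into `Context` and passes the body through unchanged. Using JSON for the context is an assumption made for illustration; the struct itself does not mandate any encoding.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Payload is a local copy of the struct documented above.
type Payload struct {
	Context []byte
	Body    []byte
}

// newJSONPayload is a hypothetical helper: it marshals ctx to JSON for
// the Context field and stores body as-is.
func newJSONPayload(ctx interface{}, body []byte) (*Payload, error) {
	raw, err := json.Marshal(ctx)
	if err != nil {
		return nil, err
	}
	return &Payload{Context: raw, Body: body}, nil
}

func main() {
	p, err := newJSONPayload(map[string]string{"id": "1"}, []byte("hello"))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("context=%s body=%s\n", p.Context, p.Body)
}
```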
type PipeFactory ¶
type PipeFactory struct { }
PipeFactory connects to workers using standard streams (STDIN, STDOUT pipes).
func NewPipeFactory ¶
func NewPipeFactory() *PipeFactory
NewPipeFactory returns new factory instance and starts listening
func (*PipeFactory) SpawnWorker ¶
func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)
SpawnWorker creates a new worker and connects it to the goridge relay. The caller must handle Wait() at a higher level.
type Pool ¶
type Pool interface {
	// Listen all caused events to attached controller.
	Listen(l func(event int, ctx interface{}))

	// Exec one task with given payload and context, returns result or error.
	Exec(rqs *Payload) (rsp *Payload, err error)

	// Workers returns worker list associated with the pool.
	Workers() (workers []*Worker)

	// Remove forces pool to remove specific worker. Returns true if this is the first remove request on given worker.
	Remove(w *Worker, err error) bool

	// Destroy all underlying workers (but let them complete the task).
	Destroy()
}
Pool manages a set of inner worker processes.
type Server ¶ added in v1.0.0
type Server struct {
// contains filtered or unexported fields
}
Server manages pool creation and swapping.
func NewServer ¶ added in v1.0.0
func NewServer(cfg *ServerConfig) *Server
NewServer creates a new server. Make sure to configure it before use.
func (*Server) Attach ¶ added in v1.4.0
func (s *Server) Attach(c Controller)
Attach attaches worker controller.
func (*Server) Exec ¶ added in v1.0.0
Exec one task with given payload and context, returns result or error.
func (*Server) Reconfigure ¶ added in v1.0.0
func (s *Server) Reconfigure(cfg *ServerConfig) error
Reconfigure re-configures the underlying pool and destroys its previous version, if any. Reconfigure will ignore factory and relay settings.
func (*Server) Reset ¶ added in v1.0.0
Reset resets the state of the underlying pool and rebuilds all of its workers.
func (*Server) Start ¶ added in v1.0.0
Start underlying worker pool, configure factory and command provider.
type ServerConfig ¶ added in v1.0.0
type ServerConfig struct {
	// Command includes command strings with all the parameters, example: "php worker.php pipes".
	Command string

	// User under which process will be started
	User string

	// CommandProducer overwrites
	CommandProducer CommandProducer

	// Relay defines connection method and factory to be used to connect to workers:
	// "pipes", "tcp://:6001", "unix://rr.sock"
	// This config section must not change on re-configuration.
	Relay string

	// RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
	// must not change on re-configuration.
	RelayTimeout time.Duration

	// Pool defines worker pool configuration, number of workers, timeouts, etc. This config section might change
	// while server is running.
	Pool *Config
	// contains filtered or unexported fields
}
ServerConfig combines factory, pool and cmd configurations.
func (*ServerConfig) Differs ¶ added in v1.0.0
func (cfg *ServerConfig) Differs(new *ServerConfig) bool
Differs returns true if configuration has changed but ignores pool or cmd changes.
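A comparison of this kind can be sketched as follows. Which fields are compared is an assumption: the sketch uses `Relay` and `RelayTimeout` because the struct comments mark them as sections that must not change on re-configuration. `differs` is a hypothetical free function operating on a trimmed local copy of the struct.

```go
package main

import (
	"fmt"
	"time"
)

// ServerConfig is a trimmed local copy with only the fields used here.
type ServerConfig struct {
	Command      string
	Relay        string
	RelayTimeout time.Duration
}

// differs is a hypothetical helper: it reports a change only when the
// relay settings change and ignores the command.
func differs(old, updated *ServerConfig) bool {
	return old.Relay != updated.Relay || old.RelayTimeout != updated.RelayTimeout
}

func main() {
	a := &ServerConfig{Command: "php a.php", Relay: "pipes"}
	b := &ServerConfig{Command: "php b.php", Relay: "pipes"}
	c := &ServerConfig{Command: "php a.php", Relay: "tcp://:6001"}
	fmt.Println(differs(a, b), differs(a, c))
}
```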
func (*ServerConfig) GetEnv ¶ added in v1.5.3
func (cfg *ServerConfig) GetEnv() (env []string)
GetEnv must return list of env variables.
func (*ServerConfig) InitDefaults ¶ added in v1.2.2
func (cfg *ServerConfig) InitDefaults() error
InitDefaults sets missing values to their default values.
func (*ServerConfig) SetEnv ¶ added in v1.1.1
func (cfg *ServerConfig) SetEnv(k, v string)
SetEnv sets new environment variable. Value is automatically uppercase-d.
func (*ServerConfig) UpscaleDurations ¶ added in v1.2.0
func (cfg *ServerConfig) UpscaleDurations()
UpscaleDurations converts duration values from nanoseconds to seconds.
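The conversion matters when a config file holds a bare number such as `60`, which decodes into a `time.Duration` of 60 nanoseconds rather than 60 seconds. A minimal sketch of the idea follows. The one-microsecond threshold used to decide whether a value still needs upscaling is an assumption, and `upscale` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// upscale is a hypothetical helper: values below one microsecond are
// treated as a bare count of seconds that was decoded as nanoseconds.
// Larger values are assumed to be real durations and are left alone.
func upscale(d time.Duration) time.Duration {
	if d < time.Microsecond {
		return time.Second * time.Duration(d.Nanoseconds())
	}
	return d
}

func main() {
	fmt.Println(upscale(60))              // bare "60" from a config file
	fmt.Println(upscale(2 * time.Second)) // already a proper duration
}
```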
type SocketFactory ¶
type SocketFactory struct {
// contains filtered or unexported fields
}
SocketFactory connects to external workers using socket server.
func NewSocketFactory ¶
func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory
NewSocketFactory returns a SocketFactory attached to the given socket listener ls. tout specifies how long the factory should wait for an incoming relay connection.
func (*SocketFactory) Close ¶ added in v1.0.0
func (f *SocketFactory) Close() error
Close socket factory and underlying socket connection.
func (*SocketFactory) SpawnWorker ¶
func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)
SpawnWorker creates worker and connects it to appropriate relay or returns error
type State ¶
type State interface {
	fmt.Stringer

	// Value returns state value
	Value() int64

	// NumExecs shows how many times worker was invoked
	NumExecs() int64

	// IsActive returns true if worker not Inactive or Stopped
	IsActive() bool
}
State represents worker status and updated time.
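To show how the interface methods relate to the state constants, here is a small self-contained sketch. `sketchState` is a hypothetical stand-in for the package's unexported state type, and the strings returned by `String` are illustrative, not the library's actual output.

```go
package main

import "fmt"

// Local mirrors of the state constants documented earlier.
const (
	StateInactive int64 = iota
	StateReady
	StateWorking
	StateInvalid
	StateStopping
	StateStopped
	StateErrored
)

// sketchState is a hypothetical type with the same method set as State.
type sketchState struct {
	value    int64
	numExecs int64
}

// String returns an illustrative name for the state value.
func (s sketchState) String() string {
	names := []string{"inactive", "ready", "working", "invalid", "stopping", "stopped", "errored"}
	if s.value < 0 || s.value >= int64(len(names)) {
		return "undefined"
	}
	return names[s.value]
}

func (s sketchState) Value() int64    { return s.value }
func (s sketchState) NumExecs() int64 { return s.numExecs }

// IsActive follows the comment above: true unless Inactive or Stopped.
func (s sketchState) IsActive() bool {
	return s.value != StateInactive && s.value != StateStopped
}

func main() {
	s := sketchState{value: StateWorking, numExecs: 3}
	fmt.Println(s.String(), s.Value(), s.NumExecs(), s.IsActive())
}
```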
type StaticPool ¶ added in v1.0.0
type StaticPool struct {
// contains filtered or unexported fields
}
StaticPool controls worker creation, destruction and task routing. The pool uses a fixed number of workers.
func NewPool ¶
NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.
func (*StaticPool) Config ¶ added in v1.0.0
func (p *StaticPool) Config() Config
Config returns associated pool configuration. Immutable.
func (*StaticPool) Destroy ¶ added in v1.0.0
func (p *StaticPool) Destroy()
Destroy all underlying workers (but let them complete the task).
func (*StaticPool) Exec ¶ added in v1.0.0
func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error)
Exec one task with given payload and context, returns result or error.
func (*StaticPool) Listen ¶ added in v1.0.0
func (p *StaticPool) Listen(l func(event int, ctx interface{}))
Listen attaches pool event controller.
func (*StaticPool) Remove ¶ added in v1.4.0
func (p *StaticPool) Remove(w *Worker, err error) bool
Remove forces pool to remove specific worker.
func (*StaticPool) Workers ¶ added in v1.0.0
func (p *StaticPool) Workers() (workers []*Worker)
Workers returns worker list associated with the pool.
type Worker ¶
type Worker struct {
	// Pid of the process, points to Pid of underlying process and
	// can be nil while process is not started.
	Pid *int

	// Created indicates at what time worker has been created.
	Created time.Time
	// contains filtered or unexported fields
}
Worker - supervised process with api over goridge.Relay.
func (*Worker) Exec ¶
Exec sends payload to worker, executes it and returns result or error. Make sure to handle worker.Wait() to gather worker level errors. Method might return JobError indicating issue with payload.
func (*Worker) Kill ¶
Kill kills the underlying process. Make sure to call Wait() to gather the error log from stderr. Does not wait for process completion!
func (*Worker) State ¶
State returns a receive-only worker state object. The state can be used to safely access the worker status, the time when the status changed, and the number of worker executions.
func (*Worker) Stop ¶
Stop sends soft termination command to the worker and waits for process completion.
type WorkerError ¶ added in v1.0.0
WorkerError is worker related error
func (WorkerError) Error ¶ added in v1.0.0
func (e WorkerError) Error() string
Error converts error context to string