Documentation ¶
Index ¶
- Constants
- Variables
- type After
- type Allocator
- type Before
- type ErrorEncoder
- type Factory
- type Payload
- type PipeFactory
- type Pool
- type PoolConfig
- type PoolEvent
- type PoolOptions
- type ProcessState
- type SocketFactory
- type SpawnResult
- type Stack
- type State
- type StaticPool
- func (sp *StaticPool) AddListener(listener util.EventListener)
- func (sp *StaticPool) Destroy(ctx context.Context)
- func (sp *StaticPool) Exec(p Payload) (Payload, error)
- func (sp *StaticPool) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error)
- func (sp *StaticPool) GetConfig() PoolConfig
- func (sp *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error
- func (sp *StaticPool) Workers() (workers []WorkerBase)
- type SupervisedPool
- type SupervisorConfig
- type SyncWorker
- type WorkerBase
- type WorkerEvent
- type WorkerProcess
- func (w *WorkerProcess) AddListener(listener util.EventListener)
- func (w *WorkerProcess) AttachRelay(rl goridge.Relay)
- func (w *WorkerProcess) Created() time.Time
- func (w *WorkerProcess) Kill() error
- func (w *WorkerProcess) Pid() int64
- func (w *WorkerProcess) Relay() goridge.Relay
- func (w *WorkerProcess) Start() error
- func (w *WorkerProcess) State() State
- func (w *WorkerProcess) Stop(ctx context.Context) error
- func (w *WorkerProcess) String() string
- func (w *WorkerProcess) Wait(ctx context.Context) error
- type WorkerWatcher
Constants ¶
const ( // EventWorkerConstruct thrown when new worker is spawned. EventWorkerConstruct = iota + 7800 // EventWorkerDestruct thrown after worker destruction. EventWorkerDestruct // EventPoolError caused on pool wide errors. EventPoolError // EventSupervisorError triggered when supervisor can not complete work. EventSupervisorError // todo: EventMaxMemory caused when worker consumes more memory than allowed. EventMaxMemory // todo: EventTTL thrown when worker is removed due TTL being reached. Context is rr.WorkerError EventTTL // todo: EventIdleTTL triggered when worker spends too much time at rest. EventIdleTTL // todo: EventExecTTL triggered when worker spends too much time doing the task (max_execution_time). EventExecTTL )
const ( // StateInactive - no associated process StateInactive int64 = iota // StateReady - ready for job. StateReady // StateWorking - working on given payload. StateWorking // StateInvalid - indicates that WorkerProcess is being disabled and will be removed. StateInvalid // StateStopping - process is being softly stopped. StateStopping StateKilling // State of worker, when no need to allocate new one StateDestroyed // StateStopped - process has been terminated. StateStopped // StateErrored - error state (can't be used). StateErrored StateRemove )
const ( // EventWorkerError triggered after WorkerProcess. Except payload to be error. EventWorkerError int64 = iota + 200 // EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Except payload to be []byte string. EventWorkerLog )
EventWorkerKill thrown after WorkerProcess is being forcefully killed.
const MB = 1024 * 1024
const StopRequest = "{\"stop\":true}"
StopRequest can be sent by worker to indicate that restart is required.
const ( // WaitDuration - for how long error buffer should attempt to aggregate error messages // before merging output together since lastError update (required to keep error update together). WaitDuration = 25 * time.Millisecond )
Variables ¶
var EmptyPayload = Payload{}
Functions ¶
This section is empty.
Types ¶
type Allocator ¶
type Allocator func() (WorkerBase, error)
Allocator is responsible for worker allocation in the pool
type ErrorEncoder ¶
type ErrorEncoder func(err error, w WorkerBase) (Payload, error)
ErrorEncoder encode error or make a decision based on the error type
type Factory ¶
type Factory interface { // SpawnWorker creates new WorkerProcess process based on given command. // Process must not be started. SpawnWorkerWithContext(context.Context, *exec.Cmd) (WorkerBase, error) SpawnWorker(*exec.Cmd) (WorkerBase, error) // Close the factory and underlying connections. Close(ctx context.Context) error }
Factory is responsible of wrapping given command into tasks WorkerProcess.
type Payload ¶
type Payload struct { // Context represent payload context, might be omitted. Context []byte // body contains binary payload to be processed by WorkerProcess. Body []byte }
Payload carries binary header and body to stack and back to the server.
type PipeFactory ¶
type PipeFactory struct { }
PipeFactory connects to stack using standard streams (STDIN, STDOUT pipes).
func (*PipeFactory) Close ¶
func (f *PipeFactory) Close(ctx context.Context) error
Close the factory.
func (*PipeFactory) SpawnWorker ¶
func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error)
func (*PipeFactory) SpawnWorkerWithContext ¶
func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (WorkerBase, error)
SpawnWorker creates new WorkerProcess and connects it to goridge relay, method Wait() must be handled on level above.
type Pool ¶
type Pool interface { // AddListener connects event listener to the pool. AddListener(listener util.EventListener) // GetConfig returns pool configuration. GetConfig() PoolConfig // Exec Exec(rqs Payload) (Payload, error) ExecWithContext(ctx context.Context, rqs Payload) (Payload, error) // Workers returns worker list associated with the pool. Workers() (workers []WorkerBase) // Remove worker from the pool. RemoveWorker(ctx context.Context, worker WorkerBase) error // Destroy all underlying stack (but let them to complete the task). Destroy(ctx context.Context) }
Pool managed set of inner worker processes.
type PoolConfig ¶
type PoolConfig struct { // Debug flag creates new fresh worker before every request. Debug bool // NumWorkers defines how many sub-processes can be run at once. This value // might be doubled by Swapper while hot-swap. Defaults to number of CPU cores. NumWorkers int64 // MaxJobs defines how many executions is allowed for the worker until // it's destruction. set 1 to create new process for each new task, 0 to let // worker handle as many tasks as it can. MaxJobs int64 // AllocateTimeout defines for how long pool will be waiting for a worker to // be freed to handle the task. Defaults to 60s. AllocateTimeout time.Duration // DestroyTimeout defines for how long pool should be waiting for worker to // properly destroy, if timeout reached worker will be killed. Defaults to 60s. DestroyTimeout time.Duration // Supervision config to limit worker and pool memory usage. Supervisor *SupervisorConfig }
Configures the pool behaviour.
func (*PoolConfig) InitDefaults ¶
func (cfg *PoolConfig) InitDefaults()
InitDefaults enables default config values.
type PoolEvent ¶
type PoolEvent struct { // Event type, see below. Event int64 // Payload depends on event type, typically it's worker or error. Payload interface{} }
PoolEvent triggered by pool on different events. Pool as also trigger WorkerEvent in case of log.
type PoolOptions ¶
type PoolOptions func(p *StaticPool)
func PoolAfter ¶
func PoolAfter(after ...After) PoolOptions
func PoolBefore ¶
func PoolBefore(before ...Before) PoolOptions
type ProcessState ¶
type ProcessState struct { // Pid contains process id. Pid int `json:"pid"` // Status of the worker. Status string `json:"status"` // Number of worker executions. NumJobs int64 `json:"numExecs"` // Created is unix nano timestamp of worker creation time. Created int64 `json:"created"` // MemoryUsage holds the information about worker memory usage in bytes. // Values might vary for different operating systems and based on RSS. MemoryUsage uint64 `json:"memoryUsage"` }
ProcessState provides information about specific worker.
func PoolState ¶
func PoolState(pool Pool) ([]ProcessState, error)
ServerState returns list of all worker states of a given rr server.
func WorkerProcessState ¶
func WorkerProcessState(w WorkerBase) (ProcessState, error)
WorkerProcessState creates new worker state definition.
type SocketFactory ¶
type SocketFactory struct { ErrCh chan error // contains filtered or unexported fields }
SocketFactory connects to external stack using socket server.
func (*SocketFactory) Close ¶
func (f *SocketFactory) Close(ctx context.Context) error
Close socket factory and underlying socket connection.
func (*SocketFactory) SpawnWorker ¶
func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error)
func (*SocketFactory) SpawnWorkerWithContext ¶
func (f *SocketFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (WorkerBase, error)
SpawnWorker creates WorkerProcess and connects it to appropriate relay or returns error
type SpawnResult ¶
type SpawnResult struct {
// contains filtered or unexported fields
}
type Stack ¶
type Stack struct {
// contains filtered or unexported fields
}
func NewWorkersStack ¶
func NewWorkersStack() *Stack
func (*Stack) Pop ¶
func (stack *Stack) Pop() (WorkerBase, bool)
func (*Stack) Push ¶
func (stack *Stack) Push(w WorkerBase)
type State ¶
type State interface { fmt.Stringer // Value returns state value Value() int64 Set(value int64) // NumJobs shows how many times WorkerProcess was invoked NumExecs() int64 // IsActive returns true if WorkerProcess not Inactive or Stopped IsActive() bool RegisterExec() SetLastUsed(lu uint64) LastUsed() uint64 }
State represents WorkerProcess status and updated time.
type StaticPool ¶
type StaticPool struct {
// contains filtered or unexported fields
}
StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
func (*StaticPool) AddListener ¶
func (sp *StaticPool) AddListener(listener util.EventListener)
AddListener connects event listener to the pool.
func (*StaticPool) Destroy ¶
func (sp *StaticPool) Destroy(ctx context.Context)
Destroy all underlying stack (but let them to complete the task).
func (*StaticPool) ExecWithContext ¶
func (*StaticPool) GetConfig ¶
func (sp *StaticPool) GetConfig() PoolConfig
PoolConfig returns associated pool configuration. Immutable.
func (*StaticPool) RemoveWorker ¶
func (sp *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error
func (*StaticPool) Workers ¶
func (sp *StaticPool) Workers() (workers []WorkerBase)
Workers returns worker list associated with the pool.
type SupervisedPool ¶
type SupervisedPool interface { Pool // Start used to start watching process for all pool workers Start() }
type SupervisorConfig ¶
type SupervisorConfig struct { // WatchTick defines how often to check the state of worker. WatchTick uint64 // TTL defines maximum time worker is allowed to live. TTL uint64 // IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0. IdleTTL uint64 // ExecTTL defines maximum lifetime per job. ExecTTL uint64 // MaxWorkerMemory limits memory per worker. MaxWorkerMemory uint64 }
func (*SupervisorConfig) InitDefaults ¶
func (cfg *SupervisorConfig) InitDefaults()
InitDefaults enables default config values.
type SyncWorker ¶
type SyncWorker interface { // WorkerBase provides basic functionality for the SyncWorker WorkerBase // Exec used to execute payload on the SyncWorker, there is no TIMEOUTS Exec(rqs Payload) (Payload, error) // ExecWithContext used to handle Exec with TTL ExecWithContext(ctx context.Context, p Payload) (Payload, error) }
func NewSyncWorker ¶
func NewSyncWorker(w WorkerBase) (SyncWorker, error)
type WorkerBase ¶
type WorkerBase interface { fmt.Stringer // Pid returns worker pid. Pid() int64 // Created returns time worker was created at. Created() time.Time // AddListener attaches listener to consume worker events. AddListener(listener util.EventListener) // State return receive-only WorkerProcess state object, state can be used to safely access // WorkerProcess status, time when status changed and number of WorkerProcess executions. State() State // Start used to run Cmd and immediately return Start() error // Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is // complete and will return process error (if any), if stderr is presented it's value // will be wrapped as WorkerError. Method will return error code if php process fails // to find or Start the script. Wait(ctx context.Context) error // Stop sends soft termination command to the WorkerProcess and waits for process completion. Stop(ctx context.Context) error // Kill kills underlying process, make sure to call Wait() func to gather // error log from the stderr. Does not waits for process completion! Kill() error // Relay returns attached to worker goridge relay Relay() goridge.Relay // AttachRelay used to attach goridge relay to the worker process AttachRelay(rl goridge.Relay) }
func InitBaseWorker ¶
func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error)
InitBaseWorker creates new WorkerProcess over given exec.cmd.
type WorkerEvent ¶
type WorkerEvent struct { // Event id, see below. Event int64 // Worker triggered the event. Worker WorkerBase // Event specific payload. Payload interface{} }
WorkerEvent wraps worker events.
type WorkerProcess ¶
type WorkerProcess struct {
// contains filtered or unexported fields
}
WorkerProcess - supervised process with api over goridge.Relay.
func (*WorkerProcess) AddListener ¶
func (w *WorkerProcess) AddListener(listener util.EventListener)
AddListener registers new worker event listener.
func (*WorkerProcess) AttachRelay ¶
func (w *WorkerProcess) AttachRelay(rl goridge.Relay)
State return receive-only WorkerProcess state object, state can be used to safely access WorkerProcess status, time when status changed and number of WorkerProcess executions.
func (*WorkerProcess) Created ¶
func (w *WorkerProcess) Created() time.Time
Created returns time worker was created at.
func (*WorkerProcess) Kill ¶
func (w *WorkerProcess) Kill() error
Kill kills underlying process, make sure to call Wait() func to gather error log from the stderr. Does not waits for process completion!
func (*WorkerProcess) Relay ¶
func (w *WorkerProcess) Relay() goridge.Relay
State return receive-only WorkerProcess state object, state can be used to safely access WorkerProcess status, time when status changed and number of WorkerProcess executions.
func (*WorkerProcess) Start ¶
func (w *WorkerProcess) Start() error
func (*WorkerProcess) State ¶
func (w *WorkerProcess) State() State
State return receive-only WorkerProcess state object, state can be used to safely access WorkerProcess status, time when status changed and number of WorkerProcess executions.
func (*WorkerProcess) Stop ¶
func (w *WorkerProcess) Stop(ctx context.Context) error
Stop sends soft termination command to the WorkerProcess and waits for process completion.
func (*WorkerProcess) String ¶
func (w *WorkerProcess) String() string
String returns WorkerProcess description. fmt.Stringer interface
func (*WorkerProcess) Wait ¶
func (w *WorkerProcess) Wait(ctx context.Context) error
Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is complete and will return process error (if any), if stderr is presented it's value will be wrapped as WorkerError. Method will return error code if php process fails to find or Start the script.
type WorkerWatcher ¶
type WorkerWatcher interface { // AddToWatch used to add stack to wait its state AddToWatch(ctx context.Context, workers []WorkerBase) error // GetFreeWorker provide first free worker GetFreeWorker(ctx context.Context) (WorkerBase, error) // PutWorker enqueues worker back PushWorker(w WorkerBase) // AllocateNew used to allocate new worker and put in into the WorkerWatcher AllocateNew(ctx context.Context) error // Destroy destroys the underlying stack Destroy(ctx context.Context) // WorkersList return all stack w/o removing it from internal storage WorkersList() []WorkerBase // RemoveWorker remove worker from the stack RemoveWorker(ctx context.Context, wb WorkerBase) error }