roadrunner

package module
v2.0.0-alpha11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 17, 2020 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
// Worker lifecycle and pool event codes. Values start at 100 (iota + 100).
const (
	// EventWorkerConstruct is thrown when a new worker is spawned.
	EventWorkerConstruct = iota + 100

	// EventWorkerDestruct is thrown after worker destruction.
	EventWorkerDestruct

	// EventWorkerKill is thrown after a worker has been forcefully killed.
	EventWorkerKill

	// EventWorkerEvent is thrown when any worker related event happens (passed with WorkerError).
	EventWorkerEvent

	// EventWorkerDead is thrown when a worker stops working for any reason.
	EventWorkerDead

	// EventPoolError is caused by pool wide errors.
	EventPoolError
)
View Source
// Limit-related event codes (memory and TTL limits). Values start at 8000 (iota + 8000).
const (
	// EventMaxMemory is caused when a worker consumes more memory than allowed.
	EventMaxMemory = iota + 8000

	// EventTTL is thrown when a worker is removed due to its TTL being reached. Context is rr.WorkerError.
	EventTTL

	// EventIdleTTL is triggered when a worker spends too much time at rest.
	EventIdleTTL

	// EventExecTTL is triggered when a worker spends too much time doing the task (max_execution_time).
	EventExecTTL
)
View Source
// Worker state values (int64), numbered from 0 in declaration order.
const (
	// StateInactive - no associated process.
	StateInactive int64 = iota

	// StateReady - ready for job.
	StateReady

	// StateWorking - working on given payload.
	StateWorking

	// StateInvalid - indicates that WorkerProcess is being disabled and will be removed.
	StateInvalid

	// StateStopping - process is being softly stopped.
	StateStopping

	// StateKilling - NOTE(review): undocumented; presumably the process is being forcefully killed — confirm.
	StateKilling
	// StateKilled - NOTE(review): undocumented; presumably the process has been forcefully killed — confirm.
	StateKilled

	// StateDestroyed - state of worker when there is no need to allocate a new one.
	StateDestroyed

	// StateStopped - process has been terminated.
	StateStopped

	// StateErrored - error state (can't be used).
	StateErrored

	// StateRemove - NOTE(review): undocumented; presumably marks the worker for removal — confirm.
	StateRemove
)
View Source
// Worker process event codes (int64). Values start at 200 (iota + 200).
const (
	// EventWorkerError triggered after WorkerProcess error (presumably; the original
	// comment is truncated — confirm). Expect payload to be error.
	EventWorkerError int64 = iota + 200

	// EventWorkerLog triggered on every write to WorkerProcess StdErr pipe (batched). Expect payload to be []byte string.
	EventWorkerLog

	// EventWorkerWaitDone triggered when worker exits from process Wait.
	EventWorkerWaitDone

	// NOTE(review): the three events below carry no documentation; their exact
	// trigger conditions and payload types need to be confirmed against the emitters.
	EventWorkerBufferClosed

	EventRelayCloseError

	EventWorkerProcessError
)

EventWorkerKill thrown after WorkerProcess is being forcefully killed.

View Source
// MB is the number of bytes in one megabyte, computed as 1024 * 1024.
const MB = 1024 * 1024
View Source
const (
	// StopRequest can be sent by a worker to indicate that a restart is required.
	// The value is a JSON object with the single key "stop" set to true.
	StopRequest = "{\"stop\":true}"
)
View Source
const (
	// WaitDuration - for how long the error buffer should attempt to aggregate error messages
	// before merging the output together, counted since the lastError update (required to keep
	// error updates together). The value is 100 milliseconds.
	WaitDuration = 100 * time.Millisecond
)

Variables

View Source
// EmptyPayload is a zero-value Payload (nil Context and nil Body).
var EmptyPayload = Payload{}
View Source
// ErrWatcherStopped is a sentinel error with the message "watcher stopped".
// NOTE(review): presumably returned by the worker watcher after it has been stopped — confirm.
var ErrWatcherStopped = errors.New("watcher stopped")

Functions

This section is empty.

Types

type Config

type Config struct {
	// NumWorkers defines how many sub-processes can be run at once. This value
	// might be doubled by Swapper while hot-swap.
	NumWorkers int64

	// MaxJobs defines how many executions are allowed for the worker until
	// its destruction. Set 1 to create a new process for each new task, 0 to let
	// the worker handle as many tasks as it can.
	MaxJobs int64

	// AllocateTimeout defines for how long the pool will be waiting for a worker to
	// be freed to handle the task.
	AllocateTimeout time.Duration

	// DestroyTimeout defines for how long the pool should be waiting for a worker to
	// properly destroy; if the timeout is reached the worker will be killed.
	DestroyTimeout time.Duration

	// TTL defines maximum time worker is allowed to live.
	// NOTE(review): plain int64, unit is not stated here — confirm (seconds?).
	TTL int64

	// IdleTTL defines maximum duration worker can spend in idle mode.
	// NOTE(review): plain int64, unit is not stated here — confirm (seconds?).
	IdleTTL int64

	// ExecTTL defines maximum lifetime per job.
	ExecTTL time.Duration

	// MaxPoolMemory defines maximum amount of memory allowed for worker. In megabytes.
	// NOTE(review): the wording says "for worker", but the field name and the
	// maxPoolMemory parameter of NewStaticPoolSupervisor suggest this is a
	// pool-wide limit — confirm.
	MaxPoolMemory uint64

	// MaxWorkerMemory is undocumented in the source.
	// NOTE(review): presumably the maximum memory allowed for a single worker,
	// in megabytes like MaxPoolMemory — confirm.
	MaxWorkerMemory uint64
}

Config defines basic behaviour of worker creation and handling process.

func (*Config) InitDefaults

func (cfg *Config) InitDefaults() error

InitDefaults initializes a blank config with a predefined set of default values.

func (*Config) Valid

func (cfg *Config) Valid() error

Valid returns an error if the config is not valid.

type Factory

type Factory interface {
	// SpawnWorkerWithContext creates new WorkerProcess process based on given command.
	// Process must not be started.
	SpawnWorkerWithContext(context.Context, *exec.Cmd) (WorkerBase, error)

	// SpawnWorker is the variant of SpawnWorkerWithContext that takes no context.
	// NOTE(review): presumably otherwise identical in behaviour — confirm.
	SpawnWorker(*exec.Cmd) (WorkerBase, error)

	// Close the factory and underlying connections.
	Close(ctx context.Context) error
}

Factory is responsible for wrapping the given command into a task WorkerProcess.

func NewSocketServer

func NewSocketServer(ls net.Listener, tout time.Duration) Factory

NewSocketServer returns a SocketFactory attached to the given socket listener. tout specifies for how long the factory should serve for an incoming relay connection.

type Payload

type Payload struct {
	// Context represents payload context, might be omitted.
	Context []byte

	// Body contains binary payload to be processed by WorkerProcess.
	Body []byte
}

Payload carries binary header and body to stack and back to the server.

func (*Payload) String

func (p *Payload) String() string

String returns payload body as string

type PipeFactory

type PipeFactory struct {
}

PipeFactory connects to stack using standard streams (STDIN, STDOUT pipes).

func NewPipeFactory

func NewPipeFactory() *PipeFactory

todo: review tests

func (*PipeFactory) Close

func (f *PipeFactory) Close(ctx context.Context) error

Close the factory.

func (*PipeFactory) SpawnWorker

func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error)

func (*PipeFactory) SpawnWorkerWithContext

func (f *PipeFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (WorkerBase, error)

SpawnWorkerWithContext creates a new WorkerProcess and connects it to a goridge relay; the Wait() method must be handled on the level above.

type Pool

type Pool interface {
	// Events returns the channel of pool events.
	// ATTENTION, YOU SHOULD CONSUME EVENTS, OTHERWISE POOL WILL BLOCK
	Events() chan PoolEvent

	// Exec one task with given payload and context, returns result or error.
	Exec(ctx context.Context, rqs Payload) (Payload, error)

	// Workers returns worker list associated with the pool.
	Workers() (workers []WorkerBase)

	// RemoveWorker is undocumented in the source.
	// NOTE(review): presumably removes the given worker from the pool — confirm.
	RemoveWorker(ctx context.Context, worker WorkerBase) error

	// Config returns associated pool configuration.
	Config() Config

	// Destroy all underlying stack (but let them complete the task).
	Destroy(ctx context.Context)
}

Pool manages a set of inner worker processes.

func NewPool

func NewPool(ctx context.Context, cmd func() *exec.Cmd, factory Factory, cfg *Config) (Pool, error)

NewPool creates a new worker pool and task multiplexer. StaticPool will initiate with one worker. TODO (carried over from the source comment): think about passing `supervisor Supervisor` and `stack func() (WorkerBase, error)` as parameters.

type PoolEvent

type PoolEvent struct {
	Payload interface{}
}

type ProcessState

type ProcessState struct {
	// Pid contains process id.
	Pid int `json:"pid"`

	// Status of the worker.
	Status string `json:"status"`

	// NumJobs is the number of worker executions (serialized under the JSON key "numExecs").
	NumJobs int64 `json:"numExecs"`

	// Created is unix nano timestamp of worker creation time.
	Created int64 `json:"created"`

	// MemoryUsage holds the information about worker memory usage in bytes.
	// Values might vary for different operating systems and based on RSS.
	MemoryUsage uint64 `json:"memoryUsage"`
}

ProcessState provides information about specific worker.

func PoolState

func PoolState(pool Pool) ([]ProcessState, error)

PoolState returns the list of all worker states of the given pool.

func WorkerProcessState

func WorkerProcessState(w WorkerBase) (ProcessState, error)

WorkerProcessState creates new worker state definition.

type SocketFactory

type SocketFactory struct {
	ErrCh chan error
	// contains filtered or unexported fields
}

SocketFactory connects to external stack using socket server.

func (*SocketFactory) Close

func (f *SocketFactory) Close(ctx context.Context) error

Close socket factory and underlying socket connection.

func (*SocketFactory) SpawnWorker

func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (WorkerBase, error)

func (*SocketFactory) SpawnWorkerWithContext

func (f *SocketFactory) SpawnWorkerWithContext(ctx context.Context, cmd *exec.Cmd) (WorkerBase, error)

SpawnWorkerWithContext creates a WorkerProcess and connects it to the appropriate relay, or returns an error.

type SpawnResult

type SpawnResult struct {
	// contains filtered or unexported fields
}

type Stack

type Stack struct {
	// contains filtered or unexported fields
}

func NewWorkersStack

func NewWorkersStack() *Stack

func (*Stack) IsEmpty

func (stack *Stack) IsEmpty() bool

func (*Stack) Pop

func (stack *Stack) Pop() (WorkerBase, bool)

func (*Stack) Push

func (stack *Stack) Push(w WorkerBase)

func (*Stack) Reset

func (stack *Stack) Reset()

type State

type State interface {
	fmt.Stringer

	// Value returns state value
	Value() int64
	// Set assigns a new state value.
	// NOTE(review): presumably one of the State* constants — confirm.
	Set(value int64)

	// NumExecs shows how many times WorkerProcess was invoked
	NumExecs() int64

	// IsActive returns true if WorkerProcess not Inactive or Stopped
	IsActive() bool

	// RegisterExec is undocumented in the source.
	// NOTE(review): presumably increments the counter reported by NumExecs — confirm.
	RegisterExec()

	// SetLastUsed stores the last-used value lu.
	// NOTE(review): unit/format of lu is not stated here (timestamp?) — confirm.
	SetLastUsed(lu uint64)

	// LastUsed returns the last-used value.
	// NOTE(review): presumably the value stored by SetLastUsed — confirm.
	LastUsed() uint64
}

State represents WorkerProcess status and updated time.

type StaticPool

type StaticPool struct {
	// contains filtered or unexported fields
}

StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.

func (*StaticPool) Config

func (p *StaticPool) Config() Config

Config returns associated pool configuration. Immutable.

func (*StaticPool) Destroy

func (p *StaticPool) Destroy(ctx context.Context)

Destroy all underlying stack (but let them to complete the task).

func (*StaticPool) Events

func (p *StaticPool) Events() chan PoolEvent

func (*StaticPool) Exec

func (p *StaticPool) Exec(ctx context.Context, rqs Payload) (Payload, error)

Exec one task with given payload and context, returns result or error.

func (*StaticPool) RemoveWorker

func (p *StaticPool) RemoveWorker(ctx context.Context, wb WorkerBase) error

func (*StaticPool) Workers

func (p *StaticPool) Workers() (workers []WorkerBase)

Workers returns worker list associated with the pool.

type Supervisor

type Supervisor interface {
	Attach(pool Pool)
	StartWatching() error
	StopWatching()
	Detach()
}

func NewStaticPoolSupervisor

func NewStaticPoolSupervisor(maxWorkerMemory, maxPoolMemory, maxTtl, maxIdle, watchTimeout uint64) Supervisor

The arguments are:
  - maxWorkerMemory - maximum memory allowed for a single worker
  - maxPoolMemory - maximum pool memory allowed for a pool of workers
  - maxTtl - maximum TTL for the worker, after which it will be killed and replaced
  - maxIdle - maximum time to live for the worker in the Ready state
  - watchTimeout - time between checks of the workers/pool status

TODO might be just wrap the pool and return ControlledPool with included Pool interface

type SyncWorker

type SyncWorker interface {
	// WorkerBase provides basic functionality for the SyncWorker
	WorkerBase
	// Exec used to execute payload on the SyncWorker
	Exec(ctx context.Context, rqs Payload) (Payload, error)
}

func NewSyncWorker

func NewSyncWorker(w WorkerBase) (SyncWorker, error)

type TaskError

type TaskError []byte

TaskError is job level error (no WorkerProcess halt), wraps at top of error context

func (TaskError) Error

func (te TaskError) Error() string

Error converts error context to string

type WorkerBase

type WorkerBase interface {
	fmt.Stringer

	// Created returns a time.Time.
	// NOTE(review): presumably the worker creation time — confirm.
	Created() time.Time

	// Events returns a receive-only channel of WorkerEvent values.
	Events() <-chan WorkerEvent

	// Pid returns an int64.
	// NOTE(review): presumably the id of the underlying process — confirm.
	Pid() int64

	// State returns receive-only WorkerProcess state object, state can be used to safely access
	// WorkerProcess status, time when status changed and number of WorkerProcess executions.
	State() State

	// Start used to run Cmd and immediately return
	Start() error
	// Wait must be called once for each WorkerProcess, call will be released once WorkerProcess is
	// complete and will return process error (if any), if stderr is present its value
	// will be wrapped as WorkerError. Method will return error code if php process fails
	// to find or Start the script.
	Wait(ctx context.Context) error

	// Stop sends soft termination command to the WorkerProcess and waits for process completion.
	Stop(ctx context.Context) error
	// Kill kills underlying process, make sure to call Wait() func to gather
	// error log from the stderr. Does not wait for process completion!
	Kill(ctx context.Context) error
	// Relay returns the goridge relay attached to the worker
	Relay() goridge.Relay
	// AttachRelay used to attach goridge relay to the worker process
	AttachRelay(rl goridge.Relay)
}

func InitBaseWorker

func InitBaseWorker(cmd *exec.Cmd) (WorkerBase, error)

InitBaseWorker creates new WorkerProcess over given exec.cmd.

type WorkerError

type WorkerError struct {
	// Worker is the worker the error relates to.
	Worker WorkerBase

	// Caused is the underlying error.
	Caused error
}

WorkerError is WorkerProcess related error

func (WorkerError) Error

func (e WorkerError) Error() string

Error converts error context to string

type WorkerEvent

type WorkerEvent struct {
	// Event is the event code.
	// NOTE(review): presumably one of the int64 EventWorker* constants — confirm.
	Event   int64
	// Worker is a WorkerBase value.
	// NOTE(review): presumably the worker that produced the event — confirm.
	Worker  WorkerBase
	// Payload is event-specific data; its concrete type is not fixed by this struct.
	Payload interface{}
}

todo: write comment

type WorkerProcess

type WorkerProcess struct {
	// contains filtered or unexported fields
}

WorkerProcess - supervised process with api over goridge.Relay.

func (*WorkerProcess) AttachRelay

func (w *WorkerProcess) AttachRelay(rl goridge.Relay)

AttachRelay is used to attach a goridge relay to the worker process.

func (*WorkerProcess) Created

func (w *WorkerProcess) Created() time.Time

func (*WorkerProcess) Events

func (w *WorkerProcess) Events() <-chan WorkerEvent

func (*WorkerProcess) Kill

func (w *WorkerProcess) Kill(ctx context.Context) error

Kill kills the underlying process; make sure to call Wait() to gather the error log from stderr. Does not wait for process completion!

func (*WorkerProcess) Pid

func (w *WorkerProcess) Pid() int64

func (*WorkerProcess) Relay

func (w *WorkerProcess) Relay() goridge.Relay

Relay returns the goridge relay attached to the worker.

func (*WorkerProcess) Start

func (w *WorkerProcess) Start() error

func (*WorkerProcess) State

func (w *WorkerProcess) State() State

State returns a receive-only WorkerProcess state object; the state can be used to safely access the WorkerProcess status, the time when the status changed, and the number of WorkerProcess executions.

func (*WorkerProcess) Stop

func (w *WorkerProcess) Stop(ctx context.Context) error

Stop sends soft termination command to the WorkerProcess and waits for process completion.

func (*WorkerProcess) String

func (w *WorkerProcess) String() string

String returns WorkerProcess description. fmt.Stringer interface

func (*WorkerProcess) Wait

func (w *WorkerProcess) Wait(ctx context.Context) error

Wait must be called once for each WorkerProcess. The call will be released once the WorkerProcess is complete and will return the process error (if any); if stderr is present, its value will be wrapped as WorkerError. The method will return an error code if the php process fails to find or Start the script.

type WorkerWatcher

type WorkerWatcher interface {
	// AddToWatch used to add stack to wait its state
	AddToWatch(ctx context.Context, workers []WorkerBase) error
	// GetFreeWorker provides the first free worker
	GetFreeWorker(ctx context.Context) (WorkerBase, error)
	// PushWorker enqueues worker back
	PushWorker(w WorkerBase)
	// AllocateNew used to allocate new worker and put it into the WorkerWatcher
	AllocateNew(ctx context.Context) error
	// Destroy destroys the underlying stack
	Destroy(ctx context.Context)
	// WorkersList returns all stack w/o removing it from internal storage
	WorkersList() []WorkerBase
	// RemoveWorker removes worker from the stack
	RemoveWorker(ctx context.Context, wb WorkerBase) error
}

type WorkersWatcher

type WorkersWatcher struct {
	// contains filtered or unexported fields
}

func NewWorkerWatcher

func NewWorkerWatcher(allocator func(args ...interface{}) (WorkerBase, error), numWorkers int64, events chan PoolEvent) *WorkersWatcher

allocator (the worker create func) can be nil, but in that case dead workers will not be replaced.

func (*WorkersWatcher) AddToWatch

func (ww *WorkersWatcher) AddToWatch(ctx context.Context, workers []WorkerBase) error

func (*WorkersWatcher) AllocateNew

func (ww *WorkersWatcher) AllocateNew(ctx context.Context) error

func (*WorkersWatcher) Destroy

func (ww *WorkersWatcher) Destroy(ctx context.Context)

Destroy all underlying stack (but let them to complete the task)

func (*WorkersWatcher) GetFreeWorker

func (ww *WorkersWatcher) GetFreeWorker(ctx context.Context) (WorkerBase, error)

func (*WorkersWatcher) PushWorker

func (ww *WorkersWatcher) PushWorker(w WorkerBase)

O(1) operation

func (*WorkersWatcher) ReduceWorkersCount

func (ww *WorkersWatcher) ReduceWorkersCount()

func (*WorkersWatcher) RemoveWorker

func (ww *WorkersWatcher) RemoveWorker(ctx context.Context, wb WorkerBase) error

func (*WorkersWatcher) WorkersList

func (ww *WorkersWatcher) WorkersList() []WorkerBase

Warning, this is O(n) operation

Directories

Path Synopsis
plugins

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL