roadrunner

package module
v1.9.1
Published: Dec 22, 2020 License: MIT Imports: 17 Imported by: 9

README

RoadRunner

RoadRunner is an open-source (MIT licensed), high-performance PHP application server, load balancer, and process manager. It can run as a service, and its functionality can be extended on a per-project basis.

RoadRunner includes a PSR-7/PSR-17 compatible HTTP and HTTP/2 server and can replace the classic Nginx+FPM setup, with much greater performance and flexibility.

Official Website | Documentation

Features:

  • Production-ready
  • PCI DSS compliant
  • PSR-7 HTTP server (file uploads, error handling, static files, hot reload, middlewares, event listeners)
  • HTTPS and HTTP/2 support (including HTTP/2 Push, H2C)
  • Fully customizable server, FastCGI support
  • Flexible environment configuration
  • No external PHP dependencies (64bit version required), drop-in (based on Goridge)
  • Load balancer, process manager and task pipeline
  • Frontend agnostic (Queue, PSR-7, GRPC, etc)
  • Integrated metrics (Prometheus)
  • Works over TCP, UNIX sockets and standard pipes
  • Automatic worker replacement and safe PHP process destruction
  • Worker create/allocate/destroy timeouts
  • Max jobs per worker
  • Worker lifecycle management (controller)
    • maxMemory (graceful stop)
    • TTL (graceful stop)
    • idleTTL (graceful stop)
    • execTTL (brute, max_execution_time)
  • Payload context and body
  • Protocol, worker and job level error management (including PHP errors)
  • Very fast (~250k rpc calls per second on Ryzen 1700X using 16 threads)
  • Integrations with Symfony, Laravel, Slim, CakePHP, Zend Expressive
  • Application server for Spiral
  • Automatic reloading on file changes
  • Works on Windows (Unix sockets (AF_UNIX) supported on Windows 10)

Installation:

To install:

$ composer require spiral/roadrunner
$ ./vendor/bin/rr get-binary

To obtain the RoadRunner binary, you can also use our Docker image: spiralscout/roadrunner:X.X.X (see the image's documentation for more information about the image and its tags).

Extensions:

  • spiral/jobs
  • spiral/php-grpc
  • spiral/broadcast
  • spiral/broadcast-ws

Example:

<?php
// worker.php
ini_set('display_errors', 'stderr');
include "vendor/autoload.php";

$relay = new Spiral\Goridge\StreamRelay(STDIN, STDOUT);
$psr7 = new Spiral\RoadRunner\PSR7Client(new Spiral\RoadRunner\Worker($relay));

while ($req = $psr7->acceptRequest()) {
    try {
        $resp = new \Zend\Diactoros\Response();
        $resp->getBody()->write("hello world");

        $psr7->respond($resp);
    } catch (\Throwable $e) {
        $psr7->getWorker()->error((string)$e);
    }
}

Configuration can be placed in a .rr.yaml file (full sample):

http:
  address:         0.0.0.0:8080
  workers.command: "php worker.php"

Read more in Documentation.

Run:

To run application server:

$ ./rr serve -v -d

License:

The MIT License (MIT). Please see LICENSE for more information. Maintained by Spiral Scout.

Documentation

Constants

const (
	// EventStderrOutput - is triggered when worker sends data into stderr. The context
	// is error message ([]byte).
	EventStderrOutput = 1900

	// WaitDuration - for how long error buffer should attempt to aggregate error messages
	// before merging output together since lastError update (required to keep error update together).
	WaitDuration = 100 * time.Millisecond
)
const (
	// EventWorkerConstruct thrown when new worker is spawned.
	EventWorkerConstruct = iota + 100

	// EventWorkerDestruct thrown after worker destruction.
	EventWorkerDestruct

	// EventWorkerKill thrown after worker has been forcefully killed.
	EventWorkerKill

	// EventWorkerError thrown when any worker related event happens (passed with WorkerError).
	EventWorkerError

	// EventWorkerDead thrown when worker stops working for any reason.
	EventWorkerDead

	// EventPoolError caused on pool wide errors
	EventPoolError
)
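
The worker events above are plain integers that are delivered to a listener of the form func(event int, ctx interface{}), the shape accepted by Pool.Listen. The sketch below redeclares the constants locally instead of importing the package, and the describe helper is hypothetical. It only illustrates how iota + 100 numbers the events and how a listener might switch over them.

```go
package main

import "fmt"

// Local copies of the documented worker event constants, redeclared so the
// sketch compiles without importing the roadrunner package.
const (
	EventWorkerConstruct = iota + 100
	EventWorkerDestruct
	EventWorkerKill
	EventWorkerError
	EventWorkerDead
	EventPoolError
)

// describe is a hypothetical helper that maps an event code to a label.
func describe(event int) string {
	switch event {
	case EventWorkerConstruct:
		return "worker constructed"
	case EventWorkerDestruct:
		return "worker destructed"
	case EventWorkerKill:
		return "worker killed"
	case EventWorkerError:
		return "worker error"
	case EventWorkerDead:
		return "worker dead"
	case EventPoolError:
		return "pool error"
	default:
		return "unknown event"
	}
}

func main() {
	// A listener with the same shape that Pool.Listen accepts.
	listener := func(event int, ctx interface{}) {
		fmt.Printf("%d: %s (%v)\n", event, describe(event), ctx)
	}
	listener(EventWorkerConstruct, "worker #1")
	listener(EventPoolError, "allocate timeout")
}
```

Because the block starts at iota + 100, the six events occupy the codes 100 through 105, which keeps them apart from the server events that start at 200.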
const (
	// EventServerStart triggered when server creates new pool.
	EventServerStart = iota + 200

	// EventServerStop triggered when server is stopped.
	EventServerStop

	// EventServerFailure triggered when server is unable to replace dead pool.
	EventServerFailure

	// EventPoolConstruct triggered when server creates new pool.
	EventPoolConstruct

	// EventPoolDestruct triggered when server destroys existing pool.
	EventPoolDestruct
)
const (
	// StateInactive - no associated process
	StateInactive int64 = iota

	// StateReady - ready for job.
	StateReady

	// StateWorking - working on given payload.
	StateWorking

	// StateInvalid - indicates that worker is being disabled and will be removed.
	StateInvalid

	// StateStopping - process is being softly stopped.
	StateStopping

	// StateStopped - process has been terminated.
	StateStopped

	// StateErrored - error state (can't be used).
	StateErrored
)
const (
	// StopRequest can be sent by worker to indicate that restart is required.
	StopRequest = "{\"stop\":true}"
)
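
StopRequest is a small JSON document. How the pool itself recognizes it (which part of the payload it inspects, and whether it compares bytes or parses JSON) is not stated here, so the following is only a sketch under the assumption that the receiver parses the message. The isStopRequest helper is hypothetical and not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copy of the documented constant.
const StopRequest = "{\"stop\":true}"

// isStopRequest is a hypothetical helper: it reports whether data is a JSON
// object whose "stop" field is true.
func isStopRequest(data []byte) bool {
	var msg struct {
		Stop bool `json:"stop"`
	}
	if err := json.Unmarshal(data, &msg); err != nil {
		return false
	}
	return msg.Stop
}

func main() {
	fmt.Println(isStopRequest([]byte(StopRequest)))
	fmt.Println(isStopRequest([]byte("hello world")))
}
```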

Variables

This section is empty.

Functions

This section is empty.

Types

type Attacher added in v1.6.3

type Attacher interface {
	// Attach attaches controller to the service.
	Attach(c Controller)
}

Attacher defines the ability to attach rr controller.

type CommandProducer added in v1.5.3

type CommandProducer func(cfg *ServerConfig) func() *exec.Cmd

CommandProducer can produce commands.

type Config

type Config struct {
	// NumWorkers defines how many sub-processes can be run at once. This value
	// might be doubled by Swapper during hot-swap.
	NumWorkers int64

	// MaxJobs defines how many executions are allowed for the worker until
	// its destruction. Set 1 to create a new process for each new task, 0 to let
	// the worker handle as many tasks as it can.
	MaxJobs int64

	// AllocateTimeout defines for how long pool will be waiting for a worker to
	// be freed to handle the task.
	AllocateTimeout time.Duration

	// DestroyTimeout defines for how long pool should be waiting for worker to
	// properly stop, if timeout reached worker will be killed.
	DestroyTimeout time.Duration
}

Config defines basic behaviour of worker creation and handling process.

func (*Config) InitDefaults added in v1.2.2

func (cfg *Config) InitDefaults() error

InitDefaults initializes a blank config with a pre-defined set of default values.

func (*Config) Valid

func (cfg *Config) Valid() error

Valid returns an error if the config is not valid.
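
The rules that Valid applies are not listed on this page. As a minimal sketch of how a pool configuration might be filled in and checked, the example below uses a local stand-in for Config and a hypothetical validate function. The specific rules (positive worker count and timeouts) are assumptions, not the package's documented behaviour.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Config mirrors the documented fields; it is a local stand-in, not the
// package's own type.
type Config struct {
	NumWorkers      int64
	MaxJobs         int64
	AllocateTimeout time.Duration
	DestroyTimeout  time.Duration
}

// validate is a hypothetical check; the real Valid method may apply
// different rules.
func validate(cfg Config) error {
	if cfg.NumWorkers <= 0 {
		return errors.New("NumWorkers must be positive")
	}
	if cfg.AllocateTimeout <= 0 {
		return errors.New("AllocateTimeout must be positive")
	}
	if cfg.DestroyTimeout <= 0 {
		return errors.New("DestroyTimeout must be positive")
	}
	return nil
}

func main() {
	cfg := Config{
		NumWorkers:      4,
		MaxJobs:         0, // 0 lets each worker handle as many tasks as it can
		AllocateTimeout: time.Minute,
		DestroyTimeout:  time.Minute,
	}
	fmt.Println(validate(cfg))      // a filled-in config
	fmt.Println(validate(Config{})) // a blank config
}
```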

type Controllable added in v1.6.3

type Controllable interface {
	// Server represents RR server
	Server() *Server
}

Controllable defines the ability to expose the underlying RR server.

type Controller added in v1.4.0

type Controller interface {
	// Lock controller on given pool instance.
	Attach(p Pool) Controller

	// Detach pool watching.
	Detach()
}

Controller observes pool state and decides if any worker must be destroyed.

type Factory

type Factory interface {
	// SpawnWorker creates new worker process based on given command.
	// Process must not be started.
	SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)

	// Close the factory and underlying connections.
	Close() error
}

Factory is responsible for wrapping the given command into a task worker.

type JobError

type JobError []byte

JobError is a job-level error (no worker halt); it wraps the error context.

func (JobError) Error

func (je JobError) Error() string

Error converts error context to string
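
Because JobError is a distinct error type, a caller can tell job-level failures apart from other errors. The sketch below redeclares the type locally so it runs without the package; run and isJobError are hypothetical helpers, and errors.As is the standard library call for matching an error type through wrapping.

```go
package main

import (
	"errors"
	"fmt"
)

// JobError mirrors the documented type: a byte slice carrying the error context.
type JobError []byte

// Error converts the error context to a string.
func (je JobError) Error() string { return string(je) }

// run is a hypothetical task that fails at the job level.
func run() error {
	return JobError("invalid payload")
}

// isJobError reports whether err is, or wraps, a JobError.
func isJobError(err error) bool {
	var je JobError
	return errors.As(err, &je)
}

func main() {
	// Wrapping with %w keeps the JobError discoverable by errors.As.
	err := fmt.Errorf("exec: %w", run())
	fmt.Println(isJobError(err), err)
}
```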

type Payload

type Payload struct {
	// Context represent payload context, might be omitted.
	Context []byte

	// Body contains binary payload to be processed by worker.
	Body []byte
}

Payload carries binary header and body to workers and back to the server.

func (*Payload) String

func (p *Payload) String() string

String returns payload body as string

type PipeFactory

type PipeFactory struct {
}

PipeFactory connects to workers using standard streams (STDIN, STDOUT pipes).

func NewPipeFactory

func NewPipeFactory() *PipeFactory

NewPipeFactory returns new factory instance and starts listening

func (*PipeFactory) Close added in v1.0.0

func (f *PipeFactory) Close() error

Close the factory.

func (*PipeFactory) SpawnWorker

func (f *PipeFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)

SpawnWorker creates new worker and connects it to goridge relay, method Wait() must be handled on level above.

type Pool

type Pool interface {
	// Listen all caused events to attached controller.
	Listen(l func(event int, ctx interface{}))

	// Exec one task with given payload and context, returns result or error.
	Exec(rqs *Payload) (rsp *Payload, err error)

	// Workers returns worker list associated with the pool.
	Workers() (workers []*Worker)

	// Remove forces pool to remove specific worker. Returns true if this is the first remove request on given worker.
	Remove(w *Worker, err error) bool

	// Destroy all underlying workers (but let them complete the task).
	Destroy()
}

Pool manages a set of inner worker processes.

type Server added in v1.0.0

type Server struct {
	// contains filtered or unexported fields
}

Server manages pool creation and swapping.

func NewServer added in v1.0.0

func NewServer(cfg *ServerConfig) *Server

NewServer creates a new server. Make sure to call Start before use.

func (*Server) Attach added in v1.4.0

func (s *Server) Attach(c Controller)

Attach attaches worker controller.

func (*Server) Exec added in v1.0.0

func (s *Server) Exec(rqs *Payload) (rsp *Payload, err error)

Exec one task with given payload and context, returns result or error.

func (*Server) Listen added in v1.0.0

func (s *Server) Listen(l func(event int, ctx interface{}))

Listen attaches server event controller.

func (*Server) Pool added in v1.0.0

func (s *Server) Pool() Pool

Pool returns the active pool, if any.

func (*Server) Reconfigure added in v1.0.0

func (s *Server) Reconfigure(cfg *ServerConfig) error

Reconfigure re-configures the underlying pool and destroys its previous version, if any. Reconfigure ignores factory and relay settings.

func (*Server) Reset added in v1.0.0

func (s *Server) Reset() error

Reset resets the state of the underlying pool and rebuilds all of its workers.

func (*Server) Start added in v1.0.0

func (s *Server) Start() (err error)

Start underlying worker pool, configure factory and command provider.

func (*Server) Stop added in v1.0.0

func (s *Server) Stop()

Stop underlying worker pool and close the factory.

func (*Server) Workers added in v1.0.0

func (s *Server) Workers() (workers []*Worker)

Workers returns worker list associated with the server pool.

type ServerConfig added in v1.0.0

type ServerConfig struct {
	// Command includes command strings with all the parameters, example: "php worker.php pipes".
	Command string

	// User under which process will be started
	User string

	// CommandProducer overwrites
	CommandProducer CommandProducer

	// Relay defines connection method and factory to be used to connect to workers:
	// "pipes", "tcp://:6001", "unix://rr.sock"
	// This config section must not change on re-configuration.
	Relay string

	// RelayTimeout defines for how long socket factory will be waiting for worker connection. This config section
	// must not change on re-configuration.
	RelayTimeout time.Duration

	// Pool defines worker pool configuration: number of workers, timeouts, etc. This config section might change
	// while server is running.
	Pool *Config
	// contains filtered or unexported fields
}

ServerConfig combines factory, pool and cmd configurations.

func (*ServerConfig) Differs added in v1.0.0

func (cfg *ServerConfig) Differs(new *ServerConfig) bool

Differs returns true if configuration has changed but ignores pool or cmd changes.
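
The field comments on ServerConfig state that Relay and RelayTimeout must not change on re-configuration, which suggests these are the settings Differs looks at. The sketch below assumes exactly that; the trimmed struct and the differs function are local, hypothetical stand-ins rather than the package's implementation.

```go
package main

import (
	"fmt"
	"time"
)

// ServerConfig is a trimmed local stand-in with only the fields used here.
type ServerConfig struct {
	Command      string
	Relay        string
	RelayTimeout time.Duration
}

// differs is a hypothetical re-implementation: it compares only the relay
// settings and deliberately ignores Command.
func differs(old, updated *ServerConfig) bool {
	return old.Relay != updated.Relay || old.RelayTimeout != updated.RelayTimeout
}

func main() {
	a := &ServerConfig{Command: "php worker.php", Relay: "pipes", RelayTimeout: time.Minute}
	b := &ServerConfig{Command: "php other.php", Relay: "pipes", RelayTimeout: time.Minute}
	c := &ServerConfig{Command: "php worker.php", Relay: "tcp://:6001", RelayTimeout: time.Minute}

	fmt.Println(differs(a, b)) // only the command changed
	fmt.Println(differs(a, c)) // the relay changed
}
```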

func (*ServerConfig) GetEnv added in v1.5.3

func (cfg *ServerConfig) GetEnv() (env []string)

GetEnv must return list of env variables.

func (*ServerConfig) InitDefaults added in v1.2.2

func (cfg *ServerConfig) InitDefaults() error

InitDefaults sets missing values to their default values.

func (*ServerConfig) SetEnv added in v1.1.1

func (cfg *ServerConfig) SetEnv(k, v string)

SetEnv sets new environment variable. Value is automatically uppercase-d.

func (*ServerConfig) UpscaleDurations added in v1.2.0

func (cfg *ServerConfig) UpscaleDurations()

UpscaleDurations converts duration values from nanoseconds to seconds.
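
A bare integer such as 60 decoded into a time.Duration is interpreted as 60 nanoseconds, which is presumably the situation this method corrects. As a rough sketch, assuming that any value below one microsecond is really a second count, the conversion could look as follows. The upscale helper and its threshold are assumptions and are not taken from the package.

```go
package main

import (
	"fmt"
	"time"
)

// upscale is a hypothetical helper: values below one microsecond are assumed
// to be bare second counts that were decoded as nanoseconds.
func upscale(d time.Duration) time.Duration {
	if d < time.Microsecond {
		return time.Second * time.Duration(d.Nanoseconds())
	}
	return d
}

func main() {
	fmt.Println(upscale(60))              // a bare 60 read from a config file
	fmt.Println(upscale(2 * time.Second)) // already a real duration, left as-is
}
```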

type SocketFactory

type SocketFactory struct {
	// contains filtered or unexported fields
}

SocketFactory connects to external workers using socket server.

func NewSocketFactory

func NewSocketFactory(ls net.Listener, tout time.Duration) *SocketFactory

NewSocketFactory returns a SocketFactory attached to the given socket listener ls. tout specifies how long the factory should wait for an incoming relay connection.

func (*SocketFactory) Close added in v1.0.0

func (f *SocketFactory) Close() error

Close socket factory and underlying socket connection.

func (*SocketFactory) SpawnWorker

func (f *SocketFactory) SpawnWorker(cmd *exec.Cmd) (w *Worker, err error)

SpawnWorker creates worker and connects it to appropriate relay or returns error

type State

type State interface {
	fmt.Stringer

	// Value returns state value
	Value() int64

	// NumExecs shows how many times worker was invoked
	NumExecs() int64

	// IsActive returns true if worker not Inactive or Stopped
	IsActive() bool
}

State represents worker status and updated time.
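
To make the interface concrete, here is a self-contained sketch of one possible implementation. The state struct, its atomic fields, and the string labels are all hypothetical. IsActive follows the interface comment as written (not Inactive and not Stopped), and the package's own implementation may be stricter.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Local copies of the documented state constants.
const (
	StateInactive int64 = iota
	StateReady
	StateWorking
	StateInvalid
	StateStopping
	StateStopped
	StateErrored
)

// State mirrors the documented interface.
type State interface {
	fmt.Stringer
	Value() int64
	NumExecs() int64
	IsActive() bool
}

// state is a hypothetical implementation backed by atomically read counters.
type state struct {
	value    int64
	numExecs int64
}

func (s *state) Value() int64    { return atomic.LoadInt64(&s.value) }
func (s *state) NumExecs() int64 { return atomic.LoadInt64(&s.numExecs) }

// IsActive follows the interface comment: not Inactive and not Stopped.
func (s *state) IsActive() bool {
	v := s.Value()
	return v != StateInactive && v != StateStopped
}

// String uses illustrative labels; the package's own labels may differ.
func (s *state) String() string {
	names := []string{"inactive", "ready", "working", "invalid", "stopping", "stopped", "errored"}
	if v := s.Value(); v >= 0 && v < int64(len(names)) {
		return names[v]
	}
	return "undefined"
}

func main() {
	var s State = &state{value: StateReady}
	fmt.Println(s, s.IsActive(), s.NumExecs())
}
```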

type StaticPool added in v1.0.0

type StaticPool struct {
	// contains filtered or unexported fields
}

StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of workers.

func NewPool

func NewPool(cmd func() *exec.Cmd, factory Factory, cfg Config) (*StaticPool, error)

NewPool creates new worker pool and task multiplexer. StaticPool will initiate with one worker.

func (*StaticPool) Config added in v1.0.0

func (p *StaticPool) Config() Config

Config returns associated pool configuration. Immutable.

func (*StaticPool) Destroy added in v1.0.0

func (p *StaticPool) Destroy()

Destroy all underlying workers (but let them complete the task).

func (*StaticPool) Exec added in v1.0.0

func (p *StaticPool) Exec(rqs *Payload) (rsp *Payload, err error)

Exec one task with given payload and context, returns result or error.

func (*StaticPool) Listen added in v1.0.0

func (p *StaticPool) Listen(l func(event int, ctx interface{}))

Listen attaches pool event controller.

func (*StaticPool) Remove added in v1.4.0

func (p *StaticPool) Remove(w *Worker, err error) bool

Remove forces pool to remove specific worker.

func (*StaticPool) Workers added in v1.0.0

func (p *StaticPool) Workers() (workers []*Worker)

Workers returns worker list associated with the pool.

type Worker

type Worker struct {
	// Pid of the process, points to Pid of underlying process and
	// can be nil while process is not started.
	Pid *int

	// Created indicates at what time worker has been created.
	Created time.Time
	// contains filtered or unexported fields
}

Worker - supervised process with api over goridge.Relay.

func (*Worker) Exec

func (w *Worker) Exec(rqs *Payload) (rsp *Payload, err error)

Exec sends payload to worker, executes it and returns result or error. Make sure to handle worker.Wait() to gather worker level errors. Method might return JobError indicating issue with payload.

func (*Worker) Kill

func (w *Worker) Kill() error

Kill kills the underlying process. Make sure to call Wait() to gather the error log from stderr. Does not wait for process completion!

func (*Worker) State

func (w *Worker) State() State

State returns a read-only worker state object. The state can be used to safely access the worker status, the time when the status changed, and the number of worker executions.

func (*Worker) Stop

func (w *Worker) Stop() error

Stop sends soft termination command to the worker and waits for process completion.

func (*Worker) String

func (w *Worker) String() string

String returns worker description.

func (*Worker) Wait

func (w *Worker) Wait() error

Wait must be called once for each worker. The call is released once the worker completes and returns the process error (if any). If stderr output is present, its value is wrapped as WorkerError. The method returns an error code if the PHP process fails to find or start the script.

type WorkerError added in v1.0.0

type WorkerError struct {
	// Worker
	Worker *Worker

	// Caused error
	Caused error
}

WorkerError is worker related error

func (WorkerError) Error added in v1.0.0

func (e WorkerError) Error() string

Error converts error context to string

Directories

Path Synopsis
cmd
rr
env
rpc
