pool

package
v2.0.0-RC.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 17, 2021 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const MB = 1024 * 1024
View Source
const NSEC_IN_SEC int64 = 1000000000 //nolint:golint,stylecheck

NSEC_IN_SEC nanoseconds in second

View Source
const StopRequest = "{\"stop\":true}"

StopRequest can be sent by worker to indicate that restart is required.

Variables

This section is empty.

Functions

This section is empty.

Types

type Command

type Command func() *exec.Cmd

type Config

type Config struct {
	// Debug flag creates new fresh worker before every request.
	Debug bool

	// NumWorkers defines how many sub-processes can be run at once. This value
	// might be doubled by Swapper while hot-swap. Defaults to number of CPU cores.
	NumWorkers uint64 `mapstructure:"num_workers"`

	// MaxJobs defines how many executions is allowed for the worker until
	// it's destruction. set 1 to create new process for each new task, 0 to let
	// worker handle as many tasks as it can.
	MaxJobs uint64 `mapstructure:"max_jobs"`

	// AllocateTimeout defines for how long pool will be waiting for a worker to
	// be freed to handle the task. Defaults to 60s.
	AllocateTimeout time.Duration `mapstructure:"allocate_timeout"`

	// DestroyTimeout defines for how long pool should be waiting for worker to
	// properly destroy, if timeout reached worker will be killed. Defaults to 60s.
	DestroyTimeout time.Duration `mapstructure:"destroy_timeout"`

	// Supervision config to limit worker and pool memory usage.
	Supervisor *SupervisorConfig `mapstructure:"supervisor"`
}

Configures the pool behaviour.

func (*Config) InitDefaults

func (cfg *Config) InitDefaults()

InitDefaults enables default config values.

type ErrorEncoder

type ErrorEncoder func(err error, w worker.BaseProcess) (payload.Payload, error)

ErrorEncoder encode error or make a decision based on the error type

type Options

type Options func(p *StaticPool)

func AddListeners

func AddListeners(listeners ...events.Listener) Options

type Pool

type Pool interface {
	// GetConfig returns pool configuration.
	GetConfig() interface{}

	// Exec executes task with payload
	Exec(rqs payload.Payload) (payload.Payload, error)

	// ExecWithContext executes task with context which is used with timeout
	ExecWithContext(ctx context.Context, rqs payload.Payload) (payload.Payload, error)

	// Workers returns worker list associated with the pool.
	Workers() (workers []worker.BaseProcess)

	// Remove worker from the pool.
	RemoveWorker(worker worker.BaseProcess) error

	// Destroy all underlying stack (but let them to complete the task).
	Destroy(ctx context.Context)
}

Pool managed set of inner worker processes.

func Initialize

func Initialize(ctx context.Context, cmd Command, factory transport.Factory, cfg Config, options ...Options) (Pool, error)

Initialize creates new worker pool and task multiplexer. StaticPool will initiate with one worker.

type StaticPool

type StaticPool struct {
	// contains filtered or unexported fields
}

StaticPool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.

func (*StaticPool) Destroy

func (sp *StaticPool) Destroy(ctx context.Context)

Destroy all underlying stack (but let them to complete the task).

func (*StaticPool) Exec

func (sp *StaticPool) Exec(p payload.Payload) (payload.Payload, error)

Be careful, sync Exec with ExecWithContext

func (*StaticPool) ExecWithContext

func (sp *StaticPool) ExecWithContext(ctx context.Context, p payload.Payload) (payload.Payload, error)

Be careful, sync with pool.Exec method

func (*StaticPool) GetConfig

func (sp *StaticPool) GetConfig() interface{}

Config returns associated pool configuration. Immutable.

func (*StaticPool) RemoveWorker

func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error

func (*StaticPool) Workers

func (sp *StaticPool) Workers() (workers []worker.BaseProcess)

Workers returns worker list associated with the pool.

type Supervised

type Supervised interface {
	Pool
	// Start used to start watching process for all pool workers
	Start()
}

type SupervisorConfig

type SupervisorConfig struct {
	// WatchTick defines how often to check the state of worker.
	WatchTick time.Duration `mapstructure:"watch_tick"`

	// TTL defines maximum time worker is allowed to live.
	TTL time.Duration `mapstructure:"ttl"`

	// IdleTTL defines maximum duration worker can spend in idle mode. Disabled when 0.
	IdleTTL time.Duration `mapstructure:"idle_ttl"`

	// ExecTTL defines maximum lifetime per job.
	ExecTTL time.Duration `mapstructure:"exec_ttl"`

	// MaxWorkerMemory limits memory per worker.
	MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"`
}

func (*SupervisorConfig) InitDefaults

func (cfg *SupervisorConfig) InitDefaults()

InitDefaults enables default config values.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL