pool

package
v2.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2022 License: MIT Imports: 15 Imported by: 9

Documentation

Index

Constants

View Source
const (
	// MB is 1024 * 1024, i.e. the number of bytes in one mebibyte.
	MB = 1024 * 1024
)
View Source
// NSEC_IN_SEC is the number of nanoseconds in one second.
const NSEC_IN_SEC int64 = 1000000000 //nolint:stylecheck

NSEC_IN_SEC is the number of nanoseconds in one second.

View Source
const (
	// StopRequest is the JSON message a worker can send to indicate that
	// a restart is required.
	StopRequest = `{"stop":true}`
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Command

// Command is a function that returns an *exec.Cmd.
// NOTE(review): presumably the returned command is used to start a worker
// process — confirm against NewStaticPool.
type Command func() *exec.Cmd

type Config

// Config configures the behavior of the pool.
type Config struct {
	// Debug flag creates a fresh worker before every request.
	// NOTE(review): unlike the other fields, Debug carries no mapstructure
	// tag — confirm this is intentional.
	Debug bool

	// NumWorkers defines how many sub-processes can be run at once. This value
	// might be doubled by Swapper during a hot-swap. Defaults to the number of CPU cores.
	NumWorkers uint64 `mapstructure:"num_workers"`

	// MaxJobs defines how many executions are allowed for the worker until
	// its destruction. Set 1 to create a new process for each new task, 0 to let
	// the worker handle as many tasks as it can.
	MaxJobs uint64 `mapstructure:"max_jobs"`

	// AllocateTimeout defines how long the pool will wait for a worker to
	// be freed to handle the task. Defaults to 60s.
	AllocateTimeout time.Duration `mapstructure:"allocate_timeout"`

	// DestroyTimeout defines how long the pool should wait for a worker to
	// properly destroy; if the timeout is reached, the worker will be killed. Defaults to 60s.
	DestroyTimeout time.Duration `mapstructure:"destroy_timeout"`

	// Supervisor is the supervision config used to limit worker and pool memory usage.
	Supervisor *SupervisorConfig `mapstructure:"supervisor"`
}

Config configures the pool behavior.

func (*Config) InitDefaults

func (cfg *Config) InitDefaults()

InitDefaults enables default config values.

type ErrorEncoder

// ErrorEncoder receives an error together with the worker it relates to and
// returns either a payload or an error.
type ErrorEncoder func(err error, w worker.BaseProcess) (*payload.Payload, error)

ErrorEncoder encodes an error or makes a decision based on the error type.

type Options

// Options is a function that receives a *StaticPool.
// NOTE(review): presumably a functional option applied to the pool by
// NewStaticPool, which accepts a variadic list of Options — confirm.
type Options func(p *StaticPool)

func UseParallelAlloc

func UseParallelAlloc() Options

func WithCustomErrEncoder

func WithCustomErrEncoder(errEnc ErrorEncoder) Options

func WithLogger

func WithLogger(z *zap.Logger) Options

type Pool

// Pool manages a set of inner worker processes.
type Pool interface {
	// GetConfig returns pool configuration.
	GetConfig() interface{}

	// Exec executes a task with the given payload.
	Exec(rqs *payload.Payload) (*payload.Payload, error)

	// Workers returns the list of workers associated with the pool.
	Workers() (workers []worker.BaseProcess)

	// RemoveWorker removes the worker from the pool.
	RemoveWorker(worker worker.BaseProcess) error

	// Reset kills all workers inside the watcher and replaces them with new ones.
	Reset(ctx context.Context) error

	// Destroy destroys all underlying stack (but lets them complete the task).
	Destroy(ctx context.Context)
	// contains filtered or unexported methods
}

Pool manages a set of inner worker processes.

func NewStaticPool

func NewStaticPool(ctx context.Context, cmd Command, factory ipc.Factory, cfg *Config, options ...Options) (Pool, error)

NewStaticPool creates a new worker pool and task multiplexer. StaticPool will initiate with one worker.

type StaticPool

// StaticPool controls worker creation, destruction and task routing.
type StaticPool struct {
	// contains filtered or unexported fields
}

StaticPool controls worker creation, destruction and task routing. The pool uses a fixed amount of stack.

func (*StaticPool) Destroy

func (sp *StaticPool) Destroy(ctx context.Context)

Destroy destroys all underlying stack (but lets them complete the task).

func (*StaticPool) Exec

func (sp *StaticPool) Exec(p *payload.Payload) (*payload.Payload, error)

Exec executes the provided payload on the worker.

func (*StaticPool) GetConfig

func (sp *StaticPool) GetConfig() interface{}

GetConfig returns associated pool configuration. Immutable.

func (*StaticPool) RemoveWorker

func (sp *StaticPool) RemoveWorker(wb worker.BaseProcess) error

func (*StaticPool) Reset

func (sp *StaticPool) Reset(ctx context.Context) error

func (*StaticPool) Workers

func (sp *StaticPool) Workers() (workers []worker.BaseProcess)

Workers returns worker list associated with the pool.

type Supervised

// Supervised is a Pool that additionally exposes Start to begin watching
// its workers.
type Supervised interface {
	Pool
	// Start starts the watching process for all pool workers.
	Start()
}

type SupervisorConfig

// SupervisorConfig holds the supervision settings: how often workers are
// checked and the lifetime, idle time, execution time and memory limits
// applied to them.
type SupervisorConfig struct {
	// WatchTick defines how often to check the state of the worker.
	WatchTick time.Duration `mapstructure:"watch_tick"`

	// TTL defines the maximum time a worker is allowed to live.
	TTL time.Duration `mapstructure:"ttl"`

	// IdleTTL defines the maximum duration a worker can spend in idle mode. Disabled when 0.
	IdleTTL time.Duration `mapstructure:"idle_ttl"`

	// ExecTTL defines the maximum lifetime per job.
	ExecTTL time.Duration `mapstructure:"exec_ttl"`

	// MaxWorkerMemory limits memory per worker.
	// NOTE(review): the unit is not stated here; presumably megabytes given
	// the package-level MB constant — confirm.
	MaxWorkerMemory uint64 `mapstructure:"max_worker_memory"`
}

func (*SupervisorConfig) InitDefaults

func (cfg *SupervisorConfig) InitDefaults()

InitDefaults enables default config values.

type Watcher

// Watcher is an interface for the Sync workers' lifecycle.
type Watcher interface {
	// Watch is used to add workers to the container.
	Watch(workers []worker.BaseProcess) error

	// Take takes the first free worker.
	Take(ctx context.Context) (worker.BaseProcess, error)

	// Release releases the worker, putting it back into the queue.
	Release(w worker.BaseProcess)

	// Allocate allocates a new worker and puts it into the WorkerWatcher.
	Allocate() error

	// Destroy destroys the underlying container.
	Destroy(ctx context.Context)

	// Reset replaces the container and workers array, killing all workers.
	Reset(ctx context.Context)

	// List returns all workers in the container without removing them from internal storage.
	List() []worker.BaseProcess

	// Remove removes the worker from the container.
	Remove(wb worker.BaseProcess)
}

Watcher is an interface for the Sync workers' lifecycle.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL