Documentation ¶
Overview ¶
Package node provides distributed worker pools with supervisors.
Index ¶
- Constants
- Variables
- func AddErrPool(mach *am.Machine, err error, args am.A) error
- func AddErrPoolStr(mach *am.Machine, msg string, args am.A) error
- func AddErrRpc(mach *am.Machine, err error, args am.A) error
- func AddErrWorker(event *am.Event, mach *am.Machine, err error, args am.A) error
- func AddErrWorkerStr(mach *am.Machine, msg string, args am.A) error
- func GetSuperClientId(name string) string
- func GetWorkerClientId(name string) string
- func LogArgs(args am.A) map[string]string
- func Pass(args *A) am.A
- func PassRpc(args *A) am.A
- type A
- type ARpc
- type Client
- func (c *Client) Dispose(ctx context.Context)
- func (c *Client) ReqWorker(ctx context.Context) error
- func (c *Client) Start(nodesList []string)
- func (c *Client) StartEnd(e *am.Event)
- func (c *Client) StartEnter(e *am.Event) bool
- func (c *Client) StartState(e *am.Event)
- func (c *Client) Stop(ctx context.Context)
- func (c *Client) WorkerPayloadEnter(e *am.Event) bool
- func (c *Client) WorkerPayloadState(e *am.Event)
- func (c *Client) WorkerReadyState(e *am.Event)
- func (c *Client) WorkerRequestedEnter(e *am.Event) bool
- func (c *Client) WorkerRequestedState(e *am.Event)
- type ClientOpts
- type ClientStateDeps
- type Supervisor
- func (s *Supervisor) CheckPool() bool
- func (s *Supervisor) ClientConnectedState(e *am.Event)
- func (s *Supervisor) ClientDisconnectedEnter(e *am.Event) bool
- func (s *Supervisor) ClientDisconnectedState(e *am.Event)
- func (s *Supervisor) Dispose()
- func (s *Supervisor) ErrWorkerState(e *am.Event)
- func (s *Supervisor) ForkWorkerEnter(e *am.Event) bool
- func (s *Supervisor) ForkWorkerState(e *am.Event)
- func (s *Supervisor) ForkingWorkerEnter(e *am.Event) bool
- func (s *Supervisor) ForkingWorkerState(e *am.Event)
- func (s *Supervisor) HeartbeatState(e *am.Event)
- func (s *Supervisor) KillingWorkerEnter(e *am.Event) bool
- func (s *Supervisor) KillingWorkerState(e *am.Event)
- func (s *Supervisor) ListWorkersEnter(e *am.Event) bool
- func (s *Supervisor) ListWorkersState(e *am.Event)
- func (s *Supervisor) NormalizingPoolState(e *am.Event)
- func (s *Supervisor) PoolReadyEnter(e *am.Event) bool
- func (s *Supervisor) PoolReadyExit(e *am.Event) bool
- func (s *Supervisor) ProvideWorkerEnter(e *am.Event) bool
- func (s *Supervisor) ProvideWorkerState(e *am.Event)
- func (s *Supervisor) SetPool(min, max, warm, maxPerClient int)
- func (s *Supervisor) SetWorkerEnter(e *am.Event) bool
- func (s *Supervisor) SetWorkerState(e *am.Event)
- func (s *Supervisor) Start(publicAddr string)
- func (s *Supervisor) StartEnd(e *am.Event)
- func (s *Supervisor) StartEnter(e *am.Event) bool
- func (s *Supervisor) StartState(e *am.Event)
- func (s *Supervisor) Stop()
- func (s *Supervisor) WorkerConnectedEnter(e *am.Event) bool
- func (s *Supervisor) WorkerConnectedState(e *am.Event)
- func (s *Supervisor) WorkerForkedEnter(e *am.Event) bool
- func (s *Supervisor) WorkerForkedState(e *am.Event)
- func (s *Supervisor) WorkerKilledEnter(e *am.Event) bool
- func (s *Supervisor) WorkerKilledState(e *am.Event)
- func (s *Supervisor) Workers(ctx context.Context, state WorkerState) ([]*workerInfo, error)
- type SupervisorOpts
- type Worker
- func (w *Worker) ErrNetworkState(e *am.Event)
- func (w *Worker) HealthcheckState(e *am.Event)
- func (w *Worker) LocalRpcReadyState(e *am.Event)
- func (w *Worker) PublicRpcReadyState(e *am.Event)
- func (w *Worker) RpcReadyState(e *am.Event)
- func (w *Worker) SendPayloadEnter(e *am.Event) bool
- func (w *Worker) ServeClientEnter(e *am.Event) bool
- func (w *Worker) ServeClientState(e *am.Event)
- func (w *Worker) Start(bootAddr string) am.Result
- func (w *Worker) StartEnd(e *am.Event)
- func (w *Worker) StartEnter(e *am.Event) bool
- func (w *Worker) StartState(e *am.Event)
- func (w *Worker) Stop(dispose bool)
- type WorkerOpts
- type WorkerState
Constants ¶
const ( // EnvAmNodeLogSupervisor enables machine logging for node supervisor. EnvAmNodeLogSupervisor = "AM_NODE_LOG_SUPERVISOR" // EnvAmNodeLogClient enables machine logging for node client. EnvAmNodeLogClient = "AM_NODE_LOG_CLIENT" )
Variables ¶
var ( ErrWorker = errors.New("worker error") ErrWorkerMissing = errors.New("worker missing") ErrWorkerHealth = errors.New("worker failed healthcheck") ErrWorkerConn = errors.New("error starting connection") ErrWorkerKill = errors.New("error killing worker") ErrPool = errors.New("pool error") ErrHeartbeat = errors.New("heartbeat failed") ErrRpc = errors.New("rpc error") )
Functions ¶
func AddErrPool ¶
AddErrPool wraps an error in the ErrPool sentinel and adds it to a machine.
func AddErrPoolStr ¶
AddErrPoolStr wraps a msg in the ErrPool sentinel and adds it to a machine.
func AddErrWorker ¶
AddErrWorker wraps an error in the ErrWorker sentinel and adds it to a machine.
func AddErrWorkerStr ¶
AddErrWorkerStr wraps a msg in the ErrWorker sentinel and adds it to a machine.
func GetSuperClientId ¶ added in v0.9.0
GetSuperClientId returns a Node Client machine ID from a name.
func GetWorkerClientId ¶ added in v0.9.0
GetWorkerClientId returns a Node Client machine ID from a name.
Types ¶
type A ¶
type A struct { // Id is a machine ID. Id string `log:"id"` // PublicAddr is the public address of a Supervisor or WorkerRpc. PublicAddr string `log:"public_addr"` // LocalAddr is the local address of a Supervisor or WorkerRpc. LocalAddr string `log:"local_addr"` // BootAddr is the local address of the Bootstrap machine. BootAddr string `log:"boot_addr"` // NodesList is a list of available nodes (supervisors' public RPC addresses). NodesList []string // WorkerRpcId is a machine ID of the worker RPC client. WorkerRpcId string `log:"id"` // SuperRpcId is a machine ID of the super RPC client. SuperRpcId string `log:"id"` // WorkerRpc is the RPC client connected to a WorkerRpc. WorkerRpc *rpc.Client // Bootstrap is the RPC machine used to connect WorkerRpc to the Supervisor. Bootstrap *bootstrap // Dispose the worker. Dispose bool // WorkerAddr is an index for WorkerInfo. WorkerAddr string // WorkerInfo describes a worker. WorkerInfo *workerInfo // WorkersCh returns a list of workers. This channel has to be buffered. WorkersCh chan<- []*workerInfo // WorkerState is a requested state of workers, eg for listings. WorkerState WorkerState }
A is a struct for node arguments. It's a typesafe alternative to am.A.
type ARpc ¶
type ARpc struct { // Id is a machine ID. Id string `log:"id"` // PublicAddr is the public address of a Supervisor or Worker. PublicAddr string `log:"public_addr"` // LocalAddr is the local address of a Supervisor, Worker, or [bootstrap]. LocalAddr string `log:"local_addr"` // BootAddr is the local address of the Bootstrap machine. BootAddr string `log:"boot_addr"` // NodesList is a list of available nodes (supervisors' public RPC addresses). NodesList []string // WorkerRpcId is a machine ID of the worker RPC client. WorkerRpcId string `log:"worker_rpc_id"` // SuperRpcId is a machine ID of the super RPC client. SuperRpcId string `log:"super_rpc_id"` }
ARpc is a subset of A that can be passed over RPC.
type Client ¶
type Client struct { *am.ExceptionHandler Mach *am.Machine Name string SuperAddr string LogEnabled bool // LeaveSuper is a flag to leave the supervisor after connecting to the // worker. TODO LeaveSuper bool // ConnTimeout is the time to wait for an outbound connection to be // established. Default is 5 seconds. ConnTimeout time.Duration SuperRpc *rpc.Client WorkerRpc *rpc.Client // contains filtered or unexported fields }
Client is a node client, connecting to a supervisor and then a worker.
func NewClient ¶
func NewClient(ctx context.Context, id string, workerKind string, stateDeps *ClientStateDeps, opts *ClientOpts, ) (*Client, error)
NewClient creates a new Client instance with the provided context, id, workerKind, state dependencies, and options. Returns a pointer to the Client instance and an error if any validation or initialization fails.
func (*Client) ReqWorker ¶
ReqWorker sends a request to add a "WorkerRequested" state to the client's state machine and waits for the "WorkerReady" state.
func (*Client) StartState ¶
func (*Client) Stop ¶
Stop halts the client's connection to both the supervisor and worker RPCs, and removes the client state from the state machine.
func (*Client) WorkerPayloadState ¶
WorkerPayloadState handles both Supervisor and Worker inbound payloads.
func (*Client) WorkerReadyState ¶ added in v0.9.0
func (*Client) WorkerRequestedState ¶
type ClientOpts ¶
type ClientOpts struct { // Parent is a parent state machine for a new client state machine. See // [am.Opts]. Parent am.Api // TODO Tags []string }
ClientOpts provides configuration options for creating a new client state machine.
type ClientStateDeps ¶
type ClientStateDeps struct { ClientSStruct am.Struct ClientSNames am.S WorkerSStruct am.Struct WorkerSNames am.S }
ClientStateDeps contains the state definitions and names of the client and worker machines, needed to create a new client.
type Supervisor ¶
type Supervisor struct { *am.ExceptionHandler Mach *am.Machine // WorkerKind is the kind of worker this supervisor is managing. WorkerKind string // WorkerBin is the path and args to the worker binary. WorkerBin []string // Name is the name of the supervisor. Name string LogEnabled bool // Max is the maximum number of workers. Default is 10. Max int // Min is the minimum number of workers. Default is 2. Min int // Warm is the number of warm (ready) workers. Default is 5. Warm int // MaxClientWorkers is the maximum number of workers per 1 client. Defaults to // Max. MaxClientWorkers int // WorkerErrTtl is the time to keep worker errors in memory. Default is 10m. WorkerErrTtl time.Duration // WorkerErrRecent is the time to consider recent errors. Default is 1m. WorkerErrRecent time.Duration // WorkerErrKill is the number of errors to kill a worker. Default is 3. WorkerErrKill int // ConnTimeout is the time to wait for an outbound connection to be // established. Default is 5s. ConnTimeout time.Duration // DeliveryTimeout is a timeout for RPC delivery. DeliveryTimeout time.Duration // OpTimeout is the default timeout for operations (eg getters). OpTimeout time.Duration // PoolPause is the time to wait between normalizing the pool. Default is 5s. PoolPause time.Duration // HealthcheckPause is the time between trying to get a Healthcheck response // from a worker. HealthcheckPause time.Duration // Heartbeat is the frequency of the Heartbeat state, which normalizes the // pool and checks workers. Default 1m. Heartbeat time.Duration // WorkerCheckInterval defines how often to pull a worker's state. Default 1s. WorkerCheckInterval time.Duration // PublicAddr is the address for the public RPC server to listen on. The // effective address is at [PublicMux.Addr]. PublicAddr string // PublicMux is the public listener to create RPC servers for each client. PublicMux *rpc.Mux // PublicRpcs are the public RPC servers of connected clients, indexed by // remote addresses. 
PublicRpcs map[string]*rpc.Server // LocalAddr is the address for the local RPC server to listen on. The // effective address is at [LocalRpc.Addr]. LocalAddr string // LocalRpc is the local RPC server, used by other supervisors to connect. // TODO rpc/mux LocalRpc *rpc.Server TestFork func(string) error TestKill func(string) error WorkerReadyState am.HandlerFinal WorkerGoneState am.HandlerFinal KillWorkerState am.HandlerFinal ClientSendPayloadState am.HandlerFinal SuperSendPayloadState am.HandlerFinal HealthcheckState am.HandlerFinal // contains filtered or unexported fields }
func NewSupervisor ¶
func NewSupervisor( ctx context.Context, workerKind string, workerBin []string, workerStruct am.Struct, workerSNames am.S, opts *SupervisorOpts, ) (*Supervisor, error)
NewSupervisor initializes and returns a new Supervisor instance with specified context, worker attributes, and options.
func (*Supervisor) CheckPool ¶
func (s *Supervisor) CheckPool() bool
CheckPool tries to set the pool as ready and, if it isn't, normalizes it.
func (*Supervisor) ClientConnectedState ¶
func (s *Supervisor) ClientConnectedState(e *am.Event)
func (*Supervisor) ClientDisconnectedEnter ¶
func (s *Supervisor) ClientDisconnectedEnter(e *am.Event) bool
func (*Supervisor) ClientDisconnectedState ¶
func (s *Supervisor) ClientDisconnectedState(e *am.Event)
func (*Supervisor) Dispose ¶
func (s *Supervisor) Dispose()
func (*Supervisor) ErrWorkerState ¶
func (s *Supervisor) ErrWorkerState(e *am.Event)
func (*Supervisor) ForkWorkerEnter ¶
func (s *Supervisor) ForkWorkerEnter(e *am.Event) bool
func (*Supervisor) ForkWorkerState ¶
func (s *Supervisor) ForkWorkerState(e *am.Event)
func (*Supervisor) ForkingWorkerEnter ¶
func (s *Supervisor) ForkingWorkerEnter(e *am.Event) bool
func (*Supervisor) ForkingWorkerState ¶
func (s *Supervisor) ForkingWorkerState(e *am.Event)
func (*Supervisor) HeartbeatState ¶
func (s *Supervisor) HeartbeatState(e *am.Event)
func (*Supervisor) KillingWorkerEnter ¶
func (s *Supervisor) KillingWorkerEnter(e *am.Event) bool
func (*Supervisor) KillingWorkerState ¶
func (s *Supervisor) KillingWorkerState(e *am.Event)
func (*Supervisor) ListWorkersEnter ¶ added in v0.10.0
func (s *Supervisor) ListWorkersEnter(e *am.Event) bool
func (*Supervisor) ListWorkersState ¶ added in v0.10.0
func (s *Supervisor) ListWorkersState(e *am.Event)
func (*Supervisor) NormalizingPoolState ¶
func (s *Supervisor) NormalizingPoolState(e *am.Event)
func (*Supervisor) PoolReadyEnter ¶
func (s *Supervisor) PoolReadyEnter(e *am.Event) bool
func (*Supervisor) PoolReadyExit ¶
func (s *Supervisor) PoolReadyExit(e *am.Event) bool
func (*Supervisor) ProvideWorkerEnter ¶
func (s *Supervisor) ProvideWorkerEnter(e *am.Event) bool
func (*Supervisor) ProvideWorkerState ¶
func (s *Supervisor) ProvideWorkerState(e *am.Event)
func (*Supervisor) SetPool ¶
func (s *Supervisor) SetPool(min, max, warm, maxPerClient int)
SetPool sets the pool parameters with defaults.
func (*Supervisor) SetWorkerEnter ¶ added in v0.10.0
func (s *Supervisor) SetWorkerEnter(e *am.Event) bool
func (*Supervisor) SetWorkerState ¶ added in v0.10.0
func (s *Supervisor) SetWorkerState(e *am.Event)
func (*Supervisor) Start ¶
func (s *Supervisor) Start(publicAddr string)
func (*Supervisor) StartEnd ¶
func (s *Supervisor) StartEnd(e *am.Event)
func (*Supervisor) StartEnter ¶
func (s *Supervisor) StartEnter(e *am.Event) bool
func (*Supervisor) StartState ¶
func (s *Supervisor) StartState(e *am.Event)
func (*Supervisor) Stop ¶
func (s *Supervisor) Stop()
func (*Supervisor) WorkerConnectedEnter ¶ added in v0.9.0
func (s *Supervisor) WorkerConnectedEnter(e *am.Event) bool
func (*Supervisor) WorkerConnectedState ¶ added in v0.9.0
func (s *Supervisor) WorkerConnectedState(e *am.Event)
func (*Supervisor) WorkerForkedEnter ¶
func (s *Supervisor) WorkerForkedEnter(e *am.Event) bool
func (*Supervisor) WorkerForkedState ¶
func (s *Supervisor) WorkerForkedState(e *am.Event)
func (*Supervisor) WorkerKilledEnter ¶
func (s *Supervisor) WorkerKilledEnter(e *am.Event) bool
func (*Supervisor) WorkerKilledState ¶
func (s *Supervisor) WorkerKilledState(e *am.Event)
func (*Supervisor) Workers ¶ added in v0.10.0
func (s *Supervisor) Workers( ctx context.Context, state WorkerState, ) ([]*workerInfo, error)
Workers returns a list of workers in a desired state. If [ctx] expires, it will return nil, nil.
type SupervisorOpts ¶
type Worker ¶
type Worker struct { *am.ExceptionHandler Mach *am.Machine Name string Kind string // AcceptClient is the ID of a client, passed by the supervisor. Worker should // only accept connections from this client. AcceptClient string // ConnTimeout is the time to wait for an outbound connection to be // established. ConnTimeout time.Duration DeliveryTimeout time.Duration // BootAddr is the address of the bootstrap machine. BootAddr string // BootRpc is the RPC client connection to bootstrap machine, which passes // connection info to the Supervisor. BootRpc *rpc.Client // LocalAddr is the address of the local RPC server. LocalAddr string // LocalRpc is the local RPC server, used by the Supervisor to connect. LocalRpc *rpc.Server // PublicAddr is the address of the public RPC server. PublicAddr string // PublicRpc is the public RPC server, used by the Client to connect. PublicRpc *rpc.Server }
func NewWorker ¶
func NewWorker(ctx context.Context, kind string, workerStruct am.Struct, stateNames am.S, opts *WorkerOpts, ) (*Worker, error)
NewWorker initializes a new Worker instance and returns it, or an error if validation fails.
func (*Worker) ErrNetworkState ¶
func (*Worker) HealthcheckState ¶
func (*Worker) LocalRpcReadyState ¶
func (*Worker) PublicRpcReadyState ¶
func (*Worker) RpcReadyState ¶
func (*Worker) ServeClientState ¶
func (*Worker) StartState ¶
type WorkerOpts ¶
type WorkerState ¶ added in v0.10.0
type WorkerState string
States of a worker.
const ( StateIniting WorkerState = "initing" StateRpc WorkerState = "rpc" StateIdle WorkerState = "idle" StateBusy WorkerState = "busy" StateReady WorkerState = "ready" )