Documentation ¶
Overview ¶
Package process provides process management and supervision implementation.
Index ¶
- Variables
- type Iterator
- type Manager
- type Options
- type Process
- type Runnable
- func Chain(deps ...Runnable) Runnable
- func Leaf(runInForeground, gracefulStop func(ctx context.Context) error) Runnable
- func Named(name string, p Runnable) Runnable
- func Nop() Runnable
- func Parallel(deps ...Runnable) Runnable
- func Sequential(deps ...Runnable) Runnable
- func StartStop(startInBackground, gracefulStop func(ctx context.Context) error) Runnable
- type RunnableFunc
- type State
- type Table
Constants ¶
This section is empty.
Variables ¶
var ( // ErrManagerNotRunning is an error that is returned when a process // manager temporarily unavailable (i.e. it is not running). ErrManagerNotRunning = errors.New("process: manager is not running") // ErrProcessNotFound is an error that is returned if the requested // process was not found. ErrProcessNotFound = errors.New("process: process not found") // ErrProcessNotRunning is an error that is returned when a process is // not running (i.e. exists but is in the starting state). ErrProcessNotRunning = errors.New("process: process is not running") // ErrProcessExists is an error that is returned if the process already // exists for the given ID. ErrProcessExists = errors.New("process: process already exists") // ErrProcessInvalidState is an error that is returned if the process // is not in the valid state for the operation. ErrProcessInvalidState = errors.New("process: invalid process state") )
Functions ¶
This section is empty.
Types ¶
type Iterator ¶
type Iterator[K comparable, V any] interface { // Next prepares the value for the next iteration. It returns true on // success, and false if there is no next value. Consult Err to check // whether iterator successfully reached the end or an error occurred. Next() bool // Get returns the key and value for the current iteration. Get(ctx context.Context) (K, V, error) // Err returns any error that occurred during iteration. Err() error // Close closes the iterator. Close() error }
Iterator iterates over key and value pairs in the Table. It follows the semantics of the standard sql.Rows type and does not necessarily correspond to any consistent snapshot of the Table’s contents.
type Manager ¶
type Manager[K comparable, P Runnable] struct { // contains filtered or unexported fields }
Manager manages and supervises processes from the underlying process table.
func NewManager ¶
func NewManager[K comparable, P Runnable](t Table[K, P], o Options) *Manager[K, P]
NewManager returns a new Manager instance. The given table is used to lookup managed processes and periodically restart failed units (or processes that were added externally). Managed processes are uniquely identifiable by key.
A managed process may remove itself from Manager by deleting the associated entry in table before terminating. Likewise, to stop a process, it must be removed from the table prior to Stop call. That is, processes must be aware of being managed and the removal is tighly coupled with the table.
As a rule of thumb, to keep the underlying table consistent, processes should not be re-added to table after being removed from the table. It is possible to implement re-adding on top of Manager but that requires handling possible orderings of table removal, addition, re-addition and process startup, shutdown and self-removal (or a subset of these operations depending on the use cases).
func (*Manager[K, P]) Get ¶
Get returns a running process or either a ErrProcessNotFound error if the process does not exist or ErrProcessNotRunning is the process exists but is not running.
func (*Manager[K, P]) Run ¶
Run starts the manager and executes callback f on successful initialization.
type Process ¶
type Process struct {
// contains filtered or unexported fields
}
Process represents a stateful process that is running in the background. It exposes Start and Stop methods that use an underlying state machine to prevent operations in invalid states. Process is safe for concurrent use.
Unlike some implementations of the underlying Runnable interface that allow multiple consecutive Run invocations on the same instance, a Process may not be reset and started after being stopped.
func NewProcess ¶
NewProcess returns a new stateful process instance for the given Runnable type parameter that would run with the ctx context.
func (*Process) Done ¶
func (p *Process) Done() <-chan struct{}
Done returns a channel that is closed when process terminates.
func (*Process) Start ¶
Start starts the process or cancels the underlying process context on error.
The startup deadline may be set using the given ctx context. Note that the context would not be used by process directly so the associated values are not propagated.
It returns ErrProcessInvalidState if the process is not in the initial state.
func (*Process) Stop ¶
Stop stops the process by returning from the Run method callback and waiting for the termination.
The shutdown deadline may be set using the given ctx context. If the deadline is exceeded, underlying context is canceled, signaling a forced shutdown to the process.
It returns ErrProcessInvalidState if the process was not started or has already been stopped.
type Runnable ¶
type Runnable interface { // Run executes the process. The given callback f is called when the // process has been initialized and began execution. The process is // interrupted if the callback returns or the given context expires. // On termination the context passed to the callback is canceled. // // To make debugging easier, the given f function runs in the same // goroutine where Run was invoked. Note though that this behavior // currently is not supported by all existing Run implementations. // // It is safe to assume that f is called at most once. Run(ctx context.Context, f func(ctx context.Context) error) error }
Runnable defines a blocking long-running process that that should be considered running and ready for use when the Run’s method callback is called.
Runnable performs graceful shutdown when the callback returns. In this case, the context is not canceled and the process may access external resources, e.g. perform some network requests and persist state to database. Otherwise a context cancellation is used to force shutdown.
func Chain ¶ added in v0.0.3
Chain returns a Runnable instance that starts and runs processes by nesting them in callbacks. If no processes are given, it returns Nop instance.
func Leaf ¶ added in v0.0.2
Leaf converts a “leaf” function to a runnable process function that accepts callback. It accepts an optional gracefulStop function to perform graceful shutdown. If the function is nil, the process will be terminated by context cancellation instead.
The resulting Runnable returns first non-nil error from functions in the following order: callback, gracefulStop, runInForeground. That is, if both callback and gracefulStop return a non-nil error, the latter is ignored.
Example (HTTP):
var lis net.Listener var srv *http.Server process.Leaf( func(_ context.Context) error { err := srv.Serve(lis) if errors.Is(err, http.ErrServerClosed) { return nil } return err }, func(ctx context.Context) error { err := srv.Shutdown(ctx) if err != nil { return srv.Close() } return nil }, )
Example (gRPC):
var lis net.Listener var srv *grpc.Server process.Leaf( func(_ context.Context) error { return srv.Serve(lis) }, func(ctx context.Context) error { done := make(chan struct{}) go func() { srv.GracefulStop() close(done) }() select { case <-ctx.Done(): srv.Stop() <-done case <-done: } return nil }, )
Alternatively, use go.pact.im/x/grpcprocess package for gRPC.
func Named ¶ added in v0.0.3
Named returns a process that returns an error prefixed with name on failure.
func Nop ¶ added in v0.0.3
func Nop() Runnable
Nop returns a Runnable instance that performs no operations and returns when the callback does.
func Parallel ¶ added in v0.0.3
Parallel returns a Runnable instance that starts and runs processes in parallel. If no processes are given, it returns Nop instance.
The resulting Runnable calls callback after all process dependencies are successfully started. If any dependecy fails to start, processes that have already started are gracefully stopped. If any dependency fails before the main callback returns, the context passed to callback is canceled and all processes are gracefully stopped (unless the parent context has expired).
The callbacks of dependencies return after the callback of the resulting dependent process. Run returns callback error if it is not nil, otherwise it returns combined errors from dependencies.
func Sequential ¶ added in v0.0.3
Sequential returns a Runnable instance with the same guarantees as the Parallel function, but starts and stops processes in sequential order.
func StartStop ¶ added in v0.0.2
StartStop returns a Runnable instance for the pair of start/stop functions. The stop function should perform a graceful shutdown until a context expires, then proceed with a forced shutdown.
The resulting Runnable returns either start error or the first non-nil error from callback and stop functions. If both callback and stop return a non-nil error, the latter is ignored.
type RunnableFunc ¶
RunnableFunc is a function that implements the Runnable interface.
type State ¶ added in v0.0.2
type State int
State represents the current state of the process.
const ( // StateInitial is the initial state of the process. If Stop is called // in initial state, it prevents subsequent Start calls from succeeding, // thus entering StateStopped. Otherwise the transition on Start call // is to the StateStarting. StateInitial State = iota // StateStarting is the starting state that process enters when Start // is called from initial state. It transitions to either StateRunning // on success or StateStopped on failure (or premature shutdown observed // during startup). StateStarting // StateRunning is the state process enters after a successful startup. // The only possible transition is to the StateStopped if either Stop // is called or a process terminates. StateRunning // StateStopped is the final state of the process. There are no // transitions from this state. StateStopped )
type Table ¶
type Table[K comparable, V any] interface { // Get returns the value for the given key. Get(ctx context.Context, key K) (V, error) // Iter returns an iterator for key and value pairs in the table. Iter(ctx context.Context) (Iterator[K, V], error) }
Table defines a table of key and value pairs that is potentially backed by a persistent and shared storage.