Documentation ¶
Overview ¶
uwe (Ubiquitous Workers Engine) is a common toolset for building and organizing Go application with actor-like workers.
`Chief` is a supervisor that can be placed at the top of the go application's execution stack, it is blocked until SIGTERM is intercepted and then it shutdown all workers gracefully. Also, `Chief` can be used as a child supervisor inside the` Worker`, which is launched by `Chief` at the top-level.
`Worker` is an interface for async workers which launches and manages by the **Chief**.
1. `Init()` - method used to initialize some state of the worker that required interaction with outer context, for example, initialize some connectors. In many cases this method is optional, so it can be implemented as empty:
`func (*W) Init() error { return nil }`.
2. `Run(ctx Context) error` - starts the `Worker` instance execution. The context will provide a signal when a worker must stop through the `ctx.Done()`.
Workers lifecycle:
```text (*) -> [New] -> [Initialized] -> [Run] -> [Stopped]
| | | | | ↓ |-------------|------> [Failed]
```
Example ¶
package main import ( "log" "time" ) func main() { // initialize new instance of Chief chief := NewChief() // will add workers into the pool chief.AddWorker("dummy", NewDummy()) // will enable recover of internal panics chief.UseDefaultRecover() // pass handler for internal events like errors, panics, warning, etc. // you can log it with you favorite logger (ex Logrus, Zap, etc) chief.SetEventHandler(STDLogEventHandler()) // init all registered workers and run it all chief.Run() } type dummy struct{} // NewDummy initialize new instance of dummy Worker. func NewDummy() Worker { // At this point in most cases there we are preparing some state of the worker, // like a logger, configuration, variable, and fields. return &dummy{} } // Init is an interface method used to initialize some state of the worker // that required interaction with outer context, for example, initialize some connectors. func (d *dummy) Init() error { return nil } // Run starts event loop of worker. func (d *dummy) Run(ctx Context) error { // initialize all required stuffs for the execution flow ticker := time.NewTicker(time.Second) for { select { case <-ticker.C: // define all the processing code here // or move it to a method and make a call here log.Println("do something") case <-ctx.Done(): // close all connections, channels and finalise state if needed log.Println("good bye") return nil } } }
Output:
Index ¶
- Constants
- func CliCheckCommand(app AppInfo, workerListProvider func(c *cli.Context) []WorkerName) cli.Command
- func STDLogEventHandler() func(event Event)
- type AppInfo
- type Chief
- type Context
- type Event
- type EventHandler
- type EventLevel
- type Locker
- type Recover
- type Shutdown
- type StateInfo
- type Worker
- type WorkerExistRule
- type WorkerName
- type WorkerPool
- func (p *WorkerPool) FailWorker(name WorkerName) error
- func (p *WorkerPool) GetState(name WorkerName) sam.State
- func (p *WorkerPool) GetWorkersStates() map[WorkerName]sam.State
- func (p *WorkerPool) InitWorker(name WorkerName) error
- func (p *WorkerPool) IsRun(name WorkerName) bool
- func (p *WorkerPool) ReplaceWorker(name WorkerName, worker Worker)
- func (p *WorkerPool) RunWorkerExec(ctx Context, name WorkerName) (err error)
- func (p *WorkerPool) SetState(name WorkerName, state sam.State) error
- func (p *WorkerPool) SetWorker(name WorkerName, worker Worker) error
- func (p *WorkerPool) StartWorker(name WorkerName) error
- func (p *WorkerPool) StopWorker(name WorkerName) error
Examples ¶
Constants ¶
const ( // StatusAction is a command useful for health-checks, because it returns status of all workers. StatusAction = "status" // PingAction is a simple command that returns the "pong" message. PingAction = "ping" )
const ( WStateNotExists sam.State = "NotExists" WStateNew sam.State = "New" WStateInitialized sam.State = "Initialized" WStateRun sam.State = "Run" WStateStopped sam.State = "Stopped" WStateFailed sam.State = "Failed" )
DefaultForceStopTimeout is a timeout for killing all workers.
Variables ¶
This section is empty.
Functions ¶
func CliCheckCommand ¶
CliCheckCommand returns `cli.Command`, which allows you to check the health of a running instance **Application** with `ServiceSocket` enabled using `(Chief) .EnableServiceSocket(...)`
Types ¶
type AppInfo ¶
type AppInfo struct { Name string `json:"name"` Version string `json:"version"` Build string `json:"build"` Tag string `json:"tag"` }
AppInfo is a details of the *Application* build.
type Chief ¶
type Chief interface { // AddWorker registers the worker in the pool. AddWorker(WorkerName, Worker) // AddWorkers registers the list of workers in the pool. AddWorkers(map[WorkerName]Worker) // GetWorkersStates returns the current state of all registered workers. GetWorkersStates() map[WorkerName]sam.State // EnableServiceSocket initializes `net.Socket` server for internal management purposes. // By default includes two actions: // - "status" is a command useful for health-checks, because it returns status of all workers; // - "ping" is a simple command that returns the "pong" message. // The user can provide his own list of actions with handler closures. EnableServiceSocket(app AppInfo, actions ...socket.Action) // Event returns the channel with internal Events. // ATTENTION: `Event() <-chan Event` and `SetEventHandler(EventHandler)` is mutually exclusive, // but one of them must be used! Event() <-chan Event // SetEventHandler adds a callback that processes the `Chief` // internal events and can log them or do something else. // ATTENTION: `Event() <-chan Event` and `SetEventHandler(EventHandler)` is mutually exclusive, // but one of them must be used! SetEventHandler(EventHandler) // SetContext replaces the default context with the provided one. // It can be used to deliver some values inside `(Worker) .Run (ctx Context)`. SetContext(context.Context) // SetLocker sets a custom `Locker`, if it is not set, // the default `Locker` will be used, which expects SIGTERM or SIGINT system signals. SetLocker(Locker) // SetRecover sets a custom `recover` that catches panic. SetRecover(Recover) // SetShutdown sets `Shutdown` callback. SetShutdown(Shutdown) // UseDefaultRecover sets a standard handler as a `recover` // that catches panic and sends a fatal event to the event channel. UseDefaultRecover() // Run is the main entry point into the `Chief` run loop. // This method initializes all added workers, the server `net.Socket`, // if enabled, starts the workers in separate routines // and waits for the end of lock produced by the locker function. Run() // Shutdown sends stop signal to all child goroutines by triggering of the `context.CancelFunc()` // and executes `Shutdown` callback. Shutdown() }
Chief is a supervisor that can be placed at the top of the go application's execution stack, it is blocked until SIGTERM is intercepted and then it shutdown all workers gracefully. Also, `Chief` can be used as a child supervisor inside the `Worker`, which is launched by `Chief` at the top-level.
type Context ¶
Context is a wrapper over the standard `context.Context`. The main purpose of this is to extend in the future.
type Event ¶
type Event struct { Level EventLevel Worker WorkerName Fields map[string]interface{} Message string }
Event is a message object that is used to signalize about Chief's internal events and processed by `EventHandlers`.
func ErrorEvent ¶
ErrorEvent returns new Event with `LvlError` and provided message.
func (Event) IsError ¶
IsError returns `true` if event level is `Error`
func (Event) IsFatal ¶
IsFatal returns `true` if event level is `Fatal`
func (Event) SetField ¶
SetField add to event some Key/Value.
func (Event) SetWorker ¶
func (e Event) SetWorker(name WorkerName) Event
SetWorker sets the provided `worker` as the event source.
type EventHandler ¶
type EventHandler func(Event)
EventHandler callback that processes the `Chief` internal events, can log them or do something else.
func LogrusEventHandler ¶
func LogrusEventHandler(entry *logrus.Entry) EventHandler
LogrusEventHandler returns default `EventHandler` that can be used for `Chief.SetEventHandler(...)`.
type EventLevel ¶
type EventLevel string
EventLevel ...
const ( LvlFatal EventLevel = "fatal" LvlError EventLevel = "error" LvlInfo EventLevel = "info" )
type Locker ¶
type Locker func()
Locker is a function whose completion of a call is a signal to stop `Chief` and all workers.
type Recover ¶
type Recover func(name WorkerName)
Recover is a function that will be used as a `defer call` to handle each worker's panic.
type Shutdown ¶
type Shutdown func()
// Shutdown is a callback function that will be executed after the Chief and workers are stopped. Its main purpose is to close, complete, or retain some global states or shared resources.
type StateInfo ¶
type StateInfo struct { App AppInfo `json:"app"` Workers map[WorkerName]sam.State `json:"workers"` }
StateInfo is result the `StatusAction` command.
func ParseStateInfo ¶
func ParseStateInfo(data json.RawMessage) (*StateInfo, error)
ParseStateInfo decodes `StateInfo` from the JSON response for the `StatusAction` command.
type Worker ¶
type Worker interface { // Init initializes some state of the worker that required interaction with outer context, // for example, initialize some connectors. In many cases this method is optional, // so it can be implemented as empty: `func (*W) Init() error { return nil }`. Init() error // Run starts the `Worker` instance execution. The context will provide a signal // when a worker must stop through the `ctx.Done()`. Run(ctx Context) error }
Worker is an interface for async workers which launches and manages by the `Chief`.
type WorkerExistRule ¶
type WorkerExistRule struct { AvailableWorkers map[WorkerName]struct{} // contains filtered or unexported fields }
WorkerExistRule is a custom validation rule for the `ozzo-validation` package.
func (*WorkerExistRule) Error ¶
func (r *WorkerExistRule) Error(message string) *WorkerExistRule
Error sets the error message for the rule.
func (*WorkerExistRule) Validate ¶
func (r *WorkerExistRule) Validate(value interface{}) error
Validate checks that service exist on the system
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool provides a mechanism to combine many workers into the one pool, manage them, and run.
func (*WorkerPool) FailWorker ¶
func (p *WorkerPool) FailWorker(name WorkerName) error
FailWorker sets state `WorkerFailed` for workers with the specified `name`.
func (*WorkerPool) GetState ¶
func (p *WorkerPool) GetState(name WorkerName) sam.State
GetState returns current state for workers with the specified `name`.
func (*WorkerPool) GetWorkersStates ¶
func (p *WorkerPool) GetWorkersStates() map[WorkerName]sam.State
GetWorkersStates returns current state of all workers.
func (*WorkerPool) InitWorker ¶
func (p *WorkerPool) InitWorker(name WorkerName) error
InitWorker initializes all present workers.
func (*WorkerPool) IsRun ¶
func (p *WorkerPool) IsRun(name WorkerName) bool
IsRun checks is active worker with passed `name`.
func (*WorkerPool) ReplaceWorker ¶
func (p *WorkerPool) ReplaceWorker(name WorkerName, worker Worker)
ReplaceWorker replaces the worker with `name` by new `worker`.
func (*WorkerPool) RunWorkerExec ¶
func (p *WorkerPool) RunWorkerExec(ctx Context, name WorkerName) (err error)
RunWorkerExec adds worker into pool.
func (*WorkerPool) SetState ¶
func (p *WorkerPool) SetState(name WorkerName, state sam.State) error
SetState updates state of specified worker.
func (*WorkerPool) SetWorker ¶
func (p *WorkerPool) SetWorker(name WorkerName, worker Worker) error
SetWorker adds worker into pool.
func (*WorkerPool) StartWorker ¶
func (p *WorkerPool) StartWorker(name WorkerName) error
StartWorker sets state `WorkerEnabled` for workers with the specified `name`.
func (*WorkerPool) StopWorker ¶
func (p *WorkerPool) StopWorker(name WorkerName) error
StopWorker sets state `WorkerStopped` for workers with the specified `name`.