Documentation ¶
Index ¶
- Constants
- Variables
- type Chief
- func (chief *Chief) AddValueToContext(key, value interface{})
- func (chief *Chief) AddWorker(name WorkerName, worker Worker)
- func (chief *Chief) EnableWorker(name WorkerName) error
- func (chief *Chief) EnableWorkers(names ...WorkerName) (err error)
- func (chief *Chief) GetContext() context.Context
- func (chief *Chief) GetWorkersStates() map[WorkerName]sam.State
- func (chief *Chief) Init(logger *logrus.Entry) *Chief
- func (chief *Chief) IsEnabled(name WorkerName) bool
- func (chief *Chief) Run(workers ...WorkerName) error
- func (chief *Chief) RunWithContext(ctx context.Context, workers ...WorkerName) error
- func (chief *Chief) RunWithLocker(locker func(), workers ...WorkerName) (err error)
- func (chief *Chief) StartPool(parentCtx context.Context) int
- type CtxKey
- type ExitCode
- type Message
- type WContext
- type Worker
- type WorkerExistRule
- type WorkerName
- type WorkerPool
- func (pool *WorkerPool) DisableWorker(name WorkerName) error
- func (pool *WorkerPool) EnableWorker(name WorkerName) error
- func (pool *WorkerPool) FailWorker(name WorkerName) error
- func (pool *WorkerPool) GetState(name WorkerName) sam.State
- func (pool *WorkerPool) GetWorkersStates() map[WorkerName]sam.State
- func (pool *WorkerPool) InitWorker(ctx context.Context, name WorkerName) error
- func (pool *WorkerPool) IsDisabled(name WorkerName) bool
- func (pool *WorkerPool) IsEnabled(name WorkerName) bool
- func (pool *WorkerPool) IsRun(name WorkerName) bool
- func (pool *WorkerPool) ReplaceWorker(name WorkerName, worker Worker)
- func (pool *WorkerPool) RunWorkerExec(name WorkerName, ctx WContext) (err error)
- func (pool *WorkerPool) SetState(name WorkerName, state sam.State) error
- func (pool *WorkerPool) SetWorker(name WorkerName, worker Worker)
- func (pool *WorkerPool) StartWorker(name WorkerName) error
- func (pool *WorkerPool) StopWorker(name WorkerName) error
Constants ¶
const ( WStateDisabled sam.State = "Disabled" WStateEnabled sam.State = "Enabled" WStateInitialized sam.State = "Initialized" WStateRun sam.State = "Run" WStateStopped sam.State = "Stopped" WStateFailed sam.State = "Failed" )
Constants for the possible worker states.
Variables ¶
var ( //ErrWorkerNotExist custom error for not-existing worker ErrWorkerNotExist = func(name WorkerName) error { return fmt.Errorf("%s: not exist", name) } )
var ForceStopTimeout = 45 * time.Second // nolint:gochecknoglobals
ForceStopTimeout is a timeout for killing all workers.
var WorkersStates = map[sam.State]struct{}{ WStateDisabled: {}, WStateEnabled: {}, WStateInitialized: {}, WStateRun: {}, WStateStopped: {}, WStateFailed: {}, }
WorkersStates is the set of valid worker states.
Functions ¶
This section is empty.
Types ¶
type Chief ¶
type Chief struct { // EnableByDefault sets all the working `Enabled` // if none of the workers is passed on to enable. EnableByDefault bool // AppName main app identifier of instance for logger and etc. AppName string // contains filtered or unexported fields }
Chief is a head of workers, it must be used to register, initialize and correctly start and stop asynchronous executors of the type `Worker`.
func (*Chief) AddValueToContext ¶
func (chief *Chief) AddValueToContext(key, value interface{})
AddValueToContext updates the chief context, setting a new value under the given key.
func (*Chief) AddWorker ¶
func (chief *Chief) AddWorker(name WorkerName, worker Worker)
AddWorker registers a new `Worker` in the `Chief` worker pool.
func (*Chief) EnableWorker ¶
func (chief *Chief) EnableWorker(name WorkerName) error
EnableWorker enables the worker with the specified `name`. By default, all added workers are enabled. After the first call of this method, only explicitly enabled workers will be active.
func (*Chief) EnableWorkers ¶
func (chief *Chief) EnableWorkers(names ...WorkerName) (err error)
EnableWorkers enables all workers from the `names` list. By default, all added workers are enabled. After the first call of this method, only explicitly enabled workers will be active.
func (*Chief) GetContext ¶
GetContext returns the chief context.
func (*Chief) GetWorkersStates ¶
func (chief *Chief) GetWorkersStates() map[WorkerName]sam.State
GetWorkersStates returns a map of worker states keyed by worker name.
func (*Chief) IsEnabled ¶
func (chief *Chief) IsEnabled(name WorkerName) bool
IsEnabled reports whether the worker with the passed `name` is enabled.
func (*Chief) Run ¶
func (chief *Chief) Run(workers ...WorkerName) error
Run enables the passed workers, starts the worker pool, and blocks the execution context until it intercepts `syscall.SIGTERM` or `syscall.SIGINT`. NOTE: Use this method ONLY as a top-level action.
func (*Chief) RunWithContext ¶
func (chief *Chief) RunWithContext(ctx context.Context, workers ...WorkerName) error
RunWithContext supplies the function waitForSignal as the locker for RunWithLocker.
func (*Chief) RunWithLocker ¶
func (chief *Chief) RunWithLocker(locker func(), workers ...WorkerName) (err error)
RunWithLocker takes a `locker` function, which should block the execution context and wait for some signal to stop.
type CtxKey ¶
type CtxKey string
CtxKey is the type of context keys for the values placed by `Chief`.
const ( // CtxKeyLog is a context key for a `*logrus.Entry` value. CtxKeyLog CtxKey = "chief-log" )
type ExitCode ¶
type ExitCode int
ExitCode custom type
const ( // ExitCodeOk means that the worker is stopped. ExitCodeOk ExitCode = iota // ExitCodeInterrupted means that the work cycle has been interrupted and can be restarted. ExitCodeInterrupted // ExitCodeFailed means that the worker fails. ExitCodeFailed // ExitReinitReq means that the worker can't do job and requires reinitialization. ExitReinitReq )
type Message ¶
type Message struct { UID int64 Target WorkerName Sender WorkerName Data interface{} }
Message type declaration
type WContext ¶
type WContext interface { context.Context SendMessage(target WorkerName, data interface{}) error MessageBus() <-chan *Message }
WContext declares the context interface passed to workers.
func NewContext ¶
func NewContext(ctx context.Context, name WorkerName, in, out chan *Message) WContext
NewContext constructs a `WContext` for the worker with the given `name`.
type Worker ¶
type Worker interface { // Init initializes new instance of the `Worker` implementation, // this context should be used only as Key/Value transmitter, // DO NOT use it for `<- ctx.Done()` Init(ctx context.Context) Worker // RestartOnFail determines the need to restart the worker, if it stopped. RestartOnFail() bool // Run starts the `Worker` instance execution. Run(ctx WContext) ExitCode }
Worker is an interface for async workers which are launched and managed by the `Chief`.
type WorkerExistRule ¶
type WorkerExistRule struct { AvailableWorkers map[WorkerName]struct{} // contains filtered or unexported fields }
WorkerExistRule implements Rule interface for validation
func (*WorkerExistRule) Error ¶
func (r *WorkerExistRule) Error(message string) *WorkerExistRule
Error sets the error message for the rule.
func (*WorkerExistRule) Validate ¶
func (r *WorkerExistRule) Validate(value interface{}) error
Validate checks that the service exists on the system.
type WorkerPool ¶
type WorkerPool struct {
// contains filtered or unexported fields
}
WorkerPool is a pool of named workers that tracks the state of each worker.
func (*WorkerPool) DisableWorker ¶
func (pool *WorkerPool) DisableWorker(name WorkerName) error
DisableWorker sets the state `WStateDisabled` for the worker with the specified `name`.
func (*WorkerPool) EnableWorker ¶
func (pool *WorkerPool) EnableWorker(name WorkerName) error
EnableWorker sets the state `WStateEnabled` for the worker with the specified `name`.
func (*WorkerPool) FailWorker ¶
func (pool *WorkerPool) FailWorker(name WorkerName) error
FailWorker sets the state `WStateFailed` for the worker with the specified `name`.
func (*WorkerPool) GetState ¶
func (pool *WorkerPool) GetState(name WorkerName) sam.State
GetState returns the current state of the worker with the specified `name`.
func (*WorkerPool) GetWorkersStates ¶
func (pool *WorkerPool) GetWorkersStates() map[WorkerName]sam.State
GetWorkersStates returns current state of all workers.
func (*WorkerPool) InitWorker ¶
func (pool *WorkerPool) InitWorker(ctx context.Context, name WorkerName) error
InitWorker initializes the worker with the specified `name`.
func (*WorkerPool) IsDisabled ¶
func (pool *WorkerPool) IsDisabled(name WorkerName) bool
IsDisabled reports whether the worker with the passed `name` is disabled.
func (*WorkerPool) IsEnabled ¶
func (pool *WorkerPool) IsEnabled(name WorkerName) bool
IsEnabled reports whether the worker with the passed `name` is enabled.
func (*WorkerPool) IsRun ¶
func (pool *WorkerPool) IsRun(name WorkerName) bool
IsRun reports whether the worker with the passed `name` is active.
func (*WorkerPool) ReplaceWorker ¶
func (pool *WorkerPool) ReplaceWorker(name WorkerName, worker Worker)
ReplaceWorker replaces the worker registered under `name` in the pool.
func (*WorkerPool) RunWorkerExec ¶
func (pool *WorkerPool) RunWorkerExec(name WorkerName, ctx WContext) (err error)
RunWorkerExec executes the worker with the specified `name` using the passed worker context.
func (*WorkerPool) SetState ¶
func (pool *WorkerPool) SetState(name WorkerName, state sam.State) error
SetState updates the state of the specified worker.
func (*WorkerPool) SetWorker ¶
func (pool *WorkerPool) SetWorker(name WorkerName, worker Worker)
SetWorker adds the worker to the pool under the specified `name`.
func (*WorkerPool) StartWorker ¶
func (pool *WorkerPool) StartWorker(name WorkerName) error
StartWorker sets the state `WStateRun` for the worker with the specified `name`.
func (*WorkerPool) StopWorker ¶
func (pool *WorkerPool) StopWorker(name WorkerName) error
StopWorker sets the state `WStateStopped` for the worker with the specified `name`.