Documentation
¶
Index ¶
- Constants
- type Action
- type ActionFunc
- type ActionHub
- func (ah *ActionHub) Do(actionName string, creator WorkGroupCreator, slaves ...SlaveWorkerRequest) error
- func (ah *ActionHub) DoAlias(actionName string, pubsubTopic string, creator WorkGroupCreator, ...) error
- func (ah *ActionHub) Start()
- func (ah *ActionHub) Stats() []WorkerStat
- func (ah *ActionHub) Stop()
- func (ah *ActionHub) Wait()
- type BehaviourType
- type Escalation
- type EscalationNotification
- type EscalationProtocol
- type InstanceType
- type Job
- type MasterWorkerGroup
- type SlaveWorkerRequest
- type WorkGroupCreator
- type WorkRequest
- type WorkerConfig
- type WorkerEscalationNotification
- type WorkerGroup
- func (w *WorkerGroup) Ctx() context.Context
- func (w *WorkerGroup) HandleMessage(ctx context.Context, message sabuhp.Message, t sabuhp.Transport) error
- func (w *WorkerGroup) Start()
- func (w *WorkerGroup) Stats() WorkerStat
- func (w *WorkerGroup) Stop()
- func (w *WorkerGroup) Wait()
- func (w *WorkerGroup) WaitRestart()
- type WorkerProtocol
- type WorkerRequest
- type WorkerStat
- type WorkerStats
- type WorkerTemplateRegistry
Constants ¶
const ( DefaultMaxWorkers = 10 DefaultMessageBuffer = 10 DefaultMinWorkers = 1 DefaultIdleness = time.Minute DefaultMessageAcceptWait = time.Second / 2 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ActionFunc ¶
type ActionHub ¶
type ActionHub struct { Pubsub sabuhp.MessageBus Relay *sabuhp.BusRelay Logger sabuhp.Logger Injector *injectors.Injector WorkRegistry *WorkerTemplateRegistry Context context.Context CancelFn context.CancelFunc EscalationFunc EscalationNotification // contains filtered or unexported fields }
ActionHub is a worker pool supervisor which sits to manage the execution of functions deployed for processing of commands asynchronously, each responding with another message if needed.
func NewActionHub ¶
func NewActionHub( ctx context.Context, escalationHandler EscalationNotification, templates *WorkerTemplateRegistry, injector *injectors.Injector, pubsub sabuhp.MessageBus, relay *sabuhp.BusRelay, logger sabuhp.Logger, ) *ActionHub
NewActionHub returns a new instance of a ActionHub.
func (*ActionHub) Do ¶
func (ah *ActionHub) Do(actionName string, creator WorkGroupCreator, slaves ...SlaveWorkerRequest) error
Do registers giving action (based on actionName) with using action name as address with the pubsub for receiving commands.
Actions generally are unique self contained pure functions describing special case behaviours that perform some action and can if required respond with other commands in return. There payloads are specific and known at implementation time, so that the contract of what an action does are clear.
Generally Actions based on configuration of their work group templates will scaling accordingly if possible to meet processing needs based on worker group constraints. Hence generally unless special cases occur creating multiple worker workerGroupInstances of the same action is not necessary.
Do be aware the actionName is very unique and must be different from others both in the workers template registry as well.
func (*ActionHub) DoAlias ¶
func (ah *ActionHub) DoAlias( actionName string, pubsubTopic string, creator WorkGroupCreator, slaves ...SlaveWorkerRequest, ) error
DoAlias registers a WorkerCreator based on the action name but instead of using the action name as the pubsub topic, uses the value of pubsubTopic provided allowing you to link an action work group to an external pre-defined topic.
We always encourage the actionName to be the topic because they based on configuration are able to scale without the need to spawn separate work groups. But there will be cases for the need to provide support to handle outside events that may not necessary be termed with the action name due to lack of control or due to integration, hence DoAlias exists for this.
The general rule is when you can control the topic name, use the actionName, and if you don't know which to use, then its a safe bet to say: just use the action name, hence using ActionHub.Do.
func (*ActionHub) Stats ¶
func (ah *ActionHub) Stats() []WorkerStat
type BehaviourType ¶
type BehaviourType int
const ( DoNothing BehaviourType = iota RestartAll RestartOne StopAllAndEscalate )
type Escalation ¶
type Escalation struct { // Err is the generated error with tracing data. Err error // Additional data to be attached, could be the returned value // of recover() for a panic protocol. Data interface{} // WorkerProtocol is the escalation protocol communicated by the worker. WorkerProtocol WorkerProtocol // GroupProtocol is the escalation protocol being used by the worker group // for handling the worker protocol. GroupProtocol EscalationProtocol // PendingMessages are the commands left to be processed when // an escalation occurred. It's only set when it's a KillAndEscalate // protocol. PendingMessages chan *sabuhp.Message // OffendingMessage is the message which caused the PanicProtocol // Only has a value when it's a PanicProtocol. OffendingMessage *sabuhp.Message }
type EscalationNotification ¶
type EscalationNotification func(escalation Escalation, hub *ActionHub)
type EscalationProtocol ¶
type EscalationProtocol int
const ( RestartProtocol EscalationProtocol = iota KillAndEscalateProtocol )
type InstanceType ¶
type InstanceType int
const ( NullInstance InstanceType = iota SingleInstance ScalingInstances OneTimeInstance )
type MasterWorkerGroup ¶
type MasterWorkerGroup struct { Master *WorkerGroup Slaves map[string]*WorkerGroup }
MasterWorkerGroup implements a group of worker-group nodes where a master node has specific actions group along as dependent actions where the death of the master leads to the death of the slaves.
func (*MasterWorkerGroup) AddSlave ¶
func (mg *MasterWorkerGroup) AddSlave(slave *WorkerGroup)
func (*MasterWorkerGroup) Start ¶
func (mg *MasterWorkerGroup) Start()
func (*MasterWorkerGroup) Stats ¶
func (mg *MasterWorkerGroup) Stats() []WorkerStat
func (*MasterWorkerGroup) Stop ¶
func (mg *MasterWorkerGroup) Stop()
type SlaveWorkerRequest ¶
type SlaveWorkerRequest struct { // ActionName represent the action name for this // request work group, which identifiers in cases // where the PubSubAddress has no value is the // topic to listen for commands on. ActionName string GroupName string // WorkerCreator is the creation function which will be supplied // the initial WorkerConfig which should be populated accordingly // by the creator to create and return a WorkerGroup which will // service all commands for this action. Action Action // Attributes which are allowed for configuration in the WorkerGroupCreator // as slaves are very special, users do not have rights to decide their behaviour // and instance types. MessageBufferSize int MinWorker int MaxWorkers int MaxIdleness time.Duration MessageDeliveryWait time.Duration }
type WorkGroupCreator ¶
type WorkGroupCreator func(config WorkerConfig) *WorkerGroup
WorkGroupCreator exposes a method which takes a WorkerConfig which the function can modify as it sees fit and return a WorkerGroup which will be managed by the a ActionHub.
type WorkRequest ¶
type WorkerConfig ¶
type WorkerConfig struct { ActionName string Addr string MessageBufferSize int Action Action MinWorker int MaxWorkers int Injector *injectors.Injector Behaviour BehaviourType Instance InstanceType Context context.Context EscalationNotification WorkerEscalationNotification MaxIdleness time.Duration MessageDeliveryWait time.Duration }
WorkerConfig contains necessary properties required by a Worker
An important notice to be given is to ensure any blocking operation in the WorkerConfig.EscalationNotification is shot into a goroutine, else this will block the WorkerGroup's internal run loop. The responsibility is shifted to the user to provide a more concise, expected behaviour, hence the user should be aware.
type WorkerEscalationNotification ¶
type WorkerEscalationNotification func(escalation *Escalation, wk *WorkerGroup)
type WorkerGroup ¶
type WorkerGroup struct {
// contains filtered or unexported fields
}
WorkerGroup embodies a small action based workgroup which at their default state are scaling functions for execution across their maximum allowed range. WorkerGroup provide other settings like SingleInstance where only one function is allowed or OneTime instance type where for a function runs once and dies off.
func NewWorkGroup ¶
func NewWorkGroup(config WorkerConfig) *WorkerGroup
func (*WorkerGroup) Ctx ¶
func (w *WorkerGroup) Ctx() context.Context
func (*WorkerGroup) HandleMessage ¶
func (*WorkerGroup) Start ¶
func (w *WorkerGroup) Start()
func (*WorkerGroup) Stats ¶
func (w *WorkerGroup) Stats() WorkerStat
func (*WorkerGroup) Stop ¶
func (w *WorkerGroup) Stop()
func (*WorkerGroup) Wait ¶
func (w *WorkerGroup) Wait()
Wait block till the group is stopped or killed
func (*WorkerGroup) WaitRestart ¶
func (w *WorkerGroup) WaitRestart()
WaitRestart will block if there is a restart process occurring when it's called.
type WorkerRequest ¶
type WorkerRequest struct { // Err provides a means of response detailing possible error // that occurred during the processing of creating this worker. Err chan error // ActionName represent the action name for this // request work group, which identifiers in cases // where the PubSubTopic has no value is the // topic to listen for commands on. ActionName string // PubSubTopic provides alternate means of redirecting // a specific unique topic to this action worker, where // commands for these are piped accordingly. PubSubTopic string // PubSubGroup indicates the target group for which the // action will be added into. Similar to a Kafka ConsumerGroup. PubSubGroup string // WorkerCreator is the creation function which will be supplied // the initial WorkerConfig which should be populated accordingly // by the creator to create and return a WorkerGroup which will // service all commands for this action. WorkerCreator WorkGroupCreator // Slaves are personalized workers we want generated with this worker. // Slaves have strict naming format which will be generated automatically // internally. // // Name format: [MasterActionName]/slaves/[SlaveActionName] // // WARNING: Use sparingly, most of your use case are doable with shared // workgroups. // // There are cases where we want a worker specific for a work group // which can be used to hand off specific tasks to handle very // special cases, this then allows specific and limited use // around non-generalist work group slaves. // // Example of such is a dangerous operation that is not intrinsic to // the behaviour of the action but must be done, this can be offloaded // a dangerous task or non-secure operation to the slaves who will // always be running and are never ever going die because they will be // restarted and respawned. // They will exists as far as the master exists. Slaves []SlaveWorkerRequest }
WorkerRequest defines a request template used by the ActionHub to request the creation of a worker group for a giving action.
type WorkerStat ¶
type WorkerStat struct { Addr string MaxWorkers int MinWorkers int TotalMessageReceived int TotalMessageProcessed int TotalEscalations int TotalPanics int TotalRestarts int AvailableWorkerCapacity int TotalCurrentWorkers int TotalCreatedWorkers int TotalKilledWorkers int TotalIdledWorkers int Instance InstanceType BehaviourType BehaviourType }
func (WorkerStat) EncodeObject ¶
func (w WorkerStat) EncodeObject(encoder npkg.ObjectEncoder)
type WorkerStats ¶
type WorkerStats []WorkerStat
func (WorkerStats) EncodeList ¶
func (items WorkerStats) EncodeList(encoder npkg.ListEncoder)
type WorkerTemplateRegistry ¶
type WorkerTemplateRegistry struct {
// contains filtered or unexported fields
}
WorkerTemplateRegistry implements a registry of WorkerRequest templates which predefine specific ActionWorkerRequests that define what should be created to handle work for such actions.
It provides the ActionHub a predefined set of templates to boot up action worker groups based on commands for specific addresses.
func NewWorkerTemplateRegistry ¶
func NewWorkerTemplateRegistry() *WorkerTemplateRegistry
NewWorkerTemplateRegistry returns a new instance of the WorkerTemplateRegistry.
func (*WorkerTemplateRegistry) Delete ¶
func (ar *WorkerTemplateRegistry) Delete(actionName string)
func (*WorkerTemplateRegistry) Has ¶
func (ar *WorkerTemplateRegistry) Has(actionName string) bool
func (*WorkerTemplateRegistry) Register ¶
func (ar *WorkerTemplateRegistry) Register(template WorkerRequest)
func (*WorkerTemplateRegistry) Template ¶
func (ar *WorkerTemplateRegistry) Template(actionName string) WorkerRequest