Documentation ¶
Index ¶
- Constants
- type BaseExchangeContext
- type BaseWorker
- func (w *BaseWorker) AddDeferredCommand(command Command)
- func (w *BaseWorker) AddSubworker(name string) chan bool
- func (w *BaseWorker) AreAllSubworkersTerminated() bool
- func (w *BaseWorker) CommandHandler(command Command) bool
- func (w *BaseWorker) DispatchSubworker(name string, runSubWorker func() int, interval int, logOptOut bool)
- func (w *BaseWorker) GetCSSURL() string
- func (w *BaseWorker) GetExchangeId() string
- func (w *BaseWorker) GetExchangeToken() string
- func (w *BaseWorker) GetExchangeURL() string
- func (w *BaseWorker) GetHTTPFactory() *config.HTTPClientFactory
- func (w *BaseWorker) GetName() string
- func (w *BaseWorker) GetNoWorkInterval() int
- func (w *BaseWorker) HandleFrameworkCommands(command Command) (bool, bool)
- func (w *BaseWorker) HasDeferredCommands() bool
- func (w *BaseWorker) Initialize() bool
- func (w *BaseWorker) IsSubworkerTerminated(name string) bool
- func (w *BaseWorker) IsWorkerShuttingDown() bool
- func (w *BaseWorker) NoWorkHandler()
- func (w *BaseWorker) RequeueDeferredCommands()
- func (w *BaseWorker) SetDeferredDelay(delay int)
- func (w *BaseWorker) SetNoWorkInterval(interval int)
- func (w *BaseWorker) SetSubworkerTerminated(name string)
- func (w *BaseWorker) SetWorkerShuttingDown()
- func (w *BaseWorker) Start(worker Worker, noWorkInterval int)
- func (w *BaseWorker) String() string
- func (w *BaseWorker) TerminateSubworker(name string)
- func (w *BaseWorker) TerminateSubworkers()
- type BeginShutdownCommand
- type Command
- type Manager
- type MessageHandler
- type MessageHandlerRegistry
- type SubWorker
- type SubWorkerTerminationCommand
- type TerminateCommand
- type Worker
- type WorkerStatus
- type WorkerStatusManager
- func (w *WorkerStatusManager) GetAllSubworkerStatus(name string) map[string]string
- func (w *WorkerStatusManager) GetSubworkerStatus(name string, subname string) string
- func (w *WorkerStatusManager) GetWorkerStatus(name string) string
- func (w *WorkerStatusManager) SetSubworkerStatus(name string, subname string, status string)
- func (w *WorkerStatusManager) SetWorkerStatus(name string, status string)
Constants ¶
const ( STATUS_NONE = "none" STATUS_ADDED = "added" STATUS_STARTED = "started" STATUS_INITIALIZED = "initialized" STATUS_INIT_FAILED = "initialization failed" STATUS_TERMINATING = "terminating" STATUS_TERMINATED = "terminated" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseExchangeContext ¶
type BaseExchangeContext struct { Id string Token string URL string CSSURL string HTTPFactory *config.HTTPClientFactory }
func NewExchangeContext ¶
func NewExchangeContext(id string, token string, url string, cssurl string, httpFactory *config.HTTPClientFactory) *BaseExchangeContext
type BaseWorker ¶
type BaseWorker struct { Name string Manager Commands chan Command // workers can receive commands DeferredCommands []Command // commands can be deferred DeferredDelay int // the number of seconds to delay before retrying SubWorkers map[string]*SubWorker // workers can have sub go routines that they own ShuttingDown bool EC *BaseExchangeContext // Holds the exchange context state // contains filtered or unexported fields }
func NewBaseWorker ¶
func NewBaseWorker(name string, cfg *config.HorizonConfig, ec *BaseExchangeContext) BaseWorker
func (*BaseWorker) AddDeferredCommand ¶
func (w *BaseWorker) AddDeferredCommand(command Command)
func (*BaseWorker) AddSubworker ¶
func (w *BaseWorker) AddSubworker(name string) chan bool
Subworker framework functions
func (*BaseWorker) AreAllSubworkersTerminated ¶
func (w *BaseWorker) AreAllSubworkersTerminated() bool
func (*BaseWorker) CommandHandler ¶
func (w *BaseWorker) CommandHandler(command Command) bool
This function is called every time a command arrives on the worker's command queue.
func (*BaseWorker) DispatchSubworker ¶
func (w *BaseWorker) DispatchSubworker(name string, runSubWorker func() int, interval int, logOptOut bool)
func (*BaseWorker) GetCSSURL ¶
func (w *BaseWorker) GetCSSURL() string
func (*BaseWorker) GetExchangeId ¶
func (w *BaseWorker) GetExchangeId() string
This function should return the id in the form org/id.
func (*BaseWorker) GetExchangeToken ¶
func (w *BaseWorker) GetExchangeToken() string
func (*BaseWorker) GetExchangeURL ¶
func (w *BaseWorker) GetExchangeURL() string
func (*BaseWorker) GetHTTPFactory ¶
func (w *BaseWorker) GetHTTPFactory() *config.HTTPClientFactory
func (*BaseWorker) GetName ¶
func (w *BaseWorker) GetName() string
func (*BaseWorker) GetNoWorkInterval ¶
func (w *BaseWorker) GetNoWorkInterval() int
func (*BaseWorker) HandleFrameworkCommands ¶
func (w *BaseWorker) HandleFrameworkCommands(command Command) (bool, bool)
Returns handled (boolean) and terminate (boolean).
func (*BaseWorker) HasDeferredCommands ¶
func (w *BaseWorker) HasDeferredCommands() bool
func (*BaseWorker) Initialize ¶
func (w *BaseWorker) Initialize() bool
This function is called one time, when the worker first starts. The function returns false when it was not successful and the worker should terminate.
func (*BaseWorker) IsSubworkerTerminated ¶
func (w *BaseWorker) IsSubworkerTerminated(name string) bool
func (*BaseWorker) IsWorkerShuttingDown ¶
func (w *BaseWorker) IsWorkerShuttingDown() bool
func (*BaseWorker) NoWorkHandler ¶
func (w *BaseWorker) NoWorkHandler()
This function is called if the worker wants a chance to run after not doing anything for a given period of time. If the Start() method is called with a zero noWorkInterval, then this function will never be called.
func (*BaseWorker) RequeueDeferredCommands ¶
func (w *BaseWorker) RequeueDeferredCommands()
func (*BaseWorker) SetDeferredDelay ¶
func (w *BaseWorker) SetDeferredDelay(delay int)
func (*BaseWorker) SetNoWorkInterval ¶
func (w *BaseWorker) SetNoWorkInterval(interval int)
func (*BaseWorker) SetSubworkerTerminated ¶
func (w *BaseWorker) SetSubworkerTerminated(name string)
func (*BaseWorker) SetWorkerShuttingDown ¶
func (w *BaseWorker) SetWorkerShuttingDown()
func (*BaseWorker) Start ¶
func (w *BaseWorker) Start(worker Worker, noWorkInterval int)
This function kicks off the go routine that the worker's logic runs in.
func (*BaseWorker) String ¶
func (w *BaseWorker) String() string
func (*BaseWorker) TerminateSubworker ¶
func (w *BaseWorker) TerminateSubworker(name string)
func (*BaseWorker) TerminateSubworkers ¶
func (w *BaseWorker) TerminateSubworkers()
type BeginShutdownCommand ¶
type BeginShutdownCommand struct { }
A builtin command that workers use to terminate subworkers.
func NewBeginShutdownCommand ¶
func NewBeginShutdownCommand() *BeginShutdownCommand
func (*BeginShutdownCommand) ShortString ¶
func (b *BeginShutdownCommand) ShortString() string
func (*BeginShutdownCommand) String ¶
func (b *BeginShutdownCommand) String() string
type Manager ¶
type Manager struct { Config *config.HorizonConfig Messages chan events.Message // managers send messages }
The manager that holds the outbound message queue for this worker.
type MessageHandler ¶
type MessageHandler interface { GetName() string NewEvent(events.Message) Messages() chan events.Message }
All workers need to implement this interface in order to have events/messages dispatched to them.
type MessageHandlerRegistry ¶
type MessageHandlerRegistry struct {
Handlers map[string]*MessageHandler
}
func NewMessageHandlerRegistry ¶
func NewMessageHandlerRegistry() *MessageHandlerRegistry
func (*MessageHandlerRegistry) Add ¶
func (m *MessageHandlerRegistry) Add(mh interface { MessageHandler })
func (*MessageHandlerRegistry) Contains ¶
func (m *MessageHandlerRegistry) Contains(name string) bool
func (*MessageHandlerRegistry) IsEmpty ¶
func (m *MessageHandlerRegistry) IsEmpty() bool
func (*MessageHandlerRegistry) ProcessEventMessages ¶
func (workers *MessageHandlerRegistry) ProcessEventMessages()
func (*MessageHandlerRegistry) Remove ¶
func (m *MessageHandlerRegistry) Remove(name string)
type SubWorkerTerminationCommand ¶
type SubWorkerTerminationCommand struct {
// contains filtered or unexported fields
}
A builtin command that subworkers can use to tell their parent that they are terminating.
func NewSubWorkerTerminationCommand ¶
func NewSubWorkerTerminationCommand(name string) *SubWorkerTerminationCommand
func (*SubWorkerTerminationCommand) Name ¶
func (s *SubWorkerTerminationCommand) Name() string
func (*SubWorkerTerminationCommand) ShortString ¶
func (s *SubWorkerTerminationCommand) ShortString() string
func (*SubWorkerTerminationCommand) String ¶
func (s *SubWorkerTerminationCommand) String() string
type TerminateCommand ¶
type TerminateCommand struct {
// contains filtered or unexported fields
}
A builtin command that workers can use to terminate themselves.
func NewTerminateCommand ¶
func NewTerminateCommand(reason string) *TerminateCommand
func (*TerminateCommand) ShortString ¶
func (t *TerminateCommand) ShortString() string
func (*TerminateCommand) String ¶
func (t *TerminateCommand) String() string
type Worker ¶
type Worker interface { Start(worker Worker, noWorkInterval int) // Called by a concrete worker to get the worker framework started Initialize() bool // Called by the worker FW to allow the concrete worker to initialize itself before starting the command loop CommandHandler(command Command) bool // Called by the worker framework when there is a command for the worker to handle NoWorkHandler() // Called by the worker framework when the worker has been idle for noWorkInterval seconds // Methods that implement the ExchangeContext interface in the exchange package. The exchange package does not need // to be imported for this to work. And in fact, doing so would create cyclical dependencies, which are forbidden in go. GetExchangeId() string GetExchangeToken() string GetExchangeURL() string GetCSSURL() string GetHTTPFactory() *config.HTTPClientFactory }
A worker is a first class event/message handler that performs async functions based on event input or by polling a remote resource.
type WorkerStatus ¶
type WorkerStatus struct { Name string `json:"name"` Status string `json:"status"` SubworkerStatus map[string]string `json:"subworker_status"` StatusLock sync.Mutex `json:"-"` // The lock that protects modification from different threads at the same time }
Status for a worker.
func (*WorkerStatus) SetSubworkerStatus ¶
func (w *WorkerStatus) SetSubworkerStatus(name string, status string)
func (*WorkerStatus) SetWorkerStatus ¶
func (w *WorkerStatus) SetWorkerStatus(status string)
type WorkerStatusManager ¶
type WorkerStatusManager struct { Workers map[string]*WorkerStatus `json:"workers"` StatusLog []string `json:"worker_status_log"` ManagerLock sync.Mutex `json:"-"` // The lock that protects modification from different threads at the same time }
func GetWorkerStatusManager ¶
func GetWorkerStatusManager() *WorkerStatusManager
func NewWorkerStatusManager ¶
func NewWorkerStatusManager() *WorkerStatusManager
func (*WorkerStatusManager) GetAllSubworkerStatus ¶
func (w *WorkerStatusManager) GetAllSubworkerStatus(name string) map[string]string
Get all the subworker statuses for the given worker. It returns nil if the worker does not exist.
func (*WorkerStatusManager) GetSubworkerStatus ¶
func (w *WorkerStatusManager) GetSubworkerStatus(name string, subname string) string
Get the status string for the given subworker. It returns an empty string if the subworker does not exist.
func (*WorkerStatusManager) GetWorkerStatus ¶
func (w *WorkerStatusManager) GetWorkerStatus(name string) string
Get the status string for the given worker. It returns an empty string if the worker does not exist.
func (*WorkerStatusManager) SetSubworkerStatus ¶
func (w *WorkerStatusManager) SetSubworkerStatus(name string, subname string, status string)
func (*WorkerStatusManager) SetWorkerStatus ¶
func (w *WorkerStatusManager) SetWorkerStatus(name string, status string)