worker

package
v3.2.1+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 21, 2020 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
const (
	STATUS_NONE        = "none"
	STATUS_ADDED       = "added"
	STATUS_STARTED     = "started"
	STATUS_INITIALIZED = "initialized"
	STATUS_INIT_FAILED = "initialization failed"
	STATUS_TERMINATING = "terminating"
	STATUS_TERMINATED  = "terminated"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseExchangeContext

type BaseExchangeContext struct {
	Id          string
	Token       string
	URL         string
	CSSURL      string
	HTTPFactory *config.HTTPClientFactory
}

func NewExchangeContext

func NewExchangeContext(id string, token string, url string, cssurl string, httpFactory *config.HTTPClientFactory) *BaseExchangeContext

type BaseWorker

type BaseWorker struct {
	Name string
	Manager
	Commands         chan Command          // workers can receive commands
	DeferredCommands []Command             // commands can be deferred
	DeferredDelay    int                   // the number of seconds to delay before retrying
	SubWorkers       map[string]*SubWorker // workers can have sub go routines that they own
	ShuttingDown     bool
	EC               *BaseExchangeContext // Holds the exchange context state
	// contains filtered or unexported fields
}

func NewBaseWorker

func NewBaseWorker(name string, cfg *config.HorizonConfig, ec *BaseExchangeContext) BaseWorker

func (*BaseWorker) AddDeferredCommand

func (w *BaseWorker) AddDeferredCommand(command Command)

func (*BaseWorker) AddSubworker

func (w *BaseWorker) AddSubworker(name string) chan bool

Subworker framework functions

func (*BaseWorker) AreAllSubworkersTerminated

func (w *BaseWorker) AreAllSubworkersTerminated() bool

func (*BaseWorker) CommandHandler

func (w *BaseWorker) CommandHandler(command Command) bool

This function is called every time a command arrives on the worker's command queue.

func (*BaseWorker) DispatchSubworker

func (w *BaseWorker) DispatchSubworker(name string, runSubWorker func() int, interval int, logOptOut bool)

func (*BaseWorker) GetCSSURL

func (w *BaseWorker) GetCSSURL() string

func (*BaseWorker) GetExchangeId

func (w *BaseWorker) GetExchangeId() string

This function should return the id in the form org/id.

func (*BaseWorker) GetExchangeToken

func (w *BaseWorker) GetExchangeToken() string

func (*BaseWorker) GetExchangeURL

func (w *BaseWorker) GetExchangeURL() string

func (*BaseWorker) GetHTTPFactory

func (w *BaseWorker) GetHTTPFactory() *config.HTTPClientFactory

func (*BaseWorker) GetName

func (w *BaseWorker) GetName() string

func (*BaseWorker) GetNoWorkInterval

func (w *BaseWorker) GetNoWorkInterval() int

func (*BaseWorker) HandleFrameworkCommands

func (w *BaseWorker) HandleFrameworkCommands(command Command) (bool, bool)

Return handled (boolean) and terminate(boolean)

func (*BaseWorker) HasDeferredCommands

func (w *BaseWorker) HasDeferredCommands() bool

func (*BaseWorker) Initialize

func (w *BaseWorker) Initialize() bool

This function is called one time, when the worker first starts. The function returns false when it was not successful and the worker should terminate.

func (*BaseWorker) IsSubworkerTerminated

func (w *BaseWorker) IsSubworkerTerminated(name string) bool

func (*BaseWorker) IsWorkerShuttingDown

func (w *BaseWorker) IsWorkerShuttingDown() bool

func (*BaseWorker) NoWorkHandler

func (w *BaseWorker) NoWorkHandler()

This function is called if the worker wants a chance to run after no doing anything for a given period of time. If the Start() method is called with a zero noWorkInterval, then this function will never be called.

func (*BaseWorker) RequeueDeferredCommands

func (w *BaseWorker) RequeueDeferredCommands()

func (*BaseWorker) SetDeferredDelay

func (w *BaseWorker) SetDeferredDelay(delay int)

func (*BaseWorker) SetNoWorkInterval

func (w *BaseWorker) SetNoWorkInterval(interval int)

func (*BaseWorker) SetSubworkerTerminated

func (w *BaseWorker) SetSubworkerTerminated(name string)

func (*BaseWorker) SetWorkerShuttingDown

func (w *BaseWorker) SetWorkerShuttingDown()

func (*BaseWorker) Start

func (w *BaseWorker) Start(worker Worker, noWorkInterval int)

This function kicks off the go routine that the worker's logic runs in.

func (*BaseWorker) String

func (w *BaseWorker) String() string

func (*BaseWorker) TerminateSubworker

func (w *BaseWorker) TerminateSubworker(name string)

func (*BaseWorker) TerminateSubworkers

func (w *BaseWorker) TerminateSubworkers()

type BeginShutdownCommand

type BeginShutdownCommand struct {
}

A builtin command that workers use to terminate subworkers.

func NewBeginShutdownCommand

func NewBeginShutdownCommand() *BeginShutdownCommand

func (*BeginShutdownCommand) ShortString

func (b *BeginShutdownCommand) ShortString() string

func (*BeginShutdownCommand) String

func (b *BeginShutdownCommand) String() string

type Command

type Command interface {
	ShortString() string
}

type Manager

type Manager struct {
	Config   *config.HorizonConfig
	Messages chan events.Message // managers send messages
}

The manager that holds the outbound message queue for this worker.

func (*Manager) String

func (m *Manager) String() string

type MessageHandler

type MessageHandler interface {
	GetName() string
	NewEvent(events.Message)
	Messages() chan events.Message
}

All workers need to implement this interface in order to have events/messages dispatched to them.

type MessageHandlerRegistry

type MessageHandlerRegistry struct {
	Handlers map[string]*MessageHandler
}

func NewMessageHandlerRegistry

func NewMessageHandlerRegistry() *MessageHandlerRegistry

func (*MessageHandlerRegistry) Add

func (m *MessageHandlerRegistry) Add(mh interface {
	MessageHandler
})

func (*MessageHandlerRegistry) Contains

func (m *MessageHandlerRegistry) Contains(name string) bool

func (*MessageHandlerRegistry) IsEmpty

func (m *MessageHandlerRegistry) IsEmpty() bool

func (*MessageHandlerRegistry) ProcessEventMessages

func (workers *MessageHandlerRegistry) ProcessEventMessages()

func (*MessageHandlerRegistry) Remove

func (m *MessageHandlerRegistry) Remove(name string)

type SubWorker

type SubWorker struct {
	TermChan   chan bool
	Terminated bool
}

A go routine that is managed by an anax worker. It is not a full blown worker.

func (*SubWorker) String

func (s *SubWorker) String() string

type SubWorkerTerminationCommand

type SubWorkerTerminationCommand struct {
	// contains filtered or unexported fields
}

A builtin command that subworkers can use to tell their parent that they are terminating.

func NewSubWorkerTerminationCommand

func NewSubWorkerTerminationCommand(name string) *SubWorkerTerminationCommand

func (*SubWorkerTerminationCommand) Name

func (*SubWorkerTerminationCommand) ShortString

func (s *SubWorkerTerminationCommand) ShortString() string

func (*SubWorkerTerminationCommand) String

func (s *SubWorkerTerminationCommand) String() string

type TerminateCommand

type TerminateCommand struct {
	// contains filtered or unexported fields
}

A builtin comand that workers can use to terminate themselves.

func NewTerminateCommand

func NewTerminateCommand(reason string) *TerminateCommand

func (*TerminateCommand) ShortString

func (t *TerminateCommand) ShortString() string

func (*TerminateCommand) String

func (t *TerminateCommand) String() string

type Worker

type Worker interface {
	Start(worker Worker, noWorkInterval int) // Called by a concrete worker to get the worker framework started
	Initialize() bool                        // Called by the worker FW to allow the concrete worker to initialize itself before starting the command loop
	CommandHandler(command Command) bool     // Called by the worker framework when there is a command for the worker to handle
	NoWorkHandler()                          // Called by the worker framework when the worker has been idle for noWorkInterval seconds

	// Methods that implement the ExchangeContext interface in the exchange package. The exchange package does not need
	// to be imported for this to work. And in fact, doing so would create cyclical dependencies, which are forbidden in go.
	GetExchangeId() string
	GetExchangeToken() string
	GetExchangeURL() string
	GetCSSURL() string
	GetHTTPFactory() *config.HTTPClientFactory
}

A worker is a first class event/message handler that performs async functions based on event input or by polling a remote resource.

type WorkerStatus

type WorkerStatus struct {
	Name            string            `json:"name"`
	Status          string            `json:"status"`
	SubworkerStatus map[string]string `json:"subworker_status"`
	StatusLock      sync.Mutex        `json:"-"` // The lock that protects modification from different threads at the same time
}

status for a worker

func (*WorkerStatus) SetSubworkerStatus

func (w *WorkerStatus) SetSubworkerStatus(name string, status string)

func (*WorkerStatus) SetWorkerStatus

func (w *WorkerStatus) SetWorkerStatus(status string)

type WorkerStatusManager

type WorkerStatusManager struct {
	Workers     map[string]*WorkerStatus `json:"workers"`
	StatusLog   []string                 `json:"worker_status_log"`
	ManagerLock sync.Mutex               `json:"-"` // The lock that protects modification from different threads at the same time
}

func GetWorkerStatusManager

func GetWorkerStatusManager() *WorkerStatusManager

func NewWorkerStatusManager

func NewWorkerStatusManager() *WorkerStatusManager

func (*WorkerStatusManager) GetAllSubworkerStatus

func (w *WorkerStatusManager) GetAllSubworkerStatus(name string) map[string]string

Get all the subworer status for the given worker. It returns nil if the worker does not exist.

func (*WorkerStatusManager) GetSubworkerStatus

func (w *WorkerStatusManager) GetSubworkerStatus(name string, subname string) string

Get the status string for the given subworker. It returns an empty string if the subworker does not exist.

func (*WorkerStatusManager) GetWorkerStatus

func (w *WorkerStatusManager) GetWorkerStatus(name string) string

Get the status string for the given worker. It returns an empty string if the worker does not exist.

func (*WorkerStatusManager) SetSubworkerStatus

func (w *WorkerStatusManager) SetSubworkerStatus(name string, subname string, status string)

func (*WorkerStatusManager) SetWorkerStatus

func (w *WorkerStatusManager) SetWorkerStatus(name string, status string)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL