Documentation ¶
Index ¶
- Constants
- func CreateDatagram(t MessageType, content interface{}) (string, error)
- func GetLatestVersion() string
- func Messaging(log log.T, ipc filewatcherbasedipc.IPCChannel, backend MessagingBackend, ...) (err error)
- type ExecuterBackend
- func (p *ExecuterBackend) Accept() <-chan string
- func (p *ExecuterBackend) Close()
- func (p *ExecuterBackend) CloseStop()
- func (p *ExecuterBackend) ForceQuit()
- func (p *ExecuterBackend) GetBackendState() int32
- func (p *ExecuterBackend) Process(datagram string) error
- func (p *ExecuterBackend) Stop() <-chan int
- type Message
- type MessageType
- type MessagingBackend
- type PluginRunner
- type WorkerBackend
Constants ¶
const ( // Backend states with potential future addition to further manage backend // Currently with Init and Processing statuses to avoid idle process leak. BackendStateInit int32 = 0 BackendStateProc int32 = 1 )
const ( MessageTypePluginConfig = "pluginconfig" MessageTypeComplete = "complete" MessageTypeReply = "reply" MessageTypeCancel = "cancel" )
Message types
Variables ¶
This section is empty.
Functions ¶
func CreateDatagram ¶
func CreateDatagram(t MessageType, content interface{}) (string, error)
CreateDatagram marshals a given arbitrary object to a raw JSON string. The Message schema is determined by the current version; the content struct is indicated by the type field. TODO: add version handling.
func GetLatestVersion ¶
func GetLatestVersion() string
GetLatestVersion retrieves the current latest message version of the agent build
func Messaging ¶
func Messaging(log log.T, ipc filewatcherbasedipc.IPCChannel, backend MessagingBackend, stopTimer chan bool) (err error)
Messaging implements the duplex transmission between master and worker: it sends the datagrams it receives to the data backend. TODO: ipc should not be destroyed within this worker; destroying the ipc object should be done in its caller, Executer.
Types ¶
type ExecuterBackend ¶
type ExecuterBackend struct {
// contains filtered or unexported fields
}
The Executer backend formulates the run request to the worker and collects the responses back from the worker.
func NewExecuterBackend ¶
func NewExecuterBackend(log log.T, output chan contracts.DocumentResult, docState *contracts.DocumentState, cancelFlag task.CancelFlag) *ExecuterBackend
func (*ExecuterBackend) Accept ¶
func (p *ExecuterBackend) Accept() <-chan string
func (*ExecuterBackend) Close ¶
func (p *ExecuterBackend) Close()
func (*ExecuterBackend) CloseStop ¶
func (p *ExecuterBackend) CloseStop()
func (*ExecuterBackend) ForceQuit ¶
func (p *ExecuterBackend) ForceQuit()
func (*ExecuterBackend) GetBackendState ¶
func (p *ExecuterBackend) GetBackendState() int32
func (*ExecuterBackend) Process ¶
func (p *ExecuterBackend) Process(datagram string) error
TODO: handle error and logging; on error, ask messaging to stop. TODO: version handling?
func (*ExecuterBackend) Stop ¶
func (p *ExecuterBackend) Stop() <-chan int
type Message ¶
type Message struct { Version string `json:"version"` Type MessageType `json:"type"` Content string `json:"content"` }
type MessageType ¶
type MessageType string
func ParseDatagram ¶
func ParseDatagram(datagram string) (MessageType, string)
TODO add version and error handling
type MessagingBackend ¶
type MessagingBackend interface { Accept() <-chan string Stop() <-chan int //Process a given datagram, should not be blocked Process(string) error //Sets input channel to nil. Close() //Sets stop channel to nil. CloseStop() // Get backend state GetBackendState() int32 // Force Quit backend and exits the messaging block. ForceQuit() }
MessagingBackend defines an async message in/out processing pipeline.
type PluginRunner ¶
type PluginRunner func( context context.T, docState contracts.DocumentState, resChan chan contracts.PluginResult, cancelFlag task.CancelFlag, )
type WorkerBackend ¶
type WorkerBackend struct {
// contains filtered or unexported fields
}
The worker backend receives request messages from master, controls a pluginRunner based on the request, and sends responses to Executer.
func NewWorkerBackend ¶
func NewWorkerBackend(ctx context.T, runner PluginRunner) *WorkerBackend
func (*WorkerBackend) Accept ¶
func (p *WorkerBackend) Accept() <-chan string
func (*WorkerBackend) Close ¶
func (p *WorkerBackend) Close()
func (*WorkerBackend) CloseStop ¶
func (p *WorkerBackend) CloseStop()
func (*WorkerBackend) ForceQuit ¶
func (p *WorkerBackend) ForceQuit()
func (*WorkerBackend) GetBackendState ¶
func (p *WorkerBackend) GetBackendState() int32
func (*WorkerBackend) Process ¶
func (p *WorkerBackend) Process(datagram string) error
func (*WorkerBackend) Stop ¶
func (p *WorkerBackend) Stop() <-chan int