messaging

package
v0.0.0-...-f6ab670 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 19, 2024 License: Apache-2.0 Imports: 11 Imported by: 5

Documentation

Index

Constants

View Source
const (

	// Backend states with potential future addition to further manage backend
	// Currently with Init and Processing statuses to avoid idle process leak.
	BackendStateInit int32 = 0
	BackendStateProc int32 = 1
)
View Source
const (
	MessageTypePluginConfig = "pluginconfig"
	MessageTypeComplete     = "complete"
	MessageTypeReply        = "reply"
	MessageTypeCancel       = "cancel"
)

Message types

Variables

This section is empty.

Functions

func CreateDatagram

func CreateDatagram(t MessageType, content interface{}) (string, error)

CreateDatagram marshals a given arbitrary object to raw json string Message schema is determined by the current version, content struct is indicated by type field TODO add version handling

func GetLatestVersion

func GetLatestVersion() string

GetLatestVersion retrieves the current latest message version of the agent build

func Messaging

func Messaging(log log.T, ipc filewatcherbasedipc.IPCChannel, backend MessagingBackend, stopTimer chan bool) (err error)

Messaging implements the duplex transmission between master and worker, it send datagram it received to data backend, TODO ipc should not be destroyed within this worker, destroying ipc object should be done in its caller: Executer

Types

type ExecuterBackend

type ExecuterBackend struct {
	// contains filtered or unexported fields
}

Executer backend formulate the run request to the worker, and collect back the responses from worker

func NewExecuterBackend

func NewExecuterBackend(log log.T, output chan contracts.DocumentResult, docState *contracts.DocumentState, cancelFlag task.CancelFlag) *ExecuterBackend

func (*ExecuterBackend) Accept

func (p *ExecuterBackend) Accept() <-chan string

func (*ExecuterBackend) Close

func (p *ExecuterBackend) Close()

func (*ExecuterBackend) CloseStop

func (p *ExecuterBackend) CloseStop()

func (*ExecuterBackend) ForceQuit

func (p *ExecuterBackend) ForceQuit()

func (*ExecuterBackend) GetBackendState

func (p *ExecuterBackend) GetBackendState() int32

func (*ExecuterBackend) Process

func (p *ExecuterBackend) Process(datagram string) error

TODO handle error and logging, when err, ask messaging to stop TODO version handling?

func (*ExecuterBackend) Stop

func (p *ExecuterBackend) Stop() <-chan int

type Message

type Message struct {
	Version string      `json:"version"`
	Type    MessageType `json:"type"`
	Content string      `json:"content"`
}

type MessageType

type MessageType string

func ParseDatagram

func ParseDatagram(datagram string) (MessageType, string)

TODO add version and error handling

type MessagingBackend

type MessagingBackend interface {
	Accept() <-chan string
	Stop() <-chan int
	//Process a given datagram, should not be blocked
	Process(string) error
	//Sets input channel to nil.
	Close()
	//Sets stop channel to nil.
	CloseStop()
	// Get backend state
	GetBackendState() int32
	// Force Quit backend and exits the messaging block.
	ForceQuit()
}

MessagingBackend defines an asycn message in/out processing pipeline

type PluginRunner

type PluginRunner func(
	context context.T,
	docState contracts.DocumentState,
	resChan chan contracts.PluginResult,
	cancelFlag task.CancelFlag,
)

type WorkerBackend

type WorkerBackend struct {
	// contains filtered or unexported fields
}

worker backend receives request messages from master, controls a pluginRunner based off the request and send reponses to Executer

func NewWorkerBackend

func NewWorkerBackend(ctx context.T, runner PluginRunner) *WorkerBackend

func (*WorkerBackend) Accept

func (p *WorkerBackend) Accept() <-chan string

func (*WorkerBackend) Close

func (p *WorkerBackend) Close()

func (*WorkerBackend) CloseStop

func (p *WorkerBackend) CloseStop()

func (*WorkerBackend) ForceQuit

func (p *WorkerBackend) ForceQuit()

func (*WorkerBackend) GetBackendState

func (p *WorkerBackend) GetBackendState() int32

func (*WorkerBackend) Process

func (p *WorkerBackend) Process(datagram string) error

func (*WorkerBackend) Stop

func (p *WorkerBackend) Stop() <-chan int

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL