runtime

package
v0.0.0-...-387885d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 23, 2024 License: GPL-3.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const CRITICAL_ERROR_MSG = "__critical_error__"
View Source
const RAISED_ERROR_MSG = "__raised_error__"

Variables

View Source
var DefaultFlowConfig = FlowConfig{Context: context.TODO(), DedicateOSThread: false,
	MaxRetries: 5, ErrorHandlingFunc: RestartHandler, MaxIdleTime: 10 * time.Second, ShutdownMode: Auto}

Functions

func RestartHandler

func RestartHandler(runner *Runner, node NodeIFace, startMessage MessageIFace, err error)

Types

type CallResponse

type CallResponse struct {
	// contains filtered or unexported fields
}

func (CallResponse) GetTimeStamp

func (cr CallResponse) GetTimeStamp() time.Time

type CallStarted

type CallStarted struct {
	// contains filtered or unexported fields
}

func (CallStarted) GetTimeStamp

func (cs CallStarted) GetTimeStamp() time.Time

type Context

type Context struct {
	// contains filtered or unexported fields
}

func NewContext

func NewContext(configPath string, logger utils.LoggerIFace) *Context

func (*Context) AddFlow

func (ctx *Context) AddFlow(name string) *Flow

AddFlow should be used to add new flow to the context to handle The passed name param can be used later to run or get the flow object

func (*Context) CloneFlow

func (ctx *Context) CloneFlow(flow *Flow) *Flow

func (*Context) GetFlowByName

func (ctx *Context) GetFlowByName(name string) (FlowIFace, bool)

GetFlowByName should be used to access a flow object It should be used only from outside of the scope of the runner

func (*Context) GetLogger

func (ctx *Context) GetLogger() utils.LoggerIFace

func (*Context) GetRunnerByFlow

func (ctx *Context) GetRunnerByFlow(flow *Flow) *Runner

GetRunnerByFlow can used to get runner object of a flow It can be useful for debugging or testing only

func (*Context) IsMessageTypeRegistered

func (ctx *Context) IsMessageTypeRegistered(mType MessageType) bool

IsMessageTypeRegistered can be used to check whether a message type is known by runtime

func (*Context) IsNodeTypeRegistered

func (ctx *Context) IsNodeTypeRegistered(nType db.NodeType) bool

IsMessageTypeRegistered can be used to check whether a node type is known by runtime

func (*Context) LoadConfig

func (ctx *Context) LoadConfig(path string)

func (*Context) NewMessageByType

func (ctx *Context) NewMessageByType(mType MessageType) MessageIFace

NewMessageByType can be used to create a registered message

func (*Context) NewNodeByType

func (ctx *Context) NewNodeByType(nType db.NodeType) NodeIFace

NewNodeByType can be used to create a registered node

func (*Context) RegisterMessage

func (ctx *Context) RegisterMessage(mType MessageType, createFunc CreateMessageFunction)

RegisterMessage should be used before calling a flow which has any node of that type It is essential for the runtime to have the type registered in order to work correctly

func (*Context) RegisterNode

func (ctx *Context) RegisterNode(nType db.NodeType, createFunc CreateNodeFunction)

RegisterNode should be used to be able to create a node as part of a flow It is also essential for the runtime to have the type registered in order to work correctly

func (*Context) RemoveFlow

func (ctx *Context) RemoveFlow(flow *Flow)

RemoveFlow can be used to remove a flow from the context It can be useful only in case of a memory restrictions as usually it will be handled by the GC

func (*Context) RunFlowByID

func (ctx *Context) RunFlowByID(config *FlowConfig, flowID globals.UID, message MessageIFace) <-chan struct{}

RunFlowByID should be used to start a flow by its id The message will be start message of the flow It will return a channel which should be used to wait for the flow The flowConfig param can be nil only if a default config is set The start message can be nil only if node is in service mode

func (*Context) RunFlowByIDWithWait

func (ctx *Context) RunFlowByIDWithWait(flowConfig *FlowConfig, flowID globals.UID, message MessageIFace)

RunFlowByIDWithWait does the same as RunFlowByID but it will do the waiting itself

func (*Context) RunFlowByName

func (ctx *Context) RunFlowByName(config *FlowConfig, flowName string, message MessageIFace) <-chan struct{}

RunFlowName should be used to start a flow by its name The message will be start message of the flow It will return a channel which should be used to wait for the flow The flowConfig param can be nil only if a default config is set The start message can be nil only if node is in service mode

func (*Context) RunFlowByNameWithWait

func (ctx *Context) RunFlowByNameWithWait(flowConfig *FlowConfig, flowName string, message MessageIFace)

RunFlowByNameWithWait does the same as RunFlowByName but it will do the waiting itself

func (*Context) SetCustomRunner

func (ctx *Context) SetCustomRunner(runner *Runner)

TODO Make it more safe to use as it can be used to set a runner with custom supervisor but for every different flow it has to be new one it can be reused only fo the same flow

func (*Context) SetDefaultFlowConfig

func (ctx *Context) SetDefaultFlowConfig(config *FlowConfig)

SetDefaultFlowConfig should be used to set a config which will be used in case no config is passed to the RunFlow type of commands

func (*Context) SetLogger

func (ctx *Context) SetLogger(logger utils.LoggerIFace) *Context

func (*Context) Shutdown

func (ctx *Context) Shutdown()

Shutdown should be deferred after the context creation to be able to handle the optional Terminate functions of the nodes For details about the usage check the docs

type ContextIFace

type ContextIFace interface {
	GetLogger() utils.LoggerIFace
	NewMessageByType(MessageType) MessageIFace
	RunFlowByNameWithWait(*FlowConfig, string, MessageIFace)
	GetFlowByName(string) (FlowIFace, bool)
}

TODO It is only used for testing

type CreateMessageFunction

type CreateMessageFunction func() MessageIFace

type CreateNodeFunction

type CreateNodeFunction func() NodeIFace

type DefaultSupervisor

type DefaultSupervisor struct {
	MaxRetries utils.LockedCounter
	// contains filtered or unexported fields
}

func (*DefaultSupervisor) ApplyStrategy

func (ds *DefaultSupervisor) ApplyStrategy(runner *Runner, node NodeIFace,
	startMessage MessageIFace, err error)

type DispatchedFunc

type DispatchedFunc func(params FuncParams) MessageIFace

type DoneItem

type DoneItem struct {
	// contains filtered or unexported fields
}

type DummyMessage

type DummyMessage struct {
	// contains filtered or unexported fields
}

func (*DummyMessage) Get

func (dm *DummyMessage) Get() interface{}

func (*DummyMessage) GetAsMap

func (dm *DummyMessage) GetAsMap() map[string]interface{}

func (*DummyMessage) GetBody

func (dm *DummyMessage) GetBody() []byte

func (*DummyMessage) GetDeliveryTime

func (bm *DummyMessage) GetDeliveryTime() time.Time

func (*DummyMessage) GetSentTime

func (bm *DummyMessage) GetSentTime() time.Time

func (*DummyMessage) GetType

func (dm *DummyMessage) GetType() MessageType

func (*DummyMessage) Put

func (dm *DummyMessage) Put(data interface{})

func (*DummyMessage) PutAsMap

func (dm *DummyMessage) PutAsMap(data map[string]interface{})

func (*DummyMessage) SetBody

func (dm *DummyMessage) SetBody(body []byte)

func (*DummyMessage) SetDeliveryTime

func (bm *DummyMessage) SetDeliveryTime(dt time.Time)

func (*DummyMessage) SetSentTime

func (bm *DummyMessage) SetSentTime(st time.Time)

func (*DummyMessage) SetType

func (dm *DummyMessage) SetType(mType MessageType)

type EventHandler

type EventHandler struct {
	// contains filtered or unexported fields
}

func (*EventHandler) HandleEvents

func (eh *EventHandler) HandleEvents()

func (*EventHandler) SetRunner

func (eh *EventHandler) SetRunner(runner *Runner)

type Flow

type Flow struct {
	// contains filtered or unexported fields
}

func NewFlow

func NewFlow(id globals.UID) *Flow

func (*Flow) AddNode

func (f *Flow) AddNode(nodeType db.NodeType) NodeIFace

AddNode can be use to add a node to the flow It also has to be connected in order to use as a part of the flow To make connections the Connect or QueryConnect method should be used

func (*Flow) Connect

func (f *Flow) Connect(startNode NodeIFace, startPortLabel string,
	endNode NodeIFace, endPortLabel string) *Link

Connect should be used to connect the added nodes by creating links between them so they can be used for messaging between the nodes

func (*Flow) EnableTrace

func (f *Flow) EnableTrace(WithMessageRecording bool)

EnableTrace enables tracing for test or debug purposes If WithMessageRecording is false than it will only store all the nodes touched If WithMessageRecording is true than it will store all the messages as well

func (*Flow) GetCache

func (f *Flow) GetCache() *portCache

func (*Flow) GetConfig

func (f *Flow) GetConfig() *FlowConfig

func (*Flow) GetContext

func (f *Flow) GetContext() *Context

func (*Flow) GetDictionary

func (f *Flow) GetDictionary() *FlowDictionary

GetDictionary returns the flow dictionary is map that can be use to share bigger data between nodes by adding a data to the dict using unique id then sharing that id through messages

func (*Flow) GetID

func (f *Flow) GetID() globals.UID

func (*Flow) GetLogger

func (f *Flow) GetLogger() utils.LoggerIFace

func (*Flow) GetName

func (f *Flow) GetName() string

func (*Flow) GetNodesCount

func (f *Flow) GetNodesCount() int

func (*Flow) GetRegistry

func (f *Flow) GetRegistry() *Registry

func (*Flow) GetStartNode

func (f *Flow) GetStartNode() NodeIFace

func (*Flow) Init

func (f *Flow) Init()

func (*Flow) IsOsThreadNeeded

func (f *Flow) IsOsThreadNeeded() bool

func (*Flow) IsTraced

func (f *Flow) IsTraced() (bool, bool)

func (*Flow) QueryConnect

func (f *Flow) QueryConnect(startNode NodeIFace, startPortLabel string,
	endNode NodeIFace, endPortLabel string)

QueryConnect is like the Connect function but for connecting queries only as query link has an inverse direction than the other links meaning that the source is following the destination node in that case only

func (*Flow) RemoveNode

func (f *Flow) RemoveNode(node NodeIFace)

RemoveNode can be use to remove a node from the flow It should be used only when there is a memory restrictions Be aware that removing the node from the flow will break the ability of accessing its internal states as well

func (*Flow) SetCache

func (f *Flow) SetCache(cache *portCache)

func (*Flow) SetContext

func (f *Flow) SetContext(runtime *Context)

func (*Flow) SetLogger

func (f *Flow) SetLogger(logger utils.LoggerIFace)

func (*Flow) SetName

func (f *Flow) SetName(name string)

func (*Flow) SetOsThreadNeeded

func (f *Flow) SetOsThreadNeeded()

func (*Flow) SetRegistry

func (f *Flow) SetRegistry(reg *Registry)

func (*Flow) SetStartNode

func (f *Flow) SetStartNode(startNode NodeIFace)

SetStartNode can be used to set start node of the flow for the runner For more info about the start and the end node concept see the docs

func (*Flow) UpdateConfig

func (f *Flow) UpdateConfig(config *FlowConfig)

UpdateConfig can be used to apply changes on the config of the flow Usually handled by the runtime or by the rest server Can be useful in cases when you need some special configuration just only for a few flow If the default config has to be changed than SetDefaultFlowConfig func of context should be used instead

type FlowConfig

type FlowConfig struct {
	Context           context.Context `json:"-"`
	DedicateOSThread  bool
	MaxRetries        int
	ErrorHandlingFunc StrategyHandlingFunc `json:"-"`
	MaxIdleTime       time.Duration
	ShutdownMode      ShutdownMode
}

func (*FlowConfig) Clone

func (fc *FlowConfig) Clone() *FlowConfig

type FlowDictionary

type FlowDictionary struct {
	// contains filtered or unexported fields
}

func NewFlowDict

func NewFlowDict() *FlowDictionary

func (*FlowDictionary) Add

func (fd *FlowDictionary) Add(key string, data interface{}) error

func (*FlowDictionary) Get

func (fd *FlowDictionary) Get(key string) (interface{}, bool)

func (*FlowDictionary) Init

func (fd *FlowDictionary) Init()

func (*FlowDictionary) Pop

func (fd *FlowDictionary) Pop(key string) (interface{}, bool)

func (*FlowDictionary) Set

func (fd *FlowDictionary) Set(key string, data interface{})

type FlowIFace

type FlowIFace interface {
	SetName(string)
	GetName() string
	UpdateConfig(*FlowConfig)
	GetConfig() *FlowConfig
}

TODO It is only used for testing

type FuncParams

type FuncParams interface{}

type FunctionDispatcher

type FunctionDispatcher struct {
	// contains filtered or unexported fields
}

func NewFunctionDispatcher

func NewFunctionDispatcher(logger utils.LoggerIFace) *FunctionDispatcher

func (*FunctionDispatcher) AddFunc

func (fd *FunctionDispatcher) AddFunc(funcName string, dispFunc DispatchedFunc)

func (*FunctionDispatcher) DispatchFromMessage

func (fd *FunctionDispatcher) DispatchFromMessage(message MessageIFace) []MessageIFace

func (*FunctionDispatcher) GetFunctionByName

func (fd *FunctionDispatcher) GetFunctionByName(funcName string) DispatchedFunc

type InnerMessage

type InnerMessage struct {
	MType MessageType
	Mjson []byte
}

type InnerObjCreateFunc

type InnerObjCreateFunc func() interface{}
type Link struct {
	Id globals.UID
	// contains filtered or unexported fields
}
func NewLink(id globals.UID) *Link

func (*Link) GetEndPortID

func (l *Link) GetEndPortID() globals.UID

func (*Link) GetStartPortID

func (l *Link) GetStartPortID() globals.UID

func (*Link) GetWaitSettings

func (l *Link) GetWaitSettings() *WaitSettings

func (*Link) Init

func (l *Link) Init()

func (*Link) SetWaitSettings

func (l *Link) SetWaitSettings(waitSetting WaitSettings)

SetWaitSettings should be used after the link is created To set the rules of the link (see the related doc) It can also be called before a call to change the link settings

type MessageDescriptor

type MessageDescriptor struct {
	// contains filtered or unexported fields
}

type MessageIFace

type MessageIFace interface {
	SetType(MessageType)
	GetType() MessageType

	Put(interface{})
	Get() interface{}

	PutAsMap(map[string]interface{})
	GetAsMap() map[string]interface{}

	SetBody([]byte)
	GetBody() []byte

	SetSentTime(time.Time)
	SetDeliveryTime(time.Time)

	GetSentTime() time.Time
	GetDeliveryTime() time.Time
}

func NewDummyMessage

func NewDummyMessage() MessageIFace

type MessageType

type MessageType string

type NodeDescriptor

type NodeDescriptor struct {
	// contains filtered or unexported fields
}

type NodeError

type NodeError struct {
	// contains filtered or unexported fields
}

type NodeIFace

type NodeIFace interface {
	SetID(id globals.UID)
	GetID() globals.UID
	GetIdAsString() string

	SetType(db.NodeType)
	GetType() db.NodeType

	SetParentFlow(flow *Flow)
	GetParentFlow() *Flow

	SetLogger(utils.LoggerIFace)

	Set(MessageIFace)
	Get(MessageIFace) MessageIFace

	Mount()
	Init()
	Run(*Runner, MessageIFace) MessageIFace
	Terminate()

	GetInputPorts() []db.PortModel
	GetOutputPorts() []db.PortModel

	GetActInputCounter() *utils.Counter
	GetActOutputCounter() *utils.Counter

	MakeService()
	IsService() bool

	SetSupervised(bool)
	IsSupervised() bool

	SetInited()
	IsInited() bool

	SetBusy(bool)
	IsBusy() bool

	IsRef() bool
	IsStartNode() bool
	IsEndNode() bool
}

func NewReferenceNode

func NewReferenceNode(sourceNode NodeIFace, count uint32) NodeIFace

type Port

type Port struct {
	PDir  db.PortDirection // PDir PortDirection in DB
	PType db.PortType      // PType PortType in DB
	// contains filtered or unexported fields
}

func NewPort

func NewPort(pDir db.PortDirection, pType db.PortType, label string, parentNode NodeIFace) *Port
func (p *Port) AddConnectedLink(startPortID globals.UID,
	endPortID globals.UID) *Link
func (lh *Port) DeRegisterLink(link *Link)

func (*Port) EnableEndPortForLinkByID

func (p *Port) EnableEndPortForLinkByID(linkID globals.UID)

func (*Port) EnableEndPortForLinkByIndex

func (p *Port) EnableEndPortForLinkByIndex(linkIndex int)

func (*Port) GetConnectedLinkNum

func (p *Port) GetConnectedLinkNum() int

GetConnectedLinkNum returns the num of links attached to the port It can be used to get that info without using len on the returned values of GetConnectedLinks

func (p *Port) GetConnectedLinks() []db.LinkModel

Returns the links attached to the node Be aware that it returns the link models not the link objects to get the link objects by id the registry of the flow should be used

func (*Port) GetID

func (p *Port) GetID() globals.UID

func (*Port) GetLabel

func (p *Port) GetLabel() string

func (*Port) GetLinkIDByIndex

func (p *Port) GetLinkIDByIndex(linkIndex int) globals.UID

func (*Port) GetParentNode

func (p *Port) GetParentNode() NodeIFace

func (*Port) HasValidEndPortForLinkByID

func (p *Port) HasValidEndPortForLinkByID(linkID globals.UID) bool

func (*Port) HasValidEndPortForLinkByIndex

func (p *Port) HasValidEndPortForLinkByIndex(linkIndex int) bool

func (*Port) Init

func (p *Port) Init()

func (*Port) IsVirtual

func (p *Port) IsVirtual() bool

func (*Port) MakeVirtual

func (p *Port) MakeVirtual(alias string)

MakeVirtual is a helper function to give public interface for properties or functions For details check the docs

func (lh *Port) RegisterLink(link *Link)
func (p *Port) RemoveConnectedLink(link *Link)

TODO Check if it is enough or we have to do something with the Associations as well

func (*Port) SetID

func (p *Port) SetID(id globals.UID)

type PropertyManager

type PropertyManager struct {
	// contains filtered or unexported fields
}

func NewPropertyManager

func NewPropertyManager(logger utils.LoggerIFace) *PropertyManager

func (*PropertyManager) Get

func (pm *PropertyManager) Get(propName string) interface{}

func (*PropertyManager) GetKeys

func (pm *PropertyManager) GetKeys() []string

func (*PropertyManager) GetPropsFromMessage

func (pm *PropertyManager) GetPropsFromMessage(message MessageIFace) MessageIFace

func (*PropertyManager) HasKey

func (pm *PropertyManager) HasKey(key string) bool

func (*PropertyManager) Set

func (pm *PropertyManager) Set(propName string, value interface{})

func (*PropertyManager) SetPropsFromMessage

func (pm *PropertyManager) SetPropsFromMessage(message MessageIFace)

type ReferenceNode

type ReferenceNode struct {
	// contains filtered or unexported fields
}

func (*ReferenceNode) Get

func (rf *ReferenceNode) Get(message MessageIFace) MessageIFace

func (*ReferenceNode) GetActInputCounter

func (rf *ReferenceNode) GetActInputCounter() *utils.Counter

func (*ReferenceNode) GetActOutputCounter

func (rf *ReferenceNode) GetActOutputCounter() *utils.Counter

func (*ReferenceNode) GetID

func (rf *ReferenceNode) GetID() globals.UID

func (*ReferenceNode) GetIdAsString

func (rf *ReferenceNode) GetIdAsString() string

func (*ReferenceNode) GetInputPorts

func (rf *ReferenceNode) GetInputPorts() []db.PortModel

func (*ReferenceNode) GetOrig

func (rf *ReferenceNode) GetOrig() NodeIFace

func (*ReferenceNode) GetOutputPorts

func (rf *ReferenceNode) GetOutputPorts() []db.PortModel

func (*ReferenceNode) GetParentFlow

func (rf *ReferenceNode) GetParentFlow() *Flow

func (*ReferenceNode) GetType

func (rf *ReferenceNode) GetType() db.NodeType

func (*ReferenceNode) Init

func (rf *ReferenceNode) Init()

func (*ReferenceNode) IsBusy

func (rf *ReferenceNode) IsBusy() bool

func (*ReferenceNode) IsEndNode

func (rf *ReferenceNode) IsEndNode() bool

func (*ReferenceNode) IsInited

func (rf *ReferenceNode) IsInited() bool

func (*ReferenceNode) IsRef

func (rf *ReferenceNode) IsRef() bool

func (*ReferenceNode) IsService

func (rf *ReferenceNode) IsService() bool

func (*ReferenceNode) IsStartNode

func (rf *ReferenceNode) IsStartNode() bool

func (*ReferenceNode) IsSupervised

func (rf *ReferenceNode) IsSupervised() bool

func (*ReferenceNode) MakeService

func (rf *ReferenceNode) MakeService()

func (*ReferenceNode) Mount

func (rf *ReferenceNode) Mount()

func (*ReferenceNode) Run

func (rf *ReferenceNode) Run(runner *Runner, message MessageIFace) MessageIFace

func (*ReferenceNode) Set

func (rf *ReferenceNode) Set(message MessageIFace)

func (*ReferenceNode) SetBusy

func (rf *ReferenceNode) SetBusy(value bool)

func (*ReferenceNode) SetID

func (rf *ReferenceNode) SetID(id globals.UID)

func (*ReferenceNode) SetInited

func (rf *ReferenceNode) SetInited()

func (*ReferenceNode) SetLogger

func (rf *ReferenceNode) SetLogger(logger utils.LoggerIFace)

func (*ReferenceNode) SetParentFlow

func (rf *ReferenceNode) SetParentFlow(flow *Flow)

func (*ReferenceNode) SetSupervised

func (rf *ReferenceNode) SetSupervised(value bool)

func (*ReferenceNode) SetType

func (rf *ReferenceNode) SetType(nType db.NodeType)

func (*ReferenceNode) Terminate

func (rf *ReferenceNode) Terminate()

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

func NewRegistry

func NewRegistry() *Registry
func (r *Registry) AddLink(link *Link)

func (*Registry) AddNode

func (r *Registry) AddNode(node NodeIFace)

func (*Registry) AddPort

func (r *Registry) AddPort(port *Port)

func (*Registry) GetLinkByID

func (r *Registry) GetLinkByID(linkID globals.UID) *Link

func (*Registry) GetNodeByID

func (r *Registry) GetNodeByID(nodeID globals.UID) NodeIFace

func (*Registry) GetPortByID

func (r *Registry) GetPortByID(portID globals.UID) *Port
func (r *Registry) RemoveLink(link *Link)

func (*Registry) RemoveNode

func (r *Registry) RemoveNode(node NodeIFace)

func (*Registry) RemovePort

func (r *Registry) RemovePort(port *Port)

type Request

type Request struct {
	// contains filtered or unexported fields
}

type Runner

type Runner struct {
	EventHandler

	// id                  globals.CounterID
	Config *config.RunnerConfig
	// contains filtered or unexported fields
}

func NewRunner

func NewRunner(supervisor SupervisorIFace, context context.Context,
	timeout time.Duration, shutdownMode ShutdownMode) *Runner

func (*Runner) Call

func (r *Runner) Call(senderNode NodeIFace, port *Port, linkIndex int, message MessageIFace) (MessageIFace, bool)

Call should be used to execute the Run function of an attached node For details about the params and the usage check the docs

func (*Runner) CallAll

func (r *Runner) CallAll(senderNode NodeIFace, messages []MessageIFace)

CallAll is helper function to simplify the execution of many nodes Its can be useful if multiple nodes should be executed for example if the node uses an external script and as its result multiple nodes should be executed

func (*Runner) GetContext

func (r *Runner) GetContext() *Context

func (*Runner) GetFlow

func (r *Runner) GetFlow() *Flow

GetFlow should be used to get the flow object which is handled by the runner

func (*Runner) GetInputMessagesByNodeID

func (r *Runner) GetInputMessagesByNodeID(nodeID globals.UID) []InnerMessage

GetInputMessagesByNodeID can be used to get all the input messages of a node after flow run It will only work if the tracing is enabled with message recording option

func (*Runner) GetLogger

func (r *Runner) GetLogger() utils.LoggerIFace

func (*Runner) GetOutputMessagesByNodeID

func (r *Runner) GetOutputMessagesByNodeID(nodeID globals.UID) []InnerMessage

GetInputMessagesByNodeID can be used to get all the output messages of a node after flow run It will only work if the tracing is enabled with message recording option

func (*Runner) GetTrace

func (r *Runner) GetTrace() []NodeIFace

GetTrace should be used to get the list of the touched nodes in case tracing is enabled on the flow

func (*Runner) Init

func (r *Runner) Init()

func (*Runner) IsBusy

func (r *Runner) IsBusy() bool

func (*Runner) Query

func (r *Runner) Query(senderNode NodeIFace, port *Port, messageType MessageType) (MessageIFace, bool)

Query should be used to query an exposed internal state of an attached node For details about the params and the usage check the docs

func (*Runner) QueryAll

func (r *Runner) QueryAll(senderNode NodeIFace, messageType MessageType) []MessageIFace

QueryAll is helper function to simplify the querying of many nodes Its can be used useful if multiple nodes should be queried for example if the node uses an external script and queries params for the script coming from multiple sources (nodes)

func (*Runner) RaiseError

func (r *Runner) RaiseError(sender NodeIFace, info string)

RaiseError should be used to notify the runtime about an error For details about the params and the usage check the docs

func (*Runner) RunNode

func (r *Runner) RunNode(node NodeIFace, message MessageIFace)

RunNode can be used to run only one node of the flow Usually to run a node the Call function should be used but in some special cases it could be useful to do that for example to handle retry with the supervisor

func (*Runner) Send

func (r *Runner) Send(senderNode NodeIFace, port *Port, message MessageIFace) bool

Send should be used to set an exposed internal state of an attached node For details about the params and the usage check the docs

func (*Runner) SendAll

func (r *Runner) SendAll(senderNode NodeIFace, messages []MessageIFace)

SendAll is helper function to simplify the setting of the internals for many nodes Its can be useful if multiple nodes should be modified for example if the node uses an external script and the output of the script should be sent to multiple destinations (nodes)

func (*Runner) SetConfig

func (r *Runner) SetConfig(config *config.RunnerConfig)

func (*Runner) SetContext

func (r *Runner) SetContext(rt *Context)

func (*Runner) SetFlow

func (r *Runner) SetFlow(flow *Flow)

func (*Runner) SetLogger

func (r *Runner) SetLogger(logger utils.LoggerIFace)

func (*Runner) SetTraced

func (r *Runner) SetTraced(withMessageRecording bool)

func (*Runner) Shutdown

func (r *Runner) Shutdown(doGraceful bool)

Shutdown can be used to force a shutdown of a flow By default it will be handled automatically when all the nodes are touched by changing the related flow config param the default behavior can disabled in that case it should be done manually or the flow will only stop because of timeout

func (*Runner) StartFlow

func (r *Runner) StartFlow(flow *Flow, message MessageIFace)

type SendFinished

type SendFinished struct {
	// contains filtered or unexported fields
}

func (SendFinished) GetTimeStamp

func (sf SendFinished) GetTimeStamp() time.Time

type ShutdownMode

type ShutdownMode int
const (
	Auto ShutdownMode = iota
	Manual
)

type StrategyHandlingFunc

type StrategyHandlingFunc func(*Runner, NodeIFace, MessageIFace, error)

type SupervisorIFace

type SupervisorIFace interface {
	ApplyStrategy(*Runner, NodeIFace, MessageIFace, error)
}

func NewDefaultSupervisor

func NewDefaultSupervisor(maxRetries int, handlingFunc StrategyHandlingFunc) SupervisorIFace

type VirtualFunction

type VirtualFunction struct {
	// contains filtered or unexported fields
}

func (*VirtualFunction) GetName

func (vf *VirtualFunction) GetName() string

func (*VirtualFunction) GetParams

func (vf *VirtualFunction) GetParams() FuncParams

type WaitForType

type WaitForType int
const (
	WaitForNone WaitForType = iota
	WaitForResponse
	WaitForCaller
	WaitForNode
	WaitForEndNode
)

type WaitQueueItem

type WaitQueueItem struct {
	// contains filtered or unexported fields
}

type WaitSettings

type WaitSettings struct {
	Condition   WaitForType
	CallTimeout time.Duration
}

func (*WaitSettings) GetCallTimeout

func (wfs *WaitSettings) GetCallTimeout() time.Duration

func (*WaitSettings) GetCondition

func (wfs *WaitSettings) GetCondition() WaitForType

func (*WaitSettings) SetCallTimeout

func (wfs *WaitSettings) SetCallTimeout(timeout time.Duration)

func (*WaitSettings) SetCondition

func (wfs *WaitSettings) SetCondition(wType WaitForType)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL