Documentation ¶
Index ¶
- Constants
- Variables
- func RestartHandler(runner *Runner, node NodeIFace, startMessage MessageIFace, err error)
- type CallResponse
- type CallStarted
- type Context
- func (ctx *Context) AddFlow(name string) *Flow
- func (ctx *Context) CloneFlow(flow *Flow) *Flow
- func (ctx *Context) GetFlowByName(name string) (FlowIFace, bool)
- func (ctx *Context) GetLogger() utils.LoggerIFace
- func (ctx *Context) GetRunnerByFlow(flow *Flow) *Runner
- func (ctx *Context) IsMessageTypeRegistered(mType MessageType) bool
- func (ctx *Context) IsNodeTypeRegistered(nType db.NodeType) bool
- func (ctx *Context) LoadConfig(path string)
- func (ctx *Context) NewMessageByType(mType MessageType) MessageIFace
- func (ctx *Context) NewNodeByType(nType db.NodeType) NodeIFace
- func (ctx *Context) RegisterMessage(mType MessageType, createFunc CreateMessageFunction)
- func (ctx *Context) RegisterNode(nType db.NodeType, createFunc CreateNodeFunction)
- func (ctx *Context) RemoveFlow(flow *Flow)
- func (ctx *Context) RunFlowByID(config *FlowConfig, flowID globals.UID, message MessageIFace) <-chan struct{}
- func (ctx *Context) RunFlowByIDWithWait(flowConfig *FlowConfig, flowID globals.UID, message MessageIFace)
- func (ctx *Context) RunFlowByName(config *FlowConfig, flowName string, message MessageIFace) <-chan struct{}
- func (ctx *Context) RunFlowByNameWithWait(flowConfig *FlowConfig, flowName string, message MessageIFace)
- func (ctx *Context) SetCustomRunner(runner *Runner)
- func (ctx *Context) SetDefaultFlowConfig(config *FlowConfig)
- func (ctx *Context) SetLogger(logger utils.LoggerIFace) *Context
- func (ctx *Context) Shutdown()
- type ContextIFace
- type CreateMessageFunction
- type CreateNodeFunction
- type DefaultSupervisor
- type DispatchedFunc
- type DoneItem
- type DummyMessage
- func (dm *DummyMessage) Get() interface{}
- func (dm *DummyMessage) GetAsMap() map[string]interface{}
- func (dm *DummyMessage) GetBody() []byte
- func (bm *DummyMessage) GetDeliveryTime() time.Time
- func (bm *DummyMessage) GetSentTime() time.Time
- func (dm *DummyMessage) GetType() MessageType
- func (dm *DummyMessage) Put(data interface{})
- func (dm *DummyMessage) PutAsMap(data map[string]interface{})
- func (dm *DummyMessage) SetBody(body []byte)
- func (bm *DummyMessage) SetDeliveryTime(dt time.Time)
- func (bm *DummyMessage) SetSentTime(st time.Time)
- func (dm *DummyMessage) SetType(mType MessageType)
- type EventHandler
- type Flow
- func (f *Flow) AddNode(nodeType db.NodeType) NodeIFace
- func (f *Flow) Connect(startNode NodeIFace, startPortLabel string, endNode NodeIFace, ...) *Link
- func (f *Flow) EnableTrace(WithMessageRecording bool)
- func (f *Flow) GetCache() *portCache
- func (f *Flow) GetConfig() *FlowConfig
- func (f *Flow) GetContext() *Context
- func (f *Flow) GetDictionary() *FlowDictionary
- func (f *Flow) GetID() globals.UID
- func (f *Flow) GetLogger() utils.LoggerIFace
- func (f *Flow) GetName() string
- func (f *Flow) GetNodesCount() int
- func (f *Flow) GetRegistry() *Registry
- func (f *Flow) GetStartNode() NodeIFace
- func (f *Flow) Init()
- func (f *Flow) IsOsThreadNeeded() bool
- func (f *Flow) IsTraced() (bool, bool)
- func (f *Flow) QueryConnect(startNode NodeIFace, startPortLabel string, endNode NodeIFace, ...)
- func (f *Flow) RemoveNode(node NodeIFace)
- func (f *Flow) SetCache(cache *portCache)
- func (f *Flow) SetContext(runtime *Context)
- func (f *Flow) SetLogger(logger utils.LoggerIFace)
- func (f *Flow) SetName(name string)
- func (f *Flow) SetOsThreadNeeded()
- func (f *Flow) SetRegistry(reg *Registry)
- func (f *Flow) SetStartNode(startNode NodeIFace)
- func (f *Flow) UpdateConfig(config *FlowConfig)
- type FlowConfig
- type FlowDictionary
- type FlowIFace
- type FuncParams
- type FunctionDispatcher
- type InnerMessage
- type InnerObjCreateFunc
- type Link
- type MessageDescriptor
- type MessageIFace
- type MessageType
- type NodeDescriptor
- type NodeError
- type NodeIFace
- type Port
- func (p *Port) AddConnectedLink(startPortID globals.UID, endPortID globals.UID) *Link
- func (lh *Port) DeRegisterLink(link *Link)
- func (p *Port) EnableEndPortForLinkByID(linkID globals.UID)
- func (p *Port) EnableEndPortForLinkByIndex(linkIndex int)
- func (p *Port) GetConnectedLinkNum() int
- func (p *Port) GetConnectedLinks() []db.LinkModel
- func (p *Port) GetID() globals.UID
- func (p *Port) GetLabel() string
- func (p *Port) GetLinkIDByIndex(linkIndex int) globals.UID
- func (p *Port) GetParentNode() NodeIFace
- func (p *Port) HasValidEndPortForLinkByID(linkID globals.UID) bool
- func (p *Port) HasValidEndPortForLinkByIndex(linkIndex int) bool
- func (p *Port) Init()
- func (p *Port) IsVirtual() bool
- func (p *Port) MakeVirtual(alias string)
- func (lh *Port) RegisterLink(link *Link)
- func (p *Port) RemoveConnectedLink(link *Link)
- func (p *Port) SetID(id globals.UID)
- type PropertyManager
- func (pm *PropertyManager) Get(propName string) interface{}
- func (pm *PropertyManager) GetKeys() []string
- func (pm *PropertyManager) GetPropsFromMessage(message MessageIFace) MessageIFace
- func (pm *PropertyManager) HasKey(key string) bool
- func (pm *PropertyManager) Set(propName string, value interface{})
- func (pm *PropertyManager) SetPropsFromMessage(message MessageIFace)
- type ReferenceNode
- func (rf *ReferenceNode) Get(message MessageIFace) MessageIFace
- func (rf *ReferenceNode) GetActInputCounter() *utils.Counter
- func (rf *ReferenceNode) GetActOutputCounter() *utils.Counter
- func (rf *ReferenceNode) GetID() globals.UID
- func (rf *ReferenceNode) GetIdAsString() string
- func (rf *ReferenceNode) GetInputPorts() []db.PortModel
- func (rf *ReferenceNode) GetOrig() NodeIFace
- func (rf *ReferenceNode) GetOutputPorts() []db.PortModel
- func (rf *ReferenceNode) GetParentFlow() *Flow
- func (rf *ReferenceNode) GetType() db.NodeType
- func (rf *ReferenceNode) Init()
- func (rf *ReferenceNode) IsBusy() bool
- func (rf *ReferenceNode) IsEndNode() bool
- func (rf *ReferenceNode) IsInited() bool
- func (rf *ReferenceNode) IsRef() bool
- func (rf *ReferenceNode) IsService() bool
- func (rf *ReferenceNode) IsStartNode() bool
- func (rf *ReferenceNode) IsSupervised() bool
- func (rf *ReferenceNode) MakeService()
- func (rf *ReferenceNode) Mount()
- func (rf *ReferenceNode) Run(runner *Runner, message MessageIFace) MessageIFace
- func (rf *ReferenceNode) Set(message MessageIFace)
- func (rf *ReferenceNode) SetBusy(value bool)
- func (rf *ReferenceNode) SetID(id globals.UID)
- func (rf *ReferenceNode) SetInited()
- func (rf *ReferenceNode) SetLogger(logger utils.LoggerIFace)
- func (rf *ReferenceNode) SetParentFlow(flow *Flow)
- func (rf *ReferenceNode) SetSupervised(value bool)
- func (rf *ReferenceNode) SetType(nType db.NodeType)
- func (rf *ReferenceNode) Terminate()
- type Registry
- func (r *Registry) AddLink(link *Link)
- func (r *Registry) AddNode(node NodeIFace)
- func (r *Registry) AddPort(port *Port)
- func (r *Registry) GetLinkByID(linkID globals.UID) *Link
- func (r *Registry) GetNodeByID(nodeID globals.UID) NodeIFace
- func (r *Registry) GetPortByID(portID globals.UID) *Port
- func (r *Registry) RemoveLink(link *Link)
- func (r *Registry) RemoveNode(node NodeIFace)
- func (r *Registry) RemovePort(port *Port)
- type Request
- type Runner
- func (r *Runner) Call(senderNode NodeIFace, port *Port, linkIndex int, message MessageIFace) (MessageIFace, bool)
- func (r *Runner) CallAll(senderNode NodeIFace, messages []MessageIFace)
- func (r *Runner) GetContext() *Context
- func (r *Runner) GetFlow() *Flow
- func (r *Runner) GetInputMessagesByNodeID(nodeID globals.UID) []InnerMessage
- func (r *Runner) GetLogger() utils.LoggerIFace
- func (r *Runner) GetOutputMessagesByNodeID(nodeID globals.UID) []InnerMessage
- func (r *Runner) GetTrace() []NodeIFace
- func (r *Runner) Init()
- func (r *Runner) IsBusy() bool
- func (r *Runner) Query(senderNode NodeIFace, port *Port, messageType MessageType) (MessageIFace, bool)
- func (r *Runner) QueryAll(senderNode NodeIFace, messageType MessageType) []MessageIFace
- func (r *Runner) RaiseError(sender NodeIFace, info string)
- func (r *Runner) RunNode(node NodeIFace, message MessageIFace)
- func (r *Runner) Send(senderNode NodeIFace, port *Port, message MessageIFace) bool
- func (r *Runner) SendAll(senderNode NodeIFace, messages []MessageIFace)
- func (r *Runner) SetConfig(config *config.RunnerConfig)
- func (r *Runner) SetContext(rt *Context)
- func (r *Runner) SetFlow(flow *Flow)
- func (r *Runner) SetLogger(logger utils.LoggerIFace)
- func (r *Runner) SetTraced(withMessageRecording bool)
- func (r *Runner) Shutdown(doGraceful bool)
- func (r *Runner) StartFlow(flow *Flow, message MessageIFace)
- type SendFinished
- type ShutdownMode
- type StrategyHandlingFunc
- type SupervisorIFace
- type VirtualFunction
- type WaitForType
- type WaitQueueItem
- type WaitSettings
Constants ¶
const CRITICAL_ERROR_MSG = "__critical_error__"
const RAISED_ERROR_MSG = "__raised_error__"
Variables ¶
var DefaultFlowConfig = FlowConfig{Context: context.TODO(), DedicateOSThread: false, MaxRetries: 5, ErrorHandlingFunc: RestartHandler, MaxIdleTime: 10 * time.Second, ShutdownMode: Auto}
Functions ¶
func RestartHandler ¶
func RestartHandler(runner *Runner, node NodeIFace, startMessage MessageIFace, err error)
Types ¶
type CallResponse ¶
type CallResponse struct {
// contains filtered or unexported fields
}
func (CallResponse) GetTimeStamp ¶
func (cr CallResponse) GetTimeStamp() time.Time
type CallStarted ¶
type CallStarted struct {
// contains filtered or unexported fields
}
func (CallStarted) GetTimeStamp ¶
func (cs CallStarted) GetTimeStamp() time.Time
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
func NewContext ¶
func NewContext(configPath string, logger utils.LoggerIFace) *Context
func (*Context) AddFlow ¶
AddFlow should be used to add a new flow for the context to handle. The passed name param can be used later to run or get the flow object.
func (*Context) GetFlowByName ¶
GetFlowByName should be used to access a flow object. It should be used only from outside of the scope of the runner.
func (*Context) GetLogger ¶
func (ctx *Context) GetLogger() utils.LoggerIFace
func (*Context) GetRunnerByFlow ¶
GetRunnerByFlow can be used to get the runner object of a flow. It can be useful for debugging or testing only.
func (*Context) IsMessageTypeRegistered ¶
func (ctx *Context) IsMessageTypeRegistered(mType MessageType) bool
IsMessageTypeRegistered can be used to check whether a message type is known by runtime
func (*Context) IsNodeTypeRegistered ¶
IsNodeTypeRegistered can be used to check whether a node type is known by the runtime
func (*Context) LoadConfig ¶
func (*Context) NewMessageByType ¶
func (ctx *Context) NewMessageByType(mType MessageType) MessageIFace
NewMessageByType can be used to create a registered message
func (*Context) NewNodeByType ¶
NewNodeByType can be used to create a registered node
func (*Context) RegisterMessage ¶
func (ctx *Context) RegisterMessage(mType MessageType, createFunc CreateMessageFunction)
RegisterMessage should be used before calling a flow which has any node of that type. It is essential for the runtime to have the type registered in order to work correctly.
func (*Context) RegisterNode ¶
func (ctx *Context) RegisterNode(nType db.NodeType, createFunc CreateNodeFunction)
RegisterNode should be used to be able to create a node as part of a flow. It is also essential for the runtime to have the type registered in order to work correctly.
func (*Context) RemoveFlow ¶
RemoveFlow can be used to remove a flow from the context. It can be useful only in case of memory restrictions, as usually it will be handled by the GC.
func (*Context) RunFlowByID ¶
func (ctx *Context) RunFlowByID(config *FlowConfig, flowID globals.UID, message MessageIFace) <-chan struct{}
RunFlowByID should be used to start a flow by its ID. The message will be the start message of the flow. It returns a channel which should be used to wait for the flow. The config param can be nil only if a default config is set. The start message can be nil only if the node is in service mode.
func (*Context) RunFlowByIDWithWait ¶
func (ctx *Context) RunFlowByIDWithWait(flowConfig *FlowConfig, flowID globals.UID, message MessageIFace)
RunFlowByIDWithWait does the same as RunFlowByID but it will do the waiting itself
func (*Context) RunFlowByName ¶
func (ctx *Context) RunFlowByName(config *FlowConfig, flowName string, message MessageIFace) <-chan struct{}
RunFlowByName should be used to start a flow by its name. The message will be the start message of the flow. It returns a channel which should be used to wait for the flow. The config param can be nil only if a default config is set. The start message can be nil only if the node is in service mode.
func (*Context) RunFlowByNameWithWait ¶
func (ctx *Context) RunFlowByNameWithWait(flowConfig *FlowConfig, flowName string, message MessageIFace)
RunFlowByNameWithWait does the same as RunFlowByName but it will do the waiting itself
func (*Context) SetCustomRunner ¶
TODO: Make it safer to use. It can be used to set a runner with a custom supervisor, but a new runner has to be created for every different flow; a runner can be reused only for the same flow.
func (*Context) SetDefaultFlowConfig ¶
func (ctx *Context) SetDefaultFlowConfig(config *FlowConfig)
SetDefaultFlowConfig should be used to set a config which will be used in case no config is passed to the RunFlow type of commands
type ContextIFace ¶
type ContextIFace interface { GetLogger() utils.LoggerIFace NewMessageByType(MessageType) MessageIFace RunFlowByNameWithWait(*FlowConfig, string, MessageIFace) GetFlowByName(string) (FlowIFace, bool) }
TODO It is only used for testing
type CreateMessageFunction ¶
type CreateMessageFunction func() MessageIFace
type CreateNodeFunction ¶
type CreateNodeFunction func() NodeIFace
type DefaultSupervisor ¶
type DefaultSupervisor struct { MaxRetries utils.LockedCounter // contains filtered or unexported fields }
func (*DefaultSupervisor) ApplyStrategy ¶
func (ds *DefaultSupervisor) ApplyStrategy(runner *Runner, node NodeIFace, startMessage MessageIFace, err error)
type DispatchedFunc ¶
type DispatchedFunc func(params FuncParams) MessageIFace
type DummyMessage ¶
type DummyMessage struct {
// contains filtered or unexported fields
}
func (*DummyMessage) Get ¶
func (dm *DummyMessage) Get() interface{}
func (*DummyMessage) GetAsMap ¶
func (dm *DummyMessage) GetAsMap() map[string]interface{}
func (*DummyMessage) GetBody ¶
func (dm *DummyMessage) GetBody() []byte
func (*DummyMessage) GetDeliveryTime ¶
func (bm *DummyMessage) GetDeliveryTime() time.Time
func (*DummyMessage) GetSentTime ¶
func (bm *DummyMessage) GetSentTime() time.Time
func (*DummyMessage) GetType ¶
func (dm *DummyMessage) GetType() MessageType
func (*DummyMessage) Put ¶
func (dm *DummyMessage) Put(data interface{})
func (*DummyMessage) PutAsMap ¶
func (dm *DummyMessage) PutAsMap(data map[string]interface{})
func (*DummyMessage) SetBody ¶
func (dm *DummyMessage) SetBody(body []byte)
func (*DummyMessage) SetDeliveryTime ¶
func (bm *DummyMessage) SetDeliveryTime(dt time.Time)
func (*DummyMessage) SetSentTime ¶
func (bm *DummyMessage) SetSentTime(st time.Time)
func (*DummyMessage) SetType ¶
func (dm *DummyMessage) SetType(mType MessageType)
type EventHandler ¶
type EventHandler struct {
// contains filtered or unexported fields
}
func (*EventHandler) HandleEvents ¶
func (eh *EventHandler) HandleEvents()
func (*EventHandler) SetRunner ¶
func (eh *EventHandler) SetRunner(runner *Runner)
type Flow ¶
type Flow struct {
// contains filtered or unexported fields
}
func (*Flow) AddNode ¶
AddNode can be used to add a node to the flow. The node also has to be connected in order to be used as a part of the flow. To make connections the Connect or QueryConnect method should be used.
func (*Flow) Connect ¶
func (f *Flow) Connect(startNode NodeIFace, startPortLabel string, endNode NodeIFace, endPortLabel string) *Link
Connect should be used to connect the added nodes by creating links between them so they can be used for messaging between the nodes
func (*Flow) EnableTrace ¶
EnableTrace enables tracing for test or debug purposes. If WithMessageRecording is false then it will only store all the nodes touched. If WithMessageRecording is true then it will store all the messages as well.
func (*Flow) GetConfig ¶
func (f *Flow) GetConfig() *FlowConfig
func (*Flow) GetContext ¶
func (*Flow) GetDictionary ¶
func (f *Flow) GetDictionary() *FlowDictionary
GetDictionary returns the flow dictionary, a map that can be used to share bigger data between nodes by adding the data to the dictionary under a unique ID and then sharing that ID through messages.
func (*Flow) GetLogger ¶
func (f *Flow) GetLogger() utils.LoggerIFace
func (*Flow) GetNodesCount ¶
func (*Flow) GetRegistry ¶
func (*Flow) GetStartNode ¶
func (*Flow) IsOsThreadNeeded ¶
func (*Flow) QueryConnect ¶
func (f *Flow) QueryConnect(startNode NodeIFace, startPortLabel string, endNode NodeIFace, endPortLabel string)
QueryConnect is like the Connect function but for connecting queries only, as a query link has the inverse direction of the other links, meaning that, in that case only, the source node follows the destination node.
func (*Flow) RemoveNode ¶
RemoveNode can be used to remove a node from the flow. It should be used only when there are memory restrictions. Be aware that removing the node from the flow will also break the ability to access its internal states.
func (*Flow) SetContext ¶
func (*Flow) SetLogger ¶
func (f *Flow) SetLogger(logger utils.LoggerIFace)
func (*Flow) SetOsThreadNeeded ¶
func (f *Flow) SetOsThreadNeeded()
func (*Flow) SetRegistry ¶
func (*Flow) SetStartNode ¶
SetStartNode can be used to set the start node of the flow for the runner. For more info about the start and end node concept see the docs.
func (*Flow) UpdateConfig ¶
func (f *Flow) UpdateConfig(config *FlowConfig)
UpdateConfig can be used to apply changes to the config of the flow. This is usually handled by the runtime or by the REST server. It can be useful in cases when you need some special configuration for only a few flows. If the default config has to be changed, then the SetDefaultFlowConfig func of the context should be used instead.
type FlowConfig ¶
type FlowConfig struct { Context context.Context `json:"-"` DedicateOSThread bool MaxRetries int ErrorHandlingFunc StrategyHandlingFunc `json:"-"` MaxIdleTime time.Duration ShutdownMode ShutdownMode }
func (*FlowConfig) Clone ¶
func (fc *FlowConfig) Clone() *FlowConfig
type FlowDictionary ¶
type FlowDictionary struct {
// contains filtered or unexported fields
}
func NewFlowDict ¶
func NewFlowDict() *FlowDictionary
func (*FlowDictionary) Add ¶
func (fd *FlowDictionary) Add(key string, data interface{}) error
func (*FlowDictionary) Get ¶
func (fd *FlowDictionary) Get(key string) (interface{}, bool)
func (*FlowDictionary) Init ¶
func (fd *FlowDictionary) Init()
func (*FlowDictionary) Pop ¶
func (fd *FlowDictionary) Pop(key string) (interface{}, bool)
func (*FlowDictionary) Set ¶
func (fd *FlowDictionary) Set(key string, data interface{})
type FlowIFace ¶
type FlowIFace interface { SetName(string) GetName() string UpdateConfig(*FlowConfig) GetConfig() *FlowConfig }
TODO It is only used for testing
type FuncParams ¶
type FuncParams interface{}
type FunctionDispatcher ¶
type FunctionDispatcher struct {
// contains filtered or unexported fields
}
func NewFunctionDispatcher ¶
func NewFunctionDispatcher(logger utils.LoggerIFace) *FunctionDispatcher
func (*FunctionDispatcher) AddFunc ¶
func (fd *FunctionDispatcher) AddFunc(funcName string, dispFunc DispatchedFunc)
func (*FunctionDispatcher) DispatchFromMessage ¶
func (fd *FunctionDispatcher) DispatchFromMessage(message MessageIFace) []MessageIFace
func (*FunctionDispatcher) GetFunctionByName ¶
func (fd *FunctionDispatcher) GetFunctionByName(funcName string) DispatchedFunc
type InnerMessage ¶
type InnerMessage struct { MType MessageType Mjson []byte }
type InnerObjCreateFunc ¶
type InnerObjCreateFunc func() interface{}
type Link ¶
func (*Link) GetEndPortID ¶
func (*Link) GetStartPortID ¶
func (*Link) GetWaitSettings ¶
func (l *Link) GetWaitSettings() *WaitSettings
func (*Link) SetWaitSettings ¶
func (l *Link) SetWaitSettings(waitSetting WaitSettings)
SetWaitSettings should be used after the link is created to set the rules of the link (see the related doc). It can also be called before a call to change the link settings.
type MessageDescriptor ¶
type MessageDescriptor struct {
// contains filtered or unexported fields
}
type MessageIFace ¶
type MessageIFace interface { SetType(MessageType) GetType() MessageType Put(interface{}) Get() interface{} PutAsMap(map[string]interface{}) GetAsMap() map[string]interface{} SetBody([]byte) GetBody() []byte SetSentTime(time.Time) SetDeliveryTime(time.Time) GetSentTime() time.Time GetDeliveryTime() time.Time }
func NewDummyMessage ¶
func NewDummyMessage() MessageIFace
type MessageType ¶
type MessageType string
type NodeDescriptor ¶
type NodeDescriptor struct {
// contains filtered or unexported fields
}
type NodeIFace ¶
type NodeIFace interface { SetID(id globals.UID) GetID() globals.UID GetIdAsString() string SetType(db.NodeType) GetType() db.NodeType SetParentFlow(flow *Flow) GetParentFlow() *Flow SetLogger(utils.LoggerIFace) Set(MessageIFace) Get(MessageIFace) MessageIFace Mount() Init() Run(*Runner, MessageIFace) MessageIFace Terminate() GetInputPorts() []db.PortModel GetOutputPorts() []db.PortModel GetActInputCounter() *utils.Counter GetActOutputCounter() *utils.Counter MakeService() IsService() bool SetSupervised(bool) IsSupervised() bool SetInited() IsInited() bool SetBusy(bool) IsBusy() bool IsRef() bool IsStartNode() bool IsEndNode() bool }
func NewReferenceNode ¶
type Port ¶
type Port struct { PDir db.PortDirection // PDir PortDirection in DB PType db.PortType // PType PortType in DB // contains filtered or unexported fields }
func (*Port) AddConnectedLink ¶
func (*Port) DeRegisterLink ¶
func (lh *Port) DeRegisterLink(link *Link)
func (*Port) EnableEndPortForLinkByID ¶
func (*Port) EnableEndPortForLinkByIndex ¶
func (*Port) GetConnectedLinkNum ¶
GetConnectedLinkNum returns the number of links attached to the port. It can be used to get that info without using len on the returned value of GetConnectedLinks.
func (*Port) GetConnectedLinks ¶
GetConnectedLinks returns the links attached to the port. Be aware that it returns the link models, not the link objects; to get the link objects by ID the registry of the flow should be used.
func (*Port) GetParentNode ¶
func (*Port) HasValidEndPortForLinkByID ¶
func (*Port) HasValidEndPortForLinkByIndex ¶
func (*Port) MakeVirtual ¶
MakeVirtual is a helper function to give a public interface for properties or functions. For details check the docs.
func (*Port) RegisterLink ¶
func (lh *Port) RegisterLink(link *Link)
func (*Port) RemoveConnectedLink ¶
TODO Check if it is enough or we have to do something with the Associations as well
type PropertyManager ¶
type PropertyManager struct {
// contains filtered or unexported fields
}
func NewPropertyManager ¶
func NewPropertyManager(logger utils.LoggerIFace) *PropertyManager
func (*PropertyManager) Get ¶
func (pm *PropertyManager) Get(propName string) interface{}
func (*PropertyManager) GetKeys ¶
func (pm *PropertyManager) GetKeys() []string
func (*PropertyManager) GetPropsFromMessage ¶
func (pm *PropertyManager) GetPropsFromMessage(message MessageIFace) MessageIFace
func (*PropertyManager) HasKey ¶
func (pm *PropertyManager) HasKey(key string) bool
func (*PropertyManager) Set ¶
func (pm *PropertyManager) Set(propName string, value interface{})
func (*PropertyManager) SetPropsFromMessage ¶
func (pm *PropertyManager) SetPropsFromMessage(message MessageIFace)
type ReferenceNode ¶
type ReferenceNode struct {
// contains filtered or unexported fields
}
func (*ReferenceNode) Get ¶
func (rf *ReferenceNode) Get(message MessageIFace) MessageIFace
func (*ReferenceNode) GetActInputCounter ¶
func (rf *ReferenceNode) GetActInputCounter() *utils.Counter
func (*ReferenceNode) GetActOutputCounter ¶
func (rf *ReferenceNode) GetActOutputCounter() *utils.Counter
func (*ReferenceNode) GetID ¶
func (rf *ReferenceNode) GetID() globals.UID
func (*ReferenceNode) GetIdAsString ¶
func (rf *ReferenceNode) GetIdAsString() string
func (*ReferenceNode) GetInputPorts ¶
func (rf *ReferenceNode) GetInputPorts() []db.PortModel
func (*ReferenceNode) GetOrig ¶
func (rf *ReferenceNode) GetOrig() NodeIFace
func (*ReferenceNode) GetOutputPorts ¶
func (rf *ReferenceNode) GetOutputPorts() []db.PortModel
func (*ReferenceNode) GetParentFlow ¶
func (rf *ReferenceNode) GetParentFlow() *Flow
func (*ReferenceNode) GetType ¶
func (rf *ReferenceNode) GetType() db.NodeType
func (*ReferenceNode) Init ¶
func (rf *ReferenceNode) Init()
func (*ReferenceNode) IsBusy ¶
func (rf *ReferenceNode) IsBusy() bool
func (*ReferenceNode) IsEndNode ¶
func (rf *ReferenceNode) IsEndNode() bool
func (*ReferenceNode) IsInited ¶
func (rf *ReferenceNode) IsInited() bool
func (*ReferenceNode) IsRef ¶
func (rf *ReferenceNode) IsRef() bool
func (*ReferenceNode) IsService ¶
func (rf *ReferenceNode) IsService() bool
func (*ReferenceNode) IsStartNode ¶
func (rf *ReferenceNode) IsStartNode() bool
func (*ReferenceNode) IsSupervised ¶
func (rf *ReferenceNode) IsSupervised() bool
func (*ReferenceNode) MakeService ¶
func (rf *ReferenceNode) MakeService()
func (*ReferenceNode) Mount ¶
func (rf *ReferenceNode) Mount()
func (*ReferenceNode) Run ¶
func (rf *ReferenceNode) Run(runner *Runner, message MessageIFace) MessageIFace
func (*ReferenceNode) Set ¶
func (rf *ReferenceNode) Set(message MessageIFace)
func (*ReferenceNode) SetBusy ¶
func (rf *ReferenceNode) SetBusy(value bool)
func (*ReferenceNode) SetID ¶
func (rf *ReferenceNode) SetID(id globals.UID)
func (*ReferenceNode) SetInited ¶
func (rf *ReferenceNode) SetInited()
func (*ReferenceNode) SetLogger ¶
func (rf *ReferenceNode) SetLogger(logger utils.LoggerIFace)
func (*ReferenceNode) SetParentFlow ¶
func (rf *ReferenceNode) SetParentFlow(flow *Flow)
func (*ReferenceNode) SetSupervised ¶
func (rf *ReferenceNode) SetSupervised(value bool)
func (*ReferenceNode) SetType ¶
func (rf *ReferenceNode) SetType(nType db.NodeType)
func (*ReferenceNode) Terminate ¶
func (rf *ReferenceNode) Terminate()
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
func NewRegistry ¶
func NewRegistry() *Registry
func (*Registry) RemoveLink ¶
func (*Registry) RemoveNode ¶
func (*Registry) RemovePort ¶
type Runner ¶
type Runner struct { EventHandler // id globals.CounterID Config *config.RunnerConfig // contains filtered or unexported fields }
func NewRunner ¶
func NewRunner(supervisor SupervisorIFace, context context.Context, timeout time.Duration, shutdownMode ShutdownMode) *Runner
func (*Runner) Call ¶
func (r *Runner) Call(senderNode NodeIFace, port *Port, linkIndex int, message MessageIFace) (MessageIFace, bool)
Call should be used to execute the Run function of an attached node. For details about the params and the usage check the docs.
func (*Runner) CallAll ¶
func (r *Runner) CallAll(senderNode NodeIFace, messages []MessageIFace)
CallAll is a helper function to simplify the execution of many nodes. It can be useful if multiple nodes should be executed, for example if the node uses an external script and as its result multiple nodes should be executed.
func (*Runner) GetContext ¶
func (*Runner) GetFlow ¶
GetFlow should be used to get the flow object which is handled by the runner
func (*Runner) GetInputMessagesByNodeID ¶
func (r *Runner) GetInputMessagesByNodeID(nodeID globals.UID) []InnerMessage
GetInputMessagesByNodeID can be used to get all the input messages of a node after a flow run. It will only work if tracing is enabled with the message recording option.
func (*Runner) GetLogger ¶
func (r *Runner) GetLogger() utils.LoggerIFace
func (*Runner) GetOutputMessagesByNodeID ¶
func (r *Runner) GetOutputMessagesByNodeID(nodeID globals.UID) []InnerMessage
GetOutputMessagesByNodeID can be used to get all the output messages of a node after a flow run. It will only work if tracing is enabled with the message recording option.
func (*Runner) GetTrace ¶
GetTrace should be used to get the list of the touched nodes in case tracing is enabled on the flow
func (*Runner) Query ¶
func (r *Runner) Query(senderNode NodeIFace, port *Port, messageType MessageType) (MessageIFace, bool)
Query should be used to query an exposed internal state of an attached node. For details about the params and the usage check the docs.
func (*Runner) QueryAll ¶
func (r *Runner) QueryAll(senderNode NodeIFace, messageType MessageType) []MessageIFace
QueryAll is a helper function to simplify the querying of many nodes. It can be useful if multiple nodes should be queried, for example if the node uses an external script and queries params for the script coming from multiple sources (nodes).
func (*Runner) RaiseError ¶
RaiseError should be used to notify the runtime about an error. For details about the params and the usage check the docs.
func (*Runner) RunNode ¶
func (r *Runner) RunNode(node NodeIFace, message MessageIFace)
RunNode can be used to run only one node of the flow. Usually the Call function should be used to run a node, but in some special cases RunNode can be useful, for example to handle a retry with the supervisor.
func (*Runner) Send ¶
func (r *Runner) Send(senderNode NodeIFace, port *Port, message MessageIFace) bool
Send should be used to set an exposed internal state of an attached node. For details about the params and the usage check the docs.
func (*Runner) SendAll ¶
func (r *Runner) SendAll(senderNode NodeIFace, messages []MessageIFace)
SendAll is a helper function to simplify the setting of the internals for many nodes. It can be useful if multiple nodes should be modified, for example if the node uses an external script and the output of the script should be sent to multiple destinations (nodes).
func (*Runner) SetConfig ¶
func (r *Runner) SetConfig(config *config.RunnerConfig)
func (*Runner) SetContext ¶
func (*Runner) SetLogger ¶
func (r *Runner) SetLogger(logger utils.LoggerIFace)
func (*Runner) Shutdown ¶
Shutdown can be used to force a shutdown of a flow. By default the shutdown is handled automatically when all the nodes are touched. By changing the related flow config param the default behavior can be disabled; in that case the shutdown should be done manually or the flow will only stop because of a timeout.
func (*Runner) StartFlow ¶
func (r *Runner) StartFlow(flow *Flow, message MessageIFace)
type SendFinished ¶
type SendFinished struct {
// contains filtered or unexported fields
}
func (SendFinished) GetTimeStamp ¶
func (sf SendFinished) GetTimeStamp() time.Time
type StrategyHandlingFunc ¶
type StrategyHandlingFunc func(*Runner, NodeIFace, MessageIFace, error)
type SupervisorIFace ¶
type SupervisorIFace interface {
ApplyStrategy(*Runner, NodeIFace, MessageIFace, error)
}
func NewDefaultSupervisor ¶
func NewDefaultSupervisor(maxRetries int, handlingFunc StrategyHandlingFunc) SupervisorIFace
type VirtualFunction ¶
type VirtualFunction struct {
// contains filtered or unexported fields
}
func (*VirtualFunction) GetName ¶
func (vf *VirtualFunction) GetName() string
func (*VirtualFunction) GetParams ¶
func (vf *VirtualFunction) GetParams() FuncParams
type WaitForType ¶
type WaitForType int
const ( WaitForNone WaitForType = iota WaitForResponse WaitForCaller WaitForNode WaitForEndNode )
type WaitQueueItem ¶
type WaitQueueItem struct {
// contains filtered or unexported fields
}
type WaitSettings ¶
type WaitSettings struct { Condition WaitForType CallTimeout time.Duration }
func (*WaitSettings) GetCallTimeout ¶
func (wfs *WaitSettings) GetCallTimeout() time.Duration
func (*WaitSettings) GetCondition ¶
func (wfs *WaitSettings) GetCondition() WaitForType
func (*WaitSettings) SetCallTimeout ¶
func (wfs *WaitSettings) SetCallTimeout(timeout time.Duration)
func (*WaitSettings) SetCondition ¶
func (wfs *WaitSettings) SetCondition(wType WaitForType)