action

package
v0.20.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2024 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DelayNodeMsgType = "DELAY_NODE_MSG_TYPE"
View Source
var Functions = &FunctionsRegistry{}

Functions 自定义函数注册器

View Source
var JsLogReturnFormatErr = errors.New("return the value is not a string")

JsLogReturnFormatErr 如果脚本返回值不是string错误

Functions

This section is empty.

Types

type DelayNode added in v0.16.0

type DelayNode struct {
	//节点配置
	Config DelayNodeConfiguration
	//消息队列
	PendingMsgs map[string]types.RuleMsg
	//上一条pending msg id
	LastPendingMsgId atomic.Value
	// contains filtered or unexported fields
}

DelayNode 当消息的延迟期达到后,该消息将从挂起队列中删除,并通过成功链路(`Success`)路由到下一个节点。 如果已经达到了最大挂起消息限制,则每个下一条消息都会通过失败链路(`Failure`)路由。 如果overwrite为true,则消息会被覆盖,直到队列里的消息被处理后,才会再次进入延迟队列。

func (*DelayNode) Destroy added in v0.16.0

func (x *DelayNode) Destroy()

Destroy 销毁

func (*DelayNode) Init added in v0.16.0

func (x *DelayNode) Init(ruleConfig types.Config, configuration types.Configuration) error

Init 初始化

func (*DelayNode) New added in v0.16.0

func (x *DelayNode) New() types.Node

func (*DelayNode) OnMsg added in v0.16.0

func (x *DelayNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg)

OnMsg 处理消息

func (*DelayNode) Type added in v0.16.0

func (x *DelayNode) Type() string

Type 组件类型

type DelayNodeConfiguration added in v0.16.0

type DelayNodeConfiguration struct {
	//延迟时间,单位秒
	PeriodInSeconds int
	//最大允许挂起消息的数量
	MaxPendingMsgs int
	//通过${metadataKey}方式从metadata变量中获取,延迟时间,如果该值有值,优先取该值。
	PeriodInSecondsPattern string
	//是否覆盖周期内的消息
	//true:周期内只保留一条消息,新的消息会覆盖之前的消息。直到队列里的消息被处理后,才会再次进入延迟队列。
	//false:周期内保留所有消息,直到达到最大挂起消息限制后,才会进入失败链路。
	Overwrite bool
}

DelayNodeConfiguration 节点配置

type FunctionsNode added in v0.16.0

type FunctionsNode struct {
	//节点配置
	Config FunctionsNodeConfiguration
	//functionName是否有占位符变量
	HasVars bool
}

FunctionsNode 通过方法名调用注册在Functions的处理函数 如果没找到函数,则TellFailure

func (*FunctionsNode) Destroy added in v0.16.0

func (x *FunctionsNode) Destroy()

Destroy 销毁

func (*FunctionsNode) Init added in v0.16.0

func (x *FunctionsNode) Init(ruleConfig types.Config, configuration types.Configuration) error

Init 初始化

func (*FunctionsNode) New added in v0.16.0

func (x *FunctionsNode) New() types.Node

func (*FunctionsNode) OnMsg added in v0.16.0

func (x *FunctionsNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg)

OnMsg 处理消息

func (*FunctionsNode) Type added in v0.16.0

func (x *FunctionsNode) Type() string

Type 组件类型

type FunctionsNodeConfiguration added in v0.16.0

type FunctionsNodeConfiguration struct {
	//FunctionName 调用的函数名称,支持通过${metadataKey}方式从metadata动态获取函数名称值
	FunctionName string
}

FunctionsNodeConfiguration 节点配置

type FunctionsRegistry added in v0.16.0

type FunctionsRegistry struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

FunctionsRegistry 函数注册器

func (*FunctionsRegistry) Get added in v0.16.0

func (x *FunctionsRegistry) Get(functionName string) (func(ctx types.RuleContext, msg types.RuleMsg), bool)

Get 获取函数

func (*FunctionsRegistry) Register added in v0.16.0

func (x *FunctionsRegistry) Register(functionName string, f func(ctx types.RuleContext, msg types.RuleMsg))

Register 注册函数

func (*FunctionsRegistry) UnRegister added in v0.16.0

func (x *FunctionsRegistry) UnRegister(functionName string)

UnRegister 删除函数

type GroupActionNode added in v0.19.0

type GroupActionNode struct {
	//节点配置
	Config GroupActionNodeConfiguration
	//节点ID列表
	NodeIdList []string
	//节点列表长度
	Length int32
}

GroupActionNode 把多个节点组成一个分组,异步执行所有节点,等待所有节点执行完成后,把所有节点结果合并,发送到下一个节点 如果匹配到 Config.MatchNum 个节点是 Config.MatchRelationType 类型,则把数据到`Success`链, 否则发到`Failure`链。 nodeIds为空或者执行超时,发送到`Failure`链

func (*GroupActionNode) Destroy added in v0.19.0

func (x *GroupActionNode) Destroy()

Destroy 销毁

func (*GroupActionNode) Init added in v0.19.0

func (x *GroupActionNode) Init(ruleConfig types.Config, configuration types.Configuration) error

Init 初始化

func (*GroupActionNode) New added in v0.19.0

func (x *GroupActionNode) New() types.Node

func (*GroupActionNode) OnMsg added in v0.19.0

func (x *GroupActionNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg)

OnMsg 处理消息

func (*GroupActionNode) Type added in v0.19.0

func (x *GroupActionNode) Type() string

Type 组件类型

type GroupActionNodeConfiguration added in v0.19.0

type GroupActionNodeConfiguration struct {
	//MatchRelationType 匹配组内节点关系类型,支持`Success`、`Failure`、`True`、`False`和自定义等关系,默认`Success`
	MatchRelationType string
	//MatchNum 匹配满足节点数量
	//默认0:代表组内所有节点都是 MatchRelationType 指定类型,发送到`Success`链,否则发送到`Failure`链。
	//MatchNum>0,则表示任意匹配 MatchNum 个节点是 MatchRelationType 指定类型,发送到`Success`链,否则发送到`Failure`链。
	//MatchNum>=len(NodeIds)则等价于MatchNum=0
	MatchNum int
	//NodeIds 组内节点ID列表,多个ID与`,`隔开
	NodeIds string
	//Timeout 执行超时,单位秒,默认0:代表不限制。
	Timeout int
}

GroupActionNodeConfiguration 节点配置

type IteratorNode added in v0.19.0

type IteratorNode struct {
	//节点配置
	Config IteratorNodeConfiguration
	// contains filtered or unexported fields
}

IteratorNode 遍历msg或者msg中指定字段item值到下一个节点,遍历字段值必须是`数组`或者`{key:value}`类型 如果item满足JsScript,则会把item数据通过`True`链发到下一个节点,否则通过`False`链发到下一个节点 如果找不到指定字段、js脚本执行失败或者遍历的对象不是`数组`或者`{key:value}`,则会把错误信息通过Failure链发到下一个节点 遍历结束后,通过Success链把原始msg发送到下一个节点

func (*IteratorNode) Destroy added in v0.19.0

func (x *IteratorNode) Destroy()

Destroy 销毁

func (*IteratorNode) Init added in v0.19.0

func (x *IteratorNode) Init(ruleConfig types.Config, configuration types.Configuration) error

Init 初始化

func (*IteratorNode) New added in v0.19.0

func (x *IteratorNode) New() types.Node

func (*IteratorNode) OnMsg added in v0.19.0

func (x *IteratorNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg)

OnMsg 处理消息

func (*IteratorNode) Type added in v0.19.0

func (x *IteratorNode) Type() string

Type 组件类型

type IteratorNodeConfiguration added in v0.19.0

type IteratorNodeConfiguration struct {
	// 遍历字段名称,如果空,遍历整个msg,支持嵌套方式获取msg字段值,例如items.value、items
	FieldName string
	// 过滤item js脚本,可选,默认为空,匹配所有item
	// function ItemFilter(item,index,metadata)
	// item: 当前遍历的item
	// index: 如果是数组,则表示当前遍历的index,如果是map,则表示当前遍历的key
	JsScript string
}

IteratorNodeConfiguration 节点配置

type LogNode

type LogNode struct {
	//节点配置
	Config LogNodeConfiguration
	// contains filtered or unexported fields
}

LogNode 使用JS脚本将传入消息转换为字符串,并将最终值记录到日志文件中 使用`types.Config.Logger`记录日志 消息体可以通过`msg`变量访问,msg 是string类型。例如:`return msg.temperature > 50;` 消息元数据可以通过`metadata`变量访问。例如 `metadata.customerName === 'Lala';` 消息类型可以通过`msgType`变量访问. 脚本执行成功,发送信息到`Success`链, 否则发到`Failure`链。

func (*LogNode) Destroy

func (x *LogNode) Destroy()

Destroy 销毁

func (*LogNode) Init

func (x *LogNode) Init(ruleConfig types.Config, configuration types.Configuration) error

Init 初始化

func (*LogNode) New

func (x *LogNode) New() types.Node

func (*LogNode) OnMsg

func (x *LogNode) OnMsg(ctx types.RuleContext, msg types.RuleMsg)

OnMsg 处理消息

func (*LogNode) Type

func (x *LogNode) Type() string

Type 组件类型

type LogNodeConfiguration

type LogNodeConfiguration struct {
	//JsScript 只配置函数体脚本内容,对消息进行格式化,脚本返回值string
	//例如
	//return 'Incoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);
	//完整脚本函数:
	//"function ToString(msg, metadata, msgType) { ${JsScript} }"
	//脚本返回值string
	JsScript string
}

LogNodeConfiguration 节点配置

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL