Documentation ¶
Index ¶
- Variables
- type BaseEndpoint
- type ChainExecutor
- type ComponentExecutor
- type ComponentRegistry
- type Endpoint
- type Exchange
- type Executor
- type ExecutorFactory
- type From
- func (f *From) End() *Router
- func (f *From) ExecuteProcess(router *Router, exchange *Exchange) bool
- func (f *From) GetProcessList() []Process
- func (f *From) GetTo() *To
- func (f *From) Process(process Process) *From
- func (f *From) To(to string, configs ...types.Configuration) *To
- func (f *From) ToComponent(node types.Node) *To
- func (f *From) ToString() string
- func (f *From) Transform(transform Process) *From
- type Message
- type Process
- type Router
- type RouterOption
- type To
- func (t *To) End() *Router
- func (t *To) Execute(ctx context.Context, exchange *Exchange)
- func (t *To) GetProcessList() []Process
- func (t *To) Process(process Process) *To
- func (t *To) ToString() string
- func (t *To) ToStringByDict(dict map[string]string) string
- func (t *To) Transform(transform Process) *To
- func (t *To) Wait() *To
Constants ¶
This section is empty.
Variables ¶
var ( //ChainNotFoundErr 规则链不存在错误 ChainNotFoundErr = errors.New("chain not found error") //StopErr endpoint服务停止错误 StopErr = errors.New("endpoint stop") )
var DefaultExecutorFactory = new(ExecutorFactory)
DefaultExecutorFactory 默认to端执行器注册器
var Registry = new(ComponentRegistry)
Registry endpoint组件默认注册器
Functions ¶
This section is empty.
Types ¶
type BaseEndpoint ¶
type BaseEndpoint struct { //endpoint 路由存储器 RouterStorage map[string]*Router sync.RWMutex // contains filtered or unexported fields }
BaseEndpoint 基础端点 实现全局拦截器基础方法
func (*BaseEndpoint) AddInterceptors ¶
func (e *BaseEndpoint) AddInterceptors(interceptors ...Process)
AddInterceptors 添加全局拦截器
func (*BaseEndpoint) DoProcess ¶
func (e *BaseEndpoint) DoProcess(router *Router, exchange *Exchange)
func (*BaseEndpoint) OnMsg ¶ added in v0.14.0
func (e *BaseEndpoint) OnMsg(ctx types.RuleContext, msg types.RuleMsg)
type ChainExecutor ¶
type ChainExecutor struct { }
ChainExecutor 规则链执行器
func (*ChainExecutor) Execute ¶
func (ce *ChainExecutor) Execute(ctx context.Context, router *Router, exchange *Exchange)
func (*ChainExecutor) Init ¶
func (ce *ChainExecutor) Init(_ types.Config, _ types.Configuration) error
func (*ChainExecutor) IsPathSupportVar ¶
func (ce *ChainExecutor) IsPathSupportVar() bool
IsPathSupportVar to路径允许带变量
func (*ChainExecutor) New ¶
func (ce *ChainExecutor) New() Executor
type ComponentExecutor ¶
type ComponentExecutor struct {
// contains filtered or unexported fields
}
ComponentExecutor node组件执行器
func (*ComponentExecutor) Execute ¶
func (ce *ComponentExecutor) Execute(ctx context.Context, router *Router, exchange *Exchange)
func (*ComponentExecutor) Init ¶
func (ce *ComponentExecutor) Init(config types.Config, configuration types.Configuration) error
func (*ComponentExecutor) IsPathSupportVar ¶
func (ce *ComponentExecutor) IsPathSupportVar() bool
IsPathSupportVar to路径不允许带变量
func (*ComponentExecutor) New ¶
func (ce *ComponentExecutor) New() Executor
type ComponentRegistry ¶ added in v0.17.0
ComponentRegistry 组件注册器
func (*ComponentRegistry) New ¶ added in v0.17.0
func (r *ComponentRegistry) New(componentType string, ruleConfig types.Config, configuration interface{}) (Endpoint, error)
New 创建一个新的endpoint实例
func (*ComponentRegistry) Register ¶ added in v0.17.0
func (r *ComponentRegistry) Register(endpoint Endpoint) error
Register 注册规则引擎节点组件
func (*ComponentRegistry) Unregister ¶ added in v0.17.0
func (r *ComponentRegistry) Unregister(componentType string) error
type Endpoint ¶ added in v0.14.0
type Endpoint interface { //Node 继承node types.Node //Id 类型标识 Id() string //Start 启动服务 Start() error //AddInterceptors 添加全局拦截器 AddInterceptors(interceptors ...Process) //AddRouter 添加路由,指定参数 //params 为路由额外参数 //返回路由ID,路由ID一般是from值,但某些Endpoint允许from值重复,Endpoint会返回新的路由ID, //路由ID用于路由删除 AddRouter(router *Router, params ...interface{}) (string, error) //RemoveRouter 删除路由,指定参数 //params 为路由额外参数 //routerId:路由ID RemoveRouter(routerId string, params ...interface{}) error }
type Executor ¶
type Executor interface { //New 创建新的实例 New() Executor //IsPathSupportVar to路径是否支持${}变量方式,默认不支持 IsPathSupportVar() bool //Init 初始化 Init(config types.Config, configuration types.Configuration) error //Execute 执行逻辑 Execute(ctx context.Context, router *Router, exchange *Exchange) }
Executor to端执行器
type ExecutorFactory ¶
ExecutorFactory to端执行器工厂
func (*ExecutorFactory) New ¶
func (r *ExecutorFactory) New(name string) (Executor, bool)
New 根据类型创建to端执行器实例
func (*ExecutorFactory) Register ¶
func (r *ExecutorFactory) Register(name string, executor Executor)
Register 注册to端执行器
type From ¶
type From struct { //Config 配置 Config types.Configuration //Router router指针 Router *Router //来源路径 From string // contains filtered or unexported fields }
From from端
func (*From) ExecuteProcess ¶
ExecuteProcess 执行处理函数 true:执行To端逻辑,否则不执行
func (*From) To ¶
func (f *From) To(to string, configs ...types.Configuration) *To
To To端 参数是组件路径,格式{executorType}:{path} executorType:执行器组件类型,path:组件路径 如:chain:{chainId} 执行rulego中注册的规则链 component:{nodeType} 执行在config.ComponentsRegistry 中注册的组件 可在DefaultExecutorFactory中注册自定义执行器组件类型 componentConfigs 组件配置参数
func (*From) ToComponent ¶
ToComponent to组件 参数是types.Node类型组件
type Message ¶
type Message interface { //Body message body Body() []byte Headers() textproto.MIMEHeader From() string //GetParam http.Request#FormValue GetParam(key string) string //SetMsg set RuleMsg SetMsg(msg *types.RuleMsg) //GetMsg 把接收数据转换成 RuleMsg GetMsg() *types.RuleMsg //SetStatusCode 响应 code SetStatusCode(statusCode int) //SetBody 响应 body SetBody(body []byte) //SetError 设置错误 SetError(err error) //GetError 获取错误 GetError() error }
Message 接收端点数据抽象接口 不同输入源数据统一接口
type Router ¶
type Router struct { //规则链池,默认使用rulego.DefaultRuleGo RuleGo *rulego.RuleGo //Config ruleEngine Config Config types.Config // contains filtered or unexported fields }
Router 路由,抽象不同输入源数据路由 把消息从输入端(From),经过转换(Transform)成RuleMsg结构,或者处理Process,然后交给规则链处理(To) 或者 把消息从输入端(From),经过转换(Transform),然后处理响应(Process) 用法: http endpoint endpoint.NewRouter().From("/api/v1/msg/").Transform().To("chain:xx") endpoint.NewRouter().From("/api/v1/msg/").Transform().Process().To("chain:xx") endpoint.NewRouter().From("/api/v1/msg/").Transform().Process().To("component:nodeType") endpoint.NewRouter().From("/api/v1/msg/").Transform().Process() mqtt endpoint endpoint.NewRouter().From("#").Transform().Process().To("chain:xx") endpoint.NewRouter().From("topic").Transform().Process().To("chain:xx")
func (*Router) FromToString ¶
type RouterOption ¶
RouterOption 选项函数
func WithRuleGo ¶
func WithRuleGo(ruleGo *rulego.RuleGo) RouterOption
WithRuleGo 更改规则链池,默认使用rulego.DefaultRuleGo
type To ¶
type To struct { //toPath是否有占位符变量 HasVars bool //Config to组件配置 Config types.Configuration //Router router指针 Router *Router //流转目标路径,例如"chain:{chainId}",则是交给规则引擎处理数据 To string //去掉to执行器标记的路径 ToPath string // contains filtered or unexported fields }
To to端
func (*To) ToStringByDict ¶
ToStringByDict 转换路径中的变量,并返回最终字符串