Documentation
¶
Index ¶
- Constants
- type Client
- type Config
- type Endpoint
- type GrpcStream
- func (x *GrpcStream) AddRouter(router endpointApi.Router, params ...interface{}) (string, error)
- func (x *GrpcStream) Destroy()
- func (x *GrpcStream) Id() string
- func (x *GrpcStream) Init(ruleConfig types.Config, configuration types.Configuration) error
- func (x *GrpcStream) New() types.Node
- func (x *GrpcStream) Printf(format string, v ...interface{})
- func (x *GrpcStream) RemoveRouter(routerId string, params ...interface{}) error
- func (x *GrpcStream) Start() error
- func (x *GrpcStream) Type() string
- type RequestMessage
- func (r *RequestMessage) Body() []byte
- func (r *RequestMessage) From() string
- func (r *RequestMessage) GetError() error
- func (r *RequestMessage) GetMsg() *types.RuleMsg
- func (r *RequestMessage) GetParam(key string) string
- func (r *RequestMessage) Headers() textproto.MIMEHeader
- func (r *RequestMessage) SetBody(body []byte)
- func (r *RequestMessage) SetError(err error)
- func (r *RequestMessage) SetMsg(msg *types.RuleMsg)
- func (r *RequestMessage) SetStatusCode(statusCode int)
- type ResponseMessage
- func (r *ResponseMessage) Body() []byte
- func (r *ResponseMessage) From() string
- func (r *ResponseMessage) GetError() error
- func (r *ResponseMessage) GetMsg() *types.RuleMsg
- func (r *ResponseMessage) GetParam(key string) string
- func (r *ResponseMessage) Headers() textproto.MIMEHeader
- func (r *ResponseMessage) SetBody(body []byte)
- func (r *ResponseMessage) SetError(err error)
- func (r *ResponseMessage) SetMsg(msg *types.RuleMsg)
- func (r *ResponseMessage) SetStatusCode(statusCode int)
Constants ¶
View Source
const Type = types.EndpointTypePrefix + "grpc/stream"
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
	Server        string            // gRPC服务器地址
	Service       string            // gRPC服务名称
	Method        string            // gRPC方法名称
	Headers       map[string]string // 请求头
	Request       string            // 请求数据(如果为nil,发送空数据,看服务端的处理逻辑)
	CheckInterval int               // gRPC服务器检查间隔(ms)
}
Config gRPC流配置
type Endpoint ¶
type Endpoint = GrpcStream
type GrpcStream ¶
type GrpcStream struct {
	impl.BaseEndpoint
	base.SharedNode[*Client]
	RuleConfig types.Config
	Config     Config
	Router     endpointApi.Router
	// contains filtered or unexported fields
}
GrpcStream 提供了基于 gRPC 流式通信的端点实现。 支持与 gRPC 服务端建立长连接,接收服务端推送的消息并通过路由转发处理
特性:
- 自动重连:当连接断开时会自动尝试重新建立连接
- 单路由模式:每个端点实例只支持配置一个消息处理路由
- 共享连接:相同服务器地址(Server)的多个端点实例会复用同一个gRPC连接,避免重复创建连接
- 支持配置:可通过 Config 结构体配置服务地址、方法、请求参数、gRPC服务器检查间隔
示例:
"endpoints": [
  {
    "id": "GRPC Stream",
    "type": "endpoint/grpc/stream",
    "name": "GRPC Stream",
    "debugMode": false,
    "configuration": {
      "checkInterval": 10000,
      "method": "SayHello",
      "server": "127.0.0.1:9000",
      "service": "helloworld.Greeter"
    },
    "processors": null,
    "routers": [
      {
        "id": "",
        "params": null,
        "from": {
          "path": "*", //!!! 路由只能填写 *,表示所有来源
          "configuration": null,
          "processors": null
        },
        "to": {
          "path": "bkn3fIAr8x4w:MQTT",
          "configuration": null,
          "wait": false,
          "processors": null
        }
      }
    ]
  }
]
func (*GrpcStream) AddRouter ¶
func (x *GrpcStream) AddRouter(router endpointApi.Router, params ...interface{}) (string, error)
AddRouter 添加路由
func (*GrpcStream) Init ¶
func (x *GrpcStream) Init(ruleConfig types.Config, configuration types.Configuration) error
Init 初始化组件
func (*GrpcStream) Printf ¶
func (x *GrpcStream) Printf(format string, v ...interface{})
Printf 日志输出
func (*GrpcStream) RemoveRouter ¶
func (x *GrpcStream) RemoveRouter(routerId string, params ...interface{}) error
RemoveRouter 移除路由
type RequestMessage ¶
type RequestMessage struct {
// contains filtered or unexported fields
}
RequestMessage 请求消息结构
func (*RequestMessage) Body ¶
func (r *RequestMessage) Body() []byte
func (*RequestMessage) From ¶
func (r *RequestMessage) From() string
func (*RequestMessage) GetError ¶
func (r *RequestMessage) GetError() error
func (*RequestMessage) GetMsg ¶
func (r *RequestMessage) GetMsg() *types.RuleMsg
func (*RequestMessage) GetParam ¶
func (r *RequestMessage) GetParam(key string) string
func (*RequestMessage) Headers ¶
func (r *RequestMessage) Headers() textproto.MIMEHeader
func (*RequestMessage) SetBody ¶
func (r *RequestMessage) SetBody(body []byte)
func (*RequestMessage) SetError ¶
func (r *RequestMessage) SetError(err error)
func (*RequestMessage) SetMsg ¶
func (r *RequestMessage) SetMsg(msg *types.RuleMsg)
func (*RequestMessage) SetStatusCode ¶
func (r *RequestMessage) SetStatusCode(statusCode int)
type ResponseMessage ¶
type ResponseMessage struct {
// contains filtered or unexported fields
}
ResponseMessage 响应消息结构
func (*ResponseMessage) Body ¶
func (r *ResponseMessage) Body() []byte
func (*ResponseMessage) From ¶
func (r *ResponseMessage) From() string
func (*ResponseMessage) GetError ¶
func (r *ResponseMessage) GetError() error
func (*ResponseMessage) GetMsg ¶
func (r *ResponseMessage) GetMsg() *types.RuleMsg
func (*ResponseMessage) GetParam ¶
func (r *ResponseMessage) GetParam(key string) string
func (*ResponseMessage) Headers ¶
func (r *ResponseMessage) Headers() textproto.MIMEHeader
func (*ResponseMessage) SetBody ¶
func (r *ResponseMessage) SetBody(body []byte)
func (*ResponseMessage) SetError ¶
func (r *ResponseMessage) SetError(err error)
func (*ResponseMessage) SetMsg ¶
func (r *ResponseMessage) SetMsg(msg *types.RuleMsg)
func (*ResponseMessage) SetStatusCode ¶
func (r *ResponseMessage) SetStatusCode(statusCode int)
Click to show internal directories.
Click to hide internal directories.