grpcstream

package
v0.28.0 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2025 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const Type = types.EndpointTypePrefix + "grpc/stream"

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func (*Client) Close

func (c *Client) Close()

func (*Client) IsActive

func (c *Client) IsActive() bool

type Config

type Config struct {
	Server        string            // gRPC服务器地址
	Service       string            // gRPC服务名称
	Method        string            // gRPC方法名称
	Headers       map[string]string // 请求头
	Request       string            // 请求数据(如果为空字符串,则发送空数据,具体行为取决于服务端的处理逻辑)
	CheckInterval int               // gRPC服务器检查间隔(ms)
}

Config gRPC流配置

type Endpoint

type Endpoint = GrpcStream

type GrpcStream

type GrpcStream struct {
	impl.BaseEndpoint
	base.SharedNode[*Client]
	RuleConfig types.Config
	Config     Config

	Router endpointApi.Router
	// contains filtered or unexported fields
}

GrpcStream 提供了基于 gRPC 流式通信的端点实现。支持与 gRPC 服务端建立长连接,接收服务端推送的消息并通过路由转发处理。

特性:

- 自动重连:当连接断开时会自动尝试重新建立连接
- 单路由模式:每个端点实例只支持配置一个消息处理路由
- 共享连接:相同服务器地址(Server)的多个端点实例会复用同一个gRPC连接,避免重复创建连接
- 支持配置:可通过 Config 结构体配置服务地址、方法、请求参数、gRPC服务器检查间隔

示例:

"endpoints": [

{
  "id": "GRPC Stream",
  "type": "endpoint/grpc/stream",
  "name": "GRPC Stream",
  "debugMode": false,
  "configuration": {
	"checkInterval": 10000,
	"method": "SayHello",
	"server": "127.0.0.1:9000",
	"service": "helloworld.Greeter"
  },
  "processors": null,
  "routers": [
	{
	  "id": "",
	  "params": null,
	  "from": {
		"path": "*",                      //!!! 路由只能填写 *,表示所有来源
		"configuration": null,
		"processors": null
	  },
	  "to": {
		"path": "bkn3fIAr8x4w:MQTT",
		"configuration": null,
		"wait": false,
		"processors": null
	  }
	}
  ]
}
]

func (*GrpcStream) AddRouter

func (x *GrpcStream) AddRouter(router endpointApi.Router, params ...interface{}) (string, error)

AddRouter 添加路由

func (*GrpcStream) Destroy

func (x *GrpcStream) Destroy()

Destroy 销毁组件

func (*GrpcStream) Id

func (x *GrpcStream) Id() string

Id 返回组件ID

func (*GrpcStream) Init

func (x *GrpcStream) Init(ruleConfig types.Config, configuration types.Configuration) error

Init 初始化组件

func (*GrpcStream) New

func (x *GrpcStream) New() types.Node

New 创建新的实例

func (*GrpcStream) Printf

func (x *GrpcStream) Printf(format string, v ...interface{})

Printf 日志输出

func (*GrpcStream) RemoveRouter

func (x *GrpcStream) RemoveRouter(routerId string, params ...interface{}) error

RemoveRouter 移除路由

func (*GrpcStream) Start

func (x *GrpcStream) Start() error

Start 启动组件

func (*GrpcStream) Type

func (x *GrpcStream) Type() string

Type 返回组件类型

type RequestMessage

type RequestMessage struct {
	// contains filtered or unexported fields
}

RequestMessage 请求消息结构

func (*RequestMessage) Body

func (r *RequestMessage) Body() []byte

func (*RequestMessage) From

func (r *RequestMessage) From() string

func (*RequestMessage) GetError

func (r *RequestMessage) GetError() error

func (*RequestMessage) GetMsg

func (r *RequestMessage) GetMsg() *types.RuleMsg

func (*RequestMessage) GetParam

func (r *RequestMessage) GetParam(key string) string

func (*RequestMessage) Headers

func (r *RequestMessage) Headers() textproto.MIMEHeader

func (*RequestMessage) SetBody

func (r *RequestMessage) SetBody(body []byte)

func (*RequestMessage) SetError

func (r *RequestMessage) SetError(err error)

func (*RequestMessage) SetMsg

func (r *RequestMessage) SetMsg(msg *types.RuleMsg)

func (*RequestMessage) SetStatusCode

func (r *RequestMessage) SetStatusCode(statusCode int)

type ResponseMessage

type ResponseMessage struct {
	// contains filtered or unexported fields
}

ResponseMessage 响应消息结构

func (*ResponseMessage) Body

func (r *ResponseMessage) Body() []byte

func (*ResponseMessage) From

func (r *ResponseMessage) From() string

func (*ResponseMessage) GetError

func (r *ResponseMessage) GetError() error

func (*ResponseMessage) GetMsg

func (r *ResponseMessage) GetMsg() *types.RuleMsg

func (*ResponseMessage) GetParam

func (r *ResponseMessage) GetParam(key string) string

func (*ResponseMessage) Headers

func (r *ResponseMessage) Headers() textproto.MIMEHeader

func (*ResponseMessage) SetBody

func (r *ResponseMessage) SetBody(body []byte)

func (*ResponseMessage) SetError

func (r *ResponseMessage) SetError(err error)

func (*ResponseMessage) SetMsg

func (r *ResponseMessage) SetMsg(msg *types.RuleMsg)

func (*ResponseMessage) SetStatusCode

func (r *ResponseMessage) SetStatusCode(statusCode int)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL