Documentation ¶
Index ¶
- Constants
- Variables
- func Close()
- func InjectTrace2ctx(ctx context.Context) context.Context
- func NewMessageID() (id string)
- func RegisterServer(name string, server interface{}, conventions interface{}) error
- func Run(opts ...Option) error
- func ScanLines(data []byte, atEOF bool) (advance int, token []byte, err error)
- func SetWorkPoolSize(size int) (err error)
- type Broker
- type Context
- type DiscoverConfig
- type Endpoint
- type FuncMode
- type Header
- type Logger
- type Node
- type NodeState
- type Option
- type Pack
- type RPCInstance
- type RegisterConfig
- type RegisterDiscover
- type Server
- type ServiceDiscover
- type ServiceRegister
- type Socket
- type SvcMultiplexer
- func (m *SvcMultiplexer) AddOrUpdate(nodeid string, metadata []byte) error
- func (m *SvcMultiplexer) AddPeerNode(n *Node)
- func (m *SvcMultiplexer) Close()
- func (m *SvcMultiplexer) Delete(nodeid string)
- func (m *SvcMultiplexer) Reply(p *Pack) error
- func (m *SvcMultiplexer) Run()
- func (m *SvcMultiplexer) SelectPeerNode() (n Node, err error)
- func (m *SvcMultiplexer) SendError(pack *Pack, e error)
- type WatchCallback
Constants ¶
View Source
const (
	// stage
	REQUEST    = string(rune(iota + 1)) // 请求包
	REPLY                               // 响应包,与 req 对应的回复
	STREAM                              // stream
	STREAM_END                          // stream 结束
	ERROR                               // 异常包

	// method name
	HEARTBEAT  = string(rune(iota + 1000)) // 心跳包
	SYNCSTATE                              // 同步节点状态
	DISCONNECT                             // 通知让客户端断开连接
)
View Source
const (
	Singleton mode = iota // 单节点模式
	Cloud                 // 云模式/集群模式
)
View Source
const (
	// 关于链路追踪的数据
	TracePayloadKey zrpcContextKey = "__trace_ctx__"
	// 其他数据
	PayloadKey zrpcContextKey = "__ctx__"
	// ctx 截止时间
	DeadlineKey zrpcContextKey = "__deadline__"
)
View Source
const (
	PACKPATH    = "__pack_path__"   // pack 在集群中传播路径
	TTL         = "__ttl__"         // pack 在集群中传播跳数
	BLOCKSIZE   = "__block_size__"  // stream 请求中块大小
	METHOD_NAME = "__method_name__" // 方法名称
	MESSAGEID   = "__msg_id__"      // 消息id
)
View Source
const (
	Frontend socMode = iota + 1 // 前端
	Backend                     // 后端
)
Variables ¶
View Source
var (
	// errors
	ErrInvalidServer     = errors.New("zrpc: register server err: invalid server")
	ErrNotImplements     = errors.New("zrpc: the type not implements the given interface")
	ErrTooFewReturn      = errors.New("zrpc: too few return values")
	ErrInvalidResultType = errors.New("zrpc: the last return value must be error")
	ErrInvalidParamType  = errors.New("zrpc: the first param must be Context")
	ErrTooFewParam       = errors.New("zrpc: too few parameters")
	ErrSubmitTimeout     = errors.New("zrpc: submit task timed out")
)
View Source
var (
ErrNoMessageID = errors.New("zrpc: pack: no messageid")
)
View Source
var (
	MethodNameTable = map[string]string{
		REQUEST:    "REQUEST",
		REPLY:      "REPLY",
		HEARTBEAT:  "HEARTBEAT",
		DISCONNECT: "DISCONNECT",
		ERROR:      "ERROR",
	}
)
Functions ¶
func InjectTrace2ctx ¶
InjectTrace2ctx 提取链路追踪上下文信息,并注入到新的 context 中
func RegisterServer ¶
RegisterServer 注册服务
Types ¶
type Broker ¶
type Broker interface {
	// AddPeerNode 添加平行节点
	AddPeerNode(node *Node)
	// DelPeerNode 删除平行节点
	DelPeerNode(node *Node)
	// AllPeerNode 获取所有平行节点 endpoint
	AllPeerNode() []Node
	// ForwardToPeerNode 本节点处理不了了,转发给其他节点
	ForwardToPeerNode(to string, pack *Pack)
	// NewTask 获得新任务
	NewTask() <-chan *Pack
	// Reply 回复结果
	Reply(p *Pack) error
	// SetBrokerMode 设置 broker 运行模式
	SetBrokerMode(m mode)
	// PublishNodeState 发布本节点状态
	PublishNodeState() error
	// Run
	Run()
	// Close 关闭
	Close(clis []string)
}
Broker 代理
type Context ¶
Context 上下文信息
思路:几个固定字段作为上下文信息,比如超时时间、环境变量、链路追踪等
func NewContext ¶
func NewContext() *Context
func (*Context) MarshalMsgpack ¶
func (*Context) UnmarshalMsgpack ¶
type DiscoverConfig ¶
type DiscoverConfig struct {
	Registries          []string // 注册中心 endpoint
	ServicePrefix       string   // 服务前缀
	ServiceName         string
	HeartBeatPeriod     time.Duration
	HealthCheckEndPoint string // 注册中心进行健康检测回调的地址(Consul可能会用到)
	Logger              Logger
}
DiscoverConfig 服务发现所需配置
type Logger ¶
type Logger interface {
	Debug(args ...interface{})
	Debugf(format string, args ...interface{})
	Info(args ...interface{})
	Infof(format string, args ...interface{})
	Warn(args ...interface{})
	Warnf(format string, args ...interface{})
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
	Fatal(args ...interface{})
}
type Node ¶
type Node struct {
	ServiceName     string   `json:"service_name" msgpack:"service_name"`
	NodeID          string   `json:"nodeid" msgpack:"nodeid"`
	LocalEndpoint   Endpoint `json:"local_endpoint" msgpack:"local_endpoint"`     // 本地 endpoint
	ClusterEndpoint Endpoint `json:"cluster_endpoint" msgpack:"cluster_endpoint"` // 集群 endpoint
	StateEndpoint   Endpoint `json:"state_endpoint" msgpack:"state_endpoint"`     // 状态 endpoint
	IsIdle          bool     `json:"is_idle" msgpack:"is_idle"`
}
Node 节点信息
var (
DefaultNode Node
)
type NodeState ¶
type NodeState struct {
	*Node
	// contains filtered or unexported fields
}
func (*NodeState) MarshalJSON ¶
func (*NodeState) MarshalMsgpack ¶
func (*NodeState) UnmarshalJSON ¶
func (*NodeState) UnmarshalMsgpack ¶
type Option ¶
type Option func(opt *options)
func WithHeartbeatInterval ¶
WithHeartbeatInterval 设置节点间心跳间隔
func WithMaxTimeoutPeriod ¶
WithMaxTimeoutPeriod 函数执行最大时间期限
func WithRegisterDiscover ¶
func WithRegisterDiscover(rd RegisterDiscover) Option
type Pack ¶
type Pack struct {
	Identity string   `msgpack:"identity"`
	Stage    string   `msgpack:"method"`
	Header   Header   `msgpack:"head"`
	Args     [][]byte `msgpack:"args"`
}
func (*Pack) MarshalMsgpack ¶
func (*Pack) MethodName ¶
func (*Pack) SetMethodName ¶
type RPCInstance ¶
type RPCInstance struct {
// contains filtered or unexported fields
}
RPCInstance 保存管理 rpc 实例
func NewRPCInstance ¶
func NewRPCInstance() *RPCInstance
func (*RPCInstance) GenerateExecFunc ¶
func (rpc *RPCInstance) GenerateExecFunc(name string, r iReply) (methodFunc, error)
GenerateExecFunc 查找并返回可执行函数
name: /{servername}/methodname
func (*RPCInstance) RegisterServer ¶
func (rpc *RPCInstance) RegisterServer(name string, server interface{}, conventions interface{}) error
RegisterServer 注册 server
type RegisterConfig ¶
type RegisterConfig struct {
	Registries          []string      // 注册中心 endpoint
	ServicePrefix       string        // 服务前缀
	HeartBeatPeriod     time.Duration // 心跳间隔
	ServerInfo          Node
	HealthCheckEndPoint string // 注册中心进行健康检测回调的地址(Consul可能会用到)
	Logger              Logger
}
RegisterConfig 服务注册所需配置
type RegisterDiscover ¶
type RegisterDiscover interface {
	ServiceRegister
	ServiceDiscover
}
RegisterDiscover 服务注册与发现
func NewRegisterDiscover ¶
func NewRegisterDiscover(r ServiceRegister, d ServiceDiscover) RegisterDiscover
type ServiceDiscover ¶
type ServiceDiscover interface {
	// Watch 监控节点变化
	Watch(callback WatchCallback)
	// Stop 停止监控
	Stop()
}
ServiceDiscover 服务发现
func NewConsulDiscover ¶
func NewConsulDiscover(cnf *DiscoverConfig) (ServiceDiscover, error)
NewConsulDiscover consul 服务发现
func NewEtcdDiscover ¶
func NewEtcdDiscover(cnf *DiscoverConfig) (ServiceDiscover, error)
func NewZookeeperDiscover ¶
func NewZookeeperDiscover(cnf *DiscoverConfig) (ServiceDiscover, error)
type ServiceRegister ¶
type ServiceRegister interface {
	// Register 注册节点
	Register()
	// Deregister 注销节点
	Deregister()
}
ServiceRegister 服务注册
func NewConsulRegister ¶
func NewConsulRegister(cnf *RegisterConfig) (ServiceRegister, error)
NewConsulRegister consul 服务注册
func NewEtcdRegistry ¶
func NewEtcdRegistry(cnf *RegisterConfig) (ServiceRegister, error)
func NewZookeeperRegister ¶
func NewZookeeperRegister(cnf *RegisterConfig) (ServiceRegister, error)
type Socket ¶
type Socket struct {
// contains filtered or unexported fields
}
func (*Socket) Disconnect ¶
Disconnect 断开到端点 endpoint 的连接(仅后端)
type SvcMultiplexer ¶
type SvcMultiplexer struct {
// contains filtered or unexported fields
}
func NewSvcMultiplexer ¶
func NewSvcMultiplexer(rpc *RPCInstance, opts ...Option) *SvcMultiplexer
func (*SvcMultiplexer) AddOrUpdate ¶
func (m *SvcMultiplexer) AddOrUpdate(nodeid string, metadata []byte) error
func (*SvcMultiplexer) AddPeerNode ¶
func (m *SvcMultiplexer) AddPeerNode(n *Node)
AddPeerNode 用于测试,后面删掉
func (*SvcMultiplexer) Close ¶
func (m *SvcMultiplexer) Close()
func (*SvcMultiplexer) Delete ¶
func (m *SvcMultiplexer) Delete(nodeid string)
func (*SvcMultiplexer) Reply ¶
func (m *SvcMultiplexer) Reply(p *Pack) error
func (*SvcMultiplexer) Run ¶
func (m *SvcMultiplexer) Run()
func (*SvcMultiplexer) SelectPeerNode ¶
func (m *SvcMultiplexer) SelectPeerNode() (n Node, err error)
func (*SvcMultiplexer) SendError ¶
func (m *SvcMultiplexer) SendError(pack *Pack, e error)
Source Files ¶
Click to show internal directories.
Click to hide internal directories.