zrpc

package module
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 24, 2022 License: Apache-2.0 Imports: 35 Imported by: 0

README

zrpc

简介

zrpc 是一款简单易用的 RPC 框架。

其支持以下4种请求类型的 RPC 方法:

  1. 请求-响应
  2. 流式请求
  3. 流式响应
  4. 双向流式

zrpc 依赖 ZeroMQ

安装 zeromq, 在 release 下载并编译安装:

tar -zxvf zeromq-4.x.x.tar.gz
cd zeromq-4.x.x
./configure
make && make install 
# 编译后生成的库文件位于 /usr/local/lib 目录下,可将其移动到 /usr/lib64 目录,
# 或将该路径添加到 /etc/ld.so.conf,然后执行 ldconfig 刷新动态链接库缓存。

使用

注意:下面的简单示例为单服务节点, 多服务节点需要使用 etcd、zk、consul 等服务发现组件。

接口定义:

// ISayHello is the service contract for this example: a single method that
// takes a context and a name and returns a string or an error.
type ISayHello interface {
    // SayHello receives the caller's context and a name and returns a
    // string result, or an error.
    SayHello(ctx context.Context, name string) (string, error)
}

// SayHelloProxy is the client-side counterpart of ISayHello: it holds a
// function field with the same signature as the interface method.
// NOTE(review): the client example passes a *SayHelloProxy to cli.Decorator
// before calling SayHello, so the field is presumably populated there — confirm.
type SayHelloProxy struct {
    SayHello func(ctx context.Context, name string) (string, error)
}

服务端:

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"

    "github.com/hunyxv/zrpc"
)

var _ ISayHello = (*SayHello)(nil)

// SayHello is the stateless server-side implementation used in this example.
type SayHello struct{}

// SayHello prints the received name to standard output and returns the
// greeting "Hello <name>!" together with a nil error. ctx is not used.
func (s *SayHello) SayHello(ctx context.Context, name string) (string, error) {
    fmt.Println(name)
    greeting := "Hello " + name + "!"
    return greeting, nil
}

// main registers the SayHello service under the name "sayhello", starts the
// zrpc server in the background, and closes it on SIGINT or SIGTERM.
func main() {
    // A nil pointer to the interface type is passed as the "conventions"
    // argument of RegisterServer.
    var i *ISayHello
    // Register the service.
    if err := zrpc.RegisterServer("sayhello", &SayHello{}, i); err != nil {
        panic(err)
    }

    // Register further services the same way:
    // if err := zrpc.RegisterServer("sayhello2", &SayHello{}, i); err != nil {
    //  panic(err)
    // }

    // Start serving. Run returns an error, so report it instead of silently
    // discarding it as a bare `go zrpc.Run()` would.
    go func() {
        if err := zrpc.Run(); err != nil {
            log.Println("zrpc run: ", err)
        }
    }()
    log.Println("server id: ", zrpc.DefaultNode.NodeID)

    // Block until an interrupt or termination signal arrives, then close.
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
    <-ch
    zrpc.Close()
}

/*
输出:
2022/03/19 13:55:36 register:  sayhello/SayHello 0
2022/03/19 13:55:36 server id:  32a168e5-a749-11ec-9bf9-00163e343ac0
*/

客户端调用 RPC 服务:

package main

import (
    "context"
    "log"
    "time"

    "github.com/hunyxv/zrpc"
    zrpcCli "github.com/hunyxv/zrpc/client"
)

// main connects directly to a single server node, decorates a SayHelloProxy
// with the remote "sayhello" service, and performs one call with a 3 second
// timeout.
func main() {
    serverinfo := zrpc.DefaultNode
    cli, err := zrpcCli.NewDirectClient(zrpcCli.ServerInfo{
        ServerName:    serverinfo.ServiceName,
        NodeID:        "32a168e5-a749-11ec-9bf9-00163e343ac0", // must be the server id printed by the server
        LocalEndpoint: serverinfo.LocalEndpoint,
        StateEndpoint: serverinfo.StateEndpoint,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    // NOTE(review): presumably gives the client time to connect — confirm.
    time.Sleep(100 * time.Millisecond)

    sayHello := new(SayHelloProxy)
    // Decorate the proxy; the last argument is the retry count (3).
    // The returned error must be checked before the proxy is used.
    if err = cli.Decorator("sayhello", sayHello, 3); err != nil {
        log.Fatal(err)
    }

    // Invoke the RPC method.
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    resp, err := sayHello.SayHello(ctx, "Hunyxv")
    if err != nil {
        log.Fatal(err)
    }
    log.Println(resp)
}

关于“流式请求”、“流式响应”和链路追踪,请看 example

待续...

友情链接

Documentation

Index

Constants

View Source
const (
	// Stage values. Each constant is a one-rune string derived from iota; the
	// expression on REQUEST is implicitly repeated for the names below it.
	REQUEST    = string(rune(iota + 1)) // request packet
	REPLY                               // reply packet, the response matching a request
	STREAM                              // stream packet
	STREAM_END                          // end of stream
	ERROR                               // error packet

	// Method names. iota keeps counting from the stage constants above, so
	// HEARTBEAT is string(rune(5 + 1000)), not string(rune(1000)).
	HEARTBEAT  = string(rune(iota + 1000)) // heartbeat packet
	SYNCSTATE                              // synchronize node state
	DISCONNECT                             // tell the client to disconnect
)
View Source
// Values of the mode type, which Broker.SetBrokerMode accepts.
const (
	Singleton mode = iota // single-node mode
	Cloud                 // cloud / cluster mode
)
View Source
// Keys of type zrpcContextKey.
const (
	// TracePayloadKey is the key for tracing data.
	TracePayloadKey zrpcContextKey = "__trace_ctx__"
	// PayloadKey is the key for all other data.
	PayloadKey zrpcContextKey = "__ctx__"

	// DeadlineKey is the key for the context deadline.
	DeadlineKey zrpcContextKey = "__deadline__"
)
View Source
// Reserved string keys; the trailing comments describe the value each one names.
const (
	PACKPATH    = "__pack_path__"   // path a pack has travelled through the cluster
	TTL         = "__ttl__"         // number of hops a pack may travel in the cluster
	BLOCKSIZE   = "__block_size__"  // block size used in stream requests
	METHOD_NAME = "__method_name__" // method name
	MESSAGEID   = "__msg_id__"      // message id
)
View Source
// Values of the socMode type, which NewSocket accepts. They start at 1, so
// the zero value of socMode is neither Frontend nor Backend.
const (
	Frontend socMode = iota + 1 // frontend
	Backend                     // backend
)

Variables

View Source
var (
	// Sentinel errors exported by zrpc. Every message carries the "zrpc: "
	// prefix; the message text states the condition each one reports.
	ErrInvalidServer     = errors.New("zrpc: register server err: invalid server")
	ErrNotImplements     = errors.New("zrpc: the type not implements the given interface")
	ErrTooFewReturn      = errors.New("zrpc: too few return values")
	ErrInvalidResultType = errors.New("zrpc: the last return value must be error")
	ErrInvalidParamType  = errors.New("zrpc: the first param must be Context")
	ErrTooFewParam       = errors.New("zrpc: too few parameters")
	ErrSubmitTimeout     = errors.New("zrpc: submit task timed out")
)
View Source
var (
	ErrNoMessageID = errors.New("zrpc: pack: no messageid")
)
View Source
var (
	// MethodNameTable maps stage and method-name constants to a readable
	// label equal to the constant's identifier.
	// NOTE(review): STREAM, STREAM_END and SYNCSTATE have no entry here —
	// confirm whether that is intentional.
	MethodNameTable = map[string]string{
		REQUEST:    "REQUEST",
		REPLY:      "REPLY",
		HEARTBEAT:  "HEARTBEAT",
		DISCONNECT: "DISCONNECT",
		ERROR:      "ERROR",
	}
)

Functions

func Close

func Close()

Close 关闭 zrpc 服务

func InjectTrace2ctx

func InjectTrace2ctx(ctx context.Context) context.Context

InjectTrace2ctx 提取链路追踪上下文信息,并注入到新的 context 中

func NewMessageID

func NewMessageID() (id string)

NewMessageID 生成消息ID,前5字节是时间戳(ms),后11字节是随机数

func RegisterServer

func RegisterServer(name string, server interface{}, conventions interface{}) error

RegisterServer 注册服务

func Run

func Run(opts ...Option) error

Run 启动 zrpc 服务

func ScanLines

func ScanLines(data []byte, atEOF bool) (advance int, token []byte, err error)

func SetWorkPoolSize

func SetWorkPoolSize(size int) (err error)

SetWorkPoolSize 设置工作池大小(默认无限大)

Types

type Broker

// Broker is the proxy interface: it manages peer nodes, hands out incoming
// packs as tasks, sends replies, and publishes this node's state.
type Broker interface {
	// AddPeerNode adds a peer node.
	AddPeerNode(node *Node)
	// DelPeerNode removes a peer node.
	DelPeerNode(node *Node)
	// AllPeerNode returns all peer nodes.
	AllPeerNode() []Node
	// ForwardToPeerNode forwards a pack to another node when this node
	// cannot handle it.
	ForwardToPeerNode(to string, pack *Pack)
	// NewTask returns the channel on which new tasks are received.
	NewTask() <-chan *Pack
	// Reply sends back a result.
	Reply(p *Pack) error
	// SetBrokerMode sets the mode the broker runs in.
	SetBrokerMode(m mode)
	// PublishNodeState publishes the state of this node.
	PublishNodeState() error
	// Run starts the broker.
	Run()
	// Close shuts the broker down.
	// NOTE(review): clis is presumably the list of client identities to
	// notify — confirm.
	Close(clis []string)
}

Broker 代理

func NewBroker

func NewBroker(state *NodeState, hbInterval time.Duration, logger Logger) (Broker, error)

type Context

type Context struct {
	context.Context
	// contains filtered or unexported fields
}

Context 上下文信息

思路:几个固定字段作为上下文信息,比如超时时间、环境变量、链路追踪等

func NewContext

func NewContext() *Context

func (*Context) Cancel

func (ctx *Context) Cancel()

func (*Context) MarshalMsgpack

func (ctx *Context) MarshalMsgpack() ([]byte, error)

func (*Context) UnmarshalMsgpack

func (ctx *Context) UnmarshalMsgpack(b []byte) error

type DiscoverConfig

// DiscoverConfig holds the configuration needed for service discovery.
type DiscoverConfig struct {
	Registries          []string // registry endpoints
	ServicePrefix       string   // service prefix
	ServiceName         string
	HeartBeatPeriod     time.Duration
	HealthCheckEndPoint string // address the registry calls back for health checks (Consul may use it)
	Logger              Logger
}

DiscoverConfig 服务发现所需配置

type Endpoint

// Endpoint describes an address as a scheme, a host and a port.
type Endpoint struct {
	Scheme string
	Host   string
	Port   int
}

func (Endpoint) String

func (e Endpoint) String() string

type FuncMode

// FuncMode enumerates four function modes.
// NOTE(review): these presumably correspond to the four RPC kinds listed in
// the README (request-response, streaming request, streaming response,
// bidirectional stream) — confirm the exact mapping.
type FuncMode int
const (
	ReqRep FuncMode = iota
	StreamReqRep
	ReqStreamRep
	Stream
)
type Header map[string][]string

func (Header) Add

func (h Header) Add(key, value string)

func (Header) Get

func (h Header) Get(key string) string

func (Header) Has

func (h Header) Has(key string) bool

func (Header) Pop

func (h Header) Pop(key string) string

func (Header) Set

func (h Header) Set(key, value string)

type Logger

// Logger is the logging interface accepted by WithLogger and NewBroker.
// Debug, Info, Warn and Error each have a variadic form and a format-string
// form; Fatal has only the variadic form.
type Logger interface {
	Debug(args ...interface{})
	Debugf(format string, args ...interface{})
	Info(args ...interface{})
	Infof(format string, args ...interface{})
	Warn(args ...interface{})
	Warnf(format string, args ...interface{})
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
	Fatal(args ...interface{})
}

type Node

// Node holds the information describing a node. Every field is serialized
// under the same key for both JSON and msgpack.
type Node struct {
	ServiceName     string   `json:"service_name" msgpack:"service_name"`
	NodeID          string   `json:"nodeid" msgpack:"nodeid"`
	LocalEndpoint   Endpoint `json:"local_endpoint" msgpack:"local_endpoint"`     // local endpoint
	ClusterEndpoint Endpoint `json:"cluster_endpoint" msgpack:"cluster_endpoint"` // cluster endpoint
	StateEndpoint   Endpoint `json:"state_endpoint" msgpack:"state_endpoint"`     // state endpoint
	IsIdle          bool     `json:"is_idle" msgpack:"is_idle"`
}

Node 节点信息

var (
	DefaultNode Node
)

type NodeState

type NodeState struct {
	*Node
	// contains filtered or unexported fields
}

func NewNodeState

func NewNodeState(node *Node, workPoolSize int) (*NodeState, error)

func (*NodeState) MarshalJSON

func (s *NodeState) MarshalJSON() ([]byte, error)

func (*NodeState) MarshalMsgpack

func (s *NodeState) MarshalMsgpack() ([]byte, error)

func (*NodeState) UnmarshalJSON

func (s *NodeState) UnmarshalJSON(b []byte) error

func (*NodeState) UnmarshalMsgpack

func (s *NodeState) UnmarshalMsgpack(b []byte) error

type Option

type Option func(opt *options)

func WithHeartbeatInterval

func WithHeartbeatInterval(t time.Duration) Option

WithHeartbeatInterval 设置节点间心跳间隔

func WithLogger

func WithLogger(logger Logger) Option

WithLogger 设置 logger

func WithMaxTimeoutPeriod

func WithMaxTimeoutPeriod(t time.Duration) Option

WithMaxTimeoutPeriod 函数执行最大时间期限

func WithNodeInfo

func WithNodeInfo(node Node) Option

WithNodeInfo 设置启动节点

func WithPackTTL

func WithPackTTL(ttl int) Option

WithPackTTL 设置数据包的最大跳数

func WithRegisterDiscover

func WithRegisterDiscover(rd RegisterDiscover) Option

type Pack

// Pack is the message unit passed to Broker.Reply and delivered by
// Broker.NewTask. Note that the msgpack keys do not all match the field
// names: Stage is serialized as "method" and Header as "head".
type Pack struct {
	Identity string   `msgpack:"identity"`
	Stage    string   `msgpack:"method"`
	Header   Header   `msgpack:"head"`
	Args     [][]byte `msgpack:"args"`
}

func (*Pack) Get

func (p *Pack) Get(key string) string

func (*Pack) MarshalMsgpack

func (p *Pack) MarshalMsgpack() (pack []byte, err error)

func (*Pack) MethodName

func (p *Pack) MethodName() string

func (*Pack) Set

func (p *Pack) Set(key, value string)

func (*Pack) SetMethodName

func (p *Pack) SetMethodName(method string)

type RPCInstance

type RPCInstance struct {
	// contains filtered or unexported fields
}

RPCInstance 保存管理 rpc 实例

func NewRPCInstance

func NewRPCInstance() *RPCInstance

func (*RPCInstance) GenerateExecFunc

func (rpc *RPCInstance) GenerateExecFunc(name string, r iReply) (methodFunc, error)

GenerateExecFunc 查找并返回可执行函数

name: /{servername}/methodname

func (*RPCInstance) RegisterServer

func (rpc *RPCInstance) RegisterServer(name string, server interface{}, conventions interface{}) error

RegisterServer 注册 server

type RegisterConfig

// RegisterConfig holds the configuration needed for service registration.
type RegisterConfig struct {
	Registries          []string      // registry endpoints
	ServicePrefix       string        // service prefix
	HeartBeatPeriod     time.Duration // heartbeat interval
	ServerInfo          Node
	HealthCheckEndPoint string // address the registry calls back for health checks (Consul may use it)
	Logger              Logger
}

RegisterConfig 服务注册所需配置

type RegisterDiscover

// RegisterDiscover combines service registration and service discovery by
// embedding ServiceRegister and ServiceDiscover.
type RegisterDiscover interface {
	ServiceRegister
	ServiceDiscover
}

RegisterDiscover 服务注册与发现

type Server

type Server struct {
	ServerName string
	// contains filtered or unexported fields
}

type ServiceDiscover

// ServiceDiscover is the service-discovery interface.
type ServiceDiscover interface {
	// Watch monitors node changes and reports them through callback.
	Watch(callback WatchCallback)
	// Stop stops monitoring.
	Stop()
}

ServiceDiscover 服务发现

func NewConsulDiscover

func NewConsulDiscover(cnf *DiscoverConfig) (ServiceDiscover, error)

NewConsulDiscover consul 服务发现

func NewEtcdDiscover

func NewEtcdDiscover(cnf *DiscoverConfig) (ServiceDiscover, error)

func NewZookeeperDiscover

func NewZookeeperDiscover(cnf *DiscoverConfig) (ServiceDiscover, error)

type ServiceRegister

// ServiceRegister is the service-registration interface.
type ServiceRegister interface {
	// Register registers the node.
	Register()
	// Deregister removes the node's registration.
	Deregister()
}

ServiceRegister 服务注册

func NewConsulRegister

func NewConsulRegister(cnf *RegisterConfig) (ServiceRegister, error)

NewConsulRegister consul 服务注册

func NewEtcdRegistry

func NewEtcdRegistry(cnf *RegisterConfig) (ServiceRegister, error)

func NewZookeeperRegister

func NewZookeeperRegister(cnf *RegisterConfig) (ServiceRegister, error)

type Socket

type Socket struct {
	// contains filtered or unexported fields
}

func NewSocket

func NewSocket(id string, t zmq.Type, mode socMode, endpoint string) (*Socket, error)

func (*Socket) Close

func (s *Socket) Close()

Close 关闭 socket

func (*Socket) Connect

func (s *Socket) Connect(endpoint string)

Connect 连接到端点 endpoint (仅后端, zmq.SUB除外)

func (*Socket) Disconnect

func (s *Socket) Disconnect(endpoint string)

Disconnect 断开到端点 endpoint 的连接(仅后端)

func (*Socket) Recv

func (s *Socket) Recv() <-chan [][]byte

func (*Socket) Send

func (s *Socket) Send() chan<- [][]byte

func (*Socket) Subscribe

func (s *Socket) Subscribe(topic string)

Subscribe 订阅消息

func (*Socket) Unsubscribe

func (s *Socket) Unsubscribe(topic string)

Unsubscribe 取消订阅

type SvcMultiplexer

type SvcMultiplexer struct {
	// contains filtered or unexported fields
}

func NewSvcMultiplexer

func NewSvcMultiplexer(rpc *RPCInstance, opts ...Option) *SvcMultiplexer

func (*SvcMultiplexer) AddOrUpdate

func (m *SvcMultiplexer) AddOrUpdate(nodeid string, metadata []byte) error

func (*SvcMultiplexer) AddPeerNode

func (m *SvcMultiplexer) AddPeerNode(n *Node)

AddPeerNode 用于测试,后面删掉

func (*SvcMultiplexer) Close

func (m *SvcMultiplexer) Close()

func (*SvcMultiplexer) Delete

func (m *SvcMultiplexer) Delete(nodeid string)

func (*SvcMultiplexer) Reply

func (m *SvcMultiplexer) Reply(p *Pack) error

func (*SvcMultiplexer) Run

func (m *SvcMultiplexer) Run()

func (*SvcMultiplexer) SelectPeerNode

func (m *SvcMultiplexer) SelectPeerNode() (n Node, err error)

func (*SvcMultiplexer) SendError

func (m *SvcMultiplexer) SendError(pack *Pack, e error)

type WatchCallback

// WatchCallback is the callback interface for node-change events raised by
// service discovery; it is the argument type of ServiceDiscover.Watch.
type WatchCallback interface {
	// AddOrUpdate receives a node id together with its metadata bytes.
	AddOrUpdate(nodeid string, metadata []byte) error
	// Delete receives the id of a node to delete.
	Delete(nodeid string)
}

WatchCallback 服务发现,节点变更事件回调接口

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL