grpcx

package
v1.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 28, 2022 License: Apache-2.0 Imports: 11 Imported by: 0

README

pbrpc

pbrpc是一种基于TCP协议的二进制高性能RPC通信协议实现。它以Protobuf作为基本的数据交换格式。

features:
  • 内置连接池,具备更高的性能,低延迟 QPS: 5w+
  • 支持自动重连功能
  • 支持附件发送
  • 支持超时功能
  • 压缩功能,支持GZip与Snappy
  • 集成内置HTTP管理功能
依赖三方库
  1. golang-protobuf 针对golang开发支持google protocol buffer库, 获取方式如下
    go get github.com/golang/protobuf
  2. Snappy压缩类库,获取方式如下

    go get github.com/golang/snappy
Demo示例
开发RPC服务端
  1. 首先需要实现Service接口

    // Service is the interface an RPC service implementation must satisfy
    // before it can be registered with the RPC server.
    type Service interface {
    	/*
    	   DoService is the RPC service callback method.
    	   message    : parameter sent by the RPC client, or nil if there is no parameter
    	   attachment : attachment content sent by the RPC client, or nil if there is no attachment
    	   logId      : int64 log sequence id sent by the client, or nil if there is no logId
    	   returns:
    	   [0] message to send back to the RPC client, or nil if no response message is needed
    	   [1] attachment to send back to the RPC client, or nil if no attachment is needed
    	   [2] any error, or nil on success
    	*/
    	DoService(message proto.Message, attachment []byte, logId *int64) (proto.Message, []byte, error)
    	// GetServiceName returns the service name of this implementation.
    	GetServiceName() string
    	// GetMethodName returns the method name of this implementation.
    	GetMethodName() string
    	// NewParameter returns the parameter type; the protobuf payload is
    	// deserialized into this object.
    	NewParameter() proto.Message
    }
    

  2. Service接口实现示例如下

// SimpleService is an example Service implementation identified by a
// service name and a method name.
type SimpleService struct {
	// serviceName is the value returned by GetServiceName.
	serviceName string
	// methodName is the value returned by GetMethodName.
	methodName  string
}

// NewSimpleService builds a SimpleService that carries the given service
// and method names and returns a pointer to it.
func NewSimpleService(serviceName, methodName string) *SimpleService {
	return &SimpleService{
		serviceName: serviceName,
		methodName:  methodName,
	}
}

// DoService implements the Service callback for the echo example.
//
// When msg is non-nil it greets the name carried by the *DataMessage, or
// "everyone" when no usable name is present. When msg is nil the greeting is
// just "hello ". The attachment and logId arguments are not used.
//
// It returns a DataMessage holding the greeting, a fixed three-byte
// attachment, and a nil error.
func (ss *SimpleService) DoService(msg proto.Message, attachment []byte, logId *int64) (proto.Message, []byte, error) {
	greeting := "hello "

	if msg != nil {
		// A mistyped message, a nil *DataMessage or an unset Name must not
		// panic the handler; each of them is treated as an empty name.
		var name string
		if m, ok := msg.(*DataMessage); ok && m != nil {
			name = m.GetName()
		}

		if name == "" {
			greeting += "everyone"
		} else {
			greeting += name
		}
	}

	return &DataMessage{Name: proto.String(greeting)}, []byte{1, 5, 9}, nil
}

// GetServiceName returns the service name given to NewSimpleService.
func (ss *SimpleService) GetServiceName() string {
	return ss.serviceName
}

// GetMethodName returns the method name given to NewSimpleService.
func (ss *SimpleService) GetMethodName() string {
	return ss.methodName
}

// NewParameter returns a pointer to a fresh, empty DataMessage, the
// parameter type used by this service.
func (ss *SimpleService) NewParameter() proto.Message {
	return &DataMessage{}
}
  3. 创建RPC服务并注册该实现接口

    // main configures a server on port 8122, registers the echo example
    // service and runs the server until it stops or fails.
    func main() {
    	meta := &pbrpc.ServerMeta{}
    	meta.Host = nil
    	meta.Port = 8122

    	server := pbrpc.NewTpcServer(meta)
    	server.Register(NewSimpleService("echoService", "echo"))

    	// start server and block here; a returned error is logged and the
    	// process exits with a failure status
    	if err := server.StartAndBlock(); err != nil {
    		pbrpc.Error(err)
    		os.Exit(-1)
    	}
    }
    

    至此RPC已经开发完成,运行上面代码,就可以发布完成.

  4. DataMessage对象定义如下

// DataMessage is the protobuf message exchanged by the example service and
// client; it carries a single name field.
type DataMessage struct {
	// Name is protobuf field 1 ("name"), marked required by the struct tag.
	// It is a pointer, so it may be nil when unset.
	Name *string `protobuf:"bytes,1,req,name=name" json:"name,omitempty"`
}

// Reset overwrites m with an empty DataMessage.
func (m *DataMessage) Reset()         { *m = DataMessage{} }
// String returns the text form produced by proto.CompactTextString.
func (m *DataMessage) String() string { return proto.CompactTextString(m) }
// ProtoMessage is an empty method; presumably it marks the type as a
// protobuf message for the proto library (confirm against the library).
func (*DataMessage) ProtoMessage()    {}

// GetName returns the value pointed to by Name, or "" when Name is unset.
// It is safe to call on a nil *DataMessage, which also yields "".
func (m *DataMessage) GetName() string {
	if m == nil || m.Name == nil {
		return ""
	}
	return *m.Name
}
开发RPC客户端
    // Create the connection (this example uses the connection pool).
    url := pbrpc.URL{}
    url.SetHost(host).SetPort(port)
    timeout := time.Second * 5

    connection, err := pbrpc.NewDefaultTCPConnectionPool(url, &timeout)
    if err != nil {
        fmt.Println(err)
        os.Exit(-1)
    }
    defer connection.Close()

    // Create the client.
    rpcClient, err := pbrpc.NewRpcCient(connection)
    if err != nil {
        fmt.Println(err)
        // os.Exit does not run deferred calls, so close the pool explicitly.
        connection.Close()
        os.Exit(-1)
    }

    // Prepare the RPC invocation.
    serviceName := "echoService"
    methodName := "echo"
    rpcInvocation := pbrpc.NewRpcInvocation(&serviceName, &methodName)

    // Select the compression algorithm.
    rpcInvocation.CompressType = proto.Int32(pbrpc.COMPRESS_GZIP)

    message := "say hello 中文测试"
    dm := DataMessage{&message}

    rpcInvocation.SetParameterIn(&dm)
    rpcInvocation.LogId = proto.Int64(1)

    // parameterOut is passed to SendRpcRequest to receive the response message.
    parameterOut := DataMessage{}

    response, err := rpcClient.SendRpcRequest(rpcInvocation, &parameterOut)
    if err != nil {
        fmt.Println(err)
        // os.Exit does not run deferred calls, so close the pool explicitly.
        connection.Close()
        os.Exit(-1)
    }

    if response == nil {
        fmt.Println("Response is nil")
        return
    }

更多使用示例参见demo

Documentation

Index

Constants

View Source
const (
	// Compression types. The values match the compress_type field documented
	// for RpcMeta: 0 is no compression, 1 is Snappy, 2 is gzip.
	COMPRESS_NO     int32 = 0
	COMPRESS_SNAPPY int32 = 1
	COMPRESS_GZIP   int32 = 2

	// HeaderSize is the fixed length, in bytes, of the package head.
	HeaderSize = 12
	// MagicCode is the marker string that starts the package head.
	MagicCode  = "PRPC"
)

Variables

View Source
// ERR_IGNORE_ERR is a package-level error value tagged marshal-001.
// NOTE(review): where it is returned is not visible here; judging by its
// message it marks an error that callers may ignore. Confirm against the code.
var ERR_IGNORE_ERR = errors.New("[marshal-001]Ingore error")

error log info definition

View Source
// ERR_META is the error value for a nil value obtained from the Meta struct
// after marshal (tagged marshal-003).
var ERR_META = errors.New("[marshal-003]Get nil value from Meta struct after marshal")
View Source
// ERR_NO_SNAPPY is the error value stating that Snappy compression is not
// supported yet (tagged marshal-002).
var ERR_NO_SNAPPY = errors.New("[marshal-002]Snappy compress not support yet.")

Functions

func GUNZIP

func GUNZIP(b []byte) ([]byte, error)

func GZIP

func GZIP(b []byte) ([]byte, error)

Types

// Header holds the content of the PbRPC package head.
type Header struct {
	// MagicCode holds the magic code bytes of the head.
	MagicCode   []byte
	// MessageSize is an int32 size field; the documented head layout defines
	// it as the total size minus the 12-byte fixed head.
	MessageSize int32
	// MetaSize is an int32 size field; the documented head layout defines it
	// as the size of the Meta object.
	MetaSize    int32
}

Header PbRPC header content

func NewHeader

func NewHeader() *Header

NewHeader create new empty header

func (*Header) Bytes

func (h *Header) Bytes() []byte

Bytes return header struct to byte array

func (*Header) Load

func (h *Header) Load(data []byte) error

Load use bytes array to update field for header

func (*Header) SetMagicCode

func (h *Header) SetMagicCode(MagicCode []byte) error

SetMagicCode change magic code in header

type Package

// Package is one RPC data package made of a head, meta information, data
// and an attachment.
type Package struct {
	// Header is the package head.
	Header     Header
	// Meta is the RpcMeta describing the package.
	Meta       pbrpc.RpcMeta
	// Data is the transported data message.
	Data       []byte
	// Attachment is the attachment body.
	Attachment []byte
}
Package is the data package for Baidu RPC.
All request and response data packages use the following layout.

-----------------------------------
| Head | Meta | Data | Attachment |
-----------------------------------

1. <Head> has a fixed length of 12 bytes, in the following format

----------------------------------------------
| PRPC | MessageSize(int32) | MetaSize(int32) |
----------------------------------------------

MessageSize = totalSize - 12 (fixed head size)
MetaSize = Meta object size

2. <Meta> body proto description as follow

message RpcMeta {
    optional RpcRequestMeta request = 1;
    optional RpcResponseMeta response = 2;
    optional int32 compress_type = 3; // 0:nocompress 1:Snappy 2:gzip
    optional int64 correlation_id = 4;
    optional int32 attachment_size = 5;
    optional ChunkInfo chuck_info = 6;
    optional bytes authentication_data = 7;
};
message RpcRequestMeta {
    required string service_name = 1;
    required string method_name = 2;
    optional int64 log_id = 3;
};
message RpcResponseMeta {
    optional int32 error_code = 1;
    optional string error_text = 2;
};
message ChunkInfo {
        required int64 stream_id = 1;
        required int64 chunk_id = 2;
};

3. <Data> customize transport data message.

4. <Attachment> attachment body data message

func NewPackage

func NewPackage() *Package

func NewRequestPackage

func NewRequestPackage() *Package

func NewResponsePackage

func NewResponsePackage() *Package

func (*Package) Bytes

func (r *Package) Bytes() ([]byte, error)

Bytes Convert RpcPackage to byte array

func (*Package) GetMagicCode

func (r *Package) GetMagicCode() string

func (*Package) GetTraceId

func (r *Package) GetTraceId() string

func (*Package) Load

func (r *Package) Load(b []byte) error

Load 加载[]byte里的数据至Package

func (*Package) ReadIO

func (r *Package) ReadIO(rw io.Reader) error

ReadIO Read byte array and initialize RpcPackage

func (*Package) SetAttachment

func (r *Package) SetAttachment(Attachment []byte) *Package

func (*Package) SetAuthenticationData

func (r *Package) SetAuthenticationData(authenticationData []byte) *Package

func (*Package) SetChunkInfo

func (r *Package) SetChunkInfo(streamId int64, chunkId int64) *Package

func (*Package) SetCompressType

func (r *Package) SetCompressType(compressType int32) *Package

func (*Package) SetCorrelationId

func (r *Package) SetCorrelationId(correlationId int64) *Package

func (*Package) SetData

func (r *Package) SetData(Data []byte) *Package

TODO: 看看协议,此处叫data还是叫payload合适?

func (*Package) SetErrorCode

func (r *Package) SetErrorCode(errorCode int32) *Package

func (*Package) SetErrorText

func (r *Package) SetErrorText(errorText string) *Package

func (*Package) SetMagicCode

func (r *Package) SetMagicCode(magicCode []byte) error

func (*Package) SetMethodName

func (r *Package) SetMethodName(methodName string) *Package

func (*Package) SetServiceName

func (r *Package) SetServiceName(serviceName string) *Package

func (*Package) SetTraceId

func (r *Package) SetTraceId(traceId string) *Package

func (*Package) WriteIO

func (r *Package) WriteIO(rw io.Writer) (int, error)

WriteIO write package to io.Writer

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL