rpcx

package module
v1.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 29, 2017 License: Apache-2.0 Imports: 23 Imported by: 0

README

rpcx

中文介绍 中文电子书

License GoDoc travis Go Report Card QQ群

rpcx is a distributed RPC framework like Alibaba Dubbo and Weibo Motan. It is based on Go net/rpc and provides extra governance features.

Throughput

very very good performance. Much better than gRPC, Dubbo and weibo Motan.

When we talk about RPC frameworks, Dubbo is the first framework we should mention, and also Dubbox mantained by dangdang. Dubbo has been widely used in e-commerce companies in China, for example, Alibaba, Jingdong and Dangdang.

Though Dubbo still uses Spring 2.5.6.SEC03, and is no longer supported by Alibaba, some other companies still usea it and maintain their own branches.

DUBBO is a distributed service framework , provides high performance and transparent RPC remote service call. It is the core framework of Alibaba SOA service governance programs. There are 3,000,000,000 calls for 2,000+ services per day, and it has been widely used in various member sites of Alibaba Group.

Motan is open source now by Weibo. As Zhang Lei said, he is the current main developer of Motan:

Motan started in 2013. There are 100 billion calls for hundreds of service calls every day.

Those two RPC frameworks are developed in Java. There are other famous RPC frameworks such as thrift、and finagle

The goal of rpcx is to implement an RPC framework like Dubbo in Go. It is developed in Go, and to be used in Go.

It is a distributed、pluggable RPC framework with governance (service discovery、load balancer、fault tolerance、monitoring, etc.).

As you know, RPC frameworks already exists, for example, net/rpcgrpc-gogorilla-rpc, So why re-invent the wheel?

Although those Go RPC frameworks work well, their function is relatively simple and they only implement end-to-end communication. They lack some product features of service management functions like service discovery, load balancing, and fault tolerance.

So I created rpcx and expect it to become an RPC framework like Dubbo.

A similar project in go is called go-micro.

What's RPC

From wikiPedia:

In distributed computing, a remote procedure call (RPC) is when a computer program causes a procedure (subroutine) to execute in another address space (commonly on another computer on a shared network), which is coded as if it were a normal (local) procedure call, without the programmer explicitly coding the details for the remote interaction. That is, the programmer writes essentially the same code whether the subroutine is local to the executing program, or remote.[1] This is a form of client–server interaction (caller is client, executer is server), typically implemented via a request–response message-passing system. The object-oriented programming analog is remote method invocation (RMI). The RPC model implies a level of location transparency, namely that calling procedures is largely the same whether it is local or remote, but usually they are not identical, so local calls can be distinguished from remote calls. Remote calls are usually orders of magnitude slower and less reliable than local calls, so distinguishing them is useful.

RPCs are a form of inter-process communication (IPC), in that different processes have different address spaces: if on the same host machine, they have distinct virtual address spaces, even though the physical address space is the same; while if they are on different hosts, the physical address space is different. Many different (often incompatible) technologies have been used to implement the concept.

Sequence of events during an RPC

  1. The client calls the client stub. The call is a local procedure call, with parameters pushed on to the stack in the normal way.
  2. The client stub packs the parameters into a message and makes a system call to send the message. Packing the parameters is called marshalling.
  3. The client's local operating system sends the message from the client machine to the server machine.
  4. The local operating system on the server machine passes the incoming packets to the server stub.
  5. The server stub unpacks the parameters from the message. Unpacking the parameters is called unmarshalling.
  6. Finally, the server stub calls the server procedure. The reply traces the same steps in the reverse direction.

There are two ways to implement RPC frameworks. One focusses on cross-language calls and the other focusses on service governance.

Dubbo、DubboX、and Motan are RPC frameworks focusing on service governance. Thrift、gRPC、Hessian、and Hprose are RPC frameworks focusing on cross-language calls.

rpcx focus in on service governance.

Features

  • Based on net/rpc. a Go net/prc project can be converted to use rpcx with a few changes.
  • Pluggable. Features are implemented by Plugins, such as service discovery.
  • Communicate with TCP long connections.
  • Support GeoLocation
  • Support many codec. for example, Gob、Json、MessagePack、gencode、ProtoBuf.
  • Service dicovery. support ZooKeeper、Etcd.
  • Fault tolerance:Failover、Failfast、Failtry.
  • Load banlancer:support randomSelecter, RoundRobin, consistent hashing, etc.
  • Scalable.
  • Other: metrics、log、timeout.
  • Authorization.
  • Compression:inflate and snappy.
  • Alias name for services.
  • kcp support: a full-featured reliable-UDP library for golang

rpcx has fixed/implemented the below issues in golang/go rpc and I believe those issues won't be fixed in the official library because the official rpc library will be frozen.

  • #16449: proposal: support custom method names in net/rpc
  • #15236: net/rpc: expected Timeout based alternatives to functions for rpc.Dial, rpc.DialHTTP, rpc.DialHTTPPath [proposal].
  • #13395: net/rpc: Server.ServeHTTP assumes default http mux
  • #10929: net/rpc/jsonrpc: Missing support for JSON-RPC 2.0
  • #7946: net/rpc: add client support for RPC over https
  • #4591: Authentication for RPC and HTTP
  • #4584: net/rpc: access to client's IP address
  • #3615: rpc: allow aliasing methods

rpcx-ui provides a web ui for services management. rpcx-ui

Architecture

rpcx contains three roles : RPC Server,RPC Client and Registry.

  • Server registers services on the Registry
  • Client queries the service list and select a server from the server list returned from the Registry.
  • When a Server is down, the Registry can remove the server from the list, and subsequently the client can remove it too.

So far rpcx supports ZooKeeper and Etcd as Registry,Consul support is in developement.

Examples

There is a client calling the Posts service at tr.colobu.com. This service returns a hot tech post list of the day:

package main

import (
	"encoding/json"
	"fmt"
	"time"

	"gopkg.in/mgo.v2/bson"

	"github.com/smallnest/rpcx"
  "github.com/smallnest/rpcx/log"
)

type Args struct {
	PostType string `msg:"posttype"`
}

type Reply struct {
	Posts []Post `msg:"posts"`
}

type Post struct {
	PostID      bson.ObjectId `json:"id" xml:"id" bson:"_id,omitempty"`
	PostType    string        `json:"ptype" xml:"ptype" bson:"ptype,omitempty"`
	Title       string        `json:"title" xml:"title" bson:"title"`
	URL         string        `json:"url" xml:"url" bson:"url"`
	Domain      string        `json:"domain" xml:"domain" bson:"domain"`
	ShortURL    string        `json:"surl" xml:"surl" bson:"surl"`
	Description string        `json:"desc" xml:"desc" bson:"desc"`
	LikeCount   int           `json:"like" xml:"like" bson:"like"`
	ImageURL    string        `json:"imgurl" xml:"imgurl" bson:"imgurl"`
	RecommendBy string        `json:"-" xml:"-" bson:"-"`
	Tags        string        `json:"tags" xml:"tags" bson:"tags"`
	State       int           `json:"-" xml:"-" bson:"-"`
	Timestamp   time.Time     `json:"ts" xml:"timestamp" bson:"ts"`
}

func main() {
	s := &rpcx.DirectClientSelector{Network: "tcp", Address: "tr.colobu.com:8972", DialTimeout: 10 * time.Second}
	client := rpcx.NewClient(s)
	defer client.Close()

	args := &Args{"golang"}
	var reply Reply
	err := client.Call("Posts.Query", args, &reply)
	if err != nil {
		log.Infof("error for Posts: %s, %v \n", args.PostType, err)
		return
	}

	posts := reply.Posts
	data, _ := json.MarshalIndent(&posts, "", "\t")

	log.Infof("Posts: %s \n", string(data))
}

you can found more examples at _examples

Benchmark

Test Environment

  • CPU: Intel(R) Xeon(R) CPU E5-2620 v2 @ 2.10GHz, 24 cores
  • Memory: 16G
  • OS: Linux Server-3 2.6.32-358.el6.x86_64, CentOS 6.4
  • Go: 1.7

Test request is copied from protobuf project and encoded to a proto message. Its size is 581 bytes. The response update two fields of decoded requests so the server goes through decoding and encoding.

The test proto file is:

syntax = "proto2";

package main;

option optimize_for = SPEED;


message BenchmarkMessage {
  required string field1 = 1;
  optional string field9 = 9;
  optional string field18 = 18;
  optional bool field80 = 80 [default=false];
  optional bool field81 = 81 [default=true];
  required int32 field2 = 2;
  required int32 field3 = 3;
  optional int32 field280 = 280;
  optional int32 field6 = 6 [default=0];
  optional int64 field22 = 22;
  optional string field4 = 4;
  repeated fixed64 field5 = 5;
  optional bool field59 = 59 [default=false];
  optional string field7 = 7;
  optional int32 field16 = 16;
  optional int32 field130 = 130 [default=0];
  optional bool field12 = 12 [default=true];
  optional bool field17 = 17 [default=true];
  optional bool field13 = 13 [default=true];
  optional bool field14 = 14 [default=true];
  optional int32 field104 = 104 [default=0];
  optional int32 field100 = 100 [default=0];
  optional int32 field101 = 101 [default=0];
  optional string field102 = 102;
  optional string field103 = 103;
  optional int32 field29 = 29 [default=0];
  optional bool field30 = 30 [default=false];
  optional int32 field60 = 60 [default=-1];
  optional int32 field271 = 271 [default=-1];
  optional int32 field272 = 272 [default=-1];
  optional int32 field150 = 150;
  optional int32 field23 = 23 [default=0];
  optional bool field24 = 24 [default=false];
  optional int32 field25 = 25 [default=0];
  optional bool field78 = 78;
  optional int32 field67 = 67 [default=0];
  optional int32 field68 = 68;
  optional int32 field128 = 128 [default=0];
  optional string field129 = 129 [default="xxxxxxxxxxxxxxxxxxxxx"];
  optional int32 field131 = 131 [default=0];
}

The test is run with 100, 1000, 2000 and 5000 concurrent clients. The total number of requests per clients is 1,000,000.

Test Result

rpcx: one client and one server on the same machine
concurrent clients mean(ms) median(ms) max(ms) min(ms) throughput(TPS)
100 0 0 17 0 164338
500 2 1 40 0 181126
1000 4 3 56 0 186219
2000 9 7 105 0 182815
5000 25 22 200 0 178858

you can use test code in _benchmark to test. server is used to start a server and client is used as clients via protobuf.

The above test is with a client and a server running on the same mechine.

rpcx: one client and one server running on different machines

If I run them on separate servers, the test results are:

concurrent clients mean(ms) median(ms) max(ms) min(ms) throughput(TPS)
100 1 1 20 0 127975
500 5 1 4350 0 136407
1000 10 2 3233 0 155255
2000 17 2 9735 0 159438
5000 44 2 12788 0 161917
rpcx: one client on one machine and two servers on two machines

When running in cluster mode, with one machine with one client and another two running as servers, the test results are:

concurrent clients mean(ms) median(ms) max(ms) min(ms) throughput(TPS)
100 0 0 41 0 128932
500 3 2 273 0 150285
1000 5 5 621 0 150152
2000 10 7 288 0 159974
5000 23 12 629 0 155279
benchmarks of serialization libraries:
[root@localhost rpcx]# go test -bench . -test.benchmem
PASS
BenchmarkNetRPC_gob-16            100000             18742 ns/op             321 B/op          9 allocs/op
BenchmarkNetRPC_jsonrpc-16        100000             21360 ns/op            1170 B/op         31 allocs/op
BenchmarkNetRPC_msgp-16           100000             18617 ns/op             776 B/op         35 allocs/op
BenchmarkRPCX_gob-16              100000             18718 ns/op             320 B/op          9 allocs/op
BenchmarkRPCX_json-16             100000             21238 ns/op            1170 B/op         31 allocs/op
BenchmarkRPCX_msgp-16             100000             18635 ns/op             776 B/op         35 allocs/op
BenchmarkRPCX_gencodec-16         100000             18454 ns/op            4485 B/op         17 allocs/op
BenchmarkRPCX_protobuf-16         100000             17234 ns/op             733 B/op         13 allocs/op

Comparision with gRPC

gRPC is the RPC framework by Google. It supports multiple programming lanaguage. I have compared three cases for prcx and gRPC. It shows rpcx is much better than gRPC.

current gRPC version is 1.0.

The test results for rpcx can be found above. Below is the test results for gRPC.

gRPC: one client and one server on the same machine
concurrent clients mean(ms) median(ms) max(ms) min(ms) throughput(TPS)
100 1 0 21 0 68250
500 5 1 3059 0 78486
1000 10 1 6274 0 79980
2000 19 1 9736 0 58129
5000 43 2 14224 0 44724

gRPC: one client and one server running on different machines
concurrent clients mean(ms) median(ms) max(ms) min(ms) throughput(TPS)
100 1 0 21 0 68250
500 5 1 3059 0 78486
1000 10 1 6274 0 79980
2000 19 1 9736 0 58129
5000 43 2 14224 0 44724

gRPC: one client on one machine and two servers on two machines
concurrent clients mean(ms) median(ms) max(ms) min(ms) throughput(TPS)
100 1 0 19 0 88082
500 4 1 1461 0 90334
1000 9 1 6315 0 62305
2000 17 1 9736 0 44487
5000 38 1 25087 0 33198

Documentation

Index

Constants

View Source
const (
	//DefaultRPCPath is the defaut HTTP RPC PATH
	DefaultRPCPath = "/_goRPC_"
)

Variables

View Source
var (
	ErrPluginAlreadyExists   = NewRPCError("cannot use the same plugin again, '%s' is already exists")
	ErrPluginActivate        = NewRPCError("while trying to activate plugin '%s'. Trace: %s")
	ErrPluginRemoveNoPlugins = NewRPCError("no plugins are registed yet, you cannot remove a plugin from an empty list!")
	ErrPluginRemoveEmptyName = NewRPCError("plugin with an empty name cannot be removed")
	ErrPluginRemoveNotFound  = NewRPCError("cannot remove a plugin which doesn't exists")

	ErrWrongServiceMethod = NewRPCError("? is not allowed in service method name):%s")
)
View Source
var IP4Reg = regexp.MustCompile(`^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$`)

Functions

func Auth

func Auth(fn AuthorizationFunc) error

Auth sets authorization handler

func Close

func Close() error

Close closes RPC server.

func GetListenedAddress

func GetListenedAddress() string

GetListenedAddress return the listening address.

func NewCompressConn added in v1.1.2

func NewCompressConn(conn net.Conn, compressType CompressType) net.Conn

NewCompressConn creates a wrapped net.Conn with CompressType

func NewDirectHTTPRPCClient

func NewDirectHTTPRPCClient(c *Client, clientCodecFunc ClientCodecFunc, network, address string, path string, timeout time.Duration) (*rpc.Client, error)

NewDirectHTTPRPCClient creates a rpc http client

func NewDirectKCPRPCClient added in v1.1.2

func NewDirectKCPRPCClient(c *Client, clientCodecFunc ClientCodecFunc, network, address string, path string, timeout time.Duration) (*rpc.Client, error)

NewDirectKCPRPCClient creates a kcp client. kcp project: https://github.com/xtaci/kcp-go

func NewDirectRPCClient

func NewDirectRPCClient(c *Client, clientCodecFunc ClientCodecFunc, network, address string, timeout time.Duration) (*rpc.Client, error)

NewDirectRPCClient creates a rpc client

func RegisterName

func RegisterName(name string, service interface{}, metadata ...string)

RegisterName publishes in the server the set of methods .

func Serve

func Serve(n, address string) (err error)

Serve starts and listens RPC requests. It is blocked until receiving connectings from clients.

func ServeByHTTP

func ServeByHTTP(ln net.Listener)

ServeByHTTP implements RPC via HTTP

func ServeByMux added in v1.1.2

func ServeByMux(ln net.Listener, mux *http.ServeMux)

ServeByMux implements RPC via HTTP with customized mux

func ServeListener

func ServeListener(ln net.Listener)

ServeListener serve with a listener

func ServeTLS

func ServeTLS(n, address string, config *tls.Config) (err error)

ServeTLS starts and listens RPC requests. It is blocked until receiving connectings from clients.

func SetServerCodecFunc

func SetServerCodecFunc(fn ServerCodecFunc)

SetServerCodecFunc sets a ServerCodecFunc

func Start

func Start(n, address string) (err error)

Start starts and listens RPC requests without blocking.

func StartTLS

func StartTLS(n, address string, config *tls.Config) (err error)

StartTLS starts and listens RPC requests without blocking.

Types

type ArgsContext added in v1.1.2

type ArgsContext interface {
	Value(key string) interface{}
	SetValue(key string, value interface{})
}

ArgsContext contains net.Conn so services can get net.Conn info, for example, remote address.

type AuthorizationAndServiceMethod

type AuthorizationAndServiceMethod struct {
	Authorization string // Authorization
	ServiceMethod string // real ServiceMethod name
	Tag           string // extra tag for Authorization
}

AuthorizationAndServiceMethod represents Authorization header and ServiceMethod.

func (*AuthorizationAndServiceMethod) String added in v1.1.1

func (aasm *AuthorizationAndServiceMethod) String() string

type AuthorizationClientPlugin

type AuthorizationClientPlugin struct {
	AuthorizationAndServiceMethod *AuthorizationAndServiceMethod
}

AuthorizationClientPlugin is used to set Authorization info at client side.

func NewAuthorizationClientPlugin

func NewAuthorizationClientPlugin(authorization, tag string) *AuthorizationClientPlugin

NewAuthorizationClientPlugin creates a AuthorizationClientPlugin with authorization header and tag

func (*AuthorizationClientPlugin) Name

func (plugin *AuthorizationClientPlugin) Name() string

Name return name of this plugin.

func (*AuthorizationClientPlugin) PreWriteRequest

func (plugin *AuthorizationClientPlugin) PreWriteRequest(r *rpc.Request, body interface{}) error

PreWriteRequest adds Authorization info in requests

type AuthorizationFunc

type AuthorizationFunc func(p *AuthorizationAndServiceMethod) error

AuthorizationFunc defines a method type which handles Authorization info

type AuthorizationServerPlugin

type AuthorizationServerPlugin struct {
	AuthorizationFunc AuthorizationFunc
}

AuthorizationServerPlugin is used to authorize clients.

func (*AuthorizationServerPlugin) Name

func (plugin *AuthorizationServerPlugin) Name() string

Name return name of this plugin.

func (*AuthorizationServerPlugin) PostReadRequestHeader

func (plugin *AuthorizationServerPlugin) PostReadRequestHeader(r *rpc.Request) (err error)

PostReadRequestHeader extracts Authorization header from ServiceMethod field.

type Client

type Client struct {
	ClientSelector  ClientSelector
	ClientCodecFunc ClientCodecFunc
	PluginContainer IClientPluginContainer
	FailMode        FailMode
	TLSConfig       *tls.Config
	Block           kcp.BlockCrypt
	Retries         int
	//Timeout sets deadline for underlying net.Conns
	Timeout time.Duration
	//Timeout sets readdeadline for underlying net.Conns
	ReadTimeout time.Duration
	//Timeout sets writedeadline for underlying net.Conns
	WriteTimeout time.Duration
}

Client represents a RPC client.

func NewClient

func NewClient(s ClientSelector) *Client

NewClient create a client.

func (*Client) Auth

func (c *Client) Auth(authorization, tag string) error

Auth sets Authorization info

func (*Client) Call

func (c *Client) Call(serviceMethod string, args interface{}, reply interface{}) (err error)

Call invokes the named function, waits for it to complete, and returns its error status.

func (*Client) Close

func (c *Client) Close() error

Close closes the connection

func (*Client) Go

func (c *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *rpc.Call) *rpc.Call

Go invokes the function asynchronously. It returns the Call structure representing the invocation. The done channel will signal when the call is complete by returning the same Call object. If done is nil, Go will allocate a new channel. If non-nil, done must be buffered or Go will deliberately crash.

type ClientCodecFunc

type ClientCodecFunc func(conn io.ReadWriteCloser) rpc.ClientCodec

ClientCodecFunc is used to create a rpc.ClientCodecFunc from net.Conn.

type ClientPluginContainer

type ClientPluginContainer struct {
	// contains filtered or unexported fields
}

ClientPluginContainer implements IPluginContainer interface.

func (*ClientPluginContainer) Add

func (p *ClientPluginContainer) Add(plugin IPlugin) error

Add adds a plugin.

func (*ClientPluginContainer) DoPostConnected added in v1.1.2

func (p *ClientPluginContainer) DoPostConnected(conn net.Conn) (net.Conn, bool)

DoPostConnected handles connected

func (*ClientPluginContainer) DoPostReadResponseBody

func (p *ClientPluginContainer) DoPostReadResponseBody(body interface{}) error

DoPostReadResponseBody invokes DoPostReadResponseBody plugin.

func (*ClientPluginContainer) DoPostReadResponseHeader

func (p *ClientPluginContainer) DoPostReadResponseHeader(r *rpc.Response) error

DoPostReadResponseHeader invokes DoPostReadResponseHeader plugin.

func (*ClientPluginContainer) DoPostWriteRequest

func (p *ClientPluginContainer) DoPostWriteRequest(r *rpc.Request, body interface{}) error

DoPostWriteRequest invokes DoPostWriteRequest plugin.

func (*ClientPluginContainer) DoPreReadResponseBody

func (p *ClientPluginContainer) DoPreReadResponseBody(body interface{}) error

DoPreReadResponseBody invokes DoPreReadResponseBody plugin.

func (*ClientPluginContainer) DoPreReadResponseHeader

func (p *ClientPluginContainer) DoPreReadResponseHeader(r *rpc.Response) error

DoPreReadResponseHeader invokes DoPreReadResponseHeader plugin.

func (*ClientPluginContainer) DoPreWriteRequest

func (p *ClientPluginContainer) DoPreWriteRequest(r *rpc.Request, body interface{}) error

DoPreWriteRequest invokes DoPreWriteRequest plugin.

func (*ClientPluginContainer) GetAll

func (p *ClientPluginContainer) GetAll() []IPlugin

GetAll returns all activated plugins

func (*ClientPluginContainer) GetByName

func (p *ClientPluginContainer) GetByName(pluginName string) IPlugin

GetByName returns a plugin instance by it's name

func (*ClientPluginContainer) GetName

func (p *ClientPluginContainer) GetName(plugin IPlugin) string

GetName returns the name of a plugin, if no GetName() implemented it returns an empty string ""

func (*ClientPluginContainer) Remove

func (p *ClientPluginContainer) Remove(pluginName string) error

Remove removes a plugin by it's name.

type ClientSelector

type ClientSelector interface {
	//Select returns a new client and it also update current client
	Select(clientCodecFunc ClientCodecFunc, options ...interface{}) (*rpc.Client, error)
	//SetClient set current client
	SetClient(*Client)
	SetSelectMode(SelectMode)
	//AllClients returns all Clients
	AllClients(clientCodecFunc ClientCodecFunc) []*rpc.Client
	//handle failed client
	HandleFailedClient(client *rpc.Client)
}

ClientSelector defines an interface to create a rpc.Client from cluster or standalone.

type CompressConn added in v1.1.2

type CompressConn struct {
	net.Conn
	// contains filtered or unexported fields
}

CompressConn wraps a net.Conn and supports compression

func (*CompressConn) Read added in v1.1.2

func (c *CompressConn) Read(b []byte) (n int, err error)

func (*CompressConn) Write added in v1.1.2

func (c *CompressConn) Write(b []byte) (n int, err error)

type CompressType added in v1.1.2

type CompressType byte

CompressType is compression type. Currently only support zip and snappy

const (
	// CompressNone represents no compression
	CompressNone CompressType = iota
	// CompressFlate represents zip
	CompressFlate
	// CompressSnappy represents snappy
	CompressSnappy
	// CompressZstd represents Facebook/Zstandard
	CompressZstd
	// CompressLZ4 represents LZ4 (http://www.lz4.org)
	CompressLZ4
)

type DirectClientSelector

type DirectClientSelector struct {
	Network, Address string
	DialTimeout      time.Duration
	Client           *Client
	// contains filtered or unexported fields
}

DirectClientSelector is used to a direct rpc server. It don't select a node from service cluster but a specific rpc server.

func (*DirectClientSelector) AllClients

func (s *DirectClientSelector) AllClients(clientCodecFunc ClientCodecFunc) []*rpc.Client

AllClients returns rpc.Clients to all servers

func (*DirectClientSelector) HandleFailedClient added in v1.1.2

func (s *DirectClientSelector) HandleFailedClient(client *rpc.Client)

func (*DirectClientSelector) Select

func (s *DirectClientSelector) Select(clientCodecFunc ClientCodecFunc, options ...interface{}) (*rpc.Client, error)

Select returns a rpc client.

func (*DirectClientSelector) SetClient

func (s *DirectClientSelector) SetClient(c *Client)

SetClient sets the unique client.

func (*DirectClientSelector) SetSelectMode

func (s *DirectClientSelector) SetSelectMode(sm SelectMode)

SetSelectMode is meaningless for DirectClientSelector because there is only one client.

type FailMode

type FailMode int

FailMode is a feature to decide client actions when clients fail to invoke services

const (
	//Failover selects another server automaticaly
	Failover FailMode = iota
	//Failfast returns error immediately
	Failfast
	//Failtry use current client again
	Failtry
	//Broadcast sends requests to all servers and Success only when all servers return OK
	Broadcast
	//Forking sends requests to all servers and Success once one server returns OK
	Forking
)
type Header map[string][]string

A Header represents the key-value pairs in an RPCX header. It is imlemented refer to http Header.

func (Header) Add added in v1.1.3

func (h Header) Add(key, value string)

Add adds the key, value pair to the header. It appends to any existing values associated with key.

func (Header) Del added in v1.1.3

func (h Header) Del(key string)

Del deletes the values associated with key.

func (Header) Get added in v1.1.3

func (h Header) Get(key string) string

Get gets the first value associated with the given key. It is case insensitive; textproto.CanonicalMIMEHeaderKey is used to canonicalize the provided key. If there are no values associated with the key, Get returns "". To access multiple values of a key, or to use non-canonical keys, access the map directly.

func (Header) Len added in v1.1.3

func (h Header) Len() int

Len returns length of h.

func (Header) Set added in v1.1.3

func (h Header) Set(key, value string)

Set sets the header entries associated with key to the single element value. It replaces any existing values associated with key.

type IClientPluginContainer

type IClientPluginContainer interface {
	Add(plugin IPlugin) error
	Remove(pluginName string) error
	GetName(plugin IPlugin) string
	GetByName(pluginName string) IPlugin
	GetAll() []IPlugin

	DoPostConnected(net.Conn) (net.Conn, bool)

	DoPreReadResponseHeader(*rpc.Response) error
	DoPostReadResponseHeader(*rpc.Response) error
	DoPreReadResponseBody(interface{}) error
	DoPostReadResponseBody(interface{}) error

	DoPreWriteRequest(*rpc.Request, interface{}) error
	DoPostWriteRequest(*rpc.Request, interface{}) error
}

IClientPluginContainer represents a plugin container that defines all methods to manage plugins. And it also defines all extension points.

type IPlugin

type IPlugin interface {
	Name() string
}

IPlugin represents a plugin.

type IPostConnAcceptPlugin

type IPostConnAcceptPlugin interface {
	HandleConnAccept(net.Conn) (net.Conn, bool)
}

IPostConnAcceptPlugin represents connection accept plugin. if returns false, it means subsequent IPostConnAcceptPlugins should not contiune to handle this conn and this conn has been closed.

type IPostConnectedPlugin added in v1.1.2

type IPostConnectedPlugin interface {
	HandleConnected(net.Conn) (net.Conn, bool)
}

IPostConnectedPlugin represents connected plugin.

type IPostReadRequestBodyPlugin

type IPostReadRequestBodyPlugin interface {
	PostReadRequestBody(body interface{}) error
}

IPostReadRequestBodyPlugin represents .

type IPostReadRequestHeaderPlugin

type IPostReadRequestHeaderPlugin interface {
	PostReadRequestHeader(r *rpc.Request) error
}

IPostReadRequestHeaderPlugin represents .

type IPostReadResponseBodyPlugin

type IPostReadResponseBodyPlugin interface {
	PostReadResponseBody(interface{}) error
}

IPostReadResponseBodyPlugin represents .

type IPostReadResponseHeaderPlugin

type IPostReadResponseHeaderPlugin interface {
	PostReadResponseHeader(*rpc.Response) error
}

IPostReadResponseHeaderPlugin represents .

type IPostWriteRequestPlugin

type IPostWriteRequestPlugin interface {
	PostWriteRequest(*rpc.Request, interface{}) error
}

IPostWriteRequestPlugin represents .

type IPostWriteResponsePlugin

type IPostWriteResponsePlugin interface {
	PostWriteResponse(*rpc.Response, interface{}) error
}

IPostWriteResponsePlugin represents .

type IPreReadRequestBodyPlugin

type IPreReadRequestBodyPlugin interface {
	PreReadRequestBody(body interface{}) error
}

IPreReadRequestBodyPlugin represents .

type IPreReadRequestHeaderPlugin

type IPreReadRequestHeaderPlugin interface {
	PreReadRequestHeader(r *rpc.Request) error
}

IPreReadRequestHeaderPlugin represents .

type IPreReadResponseBodyPlugin

type IPreReadResponseBodyPlugin interface {
	PreReadResponseBody(interface{}) error
}

IPreReadResponseBodyPlugin represents .

type IPreReadResponseHeaderPlugin

type IPreReadResponseHeaderPlugin interface {
	PreReadResponseHeader(*rpc.Response) error
}

IPreReadResponseHeaderPlugin represents .

type IPreWriteRequestPlugin

type IPreWriteRequestPlugin interface {
	PreWriteRequest(*rpc.Request, interface{}) error
}

IPreWriteRequestPlugin represents .

type IPreWriteResponsePlugin

type IPreWriteResponsePlugin interface {
	PreWriteResponse(*rpc.Response, interface{}) error
}

IPreWriteResponsePlugin represents .

type IRegisterPlugin

type IRegisterPlugin interface {
	Register(name string, rcvr interface{}, metadata ...string) error
}

IRegisterPlugin represents register plugin.

type IServerPluginContainer

type IServerPluginContainer interface {
	Add(plugin IPlugin) error
	Remove(pluginName string) error
	GetName(plugin IPlugin) string
	GetByName(pluginName string) IPlugin
	GetAll() []IPlugin

	DoRegister(name string, rcvr interface{}, metadata ...string) error

	DoPostConnAccept(net.Conn) (net.Conn, bool)

	DoPreReadRequestHeader(r *rpc.Request) error
	DoPostReadRequestHeader(r *rpc.Request) error

	DoPreReadRequestBody(body interface{}) error
	DoPostReadRequestBody(body interface{}) error

	DoPreWriteResponse(*rpc.Response, interface{}) error
	DoPostWriteResponse(*rpc.Response, interface{}) error
}

IServerPluginContainer represents a plugin container that defines all methods to manage plugins. And it also defines all extension points.

func GetPluginContainer

func GetPluginContainer() IServerPluginContainer

GetPluginContainer get PluginContainer of default server.

type MultiError

type MultiError struct {
	Errors []error
}

MultiError holds multiple errors

func NewMultiError

func NewMultiError(errors []error) *MultiError

NewMultiError creates and returns an Error with error splice

func (*MultiError) Error

func (e *MultiError) Error() string

Error returns the message of the actual error

type RPCError

type RPCError struct {
	// contains filtered or unexported fields
}

RPCError holds the error

func NewRPCError

func NewRPCError(errMsg string) *RPCError

NewRPCError creates and returns an Error with a message

func (*RPCError) Error

func (e *RPCError) Error() string

Error returns the message of the actual error

func (*RPCError) Format

func (e *RPCError) Format(args ...interface{}) error

Format returns a formatted new error based on the arguments

func (*RPCError) Panic

func (e *RPCError) Panic()

Panic output the message and after panics

func (*RPCError) Panicf

func (e *RPCError) Panicf(args ...interface{})

Panicf output the formatted message and after panics

func (*RPCError) Return

func (e *RPCError) Return() error

Return returns the actual error as it is

func (*RPCError) With

func (e *RPCError) With(err error) error

With does the same thing as Format but it receives an error type which if it's nil it returns a nil error

type SelectMode

type SelectMode int

SelectMode defines the algorithm of selecting a services from cluster

const (
	//RandomSelect is selecting randomly
	RandomSelect SelectMode = iota
	//RoundRobin is selecting by round robin
	RoundRobin
	//WeightedRoundRobin is selecting by weighted round robin
	WeightedRoundRobin
	//WeightedICMP is selecting by weighted Ping time
	WeightedICMP
	//ConsistentHash is selecting by hashing
	ConsistentHash
	//Closest is selecting the closest server
	Closest
)

func (SelectMode) String

func (s SelectMode) String() string

type Server

type Server struct {
	ServerCodecFunc ServerCodecFunc
	//PluginContainer must be configured before starting and Register plugins must be configured before invoking RegisterName method
	PluginContainer IServerPluginContainer
	//Metadata describes extra info about this service, for example, weight, active status
	Metadata string

	Timeout      time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
	// contains filtered or unexported fields
}

Server represents a RPC Server.

func NewServer

func NewServer() *Server

NewServer returns a new Server.

func (*Server) Address

func (s *Server) Address() string

Address return the listening address.

func (*Server) Auth

func (s *Server) Auth(fn AuthorizationFunc) error

Auth sets authorization function

func (*Server) Close

func (s *Server) Close() error

Close closes RPC server.

func (*Server) RegisterName

func (s *Server) RegisterName(name string, service interface{}, metadata ...string)

RegisterName publishes in the server the set of methods of the receiver value that satisfy the following conditions:

  • exported method of exported type
  • two arguments, both of exported type
  • the second argument is a pointer
  • one return value, of type error

It returns an error if the receiver is not an exported type or has no suitable methods. It also logs the error using package log. The client accesses each method using a string of the form "Type.Method", where Type is the receiver's concrete type.

func (*Server) Serve

func (s *Server) Serve(network, address string) (err error)

Serve starts and listens RPC requests. It is blocked until receiving connectings from clients.

func (*Server) ServeByHTTP

func (s *Server) ServeByHTTP(ln net.Listener, rpcPath string)

ServeByHTTP serves

func (*Server) ServeByMux added in v1.1.2

func (s *Server) ServeByMux(ln net.Listener, rpcPath string, mux *http.ServeMux)

ServeByMux serves

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)

ServeHTTP implements net handler interface

func (*Server) ServeListener

func (s *Server) ServeListener(ln net.Listener)

ServeListener starts

func (*Server) ServeTLS

func (s *Server) ServeTLS(network, address string, config *tls.Config) (err error)

ServeTLS starts and listens RPC requests. It is blocked until receiving connectings from clients.

func (*Server) Start

func (s *Server) Start(network, address string) (err error)

Start starts and listens RPC requests without blocking.

func (*Server) StartTLS

func (s *Server) StartTLS(network, address string, config *tls.Config) (err error)

StartTLS starts and listens RPC requests without blocking.

type ServerCodecFunc

type ServerCodecFunc func(conn io.ReadWriteCloser) rpc.ServerCodec

ServerCodecFunc is used to create a rpc.ServerCodec from net.Conn.

type ServerPluginContainer

type ServerPluginContainer struct {
	// contains filtered or unexported fields
}

ServerPluginContainer implements IPluginContainer interface.

func (*ServerPluginContainer) Add

func (p *ServerPluginContainer) Add(plugin IPlugin) error

Add adds a plugin.

func (*ServerPluginContainer) DoPostConnAccept

func (p *ServerPluginContainer) DoPostConnAccept(conn net.Conn) (net.Conn, bool)

DoPostConnAccept handles accepted conn

func (*ServerPluginContainer) DoPostReadRequestBody

func (p *ServerPluginContainer) DoPostReadRequestBody(body interface{}) error

DoPostReadRequestBody invokes DoPostReadRequestBody plugin.

func (*ServerPluginContainer) DoPostReadRequestHeader

func (p *ServerPluginContainer) DoPostReadRequestHeader(r *rpc.Request) error

DoPostReadRequestHeader invokes DoPostReadRequestHeader plugin.

func (*ServerPluginContainer) DoPostWriteResponse

func (p *ServerPluginContainer) DoPostWriteResponse(resp *rpc.Response, body interface{}) error

DoPostWriteResponse invokes DoPostWriteResponse plugin.

func (*ServerPluginContainer) DoPreReadRequestBody

func (p *ServerPluginContainer) DoPreReadRequestBody(body interface{}) error

DoPreReadRequestBody invokes DoPreReadRequestBody plugin.

func (*ServerPluginContainer) DoPreReadRequestHeader

func (p *ServerPluginContainer) DoPreReadRequestHeader(r *rpc.Request) error

DoPreReadRequestHeader invokes DoPreReadRequestHeader plugin.

func (*ServerPluginContainer) DoPreWriteResponse

func (p *ServerPluginContainer) DoPreWriteResponse(resp *rpc.Response, body interface{}) error

DoPreWriteResponse invokes DoPreWriteResponse plugin.

func (*ServerPluginContainer) DoRegister

func (p *ServerPluginContainer) DoRegister(name string, rcvr interface{}, metadata ...string) error

DoRegister invokes DoRegister plugin.

func (*ServerPluginContainer) GetAll

func (p *ServerPluginContainer) GetAll() []IPlugin

GetAll returns all activated plugins

func (*ServerPluginContainer) GetByName

func (p *ServerPluginContainer) GetByName(pluginName string) IPlugin

GetByName returns a plugin instance by it's name

func (*ServerPluginContainer) GetName

func (p *ServerPluginContainer) GetName(plugin IPlugin) string

GetName returns the name of a plugin, if no GetName() implemented it returns an empty string ""

func (*ServerPluginContainer) Remove

func (p *ServerPluginContainer) Remove(pluginName string) error

Remove removes a plugin by it's name.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL