Documentation
¶
Index ¶
- Constants
- Variables
- type Breaker
- type Call
- type Client
- func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, ...) error
- func (client *Client) Close() error
- func (c *Client) Connect(network, address string) error
- func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, ...) *Call
- func (client *Client) IsClosing() bool
- func (client *Client) IsShutdown() bool
- func (client *Client) RegisterServerMessageChan(ch chan<- *protocol.Message)
- func (client *Client) SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)
- func (client *Client) UnregisterServerMessageChan()
- type FailMode
- type KVPair
- type Option
- type RPCClient
- type SelectMode
- type Selector
- type ServiceDiscovery
- type ServiceError
- type XClient
Constants ¶
const ( XVersion = "X-RPCX-Version" // rpcx版本 XMessageType = "X-RPCX-MesssageType" // 消息类型: http/tcp XHeartbeat = "X-RPCX-Heartbeat" // 消息是否为心跳包(默认值直接忽略) XOneway = "X-RPCX-Oneway" // 是否丢弃response XMessageStatusType = "X-RPCX-MessageStatusType" // 消息状态类型 XSerializeType = "X-RPCX-SerializeType" // 消息序列化类型:json、[]byte、pf XMessageID = "X-RPCX-MessageID" // 消息ID XServicePath = "X-RPCX-ServicePath" // 服务 XServiceMethod = "X-RPCX-ServiceMethod" // 服务中具体的方法 XMeta = "X-RPCX-Meta" // 元数据类似header中key-value对 XErrorMessage = "X-RPCX-ErrorMessage" // )
const ( // ReaderBuffsize is used for bufio reader. // 读缓存容量,默认16KB ReaderBuffsize = 16 * 1024 // WriterBuffsize is used for bufio writer. // 写缓存容量, 默认16KB WriterBuffsize = 16 * 1024 )
Variables ¶
var ( ErrShutdown = errors.New("connection is shut down") // connection关闭触发的error ErrUnsupportedCodec = errors.New("unsupported codec") // 目前rpcx提供格式不支持该内容完成编解码 )
ErrShutdown connection is closed.
var ( // ErrXClientShutdown xclient is shutdown. ErrXClientShutdown = errors.New("xClient is shut down") // ErrXClientNoServer selector can't found one server. ErrXClientNoServer = errors.New("can not found any server") ErrServerUnavailable = errors.New("selected server is unavilable") )
var DefaultOption = Option{ Retries: 3, RPCPath: share.DefaultRPCPath, ConnectTimeout: 10 * time.Second, SerializeType: protocol.MsgPack, CompressType: protocol.None, BackupLatency: 10 * time.Millisecond, }
DefaultOption is a common option configuration for client. 默认通用的client配置
Functions ¶
This section is empty.
Types ¶
type Breaker ¶
Breaker is a CircuitBreaker interface. 断路器接口
var CircuitBreaker Breaker = circuit.NewRateBreaker(0.95, 100)
CircuitBreaker is a default circuit breaker (RateBreaker(0.95, 100)). 默认断路器: 失败率达到0.95将service变为不可用,并在100ms之后再重试该service,成功close断路器,否则继续停止该service并在100ms再重新
type Call ¶
type Call struct { ServicePath string // The name of the service and method to call. ServiceMethod string // The name of the service and method to call. Metadata map[string]string //metadata ResMetadata map[string]string Args interface{} // The argument to the function (*struct). 提供rpc具体服务的func的参数 Reply interface{} // The reply from the function (*struct). 提供rpc具体服务的func的结果 Error error // After completion, the error status. 一次rpc执行完成后 error状态 Done chan *Call // Strobes when call is complete. Raw bool // raw message or not 是否为原始消息 }
Call represents an active RPC. RPC
type Client ¶
type Client struct { Conn net.Conn // connection Plugins PluginContainer // 插件容器 ServerMessageChan chan<- *protocol.Message // contains filtered or unexported fields }
Client represents a RPC client.
func (*Client) Call ¶
func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error
Call invokes the named function, waits for it to complete, and returns its error status. 同步调用,完成一次rpc 并返回当前的错误状态
func (*Client) Close ¶
Close calls the underlying connection's Close method. If the connection is already shutting down, ErrShutdown is returned. 关闭连接
func (*Client) Go ¶
func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
异步调用,当通过Go方法进行一次call请求,将返回该请求调用的Call对象(ServicePath服务地址、ServiceMethod服务方法、MetaData元数据、Args服务方法参数、Reply服务返回结果、Done完成call通道)
当本次call完成后将channel发送信号:完成后的Call对象 注意:本次Call绑定的channel 不存在时则新建,存在需要保证同时进行的rpc请求;没有buffered存放Call 会触发panic
func (*Client) IsShutdown ¶
IsShutdown client is shutdown or not.
func (*Client) RegisterServerMessageChan ¶
RegisterServerMessageChan registers the channel that receives server requests.
func (*Client) SendRaw ¶
func (client *Client) SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)
发送原始信息 不需要关注func参数和响应结果 需要使用者自行包装请求信息Message SendRaw sends raw messages. You don't care args and replys.
func (*Client) UnregisterServerMessageChan ¶
func (client *Client) UnregisterServerMessageChan()
UnregisterServerMessageChan removes ServerMessageChan.
type FailMode ¶
type FailMode int
FailMode decides how clients action when clients fail to invoke services
type Option ¶
type Option struct { // Group is used to select the services in the same group. Services set group info in their meta. // If it is empty, clients will ignore group. Group string // 设定services在相同的group便于service的隔离及管理;并且将group信息设置到meta data里;若未指定group 则client忽略 // Retries retries to send Retries int // 重试次数 // TLSConfig for tcp and quic TLSConfig *tls.Config // tls配置用于(tcp/quic协议) // kcp.BlockCrypt Block interface{} // 加密 // RPCPath for http connection RPCPath string // http连接转为rpc请求 指定的path //ConnectTimeout sets timeout for dialing ConnectTimeout time.Duration // connection 有效期 // ReadTimeout sets readdeadline for underlying net.Conns ReadTimeout time.Duration // 读有效期 // WriteTimeout sets writedeadline for underlying net.Conns WriteTimeout time.Duration // 写有效期 // BackupLatency is used for Failbackup mode. rpcx will sends another request if the first response doesn't return in BackupLatency time. BackupLatency time.Duration // Failbackup模式下,当第一个request在有效期内(latency time)未能返回结果,则rpcx会重新发送另一个request,两个request任意一个返回结果即认为请求成功 // Breaker is used to config CircuitBreaker GenBreaker func() Breaker // 断路器 SerializeType protocol.SerializeType // 编解码类型(一般client和server相同的编码格式) CompressType protocol.CompressType // 压缩类型 Heartbeat bool // 是否为心跳信息 HeartbeatInterval time.Duration // 心跳间隔 }
Option contains all options for creating clients. 关于client的配置选项
type RPCClient ¶
type RPCClient interface { Connect(network, address string) error // 连接 Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call // 异步调用 Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error // 同步调用 SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error) // 直连 发送原始内容 Close() error // 关闭connection RegisterServerMessageChan(ch chan<- *protocol.Message) // UnregisterServerMessageChan() IsClosing() bool IsShutdown() bool }
RPCClient is interface that defines one client to call one server. 定义client调用server的相关接口
type SelectMode ¶
type SelectMode int
SelectMode defines the algorithm of selecting a services from candidates.
const ( //RandomSelect is selecting randomly RandomSelect SelectMode = iota //RoundRobin is selecting by round robin RoundRobin //WeightedRoundRobin is selecting by weighted round robin WeightedRoundRobin //WeightedICMP is selecting by weighted Ping time WeightedICMP //ConsistentHash is selecting by hashing ConsistentHash //Closest is selecting the closest server Closest // SelectByUser is selecting by implementation of users SelectByUser = 1000 )
type Selector ¶
type Selector interface { Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string UpdateServer(servers map[string]string) }
Selector defines selector that selects one service from candidates.
type ServiceDiscovery ¶
type ServiceDiscovery interface { GetServices() []*KVPair // 获取注册的服务 WatchService() chan []*KVPair // 监听注册的服务 RemoveWatcher(ch chan []*KVPair) // 移除服务监听 Clone(servicePath string) ServiceDiscovery // 复制 Close() // 关闭 }
ServiceDiscovery defines ServiceDiscovery of zookeeper, etcd and consul 服务发现:提供zookeeper、etcd、consul注册中心、还有peer2peer、mutilpleServers、mDns、Inprocess
type ServiceError ¶
type ServiceError string // 服务端错误字符串形式
ServiceError is an error from server.
func (ServiceError) Error ¶
func (e ServiceError) Error() string
type XClient ¶
type XClient interface { SetPlugins(plugins PluginContainer) // 设定插件 SetSelector(s Selector) // 服务路由模式:随机、轮训、加权轮询、加权ICMP(ping时间)、一致性hash、基于地理位置就近选择、用户自定义路由 ConfigGeoSelector(latitude, longitude float64) // 指定地理位置 Auth(auth string) // 鉴权 Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call) (*Call, error) // 异步请求 Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error // 同步请求 Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error // 请求所有节点:请求成功只返回一个节点的结果;若是出现错误,也只会将一个节点error返回 Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error // SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error) // 自己封装protocol.Message 采用原始发送请求server Close() error // 关闭clien }
XClient is an interface that used by client with service discovery and service governance. One XClient is used only for one service. You should create multiple XClient for multiple services.
XClient用于具备服务发现和服务治理的client。 XClient与Service是一一关联的,定义多个service就需要对应个数的XClient XClient是在前面的client基础上增加路由、失败模式、超时机制、断路器等功能
func NewBidirectionalXClient ¶
func NewBidirectionalXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option, serverMessageChan chan<- *protocol.Message) XClient
NewBidirectionalXClient creates a new xclient that can receive notifications from servers.
正常RPC只有client向server的单向通信,而没有server向client的通信 通过该方法能够实现client与server双向通信,大体和NewXClient很类似
func NewXClient ¶
func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient
NewXClient creates a XClient that supports service discovery and service governance.
创建XClient:需指定失败模式、路由模式、服务发现方式、client额外选项