Documentation ¶
Index ¶
- Constants
- Variables
- func Hash(key uint64, buckets int32) int32
- func HashString(s string) uint64
- func JumpConsistentHash(len int, options ...interface{}) int
- type Breaker
- type Call
- type Client
- func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, ...) error
- func (client *Client) Close() error
- func (c *Client) Connect(network, address string) error
- func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, ...) *Call
- func (client *Client) IsClosing() bool
- func (client *Client) IsShutdown() bool
- func (client *Client) RegisterServerMessageChan(ch chan<- *protocol.Message)
- func (client *Client) SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)
- func (client *Client) UnregisterServerMessageChan()
- type ConsecCircuitBreaker
- type ConsistentAddrStrFunction
- type FailMode
- type HashServiceAndArgs
- type InprocessDiscovery
- type KVPair
- type MDNSDiscovery
- type MultipleServersDiscovery
- func (d MultipleServersDiscovery) Clone(servicePath string) ServiceDiscovery
- func (d *MultipleServersDiscovery) Close()
- func (d MultipleServersDiscovery) GetServices() []*KVPair
- func (d *MultipleServersDiscovery) RemoveWatcher(ch chan []*KVPair)
- func (d *MultipleServersDiscovery) Update(pairs []*KVPair)
- func (d *MultipleServersDiscovery) WatchService() chan []*KVPair
- type Option
- type Peer2PeerDiscovery
- type Plugin
- type PluginContainer
- type PostCallPlugin
- type PreCallPlugin
- type RPCClient
- type SelectMode
- type Selector
- type ServiceDiscovery
- func NewInprocessDiscovery() ServiceDiscovery
- func NewMDNSDiscovery(service string, timeout time.Duration, watchInterval time.Duration, ...) ServiceDiscovery
- func NewMDNSDiscoveryTemplate(timeout time.Duration, watchInterval time.Duration, domain string) ServiceDiscovery
- func NewMultipleServersDiscovery(pairs []*KVPair) ServiceDiscovery
- func NewPeer2PeerDiscovery(server, metadata string) ServiceDiscovery
- type ServiceError
- type Weighted
- type XClient
Constants ¶
const ( XVersion = "X-RPCX-Version" XMessageType = "X-RPCX-MesssageType" XHeartbeat = "X-RPCX-Heartbeat" XOneway = "X-RPCX-Oneway" XMessageStatusType = "X-RPCX-MessageStatusType" XSerializeType = "X-RPCX-SerializeType" XMessageID = "X-RPCX-MessageID" XServicePath = "X-RPCX-ServicePath" XServiceMethod = "X-RPCX-ServiceMethod" XMeta = "X-RPCX-Meta" XErrorMessage = "X-RPCX-ErrorMessage" )
const ( // ReaderBuffsize is used for bufio reader. ReaderBuffsize = 16 * 1024 // WriterBuffsize is used for bufio writer. WriterBuffsize = 16 * 1024 )
Variables ¶
var ( ErrBreakerOpen = errors.New("breaker open") ErrBreakerTimeout = errors.New("breaker time out") )
var ( ErrShutdown = errors.New("connection is shut down") ErrUnsupportedCodec = errors.New("unsupported codec") )
ErrShutdown connection is closed.
var ( // ErrXClientShutdown xclient is shutdown. ErrXClientShutdown = errors.New("xClient is shut down") // ErrXClientNoServer selector can't found one server. ErrXClientNoServer = errors.New("can not found any server") ErrServerUnavailable = errors.New("selected server is unavilable") )
var DefaultOption = Option{ Retries: 3, RPCPath: share.DefaultRPCPath, ConnectTimeout: 10 * time.Second, SerializeType: protocol.MsgPack, CompressType: protocol.None, BackupLatency: 10 * time.Millisecond, }
DefaultOption is a common option configuration for client.
var InprocessClient = &inprocessClient{ services: make(map[string]interface{}), methods: make(map[string]*reflect.Value), }
InprocessClient is a in-process client for test.
Functions ¶
func Hash ¶
Hash consistently chooses a hash bucket number in the range [0, numBuckets) for the given key. numBuckets must be >= 1.
func JumpConsistentHash ¶
JumpConsistentHash selects a server by serviceMethod and args
Types ¶
type Breaker ¶
Breaker is a CircuitBreaker interface.
var CircuitBreaker Breaker = circuit.NewRateBreaker(0.95, 100)
CircuitBreaker is a default circuit breaker (RateBreaker(0.95, 100)).
type Call ¶
type Call struct { ServicePath string // The name of the service and method to call. ServiceMethod string // The name of the service and method to call. Metadata map[string]string //metadata ResMetadata map[string]string Args interface{} // The argument to the function (*struct). Reply interface{} // The reply from the function (*struct). Error error // After completion, the error status. Done chan *Call // Strobes when call is complete. Raw bool // raw message or not }
Call represents an active RPC.
type Client ¶
type Client struct { Conn net.Conn Plugins PluginContainer ServerMessageChan chan<- *protocol.Message // contains filtered or unexported fields }
Client represents a RPC client.
func (*Client) Call ¶
func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error
Call invokes the named function, waits for it to complete, and returns its error status.
func (*Client) Close ¶
Close calls the underlying codec's Close method. If the connection is already shutting down, ErrShutdown is returned.
func (*Client) Go ¶
func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
Go invokes the function asynchronously. It returns the Call structure representing the invocation. The done channel will signal when the call is complete by returning the same Call object. If done is nil, Go will allocate a new channel. If non-nil, done must be buffered or Go will deliberately crash.
func (*Client) IsShutdown ¶
IsShutdown client is shutdown or not.
func (*Client) RegisterServerMessageChan ¶
RegisterServerMessageChan registers the channel that receives server requests.
func (*Client) SendRaw ¶
func (client *Client) SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)
SendRaw sends raw messages. You don't care args and replys.
func (*Client) UnregisterServerMessageChan ¶
func (client *Client) UnregisterServerMessageChan()
UnregisterServerMessageChan removes ServerMessageChan.
type ConsecCircuitBreaker ¶
type ConsecCircuitBreaker struct {
// contains filtered or unexported fields
}
ConsecCircuitBreaker is window sliding CircuitBreaker with failure threshold.
func NewConsecCircuitBreaker ¶
func NewConsecCircuitBreaker(failureThreshold uint64, window time.Duration) *ConsecCircuitBreaker
NewConsecCircuitBreaker returns a new ConsecCircuitBreaker.
type ConsistentAddrStrFunction ¶
type ConsistentAddrStrFunction func(options ...interface{}) string
ConsistentFunction define a hash function Return service address, like "tcp@127.0.0.1:8970"
type FailMode ¶
type FailMode int
FailMode decides how clients action when clients fail to invoke services
type HashServiceAndArgs ¶
HashServiceAndArgs define a hash function
type InprocessDiscovery ¶
type InprocessDiscovery struct { }
InprocessDiscovery is a in-process service discovery. Clients and servers are in one process and communicate without tcp/udp.
func (InprocessDiscovery) Clone ¶
func (d InprocessDiscovery) Clone(servicePath string) ServiceDiscovery
Clone clones this ServiceDiscovery with new servicePath.
func (*InprocessDiscovery) Close ¶
func (d *InprocessDiscovery) Close()
func (InprocessDiscovery) GetServices ¶
func (d InprocessDiscovery) GetServices() []*KVPair
GetServices returns the static server
func (InprocessDiscovery) RemoveWatcher ¶
func (d InprocessDiscovery) RemoveWatcher(ch chan []*KVPair)
func (InprocessDiscovery) WatchService ¶
func (d InprocessDiscovery) WatchService() chan []*KVPair
WatchService returns a nil chan.
type MDNSDiscovery ¶
type MDNSDiscovery struct { Timeout time.Duration WatchInterval time.Duration // contains filtered or unexported fields }
MDNSDiscovery is a mdns service discovery. It always returns the registered servers in etcd.
func (MDNSDiscovery) Clone ¶
func (d MDNSDiscovery) Clone(servicePath string) ServiceDiscovery
Clone clones this ServiceDiscovery with new servicePath.
func (*MDNSDiscovery) Close ¶
func (d *MDNSDiscovery) Close()
func (MDNSDiscovery) GetServices ¶
func (d MDNSDiscovery) GetServices() []*KVPair
GetServices returns the servers
func (*MDNSDiscovery) RemoveWatcher ¶
func (d *MDNSDiscovery) RemoveWatcher(ch chan []*KVPair)
func (*MDNSDiscovery) WatchService ¶
func (d *MDNSDiscovery) WatchService() chan []*KVPair
WatchService returns a nil chan.
type MultipleServersDiscovery ¶
type MultipleServersDiscovery struct {
// contains filtered or unexported fields
}
MultipleServersDiscovery is a multiple servers service discovery. It always returns the current servers and uses can change servers dynamically.
func (MultipleServersDiscovery) Clone ¶
func (d MultipleServersDiscovery) Clone(servicePath string) ServiceDiscovery
Clone clones this ServiceDiscovery with new servicePath.
func (*MultipleServersDiscovery) Close ¶
func (d *MultipleServersDiscovery) Close()
func (MultipleServersDiscovery) GetServices ¶
func (d MultipleServersDiscovery) GetServices() []*KVPair
GetServices returns the configured server
func (*MultipleServersDiscovery) RemoveWatcher ¶
func (d *MultipleServersDiscovery) RemoveWatcher(ch chan []*KVPair)
func (*MultipleServersDiscovery) Update ¶
func (d *MultipleServersDiscovery) Update(pairs []*KVPair)
Update is used to update servers at runtime.
func (*MultipleServersDiscovery) WatchService ¶
func (d *MultipleServersDiscovery) WatchService() chan []*KVPair
WatchService returns a nil chan.
type Option ¶
type Option struct { // Group is used to select the services in the same group. Services set group info in their meta. // If it is empty, clients will ignore group. Group string // Retries retries to send Retries int // TLSConfig for tcp and quic TLSConfig *tls.Config // kcp.BlockCrypt Block interface{} // RPCPath for http connection RPCPath string //ConnectTimeout sets timeout for dialing ConnectTimeout time.Duration // ReadTimeout sets readdeadline for underlying net.Conns ReadTimeout time.Duration // WriteTimeout sets writedeadline for underlying net.Conns WriteTimeout time.Duration // BackupLatency is used for Failbackup mode. rpcx will sends another request if the first response doesn't return in BackupLatency time. BackupLatency time.Duration // Breaker is used to config CircuitBreaker Breaker Breaker SerializeType protocol.SerializeType CompressType protocol.CompressType Heartbeat bool HeartbeatInterval time.Duration }
Option contains all options for creating clients.
type Peer2PeerDiscovery ¶
type Peer2PeerDiscovery struct {
// contains filtered or unexported fields
}
Peer2PeerDiscovery is a peer-to-peer service discovery. It always returns the static server.
func (Peer2PeerDiscovery) Clone ¶
func (d Peer2PeerDiscovery) Clone(servicePath string) ServiceDiscovery
Clone clones this ServiceDiscovery with new servicePath.
func (*Peer2PeerDiscovery) Close ¶
func (d *Peer2PeerDiscovery) Close()
func (Peer2PeerDiscovery) GetServices ¶
func (d Peer2PeerDiscovery) GetServices() []*KVPair
GetServices returns the static server
func (*Peer2PeerDiscovery) RemoveWatcher ¶
func (d *Peer2PeerDiscovery) RemoveWatcher(ch chan []*KVPair)
func (Peer2PeerDiscovery) WatchService ¶
func (d Peer2PeerDiscovery) WatchService() chan []*KVPair
WatchService returns a nil chan.
type PluginContainer ¶
type PluginContainer interface { Add(plugin Plugin) Remove(plugin Plugin) All() []Plugin DoPreCall(ctx context.Context, servicePath, serviceMethod string, args interface{}) error DoPostCall(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, err error) error }
PluginContainer represents a plugin container that defines all methods to manage plugins. And it also defines all extension points.
type PostCallPlugin ¶
type PostCallPlugin interface {
DoPostCall(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, err error) error
}
PostCallPlugin is invoked after the client calls a server.
type PreCallPlugin ¶
type PreCallPlugin interface {
DoPreCall(ctx context.Context, servicePath, serviceMethod string, args interface{}) error
}
PreCallPlugin is invoked before the client calls a server.
type RPCClient ¶
type RPCClient interface { Connect(network, address string) error Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error) Close() error RegisterServerMessageChan(ch chan<- *protocol.Message) UnregisterServerMessageChan() IsClosing() bool IsShutdown() bool }
RPCClient is interface that defines one client to call one server.
type SelectMode ¶
type SelectMode int
SelectMode defines the algorithm of selecting a services from candidates.
const ( //RandomSelect is selecting randomly RandomSelect SelectMode = iota //RoundRobin is selecting by round robin RoundRobin //WeightedRoundRobin is selecting by weighted round robin WeightedRoundRobin //WeightedICMP is selecting by weighted Ping time WeightedICMP //ConsistentHash is selecting by hashing ConsistentHash //Closest is selecting the closest server Closest // SelectByUser is selecting by implementation of users SelectByUser = 1000 )
func (SelectMode) String ¶
func (s SelectMode) String() string
type Selector ¶
type Selector interface { Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string UpdateServer(servers map[string]string) }
Selector defines selector that selects one service from candidates.
type ServiceDiscovery ¶
type ServiceDiscovery interface { GetServices() []*KVPair WatchService() chan []*KVPair RemoveWatcher(ch chan []*KVPair) Clone(servicePath string) ServiceDiscovery Close() }
ServiceDiscovery defines ServiceDiscovery of zookeeper, etcd and consul
func NewInprocessDiscovery ¶
func NewInprocessDiscovery() ServiceDiscovery
NewInprocessDiscovery returns a new InprocessDiscovery.
func NewMDNSDiscovery ¶
func NewMDNSDiscovery(service string, timeout time.Duration, watchInterval time.Duration, domain string) ServiceDiscovery
NewMDNSDiscovery returns a new MDNSDiscovery. If domain is empty, use "local." in default.
func NewMDNSDiscoveryTemplate ¶
func NewMDNSDiscoveryTemplate(timeout time.Duration, watchInterval time.Duration, domain string) ServiceDiscovery
NewMDNSDiscoveryTemplate returns a new MDNSDiscovery template.
func NewMultipleServersDiscovery ¶
func NewMultipleServersDiscovery(pairs []*KVPair) ServiceDiscovery
NewMultipleServersDiscovery returns a new MultipleServersDiscovery.
func NewPeer2PeerDiscovery ¶
func NewPeer2PeerDiscovery(server, metadata string) ServiceDiscovery
NewPeer2PeerDiscovery returns a new Peer2PeerDiscovery.
type ServiceError ¶
type ServiceError string
ServiceError is an error from server.
func (ServiceError) Error ¶
func (e ServiceError) Error() string
type XClient ¶
type XClient interface { SetPlugins(plugins PluginContainer) SetSelector(s Selector) ConfigGeoSelector(latitude, longitude float64) Auth(auth string) Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call) (*Call, error) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error) Close() error }
XClient is an interface that used by client with service discovery and service governance. One XClient is used only for one service. You should create multiple XClient for multiple services.
func NewBidirectionalXClient ¶
func NewBidirectionalXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option, serverMessageChan chan<- *protocol.Message) XClient
NewBidirectionalXClient creates a new xclient that can receive notifications from servers.
func NewXClient ¶
func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient
NewXClient creates a XClient that supports service discovery and service governance.
Source Files ¶
- circuit_breaker.go
- client.go
- connection.go
- connection_nonkcp.go
- connection_nonquic.go
- geo_utils.go
- hash_utils.go
- inprocess_client.go
- inprocess_discovery.go
- mdns_discovery.go
- mode.go
- multiple_servers_discovery.go
- peer2peer_discovery.go
- ping_excluded.go
- plugin.go
- selector.go
- smooth-weighted-round-robin.go
- xclient.go