Documentation ¶
Index ¶
- Constants
- Variables
- func CalculateWeight(rtt int) int
- func Hash(key uint64, buckets int32) int32
- func HashString(s string) uint64
- func JumpConsistentHash(len int, options ...interface{}) int
- func Ping(host string) (rtt int, err error)
- type Breaker
- type Call
- type Client
- func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, ...) error
- func (client *Client) Close() error
- func (c *Client) Connect(network, address string) error
- func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, ...) *Call
- func (client *Client) IsClosing() bool
- func (client *Client) IsShutdown() bool
- func (client *Client) RegisterServerMessageChan(ch chan<- *protocol.Message)
- func (c *Client) RemoteAddr() string
- func (client *Client) SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)
- func (client *Client) UnregisterServerMessageChan()
- type ClientAfterDecodePlugin
- type ClientBeforeEncodePlugin
- type ClientConnectedPlugin
- type ClientConnectionClosePlugin
- type ConnCreatedPlugin
- type ConnFactoryFn
- type ConsecCircuitBreaker
- type ConsistentAddrStrFunction
- type ConsulDiscovery
- func (d *ConsulDiscovery) Clone(servicePath string) (ServiceDiscovery, error)
- func (d *ConsulDiscovery) Close()
- func (d *ConsulDiscovery) GetServices() []*KVPair
- func (d *ConsulDiscovery) RemoveWatcher(ch chan []*KVPair)
- func (d *ConsulDiscovery) SetFilter(filter ServiceDiscoveryFilter)
- func (d *ConsulDiscovery) WatchService() chan []*KVPair
- type DNSDiscovery
- func (d *DNSDiscovery) Clone(servicePath string) (ServiceDiscovery, error)
- func (d *DNSDiscovery) Close()
- func (d *DNSDiscovery) GetServices() []*KVPair
- func (d *DNSDiscovery) RemoveWatcher(ch chan []*KVPair)
- func (d *DNSDiscovery) SetFilter(filter ServiceDiscoveryFilter)
- func (d *DNSDiscovery) WatchService() chan []*KVPair
- type FailMode
- type HashServiceAndArgs
- type KVPair
- type MDNSDiscovery
- func (d *MDNSDiscovery) Clone(servicePath string) (ServiceDiscovery, error)
- func (d *MDNSDiscovery) Close()
- func (d *MDNSDiscovery) GetServices() []*KVPair
- func (d *MDNSDiscovery) RemoveWatcher(ch chan []*KVPair)
- func (d *MDNSDiscovery) SetFilter(filter ServiceDiscoveryFilter)
- func (d *MDNSDiscovery) WatchService() chan []*KVPair
- type MultipleServersDiscovery
- func (d *MultipleServersDiscovery) Clone(servicePath string) (ServiceDiscovery, error)
- func (d *MultipleServersDiscovery) Close()
- func (d *MultipleServersDiscovery) GetServices() []*KVPair
- func (d *MultipleServersDiscovery) RemoveWatcher(ch chan []*KVPair)
- func (d *MultipleServersDiscovery) SetFilter(filter ServiceDiscoveryFilter)
- func (d *MultipleServersDiscovery) Update(pairs []*KVPair)
- func (d *MultipleServersDiscovery) WatchService() chan []*KVPair
- type NacosDiscovery
- func (d *NacosDiscovery) Clone(servicePath string) (ServiceDiscovery, error)
- func (d *NacosDiscovery) Close()
- func (d *NacosDiscovery) GetServices() []*KVPair
- func (d *NacosDiscovery) RemoveWatcher(ch chan []*KVPair)
- func (d *NacosDiscovery) SetFilter(filter ServiceDiscoveryFilter)
- func (d *NacosDiscovery) WatchService() chan []*KVPair
- type OneClient
- func (c *OneClient) Auth(auth string)
- func (c *OneClient) Broadcast(ctx context.Context, servicePath string, serviceMethod string, ...) error
- func (c *OneClient) Call(ctx context.Context, servicePath string, serviceMethod string, ...) error
- func (c *OneClient) Close() error
- func (c *OneClient) ConfigGeoSelector(latitude, longitude float64)
- func (c *OneClient) DownloadFile(ctx context.Context, requestFileName string, saveTo io.Writer, ...) error
- func (c *OneClient) Fork(ctx context.Context, servicePath string, serviceMethod string, ...) error
- func (c *OneClient) GetPlugins() PluginContainer
- func (c *OneClient) Go(ctx context.Context, servicePath string, serviceMethod string, ...) (*Call, error)
- func (c *OneClient) SendFile(ctx context.Context, fileName string, rateInBytesPerSecond int64, ...) error
- func (c *OneClient) SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)
- func (c *OneClient) SetPlugins(plugins PluginContainer)
- func (c *OneClient) SetSelector(servicePath string, s Selector)
- func (c *OneClient) Stream(ctx context.Context, meta map[string]string) (net.Conn, error)
- type OneClientPool
- type OpenCensusPlugin
- type OpenTracingPlugin
- type Option
- type Peer2PeerDiscovery
- func (d *Peer2PeerDiscovery) Clone(servicePath string) (ServiceDiscovery, error)
- func (d *Peer2PeerDiscovery) Close()
- func (d *Peer2PeerDiscovery) GetServices() []*KVPair
- func (d *Peer2PeerDiscovery) RemoveWatcher(ch chan []*KVPair)
- func (d *Peer2PeerDiscovery) SetFilter(filter ServiceDiscoveryFilter)
- func (d *Peer2PeerDiscovery) WatchService() chan []*KVPair
- type Plugin
- type PluginContainer
- type PostCallPlugin
- type PreCallPlugin
- type RPCClient
- type RedisDiscovery
- func (d *RedisDiscovery) Clone(servicePath string) (ServiceDiscovery, error)
- func (d *RedisDiscovery) Close()
- func (d *RedisDiscovery) GetServices() []*KVPair
- func (d *RedisDiscovery) RemoveWatcher(ch chan []*KVPair)
- func (d *RedisDiscovery) SetFilter(filter ServiceDiscoveryFilter)
- func (d *RedisDiscovery) WatchService() chan []*KVPair
- type SelectFunc
- type SelectMode
- type SelectNodePlugin
- type Selector
- type ServiceDiscovery
- func NewConsulDiscovery(basePath, servicePath string, consulAddr []string, options *store.Config) (ServiceDiscovery, error)
- func NewConsulDiscoveryStore(basePath string, kv store.Store) (ServiceDiscovery, error)
- func NewConsulDiscoveryTemplate(basePath string, consulAddr []string, options *store.Config) (ServiceDiscovery, error)
- func NewDNSDiscovery(domain string, network string, port int, d time.Duration) (ServiceDiscovery, error)
- func NewMDNSDiscovery(service string, timeout time.Duration, watchInterval time.Duration, ...) (ServiceDiscovery, error)
- func NewMultipleServersDiscovery(pairs []*KVPair) (ServiceDiscovery, error)
- func NewNacosDiscovery(servicePath string, cluster string, clientConfig constant.ClientConfig, ...) (ServiceDiscovery, error)
- func NewNacosDiscoveryWithClient(servicePath string, cluster string, namingClient naming_client.INamingClient) ServiceDiscovery
- func NewPeer2PeerDiscovery(server, metadata string) (ServiceDiscovery, error)
- func NewRedisDiscovery(basePath string, servicePath string, etcdAddr []string, options *store.Config) (ServiceDiscovery, error)
- func NewRedisDiscoveryStore(basePath string, kv store.Store) (ServiceDiscovery, error)
- func NewRedisDiscoveryTemplate(basePath string, etcdAddr []string, options *store.Config) (ServiceDiscovery, error)
- func NewZookeeperDiscovery(basePath string, servicePath string, zkAddr []string, options *store.Config) (ServiceDiscovery, error)
- func NewZookeeperDiscoveryTemplate(basePath string, zkAddr []string, options *store.Config) (ServiceDiscovery, error)
- func NewZookeeperDiscoveryWithStore(basePath string, kv store.Store) (ServiceDiscovery, error)
- type ServiceDiscoveryFilter
- type ServiceError
- type Weighted
- type XClient
- type XClientPool
- type ZookeeperDiscovery
- func (d *ZookeeperDiscovery) Clone(servicePath string) (ServiceDiscovery, error)
- func (d *ZookeeperDiscovery) Close()
- func (d *ZookeeperDiscovery) GetServices() []*KVPair
- func (d *ZookeeperDiscovery) RemoveWatcher(ch chan []*KVPair)
- func (d *ZookeeperDiscovery) SetFilter(filter ServiceDiscoveryFilter)
- func (d *ZookeeperDiscovery) WatchService() chan []*KVPair
Constants ¶
const ( XVersion = "X-RPCX-Version" XMessageType = "X-RPCX-MesssageType" XHeartbeat = "X-RPCX-Heartbeat" XOneway = "X-RPCX-Oneway" XMessageStatusType = "X-RPCX-MessageStatusType" XSerializeType = "X-RPCX-SerializeType" XMessageID = "X-RPCX-MessageID" XServicePath = "X-RPCX-ServicePath" XServiceMethod = "X-RPCX-ServiceMethod" XMeta = "X-RPCX-Meta" XErrorMessage = "X-RPCX-ErrorMessage" )
const ( // ReaderBuffsize is used for bufio reader. ReaderBuffsize = 16 * 1024 // WriterBuffsize is used for bufio writer. WriterBuffsize = 16 * 1024 )
const (
FileTransferBufferSize = 1024
)
Variables ¶
var ( ErrBreakerOpen = errors.New("breaker open") ErrBreakerTimeout = errors.New("breaker time out") )
var ( ErrShutdown = errors.New("connection is shut down") ErrUnsupportedCodec = errors.New("unsupported codec") )
ErrShutdown indicates that the connection is closed.
var ( // ErrXClientShutdown xclient is shutdown. ErrXClientShutdown = errors.New("xClient is shut down") // ErrXClientNoServer selector can't found one server. ErrXClientNoServer = errors.New("can not found any server") ErrServerUnavailable = errors.New("selected server is unavilable") )
var ConnFactories = map[string]ConnFactoryFn{
"http": newDirectHTTPConn,
"kcp": newDirectKCPConn,
"quic": newDirectQuicConn,
"unix": newDirectConn,
}
var DefaultOption = Option{ Retries: 3, RPCPath: share.DefaultRPCPath, ConnectTimeout: 10 * time.Second, SerializeType: protocol.MsgPack, CompressType: protocol.None, BackupLatency: 10 * time.Millisecond, MaxWaitForHeartbeat: 30 * time.Second, TCPKeepAlivePeriod: time.Minute, }
DefaultOption is a common option configuration for client.
Functions ¶
func CalculateWeight ¶
CalculateWeight converts the rtt to a weight by:
- weight=191 if t <= 10
- weight=201 -t if 10 < t <=200
- weight=1 if 200 < t < 1000
- weight = 0 if t >= 1000
It means servers whose ping time t <= 10 will be preferred, and servers won't be selected if t >= 1000. It is hard coded based on Ops experience.
func Hash ¶
Hash consistently chooses a hash bucket number in the range [0, buckets) for the given key. buckets must be >= 1.
func JumpConsistentHash ¶
JumpConsistentHash selects a server by serviceMethod and args
Types ¶
type Breaker ¶
Breaker is a CircuitBreaker interface.
var CircuitBreaker Breaker = circuit.NewRateBreaker(0.95, 100)
CircuitBreaker is a default circuit breaker (RateBreaker(0.95, 100)).
type Call ¶
type Call struct { ServicePath string // The name of the service and method to call. ServiceMethod string // The name of the service and method to call. Metadata map[string]string // metadata ResMetadata map[string]string Args interface{} // The argument to the function (*struct). Reply interface{} // The reply from the function (*struct). Error error // After completion, the error status. Done chan *Call // Strobes when call is complete. Raw bool // raw message or not }
Call represents an active RPC.
type Client ¶
type Client struct { Conn net.Conn Plugins PluginContainer ServerMessageChanMu sync.RWMutex ServerMessageChan chan<- *protocol.Message // contains filtered or unexported fields }
Client represents a RPC client.
func (*Client) Call ¶
func (client *Client) Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error
Call invokes the named function, waits for it to complete, and returns its error status.
func (*Client) Close ¶
Close calls the underlying connection's Close method. If the connection is already shutting down, ErrShutdown is returned.
func (*Client) Go ¶
func (client *Client) Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call
Go invokes the function asynchronously. It returns the Call structure representing the invocation. The done channel will signal when the call is complete by returning the same Call object. If done is nil, Go will allocate a new channel. If non-nil, done must be buffered or Go will deliberately crash.
func (*Client) IsShutdown ¶
IsShutdown client is shutdown or not.
func (*Client) RegisterServerMessageChan ¶
RegisterServerMessageChan registers the channel that receives server requests.
func (*Client) RemoteAddr ¶
RemoteAddr returns the remote address.
func (*Client) SendRaw ¶
func (client *Client) SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error)
SendRaw sends raw messages. You don't need to care about args and replies.
func (*Client) UnregisterServerMessageChan ¶
func (client *Client) UnregisterServerMessageChan()
UnregisterServerMessageChan removes ServerMessageChan.
type ClientAfterDecodePlugin ¶
ClientAfterDecodePlugin is invoked when the message is decoded.
type ClientBeforeEncodePlugin ¶
ClientBeforeEncodePlugin is invoked when the message is encoded and sent.
type ClientConnectedPlugin ¶
ClientConnectedPlugin is invoked when the client has connected to the server.
type ClientConnectionClosePlugin ¶
ClientConnectionClosePlugin is invoked when the connection is closing.
type ConnCreatedPlugin ¶
ConnCreatedPlugin is invoked when the client connection has been created.
type ConsecCircuitBreaker ¶
type ConsecCircuitBreaker struct {
// contains filtered or unexported fields
}
ConsecCircuitBreaker is a sliding-window CircuitBreaker with a failure threshold.
func NewConsecCircuitBreaker ¶
func NewConsecCircuitBreaker(failureThreshold uint64, window time.Duration) *ConsecCircuitBreaker
NewConsecCircuitBreaker returns a new ConsecCircuitBreaker.
func (*ConsecCircuitBreaker) Call ¶
func (cb *ConsecCircuitBreaker) Call(fn func() error, d time.Duration) error
Call Circuit function
func (*ConsecCircuitBreaker) Fail ¶
func (cb *ConsecCircuitBreaker) Fail()
func (*ConsecCircuitBreaker) Ready ¶
func (cb *ConsecCircuitBreaker) Ready() bool
func (*ConsecCircuitBreaker) Success ¶
func (cb *ConsecCircuitBreaker) Success()
type ConsistentAddrStrFunction ¶
type ConsistentAddrStrFunction func(options ...interface{}) string
ConsistentAddrStrFunction defines a hash function that returns a service address, like "tcp@127.0.0.1:8970".
type ConsulDiscovery ¶
type ConsulDiscovery struct { // -1 means it always retry to watch until zookeeper is ok, 0 means no retry. RetriesAfterWatchFailed int // contains filtered or unexported fields }
ConsulDiscovery is a consul service discovery. It always returns the registered servers in consul.
func (*ConsulDiscovery) Clone ¶
func (d *ConsulDiscovery) Clone(servicePath string) (ServiceDiscovery, error)
Clone clones this ServiceDiscovery with new servicePath.
func (*ConsulDiscovery) Close ¶
func (d *ConsulDiscovery) Close()
func (*ConsulDiscovery) GetServices ¶
func (d *ConsulDiscovery) GetServices() []*KVPair
GetServices returns the servers
func (*ConsulDiscovery) RemoveWatcher ¶
func (d *ConsulDiscovery) RemoveWatcher(ch chan []*KVPair)
func (*ConsulDiscovery) SetFilter ¶
func (d *ConsulDiscovery) SetFilter(filter ServiceDiscoveryFilter)
SetFilter sets the filter.
func (*ConsulDiscovery) WatchService ¶
func (d *ConsulDiscovery) WatchService() chan []*KVPair
WatchService returns a nil chan.
type DNSDiscovery ¶
type DNSDiscovery struct {
// contains filtered or unexported fields
}
DNSDiscovery is based on DNS A records. You must set port and network info when you create the DNSDiscovery.
func (*DNSDiscovery) Clone ¶
func (d *DNSDiscovery) Clone(servicePath string) (ServiceDiscovery, error)
Clone clones this ServiceDiscovery with new servicePath.
func (*DNSDiscovery) Close ¶
func (d *DNSDiscovery) Close()
func (*DNSDiscovery) GetServices ¶
func (d *DNSDiscovery) GetServices() []*KVPair
GetServices returns the static server
func (*DNSDiscovery) RemoveWatcher ¶
func (d *DNSDiscovery) RemoveWatcher(ch chan []*KVPair)
func (*DNSDiscovery) SetFilter ¶
func (d *DNSDiscovery) SetFilter(filter ServiceDiscoveryFilter)
SetFilter sets the filter.
func (*DNSDiscovery) WatchService ¶
func (d *DNSDiscovery) WatchService() chan []*KVPair
WatchService returns a nil chan.
type FailMode ¶
type FailMode int
FailMode decides how clients act when they fail to invoke services.
func FailModeString ¶
FailModeString retrieves an enum value from the enum constants string name. Returns an error if the param is not part of the enum.
func FailModeValues ¶
func FailModeValues() []FailMode
FailModeValues returns all values of the enum
func (FailMode) IsAFailMode ¶
IsAFailMode returns "true" if the value is listed in the enum definition. "false" otherwise
type HashServiceAndArgs ¶
HashServiceAndArgs define a hash function
type MDNSDiscovery ¶
type MDNSDiscovery struct { Timeout time.Duration WatchInterval time.Duration // contains filtered or unexported fields }
MDNSDiscovery is an mDNS service discovery. It always returns the servers registered via mDNS.
func (*MDNSDiscovery) Clone ¶
func (d *MDNSDiscovery) Clone(servicePath string) (ServiceDiscovery, error)
Clone clones this ServiceDiscovery with new servicePath.
func (*MDNSDiscovery) Close ¶
func (d *MDNSDiscovery) Close()
func (*MDNSDiscovery) GetServices ¶
func (d *MDNSDiscovery) GetServices() []*KVPair
GetServices returns the servers
func (*MDNSDiscovery) RemoveWatcher ¶
func (d *MDNSDiscovery) RemoveWatcher(ch chan []*KVPair)
func (*MDNSDiscovery) SetFilter ¶
func (d *MDNSDiscovery) SetFilter(filter ServiceDiscoveryFilter)
SetFilter sets the filter.
func (*MDNSDiscovery) WatchService ¶
func (d *MDNSDiscovery) WatchService() chan []*KVPair
WatchService returns a nil chan.
type MultipleServersDiscovery ¶
type MultipleServersDiscovery struct {
// contains filtered or unexported fields
}
MultipleServersDiscovery is a multiple servers service discovery. It always returns the current servers, and users can change servers dynamically.
func (*MultipleServersDiscovery) Clone ¶
func (d *MultipleServersDiscovery) Clone(servicePath string) (ServiceDiscovery, error)
Clone clones this ServiceDiscovery with new servicePath.
func (*MultipleServersDiscovery) Close ¶
func (d *MultipleServersDiscovery) Close()
func (*MultipleServersDiscovery) GetServices ¶
func (d *MultipleServersDiscovery) GetServices() []*KVPair
GetServices returns the configured server
func (*MultipleServersDiscovery) RemoveWatcher ¶
func (d *MultipleServersDiscovery) RemoveWatcher(ch chan []*KVPair)
func (*MultipleServersDiscovery) SetFilter ¶
func (d *MultipleServersDiscovery) SetFilter(filter ServiceDiscoveryFilter)
SetFilter sets the filter.
func (*MultipleServersDiscovery) Update ¶
func (d *MultipleServersDiscovery) Update(pairs []*KVPair)
Update is used to update servers at runtime.
func (*MultipleServersDiscovery) WatchService ¶
func (d *MultipleServersDiscovery) WatchService() chan []*KVPair
WatchService returns a nil chan.
type NacosDiscovery ¶
type NacosDiscovery struct { // nacos client config ClientConfig constant.ClientConfig // nacos server config ServerConfig []constant.ServerConfig Cluster string RetriesAfterWatchFailed int // contains filtered or unexported fields }
NacosDiscovery is a nacos service discovery. It always returns the registered servers in nacos.
func (*NacosDiscovery) Clone ¶
func (d *NacosDiscovery) Clone(servicePath string) (ServiceDiscovery, error)
Clone clones this ServiceDiscovery with new servicePath.
func (*NacosDiscovery) Close ¶
func (d *NacosDiscovery) Close()
func (*NacosDiscovery) GetServices ¶
func (d *NacosDiscovery) GetServices() []*KVPair
GetServices returns the servers
func (*NacosDiscovery) RemoveWatcher ¶
func (d *NacosDiscovery) RemoveWatcher(ch chan []*KVPair)
func (*NacosDiscovery) SetFilter ¶
func (d *NacosDiscovery) SetFilter(filter ServiceDiscoveryFilter)
SetFilter sets the filter.
func (*NacosDiscovery) WatchService ¶
func (d *NacosDiscovery) WatchService() chan []*KVPair
WatchService returns a nil chan.
type OneClient ¶
type OneClient struct { Plugins PluginContainer // contains filtered or unexported fields }
OneClient wraps servicesPath and XClients. Users can use a shared oneclient to access multiple services.
func NewBidirectionalOneClient ¶
func NewBidirectionalOneClient(failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option, serverMessageChan chan<- *protocol.Message) *OneClient
NewBidirectionalOneClient creates a new OneClient that can receive notifications from servers.
func NewOneClient ¶
func NewOneClient(failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) *OneClient
NewOneClient creates a OneClient that supports service discovery and service governance.
func (*OneClient) Broadcast ¶
func (c *OneClient) Broadcast(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) error
Broadcast sends requests to all servers and succeeds only when all servers return OK. FailMode and SelectMode are meaningless for this method. Please set a timeout to avoid hanging.
func (*OneClient) Call ¶
func (c *OneClient) Call(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) error
Call invokes the named function, waits for it to complete, and returns its error status. It handles errors based on FailMode.
func (*OneClient) ConfigGeoSelector ¶
ConfigGeoSelector sets the client's location (latitude and longitude) and uses newGeoSelector.
func (*OneClient) DownloadFile ¶
func (*OneClient) Fork ¶
func (c *OneClient) Fork(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}) error
Fork sends requests to all servers and succeeds once one server returns OK. FailMode and SelectMode are meaningless for this method.
func (*OneClient) GetPlugins ¶
func (c *OneClient) GetPlugins() PluginContainer
func (*OneClient) Go ¶
func (c *OneClient) Go(ctx context.Context, servicePath string, serviceMethod string, args interface{}, reply interface{}, done chan *Call) (*Call, error)
Go invokes the function asynchronously. It returns the Call structure representing the invocation. The done channel will signal when the call is complete by returning the same Call object. If done is nil, Go will allocate a new channel. If non-nil, done must be buffered or Go will deliberately crash. It does not use FailMode.
func (*OneClient) SetPlugins ¶
func (c *OneClient) SetPlugins(plugins PluginContainer)
SetPlugins sets client's plugins.
func (*OneClient) SetSelector ¶
SetSelector sets customized selector by users.
type OneClientPool ¶
type OneClientPool struct {
// contains filtered or unexported fields
}
OneClientPool is a oneclient pool with fixed size. It uses roundrobin algorithm to call its xclients. All oneclients share the same configurations such as ServiceDiscovery and serverMessageChan.
func NewBidirectionalOneClientPool ¶
func NewBidirectionalOneClientPool(count int, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option, serverMessageChan chan<- *protocol.Message) *OneClientPool
NewBidirectionalOneClientPool creates a BidirectionalOneClient pool with fixed size.
func NewOneClientPool ¶
func NewOneClientPool(count int, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) *OneClientPool
NewOneClientPool creates a fixed size OneClient pool.
func (*OneClientPool) Auth ¶
func (c *OneClientPool) Auth(auth string)
Auth sets the token for authentication.
func (OneClientPool) Close ¶
func (p OneClientPool) Close()
Close this pool. Please make sure it won't be used any more.
func (*OneClientPool) Get ¶
func (p *OneClientPool) Get() *OneClient
Get returns a OneClient. It does not remove this OneClient from its cache so you don't need to put it back. Don't close this OneClient because maybe other goroutines are using this OneClient.
type Option ¶
type Option struct { // Group is used to select the services in the same group. Services set group info in their meta. // If it is empty, clients will ignore group. Group string // Retries retries to send Retries int // TLSConfig for tcp and quic TLSConfig *tls.Config // kcp.BlockCrypt Block interface{} // RPCPath for http connection RPCPath string // ConnectTimeout sets timeout for dialing ConnectTimeout time.Duration // ReadTimeout sets max idle time for underlying net.Conns IdleTimeout time.Duration // BackupLatency is used for Failbackup mode. rpcx will sends another request if the first response doesn't return in BackupLatency time. BackupLatency time.Duration // Breaker is used to config CircuitBreaker GenBreaker func() Breaker SerializeType protocol.SerializeType CompressType protocol.CompressType // send heartbeat message to service and check responses Heartbeat bool // interval for heartbeat HeartbeatInterval time.Duration MaxWaitForHeartbeat time.Duration // TCPKeepAlive, if it is zero we don't set keepalive TCPKeepAlivePeriod time.Duration }
Option contains all options for creating clients.
type Peer2PeerDiscovery ¶
type Peer2PeerDiscovery struct {
// contains filtered or unexported fields
}
Peer2PeerDiscovery is a peer-to-peer service discovery. It always returns the static server.
func (*Peer2PeerDiscovery) Clone ¶
func (d *Peer2PeerDiscovery) Clone(servicePath string) (ServiceDiscovery, error)
Clone clones this ServiceDiscovery with new servicePath.
func (*Peer2PeerDiscovery) Close ¶
func (d *Peer2PeerDiscovery) Close()
func (*Peer2PeerDiscovery) GetServices ¶
func (d *Peer2PeerDiscovery) GetServices() []*KVPair
GetServices returns the static server
func (*Peer2PeerDiscovery) RemoveWatcher ¶
func (d *Peer2PeerDiscovery) RemoveWatcher(ch chan []*KVPair)
func (*Peer2PeerDiscovery) SetFilter ¶
func (d *Peer2PeerDiscovery) SetFilter(filter ServiceDiscoveryFilter)
SetFilter sets the filter.
func (*Peer2PeerDiscovery) WatchService ¶
func (d *Peer2PeerDiscovery) WatchService() chan []*KVPair
WatchService returns a nil chan.
type PluginContainer ¶
type PluginContainer interface { Add(plugin Plugin) Remove(plugin Plugin) All() []Plugin DoConnCreated(net.Conn) (net.Conn, error) DoClientConnected(net.Conn) (net.Conn, error) DoClientConnectionClose(net.Conn) error DoPreCall(ctx context.Context, servicePath, serviceMethod string, args interface{}) error DoPostCall(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, err error) error DoClientBeforeEncode(*protocol.Message) error DoClientAfterDecode(*protocol.Message) error DoWrapSelect(SelectFunc) SelectFunc }
PluginContainer represents a plugin container that defines all methods to manage plugins. And it also defines all extension points.
func NewPluginContainer ¶
func NewPluginContainer() PluginContainer
type PostCallPlugin ¶
type PostCallPlugin interface {
PostCall(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, err error) error
}
PostCallPlugin is invoked after the client calls a server.
type PreCallPlugin ¶
type PreCallPlugin interface {
PreCall(ctx context.Context, servicePath, serviceMethod string, args interface{}) error
}
PreCallPlugin is invoked before the client calls a server.
type RPCClient ¶
type RPCClient interface { Connect(network, address string) error Go(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call Call(ctx context.Context, servicePath, serviceMethod string, args interface{}, reply interface{}) error SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error) Close() error RemoteAddr() string RegisterServerMessageChan(ch chan<- *protocol.Message) UnregisterServerMessageChan() IsClosing() bool IsShutdown() bool }
RPCClient is an interface that defines one client to call one server.
type RedisDiscovery ¶
type RedisDiscovery struct { // -1 means it always retry to watch until zookeeper is ok, 0 means no retry. RetriesAfterWatchFailed int // contains filtered or unexported fields }
RedisDiscovery is a redis service discovery. It always returns the registered servers in redis.
func (*RedisDiscovery) Clone ¶
func (d *RedisDiscovery) Clone(servicePath string) (ServiceDiscovery, error)
Clone clones this ServiceDiscovery with new servicePath.
func (*RedisDiscovery) Close ¶
func (d *RedisDiscovery) Close()
func (*RedisDiscovery) GetServices ¶
func (d *RedisDiscovery) GetServices() []*KVPair
GetServices returns the servers
func (*RedisDiscovery) RemoveWatcher ¶
func (d *RedisDiscovery) RemoveWatcher(ch chan []*KVPair)
func (*RedisDiscovery) SetFilter ¶
func (d *RedisDiscovery) SetFilter(filter ServiceDiscoveryFilter)
SetFilter sets the filter.
func (*RedisDiscovery) WatchService ¶
func (d *RedisDiscovery) WatchService() chan []*KVPair
WatchService returns a nil chan.
type SelectFunc ¶
type SelectMode ¶
type SelectMode int
SelectMode defines the algorithm for selecting a service from candidates.
const ( //RandomSelect is selecting randomly RandomSelect SelectMode = iota //RoundRobin is selecting by round robin RoundRobin //WeightedRoundRobin is selecting by weighted round robin WeightedRoundRobin //WeightedICMP is selecting by weighted Ping time WeightedICMP //ConsistentHash is selecting by hashing ConsistentHash //Closest is selecting the closest server Closest // SelectByUser is selecting by implementation of users SelectByUser = 1000 )
func SelectModeString ¶
func SelectModeString(s string) (SelectMode, error)
SelectModeString retrieves an enum value from the enum constants string name. Returns an error if the param is not part of the enum.
func SelectModeValues ¶
func SelectModeValues() []SelectMode
SelectModeValues returns all values of the enum
func (SelectMode) IsASelectMode ¶
func (i SelectMode) IsASelectMode() bool
IsASelectMode returns "true" if the value is listed in the enum definition. "false" otherwise
func (SelectMode) String ¶
func (i SelectMode) String() string
type SelectNodePlugin ¶
type SelectNodePlugin interface {
WrapSelect(SelectFunc) SelectFunc
}
SelectNodePlugin can interrupt selecting of xclient and add customized logics such as skipping some nodes.
type Selector ¶
type Selector interface { Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string // SelectFunc UpdateServer(servers map[string]string) }
Selector defines selector that selects one service from candidates.
type ServiceDiscovery ¶
type ServiceDiscovery interface { GetServices() []*KVPair WatchService() chan []*KVPair RemoveWatcher(ch chan []*KVPair) Clone(servicePath string) (ServiceDiscovery, error) SetFilter(ServiceDiscoveryFilter) Close() }
ServiceDiscovery defines ServiceDiscovery of zookeeper, etcd and consul
func NewConsulDiscovery ¶
func NewConsulDiscovery(basePath, servicePath string, consulAddr []string, options *store.Config) (ServiceDiscovery, error)
NewConsulDiscovery returns a new ConsulDiscovery.
func NewConsulDiscoveryStore ¶
func NewConsulDiscoveryStore(basePath string, kv store.Store) (ServiceDiscovery, error)
NewConsulDiscoveryStore returns a new ConsulDiscovery with specified store.
func NewConsulDiscoveryTemplate ¶
func NewConsulDiscoveryTemplate(basePath string, consulAddr []string, options *store.Config) (ServiceDiscovery, error)
NewConsulDiscoveryTemplate returns a new ConsulDiscovery template.
func NewDNSDiscovery ¶
func NewDNSDiscovery(domain string, network string, port int, d time.Duration) (ServiceDiscovery, error)
NewDNSDiscovery returns a new DNSDiscovery.
func NewMDNSDiscovery ¶
func NewMDNSDiscovery(service string, timeout time.Duration, watchInterval time.Duration, domain string) (ServiceDiscovery, error)
NewMDNSDiscovery returns a new MDNSDiscovery. If domain is empty, use "local." in default.
func NewMultipleServersDiscovery ¶
func NewMultipleServersDiscovery(pairs []*KVPair) (ServiceDiscovery, error)
NewMultipleServersDiscovery returns a new MultipleServersDiscovery.
func NewNacosDiscovery ¶
func NewNacosDiscovery(servicePath string, cluster string, clientConfig constant.ClientConfig, serverConfig []constant.ServerConfig) (ServiceDiscovery, error)
NewNacosDiscovery returns a new NacosDiscovery.
func NewNacosDiscoveryWithClient ¶
func NewNacosDiscoveryWithClient(servicePath string, cluster string, namingClient naming_client.INamingClient) ServiceDiscovery
func NewPeer2PeerDiscovery ¶
func NewPeer2PeerDiscovery(server, metadata string) (ServiceDiscovery, error)
NewPeer2PeerDiscovery returns a new Peer2PeerDiscovery.
func NewRedisDiscovery ¶
func NewRedisDiscovery(basePath string, servicePath string, etcdAddr []string, options *store.Config) (ServiceDiscovery, error)
NewRedisDiscovery returns a new RedisDiscovery.
func NewRedisDiscoveryStore ¶
func NewRedisDiscoveryStore(basePath string, kv store.Store) (ServiceDiscovery, error)
NewRedisDiscoveryStore returns a new RedisDiscovery with specified store.
func NewRedisDiscoveryTemplate ¶
func NewRedisDiscoveryTemplate(basePath string, etcdAddr []string, options *store.Config) (ServiceDiscovery, error)
NewRedisDiscoveryTemplate returns a new RedisDiscovery template.
func NewZookeeperDiscovery ¶
func NewZookeeperDiscovery(basePath string, servicePath string, zkAddr []string, options *store.Config) (ServiceDiscovery, error)
NewZookeeperDiscovery returns a new ZookeeperDiscovery.
func NewZookeeperDiscoveryTemplate ¶
func NewZookeeperDiscoveryTemplate(basePath string, zkAddr []string, options *store.Config) (ServiceDiscovery, error)
NewZookeeperDiscoveryTemplate returns a new ZookeeperDiscovery template.
func NewZookeeperDiscoveryWithStore ¶
func NewZookeeperDiscoveryWithStore(basePath string, kv store.Store) (ServiceDiscovery, error)
NewZookeeperDiscoveryWithStore returns a new ZookeeperDiscovery with specified store.
type ServiceDiscoveryFilter ¶
ServiceDiscoveryFilter can be used to filter services with customized logic. Servers can register their services, but clients can use a customized filter to select only some of them. It returns true if ServiceDiscovery wants to use this service, otherwise it returns false.
type ServiceError ¶
type ServiceError string
ServiceError is an error from server.
func (ServiceError) Error ¶
func (e ServiceError) Error() string
type XClient ¶
type XClient interface { SetPlugins(plugins PluginContainer) GetPlugins() PluginContainer SetSelector(s Selector) ConfigGeoSelector(latitude, longitude float64) Auth(auth string) Go(ctx context.Context, serviceMethod string, args interface{}, reply interface{}, done chan *Call) (*Call, error) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error Broadcast(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error Fork(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) error SendRaw(ctx context.Context, r *protocol.Message) (map[string]string, []byte, error) SendFile(ctx context.Context, fileName string, rateInBytesPerSecond int64, meta map[string]string) error DownloadFile(ctx context.Context, requestFileName string, saveTo io.Writer, meta map[string]string) error Stream(ctx context.Context, meta map[string]string) (net.Conn, error) Close() error }
XClient is an interface that is used by clients with service discovery and service governance. One XClient is used only for one service. You should create multiple XClients for multiple services.
func NewBidirectionalXClient ¶
func NewBidirectionalXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option, serverMessageChan chan<- *protocol.Message) XClient
NewBidirectionalXClient creates a new xclient that can receive notifications from servers.
func NewXClient ¶
func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient
NewXClient creates a XClient that supports service discovery and service governance.
type XClientPool ¶
type XClientPool struct {
// contains filtered or unexported fields
}
XClientPool is an xclient pool with fixed size. It uses a round-robin algorithm to call its xclients. All xclients share the same configurations such as ServiceDiscovery and serverMessageChan.
func NewBidirectionalXClientPool ¶
func NewBidirectionalXClientPool(count int, servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option, serverMessageChan chan<- *protocol.Message) *XClientPool
NewBidirectionalXClientPool creates a BidirectionalXClient pool with fixed size.
func NewXClientPool ¶
func NewXClientPool(count int, servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) *XClientPool
NewXClientPool creates a fixed size XClient pool.
func (*XClientPool) Auth ¶
func (c *XClientPool) Auth(auth string)
Auth sets the token for authentication.
func (*XClientPool) Close ¶
func (p *XClientPool) Close()
Close this pool. Please make sure it won't be used any more.
func (*XClientPool) Get ¶
func (p *XClientPool) Get() XClient
Get returns a xclient. It does not remove this xclient from its cache so you don't need to put it back. Don't close this xclient because maybe other goroutines are using this xclient.
type ZookeeperDiscovery ¶
type ZookeeperDiscovery struct { // -1 means it always retry to watch until zookeeper is ok, 0 means no retry. RetriesAfterWatchFailed int // contains filtered or unexported fields }
ZookeeperDiscovery is a zookeeper service discovery. It always returns the registered servers in zookeeper.
func (*ZookeeperDiscovery) Clone ¶
func (d *ZookeeperDiscovery) Clone(servicePath string) (ServiceDiscovery, error)
Clone clones this ServiceDiscovery with new servicePath.
func (*ZookeeperDiscovery) Close ¶
func (d *ZookeeperDiscovery) Close()
func (*ZookeeperDiscovery) GetServices ¶
func (d *ZookeeperDiscovery) GetServices() []*KVPair
GetServices returns the servers.
func (*ZookeeperDiscovery) RemoveWatcher ¶
func (d *ZookeeperDiscovery) RemoveWatcher(ch chan []*KVPair)
func (*ZookeeperDiscovery) SetFilter ¶
func (d *ZookeeperDiscovery) SetFilter(filter ServiceDiscoveryFilter)
SetFilter sets the filter.
func (*ZookeeperDiscovery) WatchService ¶
func (d *ZookeeperDiscovery) WatchService() chan []*KVPair
WatchService returns a nil chan.
Source Files ¶
- circuit_breaker.go
- client.go
- connection.go
- connection_nonkcp.go
- connection_nonquic.go
- consul_discovery.go
- dns_discovery.go
- failmode_enumer.go
- geo_utils.go
- hash_utils.go
- mdns_discovery.go
- mode.go
- multiple_servers_discovery.go
- nacos_discovery.go
- oneclient.go
- oneclient_pool.go
- opencensus.go
- opentracing.go
- peer2peer_discovery.go
- ping_utils.go
- plugin.go
- redis_discovery.go
- selectmode_enumer.go
- selector.go
- smooth-weighted-round-robin.go
- xclient.go
- xclient_pool.go
- zookeeper_discovery.go