Documentation ¶
Overview ¶
Package gorpc provides a simple RPC API for high-load projects.
Gorpc has the following features:
- Easy-to-use API.
- Optimized for high load (>10K qps).
- Uses as little network bandwidth as possible.
- Minimizes the number of TCP connections in TIME_WAIT and CLOSE_WAIT states.
- Minimizes the number of send() and recv() syscalls.
- Supports arbitrary underlying transports. TCP is used by default; TLS and Unix sockets are also available out of the box.
Index ¶
- Constants
- Variables
- func NilErrorLogger(format string, args ...interface{})
- func RegisterType(x interface{})
- func SetErrorLogger(f LoggerFunc)
- type AsyncResult
- type Batch
- type BatchResult
- type Client
- func (c *Client) Call(request interface{}) (response interface{}, err error)
- func (c *Client) CallAsync(request interface{}) (*AsyncResult, error)
- func (c *Client) CallTimeout(request interface{}, timeout time.Duration) (response interface{}, err error)
- func (c *Client) NewBatch() *Batch
- func (c *Client) PendingRequestsCount() int
- func (c *Client) Send(request interface{}) error
- func (c *Client) Start()
- func (c *Client) Stop()
- type ClientError
- type ConnStats
- type DialFunc
- type Dispatcher
- func (d *Dispatcher) AddFunc(funcName string, f interface{})
- func (d *Dispatcher) AddService(serviceName string, service interface{})
- func (d *Dispatcher) NewFuncClient(c *Client) *DispatcherClient
- func (d *Dispatcher) NewHandlerFunc() HandlerFunc
- func (d *Dispatcher) NewServiceClient(serviceName string, c *Client) *DispatcherClient
- type DispatcherBatch
- type DispatcherClient
- func (dc *DispatcherClient) Call(funcName string, request interface{}) (response interface{}, err error)
- func (dc *DispatcherClient) CallAsync(funcName string, request interface{}) (*AsyncResult, error)
- func (dc *DispatcherClient) CallTimeout(funcName string, request interface{}, timeout time.Duration) (response interface{}, err error)
- func (dc *DispatcherClient) NewBatch() *DispatcherBatch
- func (dc *DispatcherClient) Send(funcName string, request interface{}) error
- type HandlerFunc
- type Listener
- type LoggerFunc
- type OnConnectFunc
- type Server
Examples ¶
Constants ¶
const (
	// DefaultConcurrency is the default number of concurrent rpc calls
	// the server can process.
	DefaultConcurrency = 8 * 1024

	// DefaultRequestTimeout is the default timeout for client request.
	DefaultRequestTimeout = 20 * time.Second

	// DefaultRedialDelay is the default delay before trying to redial.
	DefaultRedialDelay = 1 * time.Second

	// DefaultPendingMessages is the default number of pending messages
	// handled by Client and Server.
	DefaultPendingMessages = 32 * 1024

	// DefaultFlushDelay is the default delay between message flushes
	// on Client and Server.
	DefaultFlushDelay = -1

	// DefaultBufferSize is the default size for Client and Server buffers.
	DefaultBufferSize = 64 * 1024
)
Variables ¶
var ErrCanceled = &ClientError{
	Canceled: true,
	err:      fmt.Errorf("the call has been canceled"),
}
ErrCanceled may be returned from rpc call if AsyncResult.Cancel has been called.
Functions ¶
func NilErrorLogger ¶
func NilErrorLogger(format string, args ...interface{})
NilErrorLogger discards all error messages.
Pass NilErrorLogger to SetErrorLogger() to suppress the error logs generated by gorpc.
func RegisterType ¶
func RegisterType(x interface{})
RegisterType registers the given type to send via rpc.
The client must register all the response types the server may send. The server must register all the request types the client may send.
There is no need to register base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.
There is no need to register argument and return value types for functions and methods registered via Dispatcher.
func SetErrorLogger ¶
func SetErrorLogger(f LoggerFunc)
SetErrorLogger sets the given error logger to use in gorpc.
By default log.Printf is used for error logging.
Types ¶
type AsyncResult ¶
type AsyncResult struct {
	// The response can be read only after <-Done unblocks.
	Response interface{}

	// The error can be read only after <-Done unblocks.
	// The error can be cast to ClientError.
	Error error

	// Response and Error become available after <-Done unblocks.
	Done <-chan struct{}
	// contains filtered or unexported fields
}
AsyncResult is a result returned from Client.CallAsync().
func (*AsyncResult) Cancel ¶
func (m *AsyncResult) Cancel()
Cancel cancels the async call.
A canceled call isn't sent to the server unless it has already been sent. A canceled call may still complete successfully if it was sent to the server before Cancel was called.
It is safe to call this function multiple times from concurrently running goroutines.
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
Batch allows grouping and executing multiple RPCs in a single batch.
Batch may be created via Client.NewBatch().
func (*Batch) Add ¶
func (b *Batch) Add(request interface{}) *BatchResult
Add adds a new request to the RPC batch.
The order of batched RPCs execution on the server is unspecified.
All the requests added to the batch are sent to the server at once when Batch.Call*() is called.
Request and response types may be arbitrary. All the request and response types the client may use must be registered via RegisterType() before starting the client. There is no need to register base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.
It is safe to add multiple requests to the same batch from concurrently running goroutines.
func (*Batch) AddSkipResponse ¶
func (b *Batch) AddSkipResponse(request interface{})
AddSkipResponse adds a new request to the RPC batch and ignores the response.
The order of batched RPCs execution on the server is unspecified.
All the requests added to the batch are sent to the server at once when Batch.Call*() is called.
All the request types the client may use must be registered via RegisterType() before starting the client. There is no need to register base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.
It is safe to add multiple requests to the same batch from concurrently running goroutines.
func (*Batch) Call ¶
Call calls all the RPCs added via Batch.Add().
The order of batched RPCs execution on the server is unspecified. Usually batched RPCs are executed concurrently on the server.
The caller may read all BatchResult contents returned from Batch.Add() after the Call returns.
It is guaranteed that all <-BatchResult.Done channels are unblocked after the Call returns.
func (*Batch) CallTimeout ¶
CallTimeout calls all the RPCs added via Batch.Add() and waits up to the given timeout for all the RPC responses.
The order of batched RPCs execution on the server is unspecified. Usually batched RPCs are executed concurrently on the server.
The caller may read all BatchResult contents returned from Batch.Add() after the CallTimeout returns.
It is guaranteed that all <-BatchResult.Done channels are unblocked after the CallTimeout returns.
type BatchResult ¶
type BatchResult struct {
	// The response can be read only after Batch.Call*() returns.
	Response interface{}

	// The error can be read only after Batch.Call*() returns.
	// The error can be cast to ClientError.
	Error error

	// <-Done unblocks after Batch.Call*() returns.
	// Response and Error become available after <-Done unblocks.
	Done <-chan struct{}
	// contains filtered or unexported fields
}
BatchResult is a result returned from Batch.Add*().
type Client ¶
type Client struct {
	// Server address to connect to.
	//
	// The address format depends on the underlying transport provided
	// by Client.Dial. The following transports are provided out of the box:
	//   * TCP - see NewTCPClient() and NewTCPServer().
	//   * TLS - see NewTLSClient() and NewTLSServer().
	//   * Unix sockets - see NewUnixClient() and NewUnixServer().
	//
	// By default TCP transport is used.
	Addr string

	// The number of concurrent connections the client should establish
	// to the server.
	// By default only one connection is established.
	Conns int

	// The maximum number of pending requests in the queue.
	//
	// The number of pending requests should exceed the expected number
	// of concurrent goroutines calling client's methods.
	// Otherwise a lot of ClientError.Overflow errors may appear.
	//
	// Default is DefaultPendingMessages.
	PendingRequests int

	// Delay between request flushes.
	//
	// Negative values cause requests to be sent to the server immediately,
	// without buffering. This minimizes rpc latency at the cost
	// of higher CPU and network usage.
	//
	// Default value is DefaultFlushDelay.
	FlushDelay time.Duration

	// Maximum request time.
	// Default value is DefaultRequestTimeout.
	RequestTimeout time.Duration

	// Time to wait for redial.
	// Default value is DefaultRedialDelay.
	RedialDelay time.Duration

	// Disable data compression.
	// By default data compression is enabled.
	DisableCompression bool

	// Size of send buffer per each underlying connection in bytes.
	// Default value is DefaultBufferSize.
	SendBufferSize int

	// Size of recv buffer per each underlying connection in bytes.
	// Default value is DefaultBufferSize.
	RecvBufferSize int

	// OnConnect is called whenever connection to server is established.
	// The callback can be used for authentication/authorization/encryption
	// and/or for custom transport wrapping.
	//
	// See also Dial callback, which can be used for sophisticated
	// transport implementation.
	OnConnect OnConnectFunc

	// The client calls this callback when it needs new connection
	// to the server.
	// The client passes Client.Addr into Dial().
	//
	// Override this callback if you want custom underlying transport
	// and/or authentication/authorization.
	// Don't forget to override Server.Listener accordingly.
	//
	// See also OnConnect for authentication/authorization purposes.
	//
	//   * NewTLSClient() and NewTLSServer() can be used for encrypted rpc.
	//   * NewUnixClient() and NewUnixServer() can be used for fast local
	//     inter-process rpc.
	//
	// By default it returns TCP connections established to the Client.Addr.
	Dial DialFunc

	// LogError is used for error logging.
	//
	// By default the function set via SetErrorLogger() is used.
	LogError LoggerFunc

	// Connection statistics.
	//
	// The stats are not reset automatically. Feel free to reset them
	// any time you wish.
	Stats ConnStats
	// contains filtered or unexported fields
}
Client implements RPC client.
The client must be started with Client.Start() before use.
It is safe, and encouraged, to share a single client across an arbitrary number of concurrently running goroutines.
Default client settings are optimized for high load, so don't override them without a valid reason.
func NewTCPClient ¶
NewTCPClient creates a client connecting over TCP to the server listening to the given addr.
The returned client must be started after any optional settings have been adjusted.
The corresponding server must be created with NewTCPServer().
func NewTLSClient ¶
NewTLSClient creates a client connecting over TLS (aka SSL) to the server listening to the given addr using the given TLS config.
The returned client must be started after any optional settings have been adjusted.
The corresponding server must be created with NewTLSServer().
func NewUnixClient ¶
NewUnixClient creates a client connecting over unix socket to the server listening to the given addr.
The returned client must be started after any optional settings have been adjusted.
The corresponding server must be created with NewUnixServer().
func (*Client) Call ¶
Call sends the given request to the server and obtains the response from the server. Returns a non-nil error if the response cannot be obtained within Client.RequestTimeout or if server connection problems occur. The returned error can be cast to ClientError.
Request and response types may be arbitrary. All the request and response types the client may use must be registered via RegisterType() before starting the client. There is no need to register base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.
Hint: use Dispatcher for constructing distinct calls.
Don't forget to start the client with Client.Start() before calling Client.Call().
func (*Client) CallAsync ¶
func (c *Client) CallAsync(request interface{}) (*AsyncResult, error)
CallAsync starts async rpc call.
The rpc call is complete once <-AsyncResult.Done unblocks. To abandon the request, simply discard the returned AsyncResult; to cancel it explicitly, call AsyncResult.Cancel().
CallAsync doesn't respect Client.RequestTimeout. The caller may control the response timeout with something like:
r, err := c.CallAsync("foobar")
if err != nil {
	log.Printf("cannot start rpc call: %s", err)
	return
}
select {
case <-time.After(c.RequestTimeout):
	log.Printf("rpc timeout!")
case <-r.Done:
	processResponse(r.Response, r.Error)
}
Request and response types may be arbitrary. All the request and response types the client may use must be registered via RegisterType() before starting the client. There is no need to register base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.
Don't forget to start the client with Client.Start() before calling Client.CallAsync().
func (*Client) CallTimeout ¶
func (c *Client) CallTimeout(request interface{}, timeout time.Duration) (response interface{}, err error)
CallTimeout sends the given request to the server and obtains the response from the server. Returns a non-nil error if the response cannot be obtained within the given timeout or if server connection problems occur. The returned error can be cast to ClientError.
Request and response types may be arbitrary. All the request and response types the client may use must be registered via RegisterType() before starting the client. There is no need to register base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.
Hint: use Dispatcher for constructing distinct calls.
Don't forget to start the client with Client.Start() before calling Client.CallTimeout().
func (*Client) NewBatch ¶
NewBatch creates a new RPC batch.
It is safe to create multiple concurrent batches from a single client.
Don't forget to start the client with Client.Start() before working with batched RPC.
func (*Client) PendingRequestsCount ¶
PendingRequestsCount returns the current number of pending requests.
This function is mainly intended for load-balancing schemes in which load is balanced across multiple rpc clients.
Don't forget to start the client with Client.Start() before calling this function.
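A load-balancing scheme of this kind could, for instance, pick the client with the fewest pending requests. The sketch below is only an illustration under that assumption: pendingCounter, fakeClient and leastLoaded are hypothetical names, and fakeClient stands in for a started client so the code runs without a server.

```go
package main

import "fmt"

// pendingCounter is a hypothetical interface capturing the single
// method this sketch relies on.
type pendingCounter interface {
	PendingRequestsCount() int
}

// fakeClient is a stand-in that reports a fixed number of pending requests.
type fakeClient struct {
	name    string
	pending int
}

func (f *fakeClient) PendingRequestsCount() int { return f.pending }

// leastLoaded returns the index of the client with the fewest pending
// requests, or -1 if clients is empty.
func leastLoaded(clients []pendingCounter) int {
	best := -1
	bestPending := 0
	for i, c := range clients {
		n := c.PendingRequestsCount()
		if best == -1 || n < bestPending {
			best, bestPending = i, n
		}
	}
	return best
}

func main() {
	clients := []pendingCounter{
		&fakeClient{name: "a", pending: 12},
		&fakeClient{name: "b", pending: 3},
		&fakeClient{name: "c", pending: 7},
	}
	// Selects the index of client "b", which has the fewest pending requests.
	fmt.Println(leastLoaded(clients))
}
```

The count is only a snapshot, so a scheme like this gives approximate balancing rather than a strict guarantee.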
func (*Client) Send ¶
Send sends the given request to the server and doesn't wait for a response.
Since this is a 'fire and forget' function that never waits for a response, it cannot guarantee that the server receives and successfully processes the given request. In most cases, under normal conditions, requests should reach the server and be processed successfully. Send semantics are similar to those of UDP messages.
The server may return an arbitrary response to a Send() request, but the response is ignored.
All the request types the client may use must be registered via RegisterType() before starting the client. There is no need to register base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.
Don't forget to start the client with Client.Start() before calling Client.Send().
func (*Client) Start ¶
func (c *Client) Start()
Start starts the rpc client and establishes a connection to the server at Client.Addr.
All the request and response types the client may use must be registered via RegisterType() before starting the client. There is no need to register base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.
type ClientError ¶
type ClientError struct {
	// Set if the error is timeout-related.
	Timeout bool

	// Set if the error is connection-related.
	Connection bool

	// Set if the error is server-related.
	Server bool

	// Set if the error is related to internal resources' overflow.
	// Increase PendingRequests if you see a lot of such errors.
	Overflow bool

	// May be set if AsyncResult.Cancel is called.
	Canceled bool
	// contains filtered or unexported fields
}
ClientError is an error Client methods can return.
func (*ClientError) Error ¶
func (e *ClientError) Error() string
type ConnStats ¶
type ConnStats struct {
	// The number of rpc calls performed.
	RPCCalls uint64

	// The total aggregate time for all rpc calls in milliseconds.
	//
	// This time can be used for calculating the average response time
	// per RPC:
	//     avgRPCTime = RPCTime / RPCCalls
	RPCTime uint64

	// The number of bytes written to the underlying connections.
	BytesWritten uint64

	// The number of bytes read from the underlying connections.
	BytesRead uint64

	// The number of Read() calls.
	ReadCalls uint64

	// The number of Read() errors.
	ReadErrors uint64

	// The number of Write() calls.
	WriteCalls uint64

	// The number of Write() errors.
	WriteErrors uint64

	// The number of Dial() calls.
	DialCalls uint64

	// The number of Dial() errors.
	DialErrors uint64

	// The number of Accept() calls.
	AcceptCalls uint64

	// The number of Accept() errors.
	AcceptErrors uint64
	// contains filtered or unexported fields
}
ConnStats provides connection statistics. It applies to both gorpc.Client and gorpc.Server.
Use stats returned from ConnStats.Snapshot() on live Client and / or Server, since the original stats can be updated by concurrently running goroutines.
func (*ConnStats) AvgRPCBytes ¶
AvgRPCBytes returns the average bytes sent / received per RPC.
Use stats returned from ConnStats.Snapshot() on live Client and / or Server, since the original stats can be updated by concurrently running goroutines.
func (*ConnStats) AvgRPCCalls ¶
AvgRPCCalls returns the average number of write() / read() syscalls per RPC.
Use stats returned from ConnStats.Snapshot() on live Client and / or Server, since the original stats can be updated by concurrently running goroutines.
func (*ConnStats) AvgRPCTime ¶
AvgRPCTime returns the average RPC execution time.
Use stats returned from ConnStats.Snapshot() on live Client and / or Server, since the original stats can be updated by concurrently running goroutines.
type DialFunc ¶
type DialFunc func(addr string) (conn io.ReadWriteCloser, err error)
DialFunc is a function type intended to be assigned to Client.Dial.
It is expected that the returned conn immediately sends all the data passed via Write() to the server. Otherwise gorpc may hang. The conn implementation must call Flush() on underlying buffered streams before returning from Write().
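The flush requirement can be met by a thin wrapper around a buffered writer. The sketch below shows one way to do that with the standard library only; flushingConn, newFlushingConn and memConn are hypothetical names, and memConn is an in-memory stand-in for a real network connection.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
)

// flushingConn wraps a connection with a buffered writer but flushes
// on every Write, so no data lingers in the buffer.
type flushingConn struct {
	io.ReadCloser
	bw *bufio.Writer
}

func newFlushingConn(rwc io.ReadWriteCloser) *flushingConn {
	return &flushingConn{ReadCloser: rwc, bw: bufio.NewWriter(rwc)}
}

// Write buffers p and flushes it to the underlying connection
// before returning.
func (c *flushingConn) Write(p []byte) (int, error) {
	n, err := c.bw.Write(p)
	if err != nil {
		return n, err
	}
	return n, c.bw.Flush()
}

// memConn is an in-memory stand-in for a network connection.
type memConn struct{ bytes.Buffer }

func (m *memConn) Close() error { return nil }

func main() {
	m := &memConn{}
	var conn io.ReadWriteCloser = newFlushingConn(m)
	if _, err := conn.Write([]byte("ping")); err != nil {
		fmt.Println("write failed:", err)
		return
	}
	// The bytes are already in the underlying connection.
	fmt.Println(m.String())
}
```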
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher helps construct a HandlerFunc for dispatching across multiple functions and/or services.
Dispatcher automatically registers all the request and response types for all functions and/or methods registered via AddFunc() and AddService(), so there is no need to call RegisterType() for these types on the server side. Client-side code must call RegisterType() for non-internal request and response types before issuing RPCs via DispatcherClient.
See examples for details.
Example (FuncCalls) ¶
d := NewDispatcher()

// Function without args and return values
incCalls := 0
d.AddFunc("Inc", func() { incCalls++ })

// Function without args
d.AddFunc("Func42", func() int { return 42 })

// Echo function for string
d.AddFunc("Echo", func(s string) string { return s })

// Function with struct arg and return value
type ExampleRequestStruct struct {
	Foo int
	Bar string
}
type ExampleResponseStruct struct {
	Baz    string
	BarLen int
}
d.AddFunc("Struct", func(s *ExampleRequestStruct) *ExampleResponseStruct {
	return &ExampleResponseStruct{
		Baz:    fmt.Sprintf("foo=%d, bar=%s", s.Foo, s.Bar),
		BarLen: len(s.Bar),
	}
})

// Echo function for map
d.AddFunc("Map", func(m map[string]int) map[string]int { return m })

// Echo function for slice
d.AddFunc("Slice", func(s []string) []string { return s })

// Function returning errors
d.AddFunc("Error", func() error { return errors.New("error") })

// Echo function, which may return error if arg is 42
d.AddFunc("Error42", func(x int) (int, error) {
	if x == 42 {
		return 0, errors.New("error42")
	}
	return x, nil
})

// Echo function with client address' validation
d.AddFunc("ClientAddr", func(clientAddr string, x int) (int, error) {
	clientHost := strings.SplitN(clientAddr, ":", 2)[0]
	if clientHost != "allowed.client.host" {
		return 0, fmt.Errorf("invalid rpc client host: [%s]", clientHost)
	}
	return x, nil
})

// Start the server serving all the registered functions above
s := NewTCPServer("127.0.0.1:12345", d.NewHandlerFunc())
if err := s.Start(); err != nil {
	log.Fatalf("Cannot start rpc server: [%s]", err)
}
defer s.Stop()

// Start the client and connect it to the server
c := NewTCPClient("127.0.0.1:12345")
c.Start()
defer c.Stop()

// Create a client wrapper for calling server functions.
dc := d.NewFuncClient(c)

// Call functions defined above
res, err := dc.Call("Inc", nil)
fmt.Printf("Inc=%+v, %+v, %d\n", res, err, incCalls)

res, err = dc.Call("Func42", nil)
fmt.Printf("Func42=%+v, %+v\n", res, err)

res, err = dc.Call("Echo", "foobar")
fmt.Printf("Echo=%+v, %+v\n", res, err)

reqst := &ExampleRequestStruct{
	Foo: 42,
	Bar: "bar",
}
res, err = dc.Call("Struct", reqst)
fmt.Printf("Struct=%+v, %+v\n", res, err)

res, err = dc.Call("Map", map[string]int{"foo": 1, "bar": 2})
resm := res.(map[string]int)
fmt.Printf("Map=foo:%d, bar:%d, %+v\n", resm["foo"], resm["bar"], err)

res, err = dc.Call("Slice", []string{"foo", "bar"})
fmt.Printf("Slice=%+v, %+v\n", res, err)

res, err = dc.Call("Error", nil)
fmt.Printf("Error=%+v, %+v\n", res, err)

res, err = dc.Call("Error42", 123)
fmt.Printf("Error42(123)=%+v, %+v\n", res, err)

res, err = dc.Call("Error42", 42)
fmt.Printf("Error42(42)=%+v, %+v\n", res, err)
Output:
Inc=<nil>, <nil>, 1
Func42=42, <nil>
Echo=foobar, <nil>
Struct=&{Baz:foo=42, bar=bar BarLen:3}, <nil>
Map=foo:1, bar:2, <nil>
Slice=[foo bar], <nil>
Error=<nil>, error
Error42(123)=123, <nil>
Error42(42)=<nil>, error42
Example (ServiceCalls) ¶
package main

import (
	"errors"
	"fmt"
	"log"
)

type ExampleDispatcherService struct {
	state int
}

func (s *ExampleDispatcherService) Get() int { return s.state }

func (s *ExampleDispatcherService) Set(x int) { s.state = x }

func (s *ExampleDispatcherService) GetError42() (int, error) {
	if s.state == 42 {
		return 0, errors.New("error42")
	}
	return s.state, nil
}

func (s *ExampleDispatcherService) privateFunc(string) { s.state = 0 }

func main() {
	d := NewDispatcher()

	service := &ExampleDispatcherService{
		state: 123,
	}

	// Register exported service functions
	d.AddService("MyService", service)

	// Start rpc server serving registered service.
	addr := "127.0.0.1:7892"
	s := NewTCPServer(addr, d.NewHandlerFunc())
	if err := s.Start(); err != nil {
		log.Fatalf("Cannot start rpc server: [%s]", err)
	}
	defer s.Stop()

	// Start rpc client connected to the server.
	c := NewTCPClient(addr)
	c.Start()
	defer c.Stop()

	// Create client wrapper for calling service functions.
	dc := d.NewServiceClient("MyService", c)

	res, err := dc.Call("Get", nil)
	fmt.Printf("Get=%+v, %+v\n", res, err)

	service.state = 456
	res, err = dc.Call("Get", nil)
	fmt.Printf("Get=%+v, %+v\n", res, err)

	res, err = dc.Call("Set", 78)
	fmt.Printf("Set=%+v, %+v, %+v\n", res, err, service.state)

	res, err = dc.Call("GetError42", nil)
	fmt.Printf("GetError42=%+v, %+v\n", res, err)

	service.state = 42
	res, err = dc.Call("GetError42", nil)
	fmt.Printf("GetError42=%+v, %+v\n", res, err)
}
Output:
Get=123, <nil>
Get=456, <nil>
Set=<nil>, <nil>, 78
GetError42=78, <nil>
GetError42=<nil>, error42
func (*Dispatcher) AddFunc ¶
func (d *Dispatcher) AddFunc(funcName string, f interface{})
AddFunc registers the given function f under the name funcName.
The function must accept zero, one or two input arguments. If the function has two arguments, then the first argument must have string type - the server will pass client address in this parameter.
The function must return zero, one or two values.
If the function has two return values, then the second value must have error type - the server will propagate this error to the client.
If the function returns only error value, then the server treats it as error, not return value, when sending to the client.
Arbitrary number of functions can be registered in the dispatcher.
See examples for details.
Example ¶
d := NewDispatcher()

// Function without arguments and return values
d.AddFunc("NoArgsNoRets", func() {})

// Function with one argument and no return values
d.AddFunc("OneArgNoRets", func(request string) {})

// Function without arguments and one return value
d.AddFunc("NoArgsOneRet", func() int { return 42 })

// Function with two arguments and no return values.
// The first argument must have string type - the server passes
// client address in it.
d.AddFunc("TwoArgsNoRets", func(clientAddr string, requests []byte) {})

// Function with one argument and two return values.
// The second return value must have error type.
d.AddFunc("OneArgTwoRets", func(request []string) ([]string, error) {
	if len(request) != 42 {
		return nil, errors.New("need 42 strings")
	}
	return request, nil
})
Output:
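The signature rules for AddFunc() can be restated as a small reflection check. The sketch below is a hypothetical helper written against those documented rules only; it is not the validation gorpc itself performs, and checkSignature is an assumed name.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// errorType is the reflect.Type of the built-in error interface.
var errorType = reflect.TypeOf((*error)(nil)).Elem()

// checkSignature reports whether f has a shape matching the documented
// rules: at most two arguments (the first a string when there are two)
// and at most two return values (the second an error when there are two).
func checkSignature(f interface{}) error {
	t := reflect.TypeOf(f)
	if t == nil || t.Kind() != reflect.Func {
		return errors.New("not a function")
	}
	if t.NumIn() > 2 {
		return errors.New("too many arguments")
	}
	if t.NumIn() == 2 && t.In(0).Kind() != reflect.String {
		return errors.New("first of two arguments must be a string")
	}
	if t.NumOut() > 2 {
		return errors.New("too many return values")
	}
	if t.NumOut() == 2 && t.Out(1) != errorType {
		return errors.New("second return value must be an error")
	}
	return nil
}

func main() {
	fmt.Println(checkSignature(func() {}))
	fmt.Println(checkSignature(func(clientAddr string, x int) (int, error) { return x, nil }))
	fmt.Println(checkSignature(func(a, b int) {}))
	fmt.Println(checkSignature(func() (int, int) { return 0, 0 }))
}
```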
func (*Dispatcher) AddService ¶
func (d *Dispatcher) AddService(serviceName string, service interface{})
AddService registers public methods of the given service under the given name serviceName.
Since only public methods are registered, the service must have at least one public method.
All public methods must conform to the requirements described in AddFunc().
func (*Dispatcher) NewFuncClient ¶
func (d *Dispatcher) NewFuncClient(c *Client) *DispatcherClient
NewFuncClient returns a client suitable for calling functions registered via AddFunc().
func (*Dispatcher) NewHandlerFunc ¶
func (d *Dispatcher) NewHandlerFunc() HandlerFunc
NewHandlerFunc returns HandlerFunc serving all the functions and/or services registered via AddFunc() and AddService().
The returned HandlerFunc must be assigned to Server.Handler or passed to New*Server().
func (*Dispatcher) NewServiceClient ¶
func (d *Dispatcher) NewServiceClient(serviceName string, c *Client) *DispatcherClient
NewServiceClient returns a client suitable for calling methods of the service with name serviceName registered via AddService().
It is safe to create multiple service clients over a single underlying client.
type DispatcherBatch ¶
type DispatcherBatch struct {
// contains filtered or unexported fields
}
DispatcherBatch allows grouping and executing multiple RPCs in a single batch.
DispatcherBatch may be created via DispatcherClient.NewBatch().
func (*DispatcherBatch) Add ¶
func (b *DispatcherBatch) Add(funcName string, request interface{}) *BatchResult
Add adds a new request to the RPC batch.
The order of batched RPCs execution on the server is unspecified.
All the requests added to the batch are sent to the server at once when DispatcherBatch.Call*() is called.
All the non-internal request and response types must be registered via RegisterType() before the first call to this function.
It is safe to add multiple requests to the same batch from concurrently running goroutines.
func (*DispatcherBatch) AddSkipResponse ¶
func (b *DispatcherBatch) AddSkipResponse(funcName string, request interface{})
AddSkipResponse adds a new request to the RPC batch and ignores the response.
The order of batched RPCs execution on the server is unspecified.
All the requests added to the batch are sent to the server at once when DispatcherBatch.Call*() is called.
All the non-internal request types must be registered via RegisterType() before the first call to this function.
It is safe to add multiple requests to the same batch from concurrently running goroutines.
func (*DispatcherBatch) Call ¶
func (b *DispatcherBatch) Call() error
Call calls all the RPCs added via DispatcherBatch.Add().
The order of batched RPCs execution on the server is unspecified. Usually batched RPCs are executed concurrently on the server.
The caller may read all BatchResult contents returned from DispatcherBatch.Add() after the Call returns.
It is guaranteed that all <-BatchResult.Done channels are unblocked after the Call returns.
func (*DispatcherBatch) CallTimeout ¶
func (b *DispatcherBatch) CallTimeout(timeout time.Duration) error
CallTimeout calls all the RPCs added via DispatcherBatch.Add() and waits up to the given timeout for all the RPC responses.
The order of batched RPCs execution on the server is unspecified. Usually batched RPCs are executed concurrently on the server.
The caller may read all BatchResult contents returned from DispatcherBatch.Add() after the CallTimeout returns.
It is guaranteed that all <-BatchResult.Done channels are unblocked after the CallTimeout returns.
type DispatcherClient ¶
type DispatcherClient struct {
// contains filtered or unexported fields
}
DispatcherClient is a Client wrapper suitable for calling registered functions and/or for calling methods of the registered services.
func (*DispatcherClient) Call ¶
func (dc *DispatcherClient) Call(funcName string, request interface{}) (response interface{}, err error)
Call calls the given function with the given request.
All the non-internal request and response types must be registered via RegisterType() before the first call to this function.
func (*DispatcherClient) CallAsync ¶
func (dc *DispatcherClient) CallAsync(funcName string, request interface{}) (*AsyncResult, error)
CallAsync calls the given function asynchronously.
All the non-internal request and response types must be registered via RegisterType() before the first call to this function.
func (*DispatcherClient) CallTimeout ¶
func (dc *DispatcherClient) CallTimeout(funcName string, request interface{}, timeout time.Duration) (response interface{}, err error)
CallTimeout calls the given function and waits up to the given timeout for the response.
All the non-internal request and response types must be registered via RegisterType() before the first call to this function.
func (*DispatcherClient) NewBatch ¶
func (dc *DispatcherClient) NewBatch() *DispatcherBatch
NewBatch creates a new RPC batch for the given DispatcherClient.
It is safe to create multiple concurrent batches from a single client.
Example ¶
// Create new dispatcher.
d := NewDispatcher()

// Register echo function in the dispatcher.
d.AddFunc("Echo", func(x int) int { return x })

// Start the server serving all the registered functions above
s := NewTCPServer("127.0.0.1:12445", d.NewHandlerFunc())
if err := s.Start(); err != nil {
	log.Fatalf("Cannot start rpc server: [%s]", err)
}
defer s.Stop()

// Start the client and connect it to the server
c := NewTCPClient("127.0.0.1:12445")
c.Start()
defer c.Stop()

// Create a client wrapper for calling server functions.
dc := d.NewFuncClient(c)

// Create new batch for calling server functions.
b := dc.NewBatch()
result := make([]*BatchResult, 3)

// Add RPC messages to the batch.
for i := 0; i < 3; i++ {
	result[i] = b.Add("Echo", i)
}

// Invoke all the RPCs added to the batch.
if err := b.Call(); err != nil {
	log.Fatalf("error when calling RPC batch: [%s]", err)
}

for i := 0; i < 3; i++ {
	r := result[i]
	fmt.Printf("response[%d]=%+v, %+v\n", i, r.Response, r.Error)
}
Output:
response[0]=0, <nil>
response[1]=1, <nil>
response[2]=2, <nil>
func (*DispatcherClient) Send ¶
func (dc *DispatcherClient) Send(funcName string, request interface{}) error
Send sends the given request to the given function and doesn't wait for a response.
All the non-internal request types must be registered via RegisterType() before the first call to this function.
type HandlerFunc ¶
type HandlerFunc func(clientAddr string, request interface{}) (response interface{})
HandlerFunc is a server handler function.
clientAddr contains the client address returned by Listener.Accept(). Request and response types may be arbitrary. All the request and response types the HandlerFunc may use must be registered with RegisterType() before starting the server. There is no need to register base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.
Hint: use Dispatcher for HandlerFunc construction.
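A hand-written HandlerFunc usually dispatches on the dynamic type of the request. The following is a minimal sketch rather than gorpc code: HandlerFunc is redeclared locally with the signature shown above so that the snippet compiles without the package, and SumRequest and handle are hypothetical names. With the real package, SumRequest would additionally have to be registered via RegisterType() before starting the server.

```go
package main

import "fmt"

// HandlerFunc mirrors the signature documented above. It is redeclared
// here only to keep the sketch self-contained.
type HandlerFunc func(clientAddr string, request interface{}) (response interface{})

// SumRequest is a hypothetical request type used for illustration.
type SumRequest struct {
	A, B int
}

// handle dispatches on the dynamic type of the request and returns
// a response of an arbitrary type.
func handle(clientAddr string, request interface{}) interface{} {
	switch req := request.(type) {
	case string:
		return "echo: " + req
	case *SumRequest:
		return req.A + req.B
	default:
		return fmt.Sprintf("unsupported request type %T from %s", request, clientAddr)
	}
}

// Compile-time check that handle has the HandlerFunc signature.
var _ HandlerFunc = handle

func main() {
	fmt.Println(handle("10.0.0.1:5000", "hi"))
	fmt.Println(handle("10.0.0.1:5000", &SumRequest{A: 2, B: 3}))
	fmt.Println(handle("10.0.0.1:5000", 3.14))
}
```

Once more than a handful of request types are involved, the Dispatcher mentioned in the hint above is likely the more maintainable choice.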
type Listener ¶
type Listener interface {
	// Init is called on server start.
	//
	// addr contains the address set at Server.Addr.
	Init(addr string) error

	// Accept must return incoming connections from clients.
	// clientAddr must contain client's address in user-readable view.
	//
	// It is expected that the returned conn immediately
	// sends all the data passed via Write() to the client.
	// Otherwise gorpc may hang.
	// The conn implementation must call Flush() on underlying buffered
	// streams before returning from Write().
	Accept() (conn io.ReadWriteCloser, clientAddr string, err error)

	// Close closes the listener.
	// All pending calls to Accept() must immediately return errors after
	// Close is called.
	// All subsequent calls to Accept() must immediately return error.
	Close() error

	// ListenAddr returns the listener's network address.
	ListenAddr() net.Addr
}
Listener is an interface for custom listeners intended for the Server.
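To make the contract above concrete, here is a minimal sketch of an in-memory listener built on net.Pipe, which could serve as an in-process transport in tests. The Listener interface is redeclared locally so the snippet compiles on its own. MemListener, its Dial method, memAddr and roundTrip are hypothetical names and not part of gorpc. net.Pipe is unbuffered, so Write() hands data to the peer directly, which matches the "no hidden buffering" expectation.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"net"
	"sync"
)

// Listener mirrors the interface documented above (redeclared for the sketch).
type Listener interface {
	Init(addr string) error
	Accept() (conn io.ReadWriteCloser, clientAddr string, err error)
	Close() error
	ListenAddr() net.Addr
}

var errListenerClosed = errors.New("listener closed")

// memAddr is a trivial net.Addr for the in-memory transport.
type memAddr string

func (a memAddr) Network() string { return "mem" }
func (a memAddr) String() string  { return string(a) }

// MemListener hands out the server ends of net.Pipe connections.
type MemListener struct {
	addr      memAddr
	conns     chan net.Conn
	done      chan struct{}
	closeOnce sync.Once
}

func (l *MemListener) Init(addr string) error {
	l.addr = memAddr(addr)
	l.conns = make(chan net.Conn)
	l.done = make(chan struct{})
	return nil
}

func (l *MemListener) Accept() (io.ReadWriteCloser, string, error) {
	select {
	case <-l.done: // fail fast once the listener is closed
		return nil, "", errListenerClosed
	default:
	}
	select {
	case c := <-l.conns:
		return c, "mem-client", nil
	case <-l.done: // unblocks pending Accept calls on Close
		return nil, "", errListenerClosed
	}
}

func (l *MemListener) Close() error {
	l.closeOnce.Do(func() { close(l.done) })
	return nil
}

func (l *MemListener) ListenAddr() net.Addr { return l.addr }

// Dial is the client-side counterpart: it creates a pipe and passes
// the server end to a pending Accept call.
func (l *MemListener) Dial() (io.ReadWriteCloser, error) {
	client, server := net.Pipe()
	select {
	case l.conns <- server:
		return client, nil
	case <-l.done:
		client.Close()
		server.Close()
		return nil, errListenerClosed
	}
}

var _ Listener = (*MemListener)(nil)

// roundTrip sends msg from a dialed connection and reads it on the accepted one.
func roundTrip(l *MemListener, msg string) (string, error) {
	go func() {
		c, err := l.Dial()
		if err != nil {
			return
		}
		defer c.Close()
		c.Write([]byte(msg))
	}()
	conn, _, err := l.Accept()
	if err != nil {
		return "", err
	}
	defer conn.Close()
	data, err := io.ReadAll(conn)
	return string(data), err
}

func main() {
	l := &MemListener{}
	if err := l.Init("mem://demo"); err != nil {
		panic(err)
	}
	defer l.Close()
	fmt.Println(roundTrip(l, "ping"))
}
```

With the real package, such a listener would be assigned to Server.Listener, and the client would need a matching dial callback, as the Server documentation notes.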
type LoggerFunc ¶
type LoggerFunc func(format string, args ...interface{})
LoggerFunc is an error logging function to pass to gorpc.SetErrorLogger().
type OnConnectFunc ¶
type OnConnectFunc func(remoteAddr string, rwc io.ReadWriteCloser) (io.ReadWriteCloser, error)
OnConnectFunc is a callback, which may be called by both Client and Server on every connection creation if assigned to Client.OnConnect / Server.OnConnect.
remoteAddr is the address of the remote end for the established connection rwc.
The callback must return either rwc itself or an rwc wrapper. The returned connection wrapper MUST send all the data to the underlying rwc on every Write() call, otherwise the connection will hang forever.
The callback may be used for authentication/authorization and/or custom transport wrapping.
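As an illustration of the authentication use case, the sketch below pairs a client-side and a server-side callback that exchange a fixed shared token before any RPC traffic. It is a simplified assumption-laden example, not a recommended protocol: OnConnectFunc is redeclared locally with the signature shown above, and sharedToken, clientOnConnect, badClientOnConnect, serverOnConnect and handshake are hypothetical names. Both callbacks return rwc unchanged, so the "every Write() reaches the underlying rwc" requirement holds trivially.

```go
package main

import (
	"crypto/subtle"
	"errors"
	"fmt"
	"io"
	"net"
)

// OnConnectFunc mirrors the type documented above (redeclared for the sketch).
type OnConnectFunc func(remoteAddr string, rwc io.ReadWriteCloser) (io.ReadWriteCloser, error)

// sharedToken is a hypothetical fixed-length secret known to both sides.
var sharedToken = []byte("s3cret-token-16b")

// clientOnConnect sends the token right after the connection is established.
func clientOnConnect(remoteAddr string, rwc io.ReadWriteCloser) (io.ReadWriteCloser, error) {
	if _, err := rwc.Write(sharedToken); err != nil {
		return nil, err
	}
	return rwc, nil
}

// badClientOnConnect sends a wrong token of the same length.
func badClientOnConnect(remoteAddr string, rwc io.ReadWriteCloser) (io.ReadWriteCloser, error) {
	if _, err := rwc.Write([]byte("wrong-token-16b!")); err != nil {
		return nil, err
	}
	return rwc, nil
}

// serverOnConnect reads the token and rejects the connection on mismatch.
func serverOnConnect(remoteAddr string, rwc io.ReadWriteCloser) (io.ReadWriteCloser, error) {
	got := make([]byte, len(sharedToken))
	if _, err := io.ReadFull(rwc, got); err != nil {
		return nil, err
	}
	// Constant-time comparison avoids leaking how many bytes matched.
	if subtle.ConstantTimeCompare(got, sharedToken) != 1 {
		return nil, errors.New("authentication failed for " + remoteAddr)
	}
	return rwc, nil
}

// handshake runs both callbacks against the two ends of an in-memory pipe.
func handshake(client, server OnConnectFunc) (clientErr, serverErr error) {
	c, s := net.Pipe()
	defer c.Close()
	defer s.Close()
	done := make(chan error, 1)
	go func() {
		_, err := client("server-addr", c)
		done <- err
	}()
	_, serverErr = server("client-addr", s)
	clientErr = <-done
	return clientErr, serverErr
}

func main() {
	fmt.Println(handshake(clientOnConnect, serverOnConnect))
	fmt.Println(handshake(badClientOnConnect, serverOnConnect))
}
```

With the real package, the two callbacks would be assigned to Client.OnConnect and Server.OnConnect respectively. A plain shared token offers no confidentiality on its own, so it would normally be combined with an encrypted transport such as the TLS one.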
type Server ¶
type Server struct {
	// Address to listen to for incoming connections.
	//
	// The address format depends on the underlying transport provided
	// by Server.Listener. The following transports are provided
	// out of the box:
	//   * TCP - see NewTCPServer() and NewTCPClient().
	//   * TLS (aka SSL) - see NewTLSServer() and NewTLSClient().
	//   * Unix sockets - see NewUnixServer() and NewUnixClient().
	//
	// By default TCP transport is used.
	Addr string

	// Handler function for incoming requests.
	//
	// Server calls this function for each incoming request.
	// The function must process the request and return the corresponding response.
	//
	// Hint: use Dispatcher for HandlerFunc construction.
	Handler HandlerFunc

	// The maximum number of concurrent rpc calls the server may perform.
	// Default is DefaultConcurrency.
	Concurrency int

	// The maximum delay between response flushes to clients.
	//
	// Negative values make the server send responses to the client
	// immediately, without buffering. This minimizes rpc latency at the cost
	// of higher CPU and network usage.
	//
	// Default is DefaultFlushDelay.
	FlushDelay time.Duration

	// The maximum number of pending responses in the queue.
	// Default is DefaultPendingMessages.
	PendingResponses int

	// Size of send buffer for each underlying connection in bytes.
	// Default is DefaultBufferSize.
	SendBufferSize int

	// Size of recv buffer for each underlying connection in bytes.
	// Default is DefaultBufferSize.
	RecvBufferSize int

	// OnConnect is called whenever connection from client is accepted.
	// The callback can be used for authentication/authorization/encryption
	// and/or for custom transport wrapping.
	//
	// See also Listener, which can be used for sophisticated transport
	// implementation.
	OnConnect OnConnectFunc

	// The server obtains new client connections via Listener.Accept().
	//
	// Override the listener if you want custom underlying transport
	// and/or client authentication/authorization.
	// Don't forget to override the Client.Dial() callback accordingly.
	//
	// See also OnConnect for authentication/authorization purposes.
	//
	//   * NewTLSClient() and NewTLSServer() can be used for encrypted rpc.
	//   * NewUnixClient() and NewUnixServer() can be used for fast local
	//     inter-process rpc.
	//
	// By default it returns TCP connections accepted from Server.Addr.
	Listener Listener

	// LogError is used for error logging.
	//
	// By default the function set via SetErrorLogger() is used.
	LogError LoggerFunc

	// Connection statistics.
	//
	// The stats don't reset automatically. Feel free to reset them
	// any time you wish.
	Stats ConnStats
	// contains filtered or unexported fields
}
Server implements RPC server.
Default server settings are optimized for high load, so don't override them without a valid reason.
Example ¶
// Register the given struct for passing as rpc request and/or response.
// All structs intended for passing between client and server
// must be registered via RegisterType().
//
// The struct may contain arbitrary fields, but only public (exported)
// fields are passed between client and server.
type ExampleStruct struct {
	Foo int

	// This field won't be passed over the wire,
	// since it is private (unexported)
	bar string

	Baz string
}
RegisterType(&ExampleStruct{})

// Start echo server
handlerFunc := func(clientAddr string, request interface{}) interface{} {
	return request
}
s := NewTCPServer("127.0.0.1:43216", handlerFunc)
if err := s.Start(); err != nil {
	log.Fatalf("Cannot start server: [%s]", err)
}
defer s.Stop()

// Connect client to the echo server
c := NewTCPClient("127.0.0.1:43216")
c.Start()
defer c.Stop()

// Echo string
res, err := c.Call("foobar")
fmt.Printf("%+v, %+v\n", res, err)

// Echo int
res, err = c.Call(1234)
fmt.Printf("%+v, %+v\n", res, err)

// Echo string slice
res, err = c.Call([]string{"foo", "bar"})
fmt.Printf("%+v, %+v\n", res, err)

// Echo struct
res, err = c.Call(&ExampleStruct{
	Foo: 123,
	bar: "324",
	Baz: "mmm",
})
fmt.Printf("%+v, %+v\n", res, err)
Output:
foobar, <nil>
1234, <nil>
[foo bar], <nil>
&{Foo:123 bar: Baz:mmm}, <nil>
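The empty bar field in the last output line can be reproduced without a network. The sketch below assumes the wire encoding behaves like the standard encoding/gob package, which this documentation does not state. It registers a struct, round-trips it as an interface{} value, and shows that the unexported field is dropped. roundTrip is a hypothetical helper and gob.Register stands in for RegisterType().

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// ExampleStruct matches the struct from the example above.
type ExampleStruct struct {
	Foo int
	bar string // unexported, so gob skips it
	Baz string
}

func init() {
	// Stand-in for RegisterType(&ExampleStruct{}); an assumption of this sketch.
	gob.Register(&ExampleStruct{})
}

// roundTrip encodes v as an interface{} value and decodes it again,
// imitating a request passing over the wire.
func roundTrip(v interface{}) (interface{}, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(&v); err != nil {
		return nil, err
	}
	var out interface{}
	if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	// Base types need no registration.
	res, err := roundTrip("foobar")
	fmt.Printf("%+v, %+v\n", res, err)

	// The registered struct survives, minus its unexported field.
	res, err = roundTrip(&ExampleStruct{Foo: 123, bar: "324", Baz: "mmm"})
	fmt.Printf("%+v, %+v\n", res, err)
}
```

Under the same assumption, removing the gob.Register call makes the struct round trip fail with an encoding error, which is one way to see why registration must happen before the first call.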
func NewTCPServer ¶
func NewTCPServer(addr string, handler HandlerFunc) *Server
NewTCPServer creates a server listening for TCP connections on the given addr and processing incoming requests with the given HandlerFunc.
The returned server must be started after any optional settings are adjusted.
The corresponding client must be created with NewTCPClient().
func NewTLSServer ¶
func NewTLSServer(addr string, handler HandlerFunc, cfg *tls.Config) *Server
NewTLSServer creates a server listening for TLS (aka SSL) connections on the given addr and processing incoming requests with the given HandlerFunc. cfg must contain TLS settings for the server.
The returned server must be started after any optional settings are adjusted.
The corresponding client must be created with NewTLSClient().
func NewUnixServer ¶
func NewUnixServer(addr string, handler HandlerFunc) *Server
NewUnixServer creates a server listening for unix connections on the given addr and processing incoming requests with the given HandlerFunc.
The returned server must be started after any optional settings are adjusted.
The corresponding client must be created with NewUnixClient().
func (*Server) Start ¶
Start starts the rpc server.
All the request and response types the Handler may use must be registered with RegisterType() before starting the server. There is no need to register base Go types such as int, string, bool, float64, etc. or arrays, slices and maps containing base Go types.