cprpc

package module
v0.2.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 26, 2022 License: BSD-3-Clause Imports: 19 Imported by: 0

README

cprpc

cprpc is a RPC package that copied and modified from the standard package of net/rpc, provides an easier way to use:

Usage

Server:

package main

import (
	"log"
	"net"

	"github.com/hmgle/cprpc"
)

type (
	HelloV1API struct {
		Name string
	}
	HelloV1Ret struct {
		Data string
	}
)

func (h *HelloV1API) Serve(ctx *cprpc.Context) {
	ctx.ReplyOk(&HelloV1Ret{
		Data: "hello, " + h.Name,
	})
}

func main() {
	srv := cprpc.NewServer()
	srv.RegisterAPI("/v1/hello", &HelloV1API{})

	listener, err := net.Listen("tcp", ":1234")
	if err != nil {
		log.Fatal("ListenTCP error:", err)
	}
	srv.Serve(listener)
}

Client:

package main

import (
	"log"

	"github.com/hmgle/cprpc"
)

func main() {
	client, err := cprpc.Dial("tcp4", "127.0.0.1:1234")
	if err != nil {
		log.Fatal(err)
	}
	args := &HelloV1API{
		Name: "world",
	}
	reply := &HelloV1Ret{}
	err = client.Call("/v1/hello", args, &reply)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("reply: %+v\n", reply)
}

Performance

Below are benchmark results comparing cprpc performance to net/rpc:

$ go test -bench=. -benchmem -run=none
goos: linux
goarch: amd64
pkg: github.com/hmgle/cprpc
cpu: Intel(R) Core(TM) i5-5200U CPU @ 2.20GHz
BenchmarkCprpc-4                   23352             51321 ns/op             328 B/op         11 allocs/op
BenchmarkCprpcJSON-4               17560             64298 ns/op             336 B/op         13 allocs/op
BenchmarkCprpcMsgpack-4            13821             83964 ns/op             328 B/op         11 allocs/op
BenchmarkCprpcPool-4               15102             74708 ns/op            1070 B/op         22 allocs/op
BenchmarkCprpcPoolJSON-4           19048             62061 ns/op             368 B/op         13 allocs/op
BenchmarkCprpcPoolMsgpack-4        12912             86324 ns/op             360 B/op         12 allocs/op
BenchmarkNetRpc-4                  18945             62589 ns/op             358 B/op         12 allocs/op
PASS
ok      github.com/hmgle/cprpc  13.357s

Documentation

Index

Constants

View Source
const (
	// DefaultRPCPath used by HandleHTTP
	DefaultRPCPath = "/_goRPC_"
	// DefaultDebugPath used by HandleHTTP
	DefaultDebugPath = "/debug/rpc"
)

Variables

View Source
var DefaultServer = NewServer()

DefaultServer is the default instance of *Server.

View Source
var ErrServerClosed = errors.New("cprpc: Server closed")

ErrServerClosed is returned by the Server's Serve method after a call to Shutdown or Close.

View Source
var ErrShutdown = errors.New("connection is shut down")

ErrShutdown connection is closed.

Functions

func HandleHTTP

func HandleHTTP()

HandleHTTP registers an HTTP handler for RPC messages to DefaultServer on DefaultRPCPath and a debugging handler on DefaultDebugPath. It is still necessary to invoke http.Serve(), typically in a go statement.

func Serve

func Serve(lis net.Listener) error

Serve accepts connections on the listener and serves requests to DefaultServer for each incoming connection. Accept blocks; the caller typically invokes it in a go statement.

func ServeCodec

func ServeCodec(codec ServerCodec)

ServeCodec is like ServeConn but uses the specified codec to decode requests and encode responses.

func ServeConn

func ServeConn(conn io.ReadWriteCloser)

ServeConn runs the DefaultServer on a single connection. ServeConn blocks, serving the connection until the client hangs up. The caller typically invokes ServeConn in a go statement. ServeConn uses the gob wire format (see package gob) on the connection. To use an alternate codec, use ServeCodec. See NewClient's comment for information about concurrent access.

func ServeRequest

func ServeRequest(codec ServerCodec) error

ServeRequest is like ServeCodec but synchronously serves a single request. It does not close the codec upon completion.

func Shutdown

func Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the DefaultServer without interrupting any active connection. Shutdown works by first closing all DefaultServer's open listeners, then waiting indefinitely for connections to transition to an idle state before closing them and shutting down. If the provided context expires before the shutdown is complete, Shutdown forcibly closes active connections and returns the context's error, otherwise it returns any error returned from closing the DefaultServer's underlying Listener(s).

Types

type API

type API interface {
	Serve(*Context)
}

A API implements the Serve method.

type Call

type Call struct {
	Path  string
	Args  interface{} // The argument to the function (*struct).
	Reply interface{} // The reply from the function (*struct).
	Error error       // After completion, the error status.
	Done  chan *Call  // Strobes when call is complete.
}

Call represents an active RPC.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents an RPC Client. There may be multiple outstanding Calls associated with a single Client, and a Client may be used by multiple goroutines simultaneously.

func Dial

func Dial(network, address string, codecFunc ...func(io.ReadWriteCloser) ClientCodec) (*Client, error)

Dial connects to an RPC server at the specified network address.

func DialHTTP

func DialHTTP(network, address string) (*Client, error)

DialHTTP connects to an HTTP RPC server at the specified network address listening on the default HTTP RPC path.

func DialHTTPPath

func DialHTTPPath(network, address, path string, codecFunc ...func(io.ReadWriteCloser) ClientCodec) (*Client, error)

DialHTTPPath connects to an HTTP RPC server at the specified network address and path.

func NewClient

func NewClient(conn io.ReadWriteCloser, codecFunc ...func(io.ReadWriteCloser) ClientCodec) *Client

NewClient returns a new Client to handle requests to the set of services at the other end of the connection. It adds a buffer to the write side of the connection so the header and payload are sent as a unit.

The read and write halves of the connection are serialized independently, so no interlocking is required. However each half may be accessed concurrently so the implementation of conn should protect against concurrent reads or concurrent writes.

func NewClientWithCodec

func NewClientWithCodec(codec ClientCodec) *Client

NewClientWithCodec is like NewClient but uses the specified codec to encode requests and decode responses.

func (*Client) Call

func (client *Client) Call(path string, args interface{}, reply interface{}) error

Call invokes the named function, waits for it to complete, and returns its error status.

func (*Client) Close

func (client *Client) Close() error

Close calls the underlying codec's Close method. If the connection is already shutting down, ErrShutdown is returned.

func (*Client) Go

func (client *Client) Go(path string, args interface{}, reply interface{}, done chan *Call) *Call

Go invokes the function asynchronously. It returns the Call structure representing the invocation. The done channel will signal when the call is complete by returning the same Call object. If done is nil, Go will allocate a new channel. If non-nil, done must be buffered or Go will deliberately crash.

type ClientCodec

type ClientCodec interface {
	WriteRequest(*Request, interface{}) error
	ReadResponseHeader(*Response) error
	ReadResponseBody(interface{}) error

	Close() error
}

A ClientCodec implements writing of RPC requests and reading of RPC responses for the client side of an RPC session. The client calls WriteRequest to write a request to the connection and calls ReadResponseHeader and ReadResponseBody in pairs to read responses. The client calls Close when finished with the connection. ReadResponseBody may be called with a nil argument to force the body of the response to be read and then discarded. See NewClient's comment for information about concurrent access.

type Codec

type Codec struct {
	Timeout time.Duration
	Closer  io.ReadWriteCloser
	Decoder *gob.Decoder
	Encoder *gob.Encoder
	EncBuf  *bufio.Writer
}

Codec ...

func (*Codec) Close

func (c *Codec) Close() error

Close ...

func (*Codec) ReadResponseBody

func (c *Codec) ReadResponseBody(body interface{}) error

ReadResponseBody ...

func (*Codec) ReadResponseHeader

func (c *Codec) ReadResponseHeader(r *Response) error

ReadResponseHeader ...

func (*Codec) WriteRequest

func (c *Codec) WriteRequest(r *Request, body interface{}) (err error)

WriteRequest ...

type Context

type Context struct {
	// contains filtered or unexported fields
}

Context including Server, seq, ServerCodec and sync.Mutex.

func (*Context) ReplyOk

func (ctx *Context) ReplyOk(ret interface{})

ReplyOk reply ok data.

type Options

type Options struct {

	// InitTargets init targets
	InitTargets []string
	// init connection
	InitCap int
	// max connections
	MaxCap       int
	DialTimeout  time.Duration
	IdleTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration

	CodecFunc func(io.ReadWriteCloser) ClientCodec
	// contains filtered or unexported fields
}

Options pool options

func NewOptions

func NewOptions() *Options

NewOptions returns a new newOptions instance with sane defaults.

func (*Options) Input

func (o *Options) Input() chan<- *[]string

Input is the input channel.

type RPCPool

type RPCPool struct {
	Mu          sync.Mutex
	IdleTimeout time.Duration
	// contains filtered or unexported fields
}

RPCPool pool info

func NewRPCPool

func NewRPCPool(o *Options) (*RPCPool, error)

NewRPCPool init pool.

func (*RPCPool) Call

func (c *RPCPool) Call(path string, args interface{}, reply interface{}) error

Call invokes the named function, waits for it to complete, and returns its error status.

func (*RPCPool) Close

func (c *RPCPool) Close()

Close all connection.

func (*RPCPool) Go

func (c *RPCPool) Go(path string, args interface{}, reply interface{}, done chan *Call) (*Call, error)

Go invokes the function asynchronously.

type Request

type Request struct {
	Path string
	Seq  uint64 // sequence number chosen by client
	// contains filtered or unexported fields
}

Request is a header written before every RPC call. It is used internally but documented here as an aid to debugging, such as when analyzing network traffic.

type Response

type Response struct {
	Path  string
	Seq   uint64 // echoes that of the request
	Error string // error, if any.
	// contains filtered or unexported fields
}

Response is a header written before every RPC return. It is used internally but documented here as an aid to debugging, such as when analyzing network traffic.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server represents an RPC Server.

func NewServer

func NewServer(codecFunc ...func(io.ReadWriteCloser) ServerCodec) *Server

NewServer returns a new Server.

func (*Server) Close

func (server *Server) Close() error

Close immediately closes all active net.Listeners and any connections in state StateNew, StateActive, or StateIdle. For a graceful shutdown, use Shutdown.

func (*Server) HandleHTTP

func (server *Server) HandleHTTP(rpcPath, debugPath string)

HandleHTTP registers an HTTP handler for RPC messages on rpcPath, and a debugging handler on debugPath. It is still necessary to invoke http.Serve(), typically in a go statement.

func (*Server) RegisterAPI

func (server *Server) RegisterAPI(path string, api API)

RegisterAPI bind the path to api.

func (*Server) Serve

func (server *Server) Serve(lis net.Listener) error

Serve accepts connections on the listener and serves requests for each incoming connection. Accept blocks until the listener returns a non-nil error. The caller typically invokes Accept in a go statement.

func (*Server) ServeCodec

func (server *Server) ServeCodec(codec ServerCodec)

ServeCodec is like ServeConn but uses the specified codec to decode requests and encode responses.

func (*Server) ServeConn

func (server *Server) ServeConn(conn io.ReadWriteCloser)

ServeConn runs the server on a single connection. ServeConn blocks, serving the connection until the client hangs up. The caller typically invokes ServeConn in a go statement. ServeConn uses the gob wire format (see package gob) on the connection. To use an alternate codec, use ServeCodec. See NewClient's comment for information about concurrent access.

func (*Server) ServeHTTP

func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)

ServeHTTP implements an http.Handler that answers RPC requests.

func (*Server) ServeRequest

func (server *Server) ServeRequest(codec ServerCodec) error

ServeRequest is like ServeCodec but synchronously serves a single request. It does not close the codec upon completion.

func (*Server) Shutdown

func (server *Server) Shutdown(ctx context.Context) error

Shutdown gracefully shuts down the server without interrupting any active connections. Shutdown works by first closing all open listeners, then closing all idle connections, and then waiting indefinitely for connections to return to idle and then shut down. If the provided context expires before the shutdown is complete, Shutdown returns the context's error, otherwise it returns any error returned from closing the Server's underlying Listener(s).

type ServerCodec

type ServerCodec interface {
	ReadRequestHeader(*Request) error
	ReadRequestBody(interface{}) error
	WriteResponse(*Response, interface{}) error

	// Close can be called multiple times and must be idempotent.
	Close() error

	GetRwc() io.ReadWriteCloser
}

A ServerCodec implements reading of RPC requests and writing of RPC responses for the server side of an RPC session. The server calls ReadRequestHeader and ReadRequestBody in pairs to read requests from the connection, and it calls WriteResponse to write a response back. The server calls Close when finished with the connection. ReadRequestBody may be called with a nil argument to force the body of the request to be read and discarded. See NewClient's comment for information about concurrent access.

type ServerError

type ServerError string

ServerError represents an error that has been returned from the remote side of the RPC connection.

func (ServerError) Error

func (e ServerError) Error() string

Directories

Path Synopsis
codec

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL