Documentation ¶
Index ¶
- Constants
- Variables
- func HTTPRequest2RpcxRequest(r *http.Request) (*protocol.Message, error)
- func NewHandlerViewer(s *Server) viewer.Viewer
- func NewProcessTimeViewer(s *Server) viewer.Viewer
- func NewRequestRateViewer(s *Server) viewer.Viewer
- func RegisterMakeListener(network string, ml MakeListener)
- type CMuxPlugin
- type CORSOptions
- type Context
- func (ctx *Context) Bind(v interface{}) error
- func (ctx *Context) DeleteKey(key interface{})
- func (ctx *Context) Get(key interface{}) interface{}
- func (ctx *Context) Metadata() map[string]string
- func (ctx *Context) Payload() []byte
- func (ctx *Context) ServiceMethod() string
- func (ctx *Context) ServicePath() string
- func (ctx *Context) SetValue(key, val interface{})
- func (ctx *Context) Write(v interface{}) error
- func (ctx *Context) WriteError(err error) error
- type DownloadFileHandler
- type FileTransfer
- type FileTransferHandler
- type FileTransferService
- type Handler
- type HandlerViewer
- type HeartbeatPlugin
- type ID
- type JSONRPCError
- type MakeListener
- type OptionFn
- func WithAsyncWrite() OptionFn
- func WithCustomPool(pool WorkerPool) OptionFn
- func WithPool(maxWorkers, maxCapacity int, options ...pond.Option) OptionFn
- func WithReadTimeout(readTimeout time.Duration) OptionFn
- func WithTCPKeepAlivePeriod(period time.Duration) OptionFn
- func WithTLSConfig(cfg *tls.Config) OptionFn
- func WithWriteTimeout(writeTimeout time.Duration) OptionFn
- type Plugin
- type PluginContainer
- type PostCallPlugin
- type PostConnAcceptPlugin
- type PostConnClosePlugin
- type PostHTTPRequestPlugin
- type PostReadRequestPlugin
- type PostWriteRequestPlugin
- type PostWriteResponsePlugin
- type PreCallPlugin
- type PreHandleRequestPlugin
- type PreReadRequestPlugin
- type PreWriteRequestPlugin
- type PreWriteResponsePlugin
- type ProcessTimeViewer
- type RegisterFunctionPlugin
- type RegisterPlugin
- type RequestRateViewer
- type Reset
- type Server
- func (s *Server) ActiveClientConn() []net.Conn
- func (s *Server) AddHandler(servicePath, serviceMethod string, handler func(*Context) error)
- func (s *Server) Address() net.Addr
- func (s *Server) Close() error
- func (s *Server) EnableFileTransfer(serviceName string, fileTransfer *FileTransfer)
- func (s *Server) EnableStreamService(serviceName string, streamService *StreamService)
- func (s *Server) Register(rcvr interface{}, metadata string) error
- func (s *Server) RegisterFunction(servicePath string, fn interface{}, metadata string) error
- func (s *Server) RegisterFunctionName(servicePath string, name string, fn interface{}, metadata string) error
- func (s *Server) RegisterName(name string, rcvr interface{}, metadata string) error
- func (s *Server) RegisterOnRestart(f func(s *Server))
- func (s *Server) RegisterOnShutdown(f func(s *Server))
- func (s *Server) Restart(ctx context.Context) error
- func (s *Server) SendMessage(conn net.Conn, servicePath, serviceMethod string, metadata map[string]string, ...) error
- func (s *Server) Serve(network, address string) (err error)
- func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)
- func (s *Server) ServeListener(network string, ln net.Listener) (err error)
- func (s *Server) ServeWS(conn *websocket.Conn)
- func (s *Server) SetCORS(options *CORSOptions)
- func (s *Server) Shutdown(ctx context.Context) error
- func (s *Server) UnregisterAll() error
- type StreamAcceptor
- type StreamHandler
- type StreamService
- type VersionTag
- type ViewManager
- type WorkerPool
Constants ¶
const ( XVersion = "X-RPCX-Version" XMessageType = "X-RPCX-MessageType" XHeartbeat = "X-RPCX-Heartbeat" XOneway = "X-RPCX-Oneway" XMessageStatusType = "X-RPCX-MessageStatusType" XSerializeType = "X-RPCX-SerializeType" XMessageID = "X-RPCX-MessageID" XServicePath = "X-RPCX-ServicePath" XServiceMethod = "X-RPCX-ServiceMethod" XMeta = "X-RPCX-Meta" XErrorMessage = "X-RPCX-ErrorMessage" )
const ( // CodeUnknownJSONRPCError should be used for all non coded errors. CodeUnknownJSONRPCError = -32001 // CodeParseJSONRPCError is used when invalid JSON was received by the server. CodeParseJSONRPCError = -32700 // CodeInvalidjsonrpcRequest is used when the JSON sent is not a valid jsonrpcRequest object. CodeInvalidjsonrpcRequest = -32600 // CodeMethodNotFound should be returned by the handler when the method does // not exist / is not available. CodeMethodNotFound = -32601 // CodeInvalidParams should be returned by the handler when method // parameter(s) were invalid. CodeInvalidParams = -32602 // CodeInternalJSONRPCError is not currently returned but defined for completeness. CodeInternalJSONRPCError = -32603 )
const ( // ReaderBuffsize is used for bufio reader. ReaderBuffsize = 1024 // WriterBuffsize is used for bufio writer. WriterBuffsize = 1024 )
const (
// RPCXHandlerMetrics is the name of HandlerViewer
RPCXHandlerMetrics = "rpcx_handler"
)
const (
// RPCXProcessTimeMetrics is the name of ProcessTimeViewer
RPCXProcessTimeMetrics = "rpcx_processtime"
)
const (
// RPCXRequestRateMetrics is the name of RequestRateViewer
RPCXRequestRateMetrics = "rpcx_request_rate"
)
Variables ¶
var ( ErrServerClosed = errors.New("http: Server closed") ErrReqReachLimit = errors.New("request reached rate limit") )
ErrServerClosed is returned by the Server's Serve, ListenAndServe after a call to Shutdown or Close.
var ( // RemoteConnContextKey is a context key. It can be used in // services with context.WithValue to access the connection arrived on. // The associated value will be of type net.Conn. RemoteConnContextKey = &contextKey{"remote-conn"} // StartRequestContextKey records the start time StartRequestContextKey = &contextKey{"start-parse-request"} // StartSendRequestContextKey records the start time StartSendRequestContextKey = &contextKey{"start-send-request"} // TagContextKey is used to record extra info in handling services. Its value is a map[string]interface{} TagContextKey = &contextKey{"service-tag"} // HttpConnContextKey is used to store http connection. HttpConnContextKey = &contextKey{"http-conn"} )
var ErrNotAccept = errors.New("server refused the connection")
Functions ¶
func HTTPRequest2RpcxRequest ¶
HTTPRequest2RpcxRequest converts a http request to a rpcx request.
func NewHandlerViewer ¶
NewHandlerViewer returns the HandlerViewer instance
func NewProcessTimeViewer ¶
NewProcessTimeViewer returns the ProcessTimeViewer instance
func NewRequestRateViewer ¶
NewRequestRateViewer returns the RequestRateViewer instance
func RegisterMakeListener ¶
func RegisterMakeListener(network string, ml MakeListener)
RegisterMakeListener registers a MakeListener for network.
Types ¶
type CMuxPlugin ¶
type CORSOptions ¶
func AllowAllCORSOptions ¶
func AllowAllCORSOptions() *CORSOptions
AllowAllCORSOptions returns an option that allows access.
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
Context represents a rpcx FastCall context.
func NewContext ¶
NewContext creates a server.Context for Handler.
func (*Context) DeleteKey ¶
func (ctx *Context) DeleteKey(key interface{})
DeleteKey deletes the kv pair by key.
func (*Context) Get ¶
func (ctx *Context) Get(key interface{}) interface{}
Get returns value for key.
func (*Context) ServiceMethod ¶
ServiceMethod returns the ServiceMethod.
func (*Context) ServicePath ¶
ServicePath returns the ServicePath.
func (*Context) SetValue ¶
func (ctx *Context) SetValue(key, val interface{})
SetValue sets the kv pair.
func (*Context) WriteError ¶
type DownloadFileHandler ¶
type DownloadFileHandler func(conn net.Conn, args *share.DownloadFileArgs)
DownloadFileHandler handles downloading a file. The handler must close the connection after it has finished.
type FileTransfer ¶
type FileTransfer struct { Addr string AdvertiseAddr string // contains filtered or unexported fields }
FileTransfer supports transferring files from clients. It registers a file transfer service and listens on the given port. Clients invoke this service to get a token, and then send the token and the file to this port.
func NewFileTransfer ¶
func NewFileTransfer(addr string, handler FileTransferHandler, downloadFileHandler DownloadFileHandler, waitNum int) *FileTransfer
NewFileTransfer creates a FileTransfer with given parameters.
func (*FileTransfer) Start ¶
func (s *FileTransfer) Start() error
func (*FileTransfer) Stop ¶
func (s *FileTransfer) Stop() error
type FileTransferHandler ¶
type FileTransferHandler func(conn net.Conn, args *share.FileTransferArgs)
FileTransferHandler handles uploading a file. The handler must close the connection after it has finished.
type FileTransferService ¶
type FileTransferService struct {
FileTransfer *FileTransfer
}
func (*FileTransferService) DownloadFile ¶
func (s *FileTransferService) DownloadFile(ctx context.Context, args *share.DownloadFileArgs, reply *share.FileTransferReply) error
func (*FileTransferService) TransferFile ¶
func (s *FileTransferService) TransferFile(ctx context.Context, args *share.FileTransferArgs, reply *share.FileTransferReply) error
type HandlerViewer ¶
type HandlerViewer struct {
// contains filtered or unexported fields
}
HandlerViewer collects metrics of rpcx.
func (*HandlerViewer) Name ¶
func (vr *HandlerViewer) Name() string
func (*HandlerViewer) Serve ¶
func (vr *HandlerViewer) Serve(w http.ResponseWriter, _ *http.Request)
func (*HandlerViewer) SetStatsMgr ¶
func (vr *HandlerViewer) SetStatsMgr(smgr *viewer.StatsMgr)
func (*HandlerViewer) View ¶
func (vr *HandlerViewer) View() *charts.Line
type HeartbeatPlugin ¶
type HeartbeatPlugin interface {
HeartbeatRequest(ctx context.Context, req *protocol.Message) error
}
HeartbeatPlugin is a plugin that implements the HeartbeatRequest hook.
type ID ¶
ID is a jsonrpcRequest identifier. Only one of either the Name or Number members will be set, using the number form if the Name is the empty string.
func (*ID) MarshalJSON ¶
func (*ID) String ¶
String returns a string representation of the ID. The representation is non ambiguous, string forms are quoted, number forms are preceded by a #
func (*ID) UnmarshalJSON ¶
type JSONRPCError ¶
type JSONRPCError struct { // Code is an error code indicating the type of failure. Code int64 `json:"code"` // Message is a short description of the error. Message string `json:"message"` // Data is optional structured data containing additional information about the error. Data *json.RawMessage `json:"data"` }
JSONRPCError represents a structured error in a jsonrpcResponse.
func (*JSONRPCError) JSONRPCError ¶
func (err *JSONRPCError) JSONRPCError() string
type MakeListener ¶
MakeListener defines a listener generator.
type OptionFn ¶
type OptionFn func(*Server)
OptionFn configures options of server.
func WithCustomPool ¶
func WithCustomPool(pool WorkerPool) OptionFn
WithCustomPool uses a custom goroutine pool.
func WithReadTimeout ¶
WithReadTimeout sets readTimeout.
func WithTCPKeepAlivePeriod ¶
WithTCPKeepAlivePeriod sets tcp keepalive period.
func WithWriteTimeout ¶
WithWriteTimeout sets writeTimeout.
type PluginContainer ¶
type PluginContainer interface { Add(plugin Plugin) Remove(plugin Plugin) All() []Plugin DoRegister(name string, rcvr interface{}, metadata string) error DoRegisterFunction(serviceName, fname string, fn interface{}, metadata string) error DoUnregister(name string) error DoPostConnAccept(net.Conn) (net.Conn, bool) DoPostConnClose(net.Conn) bool DoPreReadRequest(ctx context.Context) error DoPostReadRequest(ctx context.Context, r *protocol.Message, e error) error DoPostHTTPRequest(ctx context.Context, r *http.Request, params httprouter.Params) error DoPreHandleRequest(ctx context.Context, req *protocol.Message) error DoPreCall(ctx context.Context, serviceName, methodName string, args interface{}) (interface{}, error) DoPostCall(ctx context.Context, serviceName, methodName string, args, reply interface{}, err error) (interface{}, error) DoPreWriteResponse(context.Context, *protocol.Message, *protocol.Message, error) error DoPostWriteResponse(context.Context, *protocol.Message, *protocol.Message, error) error DoPreWriteRequest(ctx context.Context) error DoPostWriteRequest(ctx context.Context, r *protocol.Message, e error) error DoHeartbeatRequest(ctx context.Context, req *protocol.Message) error MuxMatch(m cmux.CMux) }
PluginContainer represents a plugin container that defines all methods to manage plugins. And it also defines all extension points.
type PostCallPlugin ¶
type PostConnAcceptPlugin ¶
PostConnAcceptPlugin represents a connection accept plugin. If it returns false, subsequent PostConnAcceptPlugins should not continue to handle this conn, and this conn has been closed.
type PostConnClosePlugin ¶
PostConnClosePlugin represents client connection close plugin.
type PostHTTPRequestPlugin ¶
type PostHTTPRequestPlugin interface {
PostHTTPRequest(ctx context.Context, r *http.Request, params httprouter.Params) error
}
PostHTTPRequestPlugin represents a plugin that implements the PostHTTPRequest hook.
type PostReadRequestPlugin ¶
type PostReadRequestPlugin interface {
PostReadRequest(ctx context.Context, r *protocol.Message, e error) error
}
PostReadRequestPlugin represents a plugin that implements the PostReadRequest hook.
type PostWriteRequestPlugin ¶
type PostWriteRequestPlugin interface {
PostWriteRequest(ctx context.Context, r *protocol.Message, e error) error
}
PostWriteRequestPlugin represents a plugin that implements the PostWriteRequest hook.
type PostWriteResponsePlugin ¶
type PostWriteResponsePlugin interface {
PostWriteResponse(context.Context, *protocol.Message, *protocol.Message, error) error
}
PostWriteResponsePlugin represents a plugin that implements the PostWriteResponse hook.
type PreCallPlugin ¶
type PreHandleRequestPlugin ¶
type PreHandleRequestPlugin interface {
PreHandleRequest(ctx context.Context, r *protocol.Message) error
}
PreHandleRequestPlugin represents a plugin that implements the PreHandleRequest hook.
type PreReadRequestPlugin ¶
PreReadRequestPlugin represents a plugin that hooks in before a request is read.
type PreWriteRequestPlugin ¶
PreWriteRequestPlugin represents a plugin that hooks in before a request is written.
type PreWriteResponsePlugin ¶
type PreWriteResponsePlugin interface {
PreWriteResponse(context.Context, *protocol.Message, *protocol.Message, error) error
}
PreWriteResponsePlugin represents a plugin that implements the PreWriteResponse hook.
type ProcessTimeViewer ¶
type ProcessTimeViewer struct {
// contains filtered or unexported fields
}
ProcessTimeViewer collects metrics of rpcx.
func (*ProcessTimeViewer) Name ¶
func (vr *ProcessTimeViewer) Name() string
func (*ProcessTimeViewer) Serve ¶
func (vr *ProcessTimeViewer) Serve(w http.ResponseWriter, _ *http.Request)
func (*ProcessTimeViewer) SetStatsMgr ¶
func (vr *ProcessTimeViewer) SetStatsMgr(smgr *viewer.StatsMgr)
func (*ProcessTimeViewer) View ¶
func (vr *ProcessTimeViewer) View() *charts.Line
type RegisterFunctionPlugin ¶
type RegisterFunctionPlugin interface {
RegisterFunction(serviceName, fname string, fn interface{}, metadata string) error
}
RegisterFunctionPlugin is a plugin that implements the RegisterFunction hook.
type RegisterPlugin ¶
type RegisterPlugin interface { Register(name string, rcvr interface{}, metadata string) error Unregister(name string) error }
RegisterPlugin is a plugin that implements the Register and Unregister hooks.
type RequestRateViewer ¶
type RequestRateViewer struct {
// contains filtered or unexported fields
}
RequestRateViewer collects metrics of rpcx.
func (*RequestRateViewer) Name ¶
func (vr *RequestRateViewer) Name() string
func (*RequestRateViewer) Serve ¶
func (vr *RequestRateViewer) Serve(w http.ResponseWriter, _ *http.Request)
func (*RequestRateViewer) SetStatsMgr ¶
func (vr *RequestRateViewer) SetStatsMgr(smgr *viewer.StatsMgr)
func (*RequestRateViewer) View ¶
func (vr *RequestRateViewer) View() *charts.Line
type Server ¶
type Server struct { DisableHTTPGateway bool // disable http invoke or not. DisableJSONRPC bool // disable json rpc or not. EnableProfile bool // enable profile and statsview or not AsyncWrite bool // set true if your server only serves few clients Plugins PluginContainer // AuthFunc can be used to auth. AuthFunc func(ctx context.Context, req *protocol.Message, token string) error // HandleServiceError is used to get all service errors. You can use it write logs or others. HandleServiceError func(error) // ServerErrorFunc is a customized error handlers and you can use it to return customized error strings to clients. // If not set, it use err.Error() ServerErrorFunc func(res *protocol.Message, err error) string ViewManager *ViewManager // contains filtered or unexported fields }
Server is an rpcx server that uses TCP or UDP.
func (*Server) ActiveClientConn ¶
ActiveClientConn returns active connections.
func (*Server) AddHandler ¶
func (*Server) EnableFileTransfer ¶
func (s *Server) EnableFileTransfer(serviceName string, fileTransfer *FileTransfer)
EnableFileTransfer supports filetransfer service in this server.
func (*Server) EnableStreamService ¶
func (s *Server) EnableStreamService(serviceName string, streamService *StreamService)
EnableStreamService supports stream service in this server.
func (*Server) Register ¶
Register publishes in the server the set of methods of the receiver value that satisfy the following conditions:
- exported method of exported type
- three arguments: the first is of type context.Context, and all three are of exported types
- the third argument is a pointer
- one return value, of type error
It returns an error if the receiver is not an exported type or has no suitable methods. It also logs the error. The client accesses each method using a string of the form "Type.Method", where Type is the receiver's concrete type.
func (*Server) RegisterFunction ¶
RegisterFunction publishes a function that satisfies the following conditions:
- three arguments: the first is of type context.Context, and all three are of exported types
- the third argument is a pointer
- one return value, of type error
The client accesses function using a string of the form "servicePath.Method".
func (*Server) RegisterFunctionName ¶
func (s *Server) RegisterFunctionName(servicePath string, name string, fn interface{}, metadata string) error
RegisterFunctionName is like RegisterFunction but uses the provided name for the function instead of the function's concrete type.
func (*Server) RegisterName ¶
RegisterName is like Register but uses the provided name for the type instead of the receiver's concrete type.
func (*Server) RegisterOnRestart ¶
RegisterOnRestart registers a function to call on Restart.
func (*Server) RegisterOnShutdown ¶
RegisterOnShutdown registers a function to call on Shutdown. This can be used to gracefully shutdown connections.
func (*Server) Restart ¶
Restart restarts this server gracefully. It starts a new rpcx server on the same port with the SO_REUSEPORT socket option, and shuts down this rpcx server gracefully.
func (*Server) SendMessage ¶
func (s *Server) SendMessage(conn net.Conn, servicePath, serviceMethod string, metadata map[string]string, data []byte) error
SendMessage sends a request to the specified client. The client is designated by the conn. The conn can be obtained from the context in services:
ctx.Value(RemoteConnContextKey)
servicePath, serviceMethod, metadata can be set to zero values.
func (*Server) Serve ¶
Serve starts and listens for RPC requests. It is blocked until receiving connections from clients.
func (*Server) ServeHTTP ¶
func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request)
ServeHTTP implements an http.Handler that answers RPC requests.
func (*Server) ServeListener ¶
ServeListener listens for RPC requests. It is blocked until receiving connections from clients.
func (*Server) SetCORS ¶
func (s *Server) SetCORS(options *CORSOptions)
SetCORS sets CORS options. For example:
cors.Options{ AllowedOrigins: []string{"foo.com"}, AllowedMethods: []string{http.MethodGet, http.MethodPost, http.MethodDelete}, AllowCredentials: true, }
func (*Server) Shutdown ¶
Shutdown gracefully shuts down the server without interrupting any active connections. Shutdown works by first closing the listener, then closing all idle connections, and then waiting indefinitely for connections to return to idle and then shut down. If the provided context expires before the shutdown is complete, Shutdown returns the context's error, otherwise it returns any error returned from closing the Server's underlying Listener.
func (*Server) UnregisterAll ¶
UnregisterAll unregisters all services. You can call this method when you want to shutdown/upgrade this node.
type StreamAcceptor ¶
type StreamAcceptor func(ctx context.Context, args *share.StreamServiceArgs) bool
StreamAcceptor decides whether to accept a connection from a client. You can use it to validate clients and determine whether to accept or drop the connection.
type StreamHandler ¶
type StreamHandler func(conn net.Conn, args *share.StreamServiceArgs)
StreamHandler handles a streaming connection with client.
type StreamService ¶
type StreamService struct { Addr string AdvertiseAddr string // contains filtered or unexported fields }
StreamService supports streaming between clients and the server. It registers a streaming service and listens on the given port. Clients invoke this service to get a token, and then send the token and begin to stream.
func NewStreamService ¶
func NewStreamService(addr string, streamHandler StreamHandler, acceptor StreamAcceptor, waitNum int) *StreamService
NewStreamService creates a stream service.
func (*StreamService) Start ¶
func (s *StreamService) Start() error
func (*StreamService) Stop ¶
func (s *StreamService) Stop() error
func (*StreamService) Stream ¶
func (s *StreamService) Stream(ctx context.Context, args *share.StreamServiceArgs, reply *share.StreamServiceReply) error
type VersionTag ¶
type VersionTag struct{}
VersionTag is a special 0 sized struct that encodes as the jsonrpc version tag. It will fail during decode if it is not the correct version tag in the stream.
func (VersionTag) MarshalJSON ¶
func (VersionTag) MarshalJSON() ([]byte, error)
func (VersionTag) UnmarshalJSON ¶
func (VersionTag) UnmarshalJSON(data []byte) error
type ViewManager ¶
type ViewManager struct { Smgr *viewer.StatsMgr Ctx context.Context Cancel context.CancelFunc Views []viewer.Viewer // contains filtered or unexported fields }
ViewManager
func NewViewManager ¶
func NewViewManager(ln net.Listener, s *Server) *ViewManager
NewViewManager creates a new ViewManager instance
func (*ViewManager) Register ¶
func (vm *ViewManager) Register(views ...viewer.Viewer)
Register registers views to the ViewManager
func (*ViewManager) Start ¶
func (vm *ViewManager) Start() error
Start runs an HTTP server and begins to collect metrics.
type WorkerPool ¶
Source Files ¶
- context.go
- converter.go
- file_transfer.go
- gateway.go
- jsonrpc2.go
- jsonrpc2_wire.go
- listener.go
- listener_linux.go
- listener_unix.go
- memconn.go
- option.go
- options.go
- plugin.go
- pool.go
- server.go
- service.go
- statsview.go
- statsview_handler_viewers copy.go
- statsview_processtime_viewers.go
- statsview_reqrate_viewers.go
- stream.go