nrpc

package module
v0.0.0-...-f10c188 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 31, 2022 License: MIT Imports: 14 Imported by: 0

README

NRPC - Fast RPC

Documentation

Index

Constants

View Source
const (
	TimeZero    time.Duration = 0
	TimeForever time.Duration = 1<<63 - 1
)
View Source
const (
	// CmdNone is invalid
	CmdNone byte = 0

	// CmdRequest marks a request message; the other side should respond to it.
	CmdRequest byte = 1

	// CmdResponse marks a response message; the other side should not respond to it.
	CmdResponse byte = 2

	// CmdNotify marks a notify message; the other side should not respond to it.
	CmdNotify byte = 3
)
View Source
const (
	// HeaderIndexBodyLenBegin .
	HeaderIndexBodyLenBegin = 0
	// HeaderIndexBodyLenEnd .
	HeaderIndexBodyLenEnd = 4
	// HeaderIndexReserved .
	HeaderIndexReserved = 4
	// HeaderIndexCmd .
	HeaderIndexCmd = 5
	// HeaderIndexFlag .
	HeaderIndexFlag = 6
	// HeaderIndexMethodLen .
	HeaderIndexMethodLen = 7
	// HeaderIndexSeqBegin .
	HeaderIndexSeqBegin = 8
	// HeaderIndexSeqEnd .
	HeaderIndexSeqEnd = 16
	// HeaderFlagMaskError .
	HeaderFlagMaskError byte = 0x01
	// HeaderFlagMaskAsync .
	HeaderFlagMaskAsync byte = 0x02
)
View Source
const (
	// HeadLen represents Message head length.
	HeadLen int = 16

	// MaxMethodLen limits Message method length.
	MaxMethodLen int = 127

	// MaxBodyLen limits Message body length.
	MaxBodyLen int = 1024*1024*64 - 16
)

Variables

View Source
var (
	// Represents a timeout error because of timer or context.
	ErrClientTimeout = errors.New("timeout")

	// Represents an error of 0 time parameter.
	ErrClientInvalidTimeoutZero = errors.New("invalid timeout, should not be 0")

	// Represents an error of less than 0 time parameter.
	ErrClientInvalidTimeoutLessThanZero = errors.New("invalid timeout, should not be < 0")

	// Represents an error with 0 time parameter but with non-nil callback.
	ErrClientInvalidTimeoutZeroWithNonNilCallback = errors.New("invalid timeout 0 with non-nil callback")

	// Represents an error indicating that the Client's send queue is full.
	ErrClientOverstock = errors.New("timeout: rpc Client's send queue is full")

	// Represents an error that Client is reconnecting.
	ErrClientReconnecting = errors.New("client reconnecting")

	// Represents an error that Client is stopped.
	ErrClientStopped = errors.New("client stopped")

	// Represents an error of empty dialer array.
	ErrClientInvalidPoolDialers = errors.New("invalid dialers: empty array")
)
View Source
var (
	// Invalid message CMD.
	ErrInvalidRspMessage = errors.New("invalid response message cmd")

	// Method not found.
	ErrMethodNotFound = errors.New("method not found")

	// Invalid flag bit index.
	ErrInvalidFlagBitIndex = errors.New("invalid index, should be 0-7")
)
View Source
var (
	ErrContextResponseToNotify = errors.New("should not response to a context with notify message")
)
View Source
var (
	ErrTimeout = errors.New("timeout")
)

Functions

func AsyncResponse

func AsyncResponse() bool

Returns default AsyncResponse flag.

func BatchRecv

func BatchRecv() bool

Returns default BatchRecv flag.

func BatchSend

func BatchSend() bool

Returns default BatchSend flag.

func BeforeRecv

func BeforeRecv(h func(net.Conn) error)

Registers default handler which will be called before Recv.

func BeforeSend

func BeforeSend(h func(net.Conn) error)

Registers default handler which will be called before Send.

func Handle

func Handle(m string, h HandlerFunc, args ...interface{})

Handle registers default method/router handler.

If a Boolean value of "true" is passed, the handler will be called asynchronously in a new goroutine; otherwise the handler will be called synchronously in the client's reading goroutine, one by one.

func HandleConnected

func HandleConnected(onConnected func(*Client))

Registers default handler which will be called when client connected.

func HandleDisconnected

func HandleDisconnected(onDisConnected func(*Client))

Registers default handler which will be called when client disconnected.

func HandleMessageDropped

func HandleMessageDropped(onOverstock func(c *Client, m *Message))

Registers default handler which will be called when message dropped.

func HandleNotFound

func HandleNotFound(h HandlerFunc)

HandleNotFound registers the default "" method/router handler. It will be called when the method/router is not found.

func HandleOverstock

func HandleOverstock(onOverstock func(c *Client, m *Message))

Registers default handler which will be called when client send queue is overstock.

func HandleSessionMiss

func HandleSessionMiss(onSessionMiss func(c *Client, m *Message))

Registers default handler which will be called when async message seq not found.

func RecvBufferSize

func RecvBufferSize() int

Returns default client's read buffer size.

func SendQueueSize

func SendQueueSize() int

Returns default client's send queue channel capacity.

func SetAsyncResponse

func SetAsyncResponse(async bool)

Sets default AsyncResponse flag.

func SetBatchRecv

func SetBatchRecv(batch bool)

Sets default BatchRecv flag.

func SetBatchSend

func SetBatchSend(batch bool)

Sets default BatchSend flag.

func SetBufferFactory

func SetBufferFactory(f func(int) []byte)

SetBufferFactory registers default buffer maker.

func SetHandler

func SetHandler(h Handler)

Sets default Handler.

func SetLogTag

func SetLogTag(tag string)

Sets DefaultHandler's log tag.

func SetReaderWrapper

func SetReaderWrapper(wrapper func(conn net.Conn) io.Reader)

Registers default reader wrapper for net.Conn.

func SetRecvBufferSize

func SetRecvBufferSize(size int)

Sets default client's read buffer size.

func SetSendQueueSize

func SetSendQueueSize(size int)

Sets default client's send queue channel capacity.

func Use

func Use(h HandlerFunc)

Default method/router handler middleware.

func UseCoder

func UseCoder(coder MessageCoder)

UseCoder registers default message coding middleware, coder.Encode will be called before message send, coder.Decode will be called after message recv.

Types

type Client

type Client struct {
	Conn    net.Conn
	Codec   codec.Codec
	Handler Handler
	Reader  io.Reader
	Dialer  DialerFunc
	Head    Header

	IsAsync bool
	// contains filtered or unexported fields
}

Client may be used by multiple goroutines simultaneously.

func NewClient

func NewClient(dialer DialerFunc) (*Client, error)

NewClient creates a Client.

func (*Client) Call

func (c *Client) Call(method string, req interface{}, rsp interface{}, timeout time.Duration, args ...interface{}) error

Call makes an rpc call with a timeout.

func (*Client) CallAsync

func (c *Client) CallAsync(method string, req interface{}, handler HandlerFunc, timeout time.Duration, args ...interface{}) error

CallAsync makes an asynchronous rpc call with a timeout. CallAsync will not block waiting for the server's response, but the handler will be called if the response arrives before the timeout.

func (*Client) CallWith

func (c *Client) CallWith(ctx context.Context, method string, req interface{}, rsp interface{}, args ...interface{}) error

CallWith uses context to make rpc call.

func (*Client) CheckState

func (c *Client) CheckState() error

CheckState checks Client's state

func (*Client) Delete

func (c *Client) Delete(key interface{})

Deletes key-value pair.

func (*Client) Get

func (c *Client) Get(key interface{}) (interface{}, bool)

Returns value for key.

func (*Client) NewMessage

func (c *Client) NewMessage(cmd byte, method string, v interface{}) *Message

Creates a Message by client's seq, handler and codec.

func (*Client) Notify

func (c *Client) Notify(method string, data interface{}, timeout time.Duration, args ...interface{}) error

Notify makes a notify with timeout. A notify does not need a response from the server.

func (*Client) NotifyWith

func (c *Client) NotifyWith(ctx context.Context, method string, data interface{}, args ...interface{}) error

NotifyWith uses a context to make an rpc notify. A notify does not need a response from the server.

func (*Client) PushMsg

func (c *Client) PushMsg(msg *Message, timeout time.Duration) error

Pushes a msg to Client's send queue with timeout.

func (*Client) Restart

func (c *Client) Restart() error

Stops and starts the client.

func (*Client) Set

func (c *Client) Set(key interface{}, value interface{})

Sets key-value pair.

func (*Client) Stop

func (c *Client) Stop()

Stop stops the client.

type ClientPool

type ClientPool struct {
	// contains filtered or unexported fields
}

func NewClientPool

func NewClientPool(dialer DialerFunc, size int) (*ClientPool, error)

Creates a ClientPool.

func NewClientPoolFromDialers

func NewClientPoolFromDialers(dialers []DialerFunc) (*ClientPool, error)

Creates a ClientPool by multiple dialers.

func (*ClientPool) Get

func (pool *ClientPool) Get(index int) *Client

Returns a Client by index.

func (*ClientPool) Handler

func (pool *ClientPool) Handler() Handler

Returns the Handler.

func (*ClientPool) Next

func (pool *ClientPool) Next() *Client

Returns a Client by round robin.

func (*ClientPool) Size

func (pool *ClientPool) Size() int

Returns the number of Clients.

func (*ClientPool) Stop

func (pool *ClientPool) Stop()

Stops all clients.

type Context

type Context struct {
	Client  *Client
	Message *Message
	// contains filtered or unexported fields
}

func (*Context) Abort

func (ctx *Context) Abort()

Stops the one-by-one-calling of middlewares and method/router handler.

func (*Context) Bind

func (ctx *Context) Bind(v interface{}) error

Bind parses the body data and stores the result in the value pointed to by 'v'.

func (*Context) Body

func (ctx *Context) Body() []byte

Returns body.

func (*Context) Deadline

func (ctx *Context) Deadline() (deadline time.Time, ok bool)

Implements stdlib's Context.

func (*Context) Done

func (ctx *Context) Done() <-chan struct{}

Implements stdlib's Context.

func (*Context) Err

func (ctx *Context) Err() error

Implements stdlib's Context.

func (*Context) Error

func (ctx *Context) Error(v interface{}) error

Responds to the Client with an error Message.

func (*Context) Get

func (ctx *Context) Get(key interface{}) (interface{}, bool)

Returns value for key.

func (*Context) Next

func (ctx *Context) Next()

Calls next middleware or method/router handler.

func (*Context) Set

func (ctx *Context) Set(key interface{}, value interface{})

Sets key-value pair.

func (*Context) Value

func (ctx *Context) Value(key interface{}) interface{}

Returns the value associated with this context for key, implements stdlib's Context.

func (*Context) Values

func (ctx *Context) Values() map[interface{}]interface{}

Returns values.

func (*Context) Write

func (ctx *Context) Write(v interface{}) error

Responds to the Client with a Message.

func (*Context) WriteWithTimeout

func (ctx *Context) WriteWithTimeout(v interface{}, timeout time.Duration) error

Responds to the Client with a Message, with a timeout.

type DialerFunc

type DialerFunc func() (net.Conn, error)

Used by Client to connect to the server.

type Handler

type Handler interface {
	Clone() Handler
	LogTag() string
	SetLogTag(tag string)

	// Registers handler which will be called when client connected.
	HandleConnected(onConnected func(*Client))
	// Will be called when client is connected.
	OnConnected(c *Client)

	// Registers handler which will be called when client is disconnected.
	HandleDisconnected(onDisConnected func(*Client))
	// Will be called when client is disconnected.
	OnDisconnected(c *Client)

	// Registers handler which will be called when client send queue is overstock.
	HandleOverstock(onOverstock func(c *Client, m *Message))
	// Will be called when client chSend is full.
	OnOverstock(c *Client, m *Message)

	// Registers handler which will be called when message dropped.
	HandleMessageDropped(onOverstock func(c *Client, m *Message))
	// Will be called when message is dropped.
	OnMessageDropped(c *Client, m *Message)

	// Registers handler which will be called when async message seq not found.
	HandleSessionMiss(onSessionMiss func(c *Client, m *Message))
	// Will be called when async message seq not found.
	OnSessionMiss(c *Client, m *Message)

	// Registers handler which will be called before Recv.
	BeforeRecv(h func(net.Conn) error)
	// Registers handler which will be called before Send.
	BeforeSend(h func(net.Conn) error)

	// Returns BatchRecv flag.
	BatchRecv() bool
	// Sets BatchRecv flag.
	SetBatchRecv(batch bool)
	// Returns BatchSend flag.
	BatchSend() bool
	// Sets BatchSend flag.
	SetBatchSend(batch bool)

	// Returns AsyncResponse flag.
	AsyncResponse() bool
	// Sets AsyncResponse flag.
	SetAsyncResponse(async bool)

	// Wraps net.Conn to Read data with io.Reader.
	WrapReader(conn net.Conn) io.Reader
	// Registers reader wrapper for net.Conn.
	SetReaderWrapper(wrapper func(conn net.Conn) io.Reader)

	// Reads a message from a client.
	Recv(c *Client) (*Message, error)
	// Writes buffer data to a connection.
	Send(c net.Conn, buffer []byte) (int, error)
	// Writes multiple buffer data to a connection.
	SendN(conn net.Conn, buffers net.Buffers) (int, error)

	// Returns client's read buffer size.
	RecvBufferSize() int
	// Sets client's read buffer size.
	SetRecvBufferSize(size int)

	// Returns client's send queue channel capacity.
	SendQueueSize() int
	// Sets client's send queue channel capacity.
	SetSendQueueSize(size int)

	// Registers method/router handler middleware.
	Use(h HandlerFunc)

	// UseCoder registers message coding middleware,
	// coder.Encode will be called before message send,
	// coder.Decode will be called after message recv.
	UseCoder(coder MessageCoder)

	// Coders returns coding middlewares.
	Coders() []MessageCoder

	// Handle registers method/router handler.
	//
	// If a Boolean value of "true" is passed, the handler will be called asynchronously in a new goroutine;
	// otherwise the handler will be called synchronously in the client's reading goroutine, one by one.
	Handle(m string, h HandlerFunc, args ...interface{})

	// HandleNotFound registers the "" method/router handler.
	// It will be called when the method/router is not found.
	HandleNotFound(h HandlerFunc)

	// OnMessage finds method/router middlewares and handler, then call them one by one.
	OnMessage(c *Client, m *Message)

	// GetBuffer makes a buffer by size.
	GetBuffer(size int) []byte

	// SetBufferFactory registers buffer maker.
	SetBufferFactory(f func(int) []byte)
}

Defines net message handler interface.

var DefaultHandler Handler = NewHandler()

func NewHandler

func NewHandler() Handler

Returns a default Handler implementation.

type HandlerFunc

type HandlerFunc func(*Context)

Defines message handler of middleware and method/router.

type Header []byte

Header defines Message head.

func (Header) BodyLen

func (h Header) BodyLen() int

BodyLen returns Message body length.

type Message

type Message struct {
	Buffer []byte
	// contains filtered or unexported fields
}

Message represents an nrpc Message.

func (*Message) BodyLen

func (m *Message) BodyLen() int

BodyLen returns body length.

func (*Message) Cmd

func (m *Message) Cmd() byte

Cmd returns cmd.

func (*Message) Data

func (m *Message) Data() []byte

Data returns payload data after method.

func (*Message) Error

func (m *Message) Error() error

Error returns error.

func (*Message) Get

func (m *Message) Get(key interface{}) (interface{}, bool)

Get returns value for key.

func (*Message) IsAsync

func (m *Message) IsAsync() bool

IsAsync returns async flag.

func (*Message) IsError

func (m *Message) IsError() bool

IsError returns error flag.

func (*Message) IsFlagBitSet

func (m *Message) IsFlagBitSet(index int) bool

IsFlagBitSet returns flag bit value.

func (*Message) Len

func (m *Message) Len() int

Len returns total length of buffer.

func (*Message) Method

func (m *Message) Method() string

Method returns method.

func (*Message) MethodLen

func (m *Message) MethodLen() int

MethodLen returns method length.

func (*Message) Seq

func (m *Message) Seq() uint64

Seq returns sequence number.

func (*Message) Set

func (m *Message) Set(key interface{}, value interface{})

Set sets key-value pair.

func (*Message) SetAsync

func (m *Message) SetAsync(isAsync bool)

SetAsync sets async flag.

func (*Message) SetBodyLen

func (m *Message) SetBodyLen(l int)

SetBodyLen sets body length.

func (*Message) SetCmd

func (m *Message) SetCmd(cmd byte)

SetCmd sets cmd.

func (*Message) SetError

func (m *Message) SetError(isError bool)

SetError sets error flag.

func (*Message) SetFlagBit

func (m *Message) SetFlagBit(index int, value bool) error

SetFlagBit sets flag bit value by index.

func (*Message) SetMethodLen

func (m *Message) SetMethodLen(l int)

SetMethodLen sets method length.

func (*Message) SetSeq

func (m *Message) SetSeq(seq uint64)

SetSeq sets sequence number.

func (*Message) Values

func (m *Message) Values() map[interface{}]interface{}

Values returns values.

type MessageCoder

type MessageCoder interface {
	// Encode wraps a message before it is sent to the client.
	Encode(*Client, *Message) *Message
	// Decode unwraps a message after it is received and before it is handled.
	Decode(*Client, *Message) *Message
}

MessageCoder defines Message coding middleware interface.

type Server

type Server struct {
	Accepted int64
	CurrLoad int64
	MaxLoad  int64

	Codec   codec.Codec
	Handler Handler

	Listener net.Listener
	// contains filtered or unexported fields
}

func NewServer

func NewServer() *Server

NewServer .

func (*Server) NewMessage

func (s *Server) NewMessage(cmd byte, method string, v interface{}) *Message

NewMessage creates a Message.

func (*Server) Run

func (s *Server) Run(addr string) error

Run .

func (*Server) Serve

func (s *Server) Serve(ln net.Listener) error

Serve .

func (*Server) Shutdown

func (s *Server) Shutdown(ctx context.Context) error

Shutdown .

func (*Server) Stop

func (s *Server) Stop() error

Stop .

type WebsocketConn

type WebsocketConn interface {
	HandleWebsocket(func())
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL