Documentation ¶
Index ¶
- Constants
- Variables
- func Append(buf []byte, more ...byte) []byte
- func AppendString(buf []byte, more string) []byte
- func AsyncExecute(f func())
- func AsyncResponse() bool
- func BatchRecv() bool
- func BatchSend() bool
- func BeforeRecv(h func(net.Conn) error)
- func BeforeSend(h func(net.Conn) error)
- func EnablePool(enable bool)
- func Free(buf []byte)
- func Handle(m string, h HandlerFunc, args ...interface{})
- func HandleConnected(onConnected func(*Client))
- func HandleDisconnected(onDisConnected func(*Client))
- func HandleMalloc(f func(int) []byte)
- func HandleMessageDropped(onOverstock func(c *Client, m *Message))
- func HandleNotFound(h HandlerFunc)
- func HandleOverstock(onOverstock func(c *Client, m *Message))
- func HandleSessionMiss(onSessionMiss func(c *Client, m *Message))
- func LogDebugInfo()
- func Malloc(size int) []byte
- func MaxBodyLen() int
- func ReadTimeout() time.Duration
- func Realloc(buf []byte, size int) []byte
- func RecvBufferSize() int
- func SendBufferSize() int
- func SendQueueSize() int
- func SetAsyncExecutor(executor func(f func()))
- func SetAsyncResponse(async bool)
- func SetBatchRecv(batch bool)
- func SetBatchSend(batch bool)
- func SetDebug(enable bool)
- func SetHandler(h Handler)
- func SetLogTag(tag string)
- func SetMaxBodyLen(l int)
- func SetReadTimeout(timeout time.Duration)
- func SetReaderWrapper(wrapper func(conn net.Conn) io.Reader)
- func SetRecvBufferSize(size int)
- func SetSendBufferSize(size int)
- func SetSendQueueSize(size int)
- func SetStreamQueueSize(size int)
- func SetWriteTimeout(timeout time.Duration)
- func StreamQueueSize() int
- func Use(h HandlerFunc)
- func UseCoder(coder MessageCoder)
- func WriteTimeout() time.Duration
- type Allocator
- type AsyncHandlerFunc
- type BufferPool
- func (bp *BufferPool) Append(buf []byte, more ...byte) []byte
- func (bp *BufferPool) AppendString(buf []byte, more string) []byte
- func (bp *BufferPool) Free(buf []byte)
- func (bp *BufferPool) LogDebugInfo()
- func (bp *BufferPool) Malloc(size int) []byte
- func (bp *BufferPool) Realloc(buf []byte, size int) []byte
- type Client
- func (c *Client) Call(method string, req interface{}, rsp interface{}, timeout time.Duration, ...) error
- func (c *Client) CallAsync(method string, req interface{}, handler AsyncHandlerFunc, ...) error
- func (c *Client) CallContext(ctx context.Context, method string, req interface{}, rsp interface{}, ...) error
- func (c *Client) CallWith(ctx context.Context, method string, req interface{}, rsp interface{}, ...) error
- func (c *Client) CheckState() error
- func (c *Client) Delete(key interface{})
- func (c *Client) Get(key interface{}) (interface{}, bool)
- func (c *Client) Keepalive(interval time.Duration)
- func (c *Client) NewMessage(cmd byte, method string, v interface{}, args ...interface{}) *Message
- func (client *Client) NewStream(method string) *Stream
- func (c *Client) Notify(method string, data interface{}, timeout time.Duration, args ...interface{}) error
- func (c *Client) NotifyContext(ctx context.Context, method string, data interface{}, args ...interface{}) error
- func (c *Client) NotifyWith(ctx context.Context, method string, data interface{}, args ...interface{}) error
- func (c *Client) Ping()
- func (c *Client) Pong()
- func (c *Client) PushMsg(msg *Message, timeout time.Duration) error
- func (c *Client) Restart() error
- func (c *Client) Set(key interface{}, value interface{})
- func (c *Client) SetState(running bool)
- func (c *Client) Stop()
- type ClientPool
- type Context
- func (ctx *Context) Abort()
- func (ctx *Context) Bind(v interface{}) error
- func (ctx *Context) Body() []byte
- func (ctx *Context) Deadline() (deadline time.Time, ok bool)
- func (ctx *Context) Done() <-chan struct{}
- func (ctx *Context) Err() error
- func (ctx *Context) Error(v interface{}) error
- func (ctx *Context) Get(key interface{}) (interface{}, bool)
- func (ctx *Context) Next()
- func (ctx *Context) Release()
- func (ctx *Context) ResponseError() interface{}
- func (ctx *Context) Set(key interface{}, value interface{})
- func (ctx *Context) Value(key interface{}) interface{}
- func (ctx *Context) Values() map[interface{}]interface{}
- func (ctx *Context) Write(v interface{}) error
- func (ctx *Context) WriteWithTimeout(v interface{}, timeout time.Duration) error
- type DialerFunc
- type Handler
- type HandlerFunc
- type Header
- type Message
- func (m *Message) BodyLen() int
- func (m *Message) Cmd() byte
- func (m *Message) Data() []byte
- func (m *Message) Error() error
- func (m *Message) Get(key interface{}) (interface{}, bool)
- func (m *Message) IsAsync() bool
- func (m *Message) IsError() bool
- func (m *Message) IsFlagBitSet(index int) bool
- func (m *Message) IsStreamEOF() bool
- func (m *Message) IsStreamLocal() bool
- func (m *Message) Len() int
- func (m *Message) Method() string
- func (m *Message) MethodLen() int
- func (m *Message) Payback()
- func (m *Message) Release() int32
- func (m *Message) ResetAttrs()
- func (m *Message) Retain() int32
- func (m *Message) Seq() uint64
- func (m *Message) Set(key interface{}, value interface{})
- func (m *Message) SetAsync(isAsync bool)
- func (m *Message) SetBodyLen(l int)
- func (m *Message) SetCmd(cmd byte)
- func (m *Message) SetError(isError bool)
- func (m *Message) SetFlagBit(index int, value bool) error
- func (m *Message) SetMethodLen(l int)
- func (m *Message) SetSeq(seq uint64)
- func (m *Message) SetStreamEOF(eof bool)
- func (m *Message) SetStreamLocal(local bool)
- func (m *Message) Values() map[interface{}]interface{}
- type MessageCoder
- type NativeAllocator
- type Server
- func (s *Server) Broadcast(method string, v interface{}, args ...interface{})
- func (s *Server) BroadcastWithFilter(method string, v interface{}, filter func(*Client) bool, args ...interface{})
- func (s *Server) ForEach(h func(*Client))
- func (s *Server) ForEachWithFilter(h func(*Client), filter func(*Client) bool)
- func (s *Server) NewMessage(cmd byte, method string, v interface{}, args ...interface{}) *Message
- func (s *Server) Run(addr string) error
- func (s *Server) Serve(ln net.Listener) error
- func (s *Server) Shutdown(ctx context.Context) error
- func (s *Server) Stop() error
- type Stream
- func (s *Stream) CloseRecv()
- func (s *Stream) CloseRecvContext(ctx context.Context)
- func (s *Stream) CloseSend()
- func (s *Stream) CloseSendContext(ctx context.Context)
- func (s *Stream) Id() uint64
- func (s *Stream) Recv(v interface{}) error
- func (s *Stream) RecvContext(ctx context.Context, v interface{}) error
- func (s *Stream) RecvWith(ctx context.Context, v interface{}) error
- func (s *Stream) Send(v interface{}, args ...interface{}) error
- func (s *Stream) SendAndClose(v interface{}, args ...interface{}) error
- func (s *Stream) SendAndCloseContext(ctx context.Context, v interface{}, args ...interface{}) error
- func (s *Stream) SendAndCloseWith(ctx context.Context, v interface{}, args ...interface{}) error
- func (s *Stream) SendContext(ctx context.Context, v interface{}, args ...interface{}) error
- func (s *Stream) SendWith(ctx context.Context, v interface{}, args ...interface{}) error
- type StreamHandlerFunc
Constants ¶
const ( // TimeZero represents zero time. TimeZero time.Duration = 0 // TimeForever represents forever time. TimeForever time.Duration = 1<<63 - 1 )
const ( // CmdNone is invalid CmdNone byte = 0 // CmdRequest the other side should respond to a request message CmdRequest byte = 1 // CmdResponse the other side should not respond to a response message CmdResponse byte = 2 // CmdNotify the other side should not respond to a notify message CmdNotify byte = 3 // CmdPing . CmdPing byte = 4 // CmdPong . CmdPong byte = 5 // CmdStream . CmdStream byte = 6 )
const ( // HeaderIndexBodyLenBegin . HeaderIndexBodyLenBegin = 0 // HeaderIndexBodyLenEnd . HeaderIndexBodyLenEnd = 4 // HeaderIndexReserved . HeaderIndexReserved = 4 // HeaderIndexCmd . HeaderIndexCmd = 5 // HeaderIndexFlag . HeaderIndexFlag = 6 // HeaderIndexMethodLen . HeaderIndexMethodLen = 7 // HeaderIndexSeqBegin . HeaderIndexSeqBegin = 8 // HeaderIndexSeqEnd . HeaderIndexSeqEnd = 16 // HeaderFlagMaskError . HeaderFlagMaskError byte = 0x01 // HeaderFlagMaskAsync . HeaderFlagMaskAsync byte = 0x02 HeaderStreamLocalBitIndex = 7 HeaderStreamEOFBitIndex = 6 HeaderStreamLocalBit = byte(0x1) << HeaderStreamLocalBitIndex HeaderStreamEOFBit = byte(0x1) << HeaderStreamEOFBitIndex HeaderStreamFlagBitMask = HeaderStreamLocalBit | HeaderStreamEOFBit HeaderCmdBitMask = ^HeaderStreamFlagBitMask )
const ( // HeadLen represents Message head length. HeadLen int = 16 // MaxMethodLen limits Message method length. MaxMethodLen int = 127 // DefaultMaxBodyLen limits Message body length. DefaultMaxBodyLen int = 1024*1024*64 - 16 )
Variables ¶
var ( // ErrClientTimeout represents a timeout error because of timer or context. ErrClientTimeout = errors.New("timeout") // ErrClientInvalidTimeoutZero represents an error of 0 time parameter. ErrClientInvalidTimeoutZero = errors.New("invalid timeout, should not be 0") // ErrClientInvalidTimeoutLessThanZero represents an error of less than 0 time parameter. ErrClientInvalidTimeoutLessThanZero = errors.New("invalid timeout, should not be < 0") // ErrClientInvalidTimeoutZeroWithNonNilCallback represents an error with 0 time parameter but with non-nil callback. ErrClientInvalidTimeoutZeroWithNonNilCallback = errors.New("invalid timeout 0 with non-nil callback") // ErrClientOverstock represents an error of Client's send queue is full. ErrClientOverstock = errors.New("timeout: rpc Client's send queue is full") // ErrClientReconnecting represents an error that Client is reconnecting. ErrClientReconnecting = errors.New("client reconnecting") // ErrClientStopped represents an error that Client is stopped. ErrClientStopped = errors.New("client stopped") // ErrClientInvalidPoolDialers represents an error of empty dialer array. ErrClientInvalidPoolDialers = errors.New("invalid dialers: empty array") // ErrClientInvalidAsyncHandler represents an error of invalid(nil) async handler. ErrClientInvalidAsyncHandler = errors.New("invalid async handler: should not be nil") )
client error
var ( // ErrInvalidRspMessage represents an error of invalid message CMD. ErrInvalidRspMessage = errors.New("invalid response message cmd") // ErrMethodNotFound represents an error of method not found. ErrMethodNotFound = errors.New("method not found") // ErrInvalidFlagBitIndex represents an error of invalid flag bit index. ErrInvalidFlagBitIndex = errors.New("invalid index, should be 0-7") )
message error
var ( // PingMessage . PingMessage = newMessage(CmdPing, "", nil, false, false, 0, nil, nil, nil) // PongMessage . PongMessage = newMessage(CmdPong, "", nil, false, false, 0, nil, nil, nil) )
var ( // ErrContextResponseToNotify represents an error that response to a notify message. ErrContextResponseToNotify = errors.New("should not response to a context with notify message") )
context error
var ( // ErrStreamClosedSend represents an error of stream closed send. ErrStreamClosedSend = errors.New("stream has closed send") )
stream errors
var ( // ErrTimeout represents an error of timeout. ErrTimeout = errors.New("timeout") )
general errors
Functions ¶
func AppendString ¶ added in v1.2.7
AppendString exports default package method.
func BeforeRecv ¶
BeforeRecv registers default handler which will be called before Recv.
func BeforeSend ¶
BeforeSend registers default handler which will be called before Send.
func EnablePool ¶ added in v1.1.9
func EnablePool(enable bool)
EnablePool registers handlers for pool operation for Context and Message and Message.Buffer
func Handle ¶
func Handle(m string, h HandlerFunc, args ...interface{})
Handle registers default method/router handler.
If a Boolean value of "true" is passed, the handler will be called asynchronously in a new goroutine; otherwise the handler will be called synchronously, one by one, in the client's reading goroutine.
func HandleConnected ¶
func HandleConnected(onConnected func(*Client))
HandleConnected registers default handler which will be called when client connected.
func HandleDisconnected ¶
func HandleDisconnected(onDisConnected func(*Client))
HandleDisconnected registers default handler which will be called when client disconnected.
func HandleMalloc ¶ added in v1.1.9
HandleMalloc registers default buffer maker.
func HandleMessageDropped ¶
HandleMessageDropped registers default handler which will be called when message dropped.
func HandleNotFound ¶
func HandleNotFound(h HandlerFunc)
HandleNotFound registers the default "" method/router handler; it will be called when the method/router is not found.
func HandleOverstock ¶
HandleOverstock registers default handler which will be called when client send queue is overstock.
func HandleSessionMiss ¶
HandleSessionMiss registers default handler which will be called when async message seq not found.
func MaxBodyLen ¶
func MaxBodyLen() int
func ReadTimeout ¶ added in v1.2.11
ReadTimeout returns client's read timeout.
func RecvBufferSize ¶
func RecvBufferSize() int
RecvBufferSize returns default client's read buffer size.
func SendBufferSize ¶ added in v1.1.9
func SendBufferSize() int
SendBufferSize returns default client's send buffer size.
func SendQueueSize ¶
func SendQueueSize() int
SendQueueSize returns default client's send queue channel capacity.
func SetAsyncExecutor ¶ added in v1.1.10
func SetAsyncExecutor(executor func(f func()))
SetAsyncExecutor sets the default executor used by AsyncExecute to execute a func.
func SetAsyncResponse ¶
func SetAsyncResponse(async bool)
SetAsyncResponse sets default AsyncResponse flag.
func SetMaxBodyLen ¶ added in v1.1.11
func SetMaxBodyLen(l int)
func SetReadTimeout ¶ added in v1.2.11
SetReadTimeout sets client's read timeout.
func SetReaderWrapper ¶
SetReaderWrapper registers default reader wrapper for net.Conn.
func SetRecvBufferSize ¶
func SetRecvBufferSize(size int)
SetRecvBufferSize sets default client's read buffer size.
func SetSendBufferSize ¶ added in v1.1.9
func SetSendBufferSize(size int)
SetSendBufferSize sets default client's send buffer size.
func SetSendQueueSize ¶
func SetSendQueueSize(size int)
SetSendQueueSize sets default client's send queue channel capacity.
func SetStreamQueueSize ¶ added in v1.2.15
func SetStreamQueueSize(size int)
SetStreamQueueSize sets default stream queue channel capacity.
func SetWriteTimeout ¶ added in v1.2.11
SetWriteTimeout sets client's write timeout.
func StreamQueueSize ¶ added in v1.2.15
func StreamQueueSize() int
StreamQueueSize returns default stream queue channel capacity.
func UseCoder ¶
func UseCoder(coder MessageCoder)
UseCoder registers default message coding middleware, coder.Encode will be called before message send, coder.Decode will be called after message recv.
func WriteTimeout ¶ added in v1.2.11
WriteTimeout returns client's write timeout.
Types ¶
type Allocator ¶ added in v1.2.7
type AsyncHandlerFunc ¶ added in v1.2.12
AsyncHandlerFunc defines callback of Client.CallAsync.
type BufferPool ¶ added in v1.1.9
type BufferPool struct { Debug bool // contains filtered or unexported fields }
BufferPool .
func (*BufferPool) Append ¶ added in v1.2.7
func (bp *BufferPool) Append(buf []byte, more ...byte) []byte
Append .
func (*BufferPool) AppendString ¶ added in v1.2.7
func (bp *BufferPool) AppendString(buf []byte, more string) []byte
AppendString .
func (*BufferPool) LogDebugInfo ¶ added in v1.2.7
func (bp *BufferPool) LogDebugInfo()
type Client ¶
type Client struct { Conn net.Conn Codec codec.Codec Handler Handler Reader io.Reader Dialer DialerFunc Head Header // contains filtered or unexported fields }
Client represents an arpc Client. There may be multiple outstanding Calls or Notifys associated with a single Client, and a Client may be used by multiple goroutines simultaneously.
func NewClient ¶
func NewClient(dialer DialerFunc, args ...interface{}) (*Client, error)
NewClient creates a Client.
func (*Client) Call ¶
func (c *Client) Call(method string, req interface{}, rsp interface{}, timeout time.Duration, args ...interface{}) error
Call makes an rpc call with a timeout. Call will block waiting for the server's response until timeout.
func (*Client) CallAsync ¶
func (c *Client) CallAsync(method string, req interface{}, handler AsyncHandlerFunc, timeout time.Duration, args ...interface{}) error
CallAsync makes an asynchronous rpc call with timeout. CallAsync will not block waiting for the server's response, but the handler will be called if the response arrives before the timeout.
func (*Client) CallContext ¶ added in v1.2.12
func (c *Client) CallContext(ctx context.Context, method string, req interface{}, rsp interface{}, args ...interface{}) error
CallContext uses context to make rpc call. CallContext blocks to wait for a response from the server until it times out.
func (*Client) CallWith ¶
func (c *Client) CallWith(ctx context.Context, method string, req interface{}, rsp interface{}, args ...interface{}) error
CallWith uses context to make rpc call. CallWith blocks to wait for a response from the server until it times out.
func (*Client) NewMessage ¶
NewMessage creates a Message by client's seq, handler and codec.
func (*Client) Notify ¶
func (c *Client) Notify(method string, data interface{}, timeout time.Duration, args ...interface{}) error
Notify makes a notify with timeout. A notify does not need a response from the server.
func (*Client) NotifyContext ¶ added in v1.2.12
func (c *Client) NotifyContext(ctx context.Context, method string, data interface{}, args ...interface{}) error
NotifyContext uses a context to make an rpc notify. A notify does not need a response from the server.
func (*Client) NotifyWith ¶
func (c *Client) NotifyWith(ctx context.Context, method string, data interface{}, args ...interface{}) error
NotifyWith uses a context to make an rpc notify. A notify does not need a response from the server.
func (*Client) Set ¶
func (c *Client) Set(key interface{}, value interface{})
Set sets key-value pair.
type ClientPool ¶
type ClientPool struct {
// contains filtered or unexported fields
}
ClientPool represents an arpc Client Pool.
func NewClientPool ¶
func NewClientPool(dialer DialerFunc, size int, args ...interface{}) (*ClientPool, error)
NewClientPool creates a ClientPool.
func NewClientPoolFromDialers ¶
func NewClientPoolFromDialers(dialers []DialerFunc, args ...interface{}) (*ClientPool, error)
NewClientPoolFromDialers creates a ClientPool by multiple dialers.
func (*ClientPool) Get ¶
func (pool *ClientPool) Get(index int) *Client
Get returns a Client by index.
func (*ClientPool) Next ¶
func (pool *ClientPool) Next() *Client
Next returns a Client by round robin.
type Context ¶
Context represents an arpc Call's context.
func (*Context) Abort ¶ added in v1.1.0
func (ctx *Context) Abort()
Abort stops the one-by-one-calling of middlewares and method/router handler.
func (*Context) Bind ¶
Bind parses the body data and stores the result in the value pointed to by v.
func (*Context) Next ¶
func (ctx *Context) Next()
Next calls next middleware or method/router handler.
func (*Context) ResponseError ¶ added in v1.2.16
func (ctx *Context) ResponseError() interface{}
func (*Context) Set ¶
func (ctx *Context) Set(key interface{}, value interface{})
Set sets key-value pair.
func (*Context) Value ¶ added in v1.1.0
func (ctx *Context) Value(key interface{}) interface{}
Value returns the value associated with this context for key, implements stdlib's Context.
func (*Context) Values ¶
func (ctx *Context) Values() map[interface{}]interface{}
Values returns values.
type DialerFunc ¶
DialerFunc defines the dialer used by arpc Client to connect to the server.
type Handler ¶
type Handler interface { // Clone returns a copy of Handler. Clone() Handler // LogTag returns log tag value. LogTag() string // SetLogTag sets log tag. SetLogTag(tag string) // HandleConnected registers handler which will be called when client connected. HandleConnected(onConnected func(*Client)) // OnConnected will be called when client is connected. OnConnected(c *Client) // HandleDisconnected registers handler which will be called when client is disconnected. HandleDisconnected(onDisConnected func(*Client)) // OnDisconnected will be called when client is disconnected. OnDisconnected(c *Client) // MaxReconnectTimes returns client's max reconnect times. MaxReconnectTimes() int // SetMaxReconnectTimes sets client's max reconnect times. SetMaxReconnectTimes(n int) // HandleOverstock registers handler which will be called when client send queue is overstock. HandleOverstock(onOverstock func(c *Client, m *Message)) // OnOverstock will be called when client chSend is full. OnOverstock(c *Client, m *Message) // HandleMessageDone registers handler which will be called when message dropped. HandleMessageDone(onMessageDone func(c *Client, m *Message)) // OnMessageDone will be called when message is dropped. OnMessageDone(c *Client, m *Message) // HandleMessageDropped registers handler which will be called when message dropped. HandleMessageDropped(onOverstock func(c *Client, m *Message)) // OnMessageDropped will be called when message is dropped. OnMessageDropped(c *Client, m *Message) // HandleSessionMiss registers handler which will be called when async message seq not found. HandleSessionMiss(onSessionMiss func(c *Client, m *Message)) // OnSessionMiss will be called when async message seq not found. OnSessionMiss(c *Client, m *Message) // HandleContextDone registers handler which will be called when message dropped. HandleContextDone(onContextDone func(ctx *Context)) // OnContextDone will be called when message is dropped. 
OnContextDone(ctx *Context) // BeforeRecv registers handler which will be called before Recv. BeforeRecv(h func(net.Conn) error) // BeforeSend registers handler which will be called before Send. BeforeSend(h func(net.Conn) error) // BatchRecv returns BatchRecv flag. BatchRecv() bool // SetBatchRecv sets BatchRecv flag. SetBatchRecv(batch bool) // BatchSend returns BatchSend flag. BatchSend() bool // SetBatchSend sets BatchSend flag. SetBatchSend(batch bool) // AsyncWrite returns AsyncWrite flag. AsyncWrite() bool // SetAsyncWrite sets AsyncWrite flag. SetAsyncWrite(async bool) // AsyncResponse returns AsyncResponse flag. AsyncResponse() bool // SetAsyncResponse sets AsyncResponse flag. SetAsyncResponse(async bool) // WrapReader wraps net.Conn to Read data with io.Reader. WrapReader(conn net.Conn) io.Reader // SetReaderWrapper registers reader wrapper for net.Conn. SetReaderWrapper(wrapper func(conn net.Conn) io.Reader) // Recv reads a message from a client. Recv(c *Client) (*Message, error) // Send writes buffer data to a connection. Send(c net.Conn, buffer []byte) (int, error) // SendN writes multiple buffer data to a connection. SendN(conn net.Conn, buffers net.Buffers) (int, error) // RecvBufferSize returns client's recv buffer size. RecvBufferSize() int // SetRecvBufferSize sets client's recv buffer size. SetRecvBufferSize(size int) // SendBufferSize returns client's send buffer size. SendBufferSize() int // SetSendBufferSize sets client's send buffer size. SetSendBufferSize(size int) // ReadTimeout returns client's read timeout. ReadTimeout() time.Duration // SetReadTimeout sets client's read timeout. SetReadTimeout(timeout time.Duration) // WriteTimeout returns client's write timeout. WriteTimeout() time.Duration // SetWriteTimeout sets client's write timeout. SetWriteTimeout(timeout time.Duration) // SendQueueSize returns client's send queue channel capacity. SendQueueSize() int // SetSendQueueSize sets client's send queue channel capacity. 
SetSendQueueSize(size int) // StreamQueueSize returns stream queue channel capacity. StreamQueueSize() int // SetStreamQueueSize sets stream queue channel capacity. SetStreamQueueSize(size int) // MaxBodyLen returns max body length of a message. MaxBodyLen() int // SetMaxBodyLen sets max body length of a message. SetMaxBodyLen(l int) // Use registers method/router handler middleware. Use(h HandlerFunc) // UseCoder registers message coding middleware, // coder.Encode will be called before message send, // coder.Decode will be called after message recv. UseCoder(coder MessageCoder) // Coders returns coding middlewares. Coders() []MessageCoder // Handle registers method/router handler. // // If a Boolean value of "true" is passed, the handler will be called asynchronously in a new goroutine, // otherwise the handler will be called synchronously in the client's reading goroutine one by one. Handle(m string, h HandlerFunc, args ...interface{}) // HandleNotFound registers "" method/router handler, // it will be called when method/router is not found. HandleNotFound(h HandlerFunc) // HandleStream registers method/router stream handler. HandleStream(m string, h StreamHandlerFunc, args ...interface{}) // OnMessage finds method/router middlewares and handler, then calls them one by one. OnMessage(c *Client, m *Message) // Malloc makes a buffer by size. Malloc(size int) []byte // HandleMalloc registers buffer maker. HandleMalloc(f func(size int) []byte) // Append appends bytes to buffer. Append(b []byte, more ...byte) []byte // HandleAppend registers buffer appender. HandleAppend(f func(b []byte, more ...byte) []byte) // Free releases a buffer. Free([]byte) // HandleFree registers buffer releaser. 
HandleFree(f func(buf []byte)) // EnablePool registers handlers for pool operation for Context and Message and Message.Buffer EnablePool(enable bool) Context() (context.Context, context.CancelFunc) SetContext(ctx context.Context, cancel context.CancelFunc) Cancel() // NewMessage creates a Message. NewMessage(cmd byte, method string, v interface{}, isError bool, isAsync bool, seq uint64, codec codec.Codec, values map[interface{}]interface{}) *Message // NewMessageWithBuffer creates a message with the buffer and manage the message by the pool. // The buffer arg should be managed by a pool if EnablePool(true) . NewMessageWithBuffer(buffer []byte) *Message // SetAsyncExecutor sets executor. SetAsyncExecutor(executor func(f func())) // AsyncExecute executes a func AsyncExecute(f func()) }
Handler defines net message handler interface.
var DefaultHandler Handler = NewHandler()
DefaultHandler is the default Handler used by arpc
type Message ¶
type Message struct { Buffer []byte // contains filtered or unexported fields }
Message represents an arpc Message.
func NewMessage ¶ added in v1.1.9
func NewMessage(cmd byte, method string, v interface{}, isError bool, isAsync bool, seq uint64, h Handler, codec codec.Codec, values map[interface{}]interface{}) *Message
NewMessage creates a Message.
func (*Message) IsFlagBitSet ¶
IsFlagBitSet returns flag bit value.
func (*Message) IsStreamEOF ¶ added in v1.2.15
IsStreamEOF reports whether it's a stream's last message and the stream is EOF and closed.
func (*Message) IsStreamLocal ¶ added in v1.2.15
IsStreamLocal reports whether it's a stream message.
func (*Message) Payback ¶ added in v1.1.9
func (m *Message) Payback()
Payback puts the Message back into the pool.
func (*Message) Release ¶ added in v1.1.9
Release decrements the reference count and returns the current value.
func (*Message) ResetAttrs ¶ added in v1.1.9
func (m *Message) ResetAttrs()
ResetAttrs resets reserved/cmd/flag/methodLen to 0.
func (*Message) Retain ¶ added in v1.1.9
Retain increments the reference count and returns the current value.
func (*Message) Set ¶
func (m *Message) Set(key interface{}, value interface{})
Set sets key-value pair.
func (*Message) SetFlagBit ¶
SetFlagBit sets flag bit value by index.
func (*Message) SetMethodLen ¶
SetMethodLen sets method length.
func (*Message) SetStreamEOF ¶ added in v1.2.15
SetStreamEOF sets the flag for a stream's last message and marks the stream as EOF and closed.
func (*Message) SetStreamLocal ¶ added in v1.2.15
SetStreamLocal sets the flag for a stream message.
type MessageCoder ¶
type MessageCoder interface { // Encode wrap message before send to client Encode(*Client, *Message) *Message // Decode unwrap message between recv and handle Decode(*Client, *Message) *Message }
MessageCoder defines Message coding middleware interface.
type NativeAllocator ¶ added in v1.1.9
type NativeAllocator struct{}
NativeAllocator definition.
func (*NativeAllocator) Malloc ¶ added in v1.1.9
func (a *NativeAllocator) Malloc(size int) []byte
Malloc .
type Server ¶
type Server struct { Accepted int64 CurrLoad int64 MaxLoad int64 Codec codec.Codec Handler Handler Listener net.Listener // contains filtered or unexported fields }
Server represents an arpc Server.
func (*Server) BroadcastWithFilter ¶ added in v1.2.2
func (*Server) ForEachWithFilter ¶ added in v1.2.2
func (*Server) NewMessage ¶
NewMessage creates a Message.
type Stream ¶ added in v1.2.15
type Stream struct {
// contains filtered or unexported fields
}
Stream .
func (*Stream) CloseRecvContext ¶ added in v1.2.15
func (*Stream) CloseSendContext ¶ added in v1.2.15
func (*Stream) RecvContext ¶ added in v1.2.15
func (*Stream) SendAndClose ¶ added in v1.2.15
func (*Stream) SendAndCloseContext ¶ added in v1.2.15
func (*Stream) SendAndCloseWith ¶ added in v1.2.15
func (*Stream) SendContext ¶ added in v1.2.15
type StreamHandlerFunc ¶ added in v1.2.15
type StreamHandlerFunc func(*Stream)
StreamHandlerFunc defines stream handler.