Documentation ¶
Index ¶
- func GetMessageSize() int
- type Backend
- type BackendFactory
- type BackendOption
- func WithBackendBatchSendSize(size int) BackendOption
- func WithBackendBufferSize(size int) BackendOption
- func WithBackendBusyBufferSize(size int) BackendOption
- func WithBackendConnectTimeout(timeout time.Duration) BackendOption
- func WithBackendConnectWhenCreate() BackendOption
- func WithBackendFilter(filter func(Message, string) bool) BackendOption
- func WithBackendGoettyOptions(options ...goetty.Option) BackendOption
- func WithBackendHasPayloadResponse() BackendOption
- func WithBackendLogger(logger *zap.Logger) BackendOption
- func WithBackendStreamBufferSize(value int) BackendOption
- type ClientOption
- func WithClientCreateTaskChanSize(size int) ClientOption
- func WithClientInitBackends(backends []string, counts []int) ClientOption
- func WithClientLogger(logger *zap.Logger) ClientOption
- func WithClientMaxBackendMaxIdleDuration(value time.Duration) ClientOption
- func WithClientMaxBackendPerHost(maxBackendsPerHost int) ClientOption
- func WithClientTag(tag string) ClientOption
- type ClientSession
- type Codec
- type CodecOption
- type Future
- type HeaderCodec
- type Message
- type PayloadMessage
- type RPCClient
- type RPCMessage
- type RPCServer
- type ServerOption
- func WithServerBatchSendSize(size int) ServerOption
- func WithServerDisableAutoCancelContext() ServerOption
- func WithServerGoettyOptions(options ...goetty.Option) ServerOption
- func WithServerLogger(logger *zap.Logger) ServerOption
- func WithServerSessionBufferSize(size int) ServerOption
- func WithServerWriteFilter(filter func(Message) bool) ServerOption
- type Stream
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetMessageSize ¶ added in v0.6.0
func GetMessageSize() int
Types ¶
type Backend ¶
type Backend interface {
	// Send sends the request to the corresponding backend and returns a Future for the response.
	// moerr.ErrBackendClosed is returned if the backend is closed.
	Send(ctx context.Context, request Message) (*Future, error)
	// NewStream creates a stream used for asynchronous sending and receiving of messages.
	// If the underlying connection is reset during the lifetime of the stream, the stream
	// will be closed.
	NewStream(unlockAfterClose bool) (Stream, error)
	// Close closes the backend.
	Close()
	// Busy reports whether the backend is busy. The backend receives many requests concurrently
	// during operation; once the number of requests waiting to be sent reaches some threshold,
	// the backend is busy.
	Busy() bool
	// LastActiveTime returns the last active time.
	LastActiveTime() time.Time
	// Lock locks the backend. Other I/O operations cannot use this backend while it is locked.
	Lock()
	// Unlock unlocks the backend so that other I/O operations can use it again.
	Unlock()
	// Locked indicates whether the backend is locked.
	Locked() bool
}
Backend represents a wrapper for a client communicating with a remote server.
func NewRemoteBackend ¶
func NewRemoteBackend( remote string, codec Codec, options ...BackendOption) (Backend, error)
NewRemoteBackend creates a goetty connection based backend. This backend starts 2 goroutines, one for reading and one for writing. If there is a network error in the underlying goetty connection, it automatically retries until the Future times out.
type BackendFactory ¶
type BackendFactory interface {
	// Create creates the corresponding backend based on the given address.
	Create(address string) (Backend, error)
}
BackendFactory is a factory that creates backends.
func NewGoettyBasedBackendFactory ¶
func NewGoettyBasedBackendFactory(codec Codec, options ...BackendOption) BackendFactory
type BackendOption ¶
type BackendOption func(*remoteBackend)
BackendOption is an option used when creating a remote backend.
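The option functions in this package follow the functional-options pattern. The following is a minimal sketch of that pattern, not morpc's implementation: `backendConfig`, `option`, `withBufferSize`, `withConnectTimeout`, and `newBackendConfig` are hypothetical names, and applying defaults after the options run is an assumption. The default values are the ones documented for the individual options.

```go
package main

import "time"

// backendConfig is a hypothetical stand-in for the unexported remoteBackend type.
type backendConfig struct {
	bufferSize     int
	busySize       int
	batchSendSize  int
	connectTimeout time.Duration
}

// option mirrors the shape of BackendOption: a function that mutates the target.
type option func(*backendConfig)

func withBufferSize(size int) option {
	return func(c *backendConfig) { c.bufferSize = size }
}

func withConnectTimeout(d time.Duration) option {
	return func(c *backendConfig) { c.connectTimeout = d }
}

// newBackendConfig applies the options, then fills unset fields with the
// documented defaults (assumed order: options first, defaults second).
func newBackendConfig(opts ...option) *backendConfig {
	c := &backendConfig{}
	for _, opt := range opts {
		opt(c)
	}
	if c.bufferSize == 0 {
		c.bufferSize = 1024
	}
	if c.batchSendSize == 0 {
		c.batchSendSize = 8
	}
	if c.connectTimeout == 0 {
		c.connectTimeout = 10 * time.Second
	}
	if c.busySize == 0 {
		c.busySize = c.bufferSize * 3 / 4
	}
	return c
}
```

Deriving the busy threshold after the buffer size is set keeps the 3/4 ratio correct when only the buffer size is overridden.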
func WithBackendBatchSendSize ¶
func WithBackendBatchSendSize(size int) BackendOption
WithBackendBatchSendSize sets the maximum number of messages to be sent together in each batch. Default is 8.
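One way such batching can work is sketched below. This is an illustration under assumptions, not the library's write loop: `fetchBatch` is a hypothetical helper and messages are modelled as plain strings.

```go
package main

// fetchBatch blocks until one message is available, then takes whatever else
// is already queued, stopping once the batch holds max messages.
func fetchBatch(writeC chan string, max int) []string {
	batch := make([]string, 0, max)
	batch = append(batch, <-writeC)
	for len(batch) < max {
		select {
		case m := <-writeC:
			batch = append(batch, m)
		default:
			// Nothing else is queued; send what we have.
			return batch
		}
	}
	return batch
}
```

With 10 queued messages and a batch size of 8, the first call returns 8 messages and the second returns the remaining 2.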
func WithBackendBufferSize ¶
func WithBackendBufferSize(size int) BackendOption
WithBackendBufferSize sets the buffer size of the chan that holds messages waiting to be sent. Default is 1024.
func WithBackendBusyBufferSize ¶
func WithBackendBusyBufferSize(size int) BackendOption
WithBackendBusyBufferSize sets the busy threshold: the backend is busy if len(writeC) >= size. Default is 3/4 of the buffer size.
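The busy check can be sketched as follows. `sendQueue`, `request`, and `newSendQueue` are hypothetical names; only the `len(writeC) >= size` comparison and the 3/4 default come from the description above.

```go
package main

type request struct{ id uint64 }

// sendQueue is a hypothetical holder for the wait-send chan and its threshold.
type sendQueue struct {
	writeC   chan request
	busySize int
}

// newSendQueue uses the documented default threshold of 3/4 of the buffer size.
func newSendQueue(bufferSize int) *sendQueue {
	return &sendQueue{
		writeC:   make(chan request, bufferSize),
		busySize: bufferSize * 3 / 4,
	}
}

// busy reports whether the number of queued requests reached the threshold.
func (q *sendQueue) busy() bool {
	return len(q.writeC) >= q.busySize
}
```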
func WithBackendConnectTimeout ¶
func WithBackendConnectTimeout(timeout time.Duration) BackendOption
WithBackendConnectTimeout sets the timeout for connecting to the remote. Default is 10s.
func WithBackendConnectWhenCreate ¶
func WithBackendConnectWhenCreate() BackendOption
WithBackendConnectWhenCreate establishes the goetty connection when the backend is created.
func WithBackendFilter ¶
func WithBackendFilter(filter func(Message, string) bool) BackendOption
WithBackendFilter sets the send filter func. The filter takes the futures that are ready to be sent and determines which of them really need to be sent.
func WithBackendGoettyOptions ¶
func WithBackendGoettyOptions(options ...goetty.Option) BackendOption
WithBackendGoettyOptions sets the goetty connection options, e.g. the read/write buffer size or net.Conn attributes.
func WithBackendHasPayloadResponse ¶ added in v0.6.0
func WithBackendHasPayloadResponse() BackendOption
WithBackendHasPayloadResponse indicates that responses carry a payload: a response read by the backend holds a slice of the data in the read buffer, which avoids a data copy.
func WithBackendLogger ¶
func WithBackendLogger(logger *zap.Logger) BackendOption
WithBackendLogger sets the backend logger.
func WithBackendStreamBufferSize ¶ added in v0.5.1
func WithBackendStreamBufferSize(value int) BackendOption
WithBackendStreamBufferSize sets the buffer size of the chan on which a stream receives messages.
type ClientOption ¶
type ClientOption func(*client)
ClientOption is an option used when creating a client.
func WithClientCreateTaskChanSize ¶
func WithClientCreateTaskChanSize(size int) ClientOption
WithClientCreateTaskChanSize sets the buffer size of the chan that carries backend-creation tasks.
func WithClientInitBackends ¶
func WithClientInitBackends(backends []string, counts []int) ClientOption
WithClientInitBackends sets the number of connections for the initialized backends.
func WithClientLogger ¶
func WithClientLogger(logger *zap.Logger) ClientOption
WithClientLogger sets the client logger.
func WithClientMaxBackendMaxIdleDuration ¶ added in v0.6.0
func WithClientMaxBackendMaxIdleDuration(value time.Duration) ClientOption
WithClientMaxBackendMaxIdleDuration sets the maximum idle duration of a backend connection. Backend connections that stay idle longer than this are closed automatically. 0 means no idle time limit.
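The idle rule can be sketched as a small predicate. `idleExpired` and `shouldClose` are hypothetical helpers; the only behaviour taken from the description is that idle time is compared against the limit and that 0 disables the check. Measuring idle time from `Backend.LastActiveTime` is an assumption.

```go
package main

import "time"

// idleExpired reports whether an idle period exceeds maxIdle.
// A maxIdle of 0 means there is no idle time limit.
func idleExpired(idle, maxIdle time.Duration) bool {
	if maxIdle == 0 {
		return false
	}
	return idle > maxIdle
}

// shouldClose applies idleExpired to a backend's last active time
// (assumed here to come from Backend.LastActiveTime).
func shouldClose(lastActive, now time.Time, maxIdle time.Duration) bool {
	return idleExpired(now.Sub(lastActive), maxIdle)
}
```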
func WithClientMaxBackendPerHost ¶
func WithClientMaxBackendPerHost(maxBackendsPerHost int) ClientOption
WithClientMaxBackendPerHost sets the maximum number of connections per host.
func WithClientTag ¶ added in v0.6.0
func WithClientTag(tag string) ClientOption
type ClientSession ¶
type ClientSession interface {
	// Write writes the response message to the client.
	Write(ctx context.Context, response Message) error
}
ClientSession is a client session, used to send response messages. Note that it is not thread-safe.
type Codec ¶
type Codec interface {
	codec.Codec
	// AddHeaderCodec adds a header codec. HeaderCodecs are added sequentially and are
	// executed in the order in which they were added at codec time.
	AddHeaderCodec(HeaderCodec)
}
Codec is the codec used to encode and decode messages.
func NewMessageCodec ¶
func NewMessageCodec(messageFactory func() Message, options ...CodecOption) Codec
NewMessageCodec creates a message codec. The message encoding format consists of a message header and a message body. Format:
1. Size, 4 bytes, required. Includes header and body.
2. Message header
   2.1. Flag, 1 byte, required.
   2.2. Checksum, 8 bytes, optional. Set if the message has the checksum flag.
   2.3. PayloadSize, 4 bytes, optional. Set if the message is a morpc.PayloadMessage.
   2.4. Custom headers, optional. Set if custom header codecs are registered.
3. Message body
   3.1. Message body, required.
   3.2. Payload, optional. Set if the message has the payload flag.
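The layout above can be sketched as a small encoder. This is an illustration only, under stated assumptions: big-endian byte order, a size field that does not count its own 4 bytes, and a hypothetical bit value for the payload flag, none of which the documentation specifies. The checksum and custom headers are omitted, and `encodeFrame` is a hypothetical name.

```go
package main

import "encoding/binary"

// flagHasPayload is a hypothetical bit value; the real one is not documented.
const flagHasPayload byte = 1 << 0

// encodeFrame lays out: size(4) | flag(1) | [payloadSize(4)] | body | [payload].
// Assumption: size counts the header and body, but not the size field itself.
func encodeFrame(body, payload []byte) []byte {
	var flag byte
	headerLen := 1 // the flag byte
	if len(payload) > 0 {
		flag |= flagHasPayload
		headerLen += 4 // the payload size field
	}
	total := headerLen + len(body) + len(payload)

	out := make([]byte, 4+total)
	binary.BigEndian.PutUint32(out[0:4], uint32(total))
	out[4] = flag
	n := 5
	if flag&flagHasPayload != 0 {
		binary.BigEndian.PutUint32(out[n:n+4], uint32(len(payload)))
		n += 4
	}
	n += copy(out[n:], body)
	copy(out[n:], payload)
	return out
}
```

For a 2-byte body and a 3-byte payload this produces 14 bytes: a size field holding 10, the flag, a payload size of 3, then the body and payload.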
type CodecOption ¶ added in v0.6.0
type CodecOption func(*messageCodec)
CodecOption is an option used when creating a message codec.
func WithCodecEnableChecksum ¶ added in v0.6.0
func WithCodecEnableChecksum() CodecOption
WithCodecEnableChecksum enables the checksum.
func WithCodecIntegrationHLC ¶ added in v0.6.0
func WithCodecIntegrationHLC(clock clock.Clock) CodecOption
WithCodecIntegrationHLC integrates the HLC (the given clock) into the codec.
func WithCodecMaxBodySize ¶ added in v0.6.0
func WithCodecMaxBodySize(size int) CodecOption
WithCodecMaxBodySize sets the maximum rpc body size.
func WithCodecPayloadCopyBufferSize ¶ added in v0.6.0
func WithCodecPayloadCopyBufferSize(value int) CodecOption
WithCodecPayloadCopyBufferSize sets the payload copy buffer size, used if the message is a PayloadMessage.
type Future ¶
type Future struct {
// contains filtered or unexported fields
}
Future is used to obtain response data synchronously.
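Because the fields and methods of Future are not shown here, the following is only a sketch of the general future pattern, not morpc's type. `future`, `complete`, `get`, `sendAndWait`, and the two demo helpers are hypothetical, and responses are modelled as strings.

```go
package main

import "context"

// future is a hypothetical one-shot holder for a response.
type future struct {
	id uint64
	c  chan string
}

func newFuture(id uint64) *future {
	// Capacity 1 lets the producer complete the future without blocking.
	return &future{id: id, c: make(chan string, 1)}
}

// complete hands the response to whoever is waiting in get.
func (f *future) complete(resp string) { f.c <- resp }

// get blocks until the response arrives or the context is done.
func (f *future) get(ctx context.Context) (string, error) {
	select {
	case resp := <-f.c:
		return resp, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// sendAndWait shows the caller's side: obtain a future, then wait on it.
func sendAndWait(ctx context.Context, id uint64, reply string) (string, error) {
	f := newFuture(id)
	go f.complete(reply) // stands in for the backend's read goroutine
	return f.get(ctx)
}

func demoReply() (string, error) {
	return sendAndWait(context.Background(), 1, "pong")
}

func demoCancelled() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, err := newFuture(2).get(ctx)
	return err
}
```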
type HeaderCodec ¶ added in v0.6.0
type HeaderCodec interface {
	// Encode encodes the header into the output buffer.
	Encode(*RPCMessage, *buf.ByteBuf) (int, error)
	// Decode decodes the header from the input buffer.
	Decode(*RPCMessage, []byte) (int, error)
}
HeaderCodec encodes and decodes a message header.
type Message ¶
type Message interface {
	// SetID sets the message ID. Each message has a unique ID in a RPCClient Backend. If the
	// message is transmitted in a stream, the ID must be set to Stream.ID.
	SetID(uint64)
	// GetID returns the ID of the message.
	GetID() uint64
	// DebugString returns a debug string.
	DebugString() string
	// Size returns the size of the message after marshaling.
	Size() int
	// MarshalTo marshals the message into the target byte slice.
	MarshalTo(data []byte) (int, error)
	// Unmarshal unmarshals the message from data.
	Unmarshal(data []byte) error
}
Message is the message type exchanged through morpc. morpc is not a normal remote method call framework; rather, it is a message-based, asynchronously driven framework.
type PayloadMessage ¶
type PayloadMessage interface {
	Message
	// GetPayloadField returns the payload data.
	GetPayloadField() []byte
	// SetPayloadField sets the payload data.
	SetPayloadField(data []byte)
}
PayloadMessage is similar to Message, but has a large payload field. To avoid redundant memory copies, the encoding is msgTotalSize(4 bytes) + flag(1 byte) + messageWithoutPayloadSize(4 bytes) + messageWithoutPayload + payload. All fields except payload are written to the buffer of each link before being written to the socket. The payload, being a []byte, can be written directly to the socket, which avoids a copy from the buffer to the socket.
Note: When decoding, all socket data is read into the buffer. The payload is not copied out of the buffer; instead, SetPayloadField is called with a slice of the buffer data. This is not safe and must be used very carefully: once the message has been processed and control returns to the rpc framework, the payload data must no longer be held.
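The hazard described in the note is ordinary Go slice aliasing. The sketch below uses a hypothetical `payloadMsg` type and a hypothetical `retainPayload` helper to show one way a handler could keep the payload beyond the call, by copying it.

```go
package main

// payloadMsg is a hypothetical message holding only a payload field.
type payloadMsg struct{ payload []byte }

func (m *payloadMsg) SetPayloadField(data []byte) { m.payload = data }
func (m *payloadMsg) GetPayloadField() []byte     { return m.payload }

// retainPayload returns a private copy of the payload, which stays valid
// after the read buffer behind the original slice is reused.
func retainPayload(m *payloadMsg) []byte {
	return append([]byte(nil), m.GetPayloadField()...)
}
```

If the buffer is overwritten after `SetPayloadField` was called with a slice of it, `GetPayloadField` reflects the new bytes, while the copy made by `retainPayload` still holds the original ones.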
type RPCClient ¶
type RPCClient interface {
	// Send sends a request message to the corresponding server and returns a Future used to
	// get the response message.
	Send(ctx context.Context, backend string, request Message) (*Future, error)
	// NewStream creates a stream used for asynchronous sending and receiving of messages.
	// If the underlying connection is reset during the lifetime of the stream, the stream
	// will be closed.
	NewStream(backend string, lock bool) (Stream, error)
	// Close closes the client.
	Close() error
}
RPCClient is the morpc client. morpc is not a normal remote method call framework; rather, it is a message-based, asynchronously driven framework. Each message has a unique ID, and the response to a message must carry the same ID.
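Matching responses to requests by ID can be sketched with a map of waiting channels. This is an illustration under assumptions, not the client's internals: `pending`, `register`, `dispatch`, and `response` are hypothetical names.

```go
package main

import "sync"

type response struct {
	id   uint64
	body string
}

// pending tracks requests that are still waiting for a response.
type pending struct {
	mu      sync.Mutex
	nextID  uint64
	waiting map[uint64]chan response
}

func newPending() *pending {
	return &pending{waiting: make(map[uint64]chan response)}
}

// register allocates a unique ID and the channel its response will arrive on.
func (p *pending) register() (uint64, chan response) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.nextID++
	c := make(chan response, 1)
	p.waiting[p.nextID] = c
	return p.nextID, c
}

// dispatch delivers r to the request with the same ID. It returns false if
// no request with that ID is waiting.
func (p *pending) dispatch(r response) bool {
	p.mu.Lock()
	c, ok := p.waiting[r.id]
	delete(p.waiting, r.id)
	p.mu.Unlock()
	if !ok {
		return false
	}
	c <- r
	return true
}
```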
func NewClient ¶
func NewClient(factory BackendFactory, options ...ClientOption) (RPCClient, error)
NewClient creates an rpc client with options.
type RPCMessage ¶ added in v0.6.0
type RPCMessage struct {
	// Ctx context
	Ctx context.Context
	// Message raw rpc message
	Message Message
	// contains filtered or unexported fields
}
RPCMessage pairs a message with its Context. Any message sent via morpc needs to have a Context set, which is transmitted across the network, so the messages sent and received at the network level are RPCMessages.
func (RPCMessage) GetTimeoutFromContext ¶ added in v0.6.0
func (m RPCMessage) GetTimeoutFromContext() (time.Duration, error)
GetTimeoutFromContext returns the timeout duration from context.
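Deriving a timeout from a context deadline can be sketched with the standard library alone. `timeoutFromContext` and the demo helpers are hypothetical, and returning an error when the context has no deadline is an assumption about the behaviour, not something stated above.

```go
package main

import (
	"context"
	"errors"
	"time"
)

var errNoDeadline = errors.New("context has no deadline")

// timeoutFromContext returns the time remaining until the context's deadline.
// Assumption: a context without a deadline is reported as an error.
func timeoutFromContext(ctx context.Context) (time.Duration, error) {
	deadline, ok := ctx.Deadline()
	if !ok {
		return 0, errNoDeadline
	}
	return time.Until(deadline), nil
}

func demoNoDeadline() error {
	_, err := timeoutFromContext(context.Background())
	return err
}

func demoWithTimeout(d time.Duration) (time.Duration, error) {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	return timeoutFromContext(ctx)
}
```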
func (RPCMessage) Timeout ¶ added in v0.6.0
func (m RPCMessage) Timeout() bool
Timeout returns true if the message has timed out.
type RPCServer ¶
type RPCServer interface {
	// Start starts listening and waits for client messages. After each client link is
	// established, a separate goroutine is assigned to handle reads, and each message read is
	// handed over to the Handler for processing.
	Start() error
	// Close closes the rpc server.
	Close() error
	// RegisterRequestHandler registers the request handler. The request handler runs in the
	// read goroutine of the current client connection. Sequence is the sequence number of the
	// message received by the current client connection. If the handler returns an error, the
	// client connection is closed. The handler can use the ClientSession to write the response,
	// either synchronously or asynchronously.
	RegisterRequestHandler(func(ctx context.Context, request Message, sequence uint64, cs ClientSession) error)
}
RPCServer is the RPC server implementation corresponding to RPCClient.
func NewRPCServer ¶
func NewRPCServer(name, address string, codec Codec, options ...ServerOption) (RPCServer, error)
NewRPCServer creates an rpc server with options. After the rpc server starts, each link corresponds to two goroutines, one for reading and one for writing. All messages to be written are first written to a buffered chan and then sent to the client by the write goroutine.
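The write path described above (a buffered chan drained by a dedicated write goroutine) can be sketched as follows. `session`, `newSession`, and the `sent` slice that stands in for the socket are hypothetical.

```go
package main

import "sync"

// session is a hypothetical per-link state holding the write chan.
type session struct {
	writeC chan string
	wg     sync.WaitGroup
	sent   []string // stands in for bytes written to the socket
}

// newSession starts the write goroutine for one link.
func newSession(bufferSize int) *session {
	s := &session{writeC: make(chan string, bufferSize)}
	s.wg.Add(1)
	go s.writeLoop()
	return s
}

// writeLoop is the only goroutine that touches the "socket".
func (s *session) writeLoop() {
	defer s.wg.Done()
	for m := range s.writeC {
		s.sent = append(s.sent, m)
	}
}

// write queues a message; it blocks only when the buffer is full.
func (s *session) write(m string) { s.writeC <- m }

// close stops the write goroutine and returns what was sent, in order.
func (s *session) close() []string {
	close(s.writeC)
	s.wg.Wait()
	return s.sent
}
```

Funnelling every write through one goroutine keeps messages ordered and avoids concurrent writes to the same connection.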
type ServerOption ¶
type ServerOption func(*server)
ServerOption is an option used when creating an rpc server.
func WithServerBatchSendSize ¶
func WithServerBatchSendSize(size int) ServerOption
WithServerBatchSendSize sets the maximum number of messages to be sent together in each batch. Default is 8.
func WithServerDisableAutoCancelContext ¶ added in v0.6.0
func WithServerDisableAutoCancelContext() ServerOption
WithServerDisableAutoCancelContext disables automatic cancellation of the message context. The server receives RPC messages from the client, each with its own Context, and morpc calls the handler to process each message. By default, the Context is cancelled automatically when the handler returns. In some scenarios the handler is asynchronous, so morpc must not cancel the context as soon as the handler returns; otherwise work that is still in progress would observe a cancelled context.
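The difference between the two modes can be sketched with the standard context package. `handle` and `cancelledAfterHandler` are hypothetical helpers, and who cancels the context in the disabled mode is left to the caller here, which is an assumption.

```go
package main

import "context"

// handle runs handler with a cancellable context. Unless disableAutoCancel is
// set, the context is cancelled as soon as the handler returns. The cancel
// func is returned so that an asynchronous handler's owner can call it later.
func handle(
	parent context.Context,
	disableAutoCancel bool,
	handler func(ctx context.Context),
) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(parent)
	handler(ctx)
	if !disableAutoCancel {
		cancel()
	}
	return ctx, cancel
}

// cancelledAfterHandler reports whether the context is already cancelled
// right after the handler returned.
func cancelledAfterHandler(disableAutoCancel bool) bool {
	ctx, cancel := handle(context.Background(), disableAutoCancel, func(context.Context) {})
	defer cancel()
	return ctx.Err() != nil
}
```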
func WithServerGoettyOptions ¶
func WithServerGoettyOptions(options ...goetty.Option) ServerOption
WithServerGoettyOptions sets the goetty options used by the server.
func WithServerLogger ¶
func WithServerLogger(logger *zap.Logger) ServerOption
WithServerLogger sets the rpc server logger.
func WithServerSessionBufferSize ¶
func WithServerSessionBufferSize(size int) ServerOption
WithServerSessionBufferSize sets the buffer size of the chan that holds responses waiting to be written. Default is 16.
func WithServerWriteFilter ¶
func WithServerWriteFilter(filter func(Message) bool) ServerOption
WithServerWriteFilter sets the write filter func. The filter takes the Messages that are ready to be sent and determines which of them really need to be sent.
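Applying a filter of this shape can be sketched as below. The sketch assumes that a return value of true means the message is sent, which the description does not state; `message`, `ping`, and `applyWriteFilter` are hypothetical.

```go
package main

// message is a hypothetical, reduced form of the Message interface.
type message interface{ GetID() uint64 }

type ping struct{ id uint64 }

func (p ping) GetID() uint64 { return p.id }

// applyWriteFilter keeps only the messages for which filter returns true
// (assumed polarity: true means "send").
func applyWriteFilter(ready []message, filter func(message) bool) []message {
	out := make([]message, 0, len(ready))
	for _, m := range ready {
		if filter(m) {
			out = append(out, m)
		}
	}
	return out
}
```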
type Stream ¶
type Stream interface {
	// ID returns the stream ID. All messages transmitted on the current stream need to use the
	// stream ID as the message ID.
	ID() uint64
	// Send sends a message to the stream.
	Send(ctx context.Context, request Message) error
	// Receive returns a channel used to read stream messages from the server. If nil is
	// received, the receive loop needs to exit. In any case, Stream.Close needs to be called.
	Receive() (chan Message, error)
	// Close closes the stream.
	Close() error
}
Stream is used for asynchronous streaming: sending and receiving messages.
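The receive loop implied by the Receive contract (exit when nil is received) can be sketched as follows. `message`, `streamMsg`, and `consume` are hypothetical, and the channel is created locally instead of coming from a real stream. In real code the caller would still call Stream.Close afterwards, typically with defer.

```go
package main

// message is a hypothetical, reduced form of the Message interface.
type message interface{ GetID() uint64 }

type streamMsg struct{ id uint64 }

func (m *streamMsg) GetID() uint64 { return m.id }

// consume reads from the receive channel until a nil message arrives or the
// channel is closed, and returns the IDs it saw.
func consume(c chan message) []uint64 {
	var ids []uint64
	for m := range c {
		if m == nil {
			break // nil signals that the receive loop must exit
		}
		ids = append(ids, m.GetID())
	}
	return ids
}
```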