morpc

package
v1.2.4
Published: Sep 20, 2024 License: Apache-2.0 Imports: 32 Imported by: 3

README

morpc

morpc is a message communication framework built as a wrapper around Goetty. It supports two types of communication: Request-Response and Stream.

Components

morpc consists of RPCClient and RPCServer. RPCClient is used to send messages and RPCServer is used to process and respond to request messages.

RPCClient

An RPCClient can manage multiple underlying TCP connections, each of which is called a Backend. Each Backend starts two goroutines to handle IO reads and writes. A server address can correspond to more than one Backend, and requests are load-balanced across them in round-robin order.
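The round-robin selection can be pictured with a minimal sketch. The `backendPool` type and its string identifiers are hypothetical stand-ins, not morpc's actual client internals.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// backendPool is a hypothetical stand-in for the Backends that a client
// keeps for one server address.
type backendPool struct {
	backends []string // placeholder identifiers for connections
	next     atomic.Uint64
}

// pick returns the next backend in round-robin order and is safe for
// concurrent use.
func (p *backendPool) pick() string {
	n := p.next.Add(1) - 1
	return p.backends[n%uint64(len(p.backends))]
}

func main() {
	p := &backendPool{backends: []string{"conn-0", "conn-1", "conn-2"}}
	for i := 0; i < 4; i++ {
		fmt.Println(p.pick())
	}
}
```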

RPCServer

RPCServer can listen on a TCP address or a Unix socket. After a client connects, the RPCServer allocates two goroutines to handle the IO reads and writes. When the RPCServer is started, a message-processing Handler is set; it is invoked whenever a message is received from a client, and the specific message-processing logic must be implemented in the Handler.
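As a rough illustration of the two goroutines per connection, the sketch below models a connection as a pair of channels instead of a real socket. The names `serve`, `roundTrip`, and `handler` are hypothetical, and the real server works on RPCMessage values rather than strings.

```go
package main

import (
	"fmt"
	"sync"
)

// handler is a hypothetical callback that produces the reply for a request.
type handler func(request string) string

// serve runs one read loop and one write loop for a single connection.
func serve(in <-chan string, out chan<- string, h handler) {
	writeC := make(chan string, 16) // buffered queue between the two loops
	var wg sync.WaitGroup
	wg.Add(2)
	// Read loop: receives requests and invokes the handler.
	go func() {
		defer wg.Done()
		defer close(writeC)
		for req := range in {
			writeC <- h(req)
		}
	}()
	// Write loop: drains the queue and writes responses to the peer.
	go func() {
		defer wg.Done()
		defer close(out)
		for resp := range writeC {
			out <- resp
		}
	}()
	wg.Wait()
}

// roundTrip feeds requests through serve and collects the responses.
func roundTrip(reqs []string, h handler) []string {
	in, out := make(chan string), make(chan string)
	go serve(in, out, h)
	go func() {
		for _, r := range reqs {
			in <- r
		}
		close(in)
	}()
	var got []string
	for r := range out {
		got = append(got, r)
	}
	return got
}

func main() {
	fmt.Println(roundTrip([]string{"ping", "hello"}, func(r string) string { return "re:" + r }))
}
```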

Examples

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetMessageSize added in v0.6.0

func GetMessageSize() int

Types

type Backend

type Backend interface {
	// Send send the request for future to the corresponding backend.
	// moerr.ErrBackendClosed returned if backend is closed.
	Send(ctx context.Context, request Message) (*Future, error)
	// SendInternal is similar to Send, but perform on internal message
	SendInternal(ctx context.Context, request Message) (*Future, error)
	// NewStream creates a stream for sending and receiving messages asynchronously.
	// If the underlying connection is reset during the lifetime of the stream, the stream
	// is closed.
	NewStream(unlockAfterClose bool) (Stream, error)
	// Close close the backend.
	Close()
	// Busy the backend receives a lot of requests concurrently during operation, but when the number
	// of requests waiting to be sent reaches some threshold, the current backend is busy.
	Busy() bool
	// LastActiveTime returns last active time
	LastActiveTime() time.Time
	// Lock locks the backend. Other I/O operations cannot use the backend while it is locked.
	Lock()
	// Unlock unlocks the backend so that it can be used by other I/O operations.
	Unlock()
	// Locked indicates if backend is locked
	Locked() bool
}

Backend represents a wrapper for a client communicating with a remote server.

func NewRemoteBackend

func NewRemoteBackend(
	remote string,
	codec Codec,
	options ...BackendOption) (Backend, error)

NewRemoteBackend creates a backend based on a goetty connection. The backend starts 2 goroutines, one for reads and one for writes. If a network error occurs on the underlying goetty connection, it retries automatically until the Future times out.

type BackendFactory

type BackendFactory interface {
	// Create create the corresponding backend based on the given address.
	Create(address string, extraOptions ...BackendOption) (Backend, error)
}

BackendFactory backend factory

func NewGoettyBasedBackendFactory

func NewGoettyBasedBackendFactory(codec Codec, options ...BackendOption) BackendFactory

type BackendOption

type BackendOption func(*remoteBackend)

BackendOption options for create remote backend

func WithBackendBatchSendSize

func WithBackendBatchSendSize(size int) BackendOption

WithBackendBatchSendSize set the maximum number of messages to be sent together at each batch. Default is 8.

func WithBackendBufferSize

func WithBackendBufferSize(size int) BackendOption

WithBackendBufferSize set the buffer size of the wait send chan. Default is 1024.

func WithBackendBusyBufferSize

func WithBackendBusyBufferSize(size int) BackendOption

WithBackendBusyBufferSize if len(writeC) >= size, backend is busy. Default is 3/4 buffer size.
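The option functions above follow the functional-options pattern. The sketch below shows how such options and the documented defaults (batch size 8, buffer size 1024, busy threshold at 3/4 of the buffer) might fit together. `backendConfig` and the lowercase option names are hypothetical stand-ins for the unexported `remoteBackend`, not the package's real code.

```go
package main

import "fmt"

// backendConfig is a hypothetical stand-in for the unexported remoteBackend.
type backendConfig struct {
	batchSendSize  int
	bufferSize     int
	busyBufferSize int
}

// option mirrors the shape of BackendOption.
type option func(*backendConfig)

func withBatchSendSize(n int) option  { return func(c *backendConfig) { c.batchSendSize = n } }
func withBufferSize(n int) option     { return func(c *backendConfig) { c.bufferSize = n } }
func withBusyBufferSize(n int) option { return func(c *backendConfig) { c.busyBufferSize = n } }

// newBackendConfig applies the options, then fills in the defaults that the
// option documentation mentions.
func newBackendConfig(opts ...option) backendConfig {
	var c backendConfig
	for _, opt := range opts {
		opt(&c)
	}
	if c.batchSendSize == 0 {
		c.batchSendSize = 8
	}
	if c.bufferSize == 0 {
		c.bufferSize = 1024
	}
	if c.busyBufferSize == 0 {
		c.busyBufferSize = c.bufferSize * 3 / 4
	}
	return c
}

// busy reports whether the number of queued requests has reached the
// busy threshold.
func (c backendConfig) busy(queued int) bool { return queued >= c.busyBufferSize }

func main() {
	c := newBackendConfig(withBufferSize(2048))
	fmt.Println(c.batchSendSize, c.bufferSize, c.busyBufferSize, c.busy(1536))
}
```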

func WithBackendConnectTimeout

func WithBackendConnectTimeout(timeout time.Duration) BackendOption

WithBackendConnectTimeout set the timeout for connect to remote. Default 10s.

func WithBackendFilter

func WithBackendFilter(filter func(Message, string) bool) BackendOption

WithBackendFilter sets the send filter function. The filter is given each message that is ready to be sent and reports whether the message really needs to be sent.

func WithBackendFreeOrphansResponse added in v1.1.0

func WithBackendFreeOrphansResponse(value func(Message)) BackendOption

WithBackendFreeOrphansResponse setup free orphans response func

func WithBackendGoettyOptions

func WithBackendGoettyOptions(options ...goetty.Option) BackendOption

WithBackendGoettyOptions set goetty connection options. e.g. set read/write buffer size, adjust net.Conn attribute etc.

func WithBackendHasPayloadResponse added in v0.6.0

func WithBackendHasPayloadResponse() BackendOption

WithBackendHasPayloadResponse indicates that responses carry a payload. Such a response holds a slice of the data in the read buffer, which avoids a data copy.

func WithBackendLogger

func WithBackendLogger(logger *zap.Logger) BackendOption

WithBackendLogger set the backend logger

func WithBackendMetrics added in v1.0.0

func WithBackendMetrics(metrics *metrics) BackendOption

WithBackendMetrics setup backend metrics

func WithBackendReadTimeout added in v1.0.0

func WithBackendReadTimeout(value time.Duration) BackendOption

WithBackendReadTimeout set read timeout for read loop.

func WithBackendStreamBufferSize added in v0.5.1

func WithBackendStreamBufferSize(value int) BackendOption

WithBackendStreamBufferSize set buffer size for stream receive message chan

func WithDisconnectAfterRead added in v1.2.0

func WithDisconnectAfterRead(n int) BackendOption

WithDisconnectAfterRead is used for testing. It closes the connection after reading N messages.

type ClientOption

type ClientOption func(*client)

ClientOption client options for create client

func WithClientCreateTaskChanSize

func WithClientCreateTaskChanSize(size int) ClientOption

WithClientCreateTaskChanSize set the buffer size of the chan that creates the Backend Task.

func WithClientEnableAutoCreateBackend added in v1.1.1

func WithClientEnableAutoCreateBackend() ClientOption

WithClientEnableAutoCreateBackend enables the client to create backends automatically in the background. When the connections in the pool are in use and the pool has not reached its maximum number of connections, new connections are created in the background to reduce connection-creation latency.

func WithClientInitBackends

func WithClientInitBackends(backends []string, counts []int) ClientOption

WithClientInitBackends set the number of connections for the initialized backends.

func WithClientLogger

func WithClientLogger(logger *zap.Logger) ClientOption

WithClientLogger set client logger

func WithClientMaxBackendMaxIdleDuration added in v0.6.0

func WithClientMaxBackendMaxIdleDuration(value time.Duration) ClientOption

WithClientMaxBackendMaxIdleDuration sets the maximum idle duration of a backend connection. Backend connections that stay idle longer than this are closed automatically. 0 means no idle time limit.

func WithClientMaxBackendPerHost

func WithClientMaxBackendPerHost(maxBackendsPerHost int) ClientOption

WithClientMaxBackendPerHost maximum number of connections per host

type ClientSession

type ClientSession interface {
	// Close close the client session
	Close() error
	// SessionCtx get the session context, if session is closed the context will be canceled.
	SessionCtx() context.Context
	// Write writing the response message to the client.
	Write(ctx context.Context, response Message) error
	// AsyncWrite only put message into write queue, and return immediately.
	AsyncWrite(response Message) error
	// CreateCache create a message cache using cache ID. Cache will removed if
	// context is done.
	CreateCache(ctx context.Context, cacheID uint64) (MessageCache, error)
	// DeleteCache delete cache using the spec cacheID
	DeleteCache(cacheID uint64)
	// GetCache returns the message cache
	GetCache(cacheID uint64) (MessageCache, error)
	// RemoteAddress returns remote address, include ip and port
	RemoteAddress() string
}

ClientSession client session, which is used to send the response message. Note that it is not thread-safe.

type Codec

type Codec interface {
	codec.Codec
	// Valid checks whether the message is valid
	Valid(message Message) error
	// AddHeaderCodec add header codec. The HeaderCodecs are added sequentially and the headercodecs are
	// executed in the order in which they are added at codec time.
	AddHeaderCodec(HeaderCodec)
}

Codec codec

func NewMessageCodec

func NewMessageCodec(messageFactory func() Message, options ...CodecOption) Codec

NewMessageCodec creates a message codec. The message encoding format consists of a message header and a message body. Format:

  1. Size, 4 bytes, required. Includes the header and the body.
  2. Message header
     2.1. Flag, 1 byte, required.
     2.2. Checksum, 8 bytes, optional. Set if the checksum flag is set.
     2.3. PayloadSize, 4 bytes, optional. Set if the message is a morpc.PayloadMessage.
     2.4. Streaming sequence, 4 bytes, optional. Set if the message belongs to a stream.
     2.5. Custom headers, optional. Set if custom header codecs are registered.
  3. Message body
     3.1. Message body, required.
     3.2. Payload, optional. Set if the payload flag is set.
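The layout above can be sketched as a small encoder and decoder. This is only an illustration under stated assumptions: the flag bit values, the big-endian byte order, and the choice that Size does not count its own 4 bytes are all guesses, and the checksum and custom headers are left out.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Hypothetical flag bits; the values morpc really uses are not listed here.
const (
	flagPayload   byte = 1 << 0
	flagStreaming byte = 1 << 1
)

type frame struct {
	streaming bool
	sequence  uint32
	body      []byte
	payload   []byte
}

// encode writes size, flag, optional header fields, body, and payload.
func encode(f frame) []byte {
	var flag byte
	n := 1 + len(f.body) + len(f.payload) // flag + body + payload
	if len(f.payload) > 0 {
		flag |= flagPayload
		n += 4
	}
	if f.streaming {
		flag |= flagStreaming
		n += 4
	}
	out := make([]byte, 0, 4+n)
	out = binary.BigEndian.AppendUint32(out, uint32(n)) // assumed byte order
	out = append(out, flag)
	if flag&flagPayload != 0 {
		out = binary.BigEndian.AppendUint32(out, uint32(len(f.payload)))
	}
	if flag&flagStreaming != 0 {
		out = binary.BigEndian.AppendUint32(out, f.sequence)
	}
	out = append(out, f.body...)
	return append(out, f.payload...)
}

// decode reverses encode; body and payload alias the input slice.
func decode(data []byte) (frame, error) {
	var f frame
	if len(data) < 5 || int(binary.BigEndian.Uint32(data)) != len(data)-4 {
		return f, errors.New("malformed frame size")
	}
	flag, rest := data[4], data[5:]
	payloadSize := 0
	if flag&flagPayload != 0 {
		if len(rest) < 4 {
			return f, errors.New("missing payload size")
		}
		payloadSize = int(binary.BigEndian.Uint32(rest))
		rest = rest[4:]
	}
	if flag&flagStreaming != 0 {
		if len(rest) < 4 {
			return f, errors.New("missing streaming sequence")
		}
		f.streaming, f.sequence = true, binary.BigEndian.Uint32(rest)
		rest = rest[4:]
	}
	if payloadSize > len(rest) {
		return f, errors.New("payload size exceeds frame")
	}
	f.body = rest[:len(rest)-payloadSize]
	f.payload = rest[len(rest)-payloadSize:]
	return f, nil
}

func main() {
	data := encode(frame{streaming: true, sequence: 7, body: []byte("hello"), payload: []byte("xyz")})
	f, err := decode(data)
	fmt.Println(len(data), string(f.body), string(f.payload), f.sequence, err)
}
```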

type CodecOption added in v0.6.0

type CodecOption func(*messageCodec)

CodecOption codec options

func WithCodecEnableChecksum added in v0.6.0

func WithCodecEnableChecksum() CodecOption

WithCodecEnableChecksum enable checksum

func WithCodecEnableCompress added in v0.7.0

func WithCodecEnableCompress(pool *mpool.MPool) CodecOption

WithCodecEnableCompress enable compress body and payload

func WithCodecIntegrationHLC added in v0.6.0

func WithCodecIntegrationHLC(clock clock.Clock) CodecOption

WithCodecIntegrationHLC integrates HLC into the codec.

func WithCodecMaxBodySize added in v0.6.0

func WithCodecMaxBodySize(size int) CodecOption

WithCodecMaxBodySize set rpc max body size

func WithCodecPayloadCopyBufferSize added in v0.6.0

func WithCodecPayloadCopyBufferSize(value int) CodecOption

WithCodecPayloadCopyBufferSize set payload copy buffer size, if is a PayloadMessage

type Config added in v0.8.0

type Config struct {
	// MaxConnections maximum number of connections to communicate with each DNStore.
	// Default is 400.
	MaxConnections int `toml:"max-connections"`
	// MaxIdleDuration maximum connection idle time, connection will be closed automatically
	// if this value is exceeded. Default is 1 min.
	MaxIdleDuration toml.Duration `toml:"max-idle-duration"`
	// SendQueueSize maximum capacity of the send request queue per connection, when the
	// queue is full, the send request will be blocked. Default is 10240.
	SendQueueSize int `toml:"send-queue-size"`
	// BusyQueueSize when the length of the send queue reaches the currently set value, the
	// current connection is busy with high load. When any connection with Busy status exists,
	// a new connection will be created until the value set by MaxConnections is reached.
	// Default is 3/4 of SendQueueSize.
	BusyQueueSize int `toml:"busy-queue-size"`
	// WriteBufferSize buffer size for write messages per connection. Default is 1kb
	WriteBufferSize toml.ByteSize `toml:"write-buffer-size"`
	// ReadBufferSize buffer size for read messages per connection. Default is 1kb
	ReadBufferSize toml.ByteSize `toml:"read-buffer-size"`
	// MaxMessageSize max message size for rpc. Default is 100M
	MaxMessageSize toml.ByteSize `toml:"max-message-size"`
	// PayloadCopyBufferSize buffer size for copy payload to socket. Default is 16kb
	PayloadCopyBufferSize toml.ByteSize `toml:"payload-copy-buffer-size"`
	// EnableCompress enable compress message
	EnableCompress bool `toml:"enable-compress"`

	// ServerWorkers number of server workers for handle requests
	ServerWorkers int `toml:"server-workers"`
	// ServerBufferQueueSize queue size for server buffer requests
	ServerBufferQueueSize int `toml:"server-buffer-queue-size"`

	// BackendOptions extra backend options
	BackendOptions []BackendOption `toml:"-"`
	// ClientOptions extra client options
	ClientOptions []ClientOption `toml:"-"`
	// CodecOptions extra codec options
	CodecOptions []CodecOption `toml:"-"`
}

Config rpc client config

func (*Config) Adjust added in v0.8.0

func (c *Config) Adjust()

Adjust adjust config, fill default value

func (Config) NewClient added in v0.8.0

func (c Config) NewClient(
	name string,
	logger *zap.Logger,
	responseFactory func() Message) (RPCClient, error)

NewClient create client from config

func (Config) NewServer added in v0.8.0

func (c Config) NewServer(
	name string,
	address string,
	logger *zap.Logger,
	requestFactory func() Message,
	responseReleaseFunc func(Message),
	opts ...ServerOption) (RPCServer, error)

NewServer new rpc server

type Future

type Future struct {
	// contains filtered or unexported fields
}

Future is used to obtain response data synchronously.

func (*Future) Close

func (f *Future) Close()

Close closes the future.

func (*Future) Get

func (f *Future) Get() (Message, error)

Get get the response data synchronously, blocking until `context.Done` or the response is received. This method cannot be called more than once. After calling `Get`, `Close` must be called to close `Future`.

type HandleFunc added in v0.8.0

type HandleFunc[REQ, RESP MethodBasedMessage] func(context.Context, REQ, RESP) error

HandleFunc request handle func

type HandlerOption added in v0.8.0

type HandlerOption[REQ, RESP MethodBasedMessage] func(*handler[REQ, RESP])

HandlerOption message handler option

func WithHandleMessageFilter added in v0.8.0

func WithHandleMessageFilter[REQ, RESP MethodBasedMessage](filter func(REQ) bool) HandlerOption[REQ, RESP]

WithHandleMessageFilter set filter func. Requests can be modified or filtered out by the filter before they are processed by the handler.

func WithHandlerRespReleaseFunc added in v1.2.0

func WithHandlerRespReleaseFunc[REQ, RESP MethodBasedMessage](f func(message Message)) HandlerOption[REQ, RESP]

WithHandlerRespReleaseFunc sets the respReleaseFunc of the handler.

type HeaderCodec added in v0.6.0

type HeaderCodec interface {
	// Encode encode header into output buffer
	Encode(*RPCMessage, *buf.ByteBuf) (int, error)
	// Decode decode header from input buffer
	Decode(*RPCMessage, []byte) (int, error)
}

HeaderCodec encode and decode header

type Message

type Message interface {
	// SetID each message has a unique ID in a RPCClient Backend. If it is a message transmitted
	// in stream, the ID must be set to Stream.ID.
	SetID(uint64)
	// GetID returns ID of the message
	GetID() uint64
	// DebugString return debug string
	DebugString() string
	// Size size of message after marshal
	Size() int
	// MarshalTo marshal to target byte slice
	MarshalTo(data []byte) (int, error)
	// Unmarshal unmarshal from data
	Unmarshal(data []byte) error
}

Message morpc is not a normal remote method call, rather it is a message-based asynchronous driven framework.

type MessageCache added in v0.8.0

type MessageCache interface {
	// Add FIFO add message to cache
	Add(value Message) error
	// Len returns message count in the cache
	Len() (int, error)
	// Pop pop the first message in the cache, return false means no message in cache
	Pop() (Message, bool, error)
	// Close close the cache
	Close()
}

MessageCache the client uses stream to send messages to the server, and when the server thinks it has not received enough messages, it can cache the messages sent by the client.
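A minimal FIFO cache with the same method set might look like the sketch below. `fifoCache` and `errCacheClosed` are hypothetical names, and strings stand in for Message values; the real implementation may behave differently after Close.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errCacheClosed is a hypothetical error for operations on a closed cache.
var errCacheClosed = errors.New("cache closed")

// fifoCache is a hypothetical in-memory cache with the MessageCache shape.
type fifoCache struct {
	mu     sync.Mutex
	closed bool
	queue  []string
}

// Add appends a message to the tail of the queue.
func (c *fifoCache) Add(v string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return errCacheClosed
	}
	c.queue = append(c.queue, v)
	return nil
}

// Len returns the number of cached messages.
func (c *fifoCache) Len() (int, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return 0, errCacheClosed
	}
	return len(c.queue), nil
}

// Pop removes and returns the oldest message; false means the cache is empty.
func (c *fifoCache) Pop() (string, bool, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return "", false, errCacheClosed
	}
	if len(c.queue) == 0 {
		return "", false, nil
	}
	v := c.queue[0]
	c.queue = c.queue[1:]
	return v, true, nil
}

// Close drops all cached messages and rejects further use.
func (c *fifoCache) Close() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.closed, c.queue = true, nil
}

func main() {
	c := &fifoCache{}
	_ = c.Add("first")
	_ = c.Add("second")
	fmt.Println(c.Pop())
}
```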

type MessageHandler added in v0.8.0

type MessageHandler[REQ, RESP MethodBasedMessage] interface {
	// Start start the txn server
	Start() error
	// Close the txn server
	Close() error
	// RegisterHandleFunc register request handler func
	RegisterHandleFunc(method uint32, handleFunc HandleFunc[REQ, RESP], async bool) MessageHandler[REQ, RESP]
	// Handle handle at local
	Handle(ctx context.Context, req REQ) RESP
}

MessageHandler receives and handle requests from client.

func NewMessageHandler added in v0.8.0

func NewMessageHandler[REQ, RESP MethodBasedMessage](
	name string,
	address string,
	cfg Config,
	pool MessagePool[REQ, RESP],
	opts ...HandlerOption[REQ, RESP]) (MessageHandler[REQ, RESP], error)

NewMessageHandler create a message handler.

type MessagePool added in v0.8.0

type MessagePool[REQ, RESP MethodBasedMessage] interface {
	AcquireRequest() REQ
	ReleaseRequest(REQ)

	AcquireResponse() RESP
	ReleaseResponse(RESP)
}

MessagePool is used to reuse requests and responses to avoid allocations.
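One plausible way to build such a pool is on top of `sync.Pool`. The sketch below assumes that approach; it is not confirmed to be what `NewMessagePool` does, and the `request` and `response` types are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// request and response are hypothetical message types.
type request struct{ Method uint32 }
type response struct{ Body string }

// pool is a sketch with the MessagePool shape, backed by two sync.Pools.
type pool[REQ, RESP any] struct {
	requests  sync.Pool
	responses sync.Pool
}

func newPool[REQ, RESP any](newReq func() REQ, newResp func() RESP) *pool[REQ, RESP] {
	return &pool[REQ, RESP]{
		requests:  sync.Pool{New: func() any { return newReq() }},
		responses: sync.Pool{New: func() any { return newResp() }},
	}
}

func (p *pool[REQ, RESP]) AcquireRequest() REQ    { return p.requests.Get().(REQ) }
func (p *pool[REQ, RESP]) ReleaseRequest(r REQ)   { p.requests.Put(r) }
func (p *pool[REQ, RESP]) AcquireResponse() RESP  { return p.responses.Get().(RESP) }
func (p *pool[REQ, RESP]) ReleaseResponse(r RESP) { p.responses.Put(r) }

func main() {
	p := newPool(
		func() *request { return &request{} },
		func() *response { return &response{} },
	)
	req := p.AcquireRequest()
	req.Method = 1
	fmt.Println(req.Method)
	*req = request{} // reset before handing the value back for reuse
	p.ReleaseRequest(req)
}
```

Callers are expected to reset a message before releasing it, which is presumably why MethodBasedMessage includes a Reset method.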

func NewMessagePool added in v0.8.0

func NewMessagePool[REQ, RESP MethodBasedMessage](
	requestFactory func() REQ,
	responseFactory func() RESP) MessagePool[REQ, RESP]

NewMessagePool create message pool

type MethodBasedMessage added in v0.8.0

type MethodBasedMessage interface {
	Message
	// Reset reset message
	Reset()
	// Method message type
	Method() uint32
	// SetMethod set message type.
	SetMethod(uint32)
	// WrapError wrap error into message, and transport to remote endpoint.
	WrapError(error)
	// UnwrapError parse error from the message.
	UnwrapError() error
}

MethodBasedMessage defines messages based on Request and Response patterns in RPC. And different processing logic can be implemented according to the Method in Request.

type PayloadMessage

type PayloadMessage interface {
	Message

	// GetPayloadField return the payload data
	GetPayloadField() []byte
	// SetPayloadField set the payload data
	SetPayloadField(data []byte)
}

PayloadMessage is similar to Message, but has a large payload field. To avoid a redundant memory copy, the encoding is msgTotalSize(4 bytes) + flag(1 byte) + messageWithoutPayloadSize(4 bytes) + messageWithoutPayload + payload. All fields except the payload are written to the per-connection buffer before being written to the socket. The payload, being a []byte, can be written directly to the socket, which avoids an extra copy through the buffer.

Note: When decoding, all of the socket data is read into the buffer. The payload is not copied out of the buffer; instead, SetPayloadField is called with a slice that points directly into the buffer. This is unsafe and must be used very carefully: once message processing finishes and control returns to the RPC framework, this data must no longer be held.
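The hazard comes from ordinary Go slice aliasing. In the sketch below, `aliasDemo` is a hypothetical helper that simulates the framework reusing its read buffer; it shows why a handler that needs the payload later should copy it first.

```go
package main

import "fmt"

// aliasDemo returns what an aliased payload and a copied payload contain
// after the read buffer has been reused for another message.
func aliasDemo() (aliased, copied string) {
	buf := []byte("hdrPAYLOAD")
	payload := buf[3:] // shares memory with buf, as with SetPayloadField

	// Copy the bytes if they must outlive the handler.
	kept := append([]byte(nil), payload...)

	// Simulate the framework reusing the buffer for the next message.
	copy(buf, "hdrGARBAGE")

	return string(payload), string(kept)
}

func main() {
	aliased, copied := aliasDemo()
	fmt.Println(aliased) // contents changed underneath the slice
	fmt.Println(copied)  // the copy is unaffected
}
```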

type RPCClient

type RPCClient interface {
	// Send send a request message to the corresponding server and return a Future to get the
	// response message.
	Send(ctx context.Context, backend string, request Message) (*Future, error)
	// NewStream creates a stream for sending and receiving messages asynchronously.
	// If the underlying connection is reset during the lifetime of the stream, the stream
	// is closed.
	NewStream(backend string, lock bool) (Stream, error)
	// Ping is used to check if the remote service is available. The remote service will reply with
	// a pong when it receives the ping.
	Ping(ctx context.Context, backend string) error
	// Close close the client
	Close() error

	CloseBackend() error
}

RPCClient morpc is not a normal remote method call, rather it is a message-based asynchronous driven framework. Each message has a unique ID, and the response to this message must have the same ID.

func NewClient

func NewClient(
	name string,
	factory BackendFactory,
	options ...ClientOption) (RPCClient, error)

NewClient create rpc client with options

type RPCMessage added in v0.6.0

type RPCMessage struct {
	// Ctx context
	Ctx    context.Context
	Cancel context.CancelFunc
	// Message raw rpc message
	Message Message
	// contains filtered or unexported fields
}

RPCMessage any message sent via morpc needs to have a Context set, which is transmitted across the network. So messages sent and received at the network level are RPCMessage.

func (RPCMessage) GetTimeoutFromContext added in v0.6.0

func (m RPCMessage) GetTimeoutFromContext() (time.Duration, error)

GetTimeoutFromContext returns the timeout duration from context.

func (RPCMessage) InternalMessage added in v0.7.0

func (m RPCMessage) InternalMessage() bool

InternalMessage reports whether the RPC message is an internal morpc message.

func (RPCMessage) Timeout added in v0.6.0

func (m RPCMessage) Timeout() bool

Timeout reports whether the message has timed out.

type RPCServer

type RPCServer interface {
	// Start starts listening and waits for client messages. After each client link is established,
	// a separate goroutine is assigned to handle reads, and each message that is read is handed
	// over to the Handler for processing.
	Start() error
	// Close close the rpc server
	Close() error
	// RegisterRequestHandler register the request handler. The request handler is processed in the
	// read goroutine of the current client connection. Sequence is the sequence of message received
	// by the current client connection. If the handler returns an error, the client connection is closed.
	// Handler can use the ClientSession to write response, both synchronous and asynchronous.
	RegisterRequestHandler(func(ctx context.Context, request RPCMessage, sequence uint64, cs ClientSession) error)
}

RPCServer RPC server implementation corresponding to RPCClient.

func NewRPCServer

func NewRPCServer(
	name, address string,
	codec Codec,
	options ...ServerOption) (RPCServer, error)

NewRPCServer create rpc server with options. After the rpc server starts, one link corresponds to two goroutines, one read and one write. All messages to be written are first written to a buffer chan and sent to the client by the write goroutine.

type ServerOption

type ServerOption func(*server)

ServerOption server options for create rpc server

func WithServerBatchSendSize

func WithServerBatchSendSize(size int) ServerOption

WithServerBatchSendSize set the maximum number of messages to be sent together at each batch. Default is 8.

func WithServerDisableAutoCancelContext added in v0.6.0

func WithServerDisableAutoCancelContext() ServerOption

WithServerDisableAutoCancelContext disables automatic cancellation of the message context. The server receives RPC messages from the client, and each message comes with a Context. morpc calls the handler to process the message and, by default, cancels the Context when the handler returns. In some scenarios the handler is asynchronous, so morpc must not cancel the Context right after the handler returns; otherwise the asynchronous work would observe a canceled context.

func WithServerGoettyOptions

func WithServerGoettyOptions(options ...goetty.Option) ServerOption

WithServerGoettyOptions sets the goetty options for the server.

func WithServerLogger

func WithServerLogger(logger *zap.Logger) ServerOption

WithServerLogger set rpc server logger

func WithServerSessionBufferSize

func WithServerSessionBufferSize(size int) ServerOption

WithServerSessionBufferSize set the buffer size of the write response chan. Default is 16.

func WithServerWriteFilter

func WithServerWriteFilter(filter func(Message) bool) ServerOption

WithServerWriteFilter sets the write filter function. The filter is given each message that is ready to be sent and reports whether the message really needs to be sent.

type Stream

type Stream interface {
	// ID returns the stream ID. All messages transmitted on the current stream need to use the
	// stream ID as the message ID
	ID() uint64
	// Send send message to stream
	Send(ctx context.Context, request Message) error
	// Receive returns a channel to read stream message from server. If nil is received, the receive
	// loop needs to exit. In any case, Stream.Close needs to be called.
	Receive() (chan Message, error)
	// Close close the stream. If closeConn is true, the underlying connection will be closed.
	Close(closeConn bool) error
}

Stream is used to send and receive messages asynchronously.
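The Receive contract, where a nil message tells the loop to exit, can be sketched with a plain channel. The `message` type and `drain` function are hypothetical; a real caller would also call Stream.Close afterwards.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a morpc Message.
type message struct{ body string }

// drain reads from the receive channel until it sees a nil message or the
// channel is closed, and returns the bodies it collected.
func drain(c chan *message) []string {
	var got []string
	for m := range c {
		if m == nil {
			break // nil signals that the receive loop must exit
		}
		got = append(got, m.body)
	}
	return got
}

func main() {
	c := make(chan *message, 3)
	c <- &message{body: "a"}
	c <- &message{body: "b"}
	c <- nil
	fmt.Println(drain(c))
}
```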

Directories

Path Synopsis
examples
Package mock_morpc is a generated GoMock package.
Package mock_morpc is a generated GoMock package.
