morpc

package
Version: v0.5.0
Published: Jul 14, 2022 License: Apache-2.0 Imports: 14 Imported by: 3

README

morpc

morpc is a message communication framework built as a wrapper around Goetty. It supports two types of communication: Request-Response and Stream.

Components

morpc consists of RPCClient and RPCServer. RPCClient is used to send messages and RPCServer is used to process and respond to request messages.

RPCClient

An RPCClient can manage multiple underlying TCP connections, each of which is called a Backend. Each Backend starts two goroutines to handle IO reads and writes. A server address can correspond to more than one Backend, with load balanced across them in round-robin fashion.
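Round-robin selection over the Backends of one server address can be sketched as follows. This is a minimal illustration using only the standard library; `backendPool` and `pick` are hypothetical names, not part of morpc, and the real client may choose backends differently (for example by skipping busy ones).

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// backendPool is a hypothetical stand-in for the Backends that a client
// keeps for a single server address.
type backendPool struct {
	names []string
	next  uint64 // advanced atomically so pick is safe for concurrent use
}

// pick returns the next backend in round-robin order.
func (p *backendPool) pick() string {
	n := atomic.AddUint64(&p.next, 1) - 1
	return p.names[n%uint64(len(p.names))]
}

func main() {
	p := &backendPool{names: []string{"conn-0", "conn-1", "conn-2"}}
	for i := 0; i < 4; i++ {
		fmt.Println(p.pick()) // conn-0, conn-1, conn-2, then conn-0 again
	}
}
```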

RPCServer

RPCServer can listen on a TCP address or a Unix socket. After a client connects, the RPCServer allocates two goroutines to handle the IO reads and writes. When the RPCServer is started, a message-processing Handler is set; it is invoked whenever a message is received from a client, and the specific message-processing logic must be implemented in the Handler.

Examples

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Backend

type Backend interface {
	// Send sends the request to the corresponding backend and returns a Future for the response.
	// moerr.ErrBackendClosed is returned if the backend is closed.
	Send(ctx context.Context, request Message, opts SendOptions) (*Future, error)
	// NewStream creates a stream for sending and receiving messages asynchronously.
	// If the underlying connection is reset during the lifetime of the stream, the stream is
	// closed.
	NewStream(receiveChanBuffer int) (Stream, error)
	// Close closes the backend.
	Close()
	// Busy reports whether the backend is busy. A backend receives many requests concurrently
	// during operation; once the number of requests waiting to be sent reaches a threshold,
	// the backend is considered busy.
	Busy() bool
}

Backend represents a wrapper for a client communicating with a remote server.

func NewRemoteBackend

func NewRemoteBackend(
	remote string,
	codec Codec,
	options ...BackendOption) (Backend, error)

NewRemoteBackend creates a backend based on a goetty connection. The backend starts two goroutines, one for reads and one for writes. If a network error occurs on the underlying goetty connection, the backend retries automatically until the Future times out.

type BackendFactory

type BackendFactory interface {
	// Create creates the corresponding backend for the given address.
	Create(address string) (Backend, error)
}

BackendFactory is a factory that creates Backends.

func NewGoettyBasedBackendFactory

func NewGoettyBasedBackendFactory(codec Codec, options ...BackendOption) BackendFactory

type BackendOption

type BackendOption func(*remoteBackend)

BackendOption is an option for creating a remote backend.

func WithBackendBatchSendSize

func WithBackendBatchSendSize(size int) BackendOption

WithBackendBatchSendSize sets the maximum number of messages sent together in each batch. Default is 8.
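One way a write goroutine might collect a batch is sketched below: block for the first pending message, then take whatever else is already queued, up to the batch size. `fetchBatch` is a hypothetical helper and the string messages are placeholders; the actual batching logic in morpc is not shown in this documentation.

```go
package main

import "fmt"

// fetchBatch blocks for the first pending message, then drains messages that
// are immediately available until the batch holds batchSize messages.
func fetchBatch(writeC <-chan string, batchSize int) []string {
	first, ok := <-writeC
	if !ok {
		return nil // channel closed and empty
	}
	batch := []string{first}
	for len(batch) < batchSize {
		select {
		case msg, ok := <-writeC:
			if !ok {
				return batch
			}
			batch = append(batch, msg)
		default:
			return batch // nothing else is waiting right now
		}
	}
	return batch
}

func main() {
	writeC := make(chan string, 16)
	for i := 0; i < 10; i++ {
		writeC <- fmt.Sprintf("msg-%d", i)
	}
	fmt.Println(len(fetchBatch(writeC, 8))) // 8: capped by the batch size
	fmt.Println(len(fetchBatch(writeC, 8))) // 2: the remaining messages
}
```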

func WithBackendBufferSize

func WithBackendBufferSize(size int) BackendOption

WithBackendBufferSize sets the buffer size of the channel that holds messages waiting to be sent. Default is 1024.

func WithBackendBusyBufferSize

func WithBackendBusyBufferSize(size int) BackendOption

WithBackendBusyBufferSize sets the busy threshold: the backend is busy if len(writeC) >= size. Default is 3/4 of the buffer size.
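The busy check can be sketched as a comparison between the number of queued items and the threshold. `sendQueue` and `newSendQueue` are hypothetical names; treating a zero `busySize` as "use the default" is an assumption of this sketch, not documented behavior.

```go
package main

import "fmt"

// sendQueue is a hypothetical stand-in for a backend's pending-send channel.
type sendQueue struct {
	writeC   chan int
	busySize int
}

// newSendQueue applies the documented default of 3/4 of the buffer size.
// Assumption: a busySize of 0 means "not set".
func newSendQueue(bufferSize, busySize int) *sendQueue {
	if busySize == 0 {
		busySize = bufferSize * 3 / 4
	}
	return &sendQueue{writeC: make(chan int, bufferSize), busySize: busySize}
}

// busy reports whether the number of queued items has reached the threshold.
func (q *sendQueue) busy() bool { return len(q.writeC) >= q.busySize }

func main() {
	q := newSendQueue(8, 0) // threshold becomes 6
	for i := 0; i < 5; i++ {
		q.writeC <- i
	}
	fmt.Println(q.busy()) // false: 5 queued, below the threshold
	q.writeC <- 5
	fmt.Println(q.busy()) // true: 6 queued, at the threshold
}
```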

func WithBackendConnectTimeout

func WithBackendConnectTimeout(timeout time.Duration) BackendOption

WithBackendConnectTimeout sets the timeout for connecting to the remote. Default is 10s.

func WithBackendConnectWhenCreate

func WithBackendConnectWhenCreate() BackendOption

WithBackendConnectWhenCreate establishes the goetty connection when the backend is created.

func WithBackendFilter

func WithBackendFilter(filter func([]*Future) []*Future) BackendOption

WithBackendFilter sets the send filter func. Its input is the futures that are ready to send; its output is the futures that actually need to be sent.

func WithBackendGoettyOptions

func WithBackendGoettyOptions(options ...goetty.Option) BackendOption

WithBackendGoettyOptions sets the goetty connection options, e.g. the read/write buffer size or net.Conn attributes.

func WithBackendLogger

func WithBackendLogger(logger *zap.Logger) BackendOption

WithBackendLogger sets the backend logger.

type ClientOption

type ClientOption func(*client)

ClientOption is an option for creating a client.

func WithClientCreateTaskChanSize

func WithClientCreateTaskChanSize(size int) ClientOption

WithClientCreateTaskChanSize sets the buffer size of the channel used by the task that creates Backends.

func WithClientDisableCreateTask

func WithClientDisableCreateTask() ClientOption

WithClientDisableCreateTask disables the task that creates backends. The client has a task that creates backends asynchronously: when the client finds that there are not enough backends and that existing backends are busy, it automatically creates backends until maxBackendsPerHost is reached.

func WithClientInitBackends

func WithClientInitBackends(backends []string, counts []int) ClientOption

WithClientInitBackends sets the number of connections to create for each of the given backend addresses when the client is initialized.

func WithClientLogger

func WithClientLogger(logger *zap.Logger) ClientOption

WithClientLogger sets the client logger.

func WithClientMaxBackendPerHost

func WithClientMaxBackendPerHost(maxBackendsPerHost int) ClientOption

WithClientMaxBackendPerHost sets the maximum number of connections per host.

type ClientSession

type ClientSession interface {
	// Write writes the response message to the client.
	Write(response Message, opts SendOptions) error
}

ClientSession is a client session, which is used to send the response message. Note that it is not thread-safe.

type Codec

type Codec interface {
	codec.Encoder
	codec.Decoder
}

Codec encodes and decodes messages.

func NewMessageCodec

func NewMessageCodec(messageFactory func() Message, payloadCopyBufSize int) Codec

NewMessageCodec creates a message codec. If the message is a PayloadMessage, payloadCopyBufSize determines how much data is copied from the payload to the socket each time.

type Future

type Future struct {
	// contains filtered or unexported fields
}

Future is used to obtain response data synchronously.

func (*Future) Close

func (f *Future) Close()

Close closes the future.

func (*Future) Get

func (f *Future) Get() (Message, error)

Get returns the response data synchronously, blocking until `context.Done` or until the response is received. It must not be called more than once. After calling `Get`, `Close` must be called to close the `Future`.

type Message

type Message interface {
	// SetID sets the message ID. Each message has a unique ID within an RPCClient Backend. A message
	// transmitted in a stream must use Stream.ID as its ID.
	SetID(uint64)
	// GetID returns the ID of the message.
	GetID() uint64
	// DebugString returns a debug string.
	DebugString() string
	// Size returns the size of the message after marshaling.
	Size() int
	// MarshalTo marshals the message into the target byte slice.
	MarshalTo(data []byte) (int, error)
	// Unmarshal unmarshals the message from data.
	Unmarshal(data []byte) error
}

Message is the unit of communication in morpc. morpc is not a conventional remote method call framework; rather, it is a message-based, asynchronous framework.
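A type satisfying this interface could look like the following sketch. The interface is copied locally so the example compiles without importing morpc; `echoMessage` and its wire layout (an 8-byte big-endian ID followed by text) are hypothetical choices made for illustration.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Message mirrors the interface shown above so the sketch is self-contained.
type Message interface {
	SetID(uint64)
	GetID() uint64
	DebugString() string
	Size() int
	MarshalTo(data []byte) (int, error)
	Unmarshal(data []byte) error
}

// echoMessage is a hypothetical message: an 8-byte ID followed by a text body.
type echoMessage struct {
	id   uint64
	text string
}

func (m *echoMessage) SetID(id uint64)     { m.id = id }
func (m *echoMessage) GetID() uint64       { return m.id }
func (m *echoMessage) DebugString() string { return fmt.Sprintf("%d:%s", m.id, m.text) }
func (m *echoMessage) Size() int           { return 8 + len(m.text) }

// MarshalTo writes the ID and the text into data and returns the bytes written.
func (m *echoMessage) MarshalTo(data []byte) (int, error) {
	if len(data) < m.Size() {
		return 0, errors.New("buffer too small")
	}
	binary.BigEndian.PutUint64(data, m.id)
	copy(data[8:], m.text)
	return m.Size(), nil
}

// Unmarshal reads the ID and copies the text out of data.
func (m *echoMessage) Unmarshal(data []byte) error {
	if len(data) < 8 {
		return errors.New("short message")
	}
	m.id = binary.BigEndian.Uint64(data)
	m.text = string(data[8:])
	return nil
}

func main() {
	var req Message = &echoMessage{text: "ping"}
	req.SetID(7)
	buf := make([]byte, req.Size())
	if _, err := req.MarshalTo(buf); err != nil {
		panic(err)
	}
	var resp Message = &echoMessage{}
	if err := resp.Unmarshal(buf); err != nil {
		panic(err)
	}
	fmt.Println(resp.DebugString()) // 7:ping
}
```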

type PayloadMessage

type PayloadMessage interface {
	Message

	// GetPayloadField returns the payload data.
	GetPayloadField() []byte
	// SetPayloadField sets the payload data.
	SetPayloadField(data []byte)
}

PayloadMessage is similar to Message, but has a large payload field. To avoid redundant memory copies, the encoding is msgTotalSize(4 bytes) + flag(1 byte) + messageWithoutPayloadSize(4 bytes) + messageWithoutPayload + payload. All fields except payload are written to the buffer of each link before being written to the socket. The payload, being a []byte, can be written directly to the socket, avoiding an extra copy through the buffer.

Note: When decoding, all the socket data is read into the buffer. The payload data is not copied out of the buffer; instead, SetPayloadField is called directly with a slice of the buffer data. This is not safe and must be used very carefully: once message processing returns control to the rpc framework, the data must no longer be held.

type RPCClient

type RPCClient interface {
	// Send sends a request message to the corresponding server and returns a Future for getting
	// the response message.
	Send(ctx context.Context, backend string, request Message, opts SendOptions) (*Future, error)
	// NewStream creates a stream for sending and receiving messages asynchronously.
	// If the underlying connection is reset during the lifetime of the stream, the stream is
	// closed.
	NewStream(backend string, receiveChanBuffer int) (Stream, error)
	// Close closes the client.
	Close() error
}

RPCClient sends messages to servers. morpc is not a conventional remote method call framework; rather, it is a message-based, asynchronous framework. Each message has a unique ID, and the response to a message must have the same ID.

func NewClient

func NewClient(factory BackendFactory, options ...ClientOption) (RPCClient, error)

NewClient creates an rpc client with the given options.

type RPCServer

type RPCServer interface {
	// Start starts listening and waits for client messages. After each client link is established,
	// a separate goroutine is assigned to handle reads, and each message read is handed over
	// to the Handler for processing.
	Start() error
	// Close closes the rpc server.
	Close() error
	// RegisterRequestHandler registers the request handler. The request handler runs in the
	// read goroutine of the current client connection. Sequence is the sequence number of the
	// message received by the current client connection. If the handler returns an error, the
	// client connection is closed. The handler can use the ClientSession to write the response,
	// either synchronously or asynchronously.
	RegisterRequestHandler(func(request Message, sequence uint64, cs ClientSession) error)
}

RPCServer is the RPC server implementation corresponding to RPCClient.

func NewRPCServer

func NewRPCServer(name, address string, codec Codec, options ...ServerOption) (RPCServer, error)

NewRPCServer creates an rpc server with the given options. After the rpc server starts, each link corresponds to two goroutines, one for reads and one for writes. All messages to be written are first placed in a buffered chan and then sent to the client by the write goroutine.

type SendOptions

type SendOptions struct {
	// Timeout sets the write deadline for the backend connection.
	Timeout time.Duration
	// Arg send arg
	Arg any
}

SendOptions holds the options for sending a message.

type ServerOption

type ServerOption func(*server)

ServerOption is an option for creating an rpc server.

func WithServerBatchSendSize

func WithServerBatchSendSize(size int) ServerOption

WithServerBatchSendSize sets the maximum number of messages sent together in each batch. Default is 8.

func WithServerGoettyOptions

func WithServerGoettyOptions(options ...goetty.Option) ServerOption

WithServerGoettyOptions sets the goetty options for the server.

func WithServerLogger

func WithServerLogger(logger *zap.Logger) ServerOption

WithServerLogger sets the rpc server logger.

func WithServerSessionBufferSize

func WithServerSessionBufferSize(size int) ServerOption

WithServerSessionBufferSize sets the buffer size of the chan that holds responses waiting to be written. Default is 16.

func WithServerWriteFilter

func WithServerWriteFilter(filter func(Message) bool) ServerOption

WithServerWriteFilter sets the write filter func. The filter receives each Message that is ready to send and reports whether it actually needs to be sent.

type Stream

type Stream interface {
	// ID returns the stream ID. All messages transmitted on the current stream must use the
	// stream ID as the message ID.
	ID() uint64
	// Send sends a message to the stream.
	Send(request Message, opts SendOptions) error
	// Receive returns a channel for reading messages from the server. The channel is closed after
	// the stream is closed.
	Receive() (chan Message, error)
	// Close closes the stream.
	Close() error
}

Stream is used to send and receive a stream of messages asynchronously.
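The stream contract (every message carries the stream ID, and the receive channel is closed when the stream closes) can be illustrated with an in-memory loopback. `loopbackStream` and `message` are hypothetical types; rejecting a mismatched ID in `Send` is a choice made by this sketch, and the documentation does not say whether morpc enforces it.

```go
package main

import "fmt"

// message is a hypothetical message with an ID and a text body.
type message struct {
	id   uint64
	text string
}

// loopbackStream is a hypothetical in-memory stream that echoes every sent
// message back on its receive channel.
type loopbackStream struct {
	id uint64
	c  chan message
}

// Send rejects messages whose ID is not the stream ID (sketch-only behavior).
func (s *loopbackStream) Send(m message) error {
	if m.id != s.id {
		return fmt.Errorf("message ID %d does not match stream ID %d", m.id, s.id)
	}
	s.c <- m
	return nil
}

func (s *loopbackStream) Receive() chan message { return s.c }

// Close closes the receive channel, which ends any range loop over it.
func (s *loopbackStream) Close() { close(s.c) }

func main() {
	s := &loopbackStream{id: 3, c: make(chan message, 4)}
	_ = s.Send(message{id: 3, text: "a"})
	_ = s.Send(message{id: 3, text: "b"})
	fmt.Println(s.Send(message{id: 9, text: "wrong id"}))
	s.Close()
	for m := range s.Receive() { // the loop ends because the channel is closed
		fmt.Println(m.text)
	}
}
```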

Directories

Path Synopsis
examples
