rpc

package
v0.0.0-...-607911e Latest
Warning

This package is not in the latest version of its module.

Published: Dec 9, 2024 License: MIT Imports: 14 Imported by: 4

Documentation


Constants

This section is empty.

Variables

This section is empty.

Functions

func ReceiveNotify

func ReceiveNotify[T any](ctx context.Context, resp *Response, ch chan T) error

ReceiveNotify takes a continued response and sends received values to a channel, until an error is returned or the context finishes. In either case, the response and the channel will be closed.
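The receive-until-done loop described above can be sketched with the standard library alone. This is a minimal sketch, not the package's implementation: receiveInto, countdown, and collect are hypothetical names, and the recv function stands in for the Receive method of a continued response. The sketch closes only the channel, since it has no response to close.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// receiveInto is a hypothetical stand-in for ReceiveNotify. It calls recv
// repeatedly and forwards each value to ch until recv fails or ctx is done.
// The context is only checked while waiting to send on ch.
func receiveInto[T any](ctx context.Context, recv func(*T) error, ch chan T) error {
	defer close(ch) // the channel is closed however the loop ends
	for {
		var v T
		if err := recv(&v); err != nil {
			return err
		}
		select {
		case ch <- v:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// countdown returns a recv function that yields n, n-1, ..., 1 and then io.EOF.
func countdown(n int) func(*int) error {
	return func(v *int) error {
		if n == 0 {
			return io.EOF
		}
		*v = n
		n--
		return nil
	}
}

// collect runs receiveInto in a goroutine and gathers everything it sends.
func collect(n int) ([]int, error) {
	ch := make(chan int)
	errc := make(chan error, 1)
	go func() { errc <- receiveInto(context.Background(), countdown(n), ch) }()
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got, <-errc
}

func main() {
	got, err := collect(3)
	fmt.Println(got, errors.Is(err, io.EOF))
}
```

Because the channel is closed on every exit path, a consumer can simply range over it and inspect the returned error afterwards.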

Types

type Call

type Call struct {
	CallHeader

	Caller  Caller
	Decoder codec.Decoder
	Context context.Context

	mux.Channel
}

Call is used on the responding side of a call and is passed to the handler. Call has a Caller so it can be used to make calls back to the calling side.

func (*Call) Receive

func (c *Call) Receive(v interface{}) error

Receive decodes an incoming value from the underlying channel. It can be called more than once when multiple values are expected, but should always be called at least once in a handler. It can be called with nil to discard the value.

func (*Call) Selector

func (c *Call) Selector() string

type CallHeader

type CallHeader struct {
	S string // Selector
}

CallHeader is the first value encoded over the channel to make a call.

type Caller

type Caller interface {
	Call(ctx context.Context, selector string, params any, reply ...any) (*Response, error)
}

A Caller is able to perform remote calls.

Call makes a synchronous call to the remote selector, passing params and putting the response value(s) in reply. Both params and reply can be nil. Params can be a channel of any values for asynchronously streaming multiple values from another goroutine; however, the call will still block until a response is sent. If there is an error making the call, an error is returned, and if an error is returned by the remote handler, a RemoteError is returned. Multiple reply parameters can be provided in order to receive multi-valued returns from the remote call.

A Response is also returned for advanced operations. For example, you can check if the call is continued, meaning the underlying channel will be kept open for either streaming back more results or using the channel as a full duplex byte stream.

type Client

type Client struct {
	mux.Session
	// contains filtered or unexported fields
}

Client wraps a session and codec to make RPC calls over the session.

func NewClient

func NewClient(session mux.Session, codec codec.Codec) *Client

NewClient takes a session and codec to make a client for making RPC calls.

func (*Client) Call

func (c *Client) Call(ctx context.Context, selector string, args any, reply ...any) (*Response, error)

Call makes a synchronous call to the remote selector, passing args and putting the reply value(s) in reply. Both args and reply can be nil. Args can be a channel of interface{} values for asynchronously streaming multiple values from another goroutine; however, the call will still block until a response is sent. If there is an error making the call, an error is returned, and if an error is returned by the remote handler, a RemoteError is returned.

A Response value is also returned for advanced operations. For example, you can check if the call is continued, meaning the underlying channel will be kept open for either streaming back more results or using the channel as a full duplex byte stream.

type FrameCodec

type FrameCodec struct {
	codec.Codec
}

FrameCodec is a special codec used to actually read/write other codecs to a transport using a length prefix.

func (*FrameCodec) Decoder

func (c *FrameCodec) Decoder(r io.Reader) codec.Decoder

Decoder returns a frame decoder that first reads a four byte frame length value used to read the rest of the frame, then uses the embedded codec to decode those bytes into a value.

func (*FrameCodec) Encoder

func (c *FrameCodec) Encoder(w io.Writer) codec.Encoder

Encoder returns a frame encoder that first encodes a value to a buffer using the embedded codec, prepends the encoded value byte length as a four byte big endian uint32, then writes to the given Writer.
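The length-prefix framing described for Encoder and Decoder can be illustrated with a small standalone sketch. It assumes JSON as the embedded codec and big endian on the read side as well; writeFrame, readFrame, roundTrip, and frameOf are hypothetical helpers, not part of this package.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
)

// writeFrame encodes v to a buffer, prepends the byte length as a four byte
// big endian uint32, and writes the whole frame to w in one call.
func writeFrame(w io.Writer, v any) error {
	payload, err := json.Marshal(v) // assumption: JSON is the embedded codec
	if err != nil {
		return err
	}
	frame := make([]byte, 4+len(payload))
	binary.BigEndian.PutUint32(frame[:4], uint32(len(payload)))
	copy(frame[4:], payload)
	_, err = w.Write(frame)
	return err
}

// readFrame reads the four byte length, then exactly that many bytes, and
// decodes them into v.
func readFrame(r io.Reader, v any) error {
	var prefix [4]byte
	if _, err := io.ReadFull(r, prefix[:]); err != nil {
		return err
	}
	payload := make([]byte, binary.BigEndian.Uint32(prefix[:]))
	if _, err := io.ReadFull(r, payload); err != nil {
		return err
	}
	return json.Unmarshal(payload, v)
}

// roundTrip writes the values back to back and reads them in the same order.
func roundTrip(values ...string) ([]string, error) {
	var buf bytes.Buffer
	for _, v := range values {
		if err := writeFrame(&buf, v); err != nil {
			return nil, err
		}
	}
	out := make([]string, len(values))
	for i := range out {
		if err := readFrame(&buf, &out[i]); err != nil {
			return nil, err
		}
	}
	return out, nil
}

// frameOf returns the raw bytes of a single frame, or nil on error.
func frameOf(v any) []byte {
	var buf bytes.Buffer
	if err := writeFrame(&buf, v); err != nil {
		return nil
	}
	return buf.Bytes()
}

func main() {
	out, err := roundTrip("hello", "world")
	fmt.Println(out, err)
	fmt.Printf("% x\n", frameOf("hi"))
}
```

The prefix lets the reader know where each value ends without relying on the codec to be self-delimiting, which is why frames can be written back to back on one stream.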

type Handler

type Handler interface {
	RespondRPC(Responder, *Call)
}

A Handler responds to an RPC request.

RespondRPC should use Call to receive at least one input argument value, then use Responder to return a value or continue. Since an input argument value is always sent to the handler, a call to Receive on the Call value should always be made; otherwise the call will block. You can call Receive with nil to discard the input value. If Responder is not used, a default value of nil is returned.

func NotFoundHandler

func NotFoundHandler() Handler

NotFoundHandler returns a simple handler that returns an error "not found".

func ProxyHandler

func ProxyHandler(dst *Client) Handler

ProxyHandler returns a handler that tries its best to proxy the call to the dst Client, regardless of call style and assuming the same encoding.

type HandlerFunc

type HandlerFunc func(Responder, *Call)

The HandlerFunc type is an adapter to allow the use of ordinary functions as RPC handlers. If f is a function with the appropriate signature, HandlerFunc(f) is a Handler that calls f.

func (HandlerFunc) RespondRPC

func (f HandlerFunc) RespondRPC(resp Responder, call *Call)

RespondRPC calls f(resp, call).
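The adapter pattern behind HandlerFunc can be shown in isolation. The sketch below uses simplified stand-in types (call, responder, handler, handlerFunc, recorder are all hypothetical and much smaller than the real Call, Responder, and Handler) so that it runs without this package.

```go
package main

import "fmt"

// call and responder are simplified stand-ins for *Call and Responder.
type call struct{ selector string }

type responder interface {
	Return(...any) error
}

// handler mirrors the shape of the Handler interface.
type handler interface {
	RespondRPC(responder, *call)
}

// handlerFunc adapts an ordinary function to the handler interface.
type handlerFunc func(responder, *call)

// RespondRPC calls f(r, c), which is all the adapter needs to do.
func (f handlerFunc) RespondRPC(r responder, c *call) { f(r, c) }

// recorder is a responder that remembers what was returned.
type recorder struct{ values []any }

func (r *recorder) Return(v ...any) error {
	r.values = append(r.values, v...)
	return nil
}

// echoSelector builds a handler from a plain function literal.
func echoSelector() handler {
	return handlerFunc(func(r responder, c *call) {
		r.Return(c.selector)
	})
}

// invoke runs h against a call with the given selector and reports what it returned.
func invoke(h handler, selector string) []any {
	rec := &recorder{}
	h.RespondRPC(rec, &call{selector: selector})
	return rec.values
}

func main() {
	fmt.Println(invoke(echoSelector(), "echo"))
}
```

The conversion handlerFunc(f) costs nothing at runtime; it only gives the function value a method set that satisfies the interface.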

type RemoteError

type RemoteError string

RemoteError is an error that has been returned from the remote side of the RPC connection.

func (RemoteError) Error

func (e RemoteError) Error() string
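A string-based error type like RemoteError lets callers tell remote failures apart from local ones with errors.As. The following sketch uses a hypothetical remoteError type with the same underlying type; the text its Error method returns is an assumption, and classify, wrap, and errLocal are illustrative names only.

```go
package main

import (
	"errors"
	"fmt"
)

// remoteError is a stand-in with the same underlying type as RemoteError.
type remoteError string

// Error returns the message as-is. Assumption: the real method may format
// the message differently.
func (e remoteError) Error() string { return string(e) }

// errLocal represents a failure that happened on the calling side.
var errLocal = errors.New("connection closed")

// wrap adds context to an error while keeping it discoverable by errors.As.
func wrap(err error) error {
	return fmt.Errorf("call failed: %w", err)
}

// classify reports whether err came from the remote handler or from the
// local side of the call.
func classify(err error) string {
	var re remoteError
	switch {
	case err == nil:
		return "ok"
	case errors.As(err, &re):
		return "remote: " + string(re)
	default:
		return "local: " + err.Error()
	}
}

func main() {
	fmt.Println(classify(nil))
	fmt.Println(classify(wrap(remoteError("not found"))))
	fmt.Println(classify(errLocal))
}
```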

type RespondMux

type RespondMux struct {
	// contains filtered or unexported fields
}

RespondMux is an RPC call multiplexer. It matches the selector of each incoming call against a list of registered selector patterns and calls the handler for the pattern that most closely matches the selector.

RespondMux also takes care of normalizing the selector to a path form "/foo/bar", allowing you to use this or the more conventional RPC dot form "foo.bar".

Patterns match exact incoming selectors, or can end with a "/" or "." to indicate handling any selectors beginning with this pattern. Longer patterns take precedence over shorter ones, so that if there are handlers registered for both "foo." and "foo.bar.", the latter handler will be called for selectors beginning "foo.bar." and the former will receive calls for any other selectors prefixed with "foo.".

Since RespondMux is also a Handler, you can use them for submuxing. If a pattern matches a handler that is a RespondMux, it will trim the matching selector prefix before matching against the sub RespondMux.
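The normalization and longest-pattern rules above can be sketched as follows. This is a simplified illustration under stated assumptions, not the RespondMux implementation: normalize and match are hypothetical functions, patterns are kept in a plain slice, and submuxing is left out.

```go
package main

import (
	"fmt"
	"strings"
)

// normalize converts the dot form "foo.bar" to the path form "/foo/bar".
// A trailing "." becomes a trailing "/", so prefix patterns survive.
func normalize(s string) string {
	s = strings.ReplaceAll(s, ".", "/")
	if !strings.HasPrefix(s, "/") {
		s = "/" + s
	}
	return s
}

// match returns the registered pattern that best fits selector, or "" if
// none applies. A pattern applies if it equals the selector exactly, or if
// it ends with a separator and the selector begins with it. Among the
// applicable patterns the longest one wins.
func match(patterns []string, selector string) string {
	sel := normalize(selector)
	best, bestLen := "", -1
	for _, p := range patterns {
		np := normalize(p)
		exact := np == sel
		prefix := strings.HasSuffix(np, "/") && strings.HasPrefix(sel, np)
		if (exact || prefix) && len(np) > bestLen {
			best, bestLen = p, len(np)
		}
	}
	return best
}

func main() {
	patterns := []string{"foo.", "foo.bar.", "ping"}
	for _, sel := range []string{"foo.bar.baz", "foo.qux", "/ping", "other"} {
		fmt.Printf("%q -> %q\n", sel, match(patterns, sel))
	}
}
```

Normalizing both the pattern and the selector before comparing is what allows "foo.bar" and "/foo/bar" to be used interchangeably.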

func NewRespondMux

func NewRespondMux() *RespondMux

NewRespondMux allocates and returns a new RespondMux.

func (*RespondMux) Handle

func (m *RespondMux) Handle(pattern string, handler Handler)

Handle registers the handler for the given pattern. If a handler already exists for pattern, Handle panics.

func (*RespondMux) Handler

func (m *RespondMux) Handler(c *Call) (h Handler, pattern string)

Handler returns the handler to use for the given call, consulting c.Selector(). It always returns a non-nil handler.

If there is no registered handler that applies to the request, Handler returns the FallbackHandler or, if that is not set, a "not found" handler with an empty pattern.

func (*RespondMux) Match

func (m *RespondMux) Match(selector string) (h Handler, pattern string)

Match finds a handler given a selector string. Most-specific (longest) pattern wins. If a pattern handler is a submux, it will call Match with the selector minus the pattern.

func (*RespondMux) Remove

func (m *RespondMux) Remove(selector string) (h Handler)

Remove removes and returns the handler for the selector.

func (*RespondMux) RespondRPC

func (m *RespondMux) RespondRPC(r Responder, c *Call)

RespondRPC dispatches the call to the handler whose pattern most closely matches the selector.

type Responder

type Responder interface {
	// Return sends a return value, which can be an error, and closes the channel.
	Return(...any) error

	// Continue sets the response to keep the channel open after sending a return value,
	// and returns the underlying channel for you to take control of. If called, you
	// become responsible for closing the channel.
	Continue(...any) (mux.Channel, error)

	// Send encodes a value over the underlying channel, but does not initiate a response,
	// so it must be used after calling Continue.
	Send(interface{}) error
}

Responder is used by handlers to initiate a response and send values to the caller.

type Response

type Response struct {
	ResponseHeader
	Value   any
	Channel mux.Channel
	// contains filtered or unexported fields
}

Response is used on the calling side to represent a response and allow access to the ResponseHeader data, the reply value, the underlying channel, and methods to send or receive encoded values over the channel if Continue was set on the ResponseHeader.

func (*Response) Close

func (r *Response) Close() error

func (*Response) CloseWrite

func (r *Response) CloseWrite() error

func (*Response) Continue

func (r *Response) Continue() bool

func (*Response) Err

func (r *Response) Err() error

func (*Response) Receive

func (r *Response) Receive(v interface{}) error

Receive decodes a value from the underlying channel if it is still open.

func (*Response) Send

func (r *Response) Send(v interface{}) error

Send encodes a value over the underlying channel if it is still open.

type ResponseHeader

type ResponseHeader struct {
	E *string // Error
	C bool    // Continue: after parsing response, keep stream open for whatever protocol
}

ResponseHeader is the value encoded over the channel to indicate a response.
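To make the role of the two fields concrete, here is a minimal sketch of a header with the same shape being encoded and decoded. It assumes JSON as the codec, which is only one possibility; responseHeader, encodeHeader, decodeHeader, and the err method are hypothetical names used for illustration.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// responseHeader mirrors the fields of ResponseHeader.
type responseHeader struct {
	E *string // Error: nil means the call succeeded
	C bool    // Continue: keep the channel open after the response
}

// encodeHeader builds a header and encodes it. An empty errText leaves E nil.
func encodeHeader(errText string, cont bool) ([]byte, error) {
	h := responseHeader{C: cont}
	if errText != "" {
		h.E = &errText
	}
	return json.Marshal(h)
}

// decodeHeader decodes a header from its encoded form.
func decodeHeader(data []byte) (responseHeader, error) {
	var h responseHeader
	err := json.Unmarshal(data, &h)
	return h, err
}

// err converts the optional error string into a Go error.
func (h responseHeader) err() error {
	if h.E == nil {
		return nil
	}
	return errors.New(*h.E)
}

func main() {
	ok, _ := encodeHeader("", true)
	failed, _ := encodeHeader("not found", false)
	fmt.Println(string(ok))
	fmt.Println(string(failed))

	h, _ := decodeHeader(failed)
	fmt.Println(h.err(), h.C)
}
```

Using a pointer for E distinguishes "no error" from "an error with an empty message", which a plain string field could not do.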

type Server

type Server struct {
	Handler Handler
	Codec   codec.Codec
}

Server wraps a Handler and codec to respond to RPC calls.

func (*Server) Respond

func (s *Server) Respond(sess mux.Session, ctx context.Context)

Respond will Accept channels until the Session is closed and respond with the server handler in its own goroutine. If Handler was not set, an empty RespondMux is used. If the handler does not initiate a response, a nil value is returned. If the handler does not call Continue, the channel will be closed. Respond will panic if Codec is nil.

If the context is not nil, it will be added to Calls. Otherwise, the Call Context will be set to context.Background().

func (*Server) Serve

func (s *Server) Serve(l net.Listener) error

Serve will Accept sessions until the Listener is closed, and will Respond to accepted sessions in their own goroutine.

func (*Server) ServeMux

func (s *Server) ServeMux(l mux.Listener) error

ServeMux will Accept sessions until the Listener is closed, and will Respond to accepted sessions in their own goroutine.
