rpc

package
v0.0.0-...-fab76b5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 27, 2025 License: AGPL-3.0 Imports: 12 Imported by: 70

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrShutdown = errors.New("connection is shut down")

ErrShutdown is returned when a request is made on a connection that is shutting down.

Functions

func IsShutdownErr

func IsShutdownErr(err error) bool

IsShutdownErr returns true if the error is ErrShutdown.

func TracingFromContext

func TracingFromContext(ctx context.Context) (string, string, int)

TracingFromContext returns the traceID and spanID from the context.

func WithTracing

func WithTracing(ctx context.Context, traceID, spanID string, flags int) context.Context

WithTracing returns a context with the given traceID and spanID.

Types

type Call

type Call struct {
	Request
	Params     interface{}
	Response   interface{}
	Error      error
	Done       chan *Call
	TraceID    string
	SpanID     string
	TraceFlags int
}

Call represents an active RPC.

type Codec

type Codec interface {
	// ReadHeader reads a message header into hdr.
	ReadHeader(hdr *Header) error

	// ReadBody reads a message body into the given body value.  The
	// isRequest parameter specifies whether the message being read
	// is a request; if not, it's a response.  The body value will
	// be a non-nil struct pointer, or nil to signify that the body
	// should be read and discarded.
	ReadBody(body interface{}, isRequest bool) error

	// WriteMessage writes a message with the given header and body.
	// The body will always be a struct. It may be called concurrently
	// with ReadHeader and ReadBody, but will not be called
	// concurrently with itself.
	WriteMessage(hdr *Header, body interface{}) error

	// Close closes the codec. It may be called concurrently
	// and should cause the Read methods to unblock.
	Close() error
}

A Codec implements reading and writing of messages in an RPC session. The RPC code calls WriteMessage to write a message to the connection and calls ReadHeader and ReadBody in pairs to read messages.

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn represents an RPC endpoint. It can both initiate and receive RPC requests. There may be multiple outstanding Calls associated with a single Client, and a Client may be used by multiple goroutines simultaneously.

func NewConn

func NewConn(codec Codec, factory RecorderFactory) *Conn

NewConn creates a new connection that uses the given codec for transport, but it does not start it. Conn.Start must be called before any requests are sent or received. If recorderFactory is non-nil, it will be called to get a new recorder for every request.

func (*Conn) Call

func (conn *Conn) Call(ctx context.Context, req Request, params, response interface{}) error

Call invokes the named action on the object of the given type with the given id. The returned values will be stored in response, which should be a pointer. If the action fails remotely, the error will have a cause of type RequestError. The params value may be nil if no parameters are provided; the response value may be nil to indicate that any result should be discarded.

func (*Conn) Close

func (conn *Conn) Close() error

Close closes the connection and its underlying codec; it returns when all requests have been terminated.

If the connection is serving requests, and the root value implements the Killer interface, its Kill method will be called. The codec will then be closed only when all its outstanding server calls have completed.

Calling Close multiple times is not an error.

func (*Conn) Dead

func (conn *Conn) Dead() <-chan struct{}

Dead returns a channel that is closed when the connection has been closed or the underlying transport has received an error. There may still be outstanding requests. Dead must be called after conn.Start has been called.

func (*Conn) Serve

func (conn *Conn) Serve(root interface{}, factory RecorderFactory, transformErrors func(error) error)

Serve serves RPC requests on the connection by invoking methods on root. Note that it does not start the connection running, though it may be called once the connection is already started.

The server executes each client request by calling a method on root to obtain an object to act on; then it invokes an method on that object with the request parameters, possibly returning some result.

Methods on the root value are of the form:

M(id string) (O, error)

where M is an exported name, conventionally naming the object type, id is some identifier for the object and O is the type of the returned object.

Methods defined on O may defined in one of the following forms, where T and R must be struct types.

Method([context.Context])
Method([context.Context]) R
Method([context.Context]) (R, error)
Method([context.Context]) error
Method([context.Context,]T)
Method([context.Context,]T) R
Method([context.Context,]T) (R, error)
Method([context.Context,]T) error

If transformErrors is non-nil, it will be called on all returned non-nil errors, for example to transform the errors into ServerErrors with specified codes. There will be a panic if transformErrors returns nil.

Serve may be called at any time on a connection to change the set of methods being served by the connection. This will have no effect on calls that are currently being services. If root is nil, the connection will serve no methods.

func (*Conn) ServeRoot

func (conn *Conn) ServeRoot(root Root, factory RecorderFactory, transformErrors func(error) error)

ServeRoot is like Serve except that it gives the root object dynamic control over what methods are available instead of using reflection on the type.

The server executes each client request by calling FindMethod to obtain a method to invoke. It invokes that method with the request parameters, possibly returning some result.

The Kill method will be called when the connection is closed.

func (*Conn) Start

func (conn *Conn) Start(ctx context.Context)

Start starts the RPC connection running. It must be called at least once for any RPC connection (client or server side) It has no effect if it has already been called. By default, a connection serves no methods. See Conn.Serve for a description of how to serve methods on a Conn.

The context passed in will be propagated to requests served by the connection.

type ErrorCoder

type ErrorCoder interface {
	ErrorCode() string
}

ErrorCoder represents any error that has an associated error code. An error code is a short string that represents the kind of an error.

type ErrorInfoProvider

type ErrorInfoProvider interface {
	ErrorInfo() map[string]interface{}
}

ErrorInfoProvider represents any error that can provide additional error information as a map.

type Header struct {
	// RequestId holds the sequence number of the request.
	// For replies, it holds the sequence number of the request
	// that is being replied to.
	RequestId uint64

	// Request holds the action to invoke.
	Request Request

	// Error holds the error string for a response. If there is no error,
	// this will be empty.
	Error string

	// ErrorCode holds the code of the error for a response. Error code will
	// be empty if there is no error.
	// TODO (stickupkid): This should be renamed to ResponseCode, that way
	// we're not confusing a programmatic error (empty string) with a
	// valid response code.
	ErrorCode string

	// ErrorInfo holds an optional set of additional information for an
	// error. This is used to provide additional information about the
	// error.
	// TODO (stickupkid): This should have been metadata for all responses
	// not just errors.
	ErrorInfo map[string]interface{}

	// Version defines the wire format of the request and response structure.
	Version int

	// TraceID holds the trace id of the request. This is used for sending
	// and receiving trace information.
	TraceID string

	// SpanID holds the span id of the request. This is used for sending
	// and receiving trace information.
	SpanID string

	// TraceFlags holds the trace flags of the request. This is used for
	// sending and receiving trace information.
	// Currently it indicates if a trace is sampled.
	TraceFlags int
}

Header is a header written before every RPC call. Since RPC requests can be initiated from either side, the header may represent a request from the other side or a response to an outstanding request.

func (*Header) IsRequest

func (hdr *Header) IsRequest() bool

IsRequest returns whether the header represents an RPC request. If it is not a request, it is a response.

type Killer

type Killer interface {
	// Kill kills any outstanding requests.  It should return
	// immediately.
	Kill()
}

Killer represents a type that can be asked to abort any outstanding requests. The Kill method should return immediately.

type Observer

type Observer interface {

	// ServerRequest informs the Observer of a request made
	// to the Conn. If the request was not recognized or there was
	// an error reading the body, body will be nil.
	//
	// ServerRequest is called just before the server method
	// is invoked.
	ServerRequest(hdr *Header, body interface{})

	// ServerReply informs the RequestNotifier of a reply sent to a
	// server request. The given Request gives details of the call
	// that was made; the given Header and body are the header and
	// body sent as reply.
	//
	// ServerReply is called just before the reply is written.
	ServerReply(req Request, hdr *Header, body interface{})
}

Observer can be implemented to find out about requests occurring in an RPC conn, for example to print requests for logging purposes. The calls should not block or interact with the Conn object as that can cause delays to the RPC server or deadlock.

type ObserverFactory

type ObserverFactory interface {
	// RPCObserver will return a new Observer usually constructed
	// from the state previously built up in the Observer. The
	// returned instance will be utilized per RPC request.
	RPCObserver() Observer
}

ObserverFactory is a type which can construct a new Observer.

type ObserverMultiplexer

type ObserverMultiplexer struct {
	// contains filtered or unexported fields
}

ObserverMultiplexer multiplexes calls to an arbitrary number of Observers.

func NewObserverMultiplexer

func NewObserverMultiplexer(rpcObservers ...Observer) *ObserverMultiplexer

NewObserverMultiplexer returns a new ObserverMultiplexer with the provided RequestNotifiers.

func (*ObserverMultiplexer) ServerReply

func (m *ObserverMultiplexer) ServerReply(req Request, hdr *Header, body interface{})

ServerReply implements Observer.

func (*ObserverMultiplexer) ServerRequest

func (m *ObserverMultiplexer) ServerRequest(hdr *Header, body interface{})

ServerRequest implements Observer.

type Recorder

type Recorder interface {
	HandleRequest(hdr *Header, body interface{}) error
	HandleReply(req Request, replyHdr *Header, body interface{}) error
}

Recorder represents something the connection uses to record requests and replies. Recording a message can fail (for example for audit logging), and when it does the request should be failed as well.

type RecorderFactory

type RecorderFactory func() Recorder

RecorderFactory is a function that returns a recorder to record details of a single request/response.

type Request

type Request struct {
	// Type holds the type of object to act on.
	Type string

	// Version holds the version of Type we will be acting on
	Version int

	// Id holds the id of the object to act on.
	Id string

	// Action holds the action to perform on the object.
	Action string
}

Request represents an RPC to be performed, absent its parameters.

type RequestError

type RequestError struct {
	Message string
	Code    string
	Info    map[string]interface{}
}

RequestError represents an error returned from an RPC request.

func (*RequestError) Error

func (e *RequestError) Error() string

func (*RequestError) ErrorCode

func (e *RequestError) ErrorCode() string

ErrorCode returns the error code associated with the error.

func (*RequestError) ErrorInfo

func (e *RequestError) ErrorInfo() map[string]interface{}

ErrorInfo returns the error information associated with the error.

func (*RequestError) UnmarshalInfo

func (e *RequestError) UnmarshalInfo(to interface{}) error

UnmarshalInfo attempts to unmarshal the information contained in the Info field of a RequestError into an object instance a pointer to which is passed via the to argument. The method will return an error if a non-pointer arg is provided.

type Root

type Root interface {
	Killer
	// FindMethod returns a MethodCaller for the given method name. The
	// method will be associated with the given facade and version.
	FindMethod(rootName string, version int, methodName string) (rpcreflect.MethodCaller, error)
	// StartTrace starts a trace for a given request.
	StartTrace(context.Context) (context.Context, trace.Span)
}

Root represents a type that can be used to lookup a Method and place calls on that method.

Directories

Path Synopsis
Package jsoncodec provides a JSON codec for the rpc package.
Package jsoncodec provides a JSON codec for the rpc package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL