rpc

package
v2.1.0-rc.2 Latest
Warning

This package is not in the latest version of its module.

Published: May 15, 2023 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Overview

Server-to-client reverse RPC

Idea: clients connect to the server, and the server can then call functions on clients (either on a specified client or on all of them simultaneously). This is mainly intended for a controller calling functions on agents, but it can be used for other purposes too.

The implementation is modularized as follows:

server      [transport] <-> [Clients]    <- callers
                 ^
-----------------|----------------------------------
                 v
client      [transport] <-> [Dispatcher] -> handlers (functions)

Clients manages connected clients and provides an API for listing clients and calling functions on them.

Dispatcher dispatches messages from transport to specific handlers.

Handlers are simply functions `func(ctx, req) (resp, error)` that can be registered in HandlerRegistry.

Transports (client & server part) provide "actual transport IO".

This is a kind of m:1:n architecture:

  • Dispatcher is a bridge between transports and handlers.
  • Clients is a bridge between transports and callers.

Thus implementing additional handlers or transports should not require any changes in Clients or Dispatcher.

Transports should only be concerned with connecting and passing ClientToServer and ServerToClient messages. Currently implemented transports are:

  • Bidirectional-grpc-stream-based (rpc.v1.Coordinator.Connect).

As a gRPC bidirectional stream may not work everywhere, other possible and planned transports are:

  • Heartbeat-based (heartbeat request passes ClientToServer messages, response contains ServerToClient messages; some additional latency is introduced).
  • Long-poll.
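
The heartbeat-based idea above can be sketched as follows. This is only one possible shape for a transport that is planned rather than implemented: `heartbeatRequest`, `heartbeatResponse` and `heartbeatServer` are hypothetical names, messages are plain strings instead of the real ClientToServer and ServerToClient types, and the sketch is not safe for concurrent use.

```go
package main

import "fmt"

// heartbeatRequest is a hypothetical message shape: the client piggybacks
// its ClientToServer messages on the periodic heartbeat.
type heartbeatRequest struct {
	Client    string
	Responses []string
}

// heartbeatResponse is a hypothetical message shape: the server returns the
// ServerToClient messages queued since the previous heartbeat.
type heartbeatResponse struct {
	Commands []string
}

// heartbeatServer queues commands per client until that client's next heartbeat.
type heartbeatServer struct {
	pending  map[string][]string
	received []string
}

func newHeartbeatServer() *heartbeatServer {
	return &heartbeatServer{pending: make(map[string][]string)}
}

// enqueue stores a command; it is delivered only on the next heartbeat,
// which is where the additional latency comes from.
func (s *heartbeatServer) enqueue(client, command string) {
	s.pending[client] = append(s.pending[client], command)
}

// heartbeat records the client's messages and drains its pending commands.
func (s *heartbeatServer) heartbeat(req heartbeatRequest) (heartbeatResponse, error) {
	if req.Client == "" {
		return heartbeatResponse{}, fmt.Errorf("heartbeat without client name")
	}
	s.received = append(s.received, req.Responses...)
	commands := s.pending[req.Client]
	delete(s.pending, req.Client)
	return heartbeatResponse{Commands: commands}, nil
}

func main() {
	s := newHeartbeatServer()
	s.enqueue("agent-1", "reload-config")
	resp, err := s.heartbeat(heartbeatRequest{
		Client:    "agent-1",
		Responses: []string{"ack:previous"},
	})
	fmt.Println(resp.Commands, s.received, err)
}
```

A command enqueued right after a heartbeat waits a full heartbeat interval before delivery, so the interval bounds the extra latency.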

Index

Constants

This section is empty.

Variables

ClientModule are components needed for client-side of rpc.

Note: Not providing StreamClient, as this package is generic and does not know what to connect to.

ServerModule are components needed for server-side of rpc.

Functions

func Call

func Call[RespValue any, Resp ExactMessage[RespValue]](clients *Clients, client string, req proto.Message) (*RespValue, error)

Call is Clients.Call with conversion to a typed response.

Intended as a building-block for typed wrappers.

func RegisterFunction

func RegisterFunction[Req, Resp proto.Message](
	registry *HandlerRegistry,
	handler func(context.Context, Req) (Resp, error),
) error

RegisterFunction registers a function as a handler in the registry.

Only one function for a given Req type can be registered.

Note: This is not a method due to limitations of Go generics (methods cannot declare their own type parameters).

func RegisterStreamClient

func RegisterStreamClient(
	clientName string,
	lc fx.Lifecycle,
	handlers *HandlerRegistry,
	connWrapper grpcclient.ClientConnectionWrapper,
	addr string,
)

RegisterStreamClient is an FX helper to connect new StreamClient to a given addr.

func RegisterStreamServer

func RegisterStreamServer(handler *StreamServer, server *grpc.Server, healthsrv *health.Server)

RegisterStreamServer registers the handler on grpc.Server.

To be used in fx.Invoke.

Types

type Clients

type Clients struct {
	// contains filtered or unexported fields
}

Clients manages connected clients and allows calling functions on them.

func NewClients

func NewClients() *Clients

NewClients creates new Clients.

func (*Clients) Call

func (c *Clients) Call(clientName string, req *anypb.Any) ([]byte, error)

Call calls the given client.

func (*Clients) CallAll

func (c *Clients) CallAll(req *anypb.Any) []RawResult

CallAll calls all clients and returns responses from all of them (in arbitrary order).
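
The fan-out behaviour described above could look roughly like this. It is a sketch under assumptions, not the package's code: `callFunc`, `callAll` and `demoClients` are hypothetical, and `result` only mirrors the shape of Result[[]byte].

```go
package main

import (
	"fmt"
	"sync"
)

// result mirrors the shape of Result[[]byte]; it is redefined here so the
// sketch stays self-contained.
type result struct {
	Client  string
	Success []byte
	Err     error
}

// callFunc is a hypothetical per-client call.
type callFunc func(req []byte) ([]byte, error)

// callAll calls every client concurrently and collects one result per
// client in completion order, which is why the order is arbitrary.
func callAll(clients map[string]callFunc, req []byte) []result {
	results := make(chan result, len(clients))
	var wg sync.WaitGroup
	for name, call := range clients {
		wg.Add(1)
		go func(name string, call callFunc) {
			defer wg.Done()
			resp, err := call(req)
			results <- result{Client: name, Success: resp, Err: err}
		}(name, call)
	}
	wg.Wait()
	close(results)

	out := make([]result, 0, len(clients))
	for r := range results {
		out = append(out, r)
	}
	return out
}

// demoClients returns one healthy and one failing hypothetical client.
func demoClients() map[string]callFunc {
	return map[string]callFunc{
		"agent-a": func(req []byte) ([]byte, error) {
			return append([]byte("ok:"), req...), nil
		},
		"agent-b": func([]byte) ([]byte, error) {
			return nil, fmt.Errorf("agent-b unavailable")
		},
	}
}

func main() {
	for _, r := range callAll(demoClients(), []byte("ping")) {
		fmt.Printf("%s: %q %v\n", r.Client, r.Success, r.Err)
	}
}
```

Because each result carries its own Err, one failing client does not hide the responses from the others.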

func (*Clients) Join

func (c *Clients) Join(
	clientName string,
	nextID uint64,
) (<-chan *rpcv1.ServerToClient, chan<- *rpcv1.Response)

Join marks a new client as connected.

When the client disconnects, the transport should close the responses channel.

The ServerToClient channel remains open unless a client with the same name joins. In that case, the transport should disconnect the client, as such a connection is "stale". (Note: in this case the transport should close the responses channel as usual too.)
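
The stale-connection rule can be illustrated with a simplified sketch. `clientSet`, `join` and `send` are hypothetical names, messages are plain strings, and the responses channel that the real Join also returns is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// clientSet is a hypothetical, simplified stand-in for Clients.
type clientSet struct {
	mu    sync.Mutex
	calls map[string]chan string // outgoing server-to-client messages per name
}

func newClientSet() *clientSet {
	return &clientSet{calls: make(map[string]chan string)}
}

// join registers a client. If the name is already taken, the previous
// channel is closed so that its transport can drop the stale connection.
func (s *clientSet) join(name string) <-chan string {
	s.mu.Lock()
	defer s.mu.Unlock()
	if old, ok := s.calls[name]; ok {
		close(old)
	}
	ch := make(chan string, 1)
	s.calls[name] = ch
	return ch
}

// send queues a message for the connection currently registered under name.
func (s *clientSet) send(name, msg string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	ch, ok := s.calls[name]
	if !ok {
		return fmt.Errorf("unknown client %q", name)
	}
	ch <- msg
	return nil
}

func main() {
	s := newClientSet()
	first := s.join("agent-1")
	second := s.join("agent-1") // the same name joins again

	_, open := <-first
	fmt.Println("first connection still open:", open)

	if err := s.send("agent-1", "hello"); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("second connection received:", <-second)
}
```

A transport reading from the first channel sees it closed and can tear down that connection, while new calls go only to the latest one.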

func (*Clients) List

func (c *Clients) List() []string

List lists all active clients.

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher dispatches ServerToClient calls to the appropriate handlers.

Dispatcher is intended to be used in a synchronized way from a *single* transport.

func (*Dispatcher) Chan

func (d *Dispatcher) Chan() <-chan *rpcv1.Response

Chan returns a channel with responses that transport should forward to server.

func (*Dispatcher) ProcessCommand

func (d *Dispatcher) ProcessCommand(message *rpcv1.ServerToClient)

ProcessCommand processes a single server-to-client command.

func (*Dispatcher) Stop

func (d *Dispatcher) Stop()

Stop cancels all the in-flight requests.

Note: Doesn't wait for completion.
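
One common way to get "cancel without waiting" semantics is a shared context, sketched below. `miniDispatcher`, `process`, `stop` and `slowHandler` are hypothetical names; the source does not say how Dispatcher implements Stop internally.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// miniDispatcher is a hypothetical stand-in for Dispatcher: every handler
// runs with a context derived from a single cancellable parent.
type miniDispatcher struct {
	ctx    context.Context
	cancel context.CancelFunc
	out    chan string
}

func newMiniDispatcher() *miniDispatcher {
	ctx, cancel := context.WithCancel(context.Background())
	return &miniDispatcher{ctx: ctx, cancel: cancel, out: make(chan string, 8)}
}

// process runs the handler in its own goroutine and publishes its result.
func (d *miniDispatcher) process(handler func(context.Context) string) {
	go func() {
		d.out <- handler(d.ctx)
	}()
}

// stop cancels in-flight handlers and returns immediately; it does not
// wait for them to observe the cancellation.
func (d *miniDispatcher) stop() {
	d.cancel()
}

// slowHandler simulates long-running work that honours cancellation.
func slowHandler(ctx context.Context) string {
	select {
	case <-ctx.Done():
		return fmt.Sprintf("cancelled: %v", ctx.Err())
	case <-time.After(time.Minute):
		return "done"
	}
}

func main() {
	d := newMiniDispatcher()
	d.process(slowHandler)
	d.stop()
	fmt.Println(<-d.out)
}
```

Handlers that ignore their context keep running after stop returns, which is the practical consequence of not waiting for completion.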

type ExactMessage

type ExactMessage[T any] interface {
	proto.Message
	*T
}

ExactMessage is like proto.Message, but known to point to T

See https://go.googlesource.com/proposal/+/refs/heads/master/design/43651-type-parameters.md#pointer-method-example
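
The pointer-method pattern behind this constraint can be sketched with plain Go types. Here `decoder` is a hypothetical stand-in for proto.Message, and `exactDecoder`, `decodeTyped` and `greeting` are illustrative names only.

```go
package main

import (
	"fmt"
	"strings"
)

// decoder is a hypothetical stand-in for proto.Message.
type decoder interface {
	Decode(data []byte) error
}

// exactDecoder mirrors the shape of ExactMessage: a decoder that is known
// to be exactly *T.
type exactDecoder[T any] interface {
	decoder
	*T
}

// decodeTyped allocates a T, calls the pointer-receiver method through PT,
// and returns the typed pointer. Callers only spell out T; PT is inferred.
func decodeTyped[T any, PT exactDecoder[T]](data []byte) (*T, error) {
	value := new(T)
	if err := PT(value).Decode(data); err != nil {
		return nil, err
	}
	return value, nil
}

type greeting struct{ Text string }

// Decode has a pointer receiver, so only *greeting satisfies decoder.
func (g *greeting) Decode(data []byte) error {
	if len(data) == 0 {
		return fmt.Errorf("empty input")
	}
	g.Text = strings.ToUpper(string(data))
	return nil
}

func main() {
	g, err := decodeTyped[greeting]([]byte("hello"))
	fmt.Println(g.Text, err)
}
```

Without the `*T` term in the constraint, the function could not allocate the value itself and still call a method defined on the pointer type.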

type HandlerRegistry

type HandlerRegistry struct {
	// contains filtered or unexported fields
}

HandlerRegistry allows registering handlers and can start a dispatcher.

This is intended to be used at the fx provide/invoke stage.

func NewHandlerRegistry

func NewHandlerRegistry() *HandlerRegistry

NewHandlerRegistry creates a new HandlerRegistry.

func (*HandlerRegistry) StartDispatcher

func (r *HandlerRegistry) StartDispatcher() *Dispatcher

StartDispatcher starts a dispatcher using registered handlers.

Multiple independent dispatchers can be started using a single handler registry.

type RawResult

type RawResult = Result[[]byte]

RawResult is a Result that is not yet converted to the proper response type.

type Result

type Result[Resp any] struct {
	Client  string
	Success Resp
	Err     error
}

Result is a result from one client. Used in CallAll.

func CallAll

func CallAll[RespValue any, Resp ExactMessage[RespValue]](clients *Clients, req proto.Message) ([]Result[*RespValue], error)

CallAll is Clients.CallAll with conversion to a typed response.

Intended as a building-block for typed wrappers.

type StreamClient

type StreamClient struct {
	// contains filtered or unexported fields
}

StreamClient is a client part of "stream" transport, which uses bidirectional grpc stream methods.

func NewStreamClient

func NewStreamClient(
	name string,
	handlers *HandlerRegistry,
	client rpcv1.CoordinatorClient,
) *StreamClient

NewStreamClient creates a new StreamClient.

func (*StreamClient) Start

func (c *StreamClient) Start()

Start starts the StreamClient.

func (*StreamClient) Stop

func (c *StreamClient) Stop()

Stop stops the StreamClient.

type StreamServer

type StreamServer struct {
	// contains filtered or unexported fields
}

StreamServer is a server part of "stream" transport, which uses bidirectional grpc stream methods.

Implements rpc.v1.Coordinator.

func NewStreamServer

func NewStreamServer(clients *Clients) *StreamServer

NewStreamServer creates a new StreamServer, plugging into the given Clients as a transport.

func (*StreamServer) Connect

Connect implements rpcv1.CoordinatorServer.
