Documentation ¶
Overview ¶
Server-to-client reverse RPC
Idea: clients connect to the server, and the server can call functions on them, either on one specified client or on all clients simultaneously. The main purpose is to let a controller call functions on agents, but the mechanism can serve other purposes too.
The implementation is modularized as follows:
    server  [transport] <-> [Clients]    <- callers
                 ^
    -------------|----------------------------------
                 v
    client  [transport] <-> [Dispatcher] -> handlers (functions)
Clients manages connected clients and provides an api for listing clients and calling functions.
Dispatcher dispatches messages from transport to specific handlers.
Handlers are simply functions `func(ctx, req) (resp, error)` that can be registered in HandlerRegistry.
Transports (client & server part) provide "actual transport IO".
This is roughly an m:1:n architecture:
- Dispatcher is a bridge between transports and handlers.
- Clients is a bridge between transports and callers.
Thus implementing additional handlers or transports should not require any changes in Clients or Dispatcher.
Transports should only be concerned with connecting and with passing ClientToServer and ServerToClient messages. Currently implemented transports are:
- Bidirectional-grpc-stream-based (rpc.v1.Coordinator.Connect).
Because a gRPC bidirectional stream may not work everywhere, other possible and planned transports are:
- Heartbeat-based (heartbeat request passes ClientToServer messages, response contains ServerToClient messages; some additional latency is introduced).
- Long-poll.
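To make the heartbeat idea concrete, here is a minimal sketch of how such a transport could queue messages. It is not part of this package: `heartbeatServer`, `enqueue`, and `heartbeat` are hypothetical names, and plain strings stand in for ClientToServer and ServerToClient messages.

```go
package main

import (
	"fmt"
	"sync"
)

// heartbeatServer is a hypothetical server side of a heartbeat transport.
type heartbeatServer struct {
	mu       sync.Mutex
	pending  map[string][]string // ServerToClient messages queued per client
	received []string            // ClientToServer messages delivered so far
}

func newHeartbeatServer() *heartbeatServer {
	return &heartbeatServer{pending: map[string][]string{}}
}

// enqueue stores a message until the client's next heartbeat picks it up.
func (s *heartbeatServer) enqueue(client, msg string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.pending[client] = append(s.pending[client], msg)
}

// heartbeat accepts the client's outgoing messages and returns everything
// queued for that client since its previous heartbeat.
func (s *heartbeatServer) heartbeat(client string, fromClient []string) []string {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.received = append(s.received, fromClient...)
	out := s.pending[client]
	delete(s.pending, client)
	return out
}

// runHeartbeatDemo walks through two heartbeats of a single client.
func runHeartbeatDemo() (first, second, received []string) {
	s := newHeartbeatServer()
	s.enqueue("agent-1", "reload-config")
	first = s.heartbeat("agent-1", nil)
	second = s.heartbeat("agent-1", []string{"reload-config: ok"})
	return first, second, s.received
}

func main() {
	first, second, received := runHeartbeatDemo()
	fmt.Println(first, second, received)
}
```

In this sketch a queued call waits for the next heartbeat and its reply travels on the heartbeat after that, which is where the additional latency comes from.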
Index ¶
- Variables
- func Call[RespValue any, Resp ExactMessage[RespValue]](clients *Clients, client string, req proto.Message) (*RespValue, error)
- func RegisterFunction[Req, Resp proto.Message](registry *HandlerRegistry, handler func(context.Context, Req) (Resp, error)) error
- func RegisterStreamClient(clientName string, lc fx.Lifecycle, handlers *HandlerRegistry, ...)
- func RegisterStreamServer(handler *StreamServer, server *grpc.Server, healthsrv *health.Server)
- type Clients
- type Dispatcher
- type ExactMessage
- type HandlerRegistry
- type RawResult
- type Result
- type StreamClient
- type StreamServer
Constants ¶
This section is empty.
Variables ¶
var ClientModule = fx.Provide(NewHandlerRegistry)
ClientModule provides the components needed for the client side of the RPC.
Note: StreamClient is not provided here, because this package is generic and doesn't know what to connect to.
var ServerModule = fx.Options(
	fx.Provide(NewClients),
	fx.Provide(NewStreamServer),
	fx.Invoke(RegisterStreamServer),
)
ServerModule provides the components needed for the server side of the RPC.
Functions ¶
func Call ¶
func Call[RespValue any, Resp ExactMessage[RespValue]](clients *Clients, client string, req proto.Message) (*RespValue, error)
Call is Clients.Call with conversion to a typed response.
Intended as a building-block for typed wrappers.
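The two type parameters of Call follow a common Go pattern for generic code that needs both a value type and its pointer type. The sketch below illustrates that pattern with the standard library only. The definition of ExactMessage is not shown on this page, so `exactMessage` is an assumed shape, `message` is a stand-in for proto.Message, and `decodeTyped`, `statusResponse`, and `fetchStatus` are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// message is a stand-in for proto.Message with just enough behavior to decode.
type message interface {
	Unmarshal(raw []byte) error
}

// exactMessage is an assumed shape for ExactMessage: a message whose type is
// exactly *T.
type exactMessage[T any] interface {
	message
	*T
}

// decodeTyped allocates a T, decodes into it through its pointer type, and
// returns the typed pointer, so callers never need a type assertion.
func decodeTyped[T any, M exactMessage[T]](raw []byte) (*T, error) {
	value := new(T)
	if err := M(value).Unmarshal(raw); err != nil {
		return nil, err
	}
	return value, nil
}

// statusResponse is a hypothetical response message.
type statusResponse struct{ Text string }

func (s *statusResponse) Unmarshal(raw []byte) error {
	if len(raw) == 0 {
		return errors.New("empty payload")
	}
	s.Text = string(raw)
	return nil
}

// fetchStatus is the kind of typed wrapper the pattern enables. Only the value
// type is spelled out; M is inferred as *statusResponse from the constraint.
func fetchStatus(raw string) (*statusResponse, error) {
	return decodeTyped[statusResponse]([]byte(raw))
}

func main() {
	status, err := fetchStatus("healthy")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(status.Text)
}
```

A wrapper around the real Call would presumably look similar: it names only the response value type and lets the compiler infer the pointer type.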
func RegisterFunction ¶
func RegisterFunction[Req, Resp proto.Message](
	registry *HandlerRegistry,
	handler func(context.Context, Req) (Resp, error),
) error
RegisterFunction registers a function as a handler in the registry.
Only one function can be registered for a given Req type.
Note: This is a function rather than a method because Go methods cannot declare their own type parameters.
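The sketch below shows one way a registry keyed by request type can work, including the one-handler-per-type rule. It is a stand-in built on the standard library, not this package's implementation: `registry`, `registerFunction`, `dispatch`, and the ping types are hypothetical, and `any` replaces proto.Message.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"reflect"
)

// registry is a hypothetical stand-in for HandlerRegistry: it maps a request
// type to a type-erased handler.
type registry struct {
	handlers map[reflect.Type]func(context.Context, any) (any, error)
}

func newRegistry() *registry {
	return &registry{handlers: map[reflect.Type]func(context.Context, any) (any, error){}}
}

// registerFunction is a free generic function because Go methods cannot
// declare their own type parameters.
func registerFunction[Req, Resp any](r *registry, h func(context.Context, Req) (Resp, error)) error {
	key := reflect.TypeOf((*Req)(nil)).Elem()
	if _, exists := r.handlers[key]; exists {
		return fmt.Errorf("handler for %v already registered", key)
	}
	r.handlers[key] = func(ctx context.Context, req any) (any, error) {
		typed, ok := req.(Req)
		if !ok {
			return nil, errors.New("unexpected request type")
		}
		return h(ctx, typed)
	}
	return nil
}

// dispatch looks up the handler by the dynamic type of req and calls it.
func (r *registry) dispatch(ctx context.Context, req any) (any, error) {
	h, ok := r.handlers[reflect.TypeOf(req)]
	if !ok {
		return nil, fmt.Errorf("no handler for %T", req)
	}
	return h(ctx, req)
}

type pingRequest struct{ Payload string }
type pingResponse struct{ Echo string }

func ping(_ context.Context, req pingRequest) (pingResponse, error) {
	return pingResponse{Echo: req.Payload}, nil
}

// runRegistryDemo registers ping twice and dispatches one request.
func runRegistryDemo() (echo string, duplicateRejected bool) {
	r := newRegistry()
	if err := registerFunction(r, ping); err != nil {
		return "", false
	}
	duplicateRejected = registerFunction(r, ping) != nil
	resp, err := r.dispatch(context.Background(), pingRequest{Payload: "hi"})
	if err != nil {
		return "", duplicateRejected
	}
	return resp.(pingResponse).Echo, duplicateRejected
}

func main() {
	echo, duplicateRejected := runRegistryDemo()
	fmt.Println(echo, duplicateRejected)
}
```

Keying on the request type is what lets a dispatcher pick a handler from the incoming message alone, and it also explains why a second registration for the same type has to be rejected.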
func RegisterStreamClient ¶
func RegisterStreamClient(
	clientName string,
	lc fx.Lifecycle,
	handlers *HandlerRegistry,
	connWrapper grpcclient.ClientConnectionWrapper,
	addr string,
)
RegisterStreamClient is an FX helper that connects a new StreamClient to a given addr.
func RegisterStreamServer ¶
func RegisterStreamServer(handler *StreamServer, server *grpc.Server, healthsrv *health.Server)
RegisterStreamServer registers the handler on grpc.Server.
To be used in fx.Invoke.
Types ¶
type Clients ¶
type Clients struct {
// contains filtered or unexported fields
}
Clients manages connected clients and allows calling functions on them.
func (*Clients) CallAll ¶
CallAll calls all clients and returns responses from all of them (in arbitrary order).
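The signature of CallAll is not shown on this page, so the following is only a sketch of the fan-out behavior described above. `callAll`, `callFunc`, and `callResult` are hypothetical, and plain functions stand in for connected clients.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// callResult pairs a client name with its response or error.
type callResult struct {
	Client string
	Resp   string
	Err    error
}

// callFunc is a stand-in for calling a function on one connected client.
type callFunc func(req string) (string, error)

// callAll sends req to every client concurrently and collects the results in
// whatever order they arrive.
func callAll(clients map[string]callFunc, req string) []callResult {
	out := make(chan callResult, len(clients))
	for name, call := range clients {
		go func(name string, call callFunc) {
			resp, err := call(req)
			out <- callResult{Client: name, Resp: resp, Err: err}
		}(name, call)
	}
	results := make([]callResult, 0, len(clients))
	for range clients {
		results = append(results, <-out)
	}
	return results
}

// runCallAllDemo sorts the results so that the output is stable.
func runCallAllDemo() string {
	clients := map[string]callFunc{
		"agent-1": func(req string) (string, error) { return "agent-1 got " + req, nil },
		"agent-2": func(req string) (string, error) { return "agent-2 got " + req, nil },
	}
	results := callAll(clients, "ping")
	sort.Slice(results, func(i, j int) bool { return results[i].Client < results[j].Client })
	parts := make([]string, 0, len(results))
	for _, r := range results {
		parts = append(parts, r.Resp)
	}
	return strings.Join(parts, "; ")
}

func main() {
	fmt.Println(runCallAllDemo())
}
```

Because arrival order is arbitrary, callers that need a stable order should sort by client name, as the demo does.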
func (*Clients) Join ¶
func (c *Clients) Join(
	clientName string,
	nextID uint64,
) (<-chan *rpcv1.ServerToClient, chan<- *rpcv1.Response)
Join marks a new client as connected.
When the client disconnects, the transport should close the responses channel.
The ServerToClient channel stays open unless another client with the same name joins. In that case the transport should disconnect the earlier client, because its connection is now "stale". (Note: in this case the transport should still close the responses channel as usual.)
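The stale-connection rule can be illustrated with a small stand-in. This is a sketch under simplifying assumptions, not the package's code: `clientTable` and `join` are hypothetical, strings replace ServerToClient messages, and the responses channel and nextID are left out.

```go
package main

import (
	"fmt"
	"sync"
)

// clientTable is a hypothetical stand-in for Clients that tracks one outgoing
// channel per client name.
type clientTable struct {
	mu    sync.Mutex
	conns map[string]chan string
}

func newClientTable() *clientTable {
	return &clientTable{conns: map[string]chan string{}}
}

// join registers a connection under name. If a connection with the same name
// already exists, its channel is closed to tell the old transport it is stale.
func (t *clientTable) join(name string) <-chan string {
	t.mu.Lock()
	defer t.mu.Unlock()
	if old, ok := t.conns[name]; ok {
		close(old)
	}
	ch := make(chan string, 1)
	t.conns[name] = ch
	return ch
}

// isClosed reports, without blocking, whether an empty channel has been
// closed. It would consume a pending message, which is acceptable here
// because the demo never sends any.
func isClosed(ch <-chan string) bool {
	select {
	case _, open := <-ch:
		return !open
	default:
		return false
	}
}

// runJoinDemo joins twice under the same name and checks both channels.
func runJoinDemo() (firstStale, secondStale bool) {
	t := newClientTable()
	first := t.join("agent-1")
	second := t.join("agent-1")
	return isClosed(first), isClosed(second)
}

func main() {
	firstStale, secondStale := runJoinDemo()
	fmt.Println(firstStale, secondStale)
}
```

A transport built on this idea would treat a closed outgoing channel as the signal to drop its connection.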
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher dispatches ServerToClient calls to the appropriate handlers.
Dispatcher is intended to be used in a synchronized way from a *single* transport.
func (*Dispatcher) Chan ¶
func (d *Dispatcher) Chan() <-chan *rpcv1.Response
Chan returns a channel with responses that transport should forward to server.
func (*Dispatcher) ProcessCommand ¶
func (d *Dispatcher) ProcessCommand(message *rpcv1.ServerToClient)
ProcessCommand processes a single server-to-client command.
func (*Dispatcher) Stop ¶
func (d *Dispatcher) Stop()
Stop cancels all the in-flight requests.
Note: Doesn't wait for completion.
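The following sketch shows how a transport might drive a dispatcher with these three methods, and what "cancels but doesn't wait" can look like. It is a stand-in, not the real Dispatcher: `miniDispatcher` and its methods are hypothetical, strings replace the protobuf messages, and running each handler in its own goroutine is an assumption.

```go
package main

import (
	"context"
	"fmt"
)

// miniDispatcher is a hypothetical stand-in for Dispatcher.
type miniDispatcher struct {
	ctx       context.Context
	cancel    context.CancelFunc
	responses chan string
}

func newMiniDispatcher() *miniDispatcher {
	ctx, cancel := context.WithCancel(context.Background())
	return &miniDispatcher{ctx: ctx, cancel: cancel, responses: make(chan string, 8)}
}

// processCommand runs the handler in its own goroutine so that a slow handler
// does not block the transport's receive loop.
func (d *miniDispatcher) processCommand(id string, handler func(context.Context) string) {
	go func() {
		resp := handler(d.ctx)
		select {
		case d.responses <- id + ":" + resp:
		case <-d.ctx.Done(): // after stop, the response may be dropped
		}
	}()
}

// stop cancels the shared context and returns immediately.
func (d *miniDispatcher) stop() { d.cancel() }

// runDispatcherDemo processes one quick command, then stops the dispatcher
// while a second command is still in flight.
func runDispatcherDemo() (reply string, cancelled bool) {
	d := newMiniDispatcher()
	d.processCommand("1", func(context.Context) string { return "pong" })
	reply = <-d.responses

	observed := make(chan struct{})
	d.processCommand("2", func(ctx context.Context) string {
		<-ctx.Done() // blocks until stop is called
		close(observed)
		return "aborted"
	})
	d.stop()
	<-observed // the handler saw the cancellation
	return reply, true
}

func main() {
	reply, cancelled := runDispatcherDemo()
	fmt.Println(reply, cancelled)
}
```

Since stop returns before handlers finish, a caller that needs to know when they are done has to track that separately, as the demo does with the `observed` channel.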
type ExactMessage ¶
ExactMessage is like proto.Message, but known to point to T.
type HandlerRegistry ¶
type HandlerRegistry struct {
// contains filtered or unexported fields
}
HandlerRegistry allows registering handlers and can start a dispatcher.
This is intended to be used at the fx provide/invoke stage.
func NewHandlerRegistry ¶
func NewHandlerRegistry() *HandlerRegistry
NewHandlerRegistry creates a new HandlerRegistry.
func (*HandlerRegistry) StartDispatcher ¶
func (r *HandlerRegistry) StartDispatcher() *Dispatcher
StartDispatcher starts a dispatcher using registered handlers.
Multiple independent dispatchers can be started from a single handler registry.
type StreamClient ¶
type StreamClient struct {
// contains filtered or unexported fields
}
StreamClient is the client part of the "stream" transport, which uses bidirectional gRPC stream methods.
func NewStreamClient ¶
func NewStreamClient(
	name string,
	handlers *HandlerRegistry,
	client rpcv1.CoordinatorClient,
) *StreamClient
NewStreamClient creates a new StreamClient.
type StreamServer ¶
type StreamServer struct {
// contains filtered or unexported fields
}
StreamServer is the server part of the "stream" transport, which uses bidirectional gRPC stream methods.
Implements rpc.v1.Coordinator.
func NewStreamServer ¶
func NewStreamServer(clients *Clients) *StreamServer
NewStreamServer creates a new StreamServer that plugs into the given Clients as a transport.
func (*StreamServer) Connect ¶
func (s *StreamServer) Connect(conn rpcv1.Coordinator_ConnectServer) error
Connect implements rpcv1.CoordinatorServer.