Documentation ¶
Overview ¶
Package backchannel implements connection multiplexing that allows for invoking gRPC methods from the server to the client.
gRPC allows only for invoking RPCs from client to the server. Invoking RPCs from the server to the client can be useful in some cases such as tunneling through firewalls. While implementing such a use case would be possible with plain bidirectional streams, the approach has various limitations that force additional work on the user. All messages in a single stream are ordered and processed sequentially. If concurrency is desired, this would require the user to implement their own concurrency handling. Request routing and cancellations would also have to be implemented separately on top of the bidirectional stream.
To do away with these problems, this package provides a multiplexed transport for running two independent gRPC sessions on a single connection. This allows for dialing back to the client from the server to establish another gRPC session where the server and client roles are switched.
The server side supports clients that are unaware of the multiplexing. The server peeks the incoming network stream to see if it starts with the magic bytes that indicate a multiplexing aware client. If the magic bytes are present, the server initiates the multiplexing session and dials back to the client over the already established network connection. If the magic bytes are not present, the server restores the the bytes back into the original network stream and handles it without a multiplexing session.
Usage:
- Implement a ServerFactory, which is simply a function that returns a Server that can serve on the backchannel connection. Plug in the ClientHandshake to the Clientconn via grpc.WithTransportCredentials when dialing. This ensures all connections established by gRPC work with a multiplexing session and have a backchannel Server serving.
- Configure the ServerHandshake on the server side by passing it into the gRPC server via the grpc.Creds option. The ServerHandshake method is called on each newly established connection. It peeks the network stream to see if a multiplexing session should be initiated. If so, it also dials back to the client's backchannel server. Server makes the backchannel connection's available later via the Registry's Backchannel method. The ID of the peer associated with the current RPC handler can be fetched via GetPeerID. The returned ID can be used to access the correct backchannel connection from the Registry.
Example ¶
package main import ( "context" "fmt" "net" "github.com/sirupsen/logrus" "gitlab.com/gitlab-org/gitaly/v14/internal/backchannel" "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) func main() { // Open the server's listener. ln, err := net.Listen("tcp", "localhost:0") if err != nil { fmt.Printf("failed to start listener: %v", err) return } // Registry is for storing the open backchannels. It should be passed into the ServerHandshaker // which creates the backchannel connections and adds them to the registry. The RPC handlers // can use the registry to access available backchannels by their peer ID. registry := backchannel.NewRegistry() logger := logrus.NewEntry(logrus.New()) // ServerHandshaker initiates the multiplexing session on the server side. Once that is done, // it creates the backchannel connection and stores it into the registry. For each connection, // the ServerHandshaker passes down the peer ID via the context. The peer ID identifies a // backchannel connection. handshaker := backchannel.NewServerHandshaker(logger, insecure.NewCredentials(), registry, nil) // Create the server srv := grpc.NewServer( grpc.Creds(handshaker), grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error { fmt.Println("Gitaly received a transactional mutator") backchannelID, err := backchannel.GetPeerID(stream.Context()) if err == backchannel.ErrNonMultiplexedConnection { // This call is from a client that is not multiplexing aware. Client is not // Praefect, so no need to perform voting. The client could be for example // GitLab calling Gitaly directly. fmt.Println("Gitaly responding to a non-multiplexed client") return stream.SendMsg(&gitalypb.CreateBranchResponse{}) } else if err != nil { return fmt.Errorf("get peer id: %w", err) } backchannelConn, err := registry.Backchannel(backchannelID) if err != nil { return fmt.Errorf("get backchannel: %w", err) } fmt.Println("Gitaly sending vote to Praefect via backchannel") if err := backchannelConn.Invoke( stream.Context(), "/Praefect/VoteTransaction", &gitalypb.VoteTransactionRequest{}, &gitalypb.VoteTransactionResponse{}, ); err != nil { return fmt.Errorf("invoke backchannel: %w", err) } fmt.Println("Gitaly received vote response via backchannel") fmt.Println("Gitaly responding to the transactional mutator") return stream.SendMsg(&gitalypb.CreateBranchResponse{}) }), ) defer srv.Stop() // Start the server go func() { if err := srv.Serve(ln); err != nil { fmt.Printf("failed to serve: %v", err) } }() fmt.Printf("Invoke with a multiplexed client:\n\n") if err := invokeWithMuxedClient(logger, ln.Addr().String()); err != nil { fmt.Printf("failed to invoke with muxed client: %v", err) return } fmt.Printf("\nInvoke with a non-multiplexed client:\n\n") if err := invokeWithNormalClient(ln.Addr().String()); err != nil { fmt.Printf("failed to invoke with non-muxed client: %v", err) return } } func invokeWithMuxedClient(logger *logrus.Entry, address string) error { // clientHandshaker's ClientHandshake gets called on each established connection. The Server returned by the // ServerFactory is started on Praefect's end of the connection, which Gitaly can call. clientHandshaker := backchannel.NewClientHandshaker(logger, func() backchannel.Server { return grpc.NewServer(grpc.UnknownServiceHandler(func(srv interface{}, stream grpc.ServerStream) error { fmt.Println("Praefect received vote via backchannel") fmt.Println("Praefect responding via backchannel") return stream.SendMsg(&gitalypb.VoteTransactionResponse{}) })) }) return invokeWithOpts(address, grpc.WithTransportCredentials(clientHandshaker.ClientHandshake(insecure.NewCredentials()))) } func invokeWithNormalClient(address string) error { return invokeWithOpts(address, grpc.WithInsecure()) } func invokeWithOpts(address string, opts ...grpc.DialOption) error { clientConn, err := grpc.Dial(address, opts...) if err != nil { return fmt.Errorf("dial server: %w", err) } if err := clientConn.Invoke(context.Background(), "/Gitaly/Mutator", &gitalypb.CreateBranchRequest{}, &gitalypb.CreateBranchResponse{}); err != nil { return fmt.Errorf("call server: %w", err) } if err := clientConn.Close(); err != nil { return fmt.Errorf("close clientConn: %w", err) } return nil }
Output: Invoke with a multiplexed client: Gitaly received a transactional mutator Gitaly sending vote to Praefect via backchannel Praefect received vote via backchannel Praefect responding via backchannel Gitaly received vote response via backchannel Gitaly responding to the transactional mutator Invoke with a non-multiplexed client: Gitaly received a transactional mutator Gitaly responding to a non-multiplexed client
Index ¶
- Variables
- func NewServerHandshaker(logger *logrus.Entry, tc credentials.TransportCredentials, reg *Registry, ...) credentials.TransportCredentials
- func WithID(authInfo credentials.AuthInfo, id ID) credentials.AuthInfo
- type ClientHandshaker
- type ID
- type Registry
- type Server
- type ServerFactory
- type ServerHandshaker
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ErrNonMultiplexedConnection = errors.New("non-multiplexed connection")
ErrNonMultiplexedConnection is returned when attempting to get the peer id of a non-multiplexed connection.
Functions ¶
func NewServerHandshaker ¶
func NewServerHandshaker(logger *logrus.Entry, tc credentials.TransportCredentials, reg *Registry, dialOpts []grpc.DialOption) credentials.TransportCredentials
NewServerHandshaker returns a new server side implementation of the backchannel. The provided TransportCredentials are handshaked prior to initializing the multiplexing session. The Registry is used to store the backchannel connections. DialOptions can be used to set custom dial options for the backchannel connections. They must not contain a dialer or transport credentials as those set by the handshaker.
func WithID ¶
func WithID(authInfo credentials.AuthInfo, id ID) credentials.AuthInfo
WithID stores the ID in the provided AuthInfo so it can be later accessed by the RPC handler. This is exported to facilitate testing.
Types ¶
type ClientHandshaker ¶
type ClientHandshaker struct {
// contains filtered or unexported fields
}
ClientHandshaker implements the client side handshake of the multiplexed connection.
func NewClientHandshaker ¶
func NewClientHandshaker(logger *logrus.Entry, serverFactory ServerFactory) ClientHandshaker
NewClientHandshaker returns a new client side implementation of the backchannel. The provided logger is used to log multiplexing errors.
func (ClientHandshaker) ClientHandshake ¶
func (ch ClientHandshaker) ClientHandshake(tc credentials.TransportCredentials) credentials.TransportCredentials
ClientHandshake returns TransportCredentials that perform the client side multiplexing handshake and start the backchannel Server on the established connections. The transport credentials are used to intiliaze the connection prior to the multiplexing.
type ID ¶
type ID uint64
ID is a monotonically increasing number that uniquely identifies a peer connection.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry is a thread safe registry for backchannels. It enables accessing the backchannels via a unique ID.
func (*Registry) Backchannel ¶
func (r *Registry) Backchannel(id ID) (*grpc.ClientConn, error)
Backchannel returns a backchannel for the ID. Returns an error if no backchannel is registered for the ID.
func (*Registry) RegisterBackchannel ¶
func (r *Registry) RegisterBackchannel(conn *grpc.ClientConn) ID
RegisterBackchannel registers a new backchannel and returns its unique ID.
func (*Registry) RemoveBackchannel ¶
RemoveBackchannel removes a backchannel from the registry.
type Server ¶
type Server interface { // Serve starts serving on the listener. Serve(net.Listener) error // Stops the server and closes all connections. Stop() }
Server is the interface of a backchannel server.
type ServerFactory ¶
type ServerFactory func() Server
ServerFactory returns the server that should serve on the backchannel. Each invocation should return a new server as the servers get stopped when a backchannel closes.
type ServerHandshaker ¶
type ServerHandshaker struct { credentials.TransportCredentials // contains filtered or unexported fields }
ServerHandshaker implements the server side handshake of the multiplexed connection.
func (ServerHandshaker) ServerHandshake ¶
func (s ServerHandshaker) ServerHandshake(conn net.Conn) (net.Conn, credentials.AuthInfo, error)
ServerHandshake peeks the connection to determine whether the client supports establishing a backchannel by multiplexing the network connection. If so, it establishes a gRPC ClientConn back to the client and stores it's ID in the AuthInfo where it can be later accessed by the RPC handlers. gRPC sets an IO timeout on the connection before calling ServerHandshake, so we don't have to handle timeouts separately.