proxy

package
v15.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 24, 2022 License: MIT, Apache-2.0 Imports: 12 Imported by: 0

Documentation

Overview

Package proxy provides a reverse proxy handler for gRPC.

The implementation allows a `grpc.Server` to pass a received ServerStream to a ClientStream without understanding the semantics of the messages exchanged. It basically provides a transparent reverse-proxy.

This package is intentionally generic, exposing a `StreamDirector` function that allows users of this package to implement whatever logic of backend-picking, dialing and service verification to perform.

See examples on documented functions.

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrInvalidPeekCount = errors.New("peek count must be greater than zero")

ErrInvalidPeekCount indicates the director function requested an invalid peek quanity

Functions

func CodecWithParent

func CodecWithParent(fallback encoding.Codec) encoding.Codec

CodecWithParent returns a proxying encoding.Codec with a user provided codec as parent.

This codec is *crucial* to the functioning of the proxy. It allows the proxy server to be oblivious to the schema of the forwarded messages. It basically treats a gRPC message frame as raw bytes. However, if the server handler, or the client caller are not proxy-internal functions it will fall back to trying to decode the message using a fallback codec.

func NewCodec

func NewCodec() encoding.Codec

NewCodec returns a proxying encoding.Codec with the default protobuf codec as parent.

See CodecWithParent.

func RegisterService

func RegisterService(server *grpc.Server, director StreamDirector, serviceName string, methodNames ...string)

RegisterService sets up a proxy handler for a particular gRPC service and method. The behaviour is the same as if you were registering a handler method, e.g. from a codegenerated pb.go file.

This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.

Example
package main

import (
	"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy"
	"google.golang.org/grpc"
)

var director proxy.StreamDirector

func main() {
	// A gRPC server with the proxying codec enabled.
	server := grpc.NewServer(grpc.ForceServerCodec(proxy.NewCodec()))
	// Register a TestService with 4 of its methods explicitly.
	proxy.RegisterService(server, director,
		"mwitkow.testproto.TestService",
		"PingEmpty", "Ping", "PingError", "PingList")
}
Output:

func RegisterStreamHandlers

func RegisterStreamHandlers(server *grpc.Server, serviceName string, streamers map[string]grpc.StreamHandler)

RegisterStreamHandlers sets up stream handlers for a set of gRPC methods for a given service. streamers is a map of method to grpc.StreamHandler eg:

streamHandler := func(srv interface{}, stream ServerStream) error  {
                      /** do some stuff **/
                      return nil
                 }

RegisterStreamHandlers(grpcServer, "MyGrpcService", map[string]grpc.StreamHandler{"Method1": streamHandler}) note: multiple calls with the same serviceName will result in a fatal

func TransparentHandler

func TransparentHandler(director StreamDirector) grpc.StreamHandler

TransparentHandler returns a handler that attempts to proxy all requests that are not registered in the server. The indented use here is as a transparent proxy, where the server doesn't know about the services implemented by the backends. It should be used as a `grpc.UnknownServiceHandler`.

This can *only* be used if the `server` also uses grpcproxy.CodecForServer() ServerOption.

Example
package main

import (
	"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy"
	"google.golang.org/grpc"
)

var director proxy.StreamDirector

func main() {
	grpc.NewServer(
		grpc.ForceServerCodec(proxy.NewCodec()),
		grpc.UnknownServiceHandler(proxy.TransparentHandler(director)))
}
Output:

Types

type Destination

type Destination struct {
	// Ctx is the context used for the connection.
	Ctx context.Context
	// Conn is the GRPC client connection.
	Conn *grpc.ClientConn
	// Msg is the initial message which shall be sent to the destination. This is used in order
	// to allow for re-writing the header message.
	Msg []byte
	// ErrHandler is invoked when proxying to the destination fails. It can be used to swallow
	// errors in case proxying failures are considered to be non-fatal. If all errors are
	// swallowed, the proxied RPC will be successful.
	ErrHandler func(error) error
}

Destination contains a client connection as well as a rewritten protobuf message

type StreamDirector

type StreamDirector func(ctx context.Context, fullMethodName string, peeker StreamPeeker) (*StreamParameters, error)

StreamDirector returns a gRPC ClientConn to be used to forward the call to.

The presence of the `Context` allows for rich filtering, e.g. based on Metadata (headers). If no handling is meant to be done, a `codes.NotImplemented` gRPC error should be returned.

The context returned from this function should be the context for the *outgoing* (to backend) call. In case you want to forward any Metadata between the inbound request and outbound requests, you should do it manually. However, you *must* propagate the cancel function (`context.WithCancel`) of the inbound context to the one returned.

It is worth noting that the StreamDirector will be fired *after* all server-side stream interceptors are invoked. So decisions around authorization, monitoring etc. are better to be handled there.

See the rather rich example.

Example

Provide sa simple example of a director that shields internal services and dials a staging or production backend. This is a *very naive* implementation that creates a new connection on every request. Consider using pooling.

package main

import (
	"context"
	"strings"

	"gitlab.com/gitlab-org/gitaly/v15/internal/metadata"
	"gitlab.com/gitlab-org/gitaly/v15/internal/praefect/grpc-proxy/proxy"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"

	grpc_metadata "google.golang.org/grpc/metadata"
	"google.golang.org/grpc/status"
)

var director proxy.StreamDirector

func main() {
	director = func(ctx context.Context, fullMethodName string, _ proxy.StreamPeeker) (*proxy.StreamParameters, error) {
		// Make sure we never forward internal services.
		if strings.HasPrefix(fullMethodName, "/com.example.internal.") {
			return nil, status.Errorf(codes.Unimplemented, "Unknown method")
		}
		md, ok := grpc_metadata.FromIncomingContext(ctx)
		if ok {
			// Decide on which backend to dial
			if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" {
				// Make sure we use DialContext so the dialing can be cancelled/time out together with the context.
				conn, err := grpc.DialContext(ctx, "api-service.staging.svc.local", grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())))
				return proxy.NewStreamParameters(proxy.Destination{
					Conn: conn,
					Ctx:  metadata.IncomingToOutgoing(ctx),
				}, nil, nil, nil), err
			} else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" {
				conn, err := grpc.DialContext(ctx, "api-service.prod.svc.local", grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())))
				return proxy.NewStreamParameters(proxy.Destination{
					Conn: conn,
					Ctx:  metadata.IncomingToOutgoing(ctx),
				}, nil, nil, nil), err
			}
		}
		return nil, status.Errorf(codes.Unimplemented, "Unknown method")
	}
}
Output:

type StreamParameters

type StreamParameters struct {
	// contains filtered or unexported fields
}

StreamParameters encapsulates streaming parameters the praefect coordinator returns to the proxy handler

func NewStreamParameters

func NewStreamParameters(primary Destination, secondaries []Destination, reqFinalizer func() error, callOpts []grpc.CallOption) *StreamParameters

NewStreamParameters returns a new instance of StreamParameters

func (*StreamParameters) CallOptions

func (s *StreamParameters) CallOptions() []grpc.CallOption

CallOptions returns call options

func (*StreamParameters) Primary

func (s *StreamParameters) Primary() Destination

func (*StreamParameters) RequestFinalizer

func (s *StreamParameters) RequestFinalizer() error

RequestFinalizer calls the request finalizer

func (*StreamParameters) Secondaries

func (s *StreamParameters) Secondaries() []Destination

type StreamPeeker

type StreamPeeker interface {
	// Peek allows a director to peek one message into the request stream without
	// removing those messages from the stream that will be forwarded to
	// the backend server.
	Peek() (frame []byte, _ error)
}

StreamPeeker abstracts away the gRPC stream being forwarded so that it can be inspected and modified.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL