Documentation ¶
Overview ¶
Package proxy provides a reverse proxy handler for gRPC.
The implementation allows a `grpc.Server` to pass a received ServerStream to a ClientStream without understanding the semantics of the messages exchanged. In effect, it provides a transparent reverse proxy.
This package is intentionally generic: it exposes a `StreamDirector` function that lets users of this package implement their own logic for backend picking, dialing, and service verification.
See the examples on the documented functions.
Index ¶
- func Codec() encoding.CodecV2
- func CodecWithParent(fallback encoding.CodecV2) encoding.CodecV2
- func NewFrame(payload []byte) any
- func RegisterService(server grpc.ServiceRegistrar, director StreamDirector, serviceName string, ...)
- func TransparentHandler(director StreamDirector, options ...Option) grpc.StreamHandler
- type Backend
- type Mode
- type Option
- type ServerStreamWrapper
- type SingleBackend
- type StreamDirector
- type StreamedDetectorFunc
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Codec ¶
Codec returns a proxying encoding.CodecV2 with the default protobuf codec as parent.
See CodecWithParent.
func CodecWithParent ¶
CodecWithParent returns a proxying encoding.CodecV2 with a user provided codec as parent.
This codec is *crucial* to the functioning of the proxy. It allows the proxy server to be oblivious to the schema of the forwarded messages by treating a gRPC message frame as raw bytes. However, if the server handler or the client caller is not a proxy-internal function, it falls back to decoding the message with the fallback codec.
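The pass-through idea can be sketched with the standard library alone. This is a simplified model, not the package's implementation: `parentCodec`, `jsonCodec`, `frame`, and `passthroughCodec` are hypothetical names, JSON stands in for the protobuf parent codec, and the real `encoding.CodecV2` interface has different method signatures.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// parentCodec is a hypothetical, simplified codec interface used only in this sketch.
type parentCodec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
}

// jsonCodec stands in for the schema-aware parent codec.
type jsonCodec struct{}

func (jsonCodec) Marshal(v any) ([]byte, error)      { return json.Marshal(v) }
func (jsonCodec) Unmarshal(data []byte, v any) error { return json.Unmarshal(data, v) }

// frame is a hypothetical proxy-internal message that only carries raw bytes.
type frame struct{ payload []byte }

// passthroughCodec copies bytes for *frame values and defers to the parent otherwise.
type passthroughCodec struct{ parent parentCodec }

func (c passthroughCodec) Marshal(v any) ([]byte, error) {
	if f, ok := v.(*frame); ok {
		return f.payload, nil // already encoded: forward untouched
	}
	return c.parent.Marshal(v)
}

func (c passthroughCodec) Unmarshal(data []byte, v any) error {
	if f, ok := v.(*frame); ok {
		f.payload = append([]byte(nil), data...) // keep the bytes, skip decoding
		return nil
	}
	return c.parent.Unmarshal(data, v)
}

func main() {
	codec := passthroughCodec{parent: jsonCodec{}}

	// Proxy path: the bytes are not valid JSON, yet they round-trip unchanged.
	var f frame
	if err := codec.Unmarshal([]byte{0x0a, 0x02, 'h', 'i'}, &f); err != nil {
		panic(err)
	}
	out, _ := codec.Marshal(&f)
	fmt.Printf("%x\n", out)

	// Non-proxy path: ordinary values go through the parent codec.
	var m map[string]int
	if err := codec.Unmarshal([]byte(`{"a":1}`), &m); err != nil {
		panic(err)
	}
	fmt.Println(m["a"])
}
```

The type switch on the message value is what lets the same server host both proxied methods and regular, schema-aware handlers.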
func RegisterService ¶
func RegisterService(server grpc.ServiceRegistrar, director StreamDirector, serviceName string, options ...Option)
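RegisterService sets up a proxy handler for a particular gRPC service and its methods. The behavior is the same as if you were registering a handler method, e.g. from a code-generated pb.go file.
This can *only* be used if the `server` is also configured with the proxying codec through a ServerOption, such as `grpc.ForceServerCodecV2(proxy.Codec())` in the example below. The older `grpc.CustomCodec()` option is deprecated.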
Example ¶
ExampleRegisterService is a simple example of registering a service with the proxy.
    package main

    import (
        "google.golang.org/grpc"

        "github.com/siderolabs/grpc-proxy/proxy"
    )

    var director proxy.StreamDirector

    func main() {
        // A gRPC server with the proxying codec enabled.
        server := grpc.NewServer(grpc.ForceServerCodecV2(proxy.Codec()))

        // Register a TestService with 4 of its methods explicitly.
        proxy.RegisterService(server, director,
            "talos.testproto.TestService",
            proxy.WithMethodNames("PingEmpty", "Ping", "PingError", "PingList"),
            proxy.WithStreamedMethodNames("PingList"),
        )
    }
func TransparentHandler ¶
func TransparentHandler(director StreamDirector, options ...Option) grpc.StreamHandler
TransparentHandler returns a handler that attempts to proxy all requests that are not registered in the server. The intended use here is as a transparent proxy, where the server doesn't know about the services implemented by the backends. It should be used as a `grpc.UnknownServiceHandler`.
This can *only* be used if the `server` is also configured with the proxying codec through a ServerOption, such as `grpc.ForceServerCodecV2(proxy.Codec())` in the example below. The older `grpc.CustomCodec()` option is deprecated.
Example ¶
ExampleTransparentHandler is an example of redirecting all requests to the proxy.
    package main

    import (
        "google.golang.org/grpc"

        "github.com/siderolabs/grpc-proxy/proxy"
    )

    var director proxy.StreamDirector

    func main() {
        grpc.NewServer(
            grpc.ForceServerCodecV2(proxy.Codec()),
            grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
        )
    }
Types ¶
type Backend ¶
    type Backend interface {
        // String provides the backend name for logging and errors.
        String() string

        // GetConnection returns a grpc connection to the backend.
        //
        // The context returned from this function should be the context for the *outgoing* (to backend) call. In case you want
        // to forward any Metadata between the inbound request and outbound requests, you should do it manually. However, you
        // *must* propagate the cancel function (`context.WithCancel`) of the inbound context to the one returned.
        GetConnection(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error)

        // AppendInfo is called to enhance the response from the backend with additional data.
        //
        // Parameter streaming indicates whether the response is delivered in streaming mode.
        //
        // A use case might be appending the backend endpoint (or name) to the protobuf-serialized response, so that the response
        // is enhanced with source information. This is particularly important for one-to-many calls, when it is required to
        // identify the response from each of the backends participating in the proxying.
        //
        // If no additional processing is required, simply returning the buffer without changes works fine.
        AppendInfo(streaming bool, resp []byte) ([]byte, error)

        // BuildError is called to convert an error from upstream into a response field.
        //
        // BuildError is never called for one-to-one proxying; in that case all errors are returned to the caller
        // as grpc errors. Parameter streaming indicates whether the response is delivered in streaming mode.
        //
        // When proxying one to many, if one of the requests fails or upstream returns an error, it is undesirable to fail the
        // whole request and discard responses from other backends. BuildError converts (marshals) the error from the backend
        // into a protobuf-encoded response which is analyzed by the caller, so that a caller reaching out to N upstreams
        // receives N1 successful responses and N2 error responses such that N1 + N2 == N.
        //
        // If BuildError returns nil, the error is returned as a grpc error (failing the whole request).
        BuildError(streaming bool, err error) ([]byte, error)
    }
Backend wraps information about upstream connection.
For simple one-to-one proxying, not much should be done in the Backend, simply providing a connection is enough.
When proxying one-to-many and aggregating results, Backend might be used to append additional fields to upstream response to support more complicated proxying.
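As a rough illustration of what an AppendInfo implementation might do, the sketch below appends an extra string field to an already serialized protobuf message by writing raw wire-format bytes. It relies only on the protobuf wire format. The field number 99 and the names `sourceFieldNumber` and `appendSource` are assumptions made for this example and are not part of the package.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// sourceFieldNumber is a hypothetical field number reserved for the backend name.
const sourceFieldNumber = 99

// appendSource appends a length-delimited (wire type 2) string field to resp.
// The input slice is copied, so the caller's buffer is left untouched.
func appendSource(resp []byte, backend string) []byte {
	out := append([]byte(nil), resp...)
	out = binary.AppendUvarint(out, uint64(sourceFieldNumber<<3|2)) // field tag
	out = binary.AppendUvarint(out, uint64(len(backend)))           // payload length
	return append(out, backend...)
}

func main() {
	// A serialized message whose field 1 holds the string "hi".
	resp := []byte{0x0a, 0x02, 'h', 'i'}
	fmt.Printf("%x\n", appendSource(resp, "node-1"))
}
```

Because protobuf decoders accept fields in any order, the proxy can add the field without parsing the original response, provided the response schema declares a matching field.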
type Mode ¶
type Mode int
Mode specifies proxying mode: one2one (transparent) or one2many (aggregation, error wrapping).
type Option ¶
type Option func(*handlerOptions)
Option configures gRPC proxy.
func WithMethodNames ¶
WithMethodNames configures the list of method names to proxy for a non-transparent handler.
func WithStreamedDetector ¶
func WithStreamedDetector(detector StreamedDetectorFunc) Option
WithStreamedDetector configures a function to detect streamed methods.
This is only important for one2many proxying.
func WithStreamedMethodNames ¶
WithStreamedMethodNames configures the list of streamed method names.
This is only important for one2many proxying. This option can't be used with TransparentHandler.
type ServerStreamWrapper ¶
    type ServerStreamWrapper struct {
        grpc.ServerStream
        // contains filtered or unexported fields
    }
ServerStreamWrapper wraps grpc.ServerStream and adds locking to the send path.
func (*ServerStreamWrapper) SendHeader ¶
func (wrapper *ServerStreamWrapper) SendHeader(md metadata.MD) error
SendHeader sends the header metadata. The provided md and headers set by SetHeader() will be sent. It fails if called multiple times.
func (*ServerStreamWrapper) SendMsg ¶
func (wrapper *ServerStreamWrapper) SendMsg(m any) error
SendMsg sends a message. On error, SendMsg aborts the stream and the error is returned directly.
SendMsg blocks until:
- There is sufficient flow control to schedule m with the transport, or
- The stream is done, or
- The stream breaks.
SendMsg does not wait until the message is received by the client. An untimely stream closure may result in lost messages.
It is safe to have a goroutine calling SendMsg and another goroutine calling RecvMsg on the same stream at the same time, but it is not safe to call SendMsg on the same stream in different goroutines.
func (*ServerStreamWrapper) SetHeader ¶
func (wrapper *ServerStreamWrapper) SetHeader(md metadata.MD) error
SetHeader sets the header metadata.
It may be called multiple times. When called multiple times, all the provided metadata will be merged. All the metadata will be sent out when one of the following happens:
- ServerStream.SendHeader() is called;
- The first response is sent out;
- An RPC status is sent out (error or success).
func (*ServerStreamWrapper) SetTrailer ¶
func (wrapper *ServerStreamWrapper) SetTrailer(md metadata.MD)
SetTrailer sets the trailer metadata which will be sent with the RPC status. When called more than once, all the provided metadata will be merged.
type SingleBackend ¶
    type SingleBackend struct {
        // GetConn returns a grpc connection to the backend.
        //
        // The context returned from this function should be the context for the *outgoing* (to backend) call. In case you want
        // to forward any Metadata between the inbound request and outbound requests, you should do it manually. However, you
        // *must* propagate the cancel function (`context.WithCancel`) of the inbound context to the one returned.
        GetConn func(ctx context.Context) (context.Context, *grpc.ClientConn, error)
    }
SingleBackend is a simple wrapper around a get-connection function for one-to-one proxying.
It implements the Backend interface, so it can be returned from a StreamDirector wherever a Backend is expected.
func (*SingleBackend) AppendInfo ¶
func (sb *SingleBackend) AppendInfo(_ bool, resp []byte) ([]byte, error)
AppendInfo is called to enhance response from the backend with additional data.
func (*SingleBackend) BuildError ¶
func (sb *SingleBackend) BuildError(bool, error) ([]byte, error)
BuildError is called to convert error from upstream into response field.
func (*SingleBackend) GetConnection ¶
func (sb *SingleBackend) GetConnection(ctx context.Context, _ string) (context.Context, *grpc.ClientConn, error)
GetConnection returns a grpc connection to the backend.
func (*SingleBackend) String ¶
func (sb *SingleBackend) String() string
type StreamDirector ¶
StreamDirector returns a list of Backend objects to forward the call to.
There are two proxying modes:
- one to one: StreamDirector returns a single Backend object - proxying is done verbatim, Backend.AppendInfo might be used to enhance response with source information (or it might be skipped).
- one to many: StreamDirector returns more than one Backend object - for unary calls responses from Backend objects are aggregated by concatenating protobuf responses (requires top-level `repeated` protobuf definition) and errors are wrapped as responses via BuildError. Responses are potentially enhanced via AppendInfo.
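The reason the one-to-many mode needs a top-level `repeated` field is that concatenating serialized protobuf messages merges them, and repeated fields are merged by appending elements. The sketch below demonstrates this with hand-written wire-format bytes and the standard library only. The schema (`repeated` length-delimited field number 1) and the helper names `encodeEntry` and `decodeEntries` are assumptions for this example.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeEntry encodes one element of a hypothetical repeated field number 1.
func encodeEntry(payload string) []byte {
	out := []byte{1<<3 | 2} // tag: field 1, wire type 2 (length-delimited)
	out = binary.AppendUvarint(out, uint64(len(payload)))
	return append(out, payload...)
}

// decodeEntries reads every length-delimited field 1 element from msg.
func decodeEntries(msg []byte) ([]string, error) {
	var entries []string
	for len(msg) > 0 {
		if msg[0] != 1<<3|2 {
			return nil, fmt.Errorf("unexpected tag %#x", msg[0])
		}
		size, n := binary.Uvarint(msg[1:])
		if n <= 0 || uint64(len(msg)-1-n) < size {
			return nil, fmt.Errorf("truncated entry")
		}
		start := 1 + n
		entries = append(entries, string(msg[start:start+int(size)]))
		msg = msg[start+int(size):]
	}
	return entries, nil
}

func main() {
	// Each backend answers with a message holding a single entry.
	fromA := encodeEntry("reply from backend A")
	fromB := encodeEntry("reply from backend B")

	// Aggregation is plain byte concatenation.
	merged := append(append([]byte(nil), fromA...), fromB...)

	entries, err := decodeEntries(merged)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(entries), entries)
}
```

With a non-repeated top-level field, the same concatenation would make later responses overwrite earlier ones when decoded, which is why the response type has to be designed for aggregation.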
The presence of the `Context` allows for rich filtering, e.g. based on Metadata (headers). If no handling is meant to be done, a `codes.Unimplemented` gRPC error should be returned.
Note that the StreamDirector is invoked *after* all server-side stream interceptors have run, so decisions around authorization, monitoring, etc. are better handled in the interceptors.
See the example below.
Example ¶
Provides a simple example of a director that shields internal services and dials a staging or production backend. This is a *very naive* implementation that creates a new connection on every request. Consider using pooling.
    package main

    import (
        "context"
        "log"
        "strings"

        "google.golang.org/grpc"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/credentials/insecure"
        "google.golang.org/grpc/metadata"
        "google.golang.org/grpc/status"

        "github.com/siderolabs/grpc-proxy/proxy"
    )

    var director proxy.StreamDirector

    func main() {
        simpleBackendGen := func(hostname string) (proxy.Backend, error) {
            conn, err := grpc.NewClient(
                hostname,
                grpc.WithDefaultCallOptions(grpc.ForceCodecV2(proxy.Codec())),
                grpc.WithTransportCredentials(insecure.NewCredentials()),
            )
            if err != nil {
                return nil, err
            }

            return &proxy.SingleBackend{
                GetConn: func(ctx context.Context) (context.Context, *grpc.ClientConn, error) {
                    md, _ := metadata.FromIncomingContext(ctx)

                    // Copy the inbound metadata explicitly.
                    outCtx := metadata.NewOutgoingContext(ctx, md.Copy())

                    return outCtx, conn, nil
                },
            }, nil
        }

        stagingBackend, err := simpleBackendGen("api-service.staging.svc.local")
        if err != nil {
            log.Fatal("failed to create staging backend:", err)
        }

        prodBackend, err := simpleBackendGen("api-service.prod.svc.local")
        if err != nil {
            log.Fatal("failed to create production backend:", err)
        }

        director = func(ctx context.Context, fullMethodName string) (proxy.Mode, []proxy.Backend, error) {
            // Make sure we never forward internal services.
            if strings.HasPrefix(fullMethodName, "/com.example.internal.") {
                return proxy.One2One, nil, status.Errorf(codes.Unimplemented, "Unknown method")
            }

            md, ok := metadata.FromIncomingContext(ctx)
            if ok {
                // Decide on which backend to dial
                if val, exists := md[":authority"]; exists && val[0] == "staging.api.example.com" {
                    return proxy.One2One, []proxy.Backend{stagingBackend}, nil
                } else if val, exists := md[":authority"]; exists && val[0] == "api.example.com" {
                    return proxy.One2One, []proxy.Backend{prodBackend}, nil
                }
            }

            return proxy.One2One, nil, status.Errorf(codes.Unimplemented, "Unknown method")
        }
    }
type StreamedDetectorFunc ¶
StreamedDetectorFunc reports whether a gRPC method is streamed (only relevant for one2many proxying).