Documentation ¶
Overview ¶
Package grpctunnel provides tools to support tunneling of gRPC services: carrying gRPC calls over a gRPC stream.
This support includes "pinning" an RPC channel to a single server, by sending all requests on a single gRPC stream. There are also tools for adapting certain kinds of bidirectional stream RPCs into a stub such that a single stream looks like a sequence of unary calls.
This support also includes "reverse services", where a client can initiate a connection to a server and subsequently the server can then wrap that connection with an RPC stub, used to send requests from the server to that client (and the client then replies and sends responses back to the server).
Index ¶
- Variables
- func RegisterRgrpcServiceServer(s grpc.ServiceRegistrar, srv RgrpcServiceServer)
- func Serve(stream RgrpcService_OpenRgrpcClient, handlers HandlerMap, ...) error
- type Client
- func (c *Client) Close()
- func (c *Client) Context() context.Context
- func (c *Client) Done() <-chan struct{}
- func (c *Client) Err() error
- func (c *Client) Invoke(ctx context.Context, methodName string, req, resp interface{}, ...) error
- func (c *Client) IsDone() bool
- func (c *Client) NewStream(ctx context.Context, desc *grpc.StreamDesc, methodName string, ...) (grpc.ClientStream, error)
- type ClientToServer
- func (*ClientToServer) Descriptor() ([]byte, []int)deprecated
- func (x *ClientToServer) GetCancel() *empty.Empty
- func (m *ClientToServer) GetFrame() isClientToServer_Frame
- func (x *ClientToServer) GetHalfClose() *empty.Empty
- func (x *ClientToServer) GetMoreRequestData() []byte
- func (x *ClientToServer) GetNewStream() *NewStream
- func (x *ClientToServer) GetRequestMessage() *MessageData
- func (x *ClientToServer) GetStreamId() int64
- func (*ClientToServer) ProtoMessage()
- func (x *ClientToServer) ProtoReflect() protoreflect.Message
- func (x *ClientToServer) Reset()
- func (x *ClientToServer) String() string
- type ClientToServer_Cancel
- type ClientToServer_HalfClose
- type ClientToServer_MoreRequestData
- type ClientToServer_NewStream
- type ClientToServer_RequestMessage
- type CloseStream
- func (*CloseStream) Descriptor() ([]byte, []int)deprecated
- func (x *CloseStream) GetResponseTrailers() *Metadata
- func (x *CloseStream) GetStatus() *status.Status
- func (*CloseStream) ProtoMessage()
- func (x *CloseStream) ProtoReflect() protoreflect.Message
- func (x *CloseStream) Reset()
- func (x *CloseStream) String() string
- type HandlerMap
- type MessageData
- type Metadata
- type Metadata_Values
- type NewStream
- func (*NewStream) Descriptor() ([]byte, []int)deprecated
- func (x *NewStream) GetMethodName() string
- func (x *NewStream) GetRequestHeaders() *Metadata
- func (*NewStream) ProtoMessage()
- func (x *NewStream) ProtoReflect() protoreflect.Message
- func (x *NewStream) Reset()
- func (x *NewStream) String() string
- type RgrpcServiceClient
- type RgrpcServiceServer
- type RgrpcService_OpenRgrpcClient
- type RgrpcService_OpenRgrpcServer
- type ServerChannel
- type ServerToClient
- func (*ServerToClient) Descriptor() ([]byte, []int)deprecated
- func (x *ServerToClient) GetCloseStream() *CloseStream
- func (m *ServerToClient) GetFrame() isServerToClient_Frame
- func (x *ServerToClient) GetMoreResponseData() []byte
- func (x *ServerToClient) GetResponseHeaders() *Metadata
- func (x *ServerToClient) GetResponseMessage() *MessageData
- func (x *ServerToClient) GetStreamId() int64
- func (*ServerToClient) ProtoMessage()
- func (x *ServerToClient) ProtoReflect() protoreflect.Message
- func (x *ServerToClient) Reset()
- func (x *ServerToClient) String() string
- type ServerToClient_CloseStream
- type ServerToClient_MoreResponseData
- type ServerToClient_ResponseHeaders
- type ServerToClient_ResponseMessage
- type UnimplementedRgrpcServiceServer
- type UnsafeRgrpcServiceServer
Constants ¶
This section is empty.
Variables ¶
var File_rgrpc_proto protoreflect.FileDescriptor
var RgrpcService_ServiceDesc = grpc.ServiceDesc{ ServiceName: "rgrpc.RgrpcService", HandlerType: (*RgrpcServiceServer)(nil), Methods: []grpc.MethodDesc{}, Streams: []grpc.StreamDesc{ { StreamName: "OpenRgrpc", Handler: _RgrpcService_OpenRgrpc_Handler, ServerStreams: true, ClientStreams: true, }, }, Metadata: "rgrpc.proto", }
RgrpcService_ServiceDesc is the grpc.ServiceDesc for RgrpcService service. It's only intended for direct use with grpc.RegisterService, and not to be introspected or modified (even as a copy)
Functions ¶
func RegisterRgrpcServiceServer ¶
func RegisterRgrpcServiceServer(s grpc.ServiceRegistrar, srv RgrpcServiceServer)
func Serve ¶ added in v0.2.0
func Serve(stream RgrpcService_OpenRgrpcClient, handlers HandlerMap, isClosing func() bool) error
ServeChannel uses the given services to handle incoming RPC requests that arrive via the given client tunnel stream. Since this is a reverse tunnel, RPC requests are initiated by the server, and this end (the client) processes the requests and sends responses.
It returns if, in the process of reading requests, it detects invalid usage of the stream (client sending references to invalid stream IDs or sending frames for a stream ID in improper order) or if the stream itself fails (for example, if the client cancels the tunnel or there is a network disruption).
On return the provided stream should be canceled as soon as possible. Typical usage looks like so:
ctx, cancel := context.WithCancel(ctx) defer cancel() stream, err := stub.OpenRgrpc(ctx) if err != nil { return err } return rgrpc.Serve(stream, handlers)
Types ¶
type Client ¶ added in v0.2.0
type Client struct { Peer *peer.Peer RequestHeaders metadata.MD // contains filtered or unexported fields }
func NewClient ¶ added in v0.2.0
func NewClient(stream RgrpcService_OpenRgrpcServer) *Client
func (*Client) NewStream ¶ added in v0.2.0
func (c *Client) NewStream(ctx context.Context, desc *grpc.StreamDesc, methodName string, opts ...grpc.CallOption) (grpc.ClientStream, error)
type ClientToServer ¶
type ClientToServer struct { // The ID of the stream. Stream IDs must be used in increasing order and // cannot be re-used. Unlike in the HTTP/2 protocol, the stream ID is 64-bit // so overflow in a long-lived channel is excessively unlikely. (If the // channel were used for a stream every nanosecond, it would take close to // 300 years to exhaust every ID and reach an overflow situation.) StreamId int64 `protobuf:"varint,1,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"` // Types that are assignable to Frame: // *ClientToServer_NewStream // *ClientToServer_RequestMessage // *ClientToServer_MoreRequestData // *ClientToServer_HalfClose // *ClientToServer_Cancel Frame isClientToServer_Frame `protobuf_oneof:"frame"` // contains filtered or unexported fields }
ClientToServer is the message a client sends to a server.
For a single stream ID, the first such message must include the new_stream field. After that, there can be any number of requests sent, via the request_message field and additional messages thereafter that use the more_request_data field (for requests that are larger than 16kb). And finally, the RPC ends with either the half_close or cancel fields. If the half_close field is used, the RPC stream remains active so the server may continue to send response data. But, if the cancel field is used, the RPC stream is aborted and thus closed on both client and server ends. If a stream has been half-closed, the only allowed message from the client for that stream ID is one with the cancel field, to abort the remainder of the operation.
func (*ClientToServer) Descriptor
deprecated
func (*ClientToServer) Descriptor() ([]byte, []int)
Deprecated: Use ClientToServer.ProtoReflect.Descriptor instead.
func (*ClientToServer) GetCancel ¶
func (x *ClientToServer) GetCancel() *empty.Empty
func (*ClientToServer) GetFrame ¶
func (m *ClientToServer) GetFrame() isClientToServer_Frame
func (*ClientToServer) GetHalfClose ¶
func (x *ClientToServer) GetHalfClose() *empty.Empty
func (*ClientToServer) GetMoreRequestData ¶
func (x *ClientToServer) GetMoreRequestData() []byte
func (*ClientToServer) GetNewStream ¶
func (x *ClientToServer) GetNewStream() *NewStream
func (*ClientToServer) GetRequestMessage ¶
func (x *ClientToServer) GetRequestMessage() *MessageData
func (*ClientToServer) GetStreamId ¶
func (x *ClientToServer) GetStreamId() int64
func (*ClientToServer) ProtoMessage ¶
func (*ClientToServer) ProtoMessage()
func (*ClientToServer) ProtoReflect ¶
func (x *ClientToServer) ProtoReflect() protoreflect.Message
func (*ClientToServer) Reset ¶
func (x *ClientToServer) Reset()
func (*ClientToServer) String ¶
func (x *ClientToServer) String() string
type ClientToServer_Cancel ¶
type ClientToServer_HalfClose ¶
type ClientToServer_HalfClose struct { // Half-closes the stream, signaling that no more request messages will // be sent. No other messages, other than one with the cancel field set, // should be sent for this stream (at least not until it is terminated // by the server, after which the ID can be re-used). HalfClose *empty.Empty `protobuf:"bytes,5,opt,name=half_close,json=halfClose,proto3,oneof"` }
type ClientToServer_MoreRequestData ¶
type ClientToServer_MoreRequestData struct { // Sends a chunk of request data, for a request message that could not // wholly fit in a request_message field (e.g. > 16kb). MoreRequestData []byte `protobuf:"bytes,4,opt,name=more_request_data,json=moreRequestData,proto3,oneof"` }
type ClientToServer_NewStream ¶
type ClientToServer_NewStream struct { // Creates a new RPC stream, which includes request header metadata. The // stream ID must not be an already active stream. NewStream *NewStream `protobuf:"bytes,2,opt,name=new_stream,json=newStream,proto3,oneof"` }
type ClientToServer_RequestMessage ¶
type ClientToServer_RequestMessage struct { // Sends a message on the RPC stream. If the message is larger than 16k, // the rest of the message should be sent in chunks using the // more_request_data field (up to 16kb of data in each chunk). RequestMessage *MessageData `protobuf:"bytes,3,opt,name=request_message,json=requestMessage,proto3,oneof"` }
type CloseStream ¶
type CloseStream struct { ResponseTrailers *Metadata `protobuf:"bytes,1,opt,name=response_trailers,json=responseTrailers,proto3" json:"response_trailers,omitempty"` Status *status.Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` // contains filtered or unexported fields }
func (*CloseStream) Descriptor
deprecated
func (*CloseStream) Descriptor() ([]byte, []int)
Deprecated: Use CloseStream.ProtoReflect.Descriptor instead.
func (*CloseStream) GetResponseTrailers ¶
func (x *CloseStream) GetResponseTrailers() *Metadata
func (*CloseStream) GetStatus ¶
func (x *CloseStream) GetStatus() *status.Status
func (*CloseStream) ProtoMessage ¶
func (*CloseStream) ProtoMessage()
func (*CloseStream) ProtoReflect ¶
func (x *CloseStream) ProtoReflect() protoreflect.Message
func (*CloseStream) Reset ¶
func (x *CloseStream) Reset()
func (*CloseStream) String ¶
func (x *CloseStream) String() string
type HandlerMap ¶
type HandlerMap map[string]*serviceInfo
HandlerMap is used to accumulate service handlers into a map. The handlers can be registered once in the map, and then re-used to configure multiple servers that should expose the same handlers. HandlerMap can also be used as the internal store of registered handlers for a server implementation.
func (HandlerMap) GetServiceInfo ¶
func (m HandlerMap) GetServiceInfo() map[string]grpc.ServiceInfo
GetServiceInfo returns a map from service names to ServiceInfo. Service names include the package names, in the form of <package>.<service>.
func (HandlerMap) RegisterService ¶
func (m HandlerMap) RegisterService(desc *grpc.ServiceDesc, ss interface{})
RegisterService registers a service and its implementation to the gRPC server. It is called from the IDL generated code. This must be called before invoking Serve. If ss is non-nil (for legacy code), its type is checked to ensure it implements sd.HandlerType.
type MessageData ¶
type MessageData struct { // The full size of the message. Size int32 `protobuf:"varint,1,opt,name=size,proto3" json:"size,omitempty"` // The message data. This field should not be longer than 16kb (16,384 // bytes). If the full size of the message is larger then it should be // split into multiple chunks. The chunking is done to allow multiple // access to the underlying gRPC stream by concurrent tunneled streams. // If very large messages were sent via a single chunk, it could cause // head-of-line blocking and starvation when multiple streams need to send // data on the one underlying gRPC stream. Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` // contains filtered or unexported fields }
func (*MessageData) Descriptor
deprecated
func (*MessageData) Descriptor() ([]byte, []int)
Deprecated: Use MessageData.ProtoReflect.Descriptor instead.
func (*MessageData) GetData ¶
func (x *MessageData) GetData() []byte
func (*MessageData) GetSize ¶
func (x *MessageData) GetSize() int32
func (*MessageData) ProtoMessage ¶
func (*MessageData) ProtoMessage()
func (*MessageData) ProtoReflect ¶
func (x *MessageData) ProtoReflect() protoreflect.Message
func (*MessageData) Reset ¶
func (x *MessageData) Reset()
func (*MessageData) String ¶
func (x *MessageData) String() string
type Metadata ¶
type Metadata struct { Md map[string]*Metadata_Values `` /* 145-byte string literal not displayed */ // contains filtered or unexported fields }
func (*Metadata) Descriptor
deprecated
func (*Metadata) GetMd ¶
func (x *Metadata) GetMd() map[string]*Metadata_Values
func (*Metadata) ProtoMessage ¶
func (*Metadata) ProtoMessage()
func (*Metadata) ProtoReflect ¶
func (x *Metadata) ProtoReflect() protoreflect.Message
type Metadata_Values ¶
type Metadata_Values struct { Val []string `protobuf:"bytes,1,rep,name=val,proto3" json:"val,omitempty"` // contains filtered or unexported fields }
func (*Metadata_Values) Descriptor
deprecated
func (*Metadata_Values) Descriptor() ([]byte, []int)
Deprecated: Use Metadata_Values.ProtoReflect.Descriptor instead.
func (*Metadata_Values) GetVal ¶
func (x *Metadata_Values) GetVal() []string
func (*Metadata_Values) ProtoMessage ¶
func (*Metadata_Values) ProtoMessage()
func (*Metadata_Values) ProtoReflect ¶
func (x *Metadata_Values) ProtoReflect() protoreflect.Message
func (*Metadata_Values) Reset ¶
func (x *Metadata_Values) Reset()
func (*Metadata_Values) String ¶
func (x *Metadata_Values) String() string
type NewStream ¶
type NewStream struct { MethodName string `protobuf:"bytes,1,opt,name=method_name,json=methodName,proto3" json:"method_name,omitempty"` RequestHeaders *Metadata `protobuf:"bytes,2,opt,name=request_headers,json=requestHeaders,proto3" json:"request_headers,omitempty"` // contains filtered or unexported fields }
func (*NewStream) Descriptor
deprecated
func (*NewStream) GetMethodName ¶
func (*NewStream) GetRequestHeaders ¶
func (*NewStream) ProtoMessage ¶
func (*NewStream) ProtoMessage()
func (*NewStream) ProtoReflect ¶
func (x *NewStream) ProtoReflect() protoreflect.Message
type RgrpcServiceClient ¶
type RgrpcServiceClient interface { // OpenRgrpc creates a "reverse" channel, which allows the server to // act as a client and send RPCs to the client that creates the tunnel. It // is in most respects identical to OpenTunnel except that the roles are // reversed: the server initiates RPCs and sends requests and the client // replies to them and sends responses. OpenRgrpc(ctx context.Context, opts ...grpc.CallOption) (RgrpcService_OpenRgrpcClient, error) }
RgrpcServiceClient is the client API for RgrpcService service.
For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
func NewRgrpcServiceClient ¶
func NewRgrpcServiceClient(cc grpc.ClientConnInterface) RgrpcServiceClient
type RgrpcServiceServer ¶
type RgrpcServiceServer interface { // OpenRgrpc creates a "reverse" channel, which allows the server to // act as a client and send RPCs to the client that creates the tunnel. It // is in most respects identical to OpenTunnel except that the roles are // reversed: the server initiates RPCs and sends requests and the client // replies to them and sends responses. OpenRgrpc(RgrpcService_OpenRgrpcServer) error // contains filtered or unexported methods }
RgrpcServiceServer is the server API for RgrpcService service. All implementations must embed UnimplementedRgrpcServiceServer for forward compatibility
type RgrpcService_OpenRgrpcClient ¶
type RgrpcService_OpenRgrpcClient interface { Send(*ServerToClient) error Recv() (*ClientToServer, error) grpc.ClientStream }
type RgrpcService_OpenRgrpcServer ¶
type RgrpcService_OpenRgrpcServer interface { Send(*ClientToServer) error Recv() (*ServerToClient, error) grpc.ServerStream }
type ServerChannel ¶
type ServerChannel struct {
// contains filtered or unexported fields
}
type ServerToClient ¶
type ServerToClient struct { // The ID of the stream. Stream IDs are defined by the client and should be // used in monotonically increasing order. They cannot be re-used. Unlike // HTTP/2, the ID is 64-bit, so overflow/re-use should not be an issue. (If // the channel were used for a stream every nanosecond, it would take close // to 300 years to exhaust every ID and reach an overflow situation.) StreamId int64 `protobuf:"varint,1,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"` // Types that are assignable to Frame: // *ServerToClient_ResponseHeaders // *ServerToClient_ResponseMessage // *ServerToClient_MoreResponseData // *ServerToClient_CloseStream Frame isServerToClient_Frame `protobuf_oneof:"frame"` // contains filtered or unexported fields }
ServerToClient is the message a server sends to a client.
For a single stream ID, the first such message should include the response_headers field unless no headers are to be sent. After the headers, the server can send any number of responses, via the response_message field and additional messages thereafter that use the more_response_data field (for responses that are larger than 16kb). A message with the close_stream field concludes the stream, whether it terminates successfully or with an error.
func (*ServerToClient) Descriptor
deprecated
func (*ServerToClient) Descriptor() ([]byte, []int)
Deprecated: Use ServerToClient.ProtoReflect.Descriptor instead.
func (*ServerToClient) GetCloseStream ¶
func (x *ServerToClient) GetCloseStream() *CloseStream
func (*ServerToClient) GetFrame ¶
func (m *ServerToClient) GetFrame() isServerToClient_Frame
func (*ServerToClient) GetMoreResponseData ¶
func (x *ServerToClient) GetMoreResponseData() []byte
func (*ServerToClient) GetResponseHeaders ¶
func (x *ServerToClient) GetResponseHeaders() *Metadata
func (*ServerToClient) GetResponseMessage ¶
func (x *ServerToClient) GetResponseMessage() *MessageData
func (*ServerToClient) GetStreamId ¶
func (x *ServerToClient) GetStreamId() int64
func (*ServerToClient) ProtoMessage ¶
func (*ServerToClient) ProtoMessage()
func (*ServerToClient) ProtoReflect ¶
func (x *ServerToClient) ProtoReflect() protoreflect.Message
func (*ServerToClient) Reset ¶
func (x *ServerToClient) Reset()
func (*ServerToClient) String ¶
func (x *ServerToClient) String() string
type ServerToClient_CloseStream ¶
type ServerToClient_CloseStream struct { // Terminates the stream and communicates the final disposition to the // client. After the stream is closed, no other messages should use the // given stream ID until the ID is re-used (e.g. a NewStream message is // received that creates another stream with the same ID). CloseStream *CloseStream `protobuf:"bytes,5,opt,name=close_stream,json=closeStream,proto3,oneof"` }
type ServerToClient_MoreResponseData ¶
type ServerToClient_MoreResponseData struct { // Sends a chunk of response data, for a response message that could not // wholly fit in a response_message field (e.g. > 16kb). MoreResponseData []byte `protobuf:"bytes,4,opt,name=more_response_data,json=moreResponseData,proto3,oneof"` }
type ServerToClient_ResponseHeaders ¶
type ServerToClient_ResponseHeaders struct { // Sends response headers for this stream. If headers are sent at all, // they must be sent before any response message data. ResponseHeaders *Metadata `protobuf:"bytes,2,opt,name=response_headers,json=responseHeaders,proto3,oneof"` }
type ServerToClient_ResponseMessage ¶
type ServerToClient_ResponseMessage struct { // Sends a message on the RPC stream. If the message is larger than 16k, // the rest of the message should be sent in chunks using the // more_response_data field (up to 16kb of data in each chunk). ResponseMessage *MessageData `protobuf:"bytes,3,opt,name=response_message,json=responseMessage,proto3,oneof"` }
type UnimplementedRgrpcServiceServer ¶
type UnimplementedRgrpcServiceServer struct { }
UnimplementedRgrpcServiceServer must be embedded to have forward compatible implementations.
func (UnimplementedRgrpcServiceServer) OpenRgrpc ¶
func (UnimplementedRgrpcServiceServer) OpenRgrpc(RgrpcService_OpenRgrpcServer) error
type UnsafeRgrpcServiceServer ¶
type UnsafeRgrpcServiceServer interface {
// contains filtered or unexported methods
}
UnsafeRgrpcServiceServer may be embedded to opt out of forward compatibility for this service. Use of this interface is not recommended, as added methods to RgrpcServiceServer will result in compilation errors.