rpcstream

package
v0.21.2 Latest
Warning

This package is not in the latest version of its module.

Published: Sep 25, 2023 License: MIT Imports: 12 Imported by: 1

README

RPC Stream

This package implements running an RPC service on top of another RPC service.

The "host" service has a signature like:

syntax = "proto3";
package mypackage;

import "github.com/aperturerobotics/starpc/rpcstream/rpcstream.proto";

// HostService proxies RPC calls to a target Mux.
service HostService {
  // MyRpc opens a stream to proxy an RPC call.
  rpc MyRpc(stream .rpcstream.RpcStreamPacket) returns (stream .rpcstream.RpcStreamPacket);
}

NewRpcStreamOpenStream(hostService.MyRpc, componentID, waitAck) constructs a new OpenStreamFunc which starts an RPC call to MyRpc and forwards the starpc packets over the two-way stream.

The component ID can be used to determine which Mux the client should access.
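The handshake described above can be sketched without the real library. The following is a minimal illustration, not the package's implementation: `packet`, `handler`, `host`, and `call` are hypothetical stand-ins for RpcStreamPacket, a Mux, the host service, and the client side, and Go channels stand in for the two-way stream. The initiator sends an init packet carrying the component ID, the host replies with an ack, and data packets flow afterwards.

```go
package main

import (
	"errors"
	"fmt"
)

// packet is a stand-in for RpcStreamPacket: exactly one field is set.
type packet struct {
	initComponentID *string // init: first packet, sent by the initiator
	ackError        *string // ack: reply to init, sent by the host
	data            []byte  // data: encapsulated payload
}

// handler is a stand-in for the Mux that serves one component.
type handler func(req []byte) []byte

// host reads the init packet, looks up the component, sends an ack,
// then answers every data packet with the handler's response.
func host(in <-chan packet, out chan<- packet, muxes map[string]handler) error {
	defer close(out)
	first, ok := <-in
	if !ok || first.initComponentID == nil {
		return errors.New("expected init packet")
	}
	h, found := muxes[*first.initComponentID]
	ackErr := ""
	if !found {
		ackErr = "no server for that component"
	}
	out <- packet{ackError: &ackErr}
	if !found {
		return errors.New(ackErr)
	}
	for p := range in {
		out <- packet{data: h(p.data)}
	}
	return nil
}

// call opens a stream for componentID, waits for the ack, and sends one request.
func call(componentID string, req []byte, muxes map[string]handler) ([]byte, error) {
	toHost, fromHost := make(chan packet), make(chan packet)
	go host(toHost, fromHost, muxes)
	defer close(toHost)

	toHost <- packet{initComponentID: &componentID}
	ack, ok := <-fromHost
	if !ok || ack.ackError == nil {
		return nil, errors.New("unexpected packet")
	}
	if *ack.ackError != "" {
		return nil, errors.New(*ack.ackError)
	}
	toHost <- packet{data: req}
	resp, ok := <-fromHost
	if !ok {
		return nil, errors.New("stream closed before response")
	}
	return resp.data, nil
}

func main() {
	muxes := map[string]handler{
		"echo": func(req []byte) []byte { return req },
	}
	resp, err := call("echo", []byte("hello"), muxes)
	fmt.Println(string(resp), err)
	_, err = call("missing", nil, muxes)
	fmt.Println(err)
}
```

Waiting for the ack before sending data corresponds to passing waitAck as true; with waitAck false the initiator would send data without first confirming that the component exists.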

Documentation

Constants

This section is empty.

Variables

var (
	// ErrNoServerForComponent is returned if the getter returns nil.
	ErrNoServerForComponent = errors.New("no server for that component")
	// ErrUnexpectedPacket is returned if the packet was unexpected.
	ErrUnexpectedPacket = errors.New("unexpected rpcstream packet")
)
var (
	ErrInvalidLength        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflow          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroup = fmt.Errorf("proto: unexpected end of group")
)
var File_github_com_aperturerobotics_starpc_rpcstream_rpcstream_proto protoreflect.FileDescriptor

Functions

func HandleProxyRpcStream added in v0.19.0

func HandleProxyRpcStream[T RpcStream](stream RpcStream, getter RpcProxyGetter[T]) error

HandleProxyRpcStream handles an incoming RPC stream by proxying it to the remote rpcstream call returned by the getter.

func HandleRawRpcStream added in v0.19.0

func HandleRawRpcStream(stream RpcStream, getter RpcRawGetter) error

HandleRawRpcStream handles an incoming RPC stream proxying to a ReadWriteCloser.

func HandleRpcStream

func HandleRpcStream(stream RpcStream, getter RpcStreamGetter) error

HandleRpcStream handles an incoming RPC stream (remote is the initiator).

func NewRpcStreamClient added in v0.13.2

func NewRpcStreamClient[T RpcStream](rpcCaller RpcStreamCaller[T], componentID string, waitAck bool) srpc.Client

NewRpcStreamClient constructs a Client which opens streams with an RpcStream.

If waitAck is set, OpenStream waits for an acknowledgment from the remote.

func NewRpcStreamOpenStream

func NewRpcStreamOpenStream[T RpcStream](rpcCaller RpcStreamCaller[T], componentID string, waitAck bool) srpc.OpenStreamFunc

NewRpcStreamOpenStream constructs an OpenStream function with an RpcStream.

If waitAck is set, OpenStream waits for an acknowledgment from the remote.

func ReadPump added in v0.19.0

func ReadPump(strm RpcStream, cb srpc.PacketDataHandler, closed srpc.CloseHandler)

ReadPump executes the read pump in a goroutine.

Calls the closed handler when the stream closes or a read returns an error.

func ReadToHandler added in v0.19.0

func ReadToHandler(strm RpcStream, cb srpc.PacketDataHandler) error

ReadToHandler reads data to the given handler. It does not handle closing the stream; use ReadPump for that.
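The relationship between the two functions can be sketched as follows. This is a minimal illustration under stated assumptions, not the package's source: `recvStream`, `sliceStream`, `readToHandler`, `readPump`, `collect`, and `errStreamEnded` are hypothetical stand-ins, and the callbacks are plain functions rather than srpc.PacketDataHandler and srpc.CloseHandler. The loop forwards each received payload to the data callback, and the pump reports the terminating error to the close callback.

```go
package main

import (
	"errors"
	"fmt"
)

// errStreamEnded is a hypothetical sentinel returned once the stream is drained.
var errStreamEnded = errors.New("stream ended")

// recvStream is a stand-in for the receive half of RpcStream.
type recvStream interface {
	Recv() ([]byte, error)
}

// sliceStream yields a fixed list of payloads, then errStreamEnded.
type sliceStream struct{ packets [][]byte }

func (s *sliceStream) Recv() ([]byte, error) {
	if len(s.packets) == 0 {
		return nil, errStreamEnded
	}
	next := s.packets[0]
	s.packets = s.packets[1:]
	return next, nil
}

// readToHandler forwards payloads to cb until Recv or cb returns an error.
// It does not notify anyone that the stream has closed.
func readToHandler(strm recvStream, cb func(data []byte) error) error {
	for {
		data, err := strm.Recv()
		if err != nil {
			return err
		}
		if err := cb(data); err != nil {
			return err
		}
	}
}

// readPump runs readToHandler and then reports the final error to closed.
func readPump(strm recvStream, cb func(data []byte) error, closed func(err error)) {
	err := readToHandler(strm, cb)
	if closed != nil {
		closed(err)
	}
}

// collect runs readPump in a goroutine and waits for the close callback.
func collect(packets ...string) (got []string, closeErr error) {
	strm := &sliceStream{}
	for _, p := range packets {
		strm.packets = append(strm.packets, []byte(p))
	}
	done := make(chan error, 1)
	go readPump(strm, func(data []byte) error {
		got = append(got, string(data))
		return nil
	}, func(err error) { done <- err })
	closeErr = <-done
	return got, closeErr
}

func main() {
	got, err := collect("a", "b")
	fmt.Println(got, err)
}
```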

Types

type RpcAck added in v0.5.0

type RpcAck struct {

	// Error indicates there was some error setting up the stream.
	Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
	// contains filtered or unexported fields
}

RpcAck is the ack message in an RPC stream.

func (*RpcAck) CloneMessageVT added in v0.18.3

func (m *RpcAck) CloneMessageVT() proto.Message

func (*RpcAck) CloneVT added in v0.12.3

func (m *RpcAck) CloneVT() *RpcAck

func (*RpcAck) Descriptor deprecated added in v0.5.0

func (*RpcAck) Descriptor() ([]byte, []int)

Deprecated: Use RpcAck.ProtoReflect.Descriptor instead.

func (*RpcAck) EqualMessageVT added in v0.18.3

func (this *RpcAck) EqualMessageVT(thatMsg proto.Message) bool

func (*RpcAck) EqualVT added in v0.5.0

func (this *RpcAck) EqualVT(that *RpcAck) bool

func (*RpcAck) GetError added in v0.5.0

func (x *RpcAck) GetError() string

func (*RpcAck) MarshalToSizedBufferVT added in v0.5.0

func (m *RpcAck) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*RpcAck) MarshalToVT added in v0.5.0

func (m *RpcAck) MarshalToVT(dAtA []byte) (int, error)

func (*RpcAck) MarshalVT added in v0.5.0

func (m *RpcAck) MarshalVT() (dAtA []byte, err error)

func (*RpcAck) ProtoMessage added in v0.5.0

func (*RpcAck) ProtoMessage()

func (*RpcAck) ProtoReflect added in v0.5.0

func (x *RpcAck) ProtoReflect() protoreflect.Message

func (*RpcAck) Reset added in v0.5.0

func (x *RpcAck) Reset()

func (*RpcAck) SizeVT added in v0.5.0

func (m *RpcAck) SizeVT() (n int)

func (*RpcAck) String added in v0.5.0

func (x *RpcAck) String() string

func (*RpcAck) UnmarshalVT added in v0.5.0

func (m *RpcAck) UnmarshalVT(dAtA []byte) error

type RpcProxyGetter added in v0.19.0

type RpcProxyGetter[T RpcStream] func(ctx context.Context, componentID string) (
	caller RpcStreamCaller[T],
	callerComponentID string,
	rel func(),
	err error,
)

RpcProxyGetter returns a remote rpcstream call to proxy to. Returns the component ID to pass to the caller.

Returns a release function to call when done with the stream. The caller will cancel the context and close the rpc when done. Returns nil, "", nil, nil if not found.

type RpcRawGetter added in v0.19.0

type RpcRawGetter func(ctx context.Context, componentID string) (io.ReadWriteCloser, func(), error)

RpcRawGetter returns a read/write/closer to proxy data to/from. Returns a release function to call when done with the stream. The caller will call stream.Close as well as the release function (if any). Returns nil, nil, nil if not found.
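A getter with this shape can be written using only the standard library. The sketch below is illustrative: `rawGetter` mirrors the signature shown above, while `newEchoGetter` and `roundTrip` are hypothetical helpers. The getter returns nil, nil, nil for an unknown component, and the caller closes the stream and then calls the release function.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"sync/atomic"
)

// rawGetter mirrors the RpcRawGetter signature.
type rawGetter func(ctx context.Context, componentID string) (io.ReadWriteCloser, func(), error)

// newEchoGetter returns a getter serving a single component that echoes
// bytes back, plus a function reporting how many streams are unreleased.
func newEchoGetter(name string) (rawGetter, func() int32) {
	var open atomic.Int32
	get := func(ctx context.Context, componentID string) (io.ReadWriteCloser, func(), error) {
		if componentID != name {
			return nil, nil, nil // not found
		}
		local, remote := net.Pipe()
		go func() {
			// Echo everything read from the pipe back to the writer.
			_, _ = io.Copy(remote, remote)
			_ = remote.Close()
		}()
		open.Add(1)
		return local, func() { open.Add(-1) }, nil
	}
	return get, open.Load
}

// roundTrip plays the caller's role: get the stream, use it, close it,
// then call the release function (if any).
func roundTrip(get rawGetter, componentID, msg string) (reply string, found bool, err error) {
	rwc, rel, err := get(context.Background(), componentID)
	if err != nil || rwc == nil {
		return "", false, err
	}
	if rel != nil {
		defer rel() // runs after rwc.Close below
	}
	defer rwc.Close()

	if _, err := rwc.Write([]byte(msg)); err != nil {
		return "", true, err
	}
	buf := make([]byte, len(msg))
	if _, err := io.ReadFull(rwc, buf); err != nil {
		return "", true, err
	}
	return string(buf), true, nil
}

func main() {
	get, openCount := newEchoGetter("db")
	reply, found, err := roundTrip(get, "db", "ping")
	fmt.Println(reply, found, err, openCount())
}
```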

type RpcStream

type RpcStream interface {
	srpc.Stream
	Send(*RpcStreamPacket) error
	Recv() (*RpcStreamPacket, error)
}

RpcStream implements an RPC call stream over an RPC call. Used to implement sub-components which have a different set of services & calls available.

func OpenRpcStream added in v0.5.0

func OpenRpcStream[T RpcStream](ctx context.Context, rpcCaller RpcStreamCaller[T], componentID string, waitAck bool) (RpcStream, error)

OpenRpcStream opens an RPC stream with a remote.

If waitAck is set, waits for an acknowledgment from the remote before returning.

type RpcStreamCaller

type RpcStreamCaller[T RpcStream] func(ctx context.Context) (T, error)

RpcStreamCaller is a function which starts the RpcStream call.

type RpcStreamGetter

type RpcStreamGetter func(ctx context.Context, componentID string) (srpc.Invoker, func(), error)

RpcStreamGetter returns the Mux for the component ID from the remote. Returns a release function to call when done with the Mux. Returns nil, nil, nil if not found.

type RpcStreamInit

type RpcStreamInit struct {

	// ComponentId is the identifier of the component making the request.
	ComponentId string `protobuf:"bytes,1,opt,name=component_id,json=componentId,proto3" json:"component_id,omitempty"`
	// contains filtered or unexported fields
}

RpcStreamInit is the first message in an RPC stream.

func (*RpcStreamInit) CloneMessageVT added in v0.18.3

func (m *RpcStreamInit) CloneMessageVT() proto.Message

func (*RpcStreamInit) CloneVT added in v0.12.3

func (m *RpcStreamInit) CloneVT() *RpcStreamInit

func (*RpcStreamInit) Descriptor deprecated

func (*RpcStreamInit) Descriptor() ([]byte, []int)

Deprecated: Use RpcStreamInit.ProtoReflect.Descriptor instead.

func (*RpcStreamInit) EqualMessageVT added in v0.18.3

func (this *RpcStreamInit) EqualMessageVT(thatMsg proto.Message) bool

func (*RpcStreamInit) EqualVT

func (this *RpcStreamInit) EqualVT(that *RpcStreamInit) bool

func (*RpcStreamInit) GetComponentId

func (x *RpcStreamInit) GetComponentId() string

func (*RpcStreamInit) MarshalToSizedBufferVT

func (m *RpcStreamInit) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*RpcStreamInit) MarshalToVT

func (m *RpcStreamInit) MarshalToVT(dAtA []byte) (int, error)

func (*RpcStreamInit) MarshalVT

func (m *RpcStreamInit) MarshalVT() (dAtA []byte, err error)

func (*RpcStreamInit) ProtoMessage

func (*RpcStreamInit) ProtoMessage()

func (*RpcStreamInit) ProtoReflect

func (x *RpcStreamInit) ProtoReflect() protoreflect.Message

func (*RpcStreamInit) Reset

func (x *RpcStreamInit) Reset()

func (*RpcStreamInit) SizeVT

func (m *RpcStreamInit) SizeVT() (n int)

func (*RpcStreamInit) String

func (x *RpcStreamInit) String() string

func (*RpcStreamInit) UnmarshalVT

func (m *RpcStreamInit) UnmarshalVT(dAtA []byte) error

type RpcStreamPacket added in v0.4.5

type RpcStreamPacket struct {

	// Types that are assignable to Body:
	//
	//	*RpcStreamPacket_Init
	//	*RpcStreamPacket_Ack
	//	*RpcStreamPacket_Data
	Body isRpcStreamPacket_Body `protobuf_oneof:"body"`
	// contains filtered or unexported fields
}

RpcStreamPacket is a packet encapsulating data for an RPC stream.
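The Body field follows the usual Go protobuf oneof pattern: an interface with one wrapper struct per variant, inspected with a type switch. The sketch below shows that pattern with hypothetical stand-in types (`body`, `bodyInit`, `bodyAck`, `bodyData`, `streamPacket`) rather than the generated ones, so that it compiles on its own.

```go
package main

import "fmt"

// body plays the role of isRpcStreamPacket_Body.
type body interface{ isBody() }

// One wrapper per oneof variant, as in the generated code.
type bodyInit struct{ ComponentID string }
type bodyAck struct{ Error string }
type bodyData struct{ Data []byte }

func (*bodyInit) isBody() {}
func (*bodyAck) isBody()  {}
func (*bodyData) isBody() {}

// streamPacket plays the role of RpcStreamPacket.
type streamPacket struct{ Body body }

// describe dispatches on whichever variant is set.
func describe(p *streamPacket) string {
	switch b := p.Body.(type) {
	case *bodyInit:
		return fmt.Sprintf("init component=%s", b.ComponentID)
	case *bodyAck:
		if b.Error != "" {
			return fmt.Sprintf("ack error=%s", b.Error)
		}
		return "ack ok"
	case *bodyData:
		return fmt.Sprintf("data %d bytes", len(b.Data))
	default:
		return "empty"
	}
}

func main() {
	fmt.Println(describe(&streamPacket{Body: &bodyInit{ComponentID: "db"}}))
	fmt.Println(describe(&streamPacket{Body: &bodyAck{}}))
	fmt.Println(describe(&streamPacket{Body: &bodyData{Data: []byte("abc")}}))
	fmt.Println(describe(&streamPacket{}))
}
```

With the real types, the same switch would use the *RpcStreamPacket_Init, *RpcStreamPacket_Ack, and *RpcStreamPacket_Data cases, or the GetInit, GetAck, and GetData accessors listed below.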

func (*RpcStreamPacket) CloneMessageVT added in v0.18.3

func (m *RpcStreamPacket) CloneMessageVT() proto.Message

func (*RpcStreamPacket) CloneVT added in v0.12.3

func (m *RpcStreamPacket) CloneVT() *RpcStreamPacket

func (*RpcStreamPacket) Descriptor deprecated added in v0.4.5

func (*RpcStreamPacket) Descriptor() ([]byte, []int)

Deprecated: Use RpcStreamPacket.ProtoReflect.Descriptor instead.

func (*RpcStreamPacket) EqualMessageVT added in v0.18.3

func (this *RpcStreamPacket) EqualMessageVT(thatMsg proto.Message) bool

func (*RpcStreamPacket) EqualVT added in v0.4.5

func (this *RpcStreamPacket) EqualVT(that *RpcStreamPacket) bool

func (*RpcStreamPacket) GetAck added in v0.5.0

func (x *RpcStreamPacket) GetAck() *RpcAck

func (*RpcStreamPacket) GetBody added in v0.4.5

func (m *RpcStreamPacket) GetBody() isRpcStreamPacket_Body

func (*RpcStreamPacket) GetData added in v0.4.5

func (x *RpcStreamPacket) GetData() []byte

func (*RpcStreamPacket) GetInit added in v0.4.5

func (x *RpcStreamPacket) GetInit() *RpcStreamInit

func (*RpcStreamPacket) MarshalToSizedBufferVT added in v0.4.5

func (m *RpcStreamPacket) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*RpcStreamPacket) MarshalToVT added in v0.4.5

func (m *RpcStreamPacket) MarshalToVT(dAtA []byte) (int, error)

func (*RpcStreamPacket) MarshalVT added in v0.4.5

func (m *RpcStreamPacket) MarshalVT() (dAtA []byte, err error)

func (*RpcStreamPacket) ProtoMessage added in v0.4.5

func (*RpcStreamPacket) ProtoMessage()

func (*RpcStreamPacket) ProtoReflect added in v0.4.5

func (x *RpcStreamPacket) ProtoReflect() protoreflect.Message

func (*RpcStreamPacket) Reset added in v0.4.5

func (x *RpcStreamPacket) Reset()

func (*RpcStreamPacket) SizeVT added in v0.4.5

func (m *RpcStreamPacket) SizeVT() (n int)

func (*RpcStreamPacket) String added in v0.4.5

func (x *RpcStreamPacket) String() string

func (*RpcStreamPacket) UnmarshalVT added in v0.4.5

func (m *RpcStreamPacket) UnmarshalVT(dAtA []byte) error

type RpcStreamPacket_Ack added in v0.5.0

type RpcStreamPacket_Ack struct {
	// Ack is sent in response to Init.
	// Sent by the server.
	Ack *RpcAck `protobuf:"bytes,2,opt,name=ack,proto3,oneof"`
}

func (*RpcStreamPacket_Ack) CloneVT added in v0.12.3

func (m *RpcStreamPacket_Ack) CloneVT() isRpcStreamPacket_Body

func (*RpcStreamPacket_Ack) EqualVT added in v0.10.0

func (this *RpcStreamPacket_Ack) EqualVT(thatIface isRpcStreamPacket_Body) bool

func (*RpcStreamPacket_Ack) MarshalToSizedBufferVT added in v0.5.0

func (m *RpcStreamPacket_Ack) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*RpcStreamPacket_Ack) MarshalToVT added in v0.5.0

func (m *RpcStreamPacket_Ack) MarshalToVT(dAtA []byte) (int, error)

func (*RpcStreamPacket_Ack) SizeVT added in v0.5.0

func (m *RpcStreamPacket_Ack) SizeVT() (n int)

type RpcStreamPacket_Data added in v0.4.5

type RpcStreamPacket_Data struct {
	// Data is the encapsulated data packet.
	Data []byte `protobuf:"bytes,3,opt,name=data,proto3,oneof"`
}

func (*RpcStreamPacket_Data) CloneVT added in v0.12.3

func (m *RpcStreamPacket_Data) CloneVT() isRpcStreamPacket_Body

func (*RpcStreamPacket_Data) EqualVT added in v0.10.0

func (this *RpcStreamPacket_Data) EqualVT(thatIface isRpcStreamPacket_Body) bool

func (*RpcStreamPacket_Data) MarshalToSizedBufferVT added in v0.4.5

func (m *RpcStreamPacket_Data) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*RpcStreamPacket_Data) MarshalToVT added in v0.4.5

func (m *RpcStreamPacket_Data) MarshalToVT(dAtA []byte) (int, error)

func (*RpcStreamPacket_Data) SizeVT added in v0.4.5

func (m *RpcStreamPacket_Data) SizeVT() (n int)

type RpcStreamPacket_Init added in v0.4.5

type RpcStreamPacket_Init struct {
	// Init is the first packet in the stream.
	// Sent by the initiator.
	Init *RpcStreamInit `protobuf:"bytes,1,opt,name=init,proto3,oneof"`
}

func (*RpcStreamPacket_Init) CloneVT added in v0.12.3

func (m *RpcStreamPacket_Init) CloneVT() isRpcStreamPacket_Body

func (*RpcStreamPacket_Init) EqualVT added in v0.10.0

func (this *RpcStreamPacket_Init) EqualVT(thatIface isRpcStreamPacket_Body) bool

func (*RpcStreamPacket_Init) MarshalToSizedBufferVT added in v0.4.5

func (m *RpcStreamPacket_Init) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*RpcStreamPacket_Init) MarshalToVT added in v0.4.5

func (m *RpcStreamPacket_Init) MarshalToVT(dAtA []byte) (int, error)

func (*RpcStreamPacket_Init) SizeVT added in v0.4.5

func (m *RpcStreamPacket_Init) SizeVT() (n int)

type RpcStreamReadWriter

type RpcStreamReadWriter struct {
	// contains filtered or unexported fields
}

RpcStreamReadWriter reads and writes a buffered RpcStream.

func NewRpcStreamReadWriter

func NewRpcStreamReadWriter(stream RpcStream) *RpcStreamReadWriter

NewRpcStreamReadWriter constructs a new read/writer.

func (*RpcStreamReadWriter) Close

func (r *RpcStreamReadWriter) Close() error

Close closes the packet read/writer.

func (*RpcStreamReadWriter) Read added in v0.5.0

func (r *RpcStreamReadWriter) Read(p []byte) (n int, err error)

Read reads packet data from the stream.

func (*RpcStreamReadWriter) Write added in v0.5.0

func (r *RpcStreamReadWriter) Write(p []byte) (n int, err error)

Write writes a packet to the writer.
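Adapting a packet stream to io.Reader and io.Writer requires buffering, because a caller may pass a buffer smaller than the next packet. The sketch below shows one way such an adapter can work. It is an assumption about the general technique, not the package's internals: `packetStream`, `loopback`, `readWriter`, `readAll`, and `errStreamEnded` are hypothetical names.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// errStreamEnded is a hypothetical sentinel for a drained stream.
var errStreamEnded = errors.New("stream ended")

// packetStream is a stand-in for a stream that moves whole payloads.
type packetStream interface {
	Send(data []byte) error
	Recv() ([]byte, error)
}

// loopback queues sent payloads and hands them back from Recv.
type loopback struct{ queue [][]byte }

func (l *loopback) Send(data []byte) error {
	l.queue = append(l.queue, data)
	return nil
}

func (l *loopback) Recv() ([]byte, error) {
	if len(l.queue) == 0 {
		return nil, errStreamEnded
	}
	next := l.queue[0]
	l.queue = l.queue[1:]
	return next, nil
}

// readWriter adapts a packetStream to byte-oriented Read and Write.
type readWriter struct {
	strm packetStream
	rest bytes.Buffer // bytes received but not yet returned by Read
}

// Read returns buffered bytes first and receives a new packet only
// when the buffer is empty.
func (r *readWriter) Read(p []byte) (int, error) {
	for r.rest.Len() == 0 {
		data, err := r.strm.Recv()
		if err != nil {
			return 0, err
		}
		r.rest.Write(data)
	}
	return r.rest.Read(p)
}

// Write sends p as a single packet, copying it so the caller may reuse p.
func (r *readWriter) Write(p []byte) (int, error) {
	pkt := append([]byte(nil), p...)
	if err := r.strm.Send(pkt); err != nil {
		return 0, err
	}
	return len(p), nil
}

// readAll reads in chunk-sized pieces until the stream reports an error.
func readAll(r *readWriter, chunk int) string {
	var out []byte
	buf := make([]byte, chunk)
	for {
		n, err := r.Read(buf)
		out = append(out, buf[:n]...)
		if err != nil {
			return string(out)
		}
	}
}

func main() {
	rw := &readWriter{strm: &loopback{}}
	_, _ = rw.Write([]byte("hello world"))
	fmt.Println(readAll(rw, 4))
}
```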

type RpcStreamWriter added in v0.19.0

type RpcStreamWriter struct {
	RpcStream
}

RpcStreamWriter implements the Writer only.

func NewRpcStreamWriter added in v0.19.0

func NewRpcStreamWriter(rpcStream RpcStream) *RpcStreamWriter

NewRpcStreamWriter constructs a new rpc stream writer.

func (*RpcStreamWriter) Close added in v0.19.0

func (r *RpcStreamWriter) Close() error

Close closes the writer.

func (*RpcStreamWriter) Write added in v0.19.0

func (r *RpcStreamWriter) Write(p []byte) (n int, err error)

Write writes a packet to the writer.

func (*RpcStreamWriter) WritePacket added in v0.19.0

func (r *RpcStreamWriter) WritePacket(p *srpc.Packet) error

WritePacket writes a packet to the remote.
