rpcstream

package
v0.36.1
Published: Dec 23, 2024 License: MIT Imports: 12 Imported by: 1

README

RPC Stream

This package implements running an RPC service on top of another RPC service.

The "host" service has a signature like:

syntax = "proto3";
package mypackage;

import "github.com/aperturerobotics/starpc/rpcstream/rpcstream.proto";

// HostService proxies RPC calls to a target Mux.
service HostService {
  // MyRpc opens a stream to proxy a RPC call.
  rpc MyRpc(stream .rpcstream.RpcStreamPacket) returns (stream .rpcstream.RpcStreamPacket);
}

NewRpcStreamOpenStream(hostService.MyRpc, componentID, waitAck) constructs a new OpenStreamFunc which starts an RPC call to MyRpc and forwards the starpc packets over the two-way stream.

The component ID can be used to determine which Mux the client should access.
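The flow described above can be sketched without importing starpc. The following is a minimal, self-contained model, not the package's implementation: `packet`, `handler`, `serveHost`, and `call` are hypothetical stand-ins, channels replace the two-way RPC stream, and a plain function replaces the per-component Mux. It shows the initiator sending an init packet with the component ID, the host answering with an ack, and data packets flowing afterwards.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

type kind int

const (
	kindInit kind = iota
	kindAck
	kindData
)

// packet is a simplified stand-in for RpcStreamPacket (hypothetical).
type packet struct {
	kind        kind
	componentID string // set on init
	ackErr      string // set on ack; empty means success
	data        []byte // set on data
}

// handler is a stand-in for the per-component Mux (hypothetical).
type handler func(req []byte) []byte

// serveHost plays the host side: read init, pick the component, ack, then forward data.
func serveHost(in <-chan packet, out chan<- packet, components map[string]handler) {
	defer close(out)
	first, ok := <-in
	if !ok || first.kind != kindInit {
		out <- packet{kind: kindAck, ackErr: "unexpected rpcstream packet"}
		return
	}
	h, found := components[first.componentID]
	if !found {
		out <- packet{kind: kindAck, ackErr: "no server for that component"}
		return
	}
	out <- packet{kind: kindAck}
	for pkt := range in {
		if pkt.kind == kindData {
			out <- packet{kind: kindData, data: h(pkt.data)}
		}
	}
}

// call plays the client side: send init, wait for the ack, then exchange one data packet.
func call(components map[string]handler, componentID string, req []byte) ([]byte, error) {
	toHost, fromHost := make(chan packet), make(chan packet)
	go serveHost(toHost, fromHost, components)
	defer close(toHost)

	toHost <- packet{kind: kindInit, componentID: componentID}
	ack, ok := <-fromHost
	if !ok || ack.kind != kindAck {
		return nil, errors.New("expected ack")
	}
	if ack.ackErr != "" {
		return nil, errors.New(ack.ackErr)
	}
	toHost <- packet{kind: kindData, data: req}
	resp := <-fromHost
	return resp.data, nil
}

func main() {
	components := map[string]handler{
		"upper": func(b []byte) []byte { return []byte(strings.ToUpper(string(b))) },
	}
	resp, err := call(components, "upper", []byte("ping"))
	fmt.Println(string(resp), err)
	_, err = call(components, "missing", nil)
	fmt.Println(err)
}
```

In this sketch the client always waits for the ack, which corresponds to calling the real constructors with waitAck set.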

Documentation

Constants

This section is empty.

Variables

var (
	// ErrNoServerForComponent is returned if the getter returns nil.
	ErrNoServerForComponent = errors.New("no server for that component")
	// ErrUnexpectedPacket is returned if the packet was unexpected.
	ErrUnexpectedPacket = errors.New("unexpected rpcstream packet")
)

Functions

func HandleProxyRpcStream added in v0.19.0

func HandleProxyRpcStream[T RpcStream](stream RpcStream, getter RpcProxyGetter[T]) error

HandleProxyRpcStream handles an incoming RPC stream by proxying it to the remote RpcStream call returned by the getter.

func HandleRawRpcStream added in v0.19.0

func HandleRawRpcStream(stream RpcStream, getter RpcRawGetter) error

HandleRawRpcStream handles an incoming RPC stream proxying to a ReadWriteCloser.

func HandleRpcStream

func HandleRpcStream(stream RpcStream, getter RpcStreamGetter) error

HandleRpcStream handles an incoming RPC stream (remote is the initiator).

func NewRpcStreamClient added in v0.13.2

func NewRpcStreamClient[T RpcStream](rpcCaller RpcStreamCaller[T], componentID string, waitAck bool) srpc.Client

NewRpcStreamClient constructs a Client which opens streams with a RpcStream.

If waitAck is set, OpenStream waits for an acknowledgment from the remote.

func NewRpcStreamOpenStream

func NewRpcStreamOpenStream[T RpcStream](rpcCaller RpcStreamCaller[T], componentID string, waitAck bool) srpc.OpenStreamFunc

NewRpcStreamOpenStream constructs an OpenStream function with a RpcStream.

If waitAck is set, OpenStream waits for an acknowledgment from the remote.

func ReadPump added in v0.19.0

func ReadPump(strm RpcStream, cb srpc.PacketDataHandler, closed srpc.CloseHandler)

ReadPump executes the read pump in a goroutine.

Calls the closed handler when the stream closes or returns an error.

func ReadToHandler added in v0.19.0

func ReadToHandler(strm RpcStream, cb srpc.PacketDataHandler) error

ReadToHandler reads data to the given handler. It does not handle closing the stream; use ReadPump for that.

Types

type RpcAck added in v0.5.0

type RpcAck struct {

	// Error indicates there was some error setting up the stream.
	Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
	// contains filtered or unexported fields
}

RpcAck is the ack message in a RPC stream.

func (*RpcAck) CloneMessageVT added in v0.18.3

func (m *RpcAck) CloneMessageVT() protobuf_go_lite.CloneMessage

func (*RpcAck) CloneVT added in v0.12.3

func (m *RpcAck) CloneVT() *RpcAck

func (*RpcAck) EqualMessageVT added in v0.18.3

func (this *RpcAck) EqualMessageVT(thatMsg any) bool

func (*RpcAck) EqualVT added in v0.5.0

func (this *RpcAck) EqualVT(that *RpcAck) bool

func (*RpcAck) GetError added in v0.5.0

func (x *RpcAck) GetError() string

func (*RpcAck) MarshalJSON added in v0.30.0

func (x *RpcAck) MarshalJSON() ([]byte, error)

MarshalJSON marshals the RpcAck to JSON.

func (*RpcAck) MarshalProtoJSON added in v0.30.0

func (x *RpcAck) MarshalProtoJSON(s *json.MarshalState)

MarshalProtoJSON marshals the RpcAck message to JSON.

func (*RpcAck) MarshalProtoText added in v0.31.4

func (x *RpcAck) MarshalProtoText() string

func (*RpcAck) MarshalToSizedBufferVT added in v0.5.0

func (m *RpcAck) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*RpcAck) MarshalToVT added in v0.5.0

func (m *RpcAck) MarshalToVT(dAtA []byte) (int, error)

func (*RpcAck) MarshalVT added in v0.5.0

func (m *RpcAck) MarshalVT() (dAtA []byte, err error)

func (*RpcAck) ProtoMessage added in v0.5.0

func (*RpcAck) ProtoMessage()

func (*RpcAck) Reset added in v0.5.0

func (x *RpcAck) Reset()

func (*RpcAck) SizeVT added in v0.5.0

func (m *RpcAck) SizeVT() (n int)

func (*RpcAck) String added in v0.5.0

func (x *RpcAck) String() string

func (*RpcAck) UnmarshalJSON added in v0.30.1

func (x *RpcAck) UnmarshalJSON(b []byte) error

UnmarshalJSON unmarshals the RpcAck from JSON.

func (*RpcAck) UnmarshalProtoJSON added in v0.30.1

func (x *RpcAck) UnmarshalProtoJSON(s *json.UnmarshalState)

UnmarshalProtoJSON unmarshals the RpcAck message from JSON.

func (*RpcAck) UnmarshalVT added in v0.5.0

func (m *RpcAck) UnmarshalVT(dAtA []byte) error

type RpcProxyGetter added in v0.19.0

type RpcProxyGetter[T RpcStream] func(ctx context.Context, componentID string) (
	caller RpcStreamCaller[T],
	callerComponentID string,
	rel func(),
	err error,
)

RpcProxyGetter returns a remote rpcstream call to proxy to. Returns the component ID to pass to the caller.

Returns a release function to call when done with the stream. The caller will cancel the context and close the rpc when done. Returns nil, "", nil, nil if not found.

type RpcRawGetter added in v0.19.0

type RpcRawGetter func(ctx context.Context, componentID string) (io.ReadWriteCloser, func(), error)

RpcRawGetter returns a read/write/closer to proxy data to/from. Returns a release function to call when done with the stream. The caller will call stream.Close as well as the release function (if any). Returns nil, nil, nil if not found.

type RpcStream

type RpcStream interface {
	srpc.Stream
	Send(*RpcStreamPacket) error
	Recv() (*RpcStreamPacket, error)
}

RpcStream implements a RPC call stream over a RPC call. Used to implement sub-components which have a different set of services & calls available.

func OpenRpcStream added in v0.5.0

func OpenRpcStream[T RpcStream](ctx context.Context, rpcCaller RpcStreamCaller[T], componentID string, waitAck bool) (RpcStream, error)

OpenRpcStream opens a RPC stream with a remote.

If waitAck is set, waits for an acknowledgment from the remote before returning.

type RpcStreamCaller

type RpcStreamCaller[T RpcStream] func(ctx context.Context) (T, error)

RpcStreamCaller is a function which starts the RpcStream call.

type RpcStreamGetter

type RpcStreamGetter func(ctx context.Context, componentID string) (srpc.Invoker, func(), error)

RpcStreamGetter returns the Mux for the component ID from the remote. Returns a release function to call when done with the Mux. Returns nil, nil, nil if not found.

type RpcStreamInit

type RpcStreamInit struct {

	// ComponentId is the identifier of the component making the request.
	ComponentId string `protobuf:"bytes,1,opt,name=component_id,json=componentId,proto3" json:"componentId,omitempty"`
	// contains filtered or unexported fields
}

RpcStreamInit is the first message in a RPC stream.

func (*RpcStreamInit) CloneMessageVT added in v0.18.3

func (m *RpcStreamInit) CloneMessageVT() protobuf_go_lite.CloneMessage

func (*RpcStreamInit) CloneVT added in v0.12.3

func (m *RpcStreamInit) CloneVT() *RpcStreamInit

func (*RpcStreamInit) EqualMessageVT added in v0.18.3

func (this *RpcStreamInit) EqualMessageVT(thatMsg any) bool

func (*RpcStreamInit) EqualVT

func (this *RpcStreamInit) EqualVT(that *RpcStreamInit) bool

func (*RpcStreamInit) GetComponentId

func (x *RpcStreamInit) GetComponentId() string

func (*RpcStreamInit) MarshalJSON added in v0.30.0

func (x *RpcStreamInit) MarshalJSON() ([]byte, error)

MarshalJSON marshals the RpcStreamInit to JSON.

func (*RpcStreamInit) MarshalProtoJSON added in v0.30.0

func (x *RpcStreamInit) MarshalProtoJSON(s *json.MarshalState)

MarshalProtoJSON marshals the RpcStreamInit message to JSON.

func (*RpcStreamInit) MarshalProtoText added in v0.31.4

func (x *RpcStreamInit) MarshalProtoText() string

func (*RpcStreamInit) MarshalToSizedBufferVT

func (m *RpcStreamInit) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*RpcStreamInit) MarshalToVT

func (m *RpcStreamInit) MarshalToVT(dAtA []byte) (int, error)

func (*RpcStreamInit) MarshalVT

func (m *RpcStreamInit) MarshalVT() (dAtA []byte, err error)

func (*RpcStreamInit) ProtoMessage

func (*RpcStreamInit) ProtoMessage()

func (*RpcStreamInit) Reset

func (x *RpcStreamInit) Reset()

func (*RpcStreamInit) SizeVT

func (m *RpcStreamInit) SizeVT() (n int)

func (*RpcStreamInit) String

func (x *RpcStreamInit) String() string

func (*RpcStreamInit) UnmarshalJSON added in v0.30.1

func (x *RpcStreamInit) UnmarshalJSON(b []byte) error

UnmarshalJSON unmarshals the RpcStreamInit from JSON.

func (*RpcStreamInit) UnmarshalProtoJSON added in v0.30.1

func (x *RpcStreamInit) UnmarshalProtoJSON(s *json.UnmarshalState)

UnmarshalProtoJSON unmarshals the RpcStreamInit message from JSON.

func (*RpcStreamInit) UnmarshalVT

func (m *RpcStreamInit) UnmarshalVT(dAtA []byte) error

type RpcStreamPacket added in v0.4.5

type RpcStreamPacket struct {

	// Types that are assignable to Body:
	//
	//	*RpcStreamPacket_Init
	//	*RpcStreamPacket_Ack
	//	*RpcStreamPacket_Data
	Body isRpcStreamPacket_Body `protobuf_oneof:"body"`
	// contains filtered or unexported fields
}

RpcStreamPacket is a packet encapsulating data for a RPC stream.
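Since Body is a oneof, consumers typically dispatch on the wrapper type with a type switch. The sketch below uses simplified local stand-ins that mirror the generated layout (`streamPacket`, `bodyInit`, `bodyAck`, `bodyData`, and `describe` are hypothetical names), so it runs without the generated code.

```go
package main

import "fmt"

// isBody is a stand-in for the unexported oneof interface.
type isBody interface{ isBody() }

type streamInit struct{ ComponentId string }
type ack struct{ Error string }

// GetComponentId is nil-safe, like generated getters.
func (i *streamInit) GetComponentId() string {
	if i == nil {
		return ""
	}
	return i.ComponentId
}

// GetError is nil-safe, like generated getters.
func (a *ack) GetError() string {
	if a == nil {
		return ""
	}
	return a.Error
}

// One wrapper type per oneof case.
type bodyInit struct{ Init *streamInit }
type bodyAck struct{ Ack *ack }
type bodyData struct{ Data []byte }

func (*bodyInit) isBody() {}
func (*bodyAck) isBody()  {}
func (*bodyData) isBody() {}

type streamPacket struct{ Body isBody }

// describe reports which oneof case a packet carries.
func describe(p *streamPacket) string {
	switch b := p.Body.(type) {
	case *bodyInit:
		return "init for component " + b.Init.GetComponentId()
	case *bodyAck:
		if msg := b.Ack.GetError(); msg != "" {
			return "ack with error: " + msg
		}
		return "ack ok"
	case *bodyData:
		return fmt.Sprintf("data (%d bytes)", len(b.Data))
	default:
		return "empty packet"
	}
}

func main() {
	fmt.Println(describe(&streamPacket{Body: &bodyInit{Init: &streamInit{ComponentId: "storage"}}}))
	fmt.Println(describe(&streamPacket{Body: &bodyAck{Ack: &ack{}}}))
	fmt.Println(describe(&streamPacket{Body: &bodyData{Data: []byte("abc")}}))
}
```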

func (*RpcStreamPacket) CloneMessageVT added in v0.18.3

func (m *RpcStreamPacket) CloneMessageVT() protobuf_go_lite.CloneMessage

func (*RpcStreamPacket) CloneVT added in v0.12.3

func (m *RpcStreamPacket) CloneVT() *RpcStreamPacket

func (*RpcStreamPacket) EqualMessageVT added in v0.18.3

func (this *RpcStreamPacket) EqualMessageVT(thatMsg any) bool

func (*RpcStreamPacket) EqualVT added in v0.4.5

func (this *RpcStreamPacket) EqualVT(that *RpcStreamPacket) bool

func (*RpcStreamPacket) GetAck added in v0.5.0

func (x *RpcStreamPacket) GetAck() *RpcAck

func (*RpcStreamPacket) GetBody added in v0.4.5

func (m *RpcStreamPacket) GetBody() isRpcStreamPacket_Body

func (*RpcStreamPacket) GetData added in v0.4.5

func (x *RpcStreamPacket) GetData() []byte

func (*RpcStreamPacket) GetInit added in v0.4.5

func (x *RpcStreamPacket) GetInit() *RpcStreamInit

func (*RpcStreamPacket) MarshalJSON added in v0.30.0

func (x *RpcStreamPacket) MarshalJSON() ([]byte, error)

MarshalJSON marshals the RpcStreamPacket to JSON.

func (*RpcStreamPacket) MarshalProtoJSON added in v0.30.0

func (x *RpcStreamPacket) MarshalProtoJSON(s *json.MarshalState)

MarshalProtoJSON marshals the RpcStreamPacket message to JSON.

func (*RpcStreamPacket) MarshalProtoText added in v0.31.4

func (x *RpcStreamPacket) MarshalProtoText() string

func (*RpcStreamPacket) MarshalToSizedBufferVT added in v0.4.5

func (m *RpcStreamPacket) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*RpcStreamPacket) MarshalToVT added in v0.4.5

func (m *RpcStreamPacket) MarshalToVT(dAtA []byte) (int, error)

func (*RpcStreamPacket) MarshalVT added in v0.4.5

func (m *RpcStreamPacket) MarshalVT() (dAtA []byte, err error)

func (*RpcStreamPacket) ProtoMessage added in v0.4.5

func (*RpcStreamPacket) ProtoMessage()

func (*RpcStreamPacket) Reset added in v0.4.5

func (x *RpcStreamPacket) Reset()

func (*RpcStreamPacket) SizeVT added in v0.4.5

func (m *RpcStreamPacket) SizeVT() (n int)

func (*RpcStreamPacket) String added in v0.4.5

func (x *RpcStreamPacket) String() string

func (*RpcStreamPacket) UnmarshalJSON added in v0.30.1

func (x *RpcStreamPacket) UnmarshalJSON(b []byte) error

UnmarshalJSON unmarshals the RpcStreamPacket from JSON.

func (*RpcStreamPacket) UnmarshalProtoJSON added in v0.30.1

func (x *RpcStreamPacket) UnmarshalProtoJSON(s *json.UnmarshalState)

UnmarshalProtoJSON unmarshals the RpcStreamPacket message from JSON.

func (*RpcStreamPacket) UnmarshalVT added in v0.4.5

func (m *RpcStreamPacket) UnmarshalVT(dAtA []byte) error

type RpcStreamPacket_Ack added in v0.5.0

type RpcStreamPacket_Ack struct {
	// Ack is sent in response to Init.
	// Sent by the server.
	Ack *RpcAck `protobuf:"bytes,2,opt,name=ack,proto3,oneof"`
}

func (*RpcStreamPacket_Ack) CloneOneofVT added in v0.31.13

func (m *RpcStreamPacket_Ack) CloneOneofVT() isRpcStreamPacket_Body

func (*RpcStreamPacket_Ack) CloneVT added in v0.12.3

func (*RpcStreamPacket_Ack) EqualVT added in v0.10.0

func (this *RpcStreamPacket_Ack) EqualVT(thatIface isRpcStreamPacket_Body) bool

func (*RpcStreamPacket_Ack) MarshalToSizedBufferVT added in v0.5.0

func (m *RpcStreamPacket_Ack) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*RpcStreamPacket_Ack) MarshalToVT added in v0.5.0

func (m *RpcStreamPacket_Ack) MarshalToVT(dAtA []byte) (int, error)

func (*RpcStreamPacket_Ack) SizeVT added in v0.5.0

func (m *RpcStreamPacket_Ack) SizeVT() (n int)

type RpcStreamPacket_Data added in v0.4.5

type RpcStreamPacket_Data struct {
	// Data is the encapsulated data packet.
	Data []byte `protobuf:"bytes,3,opt,name=data,proto3,oneof"`
}

func (*RpcStreamPacket_Data) CloneOneofVT added in v0.31.13

func (m *RpcStreamPacket_Data) CloneOneofVT() isRpcStreamPacket_Body

func (*RpcStreamPacket_Data) CloneVT added in v0.12.3

func (*RpcStreamPacket_Data) EqualVT added in v0.10.0

func (this *RpcStreamPacket_Data) EqualVT(thatIface isRpcStreamPacket_Body) bool

func (*RpcStreamPacket_Data) MarshalToSizedBufferVT added in v0.4.5

func (m *RpcStreamPacket_Data) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*RpcStreamPacket_Data) MarshalToVT added in v0.4.5

func (m *RpcStreamPacket_Data) MarshalToVT(dAtA []byte) (int, error)

func (*RpcStreamPacket_Data) SizeVT added in v0.4.5

func (m *RpcStreamPacket_Data) SizeVT() (n int)

type RpcStreamPacket_Init added in v0.4.5

type RpcStreamPacket_Init struct {
	// Init is the first packet in the stream.
	// Sent by the initiator.
	Init *RpcStreamInit `protobuf:"bytes,1,opt,name=init,proto3,oneof"`
}

func (*RpcStreamPacket_Init) CloneOneofVT added in v0.31.13

func (m *RpcStreamPacket_Init) CloneOneofVT() isRpcStreamPacket_Body

func (*RpcStreamPacket_Init) CloneVT added in v0.12.3

func (*RpcStreamPacket_Init) EqualVT added in v0.10.0

func (this *RpcStreamPacket_Init) EqualVT(thatIface isRpcStreamPacket_Body) bool

func (*RpcStreamPacket_Init) MarshalToSizedBufferVT added in v0.4.5

func (m *RpcStreamPacket_Init) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*RpcStreamPacket_Init) MarshalToVT added in v0.4.5

func (m *RpcStreamPacket_Init) MarshalToVT(dAtA []byte) (int, error)

func (*RpcStreamPacket_Init) SizeVT added in v0.4.5

func (m *RpcStreamPacket_Init) SizeVT() (n int)

type RpcStreamReadWriter

type RpcStreamReadWriter struct {
	// contains filtered or unexported fields
}

RpcStreamReadWriter reads and writes a buffered RpcStream.

func NewRpcStreamReadWriter

func NewRpcStreamReadWriter(stream RpcStream) *RpcStreamReadWriter

NewRpcStreamReadWriter constructs a new read/writer.

func (*RpcStreamReadWriter) Close

func (r *RpcStreamReadWriter) Close() error

Close closes the packet rw.

func (*RpcStreamReadWriter) Read added in v0.5.0

func (r *RpcStreamReadWriter) Read(p []byte) (n int, err error)

Read reads a packet from the writer.

func (*RpcStreamReadWriter) Write added in v0.5.0

func (r *RpcStreamReadWriter) Write(p []byte) (n int, err error)

Write writes a packet to the writer.

type RpcStreamWriter added in v0.19.0

type RpcStreamWriter struct {
	RpcStream
}

RpcStreamWriter implements the Writer only.

func NewRpcStreamWriter added in v0.19.0

func NewRpcStreamWriter(rpcStream RpcStream) *RpcStreamWriter

NewRpcStreamWriter constructs a new rpc stream writer.

func (*RpcStreamWriter) Close added in v0.19.0

func (r *RpcStreamWriter) Close() error

Close closes the writer.

func (*RpcStreamWriter) Write added in v0.19.0

func (r *RpcStreamWriter) Write(p []byte) (n int, err error)

Write writes a packet to the writer.

func (*RpcStreamWriter) WritePacket added in v0.19.0

func (r *RpcStreamWriter) WritePacket(p *srpc.Packet) error

WritePacket writes a packet to the remote.
