srpc

package
v0.4.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 26, 2022 License: MIT Imports: 17 Imported by: 18

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrUnimplemented is returned if the RPC method was not implemented.
	ErrUnimplemented = errors.New("unimplemented")
	// ErrCompleted is returned if a message is received after the rpc was completed.
	ErrCompleted = errors.New("unexpected packet after rpc was completed")
	// ErrUnrecognizedPacket is returned if the packet type was not recognized.
	ErrUnrecognizedPacket = errors.New("unrecognized packet type")
	// ErrEmptyPacket is returned if nothing is specified in a packet.
	ErrEmptyPacket = errors.New("invalid empty packet")
	// ErrInvalidMessage indicates the message failed to parse.
	ErrInvalidMessage = errors.New("invalid message")
	// ErrEmptyMethodID is returned if the method id was empty.
	ErrEmptyMethodID = errors.New("method id empty")
	// ErrEmptyServiceID is returned if the service id was empty.
	ErrEmptyServiceID = errors.New("service id empty")
)
View Source
var (
	ErrInvalidLength        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflow          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroup = fmt.Errorf("proto: unexpected end of group")
)
View Source
var File_github_com_aperturerobotics_starpc_srpc_rpcproto_proto protoreflect.FileDescriptor

Functions

func NewPipeStream

func NewPipeStream(ctx context.Context) (Stream, Stream)

NewPipeStream constructs a new in-memory stream.

Types

type CallData

type CallData struct {

	// Data contains the packet in the sequence.
	Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
	// DataIsZero indicates Data is set with an empty message.
	DataIsZero bool `protobuf:"varint,2,opt,name=data_is_zero,json=dataIsZero,proto3" json:"data_is_zero,omitempty"`
	// Complete indicates the RPC call is completed.
	Complete bool `protobuf:"varint,3,opt,name=complete,proto3" json:"complete,omitempty"`
	// Error contains any error that caused the RPC to fail.
	// If set, implies complete=true.
	Error string `protobuf:"bytes,4,opt,name=error,proto3" json:"error,omitempty"`
	// contains filtered or unexported fields
}

CallData contains a message in a streaming RPC sequence.

func (*CallData) Descriptor deprecated

func (*CallData) Descriptor() ([]byte, []int)

Deprecated: Use CallData.ProtoReflect.Descriptor instead.

func (*CallData) EqualVT

func (this *CallData) EqualVT(that *CallData) bool

func (*CallData) GetComplete

func (x *CallData) GetComplete() bool

func (*CallData) GetData

func (x *CallData) GetData() []byte

func (*CallData) GetDataIsZero added in v0.1.8

func (x *CallData) GetDataIsZero() bool

func (*CallData) GetError

func (x *CallData) GetError() string

func (*CallData) MarshalToSizedBufferVT

func (m *CallData) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*CallData) MarshalToVT

func (m *CallData) MarshalToVT(dAtA []byte) (int, error)

func (*CallData) MarshalVT

func (m *CallData) MarshalVT() (dAtA []byte, err error)

func (*CallData) ProtoMessage

func (*CallData) ProtoMessage()

func (*CallData) ProtoReflect

func (x *CallData) ProtoReflect() protoreflect.Message

func (*CallData) Reset

func (x *CallData) Reset()

func (*CallData) SizeVT

func (m *CallData) SizeVT() (n int)

func (*CallData) String

func (x *CallData) String() string

func (*CallData) UnmarshalVT

func (m *CallData) UnmarshalVT(dAtA []byte) error

func (*CallData) Validate

func (p *CallData) Validate() error

Validate performs cursory validation of the packet.

type CallStart

type CallStart struct {

	// RpcService is the service to contact.
	// Must be set.
	RpcService string `protobuf:"bytes,1,opt,name=rpc_service,json=rpcService,proto3" json:"rpc_service,omitempty"`
	// RpcMethod is the RPC method to call.
	// Must be set.
	RpcMethod string `protobuf:"bytes,2,opt,name=rpc_method,json=rpcMethod,proto3" json:"rpc_method,omitempty"`
	// Data contains the request or the first message in the stream.
	// Optional if streaming.
	Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
	// DataIsZero indicates Data is set with an empty message.
	DataIsZero bool `protobuf:"varint,4,opt,name=data_is_zero,json=dataIsZero,proto3" json:"data_is_zero,omitempty"`
	// contains filtered or unexported fields
}

CallStart requests starting a new RPC call.

func (*CallStart) Descriptor deprecated

func (*CallStart) Descriptor() ([]byte, []int)

Deprecated: Use CallStart.ProtoReflect.Descriptor instead.

func (*CallStart) EqualVT

func (this *CallStart) EqualVT(that *CallStart) bool

func (*CallStart) GetData

func (x *CallStart) GetData() []byte

func (*CallStart) GetDataIsZero added in v0.1.8

func (x *CallStart) GetDataIsZero() bool

func (*CallStart) GetRpcMethod

func (x *CallStart) GetRpcMethod() string

func (*CallStart) GetRpcService

func (x *CallStart) GetRpcService() string

func (*CallStart) MarshalToSizedBufferVT

func (m *CallStart) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*CallStart) MarshalToVT

func (m *CallStart) MarshalToVT(dAtA []byte) (int, error)

func (*CallStart) MarshalVT

func (m *CallStart) MarshalVT() (dAtA []byte, err error)

func (*CallStart) ProtoMessage

func (*CallStart) ProtoMessage()

func (*CallStart) ProtoReflect

func (x *CallStart) ProtoReflect() protoreflect.Message

func (*CallStart) Reset

func (x *CallStart) Reset()

func (*CallStart) SizeVT

func (m *CallStart) SizeVT() (n int)

func (*CallStart) String

func (x *CallStart) String() string

func (*CallStart) UnmarshalVT

func (m *CallStart) UnmarshalVT(dAtA []byte) error

func (*CallStart) Validate

func (p *CallStart) Validate() error

Validate performs cursory validation of the packet.

type Client

type Client interface {
	// Invoke executes a unary RPC with the remote.
	Invoke(ctx context.Context, service, method string, in, out Message) error

	// NewStream starts a streaming RPC with the remote & returns the stream.
	// firstMsg is optional.
	NewStream(ctx context.Context, service, method string, firstMsg Message) (Stream, error)
}

Client implements a SRPC client which can initiate RPC streams.

func NewClient

func NewClient(openStream OpenStreamFunc) Client

NewClient constructs a client with an OpenStreamFunc.

func NewClientWithMuxedConn added in v0.3.6

func NewClientWithMuxedConn(conn network.MuxedConn) Client

NewClientWithMuxedConn constructs a new client with a MuxedConn.

type ClientRPC

type ClientRPC struct {
	// contains filtered or unexported fields
}

ClientRPC represents the client side of an on-going RPC call message stream. Not concurrency safe: use a mutex if calling concurrently.

func NewClientRPC

func NewClientRPC(ctx context.Context, service, method string) *ClientRPC

NewClientRPC constructs a new ClientRPC session and writes CallStart. The writer will be closed when the ClientRPC completes. Service and method must be specified. Start must be called after creating the RPC object.

func (*ClientRPC) Close

func (r *ClientRPC) Close()

Close releases any resources held by the ClientRPC. Not concurrency safe with HandlePacket.

func (*ClientRPC) Context

func (r *ClientRPC) Context() context.Context

Context is canceled when the ClientRPC is no longer valid.

func (*ClientRPC) HandleCallData

func (r *ClientRPC) HandleCallData(pkt *CallData) error

HandleCallData handles the call data packet.

func (*ClientRPC) HandleCallStart

func (r *ClientRPC) HandleCallStart(pkt *CallStart) error

HandleCallStart handles the call start packet.

func (*ClientRPC) HandlePacket

func (r *ClientRPC) HandlePacket(msg *Packet) error

HandlePacket handles an incoming parsed message packet. Not concurrency safe: use a mutex if calling concurrently.

func (*ClientRPC) HandlePacketData

func (r *ClientRPC) HandlePacketData(data []byte) error

HandlePacketData handles an incoming unparsed message packet. Not concurrency safe: use a mutex if calling concurrently.

func (*ClientRPC) ReadAll

func (r *ClientRPC) ReadAll() ([][]byte, error)

ReadAll reads all returned Data packets and returns any error. Intended for use with unary RPCs.

func (*ClientRPC) Start

func (r *ClientRPC) Start(writer Writer, writeFirstMsg bool, firstMsg []byte) error

Start sets the writer and writes the MsgSend message. Must only be called once.

type HTTPServer

type HTTPServer struct {
	// contains filtered or unexported fields
}

HTTPServer implements the SRPC server.

func NewHTTPServer

func NewHTTPServer(mux Mux, path string) (*HTTPServer, error)

NewHTTPServer builds an HTTP server / handler. If path is empty, it serves on all routes.

func (*HTTPServer) ServeHTTP

func (s *HTTPServer) ServeHTTP(w http.ResponseWriter, r *http.Request)

type Handler

type Handler interface {
	// GetServiceID returns the ID of the service.
	GetServiceID() string
	// GetMethodIDs returns the list of methods for the service.
	GetMethodIDs() []string
	// InvokeMethod invokes the method matching the service & method ID.
	// Returns false, nil if not found.
	// If service string is empty, ignore it.
	InvokeMethod(serviceID, methodID string, strm Stream) (bool, error)
}

Handler describes a SRPC call handler implementation.

type Message

type Message interface {
	MarshalVT() ([]byte, error)
	UnmarshalVT([]byte) error
}

Message is the vtprotobuf message interface.

type MsgStream added in v0.4.0

type MsgStream struct {
	// contains filtered or unexported fields
}

MsgStream implements the stream interface passed to implementations.

func NewMsgStream added in v0.4.0

func NewMsgStream(ctx context.Context, writer Writer, dataCh chan []byte) *MsgStream

NewMsgStream constructs a new MsgStream from a context, a writer, and a data channel. dataCh should be closed when no more messages will arrive.

func (*MsgStream) Close added in v0.4.0

func (r *MsgStream) Close() error

Close closes the stream.

func (*MsgStream) CloseSend added in v0.4.0

func (r *MsgStream) CloseSend() error

CloseSend signals to the remote that we will no longer send any messages.

func (*MsgStream) Context added in v0.4.0

func (r *MsgStream) Context() context.Context

Context is canceled when the Stream is no longer valid.

func (*MsgStream) MsgRecv added in v0.4.0

func (r *MsgStream) MsgRecv(msg Message) error

MsgRecv receives an incoming message from the remote. Parses the message into the object at msg.

func (*MsgStream) MsgSend added in v0.4.0

func (r *MsgStream) MsgSend(msg Message) error

MsgSend sends the message to the remote.

type Mux

type Mux interface {
	// Register registers a new RPC method handler (service).
	Register(handler Handler) error
	// InvokeMethod invokes the method matching the service & method ID.
	// Returns false, nil if not found.
	// If service string is empty, ignore it.
	InvokeMethod(serviceID, methodID string, strm Stream) (bool, error)
}

Mux contains a set of <service, method> handlers.

func NewMux

func NewMux() Mux

NewMux constructs a new Mux.

type OpenStreamFunc

type OpenStreamFunc = func(ctx context.Context, msgHandler PacketHandler) (Writer, error)

OpenStreamFunc opens a stream with a remote. msgHandler must not be called concurrently.

func NewOpenStreamWithMuxedConn added in v0.3.6

func NewOpenStreamWithMuxedConn(conn network.MuxedConn) OpenStreamFunc

NewOpenStreamWithMuxedConn constructs an OpenStreamFunc with a MuxedConn.

func NewServerPipe

func NewServerPipe(server *Server) OpenStreamFunc

NewServerPipe constructs an open stream func which creates an in-memory Pipe Stream with the given Server. Starts read pumps for both. Starts the HandleStream function on the server in a separate goroutine.

type Packet

type Packet struct {

	// Body is the packet body.
	//
	// Types that are assignable to Body:
	//	*Packet_CallStart
	//	*Packet_CallData
	Body isPacket_Body `protobuf_oneof:"body"`
	// contains filtered or unexported fields
}

Packet is a message sent over a srpc packet connection.

func NewCallDataPacket

func NewCallDataPacket(data []byte, dataIsZero bool, complete bool, err error) *Packet

NewCallDataPacket constructs a new CallData packet.

func NewCallStartPacket

func NewCallStartPacket(service, method string, data []byte, dataIsZero bool) *Packet

NewCallStartPacket constructs a new CallStart packet.

func (*Packet) Descriptor deprecated

func (*Packet) Descriptor() ([]byte, []int)

Deprecated: Use Packet.ProtoReflect.Descriptor instead.

func (*Packet) EqualVT

func (this *Packet) EqualVT(that *Packet) bool

func (*Packet) GetBody

func (m *Packet) GetBody() isPacket_Body

func (*Packet) GetCallData

func (x *Packet) GetCallData() *CallData

func (*Packet) GetCallStart

func (x *Packet) GetCallStart() *CallStart

func (*Packet) MarshalToSizedBufferVT

func (m *Packet) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*Packet) MarshalToVT

func (m *Packet) MarshalToVT(dAtA []byte) (int, error)

func (*Packet) MarshalVT

func (m *Packet) MarshalVT() (dAtA []byte, err error)

func (*Packet) ProtoMessage

func (*Packet) ProtoMessage()

func (*Packet) ProtoReflect

func (x *Packet) ProtoReflect() protoreflect.Message

func (*Packet) Reset

func (x *Packet) Reset()

func (*Packet) SizeVT

func (m *Packet) SizeVT() (n int)

func (*Packet) String

func (x *Packet) String() string

func (*Packet) UnmarshalVT

func (m *Packet) UnmarshalVT(dAtA []byte) error

func (*Packet) Validate

func (p *Packet) Validate() error

Validate performs cursory validation of the packet.

type PacketHandler

type PacketHandler = func(pkt *Packet) error

PacketHandler handles a packet.

type PacketReaderWriter

type PacketReaderWriter struct {
	// contains filtered or unexported fields
}

PacketReaderWriter reads and writes packets from an io.ReadWriter. Uses a little-endian uint32 length prefix.

func NewPacketReadWriter

func NewPacketReadWriter(rw io.ReadWriteCloser, cb PacketHandler) *PacketReaderWriter

NewPacketReadWriter constructs a new read/writer.

func (*PacketReaderWriter) Close

func (r *PacketReaderWriter) Close() error

Close closes the packet rw.

func (*PacketReaderWriter) ReadPump

func (r *PacketReaderWriter) ReadPump() error

ReadPump executes the read pump in a goroutine.

func (*PacketReaderWriter) WritePacket

func (r *PacketReaderWriter) WritePacket(p *Packet) error

WritePacket writes a packet to the writer.

type Packet_CallData

type Packet_CallData struct {
	// CallData is a message in a streaming RPC sequence.
	CallData *CallData `protobuf:"bytes,2,opt,name=call_data,json=callData,proto3,oneof"`
}

func (*Packet_CallData) MarshalToSizedBufferVT

func (m *Packet_CallData) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*Packet_CallData) MarshalToVT

func (m *Packet_CallData) MarshalToVT(dAtA []byte) (int, error)

func (*Packet_CallData) SizeVT

func (m *Packet_CallData) SizeVT() (n int)

type Packet_CallStart

type Packet_CallStart struct {
	// CallStart initiates a new call.
	CallStart *CallStart `protobuf:"bytes,1,opt,name=call_start,json=callStart,proto3,oneof"`
}

func (*Packet_CallStart) MarshalToSizedBufferVT

func (m *Packet_CallStart) MarshalToSizedBufferVT(dAtA []byte) (int, error)

func (*Packet_CallStart) MarshalToVT

func (m *Packet_CallStart) MarshalToVT(dAtA []byte) (int, error)

func (*Packet_CallStart) SizeVT

func (m *Packet_CallStart) SizeVT() (n int)

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server handles incoming RPC streams with a mux.

func NewServer

func NewServer(mux Mux) *Server

NewServer constructs a new SRPC server.

func (*Server) AcceptMuxedConn added in v0.3.5

func (s *Server) AcceptMuxedConn(ctx context.Context, mplex network.MuxedConn) error

AcceptMuxedConn runs a loop which calls Accept on a muxer to handle streams.

Starts HandleStream in a separate goroutine to handle the stream. Returns context.Canceled or io.EOF when the loop is complete / closed.

func (*Server) GetMux added in v0.4.0

func (s *Server) GetMux() Mux

GetMux returns the mux.

func (*Server) HandleStream added in v0.3.5

func (s *Server) HandleStream(ctx context.Context, rwc io.ReadWriteCloser) error

HandleStream handles an incoming ReadWriteCloser stream.

type ServerRPC

type ServerRPC struct {
	// contains filtered or unexported fields
}

ServerRPC represents the server side of an on-going RPC call message stream. Not concurrency safe: use a mutex if calling concurrently.

func NewServerRPC

func NewServerRPC(ctx context.Context, mux Mux) *ServerRPC

NewServerRPC constructs a new ServerRPC session. Note: call SetWriter before handling any incoming messages.

func (*ServerRPC) Close

func (r *ServerRPC) Close()

Close releases any resources held by the ServerRPC. Not concurrency safe with HandlePacket.

func (*ServerRPC) Context

func (r *ServerRPC) Context() context.Context

Context is canceled when the ServerRPC is no longer valid.

func (*ServerRPC) HandleCallData

func (r *ServerRPC) HandleCallData(pkt *CallData) error

HandleCallData handles the call data packet.

func (*ServerRPC) HandleCallStart

func (r *ServerRPC) HandleCallStart(pkt *CallStart) error

HandleCallStart handles the call start packet.

func (*ServerRPC) HandlePacket

func (r *ServerRPC) HandlePacket(msg *Packet) error

HandlePacket handles an incoming parsed message packet. Not concurrency safe: use a mutex if calling concurrently.

func (*ServerRPC) SetWriter

func (r *ServerRPC) SetWriter(w Writer)

SetWriter sets the writer field.

type Stream

type Stream interface {
	// Context is canceled when the Stream is no longer valid.
	Context() context.Context

	// MsgSend sends the message to the remote.
	MsgSend(msg Message) error

	// MsgRecv receives an incoming message from the remote.
	// Parses the message into the object at msg.
	MsgRecv(msg Message) error

	// CloseSend signals to the remote that we will no longer send any messages.
	CloseSend() error

	// Close closes the stream.
	Close() error
}

Stream is a handle to an on-going bi-directional or one-directional streaming RPC.

type WebSocketConn

type WebSocketConn struct {
	// contains filtered or unexported fields
}

WebSocketConn implements the p2p multiplexer over a WebSocket.

func NewWebSocketConn

func NewWebSocketConn(ctx context.Context, conn *websocket.Conn, isServer bool) (*WebSocketConn, error)

NewWebSocketConn constructs a new WebSocket connection.

func (*WebSocketConn) AcceptStream

func (w *WebSocketConn) AcceptStream() (io.ReadWriteCloser, error)

AcceptStream accepts an incoming stream.

func (*WebSocketConn) Close

func (w *WebSocketConn) Close() error

Close closes the writer.

func (*WebSocketConn) GetOpenStreamFunc

func (w *WebSocketConn) GetOpenStreamFunc() OpenStreamFunc

GetOpenStreamFunc returns the OpenStream func.

func (*WebSocketConn) GetWebSocket

func (w *WebSocketConn) GetWebSocket() *websocket.Conn

GetWebSocket returns the web socket conn.

func (*WebSocketConn) OpenStream

func (w *WebSocketConn) OpenStream(ctx context.Context, msgHandler PacketHandler) (Writer, error)

OpenStream tries to open a stream with the remote.

type Writer

type Writer interface {
	// WritePacket writes a packet to the remote.
	WritePacket(p *Packet) error
	// Close closes the writer.
	Close() error
}

Writer is the interface used to write messages to the remote.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL