muxrpc

package module
v2.0.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 8, 2021 License: MIT Imports: 24 Imported by: 43

README

go-muxrpc Build Status GoDoc Go Report Card

golang.org implementation of ssbc/muxrpc

main purpose: efficient interface to scuttlebot

Documentation

Overview

Code generated by counterfeiter. DO NOT EDIT.

Code generated by counterfeiter. DO NOT EDIT.

Index

Constants

View Source
const ChunkSize = 65536

Variables

View Source
var (
	ErrStreamNotReadable = errors.New("muxrpc: this stream can not be read from")
	ErrStreamNotWritable = errors.New("muxrpc: this stream can not be written to")
	ErrStreamNotClosable = errors.New("muxrpc: this stream can not be closed")
)
View Source
var ErrSessionTerminated = errors.New("muxrpc: session terminated")

ErrSessionTerminated is returned once Terminate() was called or the connection dies

Functions

func HasMethod added in v2.0.11

func HasMethod(edp Endpoint, m Method) bool

HasMethod returns true if an endpoint supports a specific method

func IsServer

func IsServer(edp Endpoint) bool

IsServer tells you if the passed endpoint is in the server-role or not. i.e.: Did I call the remote: yes. Was I called by the remote: no. Q: don't want to extend Endpoint interface?

func IsSinkClosed

func IsSinkClosed(err error) bool

IsSinkClosed should be moved to luigi to gether with the error

func NewSinkWriter

func NewSinkWriter(sink *ByteSink) io.WriteCloser

func NewSourceReader

func NewSourceReader(src *ByteSource) io.Reader

Types

type ByteSink

type ByteSink struct {
	// contains filtered or unexported fields
}

ByteSink exposes a WriteCloser which wrapps each write into a muxrpc packet for that stream with the correct flags set.

func NewTestSink

func NewTestSink(w io.Writer) *ByteSink

func (*ByteSink) AsStream

func (bs *ByteSink) AsStream() *streamSink

AsStream returns a legacy stream adapter for luigi code

func (*ByteSink) Close

func (bs *ByteSink) Close() error

func (*ByteSink) CloseWithError

func (bs *ByteSink) CloseWithError(err error) error

func (*ByteSink) SetEncoding

func (bs *ByteSink) SetEncoding(re RequestEncoding)

func (*ByteSink) Write

func (bs *ByteSink) Write(b []byte) (int, error)

type ByteSinker

type ByteSinker interface {
	io.WriteCloser

	// sometimes we want to close a query early before it is drained
	// (this sends a EndErr packet back )
	CloseWithError(error) error

	SetEncoding(re RequestEncoding)
}

type ByteSource

type ByteSource struct {
	// contains filtered or unexported fields
}

ByteSource is inspired by sql.Rows but without the Scan(), it just reads plain []bytes, one per muxrpc packet.

func NewTestSource

func NewTestSource(bodies ...[]byte) *ByteSource

func (*ByteSource) AsStream

func (bs *ByteSource) AsStream() *streamSource

AsStream returns a legacy stream adapter for luigi code

func (*ByteSource) Bytes

func (bs *ByteSource) Bytes() ([]byte, error)

Bytes returns the full slice of bytes from the next frame.

func (*ByteSource) Cancel

func (bs *ByteSource) Cancel(err error)

Cancel stops reading and terminates the request. Sometimes we want to close a query early before it is drained.

func (*ByteSource) Err

func (bs *ByteSource) Err() error

Err returns nill or an error when processing fails or the context was canceled

func (*ByteSource) Next

func (bs *ByteSource) Next(ctx context.Context) bool

Next blocks until there are new muxrpc frames for this stream

func (*ByteSource) Reader

func (bs *ByteSource) Reader(fn ReadFn) error

Reader passes a (limited) reader for the next segment to the passed . Since the stream can't be written while it's read, the reader is only valid during the call to the passed function.

type ByteSourcer

type ByteSourcer interface {
	Next(context.Context) bool
	Reader(ReadFn) error

	// sometimes we want to close a query early before it is drained
	// (this sends a EndErr packet back )
	Cancel(error)
}

type CallError

type CallError struct {
	Name    string `json:"name"`
	Message string `json:"message"`
	Stack   string `json:"stack"`
}

CallError is returned when a call fails

func (CallError) Error

func (e CallError) Error() string

type CallHandler

type CallHandler interface {
	HandleCall(ctx context.Context, req *Request)
}

type CallType

type CallType string

CallType is the type of a call

func (CallType) Flags

func (t CallType) Flags() codec.Flag

Flags returns the packet flags of the respective call type

type ConnectHandler

type ConnectHandler interface {
	HandleConnect(ctx context.Context, edp Endpoint)
}

type Endpoint

type Endpoint interface {
	// The different call types:
	Async(ctx context.Context, ret interface{}, tipe RequestEncoding, method Method, args ...interface{}) error

	Source(ctx context.Context, tipe RequestEncoding, method Method, args ...interface{}) (*ByteSource, error)
	Sink(ctx context.Context, tipe RequestEncoding, method Method, args ...interface{}) (*ByteSink, error)
	Duplex(ctx context.Context, tipe RequestEncoding, method Method, args ...interface{}) (*ByteSource, *ByteSink, error)

	// Terminate wraps up the RPC session
	Terminate() error

	// Remote returns the network address of the remote
	Remote() net.Addr
}

Endpoint allows calling functions on the RPC peer.

func Handle

func Handle(pkr *Packer, handler Handler, opts ...HandleOption) Endpoint

Handle handles the connection of the packer using the specified handler.

type ErrNoSuchMethod added in v2.0.4

type ErrNoSuchMethod struct {
	Method Method
}

func (ErrNoSuchMethod) Error added in v2.0.4

func (e ErrNoSuchMethod) Error() string

type ErrWrongStreamType

type ErrWrongStreamType struct {
	// contains filtered or unexported fields
}

func (ErrWrongStreamType) Error

func (wst ErrWrongStreamType) Error() string

type FakeEndpoint

type FakeEndpoint struct {
	AsyncStub func(context.Context, interface{}, RequestEncoding, Method, ...interface{}) error

	DuplexStub func(context.Context, RequestEncoding, Method, ...interface{}) (*ByteSource, *ByteSink, error)

	RemoteStub func() net.Addr

	SinkStub func(context.Context, RequestEncoding, Method, ...interface{}) (*ByteSink, error)

	SourceStub func(context.Context, RequestEncoding, Method, ...interface{}) (*ByteSource, error)

	TerminateStub func() error
	// contains filtered or unexported fields
}

func (*FakeEndpoint) Async

func (fake *FakeEndpoint) Async(arg1 context.Context, arg2 interface{}, arg3 RequestEncoding, arg4 Method, arg5 ...interface{}) error

func (*FakeEndpoint) AsyncArgsForCall

func (fake *FakeEndpoint) AsyncArgsForCall(i int) (context.Context, interface{}, RequestEncoding, Method, []interface{})

func (*FakeEndpoint) AsyncCallCount

func (fake *FakeEndpoint) AsyncCallCount() int

func (*FakeEndpoint) AsyncCalls

func (fake *FakeEndpoint) AsyncCalls(stub func(context.Context, interface{}, RequestEncoding, Method, ...interface{}) error)

func (*FakeEndpoint) AsyncReturns

func (fake *FakeEndpoint) AsyncReturns(result1 error)

func (*FakeEndpoint) AsyncReturnsOnCall

func (fake *FakeEndpoint) AsyncReturnsOnCall(i int, result1 error)

func (*FakeEndpoint) Duplex

func (fake *FakeEndpoint) Duplex(arg1 context.Context, arg2 RequestEncoding, arg3 Method, arg4 ...interface{}) (*ByteSource, *ByteSink, error)

func (*FakeEndpoint) DuplexArgsForCall

func (fake *FakeEndpoint) DuplexArgsForCall(i int) (context.Context, RequestEncoding, Method, []interface{})

func (*FakeEndpoint) DuplexCallCount

func (fake *FakeEndpoint) DuplexCallCount() int

func (*FakeEndpoint) DuplexCalls

func (fake *FakeEndpoint) DuplexCalls(stub func(context.Context, RequestEncoding, Method, ...interface{}) (*ByteSource, *ByteSink, error))

func (*FakeEndpoint) DuplexReturns

func (fake *FakeEndpoint) DuplexReturns(result1 *ByteSource, result2 *ByteSink, result3 error)

func (*FakeEndpoint) DuplexReturnsOnCall

func (fake *FakeEndpoint) DuplexReturnsOnCall(i int, result1 *ByteSource, result2 *ByteSink, result3 error)

func (*FakeEndpoint) Invocations

func (fake *FakeEndpoint) Invocations() map[string][][]interface{}

func (*FakeEndpoint) Remote

func (fake *FakeEndpoint) Remote() net.Addr

func (*FakeEndpoint) RemoteCallCount

func (fake *FakeEndpoint) RemoteCallCount() int

func (*FakeEndpoint) RemoteCalls

func (fake *FakeEndpoint) RemoteCalls(stub func() net.Addr)

func (*FakeEndpoint) RemoteReturns

func (fake *FakeEndpoint) RemoteReturns(result1 net.Addr)

func (*FakeEndpoint) RemoteReturnsOnCall

func (fake *FakeEndpoint) RemoteReturnsOnCall(i int, result1 net.Addr)

func (*FakeEndpoint) Sink

func (fake *FakeEndpoint) Sink(arg1 context.Context, arg2 RequestEncoding, arg3 Method, arg4 ...interface{}) (*ByteSink, error)

func (*FakeEndpoint) SinkArgsForCall

func (fake *FakeEndpoint) SinkArgsForCall(i int) (context.Context, RequestEncoding, Method, []interface{})

func (*FakeEndpoint) SinkCallCount

func (fake *FakeEndpoint) SinkCallCount() int

func (*FakeEndpoint) SinkCalls

func (fake *FakeEndpoint) SinkCalls(stub func(context.Context, RequestEncoding, Method, ...interface{}) (*ByteSink, error))

func (*FakeEndpoint) SinkReturns

func (fake *FakeEndpoint) SinkReturns(result1 *ByteSink, result2 error)

func (*FakeEndpoint) SinkReturnsOnCall

func (fake *FakeEndpoint) SinkReturnsOnCall(i int, result1 *ByteSink, result2 error)

func (*FakeEndpoint) Source

func (fake *FakeEndpoint) Source(arg1 context.Context, arg2 RequestEncoding, arg3 Method, arg4 ...interface{}) (*ByteSource, error)

func (*FakeEndpoint) SourceArgsForCall

func (fake *FakeEndpoint) SourceArgsForCall(i int) (context.Context, RequestEncoding, Method, []interface{})

func (*FakeEndpoint) SourceCallCount

func (fake *FakeEndpoint) SourceCallCount() int

func (*FakeEndpoint) SourceCalls

func (fake *FakeEndpoint) SourceCalls(stub func(context.Context, RequestEncoding, Method, ...interface{}) (*ByteSource, error))

func (*FakeEndpoint) SourceReturns

func (fake *FakeEndpoint) SourceReturns(result1 *ByteSource, result2 error)

func (*FakeEndpoint) SourceReturnsOnCall

func (fake *FakeEndpoint) SourceReturnsOnCall(i int, result1 *ByteSource, result2 error)

func (*FakeEndpoint) Terminate

func (fake *FakeEndpoint) Terminate() error

func (*FakeEndpoint) TerminateCallCount

func (fake *FakeEndpoint) TerminateCallCount() int

func (*FakeEndpoint) TerminateCalls

func (fake *FakeEndpoint) TerminateCalls(stub func() error)

func (*FakeEndpoint) TerminateReturns

func (fake *FakeEndpoint) TerminateReturns(result1 error)

func (*FakeEndpoint) TerminateReturnsOnCall

func (fake *FakeEndpoint) TerminateReturnsOnCall(i int, result1 error)

type FakeHandler added in v2.0.1

type FakeHandler struct {
	HandleCallStub func(context.Context, *Request)

	HandleConnectStub func(context.Context, Endpoint)

	HandledStub func(Method) bool
	// contains filtered or unexported fields
}

func (*FakeHandler) HandleCall added in v2.0.1

func (fake *FakeHandler) HandleCall(arg1 context.Context, arg2 *Request)

func (*FakeHandler) HandleCallArgsForCall added in v2.0.1

func (fake *FakeHandler) HandleCallArgsForCall(i int) (context.Context, *Request)

func (*FakeHandler) HandleCallCallCount added in v2.0.1

func (fake *FakeHandler) HandleCallCallCount() int

func (*FakeHandler) HandleCallCalls added in v2.0.1

func (fake *FakeHandler) HandleCallCalls(stub func(context.Context, *Request))

func (*FakeHandler) HandleConnect added in v2.0.1

func (fake *FakeHandler) HandleConnect(arg1 context.Context, arg2 Endpoint)

func (*FakeHandler) HandleConnectArgsForCall added in v2.0.1

func (fake *FakeHandler) HandleConnectArgsForCall(i int) (context.Context, Endpoint)

func (*FakeHandler) HandleConnectCallCount added in v2.0.1

func (fake *FakeHandler) HandleConnectCallCount() int

func (*FakeHandler) HandleConnectCalls added in v2.0.1

func (fake *FakeHandler) HandleConnectCalls(stub func(context.Context, Endpoint))

func (*FakeHandler) Handled added in v2.0.1

func (fake *FakeHandler) Handled(arg1 Method) bool

func (*FakeHandler) HandledArgsForCall added in v2.0.1

func (fake *FakeHandler) HandledArgsForCall(i int) Method

func (*FakeHandler) HandledCallCount added in v2.0.1

func (fake *FakeHandler) HandledCallCount() int

func (*FakeHandler) HandledCalls added in v2.0.1

func (fake *FakeHandler) HandledCalls(stub func(Method) bool)

func (*FakeHandler) HandledReturns added in v2.0.1

func (fake *FakeHandler) HandledReturns(result1 bool)

func (*FakeHandler) HandledReturnsOnCall added in v2.0.1

func (fake *FakeHandler) HandledReturnsOnCall(i int, result1 bool)

func (*FakeHandler) Invocations added in v2.0.1

func (fake *FakeHandler) Invocations() map[string][][]interface{}

type HandleOption

type HandleOption func(*rpc)

HandleOption are used to configure rpc handler instances

func WithContext

func WithContext(ctx context.Context) HandleOption

WithContext sets the context for the whole lifetime of the rpc session

func WithIsServer

func WithIsServer(yes bool) HandleOption

WithIsServer sets wether the Handle should be in the server (true) or client (false) role

func WithLogger

func WithLogger(l log.Logger) HandleOption

WithLogger let's you overwrite the stderr logger

func WithRemoteAddr

func WithRemoteAddr(addr net.Addr) HandleOption

WithRemoteAddr also sets the remote address the endpoint is connected to. ie if the packer tunnels through something which can't see the address.

type Handler

type Handler interface {

	// Handled returns true if the method is handled by the handler
	Handled(Method) bool

	CallHandler
	ConnectHandler
}

Handler allows handling connections. When we are being called, HandleCall is called. When a connection is established, HandleConnect is called. TODO: let HandleCall return an error

func ApplyHandlerWrappers

func ApplyHandlerWrappers(h Handler, hws ...HandlerWrapper) Handler

type HandlerMux

type HandlerMux struct {
	// contains filtered or unexported fields
}

func (*HandlerMux) HandleCall

func (hm *HandlerMux) HandleCall(ctx context.Context, req *Request)

func (*HandlerMux) HandleConnect

func (hm *HandlerMux) HandleConnect(ctx context.Context, edp Endpoint)

func (*HandlerMux) Handled

func (hm *HandlerMux) Handled(m Method) bool

func (*HandlerMux) Register

func (hm *HandlerMux) Register(m Method, h Handler)

type HandlerWrapper

type HandlerWrapper func(Handler) Handler

type Method

type Method []string

Method defines the name of the endpoint.

func (Method) String

func (m Method) String() string

func (*Method) UnmarshalJSON

func (m *Method) UnmarshalJSON(d []byte) error

UnmarshalJSON decodes the

type Packer

type Packer struct {
	// contains filtered or unexported fields
}

Packer is a duplex stream that sends and receives *codec.Packet values. Usually wraps a network connection or stdio.

func NewPacker

func NewPacker(rwc io.ReadWriteCloser) *Packer

NewPacker takes an io.ReadWriteCloser and returns a Packer.

func (*Packer) Close

func (pkr *Packer) Close() error

Close closes the packer.

func (*Packer) NextHeader

func (pkr *Packer) NextHeader(ctx context.Context, hdr *codec.Header) error

Next returns the next packet from the underlying stream.

type ReadFn

type ReadFn func(r io.Reader) error

ReadFn is what a ByteSource needs for it's ReadFn. The passed reader is only valid during the call to it.

type Request

type Request struct {
	// Stream is a legacy adapter for luigi-powered streams
	Stream Stream `json:"-"`

	// Method is the name of the called function
	Method Method `json:"name"`

	// Args contains the call arguments
	RawArgs json.RawMessage `json:"args"`

	// Type is the type of the call, i.e. async, sink, source or duplex
	Type CallType `json:"type"`
	// contains filtered or unexported fields
}

Request assembles the state of an RPC call

func (*Request) Args

func (req *Request) Args() []interface{}

Args is a legacy stub to get the unmarshaled json arguments

func (*Request) Close

func (req *Request) Close() error

Close closes the stream with io.EOF

func (*Request) CloseWithError

func (req *Request) CloseWithError(cerr error) error

CloseWithError is used to close an ongoing request. Ie instruct the remote to stop sending data or notify it that a stream couldn't be fully filled because of an error

func (Request) Endpoint

func (req Request) Endpoint() Endpoint

Endpoint returns the client instance to start new calls. Mostly usefull inside handlers.

func (Request) RemoteAddr

func (req Request) RemoteAddr() net.Addr

RemoteAddr returns the netwrap'ed network adddress of the underlying connection. This is usually a pair of secretstream.Addr and TCP

func (*Request) ResponseSink

func (req *Request) ResponseSink() (*ByteSink, error)

ResponseSink returns the response writer for incoming source requests.

func (*Request) ResponseSource

func (req *Request) ResponseSource() (*ByteSource, error)

ResponseSource returns the reader for incoming data of sink or duplex calls.

func (*Request) Return

func (req *Request) Return(ctx context.Context, v interface{}) error

Return is a helper that returns on an async call

type RequestEncoding

type RequestEncoding uint

RequestEncoding hides the specifics of codec.Flag

const (
	TypeBinary RequestEncoding = iota
	TypeString
	TypeJSON
)

binary, string and JSON are the three supported format types. Don't ask me why we have string and binary, this just copies the javascript secifics.

func (RequestEncoding) IsValid

func (rt RequestEncoding) IsValid() bool

IsValid returns false if the type is not known.

type Server

type Server interface {
	Remote() net.Addr
	Serve() error
}

Server can handle packets to and from a remote party

type Stream

type Stream interface {
	luigi.Source
	luigi.Sink
	luigi.ErrorCloser

	// WithType tells the stream in what type JSON data should be unmarshalled into
	WithType(tipe interface{})

	// WithReq tells the stream what request number should be used for sent messages
	WithReq(req int32)
}

Stream is a muxrpc stream for the general duplex case.

Directories

Path Synopsis
cmd
muxgen
muxgen generates code to easily call functions on the endpoint type myEndpoint struct { // ...
muxgen generates code to easily call functions on the endpoint type myEndpoint struct { // ...
Package codec implements readers and writers for https://github.com/dominictarr/packet-stream-codec Packet structure: ( [flags (1byte), length (4 bytes, UInt32BE), req (4 bytes, Int32BE)] # Header [body (length bytes)] ) * [zeros (9 bytes)] Flags: [ignored (4 bits), stream (1 bit), end/err (1 bit), type (2 bits)] type = {0 => Buffer, 1 => String, 2 => JSON} # PacketType
Package codec implements readers and writers for https://github.com/dominictarr/packet-stream-codec Packet structure: ( [flags (1byte), length (4 bytes, UInt32BE), req (4 bytes, Int32BE)] # Header [body (length bytes)] ) * [zeros (9 bytes)] Flags: [ignored (4 bits), stream (1 bit), end/err (1 bit), type (2 bits)] type = {0 => Buffer, 1 => String, 2 => JSON} # PacketType
Package typemux offers an improved muxrpc.HandlerMux (think HTTP router).
Package typemux offers an improved muxrpc.HandlerMux (think HTTP router).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL