birpc

package module
v0.0.0-...-b602ce8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 29, 2017 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DEFAULT_CONFIG = &Config{
	KeepAliveTimeout:  40 * time.Second,
	KeepAliveInterval: 20 * time.Second,
	Timeout:           1 * time.Minute,
}
View Source
var GobEncoding = &Encoding{Encoder: func(writer io.Writer) Encoder {
	return &gobEncoder{gob.NewEncoder(&ErrorPropagationWriter{Writer: writer})}
},
	Decoder: func(reader io.Reader) Decoder {
		return &gobDecoder{gob.NewDecoder(&ErrorPropagationReader{Reader: reader})}
	},
	Register: func(dataType interface{}) {
		gob.Register(dataType)
	}}

Functions

func StructHandler

func StructHandler(s interface{}) map[string]StreamHandler

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func (*Client) Connect

func (c *Client) Connect(ctx context.Context, network string, addr string) (*StickyConnection, error)

func (*Client) Serve

func (c *Client) Serve(ctx context.Context, network string, addr string, connections chan *StickyConnection) error

type ClientConnection

type ClientConnection interface {
	Connection
	OpenStream(ctx context.Context, path string) (Stream, error)
	Call(ctx context.Context, path string, req interface{}, response interface{}) error
	CallStream(ctx context.Context, path string, req interface{}) (StreamReader, error)
}

type Config

type Config struct {
	KeepAliveInterval time.Duration
	KeepAliveTimeout  time.Duration
	Timeout           time.Duration
}

func (*Config) Validate

func (c *Config) Validate() error

type Connection

type Connection interface {
	Config() *Config
	State() ConnectionState
	Close() error
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
}

type ConnectionState

type ConnectionState int
const (
	CON_CONNECTING ConnectionState = iota
	CON_READY
	CON_FAILURE
	CON_IDLE
	CON_CLOSED
)

type Decoder

type Decoder interface {
	Decode(to interface{}) error
}

Must return RpcError(code=RPC_WRONG_ENCODING) or RpcConnectionError on underlying reader/writer errors

type DecodingProvider

type DecodingProvider func(reader io.Reader) Decoder

type Encoder

type Encoder interface {
	Encode(from interface{}) error
}

Must return RpcError(code=RPC_WRONG_ENCODING) or RpcConnectionError on underlying reader/writer errors

type Encoding

type Encoding struct {
	Encoder  EncodingProvider
	Decoder  DecodingProvider
	Register func(dataType interface{})
}

type EncodingProvider

type EncodingProvider func(writer io.Writer) Encoder

type ErrorPropagationReader

type ErrorPropagationReader struct {
	Reader io.Reader
}

func (*ErrorPropagationReader) Read

func (r *ErrorPropagationReader) Read(p []byte) (n int, err error)

type ErrorPropagationWriter

type ErrorPropagationWriter struct {
	Writer io.Writer
}

Used to propagate writer errors as RpcError to distinguish encoding errors from connection errors

func (*ErrorPropagationWriter) Write

func (w *ErrorPropagationWriter) Write(p []byte) (n int, err error)

type Network

type Network interface {
	DialContext(ctx context.Context, network, address string) (net.Conn, error)
	Listen(network, address string) (net.Listener, error)
}

type Rpc

type Rpc struct {
	Config *Config
	// contains filtered or unexported fields
}

func NewRpc

func NewRpc(encoding *Encoding, config *Config) (*Rpc, error)

func (*Rpc) NewClient

func (rpc *Rpc) NewClient() *Client

func (*Rpc) NewServer

func (rpc *Rpc) NewServer(id string) *Server

type RpcConnectionError

type RpcConnectionError struct {
	Cause error
}

func (*RpcConnectionError) Error

func (rce *RpcConnectionError) Error() string

type RpcError

type RpcError struct {
	Code RpcStatusCode
	Msg  string
}

An RPC error is part of the RPC protocol's error reporting. All these errors are fatal for the stream: the stream will be closed after such an error.

func (*RpcError) Error

func (e *RpcError) Error() string

type RpcStatusCode

type RpcStatusCode byte
const (
	RPC_OK             RpcStatusCode = iota
	RPC_PROTOCOL_ERROR               // wrong protocol; followed by error message and stream close
	RPC_NOT_FOUND                    // received by client if not such path; like Http 404; followed by stream close
	RPC_WRONG_ENCODING               // returned to client when server unable to decode stream message; followed by encoded string with error description and stream close
	RPC_APP_ERROR                    // application specific error; followed by encoded app error object and stream close
)

type Server

type Server struct {
	// contains filtered or unexported fields
}

func (*Server) Connect

func (s *Server) Connect(ctx context.Context, network string, addr string) error

func (*Server) RegisterHandler

func (s *Server) RegisterHandler(path string, handler StreamHandler)

func (*Server) Serve

func (s *Server) Serve(ctx context.Context, network string, addr string) error

type ServerConnection

type ServerConnection interface {
	Connection
	AcceptStream(ctx context.Context) (Stream, error)
}

type StickyConnection

type StickyConnection struct {
	StateChangeListener func(oldState StickyConnectionState, newState StickyConnectionState)
	// contains filtered or unexported fields
}

func (*StickyConnection) Call

func (sc *StickyConnection) Call(ctx context.Context, path string, req interface{}, res interface{}) error

func (*StickyConnection) CallStream

func (sc *StickyConnection) CallStream(ctx context.Context, path string, req interface{}) (StreamReader, error)

func (*StickyConnection) Id

func (sc *StickyConnection) Id() string

func (*StickyConnection) LocalAddr

func (sc *StickyConnection) LocalAddr() net.Addr

func (*StickyConnection) OpenStream

func (sc *StickyConnection) OpenStream(ctx context.Context, path string) (Stream, error)

func (*StickyConnection) ProcessStream

func (sc *StickyConnection) ProcessStream(ctx context.Context,
	path string,
	req interface{},
	targetMaker func() interface{},
	processor func(target interface{}) error) error

func (*StickyConnection) RemoteAddr

func (sc *StickyConnection) RemoteAddr() net.Addr

func (*StickyConnection) State

func (*StickyConnection) Terminate

func (sc *StickyConnection) Terminate()

Terminates the connection. The connection can't be used after termination and can be freely deleted.

type StickyConnectionState

type StickyConnectionState uint8
const (
	SCON_OFFLINE    StickyConnectionState = iota // all calls will return error in this state, can become online when underlying transport reconnected
	SCON_ONLINE                                  // underlying transport is connected and can be used
	SCON_TERMINATED                              // connection terminated and can't be used anymore
)

type Stream

type Stream interface {
	Path() string
	Read(target interface{}) error
	Write(data interface{}) error
	WriteError(err error) error

	io.Closer
	// contains filtered or unexported methods
}

type StreamHandler

type StreamHandler func(context.Context, Stream) error

func FuncHandler

func FuncHandler(f interface{}) StreamHandler

type StreamImpl

type StreamImpl struct {
	// contains filtered or unexported fields
}

Stream is an analogue of a TCP connection, but it is opened on a multiplexed TCP stream.

func (*StreamImpl) Close

func (s *StreamImpl) Close() error

func (*StreamImpl) Path

func (s *StreamImpl) Path() string

func (*StreamImpl) Read

func (s *StreamImpl) Read(target interface{}) error

func (*StreamImpl) Write

func (s *StreamImpl) Write(data interface{}) error

func (*StreamImpl) WriteError

func (s *StreamImpl) WriteError(err error) error

type StreamReader

type StreamReader interface {
	io.Closer
	Read(targetMsg interface{}) error
}

Restricted interface to read messages from a stream. Note that closing the reader or writer will cause the whole stream to be closed, not only a single direction as in TCP.

type StreamWriter

type StreamWriter interface {
	io.Closer
	Write(msg interface{}) error
}

Restricted interface to write messages to a stream.

type TcpNetwork

type TcpNetwork struct{}

func (*TcpNetwork) DialContext

func (tcpnet *TcpNetwork) DialContext(ctx context.Context, network, address string) (net.Conn, error)

func (*TcpNetwork) Listen

func (tcpnet *TcpNetwork) Listen(network, address string) (net.Listener, error)

type TlsNetwork

type TlsNetwork struct {
	ClientConfig *tls.Config
	ServerConfig *tls.Config
}

func (*TlsNetwork) DialContext

func (tlsnet *TlsNetwork) DialContext(ctx context.Context, network, address string) (net.Conn, error)

func (*TlsNetwork) Listen

func (tlsnet *TlsNetwork) Listen(network, address string) (net.Listener, error)

type Transport

type Transport struct {
	// contains filtered or unexported fields
}

Transport allows opening and accepting streams. Transport is not stateful and can fail. Client/Server implementations support creating and maintaining transports (including reconnects). This particular implementation is built around a multiplexed TCP/TLS connection.

func (*Transport) AcceptStream

func (transport *Transport) AcceptStream(ctx context.Context) (Stream, error)

Blocks until the other side opens a stream. Client can validate the path and send error RPC_NOT_FOUND if the path is bad.

func (*Transport) Call

func (transport *Transport) Call(ctx context.Context, path string, req interface{}, response interface{}) error

Simplified method with call semantics. Uses openStream, writes the request and reads the response, after which it closes the stream. Should always have a request and a response. If nil is passed as req or response, a bool placeholder will be used instead.

func (*Transport) CallStream

func (transport *Transport) CallStream(ctx context.Context, path string, req interface{}) (StreamReader, error)

Simplified method with call semantics. Uses openStream, writes the request and returns a reader to read the sequence of responses. The user must close the reader after completion.

func (*Transport) Close

func (transport *Transport) Close() error

Closes transport and all underlying connections

func (*Transport) Config

func (transport *Transport) Config() *Config

func (*Transport) LocalAddr

func (transport *Transport) LocalAddr() net.Addr

Return local address of the transport connection

func (*Transport) OpenStream

func (transport *Transport) OpenStream(ctx context.Context, path string) (Stream, error)

Open stream for specified path. Stream operations can be canceled via ctx

func (*Transport) RemoteAddr

func (transport *Transport) RemoteAddr() net.Addr

Return remote address of the transport connection

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL