rpc

package
v0.0.0-...-c18a219 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 20, 2023 License: MIT Imports: 26 Imported by: 4

README

RPC

Features

  • Simple and lightweight
  • High performance
  • Support port multiplexing (Multiple protocols on the same port)
  • Support multiple codecs: JSON, HTTP, Protocol Buffers...
  • Load balance: Random, RoundRobin...
  • OpenTracing support
  • Service discovery and governance with etcd, consul...
  • Support authentication and authorization (TODO)
  • Circuit breaker (TODO)
  • Rate limit (TODO)

Server

type TestService struct {
}

func (TestService) Hello(ctx context.Context, name string) string {
	return "Hello, " + name
}

func main() {
	s := rpc.Listen(transport.Address{URL: ":9000"})
	s.Match(json.Matcher, "json")
	//s.Match(jsoniter.Matcher, "json")
	//s.Match(proto.Matcher, "proto")
	//s.Use(filter.Test())
	s.RegisterService("Test", TestService{})
	s.RegisterFunc("Test", "Ping", func() string {
		return "pong"
	})
	log.Fatal(s.Serve())
}

Client

c := rpc.Dial("json", transport.Address{URL: "127.0.0.1:9000"})

var s string
err := c.Call(context.Background(), "Test", "Hello", []interface{}{"auxo"}, &s)

Code generating

If you use the Protocol Buffers codec, you can generate contract code from Protocol Buffers service definition files with protoc-gen-auxo.

Install tools

Use go install to install the code generator:

go install github.com/cuigh/protoc-gen-auxo

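You will also need protoc (the Protocol Buffers compiler) and the protoc-gen-go plugin, both of which the protoc command under Usage relies on (--go_out).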

Usage
protoc --go_out=. --auxo_out=. hello.proto

The name of each generated code file ends with .auxo.go. The file includes service interfaces and client proxies.

Documentation

Index

Constants

View Source
const PkgName = "auxo.net.rpc"

Variables

View Source
var (
	// ErrNodeUnavailable indicates no node is available for call.
	ErrNodeUnavailable = NewError(StatusNodeUnavailable, "rpc: no node is available")
	// ErrNodeShutdown indicates Node is shut down.
	ErrNodeShutdown = NewError(StatusNodeShutdown, "rpc: node is shut down")
	// ErrNilResult indicates the return value is nil.
	ErrNilResult = NewError(StatusNilResult, "rpc: result is nil")
	// ErrServerClosed indicates Server is closed.
	ErrServerClosed = NewError(StatusServerClosed, "rpc: server closed")
)
View Source
var (
	ClientPool = clientManager{/* contains filtered or unexported fields */}
)

Functions

func Any

func Any(_ ReadPeeker) bool

Any is a Matcher that always matches.

func NewError

func NewError(code StatusCode, format string, args ...interface{}) *errors.CodedError

func RegisterBalancer

func RegisterBalancer(b BalancerBuilder)

func RegisterCodec

func RegisterCodec(name string, cb CodecBuilder)

func RegisterNewer

func RegisterNewer(t reflect.Type, n Newer)

func StatusOf

func StatusOf(err error) int32

Types

type Action

type Action interface {
	// Name is the name of action, normally is '[Service].[Method]'
	Name() string
	// In returns input arguments of the action
	In() []reflect.Type
	// Out returns output arguments of the action
	Out() []reflect.Type
	// Context returns true if the first in-arg is `context.Context`
	Context() bool
	// Error returns true if the last out-arg is `error`
	Error() bool
	// Handler returns real executor of the action
	Handler() SHandler
	// contains filtered or unexported methods
}

Action is the interface that wraps the methods of service executor.

type ActionSet

type ActionSet interface {
	Get(name string) Action
	Find(service, method string) Action
	Range(fn func(a Action) bool)
}

type AsyncError

type AsyncError interface {
	Wait() error
}

type Balancer

type Balancer interface {
	Update(nodes []*Node)
	Next() (*Node, error)
}

type BalancerBuilder

type BalancerBuilder interface {
	// Name returns the name of balancers built by this builder.
	// It will be used to pick balancers (for example in service config).
	Name() string
	// Build creates a new balancer with the options.
	Build(opts data.Map) Balancer
}

BalancerBuilder creates a balancer.

func GetBalancer

func GetBalancer(name string) BalancerBuilder

type BalancerOptions

type BalancerOptions struct {
}

type CFilter

type CFilter func(CHandler) CHandler

CFilter is a client interceptor.

type CHandler

type CHandler func(*Call) error

type Call

type Call struct {
	// contains filtered or unexported fields
}

func (*Call) Context

func (c *Call) Context() ct.Context

func (*Call) Err

func (c *Call) Err() error

func (*Call) Reply

func (c *Call) Reply() interface{}

func (*Call) Request

func (c *Call) Request() *Request

func (*Call) Server

func (c *Call) Server() string

func (*Call) SetContext

func (c *Call) SetContext(ctx ct.Context)

func (*Call) Wait

func (c *Call) Wait() (err error)

Wait implements the AsyncError interface.

type Channel

type Channel struct {
	net.Conn
	// contains filtered or unexported fields
}

func (*Channel) Flush

func (c *Channel) Flush() error

func (*Channel) ID

func (c *Channel) ID() string

func (*Channel) Peek

func (c *Channel) Peek(n int) ([]byte, error)

func (*Channel) Read

func (c *Channel) Read(p []byte) (n int, err error)

func (*Channel) ReadByte

func (c *Channel) ReadByte() (byte, error)

func (*Channel) ReadBytes

func (c *Channel) ReadBytes(delim byte) ([]byte, error)

func (*Channel) ReadString

func (c *Channel) ReadString(delim byte) (string, error)

func (*Channel) Reader

func (c *Channel) Reader() *bufio.Reader

func (*Channel) Write

func (c *Channel) Write(p []byte) (n int, err error)

func (*Channel) WriteByte

func (c *Channel) WriteByte(b byte) error

func (*Channel) WriteString

func (c *Channel) WriteString(s string) (int, error)

func (*Channel) Writer

func (c *Channel) Writer() *bufio.Writer

type Client

type Client struct {
	// contains filtered or unexported fields
}

func AutoClient

func AutoClient(name string) (*Client, error)

AutoClient loads options from the config file and creates a Client. The created client is cached, so the next call to AutoClient with the same name returns the same Client instance.

func Dial

func Dial(codec string, addrs ...transport.Address) (*Client, error)

Dial creates a Client with the given codec and addresses.

func NewClient

func NewClient(opts ClientOptions) (*Client, error)

NewClient creates a Client with the given options.

func (*Client) Call

func (c *Client) Call(ctx ct.Context, service, method string, args []interface{}, reply interface{}) error

func (*Client) Close

func (c *Client) Close()

func (*Client) Use

func (c *Client) Use(filter ...CFilter)

type ClientCodec

type ClientCodec interface {
	// Encode sends the request to the server; it must be safe for concurrent use.
	Encode(req *Request) error
	DecodeHead(head *ResponseHead) error
	DecodeResult(result *Result) error
	DiscardResult() error
}

type ClientOptions

type ClientOptions struct {
	Name    string              `json:"name" yaml:"name"`
	Desc    string              `json:"desc" yaml:"desc"`
	Version string              `json:"version" yaml:"version"`
	Group   string              `json:"group" yaml:"group"`
	Fail    FailMode            `json:"fail" yaml:"fail"`
	Address []transport.Address `json:"address" yaml:"address"`
	Codec   struct {
		Name    string   `json:"name" yaml:"name"`
		Options data.Map `json:"options" yaml:"options"`
	} `json:"codec" yaml:"codec"`
	Balancer struct {
		Name    string   `json:"name" yaml:"name"`
		Options data.Map `json:"options" yaml:"options"`
	} `json:"balancer" yaml:"balancer"`
	Resolver struct {
		Name    string   `json:"name" yaml:"name"`
		Options data.Map `json:"options" yaml:"options"`
	} `json:"resolver" yaml:"resolver"`
	Channels     int           `json:"channels" yaml:"channels"`
	CallTimeout  time.Duration `json:"call_timeout" yaml:"call_timeout"`
	DialTimeout  time.Duration `json:"dial_timeout" yaml:"dial_timeout"`
	ReadTimeout  time.Duration `json:"read_timeout" yaml:"read_timeout"`
	WriteTimeout time.Duration `json:"write_timeout" yaml:"write_timeout"`
}

func (*ClientOptions) AddAddress

func (opts *ClientOptions) AddAddress(uri string, options data.Map)

type CodecBuilder

type CodecBuilder interface {
	NewClient(s Stream, opts data.Map) ClientCodec
	NewServer(s Stream, opts data.Map) ServerCodec
}

type Context

type Context interface {
	Request() *Request
	Action() Action
	Context() ct.Context
	SetContext(ctx ct.Context)
	// User returns info of current visitor.
	User() security.User
	// SetUser sets the user info of the current visitor. Generally used by an authentication filter.
	SetUser(user security.User)
}

func FromContext

func FromContext(ctx ct.Context) Context

type FailMode

type FailMode int
const (
	// FailFast returns error immediately
	FailFast FailMode = iota
	// FailOver selects another server node automatically
	FailOver
	// FailTry uses the current server node again
	FailTry
)

func (*FailMode) Unmarshal

func (f *FailMode) Unmarshal(i interface{}) error

type LazyClient

type LazyClient struct {
	Name string
	// contains filtered or unexported fields
}

func (*LazyClient) Get

func (l *LazyClient) Get() (c *Client)

func (*LazyClient) Try

func (l *LazyClient) Try() (c *Client, err error)

type Matcher

type Matcher func(p ReadPeeker) bool

Matcher matches a connection based on its content.

type Newer

type Newer func(t reflect.Type) interface{}

Newer is a function to generate a value instead of reflect.New.

type Node

type Node struct {
	// contains filtered or unexported fields
}

func (*Node) Call

func (n *Node) Call(ctx ct.Context, service, method string, args []interface{}, reply interface{}) (err error)

func (*Node) Close

func (n *Node) Close()

type NodeOptions

type NodeOptions struct {
	Codec struct {
		Name    string
		Options data.Map
	}
	Address transport.Address
}

type ReadPeeker

type ReadPeeker interface {
	Read(p []byte) (n int, err error)
	Peek(n int) ([]byte, error)
}

type Request

type Request struct {
	Head RequestHead   `json:"head"`
	Args []interface{} `json:"args,omitempty"`
}

type RequestHead

type RequestHead struct {
	//Type    byte         `json:"type"` // 0-rpc, 1-heartbeat
	ID      uint64       `json:"id"` // ID == 0 for a heartbeat response
	Service string       `json:"service,omitempty"`
	Method  string       `json:"method,omitempty"`
	Labels  data.Options `json:"labels,omitempty"`
	Assets  data.Options `json:"assets,omitempty"` // it would be send back to client

}

type Response

type Response struct {
	Head   ResponseHead `json:"head"`
	Result Result       `json:"result"`
}

type ResponseHead

type ResponseHead struct {
	//Type byte   `json:"type"` // 0-rpc, 1-heartbeat
	ID     uint64       `json:"id"` // ID == 0 for a heartbeat request
	Assets data.Options `json:"assets,omitempty"`
}

type Result

type Result struct {
	Value interface{}        `json:"value,omitempty"`
	Error *errors.CodedError `json:"error,omitempty"`
}

type SFilter

type SFilter func(SHandler) SHandler

type SHandler

type SHandler func(c Context) (interface{}, error)

type Server

type Server struct {
	// contains filtered or unexported fields
}

func AutoServer

func AutoServer(name string) (*Server, error)

AutoServer loads options from the config file and creates a Server.

func Listen

func Listen(addrs ...transport.Address) (*Server, error)

func NewServer

func NewServer(opts ServerOptions) (*Server, error)

func (*Server) Close

func (s *Server) Close(timeout time.Duration)

func (*Server) Match

func (s *Server) Match(m Matcher, codec string, opts ...data.Map)

func (*Server) RegisterFunc

func (s *Server) RegisterFunc(service, method string, fn interface{}, filter ...SFilter) error

func (*Server) RegisterService

func (s *Server) RegisterService(name string, svc interface{}, filter ...SFilter) error

func (*Server) Serve

func (s *Server) Serve() error

func (*Server) Sessions

func (s *Server) Sessions() SessionMap

func (*Server) Use

func (s *Server) Use(filter ...SFilter)

type ServerCodec

type ServerCodec interface {
	// Encode sends the response to the client; it must be safe for concurrent use.
	Encode(resp *Response) error
	DecodeHead(head *RequestHead) error
	DecodeArgs(args []interface{}) error
	DiscardArgs() error
}

type ServerOptions

type ServerOptions struct {
	Name     string `json:"name" yaml:"name"`
	Desc     string `json:"desc" yaml:"desc"`
	Version  string `json:"version" yaml:"version"`
	Address  []transport.Address
	Registry struct {
		Name    string   `json:"name" yaml:"name"`
		Options data.Map `json:"options" yaml:"options"`
	} `json:"registry" yaml:"registry"`
	CallTimeout       time.Duration `json:"call_timeout" yaml:"call_timeout"`
	ReadTimeout       time.Duration `json:"read_timeout" yaml:"read_timeout"`
	WriteTimeout      time.Duration `json:"write_timeout" yaml:"write_timeout"`
	HeartbeatInterval time.Duration `json:"heartbeat_interval" yaml:"heartbeat"`
	MaxClients        int32         `json:"max_clients" yaml:"max_clients"`
	MaxPoolSize       int32         `json:"max_pool_size" yaml:"max_pool_size"`
	Backlog           int32         `json:"backlog" yaml:"backlog"`
	Debug             bool          `json:"debug" yaml:"debug"`
}

func (*ServerOptions) AddAddress

func (opts *ServerOptions) AddAddress(uri string, options data.Map)

type Session

type Session interface {
	ID() string
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
}

type SessionMap

type SessionMap interface {
	Range(func(s Session) bool)
}

type Stats

type Stats struct {
	AcceptSuccess uint32
	AcceptFailure uint32
}

type StatusCode

type StatusCode uint32

A StatusCode is an unsigned 32-bit error code.

const (
	// StatusOK is returned on success.
	StatusOK StatusCode = 0

	// StatusUnknown indicates an unknown error. An example of where this error may be returned is
	// if a Status value received from another address space belongs to
	// an error-space that is not known in this address space.  Also
	// errors raised by APIs that do not return enough error information
	// may be converted to this error.
	StatusUnknown StatusCode = 1

	// StatusCanceled indicates the operation was canceled (typically by the caller).
	StatusCanceled StatusCode = 2

	// StatusDeadlineExceeded means operation expired before completion.
	// For operations that change the state of the system, this error may be
	// returned even if the operation has completed successfully. For
	// example, a successful response from a server could have been delayed
	// long enough for the deadline to expire.
	StatusDeadlineExceeded StatusCode = 3

	// StatusNodeUnavailable indicates no node is available for call.
	StatusNodeUnavailable StatusCode = 4

	// StatusNodeShutdown indicates Node is shut down.
	StatusNodeShutdown StatusCode = 5

	// StatusCodecNotRegistered indicates codec is not registered.
	StatusCodecNotRegistered StatusCode = 6

	// StatusServerClosed indicates server is closed.
	StatusServerClosed StatusCode = 7

	// StatusMethodNotFound indicates calling method is unregistered on server.
	StatusMethodNotFound StatusCode = 8

	// StatusInvalidArgument indicates client specified an invalid argument.
	// Note that this differs from FailedPrecondition. It indicates arguments
	// that are problematic regardless of the state of the system
	// (e.g., a malformed file name).
	StatusInvalidArgument StatusCode = 9

	// StatusUnauthorized indicates client is unauthorized.
	StatusUnauthorized StatusCode = 10

	// StatusLoginFailed indicates client's login is failed.
	StatusLoginFailed StatusCode = 11

	// StatusNilResult indicates return value is nil.
	StatusNilResult StatusCode = 12
)

type Stream

type Stream interface {
	// read
	Reader() *bufio.Reader
	Peek(n int) ([]byte, error)
	Read(p []byte) (n int, err error)
	ReadByte() (byte, error)
	ReadBytes(delim byte) ([]byte, error)
	ReadString(delim byte) (string, error)

	// write
	Writer() *bufio.Writer
	Write(p []byte) (n int, err error)
	WriteByte(c byte) error
	WriteString(s string) (int, error)
	Flush() error
}

Directories

Path Synopsis
codec
filter
dns

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL