rpc

package
v2.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 25, 2022 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

View Source
const (
	Balancer  = "balancer"
	Connector = "connector"
	Server    = "backend"
	Database  = "database"
	Loader    = "loader"
)
View Source
const (
	DefaultQueue  = "dq"
	DefaultSuffix = ""
	JsonSuffix    = "json"
)
View Source
const (
	CodeTypeJson  = "json"
	CodeTypeProto = "proto"
)
View Source
const (
	MsgTypeRequest  MessageType = 0x00
	MsgTypePublish              = 0x01
	MsgTypeResponse             = 0x02
)

Message types

Variables

View Source
var (
	ErrInvalidMessage = errors.New("invalid message")
)

Functions

func DefRpcInit added in v2.0.6

func DefRpcInit()

公用调用方法

func DefaultCallback

func DefaultCallback(req *MsgRpc) []byte

func Publish

func Publish(s ReqBuilder) error

func PublishBroadcast added in v2.0.5

func PublishBroadcast(s ReqBuilder) error

func QueuePublish added in v2.0.5

func QueuePublish(s ReqBuilder) error

func QueueRequest added in v2.0.5

func QueueRequest(s ReqBuilder) error

func Request

func Request(s ReqBuilder) error

Types

type CallbackFunc

type CallbackFunc func(req *MsgRpc) []byte

type DefaultRpcEncoder

type DefaultRpcEncoder struct {
	// contains filtered or unexported fields
}

func NewRpcEncoder

func NewRpcEncoder(encoder serialize.Serializer) *DefaultRpcEncoder

func (*DefaultRpcEncoder) Decode

func (r *DefaultRpcEncoder) Decode(data []byte, rpcMsg *MsgRpc) error

func (*DefaultRpcEncoder) DecodeMsg

func (r *DefaultRpcEncoder) DecodeMsg(data []byte, v any) error

func (*DefaultRpcEncoder) Encode

func (r *DefaultRpcEncoder) Encode(rpcMsg *MsgRpc) ([]byte, error)

Encode Protocol --------<length>--------|--type--|----<MsgId>------|-<data>- ----------3byte---------|-1 byte-|-----4 byte------|--------

func (*DefaultRpcEncoder) Response

func (r *DefaultRpcEncoder) Response(v any) []byte

type EncoderRpc

type EncoderRpc interface {
	Encode(rpcMsg *MsgRpc) ([]byte, error)
	Decode(data []byte, rpcMsg *MsgRpc) error
	DecodeMsg(data []byte, v any) error
	Response(v any) []byte
}

type Handler

type Handler struct {
	// contains filtered or unexported fields
}

func NewHandler

func NewHandler() *Handler

func (*Handler) DealMsg

func (h *Handler) DealMsg(codeType string, server ServerRpc, req *MsgRpc) ([]byte, error)

func (*Handler) Register

func (h *Handler) Register(msgId int32, v any)

type HandlerItem

type HandlerItem struct {
	MsgType MessageType
	InType  reflect.Type
	Func    reflect.Value
}

type HttpHandler added in v2.0.2

type HttpHandler interface {
	Run(addr ...string) error
}

type MessageType

type MessageType byte

type MsgHandler added in v2.0.3

type MsgHandler interface {
	Register(msgId int32, v any)
	DealMsg(codeType string, server ServerRpc, req *MsgRpc) ([]byte, error)
}

type MsgRpc

type MsgRpc struct {
	MsgType MessageType
	MsgId   int32
	MsgData any
}

type NatsRpc

type NatsRpc struct {
	Endpoints   []string
	Options     []nats.Option
	Client      *nats.Conn
	DialTimeout time.Duration
	RpcCoder    map[string]EncoderRpc
	Server      *treaty.Server
	DebugMsg    bool
	Prefix      string
	Finder      *discover.Finder
}

func NewRpcNats

func NewRpcNats(opts ...NatsRpcOption) *NatsRpc

func (*NatsRpc) DealMsg

func (r *NatsRpc) DealMsg(msg *nats.Msg, callback CallbackFunc, coder EncoderRpc)

func (*NatsRpc) DecodeMsg

func (r *NatsRpc) DecodeMsg(codeType string, data []byte, v any) error

func (*NatsRpc) EncodeMsg

func (r *NatsRpc) EncodeMsg(coder EncoderRpc, msgType MessageType, msgId int32, req any) ([]byte, error)

func (*NatsRpc) Find

func (r *NatsRpc) Find(serverType string, arg any) *treaty.Server

func (*NatsRpc) GetCoder

func (r *NatsRpc) GetCoder(codeType string) EncoderRpc

func (*NatsRpc) GetServer

func (r *NatsRpc) GetServer() *treaty.Server

func (*NatsRpc) Publish

func (r *NatsRpc) Publish(s ReqBuilder) error

func (*NatsRpc) PublishBroadcast

func (r *NatsRpc) PublishBroadcast(s ReqBuilder) error

func (*NatsRpc) QueuePublish added in v2.0.3

func (r *NatsRpc) QueuePublish(s ReqBuilder) error

func (*NatsRpc) QueueRequest added in v2.0.3

func (r *NatsRpc) QueueRequest(s ReqBuilder) error

func (*NatsRpc) QueueSubscribe

func (r *NatsRpc) QueueSubscribe(s RssBuilder) error

func (*NatsRpc) RegEncoder

func (r *NatsRpc) RegEncoder(typ string, encoder EncoderRpc)

func (*NatsRpc) RemoveFindCache

func (r *NatsRpc) RemoveFindCache(arg any)

func (*NatsRpc) Request

func (r *NatsRpc) Request(s ReqBuilder) error

func (*NatsRpc) Response

func (r *NatsRpc) Response(codeType string, v any) []byte

func (*NatsRpc) Subscribe

func (r *NatsRpc) Subscribe(s RssBuilder) error

func (*NatsRpc) SubscribeBroadcast

func (r *NatsRpc) SubscribeBroadcast(s RssBuilder) error

type NatsRpcOption

type NatsRpcOption func(r *NatsRpc)

func WithNatsDebugMsg

func WithNatsDebugMsg(debug bool) NatsRpcOption

func WithNatsDialTimeout

func WithNatsDialTimeout(timeout time.Duration) NatsRpcOption

func WithNatsEndpoints

func WithNatsEndpoints(endpoints []string) NatsRpcOption

func WithNatsOptions

func WithNatsOptions(opts ...nats.Option) NatsRpcOption

func WithNatsPrefix

func WithNatsPrefix(prefix string) NatsRpcOption

func WithNatsServer

func WithNatsServer(server *treaty.Server) NatsRpcOption

type Option added in v2.0.3

type Option func(b *ServerBase)

func WithBroadcastEventHandler added in v2.0.3

func WithBroadcastEventHandler(handler CallbackFunc) Option

func WithInnerMsgHandler added in v2.0.3

func WithInnerMsgHandler(handler MsgHandler) Option

func WithPlugin added in v2.0.3

func WithPlugin(plugin ServerPlugin) Option

func WithPlugins added in v2.0.3

func WithPlugins(plugins []ServerPlugin) Option

func WithSelfEventHandler added in v2.0.3

func WithSelfEventHandler(handler CallbackFunc) Option

type ReqBuilder

type ReqBuilder struct {
	// contains filtered or unexported fields
}

func DefaultReqBuilder added in v2.0.3

func DefaultReqBuilder() *ReqBuilder

func NewReqBuilder

func NewReqBuilder(server *treaty.Server) *ReqBuilder

func (*ReqBuilder) Build

func (r *ReqBuilder) Build() ReqBuilder

func (*ReqBuilder) SetCodeType

func (r *ReqBuilder) SetCodeType(codeType string) *ReqBuilder

func (*ReqBuilder) SetMsgId

func (r *ReqBuilder) SetMsgId(msgId int32) *ReqBuilder

func (*ReqBuilder) SetQueue added in v2.0.3

func (r *ReqBuilder) SetQueue(queue string) *ReqBuilder

func (*ReqBuilder) SetReq

func (r *ReqBuilder) SetReq(req any) *ReqBuilder

func (*ReqBuilder) SetResp

func (r *ReqBuilder) SetResp(resp any) *ReqBuilder

func (*ReqBuilder) SetServer

func (r *ReqBuilder) SetServer(server *treaty.Server) *ReqBuilder

func (*ReqBuilder) SetServerType

func (r *ReqBuilder) SetServerType(serverType string) *ReqBuilder

func (*ReqBuilder) SetSuffix

func (r *ReqBuilder) SetSuffix(suffix string) *ReqBuilder

type RssBuilder

type RssBuilder struct {
	// contains filtered or unexported fields
}

func NewRssBuilder

func NewRssBuilder(server *treaty.Server) *RssBuilder

func (*RssBuilder) Build

func (r *RssBuilder) Build() RssBuilder

func (*RssBuilder) SetCallback

func (r *RssBuilder) SetCallback(callback CallbackFunc) *RssBuilder

func (*RssBuilder) SetCodeType

func (r *RssBuilder) SetCodeType(codeType string) *RssBuilder

func (*RssBuilder) SetParallel

func (r *RssBuilder) SetParallel(parallel bool) *RssBuilder

func (*RssBuilder) SetQueue

func (r *RssBuilder) SetQueue(queue string) *RssBuilder

func (*RssBuilder) SetServer

func (r *RssBuilder) SetServer(server *treaty.Server) *RssBuilder

func (*RssBuilder) SetSuffix

func (r *RssBuilder) SetSuffix(suffix string) *RssBuilder

type ServerBase added in v2.0.3

type ServerBase struct {
	Server     *treaty.Server
	Rpc        ServerRpc
	SubBuilder *RssBuilder
	// contains filtered or unexported fields
}

func NewServerBase added in v2.0.3

func NewServerBase(s *treaty.Server, options ...Option) *ServerBase

func (*ServerBase) AddPlugin added in v2.0.3

func (s *ServerBase) AddPlugin(plugin ServerPlugin)

func (*ServerBase) AfterInit added in v2.0.3

func (s *ServerBase) AfterInit()

func (*ServerBase) BeforeShutdown added in v2.0.3

func (s *ServerBase) BeforeShutdown()

func (*ServerBase) HandleBroadcastEvent added in v2.0.3

func (s *ServerBase) HandleBroadcastEvent(req *MsgRpc) []byte

func (*ServerBase) HandleSelfEvent added in v2.0.3

func (s *ServerBase) HandleSelfEvent(req *MsgRpc) []byte

HandleSelfEvent 内部事件处理

func (*ServerBase) Init added in v2.0.3

func (s *ServerBase) Init()

func (*ServerBase) Register added in v2.0.3

func (s *ServerBase) Register(msgId int32, v any)

func (*ServerBase) SetBroadcastEventHandler added in v2.0.3

func (s *ServerBase) SetBroadcastEventHandler(handler CallbackFunc)

func (*ServerBase) SetInnerMsgHandler added in v2.0.3

func (s *ServerBase) SetInnerMsgHandler(handler MsgHandler)

func (*ServerBase) SetSelfEventHandler added in v2.0.3

func (s *ServerBase) SetSelfEventHandler(handler CallbackFunc)

func (*ServerBase) Shutdown added in v2.0.3

func (s *ServerBase) Shutdown()

type ServerCreator

type ServerCreator func(s *treaty.Server) (ServerEntity, error)

type ServerEntity

type ServerEntity interface {
	Init()                                   //初始化
	AfterInit()                              //初始化后执行操作
	BeforeShutdown()                         //服务关闭前操作
	Shutdown()                               //服务关闭操作
	HandleSelfEvent(req *MsgRpc) []byte      //处理自己的事件
	HandleBroadcastEvent(req *MsgRpc) []byte //处理广播事件
}

ServerEntity server entity

type ServerPlugin added in v2.0.3

type ServerPlugin interface {
	Init(s *ServerBase)           //初始化
	AfterInit(s *ServerBase)      //初始化后执行操作
	BeforeShutdown(s *ServerBase) //服务关闭前操作
	Shutdown(s *ServerBase)       //服务关闭操作
}

ServerPlugin server extand

type ServerQueue added in v2.0.3

type ServerQueue struct {
	*ServerBase
	// contains filtered or unexported fields
}

ServerQueue base queue

func NewServerQueue added in v2.0.3

func NewServerQueue(queue string, s *treaty.Server, options ...Option) *ServerQueue

func (*ServerQueue) AfterInit added in v2.0.3

func (s *ServerQueue) AfterInit()

type ServerRpc

type ServerRpc interface {
	RegEncoder(typ string, encoder EncoderRpc)           //register encoder
	Subscribe(s RssBuilder) error                        //self Subscribe
	SubscribeBroadcast(s RssBuilder) error               //broadcast subscribe
	QueueSubscribe(s RssBuilder) error                   //queue self Subscribe
	Publish(s ReqBuilder) error                          //publish
	QueuePublish(s ReqBuilder) error                     //queue publish
	PublishBroadcast(s ReqBuilder) error                 //broadcast publish
	Request(s ReqBuilder) error                          //request
	QueueRequest(s ReqBuilder) error                     //queue request
	Response(codeType string, v any) []byte              //response the msg
	DecodeMsg(codeType string, data []byte, v any) error //decode msg
	GetCoder(codeType string) EncoderRpc                 //get encoder
	GetServer() *treaty.Server                           //get current server
	Find(serverType string, arg any) *treaty.Server      //find server
	RemoveFindCache(arg any)                             //clear find cache
}

ServerRpc rpc interface

func NewRpcServer

func NewRpcServer(cfg config.RpcConf, server *treaty.Server) ServerRpc

NewRpcServer create rpc server

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL