Documentation ¶
Index ¶
- Constants
- Variables
- func CreateMessageByID(hash uint32) proto.Message
- func CreateMessageByName(name string) proto.Message
- func CreatePairingAck(req proto.Message) proto.Message
- func CreatePairingAckBy(reqName string) proto.Message
- func DateTime() string
- func GetMessageIDByName(name string) uint32
- func GetMessageIDOf(msg proto.Message) uint32
- func GetMessageNameByID(hash uint32) string
- func GetPairingAckName(reqName string) string
- func GetServiceNames() []string
- func IBody2Bytes(body interface{}) []byte
- func IBody2Float(body interface{}) float64
- func IBody2Int64(body interface{}) int64
- func IBody2String(body interface{}) string
- func IBodyAsBytes(body interface{}) []byte
- func Now() time.Time
- func NowString() string
- func PB2JSON(msg proto.Message) string
- func RegisterAllAPI()
- func RegisterService(s Service)
- func SafeToBody(val interface{}) interface{}
- func SetMessageAllocator(alloc IMessageAllocator)
- func StartClock()
- func StopClock()
- func WallClock() *datetime.Clock
- type ArenaAllocator
- type Endpoint
- type EndpointFlag
- type EndpointMap
- type IMessageAllocator
- type Message
- func (m *Message) AddRefers(refers ...NodeID)
- func (m *Message) AutoDecode() error
- func (m *Message) Clone() *Message
- func (m *Message) DecodeTo(msg proto.Message) error
- func (m *Message) EncodeToBytes() []byte
- func (m *Message) Errno() int32
- func (m *Message) Refuse(errno int32) error
- func (m *Message) Reply(ack proto.Message) error
- func (m *Message) ReplyAny(body interface{}) error
- func (m *Message) Reset()
- func (m *Message) SetBody(val interface{})
- func (m *Message) SetErrno(ec int32)
- func (m *Message) String() string
- type MessageAllocator
- type MessageEndpoint
- type MessageHandler
- type NetPacket
- type NodeID
- type NodeIDSet
- type PacketFlag
- type PoolAllocator
- type RuntimeAllocator
- type Service
- type ServiceContext
- func (c *ServiceContext) AddFinalizer(action func())
- func (c *ServiceContext) Close()
- func (c *ServiceContext) InboundQueue() chan<- *Message
- func (c *ServiceContext) Instance() Service
- func (c *ServiceContext) MessageQueue() <-chan *Message
- func (c *ServiceContext) QuitDone() <-chan struct{}
- func (c *ServiceContext) RunID() string
- func (c *ServiceContext) Send(pkt *Message)
- func (c *ServiceContext) SetInstance(inst Service)
- func (c *ServiceContext) StartTime() time.Time
- type State
Constants ¶
View Source
const ( FOREACH_CONTINUE = 0 FOREACH_EXIT = 1 )
foreach控制
View Source
const ( SERVICE_ALL = 0xFF // 所有服务 INSTANCE_ALL = 0xFFFF // 所有实例 )
View Source
const ( NodeServiceShift = 16 NodeTypeShift = 31 NodeServiceMask = 0x00FF0000 NodeInstanceMask = 0x0000FFFF )
View Source
const ( SuffixREQ = "Req" SuffixACK = "Ack" SuffixNTF = "Ntf" )
View Source
const ( StateInit = 0 StateStarting = 1 StateRunning = 2 StateShuttingDown = 3 StateTerminated = 4 )
View Source
const ClockEpoch int64 = 1577836800 // 2020-01-01 00:00:00 UTC
epoch of clock
View Source
const VERSION = "0.4.6"
版本号
Variables ¶
View Source
var ErrContextInboundQueueFull = errors.New("context inbound queue full")
Functions ¶
func CreateMessageByName ¶ added in v0.4.4
根据名称创建消息
func CreatePairingAck ¶ added in v0.4.4
如果消息的名字是XXXReq,则尝试创建与其名称对应的XXXAck消息
func CreatePairingAckBy ¶ added in v0.4.4
如果消息的名字是XXXReq,则尝试创建与其名称对应的XXXAck消息
func GetPairingAckName ¶ added in v0.4.4
根据Req消息的名称,返回其对应的Ack消息名称
func IBodyAsBytes ¶ added in v0.4.1
func IBodyAsBytes(body interface{}) []byte
func RegisterAllAPI ¶ added in v0.4.4
func RegisterAllAPI()
自动注册所有protobuf消息 因为protobuf使用init()注册(RegisterType),此API需要在import后调用
func SetMessageAllocator ¶ added in v0.4.1
func SetMessageAllocator(alloc IMessageAllocator)
Types ¶
type ArenaAllocator ¶ added in v0.4.2
type ArenaAllocator struct {
// contains filtered or unexported fields
}
自定义arena算法的分配器
func NewArenaAllocator ¶ added in v0.4.2
func NewArenaAllocator(blockSize int) *ArenaAllocator
blockSize应该是64的倍数
func (*ArenaAllocator) Free ¶ added in v0.4.2
func (a *ArenaAllocator) Free(msg *Message)
func (*ArenaAllocator) PtrIndex ¶ added in v0.4.2
func (a *ArenaAllocator) PtrIndex(msg *Message) int
是否在分配区间内
type Endpoint ¶
type Endpoint interface { MessageEndpoint // 原始连接对象 RawConn() net.Conn // 发送/接收计数数据 Stats() *stats.Stats ErrorChan() <-chan error OutMsgChan() chan<- *Message // 开启read/write线程 Go(EndpointFlag) // 设置加解密 SetEncryptPair(cipher.BlockCryptor, cipher.BlockCryptor) }
网络端点
type EndpointFlag ¶
type EndpointFlag uint32
开启reader/writer标记
const ( EndpointReader EndpointFlag = 0x01 // 只开启reader EndpointWriter EndpointFlag = 0x02 // 只开启writer EndpointReadWriter EndpointFlag = 0x03 // 开启reader和writer )
type EndpointMap ¶
type EndpointMap interface { Size() int Has(node NodeID) bool Get(node NodeID) Endpoint IsEmpty() bool Foreach(func(Endpoint) bool) Put(node NodeID, endpoint Endpoint) PutIfAbsent(node NodeID, endpoint Endpoint) Remove(node NodeID) Pop(node NodeID) Endpoint Clear() }
线程安全的Endpoint字典
type IMessageAllocator ¶ added in v0.4.2
func NewPoolAllocator ¶ added in v0.4.2
func NewPoolAllocator() IMessageAllocator
func NewRuntimeAllocator ¶ added in v0.4.2
func NewRuntimeAllocator() IMessageAllocator
type Message ¶ added in v0.4.1
type Message struct { Command int32 `json:"cmd"` // 协议命令,即如何执行消息 MsgID uint32 `json:"mid,omitempty"` // 协议ID,即如何解析body Seq uint16 `json:"seq,omitempty"` // 序列号 Flags PacketFlag `json:"flg,omitempty"` // 标志位 Node NodeID `json:"node,omitempty"` // 源/目标节点 Refers []NodeID `json:"ref,omitempty"` // 组播session列表 Body interface{} `json:"body,omitempty"` // 消息内容,int32/int64/float64/string/bytes/proto.Message Endpoint MessageEndpoint `json:"-"` // 关联的endpoint }
Message表示一个应用层消息
func AllocMessage ¶ added in v0.4.2
func AllocMessage() *Message
func NewMessage ¶ added in v0.4.1
func NewMessage(cmd int32, flags PacketFlag, seq uint16, body interface{}) *Message
func (*Message) AutoDecode ¶ added in v0.4.5
根据MsgID把body的字节流解析为proto消息
func (*Message) EncodeToBytes ¶ added in v0.4.1
type MessageAllocator ¶ added in v0.4.1
type MessageAllocator func() *Message
type MessageEndpoint ¶
type MessageEndpoint interface { // 节点ID NodeID() NodeID SetNodeID(NodeID) // 远端地址 RemoteAddr() string // 发送消息(非阻塞) Send(*Message) error // 关闭读/写 Close() error ForceClose(error) IsRunning() bool // 绑定自定义数据 SetUserData(interface{}) UserData() interface{} }
绑定到消息上的endpoint
type MessageHandler ¶ added in v0.4.1
消息处理器
type NodeID ¶
type NodeID uint32
节点ID 一个32位整数表示的节点号,用以标识一个service(最高位为0),或者一个客户端session(最高位为1) 如果是服务编号:8位服务编号,16位服务实例编号
服务实例二进制布局 -------------------------------------- | reserved | service | instance | -------------------------------------- 32 24 16 0
type PacketFlag ¶
type PacketFlag uint16
消息标志位
const ( // 低8位用于表达一些传输flag PFlagCompressed PacketFlag = 0x01 // 压缩 PFlagEncrypted PacketFlag = 0x02 // 加密 PFlagError PacketFlag = 0x04 // 错误标记 PFlagHashCmd PacketFlag = 0x10 // CMD是字符串hash PFlagReserve PacketFlag = 0x20 // PFlagRPC PacketFlag = 0x40 // 远程调用 // 高8位用于server之间通信使用 PFlagRawBody PacketFlag = 0x0100 // body是未解析的字节流 PFlagMulticast PacketFlag = 0x1000 // 组播消息 PFlagRoute PacketFlag = 0x2000 // 路由消息 )
func (PacketFlag) Clear ¶ added in v0.1.21
func (g PacketFlag) Clear(n PacketFlag) PacketFlag
func (PacketFlag) Has ¶ added in v0.1.21
func (g PacketFlag) Has(n PacketFlag) bool
type PoolAllocator ¶ added in v0.4.2
type PoolAllocator struct {
// contains filtered or unexported fields
}
使用sync.Pool的分配器
func (*PoolAllocator) Alloc ¶ added in v0.4.2
func (a *PoolAllocator) Alloc() *Message
func (*PoolAllocator) Free ¶ added in v0.4.2
func (a *PoolAllocator) Free(msg *Message)
type RuntimeAllocator ¶ added in v0.4.2
type RuntimeAllocator struct { }
默认使用new的分配器
func (*RuntimeAllocator) Alloc ¶ added in v0.4.2
func (a *RuntimeAllocator) Alloc() *Message
func (*RuntimeAllocator) Free ¶ added in v0.4.2
func (a *RuntimeAllocator) Free(msg *Message)
type Service ¶
type Service interface { Type() uint8 Name() string // 节点ID NodeID() NodeID SetNodeID(id NodeID) // 当前状态 State() int32 SetState(int32) // 上下文对象 Context() *ServiceContext // 初始化 Init(context.Context, *ServiceContext) error // 启动服务 Startup(context.Context) error }
一个Service代表一个服务
type ServiceContext ¶
type ServiceContext struct {
// contains filtered or unexported fields
}
服务的上下文
func NewServiceContext ¶
func NewServiceContext(queueSize int) *ServiceContext
func (*ServiceContext) AddFinalizer ¶ added in v0.1.28
func (c *ServiceContext) AddFinalizer(action func())
func (*ServiceContext) InboundQueue ¶ added in v0.1.6
func (c *ServiceContext) InboundQueue() chan<- *Message
消息队列,仅接收
func (*ServiceContext) Instance ¶ added in v0.1.2
func (c *ServiceContext) Instance() Service
service实例
func (*ServiceContext) MessageQueue ¶
func (c *ServiceContext) MessageQueue() <-chan *Message
消息队列,仅消费
func (*ServiceContext) QuitDone ¶ added in v0.1.24
func (c *ServiceContext) QuitDone() <-chan struct{}
等待close完成
func (*ServiceContext) Send ¶ added in v0.1.5
func (c *ServiceContext) Send(pkt *Message)
投递一条消息到context
func (*ServiceContext) SetInstance ¶ added in v0.2.9
func (c *ServiceContext) SetInstance(inst Service)
func (*ServiceContext) StartTime ¶ added in v0.1.24
func (c *ServiceContext) StartTime() time.Time
type State ¶ added in v0.1.19
type State int32
service state
func (*State) IsShuttingDown ¶ added in v0.1.19
func (*State) IsTerminated ¶ added in v0.1.19
Source Files ¶
Click to show internal directories.
Click to hide internal directories.