Documentation ¶
Index ¶
- Constants
- Variables
- func ClearFactory()
- func CreateMessageByFullName(fullName string) proto.Message
- func CreateMessageByID(msgId uint32) proto.Message
- func CreateMessageByShortName(name string) proto.Message
- func CreateMsgFromJSON(text []byte) (proto.Message, error)
- func CreatePairingAck(fullReqName string) proto.Message
- func DecodeHTTPRequestBody(req *http.Request, ptr interface{}) error
- func FreeNetMessage(netMsg *NetMessage)
- func GetHTTPRequestIP(req *http.Request) string
- func GetLocalIPList() []net.IP
- func GetMessageFullName(msgId uint32) string
- func GetMessageId(fullName string) uint32
- func GetMessageIdOf(msg proto.Message) uint32
- func GetMessageShortName(msgId uint32) string
- func GetMessageType(msgId uint32) reflect.Type
- func GetPairingAckName(fullReqName string) string
- func GetPairingAckNameOf(msgId uint32) string
- func HasValidSuffix(name string) bool
- func HashMsgName(name string) uint32
- func IsAckMessage(name string) bool
- func IsReqMessage(name string) bool
- func ReadLenData(r io.Reader, maxSize uint16) ([]byte, error)
- func ReadProtoFromHTTPRequest(req *http.Request, msg proto.Message) error
- func ReadProtoMessage(conn net.Conn, msg proto.Message) error
- func Register(fullname string) error
- func RegisterAllMessages()
- func RequestProtoMessage(conn net.Conn, req, ack proto.Message) error
- func TryEnqueueMsg(queue chan<- *NetMessage, msg *NetMessage) bool
- func WriteLenData(w io.Writer, body []byte) error
- func WriteProtoHTTPResponse(w http.ResponseWriter, msg proto.Message, contentType string) error
- func WriteProtoMessage(w io.Writer, msg proto.Message) error
- type Encryptor
- type Endpoint
- type EndpointMap
- func (m *EndpointMap) Clear()
- func (m *EndpointMap) Foreach(action func(Endpoint) bool)
- func (m *EndpointMap) Get(node NodeID) Endpoint
- func (m *EndpointMap) Has(node NodeID) bool
- func (m *EndpointMap) Init() *EndpointMap
- func (m *EndpointMap) IsEmpty() bool
- func (m *EndpointMap) Keys() []NodeID
- func (m *EndpointMap) Put(node NodeID, endpoint Endpoint)
- func (m *EndpointMap) PutIfAbsent(node NodeID, endpoint Endpoint)
- func (m *EndpointMap) Remove(node NodeID)
- func (m *EndpointMap) Size() int
- type Error
- type MsgFlag
- type NetMessage
- func (m *NetMessage) Ack(ack proto.Message) error
- func (m *NetMessage) Clone() *NetMessage
- func (m *NetMessage) DecodeTo(msg proto.Message) error
- func (m *NetMessage) Encode() error
- func (m *NetMessage) Refuse(ec int32) error
- func (m *NetMessage) Reply(cmd uint32, data []byte) error
- func (m *NetMessage) Reset()
- type NetV1Header
- func (h NetV1Header) CRC() uint32
- func (h NetV1Header) CalcCRC(body []byte) uint32
- func (h NetV1Header) Clear()
- func (h NetV1Header) Command() uint32
- func (h NetV1Header) Flag() MsgFlag
- func (h NetV1Header) Len() uint32
- func (h NetV1Header) Pack(size uint32, flag MsgFlag, seq, cmd uint32)
- func (h NetV1Header) Seq() uint32
- func (h NetV1Header) SetCRC(v uint32)
- type NodeID
- type NodeIDSet
- type SessionMessage
- type StreamConnBase
- func (c *StreamConnBase) GetNode() NodeID
- func (c *StreamConnBase) GetRemoteAddr() string
- func (c *StreamConnBase) GetUserData() any
- func (c *StreamConnBase) IsRunning() bool
- func (c *StreamConnBase) SendMsg(msg *NetMessage, mode int) error
- func (c *StreamConnBase) SendNonBlock(msg *NetMessage) bool
- func (c *StreamConnBase) SetEncryption(encrypt, decrypt Encryptor)
- func (c *StreamConnBase) SetNode(node NodeID)
- func (c *StreamConnBase) SetSendQueue(sendQueue chan *NetMessage)
- func (c *StreamConnBase) SetUserData(val any)
- type TcpSession
- type WebsockSession
- func (t *WebsockSession) Close() error
- func (t *WebsockSession) ForceClose(reason error)
- func (t *WebsockSession) Go(ctx context.Context, reader, writer bool)
- func (t *WebsockSession) ReadMessage(netMsg *NetMessage) error
- func (t *WebsockSession) SetIntranet(v bool)
- func (t *WebsockSession) UnderlyingConn() net.Conn
- type WsRecvMsg
- type WsWriteMsg
Constants ¶
// Size limits of the V1 wire format.
const (
	V1HeaderLength = 16                             // size of the message header
	MaxPacketSize  = 0x00FFFFFF                     // maximum message size, ~16MB
	MaxPayloadSize = MaxPacketSize - V1HeaderLength // largest body that still fits in a packet together with its header
)
// Send modes.
// NOTE(review): presumably the values accepted by the `mode` argument of
// SendMsg — confirm against the implementation.
const (
	SendBlock    = 0
	SendNonblock = 1
)
// Default sizes.
// NOTE(review): the names suggest capacities for the receive queue, the
// backend/session send queues, the listen backlog and the error channel —
// confirm at the call sites.
const (
	DefaultRecvQueueSize        = 1 << 16 // 65536
	DefaultBackendSendQueueSize = 1 << 15 // 32768
	DefaultSessionSendQueueSize = 1 << 10 // 1024
	DefaultBacklogSize          = 128
	DefaultErrorChanSize        = 64
)
const ( NodeInstanceShift = 32 NodeTypeShift = 48 MaxNodeInstance = (1 << NodeInstanceShift) - 1 NodeBackendTypeMask NodeID = 1 << NodeTypeShift )
const ErrCodeField = "Code"
const PrefixLength = 2 // sizeof uint16
// UrlFormKey is the form key "pb-data".
// NOTE(review): presumably the URL form field from which the HTTP helpers
// read protobuf data — confirm at the call sites.
const (
UrlFormKey = "pb-data"
)
const WriteBufferReuseSize = 1 << 14
Variables ¶
// Tunable size thresholds.
var (
	DefaultCompressThreshold = 1 << 12 // compression threshold, 4KB; EncodeMsgTo tries to compress bodies longer than this
	MaxClientUpStreamSize    = 1 << 18 // maximum size of a client upstream message, 256KB
)
// Sentinel errors reported by the packet codec and the connection types.
// ErrPktSizeOutOfRange and ErrPktChecksumMismatch are returned by
// ReadHeadBody; ErrCannotDecryptPkt is returned by DecodeMsgBody when a
// packet is flagged as encrypted but no decryptor was supplied.
var (
	ErrPktSizeOutOfRange    = errors.New("packet size out of range")
	ErrPktChecksumMismatch  = errors.New("packet checksum mismatch")
	ErrCannotDecryptPkt     = errors.New("cannot decrypt packet")
	ErrConnNotRunning       = errors.New("connection not running")
	ErrConnOutboundOverflow = errors.New("connection outbound queue overflow")
	ErrConnForceClose       = errors.New("connection forced to close")
	ErrBufferOutOfRange     = errors.New("buffer out of range")
	ErrConnClosed           = errors.New("connection is closed")
	ErrInvalidWSMsgType     = errors.New("invalid websocket message type")
)
var DecodeMsgBody = func(flags MsgFlag, body []byte, decrypt Encryptor) ([]byte, error) { if flags.Has(FlagEncrypt) { if decrypt == nil { return nil, ErrCannotDecryptPkt } if decrypted, err := decrypt.Decrypt(body); err != nil { return nil, err } else { body = decrypted } } if flags.Has(FlagCompress) { var decoded bytes.Buffer if err := uncompress(body, &decoded); err != nil { return nil, err } else { body = decoded.Bytes() } } return body, nil }
var DecodeMsgFrom = func(rd io.Reader, maxSize uint32, decrypt Encryptor, netMsg *NetMessage) error { var head = NewNetV1Header() body, err := ReadHeadBody(rd, head, maxSize) if err != nil { return err } return DecodeNetMsg(head, body, decrypt, netMsg) }
DecodeMsgFrom reads one message from the reader and decodes it into `netMsg`.
var DecodeNetMsg = func(head NetV1Header, body []byte, decrypt Encryptor, netMsg *NetMessage) error { var err error body, err = DecodeMsgBody(head.Flag(), body, decrypt) if err != nil { return err } netMsg.Seq = head.Seq() netMsg.Command = head.Command() netMsg.Data = body return nil }
var EncodeMsgTo = func(netMsg *NetMessage, encrypt Encryptor, w io.Writer) error { var flags MsgFlag if err := netMsg.Encode(); err != nil { return err } var body = netMsg.Data if len(body) > DefaultCompressThreshold { var encoded bytes.Buffer if err := compress(body, &encoded); err == nil { if encoded.Len() < len(body) { flags |= FlagCompress body = encoded.Bytes() } } else { zlog.Errorf("msg %d compress failed: %v", netMsg.Command, err) } } if encrypt != nil { if encrypted, err := encrypt.Encrypt(body); err == nil { body = encrypted flags |= FlagEncrypt } else { return err } } var bodySize = len(body) if bodySize > MaxPayloadSize { return fmt.Errorf("encoded msg %d size %d/%d overflow", netMsg.Command, bodySize, MaxPayloadSize) } var head = NewNetV1Header() head.Pack(uint32(bodySize+V1HeaderLength), flags, netMsg.Seq, netMsg.Command) var checksum = head.CalcCRC(body) head.SetCRC(checksum) if _, err := w.Write(head); err != nil { return err } if bodySize > 0 { if _, err := w.Write(body); err != nil { return err } } return nil }
EncodeMsgTo encodes `netMsg` (compressing and encrypting the body when applicable) and writes the header and body to the writer.
var MessagePackagePrefix = "protos."
var ReadHeadBody = func(rd io.Reader, head NetV1Header, maxSize uint32) ([]byte, error) { if _, err := io.ReadFull(rd, head); err != nil { return nil, err } var nLen = head.Len() if nLen < V1HeaderLength || nLen > maxSize { zlog.Errorf("ReadHeadBody: msg size %d out of range", nLen) return nil, ErrPktSizeOutOfRange } var body []byte if nLen > V1HeaderLength { body = make([]byte, nLen-V1HeaderLength) if _, err := io.ReadFull(rd, body); err != nil { return nil, err } } var checksum = head.CalcCRC(body) if crc := head.CRC(); crc != checksum { zlog.Errorf("ReadHeadBody: msg %v checksum mismatch %x != %x", head.Command(), checksum, crc) return nil, ErrPktChecksumMismatch } return body, nil }
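ReadHeadBody reads a packet header followed by its body. It rejects packets whose total length (header included) is smaller than the header or greater than `maxSize`, and packets whose checksum does not match.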
var (
TCPReadTimeout = 300 * time.Second // default read timeout
)
Functions ¶
func ClearFactory ¶ added in v1.0.10
func ClearFactory()
func CreateMessageByFullName ¶ added in v1.0.10
CreateMessageByFullName 根据消息名称创建消息(使用反射)
func CreateMessageByID ¶ added in v1.0.10
CreateMessageByID 根据消息ID创建消息(使用反射)
func CreateMessageByShortName ¶ added in v1.0.10
func CreateMsgFromJSON ¶ added in v1.0.10
func CreatePairingAck ¶ added in v1.0.10
func DecodeHTTPRequestBody ¶
DecodeHTTPRequestBody 解析http请求的body为json
func FreeNetMessage ¶ added in v1.0.4
func FreeNetMessage(netMsg *NetMessage)
func GetHTTPRequestIP ¶
GetHTTPRequestIP 获取http请求的来源IP
func GetMessageFullName ¶ added in v1.0.10
GetMessageFullName 根据消息ID获取消息名称
func GetMessageId ¶ added in v1.0.10
GetMessageId 根据消息名称获取消息ID
func GetMessageIdOf ¶ added in v1.0.10
GetMessageIdOf 获取proto消息的ID
func GetMessageShortName ¶ added in v1.0.10
func GetMessageType ¶ added in v1.0.10
func GetPairingAckName ¶ added in v1.0.10
GetPairingAckName 根据Req消息名称,返回其对应的Ack消息名称
func GetPairingAckNameOf ¶ added in v1.0.10
func HasValidSuffix ¶ added in v1.0.10
HasValidSuffix 指定的后缀才自动注册
func IsAckMessage ¶ added in v1.0.10
func IsReqMessage ¶ added in v1.0.10
func ReadLenData ¶ added in v1.0.4
ReadLenData 读取长度[2字节]开头的数据
func ReadProtoFromHTTPRequest ¶
ReadProtoFromHTTPRequest 从http请求中读取proto消息
func RegisterAllMessages ¶ added in v1.0.10
func RegisterAllMessages()
RegisterAllMessages 自动注册所有protobuf消息 protobuf使用init()注册(RegisterType),则此API需要在import后调用
func RequestProtoMessage ¶
RequestProtoMessage sends `req` over the connection and waits for the reply, which is read into `ack`.
func TryEnqueueMsg ¶ added in v1.0.3
func TryEnqueueMsg(queue chan<- *NetMessage, msg *NetMessage) bool
TryEnqueueMsg 尝试将消息放入队列,如果队列已满返回false
func WriteLenData ¶ added in v1.0.4
WriteLenData 写入长度[2字节]开头的数据
func WriteProtoHTTPResponse ¶
WriteProtoHTTPResponse 写入proto消息到http响应
Types ¶
type Endpoint ¶
type Endpoint interface { GetNode() NodeID SetNode(NodeID) GetRemoteAddr() string UnderlyingConn() net.Conn GetUserData() any SetEncryption(Encryptor, Encryptor) SetSendQueue(chan *NetMessage) Go(ctx context.Context, reader, writer bool) // 开启read/write线程 SendMsg(*NetMessage, int) error Close() error ForceClose(error) }
Endpoint 网络端点
type EndpointMap ¶
type EndpointMap struct {
// contains filtered or unexported fields
}
EndpointMap 线程安全的Endpoint Map
func NewEndpointMap ¶
func NewEndpointMap() *EndpointMap
func (*EndpointMap) Clear ¶
func (m *EndpointMap) Clear()
func (*EndpointMap) Foreach ¶
func (m *EndpointMap) Foreach(action func(Endpoint) bool)
Foreach action应该对map是read-only
func (*EndpointMap) Get ¶
func (m *EndpointMap) Get(node NodeID) Endpoint
func (*EndpointMap) Has ¶
func (m *EndpointMap) Has(node NodeID) bool
func (*EndpointMap) Init ¶
func (m *EndpointMap) Init() *EndpointMap
func (*EndpointMap) IsEmpty ¶
func (m *EndpointMap) IsEmpty() bool
func (*EndpointMap) Keys ¶
func (m *EndpointMap) Keys() []NodeID
func (*EndpointMap) Put ¶
func (m *EndpointMap) Put(node NodeID, endpoint Endpoint)
func (*EndpointMap) PutIfAbsent ¶
func (m *EndpointMap) PutIfAbsent(node NodeID, endpoint Endpoint)
func (*EndpointMap) Remove ¶
func (m *EndpointMap) Remove(node NodeID)
func (*EndpointMap) Size ¶
func (m *EndpointMap) Size() int
type NetMessage ¶
type NetMessage struct { CreatedAt int64 `json:"created_at,omitempty"` // microseconds Command uint32 `json:"cmd"` Seq uint32 `json:"seq,omitempty"` Data []byte `json:"data,omitempty"` Body proto.Message `json:"body,omitempty"` Session Endpoint `json:"-"` }
func AllocNetMessage ¶
func AllocNetMessage() *NetMessage
func CreateNetMessage ¶ added in v1.0.3
func CreateNetMessage(cmd, seq uint32, body proto.Message) *NetMessage
func CreateNetMessageWith ¶ added in v1.0.3
func CreateNetMessageWith(body proto.Message) *NetMessage
func NewNetMessage ¶
func NewNetMessage(cmd, seq uint32, data []byte) *NetMessage
func TryDequeueMsg ¶ added in v1.0.3
func TryDequeueMsg(queue <-chan *NetMessage) *NetMessage
TryDequeueMsg 尝试从队列中取出消息,如果队列为空返回nil
func (*NetMessage) Clone ¶
func (m *NetMessage) Clone() *NetMessage
func (*NetMessage) DecodeTo ¶
func (m *NetMessage) DecodeTo(msg proto.Message) error
DecodeTo decodes `Data` into `msg`.
func (*NetMessage) Reset ¶
func (m *NetMessage) Reset()
type NetV1Header ¶ added in v1.0.3
type NetV1Header []byte
NetV1Header 协议头
func NewNetV1Header ¶ added in v1.0.3
func NewNetV1Header() NetV1Header
func (NetV1Header) CRC ¶ added in v1.0.3
func (h NetV1Header) CRC() uint32
func (NetV1Header) CalcCRC ¶ added in v1.0.3
func (h NetV1Header) CalcCRC(body []byte) uint32
CalcCRC checksum = f(head) and f(body)
func (NetV1Header) Clear ¶ added in v1.0.5
func (h NetV1Header) Clear()
func (NetV1Header) Command ¶ added in v1.0.3
func (h NetV1Header) Command() uint32
func (NetV1Header) Flag ¶ added in v1.0.3
func (h NetV1Header) Flag() MsgFlag
func (NetV1Header) Len ¶ added in v1.0.3
func (h NetV1Header) Len() uint32
func (NetV1Header) Pack ¶ added in v1.0.3
func (h NetV1Header) Pack(size uint32, flag MsgFlag, seq, cmd uint32)
func (NetV1Header) Seq ¶ added in v1.0.3
func (h NetV1Header) Seq() uint32
func (NetV1Header) SetCRC ¶ added in v1.0.3
func (h NetV1Header) SetCRC(v uint32)
type NodeID ¶
type NodeID uint64
NodeID 节点ID 一个64位整数表示的节点号,用以标识一个service,低32位为服务实例编号,32-48位为服务类型; 或者一个客户端session,低32位为GATE内部的session编号,32-48位为GATE编号;
func MakeBackendNode ¶ added in v1.0.4
MakeBackendNode 根据服务号和实例号创建一个节点ID
func MakeGateSession ¶ added in v1.0.4
MakeGateSession `instance`指GATE的实例编号,限定为16位
type SessionMessage ¶
type StreamConnBase ¶
type StreamConnBase struct { Node NodeID // Running atomic.Bool // RecvQueue chan<- *NetMessage // 收消息队列 SendQueue chan *NetMessage // 发消息队列 ErrChan chan *Error // error signal Encrypt Encryptor // 加密 Decrypt Encryptor // 解密 RemoteAddr string // 缓存的远端地址 Userdata any // user data }
StreamConnBase base stream connection
func (*StreamConnBase) GetNode ¶ added in v1.0.3
func (c *StreamConnBase) GetNode() NodeID
func (*StreamConnBase) GetRemoteAddr ¶ added in v1.0.3
func (c *StreamConnBase) GetRemoteAddr() string
func (*StreamConnBase) GetUserData ¶ added in v1.0.3
func (c *StreamConnBase) GetUserData() any
func (*StreamConnBase) IsRunning ¶
func (c *StreamConnBase) IsRunning() bool
func (*StreamConnBase) SendMsg ¶
func (c *StreamConnBase) SendMsg(msg *NetMessage, mode int) error
func (*StreamConnBase) SendNonBlock ¶
func (c *StreamConnBase) SendNonBlock(msg *NetMessage) bool
func (*StreamConnBase) SetEncryption ¶
func (c *StreamConnBase) SetEncryption(encrypt, decrypt Encryptor)
func (*StreamConnBase) SetNode ¶
func (c *StreamConnBase) SetNode(node NodeID)
func (*StreamConnBase) SetSendQueue ¶
func (c *StreamConnBase) SetSendQueue(sendQueue chan *NetMessage)
func (*StreamConnBase) SetUserData ¶
func (c *StreamConnBase) SetUserData(val any)
type TcpSession ¶
type TcpSession struct { StreamConnBase // contains filtered or unexported fields }
func NewTcpSession ¶
func NewTcpSession(conn net.Conn, sendQSize int) *TcpSession
func (*TcpSession) Close ¶
func (t *TcpSession) Close() error
func (*TcpSession) ForceClose ¶
func (t *TcpSession) ForceClose(reason error)
func (*TcpSession) SetIntranet ¶
func (t *TcpSession) SetIntranet(v bool)
func (*TcpSession) UnderlyingConn ¶
func (t *TcpSession) UnderlyingConn() net.Conn
type WebsockSession ¶ added in v1.0.10
type WebsockSession struct { StreamConnBase // contains filtered or unexported fields }
func NewWebsockSession ¶ added in v1.0.10
func NewWebsockSession(conn *websocket.Conn, sendQSize int) *WebsockSession
func (*WebsockSession) Close ¶ added in v1.0.10
func (t *WebsockSession) Close() error
func (*WebsockSession) ForceClose ¶ added in v1.0.10
func (t *WebsockSession) ForceClose(reason error)
func (*WebsockSession) Go ¶ added in v1.0.10
func (t *WebsockSession) Go(ctx context.Context, reader, writer bool)
func (*WebsockSession) ReadMessage ¶ added in v1.0.10
func (t *WebsockSession) ReadMessage(netMsg *NetMessage) error
func (*WebsockSession) SetIntranet ¶ added in v1.0.10
func (t *WebsockSession) SetIntranet(v bool)
func (*WebsockSession) UnderlyingConn ¶ added in v1.0.10
func (t *WebsockSession) UnderlyingConn() net.Conn
type WsRecvMsg ¶ added in v1.0.10
// WsRecvMsg is the JSON shape of a message with a string command, an optional
// sequence number and a body kept as raw JSON so that it can be decoded later.
// NOTE(review): the name suggests it is the inbound websocket frame — confirm
// at the call sites.
type WsRecvMsg struct {
	Cmd  string          `json:"cmd"`
	Seq  uint32          `json:"seq,omitempty"`
	Body json.RawMessage `json:"body,omitempty"`
}
type WsWriteMsg ¶ added in v1.0.10
type WsWriteMsg struct { Cmd string `json:"cmd"` Seq uint32 `json:"seq,omitempty"` Body proto.Message `json:"body,omitempty"` }
func CreateJSONFromProto ¶ added in v1.0.10
func CreateJSONFromProto(msgId uint32, data []byte) (*WsWriteMsg, error)