qnet

package
v1.0.10 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2024 License: BSD-3-Clause Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
// Size limits of the V1 wire format, in bytes.
const (
	// V1HeaderLength is the size of the message header.
	V1HeaderLength = 16
	// MaxPacketSize is the largest allowed message size (about 16MB).
	MaxPacketSize = 1<<24 - 1
	// MaxPayloadSize is the largest allowed body size: a maximum-size
	// packet minus its header.
	MaxPayloadSize = MaxPacketSize - V1HeaderLength
)
View Source
// Send modes.
// NOTE(review): presumably the values accepted by the int mode argument of
// SendMsg — confirm against the implementation.
const (
	SendBlock    = iota // 0
	SendNonblock        // 1
)
View Source
// Default sizes.
// NOTE(review): where each default is applied is not visible in this
// listing — confirm against the callers.
const (
	DefaultRecvQueueSize        = 64 * 1024 // 65536
	DefaultBackendSendQueueSize = 32 * 1024 // 32768
	DefaultSessionSendQueueSize = 1024
	DefaultBacklogSize          = 128
	DefaultErrorChanSize        = 64
)
View Source
// Bit layout of a NodeID.
const (
	// NodeInstanceShift is the bit offset just above the instance number.
	NodeInstanceShift = 32
	// NodeTypeShift is the bit offset of the backend-type marker.
	NodeTypeShift = 48
	// MaxNodeInstance is the largest value that fits below NodeInstanceShift.
	MaxNodeInstance = 1<<NodeInstanceShift - 1
	// NodeBackendTypeMask has only bit NodeTypeShift set.
	// NOTE(review): presumably what IsBackend tests — confirm.
	NodeBackendTypeMask = NodeID(1) << NodeTypeShift
)
View Source
// ErrCodeField is the field name "Code".
// NOTE(review): presumably the name of the error-code field set on Ack
// messages — confirm against its users.
const ErrCodeField = "Code"
View Source
// PrefixLength is the size in bytes of a uint16 length prefix.
const PrefixLength = 2 // sizeof uint16
View Source
const (
	// UrlFormKey is the key "pb-data".
	// NOTE(review): presumably the URL form key carrying protobuf data in
	// HTTP requests — confirm against its users.
	UrlFormKey = "pb-data"
)
View Source
// WriteBufferReuseSize is 16384 (16KB).
// NOTE(review): presumably a size threshold for reusing write buffers —
// confirm against its users.
const WriteBufferReuseSize = 1 << 14

Variables

View Source
// Tunable size thresholds, in bytes.
var (
	// DefaultCompressThreshold is the compression threshold (4KB).
	// EncodeMsgTo only attempts compression on bodies longer than this.
	DefaultCompressThreshold = 4 * 1024
	// MaxClientUpStreamSize is the maximum client upstream message size (256KB).
	MaxClientUpStreamSize = 256 * 1024
)
View Source
// Sentinel errors of the package.
var (
	// Packet-level failures. ReadHeadBody returns the size and checksum
	// errors; DecodeMsgBody returns ErrCannotDecryptPkt when a body is
	// flagged as encrypted but no decryptor was supplied.
	ErrPktSizeOutOfRange   = errors.New("packet size out of range")
	ErrPktChecksumMismatch = errors.New("packet checksum mismatch")
	ErrCannotDecryptPkt    = errors.New("cannot decrypt packet")

	// Connection state failures.
	ErrConnNotRunning       = errors.New("connection not running")
	ErrConnOutboundOverflow = errors.New("connection outbound queue overflow")
	ErrConnForceClose       = errors.New("connection forced to close")
	ErrConnClosed           = errors.New("connection is closed")

	// Miscellaneous failures.
	ErrBufferOutOfRange = errors.New("buffer out of range")
	ErrInvalidWSMsgType = errors.New("invalid websocket message type")
)
View Source
// DecodeMsgBody reverses the transformations recorded in flags and returns
// the resulting body.
//
// Decryption runs before decompression, the reverse of the order in which
// EncodeMsgTo applies them. If FlagEncrypt is set and decrypt is nil, it
// returns ErrCannotDecryptPkt. Errors from the decryptor or from
// decompression are returned unchanged. With neither flag set, body is
// returned as is.
var DecodeMsgBody = func(flags MsgFlag, body []byte, decrypt Encryptor) ([]byte, error) {
	if flags.Has(FlagEncrypt) {
		if decrypt == nil {
			return nil, ErrCannotDecryptPkt
		}
		plain, err := decrypt.Decrypt(body)
		if err != nil {
			return nil, err
		}
		body = plain
	}
	if !flags.Has(FlagCompress) {
		return body, nil
	}
	var inflated bytes.Buffer
	if err := uncompress(body, &inflated); err != nil {
		return nil, err
	}
	return inflated.Bytes(), nil
}
View Source
// DecodeMsgFrom reads one packet from rd and decodes it into netMsg.
//
// It reads the header and body with ReadHeadBody, which rejects packets
// longer than maxSize, then passes them to DecodeNetMsg. decrypt is handed
// through to DecodeNetMsg. The first error from either step is returned.
var DecodeMsgFrom = func(rd io.Reader, maxSize uint32, decrypt Encryptor, netMsg *NetMessage) error {
	head := NewNetV1Header()
	body, err := ReadHeadBody(rd, head, maxSize)
	if err == nil {
		err = DecodeNetMsg(head, body, decrypt, netMsg)
	}
	return err
}

DecodeMsgFrom decodes a message from the reader.

View Source
// DecodeNetMsg fills netMsg from an already-read header and raw body.
//
// The body is first passed through DecodeMsgBody using the header's flags.
// On success the header's sequence number and command, together with the
// decoded body, are stored in netMsg. On failure netMsg is left untouched
// and the error is returned.
var DecodeNetMsg = func(head NetV1Header, body []byte, decrypt Encryptor, netMsg *NetMessage) error {
	payload, err := DecodeMsgBody(head.Flag(), body, decrypt)
	if err != nil {
		return err
	}
	netMsg.Seq = head.Seq()
	netMsg.Command = head.Command()
	netMsg.Data = payload
	return nil
}
View Source
// EncodeMsgTo serializes netMsg and writes it to w as one V1 packet
// (header followed by body).
//
// Steps, in order:
//  1. netMsg.Encode() is called, then netMsg.Data is used as the body.
//  2. Bodies longer than DefaultCompressThreshold are compressed. The
//     compressed form is kept only when it is smaller. A compression
//     failure is logged and the body is sent uncompressed.
//  3. If encrypt is non-nil the body is encrypted. An encryption failure
//     aborts the call.
//  4. A body larger than MaxPayloadSize is rejected with an error.
//  5. The header is packed with the total size, flags, seq and command, its
//     CRC is set, and header and body are written with separate Write calls.
//
// It returns the first error from encoding, encryption, the size check or
// either write.
var EncodeMsgTo = func(netMsg *NetMessage, encrypt Encryptor, w io.Writer) error {
	if err := netMsg.Encode(); err != nil {
		return err
	}

	var flags MsgFlag
	payload := netMsg.Data

	if len(payload) > DefaultCompressThreshold {
		var packed bytes.Buffer
		switch err := compress(payload, &packed); {
		case err != nil:
			// Best effort: fall back to the uncompressed body.
			zlog.Errorf("msg %d compress failed: %v", netMsg.Command, err)
		case packed.Len() < len(payload):
			flags |= FlagCompress
			payload = packed.Bytes()
		}
	}

	if encrypt != nil {
		sealed, err := encrypt.Encrypt(payload)
		if err != nil {
			return err
		}
		payload = sealed
		flags |= FlagEncrypt
	}

	size := len(payload)
	if size > MaxPayloadSize {
		return fmt.Errorf("encoded msg %d size %d/%d overflow", netMsg.Command, size, MaxPayloadSize)
	}

	head := NewNetV1Header()
	head.Pack(uint32(size+V1HeaderLength), flags, netMsg.Seq, netMsg.Command)
	head.SetCRC(head.CalcCRC(payload))

	if _, err := w.Write(head); err != nil {
		return err
	}
	if size == 0 {
		// Header-only packet: nothing more to write.
		return nil
	}
	_, err := w.Write(payload)
	return err
}

EncodeMsgTo encodes a message and writes it to the writer.

View Source
// MessagePackagePrefix is the string "protos.".
// NOTE(review): presumably the protobuf package prefix used when mapping
// between short and full message names — confirm against its users.
var MessagePackagePrefix = "protos."
View Source
// ReadHeadBody reads one packet from rd: it fills head, then reads and
// returns the body that follows.
//
// The length in the header is the total packet length, header included. It
// must be at least V1HeaderLength and at most maxSize, otherwise
// ErrPktSizeOutOfRange is returned. A header-only packet yields a nil body.
// After reading, the checksum computed over header and body must equal the
// CRC stored in the header, otherwise ErrPktChecksumMismatch is returned.
// Read errors from rd are returned unchanged.
var ReadHeadBody = func(rd io.Reader, head NetV1Header, maxSize uint32) ([]byte, error) {
	if _, err := io.ReadFull(rd, head); err != nil {
		return nil, err
	}

	size := head.Len()
	if size > maxSize || size < V1HeaderLength {
		zlog.Errorf("ReadHeadBody: msg size %d out of range", size)
		return nil, ErrPktSizeOutOfRange
	}

	var body []byte
	// size >= V1HeaderLength is guaranteed above, so this cannot underflow.
	if bodyLen := size - V1HeaderLength; bodyLen > 0 {
		body = make([]byte, bodyLen)
		if _, err := io.ReadFull(rd, body); err != nil {
			return nil, err
		}
	}

	sum := head.CalcCRC(body)
	if crc := head.CRC(); crc != sum {
		zlog.Errorf("ReadHeadBody: msg %v checksum mismatch %x != %x", head.Command(), sum, crc)
		return nil, ErrPktChecksumMismatch
	}
	return body, nil
}

ReadHeadBody reads a packet header and its body; the total packet length must not exceed `maxSize`.

View Source
var (
	// TCPReadTimeout is the default read timeout (5 minutes).
	TCPReadTimeout = 5 * time.Minute
)

Functions

func ClearFactory added in v1.0.10

func ClearFactory()

func CreateMessageByFullName added in v1.0.10

func CreateMessageByFullName(fullName string) proto.Message

CreateMessageByFullName 根据消息名称创建消息(使用反射)

func CreateMessageByID added in v1.0.10

func CreateMessageByID(msgId uint32) proto.Message

CreateMessageByID 根据消息ID创建消息(使用反射)

func CreateMessageByShortName added in v1.0.10

func CreateMessageByShortName(name string) proto.Message

func CreateMsgFromJSON added in v1.0.10

func CreateMsgFromJSON(text []byte) (proto.Message, error)

func CreatePairingAck added in v1.0.10

func CreatePairingAck(fullReqName string) proto.Message

func DecodeHTTPRequestBody

func DecodeHTTPRequestBody(req *http.Request, ptr interface{}) error

DecodeHTTPRequestBody 解析http请求的body为json

func FreeNetMessage added in v1.0.4

func FreeNetMessage(netMsg *NetMessage)

func GetHTTPRequestIP

func GetHTTPRequestIP(req *http.Request) string

GetHTTPRequestIP 获取http请求的来源IP

func GetLocalIPList

func GetLocalIPList() []net.IP

GetLocalIPList 获取本地IP列表

func GetMessageFullName added in v1.0.10

func GetMessageFullName(msgId uint32) string

GetMessageFullName 根据消息ID获取消息名称

func GetMessageId added in v1.0.10

func GetMessageId(fullName string) uint32

GetMessageId 根据消息名称获取消息ID

func GetMessageIdOf added in v1.0.10

func GetMessageIdOf(msg proto.Message) uint32

GetMessageIdOf 获取proto消息的ID

func GetMessageShortName added in v1.0.10

func GetMessageShortName(msgId uint32) string

func GetMessageType added in v1.0.10

func GetMessageType(msgId uint32) reflect.Type

func GetPairingAckName added in v1.0.10

func GetPairingAckName(fullReqName string) string

GetPairingAckName 根据Req消息名称,返回其对应的Ack消息名称

func GetPairingAckNameOf added in v1.0.10

func GetPairingAckNameOf(msgId uint32) string

func HasValidSuffix added in v1.0.10

func HasValidSuffix(name string) bool

HasValidSuffix 指定的后缀才自动注册

func HashMsgName added in v1.0.10

func HashMsgName(name string) uint32

HashMsgName 计算字符串的hash值

func IsAckMessage added in v1.0.10

func IsAckMessage(name string) bool

func IsReqMessage added in v1.0.10

func IsReqMessage(name string) bool

func ReadLenData added in v1.0.4

func ReadLenData(r io.Reader, maxSize uint16) ([]byte, error)

ReadLenData 读取长度[2字节]开头的数据

func ReadProtoFromHTTPRequest

func ReadProtoFromHTTPRequest(req *http.Request, msg proto.Message) error

ReadProtoFromHTTPRequest 从http请求中读取proto消息

func ReadProtoMessage

func ReadProtoMessage(conn net.Conn, msg proto.Message) error

func Register added in v1.0.10

func Register(fullname string) error

func RegisterAllMessages added in v1.0.10

func RegisterAllMessages()

RegisterAllMessages 自动注册所有protobuf消息。protobuf使用init()注册(RegisterType),则此API需要在import后调用。

func RequestProtoMessage

func RequestProtoMessage(conn net.Conn, req, ack proto.Message) error

RequestProtoMessage sends req and waits for ack.

func TryEnqueueMsg added in v1.0.3

func TryEnqueueMsg(queue chan<- *NetMessage, msg *NetMessage) bool

TryEnqueueMsg 尝试将消息放入队列,如果队列已满返回false

func WriteLenData added in v1.0.4

func WriteLenData(w io.Writer, body []byte) error

WriteLenData 写入长度[2字节]开头的数据

func WriteProtoHTTPResponse

func WriteProtoHTTPResponse(w http.ResponseWriter, msg proto.Message, contentType string) error

WriteProtoHTTPResponse 写入proto消息到http响应

func WriteProtoMessage

func WriteProtoMessage(w io.Writer, msg proto.Message) error

Types

type Encryptor

// Encryptor transforms message bodies in both directions.
// EncodeMsgTo calls Encrypt on outgoing bodies and DecodeMsgBody calls
// Decrypt on incoming bodies flagged with FlagEncrypt.
type Encryptor interface {
	// Encrypt returns the encrypted form of its input, or an error.
	Encrypt([]byte) ([]byte, error)
	// Decrypt returns the decrypted form of its input, or an error.
	Decrypt([]byte) ([]byte, error)
}

type Endpoint

// Endpoint is a network endpoint.
type Endpoint interface {
	GetNode() NodeID
	SetNode(NodeID)
	GetRemoteAddr() string
	UnderlyingConn() net.Conn

	GetUserData() any
	// NOTE(review): StreamConnBase.SetEncryption names these arguments
	// (encrypt, decrypt) — presumably the same order here; confirm.
	SetEncryption(Encryptor, Encryptor)
	SetSendQueue(chan *NetMessage)
	Go(ctx context.Context, reader, writer bool) // start the read/write threads

	// NOTE(review): the int is presumably a send mode (SendBlock or
	// SendNonblock) — confirm.
	SendMsg(*NetMessage, int) error
	Close() error
	ForceClose(error)
}

Endpoint 网络端点

type EndpointMap

type EndpointMap struct {
	// contains filtered or unexported fields
}

EndpointMap 线程安全的Endpoint Map

func NewEndpointMap

func NewEndpointMap() *EndpointMap

func (*EndpointMap) Clear

func (m *EndpointMap) Clear()

func (*EndpointMap) Foreach

func (m *EndpointMap) Foreach(action func(Endpoint) bool)

Foreach action应该对map是read-only

func (*EndpointMap) Get

func (m *EndpointMap) Get(node NodeID) Endpoint

func (*EndpointMap) Has

func (m *EndpointMap) Has(node NodeID) bool

func (*EndpointMap) Init

func (m *EndpointMap) Init() *EndpointMap

func (*EndpointMap) IsEmpty

func (m *EndpointMap) IsEmpty() bool

func (*EndpointMap) Keys

func (m *EndpointMap) Keys() []NodeID

func (*EndpointMap) Put

func (m *EndpointMap) Put(node NodeID, endpoint Endpoint)

func (*EndpointMap) PutIfAbsent

func (m *EndpointMap) PutIfAbsent(node NodeID, endpoint Endpoint)

func (*EndpointMap) Remove

func (m *EndpointMap) Remove(node NodeID)

func (*EndpointMap) Size

func (m *EndpointMap) Size() int

type Error

// Error pairs an error with an Endpoint.
// NOTE(review): presumably the endpoint on which the error occurred — confirm.
type Error struct {
	Err      error
	Endpoint Endpoint
}

func NewError

func NewError(err error, endpoint Endpoint) *Error

func (Error) Error

func (e Error) Error() string

type MsgFlag

// MsgFlag is a bit set of per-message flags carried in the packet header.
type MsgFlag uint8

// Flag bits. Bit 2 (0x04) is not assigned.
const (
	FlagCompress MsgFlag = 1 << 0 // 0x01: body is compressed
	FlagEncrypt  MsgFlag = 1 << 1 // 0x02: body is encrypted
	FlagExtent   MsgFlag = 1 << 3 // 0x08
	FlagError    MsgFlag = 1 << 4 // 0x10
	FlagCache    MsgFlag = 1 << 5 // 0x20
)

func (MsgFlag) Clear

func (g MsgFlag) Clear(n MsgFlag) MsgFlag

func (MsgFlag) Has

func (g MsgFlag) Has(n MsgFlag) bool

type NetMessage

// NetMessage is one message exchanged over a connection.
//
// Command and Seq are carried in the packet header. Data holds the encoded
// body bytes and Body the protobuf form; EncodeMsgTo calls Encode and then
// sends Data. Session is excluded from JSON output.
// NOTE(review): Session is presumably the endpoint the message arrived on
// or is destined for — confirm.
type NetMessage struct {
	CreatedAt int64         `json:"created_at,omitempty"` // microseconds
	Command   uint32        `json:"cmd"`
	Seq       uint32        `json:"seq,omitempty"`
	Data      []byte        `json:"data,omitempty"`
	Body      proto.Message `json:"body,omitempty"`
	Session   Endpoint      `json:"-"`
}

func AllocNetMessage

func AllocNetMessage() *NetMessage

func CreateNetMessage added in v1.0.3

func CreateNetMessage(cmd, seq uint32, body proto.Message) *NetMessage

func CreateNetMessageWith added in v1.0.3

func CreateNetMessageWith(body proto.Message) *NetMessage

func NewNetMessage

func NewNetMessage(cmd, seq uint32, data []byte) *NetMessage

func TryDequeueMsg added in v1.0.3

func TryDequeueMsg(queue <-chan *NetMessage) *NetMessage

TryDequeueMsg 尝试从队列中取出消息,如果队列为空返回nil

func (*NetMessage) Ack added in v1.0.4

func (m *NetMessage) Ack(ack proto.Message) error

func (*NetMessage) Clone

func (m *NetMessage) Clone() *NetMessage

func (*NetMessage) DecodeTo

func (m *NetMessage) DecodeTo(msg proto.Message) error

DecodeTo decodes `Data` into `msg`.

func (*NetMessage) Encode

func (m *NetMessage) Encode() error

Encode encodes `Body` into `Data`.

func (*NetMessage) Refuse

func (m *NetMessage) Refuse(ec int32) error

Refuse 返回一个带错误码的Ack

func (*NetMessage) Reply

func (m *NetMessage) Reply(cmd uint32, data []byte) error

func (*NetMessage) Reset

func (m *NetMessage) Reset()

type NetV1Header added in v1.0.3

// NetV1Header is the protocol header, stored as raw bytes.
// ReadHeadBody fills it directly from the wire and EncodeMsgTo writes it
// directly to the wire.
type NetV1Header []byte

NetV1Header 协议头

func NewNetV1Header added in v1.0.3

func NewNetV1Header() NetV1Header

func (NetV1Header) CRC added in v1.0.3

func (h NetV1Header) CRC() uint32

func (NetV1Header) CalcCRC added in v1.0.3

func (h NetV1Header) CalcCRC(body []byte) uint32

CalcCRC checksum = f(head) and f(body)

func (NetV1Header) Clear added in v1.0.5

func (h NetV1Header) Clear()

func (NetV1Header) Command added in v1.0.3

func (h NetV1Header) Command() uint32

func (NetV1Header) Flag added in v1.0.3

func (h NetV1Header) Flag() MsgFlag

func (NetV1Header) Len added in v1.0.3

func (h NetV1Header) Len() uint32

func (NetV1Header) Pack added in v1.0.3

func (h NetV1Header) Pack(size uint32, flag MsgFlag, seq, cmd uint32)

func (NetV1Header) Seq added in v1.0.3

func (h NetV1Header) Seq() uint32

func (NetV1Header) SetCRC added in v1.0.3

func (h NetV1Header) SetCRC(v uint32)

type NodeID

// NodeID is a node number held in a 64-bit integer. It identifies either a
// service (low 32 bits: service instance number, bits 32-48: service type)
// or a client session (low 32 bits: the GATE-internal session number,
// bits 32-48: the GATE number).
type NodeID uint64

NodeID 节点ID 一个64位整数表示的节点号,用以标识一个service,低32位为服务实例编号,32-48位为服务类型; 或者一个客户端session,低32位为GATE内部的session编号,32-48位为GATE编号;

func MakeBackendNode added in v1.0.4

func MakeBackendNode(service uint16, instance uint32) NodeID

MakeBackendNode 根据服务号和实例号创建一个节点ID

func MakeGateSession added in v1.0.4

func MakeGateSession(instance uint16, session uint32) NodeID

MakeGateSession `instance`指GATE的实例编号,限定为16位

func (NodeID) GateID

func (n NodeID) GateID() uint16

GateID client会话的网关ID

func (NodeID) Instance

func (n NodeID) Instance() uint32

Instance 节点的实例编号

func (NodeID) IsBackend

func (n NodeID) IsBackend() bool

IsBackend 是否backend节点

func (NodeID) IsSession

func (n NodeID) IsSession() bool

IsSession 是否client会话

func (NodeID) Service

func (n NodeID) Service() int16

Service 服务类型

func (NodeID) Session added in v1.0.4

func (n NodeID) Session() uint32

func (NodeID) String

func (n NodeID) String() string

type NodeIDSet

// NodeIDSet is an ordered set of NodeIDs without duplicates.
// It is an alias of []NodeID, so the type itself does not enforce ordering
// or uniqueness.
type NodeIDSet = []NodeID

NodeIDSet 没有重复ID的有序集合

type SessionMessage

// SessionMessage groups an Endpoint with a message ID and a protobuf body.
// NOTE(review): presumably a decoded message tagged with its originating
// session — confirm against its users.
type SessionMessage struct {
	Session Endpoint
	MsgId   uint32
	MsgBody proto.Message
}

type StreamConnBase

// StreamConnBase is the base type for stream connections; TcpSession and
// WebsockSession embed it.
type StreamConnBase struct {
	Node       NodeID             // node ID (see GetNode/SetNode)
	Running    atomic.Bool        // NOTE(review): presumably the state reported by IsRunning — confirm
	RecvQueue  chan<- *NetMessage // receive message queue
	SendQueue  chan *NetMessage   // send message queue
	ErrChan    chan *Error        // error signal
	Encrypt    Encryptor          // encryption
	Decrypt    Encryptor          // decryption
	RemoteAddr string             // cached remote address
	Userdata   any                // user data
}

StreamConnBase base stream connection

func (*StreamConnBase) GetNode added in v1.0.3

func (c *StreamConnBase) GetNode() NodeID

func (*StreamConnBase) GetRemoteAddr added in v1.0.3

func (c *StreamConnBase) GetRemoteAddr() string

func (*StreamConnBase) GetUserData added in v1.0.3

func (c *StreamConnBase) GetUserData() any

func (*StreamConnBase) IsRunning

func (c *StreamConnBase) IsRunning() bool

func (*StreamConnBase) SendMsg

func (c *StreamConnBase) SendMsg(msg *NetMessage, mode int) error

func (*StreamConnBase) SendNonBlock

func (c *StreamConnBase) SendNonBlock(msg *NetMessage) bool

func (*StreamConnBase) SetEncryption

func (c *StreamConnBase) SetEncryption(encrypt, decrypt Encryptor)

func (*StreamConnBase) SetNode

func (c *StreamConnBase) SetNode(node NodeID)

func (*StreamConnBase) SetSendQueue

func (c *StreamConnBase) SetSendQueue(sendQueue chan *NetMessage)

func (*StreamConnBase) SetUserData

func (c *StreamConnBase) SetUserData(val any)

type TcpSession

type TcpSession struct {
	StreamConnBase
	// contains filtered or unexported fields
}

func NewTcpSession

func NewTcpSession(conn net.Conn, sendQSize int) *TcpSession

func (*TcpSession) Close

func (t *TcpSession) Close() error

func (*TcpSession) ForceClose

func (t *TcpSession) ForceClose(reason error)

func (*TcpSession) Go

func (t *TcpSession) Go(ctx context.Context, reader, writer bool)

func (*TcpSession) SetIntranet

func (t *TcpSession) SetIntranet(v bool)

func (*TcpSession) UnderlyingConn

func (t *TcpSession) UnderlyingConn() net.Conn

type WebsockSession added in v1.0.10

type WebsockSession struct {
	StreamConnBase
	// contains filtered or unexported fields
}

func NewWebsockSession added in v1.0.10

func NewWebsockSession(conn *websocket.Conn, sendQSize int) *WebsockSession

func (*WebsockSession) Close added in v1.0.10

func (t *WebsockSession) Close() error

func (*WebsockSession) ForceClose added in v1.0.10

func (t *WebsockSession) ForceClose(reason error)

func (*WebsockSession) Go added in v1.0.10

func (t *WebsockSession) Go(ctx context.Context, reader, writer bool)

func (*WebsockSession) ReadMessage added in v1.0.10

func (t *WebsockSession) ReadMessage(netMsg *NetMessage) error

func (*WebsockSession) SetIntranet added in v1.0.10

func (t *WebsockSession) SetIntranet(v bool)

func (*WebsockSession) UnderlyingConn added in v1.0.10

func (t *WebsockSession) UnderlyingConn() net.Conn

type WsRecvMsg added in v1.0.10

// WsRecvMsg is the JSON shape of a message with keys "cmd", "seq" and
// "body"; Body is kept as raw JSON so its decoding can be deferred.
// NOTE(review): presumably the format of messages received over a
// websocket — confirm against ReadMessage.
type WsRecvMsg struct {
	Cmd  string          `json:"cmd"`
	Seq  uint32          `json:"seq,omitempty"`
	Body json.RawMessage `json:"body,omitempty"`
}

type WsWriteMsg added in v1.0.10

// WsWriteMsg is the JSON shape of a message with keys "cmd", "seq" and
// "body", where Body is a protobuf message. CreateJSONFromProto returns
// this type.
// NOTE(review): presumably the format of messages written to a websocket —
// confirm against the writer.
type WsWriteMsg struct {
	Cmd  string        `json:"cmd"`
	Seq  uint32        `json:"seq,omitempty"`
	Body proto.Message `json:"body,omitempty"`
}

func CreateJSONFromProto added in v1.0.10

func CreateJSONFromProto(msgId uint32, data []byte) (*WsWriteMsg, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL