Documentation
¶
Index ¶
- Constants
- Variables
- func DecodeMetadata(l uint32, data []byte) (map[string]string, error)
- func EncodeMetadata(m map[string]string, bb *bytes.Buffer)
- func PutMsgData(msgData *MsgData)
- func PutNsqData(nsqData *NsqData)
- func PutTask(task STask)
- func SetTopicEnumName(v map[int32]string)
- func SetTopicEnumValue(v map[string]int32)
- type BaseRequest
- func (br *BaseRequest) Abort()
- func (br *BaseRequest) BindRouter(router SRouter)
- func (br *BaseRequest) Call() error
- func (br *BaseRequest) Get(key string) (value interface{}, exists bool)
- func (br *BaseRequest) GetCmd() uint16
- func (br *BaseRequest) GetConnection() SConnection
- func (br *BaseRequest) GetData() []byte
- func (br *BaseRequest) GetMessage() SMsg
- func (br *BaseRequest) GetMsgID() int32
- func (br *BaseRequest) Set(key string, value interface{})
- type BaseRouter
- type ConnManager
- func (connMgr *ConnManager) Add(conn SConnection)
- func (connMgr *ConnManager) ClearConn()
- func (connMgr *ConnManager) Get(connID uint64) (SConnection, error)
- func (connMgr *ConnManager) Get2(strConnId string) (SConnection, error)
- func (connMgr *ConnManager) GetAllConnID() []uint64
- func (connMgr *ConnManager) GetAllConnIdStr() []string
- func (connMgr *ConnManager) Len() int
- func (connMgr *ConnManager) Range(cb func(uint64, SConnection, interface{}) error, args interface{}) (err error)
- func (connMgr *ConnManager) Range2(cb func(string, SConnection, interface{}) error, args interface{}) (err error)
- func (connMgr *ConnManager) Remove(conn SConnection)
- type Connection
- func (bc *Connection) Context() context.Context
- func (bc *Connection) GetConnID() uint64
- func (bc *Connection) GetConnIdStr() string
- func (bc *Connection) GetConnVersion() int32
- func (bc *Connection) GetProperty(key string) (string, error)
- func (bc *Connection) GetTaskHandler() STaskHandler
- func (bc *Connection) HasFrameDecoder() bool
- func (bc *Connection) IsAlive() bool
- func (bc *Connection) LocalAddr() net.Addr
- func (bc *Connection) LocalAddrString() string
- func (bc *Connection) RemoteAddr() net.Addr
- func (bc *Connection) RemoteAddrString() string
- func (bc *Connection) RemoveProperty(key string)
- func (bc *Connection) SendData(data []byte) error
- func (bc *Connection) SendMsg(msg SMsg) error
- func (bc *Connection) SetHeartBeat(checker SHeartbeatChecker)
- func (bc *Connection) SetProperty(key string, value string)
- func (bc *Connection) Start()
- func (bc *Connection) StartReader()
- func (bc *Connection) Stop()
- type HandleStep
- type HeartBeatFunc
- type HeartBeatMsgFunc
- type HeartBeatOption
- type MsgData
- type NSQMsg
- func (m *NSQMsg) GetCmd() uint16
- func (m *NSQMsg) GetCompressType() smsg.CompressType
- func (m *NSQMsg) GetData() []byte
- func (m *NSQMsg) GetHasFrameDecoder() bool
- func (m *NSQMsg) GetMessageType() smsg.MessageType
- func (m *NSQMsg) GetMeta() map[string]string
- func (m *NSQMsg) GetMsgId() int32
- func (m *NSQMsg) GetNsqMessage() *nsq.Message
- func (m *NSQMsg) GetRet() uint16
- func (m *NSQMsg) GetSeq() uint64
- func (m *NSQMsg) GetSerializeType() smsg.SerializeType
- func (m *NSQMsg) GetVersion() uint8
- func (m *NSQMsg) SetHasFrameDecoder(hasFrameDevoder bool)
- func (m *NSQMsg) SetNsqMessage(message *nsq.Message)
- type Nsq
- type NsqConsumer
- type NsqData
- type NsqDataPack
- type NsqProducer
- type OnRemoteNotAlive
- type QuicConn
- func (c *QuicConn) Close() error
- func (c *QuicConn) LocalAddr() net.Addr
- func (c *QuicConn) Read(b []byte) (int, error)
- func (c *QuicConn) RemoteAddr() net.Addr
- func (c *QuicConn) SetDeadline(t time.Time) error
- func (c *QuicConn) SetReadDeadline(t time.Time) error
- func (c *QuicConn) SetWriteDeadline(t time.Time) error
- func (c *QuicConn) Write(b []byte) (int, error)
- type QuicListener
- type SConnManager
- type SConnection
- type SDataPack
- type SFrameDecoder
- type SFuncTask
- type SHeartbeatChecker
- type SMsg
- type SRouter
- type STask
- type STaskHandler
- type Task
- type TaskHandler
- type TopicEnum
- type WsConnection
Constants ¶
const (
HeartBeatDefaultMsgID uint16 = 1
)
Variables ¶
var ErrMetaKVMissing = errors.New("wrong metadata lines. some keys or values are missing")
var MsgDataPool = new(sync.Pool)
var NsqDataPackObj = new(NsqDataPack)
var NsqDataPool = new(sync.Pool)
var TaskPool = new(sync.Pool)
var TopicEnum_name = map[int32]string{}
var TopicEnum_value = map[string]int32{}
Functions ¶
func EncodeMetadata ¶
EncodeMetadata writes the metadata as a repeating sequence: len,string,len,string,...
func PutMsgData ¶
func PutMsgData(msgData *MsgData)
func PutNsqData ¶
func PutNsqData(nsqData *NsqData)
func SetTopicEnumName ¶
func SetTopicEnumValue ¶
Types ¶
type BaseRequest ¶
type BaseRequest struct{}
func (*BaseRequest) Abort ¶
func (br *BaseRequest) Abort()
func (*BaseRequest) BindRouter ¶
func (br *BaseRequest) BindRouter(router SRouter)
func (*BaseRequest) Call ¶
func (br *BaseRequest) Call() error
func (*BaseRequest) Get ¶
func (br *BaseRequest) Get(key string) (value interface{}, exists bool)
func (*BaseRequest) GetCmd ¶
func (br *BaseRequest) GetCmd() uint16
func (*BaseRequest) GetConnection ¶
func (br *BaseRequest) GetConnection() SConnection
func (*BaseRequest) GetData ¶
func (br *BaseRequest) GetData() []byte
func (*BaseRequest) GetMessage ¶
func (br *BaseRequest) GetMessage() SMsg
func (*BaseRequest) GetMsgID ¶
func (br *BaseRequest) GetMsgID() int32
func (*BaseRequest) Set ¶
func (br *BaseRequest) Set(key string, value interface{})
type BaseRouter ¶
type BaseRouter struct{}
type ConnManager ¶
type ConnManager struct {
Connections utils.ShardLockMaps
}
func NewConnManager ¶
func NewConnManager() *ConnManager
func (*ConnManager) Add ¶
func (connMgr *ConnManager) Add(conn SConnection)
func (*ConnManager) ClearConn ¶
func (connMgr *ConnManager) ClearConn()
func (*ConnManager) Get ¶
func (connMgr *ConnManager) Get(connID uint64) (SConnection, error)
func (*ConnManager) Get2 ¶
func (connMgr *ConnManager) Get2(strConnId string) (SConnection, error)
Get2 is the recommended method for obtaining connection instances.
func (*ConnManager) GetAllConnID ¶
func (connMgr *ConnManager) GetAllConnID() []uint64
func (*ConnManager) GetAllConnIdStr ¶
func (connMgr *ConnManager) GetAllConnIdStr() []string
func (*ConnManager) Len ¶
func (connMgr *ConnManager) Len() int
func (*ConnManager) Range ¶
func (connMgr *ConnManager) Range(cb func(uint64, SConnection, interface{}) error, args interface{}) (err error)
func (*ConnManager) Range2 ¶
func (connMgr *ConnManager) Range2(cb func(string, SConnection, interface{}) error, args interface{}) (err error)
Range2 is the recommended method for traversing all connections.
func (*ConnManager) Remove ¶
func (connMgr *ConnManager) Remove(conn SConnection)
type Connection ¶
type Connection struct { Conn net.Conn // The ID of the current connection, also known as SessionID, globally unique, used by server Connection // uint64 range: 0~18,446,744,073,709,551,615 // This is the maximum number of connID theoretically supported by the process // (当前连接的ID 也可以称作为SessionID,ID全局唯一 ,服务端Connection使用 // uint64 取值范围:0 ~ 18,446,744,073,709,551,615 // 这个是理论支持的进程connID的最大数量) ConnID uint64 // connection id for string // (字符串的连接id) ConnIdStr string // 连接版本 ConnVersion int32 // The message management module that manages MsgID and the corresponding processing method // (消息管理MsgID和对应处理方法的消息管理模块) TaskHandler STaskHandler // onConnStart is the Hook function when the current connection is created. // (当前连接创建时Hook函数) OnConnStart func(conn SConnection) // onConnStop is the Hook function when the current connection is disconnected. // (当前连接断开时的Hook函数) OnConnStop func(conn SConnection) // frameDecoder is the decoder for splitting or splicing data packets. // (断粘包解码器) FrameDecoder SFrameDecoder Datapack SDataPack // property is the connection attribute. (链接属性) Property map[string]string IOReadBuffSize uint32 // contains filtered or unexported fields }
func (*Connection) Context ¶
func (bc *Connection) Context() context.Context
func (*Connection) GetConnID ¶
func (bc *Connection) GetConnID() uint64
func (*Connection) GetConnIdStr ¶
func (bc *Connection) GetConnIdStr() string
func (*Connection) GetConnVersion ¶
func (bc *Connection) GetConnVersion() int32
func (*Connection) GetProperty ¶
func (bc *Connection) GetProperty(key string) (string, error)
func (*Connection) GetTaskHandler ¶
func (bc *Connection) GetTaskHandler() STaskHandler
func (*Connection) HasFrameDecoder ¶
func (bc *Connection) HasFrameDecoder() bool
func (*Connection) IsAlive ¶
func (bc *Connection) IsAlive() bool
func (*Connection) LocalAddr ¶
func (bc *Connection) LocalAddr() net.Addr
func (*Connection) LocalAddrString ¶
func (bc *Connection) LocalAddrString() string
func (*Connection) RemoteAddr ¶
func (bc *Connection) RemoteAddr() net.Addr
func (*Connection) RemoteAddrString ¶
func (bc *Connection) RemoteAddrString() string
func (*Connection) RemoveProperty ¶
func (bc *Connection) RemoveProperty(key string)
func (*Connection) SendData ¶
func (bc *Connection) SendData(data []byte) error
func (*Connection) SendMsg ¶
func (bc *Connection) SendMsg(msg SMsg) error
func (*Connection) SetHeartBeat ¶
func (bc *Connection) SetHeartBeat(checker SHeartbeatChecker)
func (*Connection) SetProperty ¶
func (bc *Connection) SetProperty(key string, value string)
func (*Connection) StartReader ¶
func (bc *Connection) StartReader()
func (*Connection) Stop ¶
func (bc *Connection) Stop()
type HandleStep ¶
type HandleStep int
const ( PRE_HANDLE HandleStep = iota // PreHandle for pre-processing HANDLE // Handle for processing POST_HANDLE // PostHandle for post-processing HANDLE_OVER )
type HeartBeatFunc ¶
type HeartBeatFunc func(connection SConnection) error
HeartBeatFunc is a user-defined heartbeat function (用户自定义心跳函数)
type HeartBeatMsgFunc ¶
type HeartBeatMsgFunc func(connection SConnection) SMsg
HeartBeatMsgFunc is a user-defined method for handling heartbeat detection messages (用户自定义的心跳检测消息处理方法)
type HeartBeatOption ¶
type HeartBeatOption struct { MakeMsg HeartBeatMsgFunc // User-defined method for handling heartbeat detection messages(用户自定义的心跳检测消息处理方法) OnRemoteNotAlive OnRemoteNotAlive // User-defined method for handling remote connections that are not alive(用户自定义的远程连接不存活时的处理方法) HeartBeatMsgID uint32 // User-defined ID for heartbeat detection messages(用户自定义的心跳检测消息ID) Router SRouter // User-defined business processing route for heartbeat detection messages(用户自定义的心跳检测消息业务处理路由) }
type MsgData ¶
type MsgData struct { MsgID uint32 // contains filtered or unexported fields }
func GetMsgData ¶
type NSQMsg ¶
type NSQMsg struct { //*Header //PkgLen uint32 // nsq不存在粘包问题 不需要 Cmd uint16 Ret uint16 Version uint8 SerializeType smsg.SerializeType CompressType smsg.CompressType MessageType smsg.MessageType Seq uint64 Metadata map[string]string Data []byte // contains filtered or unexported fields }
func (*NSQMsg) GetCompressType ¶
func (m *NSQMsg) GetCompressType() smsg.CompressType
func (*NSQMsg) GetHasFrameDecoder ¶
func (*NSQMsg) GetMessageType ¶
func (m *NSQMsg) GetMessageType() smsg.MessageType
func (*NSQMsg) GetNsqMessage ¶
func (m *NSQMsg) GetNsqMessage() *nsq.Message
func (*NSQMsg) GetSerializeType ¶
func (m *NSQMsg) GetSerializeType() smsg.SerializeType
func (*NSQMsg) GetVersion ¶
func (*NSQMsg) SetHasFrameDecoder ¶
func (*NSQMsg) SetNsqMessage ¶
func (m *NSQMsg) SetNsqMessage(message *nsq.Message)
type Nsq ¶
type Nsq struct { Consumers []*NsqConsumer // Buffered channel used for message communication between the read and write goroutines // (有缓冲管道,用于读、写两个goroutine之间的消息通信) NsqDataBuffChan chan *NsqData MaxNsqDataChanLen uint32 Apis map[int32]SRouter // contains filtered or unexported fields }
func (*Nsq) HandleMessage ¶
func (*Nsq) StartWriter ¶
func (n *Nsq) StartWriter(p *NsqProducer)
type NsqConsumer ¶
type NsqConsumer struct {
// contains filtered or unexported fields
}
func NewNsqConsumer ¶
func NewNsqConsumer(topic, channel, nsqLookupAddr string, concurrency, maxInFlight int) (*NsqConsumer, error)
func (*NsqConsumer) StartReader ¶
func (c *NsqConsumer) StartReader(handler nsq.Handler) error
func (*NsqConsumer) Stop ¶
func (c *NsqConsumer) Stop()
type NsqData ¶
type NsqData struct { Topic string // contains filtered or unexported fields }
func GetNsqData ¶
type NsqDataPack ¶
type NsqDataPack struct { }
func (*NsqDataPack) GetHeadLen ¶
func (dp *NsqDataPack) GetHeadLen() uint32
type NsqProducer ¶
type NsqProducer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
func NewProducer(nsqdAddr string) (*NsqProducer, error)
func (*NsqProducer) PublishDirect ¶
func (p *NsqProducer) PublishDirect(topic string, data []byte) error
type OnRemoteNotAlive ¶
type OnRemoteNotAlive func(connection SConnection)
OnRemoteNotAlive is a user-defined method for handling remote connections that are not alive (用户自定义的远程连接不存活时的处理方法)
type QuicConn ¶
type QuicConn struct {
// contains filtered or unexported fields
}
func (*QuicConn) RemoteAddr ¶
RemoteAddr returns the remote network address.
func (*QuicConn) SetDeadline ¶
SetDeadline sets the deadline associated with the connection. A zero time value disables the deadline.
func (*QuicConn) SetReadDeadline ¶
SetReadDeadline implements the Conn SetReadDeadline method.
func (*QuicConn) SetWriteDeadline ¶
SetWriteDeadline implements the Conn SetWriteDeadline method.
type QuicListener ¶
type QuicListener struct {
// contains filtered or unexported fields
}
func NewQuicListener ¶
func NewQuicListener(c net.PacketConn, tlsConf *tls.Config, quicConfig *quic.Config) (*QuicListener, error)
func (*QuicListener) AcceptContext ¶
AcceptContext waits for and returns the next connection to the listener.
func (*QuicListener) Addr ¶
func (q *QuicListener) Addr() net.Addr
Addr returns the listener's network address.
func (*QuicListener) Close ¶
func (q *QuicListener) Close() error
Close closes the listener. Any blocked Accept operations will be unblocked and return errors.
type SConnManager ¶
type SConnManager interface { Add(SConnection) // Add connection Remove(SConnection) // Remove connection Get(uint64) (SConnection, error) // Get a connection by ConnID Get2(string) (SConnection, error) // Get a connection by string ConnID Len() int // Get current number of connections ClearConn() // Remove and stop all connections GetAllConnID() []uint64 // Get all connection IDs GetAllConnIdStr() []string // Get all string connection IDs Range(func(uint64, SConnection, interface{}) error, interface{}) error // Traverse all connections Range2(func(string, SConnection, interface{}) error, interface{}) error // Traverse all connections 2 }
type SConnection ¶
type SConnection interface { // Start the connection, make the current connection start working // (启动连接,让当前连接开始工作) Start() // Stop the connection and end the current connection state // (停止连接,结束当前连接状态) Stop() // Returns ctx, used by user-defined go routines to obtain connection exit status // (返回ctx,用于用户自定义的go程获取连接退出状态) Context() context.Context GetConnID() uint64 // Get the current connection ID (获取当前连接ID) GetConnIdStr() string // Get the current connection ID for string (获取当前字符串连接ID) GetTaskHandler() STaskHandler // Get the message handler (获取消息处理器) RemoteAddr() net.Addr // Get the remote address information of the connection (获取链接远程地址信息) LocalAddr() net.Addr // Get the local address information of the connection (获取链接本地地址信息) LocalAddrString() string // Get the local address information of the connection as a string RemoteAddrString() string // Get the remote address information of the connection as a string GetConnVersion() int32 SendData(data []byte) error // Send data to the message queue to be sent to the remote TCP client later SendMsg(msg SMsg) error SetProperty(key string, value string) // Set connection property GetProperty(key string) (string, error) // Get connection property RemoveProperty(key string) // Remove connection property IsAlive() bool // Check if the current connection is alive(判断当前连接是否存活) SetHeartBeat(checker SHeartbeatChecker) // Set the heartbeat detector (设置心跳检测器) // 返回当前连接是否存在FrameDecoder HasFrameDecoder() bool }
func NewConnection ¶
func NewConnection(conn net.Conn, connId uint64, connVersion int32, taskHandler STaskHandler, OnConnStart, OnConnStop func(conn SConnection), frameDecoder SFrameDecoder, datapack SDataPack, connManager SConnManager, IOReadBuffSize uint32, heartbeatDuration time.Duration) SConnection
func NewWsConnection ¶
func NewWsConnection(cID uint64, taskHandler STaskHandler, conn *websocket.Conn, onConnStart, onConnStop func(conn SConnection), datapack SDataPack) SConnection
type SDataPack ¶
type SDataPack interface { GetHeadLen() uint32 Pack(msg SMsg) ([]byte, error) Unpack(binaryData []byte) (SMsg, error) }
func NewNsqDataPack ¶
func NewNsqDataPack() SDataPack
type SFrameDecoder ¶
type SHeartbeatChecker ¶
type SHeartbeatChecker interface { SetOnRemoteNotAlive(OnRemoteNotAlive) SetHeartbeatMsgFunc(HeartBeatMsgFunc) //SetHeartbeatFunc(HeartBeatFunc) //BindRouter(uint32, SRouter) Start() Stop() SendHeartBeatMsg() error BindConn(connection SConnection) //Clone() SHeartbeatChecker Cmd() uint16 }
type SMsg ¶
type SMsg interface { GetMsgId() int32 GetCmd() uint16 // Gets the ID of the message(获取消息ID) GetRet() uint16 GetVersion() uint8 GetSerializeType() smsg.SerializeType GetCompressType() smsg.CompressType GetMessageType() smsg.MessageType GetSeq() uint64 GetMeta() map[string]string GetData() []byte // Gets the content of the message(获取消息内容) // SetNsqMessage(message *nsq.Message) GetNsqMessage() *nsq.Message // GetHasFrameDecoder() bool SetHasFrameDecoder(hasFrameDevoder bool) }
type STask ¶
type STask interface { GetConnection() SConnection // Get the connection information of the request(获取请求连接信息) GetData() []byte // Get the data of the request message(获取请求消息的数据) GetMsgID() int32 // Get the message ID of the request(获取请求的消息ID) GetCmd() uint16 GetMessage() SMsg // Get the raw data of the request message (获取请求消息的原始数据 add by uuxia 2023-03-10) BindRouter(router SRouter) // Bind which router handles this request(绑定这次请求由哪个路由处理) // Move on to the next handler to start execution, but the functions that call this method will finish in the reverse of the order in which they were called // (转进到下一个处理器开始执行 但是调用此方法的函数会根据先后顺序逆序执行) Call() error // Terminate the execution of the processing function, but the function that calls this method will be executed until completion // 终止处理函数的运行 但调用此方法的函数会执行完毕 Abort() //Set 在 Request 中存放一个上下文 Set(key string, value interface{}) //Get 从 Request 中获取一个上下文信息 Get(key string) (value interface{}, exists bool) }
func GetTask ¶
func GetTask(conn SConnection, msg SMsg) STask
type STaskHandler ¶
type STaskHandler interface { AddRouter(msgID int32, router SRouter) StartWorkerPool() // Start the worker pool SendTaskToTaskQueue(task STask) // Pass the message to the TaskQueue for processing by the worker(将消息交给TaskQueue,由worker进行处理) Stop() }
func NewTaskHandler ¶
func NewTaskHandler(workPoolSize, maxTaskQueueLen uint32) STaskHandler
type Task ¶
type Task struct { BaseRequest // contains filtered or unexported fields }
func (*Task) BindRouter ¶
func (*Task) GetConnection ¶
func (r *Task) GetConnection() SConnection
func (*Task) GetMessage ¶
func (*Task) Reset ¶
func (r *Task) Reset(conn SConnection, msg SMsg)
type TaskHandler ¶
type TaskHandler struct { Apis map[int32]SRouter // The number of worker goroutines in the business worker pool // (业务工作Worker池的数量) WorkerPoolSize uint32 // A message queue for workers to take tasks // (Worker负责取任务的消息队列) TaskQueue chan STask // contains filtered or unexported fields }
func (*TaskHandler) AddRouter ¶
func (mh *TaskHandler) AddRouter(msgID int32, router SRouter)
func (*TaskHandler) SendTaskToTaskQueue ¶
func (mh *TaskHandler) SendTaskToTaskQueue(task STask)
SendTaskToTaskQueue sends the message to the TaskQueue for processing by the worker (将消息交给TaskQueue,由worker进行处理)
func (*TaskHandler) StartOneWorker ¶
func (mh *TaskHandler) StartOneWorker(workerID int, taskQueue chan STask)
StartOneWorker starts a worker workflow (启动一个Worker工作流程)
func (*TaskHandler) StartWorkerPool ¶
func (mh *TaskHandler) StartWorkerPool()
StartWorkerPool starts the worker pool
func (*TaskHandler) Stop ¶
func (mh *TaskHandler) Stop()
type WsConnection ¶
type WsConnection struct { Connection // contains filtered or unexported fields }