sbus

package
v0.1.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 25, 2025 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (
	HeartBeatDefaultMsgID uint16 = 1
)

Variables

View Source
var ErrMetaKVMissing = errors.New("wrong metadata lines. some keys or values are missing")
View Source
var MsgDataPool = new(sync.Pool)
View Source
var NsqDataPackObj = new(NsqDataPack)
View Source
var NsqDataPool = new(sync.Pool)
View Source
var TaskPool = new(sync.Pool)
View Source
var TopicEnum_name = map[int32]string{}
View Source
var TopicEnum_value = map[string]int32{}

Functions

func DecodeMetadata

func DecodeMetadata(l uint32, data []byte) (map[string]string, error)

func EncodeMetadata

func EncodeMetadata(m map[string]string, bb *bytes.Buffer)

EncodeMetadata writes m into bb as a flat sequence of length-prefixed strings: len, string, len, string, ...

func PutMsgData

func PutMsgData(msgData *MsgData)

func PutNsqData

func PutNsqData(nsqData *NsqData)

func PutTask

func PutTask(task STask)

func SetTopicEnumName

func SetTopicEnumName(v map[int32]string)

func SetTopicEnumValue

func SetTopicEnumValue(v map[string]int32)

Types

type BaseRequest

type BaseRequest struct{}

func (*BaseRequest) Abort

func (br *BaseRequest) Abort()

func (*BaseRequest) BindRouter

func (br *BaseRequest) BindRouter(router SRouter)

func (*BaseRequest) Call

func (br *BaseRequest) Call() error

func (*BaseRequest) Get

func (br *BaseRequest) Get(key string) (value interface{}, exists bool)

func (*BaseRequest) GetCmd

func (br *BaseRequest) GetCmd() uint16

func (*BaseRequest) GetConnection

func (br *BaseRequest) GetConnection() SConnection

func (*BaseRequest) GetData

func (br *BaseRequest) GetData() []byte

func (*BaseRequest) GetMessage

func (br *BaseRequest) GetMessage() SMsg

func (*BaseRequest) GetMsgID

func (br *BaseRequest) GetMsgID() int32

func (*BaseRequest) Set

func (br *BaseRequest) Set(key string, value interface{})

type BaseRouter

type BaseRouter struct{}

func (*BaseRouter) Handle

func (br *BaseRouter) Handle(task STask) error

Handle is BaseRouter's implementation of SRouter.Handle, the method for processing conn business.

func (*BaseRouter) PostHandle

func (br *BaseRouter) PostHandle(task STask)

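PostHandle is BaseRouter's implementation of SRouter.PostHandle, the hook method after processing conn business.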

func (*BaseRouter) PreHandle

func (br *BaseRouter) PreHandle(task STask)

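PreHandle is BaseRouter's implementation of SRouter.PreHandle, the hook method before processing conn business.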

type ConnManager

type ConnManager struct {
	Connections utils.ShardLockMaps
}

func NewConnManager

func NewConnManager() *ConnManager

func (*ConnManager) Add

func (connMgr *ConnManager) Add(conn SConnection)

func (*ConnManager) ClearConn

func (connMgr *ConnManager) ClearConn()

func (*ConnManager) Get

func (connMgr *ConnManager) Get(connID uint64) (SConnection, error)

func (*ConnManager) Get2

func (connMgr *ConnManager) Get2(strConnId string) (SConnection, error)

Get2 gets a connection by its string connection ID. It is recommended to use this method to obtain connection instances.

func (*ConnManager) GetAllConnID

func (connMgr *ConnManager) GetAllConnID() []uint64

func (*ConnManager) GetAllConnIdStr

func (connMgr *ConnManager) GetAllConnIdStr() []string

func (*ConnManager) Len

func (connMgr *ConnManager) Len() int

func (*ConnManager) Range

func (connMgr *ConnManager) Range(cb func(uint64, SConnection, interface{}) error, args interface{}) (err error)

func (*ConnManager) Range2

func (connMgr *ConnManager) Range2(cb func(string, SConnection, interface{}) error, args interface{}) (err error)

Range2 traverses all connections, passing each connection's string ID to cb. It is recommended to use this method rather than Range.

func (*ConnManager) Remove

func (connMgr *ConnManager) Remove(conn SConnection)

type Connection

type Connection struct {
	Conn net.Conn
	// ConnID is the ID of the current connection, also known as the SessionID.
	// It is globally unique and is used by the server-side Connection.
	// uint64 range: 0 ~ 18,446,744,073,709,551,615, which is the maximum
	// number of connection IDs theoretically supported by the process.
	ConnID uint64
	// ConnIdStr is the connection ID as a string.
	ConnIdStr string
	// ConnVersion is the connection version.
	ConnVersion int32
	// TaskHandler is the message management module that manages MsgID and the
	// corresponding processing method.
	TaskHandler STaskHandler
	// OnConnStart is the hook function for when the current connection is created.
	OnConnStart func(conn SConnection)
	// OnConnStop is the hook function for when the current connection is disconnected.
	OnConnStop func(conn SConnection)

	// FrameDecoder is the decoder for splitting or splicing data packets.
	FrameDecoder SFrameDecoder

	Datapack SDataPack

	// Property holds the connection attributes.
	Property map[string]string

	IOReadBuffSize uint32
	// contains filtered or unexported fields
}

func (*Connection) Context

func (bc *Connection) Context() context.Context

func (*Connection) GetConnID

func (bc *Connection) GetConnID() uint64

func (*Connection) GetConnIdStr

func (bc *Connection) GetConnIdStr() string

func (*Connection) GetConnVersion

func (bc *Connection) GetConnVersion() int32

func (*Connection) GetProperty

func (bc *Connection) GetProperty(key string) (string, error)

func (*Connection) GetTaskHandler

func (bc *Connection) GetTaskHandler() STaskHandler

func (*Connection) HasFrameDecoder

func (bc *Connection) HasFrameDecoder() bool

func (*Connection) IsAlive

func (bc *Connection) IsAlive() bool

func (*Connection) LocalAddr

func (bc *Connection) LocalAddr() net.Addr

func (*Connection) LocalAddrString

func (bc *Connection) LocalAddrString() string

func (*Connection) RemoteAddr

func (bc *Connection) RemoteAddr() net.Addr

func (*Connection) RemoteAddrString

func (bc *Connection) RemoteAddrString() string

func (*Connection) RemoveProperty

func (bc *Connection) RemoveProperty(key string)

func (*Connection) SendData

func (bc *Connection) SendData(data []byte) error

func (*Connection) SendMsg

func (bc *Connection) SendMsg(msg SMsg) error

func (*Connection) SetHeartBeat

func (bc *Connection) SetHeartBeat(checker SHeartbeatChecker)

func (*Connection) SetProperty

func (bc *Connection) SetProperty(key string, value string)

func (*Connection) Start

func (bc *Connection) Start()

Start starts the connection and makes the current connection start working.

func (*Connection) StartReader

func (bc *Connection) StartReader()

func (*Connection) Stop

func (bc *Connection) Stop()

type HandleStep

type HandleStep int
const (
	PRE_HANDLE  HandleStep = iota // PreHandle for pre-processing
	HANDLE                        // Handle for processing
	POST_HANDLE                   // PostHandle for post-processing

	// NOTE(review): presumably marks that all handling steps have finished — confirm against the implementation.
	HANDLE_OVER
)

type HeartBeatFunc

type HeartBeatFunc func(connection SConnection) error

HeartBeatFunc User-defined heartbeat function (用户自定义心跳函数)

type HeartBeatMsgFunc

type HeartBeatMsgFunc func(connection SConnection) SMsg

HeartBeatMsgFunc is a user-defined function that produces the heartbeat detection message (an SMsg) for a connection. (用户自定义的心跳检测消息处理方法)

type HeartBeatOption

type HeartBeatOption struct {
	MakeMsg          HeartBeatMsgFunc // User-defined function that produces the heartbeat detection message
	OnRemoteNotAlive OnRemoteNotAlive // User-defined method for handling remote connections that are not alive
	HeartBeatMsgID   uint32           // User-defined ID for heartbeat detection messages
	Router           SRouter          // User-defined business processing route for heartbeat detection messages
}

type MsgData

type MsgData struct {
	MsgID uint32
	// contains filtered or unexported fields
}

func GetMsgData

func GetMsgData(msgId uint32, data []byte) *MsgData

func (*MsgData) Reset

func (md *MsgData) Reset(msgId uint32, data []byte)

type NSQMsg

type NSQMsg struct {
	//*Header
	//PkgLen        uint32  // not needed: NSQ has no packet-sticking problem
	Cmd           uint16
	Ret           uint16
	Version       uint8
	SerializeType smsg.SerializeType
	CompressType  smsg.CompressType
	MessageType   smsg.MessageType
	Seq           uint64
	Metadata      map[string]string
	Data          []byte
	// contains filtered or unexported fields
}

func NewNSQMsg

func NewNSQMsg(Cmd uint16, ret uint16, sType smsg.SerializeType, md map[string]string, data []byte) *NSQMsg

func (*NSQMsg) GetCmd

func (m *NSQMsg) GetCmd() uint16

func (*NSQMsg) GetCompressType

func (m *NSQMsg) GetCompressType() smsg.CompressType

func (*NSQMsg) GetData

func (m *NSQMsg) GetData() []byte

func (*NSQMsg) GetHasFrameDecoder

func (m *NSQMsg) GetHasFrameDecoder() bool

func (*NSQMsg) GetMessageType

func (m *NSQMsg) GetMessageType() smsg.MessageType

func (*NSQMsg) GetMeta

func (m *NSQMsg) GetMeta() map[string]string

func (*NSQMsg) GetMsgId

func (m *NSQMsg) GetMsgId() int32

func (*NSQMsg) GetNsqMessage

func (m *NSQMsg) GetNsqMessage() *nsq.Message

func (*NSQMsg) GetRet

func (m *NSQMsg) GetRet() uint16

func (*NSQMsg) GetSeq

func (m *NSQMsg) GetSeq() uint64

func (*NSQMsg) GetSerializeType

func (m *NSQMsg) GetSerializeType() smsg.SerializeType

func (*NSQMsg) GetVersion

func (m *NSQMsg) GetVersion() uint8

func (*NSQMsg) SetHasFrameDecoder

func (m *NSQMsg) SetHasFrameDecoder(hasFrameDevoder bool)

func (*NSQMsg) SetNsqMessage

func (m *NSQMsg) SetNsqMessage(message *nsq.Message)

type Nsq

type Nsq struct {
	Consumers []*NsqConsumer

	// NsqDataBuffChan is a buffered channel used for message communication
	// between the read and write goroutines.
	NsqDataBuffChan   chan *NsqData
	MaxNsqDataChanLen uint32

	Apis map[int32]SRouter
	// contains filtered or unexported fields
}

func NewNsq

func NewNsq(workPoolSize, maxTaskQueueLen, maxNsqDataChanLen uint32, channel, nsqLookupAddr string, concurrency, maxInFlight int, nsqdList []string) *Nsq

func NewNsqByConf

func NewNsqByConf(nsq2 sconfig.Nsq, dataPack SDataPack) (*Nsq, error)

func (*Nsq) AddRouter

func (n *Nsq) AddRouter(topic string, msgID int32, router SRouter)

func (*Nsq) HandleMessage

func (n *Nsq) HandleMessage(message *nsq.Message) error

func (*Nsq) SendToMsgBuffChan

func (n *Nsq) SendToMsgBuffChan(topic string, data []byte) error

func (*Nsq) Start

func (n *Nsq) Start()

func (*Nsq) StartWriter

func (n *Nsq) StartWriter(p *NsqProducer)

func (*Nsq) Stop

func (n *Nsq) Stop()

Stop stops the connection and ends the current connection state. (停止连接,结束当前连接状态)

type NsqConsumer

type NsqConsumer struct {
	// contains filtered or unexported fields
}

func NewNsqConsumer

func NewNsqConsumer(topic, channel, nsqLookupAddr string, concurrency, maxInFlight int) (*NsqConsumer, error)

func (*NsqConsumer) StartReader

func (c *NsqConsumer) StartReader(handler nsq.Handler) error

func (*NsqConsumer) Stop

func (c *NsqConsumer) Stop()

type NsqData

type NsqData struct {
	Topic string
	// contains filtered or unexported fields
}

func GetNsqData

func GetNsqData(topic string, data []byte) *NsqData

func (*NsqData) Reset

func (nd *NsqData) Reset(topic string, data []byte)

type NsqDataPack

type NsqDataPack struct {
}

func (*NsqDataPack) GetHeadLen

func (dp *NsqDataPack) GetHeadLen() uint32

func (*NsqDataPack) Pack

func (dp *NsqDataPack) Pack(msg SMsg) ([]byte, error)

Pack packs the message (compresses the data) (封包方法,压缩数据)

func (*NsqDataPack) Unpack

func (dp *NsqDataPack) Unpack(binaryData []byte) (SMsg, error)

Unpack unpacks the message (decompresses the data) (拆包方法,解压数据)

type NsqProducer

type NsqProducer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(nsqdAddr string) (*NsqProducer, error)

func (*NsqProducer) PublishDirect

func (p *NsqProducer) PublishDirect(topic string, data []byte) error

type OnRemoteNotAlive

type OnRemoteNotAlive func(connection SConnection)

OnRemoteNotAlive is a user-defined method for handling remote connections that are not alive. (用户自定义的远程连接不存活时的处理方法)

type QuicConn

type QuicConn struct {
	// contains filtered or unexported fields
}

func (*QuicConn) Close

func (c *QuicConn) Close() error

Close closes the connection.

func (*QuicConn) LocalAddr

func (c *QuicConn) LocalAddr() net.Addr

LocalAddr returns the local network address.

func (*QuicConn) Read

func (c *QuicConn) Read(b []byte) (int, error)

Read implements the Conn Read method.

func (*QuicConn) RemoteAddr

func (c *QuicConn) RemoteAddr() net.Addr

RemoteAddr returns the remote network address.

func (*QuicConn) SetDeadline

func (c *QuicConn) SetDeadline(t time.Time) error

SetDeadline sets the deadline associated with the connection. A zero time value disables the deadline.

func (*QuicConn) SetReadDeadline

func (c *QuicConn) SetReadDeadline(t time.Time) error

SetReadDeadline implements the Conn SetReadDeadline method.

func (*QuicConn) SetWriteDeadline

func (c *QuicConn) SetWriteDeadline(t time.Time) error

SetWriteDeadline implements the Conn SetWriteDeadline method.

func (*QuicConn) Write

func (c *QuicConn) Write(b []byte) (int, error)

Write implements the Conn Write method.

type QuicListener

type QuicListener struct {
	// contains filtered or unexported fields
}

func NewQuicListener

func NewQuicListener(c net.PacketConn, tlsConf *tls.Config, quicConfig *quic.Config) (*QuicListener, error)

func (*QuicListener) Accept

func (q *QuicListener) Accept() (net.Conn, error)

func (*QuicListener) AcceptContext

func (q *QuicListener) AcceptContext(ctx context.Context) (net.Conn, error)

AcceptContext waits for and returns the next connection to the listener.

func (*QuicListener) Addr

func (q *QuicListener) Addr() net.Addr

Addr returns the listener's network address.

func (*QuicListener) Close

func (q *QuicListener) Close() error

Close closes the listener. Any blocked Accept operations will be unblocked and return errors.

type SConnManager

type SConnManager interface {
	Add(SConnection)                                                        // Add connection
	Remove(SConnection)                                                     // Remove connection
	Get(uint64) (SConnection, error)                                        // Get a connection by ConnID
	Get2(string) (SConnection, error)                                       // Get a connection by string ConnID
	Len() int                                                               // Get current number of connections
	ClearConn()                                                             // Remove and stop all connections
	GetAllConnID() []uint64                                                 // Get all connection IDs
	GetAllConnIdStr() []string                                              // Get all string connection IDs
	Range(func(uint64, SConnection, interface{}) error, interface{}) error  // Traverse all connections
	Range2(func(string, SConnection, interface{}) error, interface{}) error // Traverse all connections 2
}

type SConnection

type SConnection interface {
	// Start the connection, make the current connection start working
	Start()
	// Stop the connection and end the current connection state
	Stop()

	// Returns ctx, used by user-defined goroutines to obtain connection exit status
	Context() context.Context

	GetConnID() uint64            // Get the current connection ID
	GetConnIdStr() string         // Get the current connection ID as a string
	GetTaskHandler() STaskHandler // Get the message handler
	RemoteAddr() net.Addr         // Get the remote address information of the connection
	LocalAddr() net.Addr          // Get the local address information of the connection
	LocalAddrString() string      // Get the local address information of the connection as a string
	RemoteAddrString() string     // Get the remote address information of the connection as a string
	GetConnVersion() int32

	SendData(data []byte) error // Send data to the message queue to be sent to the remote TCP client later
	SendMsg(msg SMsg) error

	SetProperty(key string, value string)   // Set connection property
	GetProperty(key string) (string, error) // Get connection property
	RemoveProperty(key string)              // Remove connection property
	IsAlive() bool                          // Check if the current connection is alive
	SetHeartBeat(checker SHeartbeatChecker) // Set the heartbeat detector

	// Reports whether the current connection has a FrameDecoder
	HasFrameDecoder() bool
}

func NewConnection

func NewConnection(conn net.Conn, connId uint64, connVersion int32, taskHandler STaskHandler, OnConnStart, OnConnStop func(conn SConnection), frameDecoder SFrameDecoder, datapack SDataPack, connManager SConnManager, IOReadBuffSize uint32, heartbeatDuration time.Duration) SConnection

func NewWsConnection

func NewWsConnection(cID uint64, taskHandler STaskHandler, conn *websocket.Conn, onConnStart, onConnStop func(conn SConnection), datapack SDataPack) SConnection

type SDataPack

type SDataPack interface {
	GetHeadLen() uint32
	Pack(msg SMsg) ([]byte, error)
	Unpack(binaryData []byte) (SMsg, error)
}

func NewNsqDataPack

func NewNsqDataPack() SDataPack

type SFrameDecoder

type SFrameDecoder interface {
	Decode(buff []byte) ([][]byte, error)
}

type SFuncTask

type SFuncTask interface {
	CallFunc()
}

type SHeartbeatChecker

type SHeartbeatChecker interface {
	SetOnRemoteNotAlive(OnRemoteNotAlive)
	SetHeartbeatMsgFunc(HeartBeatMsgFunc)
	//SetHeartbeatFunc(HeartBeatFunc)
	//BindRouter(uint32, SRouter)
	Start()
	Stop()
	SendHeartBeatMsg() error
	BindConn(connection SConnection)
	//Clone() SHeartbeatChecker
	Cmd() uint16
}

type SMsg

type SMsg interface {
	GetMsgId() int32
	GetCmd() uint16 // Gets the ID of the message
	GetRet() uint16
	GetVersion() uint8
	GetSerializeType() smsg.SerializeType
	GetCompressType() smsg.CompressType
	GetMessageType() smsg.MessageType
	GetSeq() uint64
	GetMeta() map[string]string
	GetData() []byte // Gets the content of the message
	//
	SetNsqMessage(message *nsq.Message)
	GetNsqMessage() *nsq.Message
	//
	GetHasFrameDecoder() bool
	SetHasFrameDecoder(hasFrameDevoder bool)
}

type SRouter

type SRouter interface {
	PreHandle(task STask)    // Hook method before processing conn business
	Handle(task STask) error // Method for processing conn business
	PostHandle(task STask)   // Hook method after processing conn business
}

type STask

type STask interface {
	GetConnection() SConnection // Get the connection information of the request

	GetData() []byte // Get the data of the request message
	GetMsgID() int32 // Get the message ID of the request
	GetCmd() uint16
	GetMessage() SMsg // Get the raw data of the request message (add by uuxia 2023-03-10)

	BindRouter(router SRouter) // Bind which router handles this request
	// Move on to the next handler to start execution, but the function that calls this method will execute in reverse order of their order
	Call() error

	// Terminate the execution of the processing function, but the function that calls this method will be executed until completion
	Abort()

	// Set stores a context value in the Request
	Set(key string, value interface{})
	// Get retrieves a context value from the Request
	Get(key string) (value interface{}, exists bool)
}

func GetTask

func GetTask(conn SConnection, msg SMsg) STask

type STaskHandler

type STaskHandler interface {
	AddRouter(msgID int32, router SRouter)
	StartWorkerPool()               //  Start the worker pool
	SendTaskToTaskQueue(task STask) // Pass the message to the TaskQueue for processing by the worker
	Stop()
}

func NewTaskHandler

func NewTaskHandler(workPoolSize, maxTaskQueueLen uint32) STaskHandler

type Task

type Task struct {
	BaseRequest
	// contains filtered or unexported fields
}

func (*Task) BindRouter

func (r *Task) BindRouter(router SRouter)

func (*Task) Call

func (r *Task) Call() error

func (*Task) GetCmd

func (r *Task) GetCmd() uint16

func (*Task) GetConnection

func (r *Task) GetConnection() SConnection

func (*Task) GetData

func (r *Task) GetData() []byte

func (*Task) GetMessage

func (r *Task) GetMessage() SMsg

func (*Task) GetMsgID

func (r *Task) GetMsgID() int32

func (*Task) Reset

func (r *Task) Reset(conn SConnection, msg SMsg)

type TaskHandler

type TaskHandler struct {
	Apis map[int32]SRouter
	// WorkerPoolSize is the number of worker goroutines in the business
	// worker pool.
	WorkerPoolSize uint32
	// TaskQueue is the message queue from which workers take tasks.
	TaskQueue chan STask
	// contains filtered or unexported fields
}

func (*TaskHandler) AddRouter

func (mh *TaskHandler) AddRouter(msgID int32, router SRouter)

func (*TaskHandler) SendTaskToTaskQueue

func (mh *TaskHandler) SendTaskToTaskQueue(task STask)

SendTaskToTaskQueue sends the message to the TaskQueue for processing by the worker (将消息交给TaskQueue,由worker进行处理)

func (*TaskHandler) StartOneWorker

func (mh *TaskHandler) StartOneWorker(workerID int, taskQueue chan STask)

StartOneWorker starts a worker workflow (启动一个Worker工作流程)

func (*TaskHandler) StartWorkerPool

func (mh *TaskHandler) StartWorkerPool()

StartWorkerPool starts the worker pool

func (*TaskHandler) Stop

func (mh *TaskHandler) Stop()

type TopicEnum

type TopicEnum int32

func (TopicEnum) String

func (t TopicEnum) String() string

type WsConnection

type WsConnection struct {
	Connection
	// contains filtered or unexported fields
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL