ws

package
v2.0.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 19, 2023 License: Apache-2.0 Imports: 14 Imported by: 4

Documentation

Index

Constants

View Source
// WebSocket close status codes.
//
// The names and the values 1000-1015 line up with the close codes listed in
// RFC 6455 section 7.4.1 and the IANA WebSocket Close Code Number Registry.
// UnauthorizedCode (3000) lies outside that block.
// NOTE(review): where and how this package sends each code is not visible
// from these declarations — confirm against the Connection implementation.
const (
	NormalClosureCode           = 1000
	GoingAwayCode               = 1001
	ProtocolErrorCode           = 1002
	UnsupportedDataCode         = 1003
	ReservedCode                = 1004
	NoStatusRcvCode             = 1005
	AbnormalClosureCode         = 1006
	InvalidFramePayloadDataCode = 1007
	PolicyViolationCode         = 1008
	MessageTooBigCode           = 1009
	MandatoryExtCode            = 1010
	InternalErrorCode           = 1011
	ServiceRestartCode          = 1012
	TryAgainLaterCode           = 1013
	ProxyInvalidResponseCode    = 1014
	TLSHandshakeCode            = 1015
	UnauthorizedCode            = 3000
)
View Source
// Action name strings (Action*) followed by push target types (PushType*).
// The Action* values are untyped string constants; the PushType* values are
// untyped integer constants.
const (
	ActionPost   = "post"
	ActionGet    = "get"
	ActionDelete = "delete"
	ActionUpdate = "update"
	ActionJoin   = "join"
	ActionLeave  = "leave"
	PushTypeUser = 0 // push to a single user (the original comment said "push to room", an apparent copy-paste slip from the next line)
	PushTypeRoom = 1 // push to a room
	PushTypeAll  = 2 // push to everyone online
)

推送类型

Variables

View Source
// Package-level error values, each created with gerror.New and carrying an
// upper-case "ERR_..." code string as its message.
// NOTE(review): the conditions under which each error is returned are not
// visible from these declarations; the names suggest connection loss, full
// send/dispatch/merge queues, and room membership problems — confirm against
// the callers before relying on that.
var (
	ErrConnectionLoss = gerror.New("ERR_CONNECTION_LOSS")

	ErrSendMessageFull = gerror.New("ERR_SEND_MESSAGE_FULL")

	ErrJoinRoomTwice = gerror.New("ERR_JOIN_ROOM_TWICE")

	ErrNotInRoom = gerror.New("ERR_NOT_IN_ROOM")

	ErrRoomIdInvalid = gerror.New("ERR_ROOM_ID_INVALID")

	ErrDispatchChannelFull = gerror.New("ERR_DISPATCH_CHANNEL_FULL")

	ErrMergeChannelFull = gerror.New("ERR_MERGE_CHANNEL_FULL")
)

Functions

This section is empty.

Types

type ActDelete

// ActDelete is the single-method interface for a Delete handler. Delete
// receives a context and the incoming *MessageReq and returns a *MessageRes
// or an error.
// NOTE(review): presumably invoked for requests whose action is "delete"
// (ActionDelete) — confirm in the dispatch code.
type ActDelete interface {
	Delete(ctx context.Context, message *MessageReq) (*MessageRes, error)
}

type ActGet

// ActGet is the single-method interface for a Get handler. Get receives a
// context and the incoming *MessageReq and returns a *MessageRes or an error.
// NOTE(review): presumably invoked for requests whose action is "get"
// (ActionGet) — confirm in the dispatch code.
type ActGet interface {
	Get(ctx context.Context, message *MessageReq) (*MessageRes, error)
}

type ActJoin

// ActJoin is the single-method interface for a Join handler. Join receives a
// context and the incoming *MessageReq and returns a *MessageRes or an error.
// NOTE(review): presumably invoked for requests whose action is "join"
// (ActionJoin) — confirm in the dispatch code.
type ActJoin interface {
	Join(ctx context.Context, message *MessageReq) (*MessageRes, error)
}

type ActLeave

// ActLeave is the single-method interface for a Leave handler. Leave receives
// a context and the incoming *MessageReq and returns a *MessageRes or an
// error.
// NOTE(review): presumably invoked for requests whose action is "leave"
// (ActionLeave) — confirm in the dispatch code.
type ActLeave interface {
	Leave(ctx context.Context, message *MessageReq) (*MessageRes, error)
}

type ActPost

// ActPost is the single-method interface for a Post handler. Post receives a
// context and the incoming *MessageReq and returns a *MessageRes or an error.
// NOTE(review): presumably invoked for requests whose action is "post"
// (ActionPost) — confirm in the dispatch code.
type ActPost interface {
	Post(ctx context.Context, message *MessageReq) (*MessageRes, error)
}

type ActUpdate

// ActUpdate is the single-method interface for an Update handler. Update
// receives a context and the incoming *MessageReq and returns a *MessageRes
// or an error.
// NOTE(review): presumably invoked for requests whose action is "update"
// (ActionUpdate) — confirm in the dispatch code.
type ActUpdate interface {
	Update(ctx context.Context, message *MessageReq) (*MessageRes, error)
}

type Actions

// Actions is the union of all six action interfaces. A type satisfies Actions
// only if it implements Post, Delete, Update, Get, Join and Leave, each with
// the (ctx, *MessageReq) (*MessageRes, error) signature.
type Actions interface {
	ActPost
	ActDelete
	ActUpdate
	ActGet
	ActJoin
	ActLeave
}

type BizMessage

// BizMessage is the fixed envelope for business messages: a type plus a data
// payload.
type BizMessage struct {
	Type string          `json:"type"` // message type: PING, PONG, JOIN, LEAVE, PUSH
	Data json.RawMessage `json:"data"` // data field, held as raw (undecoded) JSON
}

BizMessage 业务消息的固定格式(type+data)

type BizPushData

// BizPushData is the data payload associated with PUSH messages (per the
// package documentation).
type BizPushData struct {
	// Items holds the pushed entries as pointers to raw (undecoded) JSON.
	Items []*json.RawMessage `json:"items"`
}

BizPushData PUSH

type Bucket

type Bucket struct {
	// contains filtered or unexported fields
}

func InitBucket

func InitBucket(bucketIdx int) (bucket *Bucket)

func (*Bucket) AddConn

func (bucket *Bucket) AddConn(wsConn *Connection)

func (*Bucket) DelConn

func (bucket *Bucket) DelConn(wsConn *Connection)

func (*Bucket) DelConnByCID

func (bucket *Bucket) DelConnByCID(ctx context.Context, connId uint64)

func (*Bucket) JoinRoom

func (bucket *Bucket) JoinRoom(roomId string, wsConn *Connection) (err error)

func (*Bucket) LeaveRoom

func (bucket *Bucket) LeaveRoom(roomId string, wsConn *Connection) (err error)

func (*Bucket) PushAll

func (bucket *Bucket) PushAll(wsMsg *Message)

PushAll 推送给Bucket内所有用户

func (*Bucket) PushByCid

func (bucket *Bucket) PushByCid(connId uint64, wsMsg *Message) (err error)

PushByCid 根据CID 给用户推送消息

type Config

// Config holds the tuning parameters of the WebSocket server: handshake
// timeouts, per-connection queue sizes, heartbeat interval, and the sizing of
// the bucket, dispatch and merger worker pools. Every field can be loaded from
// JSON or YAML under the camelCase key given in its tags.
type Config struct {
	// Read timeout for the WebSocket HTTP handshake, in milliseconds.
	ReadTimeout int `json:"readTimeout" yaml:"readTimeout"`
	// Write timeout for the WebSocket HTTP handshake, in milliseconds.
	WriteTimeout int `json:"writeTimeout" yaml:"writeTimeout"`
	// Length of the WebSocket read queue; usually does not need to be changed.
	InChannelSize int `json:"inChannelSize" yaml:"inChannelSize"`
	// Length of the WebSocket write queue; usually does not need to be changed.
	OutChannelSize int `json:"outChannelSize" yaml:"outChannelSize"`
	// WebSocket heartbeat check interval, in seconds. If no heartbeat is
	// received within this time, the server actively closes the connection.
	HeartbeatInterval int `json:"heartbeatInterval" yaml:"heartbeatInterval"`
	// Number of connection buckets. More buckets mean finer lock granularity
	// for pushes and therefore higher push concurrency.
	BucketCount int `json:"bucketCount" yaml:"bucketCount"`
	// Number of worker goroutines per bucket; affects how many different
	// messages can be dispatched at the same moment.
	BucketWorkerCount int `json:"bucketWorkerCount" yaml:"bucketWorkerCount"`
	// Length of the bucket job queue; each bucket keeps its dispatch jobs in
	// its own independent queue.
	BucketJobChannelSize int `json:"bucketJobChannelSize" yaml:"bucketJobChannelSize"`
	// Number of sender goroutines per bucket; each bucket pushes messages
	// concurrently using several goroutines.
	BucketJobWorkerCount int `json:"bucketJobWorkerCount" yaml:"bucketJobWorkerCount"`
	// Length of the pending-dispatch queue, which buffers all messages waiting
	// to be pushed until they are dispatched to the buckets.
	DispatchChannelSize int `json:"dispatchChannelSize" yaml:"dispatchChannelSize"`
	// Number of dispatch goroutines, which fan pending messages out to the
	// individual buckets.
	DispatchWorkerCount int `json:"dispatchWorkerCount" yaml:"dispatchWorkerCount"`
	// Maximum delay for a merged push, in milliseconds. If this elapses before
	// the batch reaches maxPushBatchSize, the batch is sent anyway.
	// NOTE(review): "maxPushBatchSize" presumably refers to MaxMergerBatchSize
	// below — confirm.
	MaxMergerDelay int `json:"maxMergerDelay" yaml:"maxMergerDelay"`
	// Maximum number of messages per merged batch. The higher the push
	// frequency, the larger the batch should be to gain throughput.
	MaxMergerBatchSize int `json:"maxMergerBatchSize" yaml:"maxMergerBatchSize"`
	// Number of message-merger goroutines. Merging and JSON encoding consume
	// CPU; note that messages for one room are always handled by the same
	// goroutine.
	MergerWorkerCount int `json:"mergerWorkerCount" yaml:"mergerWorkerCount"`
	// Capacity of the merger queue. Each room-message merger worker has one
	// queue; pushes that exceed the queue capacity are dropped.
	MergerChannelSize int `json:"mergerChannelSize" yaml:"mergerChannelSize"`
	// Join limit relating rooms and connections.
	// NOTE(review): the original comment is ambiguous; given the field name it
	// presumably means the maximum number of rooms a single connection may
	// join — confirm against the JoinRoom implementation.
	MaxJoinRoom int `json:"maxJoinRoom" yaml:"maxJoinRoom"`
}

type ConnManager

type ConnManager struct {
	// contains filtered or unexported fields
}

ConnManager 连接管理器

func ConnMgr

func ConnMgr() *ConnManager

func NewConnMgr

func NewConnMgr(config *Config) *ConnManager

func (*ConnManager) AddConn

func (connMgr *ConnManager) AddConn(wsConnection *Connection)

func (*ConnManager) DelConn

func (connMgr *ConnManager) DelConn(wsConnection *Connection)

func (*ConnManager) DelConnByCID

func (connMgr *ConnManager) DelConnByCID(ctx context.Context, connId uint64)

func (*ConnManager) GetBucket

func (connMgr *ConnManager) GetBucket(wsConnection *Connection) (bucket *Bucket)

func (*ConnManager) GetBucketByCID

func (connMgr *ConnManager) GetBucketByCID(connId uint64) (bucket *Bucket)

func (*ConnManager) JoinRoom

func (connMgr *ConnManager) JoinRoom(roomId string, wsConn *Connection) (err error)

func (*ConnManager) LeaveRoom

func (connMgr *ConnManager) LeaveRoom(roomId string, wsConn *Connection) (err error)

func (*ConnManager) PushAll

func (connMgr *ConnManager) PushAll(msg *MessageReq) (err error)

PushAll 向所有在线用户发送消息

func (*ConnManager) PushByCID

func (connMgr *ConnManager) PushByCID(connId uint64, message *MessageReq) (err error)

PushByCID 向指定用户发送消息

func (*ConnManager) PushRoom

func (connMgr *ConnManager) PushRoom(roomId string, items *MessageReq) (err error)

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

func NewConnection

func NewConnection(
	ctx context.Context, connId uint64,
	wsSocket *ghttp.WebSocket,
	heartbeat, inChannelSize, outChannelSize int,
) (c *Connection)

func (*Connection) Close

func (conn *Connection) Close(context.Context)

Close 关闭连接

func (*Connection) Handle

func (conn *Connection) Handle(ctx context.Context, h Service, req *MessageReq) (res *MessageRes, err error)

func (*Connection) IsAlive

func (conn *Connection) IsAlive() bool

IsAlive 检查心跳(不需要太频繁)

func (*Connection) KeepAlive

func (conn *Connection) KeepAlive()

KeepAlive 更新心跳

func (*Connection) ReadMessage

func (conn *Connection) ReadMessage() (message *Message, err error)

ReadMessage 读取消息

func (*Connection) RouterHandle

func (conn *Connection) RouterHandle(ctx context.Context, ins *Instance, routers *map[string]Service)

RouterHandle 处理websocket请求

func (*Connection) SendMessage

func (conn *Connection) SendMessage(message *Message) (err error)

SendMessage 发送消息

type Instance

// Instance groups the parts of one socket server: a server ID, the connection
// manager, the merger, the configuration and the statistics counters. All
// fields are pointers. Note that the JSON keys (connMgr, merger, config,
// stats) differ from the Go field names.
type Instance struct {
	ServerId    *uint64      `json:"serverId"`
	ConnManager *ConnManager `json:"connMgr"`
	Merge       *Merger      `json:"merger"`
	Cfg         *Config      `json:"config"`
	Stat        *Statistics  `json:"stats"`
}

func NewSocket

func NewSocket(id *uint64, cfg *Config) *Instance

func (*Instance) CloseSession

func (i *Instance) CloseSession(ctx context.Context, connId uint64) error

func (*Instance) Connection

func (i *Instance) Connection(ctx context.Context, wsSocket *ghttp.WebSocket, routers *map[string]Service) *Instance

func (*Instance) GetInstance

func (i *Instance) GetInstance() *Instance

func (*Instance) GetStats

func (i *Instance) GetStats() *Statistics

func (*Instance) PushAll

func (i *Instance) PushAll(msg *MessageReq) error

func (*Instance) PushRoom

func (i *Instance) PushRoom(roomId string, msg *MessageReq) error

func (*Instance) PushSession

func (i *Instance) PushSession(connId uint64, message *MessageReq) error

type JoinData

// JoinData is the data payload associated with JOIN messages (per the package
// documentation).
type JoinData struct {
	// Room identifies the room.
	// NOTE(review): presumably the roomId later passed to JoinRoom — confirm.
	Room string `json:"room"`
}

JoinData JOIN

type LeaveData

// LeaveData is the data payload associated with LEAVE messages (per the
// package documentation).
type LeaveData struct {
	// Room identifies the room.
	// NOTE(review): presumably the roomId later passed to LeaveRoom — confirm.
	Room string `json:"room"`
}

LeaveData LEAVE

type MergeWorker

type MergeWorker struct {
	// contains filtered or unexported fields
}

type Merger

type Merger struct {
	// contains filtered or unexported fields
}

Merger 广播消息、消息的合并

func NewMerger

func NewMerger(config *Config) *Merger

func (*Merger) PushAll

func (merger *Merger) PushAll(msg *json.RawMessage) (err error)

PushAll 广播合并推送

func (*Merger) PushByCid

func (merger *Merger) PushByCid(connId uint64, msg *json.RawMessage) (err error)

type Message

// Message is the package's WebSocket message object: an integer message type
// plus the raw payload bytes.
type Message struct {
	// MsgType is the message type.
	// NOTE(review): presumably the WebSocket frame type (text/binary/...) as
	// used by the underlying socket library — confirm.
	MsgType int
	// MsgData is the raw message payload.
	MsgData []byte
}

Message websocket的Message对象

func BuildWsMessage

func BuildWsMessage(msgType int, msgData []byte) (wsMessage *Message)

func EncodeWsMessage

func EncodeWsMessage(message *MessageReq) (wsMessage *Message, err error)

type MessageReq

// MessageReq is a request message with three JSON fields: service, action and
// an arbitrary data payload.
// NOTE(review): Service presumably selects an entry of the routers
// map[string]Service, and Action presumably takes one of the Action*
// constants ("post", "get", ...) — confirm in Connection.RouterHandle/Handle.
type MessageReq struct {
	Service string      `json:"service"`
	Action  string      `json:"action"`
	Data    interface{} `json:"data"`
}

func DecodeMessage

func DecodeMessage(buf []byte) (message *MessageReq, err error)

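DecodeMessage 解析{"type": "PING", "data": {...}}的包 — parses a raw packet into a *MessageReq. Note: the {"type", "data"} shape shown here matches BizMessage, whereas MessageReq is tagged service/action/data; confirm which wire format the function actually accepts.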

type MessageRes

// MessageRes is the response counterpart of MessageReq; it carries the same
// three JSON fields (service, action, data).
type MessageRes struct {
	Service string      `json:"service"`
	Action  string      `json:"action"`
	Data    interface{} `json:"data"`
}

type NullResp

// NullResp is an empty struct with no fields.
// NOTE(review): presumably used as a placeholder for responses that carry no
// data — confirm at the call sites.
type NullResp struct{}

type PingData

// PingData is the data payload associated with PING messages (per the package
// documentation); it has no fields.
type PingData struct{}

PingData PING

type PongData

// PongData is the data payload associated with PONG messages (per the package
// documentation). Fields are serialized under snake_case JSON keys.
type PongData struct {
	// NOTE(review): presumably the connection ID (connId) of the session — confirm.
	SessionId uint64 `json:"session_id"`
	// NOTE(review): presumably the heartbeat interval; the unit is not visible here.
	KeepAlive int    `json:"keep_alive"`
	// Date/time as a string; the format is not visible here.
	DateTime  string `json:"date_time"`
	// NOTE(review): presumably an expiry timestamp; unit and epoch are not visible here.
	Expire    int64  `json:"expire"`
}

PongData PONG

type PushBatch

type PushBatch struct {
	// contains filtered or unexported fields
}

type PushContext

type PushContext struct {
	// contains filtered or unexported fields
}

type PushJob

type PushJob struct {

	// union {
	Msg *MessageReq // 未序列化的业务消息
	// contains filtered or unexported fields
}

PushJob 推送任务

type Response

// Response is a generic reply envelope with a numeric code, a textual message
// and an arbitrary body.
// NOTE(review): the meaning of Code values (e.g. which value means success)
// is not visible here.
type Response struct {
	Code    int         `json:"code"`
	Message string      `json:"message"`
	Body    interface{} `json:"body"`
}

type Room

type Room struct {
	// contains filtered or unexported fields
}

Room 房间

func InitRoom

func InitRoom(roomId string) (room *Room)

func (*Room) Count

func (room *Room) Count() int

func (*Room) Join

func (room *Room) Join(wsConn *Connection) (err error)

func (*Room) Leave

func (room *Room) Leave(wsConn *Connection) (err error)

func (*Room) Push

func (room *Room) Push(wsMsg *Message)

type Service

// Service is the interface stored in the routers map (map[string]Service)
// passed to Instance.Connection and Connection.RouterHandle. It embeds only
// SrvAction, so any type with an Action() Actions method satisfies it.
type Service interface {
	SrvAction
}

type SrvAction

// SrvAction exposes a service's action handlers: Action returns the Actions
// implementation (Post/Delete/Update/Get/Join/Leave) for the service.
type SrvAction interface {
	Action() Actions
}

type SrvAfter

// SrvAfter is a single-method interface whose After method receives the
// context and the *MessageRes and may return an error.
// NOTE(review): presumably an optional hook run after an action has produced
// its response — it is not part of Service, so confirm how it is detected
// and called.
type SrvAfter interface {
	After(ctx context.Context, response *MessageRes) error
}

type SrvBefore

// SrvBefore is a single-method interface whose Before method receives the
// context and the incoming *MessageReq and may return an error.
// NOTE(review): presumably an optional hook run before the action is
// executed — it is not part of Service, so confirm how it is detected and
// called.
type SrvBefore interface {
	Before(ctx context.Context, message *MessageReq) error
}

type SrvHandler

// SrvHandler is a single-method interface whose Handle method maps a
// *MessageReq to a *MessageRes or an error.
// NOTE(review): presumably an optional catch-all alternative to the
// per-action methods — it is not part of Service, so confirm how it is used.
type SrvHandler interface {
	Handle(ctx context.Context, message *MessageReq) (*MessageRes, error)
}

type Statistics

// Statistics collects int64 counters and gauges describing the load on the
// connection, dispatch and merger parts of the server. Fields are serialized
// under snake_case JSON keys.
type Statistics struct {
	// Number of long-lived connections currently online.
	OnlineConnections int64 `json:"online_connections"`

	// Reflects the push pressure toward clients.
	SendMessageTotal int64 `json:"send_message_total"`
	SendMessageFail  int64 `json:"send_message_fail"`

	// Reflects the pressure on the ConnMgr message-dispatch module.
	// Number of failed pushes.
	PushFail        int64 `json:"push_fail"`
	DispatchPending int64 `json:"dispatch_pending"`
	PushJobPending  int64 `json:"push_job_pending"`
	// Total number of messages dispatched.
	DispatchTotal int64 `json:"dispatch_total"`
	// Number of messages dropped during dispatch.
	DispatchFail int64 `json:"dispatch_fail"`
	// Total number of rooms currently online; helps when analysing the cause
	// of memory growth.
	RoomCount int64 `json:"room_count"`

	// Merger module processing queue; reflects the message-merge pressure.
	MergerPending int64 `json:"merger_pending"`

	// Totals of messages merged and sent by the Merger module, and the
	// corresponding failure totals.
	MergerRoomTotal int64 `json:"merger_room_total"`
	MergerAllTotal  int64 `json:"merger_all_total"`
	MergerRoomFail  int64 `json:"merger_room_fail"`
	MergerAllFail   int64 `json:"merger_all_fail"`
}

func GetStats

func GetStats() *Statistics

func (*Statistics) DispatchFailIncr

func (s *Statistics) DispatchFailIncr()

func (*Statistics) DispatchFailNumIncr

func (s *Statistics) DispatchFailNumIncr(batchSize int64)

func (*Statistics) DispatchPendingDesc

func (s *Statistics) DispatchPendingDesc()

func (*Statistics) DispatchPendingIncr

func (s *Statistics) DispatchPendingIncr()

func (*Statistics) DispatchTotalIncr

func (s *Statistics) DispatchTotalIncr(batchSize int64)

func (*Statistics) Dump

func (s *Statistics) Dump() (data []byte, err error)

func (*Statistics) GetStats

func (s *Statistics) GetStats() *Statistics

func (*Statistics) MergerAllFailIncr

func (s *Statistics) MergerAllFailIncr(batchSize int64)

func (*Statistics) MergerAllTotalIncr

func (s *Statistics) MergerAllTotalIncr(batchSize int64)

func (*Statistics) MergerPendingDesc

func (s *Statistics) MergerPendingDesc()

func (*Statistics) MergerPendingIncr

func (s *Statistics) MergerPendingIncr()

func (*Statistics) OnlineConnectionsDesc

func (s *Statistics) OnlineConnectionsDesc()

func (*Statistics) OnlineConnectionsIncr

func (s *Statistics) OnlineConnectionsIncr()

func (*Statistics) PushJobPendingDesc

func (s *Statistics) PushJobPendingDesc()

func (*Statistics) PushJobPendingIncr

func (s *Statistics) PushJobPendingIncr()

func (*Statistics) RoomCountDesc

func (s *Statistics) RoomCountDesc()

func (*Statistics) RoomCountIncr

func (s *Statistics) RoomCountIncr()

func (*Statistics) SendMessageFailIncr

func (s *Statistics) SendMessageFailIncr()

func (*Statistics) SendMessageTotalIncr

func (s *Statistics) SendMessageTotalIncr()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL