Documentation ¶
Index ¶
- Constants
- Variables
- type ActDelete
- type ActGet
- type ActJoin
- type ActLeave
- type ActPost
- type ActUpdate
- type Actions
- type BizMessage
- type BizPushData
- type Bucket
- func (bucket *Bucket) AddConn(wsConn *Connection)
- func (bucket *Bucket) DelConn(wsConn *Connection)
- func (bucket *Bucket) DelConnByCID(ctx context.Context, connId uint64)
- func (bucket *Bucket) JoinRoom(roomId string, wsConn *Connection) (err error)
- func (bucket *Bucket) LeaveRoom(roomId string, wsConn *Connection) (err error)
- func (bucket *Bucket) PushAll(wsMsg *Message)
- func (bucket *Bucket) PushByCid(connId uint64, wsMsg *Message) (err error)
- type Config
- type ConnManager
- func (connMgr *ConnManager) AddConn(wsConnection *Connection)
- func (connMgr *ConnManager) DelConn(wsConnection *Connection)
- func (connMgr *ConnManager) DelConnByCID(ctx context.Context, connId uint64)
- func (connMgr *ConnManager) GetBucket(wsConnection *Connection) (bucket *Bucket)
- func (connMgr *ConnManager) GetBucketByCID(connId uint64) (bucket *Bucket)
- func (connMgr *ConnManager) JoinRoom(roomId string, wsConn *Connection) (err error)
- func (connMgr *ConnManager) LeaveRoom(roomId string, wsConn *Connection) (err error)
- func (connMgr *ConnManager) PushAll(msg *MessageReq) (err error)
- func (connMgr *ConnManager) PushByCID(connId uint64, message *MessageReq) (err error)
- func (connMgr *ConnManager) PushRoom(roomId string, items *MessageReq) (err error)
- type Connection
- func (conn *Connection) Close(context.Context)
- func (conn *Connection) Handle(ctx context.Context, h Service, req *MessageReq) (res *MessageRes, err error)
- func (conn *Connection) IsAlive() bool
- func (conn *Connection) KeepAlive()
- func (conn *Connection) ReadMessage() (message *Message, err error)
- func (conn *Connection) RouterHandle(ctx context.Context, ins *Instance, routers *map[string]Service)
- func (conn *Connection) SendMessage(message *Message) (err error)
- type Instance
- func (i *Instance) CloseSession(ctx context.Context, connId uint64) error
- func (i *Instance) Connection(ctx context.Context, wsSocket *ghttp.WebSocket, routers *map[string]Service) *Instance
- func (i *Instance) GetInstance() *Instance
- func (i *Instance) GetStats() *Statistics
- func (i *Instance) PushAll(msg *MessageReq) error
- func (i *Instance) PushRoom(roomId string, msg *MessageReq) error
- func (i *Instance) PushSession(connId uint64, message *MessageReq) error
- type JoinData
- type LeaveData
- type MergeWorker
- type Merger
- type Message
- type MessageReq
- type MessageRes
- type NullResp
- type PingData
- type PongData
- type PushBatch
- type PushContext
- type PushJob
- type Response
- type Room
- type Service
- type SrvAction
- type SrvAfter
- type SrvBefore
- type SrvHandler
- type Statistics
- func (s *Statistics) DispatchFailIncr()
- func (s *Statistics) DispatchFailNumIncr(batchSize int64)
- func (s *Statistics) DispatchPendingDesc()
- func (s *Statistics) DispatchPendingIncr()
- func (s *Statistics) DispatchTotalIncr(batchSize int64)
- func (s *Statistics) Dump() (data []byte, err error)
- func (s *Statistics) GetStats() *Statistics
- func (s *Statistics) MergerAllFailIncr(batchSize int64)
- func (s *Statistics) MergerAllTotalIncr(batchSize int64)
- func (s *Statistics) MergerPendingDesc()
- func (s *Statistics) MergerPendingIncr()
- func (s *Statistics) OnlineConnectionsDesc()
- func (s *Statistics) OnlineConnectionsIncr()
- func (s *Statistics) PushJobPendingDesc()
- func (s *Statistics) PushJobPendingIncr()
- func (s *Statistics) RoomCountDesc()
- func (s *Statistics) RoomCountIncr()
- func (s *Statistics) SendMessageFailIncr()
- func (s *Statistics) SendMessageTotalIncr()
Constants ¶
View Source
const ( NormalClosureCode = 1000 GoingAwayCode = 1001 ProtocolErrorCode = 1002 UnsupportedDataCode = 1003 ReservedCode = 1004 NoStatusRcvCode = 1005 AbnormalClosureCode = 1006 InvalidFramePayloadDataCode = 1007 PolicyViolationCode = 1008 MessageTooBigCode = 1009 MandatoryExtCode = 1010 InternalErrorCode = 1011 ServiceRestartCode = 1012 TryAgainLaterCode = 1013 ProxyInvalidResponseCode = 1014 TLSHandshakeCode = 1015 )
View Source
const ( ActionPost = "post" ActionGet = "get" ActionDelete = "delete" ActionUpdate = "update" ActionJoin = "join" ActionLeave = "leave" PushTypeUser = 0 // 推送用户 PushTypeRoom = 1 // 推送房间 PushTypeAll = 2 // 推送在线 )
推送类型
Variables ¶
View Source
var ( ErrConnectionLoss = gerror.New("ERR_CONNECTION_LOSS") ErrSendMessageFull = gerror.New("ERR_SEND_MESSAGE_FULL") ErrJoinRoomTwice = gerror.New("ERR_JOIN_ROOM_TWICE") ErrNotInRoom = gerror.New("ERR_NOT_IN_ROOM") ErrRoomIdInvalid = gerror.New("ERR_ROOM_ID_INVALID") ErrDispatchChannelFull = gerror.New("ERR_DISPATCH_CHANNEL_FULL") ErrMergeChannelFull = gerror.New("ERR_MERGE_CHANNEL_FULL") )
Functions ¶
This section is empty.
Types ¶
type ActDelete ¶
type ActDelete interface {
Delete(ctx context.Context, message *MessageReq) (*MessageRes, error)
}
type ActGet ¶
type ActGet interface {
Get(ctx context.Context, message *MessageReq) (*MessageRes, error)
}
type ActJoin ¶
type ActJoin interface {
Join(ctx context.Context, message *MessageReq) (*MessageRes, error)
}
type ActLeave ¶
type ActLeave interface {
Leave(ctx context.Context, message *MessageReq) (*MessageRes, error)
}
type ActPost ¶
type ActPost interface {
Post(ctx context.Context, message *MessageReq) (*MessageRes, error)
}
type ActUpdate ¶
type ActUpdate interface {
Update(ctx context.Context, message *MessageReq) (*MessageRes, error)
}
type BizMessage ¶
type BizMessage struct { Type string `json:"type"` // type消息类型: PING, PONG, JOIN, LEAVE, PUSH Data json.RawMessage `json:"data"` // data数据字段 }
BizMessage 业务消息的固定格式(type+data)
type BizPushData ¶
type BizPushData struct {
Items []*json.RawMessage `json:"items"`
}
BizPushData PUSH
type Bucket ¶
type Bucket struct {
// contains filtered or unexported fields
}
func InitBucket ¶
func (*Bucket) AddConn ¶
func (bucket *Bucket) AddConn(wsConn *Connection)
func (*Bucket) DelConn ¶
func (bucket *Bucket) DelConn(wsConn *Connection)
func (*Bucket) DelConnByCID ¶
func (*Bucket) JoinRoom ¶
func (bucket *Bucket) JoinRoom(roomId string, wsConn *Connection) (err error)
type Config ¶
type Config struct { // websocket HTTP握手读超时 单位毫秒 ReadTimeout int `json:"readTimeout" yaml:"readTimeout"` // websocket HTTP握手写超时 单位毫秒 WriteTimeout int `json:"writeTimeout" yaml:"writeTimeout"` // websocket读队列长度 一般不需要修改 InChannelSize int `json:"inChannelSize" yaml:"inChannelSize"` // WebSocket写队列长度 一般不需要修改 OutChannelSize int `json:"outChannelSize" yaml:"outChannelSize"` // WebSocket心跳检查间隔 单位秒, 超过时间没有收到心跳, 服务端将主动断开链接 HeartbeatInterval int `json:"heartbeatInterval" yaml:"heartbeatInterval"` // 连接分桶的数量 桶越多, 推送的锁粒度越小, 推送并发度越高 BucketCount int `json:"bucketCount" yaml:"bucketCount"` // 每个桶的处理协程数量 影响同一时刻可以有多少个不同消息被分发出去 BucketWorkerCount int `json:"bucketWorkerCount" yaml:"bucketWorkerCount"` // bucket工作队列长度 每个bucket的分发任务放在一个独立队列中 BucketJobChannelSize int `json:"bucketJobChannelSize" yaml:"bucketJobChannelSize"` // bucket发送协程的数量 每个bucket有多个协程并发的推送消息 BucketJobWorkerCount int `json:"bucketJobWorkerCount" yaml:"bucketJobWorkerCount"` // 待分发队列的长度 分发队列缓冲所有待推送的消息, 等待被分发到bucket DispatchChannelSize int `json:"dispatchChannelSize" yaml:"dispatchChannelSize"` // 分发协程的数量 分发协程用于将待推送消息扇出给各个bucket DispatchWorkerCount int `json:"dispatchWorkerCount" yaml:"dispatchWorkerCount"` // 合并推送的最大延迟时间 单位毫秒, 在抵达maxPushBatchSize之前超时则发送 MaxMergerDelay int `json:"maxMergerDelay" yaml:"maxMergerDelay"` // 合并最多消息条数 消息推送频次越高, 应该使用更大的合并批次, 得到更高的吞吐收益 MaxMergerBatchSize int `json:"maxMergerBatchSize" yaml:"maxMergerBatchSize"` // 消息合并协程的数量 消息合并与json编码耗费CPU, 注意一个房间的消息只会由同一个协程处理. MergerWorkerCount int `json:"mergerWorkerCount" yaml:"mergerWorkerCount"` // 消息合并队列的容量 每个房间消息合并线程有一个队列, 推送量超过队列将被丢弃 MergerChannelSize int `json:"mergerChannelSize" yaml:"mergerChannelSize"` // 每个连接最多加入房间数量 MaxJoinRoom int `json:"maxJoinRoom" yaml:"maxJoinRoom"` }
type ConnManager ¶
type ConnManager struct {
// contains filtered or unexported fields
}
ConnManager 连接管理器
func ConnMgr ¶
func ConnMgr() *ConnManager
func NewConnMgr ¶
func NewConnMgr(config *Config) *ConnManager
func (*ConnManager) AddConn ¶
func (connMgr *ConnManager) AddConn(wsConnection *Connection)
func (*ConnManager) DelConn ¶
func (connMgr *ConnManager) DelConn(wsConnection *Connection)
func (*ConnManager) DelConnByCID ¶
func (connMgr *ConnManager) DelConnByCID(ctx context.Context, connId uint64)
func (*ConnManager) GetBucket ¶
func (connMgr *ConnManager) GetBucket(wsConnection *Connection) (bucket *Bucket)
func (*ConnManager) GetBucketByCID ¶
func (connMgr *ConnManager) GetBucketByCID(connId uint64) (bucket *Bucket)
func (*ConnManager) JoinRoom ¶
func (connMgr *ConnManager) JoinRoom(roomId string, wsConn *Connection) (err error)
func (*ConnManager) LeaveRoom ¶
func (connMgr *ConnManager) LeaveRoom(roomId string, wsConn *Connection) (err error)
func (*ConnManager) PushAll ¶
func (connMgr *ConnManager) PushAll(msg *MessageReq) (err error)
PushAll 向所有在线用户发送消息
func (*ConnManager) PushByCID ¶
func (connMgr *ConnManager) PushByCID(connId uint64, message *MessageReq) (err error)
PushByCID 向指定用户发送消息
func (*ConnManager) PushRoom ¶
func (connMgr *ConnManager) PushRoom(roomId string, items *MessageReq) (err error)
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
func NewConnection ¶
func (*Connection) Handle ¶
func (conn *Connection) Handle(ctx context.Context, h Service, req *MessageReq) (res *MessageRes, err error)
func (*Connection) ReadMessage ¶
func (conn *Connection) ReadMessage() (message *Message, err error)
ReadMessage 读取消息
func (*Connection) RouterHandle ¶
func (conn *Connection) RouterHandle(ctx context.Context, ins *Instance, routers *map[string]Service)
RouterHandle 处理websocket请求
func (*Connection) SendMessage ¶
func (conn *Connection) SendMessage(message *Message) (err error)
SendMessage 发送消息
type Instance ¶
type Instance struct { ServerId *uint64 `json:"serverId"` ConnManager *ConnManager `json:"connMgr"` Merge *Merger `json:"merger"` Cfg *Config `json:"config"` Stat *Statistics `json:"stats"` }
func (*Instance) CloseSession ¶
func (*Instance) Connection ¶
func (*Instance) GetInstance ¶
func (*Instance) GetStats ¶
func (i *Instance) GetStats() *Statistics
func (*Instance) PushAll ¶
func (i *Instance) PushAll(msg *MessageReq) error
func (*Instance) PushSession ¶
func (i *Instance) PushSession(connId uint64, message *MessageReq) error
type MergeWorker ¶
type MergeWorker struct {
// contains filtered or unexported fields
}
type Merger ¶
type Merger struct {
// contains filtered or unexported fields
}
Merger 广播消息、消息的合并
type Message ¶
Message websocket的Message对象
func BuildWsMessage ¶
func EncodeWsMessage ¶
func EncodeWsMessage(message *MessageReq) (wsMessage *Message, err error)
type MessageReq ¶
type MessageReq struct { Service string `json:"service"` Action string `json:"action"` Data interface{} `json:"data"` }
func DecodeMessage ¶
func DecodeMessage(buf []byte) (message *MessageReq, err error)
DecodeMessage 解析{"type": "PING", "data": {...}}的包
type MessageRes ¶
type PongData ¶
type PongData struct { SessionId uint64 `json:"session_id"` KeepAlive int `json:"keep_alive"` DateTime string `json:"date_time"` Expire int64 `json:"expire"` }
PongData PONG
type PushContext ¶
type PushContext struct {
// contains filtered or unexported fields
}
type PushJob ¶
type PushJob struct { // union { Msg *MessageReq // 未序列化的业务消息 // contains filtered or unexported fields }
PushJob 推送任务
type Room ¶
type Room struct {
// contains filtered or unexported fields
}
Room 房间
func (*Room) Join ¶
func (room *Room) Join(wsConn *Connection) (err error)
func (*Room) Leave ¶
func (room *Room) Leave(wsConn *Connection) (err error)
type SrvBefore ¶
type SrvBefore interface {
Before(ctx context.Context, message *MessageReq) error
}
type SrvHandler ¶
type SrvHandler interface {
Handle(ctx context.Context, message *MessageReq) (*MessageRes, error)
}
type Statistics ¶
type Statistics struct { // 反馈在线长连接的数量 OnlineConnections int64 `json:"online_connections"` // 反馈客户端的推送压力 SendMessageTotal int64 `json:"send_message_total"` SendMessageFail int64 `json:"send_message_fail"` // 反馈ConnMgr消息分发模块的压力 // 推送失败次数 PushFail int64 `json:"push_fail"` DispatchPending int64 `json:"dispatch_pending"` PushJobPending int64 `json:"push_job_pending"` // 分发总消息数 DispatchTotal int64 `json:"dispatch_total"` // 分发丢弃消息数 DispatchFail int64 `json:"dispatch_fail"` // 返回出在线的房间总数, 有利于分析内存上涨的原因 RoomCount int64 `json:"room_count"` // Merger模块处理队列, 反馈出消息合并的压力情况 MergerPending int64 `json:"merger_pending"` // Merger模块合并发送的消息总数与失败总数 MergerRoomTotal int64 `json:"merger_room_total"` MergerAllTotal int64 `json:"merger_all_total"` MergerRoomFail int64 `json:"merger_room_fail"` MergerAllFail int64 `json:"merger_all_fail"` }
func GetStats ¶
func GetStats() *Statistics
func (*Statistics) DispatchFailIncr ¶
func (s *Statistics) DispatchFailIncr()
func (*Statistics) DispatchFailNumIncr ¶
func (s *Statistics) DispatchFailNumIncr(batchSize int64)
func (*Statistics) DispatchPendingDesc ¶
func (s *Statistics) DispatchPendingDesc()
func (*Statistics) DispatchPendingIncr ¶
func (s *Statistics) DispatchPendingIncr()
func (*Statistics) DispatchTotalIncr ¶
func (s *Statistics) DispatchTotalIncr(batchSize int64)
func (*Statistics) Dump ¶
func (s *Statistics) Dump() (data []byte, err error)
func (*Statistics) GetStats ¶
func (s *Statistics) GetStats() *Statistics
func (*Statistics) MergerAllFailIncr ¶
func (s *Statistics) MergerAllFailIncr(batchSize int64)
func (*Statistics) MergerAllTotalIncr ¶
func (s *Statistics) MergerAllTotalIncr(batchSize int64)
func (*Statistics) MergerPendingDesc ¶
func (s *Statistics) MergerPendingDesc()
func (*Statistics) MergerPendingIncr ¶
func (s *Statistics) MergerPendingIncr()
func (*Statistics) OnlineConnectionsDesc ¶
func (s *Statistics) OnlineConnectionsDesc()
func (*Statistics) OnlineConnectionsIncr ¶
func (s *Statistics) OnlineConnectionsIncr()
func (*Statistics) PushJobPendingDesc ¶
func (s *Statistics) PushJobPendingDesc()
func (*Statistics) PushJobPendingIncr ¶
func (s *Statistics) PushJobPendingIncr()
func (*Statistics) RoomCountDesc ¶
func (s *Statistics) RoomCountDesc()
func (*Statistics) RoomCountIncr ¶
func (s *Statistics) RoomCountIncr()
func (*Statistics) SendMessageFailIncr ¶
func (s *Statistics) SendMessageFailIncr()
func (*Statistics) SendMessageTotalIncr ¶
func (s *Statistics) SendMessageTotalIncr()
Click to show internal directories.
Click to hide internal directories.