Documentation
¶
Index ¶
- func InitTCP(server *Server, addrs []string, accept int) (err error)
- func InitWebsocket(server *Server, addrs []string, accept int) (err error)
- func InitWebsocketWithTLS(server *Server, addrs []string, certFile, privateFile string, accept int) (err error)
- func InitWhitelist(c *conf.Whitelist) (err error)
- type Bucket
- func (b *Bucket) Broadcast(p *grpc.Proto, op int32)
- func (b *Bucket) BroadcastRoom(arg *grpc.BroadcastRoomReq)
- func (b *Bucket) ChangeRoom(nrid string, ch *Channel) (err error)
- func (b *Bucket) Channel(key string) (ch *Channel)
- func (b *Bucket) ChannelCount() int
- func (b *Bucket) Del(dch *Channel)
- func (b *Bucket) DelRoom(room *Room)
- func (b *Bucket) IPCount() (res map[string]struct{})
- func (b *Bucket) Put(rid string, ch *Channel) (err error)
- func (b *Bucket) Room(rid string) (room *Room)
- func (b *Bucket) RoomCount() int
- func (b *Bucket) Rooms() (res map[string]struct{})
- func (b *Bucket) RoomsCount() (res map[string]int32)
- func (b *Bucket) UpRoomsCount(roomCountMap map[string]int32)
- type Channel
- type Ring
- type Room
- type Round
- type RoundOptions
- type Server
- func (s *Server) Bucket(subKey string) *Bucket
- func (s *Server) Buckets() []*Bucket
- func (s *Server) Close() (err error)
- func (s *Server) Connect(c context.Context, p *model.Proto, cookie string) (mid int64, key, rid string, accepts []int32, heartbeat time.Duration, ...)
- func (s *Server) Disconnect(c context.Context, mid int64, key string) (err error)
- func (s *Server) Heartbeat(ctx context.Context, mid int64, key string) (err error)
- func (s *Server) Operate(ctx context.Context, p *model.Proto, ch *Channel, b *Bucket) error
- func (s *Server) RandServerHearbeat() time.Duration
- func (s *Server) Receive(ctx context.Context, mid int64, p *model.Proto) (err error)
- func (s *Server) RenewOnline(ctx context.Context, serverID string, rommCount map[string]int32) (allRoom map[string]int32, err error)
- func (s *Server) ServeTCP(conn *net.TCPConn, rp, wp *bytes.Pool, tr *xtime.Timer)
- func (s *Server) ServeWebsocket(conn net.Conn, rp, wp *bytes.Pool, tr *xtime.Timer)
- type Whitelist
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InitWebsocket ¶
InitWebsocket listen all tcp.bind and start accept connections.
func InitWebsocketWithTLS ¶
func InitWebsocketWithTLS(server *Server, addrs []string, certFile, privateFile string, accept int) (err error)
InitWebsocketWithTLS init websocket with tls.
func InitWhitelist ¶
InitWhitelist a whitelist struct.
Types ¶
type Bucket ¶
type Bucket struct {
// contains filtered or unexported fields
}
func (*Bucket) Broadcast ¶
Broadcast push msgs to all channels in the bucket. 把消息 p 推送到对本 Bucket 内所有关注房间 op 的 channels 中。
func (*Bucket) BroadcastRoom ¶
func (b *Bucket) BroadcastRoom(arg *grpc.BroadcastRoomReq)
BroadcastRoom broadcast a message to specified room
一个 Bucket 会开多个 goroutine 来并发的做单房间的广播消息推送,每个 goroutine 监听一个管道。 推送单房间广播消息时,可以随机发往任何管道,交由对应的 goroutine 来处理。 这里为了保证负载均衡,使用了 原子锁 + 递增 方式,本质就是轮循选择一个 goroutine 。
这里 BroadcastRoom() 函数就是简单选择一个管道,然后把消息发给它,后面 go roomproc() 中会取出并处理。
func (*Bucket) ChangeRoom ¶
ChangeRoom change ro room user 更换房间。
func (*Bucket) ChannelCount ¶
ChannelCount channel count in the bucket 当前 bucket 内用户总数。
func (*Bucket) RoomsCount ¶
RoomsCount get all room id where online number > 0. 获取每个房间内的用户数目。
func (*Bucket) UpRoomsCount ¶
UpRoomsCount update all room count
type Channel ¶
type Channel struct { Room *Room // user 归属房间 CliProto Ring // cliProto 是一个 Ring Buffer,保存 Room 广播或是 client 直接发送过来的消息体 Writer bufio.Writer // 客户端连接 conn 的 写 封装 Reader bufio.Reader // 客户端连接 conn 的 读 封装 Next *Channel // 双向链表 rlink Prev *Channel // 双向链表 llink Mid int64 // user id Key string // user 在 logic service 的 key IP string // user ip // contains filtered or unexported fields }
func (*Channel) Signal ¶
func (c *Channel) Signal()
Signal send signal to the channel, protocol ready.
type Ring ¶
type Ring struct {
// contains filtered or unexported fields
}
Ring ring proto buffer.
type Room ¶
type Room struct { ID string // 房间号 Online int32 // 房间的 channel 数量,即房间的在线用户的多少 // dirty read is ok AllOnline int32 // contains filtered or unexported fields }
type Round ¶
type Round struct {
// contains filtered or unexported fields
}
Round userd for connection round-robin get a reader/writer/timer for split big lock.
func (*Round) Reader ¶
Reader get a reader memory buffer. 取 Reader Pool ,给定数字 rn 会以取余方式选择其中一个,用于分散锁竞争压力,增加并发量。
type RoundOptions ¶
type RoundOptions struct { Timer int // 每次要分配多少个用于 time.Timer 的 Pool TimerSize int // 每个 time.Timer 一开始能接收的 TimerData 数量 Reader int // 每次要分配多少个用于 Reader bytes 的 Pool ReadBuf int // 每个 Reader bytes Pool 有多少个 Buffer ReadBufSize int // 每个 Reader bytes Pool 的 Buffer 能有多大的空间 Writer int // 每次要分配多少个用于 Writer bytes 的 Pool WriteBuf int // 每个 Writer bytes Pool 有多少个 Buffer WriteBufSize int // 每个 Writer bytes Pool 的 Buffer 能有多大的空间 }
RoundOptions round options.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is comet server.
func (*Server) Bucket ¶
Bucket get the bucket by subkey.
用 CityHash32 计算 user key 的哈希值,然后对 bucket 总数取余算得 idx ,来取出 user key 对应的 bucket。 这种方法能够尽可能将 user 分散到不同的 bucket 中,随着 bucket 的增加,每个 bucket 内的 user 总数是降低的, 这样便可以在高并发时,避免在同一个 bucket 上的频繁锁竞争。
注,一个 user key 会始终映射到相同的 bucket 里,除非发生 bucket 的扩容,这个另做讨论。
func (*Server) Connect ¶
func (s *Server) Connect(c context.Context, p *model.Proto, cookie string) (mid int64, key, rid string, accepts []int32, heartbeat time.Duration, err error)
Connect connected a connection. 告知 logic service 有人想要进入某个房间。
func (*Server) Disconnect ¶
Disconnect disconnected a connection. client 连接中断,告知 logic service 需清理此人的状态信息。
func (*Server) RandServerHearbeat ¶
RandServerHearbeat rand server heartbeat. 返回 [10 min, 30 min] 之间的一个随机值,被用作 comet - logic 之间上报 user 心跳的时间间隔
func (*Server) RenewOnline ¶
func (s *Server) RenewOnline(ctx context.Context, serverID string, rommCount map[string]int32) (allRoom map[string]int32, err error)
RenewOnline renew room online.