Documentation
¶
Index ¶
- Constants
- Variables
- func CreateMessageByID(hash uint32) proto.Message
- func CreateMessageByName(name string) proto.Message
- func CreatePairingAck(req proto.Message) proto.Message
- func CreatePairingAckBy(reqName string) proto.Message
- func DateTime() string
- func DeregisterHandler(command uint16)
- func DispatchMessage(ctx context.Context, msg *Packet) error
- func FreePacket(pkt *Packet)
- func GetEnv(key string, def string) string
- func GetEnvBool(key string) bool
- func GetEnvFloat(key string, def float64) float64
- func GetEnvInt(key string, def int) int
- func GetEnvInt64(key string, def int64) int64
- func GetMessageIDByName(name string) uint32
- func GetMessageIDOf(msg proto.Message) uint32
- func GetMessageNameByID(hash uint32) string
- func GetPairingAckName(reqName string) string
- func GetServiceTypes() []uint8
- func IBody2Bytes(body interface{}) []byte
- func IBody2Float(body interface{}) float64
- func IBody2Int64(body interface{}) int64
- func IBody2String(body interface{}) string
- func IBodyAsBytes(body interface{}) []byte
- func Now() time.Time
- func NowString() string
- func PB2JSON(msg proto.Message) string
- func RegisterAllMessages(validNameSuffix ...string) (er error)
- func RegisterHandler(command uint16, action PacketHandler)
- func RegisterService(s Service)
- func SafeToBody(val interface{}) interface{}
- func SetPacketAllocPolicy(policy int) int
- func SetPanicHandler(f PanicHandler)
- func StartClock()
- func StopClock()
- func WallClock() *datetime.Clock
- type ArenaAllocator
- type Endpoint
- type EndpointFlag
- type EndpointHashMap
- func (m *EndpointHashMap) Clear()
- func (m *EndpointHashMap) Foreach(action func(Endpoint) int)
- func (m *EndpointHashMap) Get(node NodeID) Endpoint
- func (m *EndpointHashMap) Has(node NodeID) bool
- func (m *EndpointHashMap) IsEmpty() bool
- func (m *EndpointHashMap) Keys() []NodeID
- func (m *EndpointHashMap) Put(node NodeID, endpoint Endpoint)
- func (m *EndpointHashMap) PutIfAbsent(node NodeID, endpoint Endpoint)
- func (m *EndpointHashMap) Remove(node NodeID)
- func (m *EndpointHashMap) Size() int
- type EndpointMap
- type EndpointMapShard
- type EndpointShardedMap
- func (m *EndpointShardedMap) Clear()
- func (m *EndpointShardedMap) Foreach(action func(Endpoint) int)
- func (m *EndpointShardedMap) Get(node NodeID) Endpoint
- func (m *EndpointShardedMap) Has(node NodeID) bool
- func (m *EndpointShardedMap) IsEmpty() bool
- func (m *EndpointShardedMap) Keys() []NodeID
- func (m *EndpointShardedMap) Put(node NodeID, endpoint Endpoint)
- func (m *EndpointShardedMap) PutIfAbsent(node NodeID, endpoint Endpoint)
- func (m *EndpointShardedMap) Remove(node NodeID)
- func (m *EndpointShardedMap) Size() int
- type EndpointSyncMap
- func (m *EndpointSyncMap) Clear()
- func (m *EndpointSyncMap) Foreach(action func(Endpoint) int)
- func (m *EndpointSyncMap) Get(node NodeID) Endpoint
- func (m *EndpointSyncMap) Has(node NodeID) bool
- func (m *EndpointSyncMap) IsEmpty() bool
- func (m *EndpointSyncMap) Keys() []NodeID
- func (m *EndpointSyncMap) Put(node NodeID, endpoint Endpoint)
- func (m *EndpointSyncMap) PutIfAbsent(node NodeID, endpoint Endpoint)
- func (m *EndpointSyncMap) Remove(node NodeID)
- func (m *EndpointSyncMap) Size() int
- type IPacketAllocator
- type MessageDispatcher
- func (d *MessageDispatcher) Deregister(command uint16)
- func (d *MessageDispatcher) DeregisterOne(command uint16, handler PacketHandler)
- func (d *MessageDispatcher) Dispatch(ctx context.Context, pkt *Packet) error
- func (d *MessageDispatcher) Register(command uint16, handler PacketHandler)
- func (d *MessageDispatcher) SetPanicHandler(f PanicHandler)
- type NetPacket
- type NodeID
- type NodeIDSet
- type Packet
- func (m *Packet) AutoDecode() error
- func (m *Packet) Clone() *Packet
- func (m *Packet) Decode(msg proto.Message) error
- func (m *Packet) DecodeTo(msg proto.Message) error
- func (m *Packet) EncodeToBytes() []byte
- func (m *Packet) Errno() int32
- func (m *Packet) Refuse(errno int32) error
- func (m *Packet) Reply(ack proto.Message) error
- func (m *Packet) ReplyAny(body interface{}) error
- func (m *Packet) Reset()
- func (m *Packet) SessionNode() NodeID
- func (m *Packet) SetBody(val interface{})
- func (m *Packet) SetErrno(ec int32)
- func (m *Packet) String() string
- type PacketFlag
- type PacketHandler
- type PacketType
- type PanicHandler
- type PoolAllocator
- type Runnable
- type RuntimeAllocator
- type Service
- type ServiceContext
- func (c *ServiceContext) AddFinalizer(action func())
- func (c *ServiceContext) Close()
- func (c *ServiceContext) EnqueuePacket(pkt *Packet)
- func (c *ServiceContext) EnqueueTask(task Runnable)
- func (c *ServiceContext) InboundQueue() chan<- *Packet
- func (c *ServiceContext) Instance() Service
- func (c *ServiceContext) MessageQueue() <-chan *Packet
- func (c *ServiceContext) OutboundQueue() <-chan *Packet
- func (c *ServiceContext) QuitDone() <-chan struct{}
- func (c *ServiceContext) RunID() string
- func (c *ServiceContext) SendPacket(pkt *Packet)
- func (c *ServiceContext) SetInstance(inst Service)
- func (c *ServiceContext) StartTime() time.Time
- func (c *ServiceContext) TaskQueue() <-chan Runnable
- func (c *ServiceContext) TryEnqueuePacket(pkt *Packet) bool
- func (c *ServiceContext) TryEnqueueTask(task Runnable) bool
- func (c *ServiceContext) TrySendPacket(pkt *Packet) bool
- type State
- type Task
- type TaskFactory
- type Timer
- type TimerQueue
- func (s *TimerQueue) Cancel(id int) bool
- func (s *TimerQueue) Chan() <-chan Runnable
- func (s *TimerQueue) IsPending(id int) bool
- func (s *TimerQueue) RunAfter(timeUnits int, r Runnable) int
- func (s *TimerQueue) RunEvery(interval int, r Runnable) int
- func (s *TimerQueue) Shutdown()
- func (s *TimerQueue) Size() int
- func (s *TimerQueue) Start() error
- type Tuple
Constants ¶
const ( FOREACH_CONTINUE = 0 FOREACH_EXIT = 1 )
foreach控制
const ( SERVICE_ALL uint8 = 0xFF // 所有服务 INSTANCE_ALL uint16 = 0xFFFF // 所有实例 INSTANCE_ANY uint16 = 0x0000 // 任意实例 REFERER_ALL uint32 = 0xFFFFFFFF // 广播给所有玩家 )
const ( NodeServiceShift = 16 NodeTypeShift = 31 NodeTypeMask = 1 << NodeTypeShift )
const ( AllocPolicyRuntime = 0 AllocPolicyPool = 1 AllocPolicyArena = 2 )
不同的分配算法(策略)
const ( SuffixREQ = "Req" SuffixACK = "Ack" )
消息协议规则:
1. 请求消息以Req结尾
2. 响应消息以Ack结尾
3. 通知消息以Ntf结尾
const ( StateInit = 0 StateStarting = 1 StateRunning = 2 StateShuttingDown = 3 StateTerminated = 4 )
const ( TaskStateInit = 0 TaskScheduled = 1 // task is scheduled for execution TaskExecuted = 2 // a non-repeating task has already executed (or is currently executing) and has not been cancelled. TaskCancelled = 3 // task has been cancelled (with a call to TimerTask.Cancel). )
const ( PendingQueueCapacity = 128 // pending add/delete TimeoutQueueCapacity = 1024 // pending timed-out )
const ClockEpoch int64 = 1577836800 // 2020-01-01 00:00:00 UTC
epoch of clock
const EndpointMapShardCount = 16
const VERSION = "0.6.6"
版本号
Variables ¶
var (
ErrContextInboundQueueFull = errors.New("context inbound queue full")
)
Functions ¶
func CreateMessageByName ¶ added in v0.4.4
根据名称创建消息
func CreatePairingAck ¶ added in v0.4.4
如果消息的名字是XXXReq,则尝试创建与其名称对应的XXXAck消息
func CreatePairingAckBy ¶ added in v0.4.4
如果消息的名字是XXXReq,则尝试创建与其名称对应的XXXAck消息
func DeregisterHandler ¶ added in v0.4.8
func DeregisterHandler(command uint16)
func GetEnvInt64 ¶ added in v0.4.7
func GetPairingAckName ¶ added in v0.4.4
根据Req消息的名称,返回其对应的Ack消息名称
func IBodyAsBytes ¶ added in v0.4.1
func IBodyAsBytes(body interface{}) []byte
func RegisterAllMessages ¶ added in v0.6.1
自动注册所有protobuf消息。因为protobuf使用init()注册(RegisterType),此API需要在import后调用。
func RegisterHandler ¶ added in v0.4.8
func RegisterHandler(command uint16, action PacketHandler)
func SetPanicHandler ¶ added in v0.4.8
func SetPanicHandler(f PanicHandler)
Types ¶
type ArenaAllocator ¶ added in v0.4.2
type ArenaAllocator struct {
// contains filtered or unexported fields
}
arena算法的分配器。思路是:
一次申请一个block(N个元素的数组),然后从block数组里再逐个按需分配;block分配完了就丢掉(交给GC),再申请另一个block。这样对runtime来说每次malloc都是以N个元素大小为单位,理论上可以减缓GC的压力,并且业务层无需做Free。
func NewArenaAllocator ¶ added in v0.4.2
func NewArenaAllocator(blockSize int) *ArenaAllocator
type Endpoint ¶
type Endpoint interface { // 节点号 Node() NodeID SetNode(NodeID) // 远端地址 RemoteAddr() string // 发送消息(非阻塞) Send(*Packet) error // 关闭读/写 Close() error ForceClose(error) IsRunning() bool // 绑定自定义数据 SetUserData(interface{}) UserData() interface{} // 原始连接对象 RawConn() net.Conn // 发送/接收计数数据 Stats() *stats.Stats ErrorChan() <-chan error OutMsgChan() chan<- *Packet // 开启read/write线程 Go(EndpointFlag) // 设置加解密 SetEncryptPair(cipher.BlockCryptor, cipher.BlockCryptor) }
网络端点
type EndpointFlag ¶
type EndpointFlag uint32
开启reader/writer标记
const ( EndpointReader EndpointFlag = 0x01 // 只开启reader EndpointWriter EndpointFlag = 0x02 // 只开启writer EndpointReadWriter EndpointFlag = 0x03 // 开启reader和writer )
type EndpointHashMap ¶ added in v0.1.32
type EndpointHashMap struct {
// contains filtered or unexported fields
}
线程安全的EndpointMap,适合数据量不是很大的场景
func NewEndpointHashMap ¶ added in v0.1.32
func NewEndpointHashMap() *EndpointHashMap
func (*EndpointHashMap) Clear ¶ added in v0.1.32
func (m *EndpointHashMap) Clear()
func (*EndpointHashMap) Foreach ¶ added in v0.1.32
func (m *EndpointHashMap) Foreach(action func(Endpoint) int)
action应该对map是read-only
func (*EndpointHashMap) Get ¶ added in v0.1.32
func (m *EndpointHashMap) Get(node NodeID) Endpoint
func (*EndpointHashMap) Has ¶ added in v0.1.32
func (m *EndpointHashMap) Has(node NodeID) bool
func (*EndpointHashMap) IsEmpty ¶ added in v0.1.32
func (m *EndpointHashMap) IsEmpty() bool
func (*EndpointHashMap) Keys ¶ added in v0.1.32
func (m *EndpointHashMap) Keys() []NodeID
func (*EndpointHashMap) Put ¶ added in v0.1.32
func (m *EndpointHashMap) Put(node NodeID, endpoint Endpoint)
func (*EndpointHashMap) PutIfAbsent ¶ added in v0.1.32
func (m *EndpointHashMap) PutIfAbsent(node NodeID, endpoint Endpoint)
func (*EndpointHashMap) Remove ¶ added in v0.1.32
func (m *EndpointHashMap) Remove(node NodeID)
func (*EndpointHashMap) Size ¶ added in v0.1.32
func (m *EndpointHashMap) Size() int
type EndpointMap ¶
type EndpointMap interface { Size() int IsEmpty() bool Has(node NodeID) bool Get(node NodeID) Endpoint Keys() []NodeID Foreach(func(Endpoint) int) Put(node NodeID, endpoint Endpoint) PutIfAbsent(node NodeID, endpoint Endpoint) Remove(node NodeID) Clear() }
线程安全的Endpoint字典
type EndpointMapShard ¶ added in v0.1.32
func (*EndpointMapShard) Clear ¶ added in v0.1.32
func (s *EndpointMapShard) Clear()
type EndpointShardedMap ¶ added in v0.6.7
type EndpointShardedMap struct {
// contains filtered or unexported fields
}
线程安全的EndpointMap,适合数据量很大的场景
func NewEndpointShardedMap ¶ added in v0.6.7
func NewEndpointShardedMap() *EndpointShardedMap
func (*EndpointShardedMap) Clear ¶ added in v0.6.7
func (m *EndpointShardedMap) Clear()
func (*EndpointShardedMap) Foreach ¶ added in v0.6.7
func (m *EndpointShardedMap) Foreach(action func(Endpoint) int)
action应该对map是read-only
func (*EndpointShardedMap) Get ¶ added in v0.6.7
func (m *EndpointShardedMap) Get(node NodeID) Endpoint
func (*EndpointShardedMap) Has ¶ added in v0.6.7
func (m *EndpointShardedMap) Has(node NodeID) bool
func (*EndpointShardedMap) IsEmpty ¶ added in v0.6.7
func (m *EndpointShardedMap) IsEmpty() bool
func (*EndpointShardedMap) Keys ¶ added in v0.6.7
func (m *EndpointShardedMap) Keys() []NodeID
func (*EndpointShardedMap) Put ¶ added in v0.6.7
func (m *EndpointShardedMap) Put(node NodeID, endpoint Endpoint)
func (*EndpointShardedMap) PutIfAbsent ¶ added in v0.6.7
func (m *EndpointShardedMap) PutIfAbsent(node NodeID, endpoint Endpoint)
func (*EndpointShardedMap) Remove ¶ added in v0.6.7
func (m *EndpointShardedMap) Remove(node NodeID)
func (*EndpointShardedMap) Size ¶ added in v0.6.7
func (m *EndpointShardedMap) Size() int
type EndpointSyncMap ¶ added in v0.6.7
type EndpointSyncMap struct {
// contains filtered or unexported fields
}
func (*EndpointSyncMap) Clear ¶ added in v0.6.7
func (m *EndpointSyncMap) Clear()
func (*EndpointSyncMap) Foreach ¶ added in v0.6.7
func (m *EndpointSyncMap) Foreach(action func(Endpoint) int)
func (*EndpointSyncMap) Get ¶ added in v0.6.7
func (m *EndpointSyncMap) Get(node NodeID) Endpoint
func (*EndpointSyncMap) Has ¶ added in v0.6.7
func (m *EndpointSyncMap) Has(node NodeID) bool
func (*EndpointSyncMap) IsEmpty ¶ added in v0.6.7
func (m *EndpointSyncMap) IsEmpty() bool
func (*EndpointSyncMap) Keys ¶ added in v0.6.7
func (m *EndpointSyncMap) Keys() []NodeID
func (*EndpointSyncMap) Put ¶ added in v0.6.7
func (m *EndpointSyncMap) Put(node NodeID, endpoint Endpoint)
func (*EndpointSyncMap) PutIfAbsent ¶ added in v0.6.7
func (m *EndpointSyncMap) PutIfAbsent(node NodeID, endpoint Endpoint)
func (*EndpointSyncMap) Remove ¶ added in v0.6.7
func (m *EndpointSyncMap) Remove(node NodeID)
func (*EndpointSyncMap) Size ¶ added in v0.6.7
func (m *EndpointSyncMap) Size() int
type IPacketAllocator ¶ added in v0.6.1
func GetPacketAllocator ¶ added in v0.6.1
func GetPacketAllocator(policy int) IPacketAllocator
type MessageDispatcher ¶ added in v0.4.8
type MessageDispatcher struct {
// contains filtered or unexported fields
}
消息派发
func DefaultDispatcher ¶ added in v0.4.8
func DefaultDispatcher() *MessageDispatcher
func NewMessageDispatcher ¶ added in v0.4.8
func NewMessageDispatcher() *MessageDispatcher
func (*MessageDispatcher) Deregister ¶ added in v0.4.8
func (d *MessageDispatcher) Deregister(command uint16)
取消所有
func (*MessageDispatcher) DeregisterOne ¶ added in v0.4.8
func (d *MessageDispatcher) DeregisterOne(command uint16, handler PacketHandler)
取消单个注册
func (*MessageDispatcher) Dispatch ¶ added in v0.4.8
func (d *MessageDispatcher) Dispatch(ctx context.Context, pkt *Packet) error
func (*MessageDispatcher) Register ¶ added in v0.4.8
func (d *MessageDispatcher) Register(command uint16, handler PacketHandler)
注册一个
func (*MessageDispatcher) SetPanicHandler ¶ added in v0.4.8
func (d *MessageDispatcher) SetPanicHandler(f PanicHandler)
type NodeID ¶
type NodeID uint32
节点ID。一个32位整数表示的节点号,用以标识一个service(最高位为0),或者一个客户端session(最高位为1)。如果是服务编号:8位服务编号,16位服务实例编号。
服务实例二进制布局:
--------------------------------------
| reserved  | service   | instance   |
--------------------------------------
32          24          16           0
type Packet ¶ added in v0.6.1
type Packet struct { Type PacketType `json:"type,omitempty"` // 消息类型 Flags PacketFlag `json:"flg,omitempty"` // 标志位 Command uint16 `json:"cmd"` // 协议命令,即如何执行消息 MsgID uint32 `json:"mid,omitempty"` // 消息ID,即如何解析body Seq uint32 `json:"seq,omitempty"` // 会话内的唯一序列号 Node NodeID `json:"node,omitempty"` // 源/目标节点 Body interface{} `json:"body,omitempty"` // 消息内容,int32/int64/float64/string/bytes/protobuf.Message Session Endpoint `json:"-"` // 关联的会话 }
Packet表示一个应用层消息
func NewPacket ¶ added in v0.6.1
func NewPacket(cmd uint16, node NodeID, seq uint32, pType PacketType, flag PacketFlag, body proto.Message) *Packet
func (*Packet) AutoDecode ¶ added in v0.6.1
根据MsgID把body的字节流解析为proto消息
func (*Packet) EncodeToBytes ¶ added in v0.6.1
func (*Packet) SessionNode ¶ added in v0.6.7
type PacketFlag ¶
type PacketFlag uint8
消息标志位
const ( PFlagCompressed PacketFlag = 0x01 // 压缩 PFlagEncrypted PacketFlag = 0x02 // 加密 PFlagRoute PacketFlag = 0x08 // 路由标记 )
func (PacketFlag) Clear ¶ added in v0.1.21
func (g PacketFlag) Clear(n PacketFlag) PacketFlag
func (PacketFlag) Has ¶ added in v0.1.21
func (g PacketFlag) Has(n PacketFlag) bool
type PacketType ¶
type PacketType uint8
消息类型
const ( PTypeMessage PacketType = 0x01 // 普通消息 PTypePing PacketType = 0x02 // 心跳 PTypePong PacketType = 0x03 // 心跳回复 PTypeError PacketType = 0x04 // 错误 PTypeRequest PacketType = 0x05 // 远程调用请求 PTypeResponse PacketType = 0x06 // 远程调用返回 PTypeControl PacketType = 0x07 // 控制 )
type PanicHandler ¶ added in v0.4.8
type PanicHandler func(*Packet, interface{})
type PoolAllocator ¶ added in v0.4.2
type PoolAllocator struct {
// contains filtered or unexported fields
}
使用sync.Pool的分配器
func NewPoolAllocator ¶ added in v0.4.2
func NewPoolAllocator() *PoolAllocator
func (*PoolAllocator) Alloc ¶ added in v0.4.2
func (a *PoolAllocator) Alloc() *Packet
func (*PoolAllocator) Free ¶ added in v0.4.2
func (a *PoolAllocator) Free(msg *Packet)
放回pool里供后面的Alloc使用
type RuntimeAllocator ¶ added in v0.4.2
type RuntimeAllocator struct {
// contains filtered or unexported fields
}
使用new函数的分配器
func NewRuntimeAllocator ¶ added in v0.4.2
func NewRuntimeAllocator() *RuntimeAllocator
func (*RuntimeAllocator) Alloc ¶ added in v0.4.2
func (a *RuntimeAllocator) Alloc() *Packet
func (*RuntimeAllocator) Free ¶ added in v0.4.2
func (a *RuntimeAllocator) Free(msg *Packet)
type Service ¶
type Service interface { Type() uint8 // 节点号 Node() NodeID SetNode(id NodeID) // 当前状态 State() int32 SetState(int32) // 上下文对象 Context() *ServiceContext // 初始化 Init(context.Context, *ServiceContext) error // 启动服务 Startup(context.Context) error }
Service实现具体的服务
type ServiceContext ¶
type ServiceContext struct {
// contains filtered or unexported fields
}
服务的上下文
func NewServiceContext ¶
func NewServiceContext(inboundQueueSize, outboundQueueSize, taskQueueSize int) *ServiceContext
func (*ServiceContext) AddFinalizer ¶ added in v0.1.28
func (c *ServiceContext) AddFinalizer(action func())
func (*ServiceContext) EnqueuePacket ¶ added in v0.6.7
func (c *ServiceContext) EnqueuePacket(pkt *Packet)
投递一条待处理的消息
func (*ServiceContext) EnqueueTask ¶ added in v0.6.2
func (c *ServiceContext) EnqueueTask(task Runnable)
投递待执行任务
func (*ServiceContext) InboundQueue ¶ added in v0.1.6
func (c *ServiceContext) InboundQueue() chan<- *Packet
用于给session在select语句里投递收到的网络消息
func (*ServiceContext) Instance ¶ added in v0.1.2
func (c *ServiceContext) Instance() Service
service实例
func (*ServiceContext) MessageQueue ¶
func (c *ServiceContext) MessageQueue() <-chan *Packet
消息队列,仅消费
func (*ServiceContext) OutboundQueue ¶ added in v0.6.2
func (c *ServiceContext) OutboundQueue() <-chan *Packet
待发送队列
func (*ServiceContext) QuitDone ¶ added in v0.1.24
func (c *ServiceContext) QuitDone() <-chan struct{}
等待close完成
func (*ServiceContext) SendPacket ¶ added in v0.6.1
func (c *ServiceContext) SendPacket(pkt *Packet)
投递一条待发送的消息
func (*ServiceContext) SetInstance ¶ added in v0.2.9
func (c *ServiceContext) SetInstance(inst Service)
func (*ServiceContext) StartTime ¶ added in v0.1.24
func (c *ServiceContext) StartTime() time.Time
func (*ServiceContext) TaskQueue ¶ added in v0.6.2
func (c *ServiceContext) TaskQueue() <-chan Runnable
任务队列,仅消费
func (*ServiceContext) TryEnqueuePacket ¶ added in v0.6.7
func (c *ServiceContext) TryEnqueuePacket(pkt *Packet) bool
func (*ServiceContext) TryEnqueueTask ¶ added in v0.6.7
func (c *ServiceContext) TryEnqueueTask(task Runnable) bool
func (*ServiceContext) TrySendPacket ¶ added in v0.6.7
func (c *ServiceContext) TrySendPacket(pkt *Packet) bool
type State ¶ added in v0.1.19
type State int32
service state
func (*State) IsShuttingDown ¶ added in v0.1.19
func (*State) IsTerminated ¶ added in v0.1.19
type TaskFactory ¶ added in v0.6.6
type TaskFactory struct {
// contains filtered or unexported fields
}
func (*TaskFactory) Chan ¶ added in v0.6.6
func (f *TaskFactory) Chan() <-chan Task
func (*TaskFactory) Init ¶ added in v0.6.6
func (f *TaskFactory) Init(capacity int) *TaskFactory
func (*TaskFactory) Schedule ¶ added in v0.6.6
func (f *TaskFactory) Schedule(task Task) error
func (*TaskFactory) Start ¶ added in v0.6.6
func (f *TaskFactory) Start() error
type Timer ¶ added in v0.4.8
type Timer interface { Start() error // 关闭定时器 Shutdown() // 在`timeUnits`时间后执行`r` RunAfter(timeUnits int, r Runnable) int // 每隔`interval`时间执行`r` RunEvery(interval int, r Runnable) int // 取消一个timer Cancel(id int) bool // 判断timer是否在计划中 IsPending(id int) bool // 超时的待执行runner Chan() <-chan Runnable // timer数量 Size() int }
定时器
func NewDefaultTimerQueue ¶ added in v0.4.8
func NewDefaultTimerQueue() Timer
type TimerQueue ¶ added in v0.4.8
type TimerQueue struct { C chan Runnable // 到期的定时器 // contains filtered or unexported fields }
最小堆实现的定时器。标准库的四叉堆实现的time.Timer已经可以满足大多数高精度的定时需求;这个实现主要是为了在大量timer的场景下,把timer的压力从runtime转移到应用上。
func (*TimerQueue) Chan ¶ added in v0.4.8
func (s *TimerQueue) Chan() <-chan Runnable
func (*TimerQueue) IsPending ¶ added in v0.4.8
func (s *TimerQueue) IsPending(id int) bool
func (*TimerQueue) RunAfter ¶ added in v0.4.8
func (s *TimerQueue) RunAfter(timeUnits int, r Runnable) int
创建一个定时器,在`timeUnits`时间后运行`r`
func (*TimerQueue) RunEvery ¶ added in v0.4.8
func (s *TimerQueue) RunEvery(interval int, r Runnable) int
创建一个定时器,每隔`interval`时间运行一次`r`
func (*TimerQueue) Shutdown ¶ added in v0.4.8
func (s *TimerQueue) Shutdown()
func (*TimerQueue) Size ¶ added in v0.4.8
func (s *TimerQueue) Size() int
func (*TimerQueue) Start ¶ added in v0.4.8
func (s *TimerQueue) Start() error
Starts the background thread explicitly