Documentation ¶
Index ¶
- Constants
- type Action
- type Dispatcher
- func (d *Dispatcher[P, M]) AntiUnique(name string)
- func (d *Dispatcher[P, M]) Closed() bool
- func (d *Dispatcher[P, M]) Expel()
- func (d *Dispatcher[P, M]) IncrCount(producer P, i int64)
- func (d *Dispatcher[P, M]) Name() string
- func (d *Dispatcher[P, M]) Put(message M)
- func (d *Dispatcher[P, M]) SetClosedHandler(handler func(dispatcher *Action[P, M])) *Dispatcher[P, M]
- func (d *Dispatcher[P, M]) SetProducerDoneHandler(p P, handler func(p P, dispatcher *Action[P, M])) *Dispatcher[P, M]
- func (d *Dispatcher[P, M]) Start() *Dispatcher[P, M]
- func (d *Dispatcher[P, M]) UnExpel()
- func (d *Dispatcher[P, M]) Unique(name string) bool
- type Handler
- type Manager
- func (m *Manager[P, M]) BindProducer(p P, name string)
- func (m *Manager[P, M]) GetDispatcher(p P) *Dispatcher[P, M]
- func (m *Manager[P, M]) GetDispatcherNum() int
- func (m *Manager[P, M]) GetSystemDispatcher() *Dispatcher[P, M]
- func (m *Manager[P, M]) HasDispatcher(name string) bool
- func (m *Manager[P, M]) SetDispatcherClosedHandler(handler func(name string)) *Manager[P, M]
- func (m *Manager[P, M]) SetDispatcherCreatedHandler(handler func(name string)) *Manager[P, M]
- func (m *Manager[P, M]) UnBindProducer(p P)
- func (m *Manager[P, M]) Wait()
- type Message
- type Producer
Examples ¶
Constants ¶
const SystemName = "*system"
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Action ¶
Action 消息分发器操作器,用于暴露外部可操作的消息分发器函数
type Dispatcher ¶
Dispatcher 用于服务器消息处理的消息分发器
这个消息分发器为并发安全的生产者和消费者模型,生产者可以是任意类型,消费者必须是 Message 接口的实现。 生产者可以通过 Put 方法并发安全地将消息放入消息分发器,消息执行过程不会阻塞到 Put 方法,同时允许在 Start 方法之前调用 Put 方法。 在执行 Start 方法后,消息分发器会阻塞地从消息缓冲区中读取消息,然后执行消息处理器,消息处理器的执行过程不会阻塞到消息的生产。
为了保证消息不丢失,内部采用了 buffer.RingUnbounded 作为缓冲区实现,并且消息分发器不提供 Close 方法。 如果需要关闭消息分发器,可以通过 Expel 方法设置驱逐计划,当消息分发器中没有任何消息时,将会被释放。 同时,也可以使用 UnExpel 方法取消驱逐计划。
为什么提供 Expel 和 UnExpel 方法:
- 在连接断开时,当需要执行一系列消息处理时,如果直接关闭消息分发器,可能会导致消息丢失。所以提供了 Expel 方法,可以在消息处理完成后再关闭消息分发器。
- 当消息还未处理完成时连接重连,如果没有取消驱逐计划,可能会导致消息分发器被关闭。所以提供了 UnExpel 方法,可以在连接重连后取消驱逐计划。
func NewDispatcher ¶
func NewDispatcher[P Producer, M Message[P]](bufferSize int, name string, handler Handler[P, M]) *Dispatcher[P, M]
NewDispatcher 创建一个新的消息分发器 Dispatcher 实例
Example ¶
m := new(atomic.Int64) fm := new(atomic.Int64) w := new(sync.WaitGroup) w.Add(1) d := dispatcher.NewDispatcher(1024, "example-dispatcher", func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) { m.Add(1) }) d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) { w.Done() }) var producers = []string{"producer1", "producer2", "producer3"} for i := 0; i < len(producers); i++ { p := producers[i] for i := 0; i < 10; i++ { d.Put(&TestMessage{producer: p}) } d.SetProducerDoneHandler(p, func(p string, dispatcher *dispatcher.Action[string, *TestMessage]) { fm.Add(1) }) } d.Start() d.Expel() w.Wait() fmt.Println(fmt.Sprintf("producer num: %d, producer done: %d, finished: %d", len(producers), fm.Load(), m.Load()))
Output: producer num: 3, producer done: 3, finished: 30
func (*Dispatcher[P, M]) AntiUnique ¶
func (d *Dispatcher[P, M]) AntiUnique(name string)
AntiUnique 取消唯一消息键
func (*Dispatcher[P, M]) Expel ¶
func (d *Dispatcher[P, M]) Expel()
Expel 设置该消息分发器即将被驱逐,当消息分发器中没有任何消息时,会自动关闭
func (*Dispatcher[P, M]) IncrCount ¶
func (d *Dispatcher[P, M]) IncrCount(producer P, i int64)
IncrCount 主动增量设置特定生产者的消息计数,这在等待异步消息完成后再关闭消息分发器时非常有用
- 如果 i 为负数,则会减少消息计数
func (*Dispatcher[P, M]) SetClosedHandler ¶
func (d *Dispatcher[P, M]) SetClosedHandler(handler func(dispatcher *Action[P, M])) *Dispatcher[P, M]
SetClosedHandler 设置消息分发器关闭时的回调函数
func (*Dispatcher[P, M]) SetProducerDoneHandler ¶
func (d *Dispatcher[P, M]) SetProducerDoneHandler(p P, handler func(p P, dispatcher *Action[P, M])) *Dispatcher[P, M]
SetProducerDoneHandler 设置特定生产者所有消息处理完成时的回调函数
- 如果 handler 为 nil,则会删除该生产者的回调函数
需要注意的是,该 handler 中
func (*Dispatcher[P, M]) Start ¶
func (d *Dispatcher[P, M]) Start() *Dispatcher[P, M]
Start 以非阻塞的方式开始进行消息分发,当消息分发器中没有任何消息并且处于驱逐计划 Expel 时,将会自动关闭
func (*Dispatcher[P, M]) Unique ¶
func (d *Dispatcher[P, M]) Unique(name string) bool
Unique 设置唯一消息键,返回是否已存在
type Handler ¶
type Handler[P Producer, M Message[P]] func(dispatcher *Dispatcher[P, M], message M)
Handler 消息处理器
type Manager ¶
Manager 消息分发器管理器
func NewManager ¶
NewManager 生成消息分发器管理器
Example ¶
mgr := dispatcher.NewManager[string, *TestMessage](10124*16, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) { // do something }) mgr.BindProducer("player_001", "shunt-001") mgr.BindProducer("player_002", "shunt-002") mgr.BindProducer("player_003", "shunt-sys") mgr.BindProducer("player_004", "shunt-sys") mgr.UnBindProducer("player_001") mgr.UnBindProducer("player_002") mgr.UnBindProducer("player_003") mgr.UnBindProducer("player_004") mgr.Wait() fmt.Println("done")
Output: done
func (*Manager[P, M]) BindProducer ¶
BindProducer 绑定生产者使用特定的消息分发器,如果生产者已经绑定了消息分发器,则会先解绑
func (*Manager[P, M]) GetDispatcher ¶
func (m *Manager[P, M]) GetDispatcher(p P) *Dispatcher[P, M]
GetDispatcher 获取生产者正在使用的消息分发器,如果生产者没有绑定消息分发器,则会返回系统消息分发器
func (*Manager[P, M]) GetDispatcherNum ¶
GetDispatcherNum 获取当前正在工作的消息分发器数量
func (*Manager[P, M]) GetSystemDispatcher ¶
func (m *Manager[P, M]) GetSystemDispatcher() *Dispatcher[P, M]
GetSystemDispatcher 获取系统消息分发器
func (*Manager[P, M]) HasDispatcher ¶
HasDispatcher 检查是否存在指定名称的消息分发器
func (*Manager[P, M]) SetDispatcherClosedHandler ¶
SetDispatcherClosedHandler 设置消息分发器关闭时的回调函数
func (*Manager[P, M]) SetDispatcherCreatedHandler ¶
SetDispatcherCreatedHandler 设置消息分发器创建时的回调函数
func (*Manager[P, M]) UnBindProducer ¶
func (m *Manager[P, M]) UnBindProducer(p P)
UnBindProducer 解绑生产者使用特定的消息分发器
type Message ¶
type Message[P comparable] interface { // GetProducer 获取消息生产者 GetProducer() P }
type Producer ¶
type Producer interface { comparable }