dispatcher

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 12, 2024 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Examples

Constants

View Source
const SystemName = "*system"

Variables

This section is empty.

Functions

This section is empty.

Types

type Action

type Action[P Producer, M Message[P]] struct {
	// contains filtered or unexported fields
}

Action 消息分发器操作器,用于暴露外部可操作的消息分发器函数

func (*Action[P, M]) Expel

func (a *Action[P, M]) Expel()

Expel 设置该消息分发器即将被驱逐,当消息分发器中没有任何消息时,会自动关闭

func (*Action[P, M]) Name

func (a *Action[P, M]) Name() string

Name 获取消息分发器名称

func (*Action[P, M]) UnExpel

func (a *Action[P, M]) UnExpel()

UnExpel 取消特定生产者的驱逐计划

type Dispatcher

type Dispatcher[P Producer, M Message[P]] struct {
	// contains filtered or unexported fields
}

Dispatcher 用于服务器消息处理的消息分发器

这个消息分发器为并发安全的生产者和消费者模型,生产者可以是任意类型,消费者必须是 Message 接口的实现。 生产者可以通过 Put 方法并发安全地将消息放入消息分发器,消息执行过程不会阻塞到 Put 方法,同时允许在 Start 方法之前调用 Put 方法。 在执行 Start 方法后,消息分发器会阻塞地从消息缓冲区中读取消息,然后执行消息处理器,消息处理器的执行过程不会阻塞到消息的生产。

为了保证消息不丢失,内部采用了 buffer.RingUnbounded 作为缓冲区实现,并且消息分发器不提供 Close 方法。 如果需要关闭消息分发器,可以通过 Expel 方法设置驱逐计划,当消息分发器中没有任何消息时,将会被释放。 同时,也可以使用 UnExpel 方法取消驱逐计划。

为什么提供 Expel 和 UnExpel 方法:

  • 在连接断开时,当需要执行一系列消息处理时,如果直接关闭消息分发器,可能会导致消息丢失。所以提供了 Expel 方法,可以在消息处理完成后再关闭消息分发器。
  • 当消息还未处理完成时连接重连,如果没有取消驱逐计划,可能会导致消息分发器被关闭。所以提供了 UnExpel 方法,可以在连接重连后取消驱逐计划。

func NewDispatcher

func NewDispatcher[P Producer, M Message[P]](bufferSize int, name string, handler Handler[P, M]) *Dispatcher[P, M]

NewDispatcher 创建一个新的消息分发器 Dispatcher 实例

Example
m := new(atomic.Int64)
fm := new(atomic.Int64)
w := new(sync.WaitGroup)
w.Add(1)
d := dispatcher.NewDispatcher(1024, "example-dispatcher", func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
	m.Add(1)
})
d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) {
	w.Done()
})
var producers = []string{"producer1", "producer2", "producer3"}
for i := 0; i < len(producers); i++ {
	p := producers[i]
	for i := 0; i < 10; i++ {
		d.Put(&TestMessage{producer: p})
	}
	d.SetProducerDoneHandler(p, func(p string, dispatcher *dispatcher.Action[string, *TestMessage]) {
		fm.Add(1)
	})
}
d.Start()
d.Expel()
w.Wait()
fmt.Println(fmt.Sprintf("producer num: %d, producer done: %d, finished: %d", len(producers), fm.Load(), m.Load()))
Output:

producer num: 3, producer done: 3, finished: 30

func (*Dispatcher[P, M]) AntiUnique

func (d *Dispatcher[P, M]) AntiUnique(name string)

AntiUnique 取消唯一消息键

func (*Dispatcher[P, M]) Closed

func (d *Dispatcher[P, M]) Closed() bool

Closed 判断消息分发器是否已关闭

func (*Dispatcher[P, M]) Expel

func (d *Dispatcher[P, M]) Expel()

Expel 设置该消息分发器即将被驱逐,当消息分发器中没有任何消息时,会自动关闭

func (*Dispatcher[P, M]) IncrCount

func (d *Dispatcher[P, M]) IncrCount(producer P, i int64)

IncrCount 主动增量设置特定生产者的消息计数,这在等待异步消息完成后再关闭消息分发器时非常有用

  • 如果 i 为负数,则会减少消息计数

func (*Dispatcher[P, M]) Name

func (d *Dispatcher[P, M]) Name() string

Name 获取消息分发器名称

func (*Dispatcher[P, M]) Put

func (d *Dispatcher[P, M]) Put(message M)

Put 将消息放入分发器

func (*Dispatcher[P, M]) SetClosedHandler

func (d *Dispatcher[P, M]) SetClosedHandler(handler func(dispatcher *Action[P, M])) *Dispatcher[P, M]

SetClosedHandler 设置消息分发器关闭时的回调函数

func (*Dispatcher[P, M]) SetProducerDoneHandler

func (d *Dispatcher[P, M]) SetProducerDoneHandler(p P, handler func(p P, dispatcher *Action[P, M])) *Dispatcher[P, M]

SetProducerDoneHandler 设置特定生产者所有消息处理完成时的回调函数

  • 如果 handler 为 nil,则会删除该生产者的回调函数

需要注意的是,该 handler 中

func (*Dispatcher[P, M]) Start

func (d *Dispatcher[P, M]) Start() *Dispatcher[P, M]

Start 以非阻塞的方式开始进行消息分发,当消息分发器中没有任何消息并且处于驱逐计划 Expel 时,将会自动关闭

func (*Dispatcher[P, M]) UnExpel

func (d *Dispatcher[P, M]) UnExpel()

UnExpel 取消特定生产者的驱逐计划

func (*Dispatcher[P, M]) Unique

func (d *Dispatcher[P, M]) Unique(name string) bool

Unique 设置唯一消息键,返回是否已存在

type Handler

type Handler[P Producer, M Message[P]] func(dispatcher *Dispatcher[P, M], message M)

Handler 消息处理器

type Manager

type Manager[P Producer, M Message[P]] struct {
	// contains filtered or unexported fields
}

Manager 消息分发器管理器

func NewManager

func NewManager[P Producer, M Message[P]](bufferSize int, handler Handler[P, M]) *Manager[P, M]

NewManager 生成消息分发器管理器

Example
mgr := dispatcher.NewManager[string, *TestMessage](10124*16, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
	// do something
})
mgr.BindProducer("player_001", "shunt-001")
mgr.BindProducer("player_002", "shunt-002")
mgr.BindProducer("player_003", "shunt-sys")
mgr.BindProducer("player_004", "shunt-sys")
mgr.UnBindProducer("player_001")
mgr.UnBindProducer("player_002")
mgr.UnBindProducer("player_003")
mgr.UnBindProducer("player_004")
mgr.Wait()
fmt.Println("done")
Output:

done

func (*Manager[P, M]) BindProducer

func (m *Manager[P, M]) BindProducer(p P, name string)

BindProducer 绑定生产者使用特定的消息分发器,如果生产者已经绑定了消息分发器,则会先解绑

func (*Manager[P, M]) GetDispatcher

func (m *Manager[P, M]) GetDispatcher(p P) *Dispatcher[P, M]

GetDispatcher 获取生产者正在使用的消息分发器,如果生产者没有绑定消息分发器,则会返回系统消息分发器

func (*Manager[P, M]) GetDispatcherNum

func (m *Manager[P, M]) GetDispatcherNum() int

GetDispatcherNum 获取当前正在工作的消息分发器数量

func (*Manager[P, M]) GetSystemDispatcher

func (m *Manager[P, M]) GetSystemDispatcher() *Dispatcher[P, M]

GetSystemDispatcher 获取系统消息分发器

func (*Manager[P, M]) HasDispatcher

func (m *Manager[P, M]) HasDispatcher(name string) bool

HasDispatcher 检查是否存在指定名称的消息分发器

func (*Manager[P, M]) SetDispatcherClosedHandler

func (m *Manager[P, M]) SetDispatcherClosedHandler(handler func(name string)) *Manager[P, M]

SetDispatcherClosedHandler 设置消息分发器关闭时的回调函数

func (*Manager[P, M]) SetDispatcherCreatedHandler

func (m *Manager[P, M]) SetDispatcherCreatedHandler(handler func(name string)) *Manager[P, M]

SetDispatcherCreatedHandler 设置消息分发器创建时的回调函数

func (*Manager[P, M]) UnBindProducer

func (m *Manager[P, M]) UnBindProducer(p P)

UnBindProducer 解绑生产者使用特定的消息分发器

func (*Manager[P, M]) Wait

func (m *Manager[P, M]) Wait()

Wait 等待所有消息分发器关闭

type Message

type Message[P comparable] interface {
	// GetProducer 获取消息生产者
	GetProducer() P
}

type Producer

type Producer interface {
	comparable
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL