dispatcher

package
v0.5.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2024 License: MIT Imports: 7 Imported by: 0

README

Dispatcher

Go doc

暂无介绍...

目录导航

列出了该 package 下所有的函数及类型定义,可通过目录导航进行快捷跳转 ❤️

展开 / 折叠目录导航

包级函数定义

函数名称 描述
NewDispatcher 创建一个新的消息分发器 Dispatcher 实例
NewManager 生成消息分发器管理器

类型定义

类型 名称 描述
STRUCT Action 消息分发器操作器,用于暴露外部可操作的消息分发器函数
STRUCT Handler 消息处理器
STRUCT Dispatcher 用于服务器消息处理的消息分发器
STRUCT Manager 消息分发器管理器
INTERFACE Message 暂无描述...
INTERFACE Producer 暂无描述...

详情信息

func NewDispatcher[P Producer, M Message[P]](bufferSize int, name string, handler Handler[P, M]) *Dispatcher[P, M]

创建一个新的消息分发器 Dispatcher 实例

示例代码:


func ExampleNewDispatcher() {
	m := new(atomic.Int64)
	fm := new(atomic.Int64)
	w := new(sync.WaitGroup)
	w.Add(1)
	d := dispatcher.NewDispatcher(1024, "example-dispatcher", func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
		m.Add(1)
	})
	d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) {
		w.Done()
	})
	var producers = []string{"producer1", "producer2", "producer3"}
	for i := 0; i < len(producers); i++ {
		p := producers[i]
		for i := 0; i < 10; i++ {
			d.Put(&TestMessage{producer: p})
		}
		d.SetProducerDoneHandler(p, func(p string, dispatcher *dispatcher.Action[string, *TestMessage]) {
			fm.Add(1)
		})
	}
	d.Start()
	d.Expel()
	w.Wait()
	fmt.Println(fmt.Sprintf("producer num: %d, producer done: %d, finished: %d", len(producers), fm.Load(), m.Load()))
}

查看 / 收起单元测试

func TestNewDispatcher(t *testing.T) {
	var cases = []struct {
		name        string
		bufferSize  int
		handler     dispatcher.Handler[string, *TestMessage]
		shouldPanic bool
	}{{name: "TestNewDispatcher_BufferSize0AndHandlerNil", bufferSize: 0, handler: nil, shouldPanic: true}, {name: "TestNewDispatcher_BufferSize0AndHandlerNotNil", bufferSize: 0, handler: func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
	}, shouldPanic: true}, {name: "TestNewDispatcher_BufferSize1AndHandlerNil", bufferSize: 1, handler: nil, shouldPanic: true}, {name: "TestNewDispatcher_BufferSize1AndHandlerNotNil", bufferSize: 1, handler: func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
	}, shouldPanic: false}}
	for _, c := range cases {
		c := c
		t.Run(c.name, func(t *testing.T) {
			defer func() {
				if r := recover(); r != nil && !c.shouldPanic {
					t.Errorf("NewDispatcher() should not panic, but panic: %v", r)
				}
			}()
			dispatcher.NewDispatcher(c.bufferSize, c.name, c.handler)
		})
	}
}


func NewManager[P Producer, M Message[P]](bufferSize int, handler Handler[P, M]) *Manager[P, M]

生成消息分发器管理器

示例代码:


func ExampleNewManager() {
	mgr := dispatcher.NewManager[string, *TestMessage](10124*16, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
	})
	mgr.BindProducer("player_001", "shunt-001")
	mgr.BindProducer("player_002", "shunt-002")
	mgr.BindProducer("player_003", "shunt-sys")
	mgr.BindProducer("player_004", "shunt-sys")
	mgr.UnBindProducer("player_001")
	mgr.UnBindProducer("player_002")
	mgr.UnBindProducer("player_003")
	mgr.UnBindProducer("player_004")
	mgr.Wait()
	fmt.Println("done")
}

查看 / 收起单元测试

func TestNewManager(t *testing.T) {
	var cases = []struct {
		name        string
		bufferSize  int
		handler     dispatcher.Handler[string, *TestMessage]
		shouldPanic bool
	}{{name: "TestNewManager_BufferSize0AndHandlerNil", bufferSize: 0, handler: nil, shouldPanic: true}, {name: "TestNewManager_BufferSize0AndHandlerNotNil", bufferSize: 0, handler: func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
	}, shouldPanic: true}, {name: "TestNewManager_BufferSize1AndHandlerNil", bufferSize: 1, handler: nil, shouldPanic: true}, {name: "TestNewManager_BufferSize1AndHandlerNotNil", bufferSize: 1, handler: func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
	}, shouldPanic: false}}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			defer func() {
				if r := recover(); r != nil && !c.shouldPanic {
					t.Errorf("NewManager() should not panic, but panic: %v", r)
				}
			}()
			dispatcher.NewManager[string, *TestMessage](c.bufferSize, c.handler)
		})
	}
}


Action STRUCT

消息分发器操作器,用于暴露外部可操作的消息分发器函数

type Action[P Producer, M Message[P]] struct {
	unlock bool
	d      *Dispatcher[P, M]
}

func (*Action) Name() string

获取消息分发器名称


func (*Action) UnExpel()

取消特定生产者的驱逐计划


func (*Action) Expel()

设置该消息分发器即将被驱逐,当消息分发器中没有任何消息时,会自动关闭


Handler STRUCT

消息处理器

type Handler[P Producer, M Message[P]] func(dispatcher *Dispatcher[P, M], message M)

Dispatcher STRUCT

用于服务器消息处理的消息分发器

这个消息分发器为并发安全的生产者和消费者模型,生产者可以是任意类型,消费者必须是 Message 接口的实现。 生产者可以通过 Put 方法并发安全地将消息放入消息分发器,消息执行过程不会阻塞到 Put 方法,同时允许在 Start 方法之前调用 Put 方法。 在执行 Start 方法后,消息分发器会阻塞地从消息缓冲区中读取消息,然后执行消息处理器,消息处理器的执行过程不会阻塞到消息的生产。

为了保证消息不丢失,内部采用了 buffer.RingUnbounded 作为缓冲区实现,并且消息分发器不提供 Close 方法。 如果需要关闭消息分发器,可以通过 Expel 方法设置驱逐计划,当消息分发器中没有任何消息时,将会被释放。 同时,也可以使用 UnExpel 方法取消驱逐计划。

为什么提供 Expel 和 UnExpel 方法:

  • 在连接断开时,当需要执行一系列消息处理时,如果直接关闭消息分发器,可能会导致消息丢失。所以提供了 Expel 方法,可以在消息处理完成后再关闭消息分发器。
  • 当消息还未处理完成时连接重连,如果没有取消驱逐计划,可能会导致消息分发器被关闭。所以提供了 UnExpel 方法,可以在连接重连后取消驱逐计划。
type Dispatcher[P Producer, M Message[P]] struct {
	buf           *buffer.RingUnbounded[M]
	uniques       *haxmap.Map[string, struct{}]
	handler       Handler[P, M]
	expel         bool
	mc            int64
	pmc           map[P]int64
	pmcF          map[P]func(p P, dispatcher *Action[P, M])
	lock          sync.RWMutex
	name          string
	closedHandler atomic.Pointer[func(dispatcher *Action[P, M])]
	abort         chan struct{}
}

func (*Dispatcher) SetProducerDoneHandler(p P, handler func (p P, dispatcher *Action[P, M])) *Dispatcher[P, M]

设置特定生产者的所有消息处理完成时的回调函数

  • 如果 handler 为 nil,则会删除该生产者的回调函数
查看 / 收起单元测试

func TestDispatcher_SetProducerDoneHandler(t *testing.T) {
	var cases = []struct {
		name          string
		producer      string
		messageFinish *atomic.Bool
		cancel        bool
	}{{name: "TestDispatcher_SetProducerDoneHandlerNotCancel", producer: "producer", cancel: false}, {name: "TestDispatcher_SetProducerDoneHandlerCancel", producer: "producer", cancel: true}}
	for _, c := range cases {
		c := c
		t.Run(c.name, func(t *testing.T) {
			c.messageFinish = &atomic.Bool{}
			w := new(sync.WaitGroup)
			d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
				w.Done()
			})
			d.Put(&TestMessage{producer: c.producer})
			d.SetProducerDoneHandler(c.producer, func(p string, dispatcher *dispatcher.Action[string, *TestMessage]) {
				c.messageFinish.Store(true)
			})
			if c.cancel {
				d.SetProducerDoneHandler(c.producer, nil)
			}
			w.Add(1)
			d.Start()
			w.Wait()
			if c.cancel && c.messageFinish.Load() {
				t.Errorf("%s should cancel, but not", c.name)
			}
		})
	}
}


func (*Dispatcher) SetClosedHandler(handler func (dispatcher *Action[P, M])) *Dispatcher[P, M]

设置消息分发器关闭时的回调函数

查看 / 收起单元测试

func TestDispatcher_SetClosedHandler(t *testing.T) {
	var cases = []struct {
		name                  string
		handlerFinishMsgCount *atomic.Int64
		msgTime               time.Duration
		msgCount              int
	}{{name: "TestDispatcher_SetClosedHandler_Normal", msgTime: 0, msgCount: 1}, {name: "TestDispatcher_SetClosedHandler_MessageCount1024", msgTime: 0, msgCount: 1024}, {name: "TestDispatcher_SetClosedHandler_MessageTime1sMessageCount3", msgTime: 1 * time.Second, msgCount: 3}}
	for _, c := range cases {
		c := c
		t.Run(c.name, func(t *testing.T) {
			c.handlerFinishMsgCount = &atomic.Int64{}
			w := new(sync.WaitGroup)
			d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
				time.Sleep(c.msgTime)
				c.handlerFinishMsgCount.Add(1)
			})
			d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) {
				w.Done()
			})
			for i := 0; i < c.msgCount; i++ {
				d.Put(&TestMessage{producer: "producer"})
			}
			w.Add(1)
			d.Start()
			d.Expel()
			w.Wait()
			if c.handlerFinishMsgCount.Load() != int64(c.msgCount) {
				t.Errorf("%s should finish %d messages, but finish %d", c.name, c.msgCount, c.handlerFinishMsgCount.Load())
			}
		})
	}
}


func (*Dispatcher) Name() string

获取消息分发器名称

查看 / 收起单元测试

func TestDispatcher_Name(t *testing.T) {
	var cases = []struct{ name string }{{name: "TestDispatcher_Name_Normal"}}
	for _, c := range cases {
		c := c
		t.Run(c.name, func(t *testing.T) {
			d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
			})
			if d.Name() != c.name {
				t.Errorf("%s should equal %s, but not", c.name, c.name)
			}
		})
	}
}


func (*Dispatcher) Unique(name string) bool

设置唯一消息键,返回是否已存在


func (*Dispatcher) AntiUnique(name string)

取消唯一消息键


func (*Dispatcher) Expel()

设置该消息分发器即将被驱逐,当消息分发器中没有任何消息时,会自动关闭

查看 / 收起单元测试

func TestDispatcher_Expel(t *testing.T) {
	var cases = []struct {
		name                  string
		handlerFinishMsgCount *atomic.Int64
		msgTime               time.Duration
		msgCount              int
	}{{name: "TestDispatcher_Expel_Normal", msgTime: 0, msgCount: 1}, {name: "TestDispatcher_Expel_MessageCount1024", msgTime: 0, msgCount: 1024}, {name: "TestDispatcher_Expel_MessageTime1sMessageCount3", msgTime: 1 * time.Second, msgCount: 3}}
	for _, c := range cases {
		c := c
		t.Run(c.name, func(t *testing.T) {
			c.handlerFinishMsgCount = &atomic.Int64{}
			w := new(sync.WaitGroup)
			d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
				time.Sleep(c.msgTime)
				c.handlerFinishMsgCount.Add(1)
			})
			d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) {
				w.Done()
			})
			for i := 0; i < c.msgCount; i++ {
				d.Put(&TestMessage{producer: "producer"})
			}
			w.Add(1)
			d.Start()
			d.Expel()
			w.Wait()
			if c.handlerFinishMsgCount.Load() != int64(c.msgCount) {
				t.Errorf("%s should finish %d messages, but finish %d", c.name, c.msgCount, c.handlerFinishMsgCount.Load())
			}
		})
	}
}


func (*Dispatcher) UnExpel()

取消特定生产者的驱逐计划

查看 / 收起单元测试

func TestDispatcher_UnExpel(t *testing.T) {
	var cases = []struct {
		name      string
		closed    *atomic.Bool
		isUnExpel bool
		expect    bool
	}{{name: "TestDispatcher_UnExpel_Normal", isUnExpel: true, expect: false}, {name: "TestDispatcher_UnExpel_NotExpel", isUnExpel: false, expect: true}}
	for _, c := range cases {
		c := c
		t.Run(c.name, func(t *testing.T) {
			c.closed = &atomic.Bool{}
			w := new(sync.WaitGroup)
			d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
				w.Done()
			})
			d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) {
				c.closed.Store(true)
			})
			d.Put(&TestMessage{producer: "producer"})
			w.Add(1)
			if c.isUnExpel {
				d.Expel()
				d.UnExpel()
			} else {
				d.Expel()
			}
			d.Start()
			w.Wait()
			if c.closed.Load() != c.expect {
				t.Errorf("%s should %v, but %v", c.name, c.expect, c.closed.Load())
			}
		})
	}
}


func (*Dispatcher) IncrCount(producer P, i int64)

主动增量设置特定生产者的消息计数,这在等待异步消息完成后再关闭消息分发器时非常有用

  • 如果 i 为负数,则会减少消息计数

func (*Dispatcher) Put(message M)

将消息放入分发器

查看 / 收起单元测试

func TestDispatcher_Put(t *testing.T) {
	var cases = []struct {
		name        string
		producer    string
		messageDone *atomic.Bool
	}{{name: "TestDispatcher_Put_Normal", producer: "producer"}}
	for _, c := range cases {
		c := c
		t.Run(c.name, func(t *testing.T) {
			c.messageDone = &atomic.Bool{}
			w := new(sync.WaitGroup)
			w.Add(1)
			d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
				c.messageDone.Store(true)
				w.Done()
			})
			d.Start()
			d.Put(&TestMessage{producer: c.producer})
			d.Expel()
			w.Wait()
			if !c.messageDone.Load() {
				t.Errorf("%s should done, but not", c.name)
			}
		})
	}
}


func (*Dispatcher) Start() *Dispatcher[P, M]

以非阻塞的方式开始进行消息分发,当消息分发器中没有任何消息并且处于驱逐计划 Expel 时,将会自动关闭

查看 / 收起单元测试

func TestDispatcher_Start(t *testing.T) {
	var cases = []struct {
		name        string
		producer    string
		messageDone *atomic.Bool
	}{{name: "TestDispatcher_Start_Normal", producer: "producer"}}
	for _, c := range cases {
		c := c
		t.Run(c.name, func(t *testing.T) {
			c.messageDone = &atomic.Bool{}
			w := new(sync.WaitGroup)
			w.Add(1)
			d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
				c.messageDone.Store(true)
				w.Done()
			})
			d.Start()
			d.Put(&TestMessage{producer: c.producer})
			d.Expel()
			w.Wait()
			if !c.messageDone.Load() {
				t.Errorf("%s should done, but not", c.name)
			}
		})
	}
}


func (*Dispatcher) Closed() bool

判断消息分发器是否已关闭

查看 / 收起单元测试

func TestDispatcher_Closed(t *testing.T) {
	var cases = []struct{ name string }{{name: "TestDispatcher_Closed_Normal"}}
	for _, c := range cases {
		c := c
		t.Run(c.name, func(t *testing.T) {
			w := new(sync.WaitGroup)
			w.Add(1)
			d := dispatcher.NewDispatcher(1024, c.name, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
			})
			d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) {
				w.Done()
			})
			d.Start()
			d.Expel()
			w.Wait()
			if !d.Closed() {
				t.Errorf("%s should closed, but not", c.name)
			}
		})
	}
}


Manager STRUCT

消息分发器管理器

type Manager[P Producer, M Message[P]] struct {
	handler        Handler[P, M]
	sys            *Dispatcher[P, M]
	dispatchers    map[string]*Dispatcher[P, M]
	member         map[string]map[P]struct{}
	curr           map[P]*Dispatcher[P, M]
	lock           sync.RWMutex
	w              sync.WaitGroup
	size           int
	closedHandler  func(name string)
	createdHandler func(name string)
}

func (*Manager) Wait()

等待所有消息分发器关闭


func (*Manager) SetDispatcherClosedHandler(handler func (name string)) *Manager[P, M]

设置消息分发器关闭时的回调函数

查看 / 收起单元测试

func TestManager_SetDispatcherClosedHandler(t *testing.T) {
	var cases = []struct {
		name            string
		setCloseHandler bool
	}{{name: "TestManager_SetDispatcherClosedHandler_Set", setCloseHandler: true}, {name: "TestManager_SetDispatcherClosedHandler_NotSet", setCloseHandler: false}}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			var closed atomic.Bool
			m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
			})
			if c.setCloseHandler {
				m.SetDispatcherClosedHandler(func(name string) {
					closed.Store(true)
				})
			}
			m.BindProducer(c.name, c.name)
			m.UnBindProducer(c.name)
			m.Wait()
			if c.setCloseHandler && !closed.Load() {
				t.Errorf("SetDispatcherClosedHandler() should be called")
			}
		})
	}
}


func (*Manager) SetDispatcherCreatedHandler(handler func (name string)) *Manager[P, M]

设置消息分发器创建时的回调函数

查看 / 收起单元测试

func TestManager_SetDispatcherCreatedHandler(t *testing.T) {
	var cases = []struct {
		name              string
		setCreatedHandler bool
	}{{name: "TestManager_SetDispatcherCreatedHandler_Set", setCreatedHandler: true}, {name: "TestManager_SetDispatcherCreatedHandler_NotSet", setCreatedHandler: false}}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			var created atomic.Bool
			m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
			})
			if c.setCreatedHandler {
				m.SetDispatcherCreatedHandler(func(name string) {
					created.Store(true)
				})
			}
			m.BindProducer(c.name, c.name)
			m.UnBindProducer(c.name)
			m.Wait()
			if c.setCreatedHandler && !created.Load() {
				t.Errorf("SetDispatcherCreatedHandler() should be called")
			}
		})
	}
}


func (*Manager) HasDispatcher(name string) bool

检查是否存在指定名称的消息分发器

查看 / 收起单元测试

func TestManager_HasDispatcher(t *testing.T) {
	var cases = []struct {
		name     string
		bindName string
		has      bool
	}{{name: "TestManager_HasDispatcher_Has", bindName: "TestManager_HasDispatcher_Has", has: true}, {name: "TestManager_HasDispatcher_NotHas", bindName: "TestManager_HasDispatcher_NotHas", has: false}}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
			})
			m.BindProducer(c.bindName, c.bindName)
			var cond string
			if c.has {
				cond = c.bindName
			}
			if m.HasDispatcher(cond) != c.has {
				t.Errorf("HasDispatcher() should return %v", c.has)
			}
		})
	}
}


func (*Manager) GetDispatcherNum() int

获取当前正在工作的消息分发器数量

查看 / 收起单元测试

func TestManager_GetDispatcherNum(t *testing.T) {
	var cases = []struct {
		name string
		num  int
	}{{name: "TestManager_GetDispatcherNum_N1", num: -1}, {name: "TestManager_GetDispatcherNum_0", num: 0}, {name: "TestManager_GetDispatcherNum_1", num: 1}, {name: "TestManager_GetDispatcherNum_2", num: 2}}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
			})
			switch {
			case c.num <= 0:
				return
			case c.num == 1:
				if m.GetDispatcherNum() != 1 {
					t.Errorf("GetDispatcherNum() should return 1")
				}
				return
			default:
				for i := 0; i < c.num-1; i++ {
					m.BindProducer(fmt.Sprintf("%s_%d", c.name, i), fmt.Sprintf("%s_%d", c.name, i))
				}
				if m.GetDispatcherNum() != c.num {
					t.Errorf("GetDispatcherNum() should return %v", c.num)
				}
			}
		})
	}
}


func (*Manager) GetSystemDispatcher() *Dispatcher[P, M]

获取系统消息分发器

查看 / 收起单元测试

func TestManager_GetSystemDispatcher(t *testing.T) {
	var cases = []struct{ name string }{{name: "TestManager_GetSystemDispatcher"}}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
			})
			if m.GetSystemDispatcher() == nil {
				t.Errorf("GetSystemDispatcher() should not return nil")
			}
		})
	}
}


func (*Manager) GetDispatcher(p P) *Dispatcher[P, M]

获取生产者正在使用的消息分发器,如果生产者没有绑定消息分发器,则会返回系统消息分发器

查看 / 收起单元测试

func TestManager_GetDispatcher(t *testing.T) {
	var cases = []struct {
		name     string
		bindName string
	}{{name: "TestManager_GetDispatcher", bindName: "TestManager_GetDispatcher"}}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
			})
			m.BindProducer(c.bindName, c.bindName)
			if m.GetDispatcher(c.bindName) == nil {
				t.Errorf("GetDispatcher() should not return nil")
			}
		})
	}
}


func (*Manager) BindProducer(p P, name string)

绑定生产者使用特定的消息分发器,如果生产者已经绑定了消息分发器,则会先解绑

查看 / 收起单元测试

func TestManager_BindProducer(t *testing.T) {
	var cases = []struct {
		name     string
		bindName string
	}{{name: "TestManager_BindProducer", bindName: "TestManager_BindProducer"}}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
			})
			m.BindProducer(c.bindName, c.bindName)
			if m.GetDispatcher(c.bindName) == nil {
				t.Errorf("GetDispatcher() should not return nil")
			}
		})
	}
}


func (*Manager) UnBindProducer(p P)

解绑生产者使用特定的消息分发器

查看 / 收起单元测试

func TestManager_UnBindProducer(t *testing.T) {
	var cases = []struct {
		name     string
		bindName string
	}{{name: "TestManager_UnBindProducer", bindName: "TestManager_UnBindProducer"}}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			m := dispatcher.NewManager[string, *TestMessage](1024, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
			})
			m.BindProducer(c.bindName, c.bindName)
			m.UnBindProducer(c.bindName)
			if m.GetDispatcher(c.bindName) != m.GetSystemDispatcher() {
				t.Errorf("GetDispatcher() should return SystemDispatcher")
			}
		})
	}
}


Message INTERFACE
type Message[P comparable] interface {
	GetProducer() P
}

Producer INTERFACE
type Producer interface {
	comparable
}

Documentation

Index

Examples

Constants

View Source
const SystemName = "*system"

Variables

This section is empty.

Functions

This section is empty.

Types

type Action

type Action[P Producer, M Message[P]] struct {
	// contains filtered or unexported fields
}

Action 消息分发器操作器,用于暴露外部可操作的消息分发器函数

func (*Action[P, M]) Expel

func (a *Action[P, M]) Expel()

Expel 设置该消息分发器即将被驱逐,当消息分发器中没有任何消息时,会自动关闭

func (*Action[P, M]) Name

func (a *Action[P, M]) Name() string

Name 获取消息分发器名称

func (*Action[P, M]) UnExpel

func (a *Action[P, M]) UnExpel()

UnExpel 取消特定生产者的驱逐计划

type Dispatcher

type Dispatcher[P Producer, M Message[P]] struct {
	// contains filtered or unexported fields
}

Dispatcher 用于服务器消息处理的消息分发器

这个消息分发器为并发安全的生产者和消费者模型,生产者可以是任意类型,消费者必须是 Message 接口的实现。 生产者可以通过 Put 方法并发安全地将消息放入消息分发器,消息执行过程不会阻塞到 Put 方法,同时允许在 Start 方法之前调用 Put 方法。 在执行 Start 方法后,消息分发器会阻塞地从消息缓冲区中读取消息,然后执行消息处理器,消息处理器的执行过程不会阻塞到消息的生产。

为了保证消息不丢失,内部采用了 buffer.RingUnbounded 作为缓冲区实现,并且消息分发器不提供 Close 方法。 如果需要关闭消息分发器,可以通过 Expel 方法设置驱逐计划,当消息分发器中没有任何消息时,将会被释放。 同时,也可以使用 UnExpel 方法取消驱逐计划。

为什么提供 Expel 和 UnExpel 方法:

  • 在连接断开时,当需要执行一系列消息处理时,如果直接关闭消息分发器,可能会导致消息丢失。所以提供了 Expel 方法,可以在消息处理完成后再关闭消息分发器。
  • 当消息还未处理完成时连接重连,如果没有取消驱逐计划,可能会导致消息分发器被关闭。所以提供了 UnExpel 方法,可以在连接重连后取消驱逐计划。

func NewDispatcher

func NewDispatcher[P Producer, M Message[P]](bufferSize int, name string, handler Handler[P, M]) *Dispatcher[P, M]

NewDispatcher 创建一个新的消息分发器 Dispatcher 实例

Example
m := new(atomic.Int64)
fm := new(atomic.Int64)
w := new(sync.WaitGroup)
w.Add(1)
d := dispatcher.NewDispatcher(1024, "example-dispatcher", func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
	m.Add(1)
})
d.SetClosedHandler(func(dispatcher *dispatcher.Action[string, *TestMessage]) {
	w.Done()
})
var producers = []string{"producer1", "producer2", "producer3"}
for i := 0; i < len(producers); i++ {
	p := producers[i]
	for i := 0; i < 10; i++ {
		d.Put(&TestMessage{producer: p})
	}
	d.SetProducerDoneHandler(p, func(p string, dispatcher *dispatcher.Action[string, *TestMessage]) {
		fm.Add(1)
	})
}
d.Start()
d.Expel()
w.Wait()
fmt.Println(fmt.Sprintf("producer num: %d, producer done: %d, finished: %d", len(producers), fm.Load(), m.Load()))
Output:

producer num: 3, producer done: 3, finished: 30

func (*Dispatcher[P, M]) AntiUnique

func (d *Dispatcher[P, M]) AntiUnique(name string)

AntiUnique 取消唯一消息键

func (*Dispatcher[P, M]) Closed

func (d *Dispatcher[P, M]) Closed() bool

Closed 判断消息分发器是否已关闭

func (*Dispatcher[P, M]) Expel

func (d *Dispatcher[P, M]) Expel()

Expel 设置该消息分发器即将被驱逐,当消息分发器中没有任何消息时,会自动关闭

func (*Dispatcher[P, M]) IncrCount

func (d *Dispatcher[P, M]) IncrCount(producer P, i int64)

IncrCount 主动增量设置特定生产者的消息计数,这在等待异步消息完成后再关闭消息分发器时非常有用

  • 如果 i 为负数,则会减少消息计数

func (*Dispatcher[P, M]) Name

func (d *Dispatcher[P, M]) Name() string

Name 获取消息分发器名称

func (*Dispatcher[P, M]) Put

func (d *Dispatcher[P, M]) Put(message M)

Put 将消息放入分发器

func (*Dispatcher[P, M]) SetClosedHandler

func (d *Dispatcher[P, M]) SetClosedHandler(handler func(dispatcher *Action[P, M])) *Dispatcher[P, M]

SetClosedHandler 设置消息分发器关闭时的回调函数

func (*Dispatcher[P, M]) SetProducerDoneHandler

func (d *Dispatcher[P, M]) SetProducerDoneHandler(p P, handler func(p P, dispatcher *Action[P, M])) *Dispatcher[P, M]

SetProducerDoneHandler 设置特定生产者的所有消息处理完成时的回调函数

  • 如果 handler 为 nil,则会删除该生产者的回调函数

func (*Dispatcher[P, M]) Start

func (d *Dispatcher[P, M]) Start() *Dispatcher[P, M]

Start 以非阻塞的方式开始进行消息分发,当消息分发器中没有任何消息并且处于驱逐计划 Expel 时,将会自动关闭

func (*Dispatcher[P, M]) UnExpel

func (d *Dispatcher[P, M]) UnExpel()

UnExpel 取消特定生产者的驱逐计划

func (*Dispatcher[P, M]) Unique

func (d *Dispatcher[P, M]) Unique(name string) bool

Unique 设置唯一消息键,返回是否已存在

type Handler

type Handler[P Producer, M Message[P]] func(dispatcher *Dispatcher[P, M], message M)

Handler 消息处理器

type Manager

type Manager[P Producer, M Message[P]] struct {
	// contains filtered or unexported fields
}

Manager 消息分发器管理器

func NewManager

func NewManager[P Producer, M Message[P]](bufferSize int, handler Handler[P, M]) *Manager[P, M]

NewManager 生成消息分发器管理器

Example
mgr := dispatcher.NewManager[string, *TestMessage](10124*16, func(dispatcher *dispatcher.Dispatcher[string, *TestMessage], message *TestMessage) {
	// do something
})
mgr.BindProducer("player_001", "shunt-001")
mgr.BindProducer("player_002", "shunt-002")
mgr.BindProducer("player_003", "shunt-sys")
mgr.BindProducer("player_004", "shunt-sys")
mgr.UnBindProducer("player_001")
mgr.UnBindProducer("player_002")
mgr.UnBindProducer("player_003")
mgr.UnBindProducer("player_004")
mgr.Wait()
fmt.Println("done")
Output:

done

func (*Manager[P, M]) BindProducer

func (m *Manager[P, M]) BindProducer(p P, name string)

BindProducer 绑定生产者使用特定的消息分发器,如果生产者已经绑定了消息分发器,则会先解绑

func (*Manager[P, M]) GetDispatcher

func (m *Manager[P, M]) GetDispatcher(p P) *Dispatcher[P, M]

GetDispatcher 获取生产者正在使用的消息分发器,如果生产者没有绑定消息分发器,则会返回系统消息分发器

func (*Manager[P, M]) GetDispatcherNum

func (m *Manager[P, M]) GetDispatcherNum() int

GetDispatcherNum 获取当前正在工作的消息分发器数量

func (*Manager[P, M]) GetSystemDispatcher

func (m *Manager[P, M]) GetSystemDispatcher() *Dispatcher[P, M]

GetSystemDispatcher 获取系统消息分发器

func (*Manager[P, M]) HasDispatcher

func (m *Manager[P, M]) HasDispatcher(name string) bool

HasDispatcher 检查是否存在指定名称的消息分发器

func (*Manager[P, M]) SetDispatcherClosedHandler

func (m *Manager[P, M]) SetDispatcherClosedHandler(handler func(name string)) *Manager[P, M]

SetDispatcherClosedHandler 设置消息分发器关闭时的回调函数

func (*Manager[P, M]) SetDispatcherCreatedHandler

func (m *Manager[P, M]) SetDispatcherCreatedHandler(handler func(name string)) *Manager[P, M]

SetDispatcherCreatedHandler 设置消息分发器创建时的回调函数

func (*Manager[P, M]) UnBindProducer

func (m *Manager[P, M]) UnBindProducer(p P)

UnBindProducer 解绑生产者使用特定的消息分发器

func (*Manager[P, M]) Wait

func (m *Manager[P, M]) Wait()

Wait 等待所有消息分发器关闭

type Message

type Message[P comparable] interface {
	// GetProducer 获取消息生产者
	GetProducer() P
}

type Producer

type Producer interface {
	comparable
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL