Documentation ¶
Index ¶
- Variables
- func NewNonBlockingRW[I, T comparable](id I, chanSize, bufferSize int) nexus.Queue[I, T]
- type NonBlockingRW
- func (n *NonBlockingRW[I, T]) Close()
- func (n *NonBlockingRW[I, T]) Consume() <-chan nexus.EventInfo[I, T]
- func (n *NonBlockingRW[I, T]) GetId() I
- func (n *NonBlockingRW[I, T]) GetMessageCount() (count int64)
- func (n *NonBlockingRW[I, T]) GetTopicMessageCount(topic T) int64
- func (n *NonBlockingRW[I, T]) IncrementCustomMessageCount(topic T, delta int64)
- func (n *NonBlockingRW[I, T]) IsClosed() bool
- func (n *NonBlockingRW[I, T]) IsClosing() bool
- func (n *NonBlockingRW[I, T]) IsRunning() bool
- func (n *NonBlockingRW[I, T]) Publish(topic T, event nexus.Event[I, T]) error
- func (n *NonBlockingRW[I, T]) Run()
- type NonBlockingRWState
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrorQueueClosed = errors.New("queue closed") // 队列已关闭 ErrorQueueInvalid = errors.New("queue invalid") // 无效的队列 )
Functions ¶
func NewNonBlockingRW ¶
func NewNonBlockingRW[I, T comparable](id I, chanSize, bufferSize int) nexus.Queue[I, T]
NewNonBlockingRW 创建一个并发安全的队列 NonBlockingRW,该队列支持通过 chanSize 自定义读取 channel 的大小,同支持使用 bufferSize 指定 buffer.Ring 的初始大小
- closedHandler 可选的设置队列关闭处理函数,在队列关闭后将执行该函数。此刻队列不再可用
Types ¶
type NonBlockingRW ¶
type NonBlockingRW[I, T comparable] struct { // contains filtered or unexported fields }
NonBlockingRW 队列是一个适用于消息处理等场景的并发安全的数据结构
- 该队列接收自定义的消息 M,并将消息有序的传入 Read 函数所返回的 channel 中以供处理
- 该结构主要实现目标为读写分离且并发安全的非阻塞传输队列,当消费阻塞时以牺牲内存为代价换取消息的生产不阻塞,适用于服务器消息处理等
- 该队列保证了消息的完整性,确保消息不丢失,在队列关闭后会等待所有消息处理完毕后进行关闭,并提供 SetClosedHandler 函数来监听队列的关闭信号
func (*NonBlockingRW[I, T]) Consume ¶
func (n *NonBlockingRW[I, T]) Consume() <-chan nexus.EventInfo[I, T]
Consume 获取队列消息的只读通道,
func (*NonBlockingRW[I, T]) GetMessageCount ¶
func (n *NonBlockingRW[I, T]) GetMessageCount() (count int64)
GetMessageCount 获取消息数量
func (*NonBlockingRW[I, T]) GetTopicMessageCount ¶
func (n *NonBlockingRW[I, T]) GetTopicMessageCount(topic T) int64
GetTopicMessageCount 获取特定主题的消息数量
func (*NonBlockingRW[I, T]) IncrementCustomMessageCount ¶
func (n *NonBlockingRW[I, T]) IncrementCustomMessageCount(topic T, delta int64)
func (*NonBlockingRW[I, T]) IsClosed ¶
func (n *NonBlockingRW[I, T]) IsClosed() bool
IsClosed 判断队列是否已关闭
func (*NonBlockingRW[I, T]) IsClosing ¶
func (n *NonBlockingRW[I, T]) IsClosing() bool
IsClosing 判断队列是否正在关闭
func (*NonBlockingRW[I, T]) IsRunning ¶
func (n *NonBlockingRW[I, T]) IsRunning() bool
IsRunning 判断队列是否正在运行
func (*NonBlockingRW[I, T]) Publish ¶
func (n *NonBlockingRW[I, T]) Publish(topic T, event nexus.Event[I, T]) error
func (*NonBlockingRW[I, T]) Run ¶
func (n *NonBlockingRW[I, T]) Run()
Run 阻塞的运行队列,当队列非首次运行时,将会引发来自 ErrorQueueInvalid 的 panic
type NonBlockingRWState ¶
type NonBlockingRWState = int32
const ( NonBlockingRWStatusNone NonBlockingRWState = iota - 1 // 队列未运行 NonBlockingRWStatusRunning // 队列运行中 NonBlockingRWStatusClosing // 队列关闭中 NonBlockingRWStatusClosed // 队列已关闭 )
Click to show internal directories.
Click to hide internal directories.