queues

package
v0.5.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2024 License: MIT Imports: 6 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrorQueueClosed  = errors.New("queue closed")  // 队列已关闭
	ErrorQueueInvalid = errors.New("queue invalid") // 无效的队列
)

Functions

func NewNonBlockingRW

func NewNonBlockingRW[I, T comparable](id I, chanSize, bufferSize int) nexus.Queue[I, T]

NewNonBlockingRW 创建一个并发安全的队列 NonBlockingRW,该队列支持通过 chanSize 自定义读取 channel 的大小,同支持使用 bufferSize 指定 buffer.Ring 的初始大小

  • closedHandler 可选的设置队列关闭处理函数,在队列关闭后将执行该函数。此刻队列不再可用

Types

type NonBlockingRW

type NonBlockingRW[I, T comparable] struct {
	// contains filtered or unexported fields
}

NonBlockingRW 队列是一个适用于消息处理等场景的并发安全的数据结构

  • 该队列接收自定义的消息 M,并将消息有序的传入 Read 函数所返回的 channel 中以供处理
  • 该结构主要实现目标为读写分离且并发安全的非阻塞传输队列,当消费阻塞时以牺牲内存为代价换取消息的生产不阻塞,适用于服务器消息处理等
  • 该队列保证了消息的完整性,确保消息不丢失,在队列关闭后会等待所有消息处理完毕后进行关闭,并提供 SetClosedHandler 函数来监听队列的关闭信号

func (*NonBlockingRW[I, T]) Close

func (n *NonBlockingRW[I, T]) Close()

Close 关闭队列

func (*NonBlockingRW[I, T]) Consume

func (n *NonBlockingRW[I, T]) Consume() <-chan nexus.EventInfo[I, T]

Consume 获取队列消息的只读通道,

func (*NonBlockingRW[I, T]) GetId

func (n *NonBlockingRW[I, T]) GetId() I

GetId 获取队列 Id

func (*NonBlockingRW[I, T]) GetMessageCount

func (n *NonBlockingRW[I, T]) GetMessageCount() (count int64)

GetMessageCount 获取消息数量

func (*NonBlockingRW[I, T]) GetTopicMessageCount

func (n *NonBlockingRW[I, T]) GetTopicMessageCount(topic T) int64

GetTopicMessageCount 获取特定主题的消息数量

func (*NonBlockingRW[I, T]) IncrementCustomMessageCount

func (n *NonBlockingRW[I, T]) IncrementCustomMessageCount(topic T, delta int64)

func (*NonBlockingRW[I, T]) IsClosed

func (n *NonBlockingRW[I, T]) IsClosed() bool

IsClosed 判断队列是否已关闭

func (*NonBlockingRW[I, T]) IsClosing

func (n *NonBlockingRW[I, T]) IsClosing() bool

IsClosing 判断队列是否正在关闭

func (*NonBlockingRW[I, T]) IsRunning

func (n *NonBlockingRW[I, T]) IsRunning() bool

IsRunning 判断队列是否正在运行

func (*NonBlockingRW[I, T]) Publish

func (n *NonBlockingRW[I, T]) Publish(topic T, event nexus.Event[I, T]) error

func (*NonBlockingRW[I, T]) Run

func (n *NonBlockingRW[I, T]) Run()

Run 阻塞的运行队列,当队列非首次运行时,将会引发来自 ErrorQueueInvalid 的 panic

type NonBlockingRWState

type NonBlockingRWState = int32
const (
	NonBlockingRWStatusNone    NonBlockingRWState = iota - 1 // 队列未运行
	NonBlockingRWStatusRunning                               // 队列运行中
	NonBlockingRWStatusClosing                               // 队列关闭中
	NonBlockingRWStatusClosed                                // 队列已关闭
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL