Documentation ¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Channel ¶ added in v0.4.1
type Channel[T any] struct { // contains filtered or unexported fields }
Channel 基于 chan 的写循环,与 Unbounded 相同,但是使用 Channel 实现
func NewChannel ¶ added in v0.4.1
func NewChannel[Message any](pool *concurrent.Pool[Message], channelSize int, writeHandler func(message Message) error, errorHandler func(err any)) *Channel[Message]
NewChannel 创建基于 Channel 的写循环
- pool 用于管理 Message 对象的缓冲池,在创建 Message 对象时也应该使用该缓冲池,以便复用 Message 对象。 Channel 会在写入完成后将 Message 对象放回缓冲池
- channelSize Channel 的大小
- writeHandler 写入处理函数
- errorHandler 错误处理函数
传入 writeHandler 的消息对象是从 Channel 中获取的,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象
type Unbounded ¶ added in v0.4.1
type Unbounded[Message any] struct { // contains filtered or unexported fields }
Unbounded 写循环
- 用于将数据并发安全的写入到底层连接
func NewUnbounded ¶ added in v0.4.1
func NewUnbounded[Message any](pool *concurrent.Pool[Message], writeHandler func(message Message) error, errorHandler func(err any)) *Unbounded[Message]
NewUnbounded 创建写循环
- pool 用于管理 Message 对象的缓冲池,在创建 Message 对象时也应该使用该缓冲池,以便复用 Message 对象。 Unbounded 会在写入完成后将 Message 对象放回缓冲池
- writeHandler 写入处理函数
- errorHandler 错误处理函数
传入 writeHandler 的消息对象是从 pool 中获取的,并且在 writeHandler 执行完成后会被放回 pool 中,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象
Example ¶
pool := concurrent.NewPool[Message](func() *Message { return &Message{} }, func(data *Message) { data.ID = 0 }) var wait sync.WaitGroup wait.Add(10) wl := writeloop.NewUnbounded(pool, func(message *Message) error { fmt.Println(message.ID) wait.Done() return nil }, func(err any) { fmt.Println(err) }) for i := 0; i < 10; i++ { m := pool.Get() m.ID = i wl.Put(m) } wait.Wait() wl.Close()
Output: 0 1 2 3 4 5 6 7 8 9
Click to show internal directories.
Click to hide internal directories.