writeloop

package
v0.4.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 26, 2023 License: MIT Imports: 3 Imported by: 0

README

WriteLoop

Go doc

该包提供了一个并发安全的写循环实现。开发者可以使用它来快速构建和管理写入操作。

写循环是一种特殊的循环,它可以并发安全地将数据写入到底层连接。写循环在 Minotaur 中是一个泛型类型,可以处理任意类型的消息。

Unbounded 基于无界缓冲区的写循环

基于无界缓冲区的写循环实现,它可以处理任意数量的消息。它使用 Pool 来管理消息对象,使用 Unbounded 来管理消息队列。

Unbounded 使用了 PoolUnbounded 进行实现。 通过 Pool 创建的消息对象无需手动释放,它会在写循环处理完消息后自动回收。

Channel 基于 chan 的写循环

基于 chan 的写循环实现,拥有极高的吞吐量。它使用 Pool 来管理消息对象,使用 Channel 来管理消息队列。

如何选择

  • Unbounded 适用于消息体较小、消息量庞大、消息处理时间较慢的场景,由于没有消息数量限制,所以它可以处理任意数量的消息,但是它的吞吐量较低,并且会占用大量的内存。
  • Channel 适用于消息体较大、消息量较小、消息处理时间较快的场景,由于使用了 Channel 来管理消息队列,所以它的吞吐量较高,但是它的消息数量是有限制的,如果消息数量超过了限制,那么写循环会阻塞,造成无法及时响应消息(例如心跳中断等)。

通常来说,建议使用 Channel 来实现写循环,因为它的吞吐量更高,并且绝大多数情况是很难达到阻塞的。

使用示例

该示例由 Unbounded 实现,Channel 的使用方法与之类似。

package main

import (
    "fmt"
	"github.com/kercylan98/minotaur/server/writeloop"
	"github.com/kercylan98/minotaur/utils/concurrent"
)

func main() {
	pool := concurrent.NewPool[Message](func() *Message {
		return &Message{}
	}, func(data *Message) {
		data.ID = 0
	})
	var wait sync.WaitGroup
	wait.Add(10)
	wl := writeloop.NewUnbounded(pool, func(message *Message) error {
		fmt.Println(message.ID)
		wait.Done()
		return nil
	}, func(err any) {
		fmt.Println(err)
	})

	for i := 0; i < 10; i++ {
		m := pool.Get()
		m.ID = i
		wl.Put(m)
	}

	wait.Wait()
	wl.Close()
}

在这个示例中,我们创建了一个写循环,然后将一些消息放入写循环。每个消息都会被 writeHandle 函数处理,如果在处理过程中发生错误,errorHandle 函数会被调用。在使用完写循环后,我们需要关闭它。

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Channel added in v0.4.1

type Channel[T any] struct {
	// contains filtered or unexported fields
}

Channel 基于 chan 的写循环,与 Unbounded 相同,但是使用 Channel 实现

func NewChannel added in v0.4.1

func NewChannel[Message any](pool *concurrent.Pool[Message], channelSize int, writeHandler func(message Message) error, errorHandler func(err any)) *Channel[Message]

NewChannel 创建基于 Channel 的写循环

  • pool 用于管理 Message 对象的缓冲池,在创建 Message 对象时也应该使用该缓冲池,以便复用 Message 对象。 Channel 会在写入完成后将 Message 对象放回缓冲池
  • channelSize Channel 的大小
  • writeHandler 写入处理函数
  • errorHandler 错误处理函数

传入 writeHandler 的消息对象是从 Channel 中获取的,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象

func (*Channel[T]) Close added in v0.4.1

func (slf *Channel[T]) Close()

Close 关闭写循环

func (*Channel[T]) Put added in v0.4.1

func (slf *Channel[T]) Put(message T)

Put 将数据放入写循环,message 应该来源于 concurrent.Pool

type Unbounded added in v0.4.1

type Unbounded[Message any] struct {
	// contains filtered or unexported fields
}

Unbounded 写循环

  • 用于将数据并发安全的写入到底层连接

func NewUnbounded added in v0.4.1

func NewUnbounded[Message any](pool *concurrent.Pool[Message], writeHandler func(message Message) error, errorHandler func(err any)) *Unbounded[Message]

NewUnbounded 创建写循环

  • pool 用于管理 Message 对象的缓冲池,在创建 Message 对象时也应该使用该缓冲池,以便复用 Message 对象。 Unbounded 会在写入完成后将 Message 对象放回缓冲池
  • writeHandler 写入处理函数
  • errorHandler 错误处理函数

传入 writeHandler 的消息对象是从 pool 中获取的,并且在 writeHandler 执行完成后会被放回 pool 中,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象

Example
pool := concurrent.NewPool[Message](func() *Message {
	return &Message{}
}, func(data *Message) {
	data.ID = 0
})
var wait sync.WaitGroup
wait.Add(10)
wl := writeloop.NewUnbounded(pool, func(message *Message) error {
	fmt.Println(message.ID)
	wait.Done()
	return nil
}, func(err any) {
	fmt.Println(err)
})

for i := 0; i < 10; i++ {
	m := pool.Get()
	m.ID = i
	wl.Put(m)
}

wait.Wait()
wl.Close()
Output:

0
1
2
3
4
5
6
7
8
9

func (*Unbounded[Message]) Close added in v0.4.1

func (slf *Unbounded[Message]) Close()

Close 关闭写循环

func (*Unbounded[Message]) Put added in v0.4.1

func (slf *Unbounded[Message]) Put(message Message)

Put 将数据放入写循环,message 应该来源于 concurrent.Pool

type WriteLoop

type WriteLoop[Message any] interface {
	Put(message Message)
	Close()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL