writeloop

package
v0.5.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2024 License: MIT Imports: 3 Imported by: 0

README

Writeloop

Go doc

暂无介绍...

目录导航

列出了该 package 下所有的函数及类型定义,可通过目录导航进行快捷跳转 ❤️

展开 / 折叠目录导航

包级函数定义

函数名称 描述
NewChannel 创建基于 Channel 的写循环
NewUnbounded 创建写循环

类型定义

类型 名称 描述
STRUCT Channel 基于 chan 的写循环,与 Unbounded 相同,但是使用 Channel 实现
STRUCT Unbounded 写循环
INTERFACE WriteLoop 暂无描述...

详情信息

func NewChannel[Message any](pool *hub.ObjectPool[Message], channelSize int, writeHandler func (message Message) error, errorHandler func (err any)) *Channel[Message]

创建基于 Channel 的写循环

  • pool 用于管理 Message 对象的缓冲池,在创建 Message 对象时也应该使用该缓冲池,以便复用 Message 对象。 Channel 会在写入完成后将 Message 对象放回缓冲池
  • channelSize Channel 的大小
  • writeHandler 写入处理函数
  • errorHandler 错误处理函数

传入 writeHandler 的消息对象是从 Channel 中获取的,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象


func NewUnbounded[Message any](pool *hub.ObjectPool[Message], writeHandler func (message Message) error, errorHandler func (err any)) *Unbounded[Message]

创建写循环

  • pool 用于管理 Message 对象的缓冲池,在创建 Message 对象时也应该使用该缓冲池,以便复用 Message 对象。 Unbounded 会在写入完成后将 Message 对象放回缓冲池
  • writeHandler 写入处理函数
  • errorHandler 错误处理函数

传入 writeHandler 的消息对象是从 pool 中获取的,并且在 writeHandler 执行完成后会被放回 pool 中,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象

示例代码:


func ExampleNewUnbounded() {
	pool := hub.NewObjectPool[Message](func() *Message {
		return &Message{}
	}, func(data *Message) {
		data.ID = 0
	})
	var wait sync.WaitGroup
	wait.Add(10)
	wl := writeloop.NewUnbounded(pool, func(message *Message) error {
		fmt.Println(message.ID)
		wait.Done()
		return nil
	}, func(err any) {
		fmt.Println(err)
	})
	for i := 0; i < 10; i++ {
		m := pool.Get()
		m.ID = i
		wl.Put(m)
	}
	wait.Wait()
	wl.Close()
}

查看 / 收起单元测试

func TestNewUnbounded(t *testing.T) {
	wl := writeloop.NewUnbounded(wp, func(message *Message) error {
		t.Log(message.ID)
		return nil
	}, func(err any) {
		t.Log(err)
	})
	assert.NotNil(t, wl)
	wl.Close()
}


Channel STRUCT

基于 chan 的写循环,与 Unbounded 相同,但是使用 Channel 实现

type Channel[T any] struct {
	c chan T
}

func (*Channel) Put(message T)

将数据放入写循环,message 应该来源于 hub.ObjectPool


func (*Channel) Close()

关闭写循环


Unbounded STRUCT

写循环

  • 用于将数据并发安全的写入到底层连接
type Unbounded[Message any] struct {
	buf *buffer.Unbounded[Message]
}

func (*Unbounded) Put(message Message)

将数据放入写循环,message 应该来源于 hub.ObjectPool

查看 / 收起单元测试

func TestUnbounded_Put(t *testing.T) {
	wl := writeloop.NewUnbounded(wp, func(message *Message) error {
		t.Log(message.ID)
		return nil
	}, func(err any) {
		t.Log(err)
	})
	assert.NotNil(t, wl)
	for i := 0; i < 100; i++ {
		m := wp.Get()
		m.ID = i
		wl.Put(m)
	}
	wl.Close()
}

查看 / 收起基准测试

func BenchmarkUnbounded_Put(b *testing.B) {
	wl := writeloop.NewUnbounded(wp, func(message *Message) error {
		return nil
	}, nil)
	defer func() {
		wl.Close()
	}()
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			wl.Put(wp.Get())
		}
	})
	b.StopTimer()
}


func (*Unbounded) Close()

关闭写循环

查看 / 收起单元测试

func TestUnbounded_Close(t *testing.T) {
	wl := writeloop.NewUnbounded(wp, func(message *Message) error {
		t.Log(message.ID)
		return nil
	}, func(err any) {
		t.Log(err)
	})
	assert.NotNil(t, wl)
	for i := 0; i < 100; i++ {
		m := wp.Get()
		m.ID = i
		wl.Put(m)
	}
	wl.Close()
}


WriteLoop INTERFACE
type WriteLoop[Message any] interface {
	Put(message Message)
	Close()
}

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Channel added in v0.4.1

type Channel[T any] struct {
	// contains filtered or unexported fields
}

Channel 基于 chan 的写循环,与 Unbounded 相同,但是使用 Channel 实现

func NewChannel added in v0.4.1

func NewChannel[Message any](pool *hub.ObjectPool[Message], channelSize int, writeHandler func(message Message) error, errorHandler func(err any)) *Channel[Message]

NewChannel 创建基于 Channel 的写循环

  • pool 用于管理 Message 对象的缓冲池,在创建 Message 对象时也应该使用该缓冲池,以便复用 Message 对象。 Channel 会在写入完成后将 Message 对象放回缓冲池
  • channelSize Channel 的大小
  • writeHandler 写入处理函数
  • errorHandler 错误处理函数

传入 writeHandler 的消息对象是从 Channel 中获取的,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象

func (*Channel[T]) Close added in v0.4.1

func (slf *Channel[T]) Close()

Close 关闭写循环

func (*Channel[T]) Put added in v0.4.1

func (slf *Channel[T]) Put(message T)

Put 将数据放入写循环,message 应该来源于 hub.ObjectPool

type Unbounded added in v0.4.1

type Unbounded[Message any] struct {
	// contains filtered or unexported fields
}

Unbounded 写循环

  • 用于将数据并发安全的写入到底层连接

func NewUnbounded added in v0.4.1

func NewUnbounded[Message any](pool *hub.ObjectPool[Message], writeHandler func(message Message) error, errorHandler func(err any)) *Unbounded[Message]

NewUnbounded 创建写循环

  • pool 用于管理 Message 对象的缓冲池,在创建 Message 对象时也应该使用该缓冲池,以便复用 Message 对象。 Unbounded 会在写入完成后将 Message 对象放回缓冲池
  • writeHandler 写入处理函数
  • errorHandler 错误处理函数

传入 writeHandler 的消息对象是从 pool 中获取的,并且在 writeHandler 执行完成后会被放回 pool 中,因此 writeHandler 不应该持有消息对象的引用,同时也不应该主动释放消息对象

Example
pool := hub.NewObjectPool[Message](func() *Message {
	return &Message{}
}, func(data *Message) {
	data.ID = 0
})
var wait sync.WaitGroup
wait.Add(10)
wl := writeloop.NewUnbounded(pool, func(message *Message) error {
	fmt.Println(message.ID)
	wait.Done()
	return nil
}, func(err any) {
	fmt.Println(err)
})

for i := 0; i < 10; i++ {
	m := pool.Get()
	m.ID = i
	wl.Put(m)
}

wait.Wait()
wl.Close()
Output:

0
1
2
3
4
5
6
7
8
9

func (*Unbounded[Message]) Close added in v0.4.1

func (slf *Unbounded[Message]) Close()

Close 关闭写循环

func (*Unbounded[Message]) Put added in v0.4.1

func (slf *Unbounded[Message]) Put(message Message)

Put 将数据放入写循环,message 应该来源于 hub.ObjectPool

type WriteLoop

type WriteLoop[Message any] interface {
	Put(message Message)
	Close()
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL