timewheel

package module
v0.0.0-...-b71bc5b Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 22, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

README

timewheel

timewheel: 纯 golang 实现的时间轮框架

核心能力

基于 golang time ticker + 环形数组实现了单机版时间轮工具

代码主要来源于timewheel

基于 golang time ticker + redis zset 实现了分布式版时间轮工具

参考timewheel,修改任务的执行方式,增加多 redis 客户端的实现

使用示例

使用单测示例代码如下. 参见 ./time_wheel_test.go 文件

  • 单机版时间轮
func Test_timeWheel(t *testing.T) {
	timeWheel := NewTimeWheel(10, 500*time.Millisecond)
	defer timeWheel.Stop()

	timeWheel.AddTask("test1", func() {
		t.Errorf("test1, %v", time.Now())
	}, time.Now().Add(time.Second))
	timeWheel.AddTask("test2", func() {
		t.Errorf("test2, %v", time.Now())
	}, time.Now().Add(5*time.Second))
	timeWheel.AddTask("test2", func() {
		t.Errorf("test2, %v", time.Now())
	}, time.Now().Add(3*time.Second))

	<-time.After(6 * time.Second)
}
  • redis版时间轮
const (
	// redis 服务器信息
	network  = "tcp"
	address  = "localhost:6379"
	password = ""
)

// 任务处理函数
func handle(ctx context.Context, task *RTaskElement) error {
	log.Printf("get task: %v\n", task)
	return nil
}

func Test_RedisTimeWheel_Redigo(t *testing.T) {
	client := redigo.NewClient(network, address, password)
	run(t, client)
}

func Test_RedisTimeWheel_Goredis(t *testing.T) {
	client := goredis.NewClient(network, address, password)
	run(t, client)
}

func run(t *testing.T, client redis.Store) {
	ctx := context.Background()

	rTimeWheel := NewRTimeWheel(client, handle)

	defer rTimeWheel.Stop()

	if err := rTimeWheel.AddTask(ctx, "test1", &RTaskElement{
		Key:  "key1",
		Msg:  util.GetTimeMinuteStr(time.Now()),
		Type: "test",
	}, time.Now().Add(time.Second)); err != nil {
		t.Error(err)
		return
	}

	if err := rTimeWheel.AddTask(ctx, "test2", &RTaskElement{
		Key:  "key2",
		Msg:  util.GetTimeMinuteStr(time.Now()),
		Type: "test",
	}, time.Now().Add(4*time.Second)); err != nil {
		t.Error(err)
		return
	}

	if err := rTimeWheel.RemoveTask(ctx, "test2", time.Now().Add(4*time.Second)); err != nil {
		t.Error(err)
		return
	}

	if err := rTimeWheel.AddTask(ctx, "test3", &RTaskElement{
		Key:  "key3",
		Msg:  util.GetTimeMinuteStr(time.Now()),
		Type: "test",
	}, time.Now().Add(5*time.Second)); err != nil {
		t.Error(err)
		return
	}

	<-time.After(6 * time.Second)
	t.Log("ok")
}


Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type RTaskElement

type RTaskElement struct {
	// 任务 key
	Key string `json:"key"`
	// 任务内容
	Msg string `json:"msg"`
	// 任务类型
	Type string `json:"type"`
	// 执行时间
	ExecuteAtUnix int64 `json:"executeAtUnix"`
}

RTaskElement 任务明细

func NewRTaskElement

func NewRTaskElement(msg string, _type string) *RTaskElement

NewRTaskElement 创建新任务

type RTimeWheel

type RTimeWheel struct {
	// 内置的单例工具,用于保证 stopc 只被关闭一次
	sync.Once
	// contains filtered or unexported fields
}

RTimeWheel redis实现的分布式时间轮

func NewRTimeWheel

func NewRTimeWheel(store redis.Store, handle func(context.Context, *RTaskElement) error) *RTimeWheel

NewRTimeWheel 构造 redis 实现的分布式时间轮

func (*RTimeWheel) AddTask

func (r *RTimeWheel) AddTask(ctx context.Context, key string, task *RTaskElement, executeAt time.Time) error

AddTask 添加定时任务

func (*RTimeWheel) RemoveTask

func (r *RTimeWheel) RemoveTask(ctx context.Context, key string, executeAt time.Time) error

RemoveTask 从 redis 时间轮中删除一个定时任务

func (*RTimeWheel) Stop

func (r *RTimeWheel) Stop()

Stop 停止时间轮

type TimeWheel

type TimeWheel struct {
	// 单例工具,保证时间轮停止操作只能执行一次
	sync.Once
	// contains filtered or unexported fields
}

TimeWheel 时间轮

func NewTimeWheel

func NewTimeWheel(slotNum int, interval time.Duration) *TimeWheel

NewTimeWheel 新建时间轮 slotNum 环状数组长度 interval 轮询时间间隔

func (*TimeWheel) AddTask

func (t *TimeWheel) AddTask(key string, task func(), executeAt time.Time)

AddTask 添加任务到时间轮

func (*TimeWheel) RemoveTask

func (t *TimeWheel) RemoveTask(key string)

RemoveTask 从时间轮移除任务

func (*TimeWheel) Stop

func (t *TimeWheel) Stop()

Stop 停止时间轮

Directories

Path Synopsis
examples
pkg

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL