delayq

package module
v0.1.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 6, 2022 License: MIT Imports: 11 Imported by: 0

README

延迟队列 delayq

业务背景

在工作时,遇到以下业务需求

  • 某个资源过期后,在过期后的第 30 天后会自动删除,但是在自动删除前,需要在在过期第 23 天,28天,29天发短息通知用户,该过期资源将被删除

  • 用户要删除某个重要资源时,需要留 24 小时的缓冲时间给用户考虑是否真的要删掉,若 24 小时内用户没有撤销删除操作,则 24 小时后将该资源删除

怎么去解决以上的业务需求呢? 一开始最容易想到的就是后台一直去轮询 mysql 的表,看看是否该删除空间/发短信通知用户。这种做法存在的问题是: mysql 的性能比较容易到达瓶颈,一直去扫表,mysql 的压力会很大。因此这个方案是不可行的。

后来,我们采用的方案是延迟队列,一开始考虑的是用 beanstalkd , 但是 beanstalkd 存在一个问题是,只支持入队操作,不支持出队操作。因此 beanstalkd 也被抛弃了。因为在我们的需求中,是需要有出队操作的。

例如,当用户删除某个资源时,将该准备删除的资源加入延迟队列,24小时后自动删除该资源,如果在 24 小时内,用户点击了撤销操作,此时需要将该资源的信息从延迟队列中删除。

后面参考了有赞的延迟队列设计 的文章,实现了一个延迟队列。在延迟队列设计中,最重要的就是以下三个组件

  • Job Pool:存放 Job 的元信息
  • Delay Queue: 存放暂不可被消费的 jobID
  • Ready Queue: 存放着可以被消费的 jobID

将一个延迟任务入队的流程如下:

  • 将 Job 的元信息添加到 Job Pool 中
  • 将 Job ID 和 到期时间存入到 Delay Queue
  • Timer 每隔 1 秒钟去扫描 Delay Queue ,如果发现当前时间大于等于某个 Job 的到期时间,将 Job ID 添加到 Ready Queue 中,并且把 Delay Queue 中的对应的 Job ID 删除
  • Ready Queue 中的 Job ID 会被消费者消费掉

将一个非延迟任务队列入队流程如下:

  • 将 Job 的元信息添加到 Job Pool 中
  • 将 Job ID 存入到 Ready Queue
  • Ready Queue 中的 Job ID 会被消费者消费掉

将一个延迟队列出队流程如下:

  • 根据 JobID 将 Delay Queue 中对应的数据删掉
  • 根据 JobID 将 Job Pool 中的元信息删除

Quickstart

通过 go get 安装 delayq

go get -u github.com/withseid/delayq

初始化客户端

package main

import (
	"encoding/json"
	"time"

	"github.com/withseid/delayq"
	"github.com/withseid/delayq/example/model"
)

func main() {
	config := delayq.RedisConfiguration{
		Host: "192.168.89.160",
		Port: "6379",
	}
	client := delayq.NewClient(config)




}

延迟任务入队可选 Option

  • ProcessAt: 在某个时间点执行任务
  • ProcessIn: 从当前时间算起,延迟多久后再执行任务
  • Retry:重试次数,此 Option 不配置时,默认无限重试,重试时间每次递增,从 21 -> 212 s

在可选 Option 中,若 ProcessAt 和 ProcessIn 都不选,则该任务立即执行。多 ProcessAt 和 ProcessIn 都选了,则只有最后一个会生效。

可选 Option ProcessAt, 例如 delayq.ProcessAt(time.Now().AddDate(0, 0, 1)) 表示将在第二天的这个时间执行某个任务

	space1 := model.DeletedSpace{
		SpaceID: "space1",
	}
	data, err := json.Marshal(space1)
	if err != nil {
		panic(err)
	}

	// 假设当前时间是 2022-08-16 18:12
	// delayq.ProcessAt(time.Now().AddDate(0, 0, 1)) 表示将在 2022-08-17 18:12 执行该任务
	client.Enqueue(space1.Topic(), space1.SpaceID, data, delayq.ProcessAt(time.Now().AddDate(0, 0, 1)))

可选 Option ProcessIn,例如 delayq.ProcessIn(time.Second*10) 表示 5s 后执行某个任务,

	space2 := model.DeletedSpace{
		SpaceID: "space2",
	}
	data, err := json.Marshal(space2)
	if err != nil {
		panic(err)
	}
	// delayq.ProcessIn(time.Second*24) 表示将在当前时间的基础上,延迟 10s 后执行
	client.Enqueue(space2.Topic(), space2.SpaceID, data, delayq.ProcessIn(time.Second*10))

可选 Option Retry, 例如 delayq.Retry(6), 表示重试 6 次,若重试 6 次都失败,则自动丢弃任务,默认为无限重试

	space3 := model.DeletedSpace{
		SpaceID: "space3",
	}
	data, err := json.Marshal(space3)
	if err != nil {
		panic(err)
	}
	client.Enqueue(space3.Topic(), space3.SpaceID, data, delayq.Retry(6))

出队

	// 将 JobID 为 space2 的延迟任务出队
	client.Dequeue(space2.Topic(), space2.SpaceID)

新建一个 Server,负责消费延迟队列中的消息并且执行延迟延迟

package main

import (
	"context"

	"github.com/withseid/delayq"
	"github.com/withseid/delayq/example/model"
)

func main() {
	config := delayq.RedisConfiguration{
		Host: "192.168.89.160",
		Port: "6379",
	}
	server := delayq.NewServer(config)
	ds := model.DeletedSpace{}

	server.HandlerFunc(ds.Topic(), &ds)
	server.Run(context.TODO())
}

DeletedSpace 类型的延迟任务的处理逻辑

package model

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/withseid/delayq"
)

// const DeletedSpaceTopic = "deleted_space"

type DeletedSpace struct {
	SpaceID string
}

func (d *DeletedSpace) Topic() string {
	return "deleted_space"
}

func (d *DeletedSpace) Execute(ctx context.Context, job *delayq.Job) error {
	ds := DeletedSpace{}
	err := json.Unmarshal(job.Boday, &ds)
	if err != nil {
		return err
	}

	fmt.Println("DeletedSpace: ", ds)
	return nil
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	RedisDelayQueue   = "delayq:dq"
	RedisJobPool      = "delayq:jp"
	RedisReadyQueue   = "delayq:rq"
	RedisProcessQueue = "delayq:process"
)

Functions

This section is empty.

Types

type Client added in v0.1.1

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(config RedisConfiguration) Client

func (*Client) Dequeue added in v0.1.1

func (c *Client) Dequeue(topic string, jobID string)

func (*Client) Enqueue added in v0.1.1

func (c *Client) Enqueue(topic string, jobID string, payload []byte, opts ...Option)

type Handler

type Handler interface {
	Execute(ctx context.Context, job *Job) error
	Topic() string
}

type Job

type Job struct {
	Topic      string
	ID         string
	Delay      int64
	MaxRetry   int64
	RetryCount int64
	TTR        int64
	Boday      []byte
}

type Option

type Option interface {
	String() string
	Type() OptionType
	Value() interface{}
}

func ProcessAt

func ProcessAt(d time.Time) Option

func ProcessIn

func ProcessIn(d time.Duration) Option

func Retry added in v0.1.4

func Retry(d int64) Option

func Timeout

func Timeout(d time.Duration) Option

type OptionType

type OptionType int
const (
	ProcessAtOpt OptionType = iota
	ProcessInOpt
	TimeoutOpt
	RetryOpt
)

type RedisConfiguration

type RedisConfiguration struct {
	Host string
	Port string
}

type Semaphore

type Semaphore struct {
	// contains filtered or unexported fields
}

func NewSemaphore

func NewSemaphore(maxSize int) *Semaphore

func (*Semaphore) Add

func (s *Semaphore) Add(delta int)

func (*Semaphore) Done

func (s *Semaphore) Done()

func (*Semaphore) Wait

func (s *Semaphore) Wait()

type Server added in v0.1.1

type Server struct {
	// contains filtered or unexported fields
}

func NewServer

func NewServer(config RedisConfiguration) *Server

func (*Server) HandlerFunc added in v0.1.1

func (n *Server) HandlerFunc(topic string, handler Handler)

func (*Server) Run added in v0.1.1

func (n *Server) Run(ctx context.Context) error

Directories

Path Synopsis
example

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL