redis-stream
介绍
基于redis的队列库,支持延迟队列,设置消费者数量。
依赖 github.com/go-redis/redis
安装教程
go get gitee.com/yjsoft-sh/redis-stream
例子
package main
import (
"fmt"
redis_stream "gitee.com/yjsoft-sh/redis-stream/src"
"github.com/go-redis/redis"
"time"
)
var stream *redis_stream.RedisStream
var queueName = "test_stream"
var delayName = "delay"
func init() {
client := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
stream, _ = redis_stream.New(client)
}
func main() {
testNormal()
testDelay()
}
// 普通队列
func testNormal() {
_ = stream.Add(queueName, "ok")
group := stream.CreateConsumerGroup(queueName, &redis_stream.ConsumerOption{ConsumerCount: 2})
//当有错误返回的时候,队列里的数据还会在下次消费到
group.Callback(func(message *redis_stream.Payload) error {
fmt.Printf("%v\n", message)
return nil
})
if err := group.Listen(); err != nil {
panic(err)
}
}
// 延迟队列
func testDelay() {
_ = stream.Delay(delayName, "data", time.Minute) //插入数据
group := stream.CreateConsumerGroup(delayName, &redis_stream.ConsumerOption{ConsumerCount: 2, IsDelay: true})
//当有错误返回的时候,队列里的数据还会在下次消费到
group.Callback(func(message *redis_stream.Payload) error {
fmt.Printf("%v\n", message)
return nil
})
if err := group.Listen(); err != nil {
panic(err)
}
}
ConsumerOption 是消费的时候的配置参数,可选值有
- ConsumerCount 消费者数量:默认为10个
- FetchCount 从队列中一次性拿出的数据量,默认为10条
- StreamMaxLen 队列中最多能存放的数据条数,默认为
math.MaxInt
- IsDelay 当前要监听的队列是否是延迟队列
参与贡献
- Fork 本仓库
- 新建 Feat_xxx 分支
- 提交代码
- 新建 Pull Request
特技
- 使用 Readme_XXX.md 来支持不同的语言,例如 Readme_en.md, Readme_zh.md
- Gitee 官方博客 blog.gitee.com
- 你可以 https://gitee.com/explore 这个地址来了解 Gitee 上的优秀开源项目
- GVP 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目
- Gitee 官方提供的使用手册 https://gitee.com/help
- Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 https://gitee.com/gitee-stars/