redis-stream

module
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2025 License: MIT

README

redis-stream

介绍

基于redis的队列库,支持延迟队列,设置消费者数量。

依赖 github.com/go-redis/redis

安装教程
go get gitee.com/yjsoft-sh/redis-stream
例子
package main

import (
	"fmt"
	redis_stream "gitee.com/yjsoft-sh/redis-stream/src"
	"github.com/go-redis/redis"
	"time"
)

var stream *redis_stream.RedisStream

var queueName = "test_stream"
var delayName = "delay"

func init() {
	client := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
	})

	stream, _ = redis_stream.New(client)
}

func main() {
	testNormal()
	testDelay()
}

// 普通队列
func testNormal() {
	_ = stream.Add(queueName, "ok")
	group := stream.CreateConsumerGroup(queueName, &redis_stream.ConsumerOption{ConsumerCount: 2})

	//当有错误返回的时候,队列里的数据还会在下次消费到
	group.Callback(func(message *redis_stream.Payload) error {
		fmt.Printf("%v\n", message)
		return nil
	})
	if err := group.Listen(); err != nil {
		panic(err)
	}
}

// 延迟队列
func testDelay() {
	_ = stream.Delay(delayName, "data", time.Minute) //插入数据

	group := stream.CreateConsumerGroup(delayName, &redis_stream.ConsumerOption{ConsumerCount: 2, IsDelay: true})

	//当有错误返回的时候,队列里的数据还会在下次消费到
	group.Callback(func(message *redis_stream.Payload) error {
		fmt.Printf("%v\n", message)
		return nil
	})
	if err := group.Listen(); err != nil {
		panic(err)
	}
}


ConsumerOption 是消费的时候的配置参数,可选值有
  • ConsumerCount 消费者数量:默认为10个
  • FetchCount 从队列中一次性拿出的数据量,默认为10条
  • StreamMaxLen 队列中最多能存放的数据条数,默认为math.MaxInt
  • IsDelay 当前要监听的队列是否是延迟队列
参与贡献
  1. Fork 本仓库
  2. 新建 Feat_xxx 分支
  3. 提交代码
  4. 新建 Pull Request
特技
  1. 使用 Readme_XXX.md 来支持不同的语言,例如 Readme_en.md, Readme_zh.md
  2. Gitee 官方博客 blog.gitee.com
  3. 你可以 https://gitee.com/explore 这个地址来了解 Gitee 上的优秀开源项目
  4. GVP 全称是 Gitee 最有价值开源项目,是综合评定出的优秀开源项目
  5. Gitee 官方提供的使用手册 https://gitee.com/help
  6. Gitee 封面人物是一档用来展示 Gitee 会员风采的栏目 https://gitee.com/gitee-stars/

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL