redmq

package module
v0.0.0-...-360bc3d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2024 License: Apache-2.0 Imports: 5 Imported by: 0

README

rdmq-SDK

使用Redis实现mq的SDK

接入SOP

​ 首先需要创建topic和consumer group

  • 创建topic
XADD test_topic * first_key first_val
  • 创建 consumer group
XGROUP CREATE test_topic test_group 0-0
  • 构造redis客户端实例
	// Connection settings for the Redis server. Address and Password are
	// empty here — presumably placeholders to be filled in by the caller.
	config := redis.Config{
		Network:  "tcp",
		Address:  "",
		Password: "",
	}
	
	// Build the Redis client from the settings above.
	client := redis.NewClient(config)
  • 启动生产者producer
	// Keep at most 10 messages in the queue.
	producer := redmq.NewProducer(client, redmq.WithMsgQueueLen(10))

	ctx := context.Background()
	// SendMsg returns a message ID string and an error.
	msgID, err := producer.SendMsg(ctx, producers.Topic, "test_kk", "test_vv")
	if err != nil {
		t.Error(err)
		return
	}

完整示例

consumer

// @Author Chen_dark
// @Date 2024/10/31 11:26:00
// @Desc
package examples

import (
	"context"
	"github.com/YouChenJun/redmq"
	"github.com/YouChenJun/rdmq/redis"
	"testing"
	"time"
)

// DemoDeadLetterMailbox is a custom dead-letter mailbox implementation that
// forwards every delivered message to a user-supplied function.
type DemoDeadLetterMailbox struct {
	// do is the handler called with each message passed to Deliver.
	do func(msg *redis.MsgEntity)
}

// NewDemoDeadLetterMailbox returns a DemoDeadLetterMailbox whose Deliver
// method hands each message to do.
func NewDemoDeadLetterMailbox(do func(msg *redis.MsgEntity)) *DemoDeadLetterMailbox {
	mailbox := new(DemoDeadLetterMailbox)
	mailbox.do = do
	return mailbox
}

// Deliver handles a message received by the dead-letter mailbox by passing
// it to the stored handler. It always returns nil.
func (d *DemoDeadLetterMailbox) Deliver(ctx context.Context, msg *redis.MsgEntity) error {
	handle := d.do
	handle(msg)
	return nil
}

// Test_Consumer builds a consumer with a custom dead-letter mailbox, lets it
// run for ten seconds, and then stops it.
//
// Address, GroupID, ConsumerID and Topic are left empty here; presumably
// they must be filled in with real values before running the test.
func Test_Consumer(t *testing.T) {
	config := redis.Config{
		Network:  "tcp",
		Address:  "",
		Password: "",
	}
	consumerConfig := &redis.ConsumerConfig{
		RedisConfig: config,
		GroupID:     "",
		ConsumerID:  "",
		Topic:       "",
	}

	client := redis.NewClient(config)

	// Callback invoked for every received message. It must match
	// redmq.MsgCallback, which returns (error, bool): returning true tells
	// the consumer to reply with an ACK for the message.
	callbackFunc := func(ctx context.Context, msg *redis.MsgEntity) (error, bool) {
		t.Logf("receive msg, msg id: %s, msg key: %s, msg val: %s", msg.MsgID, msg.Key, msg.Val)
		return nil, true
	}

	// Custom dead-letter mailbox that only logs the message it receives.
	demoDeadLetterMailbox := NewDemoDeadLetterMailbox(func(msg *redis.MsgEntity) {
		t.Logf("receive dead letter, msg id: %s, msg key: %s, msg val: %s", msg.MsgID, msg.Key, msg.Val)
	})

	// Construct and start the consumer.
	consumer, err := redmq.NewConsumer(client, consumerConfig, callbackFunc,
		// Retry each message at most 2 times.
		redmq.WithMaxRetryLimit(2),
		// Each receive round times out after 2 seconds.
		redmq.WithReceiveTimeout(2*time.Second),
		// Inject the custom dead-letter mailbox.
		redmq.WithDeadLetterMailbox(demoDeadLetterMailbox))
	if err != nil {
		t.Error(err)
		return
	}
	defer consumer.Stop()

	// Leave the test after ten seconds.
	<-time.After(10 * time.Second)
}

producer

// @Author Chen_dark
// @Date 2024/10/31 11:33:00
// @Desc
package examples

import (
	"context"
	"github.com/YouChenJun/redmq"
	"github.com/YouChenJun/rdmq/redis"
	"testing"
)

// Test_Producer sends a single key/value message through a producer and logs
// the returned message ID.
func Test_Producer(t *testing.T) {
	producerConfig := &redis.ProducerConfig{
		RedisConfig: redis.Config{
			Network:  "tcp",
			Address:  "192.168.8.189:6379",
			Password: "",
		},
		Topic: "",
	}

	// Keep at most ten messages in the queue.
	producer := redmq.NewProducer(
		redis.NewClient(producerConfig.RedisConfig),
		redmq.WithMsgQueueLen(10),
	)

	msgID, err := producer.SendMsg(context.Background(), producerConfig.Topic, "test_kk", "test_vv")
	if err == nil {
		t.Log(msgID)
		return
	}
	t.Error(err)
}

Documentation

Overview

@Author Chen_dark @Date 2024/10/31 11:16:00 @Desc

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

消费者

func NewConsumer

func NewConsumer(client *redis.Client, consumerConfig *redis.ConsumerConfig, callbackFunc MsgCallback, opts ...ConsumerOption) (*Consumer, error)

func (*Consumer) Stop

func (c *Consumer) Stop()

停止 consumer

type ConsumerOption

type ConsumerOption func(opts *ConsumerOptions)

func WaitTime

func WaitTime(waittime time.Duration) ConsumerOption

WaitTime 设置消息等待时间的选项函数

func WithDeadLetterDeliverTimeout

func WithDeadLetterDeliverTimeout(timeout time.Duration) ConsumerOption

WithDeadLetterDeliverTimeout 设置投递死信流程超时阈值的选项函数

func WithDeadLetterMailbox

func WithDeadLetterMailbox(mailbox DeadLetterMailbox) ConsumerOption

WithDeadLetterMailbox 设置死信队列的选项函数

func WithHandleMsgsTimeout

func WithHandleMsgsTimeout(timeout time.Duration) ConsumerOption

WithHandleMsgsTimeout 设置处理消息流程超时阈值的选项函数

func WithMaxRetryLimit

func WithMaxRetryLimit(maxRetryLimit int) ConsumerOption

WithMaxRetryLimit 设置处理消息最大重试次数的选项函数

func WithReceiveTimeout

func WithReceiveTimeout(timeout time.Duration) ConsumerOption

WithReceiveTimeout 设置接收消息超时时间的选项函数

type ConsumerOptions

type ConsumerOptions struct {
	// contains filtered or unexported fields
}

type DeadLetterLogger

type DeadLetterLogger struct{}

默认使用的死信队列,仅仅对消息失败的信息进行日志打印

func NewDeadLetterLogger

func NewDeadLetterLogger() *DeadLetterLogger

func (*DeadLetterLogger) Deliver

func (d *DeadLetterLogger) Deliver(ctx context.Context, msg *redis.MsgEntity) error

type DeadLetterMailbox

type DeadLetterMailbox interface {
	Deliver(ctx context.Context, msg *redis.MsgEntity) error
}

死信队列,当消息处理失败达到指定次数时,会被投递到此处

type MsgCallback

type MsgCallback func(ctx context.Context, msg *redis.MsgEntity) (error, bool)

接收到消息后执行的回调函数。返回的 bool 用于决定是否回复 ACK:为 true 时回复 ACK。

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func NewProducer

func NewProducer(client *redis.Client, opts ...ProducerOption) *Producer

func (*Producer) SendJsonMsg

func (p *Producer) SendJsonMsg(ctx context.Context, topic, key, val string) (string, error)

生产一条消息-JSON

func (*Producer) SendMsg

func (p *Producer) SendMsg(ctx context.Context, topic, key, val string) (string, error)

生产一条消息

type ProducerOption

type ProducerOption func(opts *ProducerOptions)

func WithMsgQueueLen

func WithMsgQueueLen(len int) ProducerOption

WithMsgQueueLen 设置消息队列长度的选项函数

type ProducerOptions

type ProducerOptions struct {
	// contains filtered or unexported fields
}

type RedisConfig

type RedisConfig struct {
	Network  string `mapstructure:"network"`
	Address  string `mapstructure:"address"`
	Password string `mapstructure:"password"`
	Topic    string `mapstructure:"topic"`
}

Directories

Path Synopsis
@Author Chen_dark @Date 2024/10/31 11:09:00 @Desc
@Author Chen_dark @Date 2024/10/31 11:09:00 @Desc
@Author Chen_dark @Date 2024/10/31 11:16:00 @Desc
@Author Chen_dark @Date 2024/10/31 11:16:00 @Desc

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL