rocketmq

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 2, 2024 License: MIT Imports: 11 Imported by: 0

README

queue用法

生产者

  1. 应用入口载入组件:
package main

import (
    "gintpl/pkg/queue/rocketmq"
)

rocketmq.InitRMQProducer(cfg)
  1. 发送消息
package main

import (
    "gintpl/pkg/queue/rocketmq"
)

rocketmq.Send(ctx, "topic", "msg")

消费者

  1. 实现Consumer接口
  2. 使用init注册消费者组
  3. 启动消费者:rocketmq.InitConsumer(web.Config.RocketMq)

RocketMQ部署文档:https://rocketmq.io/course/deploy/rocketmq_learning-gvr7dx_awbbpb_ogr2blaw8vy3tv14/?spm=5176.29160081.0.0.50a9608eF04Ez7

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrReceipt = errors.New("send message receipt empty")

Functions

func ConsumerStop

func ConsumerStop()

ConsumerStop 停止所有消费组

func InitConsumer

func InitConsumer(cfg *config.RocketMq)

InitConsumer 初始化消费组

func InitProducer

func InitProducer(cfg *config.RocketMq)

InitProducer 初始化rocket mq生产者

func ProducerStop

func ProducerStop()

ProducerStop 停止生产者

func RegisterConsumer

func RegisterConsumer(c Consumer)

RegisterConsumer 注册消费组组

func Send

func Send(ctx context.Context, topic string, body []byte, opts ...MessageOpts) error

Send 发送消息 rocket mq 客户端内部有重试逻辑,建议调用者要处理发送错误避免发送失败消息丢失,失败时可以把消息写入到本地db

func StartConsumer

func StartConsumer(ctx context.Context, endpoint, accessKey, secretKey string, consumer Consumer)

StartConsumer 启动消费者 rocket mq的原则,同一个消费者组下所有消费者实例所订阅的Topic、Tag必须完全一致,否则可能会出现混乱,丢失 ctx 仅用来控制退出

Types

type AckFn

type AckFn func()

AckFn 处理成功的ack通知

type Consumer

type Consumer interface {
	// GroupName 组名称
	GroupName() string
	// Number 消费组消费者数量,消费者数量大于1时会异步执行,否则时同步执行
	Number() int
	// Subscribe 订阅的主题和tag, key是要订阅的topic,value是要订阅的tag,所有是*
	Subscribe() map[string]string
	// Run 消费者消费消息逻辑,异步执行时处理成功需要主动调用AckFn函数,否则AckFun是nil
	Run(*rmq.MessageView, AckFn) error
}

Consumer 消费者接口

type MessageOpts

type MessageOpts struct {
	Tag       string
	Keys      []string
	Group     string
	DelayTime int // 延迟秒数
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL