rocketmq_client

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 29, 2024 License: MIT Imports: 16 Imported by: 0

README

rocketmq-client-go

介绍

go版rocketMQ客户端 对官方github.com/apache/rocketmq-clients/golang/v5对了封装,简化了使用方式,并增加了必填参数的友好提示。 此外,提供了goFrame框架的扩展包,支持生产者到消费者的链路追踪。

依赖
  • golang 1.21
  • github.com/apache/rocketmq-clients/golang/v5

使用gf扩展的话:

  • github.com/gogf/gf/v2 v2.7.1
安装教程
go get -u github.com/yiqiang3344/rocketmq-client-go@latest
使用说明

参考example

direct中为直接使用的示例。

gf中为基于goFrame框架的使用示例。

Documentation

Index

Constants

This section is empty.

Variables

View Source
var NewFilterExpression = func(expression string) *FilterExpression {
	return &FilterExpression{
		Expression:     expression,
		ExpressionType: rmq_client.TAG,
	}
}
View Source
var NewFilterExpressionWithType = func(expression string, expressionType rmq_client.FilterExpressionType) *FilterExpression {
	return &FilterExpression{
		Expression:     expression,
		ExpressionType: expressionType,
	}
}
View Source
var SUB_ALL = NewFilterExpression("*")

Functions

func IsNoNewMessage

func IsNoNewMessage(err error) bool

IsNoNewMessage 是否没有新消息

func IsTooManyRequest

func IsTooManyRequest(err error) bool

IsTooManyRequest 是否触发了流控

func Send

func Send(ctx context.Context, cfg *Config, producer rmq_client.Producer, topicType TopicType, msg Message) (resp []*rmq_client.SendReceipt, err error)

Send 同步发送消息 可支持普通、延迟、顺序类型的消息,不支持事务消息

func SendAsync

func SendAsync(ctx context.Context, cfg *Config, producer rmq_client.Producer, topicType TopicType, msg Message, dealFunc SendAsyncDealFunc) (err error)

SendAsync 异步发送消息 可支持普通、延迟、顺序类型的消息,不支持事务消息

func SendTransaction

func SendTransaction(ctx context.Context, cfg *Config, producer rmq_client.Producer, message Message, confirmFunc ConfirmFunc) (resp []*rmq_client.SendReceipt, err error)

SendTransaction 发送事务消息 注意:事务消息的生产者不能和其他类型消息的生产者共用

func SimpleConsume

func SimpleConsume(ctx context.Context, cfg *Config, consumeFunc ConsumeFunc, oFunc ...ConsumerOptionFunc) (stopFunc func(), err error)

SimpleConsume 简单消费类型消费

func SimpleConsume4Gf

func SimpleConsume4Gf(ctx context.Context, cfg *Config, consumeFunc ConsumeFunc, oFunc ...ConsumerOptionFunc) (stopFunc func(), err error)

SimpleConsume4Gf gf版简单消费类型消费

Types

type Config

type Config struct {
	Endpoint         string           //必填
	NameSpace        string           //必填
	ConsumerGroup    string           //使用消费者时,必填
	AccessKey        string           //可选
	AccessSecret     string           //可选
	LogPath          string           //官方rocketmq日志文件路径,默认为/tmp
	LogStdout        bool             //是否在终端输出官方rocketmq日志,输出的话则不会记录日志文件
	Debug            bool             //是否在终端输出本客户端的debug信息
	DebugHandlerFunc debugHandlerFunc //本客户端的debug信息处理方法,不管debug开没开,有debug信息的时候都会调用
	FlowColor        *string          //流量染色标识,为nil则表示不启用流量染色功能,生产者时表示流量染色标识,消费者时表示当前系统的染色标识
	FlowColorBase    *bool            //当前环境是否是基准环境,消费者使用,为nil则忽略,是基准系统时,可以匹配流量标识为空字符串的消息
}

type ConfirmFunc

type ConfirmFunc func(msg Message, resp []*rmq_client.SendReceipt) bool

ConfirmFunc 二次确认方法 注意:不要异步处理,本地事务逻辑提交时返回true,否则返回false

type ConsumeFunc

type ConsumeFunc func(ctx context.Context, msg *rmq_client.MessageView, consumer Consumer) error

ConsumeFunc 消费方法 方法内消费成功时需要调用consumer.Ack(); 消费时间可能超过消费者MaxMessageNum设置的时间时,可调用consumer.ChangeInvisibleDuration()或consumer.ChangeInvisibleDurationAsync()方法调整消息消费超时时间;

type Consumer

type Consumer interface {
	Ack(ctx context.Context) error
	ChangeInvisibleDuration(invisibleDuration time.Duration) error
	ChangeInvisibleDurationAsync(invisibleDuration time.Duration)
}

type ConsumerOptionFunc

type ConsumerOptionFunc func(options *ConsumerOptions)

func WithConsumerOptionAwaitDuration

func WithConsumerOptionAwaitDuration(AwaitDuration time.Duration) ConsumerOptionFunc

func WithConsumerOptionInvisibleDuration

func WithConsumerOptionInvisibleDuration(InvisibleDuration time.Duration) ConsumerOptionFunc

func WithConsumerOptionMaxMessageNum

func WithConsumerOptionMaxMessageNum(MaxMessageNum int32) ConsumerOptionFunc

func WithConsumerOptionSubExpressions

func WithConsumerOptionSubExpressions(SubExpressions map[string]*FilterExpression) ConsumerOptionFunc

type ConsumerOptions

type ConsumerOptions struct {
	AwaitDuration     time.Duration                //接收消息的超时时间,默认5秒,实际值为设置值+3秒
	MaxMessageNum     int32                        //每次接收的消息数量,默认10
	InvisibleDuration time.Duration                //接收到的消息的不可见时间,默认10秒
	SubExpressions    map[string]*FilterExpression //订阅表达式,必填,key为topic,简单消费类型只支持tag和sql匹配
}

type FilterExpression added in v1.0.1

type FilterExpression struct {
	Expression     string
	ExpressionType rmq_client.FilterExpressionType
}

type Message

type Message struct {
	Body              string            //消息内容,必填
	Topic             string            //主题,必填
	Tag               string            //标签,可选
	MessageGroup      string            //消息组,FIFO消息类型必填,其他可选
	Keys              []string          //索引列表,可选
	Properties        map[string]string //自定义属性,可选
	DeliveryTimestamp time.Time         //延迟时间,Delay消息类型必填,其他可选
}

type Producer

type Producer interface {
	Stop() error                                                                                            //注销消费者
	Send(ctx context.Context, topicType TopicType, msg Message) (resp []*rmq_client.SendReceipt, err error) //同步发送消息
	SendAsync(ctx context.Context, topicType TopicType, msg Message, dealFunc SendAsyncDealFunc) error      //异步发送消息
	SendTransaction(ctx context.Context, message Message, confirmFunc ConfirmFunc) error                    //发送事务消息
}

func GetGfProducer

func GetGfProducer(cfg *Config, oFunc ...ProducerOptionFunc) (producer Producer, err error)

func GetProducer

func GetProducer(cfg *Config, oFunc ...ProducerOptionFunc) (producer Producer, err error)

type ProducerOptionFunc

type ProducerOptionFunc func(options *ProducerOptions)

func WithProducerOptionMaxAttempts

func WithProducerOptionMaxAttempts(maxAttempts int32) ProducerOptionFunc

func WithProducerOptionTopics

func WithProducerOptionTopics(Topics ...string) ProducerOptionFunc

func WithProducerOptionTransactionChecker

func WithProducerOptionTransactionChecker(transactionChecker SendTransactionCheckerFunc) ProducerOptionFunc

type ProducerOptions

type ProducerOptions struct {
	Topics      []string //支持的主题列表,可选
	MaxAttempts int32    //重试次数,可选
	// contains filtered or unexported fields
}

type SendAsyncDealFunc

type SendAsyncDealFunc func(ctx context.Context, msg Message, resp []*rmq_client.SendReceipt, err error)

type SendTransactionCheckerFunc

type SendTransactionCheckerFunc func(msg *rmq_client.MessageView) rmq_client.TransactionResolution

type TopicType

type TopicType string

TopicType 主题类型

const (
	TopicNormal      TopicType = "NORMAL"
	TopicFIFO        TopicType = "FIFO"
	TopicDelay       TopicType = "DELAY"
	TopicTransaction TopicType = "TRANSACTION"

	FlowColor = "FlowColor"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL