Documentation ¶
Index ¶
- Variables
- func IsNoNewMessage(err error) bool
- func IsTooManyRequest(err error) bool
- func Send(ctx context.Context, cfg *Config, producer rmq_client.Producer, ...) (resp []*rmq_client.SendReceipt, err error)
- func SendAsync(ctx context.Context, cfg *Config, producer rmq_client.Producer, ...) (err error)
- func SendTransaction(ctx context.Context, cfg *Config, producer rmq_client.Producer, ...) (resp []*rmq_client.SendReceipt, err error)
- func SimpleConsume(ctx context.Context, cfg *Config, consumeFunc ConsumeFunc, ...) (stopFunc func(), err error)
- func SimpleConsume4Gf(ctx context.Context, cfg *Config, consumeFunc ConsumeFunc, ...) (stopFunc func(), err error)
- type Config
- type ConfirmFunc
- type ConsumeFunc
- type Consumer
- type ConsumerOptionFunc
- func WithConsumerOptionAwaitDuration(AwaitDuration time.Duration) ConsumerOptionFunc
- func WithConsumerOptionInvisibleDuration(InvisibleDuration time.Duration) ConsumerOptionFunc
- func WithConsumerOptionMaxMessageNum(MaxMessageNum int32) ConsumerOptionFunc
- func WithConsumerOptionSubExpressions(SubExpressions map[string]*FilterExpression) ConsumerOptionFunc
- type ConsumerOptions
- type FilterExpression
- type Message
- type Producer
- type ProducerOptionFunc
- type ProducerOptions
- type SendAsyncDealFunc
- type SendTransactionCheckerFunc
- type TopicType
Constants ¶
This section is empty.
Variables ¶
View Source
var NewFilterExpression = func(expression string) *FilterExpression { return &FilterExpression{ Expression: expression, ExpressionType: rmq_client.TAG, } }
View Source
var NewFilterExpressionWithType = func(expression string, expressionType rmq_client.FilterExpressionType) *FilterExpression { return &FilterExpression{ Expression: expression, ExpressionType: expressionType, } }
View Source
var SUB_ALL = NewFilterExpression("*")
Functions ¶
func Send ¶
func Send(ctx context.Context, cfg *Config, producer rmq_client.Producer, topicType TopicType, msg Message) (resp []*rmq_client.SendReceipt, err error)
Send 同步发送消息,可支持普通、延迟、顺序类型的消息,不支持事务消息
func SendAsync ¶
func SendAsync(ctx context.Context, cfg *Config, producer rmq_client.Producer, topicType TopicType, msg Message, dealFunc SendAsyncDealFunc) (err error)
SendAsync 异步发送消息,可支持普通、延迟、顺序类型的消息,不支持事务消息
func SendTransaction ¶
func SendTransaction(ctx context.Context, cfg *Config, producer rmq_client.Producer, message Message, confirmFunc ConfirmFunc) (resp []*rmq_client.SendReceipt, err error)
SendTransaction 发送事务消息。注意:事务消息的生产者不能和其他类型消息的生产者共用
func SimpleConsume ¶
func SimpleConsume(ctx context.Context, cfg *Config, consumeFunc ConsumeFunc, oFunc ...ConsumerOptionFunc) (stopFunc func(), err error)
SimpleConsume 简单消费类型消费
func SimpleConsume4Gf ¶
func SimpleConsume4Gf(ctx context.Context, cfg *Config, consumeFunc ConsumeFunc, oFunc ...ConsumerOptionFunc) (stopFunc func(), err error)
SimpleConsume4Gf gf版简单消费类型消费
Types ¶
type Config ¶
type Config struct { Endpoint string //必填 NameSpace string //必填 ConsumerGroup string //使用消费者时,必填 AccessKey string //可选 AccessSecret string //可选 LogPath string //官方rocketmq日志文件路径,默认为/tmp LogStdout bool //是否在终端输出官方rocketmq日志,输出的话则不会记录日志文件 Debug bool //是否在终端输出本客户端的debug信息 DebugHandlerFunc debugHandlerFunc //本客户端的debug信息处理方法,不管debug开没开,有debug信息的时候都会调用 FlowColor *string //流量染色标识,为nil则表示不启用流量染色功能,生产者时表示流量染色标识,消费者时表示当前系统的染色标识 FlowColorBase *bool //当前环境是否是基准环境,消费者使用,为nil则忽略,是基准系统时,可以匹配流量标识为空字符串的消息 }
type ConfirmFunc ¶
type ConfirmFunc func(msg Message, resp []*rmq_client.SendReceipt) bool
ConfirmFunc 二次确认方法 注意:不要异步处理,本地事务逻辑提交时返回true,否则返回false
type ConsumeFunc ¶
type ConsumeFunc func(ctx context.Context, msg *rmq_client.MessageView, consumer Consumer) error
ConsumeFunc 消费方法。方法内消费成功时需要调用consumer.Ack(); 消费时间可能超过消费者InvisibleDuration设置的时间时,可调用consumer.ChangeInvisibleDuration()或consumer.ChangeInvisibleDurationAsync()方法调整消息消费超时时间;
type ConsumerOptionFunc ¶
type ConsumerOptionFunc func(options *ConsumerOptions)
func WithConsumerOptionAwaitDuration ¶
func WithConsumerOptionAwaitDuration(AwaitDuration time.Duration) ConsumerOptionFunc
func WithConsumerOptionInvisibleDuration ¶
func WithConsumerOptionInvisibleDuration(InvisibleDuration time.Duration) ConsumerOptionFunc
func WithConsumerOptionMaxMessageNum ¶
func WithConsumerOptionMaxMessageNum(MaxMessageNum int32) ConsumerOptionFunc
func WithConsumerOptionSubExpressions ¶
func WithConsumerOptionSubExpressions(SubExpressions map[string]*FilterExpression) ConsumerOptionFunc
type ConsumerOptions ¶
type FilterExpression ¶ added in v1.0.1
type FilterExpression struct { Expression string ExpressionType rmq_client.FilterExpressionType }
type Producer ¶
type Producer interface { Stop() error //注销生产者 Send(ctx context.Context, topicType TopicType, msg Message) (resp []*rmq_client.SendReceipt, err error) //同步发送消息 SendAsync(ctx context.Context, topicType TopicType, msg Message, dealFunc SendAsyncDealFunc) error //异步发送消息 SendTransaction(ctx context.Context, message Message, confirmFunc ConfirmFunc) error //发送事务消息 }
func GetGfProducer ¶
func GetGfProducer(cfg *Config, oFunc ...ProducerOptionFunc) (producer Producer, err error)
func GetProducer ¶
func GetProducer(cfg *Config, oFunc ...ProducerOptionFunc) (producer Producer, err error)
type ProducerOptionFunc ¶
type ProducerOptionFunc func(options *ProducerOptions)
func WithProducerOptionMaxAttempts ¶
func WithProducerOptionMaxAttempts(maxAttempts int32) ProducerOptionFunc
func WithProducerOptionTopics ¶
func WithProducerOptionTopics(Topics ...string) ProducerOptionFunc
func WithProducerOptionTransactionChecker ¶
func WithProducerOptionTransactionChecker(transactionChecker SendTransactionCheckerFunc) ProducerOptionFunc
type ProducerOptions ¶
type SendAsyncDealFunc ¶
type SendAsyncDealFunc func(ctx context.Context, msg Message, resp []*rmq_client.SendReceipt, err error)
type SendTransactionCheckerFunc ¶
type SendTransactionCheckerFunc func(msg *rmq_client.MessageView) rmq_client.TransactionResolution
Source Files ¶
Click to show internal directories.
Click to hide internal directories.