rmq

package
v1.0.78 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 17, 2024 License: MIT Imports: 27 Imported by: 0

Documentation

Overview

Package rmq 提供了访问Rmq服务的能力

Index

Constants

View Source
const (
	Second = DelayLevel(iota + 1)
	Seconds5
	Seconds10
	Seconds30
	Minute1
	Minutes2
	Minutes3
	Minutes4
	Minutes5
	Minutes6
	Minutes7
	Minutes8
	Minutes9
	Minutes10
	Minutes20
	Minutes30
	Hour1
	Hours2
)
View Source
const HeaderPre = "X-Mq-"

Variables

View Source
var (
	// ErrRmqSvcConfigInvalid 服务配置无效
	ErrRmqSvcConfigInvalid = fmt.Errorf("requested rmq service is not correctly configured")
	// ErrRmqSvcNotRegiestered 服务尚未被注册
	ErrRmqSvcNotRegiestered = fmt.Errorf("requested rmq service is not registered")
	// ErrRmqSvcAlreadyRegistered 服务已被注册
	ErrRmqSvcAlreadyRegistered = fmt.Errorf("requested rmq service is already registered")
	// ErrRmqSvcInvalidOperation 当前操作无效
	ErrRmqSvcInvalidOperation = fmt.Errorf("requested rmq service is not suitable for current operation")
)

Functions

func InitConsumer

func InitConsumer(config ConsumerConf) (err error)

func InitProducer

func InitProducer(config ProducerConf) (err error)

func StartConsumer

func StartConsumer(g *gin.Engine, service string, callback MessageCallback) error

StartConsumer 启动指定已注册的Rmq消费服务, 并指定消费回调

func StartProducer

func StartProducer(service string) error

StartProducer 启动指定已注册的Rmq生产服务

func StopConsumer

func StopConsumer(service string) error

StopConsumer 停止指定已注册的Rmq消费服务

func StopProducer

func StopProducer(service string) error

StopProducer 停止指定已注册的Rmq生产服务

func StopRmqConsume

func StopRmqConsume()

停止所有 rmq 消费者

func StopRmqProduce

func StopRmqProduce()

停止所有 rmq 生产者

Types

type Auth

type Auth struct {
	AccessKey string `json:"ak,omitempty" yaml:"ak,omitempty"`
	SecretKey string `json:"sk,omitempty" yaml:"sk,omitempty"`
}

auth 提供链接到Broker所需要的验证信息(按需配置)

type ClientConf

type ClientConf struct {
	// 名称,不同 producer 间不可重复,不同 consumer 间不可重复
	Service string `json:"service" yaml:"service"`
	// 提供名字服务器的地址,eg: mq-xxx-svc.mq
	NameServer string `json:"nameserver" yaml:"nameserver"`
	// auth 配置,走 proxy 鉴权,通常不需要手动配置
	Auth Auth `json:"auth" yaml:"auth"`
	// 要生产/消费的主题
	Topic string `json:"topic" yaml:"topic"`
	// 是否开启消息轨迹,默认不开启
	Trace bool `json:"trace" yaml:"trace"`
	// 存储消息轨迹的 topic, 默认: RMQ_SYS_TRACE_TOPIC
	TraceTopic string `json:"traceTopic" yaml:"traceTopic"`
}

func (*ClientConf) Check

func (conf *ClientConf) Check() error

type ConsumerConf

type ConsumerConf struct {
	ClientConf `json:",inline" yaml:",inline"`
	// 消费消息的TAG
	Tags []string `json:"tags" yaml:"tags"`
	// 消费组名称
	Group string `json:"group" yaml:"group"`
	// 是否是广播消费模式
	Broadcast bool `json:"broadcast" yaml:"broadcast"`
	// 是否是顺序消费模式
	Orderly bool `json:"orderly" yaml:"orderly"`
	// 批量消费数量, 默认1
	Batch int `json:"batch" yaml:"batch"`
	// 消费失败时的重试次数
	Retry int `json:"retry" yaml:"retry"`
	// 消费失败重试间隔  顺序消费时可用
	RetryInterval time.Duration `json:"retry_interval,omitempty" yaml:"retry_interval,omitempty"`
}

func (*ConsumerConf) Check

func (conf *ConsumerConf) Check() error

type DelayLevel

type DelayLevel int

DelayLevel 定义消息延迟发送的级别

type Message

type Message interface {
	WithTag(string) Message
	WithKey(string) Message
	WithShard(string) Message
	WithDelay(DelayLevel) Message
	WithHeader(key string, value string) Message
	Send(ctx *gin.Context) (msgID string, err error)
	GetContent() []byte
	GetTag() string
	GetKey() string
	GetShard() string
	GetID() string
	GetHeader(key string) string
	GetAllHeader() map[string]string
	GetTime() time.Time //消费时使用  获取消息生产时间
	GetRetry() int      //消费时使用  获取消息重试次数
	GetTopic() string   //消费时使用  获取消息Topic

	// Deprecated: nmq hack方法, 不需使用
	SetTopic(string) Message
	// Deprecated: nmq hack方法, 不需使用
	SetProperty(key string, value string) Message //nmq hack方法, 不需使用
}

Message 消息提供的接口定义

func NewMessage

func NewMessage(service string, content []byte) (Message, error)

NewMessage 创建一条新的消息

type MessageBatch

type MessageBatch []Message

func (MessageBatch) Send

func (batch MessageBatch) Send(ctx *gin.Context) (msgID string, err error)

type MessageCallback

type MessageCallback func(ctx *gin.Context, msg Message) error

MessageCallback 定义业务方接收消息的回调接口

type NmqResponse

type NmqResponse struct {
	TransID uint64 `mcpack:"_transid"`
	ErrNo   int    `mcpack:"_error_no" binding:"required"`
	ErrStr  string `mcpack:"_error_msg" binding:"required"`
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

type ProducerConf

type ProducerConf struct {
	ClientConf `json:",inline" yaml:",inline"`
	// 生产重试次数
	Retry int `json:"retry" yaml:"retry"`
	// 生产超时时间
	Timeout time.Duration `json:"timeout" yaml:"timeout"`
}

func (*ProducerConf) Check

func (conf *ProducerConf) Check() error

type RmqConfig

type RmqConfig struct {
	Producer []ProducerConf
	Consumer []ConsumerConf
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL