Documentation ¶
Overview ¶
Package rmq 提供了访问Rmq服务的能力
Index ¶
- Constants
- Variables
- func InitConsumer(config ConsumerConf) (err error)
- func InitProducer(config ProducerConf) (err error)
- func StartConsumer(g *gin.Engine, service string, callback MessageCallback) error
- func StartProducer(service string) error
- func StopConsumer(service string) error
- func StopProducer(service string) error
- func StopRmqConsume()
- func StopRmqProduce()
- type Auth
- type ClientConf
- type ConsumerConf
- type DelayLevel
- type Message
- type MessageBatch
- type MessageCallback
- type NmqResponse
- type Producer
- type ProducerConf
- type RmqConfig
Constants ¶
View Source
const ( Second = DelayLevel(iota + 1) Seconds5 Seconds10 Seconds30 Minute1 Minutes2 Minutes3 Minutes4 Minutes5 Minutes6 Minutes7 Minutes8 Minutes9 Minutes10 Minutes20 Minutes30 Hour1 Hours2 )
View Source
const HeaderPre = "X-Mq-"
Variables ¶
View Source
var ( // ErrRmqSvcConfigInvalid 服务配置无效 ErrRmqSvcConfigInvalid = fmt.Errorf("requested rmq service is not correctly configured") // ErrRmqSvcNotRegiestered 服务尚未被注册 ErrRmqSvcNotRegiestered = fmt.Errorf("requested rmq service is not registered") // ErrRmqSvcAlreadyRegistered 服务已被注册 ErrRmqSvcAlreadyRegistered = fmt.Errorf("requested rmq service is already registered") // ErrRmqSvcInvalidOperation 当前操作无效 ErrRmqSvcInvalidOperation = fmt.Errorf("requested rmq service is not suitable for current operation") )
Functions ¶
func InitConsumer ¶
func InitConsumer(config ConsumerConf) (err error)
func InitProducer ¶
func InitProducer(config ProducerConf) (err error)
func StartConsumer ¶
func StartConsumer(g *gin.Engine, service string, callback MessageCallback) error
StartConsumer 启动指定已注册的Rmq消费服务, 并指定消费回调
Types ¶
type Auth ¶
type Auth struct { AccessKey string `json:"ak,omitempty" yaml:"ak,omitempty"` SecretKey string `json:"sk,omitempty" yaml:"sk,omitempty"` }
auth 提供链接到Broker所需要的验证信息(按需配置)
type ClientConf ¶
type ClientConf struct { // 名称,不同 producer 间不可重复,不同 consumer 间不可重复 Service string `json:"service" yaml:"service"` // 提供名字服务器的地址,eg: mq-xxx-svc.mq NameServer string `json:"nameserver" yaml:"nameserver"` // auth 配置,走 proxy 鉴权,通常不需要手动配置 Auth Auth `json:"auth" yaml:"auth"` // 要生产/消费的主题 Topic string `json:"topic" yaml:"topic"` // 是否开启消息轨迹,默认不开启 Trace bool `json:"trace" yaml:"trace"` // 存储消息轨迹的 topic, 默认: RMQ_SYS_TRACE_TOPIC TraceTopic string `json:"traceTopic" yaml:"traceTopic"` }
func (*ClientConf) Check ¶
func (conf *ClientConf) Check() error
type ConsumerConf ¶
type ConsumerConf struct { ClientConf `json:",inline" yaml:",inline"` // 消费消息的TAG Tags []string `json:"tags" yaml:"tags"` // 消费组名称 Group string `json:"group" yaml:"group"` // 是否是广播消费模式 Broadcast bool `json:"broadcast" yaml:"broadcast"` // 是否是顺序消费模式 Orderly bool `json:"orderly" yaml:"orderly"` // 批量消费数量, 默认1 Batch int `json:"batch" yaml:"batch"` // 消费失败时的重试次数 Retry int `json:"retry" yaml:"retry"` // 消费失败重试间隔 顺序消费时可用 RetryInterval time.Duration `json:"retry_interval,omitempty" yaml:"retry_interval,omitempty"` }
func (*ConsumerConf) Check ¶
func (conf *ConsumerConf) Check() error
type Message ¶
type Message interface { WithTag(string) Message WithKey(string) Message WithShard(string) Message WithDelay(DelayLevel) Message WithHeader(key string, value string) Message Send(ctx *gin.Context) (msgID string, err error) GetContent() []byte GetTag() string GetKey() string GetShard() string GetID() string GetHeader(key string) string GetAllHeader() map[string]string GetTime() time.Time //消费时使用 获取消息生产时间 GetRetry() int //消费时使用 获取消息重试次数 GetTopic() string //消费时使用 获取消息Topic // Deprecated: nmq hack方法, 不需使用 SetTopic(string) Message // Deprecated: nmq hack方法, 不需使用 SetProperty(key string, value string) Message //nmq hack方法, 不需使用 }
Message 消息提供的接口定义
type MessageBatch ¶
type MessageBatch []Message
type MessageCallback ¶
MessageCallback 定义业务方接收消息的回调接口
type NmqResponse ¶
type ProducerConf ¶
type ProducerConf struct { ClientConf `json:",inline" yaml:",inline"` // 生产重试次数 Retry int `json:"retry" yaml:"retry"` // 生产超时时间 Timeout time.Duration `json:"timeout" yaml:"timeout"` }
func (*ProducerConf) Check ¶
func (conf *ProducerConf) Check() error
type RmqConfig ¶
type RmqConfig struct { Producer []ProducerConf Consumer []ConsumerConf }
Click to show internal directories.
Click to hide internal directories.