rabbitmq

package
v0.0.1-202204071100 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 7, 2022 License: MPL-2.0 Imports: 11 Imported by: 1

README

RabbitMQ配置

  • default: 缺省的RabbitMQ客户端
  • items: 其他的RabbitMQ客户端
[sdk.rabbitmq]
   [sdk.rabbitmq.default]
       host="127.0.0.1"                       <--- RabbitMQ连接地址
       username="testuser"                    <--- RabbitMQ连接的用户名
       password="testpassword"                <--- RabbitMQ连接的密码
       port=5672                              <--- RabbitMQ连接的端口         
       vhost="/"                              <--- RabbitMQ连接的vhost
       [[sdk.rabbitmq.default.consumers]]     <--- RabbitMQ消费者配置,包括exchange_name、exchange_type...
           name = "consumer1"                 <--- 消费者必须用name来区分
           exchange_name="testexchange"
           exchange_type="direct"             <--- 当前支支持direct,fanout,topic三种交换方式
           queue_name = "testqueue"           <--- 队列名,如果queue_name为空,则会自动生成一个随机名字的队列
           routing_keys = [""]                <--- 在topic模式下,可以指定多个routing_key
       [[sdk.rabbitmq.default.producers]]     <--- RabbitMQ生产者配置,包括exchange_name、exchange_type
           name = "producer1"                 <--- 消费者必须用name来区分
           exchange_name="testexchange"
           exchange_type="direct"             <--- 当前支支持direct,fanout,topic三种交换方式
   [[sdk.rabbitmq.items]]
           name = "extra_rabbitmq"
           host="127.0.0.1"
           username="testuser"
           password="testpassword"
           port=5672
           vhost="/"
           [[sdk.rabbitmq.items.consumers]]
               ...
  1. 除了default的RabbitMQ,如果还需要使用其他RabbitMQ服务,可以在[[sdk.rabbitmq.items]]中配置,用name来区分
  2. consumers配置中必须指定name来区分不同交换配置, 在创建consumer或者producer的时候使用name来获取相关配置信息
  3. consumers配置实际使用中建议必须设置queue_name,否则server会自动生成一个随机队列,在下次重启consumer或者producer的时候,无法知道这个随机队列的名字,从而无法继续处理
  4. 在producer发送消息的时候,如果在clients.routing_keys中指定了路由键,需要在发送时同样指定key,否则发送会失败。如果clients.routing_keys中包含""路由键,则发送的时候可以忽略key

RabbitMQ使用指南

获取RabbitMQ客户端

  • 获取缺省RabbitMQ客户端: sdk.Rabbitmq.My()
  • 获取指定名字的RabbitMQ客户端: sdk.Rabbitmq.By(name)

Rabbitmq创建producer并发送消息

  • 首先指定client的name通过Producer()获取producer实例
  • 调用producer实例的Publish()方法发送消息
p, err := sdk.Rabbitmq.My().Producer("producer1")
if err != nil {
	sdk.Logger.Error("create producer", "err", err)
    return err
}

err = p.Publish([]byte("test"))
if err != nil {
	sdk.Logger.Fatal("publish", "err", err)
}

这里sdk中保证了发送消息是可靠的,如果日志打印出publishing is not acked错误请及时检查代码

注意: 如果在topic模式,或者其他指定了非空的routing_key的情况下,Publish函数的第二个值必须为能够匹配上的routing_key

...
err = p.Publish([]byte("test"), "routing_key1")
if err != nil {
	sdk.Logger.Fatal("publish", "err", err)
}

Rabbitmq创建consumer并接收处理消息

  1. 首先指定client的name通过Consumer()获取consumer实例
  2. 定义消息内容的处理函数, 消息函数为func(data []byte) error格式
  3. 调用consumer实例的Consume()方法接收并处理消息
func msgProcess(data []byte) types.MqMsgAction {
	fmt.Println(utils.BytesToString(data))
	return types.Ack
}

func xxx() {
    c, err := sdk.Rabbitmq.My().Consumer("cosumer1", msgProcess)
	if err != nil {
		sdk.Logger.Fatal("create consumer", "err", err)
	}

    c.Consume()
}

自定义发送或者接收的选项

  1. 首先调用GetDefaultOptions()函数获取到系统所有默认选项, 默认选项有QueueOption, ExchangeOption, PublishOption, ConsumeOption四种
  2. 修改指定选项中的默认值
  3. 在创建producer或者consumer的时候将修改后的Options作为参数传入
...
mq := Rabbitmq.My()
options := mq.GetDefaultOptions()
queueOption := options[types.MqOptionQueue].(*rabbitmq.QueueOption)
queueOption.Durable = false
options[types.MqOptionQueue] = queueOption
p, err := mq.Producer("client1", msgProcess, options)
... 

这里sdk保证了各种connection或channel错误,包括网络故障,RabbitMQ服务重启都可以重连恢复

Documentation

Overview

Package rabbitmq

  1. exchangeType: direct * simple模式 一对一,一个发送一个接收 * work模式 和simple模式一样,不同在于work模式可以有多个消费者,work模式起到一个负载均衡的作用,不同worker轮询获取一条消息并进行处理 * Routing模式(publishKey: routingKey + queueBind: routingKey) 在订阅模式下,一个消息可以被多个消费者消费,如果我们想指定某个消息由哪些消费者消费,我们就要采用Routing模式, routing模式最大的特点是可以从生产端通过routingKey来指定的消费端来消费消息
  2. exchangeType: fanout * 订阅模式(exchangeType: fanout, routingKey: empty) simple模式和work模式他们有一个共同的特点就是一个消息只能被一个消费者消费,如果需要一个消息被多个消费者消费,就需要订阅模式。 订阅模式的特点是一个消息被投递到多个队列,一个消息能被多个消费者获取。 过程是由生产者将消息发送到exchange(交换机)里,然后exchange通过一系列的规则发送到队列上,然后由绑定对应的消费者进行消息。
  3. exchangeType: topic * topic模式(publishKey: routingKey + queueBind: routingKey) 当基于routing模式,通过指定通配符的方式来指定我们的消费者来消费消息

Package rabbitmq log capability of zerolog @Description zerolog implementation of log capability @Author Ryan Fan 2021-06-09 @Update Ryan Fan 2021-06-09

Index

Constants

View Source
const (
	MAX_CLIENT_RECONNECT_WAIT_TIME = 10 * time.Second
)

重连等待的最大等待时间

View Source
const PUBLISH_CONFIRM_RETRY_TIMEOUT = 1 * time.Second

Variables

View Source
var (
	ErrConsumerConfigNotFound = errors.New("consumer config not found")
	ErrInvalidConsumerConfig  = errors.New("invalid consumer config")
	ErrProducerConfigNotFound = errors.New("producer config not found")
	ErrInvalidProducerConfig  = errors.New("invalid consumer config")
)
View Source
var (
	ErrPublishNotConfirmed = errors.New("publish is not acknowledged")
	ErrPublishAckLost      = errors.New("publish ack lost")
)
View Source
var (
	SupportedExchangeTypes = []string{"direct", "fanout", "topic"}
)

支持的exchangeTypes

Functions

func NewMq

func NewMq(providerType string, config *MqConfig, logger types.LogProvider) (types.Mq, error)

Types

type BaseClient

type BaseClient struct {
	Logger types.LogProvider

	Name    string // 客户端名字
	Url     string // 连接url
	Options map[types.MqOptionType]types.MqOptioner

	Connection *amqp.Connection
	Channel    *amqp.Channel
	// contains filtered or unexported fields
}

消息队列客户端维护connection和channel

type ConsumeOption

type ConsumeOption struct {
	ConsumerTag string // consumer_tag 消费者标签
	NoLocal     bool   // 这个功能属于AMQP的标准,但是rabbitMQ并没有做实现.
	NoAck       bool   // 收到消息后,是否不需要回复确认即被认为被消费
	Exclusive   bool   // 排他消费者,即这个队列只能由一个消费者消费.适用于任务不允许进行并发处理的情况下.比如系统对接
	NoWait      bool   // 不返回执行结果,但是如果exclusive开启的话,则必须需要等待结果的,如果exclusive和nowait都为true就会报错
	Arguments   amqp.Table
}

mandatory=true 当没有队列匹配routingKey, 发布的消息也可能处于不能递交状态 immediate=true 如果在匹配的队列上没有消费者准备好,发布的消息也可能处于不能递交状态

func (ConsumeOption) GetType

func (q ConsumeOption) GetType() types.MqOptionType

type ConsumerClient

type ConsumerClient struct {
	*BaseClient
	Config *ConsumerConfig
}

type ConsumerConfig

type ConsumerConfig struct {
	Name         string   `mapstructure:"name"`
	ExchangeName string   `mapstructure:"exchange_name"`
	ExchangeType string   `mapstructure:"exchange_type"`
	QueueName    string   `mapstructure:"queue_name"`
	RoutingKeys  []string `mapstructure:"routing_keys"`
}

ConsumerConfig 客户端配置

type ExchangeOption

type ExchangeOption struct {
	Durable    bool // 消息是否持久化
	AutoDelete bool // 是否会自动删除:当最后一个消费者断开后,是否将队列中的消息清除
	Internal   bool // 是否具有排他性,意思只有创建者可见,其他人不可用
	NoWait     bool // 是否阻塞
	Args       amqp.Table
}

声明exchange, 注意:如果声明出错的话会导致关闭channel

以"amp."前缀开头的exchange名字为保留名字

不同类型的exchange定义了消息的不同路由方式,当前有如下类型: "direct", "fanout", "topic"和"headers"

Durable=true, AutoDelete=false 该类型的exchange在服务重启后会恢复并保持可用即使没有其他binding存在. 这种生命周期的exchange是最稳定的并且是缺省的exchange

Durable=false, AutoDelete=true 如果没有其他bindings该exchange将会被删除,并且在服务重启后不会被恢复 这种生命周期的exchange适用于临时的并且在其他消费者完成后不需要继续保留在virtual host的场景

Durable=false, AutoDelete=false 该类型的exchange在服务运行时始终存在即使其没有其他binding存在 这种生命周期的exchange适用于在binding之前需要等待较长时间

Durable=true, Auto-Deleted=true 这种类型exchange在服务重启后会恢复,但无可用binding的时候会被删除 这种生命周期的exchange适用于绑定到持久化的队列但同时又要求其自动会被删除

注意: RabbitMQ会将'amq.fanout'开头的exchange声明为durable=true, 所以绑定到该exchange的队列也需要设置durable=true

internal=true 当不想对外公布exchange只想做内部exchange的时候才适用,一般不用

当noWait=true,在声明的时候不会等待server端的确认,如果有错误会被NotifyClose回调函数监听到

func (ExchangeOption) GetType

func (q ExchangeOption) GetType() types.MqOptionType

type MqConfig

type MqConfig struct {
	Name      string            `mapstructure:"name"`
	Host      string            `mapstructure:"host"`
	Port      int               `mapstructure:"port"`
	Username  string            `mapstructure:"username"`
	Password  string            `mapstructure:"password"`
	Vhost     string            `mapstructure:"vhost"`
	Consumers []*ConsumerConfig `mapstructure:"consumers"`
	Producers []*ProducerConfig `mapstructure:"producers"`
}

MqConfig amqp://user:pass@host:10000/vhost

type MqProviderConfig

type MqProviderConfig struct {
	Default *MqConfig   `mapstructure:"default"` // 缺省
	Items   []*MqConfig `mapstructure:"items"`
}

type ProducerClient

type ProducerClient struct {
	*BaseClient
	Config *ProducerConfig
}

type ProducerConfig

type ProducerConfig struct {
	Name         string `mapstructure:"name"`
	ExchangeName string `mapstructure:"exchange_name"`
	ExchangeType string `mapstructure:"exchange_type"`
}

ProducerConfig 发送端配置

type PublishOption

type PublishOption struct {
	Mandatory bool
	Immediate bool

	ContentType  string // MIME content type
	DeliveryMode uint8  // Transient (0 or 1) or Persistent (2)

}

mandatory=true 当没有队列匹配routingKey, 发布的消息也可能处于不能递交状态 immediate=true 如果在匹配的队列上没有消费者准备好,发布的消息也可能处于不能递交状态

func (PublishOption) GetType

func (q PublishOption) GetType() types.MqOptionType

type QosOption

type QosOption struct {
	PrefetchCount int
	PrefetchSize  int
	Global        bool
}

func (QosOption) GetType

func (q QosOption) GetType() types.MqOptionType

type QueueOption

type QueueOption struct {
	Durable    bool // 消息是否持久化
	AutoDelete bool // 是否会自动删除:当最后一个消费者断开后,是否将队列中的消息清除
	Exclusive  bool // 是否具有排他性,意思只有创建者可见,其他人不可用
	NoWait     bool // 是否阻塞等待
	Args       amqp.Table
}

所有declare的queue会获取一个queueBinding,具有以下配置: exchangeName="", type="direct", routingKey=queueName

通过该queueBinding, 我们可以直接通过以下配置的exchange发送消息: exchangeName="", routingKey=queueName,

e,g:

QueueDeclare("alerts", true, false, false, false, nil)
Publish("", "alerts", false, false, Publishing{Body: []byte("...")})

Delivery       Exchange  Key       Queue
-----------------------------------------------
key: alerts -> ""     -> alerts -> alerts

如果queueName为空,服务器会生成一个唯一的queueName并通过该Queue结构的Name字段返回

Durable=true,AutoDelete=false 该队列不管服务是否重启,也不管是否有消费者或者binding它就始终存在 持久的消息将会在服务重启后恢复,注意这些队列只能保存在durable=true的exchange上

Durable=false,AutoDelete=true 该队列在服务重启后并不会会重新声明 当最后一个消费者取消或者消费者通道关闭后服务器将很快删除该队列 这种队列只能保存在durable=false的exchange上

Durable=false,AutoDelete=false 该队列在服务运行时将始终处于可用状态不管有多少个消费者 这种生命周期的队列通常用来保存可能在不同消费者间存在很久的临时拓扑 这种队列只能保存在durable=false的exchange上

Durable=true, AutoDelete=true 该队列在服务重启后会恢复,但如果没有活动的消费者该队列将会被移除 这种生命周期的队列一般不太会使用

Exclusive=true的队列只能被声明队列的connections使用,当connection关闭的时候队列也会被删除 同时如果有其他channel尝试declare,bind,consume,purge或删除同样名字的队列会报错

Nowait=true 如果该选项为true, 在声明队列时会假设总是声明成功, 如果服务器上已有同样的queue, 或者其他connections尝试修改该队列,channel都会抛出exception

如果返回的错误不为空,你可以认为该队列用这些配置参数不能声明成功,channel会关闭

func (QueueOption) GetType

func (q QueueOption) GetType() types.MqOptionType

type RabbitMq

type RabbitMq struct {
	Logger types.LogProvider
	Config *MqConfig
}

func (*RabbitMq) CreateConsumer

func (rmq *RabbitMq) CreateConsumer(name string, processFunc types.MqMsgProcessFunc, args ...map[types.MqOptionType]types.MqOptioner) (types.MqConsumer, error)

producer的名字和route中的名字对应

func (*RabbitMq) CreateProducer

func (rmq *RabbitMq) CreateProducer(name string, args ...map[types.MqOptionType]types.MqOptioner) (types.MqProducer, error)

producer的名字和route中的名字对应

func (*RabbitMq) GetDefaultOptions

func (rmq *RabbitMq) GetDefaultOptions() map[types.MqOptionType]types.MqOptioner

func (*RabbitMq) NewConsumerClient

func (rmq *RabbitMq) NewConsumerClient(name string, options map[types.MqOptionType]types.MqOptioner) (*ConsumerClient, error)

type RabbitMqConsumer

type RabbitMqConsumer struct {
	Logger        types.LogProvider
	ConsumeOption *ConsumeOption
	QosOption     *QosOption
	Client        *ConsumerClient
	QueueName     string
	Process       types.MqMsgProcessFunc
	// contains filtered or unexported fields
}

func (*RabbitMqConsumer) Close

func (mc *RabbitMqConsumer) Close()

func (*RabbitMqConsumer) Consume

func (mc *RabbitMqConsumer) Consume()

消费消息 consume -> spawn a routing -> handle(deliveries)

type RabbitMqProducer

type RabbitMqProducer struct {
	Logger             types.LogProvider
	Option             *PublishOption
	ExchangeName       string
	Client             *ProducerClient
	CurrentDeliveryTag uint64 // 当前确认的deliveryTag
	LastDeliverTag     uint64 // 上一次确认的deliveryTag
	// contains filtered or unexported fields
}

func (*RabbitMqProducer) Close

func (rmp *RabbitMqProducer) Close()

func (*RabbitMqProducer) GetLastConfirmedId

func (rmp *RabbitMqProducer) GetLastConfirmedId() uint64

func (RabbitMqProducer) Publish

func (rmp RabbitMqProducer) Publish(data []byte, args ...interface{}) error

Publish 发布内容到exchange 当你想发送一条消息到单个队列的时候,你可以用queueName当做routingKey来发送到缺省的exchange 因为每个申明的queue都有一个隐式的route到缺省exchange

mandatory=true 当没有队列匹配routingKey, 发布的消息也可能处于不能递交状态 ---> immediate=true 如果在匹配的队列上没有消费者准备好,发布的消息也可能处于不能递交状态

当connection、channel被关闭的时候都会返回错误,所以我们不能通过判断没有错误来认为服务器已经接收到发布的内容 同时因为发布动作是异步的,不能递交的消息会被服务器返回, 我们需要实现Channel.NotifyReturn接口来监听并处理不能递交的消息

当底层socket被关闭的时候如果没有将等待发送的消息报从内核缓存中进行保存,有可能导致发布内容不能到达broker 最简单的防止消息对视就是在终止发布应用的时候需要调用Connection.Close来保证消息不丢失 另外为了确保消息到达服务器需要添加一个Channel.NotifyPublish的监听,并且让Channel处于Confirm模式 发布递交的标签和对应的确认从1开始, 当所有发布确认后会退出

当发布没有返回错误并且channel在confirm模式, DeliveryTags的内部计数器首先确认从1开始

type RabbitmqProvider

type RabbitmqProvider struct {
	mq.BaseMqProvider
}

func (*RabbitmqProvider) Init

func (rp *RabbitmqProvider) Init(rootConfiger types.Configer, logger types.LogProvider, args ...interface{}) error

Init implements types.Provider interface, used to initialize the capability @author Ryan Fan (2021-06-09) @param baseconf.Configer root config interface to extract config info @return error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL