Documentation ¶
Overview ¶
Package rabbitmq
- exchangeType: direct * simple模式 一对一,一个发送一个接收 * work模式 和simple模式一样,不同在于work模式可以有多个消费者,work模式起到一个负载均衡的作用,不同worker轮询获取一条消息并进行处理 * Routing模式(publishKey: routingKey + queueBind: routingKey) 在订阅模式下,一个消息可以被多个消费者消费,如果我们想指定某个消息由哪些消费者消费,我们就要采用Routing模式, routing模式最大的特点是可以从生产端通过routingKey来指定的消费端来消费消息
- exchangeType: fanout * 订阅模式(exchangeType: fanout, routingKey: empty) simple模式和work模式他们有一个共同的特点就是一个消息只能被一个消费者消费,如果需要一个消息被多个消费者消费,就需要订阅模式。 订阅模式的特点是一个消息被投递到多个队列,一个消息能被多个消费者获取。 过程是由生产者将消息发送到exchange(交换机)里,然后exchange通过一系列的规则发送到队列上,然后由绑定对应的消费者进行消息。
- exchangeType: topic * topic模式(publishKey: routingKey + queueBind: routingKey) 当基于routing模式,通过指定通配符的方式来指定我们的消费者来消费消息
Package rabbitmq log capability of zerolog @Description zerolog implementation of log capability @Author Ryan Fan 2021-06-09 @Update Ryan Fan 2021-06-09
Index ¶
- Constants
- Variables
- func NewMq(providerType string, config *MqConfig, logger types.LogProvider) (types.Mq, error)
- type BaseClient
- type ConsumeOption
- type ConsumerClient
- type ConsumerConfig
- type ExchangeOption
- type MqConfig
- type MqProviderConfig
- type ProducerClient
- type ProducerConfig
- type PublishOption
- type QosOption
- type QueueOption
- type RabbitMq
- func (rmq *RabbitMq) CreateConsumer(name string, processFunc types.MqMsgProcessFunc, ...) (types.MqConsumer, error)
- func (rmq *RabbitMq) CreateProducer(name string, args ...map[types.MqOptionType]types.MqOptioner) (types.MqProducer, error)
- func (rmq *RabbitMq) GetDefaultOptions() map[types.MqOptionType]types.MqOptioner
- func (rmq *RabbitMq) NewConsumerClient(name string, options map[types.MqOptionType]types.MqOptioner) (*ConsumerClient, error)
- type RabbitMqConsumer
- type RabbitMqProducer
- type RabbitmqProvider
Constants ¶
const (
MAX_CLIENT_RECONNECT_WAIT_TIME = 10 * time.Second
)
重连等待的最大等待时间
const PUBLISH_CONFIRM_RETRY_TIMEOUT = 1 * time.Second
Variables ¶
var ( ErrConsumerConfigNotFound = errors.New("consumer config not found") ErrInvalidConsumerConfig = errors.New("invalid consumer config") ErrProducerConfigNotFound = errors.New("producer config not found") ErrInvalidProducerConfig = errors.New("invalid consumer config") )
var ( ErrPublishNotConfirmed = errors.New("publish is not acknowledged") ErrPublishAckLost = errors.New("publish ack lost") )
var (
SupportedExchangeTypes = []string{"direct", "fanout", "topic"}
)
支持的exchangeTypes
Functions ¶
Types ¶
type BaseClient ¶
type BaseClient struct { Logger types.LogProvider Name string // 客户端名字 Url string // 连接url Options map[types.MqOptionType]types.MqOptioner Connection *amqp.Connection Channel *amqp.Channel // contains filtered or unexported fields }
消息队列客户端维护connection和channel
type ConsumeOption ¶
type ConsumeOption struct { ConsumerTag string // consumer_tag 消费者标签 NoLocal bool // 这个功能属于AMQP的标准,但是rabbitMQ并没有做实现. NoAck bool // 收到消息后,是否不需要回复确认即被认为被消费 Exclusive bool // 排他消费者,即这个队列只能由一个消费者消费.适用于任务不允许进行并发处理的情况下.比如系统对接 NoWait bool // 不返回执行结果,但是如果exclusive开启的话,则必须需要等待结果的,如果exclusive和nowait都为true就会报错 Arguments amqp.Table }
mandatory=true 当没有队列匹配routingKey, 发布的消息也可能处于不能递交状态 immediate=true 如果在匹配的队列上没有消费者准备好,发布的消息也可能处于不能递交状态
func (ConsumeOption) GetType ¶
func (q ConsumeOption) GetType() types.MqOptionType
type ConsumerClient ¶
type ConsumerClient struct { *BaseClient Config *ConsumerConfig }
type ConsumerConfig ¶
type ConsumerConfig struct { Name string `mapstructure:"name"` ExchangeName string `mapstructure:"exchange_name"` ExchangeType string `mapstructure:"exchange_type"` QueueName string `mapstructure:"queue_name"` RoutingKeys []string `mapstructure:"routing_keys"` }
ConsumerConfig 客户端配置
type ExchangeOption ¶
type ExchangeOption struct { Durable bool // 消息是否持久化 AutoDelete bool // 是否会自动删除:当最后一个消费者断开后,是否将队列中的消息清除 Internal bool // 是否具有排他性,意思只有创建者可见,其他人不可用 NoWait bool // 是否阻塞 Args amqp.Table }
声明exchange, 注意:如果声明出错的话会导致关闭channel
以"amp."前缀开头的exchange名字为保留名字
不同类型的exchange定义了消息的不同路由方式,当前有如下类型: "direct", "fanout", "topic"和"headers"
Durable=true, AutoDelete=false 该类型的exchange在服务重启后会恢复并保持可用即使没有其他binding存在. 这种生命周期的exchange是最稳定的并且是缺省的exchange
Durable=false, AutoDelete=true 如果没有其他bindings该exchange将会被删除,并且在服务重启后不会被恢复 这种生命周期的exchange适用于临时的并且在其他消费者完成后不需要继续保留在virtual host的场景
Durable=false, AutoDelete=false 该类型的exchange在服务运行时始终存在即使其没有其他binding存在 这种生命周期的exchange适用于在binding之前需要等待较长时间
Durable=true, Auto-Deleted=true 这种类型exchange在服务重启后会恢复,但无可用binding的时候会被删除 这种生命周期的exchange适用于绑定到持久化的队列但同时又要求其自动会被删除
注意: RabbitMQ会将'amq.fanout'开头的exchange声明为durable=true, 所以绑定到该exchange的队列也需要设置durable=true
internal=true 当不想对外公布exchange只想做内部exchange的时候才适用,一般不用
当noWait=true,在声明的时候不会等待server端的确认,如果有错误会被NotifyClose回调函数监听到
func (ExchangeOption) GetType ¶
func (q ExchangeOption) GetType() types.MqOptionType
type MqConfig ¶
type MqConfig struct { Name string `mapstructure:"name"` Host string `mapstructure:"host"` Port int `mapstructure:"port"` Username string `mapstructure:"username"` Password string `mapstructure:"password"` Vhost string `mapstructure:"vhost"` Consumers []*ConsumerConfig `mapstructure:"consumers"` Producers []*ProducerConfig `mapstructure:"producers"` }
MqConfig amqp://user:pass@host:10000/vhost
type MqProviderConfig ¶
type ProducerClient ¶
type ProducerClient struct { *BaseClient Config *ProducerConfig }
type ProducerConfig ¶
type ProducerConfig struct { Name string `mapstructure:"name"` ExchangeName string `mapstructure:"exchange_name"` ExchangeType string `mapstructure:"exchange_type"` }
ProducerConfig 发送端配置
type PublishOption ¶
type PublishOption struct { Mandatory bool Immediate bool ContentType string // MIME content type DeliveryMode uint8 // Transient (0 or 1) or Persistent (2) }
mandatory=true 当没有队列匹配routingKey, 发布的消息也可能处于不能递交状态 immediate=true 如果在匹配的队列上没有消费者准备好,发布的消息也可能处于不能递交状态
func (PublishOption) GetType ¶
func (q PublishOption) GetType() types.MqOptionType
type QueueOption ¶
type QueueOption struct { Durable bool // 消息是否持久化 AutoDelete bool // 是否会自动删除:当最后一个消费者断开后,是否将队列中的消息清除 Exclusive bool // 是否具有排他性,意思只有创建者可见,其他人不可用 NoWait bool // 是否阻塞等待 Args amqp.Table }
所有declare的queue会获取一个queueBinding,具有以下配置: exchangeName="", type="direct", routingKey=queueName
通过该queueBinding, 我们可以直接通过以下配置的exchange发送消息: exchangeName="", routingKey=queueName,
e,g:
QueueDeclare("alerts", true, false, false, false, nil) Publish("", "alerts", false, false, Publishing{Body: []byte("...")}) Delivery Exchange Key Queue ----------------------------------------------- key: alerts -> "" -> alerts -> alerts
如果queueName为空,服务器会生成一个唯一的queueName并通过该Queue结构的Name字段返回
Durable=true,AutoDelete=false 该队列不管服务是否重启,也不管是否有消费者或者binding它就始终存在 持久的消息将会在服务重启后恢复,注意这些队列只能保存在durable=true的exchange上
Durable=false,AutoDelete=true 该队列在服务重启后并不会会重新声明 当最后一个消费者取消或者消费者通道关闭后服务器将很快删除该队列 这种队列只能保存在durable=false的exchange上
Durable=false,AutoDelete=false 该队列在服务运行时将始终处于可用状态不管有多少个消费者 这种生命周期的队列通常用来保存可能在不同消费者间存在很久的临时拓扑 这种队列只能保存在durable=false的exchange上
Durable=true, AutoDelete=true 该队列在服务重启后会恢复,但如果没有活动的消费者该队列将会被移除 这种生命周期的队列一般不太会使用
Exclusive=true的队列只能被声明队列的connections使用,当connection关闭的时候队列也会被删除 同时如果有其他channel尝试declare,bind,consume,purge或删除同样名字的队列会报错
Nowait=true 如果该选项为true, 在声明队列时会假设总是声明成功, 如果服务器上已有同样的queue, 或者其他connections尝试修改该队列,channel都会抛出exception
如果返回的错误不为空,你可以认为该队列用这些配置参数不能声明成功,channel会关闭
func (QueueOption) GetType ¶
func (q QueueOption) GetType() types.MqOptionType
type RabbitMq ¶
type RabbitMq struct { Logger types.LogProvider Config *MqConfig }
func (*RabbitMq) CreateConsumer ¶
func (rmq *RabbitMq) CreateConsumer(name string, processFunc types.MqMsgProcessFunc, args ...map[types.MqOptionType]types.MqOptioner) (types.MqConsumer, error)
producer的名字和route中的名字对应
func (*RabbitMq) CreateProducer ¶
func (rmq *RabbitMq) CreateProducer(name string, args ...map[types.MqOptionType]types.MqOptioner) (types.MqProducer, error)
producer的名字和route中的名字对应
func (*RabbitMq) GetDefaultOptions ¶
func (rmq *RabbitMq) GetDefaultOptions() map[types.MqOptionType]types.MqOptioner
func (*RabbitMq) NewConsumerClient ¶
func (rmq *RabbitMq) NewConsumerClient(name string, options map[types.MqOptionType]types.MqOptioner) (*ConsumerClient, error)
type RabbitMqConsumer ¶
type RabbitMqConsumer struct { Logger types.LogProvider ConsumeOption *ConsumeOption QosOption *QosOption Client *ConsumerClient QueueName string Process types.MqMsgProcessFunc // contains filtered or unexported fields }
func (*RabbitMqConsumer) Close ¶
func (mc *RabbitMqConsumer) Close()
func (*RabbitMqConsumer) Consume ¶
func (mc *RabbitMqConsumer) Consume()
消费消息 consume -> spawn a routing -> handle(deliveries)
type RabbitMqProducer ¶
type RabbitMqProducer struct { Logger types.LogProvider Option *PublishOption ExchangeName string Client *ProducerClient CurrentDeliveryTag uint64 // 当前确认的deliveryTag LastDeliverTag uint64 // 上一次确认的deliveryTag // contains filtered or unexported fields }
func (*RabbitMqProducer) Close ¶
func (rmp *RabbitMqProducer) Close()
func (*RabbitMqProducer) GetLastConfirmedId ¶
func (rmp *RabbitMqProducer) GetLastConfirmedId() uint64
func (RabbitMqProducer) Publish ¶
func (rmp RabbitMqProducer) Publish(data []byte, args ...interface{}) error
Publish 发布内容到exchange 当你想发送一条消息到单个队列的时候,你可以用queueName当做routingKey来发送到缺省的exchange 因为每个申明的queue都有一个隐式的route到缺省exchange
mandatory=true 当没有队列匹配routingKey, 发布的消息也可能处于不能递交状态 ---> immediate=true 如果在匹配的队列上没有消费者准备好,发布的消息也可能处于不能递交状态
当connection、channel被关闭的时候都会返回错误,所以我们不能通过判断没有错误来认为服务器已经接收到发布的内容 同时因为发布动作是异步的,不能递交的消息会被服务器返回, 我们需要实现Channel.NotifyReturn接口来监听并处理不能递交的消息
当底层socket被关闭的时候如果没有将等待发送的消息报从内核缓存中进行保存,有可能导致发布内容不能到达broker 最简单的防止消息对视就是在终止发布应用的时候需要调用Connection.Close来保证消息不丢失 另外为了确保消息到达服务器需要添加一个Channel.NotifyPublish的监听,并且让Channel处于Confirm模式 发布递交的标签和对应的确认从1开始, 当所有发布确认后会退出
当发布没有返回错误并且channel在confirm模式, DeliveryTags的内部计数器首先确认从1开始
type RabbitmqProvider ¶
type RabbitmqProvider struct {
mq.BaseMqProvider
}
func (*RabbitmqProvider) Init ¶
func (rp *RabbitmqProvider) Init(rootConfiger types.Configer, logger types.LogProvider, args ...interface{}) error
Init implements types.Provider interface, used to initialize the capability @author Ryan Fan (2021-06-09) @param baseconf.Configer root config interface to extract config info @return error