rabbitmq

package
v2.9.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 15, 2023 License: Apache-2.0 Imports: 13 Imported by: 0

README

rabbitmq: An easy golang amqp client

Introduction

rabbitmq 扩展了 github.com/streadway/amqp 包,在 amqp 包的基础上实现了网络断线重连、消息发送失败重试的能力。

由于消息的发送和接收,依赖 Channel 的创建,Channel 的创建依赖与 Connection 的连接。如果断开网络连接,Connection 就会断开。 但是 Channel 并不知道是谁创建了自己。所以通常情况下,我们有两种断线重连的方案:

  • 一种是 Connection 断开后,Channel 自己重新获取 Connection。此种方式是最简单的实现方案。 但此法的问题是,如果 Channel 数量极其庞大, 每个 Channel 都会创建 Connection 并连接。当网络状况不大好的时候,可能会有数量极其庞大的 Channel 反复尝试重连, 导致服务器资源占用会暴增,甚至加剧网络的阻塞。但其实我们只需要一个 Connection 去判断网络是否能连接、已连接。
  • 一种是 Connection 断开后,Connection 自己重连。如果重连不成功,Channel 不做任何操作。如果重连成功,自动重跑注册的操作。 而如果使用断线重连发送消息,将在 Connection 连接成功后继续发送需要发送的消息。此种方式实现比较复杂, 但避免了服务器资源占用以及加剧网络阻塞的问题。

rabbitmq 采用的是后者。

Feature

目前实现了

  • 断线重连
  • 消息重发
  • 同步的消息发送确认

Usage

package main

import (
  "log"
  "time"
  "github.com/abulo/ratel/v2/client/rabbitmq"
  "github.com/streadway/amqp"
)

func main() {
  // -- 创建 Connection 并连接服务器 --
  conn, err := rabbitmq.Dial("amqp://guest:guest@localhost:5672/", rabbitmq.DefaultTimesRetry())
  if err != nil {
    log.Fatal(err)
  }
  defer conn.Close()

  // -- 发送消息 --
  go func() {
    producer := conn.Producer()
    err = producer.Send("amq.direct", "key.direct",
      []byte("producer.Send() | "+time.Now().Format("2006-01-02 15:04:05")),
      rabbitmq.DefaultSendOpts())
    if err != nil {
      log.Fatal(err)
    }
  }()

  // -- 接收消息 --
  go func() {
    consumer := conn.Consumer()
    consumer.Receive(
      "queue.direct",
      rabbitmq.NewReceiveOptsBuilder().SetAutoAck(false).Build(),
      &rabbitmq.AbsReceiveListener{
        ConsumerMethod: func(d *amqp.Delivery) (brk bool) {
          log.Println("queue.direct ", d.DeliveryTag, " ", string(d.Body))
          err := d.Ack(false)
          if err != nil {
            log.Println(err)
          }
          return
        },
        FinishMethod: func(err error) {
          if err != nil {
            // 处理错误
            log.Fatal(err)
          }
          // defer xxx.close() // 关闭资源操作等
        },
      })
  }()

  time.Sleep(time.Second * 10)
}

小建议:

如无特殊需求,我们在全局只需创建一个 Connection,所有消息的发送和接收都使用 Producer 和 Consumer 处理。

Examples

消费

生产

Unresolved

  • 消息异步发送与确认
  • 重复消费
  • 网络分区状态下,找不到队列时消息重发(ReturnListener)
  • 消费者 ack 重试

License

LGPL

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AbsReceiveListener

type AbsReceiveListener struct {
	ConsumerMethod ConsumerFunc
	FinishMethod   func(err error)
}

AbsReceiveListener 的抽象实现。 如果 ConsumerMethod 为 nil 或不赋值,将 panic; 如果 FinishMethod 为 nil 或不赋值,则默认不做任何操作。

func (*AbsReceiveListener) Consumer

func (lis *AbsReceiveListener) Consumer(delivery *amqp.Delivery) (brk bool)

Consumer ..

func (*AbsReceiveListener) Finish

func (lis *AbsReceiveListener) Finish(err error)

Finish TODO

func (*AbsReceiveListener) Remove

func (lis *AbsReceiveListener) Remove(key string, ch *Channel)

Remove TODO

type Adder

type Adder func() uint64

Adder 累加器。每次执行累加一定数额,返回一个 uint64。

func AdderGenerator

func AdderGenerator(delta uint64) Adder

AdderGenerator 累加器生成器。生成的累加器从 0 开始累加,delta 表示需要累加的数字

type Channel

type Channel struct {
	*amqp.Channel
	// contains filtered or unexported fields
}

Channel represents a channel

func (*Channel) Receive

func (c *Channel) Receive(queue string, consumer ConsumerFunc) error

Receive returns error if delivery

func (*Channel) ReceiveOpts

func (c *Channel) ReceiveOpts(queue string, consumer ConsumerFunc, opts *ReceiveOpts) error

ReceiveOpts 持续接收消息并消费,除非 `<-chan amqp.Delivery` 关闭或 ConsumerFunc 主动放弃接收。 参数 opts 表示接收选项。opts 如果为 nil,将使用 DefaultReceiveOpts() 作为默认配置。 如果将参数 opts 的 autoAck 属性设为 false,则应该在 ReceiveListener.Consumer() 函数中调用 (*amqp.Delivery).Ack 手动确认消费;如果设为 true,已发送的消息会被服务器认为已被消费, 可能因网络状况不好等原因导致消息未被接收,从而造成数据丢失。 autoAck 为 false 时不提供自动确认, 是因为消费者有可能会需要拒绝确认,或在消费出现错误时不进行确认。 参数 consumer 用于处理接收操作。参数 consumer 一定不能为 nil,否则将 panic。 返回值:当 `<-chan amqp.Delivery` 关闭或 ConsumerFunc 主动放弃接收,返回 nil;其他情况则返回 error

func (*Channel) RemoveOperation

func (c *Channel) RemoveOperation(key string)

RemoveOperation removes an operation from the channel

func (*Channel) Send

func (c *Channel) Send(exchange string, routingKey string, body []byte) error

Send 如果使用了 Channel.SetXxx 设置了配置,将使用设定的配置发送消息,否则使用默认配置

func (*Channel) SendOpts

func (c *Channel) SendOpts(exchange string, routingKey string, body []byte, opts *SendOpts) error

SendOpts 发送消息。此方法不支持并发操作,如果需要并发发送,请先创建新的 Channel,再执行此方法。 参数 body 即需要发送的消息。 参数 opts 即发送消息需要配置的选项。如果 opts 为 nil,则表示使用默认配置。可以通过配置 SendOpts.retryable 启用消息重发的能力。请注意,由于消息重发使用的是同步的方式处理 ack,因此启用消息重发会极大降低 QPS。

type Connection

type Connection struct {
	sync.Once // 用于保证 Dial 只被调用一次
	// contains filtered or unexported fields
}

Connection amqp 连接。 Connection 创建后不会直接连接服务器,而是要调用 Dial 后才会执行连接服务器操作

func Dial

func Dial(url string, retryable Retryable) (*Connection, error)

Dial 如果 retryable 为 nil,则表示不启用断线重连

Example
conn, err := Dial("amqp://guest:guest@localhost:5672/", nil)
if err != nil {
	panic(err)
}
defer conn.Close()
fmt.Println("connected!")
Output:

connected!

func NewConnection

func NewConnection(url string, retryable Retryable) *Connection

NewConnection retryable 如果为 nil,则使用 emptyRetryable 替换。emptyRetryable 不会尝试重试操作。

func (*Connection) CanRetry

func (c *Connection) CanRetry() bool

CanRetry returns true

func (*Connection) Channel

func (c *Connection) Channel() (*Channel, error)

Channel 可用于发送、接收消息。 函数会先判断是否已连接,否则将尝试重连(使用您之前设置的 Retryable 配置)。 在获得连接的情况下,会立刻创建 Channel。但可能会存在极少数情况下,因为网络不稳定等因素, Channel 创建之前,连接又断开,则会因为网络原因产生错误。

func (*Connection) Close

func (c *Connection) Close() error

Close 关闭 Connection。

func (*Connection) Consumer

func (c *Connection) Consumer() *Consumer

Consumer returns the consumer for the connection

func (*Connection) Dial

func (c *Connection) Dial() error

Dial 连接服务器。仅允许被调用一次。

func (*Connection) IsOpen

func (c *Connection) IsOpen() bool

IsOpen returns true

func (*Connection) Producer

func (c *Connection) Producer() *Producer

Producer returns the producer for the connection

func (*Connection) QueueBuilder

func (c *Connection) QueueBuilder() *QueueBuilder

QueueBuilder returns the queue builder for the connection

func (*Connection) RegisterAndExec

func (c *Connection) RegisterAndExec(opt Operation)

RegisterAndExec 注册并执行 Operation 。 每次重连后,重连监听器会自动使用 exec 执行一遍所有的 Operation 函数。 注意:

  • 函数会在 Operation 执行完后主动关闭 Channel,因此我们无需在 Operation 中手动关闭 Channel。
  • 由于使用了 go routine,该方法可能会在 Operation 操作执行完毕前返回。
Example
var err error
conn := NewConnection("amqp://guest:guest@localhost:5672/", &TimesRetry{Interval: 3 * time.Second, RetryTimes: 10})
defer conn.Close()

err = conn.Dial()
if err != nil {
	log.Fatal(err)
}

consumerTag := NewDefaultSAdder()
conn.RegisterAndExec(func(key string, ch *Channel) {
	deliverys, e := ch.Consume("queue.direct", consumerTag(), true, false, false, false, nil)
	if e != nil {
		log.Fatal(e)
	}
	for delivery := range deliverys {
		log.Println("queue.direct-1 ", delivery.DeliveryTag, " ", string(delivery.Body))
	}
})
conn.RegisterAndExec(func(key string, ch *Channel) {
	e := ch.Receive("queue.direct", func(delivery *amqp.Delivery) (brk bool) {
		log.Println("queue.direct-2 ", delivery.DeliveryTag, " ", string(delivery.Body))
		return true
	})
	if e != nil {
		log.Fatal(e)
	}
})

//注意:conn.RegisterAndExec 会导致重连时再次执行 Operation 中的操作。如果不希望发生此情况,应该使用 Connect.Channel 消费消息
conn.RegisterAndExec(func(key string, ch *Channel) {
	delivery, _, e := ch.Get("queue.direct", true)
	if e != nil {
		log.Fatal(e)
	}
	log.Println("queue.direct ", delivery.DeliveryTag, " ", string(delivery.Body))
})

//注意:conn.RegisterAndExec 会导致重连时再次执行 Operation 中的操作。如果不希望发生此情况,应该使用 Connect.Channel 发送消息
conn.RegisterAndExec(func(key string, ch *Channel) {
	e := ch.Send("amq.direct", "key.direct", []byte("rabbitmq addOperation() send test!"))
	if e != nil {
		log.Fatal(e)
	}
})
Output:

func (*Connection) RemoveOperation

func (c *Connection) RemoveOperation(key string)

func (*Connection) RetryChannel

func (c *Connection) RetryChannel(retryable Retryable) (ch *Channel, err error)

RetryChannel 将在 Channel 创建失败后,尝试重试。如果仍然失败,将返回造成失败的原因。

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func (*Consumer) Get

func (c *Consumer) Get(queue string, autoAck bool) (*amqp.Delivery, bool, error)

Get When autoAck is true, the server will automatically acknowledge this message so you don't have to. But if you are unable to fully process this message before the channel or connection is closed, the message will not get requeued

func (*Consumer) Receive

func (c *Consumer) Receive(queue string, opts *ReceiveOpts, lis ReceiveListener)

Receive 持续接收消息并消费。如果期望只接收一次消息,可以使用 Get 方法。 此方法是异步方法,内部使用了 go routine 执行接收操作,因此即便没有消息 可以接收时,该方法也不会阻塞。 详见 Channel.ReceiveOpts

Example
conn := getConnection()

consumer := conn.Consumer()
consumer.Receive(
	"queue.direct",
	NewReceiveOptsBuilder().SetAutoAck(false).Build(),
	&AbsReceiveListener{
		ConsumerMethod: func(delivery *amqp.Delivery) (brk bool) {
			log.Println("queue.direct ", delivery.DeliveryTag, " ", string(delivery.Body))
			err := delivery.Ack(false)
			if err != nil {
				log.Println(err)
			}
			return
		},
		FinishMethod: func(err error) {
			if err != nil {
				// 处理错误
				return
			}
			// defer xxx.close() // 关闭资源操作等
		},
	})

time.Sleep(time.Minute) // 由于 Consumer.Receive() 内部采用了异步方式处理,因此 Receive 方法不会阻塞等待
Output:

type ConsumerFunc

type ConsumerFunc func(*amqp.Delivery) (brk bool)

如果没有消息则该方法阻塞等待;否则本方法会被持续调用, 直到主动停止消费(即本方法返回 true)。 返回值 brk 表示是否 break,即在循环消费过程中是否需要终止消费。

type CtxRetry

type CtxRetry struct {
	Ctx      context.Context
	Interval time.Duration // 间隔时间,指定断线后间隔多久再尝试重试。

	sync.RWMutex
	// contains filtered or unexported fields
}

CtxRetry ...

func DefaultCtxRetry

func DefaultCtxRetry(ctx context.Context) *CtxRetry

DefaultCtxRetry ...

func NewCtxRetry

func NewCtxRetry(ctx context.Context, interval time.Duration) *CtxRetry

NewCtxRetry ...

func (*CtxRetry) GiveUp

func (r *CtxRetry) GiveUp()

GiveUp ..

type MessageFactory

type MessageFactory func(body []byte) amqp.Publishing

MessageFactory 消息工厂方法。默认提供了如: MessagePlainTransient, MessagePlainPersistent, MessageJSONPersistent 等 在内的工厂方法。 如果没有需要的工厂方法,则需要调用者自己提供对应的工厂方法。

var (
	// MessagePlainTransient 无格式、非持久化消息工厂方法
	MessagePlainTransient MessageFactory = func(body []byte) amqp.Publishing {
		return amqp.Publishing{
			ContentType:  "text/plain",
			DeliveryMode: amqp.Transient,
			Body:         body,
		}
	}
	// MessagePlainPersistent 无格式、持久化消息工厂方法
	MessagePlainPersistent MessageFactory = func(body []byte) amqp.Publishing {
		return amqp.Publishing{
			ContentType:  "text/plain",
			DeliveryMode: amqp.Persistent,
			Body:         body,
		}
	}
	// MessageJSONTransient JSON、非持久化消息工厂方法
	MessageJSONTransient MessageFactory = func(body []byte) amqp.Publishing {
		return amqp.Publishing{
			ContentType:  "text/json",
			DeliveryMode: amqp.Transient,
			Body:         body,
		}
	}
	// MessageJSONPersistent JSON、持久化消息工厂方法
	MessageJSONPersistent MessageFactory = func(body []byte) amqp.Publishing {
		return amqp.Publishing{
			ContentType:  "text/json",
			DeliveryMode: amqp.Persistent,
			Body:         body,
		}
	}
)

type Operation

type Operation func(key string, ch *Channel)

type Operations

type Operations map[string]Operation

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

func (*Producer) Send

func (p *Producer) Send(exchange string, routingKey string, body []byte, opts *SendOpts) error

Send 发送消息。 参数 body 即需要发送的消息。 参数 opts 即发送消息需要配置的选项。如果 opts 为 nil,则表示使用默认配置。可以通过配置 SendOpts.retryable 启用消息重发的能力。请注意,由于消息重发使用的是同步的方式处理 ack,因此启用消息重发会极大降低 QPS。

type Queue

type Queue struct {
	*amqp.Queue
	// contains filtered or unexported fields
}

Queue represents a queue of messages

func (*Queue) DeclareAndBind

func (q *Queue) DeclareAndBind(queueName, key, exchange string) error

DeclareAndBind 根据 queueName 声明队列,并绑定 queueName, key 到指定的 exchange。 Queue 可能会因为网络原因创建失败,不提供一定创建成功保证。

func (*Queue) RetryDeclareAndBind

func (q *Queue) RetryDeclareAndBind(queueName, key, exchange string) error

RetryDeclareAndBind will retry the specified messages Channels

type QueueBindOpts

type QueueBindOpts struct {
	// contains filtered or unexported fields
}

QueueBindOpts is the queue binding options

func DefaultBindOpts

func DefaultBindOpts() *QueueBindOpts

DefaultBindOpts returns the default queue binding options

type QueueBindOptsBuilder

type QueueBindOptsBuilder struct {
	// contains filtered or unexported fields
}

QueueBindOptsBuilder is the builder for QueueBindOptsBuilder

func NewQueueBindOptsBuilder

func NewQueueBindOptsBuilder() *QueueBindOptsBuilder

NewQueueBindOptsBuilder returns a new QueueBindOptsBuilder

func (*QueueBindOptsBuilder) Build

func (bld *QueueBindOptsBuilder) Build() *QueueBindOpts

Build sets the parameters

func (*QueueBindOptsBuilder) SetArgs

func (bld *QueueBindOptsBuilder) SetArgs(args *amqp.Table) *QueueBindOptsBuilder

SetArgs sets the arguments

func (*QueueBindOptsBuilder) SetNoWait

func (bld *QueueBindOptsBuilder) SetNoWait(b bool) *QueueBindOptsBuilder

SetNoWait sets the default value

type QueueBuilder

type QueueBuilder struct {
	// contains filtered or unexported fields
}

QueueBuilder ...

func NewQueueBuilder

func NewQueueBuilder(c *Connection) *QueueBuilder

NewQueueBuilder creates a new QueueBuilder instance

func (*QueueBuilder) Build

func (bld *QueueBuilder) Build() *Queue

Build the queue

func (*QueueBuilder) SetQueueBindOpts

func (bld *QueueBuilder) SetQueueBindOpts(builderFn func(builder *QueueBindOptsBuilder) *QueueBindOpts) *QueueBuilder

SetQueueBindOpts sets the binding options for the queue builde

func (*QueueBuilder) SetQueueDeclareOpts

func (bld *QueueBuilder) SetQueueDeclareOpts(builderFn func(builder *QueueDeclareOptsBuilder) *QueueDeclareOpts) *QueueBuilder

SetQueueDeclareOpts sets the default value for the Queue declaration

func (*QueueBuilder) SetRetryable

func (bld *QueueBuilder) SetRetryable(retryable Retryable) *QueueBuilder

SetRetryable sets the retryable flag to true for

type QueueDeclareOpts

type QueueDeclareOpts struct {
	// contains filtered or unexported fields
}

QueueDeclareOpts returns the list of queued operations

func DefaultQueueDeclareOpts

func DefaultQueueDeclareOpts() *QueueDeclareOpts

DefaultQueueDeclareOpts holds the default queue declaration

type QueueDeclareOptsBuilder

type QueueDeclareOptsBuilder struct {
	// contains filtered or unexported fields
}

QueueDeclareOptsBuilder is the builder for QueueDeclareOpts

func NewQueueDeclareOptsBuilder

func NewQueueDeclareOptsBuilder() *QueueDeclareOptsBuilder

NewQueueDeclareOptsBuilder creates a new QueueDeclareOptsBuilder

func (*QueueDeclareOptsBuilder) Build

Build returns the queue declaration options

func (*QueueDeclareOptsBuilder) SetArgs

SetArgs sets the arguments to be passed to the command

func (*QueueDeclareOptsBuilder) SetAutoDelete

func (bld *QueueDeclareOptsBuilder) SetAutoDelete(b bool) *QueueDeclareOptsBuilder

SetAutoDelete sets the auto delete option

func (*QueueDeclareOptsBuilder) SetDurable

SetDurable sets whether the queue declaration

func (*QueueDeclareOptsBuilder) SetExclusive

SetExclusive sets the exclusive

func (*QueueDeclareOptsBuilder) SetNoWait

SetNoWait sets the number of seconds to wait before returning

type ReceiveListener

type ReceiveListener interface {
	// Consumer 用于实现消费操作。详见 ConsumerFunc。
	//
	// 如果消费者主动终止了退出,应该在 Finish 中主动移除当前 ReceiveListener。
	// 否则下次断线重连会再次触发该消息的接收操作。
	Consumer(*amqp.Delivery) (brk bool)
	// Finish 处理接收完成需要执行的操作,比如用于清理或关闭某些内容。
	// 如果没有相关操作需要执行,可以提供空实现。
	// Finish 可能由于主动停止接收消息或因为产生错误被调用。
	// 如果消费时没有错误,则参数 err 为 nil。
	Finish(err error)
	// 当主动停止消费时,应当实现该方法,主动移除当前 ReceiveListener。
	// 否则,一旦断线重连,该 ReceiveListener 会继续消费。
	Remove(key string, ch *Channel)
}

ReceiveListener ...

type ReceiveOpts

type ReceiveOpts struct {
	// contains filtered or unexported fields
}

ReceiveOpts 消息接收选项。 如果 autoAck 设为 false,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去 消息。因此调用者应该主动调用 (*amqp.Delivery).Ack 确认消费,防止消息在内存(或者磁盘)中积累。 如果 autoAck 设为 true,消息会被服务器默认为已消费,可能会导致消费者无法处理数据时造成数据丢失。 如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者己经 断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。 consumerTag 用于唯一识别一个消费者,如果不填可自动生成。 其他参数如果没有特别需求,默认不填即可。

func DefaultReceiveOpts

func DefaultReceiveOpts() *ReceiveOpts

DefaultReceiveOpts 将 ReceiveOpts.autoAck 默认设置为 true

type ReceiveOptsBuilder

type ReceiveOptsBuilder struct {
	// contains filtered or unexported fields
}

func NewReceiveOptsBuilder

func NewReceiveOptsBuilder() *ReceiveOptsBuilder

func (*ReceiveOptsBuilder) Build

func (bld *ReceiveOptsBuilder) Build() *ReceiveOpts

func (*ReceiveOptsBuilder) SetArgs

func (bld *ReceiveOptsBuilder) SetArgs(args *amqp.Table) *ReceiveOptsBuilder

func (*ReceiveOptsBuilder) SetAutoAck

func (bld *ReceiveOptsBuilder) SetAutoAck(b bool) *ReceiveOptsBuilder

作用详见 ReceiveOpts

func (*ReceiveOptsBuilder) SetConsumerTag

func (bld *ReceiveOptsBuilder) SetConsumerTag(tag string) *ReceiveOptsBuilder

func (*ReceiveOptsBuilder) SetExclusive

func (bld *ReceiveOptsBuilder) SetExclusive(b bool) *ReceiveOptsBuilder

func (*ReceiveOptsBuilder) SetNoLocal

func (bld *ReceiveOptsBuilder) SetNoLocal(b bool) *ReceiveOptsBuilder

func (*ReceiveOptsBuilder) SetNoWait

func (bld *ReceiveOptsBuilder) SetNoWait(b bool) *ReceiveOptsBuilder

type Retryable

type Retryable interface {

	// 放弃重试。在应该放弃重试的时候主动放弃重试,防止多余的重试或无限重试。
	GiveUp()
	// contains filtered or unexported methods
}

Retryable ..

type SAdder

type SAdder func() string

SAdder 返回一个 uint64 的字符串

func NewDefaultSAdder

func NewDefaultSAdder() SAdder

NewDefaultSAdder 获取一个从 0 开始累加,每次加 1 的累加器。返回一个 uint64 的字符串

func SAdderGenerator

func SAdderGenerator(delta uint64) SAdder

SAdderGenerator ..

type Schema

type Schema string
const (
	AMQP  Schema = "amqp"
	AMQPS Schema = "amqps"
)

type SendOpts

type SendOpts struct {
	// contains filtered or unexported fields
}

SendOpts 消息发送选项。 mandatory 设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列, 那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当该选项设置为 false 时, 出现上述情形,则消息直接被丢弃。 immediate 设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者, 那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic.Return 返回至生产者。 RabbitMQ 3.0版本开始去掉了对 immediate 参数的支持,对此 RabbitMQ 官方解释是: immediate 参数会影响镜像队列的性能,增加了代码复杂性,建议采用 TTL 和 DLX 的方法替代。 messageFactory 如果未设置该选项,则默认使用 MessagePlainTransient 生产消息。 retryable 如果不设置该选项,表示不启用消息重发功能。

func DefaultSendOpts

func DefaultSendOpts() *SendOpts

DefaultSendOpts 默认消息发送选项:消息无格式,非持久化,启用默认重试配置(DefaultTimesRetry)

type SendOptsBuilder

type SendOptsBuilder struct {
	// contains filtered or unexported fields
}

SendOptsBuilder ...

func NewSendOptsBuilder

func NewSendOptsBuilder() *SendOptsBuilder

NewSendOptsBuilder creates a new SendOptsBuilder

func (*SendOptsBuilder) Build

func (bld *SendOptsBuilder) Build() *SendOpts

Build the sendOptsBuilder

func (*SendOptsBuilder) SetImmediate

func (bld *SendOptsBuilder) SetImmediate(b bool) *SendOptsBuilder

SetImmediate 设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者, 那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic.Return 返回至生产者。

func (*SendOptsBuilder) SetMandatory

func (bld *SendOptsBuilder) SetMandatory(b bool) *SendOptsBuilder

SetMandatory 设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列, 那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当该选项设置为 false 时, 出现上述情形,则消息直接被丢弃。

func (*SendOptsBuilder) SetMessageFactory

func (bld *SendOptsBuilder) SetMessageFactory(factory MessageFactory) *SendOptsBuilder

SetMessageFactory 设置消息工厂方法。默认为 Plain Transient (无格式,非持久化)形式。

func (*SendOptsBuilder) SetRetryable

func (bld *SendOptsBuilder) SetRetryable(retryable Retryable) *SendOptsBuilder

SetRetryable 设置重试配置

type TimesRetry

type TimesRetry struct {
	RetryTimes int           // 重试次数。如果 Always 为 true,此选项不可用。
	Interval   time.Duration // 间隔时间,指定断线后间隔多久再尝试重试。
	Always     bool          // 是否一直重试

	sync.RWMutex
	// contains filtered or unexported fields
}

TimesRetry ..

func DefaultTimesRetry

func DefaultTimesRetry() *TimesRetry

DefaultTimesRetry 创建一个默认的重试配置:总是重试,且间隔三秒

func NewTimesRetry

func NewTimesRetry(always bool, interval time.Duration, retryTimes int) *TimesRetry

NewTimesRetry 创建根据次数结束重试的配置

func (*TimesRetry) GiveUp

func (r *TimesRetry) GiveUp()

GiveUp ..

type TimesRetryBuilder

type TimesRetryBuilder struct {
	// contains filtered or unexported fields
}

TimesRetryBuilder ..

func NewTimesRetryBuilder

func NewTimesRetryBuilder() *TimesRetryBuilder

NewTimesRetryBuilder ..

func (*TimesRetryBuilder) Builder

func (bld *TimesRetryBuilder) Builder() *TimesRetry

Builder ...

func (*TimesRetryBuilder) SetAlways

func (bld *TimesRetryBuilder) SetAlways(always bool) *TimesRetryBuilder

SetAlways ...

func (*TimesRetryBuilder) SetInterval

func (bld *TimesRetryBuilder) SetInterval(interval time.Duration) *TimesRetryBuilder

SetInterval ...

func (*TimesRetryBuilder) SetRetryTimes

func (bld *TimesRetryBuilder) SetRetryTimes(retryTimes int) *TimesRetryBuilder

SetRetryTimes ...

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL