Documentation ¶
Index ¶
- type AbsReceiveListener
- type Adder
- type Channel
- func (c *Channel) Receive(queue string, consumer ConsumerFunc) error
- func (c *Channel) ReceiveOpts(queue string, consumer ConsumerFunc, opts *ReceiveOpts) error
- func (c *Channel) RemoveOperation(key string)
- func (c *Channel) Send(exchange string, routingKey string, body []byte) error
- func (c *Channel) SendOpts(exchange string, routingKey string, body []byte, opts *SendOpts) error
- type Connection
- func (c *Connection) CanRetry() bool
- func (c *Connection) Channel() (*Channel, error)
- func (c *Connection) Close() error
- func (c *Connection) Consumer() *Consumer
- func (c *Connection) Dial() error
- func (c *Connection) IsOpen() bool
- func (c *Connection) Producer() *Producer
- func (c *Connection) QueueBuilder() *QueueBuilder
- func (c *Connection) RegisterAndExec(opt Operation)
- func (c *Connection) RemoveOperation(key string)
- func (c *Connection) RetryChannel(retryable Retryable) (ch *Channel, err error)
- type Consumer
- type ConsumerFunc
- type CtxRetry
- type MessageFactory
- type Operation
- type Operations
- type Producer
- type Queue
- type QueueBindOpts
- type QueueBindOptsBuilder
- type QueueBuilder
- func (bld *QueueBuilder) Build() *Queue
- func (bld *QueueBuilder) SetQueueBindOpts(builderFn func(builder *QueueBindOptsBuilder) *QueueBindOpts) *QueueBuilder
- func (bld *QueueBuilder) SetQueueDeclareOpts(builderFn func(builder *QueueDeclareOptsBuilder) *QueueDeclareOpts) *QueueBuilder
- func (bld *QueueBuilder) SetRetryable(retryable Retryable) *QueueBuilder
- type QueueDeclareOpts
- type QueueDeclareOptsBuilder
- func (bld *QueueDeclareOptsBuilder) Build() *QueueDeclareOpts
- func (bld *QueueDeclareOptsBuilder) SetArgs(args *amqp.Table) *QueueDeclareOptsBuilder
- func (bld *QueueDeclareOptsBuilder) SetAutoDelete(b bool) *QueueDeclareOptsBuilder
- func (bld *QueueDeclareOptsBuilder) SetDurable(b bool) *QueueDeclareOptsBuilder
- func (bld *QueueDeclareOptsBuilder) SetExclusive(b bool) *QueueDeclareOptsBuilder
- func (bld *QueueDeclareOptsBuilder) SetNoWait(b bool) *QueueDeclareOptsBuilder
- type ReceiveListener
- type ReceiveOpts
- type ReceiveOptsBuilder
- func (bld *ReceiveOptsBuilder) Build() *ReceiveOpts
- func (bld *ReceiveOptsBuilder) SetArgs(args *amqp.Table) *ReceiveOptsBuilder
- func (bld *ReceiveOptsBuilder) SetAutoAck(b bool) *ReceiveOptsBuilder
- func (bld *ReceiveOptsBuilder) SetConsumerTag(tag string) *ReceiveOptsBuilder
- func (bld *ReceiveOptsBuilder) SetExclusive(b bool) *ReceiveOptsBuilder
- func (bld *ReceiveOptsBuilder) SetNoLocal(b bool) *ReceiveOptsBuilder
- func (bld *ReceiveOptsBuilder) SetNoWait(b bool) *ReceiveOptsBuilder
- type Retryable
- type SAdder
- type Schema
- type SendOpts
- type SendOptsBuilder
- func (bld *SendOptsBuilder) Build() *SendOpts
- func (bld *SendOptsBuilder) SetImmediate(b bool) *SendOptsBuilder
- func (bld *SendOptsBuilder) SetMandatory(b bool) *SendOptsBuilder
- func (bld *SendOptsBuilder) SetMessageFactory(factory MessageFactory) *SendOptsBuilder
- func (bld *SendOptsBuilder) SetRetryable(retryable Retryable) *SendOptsBuilder
- type TimesRetry
- type TimesRetryBuilder
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AbsReceiveListener ¶
type AbsReceiveListener struct { ConsumerMethod ConsumerFunc FinishMethod func(err error) }
AbsReceiveListener 的抽象实现。 如果 ConsumerMethod 为 nil 或不赋值,将 panic; 如果 FinishMethod 为 nil 或不赋值,则默认不做任何操作。
func (*AbsReceiveListener) Consumer ¶
func (lis *AbsReceiveListener) Consumer(delivery *amqp.Delivery) (brk bool)
Consumer ..
func (*AbsReceiveListener) Remove ¶
func (lis *AbsReceiveListener) Remove(key string, ch *Channel)
Remove TODO
type Adder ¶
type Adder func() uint64
Adder 累加器。每次执行累加一定数额,返回一个 uint64。
func AdderGenerator ¶
AdderGenerator 累加器生成器。生成的累加器从 0 开始累加,delta 表示需要累加的数字
type Channel ¶
Channel represents a channel
func (*Channel) Receive ¶
func (c *Channel) Receive(queue string, consumer ConsumerFunc) error
Receive returns error if delivery
func (*Channel) ReceiveOpts ¶
func (c *Channel) ReceiveOpts(queue string, consumer ConsumerFunc, opts *ReceiveOpts) error
ReceiveOpts 持续接收消息并消费,除非 `<-chan amqp.Delivery` 关闭或 ConsumerFunc 主动放弃接收。 参数 opts 表示接收选项。opts 如果为 nil,将使用 DefaultReceiveOpts() 作为默认配置。 如果将参数 opts 的 autoAck 属性设为 false,则应该在 ReceiveListener.Consumer() 函数中调用 (*amqp.Delivery).Ack 手动确认消费;如果设为 true,已发送的消息会被服务器认为已被消费, 可能因网络状况不好等原因导致消息未被接收,从而造成数据丢失。 autoAck 为 false 时不提供自动确认, 是因为消费者有可能会需要拒绝确认,或在消费出现错误时不进行确认。 参数 consumer 用于处理接收操作。参数 consumer 一定不能为 nil,否则将 panic。 返回值:当 `<-chan amqp.Delivery` 关闭或 ConsumerFunc 主动放弃接收,返回 nil;其他情况则返回 error
func (*Channel) RemoveOperation ¶
RemoveOperation removes an operation from the channel
type Connection ¶
Connection amqp 连接。 Connection 创建后不会直接连接服务器,而是要调用 Dial 后才会执行连接服务器操作
func Dial ¶
func Dial(url string, retryable Retryable) (*Connection, error)
Dial 如果 retryable 为 nil,则表示不启用断线重连
Example ¶
conn, err := Dial("amqp://guest:guest@localhost:5672/", nil) if err != nil { panic(err) } defer conn.Close() fmt.Println("connected!")
Output: connected!
func NewConnection ¶
func NewConnection(url string, retryable Retryable) *Connection
NewConnection retryable 如果为 nil,则使用 emptyRetryable 替换。emptyRetryable 不会尝试重试操作。
func (*Connection) Channel ¶
func (c *Connection) Channel() (*Channel, error)
Channel 可用于发送、接收消息。 函数会先判断是否已连接,否则将尝试重连(使用您之前设置的 Retryable 配置)。 在获得连接的情况下,会立刻创建 Channel。但可能会存在极少数情况下,因为网络不稳定等因素, Channel 创建之前,连接又断开,则会因为网络原因产生错误。
func (*Connection) Consumer ¶
func (c *Connection) Consumer() *Consumer
Consumer returns the consumer for the connection
func (*Connection) Producer ¶
func (c *Connection) Producer() *Producer
Producer returns the producer for the connection
func (*Connection) QueueBuilder ¶
func (c *Connection) QueueBuilder() *QueueBuilder
QueueBuilder returns the queue builder for the connection
func (*Connection) RegisterAndExec ¶
func (c *Connection) RegisterAndExec(opt Operation)
RegisterAndExec 注册并执行 Operation 。 每次重连后,重连监听器会自动使用 exec 执行一遍所有的 Operation 函数。 注意:
- 函数会在 Operation 执行完后主动关闭 Channel,因此我们无需在 Operation 中手动关闭 Channel。
- 由于使用了 go routine,该方法可能会在 Operation 操作执行完毕前返回。
Example ¶
var err error conn := NewConnection("amqp://guest:guest@localhost:5672/", &TimesRetry{Interval: 3 * time.Second, RetryTimes: 10}) defer conn.Close() err = conn.Dial() if err != nil { log.Fatal(err) } consumerTag := NewDefaultSAdder() conn.RegisterAndExec(func(key string, ch *Channel) { deliverys, e := ch.Consume("queue.direct", consumerTag(), true, false, false, false, nil) if e != nil { log.Fatal(e) } for delivery := range deliverys { log.Println("queue.direct-1 ", delivery.DeliveryTag, " ", string(delivery.Body)) } }) conn.RegisterAndExec(func(key string, ch *Channel) { e := ch.Receive("queue.direct", func(delivery *amqp.Delivery) (brk bool) { log.Println("queue.direct-2 ", delivery.DeliveryTag, " ", string(delivery.Body)) return true }) if e != nil { log.Fatal(e) } }) //注意:conn.RegisterAndExec 会导致重连时再次执行 Operation 中的操作。如果不希望发生此情况,应该使用 Connect.Channel 消费消息 conn.RegisterAndExec(func(key string, ch *Channel) { delivery, _, e := ch.Get("queue.direct", true) if e != nil { log.Fatal(e) } log.Println("queue.direct ", delivery.DeliveryTag, " ", string(delivery.Body)) }) //注意:conn.RegisterAndExec 会导致重连时再次执行 Operation 中的操作。如果不希望发生此情况,应该使用 Connect.Channel 发送消息 conn.RegisterAndExec(func(key string, ch *Channel) { e := ch.Send("amq.direct", "key.direct", []byte("rabbitmq addOperation() send test!")) if e != nil { log.Fatal(e) } })
Output:
func (*Connection) RemoveOperation ¶
func (c *Connection) RemoveOperation(key string)
func (*Connection) RetryChannel ¶
func (c *Connection) RetryChannel(retryable Retryable) (ch *Channel, err error)
RetryChannel 将在 Channel 创建失败后,尝试重试。如果仍然失败,将返回造成失败的原因。
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func (*Consumer) Get ¶
Get When autoAck is true, the server will automatically acknowledge this message so you don't have to. But if you are unable to fully process this message before the channel or connection is closed, the message will not get requeued
func (*Consumer) Receive ¶
func (c *Consumer) Receive(queue string, opts *ReceiveOpts, lis ReceiveListener)
Receive 持续接收消息并消费。如果期望只接收一次消息,可以使用 Get 方法。 此方法是异步方法,内部使用了 go routine 执行接收操作,因此即便没有消息 可以接收时,该方法也不会阻塞。 详见 Channel.ReceiveOpts
Example ¶
conn := getConnection() consumer := conn.Consumer() consumer.Receive( "queue.direct", NewReceiveOptsBuilder().SetAutoAck(false).Build(), &AbsReceiveListener{ ConsumerMethod: func(delivery *amqp.Delivery) (brk bool) { log.Println("queue.direct ", delivery.DeliveryTag, " ", string(delivery.Body)) err := delivery.Ack(false) if err != nil { log.Println(err) } return }, FinishMethod: func(err error) { if err != nil { // 处理错误 return } // defer xxx.close() // 关闭资源操作等 }, }) time.Sleep(time.Minute) // 由于 Consumer.Receive() 内部采用了异步方式处理,因此 Receive 方法不会阻塞等待
Output:
type ConsumerFunc ¶
如果没有消息则该方法阻塞等待;否则本方法会被持续调用, 直到主动停止消费(即本方法返回 true)。 返回值 brk 表示是否 break,即在循环消费过程中是否需要终止消费。
type CtxRetry ¶
type CtxRetry struct { Ctx context.Context Interval time.Duration // 间隔时间,指定断线后间隔多久再尝试重试。 sync.RWMutex // contains filtered or unexported fields }
CtxRetry ...
func NewCtxRetry ¶
NewCtxRetry ...
type MessageFactory ¶
type MessageFactory func(body []byte) amqp.Publishing
MessageFactory 消息工厂方法。默认提供了如: MessagePlainTransient, MessagePlainPersistent, MessageJSONPersistent 等 在内的工厂方法。 如果没有需要的工厂方法,则需要调用者自己提供对应的工厂方法。
var ( // MessagePlainTransient 无格式、非持久化消息工厂方法 MessagePlainTransient MessageFactory = func(body []byte) amqp.Publishing { return amqp.Publishing{ ContentType: "text/plain", DeliveryMode: amqp.Transient, Body: body, } } // MessagePlainPersistent 无格式、持久化消息工厂方法 MessagePlainPersistent MessageFactory = func(body []byte) amqp.Publishing { return amqp.Publishing{ ContentType: "text/plain", DeliveryMode: amqp.Persistent, Body: body, } } // MessageJSONTransient JSON、非持久化消息工厂方法 MessageJSONTransient MessageFactory = func(body []byte) amqp.Publishing { return amqp.Publishing{ ContentType: "text/json", DeliveryMode: amqp.Transient, Body: body, } } // MessageJSONPersistent JSON、持久化消息工厂方法 MessageJSONPersistent MessageFactory = func(body []byte) amqp.Publishing { return amqp.Publishing{ ContentType: "text/json", DeliveryMode: amqp.Persistent, Body: body, } } )
type Operations ¶
type Queue ¶
Queue represents a queue of messages
func (*Queue) DeclareAndBind ¶
DeclareAndBind 根据 queueName 声明队列,并绑定 queueName, key 到指定的 exchange。 Queue 可能会因为网络原因创建失败,不提供一定创建成功保证。
func (*Queue) RetryDeclareAndBind ¶
RetryDeclareAndBind will retry the specified messages Channels
type QueueBindOpts ¶
type QueueBindOpts struct {
// contains filtered or unexported fields
}
QueueBindOpts is the queue binding options
func DefaultBindOpts ¶
func DefaultBindOpts() *QueueBindOpts
DefaultBindOpts returns the default queue binding options
type QueueBindOptsBuilder ¶
type QueueBindOptsBuilder struct {
// contains filtered or unexported fields
}
QueueBindOptsBuilder is the builder for QueueBindOptsBuilder
func NewQueueBindOptsBuilder ¶
func NewQueueBindOptsBuilder() *QueueBindOptsBuilder
NewQueueBindOptsBuilder returns a new QueueBindOptsBuilder
func (*QueueBindOptsBuilder) Build ¶
func (bld *QueueBindOptsBuilder) Build() *QueueBindOpts
Build sets the parameters
func (*QueueBindOptsBuilder) SetArgs ¶
func (bld *QueueBindOptsBuilder) SetArgs(args *amqp.Table) *QueueBindOptsBuilder
SetArgs sets the arguments
func (*QueueBindOptsBuilder) SetNoWait ¶
func (bld *QueueBindOptsBuilder) SetNoWait(b bool) *QueueBindOptsBuilder
SetNoWait sets the default value
type QueueBuilder ¶
type QueueBuilder struct {
// contains filtered or unexported fields
}
QueueBuilder ...
func NewQueueBuilder ¶
func NewQueueBuilder(c *Connection) *QueueBuilder
NewQueueBuilder creates a new QueueBuilder instance
func (*QueueBuilder) SetQueueBindOpts ¶
func (bld *QueueBuilder) SetQueueBindOpts(builderFn func(builder *QueueBindOptsBuilder) *QueueBindOpts) *QueueBuilder
SetQueueBindOpts sets the binding options for the queue builde
func (*QueueBuilder) SetQueueDeclareOpts ¶
func (bld *QueueBuilder) SetQueueDeclareOpts(builderFn func(builder *QueueDeclareOptsBuilder) *QueueDeclareOpts) *QueueBuilder
SetQueueDeclareOpts sets the default value for the Queue declaration
func (*QueueBuilder) SetRetryable ¶
func (bld *QueueBuilder) SetRetryable(retryable Retryable) *QueueBuilder
SetRetryable sets the retryable flag to true for
type QueueDeclareOpts ¶
type QueueDeclareOpts struct {
// contains filtered or unexported fields
}
QueueDeclareOpts returns the list of queued operations
func DefaultQueueDeclareOpts ¶
func DefaultQueueDeclareOpts() *QueueDeclareOpts
DefaultQueueDeclareOpts holds the default queue declaration
type QueueDeclareOptsBuilder ¶
type QueueDeclareOptsBuilder struct {
// contains filtered or unexported fields
}
QueueDeclareOptsBuilder is the builder for QueueDeclareOpts
func NewQueueDeclareOptsBuilder ¶
func NewQueueDeclareOptsBuilder() *QueueDeclareOptsBuilder
NewQueueDeclareOptsBuilder creates a new QueueDeclareOptsBuilder
func (*QueueDeclareOptsBuilder) Build ¶
func (bld *QueueDeclareOptsBuilder) Build() *QueueDeclareOpts
Build returns the queue declaration options
func (*QueueDeclareOptsBuilder) SetArgs ¶
func (bld *QueueDeclareOptsBuilder) SetArgs(args *amqp.Table) *QueueDeclareOptsBuilder
SetArgs sets the arguments to be passed to the command
func (*QueueDeclareOptsBuilder) SetAutoDelete ¶
func (bld *QueueDeclareOptsBuilder) SetAutoDelete(b bool) *QueueDeclareOptsBuilder
SetAutoDelete sets the auto delete option
func (*QueueDeclareOptsBuilder) SetDurable ¶
func (bld *QueueDeclareOptsBuilder) SetDurable(b bool) *QueueDeclareOptsBuilder
SetDurable sets whether the queue declaration
func (*QueueDeclareOptsBuilder) SetExclusive ¶
func (bld *QueueDeclareOptsBuilder) SetExclusive(b bool) *QueueDeclareOptsBuilder
SetExclusive sets the exclusive
func (*QueueDeclareOptsBuilder) SetNoWait ¶ added in v3.1.21
func (bld *QueueDeclareOptsBuilder) SetNoWait(b bool) *QueueDeclareOptsBuilder
SetNoWait sets the number of seconds to wait before returning
type ReceiveListener ¶
type ReceiveListener interface { // Consumer 用于实现消费操作。详见 ConsumerFunc。 // // 如果消费者主动终止了退出,应该在 Finish 中主动移除当前 ReceiveListener。 // 否则下次断线重连会再次触发该消息的接收操作。 Consumer(*amqp.Delivery) (brk bool) // Finish 处理接收完成需要执行的操作,比如用于清理或关闭某些内容。 // 如果没有相关操作需要执行,可以提供空实现。 // Finish 可能由于主动停止接收消息或因为产生错误被调用。 // 如果消费时没有错误,则参数 err 为 nil。 Finish(err error) // 当主动停止消费时,应当实现该方法,主动移除当前 ReceiveListener。 // 否则,一旦断线重连,该 ReceiveListener 会继续消费。 Remove(key string, ch *Channel) }
ReceiveListener ...
type ReceiveOpts ¶
type ReceiveOpts struct {
// contains filtered or unexported fields
}
ReceiveOpts 消息接收选项。 如果 autoAck 设为 false,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去 消息。因此调用者应该主动调用 (*amqp.Delivery).Ack 确认消费,防止消息在内存(或者磁盘)中积累。 如果 autoAck 设为 true,消息会被服务器默认为已消费,可能会导致消费者无法处理数据时造成数据丢失。 如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者己经 断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。 consumerTag 用于唯一识别一个消费者,如果不填可自动生成。 其他参数如果没有特别需求,默认不填即可。
func DefaultReceiveOpts ¶
func DefaultReceiveOpts() *ReceiveOpts
DefaultReceiveOpts 将 ReceiveOpts.autoAck 默认设置为 true
type ReceiveOptsBuilder ¶
type ReceiveOptsBuilder struct {
// contains filtered or unexported fields
}
func NewReceiveOptsBuilder ¶
func NewReceiveOptsBuilder() *ReceiveOptsBuilder
func (*ReceiveOptsBuilder) Build ¶
func (bld *ReceiveOptsBuilder) Build() *ReceiveOpts
func (*ReceiveOptsBuilder) SetArgs ¶
func (bld *ReceiveOptsBuilder) SetArgs(args *amqp.Table) *ReceiveOptsBuilder
func (*ReceiveOptsBuilder) SetAutoAck ¶
func (bld *ReceiveOptsBuilder) SetAutoAck(b bool) *ReceiveOptsBuilder
作用详见 ReceiveOpts
func (*ReceiveOptsBuilder) SetConsumerTag ¶
func (bld *ReceiveOptsBuilder) SetConsumerTag(tag string) *ReceiveOptsBuilder
func (*ReceiveOptsBuilder) SetExclusive ¶
func (bld *ReceiveOptsBuilder) SetExclusive(b bool) *ReceiveOptsBuilder
func (*ReceiveOptsBuilder) SetNoLocal ¶
func (bld *ReceiveOptsBuilder) SetNoLocal(b bool) *ReceiveOptsBuilder
func (*ReceiveOptsBuilder) SetNoWait ¶
func (bld *ReceiveOptsBuilder) SetNoWait(b bool) *ReceiveOptsBuilder
type Retryable ¶
type Retryable interface { // 放弃重试。在应该放弃重试的时候主动放弃重试,防止多余的重试或无限重试。 GiveUp() // contains filtered or unexported methods }
Retryable ..
type SAdder ¶
type SAdder func() string
SAdder 返回一个 uint64 的字符串
func NewDefaultSAdder ¶
func NewDefaultSAdder() SAdder
NewDefaultSAdder 获取一个从 0 开始累加,每次加 1 的累加器。返回一个 uint64 的字符串
type SendOpts ¶
type SendOpts struct {
// contains filtered or unexported fields
}
SendOpts 消息发送选项。 mandatory 设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列, 那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当该选项设置为 false 时, 出现上述情形,则消息直接被丢弃。 immediate 设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者, 那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic.Return 返回至生产者。 RabbitMQ 3.0版本开始去掉了对 immediate 参数的支持,对此 RabbitMQ 官方解释是: immediate 参数会影响镜像队列的性能,增加了代码复杂性,建议采用 TTL 和 DLX 的方法替代。 messageFactory 如果未设置该选项,则默认使用 MessagePlainTransient 生产消息。 retryable 如果不设置该选项,表示不启用消息重发功能。
func DefaultSendOpts ¶
func DefaultSendOpts() *SendOpts
DefaultSendOpts 默认消息发送选项:消息无格式,非持久化,启用默认重试配置(DefaultTimesRetry)
type SendOptsBuilder ¶
type SendOptsBuilder struct {
// contains filtered or unexported fields
}
SendOptsBuilder ...
func NewSendOptsBuilder ¶
func NewSendOptsBuilder() *SendOptsBuilder
NewSendOptsBuilder creates a new SendOptsBuilder
func (*SendOptsBuilder) Build ¶
func (bld *SendOptsBuilder) Build() *SendOpts
Build the sendOptsBuilder
func (*SendOptsBuilder) SetImmediate ¶
func (bld *SendOptsBuilder) SetImmediate(b bool) *SendOptsBuilder
SetImmediate 设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者, 那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic.Return 返回至生产者。
func (*SendOptsBuilder) SetMandatory ¶
func (bld *SendOptsBuilder) SetMandatory(b bool) *SendOptsBuilder
SetMandatory 设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列, 那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当该选项设置为 false 时, 出现上述情形,则消息直接被丢弃。
func (*SendOptsBuilder) SetMessageFactory ¶
func (bld *SendOptsBuilder) SetMessageFactory(factory MessageFactory) *SendOptsBuilder
SetMessageFactory 设置消息工厂方法。默认为 Plain Transient (无格式,非持久化)形式。
func (*SendOptsBuilder) SetRetryable ¶
func (bld *SendOptsBuilder) SetRetryable(retryable Retryable) *SendOptsBuilder
SetRetryable 设置重试配置
type TimesRetry ¶
type TimesRetry struct { RetryTimes int // 重试次数。如果 Always 为 true,此选项不可用。 Interval time.Duration // 间隔时间,指定断线后间隔多久再尝试重试。 Always bool // 是否一直重试 sync.RWMutex // contains filtered or unexported fields }
TimesRetry ..
func DefaultTimesRetry ¶
func DefaultTimesRetry() *TimesRetry
DefaultTimesRetry 创建一个默认的重试配置:总是重试,且间隔三秒
func NewTimesRetry ¶
func NewTimesRetry(always bool, interval time.Duration, retryTimes int) *TimesRetry
NewTimesRetry 创建根据次数结束重试的配置
type TimesRetryBuilder ¶
type TimesRetryBuilder struct {
// contains filtered or unexported fields
}
TimesRetryBuilder ..
func (*TimesRetryBuilder) SetAlways ¶
func (bld *TimesRetryBuilder) SetAlways(always bool) *TimesRetryBuilder
SetAlways ...
func (*TimesRetryBuilder) SetInterval ¶
func (bld *TimesRetryBuilder) SetInterval(interval time.Duration) *TimesRetryBuilder
SetInterval ...
func (*TimesRetryBuilder) SetRetryTimes ¶
func (bld *TimesRetryBuilder) SetRetryTimes(retryTimes int) *TimesRetryBuilder
SetRetryTimes ...