Documentation ¶
Index ¶
- type RedisQueueStatus
- type RedisReliableQueue
- func (r *RedisReliableQueue) Acknowledge(keys ...string) int64
- func (r *RedisReliableQueue) Add(values ...interface{}) int64
- func (r *RedisReliableQueue) AddDelay(value interface{}, delay int64) int64
- func (r *RedisReliableQueue) ClearAllAck()
- func (r *RedisReliableQueue) Consume(fn func(value interface{}) int64, timeOut ...int64) int64
- func (r *RedisReliableQueue) InitDelay()
- func (r *RedisReliableQueue) Publish(messages map[string]interface{}, expire int64) int64
- func (r *RedisReliableQueue) RetryAck()
- func (r *RedisReliableQueue) RollbackAck(key, ackKey string) []string
- func (r *RedisReliableQueue) RollbackAllAck() int64
- func (r *RedisReliableQueue) Take(count ...int) []string
- func (r *RedisReliableQueue) TakeAck(count ...int) []string
- func (r *RedisReliableQueue) TakeOne(timeout ...int64) string
- func (r *RedisReliableQueue) UpdateStatus()
- type ReliableQueue
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type RedisQueueStatus ¶
type RedisQueueStatus struct { Key string // 标识消费者的唯一Key MachineName string // 机器名 UserName string // 用户名 ProcessId int // 进程Id Ip string // Ip地址 CreateTime int64 // 开始时间 LastActive int64 // 最后活跃时间 Consumes int64 // 消费消息数 Acks int64 // 确认消息数 }
func CreateStatus ¶
func CreateStatus() RedisQueueStatus
type RedisReliableQueue ¶
type RedisReliableQueue struct { DB int // redis DB 默认为 0 ThrowOnFailure bool // 失败时抛出异常。默认false RetryTimesWhenSendFailed int // 发送消息失败时的重试次数。默认3次 RetryIntervalWhenSendFailed int // 重试间隔。默认1000ms AckKey string // 用于确认的列表 RetryInterval int64 // 重新处理确认队列中死信的间隔。默认60s MinPipeline int64 // 最小管道阈值,达到该值时使用管道,默认3 IsEmpty bool // 是否为空 Status RedisQueueStatus // 消费状态 // contains filtered or unexported fields }
func (*RedisReliableQueue) Acknowledge ¶ added in v0.3.1
func (r *RedisReliableQueue) Acknowledge(keys ...string) int64
Acknowledge 确认消费,从AckKey中删除
func (*RedisReliableQueue) Add ¶
func (r *RedisReliableQueue) Add(values ...interface{}) int64
Add 批量生产添加
func (*RedisReliableQueue) AddDelay ¶ added in v0.3.1
func (r *RedisReliableQueue) AddDelay(value interface{}, delay int64) int64
AddDelay 添加延迟消息
func (*RedisReliableQueue) ClearAllAck ¶ added in v0.3.1
func (r *RedisReliableQueue) ClearAllAck()
ClearAllAck 清空所有Ack队列。危险操作!!!
func (*RedisReliableQueue) Consume ¶ added in v0.3.1
func (r *RedisReliableQueue) Consume(fn func(value interface{}) int64, timeOut ...int64) int64
Consume 高级消费消息。消息处理成功后,自动确认并删除消息体 Publish 必须跟 ConsumeAsync 配对使用。
func (*RedisReliableQueue) InitDelay ¶ added in v0.3.1
func (r *RedisReliableQueue) InitDelay()
InitDelay 初始化延迟队列功能。生产者自动初始化,消费者最好能够按队列初始化一次 该功能是附加功能,需要消费者主动调用,每个队列的多消费者开一个即可。
核心工作是启动延迟队列的TransferAsync大循环,每个进程内按队列开一个最合适,多了没有用反而形成争夺。
func (*RedisReliableQueue) Publish ¶ added in v0.3.1
func (r *RedisReliableQueue) Publish(messages map[string]interface{}, expire int64) int64
Publish 高级生产消息。消息体和消息键分离,业务层指定消息键,可随时查看或删除,同时避免重复生产
Publish 必须跟 ConsumeAsync 配对使用。
messages 消息字典,id为键,消息体为值 expire 消息体过期时间,单位秒
func (*RedisReliableQueue) RetryAck ¶
func (r *RedisReliableQueue) RetryAck()
RetryAck 消费获取,从Key弹出并备份到AckKey,支持阻塞 假定前面获取的消息已经确认,因该方法内部可能回滚确认队列,避免误杀 超时时间,默认0秒永远阻塞;负数表示直接返回,不阻塞。
func (*RedisReliableQueue) RollbackAck ¶
func (r *RedisReliableQueue) RollbackAck(key, ackKey string) []string
RollbackAck 回滚指定AckKey内的消息到Key
func (*RedisReliableQueue) RollbackAllAck ¶
func (r *RedisReliableQueue) RollbackAllAck() int64
RollbackAllAck 全局回滚死信,一般由单一线程执行,避免干扰处理中数据
func (*RedisReliableQueue) Take ¶
func (r *RedisReliableQueue) Take(count ...int) []string
Take 批量消费获取,从Key弹出并备份到AckKey 假定前面获取的消息已经确认,因该方法内部可能回滚确认队列,避免误杀 count 要消费的消息个数
func (*RedisReliableQueue) TakeAck ¶ added in v0.3.1
func (r *RedisReliableQueue) TakeAck(count ...int) []string
TakeAck 从确认列表弹出消息,用于消费中断后,重新恢复现场时获取 理论上Ack队列只存储极少数数据
func (*RedisReliableQueue) TakeOne ¶
func (r *RedisReliableQueue) TakeOne(timeout ...int64) string
TakeOne 消费获取,从Key弹出并备份到AckKey,支持阻塞 假定前面获取的消息已经确认,因该方法内部可能回滚确认队列,避免误杀 timeout 超时时间,默认0秒永远阻塞;负数表示直接返回,不阻塞。
func (*RedisReliableQueue) UpdateStatus ¶
func (r *RedisReliableQueue) UpdateStatus()
UpdateStatus 更新状态
type ReliableQueue ¶ added in v0.3.1
type ReliableQueue struct {
// contains filtered or unexported fields
}
func NewReliableQueue ¶ added in v0.3.1
func NewReliableQueue(client cache.ICache, logger glog.ILogger) *ReliableQueue
func (*ReliableQueue) GetReliableQueue ¶ added in v0.3.1
func (r *ReliableQueue) GetReliableQueue(topic string) *RedisReliableQueue