queue_reliable

package
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 11, 2024 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type RedisQueueStatus

type RedisQueueStatus struct {
	Key         string // 标识消费者的唯一Key
	MachineName string // 机器名
	UserName    string // 用户名
	ProcessId   int    // 进程Id
	Ip          string // Ip地址
	CreateTime  int64  // 开始时间
	LastActive  int64  // 最后活跃时间
	Consumes    int64  // 消费消息数
	Acks        int64  // 确认消息数
}

func CreateStatus

func CreateStatus() RedisQueueStatus

type RedisReliableQueue

type RedisReliableQueue struct {
	DB int // redis DB 默认为 0

	ThrowOnFailure              bool   // 失败时抛出异常。默认false
	RetryTimesWhenSendFailed    int    // 发送消息失败时的重试次数。默认3次
	RetryIntervalWhenSendFailed int    // 重试间隔。默认1000ms
	AckKey                      string // 用于确认的列表
	RetryInterval               int64  // 重新处理确认队列中死信的间隔。默认60s
	MinPipeline                 int64  // 最小管道阈值,达到该值时使用管道,默认3

	IsEmpty bool             // 是否为空
	Status  RedisQueueStatus // 消费状态
	// contains filtered or unexported fields
}

func New added in v0.3.1

func New(client cache.ICache, key string, logger glog.ILogger) *RedisReliableQueue

func (*RedisReliableQueue) Acknowledge added in v0.3.1

func (r *RedisReliableQueue) Acknowledge(keys ...string) int64

Acknowledge 确认消费,从AckKey中删除

func (*RedisReliableQueue) Add

func (r *RedisReliableQueue) Add(values ...interface{}) int64

Add 批量生产添加

func (*RedisReliableQueue) AddDelay added in v0.3.1

func (r *RedisReliableQueue) AddDelay(value interface{}, delay int64) int64

AddDelay 添加延迟消息

func (*RedisReliableQueue) ClearAllAck added in v0.3.1

func (r *RedisReliableQueue) ClearAllAck()

ClearAllAck 清空所有Ack队列。危险操作!!!

func (*RedisReliableQueue) Consume added in v0.3.1

func (r *RedisReliableQueue) Consume(fn func(value interface{}) int64, timeOut ...int64) int64

Consume 高级消费消息。消息处理成功后,自动确认并删除消息体 Publish 必须跟 ConsumeAsync 配对使用。

func (*RedisReliableQueue) InitDelay added in v0.3.1

func (r *RedisReliableQueue) InitDelay()

InitDelay 初始化延迟队列功能。生产者自动初始化,消费者最好能够按队列初始化一次 该功能是附加功能,需要消费者主动调用,每个队列的多消费者开一个即可。

核心工作是启动延迟队列的TransferAsync大循环,每个进程内按队列开一个最合适,多了没有用反而形成争夺。

func (*RedisReliableQueue) Publish added in v0.3.1

func (r *RedisReliableQueue) Publish(messages map[string]interface{}, expire int64) int64

Publish 高级生产消息。消息体和消息键分离,业务层指定消息键,可随时查看或删除,同时避免重复生产

Publish 必须跟 ConsumeAsync 配对使用。

messages 消息字典,id为键,消息体为值 expire 消息体过期时间,单位秒

func (*RedisReliableQueue) RetryAck

func (r *RedisReliableQueue) RetryAck()

RetryAck 消费获取,从Key弹出并备份到AckKey,支持阻塞 假定前面获取的消息已经确认,因该方法内部可能回滚确认队列,避免误杀 超时时间,默认0秒永远阻塞;负数表示直接返回,不阻塞。

func (*RedisReliableQueue) RollbackAck

func (r *RedisReliableQueue) RollbackAck(key, ackKey string) []string

RollbackAck 回滚指定AckKey内的消息到Key

func (*RedisReliableQueue) RollbackAllAck

func (r *RedisReliableQueue) RollbackAllAck() int64

RollbackAllAck 全局回滚死信,一般由单一线程执行,避免干扰处理中数据

func (*RedisReliableQueue) Take

func (r *RedisReliableQueue) Take(count ...int) []string

Take 批量消费获取,从Key弹出并备份到AckKey 假定前面获取的消息已经确认,因该方法内部可能回滚确认队列,避免误杀 count 要消费的消息个数

func (*RedisReliableQueue) TakeAck added in v0.3.1

func (r *RedisReliableQueue) TakeAck(count ...int) []string

TakeAck 从确认列表弹出消息,用于消费中断后,重新恢复现场时获取 理论上Ack队列只存储极少数数据

func (*RedisReliableQueue) TakeOne

func (r *RedisReliableQueue) TakeOne(timeout ...int64) string

TakeOne 消费获取,从Key弹出并备份到AckKey,支持阻塞 假定前面获取的消息已经确认,因该方法内部可能回滚确认队列,避免误杀 timeout 超时时间,默认0秒永远阻塞;负数表示直接返回,不阻塞。

func (*RedisReliableQueue) UpdateStatus

func (r *RedisReliableQueue) UpdateStatus()

UpdateStatus 更新状态

type ReliableQueue added in v0.3.1

type ReliableQueue struct {
	// contains filtered or unexported fields
}

func NewReliableQueue added in v0.3.1

func NewReliableQueue(client cache.ICache, logger glog.ILogger) *ReliableQueue

func (*ReliableQueue) GetReliableQueue added in v0.3.1

func (r *ReliableQueue) GetReliableQueue(topic string) *RedisReliableQueue

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL