queue_stream

package
v0.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 11, 2024 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type RedisStream

type RedisStream struct {
	DB int // redis DB 默认为 0

	Topic                       string // 消息队列主题
	ThrowOnFailure              bool   // 失败时抛出异常。默认false
	RetryTimesWhenSendFailed    int    // 发送消息失败时的重试次数。默认3次
	RetryIntervalWhenSendFailed int    // 重试间隔。默认1000ms

	RetryInterval int64  // 重新处理确认队列中死信的间隔。默认60s
	MaxLength     int64  // 最大队列长度。要保留的消息个数,超过则移除较老消息,非精确,实际上略大于该值,默认100万
	MaxRetry      int64  // 最大重试次数。超过该次数后,消息将被抛弃,默认10次
	BlockTime     int64  // 异步消费时的阻塞时间。默认15秒
	StartId       string // 开始编号。独立消费时使用,消费组消费时不使用,默认0-0
	Group         string // 消费者组。指定消费组后,不再使用独立消费。通过SetGroup可自动创建消费组

	FromLastOffset bool // 首次消费时的消费策略  默认值false,表示从头部开始消费,等同于RocketMQ/Java版的CONSUME_FROM_FIRST_OFFSET  一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费。
	// contains filtered or unexported fields
}

func New added in v0.3.1

func New(client cache.ICache, key string, logger glog.ILogger) *RedisStream

func (*RedisStream) Ack

func (r *RedisStream) Ack(group string, id string) int64

Ack 确认消息 group 消费组名称 id 消息Id

func (*RedisStream) Acknowledge

func (r *RedisStream) Acknowledge(keys ...string) int64

Acknowledge 消费确认

func (*RedisStream) Add

func (r *RedisStream) Add(value interface{}, msgId ...string) string

Add 生产添加

func (*RedisStream) AddInternal

func (r *RedisStream) AddInternal(value interface{}, msgId string, trim bool, retryOnFailed bool) string

func (*RedisStream) Adds

func (r *RedisStream) Adds(values []interface{}) int

Adds 批量生产添加

func (*RedisStream) Claim

func (r *RedisStream) Claim(group string, consumer string, id string, msIdle int64) []redis.XMessage

Claim 改变待处理消息的所有权,抢夺他人未确认消息 group 消费组名称 consumer 目标消费者 id 消息Id msIdle 空闲时间。默认3600_000

func (*RedisStream) ConsumeBlock

func (r *RedisStream) ConsumeBlock(ctx context.Context, OnMessage func(msg []redis.XMessage) bool)

ConsumeBlock 队列消费大循环,处理消息后自动确认 ctx context.Context OnMessage msg 消费消息信息 returns 返回True确认消费Acknowledge 返回False不确认消费Acknowledge 等待二次消费

func (*RedisStream) Count

func (r *RedisStream) Count() int64

Count 个数

func (*RedisStream) Delete

func (r *RedisStream) Delete(id ...string) int64

Delete 删除指定消息 id 消息Id

func (*RedisStream) GetConsumers

func (r *RedisStream) GetConsumers(group string) []redis.XInfoConsumer

GetConsumers 获取消费者 group 消费组

func (*RedisStream) GetGroups

func (r *RedisStream) GetGroups() []redis.XInfoGroup

GetGroups 获取消费分组信息

func (*RedisStream) GetInfo

func (r *RedisStream) GetInfo() *redis.XInfoStream

GetInfo 队列信息

func (*RedisStream) GetPending

func (r *RedisStream) GetPending(group string) *redis.XPending

GetPending 获取等待列表 group 消费组名称信息

func (*RedisStream) GroupCreate

func (r *RedisStream) GroupCreate(group string, startIds ...string) bool

GroupCreate 创建消费组 group 消费组名称开始编号。 startIds 0表示从开头,$表示从末尾,收到下一条生产消息才开始消费 stream不存在,则会报错,所以在后面 加上 MkStream

func (*RedisStream) GroupDeleteConsumer

func (r *RedisStream) GroupDeleteConsumer(group string, consumer string) int64

GroupDeleteConsumer 销毁消费者 group 消费组名称 consumer 消费者 returns 返回消费者在被删除之前所拥有的待处理消息数量

func (*RedisStream) GroupDestroy

func (r *RedisStream) GroupDestroy(group string) int64

GroupDestroy 销毁消费组 group 消费组名称

func (*RedisStream) GroupSetId

func (r *RedisStream) GroupSetId(group string, startId string) bool

GroupSetId 设置消费组Id group 消费组名称 startId 开始编号

func (*RedisStream) IsEmpty

func (r *RedisStream) IsEmpty() bool

IsEmpty 集合是否为空

func (*RedisStream) Pending

func (r *RedisStream) Pending(group string, startId string, endId string, count ...int64) []redis.XPendingExt

Pending 获取等待列表消息 group 消费组名称

func (*RedisStream) Range

func (r *RedisStream) Range(startId string, endId string, count ...int64) []redis.XMessage

Range 获取区间消息

func (*RedisStream) RangeTimeSpan

func (r *RedisStream) RangeTimeSpan(start int64, end int64, count ...int64) []redis.XMessage

RangeTimeSpan 获取区间消息

func (*RedisStream) Read

func (r *RedisStream) Read(startId string, count int64) []redis.XMessage

Read 原始独立消费 startId 开始编号 特殊的$,表示接收从阻塞那一刻开始添加到流的消息 count 消息个数

func (*RedisStream) ReadBlock

func (r *RedisStream) ReadBlock(startId string, count int64, block int64) []redis.XMessage

ReadBlock 原始独立消费 startId 开始编号 特殊的$,表示接收从阻塞那一刻开始添加到流的消息 count 消息个数 block 阻塞毫秒数,0表示永远

func (*RedisStream) ReadGroup

func (r *RedisStream) ReadGroup(group string, consumer string, count int64) []redis.XMessage

ReadGroup 消费组消费 group 消费组名称 consumer 消费组 count 消息个数

func (*RedisStream) ReadGroupBlock

func (r *RedisStream) ReadGroupBlock(group string, consumer string, count int64, block int64, id ...string) []redis.XMessage

ReadGroupBlock 消费组消费 group 消费组 consumer 消费组 count 消息个数 block 阻塞毫秒数,0表示永远 id 消息id

func (*RedisStream) RetryAck

func (r *RedisStream) RetryAck() int

RetryAck 处理未确认的死信,重新放入队列

func (*RedisStream) SetGroup

func (r *RedisStream) SetGroup(group string) bool

SetGroup 设置消费组。如果消费组不存在则创建

func (*RedisStream) SetNextId

func (r *RedisStream) SetNextId(id string)

func (*RedisStream) Take

func (r *RedisStream) Take(count int64) []redis.XMessage

Take 批量消费获取,前移指针StartId

func (*RedisStream) TakeMessageBlock

func (r *RedisStream) TakeMessageBlock(count int64, timeout int64) []redis.XMessage

TakeMessageBlock 异步消费获取一个 timeout 超时时间,默认0秒永远阻塞

func (*RedisStream) TakeOne

func (r *RedisStream) TakeOne() []redis.XMessage

TakeOne 消费获取一个

func (*RedisStream) TakeOneBlock

func (r *RedisStream) TakeOneBlock(ctx context.Context, timeout int64) []redis.XMessage

TakeOneBlock 异步消费获取一个

func (*RedisStream) Trim

func (r *RedisStream) Trim(maxLen int64) int64

Trim 裁剪队列到指定大小 maxLen 最大长度。为了提高效率,最大长度并没有那么精准

type StreamQueue added in v0.3.1

type StreamQueue struct {
	// contains filtered or unexported fields
}

func NewStreamQueue added in v0.3.1

func NewStreamQueue(client cache.ICache, logger glog.ILogger) *StreamQueue

func (*StreamQueue) GetStreamQueue added in v0.3.1

func (r *StreamQueue) GetStreamQueue(topic string) *RedisStream

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL