Documentation ¶
Index ¶
- type RedisStream
- func (r *RedisStream) Ack(group string, id string) int64
- func (r *RedisStream) Acknowledge(keys ...string) int64
- func (r *RedisStream) Add(value interface{}, msgId ...string) string
- func (r *RedisStream) AddInternal(value interface{}, msgId string, trim bool, retryOnFailed bool) string
- func (r *RedisStream) Adds(values []interface{}) int
- func (r *RedisStream) Claim(group string, consumer string, id string, msIdle int64) []redis.XMessage
- func (r *RedisStream) ConsumeBlock(ctx context.Context, OnMessage func(msg []redis.XMessage) bool)
- func (r *RedisStream) Count() int64
- func (r *RedisStream) Delete(id ...string) int64
- func (r *RedisStream) GetConsumers(group string) []redis.XInfoConsumer
- func (r *RedisStream) GetGroups() []redis.XInfoGroup
- func (r *RedisStream) GetInfo() *redis.XInfoStream
- func (r *RedisStream) GetPending(group string) *redis.XPending
- func (r *RedisStream) GroupCreate(group string, startIds ...string) bool
- func (r *RedisStream) GroupDeleteConsumer(group string, consumer string) int64
- func (r *RedisStream) GroupDestroy(group string) int64
- func (r *RedisStream) GroupSetId(group string, startId string) bool
- func (r *RedisStream) IsEmpty() bool
- func (r *RedisStream) Pending(group string, startId string, endId string, count ...int64) []redis.XPendingExt
- func (r *RedisStream) Range(startId string, endId string, count ...int64) []redis.XMessage
- func (r *RedisStream) RangeTimeSpan(start int64, end int64, count ...int64) []redis.XMessage
- func (r *RedisStream) Read(startId string, count int64) []redis.XMessage
- func (r *RedisStream) ReadBlock(startId string, count int64, block int64) []redis.XMessage
- func (r *RedisStream) ReadGroup(group string, consumer string, count int64) []redis.XMessage
- func (r *RedisStream) ReadGroupBlock(group string, consumer string, count int64, block int64, id ...string) []redis.XMessage
- func (r *RedisStream) RetryAck()
- func (r *RedisStream) SetGroup(group string) bool
- func (r *RedisStream) SetNextId(id string)
- func (r *RedisStream) Take(count int64) []redis.XMessage
- func (r *RedisStream) TakeMessageBlock(count int64, timeout int64) []redis.XMessage
- func (r *RedisStream) TakeOne() []redis.XMessage
- func (r *RedisStream) TakeOneBlock(ctx context.Context, timeout int64) []redis.XMessage
- func (r *RedisStream) Trim(maxLen int64) int64
- type StreamQueue
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type RedisStream ¶
type RedisStream struct { DB int // redis DB 默认为 0 Topic string // 消息队列主题 ThrowOnFailure bool // 失败时抛出异常。默认false RetryTimesWhenSendFailed int // 发送消息失败时的重试次数。默认3次 RetryIntervalWhenSendFailed int // 重试间隔。默认1000ms RetryInterval int64 // 重新处理确认队列中死信的间隔。默认60s MaxLength int64 // 最大队列长度。要保留的消息个数,超过则移除较老消息,非精确,实际上略大于该值,默认100万 MaxRetry int64 // 最大重试次数。超过该次数后,消息将被抛弃,默认10次 BlockTime int64 // 异步消费时的阻塞时间。默认15秒 StartId string // 开始编号。独立消费时使用,消费组消费时不使用,默认0-0 Group string // 消费者组。指定消费组后,不再使用独立消费。通过SetGroup可自动创建消费组 FromLastOffset bool // 首次消费时的消费策略 默认值false,表示从头部开始消费,等同于RocketMQ/Java版的CONSUME_FROM_FIRST_OFFSET 一个新的订阅组第一次启动从队列的最前位置开始消费,后续再启动接着上次消费的进度开始消费。 // contains filtered or unexported fields }
func (*RedisStream) Ack ¶
func (r *RedisStream) Ack(group string, id string) int64
Ack 确认消息 group 消费组名称 id 消息Id
func (*RedisStream) Acknowledge ¶
func (r *RedisStream) Acknowledge(keys ...string) int64
Acknowledge 消费确认
func (*RedisStream) Add ¶
func (r *RedisStream) Add(value interface{}, msgId ...string) string
Add 生产添加
func (*RedisStream) AddInternal ¶
func (r *RedisStream) AddInternal(value interface{}, msgId string, trim bool, retryOnFailed bool) string
func (*RedisStream) Claim ¶
func (r *RedisStream) Claim(group string, consumer string, id string, msIdle int64) []redis.XMessage
Claim 改变待处理消息的所有权,抢夺他人未确认消息 group 消费组名称 consumer 目标消费者 id 消息Id msIdle 空闲时间。默认3600_000
func (*RedisStream) ConsumeBlock ¶
func (r *RedisStream) ConsumeBlock(ctx context.Context, OnMessage func(msg []redis.XMessage) bool)
ConsumeBlock 队列消费大循环,处理消息后自动确认 ctx context.Context OnMessage msg 消费消息信息 returns 返回True确认消费Acknowledge 返回False不确认消费Acknowledge 等待二次消费
func (*RedisStream) GetConsumers ¶
func (r *RedisStream) GetConsumers(group string) []redis.XInfoConsumer
GetConsumers 获取消费者 group 消费组
func (*RedisStream) GetGroups ¶
func (r *RedisStream) GetGroups() []redis.XInfoGroup
GetGroups 获取消费分组信息
func (*RedisStream) GetPending ¶
func (r *RedisStream) GetPending(group string) *redis.XPending
GetPending 获取等待列表 group 消费组名称信息
func (*RedisStream) GroupCreate ¶
func (r *RedisStream) GroupCreate(group string, startIds ...string) bool
GroupCreate 创建消费组 group 消费组名称开始编号。 startIds 0表示从开头,$表示从末尾,收到下一条生产消息才开始消费 stream不存在,则会报错,所以在后面 加上 MkStream
func (*RedisStream) GroupDeleteConsumer ¶
func (r *RedisStream) GroupDeleteConsumer(group string, consumer string) int64
GroupDeleteConsumer 销毁消费者 group 消费组名称 consumer 消费者 returns 返回消费者在被删除之前所拥有的待处理消息数量
func (*RedisStream) GroupDestroy ¶
func (r *RedisStream) GroupDestroy(group string) int64
GroupDestroy 销毁消费组 group 消费组名称
func (*RedisStream) GroupSetId ¶
func (r *RedisStream) GroupSetId(group string, startId string) bool
GroupSetId 设置消费组Id group 消费组名称 startId 开始编号
func (*RedisStream) Pending ¶
func (r *RedisStream) Pending(group string, startId string, endId string, count ...int64) []redis.XPendingExt
Pending 获取等待列表消息 group 消费组名称
func (*RedisStream) Range ¶
func (r *RedisStream) Range(startId string, endId string, count ...int64) []redis.XMessage
Range 获取区间消息
func (*RedisStream) RangeTimeSpan ¶
func (r *RedisStream) RangeTimeSpan(start int64, end int64, count ...int64) []redis.XMessage
RangeTimeSpan 获取区间消息
func (*RedisStream) Read ¶
func (r *RedisStream) Read(startId string, count int64) []redis.XMessage
Read 原始独立消费 startId 开始编号 特殊的$,表示接收从阻塞那一刻开始添加到流的消息 count 消息个数
func (*RedisStream) ReadBlock ¶
func (r *RedisStream) ReadBlock(startId string, count int64, block int64) []redis.XMessage
ReadBlock 原始独立消费 startId 开始编号 特殊的$,表示接收从阻塞那一刻开始添加到流的消息 count 消息个数 block 阻塞毫秒数,0表示永远
func (*RedisStream) ReadGroup ¶
func (r *RedisStream) ReadGroup(group string, consumer string, count int64) []redis.XMessage
ReadGroup 消费组消费 group 消费组名称 consumer 消费组 count 消息个数
func (*RedisStream) ReadGroupBlock ¶
func (r *RedisStream) ReadGroupBlock(group string, consumer string, count int64, block int64, id ...string) []redis.XMessage
ReadGroupBlock 消费组消费 group 消费组 consumer 消费组 count 消息个数 block 阻塞毫秒数,0表示永远 id 消息id
func (*RedisStream) SetGroup ¶
func (r *RedisStream) SetGroup(group string) bool
SetGroup 设置消费组。如果消费组不存在则创建
func (*RedisStream) SetNextId ¶
func (r *RedisStream) SetNextId(id string)
func (*RedisStream) Take ¶
func (r *RedisStream) Take(count int64) []redis.XMessage
Take 批量消费获取,前移指针StartId
func (*RedisStream) TakeMessageBlock ¶
func (r *RedisStream) TakeMessageBlock(count int64, timeout int64) []redis.XMessage
TakeMessageBlock 异步消费获取一个 timeout 超时时间,默认0秒永远阻塞
func (*RedisStream) TakeOneBlock ¶
func (r *RedisStream) TakeOneBlock(ctx context.Context, timeout int64) []redis.XMessage
TakeOneBlock 异步消费获取一个
func (*RedisStream) Trim ¶
func (r *RedisStream) Trim(maxLen int64) int64
Trim 裁剪队列到指定大小 maxLen 最大长度。为了提高效率,最大长度并没有那么精准
type StreamQueue ¶ added in v0.3.1
type StreamQueue struct {
// contains filtered or unexported fields
}
func NewStreamQueue ¶ added in v0.3.1
func NewStreamQueue(client cache.ICache, logger glog.ILogger) *StreamQueue
func (*StreamQueue) GetStreamQueue ¶ added in v0.3.1
func (r *StreamQueue) GetStreamQueue(topic string) *RedisStream