Documentation
¶
Overview ¶
Package stream对象
streamhelper 满足pchelper规定的生产者和消费者接口,同时提供stream管理对象
Index ¶
- Variables
- func SerializeWithJSON() optparams.Option[Options]
- func SerializeWithMsgpack() optparams.Option[Options]
- func WithAutocreate() optparams.Option[createGroupOpt]
- func WithBlockTime(d time.Duration) optparams.Option[Options]
- func WithClientID(clientID string) optparams.Option[Options]
- func WithConsumerAckMode(ack AckModeType) optparams.Option[Options]
- func WithConsumerDefaultStartAt(t time.Time) optparams.Option[Options]
- func WithConsumerDefaultStartEarliest() optparams.Option[Options]
- func WithConsumerDefaultStartLatest() optparams.Option[Options]
- func WithConsumerDefaultStartPosition(flag string) optparams.Option[Options]
- func WithConsumerGroupName(groupname string) optparams.Option[Options]
- func WithConsumerRecvBatchSize(size int64) optparams.Option[Options]
- func WithProducerDefaultMaxLen(n int64) optparams.Option[Options]
- func WithProducerDefaultStrict() optparams.Option[Options]
- func WithStartAtID(id string) optparams.Option[createGroupOpt]
- func WithStartEarliest() optparams.Option[createGroupOpt]
- func WithStartLatest() optparams.Option[createGroupOpt]
- func WithStrict() optparams.Option[trimOpt]
- func WithUUIDSnowflake() optparams.Option[Options]
- func WithUUIDSonyflake() optparams.Option[Options]
- func WithUUIDv4() optparams.Option[Options]
- type AckModeType
- type Consumer
- type Options
- type Producer
- type Stream
- func (s *Stream) Ack(ctx context.Context, groupname string, ids ...string) error
- func (s *Stream) CreateGroup(ctx context.Context, groupname string, ...) (string, error)
- func (s *Stream) Delete(ctx context.Context, ids ...string) error
- func (s *Stream) DeleteConsumerFromGroup(ctx context.Context, groupname, consumername string) (int64, error)
- func (s *Stream) DeleteGroup(ctx context.Context, groupname string) (int64, error)
- func (s *Stream) GroupInfos(ctx context.Context) ([]redis.XInfoGroup, error)
- func (s *Stream) HasGroup(ctx context.Context, groupname string) (bool, error)
- func (s *Stream) HasGroups(ctx context.Context, groupnames ...string) (bool, error)
- func (s *Stream) Len(ctx context.Context) (int64, error)
- func (s *Stream) Move(ctx context.Context, groupname, toconsumer string, minIdle time.Duration, ...) ([]redis.XMessage, error)
- func (s *Stream) MoveJustID(ctx context.Context, groupname, toconsumer string, minIdle time.Duration, ...) ([]string, error)
- func (s *Stream) Pending(ctx context.Context, groupname string) (*redis.XPending, error)
- func (s *Stream) Range(ctx context.Context, start, stop string) ([]redis.XMessage, error)
- func (s *Stream) SetGroupStartAt(ctx context.Context, groupname, start string) (string, error)
- func (s *Stream) Trim(ctx context.Context, count int64, opts ...optparams.Option[trimOpt]) (int64, error)
Constants ¶
This section is empty.
Variables ¶
var ErrStreamConsumerAlreadyListened = errors.New("stream already listened")
ErrStreamConsumerAlreadyListened 流已经被监听了
var ErrStreamConsumerNotListeningYet = errors.New("stream not listening yet")
ErrStreamConsumerNotListeningYet 流未被监听
var ErrStreamNeedToPointOutGroups = errors.New("stream need to point out groups")
ErrStreamNeedToPointOutGroups 流操作需要指定消费者组
Functions ¶
func SerializeWithJSON ¶
SerializeWithJSON 使用JSON作为序列化反序列化的协议
func SerializeWithMsgpack ¶
SerializeWithMsgpack 使用JSON作为序列化反序列化的协议
func WithAutocreate ¶
WithAutocreate CreateGroup方法的参数,用于设置是否自动创建流
func WithBlockTime ¶
WithBlockTime 设置客户端阻塞等待消息的时长
func WithClientID ¶
WithClientID 中间件通用设置,设置客户端id
func WithConsumerAckMode ¶
func WithConsumerAckMode(ack AckModeType) optparams.Option[Options]
WithConsumerAckMode stream消费者专用,用于设定同步校验规则
func WithConsumerDefaultStartAt ¶
WithConsumerDefaultStartAt stream消费者专用,用于设定默认消费起始时间
func WithConsumerDefaultStartEarliest ¶
WithConsumerDefaultStartEarliest stream消费者专用,用于设定默认消费从最早的消息开始
func WithConsumerDefaultStartLatest ¶
WithConsumerDefaultStartLatest stream消费者专用,用于设定默认消费从最新的消息开始
func WithConsumerDefaultStartPosition ¶
WithConsumerDefaultStartPosition stream消费者专用,用于设定默认消费起始位置,不设置则group设置为`$`,否则设置为`>`
func WithConsumerGroupName ¶
WithConsumerGroupName stream消费者专用,用于设定客户端组
func WithConsumerRecvBatchSize ¶
WithConsumerRecvBatchSize stream消费者专用,用于设定一次获取的消息批长度
func WithProducerDefaultMaxLen ¶
WithProducerDefaultMaxLen stream生产者专用,用于设置流的默认最长长度
func WithProducerDefaultStrict ¶
WithProducerStrict stream生产者专用,用于设置流是否为严格模式
func WithStartAtID ¶
WithStartLatest CreateGroup方法的参数,用于设置创建流后流的默认读取位置为指定id
func WithStartEarliest ¶
WithStartEarliest CreateGroup方法的参数,用于设置创建流后流的默认读取位置为最早数据
func WithStartLatest ¶
WithStartLatest CreateGroup方法的参数,用于设置创建流后流的默认读取位置为最新数据
func WithUUIDSnowflake ¶
WithUUIDSnowflake 使用snowflake作为uuid的生成器
func WithUUIDSonyflake ¶
WithUUIDSonyflake 使用sonyflake作为uuid的生成器
Types ¶
type AckModeType ¶
type AckModeType uint8
AckModeType stream的Ack模式
const ( //AckModeAckWhenGet 获取到后确认 AckModeAckWhenGet AckModeType = iota //AckModeAckWhenDone 处理完后确认 AckModeAckWhenDone //AckModeNoAck 不做确认,消费者需要自己实现ack操作,最好别这么用 AckModeNoAck )
type Consumer ¶
type Consumer struct { *clientIdhelper.ClientIDAbc *pchelper.ConsumerABC TopicInfos map[string]string // contains filtered or unexported fields }
Consumer 流消费者对象
func NewConsumer ¶
NewConsumer 创建一个指向多个流的消费者 默认一批获取一个消息,可以通过`WithStreamComsumerRecvBatchSize`设置批的大小 如果使用`WithStreamComsumerGroupName`设定了group,则按组消费(默认总组监听的最新位置开始监听,收到消息后确认,消息确认策略可以通过`WithStreamComsumerAckMode`配置), 否则按按单独客户端消费(默认从开始监听的时刻开始消费) 需要注意单独客户端消费不会记录消费的偏移量,因此很容易丢失下次请求时的结果.解决方法是第一次使用`$`,后面则需要记录下id号从它开始 @params cli redis.UniversalClient redis客户端对象 @params opts ...optparams.Option[pchelper.Options] 消费者的配置
func (*Consumer) Get ¶
Get 从多个流中取出数据 @params ctx context.Context 请求的上下文 @params timeout time.Duration 等待超时时间,为0则表示一直阻塞直到有数据
type Options ¶
type Options struct { BlockTime time.Duration //queue结构使用的参数,用于设置每次拉取的阻塞时长 RecvBatchSize int64 //stream消费者专用,用于设定一次获取的消息批长度 Group string //stream消费者专用,用于设定客户端组 AckMode AckModeType //stream消费者专用,用于设定同步校验规则 DefaultStart string //stream消费者专用,用于设定默认的监听的起始位置 DefaultMaxLen int64 //stream生产者专用,用于设置流的默认最长长度 DefaultStrict bool //stream生产者专用,用于设置流是否为严格模式 ProducerConsumerOpts []optparams.Option[pchelper.Options] //初始化pchelper的配置 ClientIDOpts []optparams.Option[clientIdhelper.Options] //初始化ClientID的配置 }
type Producer ¶
type Producer struct { *pchelper.ProducerConsumerABC *clientIdhelper.ClientIDAbc // contains filtered or unexported fields }
Producer 流的生产者对象
func NewProducer ¶
NewProducer 创建一个新的流生产者 @params k *clientkey.ClientKey redis客户端的键对象 @params opts ...broker.Option 生产者的配置
func (*Producer) PubEvent ¶
func (p *Producer) PubEvent(ctx context.Context, topic string, payload interface{}, opts ...optparams.Option[pchelper.PublishOptions]) (*pchelper.Event, error)
PubEvent 向流中放入事件数据 @params ctx context.Context 请求的上下文 @params payload []byte 发送的消息负载 @returns *event.Event 发送出去的消息对象
func (*Producer) Publish ¶
func (p *Producer) Publish(ctx context.Context, topic string, payload interface{}, opts ...optparams.Option[pchelper.PublishOptions]) error
Publish 向流中放入数据 @params ctx context.Context 请求的上下文 @params payload interface{} 发送的消息负载,负载如果不是map[string]interface{}形式或者可以被json/msgpack序列化的对象则统一以[value 值]的形式传出
type Stream ¶
type Stream struct { Name string // contains filtered or unexported fields }
Stream 流对象
func NewStream ¶
NewStream 创建一个新的流对象 @params cli redis.UniversalClient redis客户端的对象 @params name string 流名
func (*Stream) Ack ¶
Ack 手工确认组已经消耗了消息 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params ids ...string 确认被消耗的id列表
func (*Stream) CreateGroup ¶
func (s *Stream) CreateGroup(ctx context.Context, groupname string, opts ...optparams.Option[createGroupOpt]) (string, error)
CreateGroup 为指定消费者在指定的Stream上创建消费者组,创建流后流的默认读取位置为最新数据 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表
func (*Stream) Delete ¶
Delete 设置标志位标识删除流中指定id的数据 @params ctx context.Context 上下文信息,用于控制请求的结束 @params ids ...string 要删除的消息id列表
func (*Stream) DeleteConsumerFromGroup ¶
func (s *Stream) DeleteConsumerFromGroup(ctx context.Context, groupname, consumername string) (int64, error)
DeleteConsumerFromGroup 在指定的Stream上删除指定消费者组中的指定消费者 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params groupname string 消费者组名列表
func (*Stream) DeleteGroup ¶
DeleteGroup 为指定消费者在指定的Stream上删除消费者组 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表
func (*Stream) GroupInfos ¶
GroupInfos 获取主题流中注册的消费者组信息 @params ctx context.Context 上下文信息,用于控制请求的结束
func (*Stream) HasGroup ¶
HasGroup 判断消费者组是否在Stream上 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名
func (*Stream) HasGroups ¶
HasGroups 判断消费者组是否都在在Stream上 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupnames ...string 消费者组名列表
func (*Stream) Move ¶
func (s *Stream) Move(ctx context.Context, groupname, toconsumer string, minIdle time.Duration, ids ...string) ([]redis.XMessage, error)
Move 转移消息的所有权给用户组中的某个用户 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params toconsumer string 要转移给所有权的消费者 @params minIdle time.Duration 被转移出去的消息等待时间最小值,为0则时间计数被重置 @params ids ...string 要转移所有权的消息id
func (*Stream) MoveJustID ¶
func (s *Stream) MoveJustID(ctx context.Context, groupname, toconsumer string, minIdle time.Duration, ids ...string) ([]string, error)
MoveJustID 转移消息的所有权给用户组中的某个用户 这个命令和Move区别在于返回值只有消息id @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params toconsumer string 要转移给所有权的消费者 @params minIdle time.Duration 被转移出去的消息等待时间最小值,为0则时间计数被重置 @params ids ...string 要转移所有权的消息id
func (*Stream) Pending ¶
Pending 查看消费组中等待确认的消息列表 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表
func (*Stream) Range ¶
Range 获取消息列表,会自动过滤已经删除的消息 @params ctx context.Context 请求的上下文 @params start string 开始位置,`-`表示最小值, `+`表示最大值,可以指定毫秒级时间戳,也可以指定特定消息id @params stop string 结束位置,`-`表示最小值, `+`表示最大值,可以指定毫秒级时间戳,也可以指定特定消息id
func (*Stream) SetGroupStartAt ¶
SetGroupStartAt 设置指定消费者组在主题流中的读取起始位置 @params ctx context.Context 上下文信息,用于控制请求的结束 @params groupname string 消费者组名列表 @params start string 开始位置