Documentation ¶
Index ¶
- Constants
- type ConsumerInfo
- type GroupInfo
- type PendingMsgInfo
- type RedisConnOpt
- type RedisStreamMQClient
- func (mqClient *RedisStreamMQClient) ConvertVecInterface(vecReply []interface{}) (msgMap map[string]map[string][]string)
- func (mqClient *RedisStreamMQClient) CreateConsumer(streamKey string, groupName string, consumerName string) error
- func (mqClient *RedisStreamMQClient) CreateConsumerGroup(streamKey string, groupName string, beginMsgId string) error
- func (mqClient *RedisStreamMQClient) DelConsumer(streamKey string, groupName string, consumerName string) error
- func (mqClient *RedisStreamMQClient) DelDeadMsg(streamKey string, groupName string, vecMsgId []string) error
- func (mqClient *RedisStreamMQClient) DelMsg(streamKey string, vecMsgId []string) (err error)
- func (mqClient *RedisStreamMQClient) DestroyConsumerGroup(streamKey string, groupName string) error
- func (mqClient *RedisStreamMQClient) GetMsg(msgAmount int32, streamKey string, beginMsgId string) (msgMap map[string]map[string][]string, err error)
- func (mqClient *RedisStreamMQClient) GetMsgBlock(blockSec int32, msgAmount int32, streamKey string) (msgMap map[string]map[string][]string, err error)
- func (mqClient *RedisStreamMQClient) GetMsgBlockByGroupConsumer(blockSec int32, streamKey string, groupName string, consumerName string, ...) (msgMap map[string]map[string][]string, err error)
- func (mqClient *RedisStreamMQClient) GetMsgByGroupConsumer(streamKey string, groupName string, consumerName string, msgAmount int32) (msgMap map[string]map[string][]string, err error)
- func (mqClient *RedisStreamMQClient) GetPendingList(streamKey string, groupName string, consumerName string, msgAmount int32) (vecPendingMsg []*PendingMsgInfo, err error)
- func (mqClient *RedisStreamMQClient) GetStreamsLen(streamKey string) int
- func (mqClient *RedisStreamMQClient) MonitorConsumerGroupInfo(streamKey string) (groupInfo *GroupInfo)
- func (mqClient *RedisStreamMQClient) MonitorConsumerInfo(streamKey string, groupName string) (vecConsumerInfo []*ConsumerInfo)
- func (mqClient *RedisStreamMQClient) MonitorMqInfo(streamKey string) (streamMQInfo *StreamMQInfo)
- func (mqClient *RedisStreamMQClient) MoveMsg(streamKey string, groupName string, consumerName string, idleTime int, ...) error
- func (mqClient *RedisStreamMQClient) PutMsg(streamKey string, msgKey string, msgValue string) (strMsgId string, err error)
- func (mqClient *RedisStreamMQClient) PutMsgBatch(streamKey string, msgMap map[string]string) (msgId string, err error)
- func (mqClient *RedisStreamMQClient) ReplyAck(streamKey string, groupName string, vecMsgId []string) error
- type StreamMQInfo
Constants ¶
View Source
const (
	STREAM_MQ_MAX_LEN  = 500000 //消息队列最大长度
	READ_MSG_AMOUNT    = 1000   //每次读取消息的条数
	READ_MSG_BLOCK_SEC = 30     //阻塞读取消息时间
	TEST_STREAM_KEY    = "TestStreamKey1"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ConsumerInfo ¶
消费者信息
type GroupInfo ¶
type GroupInfo struct {
	Name            string // 消费组名称
	Consumers       int64  // 组内消费者个数
	Pending         int64  // 组内所有消费者的等待列表总长度
	LastDeliveredId string // 组内最后一条被消费的消息ID
}
消费组信息
type PendingMsgInfo ¶
type PendingMsgInfo struct {
	MsgId          string //消息ID
	BelongConsumer string //所属消费者
	IdleTime       int    //已读取未消费时长
	ReadCount      int    //消息被读取次数
}
等待列表中的消息属性
type RedisConnOpt ¶
type RedisStreamMQClient ¶
type RedisStreamMQClient struct {
	RedisConnOpt RedisConnOpt
	ConnPool     *redis.Pool
	StreamKey    string //stream对应的key值
	GroupName    string //消费者组名称
	ConsumerName string //消费者名称
}
func NewClient ¶
func NewClient(opt RedisConnOpt) *RedisStreamMQClient
func (*RedisStreamMQClient) ConvertVecInterface ¶
func (mqClient *RedisStreamMQClient) ConvertVecInterface(vecReply []interface{}) (msgMap map[string]map[string][]string)
func (*RedisStreamMQClient) CreateConsumer ¶
func (mqClient *RedisStreamMQClient) CreateConsumer(streamKey string, groupName string, consumerName string) error
CreateConsumer 创建消费者
func (*RedisStreamMQClient) CreateConsumerGroup ¶
func (mqClient *RedisStreamMQClient) CreateConsumerGroup(streamKey string, groupName string, beginMsgId string) error
CreateConsumerGroup 创建消费者组
func (*RedisStreamMQClient) DelConsumer ¶
func (mqClient *RedisStreamMQClient) DelConsumer(streamKey string, groupName string, consumerName string) error
DelConsumer 删除消费者
func (*RedisStreamMQClient) DelDeadMsg ¶
func (mqClient *RedisStreamMQClient) DelDeadMsg(streamKey string, groupName string, vecMsgId []string) error
DelDeadMsg 删除不能被消费者处理,也就是不能被 XACK,长时间处于 Pending 列表中的消息
func (*RedisStreamMQClient) DelMsg ¶
func (mqClient *RedisStreamMQClient) DelMsg(streamKey string, vecMsgId []string) (err error)
DelMsg 删除消息
func (*RedisStreamMQClient) DestroyConsumerGroup ¶
func (mqClient *RedisStreamMQClient) DestroyConsumerGroup(streamKey string, groupName string) error
DestroyConsumerGroup 销毁消费者组
func (*RedisStreamMQClient) GetMsg ¶
func (mqClient *RedisStreamMQClient) GetMsg(msgAmount int32, streamKey string, beginMsgId string) ( msgMap map[string]map[string][]string, err error)
GetMsg 非阻塞方式读取消息
func (*RedisStreamMQClient) GetMsgBlock ¶
func (mqClient *RedisStreamMQClient) GetMsgBlock(blockSec int32, msgAmount int32, streamKey string) ( msgMap map[string]map[string][]string, err error)
GetMsgBlock 阻塞方式读取消息
func (*RedisStreamMQClient) GetMsgBlockByGroupConsumer ¶
func (mqClient *RedisStreamMQClient) GetMsgBlockByGroupConsumer(blockSec int32, streamKey string, groupName string, consumerName string, msgAmount int32) (msgMap map[string]map[string][]string, err error)
GetMsgBlockByGroupConsumer 以阻塞方式进行组内消息分配操作,组内每个消费者消费多少消息
func (*RedisStreamMQClient) GetMsgByGroupConsumer ¶
func (mqClient *RedisStreamMQClient) GetMsgByGroupConsumer(streamKey string, groupName string, consumerName string, msgAmount int32) (msgMap map[string]map[string][]string, err error)
GetMsgByGroupConsumer 组内消息分配操作,组内每个消费者消费多少消息
func (*RedisStreamMQClient) GetPendingList ¶
func (mqClient *RedisStreamMQClient) GetPendingList(streamKey string, groupName string, consumerName string, msgAmount int32) ( vecPendingMsg []*PendingMsgInfo, err error)
GetPendingList 获取等待列表(读取但还未消费的消息)
func (*RedisStreamMQClient) GetStreamsLen ¶
func (mqClient *RedisStreamMQClient) GetStreamsLen(streamKey string) int
GetStreamsLen 获取消息队列的长度,消息消费之后会做标记,不会删除
func (*RedisStreamMQClient) MonitorConsumerGroupInfo ¶
func (mqClient *RedisStreamMQClient) MonitorConsumerGroupInfo(streamKey string) (groupInfo *GroupInfo)
MonitorConsumerGroupInfo 监控消费者组信息
func (*RedisStreamMQClient) MonitorConsumerInfo ¶
func (mqClient *RedisStreamMQClient) MonitorConsumerInfo(streamKey string, groupName string) (vecConsumerInfo []*ConsumerInfo)
MonitorConsumerInfo 监控消费者信息
func (*RedisStreamMQClient) MonitorMqInfo ¶
func (mqClient *RedisStreamMQClient) MonitorMqInfo(streamKey string) (streamMQInfo *StreamMQInfo)
MonitorMqInfo 监控服务器队列信息
func (*RedisStreamMQClient) MoveMsg ¶
func (mqClient *RedisStreamMQClient) MoveMsg(streamKey string, groupName string, consumerName string, idleTime int, vecMsgId []string) error
MoveMsg 转移消息到其他等待列表中
func (*RedisStreamMQClient) PutMsg ¶
func (mqClient *RedisStreamMQClient) PutMsg(streamKey string, msgKey string, msgValue string) (strMsgId string, err error)
PutMsg 添加消息
func (*RedisStreamMQClient) PutMsgBatch ¶
func (mqClient *RedisStreamMQClient) PutMsgBatch(streamKey string, msgMap map[string]string) (msgId string, err error)
PutMsgBatch 批量添加消息
type StreamMQInfo ¶
type StreamMQInfo struct {
	Length          int64                         // 消息队列长度
	RedixTreeKeys   int64                         // 基数树key数
	RedixTreeNodes  int64                         // 基数树节点数
	LastGeneratedId string                        // 最后一个生成的消息ID
	Groups          int64                         // 消费组个数
	FirstEntry      *map[string]map[string]string // 第一个消息体
	LastEntry       *map[string]map[string]string // 最后一个消息体
}
消息队列信息
Click to show internal directories.
Click to hide internal directories.