common

package
v0.0.0-...-243e8fd Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 1, 2022 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Index

Constants

View Source
// Defaults and limits for the Redis stream message queue client.
const (
	// STREAM_MQ_MAX_LEN is the maximum length of the message queue.
	STREAM_MQ_MAX_LEN = 500000

	// READ_MSG_AMOUNT is the number of messages fetched by each read.
	READ_MSG_AMOUNT = 1000

	// READ_MSG_BLOCK_SEC is how long a blocking read waits for messages
	// (in seconds, going by the name).
	READ_MSG_BLOCK_SEC = 30

	// TEST_STREAM_KEY is a stream key; the name suggests it is meant for
	// testing — confirm against callers.
	TEST_STREAM_KEY = "TestStreamKey1"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ConsumerInfo

// ConsumerInfo describes a single consumer.
type ConsumerInfo struct {
	// Name is the consumer's name.
	Name string
	// Pending is the length of the consumer's pending (waiting) queue.
	Pending int64
	// Idle is how long the consumer has been idle, in milliseconds.
	Idle int64
}

消费者信息

type GroupInfo

// GroupInfo describes a consumer group.
type GroupInfo struct {
	// Name is the consumer group's name.
	Name string
	// Consumers is the number of consumers in the group.
	Consumers int64
	// Pending is the combined length of the pending lists of all
	// consumers in the group.
	Pending int64
	// LastDeliveredId is the ID of the last message consumed in the group.
	LastDeliveredId string
}

消费组信息

type PendingMsgInfo

// PendingMsgInfo holds the attributes of one message in a pending list.
type PendingMsgInfo struct {
	// MsgId is the message ID.
	MsgId string
	// BelongConsumer is the consumer the message belongs to.
	BelongConsumer string
	// IdleTime is how long the message has been read but not yet consumed.
	// NOTE(review): the unit is not stated here — confirm (likely milliseconds).
	IdleTime int
	// ReadCount is the number of times the message has been read.
	ReadCount int
}

等待列表中的消息属性

type RedisConnOpt

// RedisConnOpt groups the options used to connect to Redis.
type RedisConnOpt struct {
	// Enable is an on/off switch.
	// NOTE(review): what exactly it enables is not visible here — confirm.
	Enable bool
	// Host is the Redis server host.
	Host string
	// Port is the Redis server port.
	Port int32
	// Password is the password used for the connection.
	Password string
	// Index is presumably the Redis database index — TODO confirm.
	Index int32
	// TTL is a time-to-live setting.
	// NOTE(review): its unit and what it applies to are not visible here — confirm.
	TTL int32
}

type RedisStreamMQClient

// RedisStreamMQClient bundles the connection options, the connection pool and
// the stream/group/consumer names used by the message queue methods.
type RedisStreamMQClient struct {
	// RedisConnOpt holds the Redis connection options.
	RedisConnOpt RedisConnOpt
	// ConnPool is the Redis connection pool.
	ConnPool *redis.Pool
	// StreamKey is the key of the stream.
	StreamKey string
	// GroupName is the consumer group name.
	GroupName string
	// ConsumerName is the consumer name.
	ConsumerName string
}

func NewClient

func NewClient(opt RedisConnOpt) *RedisStreamMQClient

func (*RedisStreamMQClient) ConvertVecInterface

func (mqClient *RedisStreamMQClient) ConvertVecInterface(vecReply []interface{}) (msgMap map[string]map[string][]string)

func (*RedisStreamMQClient) CreateConsumer

func (mqClient *RedisStreamMQClient) CreateConsumer(streamKey string, groupName string, consumerName string) error

CreateConsumer 创建消费者

func (*RedisStreamMQClient) CreateConsumerGroup

func (mqClient *RedisStreamMQClient) CreateConsumerGroup(streamKey string, groupName string, beginMsgId string) error

CreateConsumerGroup 创建消费者组

func (*RedisStreamMQClient) DelConsumer

func (mqClient *RedisStreamMQClient) DelConsumer(streamKey string, groupName string, consumerName string) error

DelConsumer 删除消费者

func (*RedisStreamMQClient) DelDeadMsg

func (mqClient *RedisStreamMQClient) DelDeadMsg(streamKey string, groupName string, vecMsgId []string) error

DelDeadMsg 删除不能被消费者处理,也就是不能被 XACK,长时间处于 Pending 列表中的消息

func (*RedisStreamMQClient) DelMsg

func (mqClient *RedisStreamMQClient) DelMsg(streamKey string, vecMsgId []string) (err error)

DelMsg 删除消息

func (*RedisStreamMQClient) DestroyConsumerGroup

func (mqClient *RedisStreamMQClient) DestroyConsumerGroup(streamKey string, groupName string) error

DestroyConsumerGroup 销毁消费者组

func (*RedisStreamMQClient) GetMsg

func (mqClient *RedisStreamMQClient) GetMsg(msgAmount int32, streamKey string, beginMsgId string) (
	msgMap map[string]map[string][]string, err error)

GetMsg 非阻塞方式读取消息

func (*RedisStreamMQClient) GetMsgBlock

func (mqClient *RedisStreamMQClient) GetMsgBlock(blockSec int32, msgAmount int32, streamKey string) (
	msgMap map[string]map[string][]string, err error)

GetMsgBlock 阻塞方式读取消息

func (*RedisStreamMQClient) GetMsgBlockByGroupConsumer

func (mqClient *RedisStreamMQClient) GetMsgBlockByGroupConsumer(blockSec int32, streamKey string, groupName string,
	consumerName string, msgAmount int32) (msgMap map[string]map[string][]string, err error)

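GetMsgBlockByGroupConsumer is the blocking variant of GetMsgByGroupConsumer: it distributes messages within a group, i.e. how many messages each consumer in the group consumes.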

func (*RedisStreamMQClient) GetMsgByGroupConsumer

func (mqClient *RedisStreamMQClient) GetMsgByGroupConsumer(streamKey string, groupName string,
	consumerName string, msgAmount int32) (msgMap map[string]map[string][]string, err error)

GetMsgByGroupConsumer 组内消息分配操作,组内每个消费者消费多少消息

func (*RedisStreamMQClient) GetPendingList

func (mqClient *RedisStreamMQClient) GetPendingList(streamKey string, groupName string, consumerName string, msgAmount int32) (
	vecPendingMsg []*PendingMsgInfo, err error)

GetPendingList 获取等待列表(读取但还未消费的消息)

func (*RedisStreamMQClient) GetStreamsLen

func (mqClient *RedisStreamMQClient) GetStreamsLen(streamKey string) int

GetStreamsLen 获取消息队列的长度,消息消费之后会做标记,不会删除

func (*RedisStreamMQClient) MonitorConsumerGroupInfo

func (mqClient *RedisStreamMQClient) MonitorConsumerGroupInfo(streamKey string) (groupInfo *GroupInfo)

MonitorConsumerGroupInfo 监控消费者组信息

func (*RedisStreamMQClient) MonitorConsumerInfo

func (mqClient *RedisStreamMQClient) MonitorConsumerInfo(streamKey string, groupName string) (vecConsumerInfo []*ConsumerInfo)

MonitorConsumerInfo 监控消费者信息

func (*RedisStreamMQClient) MonitorMqInfo

func (mqClient *RedisStreamMQClient) MonitorMqInfo(streamKey string) (streamMQInfo *StreamMQInfo)

MonitorMqInfo 监控服务器队列信息

func (*RedisStreamMQClient) MoveMsg

func (mqClient *RedisStreamMQClient) MoveMsg(streamKey string, groupName string,
	consumerName string, idleTime int, vecMsgId []string) error

MoveMsg 转移消息到其他等待列表中

func (*RedisStreamMQClient) PutMsg

func (mqClient *RedisStreamMQClient) PutMsg(streamKey string, msgKey string, msgValue string) (strMsgId string, err error)

PutMsg 添加消息

func (*RedisStreamMQClient) PutMsgBatch

func (mqClient *RedisStreamMQClient) PutMsgBatch(streamKey string, msgMap map[string]string) (msgId string, err error)

PutMsgBatch 批量添加消息

func (*RedisStreamMQClient) ReplyAck

func (mqClient *RedisStreamMQClient) ReplyAck(streamKey string, groupName string, vecMsgId []string) error

ReplyAck 返回ACK

type StreamMQInfo

// StreamMQInfo describes the state of a message queue (stream).
type StreamMQInfo struct {
	// Length is the length of the message queue.
	Length int64
	// RedixTreeKeys is the number of keys in the radix tree.
	// NOTE(review): "Redix" looks like a misspelling of "Radix"; the name is
	// exported, so it is left unchanged.
	RedixTreeKeys int64
	// RedixTreeNodes is the number of nodes in the radix tree.
	RedixTreeNodes int64
	// LastGeneratedId is the ID of the last generated message.
	LastGeneratedId string
	// Groups is the number of consumer groups.
	Groups int64
	// FirstEntry is the first message body.
	// NOTE(review): presumably keyed by message ID, then by field name — confirm.
	FirstEntry *map[string]map[string]string
	// LastEntry is the last message body, in the same form as FirstEntry.
	LastEntry *map[string]map[string]string
}

消息队列信息

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL