admin

package
v0.0.0-...-ba2213e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2019 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultMQAdminExtImpl

// DefaultMQAdminExtImpl is the type on which all of the admin (operations)
// interfaces are implemented.
type DefaultMQAdminExtImpl struct {
	// MQAdminExtInner is embedded, so its method set is part of this type.
	MQAdminExtInner
	// contains filtered or unexported fields
}

DefaultMQAdminExtImpl 所有运维接口都在这里实现 Author: tianyuliang Since: 2017/11/2

func NewCustomMQAdminExtImpl

func NewCustomMQAdminExtImpl(rpcHook remoting.RPCHook, namesrvAddr string) *DefaultMQAdminExtImpl

NewCustomMQAdminExtImpl 初始化admin控制器 Author: tianyuliang Since: 2017/11/1

func NewDefaultMQAdminExtImpl

func NewDefaultMQAdminExtImpl(namesrvAddr string) *DefaultMQAdminExtImpl

NewDefaultMQAdminExtImpl 初始化admin控制器 Author: tianyuliang Since: 2017/11/1

func (*DefaultMQAdminExtImpl) CleanExpiredConsumerQueue

func (impl *DefaultMQAdminExtImpl) CleanExpiredConsumerQueue(clusterName string) (result bool, err error)

触发清理失效的消费队列 cluster 如果参数cluster为空,则表示所有集群 return 清理是否成功

func (*DefaultMQAdminExtImpl) CleanExpiredConsumerQueueByAddr

func (impl *DefaultMQAdminExtImpl) CleanExpiredConsumerQueueByAddr(brokerAddr string) (bool, error)

触发指定的broker清理失效的消费队列 return 清理是否成功

func (*DefaultMQAdminExtImpl) CloneGroupOffset

func (impl *DefaultMQAdminExtImpl) CloneGroupOffset(srcGroup, destGroup, topic string, isOffline bool) error

克隆某一个组的消费进度到新的组

func (*DefaultMQAdminExtImpl) ConsumeMessageDirectly

func (impl *DefaultMQAdminExtImpl) ConsumeMessageDirectly(consumerGroup, clientId, msgId string) (*body.ConsumeMessageDirectlyResult, error)

向指定Consumer发送某条消息

func (*DefaultMQAdminExtImpl) Consumed

func (impl *DefaultMQAdminExtImpl) Consumed(msg *message.MessageExt, consumerGroupId string) (bool, error)

Consumed 校验某条消息是否被某个消费组消费过

return: true表示已被消费; false:表示未被消费

Author: tianyuliang Since: 2017/11/6

func (*DefaultMQAdminExtImpl) CreateAndUpdateKvConfig

func (impl *DefaultMQAdminExtImpl) CreateAndUpdateKvConfig(namespace, key, value string) error

在 namespace 上添加或者更新 KV 配置

func (*DefaultMQAdminExtImpl) CreateAndUpdateSubscriptionGroupConfig

func (impl *DefaultMQAdminExtImpl) CreateAndUpdateSubscriptionGroupConfig(addr string, config *subscription.SubscriptionGroupConfig) error

向指定Broker创建或者更新订阅组配置

func (*DefaultMQAdminExtImpl) CreateAndUpdateTopicConfig

func (impl *DefaultMQAdminExtImpl) CreateAndUpdateTopicConfig(brokerAddr string, topicConfig *stgcommon.TopicConfig) error

创建或更新Topic

func (*DefaultMQAdminExtImpl) CreateCustomTopic

func (impl *DefaultMQAdminExtImpl) CreateCustomTopic(brokerAddr string, topicConfig *stgcommon.TopicConfig) error

创建指定Topic brokerAddr 目标broker的地址 topicConfig 需创建的topic配置

func (*DefaultMQAdminExtImpl) CreateOrUpdateOrderConf

func (impl *DefaultMQAdminExtImpl) CreateOrUpdateOrderConf(key, value string, isCluster bool) error

创建或更新顺序消息的分区配置

func (*DefaultMQAdminExtImpl) CreateTopic

func (impl *DefaultMQAdminExtImpl) CreateTopic(key, newTopic string, queueNum int) error

创建Topic key 消息队列已存在的topic newTopic 需新建的topic queueNum 读写队列的数量

func (*DefaultMQAdminExtImpl) DeleteIpsByProjectGroup

func (impl *DefaultMQAdminExtImpl) DeleteIpsByProjectGroup(key string) error

删除 project group 对应的所有 server ip

func (*DefaultMQAdminExtImpl) DeleteKvConfig

func (impl *DefaultMQAdminExtImpl) DeleteKvConfig(namespace, key string) error

删除 namespace 上的 KV 配置

func (*DefaultMQAdminExtImpl) DeleteSubscriptionGroup

func (impl *DefaultMQAdminExtImpl) DeleteSubscriptionGroup(brokerAddr, groupName string) error

删除 broker 上的 subscription group 信息

func (*DefaultMQAdminExtImpl) DeleteTopicInBroker

func (impl *DefaultMQAdminExtImpl) DeleteTopicInBroker(brokerAddrs set.Set, topic string) error

删除 broker 上的 topic 信息

func (*DefaultMQAdminExtImpl) DeleteTopicInNameServer

func (impl *DefaultMQAdminExtImpl) DeleteTopicInNameServer(namesrvSet set.Set, topic string) error

删除 namesrv维护的topic信息

func (*DefaultMQAdminExtImpl) EarliestMsgStoreTime

func (impl *DefaultMQAdminExtImpl) EarliestMsgStoreTime(mq *message.MessageQueue) (int64, error)

查询较早的存储消息

func (*DefaultMQAdminExtImpl) ExamineBrokerClusterInfo

func (impl *DefaultMQAdminExtImpl) ExamineBrokerClusterInfo() (*body.ClusterPlusInfo, []*body.ClusterBrokerWapper, error)

查看集群信息

func (*DefaultMQAdminExtImpl) ExamineConsumeStats

func (impl *DefaultMQAdminExtImpl) ExamineConsumeStats(consumerGroup string) (*admin.ConsumeStats, error)

查询消费进度

func (*DefaultMQAdminExtImpl) ExamineConsumeStatsByTopic

func (impl *DefaultMQAdminExtImpl) ExamineConsumeStatsByTopic(consumerGroup, topic string) (*admin.ConsumeStats, error)

基于Topic查询消费进度

func (*DefaultMQAdminExtImpl) ExamineConsumerConnectionInfo

func (impl *DefaultMQAdminExtImpl) ExamineConsumerConnectionInfo(consumerGroup, topic string) (*body.ConsumerConnectionPlus, int, error)

查看Consumer网络连接、订阅关系

func (*DefaultMQAdminExtImpl) ExamineProducerConnectionInfo

func (impl *DefaultMQAdminExtImpl) ExamineProducerConnectionInfo(producerGroup, topic string) (*body.ProducerConnection, error)

查看Producer网络连接

func (*DefaultMQAdminExtImpl) ExamineSubscriptionGroupConfig

func (impl *DefaultMQAdminExtImpl) ExamineSubscriptionGroupConfig(addr, group string) (*subscription.SubscriptionGroupConfig, error)

查询指定Broker的订阅组配置

func (*DefaultMQAdminExtImpl) ExamineTopicConfig

func (impl *DefaultMQAdminExtImpl) ExamineTopicConfig(addr, topic string) (*stgcommon.TopicConfig, error)

查询指定Broker的Topic配置

func (*DefaultMQAdminExtImpl) ExamineTopicRouteInfo

func (impl *DefaultMQAdminExtImpl) ExamineTopicRouteInfo(topic string) (*route.TopicRouteData, error)

查看Topic路由信息

func (*DefaultMQAdminExtImpl) ExamineTopicStats

func (impl *DefaultMQAdminExtImpl) ExamineTopicStats(topic string) (*admin.TopicStatsTable, error)

查询Topic Offset信息

func (*DefaultMQAdminExtImpl) FetchAllTopicList

func (impl *DefaultMQAdminExtImpl) FetchAllTopicList() (*body.TopicList, error)

从Name Server获取所有Topic列表

func (*DefaultMQAdminExtImpl) FetchBrokerNameByAddr

func (impl *DefaultMQAdminExtImpl) FetchBrokerNameByAddr(brokerAddr string) (string, error)

FetchBrokerNameByAddr 根据broker地址查询对应的broker名称

返回值: brokerName

Author: tianyuliang Since: 2017/11/7

func (*DefaultMQAdminExtImpl) FetchBrokerNameByClusterName

func (impl *DefaultMQAdminExtImpl) FetchBrokerNameByClusterName(clusterName string) (set.Set, error)

FetchBrokerNameByClusterName 根据Cluster集群名称,拉取所有broker名称

返回值: set<brokerName>

Author: tianyuliang Since: 2017/11/7

func (*DefaultMQAdminExtImpl) FetchBrokerRuntimeStats

func (impl *DefaultMQAdminExtImpl) FetchBrokerRuntimeStats(brokerAddr string) (*body.KVTable, error)

获取Broker运行时数据

func (*DefaultMQAdminExtImpl) FetchMasterAddrByClusterName

func (impl *DefaultMQAdminExtImpl) FetchMasterAddrByClusterName(clusterName string) (set.Set, error)

FetchMasterAddrByClusterName 拉取所有角色是“master”的broker地址列表

返回值: set.Set保存所有角色是master的 brokerAddr地址,即set<brokerAddr>

Author: tianyuliang Since: 2017/11/7

func (*DefaultMQAdminExtImpl) GetAdminExtGroup

func (impl *DefaultMQAdminExtImpl) GetAdminExtGroup() string

GetAdminExtGroup 查询admin管理的组名称 Author: tianyuliang Since: 2017/11/6

func (*DefaultMQAdminExtImpl) GetAllClusterNames

func (impl *DefaultMQAdminExtImpl) GetAllClusterNames() ([]string, map[string]*route.BrokerData, error)

GetAllClusterNames 获取集群名称 Author: tianyuliang Since: 2017/11/7

func (*DefaultMQAdminExtImpl) GetClusterTopicWappers

func (impl *DefaultMQAdminExtImpl) GetClusterTopicWappers() ([]*body.TopicBrokerClusterWapper, error)

GetClusterTopicWappers 获取集群的Topic包装信息列表(TopicBrokerClusterWapper) Author: tianyuliang Since: 2017/11/7

func (*DefaultMQAdminExtImpl) GetConsumeStatus

func (impl *DefaultMQAdminExtImpl) GetConsumeStatus(topic, consumerGroupId, clientAddr string) (map[string]map[*message.MessageQueue]int64, error)

通过客户端查看消费者的消费情况

func (*DefaultMQAdminExtImpl) GetConsumerRunningInfo

func (impl *DefaultMQAdminExtImpl) GetConsumerRunningInfo(consumerGroupId, clientId string, jstack bool) (*body.ConsumerRunningInfo, error)

查询Consumer内存数据结构

func (*DefaultMQAdminExtImpl) GetCreateTopicKey

func (impl *DefaultMQAdminExtImpl) GetCreateTopicKey() string

GetCreateTopicKey 查询创建Topic的key值 Author: tianyuliang Since: 2017/11/6

func (*DefaultMQAdminExtImpl) GetIpsByProjectGroup

func (impl *DefaultMQAdminExtImpl) GetIpsByProjectGroup(projectGroup string) (string, error)

通过 project 获取所有的 server ip 信息

func (*DefaultMQAdminExtImpl) GetKVConfig

func (impl *DefaultMQAdminExtImpl) GetKVConfig(namespace, key string) (string, error)

从Name Server获取一个配置项

func (*DefaultMQAdminExtImpl) GetKVListByNamespace

func (impl *DefaultMQAdminExtImpl) GetKVListByNamespace(namespace string) (*body.KVTable, error)

获取指定Namespace下的所有kv

func (*DefaultMQAdminExtImpl) GetNameServerAddressList

func (impl *DefaultMQAdminExtImpl) GetNameServerAddressList() ([]string, error)

获取Name Server地址列表

func (*DefaultMQAdminExtImpl) GetProjectGroupByIp

func (impl *DefaultMQAdminExtImpl) GetProjectGroupByIp(ip string) (string, error)

通过 server ip 获取 project 信息

func (*DefaultMQAdminExtImpl) GetTopicsByCluster

func (impl *DefaultMQAdminExtImpl) GetTopicsByCluster(clusterName string) ([]*body.TopicBrokerClusterWapper, error)

GetTopicsByCluster 根据ClusterName,查询该集群管理的所有Topic Author: tianyuliang Since: 2017/11/8

func (*DefaultMQAdminExtImpl) MaxOffset

func (impl *DefaultMQAdminExtImpl) MaxOffset(mq *message.MessageQueue) (int64, error)

查询MessageQueue最大偏移量

func (*DefaultMQAdminExtImpl) MessageTrackDetail

func (impl *DefaultMQAdminExtImpl) MessageTrackDetail(msg *message.MessageExt) ([]*track.MessageTrack, error)

查询消息被谁消费了

func (*DefaultMQAdminExtImpl) MinOffset

func (impl *DefaultMQAdminExtImpl) MinOffset(mq *message.MessageQueue) (int64, error)

查询MessageQueue最小偏移量

func (*DefaultMQAdminExtImpl) PutKVConfig

func (impl *DefaultMQAdminExtImpl) PutKVConfig(namespace, key, value string) error

向Name Server增加一个配置项

func (*DefaultMQAdminExtImpl) QueryConsumeTimeSpan

func (impl *DefaultMQAdminExtImpl) QueryConsumeTimeSpan(topic, consumerGroupId string) (set.Set, error)

根据 topic 和 group 获取消息的时间跨度 return set<QueueTimeSpan>

func (*DefaultMQAdminExtImpl) QueryMessage

func (impl *DefaultMQAdminExtImpl) QueryMessage(topic, key string, maxNum int, begin, end int64) (*admin.QueryResult, error)

搜索消息 topic topic名称 key 消息key关键字[业务系统基于此字段唯一标识消息] maxNum 最大搜索条数 begin 开始查询消息的时间戳 end 结束查询消息的时间戳

func (*DefaultMQAdminExtImpl) QueryTopicConsumeByWho

func (impl *DefaultMQAdminExtImpl) QueryTopicConsumeByWho(topic string) (*body.GroupList, error)

根据Topic查询被哪些订阅组消费

func (*DefaultMQAdminExtImpl) ResetOffsetByTimestamp

func (impl *DefaultMQAdminExtImpl) ResetOffsetByTimestamp(topic, group string, timestamp int64, force bool) (map[*message.MessageQueue]int64, error)

按照时间回溯消费进度(客户端不需要重启)

func (*DefaultMQAdminExtImpl) ResetOffsetByTimestampOld

func (impl *DefaultMQAdminExtImpl) ResetOffsetByTimestampOld(consumerGroup, topic string, timestamp int64, force bool) ([]*admin.RollbackStats, error)

按照时间回溯消费进度(客户端需要重启)

func (*DefaultMQAdminExtImpl) ResetOffsetNew

func (impl *DefaultMQAdminExtImpl) ResetOffsetNew(consumerGroup, topic string, timestamp int64) error

重置消费进度,无论Consumer是否在线,都可以执行。不保证最终结果是否成功,需要调用方通过消费进度查询来再次确认

func (*DefaultMQAdminExtImpl) SearchOffset

func (impl *DefaultMQAdminExtImpl) SearchOffset(mq message.MessageQueue, timestamp int64) (int64, error)

根据时间戳搜索MessageQueue偏移量(注意:可能会出现大量IO开销)

func (*DefaultMQAdminExtImpl) Shutdown

func (impl *DefaultMQAdminExtImpl) Shutdown() error

Shutdown 关闭Admin Author: tianyuliang Since: 2017/11/6

func (*DefaultMQAdminExtImpl) Start

func (impl *DefaultMQAdminExtImpl) Start() error

Start 启动Admin Author: tianyuliang Since: 2017/11/6

func (*DefaultMQAdminExtImpl) UpdateBrokerConfig

func (impl *DefaultMQAdminExtImpl) UpdateBrokerConfig(brokerAddr string, properties map[string]interface{}) error

更新Broker配置

func (*DefaultMQAdminExtImpl) ViewBrokerStatsData

func (impl *DefaultMQAdminExtImpl) ViewBrokerStatsData(brokerAddr, statsName, statsKey string) (*body.BrokerStatsData, error)

服务器统计数据输出

func (*DefaultMQAdminExtImpl) ViewMessage

func (impl *DefaultMQAdminExtImpl) ViewMessage(msgId string) (*message.MessageExt, error)

根据msgId查询消息消费结果

func (*DefaultMQAdminExtImpl) WipeWritePermOfBroker

func (impl *DefaultMQAdminExtImpl) WipeWritePermOfBroker(namesrvAddr, brokerName string) (int, error)

清除某个Broker的写权限,针对所有Name Server return 返回清除了多少个topic

type MQAdminExtInner

// MQAdminExtInner is the admin (operations) interface.
//
// (1) MQ management interface covering all externally exposed MQ management operations.
// (2) Includes topic creation, subscription group creation, configuration changes, etc.
type MQAdminExtInner interface {

	// Create a topic.
	CreateTopic(key, newTopic string, queueNum int) error

	// Start the admin.
	Start() error

	// Shut down the admin.
	Shutdown() error

	// Update the broker configuration.
	UpdateBrokerConfig(brokerAddr string, properties map[string]interface{}) error

	// Create or update a topic configuration on the specified broker.
	CreateAndUpdateTopicConfig(addr string, config *stgcommon.TopicConfig) error

	// Create or update a subscription group configuration on the specified broker.
	CreateAndUpdateSubscriptionGroupConfig(addr string, config *subscription.SubscriptionGroupConfig) error

	// Query the subscription group configuration of the specified broker.
	ExamineSubscriptionGroupConfig(addr, group string) (*subscription.SubscriptionGroupConfig, error)

	// Query the topic configuration of the specified broker.
	ExamineTopicConfig(addr, topic string) (*stgcommon.TopicConfig, error)

	// Query topic offset information.
	ExamineTopicStats(topic string) (*admin.TopicStatsTable, error)

	// Fetch the list of all topics from the name server.
	FetchAllTopicList() (*body.TopicList, error)

	// Fetch broker runtime data.
	FetchBrokerRuntimeStats(brokerAddr string) (*body.KVTable, error)

	// Query the consume progress.
	ExamineConsumeStats(consumerGroup string) (*admin.ConsumeStats, error)

	// Query the consume progress for a given topic.
	ExamineConsumeStatsByTopic(consumerGroup, topic string) (*admin.ConsumeStats, error)

	// Examine cluster information.
	ExamineBrokerClusterInfo() (*body.ClusterPlusInfo, []*body.ClusterBrokerWapper, error)

	// Examine topic route information.
	ExamineTopicRouteInfo(topic string) (*route.TopicRouteData, error)

	// Examine consumer network connections and subscription relationships.
	// Note: the second value (presumably the int return value; confirm against the
	// implementation) marks whether the connection process of the consumer group is
	// online; code=206 means the consumer process is offline.
	ExamineConsumerConnectionInfo(consumerGroup, topic string) (*body.ConsumerConnectionPlus, int, error)

	// Examine producer network connections.
	ExamineProducerConnectionInfo(producerGroup, topic string) (*body.ProducerConnection, error)

	// Get the name server address list.
	GetNameServerAddressList() ([]string, error)

	// Wipe the write permission of a broker, across all name servers.
	// Returns the number of topics that were wiped.
	WipeWritePermOfBroker(namesrvAddr, brokerName string) (int, error)

	// Add a configuration item to the name server.
	PutKVConfig(namespace, key, value string) error

	// Get a configuration item from the name server.
	GetKVConfig(namespace, key string) (string, error)

	// Add or update a KV configuration in the namespace.
	CreateAndUpdateKvConfig(namespace, key, value string) error

	// Delete a KV configuration from the namespace.
	DeleteKvConfig(namespace, key string) error

	// Get all KV entries under the specified namespace.
	GetKVListByNamespace(namespace string) (*body.KVTable, error)

	// Delete the topic information on the brokers.
	DeleteTopicInBroker(brokerAddrs set.Set, topic string) error

	// Delete the topic information maintained by the name servers.
	DeleteTopicInNameServer(namesrvs set.Set, topic string) error

	// Delete the subscription group information on the broker.
	DeleteSubscriptionGroup(brokerAddr, groupName string) error

	// Get the project information by server IP.
	GetProjectGroupByIp(ip string) (string, error)

	// Get all server IP information by project.
	GetIpsByProjectGroup(projectGroup string) (string, error)

	// Delete all server IPs associated with the project group.
	DeleteIpsByProjectGroup(key string) error

	// Roll back the consume progress by timestamp (the client needs to restart).
	ResetOffsetByTimestampOld(consumerGroup, topic string, timestamp int64, force bool) ([]*admin.RollbackStats, error)

	// Roll back the consume progress by timestamp (the client does not need to restart).
	ResetOffsetByTimestamp(topic, group string, timestamp int64, force bool) (map[*message.MessageQueue]int64, error)

	// Reset the consume progress; can be executed whether or not the consumer is online.
	// The final result is not guaranteed to succeed; the caller must confirm it again
	// by querying the consume progress.
	ResetOffsetNew(consumerGroup, topic string, timestamp int64) error

	// View the consumption status of a consumer through the client.
	GetConsumeStatus(topic, consumerGroupId, clientAddr string) (map[string]map[*message.MessageQueue]int64, error)

	// Create or update the partition configuration for ordered messages.
	CreateOrUpdateOrderConf(key, value string, isCluster bool) error

	// Query which subscription groups consume the given topic.
	QueryTopicConsumeByWho(topic string) (*body.GroupList, error)

	// Get the time span of messages by topic and group.
	// Returns set<QueueTimeSpan>.
	QueryConsumeTimeSpan(topic, group string) (set.Set, error)

	// Trigger cleanup of expired consume queues.
	// cluster: if the cluster argument is empty, all clusters are targeted.
	// Returns whether the cleanup succeeded.
	CleanExpiredConsumerQueue(cluster string) (bool, error)

	// Trigger cleanup of expired consume queues on the specified broker.
	// Returns whether the cleanup succeeded.
	CleanExpiredConsumerQueueByAddr(addr string) (bool, error)

	// Query the consumer's in-memory data structures.
	GetConsumerRunningInfo(consumerGroupId, clientId string, jstack bool) (*body.ConsumerRunningInfo, error)

	// Send a given message to the specified consumer.
	ConsumeMessageDirectly(consumerGroup, clientId, msgId string) (*body.ConsumeMessageDirectlyResult, error)

	// Query who has consumed the message.
	MessageTrackDetail(msg *message.MessageExt) ([]*track.MessageTrack, error)

	// Clone the consume progress of one group to a new group.
	CloneGroupOffset(srcGroup, destGroup, topic string, isOffline bool) error

	// Output server statistics data.
	ViewBrokerStatsData(brokerAddr, statsName, statsKey string) (*body.BrokerStatsData, error)

	// Create the specified topic.
	CreateCustomTopic(brokerAddr string, topicConfig *stgcommon.TopicConfig) error

	// Query the message consumption result by msgId.
	ViewMessage(msgId string) (*message.MessageExt, error)

	// Search for messages.
	// topic  topic name
	// key    message key keyword [business systems use this field to uniquely identify a message]
	// maxNum maximum number of results
	// begin  timestamp at which the message query starts
	// end    timestamp at which the message query ends
	QueryMessage(topic, key string, maxNum int, begin, end int64) (*admin.QueryResult, error)

	// Query the earliest stored message.
	EarliestMsgStoreTime(mq *message.MessageQueue) (int64, error)

	// Search the MessageQueue offset by timestamp (note: may incur heavy IO overhead).
	SearchOffset(mq message.MessageQueue, timestamp int64) (int64, error)

	// Query the maximum offset of the MessageQueue.
	MaxOffset(mq *message.MessageQueue) (int64, error)

	// Query the minimum offset of the MessageQueue.
	MinOffset(mq *message.MessageQueue) (int64, error)
}

MQAdminExtInner 运维接口

(1)MQ管理类接口,涉及所有与MQ管理相关的对外接口 (2)包括Topic创建、订阅组创建、配置修改等

Author: tianyuliang Since: 2017/11/1

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL