Documentation ¶
Index ¶
- type DefaultMQAdminExtImpl
- func (impl *DefaultMQAdminExtImpl) CleanExpiredConsumerQueue(clusterName string) (result bool, err error)
- func (impl *DefaultMQAdminExtImpl) CleanExpiredConsumerQueueByAddr(brokerAddr string) (bool, error)
- func (impl *DefaultMQAdminExtImpl) CloneGroupOffset(srcGroup, destGroup, topic string, isOffline bool) error
- func (impl *DefaultMQAdminExtImpl) ConsumeMessageDirectly(consumerGroup, clientId, msgId string) (*body.ConsumeMessageDirectlyResult, error)
- func (impl *DefaultMQAdminExtImpl) Consumed(msg *message.MessageExt, consumerGroupId string) (bool, error)
- func (impl *DefaultMQAdminExtImpl) CreateAndUpdateKvConfig(namespace, key, value string) error
- func (impl *DefaultMQAdminExtImpl) CreateAndUpdateSubscriptionGroupConfig(addr string, config *subscription.SubscriptionGroupConfig) error
- func (impl *DefaultMQAdminExtImpl) CreateAndUpdateTopicConfig(brokerAddr string, topicConfig *stgcommon.TopicConfig) error
- func (impl *DefaultMQAdminExtImpl) CreateCustomTopic(brokerAddr string, topicConfig *stgcommon.TopicConfig) error
- func (impl *DefaultMQAdminExtImpl) CreateOrUpdateOrderConf(key, value string, isCluster bool) error
- func (impl *DefaultMQAdminExtImpl) CreateTopic(key, newTopic string, queueNum int) error
- func (impl *DefaultMQAdminExtImpl) DeleteIpsByProjectGroup(key string) error
- func (impl *DefaultMQAdminExtImpl) DeleteKvConfig(namespace, key string) error
- func (impl *DefaultMQAdminExtImpl) DeleteSubscriptionGroup(brokerAddr, groupName string) error
- func (impl *DefaultMQAdminExtImpl) DeleteTopicInBroker(brokerAddrs set.Set, topic string) error
- func (impl *DefaultMQAdminExtImpl) DeleteTopicInNameServer(namesrvSet set.Set, topic string) error
- func (impl *DefaultMQAdminExtImpl) EarliestMsgStoreTime(mq *message.MessageQueue) (int64, error)
- func (impl *DefaultMQAdminExtImpl) ExamineBrokerClusterInfo() (*body.ClusterPlusInfo, []*body.ClusterBrokerWapper, error)
- func (impl *DefaultMQAdminExtImpl) ExamineConsumeStats(consumerGroup string) (*admin.ConsumeStats, error)
- func (impl *DefaultMQAdminExtImpl) ExamineConsumeStatsByTopic(consumerGroup, topic string) (*admin.ConsumeStats, error)
- func (impl *DefaultMQAdminExtImpl) ExamineConsumerConnectionInfo(consumerGroup, topic string) (*body.ConsumerConnectionPlus, int, error)
- func (impl *DefaultMQAdminExtImpl) ExamineProducerConnectionInfo(producerGroup, topic string) (*body.ProducerConnection, error)
- func (impl *DefaultMQAdminExtImpl) ExamineSubscriptionGroupConfig(addr, group string) (*subscription.SubscriptionGroupConfig, error)
- func (impl *DefaultMQAdminExtImpl) ExamineTopicConfig(addr, topic string) (*stgcommon.TopicConfig, error)
- func (impl *DefaultMQAdminExtImpl) ExamineTopicRouteInfo(topic string) (*route.TopicRouteData, error)
- func (impl *DefaultMQAdminExtImpl) ExamineTopicStats(topic string) (*admin.TopicStatsTable, error)
- func (impl *DefaultMQAdminExtImpl) FetchAllTopicList() (*body.TopicList, error)
- func (impl *DefaultMQAdminExtImpl) FetchBrokerNameByAddr(brokerAddr string) (string, error)
- func (impl *DefaultMQAdminExtImpl) FetchBrokerNameByClusterName(clusterName string) (set.Set, error)
- func (impl *DefaultMQAdminExtImpl) FetchBrokerRuntimeStats(brokerAddr string) (*body.KVTable, error)
- func (impl *DefaultMQAdminExtImpl) FetchMasterAddrByClusterName(clusterName string) (set.Set, error)
- func (impl *DefaultMQAdminExtImpl) GetAdminExtGroup() string
- func (impl *DefaultMQAdminExtImpl) GetAllClusterNames() ([]string, map[string]*route.BrokerData, error)
- func (impl *DefaultMQAdminExtImpl) GetClusterTopicWappers() ([]*body.TopicBrokerClusterWapper, error)
- func (impl *DefaultMQAdminExtImpl) GetConsumeStatus(topic, consumerGroupId, clientAddr string) (map[string]map[*message.MessageQueue]int64, error)
- func (impl *DefaultMQAdminExtImpl) GetConsumerRunningInfo(consumerGroupId, clientId string, jstack bool) (*body.ConsumerRunningInfo, error)
- func (impl *DefaultMQAdminExtImpl) GetCreateTopicKey() string
- func (impl *DefaultMQAdminExtImpl) GetIpsByProjectGroup(projectGroup string) (string, error)
- func (impl *DefaultMQAdminExtImpl) GetKVConfig(namespace, key string) (string, error)
- func (impl *DefaultMQAdminExtImpl) GetKVListByNamespace(namespace string) (*body.KVTable, error)
- func (impl *DefaultMQAdminExtImpl) GetNameServerAddressList() ([]string, error)
- func (impl *DefaultMQAdminExtImpl) GetProjectGroupByIp(ip string) (string, error)
- func (impl *DefaultMQAdminExtImpl) GetTopicsByCluster(clusterName string) ([]*body.TopicBrokerClusterWapper, error)
- func (impl *DefaultMQAdminExtImpl) MaxOffset(mq *message.MessageQueue) (int64, error)
- func (impl *DefaultMQAdminExtImpl) MessageTrackDetail(msg *message.MessageExt) ([]*track.MessageTrack, error)
- func (impl *DefaultMQAdminExtImpl) MinOffset(mq *message.MessageQueue) (int64, error)
- func (impl *DefaultMQAdminExtImpl) PutKVConfig(namespace, key, value string) error
- func (impl *DefaultMQAdminExtImpl) QueryConsumeTimeSpan(topic, consumerGroupId string) (set.Set, error)
- func (impl *DefaultMQAdminExtImpl) QueryMessage(topic, key string, maxNum int, begin, end int64) (*admin.QueryResult, error)
- func (impl *DefaultMQAdminExtImpl) QueryTopicConsumeByWho(topic string) (*body.GroupList, error)
- func (impl *DefaultMQAdminExtImpl) ResetOffsetByTimestamp(topic, group string, timestamp int64, force bool) (map[*message.MessageQueue]int64, error)
- func (impl *DefaultMQAdminExtImpl) ResetOffsetByTimestampOld(consumerGroup, topic string, timestamp int64, force bool) ([]*admin.RollbackStats, error)
- func (impl *DefaultMQAdminExtImpl) ResetOffsetNew(consumerGroup, topic string, timestamp int64) error
- func (impl *DefaultMQAdminExtImpl) SearchOffset(mq message.MessageQueue, timestamp int64) (int64, error)
- func (impl *DefaultMQAdminExtImpl) Shutdown() error
- func (impl *DefaultMQAdminExtImpl) Start() error
- func (impl *DefaultMQAdminExtImpl) UpdateBrokerConfig(brokerAddr string, properties map[string]interface{}) error
- func (impl *DefaultMQAdminExtImpl) ViewBrokerStatsData(brokerAddr, statsName, statsKey string) (*body.BrokerStatsData, error)
- func (impl *DefaultMQAdminExtImpl) ViewMessage(msgId string) (*message.MessageExt, error)
- func (impl *DefaultMQAdminExtImpl) WipeWritePermOfBroker(namesrvAddr, brokerName string) (int, error)
- type MQAdminExtInner
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DefaultMQAdminExtImpl ¶
type DefaultMQAdminExtImpl struct { MQAdminExtInner // contains filtered or unexported fields }
DefaultMQAdminExtImpl 所有运维接口都在这里实现 Author: tianyuliang Since: 2017/11/2
func NewCustomMQAdminExtImpl ¶
func NewCustomMQAdminExtImpl(rpcHook remoting.RPCHook, namesrvAddr string) *DefaultMQAdminExtImpl
NewCustomMQAdminExtImpl 初始化admin控制器 Author: tianyuliang Since: 2017/11/1
func NewDefaultMQAdminExtImpl ¶
func NewDefaultMQAdminExtImpl(namesrvAddr string) *DefaultMQAdminExtImpl
NewDefaultMQAdminExtImpl 初始化admin控制器 Author: tianyuliang Since: 2017/11/1
func (*DefaultMQAdminExtImpl) CleanExpiredConsumerQueue ¶
func (impl *DefaultMQAdminExtImpl) CleanExpiredConsumerQueue(clusterName string) (result bool, err error)
触发清理失效的消费队列 cluster 如果参数cluster为空,则表示所有集群 return 清理是否成功
func (*DefaultMQAdminExtImpl) CleanExpiredConsumerQueueByAddr ¶
func (impl *DefaultMQAdminExtImpl) CleanExpiredConsumerQueueByAddr(brokerAddr string) (bool, error)
触发指定的broker清理失效的消费队列 return 清理是否成功
func (*DefaultMQAdminExtImpl) CloneGroupOffset ¶
func (impl *DefaultMQAdminExtImpl) CloneGroupOffset(srcGroup, destGroup, topic string, isOffline bool) error
克隆某一个组的消费进度到新的组
func (*DefaultMQAdminExtImpl) ConsumeMessageDirectly ¶
func (impl *DefaultMQAdminExtImpl) ConsumeMessageDirectly(consumerGroup, clientId, msgId string) (*body.ConsumeMessageDirectlyResult, error)
向指定Consumer发送某条消息
func (*DefaultMQAdminExtImpl) Consumed ¶
func (impl *DefaultMQAdminExtImpl) Consumed(msg *message.MessageExt, consumerGroupId string) (bool, error)
Consumed 校验某条消息是否被某个消费组消费过
return: true表示已被消费; false:表示未被消费
Author: tianyuliang Since: 2017/11/6
func (*DefaultMQAdminExtImpl) CreateAndUpdateKvConfig ¶
func (impl *DefaultMQAdminExtImpl) CreateAndUpdateKvConfig(namespace, key, value string) error
在 namespace 上添加或者更新 KV 配置
func (*DefaultMQAdminExtImpl) CreateAndUpdateSubscriptionGroupConfig ¶
func (impl *DefaultMQAdminExtImpl) CreateAndUpdateSubscriptionGroupConfig(addr string, config *subscription.SubscriptionGroupConfig) error
向指定Broker创建或者更新订阅组配置
func (*DefaultMQAdminExtImpl) CreateAndUpdateTopicConfig ¶
func (impl *DefaultMQAdminExtImpl) CreateAndUpdateTopicConfig(brokerAddr string, topicConfig *stgcommon.TopicConfig) error
创建或更新Topic
func (*DefaultMQAdminExtImpl) CreateCustomTopic ¶
func (impl *DefaultMQAdminExtImpl) CreateCustomTopic(brokerAddr string, topicConfig *stgcommon.TopicConfig) error
创建Topic key 消息队列已存在的topic newTopic 需新建的topic queueNum 读写队列的数量
func (*DefaultMQAdminExtImpl) CreateOrUpdateOrderConf ¶
func (impl *DefaultMQAdminExtImpl) CreateOrUpdateOrderConf(key, value string, isCluster bool) error
创建或更新顺序消息的分区配置
func (*DefaultMQAdminExtImpl) CreateTopic ¶
func (impl *DefaultMQAdminExtImpl) CreateTopic(key, newTopic string, queueNum int) error
创建Topic key 消息队列已存在的topic newTopic 需新建的topic queueNum 读写队列的数量
func (*DefaultMQAdminExtImpl) DeleteIpsByProjectGroup ¶
func (impl *DefaultMQAdminExtImpl) DeleteIpsByProjectGroup(key string) error
删除 project group 对应的所有 server ip
func (*DefaultMQAdminExtImpl) DeleteKvConfig ¶
func (impl *DefaultMQAdminExtImpl) DeleteKvConfig(namespace, key string) error
删除 namespace 上的 KV 配置
func (*DefaultMQAdminExtImpl) DeleteSubscriptionGroup ¶
func (impl *DefaultMQAdminExtImpl) DeleteSubscriptionGroup(brokerAddr, groupName string) error
删除 broker 上的 subscription group 信息
func (*DefaultMQAdminExtImpl) DeleteTopicInBroker ¶
func (impl *DefaultMQAdminExtImpl) DeleteTopicInBroker(brokerAddrs set.Set, topic string) error
删除 broker 上的 topic 信息
func (*DefaultMQAdminExtImpl) DeleteTopicInNameServer ¶
func (impl *DefaultMQAdminExtImpl) DeleteTopicInNameServer(namesrvSet set.Set, topic string) error
删除 namesrv维护的topic信息
func (*DefaultMQAdminExtImpl) EarliestMsgStoreTime ¶
func (impl *DefaultMQAdminExtImpl) EarliestMsgStoreTime(mq *message.MessageQueue) (int64, error)
查询较早的存储消息
func (*DefaultMQAdminExtImpl) ExamineBrokerClusterInfo ¶
func (impl *DefaultMQAdminExtImpl) ExamineBrokerClusterInfo() (*body.ClusterPlusInfo, []*body.ClusterBrokerWapper, error)
查看集群信息
func (*DefaultMQAdminExtImpl) ExamineConsumeStats ¶
func (impl *DefaultMQAdminExtImpl) ExamineConsumeStats(consumerGroup string) (*admin.ConsumeStats, error)
查询消费进度
func (*DefaultMQAdminExtImpl) ExamineConsumeStatsByTopic ¶
func (impl *DefaultMQAdminExtImpl) ExamineConsumeStatsByTopic(consumerGroup, topic string) (*admin.ConsumeStats, error)
基于Topic查询消费进度
func (*DefaultMQAdminExtImpl) ExamineConsumerConnectionInfo ¶
func (impl *DefaultMQAdminExtImpl) ExamineConsumerConnectionInfo(consumerGroup, topic string) (*body.ConsumerConnectionPlus, int, error)
查看Consumer网络连接、订阅关系
func (*DefaultMQAdminExtImpl) ExamineProducerConnectionInfo ¶
func (impl *DefaultMQAdminExtImpl) ExamineProducerConnectionInfo(producerGroup, topic string) (*body.ProducerConnection, error)
查看Producer网络连接
func (*DefaultMQAdminExtImpl) ExamineSubscriptionGroupConfig ¶
func (impl *DefaultMQAdminExtImpl) ExamineSubscriptionGroupConfig(addr, group string) (*subscription.SubscriptionGroupConfig, error)
查询指定Broker的订阅组配置
func (*DefaultMQAdminExtImpl) ExamineTopicConfig ¶
func (impl *DefaultMQAdminExtImpl) ExamineTopicConfig(addr, topic string) (*stgcommon.TopicConfig, error)
查询指定Broker的Topic配置
func (*DefaultMQAdminExtImpl) ExamineTopicRouteInfo ¶
func (impl *DefaultMQAdminExtImpl) ExamineTopicRouteInfo(topic string) (*route.TopicRouteData, error)
查看Topic路由信息
func (*DefaultMQAdminExtImpl) ExamineTopicStats ¶
func (impl *DefaultMQAdminExtImpl) ExamineTopicStats(topic string) (*admin.TopicStatsTable, error)
查询Topic Offset信息
func (*DefaultMQAdminExtImpl) FetchAllTopicList ¶
func (impl *DefaultMQAdminExtImpl) FetchAllTopicList() (*body.TopicList, error)
从Name Server获取所有Topic列表
func (*DefaultMQAdminExtImpl) FetchBrokerNameByAddr ¶
func (impl *DefaultMQAdminExtImpl) FetchBrokerNameByAddr(brokerAddr string) (string, error)
FetchBrokerNameByAddr 根据broker地址查询对应的broker名称
返回值: set<brokerName>
Author: tianyuliang Since: 2017/11/7
func (*DefaultMQAdminExtImpl) FetchBrokerNameByClusterName ¶
func (impl *DefaultMQAdminExtImpl) FetchBrokerNameByClusterName(clusterName string) (set.Set, error)
FetchBrokerNameByClusterName 根据Cluster集群名称,拉取所有broker名称
返回值: set<brokerName>
Author: tianyuliang Since: 2017/11/7
func (*DefaultMQAdminExtImpl) FetchBrokerRuntimeStats ¶
func (impl *DefaultMQAdminExtImpl) FetchBrokerRuntimeStats(brokerAddr string) (*body.KVTable, error)
获取Broker运行时数据
func (*DefaultMQAdminExtImpl) FetchMasterAddrByClusterName ¶
func (impl *DefaultMQAdminExtImpl) FetchMasterAddrByClusterName(clusterName string) (set.Set, error)
FetchMasterAddrByClusterName 拉取所有角色是“master”的broker地址列表
返回值: set.Set保存所有角色是master的 brokerAddr地址,即set<brokerAddr>
Author: tianyuliang Since: 2017/11/7
func (*DefaultMQAdminExtImpl) GetAdminExtGroup ¶
func (impl *DefaultMQAdminExtImpl) GetAdminExtGroup() string
GetAdminExtGroup 查询admin管理的组名称 Author: tianyuliang Since: 2017/11/6
func (*DefaultMQAdminExtImpl) GetAllClusterNames ¶
func (impl *DefaultMQAdminExtImpl) GetAllClusterNames() ([]string, map[string]*route.BrokerData, error)
GetClusterList 获取集群名称 Author: tianyuliang Since: 2017/11/7
func (*DefaultMQAdminExtImpl) GetClusterTopicWappers ¶
func (impl *DefaultMQAdminExtImpl) GetClusterTopicWappers() ([]*body.TopicBrokerClusterWapper, error)
GetClusterList 获取集群名称 Author: tianyuliang Since: 2017/11/7
func (*DefaultMQAdminExtImpl) GetConsumeStatus ¶
func (impl *DefaultMQAdminExtImpl) GetConsumeStatus(topic, consumerGroupId, clientAddr string) (map[string]map[*message.MessageQueue]int64, error)
通过客户端查看消费者的消费情况
func (*DefaultMQAdminExtImpl) GetConsumerRunningInfo ¶
func (impl *DefaultMQAdminExtImpl) GetConsumerRunningInfo(consumerGroupId, clientId string, jstack bool) (*body.ConsumerRunningInfo, error)
查询Consumer内存数据结构
func (*DefaultMQAdminExtImpl) GetCreateTopicKey ¶
func (impl *DefaultMQAdminExtImpl) GetCreateTopicKey() string
GetCreateTopicKey 查询创建Topic的key值 Author: tianyuliang Since: 2017/11/6
func (*DefaultMQAdminExtImpl) GetIpsByProjectGroup ¶
func (impl *DefaultMQAdminExtImpl) GetIpsByProjectGroup(projectGroup string) (string, error)
通过 project 获取所有的 server ip 信息
func (*DefaultMQAdminExtImpl) GetKVConfig ¶
func (impl *DefaultMQAdminExtImpl) GetKVConfig(namespace, key string) (string, error)
从Name Server获取一个配置项
func (*DefaultMQAdminExtImpl) GetKVListByNamespace ¶
func (impl *DefaultMQAdminExtImpl) GetKVListByNamespace(namespace string) (*body.KVTable, error)
获取指定Namespace下的所有kv
func (*DefaultMQAdminExtImpl) GetNameServerAddressList ¶
func (impl *DefaultMQAdminExtImpl) GetNameServerAddressList() ([]string, error)
获取Name Server地址列表
func (*DefaultMQAdminExtImpl) GetProjectGroupByIp ¶
func (impl *DefaultMQAdminExtImpl) GetProjectGroupByIp(ip string) (string, error)
通过 server ip 获取 project 信息
func (*DefaultMQAdminExtImpl) GetTopicsByCluster ¶
func (impl *DefaultMQAdminExtImpl) GetTopicsByCluster(clusterName string) ([]*body.TopicBrokerClusterWapper, error)
GetTopicsByCluster 根据ClusterName,查询该集群管理的所有Topic Author: tianyuliang Since: 2017/11/8
func (*DefaultMQAdminExtImpl) MaxOffset ¶
func (impl *DefaultMQAdminExtImpl) MaxOffset(mq *message.MessageQueue) (int64, error)
查询MessageQueue最大偏移量
func (*DefaultMQAdminExtImpl) MessageTrackDetail ¶
func (impl *DefaultMQAdminExtImpl) MessageTrackDetail(msg *message.MessageExt) ([]*track.MessageTrack, error)
查询消息被谁消费了
func (*DefaultMQAdminExtImpl) MinOffset ¶
func (impl *DefaultMQAdminExtImpl) MinOffset(mq *message.MessageQueue) (int64, error)
查询MessageQueue最小偏移量
func (*DefaultMQAdminExtImpl) PutKVConfig ¶
func (impl *DefaultMQAdminExtImpl) PutKVConfig(namespace, key, value string) error
向Name Server增加一个配置项
func (*DefaultMQAdminExtImpl) QueryConsumeTimeSpan ¶
func (impl *DefaultMQAdminExtImpl) QueryConsumeTimeSpan(topic, consumerGroupId string) (set.Set, error)
根据 topic 和 group 获取消息的时间跨度 retutn set<QueueTimeSpan>
func (*DefaultMQAdminExtImpl) QueryMessage ¶
func (impl *DefaultMQAdminExtImpl) QueryMessage(topic, key string, maxNum int, begin, end int64) (*admin.QueryResult, error)
搜索消息 topic topic名称 key 消息key关键字[业务系统基于此字段唯一标识消息] maxNum 最大搜索条数 begin 开始查询消息的时间戳 end 结束查询消息的时间戳
func (*DefaultMQAdminExtImpl) QueryTopicConsumeByWho ¶
func (impl *DefaultMQAdminExtImpl) QueryTopicConsumeByWho(topic string) (*body.GroupList, error)
根据Topic查询被哪些订阅组消费
func (*DefaultMQAdminExtImpl) ResetOffsetByTimestamp ¶
func (impl *DefaultMQAdminExtImpl) ResetOffsetByTimestamp(topic, group string, timestamp int64, force bool) (map[*message.MessageQueue]int64, error)
按照时间回溯消费进度(客户端不需要重启)
func (*DefaultMQAdminExtImpl) ResetOffsetByTimestampOld ¶
func (impl *DefaultMQAdminExtImpl) ResetOffsetByTimestampOld(consumerGroup, topic string, timestamp int64, force bool) ([]*admin.RollbackStats, error)
按照时间回溯消费进度(客户端需要重启)
func (*DefaultMQAdminExtImpl) ResetOffsetNew ¶
func (impl *DefaultMQAdminExtImpl) ResetOffsetNew(consumerGroup, topic string, timestamp int64) error
重置消费进度,无论Consumer是否在线,都可以执行。不保证最终结果是否成功,需要调用方通过消费进度查询来再次确认
func (*DefaultMQAdminExtImpl) SearchOffset ¶
func (impl *DefaultMQAdminExtImpl) SearchOffset(mq message.MessageQueue, timestamp int64) (int64, error)
根据时间戳搜索MessageQueue偏移量(注意:可能会出现大量IO开销)
func (*DefaultMQAdminExtImpl) Shutdown ¶
func (impl *DefaultMQAdminExtImpl) Shutdown() error
Shutdown 关闭Admin Author: tianyuliang Since: 2017/11/6
func (*DefaultMQAdminExtImpl) Start ¶
func (impl *DefaultMQAdminExtImpl) Start() error
Start 启动Admin Author: tianyuliang Since: 2017/11/6
func (*DefaultMQAdminExtImpl) UpdateBrokerConfig ¶
func (impl *DefaultMQAdminExtImpl) UpdateBrokerConfig(brokerAddr string, properties map[string]interface{}) error
更新Broker配置
func (*DefaultMQAdminExtImpl) ViewBrokerStatsData ¶
func (impl *DefaultMQAdminExtImpl) ViewBrokerStatsData(brokerAddr, statsName, statsKey string) (*body.BrokerStatsData, error)
服务器统计数据输出
func (*DefaultMQAdminExtImpl) ViewMessage ¶
func (impl *DefaultMQAdminExtImpl) ViewMessage(msgId string) (*message.MessageExt, error)
根据msgId查询消息消费结果
func (*DefaultMQAdminExtImpl) WipeWritePermOfBroker ¶
func (impl *DefaultMQAdminExtImpl) WipeWritePermOfBroker(namesrvAddr, brokerName string) (int, error)
清除某个Broker的写权限,针对所有Name Server return 返回清除了多少个topic
type MQAdminExtInner ¶
type MQAdminExtInner interface { // 创建Topic CreateTopic(key, newTopic string, queueNum int) error // 启动Admin Start() error // 关闭Admin Shutdown() error // 更新Broker配置 UpdateBrokerConfig(brokerAddr string, properties map[string]interface{}) error // 向指定Broker创建或者更新Topic配置 CreateAndUpdateTopicConfig(addr string, config *stgcommon.TopicConfig) error // 向指定Broker创建或者更新订阅组配置 CreateAndUpdateSubscriptionGroupConfig(addr string, config *subscription.SubscriptionGroupConfig) error // 查询指定Broker的订阅组配置 ExamineSubscriptionGroupConfig(addr, group string) (*subscription.SubscriptionGroupConfig, error) // 查询指定Broker的Topic配置 ExamineTopicConfig(addr, topic string) (*stgcommon.TopicConfig, error) // 查询Topic Offset信息 ExamineTopicStats(topic string) (*admin.TopicStatsTable, error) // 从Name Server获取所有Topic列表 FetchAllTopicList() (*body.TopicList, error) // 获取Broker运行时数据 FetchBrokerRuntimeStats(brokerAddr string) (*body.KVTable, error) // 查询消费进度 ExamineConsumeStats(consumerGroup string) (*admin.ConsumeStats, error) // 基于Topic查询消费进度 ExamineConsumeStatsByTopic(consumerGroup, topic string) (*admin.ConsumeStats, error) // 查看集群信息 ExamineBrokerClusterInfo() (*body.ClusterPlusInfo, []*body.ClusterBrokerWapper, error) // 查看Topic路由信息 ExamineTopicRouteInfo(topic string) (*route.TopicRouteData, error) // 查看Consumer网络连接、订阅关系 // 注意:第二个参数标记消费组对应的connection进程是否在线 code=206说明消费进程不在线 ExamineConsumerConnectionInfo(consumerGroup, topic string) (*body.ConsumerConnectionPlus, int, error) // 查看Producer网络连接 ExamineProducerConnectionInfo(producerGroup, topic string) (*body.ProducerConnection, error) // 获取Name Server地址列表 GetNameServerAddressList() ([]string, error) // 清除某个Broker的写权限,针对所有Name Server // return 返回清除了多少个topic WipeWritePermOfBroker(namesrvAddr, brokerName string) (int, error) // 向Name Server增加一个配置项 PutKVConfig(namespace, key, value string) error // 从Name Server获取一个配置项 GetKVConfig(namespace, key string) (string, error) // 在 namespace 上添加或者更新 KV 配置 CreateAndUpdateKvConfig(namespace, key, value string) error // 删除 namespace 上的 KV 配置 DeleteKvConfig(namespace, key string) error // 获取指定Namespace下的所有kv GetKVListByNamespace(namespace string) (*body.KVTable, error) // 删除 broker 上的 topic 信息 DeleteTopicInBroker(brokerAddrs set.Set, topic string) error // 删除 namesrv维护的topic信息 DeleteTopicInNameServer(namesrvs set.Set, topic string) error // 删除 broker 上的 subscription group 信息 DeleteSubscriptionGroup(brokerAddr, groupName string) error // 通过 server ip 获取 project 信息 GetProjectGroupByIp(ip string) (string, error) // 通过 project 获取所有的 server ip 信息 GetIpsByProjectGroup(projectGroup string) (string, error) // 删除 project group 对应的所有 server ip DeleteIpsByProjectGroup(key string) error // 按照时间回溯消费进度(客户端需要重启) ResetOffsetByTimestampOld(consumerGroup, topic string, timestamp int64, force bool) ([]*admin.RollbackStats, error) // 按照时间回溯消费进度(客户端不需要重启) ResetOffsetByTimestamp(topic, group string, timestamp int64, force bool) (map[*message.MessageQueue]int64, error) // 重置消费进度,无论Consumer是否在线,都可以执行。不保证最终结果是否成功,需要调用方通过消费进度查询来再次确认 ResetOffsetNew(consumerGroup, topic string, timestamp int64) error // 通过客户端查看消费者的消费情况 GetConsumeStatus(topic, consumerGroupId, clientAddr string) (map[string]map[*message.MessageQueue]int64, error) // 创建或更新顺序消息的分区配置 CreateOrUpdateOrderConf(key, value string, isCluster bool) error // 根据Topic查询被哪些订阅组消费 QueryTopicConsumeByWho(topic string) (*body.GroupList, error) // 根据 topic 和 group 获取消息的时间跨度 // retutn set<QueueTimeSpan> QueryConsumeTimeSpan(topic, group string) (set.Set, error) // 触发清理失效的消费队列 // cluster 如果参数cluster为空,则表示所有集群 // return 清理是否成功 CleanExpiredConsumerQueue(cluster string) (bool, error) // 触发指定的broker清理失效的消费队列 // return 清理是否成功 CleanExpiredConsumerQueueByAddr(addr string) (bool, error) // 查询Consumer内存数据结构 GetConsumerRunningInfo(consumerGroupId, clientId string, jstack bool) (*body.ConsumerRunningInfo, error) // 向指定Consumer发送某条消息 ConsumeMessageDirectly(consumerGroup, clientId, msgId string) (*body.ConsumeMessageDirectlyResult, error) //查询消息被谁消费了 MessageTrackDetail(msg *message.MessageExt) ([]*track.MessageTrack, error) // 克隆某一个组的消费进度到新的组 CloneGroupOffset(srcGroup, destGroup, topic string, isOffline bool) error // 服务器统计数据输出 ViewBrokerStatsData(brokerAddr, statsName, statsKey string) (*body.BrokerStatsData, error) // 创建指定Topic CreateCustomTopic(brokerAddr string, topicConfig *stgcommon.TopicConfig) error // 根据msgId查询消息消费结果 ViewMessage(msgId string) (*message.MessageExt, error) // 搜索消息 // topic topic名称 // key 消息key关键字[业务系统基于此字段唯一标识消息] // maxNum 最大搜索条数 // begin 开始查询消息的时间戳 // end 结束查询消息的时间戳 QueryMessage(topic, key string, maxNum int, begin, end int64) (*admin.QueryResult, error) // 查询较早的存储消息 EarliestMsgStoreTime(mq *message.MessageQueue) (int64, error) // 根据时间戳搜索MessageQueue偏移量(注意:可能会出现大量IO开销) SearchOffset(mq message.MessageQueue, timestamp int64) (int64, error) // 查询MessageQueue最大偏移量 MaxOffset(mq *message.MessageQueue) (int64, error) // 查询MessageQueue最小偏移量 MinOffset(mq *message.MessageQueue) (int64, error) }
MQAdminExtInner 运维接口
(1)MQ管理类接口,涉及所有与MQ管理相关的对外接口 (2)包括Topic创建、订阅组创建、配置修改等
Author: tianyuliang Since: 2017/11/1