process

package
v0.0.0-...-ba2213e Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2019 License: Apache-2.0 Imports: 39 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CHARACTER_MAX_LENGTH = 255
	VALID_PATTERN_STR    = "^[%|a-zA-Z0-9_-]+$"
)

Variables

This section is empty.

Functions

func CheckGroup

func CheckGroup(group string) error

func CheckMessage

func CheckMessage(msg *message.Message, defaultMQProducer DefaultMQProducer)

func CheckTopic

func CheckTopic(topic string)

Types

type ClientRemotingProcessor

type ClientRemotingProcessor struct {
	MQClientFactory *MQClientInstance
}

客户端处理器 Author: yintongqiang Since: 2017/8/8

func NewClientRemotingProcessor

func NewClientRemotingProcessor(mqClientFactory *MQClientInstance) *ClientRemotingProcessor

func (*ClientRemotingProcessor) ProcessRequest

处理request

type CommunicationMode

type CommunicationMode int
const (
	SYNC CommunicationMode = iota
	ASYNC
	ONEWAY
)

func (CommunicationMode) String

func (cm CommunicationMode) String() string

type ConsumeMessageConcurrentlyService

type ConsumeMessageConcurrentlyService struct {
	// contains filtered or unexported fields
}

func NewConsumeMessageConcurrentlyService

func NewConsumeMessageConcurrentlyService(defaultMQPushConsumerImpl *DefaultMQPushConsumerImpl, messageListener consumer.MessageListenerConcurrently) *ConsumeMessageConcurrentlyService

func (*ConsumeMessageConcurrentlyService) ConsumeMessageDirectly

func (service *ConsumeMessageConcurrentlyService) ConsumeMessageDirectly(msg *message.MessageExt, brokerName string) *body.ConsumeMessageDirectlyResult

func (*ConsumeMessageConcurrentlyService) Shutdown

func (service *ConsumeMessageConcurrentlyService) Shutdown()

func (*ConsumeMessageConcurrentlyService) Start

func (service *ConsumeMessageConcurrentlyService) Start()

仅仅为了实现接口

func (*ConsumeMessageConcurrentlyService) SubmitConsumeRequest

func (service *ConsumeMessageConcurrentlyService) SubmitConsumeRequest(msgs []*message.MessageExt, processQueue *consumer.ProcessQueue, messageQueue *message.MessageQueue, dispathToConsume bool)

提交消费请求

type ConsumeMessageService

type ConsumeMessageService interface {
	Start()    // 开启
	Shutdown() // 关闭
	ConsumeMessageDirectly(msg *message.MessageExt, brokerName string) *body.ConsumeMessageDirectlyResult
	SubmitConsumeRequest(msgs []*message.MessageExt, processQueue *consumer.ProcessQueue, messageQueue *message.MessageQueue, dispathToConsume bool) // 提交消费请求
}

ConsumeMessageService: 消费消息服务接口 Author: yintongqiang Since: 2017/8/11

type DefaultMQProducer

type DefaultMQProducer struct {
	DefaultMQProducerImpl            *DefaultMQProducerImpl
	ProducerGroup                    string
	CreateTopicKey                   string
	DefaultTopicQueueNums            int
	SendMsgTimeout                   int64
	CompressMsgBodyOverHowmuch       int
	RetryTimesWhenSendFailed         int32
	RetryAnotherBrokerWhenNotStoreOK bool
	MaxMessageSize                   int
	UnitMode                         bool
	ClientConfig                     *stgclient.ClientConfig
}

func NewDefaultMQProducer

func NewDefaultMQProducer(producerGroup string) *DefaultMQProducer

func (*DefaultMQProducer) CreateTopic

func (defaultMQProducer *DefaultMQProducer) CreateTopic(key, newTopic string, queueNum int)

对外提供创建topic方法

func (*DefaultMQProducer) Send

func (defaultMQProducer *DefaultMQProducer) Send(msg *message.Message) (*SendResult, error)

发送同步消息

func (*DefaultMQProducer) SendCallBack

func (defaultMQProducer *DefaultMQProducer) SendCallBack(msg *message.Message, callback SendCallback) error

发送callback消息

func (*DefaultMQProducer) SendOneWay

func (defaultMQProducer *DefaultMQProducer) SendOneWay(msg *message.Message) error

发送sendOneWay消息

func (*DefaultMQProducer) SetNamesrvAddr

func (defaultMQProducer *DefaultMQProducer) SetNamesrvAddr(namesrvAddr string)

func (*DefaultMQProducer) Shutdown

func (defaultMQProducer *DefaultMQProducer) Shutdown()

func (*DefaultMQProducer) Start

func (defaultMQProducer *DefaultMQProducer) Start()

type DefaultMQProducerImpl

type DefaultMQProducerImpl struct {
	DefaultMQProducer     *DefaultMQProducer
	TopicPublishInfoTable *sync.Map
	ServiceState          stgcommon.ServiceState
	MQClientFactory       *MQClientInstance
}

func NewDefaultMQProducerImpl

func NewDefaultMQProducerImpl(defaultMQProducer *DefaultMQProducer) *DefaultMQProducerImpl

func (*DefaultMQProducerImpl) CreateTopic

func (defaultMQProducerImpl *DefaultMQProducerImpl) CreateTopic(key, newTopic string, queueNum int)

对外提供创建topic方法

func (*DefaultMQProducerImpl) CreateTopicByFlag

func (defaultMQProducerImpl *DefaultMQProducerImpl) CreateTopicByFlag(key, newTopic string, queueNum, topicSysFlag int)

对外提供创建topic方法

func (*DefaultMQProducerImpl) GetPublishTopicList

func (defaultMQProducerImpl *DefaultMQProducerImpl) GetPublishTopicList() set.Set

获取topic发布集合

func (*DefaultMQProducerImpl) IsPublishTopicNeedUpdate

func (defaultMQProducerImpl *DefaultMQProducerImpl) IsPublishTopicNeedUpdate(topic string) bool

是否需要更新topic信息

func (*DefaultMQProducerImpl) SendByTimeout

func (defaultMQProducerImpl *DefaultMQProducerImpl) SendByTimeout(msg *message.Message, timeout int64) (*SendResult, error)

带timeout的发送消息

func (*DefaultMQProducerImpl) Shutdown

func (defaultMQProducerImpl *DefaultMQProducerImpl) Shutdown()

func (*DefaultMQProducerImpl) ShutdownFlag

func (defaultMQProducerImpl *DefaultMQProducerImpl) ShutdownFlag(shutdownFactory bool)

func (*DefaultMQProducerImpl) StartFlag

func (defaultMQProducerImpl *DefaultMQProducerImpl) StartFlag(startFactory bool) error

生产启动方法

func (*DefaultMQProducerImpl) UpdateTopicPublishInfo

func (defaultMQProducerImpl *DefaultMQProducerImpl) UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)

更新topic信息

type DefaultMQPullConsumer

type DefaultMQPullConsumer struct {
	// contains filtered or unexported fields
}

DefaultMQPullConsumer: 手动拉取消息 Author: yintongqiang Since: 2017/8/17

func NewDefaultMQPullConsumer

func NewDefaultMQPullConsumer(consumerGroup string) *DefaultMQPullConsumer

func (*DefaultMQPullConsumer) FetchSubscribeMessageQueues

func (pullConsumer *DefaultMQPullConsumer) FetchSubscribeMessageQueues(topic string) []*message.MessageQueue

func (*DefaultMQPullConsumer) Pull

func (pullConsumer *DefaultMQPullConsumer) Pull(mq *message.MessageQueue, subExpression string, offset int64, maxNums int) (*consumer.PullResult, error)

func (*DefaultMQPullConsumer) SetNamesrvAddr

func (pullConsumer *DefaultMQPullConsumer) SetNamesrvAddr(namesrvAddr string)

设置namesrvaddr

func (*DefaultMQPullConsumer) Shutdown

func (pullConsumer *DefaultMQPullConsumer) Shutdown()

func (*DefaultMQPullConsumer) Start

func (pullConsumer *DefaultMQPullConsumer) Start()

type DefaultMQPullConsumerImpl

type DefaultMQPullConsumerImpl struct {
	OffsetStore   store.OffsetStore
	RebalanceImpl RebalanceImpl
	// contains filtered or unexported fields
}

func NewDefaultMQPullConsumerImpl

func NewDefaultMQPullConsumerImpl(defaultMQPullConsumer *DefaultMQPullConsumer) *DefaultMQPullConsumerImpl

func (*DefaultMQPullConsumerImpl) ConsumeFromWhere

func (pullImpl *DefaultMQPullConsumerImpl) ConsumeFromWhere() heartbeat.ConsumeFromWhere

func (*DefaultMQPullConsumerImpl) ConsumeType

func (pullImpl *DefaultMQPullConsumerImpl) ConsumeType() heartbeat.ConsumeType

func (*DefaultMQPullConsumerImpl) DoRebalance

func (pullImpl *DefaultMQPullConsumerImpl) DoRebalance()

执行负载

func (*DefaultMQPullConsumerImpl) GroupName

func (pullImpl *DefaultMQPullConsumerImpl) GroupName() string

func (*DefaultMQPullConsumerImpl) IsSubscribeTopicNeedUpdate

func (pullImpl *DefaultMQPullConsumerImpl) IsSubscribeTopicNeedUpdate(topic string) bool

func (*DefaultMQPullConsumerImpl) IsUnitMode

func (pullImpl *DefaultMQPullConsumerImpl) IsUnitMode() bool

func (*DefaultMQPullConsumerImpl) MessageModel

func (pullImpl *DefaultMQPullConsumerImpl) MessageModel() heartbeat.MessageModel

func (*DefaultMQPullConsumerImpl) PersistConsumerOffset

func (pullImpl *DefaultMQPullConsumerImpl) PersistConsumerOffset()

持久化offset

func (*DefaultMQPullConsumerImpl) Start

func (pullImpl *DefaultMQPullConsumerImpl) Start()

func (*DefaultMQPullConsumerImpl) Subscriptions

func (pullImpl *DefaultMQPullConsumerImpl) Subscriptions() set.Set

获取订阅信息

func (*DefaultMQPullConsumerImpl) UpdateTopicSubscribeInfo

func (pullImpl *DefaultMQPullConsumerImpl) UpdateTopicSubscribeInfo(topic string, info set.Set)

当订阅信息改变时,更新订阅信息

type DefaultMQPushConsumer

type DefaultMQPushConsumer struct {
	// contains filtered or unexported fields
}

func NewDefaultMQPushConsumer

func NewDefaultMQPushConsumer(consumerGroup string) *DefaultMQPushConsumer

创建push消费结构体

func (*DefaultMQPushConsumer) RegisterMessageListener

func (pushConsumer *DefaultMQPushConsumer) RegisterMessageListener(messageListener listener.MessageListener)

注册监听器

func (*DefaultMQPushConsumer) SetConsumeFromWhere

func (pushConsumer *DefaultMQPushConsumer) SetConsumeFromWhere(consumeFromWhere heartbeat.ConsumeFromWhere)

设置从哪个位置开始消费

func (*DefaultMQPushConsumer) SetMessageModel

func (pushConsumer *DefaultMQPushConsumer) SetMessageModel(model heartbeat.MessageModel)

设置消费类型

func (*DefaultMQPushConsumer) SetNamesrvAddr

func (pushConsumer *DefaultMQPushConsumer) SetNamesrvAddr(namesrvAddr string)

设置namesrvaddr

func (*DefaultMQPushConsumer) Shutdown

func (pushConsumer *DefaultMQPushConsumer) Shutdown()

关闭消费服务

func (*DefaultMQPushConsumer) Start

func (pushConsumer *DefaultMQPushConsumer) Start()

启动消费服务

func (*DefaultMQPushConsumer) Subscribe

func (pushConsumer *DefaultMQPushConsumer) Subscribe(topic string, subExpression string)

订阅topic和tag

type DefaultMQPushConsumerImpl

type DefaultMQPushConsumerImpl struct {
	OffsetStore store.OffsetStore

	PullTimeDelayMillsWhenException   int
	PullTimeDelayMillsWhenSuspend     int
	BrokerSuspendMaxTimeMillis        int
	PullTimeDelayMillsWhenFlowControl int
	ConsumerTimeoutMillisWhenSuspend  int
	// contains filtered or unexported fields
}

func NewDefaultMQPushConsumerImpl

func NewDefaultMQPushConsumerImpl(defaultMQPushConsumer *DefaultMQPushConsumer) *DefaultMQPushConsumerImpl

func (*DefaultMQPushConsumerImpl) ConsumeFromWhere

func (pushConsumerImpl *DefaultMQPushConsumerImpl) ConsumeFromWhere() heartbeat.ConsumeFromWhere

func (*DefaultMQPushConsumerImpl) ConsumeType

func (pushConsumerImpl *DefaultMQPushConsumerImpl) ConsumeType() heartbeat.ConsumeType

func (*DefaultMQPushConsumerImpl) DoRebalance

func (pushConsumerImpl *DefaultMQPushConsumerImpl) DoRebalance()

执行负载

func (*DefaultMQPushConsumerImpl) ExecutePullRequestImmediately

func (pushConsumerImpl *DefaultMQPushConsumerImpl) ExecutePullRequestImmediately(pullRequest *consumer.PullRequest)

立即执行pull请求

func (*DefaultMQPushConsumerImpl) ExecutePullRequestLater

func (pushConsumerImpl *DefaultMQPushConsumerImpl) ExecutePullRequestLater(pullRequest *consumer.PullRequest, timeDelay int)

延迟执行pull请求

func (*DefaultMQPushConsumerImpl) GroupName

func (pushConsumerImpl *DefaultMQPushConsumerImpl) GroupName() string

func (*DefaultMQPushConsumerImpl) IsSubscribeTopicNeedUpdate

func (pushConsumerImpl *DefaultMQPushConsumerImpl) IsSubscribeTopicNeedUpdate(topic string) bool

func (*DefaultMQPushConsumerImpl) IsUnitMode

func (pushConsumerImpl *DefaultMQPushConsumerImpl) IsUnitMode() bool

func (*DefaultMQPushConsumerImpl) MessageModel

func (pushConsumerImpl *DefaultMQPushConsumerImpl) MessageModel() heartbeat.MessageModel

func (*DefaultMQPushConsumerImpl) PersistConsumerOffset

func (pushConsumerImpl *DefaultMQPushConsumerImpl) PersistConsumerOffset()

持久化消费offset

func (*DefaultMQPushConsumerImpl) Shutdown

func (pushConsumerImpl *DefaultMQPushConsumerImpl) Shutdown()

关闭

func (*DefaultMQPushConsumerImpl) Start

func (pushConsumerImpl *DefaultMQPushConsumerImpl) Start()

启动消费服务器

func (*DefaultMQPushConsumerImpl) Subscriptions

func (pushConsumerImpl *DefaultMQPushConsumerImpl) Subscriptions() set.Set

获取订阅信息

func (*DefaultMQPushConsumerImpl) UpdateTopicSubscribeInfo

func (pushConsumerImpl *DefaultMQPushConsumerImpl) UpdateTopicSubscribeInfo(topic string, info set.Set)

更新订阅信息

type FindBrokerResult

type FindBrokerResult struct {
	// contains filtered or unexported fields
}

type LocalFileOffsetStore

type LocalFileOffsetStore struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewLocalFileOffsetStore

func NewLocalFileOffsetStore(mQClientFactory *MQClientInstance, groupName string) *LocalFileOffsetStore

func (*LocalFileOffsetStore) Load

func (store *LocalFileOffsetStore) Load()

读取本地offset

func (*LocalFileOffsetStore) Persist

func (store *LocalFileOffsetStore) Persist(mq *message.MessageQueue)

持久化

func (*LocalFileOffsetStore) PersistAll

func (store *LocalFileOffsetStore) PersistAll(mqs set.Set)

持久化所有队列

func (*LocalFileOffsetStore) ReadOffset

读取本地offset的备份

func (*LocalFileOffsetStore) RemoveOffset

func (store *LocalFileOffsetStore) RemoveOffset(mq *message.MessageQueue)

删除offset

func (*LocalFileOffsetStore) UpdateOffset

func (store *LocalFileOffsetStore) UpdateOffset(mq *message.MessageQueue, offset int64, increaseOnly bool)

更新本地offset

type MQAdminImpl

type MQAdminImpl struct {
	// contains filtered or unexported fields
}

MQAdminImpl: 运维方法 Author: yintongqiang Since: 2017/8/13

func NewMQAdminImpl

func NewMQAdminImpl(clientFactory *MQClientInstance) *MQAdminImpl

func (*MQAdminImpl) CreateTopic

func (impl *MQAdminImpl) CreateTopic(key, newTopic string, queueNum, topicSysFlag int) error

func (*MQAdminImpl) FetchSubscribeMessageQueues

func (impl *MQAdminImpl) FetchSubscribeMessageQueues(topic string) []*message.MessageQueue

func (*MQAdminImpl) MaxOffset

func (impl *MQAdminImpl) MaxOffset(mq *message.MessageQueue) int64

type MQClientAPIImpl

type MQClientAPIImpl struct {
	DefalutRemotingClient   remoting.RemotingClient
	ClientRemotingProcessor *ClientRemotingProcessor
	ProjectGroupPrefix      string
}

MQClientAPIImpl: 内部使用核心处理api Author: yintongqiang Since: 2017/8/8

func NewMQClientAPIImpl

func NewMQClientAPIImpl(clientRemotingProcessor *ClientRemotingProcessor) *MQClientAPIImpl

func (*MQClientAPIImpl) CleanExpiredConsumeQueue

func (impl *MQClientAPIImpl) CleanExpiredConsumeQueue(brokerAddr string, timeoutMillis int64) (bool, error)

CleanExpiredConsumeQueue 触发清理失效的消费队列 Author: tianyuliang Since: 2017/11/6

func (*MQClientAPIImpl) CloneGroupOffset

func (impl *MQClientAPIImpl) CloneGroupOffset(brokerAddr, srcGroup, destGroup, topic string, isOffline bool, timeoutMillis int64) error

CloneGroupOffset 克隆消费组的偏移量 Author: tianyuliang Since: 2017/11/6

func (*MQClientAPIImpl) ConsumeMessageDirectly

func (impl *MQClientAPIImpl) ConsumeMessageDirectly(brokerAddr, consumerGroup, clientId, msgId string, timeoutMills int64) (*body.ConsumeMessageDirectlyResult, error)

ConsumeMessageDirectly Author: tianyuliang Since: 2017/11/6

func (*MQClientAPIImpl) CreateCustomTopic

func (impl *MQClientAPIImpl) CreateCustomTopic(addr, defaultTopic string, topicConfig stgcommon.TopicConfig, timeoutMillis int)

CreateCustomTopic 创建指定Topic Author: tianyuliang Since: 2017/11/1

func (*MQClientAPIImpl) CreateTopic

func (impl *MQClientAPIImpl) CreateTopic(brokerAddr, defaultTopic string, topicConfig *stgcommon.TopicConfig, timeoutMillis int) error

创建topic

func (*MQClientAPIImpl) DeleteSubscriptionGroup

func (impl *MQClientAPIImpl) DeleteSubscriptionGroup(brokerAddr, groupName string, timeoutMillis int64) error

DeleteSubscriptionGroup 删除订阅组信息 Author: tianyuliang Since: 2017/11/3

func (*MQClientAPIImpl) DeleteTopicInBroker

func (impl *MQClientAPIImpl) DeleteTopicInBroker(brokerAddr, topic string, timeoutMillis int64) error

DeleteTopicInBroker 删除broker节点上对应的Topic Author: tianyuliang Since: 2017/11/3

func (*MQClientAPIImpl) DeleteTopicInNameServer

func (impl *MQClientAPIImpl) DeleteTopicInNameServer(namesrvAddr, topic string, timeoutMillis int64) error

DeleteTopicInNameServer 删除Namesrv维护的Topic Author: tianyuliang Since: 2017/11/3

func (*MQClientAPIImpl) GetBrokerClusterInfo

func (impl *MQClientAPIImpl) GetBrokerClusterInfo(timeoutMillis int64) (*body.ClusterPlusInfo, []*body.ClusterBrokerWapper, error)

GetBrokerClusterInfo 查询Cluster集群信息 Author: tianyuliang Since: 2017/11/3

func (*MQClientAPIImpl) GetBrokerRuntimeInfo

func (impl *MQClientAPIImpl) GetBrokerRuntimeInfo(brokerAddr string, timeoutMillis int64) (*body.KVTable, error)

GetBrokerRuntimeInfo 查询Broker运行时状态信息 Author: tianyuliang Since: 2017/11/3

func (*MQClientAPIImpl) GetConsumeStats

func (impl *MQClientAPIImpl) GetConsumeStats(brokerAddr, consumerGroup string, timeoutMillis int64) (*admin.ConsumeStats, error)

GetConsumeStats 查询消费者状态 Author: tianyuliang Since: 2017/11/3

func (*MQClientAPIImpl) GetConsumeStatsByTopic

func (impl *MQClientAPIImpl) GetConsumeStatsByTopic(brokerAddr, consumerGroup, topic string, timeoutMillis int64) (*admin.ConsumeStats, error)

GetConsumeStats 查询消费者状态 Author: tianyuliang Since: 2017/11/3

func (*MQClientAPIImpl) GetConsumerConnectionList

func (impl *MQClientAPIImpl) GetConsumerConnectionList(brokerAddr, consumerGroup string, timeoutMillis int64) (*body.ConsumerConnectionPlus, int, error)

GetConsumerConnectionList 查询在线消费进程列表 Author: tianyuliang Since: 2017/11/3

func (*MQClientAPIImpl) GetConsumerIdListByGroup

func (impl *MQClientAPIImpl) GetConsumerIdListByGroup(addr string, consumerGroup string, timeoutMillis int64) []string

func (*MQClientAPIImpl) GetConsumerRunningInfo

func (impl *MQClientAPIImpl) GetConsumerRunningInfo(brokerAddr, consumerGroup, clientId string, jstack bool, timeoutMillis int64) (*body.ConsumerRunningInfo, error)

GetConsumerRunningInfo 获得consumer运行时状态信息 Author: tianyuliang Since: 2017/11/3

func (*MQClientAPIImpl) GetDefaultTopicRouteInfoFromNameServer

func (impl *MQClientAPIImpl) GetDefaultTopicRouteInfoFromNameServer(topic string, timeoutMillis int64) *route.TopicRouteData

func (*MQClientAPIImpl) GetKVConfigValue

func (impl *MQClientAPIImpl) GetKVConfigValue(namespace, key string, timeoutMillis int64) (string, error)

GetKVConfigValue 获取配置信息 Author: tianyuliang Since: 2017/11/1

func (*MQClientAPIImpl) GetKVListByNamespace

func (impl *MQClientAPIImpl) GetKVListByNamespace(namespace string, timeoutMillis int64) (*body.KVTable, error)

GetKVConfigValue 获取配置信息 Author: tianyuliang Since: 2017/11/1

func (*MQClientAPIImpl) GetMaxOffset

func (impl *MQClientAPIImpl) GetMaxOffset(addr string, topic string, queueId int, timeoutMillis int64) int64

获取队列最大offset

func (*MQClientAPIImpl) GetNameServerAddressList

func (impl *MQClientAPIImpl) GetNameServerAddressList() []string

GetNameServerAddressList 获取最新namesrv地址列表 Author: tianyuliang Since: 2017/11/6

func (*MQClientAPIImpl) GetProducerConnectionList

func (impl *MQClientAPIImpl) GetProducerConnectionList(brokerAddr, producerGroup string, timeoutMillis int64) (*body.ProducerConnection, error)

GetProducerConnectionList 查询在线生产者进程信息 Author: tianyuliang Since: 2017/11/3

func (*MQClientAPIImpl) GetProjectGroupByIp

func (impl *MQClientAPIImpl) GetProjectGroupByIp(ip string, timeoutMillis int64) (string, error)

从namesrv查询客户端IP信息

func (*MQClientAPIImpl) GetTopicListFromNameServer

func (impl *MQClientAPIImpl) GetTopicListFromNameServer(timeoutMills int64) (*body.TopicList, error)

GetTopicListFromNameServer 从Namesrv查询所有Topic列表 Author: tianyuliang Since: 2017/11/1

func (*MQClientAPIImpl) GetTopicRouteInfoFromNameServer

func (impl *MQClientAPIImpl) GetTopicRouteInfoFromNameServer(topic string, timeoutMillis int64) (*route.TopicRouteData, error)

func (*MQClientAPIImpl) GetTopicStatsInfo

func (impl *MQClientAPIImpl) GetTopicStatsInfo(brokerAddr, topic string, timeoutMillis int64) (*admin.TopicStatsTable, error)

GetTopicStatsInfo 查询Topic状态信息 Author: tianyuliang Since: 2017/11/3

func (*MQClientAPIImpl) GetTopicsByCluster

func (impl *MQClientAPIImpl) GetTopicsByCluster(clusterName string, timeoutMillis int64) (*body.TopicPlusList, error)

GetTopicsByCluster 查询集群信息 Author: tianyuliang Since: 2017/11/3

func (*MQClientAPIImpl) InvokeBrokerToGetConsumerStatus

func (impl *MQClientAPIImpl) InvokeBrokerToGetConsumerStatus(brokerAddr, topic, group, clientAddr string, timeoutMillis int64) (map[string]map[*message.MessageQueue]int64, error)

InvokeBrokerToGetConsumerStatus 反向查找broker中的consume状态 Author: tianyuliang Since: 2017/11/3

func (*MQClientAPIImpl) PullMessage

func (impl *MQClientAPIImpl) PullMessage(addr string, requestHeader header.PullMessageRequestHeader,
	timeoutMillis int, communicationMode CommunicationMode, pullCallback PullCallback) *PullResultExt

func (*MQClientAPIImpl) PutKVConfigValue

func (impl *MQClientAPIImpl) PutKVConfigValue(namespace, key, value string, timeoutMillis int64) error

putKVConfigValue 设置配置信息 Author: tianyuliang Since: 2017/11/1

func (*MQClientAPIImpl) QueryConsumeTimeSpan

func (impl *MQClientAPIImpl) QueryConsumeTimeSpan(brokerAddr, topic, consumerGroup string, timeoutMills int64) (set.Set, error)

QueryConsumeTimeSpan Author: tianyuliang Since: 2017/11/6

func (*MQClientAPIImpl) QueryTopicConsumeByWho

func (impl *MQClientAPIImpl) QueryTopicConsumeByWho(brokerAddr, topic string, timeoutMillis int64) (*body.GroupList, error)

QueryTopicConsumeByWho 查询topic被那些组消费 Author: tianyuliang Since: 2017/11/3

func (*MQClientAPIImpl) SendMessage

func (impl *MQClientAPIImpl) SendMessage(addr string, brokerName string, msg *message.Message, requestHeader header.SendMessageRequestHeader,
	timeoutMillis int64, communicationMode CommunicationMode, sendCallback SendCallback) (*SendResult, error)

func (*MQClientAPIImpl) Shutdwon

func (impl *MQClientAPIImpl) Shutdwon()

关闭romoting

func (*MQClientAPIImpl) Start

func (impl *MQClientAPIImpl) Start()

调用romoting的start

func (*MQClientAPIImpl) UpdateConsumerOffsetOneway

func (impl *MQClientAPIImpl) UpdateConsumerOffsetOneway(addr string, requestHeader header.UpdateConsumerOffsetRequestHeader, timeoutMillis int64)

func (*MQClientAPIImpl) UpdateNameServerAddressList

func (impl *MQClientAPIImpl) UpdateNameServerAddressList(addrs string)

func (*MQClientAPIImpl) ViewBrokerStatsData

func (impl *MQClientAPIImpl) ViewBrokerStatsData(brokerAddr, statsName, statsKey string, timeoutMillis int64) (*body.BrokerStatsData, error)

ViewBrokerStatsData 查询broker节点自身的状态信息 Author: tianyuliang Since: 2017/11/3

func (MQClientAPIImpl) ViewMessage

func (impl MQClientAPIImpl) ViewMessage(storeHost string, physicOffset uint64, timeoutMills int64) (*message.MessageExt, error)

ViewMessage Author: tianyuliang Since: 2017/11/9

func (*MQClientAPIImpl) WipeWritePermOfBroker

func (impl *MQClientAPIImpl) WipeWritePermOfBroker(namesrvAddr, brokerName string, timeoutMillis int64) (int, error)

WipeWritePermOfBroker 关闭broker写权限 Author: tianyuliang Since: 2017/11/3

type MQClientInstance

type MQClientInstance struct {
	ClientConfig            *stgclient.ClientConfig
	InstanceIndex           int32
	ClientId                string
	ProducerTable           *sync.Map // groupId<MQProducerInner>
	ConsumerTable           *sync.Map // groupId<MQConsumerInner>
	AdminExtTable           *sync.Map // groupId<MQAdminExtInner>
	MQClientAPIImpl         *MQClientAPIImpl
	MQAdminImpl             *MQAdminImpl
	TopicRouteTable         *sync.Map // topic<*TopicRouteData>
	LockNamesrv             lock.RWMutex
	LockHeartbeat           lock.RWMutex
	BrokerAddrTable         *sync.Map // broker name  map[int(brokerId)] string(address)
	ClientRemotingProcessor *ClientRemotingProcessor
	PullMessageService      *PullMessageService
	RebalanceService        *RebalanceService
	DefaultMQProducer       *DefaultMQProducer
	ServiceState            stgcommon.ServiceState
	TimerTask               set.Set
}

MQClientInstance: producer和consumer核心 Author: yintongqiang Since: 2017/8/8

func NewMQClientInstance

func NewMQClientInstance(clientConfig *stgclient.ClientConfig, instanceIndex int32, clientId string) *MQClientInstance

NewMQClientInstance: 初始化 Author: yintongqiang Since: 2017/8/10

func (*MQClientInstance) ConsumeMessageDirectly

func (self *MQClientInstance) ConsumeMessageDirectly(msg *message.MessageExt, consumerGroup, brokerName string) *body.ConsumeMessageDirectlyResult

func (*MQClientInstance) FindBrokerAddressInPublish

func (mqClientInstance *MQClientInstance) FindBrokerAddressInPublish(brokerName string) string

查找broker的master地址

func (*MQClientInstance) RegisterConsumer

func (mqClientInstance *MQClientInstance) RegisterConsumer(group string, consumer consumer.MQConsumerInner) bool

将生产者group和发送类保存到内存中

func (*MQClientInstance) RegisterProducer

func (mqClientInstance *MQClientInstance) RegisterProducer(group string, producer *DefaultMQProducerImpl) bool

将生产者group和发送类保存到内存中

func (*MQClientInstance) SendHeartbeatToAllBrokerWithLock

func (mqClientInstance *MQClientInstance) SendHeartbeatToAllBrokerWithLock()

向所有boker发送心跳

func (*MQClientInstance) Shutdown

func (mqClientInstance *MQClientInstance) Shutdown()

func (*MQClientInstance) Start

func (mqClientInstance *MQClientInstance) Start()

func (*MQClientInstance) StartScheduledTask

func (mqClientInstance *MQClientInstance) StartScheduledTask()

func (*MQClientInstance) UnregisterConsumer

func (mqClientInstance *MQClientInstance) UnregisterConsumer(group string)

注销消费者

func (*MQClientInstance) UnregisterProducer

func (mqClientInstance *MQClientInstance) UnregisterProducer(group string)

注销生产者

func (*MQClientInstance) UpdateTopicRouteInfoFromNameServer

func (mqClientInstance *MQClientInstance) UpdateTopicRouteInfoFromNameServer()

从nameserver更新路由信息

func (*MQClientInstance) UpdateTopicRouteInfoFromNameServerByArgs

func (mqClientInstance *MQClientInstance) UpdateTopicRouteInfoFromNameServerByArgs(topic string, isDefault bool, defaultMQProducer *DefaultMQProducer) bool

func (*MQClientInstance) UpdateTopicRouteInfoFromNameServerByTopic

func (mqClientInstance *MQClientInstance) UpdateTopicRouteInfoFromNameServerByTopic(topic string) bool

type MQClientManager

type MQClientManager struct {
	// clientId MQClientInstance
	FactoryTable          *syncMap.Map
	FactoryIndexGenerator int32
}

func GetInstance

func GetInstance() *MQClientManager

func NewMQClientManager

func NewMQClientManager() *MQClientManager

func (*MQClientManager) GetAndCreateMQClientInstance

func (mQClientManager *MQClientManager) GetAndCreateMQClientInstance(clientConfig *stgclient.ClientConfig) *MQClientInstance

从集合中查询MQClientInstance,无则创建一个

func (*MQClientManager) RemoveClientFactory

func (mQClientManager *MQClientManager) RemoveClientFactory(clientId string)

删除客户端

type MQProducer

type MQProducer interface {
	// 启动
	Start() error
	// 关闭
	Shutdown()
	// 同步发送消息
	Send(msg *message.Message) (*SendResult, error)
	// 只发送不处理
	SendOneWay(msg *message.Message) error
	// 异步发送
	SendCallBack(msg *message.Message, callback SendCallback) error
}

type MQProducerInner

type MQProducerInner interface {
	// 获取生产者topic信息列表
	GetPublishTopicList() set.Set
	// topic信息是否需要更新
	IsPublishTopicNeedUpdate(topic string) bool
	// 更新topic信息
	UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
}

type PullAPIWrapper

type PullAPIWrapper struct {
	// contains filtered or unexported fields
}

func NewPullAPIWrapper

func NewPullAPIWrapper(mQClientFactory *MQClientInstance, consumerGroup string, unitMode bool) *PullAPIWrapper

func (*PullAPIWrapper) PullKernelImpl

func (api *PullAPIWrapper) PullKernelImpl(mq *message.MessageQueue,
	subExpression string,
	subVersion int,
	offset int64,
	maxNums int,
	sysFlag int,
	commitOffset int64,
	brokerSuspendMaxTimeMillis int,
	timeoutMillis int,
	communicationMode CommunicationMode,
	pullCallback PullCallback) *PullResultExt

type PullCallBackImpl

type PullCallBackImpl struct {
	*heartbeat.SubscriptionData
	*consumer.PullRequest
	*DefaultMQPushConsumerImpl
	// contains filtered or unexported fields
}

func (*PullCallBackImpl) OnSuccess

func (backImpl *PullCallBackImpl) OnSuccess(pullResultExt *PullResultExt)

type PullCallback

type PullCallback interface {
	OnSuccess(pullResultExt *PullResultExt)
}

type PullMessageService

type PullMessageService struct {
	MQClientFactory  *MQClientInstance
	PullRequestQueue chan *consumer.PullRequest
	// contains filtered or unexported fields
}

func NewPullMessageService

func NewPullMessageService(mqClientFactory *MQClientInstance) *PullMessageService

func (*PullMessageService) ExecutePullRequestImmediately

func (service *PullMessageService) ExecutePullRequestImmediately(pullRequest *consumer.PullRequest)

向通道中加入pullRequest

func (*PullMessageService) ExecutePullRequestLater

func (service *PullMessageService) ExecutePullRequestLater(pullRequest *consumer.PullRequest, timeDelay int)

延迟执行pull请求

func (*PullMessageService) Shutdown

func (service *PullMessageService) Shutdown()

func (*PullMessageService) Start

func (service *PullMessageService) Start()

type PullResultExt

type PullResultExt struct {
	*consumer.PullResult
	// contains filtered or unexported fields
}

func NewPullResultExt

func NewPullResultExt(pullStatus consumer.PullStatus, nextBeginOffset int64, minOffset int64, maxOffset int64,
	msgFoundList []*message.MessageExt, suggestWhichBrokerId int64, messageBinary []byte) *PullResultExt

type RebalanceImpl

type RebalanceImpl interface {
	// 消费类型(主动或被动)
	ConsumeType() heartbeat.ConsumeType
	// 队列变更主要用于Schedule service for pull consumer todo 暂未实现
	MessageQueueChanged(topic string, mqAll set.Set, mqDivided set.Set)
	// 删除不必要的,非法的queue
	RemoveUnnecessaryMessageQueue(mq *message.MessageQueue, pq *consumer.ProcessQueue) bool
	// 分发拉取消息请求(拉取消息真正入口)
	DispatchPullRequest(pullRequestList []*consumer.PullRequest)
	// 计算队列拉取位置
	ComputePullFromWhere(mq *message.MessageQueue) int64
}

type RebalanceImplExt

type RebalanceImplExt struct {
	RebalanceImpl                RebalanceImpl
	ProcessQueueTable            *sync.Map //*MessageQueue, *ProcessQueue
	TopicSubscribeInfoTable      *sync.Map //topic  Set<*MessageQueue>
	SubscriptionInner            *sync.Map //topic, *SubscriptionData
	ConsumerGroup                string
	MessageModel                 heartbeat.MessageModel
	AllocateMessageQueueStrategy rebalance.AllocateMessageQueueStrategy
	MQClientFactory              *MQClientInstance
}

func NewRebalanceImplExt

func NewRebalanceImplExt(rebalanceImpl RebalanceImpl) *RebalanceImplExt

func (*RebalanceImplExt) RemoveProcessQueue

func (ext *RebalanceImplExt) RemoveProcessQueue(mq *message.MessageQueue)

删除内存中非法的消费处理队列

type RebalancePullImpl

type RebalancePullImpl struct {
	*RebalanceImplExt
	// contains filtered or unexported fields
}

func NewRebalancePullImpl

func NewRebalancePullImpl(defaultMQPullConsumerImpl *DefaultMQPullConsumerImpl) *RebalancePullImpl

func (*RebalancePullImpl) ComputePullFromWhere

func (pullImpl *RebalancePullImpl) ComputePullFromWhere(mq *message.MessageQueue) int64

func (*RebalancePullImpl) ConsumeType

func (pullImpl *RebalancePullImpl) ConsumeType() heartbeat.ConsumeType

func (*RebalancePullImpl) DispatchPullRequest

func (pullImpl *RebalancePullImpl) DispatchPullRequest(pullRequestList []*consumer.PullRequest)

func (*RebalancePullImpl) MessageQueueChanged

func (pullImpl *RebalancePullImpl) MessageQueueChanged(topic string, mqAll set.Set, mqDivided set.Set)

func (*RebalancePullImpl) RemoveUnnecessaryMessageQueue

func (pullImpl *RebalancePullImpl) RemoveUnnecessaryMessageQueue(mq *message.MessageQueue, pq *consumer.ProcessQueue) bool

type RebalancePushImpl

type RebalancePushImpl struct {
	// contains filtered or unexported fields
}

func NewRebalancePushImpl

func NewRebalancePushImpl(defaultMQPushConsumerImpl *DefaultMQPushConsumerImpl) *RebalancePushImpl

func (*RebalancePushImpl) ComputePullFromWhere

func (pushImpl *RebalancePushImpl) ComputePullFromWhere(mq *message.MessageQueue) int64

func (*RebalancePushImpl) ConsumeType

func (pushImpl *RebalancePushImpl) ConsumeType() heartbeat.ConsumeType

func (*RebalancePushImpl) DispatchPullRequest

func (pushImpl *RebalancePushImpl) DispatchPullRequest(pullRequestList []*consumer.PullRequest)

func (*RebalancePushImpl) MessageQueueChanged

func (pushImpl *RebalancePushImpl) MessageQueueChanged(topic string, mqAll set.Set, mqDivided set.Set)

func (*RebalancePushImpl) RemoveUnnecessaryMessageQueue

func (pushImpl *RebalancePushImpl) RemoveUnnecessaryMessageQueue(mq *message.MessageQueue, pq *consumer.ProcessQueue) bool

type RebalanceService

type RebalanceService struct {
	MQClientFactory *MQClientInstance
	WaitInterval    int //单位秒

	Wakeup chan bool
	// contains filtered or unexported fields
}

func NewRebalanceService

func NewRebalanceService(mqClientFactory *MQClientInstance) *RebalanceService

func (*RebalanceService) Shutdown

func (service *RebalanceService) Shutdown()

func (*RebalanceService) Start

func (service *RebalanceService) Start()

type RemoteBrokerOffsetStore

type RemoteBrokerOffsetStore struct {
	// contains filtered or unexported fields
}

func NewRemoteBrokerOffsetStore

func NewRemoteBrokerOffsetStore(mQClientFactory *MQClientInstance, groupName string) *RemoteBrokerOffsetStore

func (*RemoteBrokerOffsetStore) Load

func (store *RemoteBrokerOffsetStore) Load()

func (*RemoteBrokerOffsetStore) Persist

func (store *RemoteBrokerOffsetStore) Persist(mq *message.MessageQueue)

func (*RemoteBrokerOffsetStore) PersistAll

func (store *RemoteBrokerOffsetStore) PersistAll(mqs set.Set)

func (*RemoteBrokerOffsetStore) ReadOffset

func (rStore *RemoteBrokerOffsetStore) ReadOffset(mq *message.MessageQueue, rType store.ReadOffsetType) int64

func (*RemoteBrokerOffsetStore) RemoveOffset

func (store *RemoteBrokerOffsetStore) RemoveOffset(mq *message.MessageQueue)

func (*RemoteBrokerOffsetStore) UpdateOffset

func (store *RemoteBrokerOffsetStore) UpdateOffset(mq *message.MessageQueue, offset int64, increaseOnly bool)

type SendCallback

type SendCallback func(sendResult *SendResult, err error)
type SendCallback interface {
	OnSuccess(sendResult *SendResult)
}

type SendResult

type SendResult struct {
	SendStatus    SendStatus
	MsgId         string
	MessageQueue  *message.MessageQueue
	QueueOffset   int64
	TransactionId string
}

func NewSendResult

func NewSendResult(sendStatus SendStatus, msgId string, messageQueue *message.MessageQueue, queueOffset int64, projectGroupPrefix string) *SendResult

func (*SendResult) ToString

func (sendResult *SendResult) ToString() string

type SendStatus

type SendStatus int
const (
	SEND_OK SendStatus = iota
	FLUSH_DISK_TIMEOUT
	FLUSH_SLAVE_TIMEOUT
	SLAVE_NOT_AVAILABLE
)

func (SendStatus) String

func (status SendStatus) String() string

type TopicPublishInfo

type TopicPublishInfo struct {
	OrderTopic          bool
	HaveTopicRouterInfo bool
	MessageQueueList    []*message.MessageQueue
	SendWhichQueue      int64
}

TopicPublishInfo: topic发布信息 Author: yintongqiang Since: 2017/8/10

func NewTopicPublishInfo

func NewTopicPublishInfo() *TopicPublishInfo

func (*TopicPublishInfo) SelectOneMessageQueue

func (topicPublishInfo *TopicPublishInfo) SelectOneMessageQueue(lastBrokerName string) *message.MessageQueue

取模获取选择队列

func (*TopicPublishInfo) ToString

func (self *TopicPublishInfo) ToString() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL