Documentation ¶
Index ¶
- Constants
- func CheckGroup(group string) error
- func CheckMessage(msg *message.Message, defaultMQProducer DefaultMQProducer)
- func CheckTopic(topic string)
- type ClientRemotingProcessor
- type CommunicationMode
- type ConsumeMessageConcurrentlyService
- func (service *ConsumeMessageConcurrentlyService) ConsumeMessageDirectly(msg *message.MessageExt, brokerName string) *body.ConsumeMessageDirectlyResult
- func (service *ConsumeMessageConcurrentlyService) Shutdown()
- func (service *ConsumeMessageConcurrentlyService) Start()
- func (service *ConsumeMessageConcurrentlyService) SubmitConsumeRequest(msgs []*message.MessageExt, processQueue *consumer.ProcessQueue, ...)
- type ConsumeMessageService
- type DefaultMQProducer
- func (defaultMQProducer *DefaultMQProducer) CreateTopic(key, newTopic string, queueNum int)
- func (defaultMQProducer *DefaultMQProducer) Send(msg *message.Message) (*SendResult, error)
- func (defaultMQProducer *DefaultMQProducer) SendCallBack(msg *message.Message, callback SendCallback) error
- func (defaultMQProducer *DefaultMQProducer) SendOneWay(msg *message.Message) error
- func (defaultMQProducer *DefaultMQProducer) SetNamesrvAddr(namesrvAddr string)
- func (defaultMQProducer *DefaultMQProducer) Shutdown()
- func (defaultMQProducer *DefaultMQProducer) Start()
- type DefaultMQProducerImpl
- func (defaultMQProducerImpl *DefaultMQProducerImpl) CreateTopic(key, newTopic string, queueNum int)
- func (defaultMQProducerImpl *DefaultMQProducerImpl) CreateTopicByFlag(key, newTopic string, queueNum, topicSysFlag int)
- func (defaultMQProducerImpl *DefaultMQProducerImpl) GetPublishTopicList() set.Set
- func (defaultMQProducerImpl *DefaultMQProducerImpl) IsPublishTopicNeedUpdate(topic string) bool
- func (defaultMQProducerImpl *DefaultMQProducerImpl) SendByTimeout(msg *message.Message, timeout int64) (*SendResult, error)
- func (defaultMQProducerImpl *DefaultMQProducerImpl) Shutdown()
- func (defaultMQProducerImpl *DefaultMQProducerImpl) ShutdownFlag(shutdownFactory bool)
- func (defaultMQProducerImpl *DefaultMQProducerImpl) StartFlag(startFactory bool) error
- func (defaultMQProducerImpl *DefaultMQProducerImpl) UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
- type DefaultMQPullConsumer
- func (pullConsumer *DefaultMQPullConsumer) FetchSubscribeMessageQueues(topic string) []*message.MessageQueue
- func (pullConsumer *DefaultMQPullConsumer) Pull(mq *message.MessageQueue, subExpression string, offset int64, maxNums int) (*consumer.PullResult, error)
- func (pullConsumer *DefaultMQPullConsumer) SetNamesrvAddr(namesrvAddr string)
- func (pullConsumer *DefaultMQPullConsumer) Shutdown()
- func (pullConsumer *DefaultMQPullConsumer) Start()
- type DefaultMQPullConsumerImpl
- func (pullImpl *DefaultMQPullConsumerImpl) ConsumeFromWhere() heartbeat.ConsumeFromWhere
- func (pullImpl *DefaultMQPullConsumerImpl) ConsumeType() heartbeat.ConsumeType
- func (pullImpl *DefaultMQPullConsumerImpl) DoRebalance()
- func (pullImpl *DefaultMQPullConsumerImpl) GroupName() string
- func (pullImpl *DefaultMQPullConsumerImpl) IsSubscribeTopicNeedUpdate(topic string) bool
- func (pullImpl *DefaultMQPullConsumerImpl) IsUnitMode() bool
- func (pullImpl *DefaultMQPullConsumerImpl) MessageModel() heartbeat.MessageModel
- func (pullImpl *DefaultMQPullConsumerImpl) PersistConsumerOffset()
- func (pullImpl *DefaultMQPullConsumerImpl) Start()
- func (pullImpl *DefaultMQPullConsumerImpl) Subscriptions() set.Set
- func (pullImpl *DefaultMQPullConsumerImpl) UpdateTopicSubscribeInfo(topic string, info set.Set)
- type DefaultMQPushConsumer
- func (pushConsumer *DefaultMQPushConsumer) RegisterMessageListener(messageListener listener.MessageListener)
- func (pushConsumer *DefaultMQPushConsumer) SetConsumeFromWhere(consumeFromWhere heartbeat.ConsumeFromWhere)
- func (pushConsumer *DefaultMQPushConsumer) SetMessageModel(model heartbeat.MessageModel)
- func (pushConsumer *DefaultMQPushConsumer) SetNamesrvAddr(namesrvAddr string)
- func (pushConsumer *DefaultMQPushConsumer) Shutdown()
- func (pushConsumer *DefaultMQPushConsumer) Start()
- func (pushConsumer *DefaultMQPushConsumer) Subscribe(topic string, subExpression string)
- type DefaultMQPushConsumerImpl
- func (pushConsumerImpl *DefaultMQPushConsumerImpl) ConsumeFromWhere() heartbeat.ConsumeFromWhere
- func (pushConsumerImpl *DefaultMQPushConsumerImpl) ConsumeType() heartbeat.ConsumeType
- func (pushConsumerImpl *DefaultMQPushConsumerImpl) DoRebalance()
- func (pushConsumerImpl *DefaultMQPushConsumerImpl) ExecutePullRequestImmediately(pullRequest *consumer.PullRequest)
- func (pushConsumerImpl *DefaultMQPushConsumerImpl) ExecutePullRequestLater(pullRequest *consumer.PullRequest, timeDelay int)
- func (pushConsumerImpl *DefaultMQPushConsumerImpl) GroupName() string
- func (pushConsumerImpl *DefaultMQPushConsumerImpl) IsSubscribeTopicNeedUpdate(topic string) bool
- func (pushConsumerImpl *DefaultMQPushConsumerImpl) IsUnitMode() bool
- func (pushConsumerImpl *DefaultMQPushConsumerImpl) MessageModel() heartbeat.MessageModel
- func (pushConsumerImpl *DefaultMQPushConsumerImpl) PersistConsumerOffset()
- func (pushConsumerImpl *DefaultMQPushConsumerImpl) Shutdown()
- func (pushConsumerImpl *DefaultMQPushConsumerImpl) Start()
- func (pushConsumerImpl *DefaultMQPushConsumerImpl) Subscriptions() set.Set
- func (pushConsumerImpl *DefaultMQPushConsumerImpl) UpdateTopicSubscribeInfo(topic string, info set.Set)
- type FindBrokerResult
- type LocalFileOffsetStore
- func (store *LocalFileOffsetStore) Load()
- func (store *LocalFileOffsetStore) Persist(mq *message.MessageQueue)
- func (store *LocalFileOffsetStore) PersistAll(mqs set.Set)
- func (store *LocalFileOffsetStore) ReadOffset(mq *message.MessageQueue, rType baseStore.ReadOffsetType) int64
- func (store *LocalFileOffsetStore) RemoveOffset(mq *message.MessageQueue)
- func (store *LocalFileOffsetStore) UpdateOffset(mq *message.MessageQueue, offset int64, increaseOnly bool)
- type MQAdminImpl
- type MQClientAPIImpl
- func (impl *MQClientAPIImpl) CleanExpiredConsumeQueue(brokerAddr string, timeoutMillis int64) (bool, error)
- func (impl *MQClientAPIImpl) CloneGroupOffset(brokerAddr, srcGroup, destGroup, topic string, isOffline bool, ...) error
- func (impl *MQClientAPIImpl) ConsumeMessageDirectly(brokerAddr, consumerGroup, clientId, msgId string, timeoutMills int64) (*body.ConsumeMessageDirectlyResult, error)
- func (impl *MQClientAPIImpl) CreateCustomTopic(addr, defaultTopic string, topicConfig stgcommon.TopicConfig, ...)
- func (impl *MQClientAPIImpl) CreateTopic(brokerAddr, defaultTopic string, topicConfig *stgcommon.TopicConfig, ...) error
- func (impl *MQClientAPIImpl) DeleteSubscriptionGroup(brokerAddr, groupName string, timeoutMillis int64) error
- func (impl *MQClientAPIImpl) DeleteTopicInBroker(brokerAddr, topic string, timeoutMillis int64) error
- func (impl *MQClientAPIImpl) DeleteTopicInNameServer(namesrvAddr, topic string, timeoutMillis int64) error
- func (impl *MQClientAPIImpl) GetBrokerClusterInfo(timeoutMillis int64) (*body.ClusterPlusInfo, []*body.ClusterBrokerWapper, error)
- func (impl *MQClientAPIImpl) GetBrokerRuntimeInfo(brokerAddr string, timeoutMillis int64) (*body.KVTable, error)
- func (impl *MQClientAPIImpl) GetConsumeStats(brokerAddr, consumerGroup string, timeoutMillis int64) (*admin.ConsumeStats, error)
- func (impl *MQClientAPIImpl) GetConsumeStatsByTopic(brokerAddr, consumerGroup, topic string, timeoutMillis int64) (*admin.ConsumeStats, error)
- func (impl *MQClientAPIImpl) GetConsumerConnectionList(brokerAddr, consumerGroup string, timeoutMillis int64) (*body.ConsumerConnectionPlus, int, error)
- func (impl *MQClientAPIImpl) GetConsumerIdListByGroup(addr string, consumerGroup string, timeoutMillis int64) []string
- func (impl *MQClientAPIImpl) GetConsumerRunningInfo(brokerAddr, consumerGroup, clientId string, jstack bool, timeoutMillis int64) (*body.ConsumerRunningInfo, error)
- func (impl *MQClientAPIImpl) GetDefaultTopicRouteInfoFromNameServer(topic string, timeoutMillis int64) *route.TopicRouteData
- func (impl *MQClientAPIImpl) GetKVConfigValue(namespace, key string, timeoutMillis int64) (string, error)
- func (impl *MQClientAPIImpl) GetKVListByNamespace(namespace string, timeoutMillis int64) (*body.KVTable, error)
- func (impl *MQClientAPIImpl) GetMaxOffset(addr string, topic string, queueId int, timeoutMillis int64) int64
- func (impl *MQClientAPIImpl) GetNameServerAddressList() []string
- func (impl *MQClientAPIImpl) GetProducerConnectionList(brokerAddr, producerGroup string, timeoutMillis int64) (*body.ProducerConnection, error)
- func (impl *MQClientAPIImpl) GetProjectGroupByIp(ip string, timeoutMillis int64) (string, error)
- func (impl *MQClientAPIImpl) GetTopicListFromNameServer(timeoutMills int64) (*body.TopicList, error)
- func (impl *MQClientAPIImpl) GetTopicRouteInfoFromNameServer(topic string, timeoutMillis int64) (*route.TopicRouteData, error)
- func (impl *MQClientAPIImpl) GetTopicStatsInfo(brokerAddr, topic string, timeoutMillis int64) (*admin.TopicStatsTable, error)
- func (impl *MQClientAPIImpl) GetTopicsByCluster(clusterName string, timeoutMillis int64) (*body.TopicPlusList, error)
- func (impl *MQClientAPIImpl) InvokeBrokerToGetConsumerStatus(brokerAddr, topic, group, clientAddr string, timeoutMillis int64) (map[string]map[*message.MessageQueue]int64, error)
- func (impl *MQClientAPIImpl) PullMessage(addr string, requestHeader header.PullMessageRequestHeader, timeoutMillis int, ...) *PullResultExt
- func (impl *MQClientAPIImpl) PutKVConfigValue(namespace, key, value string, timeoutMillis int64) error
- func (impl *MQClientAPIImpl) QueryConsumeTimeSpan(brokerAddr, topic, consumerGroup string, timeoutMills int64) (set.Set, error)
- func (impl *MQClientAPIImpl) QueryTopicConsumeByWho(brokerAddr, topic string, timeoutMillis int64) (*body.GroupList, error)
- func (impl *MQClientAPIImpl) SendMessage(addr string, brokerName string, msg *message.Message, ...) (*SendResult, error)
- func (impl *MQClientAPIImpl) Shutdwon()
- func (impl *MQClientAPIImpl) Start()
- func (impl *MQClientAPIImpl) UpdateConsumerOffsetOneway(addr string, requestHeader header.UpdateConsumerOffsetRequestHeader, ...)
- func (impl *MQClientAPIImpl) UpdateNameServerAddressList(addrs string)
- func (impl *MQClientAPIImpl) ViewBrokerStatsData(brokerAddr, statsName, statsKey string, timeoutMillis int64) (*body.BrokerStatsData, error)
- func (impl MQClientAPIImpl) ViewMessage(storeHost string, physicOffset uint64, timeoutMills int64) (*message.MessageExt, error)
- func (impl *MQClientAPIImpl) WipeWritePermOfBroker(namesrvAddr, brokerName string, timeoutMillis int64) (int, error)
- type MQClientInstance
- func (self *MQClientInstance) ConsumeMessageDirectly(msg *message.MessageExt, consumerGroup, brokerName string) *body.ConsumeMessageDirectlyResult
- func (mqClientInstance *MQClientInstance) FindBrokerAddressInPublish(brokerName string) string
- func (mqClientInstance *MQClientInstance) RegisterConsumer(group string, consumer consumer.MQConsumerInner) bool
- func (mqClientInstance *MQClientInstance) RegisterProducer(group string, producer *DefaultMQProducerImpl) bool
- func (mqClientInstance *MQClientInstance) SendHeartbeatToAllBrokerWithLock()
- func (mqClientInstance *MQClientInstance) Shutdown()
- func (mqClientInstance *MQClientInstance) Start()
- func (mqClientInstance *MQClientInstance) StartScheduledTask()
- func (mqClientInstance *MQClientInstance) UnregisterConsumer(group string)
- func (mqClientInstance *MQClientInstance) UnregisterProducer(group string)
- func (mqClientInstance *MQClientInstance) UpdateTopicRouteInfoFromNameServer()
- func (mqClientInstance *MQClientInstance) UpdateTopicRouteInfoFromNameServerByArgs(topic string, isDefault bool, defaultMQProducer *DefaultMQProducer) bool
- func (mqClientInstance *MQClientInstance) UpdateTopicRouteInfoFromNameServerByTopic(topic string) bool
- type MQClientManager
- type MQProducer
- type MQProducerInner
- type PullAPIWrapper
- type PullCallBackImpl
- type PullCallback
- type PullMessageService
- type PullResultExt
- type RebalanceImpl
- type RebalanceImplExt
- type RebalancePullImpl
- func (pullImpl *RebalancePullImpl) ComputePullFromWhere(mq *message.MessageQueue) int64
- func (pullImpl *RebalancePullImpl) ConsumeType() heartbeat.ConsumeType
- func (pullImpl *RebalancePullImpl) DispatchPullRequest(pullRequestList []*consumer.PullRequest)
- func (pullImpl *RebalancePullImpl) MessageQueueChanged(topic string, mqAll set.Set, mqDivided set.Set)
- func (pullImpl *RebalancePullImpl) RemoveUnnecessaryMessageQueue(mq *message.MessageQueue, pq *consumer.ProcessQueue) bool
- type RebalancePushImpl
- func (pushImpl *RebalancePushImpl) ComputePullFromWhere(mq *message.MessageQueue) int64
- func (pushImpl *RebalancePushImpl) ConsumeType() heartbeat.ConsumeType
- func (pushImpl *RebalancePushImpl) DispatchPullRequest(pullRequestList []*consumer.PullRequest)
- func (pushImpl *RebalancePushImpl) MessageQueueChanged(topic string, mqAll set.Set, mqDivided set.Set)
- func (pushImpl *RebalancePushImpl) RemoveUnnecessaryMessageQueue(mq *message.MessageQueue, pq *consumer.ProcessQueue) bool
- type RebalanceService
- type RemoteBrokerOffsetStore
- func (store *RemoteBrokerOffsetStore) Load()
- func (store *RemoteBrokerOffsetStore) Persist(mq *message.MessageQueue)
- func (store *RemoteBrokerOffsetStore) PersistAll(mqs set.Set)
- func (rStore *RemoteBrokerOffsetStore) ReadOffset(mq *message.MessageQueue, rType store.ReadOffsetType) int64
- func (store *RemoteBrokerOffsetStore) RemoveOffset(mq *message.MessageQueue)
- func (store *RemoteBrokerOffsetStore) UpdateOffset(mq *message.MessageQueue, offset int64, increaseOnly bool)
- type SendCallback
- type SendResult
- type SendStatus
- type TopicPublishInfo
Constants ¶
const ( CHARACTER_MAX_LENGTH = 255 VALID_PATTERN_STR = "^[%|a-zA-Z0-9_-]+$" )
Variables ¶
This section is empty.
Functions ¶
func CheckGroup ¶
func CheckGroup(group string) error
func CheckMessage ¶
func CheckMessage(msg *message.Message, defaultMQProducer DefaultMQProducer)
func CheckTopic ¶
func CheckTopic(topic string)
Types ¶
type ClientRemotingProcessor ¶
type ClientRemotingProcessor struct {
MQClientFactory *MQClientInstance
}
客户端处理器 Author: yintongqiang Since: 2017/8/8
func NewClientRemotingProcessor ¶
func NewClientRemotingProcessor(mqClientFactory *MQClientInstance) *ClientRemotingProcessor
func (*ClientRemotingProcessor) ProcessRequest ¶
func (self *ClientRemotingProcessor) ProcessRequest(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)
处理request
type CommunicationMode ¶
type CommunicationMode int
const ( SYNC CommunicationMode = iota ASYNC ONEWAY )
func (CommunicationMode) String ¶
func (cm CommunicationMode) String() string
type ConsumeMessageConcurrentlyService ¶
type ConsumeMessageConcurrentlyService struct {
// contains filtered or unexported fields
}
func NewConsumeMessageConcurrentlyService ¶
func NewConsumeMessageConcurrentlyService(defaultMQPushConsumerImpl *DefaultMQPushConsumerImpl, messageListener consumer.MessageListenerConcurrently) *ConsumeMessageConcurrentlyService
func (*ConsumeMessageConcurrentlyService) ConsumeMessageDirectly ¶
func (service *ConsumeMessageConcurrentlyService) ConsumeMessageDirectly(msg *message.MessageExt, brokerName string) *body.ConsumeMessageDirectlyResult
func (*ConsumeMessageConcurrentlyService) Shutdown ¶
func (service *ConsumeMessageConcurrentlyService) Shutdown()
func (*ConsumeMessageConcurrentlyService) Start ¶
func (service *ConsumeMessageConcurrentlyService) Start()
仅仅为了实现接口
func (*ConsumeMessageConcurrentlyService) SubmitConsumeRequest ¶
func (service *ConsumeMessageConcurrentlyService) SubmitConsumeRequest(msgs []*message.MessageExt, processQueue *consumer.ProcessQueue, messageQueue *message.MessageQueue, dispathToConsume bool)
提交消费请求
type ConsumeMessageService ¶
type ConsumeMessageService interface {
Start() // 开启
Shutdown() // 关闭
ConsumeMessageDirectly(msg *message.MessageExt, brokerName string) *body.ConsumeMessageDirectlyResult
SubmitConsumeRequest(msgs []*message.MessageExt, processQueue *consumer.ProcessQueue, messageQueue *message.MessageQueue, dispathToConsume bool) // 提交消费请求
}
ConsumeMessageService: 消费消息服务接口 Author: yintongqiang Since: 2017/8/11
type DefaultMQProducer ¶
type DefaultMQProducer struct { DefaultMQProducerImpl *DefaultMQProducerImpl ProducerGroup string CreateTopicKey string DefaultTopicQueueNums int SendMsgTimeout int64 CompressMsgBodyOverHowmuch int RetryTimesWhenSendFailed int32 RetryAnotherBrokerWhenNotStoreOK bool MaxMessageSize int UnitMode bool ClientConfig *stgclient.ClientConfig }
func NewDefaultMQProducer ¶
func NewDefaultMQProducer(producerGroup string) *DefaultMQProducer
func (*DefaultMQProducer) CreateTopic ¶
func (defaultMQProducer *DefaultMQProducer) CreateTopic(key, newTopic string, queueNum int)
对外提供创建topic方法
func (*DefaultMQProducer) Send ¶
func (defaultMQProducer *DefaultMQProducer) Send(msg *message.Message) (*SendResult, error)
发送同步消息
func (*DefaultMQProducer) SendCallBack ¶
func (defaultMQProducer *DefaultMQProducer) SendCallBack(msg *message.Message, callback SendCallback) error
发送callback消息
func (*DefaultMQProducer) SendOneWay ¶
func (defaultMQProducer *DefaultMQProducer) SendOneWay(msg *message.Message) error
发送sendOneWay消息
func (*DefaultMQProducer) SetNamesrvAddr ¶
func (defaultMQProducer *DefaultMQProducer) SetNamesrvAddr(namesrvAddr string)
func (*DefaultMQProducer) Shutdown ¶
func (defaultMQProducer *DefaultMQProducer) Shutdown()
func (*DefaultMQProducer) Start ¶
func (defaultMQProducer *DefaultMQProducer) Start()
type DefaultMQProducerImpl ¶
type DefaultMQProducerImpl struct { DefaultMQProducer *DefaultMQProducer TopicPublishInfoTable *sync.Map ServiceState stgcommon.ServiceState MQClientFactory *MQClientInstance }
func NewDefaultMQProducerImpl ¶
func NewDefaultMQProducerImpl(defaultMQProducer *DefaultMQProducer) *DefaultMQProducerImpl
func (*DefaultMQProducerImpl) CreateTopic ¶
func (defaultMQProducerImpl *DefaultMQProducerImpl) CreateTopic(key, newTopic string, queueNum int)
对外提供创建topic方法
func (*DefaultMQProducerImpl) CreateTopicByFlag ¶
func (defaultMQProducerImpl *DefaultMQProducerImpl) CreateTopicByFlag(key, newTopic string, queueNum, topicSysFlag int)
对外提供创建topic方法(可指定topicSysFlag)
func (*DefaultMQProducerImpl) GetPublishTopicList ¶
func (defaultMQProducerImpl *DefaultMQProducerImpl) GetPublishTopicList() set.Set
获取topic发布集合
func (*DefaultMQProducerImpl) IsPublishTopicNeedUpdate ¶
func (defaultMQProducerImpl *DefaultMQProducerImpl) IsPublishTopicNeedUpdate(topic string) bool
是否需要更新topic信息
func (*DefaultMQProducerImpl) SendByTimeout ¶
func (defaultMQProducerImpl *DefaultMQProducerImpl) SendByTimeout(msg *message.Message, timeout int64) (*SendResult, error)
带timeout的发送消息
func (*DefaultMQProducerImpl) Shutdown ¶
func (defaultMQProducerImpl *DefaultMQProducerImpl) Shutdown()
func (*DefaultMQProducerImpl) ShutdownFlag ¶
func (defaultMQProducerImpl *DefaultMQProducerImpl) ShutdownFlag(shutdownFactory bool)
func (*DefaultMQProducerImpl) StartFlag ¶
func (defaultMQProducerImpl *DefaultMQProducerImpl) StartFlag(startFactory bool) error
生产启动方法
func (*DefaultMQProducerImpl) UpdateTopicPublishInfo ¶
func (defaultMQProducerImpl *DefaultMQProducerImpl) UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
更新topic信息
type DefaultMQPullConsumer ¶
type DefaultMQPullConsumer struct {
// contains filtered or unexported fields
}
DefaultMQPullConsumer: 手动拉取消息 Author: yintongqiang Since: 2017/8/17
func NewDefaultMQPullConsumer ¶
func NewDefaultMQPullConsumer(consumerGroup string) *DefaultMQPullConsumer
func (*DefaultMQPullConsumer) FetchSubscribeMessageQueues ¶
func (pullConsumer *DefaultMQPullConsumer) FetchSubscribeMessageQueues(topic string) []*message.MessageQueue
func (*DefaultMQPullConsumer) Pull ¶
func (pullConsumer *DefaultMQPullConsumer) Pull(mq *message.MessageQueue, subExpression string, offset int64, maxNums int) (*consumer.PullResult, error)
func (*DefaultMQPullConsumer) SetNamesrvAddr ¶
func (pullConsumer *DefaultMQPullConsumer) SetNamesrvAddr(namesrvAddr string)
设置namesrvaddr
func (*DefaultMQPullConsumer) Shutdown ¶
func (pullConsumer *DefaultMQPullConsumer) Shutdown()
func (*DefaultMQPullConsumer) Start ¶
func (pullConsumer *DefaultMQPullConsumer) Start()
type DefaultMQPullConsumerImpl ¶
type DefaultMQPullConsumerImpl struct {
OffsetStore store.OffsetStore
RebalanceImpl RebalanceImpl
// contains filtered or unexported fields
}
func NewDefaultMQPullConsumerImpl ¶
func NewDefaultMQPullConsumerImpl(defaultMQPullConsumer *DefaultMQPullConsumer) *DefaultMQPullConsumerImpl
func (*DefaultMQPullConsumerImpl) ConsumeFromWhere ¶
func (pullImpl *DefaultMQPullConsumerImpl) ConsumeFromWhere() heartbeat.ConsumeFromWhere
func (*DefaultMQPullConsumerImpl) ConsumeType ¶
func (pullImpl *DefaultMQPullConsumerImpl) ConsumeType() heartbeat.ConsumeType
func (*DefaultMQPullConsumerImpl) DoRebalance ¶
func (pullImpl *DefaultMQPullConsumerImpl) DoRebalance()
执行负载
func (*DefaultMQPullConsumerImpl) GroupName ¶
func (pullImpl *DefaultMQPullConsumerImpl) GroupName() string
func (*DefaultMQPullConsumerImpl) IsSubscribeTopicNeedUpdate ¶
func (pullImpl *DefaultMQPullConsumerImpl) IsSubscribeTopicNeedUpdate(topic string) bool
func (*DefaultMQPullConsumerImpl) IsUnitMode ¶
func (pullImpl *DefaultMQPullConsumerImpl) IsUnitMode() bool
func (*DefaultMQPullConsumerImpl) MessageModel ¶
func (pullImpl *DefaultMQPullConsumerImpl) MessageModel() heartbeat.MessageModel
func (*DefaultMQPullConsumerImpl) PersistConsumerOffset ¶
func (pullImpl *DefaultMQPullConsumerImpl) PersistConsumerOffset()
持久化offset
func (*DefaultMQPullConsumerImpl) Start ¶
func (pullImpl *DefaultMQPullConsumerImpl) Start()
func (*DefaultMQPullConsumerImpl) Subscriptions ¶
func (pullImpl *DefaultMQPullConsumerImpl) Subscriptions() set.Set
获取订阅信息
func (*DefaultMQPullConsumerImpl) UpdateTopicSubscribeInfo ¶
func (pullImpl *DefaultMQPullConsumerImpl) UpdateTopicSubscribeInfo(topic string, info set.Set)
当订阅信息改变时,更新订阅信息
type DefaultMQPushConsumer ¶
type DefaultMQPushConsumer struct {
// contains filtered or unexported fields
}
func NewDefaultMQPushConsumer ¶
func NewDefaultMQPushConsumer(consumerGroup string) *DefaultMQPushConsumer
创建push消费结构体
func (*DefaultMQPushConsumer) RegisterMessageListener ¶
func (pushConsumer *DefaultMQPushConsumer) RegisterMessageListener(messageListener listener.MessageListener)
注册监听器
func (*DefaultMQPushConsumer) SetConsumeFromWhere ¶
func (pushConsumer *DefaultMQPushConsumer) SetConsumeFromWhere(consumeFromWhere heartbeat.ConsumeFromWhere)
设置从哪个位置开始消费
func (*DefaultMQPushConsumer) SetMessageModel ¶
func (pushConsumer *DefaultMQPushConsumer) SetMessageModel(model heartbeat.MessageModel)
设置消息模式(MessageModel)
func (*DefaultMQPushConsumer) SetNamesrvAddr ¶
func (pushConsumer *DefaultMQPushConsumer) SetNamesrvAddr(namesrvAddr string)
设置namesrvaddr
func (*DefaultMQPushConsumer) Shutdown ¶
func (pushConsumer *DefaultMQPushConsumer) Shutdown()
关闭消费服务
func (*DefaultMQPushConsumer) Subscribe ¶
func (pushConsumer *DefaultMQPushConsumer) Subscribe(topic string, subExpression string)
订阅topic和tag
type DefaultMQPushConsumerImpl ¶
type DefaultMQPushConsumerImpl struct {
OffsetStore store.OffsetStore
PullTimeDelayMillsWhenException int
PullTimeDelayMillsWhenSuspend int
BrokerSuspendMaxTimeMillis int
PullTimeDelayMillsWhenFlowControl int
ConsumerTimeoutMillisWhenSuspend int
// contains filtered or unexported fields
}
func NewDefaultMQPushConsumerImpl ¶
func NewDefaultMQPushConsumerImpl(defaultMQPushConsumer *DefaultMQPushConsumer) *DefaultMQPushConsumerImpl
func (*DefaultMQPushConsumerImpl) ConsumeFromWhere ¶
func (pushConsumerImpl *DefaultMQPushConsumerImpl) ConsumeFromWhere() heartbeat.ConsumeFromWhere
func (*DefaultMQPushConsumerImpl) ConsumeType ¶
func (pushConsumerImpl *DefaultMQPushConsumerImpl) ConsumeType() heartbeat.ConsumeType
func (*DefaultMQPushConsumerImpl) DoRebalance ¶
func (pushConsumerImpl *DefaultMQPushConsumerImpl) DoRebalance()
执行负载
func (*DefaultMQPushConsumerImpl) ExecutePullRequestImmediately ¶
func (pushConsumerImpl *DefaultMQPushConsumerImpl) ExecutePullRequestImmediately(pullRequest *consumer.PullRequest)
立即执行pull请求
func (*DefaultMQPushConsumerImpl) ExecutePullRequestLater ¶
func (pushConsumerImpl *DefaultMQPushConsumerImpl) ExecutePullRequestLater(pullRequest *consumer.PullRequest, timeDelay int)
延迟执行pull请求
func (*DefaultMQPushConsumerImpl) GroupName ¶
func (pushConsumerImpl *DefaultMQPushConsumerImpl) GroupName() string
func (*DefaultMQPushConsumerImpl) IsSubscribeTopicNeedUpdate ¶
func (pushConsumerImpl *DefaultMQPushConsumerImpl) IsSubscribeTopicNeedUpdate(topic string) bool
func (*DefaultMQPushConsumerImpl) IsUnitMode ¶
func (pushConsumerImpl *DefaultMQPushConsumerImpl) IsUnitMode() bool
func (*DefaultMQPushConsumerImpl) MessageModel ¶
func (pushConsumerImpl *DefaultMQPushConsumerImpl) MessageModel() heartbeat.MessageModel
func (*DefaultMQPushConsumerImpl) PersistConsumerOffset ¶
func (pushConsumerImpl *DefaultMQPushConsumerImpl) PersistConsumerOffset()
持久化消费offset
func (*DefaultMQPushConsumerImpl) Shutdown ¶
func (pushConsumerImpl *DefaultMQPushConsumerImpl) Shutdown()
关闭
func (*DefaultMQPushConsumerImpl) Start ¶
func (pushConsumerImpl *DefaultMQPushConsumerImpl) Start()
启动消费服务
func (*DefaultMQPushConsumerImpl) Subscriptions ¶
func (pushConsumerImpl *DefaultMQPushConsumerImpl) Subscriptions() set.Set
获取订阅信息
func (*DefaultMQPushConsumerImpl) UpdateTopicSubscribeInfo ¶
func (pushConsumerImpl *DefaultMQPushConsumerImpl) UpdateTopicSubscribeInfo(topic string, info set.Set)
更新订阅信息
type FindBrokerResult ¶
type FindBrokerResult struct {
// contains filtered or unexported fields
}
type LocalFileOffsetStore ¶
func NewLocalFileOffsetStore ¶
func NewLocalFileOffsetStore(mQClientFactory *MQClientInstance, groupName string) *LocalFileOffsetStore
func (*LocalFileOffsetStore) Persist ¶
func (store *LocalFileOffsetStore) Persist(mq *message.MessageQueue)
持久化
func (*LocalFileOffsetStore) PersistAll ¶
func (store *LocalFileOffsetStore) PersistAll(mqs set.Set)
持久化所有队列
func (*LocalFileOffsetStore) ReadOffset ¶
func (store *LocalFileOffsetStore) ReadOffset(mq *message.MessageQueue, rType baseStore.ReadOffsetType) int64
读取本地offset的备份
func (*LocalFileOffsetStore) RemoveOffset ¶
func (store *LocalFileOffsetStore) RemoveOffset(mq *message.MessageQueue)
删除offset
func (*LocalFileOffsetStore) UpdateOffset ¶
func (store *LocalFileOffsetStore) UpdateOffset(mq *message.MessageQueue, offset int64, increaseOnly bool)
更新本地offset
type MQAdminImpl ¶
type MQAdminImpl struct {
// contains filtered or unexported fields
}
MQAdminImpl: 运维方法 Author: yintongqiang Since: 2017/8/13
func NewMQAdminImpl ¶
func NewMQAdminImpl(clientFactory *MQClientInstance) *MQAdminImpl
func (*MQAdminImpl) CreateTopic ¶
func (impl *MQAdminImpl) CreateTopic(key, newTopic string, queueNum, topicSysFlag int) error
func (*MQAdminImpl) FetchSubscribeMessageQueues ¶
func (impl *MQAdminImpl) FetchSubscribeMessageQueues(topic string) []*message.MessageQueue
func (*MQAdminImpl) MaxOffset ¶
func (impl *MQAdminImpl) MaxOffset(mq *message.MessageQueue) int64
type MQClientAPIImpl ¶
type MQClientAPIImpl struct { DefalutRemotingClient remoting.RemotingClient ClientRemotingProcessor *ClientRemotingProcessor ProjectGroupPrefix string }
MQClientAPIImpl: 内部使用核心处理api Author: yintongqiang Since: 2017/8/8
func NewMQClientAPIImpl ¶
func NewMQClientAPIImpl(clientRemotingProcessor *ClientRemotingProcessor) *MQClientAPIImpl
func (*MQClientAPIImpl) CleanExpiredConsumeQueue ¶
func (impl *MQClientAPIImpl) CleanExpiredConsumeQueue(brokerAddr string, timeoutMillis int64) (bool, error)
CleanExpiredConsumeQueue 触发清理失效的消费队列 Author: tianyuliang Since: 2017/11/6
func (*MQClientAPIImpl) CloneGroupOffset ¶
func (impl *MQClientAPIImpl) CloneGroupOffset(brokerAddr, srcGroup, destGroup, topic string, isOffline bool, timeoutMillis int64) error
CloneGroupOffset 克隆消费组的偏移量 Author: tianyuliang Since: 2017/11/6
func (*MQClientAPIImpl) ConsumeMessageDirectly ¶
func (impl *MQClientAPIImpl) ConsumeMessageDirectly(brokerAddr, consumerGroup, clientId, msgId string, timeoutMills int64) (*body.ConsumeMessageDirectlyResult, error)
ConsumeMessageDirectly Author: tianyuliang Since: 2017/11/6
func (*MQClientAPIImpl) CreateCustomTopic ¶
func (impl *MQClientAPIImpl) CreateCustomTopic(addr, defaultTopic string, topicConfig stgcommon.TopicConfig, timeoutMillis int)
CreateCustomTopic 创建指定Topic Author: tianyuliang Since: 2017/11/1
func (*MQClientAPIImpl) CreateTopic ¶
func (impl *MQClientAPIImpl) CreateTopic(brokerAddr, defaultTopic string, topicConfig *stgcommon.TopicConfig, timeoutMillis int) error
创建topic
func (*MQClientAPIImpl) DeleteSubscriptionGroup ¶
func (impl *MQClientAPIImpl) DeleteSubscriptionGroup(brokerAddr, groupName string, timeoutMillis int64) error
DeleteSubscriptionGroup 删除订阅组信息 Author: tianyuliang Since: 2017/11/3
func (*MQClientAPIImpl) DeleteTopicInBroker ¶
func (impl *MQClientAPIImpl) DeleteTopicInBroker(brokerAddr, topic string, timeoutMillis int64) error
DeleteTopicInBroker 删除broker节点上对应的Topic Author: tianyuliang Since: 2017/11/3
func (*MQClientAPIImpl) DeleteTopicInNameServer ¶
func (impl *MQClientAPIImpl) DeleteTopicInNameServer(namesrvAddr, topic string, timeoutMillis int64) error
DeleteTopicInNameServer 删除Namesrv维护的Topic Author: tianyuliang Since: 2017/11/3
func (*MQClientAPIImpl) GetBrokerClusterInfo ¶
func (impl *MQClientAPIImpl) GetBrokerClusterInfo(timeoutMillis int64) (*body.ClusterPlusInfo, []*body.ClusterBrokerWapper, error)
GetBrokerClusterInfo 查询Cluster集群信息 Author: tianyuliang Since: 2017/11/3
func (*MQClientAPIImpl) GetBrokerRuntimeInfo ¶
func (impl *MQClientAPIImpl) GetBrokerRuntimeInfo(brokerAddr string, timeoutMillis int64) (*body.KVTable, error)
GetBrokerRuntimeInfo 查询Broker运行时状态信息 Author: tianyuliang Since: 2017/11/3
func (*MQClientAPIImpl) GetConsumeStats ¶
func (impl *MQClientAPIImpl) GetConsumeStats(brokerAddr, consumerGroup string, timeoutMillis int64) (*admin.ConsumeStats, error)
GetConsumeStats 查询消费者状态 Author: tianyuliang Since: 2017/11/3
func (*MQClientAPIImpl) GetConsumeStatsByTopic ¶
func (impl *MQClientAPIImpl) GetConsumeStatsByTopic(brokerAddr, consumerGroup, topic string, timeoutMillis int64) (*admin.ConsumeStats, error)
GetConsumeStatsByTopic 查询指定Topic的消费者状态 Author: tianyuliang Since: 2017/11/3
func (*MQClientAPIImpl) GetConsumerConnectionList ¶
func (impl *MQClientAPIImpl) GetConsumerConnectionList(brokerAddr, consumerGroup string, timeoutMillis int64) (*body.ConsumerConnectionPlus, int, error)
GetConsumerConnectionList 查询在线消费进程列表 Author: tianyuliang Since: 2017/11/3
func (*MQClientAPIImpl) GetConsumerIdListByGroup ¶
func (impl *MQClientAPIImpl) GetConsumerIdListByGroup(addr string, consumerGroup string, timeoutMillis int64) []string
func (*MQClientAPIImpl) GetConsumerRunningInfo ¶
func (impl *MQClientAPIImpl) GetConsumerRunningInfo(brokerAddr, consumerGroup, clientId string, jstack bool, timeoutMillis int64) (*body.ConsumerRunningInfo, error)
GetConsumerRunningInfo 获得consumer运行时状态信息 Author: tianyuliang Since: 2017/11/3
func (*MQClientAPIImpl) GetDefaultTopicRouteInfoFromNameServer ¶
func (impl *MQClientAPIImpl) GetDefaultTopicRouteInfoFromNameServer(topic string, timeoutMillis int64) *route.TopicRouteData
func (*MQClientAPIImpl) GetKVConfigValue ¶
func (impl *MQClientAPIImpl) GetKVConfigValue(namespace, key string, timeoutMillis int64) (string, error)
GetKVConfigValue 获取配置信息 Author: tianyuliang Since: 2017/11/1
func (*MQClientAPIImpl) GetKVListByNamespace ¶
func (impl *MQClientAPIImpl) GetKVListByNamespace(namespace string, timeoutMillis int64) (*body.KVTable, error)
GetKVListByNamespace 获取指定namespace下的配置信息列表 Author: tianyuliang Since: 2017/11/1
func (*MQClientAPIImpl) GetMaxOffset ¶
func (impl *MQClientAPIImpl) GetMaxOffset(addr string, topic string, queueId int, timeoutMillis int64) int64
获取队列最大offset
func (*MQClientAPIImpl) GetNameServerAddressList ¶
func (impl *MQClientAPIImpl) GetNameServerAddressList() []string
GetNameServerAddressList 获取最新namesrv地址列表 Author: tianyuliang Since: 2017/11/6
func (*MQClientAPIImpl) GetProducerConnectionList ¶
func (impl *MQClientAPIImpl) GetProducerConnectionList(brokerAddr, producerGroup string, timeoutMillis int64) (*body.ProducerConnection, error)
GetProducerConnectionList 查询在线生产者进程信息 Author: tianyuliang Since: 2017/11/3
func (*MQClientAPIImpl) GetProjectGroupByIp ¶
func (impl *MQClientAPIImpl) GetProjectGroupByIp(ip string, timeoutMillis int64) (string, error)
从namesrv查询客户端IP信息
func (*MQClientAPIImpl) GetTopicListFromNameServer ¶
func (impl *MQClientAPIImpl) GetTopicListFromNameServer(timeoutMills int64) (*body.TopicList, error)
GetTopicListFromNameServer 从Namesrv查询所有Topic列表 Author: tianyuliang Since: 2017/11/1
func (*MQClientAPIImpl) GetTopicRouteInfoFromNameServer ¶
func (impl *MQClientAPIImpl) GetTopicRouteInfoFromNameServer(topic string, timeoutMillis int64) (*route.TopicRouteData, error)
func (*MQClientAPIImpl) GetTopicStatsInfo ¶
func (impl *MQClientAPIImpl) GetTopicStatsInfo(brokerAddr, topic string, timeoutMillis int64) (*admin.TopicStatsTable, error)
GetTopicStatsInfo 查询Topic状态信息 Author: tianyuliang Since: 2017/11/3
func (*MQClientAPIImpl) GetTopicsByCluster ¶
func (impl *MQClientAPIImpl) GetTopicsByCluster(clusterName string, timeoutMillis int64) (*body.TopicPlusList, error)
GetTopicsByCluster 查询集群信息 Author: tianyuliang Since: 2017/11/3
func (*MQClientAPIImpl) InvokeBrokerToGetConsumerStatus ¶
func (impl *MQClientAPIImpl) InvokeBrokerToGetConsumerStatus(brokerAddr, topic, group, clientAddr string, timeoutMillis int64) (map[string]map[*message.MessageQueue]int64, error)
InvokeBrokerToGetConsumerStatus 反向查找broker中的consume状态 Author: tianyuliang Since: 2017/11/3
func (*MQClientAPIImpl) PullMessage ¶
func (impl *MQClientAPIImpl) PullMessage(addr string, requestHeader header.PullMessageRequestHeader, timeoutMillis int, communicationMode CommunicationMode, pullCallback PullCallback) *PullResultExt
func (*MQClientAPIImpl) PutKVConfigValue ¶
func (impl *MQClientAPIImpl) PutKVConfigValue(namespace, key, value string, timeoutMillis int64) error
PutKVConfigValue 设置配置信息 Author: tianyuliang Since: 2017/11/1
func (*MQClientAPIImpl) QueryConsumeTimeSpan ¶
func (impl *MQClientAPIImpl) QueryConsumeTimeSpan(brokerAddr, topic, consumerGroup string, timeoutMills int64) (set.Set, error)
QueryConsumeTimeSpan Author: tianyuliang Since: 2017/11/6
func (*MQClientAPIImpl) QueryTopicConsumeByWho ¶
func (impl *MQClientAPIImpl) QueryTopicConsumeByWho(brokerAddr, topic string, timeoutMillis int64) (*body.GroupList, error)
QueryTopicConsumeByWho 查询topic被哪些组消费 Author: tianyuliang Since: 2017/11/3
func (*MQClientAPIImpl) SendMessage ¶
func (impl *MQClientAPIImpl) SendMessage(addr string, brokerName string, msg *message.Message, requestHeader header.SendMessageRequestHeader, timeoutMillis int64, communicationMode CommunicationMode, sendCallback SendCallback) (*SendResult, error)
func (*MQClientAPIImpl) UpdateConsumerOffsetOneway ¶
func (impl *MQClientAPIImpl) UpdateConsumerOffsetOneway(addr string, requestHeader header.UpdateConsumerOffsetRequestHeader, timeoutMillis int64)
func (*MQClientAPIImpl) UpdateNameServerAddressList ¶
func (impl *MQClientAPIImpl) UpdateNameServerAddressList(addrs string)
func (*MQClientAPIImpl) ViewBrokerStatsData ¶
func (impl *MQClientAPIImpl) ViewBrokerStatsData(brokerAddr, statsName, statsKey string, timeoutMillis int64) (*body.BrokerStatsData, error)
ViewBrokerStatsData 查询broker节点自身的状态信息 Author: tianyuliang Since: 2017/11/3
func (MQClientAPIImpl) ViewMessage ¶
func (impl MQClientAPIImpl) ViewMessage(storeHost string, physicOffset uint64, timeoutMills int64) (*message.MessageExt, error)
ViewMessage Author: tianyuliang Since: 2017/11/9
func (*MQClientAPIImpl) WipeWritePermOfBroker ¶
func (impl *MQClientAPIImpl) WipeWritePermOfBroker(namesrvAddr, brokerName string, timeoutMillis int64) (int, error)
WipeWritePermOfBroker 关闭broker写权限 Author: tianyuliang Since: 2017/11/3
type MQClientInstance ¶
type MQClientInstance struct { ClientConfig *stgclient.ClientConfig InstanceIndex int32 ClientId string ProducerTable *sync.Map // groupId<MQProducerInner> ConsumerTable *sync.Map // groupId<MQConsumerInner> AdminExtTable *sync.Map // groupId<MQAdminExtInner> MQClientAPIImpl *MQClientAPIImpl MQAdminImpl *MQAdminImpl TopicRouteTable *sync.Map // topic<*TopicRouteData> LockNamesrv lock.RWMutex LockHeartbeat lock.RWMutex BrokerAddrTable *sync.Map // broker name map[int(brokerId)] string(address) ClientRemotingProcessor *ClientRemotingProcessor PullMessageService *PullMessageService RebalanceService *RebalanceService DefaultMQProducer *DefaultMQProducer ServiceState stgcommon.ServiceState TimerTask set.Set }
MQClientInstance: producer和consumer核心 Author: yintongqiang Since: 2017/8/8
func NewMQClientInstance ¶
func NewMQClientInstance(clientConfig *stgclient.ClientConfig, instanceIndex int32, clientId string) *MQClientInstance
NewMQClientInstance: 初始化 Author: yintongqiang Since: 2017/8/10
func (*MQClientInstance) ConsumeMessageDirectly ¶
func (self *MQClientInstance) ConsumeMessageDirectly(msg *message.MessageExt, consumerGroup, brokerName string) *body.ConsumeMessageDirectlyResult
func (*MQClientInstance) FindBrokerAddressInPublish ¶
func (mqClientInstance *MQClientInstance) FindBrokerAddressInPublish(brokerName string) string
查找broker的master地址
func (*MQClientInstance) RegisterConsumer ¶
func (mqClientInstance *MQClientInstance) RegisterConsumer(group string, consumer consumer.MQConsumerInner) bool
将消费者group和消费类保存到内存中
func (*MQClientInstance) RegisterProducer ¶
func (mqClientInstance *MQClientInstance) RegisterProducer(group string, producer *DefaultMQProducerImpl) bool
将生产者group和发送类保存到内存中
func (*MQClientInstance) SendHeartbeatToAllBrokerWithLock ¶
func (mqClientInstance *MQClientInstance) SendHeartbeatToAllBrokerWithLock()
向所有broker发送心跳
func (*MQClientInstance) Shutdown ¶
func (mqClientInstance *MQClientInstance) Shutdown()
func (*MQClientInstance) Start ¶
func (mqClientInstance *MQClientInstance) Start()
func (*MQClientInstance) StartScheduledTask ¶
func (mqClientInstance *MQClientInstance) StartScheduledTask()
func (*MQClientInstance) UnregisterConsumer ¶
func (mqClientInstance *MQClientInstance) UnregisterConsumer(group string)
注销消费者
func (*MQClientInstance) UnregisterProducer ¶
func (mqClientInstance *MQClientInstance) UnregisterProducer(group string)
注销生产者
func (*MQClientInstance) UpdateTopicRouteInfoFromNameServer ¶
func (mqClientInstance *MQClientInstance) UpdateTopicRouteInfoFromNameServer()
从nameserver更新路由信息
func (*MQClientInstance) UpdateTopicRouteInfoFromNameServerByArgs ¶
func (mqClientInstance *MQClientInstance) UpdateTopicRouteInfoFromNameServerByArgs(topic string, isDefault bool, defaultMQProducer *DefaultMQProducer) bool
func (*MQClientInstance) UpdateTopicRouteInfoFromNameServerByTopic ¶
func (mqClientInstance *MQClientInstance) UpdateTopicRouteInfoFromNameServerByTopic(topic string) bool
type MQClientManager ¶
type MQClientManager struct { // clientId MQClientInstance FactoryTable *syncMap.Map FactoryIndexGenerator int32 }
func GetInstance ¶
func GetInstance() *MQClientManager
func NewMQClientManager ¶
func NewMQClientManager() *MQClientManager
func (*MQClientManager) GetAndCreateMQClientInstance ¶
func (mQClientManager *MQClientManager) GetAndCreateMQClientInstance(clientConfig *stgclient.ClientConfig) *MQClientInstance
从集合中查询MQClientInstance,无则创建一个
func (*MQClientManager) RemoveClientFactory ¶
func (mQClientManager *MQClientManager) RemoveClientFactory(clientId string)
删除客户端
type MQProducer ¶
type MQProducerInner ¶
type PullAPIWrapper ¶
type PullAPIWrapper struct {
// contains filtered or unexported fields
}
func NewPullAPIWrapper ¶
func NewPullAPIWrapper(mQClientFactory *MQClientInstance, consumerGroup string, unitMode bool) *PullAPIWrapper
func (*PullAPIWrapper) PullKernelImpl ¶
func (api *PullAPIWrapper) PullKernelImpl(mq *message.MessageQueue, subExpression string, subVersion int, offset int64, maxNums int, sysFlag int, commitOffset int64, brokerSuspendMaxTimeMillis int, timeoutMillis int, communicationMode CommunicationMode, pullCallback PullCallback) *PullResultExt
type PullCallBackImpl ¶
type PullCallBackImpl struct { *heartbeat.SubscriptionData *consumer.PullRequest *DefaultMQPushConsumerImpl // contains filtered or unexported fields }
func (*PullCallBackImpl) OnSuccess ¶
func (backImpl *PullCallBackImpl) OnSuccess(pullResultExt *PullResultExt)
type PullCallback ¶
type PullCallback interface {
OnSuccess(pullResultExt *PullResultExt)
}
type PullMessageService ¶
type PullMessageService struct { MQClientFactory *MQClientInstance PullRequestQueue chan *consumer.PullRequest // contains filtered or unexported fields }
func NewPullMessageService ¶
func NewPullMessageService(mqClientFactory *MQClientInstance) *PullMessageService
func (*PullMessageService) ExecutePullRequestImmediately ¶
func (service *PullMessageService) ExecutePullRequestImmediately(pullRequest *consumer.PullRequest)
向通道中加入pullRequest
func (*PullMessageService) ExecutePullRequestLater ¶
func (service *PullMessageService) ExecutePullRequestLater(pullRequest *consumer.PullRequest, timeDelay int)
延迟执行pull请求
func (*PullMessageService) Shutdown ¶
func (service *PullMessageService) Shutdown()
func (*PullMessageService) Start ¶
func (service *PullMessageService) Start()
type PullResultExt ¶
type PullResultExt struct { *consumer.PullResult // contains filtered or unexported fields }
func NewPullResultExt ¶
func NewPullResultExt(pullStatus consumer.PullStatus, nextBeginOffset int64, minOffset int64, maxOffset int64, msgFoundList []*message.MessageExt, suggestWhichBrokerId int64, messageBinary []byte) *PullResultExt
type RebalanceImpl ¶
type RebalanceImpl interface { // 消费类型(主动或被动) ConsumeType() heartbeat.ConsumeType // 队列变更主要用于Schedule service for pull consumer todo 暂未实现 MessageQueueChanged(topic string, mqAll set.Set, mqDivided set.Set) // 删除不必要的,非法的queue RemoveUnnecessaryMessageQueue(mq *message.MessageQueue, pq *consumer.ProcessQueue) bool // 分发拉取消息请求(拉取消息真正入口) DispatchPullRequest(pullRequestList []*consumer.PullRequest) // 计算队列拉取位置 ComputePullFromWhere(mq *message.MessageQueue) int64 }
type RebalanceImplExt ¶
type RebalanceImplExt struct { RebalanceImpl RebalanceImpl ProcessQueueTable *sync.Map //*MessageQueue, *ProcessQueue TopicSubscribeInfoTable *sync.Map //topic Set<*MessageQueue> SubscriptionInner *sync.Map //topic, *SubscriptionData ConsumerGroup string MessageModel heartbeat.MessageModel AllocateMessageQueueStrategy rebalance.AllocateMessageQueueStrategy MQClientFactory *MQClientInstance }
func NewRebalanceImplExt ¶
func NewRebalanceImplExt(rebalanceImpl RebalanceImpl) *RebalanceImplExt
func (*RebalanceImplExt) RemoveProcessQueue ¶
func (ext *RebalanceImplExt) RemoveProcessQueue(mq *message.MessageQueue)
删除内存中非法的消费处理队列
type RebalancePullImpl ¶
type RebalancePullImpl struct { *RebalanceImplExt // contains filtered or unexported fields }
func NewRebalancePullImpl ¶
func NewRebalancePullImpl(defaultMQPullConsumerImpl *DefaultMQPullConsumerImpl) *RebalancePullImpl
func (*RebalancePullImpl) ComputePullFromWhere ¶
func (pullImpl *RebalancePullImpl) ComputePullFromWhere(mq *message.MessageQueue) int64
func (*RebalancePullImpl) ConsumeType ¶
func (pullImpl *RebalancePullImpl) ConsumeType() heartbeat.ConsumeType
func (*RebalancePullImpl) DispatchPullRequest ¶
func (pullImpl *RebalancePullImpl) DispatchPullRequest(pullRequestList []*consumer.PullRequest)
func (*RebalancePullImpl) MessageQueueChanged ¶
func (*RebalancePullImpl) RemoveUnnecessaryMessageQueue ¶
func (pullImpl *RebalancePullImpl) RemoveUnnecessaryMessageQueue(mq *message.MessageQueue, pq *consumer.ProcessQueue) bool
type RebalancePushImpl ¶
type RebalancePushImpl struct {
// contains filtered or unexported fields
}
func NewRebalancePushImpl ¶
func NewRebalancePushImpl(defaultMQPushConsumerImpl *DefaultMQPushConsumerImpl) *RebalancePushImpl
func (*RebalancePushImpl) ComputePullFromWhere ¶
func (pushImpl *RebalancePushImpl) ComputePullFromWhere(mq *message.MessageQueue) int64
func (*RebalancePushImpl) ConsumeType ¶
func (pushImpl *RebalancePushImpl) ConsumeType() heartbeat.ConsumeType
func (*RebalancePushImpl) DispatchPullRequest ¶
func (pushImpl *RebalancePushImpl) DispatchPullRequest(pullRequestList []*consumer.PullRequest)
func (*RebalancePushImpl) MessageQueueChanged ¶
func (*RebalancePushImpl) RemoveUnnecessaryMessageQueue ¶
func (pushImpl *RebalancePushImpl) RemoveUnnecessaryMessageQueue(mq *message.MessageQueue, pq *consumer.ProcessQueue) bool
type RebalanceService ¶
type RebalanceService struct { MQClientFactory *MQClientInstance WaitInterval int //单位秒 Wakeup chan bool // contains filtered or unexported fields }
func NewRebalanceService ¶
func NewRebalanceService(mqClientFactory *MQClientInstance) *RebalanceService
func (*RebalanceService) Shutdown ¶
func (service *RebalanceService) Shutdown()
func (*RebalanceService) Start ¶
func (service *RebalanceService) Start()
type RemoteBrokerOffsetStore ¶
type RemoteBrokerOffsetStore struct {
// contains filtered or unexported fields
}
func NewRemoteBrokerOffsetStore ¶
func NewRemoteBrokerOffsetStore(mQClientFactory *MQClientInstance, groupName string) *RemoteBrokerOffsetStore
func (*RemoteBrokerOffsetStore) Load ¶
func (store *RemoteBrokerOffsetStore) Load()
func (*RemoteBrokerOffsetStore) Persist ¶
func (store *RemoteBrokerOffsetStore) Persist(mq *message.MessageQueue)
func (*RemoteBrokerOffsetStore) PersistAll ¶
func (store *RemoteBrokerOffsetStore) PersistAll(mqs set.Set)
func (*RemoteBrokerOffsetStore) ReadOffset ¶
func (rStore *RemoteBrokerOffsetStore) ReadOffset(mq *message.MessageQueue, rType store.ReadOffsetType) int64
func (*RemoteBrokerOffsetStore) RemoveOffset ¶
func (store *RemoteBrokerOffsetStore) RemoveOffset(mq *message.MessageQueue)
func (*RemoteBrokerOffsetStore) UpdateOffset ¶
func (store *RemoteBrokerOffsetStore) UpdateOffset(mq *message.MessageQueue, offset int64, increaseOnly bool)
type SendCallback ¶
type SendCallback func(sendResult *SendResult, err error)
type SendCallback interface { OnSuccess(sendResult *SendResult) }
type SendResult ¶
type SendResult struct { SendStatus SendStatus MsgId string MessageQueue *message.MessageQueue QueueOffset int64 TransactionId string }
func NewSendResult ¶
func NewSendResult(sendStatus SendStatus, msgId string, messageQueue *message.MessageQueue, queueOffset int64, projectGroupPrefix string) *SendResult
func (*SendResult) ToString ¶
func (sendResult *SendResult) ToString() string
type SendStatus ¶
type SendStatus int
const ( SEND_OK SendStatus = iota FLUSH_DISK_TIMEOUT FLUSH_SLAVE_TIMEOUT SLAVE_NOT_AVAILABLE )
func (SendStatus) String ¶
func (status SendStatus) String() string
type TopicPublishInfo ¶
type TopicPublishInfo struct { OrderTopic bool HaveTopicRouterInfo bool MessageQueueList []*message.MessageQueue SendWhichQueue int64 }
TopicPublishInfo: topic发布信息 Author: yintongqiang Since: 2017/8/10
func NewTopicPublishInfo ¶
func NewTopicPublishInfo() *TopicPublishInfo
func (*TopicPublishInfo) SelectOneMessageQueue ¶
func (topicPublishInfo *TopicPublishInfo) SelectOneMessageQueue(lastBrokerName string) *message.MessageQueue
取模获取选择队列
func (*TopicPublishInfo) ToString ¶
func (self *TopicPublishInfo) ToString() string
Source Files ¶
- client_remoting_processor.go
- communication_mode.go
- consume_message_concurrently_service.go
- consume_message_service.go
- default_mq_producer.go
- default_mq_producer_impl.go
- default_mq_pull_consumer.go
- default_mq_pull_consumer_impl.go
- default_mq_push_consumer.go
- default_mq_push_consumer_impl.go
- find_broker_result.go
- local_file_offset_store.go
- mq_admin_impl.go
- mq_client_api_ext_impl.go
- mq_client_api_impl.go
- mq_client_instance.go
- mq_client_manager.go
- mq_producer.go
- mq_producer_inner.go
- pull_api_wrapper.go
- pull_call_back.go
- pull_message_service.go
- pull_result_ext.go
- rebalance_impl.go
- rebalance_pull_impl.go
- rebalance_push_impl.go
- rebalance_service.go
- remote_broker_offset_store.go
- send_callback.go
- send_result.go
- send_status.go
- topic_publish_info.go
- validators.go