Documentation ¶
Index ¶
- Constants
- func DoResponse(ctx netm.Context, request *protocol.RemotingCommand, ...)
- func GetConsumerOffsetPath(rootDir string) string
- func GetSubscriptionGroupPath(rootDir string) string
- func GetTopicConfigPath(rootDir string) string
- type AbstractSendMessageProcessor
- func (asmp *AbstractSendMessageProcessor) ExecuteSendMessageHookAfter(response *protocol.RemotingCommand, context *mqtrace.SendMessageContext)
- func (asmp *AbstractSendMessageProcessor) ExecuteSendMessageHookBefore(ctx netm.Context, request *protocol.RemotingCommand, ...)
- func (asmp *AbstractSendMessageProcessor) HasSendMessageHook() bool
- func (asmp *AbstractSendMessageProcessor) RegisterSendMessageHook(sendMessageHookList []mqtrace.SendMessageHook)
- type AdminBrokerProcessor
- type Broker2Client
- func (b2c *Broker2Client) CallClient(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)
- func (b2c *Broker2Client) CheckProducerTransactionState(channel netm.Context, requestHeader *header.CheckTransactionStateRequestHeader, ...)
- func (b2c *Broker2Client) GetConsumeStatus(topic, group, originClientId string) *protocol.RemotingCommand
- func (b2c *Broker2Client) ResetOffset(topic, group string, timeStamp int64, isForce bool) *protocol.RemotingCommand
- type BrokerAllConfig
- type BrokerController
- func (self *BrokerController) EncodeAllConfig() string
- func (self *BrokerController) GetBrokerAddr() string
- func (self *BrokerController) GetStoreHost() string
- func (self *BrokerController) Initialize() bool
- func (self *BrokerController) RegisterBrokerAll(checkOrderConfig bool, oneway bool)
- func (self *BrokerController) RegisterConsumeMessageHook(hook mqtrace.ConsumeMessageHook)
- func (self *BrokerController) RegisterSendMessageHook(hook mqtrace.SendMessageHook)
- func (self *BrokerController) Shutdown()
- func (self *BrokerController) Start()
- func (self *BrokerController) UpdateAllConfig(properties []byte)
- type BrokerControllerTask
- type ClientHouseKeepingService
- func (self *ClientHouseKeepingService) OnContextClose(ctx netm.Context)
- func (self *ClientHouseKeepingService) OnContextConnect(ctx netm.Context)
- func (self *ClientHouseKeepingService) OnContextError(ctx netm.Context)
- func (self *ClientHouseKeepingService) OnContextIdle(ctx netm.Context)
- func (self *ClientHouseKeepingService) Shutdown()
- func (self *ClientHouseKeepingService) Start()
- type ClientManageProcessor
- func (cmp *ClientManageProcessor) ExecuteConsumeMessageHookAfter(context *mqtrace.ConsumeMessageContext)
- func (cmp *ClientManageProcessor) HasConsumeMessageHook() bool
- func (cmp *ClientManageProcessor) ProcessRequest(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)
- func (cmp *ClientManageProcessor) RegisterConsumeMessageHook(consumeMessageHookList []mqtrace.ConsumeMessageHook)
- type ConfigManager
- type ConfigManagerExt
- type ConsumerOffsetManager
- func (com *ConsumerOffsetManager) CloneOffset(srcGroup, destGroup, topic string)
- func (com *ConsumerOffsetManager) CommitOffset(group, topic string, queueId int, offset int64)
- func (com *ConsumerOffsetManager) ConfigFilePath() string
- func (com *ConsumerOffsetManager) Decode(buf []byte)
- func (com *ConsumerOffsetManager) Encode(prettyFormat bool) string
- func (com *ConsumerOffsetManager) Load() bool
- func (com *ConsumerOffsetManager) Persist()
- func (com *ConsumerOffsetManager) QueryMinOffsetInAllGroup(topic, filterGroups string) map[int]int64
- func (com *ConsumerOffsetManager) QueryOffset(group, topic string, queueId int) int64
- func (com *ConsumerOffsetManager) QueryOffsetByGroupAndTopic(group, topic string) map[int]int64
- func (self *ConsumerOffsetManager) ScanUnsubscribedTopic()
- func (com *ConsumerOffsetManager) WhichGroupByTopic(topic string) set.Set
- func (com *ConsumerOffsetManager) WhichTopicByConsumer(group string) set.Set
- type DefaultConsumerIdsChangeListener
- type DefaultTransactionCheckExecuter
- type EndTransactionProcessor
- type FilterServerInfo
- type FilterServerManager
- func (fsm *FilterServerManager) BuildNewFilterServerList() (filterServerAdds []string)
- func (fsm *FilterServerManager) RegisterFilterServer(ctx netm.Context, filterServerAddr string)
- func (fsm *FilterServerManager) ScanNotActiveChannel()
- func (fsm *FilterServerManager) Shutdown()
- func (fsm *FilterServerManager) Start()
- type OffsetTable
- func (table *OffsetTable) Foreach(fn func(k string, v map[int]int64))
- func (table *OffsetTable) Get(k string) map[int]int64
- func (j *OffsetTable) MarshalJSON() ([]byte, error)
- func (j *OffsetTable) MarshalJSONBuf(buf fflib.EncodingBuffer) error
- func (table *OffsetTable) Put(k string, v map[int]int64)
- func (table *OffsetTable) PutAll(offsetMap *syncmap.Map)
- func (table *OffsetTable) Remove(k string) map[int]int64
- func (table *OffsetTable) RemoveByFlag(fn func(k string, v map[int]int64) bool)
- func (table *OffsetTable) Size() int
- func (j *OffsetTable) UnmarshalJSON(input []byte) error
- func (j *OffsetTable) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error
- type PullMessageProcessor
- func (pull *PullMessageProcessor) ExecuteConsumeMessageHookBefore(context *mqtrace.ConsumeMessageContext)
- func (pull *PullMessageProcessor) ExecuteRequestWhenWakeup(ctx netm.Context, request *protocol.RemotingCommand)
- func (pull *PullMessageProcessor) ProcessRequest(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)
- func (pull *PullMessageProcessor) RegisterConsumeMessageHook(consumeMessageHookList []mqtrace.ConsumeMessageHook)
- type PullRequestHoldService
- type QueryMessageProcessor
- func (qmp *QueryMessageProcessor) ProcessRequest(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)
- func (qmp *QueryMessageProcessor) QueryMessage(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)
- func (qmp *QueryMessageProcessor) ViewMessageById(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)
- type RebalanceLockManager
- func (manager *RebalanceLockManager) TryLock(group string, mq *message.MessageQueue, clientId string) bool
- func (manager *RebalanceLockManager) TryLockBatch(group string, mqs set.Set, clientId string) set.Set
- func (manager *RebalanceLockManager) UnlockBatch(group string, mqs set.Set, clientId string)
- type SendMessageProcessor
- func (smp *SendMessageProcessor) ConsumerSendMsgBack(conn netm.Context, request *protocol.RemotingCommand) (remotingCommand *protocol.RemotingCommand)
- func (smp *SendMessageProcessor) ExecuteConsumeMessageHookAfter(context *mqtrace.ConsumeMessageContext)
- func (smp *SendMessageProcessor) HasConsumeMessageHook() bool
- func (smp *SendMessageProcessor) HasSendMessageHook() bool
- func (smp *SendMessageProcessor) ProcessRequest(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)
- func (smp *SendMessageProcessor) RegisterConsumeMessageHook(consumeMessageHookList []mqtrace.ConsumeMessageHook)
- func (smp *SendMessageProcessor) RegisterSendMessageHook(sendMessageHookList []mqtrace.SendMessageHook)
- func (smp *SendMessageProcessor) SendMessage(ctx netm.Context, request *protocol.RemotingCommand, ...) *protocol.RemotingCommand
- type SlaveSynchronize
- type SubscriptionGroupManager
- func (self *SubscriptionGroupManager) ConfigFilePath() string
- func (self *SubscriptionGroupManager) Decode(buf []byte)
- func (self *SubscriptionGroupManager) DeleteSubscriptionGroupConfig(groupName string)
- func (self *SubscriptionGroupManager) Encode(prettyFormat bool) string
- func (self *SubscriptionGroupManager) FindSubscriptionGroupConfig(group string) *subscription.SubscriptionGroupConfig
- func (self *SubscriptionGroupManager) Load() bool
- func (self *SubscriptionGroupManager) UpdateSubscriptionGroupConfig(config *subscription.SubscriptionGroupConfig)
- type TopicConfigManager
- func (self *TopicConfigManager) ConfigFilePath() string
- func (tcm *TopicConfigManager) CreateTopicInSendMessageBackMethod(topic string, clientDefaultTopicQueueNums int32, perm, topicSysFlag int) (topicConfig *stgcommon.TopicConfig, err error)
- func (tcm *TopicConfigManager) CreateTopicInSendMessageMethod(topic, defaultTopic, remoteAddress string, clientDefaultTopicQueueNums int32, ...) (topicConfig *stgcommon.TopicConfig, err error)
- func (self *TopicConfigManager) Decode(content []byte)
- func (tcm *TopicConfigManager) DeleteTopicConfig(topic string)
- func (self *TopicConfigManager) Encode(prettyFormat bool) string
- func (tcm *TopicConfigManager) IsOrderTopic(topic string) bool
- func (tcm *TopicConfigManager) Load() bool
- func (tcm *TopicConfigManager) SelectTopicConfig(topic string) *stgcommon.TopicConfig
- func (tcm *TopicConfigManager) UpdateTopicConfig(topicConfig *stgcommon.TopicConfig)
Constants ¶
const ( TOPIC_GROUP_SEPARATOR = "@" MAX_VALUE = 0x7fffffffffffffff )
const (
DLQ_NUMS_PER_GROUP = 1
)
Variables ¶
This section is empty.
Functions ¶
func DoResponse ¶
func DoResponse(ctx netm.Context, request *protocol.RemotingCommand, response *protocol.RemotingCommand)
func GetConsumerOffsetPath ¶
GetConsumerOffsetPath 获取consumerOffset.json路径 Author gaoyanlei Since 2017/8/21
func GetSubscriptionGroupPath ¶
GetSubscriptionGroupPath 获取subscriptionGroup.json路径 Author gaoyanlei Since 2017/8/21
func GetTopicConfigPath ¶
GetTopicConfigPath 获取topic.json路径 Author gaoyanlei Since 2017/8/21
Types ¶
type AbstractSendMessageProcessor ¶
type AbstractSendMessageProcessor struct { BrokerController *BrokerController Rand *rand.Rand StoreHost string // contains filtered or unexported fields }
AbstractSendMessageProcessor 发送处理类 Author gaoyanlei Since 2017/8/14
func NewAbstractSendMessageProcessor ¶
func NewAbstractSendMessageProcessor(brokerController *BrokerController) *AbstractSendMessageProcessor
NewAbstractSendMessageProcessor 初始化AbstractSendMessageProcessor Author gaoyanlei Since 2017/8/14
func (*AbstractSendMessageProcessor) ExecuteSendMessageHookAfter ¶
func (asmp *AbstractSendMessageProcessor) ExecuteSendMessageHookAfter(response *protocol.RemotingCommand, context *mqtrace.SendMessageContext)
ExecuteSendMessageHookAfter 发送消息后执行回调函数 Author rongzhihong Since 2017/9/11
func (*AbstractSendMessageProcessor) ExecuteSendMessageHookBefore ¶
func (asmp *AbstractSendMessageProcessor) ExecuteSendMessageHookBefore(ctx netm.Context, request *protocol.RemotingCommand, context *mqtrace.SendMessageContext)
ExecuteSendMessageHookBefore 发送消息前执行回调函数 Author rongzhihong Since 2017/9/11
func (*AbstractSendMessageProcessor) HasSendMessageHook ¶
func (asmp *AbstractSendMessageProcessor) HasSendMessageHook() bool
HasSendMessageHook 检查SendMessageHookList的长度 Author rongzhihong Since 2017/9/11
func (*AbstractSendMessageProcessor) RegisterSendMessageHook ¶
func (asmp *AbstractSendMessageProcessor) RegisterSendMessageHook(sendMessageHookList []mqtrace.SendMessageHook)
RegisterSendMessageHook 注册赋值 Author rongzhihong Since 2017/9/11
type AdminBrokerProcessor ¶
type AdminBrokerProcessor struct {
BrokerController *BrokerController
}
AdminBrokerProcessor 管理类请求处理 Author gaoyanlei Since 2017/8/23
func NewAdminBrokerProcessor ¶
func NewAdminBrokerProcessor(controller *BrokerController) *AdminBrokerProcessor
NewAdminBrokerProcessor 初始化 Author gaoyanlei Since 2017/8/23
func (*AdminBrokerProcessor) ProcessRequest ¶
func (self *AdminBrokerProcessor) ProcessRequest(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)
ProcessRequest 请求入口 Author rongzhihong Since 2017/8/23
func (*AdminBrokerProcessor) ViewBrokerStatsData ¶
func (abp *AdminBrokerProcessor) ViewBrokerStatsData(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)
ViewBrokerStatsData 查看Broker统计信息 Author rongzhihong Since 2017/9/19
type Broker2Client ¶
type Broker2Client struct {
BrokerController *BrokerController
}
Broker2Client Broker主动调用客户端接口 Author gaoyanlei Since 2017/8/9
func NewBroker2Clientr ¶
func NewBroker2Clientr(brokerController *BrokerController) *Broker2Client
NewBroker2Clientr 初始化Broker2Client Author gaoyanlei Since 2017/8/9
func (*Broker2Client) CallClient ¶
func (b2c *Broker2Client) CallClient(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)
CallClient 调用客户端 Author rongzhihong Since 2017/9/18
func (*Broker2Client) CheckProducerTransactionState ¶
func (b2c *Broker2Client) CheckProducerTransactionState(channel netm.Context, requestHeader *header.CheckTransactionStateRequestHeader, selectMapedBufferResult *stgstorelog.SelectMapedBufferResult)
CheckProducerTransactionState Broker主动回查Producer事务状态,Oneway Author rongzhihong Since 2017/9/11
func (*Broker2Client) GetConsumeStatus ¶
func (b2c *Broker2Client) GetConsumeStatus(topic, group, originClientId string) *protocol.RemotingCommand
GetConsumeStatus Broker主动获取Consumer端的消息情况 Author rongzhihong Since 2017/9/18
func (*Broker2Client) ResetOffset ¶
func (b2c *Broker2Client) ResetOffset(topic, group string, timeStamp int64, isForce bool) *protocol.RemotingCommand
ResetOffset Broker 主动通知 Consumer,offset列表发生变化,需要进行重置 Author rongzhihong Since 2017/9/18
type BrokerAllConfig ¶
type BrokerAllConfig struct { BrokerConfig *stgcommon.BrokerConfig `json:"brokerConfig"` MessageStoreConfig *stgstorelog.MessageStoreConfig `json:"messageStoreConfig"` }
BrokerAllConfig Broker配置文件信息 Author rongzhihong Since 2017/9/12
func NewBrokerAllConfig ¶
func NewBrokerAllConfig() *BrokerAllConfig
NewBrokerAllConfig Broker配置文件信息初始化 Author rongzhihong Since 2017/9/12
func NewDefaultBrokerAllConfig ¶
func NewDefaultBrokerAllConfig(brokerConfig *stgcommon.BrokerConfig, messageStoreConfig *stgstorelog.MessageStoreConfig) *BrokerAllConfig
NewDefaultBrokerAllConfig Broker配置文件信息初始化 Author: tianyuliang Since: 2017/9/27
type BrokerController ¶
type BrokerController struct { BrokerConfig *stgcommon.BrokerConfig MessageStoreConfig *stgstorelog.MessageStoreConfig ConfigDataVersion *stgcommon.DataVersion ConsumerOffsetManager *ConsumerOffsetManager ConsumerManager *client.ConsumerManager ProducerManager *client.ProducerManager ClientHousekeepingService *ClientHouseKeepingService DefaultTransactionCheckExecuter *DefaultTransactionCheckExecuter PullMessageProcessor *PullMessageProcessor PullRequestHoldService *PullRequestHoldService Broker2Client *Broker2Client SubscriptionGroupManager *SubscriptionGroupManager ConsumerIdsChangeListener rebalance.ConsumerIdsChangeListener RebalanceLockManager *RebalanceLockManager BrokerOuterAPI *out.BrokerOuterAPI SlaveSynchronize *SlaveSynchronize MessageStore *stgstorelog.DefaultMessageStore RemotingClient *remoting.DefalutRemotingClient RemotingServer *remoting.DefalutRemotingServer TopicConfigManager *TopicConfigManager UpdateMasterHAServerAddrPeriodically bool FilterServerManager *FilterServerManager StoreHost string ConfigFile string // contains filtered or unexported fields }
BrokerController broker服务控制器 Author gaoyanlei Since 2017/8/25
func CreateBrokerController ¶
func CreateBrokerController(smartgoBrokerFilePath ...string) *BrokerController
CreateBrokerController 创建BrokerController对象 Author: tianyuliang Since: 2017/9/20
func NewBrokerController ¶
func NewBrokerController(brokerConfig *stgcommon.BrokerConfig, messageStoreConfig *stgstorelog.MessageStoreConfig, remotingClient *remoting.DefalutRemotingClient) *BrokerController
NewBrokerController 初始化broker服务控制器 Author gaoyanlei Since 2017/8/25
func Start ¶
func Start(stopChan chan bool, smartgoBrokerFilePath string) *BrokerController
Start 启动BrokerController Author: tianyuliang Since: 2017/9/20
func (*BrokerController) EncodeAllConfig ¶
func (self *BrokerController) EncodeAllConfig() string
EncodeAllConfig 读取所有配置文件信息 Author rongzhihong Since 2017/9/12
func (*BrokerController) GetBrokerAddr ¶
func (self *BrokerController) GetBrokerAddr() string
GetBrokerAddr 获得brokerAddr Author rongzhihong Since 2017/9/5
func (*BrokerController) GetStoreHost ¶
func (self *BrokerController) GetStoreHost() string
GetStoreHost 获取StoreHost Author: tianyuliang, <tianyuliang@gome.com.cn> Since: 2017/9/26
func (*BrokerController) Initialize ¶
func (self *BrokerController) Initialize() bool
Initialize 初始化broker必要操作 Author rongzhihong Since 2017/9/12
func (*BrokerController) RegisterBrokerAll ¶
func (self *BrokerController) RegisterBrokerAll(checkOrderConfig bool, oneway bool)
RegisterBrokerAll 注册所有broker Author rongzhihong Since 2017/9/12
func (*BrokerController) RegisterConsumeMessageHook ¶
func (self *BrokerController) RegisterConsumeMessageHook(hook mqtrace.ConsumeMessageHook)
RegisterConsumeMessageHook 注册消费消息的回调 Author rongzhihong Since 2017/9/11
func (*BrokerController) RegisterSendMessageHook ¶
func (self *BrokerController) RegisterSendMessageHook(hook mqtrace.SendMessageHook)
RegisterSendMessageHook 注册发送消息的回调 Author rongzhihong Since 2017/9/11
func (*BrokerController) Shutdown ¶
func (self *BrokerController) Shutdown()
Shutdown BrokerController停止入口 Author rongzhihong Since 2017/9/12
func (*BrokerController) Start ¶
func (self *BrokerController) Start()
Start BrokerController控制器的start启动入口 Author rongzhihong Since 2017/9/12
func (*BrokerController) UpdateAllConfig ¶
func (self *BrokerController) UpdateAllConfig(properties []byte)
UpdateAllConfig 更新所有文件 Author rongzhihong Since 2017/9/12
type BrokerControllerTask ¶
type BrokerControllerTask struct { BrokerController *BrokerController DeleteTopicTask *timeutil.Ticker BrokerStatsRecordTask *timeutil.Ticker PersistConsumerOffsetTask *timeutil.Ticker ScanUnSubscribedTopicTask *timeutil.Ticker FetchNameServerAddrTask *timeutil.Ticker SlaveSynchronizeTask *timeutil.Ticker PrintMasterAndSlaveDiffTask *timeutil.Ticker RegisterAllBrokerTask *timeutil.Ticker }
BrokerControllerTask broker控制器的各种任务 Author: tianyuliang Since: 2017/10/11
func NewBrokerControllerTask ¶
func NewBrokerControllerTask(controller *BrokerController) *BrokerControllerTask
func (*BrokerControllerTask) Shutdown ¶
func (self *BrokerControllerTask) Shutdown() bool
type ClientHouseKeepingService ¶
type ClientHouseKeepingService struct {
// contains filtered or unexported fields
}
ClientHouseKeepingService 定期检测客户端连接,清除不活动的连接 Author rongzhihong Since 2017/9/8
func NewClientHousekeepingService ¶
func NewClientHousekeepingService(controller *BrokerController) *ClientHouseKeepingService
NewClientHousekeepingService 初始化定期检查客户端连接的服务 Author rongzhihong Since 2017/9/8
func (*ClientHouseKeepingService) OnContextClose ¶
func (self *ClientHouseKeepingService) OnContextClose(ctx netm.Context)
OnContextClose 监听通道关闭 Author rongzhihong Since 2017/9/8
func (*ClientHouseKeepingService) OnContextConnect ¶
func (self *ClientHouseKeepingService) OnContextConnect(ctx netm.Context)
OnContextConnect 监听通道连接 Author rongzhihong Since 2017/9/8
func (*ClientHouseKeepingService) OnContextError ¶
func (self *ClientHouseKeepingService) OnContextError(ctx netm.Context)
OnContextError 监听通道异常 Author rongzhihong Since 2017/9/8
func (*ClientHouseKeepingService) OnContextIdle ¶
func (self *ClientHouseKeepingService) OnContextIdle(ctx netm.Context)
OnContextIdle 监听通道闲置 Author rongzhihong Since 2017/9/8
func (*ClientHouseKeepingService) Shutdown ¶
func (self *ClientHouseKeepingService) Shutdown()
Shutdown 停止定时扫描过期的连接的服务 Author rongzhihong Since 2017/9/8
func (*ClientHouseKeepingService) Start ¶
func (self *ClientHouseKeepingService) Start()
Start 启动定时扫描过期的连接的服务 Author rongzhihong Since 2017/9/8
type ClientManageProcessor ¶
type ClientManageProcessor struct { BrokerController *BrokerController // contains filtered or unexported fields }
func NewClientManageProcessor ¶
func NewClientManageProcessor(controller *BrokerController) *ClientManageProcessor
NewClientManageProcessor 初始化ClientManageProcessor Author gaoyanlei Since 2017/8/9
func (*ClientManageProcessor) ExecuteConsumeMessageHookAfter ¶
func (cmp *ClientManageProcessor) ExecuteConsumeMessageHookAfter(context *mqtrace.ConsumeMessageContext)
ExecuteConsumeMessageHookAfter 消费消息后执行的回调函数 Author rongzhihong Since 2017/9/14
func (*ClientManageProcessor) HasConsumeMessageHook ¶
func (cmp *ClientManageProcessor) HasConsumeMessageHook() bool
HasConsumeMessageHook 判断是否有回调函数 Author rongzhihong Since 2017/9/14
func (*ClientManageProcessor) ProcessRequest ¶
func (cmp *ClientManageProcessor) ProcessRequest(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)
func (*ClientManageProcessor) RegisterConsumeMessageHook ¶
func (cmp *ClientManageProcessor) RegisterConsumeMessageHook(consumeMessageHookList []mqtrace.ConsumeMessageHook)
RegisterConsumeMessageHook 注册回调函数 Author rongzhihong Since 2017/9/14
type ConfigManager ¶
type ConfigManagerExt ¶
type ConfigManagerExt struct { ConfigManager ConfigManager sync.RWMutex }
func NewConfigManagerExt ¶
func NewConfigManagerExt(configManager ConfigManager) *ConfigManagerExt
func (*ConfigManagerExt) Load ¶
func (cme *ConfigManagerExt) Load() bool
func (*ConfigManagerExt) Persist ¶
func (cme *ConfigManagerExt) Persist()
type ConsumerOffsetManager ¶
type ConsumerOffsetManager struct { TOPIC_GROUP_SEPARATOR string Offsets *OffsetTable BrokerController *BrokerController // contains filtered or unexported fields }
ConsumerOffsetManager Consumer消费进度管理 Author gaoyanlei Since 2017/8/9
func NewConsumerOffsetManager ¶
func NewConsumerOffsetManager(brokerController *BrokerController) *ConsumerOffsetManager
NewConsumerOffsetManager 初始化ConsumerOffsetManager Author gaoyanlei Since 2017/8/9
func (*ConsumerOffsetManager) CloneOffset ¶
func (com *ConsumerOffsetManager) CloneOffset(srcGroup, destGroup, topic string)
CloneOffset 克隆偏移量 Author rongzhihong Since 2017/9/18
func (*ConsumerOffsetManager) CommitOffset ¶
func (com *ConsumerOffsetManager) CommitOffset(group, topic string, queueId int, offset int64)
CommitOffset 提交offset Author gaoyanlei Since 2017/9/10
func (*ConsumerOffsetManager) ConfigFilePath ¶
func (com *ConsumerOffsetManager) ConfigFilePath() string
func (*ConsumerOffsetManager) Decode ¶
func (com *ConsumerOffsetManager) Decode(buf []byte)
func (*ConsumerOffsetManager) Encode ¶
func (com *ConsumerOffsetManager) Encode(prettyFormat bool) string
func (*ConsumerOffsetManager) Load ¶
func (com *ConsumerOffsetManager) Load() bool
func (*ConsumerOffsetManager) Persist ¶
func (com *ConsumerOffsetManager) Persist()
func (*ConsumerOffsetManager) QueryMinOffsetInAllGroup ¶
func (com *ConsumerOffsetManager) QueryMinOffsetInAllGroup(topic, filterGroups string) map[int]int64
QueryMinOffsetInAllGroup 查询所有组中最小偏移量 Author rongzhihong Since 2017/9/18
func (*ConsumerOffsetManager) QueryOffset ¶
func (com *ConsumerOffsetManager) QueryOffset(group, topic string, queueId int) int64
QueryOffset 获取group下topic queueId 的offset Author gaoyanlei Since 2017/9/10
func (*ConsumerOffsetManager) QueryOffsetByGroupAndTopic ¶
func (com *ConsumerOffsetManager) QueryOffsetByGroupAndTopic(group, topic string) map[int]int64
QueryOffsetByGroupAndTopic 获取group与topic所有队列offset Author rongzhihong Since 2017/9/12
func (*ConsumerOffsetManager) ScanUnsubscribedTopic ¶
func (self *ConsumerOffsetManager) ScanUnsubscribedTopic()
ScanUnsubscribedTopic 扫描被删除Topic,并删除该Topic对应的Offset Author gaoyanlei Since 2017/8/22
func (*ConsumerOffsetManager) WhichGroupByTopic ¶
func (com *ConsumerOffsetManager) WhichGroupByTopic(topic string) set.Set
WhichGroupByTopic 获得Topic的消费者 Author rongzhihong Since 2017/9/18
func (*ConsumerOffsetManager) WhichTopicByConsumer ¶
func (com *ConsumerOffsetManager) WhichTopicByConsumer(group string) set.Set
WhichTopicByConsumer 获得消费者的Topic Author rongzhihong Since 2017/9/18
type DefaultConsumerIdsChangeListener ¶
type DefaultConsumerIdsChangeListener struct {
BrokerController *BrokerController
}
DefaultConsumerIdsChangeListener ConsumerId列表变化,通知所有Consumer Author gaoyanlei Since 2017/8/9
func NewDefaultConsumerIdsChangeListener ¶
func NewDefaultConsumerIdsChangeListener(brokerController *BrokerController) *DefaultConsumerIdsChangeListener
NewDefaultConsumerIdsChangeListener 初始化 Author gaoyanlei Since 2017/8/9
func (*DefaultConsumerIdsChangeListener) ConsumerIdsChanged ¶
func (listener *DefaultConsumerIdsChangeListener) ConsumerIdsChanged(group string, channels []netm.Context)
ConsumerIdsChanged 通知Consumer改变 Author gaoyanlei Since 2017/8/9
type DefaultTransactionCheckExecuter ¶
type DefaultTransactionCheckExecuter struct {
// contains filtered or unexported fields
}
DefaultTransactionCheckExecuter 存储层回调此接口,用来主动回查Producer的事务状态 Author rongzhihong Since 2017/9/17
func NewDefaultTransactionCheckExecuter ¶
func NewDefaultTransactionCheckExecuter(brokerController *BrokerController) *DefaultTransactionCheckExecuter
NewDefaultTransactionCheckExecuter 初始化事务 Author rongzhihong Since 2017/9/17
type EndTransactionProcessor ¶
type EndTransactionProcessor struct {
BrokerController *BrokerController
}
EndTransactionProcessor Commit或Rollback事务 Author rongzhihong Since 2017/9/18
func (*EndTransactionProcessor) ProcessRequest ¶
func (etp *EndTransactionProcessor) ProcessRequest(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)
ProcessRequest 请求 Author rongzhihong Since 2017/9/18
type FilterServerInfo ¶
type FilterServerInfo struct {
// contains filtered or unexported fields
}
FilterServerInfo FilterServer基本信息 Author rongzhihong Since 2017/9/8
type FilterServerManager ¶
type FilterServerManager struct { FilterServerMaxIdleTimeMills int64 // contains filtered or unexported fields }
FilterServerManager FilterServer管理 Author rongzhihong Since 2017/9/8
func NewFilterServerManager ¶
func NewFilterServerManager(bc *BrokerController) *FilterServerManager
NewFilterServerManager 初始化FilterServerManager Author rongzhihong Since 2017/9/8
func (*FilterServerManager) BuildNewFilterServerList ¶
func (fsm *FilterServerManager) BuildNewFilterServerList() (filterServerAdds []string)
BuildNewFilterServerList FilterServer地址列表 Author rongzhihong Since 2017/9/8
func (*FilterServerManager) RegisterFilterServer ¶
func (fsm *FilterServerManager) RegisterFilterServer(ctx netm.Context, filterServerAddr string)
RegisterFilterServer 注册FilterServer Author rongzhihong Since 2017/9/8
func (*FilterServerManager) ScanNotActiveChannel ¶
func (fsm *FilterServerManager) ScanNotActiveChannel()
ScanNotActiveChannel FilterServer每10s向Broker注册一次,Broker如果发现30s没有注册,则删除它 Author rongzhihong Since 2017/9/8
func (*FilterServerManager) Shutdown ¶
func (fsm *FilterServerManager) Shutdown()
Shutdown 停止检查Filter Server的定时任务 Author rongzhihong Since 2017/9/8
func (*FilterServerManager) Start ¶
func (fsm *FilterServerManager) Start()
Start 启动;定时检查Filter Server个数,数量不符合,则创建 Author rongzhihong Since 2017/9/8
type OffsetTable ¶
type OffsetTable struct { Offsets map[string]map[int]int64 `json:"offsets"` sync.RWMutex `json:"-"` }
func (*OffsetTable) MarshalJSON ¶
func (j *OffsetTable) MarshalJSON() ([]byte, error)
MarshalJSON marshal bytes to json - template
func (*OffsetTable) MarshalJSONBuf ¶
func (j *OffsetTable) MarshalJSONBuf(buf fflib.EncodingBuffer) error
MarshalJSONBuf marshal buff to json - template
func (*OffsetTable) PutAll ¶
func (table *OffsetTable) PutAll(offsetMap *syncmap.Map)
PutAll 同步Offset配置文件 Author rongzhihong Since 2017/9/18
func (*OffsetTable) RemoveByFlag ¶
func (table *OffsetTable) RemoveByFlag(fn func(k string, v map[int]int64) bool)
func (*OffsetTable) Size ¶
func (table *OffsetTable) Size() int
func (*OffsetTable) UnmarshalJSON ¶
func (j *OffsetTable) UnmarshalJSON(input []byte) error
UnmarshalJSON unmarshal json - template of ffjson
func (*OffsetTable) UnmarshalJSONFFLexer ¶
func (j *OffsetTable) UnmarshalJSONFFLexer(fs *fflib.FFLexer, state fflib.FFParseState) error
UnmarshalJSONFFLexer fast json unmarshal - template ffjson
type PullMessageProcessor ¶
type PullMessageProcessor struct { BrokerController *BrokerController ConsumeMessageHookList []mqtrace.ConsumeMessageHook }
PullMessageProcessor 拉消息请求处理 Author gaoyanlei Since 2017/8/10
func NewEndTransactionProcessor ¶
func NewEndTransactionProcessor(brokerController *BrokerController) *PullMessageProcessor
NewEndTransactionProcessor 初始化EndTransactionProcessor Author rongzhihong Since 2017/9/18
func NewPullMessageProcessor ¶
func NewPullMessageProcessor(brokerController *BrokerController) *PullMessageProcessor
NewPullMessageProcessor 初始化PullMessageProcessor Author gaoyanlei Since 2017/8/9
func (*PullMessageProcessor) ExecuteConsumeMessageHookBefore ¶
func (pull *PullMessageProcessor) ExecuteConsumeMessageHookBefore(context *mqtrace.ConsumeMessageContext)
ExecuteConsumeMessageHookBefore 消费消息前,执行回调 Author rongzhihong Since 2017/9/11
func (*PullMessageProcessor) ExecuteRequestWhenWakeup ¶
func (pull *PullMessageProcessor) ExecuteRequestWhenWakeup(ctx netm.Context, request *protocol.RemotingCommand)
ExecuteRequestWhenWakeup 唤醒拉取消息的请求 Author rongzhihong Since 2017/9/5
func (*PullMessageProcessor) ProcessRequest ¶
func (pull *PullMessageProcessor) ProcessRequest(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)
func (*PullMessageProcessor) RegisterConsumeMessageHook ¶
func (pull *PullMessageProcessor) RegisterConsumeMessageHook(consumeMessageHookList []mqtrace.ConsumeMessageHook)
RegisterConsumeMessageHook 注册消费消息回调 Author rongzhihong Since 2017/9/11
type PullRequestHoldService ¶
type PullRequestHoldService struct { TOPIC_QUEUEID_SEPARATOR string // contains filtered or unexported fields }
PullRequestHoldService 拉消息请求管理,如果拉不到消息,则在这里Hold住,等待消息到来 Author rongzhihong Since 2017/9/5
func NewPullRequestHoldService ¶
func NewPullRequestHoldService(brokerController *BrokerController) *PullRequestHoldService
NewPullRequestHoldService 初始化拉消息请求服务 Author rongzhihong Since 2017/9/5
func (*PullRequestHoldService) Shutdown ¶
func (serv *PullRequestHoldService) Shutdown()
Shutdown 停止 Author rongzhihong Since 2017/9/5
func (*PullRequestHoldService) Start ¶
func (serv *PullRequestHoldService) Start()
Start 启动入口 Author rongzhihong Since 2017/9/5
func (*PullRequestHoldService) SuspendPullRequest ¶
func (serv *PullRequestHoldService) SuspendPullRequest(topic string, queueId int32, pullRequest *longpolling.PullRequest)
SuspendPullRequest 延缓拉请求 Author rongzhihong Since 2017/9/5
type QueryMessageProcessor ¶
type QueryMessageProcessor struct {
BrokerController *BrokerController
}
QueryMessageProcessor 查询消息请求处理 Author rongzhihong Since 2017/9/18
func NewQueryMessageProcessor ¶
func NewQueryMessageProcessor(brokerController *BrokerController) *QueryMessageProcessor
NewQueryMessageProcessor 初始化QueryMessageProcessor Author rongzhihong Since 2017/9/18
func (*QueryMessageProcessor) ProcessRequest ¶
func (qmp *QueryMessageProcessor) ProcessRequest(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)
ProcessRequest 请求 Author rongzhihong Since 2017/9/18
func (*QueryMessageProcessor) QueryMessage ¶
func (qmp *QueryMessageProcessor) QueryMessage(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)
QueryMessage 查询请求 Author rongzhihong Since 2017/9/18
func (*QueryMessageProcessor) ViewMessageById ¶
func (qmp *QueryMessageProcessor) ViewMessageById(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)
ViewMessageById 根据MsgId查询消息 Author rongzhihong Since 2017/9/18
type RebalanceLockManager ¶
RebalanceLockManager 平衡锁管理 Author rongzhihong Since 2017/9/20
func NewRebalanceLockManager ¶
func NewRebalanceLockManager() *RebalanceLockManager
NewRebalanceLockManager 初始化 Author rongzhihong Since 2017/9/20
func (*RebalanceLockManager) TryLock ¶
func (manager *RebalanceLockManager) TryLock(group string, mq *message.MessageQueue, clientId string) bool
TryLock 尝试锁住 Author rongzhihong Since 2017/9/20
func (*RebalanceLockManager) TryLockBatch ¶
func (manager *RebalanceLockManager) TryLockBatch(group string, mqs set.Set, clientId string) set.Set
TryLockBatch 批量方式锁队列,返回锁定成功的队列集合 Author rongzhihong Since 2017/9/20
func (*RebalanceLockManager) UnlockBatch ¶
func (manager *RebalanceLockManager) UnlockBatch(group string, mqs set.Set, clientId string)
UnlockBatch 批量方式解锁队列 Author rongzhihong Since 2017/9/20
type SendMessageProcessor ¶
type SendMessageProcessor struct { BrokerController *BrokerController // contains filtered or unexported fields }
SendMessageProcessor 处理客户端发送消息的请求 Author gaoyanlei Since 2017/8/24
func NewSendMessageProcessor ¶
func NewSendMessageProcessor(brokerController *BrokerController) *SendMessageProcessor
func (*SendMessageProcessor) ConsumerSendMsgBack ¶
func (smp *SendMessageProcessor) ConsumerSendMsgBack(conn netm.Context, request *protocol.RemotingCommand) (remotingCommand *protocol.RemotingCommand)
ConsumerSendMsgBack 客户端返回未消费消息 Author gaoyanlei Since 2017/8/17
func (*SendMessageProcessor) ExecuteConsumeMessageHookAfter ¶
func (smp *SendMessageProcessor) ExecuteConsumeMessageHookAfter(context *mqtrace.ConsumeMessageContext)
ExecuteConsumeMessageHookAfter 消费消息后执行回调 Author rongzhihong Since 2017/9/5
func (*SendMessageProcessor) HasConsumeMessageHook ¶
func (smp *SendMessageProcessor) HasConsumeMessageHook() bool
HasConsumeMessageHook 判断是否存在消费消息回调 Author rongzhihong Since 2017/9/5
func (*SendMessageProcessor) HasSendMessageHook ¶
func (smp *SendMessageProcessor) HasSendMessageHook() bool
HasSendMessageHook 判断是否存在发送消息回调 Author rongzhihong Since 2017/9/5
func (*SendMessageProcessor) ProcessRequest ¶
func (smp *SendMessageProcessor) ProcessRequest(ctx netm.Context, request *protocol.RemotingCommand) (*protocol.RemotingCommand, error)
func (*SendMessageProcessor) RegisterConsumeMessageHook ¶
func (smp *SendMessageProcessor) RegisterConsumeMessageHook(consumeMessageHookList []mqtrace.ConsumeMessageHook)
RegisterConsumeMessageHook 注册赋值消费消息回调 Author rongzhihong Since 2017/9/5
func (*SendMessageProcessor) RegisterSendMessageHook ¶
func (smp *SendMessageProcessor) RegisterSendMessageHook(sendMessageHookList []mqtrace.SendMessageHook)
RegisterSendMessageHook 注册赋值发送消息回调 Author rongzhihong Since 2017/9/5
func (*SendMessageProcessor) SendMessage ¶
func (smp *SendMessageProcessor) SendMessage(ctx netm.Context, request *protocol.RemotingCommand, mqtraceContext *mqtrace.SendMessageContext, requestHeader *header.SendMessageRequestHeader) *protocol.RemotingCommand
SendMessage 正常消息 Author gaoyanlei Since 2017/8/17
type SlaveSynchronize ¶
type SlaveSynchronize struct { BrokerController *BrokerController // contains filtered or unexported fields }
SlaveSynchronize Slave从Master同步信息(非消息) Author gaoyanlei Since 2017/8/10
func NewSlaveSynchronize ¶
func NewSlaveSynchronize(brokerController *BrokerController) *SlaveSynchronize
NewSlaveSynchronize 初始化SlaveSynchronize Author gaoyanlei Since 2017/8/9
type SubscriptionGroupManager ¶
type SubscriptionGroupManager struct { BrokerController *BrokerController SubscriptionGroupTable *subscription.SubscriptionGroupTable ConfigManagerExt *ConfigManagerExt }
SubscriptionGroupManager 用来管理订阅组,包括订阅权限等 Author gaoyanlei Since 2017/8/9
func NewSubscriptionGroupManager ¶
func NewSubscriptionGroupManager(brokerController *BrokerController) *SubscriptionGroupManager
NewSubscriptionGroupManager 创建SubscriptionGroupManager Author gaoyanlei Since 2017/8/9
func (*SubscriptionGroupManager) ConfigFilePath ¶
func (self *SubscriptionGroupManager) ConfigFilePath() string
func (*SubscriptionGroupManager) Decode ¶
func (self *SubscriptionGroupManager) Decode(buf []byte)
func (*SubscriptionGroupManager) DeleteSubscriptionGroupConfig ¶
func (self *SubscriptionGroupManager) DeleteSubscriptionGroupConfig(groupName string)
DeleteSubscriptionGroupConfig 删除某个订阅组的配置 Author rongzhihong Since 2017/9/18
func (*SubscriptionGroupManager) Encode ¶
func (self *SubscriptionGroupManager) Encode(prettyFormat bool) string
func (*SubscriptionGroupManager) FindSubscriptionGroupConfig ¶
func (self *SubscriptionGroupManager) FindSubscriptionGroupConfig(group string) *subscription.SubscriptionGroupConfig
FindSubscriptionGroupConfig 查找订阅关系 Author gaoyanlei Since 2017/8/17
func (*SubscriptionGroupManager) Load ¶
func (self *SubscriptionGroupManager) Load() bool
func (*SubscriptionGroupManager) UpdateSubscriptionGroupConfig ¶
func (self *SubscriptionGroupManager) UpdateSubscriptionGroupConfig(config *subscription.SubscriptionGroupConfig)
UpdateSubscriptionGroupConfig 更新订阅组配置 Author rongzhihong Since 2017/9/18
type TopicConfigManager ¶
type TopicConfigManager struct { LockTimeoutMillis int64 BrokerController *BrokerController TopicConfigSerializeWrapper *body.TopicConfigSerializeWrapper SystemTopicList set.Set ConfigManagerExt *ConfigManagerExt DataVersion *stgcommon.DataVersion // contains filtered or unexported fields }
func NewTopicConfigManager ¶
func NewTopicConfigManager(brokerController *BrokerController) *TopicConfigManager
NewTopicConfigManager 初始化TopicConfigManager Author gaoyanlei Since 2017/8/9
func (*TopicConfigManager) ConfigFilePath ¶
func (self *TopicConfigManager) ConfigFilePath() string
func (*TopicConfigManager) CreateTopicInSendMessageBackMethod ¶
func (tcm *TopicConfigManager) CreateTopicInSendMessageBackMethod(topic string, clientDefaultTopicQueueNums int32, perm, topicSysFlag int) (topicConfig *stgcommon.TopicConfig, err error)
CreateTopicInSendMessageBackMethod 该方法没有判断broker权限. Author gaoyanlei Since 2017/8/11
func (*TopicConfigManager) CreateTopicInSendMessageMethod ¶
func (tcm *TopicConfigManager) CreateTopicInSendMessageMethod(topic, defaultTopic, remoteAddress string, clientDefaultTopicQueueNums int32, topicSysFlag int) (topicConfig *stgcommon.TopicConfig, err error)
CreateTopicInSendMessageMethod 创建topic Author gaoyanlei Since 2017/8/10
func (*TopicConfigManager) Decode ¶
func (self *TopicConfigManager) Decode(content []byte)
func (*TopicConfigManager) DeleteTopicConfig ¶
func (tcm *TopicConfigManager) DeleteTopicConfig(topic string)
DeleteTopicConfig 删除topic Author gaoyanlei Since 2017/8/10
func (*TopicConfigManager) Encode ¶
func (self *TopicConfigManager) Encode(prettyFormat bool) string
func (*TopicConfigManager) IsOrderTopic ¶
func (tcm *TopicConfigManager) IsOrderTopic(topic string) bool
IsOrderTopic 判断是否是顺序topic Author gaoyanlei Since 2017/8/10
func (*TopicConfigManager) Load ¶
func (tcm *TopicConfigManager) Load() bool
func (*TopicConfigManager) SelectTopicConfig ¶
func (tcm *TopicConfigManager) SelectTopicConfig(topic string) *stgcommon.TopicConfig
SelectTopicConfig 根据topic查找 Author gaoyanlei Since 2017/8/11
func (*TopicConfigManager) UpdateTopicConfig ¶
func (tcm *TopicConfigManager) UpdateTopicConfig(topicConfig *stgcommon.TopicConfig)
UpdateTopicConfig 更新topic信息 Author gaoyanlei Since 2017/8/10
Source Files ¶
- abstract_send_message_processor.go
- admin_broker_processor.go
- broker_all_config.go
- broker_controller.go
- broker_controller_task.go
- broker_path_config_helper.go
- broker_startup.go
- broker_to_client.go
- client_house_keeping_service.go
- client_manage_processor.go
- config_manager.go
- consumer_offset_manager.go
- default_consumer_ids_change_listener.go
- default_transaction_check_executer.go
- end_transaction_processor.go
- filter_server_manager.go
- offset_table.go
- offset_table_ffjson.go
- pull_message_processor.go
- pull_request_hold_service.go
- query_message_processor.go
- rebalance_lock_manager.go
- send_message_processor.go
- slave_synchronize.go
- subscription_group_manager.go
- topic_config_manager.go