Documentation ¶
Index ¶
- type ChannelInfo
- type ConsumerGroupInfo
- func (cg *ConsumerGroupInfo) FindChannel(clientId string) *ChannelInfo
- func (cg *ConsumerGroupInfo) FindSubscriptionData(topic string) *heartbeat.SubscriptionData
- func (cg *ConsumerGroupInfo) FindSubscriptionDataPlus(topic string) *heartbeat.SubscriptionDataPlus
- func (cg *ConsumerGroupInfo) GetAllChannel() []netm.Context
- func (cg *ConsumerGroupInfo) GetAllClientId() []string
- func (cg *ConsumerGroupInfo) SubscriptionTableToMap() map[string]*heartbeat.SubscriptionDataPlus
- func (cg *ConsumerGroupInfo) UnregisterChannel(clientChannelInfo *ChannelInfo)
- func (cg *ConsumerGroupInfo) UpdateChannel(infoNew *ChannelInfo, consumeType heartbeat.ConsumeType, ...) bool
- func (cg *ConsumerGroupInfo) UpdateSubscription(subList []heartbeat.SubscriptionDataPlus) bool
- type ConsumerManager
- func (cm *ConsumerManager) DoChannelCloseEvent(remoteAddr string, ctx netm.Context)
- func (cm *ConsumerManager) FindChannel(group, clientId string) *ChannelInfo
- func (cm *ConsumerManager) FindSubscriptionData(group, topic string) *heartbeat.SubscriptionDataPlus
- func (cm *ConsumerManager) FindSubscriptionDataCount(group string) int32
- func (cm *ConsumerManager) GetConsumerGroupInfo(group string) *ConsumerGroupInfo
- func (cm *ConsumerManager) QueryTopicConsumeByWho(topic string) set.Set
- func (cm *ConsumerManager) RegisterConsumer(group string, channelInfo *ChannelInfo, consumeType heartbeat.ConsumeType, ...) bool
- func (cm *ConsumerManager) ScanNotActiveChannel()
- func (cm *ConsumerManager) UnregisterConsumer(group string, channelInfo *ChannelInfo)
- type ConsumerTable
- type ProducerGroupConnTable
- func (table *ProducerGroupConnTable) ForeachByWPerm(fn func(k string, v map[string]*ChannelInfo))
- func (table *ProducerGroupConnTable) Get(k string) map[string]*ChannelInfo
- func (table *ProducerGroupConnTable) Put(k string, v map[string]*ChannelInfo)
- func (table *ProducerGroupConnTable) Remove(k string) map[string]*ChannelInfo
- func (table *ProducerGroupConnTable) Size() int
- type ProducerManager
- func (pm *ProducerManager) DoChannelCloseEvent(remoteAddr string, ctx netm.Context)
- func (pm *ProducerManager) GetGroupChannelTable() *ProducerGroupConnTable
- func (pm *ProducerManager) PickProducerChannelRandomly(producerGroupHashCode int) *ChannelInfo
- func (pm *ProducerManager) RegisterProducer(group string, channelInfo *ChannelInfo)
- func (pm *ProducerManager) ScanNotActiveChannel()
- func (pm *ProducerManager) UnregisterProducer(group string, channelInfo *ChannelInfo)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChannelInfo ¶
type ChannelInfo struct { Context netm.Context ClientId string LanguageCode string Addr string Version int32 LastUpdateTimestamp int64 }
func NewClientChannelInfo ¶
func (*ChannelInfo) ToString ¶
func (info *ChannelInfo) ToString() string
type ConsumerGroupInfo ¶
type ConsumerGroupInfo struct { GroupName string SubscriptionTable *sync.Map // key:Topic, val:SubscriptionDataPlus ConnTable *sync.Map // key: Channel.Addr() val: ChannelInfo ConsumeType heartbeat.ConsumeType MessageModel heartbeat.MessageModel ConsumeFromWhere heartbeat.ConsumeFromWhere // contains filtered or unexported fields }
ConsumerGroupInfo 整个Consumer Group信息 Author gaoyanlei Since 2017/8/17
func NewConsumerGroupInfo ¶
func NewConsumerGroupInfo(groupName string, consumeType heartbeat.ConsumeType, messageModel heartbeat.MessageModel, consumeFromWhere heartbeat.ConsumeFromWhere) *ConsumerGroupInfo
func (*ConsumerGroupInfo) FindChannel ¶
func (cg *ConsumerGroupInfo) FindChannel(clientId string) *ChannelInfo
FindChannel 根据clientId获得通道 Author rongzhihong Since 2017/9/17
func (*ConsumerGroupInfo) FindSubscriptionData ¶
func (cg *ConsumerGroupInfo) FindSubscriptionData(topic string) *heartbeat.SubscriptionData
func (*ConsumerGroupInfo) FindSubscriptionDataPlus ¶
func (cg *ConsumerGroupInfo) FindSubscriptionDataPlus(topic string) *heartbeat.SubscriptionDataPlus
func (*ConsumerGroupInfo) GetAllChannel ¶
func (cg *ConsumerGroupInfo) GetAllChannel() []netm.Context
GetAllChannel 获得所有通道 Author rongzhihong Since 2017/9/11
func (*ConsumerGroupInfo) GetAllClientId ¶
func (cg *ConsumerGroupInfo) GetAllClientId() []string
GetAllClientId 获得所有客户端ID Author rongzhihong Since 2017/9/14
func (*ConsumerGroupInfo) SubscriptionTableToMap ¶
func (cg *ConsumerGroupInfo) SubscriptionTableToMap() map[string]*heartbeat.SubscriptionDataPlus
SubscriptionTableToMap SubscriptionTable To Map Author rongzhihong Since 2017/9/17
func (*ConsumerGroupInfo) UnregisterChannel ¶
func (cg *ConsumerGroupInfo) UnregisterChannel(clientChannelInfo *ChannelInfo)
UnregisterChannel 注销通道 Author rongzhihong Since 2017/9/14
func (*ConsumerGroupInfo) UpdateChannel ¶
func (cg *ConsumerGroupInfo) UpdateChannel(infoNew *ChannelInfo, consumeType heartbeat.ConsumeType, messageModel heartbeat.MessageModel, consumeFromWhere heartbeat.ConsumeFromWhere) bool
UpdateChannel 更新通道 Author gaoyanlei Since 2017/9/21
func (*ConsumerGroupInfo) UpdateSubscription ¶
func (cg *ConsumerGroupInfo) UpdateSubscription(subList []heartbeat.SubscriptionDataPlus) bool
UpdateSubscription 更新订阅 Author rongzhihong Since 2017/9/17
type ConsumerManager ¶
type ConsumerManager struct { ConsumerIdsChangeListener rebalance.ConsumerIdsChangeListener ChannelExpiredTimeout int64 // contains filtered or unexported fields }
ConsumerManager 消费者管理 Author gaoyanlei Since 2017/8/9
func NewConsumerManager ¶
func NewConsumerManager(consumerIdsChangeListener rebalance.ConsumerIdsChangeListener) *ConsumerManager
NewConsumerManager 初始化ConsumerManager Author gaoyanlei Since 2017/8/9
func (*ConsumerManager) DoChannelCloseEvent ¶
func (cm *ConsumerManager) DoChannelCloseEvent(remoteAddr string, ctx netm.Context)
DoChannelCloseEvent 通道关闭事件 Author rongzhihong Since 2017/9/11
func (*ConsumerManager) FindChannel ¶
func (cm *ConsumerManager) FindChannel(group, clientId string) *ChannelInfo
FindChannel 获得某个组某个消费Id对应的通道 Author rongzhihong Since 2017/9/18
func (*ConsumerManager) FindSubscriptionData ¶
func (cm *ConsumerManager) FindSubscriptionData(group, topic string) *heartbeat.SubscriptionDataPlus
func (*ConsumerManager) FindSubscriptionDataCount ¶
func (cm *ConsumerManager) FindSubscriptionDataCount(group string) int32
FindSubscriptionDataCount 根据group查找订阅数量 Author rongzhihong Since 2017/9/18
func (*ConsumerManager) GetConsumerGroupInfo ¶
func (cm *ConsumerManager) GetConsumerGroupInfo(group string) *ConsumerGroupInfo
func (*ConsumerManager) QueryTopicConsumeByWho ¶
func (cm *ConsumerManager) QueryTopicConsumeByWho(topic string) set.Set
QueryTopicConsumeByWho 根据topic查找消费者 Author rongzhihong Since 2017/9/18
func (*ConsumerManager) RegisterConsumer ¶
func (cm *ConsumerManager) RegisterConsumer(group string, channelInfo *ChannelInfo, consumeType heartbeat.ConsumeType, messageModel heartbeat.MessageModel, consumeFromWhere heartbeat.ConsumeFromWhere, subList []heartbeat.SubscriptionDataPlus) bool
RegisterConsumer 注册Consumer Author gaoyanlei Since 2017/8/24
func (*ConsumerManager) ScanNotActiveChannel ¶
func (cm *ConsumerManager) ScanNotActiveChannel()
ScanNotActiveChannel 扫描不活跃的通道 Author rongzhihong Since 2017/9/11
func (*ConsumerManager) UnregisterConsumer ¶
func (cm *ConsumerManager) UnregisterConsumer(group string, channelInfo *ChannelInfo)
UnregisterConsumer 注销消费者 Author rongzhihong Since 2017/9/18
type ConsumerTable ¶
type ConsumerTable struct { GroupChannelTable map[string]*ConsumerGroupInfo sync.RWMutex `json:"-"` }
func NewConsumerTable ¶
func NewConsumerTable() *ConsumerTable
type ProducerGroupConnTable ¶
type ProducerGroupConnTable struct { GroupChannelTable map[string]map[string]*ChannelInfo // key:group value: map[channel.Addr()]ChannelInfo sync.RWMutex `json:"-"` }
func NewProducerGroupConnTable ¶
func NewProducerGroupConnTable() *ProducerGroupConnTable
func (*ProducerGroupConnTable) ForeachByWPerm ¶
func (table *ProducerGroupConnTable) ForeachByWPerm(fn func(k string, v map[string]*ChannelInfo))
ForeachByWPerm 写操作迭代 Author rongzhihong Since 2017/10/17
func (*ProducerGroupConnTable) Get ¶
func (table *ProducerGroupConnTable) Get(k string) map[string]*ChannelInfo
func (*ProducerGroupConnTable) Put ¶
func (table *ProducerGroupConnTable) Put(k string, v map[string]*ChannelInfo)
func (*ProducerGroupConnTable) Remove ¶
func (table *ProducerGroupConnTable) Remove(k string) map[string]*ChannelInfo
func (*ProducerGroupConnTable) Size ¶
func (table *ProducerGroupConnTable) Size() int
type ProducerManager ¶
type ProducerManager struct { LockTimeoutMillis int64 ChannelExpiredTimeout int64 GroupChannelTable *ProducerGroupConnTable GroupChannelLock sync.RWMutex HashCodeChannelLock sync.RWMutex Rand *rand.Rand // contains filtered or unexported fields }
func NewProducerManager ¶
func NewProducerManager() *ProducerManager
func (*ProducerManager) DoChannelCloseEvent ¶
func (pm *ProducerManager) DoChannelCloseEvent(remoteAddr string, ctx netm.Context)
DoChannelCloseEvent 通道关闭事件 Author rongzhihong Since 2017/9/17
func (*ProducerManager) GetGroupChannelTable ¶
func (pm *ProducerManager) GetGroupChannelTable() *ProducerGroupConnTable
GetGroupChannelTable 获得组通道 Author gaoyanlei Since 2017/8/24
func (*ProducerManager) PickProducerChannelRandomly ¶
func (pm *ProducerManager) PickProducerChannelRandomly(producerGroupHashCode int) *ChannelInfo
PickProducerChannelRandomly 事务消息 Author rongzhihong Since 2017/9/17
func (*ProducerManager) RegisterProducer ¶
func (pm *ProducerManager) RegisterProducer(group string, channelInfo *ChannelInfo)
RegisterProducer producer注册 Author gaoyanlei Since 2017/8/24
func (*ProducerManager) ScanNotActiveChannel ¶
func (pm *ProducerManager) ScanNotActiveChannel()
ScanNotActiveChannel 扫描不活跃通道 Author rongzhihong Since 2017/9/17
func (*ProducerManager) UnregisterProducer ¶
func (pm *ProducerManager) UnregisterProducer(group string, channelInfo *ChannelInfo)
UnregisterProducer 注销producer Author gaoyanlei Since 2017/8/24