client

package
v0.0.0-...-ba2213e Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 21, 2019 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type ChannelInfo

type ChannelInfo struct {
	Context             netm.Context
	ClientId            string
	LanguageCode        string
	Addr                string
	Version             int32
	LastUpdateTimestamp int64
}

func NewClientChannelInfo

func NewClientChannelInfo(ctx netm.Context, clientId string, languageCode, addr string, version int32) *ChannelInfo

func (*ChannelInfo) ToString

func (info *ChannelInfo) ToString() string

type ConsumerGroupInfo

type ConsumerGroupInfo struct {
	GroupName         string
	SubscriptionTable *sync.Map // key:Topic, val:SubscriptionDataPlus
	ConnTable         *sync.Map // key: Channel.Addr() val: ChannelInfo
	ConsumeType       heartbeat.ConsumeType
	MessageModel      heartbeat.MessageModel
	ConsumeFromWhere  heartbeat.ConsumeFromWhere
	// contains filtered or unexported fields
}

ConsumerGroupInfo 整个Consumer Group信息 Author gaoyanlei Since 2017/8/17

func NewConsumerGroupInfo

func NewConsumerGroupInfo(groupName string, consumeType heartbeat.ConsumeType, messageModel heartbeat.MessageModel,
	consumeFromWhere heartbeat.ConsumeFromWhere) *ConsumerGroupInfo

func (*ConsumerGroupInfo) FindChannel

func (cg *ConsumerGroupInfo) FindChannel(clientId string) *ChannelInfo

FindChannel 根据clientId获得通道 Author rongzhihong Since 2017/9/17

func (*ConsumerGroupInfo) FindSubscriptionData

func (cg *ConsumerGroupInfo) FindSubscriptionData(topic string) *heartbeat.SubscriptionData

func (*ConsumerGroupInfo) FindSubscriptionDataPlus

func (cg *ConsumerGroupInfo) FindSubscriptionDataPlus(topic string) *heartbeat.SubscriptionDataPlus

func (*ConsumerGroupInfo) GetAllChannel

func (cg *ConsumerGroupInfo) GetAllChannel() []netm.Context

GetAllChannel 获得所有通道 Author rongzhihong Since 2017/9/11

func (*ConsumerGroupInfo) GetAllClientId

func (cg *ConsumerGroupInfo) GetAllClientId() []string

GetAllClientId 获得所有客户端ID Author rongzhihong Since 2017/9/14

func (*ConsumerGroupInfo) SubscriptionTableToMap

func (cg *ConsumerGroupInfo) SubscriptionTableToMap() map[string]*heartbeat.SubscriptionDataPlus

SubscriptionTableToMap SubscriptionTable To Map Author rongzhihong Since 2017/9/17

func (*ConsumerGroupInfo) UnregisterChannel

func (cg *ConsumerGroupInfo) UnregisterChannel(clientChannelInfo *ChannelInfo)

UnregisterChannel 注销通道 Author rongzhihong Since 2017/9/14

func (*ConsumerGroupInfo) UpdateChannel

func (cg *ConsumerGroupInfo) UpdateChannel(infoNew *ChannelInfo, consumeType heartbeat.ConsumeType,
	messageModel heartbeat.MessageModel, consumeFromWhere heartbeat.ConsumeFromWhere) bool

UpdateChannel 更新通道 Author gaoyanlei Since 2017/9/21

func (*ConsumerGroupInfo) UpdateSubscription

func (cg *ConsumerGroupInfo) UpdateSubscription(subList []heartbeat.SubscriptionDataPlus) bool

UpdateSubscription 更新订阅 Author rongzhihong Since 2017/9/17

type ConsumerManager

type ConsumerManager struct {
	ConsumerIdsChangeListener rebalance.ConsumerIdsChangeListener
	ChannelExpiredTimeout     int64
	// contains filtered or unexported fields
}

ConsumerManager 消费者管理 Author gaoyanlei Since 2017/8/9

func NewConsumerManager

func NewConsumerManager(consumerIdsChangeListener rebalance.ConsumerIdsChangeListener) *ConsumerManager

NewConsumerManager 初始化ConsumerManager Author gaoyanlei Since 2017/8/9

func (*ConsumerManager) DoChannelCloseEvent

func (cm *ConsumerManager) DoChannelCloseEvent(remoteAddr string, ctx netm.Context)

DoChannelCloseEvent 通道关闭事件 Author rongzhihong Since 2017/9/11

func (*ConsumerManager) FindChannel

func (cm *ConsumerManager) FindChannel(group, clientId string) *ChannelInfo

FindChannel 获得某个组某个消费Id对应的通道 Author rongzhihong Since 2017/9/18

func (*ConsumerManager) FindSubscriptionData

func (cm *ConsumerManager) FindSubscriptionData(group, topic string) *heartbeat.SubscriptionDataPlus

func (*ConsumerManager) FindSubscriptionDataCount

func (cm *ConsumerManager) FindSubscriptionDataCount(group string) int32

FindSubscriptionDataCount 根据group查找订阅数量 Author rongzhihong Since 2017/9/18

func (*ConsumerManager) GetConsumerGroupInfo

func (cm *ConsumerManager) GetConsumerGroupInfo(group string) *ConsumerGroupInfo

func (*ConsumerManager) QueryTopicConsumeByWho

func (cm *ConsumerManager) QueryTopicConsumeByWho(topic string) set.Set

QueryTopicConsumeByWho 根据topic查找消费者 Author rongzhihong Since 2017/9/18

func (*ConsumerManager) RegisterConsumer

func (cm *ConsumerManager) RegisterConsumer(group string, channelInfo *ChannelInfo, consumeType heartbeat.ConsumeType,
	messageModel heartbeat.MessageModel, consumeFromWhere heartbeat.ConsumeFromWhere, subList []heartbeat.SubscriptionDataPlus) bool

RegisterConsumer 注册Consumer Author gaoyanlei Since 2017/8/24

func (*ConsumerManager) ScanNotActiveChannel

func (cm *ConsumerManager) ScanNotActiveChannel()

ScanNotActiveChannel 扫描不活跃的通道 Author rongzhihong Since 2017/9/11

func (*ConsumerManager) UnregisterConsumer

func (cm *ConsumerManager) UnregisterConsumer(group string, channelInfo *ChannelInfo)

UnregisterConsumer 注销消费者 Author rongzhihong Since 2017/9/18

type ConsumerTable

type ConsumerTable struct {
	GroupChannelTable map[string]*ConsumerGroupInfo
	sync.RWMutex      `json:"-"`
}

func NewConsumerTable

func NewConsumerTable() *ConsumerTable

type ProducerGroupConnTable

type ProducerGroupConnTable struct {
	GroupChannelTable map[string]map[string]*ChannelInfo // key:group value: map[channel.Addr()]ChannelInfo
	sync.RWMutex      `json:"-"`
}

func NewProducerGroupConnTable

func NewProducerGroupConnTable() *ProducerGroupConnTable

func (*ProducerGroupConnTable) ForeachByWPerm

func (table *ProducerGroupConnTable) ForeachByWPerm(fn func(k string, v map[string]*ChannelInfo))

ForeachByWPerm 写操作迭代 Author rongzhihong Since 2017/10/17

func (*ProducerGroupConnTable) Get

func (table *ProducerGroupConnTable) Get(k string) map[string]*ChannelInfo

func (*ProducerGroupConnTable) Put

func (table *ProducerGroupConnTable) Put(k string, v map[string]*ChannelInfo)

func (*ProducerGroupConnTable) Remove

func (table *ProducerGroupConnTable) Remove(k string) map[string]*ChannelInfo

func (*ProducerGroupConnTable) Size

func (table *ProducerGroupConnTable) Size() int

type ProducerManager

type ProducerManager struct {
	LockTimeoutMillis     int64
	ChannelExpiredTimeout int64
	GroupChannelTable     *ProducerGroupConnTable
	GroupChannelLock      sync.RWMutex

	HashCodeChannelLock sync.RWMutex
	Rand                *rand.Rand
	// contains filtered or unexported fields
}

func NewProducerManager

func NewProducerManager() *ProducerManager

func (*ProducerManager) DoChannelCloseEvent

func (pm *ProducerManager) DoChannelCloseEvent(remoteAddr string, ctx netm.Context)

DoChannelCloseEvent 通道关闭事件 Author rongzhihong Since 2017/9/17

func (*ProducerManager) GetGroupChannelTable

func (pm *ProducerManager) GetGroupChannelTable() *ProducerGroupConnTable

GetGroupChannelTable 获得组通道 Author gaoyanlei Since 2017/8/24

func (*ProducerManager) PickProducerChannelRandomly

func (pm *ProducerManager) PickProducerChannelRandomly(producerGroupHashCode int) *ChannelInfo

PickProducerChannelRandomly 事务消息 Author rongzhihong Since 2017/9/17

func (*ProducerManager) RegisterProducer

func (pm *ProducerManager) RegisterProducer(group string, channelInfo *ChannelInfo)

RegisterProducer producer注册 Author gaoyanlei Since 2017/8/24

func (*ProducerManager) ScanNotActiveChannel

func (pm *ProducerManager) ScanNotActiveChannel()

ScanNotActiveChannel 扫描不活跃通道 Author rongzhihong Since 2017/9/17

func (*ProducerManager) UnregisterProducer

func (pm *ProducerManager) UnregisterProducer(group string, channelInfo *ChannelInfo)

UnregisterProducer 注销producer Author gaoyanlei Since 2017/8/24

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL