Documentation
¶
Index ¶
- Constants
- Variables
- type ConsumerMap
- func (rm *ConsumerMap) Add(now int64, id int32, rely *Rely) error
- func (rm *ConsumerMap) GetRelyTopics(id int32) (Rely, error)
- func (rm *ConsumerMap) Remove(id int32)
- func (rm *ConsumerMap) Update(now int64, id int32, topics []string) error
- func (rm *ConsumerMap) UpdateTime(now int64, id int32) error
- type Data
- func (data *Data) AddConsumer(now int64, id int32, rely *Rely) error
- func (data *Data) AddProvider(topicName string, now int64, id int32, provider *Provider) error
- func (data *Data) GetID() int32
- func (data *Data) RemoveConsumer(id int32) error
- func (data *Data) RemoveProvider(topicName string, now int64, id int32) error
- func (data *Data) Schedule(task *Task)
- func (data *Data) UpdateConsumerRely(now int64, id int32, topics []string) error
- func (data *Data) UpdateConsumerTime(now int64, id int32) error
- func (data *Data) UpdateProvider(topicName string, now int64, id int32, provider *Provider) error
- type Provider
- type Rely
- type Task
- type TaskAction
- type Topic
- func (t *Topic) AddProvider(now int64, id int32, provider *Provider) error
- func (t *Topic) GetPendingProvider(id int32) (*Provider, error)
- func (t *Topic) GetRunningProvider(id int32) (*Provider, error)
- func (t *Topic) Pend(now int64, id int32) error
- func (t *Topic) RemovePendingProvider(id int32)
- func (t *Topic) RemoveRunningProvider(now int64, id int32)
- func (t *Topic) Resurrect(now int64, id int32) error
- func (t *Topic) UpdateRunningProvider(now int64, id int32, provider *Provider) error
- type TopicMap
Constants ¶
View Source
const ( TEN_SECOND = 10000000000 // 10秒 SIX_SECOND = 6000000000 // 6秒 FIVE_SECOND = 5000000000 // 5秒 THREE_SECOND = 3000000000 // 3秒 ONE_SECOND = 1000000000 // 1秒 )
Variables ¶
View Source
var ProviderSet = wire.NewSet(NewData)
ProviderSet is data providers.
Functions ¶
This section is empty.
Types ¶
type ConsumerMap ¶
依赖列表
int32 是消费者的id rely中是被消费者的信息,以及更新时间 心跳来时,就从这里查询依赖信息 如果信息不一致,则让客户端再拉取一次
func (*ConsumerMap) Add ¶
func (rm *ConsumerMap) Add(now int64, id int32, rely *Rely) error
增加一个消费者
加写锁
func (*ConsumerMap) GetRelyTopics ¶
func (rm *ConsumerMap) GetRelyTopics(id int32) (Rely, error)
获取消费者当前信息
加读锁 这里不返回指针,是为了防止被外部修改,导致逻辑混乱
func (*ConsumerMap) Update ¶
func (rm *ConsumerMap) Update(now int64, id int32, topics []string) error
修改该消费者的依赖项
加写锁
func (*ConsumerMap) UpdateTime ¶
func (rm *ConsumerMap) UpdateTime(now int64, id int32) error
消费者的依赖变更时间
加写锁 这个是消费者依赖的topic没有改变的情况下, 消费者所依赖的 provider 更新了,对应更新 依赖他的 cumsumer 的更新时间
type Data ¶
type Data struct {
// contains filtered or unexported fields
}
Data .
func (*Data) RemoveProvider ¶
移除一个 provider
思路:先去拿 topic,拿不到则 报错 之后从 topic 中查询 id 在 pending 还是 running 队列 若都不存在则报错
func (*Data) UpdateConsumerRely ¶
更新 consumer 依赖项
也就是说 consumer 依赖的 topic 有改变 比如从依赖 (A,B) 变成 (A,C) 等
func (*Data) UpdateConsumerTime ¶
更新 consumer 依赖状况
这是 consumer 依赖的 topic 未变 但是被依赖的 provider 自身出现变动应该触发的函数 比如 provider current 变化,或者 provider 失联,被 移动到 pending 队列中等情况,在心跳时被消费者检测到
type Task ¶
type Task struct { Content map[string][]int32 // string -> topic_name, i32 -> provider_id Action TaskAction // 行为 }
type TaskAction ¶
type TaskAction int8
延时任务类型,目前只有一个 TURN_TO_PENDING,不排除有新增的可能,先暂时这样
const (
TURN_TO_PENDING TaskAction = iota // 将 provider 移动到 pending 队列
)
type Topic ¶
type Topic struct { // 一般情况下,添加/修改/删除主题内单个provider时, // 又或者某个provider心跳超时未联系,会使用topic锁 // 在读取时不需要上锁,更新单个provider的 keepalive 时也不需要上锁 // 注: 所有方法都没有加锁,应当在调用时依据业务来加锁 // 原因1: 每个方法加锁会导致反复的加解锁,产生大量开销 // 原因2: go原生不支持可重入锁,若无意间将锁重复lock,会导致严重bug sync.RWMutex // contains filtered or unexported fields }
type TopicMap ¶
type TopicMap struct { // 这把大锁是防止同名主题在 // 同时创建,造成覆盖 // 也就是说,只有在创建/删除主题时,会使用该锁 // 注: 所有方法都没有加锁,应当在调用时依据业务来加锁 // 原因1: 每个方法加锁会导致反复的加解锁,产生大量开销 // 原因2: go原生不支持可重入锁,若无意间将锁重复lock,会导致严重bug sync.RWMutex // contains filtered or unexported fields }
服务列表
Click to show internal directories.
Click to hide internal directories.