engine

package
v0.0.0-...-b70a47d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2023 License: MIT Imports: 8 Imported by: 0

README

内存存储引擎设计

这里分成了三个大板块,分别对应了生产者,消费者,异步调度(为心跳准备的)

data 是最主要的对外暴露接口,除此之外,可能会暴露其他结构体,具体的还需设计完成之后再进行叙述

这里的时间表示都是 int64,原因是为了节省空间, time.Timer 占据了 24 个字节,然而当我们使用纳秒为单位表示时间,int64 仅占据了8个字节,每一个结构体就能节省16个字节。

ConsumerMap

这个结构体中存储的是一个以 id 为 key 的映射集合,值对应的是该id消费者所依赖的topic列表,以及其更新时间

当心跳发送到服务端,服务端就会通过 value 中的更新时间去遍历 providerMap 对应的 topic,若 topic 更新时间大于消费者更新时间,则将其记录下来在心跳中返回,服务端会根据返回信息重新拉取信息,并修改 consumer 的更新时间

ProviderMap

这个结构是通过 map 去存储另一个 map 结构,外层map通过名字对应一个主题topic

topic 中对应一个 map 结构,存储的是该 topic 类型服务的提供者信息,当有消费者来消费时,就随机拉取一个服务交给消费者

schedule

这是用于异步调度任务的结构体,

Documentation

Index

Constants

View Source
const (
	TEN_SECOND   = 10000000000 // 10秒
	SIX_SECOND   = 6000000000  // 6秒
	FIVE_SECOND  = 5000000000  // 5秒
	THREE_SECOND = 3000000000  // 3秒
	ONE_SECOND   = 1000000000  // 1秒
)

Variables

View Source
var ProviderSet = wire.NewSet(NewData)

ProviderSet is data providers.

Functions

This section is empty.

Types

type ConsumerMap

type ConsumerMap struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

依赖列表

int32 是消费者的id
rely中是被消费者的信息,以及更新时间
心跳来时,就从这里查询依赖信息
如果信息不一致,则让客户端再拉取一次

func (*ConsumerMap) Add

func (rm *ConsumerMap) Add(now int64, id int32, rely *Rely) error

增加一个消费者

加写锁

func (*ConsumerMap) GetRelyTopics

func (rm *ConsumerMap) GetRelyTopics(id int32) (Rely, error)

获取消费者当前信息

加读锁
这里不返回指针,是为了防止被外部修改,导致逻辑混乱

func (*ConsumerMap) Remove

func (rm *ConsumerMap) Remove(id int32)

移除一个消费者

加写锁

func (*ConsumerMap) Update

func (rm *ConsumerMap) Update(now int64, id int32, topics []string) error

修改该消费者的依赖项

加写锁

func (*ConsumerMap) UpdateTime

func (rm *ConsumerMap) UpdateTime(now int64, id int32) error

消费者的依赖变更时间

加写锁
这个是消费者依赖的topic没有改变的情况下,
消费者所依赖的 provider 更新了,对应更新
依赖他的 cumsumer 的更新时间

type Data

type Data struct {
	// contains filtered or unexported fields
}

Data .

func NewData

func NewData(c *conf.Data, logger log.Logger) (*Data, func(), error)

NewData .

func (*Data) AddConsumer

func (data *Data) AddConsumer(now int64, id int32, rely *Rely) error

添加一个 consumer

添加一个消费者

func (*Data) AddProvider

func (data *Data) AddProvider(topicName string, now int64, id int32, provider *Provider) error

添加一个 provider

思路: 先去拿 topic,拿不到则创建 topic
之后将 provider 塞入 topic,注意锁的使用

func (*Data) GetID

func (data *Data) GetID() int32

获取 ID

初次连接时用于创建唯一标识 ID

func (*Data) RemoveConsumer

func (data *Data) RemoveConsumer(id int32) error

移除一个 consumer

移除一个消费者

func (*Data) RemoveProvider

func (data *Data) RemoveProvider(topicName string, now int64, id int32) error

移除一个 provider

思路:先去拿 topic,拿不到则 报错
之后从 topic 中查询 id 在 pending 还是 running 队列
若都不存在则报错

func (*Data) Schedule

func (data *Data) Schedule(task *Task)

向内传入一个延时任务

func (*Data) UpdateConsumerRely

func (data *Data) UpdateConsumerRely(now int64, id int32, topics []string) error

更新 consumer 依赖项

也就是说 consumer 依赖的 topic 有改变
比如从依赖 (A,B) 变成 (A,C) 等

func (*Data) UpdateConsumerTime

func (data *Data) UpdateConsumerTime(now int64, id int32) error

更新 consumer 依赖状况

这是 consumer 依赖的 topic 未变
但是被依赖的 provider 自身出现变动应该触发的函数
比如 provider current 变化,或者 provider 失联,被
移动到 pending 队列中等情况,在心跳时被消费者检测到

func (*Data) UpdateProvider

func (data *Data) UpdateProvider(topicName string, now int64, id int32, provider *Provider) error

更新一个 provider

思路:获取 topic,未获取到报错
更新 provider,出错之后先尝试从 pending 队列中将其复活,出错则返回
再次尝试更新,若出错则返回

type Provider

type Provider struct {
	Schema    map[string]string // 元数据
	Keepalive int64             // 心跳时间(纳秒,time.Now().UnixNano())
	IP        int64             // ip
	Port      int16             // 端口
}

type Rely

type Rely struct {
	Topics []string
	// contains filtered or unexported fields
}

type Task

type Task struct {
	Content map[string][]int32 // string -> topic_name, i32 -> provider_id
	Action  TaskAction         // 行为
}

type TaskAction

type TaskAction int8

延时任务类型,目前只有一个 TURN_TO_PENDING,不排除有新增的可能,先暂时这样

const (
	TURN_TO_PENDING TaskAction = iota // 将 provider 移动到 pending 队列

)

type Topic

type Topic struct {
	// 一般情况下,添加/修改/删除主题内单个provider时,
	// 又或者某个provider心跳超时未联系,会使用topic锁
	// 在读取时不需要上锁,更新单个provider的 keepalive 时也不需要上锁
	// 注: 所有方法都没有加锁,应当在调用时依据业务来加锁
	// 		原因1: 每个方法加锁会导致反复的加解锁,产生大量开销
	// 		原因2: go原生不支持可重入锁,若无意间将锁重复lock,会导致严重bug
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*Topic) AddProvider

func (t *Topic) AddProvider(now int64, id int32, provider *Provider) error

添加一个 provider 到 running 列表中

加写锁

func (*Topic) GetPendingProvider

func (t *Topic) GetPendingProvider(id int32) (*Provider, error)

获取被 pending 的 provider

加读锁

func (*Topic) GetRunningProvider

func (t *Topic) GetRunningProvider(id int32) (*Provider, error)

获取正在运行的 provider

加读锁

func (*Topic) Pend

func (t *Topic) Pend(now int64, id int32) error

待裁决

加写锁
当长时间 provider 无响应时,将其从 running 转移到 pending

func (*Topic) RemovePendingProvider

func (t *Topic) RemovePendingProvider(id int32)

移除阻塞的 provider

加写锁
这个移除不会影响当前运行,因此不需要修改时间

func (*Topic) RemoveRunningProvider

func (t *Topic) RemoveRunningProvider(now int64, id int32)

移除正在运行的 provider

加写锁

func (*Topic) Resurrect

func (t *Topic) Resurrect(now int64, id int32) error

复活

加写锁
让 pending 状态中的 provider 复活,回到 running 列表中

func (*Topic) UpdateRunningProvider

func (t *Topic) UpdateRunningProvider(now int64, id int32, provider *Provider) error

修改 provider 信息

加写锁

type TopicMap

type TopicMap struct {
	// 这把大锁是防止同名主题在
	// 同时创建,造成覆盖
	// 也就是说,只有在创建/删除主题时,会使用该锁
	// 注: 所有方法都没有加锁,应当在调用时依据业务来加锁
	// 		原因1: 每个方法加锁会导致反复的加解锁,产生大量开销
	// 		原因2: go原生不支持可重入锁,若无意间将锁重复lock,会导致严重bug
	sync.RWMutex
	// contains filtered or unexported fields
}

服务列表

func (*TopicMap) AddTopic

func (tm *TopicMap) AddTopic(name string) (*Topic, error)

添加主题

加写锁

func (*TopicMap) GetTopic

func (tm *TopicMap) GetTopic(name string) (*Topic, error)

获取主题

加读锁

func (*TopicMap) GetXTopic

func (tm *TopicMap) GetXTopic(name string) (*Topic, error)

获取主题,若主题不存在则创建主题

加写锁

func (*TopicMap) RemoveTopic

func (tm *TopicMap) RemoveTopic(name string)

移除主题

加写锁

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL