#smartgo
smartgo整体架构图
smartgo是什么?
SmartGo是能够支持主流消息队列功能及满足物联网MQTT数千万长连接设备推送消息使用golang语言全新开发的一款分布式、队列模型的智能中间件,具有以下特点:
- 支持point-point、pub-sub、request-reply等多种模式
- 支持严格的消息顺序
- 支持数百万长连接
- 亿级消息堆积能力
- 比较友好的分布式特性
当前最新版本功能支持:
- 将整个项目命名为smartgo-1.0.0
- 将项目中所有子工程命名为stg-*
如何开始?
开发规范必读
- 源文件使用Unix换行、UTF-8文件编码,遵照golang内置格式化代码规范
- 请在git clone命令之前执行
git config --global core.autocrlf false
,确保本地代码使用Unix换行格式
- 请在非主干分支上开发,禁止提交本地未测试运行通过代码到线上分支
- 每次提交及之前(正常来说需要先pull --rebase,解决冲突),对代码进行修改必须有相对应的解释说明
- 正常组内开发人员提交代码,需要经过经过审核后方可提交(且需要有统一格式注释,参照注释类型3)
原始文档请参考:smartgo/docs/doc/目录:
SmartGo文档截图示例:
SmartGo-Store:
SmartGo-Broker:
SmartGo-Net:
SmartGo-Register:
SmartGo-Client:
Markdown文档概述(由于Markdown格式会缺失图片,细节请参考原始文档)
SmartGo-Store技术文档说明
针对版本V1.0.0
©
成都基础平台架构
2017/11/21
目 录
1 存储 4
1.1 概述 4
1.2 零拷贝技术 4
1.3 CommitLog 4
1.4 ConsumeQueue 7
1.5 索引 9
1.6 主从同步 11
1.7 刷盘 13
1.8 文件清理 13
附件一Smartgo开发者联系方式 15
- 1存储
- 1.1概述
存储模块主要包含存储Producer生产的消息、ConsumeQueue、索引等数据以及主从同步、刷盘、清理服务等。
- 1.2零拷贝技术
零拷贝是通过将文件映射到内存上,直接操作文件,相比于传统的io(首先要调用系统IO,然后要将数据从内核空间传输到用户空间),避免了很多不必要的数据拷贝,提高存储性能。
存储消息,使用了零拷贝,零拷贝包含以下两种方式:
方式 |
优点 |
缺点 |
mmap + write |
即使频繁调用,使用小块文件传输,效率也很高 |
不能很好的利用 DMA 方式,会比 sendfile 多消耗 CPU,内存安全性控制复杂 |
sendfile |
可以利用 DMA 方式,消耗 CPU 较少,大块文件传输效率高,无内存安全新问题 |
小块文件效率低于 mmap 方式 |
SmartGo采用mmap+write方式,因为有小块数据传输的需求,效果会比sendfile更好。
- 1.3CommitLog
CommitLog用于存储真实消息数据。CommitLog路径默认为用户工作目录/store/commitlog。
CommitLog存储目录结构:
commitlog
- 00000000000000000000
- 00000000001073741824
commitlog文件名生成的规则:
文件名的长度为20位,左边补零,剩余的为文件起始偏移量(第一个文件起始偏移量为0);
文件名字根据指定commitlog文件大小(默认文件大小为1G,可以通过MessageStoreConfig的mapedFileSizieCommitLog进行配置)递增,文件大小单位为字节。
例如:
默认commitlog文件大小为1G=1073741824b
第一文件的起始偏移量为0,不足20位进行补零,故文件名00000000000000000000,当第 一文件写满,第二文件的起始偏移量为1073741824,不足20位进行补零,故文件名为00000000001073741824,后面的文件名以此类推。
文件n起始偏移量 = size * (n- 1)
文件1起始偏移量 = 1073741824 * (1 - 1) = 0
文件2起始偏移量 = 1073741824 * (2 - 1) = 1073741824
通过commitlog文件名能够方便快速定位信息所在的文件。
文件Index = (消息的起始物理偏移量-最早的文件的起始偏移量)/文件大小,即 (1073741827-0)/1073741824=1,可得知该消息在队列中的第二个文件中:
commitlog文件的消息结构:
序号 |
字段 |
说明 |
字节数 |
备注 |
1 |
TotalSize |
消息总长度 |
4 |
|
2 |
MagicCode |
MagicCode |
4 |
MagicCode分为:MessageMagicCode、BlankMagicCode。MessageMagicCode表示正确的消息内容;BlankMagicCode表示CommitLog文件空间不足,采用空字节占位写满文件。 |
3 |
BodyCRC |
消息内容CRC |
4 |
BodyCRC的值是对消息内容(body)进行CRC32生成的32bit冗余校验码,用于确保消息的正确性。 |
4 |
QueueId |
消息队列编号 |
4 |
|
5 |
Flag |
消息标志 |
4 |
|
6 |
QueueOffset |
消息队列位置 |
8 |
自增值,消息队列逻辑位置,通过该值才能查找到consume queue中的数据;QueueOffset * 20才是消息队列的物理偏移量。 |
7 |
PhysicalOffset |
物理位置 |
8 |
|
8 |
SysFlag |
MessageSysFlag |
4 |
|
9 |
BornTimestamp |
生产消息时间戳 |
8 |
|
10 |
BornHost |
生产消息的地址+端口 |
8 |
|
11 |
StoreTimestamp |
存储消息时间戳 |
8 |
|
12 |
StoreHost |
存储消息的地址+端口 |
8 |
|
13 |
ReconsumeTimes |
重新消费消息次数 |
4 |
|
14 |
PreparedTransationOffset |
|
8 |
|
15 |
BodyLength |
消息内容长度 |
4 |
|
16 |
Body |
消息内容 |
bodyLength |
|
17 |
TopicLength |
Topic长度 |
1 |
|
18 |
Topic |
topic |
topicLength |
|
19 |
PropertiesLength |
附加属性长度 |
2 |
|
20 |
Properties |
附加属性 |
propertiesLength |
|
添加CommitLog数据,将数据写入到MapedFile,每个MapedFile对应着一个储存消息的二进制文件,MapedFile在创建时会映射到内存上,添加消息时将需要保存的数据写入内存,后续有刷盘服务会将内存中数据持久化到二进制物理文件中,下图是添加CommitLog数据的主要业务流程:
查询CommitLog数据,直接从映射的内存中根据物理偏移量以及数据大小,获取数据,下图是查询CommitLog数据的主要业务流程:
- 1.4ConsumeQueue
消费者逻辑队列,对应/store/consumequeue文件夹,每个消费队列文件目录机构如下:
consumequeue
-- topic
-- queue id
-- 00000000000000000000
-- 00000000000000001040
-- 00000000000000002080
consumequeue文件名生成规则:
commitlog文件名生成规则一致,需要注意的是:maped文件大小为=向上取整(指定size/消息位置信息size)* 消息位置信息size
例如:
指定消费队列文件大小=1024
消息位置信息size = 20
mapedFileSize = 向上取整(1024 / 20) * 20
mapedFileSize = 1040
consumequeue文件结构:
ConsumeQueue中并不需要存储消息的内容,而存储的是消息在CommitLog中的offset。也就是说,ConsumeQuue其实是CommitLog的一个索引文件。
序号 |
字段 |
说明 |
字节数 |
备注 |
1 |
CommitLog Offset |
CommitLog的起始物理偏移量physical offset |
8 |
|
2 |
Size |
消息的大小 |
4 |
|
3 |
Message Tag Hashcode |
消息Tag的哈希值 |
8 |
用于订阅时消息过滤(订阅时如果指定了Tag,会根据HashCode来快速查找到订阅的消息) |
ConsumeQueue是定长的结构,每条数据大小为20个字节,每个文件默认大小为600万个字节。Consumer消费消息的时候,需要2个步骤:首先读取ConsumeQueue得到offset,然后读取CommitLog得到消息内容。
添加消息时,添加消息到commitLog后会向分发服务添加一个分发请求,分发服务调用MessageStore添加消息位置信息,根据消息的Topic、QueueId获取ConsumeQueue,消息的位置信息追加到对应的消费队列中,最终保存的二进制文件中,主要流程如下图:
获取消息时, 首先根据Topic、QueueId获取ConsumeQueue,然后根据消息逻辑offset,获取消息的物理偏移量、消息的Size,最后根据消息的物理偏移量、消息的Size获取CommitLog数据,主要流程如下图:
- 1.5索引
IndexService(索引服务)
IndexService用于创建索引文件集合,当用户想要查询某个topic下某个key的消息时,能够快速响应。
Index File(索引文件)
IndexFile存储消息索引的文件,文件结构如下:
索引文件由三个部分组成:Header(索引文件头信息)、Slot Table(槽位信息)、Index Linked List(消息的索引内容)
Index Header:索引文件头信息由40个字节的数据组成。
序号 |
字段 |
说明 |
字节数 |
备注 |
1 |
BeginTimestamp |
索引文件开始时间 |
8 |
第一个索引创建的时间 |
2 |
EndTimestamp |
索引文件结束时间 |
8 |
最后一个索引创建的时间 |
3 |
BeginPhyOffset |
索引文件开始的物理偏移量 |
8 |
第一个索引对应的CommitLog物理偏移量 |
4 |
EndPhyOffset |
索引文件结束的物理偏移量 |
8 |
最后一个索引对应的CommitLog物理偏移量 |
5 |
HashSlotCount |
索引文件占用的槽位数 |
4 |
|
6 |
IndexCount |
索引的个数 |
4 |
|
Slot Table
Index Linked List:消息的索引内容链表,默认每个文件有2000W消息索引内容组成,每个消息索引内容为20个字节的数据。
序号 |
字段 |
说明 |
字节数 |
备注 |
1 |
KeyHash |
key的哈希值 |
4 |
topic-key(key是消息的key)的hashCode组成 |
2 |
PhyOffset |
commitLog的物理偏移量 |
8 |
|
3 |
Timestamp |
索引创建的时间 |
4 |
|
4 |
NextIndexOffset |
下一个索引的索引地址 |
4 |
|
IndexFile的创建过程:
首先在DispatchMessageService写入ConsumeQueue后,会再调用indexService.putRequest,添加索引请求;IndexService定时获取创建索引请求,调用IndexService的buildIndex进行创建索引。
- 1.6主从同步
在集群模式的部署方式中,Master与Slave配对是通过指定相同的brokerName参数来配对,Master的BrokerId必须是0,Slave的BrokerId必须是大于0的数。一个Master下面可以挂载多个Slave,同一个Master下的多个Slave通过指定不同的BrokerId来区分。
主从同步服务
存储模块启动时,会启动主从同步服务,主从同步服务主要的组成部分是:主从同步服务端、主从同步客户端
主从同步服务端
接收slave节点的连接请求,接收到请求后会建立主从连接,接受和传递主从之间数据。
主从连接
主从连接主要由主从写服务、主从读服务组成,主从写服务主要用于master传输同步数据,主从读服务主要用于接受slave节点发送的offset信息。
主从写服务传输的数据结构:
序号 |
字段 |
说明 |
字节数 |
备注 |
1 |
Offset |
commitLog物理偏移量 |
8 |
同步commitLog物理偏移量 |
2 |
BodySize |
传输数据的大小 |
4 |
|
3 |
BodyData |
传输数据的内容 |
BodyLength |
|
主从同步客户端
连接master节点,定时上报offset以及接收master节点传输的同步数据。
主从同步客户端上报的数据结构:
序号 |
字段 |
说明 |
字节数 |
备注 |
1 |
Offset |
commitLog物理偏移量 |
8 |
slave节点的最大commitLog物理偏移量 |
主从同步客户端上报offset时,会获取当前最大CommitLog文件物理偏移量。如果HAClient是首次上报offset,并且上报的offset为0,master节点会获取最后一个CommitLog文件进行传输,其余的CommitLog文件不会进行同步。上报的offset不为0,master节点会从上报的offset进行同步。
- 1.7刷盘
RocketMQ刷盘有两种方式,分为:同步刷盘、异步刷盘。
同步刷盘:在消息到达MQ后,RocketMQ需要将数据持久化,同步刷盘是指数据到达内存之后,必须刷到commitlog日志之后才算成功,然后返回producer数据已经发送成功。
异步刷盘:数据到达内存之后,返回producer说数据已经发送成功,然后再写入commitlog日志。
RocketMQ默认是使用异步刷盘。
逻辑队列刷盘服务(FlushConsumeQueueService):用于将ConsumeQueue的File文件写入入里磁盘,
首先判断是否到达了刷盘时间,如果到达了,那么全盘通刷;否则,遍历所有的ConsumeQueue,调用cq.commit(flushConsumeQueueLeastPages)进行刷盘,flushConsumeQueueLeastPages是目前文件的未刷盘大小达到flushConsumeQueueLeastPages*OS_PAGE_SIZE(1024*4)个,才进行刷盘。
逻辑队列刷盘服务:定时将ConsumeQueue的数据从内存写入到文件。
- 1.8文件清理
存储服务启动时,会启动定时清理文件服务,定时清除服务会每分钟定时清理CommitLog、ConsumeQueue文件。
清理CommitLog文件服务
清理CommitLog文件,需要满足以下任意一条件:
1、消息文件过期(默认72小时),且到达清理时点(默认是凌晨4点),删除过期文件。
2、消息文件过期(默认72小时),且磁盘空间达到了水位线(默认75%),删除过期文件。
3、磁盘已经达到必须释放的上限(85%水位线)的时候,则开始批量清理文件(无论是否过期),直到空间充足。
注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务。
清理ConsumeQueue文件服务
定时清理小于最小CommitLog物理偏移量的ConsumeQueue的文件。
SmartGo-Broker 技术文档说明
针对版本V1.0.0
©
成都基础平台架构
2017/11/21
目 录
1 概述 4
2 Borker模块交互 4
2.1 Registry 4
2.2 Client 4
2.3 Net 4
2.4 Store 5
2.5 Broker 5
3 专业术语 5
3.1 Topic 5
3.1 ConsumerOffset 5
3.2 SubscriptionGroup 5
4 Broker实现原理 5
4.1 Topic 管理 5
4.2 ConsumerOffset管理 7
4.3 SubscriptionGroup管理 9
4.4 发送消息 10
4.5 消费消息 11
4.6 主从同步 12
4.7 Hold 13
4.8 消息统计 14
4.9 Producer、Consumer连接管理 15
附件一 Smargo开发者联系方式 16
- 1概述
Broker消息中转角色,负责存储消息,转发消息,一般也称为 Server。在 JMS 规范中称为 Provider。Broker通过自身实现方法并且发布,提供Client调用。也就是说相对于Client,Broker是一个Service。
Broker在Smargo的角色很多,它是给Producer、Consumer提供服务的Service、又是调用Registry服务的Client、而它自身还承载着运维接口、消息统计等一系列的功能。
- 2Borker模块交互
- 2.1Registry
每个Broker与每个Registry保持长连接。
启动时会向每一个Registry注册,启动过后Broker每隔30秒向Register发送心跳,注册和发送心跳都包含了将自身的clusterName,brokerName,topic信息发。如果Broker 2分钟内没有发送心跳数据,则断开连接。
Broker挂掉或者断开,Registry会有自动感应,会更新删除该Broker与Topic的关系。
- 2.2Client
每个Client通过Registry拿到BrokerList地址,Client与BrokerList保持长连接。
Producer向Broker发送消息,Broker负者处理解析消息,然后转发到Stroe进行消息持久化。
Consumer从Broker拉取消息进行消费,Broker会维护Consumer与Topic之间订阅关系,并且会维护与Topic消费的Offset。
- 2.3Net
Broker通过Net创建Service(目前端口为10911),注册并发布服务,供Client调用。
Broker通过Net创建Client,调用Registry的方法。
- 2.4Store
Broker收到消息,经过一些列的验证,解析,重新封装后将消息交给Store做后续的处理。
- 2.5Broker
Broker主节点之间没有交互,主节点与备节点同步Topic信息,Consumer Offset,延迟队列的Offset,订阅关系等。
- 3专业术语
- 3.1Topic
Topic是一个消息主题,一个在线Producer实例只能对应一个Topic,一个在线Consumer实例可以对应多个Topic,一条消息必须属于一个Topic。
- 1.1ConsumerOffset
ConsumerOffset主要记录了Consumer GroupName与Topic每个Queue的消费进度。
- 3.2SubscriptionGroup
SubscriptionGroup用来管理订阅组的订阅信息,包含订阅权限、重试队列,重试次数等。
- 4Broker实现原理
- 4.1Topic 管理
默认Topic
目前Broker启动时会生成六个默认的Topic,OFFSET_MOVED_EVENT、SELF_TEST_TOPIC、DEFAULT_TOPIC、BENCHMARK_TOPIC、集群名称Topic、Broker名称Topic。其中DEFAULT_TOPIC最为关键,应为Topic的创建会以DEFAULT_TOPIC为模板进行创建。目前Smargo中DEFAULT_TOPIC的读写队列默认为16个;并且是一个可读可写Topic。
持久化
每个Broker会将其下的每一个Topic进行统一的持久化,这些Topic被全部保存到一个以JSON的形式都保存到一个文件中,Smargo保存Topic文件的路径为/当前用户目录下/store/config/topic.json文件。该文件主要保存了每一个Topic的主要信息如:TopicName(topic名称)、ReadQueueNums(读队列个数)、WriteQueueNums(写队列个数)、Perm(topic权限)、Order(是否为顺序Topic)、topicSysFlag(系统标识)。
文件存储内容如下:
{
"topicConfigTable": {
"topicConfigTable": {
"%RETRY%consumerGroupId-example-200": {
"SEPARATOR": "",
"topicName": "%RETRY%consumerGroupId-example-200",
"readQueueNums": 1,
"writeQueueNums": 1,
"perm": 6,
"topicFilterType": 0,
"topicSysFlag": 0,
"order": false
}
}
},
"dataVersion": {
"timestamp": 1511333414604049700,
"counter": 2023
}
}
初始化
在Broker启动时,Broker会将Topic.json文件进行加载,在内存中维护一套Topic名称与Topic对象之间的关系,对Topic进行任何操作,都会更新内存所维护的关系以及Topic.json的文件。
创建Topic
创建Topic由Client发起,Broker没有检测到Client所需要发送的Topic,其创建如图所示:
在创建Topic的过程中,会将创建的Topic的队列数与DefaultTopic队列数对比,取其小的队列数为新建Topic的队列数。创建成功后会立马向所有Registry注册。
其他操作
如果对原有的Topic进行了操作,会第一时间将内存维护的信息进行更新并且会刷入磁盘中。Broker启动时会开启一个线程,每隔30秒向Registry注册,将更新的Topic维护到Registry中。
- 4.2ConsumerOffset管理
ConsumerOffset主要管理的是订阅组与Topic Queue消费进度的管理。具体流程如下:
初始化
在Broker启动时,Broker会将ConsumerOffset.json文件进行加载,在内存中维护一套以订阅组名称与Topic名称组合为key,以当前Topic队列消费offset为value的关系。
持久化
在Broker启动时候,Broker会开启一个线程每个5秒对Client上报的Consumer与Topic Offset进行持久化。
Smargo保存consumerOffset文件的路径为/当前用户目录下/store/config/consumerOffset.json文件。存储结构如下:
{
"offsets": {
groupName@TopicName: {
queue 1: offset,
…
queue x: offset,
}
}
}
- 4.3SubscriptionGroup管理
SubscriptionGroup用来管理订阅组的订阅信息,包含订阅权限、重试队列,重试次数等。其流程如下:
Consumer通过心跳服务进行对SubscriptionGroup来维护。
持久化
每当Broker维护SubscriptionGroup关系发生改变,都会进行一次持久化。Smargo保存subscriptionGroup文件的路径为/当前用户目录下/store/config/subscriptionGroup.json。存储结构如下:
{
topicName: {
"groupName": "xx", //订阅组名称
"consumeEnable": true, //是否可以消费
"consumeFromMinEnable": true, //是否允许从队列最小位置开始消费,线上默认会设置为false
"consumeBroadcastEnable": true, //是否允许广播方式消费
"retryQueueNums": 1, //消费失败的消息放到一个重试队列,每个订阅组配置几个重试队列
"retryMaxTimes": 16, //重试消费最大次数,超过则投递到死信队列,不再投递,并报警
"brokerId": 0, //从哪个Broker开始消费
"whichBrokerWhenConsumeSlowly": 0 //发现消息堆积后,将Consumer的消费请求重定向到另外一台Slave机器
}
},
"dataVersion": {
"timestamp": 1511342161274071800,
"counter": 3
}
}
- 4.4发送消息
整个消息的发送流程分为两个步骤流程:
Consumer回退消息
针对Consumer消费失败消息投放重试队列,Broker接收到消息检测到如果是消费失败的Consumer端回退消息。会经历一下流程:
- a)检测当前消息的订阅组是否存在。
- b)检测当前Broker是否有写入权限
- c)获取到重试队列Topic(重试队列Topic一般为%RETRY%+groupName),计算QueueID。
- d)如果当前消息消费次数大于设置重试消费次数则投入死信队列(死信队列Topic一般为%DLQ% +GroupName)消息将不会再被消费。
- e)如果当前消息消费次数小于设置重试消费次数,则会将当前消费次数+3个等级延迟,延迟该消息的消费。
- f)重新组装消息对象调用store服务。
- g)统计
Producter发送正常消息
正常消息的发送情况远没有重试消息流程复杂,其流程如下:
-
a)检查发送消息的合法性(Topic、Broker权限等)。
-
b)重新组装消息对象调用store服务。
-
c)统计
-
4.5消费消息
Bolt在consumer端的消费方式有两种:一种push(推)、一种pull(拉)。不管是pull与push对Broker而言都是由Consumer主动发起的pull操作。其主要流程如下:
-
a)在拉取消息时,Consumer会先同步当前Broker持久化当前定于组每个Queue的Offset。
-
b)Consumer通过获取到的Offset作为拉取消息的开始位置,向Broker发起拉取消息请求。
-
c)Broker校验信息(Broker权限、订阅关系、Topic是否存在、QueueID是否有效)。
-
d)去Store拉取消息。
-
e)Broker通过返回结果返回消息。在push模式下如果拉取消息为空,Consumer则会每隔15秒在进行一次拉取。
-
f)进行统计。
-
4.6主从同步
Broker集群只允许存在一个主节点,而一个主节点下可挂载一个或多个从节点。主节点与从节点通过心跳保持关联,而从节点彼此间不通信。从节点会定时从Registry查询主节点的地址,当主节点地址发生变化后,原关联就会中断,新的主从关联会重新建立。
在正常情况下,主节点对外提供Topic创建、消息存储、拉取等服务,而从节点只是用于备份数据。业务Topic从节点定时同步主节点的信息。只有主节点挂了之后,才允许客户端到从节点拉取消息,但是依然不允许从节点存储消息。主从同步时序图如下:
- 4.7Hold
消费端从Broker拉取消息,当消息不存在时,通过长连接或者定时发送拉取请求来实现当有新消息时能拉取到结果。当新消息到来较迟时,长连接或者定时发送请求的方式,都会造成带宽的浪费,造成Broker的没必要的压力。Broker延缓Hold住拉取消息请求的方式可解决上诉问题,将未拉取到消息的请求放入等待队列,待新消息到来时或者等待超时的唤醒。因Hold服务记录了此次拉取请求的通道信息,所以Broker可从该通道将查询到的消息或者超时状态主动推送给给客户端,从而避免了带宽的浪费。
消费端从Broker拉取某个偏移量的消息,当该偏移量比最大偏移量都大,则表明该消息还未存入Broker,无法被拉取到。将该拉取请求放入等待队列中,等待有新消息到来时的唤醒。Hold请求的序列图如下:
- 4.8消息统计
Broker从时间、消息来源、消费等多个维度统计了的Topic、Group的put、get消息的次数、流量等,实时计算、记录了对应的Tps。根据统计数据,用户即可了解自己的业务量、业务波动等,运维即可知道当前Broker集群的处理能力,为扩容等提供可靠数据依据。
Broker启动时,初始化统计服务,当Broker拉取或存储消息成功时,会在原统计数据的基础上增加相应次数、流量值,计算并记录不同时间段的Tps。消息统计时序图如下:
- 4.9Producer、Consumer连接管理
记录当前生产者、消费者与Broker相连的通道信息,维护通道其与Topic、Group、SubList关系。开发、运维人员可查看当前生产者、消费者数量,Topic对应的客户端数量、同一订阅组下客户端的数量等。
客户端通过Registry查询到Broker路由信息列表,遍历路由列表,找到业务Topic对应的Broker地址。根据Broker地址,客户端连接Broker,定时发送心跳来维护该通道连接,而Broker接收到心跳后,实时维护Channel、Topic、Group、SubList的彼此之间的关系。客户端连接管理时序图如下:
BlotMQ-net技术文档说明
针对版本V1.0.0
©
成都基础平台架构
2017/11/21
目录
1 概述 4
2 背景 4
3 专业术语 5
4 网络实现原理 5
4.1 设计与交互 5
4.2 连接管理 6
4.3 事件通知 6
4.4 粘包 7
4.5 通讯包结构 7
4.5.1 RemotingCommand结构 8
4.6 报文压缩 8
4.7 心跳处理 8
附件一 Smartgo开发者联系方式 9
- 1概述
本章将从网络层面讲解Smartgo,它采用的IO模型(epoll),如何实现的事情通知?粘包是怎么实现的?以及消息交互报文的协议,将一一进行说明。 Smartgo在网络层面使用TCP长连接作为通讯方式,RocketMQ使用Netty库为基础网络开发库,netty是事件驱动的网络编程框架和工具,它的强大毋庸置疑。而Golang目前还没有和Netty类似的实现库,所有需要构建一个性能优异的网络基础库。
- 2背景
Smartgo在网络层面使用TCP长连接作为通讯方式,RocketMQ使用Netty库为基础网络开发库,netty是事件驱动的网络编程框架和工具,它的强大毋庸置疑。而Golang目前还没有和Netty类似的实现库,所有需要构建一个性能优异的网络基础库。
IO模型
Netty是基于NIO(Non-blocking I/O)的实现,NIO的多路复用select/epoll默认使用epoll,可以在不同操作系统有不同选择。Smartgo同样选择了epoll,Golang的net包标准库底层使用了epoll,在runtime层面,是用epoll/kqueue实现的非阻塞io,为性能提供了保障。不同的是开发者层面任然是阻塞的,配合Golang的线程模型CSP能达到高性能。
连接管理
Smartgo的netm包提供的统一的连接管理功能,将所有连接统一管理,简化使用者维护连接。该功能讲会在之后去除,由事情通知功能代替,连接维护交由使用者维护。
事件通知
提供类似于Netty的事情通知功能,但目前只支持少数几类事情,提供代码的重用行。
粘包
这里的粘包是业务粘包,标准net包在底层提供了粘包保证了报文的正确性。业务报文是否完整,将进行粘包处理。
报文协议
Smartgo中报文格式定义分为header和body,这两部分都定义了格式来进行通信。这部分将介绍具体的格式以及含义。
-
3专业术语
-
4网络实现原理
-
4.1设计与交互
图中是整个网络层的设计以及报文的处理流程。分为client和server端,client负责创建连接,报文的封包和拆包。服务器除此之外还要维护客户端连接,保证连接能接收数据。缓存队列可以缓存突发流程的,保证程序的可靠性。
客户端首先创建连接,连接创建成功,发送消息并等待响应消息。发送消息前会将报文进行编码,接收到消息后也会将消息进行解码。
服务器端会启动端口监听,接收来自客户端的连接。当有连接连上的时候,服务端用一个新的Goroutine接收客户端连接。当接收到一个客户端发送的消息时,会将消息发到队列中,然后会从队列中取出消息进行粘包。最后将完整的报文交给业务进行处理并响应。这里说明一点的是,队列和粘包都针对的单个连接,减少资源的竞争。
- 4.2连接管理
当客户端创建连接或者服务端接收一个连接时,将会把连接放入到一个map中,连接地址作为key。同时将新建Goroutine接收连接所接收到的信息。该功能会在之后删除,由事件通知替代。
- 4.3事件通知
当连接状态发送变化时,将该事件通知给用户。支持事件:
- Active: 当接收到一个新创建的连接时,被动接收通常作为服务端。
- Connect: 当新创建一个新连接时,通常主动创建连接的客户端。
- Disconnet: 当连接时断开时,通常被动断开。
- Closed: 当连接时断开时,通常主动关闭。
- Error: 当连接使用中发生错误时。
- 4.4粘包
粘包指的是业务粘包,标准net包在底层提供了粘包保证了报文的正确性。业务报文是否完整,将进行粘包处理。Smartgo采用length-field的方式传输报文。length占用4个字节,存储之后的报文长度。粘包就是将接收的报文进行验证,先验证length域,在根据length域的值取得field域。如果length长度不够,会将报文进行缓存,等待下一个报文的到来。粘包必须是针对单个连接进行,保证传输报文的不乱序。
- length域: 报文的长度。
- field域: 报文内容。
- 4.5通讯包结构
- length域: 报文的长度。
- header length域: 报文头部长度。
- header域: 报文头部。
- body域: 报文内容。
header和body域的数据解析后是RemotingCommand这个结构,通信时将RemotingCommand序列化成byte[]字节数组通信层进行传输。
1. 4.5.1RemotingCommand结构
字段名 |
请求 |
响应 |
code |
请求操作代码,请求接收方根据不同的代码做不同操作 |
应答结果代码,0表示成功,非0表示各种错误 |
**Language ** |
请求发起方实现语言 |
响应方实现语言 |
**Version ** |
请求方程序版本 |
响应方程序版本 |
**Opaque ** |
请求标识代码,多线程,连接复用使用 |
应答方不做修改,直接返回 |
**Flag ** |
通信层的标志位 |
通信层的标志位 |
**Remark ** |
传输自定文本信息 |
错诨详细描述信息 |
**ExtFields ** |
请求自定义字段 |
响应自定义字段 |
CustomHeader |
自定义结构,传输时将其转换为extFields型数据 |
自定义结构,传输时将其转换为extFields型数据 |
**Body ** |
请求body |
响应body |
- 4.6报文压缩
报文达到一定长度后,提供报文压缩功能。压缩算法使用zip。RemotingCommand的SysFlag标识的第二位标识报文是否为压缩。
- 4.7心跳处理
通信组件本身不处理心跳,由上层进行心跳处理。
SmartGo-Registry 技术文档说明
针对版本V1.0.0
©
成都基础平台架构
2017/11/21
目 录
1 概述 4
2 Registry模块交互 4
2.1 Broker 4
2.2 Console 4
2.3 Net 4
2.4 Registry 4
3 专业术语 5
4 Registry实现原理 5
4.1 Broker注册业务 5
4.1 扫描活跃Broker 6
4.1 Registry与ZooKeeper 7
4.2 Registry核心数据结构 8
4.3 Registry内存数据变化 8
4.4 普通Topic与顺序Topic 9
4.1 Topic与Broker映射关系 10
- 1概述
Registry模块维护了很多broker和topic等信息,通过net和broker建立长连接,来保持与broker的通信,同时会提供心跳检测、数据更新与查询等常规服务。它保存活跃的broker列表,包括Master和Slave;同时也保存所有topic和该topic所有队列的列表。
- 2Registry模块交互
- 2.1Broker
每个Registry定时收到broker注册信息,并维护每个broker节点地址、角色、ID等信息,同时维护每个Broker活跃信息。
- 2.2Console
console模块通过client底层接口,间接与所有Registry建立连接并通信。
- 2.3Net
registry通过Net创建Service(目前端口为9876),注册并发布服务,供Broker、Client、Web等模块调用。
- 2.4Registry
Registry节点之间没有交互。大部分Registry维护的数据都存储于内存,顺序Topic数据存储与文件。
- 3专业术语
Topic
Topic是一个消息主题,一个在线Producer实例只能对应一个Topic,一个在线Consumer实例可以对应多个Topic,一条消息必须属于一个Topic。
QueueData
Topic队列,包含broker名称、读队列数、写队列数、broker权限、topic是否同步标记值。
BrokerData
描述Broker详细信息的数据结构,包括broker名称、broker地址、brokerId等。
TopicQueueTable
描述topic、queue、broker三类数据结构的映射关系。
BrokerAddrTable
描述brokerName、brokerId、brokerAddr三类数据结构的映射关系。
ClusterAddrTable
描述Broker与Cluster二者对应关系,可以通过brokerName查询详细的broker信息。
BrokerLiveTable
描述broker心跳信息的结构,每个broker发送心跳信息后,Registry将会维护broker心跳的最后更新时间。
- 4Registry实现原理
- 4.1Broker注册业务
broker注册由broker发起请求,Registry收到请求后维护topic、broker、cluster等等映射关系。
Registry收到请求后,根据请求参数,先后更新ClusterAddrTable、BrokerAddrTable、TopicQueueTable、BrokerLiveTable、FilterServerTable等数据接口的数据。
注意:每来一个Master注册,创建一个QueueData对象;如果是新建topic,就是添加QueueData对象;如果是修改topic,就是把旧的QueueData删除,加入新的。
- 1.1扫描活跃Broker
broker每隔30秒上报一次心跳信息,Registry收到心跳信息后,在内存中维护BrokerLiveTable结构体,该数据结构存储了每个broker最后心跳更新时间、broker地址、net连接等等信息。
Registry启动后就执行定时任务:每隔10秒执行一次, 扫描2分钟内没有心跳上报的broker。
如果扫描到结果,即存在2分钟内没有心跳上报的broker节点,那么Registry就主动关闭net连接,并删除内存中维护的数据,同时一并删除broker、topic、cluster、filter等等关联信息。
- 1.1Registry与ZooKeeper
(1)对于Smartgo来说,topic的数据在每个Master上是对等的,没有哪个Master上有topic上的全部数据,所以ZooKeeper的选举leader功能并不适合Smartgo。
(2)Smartgo集群中,需要有构件来处理一些通用数据,比如broker列表,broker刷新时间,使用ZooKeeper客户端处理数据之间的一些逻辑关系却比较麻烦,并且ZooKeeper还得保证多个master之间的一致性,这点更增加代码复杂度。如果有多种角色,那么ZooKeeper代码就更复杂了。
(3)既然Smartgo集群中没有用到ZooKeeper的一些重量级的功能,只是使用ZooKeeper的数据一致性和发布订阅的话,与其依赖重量级的ZooKeeper,还不如写个轻量级的Registry。
(4)Registry也可以集群部署,Registry与Registry之间无任何信息同步,只有一千多行代码的Registry稳定性肯定高于ZooKeeper。
- 1.2Registry核心数据结构
数据结构 |
类型 |
数据格式 |
存储数据 |
TopicQueueTable |
HashMap |
topic[list<QueueData>] |
保存topic-queue信息 |
BrokerAddrTable |
HashMap |
brokerName[BrokerData] |
保存broker地址信息 |
ClusterAddrTable |
HashMap |
clusterName[set<brokerName>] |
保存broker-cluster信息 |
BrokerLiveTable |
HashMap |
brokerAddr[brokerLiveTable] |
保存broker心跳信息 |
- 4.2Registry内存数据变化
整个Registry模块维护的内存数据,包括TopicQueueTable、BrokerAddrTable、ClusterAddrTable、BrokerLiveTable、FilterServerTable等5个部分,这5个部分的数据结构都存在内存中。不同的请求到达Registry模块后,内存中的数据有不同的处理方式。
(1)Broker注册业务请求发出后,Registry模块的内存数据就会新增或更新。Registry将会在内存中维护topic、broker、queue、cluster、filter等相互的映射关系。
(2) Broker卸载业务请求发出后,Registry模块的内存数据就会清除对应的映射关系。如果BrokerAddrs节点没有数据,则会将内存中整个BrokerAddrs节点都删除。
(3) Registry定时任务,每隔10秒扫描一次,如果在2分钟内不活跃Broker(也就是2分钟内没有心跳信息的Broker),也会在内存中删除对应的映射关系。
(4)Client和Admin模块,则通过接口查询Registry内存中的数据,例如查询Topic路由信息。
(5)如果网络通信模块Net的连接关闭、异常、空转,则Registry侧收到请求后,也会删除内存数据。
- 4.3普通Topic与顺序Topic
|
普通Topic |
顺序Topic |
创建方式 |
Broker注册,注册成功则创建成功 |
调用PUT_KV_CONFIG接口,调用成功则创建成功 |
存储文件 |
存储于Broker模块文件,路径是$SMARTGO_HOME/store/config/topics.json |
存储与Registry模块文件,路径是$HOME/namesrv/kvConfig.json |
查询方式 |
Broker注册会加载topics.json内容,方式传递给最终将所有Topic数据加载到Registry内存中 |
Registry启动,直接读取kvConfig.json并加载到内存中 |
Broker与Topic |
Registry模块维护TopicQueueTable数据 |
kvConfig.json配置文件,{"jcpt-example-200":"broker-a:8"}以brokerName:queueNum区分 |
数据状态 |
维护于Registry内存中 |
维护于Registry内存中 |
- 1.1Topic与Broker映射关系
Registry维护topic与broker对应的关系的数据结构是 TopicQueueTable,结构类型是map,内存中映射关系如下图所示:
(1)一个Topic对应多个QueueData, 而每个QueueData包括brokerName名称、queue队列个数;
(2)brokerName恰恰又是BrokerAddrTable结构的key值,根据brokerName即可把BrokerAddrTable结构体、TopicQueueTable结构体 关联起来。
(3)每次客户端查询topic路由信息,只需把TopicQueueTable结构体解析,然后把broker与topic在内存中的数据结构解析即可。
SmartGo-Client 技术文档说明
针对版本V1.0.0
©
成都基础平台架构
2017/11/21
目 录
1 概述 |
2 Client模块交互 |
|
|
|
3 专业术语 |
|
|
|
|
|
|
|
|
|
4 Client实现原理 |
|
|
|
|
|
|
|
5 Client最佳实践 |
|
|
附件一 SmartMQ开发者联系方式 |
|
- 1概述
Client在发送端起发送负载作用,在消费端起消费负载作用,是整个消息中间件的入口和出口,只与Registry和Broker进行交互。
- 2Client模块交互
- 2.1Registry
Client与Registry集群中随机一个保持长连接。
启动时会向Registry建立链接,启动过每隔30秒向Registry获取topic的路由信息并更新本地路由配置。
- 2.2Broker
每个Client通过Registry拿到BrokerList地址,Client与BrokerList保持长连接。
Producer通过路由信息轮询的向Broker每个队列发送消息(仅针对普通消息),。
Consumer从Broker拉取消息进行消费,Broker会维护Consumer与Topic之间订阅关系,并且会维护与Topic消费的Offset,主要是针对集群消息模式,广播消费模式Topic的Offset是存储在客户的。
- 2.3Net
Client通过Net创建Client,调用Registry和Broker的方法。
- 3专业术语
- 3.1Producer
消息生产者,负责产生消息,一般由业务系统负责产生消息。
- 3.2Consumer
消息消费者,负责消费消息,一般是后台系统负责异步消费。
- 3.3Push Consumer
Consumer 的一种,应用通常吐 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法。
- 3.4Pull Consumer
Consumer 的一种,应用通常主劢调用 Consumer 的拉消息方法从 Broker 拉消息,主劢权由应用控制。
- 3.5Producer Group
一类 Producer 的集合名称,这类 Producer 通常发送一类消息,且发送逻辑一致。
- 3.6Consumer Group
一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。
- 3.7广播消费
一条消息被多个 Consumer 消费,即使返些 Consumer 属亍同一个 Consumer Group,消息也会被 Consumer Group 中的每个 Consumer 都消费一次,广播消费中的 Consumer Group 概念可以认为在消息划分方面无意义。在 CORBA Notification 规范中,消费方式都属于广播消费。在 JMS 规范中,相当于 JMS publish/subscribe model。
- 3.8集群消费
一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 3 条消息。在 CORBA Notification 规范中,无此消费方式。在 JMS 规范中,JMS point-to-point model 与之类似,但是SmartMQ的集群消费功能大等于 PTP 模型。因为 SmartMQ单个 Consumer Group 内的消费者类似于 PTP,但是一个 Topic/Queue 可以被多个 Consumer Group 消费。
- 3.9顺序消息
敬请期待
- 4Client实现原理
- 4.1创建Topic
Client创建Topic时序图如下:
每个Broker启动时会向Registry注册一个DEFAULT_TOPIC信息,当客户端创建Topic时,通过从Registry拿DEFAULT_TOPIC路由信息即可拿到集群所有Broker列表,然后依次调用Broker创建Topic接口就在Broker上创建了该Topic。
- 4.2发送同步消息
发送同步消息时序图如下:
发送消息负载客户端从registry拿到topic对应所有broker的所有队列依次遍历队列发送到每个队列,从而保证了发送端负载。
SendResult中SendStatus值说明
SEND_OK 发送成功并同步到SLAVE成功
FLUSH_DISK_TIMEOUT 刷盘超时
FLUSH_SLAVE_TIMEOUT 同步到SLAVE超时
SLAVE_NOT_AVAILABLE SLAVE不可用
- 4.3发送异步消息
发送异步消息时序图如下:
异步消息和同步消息流程几乎是一模一样只是在返回SendResult时,客户端不需要等待只需传入一个回调函数,服务端处理发送消息成功即通过回调函数返回SendResult给客户端。
- 4.4发送OneWay消息
发送OneWay消息时序图如下:
发送OneWay消息和发送异步消息,同步消息类似,只是服务端没有返回值,不清楚服务端是否成功,此模式是性能最高的,但消息可靠性不能保证。
- 4.5Push集群消费
Push消费流程比较繁琐,时序图如下:
Push集群消费默认负载算法是按照ConsumerId平均分配队列。举个例子,一个Topic 24个队列,开1个客户端消费则这个客户端订阅这个topic 24个队列,如果开2个客户端消费,则每个客户端订阅这个topic 24个队列中12个队列,从而达到客户端消费负载。客户端可以重写这个负载策略。
- 4.6Push广播消费
流程和push集群消费类似,仅持久化offset时存在本地,负载对广播消费不起作用。
- 4.7Pull消费
Pull消息时序图如下:
Pull消费主要是客户端控制,offset客户端完全是自己管理,所以没有集群消费和广播消费。
- 5Client最佳实践
- 5.1Producer最佳实践
- 一个应用尽可能用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。 message.setTags("TagA");
- 每个消息在业务局面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。服务器会为个消息创建索引(哈希索引),应用可以通过 topic,key 来查询返条消息内容,以及消息被谁消费。由亍是哈希索引,请务必保证 key 尽可能唯一,返样可以避免潜在的哈希冲突。
// 订单 Id
String orderId = "20034568923546";
message.setKeys(orderId);
- 消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。
- send 消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在sendResult里定义。
SEND_OK
消息収送成功
FLUSH_DISK_TIMEOUT
消息収送成功,但是服务器刷盘超时,消息已经迕入服务器队列,只有此时服务器宕机,消息才会丢失。
FLUSH_SLAVE_TIMEOUT
消息収送成功,但是服务器同步到Slave时超时,消息已经迕入服务器队列,只有此时服务器宕机,消息才会丢失。
SLAVE_NOT_AVAILABLE
消息収送成功,但是此时slave不可用,消息已经迕入服务器队列,只有此时服务器宕机,消息才会丢失。
对亍精卫发送顺序消息的应用,由于顺序消息的局限性,可能会涉及到主备自动切换问题,所以如果
sendresult中的status字段不等于SEND_OK,就应该尝试重试。对于其他应用,则没有必要返样。
- 对于消息不可丢失应用,务必要有消息重试机制,例如如果消息发送失败,存储到数据库,能有定时程序尝试重发,或者人工触发重发。
Producer 的 send 方法本身支持内部重试,重试逻辑如下:
- 至多重试 3 次。
- 如果发送失败,则轮转到下一个 Broker。
- 返个方法的总耗时时间不超过 sendMsgTimeout 设置的值,默认 10s。
所以,如果本身向broker发送消息产生超时异常,就不会再做重试。
以上策略仍然不能能保证消息一定収送成功,为保证消息一定成功,建议应用返样做,如果调用 send 同步方法发送失败,则尝试将消息存储到 db,由后台线程定时重试,保证消息一定到达 Broker。
上述db重试方式为什举没有集成到MQ客户端内部做,而是要求应用自己去完成,我们基于以下几点考虑:
- MQ的客户端设计为无状态模式,方便任意的水平扩展,且对机器资源的消耗仅仅是cpu、内存、网络。
- 如果 MQ 客户端内部集成一个 KV 存储模块,那举数据只有同步落盘才能较可靠,而同步落盘本身性能开销较大,所以通常会采用异步落盘,又由亍应用关闭过程不受MQ运维人员控制,可能经常会収生 kill -9 返样暴力方式关闭,造成数据没有及时落盘而丢失。
- Producer 所在机器的可靠性较低,一般为虚拟机,不适合存储重要数据。
综上,建议重试过程交由应用来控制。
一个 RPC 调用,通常是返样一个过程
- 客户端収送请求到服务器
- 服务器处理该请求
- 服务器向客户端返回应答
所以一个 RPC 的耗时时间是上述三个步骤的总和,而某些场景要求耗时非常短,但是对可靠性要求并不高,例如日志收集类应用,此类应用可以采用 oneway 形式调用,oneway 形式只収送请求不等待应答,而収送请求在客户端实现局面仅仅是一个 os 系统调用的开销,即将数据写入客户端的 socket 缓冲区,此过程耗时通常在微秒。
- 5.2Consumer最佳实践
SmartMQ 无法避免消息重复,所以如果业务对消费重复非常敏感,务必要在业务局面去重,有以下几种去重方式。
- 将消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单 Id 等,消费之前判断是否在Db 或全局KV存储中存在,如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入,如果报主键冲突,则插入失败,直接跳过)msgId一定是全局唯一标识符,但是可能会存在同样的消息有两个不同msgId的情冴(有多种原因),返种情况可能会使业务上重复消费,建议最好使用消息内容中的唯一标识字段去重。
- 使用业务局面的状态机去重。
1附件一 Smargo开发者联系方式
包管理必读
# 下载并安装包管理工具
go get -u github.com/kardianos/govendor # 下载govendor源码
go install github.com/kardianos/govendor # 安装govendor依赖工具
# 同步包
govendor sync # 基于vendor.json文件下载依赖包
# 更改包依赖
govendor update # 从 $GOPATH 更新包依赖到vendor目录
# 重新做包依赖
govendor init # 初始化vendor目录
govendor add +external # 添加所有外部包到vendor目录
注释规范
- 对于注释,请遵照以下规范:
- 注释类型1(适用于结构体或者包名注释)、
// 方法对象名 xxx
// Author: xxx, <xxx@gmail.com>
// Since: 2017/3/20 or v1.0.0
// 由于是顺序消息,因此只能选择一个queue生产和消费消息
// xxx Modify: xxx, <xxx@gmail.com> Since: 2017/3/20 or v1.0.0
// xxx Add: xxx, <xxx@gmail.com> Since: 2017/3/20 or v1.0.0
- 关于TODO、FIXME、XXX注释规范(后续再加上)、
// TODO: + 说明:xxx Author: xxx, <xxx@gmail.com> Since: 2017/3/20 or v1.0.0
如果代码中有TODO该标识,说明在标识处有功能代码待编写,待实现的功能在说明中会简略说明。
// FIXME: + 说明:xxx Author: xxx, <xxx@gmail.com> Since: 2017/3/20 or v1.0.0
如果代码中有FIXME该标识,说明标识处代码需要修正,甚至代码是错误的,不能工作,需要修复,如何修正会在说明中简略说明。
// XXX: + 说明:xxx Author: xxx, <xxx@gmail.com> Since: 2017/3/20 or v1.0.0
如果代码中有XXX该标识,说明标识处代码虽然实现了功能,但是实现的方法有待商榷,希望将来能改进,要改进的地方会在说明中简略说明。
开发IDE
联系我们
:fa-comments-o: smartgo开发组 https://git.oschina.net/cloudzone/smartgo