bee binlog efficient easy worker
startup test docker environment
docker-compose -f testdata/docker-compose.yml up -d
- Kafka Development Environment
build bee
- local:
linux: make linux
- create sample conf.yml by
./bee -init
- edit conf.yml
- start
MYSQL_PWD=root mysql -h -u root -vvv < testdata/bee.sql
kafka message
- 消费测试
kt consume -version=2.5.1 -topic=bee.binlog.pos
- 消费测试
kt consume -version=2.5.1 -topic=bee.binlog.dat
// DDLRequest 是数据定义请求.
type DDLRequest struct {
Timestamp uint32 // binlog发生时间,unix毫秒数的时间戳
Schema string // 当前schema(即当前所在数据库的库名)
Stmt string // DDL语句
// RowsRequest 是数据更新请求.
type RowsRequest struct {
Timestamp uint32 // binlog发生时间,unix毫秒数的时间戳
Schema string // 数据更新的schema(即数据库的库名)
Table string // 数据更新的表名
Action calan.Action // 更新动作: update/insert/delete
Columns []string // 更新字段列表
Rows []Row // 更新行
// Row 更新行.
type Row struct {
Old []interface{} `json:",omitempty"` // 更新之前的取值(只有update时有)
Row []interface{} // 更新后的取值
2021-03-16 16:36:46.534 [INFO ] 19249 --- [13 ] [-] : publish message value:[{"Timestamp":1615883806,"Schema":"split_bj_test","Table":"t_member","Action":"insert","Columns":["id","name","sex","citizen_id","update_time","create_time","remark"],"Rows":[{"Row":[3,"shaseng",1,"110822199010110023","2021-03-16 16:36:46","2021-03-16 16:36:46","aaa"]}]}] to kafka result &{Partition:0 Offset:4 Topic:bee.binlog.dat}
2021-03-16 16:37:46.532 [INFO ] 19249 --- [13 ] [-] : publish message value:{"Timestamp":1615883806,"Name":"mysql-bin.000002","Pos":386559236} to kafka result &{Partition:0 Offset:3 Topic:bee.binlog.pos}
2021-03-16 17:05:47.573 [INFO ] 19249 --- [13 ] [-] : publish message value:[{"Timestamp":1615885547,"Schema":"split_bj_test","Table":"t_member","Action":"update","Columns":["id","name","sex","citizen_id","update_time","create_time","remark"],"Rows":[{"Old":[1,"唐僧",1,"110822199010110011","2021-03-16 16:35:25","2021-03-16 16:35:25","唐朝高僧"],"Row":[1,"唐僧",1,"110822199010110011","2021-03-16 17:05:47","2021-03-16 16:35:25","更更更更"]}]}] to kafka result &{Partition:0 Offset:5 Topic:bee.binlog.dat}
2021-03-16 17:05:47.682 [INFO ] 19249 --- [13 ] [-] : publish message value:{"Timestamp":1615885547,"Name":"mysql-bin.000002","Pos":386559651} to kafka result &{Partition:0 Offset:4 Topic:bee.binlog.pos}
2021-03-16 17:07:33.894 [INFO ] 19249 --- [13 ] [-] : publish message value:[{"Timestamp":1615885653,"Schema":"split_bj_test","Table":"t_member","Action":"update","Columns":["id","name","sex","citizen_id","update_time","create_time","remark"],"Rows":[{"Old":[1,"唐僧",1,"110822199010110011","2021-03-16 17:05:47","2021-03-16 16:35:25","更更更更"],"Row":[1,"唐僧",1,"110822199010110011","2021-03-16 17:07:33","2021-03-16 16:35:25","靠靠靠"]},{"Old":[2,"shaseng",1,"110822199010110023","2021-03-16 16:35:49","2021-03-16 16:35:49","aaa"],"Row":[2,"shaseng",1,"110822199010110023","2021-03-16 17:07:33","2021-03-16 16:35:49","靠靠靠"]},{"Old":[3,"shaseng",1,"110822199010110023","2021-03-16 16:36:46","2021-03-16 16:36:46","aaa"],"Row":[3,"shaseng",1,"110822199010110023","2021-03-16 17:07:33","2021-03-16 16:36:46","靠靠靠"]}]}] to kafka result &{Partition:0 Offset:6 Topic:bee.binlog.dat}
2021-03-16 17:07:33.895 [INFO ] 19249 --- [13 ] [-] : publish message value:{"Timestamp":1615885653,"Name":"mysql-bin.000002","Pos":386560283} to kafka result &{Partition:0 Offset:5 Topic:bee.binlog.pos}
2021-03-16 17:08:20.901 [INFO ] 19249 --- [13 ] [-] : publish message value:[{"Timestamp":1615885700,"Schema":"split_bj_test","Table":"t_member","Action":"delete","Columns":["id","name","sex","citizen_id","update_time","create_time","remark"],"Rows":[{"Row":[1,"唐僧",1,"110822199010110011","2021-03-16 17:07:33","2021-03-16 16:35:25","靠靠靠"]}]}] to kafka result &{Partition:0 Offset:7 Topic:bee.binlog.dat}
2021-03-16 17:08:24.931 [INFO ] 19249 --- [13 ] [-] : publish message value:[{"Timestamp":1615885704,"Schema":"split_bj_test","Table":"t_member","Action":"insert","Columns":["id","name","sex","citizen_id","update_time","create_time","remark"],"Rows":[{"Row":[4,"唐僧",1,"110822199010110011","2021-03-16 17:08:24","2021-03-16 17:08:24","唐朝高僧"]}]}] to kafka result &{Partition:0 Offset:8 Topic:bee.binlog.dat}
# kt consume -version=2.5.1 -topic=bee.binlog.dat
{"partition":0,"offset":13,"key":"bee.t_member","value":{"Timestamp":1615436876,"Schema":"bee","Table":"t_member","Action":"insert","Columns":["id","name","sex","citizen_id","update_time","create_time","remark"],"Rows":[{"Row":[41,"guanyin",0,"110110100011000000","2021-03-11 04:27:56","2021-03-11 04:27:56","ccc"]}]},"timestamp":"2021-03-11T12:27:56.076+08:00"}
{"partition":0,"offset":14,"key":"bee","value":{"Timestamp":1615437545,"Schema":"bee","Stmt":"CREATE TABLE emp(id int(11) COMMENT '员工编号', name varchar(255) COMMENT '员工姓名', dept_id int(11) COMMENT '部门编号', leader int(11) COMMENT '直属领导', state int(11) COMMENT '是否在职 1在职 0离职', PRIMARY KEY (id)) ENGINE = InnoDB CHARACTER SET = utf8mb4"},"timestamp":"2021-03-11T12:39:05.25+08:00"}
# kt consume -version=2.5.1 -topic=bee.binlog.pos
$ MYSQL_PWD=root mysql -h -u root
| Log_name | File_size |
| mysql-bin.000001 | 177 |
| mysql-bin.000002 | 3085942 |
| mysql-bin.000003 | 9456 |
| ... | ... |
| mysql-bin.000010 | 6659 |
| mysql-bin.000011 | 194 |
11 rows in set (0.26 sec)
mysql> show variables like 'log_bin%';
| Variable_name | Value |
| log_bin | ON |
| log_bin_basename | /var/lib/mysql/mysql-bin |
| log_bin_index | /var/lib/mysql/mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
5 rows in set (0.08 sec)
mysql> show binlog events in 'mysql-bin.000010';
| Log_name | Pos | Event_type | Server_id | End_log_pos | Info |
| mysql-bin.000010 | 4 | Format_desc | 10001 | 123 | Server ver: 5.7.33-log, Binlog ver: 4 |
| mysql-bin.000010 | 123 | Previous_gtids | 10001 | 194 | 74969d8f-8220-11eb-b120-0242ac180003:1-99 |
| mysql-bin.000010 | 194 | Gtid | 10001 | 259 | SET @@SESSION.GTID_NEXT= '74969d8f-8220-11eb-b120-0242ac180003:100' |
| mysql-bin.000010 | 4107 | Query | 10001 | 4191 | BEGIN |
| mysql-bin.000010 | 4191 | Table_map | 10001 | 4255 | table_id: 108 (bee.t_member) |
| mysql-bin.000010 | 4255 | Write_rows | 10001 | 4342 | table_id: 108 flags: STMT_END_F |
| mysql-bin.000010 | 4342 | Xid | 10001 | 4373 | COMMIT /* xid=152 */ |
| mysql-bin.000010 | 4373 | Gtid | 10001 | 4438 | SET @@SESSION.GTID_NEXT= '74969d8f-8220-11eb-b120-0242ac180003:109' |
| mysql-bin.000010 | 4438 | Query | 10001 | 4522 | BEGIN |
| mysql-bin.000010 | 4522 | Table_map | 10001 | 4586 | table_id: 108 (bee.t_member) |
| mysql-bin.000010 | 4586 | Write_rows | 10001 | 4673 | table_id: 108 flags: STMT_END_F |
| mysql-bin.000010 | 4673 | Xid | 10001 | 4704 | COMMIT /* xid=161 */ |
| mysql-bin.000010 | 6028 | Gtid | 10001 | 6093 | SET @@SESSION.GTID_NEXT= '74969d8f-8220-11eb-b120-0242ac180003:114' |
| mysql-bin.000010 | 6093 | Query | 10001 | 6177 | BEGIN |
| mysql-bin.000010 | 6177 | Table_map | 10001 | 6241 | table_id: 108 (bee.t_member) |
| mysql-bin.000010 | 6241 | Write_rows | 10001 | 6329 | table_id: 108 flags: STMT_END_F |
| mysql-bin.000010 | 6329 | Table_map | 10001 | 6393 | table_id: 108 (bee.t_member) |
| mysql-bin.000010 | 6393 | Write_rows | 10001 | 6479 | table_id: 108 flags: STMT_END_F |
| mysql-bin.000010 | 6479 | Table_map | 10001 | 6543 | table_id: 108 (bee.t_member) |
| mysql-bin.000010 | 6543 | Write_rows | 10001 | 6628 | table_id: 108 flags: STMT_END_F |
| mysql-bin.000010 | 6628 | Xid | 10001 | 6659 | COMMIT /* xid=216 */ |
97 rows in set (0.09 sec)
列名 |
说明 |
Log_name |
当前事件所在的 binlog 文件名称 |
Pos |
当前事件的开始位置,每个事件都占用固定的字节大小,结束位置(End_log_pos) 减去 Pos ,就是这个事件占用的字节数 |
Event_type |
事件的类型 |
Server_id |
表示这个事件的 MySQL server_id |
End_log_pos |
下一个事件的开始位置 |
Info |
当前事件的描述信息 |
事件名 |
说明 |
Format_desc |
是 binlog 文件的第一个事件。在 Info 列,我们可以看到 MySQL server 的版本号,以及 binlog 的版本号 |
Previous_gtids |
这是表示之前的 binlog 文件中,已经执行过的GTID。(需要开启GTID选项) |
Anonymous_Gtid |
没有开启 GTID 选项时,每个事物的开始事件 |
Query |
向 Binlog 发送一个语句,这里是事务开始语句 BEGIN 。 |
Rows_query |
记录SQL。只有当参数 binlog_rows_query_log_events = TRUE 时才会产生,这个参数的默认值是 FALSE |
Table_map |
记录将要被修改的表的结构 |
Delete_rows |
从表中删除一个记录 |
Xid |
事务 commit 的时候写入事务 ID |
Rotate |
每个 Binlog 文件的结束事件。指定下一个 Binlog 文件的名称。 |
以上两张表,摘自parsing and “tampering” MySQL binlog with Python