Documentation ¶
Overview ¶
Package gobinlog 将自己伪装成slave获取mysql主从复杂流来 获取mysql数据库的数据变更,提供轻量级,快速的dump协议交互 以及binlog的row模式下的格式解析。
gobinlog使用方式非常简单,你要实现一个MysqlTableMapper
type mysqlColumnAttribute struct { field string typ string } func (m *mysqlColumnAttribute) Field() string { return m.field } func (m *mysqlColumnAttribute) IsUnSignedInt() bool { return strings.Contains(m.typ, mysqlUnsigned) } type mysqlTableInfo struct { name MysqlTableName columns []MysqlColumn } func (m *mysqlTableInfo) Name() MysqlTableName { return m.name } func (m *mysqlTableInfo) Columns() []MysqlColumn { return m.columns } type exampleMysqlTableMapper struct { db *sql.DB } func (e *exampleMysqlTableMapper) MysqlTable(name MysqlTableName) (MysqlTable, error) { info := &mysqlTableInfo{ name: name, columns: make([]MysqlColumn, 0, 10), } query := "desc " + name.String() rows, err := e.db.Query(query) if err != nil { return info, fmt.Errorf("query failed query: %s, error: %v", query, err) } defer rows.Close() var null,key,extra string var columnDefault []byte for i := 0; rows.Next(); i++ { column := &mysqlColumnAttribute{} err = rows.Scan(&column.field, &column.typ, &null, &key, &columnDefault, &extra) if err != nil { return info, err } info.columns = append(info.columns, column) } return info, nil }
你需要申请一个NewRowStreamer,数据库连接信息如下
user:password@tcp(ip:port)/db
其中user是mysql的用户名,password是mysql的密码,ip是mysql的ip地址, port是mysql的端口,db是mysql的数据库名,serverID要与主库不同,
s, err := gobinlog.NewStreamer(dsn, 1234, e) if err != nil { _log.Errorf("NewStreamer fail. err: %v", err) return }
SetBinlogPosition的参数可以通过SHOW MASTER STATUS获取,通过这个函数 可以设置同步起始位置
s.SetBinlogPosition(pos)
通过开启Stream,可以在SendTransactionFun用于处理事务信息函数,如打印事务信息
err = s.Stream(ctx, func(t *Transaction) error { fmt.Printf("%v", *t) return nil })
如果有需要,你可以通过Error获取Stream过程中的错误
err = s.Error()
当然你如果需要详细的调试信息,你可以通过SetLogger函数设置对应的调试接口
gobinlog.SetLogger(NewDefaultLogger(os.Stdout, DebugLevel))
Index ¶
- func SetLogger(logger log.Logger)
- type ColumnData
- type ColumnType
- func (c ColumnType) IsBit() bool
- func (c ColumnType) IsBlob() bool
- func (c ColumnType) IsDate() bool
- func (c ColumnType) IsDateTime() bool
- func (c ColumnType) IsDecimal() bool
- func (c ColumnType) IsFloat() bool
- func (c ColumnType) IsGeometry() bool
- func (c ColumnType) IsInteger() bool
- func (c ColumnType) IsString() bool
- func (c ColumnType) IsTime() bool
- func (c ColumnType) IsTimestamp() bool
- func (c ColumnType) String() string
- type Error
- type FormatType
- type MysqlColumn
- type MysqlTable
- type MysqlTableMapper
- type MysqlTableName
- type Position
- type RowData
- type SendTransactionFunc
- type StatementType
- type StreamEvent
- type Streamer
- type Transaction
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ColumnData ¶
type ColumnData struct { Filed string // 字段信息 Type ColumnType // binlog中的列类型 IsEmpty bool // data is empty,即该列没有变化 Data []byte // the data }
ColumnData 单个列的信息
func (*ColumnData) MarshalJSON ¶
func (c *ColumnData) MarshalJSON() ([]byte, error)
MarshalJSON 实现ColumnData的json序列化
type MysqlColumn ¶
MysqlColumn 用于实现mysql表列的接口
type MysqlTable ¶
type MysqlTable interface { Name() MysqlTableName //表名 Columns() []MysqlColumn //所有列 }
MysqlTable 用于实现mysql表的接口
type MysqlTableMapper ¶
type MysqlTableMapper interface {
MysqlTable(name MysqlTableName) (MysqlTable, error)
}
MysqlTableMapper 用于获取表信息的接口
type MysqlTableName ¶
type MysqlTableName struct { DbName string `json:"db"` //数据库名 TableName string `json:"table"` //表名 }
MysqlTableName mysql的表名
func NewMysqlTableName ¶
func NewMysqlTableName(database, table string) MysqlTableName
NewMysqlTableName 创建MysqlTableName
type Position ¶
type Position struct { Filename string `json:"filename"` //binlog文件名 Offset int64 `json:"offset"` //在binlog文件中的位移 }
Position 指定binlog的位置,以文件名和位移
type SendTransactionFunc ¶
type SendTransactionFunc func(*Transaction) error
SendTransactionFunc 处理事务信息函数,你可以将一个chan注册到这个函数中如
func getTransaction(tran *Transaction) error{ Transactions <- tran return nil }
如果这个函数返回错误,那么RowStreamer.Stream会停止dump以及解析binlog且返回错误
type StatementType ¶
type StatementType int
StatementType means the sql statement type
const ( StatementUnknown StatementType = iota //不知道的语句 StatementBegin //开始语句 StatementCommit //提交语句 StatementRollback //回滚语句 StatementInsert //插入语句 StatementUpdate //更新语句 StatementDelete //删除语句 StatementCreate //创建表语句 StatementAlter //改变表属性语句 StatementDrop //删除表语句 StatementTruncate //截取表语句 StatementRename //重命名表语句 StatementSet //设置属性语句 )
sql语句类型
func GetStatementCategory ¶
func GetStatementCategory(sql string) StatementType
GetStatementCategory we can get statement type from a SQL
type StreamEvent ¶
type StreamEvent struct { Type StatementType //语句类型 Table MysqlTableName //表名 Query replication.Query //sql Timestamp int64 //执行时间 RowValues []*RowData //which data come to used for StatementInsert and StatementUpdate RowIdentifies []*RowData //which data come from used for StatementUpdate and StatementDelete }
StreamEvent means a SQL or a rows in binlog
func (*StreamEvent) MarshalJSON ¶
func (s *StreamEvent) MarshalJSON() ([]byte, error)
MarshalJSON 实现StreamEvent的json序列化
type Streamer ¶
type Streamer struct {
// contains filtered or unexported fields
}
Streamer 从github.com/youtube/vitess/go/vt/binlog/binlog_streamer.go的基础上移植过来 专门用来RowStreamer解析row模式的binlog event,将其变为对应的事务
func NewStreamer ¶
func NewStreamer(dsn string, serverID uint32, tableMapper MysqlTableMapper) (*Streamer, error)
NewStreamer dsn是mysql数据库的信息,serverID是标识该数据库的信息
func (*Streamer) SetBinlogPosition ¶
SetBinlogPosition 设置开始的binlog位置
type Transaction ¶
type Transaction struct { NowPosition Position //在binlog中的当前位置 NextPosition Position //在binlog中的下一个位置 Timestamp int64 //执行时间 Events []*StreamEvent //一组有事务的binlog evnet }
Transaction 代表一组有事务的binlog evnet
func (*Transaction) MarshalJSON ¶
func (t *Transaction) MarshalJSON() ([]byte, error)
MarshalJSON 实现Transaction的json序列化
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
cmd
|
|
Package replication 用于将binlog解析成可视的数据或者sql语句 是从github.com/youtube/vitess/go/mysql的基础上移植过来,其 主要功能如下:1.完全支持mysql 5.6.x的所有数据格式解析,2.支持 5.7.x的绝大多数数据格式解析,仅仅不支持JSON数据。
|
Package replication 用于将binlog解析成可视的数据或者sql语句 是从github.com/youtube/vitess/go/mysql的基础上移植过来,其 主要功能如下:1.完全支持mysql 5.6.x的所有数据格式解析,2.支持 5.7.x的绝大多数数据格式解析,仅仅不支持JSON数据。 |