Documentation ¶
Overview ¶
go-mysqlbinlog: a simple binlog tool to sync remote MySQL binlogs. go-mysqlbinlog supports semi-sync mode, like Facebook's mysqlbinlog. See http://yoshinorimatsunobu.blogspot.com/2014/04/semi-synchronous-replication-at-facebook.html
Index ¶
- Constants
- Variables
- func CheckAuditSetting(cnf *config.Config)
- func Exist(filename string) bool
- func GetDataTypeBase(dataType string) string
- func GetDataTypeLength(dataType string) []int
- func GetErrorLevel(code ErrorCode) uint8
- func GetErrorMessage(code ErrorCode, lang string) string
- func GetRows4Test(ctx context.Context, sctx sessionctx.Context, rs ast.RecordSet) ([]chunk.Row, error)
- func GetTimeValue(ctx sessionctx.Context, v interface{}, tp byte, fsp int) (d types.Datum, err error)
- func HTMLEscape(w io.Writer, b []byte)
- func HTMLEscapeString(s string) string
- func IsCurrentTimestampExpr(e ast.ExprNode) bool
- func Max(x, y int) int
- func Min(x, y int) int
- func StringStorageReq(dataType string, charset string) int
- type ChanOscData
- type DBInfo
- type ErrExprLoc
- type ErrorCode
- type ExplainInfo
- type FieldInfo
- type IndexInfo
- type LevelSets
- type MasterStatus
- type MyRecordSets
- type PrintRecord
- type PrintSets
- type ProcessListSets
- type Record
- type Rewrite
- type SQLError
- type Session
- type SourceOptions
- type SplitRecord
- type SplitSets
- type TableInfo
- type TxnState
- func (st *TxnState) Commit(ctx context.Context) error
- func (st *TxnState) Delete(k kv.Key) error
- func (st *TxnState) Get(k kv.Key) ([]byte, error)
- func (st *TxnState) Rollback() error
- func (st *TxnState) Seek(k kv.Key) (kv.Iterator, error)
- func (st *TxnState) SeekReverse(k kv.Key) (kv.Iterator, error)
- func (st *TxnState) Set(k kv.Key, v []byte) error
- func (st *TxnState) String() string
- func (st *TxnState) Valid() bool
- type VariableSets
Constants ¶
const ( DBTypeMysql = iota DBTypeMariaDB DBTypeTiDB )
数据库类型
const ( StageOK byte = iota StageCheck StageExec StageBackup )
审核阶段
const ( StatusAuditOk byte = iota StatusExecFail StatusExecOK StatusBackupFail StatusBackupOK )
审核状态
const ( TABLE_COMMENT_MAXLEN = 2048 COLUMN_COMMENT_MAXLEN = 1024 INDEX_COMMENT_MAXLEN = 1024 TABLE_PARTITION_COMMENT_MAXLEN = 1024 )
const ( // ErrExprInSelect is in select fields for the error of ErrFieldNotInGroupBy ErrExprInSelect = "SELECT list" // ErrExprInOrderBy is in order by items for the error of ErrFieldNotInGroupBy ErrExprInOrderBy = "ORDER BY" )
Variables ¶
var ( // StageList 审核各阶段,可选值: 审核阶段,执行阶段,备份阶段 StageList = [4]string{"RERUN", "CHECKED", "EXECUTED", "BACKUP"} // StatusList 状态列表. 如审核完成,执行成功/失败,备份成功/失败等 StatusList = [5]string{"Audit Completed", "Execute failed", "Execute Successfully", "Execute Successfully\nBackup failed", "Execute Successfully\nBackup Successfully"} )
var ( ErrWrongValueForVar = terror.ClassVariable.New(mysql.ErrWrongValueForVar, mysql.MySQLErrName[mysql.ErrWrongValueForVar]) ErrTruncatedWrongValue = terror.ClassVariable.New(mysql.ErrTruncatedWrongValue, mysql.MySQLErrName[mysql.ErrTruncatedWrongValue]) ErrWrongTypeForVar = terror.ClassVariable.New(mysql.ErrWrongTypeForVar, mysql.MySQLErrName[mysql.ErrWrongTypeForVar]) )
var ( // All the exported errors are defined here: ErrIncorrectParameterCount = terror.ClassExpression.New(mysql.ErrWrongParamcountToNativeFct, mysql.MySQLErrName[mysql.ErrWrongParamcountToNativeFct]) ErrDivisionByZero = terror.ClassExpression.New(mysql.ErrDivisionByZero, mysql.MySQLErrName[mysql.ErrDivisionByZero]) ErrRegexp = terror.ClassExpression.New(mysql.ErrRegexp, mysql.MySQLErrName[mysql.ErrRegexp]) ErrOperandColumns = terror.ClassExpression.New(mysql.ErrOperandColumns, mysql.MySQLErrName[mysql.ErrOperandColumns]) ErrCutValueGroupConcat = terror.ClassExpression.New(mysql.ErrCutValueGroupConcat, mysql.MySQLErrName[mysql.ErrCutValueGroupConcat]) )
var ErrorsChinese = map[ErrorCode]string{}/* 161 elements not displayed */
var ErrorsDefault = map[ErrorCode]string{}/* 169 elements not displayed */
var Keywords = map[string]bool{}/* 673 elements not displayed */
Keywords 数据库关键字
var URL string
Functions ¶
func CheckAuditSetting ¶
CheckAuditSetting 自动校准旧的审核规则和自定义规则
func GetDataTypeBase ¶
GetDataTypeBase 获取dataType中的数据类型,忽略长度
func GetDataTypeLength ¶
GetDataTypeLength 获取dataType中的数据类型长度
func GetErrorLevel ¶
func GetErrorMessage ¶
GetErrorMessage 获取审核信息,默认为英文
func GetRows4Test ¶
func GetRows4Test(ctx context.Context, sctx sessionctx.Context, rs ast.RecordSet) ([]chunk.Row, error)
GetRows4Test gets all the rows from a RecordSet, only used for test.
func GetTimeValue ¶
func GetTimeValue(ctx sessionctx.Context, v interface{}, tp byte, fsp int) (d types.Datum, err error)
GetTimeValue gets the time value with type tp.
func HTMLEscape ¶
HTMLEscape writes to w the escaped HTML equivalent of the plain text data b.
func HTMLEscapeString ¶
HTMLEscapeString returns the escaped HTML equivalent of the plain text data s.
func IsCurrentTimestampExpr ¶
IsCurrentTimestampExpr reports whether e is a CurrentTimestamp expression.
func StringStorageReq ¶
StringStorageReq returns the storage requirement, in bytes, of the given string data type.
Types ¶
type ChanOscData ¶
type ChanOscData struct {
// contains filtered or unexported fields
}
type ErrExprLoc ¶
ErrExprLoc is used to generate the ErrFieldNotInGroupBy error info.
type ErrorCode ¶
type ErrorCode int
const ( ER_ERROR_FIRST ErrorCode = iota ER_NOT_SUPPORTED_YET ER_SQL_NO_SOURCE ER_SQL_NO_OP_TYPE ER_SQL_INVALID_OP_TYPE ER_PARSE_ERROR ER_SYNTAX_ERROR ER_REMOTE_EXE_ERROR ER_SHUTDOWN_COMPLETE ER_WITH_INSERT_FIELD ER_WITH_INSERT_VALUES ER_WRONG_VALUE_COUNT_ON_ROW ER_BAD_FIELD_ERROR ER_FIELD_SPECIFIED_TWICE ER_BAD_NULL_ERROR ER_NO_WHERE_CONDITION ER_NORMAL_SHUTDOWN ER_FORCING_CLOSE ER_CON_COUNT_ERROR ER_INVALID_COMMAND ER_SQL_INVALID_SOURCE ER_WRONG_DB_NAME ER_NO_DB_ERROR ER_WITH_LIMIT_CONDITION ER_WITH_ORDERBY_CONDITION ER_SELECT_ONLY_STAR ER_ORDERY_BY_RAND ER_ID_IS_UPER ER_UNKNOWN_COLLATION ER_INVALID_DATA_TYPE ER_NOT_ALLOWED_NULLABLE ER_DUP_FIELDNAME ER_WRONG_COLUMN_NAME ER_WRONG_AUTO_KEY ER_TABLE_CANT_HANDLE_AUTO_INCREMENT ER_FOREIGN_KEY ER_TOO_MANY_KEY_PARTS ER_TOO_LONG_IDENT ER_UDPATE_TOO_MUCH_ROWS ER_INSERT_TOO_MUCH_ROWS ER_CHANGE_TOO_MUCH_ROWS ER_WRONG_NAME_FOR_INDEX ER_TOO_MANY_KEYS ER_NOT_SUPPORTED_KEY_TYPE ER_WRONG_SUB_KEY ER_WRONG_KEY_COLUMN ER_TOO_LONG_KEY ER_MULTIPLE_PRI_KEY ER_DUP_KEYNAME ER_TOO_LONG_INDEX_COMMENT ER_DUP_INDEX ER_TEMP_TABLE_TMP_PREFIX ER_TABLE_PREFIX ER_TABLE_CHARSET_MUST_UTF8 ER_TABLE_CHARSET_MUST_NULL ER_TABLE_MUST_HAVE_COMMENT ER_COLUMN_HAVE_NO_COMMENT ER_TABLE_MUST_HAVE_PK ER_PARTITION_NOT_ALLOWED ER_USE_ENUM ER_USE_TEXT_OR_BLOB ER_COLUMN_EXISTED ER_COLUMN_NOT_EXISTED ER_CANT_DROP_FIELD_OR_KEY ER_INVALID_DEFAULT ER_USERNAME ER_HOSTNAME ER_NOT_VALID_PASSWORD ER_WRONG_STRING_LENGTH ER_BLOB_USED_AS_KEY ER_TOO_LONG_BAKDB_NAME ER_INVALID_BACKUP_HOST_INFO ER_BINLOG_CORRUPTED ER_NET_READ_ERROR ER_NETWORK_READ_EVENT_CHECKSUM_FAILURE ER_SLAVE_RELAY_LOG_WRITE_FAILURE ER_INCORRECT_GLOBAL_LOCAL_VAR ER_START_AS_BEGIN ER_OUTOFMEMORY ER_HAVE_BEGIN ER_NET_READ_INTERRUPTED ER_BINLOG_FORMAT_STATEMENT ER_ERROR_EXIST_BEFORE ER_UNKNOWN_SYSTEM_VARIABLE ER_UNKNOWN_CHARACTER_SET ER_END_WITH_COMMIT ER_DB_NOT_EXISTED_ERROR ER_TABLE_EXISTS_ERROR ER_INDEX_NAME_IDX_PREFIX ER_INDEX_NAME_UNIQ_PREFIX ER_AUTOINC_UNSIGNED ER_VARCHAR_TO_TEXT_LEN 
ER_CHAR_TO_VARCHAR_LEN ER_KEY_COLUMN_DOES_NOT_EXITS ER_INC_INIT_ERR ER_WRONG_ARGUMENTS ER_SET_DATA_TYPE_INT_BIGINT ER_TIMESTAMP_DEFAULT ER_CHARSET_ON_COLUMN ER_AUTO_INCR_ID_WARNING ER_ALTER_TABLE_ONCE ER_BLOB_CANT_HAVE_DEFAULT ER_END_WITH_SEMICOLON ER_NON_UNIQ_ERROR ER_TABLE_NOT_EXISTED_ERROR ER_UNKNOWN_TABLE ER_INVALID_GROUP_FUNC_USE ER_INDEX_USE_ALTER_TABLE ER_WITH_DEFAULT_ADD_COLUMN ER_TRUNCATED_WRONG_VALUE ER_TEXT_NOT_NULLABLE_ERROR ER_WRONG_VALUE_FOR_VAR ER_TOO_MUCH_AUTO_TIMESTAMP_COLS ER_INVALID_ON_UPDATE ER_DDL_DML_COEXIST ER_SLAVE_CORRUPT_EVENT ER_COLLATION_CHARSET_MISMATCH ER_NOT_SUPPORTED_ALTER_OPTION ER_CONFLICTING_DECLARATIONS ER_IDENT_USE_KEYWORD ER_VIEW_SELECT_CLAUSE ER_OSC_KILL_FAILED ER_NET_PACKETS_OUT_OF_ORDER ER_NOT_SUPPORTED_ITEM_TYPE ER_INVALID_IDENT ER_INCEPTION_EMPTY_QUERY ER_PK_COLS_NOT_INT ER_PK_TOO_MANY_PARTS ER_REMOVED_SPACES ER_CHANGE_COLUMN_TYPE ER_CANT_DROP_TABLE ER_CANT_DROP_DATABASE ER_WRONG_TABLE_NAME ER_CANT_SET_CHARSET ER_CANT_SET_COLLATION ER_CANT_SET_ENGINE ER_MUST_AT_LEAST_ONE_COLUMN ER_MUST_HAVE_COLUMNS ErrColumnsMustHaveIndex ErrColumnsMustHaveIndexTypeErr ER_PRIMARY_CANT_HAVE_NULL ErrCantRemoveAllFields ErrNotFoundTableInfo ErrNotFoundThreadId ErrNotFoundMasterStatus ErrNonUniqTable ErrWrongUsage ErrDataTooLong ErrCharsetNotSupport ErrCollationNotSupport ErrTableCollationNotSupport ErrJsonTypeSupport ErrEngineNotSupport ErrMixOfGroupFuncAndFields ErrFieldNotInGroupBy ErCantChangeColumnPosition ErCantChangeColumn ER_DATETIME_DEFAULT ER_TOO_MUCH_AUTO_DATETIME_COLS ErrFloatDoubleToDecimal ErrIdentifierUpper ErrWrongAndExpr ErrCannotAddForeign ErrWrongFkDefWithMatch ErrFkDupName ErrJoinNoOnCondition ErrImplicitTypeConversion ErrUseValueExpr ER_ERROR_LAST )
type ExplainInfo ¶
type ExplainInfo struct { SelectType string `gorm:"Column:select_type"` Table string `gorm:"Column:table"` Partitions string `gorm:"Column:partitions"` Type string `gorm:"Column:type"` PossibleKeys string `gorm:"Column:possible_keys"` Key string `gorm:"Column:key"` KeyLen string `gorm:"Column:key_len"` Ref string `gorm:"Column:ref"` Rows int `gorm:"Column:rows"` Filtered float32 `gorm:"Column:filtered"` Extra string `gorm:"Column:Extra"` // TiDB的Explain预估行数存储在Count中 Count float32 `gorm:"Column:count"` }
ExplainInfo 执行计划信息
type FieldInfo ¶
type FieldInfo struct { Field string `gorm:"Column:Field"` Type string `gorm:"Column:Type"` Collation string `gorm:"Column:Collation"` Null string `gorm:"Column:Null"` Key string `gorm:"Column:Key"` Default *string `gorm:"Column:Default"` Extra string `gorm:"Column:Extra"` Privileges string `gorm:"Column:Privileges"` Comment string `gorm:"Column:Comment"` IsDeleted bool `gorm:"-"` IsNew bool `gorm:"-"` Tp *types.FieldType `gorm:"-"` }
FieldInfo 字段信息
type IndexInfo ¶
type IndexInfo struct { gorm.Model Table string `gorm:"Column:Table"` NonUnique int `gorm:"Column:Non_unique"` IndexName string `gorm:"Column:Key_name"` Seq int `gorm:"Column:Seq_in_index"` ColumnName string `gorm:"Column:Column_name"` IndexType string `gorm:"Column:Index_type"` IsDeleted bool `gorm:"-"` }
IndexInfo 索引信息
type LevelSets ¶
type LevelSets struct {
// contains filtered or unexported fields
}
func NewLevelSets ¶
type MasterStatus ¶
type MasterStatus struct { gorm.Model File string `gorm:"Column:File"` Position int `gorm:"Column:Position"` BinlogDoDB string `gorm:"Column:Binlog_Do_DB"` BinlogIgnoreDB string `gorm:"Column:Binlog_Ignore_DB"` ExecutedGtidSet string `gorm:"Column:Executed_Gtid_Set"` }
MasterStatus 主库状态信息,包括当前日志文件,位置等
type MyRecordSets ¶
func NewRecordSets ¶
func NewRecordSets() *MyRecordSets
func (*MyRecordSets) All ¶
func (s *MyRecordSets) All() []*Record
func (*MyRecordSets) Append ¶
func (s *MyRecordSets) Append(r *Record)
func (*MyRecordSets) Next ¶
func (s *MyRecordSets) Next() *Record
func (*MyRecordSets) Rows ¶
func (s *MyRecordSets) Rows() []sqlexec.RecordSet
type PrintRecord ¶
type PrintRecord struct { ID int SQL string // 审核级别,0为成功,1为警告,2为错误 ErrLevel uint8 // 错误/警告信息 ErrorMessage string // 语法树 QueryTree string }
func (*PrintRecord) List ¶
func (r *PrintRecord) List() []interface{}
type PrintSets ¶
type PrintSets struct {
// contains filtered or unexported fields
}
func NewPrintSets ¶
func NewPrintSets() *PrintSets
type ProcessListSets ¶
type ProcessListSets struct {
// contains filtered or unexported fields
}
func NewOscProcessListSets ¶
func NewOscProcessListSets(count int, hideCommand bool) *ProcessListSets
func NewProcessListSets ¶
func NewProcessListSets(count int) *ProcessListSets
func (*ProcessListSets) Rows ¶
func (s *ProcessListSets) Rows() []sqlexec.RecordSet
type Record ¶
type Record struct { // 阶段 RERUN EXECUTED CHECKED Stage byte // 阶段说明 Execute Successfully / 审核完成 / 失败... // Audit completed // Execute failed // Execute Successfully // Execute Successfully,Backup successfully // Execute Successfully,Backup failed StageStatus byte // 审核级别,0为成功,1为警告,2为错误 ErrLevel uint8 // 错误/警告信息 ErrorMessage string Sql string // 受影响行 AffectedRows int // 备份库的库名 BackupDBName string // 执行用时 ExecTime string // 备份用时 BackupCostTime string // sql的hash值,osc使用 Sqlsha1 string Buf *bytes.Buffer Type ast.StmtNode // 备份相关 ExecTimestamp int64 StartFile string StartPosition int EndFile string EndPosition int ThreadId uint32 SeqNo int DBName string TableName string TableInfo *TableInfo // ddl回滚 DDLRollback string OPID string ExecComplete bool // 是否开启OSC UseOsc bool // update多表时,记录多余的表 // update多表时,默认set第一列的表为主表,其余表才会记录到该处 // 仅在发现多表操作时,初始化该参数 MultiTables map[string]*TableInfo }
Record 审核/执行记录.
type Rewrite ¶
Rewrite 用于重写SQL
func NewRewrite ¶
NewRewrite 返回一个*Rewrite对象,如果SQL无法被正常解析,将错误输出到日志中,返回一个nil
func (*Rewrite) RewriteDML2Select ¶
RewriteDML2Select dml2select: DML 转成 SELECT,兼容低版本的 EXPLAIN
func (*Rewrite) TestSelect2Count ¶
type SQLError ¶
SQLError records error information from executing SQL.
func NewErr ¶
NewErr generates a SQL error, with an error code and default format specifier defined in MySQLErrName.
type Session ¶
type Session interface { sessionctx.Context AffectedRows() uint64 // Affected rows by latest executed stmt. // Execute(context.Context, string) ([]sqlexec.RecordSet, error) // Execute a sql statement. Execute(context.Context, string) ([]Record, error) // Execute a sql statement. ExecuteInc(context.Context, string) ([]sqlexec.RecordSet, error) // Execute a sql statement. SetConnectionID(uint64) SetProcessInfo(string, time.Time, byte) SetCollation(coID int) error SetSessionManager(util.SessionManager) Auth(user *auth.UserIdentity, auth []byte, salt []byte) bool ShowProcess() util.ProcessInfo // 用以测试 GetAlterTablePostPart(sql string, isPtOSC bool) string // LoadOptions 加载配置 LoadOptions(opt SourceOptions) error // Audit 审核 Audit(ctx context.Context, sql string) ([]Record, error) // RunExecute 执行 RunExecute(ctx context.Context, sql string) ([]Record, error) // 特殊SQL审核 CheckStmt(ctx context.Context, stmtNode ast.StmtNode, currentSQL string) ([]sqlexec.RecordSet, error) // 拆分 Split(ctx context.Context, sql string) ([]SplitRecord, error) // 打印语法树 Print(ctx context.Context, sql string) ([]PrintRecord, error) // 打印语法树 QueryTree(ctx context.Context, sql string) ([]PrintRecord, error) }
Session context
func CreateSession ¶
CreateSession creates a new session environment.
func CreateSession4Test ¶
CreateSession4Test creates a new session environment for test.
type SourceOptions ¶
type SourceOptions struct { Host string Port int User string Password string Check bool Execute bool Backup bool IgnoreWarnings bool // 每次执行后休眠多少毫秒. 用以降低对线上数据库的影响,特别是针对大量写入的操作. // 单位为毫秒,最小值为0, 最大值为100秒,也就是100000毫秒 Sleep int // 执行多条后休眠, 最小值1,默认值1 SleepRows int // 仅供第三方扩展使用! 设置该字符串会跳过binlog解析! MiddlewareExtend string MiddlewareDB string // 原始主机和端口,用以解析binlog ParseHost string ParsePort int // sql指纹功能,可在调用参数中设置,也可全局设置,值取并集 Fingerprint bool // 打印语法树功能 Print bool // DDL/DML分隔功能 Split bool // 使用count(*)计算受影响行数 RealRowCount bool // 连接的数据库,默认为mysql DB string Ssl string // 连接加密 SslCA string // 证书颁发机构(CA)证书 SslCert string // 客户端公共密钥证书 SslKey string // 客户端私钥文件 // 事务支持,一次执行多少条 TranBatch int }
SourceOptions 线上数据库信息和审核或执行的参数
type SplitRecord ¶
type SplitRecord struct { ID int SQL string // 审核级别,0为成功,1为警告,2为错误 ErrLevel uint8 // 错误/警告信息 ErrorMessage string // ddl标志位 IsDDL bool }
func (*SplitRecord) List ¶
func (r *SplitRecord) List() []interface{}
type SplitSets ¶
type SplitSets struct {
// contains filtered or unexported fields
}
func NewSplitSets ¶
func NewSplitSets() *SplitSets
type TableInfo ¶
type TableInfo struct { Schema string Name string // 表别名,仅用于update,delete多表 AsName string Fields []FieldInfo // 索引 Indexes []*IndexInfo // 是否已删除 IsDeleted bool // 备份库是否已创建 IsCreated bool // 表是否为新增 IsNew bool // 列是否为新增 IsNewColumns bool AlterCount int // 是否已清除已删除的列[解析binlog时会自动清除已删除的列] IsClear bool // 表大小.单位MB TableSize uint // 字符集&排序规则 Collation string // contains filtered or unexported fields }
TableInfo 表结构. 表结构实现了快照功能,在表结构变更前,会复制快照,在快照上做变更 在解析binlog时,基于执行时的快照做binlog解析,以实现删除列时的binlog解析
type TxnState ¶
type TxnState struct { // States of a TxnState should be one of the followings: // Invalid: kv.Transaction == nil && txnFuture == nil // Pending: kv.Transaction == nil && txnFuture != nil // Valid: kv.Transaction != nil && txnFuture == nil kv.Transaction // contains filtered or unexported fields }
TxnState wraps kv.Transaction to provide a new kv.Transaction. 1. It holds all statement-related modifications in a buffer before flushing them to the txn, so if executing a statement hits an error, the txn is not made dirty. 2. It is a lazy transaction: it remains a txnFuture until StartTS() is actually needed.
func (*TxnState) SeekReverse ¶
SeekReverse overrides the Transaction interface.
type VariableSets ¶
type VariableSets struct {
// contains filtered or unexported fields
}
func NewVariableSets ¶
func NewVariableSets(count int) *VariableSets
func (*VariableSets) Append ¶
func (s *VariableSets) Append(name string, value string)
func (*VariableSets) Rows ¶
func (s *VariableSets) Rows() []sqlexec.RecordSet