Documentation ¶
Index ¶
- Constants
- func AddGSetWithPurged(ctx context.Context, gset gmysql.GTIDSet, conn *BaseConn) (gmysql.GTIDSet, error)
- func AdjustSQLModeCompatible(sqlModes string) (string, error)
- func CreateTableSQLToOneRow(sql string) string
- func ExtractTiDBVersion(version string) (*semver.Version, error)
- func FetchAllDoTables(ctx context.Context, db *BaseDB, bw *filter.Filter) (map[string][]string, error)
- func FetchTableEstimatedBytes(ctx context.Context, db *BaseDB, schema string, table string) (int64, error)
- func FetchTargetDoTables(ctx context.Context, source string, db *BaseDB, bw *filter.Filter, ...) (map[filter.Table][]filter.Table, map[filter.Table][]string, error)
- func GetAllServerID(ctx *tcontext.Context, db *BaseDB) (map[uint32]struct{}, error)
- func GetBinlogDB(ctx *tcontext.Context, db *BaseDB, flavor string) (string, string, error)
- func GetDBCaseSensitive(ctx context.Context, db *BaseDB) (bool, error)
- func GetFlavor(ctx context.Context, db *BaseDB) (string, error)
- func GetGTIDMode(ctx *tcontext.Context, db *BaseDB) (string, error)
- func GetGlobalVariable(ctx *tcontext.Context, db *BaseDB, variable string) (value string, err error)
- func GetMariaDBGtidDomainID(ctx *tcontext.Context, db *BaseDB) (uint32, error)
- func GetMariaDBUUID(ctx *tcontext.Context, db *BaseDB) (string, error)
- func GetMasterStatus(ctx *tcontext.Context, db *BaseDB, flavor string) (string, uint64, string, string, string, error)
- func GetMaxConnections(ctx *tcontext.Context, db *BaseDB) (int, error)
- func GetMaxConnectionsForConn(ctx *tcontext.Context, conn *BaseConn) (int, error)
- func GetParser(ctx *tcontext.Context, db *BaseDB) (*parser.Parser, error)
- func GetParserForConn(ctx *tcontext.Context, conn *BaseConn) (*parser.Parser, error)
- func GetParserFromSQLModeStr(sqlMode string) (*parser.Parser, error)
- func GetPosAndGs(ctx *tcontext.Context, db *BaseDB, flavor string) (binlogPos gmysql.Position, gs gmysql.GTIDSet, err error)
- func GetRandomServerID(ctx *tcontext.Context, db *BaseDB) (uint32, error)
- func GetSQLModeStrBySQLMode(sqlMode tmysql.SQLMode) string
- func GetServerID(ctx *tcontext.Context, db *BaseDB) (uint32, error)
- func GetServerUUID(ctx *tcontext.Context, db *BaseDB, flavor string) (string, error)
- func GetServerUnixTS(ctx context.Context, db *BaseDB) (int64, error)
- func GetSessionVariable(ctx *tcontext.Context, conn *BaseConn, variable string) (value string, err error)
- func GetSlaveServerID(ctx *tcontext.Context, db *BaseDB) (map[uint32]struct{}, error)
- func InitMockDB(c *check.C) sqlmock.Sqlmock
- func InitMockDBFull() (*sql.DB, sqlmock.Sqlmock, error)
- func InitMockDBNotClose() (*sql.DB, sqlmock.Sqlmock, error)
- func InitVersionDB() sqlmock.Sqlmock
- func IsErrBinlogPurged(err error) bool
- func IsErrDuplicateEntry(err error) bool
- func IsMariaDB(version string) bool
- func IsMySQLError(err error, code uint16) bool
- func IsNoSuchThreadError(err error) bool
- func KillConn(ctx *tcontext.Context, db *BaseDB, connID uint32) error
- func MockDefaultDBProvider() (sqlmock.Sqlmock, error)
- type BaseConn
- func (conn *BaseConn) ApplyRetryStrategy(tctx *tcontext.Context, params retry.Params, ...) (interface{}, int, error)
- func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, ...) (int, error)
- func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, ...) (int, error)
- func (conn *BaseConn) ExecuteSQLsAutoSplit(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, ...) error
- func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...interface{}) (*sql.Rows, error)
- func (conn *BaseConn) SetRetryStrategy(strategy retry.Strategy) error
- type BaseDB
- func GetDownstreamDB(cfg *dbconfig.DBConfig) (*BaseDB, error)
- func GetUpstreamDB(cfg *dbconfig.DBConfig) (*BaseDB, error)
- func NewBaseDB(db *sql.DB, scope terror.ErrScope, doFuncInClose ...func()) *BaseDB
- func NewBaseDBForTest(db *sql.DB, doFuncInClose ...func()) *BaseDB
- func NewMockDB(db *sql.DB, doFuncInClose ...func()) *BaseDB
- func (d *BaseDB) Close() error
- func (d *BaseDB) CloseConn(conn *BaseConn) error
- func (d *BaseDB) CloseConnWithoutErr(conn *BaseConn)
- func (d *BaseDB) DoTxWithRetry(tctx *tcontext.Context, queries []string, args [][]interface{}, ...) error
- func (d *BaseDB) ExecContext(tctx *tcontext.Context, query string, args ...interface{}) (sql.Result, error)
- func (d *BaseDB) ForceCloseConn(conn *BaseConn) error
- func (d *BaseDB) ForceCloseConnWithoutErr(conn *BaseConn)
- func (d *BaseDB) GetBaseConn(ctx context.Context) (*BaseConn, error)
- func (d *BaseDB) QueryContext(tctx *tcontext.Context, query string, args ...interface{}) (*sql.Rows, error)
- type Cluster
- type DBProvider
- type DefaultDBProviderImpl
- type LowerCaseTableNamesFlavor
- type ScopedDBConfig
Constants ¶
const ( // LCTableNamesSensitive represent lower_case_table_names = 0, case sensitive. LCTableNamesSensitive LowerCaseTableNamesFlavor = 0 // LCTableNamesInsensitive represent lower_case_table_names = 1, case insensitive. LCTableNamesInsensitive = 1 // LCTableNamesMixed represent lower_case_table_names = 2, table names are case-sensitive, but case-insensitive in usage. LCTableNamesMixed = 2 )
const ( // DefaultDBTimeout represents a DB operation timeout for common usages. DefaultDBTimeout = 30 * time.Second )
Variables ¶
This section is empty.
Functions ¶
func AddGSetWithPurged ¶
func AddGSetWithPurged(ctx context.Context, gset gmysql.GTIDSet, conn *BaseConn) (gmysql.GTIDSet, error)
AddGSetWithPurged handles the case described in https://github.com/pingcap/dm/issues/1418: we might get a GTID set from the Previous_gtids event in the binlog, but that GTID set can't be used to start a GTID sync because it doesn't cover all of gtid_purged. Using it fails with "ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires." So we add gtid_purged to it.
func AdjustSQLModeCompatible ¶
AdjustSQLModeCompatible adjusts the downstream SQL mode to be compatible. TODO: When upstream's datetime is 2020-00-00, 2020-00-01, 2020-06-00 and so on, downstream will be 2019-11-30, 2019-12-01, 2020-05-31, as if 'NO_ZERO_IN_DATE' and 'NO_ZERO_DATE' were set. This is caused by the implementation of go-mysql; see https://github.com/go-mysql-org/go-mysql/blob/master/replication/row_event.go#L1063-L1087
func CreateTableSQLToOneRow ¶
CreateTableSQLToOneRow formats the result of SHOW CREATE TABLE to one row.
func ExtractTiDBVersion ¶
ExtractTiDBVersion extracts TiDB's version. Version format: "5.7.25-TiDB-v3.0.0-beta-211-g09beefbe0-dirty" - ^..........
func FetchAllDoTables ¶
func FetchAllDoTables(ctx context.Context, db *BaseDB, bw *filter.Filter) (map[string][]string, error)
FetchAllDoTables returns all tables that need to be processed after filtering (fetched from upstream MySQL).
func FetchTableEstimatedBytes ¶
func FetchTableEstimatedBytes(ctx context.Context, db *BaseDB, schema string, table string) (int64, error)
FetchTableEstimatedBytes returns the estimated size (data + index) in bytes of the table.
func FetchTargetDoTables ¶
func FetchTargetDoTables( ctx context.Context, source string, db *BaseDB, bw *filter.Filter, router *regexprrouter.RouteTable, ) (map[filter.Table][]filter.Table, map[filter.Table][]string, error)
FetchTargetDoTables returns all tables that need to be processed after filtering and routing (fetched from upstream MySQL).
func GetAllServerID ¶
GetAllServerID gets all slave server IDs and the master server ID.
func GetBinlogDB ¶
GetBinlogDB gets binlog_do_db and binlog_ignore_db from `show master status`.
func GetDBCaseSensitive ¶
GetDBCaseSensitive returns the case-sensitive setting of target db.
func GetGTIDMode ¶
GetGTIDMode returns GTID_MODE.
func GetGlobalVariable ¶
func GetGlobalVariable(ctx *tcontext.Context, db *BaseDB, variable string) (value string, err error)
GetGlobalVariable gets server's global variable.
func GetMariaDBGtidDomainID ¶
GetMariaDBGtidDomainID gets MariaDB server's `gtid_domain_id`.
func GetMariaDBUUID ¶
GetMariaDBUUID gets the equivalent of `server_uuid` for MariaDB: `gtid_domain_id` joined with `server_id` by domainServerIDSeparator.
func GetMasterStatus ¶
func GetMasterStatus(ctx *tcontext.Context, db *BaseDB, flavor string) ( string, uint64, string, string, string, error, )
GetMasterStatus gets the status from the master. When the returned error is nil, the gtid must not be nil.
func GetMaxConnections ¶
GetMaxConnections gets max_connections for sql.DB which is suitable for session variable max_connections.
func GetMaxConnectionsForConn ¶
GetMaxConnectionsForConn gets max_connections for BaseConn which is suitable for session variable max_connections.
func GetParser ¶
GetParser gets a parser for sql.DB which is suitable for session variable sql_mode.
func GetParserForConn ¶
GetParserForConn gets a parser for BaseConn which is suitable for session variable sql_mode.
func GetParserFromSQLModeStr ¶
GetParserFromSQLModeStr gets a parser and applies given sqlMode.
func GetPosAndGs ¶
func GetPosAndGs(ctx *tcontext.Context, db *BaseDB, flavor string) ( binlogPos gmysql.Position, gs gmysql.GTIDSet, err error, )
GetPosAndGs gets the binlog position and gmysql.GTIDSet from `show master status`.
func GetRandomServerID ¶
GetRandomServerID gets a random server ID which is not used.
func GetSQLModeStrBySQLMode ¶
GetSQLModeStrBySQLMode gets the string representation of sql_mode from the given sql_mode value.
func GetServerID ¶
GetServerID gets server's `server_id`.
func GetServerUUID ¶
GetServerUUID gets server's `server_uuid`.
func GetServerUnixTS ¶
GetServerUnixTS gets server's `UNIX_TIMESTAMP()`.
func GetSessionVariable ¶
func GetSessionVariable(ctx *tcontext.Context, conn *BaseConn, variable string) (value string, err error)
GetSessionVariable gets connection's session variable.
func GetSlaveServerID ¶
GetSlaveServerID gets all slave server IDs.
func InitMockDB ¶
InitMockDB returns a mocked db for unit tests.
func InitMockDBFull ¶
func InitMockDBNotClose ¶
func InitVersionDB ¶
func InitVersionDB() sqlmock.Sqlmock
InitVersionDB returns a mocked db for unit tests of show version.
func IsErrBinlogPurged ¶
IsErrBinlogPurged checks whether err is BinlogPurged error.
func IsErrDuplicateEntry ¶
IsErrDuplicateEntry checks whether err is DuplicateEntry error.
func IsMySQLError ¶
IsMySQLError checks whether err is MySQLError error.
func IsNoSuchThreadError ¶
IsNoSuchThreadError checks whether err is NoSuchThreadError.
func MockDefaultDBProvider ¶
func MockDefaultDBProvider() (sqlmock.Sqlmock, error)
MockDefaultDBProvider returns a mocked db for unit tests.
Types ¶
type BaseConn ¶
BaseConn is the basic connection we use in dm. BaseDB -> BaseConn corresponds to sql.DB -> sql.Conn. In our scenario, there are two main reasons why we need BaseConn:
- we often need one fixed DB connection to execute sql
- we need our own retry policy when execution fails
So we split a fixed sql.Conn out of sql.DB and wrap it into BaseConn. And, similar to sql.Conn, all BaseConn generated from one BaseDB share this BaseDB to reset.
Basic usage: the Syncer and Loader units each have a different number of connections depending on the config. Currently the following types of connections exist:
Syncer: Worker Connection: DML connection: executes some DML on the downstream DB, one unit has `syncer.WorkerCount` worker connections; DDL connection: executes some DDL on the downstream DB, one unit has one connection. CheckPoint Connection: interacts with the CheckPoint DB, one unit has one connection. OnlineDDL connection: interacts with the Online DDL DB, one unit has one connection. ShardGroupKeeper connection: interacts with the ShardGroupKeeper DB, one unit has one connection. Loader: Worker Connection: executes some DML on the downstream DB, one unit has `loader.PoolSize` worker connections. CheckPoint Connection: interacts with the CheckPoint DB, one unit has one connection. Restore Connection: only used to create schemas and tables in restoreData; it ignores "already exists" errors and should be removed after use; one unit has one connection.
Each connection should have the ability to retry on some common errors (e.g. tmysql.ErrTiKVServerTimeout), or maybe on some specific errors in the future, and each connection should also have the ability to reset itself on some specific connection errors (e.g. driver.ErrBadConn).
func NewBaseConn ¶
NewBaseConn builds BaseConn to connect real DB.
func NewBaseConnForTest ¶
NewBaseConnForTest builds BaseConn to connect real DB for test.
func (*BaseConn) ApplyRetryStrategy ¶
func (conn *BaseConn) ApplyRetryStrategy(tctx *tcontext.Context, params retry.Params, operateFn func(*tcontext.Context) (interface{}, error), ) (interface{}, int, error)
ApplyRetryStrategy applies the specified strategy to BaseConn.
func (*BaseConn) ExecuteSQL ¶
func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, queries []string, args ...[]interface{}) (int, error)
ExecuteSQL executes SQL on the real DB and returns: 1. on failure: (the index of the SQL statement that failed, error); 2. on success: (rows affected, nil).
func (*BaseConn) ExecuteSQLWithIgnoreError ¶
func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error)
ExecuteSQLWithIgnoreError executes SQL on the real DB, ignoring some errors and continuing with the next query. It returns: 1. on failure: (the index of the SQL statement that failed, error); 2. on success: (rows affected, nil).
func (*BaseConn) ExecuteSQLsAutoSplit ¶
func (conn *BaseConn) ExecuteSQLsAutoSplit( tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, queries []string, args ...[]interface{}, ) error
ExecuteSQLsAutoSplit executes SQL statements and, when it meets a "transaction too large" error, tries to split the statements into two parts and execute them again. The `queries` and `args` should be the same length.
type BaseDB ¶
type BaseDB struct { DB *sql.DB Retry retry.Strategy Scope terror.ErrScope // contains filtered or unexported fields }
BaseDB wraps *sql.DB and controls the BaseConn.
func NewBaseDBForTest ¶
NewBaseDBForTest returns *BaseDB object for test.
func (*BaseDB) CloseConn ¶
CloseConn releases the BaseConn resource from BaseDB and returns the connection to the connection pool; it has the same meaning as sql.Conn.Close.
func (*BaseDB) CloseConnWithoutErr ¶
CloseConnWithoutErr releases the BaseConn resource from BaseDB and returns the connection to the connection pool; it has the same meaning as sql.Conn.Close, and logs a warning on error.
func (*BaseDB) DoTxWithRetry ¶
func (*BaseDB) ExecContext ¶
func (d *BaseDB) ExecContext(tctx *tcontext.Context, query string, args ...interface{}) (sql.Result, error)
TODO: retry can be done inside the BaseDB.
func (*BaseDB) ForceCloseConn ¶
ForceCloseConn releases the BaseConn resource from BaseDB and closes the BaseConn completely (it is not returned to the connection pool).
func (*BaseDB) ForceCloseConnWithoutErr ¶
ForceCloseConnWithoutErr closes the connection completely (it is not returned to the connection pool) and outputs a warning log if it meets an error.
func (*BaseDB) GetBaseConn ¶
GetBaseConn retrieves a *BaseConn which has its own retryStrategy.
type Cluster ¶
type Cluster struct { *server.Server testutils.Cluster kv.Storage *server.TiDBDriver *domain.Domain Port int }
TODO: export Config in https://github.com/pingcap/tidb/blob/a8fa29b56d633b1ec843e21cb89131dd4fd601db/br/pkg/mock/mock_cluster.go#L35. Cluster is a mock TiDB cluster.
type DBProvider ¶
type DBProvider interface {
Apply(config ScopedDBConfig) (*BaseDB, error)
}
DBProvider provides BaseDB instances.
var DefaultDBProvider DBProvider
DefaultDBProvider is the global instance of DBProvider.
type DefaultDBProviderImpl ¶
type DefaultDBProviderImpl struct{}
DefaultDBProviderImpl is the default DBProvider implementation.
func (*DefaultDBProviderImpl) Apply ¶
func (d *DefaultDBProviderImpl) Apply(config ScopedDBConfig) (*BaseDB, error)
Apply will build BaseDB with DBConfig.
type LowerCaseTableNamesFlavor ¶
type LowerCaseTableNamesFlavor uint8
LowerCaseTableNamesFlavor represents the type of db `lower_case_table_names` settings.
func FetchLowerCaseTableNamesSetting ¶
func FetchLowerCaseTableNamesSetting(ctx context.Context, conn *BaseConn) (LowerCaseTableNamesFlavor, error)
FetchLowerCaseTableNamesSetting returns the `lower_case_table_names` setting of the target db.
type ScopedDBConfig ¶
func DownstreamDBConfig ¶
func DownstreamDBConfig(cfg *dbconfig.DBConfig) ScopedDBConfig
func UpstreamDBConfig ¶
func UpstreamDBConfig(cfg *dbconfig.DBConfig) ScopedDBConfig