conn

package
v0.0.0-...-ca34dc8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 31, 2024 License: Apache-2.0 Imports: 47 Imported by: 2

Documentation

Index

Constants

View Source
const (
	// LCTableNamesSensitive represent lower_case_table_names = 0, case sensitive.
	LCTableNamesSensitive LowerCaseTableNamesFlavor = 0
	// LCTableNamesInsensitive represent lower_case_table_names = 1, case insensitive.
	LCTableNamesInsensitive = 1
	// LCTableNamesMixed represent lower_case_table_names = 2, table names are case-sensitive, but case-insensitive in usage.
	LCTableNamesMixed = 2
)
View Source
const (
	// DefaultDBTimeout represents a DB operation timeout for common usages.
	DefaultDBTimeout = 30 * time.Second
)

Variables

This section is empty.

Functions

func AddGSetWithPurged

func AddGSetWithPurged(ctx context.Context, gset gmysql.GTIDSet, conn *BaseConn) (gmysql.GTIDSet, error)

AddGSetWithPurged is used to handle this case: https://github.com/pingcap/dm/issues/1418 — we might get a gtid set from a Previous_gtids event in the binlog, but that gtid set can't be used to start a gtid sync because it doesn't cover all of gtid_purged. The error from using it will be "ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.", so we add gtid_purged to it.

func AdjustSQLModeCompatible

func AdjustSQLModeCompatible(sqlModes string) (string, error)

AdjustSQLModeCompatible adjusts the downstream sql mode to be compatible. TODO: When upstream's datetime is 2020-00-00, 2020-00-01, 2020-06-00 and so on, downstream will be 2019-11-30, 2019-12-01, 2020-05-31, as if 'NO_ZERO_IN_DATE' and 'NO_ZERO_DATE' were set. This is because of the implementation of go-mysql, which you can see at https://github.com/go-mysql-org/go-mysql/blob/master/replication/row_event.go#L1063-L1087

func CreateTableSQLToOneRow

func CreateTableSQLToOneRow(sql string) string

CreateTableSQLToOneRow formats the result of SHOW CREATE TABLE to one row.

func ExtractTiDBVersion

func ExtractTiDBVersion(version string) (*semver.Version, error)

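ExtractTiDBVersion extracts TiDB's version from a server version string. Version format: "5.7.25-TiDB-v3.0.0-beta-211-g09beefbe0-dirty"; the original comment uses a caret marker (^..........) under this example to point at the portion of the string that holds the TiDB version.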

func FetchAllDoTables

func FetchAllDoTables(ctx context.Context, db *BaseDB, bw *filter.Filter) (map[string][]string, error)

FetchAllDoTables returns all tables that still need to be handled after filtering (fetched from upstream MySQL).

func FetchTableEstimatedBytes

func FetchTableEstimatedBytes(ctx context.Context, db *BaseDB, schema string, table string) (int64, error)

FetchTableEstimatedBytes returns the estimated size (data + index) in bytes of the table.

func FetchTargetDoTables

func FetchTargetDoTables(
	ctx context.Context,
	source string,
	db *BaseDB,
	bw *filter.Filter,
	router *regexprrouter.RouteTable,
) (map[filter.Table][]filter.Table, map[filter.Table][]string, error)

FetchTargetDoTables returns all tables that still need to be handled after filtering and routing (fetched from upstream MySQL).

func GetAllServerID

func GetAllServerID(ctx *tcontext.Context, db *BaseDB) (map[uint32]struct{}, error)

GetAllServerID gets the server IDs of all slaves and of the master.

func GetBinlogDB

func GetBinlogDB(ctx *tcontext.Context, db *BaseDB, flavor string) (string, string, error)

GetBinlogDB gets binlog_do_db and binlog_ignore_db from `show master status`.

func GetDBCaseSensitive

func GetDBCaseSensitive(ctx context.Context, db *BaseDB) (bool, error)

GetDBCaseSensitive returns the case-sensitive setting of target db.

func GetFlavor

func GetFlavor(ctx context.Context, db *BaseDB) (string, error)

GetFlavor gets flavor from DB.

func GetGTIDMode

func GetGTIDMode(ctx *tcontext.Context, db *BaseDB) (string, error)

GetGTIDMode returns GTID_MODE.

func GetGlobalVariable

func GetGlobalVariable(ctx *tcontext.Context, db *BaseDB, variable string) (value string, err error)

GetGlobalVariable gets server's global variable.

func GetMariaDBGtidDomainID

func GetMariaDBGtidDomainID(ctx *tcontext.Context, db *BaseDB) (uint32, error)

GetMariaDBGtidDomainID gets MariaDB server's `gtid_domain_id`.

func GetMariaDBUUID

func GetMariaDBUUID(ctx *tcontext.Context, db *BaseDB) (string, error)

GetMariaDBUUID gets the MariaDB equivalent of `server_uuid`: `gtid_domain_id` joined with `server_id` using domainServerIDSeparator.

func GetMasterStatus

func GetMasterStatus(ctx *tcontext.Context, db *BaseDB, flavor string) (
	string, uint64, string, string, string, error,
)

GetMasterStatus gets status from master. When the returned error is nil, the gtid must not be nil.

func GetMaxConnections

func GetMaxConnections(ctx *tcontext.Context, db *BaseDB) (int, error)

GetMaxConnections gets max_connections for sql.DB which is suitable for session variable max_connections.

func GetMaxConnectionsForConn

func GetMaxConnectionsForConn(ctx *tcontext.Context, conn *BaseConn) (int, error)

GetMaxConnectionsForConn gets max_connections for BaseConn which is suitable for session variable max_connections.

func GetParser

func GetParser(ctx *tcontext.Context, db *BaseDB) (*parser.Parser, error)

GetParser gets a parser for sql.DB which is suitable for session variable sql_mode.

func GetParserForConn

func GetParserForConn(ctx *tcontext.Context, conn *BaseConn) (*parser.Parser, error)

GetParserForConn gets a parser for BaseConn which is suitable for session variable sql_mode.

func GetParserFromSQLModeStr

func GetParserFromSQLModeStr(sqlMode string) (*parser.Parser, error)

GetParserFromSQLModeStr gets a parser and applies given sqlMode.

func GetPosAndGs

func GetPosAndGs(ctx *tcontext.Context, db *BaseDB, flavor string) (
	binlogPos gmysql.Position,
	gs gmysql.GTIDSet,
	err error,
)

GetPosAndGs gets the binlog position and gmysql.GTIDSet from `show master status`.

func GetRandomServerID

func GetRandomServerID(ctx *tcontext.Context, db *BaseDB) (uint32, error)

GetRandomServerID gets a random server ID which is not used.

func GetSQLModeStrBySQLMode

func GetSQLModeStrBySQLMode(sqlMode tmysql.SQLMode) string

GetSQLModeStrBySQLMode gets the string representation of a sql_mode value.

func GetServerID

func GetServerID(ctx *tcontext.Context, db *BaseDB) (uint32, error)

GetServerID gets server's `server_id`.

func GetServerUUID

func GetServerUUID(ctx *tcontext.Context, db *BaseDB, flavor string) (string, error)

GetServerUUID gets server's `server_uuid`.

func GetServerUnixTS

func GetServerUnixTS(ctx context.Context, db *BaseDB) (int64, error)

GetServerUnixTS gets server's `UNIX_TIMESTAMP()`.

func GetSessionVariable

func GetSessionVariable(ctx *tcontext.Context, conn *BaseConn, variable string) (value string, err error)

GetSessionVariable gets connection's session variable.

func GetSlaveServerID

func GetSlaveServerID(ctx *tcontext.Context, db *BaseDB) (map[uint32]struct{}, error)

GetSlaveServerID gets all slave server id.

func InitMockDB

func InitMockDB(c *check.C) sqlmock.Sqlmock

InitMockDB returns a mocked db for unit tests.

func InitMockDBFull

func InitMockDBFull() (*sql.DB, sqlmock.Sqlmock, error)

func InitMockDBNotClose

func InitMockDBNotClose() (*sql.DB, sqlmock.Sqlmock, error)

func InitVersionDB

func InitVersionDB() sqlmock.Sqlmock

InitVersionDB returns a mocked db for unit tests of show version.

func IsErrBinlogPurged

func IsErrBinlogPurged(err error) bool

IsErrBinlogPurged checks whether err is BinlogPurged error.

func IsErrDuplicateEntry

func IsErrDuplicateEntry(err error) bool

IsErrDuplicateEntry checks whether err is DuplicateEntry error.

func IsMariaDB

func IsMariaDB(version string) bool

IsMariaDB tells whether the version is mariadb.

func IsMySQLError

func IsMySQLError(err error, code uint16) bool

IsMySQLError checks whether err is MySQLError error.

func IsNoSuchThreadError

func IsNoSuchThreadError(err error) bool

IsNoSuchThreadError checks whether err is NoSuchThreadError.

func KillConn

func KillConn(ctx *tcontext.Context, db *BaseDB, connID uint32) error

KillConn kills the DB connection (thread in mysqld).

func MockDefaultDBProvider

func MockDefaultDBProvider() (sqlmock.Sqlmock, error)

MockDefaultDBProvider returns a mocked db for unit tests.

Types

type BaseConn

type BaseConn struct {
	DBConn        *sql.Conn
	Scope         terror.ErrScope
	RetryStrategy retry.Strategy
}

BaseConn is the basic connection we use in dm. BaseDB -> BaseConn corresponds to sql.DB -> sql.Conn. In our scenario, there are two main reasons why we need BaseConn:

  1. we often need one fixed DB connection to execute sql
  2. we need our own retry policy when execution fails

So we split a fixed sql.Conn out of sql.DB and wrap it in a BaseConn. Similar to sql.Conn, all BaseConns generated from one BaseDB share this BaseDB to reset.

Basic usage: Syncer and Loader units each have a different number of connections, depending on config. Currently the following types of connections exist:

Syncer:
	Worker Connection:
		DML connection:
			execute some DML on Downstream DB, one unit has `syncer.WorkerCount` worker connections
		DDL Connection:
			execute some DDL on Downstream DB, one unit has one connection
	CheckPoint Connection:
		interact with CheckPoint DB, one unit has one connection
	OnlineDDL connection:
		interact with Online DDL DB, one unit has one connection
	ShardGroupKeeper connection:
		interact with ShardGroupKeeper DB, one unit has one connection

Loader:
	Worker Connection:
		execute some DML to Downstream DB, one unit has `loader.PoolSize` worker connections
	CheckPoint Connection:
		interact with CheckPoint DB, one unit has one connection
	Restore Connection:
		only used to create schema and table in restoreData,
		it ignores "already exists" errors and should be removed after use, one unit has one connection

Each connection should have the ability to retry on some common errors (e.g. tmysql.ErrTiKVServerTimeout), or maybe on some specific errors in the future, and each connection should also have the ability to reset itself on some specific connection errors (e.g. driver.ErrBadConn).

func NewBaseConn

func NewBaseConn(conn *sql.Conn, scope terror.ErrScope, strategy retry.Strategy) *BaseConn

NewBaseConn builds BaseConn to connect real DB.

func NewBaseConnForTest

func NewBaseConnForTest(conn *sql.Conn, strategy retry.Strategy) *BaseConn

NewBaseConnForTest builds BaseConn to connect real DB for test.

func (*BaseConn) ApplyRetryStrategy

func (conn *BaseConn) ApplyRetryStrategy(tctx *tcontext.Context, params retry.Params,
	operateFn func(*tcontext.Context) (interface{}, error),
) (interface{}, int, error)

ApplyRetryStrategy applies the specified strategy for BaseConn.

func (*BaseConn) ExecuteSQL

func (conn *BaseConn) ExecuteSQL(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, queries []string, args ...[]interface{}) (int, error)

ExecuteSQL executes sql on real DB. It returns: 1. on failure: (the index of the sql whose execution failed, error); 2. on success: (rows affected, nil).

func (*BaseConn) ExecuteSQLWithIgnoreError

func (conn *BaseConn) ExecuteSQLWithIgnoreError(tctx *tcontext.Context, hVec *prometheus.HistogramVec, task string, ignoreErr func(error) bool, queries []string, args ...[]interface{}) (int, error)

ExecuteSQLWithIgnoreError executes sql on real DB, ignoring some errors and continuing with the next query. It returns: 1. on failure: (the index of the sql whose execution failed, error); 2. on success: (rows affected, nil).

func (*BaseConn) ExecuteSQLsAutoSplit

func (conn *BaseConn) ExecuteSQLsAutoSplit(
	tctx *tcontext.Context,
	hVec *prometheus.HistogramVec,
	task string,
	queries []string,
	args ...[]interface{},
) error

ExecuteSQLsAutoSplit executes sqls and, when it meets a "transaction too large" error, tries to split the sqls into two parts and execute them again. The `queries` and `args` should be the same length.

func (*BaseConn) QuerySQL

func (conn *BaseConn) QuerySQL(tctx *tcontext.Context, query string, args ...interface{}) (*sql.Rows, error)

QuerySQL runs a query statement.

func (*BaseConn) SetRetryStrategy

func (conn *BaseConn) SetRetryStrategy(strategy retry.Strategy) error

SetRetryStrategy sets the retry strategy for baseConn.

type BaseDB

type BaseDB struct {
	DB *sql.DB

	Retry retry.Strategy

	Scope terror.ErrScope
	// contains filtered or unexported fields
}

BaseDB wraps *sql.DB and controls the BaseConn.

func GetDownstreamDB

func GetDownstreamDB(cfg *dbconfig.DBConfig) (*BaseDB, error)

func GetUpstreamDB

func GetUpstreamDB(cfg *dbconfig.DBConfig) (*BaseDB, error)

func NewBaseDB

func NewBaseDB(db *sql.DB, scope terror.ErrScope, doFuncInClose ...func()) *BaseDB

NewBaseDB returns a *BaseDB object wrapping db with the given error scope.

func NewBaseDBForTest

func NewBaseDBForTest(db *sql.DB, doFuncInClose ...func()) *BaseDB

NewBaseDBForTest returns *BaseDB object for test.

func NewMockDB

func NewMockDB(db *sql.DB, doFuncInClose ...func()) *BaseDB

NewMockDB returns *BaseDB object for mock.

func (*BaseDB) Close

func (d *BaseDB) Close() error

Close releases the *BaseDB resource.

func (*BaseDB) CloseConn

func (d *BaseDB) CloseConn(conn *BaseConn) error

CloseConn releases the BaseConn resource from BaseDB and returns the connection to the connection pool; it has the same meaning as sql.Conn.Close.

func (*BaseDB) CloseConnWithoutErr

func (d *BaseDB) CloseConnWithoutErr(conn *BaseConn)

CloseConnWithoutErr releases the BaseConn resource from BaseDB and returns the connection to the connection pool; it has the same meaning as sql.Conn.Close, and logs a warning on error.

func (*BaseDB) DoTxWithRetry

func (d *BaseDB) DoTxWithRetry(tctx *tcontext.Context, queries []string, args [][]interface{}, retryer retry.Retryer) error

func (*BaseDB) ExecContext

func (d *BaseDB) ExecContext(tctx *tcontext.Context, query string, args ...interface{}) (sql.Result, error)

TODO: retry can be done inside the BaseDB.

func (*BaseDB) ForceCloseConn

func (d *BaseDB) ForceCloseConn(conn *BaseConn) error

ForceCloseConn releases the BaseConn resource from BaseDB and closes the BaseConn completely (it is not returned to the connection pool).

func (*BaseDB) ForceCloseConnWithoutErr

func (d *BaseDB) ForceCloseConnWithoutErr(conn *BaseConn)

ForceCloseConnWithoutErr closes the connection completely (it is not returned to the conn pool), and outputs a warning log if it meets an error.

func (*BaseDB) GetBaseConn

func (d *BaseDB) GetBaseConn(ctx context.Context) (*BaseConn, error)

GetBaseConn retrieves a *BaseConn which has its own retryStrategy.

func (*BaseDB) QueryContext

func (d *BaseDB) QueryContext(tctx *tcontext.Context, query string, args ...interface{}) (*sql.Rows, error)

TODO: retry can be done inside the BaseDB.

type Cluster

TODO: export Config in https://github.com/pingcap/tidb/blob/a8fa29b56d633b1ec843e21cb89131dd4fd601db/br/pkg/mock/mock_cluster.go#L35

Cluster is a mock tidb cluster.

func NewCluster

func NewCluster() (*Cluster, error)

NewCluster creates a new mock cluster.

func (*Cluster) Start

func (mock *Cluster) Start() error

Start runs a mock cluster.

func (*Cluster) Stop

func (mock *Cluster) Stop()

Stop stops a mock cluster.

type DBProvider

type DBProvider interface {
	Apply(config ScopedDBConfig) (*BaseDB, error)
}

DBProvider provides BaseDB instances.

var DefaultDBProvider DBProvider

DefaultDBProvider is the global instance of DBProvider.

type DefaultDBProviderImpl

type DefaultDBProviderImpl struct{}

DefaultDBProviderImpl is the default DBProvider implementation.

func (*DefaultDBProviderImpl) Apply

func (d *DefaultDBProviderImpl) Apply(config ScopedDBConfig) (*BaseDB, error)

Apply will build BaseDB with DBConfig.

type LowerCaseTableNamesFlavor

type LowerCaseTableNamesFlavor uint8

LowerCaseTableNamesFlavor represents the type of db `lower_case_table_names` settings.

func FetchLowerCaseTableNamesSetting

func FetchLowerCaseTableNamesSetting(ctx context.Context, conn *BaseConn) (LowerCaseTableNamesFlavor, error)

FetchLowerCaseTableNamesSetting returns the `lower_case_table_names` setting of the target db.

type ScopedDBConfig

type ScopedDBConfig struct {
	*dbconfig.DBConfig
	Scope terror.ErrScope
}

func DownstreamDBConfig

func DownstreamDBConfig(cfg *dbconfig.DBConfig) ScopedDBConfig

func UpstreamDBConfig

func UpstreamDBConfig(cfg *dbconfig.DBConfig) ScopedDBConfig

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL