Documentation ¶
Index ¶
- Constants
- Variables
- func AllocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int64, ...) (autoIDBase, autoIDMax int64, err error)
- func ConnectMySQL(cfg *mysql.Config) (*sql.DB, error)
- func EncodeIntRowID(rowID int64) []byte
- func EscapeIdentifier(identifier string) string
- func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo
- func GetJSON(ctx context.Context, client *http.Client, url string, v interface{}) error
- func InterpolateMySQLString(s string) string
- func IsContextCanceledError(err error) bool
- func IsDirExists(name string) bool
- func IsEmptyDir(name string) bool
- func IsRetryableError(err error) bool
- func KillMySelf() error
- func NormalizeError(err error) error
- func NormalizeOrWrapErr(rfcErr *errors.Error, err error, args ...interface{}) error
- func RebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, dbID int64, ...) error
- func Retry(purpose string, parentLogger log.Logger, action func() error) error
- func SameDisk(dir1 string, dir2 string) (bool, error)
- func SchemaExists(ctx context.Context, db utils.QueryExecutor, schema string) (bool, error)
- func StringSliceEqual(a, b []string) bool
- func TableExists(ctx context.Context, db utils.QueryExecutor, schema, table string) (bool, error)
- func TableHasAutoID(info *model.TableInfo) bool
- func TableHasAutoRowID(info *model.TableInfo) bool
- func UniqueTable(schema string, table string) string
- func WriteMySQLIdentifier(builder *strings.Builder, identifier string)
- type ConnPool
- type GRPCConns
- type KvPair
- type MySQLConnectParam
- type OnceError
- type Pauser
- type SQLWithRetry
- func (t SQLWithRetry) Exec(ctx context.Context, purpose string, query string, args ...interface{}) error
- func (t SQLWithRetry) QueryRow(ctx context.Context, purpose string, query string, dest ...interface{}) error
- func (t SQLWithRetry) QueryStringRows(ctx context.Context, purpose string, query string) ([][]string, error)
- func (t SQLWithRetry) Transact(ctx context.Context, purpose string, ...) error
- type StorageSize
- type TLS
- func (tc *TLS) GetJSON(ctx context.Context, path string, v interface{}) error
- func (tc *TLS) TLSConfig() *tls.Config
- func (tc *TLS) ToGRPCDialOption() grpc.DialOption
- func (tc *TLS) ToPDSecurityOption() pd.SecurityOption
- func (tc *TLS) ToTiKVSecurityConfig() config.Security
- func (tc *TLS) WithHost(host string) *TLS
- func (tc *TLS) WrapListener(l net.Listener) net.Listener
Constants ¶
const (
// IndexEngineID is the engine ID for index engine.
IndexEngineID = -1
)
Variables ¶
var ( ErrUnknown = errors.Normalize("unknown error", errors.RFCCodeText("Lightning:Common:ErrUnknown")) ErrInvalidArgument = errors.Normalize("invalid argument", errors.RFCCodeText("Lightning:Common:ErrInvalidArgument")) ErrVersionMismatch = errors.Normalize("version mismatch", errors.RFCCodeText("Lightning:Common:ErrVersionMismatch")) ErrReadConfigFile = errors.Normalize("cannot read config file '%s'", errors.RFCCodeText("Lightning:Config:ErrReadConfigFile")) ErrParseConfigFile = errors.Normalize("cannot parse config file '%s'", errors.RFCCodeText("Lightning:Config:ErrParseConfigFile")) ErrInvalidConfig = errors.Normalize("invalid config", errors.RFCCodeText("Lightning:Config:ErrInvalidConfig")) ErrInvalidTLSConfig = errors.Normalize("invalid tls config", errors.RFCCodeText("Lightning:Config:ErrInvalidTLSConfig")) ErrInvalidSortedKVDir = errors.Normalize("invalid sorted-kv-dir '%s' for local backend, please change the config or delete the path", errors.RFCCodeText("Lightning:Config:ErrInvalidSortedKVDir")) ErrStorageUnknown = errors.Normalize("unknown storage error", errors.RFCCodeText("Lightning:Storage:ErrStorageUnknown")) ErrInvalidPermission = errors.Normalize("invalid permission", errors.RFCCodeText("Lightning:Storage:ErrInvalidPermission")) ErrInvalidStorageConfig = errors.Normalize("invalid data-source-dir", errors.RFCCodeText("Lightning:Storage:ErrInvalidStorageConfig")) ErrEmptySourceDir = errors.Normalize("data-source-dir '%s' doesn't exist or contains no files", errors.RFCCodeText("Lightning:Storage:ErrEmptySourceDir")) ErrTableRoute = errors.Normalize("table route error", errors.RFCCodeText("Lightning:Loader:ErrTableRoute")) ErrInvalidSchemaFile = errors.Normalize("invalid schema file", errors.RFCCodeText("Lightning:Loader:ErrInvalidSchemaFile")) ErrTooManySourceFiles = errors.Normalize("too many source files", errors.RFCCodeText("Lightning:Loader:ErrTooManySourceFiles")) ErrSystemRequirementNotMet = errors.Normalize("system requirement not met", 
errors.RFCCodeText("Lightning:PreCheck:ErrSystemRequirementNotMet")) ErrCheckpointSchemaConflict = errors.Normalize("checkpoint schema conflict", errors.RFCCodeText("Lightning:PreCheck:ErrCheckpointSchemaConflict")) ErrPreCheckFailed = errors.Normalize("tidb-lightning pre-check failed: %s", errors.RFCCodeText("Lightning:PreCheck:ErrPreCheckFailed")) ErrCheckClusterRegion = errors.Normalize("check tikv cluster region error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckClusterRegion")) ErrCheckLocalResource = errors.Normalize("check local storage resource error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckLocalResource")) ErrCheckTableEmpty = errors.Normalize("check table empty error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckTableEmpty")) ErrCheckCSVHeader = errors.Normalize("check csv header error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCSVHeader")) ErrCheckDataSource = errors.Normalize("check data source error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckDataSource")) ErrCheckCDCPiTR = errors.Normalize("check TiCDC/PiTR task error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCDCPiTR")) ErrOpenCheckpoint = errors.Normalize("open checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrOpenCheckpoint")) ErrReadCheckpoint = errors.Normalize("read checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrReadCheckpoint")) ErrUpdateCheckpoint = errors.Normalize("update checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrUpdateCheckpoint")) ErrUnknownCheckpointDriver = errors.Normalize("unknown checkpoint driver '%s'", errors.RFCCodeText("Lightning:Checkpoint:ErrUnknownCheckpointDriver")) ErrInvalidCheckpoint = errors.Normalize("invalid checkpoint", errors.RFCCodeText("Lightning:Checkpoint:ErrInvalidCheckpoint")) ErrCheckpointNotFound = errors.Normalize("checkpoint not found", errors.RFCCodeText("Lightning:Checkpoint:ErrCheckpointNotFound")) ErrInitCheckpoint = errors.Normalize("init checkpoint error", 
errors.RFCCodeText("Lightning:Checkpoint:ErrInitCheckpoint")) ErrCleanCheckpoint = errors.Normalize("clean checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrCleanCheckpoint")) ErrMetaMgrUnknown = errors.Normalize("unknown error occur on meta manager", errors.RFCCodeText("Lightning:MetaMgr:ErrMetaMgrUnknown")) ErrDBConnect = errors.Normalize("failed to connect database", errors.RFCCodeText("Lightning:DB:ErrDBConnect")) ErrInitErrManager = errors.Normalize("init error manager error", errors.RFCCodeText("Lightning:DB:ErrInitErrManager")) ErrInitMetaManager = errors.Normalize("init meta manager error", errors.RFCCodeText("Lightning:DB:ErrInitMetaManager")) ErrUpdatePD = errors.Normalize("update pd error", errors.RFCCodeText("Lightning:PD:ErrUpdatePD")) ErrCreatePDClient = errors.Normalize("create pd client error", errors.RFCCodeText("Lightning:PD:ErrCreatePDClient")) ErrPauseGC = errors.Normalize("pause gc error", errors.RFCCodeText("Lightning:PD:ErrPauseGC")) ErrCheckKVVersion = errors.Normalize("check tikv version error", errors.RFCCodeText("Lightning:KV:ErrCheckKVVersion")) ErrCreateKVClient = errors.Normalize("create kv client error", errors.RFCCodeText("Lightning:KV:ErrCreateKVClient")) ErrCheckMultiIngest = errors.Normalize("check multi-ingest support error", errors.RFCCodeText("Lightning:KV:ErrCheckMultiIngest")) ErrKVEpochNotMatch = errors.Normalize("epoch not match", errors.RFCCodeText("Lightning:KV:EpochNotMatch")) ErrKVNotLeader = errors.Normalize("not leader", errors.RFCCodeText("Lightning:KV:NotLeader")) ErrKVServerIsBusy = errors.Normalize("server is busy", errors.RFCCodeText("Lightning:KV:ServerIsBusy")) ErrKVRegionNotFound = errors.Normalize("region not found", errors.RFCCodeText("Lightning:KV:RegionNotFound")) ErrKVReadIndexNotReady = errors.Normalize("read index not ready", errors.RFCCodeText("Lightning:KV:ReadIndexNotReady")) ErrKVIngestFailed = errors.Normalize("ingest tikv failed", errors.RFCCodeText("Lightning:KV:ErrKVIngestFailed")) 
ErrKVRaftProposalDropped = errors.Normalize("raft proposal dropped", errors.RFCCodeText("Lightning:KV:ErrKVRaftProposalDropped")) ErrUnknownBackend = errors.Normalize("unknown backend %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownBackend")) ErrCheckLocalFile = errors.Normalize("cannot find local file for table: %s engineDir: %s", errors.RFCCodeText("Lightning:Restore:ErrCheckLocalFile")) ErrOpenDuplicateDB = errors.Normalize("open duplicate db error", errors.RFCCodeText("Lightning:Restore:ErrOpenDuplicateDB")) ErrSchemaNotExists = errors.Normalize("table `%s`.`%s` schema not found", errors.RFCCodeText("Lightning:Restore:ErrSchemaNotExists")) ErrInvalidSchemaStmt = errors.Normalize("invalid schema statement: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidSchemaStmt")) ErrCreateSchema = errors.Normalize("create schema failed, table: %s, stmt: %s", errors.RFCCodeText("Lightning:Restore:ErrCreateSchema")) ErrUnknownColumns = errors.Normalize("unknown columns in header (%s) for table %s", errors.RFCCodeText("Lightning:Restore:ErrUnknownColumns")) ErrChecksumMismatch = errors.Normalize("checksum mismatched remote vs local => (checksum: %d vs %d) (total_kvs: %d vs %d) (total_bytes:%d vs %d)", errors.RFCCodeText("Lighting:Restore:ErrChecksumMismatch")) ErrRestoreTable = errors.Normalize("restore table %s failed", errors.RFCCodeText("Lightning:Restore:ErrRestoreTable")) ErrEncodeKV = errors.Normalize("encode kv error in file %s at offset %d", errors.RFCCodeText("Lightning:Restore:ErrEncodeKV")) ErrAllocTableRowIDs = errors.Normalize("allocate table row id error", errors.RFCCodeText("Lightning:Restore:ErrAllocTableRowIDs")) ErrInvalidMetaStatus = errors.Normalize("invalid meta status: '%s'", errors.RFCCodeText("Lightning:Restore:ErrInvalidMetaStatus")) ErrTableIsChecksuming = errors.Normalize("table '%s' is checksuming", errors.RFCCodeText("Lightning:Restore:ErrTableIsChecksuming")) ErrResolveDuplicateRows = errors.Normalize("resolve duplicate rows error on 
table '%s'", errors.RFCCodeText("Lightning:Restore:ErrResolveDuplicateRows")) ErrFoundDuplicateKeys = errors.Normalize("found duplicate key '%s', value '%s'", errors.RFCCodeText("Lightning:Restore:ErrFoundDuplicateKey")) ErrAddIndexFailed = errors.Normalize("add index on table %s failed", errors.RFCCodeText("Lightning:Restore:ErrAddIndexFailed")) ErrDropIndexFailed = errors.Normalize("drop index %s on table %s failed", errors.RFCCodeText("Lightning:Restore:ErrDropIndexFailed")) )
error definitions
var DefaultImportVariablesTiDB = map[string]string{
"tidb_row_format_version": "1",
}
DefaultImportVariablesTiDB is used in ObtainImportantVariables to retrieve the system variables from downstream in local/importer backend. The values record the default values if missing.
var DefaultImportantVariables = map[string]string{
"max_allowed_packet": "67108864",
"div_precision_increment": "4",
"time_zone": "SYSTEM",
"lc_time_names": "en_US",
"default_week_format": "0",
"block_encryption_mode": "aes-128-ecb",
"group_concat_max_len": "1024",
}
DefaultImportantVariables is used in ObtainImportantVariables to retrieve the system variables from downstream which may affect KV encode result. The values record the default values if missing.
var EncodeIntRowIDToBuf = codec.EncodeComparableVarint
EncodeIntRowIDToBuf encodes an int64 row id to a buffer.
Functions ¶
func AllocGlobalAutoID ¶
func AllocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int64, tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error)
AllocGlobalAutoID allocates n consecutive autoIDs from TiDB.
func ConnectMySQL ¶
ConnectMySQL connects to MySQL with the dsn. If access is denied and the password is a valid base64 encoding, we will try to connect to MySQL with the base64-decoded password.
func EncodeIntRowID ¶
EncodeIntRowID encodes an int64 row id.
func EscapeIdentifier ¶
EscapeIdentifier quotes and escapes an SQL identifier.
func GetAutoRandomColumn ¶
func GetAutoRandomColumn(tblInfo *model.TableInfo) *model.ColumnInfo
GetAutoRandomColumn returns the column with auto_random, or nil if the table doesn't have one. TODO: this would be better placed in the ddl package, but that would cause an import cycle since the ddl package imports lightning.
func GetJSON ¶
GetJSON fetches a page and parses it as JSON. The parsed result will be stored into the `v`. The variable `v` must be a pointer to a type that can be unmarshalled from JSON.
Example:
client := &http.Client{}
var resp struct {
	IP string
}
if err := util.GetJSON(context.Background(), client, "http://api.ipify.org/?format=json", &resp); err != nil {
	return errors.Trace(err)
}
fmt.Println(resp.IP)
func InterpolateMySQLString ¶
InterpolateMySQLString interpolates a string into a MySQL string literal.
func IsContextCanceledError ¶
IsContextCanceledError returns whether the error is caused by context cancellation. This function should only be used when the code logic is affected by whether the error is canceling or not.
This function returns `false` (not a context-canceled error) if `err == nil`.
func IsRetryableError ¶
IsRetryableError returns whether the error is transient (e.g. network connection dropped) or irrecoverable (e.g. user pressing Ctrl+C). This function returns `false` (irrecoverable) if `err == nil`.
If the error is a multierr, returns true only if all suberrors are retryable.
func KillMySelf ¶
func KillMySelf() error
KillMySelf sends SIGINT to the current process. It is used in integration tests only.
Only works on Unix. Signaling on Windows is not supported.
func NormalizeError ¶
NormalizeError converts an arbitrary error to an *errors.Error based on the predefined errors above. If the underlying err is already an *errors.Error whose ID is prefixed by "Lightning:", the error ID is left unchanged. Otherwise, the error ID is converted to one of Lightning's predefined error IDs.
func NormalizeOrWrapErr ¶
NormalizeOrWrapErr tries to normalize err. If the returned error is ErrUnknown, wraps it with the given rfcErr.
func RebaseGlobalAutoID ¶
func RebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, dbID int64, tblInfo *model.TableInfo) error
RebaseGlobalAutoID rebases the autoID base to newBase.
func Retry ¶
Retry is shared by SQLWithRetry.perform, implementation of GlueCheckpointsDB and TiDB's glue implementation
func SchemaExists ¶
SchemaExists returns whether a schema with the specified name exists.
func StringSliceEqual ¶
StringSliceEqual checks if two string slices are equal.
func TableExists ¶
TableExists returns whether a table with the specified name exists in the target db.
func TableHasAutoID ¶
TableHasAutoID returns whether the table has an auto-generated ID.
func TableHasAutoRowID ¶
TableHasAutoRowID returns whether the table has an auto-generated row ID.
func UniqueTable ¶
UniqueTable returns a unique table name.
func WriteMySQLIdentifier ¶
WriteMySQLIdentifier writes a MySQL identifier into the string builder. The identifier is always escaped into the form "`foo`".
Types ¶
type ConnPool ¶
type ConnPool struct {
// contains filtered or unexported fields
}
ConnPool is a lazy pool of gRPC channels. When `Get` is called, it lazily allocates a new connection if the pool is not full. If the pool is full, it returns the already-allocated channels in round-robin order.
func NewConnPool ¶
func NewConnPool(capacity int, newConn func(ctx context.Context) (*grpc.ClientConn, error), logger log.Logger) *ConnPool
NewConnPool creates a new connPool by the specified conn factory function and capacity.
func (*ConnPool) TakeConns ¶
func (p *ConnPool) TakeConns() (conns []*grpc.ClientConn)
TakeConns takes all connections from the pool.
type GRPCConns ¶
type GRPCConns struct {
// contains filtered or unexported fields
}
GRPCConns is a pool of gRPC connections.
func (*GRPCConns) Close ¶
func (conns *GRPCConns) Close()
Close closes all gRPC connections in the pool.
func (*GRPCConns) GetGrpcConn ¶
func (conns *GRPCConns) GetGrpcConn(ctx context.Context, storeID uint64, tcpConcurrency int, newConn func(ctx context.Context) (*grpc.ClientConn, error)) (*grpc.ClientConn, error)
GetGrpcConn gets a gRPC connection from the pool.
type KvPair ¶
type KvPair struct { // Key is the key of the KV pair Key []byte // Val is the value of the KV pair Val []byte // RowID is the row id of the KV pair. RowID []byte }
KvPair is a pair of key and value.
type MySQLConnectParam ¶
type MySQLConnectParam struct { Host string Port int User string Password string SQLMode string MaxAllowedPacket uint64 TLSConfig *tls.Config AllowFallbackToPlaintext bool Net string Vars map[string]string }
MySQLConnectParam records the parameters needed to connect to a MySQL database.
func (*MySQLConnectParam) Connect ¶
func (param *MySQLConnectParam) Connect() (*sql.DB, error)
Connect creates a new connection to the database.
func (*MySQLConnectParam) ToDriverConfig ¶
func (param *MySQLConnectParam) ToDriverConfig() *mysql.Config
ToDriverConfig converts the MySQLConnectParam to a mysql.Config.
type OnceError ¶
type OnceError struct {
// contains filtered or unexported fields
}
OnceError is an error value which can be assigned only once.
The zero value is ready for use.
type Pauser ¶
type Pauser struct {
// contains filtered or unexported fields
}
Pauser is a type which could allow multiple goroutines to wait on demand, similar to a gate or traffic light.
type SQLWithRetry ¶
type SQLWithRetry struct { // either *sql.DB or *sql.Conn DB utils.DBExecutor Logger log.Logger HideQueryLog bool }
SQLWithRetry constructs a retryable transaction.
func (SQLWithRetry) Exec ¶
func (t SQLWithRetry) Exec(ctx context.Context, purpose string, query string, args ...interface{}) error
Exec executes a single SQL with optional retry.
func (SQLWithRetry) QueryRow ¶
func (t SQLWithRetry) QueryRow(ctx context.Context, purpose string, query string, dest ...interface{}) error
QueryRow executes a query that is expected to return at most one row.
func (SQLWithRetry) QueryStringRows ¶
func (t SQLWithRetry) QueryStringRows(ctx context.Context, purpose string, query string) ([][]string, error)
QueryStringRows executes a query that is expected to return multiple rows in which every column is a string.
type StorageSize ¶
StorageSize represents the storage's capacity and available size. Adapted from the tidb-binlog source code.
func GetStorageSize ¶
func GetStorageSize(dir string) (size StorageSize, err error)
GetStorageSize gets storage's capacity and available size
type TLS ¶
type TLS struct {
// contains filtered or unexported fields
}
TLS is a wrapper around a TLS configuration.
func NewTLS ¶
func NewTLS(caPath, certPath, keyPath, host string, caBytes, certBytes, keyBytes []byte) (*TLS, error)
NewTLS constructs a new HTTP client with TLS configured with the CA, certificate and key paths.
func NewTLSFromMockServer ¶
NewTLSFromMockServer constructs a new TLS instance from the certificates of an *httptest.Server.
func (*TLS) ToGRPCDialOption ¶
func (tc *TLS) ToGRPCDialOption() grpc.DialOption
ToGRPCDialOption constructs a gRPC dial option.
func (*TLS) ToPDSecurityOption ¶
func (tc *TLS) ToPDSecurityOption() pd.SecurityOption
ToPDSecurityOption converts the TLS configuration to a PD security option.
func (*TLS) ToTiKVSecurityConfig ¶
ToTiKVSecurityConfig converts the TLS configuration to a TiKV security config. TODO: TiKV does not support passing in content.