Documentation ¶
Index ¶
- Constants
- func IsCheckpointTable(name string) bool
- func IsCheckpointsDBExists(ctx context.Context, cfg *config.Config) (bool, error)
- func Transact(ctx context.Context, purpose string, s Session, logger log.Logger, ...) error
- type CheckpointStatus
- type CheckpointsDB
- type ChunkCheckpoint
- type ChunkCheckpointKey
- type ChunkCheckpointMerger
- type DestroyedTableCheckpoint
- type EngineCheckpoint
- type FileCheckpointsDB
- func (cpdb *FileCheckpointsDB) Close() error
- func (cpdb *FileCheckpointsDB) DestroyErrorCheckpoint(_ context.Context, targetTableName string) ([]DestroyedTableCheckpoint, error)
- func (cpdb *FileCheckpointsDB) DumpChunks(context.Context, io.Writer) error
- func (cpdb *FileCheckpointsDB) DumpEngines(context.Context, io.Writer) error
- func (cpdb *FileCheckpointsDB) DumpTables(context.Context, io.Writer) error
- func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableCheckpoint, error)
- func (cpdb *FileCheckpointsDB) GetLocalStoringTables(_ context.Context) (map[string][]int32, error)
- func (cpdb *FileCheckpointsDB) IgnoreErrorCheckpoint(_ context.Context, targetTableName string) error
- func (cpdb *FileCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
- func (cpdb *FileCheckpointsDB) InsertEngineCheckpoints(_ context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error
- func (cpdb *FileCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error
- func (cpdb *FileCheckpointsDB) RemoveCheckpoint(_ context.Context, tableName string) error
- func (cpdb *FileCheckpointsDB) TaskCheckpoint(_ context.Context) (*TaskCheckpoint, error)
- func (cpdb *FileCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)
- type GlueCheckpointsDB
- func (g GlueCheckpointsDB) Close() error
- func (g GlueCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)
- func (g GlueCheckpointsDB) DumpChunks(ctx context.Context, csv io.Writer) error
- func (g GlueCheckpointsDB) DumpEngines(ctx context.Context, csv io.Writer) error
- func (g GlueCheckpointsDB) DumpTables(ctx context.Context, csv io.Writer) error
- func (g GlueCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error)
- func (g GlueCheckpointsDB) GetLocalStoringTables(ctx context.Context) (map[string][]int32, error)
- func (g GlueCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error
- func (g GlueCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
- func (g GlueCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, ...) error
- func (g GlueCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error
- func (g GlueCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error
- func (g GlueCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
- func (g GlueCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)
- type MySQLCheckpointsDB
- func (cpdb *MySQLCheckpointsDB) Close() error
- func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)
- func (cpdb *MySQLCheckpointsDB) DumpChunks(ctx context.Context, writer io.Writer) error
- func (cpdb *MySQLCheckpointsDB) DumpEngines(ctx context.Context, writer io.Writer) error
- func (cpdb *MySQLCheckpointsDB) DumpTables(ctx context.Context, writer io.Writer) error
- func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error)
- func (cpdb *MySQLCheckpointsDB) GetLocalStoringTables(ctx context.Context) (map[string][]int32, error)
- func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error
- func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
- func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error
- func (cpdb *MySQLCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error
- func (cpdb *MySQLCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error
- func (cpdb *MySQLCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
- func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)
- type NullCheckpointsDB
- func (*NullCheckpointsDB) Close() error
- func (*NullCheckpointsDB) DestroyErrorCheckpoint(context.Context, string) ([]DestroyedTableCheckpoint, error)
- func (*NullCheckpointsDB) DumpChunks(context.Context, io.Writer) error
- func (*NullCheckpointsDB) DumpEngines(context.Context, io.Writer) error
- func (*NullCheckpointsDB) DumpTables(context.Context, io.Writer) error
- func (*NullCheckpointsDB) Get(_ context.Context, _ string) (*TableCheckpoint, error)
- func (*NullCheckpointsDB) GetLocalStoringTables(context.Context) (map[string][]int32, error)
- func (*NullCheckpointsDB) IgnoreErrorCheckpoint(context.Context, string) error
- func (*NullCheckpointsDB) Initialize(context.Context, *config.Config, map[string]*TidbDBInfo) error
- func (*NullCheckpointsDB) InsertEngineCheckpoints(_ context.Context, _ string, _ map[int32]*EngineCheckpoint) error
- func (*NullCheckpointsDB) MoveCheckpoints(context.Context, int64) error
- func (*NullCheckpointsDB) RemoveCheckpoint(context.Context, string) error
- func (*NullCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
- func (*NullCheckpointsDB) Update(map[string]*TableCheckpointDiff)
- type RebaseCheckpointMerger
- type Session
- type StatusCheckpointMerger
- type TableCheckpoint
- type TableCheckpointDiff
- type TableCheckpointMerger
- type TaskCheckpoint
- type TidbDBInfo
- type TidbTableInfo
Constants ¶
View Source
const ( // the table names to store each kind of checkpoint in the checkpoint database // remember to increase the version number in case of incompatible change. CheckpointTableNameTask = "task_v2" CheckpointTableNameTable = "table_v6" CheckpointTableNameEngine = "engine_v5" CheckpointTableNameChunk = "chunk_v5" )
View Source
const ( // shared by MySQLCheckpointsDB and GlueCheckpointsDB CreateDBTemplate = "CREATE DATABASE IF NOT EXISTS %s;" CreateTaskTableTemplate = `` /* 376-byte string literal not displayed */ CreateTableTableTemplate = `` /* 435-byte string literal not displayed */ CreateEngineTableTemplate = `` /* 338-byte string literal not displayed */ CreateChunkTableTemplate = `` /* 827-byte string literal not displayed */ InitTaskTemplate = `` /* 174-byte string literal not displayed */ InitTableTemplate = `` /* 182-byte string literal not displayed */ ReadTaskTemplate = `` /* 139-byte string literal not displayed */ ReadEngineTemplate = ` SELECT engine_id, status FROM %s.%s WHERE table_name = ? ORDER BY engine_id DESC;` ReadChunkTemplate = `` /* 268-byte string literal not displayed */ ReadTableRemainTemplate = ` SELECT status, alloc_base, table_id FROM %s.%s WHERE table_name = ?;` ReplaceEngineTemplate = ` REPLACE INTO %s.%s (table_name, engine_id, status) VALUES (?, ?, ?);` ReplaceChunkTemplate = `` /* 346-byte string literal not displayed */ UpdateChunkTemplate = `` /* 168-byte string literal not displayed */ UpdateTableRebaseTemplate = ` UPDATE %s.%s SET alloc_base = GREATEST(?, alloc_base) WHERE table_name = ?;` UpdateTableStatusTemplate = ` UPDATE %s.%s SET status = ? WHERE table_name = ?;` UpdateEngineTemplate = ` UPDATE %s.%s SET status = ? WHERE (table_name, engine_id) = (?, ?);` DeleteCheckpointRecordTemplate = "DELETE FROM %s.%s WHERE table_name = ?;" )
View Source
const WholeTableEngineID = math.MaxInt32
Variables ¶
This section is empty.
Functions ¶
func IsCheckpointTable ¶
func IsCheckpointsDBExists ¶
Types ¶
type CheckpointStatus ¶
type CheckpointStatus uint8
const ( CheckpointStatusMissing CheckpointStatus = 0 CheckpointStatusMaxInvalid CheckpointStatus = 25 CheckpointStatusLoaded CheckpointStatus = 30 CheckpointStatusAllWritten CheckpointStatus = 60 CheckpointStatusClosed CheckpointStatus = 90 CheckpointStatusImported CheckpointStatus = 120 CheckpointStatusIndexImported CheckpointStatus = 140 CheckpointStatusAlteredAutoInc CheckpointStatus = 150 CheckpointStatusChecksumSkipped CheckpointStatus = 170 CheckpointStatusChecksummed CheckpointStatus = 180 CheckpointStatusAnalyzeSkipped CheckpointStatus = 200 CheckpointStatusAnalyzed CheckpointStatus = 210 )
func (CheckpointStatus) MetricName ¶
func (status CheckpointStatus) MetricName() string
type CheckpointsDB ¶
type CheckpointsDB interface { Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error) Get(ctx context.Context, tableName string) (*TableCheckpoint, error) Close() error // InsertEngineCheckpoints initializes the checkpoints related to a table. // It assumes the entire table has not been imported before and will fill in // default values for the column permutations and checksums. InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error Update(checkpointDiffs map[string]*TableCheckpointDiff) RemoveCheckpoint(ctx context.Context, tableName string) error // MoveCheckpoints renames the checkpoint schema to include a suffix // including the taskID (e.g. `tidb_lightning_checkpoints.1234567890.bak`). MoveCheckpoints(ctx context.Context, taskID int64) error // GetLocalStoringTables returns a map containing tables have engine files stored on local disk. // currently only meaningful for local backend GetLocalStoringTables(ctx context.Context) (map[string][]int32, error) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error) DumpTables(ctx context.Context, csv io.Writer) error DumpEngines(ctx context.Context, csv io.Writer) error DumpChunks(ctx context.Context, csv io.Writer) error }
func OpenCheckpointsDB ¶
type ChunkCheckpoint ¶
type ChunkCheckpoint struct { Key ChunkCheckpointKey FileMeta mydump.SourceFileMeta ColumnPermutation []int Chunk mydump.Chunk Checksum verify.KVChecksum Timestamp int64 }
func (*ChunkCheckpoint) DeepCopy ¶
func (ccp *ChunkCheckpoint) DeepCopy() *ChunkCheckpoint
type ChunkCheckpointKey ¶
func (*ChunkCheckpointKey) String ¶
func (key *ChunkCheckpointKey) String() string
type ChunkCheckpointMerger ¶
type ChunkCheckpointMerger struct { EngineID int32 Key ChunkCheckpointKey Checksum verify.KVChecksum Pos int64 RowID int64 ColumnPermutation []int }
func (*ChunkCheckpointMerger) MergeInto ¶
func (merger *ChunkCheckpointMerger) MergeInto(cpd *TableCheckpointDiff)
type EngineCheckpoint ¶
type EngineCheckpoint struct { Status CheckpointStatus Chunks []*ChunkCheckpoint // a sorted array }
func (*EngineCheckpoint) DeepCopy ¶
func (engine *EngineCheckpoint) DeepCopy() *EngineCheckpoint
type FileCheckpointsDB ¶
type FileCheckpointsDB struct {
// contains filtered or unexported fields
}
func NewFileCheckpointsDB ¶
func NewFileCheckpointsDB(path string) *FileCheckpointsDB
func (*FileCheckpointsDB) Close ¶
func (cpdb *FileCheckpointsDB) Close() error
func (*FileCheckpointsDB) DestroyErrorCheckpoint ¶
func (cpdb *FileCheckpointsDB) DestroyErrorCheckpoint(_ context.Context, targetTableName string) ([]DestroyedTableCheckpoint, error)
func (*FileCheckpointsDB) DumpChunks ¶
func (*FileCheckpointsDB) DumpEngines ¶
func (*FileCheckpointsDB) DumpTables ¶
func (*FileCheckpointsDB) Get ¶
func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableCheckpoint, error)
func (*FileCheckpointsDB) GetLocalStoringTables ¶
func (*FileCheckpointsDB) IgnoreErrorCheckpoint ¶
func (cpdb *FileCheckpointsDB) IgnoreErrorCheckpoint(_ context.Context, targetTableName string) error
func (*FileCheckpointsDB) Initialize ¶
func (cpdb *FileCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
func (*FileCheckpointsDB) InsertEngineCheckpoints ¶
func (cpdb *FileCheckpointsDB) InsertEngineCheckpoints(_ context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error
func (*FileCheckpointsDB) MoveCheckpoints ¶
func (cpdb *FileCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error
func (*FileCheckpointsDB) RemoveCheckpoint ¶
func (cpdb *FileCheckpointsDB) RemoveCheckpoint(_ context.Context, tableName string) error
func (*FileCheckpointsDB) TaskCheckpoint ¶
func (cpdb *FileCheckpointsDB) TaskCheckpoint(_ context.Context) (*TaskCheckpoint, error)
func (*FileCheckpointsDB) Update ¶
func (cpdb *FileCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)
type GlueCheckpointsDB ¶
type GlueCheckpointsDB struct {
// contains filtered or unexported fields
}
GlueCheckpointsDB is almost same with MySQLCheckpointsDB, but it uses TiDB's internal data structure which requires a lot to keep same with database/sql. TODO: Encapsulate Begin/Commit/Rollback txn, form SQL with args and query/iter/scan TiDB's RecordSet into a interface to reuse MySQLCheckpointsDB.
func NewGlueCheckpointsDB ¶
func (GlueCheckpointsDB) Close ¶
func (g GlueCheckpointsDB) Close() error
func (GlueCheckpointsDB) DestroyErrorCheckpoint ¶
func (g GlueCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)
func (GlueCheckpointsDB) DumpChunks ¶
func (GlueCheckpointsDB) DumpEngines ¶
func (GlueCheckpointsDB) DumpTables ¶
func (GlueCheckpointsDB) Get ¶
func (g GlueCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error)
func (GlueCheckpointsDB) GetLocalStoringTables ¶
func (GlueCheckpointsDB) IgnoreErrorCheckpoint ¶
func (g GlueCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error
func (GlueCheckpointsDB) Initialize ¶
func (g GlueCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
func (GlueCheckpointsDB) InsertEngineCheckpoints ¶
func (g GlueCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpointMap map[int32]*EngineCheckpoint) error
func (GlueCheckpointsDB) MoveCheckpoints ¶
func (g GlueCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error
func (GlueCheckpointsDB) RemoveCheckpoint ¶
func (g GlueCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error
func (GlueCheckpointsDB) TaskCheckpoint ¶
func (g GlueCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
func (GlueCheckpointsDB) Update ¶
func (g GlueCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)
type MySQLCheckpointsDB ¶
type MySQLCheckpointsDB struct {
// contains filtered or unexported fields
}
func NewMySQLCheckpointsDB ¶
func (*MySQLCheckpointsDB) Close ¶
func (cpdb *MySQLCheckpointsDB) Close() error
func (*MySQLCheckpointsDB) DestroyErrorCheckpoint ¶
func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)
func (*MySQLCheckpointsDB) DumpChunks ¶
func (*MySQLCheckpointsDB) DumpEngines ¶
func (*MySQLCheckpointsDB) DumpTables ¶
func (*MySQLCheckpointsDB) Get ¶
func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error)
func (*MySQLCheckpointsDB) GetLocalStoringTables ¶
func (*MySQLCheckpointsDB) IgnoreErrorCheckpoint ¶
func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error
func (*MySQLCheckpointsDB) Initialize ¶
func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
func (*MySQLCheckpointsDB) InsertEngineCheckpoints ¶
func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error
func (*MySQLCheckpointsDB) MoveCheckpoints ¶
func (cpdb *MySQLCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error
func (*MySQLCheckpointsDB) RemoveCheckpoint ¶
func (cpdb *MySQLCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error
func (*MySQLCheckpointsDB) TaskCheckpoint ¶
func (cpdb *MySQLCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
func (*MySQLCheckpointsDB) Update ¶
func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)
type NullCheckpointsDB ¶
type NullCheckpointsDB struct{}
NullCheckpointsDB is a checkpoints database with no checkpoints.
func NewNullCheckpointsDB ¶
func NewNullCheckpointsDB() *NullCheckpointsDB
func (*NullCheckpointsDB) Close ¶
func (*NullCheckpointsDB) Close() error
func (*NullCheckpointsDB) DestroyErrorCheckpoint ¶
func (*NullCheckpointsDB) DestroyErrorCheckpoint(context.Context, string) ([]DestroyedTableCheckpoint, error)
func (*NullCheckpointsDB) DumpChunks ¶
func (*NullCheckpointsDB) DumpEngines ¶
func (*NullCheckpointsDB) DumpTables ¶
func (*NullCheckpointsDB) Get ¶
func (*NullCheckpointsDB) Get(_ context.Context, _ string) (*TableCheckpoint, error)
func (*NullCheckpointsDB) GetLocalStoringTables ¶
func (*NullCheckpointsDB) IgnoreErrorCheckpoint ¶
func (*NullCheckpointsDB) IgnoreErrorCheckpoint(context.Context, string) error
func (*NullCheckpointsDB) Initialize ¶
func (*NullCheckpointsDB) Initialize(context.Context, *config.Config, map[string]*TidbDBInfo) error
func (*NullCheckpointsDB) InsertEngineCheckpoints ¶
func (*NullCheckpointsDB) InsertEngineCheckpoints(_ context.Context, _ string, _ map[int32]*EngineCheckpoint) error
func (*NullCheckpointsDB) MoveCheckpoints ¶
func (*NullCheckpointsDB) MoveCheckpoints(context.Context, int64) error
func (*NullCheckpointsDB) RemoveCheckpoint ¶
func (*NullCheckpointsDB) RemoveCheckpoint(context.Context, string) error
func (*NullCheckpointsDB) TaskCheckpoint ¶
func (*NullCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
func (*NullCheckpointsDB) Update ¶
func (*NullCheckpointsDB) Update(map[string]*TableCheckpointDiff)
type RebaseCheckpointMerger ¶
type RebaseCheckpointMerger struct {
AllocBase int64
}
func (*RebaseCheckpointMerger) MergeInto ¶
func (merger *RebaseCheckpointMerger) MergeInto(cpd *TableCheckpointDiff)
type Session ¶
type Session interface { Close() Execute(context.Context, string) ([]sqlexec.RecordSet, error) CommitTxn(context.Context) error RollbackTxn(context.Context) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields []*ast.ResultField, err error) ExecutePreparedStmt(ctx context.Context, stmtID uint32, param []types.Datum) (sqlexec.RecordSet, error) DropPreparedStmt(stmtID uint32) error }
type StatusCheckpointMerger ¶
type StatusCheckpointMerger struct { EngineID int32 // WholeTableEngineID == apply to whole table. Status CheckpointStatus }
func (*StatusCheckpointMerger) MergeInto ¶
func (merger *StatusCheckpointMerger) MergeInto(cpd *TableCheckpointDiff)
func (*StatusCheckpointMerger) SetInvalid ¶
func (merger *StatusCheckpointMerger) SetInvalid()
type TableCheckpoint ¶
type TableCheckpoint struct { Status CheckpointStatus AllocBase int64 Engines map[int32]*EngineCheckpoint TableID int64 }
func (*TableCheckpoint) Apply ¶
func (cp *TableCheckpoint) Apply(cpd *TableCheckpointDiff)
Apply the diff to the existing chunk and engine checkpoints in `cp`.
func (*TableCheckpoint) CountChunks ¶
func (cp *TableCheckpoint) CountChunks() int
func (*TableCheckpoint) DeepCopy ¶
func (cp *TableCheckpoint) DeepCopy() *TableCheckpoint
type TableCheckpointDiff ¶
type TableCheckpointDiff struct {
// contains filtered or unexported fields
}
func NewTableCheckpointDiff ¶
func NewTableCheckpointDiff() *TableCheckpointDiff
func (*TableCheckpointDiff) String ¶
func (cpd *TableCheckpointDiff) String() string
type TableCheckpointMerger ¶
type TableCheckpointMerger interface { // MergeInto the table checkpoint diff from a status update or chunk update. // If there are multiple updates to the same table, only the last one will // take effect. Therefore, the caller must ensure events for the same table // are properly ordered by the global time (an old event must be merged // before the new one). MergeInto(cpd *TableCheckpointDiff) }
type TaskCheckpoint ¶
type TidbDBInfo ¶
type TidbDBInfo struct { Name string Tables map[string]*TidbTableInfo }
Click to show internal directories.
Click to hide internal directories.