checkpoints

package
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 3, 2021 License: Apache-2.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// the table names to store each kind of checkpoint in the checkpoint database
	// remember to increase the version number in case of incompatible change.
	CheckpointTableNameTask   = "task_v2"
	CheckpointTableNameTable  = "table_v6"
	CheckpointTableNameEngine = "engine_v5"
	CheckpointTableNameChunk  = "chunk_v5"
)
View Source
const (
	// shared by MySQLCheckpointsDB and GlueCheckpointsDB
	CreateDBTemplate        = "CREATE DATABASE IF NOT EXISTS %s;"
	CreateTaskTableTemplate = `` /* 376-byte string literal not displayed */

	CreateTableTableTemplate = `` /* 435-byte string literal not displayed */

	CreateEngineTableTemplate = `` /* 338-byte string literal not displayed */

	CreateChunkTableTemplate = `` /* 827-byte string literal not displayed */

	InitTaskTemplate = `` /* 174-byte string literal not displayed */

	InitTableTemplate = `` /* 182-byte string literal not displayed */

	ReadTaskTemplate = `` /* 139-byte string literal not displayed */

	ReadEngineTemplate = `
		SELECT engine_id, status FROM %s.%s WHERE table_name = ? ORDER BY engine_id DESC;`
	ReadChunkTemplate = `` /* 268-byte string literal not displayed */

	ReadTableRemainTemplate = `
		SELECT status, alloc_base, table_id FROM %s.%s WHERE table_name = ?;`
	ReplaceEngineTemplate = `
		REPLACE INTO %s.%s (table_name, engine_id, status) VALUES (?, ?, ?);`
	ReplaceChunkTemplate = `` /* 346-byte string literal not displayed */

	UpdateChunkTemplate = `` /* 168-byte string literal not displayed */

	UpdateTableRebaseTemplate = `
		UPDATE %s.%s SET alloc_base = GREATEST(?, alloc_base) WHERE table_name = ?;`
	UpdateTableStatusTemplate = `
		UPDATE %s.%s SET status = ? WHERE table_name = ?;`
	UpdateEngineTemplate = `
		UPDATE %s.%s SET status = ? WHERE (table_name, engine_id) = (?, ?);`
	DeleteCheckpointRecordTemplate = "DELETE FROM %s.%s WHERE table_name = ?;"
)
View Source
const WholeTableEngineID = math.MaxInt32

Variables

This section is empty.

Functions

func IsCheckpointTable

func IsCheckpointTable(name string) bool

func IsCheckpointsDBExists

func IsCheckpointsDBExists(ctx context.Context, cfg *config.Config) (bool, error)

func Transact

func Transact(ctx context.Context, purpose string, s Session, logger log.Logger, action func(context.Context, Session) error) error

Types

type CheckpointStatus

type CheckpointStatus uint8
const (
	CheckpointStatusMissing         CheckpointStatus = 0
	CheckpointStatusMaxInvalid      CheckpointStatus = 25
	CheckpointStatusLoaded          CheckpointStatus = 30
	CheckpointStatusAllWritten      CheckpointStatus = 60
	CheckpointStatusClosed          CheckpointStatus = 90
	CheckpointStatusImported        CheckpointStatus = 120
	CheckpointStatusIndexImported   CheckpointStatus = 140
	CheckpointStatusAlteredAutoInc  CheckpointStatus = 150
	CheckpointStatusChecksumSkipped CheckpointStatus = 170
	CheckpointStatusChecksummed     CheckpointStatus = 180
	CheckpointStatusAnalyzeSkipped  CheckpointStatus = 200
	CheckpointStatusAnalyzed        CheckpointStatus = 210
)

func (CheckpointStatus) MetricName

func (status CheckpointStatus) MetricName() string

type ChunkCheckpoint

type ChunkCheckpoint struct {
	Key               ChunkCheckpointKey
	FileMeta          mydump.SourceFileMeta
	ColumnPermutation []int
	Chunk             mydump.Chunk
	Checksum          verify.KVChecksum
	Timestamp         int64
}

func (*ChunkCheckpoint) DeepCopy

func (ccp *ChunkCheckpoint) DeepCopy() *ChunkCheckpoint

type ChunkCheckpointKey

type ChunkCheckpointKey struct {
	Path   string
	Offset int64
}

func (*ChunkCheckpointKey) String

func (key *ChunkCheckpointKey) String() string

type ChunkCheckpointMerger

type ChunkCheckpointMerger struct {
	EngineID          int32
	Key               ChunkCheckpointKey
	Checksum          verify.KVChecksum
	Pos               int64
	RowID             int64
	ColumnPermutation []int
}

func (*ChunkCheckpointMerger) MergeInto

func (merger *ChunkCheckpointMerger) MergeInto(cpd *TableCheckpointDiff)

type DB

type DB interface {
	Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
	TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
	Get(ctx context.Context, tableName string) (*TableCheckpoint, error)
	Close() error
	// InsertEngineCheckpoints initializes the checkpoints related to a table.
	// It assumes the entire table has not been imported before and will fill in
	// default values for the column permutations and checksums.
	InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error
	Update(checkpointDiffs map[string]*TableCheckpointDiff)

	RemoveCheckpoint(ctx context.Context, tableName string) error
	// MoveCheckpoints renames the checkpoint schema to include a suffix
	// including the taskID (e.g. `tidb_lightning_checkpoints.1234567890.bak`).
	MoveCheckpoints(ctx context.Context, taskID int64) error
	// GetLocalStoringTables returns a map containing tables have engine files stored on local disk.
	// currently only meaningful for local backend
	GetLocalStoringTables(ctx context.Context) (map[string][]int32, error)
	IgnoreErrorCheckpoint(ctx context.Context, tableName string) error
	DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)
	DumpTables(ctx context.Context, csv io.Writer) error
	DumpEngines(ctx context.Context, csv io.Writer) error
	DumpChunks(ctx context.Context, csv io.Writer) error
}

func OpenCheckpointsDB

func OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (DB, error)

type DestroyedTableCheckpoint

type DestroyedTableCheckpoint struct {
	TableName   string
	MinEngineID int32
	MaxEngineID int32
}

type EngineCheckpoint

type EngineCheckpoint struct {
	Status CheckpointStatus
	Chunks []*ChunkCheckpoint // a sorted array
}

func (*EngineCheckpoint) DeepCopy

func (engine *EngineCheckpoint) DeepCopy() *EngineCheckpoint

type FileCheckpointsDB

type FileCheckpointsDB struct {
	// contains filtered or unexported fields
}

func NewFileCheckpointsDB

func NewFileCheckpointsDB(path string) *FileCheckpointsDB

func (*FileCheckpointsDB) Close

func (cpdb *FileCheckpointsDB) Close() error

func (*FileCheckpointsDB) DestroyErrorCheckpoint

func (cpdb *FileCheckpointsDB) DestroyErrorCheckpoint(_ context.Context, targetTableName string) ([]DestroyedTableCheckpoint, error)

func (*FileCheckpointsDB) DumpChunks

func (cpdb *FileCheckpointsDB) DumpChunks(context.Context, io.Writer) error

func (*FileCheckpointsDB) DumpEngines

func (cpdb *FileCheckpointsDB) DumpEngines(context.Context, io.Writer) error

func (*FileCheckpointsDB) DumpTables

func (cpdb *FileCheckpointsDB) DumpTables(context.Context, io.Writer) error

func (*FileCheckpointsDB) Get

func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableCheckpoint, error)

func (*FileCheckpointsDB) GetLocalStoringTables

func (cpdb *FileCheckpointsDB) GetLocalStoringTables(_ context.Context) (map[string][]int32, error)

func (*FileCheckpointsDB) IgnoreErrorCheckpoint

func (cpdb *FileCheckpointsDB) IgnoreErrorCheckpoint(_ context.Context, targetTableName string) error

func (*FileCheckpointsDB) Initialize

func (cpdb *FileCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error

func (*FileCheckpointsDB) InsertEngineCheckpoints

func (cpdb *FileCheckpointsDB) InsertEngineCheckpoints(_ context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error

func (*FileCheckpointsDB) MoveCheckpoints

func (cpdb *FileCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error

func (*FileCheckpointsDB) RemoveCheckpoint

func (cpdb *FileCheckpointsDB) RemoveCheckpoint(_ context.Context, tableName string) error

func (*FileCheckpointsDB) TaskCheckpoint

func (cpdb *FileCheckpointsDB) TaskCheckpoint(_ context.Context) (*TaskCheckpoint, error)

func (*FileCheckpointsDB) Update

func (cpdb *FileCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)

type GlueCheckpointsDB

type GlueCheckpointsDB struct {
	// contains filtered or unexported fields
}

GlueCheckpointsDB is almost same with MySQLCheckpointsDB, but it uses TiDB's internal data structure which requires a lot to keep same with database/sql. TODO: Encapsulate Begin/Commit/Rollback txn, form SQL with args and query/iter/scan TiDB's RecordSet into a interface to reuse MySQLCheckpointsDB.

func NewGlueCheckpointsDB

func NewGlueCheckpointsDB(ctx context.Context, se Session, f func() (Session, error), schemaName string) (*GlueCheckpointsDB, error)

func (GlueCheckpointsDB) Close

func (g GlueCheckpointsDB) Close() error

func (GlueCheckpointsDB) DestroyErrorCheckpoint

func (g GlueCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)

func (GlueCheckpointsDB) DumpChunks

func (g GlueCheckpointsDB) DumpChunks(ctx context.Context, csv io.Writer) error

func (GlueCheckpointsDB) DumpEngines

func (g GlueCheckpointsDB) DumpEngines(ctx context.Context, csv io.Writer) error

func (GlueCheckpointsDB) DumpTables

func (g GlueCheckpointsDB) DumpTables(ctx context.Context, csv io.Writer) error

func (GlueCheckpointsDB) Get

func (g GlueCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error)

func (GlueCheckpointsDB) GetLocalStoringTables

func (g GlueCheckpointsDB) GetLocalStoringTables(ctx context.Context) (map[string][]int32, error)

func (GlueCheckpointsDB) IgnoreErrorCheckpoint

func (g GlueCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error

func (GlueCheckpointsDB) Initialize

func (g GlueCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error

func (GlueCheckpointsDB) InsertEngineCheckpoints

func (g GlueCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpointMap map[int32]*EngineCheckpoint) error

func (GlueCheckpointsDB) MoveCheckpoints

func (g GlueCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error

func (GlueCheckpointsDB) RemoveCheckpoint

func (g GlueCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error

func (GlueCheckpointsDB) TaskCheckpoint

func (g GlueCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)

func (GlueCheckpointsDB) Update

func (g GlueCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)

type MySQLCheckpointsDB

type MySQLCheckpointsDB struct {
	// contains filtered or unexported fields
}

func NewMySQLCheckpointsDB

func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) (*MySQLCheckpointsDB, error)

func (*MySQLCheckpointsDB) Close

func (cpdb *MySQLCheckpointsDB) Close() error

func (*MySQLCheckpointsDB) DestroyErrorCheckpoint

func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)

func (*MySQLCheckpointsDB) DumpChunks

func (cpdb *MySQLCheckpointsDB) DumpChunks(ctx context.Context, writer io.Writer) error

func (*MySQLCheckpointsDB) DumpEngines

func (cpdb *MySQLCheckpointsDB) DumpEngines(ctx context.Context, writer io.Writer) error

func (*MySQLCheckpointsDB) DumpTables

func (cpdb *MySQLCheckpointsDB) DumpTables(ctx context.Context, writer io.Writer) error

func (*MySQLCheckpointsDB) Get

func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error)

func (*MySQLCheckpointsDB) GetLocalStoringTables

func (cpdb *MySQLCheckpointsDB) GetLocalStoringTables(ctx context.Context) (map[string][]int32, error)

func (*MySQLCheckpointsDB) IgnoreErrorCheckpoint

func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error

func (*MySQLCheckpointsDB) Initialize

func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error

func (*MySQLCheckpointsDB) InsertEngineCheckpoints

func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error

func (*MySQLCheckpointsDB) MoveCheckpoints

func (cpdb *MySQLCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error

func (*MySQLCheckpointsDB) RemoveCheckpoint

func (cpdb *MySQLCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error

func (*MySQLCheckpointsDB) TaskCheckpoint

func (cpdb *MySQLCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)

func (*MySQLCheckpointsDB) Update

func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)

type NullCheckpointsDB

type NullCheckpointsDB struct{}

NullCheckpointsDB is a checkpoints database with no checkpoints.

func NewNullCheckpointsDB

func NewNullCheckpointsDB() *NullCheckpointsDB

func (*NullCheckpointsDB) Close

func (*NullCheckpointsDB) Close() error

func (*NullCheckpointsDB) DestroyErrorCheckpoint

func (*NullCheckpointsDB) DestroyErrorCheckpoint(context.Context, string) ([]DestroyedTableCheckpoint, error)

func (*NullCheckpointsDB) DumpChunks

func (*NullCheckpointsDB) DumpEngines

func (*NullCheckpointsDB) DumpTables

func (*NullCheckpointsDB) Get

func (*NullCheckpointsDB) GetLocalStoringTables

func (*NullCheckpointsDB) GetLocalStoringTables(context.Context) (map[string][]int32, error)

func (*NullCheckpointsDB) IgnoreErrorCheckpoint

func (*NullCheckpointsDB) IgnoreErrorCheckpoint(context.Context, string) error

func (*NullCheckpointsDB) Initialize

func (*NullCheckpointsDB) InsertEngineCheckpoints

func (*NullCheckpointsDB) InsertEngineCheckpoints(_ context.Context, _ string, _ map[int32]*EngineCheckpoint) error

func (*NullCheckpointsDB) MoveCheckpoints

func (*NullCheckpointsDB) MoveCheckpoints(context.Context, int64) error

func (*NullCheckpointsDB) RemoveCheckpoint

func (*NullCheckpointsDB) RemoveCheckpoint(context.Context, string) error

func (*NullCheckpointsDB) TaskCheckpoint

func (*NullCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)

func (*NullCheckpointsDB) Update

type RebaseCheckpointMerger

type RebaseCheckpointMerger struct {
	AllocBase int64
}

func (*RebaseCheckpointMerger) MergeInto

func (merger *RebaseCheckpointMerger) MergeInto(cpd *TableCheckpointDiff)

type Session

type Session interface {
	Close()
	Execute(context.Context, string) ([]sqlexec.RecordSet, error)
	CommitTxn(context.Context) error
	RollbackTxn(context.Context)
	PrepareStmt(sql string) (stmtID uint32, paramCount int, fields []*ast.ResultField, err error)
	ExecutePreparedStmt(ctx context.Context, stmtID uint32, param []types.Datum) (sqlexec.RecordSet, error)
	DropPreparedStmt(stmtID uint32) error
}

type StatusCheckpointMerger

type StatusCheckpointMerger struct {
	EngineID int32 // WholeTableEngineID == apply to whole table.
	Status   CheckpointStatus
}

func (*StatusCheckpointMerger) MergeInto

func (merger *StatusCheckpointMerger) MergeInto(cpd *TableCheckpointDiff)

func (*StatusCheckpointMerger) SetInvalid

func (merger *StatusCheckpointMerger) SetInvalid()

type TableCheckpoint

type TableCheckpoint struct {
	Status    CheckpointStatus
	AllocBase int64
	Engines   map[int32]*EngineCheckpoint
	TableID   int64
}

func (*TableCheckpoint) Apply

func (cp *TableCheckpoint) Apply(cpd *TableCheckpointDiff)

Apply the diff to the existing chunk and engine checkpoints in `cp`.

func (*TableCheckpoint) CountChunks

func (cp *TableCheckpoint) CountChunks() int

func (*TableCheckpoint) DeepCopy

func (cp *TableCheckpoint) DeepCopy() *TableCheckpoint

type TableCheckpointDiff

type TableCheckpointDiff struct {
	// contains filtered or unexported fields
}

func NewTableCheckpointDiff

func NewTableCheckpointDiff() *TableCheckpointDiff

func (*TableCheckpointDiff) String

func (cpd *TableCheckpointDiff) String() string

type TableCheckpointMerger

type TableCheckpointMerger interface {
	// MergeInto the table checkpoint diff from a status update or chunk update.
	// If there are multiple updates to the same table, only the last one will
	// take effect. Therefore, the caller must ensure events for the same table
	// are properly ordered by the global time (an old event must be merged
	// before the new one).
	MergeInto(cpd *TableCheckpointDiff)
}

type TaskCheckpoint

type TaskCheckpoint struct {
	TaskID       int64
	SourceDir    string
	Backend      string
	ImporterAddr string
	TiDBHost     string
	TiDBPort     int
	PdAddr       string
	SortedKVDir  string
	LightningVer string
}

type TidbDBInfo

type TidbDBInfo struct {
	Name   string
	Tables map[string]*TidbTableInfo
}

type TidbTableInfo

type TidbTableInfo struct {
	ID   int64
	DB   string
	Name string
	Core *model.TableInfo
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL