checkpoints

package
v1.1.0-beta.0...-a878e1f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2024 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CheckpointTableNameTask   = "task_v2"
	CheckpointTableNameTable  = "table_v9"
	CheckpointTableNameEngine = "engine_v5"
	CheckpointTableNameChunk  = "chunk_v5"
)

the table names to store each kind of checkpoint in the checkpoint database remember to increase the version number in case of incompatible change.

View Source
const (
	CreateDBTemplate        = "CREATE DATABASE IF NOT EXISTS %s;"
	CreateTaskTableTemplate = `` /* 376-byte string literal not displayed */

	CreateTableTableTemplate = `` /* 616-byte string literal not displayed */

	CreateEngineTableTemplate = `` /* 338-byte string literal not displayed */

	CreateChunkTableTemplate = `` /* 827-byte string literal not displayed */

	InitTaskTemplate = `` /* 174-byte string literal not displayed */

	InitTableTemplate = `` /* 197-byte string literal not displayed */

	ReadTaskTemplate = `` /* 139-byte string literal not displayed */

	ReadEngineTemplate = `
		SELECT engine_id, status FROM %s.%s WHERE table_name = ? ORDER BY engine_id DESC;`
	ReadChunkTemplate = `` /* 268-byte string literal not displayed */

	ReadTableRemainTemplate = `
		SELECT status, alloc_base, table_id, table_info, kv_bytes, kv_kvs, kv_checksum FROM %s.%s WHERE table_name = ?;`
	ReplaceEngineTemplate = `
		REPLACE INTO %s.%s (table_name, engine_id, status) VALUES (?, ?, ?);`
	ReplaceChunkTemplate = `` /* 346-byte string literal not displayed */

	UpdateChunkTemplate = `` /* 168-byte string literal not displayed */

	UpdateTableRebaseTemplate = `
		UPDATE %s.%s SET alloc_base = GREATEST(?, alloc_base) WHERE table_name = ?;`
	UpdateTableStatusTemplate = `
		UPDATE %s.%s SET status = ? WHERE table_name = ?;`
	UpdateTableChecksumTemplate = `UPDATE %s.%s SET kv_bytes = ?, kv_kvs = ?, kv_checksum = ? WHERE table_name = ?;`
	UpdateEngineTemplate        = `
		UPDATE %s.%s SET status = ? WHERE (table_name, engine_id) = (?, ?);`
	DeleteCheckpointRecordTemplate = "DELETE FROM %s.%s WHERE table_name = ?;"
)

some frequently used SQL statement templates. shared by MySQLCheckpointsDB and GlueCheckpointsDB

View Source
const WholeTableEngineID = math.MaxInt32

WholeTableEngineID is the engine ID used for the whole table engine.

Variables

This section is empty.

Functions

func IsCheckpointTable

func IsCheckpointTable(name string) bool

IsCheckpointTable checks if the table name is a checkpoint table.

func IsCheckpointsDBExists

func IsCheckpointsDBExists(ctx context.Context, cfg *config.Config) (bool, error)

IsCheckpointsDBExists checks if the checkpoints DB exists.

Types

type CheckpointStatus

type CheckpointStatus uint8

CheckpointStatus is the status of a checkpoint.

const (
	CheckpointStatusMissing         CheckpointStatus = 0
	CheckpointStatusMaxInvalid      CheckpointStatus = 25
	CheckpointStatusLoaded          CheckpointStatus = 30
	CheckpointStatusAllWritten      CheckpointStatus = 60
	CheckpointStatusDupDetected     CheckpointStatus = 70
	CheckpointStatusIndexDropped    CheckpointStatus = 80
	CheckpointStatusClosed          CheckpointStatus = 90
	CheckpointStatusImported        CheckpointStatus = 120
	CheckpointStatusIndexImported   CheckpointStatus = 140
	CheckpointStatusAlteredAutoInc  CheckpointStatus = 150
	CheckpointStatusChecksumSkipped CheckpointStatus = 170
	CheckpointStatusChecksummed     CheckpointStatus = 180
	CheckpointStatusIndexAdded      CheckpointStatus = 190
	CheckpointStatusAnalyzeSkipped  CheckpointStatus = 200
	CheckpointStatusAnalyzed        CheckpointStatus = 210
)

CheckpointStatus values.

func (CheckpointStatus) MetricName

func (status CheckpointStatus) MetricName() string

MetricName returns the metric name for the checkpoint status.

type ChunkCheckpoint

type ChunkCheckpoint struct {
	Key               ChunkCheckpointKey
	FileMeta          mydump.SourceFileMeta
	ColumnPermutation []int
	Chunk             mydump.Chunk
	Checksum          verify.KVChecksum
	Timestamp         int64
}

ChunkCheckpoint is the checkpoint for a chunk.

func (*ChunkCheckpoint) DeepCopy

func (ccp *ChunkCheckpoint) DeepCopy() *ChunkCheckpoint

DeepCopy returns a deep copy of the chunk checkpoint.

func (*ChunkCheckpoint) FinishedSize

func (ccp *ChunkCheckpoint) FinishedSize() int64

FinishedSize returns the size of the finished part of the chunk.

func (*ChunkCheckpoint) GetKey

func (ccp *ChunkCheckpoint) GetKey() string

GetKey returns the key of the chunk checkpoint.

func (*ChunkCheckpoint) TotalSize

func (ccp *ChunkCheckpoint) TotalSize() int64

TotalSize returns the total size of the chunk.

func (*ChunkCheckpoint) UnfinishedSize

func (ccp *ChunkCheckpoint) UnfinishedSize() int64

UnfinishedSize returns the size of the unfinished part of the chunk.

type ChunkCheckpointKey

type ChunkCheckpointKey struct {
	Path   string
	Offset int64
}

ChunkCheckpointKey is the key of a chunk checkpoint.

func (*ChunkCheckpointKey) String

func (key *ChunkCheckpointKey) String() string

String implements fmt.Stringer.

type ChunkCheckpointMerger

type ChunkCheckpointMerger struct {
	EngineID          int32
	Key               ChunkCheckpointKey
	Checksum          verify.KVChecksum
	Pos               int64
	RowID             int64
	ColumnPermutation []int
	EndOffset         int64 // For test only.
}

ChunkCheckpointMerger is the merger for chunk updates.

func (*ChunkCheckpointMerger) MergeInto

func (merger *ChunkCheckpointMerger) MergeInto(cpd *TableCheckpointDiff)

MergeInto implements TableCheckpointMerger.MergeInto.

type DB

type DB interface {
	Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
	TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
	Get(ctx context.Context, tableName string) (*TableCheckpoint, error)
	Close() error
	// InsertEngineCheckpoints initializes the checkpoints related to a table.
	// It assumes the entire table has not been imported before and will fill in
	// default values for the column permutations and checksums.
	InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error
	Update(taskCtx context.Context, checkpointDiffs map[string]*TableCheckpointDiff) error

	RemoveCheckpoint(ctx context.Context, tableName string) error
	// MoveCheckpoints renames the checkpoint schema to include a suffix
	// including the taskID (e.g. `tidb_lightning_checkpoints.1234567890.bak`).
	MoveCheckpoints(ctx context.Context, taskID int64) error
	// GetLocalStoringTables returns a map containing tables have engine files stored on local disk.
	// currently only meaningful for local backend
	GetLocalStoringTables(ctx context.Context) (map[string][]int32, error)
	IgnoreErrorCheckpoint(ctx context.Context, tableName string) error
	DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)
	DumpTables(ctx context.Context, csv io.Writer) error
	DumpEngines(ctx context.Context, csv io.Writer) error
	DumpChunks(ctx context.Context, csv io.Writer) error
}

DB is the interface for a checkpoint database.

func OpenCheckpointsDB

func OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (DB, error)

OpenCheckpointsDB opens a checkpoints DB according to the given config.

type DestroyedTableCheckpoint

type DestroyedTableCheckpoint struct {
	TableName   string
	MinEngineID int32
	MaxEngineID int32
}

DestroyedTableCheckpoint is the checkpoint for a table that has been

type EngineCheckpoint

type EngineCheckpoint struct {
	Status CheckpointStatus
	Chunks []*ChunkCheckpoint // a sorted array
}

EngineCheckpoint is the checkpoint for an engine.

func (*EngineCheckpoint) DeepCopy

func (engine *EngineCheckpoint) DeepCopy() *EngineCheckpoint

DeepCopy returns a deep copy of the engine checkpoint.

type FileCheckpointsDB

type FileCheckpointsDB struct {
	// contains filtered or unexported fields
}

FileCheckpointsDB is a file based checkpoints DB

func NewFileCheckpointsDB

func NewFileCheckpointsDB(ctx context.Context, path string) (*FileCheckpointsDB, error)

NewFileCheckpointsDB creates a new FileCheckpointsDB

func NewFileCheckpointsDBWithExstorageFileName

func NewFileCheckpointsDBWithExstorageFileName(
	ctx context.Context,
	path string,
	s storage.ExternalStorage,
	fileName string,
) (*FileCheckpointsDB, error)

NewFileCheckpointsDBWithExstorageFileName creates a new FileCheckpointsDB with external storage and file name

func (*FileCheckpointsDB) Close

func (cpdb *FileCheckpointsDB) Close() error

Close implements CheckpointsDB.Close.

func (*FileCheckpointsDB) DestroyErrorCheckpoint

func (cpdb *FileCheckpointsDB) DestroyErrorCheckpoint(_ context.Context, targetTableName string) ([]DestroyedTableCheckpoint, error)

DestroyErrorCheckpoint implements CheckpointsDB.DestroyErrorCheckpoint.

func (*FileCheckpointsDB) DumpChunks

func (cpdb *FileCheckpointsDB) DumpChunks(context.Context, io.Writer) error

DumpChunks implements CheckpointsDB.DumpChunks.

func (*FileCheckpointsDB) DumpEngines

func (cpdb *FileCheckpointsDB) DumpEngines(context.Context, io.Writer) error

DumpEngines implements CheckpointsDB.DumpEngines.

func (*FileCheckpointsDB) DumpTables

func (cpdb *FileCheckpointsDB) DumpTables(context.Context, io.Writer) error

DumpTables implements CheckpointsDB.DumpTables.

func (*FileCheckpointsDB) Get

func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableCheckpoint, error)

Get implements CheckpointsDB.Get.

func (*FileCheckpointsDB) GetLocalStoringTables

func (cpdb *FileCheckpointsDB) GetLocalStoringTables(_ context.Context) (map[string][]int32, error)

GetLocalStoringTables implements CheckpointsDB.GetLocalStoringTables.

func (*FileCheckpointsDB) IgnoreErrorCheckpoint

func (cpdb *FileCheckpointsDB) IgnoreErrorCheckpoint(_ context.Context, targetTableName string) error

IgnoreErrorCheckpoint implements CheckpointsDB.IgnoreErrorCheckpoint.

func (*FileCheckpointsDB) Initialize

func (cpdb *FileCheckpointsDB) Initialize(_ context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error

Initialize implements CheckpointsDB.Initialize.

func (*FileCheckpointsDB) InsertEngineCheckpoints

func (cpdb *FileCheckpointsDB) InsertEngineCheckpoints(_ context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error

InsertEngineCheckpoints implements CheckpointsDB.InsertEngineCheckpoints.

func (*FileCheckpointsDB) MoveCheckpoints

func (cpdb *FileCheckpointsDB) MoveCheckpoints(_ context.Context, taskID int64) error

MoveCheckpoints implements CheckpointsDB.MoveCheckpoints.

func (*FileCheckpointsDB) RemoveCheckpoint

func (cpdb *FileCheckpointsDB) RemoveCheckpoint(_ context.Context, tableName string) error

RemoveCheckpoint implements CheckpointsDB.RemoveCheckpoint.

func (*FileCheckpointsDB) TaskCheckpoint

func (cpdb *FileCheckpointsDB) TaskCheckpoint(_ context.Context) (*TaskCheckpoint, error)

TaskCheckpoint implements CheckpointsDB.TaskCheckpoint.

func (*FileCheckpointsDB) Update

func (cpdb *FileCheckpointsDB) Update(_ context.Context, checkpointDiffs map[string]*TableCheckpointDiff) error

Update implements CheckpointsDB.Update.

type MySQLCheckpointsDB

type MySQLCheckpointsDB struct {
	// contains filtered or unexported fields
}

MySQLCheckpointsDB is a checkpoints database for MySQL.

func NewMySQLCheckpointsDB

func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) (*MySQLCheckpointsDB, error)

NewMySQLCheckpointsDB creates a new MySQLCheckpointsDB.

func (*MySQLCheckpointsDB) Close

func (cpdb *MySQLCheckpointsDB) Close() error

Close implements the DB interface.

func (*MySQLCheckpointsDB) DestroyErrorCheckpoint

func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)

DestroyErrorCheckpoint implements CheckpointsDB.DestroyErrorCheckpoint.

func (*MySQLCheckpointsDB) DumpChunks

func (cpdb *MySQLCheckpointsDB) DumpChunks(ctx context.Context, writer io.Writer) error

DumpChunks implements CheckpointsDB.DumpChunks.

func (*MySQLCheckpointsDB) DumpEngines

func (cpdb *MySQLCheckpointsDB) DumpEngines(ctx context.Context, writer io.Writer) error

DumpEngines implements CheckpointsDB.DumpEngines.

func (*MySQLCheckpointsDB) DumpTables

func (cpdb *MySQLCheckpointsDB) DumpTables(ctx context.Context, writer io.Writer) error

DumpTables implements CheckpointsDB.DumpTables.

func (*MySQLCheckpointsDB) Get

func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error)

Get implements the DB interface.

func (*MySQLCheckpointsDB) GetLocalStoringTables

func (cpdb *MySQLCheckpointsDB) GetLocalStoringTables(ctx context.Context) (map[string][]int32, error)

GetLocalStoringTables implements CheckpointsDB.GetLocalStoringTables.

func (*MySQLCheckpointsDB) IgnoreErrorCheckpoint

func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error

IgnoreErrorCheckpoint implements CheckpointsDB.IgnoreErrorCheckpoint.

func (*MySQLCheckpointsDB) Initialize

func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error

Initialize implements the DB interface.

func (*MySQLCheckpointsDB) InsertEngineCheckpoints

func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error

InsertEngineCheckpoints implements the DB interface.

func (*MySQLCheckpointsDB) MoveCheckpoints

func (cpdb *MySQLCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error

MoveCheckpoints implements CheckpointsDB.MoveCheckpoints.

func (*MySQLCheckpointsDB) RemoveCheckpoint

func (cpdb *MySQLCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error

RemoveCheckpoint implements CheckpointsDB.RemoveCheckpoint.

func (*MySQLCheckpointsDB) TaskCheckpoint

func (cpdb *MySQLCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)

TaskCheckpoint implements the DB interface.

func (*MySQLCheckpointsDB) Update

func (cpdb *MySQLCheckpointsDB) Update(taskCtx context.Context, checkpointDiffs map[string]*TableCheckpointDiff) error

Update implements the DB interface.

type NullCheckpointsDB

type NullCheckpointsDB struct{}

NullCheckpointsDB is a checkpoints database with no checkpoints.

func NewNullCheckpointsDB

func NewNullCheckpointsDB() *NullCheckpointsDB

NewNullCheckpointsDB creates a new NullCheckpointsDB.

func (*NullCheckpointsDB) Close

func (*NullCheckpointsDB) Close() error

Close implements the DB interface.

func (*NullCheckpointsDB) DestroyErrorCheckpoint

func (*NullCheckpointsDB) DestroyErrorCheckpoint(context.Context, string) ([]DestroyedTableCheckpoint, error)

DestroyErrorCheckpoint implements CheckpointsDB.DestroyErrorCheckpoint.

func (*NullCheckpointsDB) DumpChunks

DumpChunks implements CheckpointsDB.DumpChunks.

func (*NullCheckpointsDB) DumpEngines

DumpEngines implements CheckpointsDB.DumpEngines.

func (*NullCheckpointsDB) DumpTables

DumpTables implements CheckpointsDB.DumpTables.

func (*NullCheckpointsDB) Get

Get implements the DB interface.

func (*NullCheckpointsDB) GetLocalStoringTables

func (*NullCheckpointsDB) GetLocalStoringTables(context.Context) (map[string][]int32, error)

GetLocalStoringTables implements CheckpointsDB.GetLocalStoringTables.

func (*NullCheckpointsDB) IgnoreErrorCheckpoint

func (*NullCheckpointsDB) IgnoreErrorCheckpoint(context.Context, string) error

IgnoreErrorCheckpoint implements CheckpointsDB.IgnoreErrorCheckpoint.

func (*NullCheckpointsDB) Initialize

Initialize implements the DB interface.

func (*NullCheckpointsDB) InsertEngineCheckpoints

func (*NullCheckpointsDB) InsertEngineCheckpoints(_ context.Context, _ string, _ map[int32]*EngineCheckpoint) error

InsertEngineCheckpoints implements the DB interface.

func (*NullCheckpointsDB) MoveCheckpoints

func (*NullCheckpointsDB) MoveCheckpoints(context.Context, int64) error

MoveCheckpoints implements CheckpointsDB.MoveCheckpoints.

func (*NullCheckpointsDB) RemoveCheckpoint

func (*NullCheckpointsDB) RemoveCheckpoint(context.Context, string) error

RemoveCheckpoint implements CheckpointsDB.RemoveCheckpoint.

func (*NullCheckpointsDB) TaskCheckpoint

func (*NullCheckpointsDB) TaskCheckpoint(context.Context) (*TaskCheckpoint, error)

TaskCheckpoint implements the DB interface.

func (*NullCheckpointsDB) Update

Update implements the DB interface.

type RebaseCheckpointMerger

type RebaseCheckpointMerger struct {
	AllocBase int64
}

RebaseCheckpointMerger is the merger for rebasing the auto-increment ID.

func (*RebaseCheckpointMerger) MergeInto

func (merger *RebaseCheckpointMerger) MergeInto(cpd *TableCheckpointDiff)

MergeInto implements TableCheckpointMerger.MergeInto.

type StatusCheckpointMerger

type StatusCheckpointMerger struct {
	EngineID int32 // WholeTableEngineID == apply to whole table.
	Status   CheckpointStatus
}

StatusCheckpointMerger is the merger for status updates.

func (*StatusCheckpointMerger) MergeInto

func (merger *StatusCheckpointMerger) MergeInto(cpd *TableCheckpointDiff)

MergeInto implements TableCheckpointMerger.MergeInto.

func (*StatusCheckpointMerger) SetInvalid

func (merger *StatusCheckpointMerger) SetInvalid()

SetInvalid sets the status to an invalid value.

type TableCheckpoint

type TableCheckpoint struct {
	Status    CheckpointStatus
	AllocBase int64
	Engines   map[int32]*EngineCheckpoint
	TableID   int64
	// TableInfo is desired table info what we want to restore. When add-index-by-sql is enabled,
	// we will first drop indexes from target table, then restore data, then add indexes back. In case
	// of crash, this field will be used to save the dropped indexes, so we can add them back.
	TableInfo *model.TableInfo
	// remote checksum before restore
	Checksum verify.KVChecksum
}

TableCheckpoint is the checkpoint for a table.

func (*TableCheckpoint) Apply

func (cp *TableCheckpoint) Apply(cpd *TableCheckpointDiff)

Apply the diff to the existing chunk and engine checkpoints in `cp`.

func (*TableCheckpoint) CountChunks

func (cp *TableCheckpoint) CountChunks() int

CountChunks returns the number of chunks in the table checkpoint.

func (*TableCheckpoint) DeepCopy

func (cp *TableCheckpoint) DeepCopy() *TableCheckpoint

DeepCopy returns a deep copy of the table checkpoint.

type TableCheckpointDiff

type TableCheckpointDiff struct {
	// contains filtered or unexported fields
}

TableCheckpointDiff is the difference between two table checkpoints.

func NewTableCheckpointDiff

func NewTableCheckpointDiff() *TableCheckpointDiff

NewTableCheckpointDiff returns a new TableCheckpointDiff.

func (*TableCheckpointDiff) String

func (cpd *TableCheckpointDiff) String() string

String implements fmt.Stringer interface.

type TableCheckpointMerger

type TableCheckpointMerger interface {
	// MergeInto the table checkpoint diff from a status update or chunk update.
	// If there are multiple updates to the same table, only the last one will
	// take effect. Therefore, the caller must ensure events for the same table
	// are properly ordered by the global time (an old event must be merged
	// before the new one).
	MergeInto(cpd *TableCheckpointDiff)
}

TableCheckpointMerger is the interface for merging table checkpoint diffs.

type TableChecksumMerger

type TableChecksumMerger struct {
	Checksum verify.KVChecksum
}

TableChecksumMerger is the merger for table checksums.

func (*TableChecksumMerger) MergeInto

func (m *TableChecksumMerger) MergeInto(cpd *TableCheckpointDiff)

MergeInto implements TableCheckpointMerger.MergeInto.

type TaskCheckpoint

type TaskCheckpoint struct {
	TaskID       int64
	SourceDir    string
	Backend      string
	ImporterAddr string
	TiDBHost     string
	TiDBPort     int
	PdAddr       string
	SortedKVDir  string
	LightningVer string
}

TaskCheckpoint is the checkpoint for a task.

type TidbDBInfo

type TidbDBInfo struct {
	ID     int64
	Name   string
	Tables map[string]*TidbTableInfo
}

TidbDBInfo is the database info in TiDB.

type TidbTableInfo

type TidbTableInfo struct {
	ID   int64
	DB   string
	Name string
	// Core is the current table info in TiDB.
	Core *model.TableInfo
	// Desired is the table info that we want to migrate to. In most cases, it is the same as Core. But
	// when we want to import index and data separately, we should first drop indices and then import data,
	// so the Core will be the table info without indices, but the Desired will be the table info with indices.
	Desired *model.TableInfo
}

TidbTableInfo is the table info in TiDB.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL