Documentation ¶
Index ¶
- Constants
- func AppendForBackup(ctx context.Context, r *CheckpointRunner[BackupKeyType, BackupValueType], ...) error
- func AppendRangeForLogRestore(ctx context.Context, ...) error
- func AppendRangesForRestore(ctx context.Context, r *CheckpointRunner[RestoreKeyType, RestoreValueType], ...) error
- func ExistsCheckpointIngestIndexRepairSQLs(ctx context.Context, dom *domain.Domain) bool
- func ExistsCheckpointProgress(ctx context.Context, dom *domain.Domain) bool
- func ExistsLogRestoreCheckpointMetadata(ctx context.Context, dom *domain.Domain) bool
- func ExistsSstRestoreCheckpoint(ctx context.Context, dom *domain.Domain, dbName string) bool
- func IsCheckpointDB(dbname ast.CIStr) bool
- func LoadCheckpointChecksumForRestore(ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor) (map[int64]*ChecksumItem, time.Duration, error)
- func LoadCheckpointDataForLogRestore[K KeyType, V ValueType](ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, fn func(K, V)) (time.Duration, error)
- func LoadCheckpointDataForSstRestore[K KeyType, V ValueType](ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, dbName string, ...) (time.Duration, error)
- func RemoveCheckpointDataForBackup(ctx context.Context, s storage.ExternalStorage) error
- func RemoveCheckpointDataForLogRestore(ctx context.Context, dom *domain.Domain, se glue.Session) error
- func RemoveCheckpointDataForSstRestore(ctx context.Context, dom *domain.Domain, se glue.Session, dbName string) error
- func SaveCheckpointIngestIndexRepairSQLs(ctx context.Context, se glue.Session, meta *CheckpointIngestIndexRepairSQLs) error
- func SaveCheckpointMetadata(ctx context.Context, s storage.ExternalStorage, ...) error
- func SaveCheckpointMetadataForLogRestore(ctx context.Context, se glue.Session, meta *CheckpointMetadataForLogRestore) error
- func SaveCheckpointMetadataForSstRestore(ctx context.Context, se glue.Session, dbName string, ...) error
- func SaveCheckpointProgress(ctx context.Context, se glue.Session, meta *CheckpointProgress) error
- func WalkCheckpointFileForBackup(ctx context.Context, s storage.ExternalStorage, cipher *backuppb.CipherInfo, ...) (time.Duration, error)
- type BackupKeyType
- type BackupValueType
- type CheckpointData
- type CheckpointIngestIndexRepairSQL
- type CheckpointIngestIndexRepairSQLs
- type CheckpointItem
- type CheckpointLock
- type CheckpointMessage
- type CheckpointMetadataForBackup
- type CheckpointMetadataForLogRestore
- type CheckpointMetadataForSnapshotRestore
- type CheckpointProgress
- type CheckpointRunner
- func StartCheckpointBackupRunnerForTest(ctx context.Context, storage storage.ExternalStorage, ...) (*CheckpointRunner[BackupKeyType, BackupValueType], error)
- func StartCheckpointLogRestoreRunnerForTest(ctx context.Context, se glue.Session, tick time.Duration) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error)
- func StartCheckpointRestoreRunnerForTest(ctx context.Context, se glue.Session, dbName string, tick time.Duration, ...) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error)
- func StartCheckpointRunnerForBackup(ctx context.Context, storage storage.ExternalStorage, ...) (*CheckpointRunner[BackupKeyType, BackupValueType], error)
- func StartCheckpointRunnerForLogRestore(ctx context.Context, se glue.Session) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error)
- func StartCheckpointRunnerForRestore(ctx context.Context, se glue.Session, dbName string) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error)
- func (r *CheckpointRunner[K, V]) Append(ctx context.Context, message *CheckpointMessage[K, V]) error
- func (r *CheckpointRunner[K, V]) FlushChecksum(ctx context.Context, tableID int64, crc64xor uint64, totalKvs uint64, ...) error
- func (r *CheckpointRunner[K, V]) FlushChecksumItem(ctx context.Context, checksumItem *ChecksumItem) error
- func (r *CheckpointRunner[K, V]) WaitForFinish(ctx context.Context, flush bool)
- type CheckpointTaskInfoForLogRestore
- type ChecksumInfo
- type ChecksumItem
- type ChecksumItems
- type GlobalTimer
- type KeyType
- type LogRestoreKeyType
- type LogRestoreValueMarshaled
- type LogRestoreValueType
- type RangeGroup
- type RangeGroupData
- type RangeType
- type RestoreKeyType
- type RestoreProgress
- type RestoreValueType
- type TimeTicker
- type ValueType
Constants ¶
const ( CheckpointBackupDir = CheckpointDir + "/backup" CheckpointDataDirForBackup = CheckpointBackupDir + "/data" CheckpointChecksumDirForBackup = CheckpointBackupDir + "/checksum" CheckpointMetaPathForBackup = CheckpointBackupDir + "/checkpoint.meta" CheckpointLockPathForBackup = CheckpointBackupDir + "/checkpoint.lock" )
const ( LogRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Log_Restore_Checkpoint" SnapshotRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Snapshot_Restore_Checkpoint" CustomSSTRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Custom_SST_Restore_Checkpoint" )
Notice that: 1. The checkpoint table only records the checkpoint of a single task. 2. BR regards the metadata table as a file, so it is never empty if the table exists. 3. BR regards the checkpoint table as a directory that is managed by the metadata table.
const CheckpointDir = "checkpoints"
const CheckpointIdMapBlockSize int = 524288
const MaxChecksumTotalCost float64 = 60.0
Variables ¶
This section is empty.
Functions ¶
func AppendForBackup ¶
func AppendForBackup( ctx context.Context, r *CheckpointRunner[BackupKeyType, BackupValueType], groupKey BackupKeyType, startKey []byte, endKey []byte, files []*backuppb.File, ) error
func AppendRangeForLogRestore ¶
func AppendRangeForLogRestore( ctx context.Context, r *CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], groupKey LogRestoreKeyType, tableID int64, goff int, foff int, ) error
func AppendRangesForRestore ¶
func AppendRangesForRestore( ctx context.Context, r *CheckpointRunner[RestoreKeyType, RestoreValueType], c *CheckpointItem, ) error
func IsCheckpointDB ¶
IsCheckpointDB checks whether the dbname is checkpoint database.
func LoadCheckpointChecksumForRestore ¶
func LoadCheckpointChecksumForRestore( ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, ) (map[int64]*ChecksumItem, time.Duration, error)
func LoadCheckpointDataForLogRestore ¶
func LoadCheckpointDataForLogRestore[K KeyType, V ValueType]( ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, fn func(K, V), ) (time.Duration, error)
Load the whole checkpoint range data, retrieve the metadata of the restored ranges, and return the total time cost of the past executions.
func LoadCheckpointDataForSstRestore ¶
func LoadCheckpointDataForSstRestore[K KeyType, V ValueType]( ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, dbName string, fn func(K, V), ) (time.Duration, error)
Load the whole checkpoint range data, retrieve the metadata of the restored ranges, and return the total time cost of the past executions.
func RemoveCheckpointDataForBackup ¶
func RemoveCheckpointDataForBackup(ctx context.Context, s storage.ExternalStorage) error
func SaveCheckpointMetadata ¶
func SaveCheckpointMetadata(ctx context.Context, s storage.ExternalStorage, meta *CheckpointMetadataForBackup) error
save the checkpoint metadata into the external storage
func SaveCheckpointProgress ¶
func WalkCheckpointFileForBackup ¶
func WalkCheckpointFileForBackup( ctx context.Context, s storage.ExternalStorage, cipher *backuppb.CipherInfo, fn func(BackupKeyType, BackupValueType), ) (time.Duration, error)
Walk all the checkpoint range files, retrieve the metadata of the backed-up ranges, and return the total time cost of the past executions.
Types ¶
type BackupKeyType ¶
type BackupKeyType = string
type BackupValueType ¶
type BackupValueType = RangeType
type CheckpointData ¶
type CheckpointData struct { DureTime time.Duration `json:"dure-time"` RangeGroupMetas []*RangeGroupData `json:"range-group-metas"` }
type CheckpointIngestIndexRepairSQLs ¶
type CheckpointIngestIndexRepairSQLs struct {
SQLs []CheckpointIngestIndexRepairSQL
}
func LoadCheckpointIngestIndexRepairSQLs ¶
func LoadCheckpointIngestIndexRepairSQLs( ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, ) (*CheckpointIngestIndexRepairSQLs, error)
type CheckpointItem ¶
type CheckpointItem struct {
// contains filtered or unexported fields
}
func NewCheckpointFileItem ¶
func NewCheckpointFileItem(tableID RestoreKeyType, fileName string) *CheckpointItem
func NewCheckpointRangeKeyItem ¶
func NewCheckpointRangeKeyItem(tableID RestoreKeyType, rangeKey string) *CheckpointItem
type CheckpointLock ¶
type CheckpointMessage ¶
type CheckpointMetadataForBackup ¶
type CheckpointMetadataForBackup struct { GCServiceId string `json:"gc-service-id"` ConfigHash []byte `json:"config-hash"` BackupTS uint64 `json:"backup-ts"` Ranges []rtree.Range `json:"ranges"` CheckpointChecksum map[int64]*ChecksumItem `json:"-"` CheckpointDataMap map[string]rtree.RangeTree `json:"-"` }
func LoadCheckpointMetadata ¶
func LoadCheckpointMetadata(ctx context.Context, s storage.ExternalStorage) (*CheckpointMetadataForBackup, error)
load checkpoint metadata from the external storage
type CheckpointMetadataForLogRestore ¶
type CheckpointMetadataForLogRestore struct { UpstreamClusterID uint64 `json:"upstream-cluster-id"` RestoredTS uint64 `json:"restored-ts"` StartTS uint64 `json:"start-ts"` RewriteTS uint64 `json:"rewrite-ts"` GcRatio string `json:"gc-ratio"` // tiflash recorder items with snapshot restore records TiFlashItems map[int64]model.TiFlashReplicaInfo `json:"tiflash-recorder,omitempty"` }
func LoadCheckpointMetadataForLogRestore ¶
func LoadCheckpointMetadataForLogRestore( ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, ) (*CheckpointMetadataForLogRestore, error)
type CheckpointMetadataForSnapshotRestore ¶
type CheckpointMetadataForSnapshotRestore struct { UpstreamClusterID uint64 `json:"upstream-cluster-id"` RestoredTS uint64 `json:"restored-ts"` SchedulersConfig *pdutil.ClusterConfig `json:"schedulers-config"` }
func LoadCheckpointMetadataForSnapshotRestore ¶
func LoadCheckpointMetadataForSnapshotRestore( ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, ) (*CheckpointMetadataForSnapshotRestore, error)
type CheckpointProgress ¶
type CheckpointProgress struct {
Progress RestoreProgress `json:"progress"`
}
func LoadCheckpointProgress ¶
func LoadCheckpointProgress( ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, ) (*CheckpointProgress, error)
type CheckpointRunner ¶
func StartCheckpointBackupRunnerForTest ¶
func StartCheckpointBackupRunnerForTest( ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, tick time.Duration, timer GlobalTimer, ) (*CheckpointRunner[BackupKeyType, BackupValueType], error)
only for test
func StartCheckpointLogRestoreRunnerForTest ¶
func StartCheckpointLogRestoreRunnerForTest( ctx context.Context, se glue.Session, tick time.Duration, ) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error)
only for test
func StartCheckpointRestoreRunnerForTest ¶
func StartCheckpointRestoreRunnerForTest( ctx context.Context, se glue.Session, dbName string, tick time.Duration, retryDuration time.Duration, ) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error)
only for test
func StartCheckpointRunnerForBackup ¶
func StartCheckpointRunnerForBackup( ctx context.Context, storage storage.ExternalStorage, cipher *backuppb.CipherInfo, timer GlobalTimer, ) (*CheckpointRunner[BackupKeyType, BackupValueType], error)
func StartCheckpointRunnerForLogRestore ¶
func StartCheckpointRunnerForLogRestore( ctx context.Context, se glue.Session, ) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error)
Notice that the session is owned by the checkpoint runner, and it will also be closed by the runner.
func StartCheckpointRunnerForRestore ¶
func StartCheckpointRunnerForRestore( ctx context.Context, se glue.Session, dbName string, ) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error)
Notice that the session is owned by the checkpoint runner, and it will also be closed by the runner.
func (*CheckpointRunner[K, V]) Append ¶
func (r *CheckpointRunner[K, V]) Append( ctx context.Context, message *CheckpointMessage[K, V], ) error
func (*CheckpointRunner[K, V]) FlushChecksum ¶
func (*CheckpointRunner[K, V]) FlushChecksumItem ¶
func (r *CheckpointRunner[K, V]) FlushChecksumItem( ctx context.Context, checksumItem *ChecksumItem, ) error
func (*CheckpointRunner[K, V]) WaitForFinish ¶
func (r *CheckpointRunner[K, V]) WaitForFinish(ctx context.Context, flush bool)
Note: must not be called in parallel with the `Append` function.
type CheckpointTaskInfoForLogRestore ¶
type CheckpointTaskInfoForLogRestore struct { Metadata *CheckpointMetadataForLogRestore HasSnapshotMetadata bool // the progress for this task Progress RestoreProgress }
CheckpointTaskInfoForLogRestore is unique information within the same cluster id. It represents the last restore task executed for this cluster.
func TryToGetCheckpointTaskInfo ¶
func TryToGetCheckpointTaskInfo( ctx context.Context, dom *domain.Domain, execCtx sqlexec.RestrictedSQLExecutor, ) (*CheckpointTaskInfoForLogRestore, error)
type ChecksumInfo ¶
type ChecksumItem ¶
type ChecksumItems ¶
type ChecksumItems struct {
Items []*ChecksumItem `json:"checksum-items"`
}
type KeyType ¶
type KeyType interface { ~BackupKeyType | ~RestoreKeyType }
type LogRestoreKeyType ¶
type LogRestoreKeyType = string
type LogRestoreValueType ¶
type RangeGroup ¶
type RangeGroupData ¶
type RestoreKeyType ¶
type RestoreKeyType = int64
type RestoreProgress ¶
type RestoreProgress int
A progress type for snapshot + log restore.
Before the id-maps are persisted into external storage, the snapshot restore and the id-maps construction can be repeated. So if the progress is `InSnapshotRestore`, the task can retry from snapshot restore.
After the id-maps are persisted into external storage, some meta-kvs, such as those of a `rename ddl`, have already been restored into the cluster. The following situation could then occur:
the first execution:
table A created in snapshot restore is renamed to table B in log restore
     table A (id 80)       -------------->        table B (id 80)
  ( snapshot restore )                            ( log restore )
the second execution, if snapshot restore is not skipped:
table A is created again in snapshot restore, because there is no table named A
     table A (id 81)       -------------->   [not in id-maps, so ignored]
  ( snapshot restore )                            ( log restore )
Finally, there is a duplicated table A in the cluster. Therefore, snapshot restore needs to be skipped when the progress is `InLogRestoreAndIdMapPersist`.
const ( InSnapshotRestore RestoreProgress = iota // Only when the id-maps is persist, status turns into it. InLogRestoreAndIdMapPersist )