checkpoint

package
v1.1.0-beta.0...-70f5218 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2024 License: Apache-2.0, Apache-2.0 Imports: 31 Imported by: 0

Documentation

Index

Constants

View Source
// Storage paths used by backup checkpoints. Every path is built by string
// concatenation on top of CheckpointBackupDir, which itself is rooted at
// CheckpointDir.
const (
	// CheckpointBackupDir is the "backup" sub-directory of CheckpointDir.
	CheckpointBackupDir = CheckpointDir + "/backup"

	// The constants below are the "data" and "checksum" sub-directories and
	// the "checkpoint.meta" and "checkpoint.lock" files under
	// CheckpointBackupDir.
	CheckpointDataDirForBackup     = CheckpointBackupDir + "/data"
	CheckpointChecksumDirForBackup = CheckpointBackupDir + "/checksum"
	CheckpointMetaPathForBackup    = CheckpointBackupDir + "/checkpoint.meta"
	CheckpointLockPathForBackup    = CheckpointBackupDir + "/checkpoint.lock"
)
View Source
// Names of the temporary databases that hold restore checkpoints: one for log
// restore and one for snapshot restore.
const (
	LogRestoreCheckpointDatabaseName      string = "__TiDB_BR_Temporary_Log_Restore_Checkpoint"
	SnapshotRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Snapshot_Restore_Checkpoint"
)

Notice that: 1. The checkpoint table records only one task checkpoint. 2. BR regards the metadata table as a file, so it is considered non-empty if the table exists. 3. BR regards the checkpoint table as a directory that is managed by the metadata table.

View Source
// CheckpointDir is the root directory name for checkpoint files;
// CheckpointBackupDir and the backup paths are derived from it.
const CheckpointDir = "checkpoints"
View Source
// CheckpointIdMapBlockSize is 524288 (512 * 1024).
// NOTE(review): presumably the size in bytes of one block when id-map data is
// split for storage — confirm against the code that uses it.
const CheckpointIdMapBlockSize int = 524288
View Source
// MaxChecksumTotalCost is an upper bound of 60.0 on an accumulated checksum cost.
// NOTE(review): the unit is not visible here (presumably seconds) — confirm
// against the code that uses it.
const MaxChecksumTotalCost float64 = 60.0

Variables

This section is empty.

Functions

func AppendForBackup

func AppendForBackup(
	ctx context.Context,
	r *CheckpointRunner[BackupKeyType, BackupValueType],
	groupKey BackupKeyType,
	startKey []byte,
	endKey []byte,
	files []*backuppb.File,
) error

func AppendRangeForLogRestore

func AppendRangeForLogRestore(
	ctx context.Context,
	r *CheckpointRunner[LogRestoreKeyType, LogRestoreValueType],
	groupKey LogRestoreKeyType,
	tableID int64,
	goff int,
	foff int,
) error

func AppendRangesForRestore

func AppendRangesForRestore(
	ctx context.Context,
	r *CheckpointRunner[RestoreKeyType, RestoreValueType],
	tableID RestoreKeyType,
	rangeKey string,
) error

func ExistsCheckpointIngestIndexRepairSQLs

func ExistsCheckpointIngestIndexRepairSQLs(ctx context.Context, dom *domain.Domain) bool

func ExistsCheckpointProgress

func ExistsCheckpointProgress(
	ctx context.Context,
	dom *domain.Domain,
) bool

func ExistsLogRestoreCheckpointMetadata

func ExistsLogRestoreCheckpointMetadata(
	ctx context.Context,
	dom *domain.Domain,
) bool

func ExistsSnapshotRestoreCheckpoint

func ExistsSnapshotRestoreCheckpoint(
	ctx context.Context,
	dom *domain.Domain,
) bool

func IsCheckpointDB

func IsCheckpointDB(dbname pmodel.CIStr) bool

IsCheckpointDB checks whether the dbname is checkpoint database.

func LoadCheckpointChecksumForRestore

func LoadCheckpointChecksumForRestore(
	ctx context.Context,
	execCtx sqlexec.RestrictedSQLExecutor,
) (map[int64]*ChecksumItem, time.Duration, error)

func LoadCheckpointDataForLogRestore

func LoadCheckpointDataForLogRestore[K KeyType, V ValueType](
	ctx context.Context,
	execCtx sqlexec.RestrictedSQLExecutor,
	fn func(K, V),
) (time.Duration, error)

LoadCheckpointDataForLogRestore loads the whole checkpoint range data, retrieves the metadata of the restored ranges, and returns the total time cost of the past executions.

func LoadCheckpointDataForSnapshotRestore

func LoadCheckpointDataForSnapshotRestore[K KeyType, V ValueType](
	ctx context.Context,
	execCtx sqlexec.RestrictedSQLExecutor,
	fn func(K, V),
) (time.Duration, error)

LoadCheckpointDataForSnapshotRestore loads the whole checkpoint range data, retrieves the metadata of the restored ranges, and returns the total time cost of the past executions.

func RemoveCheckpointDataForBackup

func RemoveCheckpointDataForBackup(ctx context.Context, s storage.ExternalStorage) error

func RemoveCheckpointDataForLogRestore

func RemoveCheckpointDataForLogRestore(ctx context.Context, dom *domain.Domain, se glue.Session) error

func RemoveCheckpointDataForSnapshotRestore

func RemoveCheckpointDataForSnapshotRestore(ctx context.Context, dom *domain.Domain, se glue.Session) error

func SaveCheckpointIngestIndexRepairSQLs

func SaveCheckpointIngestIndexRepairSQLs(
	ctx context.Context,
	se glue.Session,
	meta *CheckpointIngestIndexRepairSQLs,
) error

func SaveCheckpointMetadata

func SaveCheckpointMetadata(ctx context.Context, s storage.ExternalStorage, meta *CheckpointMetadataForBackup) error

SaveCheckpointMetadata saves the checkpoint metadata into the external storage.

func SaveCheckpointMetadataForLogRestore

func SaveCheckpointMetadataForLogRestore(
	ctx context.Context,
	se glue.Session,
	meta *CheckpointMetadataForLogRestore,
) error

func SaveCheckpointMetadataForSnapshotRestore

func SaveCheckpointMetadataForSnapshotRestore(
	ctx context.Context,
	se glue.Session,
	meta *CheckpointMetadataForSnapshotRestore,
) error

func SaveCheckpointProgress

func SaveCheckpointProgress(
	ctx context.Context,
	se glue.Session,
	meta *CheckpointProgress,
) error

func WalkCheckpointFileForBackup

func WalkCheckpointFileForBackup(
	ctx context.Context,
	s storage.ExternalStorage,
	cipher *backuppb.CipherInfo,
	fn func(BackupKeyType, BackupValueType),
) (time.Duration, error)

WalkCheckpointFileForBackup walks the whole set of checkpoint range files, retrieves the metadata of the backed-up ranges, and returns the total time cost of the past executions.

Types

type BackupKeyType

// BackupKeyType is the group-key type of backup checkpoints; it is an alias of string.
type BackupKeyType = string

type BackupValueType

// BackupValueType is the value type of backup checkpoints; it is an alias of RangeType.
type BackupValueType = RangeType

type CheckpointData

// CheckpointData is a JSON-serializable record that pairs a duration with a
// list of range-group records.
type CheckpointData struct {
	// NOTE(review): presumably the accumulated running time of the task up to
	// this record — confirm.
	DureTime        time.Duration     `json:"dure-time"`
	// RangeGroupMetas lists the range-group records carried by this data.
	RangeGroupMetas []*RangeGroupData `json:"range-group-metas"`
}

type CheckpointIngestIndexRepairSQL

// CheckpointIngestIndexRepairSQL identifies one index (by id, schema, table and
// index name) and carries a SQL statement with its arguments.
// NOTE(review): presumably AddSQL re-creates the index after an ingest-mode
// restore — confirm against the caller.
type CheckpointIngestIndexRepairSQL struct {
	IndexID    int64        `json:"index-id"`
	SchemaName pmodel.CIStr `json:"schema-name"`
	TableName  pmodel.CIStr `json:"table-name"`
	IndexName  string       `json:"index-name"`
	// AddSQL is the statement text; AddArgs are its arguments.
	AddSQL     string       `json:"add-sql"`
	AddArgs    []any        `json:"add-args"`
}

type CheckpointIngestIndexRepairSQLs

// CheckpointIngestIndexRepairSQLs wraps a list of CheckpointIngestIndexRepairSQL.
// It is the type saved by SaveCheckpointIngestIndexRepairSQLs.
type CheckpointIngestIndexRepairSQLs struct {
	// SQLs has no json tag, unlike the fields of sibling types.
	SQLs []CheckpointIngestIndexRepairSQL
}

type CheckpointLock

// CheckpointLock is a JSON-serializable lock record made of a lock id and an
// expiry value.
type CheckpointLock struct {
	LockId   uint64 `json:"lock-id"`
	// NOTE(review): the time unit/epoch of ExpireAt is not visible here — confirm.
	ExpireAt int64  `json:"expire-at"`
}

type CheckpointMessage

// CheckpointMessage is the unit passed to CheckpointRunner.Append: a group key
// of type K together with the values of type V that belong to it.
type CheckpointMessage[K KeyType, V ValueType] struct {
	// start-key of the origin range
	GroupKey K

	// Group holds the values recorded under GroupKey.
	Group []V
}

type CheckpointMetadataForBackup

// CheckpointMetadataForBackup is the backup checkpoint metadata that
// SaveCheckpointMetadata writes to, and LoadCheckpointMetadata reads from, the
// external storage.
type CheckpointMetadataForBackup struct {
	GCServiceId string        `json:"gc-service-id"`
	ConfigHash  []byte        `json:"config-hash"`
	BackupTS    uint64        `json:"backup-ts"`
	Ranges      []rtree.Range `json:"ranges"`

	// The two maps below are tagged `json:"-"`, so they are not part of the
	// JSON encoding of this struct.
	CheckpointChecksum map[int64]*ChecksumItem    `json:"-"`
	CheckpointDataMap  map[string]rtree.RangeTree `json:"-"`
}

func LoadCheckpointMetadata

func LoadCheckpointMetadata(ctx context.Context, s storage.ExternalStorage) (*CheckpointMetadataForBackup, error)

LoadCheckpointMetadata loads the checkpoint metadata from the external storage.

type CheckpointMetadataForLogRestore

// CheckpointMetadataForLogRestore is the JSON-serializable metadata of a log
// restore checkpoint; it is the type saved by
// SaveCheckpointMetadataForLogRestore.
type CheckpointMetadataForLogRestore struct {
	UpstreamClusterID uint64 `json:"upstream-cluster-id"`
	RestoredTS        uint64 `json:"restored-ts"`
	StartTS           uint64 `json:"start-ts"`
	RewriteTS         uint64 `json:"rewrite-ts"`
	GcRatio           string `json:"gc-ratio"`
	// tiflash recorder items with snapshot restore records
	// (omitted from the JSON output when empty)
	TiFlashItems map[int64]model.TiFlashReplicaInfo `json:"tiflash-recorder,omitempty"`
}

type CheckpointMetadataForSnapshotRestore

// CheckpointMetadataForSnapshotRestore is the JSON-serializable metadata of a
// snapshot restore checkpoint; it is the type saved by
// SaveCheckpointMetadataForSnapshotRestore.
type CheckpointMetadataForSnapshotRestore struct {
	UpstreamClusterID uint64                `json:"upstream-cluster-id"`
	RestoredTS        uint64                `json:"restored-ts"`
	// NOTE(review): presumably the PD scheduler configuration captured so it
	// can be restored later — confirm.
	SchedulersConfig  *pdutil.ClusterConfig `json:"schedulers-config"`
}

type CheckpointProgress

// CheckpointProgress is the JSON-serializable wrapper around a RestoreProgress
// value; it is saved by SaveCheckpointProgress and returned by
// LoadCheckpointProgress.
type CheckpointProgress struct {
	Progress RestoreProgress `json:"progress"`
}

func LoadCheckpointProgress

func LoadCheckpointProgress(
	ctx context.Context,
	execCtx sqlexec.RestrictedSQLExecutor,
) (*CheckpointProgress, error)

type CheckpointRunner

// CheckpointRunner is generic over a key type K and a value type V. Its fields
// are unexported; instances are obtained from the StartCheckpoint* functions
// and driven through Append, FlushChecksum, FlushChecksumItem and
// WaitForFinish.
type CheckpointRunner[K KeyType, V ValueType] struct {
	// contains filtered or unexported fields
}

func StartCheckpointBackupRunnerForTest

func StartCheckpointBackupRunnerForTest(
	ctx context.Context,
	storage storage.ExternalStorage,
	cipher *backuppb.CipherInfo,
	tick time.Duration,
	timer GlobalTimer,
) (*CheckpointRunner[BackupKeyType, BackupValueType], error)

only for test

func StartCheckpointLogRestoreRunnerForTest

func StartCheckpointLogRestoreRunnerForTest(
	ctx context.Context,
	se glue.Session,
	tick time.Duration,
) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error)

only for test

func StartCheckpointRestoreRunnerForTest

func StartCheckpointRestoreRunnerForTest(
	ctx context.Context,
	se glue.Session,
	tick time.Duration,
	retryDuration time.Duration,
) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error)

only for test

func StartCheckpointRunnerForLogRestore

func StartCheckpointRunnerForLogRestore(
	ctx context.Context,
	se glue.Session,
) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error)

Note that the session is owned by the checkpoint runner, which will also close it.

func StartCheckpointRunnerForRestore

func StartCheckpointRunnerForRestore(
	ctx context.Context,
	se glue.Session,
) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error)

Note that the session is owned by the checkpoint runner, which will also close it.

func (*CheckpointRunner[K, V]) Append

func (r *CheckpointRunner[K, V]) Append(
	ctx context.Context,
	message *CheckpointMessage[K, V],
) error

func (*CheckpointRunner[K, V]) FlushChecksum

func (r *CheckpointRunner[K, V]) FlushChecksum(
	ctx context.Context,
	tableID int64,
	crc64xor uint64,
	totalKvs uint64,
	totalBytes uint64,
) error

func (*CheckpointRunner[K, V]) FlushChecksumItem

func (r *CheckpointRunner[K, V]) FlushChecksumItem(
	ctx context.Context,
	checksumItem *ChecksumItem,
) error

func (*CheckpointRunner[K, V]) WaitForFinish

func (r *CheckpointRunner[K, V]) WaitForFinish(ctx context.Context, flush bool)

Note: WaitForFinish must not be called concurrently with the `Append` function.

type CheckpointTaskInfoForLogRestore

// CheckpointTaskInfoForLogRestore bundles the log restore checkpoint metadata,
// a flag telling whether snapshot metadata is present, and the task progress.
type CheckpointTaskInfoForLogRestore struct {
	Metadata            *CheckpointMetadataForLogRestore
	HasSnapshotMetadata bool
	// the progress for this task
	Progress RestoreProgress
}

CheckpointTaskInfoForLogRestore is unique information within the same cluster id. It represents the last restore task executed for this cluster.

type ChecksumInfo

// ChecksumInfo is a JSON-serializable record holding a content payload, a
// checksum and a duration.
// NOTE(review): presumably Checksum is computed over Content — confirm.
type ChecksumInfo struct {
	Content  []byte        `json:"content"`
	Checksum []byte        `json:"checksum"`
	DureTime time.Duration `json:"dure-time"`
}

type ChecksumItem

// ChecksumItem is the checksum record of one table. Its fields mirror the
// parameters of CheckpointRunner.FlushChecksum (tableID, crc64xor, totalKvs,
// totalBytes).
type ChecksumItem struct {
	TableID    int64  `json:"table-id"`
	Crc64xor   uint64 `json:"crc64-xor"`
	TotalKvs   uint64 `json:"total-kvs"`
	TotalBytes uint64 `json:"total-bytes"`
}

type ChecksumItems

// ChecksumItems is a JSON-serializable list of ChecksumItem pointers, encoded
// under the "checksum-items" key.
type ChecksumItems struct {
	Items []*ChecksumItem `json:"checksum-items"`
}

type GlobalTimer

// GlobalTimer is a timestamp source; StartCheckpointBackupRunnerForTest accepts
// one as its timer argument.
type GlobalTimer interface {
	// GetTS returns two int64 values and an error.
	// NOTE(review): presumably the physical and logical parts of a timestamp —
	// confirm against the implementation.
	GetTS(context.Context) (int64, int64, error)
}

type KeyType

// KeyType is the type constraint for checkpoint group keys. Because
// BackupKeyType is an alias of string and RestoreKeyType is an alias of int64,
// it admits any type whose underlying type is string or int64.
type KeyType interface {
	~BackupKeyType | ~RestoreKeyType
}

type LogRestoreKeyType

// LogRestoreKeyType is the group-key type of log restore checkpoints; it is an alias of string.
type LogRestoreKeyType = string

type LogRestoreValueMarshaled

// LogRestoreValueMarshaled is the JSON form of log restore checkpoint values:
// for one group index it maps each downstream table id to file indexes.
type LogRestoreValueMarshaled struct {
	// group index in the metadata
	Goff int `json:"goff"`
	// downstream table id -> file indexes in the group
	Foffs map[int64][]int `json:"foffs"`
}

func (LogRestoreValueMarshaled) IdentKey

func (l LogRestoreValueMarshaled) IdentKey() []byte

type LogRestoreValueType

// LogRestoreValueType identifies one file of a log restore by downstream table
// id, group index and file index. Its fields match the tableID, goff and foff
// parameters of AppendRangeForLogRestore.
type LogRestoreValueType struct {
	// downstream table id
	TableID int64
	// group index in the metadata
	Goff int
	// file index in the group
	Foff int
}

func (LogRestoreValueType) IdentKey

func (l LogRestoreValueType) IdentKey() []byte

type RangeGroup

// RangeGroup is the JSON-serializable pairing of a group key with its values.
type RangeGroup[K KeyType, V ValueType] struct {
	GroupKey K   `json:"group-key"`
	// Note that the JSON key is the plural "groups" although the field is Group.
	Group    []V `json:"groups"`
}

type RangeGroupData

// RangeGroupData carries an encrypted range-group payload together with a
// checksum, a cipher IV and a size. None of the fields has a json tag.
type RangeGroupData struct {
	// NOTE(review): "Encripted" is a misspelling of "Encrypted"; the exported
	// name is kept as it is.
	RangeGroupsEncriptedData []byte
	Checksum                 []byte
	CipherIv                 []byte

	// NOTE(review): it is not visible here which payload Size measures
	// (encrypted or plain) — confirm.
	Size int
}

type RangeType

// RangeType wraps an embedded *rtree.Range and provides IdentKey, so that it
// can be used as a checkpoint value (see ValueType).
type RangeType struct {
	*rtree.Range
}

func (RangeType) IdentKey

func (r RangeType) IdentKey() []byte

type RestoreKeyType

// RestoreKeyType is the group-key type of snapshot restore checkpoints; it is
// an alias of int64. AppendRangesForRestore passes its tableID argument as
// this type.
type RestoreKeyType = int64

type RestoreProgress

// RestoreProgress is the stage of a snapshot + log restore task; its values
// are InSnapshotRestore and InLogRestoreAndIdMapPersist.
type RestoreProgress int

A progress type for snapshot + log restore.

Before the id-maps are persisted into external storage, the snapshot restore and the id-maps construction can be repeated. So if the progress is `InSnapshotRestore`, the task can retry from snapshot restore.

After the id-maps are persisted into external storage, some meta-kvs have already been restored into the cluster, such as those of a `rename ddl`. This can lead to the following situation:

the first execution:

table A created in snapshot restore is renamed to table B in log restore
     table A (id 80)       -------------->        table B (id 80)
  ( snapshot restore )                            ( log restore )

the second execution, if snapshot restore is not skipped:

table A is created again in snapshot restore, because there is no table named A
     table A (id 81)       -------------->   [not in id-maps, so ignored]
  ( snapshot restore )                            ( log restore )

As a result, there is a duplicated table A in the cluster. Therefore, snapshot restore must be skipped when the progress is `InLogRestoreAndIdMapPersist`.

// Values of RestoreProgress, numbered from 0 by iota.
const (
	// InSnapshotRestore is the zero value (0).
	InSnapshotRestore RestoreProgress = iota
	// Only when the id-maps is persist, status turns into it.
	InLogRestoreAndIdMapPersist
)

type RestoreValueType

// RestoreValueType is the value type of snapshot restore checkpoints; it holds
// a single range key.
type RestoreValueType struct {
	// the file key of a range
	RangeKey string
}

func (RestoreValueType) IdentKey

func (rv RestoreValueType) IdentKey() []byte

type TimeTicker

// TimeTicker abstracts a ticker: a receive-only channel of time values plus a
// way to stop it.
type TimeTicker interface {
	// Ch returns the channel on which time values are received.
	Ch() <-chan time.Time
	// Stop stops the ticker.
	Stop()
}

type ValueType

// ValueType is the constraint for checkpoint values. RangeType,
// RestoreValueType, LogRestoreValueType and LogRestoreValueMarshaled all
// provide the required method.
type ValueType interface {
	// IdentKey returns a byte-slice key for the value.
	// NOTE(review): presumably used to identify/deduplicate values — confirm.
	IdentKey() []byte
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL