restore

package
v0.0.0-...-503c688 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 8, 2023 License: Apache-2.0, Apache-2.0 Imports: 93 Imported by: 0

Documentation

Overview

Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

Index

Constants

View Source
const (
	MetaKVBatchSize = 64 * 1024 * 1024
)
View Source
const SplitFilesBufferSize = 4096
View Source
const (
	// TruncateSafePointFileName is the filename that the ts(the log have been truncated) is saved into.
	TruncateSafePointFileName = "v1_stream_trancate_safepoint.txt"
)

Variables

This section is empty.

Functions

func ApplyKVFilesWithBatchMethod

func ApplyKVFilesWithBatchMethod(
	ctx context.Context,
	logIter LogIter,
	batchCount int,
	batchSize uint64,
	applyFunc func(files []*LogDataFileInfo, kvCount int64, size uint64),
	applyWg *sync.WaitGroup,
) error

func ApplyKVFilesWithSingelMethod

func ApplyKVFilesWithSingelMethod(
	ctx context.Context,
	files LogIter,
	applyFunc func(file []*LogDataFileInfo, kvCount int64, size uint64),
	applyWg *sync.WaitGroup,
) error

func CheckConsistencyAndValidPeer

func CheckConsistencyAndValidPeer(regionInfos []*RecoverRegionInfo) (map[uint64]struct{}, error)

func CheckKeyspaceBREnable

func CheckKeyspaceBREnable(ctx context.Context, pdClient pd.Client) error

func CheckNewCollationEnable

func CheckNewCollationEnable(
	backupNewCollationEnable string,
	g glue.Glue,
	storage kv.Storage,
	CheckRequirements bool,
) error

func CreateLogFileManager

func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*logFileManager, error)

CreateLogFileManager creates a log file manager using the specified config. Generally the config cannot be changed during its lifetime.

func DDLJobBlockListRule

func DDLJobBlockListRule(ddlJob *model.Job) bool

DDLJobBlockListRule is a rule that filters out DDL jobs whose type is in the block list.

func EstimateRangeSize

func EstimateRangeSize(files []*backuppb.File) int

EstimateRangeSize estimates the total range count by file.

func Exhaust

func Exhaust(ec <-chan error) []error

Exhaust drains all remaining errors in the channel into a slice of errors.

func FilterDDLJobByRules

func FilterDDLJobByRules(srcDDLJobs []*model.Job, rules ...DDLJobFilterRule) (dstDDLJobs []*model.Job)

FilterDDLJobByRules filters out every job in srcDDLJobs for which at least one of the rules returns true.

func FilterDDLJobs

func FilterDDLJobs(allDDLJobs []*model.Job, tables []*metautil.Table) (ddlJobs []*model.Job)

FilterDDLJobs filters ddl jobs.

func GetExistedUserDBs

func GetExistedUserDBs(dom *domain.Domain) []*model.DBInfo

GetExistedUserDBs gets the databases created or modified by users.

func GetKeyTS

func GetKeyTS(key []byte) (uint64, error)

func GetRewriteEncodedKeys

func GetRewriteEncodedKeys(file AppliedFile, rewriteRules *RewriteRules) (startKey, endKey []byte, err error)

GetRewriteEncodedKeys rewrites the start and end keys of the file into encoded keys according to the rewrite rules.

func GetRewriteRawKeys

func GetRewriteRawKeys(file AppliedFile, rewriteRules *RewriteRules) (startKey, endKey []byte, err error)

GetRewriteRawKeys rewrites the start and end keys of the file into raw keys according to the rewrite rules.

func GetRewriteRulesMap

func GetRewriteRulesMap(
	newTable, oldTable *model.TableInfo, newTimeStamp uint64, getDetailRule bool,
) map[int64]*RewriteRules

func GetRewriteTableID

func GetRewriteTableID(tableID int64, rewriteRules *RewriteRules) int64

GetRewriteTableID gets the rewritten table ID from the rewrite rules and the original table ID.

func GetSSTMetaFromFile

func GetSSTMetaFromFile(
	id []byte,
	file *backuppb.File,
	region *metapb.Region,
	regionRule *import_sstpb.RewriteRule,
	rewriteMode RewriteMode,
) (meta *import_sstpb.SSTMeta, err error)

GetSSTMetaFromFile compares the keys in the file, the region, and the rewrite rules, then returns an SST meta. The range of the returned SST meta is [regionRule.NewKeyPrefix, append(regionRule.NewKeyPrefix, 0xff)].

func GetTSFromFile

func GetTSFromFile(
	ctx context.Context,
	s storage.ExternalStorage,
	filename string,
) (uint64, error)

GetTSFromFile gets the current truncate safepoint. The truncate safepoint is the TS used for the last truncation, which means logs before this TS are probably deleted or incomplete.

func GoValidateFileRanges

func GoValidateFileRanges(
	ctx context.Context,
	tableStream <-chan CreatedTable,
	fileOfTable map[int64][]*backuppb.File,
	splitSizeBytes, splitKeyCount uint64,
	errCh chan<- error,
) <-chan TableWithRange

GoValidateFileRanges validates files from a stream of tables and yields tables with their ranges.

func MapTableToFiles

func MapTableToFiles(files []*backuppb.File) map[int64][]*backuppb.File

MapTableToFiles builds a map from each table ID to its backup files. Be aware that a file can hold one and only one table.

func MockCallSetSpeedLimit

func MockCallSetSpeedLimit(ctx context.Context, fakeImportClient ImporterClient, rc *Client, concurrency uint) error

MockCallSetSpeedLimit mocks a call to the setSpeedLimit function.

func NeedSplit

func NeedSplit(splitKey []byte, regions []*split.RegionInfo, isRawKv bool) *split.RegionInfo

NeedSplit checks whether a key is necessary to split, if true returns the split region.

func NeedsMerge

func NeedsMerge(left, right *rtree.Range, splitSizeBytes, splitKeyCount uint64) bool

NeedsMerge checks whether two ranges need to be merged.

func NewSplitHelperIteratorForTest

func NewSplitHelperIteratorForTest(helper *split.SplitHelper, tableID int64, rule *RewriteRules) *splitHelperIterator

func ParseQuoteName

func ParseQuoteName(name string) (db, table string)

ParseQuoteName parses a quoted `db`.`table` name and splits it into the database name and the table name.

func PrefixEndKey

func PrefixEndKey(key []byte) []byte

func PrefixStartKey

func PrefixStartKey(key []byte) []byte

func RecoverData

func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error)

RecoverData recovers the TiKV cluster in the following steps:
1. Read all metadata from the TiKV stores.
2. Make a recovery plan, then recover the max allocated ID first.
3. Send the recovery plan and wait for TiKV to apply it. During wait-apply, every assigned region leader checks that its log has been applied up to the last log entry.
4. Ensure that all regions have applied their logs up to the last entry.
5. Prepare the flashback.
6. Flash back to resolveTS.

func ReplaceMetadata

func ReplaceMetadata(meta *backuppb.Metadata, filegroups []*backuppb.DataFileGroup)

ReplaceMetadata replaces the file groups and updates the TS of the replaced metadata.

func SetTSToFile

func SetTSToFile(
	ctx context.Context,
	s storage.ExternalStorage,
	safepoint uint64,
	filename string,
) error

SetTSToFile overrides the current truncate safepoint. The truncate safepoint is the TS used for the last truncation, which means logs before this TS are probably deleted or incomplete.

func SortMetaKVFiles

func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo

func SortRanges

func SortRanges(ranges []rtree.Range, rewriteRules *RewriteRules) ([]rtree.Range, error)

SortRanges checks whether the ranges overlap and sorts them.

func SplitPoint

func SplitPoint(
	ctx context.Context,
	iter *splitHelperIterator,
	client split.SplitClient,
	splitF splitFunc,
) (err error)

SplitPoint selects the ranges that overlap each region and calls `splitF` to split the region.

func SplitRanges

func SplitRanges(
	ctx context.Context,
	client *Client,
	ranges []rtree.Range,
	rewriteRules *RewriteRules,
	updateCh glue.Progress,
	isRawKv bool,
) error

SplitRanges splits region by 1. data range after rewrite. 2. rewrite rules.

func TruncateTS

func TruncateTS(key []byte) []byte

func UpdateShiftTS

func UpdateShiftTS(m *backuppb.Metadata, startTS uint64, restoreTS uint64) (uint64, bool)

func ValidateFileRewriteRule

func ValidateFileRewriteRule(file *backuppb.File, rewriteRules *RewriteRules) error

ValidateFileRewriteRule uses rewrite rules to validate the ranges of a file.

func ZapTables

func ZapTables(tables []CreatedTable) zapcore.Field

ZapTables makes a zap field of tables for debugging, including the table names.

Types

type AppliedFile

type AppliedFile interface {
	GetStartKey() []byte
	GetEndKey() []byte
}

AppliedFile currently has two types: 1. SST files, used by full backup/restore; 2. KV files, used by PITR restore.

type BatchSender

type BatchSender interface {
	// PutSink sets the sink of this sender, user to this interface promise
	// call this function at least once before first call to `RestoreBatch`.
	PutSink(sink TableSink)
	// RestoreBatch will send the restore request.
	RestoreBatch(ranges DrainResult)
	Close()
}

BatchSender is the abstraction of how the batcher sends a batch.

func NewTiKVSender

func NewTiKVSender(
	ctx context.Context,
	cli TiKVRestorer,
	updateCh glue.Progress,
	splitConcurrency uint,
) (BatchSender, error)

NewTiKVSender makes a sender that sends restore requests to TiKV.

type Batcher

type Batcher struct {
	// contains filtered or unexported fields
}

Batcher collects ranges to restore and sends batched split/ingest requests.

func NewBatcher

func NewBatcher(
	ctx context.Context,
	sender BatchSender,
	manager ContextManager,
	errCh chan<- error,
	updateCh glue.Progress,
) (*Batcher, <-chan CreatedTable)

NewBatcher creates a new batcher from a sender and a context manager. The sender defines how to 'restore' a batch (i.e. send it, or 'push down' the task somewhere). The context manager defines the 'lifetime' of restoring tables (i.e. how to enter 'restore' mode, and how to exit it). The batcher works in the background, sending a batch every second or whenever the batch size reaches the limit. It emits fully restored tables to the returned output channel.

func (*Batcher) Add

func (b *Batcher) Add(tbs TableWithRange)

Add adds a task to the Batcher.

func (*Batcher) Close

func (b *Batcher) Close()

Close closes the batcher, sending all pending requests, and closes updateCh.

func (*Batcher) DisableAutoCommit

func (b *Batcher) DisableAutoCommit()

DisableAutoCommit blocks the current goroutine until the worker can gracefully stop, and then disable auto commit.

func (*Batcher) EnableAutoCommit

func (b *Batcher) EnableAutoCommit(ctx context.Context, delay time.Duration)

EnableAutoCommit makes the batcher commit batches periodically, even if the batch size isn't big enough. This is a separate function so that auto commit can stay disabled in some cases.

func (*Batcher) Len

func (b *Batcher) Len() int

Len calculates the current size of this batcher.

func (*Batcher) Send

func (b *Batcher) Send(ctx context.Context)

Send sends all pending requests in the batcher. returns tables sent FULLY in the current batch.

func (*Batcher) SetCheckpoint

func (b *Batcher) SetCheckpoint(sets map[int64]map[string]struct{})

func (*Batcher) SetThreshold

func (b *Batcher) SetThreshold(newThreshold int)

SetThreshold sets the batch-size threshold at which a batch is sent. Note that this function isn't goroutine-safe yet. Please set the threshold before anything starts (e.g. before EnableAutoCommit).

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client sends requests to restore files.

func MockClient

func MockClient(dbs map[string]*utils.Database) *Client

MockClient creates a fake client for testing.

func NewRestoreClient

func NewRestoreClient(
	pdClient pd.Client,
	tlsConf *tls.Config,
	keepaliveConf keepalive.ClientParameters,
	isRawKv bool,
) *Client

NewRestoreClient returns a new RestoreClient.

func (*Client) CheckSysTableCompatibility

func (rc *Client) CheckSysTableCompatibility(dom *domain.Domain, tables []*metautil.Table) error

func (*Client) CheckTargetClusterFresh

func (rc *Client) CheckTargetClusterFresh(ctx context.Context) error

CheckTargetClusterFresh checks whether the target cluster is fresh. If there are no user databases or tables, the cluster is treated as fresh, even though the user may have created some users or made other changes.

func (*Client) CleanUpKVFiles

func (rc *Client) CleanUpKVFiles(
	ctx context.Context,
) error

func (*Client) ClearSystemUsers

func (rc *Client) ClearSystemUsers(ctx context.Context, resetUsers []string) error

ClearSystemUsers is used for volume-snapshot restoration. Restoring users is not supported in some scenarios (for example, in the cloud), so this function should be used to drop the cloud_admin user after a volume-snapshot restore.

func (*Client) Close

func (rc *Client) Close()

Close closes the client.

func (*Client) CreateDatabase

func (rc *Client) CreateDatabase(ctx context.Context, db *model.DBInfo) error

CreateDatabase creates a database.

func (*Client) CreatePolicies

func (rc *Client) CreatePolicies(ctx context.Context, policyMap *sync.Map) error

CreatePolicies creates all policies in full restore.

func (*Client) CreateTables

func (rc *Client) CreateTables(
	dom *domain.Domain,
	tables []*metautil.Table,
	newTS uint64,
) (*RewriteRules, []*model.TableInfo, error)

CreateTables creates multiple tables, and returns their rewrite rules.

func (*Client) EnableOnline

func (rc *Client) EnableOnline()

EnableOnline sets the mode of restore to online.

func (*Client) EnableSkipCreateSQL

func (rc *Client) EnableSkipCreateSQL()

EnableSkipCreateSQL sets switch of skip create schema and tables.

func (*Client) ExecDDLs

func (rc *Client) ExecDDLs(ctx context.Context, ddlJobs []*model.Job) error

ExecDDLs executes the queries of the ddl jobs.

func (*Client) FailpointDoChecksumForLogRestore

func (rc *Client) FailpointDoChecksumForLogRestore(
	ctx context.Context,
	kvClient kv.Client,
	pdClient pd.Client,
	idrules map[int64]int64,
	rewriteRules map[int64]*RewriteRules,
) (finalErr error)

FailpointDoChecksumForLogRestore is called by a failpoint and is only used for testing. It prints the checksum result into the log, and the auto-test script records the result to compare it against another cluster's checksum.

func (Client) FilterDataFiles

func (rc Client) FilterDataFiles(ms MetaIter) LogIter

func (Client) FilterMetaFiles

func (rc Client) FilterMetaFiles(ms MetaIter) MetaGroupIter

func (*Client) FixIndex

func (rc *Client) FixIndex(ctx context.Context, schema, table, index string) error

FixIndex tries to fix a single index.

func (*Client) FixIndicesOfTable

func (rc *Client) FixIndicesOfTable(ctx context.Context, schema string, table *model.TableInfo) error

FixIndicesOfTable tries to fix the indices of the table via `ADMIN RECOVERY INDEX`.

func (*Client) FixIndicesOfTables

func (rc *Client) FixIndicesOfTables(
	ctx context.Context,
	fullBackupTables map[int64]*metautil.Table,
	onProgress func(),
) error

FixIndicesOfTables tries to fix the indices of the tables via `ADMIN RECOVERY INDEX`.

func (*Client) GenGlobalID

func (rc *Client) GenGlobalID(ctx context.Context) (int64, error)

GenGlobalID generates a global ID in a transaction.

func (*Client) GenGlobalIDs

func (rc *Client) GenGlobalIDs(ctx context.Context, n int) ([]int64, error)

GenGlobalIDs generates several global IDs in a transaction.

func (*Client) GenerateRebasedTables

func (rc *Client) GenerateRebasedTables(tables []*metautil.Table)

GenerateRebasedTables generates a map[UniqueTableName]bool that represents tables whose table info hasn't been updated. Such tables fall into two groups:
1. Tables that already exist in the restored cluster.
2. Tables that are created by executing DDL jobs.

Therefore, only tables in an incremental restoration are added to the map.

func (*Client) GetBatchDdlSize

func (rc *Client) GetBatchDdlSize() uint

func (*Client) GetClusterID

func (rc *Client) GetClusterID(ctx context.Context) uint64

GetClusterID gets the cluster ID from the downstream cluster.

func (*Client) GetDBSchema

func (rc *Client) GetDBSchema(dom *domain.Domain, dbName model.CIStr) (*model.DBInfo, bool)

GetDBSchema gets the schema of a database from the TiDB cluster.

func (*Client) GetDDLJobs

func (rc *Client) GetDDLJobs() []*model.Job

GetDDLJobs returns ddl jobs.

func (*Client) GetDatabase

func (rc *Client) GetDatabase(name string) *utils.Database

GetDatabase returns a database by name.

func (*Client) GetDatabases

func (rc *Client) GetDatabases() []*utils.Database

GetDatabases returns all databases.

func (*Client) GetDomain

func (rc *Client) GetDomain() *domain.Domain

func (*Client) GetFilesInRawRange

func (rc *Client) GetFilesInRawRange(startKey []byte, endKey []byte, cf string) ([]*backuppb.File, error)

GetFilesInRawRange gets all files that are in the given range or intersect with it.

func (*Client) GetGCRows

func (rc *Client) GetGCRows() []string

GetGCRows is only used for unit tests.

func (*Client) GetPDClient

func (rc *Client) GetPDClient() pd.Client

GetPDClient returns a pd client.

func (*Client) GetPlacementPolicies

func (rc *Client) GetPlacementPolicies() (*sync.Map, error)

GetPlacementPolicies returns policies.

func (*Client) GetPlacementRules

func (rc *Client) GetPlacementRules(ctx context.Context, pdAddrs []string) ([]pdtypes.Rule, error)

GetPlacementRules returns the current placement rules.

func (*Client) GetPolicyMap

func (rc *Client) GetPolicyMap() *sync.Map

GetPolicyMap returns policyMap.

func (*Client) GetRebasedTables

func (rc *Client) GetRebasedTables() map[UniqueTableName]bool

GetRebasedTables returns the tables whose auto-increment ID or auto-random ID may need to be rebased.

func (*Client) GetRewriteMode

func (rc *Client) GetRewriteMode() RewriteMode

func (*Client) GetSupportPolicy

func (rc *Client) GetSupportPolicy() bool

GetSupportPolicy tells whether the target TiDB supports placement policies.

func (*Client) GetTLSConfig

func (rc *Client) GetTLSConfig() *tls.Config

GetTLSConfig returns the tls config.

func (*Client) GetTS

func (rc *Client) GetTS(ctx context.Context) (uint64, error)

GetTS gets a new timestamp from PD.

func (*Client) GetTSWithRetry

func (rc *Client) GetTSWithRetry(ctx context.Context) (uint64, error)

GetTSWithRetry gets a new timestamp with retry from PD.

func (*Client) GetTableSchema

func (rc *Client) GetTableSchema(
	dom *domain.Domain,
	dbName model.CIStr,
	tableName model.CIStr,
) (*model.TableInfo, error)

GetTableSchema returns the schema of a table from TiDB.

func (*Client) GoCreateTables

func (rc *Client) GoCreateTables(
	ctx context.Context,
	dom *domain.Domain,
	tables []*metautil.Table,
	newTS uint64,
	errCh chan<- error,
) <-chan CreatedTable

GoCreateTables creates tables and generates their information. It uses as many workers as sessionPool has sessions. Leave sessionPool nil to send the DDLs sequentially.

func (*Client) GoValidateChecksum

func (rc *Client) GoValidateChecksum(
	ctx context.Context,
	tableStream <-chan CreatedTable,
	kvClient kv.Client,
	errCh chan<- error,
	updateCh glue.Progress,
	concurrency uint,
) <-chan struct{}

GoValidateChecksum forks a goroutine to validate checksums after restore. It returns a channel that fires a struct{} when everything is done.

func (*Client) HasBackedUpSysDB

func (rc *Client) HasBackedUpSysDB() bool

HasBackedUpSysDB reports whether system tables have been backed up. BR has backed up system tables since 5.1.0.

func (*Client) Init

func (rc *Client) Init(g glue.Glue, store kv.Storage) error

Init creates the DB connection and the domain for the storage.

func (*Client) InitBackupMeta

func (rc *Client) InitBackupMeta(
	c context.Context,
	backupMeta *backuppb.BackupMeta,
	backend *backuppb.StorageBackend,
	reader *metautil.MetaReader) error

InitBackupMeta loads schemas from BackupMeta to initialize RestoreClient.

func (*Client) InitCheckpoint

func (rc *Client) InitCheckpoint(
	ctx context.Context,
	s storage.ExternalStorage,
	taskName string,
	config *pdutil.ClusterConfig,
	useCheckpoint bool,
) (map[int64]map[string]struct{}, *pdutil.ClusterConfig, error)

InitCheckpoint initializes the checkpoint status for the cluster. If the cluster is being restored for the first time, it initializes the checkpoint metadata. Otherwise, it loads the checkpoint metadata and the checkpoint ranges/checksums from the external storage.

func (*Client) InitCheckpointMetadataForLogRestore

func (rc *Client) InitCheckpointMetadataForLogRestore(ctx context.Context, taskName string, gcRatio string) (string, error)

func (*Client) InitClients

func (rc *Client) InitClients(backend *backuppb.StorageBackend, isRawKvMode bool, isTxnKvMode bool)

func (*Client) InitFullClusterRestore

func (rc *Client) InitFullClusterRestore(explicitFilter bool)

InitFullClusterRestore initializes fullClusterRestore and sets SkipGrantTable as needed.

func (*Client) InitSchemasReplaceForDDL

func (rc *Client) InitSchemasReplaceForDDL(
	ctx context.Context,
	cfg *InitSchemaConfig,
) (*stream.SchemasReplace, error)

InitSchemasReplaceForDDL gets the schema information that maps old schemas to new schemas. It is used to rewrite meta KV events.

func (*Client) InsertDeleteRangeForIndex

func (rc *Client) InsertDeleteRangeForIndex(jobID int64, elementID *int64, tableID int64, indexIDs []int64)

InsertDeleteRangeForIndex generates query to insert index delete job into table `gc_delete_range`.

func (*Client) InsertDeleteRangeForTable

func (rc *Client) InsertDeleteRangeForTable(jobID int64, tableIDs []int64)

InsertDeleteRangeForTable generates query to insert table delete job into table `gc_delete_range`.

func (*Client) InsertGCRows

func (rc *Client) InsertGCRows(ctx context.Context) error

InsertGCRows inserts the queries into table `gc_delete_range`.

func (*Client) InstallLogFileManager

func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64) error

func (*Client) IsFull

func (rc *Client) IsFull() bool

IsFull returns whether this backup is full.

func (*Client) IsFullClusterRestore

func (rc *Client) IsFullClusterRestore() bool

func (*Client) IsIncremental

func (rc *Client) IsIncremental() bool

IsIncremental returns whether this backup is incremental.

func (*Client) IsOnline

func (rc *Client) IsOnline() bool

IsOnline tells whether it's an online restore.

func (*Client) IsRawKvMode

func (rc *Client) IsRawKvMode() bool

IsRawKvMode checks whether the backup data is in raw kv format, in which case transactional recover is forbidden.

func (*Client) IsSkipCreateSQL

func (rc *Client) IsSkipCreateSQL() bool

IsSkipCreateSQL returns whether creating schemas and tables should be skipped during restore.

func (Client) LoadDDLFilesAndCountDMLFiles

func (rc Client) LoadDDLFilesAndCountDMLFiles(ctx context.Context, counter *int) ([]Log, error)

LoadDDLFilesAndCountDMLFiles loads all DDL files that need to be restored in the restoration. At the same time, if `counter` isn't nil, it counts the DML files that need to be restored into `counter`. This function returns all the DDL files directly because they all need to be sorted.

func (Client) LoadDMLFiles

func (rc Client) LoadDMLFiles(ctx context.Context) (LogIter, error)

LoadDMLFiles loads all DML files that need to be restored in the restoration. It returns a stream, because there are usually many DML files to restore.

func (*Client) LoadRestoreStores

func (rc *Client) LoadRestoreStores(ctx context.Context) error

LoadRestoreStores loads the stores used to restore data.

func (*Client) PreCheckTableClusterIndex

func (rc *Client) PreCheckTableClusterIndex(
	tables []*metautil.Table,
	ddlJobs []*model.Job,
	dom *domain.Domain,
) error

PreCheckTableClusterIndex checks whether the backup tables and the existing tables have different cluster index options.

func (*Client) PreCheckTableTiFlashReplica

func (rc *Client) PreCheckTableTiFlashReplica(
	ctx context.Context,
	tables []*metautil.Table,
	recorder *tiflashrec.TiFlashRecorder,
) error

PreCheckTableTiFlashReplica checks whether the number of TiFlash replicas is less than the number of TiFlash nodes.

func (*Client) PreConstructAndSaveIDMap

func (rc *Client) PreConstructAndSaveIDMap(
	ctx context.Context,
	fsInWriteCF, fsInDefaultCF []*backuppb.DataFileInfo,
	sr *stream.SchemasReplace,
) error

PreConstructAndSaveIDMap constructs the ID mapping and saves it.

func (*Client) RangeFilterFromIngestRecorder

func (rc *Client) RangeFilterFromIngestRecorder(recorder *ingestrec.IngestRecorder, rewriteRules map[int64]*RewriteRules) error

RangeFilterFromIngestRecorder rewrites the table ID of the items in the ingestRecorder.

TODO: implement the range filter-out feature.

func (Client) ReadAllEntries

func (rc Client) ReadAllEntries(
	ctx context.Context,
	file Log,
	filterTS uint64,
) ([]*KvEntryWithTS, []*KvEntryWithTS, error)

ReadAllEntries loads the content of a log file, filtering out unneeded entries.

func (*Client) RepairIngestIndex

func (rc *Client) RepairIngestIndex(ctx context.Context, ingestRecorder *ingestrec.IngestRecorder, g glue.Glue, storage kv.Storage, taskName string) error

RepairIngestIndex drops the indexes recorded in the IngestRecorder and re-adds them.

func (*Client) ResetPlacementRules

func (rc *Client) ResetPlacementRules(ctx context.Context, tables []*model.TableInfo) error

ResetPlacementRules removes placement rules for tables.

func (*Client) ResetRestoreLabels

func (rc *Client) ResetRestoreLabels(ctx context.Context) error

ResetRestoreLabels removes the exclusive labels of the restore stores.

func (*Client) ResetSpeedLimit

func (rc *Client) ResetSpeedLimit(ctx context.Context) error

func (*Client) ResetTS

func (rc *Client) ResetTS(ctx context.Context, pdCtrl *pdutil.PdController) error

ResetTS resets the timestamp of PD to a bigger value.

func (*Client) ResetTiFlashReplicas

func (rc *Client) ResetTiFlashReplicas(ctx context.Context, g glue.Glue, storage kv.Storage) error

func (*Client) RestoreBatchMetaKVFiles

func (rc *Client) RestoreBatchMetaKVFiles(
	ctx context.Context,
	files []*backuppb.DataFileInfo,
	schemasReplace *stream.SchemasReplace,
	kvEntries []*KvEntryWithTS,
	filterTS uint64,
	updateStats func(kvCount uint64, size uint64),
	progressInc func(),
	cf string,
) ([]*KvEntryWithTS, error)

func (*Client) RestoreKVFiles

func (rc *Client) RestoreKVFiles(
	ctx context.Context,
	rules map[int64]*RewriteRules,
	idrules map[int64]int64,
	logIter LogIter,
	runner *checkpoint.CheckpointRunner[checkpoint.LogRestoreKeyType, checkpoint.LogRestoreValueType],
	pitrBatchCount uint32,
	pitrBatchSize uint32,
	updateStats func(kvCount uint64, size uint64),
	onProgress func(cnt int64),
) error

func (*Client) RestoreMetaKVFiles

func (rc *Client) RestoreMetaKVFiles(
	ctx context.Context,
	files []*backuppb.DataFileInfo,
	schemasReplace *stream.SchemasReplace,
	updateStats func(kvCount uint64, size uint64),
	progressInc func(),
) error

RestoreMetaKVFiles tries to restore files about meta kv-event from stream-backup.

func (*Client) RestoreMetaKVFilesWithBatchMethod

func (rc *Client) RestoreMetaKVFilesWithBatchMethod(
	ctx context.Context,
	defaultFiles []*backuppb.DataFileInfo,
	writeFiles []*backuppb.DataFileInfo,
	schemasReplace *stream.SchemasReplace,
	updateStats func(kvCount uint64, size uint64),
	progressInc func(),
	restoreBatch func(
		ctx context.Context,
		files []*backuppb.DataFileInfo,
		schemasReplace *stream.SchemasReplace,
		kvEntries []*KvEntryWithTS,
		filterTS uint64,
		updateStats func(kvCount uint64, size uint64),
		progressInc func(),
		cf string,
	) ([]*KvEntryWithTS, error),
) error

func (*Client) RestoreRaw

func (rc *Client) RestoreRaw(
	ctx context.Context, startKey []byte, endKey []byte, files []*backuppb.File, updateCh glue.Progress,
) error

RestoreRaw tries to restore raw keys in the specified range.

func (*Client) RestoreSSTFiles

func (rc *Client) RestoreSSTFiles(
	ctx context.Context,
	tableIDWithFiles []TableIDWithFiles,
	rewriteRules *RewriteRules,
	updateCh glue.Progress,
) (err error)

RestoreSSTFiles tries to restore the files.

func (*Client) RestoreSystemSchemas

func (rc *Client) RestoreSystemSchemas(ctx context.Context, f filter.Filter)

RestoreSystemSchemas restores the system schema(i.e. the `mysql` schema). Detail see https://github.com/pingcap/br/issues/679#issuecomment-762592254.

func (*Client) RunGCRowsLoader

func (rc *Client) RunGCRowsLoader(ctx context.Context)

RunGCRowsLoader uses a channel to collect the delete-range queries so that saving them is thread-safe.

func (*Client) SaveIDMap

func (rc *Client) SaveIDMap(
	ctx context.Context,
	sr *stream.SchemasReplace,
) error

SaveIDMap saves the id mapping information.

func (*Client) SetBatchDdlSize

func (rc *Client) SetBatchDdlSize(batchDdlsize uint)

func (*Client) SetConcurrency

func (rc *Client) SetConcurrency(c uint)

SetConcurrency sets the concurrency of dbs tables files.

func (*Client) SetCrypter

func (rc *Client) SetCrypter(crypter *backuppb.CipherInfo)

func (*Client) SetCurrentTS

func (rc *Client) SetCurrentTS(ts uint64)

func (*Client) SetPlacementPolicyMode

func (rc *Client) SetPlacementPolicyMode(withPlacementPolicy string)

SetPlacementPolicyMode sets the placement policy mode.

func (*Client) SetPolicyMap

func (rc *Client) SetPolicyMap(p *sync.Map)

SetPolicyMap sets policyMap.

func (*Client) SetRateLimit

func (rc *Client) SetRateLimit(rateLimit uint64)

SetRateLimit sets rateLimit.

func (*Client) SetRawKVClient

func (rc *Client) SetRawKVClient(c *RawKVBatchClient)

func (*Client) SetRewriteMode

func (rc *Client) SetRewriteMode(mode RewriteMode)

func (*Client) SetStorage

func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error

func (*Client) SetSwitchModeInterval

func (rc *Client) SetSwitchModeInterval(interval time.Duration)

SetSwitchModeInterval sets the switch-mode interval for the client.

func (*Client) SetWithSysTable

func (rc *Client) SetWithSysTable(withSysTable bool)

func (*Client) SetupPlacementRules

func (rc *Client) SetupPlacementRules(ctx context.Context, tables []*model.TableInfo) error

SetupPlacementRules sets rules for the tables' regions.

func (Client) ShiftTS

func (rc Client) ShiftTS() uint64

func (Client) ShouldFilterOut

func (rc Client) ShouldFilterOut(d *backuppb.DataFileInfo) bool

ShouldFilterOut checks whether a file should be filtered out via the current client.

func (*Client) SplitRanges

func (rc *Client) SplitRanges(ctx context.Context,
	ranges []rtree.Range,
	rewriteRules *RewriteRules,
	updateCh glue.Progress,
	isRawKv bool) error

SplitRanges implements TiKVRestorer.

func (*Client) SwitchToImportMode

func (rc *Client) SwitchToImportMode(ctx context.Context)

SwitchToImportMode switches the TiKV cluster to import mode.

func (*Client) SwitchToNormalMode

func (rc *Client) SwitchToNormalMode(ctx context.Context) error

SwitchToNormalMode switches the TiKV cluster to normal mode.

func (*Client) UpdateSchemaVersion

func (rc *Client) UpdateSchemaVersion(ctx context.Context) error

UpdateSchemaVersion updates the schema version in a transaction.

func (*Client) WaitForFilesRestored

func (rc *Client) WaitForFilesRestored(ctx context.Context, files []*backuppb.File, updateCh glue.Progress) error

func (*Client) WaitForFinishCheckpoint

func (rc *Client) WaitForFinishCheckpoint(ctx context.Context, flush bool)

func (*Client) WaitPlacementSchedule

func (rc *Client) WaitPlacementSchedule(ctx context.Context, tables []*model.TableInfo) error

WaitPlacementSchedule waits for PD to move the tables to the restore stores.

func (*Client) WrapLogFilesIterWithCheckpoint

func (rc *Client) WrapLogFilesIterWithCheckpoint(
	ctx context.Context,
	logIter LogIter,
	downstreamIdset map[int64]struct{},
	taskName string,
	updateStats func(kvCount, size uint64),
	onProgress func(),
) (LogIter, error)

func (*Client) WrapLogFilesIterWithSplitHelper

func (rc *Client) WrapLogFilesIterWithSplitHelper(logIter LogIter, rules map[int64]*RewriteRules, g glue.Glue, store kv.Storage) (LogIter, error)

type Comparator

type Comparator interface {
	Compare(src, dst []byte) bool
}

Comparator is used to compare the relationship between src and dst.

func NewStartWithComparator

func NewStartWithComparator() Comparator

NewStartWithComparator creates a comparator that checks whether src starts with dst.

type ContextManager

type ContextManager interface {
	// Enter make some tables 'enter' this context(a.k.a., prepare for restore).
	Enter(ctx context.Context, tables []CreatedTable) error
	// Leave make some tables 'leave' this context(a.k.a., restore is done, do some post-works).
	Leave(ctx context.Context, tables []CreatedTable) error
	// Close closes the context manager, sometimes when the manager is 'killed' and should do some cleanup
	// it would be call.
	Close(ctx context.Context)
}

ContextManager is the interface for managing a TiKV 'context' for restore. The batcher calls Enter when any table should be restored in a batch, so you can do some preparation work there (e.g. set placement rules for online restore).

func NewBRContextManager

func NewBRContextManager(client *Client) ContextManager

NewBRContextManager makes a BR context manager, that is, a manager that sets placement rules for online restore on Enter (see <splitPrepareWork>) and unsets them on Leave.

type CreatedTable

type CreatedTable struct {
	RewriteRule *RewriteRules
	Table       *model.TableInfo
	OldTable    *metautil.Table
}

CreatedTable is a table created on restore process, but not yet filled with data.

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB is a TiDB instance, not thread-safe.

func NewDB

func NewDB(g glue.Glue, store kv.Storage, policyMode string) (*DB, bool, error)

NewDB returns a new DB.

func (*DB) Close

func (db *DB) Close()

Close closes the connection.

func (*DB) CreateDatabase

func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error

CreateDatabase executes a CREATE DATABASE SQL.

func (*DB) CreatePlacementPolicy

func (db *DB) CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error

CreatePlacementPolicy checks whether the cluster supports placement policies and creates the policy.

func (*DB) CreateTable

func (db *DB) CreateTable(ctx context.Context, table *metautil.Table,
	ddlTables map[UniqueTableName]bool, supportPolicy bool, policyMap *sync.Map) error

CreateTable executes a CREATE TABLE SQL.

func (*DB) CreateTablePostRestore

func (db *DB) CreateTablePostRestore(ctx context.Context, table *metautil.Table, toBeCorrectedTables map[UniqueTableName]bool) error

func (*DB) CreateTables

func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table,
	ddlTables map[UniqueTableName]bool, supportPolicy bool, policyMap *sync.Map) error

CreateTables executes an internal CREATE TABLES.

func (*DB) ExecDDL

func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error

ExecDDL executes the query of a ddl job.

func (*DB) UpdateStatsMeta

func (db *DB) UpdateStatsMeta(ctx context.Context, tableID int64, restoreTS uint64, count uint64) error

UpdateStatsMeta updates the count and snapshot ts in mysql.stats_meta.

type DDLJobFilterRule

type DDLJobFilterRule func(ddlJob *model.Job) bool

type DDLMetaGroup

type DDLMetaGroup struct {
	Path      string
	FileMetas []*backuppb.DataFileInfo
}

type DrainResult

type DrainResult struct {
	// TablesToSend are tables that would be send at this batch.
	TablesToSend []CreatedTable
	// BlankTablesAfterSend are tables that will be full-restored after this batch send.
	BlankTablesAfterSend []CreatedTable
	RewriteRules         *RewriteRules
	Ranges               []rtree.Range
	// Record which part of ranges belongs to the table
	TableEndOffsetInRanges []int
}

DrainResult is the collection of some ranges and their metadata.

func (DrainResult) Files

func (result DrainResult) Files() []TableIDWithFiles

Files returns all files of this drain result.

type FileGroupInfo

type FileGroupInfo struct {
	MaxTS   uint64
	Length  uint64
	KVCount int64
}

FileGroupInfo keeps this meta-information for statistics and filtering.

type FileImporter

type FileImporter struct {
	// contains filtered or unexported fields
}

FileImporter is used to import files to TiKV.

func NewFileImporter

func NewFileImporter(
	metaClient split.SplitClient,
	importClient ImporterClient,
	backend *backuppb.StorageBackend,
	isRawKvMode bool,
	isTxnKvMode bool,
	rewriteMode RewriteMode,
) FileImporter

NewFileImporter returns a new FileImporter.

func (*FileImporter) CheckMultiIngestSupport

func (importer *FileImporter) CheckMultiIngestSupport(ctx context.Context, pdClient pd.Client) error

CheckMultiIngestSupport checks whether all stores support multi-ingest.

func (*FileImporter) ClearFiles

func (importer *FileImporter) ClearFiles(ctx context.Context, pdClient pd.Client, prefix string) error

func (*FileImporter) ImportKVFileForRegion

func (importer *FileImporter) ImportKVFileForRegion(
	ctx context.Context,
	files []*LogDataFileInfo,
	rule *RewriteRules,
	shiftStartTS uint64,
	startTS uint64,
	restoreTS uint64,
	info *split.RegionInfo,
	supportBatch bool,
) RPCResult

ImportKVFileForRegion tries to import the given files for a single region.

func (*FileImporter) ImportKVFiles

func (importer *FileImporter) ImportKVFiles(
	ctx context.Context,
	files []*LogDataFileInfo,
	rule *RewriteRules,
	shiftStartTS uint64,
	startTS uint64,
	restoreTS uint64,
	supportBatch bool,
) error

ImportKVFiles restores the kv events.

func (*FileImporter) ImportSSTFiles

func (importer *FileImporter) ImportSSTFiles(
	ctx context.Context,
	files []*backuppb.File,
	rewriteRules *RewriteRules,
	cipher *backuppb.CipherInfo,
	apiVersion kvrpcpb.APIVersion,
) error

ImportSSTFiles tries to import the given files. All rules must contain encoded keys.

func (*FileImporter) SetRawRange

func (importer *FileImporter) SetRawRange(startKey, endKey []byte) error

SetRawRange sets the range to be restored in raw kv mode.

type FilesInRegion

type FilesInRegion struct {
	// contains filtered or unexported fields
}

type FilesInTable

type FilesInTable struct {
	// contains filtered or unexported fields
}

type FullBackupStorageConfig

type FullBackupStorageConfig struct {
	Backend *backuppb.StorageBackend
	Opts    *storage.ExternalStorageOptions
}

type ImporterClient

type ImporterClient interface {
	ClearFiles(
		ctx context.Context,
		storeID uint64,
		req *import_sstpb.ClearRequest,
	) (*import_sstpb.ClearResponse, error)

	ApplyKVFile(
		ctx context.Context,
		storeID uint64,
		req *import_sstpb.ApplyRequest,
	) (*import_sstpb.ApplyResponse, error)

	DownloadSST(
		ctx context.Context,
		storeID uint64,
		req *import_sstpb.DownloadRequest,
	) (*import_sstpb.DownloadResponse, error)

	IngestSST(
		ctx context.Context,
		storeID uint64,
		req *import_sstpb.IngestRequest,
	) (*import_sstpb.IngestResponse, error)
	MultiIngest(
		ctx context.Context,
		storeID uint64,
		req *import_sstpb.MultiIngestRequest,
	) (*import_sstpb.IngestResponse, error)

	SetDownloadSpeedLimit(
		ctx context.Context,
		storeID uint64,
		req *import_sstpb.SetDownloadSpeedLimitRequest,
	) (*import_sstpb.SetDownloadSpeedLimitResponse, error)

	GetImportClient(
		ctx context.Context,
		storeID uint64,
	) (import_sstpb.ImportSSTClient, error)

	SupportMultiIngest(ctx context.Context, stores []uint64) (bool, error)
}

ImporterClient is used to import a file to TiKV.

func NewImportClient

func NewImportClient(metaClient split.SplitClient, tlsConf *tls.Config, keepaliveConf keepalive.ClientParameters) ImporterClient

NewImportClient returns a new ImporterClient.

type InitSchemaConfig

type InitSchemaConfig struct {
	// required
	IsNewTask      bool
	HasFullRestore bool
	TableFilter    filter.Filter

	// optional
	TiFlashRecorder   *tiflashrec.TiFlashRecorder
	FullBackupStorage *FullBackupStorageConfig
}

type KVPair

type KVPair struct {
	// contains filtered or unexported fields
}

type KvEntryWithTS

type KvEntryWithTS struct {
	// contains filtered or unexported fields
}

KvEntryWithTS is a KV entry together with its ts, which is decoded from the entry.

type KvMode

type KvMode int
const (
	TiDB KvMode = iota
	Raw
	Txn
)

type Log

type Log = *backuppb.DataFileInfo

Log is the metadata of one file recording KV sequences.

type LogDataFileInfo

type LogDataFileInfo struct {
	*backuppb.DataFileInfo
	MetaDataGroupName   string
	OffsetInMetaGroup   int
	OffsetInMergedGroup int
}

func FilterFilesByRegion

func FilterFilesByRegion(
	files []*LogDataFileInfo,
	ranges []kv.KeyRange,
	r *split.RegionInfo,
) ([]*LogDataFileInfo, error)

type LogFileManagerInit

type LogFileManagerInit struct {
	StartTS   uint64
	RestoreTS uint64
	Storage   storage.ExternalStorage
}

LogFileManagerInit is the config needed for initializing the log file manager.

type LogFilesIterWithSplitHelper

type LogFilesIterWithSplitHelper struct {
	// contains filtered or unexported fields
}

func (*LogFilesIterWithSplitHelper) TryNext

type LogFilesSkipMap

type LogFilesSkipMap struct {
	// contains filtered or unexported fields
}

func NewLogFilesSkipMap

func NewLogFilesSkipMap() *LogFilesSkipMap

func (*LogFilesSkipMap) Insert

func (m *LogFilesSkipMap) Insert(metaKey string, groupOff, fileOff int)

func (*LogFilesSkipMap) NeedSkip

func (m *LogFilesSkipMap) NeedSkip(metaKey string, groupOff, fileOff int) bool

type LogIter

type LogIter = iter.TryNextor[*LogDataFileInfo]

LogIter is the iterator type over the meta information of each log file.

func NewLogFilesIterWithSplitHelper

func NewLogFilesIterWithSplitHelper(iter LogIter, rules map[int64]*RewriteRules, client split.SplitClient, splitSize uint64, splitKeys int64) LogIter

type LogSplitHelper

type LogSplitHelper struct {
	// contains filtered or unexported fields
}

func NewLogSplitHelper

func NewLogSplitHelper(rules map[int64]*RewriteRules, client split.SplitClient, splitSize uint64, splitKeys int64) *LogSplitHelper

func (*LogSplitHelper) Merge

func (helper *LogSplitHelper) Merge(file *backuppb.DataFileInfo)

func (*LogSplitHelper) Split

func (helper *LogSplitHelper) Split(ctx context.Context) error

type MergeRangesStat

type MergeRangesStat struct {
	TotalFiles           int
	TotalWriteCFFile     int
	TotalDefaultCFFile   int
	TotalRegions         int
	RegionKeysAvg        int
	RegionBytesAvg       int
	MergedRegions        int
	MergedRegionKeysAvg  int
	MergedRegionBytesAvg int
}

MergeRangesStat holds statistics for the MergeRanges.

func MergeFileRanges

func MergeFileRanges(
	files []*backuppb.File, splitSizeBytes, splitKeyCount uint64,
) ([]rtree.Range, *MergeRangesStat, error)

MergeFileRanges returns the ranges of the files, merged based on splitSizeBytes and splitKeyCount.

By merging small ranges, it speeds up restoring a backup that contains many small ranges (regions) as it reduces split region and scatter region.

type Meta

type Meta = *backuppb.Metadata

Meta is the metadata of files.

type MetaGroupIter

type MetaGroupIter = iter.TryNextor[DDLMetaGroup]

MetaGroupIter is the iterator of flushes of metadata.

type MetaIter

type MetaIter = iter.TryNextor[*backuppb.Metadata]

MetaIter is the type of iterator of metadata files' content.

type MetadataInfo

type MetadataInfo struct {
	MinTS          uint64
	FileGroupInfos []*FileGroupInfo
}

MetadataInfo keeps this meta-information for statistics and filtering.

type OnSplitFunc

type OnSplitFunc func(key [][]byte)

OnSplitFunc is called before splitting a range.

type OverRegionsInRangeController

type OverRegionsInRangeController struct {
	// contains filtered or unexported fields
}

func OverRegionsInRange

func OverRegionsInRange(start, end []byte, metaClient split.SplitClient, retryStatus *utils.RetryState) OverRegionsInRangeController

OverRegionsInRange creates a controller that could be used to scan regions in a range and apply a function over these regions. You can then call the `Run` method to apply the function.

func (*OverRegionsInRangeController) Run

Run executes the `regionFunc` over the regions between `o.start` and `o.end`. It retries errors according to the `rpcResponse`.

type RPCResult

type RPCResult struct {
	Err error

	ImportError string
	StoreError  *errorpb.Error
}

RPCResult is the result after executing some RPCs to TiKV.

func RPCResultFromError

func RPCResultFromError(err error) RPCResult

func RPCResultFromPBError

func RPCResultFromPBError(err *import_sstpb.Error) RPCResult

func RPCResultOK

func RPCResultOK() RPCResult

func (*RPCResult) Error

func (r *RPCResult) Error() string

func (*RPCResult) OK

func (r *RPCResult) OK() bool

func (*RPCResult) StrategyForRetry

func (r *RPCResult) StrategyForRetry() RetryStrategy

func (*RPCResult) StrategyForRetryGoError

func (r *RPCResult) StrategyForRetryGoError() RetryStrategy

func (*RPCResult) StrategyForRetryStoreError

func (r *RPCResult) StrategyForRetryStoreError() RetryStrategy

type Range

type Range struct {
	Start []byte
	End   []byte
}

Range records the start and end keys for localStoreDir.DB so that we can write it to TiKV in a streaming fashion.

type RawKVBatchClient

type RawKVBatchClient struct {
	// contains filtered or unexported fields
}

RawKVBatchClient is used to put raw kv-entry into tikv. Note: it is not thread safe.

func NewRawKVBatchClient

func NewRawKVBatchClient(
	rawkvClient RawkvClient,
	batchCount int,
) *RawKVBatchClient

NewRawKVBatchClient creates a batch rawkv client.

func (*RawKVBatchClient) Close

func (c *RawKVBatchClient) Close()

Close closes the RawKVBatchClient.

func (*RawKVBatchClient) Put

func (c *RawKVBatchClient) Put(ctx context.Context, key, value []byte, originTs uint64) error

Put puts (key, value) into the buffer, and waits for a batch write if the buffer is full.

func (*RawKVBatchClient) PutRest

func (c *RawKVBatchClient) PutRest(ctx context.Context) error

PutRest writes the rest pairs (key, values) into tikv.

func (*RawKVBatchClient) SetColumnFamily

func (c *RawKVBatchClient) SetColumnFamily(columnFamily string)

SetColumnFamily sets the columnFamily for the client.

type RawkvClient

type RawkvClient interface {
	Get(ctx context.Context, key []byte, options ...rawkv.RawOption) ([]byte, error)
	Put(ctx context.Context, key, value []byte, options ...rawkv.RawOption) error
	BatchGet(ctx context.Context, keys [][]byte, options ...rawkv.RawOption) ([][]byte, error)
	BatchPut(ctx context.Context, keys, values [][]byte, options ...rawkv.RawOption) error
	Close() error
}

RawkvClient is the interface for rawkv.client.

func NewRawkvClient

func NewRawkvClient(ctx context.Context, pdAddrs []string, security config.Security) (RawkvClient, error)

NewRawkvClient creates a rawkv client.

type RecoverRegion

type RecoverRegion struct {
	*recovpb.RegionMeta
	StoreId uint64
}

func LeaderCandidates

func LeaderCandidates(peers []*RecoverRegion) ([]*RecoverRegion, error)

In the cloud, because of IOPS and bandwidth limitations, write operations in Raft are slow, so the Raft state (logterm, lastlog, commitlog...) is the same among the peers. LeaderCandidates selects all peers that can be selected as a leader during the restore.

func SelectRegionLeader

func SelectRegionLeader(storeBalanceScore map[uint64]int, peers []*RecoverRegion) *RecoverRegion

For example, region A has candidate leaders x, y and z: peer x is on store 1 with storeBalanceScore 3, peer y is on store 3 with storeBalanceScore 2, and peer z is on store 4 with storeBalanceScore 1. Result: peer z will be selected as the leader, on store 4.

type RecoverRegionInfo

type RecoverRegionInfo struct {
	RegionId      uint64
	RegionVersion uint64
	StartKey      []byte
	EndKey        []byte
	TombStone     bool
}

func SortRecoverRegions

func SortRecoverRegions(regions map[uint64][]*RecoverRegion) []*RecoverRegionInfo

type Recovery

type Recovery struct {
	StoreMetas   []StoreMeta
	RecoveryPlan map[uint64][]*recovpb.RecoverRegionRequest
	MaxAllocID   uint64
	// contains filtered or unexported fields
}

for test

func NewRecovery

func NewRecovery(allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, concurrency uint32) Recovery

func (*Recovery) FlashbackToVersion

func (recovery *Recovery) FlashbackToVersion(ctx context.Context, resolveTS uint64, commitTS uint64) (err error)

FlashbackToVersion flashes back the region data to version resolveTS.

func (*Recovery) GetTotalRegions

func (recovery *Recovery) GetTotalRegions() int

func (*Recovery) MakeRecoveryPlan

func (recovery *Recovery) MakeRecoveryPlan() error

MakeRecoveryPlan generates the recovery plan for the TiKVs: 1. check for overlapping regions and make a recovery decision; 2. build a leader list for all regions during TiKV startup; 3. get the max allocated ID.

func (*Recovery) PrepareFlashbackToVersion

func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error)

PrepareFlashbackToVersion prepares the regions for flashing back the data; the purpose is to stop the region service and put the regions into flashback state.

func (*Recovery) ReadRegionMeta

func (recovery *Recovery) ReadRegionMeta(ctx context.Context) error

ReadRegionMeta reads all region metadata from the TiKVs.

func (*Recovery) RecoverRegions

func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error)

RecoverRegions sends the recovery plan to recover regions (force leader, etc.). The plan is sent only to the TiKVs that have regions requiring recovery.

func (*Recovery) WaitApply

func (recovery *Recovery) WaitApply(ctx context.Context) (err error)

WaitApply sends a wait-apply request to all TiKVs to ensure that every region peer has applied its log up to the last entry.

type RegionFunc

type RegionFunc func(ctx context.Context, r *split.RegionInfo) RPCResult

type RegionSplitter

type RegionSplitter struct {
	// contains filtered or unexported fields
}

RegionSplitter is an executor of region splits by rules.

func NewRegionSplitter

func NewRegionSplitter(client split.SplitClient) *RegionSplitter

NewRegionSplitter returns a new RegionSplitter.

func (*RegionSplitter) ScatterRegions

func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*split.RegionInfo)

ScatterRegions scatters the regions.

func (*RegionSplitter) ScatterRegionsWithBackoffer

func (rs *RegionSplitter) ScatterRegionsWithBackoffer(ctx context.Context, newRegions []*split.RegionInfo, backoffer utils.Backoffer)

ScatterRegionsWithBackoffer scatters the regions with a backoffer. This function is for testing the retry mechanism. For a real cluster, using ScatterRegions directly is fine.

func (*RegionSplitter) Split

func (rs *RegionSplitter) Split(
	ctx context.Context,
	ranges []rtree.Range,
	rewriteRules *RewriteRules,
	isRawKv bool,
	onSplit OnSplitFunc,
) error

Split executes a region split. It first splits regions by the rewrite rules, then splits regions by the end key of each range. tableRules includes the prefix of a table, since some ranges may have a prefix with a record sequence or an index sequence. Note: all ranges and rewrite rules must have raw keys.

type RetryStrategy

type RetryStrategy int
const (
	StrategyGiveUp RetryStrategy = iota
	StrategyFromThisRegion
	StrategyFromStart
)

type RewriteMode

type RewriteMode int

RewriteMode is a mode flag that tells the TiKV how to handle the rewrite rules.

const (
	// RewriteModeLegacy means no rewrite rule is applied.
	RewriteModeLegacy RewriteMode = iota

	// RewriteModeKeyspace means the rewrite rule could be applied to keyspace.
	RewriteModeKeyspace
)

type RewriteRules

type RewriteRules struct {
	Data        []*import_sstpb.RewriteRule
	OldKeyspace []byte
	NewKeyspace []byte
}

RewriteRules contains rules for rewriting keys of tables.

func EmptyRewriteRule

func EmptyRewriteRule() *RewriteRules

EmptyRewriteRule makes a new, empty rewrite rule.

func GetRewriteRuleOfTable

func GetRewriteRuleOfTable(
	oldTableID, newTableID int64,
	newTimeStamp uint64,
	indexIDs map[int64]int64,
	getDetailRule bool,
) *RewriteRules

GetRewriteRuleOfTable returns a rewrite rule from t_{oldID} to t_{newID}.

func GetRewriteRules

func GetRewriteRules(
	newTable, oldTable *model.TableInfo, newTimeStamp uint64, getDetailRule bool,
) *RewriteRules

GetRewriteRules returns the rewrite rules between the new table and the old table. getDetailRule is used for normal backup & restore. If it is set to true, we collect rules like tXXX_r and tYYY_i. If it is set to false, we only collect rules that contain the table_id, such as tXXX and tYYY.

func (*RewriteRules) Append

func (r *RewriteRules) Append(other RewriteRules)

Append appends its argument to these rewrite rules.

type SendType

type SendType int

SendType is the 'type' of a send. When we send a command to the worker, we may want to flush all pending ranges (when auto commit is enabled), or just clean up overflowing ranges (when only adding a table to the batcher).

const (
	// SendUntilLessThanBatch will make the batcher send batch until
	// its remaining range is less than its batchSizeThreshold.
	SendUntilLessThanBatch SendType = iota
	// SendAll will make the batcher send all pending ranges.
	SendAll
	// SendAllThenClose will make the batcher send all pending ranges and then close itself.
	SendAllThenClose
)

type StoreMeta

type StoreMeta struct {
	StoreId     uint64
	RegionMetas []*recovpb.RegionMeta
}

func NewStoreMeta

func NewStoreMeta(storeId uint64) StoreMeta

type StreamBackupSearch

type StreamBackupSearch struct {
	// contains filtered or unexported fields
}

StreamBackupSearch is used for searching key from log data files

func NewStreamBackupSearch

func NewStreamBackupSearch(storage storage.ExternalStorage, comparator Comparator, searchKey []byte) *StreamBackupSearch

NewStreamBackupSearch creates an instance of StreamBackupSearch

func (*StreamBackupSearch) Search

func (s *StreamBackupSearch) Search(ctx context.Context) ([]*StreamKVInfo, error)

Search searches KV entries in log data files.

func (*StreamBackupSearch) SetEndTs

func (s *StreamBackupSearch) SetEndTs(endTs uint64)

SetEndTs sets the end timestamp of the search.

func (*StreamBackupSearch) SetStartTS

func (s *StreamBackupSearch) SetStartTS(startTs uint64)

SetStartTS sets the start timestamp of the search.

type StreamKVInfo

type StreamKVInfo struct {
	Key        string `json:"key"`
	EncodedKey string `json:"-"`
	WriteType  byte   `json:"write-type"`
	StartTs    uint64 `json:"start-ts"`
	CommitTs   uint64 `json:"commit-ts"`
	CFName     string `json:"cf-name"`
	Value      string `json:"value,omitempty"`
	ShortValue string `json:"short-value,omitempty"`
}

StreamKVInfo stores kv info searched from log data files

type StreamMetadataSet

type StreamMetadataSet struct {
	// if set true, the metadata and datafile won't be removed
	DryRun bool

	// a parser of metadata
	Helper *stream.MetadataHelper

	// for test
	BeforeDoWriteBack func(path string, replaced *backuppb.Metadata) (skip bool)
	// contains filtered or unexported fields
}

func (*StreamMetadataSet) IterateFilesFullyBefore

func (ms *StreamMetadataSet) IterateFilesFullyBefore(before uint64, f func(d *FileGroupInfo) (shouldBreak bool))

IterateFilesFullyBefore runs the function over all files that contain only data before the timestamp.

0                                          before
|------------------------------------------|
 |-file1---------------| <- File contains records in this TS range would be found.
                               |-file2--------------| <- File contains any record out of this won't be found.

This function would call the `f` over file1 only.

func (*StreamMetadataSet) LoadFrom

LoadFrom loads data from an external storage into the stream metadata set. (Now only for test)

func (*StreamMetadataSet) LoadUntilAndCalculateShiftTS

func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(ctx context.Context, s storage.ExternalStorage, until uint64) (uint64, error)

LoadUntilAndCalculateShiftTS loads the metadata until the specified timestamp and calculates the shift-until-ts along the way. This records all metadata files that *may* contain data from transactions committed before that TS.

func (*StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch

func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch(ctx context.Context, from uint64, storage storage.ExternalStorage, updateFn func(num int64)) ([]string, error)

RemoveDataFilesAndUpdateMetadataInBatch concurrently removes data file groups and updates metadata. Only one metadata file is processed in each thread, including deleting its data file groups and updating it. It returns the data file groups that were not deleted.

type TableIDWithFiles

type TableIDWithFiles struct {
	TableID int64

	Files []*backuppb.File
}

type TableSink

type TableSink interface {
	EmitTables(tables ...CreatedTable)
	EmitError(error)
	Close()
}

TableSink is the 'sink' of restored data by a sender.

type TableWithRange

type TableWithRange struct {
	CreatedTable

	Range []rtree.Range
}

TableWithRange is a CreatedTable that has been bound to some key ranges.

type TiKVRestorer

type TiKVRestorer interface {
	// SplitRanges split regions implicated by the ranges and rewrite rules.
	// After spliting, it also scatters the fresh regions.
	SplitRanges(ctx context.Context,
		ranges []rtree.Range,
		rewriteRules *RewriteRules,
		updateCh glue.Progress,
		isRawKv bool) error
	// RestoreSSTFiles import the files to the TiKV.
	RestoreSSTFiles(ctx context.Context,
		tableIDWithFiles []TableIDWithFiles,
		rewriteRules *RewriteRules,
		updateCh glue.Progress) error
}

TiKVRestorer is the minimal set of methods required for restoring. It contains the primitive APIs extracted from `restore.Client`, so some of the arguments may seem redundant. Maybe TODO: make a better abstraction?

type UniqueTableName

type UniqueTableName struct {
	DB    string
	Table string
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL