Documentation ¶
Overview ¶
Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.
Index ¶
- Constants
- func ApplyKVFilesWithBatchMethod(ctx context.Context, iter LogIter, batchCount int, batchSize uint64, ...) error
- func ApplyKVFilesWithSingelMethod(ctx context.Context, files LogIter, ...) error
- func CheckConsistencyAndValidPeer(regionInfos []*RecoverRegionInfo) (map[uint64]struct{}, error)
- func CheckNewCollationEnable(backupNewCollationEnable string, g glue.Glue, storage kv.Storage, ...) error
- func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*logFileManager, error)
- func DDLJobBlockListRule(ddlJob *model.Job) bool
- func EstimateRangeSize(files []*backuppb.File) int
- func Exhaust(ec <-chan error) []error
- func FilterDDLJobByRules(srcDDLJobs []*model.Job, rules ...DDLJobFilterRule) (dstDDLJobs []*model.Job)
- func FilterDDLJobs(allDDLJobs []*model.Job, tables []*metautil.Table) (ddlJobs []*model.Job)
- func FilterFilesByRegion(files []*backuppb.DataFileInfo, ranges []kv.KeyRange, r *split.RegionInfo) ([]*backuppb.DataFileInfo, error)
- func GetExistedUserDBs(dom *domain.Domain) []*model.DBInfo
- func GetKeyTS(key []byte) (uint64, error)
- func GetRewriteEncodedKeys(file AppliedFile, rewriteRules *RewriteRules) (startKey, endKey []byte, err error)
- func GetRewriteRawKeys(file AppliedFile, rewriteRules *RewriteRules) (startKey, endKey []byte, err error)
- func GetRewriteRulesMap(newTable, oldTable *model.TableInfo, newTimeStamp uint64, getDetailRule bool) map[int64]*RewriteRules
- func GetRewriteTableID(tableID int64, rewriteRules *RewriteRules) int64
- func GetSSTMetaFromFile(id []byte, file *backuppb.File, region *metapb.Region, ...) import_sstpb.SSTMeta
- func GetTSFromFile(ctx context.Context, s storage.ExternalStorage, filename string) (uint64, error)
- func GoValidateFileRanges(ctx context.Context, tableStream <-chan CreatedTable, ...) <-chan TableWithRange
- func MapTableToFiles(files []*backuppb.File) map[int64][]*backuppb.File
- func MockCallSetSpeedLimit(ctx context.Context, fakeImportClient ImporterClient, rc *Client, ...) error
- func NeedSplit(splitKey []byte, regions []*split.RegionInfo, isRawKv bool) *split.RegionInfo
- func NeedsMerge(left, right *rtree.Range, splitSizeBytes, splitKeyCount uint64) bool
- func NewSplitHelperIteratorForTest(helper *split.SplitHelper, tableID int64, rule *RewriteRules) *splitHelperIterator
- func ParseQuoteName(name string) (db, table string)
- func PrefixEndKey(key []byte) []byte
- func PrefixStartKey(key []byte) []byte
- func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, ...) (int, error)
- func ReplaceMetadata(meta *backuppb.Metadata, filegroups []*backuppb.DataFileGroup)
- func SetTSToFile(ctx context.Context, s storage.ExternalStorage, safepoint uint64, ...) error
- func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo
- func SortRanges(ranges []rtree.Range, rewriteRules *RewriteRules) ([]rtree.Range, error)
- func SplitPoint(ctx context.Context, iter *splitHelperIterator, client split.SplitClient, ...) (err error)
- func SplitRanges(ctx context.Context, client *Client, ranges []rtree.Range, ...) error
- func TidyOldSchemas(sr *stream.SchemasReplace) *backup.Schemas
- func TruncateTS(key []byte) []byte
- func UpdateShiftTS(m *backuppb.Metadata, startTS uint64, restoreTS uint64) (uint64, bool)
- func ValidateFileRewriteRule(file *backuppb.File, rewriteRules *RewriteRules) error
- func ZapTables(tables []CreatedTable) zapcore.Field
- type AppliedFile
- type BatchSender
- type Batcher
- func (b *Batcher) Add(tbs TableWithRange)
- func (b *Batcher) Close()
- func (b *Batcher) DisableAutoCommit()
- func (b *Batcher) EnableAutoCommit(ctx context.Context, delay time.Duration)
- func (b *Batcher) Len() int
- func (b *Batcher) Send(ctx context.Context)
- func (b *Batcher) SetThreshold(newThreshold int)
- type Client
- func (rc *Client) CheckSysTableCompatibility(dom *domain.Domain, tables []*metautil.Table) error
- func (rc *Client) CheckTargetClusterFresh(ctx context.Context) error
- func (rc *Client) CleanUpKVFiles(ctx context.Context) error
- func (rc *Client) ClearSystemUsers(ctx context.Context, resetUsers []string) error
- func (rc *Client) Close()
- func (rc *Client) CreateDatabase(ctx context.Context, db *model.DBInfo) error
- func (rc *Client) CreatePolicies(ctx context.Context, policyMap *sync.Map) error
- func (rc *Client) CreateTables(dom *domain.Domain, tables []*metautil.Table, newTS uint64) (*RewriteRules, []*model.TableInfo, error)
- func (rc *Client) EnableOnline()
- func (rc *Client) EnableSkipCreateSQL()
- func (rc *Client) ExecDDLs(ctx context.Context, ddlJobs []*model.Job) error
- func (rc Client) FilterDataFiles(ms MetaIter) LogIter
- func (rc Client) FilterMetaFiles(ms MetaIter) MetaGroupIter
- func (rc *Client) FixIndex(ctx context.Context, schema, table, index string) error
- func (rc *Client) FixIndicesOfTable(ctx context.Context, schema string, table *model.TableInfo) error
- func (rc *Client) FixIndicesOfTables(ctx context.Context, fullBackupTables map[int64]*metautil.Table, ...) error
- func (rc *Client) GenGlobalID(ctx context.Context) (int64, error)
- func (rc *Client) GenGlobalIDs(ctx context.Context, n int) ([]int64, error)
- func (rc *Client) GenerateRebasedTables(tables []*metautil.Table)
- func (rc *Client) GetBatchDdlSize() uint
- func (rc *Client) GetDBSchema(dom *domain.Domain, dbName model.CIStr) (*model.DBInfo, bool)
- func (rc *Client) GetDDLJobs() []*model.Job
- func (rc *Client) GetDatabase(name string) *utils.Database
- func (rc *Client) GetDatabases() []*utils.Database
- func (rc *Client) GetDomain() *domain.Domain
- func (rc *Client) GetFilesInRawRange(startKey []byte, endKey []byte, cf string) ([]*backuppb.File, error)
- func (rc *Client) GetGCRows() []string
- func (rc *Client) GetPDClient() pd.Client
- func (rc *Client) GetPlacementPolicies() (*sync.Map, error)
- func (rc *Client) GetPlacementRules(ctx context.Context, pdAddrs []string) ([]pdtypes.Rule, error)
- func (rc *Client) GetPolicyMap() *sync.Map
- func (rc *Client) GetRebasedTables() map[UniqueTableName]bool
- func (rc *Client) GetSupportPolicy() bool
- func (rc *Client) GetTLSConfig() *tls.Config
- func (rc *Client) GetTS(ctx context.Context) (uint64, error)
- func (rc *Client) GetTSWithRetry(ctx context.Context) (uint64, error)
- func (rc *Client) GetTableSchema(dom *domain.Domain, dbName model.CIStr, tableName model.CIStr) (*model.TableInfo, error)
- func (rc *Client) GoCreateTables(ctx context.Context, dom *domain.Domain, tables []*metautil.Table, ...) <-chan CreatedTable
- func (rc *Client) GoValidateChecksum(ctx context.Context, tableStream <-chan CreatedTable, kvClient kv.Client, ...) <-chan struct{}
- func (rc *Client) HasBackedUpSysDB() bool
- func (rc *Client) Init(g glue.Glue, store kv.Storage) error
- func (rc *Client) InitBackupMeta(c context.Context, backupMeta *backuppb.BackupMeta, ...) error
- func (rc *Client) InitClients(backend *backuppb.StorageBackend, isRawKvMode bool)
- func (rc *Client) InitFullClusterRestore(explicitFilter bool)
- func (rc *Client) InitSchemasReplaceForDDL(tables *map[int64]*metautil.Table, tableFilter filter.Filter) (*stream.SchemasReplace, error)
- func (rc *Client) InsertDeleteRangeForIndex(jobID int64, elementID *int64, tableID int64, indexIDs []int64)
- func (rc *Client) InsertDeleteRangeForTable(jobID int64, tableIDs []int64)
- func (rc *Client) InsertGCRows(ctx context.Context) error
- func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64) error
- func (rc *Client) IsFull() bool
- func (rc *Client) IsFullClusterRestore() bool
- func (rc *Client) IsIncremental() bool
- func (rc *Client) IsOnline() bool
- func (rc *Client) IsRawKvMode() bool
- func (rc *Client) IsSkipCreateSQL() bool
- func (rc Client) LoadDDLFilesAndCountDMLFiles(ctx context.Context, counter *int) ([]Log, error)
- func (rc Client) LoadDMLFiles(ctx context.Context) (LogIter, error)
- func (rc *Client) LoadRestoreStores(ctx context.Context) error
- func (rc *Client) PreCheckTableClusterIndex(tables []*metautil.Table, ddlJobs []*model.Job, dom *domain.Domain) error
- func (rc *Client) PreCheckTableTiFlashReplica(ctx context.Context, tables []*metautil.Table, ...) error
- func (rc Client) ReadAllEntries(ctx context.Context, file Log, filterTS uint64) ([]*KvEntryWithTS, []*KvEntryWithTS, error)
- func (rc *Client) ResetPlacementRules(ctx context.Context, tables []*model.TableInfo) error
- func (rc *Client) ResetRestoreLabels(ctx context.Context) error
- func (rc *Client) ResetSpeedLimit(ctx context.Context) error
- func (rc *Client) ResetTS(ctx context.Context, pdCtrl *pdutil.PdController) error
- func (rc *Client) ResetTiFlashReplicas(ctx context.Context, g glue.Glue, storage kv.Storage) error
- func (rc *Client) RestoreBatchMetaKVFiles(ctx context.Context, files []*backuppb.DataFileInfo, ...) ([]*KvEntryWithTS, error)
- func (rc *Client) RestoreKVFiles(ctx context.Context, rules map[int64]*RewriteRules, iter LogIter, ...) error
- func (rc *Client) RestoreMetaKVFiles(ctx context.Context, files []*backuppb.DataFileInfo, ...) error
- func (rc *Client) RestoreMetaKVFilesWithBatchMethod(ctx context.Context, defaultFiles []*backuppb.DataFileInfo, ...) error
- func (rc *Client) RestoreRaw(ctx context.Context, startKey []byte, endKey []byte, files []*backuppb.File, ...) error
- func (rc *Client) RestoreSSTFiles(ctx context.Context, files []*backuppb.File, rewriteRules *RewriteRules, ...) (err error)
- func (rc *Client) RestoreSystemSchemas(ctx context.Context, f filter.Filter)
- func (rc *Client) RunGCRowsLoader(ctx context.Context)
- func (rc *Client) SaveSchemas(ctx context.Context, sr *stream.SchemasReplace, logStartTS uint64, ...) error
- func (rc *Client) SetBatchDdlSize(batchDdlsize uint)
- func (rc *Client) SetConcurrency(c uint)
- func (rc *Client) SetCrypter(crypter *backuppb.CipherInfo)
- func (rc *Client) SetCurrentTS(ts uint64)
- func (rc *Client) SetPlacementPolicyMode(withPlacementPolicy string)
- func (rc *Client) SetPolicyMap(p *sync.Map)
- func (rc *Client) SetRateLimit(rateLimit uint64)
- func (rc *Client) SetRawKVClient(c *RawKVBatchClient)
- func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, ...) error
- func (rc *Client) SetSwitchModeInterval(interval time.Duration)
- func (rc *Client) SetWithSysTable(withSysTable bool)
- func (rc *Client) SetupPlacementRules(ctx context.Context, tables []*model.TableInfo) error
- func (rc Client) ShiftTS() uint64
- func (rc Client) ShouldFilterOut(d *backuppb.DataFileInfo) bool
- func (rc *Client) SplitRanges(ctx context.Context, ranges []rtree.Range, rewriteRules *RewriteRules, ...) error
- func (rc *Client) SwitchToImportMode(ctx context.Context)
- func (rc *Client) SwitchToNormalMode(ctx context.Context) error
- func (rc *Client) UpdateSchemaVersion(ctx context.Context) error
- func (rc *Client) WaitPlacementSchedule(ctx context.Context, tables []*model.TableInfo) error
- func (rc *Client) WrapLogFilesIterWithSplitHelper(iter LogIter, rules map[int64]*RewriteRules, g glue.Glue, store kv.Storage) (LogIter, error)
- type Comparator
- type ContextManager
- type CreatedTable
- type DB
- func (db *DB) Close()
- func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error
- func (db *DB) CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error
- func (db *DB) CreateTable(ctx context.Context, table *metautil.Table, ddlTables map[UniqueTableName]bool, ...) error
- func (db *DB) CreateTablePostRestore(ctx context.Context, table *metautil.Table, ...) error
- func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table, ...) error
- func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error
- func (db *DB) UpdateStatsMeta(ctx context.Context, tableID int64, restoreTS uint64, count uint64) error
- type DDLJobFilterRule
- type DDLMetaGroup
- type DrainResult
- type FileGroupInfo
- type FileImporter
- func (importer *FileImporter) CheckMultiIngestSupport(ctx context.Context, pdClient pd.Client) error
- func (importer *FileImporter) ClearFiles(ctx context.Context, pdClient pd.Client, prefix string) error
- func (importer *FileImporter) ImportKVFileForRegion(ctx context.Context, files []*backuppb.DataFileInfo, rule *RewriteRules, ...) RPCResult
- func (importer *FileImporter) ImportKVFiles(ctx context.Context, files []*backuppb.DataFileInfo, rule *RewriteRules, ...) error
- func (importer *FileImporter) ImportSSTFiles(ctx context.Context, files []*backuppb.File, rewriteRules *RewriteRules, ...) error
- func (importer *FileImporter) SetRawRange(startKey, endKey []byte) error
- type FilesInRegion
- type FilesInTable
- type ImporterClient
- type KVPair
- type KvEntryWithTS
- type Log
- type LogFileManagerInit
- type LogFilesIterWithSplitHelper
- type LogIter
- type LogSplitHelper
- type MergeRangesStat
- type Meta
- type MetaGroupIter
- type MetaIter
- type MetadataInfo
- type OnSplitFunc
- type OverRegionsInRangeController
- type RPCResult
- type Range
- type RawKVBatchClient
- type RawkvClient
- type RecoverRegion
- type RecoverRegionInfo
- type Recovery
- func (recovery *Recovery) FlashbackToVersion(ctx context.Context, resolveTS uint64, commitTS uint64) (err error)
- func (recovery *Recovery) GetTotalRegions() int
- func (recovery *Recovery) MakeRecoveryPlan() error
- func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error)
- func (recovery *Recovery) ReadRegionMeta(ctx context.Context) error
- func (recovery *Recovery) RecoverRegions(ctx context.Context) (err error)
- func (recovery *Recovery) WaitApply(ctx context.Context) (err error)
- type RegionFunc
- type RegionSplitter
- func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*split.RegionInfo)
- func (rs *RegionSplitter) ScatterRegionsWithBackoffer(ctx context.Context, newRegions []*split.RegionInfo, backoffer utils.Backoffer)
- func (rs *RegionSplitter) Split(ctx context.Context, ranges []rtree.Range, rewriteRules *RewriteRules, ...) error
- type RetryStrategy
- type RewriteRules
- type SendType
- type StoreMeta
- type StreamBackupSearch
- type StreamKVInfo
- type StreamMetadataSet
- func (ms *StreamMetadataSet) IterateFilesFullyBefore(before uint64, f func(d *FileGroupInfo) (shouldBreak bool))
- func (ms *StreamMetadataSet) LoadFrom(ctx context.Context, s storage.ExternalStorage) error
- func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(ctx context.Context, s storage.ExternalStorage, until uint64) (uint64, error)
- func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch(ctx context.Context, from uint64, storage storage.ExternalStorage, ...) ([]string, error)
- type TableSink
- type TableWithRange
- type TiKVRestorer
- type UniqueTableName
Constants ¶
const (
MetaKVBatchSize = 64 * 1024 * 1024
)
const SplitFilesBufferSize = 4096
const (
// TruncateSafePointFileName is the filename that the truncation TS (the TS up to which logs have been truncated) is saved into.
TruncateSafePointFileName = "v1_stream_trancate_safepoint.txt"
)
Variables ¶
This section is empty.
Functions ¶
func CheckConsistencyAndValidPeer ¶
func CheckConsistencyAndValidPeer(regionInfos []*RecoverRegionInfo) (map[uint64]struct{}, error)
func CheckNewCollationEnable ¶
func CreateLogFileManager ¶
func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*logFileManager, error)
CreateLogFileManager creates a log file manager using the specified config. Generally the config cannot be changed during its lifetime.
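For illustration, a minimal sketch of initializing the manager (imports elided; extStorage, startTS, and restoreTS are placeholders for values resolved elsewhere in a real restore):

mgr, err := CreateLogFileManager(ctx, LogFileManagerInit{
    StartTS:   startTS,    // TS the log restore starts from
    RestoreTS: restoreTS,  // TS the log restore ends at
    Storage:   extStorage, // a storage.ExternalStorage holding the log backup
})
if err != nil {
    return err
}
_ = mgr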
func DDLJobBlockListRule ¶
DDLJobBlockListRule is a rule that filters out DDL jobs whose type is in the block list.
func EstimateRangeSize ¶
EstimateRangeSize estimates the total number of ranges from the given files.
func FilterDDLJobByRules ¶
func FilterDDLJobByRules(srcDDLJobs []*model.Job, rules ...DDLJobFilterRule) (dstDDLJobs []*model.Job)
FilterDDLJobByRules filters srcDDLJobs: if any of the rules returns true for a job, the job is dropped.
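For example, a sketch of applying the built-in block-list rule together with a custom rule (srcDDLJobs is a placeholder slice; a rule returning true drops the job, and the table ID here is hypothetical):

// Drop block-listed jobs plus any job touching the hypothetical table ID 42.
dropTable42 := func(job *model.Job) bool { return job.TableID == 42 }
dstDDLJobs := FilterDDLJobByRules(srcDDLJobs, DDLJobBlockListRule, dropTable42)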
func FilterDDLJobs ¶
FilterDDLJobs filters ddl jobs.
func FilterFilesByRegion ¶
func FilterFilesByRegion(
    files []*backuppb.DataFileInfo,
    ranges []kv.KeyRange,
    r *split.RegionInfo,
) ([]*backuppb.DataFileInfo, error)
func GetExistedUserDBs ¶
GetExistedUserDBs returns the databases created or modified by users.
func GetRewriteEncodedKeys ¶
func GetRewriteEncodedKeys(file AppliedFile, rewriteRules *RewriteRules) (startKey, endKey []byte, err error)
GetRewriteEncodedKeys applies the rewrite rules to the file's start and end keys, returning encoded keys.
func GetRewriteRawKeys ¶
func GetRewriteRawKeys(file AppliedFile, rewriteRules *RewriteRules) (startKey, endKey []byte, err error)
GetRewriteRawKeys applies the rewrite rules to the file's start and end keys, returning raw keys.
func GetRewriteRulesMap ¶
func GetRewriteTableID ¶
func GetRewriteTableID(tableID int64, rewriteRules *RewriteRules) int64
GetRewriteTableID gets the rewritten table ID from the rewrite rules and the original table ID.
func GetSSTMetaFromFile ¶
func GetSSTMetaFromFile(
    id []byte,
    file *backuppb.File,
    region *metapb.Region,
    regionRule *import_sstpb.RewriteRule,
) import_sstpb.SSTMeta
GetSSTMetaFromFile compares the keys in the file, the region, and the rewrite rules, then returns an SST meta. The range of the returned SST meta is [regionRule.NewKeyPrefix, append(regionRule.NewKeyPrefix, 0xff)].
func GetTSFromFile ¶
func GetTSFromFile(
    ctx context.Context,
    s storage.ExternalStorage,
    filename string,
) (uint64, error)
GetTSFromFile gets the current truncate safepoint. The truncate safepoint is the TS used for the last truncation, which means logs before this TS may have been deleted or be incomplete.
func GoValidateFileRanges ¶
func GoValidateFileRanges(
    ctx context.Context,
    tableStream <-chan CreatedTable,
    fileOfTable map[int64][]*backuppb.File,
    splitSizeBytes, splitKeyCount uint64,
    errCh chan<- error,
) <-chan TableWithRange
GoValidateFileRanges validates files against a stream of tables and yields tables with their key ranges.
func MapTableToFiles ¶
MapTableToFiles builds a map from a table ID to its backup files. Be aware that a file can hold data of exactly one table.
func MockCallSetSpeedLimit ¶
func MockCallSetSpeedLimit(ctx context.Context, fakeImportClient ImporterClient, rc *Client, concurrency uint) error
MockCallSetSpeedLimit mocks a call to the setSpeedLimit function.
func NeedSplit ¶
func NeedSplit(splitKey []byte, regions []*split.RegionInfo, isRawKv bool) *split.RegionInfo
NeedSplit checks whether a key needs a split; if so, it returns the region to split.
func NeedsMerge ¶
NeedsMerge checks whether two ranges need to be merged.
func NewSplitHelperIteratorForTest ¶
func NewSplitHelperIteratorForTest(helper *split.SplitHelper, tableID int64, rule *RewriteRules) *splitHelperIterator
func ParseQuoteName ¶
ParseQuoteName parses a quoted `db`.`table` name and splits it into the database and table parts.
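For example:

db, table := ParseQuoteName("`test`.`t1`")
// db == "test", table == "t1"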
func PrefixEndKey ¶
func PrefixStartKey ¶
func RecoverData ¶
func RecoverData(ctx context.Context, resolveTS uint64, allStores []*metapb.Store, mgr *conn.Mgr, progress glue.Progress, restoreTS uint64, concurrency uint32) (int, error)
RecoverData recovers the TiKV cluster:
1. read all metadata from the TiKVs
2. make a recovery plan, then recover the max allocated ID first
3. send the recovery plan and wait for the TiKVs to apply it; in WaitApply, every assigned region leader checks that its apply log reaches the last log
4. ensure all regions have applied up to the last log
5. prepare the flashback
6. flash back to resolveTS
func ReplaceMetadata ¶
func ReplaceMetadata(meta *backuppb.Metadata, filegroups []*backuppb.DataFileGroup)
ReplaceMetadata replaces the file groups and updates the TS of the replaced metadata.
func SetTSToFile ¶
func SetTSToFile(
    ctx context.Context,
    s storage.ExternalStorage,
    safepoint uint64,
    filename string,
) error
SetTSToFile overrides the current truncate safepoint. The truncate safepoint is the TS used for the last truncation, which means logs before this TS may have been deleted or be incomplete.
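A sketch of reading the current safepoint and advancing it with these two helpers (s and newSafepoint are placeholders):

ts, err := GetTSFromFile(ctx, s, TruncateSafePointFileName)
if err != nil {
    return err
}
if newSafepoint > ts {
    // Persist the new safepoint so later truncations start from it.
    if err := SetTSToFile(ctx, s, newSafepoint, TruncateSafePointFileName); err != nil {
        return err
    }
}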
func SortMetaKVFiles ¶
func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo
func SortRanges ¶
SortRanges checks whether the ranges overlap and sorts them.
func SplitPoint ¶
func SplitPoint(
    ctx context.Context,
    iter *splitHelperIterator,
    client split.SplitClient,
    splitF splitFunc,
) (err error)
SplitPoint selects the ranges that overlap each region and calls `splitF` to split the region.
func SplitRanges ¶
func SplitRanges(
    ctx context.Context,
    client *Client,
    ranges []rtree.Range,
    rewriteRules *RewriteRules,
    updateCh glue.Progress,
    isRawKv bool,
) error
SplitRanges splits regions by 1) the data ranges after rewrite and 2) the rewrite rules.
func TidyOldSchemas ¶
func TidyOldSchemas(sr *stream.SchemasReplace) *backup.Schemas
TidyOldSchemas collects the old schema information from a SchemasReplace.
func TruncateTS ¶
func UpdateShiftTS ¶
func ValidateFileRewriteRule ¶
func ValidateFileRewriteRule(file *backuppb.File, rewriteRules *RewriteRules) error
ValidateFileRewriteRule uses rewrite rules to validate the ranges of a file.
func ZapTables ¶
func ZapTables(tables []CreatedTable) zapcore.Field
ZapTables makes a zap field of the tables for debugging, including table names.
Types ¶
type AppliedFile ¶
AppliedFile has two kinds for now: 1. SST files used by full backup/restore; 2. KV files used by PiTR restore.
type BatchSender ¶
type BatchSender interface {
    // PutSink sets the sink of this sender. Users of this interface promise
    // to call this function at least once before the first call to `RestoreBatch`.
    PutSink(sink TableSink)
    // RestoreBatch will send the restore request.
    RestoreBatch(ranges DrainResult)
    Close()
}
BatchSender is the abstract of how the batcher send a batch.
func NewTiKVSender ¶
func NewTiKVSender(
    ctx context.Context,
    cli TiKVRestorer,
    updateCh glue.Progress,
    splitConcurrency uint,
) (BatchSender, error)
NewTiKVSender makes a sender that sends restore requests to TiKV.
type Batcher ¶
type Batcher struct {
// contains filtered or unexported fields
}
Batcher collects ranges to restore and sends batched split/ingest requests.
func NewBatcher ¶
func NewBatcher(
    ctx context.Context,
    sender BatchSender,
    manager ContextManager,
    errCh chan<- error,
) (*Batcher, <-chan CreatedTable)
NewBatcher creates a new batcher from a sender and a context manager. The sender defines how to 'restore' a batch (i.e. send, or 'push down', the task to wherever it runs). The context manager defines the 'lifetime' of restoring tables (i.e. how to enter 'restore' mode, and how to exit). The batcher works in the background, sending a batch every second or whenever the batch size reaches its limit, and emits fully-restored tables to the returned output channel.
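A hedged sketch of wiring a batcher from the constructors documented on this page (imports elided; client, updateCh, errCh, and the concurrency value are placeholders):

sender, err := NewTiKVSender(ctx, client, updateCh, 128)
if err != nil {
    return err
}
manager := NewBRContextManager(client)
batcher, outCh := NewBatcher(ctx, sender, manager, errCh)
batcher.SetThreshold(1024)                 // set before anything starts
batcher.EnableAutoCommit(ctx, time.Second) // flush pending ranges every second
for tbl := range outCh {
    _ = tbl // a fully-restored table emitted by the batcher
}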
func (*Batcher) Close ¶
func (b *Batcher) Close()
Close closes the batcher, sends all pending requests, and closes updateCh.
func (*Batcher) DisableAutoCommit ¶
func (b *Batcher) DisableAutoCommit()
DisableAutoCommit blocks the current goroutine until the worker can gracefully stop, and then disables auto commit.
func (*Batcher) EnableAutoCommit ¶
EnableAutoCommit makes the batcher commit a batch periodically even when its size hasn't reached the threshold. This function exists so that auto commit can also be disabled in some cases.
func (*Batcher) Send ¶
Send sends all pending requests in the batcher, returning the tables sent FULLY in the current batch.
func (*Batcher) SetThreshold ¶
SetThreshold sets the batch size threshold at which a batch gets sent. Note this function isn't goroutine-safe yet; please set the threshold before anything starts (e.g. before EnableAutoCommit).
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client sends requests to restore files.
func MockClient ¶
MockClient creates a fake client used for testing.
func NewRestoreClient ¶
func NewRestoreClient(
    pdClient pd.Client,
    tlsConf *tls.Config,
    keepaliveConf keepalive.ClientParameters,
    isRawKv bool,
) *Client
NewRestoreClient returns a new RestoreClient.
func (*Client) CheckSysTableCompatibility ¶
func (*Client) CheckTargetClusterFresh ¶
CheckTargetClusterFresh checks whether the target cluster is fresh. If there are no user databases or tables, we treat it as a fresh cluster, although the user may have created some accounts or made other changes.
func (*Client) ClearSystemUsers ¶
ClearSystemUsers is used for volume-snapshot restoration. Because restoring users is not supported in some scenarios (for example, in the cloud), this function should be used to drop the cloud_admin user after a volume-snapshot restore.
func (*Client) CreateDatabase ¶
CreateDatabase creates a database.
func (*Client) CreatePolicies ¶
CreatePolicies creates all policies in full restore.
func (*Client) CreateTables ¶
func (rc *Client) CreateTables(
    dom *domain.Domain,
    tables []*metautil.Table,
    newTS uint64,
) (*RewriteRules, []*model.TableInfo, error)
CreateTables creates multiple tables, and returns their rewrite rules.
func (*Client) EnableOnline ¶
func (rc *Client) EnableOnline()
EnableOnline sets the mode of restore to online.
func (*Client) EnableSkipCreateSQL ¶
func (rc *Client) EnableSkipCreateSQL()
EnableSkipCreateSQL sets switch of skip create schema and tables.
func (Client) FilterDataFiles ¶
func (Client) FilterMetaFiles ¶
func (rc Client) FilterMetaFiles(ms MetaIter) MetaGroupIter
func (*Client) FixIndicesOfTable ¶
func (rc *Client) FixIndicesOfTable(ctx context.Context, schema string, table *model.TableInfo) error
FixIndicesOfTable tries to fix the indices of the table via `ADMIN RECOVER INDEX`.
func (*Client) FixIndicesOfTables ¶
func (rc *Client) FixIndicesOfTables( ctx context.Context, fullBackupTables map[int64]*metautil.Table, onProgress func(), ) error
FixIndicesOfTables tries to fix the indices of the tables via `ADMIN RECOVER INDEX`.
func (*Client) GenGlobalID ¶
GenGlobalID generates a global ID via a transaction.
func (*Client) GenGlobalIDs ¶
GenGlobalIDs generates several global IDs via a transaction.
func (*Client) GenerateRebasedTables ¶
GenerateRebasedTables generates a map[UniqueTableName]bool representing tables whose table info hasn't been updated. There are two such situations: 1. tables that already exist in the restored cluster; 2. tables that are created by executing DDL jobs. So only tables in an incremental restoration will be added to the map.
func (*Client) GetBatchDdlSize ¶
func (*Client) GetDBSchema ¶
GetDBSchema gets the schema of a database from the TiDB cluster.
func (*Client) GetDDLJobs ¶
GetDDLJobs returns ddl jobs.
func (*Client) GetDatabase ¶
GetDatabase returns a database by name.
func (*Client) GetDatabases ¶
GetDatabases returns all databases.
func (*Client) GetFilesInRawRange ¶
func (rc *Client) GetFilesInRawRange(startKey []byte, endKey []byte, cf string) ([]*backuppb.File, error)
GetFilesInRawRange gets all files that are in the given range or intersect with it.
func (*Client) GetPDClient ¶
GetPDClient returns a pd client.
func (*Client) GetPlacementPolicies ¶
GetPlacementPolicies returns policies.
func (*Client) GetPlacementRules ¶
GetPlacementRules returns the current placement rules.
func (*Client) GetPolicyMap ¶
GetPolicyMap returns the policy map.
func (*Client) GetRebasedTables ¶
func (rc *Client) GetRebasedTables() map[UniqueTableName]bool
GetRebasedTables returns tables that may need to rebase their auto-increment ID or auto-random ID.
func (*Client) GetSupportPolicy ¶
GetSupportPolicy tells whether the target TiDB supports placement policies.
func (*Client) GetTLSConfig ¶
GetTLSConfig returns the tls config.
func (*Client) GetTSWithRetry ¶
GetTSWithRetry gets a new timestamp with retry from PD.
func (*Client) GetTableSchema ¶
func (rc *Client) GetTableSchema(
    dom *domain.Domain,
    dbName model.CIStr,
    tableName model.CIStr,
) (*model.TableInfo, error)
GetTableSchema returns the schema of a table from TiDB.
func (*Client) GoCreateTables ¶
func (rc *Client) GoCreateTables(
    ctx context.Context,
    dom *domain.Domain,
    tables []*metautil.Table,
    newTS uint64,
    errCh chan<- error,
) <-chan CreatedTable
GoCreateTables creates tables and generates their information. This function uses as many workers as the size of sessionPool; leave sessionPool nil to send DDLs sequentially.
func (*Client) GoValidateChecksum ¶
func (rc *Client) GoValidateChecksum(
    ctx context.Context,
    tableStream <-chan CreatedTable,
    kvClient kv.Client,
    errCh chan<- error,
    updateCh glue.Progress,
    concurrency uint,
) <-chan struct{}
GoValidateChecksum forks a goroutine to validate checksums after restore. It returns a channel that fires a struct{} when everything is done.
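For example, a caller might simply block until validation finishes:

done := rc.GoValidateChecksum(ctx, tableStream, kvClient, errCh, updateCh, concurrency)
<-done // fires after all checksums have been validated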
func (*Client) HasBackedUpSysDB ¶
HasBackedUpSysDB reports whether the system tables were backed up; BR backs up system tables since 5.1.0.
func (*Client) InitBackupMeta ¶
func (rc *Client) InitBackupMeta(
    c context.Context,
    backupMeta *backuppb.BackupMeta,
    backend *backuppb.StorageBackend,
    reader *metautil.MetaReader,
) error
InitBackupMeta loads schemas from BackupMeta to initialize RestoreClient.
func (*Client) InitClients ¶
func (rc *Client) InitClients(backend *backuppb.StorageBackend, isRawKvMode bool)
func (*Client) InitFullClusterRestore ¶
InitFullClusterRestore initializes fullClusterRestore and sets SkipGrantTable as needed.
func (*Client) InitSchemasReplaceForDDL ¶
func (rc *Client) InitSchemasReplaceForDDL(
    tables *map[int64]*metautil.Table,
    tableFilter filter.Filter,
) (*stream.SchemasReplace, error)
InitSchemasReplaceForDDL builds the schema mapping from old schemas to new schemas. It is used to rewrite meta kv-events.
func (*Client) InsertDeleteRangeForIndex ¶
func (rc *Client) InsertDeleteRangeForIndex(jobID int64, elementID *int64, tableID int64, indexIDs []int64)
InsertDeleteRangeForIndex generates a query to insert an index delete job into table `gc_delete_range`.
func (*Client) InsertDeleteRangeForTable ¶
InsertDeleteRangeForTable generates a query to insert a table delete job into table `gc_delete_range`.
func (*Client) InsertGCRows ¶
InsertGCRows inserts the queries into table `gc_delete_range`.
func (*Client) InstallLogFileManager ¶
func (*Client) IsFullClusterRestore ¶
func (*Client) IsIncremental ¶
IsIncremental returns whether this backup is incremental.
func (*Client) IsRawKvMode ¶
IsRawKvMode checks whether the backup data is in raw KV format, in which case transactional restore is forbidden.
func (*Client) IsSkipCreateSQL ¶
IsSkipCreateSQL returns whether creating schemas and tables should be skipped during restore.
func (Client) LoadDDLFilesAndCountDMLFiles ¶
LoadDDLFilesAndCountDMLFiles loads all DDL files that need to be restored in the restoration. At the same time, if `counter` isn't nil, it counts the DML files to be restored into `counter`. This function returns all DDL files directly because we need to sort all of them.
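A small usage sketch, with the counter wired in:

var dmlCount int
ddlFiles, err := rc.LoadDDLFilesAndCountDMLFiles(ctx, &dmlCount)
if err != nil {
    return err
}
// ddlFiles now holds every DDL file to restore; dmlCount holds the DML file count.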
func (Client) LoadDMLFiles ¶
LoadDMLFiles loads all DML files that need to be restored in the restoration. This function returns a stream, because there are usually many DML files to restore.
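A sketch of draining the returned stream; the IterResult field names (Err, Finished, Item) follow BR's iter package and are assumed here:

fileIter, err := rc.LoadDMLFiles(ctx)
if err != nil {
    return err
}
for {
    res := fileIter.TryNext(ctx)
    if res.Err != nil {
        return res.Err
    }
    if res.Finished {
        break
    }
    file := res.Item // a *backuppb.DataFileInfo
    _ = file
}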
func (*Client) LoadRestoreStores ¶
LoadRestoreStores loads the stores used to restore data.
func (*Client) PreCheckTableClusterIndex ¶
func (rc *Client) PreCheckTableClusterIndex(
    tables []*metautil.Table,
    ddlJobs []*model.Job,
    dom *domain.Domain,
) error
PreCheckTableClusterIndex checks whether the backup tables and the existing tables have different cluster-index options.
func (*Client) PreCheckTableTiFlashReplica ¶
func (rc *Client) PreCheckTableTiFlashReplica(
    ctx context.Context,
    tables []*metautil.Table,
    recorder *tiflashrec.TiFlashRecorder,
) error
PreCheckTableTiFlashReplica checks whether the TiFlash replica count is less than the TiFlash node count.
func (Client) ReadAllEntries ¶
func (rc Client) ReadAllEntries(
    ctx context.Context,
    file Log,
    filterTS uint64,
) ([]*KvEntryWithTS, []*KvEntryWithTS, error)
ReadAllEntries loads the content of a log file, filtering out entries that are not needed.
func (*Client) ResetPlacementRules ¶
ResetPlacementRules removes placement rules for tables.
func (*Client) ResetRestoreLabels ¶
ResetRestoreLabels removes the exclusive labels of the restore stores.
func (*Client) ResetTiFlashReplicas ¶
func (*Client) RestoreBatchMetaKVFiles ¶
func (rc *Client) RestoreBatchMetaKVFiles(
    ctx context.Context,
    files []*backuppb.DataFileInfo,
    schemasReplace *stream.SchemasReplace,
    kvEntries []*KvEntryWithTS,
    filterTS uint64,
    updateStats func(kvCount uint64, size uint64),
    progressInc func(),
    cf string,
) ([]*KvEntryWithTS, error)
func (*Client) RestoreKVFiles ¶
func (*Client) RestoreMetaKVFiles ¶
func (rc *Client) RestoreMetaKVFiles(
    ctx context.Context,
    files []*backuppb.DataFileInfo,
    schemasReplace *stream.SchemasReplace,
    updateStats func(kvCount uint64, size uint64),
    progressInc func(),
) error
RestoreMetaKVFiles tries to restore files about meta kv-event from stream-backup.
func (*Client) RestoreMetaKVFilesWithBatchMethod ¶
func (rc *Client) RestoreMetaKVFilesWithBatchMethod(
    ctx context.Context,
    defaultFiles []*backuppb.DataFileInfo,
    writeFiles []*backuppb.DataFileInfo,
    schemasReplace *stream.SchemasReplace,
    updateStats func(kvCount uint64, size uint64),
    progressInc func(),
    restoreBatch func(
        ctx context.Context,
        files []*backuppb.DataFileInfo,
        schemasReplace *stream.SchemasReplace,
        kvEntries []*KvEntryWithTS,
        filterTS uint64,
        updateStats func(kvCount uint64, size uint64),
        progressInc func(),
        cf string,
    ) ([]*KvEntryWithTS, error),
) error
func (*Client) RestoreRaw ¶
func (rc *Client) RestoreRaw(
    ctx context.Context,
    startKey []byte,
    endKey []byte,
    files []*backuppb.File,
    updateCh glue.Progress,
) error
RestoreRaw tries to restore raw keys in the specified range.
func (*Client) RestoreSSTFiles ¶
func (rc *Client) RestoreSSTFiles(
    ctx context.Context,
    files []*backuppb.File,
    rewriteRules *RewriteRules,
    updateCh glue.Progress,
) (err error)
RestoreSSTFiles tries to restore the files.
func (*Client) RestoreSystemSchemas ¶
RestoreSystemSchemas restores the system schema(i.e. the `mysql` schema). Detail see https://github.com/pingcap/br/issues/679#issuecomment-762592254.
func (*Client) RunGCRowsLoader ¶
RunGCRowsLoader uses a channel to save the delete-range queries, making them thread-safe.
func (*Client) SaveSchemas ¶
func (*Client) SetBatchDdlSize ¶
func (*Client) SetConcurrency ¶
SetConcurrency sets the concurrency for restoring databases, tables, and files.
func (*Client) SetCrypter ¶
func (rc *Client) SetCrypter(crypter *backuppb.CipherInfo)
func (*Client) SetCurrentTS ¶
func (*Client) SetPlacementPolicyMode ¶
SetPlacementPolicyMode sets the placement policy mode.
func (*Client) SetPolicyMap ¶
SetPolicyMap sets the policy map.
func (*Client) SetRateLimit ¶
SetRateLimit sets the rate limit.
func (*Client) SetRawKVClient ¶
func (rc *Client) SetRawKVClient(c *RawKVBatchClient)
func (*Client) SetStorage ¶
func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error
func (*Client) SetSwitchModeInterval ¶
SetSwitchModeInterval sets the switch-mode interval for the client.
func (*Client) SetWithSysTable ¶
func (*Client) SetupPlacementRules ¶
SetupPlacementRules sets rules for the tables' regions.
func (Client) ShouldFilterOut ¶
func (rc Client) ShouldFilterOut(d *backuppb.DataFileInfo) bool
ShouldFilterOut checks whether a file should be filtered out via the current client.
func (*Client) SplitRanges ¶
func (rc *Client) SplitRanges(ctx context.Context, ranges []rtree.Range, rewriteRules *RewriteRules, updateCh glue.Progress, isRawKv bool) error
SplitRanges implements TiKVRestorer.
func (*Client) SwitchToImportMode ¶
SwitchToImportMode switches the TiKV cluster to import mode.
func (*Client) SwitchToNormalMode ¶
SwitchToNormalMode switches the TiKV cluster to normal mode.
func (*Client) UpdateSchemaVersion ¶
UpdateSchemaVersion updates the schema version via a transaction.
func (*Client) WaitPlacementSchedule ¶
WaitPlacementSchedule waits for PD to move the tables to the restore stores.
type Comparator ¶
Comparator is used for comparing the relationship between src and dst.
func NewStartWithComparator ¶
func NewStartWithComparator() Comparator
NewStartWithComparator creates a comparator that checks whether src starts with dst.
type ContextManager ¶
type ContextManager interface {
    // Enter makes some tables 'enter' this context (i.e., prepares them for restore).
    Enter(ctx context.Context, tables []CreatedTable) error
    // Leave makes some tables 'leave' this context (i.e., the restore is done; do some post-work).
    Leave(ctx context.Context, tables []CreatedTable) error
    // Close closes the context manager. It is called when the manager is
    // 'killed' and should do some cleanup.
    Close(ctx context.Context)
}
ContextManager is the struct that manages a TiKV 'context' for restore. Batcher calls Enter when any table should be restored in a batch, so you can do some preparation work here (e.g. set placement rules for online restore).
func NewBRContextManager ¶
func NewBRContextManager(client *Client) ContextManager
NewBRContextManager makes a BR context manager; that is, it sets placement rules for online restore on Enter (see <splitPrepareWork>) and unsets them on Leave.
type CreatedTable ¶
type CreatedTable struct {
    RewriteRule *RewriteRules
    Table       *model.TableInfo
    OldTable    *metautil.Table
}
CreatedTable is a table created during the restore process, but not yet filled with data.
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
DB is a TiDB instance, not thread-safe.
func (*DB) CreateDatabase ¶
CreateDatabase executes a CREATE DATABASE SQL.
func (*DB) CreatePlacementPolicy ¶
CreatePlacementPolicy checks whether the cluster supports placement policies and creates the policy.
func (*DB) CreateTable ¶
func (db *DB) CreateTable(ctx context.Context, table *metautil.Table, ddlTables map[UniqueTableName]bool, supportPolicy bool, policyMap *sync.Map) error
CreateTable executes a CREATE TABLE SQL.
func (*DB) CreateTablePostRestore ¶
func (*DB) CreateTables ¶
func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table, ddlTables map[UniqueTableName]bool, supportPolicy bool, policyMap *sync.Map) error
CreateTables executes an internal CREATE TABLES.
type DDLJobFilterRule ¶
type DDLMetaGroup ¶
type DDLMetaGroup struct {
    Path      string
    FileMetas []*backuppb.DataFileInfo
}
type DrainResult ¶
type DrainResult struct {
    // TablesToSend are tables that would be sent in this batch.
    TablesToSend []CreatedTable
    // BlankTablesAfterSend are tables that will be fully restored after this batch is sent.
    BlankTablesAfterSend []CreatedTable
    RewriteRules         *RewriteRules
    Ranges               []rtree.Range
}
DrainResult is the collection of some ranges and their metadata.
func (DrainResult) Files ¶
func (result DrainResult) Files() []*backuppb.File
Files returns all files of this drain result.
type FileGroupInfo ¶
FileGroupInfo keeps the meta-information for statistics and filtering.
type FileImporter ¶
type FileImporter struct {
// contains filtered or unexported fields
}
FileImporter is used to import a file to TiKV.
func NewFileImporter ¶
func NewFileImporter(
    metaClient split.SplitClient,
    importClient ImporterClient,
    backend *backuppb.StorageBackend,
    isRawKvMode bool,
) FileImporter
NewFileImporter returns a new FileImporter.
func (*FileImporter) CheckMultiIngestSupport ¶
func (importer *FileImporter) CheckMultiIngestSupport(ctx context.Context, pdClient pd.Client) error
CheckMultiIngestSupport checks whether all stores support multi-ingest.
func (*FileImporter) ClearFiles ¶
func (*FileImporter) ImportKVFileForRegion ¶
func (importer *FileImporter) ImportKVFileForRegion(
    ctx context.Context,
    files []*backuppb.DataFileInfo,
    rule *RewriteRules,
    shiftStartTS uint64,
    startTS uint64,
    restoreTS uint64,
    info *split.RegionInfo,
    supportBatch bool,
) RPCResult
ImportKVFileForRegion tries to import a file into a region.
func (*FileImporter) ImportKVFiles ¶
func (importer *FileImporter) ImportKVFiles(
    ctx context.Context,
    files []*backuppb.DataFileInfo,
    rule *RewriteRules,
    shiftStartTS uint64,
    startTS uint64,
    restoreTS uint64,
    supportBatch bool,
) error
ImportKVFiles restores the kv events.
func (*FileImporter) ImportSSTFiles ¶
func (importer *FileImporter) ImportSSTFiles(
    ctx context.Context,
    files []*backuppb.File,
    rewriteRules *RewriteRules,
    cipher *backuppb.CipherInfo,
    apiVersion kvrpcpb.APIVersion,
) error
ImportSSTFiles tries to import a file. All rules must contain encoded keys.
func (*FileImporter) SetRawRange ¶
func (importer *FileImporter) SetRawRange(startKey, endKey []byte) error
SetRawRange sets the range to be restored in raw kv mode.
type FilesInRegion ¶
type FilesInRegion struct {
// contains filtered or unexported fields
}
type FilesInTable ¶
type FilesInTable struct {
// contains filtered or unexported fields
}
type ImporterClient ¶
type ImporterClient interface {
    ClearFiles(
        ctx context.Context,
        storeID uint64,
        req *import_sstpb.ClearRequest,
    ) (*import_sstpb.ClearResponse, error)

    ApplyKVFile(
        ctx context.Context,
        storeID uint64,
        req *import_sstpb.ApplyRequest,
    ) (*import_sstpb.ApplyResponse, error)

    DownloadSST(
        ctx context.Context,
        storeID uint64,
        req *import_sstpb.DownloadRequest,
    ) (*import_sstpb.DownloadResponse, error)

    IngestSST(
        ctx context.Context,
        storeID uint64,
        req *import_sstpb.IngestRequest,
    ) (*import_sstpb.IngestResponse, error)

    MultiIngest(
        ctx context.Context,
        storeID uint64,
        req *import_sstpb.MultiIngestRequest,
    ) (*import_sstpb.IngestResponse, error)

    SetDownloadSpeedLimit(
        ctx context.Context,
        storeID uint64,
        req *import_sstpb.SetDownloadSpeedLimitRequest,
    ) (*import_sstpb.SetDownloadSpeedLimitResponse, error)

    GetImportClient(
        ctx context.Context,
        storeID uint64,
    ) (import_sstpb.ImportSSTClient, error)

    SupportMultiIngest(ctx context.Context, stores []uint64) (bool, error)
}
ImporterClient is used to import a file to TiKV.
func NewImportClient ¶
func NewImportClient(metaClient split.SplitClient, tlsConf *tls.Config, keepaliveConf keepalive.ClientParameters) ImporterClient
NewImportClient returns a new ImporterClient.
type KvEntryWithTS ¶
type KvEntryWithTS struct {
// contains filtered or unexported fields
}
KvEntryWithTS is a KV entry with a TS; the TS is decoded from the entry.
type Log ¶
type Log = *backuppb.DataFileInfo
Log is the metadata of one file recording KV sequences.
type LogFileManagerInit ¶
type LogFileManagerInit struct {
    StartTS   uint64
    RestoreTS uint64
    Storage   storage.ExternalStorage
}
LogFileManagerInit is the config needed for initializing the log file manager.
type LogFilesIterWithSplitHelper ¶
type LogFilesIterWithSplitHelper struct {
// contains filtered or unexported fields
}
func (*LogFilesIterWithSplitHelper) TryNext ¶
func (splitIter *LogFilesIterWithSplitHelper) TryNext(ctx context.Context) iter.IterResult[*backuppb.DataFileInfo]
type LogIter ¶
type LogIter = iter.TryNextor[*backuppb.DataFileInfo]
LogIter is the type of the iterator over each log file's meta information.
func NewLogFilesIterWithSplitHelper ¶
func NewLogFilesIterWithSplitHelper(iter LogIter, rules map[int64]*RewriteRules, client split.SplitClient, splitSize uint64, splitKeys int64) LogIter
type LogSplitHelper ¶
type LogSplitHelper struct {
// contains filtered or unexported fields
}
func NewLogSplitHelper ¶
func NewLogSplitHelper(rules map[int64]*RewriteRules, client split.SplitClient, splitSize uint64, splitKeys int64) *LogSplitHelper
func (*LogSplitHelper) Merge ¶
func (helper *LogSplitHelper) Merge(file *backuppb.DataFileInfo)
type MergeRangesStat ¶
type MergeRangesStat struct {
    TotalFiles           int
    TotalWriteCFFile     int
    TotalDefaultCFFile   int
    TotalRegions         int
    RegionKeysAvg        int
    RegionBytesAvg       int
    MergedRegions        int
    MergedRegionKeysAvg  int
    MergedRegionBytesAvg int
}
MergeRangesStat holds statistics for the MergeRanges.
func MergeFileRanges ¶
func MergeFileRanges(
    files []*backuppb.File,
    splitSizeBytes, splitKeyCount uint64,
) ([]rtree.Range, *MergeRangesStat, error)
MergeFileRanges returns the ranges of the files, merged based on splitSizeBytes and splitKeyCount.
By merging small ranges, it speeds up restoring a backup that contains many small ranges (regions), since it reduces region split and scatter operations.
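A usage sketch (splitSizeBytes and splitKeyCount are placeholders taken from the restore config):

ranges, stat, err := MergeFileRanges(files, splitSizeBytes, splitKeyCount)
if err != nil {
    return err
}
// e.g. stat.TotalRegions ranges were merged down to stat.MergedRegions.
_ = ranges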
type MetaGroupIter ¶
type MetaGroupIter = iter.TryNextor[DDLMetaGroup]
MetaGroupIter is the iterator of flushes of metadata.
type MetadataInfo ¶
type MetadataInfo struct {
    MinTS          uint64
    FileGroupInfos []*FileGroupInfo
}
MetadataInfo keeps the meta-information for statistics and filtering.
type OverRegionsInRangeController ¶
type OverRegionsInRangeController struct {
// contains filtered or unexported fields
}
func OverRegionsInRange ¶
func OverRegionsInRange(start, end []byte, metaClient split.SplitClient, retryStatus *utils.RetryState) OverRegionsInRangeController
OverRegionsInRange creates a controller that can be used to scan the regions in a range and apply a function over them. You can then call the `Run` method to apply the function.
func (*OverRegionsInRangeController) Run ¶
func (o *OverRegionsInRangeController) Run(ctx context.Context, f RegionFunc) error
Run executes the `regionFunc` over the regions between `o.start` and `o.end`. It retries errors according to the `rpcResponse`.
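A sketch of applying a RegionFunc over a key range (metaClient and retryState are placeholders):

ctl := OverRegionsInRange(startKey, endKey, metaClient, &retryState)
err := ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
    // Do some per-region work here; a non-OK result drives the retry strategy.
    return RPCResultOK()
})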
type RPCResult ¶
RPCResult is the result after executing some RPCs to TiKV.
func RPCResultFromError ¶
func RPCResultFromPBError ¶
func RPCResultFromPBError(err *import_sstpb.Error) RPCResult
func RPCResultOK ¶
func RPCResultOK() RPCResult
func (*RPCResult) StrategyForRetry ¶
func (r *RPCResult) StrategyForRetry() RetryStrategy
func (*RPCResult) StrategyForRetryGoError ¶
func (r *RPCResult) StrategyForRetryGoError() RetryStrategy
func (*RPCResult) StrategyForRetryStoreError ¶
func (r *RPCResult) StrategyForRetryStoreError() RetryStrategy
type Range ¶
Range records the start and end keys of localStoreDir.DB, so we can write it to TiKV in a streaming manner.
type RawKVBatchClient ¶
type RawKVBatchClient struct {
// contains filtered or unexported fields
}
RawKVBatchClient is used to put raw KV entries into TiKV. Note: it is not thread-safe.
func NewRawKVBatchClient ¶
func NewRawKVBatchClient(
    rawkvClient RawkvClient,
    batchCount int,
) *RawKVBatchClient
NewRawKVBatchClient creates a batched rawkv client.
func (*RawKVBatchClient) Close ¶
func (c *RawKVBatchClient) Close()
Close closes the RawKVBatchClient.
func (*RawKVBatchClient) Put ¶
Put simply puts (key, value) into the buffer, and waits for a batch write if the buffer is full.
func (*RawKVBatchClient) PutRest ¶
func (c *RawKVBatchClient) PutRest(ctx context.Context) error
PutRest writes the remaining (key, value) pairs into TiKV.
func (*RawKVBatchClient) SetColumnFamily ¶
func (c *RawKVBatchClient) SetColumnFamily(columnFamily string)
SetColumnFamily sets the column family for the client.
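Putting it together, a hedged sketch of the client's lifecycle (Put's exact signature is elided in this index, so it is only referenced in a comment):

cli := NewRawKVBatchClient(rawkvClient, 64) // flush roughly every 64 buffered pairs
cli.SetColumnFamily("default")
// ... buffer entries with cli.Put(...) ...
if err := cli.PutRest(ctx); err != nil { // write out whatever is still buffered
    return err
}
cli.Close()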
type RawkvClient ¶
type RawkvClient interface {
    Get(ctx context.Context, key []byte, options ...rawkv.RawOption) ([]byte, error)
    Put(ctx context.Context, key, value []byte, options ...rawkv.RawOption) error
    BatchGet(ctx context.Context, keys [][]byte, options ...rawkv.RawOption) ([][]byte, error)
    BatchPut(ctx context.Context, keys, values [][]byte, options ...rawkv.RawOption) error
    Close() error
}
RawkvClient is the interface for the rawkv client.
func NewRawkvClient ¶
func NewRawkvClient(ctx context.Context, pdAddrs []string, security config.Security) (RawkvClient, error)
NewRawkvClient creates a rawkv client.
type RecoverRegion ¶
type RecoverRegion struct {
    *recovpb.RegionMeta
    StoreId uint64
}
func LeaderCandidates ¶
func LeaderCandidates(peers []*RecoverRegion) ([]*RecoverRegion, error)
In the cloud, due to IOPS and bandwidth limitations, Raft writes are slow, so the Raft state (logterm, lastlog, commitlog, ...) is the same among the peers. LeaderCandidates selects all peers that can be chosen as a leader during the restore.
func SelectRegionLeader ¶
func SelectRegionLeader(storeBalanceScore map[uint64]int, peers []*RecoverRegion) *RecoverRegion
For example, region A has candidate leaders x, y, z:
peer x on store 1 with storeBalanceScore 3
peer y on store 3 with storeBalanceScore 2
peer z on store 4 with storeBalanceScore 1
Result: peer z will be selected as the leader on store 4.
type RecoverRegionInfo ¶
type RecoverRegionInfo struct {
    RegionId      uint64
    RegionVersion uint64
    StartKey      []byte
    EndKey        []byte
    TombStone     bool
}
func SortRecoverRegions ¶
func SortRecoverRegions(regions map[uint64][]*RecoverRegion) []*RecoverRegionInfo
type Recovery ¶
type Recovery struct {
    StoreMetas   []StoreMeta
    RecoveryPlan map[uint64][]*recovpb.RecoverRegionRequest
    MaxAllocID   uint64
    // contains filtered or unexported fields
}
Recovery holds the cluster data recovery state (exported for test).
func NewRecovery ¶
func (*Recovery) FlashbackToVersion ¶
func (recovery *Recovery) FlashbackToVersion(ctx context.Context, resolveTS uint64, commitTS uint64) (err error)
FlashbackToVersion flashes the region data back to version resolveTS.
func (*Recovery) GetTotalRegions ¶
func (*Recovery) MakeRecoveryPlan ¶
MakeRecoveryPlan generates the recovery plan for the TiKVs: 1. check region overlaps and make a recovery decision; 2. build a leader list for all regions during the TiKV startup; 3. get the max allocated ID.
func (*Recovery) PrepareFlashbackToVersion ¶
func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error)
PrepareFlashbackToVersion prepares the regions for flashing back the data; the purpose is to stop the region service and put the regions into flashback state.
func (*Recovery) ReadRegionMeta ¶
ReadRegionMeta reads all region metadata from the TiKVs.
func (*Recovery) RecoverRegions ¶
RecoverRegions sends the recovery plan to recover the regions (force leader, etc.). The plan is sent only to TiKVs that have regions needing recovery.
type RegionFunc ¶
type RegionFunc func(ctx context.Context, r *split.RegionInfo) RPCResult
type RegionSplitter ¶
type RegionSplitter struct {
// contains filtered or unexported fields
}
RegionSplitter is an executor of region splits by rules.
func NewRegionSplitter ¶
func NewRegionSplitter(client split.SplitClient) *RegionSplitter
NewRegionSplitter returns a new RegionSplitter.
func (*RegionSplitter) ScatterRegions ¶
func (rs *RegionSplitter) ScatterRegions(ctx context.Context, newRegions []*split.RegionInfo)
ScatterRegions scatters the regions.
func (*RegionSplitter) ScatterRegionsWithBackoffer ¶
func (rs *RegionSplitter) ScatterRegionsWithBackoffer(ctx context.Context, newRegions []*split.RegionInfo, backoffer utils.Backoffer)
ScatterRegionsWithBackoffer scatters the regions with a backoffer. This function is for testing the retry mechanism; for a real cluster, using ScatterRegions directly is fine.
func (*RegionSplitter) Split ¶
func (rs *RegionSplitter) Split(
    ctx context.Context,
    ranges []rtree.Range,
    rewriteRules *RewriteRules,
    isRawKv bool,
    onSplit OnSplitFunc,
) error
Split executes a region split. It splits regions by the rewrite rules, then by the end key of each range. tableRules include the prefix of a table, since some ranges may have a prefix with a record sequence or an index sequence. Note: all ranges and rewrite rules must use raw keys.
type RetryStrategy ¶
type RetryStrategy int
const (
    StrategyGiveUp RetryStrategy = iota
    StrategyFromThisRegion
    StrategyFromStart
)
type RewriteRules ¶
type RewriteRules struct {
Data []*import_sstpb.RewriteRule
}
RewriteRules contains rules for rewriting keys of tables.
func EmptyRewriteRule ¶
func EmptyRewriteRule() *RewriteRules
EmptyRewriteRule makes a new, empty rewrite rule.
func GetRewriteRuleOfTable ¶
func GetRewriteRuleOfTable(
    oldTableID, newTableID int64,
    newTimeStamp uint64,
    indexIDs map[int64]int64,
    getDetailRule bool,
) *RewriteRules
GetRewriteRuleOfTable returns a rewrite rule from t_{oldID} to t_{newID}.
func GetRewriteRules ¶
func GetRewriteRules(
    newTable, oldTable *model.TableInfo,
    newTimeStamp uint64,
    getDetailRule bool,
) *RewriteRules
GetRewriteRules returns the rewrite rules between the new table and the old table. getDetailRule is used for normal backup & restore: if set to true, we collect detailed rules like tXXX_r and tYYY_i; if set to false, we only collect the rules containing the table_id, i.e. tXXX and tYYY.
func (*RewriteRules) Append ¶
func (r *RewriteRules) Append(other RewriteRules)
Append appends its argument to these rewrite rules.
type SendType ¶
type SendType int
SendType is the 'type' of a send. When we issue a 'send' command to the worker, we may want to flush all pending ranges (when auto commit is enabled), or we may just want to clean up the overflowing ranges (when just adding a table to the batcher).
const (
    // SendUntilLessThanBatch will make the batcher send batch until
    // its remaining range is less than its batchSizeThreshold.
    SendUntilLessThanBatch SendType = iota
    // SendAll will make the batcher send all pending ranges.
    SendAll
    // SendAllThenClose will make the batcher send all pending ranges and then close itself.
    SendAllThenClose
)
type StoreMeta ¶
type StoreMeta struct {
    StoreId     uint64
    RegionMetas []*recovpb.RegionMeta
}
func NewStoreMeta ¶
type StreamBackupSearch ¶
type StreamBackupSearch struct {
// contains filtered or unexported fields
}
StreamBackupSearch is used for searching a key in the log data files.
func NewStreamBackupSearch ¶
func NewStreamBackupSearch(storage storage.ExternalStorage, comparator Comparator, searchKey []byte) *StreamBackupSearch
NewStreamBackupSearch creates an instance of StreamBackupSearch.
func (*StreamBackupSearch) Search ¶
func (s *StreamBackupSearch) Search(ctx context.Context) ([]*StreamKVInfo, error)
Search searches KV entries in the log data files.
func (*StreamBackupSearch) SetEndTs ¶
func (s *StreamBackupSearch) SetEndTs(endTs uint64)
SetEndTs sets the end timestamp to search up to.
func (*StreamBackupSearch) SetStartTS ¶
func (s *StreamBackupSearch) SetStartTS(startTs uint64)
SetStartTS sets the start timestamp to search from.
type StreamKVInfo ¶
type StreamKVInfo struct {
    Key        string `json:"key"`
    EncodedKey string `json:"-"`
    WriteType  byte   `json:"write-type"`
    StartTs    uint64 `json:"start-ts"`
    CommitTs   uint64 `json:"commit-ts"`
    CFName     string `json:"cf-name"`
    Value      string `json:"value,omitempty"`
    ShortValue string `json:"short-value,omitempty"`
}
StreamKVInfo stores the KV info found in the log data files.
type StreamMetadataSet ¶
type StreamMetadataSet struct {
    // if set true, the metadata and datafile won't be removed
    DryRun bool

    // a parser of metadata
    Helper *stream.MetadataHelper

    // for test
    BeforeDoWriteBack func(path string, replaced *backuppb.Metadata) (skip bool)
    // contains filtered or unexported fields
}
func (*StreamMetadataSet) IterateFilesFullyBefore ¶
func (ms *StreamMetadataSet) IterateFilesFullyBefore(before uint64, f func(d *FileGroupInfo) (shouldBreak bool))
IterateFilesFullyBefore runs the function over all files that contain data only before the timestamp.
0                                          before
|------------------------------------------|
 |-file1---------------|          <- a file whose records all fall in this TS range will be found
                        |-file2--------------|  <- a file with any record outside the range won't be found
This function would call the `f` over file1 only.
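For example, counting the file groups whose data lies entirely before a timestamp:

var count int
ms.IterateFilesFullyBefore(truncateTS, func(d *FileGroupInfo) (shouldBreak bool) {
    count++
    return false // keep iterating
})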
func (*StreamMetadataSet) LoadFrom ¶
func (ms *StreamMetadataSet) LoadFrom(ctx context.Context, s storage.ExternalStorage) error
LoadFrom loads data from an external storage into the stream metadata set. (Now only for test)
func (*StreamMetadataSet) LoadUntilAndCalculateShiftTS ¶
func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(ctx context.Context, s storage.ExternalStorage, until uint64) (uint64, error)
LoadUntilAndCalculateShiftTS loads the metadata until the specified timestamp and calculate the shift-until-ts by the way. This would record all metadata files that *may* contain data from transaction committed before that TS.
func (*StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch ¶
func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch(ctx context.Context, from uint64, storage storage.ExternalStorage, updateFn func(num int64)) ([]string, error)
RemoveDataFilesAndUpdateMetadataInBatch concurrently removes data file groups and updates the metadata. Each thread processes one metadata file, deleting its data file groups and updating it. It returns the data file groups that were not deleted.
type TableSink ¶
type TableSink interface {
    EmitTables(tables ...CreatedTable)
    EmitError(error)
    Close()
}
TableSink is the 'sink' of restored data by a sender.
type TableWithRange ¶
type TableWithRange struct {
    CreatedTable
    Range []rtree.Range
}
TableWithRange is a CreatedTable that has been bound to some key ranges.
type TiKVRestorer ¶
type TiKVRestorer interface {
    // SplitRanges splits the regions covered by the ranges and rewrite rules.
    // After splitting, it also scatters the fresh regions.
    SplitRanges(ctx context.Context,
        ranges []rtree.Range,
        rewriteRules *RewriteRules,
        updateCh glue.Progress,
        isRawKv bool) error
    // RestoreSSTFiles imports the files into TiKV.
    RestoreSSTFiles(ctx context.Context,
        files []*backuppb.File,
        rewriteRules *RewriteRules,
        updateCh glue.Progress) error
}
TiKVRestorer is the minimal set of methods required for restoring. It contains the primitive APIs extracted from `restore.Client`, so some arguments may seem redundant. Maybe TODO: make a better abstraction.