Documentation
¶
Index ¶
- Constants
- func ApplyKVFilesWithBatchMethod(ctx context.Context, logIter LogIter, batchCount int, batchSize uint64, ...) error
- func ApplyKVFilesWithSingleMethod(ctx context.Context, files LogIter, ...) error
- func LoadAndProcessMetaKVFilesInBatch(ctx context.Context, defaultFiles []*backuppb.DataFileInfo, ...) error
- func LoadMigrations(ctx context.Context, s storage.ExternalStorage) iter.TryNextor[*backuppb.Migration]
- func PutRawKvWithRetry(ctx context.Context, client *rawkv.RawKVBatchClient, key, value []byte, ...) error
- func SeparateAndSortFilesByCF(files []*backuppb.DataFileInfo) ([]*backuppb.DataFileInfo, []*backuppb.DataFileInfo)
- func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo
- type BatchMetaKVProcessor
- type CompactedFileSplitStrategy
- type CompactedSSTs
- type CopiedSST
- type DDLMetaGroup
- type FileIndex
- type FileIndexIter
- type FilesInRegion
- type FilesInTable
- type FullBackupStorageConfig
- type GetIDMapConfig
- type GroupIndex
- type GroupIndexIter
- type KvEntryWithTS
- type LockedMigrations
- type Log
- type LogClient
- func (rc *LogClient) CleanUpKVFiles(ctx context.Context) error
- func (rc *LogClient) Close(ctx context.Context)
- func (rc *LogClient) CurrentTS() uint64
- func (rc *LogClient) FailpointDoChecksumForLogRestore(ctx context.Context, kvClient kv.Client, pdClient pd.Client, ...) (finalErr error)
- func (rc *LogClient) GenGlobalID(ctx context.Context) (int64, error)
- func (rc *LogClient) GenGlobalIDs(ctx context.Context, n int) ([]int64, error)
- func (rc *LogClient) GetBaseIDMap(ctx context.Context, cfg *GetIDMapConfig) (map[stream.UpstreamID]*stream.DBReplace, error)
- func (rc *LogClient) GetClusterID(ctx context.Context) uint64
- func (rc *LogClient) GetDomain() *domain.Domain
- func (rc *LogClient) GetGCRows() []*stream.PreDelRangeQuery
- func (rc *LogClient) GetMigrations(ctx context.Context) (*LockedMigrations, error)
- func (rc *LogClient) Init(ctx context.Context, g glue.Glue, store kv.Storage) error
- func (rc *LogClient) InitCheckpointMetadataForCompactedSstRestore(ctx context.Context) (map[string]struct{}, error)
- func (rc *LogClient) InitClients(ctx context.Context, backend *backuppb.StorageBackend, ...) error
- func (rc *LogClient) InsertGCRows(ctx context.Context) error
- func (rc *LogClient) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64, metadataDownloadBatchSize uint, ...) error
- func (rc *LogClient) LoadOrCreateCheckpointMetadataForLogRestore(ctx context.Context, startTS, restoredTS uint64, gcRatio string, ...) (string, error)
- func (rc *LogClient) RecordDeleteRange(sql *stream.PreDelRangeQuery)
- func (rc *LogClient) RepairIngestIndex(ctx context.Context, ingestRecorder *ingestrec.IngestRecorder, g glue.Glue) error
- func (rc *LogClient) RestoreBatchMetaKVFiles(ctx context.Context, files []*backuppb.DataFileInfo, ...) ([]*KvEntryWithTS, error)
- func (rc *LogClient) RestoreKVFiles(ctx context.Context, rules map[int64]*restoreutils.RewriteRules, ...) error
- func (rc *LogClient) RestoreSSTFiles(ctx context.Context, compactionsIter iter.TryNextor[SSTs], ...) error
- func (rc *LogClient) RestoreSSTStatisticFields(pushTo *[]zapcore.Field)
- func (rc *LogClient) RunGCRowsLoader(ctx context.Context)
- func (rc *LogClient) SaveIdMapWithFailPoints(ctx context.Context, manager *stream.TableMappingManager) error
- func (rc *LogClient) SetCrypter(crypter *backuppb.CipherInfo)
- func (rc *LogClient) SetCurrentTS(ts uint64) error
- func (rc *LogClient) SetRawKVBatchClient(ctx context.Context, pdAddrs []string, security config.Security) error
- func (rc *LogClient) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, ...) error
- func (rc *LogClient) SetUpstreamClusterID(upstreamClusterID uint64)
- func (rc *LogClient) UpdateSchemaVersionFullReload(ctx context.Context) error
- func (rc *LogClient) WrapCompactedFilesIterWithSplitHelper(ctx context.Context, compactedIter iter.TryNextor[SSTs], ...) (iter.TryNextor[SSTs], error)
- func (rc *LogClient) WrapLogFilesIterWithSplitHelper(ctx context.Context, logIter LogIter, execCtx sqlexec.RestrictedSQLExecutor, ...) (LogIter, error)
- type LogDataFileInfo
- type LogFileImporter
- type LogFileManager
- func (lm *LogFileManager) BuildMigrations(migs []*backuppb.Migration)
- func (lm *LogFileManager) Close()
- func (lm *LogFileManager) CountExtraSSTTotalKVs(ctx context.Context) (int64, error)
- func (lm *LogFileManager) FilterDataFiles(m MetaNameIter) LogIter
- func (lm *LogFileManager) FilterMetaFiles(ms MetaNameIter) MetaGroupIter
- func (lm *LogFileManager) GetCompactionIter(ctx context.Context) iter.TryNextor[SSTs]
- func (lm *LogFileManager) GetIngestedSSTs(ctx context.Context) iter.TryNextor[SSTs]
- func (lm *LogFileManager) LoadDDLFiles(ctx context.Context) ([]Log, error)
- func (lm *LogFileManager) LoadDMLFiles(ctx context.Context) (LogIter, error)
- func (lm *LogFileManager) ReadFilteredEntriesFromFiles(ctx context.Context, file Log, filterTS uint64) ([]*KvEntryWithTS, []*KvEntryWithTS, error)
- func (lm *LogFileManager) ShiftTS() uint64
- func (lm *LogFileManager) ShouldFilterOutByTs(d *backuppb.DataFileInfo) bool
- type LogFileManagerInit
- type LogFilesSkipMap
- type LogFilesSkipMapExt
- type LogIter
- type LogRestoreManager
- type LogSplitStrategy
- type Meta
- type MetaGroupIter
- type MetaIter
- type MetaKVInfoProcessor
- func (mp *MetaKVInfoProcessor) GetTableHistoryManager() *stream.LogBackupTableHistoryManager
- func (mp *MetaKVInfoProcessor) GetTableMappingManager() *stream.TableMappingManager
- func (mp *MetaKVInfoProcessor) ProcessBatch(ctx context.Context, files []*backuppb.DataFileInfo, entries []*KvEntryWithTS, ...) ([]*KvEntryWithTS, error)
- func (mp *MetaKVInfoProcessor) ReadMetaKVFilesAndBuildInfo(ctx context.Context, files []*backuppb.DataFileInfo) error
- type MetaMigrationsIter
- type MetaName
- type MetaNameIter
- type MetaWithMigrations
- type PhysicalMigrationsIter
- type PhysicalWithMigrations
- type RPCResult
- type RangeController
- type RegionFunc
- type RestoreMetaKVProcessor
- type RetryStrategy
- type RewrittenSSTs
- type SSTs
- type SstRestoreManager
- type SubCompactionIter
- type WithMigrations
- func (wm *WithMigrations) Compactions(ctx context.Context, s storage.ExternalStorage) iter.TryNextor[*backuppb.LogFileSubcompaction]
- func (wm *WithMigrations) IngestedSSTs(ctx context.Context, s storage.ExternalStorage) iter.TryNextor[*backuppb.IngestedSSTs]
- func (wm *WithMigrations) Metas(metaNameIter MetaNameIter) MetaMigrationsIter
- type WithMigrationsBuilder
Constants ¶
const ( CompactedSSTsType = 1 CopiedSSTsType = 2 )
const MetaKVBatchSize = 64 * 1024 * 1024
const PITRIdMapBlockSize int = 524288
const UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL = "UNSAFE_PITR_LOG_RESTORE_START_BEFORE_ANY_UPSTREAM_USER_DDL"
Variables ¶
This section is empty.
Functions ¶
func LoadAndProcessMetaKVFilesInBatch ¶
func LoadAndProcessMetaKVFilesInBatch( ctx context.Context, defaultFiles []*backuppb.DataFileInfo, writeFiles []*backuppb.DataFileInfo, processor BatchMetaKVProcessor, ) error
LoadAndProcessMetaKVFilesInBatch restores meta kv files to TiKV in strict TS order. It does so in batch and after success it triggers an update so every TiDB node can pick up the restored content.
func LoadMigrations ¶
func PutRawKvWithRetry ¶
func SeparateAndSortFilesByCF ¶
func SeparateAndSortFilesByCF(files []*backuppb.DataFileInfo) ([]*backuppb.DataFileInfo, []*backuppb.DataFileInfo)
SeparateAndSortFilesByCF filters and sorts files by column family. It separates files into write CF and default CF groups and then sorts them within each CF group.
func SortMetaKVFiles ¶
func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo
Types ¶
type BatchMetaKVProcessor ¶
type BatchMetaKVProcessor interface { // ProcessBatch processes a batch of files and with a filterTS and return what's not processed for next iteration ProcessBatch( ctx context.Context, files []*backuppb.DataFileInfo, entries []*KvEntryWithTS, filterTS uint64, cf string, ) ([]*KvEntryWithTS, error) }
BatchMetaKVProcessor defines how to process a batch of files
type CompactedFileSplitStrategy ¶
type CompactedFileSplitStrategy struct { *split.BaseSplitStrategy // contains filtered or unexported fields }
func NewCompactedFileSplitStrategy ¶
func NewCompactedFileSplitStrategy( rules map[int64]*utils.RewriteRules, checkpointsSet map[string]struct{}, updateStatsFn func(uint64, uint64), ) *CompactedFileSplitStrategy
func (*CompactedFileSplitStrategy) Accumulate ¶
func (cs *CompactedFileSplitStrategy) Accumulate(ssts SSTs)
func (*CompactedFileSplitStrategy) ShouldSkip ¶
func (cs *CompactedFileSplitStrategy) ShouldSkip(ssts SSTs) bool
func (*CompactedFileSplitStrategy) ShouldSplit ¶
func (cs *CompactedFileSplitStrategy) ShouldSplit() bool
type CompactedSSTs ¶
type CompactedSSTs struct {
*backuppb.LogFileSubcompaction
}
func (*CompactedSSTs) GetSSTs ¶
func (s *CompactedSSTs) GetSSTs() []*backuppb.File
func (*CompactedSSTs) SetSSTs ¶
func (s *CompactedSSTs) SetSSTs(files []*backuppb.File)
func (*CompactedSSTs) String ¶
func (s *CompactedSSTs) String() string
func (*CompactedSSTs) TableID ¶
func (s *CompactedSSTs) TableID() int64
func (*CompactedSSTs) Type ¶
func (s *CompactedSSTs) Type() int
type CopiedSST ¶
type CopiedSST struct { File *backuppb.File Rewritten backuppb.RewrittenTableID // contains filtered or unexported fields }
func (*CopiedSST) RewrittenTo ¶
type DDLMetaGroup ¶
type DDLMetaGroup struct { Path string FileMetas []*backuppb.DataFileInfo }
type FileIndex ¶
type FileIndex = iter.Indexed[*backuppb.DataFileInfo]
FileIndex is the type of logical data file with index from physical data file.
type FileIndexIter ¶
FileIndexIter is the type of iterator of logical data file with index from physical data file.
type FilesInRegion ¶
type FilesInRegion struct {
// contains filtered or unexported fields
}
type FilesInTable ¶
type FilesInTable struct {
// contains filtered or unexported fields
}
type FullBackupStorageConfig ¶
type FullBackupStorageConfig struct { Backend *backuppb.StorageBackend Opts *storage.ExternalStorageOptions }
type GetIDMapConfig ¶
type GetIDMapConfig struct { // required LoadSavedIDMap bool // optional FullBackupStorageConfig *FullBackupStorageConfig CipherInfo *backuppb.CipherInfo // generated at full restore step that contains all the table ids that need to restore PiTRTableTracker *utils.PiTRIdTracker }
type GroupIndex ¶
type GroupIndex = iter.Indexed[*backuppb.DataFileGroup]
GroupIndex is the type of physical data file with index from metadata.
type GroupIndexIter ¶
type GroupIndexIter = iter.TryNextor[GroupIndex]
GroupIndexIter is the type of iterator of physical data file with index from metadata.
type KvEntryWithTS ¶
KvEntryWithTS is kv entry with ts, the ts is decoded from entry.
type LockedMigrations ¶
type LockedMigrations struct { Migs []*backuppb.Migration ReadLock storage.RemoteLock }
type Log ¶
type Log = *backuppb.DataFileInfo
Log is the metadata of one file recording KV sequences.
type LogClient ¶
type LogClient struct { *LogFileManager // contains filtered or unexported fields }
func NewLogClient ¶
func NewLogClient( pdClient pd.Client, pdHTTPCli pdhttp.Client, tlsConf *tls.Config, keepaliveConf keepalive.ClientParameters, ) *LogClient
NewLogClient returns a new LogClient.
func (*LogClient) CleanUpKVFiles ¶
func (*LogClient) FailpointDoChecksumForLogRestore ¶
func (rc *LogClient) FailpointDoChecksumForLogRestore( ctx context.Context, kvClient kv.Client, pdClient pd.Client, rewriteRules map[int64]*restoreutils.RewriteRules, ) (finalErr error)
FailpointDoChecksumForLogRestore is called by failpoint and is only used for tests. It prints the checksum result into the log, and the auto-test script records it to compare with another cluster's checksum.
func (*LogClient) GenGlobalID ¶
GenGlobalID generates a global id by transaction way.
func (*LogClient) GenGlobalIDs ¶
GenGlobalIDs generates several global ids by transaction way.
func (*LogClient) GetBaseIDMap ¶
func (rc *LogClient) GetBaseIDMap( ctx context.Context, cfg *GetIDMapConfig, ) (map[stream.UpstreamID]*stream.DBReplace, error)
GetBaseIDMap gets the id map in one of the following ways: 1. from a previously saved id map, if the same task has been running and already built/saved the id map but failed later; 2. from a previous, different task (a PiTR job might be split into multiple runs/tasks, and each task only restores a subset of the entire job); 3. from the full backup snapshot, if specified.
func (*LogClient) GetClusterID ¶
GetClusterID gets the cluster id from down-stream cluster.
func (*LogClient) GetGCRows ¶
func (rc *LogClient) GetGCRows() []*stream.PreDelRangeQuery
GetGCRows is only for unit tests.
func (*LogClient) GetMigrations ¶
func (rc *LogClient) GetMigrations(ctx context.Context) (*LockedMigrations, error)
func (*LogClient) InitCheckpointMetadataForCompactedSstRestore ¶
func (*LogClient) InitClients ¶
func (*LogClient) InsertGCRows ¶
InsertGCRows inserts the queries into the table `gc_delete_range`.
func (*LogClient) InstallLogFileManager ¶
func (*LogClient) LoadOrCreateCheckpointMetadataForLogRestore ¶
func (rc *LogClient) LoadOrCreateCheckpointMetadataForLogRestore( ctx context.Context, startTS, restoredTS uint64, gcRatio string, tiflashRecorder *tiflashrec.TiFlashRecorder, ) (string, error)
func (*LogClient) RecordDeleteRange ¶
func (rc *LogClient) RecordDeleteRange(sql *stream.PreDelRangeQuery)
func (*LogClient) RepairIngestIndex ¶
func (rc *LogClient) RepairIngestIndex(ctx context.Context, ingestRecorder *ingestrec.IngestRecorder, g glue.Glue) error
RepairIngestIndex drops the indexes from IngestRecorder and re-add them.
func (*LogClient) RestoreBatchMetaKVFiles ¶
func (rc *LogClient) RestoreBatchMetaKVFiles( ctx context.Context, files []*backuppb.DataFileInfo, schemasReplace *stream.SchemasReplace, kvEntries []*KvEntryWithTS, filterTS uint64, updateStats func(kvCount uint64, size uint64), progressInc func(), cf string, ) ([]*KvEntryWithTS, error)
RestoreBatchMetaKVFiles tries to restore and rewrite meta kv to TiKV from external storage. It reads out entries from the given files and only restores those that are in the filter range, then it returns the entries that are out of the filter range back to the caller for the next iteration of restore.
func (*LogClient) RestoreKVFiles ¶
func (rc *LogClient) RestoreKVFiles( ctx context.Context, rules map[int64]*restoreutils.RewriteRules, logIter LogIter, pitrBatchCount uint32, pitrBatchSize uint32, updateStats func(kvCount uint64, size uint64), onProgress func(cnt int64), cipherInfo *backuppb.CipherInfo, masterKeys []*encryptionpb.MasterKey, ) error
func (*LogClient) RestoreSSTFiles ¶
func (rc *LogClient) RestoreSSTFiles( ctx context.Context, compactionsIter iter.TryNextor[SSTs], rules map[int64]*restoreutils.RewriteRules, importModeSwitcher *restore.ImportModeSwitcher, onProgress func(int64), ) error
func (*LogClient) RestoreSSTStatisticFields ¶
func (*LogClient) RunGCRowsLoader ¶
RunGCRowsLoader uses a channel to save the delete-range queries, making it thread-safe.
func (*LogClient) SaveIdMapWithFailPoints ¶
func (*LogClient) SetCrypter ¶
func (rc *LogClient) SetCrypter(crypter *backuppb.CipherInfo)
func (*LogClient) SetCurrentTS ¶
func (*LogClient) SetRawKVBatchClient ¶
func (*LogClient) SetStorage ¶
func (rc *LogClient) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error
func (*LogClient) SetUpstreamClusterID ¶
func (*LogClient) UpdateSchemaVersionFullReload ¶
UpdateSchemaVersionFullReload updates schema version to trigger a full reload in transaction way.
func (*LogClient) WrapCompactedFilesIterWithSplitHelper ¶
func (rc *LogClient) WrapCompactedFilesIterWithSplitHelper( ctx context.Context, compactedIter iter.TryNextor[SSTs], rules map[int64]*restoreutils.RewriteRules, checkpointSets map[string]struct{}, updateStatsFn func(uint64, uint64), splitSize uint64, splitKeys int64, ) (iter.TryNextor[SSTs], error)
WrapCompactedFilesIterWithSplitHelper applies a splitting strategy to the compacted files iterator. It uses a region splitter to handle the splitting logic based on the provided rules and checkpoint sets.
func (*LogClient) WrapLogFilesIterWithSplitHelper ¶
func (rc *LogClient) WrapLogFilesIterWithSplitHelper( ctx context.Context, logIter LogIter, execCtx sqlexec.RestrictedSQLExecutor, rules map[int64]*restoreutils.RewriteRules, updateStatsFn func(uint64, uint64), splitSize uint64, splitKeys int64, ) (LogIter, error)
WrapLogFilesIterWithSplitHelper applies a splitting strategy to the log files iterator. It uses a region splitter to handle the splitting logic based on the provided rules.
type LogDataFileInfo ¶
type LogDataFileInfo struct { *backuppb.DataFileInfo MetaDataGroupName string OffsetInMetaGroup int OffsetInMergedGroup int }
type LogFileImporter ¶
type LogFileImporter struct {
// contains filtered or unexported fields
}
func NewLogFileImporter ¶
func NewLogFileImporter( metaClient split.SplitClient, importClient importclient.ImporterClient, backend *backuppb.StorageBackend, ) *LogFileImporter
NewLogFileImporter returns a new LogFileImporter.
func (*LogFileImporter) ClearFiles ¶
func (*LogFileImporter) Close ¶
func (importer *LogFileImporter) Close() error
func (*LogFileImporter) ImportKVFiles ¶
func (importer *LogFileImporter) ImportKVFiles( ctx context.Context, files []*LogDataFileInfo, rule *restoreutils.RewriteRules, shiftStartTS uint64, startTS uint64, restoreTS uint64, supportBatch bool, cipherInfo *backuppb.CipherInfo, masterKeys []*encryptionpb.MasterKey, ) error
ImportKVFiles restores the kv events.
type LogFileManager ¶
type LogFileManager struct { // The output channel for statistics. // This will be collected when reading the metadata. Stats *logFilesStatistic // contains filtered or unexported fields }
LogFileManager is the manager for log files of a certain restoration, which supports read / filter from the log backup archive with static start TS / restore TS.
func CreateLogFileManager ¶
func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*LogFileManager, error)
CreateLogFileManager creates a log file manager using the specified config. Generally the config cannot be changed during its lifetime.
func (*LogFileManager) BuildMigrations ¶
func (lm *LogFileManager) BuildMigrations(migs []*backuppb.Migration)
func (*LogFileManager) Close ¶
func (lm *LogFileManager) Close()
func (*LogFileManager) CountExtraSSTTotalKVs ¶
func (lm *LogFileManager) CountExtraSSTTotalKVs(ctx context.Context) (int64, error)
func (*LogFileManager) FilterDataFiles ¶
func (lm *LogFileManager) FilterDataFiles(m MetaNameIter) LogIter
func (*LogFileManager) FilterMetaFiles ¶
func (lm *LogFileManager) FilterMetaFiles(ms MetaNameIter) MetaGroupIter
func (*LogFileManager) GetCompactionIter ¶
GetCompactionIter fetches compactions that may contain file less than the TS.
func (*LogFileManager) GetIngestedSSTs ¶
func (*LogFileManager) LoadDDLFiles ¶
func (lm *LogFileManager) LoadDDLFiles(ctx context.Context) ([]Log, error)
LoadDDLFiles loads all DDL files that need to be restored in the restoration. This function returns all of the DDL files directly because we need to sort all of them.
func (*LogFileManager) LoadDMLFiles ¶
func (lm *LogFileManager) LoadDMLFiles(ctx context.Context) (LogIter, error)
LoadDMLFiles loads all DML files that need to be restored in the restoration. This function returns a stream, because there are usually many DML files that need to be restored.
func (*LogFileManager) ReadFilteredEntriesFromFiles ¶
func (lm *LogFileManager) ReadFilteredEntriesFromFiles( ctx context.Context, file Log, filterTS uint64, ) ([]*KvEntryWithTS, []*KvEntryWithTS, error)
ReadFilteredEntriesFromFiles loads the content of a log file from external storage, and filters out entries based on TS.
func (*LogFileManager) ShiftTS ¶
func (lm *LogFileManager) ShiftTS() uint64
func (*LogFileManager) ShouldFilterOutByTs ¶
func (lm *LogFileManager) ShouldFilterOutByTs(d *backuppb.DataFileInfo) bool
ShouldFilterOutByTs checks whether a file should be filtered out via the current client.
type LogFileManagerInit ¶
type LogFileManagerInit struct { StartTS uint64 RestoreTS uint64 Storage storage.ExternalStorage MigrationsBuilder *WithMigrationsBuilder Migrations *WithMigrations MetadataDownloadBatchSize uint EncryptionManager *encryption.Manager }
LogFileManagerInit is the config needed for initializing the log file manager.
type LogFilesSkipMap ¶
type LogFilesSkipMap struct {
// contains filtered or unexported fields
}
func NewLogFilesSkipMap ¶
func NewLogFilesSkipMap() *LogFilesSkipMap
func (*LogFilesSkipMap) Insert ¶
func (m *LogFilesSkipMap) Insert(metaKey string, groupOff, fileOff int)
type LogFilesSkipMapExt ¶
type LogFilesSkipMapExt struct {
// contains filtered or unexported fields
}
func NewLogFilesSkipMapExt ¶
func NewLogFilesSkipMapExt() *LogFilesSkipMapExt
func (*LogFilesSkipMapExt) Insert ¶
func (m *LogFilesSkipMapExt) Insert(metaKey string, groupOff, fileOff int)
func (*LogFilesSkipMapExt) NeedSkip ¶
func (m *LogFilesSkipMapExt) NeedSkip(metaKey string, groupOff, fileOff int) bool
func (*LogFilesSkipMapExt) SkipGroup ¶
func (m *LogFilesSkipMapExt) SkipGroup(metaKey string, groupOff int)
func (*LogFilesSkipMapExt) SkipMeta ¶
func (m *LogFilesSkipMapExt) SkipMeta(metaKey string)
type LogIter ¶
type LogIter = iter.TryNextor[*LogDataFileInfo]
LogIter is the type of iterator of each log files' meta information.
func WrapLogFilesIterWithCheckpointFailpoint ¶
func WrapLogFilesIterWithCheckpointFailpoint( v failpoint.Value, logIter LogIter, rules map[int64]*restoreutils.RewriteRules, ) (LogIter, error)
type LogRestoreManager ¶
type LogRestoreManager struct {
// contains filtered or unexported fields
}
LogRestoreManager is a comprehensive wrapper that encapsulates all logic related to log restoration, including concurrency management, checkpoint handling, and file importing for efficient log processing.
func NewLogRestoreManager ¶
func NewLogRestoreManager( ctx context.Context, fileImporter *LogFileImporter, poolSize uint, createCheckpointSessionFn func() (glue.Session, error), ) (*LogRestoreManager, error)
func (*LogRestoreManager) Close ¶
func (l *LogRestoreManager) Close(ctx context.Context)
type LogSplitStrategy ¶
type LogSplitStrategy struct { *split.BaseSplitStrategy // contains filtered or unexported fields }
func NewLogSplitStrategy ¶
func NewLogSplitStrategy( ctx context.Context, useCheckpoint bool, execCtx sqlexec.RestrictedSQLExecutor, rules map[int64]*restoreutils.RewriteRules, updateStatsFn func(uint64, uint64), ) (*LogSplitStrategy, error)
func (*LogSplitStrategy) Accumulate ¶
func (ls *LogSplitStrategy) Accumulate(file *LogDataFileInfo)
func (*LogSplitStrategy) ShouldSkip ¶
func (ls *LogSplitStrategy) ShouldSkip(file *LogDataFileInfo) bool
func (*LogSplitStrategy) ShouldSplit ¶
func (ls *LogSplitStrategy) ShouldSplit() bool
type MetaGroupIter ¶
type MetaGroupIter = iter.TryNextor[DDLMetaGroup]
MetaGroupIter is the iterator of flushes of metadata.
type MetaKVInfoProcessor ¶
type MetaKVInfoProcessor struct {
// contains filtered or unexported fields
}
MetaKVInfoProcessor implements BatchMetaKVProcessor to iterate meta kv and collect information.
1. It collects table renaming information. The table rename operation will not change the table id, and the process will drop the original table and create a new one with the same table id, so in DDL history there will be two events that correspond to the same table id.
2. It builds the id mapping from upstream to downstream. This logic was nested into table rewrite previously and now separated out to its own component.
func NewMetaKVInfoProcessor ¶
func NewMetaKVInfoProcessor(client *LogClient) *MetaKVInfoProcessor
func (*MetaKVInfoProcessor) GetTableHistoryManager ¶
func (mp *MetaKVInfoProcessor) GetTableHistoryManager() *stream.LogBackupTableHistoryManager
func (*MetaKVInfoProcessor) GetTableMappingManager ¶
func (mp *MetaKVInfoProcessor) GetTableMappingManager() *stream.TableMappingManager
func (*MetaKVInfoProcessor) ProcessBatch ¶
func (mp *MetaKVInfoProcessor) ProcessBatch( ctx context.Context, files []*backuppb.DataFileInfo, entries []*KvEntryWithTS, filterTS uint64, cf string, ) ([]*KvEntryWithTS, error)
func (*MetaKVInfoProcessor) ReadMetaKVFilesAndBuildInfo ¶
func (mp *MetaKVInfoProcessor) ReadMetaKVFilesAndBuildInfo( ctx context.Context, files []*backuppb.DataFileInfo, ) error
type MetaMigrationsIter ¶
type MetaMigrationsIter = iter.TryNextor[*MetaWithMigrations]
type MetaNameIter ¶
MetaNameIter is the type of iterator of metadata files' content with name.
type MetaWithMigrations ¶
type MetaWithMigrations struct {
// contains filtered or unexported fields
}
func (*MetaWithMigrations) Physicals ¶
func (mwm *MetaWithMigrations) Physicals(groupIndexIter GroupIndexIter) PhysicalMigrationsIter
type PhysicalMigrationsIter ¶
type PhysicalMigrationsIter = iter.TryNextor[*PhysicalWithMigrations]
type PhysicalWithMigrations ¶
type PhysicalWithMigrations struct {
// contains filtered or unexported fields
}
func (*PhysicalWithMigrations) Logicals ¶
func (pwm *PhysicalWithMigrations) Logicals(fileIndexIter FileIndexIter) FileIndexIter
type RPCResult ¶
RPCResult is the result after executing some RPCs to TiKV.
func RPCResultFromError ¶
func RPCResultFromPBError ¶
func RPCResultFromPBError(err *import_sstpb.Error) RPCResult
func RPCResultOK ¶
func RPCResultOK() RPCResult
func (*RPCResult) StrategyForRetry ¶
func (r *RPCResult) StrategyForRetry() RetryStrategy
func (*RPCResult) StrategyForRetryGoError ¶
func (r *RPCResult) StrategyForRetryGoError() RetryStrategy
func (*RPCResult) StrategyForRetryStoreError ¶
func (r *RPCResult) StrategyForRetryStoreError() RetryStrategy
type RangeController ¶
type RangeController struct {
// contains filtered or unexported fields
}
RangeController manages the execution of operations over a range of regions. It provides functionality to scan regions within a specified key range and apply a given function to each region, handling errors and retries automatically.
func CreateRangeController ¶
func CreateRangeController(start, end []byte, metaClient split.SplitClient, retryStatus *utils.RetryState) RangeController
CreateRangeController creates a controller that could be used to scan regions in a range and apply a function over these regions. You can then call the `ApplyFuncToRange` method for applying some functions.
func (*RangeController) ApplyFuncToRange ¶
func (o *RangeController) ApplyFuncToRange(ctx context.Context, f RegionFunc) error
ApplyFuncToRange applies the region function `f` to all regions between `o.start` and `o.end`. It would retry errors according to the `RPCResult` returned by `f`.
type RegionFunc ¶
type RegionFunc func(ctx context.Context, r *split.RegionInfo) RPCResult
type RestoreMetaKVProcessor ¶
type RestoreMetaKVProcessor struct {
// contains filtered or unexported fields
}
RestoreMetaKVProcessor implements BatchMetaKVProcessor for restoring files in batches
func NewRestoreMetaKVProcessor ¶
func NewRestoreMetaKVProcessor(client *LogClient, schemasReplace *stream.SchemasReplace, updateStats func(kvCount uint64, size uint64), progressInc func()) *RestoreMetaKVProcessor
func (*RestoreMetaKVProcessor) ProcessBatch ¶
func (rp *RestoreMetaKVProcessor) ProcessBatch( ctx context.Context, files []*backuppb.DataFileInfo, entries []*KvEntryWithTS, filterTS uint64, cf string, ) ([]*KvEntryWithTS, error)
func (*RestoreMetaKVProcessor) RestoreAndRewriteMetaKVFiles ¶
func (rp *RestoreMetaKVProcessor) RestoreAndRewriteMetaKVFiles( ctx context.Context, files []*backuppb.DataFileInfo, ) error
RestoreAndRewriteMetaKVFiles tries to restore files about meta kv-event from stream-backup.
type RetryStrategy ¶
type RetryStrategy int
const ( StrategyGiveUp RetryStrategy = iota StrategyFromThisRegion StrategyFromStart )
type RewrittenSSTs ¶
type RewrittenSSTs interface { // RewrittenTo returns the table ID that the SST should be treated as // when doing filtering. RewrittenTo() int64 }
RewrittenSSTs is an extension to the `SSTs` that needs extra key rewriting. This allows an SST to be restored "as if" it were in another table.
The name "rewritten" means that the SST has already been rewritten somewhere else -- before importing it, we need to "replay" the rewrite on it.
For example, if an SST contains the content of table `1`, `RewrittenTo` returns `10`, and the downstream wants to rewrite table `10` to `100`: - When searching for rewrite rules for the SSTs, we will use the table ID `10`(`RewrittenTo()`). - When importing the SST, we will use the rewrite rule `1`(`TableID()`) -> `100`(RewriteRule).
type SSTs ¶
type SSTs interface { fmt.Stringer Type() int // TableID returns the ID of the table associated with the SST files. // This should be the same as the physical content's table ID. TableID() int64 // GetSSTs returns a slice of pointers to backuppb.File, representing the SST files. GetSSTs() []*backuppb.File // SetSSTs allows the user to override the internal SSTs to be restored. // The input SST set should already be a subset of `GetSSTs.` SetSSTs([]*backuppb.File) }
SSTs is an interface that represents a collection of SST files.
type SstRestoreManager ¶
type SstRestoreManager struct {
// contains filtered or unexported fields
}
SstRestoreManager is a comprehensive wrapper that encapsulates all logic related to sst restoration, including concurrency management, checkpoint handling, and file importing(splitting) for efficient log processing.
func NewSstRestoreManager ¶
func NewSstRestoreManager( ctx context.Context, snapFileImporter *snapclient.SnapFileImporter, concurrencyPerStore uint, storeCount uint, createCheckpointSessionFn func() (glue.Session, error), ) (*SstRestoreManager, error)
func (*SstRestoreManager) Close ¶
func (s *SstRestoreManager) Close(ctx context.Context)
type SubCompactionIter ¶
type SubCompactionIter iter.TryNextor[*backuppb.LogFileSubcompaction]
func Subcompactions ¶
func Subcompactions(ctx context.Context, prefix string, s storage.ExternalStorage, shiftStartTS, restoredTS uint64) SubCompactionIter
type WithMigrations ¶
type WithMigrations struct {
// contains filtered or unexported fields
}
func (*WithMigrations) Compactions ¶
func (wm *WithMigrations) Compactions(ctx context.Context, s storage.ExternalStorage) iter.TryNextor[*backuppb.LogFileSubcompaction]
func (*WithMigrations) IngestedSSTs ¶
func (wm *WithMigrations) IngestedSSTs(ctx context.Context, s storage.ExternalStorage) iter.TryNextor[*backuppb.IngestedSSTs]
func (*WithMigrations) Metas ¶
func (wm *WithMigrations) Metas(metaNameIter MetaNameIter) MetaMigrationsIter
type WithMigrationsBuilder ¶
type WithMigrationsBuilder struct {
// contains filtered or unexported fields
}
func (*WithMigrationsBuilder) Build ¶
func (builder *WithMigrationsBuilder) Build(migs []*backuppb.Migration) WithMigrations
Build creates the wrapper from the given migrations.
func (*WithMigrationsBuilder) SetShiftStartTS ¶
func (builder *WithMigrationsBuilder) SetShiftStartTS(ts uint64)