logclient

package
v1.1.0-beta.0...-d9dc3f4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2025 License: Apache-2.0, Apache-2.0 Imports: 71 Imported by: 0

Documentation

Index

Constants

View Source
const MetaKVBatchSize = 64 * 1024 * 1024
View Source
const PITRIdMapBlockSize int = 524288
View Source
const UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL = "UNSAFE_PITR_LOG_RESTORE_START_BEFORE_ANY_UPSTREAM_USER_DDL"

Variables

View Source
var TotalEntryCount int64

Functions

func ApplyKVFilesWithBatchMethod

func ApplyKVFilesWithBatchMethod(
	ctx context.Context,
	logIter LogIter,
	batchCount int,
	batchSize uint64,
	applyFunc func(files []*LogDataFileInfo, kvCount int64, size uint64),
	applyWg *sync.WaitGroup,
) error

func ApplyKVFilesWithSingleMethod

func ApplyKVFilesWithSingleMethod(
	ctx context.Context,
	files LogIter,
	applyFunc func(file []*LogDataFileInfo, kvCount int64, size uint64),
	applyWg *sync.WaitGroup,
) error

func RestoreMetaKVFilesWithBatchMethod

func RestoreMetaKVFilesWithBatchMethod(
	ctx context.Context,
	defaultFiles []*backuppb.DataFileInfo,
	writeFiles []*backuppb.DataFileInfo,
	schemasReplace *stream.SchemasReplace,
	updateStats func(kvCount uint64, size uint64),
	progressInc func(),
	restoreBatch func(
		ctx context.Context,
		files []*backuppb.DataFileInfo,
		schemasReplace *stream.SchemasReplace,
		kvEntries []*KvEntryWithTS,
		filterTS uint64,
		updateStats func(kvCount uint64, size uint64),
		progressInc func(),
		cf string,
	) ([]*KvEntryWithTS, error),
) error

func SortMetaKVFiles

func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo

Types

type BuildTableMappingManagerConfig

type BuildTableMappingManagerConfig struct {
	// required
	CurrentIdMapSaved bool
	TableFilter       filter.Filter

	// optional
	FullBackupStorage *FullBackupStorageConfig
	CipherInfo        *backuppb.CipherInfo
	Files             []*backuppb.DataFileInfo
}

type CompactedFileSplitStrategy

type CompactedFileSplitStrategy struct {
	*split.BaseSplitStrategy
	// contains filtered or unexported fields
}

func NewCompactedFileSplitStrategy

func NewCompactedFileSplitStrategy(
	rules map[int64]*restoreutils.RewriteRules,
	checkpointsSet map[string]struct{},
	updateStatsFn func(uint64, uint64),
) *CompactedFileSplitStrategy

func (*CompactedFileSplitStrategy) Accumulate

func (cs *CompactedFileSplitStrategy) Accumulate(subCompaction *backuppb.LogFileSubcompaction)

func (*CompactedFileSplitStrategy) ShouldSkip

func (cs *CompactedFileSplitStrategy) ShouldSkip(subCompaction *backuppb.LogFileSubcompaction) bool

func (*CompactedFileSplitStrategy) ShouldSplit

func (cs *CompactedFileSplitStrategy) ShouldSplit() bool

type DDLMetaGroup

type DDLMetaGroup struct {
	Path      string
	FileMetas []*backuppb.DataFileInfo
}

type FileIndex

type FileIndex = iter.Indexed[*backuppb.DataFileInfo]

FileIndex is the type of logical data file with index from physical data file.

type FileIndexIter

type FileIndexIter = iter.TryNextor[FileIndex]

FileIndexIter is the type of iterator of logical data file with index from physical data file.

type FilesInRegion

type FilesInRegion struct {
	// contains filtered or unexported fields
}

type FilesInTable

type FilesInTable struct {
	// contains filtered or unexported fields
}

type FullBackupStorageConfig

type FullBackupStorageConfig struct {
	Backend *backuppb.StorageBackend
	Opts    *storage.ExternalStorageOptions
}

type GroupIndex

type GroupIndex = iter.Indexed[*backuppb.DataFileGroup]

GroupIndex is the type of physical data file with index from metadata.

type GroupIndexIter

type GroupIndexIter = iter.TryNextor[GroupIndex]

GroupIndexIter is the type of iterator of physical data file with index from metadata.

type KvEntryWithTS

type KvEntryWithTS struct {
	E  kv.Entry
	Ts uint64
}

KvEntryWithTS is a kv entry together with its ts; the ts is decoded from the entry.

type Log

type Log = *backuppb.DataFileInfo

Log is the metadata of one file recording KV sequences.

type LogClient

type LogClient struct {
	*LogFileManager
	// contains filtered or unexported fields
}

func NewRestoreClient

func NewRestoreClient(
	pdClient pd.Client,
	pdHTTPCli pdhttp.Client,
	tlsConf *tls.Config,
	keepaliveConf keepalive.ClientParameters,
) *LogClient

NewRestoreClient returns a new LogClient.

func (*LogClient) BuildTableMappingManager

func (rc *LogClient) BuildTableMappingManager(
	ctx context.Context,
	cfg *BuildTableMappingManagerConfig,
) (*stream.TableMappingManager, error)

BuildTableMappingManager builds the table mapping manager. It reads the full backup storage to get the full backup table info to initialize the manager, or it reads the id map from the previous task, or it loads the mapping saved by the last run of the same task.

func (*LogClient) CleanUpKVFiles

func (rc *LogClient) CleanUpKVFiles(
	ctx context.Context,
) error

func (*LogClient) Close

func (rc *LogClient) Close(ctx context.Context)

Close a client.

func (*LogClient) CurrentTS

func (rc *LogClient) CurrentTS() uint64

func (*LogClient) FailpointDoChecksumForLogRestore

func (rc *LogClient) FailpointDoChecksumForLogRestore(
	ctx context.Context,
	kvClient kv.Client,
	pdClient pd.Client,
	rewriteRules map[int64]*restoreutils.RewriteRules,
) (finalErr error)

FailpointDoChecksumForLogRestore is called by failpoint and is only used for tests. It prints the checksum result into the log, and the auto-test script records it to compare with another cluster's checksum.

func (*LogClient) GenGlobalID

func (rc *LogClient) GenGlobalID(ctx context.Context) (int64, error)

GenGlobalID generates a global id by transaction way.

func (*LogClient) GenGlobalIDs

func (rc *LogClient) GenGlobalIDs(ctx context.Context, n int) ([]int64, error)

GenGlobalIDs generates several global ids by transaction way.

func (*LogClient) GetClusterID

func (rc *LogClient) GetClusterID(ctx context.Context) uint64

GetClusterID gets the cluster id from down-stream cluster.

func (*LogClient) GetDomain

func (rc *LogClient) GetDomain() *domain.Domain

func (*LogClient) GetGCRows

func (rc *LogClient) GetGCRows() []*stream.PreDelRangeQuery

GetGCRows is only for unit tests.

func (*LogClient) GetMigrations

func (rc *LogClient) GetMigrations(ctx context.Context) ([]*backuppb.Migration, error)

func (*LogClient) Init

func (rc *LogClient) Init(ctx context.Context, g glue.Glue, store kv.Storage) error

Init creates the db connection and domain for the storage.

func (*LogClient) InitCheckpointMetadataForCompactedSstRestore

func (rc *LogClient) InitCheckpointMetadataForCompactedSstRestore(
	ctx context.Context,
) (map[string]struct{}, error)

func (*LogClient) InitCheckpointMetadataForLogRestore

func (rc *LogClient) InitCheckpointMetadataForLogRestore(
	ctx context.Context,
	startTS, restoredTS uint64,
	gcRatio string,
	tiflashRecorder *tiflashrec.TiFlashRecorder,
) (string, error)

func (*LogClient) InitClients

func (rc *LogClient) InitClients(
	ctx context.Context,
	backend *backuppb.StorageBackend,
	createSessionFn func() (glue.Session, error),
	concurrency uint,
	concurrencyPerStore uint,
) error

func (*LogClient) InsertGCRows

func (rc *LogClient) InsertGCRows(ctx context.Context) error

InsertGCRows inserts the queries into the table `gc_delete_range`.

func (*LogClient) InstallLogFileManager

func (rc *LogClient) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64, metadataDownloadBatchSize uint,
	encryptionManager *encryption.Manager) error

func (*LogClient) IterMetaKVToBuildAndSaveIdMap

func (rc *LogClient) IterMetaKVToBuildAndSaveIdMap(
	ctx context.Context,
	tableMappingManager *stream.TableMappingManager,
	files []*backuppb.DataFileInfo,
) error

IterMetaKVToBuildAndSaveIdMap iterates meta kv and builds id mapping and saves it to storage.

func (*LogClient) RecordDeleteRange

func (rc *LogClient) RecordDeleteRange(sql *stream.PreDelRangeQuery)

func (*LogClient) RepairIngestIndex

func (rc *LogClient) RepairIngestIndex(ctx context.Context, ingestRecorder *ingestrec.IngestRecorder, g glue.Glue) error

RepairIngestIndex drops the indexes from IngestRecorder and re-add them.

func (*LogClient) RestoreAndRewriteMetaKVFiles

func (rc *LogClient) RestoreAndRewriteMetaKVFiles(
	ctx context.Context,
	files []*backuppb.DataFileInfo,
	schemasReplace *stream.SchemasReplace,
	updateStats func(kvCount uint64, size uint64),
	progressInc func(),
) error

RestoreAndRewriteMetaKVFiles tries to restore files about meta kv-event from stream-backup.

func (*LogClient) RestoreBatchMetaKVFiles

func (rc *LogClient) RestoreBatchMetaKVFiles(
	ctx context.Context,
	files []*backuppb.DataFileInfo,
	schemasReplace *stream.SchemasReplace,
	kvEntries []*KvEntryWithTS,
	filterTS uint64,
	updateStats func(kvCount uint64, size uint64),
	progressInc func(),
	cf string,
) ([]*KvEntryWithTS, error)

func (*LogClient) RestoreCompactedSstFiles

func (rc *LogClient) RestoreCompactedSstFiles(
	ctx context.Context,
	compactionsIter iter.TryNextor[*backuppb.LogFileSubcompaction],
	rules map[int64]*restoreutils.RewriteRules,
	importModeSwitcher *restore.ImportModeSwitcher,
	onProgress func(int64),
) error

func (*LogClient) RestoreKVFiles

func (rc *LogClient) RestoreKVFiles(
	ctx context.Context,
	rules map[int64]*restoreutils.RewriteRules,
	logIter LogIter,
	pitrBatchCount uint32,
	pitrBatchSize uint32,
	updateStats func(kvCount uint64, size uint64),
	onProgress func(cnt int64),
	cipherInfo *backuppb.CipherInfo,
	masterKeys []*encryptionpb.MasterKey,
) error

func (*LogClient) RunGCRowsLoader

func (rc *LogClient) RunGCRowsLoader(ctx context.Context)

RunGCRowsLoader uses a channel to save the delete-range queries to make it thread-safe.

func (*LogClient) SetCrypter

func (rc *LogClient) SetCrypter(crypter *backuppb.CipherInfo)

func (*LogClient) SetCurrentTS

func (rc *LogClient) SetCurrentTS(ts uint64) error

func (*LogClient) SetRawKVBatchClient

func (rc *LogClient) SetRawKVBatchClient(
	ctx context.Context,
	pdAddrs []string,
	security config.Security,
) error

func (*LogClient) SetStorage

func (rc *LogClient) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error

func (*LogClient) SetUpstreamClusterID

func (rc *LogClient) SetUpstreamClusterID(upstreamClusterID uint64)

func (*LogClient) UpdateSchemaVersion

func (rc *LogClient) UpdateSchemaVersion(ctx context.Context) error

UpdateSchemaVersion updates schema version by transaction way.

func (*LogClient) WrapCompactedFilesIterWithSplitHelper

func (rc *LogClient) WrapCompactedFilesIterWithSplitHelper(
	ctx context.Context,
	compactedIter iter.TryNextor[*backuppb.LogFileSubcompaction],
	rules map[int64]*restoreutils.RewriteRules,
	checkpointSets map[string]struct{},
	updateStatsFn func(uint64, uint64),
	splitSize uint64,
	splitKeys int64,
) (iter.TryNextor[*backuppb.LogFileSubcompaction], error)

WrapCompactedFilesIterWithSplitHelper applies a splitting strategy to the compacted files iterator. It uses a region splitter to handle the splitting logic based on the provided rules and checkpoint sets.

func (*LogClient) WrapLogFilesIterWithSplitHelper

func (rc *LogClient) WrapLogFilesIterWithSplitHelper(
	ctx context.Context,
	logIter LogIter,
	execCtx sqlexec.RestrictedSQLExecutor,
	rules map[int64]*restoreutils.RewriteRules,
	updateStatsFn func(uint64, uint64),
	splitSize uint64,
	splitKeys int64,
) (LogIter, error)

WrapLogFilesIterWithSplitHelper applies a splitting strategy to the log files iterator. It uses a region splitter to handle the splitting logic based on the provided rules.

type LogDataFileInfo

type LogDataFileInfo struct {
	*backuppb.DataFileInfo
	MetaDataGroupName   string
	OffsetInMetaGroup   int
	OffsetInMergedGroup int
}

type LogFileImporter

type LogFileImporter struct {
	// contains filtered or unexported fields
}

func NewLogFileImporter

func NewLogFileImporter(
	metaClient split.SplitClient,
	importClient importclient.ImporterClient,
	backend *backuppb.StorageBackend,
) *LogFileImporter

NewLogFileImporter returns a new log file importer.

func (*LogFileImporter) ClearFiles

func (importer *LogFileImporter) ClearFiles(ctx context.Context, pdClient pd.Client, prefix string) error

func (*LogFileImporter) Close

func (importer *LogFileImporter) Close() error

func (*LogFileImporter) ImportKVFiles

func (importer *LogFileImporter) ImportKVFiles(
	ctx context.Context,
	files []*LogDataFileInfo,
	rule *restoreutils.RewriteRules,
	shiftStartTS uint64,
	startTS uint64,
	restoreTS uint64,
	supportBatch bool,
	cipherInfo *backuppb.CipherInfo,
	masterKeys []*encryptionpb.MasterKey,
) error

ImportKVFiles restores the kv events.

type LogFileManager

type LogFileManager struct {
	// contains filtered or unexported fields
}

LogFileManager is the manager for log files of a certain restoration, which supports read / filter from the log backup archive with static start TS / restore TS.

func CreateLogFileManager

func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*LogFileManager, error)

CreateLogFileManager creates a log file manager using the specified config. Generally the config cannot be changed during its lifetime.

func (*LogFileManager) BuildMigrations

func (rc *LogFileManager) BuildMigrations(migs []*backuppb.Migration)

func (*LogFileManager) FilterDataFiles

func (rc *LogFileManager) FilterDataFiles(m MetaNameIter) LogIter

func (*LogFileManager) FilterMetaFiles

func (rc *LogFileManager) FilterMetaFiles(ms MetaNameIter) MetaGroupIter

func (*LogFileManager) GetCompactionIter

GetCompactionIter fetches compactions that may contain files less than the TS.

func (*LogFileManager) LoadDDLFilesAndCountDMLFiles

func (rc *LogFileManager) LoadDDLFilesAndCountDMLFiles(ctx context.Context) ([]Log, error)

LoadDDLFilesAndCountDMLFiles loads all DDL files that need to be restored in the restoration. This function returns all the needed DDL files directly because we need to sort all of them.

func (*LogFileManager) LoadDMLFiles

func (rc *LogFileManager) LoadDMLFiles(ctx context.Context) (LogIter, error)

LoadDMLFiles loads all DML files that need to be restored in the restoration. This function returns a stream, because there are usually many DML files that need to be restored.

func (*LogFileManager) ReadAllEntries

func (rc *LogFileManager) ReadAllEntries(
	ctx context.Context,
	file Log,
	filterTS uint64,
) ([]*KvEntryWithTS, []*KvEntryWithTS, error)

ReadAllEntries loads the content of a log file, filtering out unneeded entries.

func (*LogFileManager) ShiftTS

func (rc *LogFileManager) ShiftTS() uint64

func (*LogFileManager) ShouldFilterOut

func (rc *LogFileManager) ShouldFilterOut(d *backuppb.DataFileInfo) bool

ShouldFilterOut checks whether a file should be filtered out via the current client.

type LogFileManagerInit

type LogFileManagerInit struct {
	StartTS   uint64
	RestoreTS uint64
	Storage   storage.ExternalStorage

	MigrationsBuilder         *WithMigrationsBuilder
	Migrations                *WithMigrations
	MetadataDownloadBatchSize uint
	EncryptionManager         *encryption.Manager
}

LogFileManagerInit is the config needed for initializing the log file manager.

type LogFilesSkipMap

type LogFilesSkipMap struct {
	// contains filtered or unexported fields
}

func NewLogFilesSkipMap

func NewLogFilesSkipMap() *LogFilesSkipMap

func (*LogFilesSkipMap) Insert

func (m *LogFilesSkipMap) Insert(metaKey string, groupOff, fileOff int)

func (*LogFilesSkipMap) NeedSkip

func (m *LogFilesSkipMap) NeedSkip(metaKey string, groupOff, fileOff int) bool

type LogFilesSkipMapExt

type LogFilesSkipMapExt struct {
	// contains filtered or unexported fields
}

func NewLogFilesSkipMapExt

func NewLogFilesSkipMapExt() *LogFilesSkipMapExt

func (*LogFilesSkipMapExt) Insert

func (m *LogFilesSkipMapExt) Insert(metaKey string, groupOff, fileOff int)

func (*LogFilesSkipMapExt) NeedSkip

func (m *LogFilesSkipMapExt) NeedSkip(metaKey string, groupOff, fileOff int) bool

func (*LogFilesSkipMapExt) SkipGroup

func (m *LogFilesSkipMapExt) SkipGroup(metaKey string, groupOff int)

func (*LogFilesSkipMapExt) SkipMeta

func (m *LogFilesSkipMapExt) SkipMeta(metaKey string)

type LogIter

type LogIter = iter.TryNextor[*LogDataFileInfo]

LogIter is the type of iterator over each log file's meta information.

func WrapLogFilesIterWithCheckpointFailpoint

func WrapLogFilesIterWithCheckpointFailpoint(
	v failpoint.Value,
	logIter LogIter,
	rules map[int64]*restoreutils.RewriteRules,
) (LogIter, error)

type LogRestoreManager

type LogRestoreManager struct {
	// contains filtered or unexported fields
}

LogRestoreManager is a comprehensive wrapper that encapsulates all logic related to log restoration, including concurrency management, checkpoint handling, and file importing for efficient log processing.

func NewLogRestoreManager

func NewLogRestoreManager(
	ctx context.Context,
	fileImporter *LogFileImporter,
	poolSize uint,
	createCheckpointSessionFn func() (glue.Session, error),
) (*LogRestoreManager, error)

func (*LogRestoreManager) Close

func (l *LogRestoreManager) Close(ctx context.Context)

type LogSplitStrategy

type LogSplitStrategy struct {
	*split.BaseSplitStrategy
	// contains filtered or unexported fields
}

func NewLogSplitStrategy

func NewLogSplitStrategy(
	ctx context.Context,
	useCheckpoint bool,
	execCtx sqlexec.RestrictedSQLExecutor,
	rules map[int64]*restoreutils.RewriteRules,
	updateStatsFn func(uint64, uint64),
) (*LogSplitStrategy, error)

func (*LogSplitStrategy) Accumulate

func (ls *LogSplitStrategy) Accumulate(file *LogDataFileInfo)

func (*LogSplitStrategy) ShouldSkip

func (ls *LogSplitStrategy) ShouldSkip(file *LogDataFileInfo) bool

func (*LogSplitStrategy) ShouldSplit

func (ls *LogSplitStrategy) ShouldSplit() bool

type Meta

type Meta = *backuppb.Metadata

Meta is the metadata of files.

type MetaGroupIter

type MetaGroupIter = iter.TryNextor[DDLMetaGroup]

MetaGroupIter is the iterator of flushes of metadata.

type MetaIter

type MetaIter = iter.TryNextor[*backuppb.Metadata]

MetaIter is the type of iterator of metadata files' content.

type MetaMigrationsIter

type MetaMigrationsIter = iter.TryNextor[*MetaWithMigrations]

type MetaName

type MetaName struct {
	// contains filtered or unexported fields
}

type MetaNameIter

type MetaNameIter = iter.TryNextor[*MetaName]

MetaNameIter is the type of iterator of metadata files' content with name.

type MetaWithMigrations

type MetaWithMigrations struct {
	// contains filtered or unexported fields
}

func (*MetaWithMigrations) Physicals

func (mwm *MetaWithMigrations) Physicals(groupIndexIter GroupIndexIter) PhysicalMigrationsIter

type PhysicalMigrationsIter

type PhysicalMigrationsIter = iter.TryNextor[*PhysicalWithMigrations]

type PhysicalWithMigrations

type PhysicalWithMigrations struct {
	// contains filtered or unexported fields
}

func (*PhysicalWithMigrations) Logicals

func (pwm *PhysicalWithMigrations) Logicals(fileIndexIter FileIndexIter) FileIndexIter

type RPCResult

type RPCResult struct {
	Err error

	ImportError string
	StoreError  *errorpb.Error
}

RPCResult is the result after executing some RPCs to TiKV.

func RPCResultFromError

func RPCResultFromError(err error) RPCResult

func RPCResultFromPBError

func RPCResultFromPBError(err *import_sstpb.Error) RPCResult

func RPCResultOK

func RPCResultOK() RPCResult

func (*RPCResult) Error

func (r *RPCResult) Error() string

func (*RPCResult) OK

func (r *RPCResult) OK() bool

func (*RPCResult) StrategyForRetry

func (r *RPCResult) StrategyForRetry() RetryStrategy

func (*RPCResult) StrategyForRetryGoError

func (r *RPCResult) StrategyForRetryGoError() RetryStrategy

func (*RPCResult) StrategyForRetryStoreError

func (r *RPCResult) StrategyForRetryStoreError() RetryStrategy

type RangeController

type RangeController struct {
	// contains filtered or unexported fields
}

RangeController manages the execution of operations over a range of regions. It provides functionality to scan regions within a specified key range and apply a given function to each region, handling errors and retries automatically.

func CreateRangeController

func CreateRangeController(start, end []byte, metaClient split.SplitClient, retryStatus *utils.RetryState) RangeController

CreateRangeController creates a controller that could be used to scan regions in a range and apply a function over these regions. You can then call the `ApplyFuncToRange` method to apply a function to them.

func (*RangeController) ApplyFuncToRange

func (o *RangeController) ApplyFuncToRange(ctx context.Context, f RegionFunc) error

ApplyFuncToRange applies the `regionFunc` to all regions between `o.start` and `o.end`. It would retry errors according to the `rpcResponse`.

type RegionFunc

type RegionFunc func(ctx context.Context, r *split.RegionInfo) RPCResult

type RetryStrategy

type RetryStrategy int
const (
	StrategyGiveUp RetryStrategy = iota
	StrategyFromThisRegion
	StrategyFromStart
)

type SstRestoreManager

type SstRestoreManager struct {
	// contains filtered or unexported fields
}

SstRestoreManager is a comprehensive wrapper that encapsulates all logic related to sst restoration, including concurrency management, checkpoint handling, and file importing(splitting) for efficient log processing.

func NewSstRestoreManager

func NewSstRestoreManager(
	ctx context.Context,
	snapFileImporter *snapclient.SnapFileImporter,
	concurrencyPerStore uint,
	storeCount uint,
	createCheckpointSessionFn func() (glue.Session, error),
) (*SstRestoreManager, error)

func (*SstRestoreManager) Close

func (s *SstRestoreManager) Close(ctx context.Context)

type WithMigrations

type WithMigrations struct {
	// contains filtered or unexported fields
}

func (*WithMigrations) Metas

func (wm *WithMigrations) Metas(metaNameIter MetaNameIter) MetaMigrationsIter

type WithMigrationsBuilder

type WithMigrationsBuilder struct {
	// contains filtered or unexported fields
}

func (*WithMigrationsBuilder) Build

func (builder *WithMigrationsBuilder) Build(migs []*backuppb.Migration) WithMigrations

Build creates the wrapper from the migrations.

func (*WithMigrationsBuilder) SetShiftStartTS

func (builder *WithMigrationsBuilder) SetShiftStartTS(ts uint64)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL