logclient

package
v1.1.0-beta.0...-cea46f1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2024 License: Apache-2.0, Apache-2.0 Imports: 71 Imported by: 0

Documentation

Index

Constants

View Source
const MetaKVBatchSize = 64 * 1024 * 1024
View Source
const PITRIdMapBlockSize int = 524288
View Source
const UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL = "UNSAFE_PITR_LOG_RESTORE_START_BEFORE_ANY_UPSTREAM_USER_DDL"

Variables

View Source
var TotalEntryCount int64

Functions

func ApplyKVFilesWithBatchMethod

func ApplyKVFilesWithBatchMethod(
	ctx context.Context,
	logIter LogIter,
	batchCount int,
	batchSize uint64,
	applyFunc func(files []*LogDataFileInfo, kvCount int64, size uint64),
	applyWg *sync.WaitGroup,
) error

func ApplyKVFilesWithSingleMethod

func ApplyKVFilesWithSingleMethod(
	ctx context.Context,
	files LogIter,
	applyFunc func(file []*LogDataFileInfo, kvCount int64, size uint64),
	applyWg *sync.WaitGroup,
) error

func RestoreMetaKVFilesWithBatchMethod

func RestoreMetaKVFilesWithBatchMethod(
	ctx context.Context,
	defaultFiles []*backuppb.DataFileInfo,
	writeFiles []*backuppb.DataFileInfo,
	schemasReplace *stream.SchemasReplace,
	updateStats func(kvCount uint64, size uint64),
	progressInc func(),
	restoreBatch func(
		ctx context.Context,
		files []*backuppb.DataFileInfo,
		schemasReplace *stream.SchemasReplace,
		kvEntries []*KvEntryWithTS,
		filterTS uint64,
		updateStats func(kvCount uint64, size uint64),
		progressInc func(),
		cf string,
	) ([]*KvEntryWithTS, error),
) error

func SortMetaKVFiles

func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo

Types

type CompactedFileSplitStrategy

type CompactedFileSplitStrategy struct {
	*split.BaseSplitStrategy
	// contains filtered or unexported fields
}

func NewCompactedFileSplitStrategy

func NewCompactedFileSplitStrategy(
	rules map[int64]*restoreutils.RewriteRules,
	checkpointsSet map[string]struct{},
	updateStatsFn func(uint64, uint64),
) *CompactedFileSplitStrategy

func (*CompactedFileSplitStrategy) Accumulate

func (cs *CompactedFileSplitStrategy) Accumulate(subCompaction *backuppb.LogFileSubcompaction)

func (*CompactedFileSplitStrategy) ShouldSkip

func (cs *CompactedFileSplitStrategy) ShouldSkip(subCompaction *backuppb.LogFileSubcompaction) bool

func (*CompactedFileSplitStrategy) ShouldSplit

func (cs *CompactedFileSplitStrategy) ShouldSplit() bool

type DDLMetaGroup

type DDLMetaGroup struct {
	Path      string
	FileMetas []*backuppb.DataFileInfo
}

type FileIndex

type FileIndex = iter.Indexed[*backuppb.DataFileInfo]

FileIndex is the type of logical data file with index from physical data file.

type FileIndexIter

type FileIndexIter = iter.TryNextor[FileIndex]

FileIndexIter is the type of iterator of logical data file with index from physical data file.

type FilesInRegion

type FilesInRegion struct {
	// contains filtered or unexported fields
}

type FilesInTable

type FilesInTable struct {
	// contains filtered or unexported fields
}

type FullBackupStorageConfig

type FullBackupStorageConfig struct {
	Backend *backuppb.StorageBackend
	Opts    *storage.ExternalStorageOptions
}

type GroupIndex

type GroupIndex = iter.Indexed[*backuppb.DataFileGroup]

GroupIndex is the type of physical data file with index from metadata.

type GroupIndexIter

type GroupIndexIter = iter.TryNextor[GroupIndex]

GroupIndexIter is the type of iterator of physical data file with index from metadata.

type InitSchemaConfig

type InitSchemaConfig struct {
	// required
	IsNewTask   bool
	TableFilter filter.Filter

	// optional
	TiFlashRecorder   *tiflashrec.TiFlashRecorder
	FullBackupStorage *FullBackupStorageConfig
}

type KvEntryWithTS

type KvEntryWithTS struct {
	E  kv.Entry
	Ts uint64
}

KvEntryWithTS is a kv entry with its ts; the ts is decoded from the entry.

type Log

type Log = *backuppb.DataFileInfo

Log is the metadata of one file recording KV sequences.

type LogClient

type LogClient struct {
	*LogFileManager
	// contains filtered or unexported fields
}

func NewRestoreClient

func NewRestoreClient(
	pdClient pd.Client,
	pdHTTPCli pdhttp.Client,
	tlsConf *tls.Config,
	keepaliveConf keepalive.ClientParameters,
) *LogClient

NewRestoreClient returns a new LogClient.

func (*LogClient) CleanUpKVFiles

func (rc *LogClient) CleanUpKVFiles(
	ctx context.Context,
) error

func (*LogClient) Close

func (rc *LogClient) Close(ctx context.Context)

Close a client.

func (*LogClient) FailpointDoChecksumForLogRestore

func (rc *LogClient) FailpointDoChecksumForLogRestore(
	ctx context.Context,
	kvClient kv.Client,
	pdClient pd.Client,
	rewriteRules map[int64]*restoreutils.RewriteRules,
) (finalErr error)

FailpointDoChecksumForLogRestore is called by failpoint and is only used for tests. It prints the checksum result into the log, and the auto-test script records it to compare with another cluster's checksum.

func (*LogClient) GenGlobalID

func (rc *LogClient) GenGlobalID(ctx context.Context) (int64, error)

GenGlobalID generates a global id by transaction way.

func (*LogClient) GenGlobalIDs

func (rc *LogClient) GenGlobalIDs(ctx context.Context, n int) ([]int64, error)

GenGlobalIDs generates several global ids by transaction way.

func (*LogClient) GetClusterID

func (rc *LogClient) GetClusterID(ctx context.Context) uint64

GetClusterID gets the cluster id from down-stream cluster.

func (*LogClient) GetDomain

func (rc *LogClient) GetDomain() *domain.Domain

func (*LogClient) GetGCRows

func (rc *LogClient) GetGCRows() []*stream.PreDelRangeQuery

only for unit test

func (*LogClient) GetMigrations

func (rc *LogClient) GetMigrations(ctx context.Context) ([]*backuppb.Migration, error)

func (*LogClient) Init

func (rc *LogClient) Init(ctx context.Context, g glue.Glue, store kv.Storage) error

Init creates the db connection and domain for storage.

func (*LogClient) InitCheckpointMetadataForCompactedSstRestore

func (rc *LogClient) InitCheckpointMetadataForCompactedSstRestore(
	ctx context.Context,
) (map[string]struct{}, error)

func (*LogClient) InitCheckpointMetadataForLogRestore

func (rc *LogClient) InitCheckpointMetadataForLogRestore(
	ctx context.Context,
	startTS, restoredTS uint64,
	gcRatio string,
	tiflashRecorder *tiflashrec.TiFlashRecorder,
) (string, error)

func (*LogClient) InitClients

func (rc *LogClient) InitClients(
	ctx context.Context,
	backend *backuppb.StorageBackend,
	createSessionFn func() (glue.Session, error),
	concurrency uint,
	concurrencyPerStore uint,
) error

func (*LogClient) InitSchemasReplaceForDDL

func (rc *LogClient) InitSchemasReplaceForDDL(
	ctx context.Context,
	cfg *InitSchemaConfig,
	cipherInfo *backuppb.CipherInfo,
) (*stream.SchemasReplace, error)

InitSchemasReplaceForDDL gets the schema information that maps old schemas to new schemas. It is used to rewrite meta kv-events.

func (*LogClient) InsertGCRows

func (rc *LogClient) InsertGCRows(ctx context.Context) error

InsertGCRows inserts the queries into table `gc_delete_range`.

func (*LogClient) InstallLogFileManager

func (rc *LogClient) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64, metadataDownloadBatchSize uint,
	encryptionManager *encryption.Manager) error

func (*LogClient) PreConstructAndSaveIDMap

func (rc *LogClient) PreConstructAndSaveIDMap(
	ctx context.Context,
	fsInWriteCF, fsInDefaultCF []*backuppb.DataFileInfo,
	sr *stream.SchemasReplace,
) error

PreConstructAndSaveIDMap constructs the id mapping and saves it.

func (*LogClient) RecordDeleteRange

func (rc *LogClient) RecordDeleteRange(sql *stream.PreDelRangeQuery)

func (*LogClient) RepairIngestIndex

func (rc *LogClient) RepairIngestIndex(ctx context.Context, ingestRecorder *ingestrec.IngestRecorder, g glue.Glue) error

RepairIngestIndex drops the indexes from IngestRecorder and re-adds them.

func (*LogClient) RestoreBatchMetaKVFiles

func (rc *LogClient) RestoreBatchMetaKVFiles(
	ctx context.Context,
	files []*backuppb.DataFileInfo,
	schemasReplace *stream.SchemasReplace,
	kvEntries []*KvEntryWithTS,
	filterTS uint64,
	updateStats func(kvCount uint64, size uint64),
	progressInc func(),
	cf string,
) ([]*KvEntryWithTS, error)

func (*LogClient) RestoreCompactedSstFiles

func (rc *LogClient) RestoreCompactedSstFiles(
	ctx context.Context,
	compactionsIter iter.TryNextor[*backuppb.LogFileSubcompaction],
	rules map[int64]*restoreutils.RewriteRules,
	importModeSwitcher *restore.ImportModeSwitcher,
	onProgress func(int64),
) error

func (*LogClient) RestoreKVFiles

func (rc *LogClient) RestoreKVFiles(
	ctx context.Context,
	rules map[int64]*restoreutils.RewriteRules,
	logIter LogIter,
	pitrBatchCount uint32,
	pitrBatchSize uint32,
	updateStats func(kvCount uint64, size uint64),
	onProgress func(cnt int64),
	cipherInfo *backuppb.CipherInfo,
	masterKeys []*encryptionpb.MasterKey,
) error

func (*LogClient) RestoreMetaKVFiles

func (rc *LogClient) RestoreMetaKVFiles(
	ctx context.Context,
	files []*backuppb.DataFileInfo,
	schemasReplace *stream.SchemasReplace,
	updateStats func(kvCount uint64, size uint64),
	progressInc func(),
) error

RestoreMetaKVFiles tries to restore files about meta kv-event from stream-backup.

func (*LogClient) RunGCRowsLoader

func (rc *LogClient) RunGCRowsLoader(ctx context.Context)

RunGCRowsLoader uses a channel to save the delete-range queries to make it thread-safe.

func (*LogClient) SetCrypter

func (rc *LogClient) SetCrypter(crypter *backuppb.CipherInfo)

func (*LogClient) SetCurrentTS

func (rc *LogClient) SetCurrentTS(ts uint64) error

func (*LogClient) SetRawKVBatchClient

func (rc *LogClient) SetRawKVBatchClient(
	ctx context.Context,
	pdAddrs []string,
	security config.Security,
) error

func (*LogClient) SetStorage

func (rc *LogClient) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error

func (*LogClient) SetUpstreamClusterID

func (rc *LogClient) SetUpstreamClusterID(upstreamClusterID uint64)

func (*LogClient) UpdateSchemaVersion

func (rc *LogClient) UpdateSchemaVersion(ctx context.Context) error

UpdateSchemaVersion updates schema version by transaction way.

func (*LogClient) WrapCompactedFilesIterWithSplitHelper

func (rc *LogClient) WrapCompactedFilesIterWithSplitHelper(
	ctx context.Context,
	compactedIter iter.TryNextor[*backuppb.LogFileSubcompaction],
	rules map[int64]*restoreutils.RewriteRules,
	checkpointSets map[string]struct{},
	updateStatsFn func(uint64, uint64),
	splitSize uint64,
	splitKeys int64,
) (iter.TryNextor[*backuppb.LogFileSubcompaction], error)

WrapCompactedFilesIterWithSplitHelper applies a splitting strategy to the compacted files iterator. It uses a region splitter to handle the splitting logic based on the provided rules and checkpoint sets.

func (*LogClient) WrapLogFilesIterWithSplitHelper

func (rc *LogClient) WrapLogFilesIterWithSplitHelper(
	ctx context.Context,
	logIter LogIter,
	execCtx sqlexec.RestrictedSQLExecutor,
	rules map[int64]*restoreutils.RewriteRules,
	updateStatsFn func(uint64, uint64),
	splitSize uint64,
	splitKeys int64,
) (LogIter, error)

WrapLogFilesIterWithSplitHelper applies a splitting strategy to the log files iterator. It uses a region splitter to handle the splitting logic based on the provided rules.

type LogDataFileInfo

type LogDataFileInfo struct {
	*backuppb.DataFileInfo
	MetaDataGroupName   string
	OffsetInMetaGroup   int
	OffsetInMergedGroup int
}

type LogFileImporter

type LogFileImporter struct {
	// contains filtered or unexported fields
}

func NewLogFileImporter

func NewLogFileImporter(
	metaClient split.SplitClient,
	importClient importclient.ImporterClient,
	backend *backuppb.StorageBackend,
) *LogFileImporter

NewLogFileImporter returns a new LogFileImporter.

func (*LogFileImporter) ClearFiles

func (importer *LogFileImporter) ClearFiles(ctx context.Context, pdClient pd.Client, prefix string) error

func (*LogFileImporter) Close

func (importer *LogFileImporter) Close() error

func (*LogFileImporter) ImportKVFiles

func (importer *LogFileImporter) ImportKVFiles(
	ctx context.Context,
	files []*LogDataFileInfo,
	rule *restoreutils.RewriteRules,
	shiftStartTS uint64,
	startTS uint64,
	restoreTS uint64,
	supportBatch bool,
	cipherInfo *backuppb.CipherInfo,
	masterKeys []*encryptionpb.MasterKey,
) error

ImportKVFiles restores the kv events.

type LogFileManager

type LogFileManager struct {
	// contains filtered or unexported fields
}

LogFileManager is the manager for log files of a certain restoration, which supports read / filter from the log backup archive with static start TS / restore TS.

func CreateLogFileManager

func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*LogFileManager, error)

CreateLogFileManager creates a log file manager using the specified config. Generally the config cannot be changed during its lifetime.

func (*LogFileManager) BuildMigrations

func (rc *LogFileManager) BuildMigrations(migs []*backuppb.Migration)

func (*LogFileManager) FilterDataFiles

func (rc *LogFileManager) FilterDataFiles(m MetaNameIter) LogIter

func (*LogFileManager) FilterMetaFiles

func (rc *LogFileManager) FilterMetaFiles(ms MetaNameIter) MetaGroupIter

func (*LogFileManager) GetCompactionIter

GetCompactionIter fetches compactions that may contain files less than the TS.

func (*LogFileManager) LoadDDLFilesAndCountDMLFiles

func (rc *LogFileManager) LoadDDLFilesAndCountDMLFiles(ctx context.Context) ([]Log, error)

LoadDDLFilesAndCountDMLFiles loads all DDL files that need to be restored in the restoration. This function returns all the needed DDL files directly because we need to sort all of them.

func (*LogFileManager) LoadDMLFiles

func (rc *LogFileManager) LoadDMLFiles(ctx context.Context) (LogIter, error)

LoadDMLFiles loads all DML files that need to be restored in the restoration. This function returns a stream, because there are usually many DML files that need to be restored.

func (*LogFileManager) ReadAllEntries

func (rc *LogFileManager) ReadAllEntries(
	ctx context.Context,
	file Log,
	filterTS uint64,
) ([]*KvEntryWithTS, []*KvEntryWithTS, error)

ReadAllEntries loads the content of a log file, filtering out the entries that are not needed.

func (*LogFileManager) ShiftTS

func (rc *LogFileManager) ShiftTS() uint64

func (*LogFileManager) ShouldFilterOut

func (rc *LogFileManager) ShouldFilterOut(d *backuppb.DataFileInfo) bool

ShouldFilterOut checks whether a file should be filtered out via the current client.

type LogFileManagerInit

type LogFileManagerInit struct {
	StartTS   uint64
	RestoreTS uint64
	Storage   storage.ExternalStorage

	MigrationsBuilder         *WithMigrationsBuilder
	Migrations                *WithMigrations
	MetadataDownloadBatchSize uint
	EncryptionManager         *encryption.Manager
}

LogFileManagerInit is the config needed for initializing the log file manager.

type LogFilesSkipMap

type LogFilesSkipMap struct {
	// contains filtered or unexported fields
}

func NewLogFilesSkipMap

func NewLogFilesSkipMap() *LogFilesSkipMap

func (*LogFilesSkipMap) Insert

func (m *LogFilesSkipMap) Insert(metaKey string, groupOff, fileOff int)

func (*LogFilesSkipMap) NeedSkip

func (m *LogFilesSkipMap) NeedSkip(metaKey string, groupOff, fileOff int) bool

type LogFilesSkipMapExt

type LogFilesSkipMapExt struct {
	// contains filtered or unexported fields
}

func NewLogFilesSkipMapExt

func NewLogFilesSkipMapExt() *LogFilesSkipMapExt

func (*LogFilesSkipMapExt) Insert

func (m *LogFilesSkipMapExt) Insert(metaKey string, groupOff, fileOff int)

func (*LogFilesSkipMapExt) NeedSkip

func (m *LogFilesSkipMapExt) NeedSkip(metaKey string, groupOff, fileOff int) bool

func (*LogFilesSkipMapExt) SkipGroup

func (m *LogFilesSkipMapExt) SkipGroup(metaKey string, groupOff int)

func (*LogFilesSkipMapExt) SkipMeta

func (m *LogFilesSkipMapExt) SkipMeta(metaKey string)

type LogIter

type LogIter = iter.TryNextor[*LogDataFileInfo]

LogIter is the type of iterator of each log files' meta information.

func WrapLogFilesIterWithCheckpointFailpoint

func WrapLogFilesIterWithCheckpointFailpoint(
	v failpoint.Value,
	logIter LogIter,
	rules map[int64]*restoreutils.RewriteRules,
) (LogIter, error)

type LogRestoreManager

type LogRestoreManager struct {
	// contains filtered or unexported fields
}

LogRestoreManager is a comprehensive wrapper that encapsulates all logic related to log restoration, including concurrency management, checkpoint handling, and file importing for efficient log processing.

func NewLogRestoreManager

func NewLogRestoreManager(
	ctx context.Context,
	fileImporter *LogFileImporter,
	poolSize uint,
	createCheckpointSessionFn func() (glue.Session, error),
) (*LogRestoreManager, error)

func (*LogRestoreManager) Close

func (l *LogRestoreManager) Close(ctx context.Context)

type LogSplitStrategy

type LogSplitStrategy struct {
	*split.BaseSplitStrategy
	// contains filtered or unexported fields
}

func NewLogSplitStrategy

func NewLogSplitStrategy(
	ctx context.Context,
	useCheckpoint bool,
	execCtx sqlexec.RestrictedSQLExecutor,
	rules map[int64]*restoreutils.RewriteRules,
	updateStatsFn func(uint64, uint64),
) (*LogSplitStrategy, error)

func (*LogSplitStrategy) Accumulate

func (ls *LogSplitStrategy) Accumulate(file *LogDataFileInfo)

func (*LogSplitStrategy) ShouldSkip

func (ls *LogSplitStrategy) ShouldSkip(file *LogDataFileInfo) bool

func (*LogSplitStrategy) ShouldSplit

func (ls *LogSplitStrategy) ShouldSplit() bool

type Meta

type Meta = *backuppb.Metadata

Meta is the metadata of files.

type MetaGroupIter

type MetaGroupIter = iter.TryNextor[DDLMetaGroup]

MetaGroupIter is the iterator of flushes of metadata.

type MetaIter

type MetaIter = iter.TryNextor[*backuppb.Metadata]

MetaIter is the type of iterator of metadata files' content.

type MetaMigrationsIter

type MetaMigrationsIter = iter.TryNextor[*MetaWithMigrations]

type MetaName

type MetaName struct {
	// contains filtered or unexported fields
}

type MetaNameIter

type MetaNameIter = iter.TryNextor[*MetaName]

MetaNameIter is the type of iterator of metadata files' content with name.

type MetaWithMigrations

type MetaWithMigrations struct {
	// contains filtered or unexported fields
}

func (*MetaWithMigrations) Physicals

func (mwm *MetaWithMigrations) Physicals(groupIndexIter GroupIndexIter) PhysicalMigrationsIter

type PhysicalMigrationsIter

type PhysicalMigrationsIter = iter.TryNextor[*PhysicalWithMigrations]

type PhysicalWithMigrations

type PhysicalWithMigrations struct {
	// contains filtered or unexported fields
}

func (*PhysicalWithMigrations) Logicals

func (pwm *PhysicalWithMigrations) Logicals(fileIndexIter FileIndexIter) FileIndexIter

type RPCResult

type RPCResult struct {
	Err error

	ImportError string
	StoreError  *errorpb.Error
}

RPCResult is the result after executing some RPCs to TiKV.

func RPCResultFromError

func RPCResultFromError(err error) RPCResult

func RPCResultFromPBError

func RPCResultFromPBError(err *import_sstpb.Error) RPCResult

func RPCResultOK

func RPCResultOK() RPCResult

func (*RPCResult) Error

func (r *RPCResult) Error() string

func (*RPCResult) OK

func (r *RPCResult) OK() bool

func (*RPCResult) StrategyForRetry

func (r *RPCResult) StrategyForRetry() RetryStrategy

func (*RPCResult) StrategyForRetryGoError

func (r *RPCResult) StrategyForRetryGoError() RetryStrategy

func (*RPCResult) StrategyForRetryStoreError

func (r *RPCResult) StrategyForRetryStoreError() RetryStrategy

type RangeController

type RangeController struct {
	// contains filtered or unexported fields
}

RangeController manages the execution of operations over a range of regions. It provides functionality to scan regions within a specified key range and apply a given function to each region, handling errors and retries automatically.

func CreateRangeController

func CreateRangeController(start, end []byte, metaClient split.SplitClient, retryStatus *utils.RetryState) RangeController

CreateRangeController creates a controller that could be used to scan regions in a range and apply a function over these regions. You can then call the `ApplyFuncToRange` method for applying some functions.

func (*RangeController) ApplyFuncToRange

func (o *RangeController) ApplyFuncToRange(ctx context.Context, f RegionFunc) error

ApplyFuncToRange applies the `regionFunc` to all regions between `o.start` and `o.end`. It would retry errors according to the `rpcResponse`.

type RegionFunc

type RegionFunc func(ctx context.Context, r *split.RegionInfo) RPCResult

type RetryStrategy

type RetryStrategy int
const (
	StrategyGiveUp RetryStrategy = iota
	StrategyFromThisRegion
	StrategyFromStart
)

type SstRestoreManager

type SstRestoreManager struct {
	// contains filtered or unexported fields
}

SstRestoreManager is a comprehensive wrapper that encapsulates all logic related to sst restoration, including concurrency management, checkpoint handling, and file importing(splitting) for efficient log processing.

func NewSstRestoreManager

func NewSstRestoreManager(
	ctx context.Context,
	snapFileImporter *snapclient.SnapFileImporter,
	concurrencyPerStore uint,
	storeCount uint,
	createCheckpointSessionFn func() (glue.Session, error),
) (*SstRestoreManager, error)

func (*SstRestoreManager) Close

func (s *SstRestoreManager) Close(ctx context.Context)

type WithMigrations

type WithMigrations struct {
	// contains filtered or unexported fields
}

func (*WithMigrations) Metas

func (wm *WithMigrations) Metas(metaNameIter MetaNameIter) MetaMigrationsIter

type WithMigrationsBuilder

type WithMigrationsBuilder struct {
	// contains filtered or unexported fields
}

func (*WithMigrationsBuilder) Build

func (builder *WithMigrationsBuilder) Build(migs []*backuppb.Migration) WithMigrations

Create the wrapper by migrations.

func (*WithMigrationsBuilder) SetShiftStartTS

func (builder *WithMigrationsBuilder) SetShiftStartTS(ts uint64)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL