logclient

package
v1.1.0-beta.0...-1acbbec Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 22, 2024 License: Apache-2.0, Apache-2.0 Imports: 68 Imported by: 0

Documentation

Index

Constants

View Source
const MetaKVBatchSize = 64 * 1024 * 1024
View Source
const PITRIdMapBlockSize int = 524288
View Source
const SplitFilesBufferSize = 4096
View Source
const UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL = "UNSAFE_PITR_LOG_RESTORE_START_BEFORE_ANY_UPSTREAM_USER_DDL"

Variables

This section is empty.

Functions

func ApplyKVFilesWithBatchMethod

func ApplyKVFilesWithBatchMethod(
	ctx context.Context,
	logIter LogIter,
	batchCount int,
	batchSize uint64,
	applyFunc func(files []*LogDataFileInfo, kvCount int64, size uint64),
	applyWg *sync.WaitGroup,
) error

func ApplyKVFilesWithSingelMethod

func ApplyKVFilesWithSingelMethod(
	ctx context.Context,
	files LogIter,
	applyFunc func(file []*LogDataFileInfo, kvCount int64, size uint64),
	applyWg *sync.WaitGroup,
) error

func RestoreMetaKVFilesWithBatchMethod

func RestoreMetaKVFilesWithBatchMethod(
	ctx context.Context,
	defaultFiles []*backuppb.DataFileInfo,
	writeFiles []*backuppb.DataFileInfo,
	schemasReplace *stream.SchemasReplace,
	updateStats func(kvCount uint64, size uint64),
	progressInc func(),
	restoreBatch func(
		ctx context.Context,
		files []*backuppb.DataFileInfo,
		schemasReplace *stream.SchemasReplace,
		kvEntries []*KvEntryWithTS,
		filterTS uint64,
		updateStats func(kvCount uint64, size uint64),
		progressInc func(),
		cf string,
	) ([]*KvEntryWithTS, error),
) error

func SortMetaKVFiles

func SortMetaKVFiles(files []*backuppb.DataFileInfo) []*backuppb.DataFileInfo

Types

type DDLMetaGroup

type DDLMetaGroup struct {
	Path      string
	FileMetas []*backuppb.DataFileInfo
}

type FilesInRegion

type FilesInRegion struct {
	// contains filtered or unexported fields
}

type FilesInTable

type FilesInTable struct {
	// contains filtered or unexported fields
}

type FullBackupStorageConfig

type FullBackupStorageConfig struct {
	Backend *backuppb.StorageBackend
	Opts    *storage.ExternalStorageOptions
}

type InitSchemaConfig

type InitSchemaConfig struct {
	// required
	IsNewTask   bool
	TableFilter filter.Filter

	// optional
	TiFlashRecorder   *tiflashrec.TiFlashRecorder
	FullBackupStorage *FullBackupStorageConfig
}

type KvEntryWithTS

type KvEntryWithTS struct {
	E  kv.Entry
	Ts uint64
}

the kv entry with ts, the ts is decoded from entry.

type Log

type Log = *backuppb.DataFileInfo

Log is the metadata of one file recording KV sequences.

type LogClient

type LogClient struct {
	*LogFileManager
	// contains filtered or unexported fields
}

func NewRestoreClient

func NewRestoreClient(
	pdClient pd.Client,
	pdHTTPCli pdhttp.Client,
	tlsConf *tls.Config,
	keepaliveConf keepalive.ClientParameters,
) *LogClient

NewRestoreClient returns a new RestoreClient.

func (*LogClient) CleanUpKVFiles

func (rc *LogClient) CleanUpKVFiles(
	ctx context.Context,
) error

func (*LogClient) Close

func (rc *LogClient) Close()

Close a client.

func (*LogClient) FailpointDoChecksumForLogRestore

func (rc *LogClient) FailpointDoChecksumForLogRestore(
	ctx context.Context,
	kvClient kv.Client,
	pdClient pd.Client,
	idrules map[int64]int64,
	rewriteRules map[int64]*restoreutils.RewriteRules,
) (finalErr error)

called by failpoint, only used for test it would print the checksum result into the log, and the auto-test script records them to compare another cluster's checksum.

func (*LogClient) GenGlobalID

func (rc *LogClient) GenGlobalID(ctx context.Context) (int64, error)

GenGlobalID generates a global id by transaction way.

func (*LogClient) GenGlobalIDs

func (rc *LogClient) GenGlobalIDs(ctx context.Context, n int) ([]int64, error)

GenGlobalIDs generates several global ids by transaction way.

func (*LogClient) GetClusterID

func (rc *LogClient) GetClusterID(ctx context.Context) uint64

GetClusterID gets the cluster id from down-stream cluster.

func (*LogClient) GetDomain

func (rc *LogClient) GetDomain() *domain.Domain

func (*LogClient) GetGCRows

func (rc *LogClient) GetGCRows() []*stream.PreDelRangeQuery

only for unit test

func (*LogClient) Init

func (rc *LogClient) Init(g glue.Glue, store kv.Storage) error

Init create db connection and domain for storage.

func (*LogClient) InitCheckpointMetadataForLogRestore

func (rc *LogClient) InitCheckpointMetadataForLogRestore(ctx context.Context, taskName string, gcRatio string) (string, error)

func (*LogClient) InitClients

func (rc *LogClient) InitClients(ctx context.Context, backend *backuppb.StorageBackend)

func (*LogClient) InitSchemasReplaceForDDL

func (rc *LogClient) InitSchemasReplaceForDDL(
	ctx context.Context,
	cfg *InitSchemaConfig,
) (*stream.SchemasReplace, error)

InitSchemasReplaceForDDL gets schemas information Mapping from old schemas to new schemas. It is used to rewrite meta kv-event.

func (*LogClient) InsertGCRows

func (rc *LogClient) InsertGCRows(ctx context.Context) error

InsertGCRows insert the querys into table `gc_delete_range`

func (*LogClient) InstallLogFileManager

func (rc *LogClient) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64, metadataDownloadBatchSize uint) error

func (*LogClient) PreConstructAndSaveIDMap

func (rc *LogClient) PreConstructAndSaveIDMap(
	ctx context.Context,
	fsInWriteCF, fsInDefaultCF []*backuppb.DataFileInfo,
	sr *stream.SchemasReplace,
) error

PreConstructAndSaveIDMap constructs id mapping and save it.

func (*LogClient) RecordDeleteRange

func (rc *LogClient) RecordDeleteRange(sql *stream.PreDelRangeQuery)

func (*LogClient) RepairIngestIndex

func (rc *LogClient) RepairIngestIndex(ctx context.Context, ingestRecorder *ingestrec.IngestRecorder, g glue.Glue, taskName string) error

RepairIngestIndex drops the indexes from IngestRecorder and re-add them.

func (*LogClient) RestoreBatchMetaKVFiles

func (rc *LogClient) RestoreBatchMetaKVFiles(
	ctx context.Context,
	files []*backuppb.DataFileInfo,
	schemasReplace *stream.SchemasReplace,
	kvEntries []*KvEntryWithTS,
	filterTS uint64,
	updateStats func(kvCount uint64, size uint64),
	progressInc func(),
	cf string,
) ([]*KvEntryWithTS, error)

func (*LogClient) RestoreKVFiles

func (rc *LogClient) RestoreKVFiles(
	ctx context.Context,
	rules map[int64]*restoreutils.RewriteRules,
	idrules map[int64]int64,
	logIter LogIter,
	runner *checkpoint.CheckpointRunner[checkpoint.LogRestoreKeyType, checkpoint.LogRestoreValueType],
	pitrBatchCount uint32,
	pitrBatchSize uint32,
	updateStats func(kvCount uint64, size uint64),
	onProgress func(cnt int64),
) error

func (*LogClient) RestoreMetaKVFiles

func (rc *LogClient) RestoreMetaKVFiles(
	ctx context.Context,
	files []*backuppb.DataFileInfo,
	schemasReplace *stream.SchemasReplace,
	updateStats func(kvCount uint64, size uint64),
	progressInc func(),
) error

RestoreMetaKVFiles tries to restore files about meta kv-event from stream-backup.

func (*LogClient) RunGCRowsLoader

func (rc *LogClient) RunGCRowsLoader(ctx context.Context)

use channel to save the delete-range query to make it thread-safety.

func (*LogClient) SetConcurrency

func (rc *LogClient) SetConcurrency(c uint)

func (*LogClient) SetCrypter

func (rc *LogClient) SetCrypter(crypter *backuppb.CipherInfo)

func (*LogClient) SetCurrentTS

func (rc *LogClient) SetCurrentTS(ts uint64)

func (*LogClient) SetRawKVBatchClient

func (rc *LogClient) SetRawKVBatchClient(
	ctx context.Context,
	pdAddrs []string,
	security config.Security,
) error

func (*LogClient) SetStorage

func (rc *LogClient) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error

func (*LogClient) SetUpstreamClusterID

func (rc *LogClient) SetUpstreamClusterID(upstreamClusterID uint64)

func (*LogClient) UpdateSchemaVersion

func (rc *LogClient) UpdateSchemaVersion(ctx context.Context) error

UpdateSchemaVersion updates schema version by transaction way.

func (*LogClient) WrapLogFilesIterWithCheckpoint

func (rc *LogClient) WrapLogFilesIterWithCheckpoint(
	ctx context.Context,
	logIter LogIter,
	downstreamIdset map[int64]struct{},
	taskName string,
	updateStats func(kvCount, size uint64),
	onProgress func(),
) (LogIter, error)

func (*LogClient) WrapLogFilesIterWithSplitHelper

func (rc *LogClient) WrapLogFilesIterWithSplitHelper(logIter LogIter, rules map[int64]*restoreutils.RewriteRules, g glue.Glue, store kv.Storage) (LogIter, error)

type LogDataFileInfo

type LogDataFileInfo struct {
	*backuppb.DataFileInfo
	MetaDataGroupName   string
	OffsetInMetaGroup   int
	OffsetInMergedGroup int
}

type LogFileImporter

type LogFileImporter struct {
	// contains filtered or unexported fields
}

func NewLogFileImporter

func NewLogFileImporter(
	metaClient split.SplitClient,
	importClient importclient.ImporterClient,
	backend *backuppb.StorageBackend,
) *LogFileImporter

NewFileImporter returns a new file importClient.

func (*LogFileImporter) ClearFiles

func (importer *LogFileImporter) ClearFiles(ctx context.Context, pdClient pd.Client, prefix string) error

func (*LogFileImporter) Close

func (importer *LogFileImporter) Close() error

func (*LogFileImporter) ImportKVFiles

func (importer *LogFileImporter) ImportKVFiles(
	ctx context.Context,
	files []*LogDataFileInfo,
	rule *restoreutils.RewriteRules,
	shiftStartTS uint64,
	startTS uint64,
	restoreTS uint64,
	supportBatch bool,
) error

ImportKVFiles restores the kv events.

type LogFileManager

type LogFileManager struct {
	// contains filtered or unexported fields
}

LogFileManager is the manager for log files of a certain restoration, which supports read / filter from the log backup archive with static start TS / restore TS.

func CreateLogFileManager

func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*LogFileManager, error)

CreateLogFileManager creates a log file manager using the specified config. Generally the config cannot be changed during its lifetime.

func (*LogFileManager) FilterDataFiles

func (rc *LogFileManager) FilterDataFiles(ms MetaIter) LogIter

func (*LogFileManager) FilterMetaFiles

func (rc *LogFileManager) FilterMetaFiles(ms MetaIter) MetaGroupIter

func (*LogFileManager) LoadDDLFilesAndCountDMLFiles

func (rc *LogFileManager) LoadDDLFilesAndCountDMLFiles(ctx context.Context, counter *int) ([]Log, error)

LoadDDLFilesAndCountDMLFiles loads all DDL files needs to be restored in the restoration. At the same time, if the `counter` isn't nil, counting the DML file needs to be restored into `counter`. This function returns all DDL files needing directly because we need sort all of them.

func (*LogFileManager) LoadDMLFiles

func (rc *LogFileManager) LoadDMLFiles(ctx context.Context) (LogIter, error)

LoadDMLFiles loads all DML files needs to be restored in the restoration. This function returns a stream, because there are usually many DML files need to be restored.

func (*LogFileManager) ReadAllEntries

func (rc *LogFileManager) ReadAllEntries(
	ctx context.Context,
	file Log,
	filterTS uint64,
) ([]*KvEntryWithTS, []*KvEntryWithTS, error)

ReadAllEntries loads content of a log file, with filtering out no needed entries.

func (*LogFileManager) ShiftTS

func (rc *LogFileManager) ShiftTS() uint64

func (*LogFileManager) ShouldFilterOut

func (rc *LogFileManager) ShouldFilterOut(d *backuppb.DataFileInfo) bool

ShouldFilterOut checks whether a file should be filtered out via the current client.

type LogFileManagerInit

type LogFileManagerInit struct {
	StartTS   uint64
	RestoreTS uint64
	Storage   storage.ExternalStorage

	MetadataDownloadBatchSize uint
}

LogFileManagerInit is the config needed for initializing the log file manager.

type LogFilesIterWithSplitHelper

type LogFilesIterWithSplitHelper struct {
	// contains filtered or unexported fields
}

func (*LogFilesIterWithSplitHelper) TryNext

type LogFilesSkipMap

type LogFilesSkipMap struct {
	// contains filtered or unexported fields
}

func NewLogFilesSkipMap

func NewLogFilesSkipMap() *LogFilesSkipMap

func (*LogFilesSkipMap) Insert

func (m *LogFilesSkipMap) Insert(metaKey string, groupOff, fileOff int)

func (*LogFilesSkipMap) NeedSkip

func (m *LogFilesSkipMap) NeedSkip(metaKey string, groupOff, fileOff int) bool

type LogIter

type LogIter = iter.TryNextor[*LogDataFileInfo]

LogIter is the type of iterator of each log files' meta information.

func NewLogFilesIterWithSplitHelper

func NewLogFilesIterWithSplitHelper(iter LogIter, rules map[int64]*restoreutils.RewriteRules, client split.SplitClient, splitSize uint64, splitKeys int64) LogIter

type Meta

type Meta = *backuppb.Metadata

Meta is the metadata of files.

type MetaGroupIter

type MetaGroupIter = iter.TryNextor[DDLMetaGroup]

MetaGroupIter is the iterator of flushes of metadata.

type MetaIter

type MetaIter = iter.TryNextor[*backuppb.Metadata]

MetaIter is the type of iterator of metadata files' content.

type OverRegionsInRangeController

type OverRegionsInRangeController struct {
	// contains filtered or unexported fields
}

func OverRegionsInRange

func OverRegionsInRange(start, end []byte, metaClient split.SplitClient, retryStatus *utils.RetryState) OverRegionsInRangeController

OverRegionsInRange creates a controller that cloud be used to scan regions in a range and apply a function over these regions. You can then call the `Run` method for applying some functions.

func (*OverRegionsInRangeController) Run

Run executes the `regionFunc` over the regions in `o.start` and `o.end`. It would retry the errors according to the `rpcResponse`.

type RPCResult

type RPCResult struct {
	Err error

	ImportError string
	StoreError  *errorpb.Error
}

RPCResult is the result after executing some RPCs to TiKV.

func RPCResultFromError

func RPCResultFromError(err error) RPCResult

func RPCResultFromPBError

func RPCResultFromPBError(err *import_sstpb.Error) RPCResult

func RPCResultOK

func RPCResultOK() RPCResult

func (*RPCResult) Error

func (r *RPCResult) Error() string

func (*RPCResult) OK

func (r *RPCResult) OK() bool

func (*RPCResult) StrategyForRetry

func (r *RPCResult) StrategyForRetry() RetryStrategy

func (*RPCResult) StrategyForRetryGoError

func (r *RPCResult) StrategyForRetryGoError() RetryStrategy

func (*RPCResult) StrategyForRetryStoreError

func (r *RPCResult) StrategyForRetryStoreError() RetryStrategy

type RegionFunc

type RegionFunc func(ctx context.Context, r *split.RegionInfo) RPCResult

type RetryStrategy

type RetryStrategy int
const (
	StrategyGiveUp RetryStrategy = iota
	StrategyFromThisRegion
	StrategyFromStart
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL