local

package
v1.1.0-beta.0...-b6141ec Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2025 License: Apache-2.0 Imports: 103 Imported by: 0

Documentation

Index

Constants

View Source
// Directory-name suffixes used by pre-deduplication.
const (
	// DupDetectDirSuffix is the suffix of the directory that pre-deduplication
	// uses to store the encoded index KV pairs.
	DupDetectDirSuffix = ".dupdetect"
	// DupResultDirSuffix is the suffix of the directory that pre-deduplication
	// uses to store the IDs of duplicated rows.
	DupResultDirSuffix = ".dupresult"
)
View Source
// Lower and upper bounds of the SST compaction threshold.
const (
	// CompactionLowerThreshold is the lower bound of the compaction threshold (512 MiB).
	CompactionLowerThreshold = 512 * units.MiB
	// CompactionUpperThreshold is the upper bound of the compaction threshold (32 GiB).
	CompactionUpperThreshold = 32 * units.GiB
)

Lower and upper bounds of the compaction threshold.

Variables

View Source
var (

	// MinDistSQLScanConcurrency is the minimum value of tidb_distsql_scan_concurrency.
	MinDistSQLScanConcurrency = 4

	// DefaultBackoffWeight is the default value of tidb_backoff_weight for checksum.
	// RegionRequestSender retries within a maxSleep window, which by default is
	// 2 * 20 = 40 seconds. When the TiKV client encounters a "region not leader"
	// error, it keeps retrying every 500 ms; if it still fails after maxSleep, it
	// returns "region unavailable".
	// When there are many pending compaction bytes, TiKV might not respond within 1m
	// and report "rpcError:wait recvLoop timeout,timeout:1m0s", and a retry might
	// time out again.
	// So the weight is enlarged to 15 * DefBackOffWeight. Assuming DefBackOffWeight
	// is 2 (as the 2 * 20 figure above implies), that is 30 * 20 = 600 seconds,
	// i.e. 10 minutes.
	DefaultBackoffWeight = 15 * tikvstore.DefBackOffWeight
)
View Source
// Mutable package-level state.
var (
	// RunInTest indicates whether the current process is running in test.
	// Its zero value is false.
	RunInTest bool
	// LastAlloc is the last ID allocator.
	// NOTE(review): the declared type is manual.Allocator; confirm whether this is
	// an ID allocator or a memory allocator and adjust this comment accordingly.
	LastAlloc manual.Allocator
)
View Source
// BuildDuplicateTaskForTest is only used for tests. It calls m.buildDupTasks and
// returns its result unchanged. Because it is a package-level variable, it can be
// reassigned (presumably by tests that want to stub out task building).
var BuildDuplicateTaskForTest = func(m *dupeDetector) ([]dupTask, error) {
	return m.buildDupTasks()
}

BuildDuplicateTaskForTest is only used for tests.

View Source
// CheckTiFlashVersionForTest exposes the unexported checkTiFlashVersion.
// It is only used for tests.
var CheckTiFlashVersionForTest = checkTiFlashVersion

CheckTiFlashVersionForTest is only used for tests.

View Source
var (

	// MaxWriteAndIngestRetryTimes is the maximum number of retries for write and ingest.
	// A large retry count is used to tolerate TiKV cluster failures.
	// It is declared as a variable rather than a constant, so it can be reassigned.
	MaxWriteAndIngestRetryTimes = 30
)
View Source
// TiFlashReplicaQueryForTest exposes the unexported tiFlashReplicaQuery.
// It is only used for tests.
var TiFlashReplicaQueryForTest = tiFlashReplicaQuery

TiFlashReplicaQueryForTest is only used for tests.

View Source
// WaitRMFolderChForTest is a channel for testing. It is unbuffered, so a send
// blocks until a receiver is ready, and vice versa.
var WaitRMFolderChForTest = make(chan struct{})

WaitRMFolderChForTest is a channel for testing.

Functions

func CheckDiskQuota

func CheckDiskQuota(mgr DiskUsage, quota int64) (
	largeEngines []uuid.UUID,
	inProgressLargeEngines int,
	totalDiskSize int64,
	totalMemSize int64,
)

CheckDiskQuota verifies whether the total engine file size is below the given quota. If the quota is exceeded, this method returns a list of engines which, once imported, can bring the total size below the quota.

func ConvertToErrFoundConflictRecords

func ConvertToErrFoundConflictRecords(originalErr error, tbl table.Table) error

ConvertToErrFoundConflictRecords converts ErrFoundDuplicateKeys to ErrFoundDataConflictRecords or ErrFoundIndexConflictRecords error.

func EstimateCompactionThreshold

func EstimateCompactionThreshold(files []mydump.FileInfo, cp *checkpoints.TableCheckpoint, factor int64) int64

EstimateCompactionThreshold estimates the SST file compaction threshold from the total raw file size. With a higher compaction threshold, compaction takes longer but iteration is faster. It tries to keep the total number of SST files under 500. However, compacting 32 GB of SST files takes about 20 minutes, so the upper bound is set to 32 GB to avoid overly long compaction. factor is the non-clustered factor: 1 for the data engine, and the number of non-clustered indexes for the index engine.

func EstimateCompactionThreshold2

func EstimateCompactionThreshold2(totalRawFileSize int64) int64

EstimateCompactionThreshold2 estimates the SST file compaction threshold from the total raw file size; see EstimateCompactionThreshold for more details.

func GetRegionSplitSizeKeys

func GetRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) (
	regionSplitSize int64, regionSplitKeys int64, err error)

GetRegionSplitSizeKeys returns the region split size and the region split keys, or an error.

func NewDupeDetector

func NewDupeDetector(
	tbl table.Table,
	tableName string,
	splitCli split.SplitClient,
	tikvCli *tikv.KVStore,
	tikvCodec tikv.Codec,
	errMgr *errormanager.ErrorManager,
	sessOpts *encode.SessionOptions,
	concurrency int,
	logger log.Logger,
	resourceGroupName string,
	taskType string,
) (*dupeDetector, error)

NewDupeDetector creates a new dupeDetector.

func NewEncodingBuilder

func NewEncodingBuilder(ctx context.Context) encode.EncodingBuilder

NewEncodingBuilder creates a KVEncodingBuilder with the local backend implementation.

func NewTargetInfoGetter

func NewTargetInfoGetter(
	tls *common.TLS,
	db *sql.DB,
	pdHTTPCli pdhttp.Client,
) backend.TargetInfoGetter

NewTargetInfoGetter creates a TargetInfoGetter with the local backend implementation. `pdHTTPCli` must not be nil when component versions need to be checked in CheckRequirements.

func RetrieveKeyAndValueFromErrFoundDuplicateKeys

func RetrieveKeyAndValueFromErrFoundDuplicateKeys(err error) ([]byte, []byte, error)

RetrieveKeyAndValueFromErrFoundDuplicateKeys retrieves the key and value from ErrFoundDuplicateKeys error.

func VerifyRLimit

func VerifyRLimit(estimateMaxFiles RlimT) error

VerifyRLimit checks whether the open-file limit is large enough. The local backend reads and writes many L0 SST files, so the system's maximum open-file limit must be checked.

Types

type Backend

type Backend struct {
	BackendConfig
	// contains filtered or unexported fields
}

Backend is a local backend.

func NewBackend

func NewBackend(
	ctx context.Context,
	tls *common.TLS,
	config BackendConfig,
	pdSvcDiscovery sd.ServiceDiscovery,
) (b *Backend, err error)

NewBackend creates new connections to tikv.

func NewBackendForTest

func NewBackendForTest(ctx context.Context, config BackendConfig, storeHelper StoreHelper) (*Backend, error)

NewBackendForTest creates a new Backend for test.

func (*Backend) CleanupEngine

func (local *Backend) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error

CleanupEngine cleans up the engine and reclaims its space.

func (*Backend) Close

func (local *Backend) Close()

Close closes the local backend.

func (*Backend) CloseEngine

func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error

CloseEngine closes backend engine by uuid.

func (*Backend) CloseEngineMgr

func (local *Backend) CloseEngineMgr()

CloseEngineMgr closes the engine manager. This function is used for tests.

func (*Backend) EngineFileSizes

func (local *Backend) EngineFileSizes() (res []backend.EngineFileSize)

EngineFileSizes implements DiskUsage interface.

func (*Backend) FlushAllEngines

func (local *Backend) FlushAllEngines(parentCtx context.Context) (err error)

FlushAllEngines flushes all engines.

func (*Backend) FlushEngine

func (local *Backend) FlushEngine(ctx context.Context, engineID uuid.UUID) error

FlushEngine ensures the written data is saved successfully, so that no data is lost after a restart.

func (*Backend) GetDupeController

func (local *Backend) GetDupeController(dupeConcurrency int, errorMgr *errormanager.ErrorManager) *DupeController

GetDupeController returns a new dupe controller.

func (*Backend) GetExternalEngineKVStatistics

func (local *Backend) GetExternalEngineKVStatistics(engineUUID uuid.UUID) (
	totalKVSize int64, totalKVCount int64)

GetExternalEngineKVStatistics returns kv statistics of some engine.

func (*Backend) GetImportedKVCount

func (local *Backend) GetImportedKVCount(engineUUID uuid.UUID) int64

GetImportedKVCount returns the number of imported KV pairs of some engine.

func (*Backend) GetTS

func (local *Backend) GetTS(ctx context.Context) (physical, logical int64, err error)

GetTS implements StoreHelper interface.

func (*Backend) GetTiKVCodec

func (local *Backend) GetTiKVCodec() tikvclient.Codec

GetTiKVCodec implements StoreHelper interface.

func (*Backend) GetWriteSpeedLimit

func (local *Backend) GetWriteSpeedLimit() int

GetWriteSpeedLimit returns the speed of the write limiter.

func (*Backend) ImportEngine

func (local *Backend) ImportEngine(
	ctx context.Context,
	engineUUID uuid.UUID,
	regionSplitSize, regionSplitKeys int64,
) error

ImportEngine imports an engine to TiKV.

func (*Backend) LocalWriter

func (local *Backend) LocalWriter(ctx context.Context, cfg *backend.LocalWriterConfig, engineUUID uuid.UUID) (backend.EngineWriter, error)

LocalWriter returns a new local writer.

func (*Backend) OpenEngine

func (local *Backend) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error

OpenEngine must be called while holding the Engine's mutex.

func (*Backend) ResetEngine

func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error

ResetEngine resets the engine and reclaims its space.

func (*Backend) ResetEngineSkipAllocTS

func (local *Backend) ResetEngineSkipAllocTS(ctx context.Context, engineUUID uuid.UUID) error

ResetEngineSkipAllocTS is like ResetEngine, but the engine's inner TS is left invalid. The caller must use SetTSAfterResetEngine to set a valid TS before importing the engine.

func (*Backend) RetryImportDelay

func (*Backend) RetryImportDelay() time.Duration

RetryImportDelay returns the delay time before retrying to import a file.

func (*Backend) SetTSAfterResetEngine

func (local *Backend) SetTSAfterResetEngine(engineUUID uuid.UUID, ts uint64) error

SetTSAfterResetEngine allocates a new TS for the engine after it's reset. This is typically called after persisting the chosen TS of the engine to make sure TS is not changed after task failover.

func (*Backend) ShouldPostProcess

func (*Backend) ShouldPostProcess() bool

ShouldPostProcess returns true if the backend should post process the data.

func (*Backend) TotalMemoryConsume

func (local *Backend) TotalMemoryConsume() int64

TotalMemoryConsume returns the total memory usage of the local backend.

func (*Backend) UnsafeImportAndReset

func (local *Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error

UnsafeImportAndReset forces the backend to import the content of an engine into the target and then reset the engine to empty. This method will not close the engine. Make sure the engine is flushed manually before calling this method.

func (*Backend) UpdateWriteSpeedLimit

func (local *Backend) UpdateWriteSpeedLimit(limit int)

UpdateWriteSpeedLimit updates the write limiter of the backend.

type BackendConfig

// BackendConfig is the configuration for the local backend.
type BackendConfig struct {
	// PDAddr is a comma-separated list of PD endpoints.
	PDAddr        string
	LocalStoreDir string
	// max number of cached grpc.ClientConn to a store.
	// Note: this is not the limit of actual connections; each grpc.ClientConn can
	// hold one or more actual connections.
	MaxConnPerStore int
	// compression type used when writing or ingesting into TiKV.
	ConnCompressType config.CompressionType
	// concurrency of generateJobForRange and of the import (write & ingest) workers.
	WorkerConcurrency int
	// batch KV size when writing to TiKV.
	KVWriteBatchSize       int64
	RegionSplitBatchSize   int
	RegionSplitConcurrency int
	CheckpointEnabled      bool
	// memory table size of pebble. Since pebble can have multiple mem tables, the
	// max memory used is MemTableSize * MemTableStopWritesThreshold; see
	// pebble.Options for more details.
	MemTableSize int
	// LocalWriterMemCacheSize is the memory threshold for one local writer of
	// engines. If the KV payload size exceeds LocalWriterMemCacheSize, the local
	// writer flushes the payload into the engine.
	//
	// It has lower priority than LocalWriterConfig.Local.MemCacheSize.
	LocalWriterMemCacheSize int64
	// whether to check TiKV capacity before write & ingest.
	ShouldCheckTiKV    bool
	DupeDetectEnabled  bool
	DuplicateDetectOpt common.DupDetectOpt
	// max write speed in bytes per second to each store (bursts are allowed);
	// 0 means no limit.
	StoreWriteBWLimit int
	// When TiKV is in normal mode, ingesting too many SSTs causes a TiKV write stall.
	// To avoid this, write stall should be checked before ingesting SSTs. Note that
	// both the leader and the followers must be checked on the client side, because
	// followers do not check write stall once the ingest command is accepted by the
	// leader.
	ShouldCheckWriteStall bool
	// soft limit on the number of open files that can be used by the pebble DB.
	// The minimum value is 128.
	MaxOpenFiles int
	KeyspaceName string
	// the scope used when pausing PD schedulers.
	PausePDSchedulerScope     config.PausePDSchedulerScope
	ResourceGroupName         string
	TaskType                  string
	RaftKV2SwitchModeDuration time.Duration
	// whether to disable automatic compactions of the engine's pebble DB.
	// The deduplication pebble DB is not affected by this option.
	// See DisableAutomaticCompactions of pebble.Options for more details.
	// Default true.
	DisableAutomaticCompactions bool
	BlockSize                   int
}

BackendConfig is the config for local backend.

func NewBackendConfig

func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName, resourceGroupName, taskType string, raftKV2SwitchModeDuration time.Duration) BackendConfig

NewBackendConfig creates a new BackendConfig.

type ChecksumManager

// ChecksumManager is a manager that manages checksums.
type ChecksumManager interface {
	// Checksum returns the checksum of the table described by tableInfo, or an error.
	Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error)
}

ChecksumManager is a manager that manages checksums.

func NewTiDBChecksumExecutor

func NewTiDBChecksumExecutor(db *sql.DB) ChecksumManager

NewTiDBChecksumExecutor creates a new tidb checksum executor.

type DiskUsage

// DiskUsage is an interface for obtaining the local disk space occupied by all engines.
type DiskUsage interface {
	// EngineFileSizes obtains the size occupied locally by all engines managed
	// by this backend. This method is used to compute the disk quota.
	// It can return nil if the content is all stored remotely.
	EngineFileSizes() (res []backend.EngineFileSize)
}

DiskUsage is an interface for obtaining the local disk space occupied by all engines.

type DupKVStream

// DupKVStream is a streaming interface for collecting duplicate key-value pairs.
type DupKVStream interface {
	// Next returns the next key-value pair, or any error it encountered.
	// At the end of the stream, the returned error is io.EOF.
	Next() (key, val []byte, err error)
	// Close closes the stream.
	Close() error
}

DupKVStream is a streaming interface for collecting duplicate key-value pairs.

type DupKVStreamImpl

type DupKVStreamImpl struct {
	// contains filtered or unexported fields
}

DupKVStreamImpl implements the interface of DupKVStream. It collects duplicate key-value pairs from a pebble.DB.

func NewLocalDupKVStream

func NewLocalDupKVStream(dupDB *pebble.DB, keyAdapter common.KeyAdapter, keyRange tidbkv.KeyRange) *DupKVStreamImpl

NewLocalDupKVStream creates a new DupKVStreamImpl with the given duplicate db and key range.

func (*DupKVStreamImpl) Close

func (s *DupKVStreamImpl) Close() error

Close implements the interface of DupKVStream.

func (*DupKVStreamImpl) Next

func (s *DupKVStreamImpl) Next() (key, val []byte, err error)

Next implements the interface of DupKVStream.

type DupeController

type DupeController struct {
	// contains filtered or unexported fields
}

DupeController is used to collect duplicate keys from local and remote data sources and to resolve duplicates.

func (*DupeController) CollectLocalDuplicateRows

func (local *DupeController) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions, algorithm config.DuplicateResolutionAlgorithm) (hasDupe bool, err error)

CollectLocalDuplicateRows collects duplicate keys from the local DB. It stores the duplicate keys that may conflict with other keys in the local data source.

func (*DupeController) CollectRemoteDuplicateRows

func (local *DupeController) CollectRemoteDuplicateRows(
	ctx context.Context,
	tbl table.Table,
	tableName string,
	opts *encode.SessionOptions,
	algorithm config.DuplicateResolutionAlgorithm,
) (hasDupe bool, err error)

CollectRemoteDuplicateRows collects duplicate keys from remote TiKV storage. These keys may duplicate data imported by other Lightning instances. TODO: revise the returned values to (hasDupe bool, dupInfo *DupInfo, err error) to distinguish conflict errors from common errors.

func (*DupeController) ResolveDuplicateRows

func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) (err error)

ResolveDuplicateRows resolves duplicated rows by deleting/inserting data according to the required algorithm.

type Engine

type Engine struct {
	UUID uuid.UUID
	// contains filtered or unexported fields
}

Engine is a local engine.

func (*Engine) Cleanup

func (e *Engine) Cleanup(dataDir string) error

Cleanup removes the meta, DB, and duplicate-detection files.

func (*Engine) Close

func (e *Engine) Close() error

Close closes the engine and releases all resources.

func (*Engine) DecRef

func (*Engine) DecRef()

DecRef implements IngestData interface.

func (*Engine) Exist

func (e *Engine) Exist(dataDir string) error

Exist checks whether the DB folder exists (the meta sometimes is not flushed before Lightning exits).

func (*Engine) Finish

func (e *Engine) Finish(totalBytes, totalCount int64)

Finish implements IngestData interface.

func (*Engine) GetFirstAndLastKey

func (e *Engine) GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error)

GetFirstAndLastKey reads the first and last key in range [lowerBound, upperBound) in the engine. Empty upperBound means unbounded.

func (*Engine) GetKeyRange

func (e *Engine) GetKeyRange() (startKey []byte, endKey []byte, err error)

GetKeyRange implements common.Engine.

func (*Engine) GetRegionSplitKeys

func (e *Engine) GetRegionSplitKeys() ([][]byte, error)

GetRegionSplitKeys implements common.Engine.

func (*Engine) GetTS

func (e *Engine) GetTS() uint64

GetTS implements IngestData interface.

func (*Engine) ID

func (e *Engine) ID() string

ID is the identifier of an engine.

func (*Engine) ImportedStatistics

func (e *Engine) ImportedStatistics() (importedSize int64, importedKVCount int64)

ImportedStatistics returns the imported kv size and imported kv count.

func (*Engine) IncRef

func (*Engine) IncRef()

IncRef implements IngestData interface.

func (*Engine) KVStatistics

func (e *Engine) KVStatistics() (totalSize int64, totalKVCount int64)

KVStatistics returns the total kv size and total kv count.

func (*Engine) LoadIngestData

func (e *Engine) LoadIngestData(
	ctx context.Context,
	outCh chan<- common.DataAndRanges,
) (err error)

LoadIngestData returns the (local) Engine itself, because Engine implements the IngestData interface.

func (*Engine) NewIter

func (e *Engine) NewIter(
	ctx context.Context,
	lowerBound, upperBound []byte,
	bufPool *membuf.Pool,
) common.ForwardIter

NewIter implements IngestData interface.

func (*Engine) TotalMemorySize

func (e *Engine) TotalMemorySize() int64

TotalMemorySize returns the total memory size of the engine.

type IngestLocalEngineIter

// IngestLocalEngineIter is a common.ForwardIter that can also move to the last key.
type IngestLocalEngineIter interface {
	common.ForwardIter
	// Last moves this iter to the last key.
	// NOTE(review): the bool result presumably reports whether the iter is
	// positioned at a valid key afterwards — confirm.
	Last() bool
}

IngestLocalEngineIter is a forward iterator that can also move to the last key.

type Iter

// Iter describes an iterator over key-value pairs.
type Iter interface {
	// Valid checks whether this iter is still valid, i.e. has not reached the end.
	Valid() bool
	// Next moves this iter forward.
	Next() bool
	// Key returns the key of the current pair. The key remains accessible across
	// further Next() or Key() calls but is invalidated by Close() or ReleaseBuf().
	Key() []byte
	// Value returns the value of the current pair. The value remains accessible
	// across further Next() or Value() calls but is invalidated by Close() or
	// ReleaseBuf().
	Value() []byte
	// Close closes this iter.
	Close() error
	// Error returns the current error of this iter.
	Error() error
}

Iter describes an iterator.

type RangePropertiesCollector

type RangePropertiesCollector struct {
	// contains filtered or unexported fields
}

RangePropertiesCollector collects range properties for each range.

func (*RangePropertiesCollector) Add

func (c *RangePropertiesCollector) Add(key pebble.InternalKey, value []byte) error

Add implements `pebble.TablePropertyCollector.Add`.

func (*RangePropertiesCollector) Finish

func (c *RangePropertiesCollector) Finish(userProps map[string]string) error

Finish implements `pebble.TablePropertyCollector`.

func (*RangePropertiesCollector) Name

Name implements `pebble.TablePropertyCollector`.

type RemoteChecksum

// RemoteChecksum represents a checksum result obtained from TiDB.
type RemoteChecksum struct {
	Schema     string
	Table      string
	Checksum   uint64
	TotalKVs   uint64
	TotalBytes uint64
}

RemoteChecksum represents a checksum result got from tidb.

func (*RemoteChecksum) IsEqual

func (rc *RemoteChecksum) IsEqual(other *verification.KVChecksum) bool

IsEqual checks whether the checksum is equal to the other.

type RemoteDupKVStream

type RemoteDupKVStream struct {
	// contains filtered or unexported fields
}

RemoteDupKVStream implements the interface of DupKVStream. It collects duplicate key-value pairs from a TiKV region.

func NewRemoteDupKVStream

func NewRemoteDupKVStream(
	ctx context.Context,
	region *split.RegionInfo,
	keyRange tidbkv.KeyRange,
	importClientFactory importClientFactory,
	resourceGroupName string,
	taskType string,
	minCommitTS uint64,
) (*RemoteDupKVStream, error)

NewRemoteDupKVStream creates a new RemoteDupKVStream.

func (*RemoteDupKVStream) Close

func (s *RemoteDupKVStream) Close() error

Close implements the interface of DupKVStream.

func (*RemoteDupKVStream) Next

func (s *RemoteDupKVStream) Next() (key, val []byte, err error)

Next implements the interface of DupKVStream.

type RlimT

// RlimT is the type of rlimit values. It is an alias of uint64, not a distinct type.
type RlimT = uint64

RlimT is the type of rlimit values.

func GetSystemRLimit

func GetSystemRLimit() (RlimT, error)

GetSystemRLimit returns the current open-file limit.

type StoreHelper

// StoreHelper provides APIs that help encode or store KV data.
type StoreHelper interface {
	// GetTS returns a timestamp as its physical and logical parts, or an error.
	GetTS(ctx context.Context) (physical, logical int64, err error)
	// GetTiKVCodec returns the TiKV codec.
	GetTiKVCodec() tikvclient.Codec
}

StoreHelper provides APIs that help encode or store KV data.

type StoreWriteLimiter

// StoreWriteLimiter is used to limit the write rate of a store.
type StoreWriteLimiter interface {
	// WaitN waits until n more units may be written to the store with ID storeID.
	// NOTE(review): n is presumably in bytes and the error presumably comes from ctx
	// cancellation — confirm against the implementation.
	WaitN(ctx context.Context, storeID uint64, n int) error
	// Limit returns the current limit.
	Limit() int
	// UpdateLimit sets the limit to limit.
	UpdateLimit(limit int)
}

StoreWriteLimiter is used to limit the write rate of a store.

type TiKVChecksumManager

type TiKVChecksumManager struct {
	// contains filtered or unexported fields
}

TiKVChecksumManager is a manager that can compute checksum of a table using TiKV.

func NewTiKVChecksumManager

func NewTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanConcurrency uint, backoffWeight int, resourceGroupName, explicitRequestSourceType string) *TiKVChecksumManager

NewTiKVChecksumManager returns a new TiKV checksum manager.

func (*TiKVChecksumManager) Checksum

Checksum implements the ChecksumManager interface.

type TiKVModeSwitcher

// TiKVModeSwitcher is used to switch TiKV nodes between Import and Normal mode.
// Neither method returns an error.
type TiKVModeSwitcher interface {
	// ToImportMode switches all TiKV nodes to Import mode.
	// NOTE(review): ranges is variadic and may be empty; confirm whether non-empty
	// ranges restrict the switch to those key ranges.
	ToImportMode(ctx context.Context, ranges ...*sstpb.Range)
	// ToNormalMode switches all TiKV nodes to Normal mode.
	ToNormalMode(ctx context.Context, ranges ...*sstpb.Range)
}

TiKVModeSwitcher is used to switch TiKV nodes between Import and Normal mode.

func NewTiKVModeSwitcher

func NewTiKVModeSwitcher(tls *tls.Config, pdHTTPCli pdhttp.Client, logger *zap.Logger) TiKVModeSwitcher

NewTiKVModeSwitcher creates a new TiKVModeSwitcher.

type Writer

type Writer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Writer is used to write data into an SST file.

func (*Writer) AppendRows

func (w *Writer) AppendRows(ctx context.Context, columnNames []string, rows encode.Rows) error

AppendRows appends rows to the SST file.

func (*Writer) Close

Close implements backend.ChunkFlushStatus.

func (*Writer) EstimatedSize

func (w *Writer) EstimatedSize() uint64

EstimatedSize returns the estimated size of the SST file.

func (*Writer) IsSynced

func (w *Writer) IsSynced() bool

IsSynced implements backend.ChunkFlushStatus.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL