Documentation ¶
Index ¶
- Constants
- Variables
- func CheckDiskQuota(mgr DiskUsage, quota int64) (largeEngines []uuid.UUID, inProgressLargeEngines int, totalDiskSize int64, ...)
- func EstimateCompactionThreshold(files []mydump.FileInfo, cp *checkpoints.TableCheckpoint, factor int64) int64
- func EstimateCompactionThreshold2(totalRawFileSize int64) int64
- func GetRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) (regionSplitSize int64, regionSplitKeys int64, err error)
- func NewEncodingBuilder(ctx context.Context) encode.EncodingBuilder
- func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdCli pd.Client) backend.TargetInfoGetter
- func VerifyRLimit(estimateMaxFiles RlimT) error
- type Backend
- func (local *Backend) BatchSplitRegions(ctx context.Context, region *split.RegionInfo, keys [][]byte) (*split.RegionInfo, []*split.RegionInfo, error)
- func (local *Backend) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error
- func (local *Backend) Close()
- func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error
- func (local *Backend) EngineFileSizes() (res []backend.EngineFileSize)
- func (local *Backend) FlushAllEngines(parentCtx context.Context) (err error)
- func (local *Backend) FlushEngine(ctx context.Context, engineID uuid.UUID) error
- func (local *Backend) GetDupeController(dupeConcurrency int, errorMgr *errormanager.ErrorManager) *DupeController
- func (local *Backend) GetExternalEngineKVStatistics(engineUUID uuid.UUID) (totalKVSize int64, totalKVCount int64)
- func (local *Backend) GetImportedKVCount(engineUUID uuid.UUID) int64
- func (local *Backend) GetPDClient() pd.Client
- func (local *Backend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, ...) error
- func (local *Backend) LocalWriter(_ context.Context, cfg *backend.LocalWriterConfig, engineUUID uuid.UUID) (backend.EngineWriter, error)
- func (local *Backend) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error
- func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error
- func (*Backend) RetryImportDelay() time.Duration
- func (local *Backend) ScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) error
- func (*Backend) ShouldPostProcess() bool
- func (local *Backend) SplitAndScatterRegionByRanges(ctx context.Context, ranges []common.Range, needSplit bool) (err error)
- func (local *Backend) SplitAndScatterRegionInBatches(ctx context.Context, ranges []common.Range, needSplit bool, batchCnt int) error
- func (local *Backend) SwitchModeByKeyRanges(ctx context.Context, ranges []common.Range) (<-chan struct{}, error)
- func (local *Backend) TotalMemoryConsume() int64
- func (local *Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, ...) error
- type BackendConfig
- type ChecksumManager
- type DiskUsage
- type DupKVStream
- type DupKVStreamImpl
- type DupeController
- func (local *DupeController) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, ...) (hasDupe bool, err error)
- func (local *DupeController) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, ...) (hasDupe bool, err error)
- func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, ...) (err error)
- type DupeDetector
- func (m *DupeDetector) CollectDuplicateRowsFromDupDB(ctx context.Context, dupDB *pebble.DB, keyAdapter common.KeyAdapter) error
- func (m *DupeDetector) CollectDuplicateRowsFromTiKV(ctx context.Context, importClientFactory ImportClientFactory) error
- func (m *DupeDetector) HasDuplicate() bool
- func (m *DupeDetector) RecordDataConflictError(ctx context.Context, stream DupKVStream) error
- func (m *DupeDetector) RecordIndexConflictError(ctx context.Context, stream DupKVStream, tableID int64, ...) error
- type Engine
- func (e *Engine) Cleanup(dataDir string) error
- func (e *Engine) Close() error
- func (*Engine) DecRef()
- func (e *Engine) Exist(dataDir string) error
- func (e *Engine) Finish(totalBytes, totalCount int64)
- func (e *Engine) GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error)
- func (e *Engine) GetKeyRange() (startKey []byte, endKey []byte, err error)
- func (e *Engine) GetTS() uint64
- func (e *Engine) ID() string
- func (e *Engine) ImportedStatistics() (importedSize int64, importedKVCount int64)
- func (*Engine) IncRef()
- func (e *Engine) KVStatistics() (totalSize int64, totalKVCount int64)
- func (e *Engine) LoadIngestData(ctx context.Context, regionRanges []common.Range, ...) error
- func (e *Engine) NewIter(ctx context.Context, lowerBound, upperBound []byte) common.ForwardIter
- func (e *Engine) SplitRanges(startKey, endKey []byte, sizeLimit, keysLimit int64, logger log.Logger) ([]common.Range, error)
- func (e *Engine) TotalMemorySize() int64
- type ImportClientFactory
- type Iter
- type RangePropertiesCollector
- type RemoteChecksum
- type RemoteDupKVStream
- type RlimT
- type StoreWriteLimiter
- type TableRegionSizeGetter
- type TableRegionSizeGetterImpl
- type TiKVChecksumManager
- type TiKVModeSwitcher
- type Writer
Constants ¶
const (
    // DupDetectDirSuffix is used by pre-deduplication to store the encoded index KV.
    DupDetectDirSuffix = ".dupdetect"
    // DupResultDirSuffix is used by pre-deduplication to store the duplicated row ID.
    DupResultDirSuffix = ".dupresult"
)
const (
    CompactionLowerThreshold = 512 * units.MiB
    CompactionUpperThreshold = 32 * units.GiB
)
Lower and upper bounds for the SST compaction threshold.
Variables ¶
var (
    // MinDistSQLScanConcurrency is the minimum value of tidb_distsql_scan_concurrency.
    MinDistSQLScanConcurrency = 4

    // DefaultBackoffWeight is the default value of tidb_backoff_weight for checksum.
    // RegionRequestSender retries within a maxSleep time; the default is 2 * 20 = 40 seconds.
    // When the TiKV client encounters a "region not leader" error, it keeps retrying every
    // 500 ms; if it still fails after maxSleep, it returns "region unavailable".
    // When there are many pending compaction bytes, TiKV might not respond within 1m and
    // reports "rpcError:wait recvLoop timeout,timeout:1m0s", and the retry might time out
    // again, so we enlarge maxSleep to 30 * 20 = 10 minutes.
    DefaultBackoffWeight = 15 * tikvstore.DefBackOffWeight
)
var (
    // RunInTest indicates whether the current process is running in test.
    RunInTest bool
    // LastAlloc is the last ID allocator.
    LastAlloc manual.Allocator
)
var BuildDuplicateTaskForTest = func(m *DupeDetector) ([]dupTask, error) {
return m.buildDupTasks()
}
BuildDuplicateTaskForTest is only used in tests.
var CheckTiFlashVersionForTest = checkTiFlashVersion
CheckTiFlashVersionForTest is only used for tests.
var (
    // MaxWriteAndIngestRetryTimes is the max retry times for write and ingest.
    // A large retry count is for tolerating TiKV cluster failures.
    MaxWriteAndIngestRetryTimes = 30
)
var TiFlashReplicaQueryForTest = tiFlashReplicaQuery
TiFlashReplicaQueryForTest is only used for tests.
var WaitRMFolderChForTest = make(chan struct{})
WaitRMFolderChForTest is a channel for testing.
Functions ¶
func CheckDiskQuota ¶
func CheckDiskQuota(mgr DiskUsage, quota int64) (
    largeEngines []uuid.UUID,
    inProgressLargeEngines int,
    totalDiskSize int64,
    totalMemSize int64,
)
CheckDiskQuota verifies whether the total engine file size stays below the given quota. If the quota is exceeded, it returns a slice of engines which, once imported, would bring the total size back under the quota.
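For illustration, a minimal sketch of a quota-check loop; bk (a *Backend, which implements DiskUsage), ctx, quota, regionSplitSize, and regionSplitKeys are assumed to be in scope:

largeEngines, inProgress, diskSize, memSize := CheckDiskQuota(bk, quota)
if diskSize+memSize > quota {
    // Engines counted in inProgress are already being imported and will free
    // space soon; import-and-reset the remaining large engines to get back
    // under the quota.
    _ = inProgress
    for _, engineID := range largeEngines {
        if err := bk.UnsafeImportAndReset(ctx, engineID, regionSplitSize, regionSplitKeys); err != nil {
            return err
        }
    }
}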
func EstimateCompactionThreshold ¶
func EstimateCompactionThreshold(files []mydump.FileInfo, cp *checkpoints.TableCheckpoint, factor int64) int64
EstimateCompactionThreshold estimates the SST-file compaction threshold from the total row file size. A higher compaction threshold increases compaction time but decreases iteration time. It tries to keep the total number of SST files under 500; since compacting 32GB of SST files takes about 20 minutes, the threshold is capped at 32GB to avoid overly long compactions. factor is the non-clustered factor (1 for a data engine, and the number of non-clustered indexes for an index engine).
func EstimateCompactionThreshold2 ¶
func EstimateCompactionThreshold2(totalRawFileSize int64) int64
EstimateCompactionThreshold2 estimates the SST-file compaction threshold from the total raw file size. See EstimateCompactionThreshold for details.
func GetRegionSplitSizeKeys ¶
func GetRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) (
    regionSplitSize int64, regionSplitKeys int64, err error)
GetRegionSplitSizeKeys returns the region split size, the region split keys, and any error encountered.
func NewEncodingBuilder ¶
func NewEncodingBuilder(ctx context.Context) encode.EncodingBuilder
NewEncodingBuilder creates a KVEncodingBuilder with the local backend implementation.
func NewTargetInfoGetter ¶
func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdCli pd.Client) backend.TargetInfoGetter
NewTargetInfoGetter creates a TargetInfoGetter with the local backend implementation.
func VerifyRLimit ¶
func VerifyRLimit(estimateMaxFiles RlimT) error
VerifyRLimit checks whether the open-file limit is large enough. The local backend reads and writes many L0 SST files, so the system's max-open-files limit must be verified.
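A hypothetical pre-flight check (the 4096 estimate is illustrative, not a recommended value):

const estimateMaxFiles RlimT = 4096
if err := VerifyRLimit(estimateMaxFiles); err != nil {
    // the limit can usually be raised with `ulimit -n` before retrying
    return err
}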
Types ¶
type Backend ¶
type Backend struct {
    BackendConfig
    // contains filtered or unexported fields
}
Backend is a local backend.
func NewBackend ¶
func NewBackend(
    ctx context.Context,
    tls *common.TLS,
    config BackendConfig,
    regionSizeGetter TableRegionSizeGetter,
) (b *Backend, err error)
NewBackend creates new connections to TiKV.
func (*Backend) BatchSplitRegions ¶
func (local *Backend) BatchSplitRegions(
    ctx context.Context,
    region *split.RegionInfo,
    keys [][]byte,
) (*split.RegionInfo, []*split.RegionInfo, error)
BatchSplitRegions splits regions by the given split keys and tries to scatter the new regions. If splitting or scattering fails because a new region is not ready, this function does not return an error.
func (*Backend) CleanupEngine ¶
func (local *Backend) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error
CleanupEngine cleans up the engine and reclaims its space.
func (*Backend) CloseEngine ¶
func (local *Backend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error
CloseEngine closes the backend engine by UUID.
func (*Backend) EngineFileSizes ¶
func (local *Backend) EngineFileSizes() (res []backend.EngineFileSize)
EngineFileSizes implements the DiskUsage interface.
func (*Backend) FlushAllEngines ¶
func (local *Backend) FlushAllEngines(parentCtx context.Context) (err error)
FlushAllEngines flushes all engines.
func (*Backend) FlushEngine ¶
func (local *Backend) FlushEngine(ctx context.Context, engineID uuid.UUID) error
FlushEngine ensures the written data is saved successfully, so that no data is lost after a restart.
func (*Backend) GetDupeController ¶
func (local *Backend) GetDupeController(dupeConcurrency int, errorMgr *errormanager.ErrorManager) *DupeController
GetDupeController returns a new dupe controller.
func (*Backend) GetExternalEngineKVStatistics ¶
func (local *Backend) GetExternalEngineKVStatistics(engineUUID uuid.UUID) (totalKVSize int64, totalKVCount int64)
GetExternalEngineKVStatistics returns the KV statistics of the given engine.
func (*Backend) GetImportedKVCount ¶
func (local *Backend) GetImportedKVCount(engineUUID uuid.UUID) int64
GetImportedKVCount returns the number of imported KV pairs of the given engine.
func (*Backend) GetPDClient ¶
func (local *Backend) GetPDClient() pd.Client
GetPDClient returns the PD client.
func (*Backend) ImportEngine ¶
func (local *Backend) ImportEngine(
    ctx context.Context,
    engineUUID uuid.UUID,
    regionSplitSize, regionSplitKeys int64,
) error
ImportEngine imports an engine to TiKV.
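A sketch of the usual engine lifecycle around ImportEngine; bk, ctx, cfg, and the split parameters are assumed to be in scope, and writing rows through the EngineWriter (see LocalWriter below) is elided:

engineID := uuid.New()
if err := bk.OpenEngine(ctx, cfg, engineID); err != nil {
    return err
}
w, err := bk.LocalWriter(ctx, &backend.LocalWriterConfig{}, engineID)
if err != nil {
    return err
}
_ = w // appending KV pairs through the EngineWriter and closing it is elided here
if err := bk.CloseEngine(ctx, cfg, engineID); err != nil {
    return err
}
if err := bk.ImportEngine(ctx, engineID, regionSplitSize, regionSplitKeys); err != nil {
    return err
}
return bk.CleanupEngine(ctx, engineID)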
func (*Backend) LocalWriter ¶
func (local *Backend) LocalWriter(_ context.Context, cfg *backend.LocalWriterConfig, engineUUID uuid.UUID) (backend.EngineWriter, error)
LocalWriter returns a new local writer.
func (*Backend) OpenEngine ¶
func (local *Backend) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error
OpenEngine must be called while holding the mutex of the Engine.
func (*Backend) ResetEngine ¶
func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) error
ResetEngine resets the engine and reclaims its space.
func (*Backend) RetryImportDelay ¶
func (*Backend) RetryImportDelay() time.Duration
RetryImportDelay returns the delay time before retrying to import a file.
func (*Backend) ScatterRegion ¶
func (local *Backend) ScatterRegion(ctx context.Context, regionInfo *split.RegionInfo) error
ScatterRegion scatters the regions and retries on failure. It returns an error if the region still cannot be scattered after the maximum number of retries.
func (*Backend) ShouldPostProcess ¶
func (*Backend) ShouldPostProcess() bool
ShouldPostProcess returns true if the backend should post-process the data.
func (*Backend) SplitAndScatterRegionByRanges ¶
func (local *Backend) SplitAndScatterRegionByRanges(
    ctx context.Context,
    ranges []common.Range,
    needSplit bool,
) (err error)
SplitAndScatterRegionByRanges performs region split & scatter operations, just like BR. We could simply call the BR functions, but that would require changing some BR function signatures. When the total size of the ranges is small, the split can be skipped to avoid generating empty regions. TODO: remove this file and use br internal functions
func (*Backend) SplitAndScatterRegionInBatches ¶
func (local *Backend) SplitAndScatterRegionInBatches(
    ctx context.Context,
    ranges []common.Range,
    needSplit bool,
    batchCnt int,
) error
SplitAndScatterRegionInBatches splits & scatters regions in batches. Too many split & scatter requests can put a lot of pressure on TiKV and PD.
func (*Backend) SwitchModeByKeyRanges ¶
func (local *Backend) SwitchModeByKeyRanges(ctx context.Context, ranges []common.Range) (<-chan struct{}, error)
SwitchModeByKeyRanges switches the TiKV mode for regions in the given key ranges (for multi-rocksdb). It spawns a goroutine that keeps switching the mode periodically until the context is done. The returned done channel notifies the caller that the background goroutine has exited.
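A usage sketch showing how the done channel pairs with context cancellation; switchCtx is an assumed derived context and ranges is assumed to be in scope:

switchCtx, cancel := context.WithCancel(ctx)
done, err := bk.SwitchModeByKeyRanges(switchCtx, ranges)
if err != nil {
    cancel()
    return err
}
// ... ingest into the key ranges while they are kept in import mode ...
cancel() // ask the background goroutine to switch back and exit
<-done   // wait until it has actually exited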
func (*Backend) TotalMemoryConsume ¶
func (local *Backend) TotalMemoryConsume() int64
TotalMemoryConsume returns the total memory usage of the local backend.
func (*Backend) UnsafeImportAndReset ¶
func (local *Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error
UnsafeImportAndReset forces the backend to import the content of an engine into the target and then reset the engine to empty. This method will not close the engine. Make sure the engine is flushed manually before calling this method.
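A sketch of the required flush-before-import sequence (bk, ctx, engineID, and the split parameters are assumed):

// Flush first: UnsafeImportAndReset does not flush, and unflushed KVs
// would be lost when the engine is reset.
if err := bk.FlushEngine(ctx, engineID); err != nil {
    return err
}
if err := bk.UnsafeImportAndReset(ctx, engineID, regionSplitSize, regionSplitKeys); err != nil {
    return err
}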
type BackendConfig ¶
type BackendConfig struct {
    // comma separated list of PD endpoints.
    PDAddr        string
    LocalStoreDir string
    // max number of cached grpc.ClientConn to a store.
    // note: this is not the limit of actual connections, each grpc.ClientConn can have one or more of it.
    MaxConnPerStore int
    // compress type when write or ingest into tikv
    ConnCompressType config.CompressionType
    // concurrency of generateJobForRange and import(write & ingest) workers
    WorkerConcurrency int
    // batch kv size when writing to TiKV
    KVWriteBatchSize       int64
    RegionSplitBatchSize   int
    RegionSplitConcurrency int
    CheckpointEnabled      bool
    // memory table size of pebble. since pebble can have multiple mem tables, the max memory used is
    // MemTableSize * MemTableStopWritesThreshold, see pebble.Options for more details.
    MemTableSize            int
    LocalWriterMemCacheSize int64
    // whether check TiKV capacity before write & ingest.
    ShouldCheckTiKV    bool
    DupeDetectEnabled  bool
    DuplicateDetectOpt common.DupDetectOpt
    // max write speed in bytes per second to each store(burst is allowed), 0 means no limit
    StoreWriteBWLimit int
    // When TiKV is in normal mode, ingesting too many SSTs will cause TiKV write stall.
    // To avoid this, we should check write stall before ingesting SSTs. Note that, we
    // must check both leader node and followers in client side, because followers will
    // not check write stall as long as ingest command is accepted by leader.
    ShouldCheckWriteStall bool
    // soft limit on the number of open files that can be used by pebble DB.
    // the minimum value is 128.
    MaxOpenFiles int
    KeyspaceName string
    // the scope when pause PD schedulers.
    PausePDSchedulerScope     config.PausePDSchedulerScope
    ResourceGroupName         string
    TaskType                  string
    RaftKV2SwitchModeDuration time.Duration
    // whether disable automatic compactions of pebble db of engine.
    // deduplicate pebble db is not affected by this option.
    // see DisableAutomaticCompactions of pebble.Options for more details.
    // default true.
    DisableAutomaticCompactions bool
}
BackendConfig is the config for local backend.
func NewBackendConfig ¶
func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName, resourceGroupName, taskType string, raftKV2SwitchModeDuration time.Duration) BackendConfig
NewBackendConfig creates a new BackendConfig.
type ChecksumManager ¶
type ChecksumManager interface {
Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error)
}
ChecksumManager is a manager that manages checksums.
func NewTiDBChecksumExecutor ¶
func NewTiDBChecksumExecutor(db *sql.DB) ChecksumManager
NewTiDBChecksumExecutor creates a new TiDB checksum executor.
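A usage sketch, assuming db (a *sql.DB) and tableInfo are available:

var mgr ChecksumManager = NewTiDBChecksumExecutor(db)
cs, err := mgr.Checksum(ctx, tableInfo)
if err != nil {
    return err
}
fmt.Printf("%s.%s: checksum=%d kvs=%d bytes=%d\n",
    cs.Schema, cs.Table, cs.Checksum, cs.TotalKVs, cs.TotalBytes)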
type DiskUsage ¶
type DiskUsage interface {
    // EngineFileSizes obtains the size occupied locally of all engines managed
    // by this backend. This method is used to compute disk quota.
    // It can return nil if the content is all stored remotely.
    EngineFileSizes() (res []backend.EngineFileSize)
}
DiskUsage is an interface for obtaining the local size occupied by all engines.
type DupKVStream ¶
type DupKVStream interface {
    // Next returns the next key-value pair or any error it encountered.
    // At the end of the stream, the error is io.EOF.
    Next() (key, val []byte, err error)
    // Close closes the stream.
    Close() error
}
DupKVStream is a streaming interface for collecting duplicate key-value pairs.
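A generic consumer loop works the same for the local and remote implementations below; handleDuplicate is a hypothetical handler:

defer stream.Close()
for {
    key, val, err := stream.Next()
    if err == io.EOF {
        break // end of stream
    }
    if err != nil {
        return err
    }
    handleDuplicate(key, val) // hypothetical: process one duplicate KV pair
}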
type DupKVStreamImpl ¶
type DupKVStreamImpl struct {
// contains filtered or unexported fields
}
DupKVStreamImpl implements the interface of DupKVStream. It collects duplicate key-value pairs from a pebble.DB.
func NewLocalDupKVStream ¶
func NewLocalDupKVStream(dupDB *pebble.DB, keyAdapter common.KeyAdapter, keyRange tidbkv.KeyRange) *DupKVStreamImpl
NewLocalDupKVStream creates a new DupKVStreamImpl with the given duplicate db and key range.
func (*DupKVStreamImpl) Close ¶
func (s *DupKVStreamImpl) Close() error
Close implements the interface of DupKVStream.
func (*DupKVStreamImpl) Next ¶
func (s *DupKVStreamImpl) Next() (key, val []byte, err error)
Next implements the interface of DupKVStream.
type DupeController ¶
type DupeController struct {
// contains filtered or unexported fields
}
DupeController is used to collect duplicate keys from local and remote data sources and to resolve the duplication.
func (*DupeController) CollectLocalDuplicateRows ¶
func (local *DupeController) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (hasDupe bool, err error)
CollectLocalDuplicateRows collects duplicate keys from the local DB. It stores the duplicate keys that may conflict with other keys in the local data source.
func (*DupeController) CollectRemoteDuplicateRows ¶
func (local *DupeController) CollectRemoteDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *encode.SessionOptions) (hasDupe bool, err error)
CollectRemoteDuplicateRows collects duplicate keys from remote TiKV storage. These keys may conflict with data imported by other Lightning instances.
func (*DupeController) ResolveDuplicateRows ¶
func (local *DupeController) ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) (err error)
ResolveDuplicateRows resolves duplicated rows by deleting/inserting data according to the required algorithm.
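A sketch of the typical detect-then-resolve flow; the concurrency value, errMgr, opts, and algorithm (a config.DuplicateResolutionAlgorithm) are illustrative assumptions:

ctl := bk.GetDupeController(16, errMgr)
hasDupe, err := ctl.CollectLocalDuplicateRows(ctx, tbl, tableName, opts)
if err != nil {
    return err
}
if hasDupe {
    return ctl.ResolveDuplicateRows(ctx, tbl, tableName, algorithm)
}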
type DupeDetector ¶
type DupeDetector struct {
// contains filtered or unexported fields
}
DupeDetector provides methods to collect duplicated KV pairs and decode them into row data. The results are stored in the errorMgr. This object can only be used once, either for local or remote deduplication.
func NewDupeDetector ¶
func NewDupeDetector(
    tbl table.Table,
    tableName string,
    splitCli split.SplitClient,
    tikvCli *tikv.KVStore,
    tikvCodec tikv.Codec,
    errMgr *errormanager.ErrorManager,
    sessOpts *encode.SessionOptions,
    concurrency int,
    logger log.Logger,
    resourceGroupName string,
    taskType string,
) (*DupeDetector, error)
NewDupeDetector creates a new DupeDetector.
func (*DupeDetector) CollectDuplicateRowsFromDupDB ¶
func (m *DupeDetector) CollectDuplicateRowsFromDupDB(ctx context.Context, dupDB *pebble.DB, keyAdapter common.KeyAdapter) error
CollectDuplicateRowsFromDupDB collects duplicates from the duplicate DB and records all duplicate row info into errorMgr.
func (*DupeDetector) CollectDuplicateRowsFromTiKV ¶
func (m *DupeDetector) CollectDuplicateRowsFromTiKV(ctx context.Context, importClientFactory ImportClientFactory) error
CollectDuplicateRowsFromTiKV collects duplicates from the remote TiKV and records all duplicate row info into errorMgr.
func (*DupeDetector) HasDuplicate ¶
func (m *DupeDetector) HasDuplicate() bool
HasDuplicate returns true if there are duplicated KV pairs.
func (*DupeDetector) RecordDataConflictError ¶
func (m *DupeDetector) RecordDataConflictError(ctx context.Context, stream DupKVStream) error
RecordDataConflictError records data conflicts to errorMgr. The key received from stream must be a row key.
func (*DupeDetector) RecordIndexConflictError ¶
func (m *DupeDetector) RecordIndexConflictError(ctx context.Context, stream DupKVStream, tableID int64, indexInfo *model.IndexInfo) error
RecordIndexConflictError records index conflicts to errorMgr. The key received from stream must be an index key.
type Engine ¶
type Engine struct {
    // contains filtered or unexported fields
}
Engine is a local engine.
func (*Engine) Exist ¶
func (e *Engine) Exist(dataDir string) error
Exist checks whether the DB folder exists (the meta sometimes isn't flushed before lightning exits).
func (*Engine) GetFirstAndLastKey ¶
func (e *Engine) GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error)
GetFirstAndLastKey reads the first and last key in the range [lowerBound, upperBound) in the engine. An empty upperBound means unbounded.
func (*Engine) GetKeyRange ¶
func (e *Engine) GetKeyRange() (startKey []byte, endKey []byte, err error)
GetKeyRange implements common.Engine.
func (*Engine) ImportedStatistics ¶
func (e *Engine) ImportedStatistics() (importedSize int64, importedKVCount int64)
ImportedStatistics returns the imported KV size and imported KV count.
func (*Engine) KVStatistics ¶
func (e *Engine) KVStatistics() (totalSize int64, totalKVCount int64)
KVStatistics returns the total KV size and total KV count.
func (*Engine) LoadIngestData ¶
func (e *Engine) LoadIngestData(
    ctx context.Context,
    regionRanges []common.Range,
    outCh chan<- common.DataAndRange,
) error
LoadIngestData returns the (local) Engine itself, because Engine implements the IngestData interface.
func (*Engine) SplitRanges ¶
func (e *Engine) SplitRanges(
    startKey, endKey []byte,
    sizeLimit, keysLimit int64,
    logger log.Logger,
) ([]common.Range, error)
SplitRanges gets size properties from pebble and splits ranges according to the size/keys limits.
func (*Engine) TotalMemorySize ¶
func (e *Engine) TotalMemorySize() int64
TotalMemorySize returns the total memory size of the engine.
type ImportClientFactory ¶
type ImportClientFactory interface {
    Create(ctx context.Context, storeID uint64) (sst.ImportSSTClient, error)
    Close()
}
ImportClientFactory is a factory that creates new import clients for a specific store.
type Iter ¶
type Iter interface {
    common.ForwardIter
    // Last moves this iter to the last key.
    Last() bool
    // OpType returns the operation type of the pair. Currently there are
    // two types: Put and Delete.
    OpType() sst.Pair_OP
}
Iter abstracts the iterator methods used by the Ingester.
type RangePropertiesCollector ¶
type RangePropertiesCollector struct {
// contains filtered or unexported fields
}
RangePropertiesCollector collects range properties for each range.
func (*RangePropertiesCollector) Add ¶
func (c *RangePropertiesCollector) Add(key pebble.InternalKey, value []byte) error
Add implements `pebble.TablePropertyCollector`.
func (*RangePropertiesCollector) Finish ¶
func (c *RangePropertiesCollector) Finish(userProps map[string]string) error
Finish implements `pebble.TablePropertyCollector`.
func (*RangePropertiesCollector) Name ¶
func (*RangePropertiesCollector) Name() string
Name implements `pebble.TablePropertyCollector`.
type RemoteChecksum ¶
type RemoteChecksum struct {
    Schema     string
    Table      string
    Checksum   uint64
    TotalKVs   uint64
    TotalBytes uint64
}
RemoteChecksum represents a checksum result obtained from TiDB.
func (*RemoteChecksum) IsEqual ¶
func (rc *RemoteChecksum) IsEqual(other *verification.KVChecksum) bool
IsEqual checks whether the checksum is equal to the other.
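For example, after an import one might compare the remote result against a locally accumulated verification.KVChecksum (remoteChecksum and localChecksum are assumed):

if !remoteChecksum.IsEqual(&localChecksum) {
    return fmt.Errorf("checksum mismatch on %s.%s: remote checksum=%d kvs=%d bytes=%d",
        remoteChecksum.Schema, remoteChecksum.Table,
        remoteChecksum.Checksum, remoteChecksum.TotalKVs, remoteChecksum.TotalBytes)
}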
type RemoteDupKVStream ¶
type RemoteDupKVStream struct {
// contains filtered or unexported fields
}
RemoteDupKVStream implements the interface of DupKVStream. It collects duplicate key-value pairs from a TiKV region.
func NewRemoteDupKVStream ¶
func NewRemoteDupKVStream(
    ctx context.Context,
    region *split.RegionInfo,
    keyRange tidbkv.KeyRange,
    importClientFactory ImportClientFactory,
    resourceGroupName string,
    taskType string,
) (*RemoteDupKVStream, error)
NewRemoteDupKVStream creates a new RemoteDupKVStream.
func (*RemoteDupKVStream) Close ¶
func (s *RemoteDupKVStream) Close() error
Close implements the interface of DupKVStream.
func (*RemoteDupKVStream) Next ¶
func (s *RemoteDupKVStream) Next() (key, val []byte, err error)
Next implements the interface of DupKVStream.
type RlimT ¶
type RlimT = uint64
RlimT is the type of rlimit values.
func GetSystemRLimit ¶
GetSystemRLimit returns the current open-file limit.
type StoreWriteLimiter ¶
type StoreWriteLimiter interface {
    WaitN(ctx context.Context, storeID uint64, n int) error
    Limit() int
}
StoreWriteLimiter is used to limit the write rate of a store.
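A minimal per-store sketch built on sync and golang.org/x/time/rate; this is an illustrative assumption, not the package's own implementation:

type simpleStoreWriteLimiter struct {
    mu       sync.Mutex
    limiters map[uint64]*rate.Limiter
    limit    int // bytes per second per store
}

func (l *simpleStoreWriteLimiter) WaitN(ctx context.Context, storeID uint64, n int) error {
    l.mu.Lock()
    lim, ok := l.limiters[storeID]
    if !ok {
        // burst == limit, so up to one second of quota may be taken at once;
        // rate.Limiter.WaitN fails if n exceeds the burst, so callers should
        // split writes larger than the limit.
        lim = rate.NewLimiter(rate.Limit(l.limit), l.limit)
        l.limiters[storeID] = lim
    }
    l.mu.Unlock()
    return lim.WaitN(ctx, n)
}

func (l *simpleStoreWriteLimiter) Limit() int { return l.limit }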
type TableRegionSizeGetter ¶
type TableRegionSizeGetter interface {
GetTableRegionSize(ctx context.Context, tableID int64) (map[uint64]int64, error)
}
TableRegionSizeGetter gets the region sizes of a table.
type TableRegionSizeGetterImpl ¶
TableRegionSizeGetterImpl implements TableRegionSizeGetter.
func (*TableRegionSizeGetterImpl) GetTableRegionSize ¶
func (g *TableRegionSizeGetterImpl) GetTableRegionSize(ctx context.Context, tableID int64) (map[uint64]int64, error)
GetTableRegionSize implements TableRegionSizeGetter.
type TiKVChecksumManager ¶
type TiKVChecksumManager struct {
// contains filtered or unexported fields
}
TiKVChecksumManager is a manager that can compute checksum of a table using TiKV.
func NewTiKVChecksumManager ¶
func NewTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanConcurrency uint, backoffWeight int, resourceGroupName, explicitRequestSourceType string) *TiKVChecksumManager
NewTiKVChecksumManager returns a new TiKV checksum manager.
func (*TiKVChecksumManager) Checksum ¶
func (e *TiKVChecksumManager) Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error)
Checksum implements the ChecksumManager interface.
type TiKVModeSwitcher ¶
type TiKVModeSwitcher interface {
    // ToImportMode switches all TiKV nodes to Import mode.
    ToImportMode(ctx context.Context, ranges ...*sstpb.Range)
    // ToNormalMode switches all TiKV nodes to Normal mode.
    ToNormalMode(ctx context.Context, ranges ...*sstpb.Range)
}
TiKVModeSwitcher is used to switch TiKV nodes between Import and Normal mode.
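Typical usage brackets the ingest phase, assuming switcher was obtained from NewTiKVModeSwitcher below:

switcher.ToImportMode(ctx)
defer switcher.ToNormalMode(ctx)
// ... write and ingest SSTs while TiKV is tuned for bulk ingestion ...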
func NewTiKVModeSwitcher ¶
NewTiKVModeSwitcher creates a new TiKVModeSwitcher.
type Writer ¶
Writer is used to write data into an SST file.
func (*Writer) AppendRows ¶
AppendRows appends rows to the SST file.
func (*Writer) EstimatedSize ¶
EstimatedSize returns the estimated size of the SST file.