Documentation
¶
Index ¶
- Constants
- Variables
- func CleanUpTempDir(ctx context.Context, se sessionctx.Context, path string)
- func CopReadBatchSize(hintSize int) int
- func CopReadChunkPoolSize(hintConc int) int
- func CreateLocalBackend(ctx context.Context, store kv.Storage, job *model.Job, checkDup bool) (*local.BackendConfig, *local.Backend, error)
- func GenIngestTempDataDir() (string, error)
- func InitGlobalLightningEnv(path string) (ok bool)
- func InstanceAddr() string
- func NewDDLTLS() (*common.TLS, error)
- func NewMemRootImpl(maxQuota int64) *memRootImpl
- func RiskOfDiskFull(available, capacity uint64) bool
- func TryConvertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *model.TableInfo) error
- type BackendCtx
- type BackendCtxBuilder
- func (b *BackendCtxBuilder) Build(cfg *local.BackendConfig, bd *local.Backend) (BackendCtx, error)
- func (b *BackendCtxBuilder) ForDuplicateCheck() *BackendCtxBuilder
- func (b *BackendCtxBuilder) WithCheckpointManagerParam(sessPool *sess.Pool, physicalID int64) *BackendCtxBuilder
- func (b *BackendCtxBuilder) WithImportDistributedLock(etcdCli *clientv3.Client, importTS uint64) *BackendCtxBuilder
- type CheckpointManager
- func (s *CheckpointManager) AdvanceWatermark(imported bool) error
- func (s *CheckpointManager) Close()
- func (s *CheckpointManager) GetTS() uint64
- func (s *CheckpointManager) IsKeyProcessed(end kv.Key) bool
- func (s *CheckpointManager) NextKeyToProcess() kv.Key
- func (s *CheckpointManager) Register(taskID int, end kv.Key)
- func (s *CheckpointManager) TotalKeyCount() int
- func (s *CheckpointManager) UpdateTotalKeys(taskID int, delta int, last bool)
- func (s *CheckpointManager) UpdateWrittenKeys(taskID int, delta int)
- type CheckpointOperator
- type DiskRoot
- type Engine
- type JobReorgMeta
- type MemRoot
- type MockBackendCtx
- func (m *MockBackendCtx) AddChunk(id int, endKey kv.Key)
- func (m *MockBackendCtx) AdvanceWatermark(imported bool) error
- func (m *MockBackendCtx) Close()
- func (*MockBackendCtx) CollectRemoteDuplicateRows(indexID int64, _ table.Table) error
- func (m *MockBackendCtx) FinishAndUnregisterEngines(_ UnregisterOpt) error
- func (m *MockBackendCtx) FinishChunk(id int, count int)
- func (bc *MockBackendCtx) GetDiskUsage() uint64
- func (m *MockBackendCtx) GetImportTS() uint64
- func (m *MockBackendCtx) GetLocalBackend() *local.Backend
- func (m *MockBackendCtx) Ingest(_ context.Context) error
- func (m *MockBackendCtx) IngestIfQuotaExceeded(_ context.Context, taskID, cnt int) error
- func (m *MockBackendCtx) NextStartKey() kv.Key
- func (m *MockBackendCtx) Register(indexIDs []int64, _ []bool, _ table.Table) ([]Engine, error)
- func (m *MockBackendCtx) TotalKeyCount() int
- func (m *MockBackendCtx) UpdateChunk(id int, count int, done bool)
- type MockEngineInfo
- type MockWriteHook
- type MockWriter
- type ReorgCheckpoint
- type ResourceTracker
- type UnregisterOpt
- type Writer
Constants ¶
const ( JobCheckpointVersionCurrent = JobCheckpointVersion1 JobCheckpointVersion1 = 1 )
JobCheckpointVersionCurrent is the current version of the checkpoint.
const ( LitErrAllocMemFail string = "allocate memory failed" LitErrCreateDirFail string = "create ingest sort path error" LitErrStatDirFail string = "stat ingest sort path error" LitErrCreateBackendFail string = "build ingest backend failed" LitErrGetBackendFail string = "cannot get ingest backend" LitErrCreateEngineFail string = "build ingest engine failed" LitErrCreateContextFail string = "build ingest writer context failed" LitErrGetEngineFail string = "can not get ingest engine info" LitErrGetStorageQuota string = "get storage quota error" LitErrCloseEngineErr string = "close engine error" LitErrCleanEngineErr string = "clean engine error" LitErrFlushEngineErr string = "flush engine data err" LitErrIngestDataErr string = "ingest data into storage error" LitErrRemoteDupExistErr string = "remote duplicate index key exist" LitErrExceedConcurrency string = "the concurrency is greater than ingest limit" LitErrCloseWriterErr string = "close writer error" LitErrReadSortPath string = "cannot read sort path" LitErrCleanSortPath string = "clean up temp dir failed" LitErrResetEngineFail string = "reset engine failed" LitWarnEnvInitFail string = "initialize environment failed" LitWarnConfigError string = "build config for backend failed" LitInfoEnvInitSucc string = "init global ingest backend environment finished" LitInfoSortDir string = "the ingest sorted directory" LitInfoCreateBackend string = "create one backend for an DDL job" LitInfoCloseBackend string = "close one backend for DDL job" LitInfoOpenEngine string = "open an engine for index reorg task" LitInfoCreateWrite string = "create one local writer for index reorg task" LitInfoCloseEngine string = "flush all writer and get closed engine" LitInfoRemoteDupCheck string = "start remote duplicate checking" LitInfoStartImport string = "start to import data" LitInfoChgMemSetting string = "change memory setting for ingest" LitInfoInitMemSetting string = "initial memory setting for ingest" LitInfoUnsafeImport string = "do a partial import data into the storage" )
Message text constants for errors, warnings, and info logs.
Variables ¶
var ( // LitMemRoot is used to track the memory usage of the lightning backfill process. LitMemRoot MemRoot // LitDiskRoot is used to track the disk usage of the lightning backfill process. LitDiskRoot DiskRoot // LitInitialized is the flag that indicates whether the lightning backfill process is initialized. LitInitialized bool )
var BackendCounterForTest = atomic.Int64{}
BackendCounterForTest is only used in test.
var ForceSyncFlagForTest atomic.Bool
ForceSyncFlagForTest is a flag to force sync only for test.
var ImporterRangeConcurrencyForTest *atomic.Int32
ImporterRangeConcurrencyForTest is only used for test.
var MockDMLExecutionStateBeforeImport func()
MockDMLExecutionStateBeforeImport is a failpoint to mock the DML execution state before import.
var MockExecAfterWriteRow func()
MockExecAfterWriteRow is only used for test.
var ResignOwnerForTest = atomic.NewBool(false)
ResignOwnerForTest is only used for test.
var TrackerCountForTest = atomic.Int64{}
TrackerCountForTest is only used for test.
Functions ¶
func CleanUpTempDir ¶
func CleanUpTempDir(ctx context.Context, se sessionctx.Context, path string)
CleanUpTempDir is used to remove the stale index data. This function gets running DDL jobs from `mysql.tidb_ddl_job` and it only removes the folders that are related to finished jobs.
func CopReadBatchSize ¶
CopReadBatchSize is the batch size of coprocessor read. It multiplies the tidb_ddl_reorg_batch_size by 10 to avoid sending too many cop requests for the same handle range.
func CopReadChunkPoolSize ¶
CopReadChunkPoolSize is the size of chunk pool, which represents the max concurrent ongoing coprocessor requests. It multiplies the tidb_ddl_reorg_worker_cnt by 10.
func CreateLocalBackend ¶
func CreateLocalBackend(ctx context.Context, store kv.Storage, job *model.Job, checkDup bool) (*local.BackendConfig, *local.Backend, error)
CreateLocalBackend creates a local backend for adding index.
func GenIngestTempDataDir ¶
GenIngestTempDataDir generates a path for DDL ingest. Format: ${temp-dir}/tmp_ddl-{port}
func InitGlobalLightningEnv ¶
InitGlobalLightningEnv initializes the Lightning backfill environment.
func InstanceAddr ¶
func InstanceAddr() string
InstanceAddr returns a string that concatenates the instance address and temp-dir.
func NewMemRootImpl ¶
func NewMemRootImpl(maxQuota int64) *memRootImpl
NewMemRootImpl creates a new memRootImpl.
func RiskOfDiskFull ¶
RiskOfDiskFull checks if the disk has less than 10% space.
Types ¶
type BackendCtx ¶
type BackendCtx interface { // Register creates a new engineInfo for each index ID and register it to the // backend context. If the index ID is already registered, it will return the // associated engines. Only one group of index ID is allowed to register for a // BackendCtx. // // Register is only used in local disk based ingest. Register(indexIDs []int64, uniques []bool, tbl table.Table) ([]Engine, error) // FinishAndUnregisterEngines finishes the task and unregisters all engines that // are Register-ed before. It's safe to call it multiple times. // // FinishAndUnregisterEngines is only used in local disk based ingest. FinishAndUnregisterEngines(opt UnregisterOpt) error // IngestIfQuotaExceeded updates the task and count to checkpoint manager, and tries to ingest them to disk or TiKV // according to the last ingest time or the usage of local disk. IngestIfQuotaExceeded(ctx context.Context, taskID int, count int) error // Ingest checks if all engines need to be flushed and imported. It's concurrent safe. Ingest(ctx context.Context) (err error) CheckpointOperator // GetLocalBackend exposes local.Backend. It's only used in global sort based // ingest. GetLocalBackend() *local.Backend // CollectRemoteDuplicateRows collects duplicate entry error for given index as // the supplement of Ingest. // // CollectRemoteDuplicateRows is only used in global sort based ingest. CollectRemoteDuplicateRows(indexID int64, tbl table.Table) error GetDiskUsage() uint64 Close() }
BackendCtx is the backend context for one add index reorg task.
func NewMockBackendCtx ¶
func NewMockBackendCtx(job *model.Job, sessCtx sessionctx.Context, cpMgr *CheckpointManager) BackendCtx
NewMockBackendCtx creates a MockBackendCtx.
type BackendCtxBuilder ¶
type BackendCtxBuilder struct {
// contains filtered or unexported fields
}
BackendCtxBuilder is the builder of BackendCtx.
func NewBackendCtxBuilder ¶
NewBackendCtxBuilder creates a BackendCtxBuilder.
func (*BackendCtxBuilder) Build ¶
func (b *BackendCtxBuilder) Build(cfg *local.BackendConfig, bd *local.Backend) (BackendCtx, error)
Build builds a BackendCtx.
func (*BackendCtxBuilder) ForDuplicateCheck ¶
func (b *BackendCtxBuilder) ForDuplicateCheck() *BackendCtxBuilder
ForDuplicateCheck marks this backend context as only used for duplicate check. TODO(tangenta): remove this after we don't rely on the backend to do duplicate check.
func (*BackendCtxBuilder) WithCheckpointManagerParam ¶
func (b *BackendCtxBuilder) WithCheckpointManagerParam( sessPool *sess.Pool, physicalID int64, ) *BackendCtxBuilder
WithCheckpointManagerParam is only used by non-DXF local ingest mode.
func (*BackendCtxBuilder) WithImportDistributedLock ¶
func (b *BackendCtxBuilder) WithImportDistributedLock(etcdCli *clientv3.Client, importTS uint64) *BackendCtxBuilder
WithImportDistributedLock needs an etcd client to maintain a distributed lock during partial import.
type CheckpointManager ¶
type CheckpointManager struct {
// contains filtered or unexported fields
}
CheckpointManager is a checkpoint manager implementation that is used by non-distributed reorganization. It manages the data as two-level checkpoints: "flush"ed to local storage and "import"ed to TiKV. The checkpoint is saved in a table in the TiDB cluster.
func NewCheckpointManager ¶
func NewCheckpointManager( ctx context.Context, sessPool *sess.Pool, physicalID int64, jobID int64, localStoreDir string, pdCli pd.Client, ) (*CheckpointManager, error)
NewCheckpointManager creates a new checkpoint manager.
func (*CheckpointManager) AdvanceWatermark ¶
func (s *CheckpointManager) AdvanceWatermark(imported bool) error
AdvanceWatermark advances the watermark according to flushed or imported status.
func (*CheckpointManager) Close ¶
func (s *CheckpointManager) Close()
Close closes the checkpoint manager.
func (*CheckpointManager) GetTS ¶
func (s *CheckpointManager) GetTS() uint64
GetTS returns the TS saved in checkpoint.
func (*CheckpointManager) IsKeyProcessed ¶
func (s *CheckpointManager) IsKeyProcessed(end kv.Key) bool
IsKeyProcessed checks if the key is processed. The key may not be imported. This is called before the reader reads the data and decides whether to skip the current task.
func (*CheckpointManager) NextKeyToProcess ¶
func (s *CheckpointManager) NextKeyToProcess() kv.Key
NextKeyToProcess finds the next unprocessed key in checkpoint. If there is no such key, it returns nil.
func (*CheckpointManager) Register ¶
func (s *CheckpointManager) Register(taskID int, end kv.Key)
Register registers a new task. taskID MUST be continuous ascending and start from 0.
TODO(lance6716): remove this constraint, use endKey as taskID and use ordered map type for checkpoints.
func (*CheckpointManager) TotalKeyCount ¶
func (s *CheckpointManager) TotalKeyCount() int
TotalKeyCount returns the count of keys that have been processed. It includes the keys that are not yet synced to the checkpoint.
func (*CheckpointManager) UpdateTotalKeys ¶
func (s *CheckpointManager) UpdateTotalKeys(taskID int, delta int, last bool)
UpdateTotalKeys updates the total keys of the task. This is called by the reader after reading the data to update the number of rows contained in the current chunk.
func (*CheckpointManager) UpdateWrittenKeys ¶
func (s *CheckpointManager) UpdateWrittenKeys(taskID int, delta int)
UpdateWrittenKeys updates the written keys of the task. This is called by the writer after writing the local engine to update the current number of rows written.
type CheckpointOperator ¶
type CheckpointOperator interface { NextStartKey() tikv.Key TotalKeyCount() int AddChunk(id int, endKey tikv.Key) UpdateChunk(id int, count int, done bool) FinishChunk(id int, count int) AdvanceWatermark(imported bool) error GetImportTS() uint64 }
CheckpointOperator contains the operations to checkpoints.
type DiskRoot ¶
type DiskRoot interface { Add(id int64, tracker ResourceTracker) Remove(id int64) Count() int UpdateUsage() ShouldImport() bool UsageInfo() string PreCheckUsage() error StartupCheck() error }
DiskRoot is used to track the disk usage for the lightning backfill process.
func NewDiskRootImpl ¶
NewDiskRootImpl creates a new DiskRoot.
type Engine ¶
type Engine interface { Flush() error Close(cleanup bool) CreateWriter(id int, writerCfg *backend.LocalWriterConfig) (Writer, error) }
Engine is the interface for the engine that can be used to write key-value pairs.
type JobReorgMeta ¶
type JobReorgMeta struct {
Checkpoint *ReorgCheckpoint `json:"reorg_checkpoint"`
}
JobReorgMeta is the metadata for a reorg job.
type MemRoot ¶
type MemRoot interface { Consume(size int64) Release(size int64) CheckConsume(size int64) bool // ConsumeWithTag consumes memory with a tag. The main difference between // ConsumeWithTag and Consume is that if the memory is updated afterward, caller // can use ReleaseWithTag then ConsumeWithTag to update the memory usage. ConsumeWithTag(tag string, size int64) ReleaseWithTag(tag string) SetMaxMemoryQuota(quota int64) MaxMemoryQuota() int64 CurrentUsage() int64 CurrentUsageWithTag(tag string) int64 RefreshConsumption() }
MemRoot is used to track the memory usage for the lightning backfill process. TODO(lance6716): change API to prevent TOCTOU.
type MockBackendCtx ¶
type MockBackendCtx struct {
// contains filtered or unexported fields
}
MockBackendCtx is a mock backend context.
func (*MockBackendCtx) AddChunk ¶
func (m *MockBackendCtx) AddChunk(id int, endKey kv.Key)
AddChunk implements CheckpointOperator interface.
func (*MockBackendCtx) AdvanceWatermark ¶
func (m *MockBackendCtx) AdvanceWatermark(imported bool) error
AdvanceWatermark implements CheckpointOperator interface.
func (*MockBackendCtx) CollectRemoteDuplicateRows ¶
func (*MockBackendCtx) CollectRemoteDuplicateRows(indexID int64, _ table.Table) error
CollectRemoteDuplicateRows implements BackendCtx.CollectRemoteDuplicateRows interface.
func (*MockBackendCtx) FinishAndUnregisterEngines ¶
func (m *MockBackendCtx) FinishAndUnregisterEngines(_ UnregisterOpt) error
FinishAndUnregisterEngines implements BackendCtx interface.
func (*MockBackendCtx) FinishChunk ¶
func (m *MockBackendCtx) FinishChunk(id int, count int)
FinishChunk implements CheckpointOperator interface.
func (*MockBackendCtx) GetDiskUsage ¶
func (bc *MockBackendCtx) GetDiskUsage() uint64
GetDiskUsage returns current disk usage of underlying backend.
func (*MockBackendCtx) GetImportTS ¶
func (m *MockBackendCtx) GetImportTS() uint64
GetImportTS implements CheckpointOperator interface.
func (*MockBackendCtx) GetLocalBackend ¶
func (m *MockBackendCtx) GetLocalBackend() *local.Backend
GetLocalBackend returns the local backend.
func (*MockBackendCtx) Ingest ¶
func (m *MockBackendCtx) Ingest(_ context.Context) error
Ingest implements BackendCtx.Ingest interface.
func (*MockBackendCtx) IngestIfQuotaExceeded ¶
func (m *MockBackendCtx) IngestIfQuotaExceeded(_ context.Context, taskID, cnt int) error
IngestIfQuotaExceeded implements BackendCtx.IngestIfQuotaExceeded interface.
func (*MockBackendCtx) NextStartKey ¶
func (m *MockBackendCtx) NextStartKey() kv.Key
NextStartKey implements CheckpointOperator interface.
func (*MockBackendCtx) TotalKeyCount ¶
func (m *MockBackendCtx) TotalKeyCount() int
TotalKeyCount implements CheckpointOperator interface.
func (*MockBackendCtx) UpdateChunk ¶
func (m *MockBackendCtx) UpdateChunk(id int, count int, done bool)
UpdateChunk implements CheckpointOperator interface.
type MockEngineInfo ¶
type MockEngineInfo struct {
// contains filtered or unexported fields
}
MockEngineInfo is a mock engine info.
func NewMockEngineInfo ¶
func NewMockEngineInfo(sessCtx sessionctx.Context) *MockEngineInfo
NewMockEngineInfo creates a new mock engine info.
func (*MockEngineInfo) Close ¶
func (*MockEngineInfo) Close(_ bool)
Close implements Engine.Close interface.
func (*MockEngineInfo) CreateWriter ¶
func (m *MockEngineInfo) CreateWriter(id int, _ *backend.LocalWriterConfig) (Writer, error)
CreateWriter implements Engine.CreateWriter interface.
func (*MockEngineInfo) Flush ¶
func (*MockEngineInfo) Flush() error
Flush implements Engine.Flush interface.
func (*MockEngineInfo) SetHook ¶
func (m *MockEngineInfo) SetHook(onWrite func(key, val []byte))
SetHook sets the write hook.
type MockWriteHook ¶
type MockWriteHook func(key, val []byte)
MockWriteHook is the hook for writes in the mock engine.
type MockWriter ¶
type MockWriter struct {
// contains filtered or unexported fields
}
MockWriter is a mock writer.
func (*MockWriter) LockForWrite ¶
func (*MockWriter) LockForWrite() func()
LockForWrite implements Writer.LockForWrite interface.
type ReorgCheckpoint ¶
type ReorgCheckpoint struct { LocalSyncKey kv.Key `json:"local_sync_key"` LocalKeyCount int `json:"local_key_count"` GlobalSyncKey kv.Key `json:"global_sync_key"` GlobalKeyCount int `json:"global_key_count"` InstanceAddr string `json:"instance_addr"` PhysicalID int64 `json:"physical_id"` // TS of next engine ingest. TS uint64 `json:"ts"` Version int64 `json:"version"` }
ReorgCheckpoint is the checkpoint for a reorg job.
type ResourceTracker ¶
type ResourceTracker interface {
GetDiskUsage() uint64
}
ResourceTracker has the method GetDiskUsage.
type UnregisterOpt ¶
type UnregisterOpt int
UnregisterOpt controls the behavior of backend context unregistering.
const ( // OptCloseEngines only closes engines, it does not clean up sort path data. OptCloseEngines UnregisterOpt = 1 << iota // OptCleanData cleans up local sort dir data. OptCleanData // OptCheckDup checks if there is duplicate entry for unique indexes. OptCheckDup )
type Writer ¶
type Writer interface { // WriteRow writes one row into downstream. // To enable uniqueness check, the handle should be non-empty. WriteRow(ctx context.Context, idxKey, idxVal []byte, handle tidbkv.Handle) error LockForWrite() (unlock func()) }
Writer is the interface for the writer that can be used to write key-value pairs.