Documentation ¶
Index ¶
- Constants
- Variables
- func CleanUpTempDir(ctx context.Context, se sessionctx.Context, path string)
- func CopReadBatchSize(hintSize int) int
- func CopReadChunkPoolSize(hintConc int) int
- func GenIngestTempDataDir() (string, error)
- func InitGlobalLightningEnv(path string) (ok bool)
- func InstanceAddr() string
- func NewDDLTLS() (*common.TLS, error)
- func NewMemRootImpl(maxQuota int64, bcCtxMgr *litBackendCtxMgr) *memRootImpl
- func RiskOfDiskFull(available, capacity uint64) bool
- func TryConvertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *model.TableInfo) error
- type BackendCtx
- type BackendCtxMgr
- type CheckpointManager
- func (s *CheckpointManager) AdvanceWatermark(flushed, imported bool)
- func (s *CheckpointManager) Close()
- func (s *CheckpointManager) GetTS() uint64
- func (s *CheckpointManager) IsKeyProcessed(end kv.Key) bool
- func (s *CheckpointManager) NextKeyToProcess() kv.Key
- func (s *CheckpointManager) Register(taskID int, end kv.Key)
- func (s *CheckpointManager) Status() (keyCnt int, minKeyImported kv.Key)
- func (s *CheckpointManager) UpdateTotalKeys(taskID int, delta int, last bool)
- func (s *CheckpointManager) UpdateWrittenKeys(taskID int, delta int)
- type DiskRoot
- type Engine
- type FlushController
- type FlushMode
- type JobReorgMeta
- type MemRoot
- type MockBackendCtx
- func (m *MockBackendCtx) AttachCheckpointManager(mgr *CheckpointManager)
- func (*MockBackendCtx) CollectRemoteDuplicateRows(indexID int64, _ table.Table) error
- func (*MockBackendCtx) FinishAndUnregisterEngines(_ UnregisterOpt) error
- func (*MockBackendCtx) Flush(context.Context, FlushMode) (flushed, imported bool, err error)
- func (m *MockBackendCtx) GetCheckpointManager() *CheckpointManager
- func (m *MockBackendCtx) GetLocalBackend() *local.Backend
- func (m *MockBackendCtx) Register(indexIDs []int64, _ []bool, _ table.Table) ([]Engine, error)
- type MockBackendCtxMgr
- func (m *MockBackendCtxMgr) CheckMoreTasksAvailable() (bool, error)
- func (m *MockBackendCtxMgr) EncodeJobSortPath(int64) string
- func (m *MockBackendCtxMgr) Load(jobID int64) (BackendCtx, bool)
- func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client, ...) (BackendCtx, error)
- func (m *MockBackendCtxMgr) ResetSessCtx()
- func (m *MockBackendCtxMgr) Unregister(jobID int64)
- type MockEngineInfo
- type MockWriteHook
- type MockWriter
- type ReorgCheckpoint
- type UnregisterOpt
- type Writer
Constants ¶
const ( JobCheckpointVersionCurrent = JobCheckpointVersion1 JobCheckpointVersion1 = 1 )
JobCheckpointVersionCurrent is the current version of the checkpoint.
const ( LitErrAllocMemFail string = "allocate memory failed" LitErrCreateDirFail string = "create ingest sort path error" LitErrStatDirFail string = "stat ingest sort path error" LitErrCreateBackendFail string = "build ingest backend failed" LitErrGetBackendFail string = "cannot get ingest backend" LitErrCreateEngineFail string = "build ingest engine failed" LitErrCreateContextFail string = "build ingest writer context failed" LitErrGetEngineFail string = "can not get ingest engine info" LitErrGetStorageQuota string = "get storage quota error" LitErrCloseEngineErr string = "close engine error" LitErrCleanEngineErr string = "clean engine error" LitErrFlushEngineErr string = "flush engine data err" LitErrIngestDataErr string = "ingest data into storage error" LitErrRemoteDupExistErr string = "remote duplicate index key exist" LitErrExceedConcurrency string = "the concurrency is greater than ingest limit" LitErrCloseWriterErr string = "close writer error" LitErrReadSortPath string = "cannot read sort path" LitErrCleanSortPath string = "clean up temp dir failed" LitErrResetEngineFail string = "reset engine failed" LitWarnEnvInitFail string = "initialize environment failed" LitWarnConfigError string = "build config for backend failed" LitInfoEnvInitSucc string = "init global ingest backend environment finished" LitInfoSortDir string = "the ingest sorted directory" LitInfoCreateBackend string = "create one backend for an DDL job" LitInfoCloseBackend string = "close one backend for DDL job" LitInfoOpenEngine string = "open an engine for index reorg task" LitInfoCreateWrite string = "create one local writer for index reorg task" LitInfoCloseEngine string = "flush all writer and get closed engine" LitInfoRemoteDupCheck string = "start remote duplicate checking" LitInfoStartImport string = "start to import data" LitInfoChgMemSetting string = "change memory setting for ingest" LitInfoInitMemSetting string = "initial memory setting for ingest" LitInfoUnsafeImport string = "do a partial import data into the storage" )
Message const text
Variables ¶
var ( // LitBackCtxMgr is the entry for the lightning backfill process. LitBackCtxMgr BackendCtxMgr // LitMemRoot is used to track the memory usage of the lightning backfill process. LitMemRoot MemRoot // LitInitialized is the flag indicates whether the lightning backfill process is initialized. LitInitialized bool )
var ForceSyncFlagForTest = false
ForceSyncFlagForTest is a flag to force sync only for test.
var ImporterRangeConcurrencyForTest *atomic.Int32
ImporterRangeConcurrencyForTest is only used for test.
var MockDMLExecutionStateBeforeImport func()
MockDMLExecutionStateBeforeImport is a failpoint to mock the DML execution state before import.
var MockExecAfterWriteRow func()
MockExecAfterWriteRow is only used for test.
var ResignOwnerForTest = atomic.NewBool(false)
ResignOwnerForTest is only used for test.
Functions ¶
func CleanUpTempDir ¶
func CleanUpTempDir(ctx context.Context, se sessionctx.Context, path string)
CleanUpTempDir is used to remove the stale index data. This function gets running DDL jobs from `mysql.tidb_ddl_job` and it only removes the folders that related to finished jobs.
func CopReadBatchSize ¶
CopReadBatchSize is the batch size of coprocessor read. It multiplies the tidb_ddl_reorg_batch_size by 10 to avoid sending too many cop requests for the same handle range.
func CopReadChunkPoolSize ¶
CopReadChunkPoolSize is the size of chunk pool, which represents the max concurrent ongoing coprocessor requests. It multiplies the tidb_ddl_reorg_worker_cnt by 10.
func GenIngestTempDataDir ¶
GenIngestTempDataDir generates a path for DDL ingest. Format: ${temp-dir}/tmp_ddl-{port}
func InitGlobalLightningEnv ¶
InitGlobalLightningEnv initialize Lightning backfill environment.
func InstanceAddr ¶
func InstanceAddr() string
InstanceAddr returns the string concat with instance address and temp-dir.
func NewMemRootImpl ¶
func NewMemRootImpl(maxQuota int64, bcCtxMgr *litBackendCtxMgr) *memRootImpl
NewMemRootImpl creates a new memRootImpl.
func RiskOfDiskFull ¶
RiskOfDiskFull checks if the disk has less than 10% space.
Types ¶
type BackendCtx ¶
type BackendCtx interface { // Register create a new engineInfo for each index ID and register it to the // backend context. If the index ID is already registered, it will return the // associated engines. Only one group of index ID is allowed to register for a // BackendCtx. // // Register is only used in local disk based ingest. Register(indexIDs []int64, uniques []bool, tbl table.Table) ([]Engine, error) // FinishAndUnregisterEngines finishes the task and unregisters all engines that // are Register-ed before. It's safe to call it multiple times. // // FinishAndUnregisterEngines is only used in local disk based ingest. FinishAndUnregisterEngines(opt UnregisterOpt) error FlushController AttachCheckpointManager(*CheckpointManager) GetCheckpointManager() *CheckpointManager // GetLocalBackend exposes local.Backend. It's only used in global sort based // ingest. GetLocalBackend() *local.Backend // CollectRemoteDuplicateRows collects duplicate entry error for given index as // the supplement of FlushController.Flush. // // CollectRemoteDuplicateRows is only used in global sort based ingest. CollectRemoteDuplicateRows(indexID int64, tbl table.Table) error }
BackendCtx is the backend context for one add index reorg task.
type BackendCtxMgr ¶
type BackendCtxMgr interface { // CheckMoreTasksAvailable checks if it can run more ingest backfill tasks. CheckMoreTasksAvailable() (bool, error) // Register uses jobID to identify the BackendCtx. If there's already a // BackendCtx with the same jobID, it will be returned. Otherwise, a new // BackendCtx will be created and returned. Register( ctx context.Context, jobID int64, hasUnique bool, etcdClient *clientv3.Client, pdSvcDiscovery sd.ServiceDiscovery, resourceGroupName string, importConc int, maxWriteSpeed int, initTS uint64, ) (BackendCtx, error) Unregister(jobID int64) // EncodeJobSortPath encodes the job ID to the local disk sort path. EncodeJobSortPath(jobID int64) string // Load returns the registered BackendCtx with the given jobID. Load(jobID int64) (BackendCtx, bool) }
BackendCtxMgr is used to manage the BackendCtx.
func NewLitBackendCtxMgr ¶
func NewLitBackendCtxMgr(path string, memQuota uint64) BackendCtxMgr
NewLitBackendCtxMgr creates a new litBackendCtxMgr.
type CheckpointManager ¶
type CheckpointManager struct {
// contains filtered or unexported fields
}
CheckpointManager is a checkpoint manager implementation that used by non-distributed reorganization. It manages the data as two-level checkpoints: "flush"ed to local storage and "import"ed to TiKV. The checkpoint is saved in a table in the TiDB cluster.
func NewCheckpointManager ¶
func NewCheckpointManager( ctx context.Context, sessPool *sess.Pool, physicalID int64, jobID int64, indexIDs []int64, localStoreDir string, pdCli pd.Client, ) (*CheckpointManager, error)
NewCheckpointManager creates a new checkpoint manager.
func (*CheckpointManager) AdvanceWatermark ¶
func (s *CheckpointManager) AdvanceWatermark(flushed, imported bool)
AdvanceWatermark advances the watermark according to flushed or imported status.
func (*CheckpointManager) Close ¶
func (s *CheckpointManager) Close()
Close closes the checkpoint manager.
func (*CheckpointManager) GetTS ¶
func (s *CheckpointManager) GetTS() uint64
GetTS returns the TS saved in checkpoint.
func (*CheckpointManager) IsKeyProcessed ¶
func (s *CheckpointManager) IsKeyProcessed(end kv.Key) bool
IsKeyProcessed checks if the key is processed. The key may not be imported. This is called before the reader reads the data and decides whether to skip the current task.
func (*CheckpointManager) NextKeyToProcess ¶
func (s *CheckpointManager) NextKeyToProcess() kv.Key
NextKeyToProcess finds the next unprocessed key in checkpoint. If there is no such key, it returns nil.
func (*CheckpointManager) Register ¶
func (s *CheckpointManager) Register(taskID int, end kv.Key)
Register registers a new task. taskID MUST be continuous ascending and start from 0.
TODO(lance6716): remove this constraint, use endKey as taskID and use ordered map type for checkpoints.
func (*CheckpointManager) Status ¶
func (s *CheckpointManager) Status() (keyCnt int, minKeyImported kv.Key)
Status returns the status of the checkpoint.
func (*CheckpointManager) UpdateTotalKeys ¶
func (s *CheckpointManager) UpdateTotalKeys(taskID int, delta int, last bool)
UpdateTotalKeys updates the total keys of the task. This is called by the reader after reading the data to update the number of rows contained in the current chunk.
func (*CheckpointManager) UpdateWrittenKeys ¶
func (s *CheckpointManager) UpdateWrittenKeys(taskID int, delta int)
UpdateWrittenKeys updates the written keys of the task. This is called by the writer after writing the local engine to update the current number of rows written.
type DiskRoot ¶
type DiskRoot interface { UpdateUsage() ShouldImport() bool UsageInfo() string PreCheckUsage() error StartupCheck() error }
DiskRoot is used to track the disk usage for the lightning backfill process.
func NewDiskRootImpl ¶
NewDiskRootImpl creates a new DiskRoot.
type Engine ¶
type Engine interface { Flush() error Close(cleanup bool) CreateWriter(id int, writerCfg *backend.LocalWriterConfig) (Writer, error) }
Engine is the interface for the engine that can be used to write key-value pairs.
type FlushController ¶
type FlushController interface { // Flush checks if al engines need to be flushed and imported based on given // FlushMode. It's concurrent safe. Flush(ctx context.Context, mode FlushMode) (flushed, imported bool, err error) }
FlushController is an interface to control the flush of data so after it returns caller can save checkpoint.
type JobReorgMeta ¶
type JobReorgMeta struct {
Checkpoint *ReorgCheckpoint `json:"reorg_checkpoint"`
}
JobReorgMeta is the metadata for a reorg job.
type MemRoot ¶
type MemRoot interface { Consume(size int64) Release(size int64) CheckConsume(size int64) bool // ConsumeWithTag consumes memory with a tag. The main difference between // ConsumeWithTag and Consume is that if the memory is updated afterward, caller // can use ReleaseWithTag then ConsumeWithTag to update the memory usage. ConsumeWithTag(tag string, size int64) ReleaseWithTag(tag string) SetMaxMemoryQuota(quota int64) MaxMemoryQuota() int64 CurrentUsage() int64 CurrentUsageWithTag(tag string) int64 RefreshConsumption() }
MemRoot is used to track the memory usage for the lightning backfill process. TODO(lance6716): change API to prevent TOCTOU.
type MockBackendCtx ¶
type MockBackendCtx struct {
// contains filtered or unexported fields
}
MockBackendCtx is a mock backend context.
func (*MockBackendCtx) AttachCheckpointManager ¶
func (m *MockBackendCtx) AttachCheckpointManager(mgr *CheckpointManager)
AttachCheckpointManager attaches a checkpoint manager to the backend context.
func (*MockBackendCtx) CollectRemoteDuplicateRows ¶
func (*MockBackendCtx) CollectRemoteDuplicateRows(indexID int64, _ table.Table) error
CollectRemoteDuplicateRows implements BackendCtx.CollectRemoteDuplicateRows interface.
func (*MockBackendCtx) FinishAndUnregisterEngines ¶
func (*MockBackendCtx) FinishAndUnregisterEngines(_ UnregisterOpt) error
FinishAndUnregisterEngines implements BackendCtx interface.
func (*MockBackendCtx) GetCheckpointManager ¶
func (m *MockBackendCtx) GetCheckpointManager() *CheckpointManager
GetCheckpointManager returns the checkpoint manager attached to the backend context.
func (*MockBackendCtx) GetLocalBackend ¶
func (m *MockBackendCtx) GetLocalBackend() *local.Backend
GetLocalBackend returns the local backend.
type MockBackendCtxMgr ¶
type MockBackendCtxMgr struct {
// contains filtered or unexported fields
}
MockBackendCtxMgr is a mock backend context manager.
func NewMockBackendCtxMgr ¶
func NewMockBackendCtxMgr(sessCtxProvider func() sessionctx.Context) *MockBackendCtxMgr
NewMockBackendCtxMgr creates a new mock backend context manager.
func (*MockBackendCtxMgr) CheckMoreTasksAvailable ¶
func (m *MockBackendCtxMgr) CheckMoreTasksAvailable() (bool, error)
CheckMoreTasksAvailable implements BackendCtxMgr.CheckMoreTaskAvailable interface.
func (*MockBackendCtxMgr) EncodeJobSortPath ¶
func (m *MockBackendCtxMgr) EncodeJobSortPath(int64) string
EncodeJobSortPath implements BackendCtxMgr interface.
func (*MockBackendCtxMgr) Load ¶
func (m *MockBackendCtxMgr) Load(jobID int64) (BackendCtx, bool)
Load implements BackendCtxMgr.Load interface.
func (*MockBackendCtxMgr) Register ¶
func (m *MockBackendCtxMgr) Register(ctx context.Context, jobID int64, unique bool, etcdClient *clientv3.Client, pdSvcDiscovery sd.ServiceDiscovery, resourceGroupName string, importConc int, maxWriteSpeed int, initTS uint64) (BackendCtx, error)
Register implements BackendCtxMgr.Register interface.
func (*MockBackendCtxMgr) ResetSessCtx ¶
func (m *MockBackendCtxMgr) ResetSessCtx()
ResetSessCtx is only used for mocking test.
func (*MockBackendCtxMgr) Unregister ¶
func (m *MockBackendCtxMgr) Unregister(jobID int64)
Unregister implements BackendCtxMgr.Unregister interface.
type MockEngineInfo ¶
type MockEngineInfo struct {
// contains filtered or unexported fields
}
MockEngineInfo is a mock engine info.
func NewMockEngineInfo ¶
func NewMockEngineInfo(sessCtx sessionctx.Context) *MockEngineInfo
NewMockEngineInfo creates a new mock engine info.
func (*MockEngineInfo) Close ¶
func (*MockEngineInfo) Close(_ bool)
Close implements Engine.Close interface.
func (*MockEngineInfo) CreateWriter ¶
func (m *MockEngineInfo) CreateWriter(id int, _ *backend.LocalWriterConfig) (Writer, error)
CreateWriter implements Engine.CreateWriter interface.
func (*MockEngineInfo) Flush ¶
func (*MockEngineInfo) Flush() error
Flush implements Engine.Flush interface.
func (*MockEngineInfo) SetHook ¶
func (m *MockEngineInfo) SetHook(onWrite func(key, val []byte))
SetHook set the write hook.
type MockWriteHook ¶
type MockWriteHook func(key, val []byte)
MockWriteHook the hook for write in mock engine.
type MockWriter ¶
type MockWriter struct {
// contains filtered or unexported fields
}
MockWriter is a mock writer.
func (*MockWriter) LockForWrite ¶
func (*MockWriter) LockForWrite() func()
LockForWrite implements Writer.LockForWrite interface.
type ReorgCheckpoint ¶
type ReorgCheckpoint struct { LocalSyncKey kv.Key `json:"local_sync_key"` LocalKeyCount int `json:"local_key_count"` GlobalSyncKey kv.Key `json:"global_sync_key"` GlobalKeyCount int `json:"global_key_count"` InstanceAddr string `json:"instance_addr"` PhysicalID int64 `json:"physical_id"` // TS of next engine ingest. TS uint64 `json:"ts"` Version int64 `json:"version"` }
ReorgCheckpoint is the checkpoint for a reorg job.
type UnregisterOpt ¶
type UnregisterOpt int
UnregisterOpt controls the behavior of backend context unregistering.
const ( // OptCloseEngines only closes engines, it does not clean up sort path data. OptCloseEngines UnregisterOpt = 1 << iota // OptCleanData cleans up local sort dir data. OptCleanData // OptCheckDup checks if there is duplicate entry for unique indexes. OptCheckDup )
type Writer ¶
type Writer interface { // WriteRow writes one row into downstream. // To enable uniqueness check, the handle should be non-empty. WriteRow(ctx context.Context, idxKey, idxVal []byte, handle tidbkv.Handle) error LockForWrite() (unlock func()) }
Writer is the interface for the writer that can be used to write key-value pairs.