Documentation ¶
Index ¶
- Constants
- Variables
- func CancelJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64) (err error)
- func CreateJob(ctx context.Context, conn sqlexec.SQLExecutor, db, table string, tableID int64, ...) (int64, error)
- func FailJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, errorMsg string) error
- func FinishJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, ...) error
- func GetActiveJobCnt(ctx context.Context, conn sqlexec.SQLExecutor) (int64, error)
- func GetCachedKVStoreFrom(pdAddr string, tls *common.TLS) (tidbkv.Storage, error)
- func GetMsgFromBRError(err error) string
- func GetRegionSplitSizeKeys(ctx context.Context) (regionSplitSize int64, regionSplitKeys int64, err error)
- func GetTiKVModeSwitcherWithPDClient(ctx context.Context, logger *zap.Logger) (pd.Client, local.TiKVModeSwitcher, error)
- func Job2Step(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, step string) error
- func ProcessChunk(ctx context.Context, chunk *checkpoints.ChunkCheckpoint, ...) error
- func ProcessChunkWith(ctx context.Context, chunk *checkpoints.ChunkCheckpoint, ...) error
- func StartJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, step string) error
- type ASTArgs
- type ChunkProcessor
- type FieldMapping
- type ImportParameters
- type IndexRouteWriter
- type JobImportParam
- type JobImportResult
- type JobImporter
- type JobInfo
- type JobSummary
- type KVEncoder
- type LoadDataController
- func (e *LoadDataController) CheckRequirements(ctx context.Context, conn sqlexec.SQLExecutor) error
- func (e *LoadDataController) CreateColAssignExprs(sctx sessionctx.Context) ([]expression.Expression, []stmtctx.SQLWarn, error)
- func (e *LoadDataController) GenerateCSVConfig() *config.CSVConfig
- func (e *LoadDataController) GetFieldCount() int
- func (e *LoadDataController) GetLoadDataReaderInfos() []LoadDataReaderInfo
- func (e *LoadDataController) GetParser(ctx context.Context, dataFileInfo LoadDataReaderInfo) (parser mydump.Parser, err error)
- func (e *LoadDataController) HandleSkipNRows(parser mydump.Parser) error
- func (e *LoadDataController) InitDataFiles(ctx context.Context) error
- func (e *LoadDataController) InitDataStore(ctx context.Context) error
- func (e *LoadDataController) IsGlobalSort() bool
- func (e *LoadDataController) IsLocalSort() bool
- func (e *LoadDataController) PopulateChunks(ctx context.Context) (ecp map[int32]*checkpoints.EngineCheckpoint, err error)
- func (e *LoadDataController) SetExecuteNodeCnt(cnt int)
- type LoadDataReaderInfo
- type Plan
- type TableImporter
- func (ti *TableImporter) Allocators() autoid.Allocators
- func (ti *TableImporter) Backend() *local.Backend
- func (ti *TableImporter) CheckDiskQuota(ctx context.Context)
- func (ti *TableImporter) Close() error
- func (ti *TableImporter) ImportAndCleanup(ctx context.Context, closedEngine *backend.ClosedEngine) (int64, error)
- func (ti *TableImporter) OpenDataEngine(ctx context.Context, engineID int32) (*backend.OpenedEngine, error)
- func (ti *TableImporter) OpenIndexEngine(ctx context.Context, engineID int32) (*backend.OpenedEngine, error)
Constants ¶
const (
	// DataFormatCSV represents the data source file of IMPORT INTO is csv.
	DataFormatCSV = "csv"
	// DataFormatDelimitedData delimited data.
	DataFormatDelimitedData = "delimited data"
	// DataFormatSQL represents the data source file of IMPORT INTO is mydumper-format DML file.
	DataFormatSQL = "sql"
	// DataFormatParquet represents the data source file of IMPORT INTO is parquet.
	DataFormatParquet = "parquet"

	// DefaultDiskQuota is the default disk quota for IMPORT INTO
	DefaultDiskQuota = config.ByteSize(50 << 30) // 50GiB
)
const (
	// JobStatusRunning exported since it's used in show import jobs
	JobStatusRunning = "running"
	// JobStepGlobalSorting is the first step when using global sort,
	// step goes from none -> global-sorting -> importing -> validating -> none.
	JobStepGlobalSorting = "global-sorting"
	// JobStepImporting is the first step when using local sort,
	// step goes from none -> importing -> validating -> none.
	// when used in global sort, it means importing the sorted data.
	// when used in local sort, it means encode&sort data and then importing the data.
	JobStepImporting = "importing"

	JobStepValidating = "validating"
)
Constants for job status and step.
Variables ¶
var (
	// MinDeliverBytes is the minimum amount of bytes (data + index) to batch
	// before delivering, to reduce the number of messages; 96 KiB.
	MinDeliverBytes uint64 = 96 * units.KiB
	// MinDeliverRowCnt see default for tikv-importer.max-kv-pairs
	MinDeliverRowCnt = 4096
)
Constants made into variables so tests can adjust them.
var (
	// CheckDiskQuotaInterval is the default time interval to check disk quota.
	// TODO: make it dynamically adjusting according to the speed of import and the disk size.
	CheckDiskQuotaInterval = 10 * time.Second
)
var GetEtcdClient = getEtcdClient
GetEtcdClient returns an etcd client. exported for testing.
GetKVStore returns a kv.Storage. kv encoder of physical mode needs it.
var (
	// LoadDataReadBlockSize is exposed for test.
	LoadDataReadBlockSize = int64(config.ReadBlockSize)
)
var NewTiKVModeSwitcher = local.NewTiKVModeSwitcher
NewTiKVModeSwitcher is made a variable so we can mock it in tests.
var (
	// TestLastImportJobID is the last created job ID, used in unit tests.
	TestLastImportJobID atomic.Int64
)
Variables used for tests.
var TestSyncCh = make(chan struct{})
TestSyncCh is used in unit test to synchronize the execution.
Functions ¶
func CancelJob ¶
CancelJob cancels an IMPORT INTO job. Only a running/paused job can be canceled. Check privileges using GetJob before calling this method.
func CreateJob ¶
func CreateJob( ctx context.Context, conn sqlexec.SQLExecutor, db, table string, tableID int64, user string, parameters *ImportParameters, sourceFileSize int64, ) (int64, error)
CreateJob creates an IMPORT INTO job by inserting a record into the system table. The AUTO_INCREMENT value of that record is returned as the jobID.
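A minimal sketch of the job lifecycle, assuming this package is imported as importer, conn is a sqlexec.SQLExecutor from an internal session, and the import itself succeeds; runJobLifecycle and the "test"/"t" identifiers are placeholders, not part of this package.

func runJobLifecycle(ctx context.Context, conn sqlexec.SQLExecutor, tblID, totalSize int64) error {
	// register the job in the system table
	jobID, err := importer.CreateJob(ctx, conn, "test", "t", tblID, "root@%",
		&importer.ImportParameters{Format: importer.DataFormatCSV}, totalSize)
	if err != nil {
		return err
	}
	// move the job to the importing step
	if err := importer.StartJob(ctx, conn, jobID, importer.JobStepImporting); err != nil {
		return err
	}
	// ... encode, sort and ingest the data here ...
	if err := importer.Job2Step(ctx, conn, jobID, importer.JobStepValidating); err != nil {
		return err
	}
	// finish the job and record how many rows were imported (0 here as a placeholder)
	return importer.FinishJob(ctx, conn, jobID, &importer.JobSummary{ImportedRows: 0})
}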
func FailJob ¶
FailJob fails an IMPORT INTO job. A job can only be failed once. It does not return an error when there's no matched job.
func FinishJob ¶
func FinishJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, summary *JobSummary) error
FinishJob tries to finish a running job with jobID, changing its status to finished and clearing its step. It does not return an error when there's no matched job.
func GetActiveJobCnt ¶
GetActiveJobCnt returns the count of active import jobs. Active import jobs include pending and running jobs.
func GetCachedKVStoreFrom ¶
GetCachedKVStoreFrom gets a cached kv store from PD address. Callers should NOT close the kv store.
func GetMsgFromBRError ¶
GetMsgFromBRError gets the message from a BR error. TODO: add GetMsg() to the errors package to replace this function. See TestGetMsgFromBRError for more details.
func GetRegionSplitSizeKeys ¶
func GetRegionSplitSizeKeys(ctx context.Context) (regionSplitSize int64, regionSplitKeys int64, err error)
GetRegionSplitSizeKeys gets the region split size and keys from PD.
func GetTiKVModeSwitcherWithPDClient ¶
func GetTiKVModeSwitcherWithPDClient(ctx context.Context, logger *zap.Logger) (pd.Client, local.TiKVModeSwitcher, error)
GetTiKVModeSwitcherWithPDClient creates a new TiKV mode switcher with its pd Client.
func Job2Step ¶
Job2Step tries to change the step of a running job with jobID. It will not return error when there's no matched job.
func ProcessChunk ¶
func ProcessChunk( ctx context.Context, chunk *checkpoints.ChunkCheckpoint, tableImporter *TableImporter, dataEngine, indexEngine *backend.OpenedEngine, progress *asyncloaddata.Progress, logger *zap.Logger, ) error
ProcessChunk processes a chunk, and writes the KV pairs to dataEngine and indexEngine.
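A hedged sketch of the local-sort flow around ProcessChunk, assuming ti is an initialized *TableImporter; the engine IDs are illustrative, and the EngineCheckpoint.Chunks layout is assumed from the lightning checkpoints package.

func importLocalSort(ctx context.Context, ti *importer.TableImporter,
	progress *asyncloaddata.Progress, logger *zap.Logger) error {
	dataEngine, err := ti.OpenDataEngine(ctx, 1)
	if err != nil {
		return err
	}
	indexEngine, err := ti.OpenIndexEngine(ctx, -1) // illustrative index engine ID
	if err != nil {
		return err
	}
	// split the data files into chunks, grouped by data engine
	ecp, err := ti.PopulateChunks(ctx)
	if err != nil {
		return err
	}
	for _, engineCp := range ecp {
		for _, chunk := range engineCp.Chunks {
			if err := importer.ProcessChunk(ctx, chunk, ti, dataEngine, indexEngine, progress, logger); err != nil {
				return err
			}
		}
	}
	return nil
}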
func ProcessChunkWith ¶
func ProcessChunkWith( ctx context.Context, chunk *checkpoints.ChunkCheckpoint, tableImporter *TableImporter, dataWriter, indexWriter backend.EngineWriter, progress *asyncloaddata.Progress, logger *zap.Logger, ) error
ProcessChunkWith processes a chunk, and writes the KV pairs to dataWriter and indexWriter.
Types ¶
type ASTArgs ¶
type ASTArgs struct { FileLocRef ast.FileLocRefTp ColumnsAndUserVars []*ast.ColumnNameOrUserVar ColumnAssignments []*ast.Assignment OnDuplicate ast.OnDuplicateKeyHandlingType FieldsInfo *ast.FieldsClause LinesInfo *ast.LinesClause }
ASTArgs holds the arguments for ast.LoadDataStmt. TODO: remove this struct and use a struct that can be serialized.
func ASTArgsFromImportPlan ¶
func ASTArgsFromImportPlan(plan *plannercore.ImportInto) *ASTArgs
ASTArgsFromImportPlan creates ASTArgs from plan.
func ASTArgsFromPlan ¶
func ASTArgsFromPlan(plan *plannercore.LoadData) *ASTArgs
ASTArgsFromPlan creates ASTArgs from plan.
func ASTArgsFromStmt ¶
ASTArgsFromStmt creates ASTArgs from statement.
type ChunkProcessor ¶
ChunkProcessor is used to process a chunk of data, including encoding the data to KV pairs and delivering them to local or global storage.
func NewLocalSortChunkProcessor ¶
func NewLocalSortChunkProcessor( parser mydump.Parser, encoder KVEncoder, kvCodec tikv.Codec, chunk *checkpoints.ChunkCheckpoint, logger *zap.Logger, diskQuotaLock *syncutil.RWMutex, dataWriter backend.EngineWriter, indexWriter backend.EngineWriter, ) ChunkProcessor
NewLocalSortChunkProcessor creates a new local sort chunk processor. exported for test.
type FieldMapping ¶
type FieldMapping struct { Column *table.Column UserVar *ast.VariableExpr }
FieldMapping indicates the relationship between an input field and a table column or user variable.
type ImportParameters ¶
type ImportParameters struct {
	ColumnsAndVars string `json:"columns-and-vars,omitempty"`
	SetClause      string `json:"set-clause,omitempty"`
	// for s3 URL, AK/SK is redacted for security
	FileLocation string `json:"file-location"`
	Format       string `json:"format"`
	// only include what user specified, not include default value.
	Options map[string]interface{} `json:"options,omitempty"`
}
ImportParameters holds the parameters of an IMPORT INTO statement. It's the minimal meta info stored in tidb_import_jobs for diagnosis; for detailed info, see tidb_global_tasks.
func (*ImportParameters) String ¶
func (ip *ImportParameters) String() string
String implements fmt.Stringer interface.
type IndexRouteWriter ¶
type IndexRouteWriter struct {
// contains filtered or unexported fields
}
IndexRouteWriter is a writer for indexes when using global sort. We route KVs of different indexes to different writers to make merge sort easier; otherwise the KV data of all subtasks would be overlapped.
The drawback is that the number of writers to open becomes index-count * encode-concurrency. When the table has many indexes, and each writer takes a 256MiB buffer by default, this can consume a lot of memory, or even OOM.
func NewIndexRouteWriter ¶
func NewIndexRouteWriter(logger *zap.Logger, writerFactory func(int64) *external.Writer) *IndexRouteWriter
NewIndexRouteWriter creates a new IndexRouteWriter.
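A minimal sketch of routing index KVs to per-index writers on the global-sort store; buildExternalWriter is a hypothetical helper that builds an external.Writer targeting a per-index prefix, and globalSortStore/logger are assumed to be prepared by the caller.

indexWriter := importer.NewIndexRouteWriter(logger, func(indexID int64) *external.Writer {
	// buildExternalWriter is hypothetical: it creates an external.Writer whose
	// output prefix encodes the index ID, so files of different indexes never mix.
	return buildExternalWriter(globalSortStore, indexID)
})
// indexWriter implements backend.EngineWriter and can be passed to ProcessChunkWith.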
func (*IndexRouteWriter) AppendRows ¶
AppendRows implements backend.EngineWriter interface.
func (*IndexRouteWriter) Close ¶
func (w *IndexRouteWriter) Close(ctx context.Context) (backend.ChunkFlushStatus, error)
Close implements backend.EngineWriter interface.
func (*IndexRouteWriter) IsSynced ¶
func (*IndexRouteWriter) IsSynced() bool
IsSynced implements backend.EngineWriter interface.
type JobImportParam ¶
type JobImportParam struct {
	Job      *asyncloaddata.Job
	Group    *errgroup.Group
	GroupCtx context.Context
	// should be closed in the end of the job.
	Done     chan struct{}
	Progress *asyncloaddata.Progress
}
JobImportParam is the param of the job import.
type JobImportResult ¶
type JobImportResult struct { Affected uint64 Warnings []stmtctx.SQLWarn ColSizeMap map[int64]int64 }
JobImportResult is the result of the job import.
type JobImporter ¶
type JobImporter interface {
	// Param returns the param of the job import.
	Param() *JobImportParam
	// Import imports the job.
	// import should run in routines using param.Group, when import finished, it should close param.Done.
	// during import, we should use param.GroupCtx, so this method has no context param.
	Import()
	// Result returns the result of the job import.
	Result() JobImportResult
	io.Closer
}
JobImporter is the interface for importing a job.
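A hedged sketch of honoring the Import contract above; myImporter and its doImport method are hypothetical.

type myImporter struct {
	param *importer.JobImportParam
}

func (m *myImporter) Param() *importer.JobImportParam { return m.param }

// Import runs the work on param.Group, uses param.GroupCtx for cancellation,
// and closes param.Done once the import has finished.
func (m *myImporter) Import() {
	m.param.Group.Go(func() error {
		defer close(m.param.Done)
		return m.doImport(m.param.GroupCtx)
	})
}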
type JobInfo ¶
type JobInfo struct {
	ID             int64
	CreateTime     types.Time
	StartTime      types.Time
	EndTime        types.Time
	TableSchema    string
	TableName      string
	TableID        int64
	CreatedBy      string
	Parameters     ImportParameters
	SourceFileSize int64
	Status         string
	// in SHOW IMPORT JOB, we name it as phase.
	// here, we use the same name as in distributed framework.
	Step string
	// the summary info of the job, it's updated only when the job is finished.
	// for running job, we should query the progress from the distributed framework.
	Summary      *JobSummary
	ErrorMessage string
}
JobInfo is the information of import into job.
func GetAllViewableJobs ¶
func GetAllViewableJobs(ctx context.Context, conn sqlexec.SQLExecutor, user string, hasSuperPriv bool) ([]*JobInfo, error)
GetAllViewableJobs gets all viewable jobs.
func GetJob ¶
func GetJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, user string, hasSuperPriv bool) (*JobInfo, error)
GetJob returns the job with the given ID if the user has the privilege. hasSuperPriv indicates whether the user has the SUPER privilege: with it the user can show or operate all jobs, otherwise only their own jobs.
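A small sketch of this privilege convention, assuming hasSuperPriv is derived from the session's privilege manager and "alice@%" stands for the current user.

// Users with SUPER see every job; other users only see jobs they created.
jobs, err := importer.GetAllViewableJobs(ctx, conn, "alice@%", hasSuperPriv)
if err != nil {
	return err
}
for _, job := range jobs {
	fmt.Println(job.ID, job.Status, job.Step)
}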
type JobSummary ¶
type JobSummary struct {
	// ImportedRows is the number of rows imported into TiKV.
	ImportedRows uint64 `json:"imported-rows,omitempty"`
}
JobSummary is the summary info of import into job.
type KVEncoder ¶
type KVEncoder interface {
	Encode(row []types.Datum, rowID int64) (*kv.Pairs, error)
	// GetColumnSize returns the size of each column in the current encoder.
	GetColumnSize() map[int64]int64
	io.Closer
}
KVEncoder encodes a row of data into KV pairs.
func NewTableKVEncoder ¶
func NewTableKVEncoder( config *encode.EncodingConfig, ti *TableImporter, ) (KVEncoder, error)
NewTableKVEncoder creates a new tableKVEncoder. exported for test.
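A hedged sketch of driving a KVEncoder, assuming cfg, ti, row and rowID are prepared by the caller (e.g. row comes from the parser's last row).

enc, err := importer.NewTableKVEncoder(cfg, ti)
if err != nil {
	return err
}
defer enc.Close()
// encode one parsed row into KV pairs for the data and index engines
pairs, err := enc.Encode(row, rowID)
if err != nil {
	return err
}
_ = pairs // deliver to the data/index writers
// GetColumnSize reports the accumulated per-column sizes seen by this encoder.
colSizes := enc.GetColumnSize()
_ = colSizes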
type LoadDataController ¶
type LoadDataController struct {
	*Plan
	*ASTArgs

	Table table.Table

	// how input field(or input column) from data file is mapped, either to a column or variable.
	// if there's NO column list clause in SQL statement, then it's table's columns
	// else it's user defined list.
	FieldMappings []*FieldMapping
	// see InsertValues.InsertColumns
	// Note: our behavior is different with mysql. such as for table t(a,b)
	// - "...(a,a) set a=100" is allowed in mysql, but not in tidb
	// - "...(a,b) set b=100" will set b=100 in mysql, but in tidb the set is ignored.
	// - ref columns in set clause is allowed in mysql, but not in tidb
	InsertColumns []*table.Column

	// GlobalSortStore is used to store sorted data when using global sort.
	GlobalSortStore storage.ExternalStorage
	// ExecuteNodesCnt is the count of execute nodes.
	ExecuteNodesCnt int
	// contains filtered or unexported fields
}
LoadDataController is the load data controller. TODO: needs a better name.
func NewLoadDataController ¶
func NewLoadDataController(plan *Plan, tbl table.Table, astArgs *ASTArgs) (*LoadDataController, error)
NewLoadDataController creates a new controller.
func (*LoadDataController) CheckRequirements ¶
func (e *LoadDataController) CheckRequirements(ctx context.Context, conn sqlexec.SQLExecutor) error
CheckRequirements checks the requirements for IMPORT INTO. we check the following things here:
- target table should be empty
- no CDC or PiTR tasks running
TODO: check if there are running lightning tasks? We check the requirements one by one and return the first error we meet.
func (*LoadDataController) CreateColAssignExprs ¶
func (e *LoadDataController) CreateColAssignExprs(sctx sessionctx.Context) ([]expression.Expression, []stmtctx.SQLWarn, error)
CreateColAssignExprs creates the column assignment expressions using the session context. RewriteAstExpr will write the AST node in place (due to xxNode.Accept), but it doesn't change the node content, so we sync it.
func (*LoadDataController) GenerateCSVConfig ¶
func (e *LoadDataController) GenerateCSVConfig() *config.CSVConfig
GenerateCSVConfig generates a CSV config for parser from LoadDataWorker.
func (*LoadDataController) GetFieldCount ¶
func (e *LoadDataController) GetFieldCount() int
GetFieldCount gets the field count.
func (*LoadDataController) GetLoadDataReaderInfos ¶
func (e *LoadDataController) GetLoadDataReaderInfos() []LoadDataReaderInfo
GetLoadDataReaderInfos returns the LoadDataReaderInfo for each data file.
func (*LoadDataController) GetParser ¶
func (e *LoadDataController) GetParser( ctx context.Context, dataFileInfo LoadDataReaderInfo, ) (parser mydump.Parser, err error)
GetParser returns a parser for the data file.
func (*LoadDataController) HandleSkipNRows ¶
func (e *LoadDataController) HandleSkipNRows(parser mydump.Parser) error
HandleSkipNRows skips the first N rows of the data file.
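A hedged sketch of reading source rows through the controller; the parser methods (ReadRow, LastRow, Close) are assumed from the lightning mydump package, and the io.EOF handling is simplified.

func readAllRows(ctx context.Context, e *importer.LoadDataController) error {
	// InitDataFiles also initializes the data store.
	if err := e.InitDataFiles(ctx); err != nil {
		return err
	}
	for _, info := range e.GetLoadDataReaderInfos() {
		p, err := e.GetParser(ctx, info)
		if err != nil {
			return err
		}
		// skip the leading rows requested by the statement
		if err := e.HandleSkipNRows(p); err != nil {
			return err
		}
		for {
			if err := p.ReadRow(); err != nil {
				break // real code must distinguish io.EOF from other errors
			}
			_ = p.LastRow() // hand the row to a KVEncoder
		}
		if err := p.Close(); err != nil {
			return err
		}
	}
	return nil
}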
func (*LoadDataController) InitDataFiles ¶
func (e *LoadDataController) InitDataFiles(ctx context.Context) error
InitDataFiles initializes the data store and files. it will call InitDataStore internally.
func (*LoadDataController) InitDataStore ¶
func (e *LoadDataController) InitDataStore(ctx context.Context) error
InitDataStore initializes the data store.
func (*LoadDataController) IsGlobalSort ¶
func (e *LoadDataController) IsGlobalSort() bool
IsGlobalSort returns true if we sort data on global storage.
func (*LoadDataController) IsLocalSort ¶
func (e *LoadDataController) IsLocalSort() bool
IsLocalSort returns true if we sort data on local disk.
func (*LoadDataController) PopulateChunks ¶
func (e *LoadDataController) PopulateChunks(ctx context.Context) (ecp map[int32]*checkpoints.EngineCheckpoint, err error)
PopulateChunks populates chunks from table regions. In the dist framework, this should be done on the TiDB node that is responsible for splitting the job into subtasks; the table importer then handles the data belonging to each subtask.
func (*LoadDataController) SetExecuteNodeCnt ¶
func (e *LoadDataController) SetExecuteNodeCnt(cnt int)
SetExecuteNodeCnt sets the execute node count.
type LoadDataReaderInfo ¶
type LoadDataReaderInfo struct {
	// Opener can be called as needed to get an io.ReadSeekCloser. It will only
	// be called once.
	Opener func(ctx context.Context) (io.ReadSeekCloser, error)
	// Remote is not nil only if load from cloud storage.
	Remote *mydump.SourceFileMeta
}
LoadDataReaderInfo provides information for a data reader of LOAD DATA.
type Plan ¶
type Plan struct {
	DBName string
	DBID   int64
	// TableInfo is the table info we used during import, we might change it
	// if add index by SQL is enabled(it's disabled now).
	TableInfo *model.TableInfo
	// DesiredTableInfo is the table info before import, and the desired table info
	// after import.
	DesiredTableInfo *model.TableInfo

	Path   string
	Format string
	// Data interpretation is restrictive if the SQL mode is restrictive and neither
	// the IGNORE nor the LOCAL modifier is specified. Errors terminate the load
	// operation.
	// ref https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-column-assignments
	Restrictive bool

	SQLMode mysql.SQLMode
	// Charset is the charset of the data file when file is CSV or TSV.
	// it might be nil when using LOAD DATA and no charset is specified.
	// for IMPORT INTO, it is always non-nil.
	Charset          *string
	ImportantSysVars map[string]string

	// used for LOAD DATA and CSV format of IMPORT INTO
	FieldNullDef []string
	// this is not used in IMPORT INTO
	NullValueOptEnclosed bool
	// LinesStartingBy is not used in IMPORT INTO
	// FieldsOptEnclosed is not used in either IMPORT INTO or LOAD DATA
	plannercore.LineFieldsInfo
	IgnoreLines uint64

	DiskQuota             config.ByteSize
	Checksum              config.PostOpLevel
	ThreadCnt             int64
	MaxWriteSpeed         config.ByteSize
	SplitFile             bool
	MaxRecordedErrors     int64
	Detached              bool
	DisableTiKVImportMode bool
	MaxEngineSize         config.ByteSize
	CloudStorageURI       string

	// used for checksum in physical mode
	DistSQLScanConcurrency int

	// todo: remove it when load data code is reverted.
	InImportInto bool
	// only initialized for IMPORT INTO, used when creating job.
	Parameters *ImportParameters `json:"-"`
	// the user who executes the statement, in the form of user@host
	// only initialized for IMPORT INTO
	User string `json:"-"`

	IsRaftKV2 bool
	// total data file size in bytes.
	TotalFileSize int64
}
Plan describes the plan of LOAD DATA and IMPORT INTO.
func NewImportPlan ¶
func NewImportPlan(userSctx sessionctx.Context, plan *plannercore.ImportInto, tbl table.Table) (*Plan, error)
NewImportPlan creates a new import into plan.
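A hedged sketch of wiring up a controller for IMPORT INTO from the planner node; userSctx, importInto and tbl are assumed to be provided by the executor.

plan, err := importer.NewImportPlan(userSctx, importInto, tbl)
if err != nil {
	return err
}
astArgs := importer.ASTArgsFromImportPlan(importInto)
controller, err := importer.NewLoadDataController(plan, tbl, astArgs)
if err != nil {
	return err
}
// resolve the data store and the list of data files before planning chunks
if err := controller.InitDataFiles(ctx); err != nil {
	return err
}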
func NewPlanFromLoadDataPlan ¶
func NewPlanFromLoadDataPlan(userSctx sessionctx.Context, plan *plannercore.LoadData) (*Plan, error)
NewPlanFromLoadDataPlan creates an import plan from LOAD DATA.
func (*Plan) InitTiKVConfigs ¶
InitTiKVConfigs initializes some TiKV related configs.
type TableImporter ¶
type TableImporter struct {
	*JobImportParam
	*LoadDataController
	// contains filtered or unexported fields
}
TableImporter is a table importer.
func NewTableImporter ¶
func NewTableImporter(param *JobImportParam, e *LoadDataController, taskID int64) (ti *TableImporter, err error)
NewTableImporter creates a new table importer.
func (*TableImporter) Allocators ¶
func (ti *TableImporter) Allocators() autoid.Allocators
Allocators returns allocators used to record max used ID, i.e. PanickingAllocators.
func (*TableImporter) Backend ¶
func (ti *TableImporter) Backend() *local.Backend
Backend returns the backend of the importer.
func (*TableImporter) CheckDiskQuota ¶
func (ti *TableImporter) CheckDiskQuota(ctx context.Context)
CheckDiskQuota checks disk quota.
func (*TableImporter) Close ¶
func (ti *TableImporter) Close() error
Close implements the io.Closer interface.
func (*TableImporter) ImportAndCleanup ¶
func (ti *TableImporter) ImportAndCleanup(ctx context.Context, closedEngine *backend.ClosedEngine) (int64, error)
ImportAndCleanup imports the engine and cleans up the engine data.
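A hedged sketch of finishing a data engine once all of its chunks are processed; the OpenedEngine.Close call is assumed from the lightning backend package, and treating the returned int64 as a KV count is an assumption.

// seal the engine so its data can be ingested into TiKV
closedEngine, err := dataEngine.Close(ctx)
if err != nil {
	return err
}
// ingest the engine data into the cluster, then clean up its local files
kvCount, err := ti.ImportAndCleanup(ctx, closedEngine)
if err != nil {
	return err
}
_ = kvCount // assumed to be the number of KV pairs ingested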
func (*TableImporter) OpenDataEngine ¶
func (ti *TableImporter) OpenDataEngine(ctx context.Context, engineID int32) (*backend.OpenedEngine, error)
OpenDataEngine opens a data engine.
func (*TableImporter) OpenIndexEngine ¶
func (ti *TableImporter) OpenIndexEngine(ctx context.Context, engineID int32) (*backend.OpenedEngine, error)
OpenIndexEngine opens an index engine.