Documentation ¶
Index ¶
- Constants
- Variables
- func GetMsgFromBRError(err error) string
- func ProcessChunk(ctx context.Context, chunk *checkpoints.ChunkCheckpoint, ...) (err error)
- type FieldMapping
- type JobImportParam
- type JobImporter
- type LoadDataController
  - func NewLoadDataController(userCtx sessionctx.Context, plan *Plan, tbl table.Table) (*LoadDataController, error)
  - func (e *LoadDataController) CheckRequirements(ctx context.Context, conn sqlexec.SQLExecutor) error
  - func (e *LoadDataController) GenerateCSVConfig() *config.CSVConfig
  - func (e *LoadDataController) GetFieldCount() int
  - func (e *LoadDataController) GetLoadDataReaderInfos() []LoadDataReaderInfo
  - func (e *LoadDataController) GetParser(ctx context.Context, dataFileInfo LoadDataReaderInfo) (parser mydump.Parser, err error)
  - func (e *LoadDataController) InitDataFiles(ctx context.Context) error
- type LoadDataReaderInfo
- type Plan
  - func NewPlan(userSctx sessionctx.Context, plan *plannercore.LoadData, tbl table.Table) (*Plan, error)
- type TableImporter
  - func NewTableImporter(param *JobImportParam, e *LoadDataController) (ti *TableImporter, err error)
  - func (ti *TableImporter) Close() error
  - func (ti *TableImporter) Import()
  - func (ti *TableImporter) ImportAndCleanup(ctx context.Context, closedEngine *backend.ClosedEngine) error
  - func (ti *TableImporter) OpenDataEngine(ctx context.Context, engineID int32) (*backend.OpenedEngine, error)
  - func (ti *TableImporter) OpenIndexEngine(ctx context.Context) (*backend.OpenedEngine, error)
  - func (ti *TableImporter) Param() *JobImportParam
  - func (ti *TableImporter) PopulateChunks(ctx context.Context) (map[int32]*checkpoints.EngineCheckpoint, error)
  - func (ti *TableImporter) Result() string
Constants ¶
    const (
        // LoadDataFormatDelimitedData delimited data.
        LoadDataFormatDelimitedData = "delimited data"
        // LoadDataFormatSQLDump represents the data source file of LOAD DATA is mydumper-format DML file.
        LoadDataFormatSQLDump = "sql file"
        // LoadDataFormatParquet represents the data source file of LOAD DATA is parquet.
        LoadDataFormatParquet = "parquet"

        // LogicalImportMode represents the import mode is SQL-like.
        LogicalImportMode = "logical"
        // PhysicalImportMode represents the import mode is KV-like.
        PhysicalImportMode = "physical"
    )
Variables ¶
    // constants, make it a variable for test
    var (
        MinDeliverBytes uint64 = 96 * units.KiB // 96 KB (data + index). batch at least this amount of bytes to reduce number of messages
        // see default for tikv-importer.max-kv-pairs
        MinDeliverRowCnt = 4096
    )
GetKVStore returns a kv.Storage. The kv encoder of physical mode needs it.
    var (
        // LoadDataReadBlockSize is exposed for test.
        LoadDataReadBlockSize = int64(config.ReadBlockSize)
    )
var TestSyncCh = make(chan struct{})
TestSyncCh is used in unit tests to synchronize the execution of LOAD DATA.
Functions ¶
func GetMsgFromBRError ¶
GetMsgFromBRError gets the message from a BR error. TODO: add GetMsg() to the errors package to replace this function. See TestGetMsgFromBRError for more details.
func ProcessChunk ¶
    func ProcessChunk(
        ctx context.Context,
        chunk *checkpoints.ChunkCheckpoint,
        tableImporter *TableImporter,
        dataEngine, indexEngine *backend.OpenedEngine,
        logger *zap.Logger,
    ) (err error)
ProcessChunk processes a chunk and writes the KV pairs to dataEngine and indexEngine.
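The sketch below shows one plausible way to drive ProcessChunk together with PopulateChunks, OpenDataEngine and OpenIndexEngine. It is not code from this package: the helper name is made up, the checkpoint map is assumed to contain only data engines, and the engine close / ImportAndCleanup steps are left as comments.

    // processAllChunks is an illustrative helper (not part of the package). It
    // feeds every chunk produced by PopulateChunks through ProcessChunk.
    func processAllChunks(ctx context.Context, ti *importer.TableImporter, logger *zap.Logger) error {
        indexEngine, err := ti.OpenIndexEngine(ctx)
        if err != nil {
            return err
        }
        engineCheckpoints, err := ti.PopulateChunks(ctx)
        if err != nil {
            return err
        }
        for engineID, cp := range engineCheckpoints {
            dataEngine, err := ti.OpenDataEngine(ctx, engineID)
            if err != nil {
                return err
            }
            for _, chunk := range cp.Chunks {
                // ProcessChunk encodes the chunk and writes the resulting KV
                // pairs into dataEngine (row data) and indexEngine (index data).
                if err := importer.ProcessChunk(ctx, chunk, ti, dataEngine, indexEngine, logger); err != nil {
                    return err
                }
            }
            // Once all chunks of this engine are written, close dataEngine to
            // obtain a *backend.ClosedEngine and hand it to ti.ImportAndCleanup.
        }
        // Finally, close indexEngine and import/clean it up the same way.
        return nil
    }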
Types ¶
type FieldMapping ¶
    type FieldMapping struct {
        Column  *table.Column
        UserVar *ast.VariableExpr
    }
FieldMapping indicates the relationship between an input field and a table column or user variable.
type JobImportParam ¶
    type JobImportParam struct {
        Job      *asyncloaddata.Job
        Group    *errgroup.Group
        GroupCtx context.Context
        // should be closed in the end of the job.
        Done     chan struct{}
        Progress *asyncloaddata.Progress
    }
JobImportParam holds the parameters of a job import.
type JobImporter ¶
    type JobImporter interface {
        // Param returns the param of the job import.
        Param() *JobImportParam
        // Import imports the job.
        // import should run in routines using param.Group, when import finished, it should close param.Done.
        // during import, we should use param.GroupCtx, so this method has no context param.
        Import()
        // Result returns the result of the job import.
        // todo: return a struct
        Result() string
        io.Closer
    }
JobImporter is the interface for importing a job.
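A minimal sketch of a type satisfying this contract is shown below, mainly to illustrate how Param, Import, Result and Close interact with JobImportParam. It is not an implementation from this package, and the noopImporter name is made up.

    // noopImporter is a made-up JobImporter that does no real work; it only
    // demonstrates the documented contract: Import runs its work via
    // param.Group, uses param.GroupCtx instead of a context parameter, and
    // closes param.Done when it finishes.
    type noopImporter struct {
        param *importer.JobImportParam
    }

    func (i *noopImporter) Param() *importer.JobImportParam { return i.param }

    func (i *noopImporter) Import() {
        i.param.Group.Go(func() error {
            defer close(i.param.Done)
            select {
            case <-i.param.GroupCtx.Done():
                return i.param.GroupCtx.Err()
            default:
                // a real importer would read, encode and write data here
                return nil
            }
        })
    }

    func (i *noopImporter) Result() string { return "imported 0 rows" }

    // Close implements io.Closer, the embedded member of the interface.
    func (i *noopImporter) Close() error { return nil }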
type LoadDataController ¶
    type LoadDataController struct {
        FileLocRef         ast.FileLocRefTp
        Path               string
        Format             string
        ColumnsAndUserVars []*ast.ColumnNameOrUserVar
        ColumnAssignments  []*ast.Assignment
        OnDuplicate        ast.OnDuplicateKeyHandlingType

        Table  table.Table
        DBName string
        DBID   int64

        // how input field(or input column) from data file is mapped, either to a column or variable.
        // if there's NO column list clause in load data statement, then it's table's columns
        // else it's user defined list.
        FieldMappings []*FieldMapping
        // see InsertValues.InsertColumns
        // todo: our behavior is different with mysql. such as for table t(a,b)
        // - "...(a,a) set a=100" is allowed in mysql, but not in tidb
        // - "...(a,b) set b=100" will set b=100 in mysql, but in tidb the set is ignored.
        // - ref columns in set clause is allowed in mysql, but not in tidb
        InsertColumns []*table.Column

        // Data interpretation is restrictive if the SQL mode is restrictive and neither
        // the IGNORE nor the LOCAL modifier is specified. Errors terminate the load
        // operation.
        // ref https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-column-assignments
        Restrictive bool

        // used for DELIMITED DATA format
        FieldNullDef         []string
        NullValueOptEnclosed bool
        plannercore.LineFieldsInfo
        IgnoreLines uint64

        // import options
        ImportMode string
        ThreadCnt  int64
        BatchSize  int64
        Detached   bool

        // total data file size in bytes, only initialized when load from remote.
        TotalFileSize int64

        // user session context. DO NOT use it if load is in DETACHED mode.
        UserCtx sessionctx.Context
        // contains filtered or unexported fields
    }
LoadDataController controls the LOAD DATA process. TODO: need a better name.
func NewLoadDataController ¶
func NewLoadDataController(userCtx sessionctx.Context, plan *Plan, tbl table.Table) (*LoadDataController, error)
NewLoadDataController creates a new controller.
func (*LoadDataController) CheckRequirements ¶
func (e *LoadDataController) CheckRequirements(ctx context.Context, conn sqlexec.SQLExecutor) error
CheckRequirements checks the requirements for load data.
func (*LoadDataController) GenerateCSVConfig ¶
func (e *LoadDataController) GenerateCSVConfig() *config.CSVConfig
GenerateCSVConfig generates a CSV config for the parser from LoadDataWorker.
func (*LoadDataController) GetFieldCount ¶
func (e *LoadDataController) GetFieldCount() int
GetFieldCount gets the field count.
func (*LoadDataController) GetLoadDataReaderInfos ¶
func (e *LoadDataController) GetLoadDataReaderInfos() []LoadDataReaderInfo
GetLoadDataReaderInfos returns the LoadDataReaderInfo for each data file.
func (*LoadDataController) GetParser ¶
    func (e *LoadDataController) GetParser(
        ctx context.Context,
        dataFileInfo LoadDataReaderInfo,
    ) (parser mydump.Parser, err error)
GetParser returns a parser for the data file.
func (*LoadDataController) InitDataFiles ¶
func (e *LoadDataController) InitDataFiles(ctx context.Context) error
InitDataFiles initializes the data store and loads the data files.
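The loop below sketches how InitDataFiles, GetLoadDataReaderInfos and GetParser might be combined to read every row. The helper name is made up, the ReadRow/LastRow/Close calls come from the lightning mydump.Parser interface, and EOF handling is deliberately abbreviated.

    // readAllRows is an illustrative helper (not part of the package) that
    // parses every data file the controller knows about.
    func readAllRows(ctx context.Context, c *importer.LoadDataController) error {
        if err := c.InitDataFiles(ctx); err != nil {
            return err
        }
        for _, info := range c.GetLoadDataReaderInfos() {
            parser, err := c.GetParser(ctx, info)
            if err != nil {
                return err
            }
            for {
                if err := parser.ReadRow(); err != nil {
                    // ReadRow reports io.EOF at the end of the file; real code
                    // should distinguish EOF from genuine errors.
                    break
                }
                row := parser.LastRow()
                _ = row // hand the row to the encoder here
            }
            if err := parser.Close(); err != nil {
                return err
            }
        }
        return nil
    }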
type LoadDataReaderInfo ¶
    type LoadDataReaderInfo struct {
        // Opener can be called when needed to get an io.ReadSeekCloser. It will
        // only be called once.
        Opener func(ctx context.Context) (io.ReadSeekCloser, error)
        // Remote is not nil only if load from cloud storage.
        Remote *mydump.SourceFileMeta
    }
LoadDataReaderInfo provides information for a data reader of LOAD DATA.
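Callers normally obtain these values from GetLoadDataReaderInfos, but a hand-built one for a local file might look like the following sketch (the helper name and path parameter are hypothetical).

    // localReaderInfo builds a LoadDataReaderInfo backed by a local file.
    // Remote stays nil because the data does not come from cloud storage.
    func localReaderInfo(path string) importer.LoadDataReaderInfo {
        return importer.LoadDataReaderInfo{
            Opener: func(ctx context.Context) (io.ReadSeekCloser, error) {
                // *os.File implements io.ReadSeekCloser, so a plain Open is enough.
                return os.Open(path)
            },
        }
    }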
type Plan ¶
    type Plan struct {
        TableName *ast.TableName
        TableInfo *model.TableInfo

        FileLocRef         ast.FileLocRefTp
        Path               string
        Format             string
        ColumnsAndUserVars []*ast.ColumnNameOrUserVar
        ColumnAssignments  []*ast.Assignment
        OnDuplicate        ast.OnDuplicateKeyHandlingType
        FieldsInfo         *ast.FieldsClause
        LinesInfo          *ast.LinesClause
        Restrictive        bool
        IgnoreLines        *uint64

        SQLMode          mysql.SQLMode
        Charset          *string
        ImportantSysVars map[string]string

        ImportMode        string
        DiskQuota         config.ByteSize
        Checksum          config.PostOpLevel
        AddIndex          bool
        Analyze           config.PostOpLevel
        ThreadCnt         int64
        BatchSize         int64
        MaxWriteSpeed     config.ByteSize
        SplitFile         bool
        MaxRecordedErrors int64
        Detached          bool

        DistSQLScanConcurrency int
    }
Plan describes the plan of LOAD DATA.
func NewPlan ¶
func NewPlan(userSctx sessionctx.Context, plan *plannercore.LoadData, tbl table.Table) (*Plan, error)
NewPlan creates a new load data plan.
type TableImporter ¶
    type TableImporter struct {
        *JobImportParam
        *LoadDataController
        // contains filtered or unexported fields
    }
TableImporter is a table importer.
func NewTableImporter ¶
func NewTableImporter(param *JobImportParam, e *LoadDataController) (ti *TableImporter, err error)
NewTableImporter creates a new table importer.
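The sketch below shows one plausible way to wire NewPlan, NewLoadDataController and NewTableImporter together and run the import. The import paths are assumed from the TiDB repository layout, the zero-value Progress is a simplification (the real executor populates it), and error handling is abbreviated.

    import (
        "context"

        "github.com/pingcap/tidb/executor/asyncloaddata"
        "github.com/pingcap/tidb/executor/importer"
        plannercore "github.com/pingcap/tidb/planner/core"
        "github.com/pingcap/tidb/sessionctx"
        "github.com/pingcap/tidb/table"
        "golang.org/x/sync/errgroup"
    )

    // runImport wires a LOAD DATA plan into a TableImporter and waits for the
    // import to finish.
    func runImport(ctx context.Context, sctx sessionctx.Context, ld *plannercore.LoadData, tbl table.Table, job *asyncloaddata.Job) error {
        plan, err := importer.NewPlan(sctx, ld, tbl)
        if err != nil {
            return err
        }
        controller, err := importer.NewLoadDataController(sctx, plan, tbl)
        if err != nil {
            return err
        }
        if err := controller.InitDataFiles(ctx); err != nil {
            return err
        }
        group, groupCtx := errgroup.WithContext(ctx)
        param := &importer.JobImportParam{
            Job:      job,
            Group:    group,
            GroupCtx: groupCtx,
            Done:     make(chan struct{}),
            Progress: &asyncloaddata.Progress{}, // simplification: the real executor fills this in
        }
        ti, err := importer.NewTableImporter(param, controller)
        if err != nil {
            return err
        }
        defer ti.Close()
        ti.Import() // runs on param.Group and closes param.Done when finished
        <-param.Done
        return group.Wait()
    }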
func (*TableImporter) Close ¶
func (ti *TableImporter) Close() error
Close implements the io.Closer interface.
func (*TableImporter) Import ¶
func (ti *TableImporter) Import()
Import implements JobImporter.Import.
func (*TableImporter) ImportAndCleanup ¶
func (ti *TableImporter) ImportAndCleanup(ctx context.Context, closedEngine *backend.ClosedEngine) error
ImportAndCleanup imports the engine and cleans up the engine data.
func (*TableImporter) OpenDataEngine ¶
func (ti *TableImporter) OpenDataEngine(ctx context.Context, engineID int32) (*backend.OpenedEngine, error)
OpenDataEngine opens a data engine.
func (*TableImporter) OpenIndexEngine ¶
func (ti *TableImporter) OpenIndexEngine(ctx context.Context) (*backend.OpenedEngine, error)
OpenIndexEngine opens an index engine.
func (*TableImporter) Param ¶
func (ti *TableImporter) Param() *JobImportParam
Param implements JobImporter.Param.
func (*TableImporter) PopulateChunks ¶
func (ti *TableImporter) PopulateChunks(ctx context.Context) (map[int32]*checkpoints.EngineCheckpoint, error)
PopulateChunks populates chunks from table regions. In the distributed framework this should be done on the TiDB node that is responsible for splitting the job into subtasks; the table importer then handles the data belonging to its subtask.
func (*TableImporter) Result ¶
func (ti *TableImporter) Result() string
Result implements JobImporter.Result.