importer

package
v1.1.0-beta.0...-61c2172 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2024 License: Apache-2.0 Imports: 84 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DataFormatCSV indicates that the data source file of IMPORT INTO is in CSV format.
	DataFormatCSV = "csv"
	// DataFormatDelimitedData is the format name for delimited data.
	DataFormatDelimitedData = "delimited data"
	// DataFormatSQL indicates that the data source file of IMPORT INTO is a mydumper-format DML file.
	DataFormatSQL = "sql"
	// DataFormatParquet indicates that the data source file of IMPORT INTO is in parquet format.
	DataFormatParquet = "parquet"

	// DefaultDiskQuota is the default disk quota for IMPORT INTO.
	DefaultDiskQuota = config.ByteSize(50 << 30) // 50 GiB (50 << 30 bytes)

)
View Source
const (

	// JobStatusRunning is exported since it's used in SHOW IMPORT JOBS.
	JobStatusRunning = "running"

	// JobStepGlobalSorting is the first step when using global sort.
	// The step goes from none -> global-sorting -> importing -> validating -> none.
	JobStepGlobalSorting = "global-sorting"
	// JobStepImporting is the first step when using local sort.
	// The step goes from none -> importing -> validating -> none.
	// When used in global sort, it means importing the sorted data.
	// When used in local sort, it means encoding and sorting the data and then importing it.
	JobStepImporting  = "importing"
	JobStepValidating = "validating" // the step after importing in both flows above, before going back to none.
)

constants for job status and step.

Variables

View Source
var (
	// MinDeliverBytes is 96 KiB (data + index): batch at least this many bytes
	// to reduce the number of messages.
	MinDeliverBytes uint64 = 96 * units.KiB
	// MinDeliverRowCnt see the default for tikv-importer.max-kv-pairs.
	// NOTE(review): presumably the row-count threshold used together with
	// MinDeliverBytes when batching — confirm against the caller.
	MinDeliverRowCnt = 4096
)

These are constants; they are declared as variables so that tests can override them.

View Source
var (
	// CheckDiskQuotaInterval is the default time interval to check disk quota.
	// TODO: make it dynamically adjusting according to the speed of import and the disk size.
	CheckDiskQuotaInterval = 10 * time.Second
)
View Source
var GetEtcdClient = getEtcdClient

GetEtcdClient returns an etcd client. exported for testing.

View Source
var (

	// LoadDataReadBlockSize is exposed for test.
	LoadDataReadBlockSize = int64(config.ReadBlockSize)
)
View Source
var (
	// NewClientWithContext holds pd.NewClientWithContext in a package-level
	// variable.
	// NOTE(review): this comment previously said it "returns a kv.Client"; since
	// the assigned function comes from package pd, it presumably returns a PD
	// client instead — confirm against the pd package.
	NewClientWithContext = pd.NewClientWithContext
)
View Source
var NewTiKVModeSwitcher = local.NewTiKVModeSwitcher

NewTiKVModeSwitcher is declared as a variable so that it can be mocked in tests.

View Source
var (
	// TestLastImportJobID last created job id, used in unit test.
	TestLastImportJobID atomic.Int64
)

vars used for test.

View Source
var TestSyncCh = make(chan struct{})

TestSyncCh is used in unit test to synchronize the execution.

Functions

func CancelJob

func CancelJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64) (err error)

CancelJob cancels an IMPORT INTO job. Only a running or paused job can be canceled. Check privileges (for example, by fetching the job with GetJob) before calling this method.

func CreateJob

func CreateJob(
	ctx context.Context,
	conn sqlexec.SQLExecutor,
	db, table string,
	tableID int64,
	user string,
	parameters *ImportParameters,
	sourceFileSize int64,
) (int64, error)

CreateJob creates an IMPORT INTO job by inserting a record into the system table. The AUTO_INCREMENT value will be returned as jobID.

func FailJob

func FailJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, errorMsg string) error

FailJob fails import into job. A job can only be failed once. It will not return error when there's no matched job.

func FinishJob

func FinishJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, summary *JobSummary) error

FinishJob tries to finish a running job with jobID, change its status to finished, clear its step. It will not return error when there's no matched job.

func FlushTableStats

func FlushTableStats(ctx context.Context, se sessionctx.Context, tableID int64, result *JobImportResult) error

FlushTableStats flushes the stats of the table.

func GetActiveJobCnt

func GetActiveJobCnt(ctx context.Context, conn sqlexec.SQLExecutor, tableSchema, tableName string) (int64, error)

GetActiveJobCnt returns the count of active import jobs. Active import jobs include pending and running jobs.

func GetImportRootDir

func GetImportRootDir(tidbCfg *tidb.Config) string

GetImportRootDir returns the root directory for import. The directory structure is like:

-> /path/to/tidb-tmpdir
  -> import-4000
    -> 1
      -> some-uuid

exported for testing.

func GetMsgFromBRError

func GetMsgFromBRError(err error) string

GetMsgFromBRError gets the message from a BR error. TODO: add GetMsg() to the errors package to replace this function. See TestGetMsgFromBRError for more details.

func GetRegionSplitSizeKeys

func GetRegionSplitSizeKeys(ctx context.Context) (regionSplitSize int64, regionSplitKeys int64, err error)

GetRegionSplitSizeKeys gets the region split size and keys from PD.

func GetTargetNodeCPUCnt

func GetTargetNodeCPUCnt(ctx context.Context, sourceType DataSourceType, path string) (int, error)

GetTargetNodeCPUCnt gets the CPU count of the target node where the IMPORT INTO job will be executed. The target node is the current node if it's a server-disk import, an import from query, or disttask is disabled; otherwise it's the node managed by disttask. Exported for testing.

func Job2Step

func Job2Step(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, step string) error

Job2Step tries to change the step of a running job with jobID. It will not return error when there's no matched job.

func PostProcess

func PostProcess(
	ctx context.Context,
	se sessionctx.Context,
	maxIDs map[autoid.AllocatorType]int64,
	plan *Plan,
	localChecksum *verify.KVGroupChecksum,
	logger *zap.Logger,
) (err error)

PostProcess does the post-processing for the task. exported for testing.

func ProcessChunk

func ProcessChunk(
	ctx context.Context,
	chunk *checkpoints.ChunkCheckpoint,
	tableImporter *TableImporter,
	dataEngine, indexEngine *backend.OpenedEngine,
	progress *Progress,
	logger *zap.Logger,
	groupChecksum *verification.KVGroupChecksum,
) error

ProcessChunk processes a chunk and writes KV pairs to dataEngine and indexEngine.

func ProcessChunkWithWriter

func ProcessChunkWithWriter(
	ctx context.Context,
	chunk *checkpoints.ChunkCheckpoint,
	tableImporter *TableImporter,
	dataWriter, indexWriter backend.EngineWriter,
	progress *Progress,
	logger *zap.Logger,
	groupChecksum *verification.KVGroupChecksum,
) error

ProcessChunkWithWriter processes a chunk and writes KV pairs to dataWriter and indexWriter.

func RebaseAllocatorBases

func RebaseAllocatorBases(ctx context.Context, kvStore tidbkv.Storage, maxIDs map[autoid.AllocatorType]int64, plan *Plan, logger *zap.Logger) (err error)

RebaseAllocatorBases rebases the allocator bases.

func StartJob

func StartJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, step string) error

StartJob tries to start a pending job with jobID, change its status/step to running/input step. It will not return error when there's no matched job or the job has already started.

func VerifyChecksum

func VerifyChecksum(ctx context.Context, plan *Plan, localChecksum verify.KVChecksum, se sessionctx.Context, logger *zap.Logger) error

VerifyChecksum verifies the checksum of the table.

Types

type ASTArgs

type ASTArgs struct {
	FileLocRef         ast.FileLocRefTp
	ColumnsAndUserVars []*ast.ColumnNameOrUserVar
	ColumnAssignments  []*ast.Assignment
	OnDuplicate        ast.OnDuplicateKeyHandlingType
	FieldsInfo         *ast.FieldsClause
	LinesInfo          *ast.LinesClause
}

ASTArgs is the arguments for ast.LoadDataStmt. TODO: remove this struct and use the struct which can be serialized.

func ASTArgsFromImportPlan

func ASTArgsFromImportPlan(plan *plannercore.ImportInto) *ASTArgs

ASTArgsFromImportPlan creates ASTArgs from plan.

func ASTArgsFromPlan

func ASTArgsFromPlan(plan *plannercore.LoadData) *ASTArgs

ASTArgsFromPlan creates ASTArgs from plan.

func ASTArgsFromStmt

func ASTArgsFromStmt(stmt string) (*ASTArgs, error)

ASTArgsFromStmt creates ASTArgs from statement.

type ChunkProcessor

type ChunkProcessor interface {
	Process(ctx context.Context) error
}

ChunkProcessor is used to process a chunk of data, including encoding the data to KV and delivering the KV to local or global storage.

func NewFileChunkProcessor

func NewFileChunkProcessor(
	parser mydump.Parser,
	encoder KVEncoder,
	keyspace []byte,
	chunk *checkpoints.ChunkCheckpoint,
	logger *zap.Logger,
	diskQuotaLock *syncutil.RWMutex,
	dataWriter backend.EngineWriter,
	indexWriter backend.EngineWriter,
	groupChecksum *verify.KVGroupChecksum,
) ChunkProcessor

NewFileChunkProcessor creates a new local sort chunk processor. exported for test.

type DataSourceType

type DataSourceType string

DataSourceType indicates the data source type of IMPORT INTO.

const (
	// DataSourceTypeFile represents the data source of IMPORT INTO is file.
	// exported for test.
	DataSourceTypeFile DataSourceType = "file"
	// DataSourceTypeQuery represents the data source of IMPORT INTO is query.
	DataSourceTypeQuery DataSourceType = "query"
)

func (DataSourceType) String

func (t DataSourceType) String() string

type FieldMapping

type FieldMapping struct {
	Column  *table.Column
	UserVar *ast.VariableExpr
}

FieldMapping indicates the relationship between input field and table column or user variable

type ImportParameters

// ImportParameters holds the parameters of an IMPORT INTO statement in a
// JSON-serializable form (see the struct tags).
type ImportParameters struct {
	ColumnsAndVars string `json:"columns-and-vars,omitempty"`
	SetClause      string `json:"set-clause,omitempty"`
	// FileLocation: for an S3 URL, the AK/SK is redacted for security.
	FileLocation string `json:"file-location"`
	Format       string `json:"format"`
	// Options only includes what the user specified; default values are not included.
	Options map[string]any `json:"options,omitempty"`
}

ImportParameters holds the parameters of an IMPORT INTO statement. It is the minimal meta info stored in tidb_import_jobs for diagnosis; for detailed info, see tidb_global_tasks.

func (*ImportParameters) String

func (ip *ImportParameters) String() string

String implements fmt.Stringer interface.

type IndexRouteWriter

type IndexRouteWriter struct {
	// contains filtered or unexported fields
}

IndexRouteWriter is a writer for index data when using global sort. We route the KVs of different indexes to different writers in order to make merge sort easier; otherwise the KV data of all subtasks would overlap.

func NewIndexRouteWriter

func NewIndexRouteWriter(logger *zap.Logger, writerFactory func(int64) *external.Writer) *IndexRouteWriter

NewIndexRouteWriter creates a new IndexRouteWriter.

func (*IndexRouteWriter) AppendRows

func (w *IndexRouteWriter) AppendRows(ctx context.Context, _ []string, rows encode.Rows) error

AppendRows implements backend.EngineWriter interface.

func (*IndexRouteWriter) Close

Close implements backend.EngineWriter interface.

func (*IndexRouteWriter) IsSynced

func (*IndexRouteWriter) IsSynced() bool

IsSynced implements backend.EngineWriter interface.

type JobImportResult

type JobImportResult struct {
	Affected   uint64
	Warnings   []stmtctx.SQLWarn
	ColSizeMap map[int64]int64
}

JobImportResult is the result of the job import.

type JobInfo

// JobInfo is the information of an IMPORT INTO job.
type JobInfo struct {
	ID             int64
	CreateTime     types.Time
	StartTime      types.Time
	EndTime        types.Time
	TableSchema    string
	TableName      string
	TableID        int64
	CreatedBy      string
	Parameters     ImportParameters
	SourceFileSize int64
	Status         string
	// Step is named "phase" in SHOW IMPORT JOB; here we use the same name as
	// in the distributed framework.
	Step string
	// Summary is the summary info of the job. It is updated only when the job
	// is finished; for a running job, query the progress from the distributed
	// framework instead.
	Summary      *JobSummary
	ErrorMessage string
}

JobInfo is the information of import into job.

func GetAllViewableJobs

func GetAllViewableJobs(ctx context.Context, conn sqlexec.SQLExecutor, user string, hasSuperPriv bool) ([]*JobInfo, error)

GetAllViewableJobs gets all viewable jobs.

func GetJob

func GetJob(ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, user string, hasSuperPriv bool) (*JobInfo, error)

GetJob returns the job with the given id if the user has privilege. hasSuperPriv: whether the user has super privilege. If the user has super privilege, the user can show or operate all jobs, else the user can only show or operate his own jobs.

func (*JobInfo) CanCancel

func (j *JobInfo) CanCancel() bool

CanCancel returns whether the job can be cancelled.

type JobSummary

type JobSummary struct {
	// ImportedRows is the number of rows imported into TiKV.
	ImportedRows uint64 `json:"imported-rows,omitempty"`
}

JobSummary is the summary info of import into job.

type KVEncoder

// KVEncoder encodes rows of data into KV pairs.
type KVEncoder interface {
	// Encode encodes the given row, identified by rowID, and returns the
	// resulting KV pairs or an error.
	Encode(row []types.Datum, rowID int64) (*kv.Pairs, error)
	// GetColumnSize returns the size of each column in the current encoder.
	// NOTE(review): the map key is presumably the column ID — confirm.
	GetColumnSize() map[int64]int64
	// io.Closer is embedded, so every KVEncoder also provides Close.
	io.Closer
}

KVEncoder encodes a row of data into a KV pair.

func NewTableKVEncoder

func NewTableKVEncoder(
	config *encode.EncodingConfig,
	ti *TableImporter,
) (KVEncoder, error)

NewTableKVEncoder creates a new tableKVEncoder. exported for test.

type LoadDataController

// LoadDataController is the controller for loading data; it embeds the Plan
// and the ASTArgs of the statement.
type LoadDataController struct {
	*Plan
	*ASTArgs

	Table table.Table

	// FieldMappings describes how each input field (or input column) from the
	// data file is mapped, either to a column or to a user variable.
	// If there is NO column list clause in the SQL statement, it is the table's
	// columns; otherwise it is the user-defined list.
	FieldMappings []*FieldMapping
	// InsertColumns: see InsertValues.InsertColumns.
	// Note: our behavior differs from MySQL. For example, for table t(a,b):
	// - "...(a,a) set a=100" is allowed in MySQL, but not in TiDB.
	// - "...(a,b) set b=100" will set b=100 in MySQL, but in TiDB the set is ignored.
	// - referencing columns in the set clause is allowed in MySQL, but not in TiDB.
	InsertColumns []*table.Column

	// GlobalSortStore is used to store sorted data when using global sort.
	GlobalSortStore storage.ExternalStorage
	// ExecuteNodesCnt is the count of execute nodes.
	ExecuteNodesCnt int
	// contains filtered or unexported fields
}

LoadDataController is the controller for loading data. TODO: find a better name.

func NewLoadDataController

func NewLoadDataController(plan *Plan, tbl table.Table, astArgs *ASTArgs) (*LoadDataController, error)

NewLoadDataController creates a new controller.

func (*LoadDataController) CheckRequirements

func (e *LoadDataController) CheckRequirements(ctx context.Context, conn sqlexec.SQLExecutor) error

CheckRequirements checks the requirements for IMPORT INTO. we check the following things here:

  • when importing from file:
      1. there is no active job on the target table
      2. the total file size > 0
      3. if global sort is used, thread count >= 16 and the required privileges are present
  • target table should be empty
  • no CDC or PiTR tasks running

we check them one by one, and return the first error we meet.

func (*LoadDataController) CreateColAssignExprs

func (e *LoadDataController) CreateColAssignExprs(sctx sessionctx.Context) ([]expression.Expression, []stmtctx.SQLWarn, error)

CreateColAssignExprs creates the column assignment expressions using session context. RewriteAstExpr will write ast node in place(due to xxNode.Accept), but it doesn't change node content, so we sync it.

func (*LoadDataController) FullTableName

func (e *LoadDataController) FullTableName() string

FullTableName returns the fully qualified name of the table.

func (*LoadDataController) GenerateCSVConfig

func (e *LoadDataController) GenerateCSVConfig() *config.CSVConfig

GenerateCSVConfig generates a CSV config for parser from LoadDataWorker.

func (*LoadDataController) GetFieldCount

func (e *LoadDataController) GetFieldCount() int

GetFieldCount gets the field count.

func (*LoadDataController) GetLoadDataReaderInfos

func (e *LoadDataController) GetLoadDataReaderInfos() []LoadDataReaderInfo

GetLoadDataReaderInfos returns the LoadDataReaderInfo for each data file.

func (*LoadDataController) GetParser

func (e *LoadDataController) GetParser(
	ctx context.Context,
	dataFileInfo LoadDataReaderInfo,
) (parser mydump.Parser, err error)

GetParser returns a parser for the data file.

func (*LoadDataController) HandleSkipNRows

func (e *LoadDataController) HandleSkipNRows(parser mydump.Parser) error

HandleSkipNRows skips the first N rows of the data file.

func (*LoadDataController) InitDataFiles

func (e *LoadDataController) InitDataFiles(ctx context.Context) error

InitDataFiles initializes the data store and files. it will call InitDataStore internally.

func (*LoadDataController) InitDataStore

func (e *LoadDataController) InitDataStore(ctx context.Context) error

InitDataStore initializes the data store.

func (*LoadDataController) InitTiKVConfigs

func (e *LoadDataController) InitTiKVConfigs(ctx context.Context, sctx sessionctx.Context) error

InitTiKVConfigs initializes some TiKV related configs.

func (*LoadDataController) PopulateChunks

func (e *LoadDataController) PopulateChunks(ctx context.Context) (ecp map[int32]*checkpoints.EngineCheckpoint, err error)

PopulateChunks populates chunks from table regions. In the dist framework, this should be done on the TiDB node that is responsible for splitting the job into subtasks; the table importer then handles the data belonging to each subtask.

func (*LoadDataController) SetExecuteNodeCnt

func (e *LoadDataController) SetExecuteNodeCnt(cnt int)

SetExecuteNodeCnt sets the execute node count.

type LoadDataReaderInfo

// LoadDataReaderInfo provides information for a data reader of LOAD DATA.
type LoadDataReaderInfo struct {
	// Opener can be called when needed to get an io.ReadSeekCloser. It will
	// only be called once.
	Opener func(ctx context.Context) (io.ReadSeekCloser, error)
	// Remote is not nil only when loading from cloud storage.
	Remote *mydump.SourceFileMeta
}

LoadDataReaderInfo provides information for a data reader of LOAD DATA.

type Plan

// Plan describes the plan of LOAD DATA and IMPORT INTO.
type Plan struct {
	DBName string
	DBID   int64
	// TableInfo is the table info used during import. It might be changed
	// if adding indexes by SQL is enabled (it's disabled now).
	TableInfo *model.TableInfo
	// DesiredTableInfo is the table info before import, and the desired table info
	// after import.
	DesiredTableInfo *model.TableInfo

	Path string
	// Format is only effective when the data source is a file.
	Format string
	// Data interpretation is restrictive if the SQL mode is restrictive and neither
	// the IGNORE nor the LOCAL modifier is specified. Errors terminate the load
	// operation.
	// ref https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-column-assignments
	Restrictive bool

	SQLMode mysql.SQLMode
	// Charset is the charset of the data file when the file is CSV or TSV.
	// It might be nil when using LOAD DATA and no charset is specified.
	// For IMPORT INTO, it is always non-nil.
	Charset          *string
	ImportantSysVars map[string]string

	// FieldNullDef is used for LOAD DATA and the CSV format of IMPORT INTO.
	FieldNullDef []string
	// NullValueOptEnclosed is not used in IMPORT INTO.
	NullValueOptEnclosed bool
	// LinesStartingBy is not used in IMPORT INTO.
	// FieldsOptEnclosed is not used in either IMPORT INTO or LOAD DATA.
	plannercore.LineFieldsInfo
	IgnoreLines uint64

	DiskQuota             config.ByteSize
	Checksum              config.PostOpLevel
	ThreadCnt             int
	MaxWriteSpeed         config.ByteSize
	SplitFile             bool
	MaxRecordedErrors     int64
	Detached              bool
	DisableTiKVImportMode bool
	MaxEngineSize         config.ByteSize
	CloudStorageURI       string
	DisablePrecheck       bool

	// DistSQLScanConcurrency is used for checksum in physical mode.
	DistSQLScanConcurrency int

	// TODO: remove InImportInto when the load data code is reverted.
	InImportInto   bool
	DataSourceType DataSourceType
	// Parameters is only initialized for IMPORT INTO; it is used when creating the job.
	Parameters *ImportParameters `json:"-"`
	// User is the user who executes the statement, in the form of user@host.
	// It is only initialized for IMPORT INTO.
	User string `json:"-"`

	IsRaftKV2 bool
	// TotalFileSize is the total data file size in bytes.
	TotalFileSize int64
	// ForceMergeStep is used in tests to force-enable the merge step when using global sort.
	ForceMergeStep bool
}

Plan describes the plan of LOAD DATA and IMPORT INTO.

func NewImportPlan

func NewImportPlan(ctx context.Context, userSctx sessionctx.Context, plan *plannercore.ImportInto, tbl table.Table) (*Plan, error)

NewImportPlan creates a new import into plan.

func NewPlanFromLoadDataPlan

func NewPlanFromLoadDataPlan(userSctx sessionctx.Context, plan *plannercore.LoadData) (*Plan, error)

NewPlanFromLoadDataPlan creates an import plan from LOAD DATA.

func (*Plan) IsGlobalSort

func (p *Plan) IsGlobalSort() bool

IsGlobalSort returns true if we sort data on global storage.

func (*Plan) IsLocalSort

func (p *Plan) IsLocalSort() bool

IsLocalSort returns true if we sort data on local disk.

type Progress

type Progress struct {
	// contains filtered or unexported fields
}

Progress is the progress of the IMPORT INTO task.

func NewProgress

func NewProgress() *Progress

NewProgress creates a new Progress.

func (*Progress) AddColSize

func (p *Progress) AddColSize(colSizeMap map[int64]int64)

AddColSize adds the size of the column to the progress.

func (*Progress) GetColSize

func (p *Progress) GetColSize() map[int64]int64

GetColSize returns the size of the column.

type QueryRow

type QueryRow struct {
	ID   int64
	Data []types.Datum
}

QueryRow is a row from query result.

type TableImporter

type TableImporter struct {
	*LoadDataController
	// contains filtered or unexported fields
}

TableImporter is a table importer.

func NewTableImporter

func NewTableImporter(
	ctx context.Context,
	e *LoadDataController,
	id string,
	kvStore tidbkv.Storage,
) (ti *TableImporter, err error)

NewTableImporter creates a new table importer.

func NewTableImporterForTest

func NewTableImporterForTest(ctx context.Context, e *LoadDataController, id string, helper local.StoreHelper) (*TableImporter, error)

NewTableImporterForTest creates a new table importer for test.

func (*TableImporter) Allocators

func (ti *TableImporter) Allocators() autoid.Allocators

Allocators returns allocators used to record max used ID, i.e. PanickingAllocators.

func (*TableImporter) Backend

func (ti *TableImporter) Backend() *local.Backend

Backend returns the backend of the importer.

func (*TableImporter) CheckDiskQuota

func (ti *TableImporter) CheckDiskQuota(ctx context.Context)

CheckDiskQuota checks disk quota.

func (*TableImporter) Close

func (ti *TableImporter) Close() error

Close implements the io.Closer interface.

func (*TableImporter) GetKeySpace

func (ti *TableImporter) GetKeySpace() []byte

GetKeySpace gets the keyspace of the kv store.

func (*TableImporter) ImportAndCleanup

func (ti *TableImporter) ImportAndCleanup(ctx context.Context, closedEngine *backend.ClosedEngine) (int64, error)

ImportAndCleanup imports the engine and cleanup the engine data.

func (*TableImporter) ImportSelectedRows

func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.Context) (*JobImportResult, error)

ImportSelectedRows imports selected rows.

func (*TableImporter) OpenDataEngine

func (ti *TableImporter) OpenDataEngine(ctx context.Context, engineID int32) (*backend.OpenedEngine, error)

OpenDataEngine opens a data engine.

func (*TableImporter) OpenIndexEngine

func (ti *TableImporter) OpenIndexEngine(ctx context.Context, engineID int32) (*backend.OpenedEngine, error)

OpenIndexEngine opens an index engine.

func (*TableImporter) SetSelectedRowCh

func (ti *TableImporter) SetSelectedRowCh(ch chan QueryRow)

SetSelectedRowCh sets the channel to receive selected rows.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL