Documentation ¶
Index ¶
- Constants
- Variables
- func GetTaskImportedRows(ctx context.Context, jobID int64) (uint64, error)
- func TaskKey(jobID int64) string
- func TestChecksumTable(ctx context.Context, executor storage.SessionExecutor, taskMeta *TaskMeta, ...) (*local.RemoteChecksum, error)
- type Checksum
- type Chunk
- type DistImporter
- func NewDistImporter(param *importer.JobImportParam, plan *importer.Plan, stmt string, ...) (*DistImporter, error)
- func NewDistImporterCurrNode(param *importer.JobImportParam, plan *importer.Plan, stmt string, ...) (*DistImporter, error)
- func NewDistImporterServerFile(param *importer.JobImportParam, plan *importer.Plan, stmt string, ...) (*DistImporter, error)
- func (*DistImporter) Close() error
- func (*DistImporter) Import()
- func (ti *DistImporter) ImportTask(task *proto.Task)
- func (ti *DistImporter) JobID() int64
- func (ti *DistImporter) Param() *importer.JobImportParam
- func (ti *DistImporter) Result(ctx context.Context) importer.JobImportResult
- func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, error)
- type ImportCleanUpS3
- type ImportDispatcherExt
- func (*ImportDispatcherExt) GetEligibleInstances(ctx context.Context, gTask *proto.Task) ([]*infosync.ServerInfo, bool, error)
- func (dsp *ImportDispatcherExt) GetNextStep(task *proto.Task) proto.Step
- func (*ImportDispatcherExt) IsRetryableErr(error) bool
- func (dsp *ImportDispatcherExt) OnDone(ctx context.Context, handle dispatcher.TaskHandle, task *proto.Task) error
- func (dsp *ImportDispatcherExt) OnNextSubtasksBatch(ctx context.Context, taskHandle dispatcher.TaskHandle, gTask *proto.Task, ...) (resSubtaskMeta [][]byte, err error)
- func (dsp *ImportDispatcherExt) OnTick(ctx context.Context, task *proto.Task)
- type ImportSpec
- type ImportStepMeta
- type LogicalPlan
- type MergeSortSpec
- type MergeSortStepMeta
- type MiniTaskExecutor
- type PostProcessSpec
- type PostProcessStepMeta
- type Result
- type SharedVars
- type TaskMeta
- type WriteIngestSpec
- type WriteIngestStepMeta
Constants ¶
const (
	// StepImport we sort source data and ingest it into TiKV in this step.
	StepImport proto.Step = 1
	// StepPostProcess we verify checksum and add index in this step.
	StepPostProcess proto.Step = 2
	// StepEncodeAndSort encode source data and write sorted kv into global storage.
	StepEncodeAndSort proto.Step = 3
	// StepMergeSort merge sorted kv from global storage, so we can have better
	// read performance during StepWriteAndIngest.
	// depending on how much the kv files overlap, there might be 0 subtasks
	// in this step.
	StepMergeSort proto.Step = 4
	// StepWriteAndIngest write sorted kv into TiKV and ingest it.
	StepWriteAndIngest proto.Step = 5
)
Steps of IMPORT INTO. Each step is represented by one or multiple subtasks. The initial step is StepInit(-1). Steps are processed in the following order:
- local sort: StepInit -> StepImport -> StepPostProcess -> StepDone
- global sort: StepInit -> StepEncodeAndSort -> StepMergeSort -> StepWriteAndIngest -> StepPostProcess -> StepDone
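The two orderings can be read as a small state machine. The following is a minimal sketch of that reading, not the package's GetNextStep implementation: the Step type, the nextStep helper, and the value chosen for stepDone are assumptions made for illustration, while the other values are copied from the constants listed for this package.

```go
package main

import "fmt"

// Step mirrors proto.Step for illustration only.
type Step int64

const (
	stepInit           Step = -1 // documented as StepInit(-1)
	stepImport         Step = 1
	stepPostProcess    Step = 2
	stepEncodeAndSort  Step = 3
	stepMergeSort      Step = 4
	stepWriteAndIngest Step = 5
	stepDone           Step = -2 // assumption: the real StepDone value is not shown here
)

// nextStep returns the step that follows cur in the documented order.
// globalSort selects between the local-sort and global-sort pipelines.
func nextStep(cur Step, globalSort bool) Step {
	switch cur {
	case stepInit:
		if globalSort {
			return stepEncodeAndSort
		}
		return stepImport
	case stepImport:
		return stepPostProcess
	case stepEncodeAndSort:
		return stepMergeSort
	case stepMergeSort:
		return stepWriteAndIngest
	case stepWriteAndIngest:
		return stepPostProcess
	default:
		// stepPostProcess and anything unknown end the pipeline.
		return stepDone
	}
}

func main() {
	for _, globalSort := range []bool{false, true} {
		fmt.Printf("globalSort=%v:", globalSort)
		for s := stepInit; s != stepDone; s = nextStep(s, globalSort) {
			fmt.Printf(" %d", s)
		}
		fmt.Println()
	}
}
```

Both pipelines converge on the post-process step, which is why a single switch covers them.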
Variables ¶
var NewTaskRegisterWithTTL = utils.NewTaskRegisterWithTTL
NewTaskRegisterWithTTL is the ctor for TaskRegister. It is exported for testing.
var TestSyncChan = make(chan struct{})
TestSyncChan is used for testing.
Functions ¶
func GetTaskImportedRows ¶
func GetTaskImportedRows(ctx context.Context, jobID int64) (uint64, error)
GetTaskImportedRows gets the number of imported rows of a job. Note: for a finished job, the number of imported rows can be read from the task meta.
func TestChecksumTable ¶
func TestChecksumTable(ctx context.Context, executor storage.SessionExecutor, taskMeta *TaskMeta, logger *zap.Logger) (*local.RemoteChecksum, error)
TestChecksumTable is used to test checksum table in unit test.
Types ¶
type Chunk ¶
type Chunk struct {
	Path         string
	FileSize     int64
	Offset       int64
	EndOffset    int64
	PrevRowIDMax int64
	RowIDMax     int64
	Type         mydump.SourceType
	Compression  mydump.Compression
	Timestamp    int64
}
Chunk records the chunk information.
type DistImporter ¶
type DistImporter struct {
	*importer.JobImportParam
	// contains filtered or unexported fields
}
DistImporter is a JobImporter for distributed IMPORT INTO.
func NewDistImporter ¶
func NewDistImporter(param *importer.JobImportParam, plan *importer.Plan, stmt string, sourceFileSize int64) (*DistImporter, error)
NewDistImporter creates a new DistImporter.
func NewDistImporterCurrNode ¶
func NewDistImporterCurrNode(param *importer.JobImportParam, plan *importer.Plan, stmt string, sourceFileSize int64) (*DistImporter, error)
NewDistImporterCurrNode creates a new DistImporter to import data on the current node.
func NewDistImporterServerFile ¶
func NewDistImporterServerFile(param *importer.JobImportParam, plan *importer.Plan, stmt string, ecp map[int32]*checkpoints.EngineCheckpoint, sourceFileSize int64) (*DistImporter, error)
NewDistImporterServerFile creates a new DistImporter to import the given files on the current node. The import itself also runs on the current node. TODO: merge all 3 constructors into one.
func (*DistImporter) Close ¶
func (*DistImporter) Close() error
Close implements the io.Closer interface.
func (*DistImporter) ImportTask ¶
func (ti *DistImporter) ImportTask(task *proto.Task)
ImportTask imports the given task.
func (*DistImporter) Param ¶
func (ti *DistImporter) Param() *importer.JobImportParam
Param implements JobImporter.Param.
func (*DistImporter) Result ¶
func (ti *DistImporter) Result(ctx context.Context) importer.JobImportResult
Result implements JobImporter.Result.
func (*DistImporter) SubmitTask ¶
func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, error)
SubmitTask submits a task to the distributed framework.
type ImportCleanUpS3 ¶
type ImportCleanUpS3 struct { }
ImportCleanUpS3 implements dispatcher.CleanUpRoutine.
type ImportDispatcherExt ¶
type ImportDispatcherExt struct {
	GlobalSort bool
	// contains filtered or unexported fields
}
ImportDispatcherExt is an extension of ImportDispatcher, exported for test.
func (*ImportDispatcherExt) GetEligibleInstances ¶
func (*ImportDispatcherExt) GetEligibleInstances(ctx context.Context, gTask *proto.Task) ([]*infosync.ServerInfo, bool, error)
GetEligibleInstances implements dispatcher.Extension interface.
func (*ImportDispatcherExt) GetNextStep ¶
func (dsp *ImportDispatcherExt) GetNextStep(task *proto.Task) proto.Step
GetNextStep implements dispatcher.Extension interface.
func (*ImportDispatcherExt) IsRetryableErr ¶
func (*ImportDispatcherExt) IsRetryableErr(error) bool
IsRetryableErr implements dispatcher.Extension interface.
func (*ImportDispatcherExt) OnDone ¶
func (dsp *ImportDispatcherExt) OnDone(ctx context.Context, handle dispatcher.TaskHandle, task *proto.Task) error
OnDone implements dispatcher.Extension interface.
func (*ImportDispatcherExt) OnNextSubtasksBatch ¶
func (dsp *ImportDispatcherExt) OnNextSubtasksBatch(
	ctx context.Context,
	taskHandle dispatcher.TaskHandle,
	gTask *proto.Task,
	serverInfos []*infosync.ServerInfo,
	nextStep proto.Step,
) (resSubtaskMeta [][]byte, err error)
OnNextSubtasksBatch generates the batch of subtasks for the next step.
type ImportSpec ¶
ImportSpec is the specification of an import pipeline.
func (*ImportSpec) ToSubtaskMeta ¶
func (s *ImportSpec) ToSubtaskMeta(planner.PlanCtx) ([]byte, error)
ToSubtaskMeta converts the import spec to subtask meta.
type ImportStepMeta ¶
type ImportStepMeta struct {
	// this is the engine ID, not the id in tidb_background_subtask table.
	ID       int32
	Chunks   []Chunk
	Checksum Checksum
	Result   Result
	// MaxIDs stores the max id that has been used during encoding for each allocator type.
	// the max id is the same among all allocator types for now, since we're using the same base, see
	// NewPanickingAllocators for more info.
	MaxIDs map[autoid.AllocatorType]int64

	SortedDataMeta *external.SortedKVMeta
	// SortedIndexMetas is a map from index id to its sorted kv meta.
	SortedIndexMetas map[int64]*external.SortedKVMeta
}
ImportStepMeta is the meta of the import step. The dispatcher splits the task into subtasks (FileInfos -> Chunks). All fields should be serializable.
type LogicalPlan ¶
type LogicalPlan struct {
	JobID             int64
	Plan              importer.Plan
	Stmt              string
	EligibleInstances []*infosync.ServerInfo
	ChunkMap          map[int32][]Chunk
}
LogicalPlan represents a logical plan for import into.
func (*LogicalPlan) FromTaskMeta ¶
func (p *LogicalPlan) FromTaskMeta(bs []byte) error
FromTaskMeta converts the task meta to logical plan.
func (*LogicalPlan) ToPhysicalPlan ¶
func (p *LogicalPlan) ToPhysicalPlan(planCtx planner.PlanCtx) (*planner.PhysicalPlan, error)
ToPhysicalPlan converts the logical plan to physical plan.
func (*LogicalPlan) ToTaskMeta ¶
func (p *LogicalPlan) ToTaskMeta() ([]byte, error)
ToTaskMeta converts the logical plan to task meta.
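ToTaskMeta and FromTaskMeta form a serialize/deserialize pair over a byte slice. The sketch below illustrates that round trip on a reduced, hypothetical miniPlan type. The use of encoding/json is an assumption of this sketch, since the encoding used by LogicalPlan is not stated here, and the chunk type keeps only three of Chunk's fields.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// chunk is a trimmed-down stand-in for Chunk.
type chunk struct {
	Path      string
	Offset    int64
	EndOffset int64
}

// miniPlan is a hypothetical, reduced version of LogicalPlan.
type miniPlan struct {
	JobID    int64
	Stmt     string
	ChunkMap map[int32][]chunk // engine ID -> chunks
}

// toTaskMeta serializes the plan. JSON is an assumption of this sketch.
func (p *miniPlan) toTaskMeta() ([]byte, error) {
	return json.Marshal(p)
}

// fromTaskMeta restores the plan from serialized bytes.
func (p *miniPlan) fromTaskMeta(bs []byte) error {
	return json.Unmarshal(bs, p)
}

// roundTrip serializes in and decodes the bytes into a fresh plan.
func roundTrip(in *miniPlan) (*miniPlan, error) {
	bs, err := in.toTaskMeta()
	if err != nil {
		return nil, err
	}
	out := &miniPlan{}
	if err := out.fromTaskMeta(bs); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	in := &miniPlan{
		JobID: 7,
		Stmt:  "example statement",
		ChunkMap: map[int32][]chunk{
			0: {{Path: "a.csv", Offset: 0, EndOffset: 10}},
		},
	}
	out, err := roundTrip(in)
	if err != nil {
		panic(err)
	}
	fmt.Println(out.JobID, len(out.ChunkMap[0]))
}
```

A round trip like this is a cheap way to check the "all fields should be serializable" requirement in a unit test.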
type MergeSortSpec ¶
type MergeSortSpec struct {
*MergeSortStepMeta
}
MergeSortSpec is the specification of a merge-sort pipeline.
func (*MergeSortSpec) ToSubtaskMeta ¶
func (s *MergeSortSpec) ToSubtaskMeta(planner.PlanCtx) ([]byte, error)
ToSubtaskMeta converts the merge-sort spec to subtask meta.
type MergeSortStepMeta ¶
type MergeSortStepMeta struct {
	// KVGroup is the group name of the sorted kv, either dataKVGroup or index-id.
	KVGroup               string   `json:"kv-group"`
	DataFiles             []string `json:"data-files"`
	external.SortedKVMeta `json:"sorted-kv-meta"`
}
MergeSortStepMeta is the meta of merge sort step.
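The embedded external.SortedKVMeta carries its own JSON tag. With encoding/json, an embedded struct that has a name in its tag is encoded as a nested object under that name rather than being flattened into the parent. The sketch below shows this on stand-in types: SortedKVMeta and its TotalKVSize field are hypothetical placeholders, not the real external.SortedKVMeta, and the KVGroup value is made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// SortedKVMeta is a hypothetical stand-in for external.SortedKVMeta;
// its single field is invented for this sketch.
type SortedKVMeta struct {
	TotalKVSize uint64 `json:"total-kv-size"`
}

// mergeSortStepMeta copies the field layout and tags of MergeSortStepMeta.
type mergeSortStepMeta struct {
	KVGroup      string   `json:"kv-group"`
	DataFiles    []string `json:"data-files"`
	SortedKVMeta `json:"sorted-kv-meta"`
}

// encode marshals the meta and returns the JSON text.
func encode(m *mergeSortStepMeta) (string, error) {
	bs, err := json.Marshal(m)
	return string(bs), err
}

func main() {
	s, err := encode(&mergeSortStepMeta{
		KVGroup:      "example-group",
		DataFiles:    []string{"f1", "f2"},
		SortedKVMeta: SortedKVMeta{TotalKVSize: 42},
	})
	if err != nil {
		panic(err)
	}
	// The embedded struct appears under the "sorted-kv-meta" key.
	fmt.Println(s)
}
```

Go code can still reach the embedded fields directly (m.TotalKVSize in this sketch) while the serialized form stays grouped under one key.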
type MiniTaskExecutor ¶
type MiniTaskExecutor interface {
Run(ctx context.Context, dataWriter, indexWriter backend.EngineWriter) error
}
MiniTaskExecutor is the interface for a minimal task executor. It is exported for testing.
type PostProcessSpec ¶
PostProcessSpec is the specification of a post process pipeline.
func (*PostProcessSpec) ToSubtaskMeta ¶
func (*PostProcessSpec) ToSubtaskMeta(planCtx planner.PlanCtx) ([]byte, error)
ToSubtaskMeta converts the post process spec to subtask meta.
type PostProcessStepMeta ¶
type PostProcessStepMeta struct {
	// accumulated checksum of all subtasks in import step.
	Checksum Checksum
	// MaxIDs is the max of all max-ids of subtasks in import step.
	MaxIDs map[autoid.AllocatorType]int64
}
PostProcessStepMeta is the meta of post process step.
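MaxIDs in the post-process meta aggregates the per-subtask MaxIDs of the import step by taking the maximum per allocator type. A minimal sketch of such an aggregation follows. The allocatorType type and the mergeMaxIDs helper are hypothetical names for illustration, and this is not the package's actual aggregation code.

```go
package main

import "fmt"

// allocatorType is a hypothetical stand-in for autoid.AllocatorType.
type allocatorType uint8

// mergeMaxIDs folds the per-subtask max IDs into one map, keeping the
// largest value seen for each allocator type.
func mergeMaxIDs(subtasks []map[allocatorType]int64) map[allocatorType]int64 {
	merged := make(map[allocatorType]int64)
	for _, ids := range subtasks {
		for tp, id := range ids {
			if cur, ok := merged[tp]; !ok || id > cur {
				merged[tp] = id
			}
		}
	}
	return merged
}

func main() {
	merged := mergeMaxIDs([]map[allocatorType]int64{
		{0: 100, 1: 100},
		{0: 250, 1: 250},
		{0: 180},
	})
	fmt.Println(merged[0], merged[1])
}
```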
type Result ¶
Result records the metrics information. This portion of the code may be implemented uniformly in the framework in the future.
type SharedVars ¶
type SharedVars struct {
	// SortedIndexMetas is a map from index id to its sorted kv meta.
	// contains filtered or unexported fields
}
SharedVars holds the variables shared by all minimal tasks in a subtask. It exists because a subtask cannot directly obtain the results of its minimal tasks. All fields should be safe for concurrent use.
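One common way to meet the concurrency requirement is to guard the shared state with a mutex so that minimal tasks running in parallel can report their results safely. The sketch below assumes that approach. The sharedVars type, its rowCount field, and the runMiniTasks helper are hypothetical and do not reflect SharedVars' unexported fields.

```go
package main

import (
	"fmt"
	"sync"
)

// sharedVars is a hypothetical reduction of SharedVars: one mutex guards
// state that concurrently running minimal tasks write into.
type sharedVars struct {
	mu       sync.Mutex
	rowCount map[int64]uint64 // hypothetical: index id -> rows reported
}

// add records n rows for indexID under the lock.
func (s *sharedVars) add(indexID int64, n uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rowCount[indexID] += n
}

// get reads the accumulated count for indexID under the lock.
func (s *sharedVars) get(indexID int64) uint64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.rowCount[indexID]
}

// runMiniTasks starts one goroutine per minimal task; each reports perTask
// rows for index 1. It returns the total seen by the enclosing subtask.
func runMiniTasks(workers int, perTask uint64) uint64 {
	sv := &sharedVars{rowCount: make(map[int64]uint64)}
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sv.add(1, perTask)
		}()
	}
	wg.Wait()
	return sv.get(1)
}

func main() {
	fmt.Println(runMiniTasks(8, 10))
}
```

The subtask reads the aggregate only after all minimal tasks have finished, which is what the WaitGroup models here.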
type TaskMeta ¶
type TaskMeta struct {
	// IMPORT INTO job id.
	JobID  int64
	Plan   importer.Plan
	Stmt   string
	Result Result
	// eligible instances to run this task, we run on all instances if it's empty.
	// we only need this when run IMPORT INTO without distributed option now, i.e.
	// running on the instance that initiate the IMPORT INTO.
	EligibleInstances []*infosync.ServerInfo
	// the file chunks to import, when import from server file, we need to pass those
	// files to the framework dispatcher which might run on another instance.
	// we use a map from engine ID to chunks since we need support split_file for CSV,
	// so need to split them into engines before passing to dispatcher.
	ChunkMap map[int32][]Chunk
}
TaskMeta is the task meta of IMPORT INTO. All fields should be serializable.
type WriteIngestSpec ¶
type WriteIngestSpec struct {
*WriteIngestStepMeta
}
WriteIngestSpec is the specification of a write-ingest pipeline.
func (*WriteIngestSpec) ToSubtaskMeta ¶
func (s *WriteIngestSpec) ToSubtaskMeta(planner.PlanCtx) ([]byte, error)
ToSubtaskMeta converts the write-ingest spec to subtask meta.
type WriteIngestStepMeta ¶
type WriteIngestStepMeta struct {
	KVGroup               string `json:"kv-group"`
	external.SortedKVMeta `json:"sorted-kv-meta"`
	DataFiles             []string `json:"data-files"`
	StatFiles             []string `json:"stat-files"`
	RangeSplitKeys        [][]byte `json:"range-split-keys"`
	RangeSplitSize        int64    `json:"range-split-size"`

	Result Result
}
WriteIngestStepMeta is the meta of the write-and-ingest step. It is only used when global sort is enabled.