importinto

package
v1.1.0-beta.0...-c0b86a3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2024 License: Apache-2.0 Imports: 69 Imported by: 0

Documentation

Index

Constants

View Source
// Steps of IMPORT INTO. Every value is spelled out explicitly rather than
// derived from iota, so reordering the declarations below never renumbers a
// step.
const (
	// Local-sort flow:
	//   StepInit -> StepImport -> StepPostProcess -> StepDone

	// StepImport sorts the source data and ingests it into TiKV.
	StepImport proto.Step = 1
	// StepPostProcess verifies the checksum and adds indexes. It is the last
	// step before StepDone in both the local-sort and global-sort flows.
	StepPostProcess proto.Step = 2

	// Global-sort flow:
	//   StepInit -> StepEncodeAndSort -> StepMergeSort -> StepWriteAndIngest ->
	//   StepPostProcess -> StepDone

	// StepEncodeAndSort encodes the source data and writes sorted KVs into
	// global storage.
	StepEncodeAndSort proto.Step = 3
	// StepMergeSort merges the sorted KVs in global storage so that
	// StepWriteAndIngest gets better read performance. Depending on how much
	// the KV files overlap, this step may have zero subtasks.
	StepMergeSort proto.Step = 4
	// StepWriteAndIngest writes the sorted KVs into TiKV and ingests them.
	StepWriteAndIngest proto.Step = 5
)

Steps of IMPORT INTO. Each step is represented by one or more subtasks. The initial step is StepInit (-1), and steps are processed in the following order:

- local sort: StepInit -> StepImport -> StepPostProcess -> StepDone
- global sort: StepInit -> StepEncodeAndSort -> StepMergeSort -> StepWriteAndIngest -> StepPostProcess -> StepDone

Variables

View Source
// NewTaskRegisterWithTTL is the constructor for TaskRegister, re-exported from
// utils.NewTaskRegisterWithTTL. It is exported for testing; being a
// package-level variable, it can also be reassigned.
var NewTaskRegisterWithTTL = utils.NewTaskRegisterWithTTL

NewTaskRegisterWithTTL is the constructor for TaskRegister. It is exported for testing.

View Source
// TestSyncChan is used in tests. It is an unbuffered channel, so every send
// blocks until a matching receive takes place.
var TestSyncChan = make(chan struct{})

TestSyncChan is used to test.

Functions

func GetTaskImportedRows

func GetTaskImportedRows(ctx context.Context, jobID int64) (uint64, error)

GetTaskImportedRows gets the number of imported rows of a job. Note: for a finished job, the number of imported rows can be obtained from the task meta.

func TaskKey

func TaskKey(jobID int64) string

TaskKey returns the task key for a job.

func TestChecksumTable

func TestChecksumTable(ctx context.Context, executor storage.SessionExecutor, taskMeta *TaskMeta, logger *zap.Logger) (*local.RemoteChecksum, error)

TestChecksumTable is used to test checksum table in unit test.

Types

type Checksum

// Checksum records the checksum information.
type Checksum struct {
	// Sum is the checksum value itself.
	// NOTE(review): the exact aggregation (e.g. XOR of per-KV hashes) is not
	// visible here — confirm against the producer.
	Sum  uint64
	// KVs is presumably the number of KV pairs covered by Sum — TODO confirm.
	KVs  uint64
	// Size is presumably the total byte size of those KV pairs — TODO confirm.
	Size uint64
}

Checksum records the checksum information.

type Chunk

// Chunk records the information of one chunk of a source file.
type Chunk struct {
	// Path is the source file path; FileSize is presumably its size in bytes.
	Path         string
	FileSize     int64
	// Offset and EndOffset presumably delimit this chunk's byte range within
	// the file — TODO confirm whether EndOffset is exclusive.
	Offset       int64
	EndOffset    int64
	// PrevRowIDMax and RowIDMax presumably bound the row IDs allocated to this
	// chunk — TODO confirm which bound is inclusive.
	PrevRowIDMax int64
	RowIDMax     int64
	// Type and Compression describe the source file format and its
	// compression.
	Type         mydump.SourceType
	Compression  mydump.Compression
	// NOTE(review): the meaning of Timestamp is not visible here — confirm.
	Timestamp    int64
}

Chunk records the chunk information.

type DistImporter

// DistImporter is a JobImporter for distributed IMPORT INTO.
type DistImporter struct {
	// JobImportParam is embedded, so its exported fields and methods are
	// promoted onto DistImporter.
	*importer.JobImportParam
	// contains filtered or unexported fields
}

DistImporter is a JobImporter for distributed IMPORT INTO.

func NewDistImporter

func NewDistImporter(param *importer.JobImportParam, plan *importer.Plan, stmt string, sourceFileSize int64) (*DistImporter, error)

NewDistImporter creates a new DistImporter.

func NewDistImporterCurrNode

func NewDistImporterCurrNode(param *importer.JobImportParam, plan *importer.Plan, stmt string, sourceFileSize int64) (*DistImporter, error)

NewDistImporterCurrNode creates a new DistImporter to import data on current node.

func NewDistImporterServerFile

func NewDistImporterServerFile(param *importer.JobImportParam, plan *importer.Plan, stmt string, ecp map[int32]*checkpoints.EngineCheckpoint, sourceFileSize int64) (*DistImporter, error)

NewDistImporterServerFile creates a new DistImporter to import the given files on the current node; the import itself also runs on the current node. TODO: merge all three constructors into one.

func (*DistImporter) Close

func (*DistImporter) Close() error

Close implements the io.Closer interface.

func (*DistImporter) Import

func (*DistImporter) Import()

Import implements JobImporter.Import.

func (*DistImporter) ImportTask

func (ti *DistImporter) ImportTask(task *proto.Task)

ImportTask imports the given task.

func (*DistImporter) JobID

func (ti *DistImporter) JobID() int64

JobID returns the job id.

func (*DistImporter) Param

func (ti *DistImporter) Param() *importer.JobImportParam

Param implements JobImporter.Param.

func (*DistImporter) Result

Result implements JobImporter.Result.

func (*DistImporter) SubmitTask

func (ti *DistImporter) SubmitTask(ctx context.Context) (int64, *proto.Task, error)

SubmitTask submits a task to the distributed framework.

type ImportCleanUpS3

// ImportCleanUpS3 implements dispatcher.CleanUpRoutine. It carries no state:
// it is an empty struct, so its zero value is ready to use.
type ImportCleanUpS3 struct{}

ImportCleanUpS3 implements dispatcher.CleanUpRoutine.

func (*ImportCleanUpS3) CleanUp

func (*ImportCleanUpS3) CleanUp(ctx context.Context, task *proto.Task) error

CleanUp implements the CleanUpRoutine.CleanUp interface.

type ImportDispatcherExt

// ImportDispatcherExt is an extension of ImportDispatcher, exported for testing.
type ImportDispatcherExt struct {
	// GlobalSort presumably selects the global-sort step flow
	// (StepEncodeAndSort -> StepMergeSort -> StepWriteAndIngest) instead of the
	// local-sort flow (StepImport). NOTE(review): inferred from the field name
	// and the step docs — confirm in GetNextStep.
	GlobalSort bool
	// contains filtered or unexported fields
}

ImportDispatcherExt is an extension of ImportDispatcher, exported for testing.

func (*ImportDispatcherExt) GetEligibleInstances

func (*ImportDispatcherExt) GetEligibleInstances(ctx context.Context, gTask *proto.Task) ([]*infosync.ServerInfo, bool, error)

GetEligibleInstances implements dispatcher.Extension interface.

func (*ImportDispatcherExt) GetNextStep

func (dsp *ImportDispatcherExt) GetNextStep(task *proto.Task) proto.Step

GetNextStep implements dispatcher.Extension interface.

func (*ImportDispatcherExt) IsRetryableErr

func (*ImportDispatcherExt) IsRetryableErr(error) bool

IsRetryableErr implements dispatcher.Extension interface.

func (*ImportDispatcherExt) OnDone

func (dsp *ImportDispatcherExt) OnDone(ctx context.Context, handle dispatcher.TaskHandle, task *proto.Task) error

OnDone implements dispatcher.Extension interface.

func (*ImportDispatcherExt) OnNextSubtasksBatch

func (dsp *ImportDispatcherExt) OnNextSubtasksBatch(
	ctx context.Context,
	taskHandle dispatcher.TaskHandle,
	gTask *proto.Task,
	serverInfos []*infosync.ServerInfo,
	nextStep proto.Step,
) (
	resSubtaskMeta [][]byte, err error)

OnNextSubtasksBatch generates a batch of subtask metas for the next stage's plan.

func (*ImportDispatcherExt) OnTick

func (dsp *ImportDispatcherExt) OnTick(ctx context.Context, task *proto.Task)

OnTick implements dispatcher.Extension interface.

type ImportSpec

// ImportSpec is the specification of an import pipeline.
type ImportSpec struct {
	// ID presumably identifies the engine this pipeline imports into, like
	// ImportStepMeta.ID — TODO confirm.
	ID     int32
	// Plan is the import plan, held by value (not as a pointer).
	Plan   importer.Plan
	// Chunks are the source-file chunks handled by this pipeline.
	Chunks []Chunk
}

ImportSpec is the specification of an import pipeline.

func (*ImportSpec) ToSubtaskMeta

func (s *ImportSpec) ToSubtaskMeta(planner.PlanCtx) ([]byte, error)

ToSubtaskMeta converts the import spec to subtask meta.

type ImportStepMeta

// ImportStepMeta is the meta of the import step. The dispatcher splits the
// task into subtasks (FileInfos -> Chunks). All fields should be serializable.
type ImportStepMeta struct {
	// ID is the engine ID, not the id in the tidb_background_subtask table.
	ID       int32
	// Chunks are the source-file chunks assigned to this subtask.
	Chunks   []Chunk
	// Checksum is presumably the checksum of the KVs written by this subtask —
	// TODO confirm.
	Checksum Checksum
	// Result records the metrics of this subtask.
	Result   Result
	// MaxIDs stores the max id that has been used during encoding for each
	// allocator type. The max id is the same among all allocator types for now,
	// since they use the same base; see NewPanickingAllocators for more info.
	MaxIDs map[autoid.AllocatorType]int64

	// SortedDataMeta is the sorted kv meta of the data KVs. NOTE(review):
	// presumably only populated when global sort is enabled — confirm.
	SortedDataMeta *external.SortedKVMeta
	// SortedIndexMetas is a map from index id to its sorted kv meta.
	SortedIndexMetas map[int64]*external.SortedKVMeta
}

ImportStepMeta is the meta of the import step. The dispatcher splits the task into subtasks (FileInfos -> Chunks). All fields should be serializable.

type LogicalPlan

// LogicalPlan represents a logical plan for IMPORT INTO. Its fields have the
// same names and types as the corresponding TaskMeta fields (TaskMeta
// additionally carries Result). FromTaskMeta and ToTaskMeta convert it from
// and to the serialized task meta bytes.
type LogicalPlan struct {
	JobID             int64
	Plan              importer.Plan
	Stmt              string
	EligibleInstances []*infosync.ServerInfo
	ChunkMap          map[int32][]Chunk
}

LogicalPlan represents a logical plan for import into.

func (*LogicalPlan) FromTaskMeta

func (p *LogicalPlan) FromTaskMeta(bs []byte) error

FromTaskMeta converts the task meta to logical plan.

func (*LogicalPlan) ToPhysicalPlan

func (p *LogicalPlan) ToPhysicalPlan(planCtx planner.PlanCtx) (*planner.PhysicalPlan, error)

ToPhysicalPlan converts the logical plan to physical plan.

func (*LogicalPlan) ToTaskMeta

func (p *LogicalPlan) ToTaskMeta() ([]byte, error)

ToTaskMeta converts the logical plan to task meta.

type MergeSortSpec

// MergeSortSpec is the specification of a merge-sort pipeline. It embeds
// *MergeSortStepMeta, whose fields are promoted onto the spec.
type MergeSortSpec struct {
	*MergeSortStepMeta
}

MergeSortSpec is the specification of a merge-sort pipeline.

func (*MergeSortSpec) ToSubtaskMeta

func (s *MergeSortSpec) ToSubtaskMeta(planner.PlanCtx) ([]byte, error)

ToSubtaskMeta converts the merge-sort spec to subtask meta.

type MergeSortStepMeta

// MergeSortStepMeta is the meta of the merge-sort step.
type MergeSortStepMeta struct {
	// KVGroup is the group name of the sorted kv, either dataKVGroup or index-id.
	KVGroup               string   `json:"kv-group"`
	// DataFiles are presumably the sorted KV files in global storage that this
	// subtask merges — TODO confirm.
	DataFiles             []string `json:"data-files"`
	// The embedded SortedKVMeta has a JSON tag, so encoding/json encodes it as a
	// nested "sorted-kv-meta" object instead of promoting its fields to the top
	// level.
	external.SortedKVMeta `json:"sorted-kv-meta"`
}

MergeSortStepMeta is the meta of merge sort step.

type MiniTaskExecutor

// MiniTaskExecutor is the interface for a minimal task executor. It is
// exported for testing.
type MiniTaskExecutor interface {
	// Run executes the minimal task. NOTE(review): judging by the parameter
	// names, data KVs go to dataWriter and index KVs to indexWriter — confirm.
	Run(ctx context.Context, dataWriter, indexWriter backend.EngineWriter) error
}

MiniTaskExecutor is the interface for a minimal task executor. It is exported for testing.

type PostProcessSpec

// PostProcessSpec is the specification of a post-process pipeline.
type PostProcessSpec struct {
	// Schema and Table name the target table; they are used for the checksum
	// request.
	Schema string
	Table  string
}

PostProcessSpec is the specification of a post process pipeline.

func (*PostProcessSpec) ToSubtaskMeta

func (*PostProcessSpec) ToSubtaskMeta(planCtx planner.PlanCtx) ([]byte, error)

ToSubtaskMeta converts the post process spec to subtask meta.

type PostProcessStepMeta

// PostProcessStepMeta is the meta of the post-process step.
type PostProcessStepMeta struct {
	// Checksum is the accumulated checksum of all subtasks in the import step.
	Checksum Checksum
	// MaxIDs holds, for each allocator type, the maximum of the max IDs
	// reported by all subtasks in the import step.
	MaxIDs map[autoid.AllocatorType]int64
}

PostProcessStepMeta is the meta of post process step.

type Result

// Result records the metrics information. This may be implemented uniformly
// in the framework in the future.
type Result struct {
	// LoadedRowCnt is presumably the number of rows loaded — TODO confirm.
	LoadedRowCnt uint64
	// ColSizeMap is presumably keyed by column ID, with accumulated column
	// sizes as values — TODO confirm.
	ColSizeMap   map[int64]int64
}

Result records the metrics information. This portion of the code may be implemented uniformly in the framework in the future.

type SharedVars

// SharedVars holds the variables shared by all minimal tasks in a subtask,
// because a subtask cannot directly obtain the results of its minimal tasks.
// All fields should be safe for concurrent use. SharedVars contains a
// sync.Mutex, so it must not be copied after first use; share it as
// *SharedVars.
type SharedVars struct {
	TableImporter *importer.TableImporter
	// DataEngine and IndexEngine are presumably the opened engines that receive
	// data KVs and index KVs respectively — TODO confirm.
	DataEngine    *backend.OpenedEngine
	IndexEngine   *backend.OpenedEngine
	Progress      *asyncloaddata.Progress

	// Checksum presumably accumulates the KV checksum across minimal tasks —
	// TODO confirm.
	Checksum *verification.KVChecksum

	// SortedDataMeta is the sorted kv meta of the data KVs. NOTE(review):
	// presumably only populated when global sort is enabled — confirm.
	SortedDataMeta *external.SortedKVMeta
	// SortedIndexMetas is a map from index id to its sorted kv meta.
	SortedIndexMetas map[int64]*external.SortedKVMeta
	// ShareMu is a mutex shared by the minimal tasks. NOTE(review): which
	// fields it guards is not visible here. A plain map such as
	// SortedIndexMetas is not safe for concurrent writes, so it presumably
	// guards at least the sorted metas — confirm.
	ShareMu          sync.Mutex
	// contains filtered or unexported fields
}

SharedVars holds the variables shared by all minimal tasks in a subtask. It exists because a subtask cannot directly obtain the results of its minimal tasks. All fields should be safe for concurrent use.

type TaskMeta

// TaskMeta is the task meta of IMPORT INTO. All fields should be serializable.
type TaskMeta struct {
	// IMPORT INTO job id.
	JobID  int64
	// Plan is the import plan of the job.
	Plan   importer.Plan
	// Stmt is presumably the text of the IMPORT INTO statement — TODO confirm.
	Stmt   string
	// Result records the metrics information of the task.
	Result Result
	// Eligible instances to run this task; we run on all instances if it's
	// empty. We only need this when running IMPORT INTO without the distributed
	// option, i.e. running on the instance that initiated the IMPORT INTO.
	EligibleInstances []*infosync.ServerInfo
	// The file chunks to import. When importing from a server file, we need to
	// pass those files to the framework dispatcher, which might run on another
	// instance. We use a map from engine ID to chunks because we need to
	// support split_file for CSV, so the chunks must be split into engines
	// before being passed to the dispatcher.
	ChunkMap map[int32][]Chunk
}

TaskMeta is the task meta of IMPORT INTO. All fields should be serializable.

type WriteIngestSpec

// WriteIngestSpec is the specification of a write-ingest pipeline. It embeds
// *WriteIngestStepMeta, whose fields are promoted onto the spec.
type WriteIngestSpec struct {
	*WriteIngestStepMeta
}

WriteIngestSpec is the specification of a write-ingest pipeline.

func (*WriteIngestSpec) ToSubtaskMeta

func (s *WriteIngestSpec) ToSubtaskMeta(planner.PlanCtx) ([]byte, error)

ToSubtaskMeta converts the write-ingest spec to subtask meta.

type WriteIngestStepMeta

// WriteIngestStepMeta is the meta of the write-and-ingest step. It is only
// used when global sort is enabled.
type WriteIngestStepMeta struct {
	// KVGroup is presumably the same kind of group name as
	// MergeSortStepMeta.KVGroup (the data group or an index id) — TODO confirm.
	KVGroup               string `json:"kv-group"`
	// The embedded SortedKVMeta has a JSON tag, so encoding/json encodes it as a
	// nested "sorted-kv-meta" object instead of promoting its fields to the top
	// level.
	external.SortedKVMeta `json:"sorted-kv-meta"`
	// DataFiles and StatFiles are presumably the sorted KV data files and their
	// statistics files in global storage — TODO confirm.
	DataFiles             []string `json:"data-files"`
	StatFiles             []string `json:"stat-files"`
	// RangeSplitKeys and RangeSplitSize presumably control how the key range is
	// split before ingesting — TODO confirm.
	RangeSplitKeys        [][]byte `json:"range-split-keys"`
	RangeSplitSize        int64    `json:"range-split-size"`

	// Result has no JSON tag, so encoding/json encodes it under the key
	// "Result".
	Result Result
}

WriteIngestStepMeta is the meta of the write-and-ingest step. It is only used when global sort is enabled.

Directories

Path Synopsis
Code generated by MockGen.
Code generated by MockGen.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL