storage

package
v1.1.0-beta.0...-35f329d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2025 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// TaskColumns is the column list for a task: basicTaskColumns (declared
	// elsewhere in this package) followed by the additional columns appended here.
	// TODO: dispatcher_id will be changed to scheduler_id later.
	TaskColumns = basicTaskColumns + `, t.start_time, t.state_update_time, t.meta, t.dispatcher_id, t.error, t.modify_params`
	// InsertTaskColumns is the column list used when inserting a task.
	InsertTaskColumns = `task_key, type, state, priority, concurrency, step, meta, create_time, target_scope`

	// SubtaskColumns is the column list for a subtask: basicSubtaskColumns
	// (declared elsewhere in this package) followed by the additional columns
	// appended here.
	SubtaskColumns = basicSubtaskColumns + `, state_update_time, meta, summary`
	// InsertSubtaskColumns is the column list used when inserting a subtask.
	InsertSubtaskColumns = `step, task_key, exec_id, meta, state, type, concurrency, ordinal, create_time, checkpoint, summary`
)

Variables

View Source
var (

	// ErrUnstableSubtasks is the error when we detect that the subtasks are
	// unstable, i.e. the count, order and content of the subtasks change across
	// different calls.
	ErrUnstableSubtasks = errors.New("unstable subtasks")

	// ErrTaskNotFound is the error when we can't find the task,
	// e.g. TransferTasks2History moves the task from tidb_global_task to tidb_global_task_history.
	ErrTaskNotFound = errors.New("task not found")

	// ErrTaskAlreadyExists is the error when we submit a task with the same task key,
	// e.g. SubmitTask in handle may submit a task twice.
	ErrTaskAlreadyExists = errors.New("task already exists")

	// ErrTaskStateNotAllow is the error when the task state does not allow the operation.
	ErrTaskStateNotAllow = errors.New("task state not allow to do the operation")

	// ErrTaskChanged is the error when the task has been changed by another operation.
	ErrTaskChanged = errors.New("task changed by other operation")

	// ErrSubtaskNotFound is the error when we can't find a subtask by subtask_id and execId,
	// e.g. the scheduler changes the subtask's execId when the subtask needs to be balanced to other nodes.
	ErrSubtaskNotFound = errors.New("subtask not found")
)
View Source
// TestChannel is an unbuffered chan struct{} used for tests.
var TestChannel = make(chan struct{})

TestChannel is used for test.

View Source
var (
	// TestLastTaskID is used by tests to set the last task ID. It is an
	// atomic.Int64, so it is accessed through its atomic methods.
	TestLastTaskID atomic.Int64
)

Functions

func Row2SubTask

func Row2SubTask(r chunk.Row) *proto.Subtask

Row2SubTask converts a row to a subtask.

func Row2Task

func Row2Task(r chunk.Row) *proto.Task

Row2Task converts a row to a task.

func SetTaskManager

func SetTaskManager(is *TaskManager)

SetTaskManager sets the task manager.

Types

type SessionExecutor

// SessionExecutor defines the interface for executing SQLs in a session.
type SessionExecutor interface {
	// WithNewSession executes fn with a new session and returns an error.
	WithNewSession(fn func(se sessionctx.Context) error) error
	// WithNewTxn executes fn in a new transaction and returns an error.
	WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error
}

SessionExecutor defines the interface for executing SQLs in a session.

type TaskExecInfo

// TaskExecInfo is the execution information of a task, on some exec node.
type TaskExecInfo struct {
	// TaskBase is embedded, so its fields are promoted onto TaskExecInfo.
	*proto.TaskBase
	// SubtaskConcurrency is the concurrency of subtask in current task step.
	// TODO: will be used when we support subtasks having a smaller concurrency
	// than the task, such as post-process of import-into. Also remember that the
	// 'modifying' state also updates subtask concurrency.
	// TODO: we might need to create one task executor for each step in this case,
	// to allocate minimal resource.
	SubtaskConcurrency int
}

TaskExecInfo is the execution information of a task, on some exec node.

type TaskHandle

// TaskHandle provides the interface for operations needed by Scheduler.
type TaskHandle interface {
	// GetPreviousSubtaskMetas gets previous subtask metas for the given task ID
	// and step; each meta is returned as a []byte.
	GetPreviousSubtaskMetas(taskID int64, step proto.Step) ([][]byte, error)
	// SessionExecutor is embedded, so a TaskHandle also provides WithNewSession
	// and WithNewTxn.
	SessionExecutor
}

TaskHandle provides the interface for operations needed by Scheduler. Then we can use scheduler's function in Scheduler interface.

type TaskManager

type TaskManager struct {
	// contains filtered or unexported fields
}

TaskManager is the manager of task and subtask.

func GetTaskManager

func GetTaskManager() (*TaskManager, error)

GetTaskManager gets the task manager.

func NewTaskManager

func NewTaskManager(sePool util.SessionPool) *TaskManager

NewTaskManager creates a new task manager.

func (*TaskManager) AdjustTaskOverflowConcurrency

func (mgr *TaskManager) AdjustTaskOverflowConcurrency(ctx context.Context, se sessionctx.Context) error

AdjustTaskOverflowConcurrency changes the task concurrency to the max value supported by the current cluster. This is a workaround for an upgrade bug: in v7.5.x, the task concurrency is hard-coded to 16, which makes the task get stuck if the new-version TiDB has fewer than 16 CPUs. We don't adjust the concurrency in the subtask table because this field does not exist in v7.5.0. For details, see https://github.com/pingcap/tidb/issues/50894. For the following versions, there is a check when submitting a new task, so this function should be a no-op.

func (*TaskManager) CancelSubtask

func (mgr *TaskManager) CancelSubtask(ctx context.Context, execID string, taskID int64) error

CancelSubtask updates the task's subtasks' state to canceled.

func (*TaskManager) CancelTask

func (mgr *TaskManager) CancelTask(ctx context.Context, taskID int64) error

CancelTask cancels task.

func (*TaskManager) CancelTaskByKeySession

func (*TaskManager) CancelTaskByKeySession(ctx context.Context, se sessionctx.Context, taskKey string) error

CancelTaskByKeySession cancels task by key using input session.

func (*TaskManager) CreateTask

func (mgr *TaskManager) CreateTask(ctx context.Context, key string, tp proto.TaskType, concurrency int, targetScope string, meta []byte) (taskID int64, err error)

CreateTask adds a new task to task table.

func (*TaskManager) CreateTaskWithSession

func (mgr *TaskManager) CreateTaskWithSession(
	ctx context.Context,
	se sessionctx.Context,
	key string,
	tp proto.TaskType,
	concurrency int,
	targetScope string,
	meta []byte,
) (taskID int64, err error)

CreateTaskWithSession adds a new task to task table with session.

func (*TaskManager) DeleteDeadNodes

func (mgr *TaskManager) DeleteDeadNodes(ctx context.Context, nodes []string) error

DeleteDeadNodes deletes the dead nodes from mysql.dist_framework_meta.

func (*TaskManager) ExecuteSQLWithNewSession

func (mgr *TaskManager) ExecuteSQLWithNewSession(ctx context.Context, sql string, args ...any) (rs []chunk.Row, err error)

ExecuteSQLWithNewSession executes one SQL with new session.

func (*TaskManager) FailSubtask

func (mgr *TaskManager) FailSubtask(ctx context.Context, execID string, taskID int64, err error) error

FailSubtask updates the task's subtask state to failed and sets the err.

func (*TaskManager) FailTask

func (mgr *TaskManager) FailTask(ctx context.Context, taskID int64, currentState proto.TaskState, taskErr error) error

FailTask implements the scheduler.TaskManager interface.

func (*TaskManager) FinishSubtask

func (mgr *TaskManager) FinishSubtask(ctx context.Context, execID string, id int64, meta []byte) error

FinishSubtask updates the subtask meta and marks the state as succeed.

func (*TaskManager) GCSubtasks

func (mgr *TaskManager) GCSubtasks(ctx context.Context) error

GCSubtasks deletes the history subtasks which are older than the given days.

func (*TaskManager) GetActiveSubtasks

func (mgr *TaskManager) GetActiveSubtasks(ctx context.Context, taskID int64) ([]*proto.SubtaskBase, error)

GetActiveSubtasks implements TaskManager.GetActiveSubtasks.

func (*TaskManager) GetAllNodes

func (mgr *TaskManager) GetAllNodes(ctx context.Context) ([]proto.ManagedNode, error)

GetAllNodes gets nodes in dist_framework_meta.

func (*TaskManager) GetAllSubtasks

func (mgr *TaskManager) GetAllSubtasks(ctx context.Context) ([]*proto.SubtaskBase, error)

GetAllSubtasks gets all subtasks with basic columns.

func (*TaskManager) GetAllSubtasksByStepAndState

func (mgr *TaskManager) GetAllSubtasksByStepAndState(ctx context.Context, taskID int64, step proto.Step, state proto.SubtaskState) ([]*proto.Subtask, error)

GetAllSubtasksByStepAndState gets the subtask by step and state.

func (*TaskManager) GetCPUCountOfNode

func (mgr *TaskManager) GetCPUCountOfNode(ctx context.Context) (int, error)

GetCPUCountOfNode gets the cpu count of node.

func (*TaskManager) GetFirstSubtaskInStates

func (mgr *TaskManager) GetFirstSubtaskInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...proto.SubtaskState) (*proto.Subtask, error)

GetFirstSubtaskInStates gets the first subtask by given states.

func (*TaskManager) GetSubtaskCntGroupByStates

func (mgr *TaskManager) GetSubtaskCntGroupByStates(ctx context.Context, taskID int64, step proto.Step) (map[proto.SubtaskState]int64, error)

GetSubtaskCntGroupByStates gets the subtask count by states.

func (*TaskManager) GetSubtaskErrors

func (mgr *TaskManager) GetSubtaskErrors(ctx context.Context, taskID int64) ([]error, error)

GetSubtaskErrors gets subtasks' errors.

func (*TaskManager) GetSubtaskRowCount

func (mgr *TaskManager) GetSubtaskRowCount(ctx context.Context, taskID int64, step proto.Step) (int64, error)

GetSubtaskRowCount gets the subtask row count.

func (*TaskManager) GetSubtasksByExecIDAndStepAndStates

func (mgr *TaskManager) GetSubtasksByExecIDAndStepAndStates(ctx context.Context, execID string, taskID int64, step proto.Step, states ...proto.SubtaskState) ([]*proto.Subtask, error)

GetSubtasksByExecIDAndStepAndStates gets all subtasks by given states on one node.

func (*TaskManager) GetSubtasksWithHistory

func (mgr *TaskManager) GetSubtasksWithHistory(ctx context.Context, taskID int64, step proto.Step) ([]*proto.Subtask, error)

GetSubtasksWithHistory gets the subtasks from tidb_global_task and tidb_global_task_history.

func (*TaskManager) GetTaskBaseByID

func (mgr *TaskManager) GetTaskBaseByID(ctx context.Context, taskID int64) (task *proto.TaskBase, err error)

GetTaskBaseByID implements the TaskManager.GetTaskBaseByID interface.

func (*TaskManager) GetTaskBaseByIDWithHistory

func (mgr *TaskManager) GetTaskBaseByIDWithHistory(ctx context.Context, taskID int64) (task *proto.TaskBase, err error)

GetTaskBaseByIDWithHistory gets the task by the task ID from both tidb_global_task and tidb_global_task_history.

func (*TaskManager) GetTaskBaseByKeyWithHistory

func (mgr *TaskManager) GetTaskBaseByKeyWithHistory(ctx context.Context, key string) (task *proto.TaskBase, err error)

GetTaskBaseByKeyWithHistory gets the task base from history table by the task key.

func (*TaskManager) GetTaskByID

func (mgr *TaskManager) GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)

GetTaskByID gets the task by the task ID.

func (*TaskManager) GetTaskByIDWithHistory

func (mgr *TaskManager) GetTaskByIDWithHistory(ctx context.Context, taskID int64) (task *proto.Task, err error)

GetTaskByIDWithHistory gets the task by the task ID from both tidb_global_task and tidb_global_task_history.

func (*TaskManager) GetTaskByKey

func (mgr *TaskManager) GetTaskByKey(ctx context.Context, key string) (task *proto.Task, err error)

GetTaskByKey gets the task by the task key.

func (*TaskManager) GetTaskByKeyWithHistory

func (mgr *TaskManager) GetTaskByKeyWithHistory(ctx context.Context, key string) (task *proto.Task, err error)

GetTaskByKeyWithHistory gets the task from history table by the task key.

func (*TaskManager) GetTaskExecInfoByExecID

func (mgr *TaskManager) GetTaskExecInfoByExecID(ctx context.Context, execID string) ([]*TaskExecInfo, error)

GetTaskExecInfoByExecID implements the scheduler.TaskManager interface.

func (*TaskManager) GetTasksInStates

func (mgr *TaskManager) GetTasksInStates(ctx context.Context, states ...any) (task []*proto.Task, err error)

GetTasksInStates gets the tasks in the states (order by priority asc, create_time asc, id asc).

func (*TaskManager) GetTopUnfinishedTasks

func (mgr *TaskManager) GetTopUnfinishedTasks(ctx context.Context) ([]*proto.TaskBase, error)

GetTopUnfinishedTasks implements the scheduler.TaskManager interface.

func (*TaskManager) GetUsedSlotsOnNodes

func (mgr *TaskManager) GetUsedSlotsOnNodes(ctx context.Context) (map[string]int, error)

GetUsedSlotsOnNodes implements the scheduler.TaskManager interface.

func (*TaskManager) InitMeta

func (mgr *TaskManager) InitMeta(ctx context.Context, tidbID string, role string) error

InitMeta inserts the manager information into dist_framework_meta.

func (*TaskManager) InitMetaSession

func (*TaskManager) InitMetaSession(ctx context.Context, se sessionctx.Context, execID string, role string) error

InitMetaSession inserts the manager information into dist_framework_meta. If the record exists, it updates the cpu_count and role.

func (*TaskManager) ModifiedTask

func (mgr *TaskManager) ModifiedTask(ctx context.Context, task *proto.Task) error

ModifiedTask implements the scheduler.TaskManager interface.

func (*TaskManager) ModifyTaskByID

func (mgr *TaskManager) ModifyTaskByID(ctx context.Context, taskID int64, param *proto.ModifyParam) error

ModifyTaskByID modifies the task by the task ID.

func (*TaskManager) PauseSubtasks

func (mgr *TaskManager) PauseSubtasks(ctx context.Context, execID string, taskID int64) error

PauseSubtasks updates all running/pending subtasks to paused state.

func (*TaskManager) PauseTask

func (mgr *TaskManager) PauseTask(ctx context.Context, taskKey string) (bool, error)

PauseTask pauses the task.

func (*TaskManager) PausedTask

func (mgr *TaskManager) PausedTask(ctx context.Context, taskID int64) error

PausedTask updates the task state from pausing to paused.

func (*TaskManager) RecoverMeta

func (mgr *TaskManager) RecoverMeta(ctx context.Context, execID string, role string) error

RecoverMeta inserts the manager information into dist_framework_meta. If the record exists, it updates the cpu_count. It doesn't update the role, because the role is only updated by `set global tidb_service_scope`; otherwise there might be a data race.

func (*TaskManager) ResumeSubtasks

func (mgr *TaskManager) ResumeSubtasks(ctx context.Context, taskID int64) error

ResumeSubtasks updates all paused subtasks to pending state.

func (*TaskManager) ResumeTask

func (mgr *TaskManager) ResumeTask(ctx context.Context, taskKey string) (bool, error)

ResumeTask resumes the task.

func (*TaskManager) ResumedTask

func (mgr *TaskManager) ResumedTask(ctx context.Context, taskID int64) error

ResumedTask implements the scheduler.TaskManager interface.

func (*TaskManager) RevertTask

func (mgr *TaskManager) RevertTask(ctx context.Context, taskID int64, taskState proto.TaskState, taskErr error) error

RevertTask implements the scheduler.TaskManager interface.

func (*TaskManager) RevertedTask

func (mgr *TaskManager) RevertedTask(ctx context.Context, taskID int64) error

RevertedTask implements the scheduler.TaskManager interface.

func (*TaskManager) RunningSubtasksBack2Pending

func (mgr *TaskManager) RunningSubtasksBack2Pending(ctx context.Context, subtasks []*proto.SubtaskBase) error

RunningSubtasksBack2Pending implements the taskexecutor.TaskTable interface.

func (*TaskManager) StartSubtask

func (mgr *TaskManager) StartSubtask(ctx context.Context, subtaskID int64, execID string) error

StartSubtask updates the subtask state to running.

func (*TaskManager) SucceedTask

func (mgr *TaskManager) SucceedTask(ctx context.Context, taskID int64) error

SucceedTask updates the task state from running to succeed.

func (*TaskManager) SwitchTaskStep

func (mgr *TaskManager) SwitchTaskStep(
	ctx context.Context,
	task *proto.Task,
	nextState proto.TaskState,
	nextStep proto.Step,
	subtasks []*proto.Subtask,
) error

SwitchTaskStep implements the scheduler.TaskManager interface.

func (*TaskManager) SwitchTaskStepInBatch

func (mgr *TaskManager) SwitchTaskStepInBatch(
	ctx context.Context,
	task *proto.Task,
	nextState proto.TaskState,
	nextStep proto.Step,
	subtasks []*proto.Subtask,
) error

SwitchTaskStepInBatch implements the scheduler.TaskManager interface.

func (*TaskManager) TransferSubtasks2HistoryWithSession

func (*TaskManager) TransferSubtasks2HistoryWithSession(ctx context.Context, se sessionctx.Context, taskID int64) error

TransferSubtasks2HistoryWithSession transfers the selected subtasks into the tidb_background_subtask_history table by taskID.

func (*TaskManager) TransferTasks2History

func (mgr *TaskManager) TransferTasks2History(ctx context.Context, tasks []*proto.Task) error

TransferTasks2History transfers the selected tasks into the tidb_global_task_history table by taskIDs.

func (*TaskManager) UpdateSubtaskRowCount

func (mgr *TaskManager) UpdateSubtaskRowCount(ctx context.Context, subtaskID int64, rowCount int64) error

UpdateSubtaskRowCount updates the subtask row count.

func (*TaskManager) UpdateSubtaskStateAndError

func (mgr *TaskManager) UpdateSubtaskStateAndError(
	ctx context.Context,
	execID string,
	id int64, state proto.SubtaskState, subTaskErr error) error

UpdateSubtaskStateAndError updates the subtask state.

func (*TaskManager) UpdateSubtasksExecIDs

func (mgr *TaskManager) UpdateSubtasksExecIDs(ctx context.Context, subtasks []*proto.SubtaskBase) error

UpdateSubtasksExecIDs updates subtasks' execID.

func (*TaskManager) WithNewSession

func (mgr *TaskManager) WithNewSession(fn func(se sessionctx.Context) error) error

WithNewSession executes the function with a new session.

func (*TaskManager) WithNewTxn

func (mgr *TaskManager) WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error

WithNewTxn executes the fn in a new transaction.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL