Documentation ¶
Index ¶
- Constants
- Variables
- func Row2SubTask(r chunk.Row) *proto.Subtask
- func Row2Task(r chunk.Row) *proto.Task
- func SetTaskManager(is *TaskManager)
- type SessionExecutor
- type TaskExecInfo
- type TaskHandle
- type TaskManager
- func (mgr *TaskManager) AdjustTaskOverflowConcurrency(ctx context.Context, se sessionctx.Context) error
- func (mgr *TaskManager) CancelSubtask(ctx context.Context, execID string, taskID int64) error
- func (mgr *TaskManager) CancelTask(ctx context.Context, taskID int64) error
- func (*TaskManager) CancelTaskByKeySession(ctx context.Context, se sessionctx.Context, taskKey string) error
- func (mgr *TaskManager) CreateTask(ctx context.Context, key string, tp proto.TaskType, concurrency int, ...) (taskID int64, err error)
- func (mgr *TaskManager) CreateTaskWithSession(ctx context.Context, se sessionctx.Context, key string, tp proto.TaskType, ...) (taskID int64, err error)
- func (mgr *TaskManager) DeleteDeadNodes(ctx context.Context, nodes []string) error
- func (mgr *TaskManager) ExecuteSQLWithNewSession(ctx context.Context, sql string, args ...any) (rs []chunk.Row, err error)
- func (mgr *TaskManager) FailSubtask(ctx context.Context, execID string, taskID int64, err error) error
- func (mgr *TaskManager) FailTask(ctx context.Context, taskID int64, currentState proto.TaskState, taskErr error) error
- func (mgr *TaskManager) FinishSubtask(ctx context.Context, execID string, id int64, meta []byte) error
- func (mgr *TaskManager) GCSubtasks(ctx context.Context) error
- func (mgr *TaskManager) GetActiveSubtasks(ctx context.Context, taskID int64) ([]*proto.SubtaskBase, error)
- func (mgr *TaskManager) GetAllNodes(ctx context.Context) ([]proto.ManagedNode, error)
- func (mgr *TaskManager) GetAllSubtasks(ctx context.Context) ([]*proto.SubtaskBase, error)
- func (mgr *TaskManager) GetAllSubtasksByStepAndState(ctx context.Context, taskID int64, step proto.Step, state proto.SubtaskState) ([]*proto.Subtask, error)
- func (mgr *TaskManager) GetCPUCountOfNode(ctx context.Context) (int, error)
- func (mgr *TaskManager) GetFirstSubtaskInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, ...) (*proto.Subtask, error)
- func (mgr *TaskManager) GetSubtaskCntGroupByStates(ctx context.Context, taskID int64, step proto.Step) (map[proto.SubtaskState]int64, error)
- func (mgr *TaskManager) GetSubtaskErrors(ctx context.Context, taskID int64) ([]error, error)
- func (mgr *TaskManager) GetSubtaskRowCount(ctx context.Context, taskID int64, step proto.Step) (int64, error)
- func (mgr *TaskManager) GetSubtasksByExecIDAndStepAndStates(ctx context.Context, execID string, taskID int64, step proto.Step, ...) ([]*proto.Subtask, error)
- func (mgr *TaskManager) GetSubtasksWithHistory(ctx context.Context, taskID int64, step proto.Step) ([]*proto.Subtask, error)
- func (mgr *TaskManager) GetTaskBaseByID(ctx context.Context, taskID int64) (task *proto.TaskBase, err error)
- func (mgr *TaskManager) GetTaskBaseByIDWithHistory(ctx context.Context, taskID int64) (task *proto.TaskBase, err error)
- func (mgr *TaskManager) GetTaskBaseByKeyWithHistory(ctx context.Context, key string) (task *proto.TaskBase, err error)
- func (mgr *TaskManager) GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)
- func (mgr *TaskManager) GetTaskByIDWithHistory(ctx context.Context, taskID int64) (task *proto.Task, err error)
- func (mgr *TaskManager) GetTaskByKey(ctx context.Context, key string) (task *proto.Task, err error)
- func (mgr *TaskManager) GetTaskByKeyWithHistory(ctx context.Context, key string) (task *proto.Task, err error)
- func (mgr *TaskManager) GetTaskExecInfoByExecID(ctx context.Context, execID string) ([]*TaskExecInfo, error)
- func (mgr *TaskManager) GetTasksInStates(ctx context.Context, states ...any) (task []*proto.Task, err error)
- func (mgr *TaskManager) GetTopUnfinishedTasks(ctx context.Context) ([]*proto.TaskBase, error)
- func (mgr *TaskManager) GetUsedSlotsOnNodes(ctx context.Context) (map[string]int, error)
- func (mgr *TaskManager) HasSubtasksInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, ...) (bool, error)
- func (mgr *TaskManager) InitMeta(ctx context.Context, tidbID string, role string) error
- func (*TaskManager) InitMetaSession(ctx context.Context, se sessionctx.Context, execID string, role string) error
- func (mgr *TaskManager) ModifiedTask(ctx context.Context, task *proto.Task) error
- func (mgr *TaskManager) ModifyTaskByID(ctx context.Context, taskID int64, param *proto.ModifyParam) error
- func (mgr *TaskManager) PauseSubtasks(ctx context.Context, execID string, taskID int64) error
- func (mgr *TaskManager) PauseTask(ctx context.Context, taskKey string) (bool, error)
- func (mgr *TaskManager) PausedTask(ctx context.Context, taskID int64) error
- func (mgr *TaskManager) RecoverMeta(ctx context.Context, execID string, role string) error
- func (mgr *TaskManager) ResumeSubtasks(ctx context.Context, taskID int64) error
- func (mgr *TaskManager) ResumeTask(ctx context.Context, taskKey string) (bool, error)
- func (mgr *TaskManager) ResumedTask(ctx context.Context, taskID int64) error
- func (mgr *TaskManager) RevertTask(ctx context.Context, taskID int64, taskState proto.TaskState, taskErr error) error
- func (mgr *TaskManager) RevertedTask(ctx context.Context, taskID int64) error
- func (mgr *TaskManager) RunningSubtasksBack2Pending(ctx context.Context, subtasks []*proto.SubtaskBase) error
- func (mgr *TaskManager) StartSubtask(ctx context.Context, subtaskID int64, execID string) error
- func (mgr *TaskManager) SucceedTask(ctx context.Context, taskID int64) error
- func (mgr *TaskManager) SwitchTaskStep(ctx context.Context, task *proto.Task, nextState proto.TaskState, ...) error
- func (mgr *TaskManager) SwitchTaskStepInBatch(ctx context.Context, task *proto.Task, nextState proto.TaskState, ...) error
- func (*TaskManager) TransferSubtasks2HistoryWithSession(ctx context.Context, se sessionctx.Context, taskID int64) error
- func (mgr *TaskManager) TransferTasks2History(ctx context.Context, tasks []*proto.Task) error
- func (mgr *TaskManager) UpdateSubtaskRowCount(ctx context.Context, subtaskID int64, rowCount int64) error
- func (mgr *TaskManager) UpdateSubtaskStateAndError(ctx context.Context, execID string, id int64, state proto.SubtaskState, ...) error
- func (mgr *TaskManager) UpdateSubtasksExecIDs(ctx context.Context, subtasks []*proto.SubtaskBase) error
- func (mgr *TaskManager) WithNewSession(fn func(se sessionctx.Context) error) error
- func (mgr *TaskManager) WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error
Constants ¶
const ( // TaskColumns is the columns for task. // TODO: dispatcher_id will update to scheduler_id later TaskColumns = basicTaskColumns + `, t.start_time, t.state_update_time, t.meta, t.dispatcher_id, t.error, t.modify_params` // InsertTaskColumns is the columns used in insert task. InsertTaskColumns = `task_key, type, state, priority, concurrency, step, meta, create_time, target_scope` // SubtaskColumns is the columns for subtask. SubtaskColumns = basicSubtaskColumns + `, state_update_time, meta, summary` // InsertSubtaskColumns is the columns used in insert subtask. InsertSubtaskColumns = `step, task_key, exec_id, meta, state, type, concurrency, ordinal, create_time, checkpoint, summary` )
Variables ¶
var ( // ErrUnstableSubtasks is the error when we detected that the subtasks are // unstable, i.e. count, order and content of the subtasks are changed on // different call. ErrUnstableSubtasks = errors.New("unstable subtasks") // ErrTaskNotFound is the error when we can't find the task. // i.e. TransferTasks2History move task from tidb_global_task to tidb_global_task_history. ErrTaskNotFound = errors.New("task not found") // ErrTaskAlreadyExists is the error when we submit a task with the same task key. // i.e. SubmitTask in handle may submit a task twice. ErrTaskAlreadyExists = errors.New("task already exists") // ErrTaskStateNotAllow is the error when the task state is not allowed to do the operation. ErrTaskStateNotAllow = errors.New("task state not allow to do the operation") // ErrTaskChanged is the error when task changed by other operation. ErrTaskChanged = errors.New("task changed by other operation") // ErrSubtaskNotFound is the error when we can't find the subtask by subtask_id and execId, // i.e. scheduler change the subtask's execId when subtask need to balance to other nodes. ErrSubtaskNotFound = errors.New("subtask not found") )
var TestChannel = make(chan struct{})
TestChannel is used for test.
var ( // TestLastTaskID is used for test to set the last task ID. TestLastTaskID atomic.Int64 )
Functions ¶
func Row2SubTask ¶
Row2SubTask converts a row to a subtask.
Types ¶
type SessionExecutor ¶
type SessionExecutor interface { // WithNewSession executes the function with a new session. WithNewSession(fn func(se sessionctx.Context) error) error // WithNewTxn executes the fn in a new transaction. WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error }
SessionExecutor defines the interface for executing SQLs in a session.
type TaskExecInfo ¶
type TaskExecInfo struct { *proto.TaskBase // SubtaskConcurrency is the concurrency of subtask in current task step. // TODO: will be used when support subtask have smaller concurrency than task, // TODO: such as post-process of import-into. Also remember the 'modifying' state // also update subtask concurrency. // TODO: we might need create one task executor for each step in this case, to alloc // TODO: minimal resource SubtaskConcurrency int }
TaskExecInfo is the execution information of a task, on some exec node.
type TaskHandle ¶
type TaskHandle interface { // GetPreviousSubtaskMetas gets previous subtask metas. GetPreviousSubtaskMetas(taskID int64, step proto.Step) ([][]byte, error) SessionExecutor }
TaskHandle provides the interface for operations needed by Scheduler. Then we can use scheduler's function in Scheduler interface.
type TaskManager ¶
type TaskManager struct {
// contains filtered or unexported fields
}
TaskManager is the manager of task and subtask.
func GetTaskManager ¶
func GetTaskManager() (*TaskManager, error)
GetTaskManager gets the task manager.
func NewTaskManager ¶
func NewTaskManager(sePool util.SessionPool) *TaskManager
NewTaskManager creates a new task manager.
func (*TaskManager) AdjustTaskOverflowConcurrency ¶
func (mgr *TaskManager) AdjustTaskOverflowConcurrency(ctx context.Context, se sessionctx.Context) error
AdjustTaskOverflowConcurrency changes the task concurrency to the max value supported by the current cluster. This is a workaround for an upgrade bug: in v7.5.x, the task concurrency is hard-coded to 16, which causes tasks to get stuck if the new-version TiDB has fewer than 16 CPUs. We don't adjust the concurrency in the subtask table because this field does not exist in v7.5.0. For details, see https://github.com/pingcap/tidb/issues/50894. Later versions check the concurrency when a new task is submitted, so for them this function should be a no-op.
func (*TaskManager) CancelSubtask ¶
CancelSubtask updates the task's subtasks' state to canceled.
func (*TaskManager) CancelTask ¶
func (mgr *TaskManager) CancelTask(ctx context.Context, taskID int64) error
CancelTask cancels task.
func (*TaskManager) CancelTaskByKeySession ¶
func (*TaskManager) CancelTaskByKeySession(ctx context.Context, se sessionctx.Context, taskKey string) error
CancelTaskByKeySession cancels task by key using input session.
func (*TaskManager) CreateTask ¶
func (mgr *TaskManager) CreateTask(ctx context.Context, key string, tp proto.TaskType, concurrency int, targetScope string, meta []byte) (taskID int64, err error)
CreateTask adds a new task to task table.
func (*TaskManager) CreateTaskWithSession ¶
func (mgr *TaskManager) CreateTaskWithSession( ctx context.Context, se sessionctx.Context, key string, tp proto.TaskType, concurrency int, targetScope string, meta []byte, ) (taskID int64, err error)
CreateTaskWithSession adds a new task to task table with session.
func (*TaskManager) DeleteDeadNodes ¶
func (mgr *TaskManager) DeleteDeadNodes(ctx context.Context, nodes []string) error
DeleteDeadNodes deletes the dead nodes from mysql.dist_framework_meta.
func (*TaskManager) ExecuteSQLWithNewSession ¶
func (mgr *TaskManager) ExecuteSQLWithNewSession(ctx context.Context, sql string, args ...any) (rs []chunk.Row, err error)
ExecuteSQLWithNewSession executes one SQL with new session.
func (*TaskManager) FailSubtask ¶
func (mgr *TaskManager) FailSubtask(ctx context.Context, execID string, taskID int64, err error) error
FailSubtask updates the task's subtask state to failed and sets the err.
func (*TaskManager) FailTask ¶
func (mgr *TaskManager) FailTask(ctx context.Context, taskID int64, currentState proto.TaskState, taskErr error) error
FailTask implements the scheduler.TaskManager interface.
func (*TaskManager) FinishSubtask ¶
func (mgr *TaskManager) FinishSubtask(ctx context.Context, execID string, id int64, meta []byte) error
FinishSubtask updates the subtask meta and marks the state as succeed.
func (*TaskManager) GCSubtasks ¶
func (mgr *TaskManager) GCSubtasks(ctx context.Context) error
GCSubtasks deletes the history subtask which is older than the given days.
func (*TaskManager) GetActiveSubtasks ¶
func (mgr *TaskManager) GetActiveSubtasks(ctx context.Context, taskID int64) ([]*proto.SubtaskBase, error)
GetActiveSubtasks implements TaskManager.GetActiveSubtasks.
func (*TaskManager) GetAllNodes ¶
func (mgr *TaskManager) GetAllNodes(ctx context.Context) ([]proto.ManagedNode, error)
GetAllNodes gets nodes in dist_framework_meta.
func (*TaskManager) GetAllSubtasks ¶
func (mgr *TaskManager) GetAllSubtasks(ctx context.Context) ([]*proto.SubtaskBase, error)
GetAllSubtasks gets all subtasks with basic columns.
func (*TaskManager) GetAllSubtasksByStepAndState ¶
func (mgr *TaskManager) GetAllSubtasksByStepAndState(ctx context.Context, taskID int64, step proto.Step, state proto.SubtaskState) ([]*proto.Subtask, error)
GetAllSubtasksByStepAndState gets the subtask by step and state.
func (*TaskManager) GetCPUCountOfNode ¶
func (mgr *TaskManager) GetCPUCountOfNode(ctx context.Context) (int, error)
GetCPUCountOfNode gets the cpu count of node.
func (*TaskManager) GetFirstSubtaskInStates ¶
func (mgr *TaskManager) GetFirstSubtaskInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...proto.SubtaskState) (*proto.Subtask, error)
GetFirstSubtaskInStates gets the first subtask by given states.
func (*TaskManager) GetSubtaskCntGroupByStates ¶
func (mgr *TaskManager) GetSubtaskCntGroupByStates(ctx context.Context, taskID int64, step proto.Step) (map[proto.SubtaskState]int64, error)
GetSubtaskCntGroupByStates gets the subtask count by states.
func (*TaskManager) GetSubtaskErrors ¶
GetSubtaskErrors gets subtasks' errors.
func (*TaskManager) GetSubtaskRowCount ¶
func (mgr *TaskManager) GetSubtaskRowCount(ctx context.Context, taskID int64, step proto.Step) (int64, error)
GetSubtaskRowCount gets the subtask row count.
func (*TaskManager) GetSubtasksByExecIDAndStepAndStates ¶
func (mgr *TaskManager) GetSubtasksByExecIDAndStepAndStates(ctx context.Context, execID string, taskID int64, step proto.Step, states ...proto.SubtaskState) ([]*proto.Subtask, error)
GetSubtasksByExecIDAndStepAndStates gets all subtasks by given states on one node.
func (*TaskManager) GetSubtasksWithHistory ¶
func (mgr *TaskManager) GetSubtasksWithHistory(ctx context.Context, taskID int64, step proto.Step) ([]*proto.Subtask, error)
GetSubtasksWithHistory gets the subtasks from tidb_background_subtask and tidb_background_subtask_history.
func (*TaskManager) GetTaskBaseByID ¶
func (mgr *TaskManager) GetTaskBaseByID(ctx context.Context, taskID int64) (task *proto.TaskBase, err error)
GetTaskBaseByID implements the TaskManager.GetTaskBaseByID interface.
func (*TaskManager) GetTaskBaseByIDWithHistory ¶
func (mgr *TaskManager) GetTaskBaseByIDWithHistory(ctx context.Context, taskID int64) (task *proto.TaskBase, err error)
GetTaskBaseByIDWithHistory gets the task by the task ID from both tidb_global_task and tidb_global_task_history.
func (*TaskManager) GetTaskBaseByKeyWithHistory ¶
func (mgr *TaskManager) GetTaskBaseByKeyWithHistory(ctx context.Context, key string) (task *proto.TaskBase, err error)
GetTaskBaseByKeyWithHistory gets the task base from history table by the task key.
func (*TaskManager) GetTaskByID ¶
func (mgr *TaskManager) GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)
GetTaskByID gets the task by the task ID.
func (*TaskManager) GetTaskByIDWithHistory ¶
func (mgr *TaskManager) GetTaskByIDWithHistory(ctx context.Context, taskID int64) (task *proto.Task, err error)
GetTaskByIDWithHistory gets the task by the task ID from both tidb_global_task and tidb_global_task_history.
func (*TaskManager) GetTaskByKey ¶
GetTaskByKey gets the task by the task key.
func (*TaskManager) GetTaskByKeyWithHistory ¶
func (mgr *TaskManager) GetTaskByKeyWithHistory(ctx context.Context, key string) (task *proto.Task, err error)
GetTaskByKeyWithHistory gets the task from history table by the task key.
func (*TaskManager) GetTaskExecInfoByExecID ¶
func (mgr *TaskManager) GetTaskExecInfoByExecID(ctx context.Context, execID string) ([]*TaskExecInfo, error)
GetTaskExecInfoByExecID implements the scheduler.TaskManager interface.
func (*TaskManager) GetTasksInStates ¶
func (mgr *TaskManager) GetTasksInStates(ctx context.Context, states ...any) (task []*proto.Task, err error)
GetTasksInStates gets the tasks in the states (order by priority asc, create_time asc, id asc).
func (*TaskManager) GetTopUnfinishedTasks ¶
GetTopUnfinishedTasks implements the scheduler.TaskManager interface.
func (*TaskManager) GetUsedSlotsOnNodes ¶
GetUsedSlotsOnNodes implements the scheduler.TaskManager interface.
func (*TaskManager) HasSubtasksInStates ¶
func (mgr *TaskManager) HasSubtasksInStates(ctx context.Context, tidbID string, taskID int64, step proto.Step, states ...proto.SubtaskState) (bool, error)
HasSubtasksInStates checks if there are subtasks in the states.
func (*TaskManager) InitMetaSession ¶
func (*TaskManager) InitMetaSession(ctx context.Context, se sessionctx.Context, execID string, role string) error
InitMetaSession inserts the manager information into dist_framework_meta. If the record exists, it updates the cpu_count and role.
func (*TaskManager) ModifiedTask ¶
ModifiedTask implements the scheduler.TaskManager interface.
func (*TaskManager) ModifyTaskByID ¶
func (mgr *TaskManager) ModifyTaskByID(ctx context.Context, taskID int64, param *proto.ModifyParam) error
ModifyTaskByID modifies the task by the task ID.
func (*TaskManager) PauseSubtasks ¶
PauseSubtasks updates all running/pending subtasks to paused state.
func (*TaskManager) PausedTask ¶
func (mgr *TaskManager) PausedTask(ctx context.Context, taskID int64) error
PausedTask updates the task state from pausing to paused.
func (*TaskManager) RecoverMeta ¶
RecoverMeta inserts the manager information into dist_framework_meta. If the record exists, it updates the cpu_count. It doesn't update the role, because the role is only updated by `set global tidb_service_scope`; otherwise there might be a data race.
func (*TaskManager) ResumeSubtasks ¶
func (mgr *TaskManager) ResumeSubtasks(ctx context.Context, taskID int64) error
ResumeSubtasks updates all paused subtasks to pending state.
func (*TaskManager) ResumeTask ¶
ResumeTask resumes the task.
func (*TaskManager) ResumedTask ¶
func (mgr *TaskManager) ResumedTask(ctx context.Context, taskID int64) error
ResumedTask implements the scheduler.TaskManager interface.
func (*TaskManager) RevertTask ¶
func (mgr *TaskManager) RevertTask(ctx context.Context, taskID int64, taskState proto.TaskState, taskErr error) error
RevertTask implements the scheduler.TaskManager interface.
func (*TaskManager) RevertedTask ¶
func (mgr *TaskManager) RevertedTask(ctx context.Context, taskID int64) error
RevertedTask implements the scheduler.TaskManager interface.
func (*TaskManager) RunningSubtasksBack2Pending ¶
func (mgr *TaskManager) RunningSubtasksBack2Pending(ctx context.Context, subtasks []*proto.SubtaskBase) error
RunningSubtasksBack2Pending implements the taskexecutor.TaskTable interface.
func (*TaskManager) StartSubtask ¶
StartSubtask updates the subtask state to running.
func (*TaskManager) SucceedTask ¶
func (mgr *TaskManager) SucceedTask(ctx context.Context, taskID int64) error
SucceedTask updates the task state from running to succeed.
func (*TaskManager) SwitchTaskStep ¶
func (mgr *TaskManager) SwitchTaskStep( ctx context.Context, task *proto.Task, nextState proto.TaskState, nextStep proto.Step, subtasks []*proto.Subtask, ) error
SwitchTaskStep implements the scheduler.TaskManager interface.
func (*TaskManager) SwitchTaskStepInBatch ¶
func (mgr *TaskManager) SwitchTaskStepInBatch( ctx context.Context, task *proto.Task, nextState proto.TaskState, nextStep proto.Step, subtasks []*proto.Subtask, ) error
SwitchTaskStepInBatch implements the scheduler.TaskManager interface.
func (*TaskManager) TransferSubtasks2HistoryWithSession ¶
func (*TaskManager) TransferSubtasks2HistoryWithSession(ctx context.Context, se sessionctx.Context, taskID int64) error
TransferSubtasks2HistoryWithSession transfers the selected subtasks into the tidb_background_subtask_history table by taskID.
func (*TaskManager) TransferTasks2History ¶
TransferTasks2History transfers the selected tasks into the tidb_global_task_history table by taskIDs.
func (*TaskManager) UpdateSubtaskRowCount ¶
func (mgr *TaskManager) UpdateSubtaskRowCount(ctx context.Context, subtaskID int64, rowCount int64) error
UpdateSubtaskRowCount updates the subtask row count.
func (*TaskManager) UpdateSubtaskStateAndError ¶
func (mgr *TaskManager) UpdateSubtaskStateAndError( ctx context.Context, execID string, id int64, state proto.SubtaskState, subTaskErr error) error
UpdateSubtaskStateAndError updates the subtask state.
func (*TaskManager) UpdateSubtasksExecIDs ¶
func (mgr *TaskManager) UpdateSubtasksExecIDs(ctx context.Context, subtasks []*proto.SubtaskBase) error
UpdateSubtasksExecIDs updates subtasks' execID.
func (*TaskManager) WithNewSession ¶
func (mgr *TaskManager) WithNewSession(fn func(se sessionctx.Context) error) error
WithNewSession executes the function with a new session.
func (*TaskManager) WithNewTxn ¶
func (mgr *TaskManager) WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error
WithNewTxn executes the fn in a new transaction.