Documentation ¶
Index ¶
- Variables
- func ClearSchedulerCleanUpFactory()
- func ClearSchedulerFactory()
- func GetLiveExecIDs(ctx context.Context) ([]string, error)
- func IsCancelledErr(err error) bool
- func RegisterSchedulerCleanUpFactory(taskType proto.TaskType, ctor cleanUpFactoryFn)
- func RegisterSchedulerFactory(taskType proto.TaskType, ctor schedulerFactoryFn)
- func VerifyTaskStateTransform(from, to proto.TaskState) bool
- type BaseScheduler
- func (*BaseScheduler) Close()
- func (s *BaseScheduler) GetPreviousSubtaskMetas(taskID int64, step proto.Step) ([][]byte, error)
- func (s *BaseScheduler) GetTask() *proto.Task
- func (*BaseScheduler) Init() error
- func (s *BaseScheduler) ScheduleTask()
- func (s *BaseScheduler) WithNewSession(fn func(se sessionctx.Context) error) error
- func (s *BaseScheduler) WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error
- type CleanUpRoutine
- type Extension
- type Manager
- type NodeManager
- type Param
- type Scheduler
- type SlotManager
- type TaskManager
Constants ¶
This section is empty.
Variables ¶
var (
	// CheckTaskFinishedInterval is the interval for scheduler.
	// exported for testing.
	CheckTaskFinishedInterval = 500 * time.Millisecond
	// RetrySQLTimes is the max retry times when executing SQL.
	RetrySQLTimes = 30
	// RetrySQLInterval is the initial interval between two SQL retries.
	RetrySQLInterval = 3 * time.Second
	// RetrySQLMaxInterval is the max interval between two SQL retries.
	RetrySQLMaxInterval = 30 * time.Second
)
var (
	// CheckTaskRunningInterval is the interval for loading tasks.
	// It is exported for testing.
	CheckTaskRunningInterval = 3 * time.Second
	// DefaultCleanUpInterval is the interval of cleanup routine.
	DefaultCleanUpInterval = 10 * time.Minute
)
var MockServerInfo atomic.Pointer[[]string]
MockServerInfo is exported for scheduler_test.go.
Functions ¶
func ClearSchedulerCleanUpFactory ¶
func ClearSchedulerCleanUpFactory()
ClearSchedulerCleanUpFactory is only used in tests.
func ClearSchedulerFactory ¶
func ClearSchedulerFactory()
ClearSchedulerFactory is only used in tests.
func GetLiveExecIDs ¶
GetLiveExecIDs returns all live executor node IDs.
func IsCancelledErr ¶
IsCancelledErr checks if the error is a cancelled error.
func RegisterSchedulerCleanUpFactory ¶
RegisterSchedulerCleanUpFactory registers the scheduler cleanup factory. Normally, scheduler cleanup is used in the scheduler_manager gcTaskLoop to do cleanup work when tasks are finished.
func RegisterSchedulerFactory ¶
RegisterSchedulerFactory registers the scheduler factory. Normally, the scheduler ctor should be registered before the server starts, and registration should happen in a single goroutine, such as in init(). After the server starts, there should be no writes to the map. For index backfill, however, the register call stack is so deep that it is unclear whether this holds, so a lock is used here.
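The locking described for RegisterSchedulerFactory can be sketched as a map guarded by a read-write mutex. This is a minimal illustration under assumptions: TaskType, Scheduler, registry, and namedScheduler are simplified stand-ins invented here, and the package's real map and lock are unexported and may be organized differently.

```go
package main

import (
	"fmt"
	"sync"
)

// TaskType and Scheduler are simplified stand-ins for proto.TaskType and the
// package's Scheduler interface.
type TaskType string

type Scheduler interface{ Name() string }

type schedulerFactoryFn func() Scheduler

// registry guards the factory map with a lock so that a late registration
// (for example from a deep call stack) cannot race with lookups.
type registry struct {
	mu        sync.RWMutex
	factories map[TaskType]schedulerFactoryFn
}

func newRegistry() *registry {
	return &registry{factories: make(map[TaskType]schedulerFactoryFn)}
}

// register stores the constructor for a task type.
func (r *registry) register(t TaskType, ctor schedulerFactoryFn) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.factories[t] = ctor
}

// get looks up the constructor for a task type.
func (r *registry) get(t TaskType) (schedulerFactoryFn, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	ctor, ok := r.factories[t]
	return ctor, ok
}

// clear drops all registrations, as a test-only reset would.
func (r *registry) clear() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.factories = make(map[TaskType]schedulerFactoryFn)
}

// namedScheduler is a trivial Scheduler used only by this sketch.
type namedScheduler string

func (n namedScheduler) Name() string { return string(n) }

func main() {
	r := newRegistry()
	r.register("import", func() Scheduler { return namedScheduler("import") })

	if ctor, ok := r.get("import"); ok {
		fmt.Println("created scheduler:", ctor().Name())
	}

	r.clear()
	_, ok := r.get("import")
	fmt.Println("registered after clear:", ok)
}
```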
func VerifyTaskStateTransform ¶
VerifyTaskStateTransform verifies whether the task state transform is valid.
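A state-transition check like this one is commonly written as a lookup in a table of allowed moves. The sketch below shows that shape only. The state names and every entry in the table are illustrative assumptions, not the rules that VerifyTaskStateTransform actually enforces.

```go
package main

import "fmt"

// TaskState is a simplified stand-in for proto.TaskState.
type TaskState string

const (
	StatePending   TaskState = "pending"
	StateRunning   TaskState = "running"
	StateSucceed   TaskState = "succeed"
	StateReverting TaskState = "reverting"
	StateReverted  TaskState = "reverted"
)

// allowed lists, for each state, the states it may move to.
// These entries are assumptions made for illustration.
var allowed = map[TaskState][]TaskState{
	StatePending:   {StateRunning, StateReverting},
	StateRunning:   {StateSucceed, StateReverting},
	StateReverting: {StateReverted},
}

// verifyTransform reports whether moving from one state to another is listed
// in the table. States without an entry are terminal.
func verifyTransform(from, to TaskState) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(verifyTransform(StatePending, StateRunning))
	fmt.Println(verifyTransform(StateSucceed, StateRunning))
}
```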
Types ¶
type BaseScheduler ¶
type BaseScheduler struct {
	Param
	// when RegisterSchedulerFactory, the factory MUST initialize this field.
	Extension
	// contains filtered or unexported fields
}
BaseScheduler is the base struct for Scheduler. Each task type embeds this struct and implements the Extension interface.
func NewBaseScheduler ¶
NewBaseScheduler creates a new BaseScheduler.
func (*BaseScheduler) GetPreviousSubtaskMetas ¶
GetPreviousSubtaskMetas gets subtask metas from a specific step.
func (*BaseScheduler) GetTask ¶
func (s *BaseScheduler) GetTask() *proto.Task
GetTask implements the Scheduler interface.
func (*BaseScheduler) Init ¶
func (*BaseScheduler) Init() error
Init implements the Scheduler interface.
func (*BaseScheduler) ScheduleTask ¶
func (s *BaseScheduler) ScheduleTask()
ScheduleTask implements the Scheduler interface.
func (*BaseScheduler) WithNewSession ¶
func (s *BaseScheduler) WithNewSession(fn func(se sessionctx.Context) error) error
WithNewSession executes the function with a new session.
func (*BaseScheduler) WithNewTxn ¶
func (s *BaseScheduler) WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error
WithNewTxn executes the fn in a new transaction.
type CleanUpRoutine ¶
type CleanUpRoutine interface {
	// CleanUp does the cleanup work.
	// task.Meta can be updated here, such as redacting some sensitive info.
	CleanUp(ctx context.Context, task *proto.Task) error
}
CleanUpRoutine is used by the framework to do cleanup work once the task is finished.
type Extension ¶
type Extension interface {
	// OnTick is used to handle the ticker event. If the business impl needs to do some periodical work, you can
	// do it here, but don't do too much work here, because the ticker interval is small, and it will block.
	// The event is generated every CheckTaskRunningInterval, and only when the task is NOT FINISHED and has NO ERROR.
	OnTick(ctx context.Context, task *proto.Task)
	// OnNextSubtasksBatch is used to generate a batch of subtasks for the next stage.
	// NOTE: don't change task.State inside, framework will manage it.
	// it's called when:
	// 1. task is pending and entering its first step.
	// 2. subtasks scheduled have all finished with no error.
	// when next step is StepDone, it should return nil, nil.
	OnNextSubtasksBatch(ctx context.Context, h storage.TaskHandle, task *proto.Task, execIDs []string, step proto.Step) (subtaskMetas [][]byte, err error)
	// OnDone is called when task is done, either finished successfully or failed
	// with error.
	// if the task is failed when initializing scheduler, or it's an unknown task,
	// we don't call this function.
	OnDone(ctx context.Context, h storage.TaskHandle, task *proto.Task) error
	// GetEligibleInstances is used to get the eligible instances for the task.
	// on certain condition we may want to use some instances to do the task, such as instances with more disk.
	// if returned instances is empty, it means all instances are eligible.
	// TODO: run import from server disk using framework makes this logic complicated,
	// the instance might not be managed by framework.
	GetEligibleInstances(ctx context.Context, task *proto.Task) ([]string, error)
	// IsRetryableErr is used to check whether the error occurred in scheduler is retryable.
	IsRetryableErr(err error) bool
	// GetNextStep is used to get the next step for the task.
	// if task runs successfully, it should go from StepInit to business steps,
	// then to StepDone, then scheduler will mark it as finished.
	// NOTE: don't depend on task meta to decide the next step, if it's really needed,
	// initialize required fields on scheduler.Init
	GetNextStep(task *proto.TaskBase) proto.Step
}
Extension is used to control the process operations for each task. It extends the functions of BaseScheduler. As Go doesn't support inheritance, we embed this interface in Scheduler to simulate abstract methods as in other OO languages.
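The embedding technique described for Extension can be sketched in isolation. The base struct embeds an interface, so it can call a method that only the concrete task type supplies, much like an abstract method. Everything below is a simplified assumption: the Step values, the one-method Extension, the Run loop, and twoStepExt are invented for illustration and do not reproduce the package's actual types.

```go
package main

import "fmt"

// Step is a simplified stand-in for proto.Step. The numeric values here are
// arbitrary and do not match the real constants.
type Step int

const (
	StepInit Step = iota
	StepOne
	StepDone
)

// Extension is reduced to a single method for this sketch.
type Extension interface {
	GetNextStep(current Step) Step
}

// BaseScheduler embeds Extension, so GetNextStep is promoted onto it and
// resolved at run time by whichever implementation the factory supplied.
type BaseScheduler struct {
	Extension
	step Step
}

// Run advances from the current step until StepDone and records each step.
func (s *BaseScheduler) Run() []Step {
	visited := []Step{s.step}
	for s.step != StepDone {
		s.step = s.GetNextStep(s.step)
		visited = append(visited, s.step)
	}
	return visited
}

// twoStepExt is a hypothetical task type with one business step.
type twoStepExt struct{}

func (twoStepExt) GetNextStep(current Step) Step {
	if current == StepInit {
		return StepOne
	}
	return StepDone
}

func main() {
	s := &BaseScheduler{Extension: twoStepExt{}, step: StepInit}
	fmt.Println(s.Run())
}
```

Because the next step depends only on the current step, this sketch also follows the note on GetNextStep about not consulting the task meta.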
func GetTestSchedulerExt ¶
func GetTestSchedulerExt(ctrl *gomock.Controller) Extension
GetTestSchedulerExt returns a scheduler.Extension for testing.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages a bunch of schedulers. Each scheduler schedules and monitors tasks. The number of tasks being scheduled is limited by the size of gPool.
func NewManager ¶
func NewManager(ctx context.Context, taskMgr TaskManager, serverID string) *Manager
NewManager creates a scheduler Manager.
func (*Manager) Cancel ¶
func (sm *Manager) Cancel()
Cancel cancels the scheduler manager. It is used in tests to simulate a TiDB node shutdown.
func (*Manager) Initialized ¶
Initialized checks whether the manager is initialized.
func (*Manager) MockScheduler ¶
func (sm *Manager) MockScheduler(task *proto.Task) *BaseScheduler
MockScheduler mocks one scheduler for one task, only used for tests.
type NodeManager ¶
type NodeManager struct {
// contains filtered or unexported fields
}
NodeManager maintains live TiDB nodes in the cluster, and maintains the nodes managed by the framework.
type Param ¶
type Param struct {
// contains filtered or unexported fields
}
Param is used to pass parameters when creating a scheduler.
type Scheduler ¶
type Scheduler interface {
	// Init initializes the scheduler, should be called before ExecuteTask.
	// if Init returns error, scheduler manager will fail the task directly,
	// so the returned error should be a fatal error.
	Init() error
	// ScheduleTask schedules the task execution step by step.
	ScheduleTask()
	// Close closes the scheduler, should be called if Init returns nil.
	Close()
	// GetTask returns the task that the scheduler is managing.
	// the task is for read only, it might be accessed by multiple goroutines
	GetTask() *proto.Task
	Extension
}
Scheduler manages the lifetime of a task including submitting subtasks and updating the status of a task.
type SlotManager ¶
type SlotManager struct {
// contains filtered or unexported fields
}
SlotManager is used to manage the resource slots and stripes.
Slot is the resource unit of dist framework on each node, each slot represents 1 cpu core, 1/total-core of memory, 1/total-core of disk, etc.
Stripe is the resource unit of dist framework, regardless of the node. Each stripe means 1 slot on all nodes managed by dist framework. The number of stripes is equal to the number of slots on each node, as we assume that all nodes managed by dist framework are isomorphic. The stripes reserved for a task define the maximum resource that the task can use, but the task might not use all of it. To maximize resource utilization, we try to schedule as many tasks as possible, depending on the used slots on each node and the minimum resource required by the tasks. In this case, we don't consider task order.
Dist framework tries to allocate resources by slots and stripes and gives a quota to each subtask, but the subtask decides which limits to conform to.
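The slot and stripe accounting can be made concrete with a small model. In this sketch, a task needing n slots is admitted if n unreserved stripes remain, or failing that, if at least one node still has n free slots. That two-stage rule is an assumption drawn from the description above. slotManager, reserve, and canReserve are hypothetical names, and the real SlotManager has unexported fields and may apply different rules.

```go
package main

import "fmt"

// slotManager is a hypothetical model of the accounting described above.
type slotManager struct {
	capacity int           // slots per node, which equals the number of stripes
	reserved map[int64]int // task ID -> stripes reserved for that task
}

func newSlotManager(capacity int) *slotManager {
	return &slotManager{capacity: capacity, reserved: make(map[int64]int)}
}

// reservedStripes sums the stripes held by all tasks.
func (m *slotManager) reservedStripes() int {
	total := 0
	for _, n := range m.reserved {
		total += n
	}
	return total
}

// reserve records the stripes held by a task.
func (m *slotManager) reserve(taskID int64, stripes int) {
	m.reserved[taskID] = stripes
}

// canReserve reports whether a task needing `need` slots fits: either enough
// unreserved stripes remain, or some node has enough free slots.
// usedSlots maps a node ID to the slots currently in use on that node.
func (m *slotManager) canReserve(need int, usedSlots map[string]int) bool {
	if m.reservedStripes()+need <= m.capacity {
		return true
	}
	for _, used := range usedSlots {
		if m.capacity-used >= need {
			return true
		}
	}
	return false
}

func main() {
	m := newSlotManager(16) // each node has 16 slots, so there are 16 stripes
	m.reserve(1, 12)        // task 1 reserves 12 stripes

	// Only 4 stripes are left, but node n2 still has 12 free slots.
	fmt.Println(m.canReserve(8, map[string]int{"n1": 12, "n2": 4}))

	// Neither the stripes nor any single node can fit 8 more slots.
	fmt.Println(m.canReserve(8, map[string]int{"n1": 12, "n2": 12}))
}
```

The second stage is what lets a small task run on an underused node even though the reserved stripes of other tasks already cover the whole capacity.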
type TaskManager ¶
type TaskManager interface {
	// GetTopUnfinishedTasks returns unfinished tasks, limited by MaxConcurrentTask*2,
	// to make sure lower rank tasks can be scheduled if resource is enough.
	// The returned tasks are sorted by task order, see proto.Task.
	GetTopUnfinishedTasks(ctx context.Context) ([]*proto.TaskBase, error)
	// GetAllSubtasks gets all subtasks with basic columns.
	GetAllSubtasks(ctx context.Context) ([]*proto.SubtaskBase, error)
	GetTasksInStates(ctx context.Context, states ...any) (task []*proto.Task, err error)
	GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)
	GetTaskBaseByID(ctx context.Context, taskID int64) (task *proto.TaskBase, err error)
	GCSubtasks(ctx context.Context) error
	GetAllNodes(ctx context.Context) ([]proto.ManagedNode, error)
	DeleteDeadNodes(ctx context.Context, nodes []string) error
	// TransferTasks2History transfers tasks and their related subtasks to history tables.
	TransferTasks2History(ctx context.Context, tasks []*proto.Task) error
	// CancelTask updates task state to canceling.
	CancelTask(ctx context.Context, taskID int64) error
	// FailTask updates task state to Failed and updates task error.
	FailTask(ctx context.Context, taskID int64, currentState proto.TaskState, taskErr error) error
	// RevertTask updates task state to reverting, and task error.
	RevertTask(ctx context.Context, taskID int64, taskState proto.TaskState, taskErr error) error
	// RevertedTask updates task state to reverted.
	RevertedTask(ctx context.Context, taskID int64) error
	// PauseTask updates task state to pausing.
	PauseTask(ctx context.Context, taskKey string) (bool, error)
	// PausedTask updates task state to paused.
	PausedTask(ctx context.Context, taskID int64) error
	// ResumedTask updates task state from resuming to running.
	ResumedTask(ctx context.Context, taskID int64) error
	// SucceedTask updates a task to success state.
	SucceedTask(ctx context.Context, taskID int64) error
	// SwitchTaskStep switches the task to the next step and adds subtasks in one
	// transaction. It will change task state too if we're switching from InitStep to
	// next step.
	SwitchTaskStep(ctx context.Context, task *proto.Task, nextState proto.TaskState, nextStep proto.Step, subtasks []*proto.Subtask) error
	// SwitchTaskStepInBatch is similar to SwitchTaskStep, but it will insert subtasks
	// in batch, and task step change will be in a separate transaction.
	// Note: subtasks of this step must be stable, i.e. count, order and content
	// should be the same on each try, else the subtasks inserted might be messed up.
	// And each subtask of this step must be different, to handle the network
	// partition or owner change.
	SwitchTaskStepInBatch(ctx context.Context, task *proto.Task, nextState proto.TaskState, nextStep proto.Step, subtasks []*proto.Subtask) error
	// GetUsedSlotsOnNodes returns the used slots on nodes that have subtask scheduled.
	// subtasks of each task on one node is only accounted once as we don't support
	// running them concurrently.
	// we only consider pending/running subtasks, subtasks related to revert are
	// not considered.
	GetUsedSlotsOnNodes(ctx context.Context) (map[string]int, error)
	// GetActiveSubtasks returns subtasks of the task that are in pending/running state.
	GetActiveSubtasks(ctx context.Context, taskID int64) ([]*proto.SubtaskBase, error)
	// GetSubtaskCntGroupByStates returns the count of subtasks of some step group by state.
	GetSubtaskCntGroupByStates(ctx context.Context, taskID int64, step proto.Step) (map[proto.SubtaskState]int64, error)
	ResumeSubtasks(ctx context.Context, taskID int64) error
	GetSubtaskErrors(ctx context.Context, taskID int64) ([]error, error)
	UpdateSubtasksExecIDs(ctx context.Context, subtasks []*proto.SubtaskBase) error
	// GetAllSubtasksByStepAndState gets all subtasks by given states for one step.
	GetAllSubtasksByStepAndState(ctx context.Context, taskID int64, step proto.Step, state proto.SubtaskState) ([]*proto.Subtask, error)
	WithNewSession(fn func(se sessionctx.Context) error) error
	WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error
}
TaskManager defines the interface to access the task table.