scheduler

package
v1.1.0-beta.0...-0c6681f
Warning

This package is not in the latest version of its module.

Published: Jan 14, 2025 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// CheckTaskFinishedInterval is the interval at which the scheduler checks task progress.
	// It is exported for testing.
	CheckTaskFinishedInterval = 500 * time.Millisecond
	// RetrySQLTimes is the max retry times when executing SQL.
	RetrySQLTimes = 30
	// RetrySQLInterval is the initial interval between two SQL retries.
	RetrySQLInterval = 3 * time.Second
	// RetrySQLMaxInterval is the max interval between two SQL retries.
	RetrySQLMaxInterval = 30 * time.Second
)
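
The three retry variables read like a capped backoff: start at RetrySQLInterval, never wait longer than RetrySQLMaxInterval, and give up after RetrySQLTimes attempts. The following is a minimal sketch of that idea, assuming the delay doubles after each failed attempt; the doubling policy and the helper names (backoffDelays, runWithRetry, noSleep, errTransient) are illustrative assumptions, not the package's confirmed implementation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local copies of the documented defaults, so the sketch is self-contained.
var (
	retrySQLTimes       = 30
	retrySQLInterval    = 3 * time.Second
	retrySQLMaxInterval = 30 * time.Second
)

// errTransient simulates a retryable SQL failure (hypothetical).
var errTransient = errors.New("transient failure")

// backoffDelays returns the wait before each retry, assuming the delay
// doubles after every attempt and is capped at maxDelay.
func backoffDelays(times int, initial, maxDelay time.Duration) []time.Duration {
	delays := make([]time.Duration, 0, times)
	d := initial
	for i := 0; i < times; i++ {
		delays = append(delays, d)
		d *= 2
		if d > maxDelay {
			d = maxDelay
		}
	}
	return delays
}

// noSleep replaces time.Sleep so the demo finishes instantly.
func noSleep(time.Duration) {}

// runWithRetry calls fn until it succeeds or the attempts are exhausted.
// It sleeps between attempts, but not after the final one.
func runWithRetry(times int, initial, maxDelay time.Duration, sleep func(time.Duration), fn func() error) error {
	delays := backoffDelays(times, initial, maxDelay)
	var err error
	for i := 0; i < times; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i < times-1 {
			sleep(delays[i])
		}
	}
	return err
}

func main() {
	attempts := 0
	err := runWithRetry(retrySQLTimes, retrySQLInterval, retrySQLMaxInterval, noSleep, func() error {
		attempts++
		if attempts < 3 {
			return errTransient
		}
		return nil
	})
	fmt.Println("attempts:", attempts, "err:", err)
}
```

Under this assumed policy the waits grow as 3s, 6s, 12s, 24s and then stay at 30s for the remaining attempts.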
var (
	// CheckTaskRunningInterval is the interval for loading tasks.
	// It is exported for testing.
	CheckTaskRunningInterval = 3 * time.Second

	// DefaultCleanUpInterval is the interval of cleanup routine.
	DefaultCleanUpInterval = 10 * time.Minute
)
var MockServerInfo atomic.Pointer[[]string]

MockServerInfo is exported for scheduler_test.go.

Functions

func ClearSchedulerCleanUpFactory

func ClearSchedulerCleanUpFactory()

ClearSchedulerCleanUpFactory is only used in tests.

func ClearSchedulerFactory

func ClearSchedulerFactory()

ClearSchedulerFactory is only used in tests.

func GetLiveExecIDs

func GetLiveExecIDs(ctx context.Context) ([]string, error)

GetLiveExecIDs returns all live executor node IDs.

func IsCancelledErr

func IsCancelledErr(err error) bool

IsCancelledErr checks if the error is a cancelled error.

func RegisterSchedulerCleanUpFactory

func RegisterSchedulerCleanUpFactory(taskType proto.TaskType, ctor cleanUpFactoryFn)

RegisterSchedulerCleanUpFactory is used to register the scheduler cleanup factory. Normally the scheduler cleanup is used in the scheduler_manager gcTaskLoop to do cleanup work when tasks are finished.

func RegisterSchedulerFactory

func RegisterSchedulerFactory(taskType proto.TaskType, ctor schedulerFactoryFn)

RegisterSchedulerFactory is used to register the scheduler factory. Normally the scheduler constructor should be registered before the server starts, and from a single goroutine, such as in init(). After the server starts, there should be no writes to the map. For index backfill, however, the register call stack is so deep that it is unclear whether this holds, so a lock is used here.
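
The locking described above can be pictured with a small registry. This is a sketch only: taskType and factoryFn are simplified stand-ins for proto.TaskType and the unexported schedulerFactoryFn, and the registry type itself is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// taskType and factoryFn are simplified stand-ins for proto.TaskType and
// the unexported schedulerFactoryFn.
type taskType string
type factoryFn func() string

// registry guards the factory map with a lock, so a registration that
// happens after startup cannot race with readers.
type registry struct {
	mu        sync.RWMutex
	factories map[taskType]factoryFn
}

func newRegistry() *registry {
	return &registry{factories: make(map[taskType]factoryFn)}
}

// register stores ctor for the given task type, replacing any previous one.
func (r *registry) register(t taskType, ctor factoryFn) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.factories[t] = ctor
}

// get looks up the constructor for a task type.
func (r *registry) get(t taskType) (factoryFn, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	ctor, ok := r.factories[t]
	return ctor, ok
}

// clear drops all registrations, as a test-only reset would.
func (r *registry) clear() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.factories = make(map[taskType]factoryFn)
}

func main() {
	r := newRegistry()
	var wg sync.WaitGroup
	// Register from several goroutines to show the lock makes this safe.
	for _, t := range []taskType{"import", "backfill"} {
		wg.Add(1)
		go func(t taskType) {
			defer wg.Done()
			r.register(t, func() string { return "scheduler for " + string(t) })
		}(t)
	}
	wg.Wait()
	if ctor, ok := r.get("import"); ok {
		fmt.Println(ctor())
	}
}
```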

func VerifyTaskStateTransform

func VerifyTaskStateTransform(from, to proto.TaskState) bool

VerifyTaskStateTransform verifies whether the task state transform is valid.
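
A state-transition check of this kind is commonly written as a lookup table. The sketch below assumes a small, hypothetical set of states and rules; it does not reproduce the package's real transition table, and taskState, allowed and verifyTransform are illustrative names.

```go
package main

import "fmt"

// taskState is a stand-in for proto.TaskState.
type taskState string

// A hypothetical subset of states, chosen only for illustration.
const (
	pending    taskState = "pending"
	running    taskState = "running"
	succeeded  taskState = "succeed"
	reverting  taskState = "reverting"
	reverted   taskState = "reverted"
	cancelling taskState = "cancelling"
)

// allowed maps each state to the states it may move to. The rules here are
// assumptions for the sketch, not the framework's actual rules.
var allowed = map[taskState][]taskState{
	pending:    {running, cancelling},
	running:    {succeeded, reverting, cancelling},
	cancelling: {reverting},
	reverting:  {reverted},
}

// verifyTransform reports whether moving from one state to another is
// listed in the table.
func verifyTransform(from, to taskState) bool {
	for _, next := range allowed[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(verifyTransform(pending, running))   // allowed by the table
	fmt.Println(verifyTransform(succeeded, running)) // terminal state, not allowed
}
```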

Types

type BaseScheduler

type BaseScheduler struct {
	Param

	// when calling RegisterSchedulerFactory, the factory MUST initialize this field.
	Extension
	// contains filtered or unexported fields
}

BaseScheduler is the base struct for Scheduler. Each task type embeds this struct and implements the Extension interface.

func NewBaseScheduler

func NewBaseScheduler(ctx context.Context, task *proto.Task, param Param) *BaseScheduler

NewBaseScheduler creates a new BaseScheduler.

func (*BaseScheduler) Close

func (*BaseScheduler) Close()

Close closes the scheduler.

func (*BaseScheduler) GetPreviousSubtaskMetas

func (s *BaseScheduler) GetPreviousSubtaskMetas(taskID int64, step proto.Step) ([][]byte, error)

GetPreviousSubtaskMetas gets the subtask metas of the specified step.

func (*BaseScheduler) GetTask

func (s *BaseScheduler) GetTask() *proto.Task

GetTask implements the Scheduler interface.

func (*BaseScheduler) Init

func (*BaseScheduler) Init() error

Init implements the Scheduler interface.

func (*BaseScheduler) ScheduleTask

func (s *BaseScheduler) ScheduleTask()

ScheduleTask implements the Scheduler interface.

func (*BaseScheduler) WithNewSession

func (s *BaseScheduler) WithNewSession(fn func(se sessionctx.Context) error) error

WithNewSession executes the function with a new session.

func (*BaseScheduler) WithNewTxn

func (s *BaseScheduler) WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error

WithNewTxn executes the fn in a new transaction.
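
The callback style of WithNewSession and WithNewTxn keeps session and transaction handling in one place. The sketch below assumes the conventional contract for such helpers, commit when fn returns nil and roll back otherwise; this page does not spell that contract out, and session, withNewTxn and errWrite are hypothetical stand-ins.

```go
package main

import (
	"errors"
	"fmt"
)

// session is a stand-in for sessionctx.Context that only records statements.
type session struct {
	statements []string
}

func (s *session) exec(sql string) {
	s.statements = append(s.statements, sql)
}

// errWrite simulates a failure inside the transaction body (hypothetical).
var errWrite = errors.New("write failed")

// withNewTxn runs fn between BEGIN and COMMIT, and rolls back if fn fails.
// This commit-or-rollback behavior is an assumption of the sketch.
func withNewTxn(se *session, fn func(se *session) error) error {
	se.exec("BEGIN")
	if err := fn(se); err != nil {
		se.exec("ROLLBACK")
		return err
	}
	se.exec("COMMIT")
	return nil
}

func main() {
	se := &session{}
	err := withNewTxn(se, func(se *session) error {
		se.exec("UPDATE task SET step = 2")
		return nil
	})
	fmt.Println(se.statements, err)
}
```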

type CleanUpRoutine

type CleanUpRoutine interface {
	// CleanUp does the cleanup work.
	// task.Meta can be updated here, such as redacting some sensitive info.
	CleanUp(ctx context.Context, task *proto.Task) error
}

CleanUpRoutine is used by the framework to do cleanup work once a task is finished.
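
As an illustration of the "redacting some sensitive info" remark, here is a sketch of a cleanup routine that masks one field of a JSON-encoded meta. The task struct, the JSON layout of Meta, the "secret" key and the runCleanUp helper are all assumptions made for the example; real task metas are defined by each task type.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// task is a stand-in for proto.Task with only the fields the sketch needs.
type task struct {
	ID   int64
	Meta []byte
}

// cleanUpRoutine mirrors the shape of the CleanUpRoutine interface.
type cleanUpRoutine interface {
	CleanUp(ctx context.Context, t *task) error
}

// redactCleanUp masks the hypothetical "secret" key in a JSON meta.
type redactCleanUp struct{}

func (redactCleanUp) CleanUp(_ context.Context, t *task) error {
	var meta map[string]any
	if err := json.Unmarshal(t.Meta, &meta); err != nil {
		return err
	}
	if _, ok := meta["secret"]; ok {
		meta["secret"] = "***"
	}
	redacted, err := json.Marshal(meta)
	if err != nil {
		return err
	}
	// Write the redacted meta back, as the interface comment permits.
	t.Meta = redacted
	return nil
}

// runCleanUp is a small helper that supplies a background context.
func runCleanUp(r cleanUpRoutine, t *task) error {
	return r.CleanUp(context.Background(), t)
}

func main() {
	tk := &task{ID: 1, Meta: []byte(`{"table":"t1","secret":"s3cr3t"}`)}
	if err := runCleanUp(redactCleanUp{}, tk); err != nil {
		fmt.Println("cleanup failed:", err)
		return
	}
	fmt.Println(string(tk.Meta))
}
```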

type Extension

type Extension interface {
	// OnTick is used to handle the ticker event. If the business implementation needs to do
	// periodic work, it can do it here, but it shouldn't do too much, because the ticker
	// interval is small and this call blocks.
	// The event is generated every CheckTaskRunningInterval, and only when the task is NOT FINISHED and has NO ERROR.
	OnTick(ctx context.Context, task *proto.Task)

	// OnNextSubtasksBatch is used to generate a batch of subtasks for the next stage.
	// NOTE: don't change task.State inside, the framework will manage it.
	// It's called when:
	// 	1. the task is pending and entering its first step.
	// 	2. the scheduled subtasks have all finished with no error.
	// When the next step is StepDone, it should return nil, nil.
	OnNextSubtasksBatch(ctx context.Context, h storage.TaskHandle, task *proto.Task, execIDs []string, step proto.Step) (subtaskMetas [][]byte, err error)

	// OnDone is called when task is done, either finished successfully or failed
	// with error.
	// if the task is failed when initializing scheduler, or it's an unknown task,
	// we don't call this function.
	OnDone(ctx context.Context, h storage.TaskHandle, task *proto.Task) error

	// GetEligibleInstances is used to get the eligible instances for the task.
	// on certain condition we may want to use some instances to do the task, such as instances with more disk.
	// if returned instances is empty, it means all instances are eligible.
	// TODO: run import from server disk using framework makes this logic complicated,
	// the instance might not be managed by framework.
	GetEligibleInstances(ctx context.Context, task *proto.Task) ([]string, error)

	// IsRetryableErr is used to check whether the error occurred in scheduler is retryable.
	IsRetryableErr(err error) bool

	// GetNextStep is used to get the next step for the task.
	// if task runs successfully, it should go from StepInit to business steps,
	// then to StepDone, then scheduler will mark it as finished.
	// NOTE: don't depend on task meta to decide the next step, if it's really needed,
	// initialize required fields on scheduler.Init
	GetNextStep(task *proto.TaskBase) proto.Step
}

Extension is used to control the process operations for each task. It extends the functions of BaseScheduler. As Go doesn't support inheritance, we embed this interface in Scheduler to simulate abstract methods as in other OO languages.
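
The embedding technique can be shown in a few lines. In this sketch a base struct embeds an interface and drives the step progression from an initial step through business steps to a done step; the step values, the two business steps and the names baseScheduler, extension, importExt and advance are hypothetical stand-ins, not the package's types.

```go
package main

import "fmt"

// step is a stand-in for proto.Step; the values below are hypothetical.
type step int

const (
	stepInit step = iota
	stepRead
	stepWrite
	stepDone
)

// extension holds the "abstract method" each task type must provide.
type extension interface {
	GetNextStep(current step) step
}

// baseScheduler carries the shared loop and embeds extension, so calls to
// GetNextStep dispatch to whatever concrete type the factory plugged in.
type baseScheduler struct {
	extension
	current step
}

// advance walks the task from its current step to stepDone and returns the
// steps it entered along the way.
func (b *baseScheduler) advance() []step {
	var entered []step
	for b.current != stepDone {
		b.current = b.GetNextStep(b.current)
		entered = append(entered, b.current)
	}
	return entered
}

// importExt is a hypothetical task type with two business steps.
type importExt struct{}

func (importExt) GetNextStep(current step) step {
	switch current {
	case stepInit:
		return stepRead
	case stepRead:
		return stepWrite
	default:
		return stepDone
	}
}

func main() {
	s := &baseScheduler{extension: importExt{}, current: stepInit}
	fmt.Println(s.advance())
}
```

Because the next step depends only on the current step here, the sketch also follows the advice in the GetNextStep comment not to derive the next step from task meta.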

func GetTestSchedulerExt

func GetTestSchedulerExt(ctrl *gomock.Controller) Extension

GetTestSchedulerExt returns a scheduler.Extension for testing.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages a set of schedulers. Each scheduler schedules and monitors tasks. The number of tasks being scheduled is limited by the size of gPool.

func NewManager

func NewManager(ctx context.Context, taskMgr TaskManager, serverID string) *Manager

NewManager creates a scheduler manager.

func (*Manager) Cancel

func (sm *Manager) Cancel()

Cancel cancels the scheduler manager. It is used in tests to simulate a TiDB node shutdown.

func (*Manager) Initialized

func (sm *Manager) Initialized() bool

Initialized reports whether the manager has been initialized.

func (*Manager) MockScheduler

func (sm *Manager) MockScheduler(task *proto.Task) *BaseScheduler

MockScheduler mocks one scheduler for one task. It is only used in tests.

func (*Manager) Start

func (sm *Manager) Start()

Start starts the schedulerManager and its scheduleTaskLoop, which starts multiple schedulers.

func (*Manager) Stop

func (sm *Manager) Stop()

Stop stops the schedulerManager.

type NodeManager

type NodeManager struct {
	// contains filtered or unexported fields
}

NodeManager maintains live TiDB nodes in the cluster, and maintains the nodes managed by the framework.

type Param

type Param struct {
	// contains filtered or unexported fields
}

Param is used to pass parameters when creating scheduler.

type Scheduler

type Scheduler interface {
	// Init initializes the scheduler, should be called before ScheduleTask.
	// if Init returns error, scheduler manager will fail the task directly,
	// so the returned error should be a fatal error.
	Init() error
	// ScheduleTask schedules the task execution step by step.
	ScheduleTask()
	// Close closes the scheduler, should be called if Init returns nil.
	Close()
	// GetTask returns the task that the scheduler is managing.
	// the task is for read only, it might be accessed by multiple goroutines
	GetTask() *proto.Task
	Extension
}

Scheduler manages the lifetime of a task including submitting subtasks and updating the status of a task.
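
The interface comments imply a call order: Init first, ScheduleTask only if Init succeeded, and Close only if Init returned nil. A caller might drive a scheduler as in the sketch below; the reduced scheduler interface, the recorder type, runScheduler and errInit are hypothetical and only illustrate that ordering.

```go
package main

import (
	"errors"
	"fmt"
)

// scheduler is a reduced stand-in for the Scheduler interface.
type scheduler interface {
	Init() error
	ScheduleTask()
	Close()
}

// errInit simulates a fatal initialization error (hypothetical).
var errInit = errors.New("init failed")

// recorder is a fake scheduler that records which methods were called.
type recorder struct {
	initErr error
	calls   []string
}

func (r *recorder) Init() error {
	r.calls = append(r.calls, "Init")
	return r.initErr
}

func (r *recorder) ScheduleTask() { r.calls = append(r.calls, "ScheduleTask") }

func (r *recorder) Close() { r.calls = append(r.calls, "Close") }

// runScheduler follows the documented contract: a failed Init ends the run
// without calling Close; a successful Init is always paired with Close.
func runScheduler(s scheduler) error {
	if err := s.Init(); err != nil {
		return err
	}
	defer s.Close()
	s.ScheduleTask()
	return nil
}

func main() {
	ok := &recorder{}
	fmt.Println(runScheduler(ok), ok.calls)

	failing := &recorder{initErr: errInit}
	fmt.Println(runScheduler(failing), failing.calls)
}
```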

type SlotManager

type SlotManager struct {
	// contains filtered or unexported fields
}

SlotManager is used to manage the resource slots and stripes.

Slot is the resource unit of dist framework on each node, each slot represents 1 cpu core, 1/total-core of memory, 1/total-core of disk, etc.

Stripe is the resource unit of dist framework regardless of the node: each stripe means 1 slot on all nodes managed by dist framework. The number of stripes is equal to the number of slots on each node, as we assume that all nodes managed by dist framework are isomorphic. The stripes reserved for a task define the maximum resource that the task can use, but the task might not use all of it. To maximize resource utilization, we try to schedule as many tasks as possible depending on the used slots on each node and the minimum resource required by the tasks; in this case, we don't consider task order.

Dist framework tries to allocate resources by slots and stripes and gives each subtask a quota, but the subtask decides which parts of the quota to conform to.
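
The slot and stripe arithmetic can be sketched with two small helpers: one reserves stripes for tasks against the per-node capacity, the other lists nodes that still have enough free slots for a task. This is a simplification under stated assumptions (identical nodes, one integer of capacity per node); slotManager, reserve, release and nodesWithRoom are hypothetical names, not the package's API.

```go
package main

import (
	"fmt"
	"sort"
)

// slotManager tracks stripes reserved per task. capacity is the number of
// slots on each node, which equals the number of stripes.
type slotManager struct {
	capacity int
	reserved map[int64]int // taskID -> stripes reserved
}

func newSlotManager(capacity int) *slotManager {
	return &slotManager{capacity: capacity, reserved: make(map[int64]int)}
}

// reserve grants stripes to a task if the total stays within capacity.
func (m *slotManager) reserve(taskID int64, stripes int) bool {
	if _, ok := m.reserved[taskID]; ok {
		return true // already reserved
	}
	used := 0
	for _, n := range m.reserved {
		used += n
	}
	if used+stripes > m.capacity {
		return false
	}
	m.reserved[taskID] = stripes
	return true
}

// release frees the stripes held by a task.
func (m *slotManager) release(taskID int64) {
	delete(m.reserved, taskID)
}

// nodesWithRoom returns, in sorted order, the nodes whose free slots
// (capacity minus used) can hold a task that needs `need` slots.
func nodesWithRoom(capacity int, used map[string]int, need int) []string {
	var nodes []string
	for node, u := range used {
		if capacity-u >= need {
			nodes = append(nodes, node)
		}
	}
	sort.Strings(nodes)
	return nodes
}

func main() {
	m := newSlotManager(8)
	fmt.Println(m.reserve(1, 6), m.reserve(2, 4)) // second reservation does not fit
	fmt.Println(nodesWithRoom(8, map[string]int{"tidb-1": 6, "tidb-2": 2}, 4))
}
```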

type TaskManager

type TaskManager interface {
	// GetTopUnfinishedTasks returns unfinished tasks, limited by MaxConcurrentTask*2,
	// to make sure low ranking tasks can be scheduled if resource is enough.
	// The returned tasks are sorted by task order, see proto.Task.
	GetTopUnfinishedTasks(ctx context.Context) ([]*proto.TaskBase, error)
	// GetAllSubtasks gets all subtasks with basic columns.
	GetAllSubtasks(ctx context.Context) ([]*proto.SubtaskBase, error)
	GetTasksInStates(ctx context.Context, states ...any) (task []*proto.Task, err error)
	GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)
	GetTaskBaseByID(ctx context.Context, taskID int64) (task *proto.TaskBase, err error)
	GCSubtasks(ctx context.Context) error
	GetAllNodes(ctx context.Context) ([]proto.ManagedNode, error)
	DeleteDeadNodes(ctx context.Context, nodes []string) error
	// TransferTasks2History transfers tasks and their related subtasks to the history tables.
	TransferTasks2History(ctx context.Context, tasks []*proto.Task) error
	// CancelTask updates task state to canceling.
	CancelTask(ctx context.Context, taskID int64) error
	// FailTask updates task state to Failed and updates task error.
	FailTask(ctx context.Context, taskID int64, currentState proto.TaskState, taskErr error) error
	// RevertTask updates task state to reverting, and task error.
	RevertTask(ctx context.Context, taskID int64, taskState proto.TaskState, taskErr error) error
	// RevertedTask updates task state to reverted.
	RevertedTask(ctx context.Context, taskID int64) error
	// PauseTask updates task state to pausing.
	PauseTask(ctx context.Context, taskKey string) (bool, error)
	// PausedTask updates task state to 'paused'.
	PausedTask(ctx context.Context, taskID int64) error
	// ResumedTask updates task state from resuming to running.
	ResumedTask(ctx context.Context, taskID int64) error
	// ModifiedTask tries to update task concurrency and meta, and update state
	// back to prev-state, if success, it will also update concurrency of all
	// active subtasks.
	ModifiedTask(ctx context.Context, task *proto.Task) error
	// SucceedTask updates a task to success state.
	SucceedTask(ctx context.Context, taskID int64) error
	// SwitchTaskStep switches the task to the next step and adds subtasks in one
	// transaction. It will change task state too if we're switching from StepInit to
	// the next step.
	SwitchTaskStep(ctx context.Context, task *proto.Task, nextState proto.TaskState, nextStep proto.Step, subtasks []*proto.Subtask) error
	// SwitchTaskStepInBatch similar to SwitchTaskStep, but it will insert subtasks
	// in batch, and task step change will be in a separate transaction.
	// Note: subtasks of this step must be stable, i.e. count, order and content
	// should be the same on each try, else the subtasks inserted might be messed up.
	// And each subtask of this step must be different, to handle the network
	// partition or owner change.
	SwitchTaskStepInBatch(ctx context.Context, task *proto.Task, nextState proto.TaskState, nextStep proto.Step, subtasks []*proto.Subtask) error
	// GetUsedSlotsOnNodes returns the used slots on nodes that have subtask scheduled.
	// subtasks of each task on one node are only counted once, as we don't support
	// running them concurrently.
	// we only consider pending/running subtasks, subtasks related to revert are
	// not considered.
	GetUsedSlotsOnNodes(ctx context.Context) (map[string]int, error)
	// GetActiveSubtasks returns subtasks of the task that are in pending/running state.
	GetActiveSubtasks(ctx context.Context, taskID int64) ([]*proto.SubtaskBase, error)
	// GetSubtaskCntGroupByStates returns the count of subtasks of a step, grouped by state.
	GetSubtaskCntGroupByStates(ctx context.Context, taskID int64, step proto.Step) (map[proto.SubtaskState]int64, error)
	ResumeSubtasks(ctx context.Context, taskID int64) error
	GetSubtaskErrors(ctx context.Context, taskID int64) ([]error, error)
	UpdateSubtasksExecIDs(ctx context.Context, subtasks []*proto.SubtaskBase) error

	// GetAllSubtasksByStepAndState gets all subtasks by given states for one step.
	GetAllSubtasksByStepAndState(ctx context.Context, taskID int64, step proto.Step, state proto.SubtaskState) ([]*proto.Subtask, error)

	WithNewSession(fn func(se sessionctx.Context) error) error
	WithNewTxn(ctx context.Context, fn func(se sessionctx.Context) error) error
}

TaskManager defines the interface to access task table.
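
The counting rule in the GetUsedSlotsOnNodes comment (count each task once per node, and only for pending or running subtasks) can be sketched in memory. The subtask struct, its string states and the assumption that all subtasks of a task share one concurrency value are simplifications for illustration; the real method reads from the task tables.

```go
package main

import "fmt"

// subtask is a stand-in for proto.SubtaskBase with only the needed fields.
type subtask struct {
	TaskID      int64
	ExecID      string // node the subtask is scheduled on
	State       string
	Concurrency int // slots the owning task uses on a node
}

// taskOnNode identifies one task on one node.
type taskOnNode struct {
	taskID int64
	execID string
}

// usedSlotsOnNodes sums slots per node, counting each task at most once per
// node and ignoring subtasks that are neither pending nor running.
func usedSlotsOnNodes(subtasks []subtask) map[string]int {
	seen := make(map[taskOnNode]struct{})
	used := make(map[string]int)
	for _, st := range subtasks {
		if st.State != "pending" && st.State != "running" {
			continue
		}
		key := taskOnNode{taskID: st.TaskID, execID: st.ExecID}
		if _, ok := seen[key]; ok {
			continue // this task is already counted on this node
		}
		seen[key] = struct{}{}
		used[st.ExecID] += st.Concurrency
	}
	return used
}

func main() {
	subtasks := []subtask{
		{TaskID: 1, ExecID: "tidb-1", State: "running", Concurrency: 4},
		{TaskID: 1, ExecID: "tidb-1", State: "pending", Concurrency: 4},
		{TaskID: 2, ExecID: "tidb-1", State: "pending", Concurrency: 2},
		{TaskID: 1, ExecID: "tidb-2", State: "succeed", Concurrency: 4},
	}
	fmt.Println(usedSlotsOnNodes(subtasks))
}
```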

Directories

Path Synopsis
Package mock is a generated GoMock package.
