taskexecutor

package
v1.1.0-beta.0...-91bfa27
Warning

This package is not in the latest version of its module.

Published: Jan 18, 2025 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// TaskCheckInterval is the interval to check whether there are tasks to run.
	// TODO: consider a larger interval for performance.
	TaskCheckInterval = 300 * time.Millisecond
	// SubtaskCheckInterval is the interval to check whether there are subtasks to run.
	// exported for testing.
	SubtaskCheckInterval = 300 * time.Millisecond
	// MaxSubtaskCheckInterval is the max interval to check whether there are subtasks to run.
	// exported for testing.
	MaxSubtaskCheckInterval = 2 * time.Second
)
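
The pair SubtaskCheckInterval and MaxSubtaskCheckInterval suggests a polling interval that grows while no subtask is found. The following is a minimal sketch of that idea, not the package's actual loop: the doubling policy and the helper names `nextInterval` and `intervalsMillis` are assumptions made for illustration, and only the two durations come from the variables above.

```go
package main

import (
	"fmt"
	"time"
)

// Values copied from the package variables documented above.
var (
	subtaskCheckInterval    = 300 * time.Millisecond
	maxSubtaskCheckInterval = 2 * time.Second
)

// nextInterval is a hypothetical helper: it doubles the current interval
// and caps it at max. The real package may use a different backoff policy.
func nextInterval(cur, max time.Duration) time.Duration {
	next := cur * 2
	if next > max {
		return max
	}
	return next
}

// intervalsMillis returns the first n check intervals in milliseconds,
// assuming every check finds no subtask to run.
func intervalsMillis(n int) []int64 {
	out := make([]int64, 0, n)
	cur := subtaskCheckInterval
	for i := 0; i < n; i++ {
		out = append(out, cur.Milliseconds())
		cur = nextInterval(cur, maxSubtaskCheckInterval)
	}
	return out
}

func main() {
	fmt.Println(intervalsMillis(5)) // [300 600 1200 2000 2000]
}
```

Because both variables are exported, a test can shrink them before starting an executor so that polling does not dominate test time.
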
var (
	// ErrCancelSubtask is the cancel cause when cancelling subtasks.
	ErrCancelSubtask = errors.New("cancel subtasks")
	// ErrNonIdempotentSubtask means the subtask is left in running state and is not idempotent,
	// so cannot be run again.
	ErrNonIdempotentSubtask = errors.New("subtask in running state and is not idempotent")
)
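
ErrCancelSubtask is described as a cancel cause, which matches the standard library's `context.WithCancelCause` and `context.Cause`. The sketch below shows how a caller could tell a subtask cancellation apart from any other cancellation. It uses a local stand-in `errCancelSubtask` with the same message; the helper names are hypothetical and not part of this package.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errCancelSubtask is a local stand-in for taskexecutor.ErrCancelSubtask.
var errCancelSubtask = errors.New("cancel subtasks")

// cancelledBySubtaskCancel reports whether ctx was cancelled with
// errCancelSubtask as its cause, as opposed to some other cancellation.
func cancelledBySubtaskCancel(ctx context.Context) bool {
	return errors.Is(context.Cause(ctx), errCancelSubtask)
}

// demo cancels one context with the sentinel cause and one without it.
func demo() (withCause, withoutCause bool) {
	ctx1, cancel1 := context.WithCancelCause(context.Background())
	cancel1(errCancelSubtask)

	ctx2, cancel2 := context.WithCancel(context.Background())
	cancel2()

	return cancelledBySubtaskCancel(ctx1), cancelledBySubtaskCancel(ctx2)
}

func main() {
	a, b := demo()
	fmt.Println(a, b) // true false
}
```

Note that `ctx.Err()` still returns `context.Canceled` in both cases; only `context.Cause` exposes the sentinel.
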

Functions

func ClearTaskExecutors

func ClearTaskExecutors()

ClearTaskExecutors is only used in tests.

func RegisterTaskType

func RegisterTaskType(taskType proto.TaskType, factory FactoryFn)

RegisterTaskType registers the factory used to create the TaskExecutor for the given task type.

Types

type BaseStepExecutor

type BaseStepExecutor struct {
	execute.StepExecFrameworkInfo
}

BaseStepExecutor is the base step executor implementation.

func (*BaseStepExecutor) Cleanup

Cleanup implements the StepExecutor interface.

func (*BaseStepExecutor) Init

Init implements the StepExecutor interface.

func (*BaseStepExecutor) RealtimeSummary

func (*BaseStepExecutor) RealtimeSummary() *execute.SubtaskSummary

RealtimeSummary implements the StepExecutor interface.

func (*BaseStepExecutor) RunSubtask

RunSubtask implements the StepExecutor interface.

func (*BaseStepExecutor) TaskMetaModified

func (*BaseStepExecutor) TaskMetaModified(*proto.Task) error

TaskMetaModified implements the StepExecutor interface.
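
A base implementation like this is typically embedded so that a concrete step executor overrides only the methods it needs. The sketch below illustrates that pattern with local stand-ins. The `stepExecutor` interface and its method signatures are simplified assumptions (this page does not show the signatures of Init, RunSubtask, or Cleanup), and the no-op defaults in `baseStepExecutor` are likewise assumed.

```go
package main

import (
	"context"
	"fmt"
)

// stepExecutor is a simplified, hypothetical stand-in for execute.StepExecutor.
type stepExecutor interface {
	Init(ctx context.Context) error
	RunSubtask(ctx context.Context, subtaskID int64) error
	Cleanup(ctx context.Context) error
}

// baseStepExecutor plays the role of BaseStepExecutor: assumed no-op defaults.
type baseStepExecutor struct{}

func (*baseStepExecutor) Init(context.Context) error              { return nil }
func (*baseStepExecutor) RunSubtask(context.Context, int64) error { return nil }
func (*baseStepExecutor) Cleanup(context.Context) error           { return nil }

// countingExecutor embeds the base and overrides only RunSubtask.
type countingExecutor struct {
	baseStepExecutor
	ran []int64
}

// RunSubtask records the subtask ID instead of doing real work.
func (c *countingExecutor) RunSubtask(_ context.Context, id int64) error {
	c.ran = append(c.ran, id)
	return nil
}

// runAll drives an executor through Init, each subtask, then Cleanup.
func runAll(exec stepExecutor, ids ...int64) error {
	ctx := context.Background()
	if err := exec.Init(ctx); err != nil {
		return err
	}
	for _, id := range ids {
		if err := exec.RunSubtask(ctx, id); err != nil {
			return err
		}
	}
	return exec.Cleanup(ctx)
}

func main() {
	exec := &countingExecutor{}
	err := runAll(exec, 1, 2, 3)
	fmt.Println(exec.ran, err) // [1 2 3] <nil>
}
```
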

type BaseTaskExecutor

type BaseTaskExecutor struct {
	Param

	Extension
	// contains filtered or unexported fields
}

BaseTaskExecutor is the base implementation of TaskExecutor.

func NewBaseTaskExecutor

func NewBaseTaskExecutor(ctx context.Context, task *proto.Task, param Param) *BaseTaskExecutor

NewBaseTaskExecutor creates a new BaseTaskExecutor. See TaskExecutor.Init for why we want to use task-base to create the TaskExecutor. TODO: refactor this part to pass only the task base; ADD-INDEX currently depends on the full task to initialize, so it is kept for now.

func (*BaseTaskExecutor) Cancel

func (e *BaseTaskExecutor) Cancel()

Cancel implements TaskExecutor.Cancel.

func (*BaseTaskExecutor) CancelRunningSubtask

func (e *BaseTaskExecutor) CancelRunningSubtask()

CancelRunningSubtask implements TaskExecutor.CancelRunningSubtask.

func (*BaseTaskExecutor) Close

func (e *BaseTaskExecutor) Close()

Close closes the TaskExecutor when all the subtasks are complete.

func (*BaseTaskExecutor) Ctx

func (e *BaseTaskExecutor) Ctx() context.Context

Ctx returns the context of the task executor. TODO: remove it once add-index.taskexecutor.Init no longer depends on it.

func (*BaseTaskExecutor) GetTaskBase

func (e *BaseTaskExecutor) GetTaskBase() *proto.TaskBase

GetTaskBase implements TaskExecutor.GetTaskBase.

func (*BaseTaskExecutor) Init

Init implements the TaskExecutor interface.

func (*BaseTaskExecutor) Run

func (e *BaseTaskExecutor) Run()

Run implements the TaskExecutor interface.

type Extension

type Extension interface {
	// IsIdempotent returns whether the subtask is idempotent.
	// When TiDB restarts, the subtask might be left in the running state.
	// If it is idempotent, the Executor can rerun the subtask; otherwise
	// the Executor marks the subtask as failed.
	IsIdempotent(subtask *proto.Subtask) bool
	// GetStepExecutor returns the subtask executor for the subtask.
	// Note, the error returned is fatal, framework will fail the task directly.
	GetStepExecutor(task *proto.Task) (execute.StepExecutor, error)
	// IsRetryableError returns whether the error is transient.
	// When error is transient, the framework won't mark subtasks as failed,
	// then the TaskExecutor can load the subtask again and redo it.
	IsRetryableError(err error) bool
}

Extension extends the TaskExecutor. Each task type should implement this interface.
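
The IsIdempotent comment describes a decision made for subtasks found in the running state after a restart. A minimal sketch of that decision follows, using local stand-ins: `subtask`, the string state values, the narrowed `extension` interface, and `checkResumable` are all hypothetical, and `errNonIdempotentSubtask` mirrors the message of ErrNonIdempotentSubtask.

```go
package main

import (
	"errors"
	"fmt"
)

// subtask is a hypothetical stand-in for proto.Subtask.
type subtask struct {
	ID    int64
	State string
}

// errNonIdempotentSubtask mirrors taskexecutor.ErrNonIdempotentSubtask.
var errNonIdempotentSubtask = errors.New("subtask in running state and is not idempotent")

// extension is narrowed to the single method this sketch needs.
type extension interface {
	IsIdempotent(st *subtask) bool
}

// checkResumable returns an error for a subtask that was left running
// and cannot safely be run again; any other subtask may proceed.
func checkResumable(ext extension, st *subtask) error {
	if st.State == "running" && !ext.IsIdempotent(st) {
		return errNonIdempotentSubtask
	}
	return nil
}

// fixedExt answers IsIdempotent with a constant, for demonstration.
type fixedExt bool

func (f fixedExt) IsIdempotent(*subtask) bool { return bool(f) }

// resumable wraps checkResumable for a subtask in the given state.
func resumable(idempotent bool, state string) bool {
	st := &subtask{ID: 1, State: state}
	return checkResumable(fixedExt(idempotent), st) == nil
}

func main() {
	fmt.Println(resumable(true, "running"))  // true
	fmt.Println(resumable(false, "running")) // false
	fmt.Println(resumable(false, "pending")) // true
}
```
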

type FactoryFn

type FactoryFn func(ctx context.Context, task *proto.Task, param Param) TaskExecutor

FactoryFn is a function to create a TaskExecutor.

func GetTaskExecutorFactory

func GetTaskExecutorFactory(taskType proto.TaskType) FactoryFn

GetTaskExecutorFactory gets taskExecutorFactory by task type.
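
RegisterTaskType, GetTaskExecutorFactory, and ClearTaskExecutors together describe a registry keyed by task type. The sketch below shows one way such a registry can be built. It is not the package's implementation: the mutex-guarded map is an assumption, and `taskType`, `taskExecutor`, and `factoryFn` are simplified stand-ins (the real FactoryFn also takes a context, a task, and a Param).

```go
package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for proto.TaskType, TaskExecutor, and FactoryFn.
type taskType string

type taskExecutor interface{ Run() }

type factoryFn func() taskExecutor

// The registry itself: a map guarded by a read-write mutex (assumed design).
var (
	mu        sync.RWMutex
	factories = map[taskType]factoryFn{}
)

// registerTaskType stores the factory for a task type.
func registerTaskType(t taskType, f factoryFn) {
	mu.Lock()
	defer mu.Unlock()
	factories[t] = f
}

// getTaskExecutorFactory returns the factory, or nil if none is registered.
func getTaskExecutorFactory(t taskType) factoryFn {
	mu.RLock()
	defer mu.RUnlock()
	return factories[t]
}

// clearTaskExecutors resets the registry, which is useful between tests.
func clearTaskExecutors() {
	mu.Lock()
	defer mu.Unlock()
	factories = map[taskType]factoryFn{}
}

// noopExecutor is a trivial executor used only for the demonstration.
type noopExecutor struct{}

func (noopExecutor) Run() {}

func main() {
	registerTaskType("example", func() taskExecutor { return noopExecutor{} })
	fmt.Println(getTaskExecutorFactory("example") != nil) // true
	clearTaskExecutors()
	fmt.Println(getTaskExecutorFactory("example") != nil) // false
}
```
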

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager monitors the task table and manages the taskExecutors.

func NewManager

func NewManager(ctx context.Context, id string, taskTable TaskTable) (*Manager, error)

NewManager creates a new task executor Manager.

func (*Manager) Cancel

func (m *Manager) Cancel()

Cancel cancels the executor manager. It is used in tests to simulate a TiDB node shutdown.

func (*Manager) InitMeta

func (m *Manager) InitMeta() error

InitMeta initializes the meta of the Manager. It does not have to succeed before the Manager starts, because the Manager tries to recover the meta periodically.

func (*Manager) Start

func (m *Manager) Start() error

Start starts the Manager.

func (*Manager) Stop

func (m *Manager) Stop()

Stop stops the Manager.

type NodeResource

type NodeResource struct {
	// contains filtered or unexported fields
}

NodeResource is the resource of the node. It is exported for testing.

func NewNodeResource

func NewNodeResource(totalCPU int, totalMem int64) *NodeResource

NewNodeResource creates a new NodeResource. It is exported for testing.

type Param

type Param struct {
	// contains filtered or unexported fields
}

Param is the parameters to create a task executor.

func NewParamForTest

func NewParamForTest(taskTable TaskTable, slotMgr *slotManager, nodeRc *NodeResource, execID string) Param

NewParamForTest creates a new Param for use in tests.

type TaskExecutor

type TaskExecutor interface {
	// Init initializes the TaskExecutor. The returned error is fatal and fails
	// the task directly, so be careful about what goes into it.
	// The context passed in is Manager.ctx; don't use it to start long-running
	// routines, as it will NOT be cancelled when the task is finished.
	// NOTE: do NOT depend on task meta for initialization, as we plan to pass
	// task-base to the TaskExecutor in the future. If you need initialization
	// based on task meta, do it in GetStepExecutor, as execute.StepExecutor is
	// where subtasks are actually executed.
	Init(context.Context) error
	// Run runs the task. It tries to run each step one by one; if it cannot
	// find any subtask to run for a while (currently 10s), it exits so that
	// the manager can free and reuse the resource.
	// We currently assume that all steps have the same resource usage; this will
	// change when we support different resource usage for different steps.
	Run()
	// GetTaskBase returns the task, returned value is for read only, don't change it.
	GetTaskBase() *proto.TaskBase
	// CancelRunningSubtask cancels the running subtask and changes its state to
	// `cancelled`. The task executor keeps running, so we still have a context to
	// update the subtask state or keep handling revert logic.
	CancelRunningSubtask()
	// Cancel cancels the task executor; the state of the running subtask is not
	// changed. It is separate from Close because Close usually waits for all
	// resources to be released before returning, whereas here we only want the
	// context cancelled and check later whether the executor is closed.
	Cancel()
	// Close closes the TaskExecutor.
	Close()
	IsRetryableError(err error) bool
}

TaskExecutor is the executor for a task. Each task type should implement this interface. The context tree of task execution is:

Manager.ctx
└── TaskExecutor.ctx: Cancel cancels this one
   └── RunStep.ctx: CancelRunningSubtask cancels this one
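
The tree above can be reproduced with plain standard-library contexts to see which levels each cancellation reaches. This is only an illustrative sketch: the `contexts` type and its field names are hypothetical, and the real executor holds these contexts in unexported fields.

```go
package main

import (
	"context"
	"fmt"
)

// contexts mirrors the three levels of the tree with stdlib contexts.
type contexts struct {
	manager, executor, step    context.Context
	cancelExecutor, cancelStep context.CancelFunc
}

// newContexts builds manager -> executor -> step, each derived from its parent.
func newContexts() *contexts {
	c := &contexts{manager: context.Background()}
	c.executor, c.cancelExecutor = context.WithCancel(c.manager)
	c.step, c.cancelStep = context.WithCancel(c.executor)
	return c
}

// state reports which contexts are done, in the order manager, executor, step.
func (c *contexts) state() [3]bool {
	return [3]bool{
		c.manager.Err() != nil,
		c.executor.Err() != nil,
		c.step.Err() != nil,
	}
}

func main() {
	a := newContexts()
	a.cancelStep() // analogous to CancelRunningSubtask
	fmt.Println(a.state()) // [false false true]

	b := newContexts()
	b.cancelExecutor() // analogous to Cancel
	fmt.Println(b.state()) // [false true true]
}
```

Cancelling the step context leaves the executor context alive, which is what lets the executor keep updating subtask state after CancelRunningSubtask.
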

type TaskTable

type TaskTable interface {
	// GetTaskExecInfoByExecID gets all task exec infos by given execID, if there's
	// no executable subtask on the execID for some task, it's not returned.
	GetTaskExecInfoByExecID(ctx context.Context, execID string) ([]*storage.TaskExecInfo, error)
	GetTasksInStates(ctx context.Context, states ...any) (task []*proto.Task, err error)
	GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)
	GetTaskBaseByID(ctx context.Context, taskID int64) (task *proto.TaskBase, err error)
	// GetSubtasksByExecIDAndStepAndStates gets all subtasks by given states and execID.
	GetSubtasksByExecIDAndStepAndStates(ctx context.Context, execID string, taskID int64, step proto.Step, states ...proto.SubtaskState) ([]*proto.Subtask, error)
	GetFirstSubtaskInStates(ctx context.Context, instanceID string, taskID int64, step proto.Step, states ...proto.SubtaskState) (*proto.Subtask, error)
	// InitMeta inserts the manager information into dist_framework_meta.
	// Call it when starting the task executor or in a set-variable operation.
	InitMeta(ctx context.Context, execID string, role string) error
	// RecoverMeta recovers the manager information in dist_framework_meta.
	// Call it periodically to recover deleted meta.
	RecoverMeta(ctx context.Context, execID string, role string) error
	// StartSubtask tries to update the subtask's state to running if the subtask is owned by execID.
	// If the update succeeds, the task executor for execID owns the subtask.
	StartSubtask(ctx context.Context, subtaskID int64, execID string) error
	// UpdateSubtaskStateAndError updates the subtask's state and error.
	UpdateSubtaskStateAndError(ctx context.Context, execID string, subtaskID int64, state proto.SubtaskState, err error) error
	// FailSubtask updates the task's subtask state to failed and sets the error.
	FailSubtask(ctx context.Context, execID string, taskID int64, err error) error
	// CancelSubtask updates the state of the task's subtasks to canceled.
	CancelSubtask(ctx context.Context, exe string, taskID int64) error
	// FinishSubtask updates the subtask meta and marks its state as succeed.
	FinishSubtask(ctx context.Context, execID string, subtaskID int64, meta []byte) error
	// PauseSubtasks updates the state of subtasks to paused.
	PauseSubtasks(ctx context.Context, execID string, taskID int64) error

	// RunningSubtasksBack2Pending updates the state of subtasks that belong to
	// this node from running to pending.
	// See the subtask state machine for more detail.
	RunningSubtasksBack2Pending(ctx context.Context, subtasks []*proto.SubtaskBase) error
}

TaskTable defines the interface to access the task table.
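
Because TaskTable is an interface, tests can substitute an in-memory fake. The sketch below implements only StartSubtask, with the signature shown above, to illustrate the ownership check its comment describes. Everything else is assumed for illustration: the `row` layout, the string state values, the requirement that the subtask be pending, and the `errNotOwned` error are not taken from the real storage layer.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errNotOwned is a hypothetical error for a failed ownership check.
var errNotOwned = errors.New("subtask not owned by this executor")

// row is a hypothetical in-memory record of one subtask.
type row struct {
	execID string
	state  string
}

// fakeTaskTable keeps subtasks in a map guarded by a mutex.
type fakeTaskTable struct {
	mu   sync.Mutex
	rows map[int64]*row
}

// StartSubtask moves the subtask to running only if execID owns it and it
// is still pending (the pending requirement is an assumption of this sketch).
func (t *fakeTaskTable) StartSubtask(_ context.Context, subtaskID int64, execID string) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	r, ok := t.rows[subtaskID]
	if !ok || r.execID != execID || r.state != "pending" {
		return errNotOwned
	}
	r.state = "running"
	return nil
}

// tryStart builds a table with one subtask owned by "node-a" and attempts
// to start it on behalf of execID.
func tryStart(execID string) (started bool, state string) {
	t := &fakeTaskTable{rows: map[int64]*row{
		1: {execID: "node-a", state: "pending"},
	}}
	err := t.StartSubtask(context.Background(), 1, execID)
	return err == nil, t.rows[1].state
}

func main() {
	fmt.Println(tryStart("node-a")) // true running
	fmt.Println(tryStart("node-b")) // false pending
}
```
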
