taskexecutor

package
v1.1.0-beta.0...-ec288d9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2024 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Polling intervals used when checking for tasks and subtasks to run.
var (
	// TaskCheckInterval is the interval to check whether there are tasks to run.
	// TODO: consider increasing this interval for performance.
	TaskCheckInterval = 300 * time.Millisecond
	// SubtaskCheckInterval is the interval to check whether there are subtasks to run.
	// It is exported for testing.
	SubtaskCheckInterval = 300 * time.Millisecond
	// MaxSubtaskCheckInterval is the maximum interval to check whether there are subtasks to run.
	// It is exported for testing.
	MaxSubtaskCheckInterval = 2 * time.Second
)
View Source
// Sentinel errors used as cancel causes or subtask failure reasons, plus a
// test hook.
var (
	// ErrCancelSubtask is the cancel cause used when cancelling subtasks.
	ErrCancelSubtask = errors.New("cancel subtasks")
	// ErrFinishSubtask is the cancel cause used when the TaskExecutor has
	// successfully processed subtasks.
	ErrFinishSubtask = errors.New("finish subtasks")
	// ErrNonIdempotentSubtask means the subtask was left in the running state and
	// is not idempotent, so it cannot be run again.
	ErrNonIdempotentSubtask = errors.New("subtask in running state and is not idempotent")

	// MockTiDBDown is used to mock a TiDB node going down; it returns true if
	// it's chosen.
	// NOTE(review): declared without an initializer, so it is nil unless something
	// assigns it — presumably only tests do; confirm callers nil-check before use.
	MockTiDBDown func(execID string, task *proto.TaskBase) bool
)

Functions

func ClearTaskExecutors

func ClearTaskExecutors()

ClearTaskExecutors is only used in tests.

func GetTaskExecutorFactory

func GetTaskExecutorFactory(taskType proto.TaskType) taskExecutorFactoryFn

GetTaskExecutorFactory gets taskExecutorFactory by task type.

func RegisterTaskType

func RegisterTaskType(taskType proto.TaskType, factory taskExecutorFactoryFn, opts ...TaskTypeOption)

RegisterTaskType registers the task type.

Types

type BaseTaskExecutor

type BaseTaskExecutor struct {
	// Extension is embedded, so the Extension interface's methods
	// (IsIdempotent, GetStepExecutor, IsRetryableError) are promoted onto
	// BaseTaskExecutor.
	Extension
	// contains filtered or unexported fields
}

BaseTaskExecutor is the base implementation of TaskExecutor.

func NewBaseTaskExecutor

func NewBaseTaskExecutor(ctx context.Context, id string, task *proto.Task, taskTable TaskTable) *BaseTaskExecutor

NewBaseTaskExecutor creates a new BaseTaskExecutor. See TaskExecutor.Init for why we want to use the task base to create a TaskExecutor. TODO: refactor this to pass only the task base; ADD-INDEX currently depends on the full task to initialize, so we keep it for now.

func (*BaseTaskExecutor) Cancel

func (e *BaseTaskExecutor) Cancel()

Cancel implements TaskExecutor.Cancel.

func (*BaseTaskExecutor) CancelRunningSubtask

func (e *BaseTaskExecutor) CancelRunningSubtask()

CancelRunningSubtask implements TaskExecutor.CancelRunningSubtask.

func (*BaseTaskExecutor) Close

func (e *BaseTaskExecutor) Close()

Close closes the TaskExecutor when all the subtasks are complete.

func (*BaseTaskExecutor) Ctx

func (e *BaseTaskExecutor) Ctx() context.Context

Ctx returns the context of the task executor. TODO: remove it once add-index.taskexecutor.Init no longer depends on it.

func (*BaseTaskExecutor) GetTaskBase

func (e *BaseTaskExecutor) GetTaskBase() *proto.TaskBase

GetTaskBase implements TaskExecutor.GetTaskBase.

func (*BaseTaskExecutor) Init

Init implements the TaskExecutor interface.

func (*BaseTaskExecutor) Run

func (e *BaseTaskExecutor) Run(resource *proto.StepResource)

Run implements the TaskExecutor interface.

func (*BaseTaskExecutor) RunStep

func (e *BaseTaskExecutor) RunStep(resource *proto.StepResource) (err error)

RunStep starts fetching and running all subtasks for the current step of the task on this node. It returns if there is no subtask to run.

type EmptyStepExecutor

type EmptyStepExecutor struct {
	// StepExecFrameworkInfo is embedded from the execute package; its
	// definition is not shown here.
	execute.StepExecFrameworkInfo
}

EmptyStepExecutor is an empty Executor. It can be used for tasks that do not need to be split into subtasks.

func (*EmptyStepExecutor) Cleanup

Cleanup implements the StepExecutor interface.

func (*EmptyStepExecutor) Init

Init implements the StepExecutor interface.

func (*EmptyStepExecutor) OnFinished

func (*EmptyStepExecutor) OnFinished(_ context.Context, _ *proto.Subtask) error

OnFinished implements the StepExecutor interface.

func (*EmptyStepExecutor) RealtimeSummary

func (*EmptyStepExecutor) RealtimeSummary() *execute.SubtaskSummary

RealtimeSummary implements the StepExecutor interface.

func (*EmptyStepExecutor) RunSubtask

RunSubtask implements the StepExecutor interface.

type Extension

type Extension interface {
	// IsIdempotent returns whether the subtask is idempotent.
	// When TiDB restarts, the subtask might be left in the running state.
	// If it's idempotent, the Executor can rerun the subtask; otherwise
	// the Executor will mark the subtask as failed.
	IsIdempotent(subtask *proto.Subtask) bool
	// GetStepExecutor returns the subtask executor for the subtask.
	// Note:
	// 1. summary is the summary manager of all subtasks of the same type now.
	// 2. errors returned from it should not be retried.
	// NOTE(review): item 1 refers to a "summary" that does not appear in this
	// signature; the note looks stale — confirm and remove if so.
	GetStepExecutor(task *proto.Task) (execute.StepExecutor, error)
	// IsRetryableError returns whether the error is transient.
	// When the error is transient, the framework won't mark subtasks as failed,
	// so the TaskExecutor can load the subtask again and redo it.
	IsRetryableError(err error) bool
}

Extension extends the TaskExecutor. Each task type should implement this interface.

type Manager

// Manager exposes no exported fields; construct one with NewManager.
type Manager struct {
	// contains filtered or unexported fields
}

Manager monitors the task table and manages the taskExecutors.

func NewManager

func NewManager(ctx context.Context, id string, taskTable TaskTable) (*Manager, error)

NewManager creates a new task executor Manager.

func (*Manager) Cancel

func (m *Manager) Cancel()

Cancel cancels the executor manager. It is used in tests to simulate a TiDB node shutdown.

func (*Manager) InitMeta

func (m *Manager) InitMeta() error

InitMeta initializes the meta of the Manager. It does not have to succeed before the manager is started, because the manager will try to recover the meta periodically.

func (*Manager) Start

func (m *Manager) Start() error

Start starts the Manager.

func (*Manager) Stop

func (m *Manager) Stop()

Stop stops the Manager.

type Pool

type Pool interface {
	// Run takes a function and returns an error.
	// NOTE(review): presumably submits the function to the pool for execution —
	// confirm against the implementation.
	Run(func()) error
	// RunWithConcurrency takes a channel of functions and a uint32 and returns
	// an error.
	// NOTE(review): presumably the uint32 is the concurrency used to drain the
	// channel — the parameters are unnamed here, so confirm against the
	// implementation.
	RunWithConcurrency(chan func(), uint32) error
	// ReleaseAndWait takes no arguments and returns nothing.
	// NOTE(review): by its name it releases the pool and waits for outstanding
	// work to finish — confirm against the implementation.
	ReleaseAndWait()
}

Pool defines the interface of a pool.

type TaskExecutor

type TaskExecutor interface {
	// Init initializes the TaskExecutor. The returned error is fatal: it will fail
	// the task directly, so be careful what to put into it.
	// The context passed in is Manager.ctx; don't use it to init long-running
	// routines, as it will NOT be cancelled when the task is finished.
	// NOTE: do NOT depend on task meta to do initialization, as we plan to pass
	// task-base to the TaskExecutor in the future. If you need to do some
	// initialization based on task meta, do it in GetStepExecutor, as
	// execute.StepExecutor is where subtasks are actually executed.
	Init(context.Context) error
	// Run runs the task with the given resource. It will try to run each step one
	// by one; if it cannot find any subtask to run for a while (10s now), it will
	// exit, so the manager can free and reuse the resource.
	// We assume that all steps have the same resource usage for now; this will
	// change when we support different resource usage for different steps.
	Run(resource *proto.StepResource)
	// GetTaskBase returns the task. The returned value is read-only; don't change it.
	GetTaskBase() *proto.TaskBase
	// CancelRunningSubtask cancels the running subtask and changes its state to
	// `cancelled`. The task executor will keep running, so we can have a context
	// to update the subtask state or keep handling revert logic.
	CancelRunningSubtask()
	// Cancel cancels the task executor; the state of the running subtask is not
	// changed. It's separate from Close because Close mostly means waiting for all
	// resources to be released before returning, whereas here we only want the
	// context cancelled and will check whether it's closed later.
	Cancel()
	// Close closes the TaskExecutor.
	Close()
	// IsRetryableError takes an error and returns a bool.
	// NOTE(review): presumably it reports whether the error is transient, like
	// the same-named method on Extension — confirm.
	IsRetryableError(err error) bool
}

TaskExecutor is the executor for a task. Each task type should implement this interface. The context tree of task execution is:

Manager.ctx
└── TaskExecutor.ctx: Cancel cancels this one
   └── RunStep.ctx: CancelRunningSubtask cancels this one

type TaskTable

type TaskTable interface {
	// GetTaskExecInfoByExecID gets all task exec infos by the given execID. If
	// there's no executable subtask on the execID for some task, that task is not
	// returned.
	GetTaskExecInfoByExecID(ctx context.Context, execID string) ([]*storage.TaskExecInfo, error)
	// GetTasksInStates takes a variadic list of states and returns a slice of tasks.
	// NOTE(review): presumably returns the tasks whose state is one of states;
	// states is typed `any`, so confirm the expected element type.
	GetTasksInStates(ctx context.Context, states ...any) (task []*proto.Task, err error)
	// GetTaskByID takes a task ID and returns a *proto.Task.
	// NOTE(review): behavior when the task does not exist is not shown here — confirm.
	GetTaskByID(ctx context.Context, taskID int64) (task *proto.Task, err error)
	// GetTaskBaseByID takes a task ID and returns a *proto.TaskBase rather than
	// the full *proto.Task.
	// NOTE(review): behavior when the task does not exist is not shown here — confirm.
	GetTaskBaseByID(ctx context.Context, taskID int64) (task *proto.TaskBase, err error)
	// GetSubtasksByExecIDAndStepAndStates gets all subtasks by the given execID,
	// taskID, step and states.
	GetSubtasksByExecIDAndStepAndStates(ctx context.Context, execID string, taskID int64, step proto.Step, states ...proto.SubtaskState) ([]*proto.Subtask, error)
	// GetFirstSubtaskInStates takes an instanceID, taskID, step and states, and
	// returns a single subtask.
	// NOTE(review): instanceID presumably plays the same role as execID in the
	// other methods; what "first" orders by is not shown here — confirm.
	GetFirstSubtaskInStates(ctx context.Context, instanceID string, taskID int64, step proto.Step, states ...proto.SubtaskState) (*proto.Subtask, error)
	// InitMeta inserts the manager information into dist_framework_meta.
	// Call it when starting the task executor or in a set-variable operation.
	InitMeta(ctx context.Context, execID string, role string) error
	// RecoverMeta recovers the manager information into dist_framework_meta.
	// Call it periodically to recover deleted meta.
	RecoverMeta(ctx context.Context, execID string, role string) error
	// StartSubtask tries to update the subtask's state to running if the subtask
	// is owned by execID. If the update succeeds, it means the task executor
	// related to execID owns the subtask.
	StartSubtask(ctx context.Context, subtaskID int64, execID string) error
	// UpdateSubtaskStateAndError updates the subtask's state and error.
	UpdateSubtaskStateAndError(ctx context.Context, execID string, subtaskID int64, state proto.SubtaskState, err error) error
	// FailSubtask updates the task's subtask state to failed and sets the err.
	// Note that it takes a taskID, not a subtaskID.
	FailSubtask(ctx context.Context, execID string, taskID int64, err error) error
	// CancelSubtask updates the task's subtasks' state to canceled.
	// NOTE(review): the parameter is named exe here but execID in the other
	// methods — presumably the same identifier; confirm.
	CancelSubtask(ctx context.Context, exe string, taskID int64) error
	// FinishSubtask updates the subtask meta and marks its state as succeed.
	FinishSubtask(ctx context.Context, execID string, subtaskID int64, meta []byte) error
	// PauseSubtasks updates subtasks' state to paused.
	PauseSubtasks(ctx context.Context, execID string, taskID int64) error

	// HasSubtasksInStates takes an execID, taskID, step and states, and returns
	// a bool.
	// NOTE(review): presumably reports whether any matching subtask exists — confirm.
	HasSubtasksInStates(ctx context.Context, execID string, taskID int64, step proto.Step, states ...proto.SubtaskState) (bool, error)
	// RunningSubtasksBack2Pending updates the state of subtasks which belong to
	// this node from running to pending.
	// See the subtask state machine for more detail.
	RunningSubtasksBack2Pending(ctx context.Context, subtasks []*proto.SubtaskBase) error
}

TaskTable defines the interface to access the task table.

type TaskTypeOption

// TaskTypeOption is a function that receives a *taskTypeOptions; RegisterTaskType
// accepts a variadic list of them.
type TaskTypeOption func(opts *taskTypeOptions)

TaskTypeOption is the option of TaskType.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL