Documentation ¶
Index ¶
- Constants
- Variables
- func Step2Str(t TaskType, s Step) string
- func Type2Int(t TaskType) int
- type Allocatable
- type ManagedNode
- type Modification
- type ModificationType
- type ModifyParam
- type Step
- type StepResource
- type Subtask
- type SubtaskBase
- type SubtaskState
- type Task
- type TaskBase
- type TaskState
- type TaskType
Constants ¶
const (
	// TaskIDLabelName is the label name of task id.
	TaskIDLabelName = "task_id"
	// NormalPriority represents the normal priority of task.
	NormalPriority = 512
)
Variables ¶
var (
	// EmptyMeta is the empty meta of task/subtask.
	EmptyMeta = []byte("{}")
)
var MaxConcurrentTask = 16
MaxConcurrentTask is the max concurrency of task. TODO: remove this limit later.
Functions ¶
Types ¶
type Allocatable ¶
type Allocatable struct {
// contains filtered or unexported fields
}
Allocatable is a resource with a fixed capacity that can be allocated. It is safe for concurrent use by multiple goroutines.
func NewAllocatable ¶
func NewAllocatable(capacity int64) *Allocatable
NewAllocatable creates a new Allocatable.
func (*Allocatable) Alloc ¶
func (a *Allocatable) Alloc(n int64) bool
Alloc allocates n from the Allocatable.
func (*Allocatable) Capacity ¶
func (a *Allocatable) Capacity() int64
Capacity returns the capacity of the Allocatable.
func (*Allocatable) Used ¶
func (a *Allocatable) Used() int64
Used returns the used resource of the Allocatable.
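The following is a minimal sketch of how a goroutine-safe, capacity-bounded allocator with this method set could behave. The fields of the real Allocatable are unexported, so the `allocatable` type below, its mutex-based layout, and the rule that `alloc` fails when the request does not fit are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// allocatable is a hypothetical stand-in for proto.Allocatable.
// The real type hides its fields, so this layout is an assumption.
type allocatable struct {
	mu       sync.Mutex
	capacity int64
	used     int64
}

func newAllocatable(capacity int64) *allocatable {
	return &allocatable{capacity: capacity}
}

// alloc reserves n units and reports whether the request fit
// within the remaining capacity (assumed semantics of the bool).
func (a *allocatable) alloc(n int64) bool {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.used+n > a.capacity {
		return false
	}
	a.used += n
	return true
}

// usedUnits returns the amount currently reserved.
func (a *allocatable) usedUnits() int64 {
	a.mu.Lock()
	defer a.mu.Unlock()
	return a.used
}

func main() {
	cpu := newAllocatable(8)
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		granted int
	)
	// Five goroutines each ask for 2 units out of 8.
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if cpu.alloc(2) {
				mu.Lock()
				granted++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	fmt.Println(granted, cpu.usedUnits()) // prints "4 8"
}
```

Guarding both fields with one mutex keeps the check and the update atomic, which is what makes concurrent callers unable to over-allocate.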
type ManagedNode ¶
type ManagedNode struct {
	// ID see GenerateExecID, it's named as host in the meta table.
	ID string
	// Role of the node, either "" or "background"
	// all managed node should have the same role
	Role     string
	CPUCount int
}
ManagedNode is a TiDB node that is managed by the framework.
type Modification ¶
type Modification struct {
	Type ModificationType `json:"type"`
	To   int64            `json:"to"`
}
Modification is one modification for task.
func (Modification) String ¶
func (m Modification) String() string
String implements fmt.Stringer interface.
type ModificationType ¶
type ModificationType string
ModificationType is the type of task modification.
const (
	// ModifyConcurrency is the type for modifying task concurrency.
	ModifyConcurrency ModificationType = "modify_concurrency"
)
func (ModificationType) String ¶
func (t ModificationType) String() string
String implements fmt.Stringer interface.
type ModifyParam ¶
type ModifyParam struct {
	PrevState     TaskState      `json:"prev_state"`
	Modifications []Modification `json:"modifications"`
}
ModifyParam is the parameter for task modification.
func (*ModifyParam) String ¶
func (p *ModifyParam) String() string
String implements fmt.Stringer interface.
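The struct tags above determine the JSON shape of a modification request. The sketch below mirrors the documented types locally so it compiles without TiDB; the helper `encodeParam` is hypothetical, and whether the framework actually persists ModifyParam as JSON is an assumption inferred from the tags.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of the documented types.
type TaskState string
type ModificationType string

const ModifyConcurrency ModificationType = "modify_concurrency"

type Modification struct {
	Type ModificationType `json:"type"`
	To   int64            `json:"to"`
}

type ModifyParam struct {
	PrevState     TaskState      `json:"prev_state"`
	Modifications []Modification `json:"modifications"`
}

// encodeParam is a hypothetical helper that renders a ModifyParam as JSON.
func encodeParam(p ModifyParam) (string, error) {
	b, err := json.Marshal(p)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	p := ModifyParam{
		PrevState:     "running",
		Modifications: []Modification{{Type: ModifyConcurrency, To: 8}},
	}
	s, err := encodeParam(p)
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
	// prints {"prev_state":"running","modifications":[{"type":"modify_concurrency","to":8}]}
}
```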
type Step ¶
type Step int64
Step is the step of a task. DO NOT change the values of the constants; doing so breaks backward compatibility. A successful task MUST go from StepInit to the business steps, then to StepDone.
const (
	// ImportStepImport we sort source data and ingest it into TiKV in this step.
	ImportStepImport Step = 1
	// ImportStepPostProcess we verify checksum and add index in this step.
	ImportStepPostProcess Step = 2
	// ImportStepEncodeAndSort encode source data and write sorted kv into global storage.
	ImportStepEncodeAndSort Step = 3
	// ImportStepMergeSort merge sorted kv from global storage, so we can have better
	// read performance during ImportStepWriteAndIngest.
	// depending on how much the kv files overlap, there might be 0 subtasks
	// in this step.
	ImportStepMergeSort Step = 4
	// ImportStepWriteAndIngest write sorted kv into TiKV and ingest it.
	ImportStepWriteAndIngest Step = 5
)
Steps of IMPORT INTO, each step is represented by one or multiple subtasks. The initial step is StepInit(-1). Steps are processed in the following order:
- local sort: StepInit -> ImportStepImport -> ImportStepPostProcess -> StepDone
- global sort: StepInit -> ImportStepEncodeAndSort -> ImportStepMergeSort -> ImportStepWriteAndIngest -> ImportStepPostProcess -> StepDone
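The two IMPORT INTO orderings can be expressed as a small next-step function. This is only a sketch: `nextImportStep` and `importPath` are hypothetical names, the real scheduler may decide transitions differently, and the numeric value used for StepDone is an assumption because this page does not state it.

```go
package main

import "fmt"

type Step int64

const (
	StepInit Step = -1
	StepDone Step = -2 // assumption: the value is not given in this document

	ImportStepImport         Step = 1
	ImportStepPostProcess    Step = 2
	ImportStepEncodeAndSort  Step = 3
	ImportStepMergeSort      Step = 4
	ImportStepWriteAndIngest Step = 5
)

// nextImportStep is a hypothetical helper that follows the documented order
// for local sort (globalSort == false) or global sort (globalSort == true).
func nextImportStep(cur Step, globalSort bool) Step {
	switch cur {
	case StepInit:
		if globalSort {
			return ImportStepEncodeAndSort
		}
		return ImportStepImport
	case ImportStepImport:
		return ImportStepPostProcess
	case ImportStepEncodeAndSort:
		return ImportStepMergeSort
	case ImportStepMergeSort:
		return ImportStepWriteAndIngest
	case ImportStepWriteAndIngest:
		return ImportStepPostProcess
	default:
		// ImportStepPostProcess (and anything unknown) ends the task.
		return StepDone
	}
}

// importPath walks from StepInit to StepDone and records every step.
func importPath(globalSort bool) []Step {
	path := []Step{StepInit}
	for cur := StepInit; cur != StepDone; {
		cur = nextImportStep(cur, globalSort)
		path = append(path, cur)
	}
	return path
}

func main() {
	fmt.Println(importPath(false)) // prints [-1 1 2 -2]
	fmt.Println(importPath(true))  // prints [-1 3 4 5 2 -2]
}
```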
const (
	BackfillStepReadIndex Step = 1
	// BackfillStepMergeSort only used in global sort, it will merge sorted kv from global storage, so we can have better
	// read performance during BackfillStepWriteAndIngest with global sort.
	// depends on how much kv files are overlapped.
	// When kv files overlap less than MergeSortOverlapThreshold, there are no subtasks.
	BackfillStepMergeSort Step = 2
	// BackfillStepWriteAndIngest write sorted kv into TiKV and ingest it.
	BackfillStepWriteAndIngest Step = 3
)
Steps of Add Index, each step is represented by one or multiple subtasks. The initial step is StepInit(-1). Steps are processed in the following order:
- local sort: StepInit -> BackfillStepReadIndex -> StepDone
- global sort: StepInit -> BackfillStepReadIndex -> BackfillStepMergeSort -> BackfillStepWriteAndIngest -> StepDone
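For Add Index, the same ordering can be kept as data rather than control flow. The sketch below stores each sequence in a slice and looks up the successor; `nextBackfillStep` and the two slice variables are hypothetical, and the StepDone value is again an assumption.

```go
package main

import "fmt"

type Step int64

const (
	StepInit Step = -1
	StepDone Step = -2 // assumption: the value is not given in this document

	BackfillStepReadIndex      Step = 1
	BackfillStepMergeSort      Step = 2
	BackfillStepWriteAndIngest Step = 3
)

// Documented step sequences, one slice per sort mode.
var (
	localSortSteps  = []Step{StepInit, BackfillStepReadIndex, StepDone}
	globalSortSteps = []Step{
		StepInit, BackfillStepReadIndex, BackfillStepMergeSort,
		BackfillStepWriteAndIngest, StepDone,
	}
)

// nextBackfillStep returns the successor of cur in the chosen sequence.
// The bool is false when cur has no successor in that sequence.
func nextBackfillStep(cur Step, globalSort bool) (Step, bool) {
	seq := localSortSteps
	if globalSort {
		seq = globalSortSteps
	}
	for i := 0; i < len(seq)-1; i++ {
		if seq[i] == cur {
			return seq[i+1], true
		}
	}
	return cur, false
}

func main() {
	next, ok := nextBackfillStep(BackfillStepReadIndex, true)
	fmt.Println(next, ok) // prints "2 true"
	next, ok = nextBackfillStep(BackfillStepReadIndex, false)
	fmt.Println(next, ok) // prints "-2 true"
}
```

Keeping the sequences in slices makes it easy to see that BackfillStepMergeSort and BackfillStepWriteAndIngest never appear in the local sort path.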
type StepResource ¶
type StepResource struct {
	CPU *Allocatable
	Mem *Allocatable
}
StepResource is the maximum resource that a task step can use. It is also the maximum resource that a subtask can use, because the subtasks of a task step run in sequence.
func (*StepResource) String ¶
func (s *StepResource) String() string
String implements fmt.Stringer interface.
type Subtask ¶
type Subtask struct {
	SubtaskBase
	// UpdateTime is the time when the subtask is updated.
	// it can be used as subtask end time if the subtask is finished.
	// it's 0 if it hasn't started yet.
	UpdateTime time.Time
	// Meta is the metadata of subtask, should not be nil.
	// meta of different subtasks of same step must be different too.
	// NOTE: this field can be changed by StepExecutor.OnFinished method, to store
	// some result, and framework will update the subtask meta in the storage.
	// On other code path, this field should be read-only.
	Meta    []byte
	Summary string
}
Subtask represents a subtask of the distributed framework. Subtasks of a task run in parallel on different nodes, but on each node at most 1 subtask runs at a time; see also StepExecutor.
type SubtaskBase ¶
type SubtaskBase struct {
	ID   int64
	Step Step
	Type TaskType
	// taken from task_key of the subtask table
	TaskID int64
	State  SubtaskState
	// Concurrency is the concurrency of the subtask, should <= task's concurrency.
	// some subtasks like post-process of import into, don't consume too many resources,
	// can lower this value.
	// NOTE: currently it always equals task's concurrency, except the subtask is
	// done and task concurrency is modified.
	// NOTE: this field should normally be used for non-runtime purpose, when
	// allocating resource at runtime, use StepResource.CPU instead.
	Concurrency int
	// ExecID is the ID of target executor, right now it's the same as instance_id,
	// its value is IP:PORT, see GenerateExecID
	ExecID     string
	CreateTime time.Time
	// StartTime is the time when the subtask is started.
	// it's 0 if it hasn't started yet.
	StartTime time.Time
	// Ordinal is the ordinal of subtask, should be unique for some task and step.
	// starts from 1.
	Ordinal int
}
SubtaskBase contains the basic information of a subtask. It is defined separately to avoid loading the subtask meta, which might be very large, into memory.
func (*SubtaskBase) IsDone ¶
func (t *SubtaskBase) IsDone() bool
IsDone checks if the subtask is done.
func (*SubtaskBase) String ¶
func (t *SubtaskBase) String() string
type SubtaskState ¶
type SubtaskState string
SubtaskState is the state of subtask.
const (
	SubtaskStatePending  SubtaskState = "pending"
	SubtaskStateRunning  SubtaskState = "running"
	SubtaskStateSucceed  SubtaskState = "succeed"
	SubtaskStateFailed   SubtaskState = "failed"
	SubtaskStateCanceled SubtaskState = "canceled"
	SubtaskStatePaused   SubtaskState = "paused"
)
subtask state machine for normal subtask:
NOTE: `running` -> `pending` only happens when a node is considered dead, so its running subtask is rebalanced to another node. Because the subtask is idempotent, it is moved back to `pending` so that it can be scheduled on another node again. It is NOT a normal state transition.
               ┌──────────────┐
               │          ┌───┴──┐
               │ ┌───────►│paused│
               ▼ │        └──────┘
┌───────┐    ┌───┴───┐    ┌───────┐
│pending├───►│running├───►│succeed│
└───────┘    └┬──┬───┘    └───────┘
     ▲        │  │        ┌──────┐
     └────────┘  ├───────►│failed│
                 │        └──────┘
                 │        ┌────────┐
                 └───────►│canceled│
                          └────────┘
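The subtask state machine can be captured as a transition table. The sketch below encodes the edges as read from the diagram, including the abnormal `running` -> `pending` edge described in the NOTE; `subtaskTransitions` and `canMoveSubtask` are hypothetical names, not part of this package.

```go
package main

import "fmt"

type SubtaskState string

const (
	SubtaskStatePending  SubtaskState = "pending"
	SubtaskStateRunning  SubtaskState = "running"
	SubtaskStateSucceed  SubtaskState = "succeed"
	SubtaskStateFailed   SubtaskState = "failed"
	SubtaskStateCanceled SubtaskState = "canceled"
	SubtaskStatePaused   SubtaskState = "paused"
)

// subtaskTransitions lists the edges of the diagram (hypothetical table).
var subtaskTransitions = map[SubtaskState][]SubtaskState{
	SubtaskStatePending: {SubtaskStateRunning},
	SubtaskStateRunning: {
		SubtaskStateSucceed,
		SubtaskStateFailed,
		SubtaskStateCanceled,
		SubtaskStatePaused,
		SubtaskStatePending, // only when the node is considered dead
	},
	SubtaskStatePaused: {SubtaskStateRunning},
}

// canMoveSubtask reports whether the diagram has an edge from -> to.
func canMoveSubtask(from, to SubtaskState) bool {
	for _, next := range subtaskTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(canMoveSubtask(SubtaskStatePending, SubtaskStateRunning)) // prints "true"
	fmt.Println(canMoveSubtask(SubtaskStateSucceed, SubtaskStateRunning)) // prints "false"
}
```

States without an entry in the table (succeed, failed, canceled) are terminal in this sketch.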
func (SubtaskState) String ¶
func (s SubtaskState) String() string
type Task ¶
type Task struct {
	TaskBase
	// SchedulerID is not used now.
	SchedulerID     string
	StartTime       time.Time
	StateUpdateTime time.Time
	// Meta is the metadata of task, it's read-only in most cases, but it can be
	// changed in below case, and framework will update the task meta in the storage.
	// - task switches to next step in Scheduler.OnNextSubtasksBatch
	// - on task cleanup, we might do some redaction on the meta.
	// - on task 'modifying', params inside the meta can be changed.
	Meta        []byte
	Error       error
	ModifyParam ModifyParam
}
Task represents a task of the distributed framework. A task is abstracted as multiple steps that run in sequence; each step contains multiple subtasks that run in parallel, such as:
task
├── step1
│   ├── subtask1
│   ├── subtask2
│   └── subtask3
└── step2
    ├── subtask1
    ├── subtask2
    └── subtask3
Tasks are run in the order of rank, and the rank is defined by: priority asc, create_time asc, id asc.
type TaskBase ¶
type TaskBase struct {
	ID    int64
	Key   string
	Type  TaskType
	State TaskState
	Step  Step
	// Priority is the priority of task, the smaller value means the higher priority.
	// valid range is [1, 1024], default is NormalPriority.
	Priority int
	// Concurrency controls the max resource usage of the task, i.e. the max number
	// of slots the task can use on each node.
	Concurrency int
	// TargetScope indicates that the task should be running on tidb nodes which
	// contain the tidb_service_scope=TargetScope label.
	// To be compatible with previous versions, if it's "" or "background", the task tries to run on nodes of "background" scope;
	// if there are no such nodes, it tries nodes of "" scope.
	TargetScope string
	CreateTime  time.Time
}
TaskBase contains the basic information of a task. It is defined separately to avoid loading the task meta, which might be very large, into memory.
func (*TaskBase) Compare ¶
Compare compares two tasks by task rank. A return value < 0 means that the rank of t is higher than that of 'other'.
func (*TaskBase) CompareTask ¶
CompareTask is a wrapper of Compare.
type TaskState ¶
type TaskState string
TaskState is the state of task.
const (
	TaskStatePending    TaskState = "pending"
	TaskStateRunning    TaskState = "running"
	TaskStateSucceed    TaskState = "succeed"
	TaskStateFailed     TaskState = "failed"
	TaskStateReverting  TaskState = "reverting"
	TaskStateReverted   TaskState = "reverted"
	TaskStateCancelling TaskState = "cancelling"
	TaskStatePausing    TaskState = "pausing"
	TaskStatePaused     TaskState = "paused"
	TaskStateResuming   TaskState = "resuming"
	TaskStateModifying  TaskState = "modifying"
)
task state machine
Note: if a task fails during running, it will end with `reverted` state. The `failed` state is used to mean the framework cannot run the task, such as invalid task type, scheduler init error(fatal), etc.
normal execution state transition:
┌──────┐
│failed│
└──────┘
   ▲
┌──┴────┐     ┌───────┐     ┌────────┐
│pending├────►│running├────►│succeed │
└──┬────┘     └──┬┬───┘     └────────┘
   │             ││         ┌─────────┐     ┌────────┐
   │             │└────────►│reverting├────►│reverted│
   │             ▼          └─────────┘     └────────┘
   │          ┌──────────┐    ▲
   └─────────►│cancelling├────┘
              └──────────┘
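One way to check a recorded history of task states against the normal-execution diagram is to validate each consecutive pair of states. The sketch below does that with the edges as read from the diagram; `normalTransitions` and `validPath` are hypothetical helpers, and the pause/resume and modifying transitions are deliberately left out.

```go
package main

import "fmt"

type TaskState string

const (
	TaskStatePending    TaskState = "pending"
	TaskStateRunning    TaskState = "running"
	TaskStateSucceed    TaskState = "succeed"
	TaskStateFailed     TaskState = "failed"
	TaskStateReverting  TaskState = "reverting"
	TaskStateReverted   TaskState = "reverted"
	TaskStateCancelling TaskState = "cancelling"
)

// normalTransitions holds the edges of the normal execution diagram only.
var normalTransitions = map[TaskState][]TaskState{
	TaskStatePending:    {TaskStateRunning, TaskStateFailed, TaskStateCancelling},
	TaskStateRunning:    {TaskStateSucceed, TaskStateReverting, TaskStateCancelling},
	TaskStateCancelling: {TaskStateReverting},
	TaskStateReverting:  {TaskStateReverted},
}

// validPath reports whether every consecutive pair in path is an edge.
func validPath(path []TaskState) bool {
	for i := 0; i+1 < len(path); i++ {
		ok := false
		for _, next := range normalTransitions[path[i]] {
			if next == path[i+1] {
				ok = true
				break
			}
		}
		if !ok {
			return false
		}
	}
	return true
}

func main() {
	// A task that fails while running ends in `reverted`, not `failed`.
	fmt.Println(validPath([]TaskState{
		TaskStatePending, TaskStateRunning, TaskStateReverting, TaskStateReverted,
	})) // prints "true"
	fmt.Println(validPath([]TaskState{
		TaskStatePending, TaskStateRunning, TaskStateFailed,
	})) // prints "false"
}
```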
pause/resume state transition: because the state of the task before `paused` is not known, the state after `resuming` is always `running`.
┌───────┐
│pending├──┐
└───────┘  │     ┌───────┐       ┌──────┐
           ├────►│pausing├──────►│paused│
┌───────┐  │     └───────┘       └───┬──┘
│running├──┘                         │
└───▲───┘        ┌────────┐          │
    └────────────┤resuming│◄─────────┘
                 └────────┘
modifying state transition:
┌───────┐
│pending├──┐
└───────┘  │
┌───────┐  │     ┌─────────┐
│running├──┼────►│modifying├────► original state
└───────┘  │     └─────────┘
┌───────┐  │
│paused ├──┘
└───────┘
func (TaskState) CanMoveToModifying ¶
CanMoveToModifying checks if current state can move to 'modifying' state.