proto

package
v1.1.0-beta.0...-448e302 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2025 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// TaskIDLabelName is the label name of task id.
	TaskIDLabelName = "task_id"
	// NormalPriority represents the normal priority of task.
	NormalPriority = 512
)

Variables

View Source
var (
	// EmptyMeta is the empty meta of task/subtask.
	EmptyMeta = []byte("{}")
)
View Source
var MaxConcurrentTask = 16

MaxConcurrentTask is the max concurrency of task. TODO: remove this limit later.

Functions

func Step2Str

func Step2Str(t TaskType, s Step) string

Step2Str converts step to string. it's too bad that we define step as int 🙃.

func Type2Int

func Type2Int(t TaskType) int

Type2Int converts task type to int.

Types

type Allocatable

type Allocatable struct {
	// contains filtered or unexported fields
}

Allocatable is a resource with capacity that can be allocated, it's routine safe.

func NewAllocatable

func NewAllocatable(capacity int64) *Allocatable

NewAllocatable creates a new Allocatable.

func (*Allocatable) Alloc

func (a *Allocatable) Alloc(n int64) bool

Alloc allocates v from the Allocatable.

func (*Allocatable) Capacity

func (a *Allocatable) Capacity() int64

Capacity returns the capacity of the Allocatable.

func (*Allocatable) Free

func (a *Allocatable) Free(n int64)

Free frees v from the Allocatable.

func (*Allocatable) Used

func (a *Allocatable) Used() int64

Used returns the used resource of the Allocatable.

type ManagedNode

type ManagedNode struct {
	// ID see GenerateExecID, it's named as host in the meta table.
	ID string
	// Role of the node, either "" or "background"
	// all managed node should have the same role
	Role     string
	CPUCount int
}

ManagedNode is a TiDB node that is managed by the framework.

type Modification

type Modification struct {
	Type ModificationType `json:"type"`
	To   int64            `json:"to"`
}

Modification is one modification for task.

func (Modification) String

func (m Modification) String() string

String implements fmt.Stringer interface.

type ModificationType

type ModificationType string

ModificationType is the type of task modification.

const (
	// ModifyConcurrency is the type for modifying task concurrency.
	ModifyConcurrency ModificationType = "modify_concurrency"
)

func (ModificationType) String

func (t ModificationType) String() string

String implements fmt.Stringer interface.

type ModifyParam

type ModifyParam struct {
	PrevState     TaskState      `json:"prev_state"`
	Modifications []Modification `json:"modifications"`
}

ModifyParam is the parameter for task modification.

func (*ModifyParam) String

func (p *ModifyParam) String() string

String implements fmt.Stringer interface.

type Step

type Step int64

Step is the step of task.

const (
	StepInit Step = -1
	StepDone Step = -2
)

TaskStep is the step of task. DO NOT change the value of the constants, will break backward compatibility. successfully task MUST go from StepInit to business steps, then StepDone.

const (
	StepOne   Step = 1
	StepTwo   Step = 2
	StepThree Step = 3
)

Steps of example task type.

const (
	// ImportStepImport we sort source data and ingest it into TiKV in this step.
	ImportStepImport Step = 1
	// ImportStepPostProcess we verify checksum and add index in this step.
	ImportStepPostProcess Step = 2
	// ImportStepEncodeAndSort encode source data and write sorted kv into global storage.
	ImportStepEncodeAndSort Step = 3
	// ImportStepMergeSort merge sorted kv from global storage, so we can have better
	// read performance during ImportStepWriteAndIngest.
	// depends on how much kv files are overlapped, there's might 0 subtasks
	// in this step.
	ImportStepMergeSort Step = 4
	// ImportStepWriteAndIngest write sorted kv into TiKV and ingest it.
	ImportStepWriteAndIngest Step = 5
)

Steps of IMPORT INTO, each step is represented by one or multiple subtasks. the initial step is StepInit(-1) steps are processed in the following order: - local sort: StepInit -> ImportStepImport -> ImportStepPostProcess -> StepDone - global sort: StepInit -> ImportStepEncodeAndSort -> ImportStepMergeSort -> ImportStepWriteAndIngest -> ImportStepPostProcess -> StepDone

const (
	BackfillStepReadIndex Step = 1
	// BackfillStepMergeSort only used in global sort, it will merge sorted kv from global storage, so we can have better
	// read performance during BackfillStepWriteAndIngest with global sort.
	// depends on how much kv files are overlapped.
	// When kv files overlapped less than MergeSortOverlapThreshold, there‘re no subtasks.
	BackfillStepMergeSort Step = 2

	// BackfillStepWriteAndIngest write sorted kv into TiKV and ingest it.
	BackfillStepWriteAndIngest Step = 3
)

Steps of Add Index, each step is represented by one or multiple subtasks. the initial step is StepInit(-1) steps are processed in the following order: - local sort: StepInit -> BackfillStepReadIndex -> StepDone - global sort: StepInit -> BackfillStepReadIndex -> BackfillStepMergeSort -> BackfillStepWriteAndIngest -> StepDone

type StepResource

type StepResource struct {
	CPU *Allocatable
	Mem *Allocatable
}

StepResource is the max resource that a task step can use. it's also the max resource that a subtask can use, as we run subtasks of task step in sequence.

func (*StepResource) String

func (s *StepResource) String() string

String implements Stringer interface.

type Subtask

type Subtask struct {
	SubtaskBase
	// UpdateTime is the time when the subtask is updated.
	// it can be used as subtask end time if the subtask is finished.
	// it's 0 if it hasn't started yet.
	UpdateTime time.Time
	// Meta is the metadata of subtask, should not be nil.
	// meta of different subtasks of same step must be different too.
	// NOTE: this field can be changed by StepExecutor.OnFinished method, to store
	// some result, and framework will update the subtask meta in the storage.
	// On other code path, this field should be read-only.
	Meta    []byte
	Summary string
}

Subtask represents the subtask of distribute framework. subtasks of a task are run in parallel on different nodes, but on each node, at most 1 subtask can be run at the same time, see StepExecutor too.

func NewSubtask

func NewSubtask(step Step, taskID int64, tp TaskType, execID string, concurrency int, meta []byte, ordinal int) *Subtask

NewSubtask create a new subtask.

type SubtaskBase

type SubtaskBase struct {
	ID   int64
	Step Step
	Type TaskType
	// taken from task_key of the subtask table
	TaskID int64
	State  SubtaskState
	// Concurrency is the concurrency of the subtask, should <= task's concurrency.
	// some subtasks like post-process of import into, don't consume too many resources,
	// can lower this value.
	// NOTE: currently it always equals task's concurrency, except the subtask is
	// done and task concurrency is modified.
	// NOTE: this field should normally be used for non-runtime purpose, when
	// allocating resource at runtime, use StepResource.CPU instead.
	Concurrency int
	// ExecID is the ID of target executor, right now it's the same as instance_id,
	// its value is IP:PORT, see GenerateExecID
	ExecID     string
	CreateTime time.Time
	// StartTime is the time when the subtask is started.
	// it's 0 if it hasn't started yet.
	StartTime time.Time
	// Ordinal is the ordinal of subtask, should be unique for some task and step.
	// starts from 1.
	Ordinal int
}

SubtaskBase contains the basic information of a subtask. we define this to avoid load subtask meta which might be very large into memory.

func (*SubtaskBase) IsDone

func (t *SubtaskBase) IsDone() bool

IsDone checks if the subtask is done.

func (*SubtaskBase) String

func (t *SubtaskBase) String() string

type SubtaskState

type SubtaskState string

SubtaskState is the state of subtask.

const (
	SubtaskStatePending  SubtaskState = "pending"
	SubtaskStateRunning  SubtaskState = "running"
	SubtaskStateSucceed  SubtaskState = "succeed"
	SubtaskStateFailed   SubtaskState = "failed"
	SubtaskStateCanceled SubtaskState = "canceled"
	SubtaskStatePaused   SubtaskState = "paused"
)

subtask state machine for normal subtask:

NOTE: `running` -> `pending` only happens when some node is taken as dead, so its running subtask is balanced to other node, and the subtask is idempotent, we do this to make the subtask can be scheduled to other node again, it's NOT a normal state transition.

               ┌──────────────┐
               │          ┌───┴──┐
               │ ┌───────►│paused│
               ▼ │        └──────┘
┌───────┐    ┌───┴───┐    ┌───────┐
│pending├───►│running├───►│succeed│
└───────┘    └┬──┬───┘    └───────┘
     ▲        │  │        ┌──────┐
     └────────┘  ├───────►│failed│
                 │        └──────┘
                 │        ┌────────┐
                 └───────►│canceled│
                          └────────┘

func (SubtaskState) String

func (s SubtaskState) String() string

type Task

type Task struct {
	TaskBase
	// SchedulerID is not used now.
	SchedulerID     string
	StartTime       time.Time
	StateUpdateTime time.Time
	// Meta is the metadata of task, it's read-only in most cases, but it can be
	// changed in below case, and framework will update the task meta in the storage.
	// 	- task switches to next step in Scheduler.OnNextSubtasksBatch
	// 	- on task cleanup, we might do some redaction on the meta.
	// 	- on task 'modifying', params inside the meta can be changed.
	Meta        []byte
	Error       error
	ModifyParam ModifyParam
}

Task represents the task of distributed framework. A task is abstracted as multiple steps that runs in sequence, each step contains multiple sub-tasks that runs in parallel, such as:

task
├── step1
│   ├── subtask1
│   ├── subtask2
│   └── subtask3
└── step2
    ├── subtask1
    ├── subtask2
    └── subtask3

tasks are run in the order of rank, and the rank is defined by:

priority asc, create_time asc, id asc.

type TaskBase

type TaskBase struct {
	ID    int64
	Key   string
	Type  TaskType
	State TaskState
	Step  Step
	// Priority is the priority of task, the smaller value means the higher priority.
	// valid range is [1, 1024], default is NormalPriority.
	Priority int
	// Concurrency controls the max resource usage of the task, i.e. the max number
	// of slots the task can use on each node.
	Concurrency int
	// TargetScope indicates that the task should be running on tidb nodes which
	// contain the tidb_service_scope=TargetScope label.
	// To be compatible with previous version, if it's "" or "background", the task try run on nodes of "background" scope,
	// if there is no such nodes, will try nodes of "" scope.
	TargetScope string
	CreateTime  time.Time
}

TaskBase contains the basic information of a task. we define this to avoid load task meta which might be very large into memory.

func (*TaskBase) Compare

func (t *TaskBase) Compare(other *TaskBase) int

Compare compares two tasks by task rank. returns < 0 represents rank of t is higher than 'other'.

func (*TaskBase) CompareTask

func (t *TaskBase) CompareTask(other *Task) int

CompareTask a wrapper of Compare.

func (*TaskBase) IsDone

func (t *TaskBase) IsDone() bool

IsDone checks if the task is done.

func (*TaskBase) String

func (t *TaskBase) String() string

String implements fmt.Stringer interface.

type TaskState

type TaskState string

TaskState is the state of task.

const (
	TaskStatePending    TaskState = "pending"
	TaskStateRunning    TaskState = "running"
	TaskStateSucceed    TaskState = "succeed"
	TaskStateFailed     TaskState = "failed"
	TaskStateReverting  TaskState = "reverting"
	TaskStateReverted   TaskState = "reverted"
	TaskStateCancelling TaskState = "cancelling"
	TaskStatePausing    TaskState = "pausing"
	TaskStatePaused     TaskState = "paused"
	TaskStateResuming   TaskState = "resuming"
	TaskStateModifying  TaskState = "modifying"
)

task state machine

Note: if a task fails during running, it will end with `reverted` state. The `failed` state is used to mean the framework cannot run the task, such as invalid task type, scheduler init error(fatal), etc.

normal execution state transition:

┌──────┐
│failed│
└──────┘
   ▲
┌──┴────┐     ┌───────┐     ┌────────┐
│pending├────►│running├────►│succeed │
└──┬────┘     └──┬┬───┘     └────────┘
   │             ││         ┌─────────┐     ┌────────┐
   │             │└────────►│reverting├────►│reverted│
   │             ▼          └─────────┘     └────────┘
   │          ┌──────────┐    ▲
   └─────────►│cancelling├────┘
              └──────────┘

pause/resume state transition: as we don't know the state of the task before `paused`, so the state after `resuming` is always `running`.

┌───────┐
│pending├──┐
└───────┘  │     ┌───────┐       ┌──────┐
           ├────►│pausing├──────►│paused│
┌───────┐  │     └───────┘       └───┬──┘
│running├──┘                         │
└───▲───┘        ┌────────┐          │
    └────────────┤resuming│◄─────────┘
                 └────────┘

modifying state transition:

┌───────┐
│pending├──┐
└───────┘  │
┌───────┐  │     ┌─────────┐
│running├──┼────►│modifying├────► original state
└───────┘  │     └─────────┘
┌───────┐  │
│paused ├──┘
└───────┘

func (TaskState) CanMoveToModifying

func (s TaskState) CanMoveToModifying() bool

CanMoveToModifying checks if current state can move to 'modifying' state.

func (TaskState) String

func (s TaskState) String() string

type TaskType

type TaskType string

TaskType is the type of task.

const (
	// TaskTypeExample is TaskType of Example, it's for test.
	TaskTypeExample TaskType = "Example"
	// ImportInto is TaskType of ImportInto.
	ImportInto TaskType = "ImportInto"
	// Backfill is TaskType of add index Backfilling process.
	Backfill TaskType = "backfill"
)

func Int2Type

func Int2Type(i int) TaskType

Int2Type converts int to task type.

func (TaskType) String

func (t TaskType) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL