task

package
v5.0.0-release+incompa... Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 29, 2018 License: LGPL-3.0 Imports: 24 Imported by: 6

README

任务调度设计

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

Scheduler 调度器

func (*Scheduler) Next

func (s *Scheduler) Next() (*job.Job, error)

Next 下一个调度对象

func (*Scheduler) Stop

func (s *Scheduler) Stop()

Stop 停止

type TaskEngine

type TaskEngine struct {
	// contains filtered or unexported fields
}

TaskEngine 任务引擎 处理任务的执行,结果处理,任务自动调度 TODO:执行记录清理工作

func CreateTaskEngine

func CreateTaskEngine(nodeCluster *node.Cluster, node *nodeclient.HostNode) *TaskEngine

CreateTaskEngine 创建task管理引擎

func (*TaskEngine) AddGroupConfig

func (t *TaskEngine) AddGroupConfig(groupID string, configs map[string]string)

AddGroupConfig 添加组会话配置

func (*TaskEngine) AddTask

func (t *TaskEngine) AddTask(task *model.Task) error

AddTask 添加task 新添加task

func (*TaskEngine) CacheTask

func (t *TaskEngine) CacheTask(task *model.Task)

CacheTask 缓存task

func (*TaskEngine) DependRun

func (t *TaskEngine) DependRun(node string, task, depTask *model.Task, Strategy string) (bool, error)

DependRun 验证依赖任务执行情况

func (*TaskEngine) DependsRun

func (t *TaskEngine) DependsRun(node string, task *model.Task) (bool, error)

DependsRun DependRun

func (*TaskEngine) GetJob

func (t *TaskEngine) GetJob(hash string) *job.Job

GetJob 获取已经存在的job

func (*TaskEngine) GetTask

func (t *TaskEngine) GetTask(taskID string) *model.Task

GetTask gettask

func (*TaskEngine) GetTaskGroup

func (t *TaskEngine) GetTaskGroup(taskGroupID string) *model.TaskGroup

GetTaskGroup 获取taskgroup

func (*TaskEngine) GetValidationCriteria

func (t *TaskEngine) GetValidationCriteria(task *model.Task) (vas []ValidationCriteria)

GetValidationCriteria 获取调度必要条件

func (*TaskEngine) LoadStaticTask

func (t *TaskEngine) LoadStaticTask()

LoadStaticTask 从文件加载task TODO:动态加载

func (*TaskEngine) PutSchedul

func (t *TaskEngine) PutSchedul(taskID string, nodeID string) (err error)

PutSchedul 发布调度需求,即定义task的某个执行节点 taskID+nodeID = 一个调度单位,保证不重复 node不能为空

func (*TaskEngine) RemoveTask

func (t *TaskEngine) RemoveTask(task *model.Task)

RemoveTask 从缓存移除task

func (*TaskEngine) ScheduleGroup

func (t *TaskEngine) ScheduleGroup(nextGroups *model.TaskGroup, node string) error

ScheduleGroup 调度执行指定task

func (*TaskEngine) Start

func (t *TaskEngine) Start(errchan chan error) error

Start start

func (*TaskEngine) Stop

func (t *TaskEngine) Stop()

Stop task engine stop

func (*TaskEngine) StopTask

func (t *TaskEngine) StopTask(task *model.Task, node string)

StopTask 停止任务,即删除任务对应的JOB

func (*TaskEngine) UpdateGroup

func (t *TaskEngine) UpdateGroup(group *model.TaskGroup)

UpdateGroup 更新taskgroup

func (*TaskEngine) UpdateJob

func (t *TaskEngine) UpdateJob(jb *job.Job)

UpdateJob 持久化增加or更新job

func (*TaskEngine) UpdateJobConfig

func (t *TaskEngine) UpdateJobConfig(jb *job.Job, groupID string) error

UpdateJobConfig 更新job的配置 解析可赋值变量 ${XXX}

func (*TaskEngine) UpdateTask

func (t *TaskEngine) UpdateTask(task *model.Task)

UpdateTask 更新task

type ValidationCriteria

type ValidationCriteria func(string, *model.Task) (bool, error)

ValidationCriteria 在某个节点执行任务,行不行

var AllCouldRun ValidationCriteria = func(string, *model.Task) (bool, error) {
	return true, nil
}

AllCouldRun 可以执行

var ModeRun ValidationCriteria = func(node string, task *model.Task) (bool, error) {
	if task.RunMode == "OnlyOnce" {
		if status, ok := task.Status[node]; ok {
			if status.CompleStatus == "Success" {
				return false, fmt.Errorf("this job In violation of the task runmode")
			}
		}
	}
	return true, nil
}

ModeRun 验证任务执行策略

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL