Documentation
¶
Overview ¶
调度模块,负责从元数据库读取并解析调度信息。 将需要执行的任务发送给执行模块,并读取返回信息。
Index ¶
- func CheckErr(info string, err error)
- func Copy(copy_to interface{}, copy_from interface{}) (err error)
- func GetNow() time.Time
- func PrintErr(info string, err error)
- func Restore(batchId string, scdId int64) (err error)
- func TruncDate(cyc string, now time.Time) time.Time
- type ExecJob
- type ExecSchedule
- func (es *ExecSchedule) InitExecSchedule() (err error)
- func (s *ExecSchedule) Log() (err error)
- func (es *ExecSchedule) Pause()
- func (es *ExecSchedule) Run()
- func (es *ExecSchedule) RunTasks() (err error)
- func (es *ExecSchedule) Start() (err error)
- func (es *ExecSchedule) TaskDone(et *ExecTask) (finish bool, err error)
- type ExecTask
- type GlobalConfigStruct
- type Job
- type Reply
- type Schedule
- func (s *Schedule) Add() error
- func (s *Schedule) AddJob(job *Job) error
- func (s *Schedule) AddScheduleStart() error
- func (s *Schedule) AddTask(task *Task) error
- func (s *Schedule) Delete() error
- func (s *Schedule) DeleteJob(id int64) error
- func (s *Schedule) DeleteTask(id int64) error
- func (s *Schedule) GetJobById(id int64) (*Job, error)
- func (s *Schedule) GetTaskById(id int64) *Task
- func (s *Schedule) InitSchedule() error
- func (s *Schedule) Timer()
- func (s *Schedule) UpdateJob(job *Job) error
- func (s *Schedule) UpdateSchedule() error
- type ScheduleManager
- func (sl *ScheduleManager) AddExecSchedule(es *ExecSchedule)
- func (sl *ScheduleManager) AddSchedule(s *Schedule) error
- func (sl *ScheduleManager) DeleteSchedule(id int64) error
- func (sl *ScheduleManager) GetScheduleById(id int64) *Schedule
- func (sl *ScheduleManager) InitScheduleList()
- func (sl *ScheduleManager) RemoveExecSchedule(batchId string)
- func (sl *ScheduleManager) StartListener()
- func (sl *ScheduleManager) StartScheduleById(id int64) error
- type Task
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Copy ¶
func Copy(copy_to interface{}, copy_from interface{}) (err error)
Copy复制对象 来自github.com/jinzhu/copier
Types ¶
type ExecJob ¶
type ExecJob struct { // contains filtered or unexported fields } // }}}
作业执行信息结构
func ExecJobWarper ¶
根据传入的batchId和Job参数来构建一个调度的执行结构,并返回。
func (*ExecJob) InitExecJob ¶
func (ej *ExecJob) InitExecJob(es *ExecSchedule) (err error)
初始化作业执行链,并返回。
type ExecSchedule ¶
type ExecSchedule struct { // contains filtered or unexported fields } // }}}
调度执行信息结构
func ExecScheduleWarper ¶
func ExecScheduleWarper(s *Schedule) *ExecSchedule
根据传入的Schedule参数来构建一个调度的执行结构,并返回。
func (*ExecSchedule) InitExecSchedule ¶
func (es *ExecSchedule) InitExecSchedule() (err error)
初始化调度的执行结构,使之包含完整的执行链。
func (*ExecSchedule) Run ¶
func (es *ExecSchedule) Run()
ExecSchedule.Run()方法执行调度任务。 过程中会维护一个Chan *ExecTask类型变量staskChan,用来传递执行完成的Task。 通过遍历Schedule下的全部Task,找出可执行的Task(依赖列表为空的Task),启动线程执行task.Run 方法,并将staskChan传给它。当Task执行结束后会把自己放入staskChan中,处理的另一部分监控着 staskChan,从其中取出执行完毕的task后,会从其它任务的依赖列表中将已执行完毕的task删除, 并重新找出依赖列表为空的task,启动线程运行它的Run方法。 全部执行结束后,设置Schedule的下次启动时间。
type ExecTask ¶
type ExecTask struct { // contains filtered or unexported fields } // }}}
任务执行信息结构
func ExecTaskWarper ¶
根据传入的batchId和Job参数来构建一个调度的执行结构,并返回。
func (*ExecTask) InitExecTask ¶
func (et *ExecTask) InitExecTask(es *ExecSchedule) error
初始化Task执行结构
type GlobalConfigStruct ¶
type GlobalConfigStruct struct { L *logrus.Logger //log对象 HiveConn *sql.DB //元数据库链接 LogConn *sql.DB //日志数据库链接 ManagerPort string //管理模块的web服务端口 Port string //Schedule与Worker模块通信端口 Schedules *ScheduleManager //包含全部Schedule列表的结构 } // }}}
GlobalConfigStruct结构中定义了程序中的一些配置信息
type Job ¶
type Job struct { Id int64 //作业ID ScheduleId int64 //调度ID ScheduleCyc string //调度周期 Name string //作业名称 Desc string //作业说明 PreJobId int64 //上级作业ID PreJob *Job `json:"-"` //上级作业 NextJobId int64 //下级作业ID NextJob *Job `json:"-"` //下级作业 Tasks map[string]*Task //作业中的任务 TaskCnt int //调度中任务数量 CreateUserId int64 //创建人 CreateTime time.Time //创人 ModifyUserId int64 //修改人 ModifyTime time.Time //修改时间 } // }}}
作业信息结构
func (*Job) InitJob ¶
根据Job.Id初始化Job结构,从元数据库获取Job的基本信息初始化后 继续初始化Job所属的Task列表,同时递归调用自身,初始化下级Job结构 失败返回error信息。
func (*Job) InitTasksForJob ¶
初始化Job下的Tasks信息,从元数据库取到Job下所有的TaskId后 调用方法初始化Task并加至Job的Tasks成员中,同时也添加到全局Tasks列表 出错返回错误信息
func (*Job) UpdateTask ¶
UpdateTask更新Job中指定Task的信息。 它会根据参数查找本Job下符合的Task,找到后更新信息 并调用Task的add方法进行持久化操作。
type Schedule ¶
type Schedule struct { Id int64 //调度ID Name string //调度名称 Count int8 //调度次数 Cyc string //调度周期 StartSecond []time.Duration //启动时间 StartMonth []int //启动月份 NextStart time.Time //下次启动时间 TimeOut int64 //最大执行时间 JobId int64 //作业ID Job *Job //作业 Jobs []*Job //作业列表 Tasks []*Task `json:"-"` //任务列表 Desc string //调度说明 JobCnt int //调度中作业数量 TaskCnt int //调度中任务数量 CreateUserId int64 //创建人 CreateTime time.Time //创人 ModifyUserId int64 //修改人 ModifyTime time.Time //修改时间 // contains filtered or unexported fields } // }}}
调度信息结构
func (*Schedule) AddJob ¶
在调度中添加一个Job,AddJob会接收传入的Job类型的参数,并调用它的 Add()方法进行持久化操作。成功后把它添加到调度链中,添加时若调度 下无Job则将Job直接添加到调度中,否则添加到调度中的任务链末端。
func (*Schedule) AddScheduleStart ¶
addStart将Schedule的启动列表持久化到数据库 添加前先调用delStart方法将Schedule中的原有启动列表清空 需要注意的是:内存中的启动列表单位为纳秒,存储前需要转成秒 若成功则开始添加,失败返回err信息
func (*Schedule) DeleteJob ¶
DeleteJob删除调度中最后一个Job,它会接收传入的Job Id,并查看是否 调度中最后一个Job,是,检查Job下有无Task,无,则执行删除操作,完成 后,将该Job的前一个Job的nextJob指针置0,更新调度信息。 出错或不符条件则返回error信息
func (*Schedule) DeleteTask ¶
DeleteTask方法用来删除指定id的Task。首先会根据传入参数在Schedule的Tasks列 表中查出对应的Task。然后将其从Tasks列表中去除,将其从所属Job中去除,调用 Task的Delete方法删除Task的依赖关系,完成后删除元数据库的信息。 没找到对应Task或删除失败,返回error信息。
func (*Schedule) GetJobById ¶
GetJobById遍历Jobs列表,返回调度中指定Id的Job,若没找到返回nil
func (*Schedule) GetTaskById ¶
GetTaskById根据传入的id查找Tasks中对应的Task,没有则返回nil。
func (*Schedule) InitSchedule ¶
从元数据库初始化Schedule结构,先从元数据库获取Schedule的信息,完成后 根据其中的Jobid继续从元数据库读取job信息,并初始化。完成后继续初始化下级Job, 同时将初始化完成的Job和Task添加到Schedule的Jobs、Tasks成员中。
func (*Schedule) Timer ¶
func (s *Schedule) Timer()
按时启动Schedule,Timer中会根据Schedule的周期以及启动时间计算下次 启动的时间,并依据此设置一个定时器按时唤醒,Schedule唤醒后,会重新 从元数据库初始化一下信息,生成执行结构ExecSchedule,执行其Run方法
func (*Schedule) UpdateJob ¶
UpdateJob用来在调度中添加一个Job UpdateJob会接收传入的Job类型的参数,修改调度中对应的Job信息,完成后 调用Job自身的update方法进行持久化操作。
func (*Schedule) UpdateSchedule ¶
UpdateSchedule方法会将传入参数的信息更新到Schedule结构并持久化到数据库中 在持久化之前会调用addStart方法将启动列表持久化
type ScheduleManager ¶
type ScheduleManager struct { ScheduleList []*Schedule //全部的调度列表 ExecScheduleList map[string]*ExecSchedule //当前执行的调度列表 Global *GlobalConfigStruct //配置信息 } // }}}
ScheduleManager通过成员ScheduleList持有全部的Schedule。 并提供获取、增加、删除以及启动、停止Schedule的功能。
func (*ScheduleManager) AddExecSchedule ¶
func (sl *ScheduleManager) AddExecSchedule(es *ExecSchedule)
增加一个调度执行结构
func (*ScheduleManager) AddSchedule ¶
func (sl *ScheduleManager) AddSchedule(s *Schedule) error
增加Schedule,将参数中的Schedule加入的列表中,并调用其Add方法持久化。
func (*ScheduleManager) DeleteSchedule ¶
func (sl *ScheduleManager) DeleteSchedule(id int64) error
从当前ScheduleList列表中移除指定id的Schedule。 完成后,调用Schedule自身的Delete方法,删除其中的Job、Task信息并做持久化操作。 失败返回error信息
func (*ScheduleManager) GetScheduleById ¶
func (sl *ScheduleManager) GetScheduleById(id int64) *Schedule
查找当前ScheduleList列表中指定id的Schedule,并返回。 查不到返回nil
func (*ScheduleManager) InitScheduleList ¶
func (sl *ScheduleManager) InitScheduleList()
初始化ScheduleList,设置全局变量g
func (*ScheduleManager) RemoveExecSchedule ¶
func (sl *ScheduleManager) RemoveExecSchedule(batchId string)
移除一个调度执行结构
func (*ScheduleManager) StartListener ¶
func (sl *ScheduleManager) StartListener()
开始监听Schedule,遍历列表中的Schedule并启动它的Timer方法。
func (*ScheduleManager) StartScheduleById ¶
func (sl *ScheduleManager) StartScheduleById(id int64) error
启动指定的Schedule,从ScheduleList中获取到指定id的Schedule后,从元数据库获取 Schedule的信息初始化一下调度链,然后调用它自身的Timer方法,启动监听。 失败返回error信息。
type Task ¶
type Task struct { Id int64 // 任务的ID Address string // 任务的执行地址 Name string // 任务名称 TaskType int64 // 任务类型 ScheduleCyc string //调度周期 TaskCyc string //调度周期 StartSecond time.Duration //周期内启动时间 Cmd string // 任务执行的命令或脚本、函数名等。 Desc string //任务说明 TimeOut int64 // 设定超时时间,0表示不做超时限制。单位秒 Param []string // 任务的参数信息 Attr map[string]string // 任务的属性信息 JobId int64 //所属作业ID RelTasksId []int64 //依赖的任务Id RelTasks map[string]*Task //`json:"-"` //依赖的任务 RelTaskCnt int64 //依赖的任务数量 CreateUserId int64 //创建人 CreateTime time.Time //创人 ModifyUserId int64 //修改人 ModifyTime time.Time //修改时间 } // }}}
任务信息结构