Documentation ¶
Overview ¶
计划任务的执行
Index ¶
- type Executor
- func (executor *Executor) ExecuteJob(info *datamodels.JobExecuteInfo, c chan<- *datamodels.JobExecuteResult) (err error)
- func (executor *Executor) GetJobCategory(idOrName string) (category *datamodels.Category, err error)
- func (executor *Executor) PostCategoryToMaster(category *datamodels.Category) (*datamodels.Category, error)
- func (executor *Executor) PostJobExecuteResultToMaster(result *datamodels.JobExecuteResult) (*datamodels.JobExecuteResult, error)
- func (executor *Executor) PostJobExecuteToMaster(jobExecute *datamodels.JobExecute) (*datamodels.JobExecute, error)
- type JobExecuteLogFilter
- type JobLock
- type LogHandler
- type MongoLogHandler
- type Register
- type Scheduler
- func (scheduler *Scheduler) HandlerJobExecuteResult(result *datamodels.JobExecuteResult)
- func (scheduler *Scheduler) PushJobEvent(jobEvent *datamodels.JobEvent)
- func (scheduler *Scheduler) PushJobExecuteResult(result *datamodels.JobExecuteResult)
- func (scheduler *Scheduler) ScheduleLoop()
- func (scheduler *Scheduler) TryRunJob(jobPlan *datamodels.JobSchedulePlan) (err error)
- func (scheduler *Scheduler) TrySchedule() (scheduleAfter time.Duration)
- type Socket
- type SortLogByStartTime
- type WatchJobsHandler
- type WatchKillHandler
- type Worker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Executor ¶
// Executor is the job executor.
type Executor struct { }
任务执行器
func (*Executor) ExecuteJob ¶
func (executor *Executor) ExecuteJob(info *datamodels.JobExecuteInfo, c chan<- *datamodels.JobExecuteResult) (err error)
执行一个任务
func (*Executor) GetJobCategory ¶
func (executor *Executor) GetJobCategory(idOrName string) (category *datamodels.Category, err error)
获取计划任务的分类信息 URL:/api/v1/category/:name Method: GET
func (*Executor) PostCategoryToMaster ¶
func (executor *Executor) PostCategoryToMaster(category *datamodels.Category) (*datamodels.Category, error)
创建分类 URL:/api/v1/category/:name Method: GET
func (*Executor) PostJobExecuteResultToMaster ¶
func (executor *Executor) PostJobExecuteResultToMaster(result *datamodels.JobExecuteResult) (*datamodels.JobExecuteResult, error)
Post发送任务执行信息到Master URL:/api/v1/job/execute/create Method: POST Data: jobExecute
func (*Executor) PostJobExecuteToMaster ¶
func (executor *Executor) PostJobExecuteToMaster(jobExecute *datamodels.JobExecute) (*datamodels.JobExecute, error)
Post发送任务执行信息到Master URL:/api/v1/job/execute/create Method: POST Data: jobExecute
type JobExecuteLogFilter ¶
// JobExecuteLogFilter holds the criteria used to select job execution logs.
//
// The struct tag must use the conventional key:"value" form with no space
// after the colon. The previous spelling (a space between the colon and the
// quoted value) is not in that form, so reflection-based lookups of the
// "bson" key did not see it.
type JobExecuteLogFilter struct {
	Name string `bson:"name"` // name of the job
}
type JobLock ¶
type JobLock struct { ID int `json:"id"` // 锁请求的序号 Name string `json:"name"` // 锁的名称 LeaseID int64 `json:"lease_id"` // 锁对应的租约ID Password string `json:"password"` // 锁的密码 IsActive bool `json:"is_active"` // 锁是否有效中 NeedKillChan chan bool // 释放本程序的通道:当timer到期了,还未发起续租,那么就需要kill,jobLock对应的任务 // contains filtered or unexported fields }
计划任务的锁:通过http连接发起TryLock,通过http发起续租,通过http发起release释放锁。
func (*JobLock) ReleaseLock ¶
释放锁。释放租约应该也需要传递密钥,后续优化,这里暂时只传递锁的名字。
type LogHandler ¶
// LogHandler is the interface for log handling.
type LogHandler interface {
	ConsumeLogsLoop()                                  // log-consuming loop
	AddLog(executeLog *datamodels.JobExecuteLog) error // adds a log entry
	Stop()                                             // work performed when the log handler stops
}
日志处理的接口
type MongoLogHandler ¶
type MongoLogHandler struct { Duration int // 刷新日志的间隔(毫秒) // contains filtered or unexported fields }
日志处理器--mongo
func NewMongoLogHandler ¶
func NewMongoLogHandler(mongoConfig *common.MongoConfig) (logHandler *MongoLogHandler, err error)
func (*MongoLogHandler) AddLog ¶
func (logHandler *MongoLogHandler) AddLog(executeLog *datamodels.JobExecuteLog) (err error)
保存日志操作
func (*MongoLogHandler) ConsumeLogsLoop ¶
func (logHandler *MongoLogHandler) ConsumeLogsLoop()
消费日志循环
func (*MongoLogHandler) List ¶
func (logHandler *MongoLogHandler) List(page int, pageSize int) (logList []*datamodels.JobExecuteLog, err error)
获取日志的列表
func (*MongoLogHandler) Stop ¶
func (logHandler *MongoLogHandler) Stop()
日志处理器停止时的操作:停止的时候,需要把日志全部写入;当worker需要停止的时候,需要调用此方法。
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
任务调度器
func (*Scheduler) HandlerJobExecuteResult ¶
func (scheduler *Scheduler) HandlerJobExecuteResult(result *datamodels.JobExecuteResult)
处理计划任务的结果
func (*Scheduler) PushJobEvent ¶
func (scheduler *Scheduler) PushJobEvent(jobEvent *datamodels.JobEvent)
推送任务变化事件
func (*Scheduler) PushJobExecuteResult ¶
func (scheduler *Scheduler) PushJobExecuteResult(result *datamodels.JobExecuteResult)
回传任务执行结果
func (*Scheduler) TryRunJob ¶
func (scheduler *Scheduler) TryRunJob(jobPlan *datamodels.JobSchedulePlan) (err error)
执行计划任务
func (*Scheduler) TrySchedule ¶
计算任务调度状态:会尝试执行需要执行的计划任务,并计算jobPlan的下次执行时间;然后计算now与所有jobPlan中最近的下次执行时间的间隔,当间隔大于1分钟的时候,将其设置为1分钟。
type Socket ¶
type Socket struct { IsActive bool // 是否有效,断开的时候设置为false // contains filtered or unexported fields }
func (*Socket) SendMessage ¶
发送消息 messageType: 消息类型 data []byte: 发送的消息内容 needPacket bool: 是否需要封装一下包,有时候可自行封装
func (*Socket) SendMessageEventToMaster ¶
发送消息 messageType: 消息类型 data []byte: 发送的消息内容 needPacket bool: 是否需要封装一下包,有时候可自行封装
type SortLogByStartTime ¶
// SortLogByStartTime describes ordering of logs by their start time.
//
// The struct tag must use the conventional key:"value" form with no space
// after the colon. The previous spelling (a space between the colon and the
// quoted value) is not in that form, so reflection-based lookups of the
// "bson" key did not see the intended camelCase name "startTime".
type SortLogByStartTime struct {
	// StartTime is the sort-by-start-time setting.
	// NOTE(review): presumably a sort direction such as 1 or -1 — confirm against callers.
	StartTime int `bson:"startTime"`
}
type WatchJobsHandler ¶
监听etcd中jobs的变化
func (*WatchJobsHandler) HandlerGetResponse ¶
func (watch *WatchJobsHandler) HandlerGetResponse(response *clientv3.GetResponse)
func (*WatchJobsHandler) HandlerWatchChan ¶
func (watch *WatchJobsHandler) HandlerWatchChan(watchChan clientv3.WatchChan)
处理watch
type WatchKillHandler ¶
func (*WatchKillHandler) HandlerGetResponse ¶
func (watch *WatchKillHandler) HandlerGetResponse(response *clientv3.GetResponse)
func (*WatchKillHandler) HandlerWatchChan ¶
func (watch *WatchKillHandler) HandlerWatchChan(watchChan clientv3.WatchChan)