fSchedule

package module
v0.14.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 11, 2024 License: MIT Imports: 27 Imported by: 0

README

FSchedule 分布式调度中心客户端

包:"github.com/farseer-go/fSchedule"

模块:fSchedule.Module

codecov Build

概述

FSchedule是一款跨语言分布式的调度中心

  • 高可用(HA):多实例的job客户端。同一个任务、同一个job实例只会被调度一次
  • 快速搭建:服务端可运行于docker或k8s下,1分钟即可把服务部署到您的生产环境中
  • 轻量级:低内存(没有客户端连接的时候130m,有任务的时候250m)、低CPU消耗,依赖少。
  • 动态执行:可定时、间隔时间、Cron、或由业务方job动态设定下次执行时间。
  • 快速上手:Farseer.Net.Job(.NET CORE)、farseer-go/fSchedule(golang),可以快速实现一个job
  • 可视化:使用FOPS,可以维护任务组,查看任务进度、耗时、日志。

FSchedule服务端:

?> 这个是FSchedule服务端的客户端组件,使得接入服务端变得非常简单。

包:"github.com/farseer-go/fSchedule"

模块:fSchedule.Module

定义任务

func job1(jobContext *fSchedule.JobContext) bool {
    return true
}

入参fSchedule.JobContext 任务上下文,包含任务的相关信息,及控制任务下一次执行时间

出参true:本次任务执行成功。false:执行失败(失败后,服务端会立即重新调度)。

添加一个任务

// isEnable:任务是否开启
// name:任务组名称(英文)
// caption:任务组标题(任务说明)
// ver:任务组版本,初始版本号必须为1
// cron:任务计划时间(间隔时间)
// startAt:任务开始时间,单位:时间戳,秒(在这个时间之后才会开始)
// job:任务执行的函数本体
func AddJob(isEnable bool, name, caption string, ver int, cron string, startAt int64, job JobFunc)

示例:

fSchedule.AddJob(true, "Hello"+strconv.Itoa(i), "测试HelloJob"+strconv.Itoa(i), 1, "0/1 * * * * ?", 1674571566, job1)

fSchedule.AddJob向服务端注册任务。需要放到模块中PostInitialize方法执行

任务组版本说明,如果需要向服务端修改任务组属性,则要在原版本号的基础下+1,否则无效。

如果原来服务端版本为3,本次想修改caption,则应将ver改为4。这时服务端才会修改,否则忽略。

完整示例


import (
	"github.com/farseer-go/fSchedule"
	"github.com/farseer-go/fs"
	"github.com/farseer-go/fs/modules"
	"strconv"
	"testing"
	"time"
)

// 启动模块
type startupModule struct {}
func (module startupModule) DependsModule() []modules.FarseerModule {
	return []modules.FarseerModule{fSchedule.Module{}}
}
func (module startupModule) PreInitialize() {}
func (module startupModule) Initialize() {}
func (module startupModule) PostInitialize() {
	// 在这里注册任务
    fSchedule.AddJob(true, "Hello1", "测试HelloJob1", 1, "0/1 * * * * ?", 1674571566, job1)
}
func (module startupModule) Shutdown() {}

// 主函数
func main() {
	fs.Initialize[startupModule]("test fSchedule")
	time.Sleep(300000 * time.Second)
}

// 执行任务
func job1(jobContext *fSchedule.JobContext) bool {
	// db.delete()... 比如清理日志数据
    return true
}

Documentation

Overview

@area /api/

该文件由fsctl route命令自动生成,请不要手动修改此文件

Index

Constants

This section is empty.

Variables

View Source
var StandardParser = cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)

Functions

func AddJob

func AddJob(isEnable bool, name, caption string, ver int, cronString string, job JobFunc, ops ...options)

AddJob 客户端支持的任务

func GetClient

func GetClient() *clientVO

func Kill

func Kill(taskId int64)

Kill 终止任务 @post kill

func NewClient

func NewClient()

func RegistryJob added in v0.13.0

func RegistryJob()

RegistryJob 定时10分钟,注册一次任务,防止掉线

Types

type ClientJob

type ClientJob struct {
	Name     string                                 // 任务名称
	Ver      int                                    // 任务版本
	Caption  string                                 // 任务标题
	Cron     string                                 // 任务执行表达式
	StartAt  int64                                  // 任务开始时间(时间戳秒)
	IsEnable bool                                   // 任务是否启用
	Data     collections.Dictionary[string, string] // 第一次注册时使用
	// contains filtered or unexported fields
}

func (*ClientJob) IsNil added in v0.13.0

func (receiver *ClientJob) IsNil() bool

type Job

type Job struct {
	ClientJob ClientJob
	// contains filtered or unexported fields
}

func (*Job) Run

func (receiver *Job) Run()

type JobContext

type JobContext struct {
	Id      int64                                  // 主键
	Ver     int                                    // 任务版本
	Name    string                                 // 实现Job的特性名称(客户端识别哪个实现类)
	Caption string                                 // 任务标题
	Data    collections.Dictionary[string, string] // 数据

	StartAt time.Time // 任务开始时间

	Ctx context.Context // 客户端执行时,需要检查ctx是否被Cancel
	// contains filtered or unexported fields
}

JobContext 任务在运行时,更新状态

func (*JobContext) Critical added in v0.8.0

func (receiver *JobContext) Critical(contents ...any)

Critical 打印Critical日志

func (*JobContext) Criticalf added in v0.8.0

func (receiver *JobContext) Criticalf(format string, a ...any)

Criticalf 打印Critical日志

func (*JobContext) Debug added in v0.8.0

func (receiver *JobContext) Debug(contents ...any)

Debug 打印Debug日志

func (*JobContext) Debugf added in v0.8.0

func (receiver *JobContext) Debugf(format string, a ...any)

Debugf 打印Debug日志

func (*JobContext) Error added in v0.8.0

func (receiver *JobContext) Error(contents ...any)

Error 打印Error日志

func (*JobContext) Errorf added in v0.8.0

func (receiver *JobContext) Errorf(format string, a ...any)

Errorf 打印Error日志

func (*JobContext) Info added in v0.8.0

func (receiver *JobContext) Info(contents ...any)

Info 打印Info日志

func (*JobContext) Infof added in v0.8.0

func (receiver *JobContext) Infof(format string, a ...any)

Infof 打印Info日志

func (*JobContext) Remark added in v0.14.0

func (receiver *JobContext) Remark(format string, a ...any)

Remark 报告失败原因

func (*JobContext) SetNextAt

func (receiver *JobContext) SetNextAt(t time.Time)

SetNextAt 设置下次运行时间

func (*JobContext) SetProgress

func (receiver *JobContext) SetProgress(progress int)

SetProgress 设置任务进度

func (*JobContext) Trace added in v0.8.0

func (receiver *JobContext) Trace(contents ...any)

Trace 打印Trace日志

func (*JobContext) Tracef added in v0.8.0

func (receiver *JobContext) Tracef(format string, a ...any)

Tracef 打印Trace日志

func (*JobContext) Warning added in v0.8.0

func (receiver *JobContext) Warning(contents ...any)

Warning 打印Warning日志

func (*JobContext) Warningf added in v0.8.0

func (receiver *JobContext) Warningf(format string, a ...any)

Warningf 打印Warning日志

type JobFunc

type JobFunc func(jobContext *JobContext) bool

JobFunc 客户端要执行的JOB

type Module

type Module struct {
}

func (Module) DependsModule

func (module Module) DependsModule() []modules.FarseerModule

func (Module) PostInitialize

func (module Module) PostInitialize()

func (Module) PreInitialize

func (module Module) PreInitialize()

func (Module) Shutdown

func (module Module) Shutdown()

type Option added in v0.9.0

type Option struct {
	StartAt int64                                  // 任务开始时间(时间戳秒)
	Data    collections.Dictionary[string, string] // 第一次注册时使用
}

type RegistryResponse added in v0.9.0

type RegistryResponse struct {
	ClientIp   string // 客户端IP
	ClientPort int    // 客户端端口
}

type ResourceVO

type ResourceVO struct {
	QueueCount    int     // 排队中的任务数量
	WorkCount     int     // 正在处理的任务数量
	CpuUsage      float64 // CPU百分比
	MemoryUsage   float64 // 内存百分比
	AllowSchedule bool    // 是否允许调度
}

ResourceVO 客户端资源情况

func Check

func Check(clientId int64) ResourceVO

Check 检查客户端存活 @post check

func Invoke

func Invoke(task TaskEO) ResourceVO

Invoke 下发任务 @post invoke

type TaskEO

type TaskEO struct {
	Id      int64                                  // 主键
	Caption string                                 // 任务组标题
	Name    string                                 // 实现Job的特性名称(客户端识别哪个实现类)
	StartAt time.Time                              // 开始时间
	Data    collections.Dictionary[string, string] // 本次执行任务时的Data数据
}

TaskEO 任务记录

type TaskReportDTO

type TaskReportDTO struct {
	Id           int64                                  // 主键
	Ver          int                                    // 任务版本
	Name         string                                 // 实现Job的特性名称(客户端识别哪个实现类)
	Data         collections.Dictionary[string, string] // 数据
	NextTimespan int64                                  // 下次执行时间
	Progress     int                                    // 当前进度
	Status       executeStatus.Enum                     // 执行状态
	FailRemark   string                                 // 失败原因
	ResourceVO
}

func Status

func Status(TaskId int64) TaskReportDTO

Status 查询任务状态 @post status

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL