fSchedule

package module
v0.11.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 25, 2023 License: MIT Imports: 22 Imported by: 0

README

FSchedule 分布式调度中心客户端

包:"github.com/farseer-go/fSchedule"

模块:fSchedule.Module

codecov Build

概述

FSchedule是一款跨语言分布式的调度中心

  • 高可用(HA):多实例的job客户端。同一个任务、同一个job实例只会被调度一次
  • 快速搭建:服务端可运行于docker或k8s下,1分钟即可把服务部署到您的生产环境中
  • 轻量级:低内存(没有客户端连接的时候130m,有任务的时候250m)、低CPU消耗,依赖少。
  • 动态执行:可定时、间隔时间、Cron、或由业务方job动态设定下次执行时间。
  • 快速上手:Farseer.Net.Job(.NET CORE)、farseer-go/fSchedule(golang),可以快速实现一个job
  • 可视化:使用FOPS,可以维护任务组,查看任务进度、耗时、日志。

FSchedule服务端:

?> 这个是FSchedule服务端的客户端组件,使得接入服务端变得非常简单。

包:"github.com/farseer-go/fSchedule"

模块:fSchedule.Module

定义任务

func job1(jobContext *fSchedule.JobContext) bool {
    return true
}

入参fSchedule.JobContext 任务上下文,包含任务的相关信息,及控制任务下一次执行时间

出参true:本次任务执行成功。false:执行失败(失败后,服务端会立即重新调度)。

添加一个任务

// isEnable:任务是否开启
// name:任务组名称(英文)
// caption:任务组标题(任务说明)
// ver:任务组版本,初始版本号必须为1
// cron:任务计划时间(间隔时间)
// startAt:任务开始时间,单位:时间戳,秒(在这个时间之后才会开始)
// job:任务执行的函数本体
func AddJob(isEnable bool, name, caption string, ver int, cron string, startAt int64, job JobFunc)

示例:

fSchedule.AddJob(true, "Hello"+strconv.Itoa(i), "测试HelloJob"+strconv.Itoa(i), 1, "0/1 * * * * ?", 1674571566, job1)

fSchedule.AddJob向服务端注册任务。需要放到模块中PostInitialize方法执行

任务组版本说明,如果需要向服务端修改任务组属性,则要在原版本号的基础下+1,否则无效。

如果原来服务端版本为3,本次想修改caption,则应将ver改为4。这时服务端才会修改,否则忽略。

完整示例


import (
	"github.com/farseer-go/fSchedule"
	"github.com/farseer-go/fs"
	"github.com/farseer-go/fs/modules"
	"strconv"
	"testing"
	"time"
)

// 启动模块
type startupModule struct {}
func (module startupModule) DependsModule() []modules.FarseerModule {
	return []modules.FarseerModule{fSchedule.Module{}}
}
func (module startupModule) PreInitialize() {}
func (module startupModule) Initialize() {}
func (module startupModule) PostInitialize() {
	// 在这里注册任务
    fSchedule.AddJob(true, "Hello1", "测试HelloJob1", 1, "0/1 * * * * ?", 1674571566, job1)
}
func (module startupModule) Shutdown() {}

// 主函数
func main() {
	fs.Initialize[startupModule]("test fSchedule")
	time.Sleep(300000 * time.Second)
}

// 执行任务
func job1(jobContext *fSchedule.JobContext) bool {
	// db.delete()... 比如清理日志数据
    return true
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddJob

func AddJob(isEnable bool, name, caption string, ver int, cron string, job JobFunc, ops ...options)

AddJob 客户端支持的任务

func GetClient

func GetClient() *clientVO

func Kill

func Kill(TaskId int64)

Kill 终止任务

func NewClient

func NewClient()

Types

type ClientJob

type ClientJob struct {
	Name     string                                 // 任务名称
	Ver      int                                    // 任务版本
	Caption  string                                 // 任务标题
	Cron     string                                 // 任务执行表达式
	StartAt  int64                                  // 任务开始时间(时间戳秒)
	IsEnable bool                                   // 任务是否启用
	Data     collections.Dictionary[string, string] // 第一次注册时使用
	// contains filtered or unexported fields
}

type Job

type Job struct {
	ClientJob ClientJob
	// contains filtered or unexported fields
}

func (*Job) Run

func (receiver *Job) Run()

type JobContext

type JobContext struct {
	Id          int64                                  // 主键
	TaskGroupId int64                                  // 任务组ID
	Ver         int                                    // 任务版本
	Name        string                                 // 实现Job的特性名称(客户端识别哪个实现类)
	Data        collections.Dictionary[string, string] // 数据

	StartAt time.Time // 任务开始时间
	// contains filtered or unexported fields
}

JobContext 任务在运行时,更新状态

func (*JobContext) Critical added in v0.8.0

func (receiver *JobContext) Critical(contents ...any)

Critical 打印Critical日志

func (*JobContext) Criticalf added in v0.8.0

func (receiver *JobContext) Criticalf(format string, a ...any)

Criticalf 打印Critical日志

func (*JobContext) Debug added in v0.8.0

func (receiver *JobContext) Debug(contents ...any)

Debug 打印Debug日志

func (*JobContext) Debugf added in v0.8.0

func (receiver *JobContext) Debugf(format string, a ...any)

Debugf 打印Debug日志

func (*JobContext) Error added in v0.8.0

func (receiver *JobContext) Error(contents ...any)

Error 打印Error日志

func (*JobContext) Errorf added in v0.8.0

func (receiver *JobContext) Errorf(format string, a ...any) error

Errorf 打印Error日志

func (*JobContext) Info added in v0.8.0

func (receiver *JobContext) Info(contents ...any)

Info 打印Info日志

func (*JobContext) Infof added in v0.8.0

func (receiver *JobContext) Infof(format string, a ...any)

Infof 打印Info日志

func (*JobContext) SetNextAt

func (receiver *JobContext) SetNextAt(t time.Time)

SetNextAt 设置下次运行时间

func (*JobContext) SetProgress

func (receiver *JobContext) SetProgress(progress int)

SetProgress 设置任务进度

func (*JobContext) Trace added in v0.8.0

func (receiver *JobContext) Trace(content ...any)

Trace 打印Trace日志

func (*JobContext) Tracef added in v0.8.0

func (receiver *JobContext) Tracef(format string, a ...any)

Tracef 打印Trace日志

func (*JobContext) Warning added in v0.8.0

func (receiver *JobContext) Warning(contents ...any)

Warning 打印Warning日志

func (*JobContext) Warningf added in v0.8.0

func (receiver *JobContext) Warningf(format string, a ...any)

Warningf 打印Warning日志

type JobFunc

type JobFunc func(jobContext *JobContext) bool

JobFunc 客户端要执行的JOB

type Module

type Module struct {
}

func (Module) DependsModule

func (module Module) DependsModule() []modules.FarseerModule

func (Module) PostInitialize

func (module Module) PostInitialize()

func (Module) PreInitialize

func (module Module) PreInitialize()

func (Module) Shutdown

func (module Module) Shutdown()

type Option added in v0.9.0

type Option struct {
	StartAt int64                                  // 任务开始时间(时间戳秒)
	Data    collections.Dictionary[string, string] // 第一次注册时使用
}

type RegistryResponse added in v0.9.0

type RegistryResponse struct {
	ClientIp   string // 客户端IP
	ClientPort int    // 客户端端口
}

type ResourceVO

type ResourceVO struct {
	QueueCount    int     // 排队中的任务数量
	WorkCount     int     // 正在处理的任务数量
	CpuUsage      float64 // CPU百分比
	MemoryUsage   float64 // 内存百分比
	AllowSchedule bool    // 是否允许调度
}

ResourceVO 客户端资源情况

func Check

func Check(clientId int64) ResourceVO

Check 检查客户端存活

func Invoke

func Invoke(task TaskEO) ResourceVO

Invoke 下发任务

type TaskEO

type TaskEO struct {
	Id          int64                                  // 主键
	Caption     string                                 // 任务组标题
	TaskGroupId int64                                  // 任务组ID
	Name        string                                 // 实现Job的特性名称(客户端识别哪个实现类)
	StartAt     time.Time                              // 开始时间
	Data        collections.Dictionary[string, string] // 本次执行任务时的Data数据
}

TaskEO 任务记录

type TaskReportDTO

type TaskReportDTO struct {
	Id           int64                                  // 主键
	TaskGroupId  int64                                  // 任务组ID
	Ver          int                                    // 任务版本
	Name         string                                 // 实现Job的特性名称(客户端识别哪个实现类)
	Data         collections.Dictionary[string, string] // 数据
	NextTimespan int64                                  // 下次执行时间
	Progress     int                                    // 当前进度
	Status       TaskStatus                             // 执行状态
	RunSpeed     int64                                  // 执行速度
}

func Status

func Status(TaskId int64) TaskReportDTO

Status 查询任务状态

type TaskStatus

type TaskStatus int
const (
	None         TaskStatus = iota //  未开始
	Scheduling                     //  调度中
	ScheduleFail                   //  调度失败
	Working                        //  执行中
	Fail                           //  成功
	Success                        //  完成
)

func (TaskStatus) String

func (receiver TaskStatus) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL