gs

package module
v1.0.0-release Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 23, 2024 License: MIT Imports: 33 Imported by: 0

README

go-scheduler-sdk

很多公司java与go开发共存,java中有go-scheduler做为任务调度引擎,为此也出现了go执行器(客户端),使用起来比较简单:

支持

1.执行器注册
2.耗时任务取消
3.任务注册,像写http.Handler一样方便
4.任务panic处理
5.阻塞策略处理
6.任务完成支持返回执行备注
7.任务超时取消 (单位:秒,0为不限制)
8.失败重试次数(在参数param中,目前由任务自行处理)
9.可自定义日志
10.自定义日志查看handler
11.支持外部路由(可与gin集成)

Example

package main

import (
	"context"
	"fmt"
	gs "github.com/liuhailove/go-scheduler-sdk"
	"github.com/liuhailove/go-scheduler-sdk/example/task"
	"log"
)

func main() {
	exec := gs.NewExecutor(
		gs.ServerAddr("http://localhost:8082/gs-job-admin"),
		gs.AccessToken(""),                 //请求令牌(默认为空)
		gs.RegistryKey("my-golang-jobs-2"), //执行器名称
		gs.SetLogger(&logger{}),            //自定义日志
		gs.SetSync(true),
		gs.SetServerMode(gs.FIX_SERVER_MODE),
		gs.SetCollapse(true),
		gs.SetMaxConcurrencyNum(500),
		gs.WithTaskWrapper(func(taskFunc gs.TaskFunc) gs.TaskFunc {
			return func(ctx context.Context, param *gs.RunReq) ([]string, error) {
				ctx = context.WithValue(ctx, "firstKey", "firstKey")
				return taskFunc(ctx, param)
			}
		}),
		gs.WithTaskWrapper(func(taskFunc gs.TaskFunc) gs.TaskFunc {
			return func(ctx context.Context, param *gs.RunReq) ([]string, error) {
				fmt.Println(ctx.Value("firstKey"))
				ctx = context.WithValue(ctx, "secondKey", "second param ctx")
				return taskFunc(ctx, param)
			}
		}),
	)
	exec.Init()
	//设置日志查看handler
	exec.LogHandler(func(req *gs.LogReq) *gs.LogRes {
		return &gs.LogRes{Code: 200, Msg: "", Content: gs.LogResContent{
			FromLineNum: req.FromLineNum,
			ToLineNum:   2,
			LogContent:  "这个是自定义日志handler",
			IsEnd:       true,
		}}
	})
	// 注册任务handler
	exec.RegTask("task.test", task.Test)
	exec.RegTask("task.test2", task.Test2)
	exec.RegTask("task.test3", task.Test3)
	exec.RegTask("task.testP", task.TestP)
	exec.RegTask("task.panic", task.Panic)
	exec.RegTask("task.dbShardingMapReduceTaskFunc", task.DbShardingMapReduceTaskFunc)
	exec.RegTask("task.largeShardingMapReduceTaskFunc", task.LargeShardingMapReduceTaskFunc)
	exec.RegTask("task.tableShardingMapReduceTaskFunc", task.TableShardingMapReduceTaskFunc)
	exec.RegTask("task.rowShardingMapReduceTaskFunc", task.RowShardingMapReduceTaskFunc)
	exec.RegTask("task.rowSumShardingMapReduceTaskFunc", task.RowSumShardingMapReduceTaskFunc)
	exec.RegTask("task.rowReduceShardingMapReduceTaskFunc", task.RowReduceShardingMapReduceTaskFunc)
	exec.RegTask("TaskBillGenerate", task.TaskBillGenerate)
	exec.RegTask("DisburseSubTask", task.TaskBillGenerate)

	// 测试自定义串行
	exec.RegTask("task.CustomerShardingTaskFunc", task.CustomerShardingTaskFunc)
	exec.RegTask("task.CustomerPrintTaskFunc", task.CustomerPrintTaskFunc)

	// core test
	exec.RegTask("task.BillGenerateTask", task.BillGenerateTask)
	exec.RegTask("task.BillingFileTask", task.BillingFileTask)
	exec.RegTask("task.DpdnPlanTask", task.DpdnPlanTask)
	exec.RegTask("task.OverdueFileTask", task.OverdueFileTask)
	exec.RegTask("task.OverduePlanFileTask", task.OverduePlanFileTask)
	exec.RegTask("task.PlanRemainFileTask", task.PlanRemainFileTask)
	exec.RegTask("task.TaskBizDateChange", task.TaskBizDateChange)
	exec.RegTask("task.TaskFeeCalculate", task.TaskFeeCalculate)
	exec.Run()

	fmt.Println(exec.GetAddress())
}

// gscheduler.Logger接口实现
type logger struct{}

func (l *logger) Info(format string, a ...interface{}) {
	fmt.Println(fmt.Sprintf("自定义日志 [Info]- "+format, a...))
}

func (l *logger) Error(format string, a ...interface{}) {
	log.Println(fmt.Sprintf("自定义日志 [Error]- "+format, a...))
}

func (l *logger) Debug(format string, a ...interface{}) {
	log.Println(fmt.Sprintf("自定义日志 [Debug]- "+format, a...))
}

示例项目

github.com/github.com/liuhailove/go-scheduler-sdk/example/

与gin框架集成

https://github.com/liuhailove/go-scheduler-sdk

go-scheduler-admin配置

添加执行器

执行器管理->新增执行器,执行器列表如下:

AppName		名称		注册方式	OnLine 		机器地址 		操作
golang-jobs	golang执行器	自动注册 		查看 ( 1 )   

查看->注册节点

http://127.0.0.1:9999
添加任务

任务管理->新增(注意,使用BEAN模式,JobHandler与RegTask名称一致)

1	测试panic	BEAN:task.panic	* 0 * * * ?	admin	STOP	
2	测试耗时任务	BEAN:task.test2	* * * * * ?	admin	STOP	
3	测试golang	BEAN:task.test		* * * * * ?	admin	STOP

Documentation

Index

Constants

View Source
const (
	ETCD_SERVER_KEY = "go_scheduler_root/go_scheduler_namespace/go_scheduler_group/"
	// FetchIntervalSecond 拉取时间间隔
	FetchIntervalSecond = 2 * 1000
	// SendMetricUrl 发送metric url
	SendMetricUrl = "/api/metrics"
)
View Source
const (
	// DefaultMetricLogSingleFileMaxSize 10Mb
	DefaultMetricLogSingleFileMaxSize uint64 = 10485760
	// DefaultMetricLogMaxFileAmount 保存最大文件数
	DefaultMetricLogMaxFileAmount uint32 = 10
	// DefaultLogDir 默认日志目录
	DefaultLogDir = "gslog"
	// DefaultFlushIntervalInSec 日志刷新间隔
	DefaultFlushIntervalInSec uint32 = 1
)
View Source
const DEFAULT_TIME_OUT = time.Second * 10

DEFAULT_TIME_OUT 默认超时时间

Variables

View Source
var (
	Bean       = GlueType{/* contains filtered or unexported fields */}
	GlueShell  = GlueType{/* contains filtered or unexported fields */}
	GluePython = GlueType{/* contains filtered or unexported fields */}
)
View Source
var (
	DefaultRegistryKey              = "golang-jobs"
	DefaultTimerDelayInMilliseconds = 50
	MinTimerDelayInMilliseconds     = 10
	MaxTimerDelayInMilliseconds     = 1000
)
View Source
var (
	CurrentPID = os.Getpid()
)
View Source
var (
	GeneralMsg = "{\"code\":200,\"msg\":\"\"}"
)

Functions

func Contains

func Contains(arr []string, str string) bool

Contains 判断是否包含

func DefaultCheckIdle

func DefaultCheckIdle(cxt context.Context, jobID int64) bool

DefaultCheckIdle 默认空闲检查,返回真,

func DefaultCheckResult

func DefaultCheckResult(cxt context.Context, param *RunReq) bool

DefaultCheckResult 默认结果检查处理, 如果用户没有自己处理返回结果,则默认返回处理失败

func DefaultSerialKeyPostfixGenFunc

func DefaultSerialKeyPostfixGenFunc(param *RunReq) string

DefaultSerialKeyPostfixGenFunc 默认串行key生成方法,以JobId:JobId作为Key

func FileIsExist

func FileIsExist(path string) bool

func GetServiceAddr

func GetServiceAddr(services []string, mode LoadBalanceMode) string

GetServiceAddr 加载服务地址

func Int64ToStr

func Int64ToStr(i int64) string

Int64ToStr int64 to str

Types

type CheckIdleFunc

type CheckIdleFunc func(cxt context.Context, jobID int64) bool

CheckIdleFunc 空闲检查,true:空闲,false:忙碌,只有在业务自己实现空闲检查时,才有用到此方法 备注:调度中心的路由策略需要时忙碌转移

type CheckResultFunc

type CheckResultFunc func(cxt context.Context, param *RunReq) bool

CheckResultFunc 执行结果检查,true:处理成功,false:处理失败,此方法只在以下场景才会调用 1.调度中心配置了需要结果检查 2.执行结果上报时,虽然调度中心响应了成功,但是最终却丢失了(此场景只发生在调度中心出现故障时才发生,如crash或者kill -9)

type ExecuteResult

type ExecuteResult struct {
	Code int64       `json:"code"`
	Msg  interface{} `json:"msg"`
}

ExecuteResult 任务执行结果 200 表示任务执行正常,500表示失败

type Executor

type Executor interface {
	// Init 初始化
	Init(...Option)
	// LogHandler 日志查询
	LogHandler(handler LogHandler)
	// RegTask 注册任务
	RegTask(pattern string, task TaskFunc)
	// RunTask 运行任务
	RunTask(writer http.ResponseWriter, request *http.Request)
	// KillTask 杀死任务
	KillTask(writer http.ResponseWriter, request *http.Request)
	// TaskLog 任务日志
	TaskLog(writer http.ResponseWriter, request *http.Request)
	// Run 运行服务
	Run() error
	// Stop 停止服务
	Stop()
	// RunningTask 获取运行中的任务
	RunningTask() map[string]*Task
	// GetAddress 此方法需要在init之后调用才有效,获取node中的调度server地址
	GetAddress() string
}

Executor 执行器

func NewExecutor

func NewExecutor(opts ...Option) Executor

NewExecutor 创建执行器

type FirstBalancer

type FirstBalancer struct {
}

FirstBalancer 选择第一个

func (FirstBalancer) Balance

func (f FirstBalancer) Balance(services []string) string

type GlueType

type GlueType struct {
	// contains filtered or unexported fields
}

func (*GlueType) GetCmd

func (g *GlueType) GetCmd() string

func (*GlueType) GetDesc

func (g *GlueType) GetDesc() string

func (*GlueType) GetScriptFlag

func (g *GlueType) GetScriptFlag() bool

type JobLogReq

type JobLogReq struct {
	LogStartTime int64 `json:"logStartTime"` // 本次调度日志开始时间
	LogEndTime   int64 `json:"LogEndTime"`   // 本次调度日志截至时间
	JobID        int64 `json:"jobId"`        // 本次调度任务ID
}

JobLogReq 日志请求

type LastBalancer

type LastBalancer struct {
}

LastBalancer 选择最后一个

func (LastBalancer) Balance

func (l LastBalancer) Balance(services []string) string

type LoadBalanceMode

type LoadBalanceMode int

LoadBalanceMode 负载均衡模式,仅仅在ServerMode为ETCD_LOOKUP_MODE生效

const (
	DEFAULT_MODE LoadBalanceMode = 0
	// RODOM_MODE 随机
	RODOM_MODE LoadBalanceMode = 1
	// ROUND_MODE 轮询
	ROUND_MODE LoadBalanceMode = 2
	// FIRST_MODE 第一个
	FIRST_MODE LoadBalanceMode = 3
	// LAST_MODE 最后一个
	LAST_MODE LoadBalanceMode = 4
)

type LoadBalancer

type LoadBalancer interface {
	// Balance 均衡策略,返回被负载的service
	Balance(services []string) string
}

LoadBalancer 负载均衡接口

type LogHandler

type LogHandler func(req *LogReq) *LogRes

type LogReq

type LogReq struct {
	LogDateTim  int64 `json:"logDateTim"`  // 本次调度日志时间
	LogID       int64 `json:"logId"`       // 本次调度日志ID
	FromLineNum int   `json:"fromLineNum"` // 日志开始行号,滚动加载日志
}

LogReq 日志请求

type LogRes

type LogRes struct {
	Code    int64         `json:"code"`    // 200 表示正常、其他失败
	Msg     string        `json:"msg"`     // 错误提示消息
	Content LogResContent `json:"content"` // 日志响应内容
}

LogRes 日志响应

type LogResContent

type LogResContent struct {
	FromLineNum int    `json:"fromLineNum"` // 本次请求,日志开始行数
	ToLineNum   int    `json:"toLineNum"`   // 本次请求,日志结束行号
	LogContent  string `json:"logContent"`  // 本次请求日志内容
	IsEnd       bool   `json:"isEnd"`       // 日志是否全部加载完
}

LogResContent 日志响应内容

type OneLogReq

type OneLogReq struct {
	LogStartTime int64 `json:"logStartTime"` // 本次调度日志开始时间
	LogEndTime   int64 `json:"LogEndTime"`   // 本次调度日志截至时间
	LogID        int64 `json:"logID"`        // 本次调度任务执行日志ID
}

OneLogReq 日志请求

type Option

type Option func(o *Options)

func AccessToken

func AccessToken(token string) Option

AccessToken 请求令牌

func Endpoints

func Endpoints(endpoints []string) Option

Endpoints 设置ETCD

func ExecutorIp

func ExecutorIp(executorIp string) Option

func ExecutorPort

func ExecutorPort(executorPort string) Option

func LoadBalance

func LoadBalance(mode LoadBalanceMode) Option

LoadBalance 设置负载均衡模式

func RegistryKey

func RegistryKey(registryKey string) Option

RegistryKey 设置执行器标识

func ServerAddr

func ServerAddr(addr string) Option

ServerAddr 设置调度中心地址

func SetCheckIdleFunc

func SetCheckIdleFunc(checkIdleFunc CheckIdleFunc) Option

SetCheckIdleFunc 设置空闲检查方法

func SetCheckResultFunc

func SetCheckResultFunc(resultFunc CheckResultFunc) Option

SetCheckResultFunc 设置结果检查方法

func SetCollapse

func SetCollapse(collapse bool) Option

func SetFlushIntervalInSec

func SetFlushIntervalInSec(flushIntervalInSec uint32) Option

SetFlushIntervalInSec 设置日志刷新时间

func SetLogDir

func SetLogDir(logDir string) Option

SetLogDir 日志目录

func SetLogger

func SetLogger(l logging.Logger) Option

SetLogger 设置日志处理器

func SetMaxConcurrencyNum

func SetMaxConcurrencyNum(maxConcurrencyNum uint16) Option

func SetMetricLogMaxFileAmount

func SetMetricLogMaxFileAmount(metricLogMaxFileAmount uint32) Option

SetMetricLogMaxFileAmount 设置最大保存多少个file

func SetMetricLogSingleFileMaxSize

func SetMetricLogSingleFileMaxSize(metricLogSingleFileMaxSize uint64) Option

SetMetricLogSingleFileMaxSize 设置单个metric文件大小

func SetRouterFlag

func SetRouterFlag(routerFlag string) Option

func SetSerialKeyPostfixGenFunc

func SetSerialKeyPostfixGenFunc(serialKeyPostfixGenFunc SerialKeyPostfixGenFunc) Option

SetSerialKeyPostfixGenFunc 设置串行key生成方法

func SetServerMode

func SetServerMode(mode ServerMode) Option

SetServerMode 设置连接服务的模式

func SetSync

func SetSync(sync bool) Option

SetSync 设置是否同步

func SetTimerDelayInMilliseconds

func SetTimerDelayInMilliseconds(timerDelayInMilliseconds int) Option

func Timeout

func Timeout(timeout time.Duration) Option

Timeout 设置超时时间

func WithTaskWrapper

func WithTaskWrapper(taskWrapper TaskWrapper) Option

WithTaskWrapper 设置预处理方法Wrapper

type Options

type Options struct {
	ServerAddr               string          `json:"server_addr"`                 // 调度中心地址
	AccessToken              string          `json:"access_token"`                // 请求令牌
	Timeout                  time.Duration   `json:"timeout"`                     // 接口超时时间
	ExecutorIp               string          `json:"executor_ip"`                 // 本地(执行器)IP(可自行获取)
	ExecutorPort             string          `json:"executor_port"`               // 本地(执行器)端口
	RegistryKey              string          `json:"registry_key"`                // 执行器名称
	LogDir                   string          `json:"log_dir"`                     // 日志目录
	Sync                     bool            `json:"sync"`                        // 是否同步启动,默认为false
	Endpoints                []string        `json:"endpoints"`                   // etcd地址
	ServerMode               ServerMode      `json:"server_mode"`                 // 连接server的模式
	LoadBalanceMode          LoadBalanceMode `json:"load_balance_mode"`           // 负载均衡模式
	MaxConcurrencyNum        uint16          `json:"max_concurrency_num"`         // 最发并发数,默认为CPU*2
	Collapse                 bool            `json:"collapse"`                    // 开启请求合并
	TimerDelayInMilliseconds int             `json:"timer_delay_in_milliseconds"` // 请求合并延迟时间
	// contains filtered or unexported fields
}

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

func (*Queue) PopFrontAll

func (q *Queue) PopFrontAll() []interface{}

PopFrontAll 全部出队

func (*Queue) Push

func (q *Queue) Push(val interface{})

Push 入队

type RandomBalancer

type RandomBalancer struct {
}

RandomBalancer 随机负载均衡算法

func (*RandomBalancer) Balance

func (r *RandomBalancer) Balance(services []string) string

type Registry

type Registry struct {
	RegistryGroup string `json:"registryGroup"`
	RegistryKey   string `json:"registryKey"`
	// LoadStat load1负载
	LoadStat float64 `json:"loadStat"`
	// CpuStat cpu 使用率
	CpuStat float64 `json:"cpuStat"`
	// MemoryStat 内存使用率
	MemoryStat    int64  `json:"memoryStat"`
	RegistryValue string `json:"registryValue"`

	// routerFlag 路由标签
	RouterFlag string `json:"routerFlag"`
}

Registry 注册参数

type RoundBalancer

type RoundBalancer struct {
	// contains filtered or unexported fields
}

RoundBalancer 轮询负载均衡算法

func (*RoundBalancer) Balance

func (r *RoundBalancer) Balance(services []string) string

type RunReq

type RunReq struct {
	JobID                                        int64  `json:"jobId"`                                        // 任务ID
	ExecutorHandler                              string `json:"executorHandler"`                              // 任务标识
	ExecutorParams                               string `json:"executorParams"`                               // 任务参数
	ExecutorBlockStrategy                        string `json:"executorBlockStrategy"`                        // 任务阻塞策略
	ExecutorTimeout                              int64  `json:"executorTimeout"`                              // 任务超时时间,单位秒,大于零时生效
	LogID                                        int64  `json:"logId"`                                        // 本次调度日志ID
	ParentLog                                    int64  `json:"parentLog"`                                    // 本次调度的父日志ID
	InstanceId                                   string `json:"instanceId"`                                   // 实例ID,一次运行串联的任务实例ID一致
	LogDateTime                                  int64  `json:"logDateTime"`                                  // 本次调度日志时间
	GlueType                                     string `json:"glueType"`                                     // 任务模式,可选值参考 GlueTypeEnum
	GlueSource                                   string `json:"glueSource"`                                   // GLUE脚本代码
	GlueUpdatetime                               int64  `json:"glueUpdatetime"`                               // GLUE脚本更新时间,用于判定脚本是否变更以及是否需要刷新
	BroadcastIndex                               int64  `json:"broadcastIndex"`                               // 分片参数:当前分片
	BroadcastTotal                               int64  `json:"broadcastTotal"`                               // 分片参数:总分片
	BusinessStartExecutorToleranceThresholdInMin int32  `json:"businessStartExecutorToleranceThresholdInMin"` // 业务开始执行容忍阈值
}

RunReq 触发任务请求参数

func (*RunReq) String

func (r *RunReq) String() string

type SendMetricRetryCallback

type SendMetricRetryCallback struct {
	// contains filtered or unexported fields
}

func (SendMetricRetryCallback) DoWithRetry

func (m SendMetricRetryCallback) DoWithRetry(ctx retry.RtyContext) interface{}

type SerialKeyPostfixGenFunc

type SerialKeyPostfixGenFunc func(param *RunReq) string

SerialKeyPostfixGenFunc 串行执行策略时,串行key的生成策略, 默认情况下以JobId:JobId作为串行依据,然而有些场景下业务希望 又可以按照策略并行执行,比如同一个用户下希望是串行执行 不同用户下并行执行

type ServerMode

type ServerMode int

ServerMode 连接server的模式

const (
	// HISTORY_SERVER_MODE 固定调度中心模式
	HISTORY_SERVER_MODE ServerMode = 0
	// FIX_SERVER_MODE 固定调度中心模式
	FIX_SERVER_MODE ServerMode = 1
	// ETCD_LOOKUP_MODE etcd自动查询模式
	ETCD_LOOKUP_MODE ServerMode = 2
	// MIXED_MODE 混合模式,此种模式优先使用etcd,如果etcd不可用,则使用固定IP
	MIXED_MODE ServerMode = 3
)

type Task

type Task struct {
	Id    int64
	Name  string
	Ext   context.Context
	Param *RunReq

	Cancel                context.CancelFunc
	StartTime             int64
	EndTime               int64
	LogId                 int64    // 任务LogId
	ExecutorBlockStrategy string   //阻塞策略
	GlueType              GlueType //任务类型
	// contains filtered or unexported fields
}

Task 任务

func (*Task) Info

func (t *Task) Info() string

Info 任务信息

func (*Task) Run

func (t *Task) Run(callback func(code int64, msg []string), e *executor)

Run 运行任务

type TaskFunc

type TaskFunc func(ctx context.Context, param *RunReq) ([]string, error)

TaskFunc 任务执行函数

type TaskWrapper

type TaskWrapper func(TaskFunc) TaskFunc

TaskWrapper 包装 TaskWrapper 并返回等效项,任务执行前处理,一般可以处理通用的逻辑,如context设置、打印参数、限流设置等

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL