xxl

package module
v1.2.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 27, 2023 License: MIT Imports: 16 Imported by: 0

README

xxl-job-executor-go

很多公司java与go开发共存,java中有xxl-job作为任务调度引擎,为此也出现了go执行器(客户端),使用起来比较简单:

支持

1.执行器注册
2.耗时任务取消
3.任务注册,像写http.Handler一样方便
4.任务panic处理
5.阻塞策略处理
6.任务完成支持返回执行备注
7.任务超时取消 (单位:秒,0为不限制)
8.失败重试次数(在参数param中,目前由任务自行处理)
9.可自定义日志
10.自定义日志查看handler
11.支持外部路由(可与gin集成)

Example

package main

import (
	"fmt"
	xxl "github.com/xxl-job/xxl-job-executor-go"
	"github.com/xxl-job/xxl-job-executor-go/example/task"
	"log"
)

// main builds an executor from functional options, installs a custom log-view
// handler, registers three task handlers and then runs the executor,
// terminating the process via log.Fatal with whatever exec.Run returns.
func main() {
	exec := xxl.NewExecutor(
		xxl.ServerAddr("http://127.0.0.1/xxl-job-admin"),
		xxl.AccessToken(""),            // access token (empty by default)
		xxl.ExecutorIp("127.0.0.1"),    // can be detected automatically
		xxl.ExecutorPort("9999"),       // defaults to 9999 (optional)
		xxl.RegistryKey("golang-jobs"), // executor name
		xxl.SetLogger(&logger{}),       // custom logger
	)
	exec.Init()
	// Install the handler used to view logs.
	exec.LogHandler(func(req *xxl.LogReq) *xxl.LogRes {
		return &xxl.LogRes{Code: xxl.SuccessCode, Msg: "", Content: xxl.LogResContent{
			FromLineNum: req.FromLineNum,
			ToLineNum:   2,
			LogContent:  "这个是自定义日志handler",
			IsEnd:       true,
		}}
	})
	// Register task handlers.
	// NOTE(review): RegTask is not among the Executor interface methods listed
	// in this page's API index (RegTaskNoStorage, RegTempTask,
	// RegPersistenceTask); this example imports github.com/xxl-job/xxl-job-executor-go,
	// which may differ from the package documented here — confirm.
	exec.RegTask("task.test", task.Test)
	exec.RegTask("task.test2", task.Test2)
	exec.RegTask("task.panic", task.Panic)
	log.Fatal(exec.Run())
}

// logger is a minimal implementation of the xxl.Logger interface. Every
// message is prefixed with a fixed marker so custom-logger output is easy to
// spot.
type logger struct{}

// Debug formats the message like fmt.Printf and writes it, followed by a
// newline, to standard output. The Logger interface requires Debug alongside
// Info and Error; without it *logger cannot be passed to SetLogger.
func (l *logger) Debug(format string, a ...interface{}) {
	fmt.Printf("自定义日志 - "+format+"\n", a...)
}

// Info formats the message like fmt.Printf and writes it, followed by a
// newline, to standard output.
func (l *logger) Info(format string, a ...interface{}) {
	fmt.Printf("自定义日志 - "+format+"\n", a...)
}

// Error formats the message like fmt.Printf and writes it through the
// standard library's default logger.
func (l *logger) Error(format string, a ...interface{}) {
	log.Printf("自定义日志 - "+format, a...)
}

示例项目

github.com/xxl-job/xxl-job-executor-go/example/

与gin框架集成

https://github.com/gin-middleware/xxl-job-executor

xxl-job-admin配置

添加执行器

执行器管理->新增执行器,执行器列表如下:

AppName		名称		注册方式	OnLine 		机器地址 		操作
golang-jobs	golang执行器	自动注册 		查看 ( 1 )   

查看->注册节点

http://127.0.0.1:9999

添加任务

任务管理->新增(注意,使用BEAN模式,JobHandler与RegTask名称一致)

1	测试panic	BEAN:task.panic	* 0 * * * ?	admin	STOP	
2	测试耗时任务	BEAN:task.test2	* * * * * ?	admin	STOP	
3	测试golang	BEAN:task.test		* * * * * ?	admin	STOP

Documentation

Index

Constants

View Source
// Response codes.
const (
	SuccessCode = 200
	FailureCode = 500
)

响应码

View Source
// Executor route strategies, i.e. the values accepted by
// AddJob.ExecutorRouteStrategy. All four constants share the
// ExecutorRouteStrategy type; "LAST" and "ROUND" are route strategies, not
// block strategies (those are SERIAL_EXECUTION, DISCARD_LATER and COVER_EARLY).
const (
	First  ExecutorRouteStrategy = "FIRST"  // the first one
	Last   ExecutorRouteStrategy = "LAST"   // the last one
	Round  ExecutorRouteStrategy = "ROUND"  // round-robin
	Random ExecutorRouteStrategy = "RANDOM" // random
)
View Source
// NOTE(review): Persistence is presumably the expireAt value that marks a
// stored task as permanent (compare Storage.Persistence) — confirm.
const (
	Persistence = -1
)

Variables

View Source
// Errors describing failed job operations (add, query, delete, run).
var (
	AddJobErr   = errors.New("add job failed")
	QueryJobErr = errors.New("query job failed")
	DelJobErr   = errors.New("delete job failed")
	RunJobErr   = errors.New("run job failed")
)
View Source
// Default executor port and registry key.
var (
	DefaultExecutorPort = "9999"
	DefaultRegistryKey  = "golang-jobs"
)
View Source
// LoginErr describes a failed login.
var LoginErr = errors.New("login failed")

Functions

func Int64ToStr

func Int64ToStr(i int64) string

Int64ToStr int64 to str

func StrToInt64

func StrToInt64(s string) (int64, error)

StrToInt64 str to int64

Types

type AddJob

// AddJob is the request passed to Job.Add to create a job.
type AddJob struct {
	JobGroup               int64  `json:"jobGroup"`                // executor ID
	JobDesc                string `json:"jobDesc"`                 // job description
	Author                 string `json:"author"`                  // author
	TriggerStatus          int32  `json:"triggerStatus"`           // job status
	AlarmEmail             string `json:"alarmEmail"`              // multiple e-mail addresses are comma-separated
	ScheduleType           string `json:"scheduleType"`            // schedule type: CRON (cron schedule) or FIX_RATE (fixed rate)
	ScheduleConf           string `json:"scheduleConf"`            // required
	ScheduleConfCron       string `json:"schedule_conf_CRON"`      // purpose of this field is unclear
	CronGenDisplay         string `json:"cronGen_display"`         // for the CRON type: same value as scheduleConf; otherwise omit
	ScheduleConfFixDelay   string `json:"schedule_conf_FIX_DELAY"` // purpose of this field is unclear
	ScheduleConfFixRate    string `json:"schedule_conf_FIX_RATE"`  // for the FIX_RATE type: same value as scheduleConf; otherwise omit
	GlueType               string `json:"glueType"`                // run mode: BEAN
	ExecutorHandler        string `json:"executorHandler"`         // JobHandler, i.e. the task name
	ExecutorParam          string `json:"executorParam"`           // execution parameters
	ExecutorRouteStrategy  string `json:"executorRouteStrategy"`   // route strategy: ROUND (round-robin), FIRST (first), LAST (last)
	ChildJobId             string `json:"childJobId"`              // child job IDs; multiple IDs are comma-separated
	MisfireStrategy        string `json:"misfireStrategy"`         // misfire strategy: DO_NOTHING (ignore) or FIRE_ONCE_NOW (run once immediately)
	ExecutorBlockStrategy  string `json:"executorBlockStrategy"`   // block strategy: SERIAL_EXECUTION (serial), DISCARD_LATER (discard later triggers), COVER_EARLY (override earlier triggers)
	ExecutorTimeout        int64  `json:"executorTimeout"`         // execution timeout, in seconds
	ExecutorFailRetryCount int32  `json:"executorFailRetryCount"`  // number of retries on failure
	GlueRemark             string `json:"glueRemark"`              // value is "GLUE代码初始化" (GLUE code initialization)
	GlueSource             string `json:"glueSource"`              // source code
}

type AddJobResp

// AddJobResp carries a code, a message and a string content.
// NOTE(review): presumably the response to an add-job request, with Content
// holding the new job's ID (Job.Add returns an int64) — confirm.
type AddJobResp struct {
	Code    int         `json:"code"`
	Msg     interface{} `json:"msg"`
	Content string      `json:"content"`
}

type Auth

// Auth performs a login.
type Auth interface {
	// Login logs in and returns cookies, or an error.
	Login() ([]*http.Cookie, error)
}

func NewAuth

func NewAuth(userName, password, addr string, interval time.Duration) (Auth, func(), error)

type AuthImpl

type AuthImpl struct {
	Addr     string
	UserName string
	Password string
	// contains filtered or unexported fields
}

func (*AuthImpl) CronReplaceCookie added in v1.1.6

func (a *AuthImpl) CronReplaceCookie(interval time.Duration)

func (*AuthImpl) Login

func (a *AuthImpl) Login() ([]*http.Cookie, error)

Login 登录

type DeleteJob

// DeleteJob is the request passed to Job.RemoveJob.
type DeleteJob struct {
	Id int64 `json:"id"`
}

type ExecuteResult

// ExecuteResult is the result of a task execution: code 200 means the task
// ran normally, 500 means it failed.
type ExecuteResult struct {
	Code int64       `json:"code"`
	Msg  interface{} `json:"msg"`
}

ExecuteResult 任务执行结果 200 表示任务执行正常,500表示失败

type Executor

// Executor is the executor.
type Executor interface {
	// Init initializes the executor.
	Init(...Option)
	// LogHandler sets the handler used for log queries.
	LogHandler(handler LogHandler)
	// RegTaskNoStorage registers a task in memory.
	RegTaskNoStorage(pattern string)
	// RegTempTask registers a temporary task.
	RegTempTask(pattern string, handlerName string, jobId, expireAt int64)
	// RegPersistenceTask registers a task.
	RegPersistenceTask(pattern, handlerName string, jobId int64)
	// RunTask runs a task.
	RunTask(writer http.ResponseWriter, request *http.Request)
	// KillTask kills a task.
	KillTask(writer http.ResponseWriter, request *http.Request)
	// TaskLog serves the task log.
	TaskLog(writer http.ResponseWriter, request *http.Request)
	// Beat is the heartbeat check.
	Beat(writer http.ResponseWriter, request *http.Request)
	// IdleBeat is the busy check.
	IdleBeat(writer http.ResponseWriter, request *http.Request)
	// Run runs the service.
	Run() error
	// Stop stops the service.
	Stop()
}

Executor 执行器

func NewExecutor

func NewExecutor(opts ...Option) Executor

NewExecutor 创建执行器

type ExecutorBlockStrategy added in v1.1.8

// ExecutorBlockStrategy names a blocking strategy.
type ExecutorBlockStrategy string
const (
	SerialExecution ExecutorBlockStrategy = "SERIAL_EXECUTION" // serial execution
	DiscardLater    ExecutorBlockStrategy = "DISCARD_LATER"    // discard later triggers
	CoverEarly      ExecutorBlockStrategy = "COVER_EARLY"      // override earlier triggers
)

type ExecutorRouteStrategy added in v1.1.8

// ExecutorRouteStrategy names a route strategy (e.g. "FIRST", "RANDOM").
type ExecutorRouteStrategy string

type Job

// Job groups the job-management operations: add, query, remove and run.
type Job interface {
	// Add adds a job.
	Add(ctx context.Context, req *AddJob) (int64, error)
	// QueryJobList queries job info.
	QueryJobList(ctx context.Context, req *QueryJob) (*JobList, error)
	// RemoveJob removes a job.
	RemoveJob(ctx context.Context, req *DeleteJob) error
	// RunJob runs a job.
	RunJob(ctx context.Context, jobId int64) error
}

func NewJob

func NewJob(addr string, auth Auth) Job

type JobImpl

type JobImpl struct {
	// contains filtered or unexported fields
}

func (*JobImpl) Add

func (j *JobImpl) Add(ctx context.Context, req *AddJob) (int64, error)

Add 添加任务

func (*JobImpl) QueryJobList

func (j *JobImpl) QueryJobList(ctx context.Context, req *QueryJob) (*JobList, error)

QueryJobList 查询JobInfo

func (*JobImpl) RemoveJob

func (j *JobImpl) RemoveJob(ctx context.Context, req *DeleteJob) error

RemoveJob 移除任务

func (*JobImpl) RunJob added in v1.1.6

func (j *JobImpl) RunJob(ctx context.Context, jobId int64) error

type JobInfo

// JobInfo describes a single job; it is the element type of JobList.Data.
type JobInfo struct {
	Id                     int       `json:"id"`
	JobGroup               int       `json:"jobGroup"`
	JobDesc                string    `json:"jobDesc"`
	AddTime                time.Time `json:"addTime"`
	UpdateTime             time.Time `json:"updateTime"`
	Author                 string    `json:"author"`
	AlarmEmail             string    `json:"alarmEmail"`
	ScheduleType           string    `json:"scheduleType"`
	ScheduleConf           string    `json:"scheduleConf"`
	MisfireStrategy        string    `json:"misfireStrategy"`
	ExecutorRouteStrategy  string    `json:"executorRouteStrategy"`
	ExecutorHandler        string    `json:"executorHandler"`
	ExecutorParam          string    `json:"executorParam"`
	ExecutorBlockStrategy  string    `json:"executorBlockStrategy"`
	ExecutorTimeout        int       `json:"executorTimeout"`
	ExecutorFailRetryCount int       `json:"executorFailRetryCount"`
	GlueType               string    `json:"glueType"`
	GlueSource             string    `json:"glueSource"`
	GlueRemark             string    `json:"glueRemark"`
	GlueUpdatetime         time.Time `json:"glueUpdatetime"`
	ChildJobId             string    `json:"childJobId"`
	TriggerStatus          int       `json:"triggerStatus"`
	TriggerLastTime        int       `json:"triggerLastTime"`
	TriggerNextTime        int       `json:"triggerNextTime"`
}

type JobList

// JobList is the result type of Job.QueryJobList: a list of JobInfo records
// plus total and filtered record counts.
type JobList struct {
	RecordsFiltered int       `json:"recordsFiltered"`
	Data            []JobInfo `json:"data"`
	RecordsTotal    int       `json:"recordsTotal"`
}

type LogFunc

// LogFunc is the application-log function type.
type LogFunc func(req LogReq, res *LogRes) []byte

LogFunc 应用日志

type LogHandler

// LogHandler turns a log request into a log response; it is the handler type
// accepted by Executor.LogHandler.
type LogHandler func(req *LogReq) *LogRes

type LogReq

// LogReq is a log request.
type LogReq struct {
	LogDateTim  int64 `json:"logDateTim"`  // log time of this trigger
	LogID       int64 `json:"logId"`       // log ID of this trigger
	FromLineNum int   `json:"fromLineNum"` // starting line number of the log, for rolling (incremental) loading
}

LogReq 日志请求

type LogRes

// LogRes is a log response.
type LogRes struct {
	Code    int64         `json:"code"`    // 200 means OK; anything else is a failure
	Msg     string        `json:"msg"`     // error message
	Content LogResContent `json:"content"` // log response content
}

LogRes 日志响应

type LogResContent

// LogResContent is the content of a log response.
type LogResContent struct {
	FromLineNum int    `json:"fromLineNum"` // starting line number for this request
	ToLineNum   int    `json:"toLineNum"`   // ending line number for this request
	LogContent  string `json:"logContent"`  // log content for this request
	IsEnd       bool   `json:"isEnd"`       // whether the log has been fully loaded
}

LogResContent 日志响应内容

type Logger

// Logger is the system logger. Implementations must provide all three
// methods; each takes a format string and its arguments.
type Logger interface {
	Debug(format string, a ...interface{})
	Info(format string, a ...interface{})
	Error(format string, a ...interface{})
}

Logger 系统日志

type MisfireStrategy added in v1.1.8

// MisfireStrategy names a misfire (expired schedule) strategy.
type MisfireStrategy string
const (
	DoNothing   MisfireStrategy = "DO_NOTHING"    // do nothing
	FireOnceNow MisfireStrategy = "FIRE_ONCE_NOW" // execute immediately
)

type Option

// Option is a function that modifies an Options value; NewExecutor and
// Executor.Init accept a list of them.
type Option func(o *Options)

func AccessToken

func AccessToken(token string) Option

AccessToken 请求令牌

func ExecutorIp

func ExecutorIp(ip string) Option

ExecutorIp 设置执行器IP

func ExecutorPort

func ExecutorPort(port string) Option

ExecutorPort 设置执行器端口

func RegistryKey

func RegistryKey(registryKey string) Option

RegistryKey 设置执行器标识

func ServerAddr

func ServerAddr(addr string) Option

ServerAddr 设置调度中心地址

func SetConcurrentExecute added in v1.2.3

func SetConcurrentExecute(concurrentExecute bool) Option

SetConcurrentExecute 设置是否并发执行

func SetHandlerMap

func SetHandlerMap(m map[string]TaskFunc) Option

SetHandlerMap 设置job处理器

func SetLogger

func SetLogger(l Logger) Option

SetLogger 设置日志处理器

func SetStorage

func SetStorage(storage Storager) Option

SetStorage 设置任务存储

type Options

// Options holds the executor configuration that Option functions modify.
type Options struct {
	ServerAddr   string        `json:"server_addr"`   // scheduling center address
	AccessToken  string        `json:"access_token"`  // access token
	Timeout      time.Duration `json:"timeout"`       // API timeout
	ExecutorIp   string        `json:"executor_ip"`   // local (executor) IP (can be detected automatically)
	ExecutorPort string        `json:"executor_port"` // local (executor) port
	RegistryKey  string        `json:"registry_key"`  // executor name
	LogDir       string        `json:"log_dir"`       // log directory

	Storage    Storager            // task storage
	HandlerMap map[string]TaskFunc // task functions

	ConcurrentExecute bool // whether to execute concurrently
	// contains filtered or unexported fields
}

func (*Options) GetRunningTaskId added in v1.2.3

func (o *Options) GetRunningTaskId(jobId, logId int64) string

GetRunningTaskId 生成运行任务ID

type QueryJob

// QueryJob is the request passed to Job.QueryJobList.
type QueryJob struct {
	JobGroup        int64  `json:"jobGroup"`        // executor ID
	TriggerStatus   int32  `json:"triggerStatus"`   // just use -1
	JobDesc         string `json:"jobDesc"`         // job description
	ExecutorHandler string `json:"executorHandler"` // task name
	Author          string `json:"author"`          // author
	Start           int32  `json:"start"`           // offset, 0 by default
	Length          int32  `json:"length"`          // page size
}

type Registry

// Registry holds the registration parameters.
type Registry struct {
	RegistryGroup string `json:"registryGroup"`
	RegistryKey   string `json:"registryKey"`
	RegistryValue string `json:"registryValue"`
}

Registry 注册参数

type RunJob added in v1.1.6

// RunJob carries a single ID.
// NOTE(review): presumably the request body sent by Job.RunJob, with Id being
// the job ID — confirm.
type RunJob struct {
	Id int64 `json:"id"`
}

type RunReq

// RunReq holds the request parameters for triggering a task.
type RunReq struct {
	JobID                 int64  `json:"jobId"`                 // job ID
	ExecutorHandler       string `json:"executorHandler"`       // task identifier
	ExecutorParams        string `json:"executorParams"`        // task parameters
	ExecutorBlockStrategy string `json:"executorBlockStrategy"` // task block strategy
	ExecutorTimeout       int64  `json:"executorTimeout"`       // task timeout in seconds; takes effect when greater than zero
	LogID                 int64  `json:"logId"`                 // log ID of this trigger
	LogDateTime           int64  `json:"logDateTime"`           // log time of this trigger
	GlueType              string `json:"glueType"`              // task mode; for valid values see com.xxl.job.core.glue.GlueTypeEnum
	GlueSource            string `json:"glueSource"`            // GLUE script source
	GlueUpdatetime        int64  `json:"glueUpdatetime"`        // GLUE script update time, used to decide whether the script changed and needs refreshing
	BroadcastIndex        int64  `json:"broadcastIndex"`        // sharding parameter: current shard
	BroadcastTotal        int64  `json:"broadcastTotal"`        // sharding parameter: total shards
}

RunReq 触发任务请求参数

type ScheduleType added in v1.1.8

// ScheduleType names a schedule type.
type ScheduleType string
const (
	FixRate ScheduleType = "FIX_RATE" // fixed rate
	Cron    ScheduleType = "CRON"     // cron schedule
)

type SessionStorage

type SessionStorage struct {
	// contains filtered or unexported fields
}

func NewSessionStorage

func NewSessionStorage() *SessionStorage

func (*SessionStorage) Del

func (s *SessionStorage) Del(key string)

func (*SessionStorage) Exists

func (s *SessionStorage) Exists(key string) bool

func (*SessionStorage) Get

func (s *SessionStorage) Get(taskName string) *Storage

func (*SessionStorage) GetAll

func (s *SessionStorage) GetAll() map[string]*Storage

func (*SessionStorage) Len

func (s *SessionStorage) Len() int

func (*SessionStorage) Set

func (s *SessionStorage) Set(taskName, handleName string, jobId, expireAt int64)

type Storage

// Storage is the per-task record handled by a Storager: a handle name, an
// expiry value and a job ID.
type Storage struct {
	HandleName string
	ExpireAt   int64
	JobId      int64
}

func (*Storage) Equal added in v1.1.3

func (s *Storage) Equal(storage *Storage) bool

Equal 相等

func (*Storage) Expired

func (s *Storage) Expired() bool

Expired 已过期

func (*Storage) Persistence

func (s *Storage) Persistence() bool

Persistence 是否为永久

type Storager

// Storager is the task-storage interface, keyed by task name.
type Storager interface {
	// Set: expireAt is the expiration time, given as a timestamp.
	Set(taskName, handleName string, jobId, expireAt int64)
	Get(taskName string) *Storage
	Del(taskName string)
	Exists(taskName string) bool
	Len() int
	GetAll() map[string]*Storage
}

type Task

type Task struct {
	Id    int64
	Name  string
	Ext   context.Context
	Param *RunReq

	Cancel    context.CancelFunc
	StartTime int64
	EndTime   int64
	LogID     int64
	// contains filtered or unexported fields
}

Task 任务

func (*Task) Info

func (t *Task) Info() string

Info 任务信息

func (*Task) Run

func (t *Task) Run(callback func(code int64, msg string), fn TaskFunc)

Run 运行任务

type TaskFunc

// TaskFunc is the task execution function: it receives a context and the
// trigger request and returns a string.
type TaskFunc func(cxt context.Context, param *RunReq) string

TaskFunc 任务执行函数

type TriggerStatus added in v1.1.8

// TriggerStatus is a job's trigger status.
type TriggerStatus int32
const (
	Total   TriggerStatus = -1 // all
	Stop    TriggerStatus = 0  // stopped
	Running TriggerStatus = 1  // running
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL