xxl

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 19, 2022 License: Apache-2.0 Imports: 23 Imported by: 2

README

jupiter接入xxl-job分布式调度中心模块

example: /example/xxl-job 用法如下:

  1. 增加配置: 以下是配置说明
[xxl]
  [xxl.job]
    [xxl.job.executor]
      address = "http://127.0.0.1:8080/xxl-job-admin"  # 注意换成XXL调度中心对应环境的域名
      access_token = "jupiter-token"    # 注册xxl-job执行器需要的token信息
      appname = "jupiter-xxl-job-demo"  # 启动执行器的名称
      port = "59000"                    # 启动执行器的服务端口
      log_dir = "./"                    # 执行器产生的日志文件目录
	  # 以下的配置建议使用默认
	  #host = ""                        # 启动执行器的主机。默认通过ip.get注册。确保xxl-job能调度该地址
	  debug = true						# 是否开启debug模式
	  switch = true 					# 执行器的总开关
	  registry_group = "EXECUTOR"		# 执行器组
	  #timeout = 2000					# 接口超时时间
  1. 示例用法:
func main() {
	eng := NewEngine()
	eng.RegisterHooks(hooks.Stage_AfterStop, func() {
		fmt.Println("exit jupiter app ...")
	})
	if err := eng.Run(); err != nil {
		log.Fatal(err)
	}
}

type Engine struct {
	jupiter.Application
}

func NewEngine() *Engine {
	eng := &Engine{}
	if err := eng.Startup(
		xgo.ParallelWithError(
			eng.startXxlJob,
		),
	); err != nil {
		xlog.Panic("startup engine", xlog.Any("err", err))
	}
	return eng
}

func (eng *Engine) startXxlJob() error {
	executor := xxl.StdNewExecutor()
	// 注册定时任务
	executor.RegXJob(
		NewTest(),
		NewTest2(),
	)
	eng.Executor(executor)
	return nil
}


// =======以下为示例任务test.go=========
type Test struct{}

func NewTest() *Test {
	return &Test{}
}

// 任务名称
func (t *Test) GetJobName() string {
	return "test"
}

// xxl-job 分布式调度任务执行函数
func (t *Test) Run(ctx context.Context, param *executor.RunReq) (msg string, err error) {
	//使用xxl-logger日志即可在xxl-job平台上看到日志
	logger.Info(param.LogID, "start run...")
	logger.Info(param.LogID, fmt.Sprintf("job param is: %s", param.ExecutorParams))
	fmt.Println("test job has been executed")
	return "success", nil
}
注:调度器中的停止某项任务是停止调度接下来的任务。如果用中途中止某个正在进行中的定时任务,请使用ctx.Done()

Documentation

Index

Constants

View Source
const (
	SerialExecution     = "SERIAL_EXECUTION"       //单机串行
	DiscardLater        = "DISCARD_LATER"          //丢弃后续调度
	DiscardLaterNoAlarm = "DISCARD_LATER_NO_ALARM" //丢弃后续调度,并不报警
	CoverEarly          = "COVER_EARLY"            //覆盖之前调度
)

阻塞处理策略

View Source
const (
	TaskResultTypeDone = iota
	TaskResultTypeFailed
	TaskResultTypeCancel
	TaskResultTypeTimeout
	TaskResultTypePanic
)

Variables

View Source
var (
	DefaultExecutorPort  = "59000"
	DefaultAccessToken   = "jupiter-task-token"
	DefaultRegistryKey   = "jupiter-demo-app"
	DefaultRegistryGroup = "EXECUTOR"
	DefaultSwitch        = true
	DefaultExecuteIp     = ipv4.LocalIP()
)
View Source
var MaxQueueSize = 1
View Source
var (
	OverLimit = "over limit"
)

Functions

func CheckOptions

func CheckOptions(opts *Options)

func Int64ToStr

func Int64ToStr(i int64) string

int64 to str

func StdNewExecutor

func StdNewExecutor(opts ...Option) executor.Executor

创建执行器

func StrToInt64

func StrToInt64(str string) int64

str to int64

Types

type CallbackFunc

type CallbackFunc func(ctx context.Context, status int, msg string) error

任务执行完后通知回调函数

type ExecuteResult

type ExecuteResult struct {
	Error int64       `json:"error"`
	Msg   interface{} `json:"msg"`
}

任务执行结果 200 表示任务执行正常,500表示失败

type HttpHandler

type HttpHandler func(http.ResponseWriter, *http.Request)

中间件

type JobExecutor

type JobExecutor struct {
	// contains filtered or unexported fields
}

func (*JobExecutor) GetAddress

func (e *JobExecutor) GetAddress() string

GetAddress

func (*JobExecutor) GracefulStop

func (e *JobExecutor) GracefulStop()

执行器优雅地退出

func (*JobExecutor) RegXJob

func (e *JobExecutor) RegXJob(jobs ...executor.XJob)

注册执行器任务

func (*JobExecutor) Run

func (e *JobExecutor) Run() (err error)

执行器启动

func (*JobExecutor) Stop

func (e *JobExecutor) Stop()

执行器退出

type LogResult

type LogResult struct {
	FromLineNum int32  `json:"fromLineNum"` // 本次请求,日志开始行数
	ToLineNum   int32  `json:"toLineNum"`   // 本次请求,日志结束行号
	LogContent  string `json:"logContent"`  // 本次请求日志内容
	IsEnd       bool   `json:"isEnd"`       // 日志是否全部加载完
}

日志响应内容

type Option

type Option func(o *Options)

func AccessToken

func AccessToken(token string) Option

请求令牌

func Debug

func Debug() Option

本地调试

func ExecutorHost

func ExecutorHost(ip string) Option

设置执行器IP

func ExecutorPort

func ExecutorPort(port string) Option

设置执行器端口

func RegistryGroup

func RegistryGroup(registryGroup string) Option

设置执行器组

func RegistryKey

func RegistryKey(registryKey string) Option

设置执行器标识

func ServerAddr

func ServerAddr(addr string) Option

设置调度中心地址

func Switch

func Switch(s bool) Option

设置默认开关

type Options

type Options struct {
	ServerAddr    string        `json:"address" toml:"address"`                //调度中心地址
	AccessToken   string        `json:"access_token" toml:"access_token"`      //请求令牌
	Timeout       time.Duration `json:"timeout" toml:"timeout"`                //接口超时时间
	ExecutorIp    string        `json:"executor_ip" toml:"executor_ip"`        //本地(执行器)IP(可自行获取)
	ExecutorPort  string        `json:"port" toml:"port"`                      //本地(执行器)端口
	RegistryKey   string        `json:"appname" toml:"appname"`                //执行器名称
	RegistryGroup string        `json:"registry_group " toml:"registry_group"` //执行器组,默认EXECUTOR
	LogDir        string        `json:"log_dir" toml:"log_dir"`                //日志目录
	Switch        bool          `json:"switch" toml:"switch"`                  //开关
	Debug         bool          `json:"debug" toml:"debug"`                    //开关
}

func DefaultConfig

func DefaultConfig() *Options

func DefaultOptions

func DefaultOptions() *Options

func (*Options) Build

func (options *Options) Build() *JobExecutor

type Registry

type Registry struct {
	RegistryGroup string `json:"registryGroup"`
	RegistryKey   string `json:"registryKey"`
	RegistryValue string `json:"registryValue"`
}

注册参数

type Task

type Task struct {
	Id        int64            // 任务id
	Name      string           // 任务名
	Param     *executor.RunReq // 参数
	StartTime int64            // 开始时间
	EndTime   int64            // 结束时间
	// contains filtered or unexported fields
}

任务定义

func (*Task) Cancel

func (t *Task) Cancel()

取消任务

func (*Task) GetContext

func (t *Task) GetContext() context.Context

获取context

func (*Task) GetId

func (t *Task) GetId() int64

获取ID

func (*Task) GetName

func (t *Task) GetName() string

获取Name

func (*Task) GetParam

func (t *Task) GetParam() *executor.RunReq

获取参数

func (*Task) Info

func (t *Task) Info() string

任务信息

func (*Task) IsRunning

func (t *Task) IsRunning() bool

判断任务是否在运行中

func (*Task) Run

func (t *Task) Run(ctx context.Context, cb CallbackFunc)

任务执行序

func (*Task) SetTimeout

func (t *Task) SetTimeout(timeout time.Duration)

更新任务timeout

func (*Task) Trace

func (t *Task) Trace(step string)

任务跟踪

type TaskFunc

type TaskFunc func(ctx context.Context, param *executor.RunReq) (msg string, err error)

任务执行函数

type TaskResult

type TaskResult struct {
	// contains filtered or unexported fields
}

type TaskWithPending

type TaskWithPending struct {
	*Task
	// contains filtered or unexported fields
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL