xxljob

v1.0.0
Published: Mar 21, 2024 License: MIT Imports: 16 Imported by: 0

README

xxljob

The XXL-JOB Go executor is a standalone HTTP server that manages the connection with the XXL-JOB server.

It listens on port 9999 by default, registers itself as a node with the XXL-JOB server, and handles the calls that the server sends.

A Go executor can run multiple jobs concurrently, but runs of the same job execute serially, which strictly follows the design of XXL-JOB:

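Different XXL-JOB jobs are scheduled and executed in parallel. A single XXL-JOB job runs in parallel across multiple executors, but serially within a single executor. Job termination is also supported.

See section 5.4.5, "Parallel scheduling" (并行调度), of the XXL-JOB documentation.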
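
The scheduling rule quoted above can be illustrated with a small standalone sketch. It is not the package's implementation: the `dispatcher` type, its methods, and the queue size of 16 are hypothetical names and values chosen for illustration. Each job ID gets its own FIFO queue drained by one goroutine, so runs of one job are serial while different jobs proceed concurrently.

```go
package main

import (
	"fmt"
	"sync"
)

// dispatcher is a hypothetical stand-in, not the library's type.
// Every job ID owns one FIFO queue that is drained by a single goroutine.
type dispatcher struct {
	mu     sync.Mutex
	queues map[int]chan func()
	wg     sync.WaitGroup
}

func newDispatcher() *dispatcher {
	return &dispatcher{queues: make(map[int]chan func())}
}

// submit enqueues one run of the given job, creating the job's worker on first use.
func (d *dispatcher) submit(jobID int, run func()) {
	d.mu.Lock()
	q, ok := d.queues[jobID]
	if !ok {
		q = make(chan func(), 16) // buffer size is an arbitrary choice
		d.queues[jobID] = q
		d.wg.Add(1)
		go func() {
			defer d.wg.Done()
			for r := range q { // one worker per job: runs execute one at a time
				r()
			}
		}()
	}
	d.mu.Unlock()
	q <- run
}

// shutdown closes every queue and waits for the queued runs to finish.
func (d *dispatcher) shutdown() {
	d.mu.Lock()
	for _, q := range d.queues {
		close(q)
	}
	d.mu.Unlock()
	d.wg.Wait()
}

// runOrder submits n runs for each of two jobs and returns the order
// in which the runs of job 1 were executed.
func runOrder(n int) []int {
	d := newDispatcher()
	var mu sync.Mutex
	var order []int
	for i := 0; i < n; i++ {
		i := i
		d.submit(1, func() {
			mu.Lock()
			order = append(order, i)
			mu.Unlock()
		})
		d.submit(2, func() {}) // a different job, handled by its own goroutine
	}
	d.shutdown()
	return order
}

func main() {
	fmt.Println(runOrder(5)) // prints [0 1 2 3 4]
}
```

Because a single goroutine drains each queue, the runs of job 1 always finish in submission order, regardless of what job 2 is doing.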

Prerequisite

Go 1.16 or later

Installation

go get -u github.com/hyperjiang/xxljob

Usage

1. Start the executor

import "github.com/hyperjiang/xxljob"

const (
	appName     = "xxl-job-executor-sample"
	accessToken = "default_token"
	host        = "localhost:8080/xxl-job-admin"
	demoHandler = "demoJobHandler"
)

e := xxljob.NewExecutor(
	xxljob.WithAppName(appName),
	xxljob.WithAccessToken(accessToken),
	xxljob.WithHost(host),
)

// Start the executor in its own goroutine; the error returned by Start is discarded here.
go e.Start()

2. Add job handler

Job handlers are functions of type xxljob.JobHandler, that is, func(ctx context.Context, param xxljob.JobParam) error.

// Requires the "context" and "fmt" imports.
e.AddJobHandler(demoHandler, func(ctx context.Context, param xxljob.JobParam) error {
	fmt.Println(param.Params) // print the job parameters
	return nil
})

3. Stop the executor

e.Stop()

Documentation

Overview

Package xxljob implements a golang executor for xxl-job. It provides functions to execute jobs, get job logs, and report job execution status. An executor can run multiple jobs concurrently, but each job is run in serial mode.

Index

Constants

const (
	// SerialExecution: the scheduling job enters the FIFO queue and runs in serial mode. (default)
	SerialExecution = "SERIAL_EXECUTION"
	// DiscardLater: if there are running jobs in the executor,
	// this job will be discarded and marked as failed.
	DiscardLater = "DISCARD_LATER"
	// CoverEarly: if there are running jobs in the executor,
	// the running jobs will be terminated and the queue will be cleared,
	// and then this new job will be run.
	CoverEarly = "COVER_EARLY"
)
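
To make the three block strategies concrete, here is a minimal decision-table sketch. It is not taken from the package: the `action` type and the `decide` helper are hypothetical, the constants are redeclared locally so the example compiles on its own, and treating unknown strategy values as serial execution is an assumption.

```go
package main

import "fmt"

// Local copies of the documented constants, so the sketch is self-contained.
const (
	SerialExecution = "SERIAL_EXECUTION"
	DiscardLater    = "DISCARD_LATER"
	CoverEarly      = "COVER_EARLY"
)

// action is a hypothetical type naming what happens to an incoming trigger.
type action string

const (
	enqueue        action = "enqueue"
	discard        action = "discard and mark as failed"
	replaceRunning action = "terminate running jobs, clear queue, run"
)

// decide is an illustrative helper, not part of the package.
// busy reports whether there are running jobs in the executor.
func decide(strategy string, busy bool) action {
	if !busy {
		return enqueue // nothing is running, so every strategy simply runs the job
	}
	switch strategy {
	case DiscardLater:
		return discard
	case CoverEarly:
		return replaceRunning
	default:
		// SerialExecution is the documented default; falling back to it
		// for unknown values is an assumption made by this sketch.
		return enqueue
	}
}

func main() {
	for _, s := range []string{SerialExecution, DiscardLater, CoverEarly} {
		fmt.Printf("%s while busy: %s\n", s, decide(s, true))
	}
}
```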

Variables

This section is empty.

Functions

func LocalIP

func LocalIP() string

LocalIP returns the local IPv4 address.

func ReadableSize

func ReadableSize(size int64) string

ReadableSize returns the size in human-readable format.

func TruncateDuration

func TruncateDuration(d time.Duration) time.Duration

TruncateDuration truncates a duration less than 1s to the specified precision, otherwise returns the duration unchanged.

Types

type CallbackParam

type CallbackParam struct {
	LogID       int64  `json:"logId"`
	LogDateTime int64  `json:"logDateTim"`
	HandleCode  int    `json:"handleCode"`
	HandleMsg   string `json:"handleMsg"`
}

CallbackParam is used to report the job execution result.

type Executor

type Executor struct {
	Options
	// contains filtered or unexported fields
}

Executor is responsible for executing jobs.

func NewExecutor

func NewExecutor(opts ...Option) *Executor

NewExecutor creates a new executor.

func (*Executor) AddJobHandler

func (e *Executor) AddJobHandler(name string, h JobHandler)

AddJobHandler registers a job handler by name.

func (*Executor) GetJobHandler

func (e *Executor) GetJobHandler(name string) JobHandler

GetJobHandler retrieves the job handler for a given name.

func (*Executor) RemoveJobHandler

func (e *Executor) RemoveJobHandler(name string)

RemoveJobHandler removes a job handler by name.

func (*Executor) Start

func (e *Executor) Start() error

Start starts the executor and registers it with the xxl-job server.

func (*Executor) Stop

func (e *Executor) Stop() error

Stop stops the executor.

func (*Executor) TriggerJob

func (e *Executor) TriggerJob(params RunParam) error

TriggerJob triggers a job. It returns an error if the handler does not exist or the log ID is a duplicate.

type IdleBeatParam

type IdleBeatParam struct {
	JobID int `json:"jobId"`
}

IdleBeatParam is for idle checking.

type Job

type Job struct {
	ID        int
	LogID     int64
	Name      string
	Handle    JobHandler
	Param     JobParam
	Timeout   int // timeout in seconds
	StartTime time.Time
	EndTime   time.Time
	// contains filtered or unexported fields
}

Job represents a scheduled job.

func (*Job) Duration

func (j *Job) Duration() time.Duration

Duration returns the duration of job execution.

func (*Job) Run

func (j *Job) Run()

Run runs the job.

func (*Job) Stop

func (j *Job) Stop()

Stop stops the job.

type JobHandler

type JobHandler func(ctx context.Context, param JobParam) error

JobHandler is the handler function for executing job.
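
Since a handler receives a context.Context, it can stop early when that context is cancelled. The sketch below assumes the executor cancels the context on timeout or termination, which this page does not state. `JobParam` and `JobHandler` are local copies of the documented types, and `runWithTimeout`, `timedOut`, `generous`, and `tight` are hypothetical names used only for the demonstration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Local copies of the documented types, so the sketch compiles on its own.
type JobParam struct {
	Params        string
	ShardingIndex int
	ShardingTotal int
}

type JobHandler func(ctx context.Context, param JobParam) error

// slowHandler does its work in five short steps and checks ctx before each one.
var slowHandler JobHandler = func(ctx context.Context, param JobParam) error {
	for step := 0; step < 5; step++ {
		select {
		case <-ctx.Done():
			return ctx.Err() // stop as soon as the context is cancelled
		case <-time.After(10 * time.Millisecond):
			// one unit of simulated work finished
		}
	}
	return nil
}

// Hypothetical timeouts for the demonstration.
const (
	generous = time.Second
	tight    = time.Millisecond
)

// runWithTimeout is a hypothetical driver that plays the role of the executor.
func runWithTimeout(h JobHandler, p JobParam, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return h(ctx, p)
}

// timedOut reports whether err was caused by the deadline expiring.
func timedOut(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(runWithTimeout(slowHandler, JobParam{}, generous))
	fmt.Println(timedOut(runWithTimeout(slowHandler, JobParam{}, tight)))
}
```

A handler that never looks at ctx cannot be interrupted this way, so long-running handlers benefit from checking it between units of work.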

type JobParam

type JobParam struct {
	Params        string
	ShardingIndex int
	ShardingTotal int
}

JobParam is the parameter passed to the job handler.
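
One plausible use of ShardingIndex and ShardingTotal is to split a workload among executors by taking item IDs modulo the shard count. The sketch below is an assumption about usage, not documented behavior: `shardOf` is a hypothetical helper, `JobParam` is a local copy of the documented struct, and item IDs are assumed to be non-negative.

```go
package main

import "fmt"

// Local copy of the documented struct, so the sketch compiles on its own.
type JobParam struct {
	Params        string
	ShardingIndex int
	ShardingTotal int
}

// shardOf returns the item IDs that belong to the shard described by p.
// IDs are assumed to be non-negative. With zero or one shard, every item is kept.
func shardOf(ids []int, p JobParam) []int {
	if p.ShardingTotal <= 1 {
		return ids
	}
	var mine []int
	for _, id := range ids {
		if id%p.ShardingTotal == p.ShardingIndex {
			mine = append(mine, id)
		}
	}
	return mine
}

func main() {
	ids := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
	p := JobParam{ShardingIndex: 1, ShardingTotal: 3}
	fmt.Println(shardOf(ids, p)) // prints [1 4 7]
}
```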

type KillParam

type KillParam struct {
	JobID int `json:"jobId"`
}

KillParam is used to terminate a running job.

type LogParam

type LogParam struct {
	LogId       int64 `json:"logId"`
	LogDateTime int64 `json:"logDateTim"`
	FromLineNum int   `json:"fromLineNum"`
}

LogParam is used to get job logs.

type LogResult

type LogResult struct {
	FromLineNum int    `json:"fromLineNum"`
	ToLineNum   int    `json:"toLineNum"`
	LogContent  string `json:"logContent"`
	IsEnd       bool   `json:"isEnd"`
}

LogResult is the response format for log content.

type Logger

type Logger interface {
	Info(string, ...interface{})
	Error(string, ...interface{})
}

Logger is a generic logger interface.
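
Any type with these two methods satisfies the interface, so an existing logger can be adapted with a thin wrapper. The sketch below wraps the standard library's log.Logger. `stdLogger` and `demo` are hypothetical names, the interface is redeclared locally, and treating the first argument as a Printf-style format string is an assumption based on the method signatures.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
)

// Local copy of the documented interface, so the sketch compiles on its own.
type Logger interface {
	Info(string, ...interface{})
	Error(string, ...interface{})
}

// stdLogger is a hypothetical adapter around the standard library logger.
type stdLogger struct {
	l *log.Logger
}

// Info writes a formatted message with an INFO prefix.
func (s stdLogger) Info(format string, args ...interface{}) {
	s.l.Printf("INFO "+format, args...)
}

// Error writes a formatted message with an ERROR prefix.
func (s stdLogger) Error(format string, args ...interface{}) {
	s.l.Printf("ERROR "+format, args...)
}

// demo logs two messages into a buffer and returns what was written.
func demo() string {
	var buf bytes.Buffer
	var lg Logger = stdLogger{l: log.New(&buf, "", 0)} // no timestamp, for stable output
	lg.Info("job %d started", 7)
	lg.Error("job %d failed: %s", 7, "boom")
	return buf.String()
}

func main() {
	fmt.Print(demo())
}
```

An adapter like this could then be handed to the documented WithLogger option.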

func DefaultLogger

func DefaultLogger() Logger

DefaultLogger returns a default logger.

func DummyLogger

func DummyLogger() Logger

DummyLogger returns a logger which writes nothing.

type Option

type Option func(*Options)

Option is for setting options.

func WithAccessToken

func WithAccessToken(token string) Option

WithAccessToken sets access token.

func WithAppName

func WithAppName(appName string) Option

WithAppName sets app name.

func WithCallbackBufferSize

func WithCallbackBufferSize(size int) Option

WithCallbackBufferSize sets callback buffer size.

func WithCallbackInterval

func WithCallbackInterval(interval string) Option

WithCallbackInterval sets callback interval.

func WithClientTimeout

func WithClientTimeout(timeout time.Duration) Option

WithClientTimeout sets client timeout.

func WithHost

func WithHost(host string) Option

WithHost sets xxl-job server address.

func WithIdleTimeout

func WithIdleTimeout(timeout time.Duration) Option

WithIdleTimeout sets idle timeout.

func WithInterruptSignals

func WithInterruptSignals(signals []os.Signal) Option

WithInterruptSignals sets interrupt signals.

func WithLogger

func WithLogger(logger Logger) Option

WithLogger sets logger.

func WithPort

func WithPort(port int) Option

WithPort sets service port.

func WithReadTimeout

func WithReadTimeout(timeout time.Duration) Option

WithReadTimeout sets read timeout.

func WithRegisterInterval

func WithRegisterInterval(interval string) Option

WithRegisterInterval sets register interval.

func WithSizeLimit

func WithSizeLimit(sizeLimit int64) Option

WithSizeLimit sets size limit.

func WithWaitTimeout

func WithWaitTimeout(timeout time.Duration) Option

WithWaitTimeout sets wait timeout for graceful shutdown.

func WithWriteTimeout

func WithWriteTimeout(timeout time.Duration) Option

WithWriteTimeout sets write timeout.

type Options

type Options struct {
	// client settings
	AccessToken        string
	AppName            string
	CallbackBufferSize int
	CallbackInterval   string
	ClientTimeout      time.Duration
	Host               string
	Logger             Logger
	RegisterInterval   string
	SizeLimit          int64 // we will not log the response if its size exceeds the size limit

	// http server settings
	Port         int
	IdleTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
	WaitTimeout  time.Duration
	// contains filtered or unexported fields
}

Options are executor options.

func NewOptions

func NewOptions(opts ...Option) Options

NewOptions creates options with default values.
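
Option and NewOptions follow the functional options pattern. The sketch below reproduces the pattern with a reduced, local `Options` struct so that it compiles on its own; it is an illustration, not the package source. The default port of 9999 comes from the README, and the other details are assumptions.

```go
package main

import "fmt"

// Options is a reduced local stand-in for the documented struct.
type Options struct {
	AppName string
	Port    int
}

// Option mutates an Options value, matching the documented signature.
type Option func(*Options)

// WithAppName sets the app name.
func WithAppName(appName string) Option {
	return func(o *Options) { o.AppName = appName }
}

// WithPort sets the service port.
func WithPort(port int) Option {
	return func(o *Options) { o.Port = port }
}

// NewOptions starts from defaults and applies each option in order,
// so later options override earlier ones.
func NewOptions(opts ...Option) Options {
	o := Options{Port: 9999} // default port stated in the README
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Println(NewOptions().Port) // prints 9999
	o := NewOptions(WithAppName("demo"), WithPort(8081))
	fmt.Println(o.AppName, o.Port) // prints demo 8081
}
```

The pattern lets callers set only the fields they care about while every other field keeps its default.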

type RegistryParam

type RegistryParam struct {
	RegistryGroup string `json:"registryGroup"`
	RegistryKey   string `json:"registryKey"`
	RegistryValue string `json:"registryValue"`
}

RegistryParam is used to register or deregister an executor.

type Response

type Response struct {
	Code    int         `json:"code"` // 200 means success, other means failed
	Msg     string      `json:"msg"`  // error message
	Content interface{} `json:"content,omitempty"`
}

Response is the response format.

func NewErrorResponse

func NewErrorResponse(msg string) *Response

NewErrorResponse returns an error response.

func NewSuccResponse

func NewSuccResponse() *Response

NewSuccResponse returns a default success response.

func (Response) String

func (res Response) String() string

String marshals the response into a json string.
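
The JSON shape produced by the struct tags above can be seen with a local copy of the type. This sketch is not the package source: the `String` method here is a plausible reimplementation, and the failure code 500 is an assumption, since the documentation only says that codes other than 200 mean failure.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Response is a local copy of the documented struct, so the sketch compiles on its own.
type Response struct {
	Code    int         `json:"code"` // 200 means success, other means failed
	Msg     string      `json:"msg"`  // error message
	Content interface{} `json:"content,omitempty"`
}

// String marshals the response into a JSON string.
// Returning an empty string on a marshalling error is a choice made by this sketch.
func (res Response) String() string {
	b, err := json.Marshal(res)
	if err != nil {
		return ""
	}
	return string(b)
}

func main() {
	// Content is nil, so omitempty leaves it out of the output.
	fmt.Println(Response{Code: 200})
	// 500 is an assumed failure code.
	fmt.Println(Response{Code: 500, Msg: "handler not found"})
}
```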

type RunParam

type RunParam struct {
	JobID                 int    `json:"jobId"`
	ExecutorHandler       string `json:"executorHandler"`
	ExecutorParams        string `json:"executorParams"`
	ExecutorBlockStrategy string `json:"executorBlockStrategy"`
	ExecutorTimeout       int    `json:"executorTimeout"` // job execution timeout in seconds
	LogID                 int64  `json:"logId"`
	LogDateTime           int64  `json:"logDateTime"`    // timestamp in milliseconds
	GlueType              string `json:"glueType"`       // unsupported in go executor
	GlueSource            string `json:"glueSource"`     // unsupported in go executor
	GlueUpdatetime        int64  `json:"glueUpdatetime"` // unsupported in go executor
	BroadcastIndex        int    `json:"broadcastIndex"`
	BroadcastTotal        int    `json:"broadcastTotal"`
}

RunParam is used to trigger a job.
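
The JSON tags on RunParam describe the body of a trigger request. The sketch below decodes a sample body into a reduced local copy of the struct and derives a JobParam from it. The sample values, the `parseTrigger` and `toJobParam` helpers, and the mapping of the broadcast fields onto the sharding fields are assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RunParam is a reduced local copy of the documented struct.
type RunParam struct {
	JobID                 int    `json:"jobId"`
	ExecutorHandler       string `json:"executorHandler"`
	ExecutorParams        string `json:"executorParams"`
	ExecutorBlockStrategy string `json:"executorBlockStrategy"`
	ExecutorTimeout       int    `json:"executorTimeout"` // seconds
	LogID                 int64  `json:"logId"`
	BroadcastIndex        int    `json:"broadcastIndex"`
	BroadcastTotal        int    `json:"broadcastTotal"`
}

// JobParam is a local copy of the documented handler parameter.
type JobParam struct {
	Params        string
	ShardingIndex int
	ShardingTotal int
}

// sample is a made-up request body that uses the documented field names.
const sample = `{"jobId":1,"executorHandler":"demoJobHandler","executorParams":"hello",
"executorBlockStrategy":"SERIAL_EXECUTION","executorTimeout":30,"logId":42,
"broadcastIndex":0,"broadcastTotal":2}`

// parseTrigger decodes a trigger request body.
func parseTrigger(body string) (RunParam, error) {
	var p RunParam
	err := json.Unmarshal([]byte(body), &p)
	return p, err
}

// toJobParam maps the broadcast fields onto the sharding fields (an assumed mapping).
func toJobParam(p RunParam) JobParam {
	return JobParam{
		Params:        p.ExecutorParams,
		ShardingIndex: p.BroadcastIndex,
		ShardingTotal: p.BroadcastTotal,
	}
}

func main() {
	p, err := parseTrigger(sample)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(p.ExecutorHandler, p.LogID, toJobParam(p).ShardingTotal)
}
```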
