Documentation
¶
Index ¶
- Constants
- Variables
- func Contains(arr []string, str string) bool
- func DefaultCheckIdle(cxt context.Context, jobID int64) bool
- func DefaultCheckResult(cxt context.Context, param *RunReq) bool
- func DefaultSerialKeyPostfixGenFunc(param *RunReq) string
- func FileIsExist(path string) bool
- func GetServiceAddr(services []string, mode LoadBalanceMode) string
- func Int64ToStr(i int64) string
- type CheckIdleFunc
- type CheckResultFunc
- type ExecuteResult
- type Executor
- type FirstBalancer
- type GlueType
- type JobLogReq
- type LastBalancer
- type LoadBalanceMode
- type LoadBalancer
- type LogHandler
- type LogReq
- type LogRes
- type LogResContent
- type OneLogReq
- type Option
- func AccessToken(token string) Option
- func Endpoints(endpoints []string) Option
- func ExecutorIp(executorIp string) Option
- func ExecutorPort(executorPort string) Option
- func LoadBalance(mode LoadBalanceMode) Option
- func RegistryKey(registryKey string) Option
- func ServerAddr(addr string) Option
- func SetCheckIdleFunc(checkIdleFunc CheckIdleFunc) Option
- func SetCheckResultFunc(resultFunc CheckResultFunc) Option
- func SetCollapse(collapse bool) Option
- func SetFlushIntervalInSec(flushIntervalInSec uint32) Option
- func SetLogDir(logDir string) Option
- func SetLogger(l logging.Logger) Option
- func SetMaxConcurrencyNum(maxConcurrencyNum uint16) Option
- func SetMetricLogMaxFileAmount(metricLogMaxFileAmount uint32) Option
- func SetMetricLogSingleFileMaxSize(metricLogSingleFileMaxSize uint64) Option
- func SetRouterFlag(routerFlag string) Option
- func SetSerialKeyPostfixGenFunc(serialKeyPostfixGenFunc SerialKeyPostfixGenFunc) Option
- func SetServerMode(mode ServerMode) Option
- func SetSync(sync bool) Option
- func SetTimerDelayInMilliseconds(timerDelayInMilliseconds int) Option
- func Timeout(timeout time.Duration) Option
- func WithTaskWrapper(taskWrapper TaskWrapper) Option
- type Options
- type Queue
- type RandomBalancer
- type Registry
- type RoundBalancer
- type RunReq
- type SendMetricRetryCallback
- type SerialKeyPostfixGenFunc
- type ServerMode
- type Task
- type TaskFunc
- type TaskWrapper
Constants ¶
const ( ETCD_SERVER_KEY = "go_scheduler_root/go_scheduler_namespace/go_scheduler_group/" // FetchIntervalSecond 拉取时间间隔 FetchIntervalSecond = 2 * 1000 // SendMetricUrl 发送metric url SendMetricUrl = "/api/metrics" )
const ( // DefaultMetricLogSingleFileMaxSize 10Mb DefaultMetricLogSingleFileMaxSize uint64 = 10485760 // DefaultMetricLogMaxFileAmount 保存最大文件数 DefaultMetricLogMaxFileAmount uint32 = 10 // DefaultLogDir 默认日志目录 DefaultLogDir = "gslog" // DefaultFlushIntervalInSec 日志刷新间隔 DefaultFlushIntervalInSec uint32 = 1 )
const DEFAULT_TIME_OUT = time.Second * 10
DEFAULT_TIME_OUT 默认超时时间
Variables ¶
var ( Bean = GlueType{/* contains filtered or unexported fields */} GlueShell = GlueType{/* contains filtered or unexported fields */} GluePython = GlueType{/* contains filtered or unexported fields */} )
var ( DefaultRegistryKey = "golang-jobs" DefaultTimerDelayInMilliseconds = 50 MinTimerDelayInMilliseconds = 10 MaxTimerDelayInMilliseconds = 1000 )
var (
CurrentPID = os.Getpid()
)
var (
GeneralMsg = "{\"code\":200,\"msg\":\"\"}"
)
Functions ¶
func DefaultCheckIdle ¶
DefaultCheckIdle 默认空闲检查,返回真,
func DefaultCheckResult ¶
DefaultCheckResult 默认结果检查处理, 如果用户没有自己处理返回结果,则默认返回处理失败
func DefaultSerialKeyPostfixGenFunc ¶
DefaultSerialKeyPostfixGenFunc 默认串行key生成方法,以JobId:JobId作为Key
func FileIsExist ¶
func GetServiceAddr ¶
func GetServiceAddr(services []string, mode LoadBalanceMode) string
GetServiceAddr 加载服务地址
Types ¶
type CheckIdleFunc ¶
CheckIdleFunc 空闲检查,true:空闲,false:忙碌,只有在业务自己实现空闲检查时,才有用到此方法 备注:调度中心的路由策略需要时忙碌转移
type CheckResultFunc ¶
CheckResultFunc 执行结果检查,true:处理成功,false:处理失败,此方法只在以下场景才会调用 1.调度中心配置了需要结果检查 2.执行结果上报时,虽然调度中心响应了成功,但是最终却丢失了(此场景只发生在调度中心出现故障时才发生,如crash或者kill -9)
type ExecuteResult ¶
type ExecuteResult struct { Code int64 `json:"code"` Msg interface{} `json:"msg"` }
ExecuteResult 任务执行结果 200 表示任务执行正常,500表示失败
type Executor ¶
type Executor interface { // Init 初始化 Init(...Option) // LogHandler 日志查询 LogHandler(handler LogHandler) // RegTask 注册任务 RegTask(pattern string, task TaskFunc) // RunTask 运行任务 RunTask(writer http.ResponseWriter, request *http.Request) // KillTask 杀死任务 KillTask(writer http.ResponseWriter, request *http.Request) // TaskLog 任务日志 TaskLog(writer http.ResponseWriter, request *http.Request) // Run 运行服务 Run() error // Stop 停止服务 Stop() // RunningTask 获取运行中的任务 RunningTask() map[string]*Task // GetAddress 此方法需要在init之后调用才有效,获取node中的调度server地址 GetAddress() string }
Executor 执行器
type FirstBalancer ¶
type FirstBalancer struct { }
FirstBalancer 选择第一个
func (FirstBalancer) Balance ¶
func (f FirstBalancer) Balance(services []string) string
type GlueType ¶
type GlueType struct {
// contains filtered or unexported fields
}
func (*GlueType) GetScriptFlag ¶
type JobLogReq ¶
type JobLogReq struct { LogStartTime int64 `json:"logStartTime"` // 本次调度日志开始时间 LogEndTime int64 `json:"LogEndTime"` // 本次调度日志截至时间 JobID int64 `json:"jobId"` // 本次调度任务ID }
JobLogReq 日志请求
type LastBalancer ¶
type LastBalancer struct { }
LastBalancer 选择最后一个
func (LastBalancer) Balance ¶
func (l LastBalancer) Balance(services []string) string
type LoadBalanceMode ¶
type LoadBalanceMode int
LoadBalanceMode 负载均衡模式,仅仅在ServerMode为ETCD_LOOKUP_MODE生效
const ( DEFAULT_MODE LoadBalanceMode = 0 // RODOM_MODE 随机 RODOM_MODE LoadBalanceMode = 1 // ROUND_MODE 轮询 ROUND_MODE LoadBalanceMode = 2 // FIRST_MODE 第一个 FIRST_MODE LoadBalanceMode = 3 // LAST_MODE 最后一个 LAST_MODE LoadBalanceMode = 4 )
type LoadBalancer ¶
LoadBalancer 负载均衡接口
type LogHandler ¶
type LogReq ¶
type LogReq struct { LogDateTim int64 `json:"logDateTim"` // 本次调度日志时间 LogID int64 `json:"logId"` // 本次调度日志ID FromLineNum int `json:"fromLineNum"` // 日志开始行号,滚动加载日志 }
LogReq 日志请求
type LogRes ¶
type LogRes struct { Code int64 `json:"code"` // 200 表示正常、其他失败 Msg string `json:"msg"` // 错误提示消息 Content LogResContent `json:"content"` // 日志响应内容 }
LogRes 日志响应
type LogResContent ¶
type LogResContent struct { FromLineNum int `json:"fromLineNum"` // 本次请求,日志开始行数 ToLineNum int `json:"toLineNum"` // 本次请求,日志结束行号 LogContent string `json:"logContent"` // 本次请求日志内容 IsEnd bool `json:"isEnd"` // 日志是否全部加载完 }
LogResContent 日志响应内容
type OneLogReq ¶
type OneLogReq struct { LogStartTime int64 `json:"logStartTime"` // 本次调度日志开始时间 LogEndTime int64 `json:"LogEndTime"` // 本次调度日志截至时间 LogID int64 `json:"logID"` // 本次调度任务执行日志ID }
OneLogReq 日志请求
type Option ¶
type Option func(o *Options)
func ExecutorIp ¶
func ExecutorPort ¶
func SetCheckIdleFunc ¶
func SetCheckIdleFunc(checkIdleFunc CheckIdleFunc) Option
SetCheckIdleFunc 设置空闲检查方法
func SetCheckResultFunc ¶
func SetCheckResultFunc(resultFunc CheckResultFunc) Option
SetCheckResultFunc 设置结果检查方法
func SetCollapse ¶
func SetFlushIntervalInSec ¶
SetFlushIntervalInSec 设置日志刷新时间
func SetMaxConcurrencyNum ¶
func SetMetricLogMaxFileAmount ¶
SetMetricLogMaxFileAmount 设置最大保存多少个file
func SetMetricLogSingleFileMaxSize ¶
SetMetricLogSingleFileMaxSize 设置单个metric文件大小
func SetRouterFlag ¶
func SetSerialKeyPostfixGenFunc ¶
func SetSerialKeyPostfixGenFunc(serialKeyPostfixGenFunc SerialKeyPostfixGenFunc) Option
SetSerialKeyPostfixGenFunc 设置串行key生成方法
func WithTaskWrapper ¶
func WithTaskWrapper(taskWrapper TaskWrapper) Option
WithTaskWrapper 设置预处理方法Wrapper
type Options ¶
type Options struct { ServerAddr string `json:"server_addr"` // 调度中心地址 AccessToken string `json:"access_token"` // 请求令牌 Timeout time.Duration `json:"timeout"` // 接口超时时间 ExecutorIp string `json:"executor_ip"` // 本地(执行器)IP(可自行获取) ExecutorPort string `json:"executor_port"` // 本地(执行器)端口 RegistryKey string `json:"registry_key"` // 执行器名称 LogDir string `json:"log_dir"` // 日志目录 Sync bool `json:"sync"` // 是否同步启动,默认为false Endpoints []string `json:"endpoints"` // etcd地址 ServerMode ServerMode `json:"server_mode"` // 连接server的模式 LoadBalanceMode LoadBalanceMode `json:"load_balance_mode"` // 负载均衡模式 MaxConcurrencyNum uint16 `json:"max_concurrency_num"` // 最发并发数,默认为CPU*2 Collapse bool `json:"collapse"` // 开启请求合并 TimerDelayInMilliseconds int `json:"timer_delay_in_milliseconds"` // 请求合并延迟时间 // contains filtered or unexported fields }
type RandomBalancer ¶
type RandomBalancer struct { }
RandomBalancer 随机负载均衡算法
func (*RandomBalancer) Balance ¶
func (r *RandomBalancer) Balance(services []string) string
type Registry ¶
type Registry struct { RegistryGroup string `json:"registryGroup"` RegistryKey string `json:"registryKey"` // LoadStat load1负载 LoadStat float64 `json:"loadStat"` // CpuStat cpu 使用率 CpuStat float64 `json:"cpuStat"` // MemoryStat 内存使用率 MemoryStat int64 `json:"memoryStat"` RegistryValue string `json:"registryValue"` // routerFlag 路由标签 RouterFlag string `json:"routerFlag"` }
Registry 注册参数
type RoundBalancer ¶
type RoundBalancer struct {
// contains filtered or unexported fields
}
RoundBalancer 轮询负载均衡算法
func (*RoundBalancer) Balance ¶
func (r *RoundBalancer) Balance(services []string) string
type RunReq ¶
type RunReq struct { JobID int64 `json:"jobId"` // 任务ID ExecutorHandler string `json:"executorHandler"` // 任务标识 ExecutorParams string `json:"executorParams"` // 任务参数 ExecutorBlockStrategy string `json:"executorBlockStrategy"` // 任务阻塞策略 ExecutorTimeout int64 `json:"executorTimeout"` // 任务超时时间,单位秒,大于零时生效 LogID int64 `json:"logId"` // 本次调度日志ID ParentLog int64 `json:"parentLog"` // 本次调度的父日志ID InstanceId string `json:"instanceId"` // 实例ID,一次运行串联的任务实例ID一致 LogDateTime int64 `json:"logDateTime"` // 本次调度日志时间 GlueType string `json:"glueType"` // 任务模式,可选值参考 GlueTypeEnum GlueSource string `json:"glueSource"` // GLUE脚本代码 GlueUpdatetime int64 `json:"glueUpdatetime"` // GLUE脚本更新时间,用于判定脚本是否变更以及是否需要刷新 BroadcastIndex int64 `json:"broadcastIndex"` // 分片参数:当前分片 BroadcastTotal int64 `json:"broadcastTotal"` // 分片参数:总分片 BusinessStartExecutorToleranceThresholdInMin int32 `json:"businessStartExecutorToleranceThresholdInMin"` // 业务开始执行容忍阈值 }
RunReq 触发任务请求参数
type SendMetricRetryCallback ¶
type SendMetricRetryCallback struct {
// contains filtered or unexported fields
}
func (SendMetricRetryCallback) DoWithRetry ¶
func (m SendMetricRetryCallback) DoWithRetry(ctx retry.RtyContext) interface{}
type SerialKeyPostfixGenFunc ¶
SerialKeyPostfixGenFunc 串行执行策略时,串行key的生成策略, 默认情况下以JobId:JobId作为串行依据,然而有些场景下业务希望 又可以按照策略并行执行,比如同一个用户下希望是串行执行 不同用户下并行执行
type ServerMode ¶
type ServerMode int
ServerMode 连接server的模式
const ( // HISTORY_SERVER_MODE 固定调度中心模式 HISTORY_SERVER_MODE ServerMode = 0 // FIX_SERVER_MODE 固定调度中心模式 FIX_SERVER_MODE ServerMode = 1 // ETCD_LOOKUP_MODE etcd自动查询模式 ETCD_LOOKUP_MODE ServerMode = 2 // MIXED_MODE 混合模式,此种模式优先使用etcd,如果etcd不可用,则使用固定IP MIXED_MODE ServerMode = 3 )
type Task ¶
type Task struct { Id int64 Name string Ext context.Context Param *RunReq Cancel context.CancelFunc StartTime int64 EndTime int64 LogId int64 // 任务LogId ExecutorBlockStrategy string //阻塞策略 GlueType GlueType //任务类型 // contains filtered or unexported fields }
Task 任务
type TaskWrapper ¶
TaskWrapper 包装 TaskWrapper 并返回等效项,任务执行前处理,一般可以处理通用的逻辑,如context设置、打印参数、限流设置等