Documentation ¶
Index ¶
- Constants
- func GetHour(n int64) *hour
- func GetHourInterval(n int64) *hourInterval
- func GetMinutes(n int64) *minutes
- func GetSeconds(n int64) *seconds
- func NewJobsXorm(db *xorm.Engine) *jobsXorm
- func NewJobsZorm(db *zorm.DBDao) *jobsZorm
- type Client
- type ClientConfig
- type ConfigCreateInCustomId
- type ConfigCreateInCustomIdMaxNumber
- type ConfigCreateInCustomIdMaxNumberOnly
- type ConfigCreateInCustomIdOnly
- type Cron
- type CronConfig
- type Etcd
- func (e Etcd) Close()
- func (e Etcd) Create(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error)
- func (e Etcd) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error)
- func (e Etcd) ExtractWorkerIP(regKey string) string
- func (e Etcd) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error)
- func (e Etcd) GetWatchKey() string
- func (e Etcd) IssueWatchKey(ip string) string
- func (e Etcd) ListWorkers() (workerArr []string, err error)
- func (e Etcd) RegisterWorker()
- func (e Etcd) Update(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error)
- func (e Etcd) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan
- type EtcdConfig
- type GrpcCron
- type JobsBeegoOrm
- type JobsGorm
- func (j *JobsGorm) CheckManyTask(tx *gorm.DB, vs []jobs_gorm_model.Task)
- func (j *JobsGorm) CheckSingleTask(tx *gorm.DB, v jobs_gorm_model.Task)
- func (j *JobsGorm) CreateInCustomId(config *ConfigCreateInCustomId) error
- func (j *JobsGorm) CreateInCustomIdMaxNumber(config *ConfigCreateInCustomIdMaxNumber) error
- func (j *JobsGorm) CreateInCustomIdMaxNumberOnly(config *ConfigCreateInCustomIdMaxNumberOnly) error
- func (j *JobsGorm) CreateInCustomIdOnly(config *ConfigCreateInCustomIdOnly) error
- func (j *JobsGorm) EditTask(tx *gorm.DB, id uint) *gorm.DB
- func (j *JobsGorm) GetDb() *gorm.DB
- func (j *JobsGorm) GetEtcd() *clientv3.Client
- func (j *JobsGorm) GetEtcdIssueAddress(server *Etcd, v *jobs_gorm_model.Task) (address string, err error)
- func (j *JobsGorm) GetRedis() *redis.Client
- func (j *JobsGorm) Lock(info jobs_gorm_model.Task, id any) (string, error)
- func (j *JobsGorm) LockForever(info jobs_gorm_model.Task, id any) (string, error)
- func (j *JobsGorm) RefreshIp(tx *gorm.DB)
- func (j *JobsGorm) Run(info jobs_gorm_model.Task, status int, desc string)
- func (j *JobsGorm) RunAddLog(id uint, runId string) error
- func (j *JobsGorm) TaskFindAll(tx *gorm.DB, frequency int64) (results []jobs_gorm_model.Task)
- func (j *JobsGorm) TaskFindAllError(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task
- func (j *JobsGorm) TaskFindAllIn(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task
- func (j *JobsGorm) TaskFindAllSuccess(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task
- func (j *JobsGorm) TaskFindAllTimeout(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task
- func (j *JobsGorm) TaskFindAllWait(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task
- func (j *JobsGorm) TaskIpInit(tx *gorm.DB, ips map[string]string) bool
- func (j *JobsGorm) TaskIpUpdate(tx *gorm.DB, taskType, ips string) *gorm.DB
- func (j *JobsGorm) TaskLogRunTake(tx *gorm.DB, taskId uint, runId string) (result jobs_gorm_model.TaskLogRun)
- func (j *JobsGorm) TaskTake(tx *gorm.DB, customId string) (result jobs_gorm_model.Task)
- func (j *JobsGorm) TaskTakeError(tx *gorm.DB, customId string) jobs_gorm_model.Task
- func (j *JobsGorm) TaskTakeId(tx *gorm.DB, id uint) (result jobs_gorm_model.Task)
- func (j *JobsGorm) TaskTakeIn(tx *gorm.DB, customId string) jobs_gorm_model.Task
- func (j *JobsGorm) TaskTakeSuccess(tx *gorm.DB, customId string) jobs_gorm_model.Task
- func (j *JobsGorm) TaskTakeTimeout(tx *gorm.DB, customId string) jobs_gorm_model.Task
- func (j *JobsGorm) TaskTakeWait(tx *gorm.DB, customId string) jobs_gorm_model.Task
- func (j *JobsGorm) TaskTypeTake(tx *gorm.DB, customId, Type string) (result jobs_gorm_model.Task)
- func (j *JobsGorm) TaskTypeTakeError(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task
- func (j *JobsGorm) TaskTypeTakeIn(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task
- func (j *JobsGorm) TaskTypeTakeSuccess(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task
- func (j *JobsGorm) TaskTypeTakeTimeout(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task
- func (j *JobsGorm) TaskTypeTakeWait(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task
- func (j *JobsGorm) Unlock(info jobs_gorm_model.Task, id any) error
- func (j *JobsGorm) UpdateFrequency(tx *gorm.DB, id uint, frequency int64) *gorm.DB
- type OperationAttr
- func WithEtcdClient(etcdClient *clientv3.Client) *OperationAttr
- func WithGormClient(client *gorm.DB) *OperationAttr
- func WithIpService(ipService *goip.Client) *OperationAttr
- func WithLockPrefix(lockPrefix string) *OperationAttr
- func WithOutsideIp(outsideIp string) *OperationAttr
- func WithRedisClient(redisClient *redis.Client) *OperationAttr
- type Server
- type ServerConfig
- type Worker
- type WorkerConfig
Constants ¶
const (
	CodeAbnormal = 0                              // 异常
	CodeError    = http.StatusInternalServerError // 失败
	CodeSuccess  = http.StatusOK                  // 成功
	CodeEnd      = http.StatusCreated             // 结束
)
const (
	// JobSaveDir 定时任务保存目录
	JobSaveDir = "/cron/jobs/"
	// JobWorkerDir 服务注册目录
	JobWorkerDir = "/cron/workers/"
)
const (
	TASK_IN      = "IN"      // 任务运行
	TASK_SUCCESS = "SUCCESS" // 任务完成
	TASK_ERROR   = "ERROR"   // 任务异常
	TASK_TIMEOUT = "TIMEOUT" // 任务超时
	TASK_WAIT    = "WAIT"    // 任务等待
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ClientConfig ¶
type ClientConfig struct {
Address string // 服务端口 127.0.0.1:8888
}
ClientConfig 客户端配置
type ConfigCreateInCustomId ¶
type ConfigCreateInCustomId struct {
	Tx             *gorm.DB // 驱动
	Params         string   // 参数
	Frequency      int64    // 频率(秒单位)
	CustomId       string   // 自定义编号
	CustomSequence int64    // 自定义顺序
	Type           string   // 类型
	SpecifyIp      string   // 指定外网IP
	CurrentIp      string   // 当前ip
}
ConfigCreateInCustomId 创建正在运行任务
type ConfigCreateInCustomIdMaxNumber ¶
type ConfigCreateInCustomIdMaxNumber struct {
	Tx             *gorm.DB // 驱动
	Params         string   // 参数
	Frequency      int64    // 频率(秒单位)
	MaxNumber      int64    // 最大次数
	CustomId       string   // 自定义编号
	CustomSequence int64    // 自定义顺序
	Type           string   // 类型
	SpecifyIp      string   // 指定外网IP
	CurrentIp      string   // 当前ip
}
ConfigCreateInCustomIdMaxNumber 创建正在运行任务并限制数量
type ConfigCreateInCustomIdMaxNumberOnly ¶
type ConfigCreateInCustomIdMaxNumberOnly struct {
	Tx             *gorm.DB // 驱动
	Params         string   // 参数
	Frequency      int64    // 频率(秒单位)
	MaxNumber      int64    // 最大次数
	CustomId       string   // 自定义编号
	CustomSequence int64    // 自定义顺序
	Type           string   // 类型
	SpecifyIp      string   // 指定外网IP
	CurrentIp      string   // 当前ip
}
ConfigCreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量
type ConfigCreateInCustomIdOnly ¶
type ConfigCreateInCustomIdOnly struct {
	Tx             *gorm.DB // 驱动
	Params         string   // 参数
	Frequency      int64    // 频率(秒单位)
	CustomId       string   // 自定义编号
	CustomSequence int64    // 自定义顺序
	Type           string   // 类型
	SpecifyIp      string   // 指定外网IP
	CurrentIp      string   // 当前ip
}
ConfigCreateInCustomIdOnly 创建正在运行唯一任务
type Cron ¶
type Cron struct {
// contains filtered or unexported fields
}
Cron 定时任务管理器
func (*Cron) AddJobByFunc ¶
AddJobByFunc 添加函数作为定时任务。id:唯一任务id;spec:配置定时执行时间表达式;f:需要执行的任务方法。
func (*Cron) AddJobByInterface ¶
AddJobByInterface 实现接口的方式添加定时任务。id:唯一任务id;spec:配置定时执行时间表达式;cmd:需要执行的任务方法。
func (*Cron) IsExistsJob ¶
IsExistsJob 判断是否存在任务。id:唯一任务id。
type CronConfig ¶
type CronConfig struct {
Address string // 服务端口 127.0.0.1:8888
}
CronConfig 定时任务配置
type Etcd ¶
type Etcd struct {
	EtcdConfig                  // 配置
	Client     *clientv3.Client // 驱动
	Kv         clientv3.KV      // kv API子集
	Lease      clientv3.Lease   // lease(租约)对象
	// contains filtered or unexported fields
}
Etcd 封装 etcd 的配置(EtcdConfig)、客户端驱动(Client)、KV 接口(Kv)与租约对象(Lease)
func NewEtcdServer ¶
func NewEtcdServer(config *EtcdConfig) (*Etcd, error)
NewEtcdServer 创建 etcd server
func NewEtcdWorker ¶
func NewEtcdWorker(config *EtcdConfig) (*Etcd, error)
NewEtcdWorker 创建 etcd Worker
func (Etcd) Create ¶
func (e Etcd) Create(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error)
Create 创建
func (Etcd) Delete ¶
func (e Etcd) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error)
Delete 删除
func (Etcd) ExtractWorkerIP ¶
ExtractWorkerIP 提取worker的IP
func (Etcd) Get ¶
func (e Etcd) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error)
Get 获取
func (Etcd) ListWorkers ¶
ListWorkers 获取在线worker列表
type EtcdConfig ¶
type EtcdConfig struct {
	Endpoints       []string      // 接口 []string{"http://127.0.0.1:2379"}
	DialTimeout     time.Duration // time.Second * 5
	LocalIP         string        // 本机IP
	Username        string        // 用户名
	Password        string        // 密码
	CustomDirectory string        // 自定义目录,后面不需要/
	Debug           bool          // 是否打印
}
EtcdConfig etcd配置
type GrpcCron ¶
type GrpcCron struct {
	CronConfig                  // 配置
	Pub        pb.PubSubClient  // 订阅
	Conn       *grpc.ClientConn // 链接信息
}
GrpcCron 定时任务
func (*GrpcCron) Send ¶
func (c *GrpcCron) Send(in *pb.PublishRequest) (*pb.PublishResponse, error)
Send 发送
type JobsBeegoOrm ¶
func NewJobsBeegoOrm ¶
func NewJobsBeegoOrm(db *orm.Ormer) *JobsBeegoOrm
type JobsGorm ¶
type JobsGorm struct {
// contains filtered or unexported fields
}
JobsGorm Gorm数据库驱动
func NewJobsGorm ¶
func NewJobsGorm(attrs ...*OperationAttr) (*JobsGorm, error)
NewJobsGorm 初始化。支持以下两种参数组合:(1) WithGormClient && WithRedisClient && WithLockPrefix && WithOutsideIp;(2) WithGormClient && WithEtcdClient && WithLockPrefix && WithOutsideIp。
func (*JobsGorm) CheckManyTask ¶
func (j *JobsGorm) CheckManyTask(tx *gorm.DB, vs []jobs_gorm_model.Task)
CheckManyTask 多任务检查
func (*JobsGorm) CheckSingleTask ¶
func (j *JobsGorm) CheckSingleTask(tx *gorm.DB, v jobs_gorm_model.Task)
CheckSingleTask 单任务检查
func (*JobsGorm) CreateInCustomId ¶
func (j *JobsGorm) CreateInCustomId(config *ConfigCreateInCustomId) error
CreateInCustomId 创建正在运行任务
func (*JobsGorm) CreateInCustomIdMaxNumber ¶
func (j *JobsGorm) CreateInCustomIdMaxNumber(config *ConfigCreateInCustomIdMaxNumber) error
CreateInCustomIdMaxNumber 创建正在运行任务并限制数量
func (*JobsGorm) CreateInCustomIdMaxNumberOnly ¶
func (j *JobsGorm) CreateInCustomIdMaxNumberOnly(config *ConfigCreateInCustomIdMaxNumberOnly) error
CreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量
func (*JobsGorm) CreateInCustomIdOnly ¶
func (j *JobsGorm) CreateInCustomIdOnly(config *ConfigCreateInCustomIdOnly) error
CreateInCustomIdOnly 创建正在运行唯一任务
func (*JobsGorm) GetEtcdIssueAddress ¶
func (j *JobsGorm) GetEtcdIssueAddress(server *Etcd, v *jobs_gorm_model.Task) (address string, err error)
GetEtcdIssueAddress 获取ETCD下发地址
func (*JobsGorm) LockForever ¶
LockForever 永远上锁
func (*JobsGorm) Run ¶
func (j *JobsGorm) Run(info jobs_gorm_model.Task, status int, desc string)
Run 运行
func (*JobsGorm) TaskFindAll ¶
TaskFindAll 查询多任务
func (*JobsGorm) TaskFindAllError ¶
TaskFindAllError 查询多任务 - 任务异常
func (*JobsGorm) TaskFindAllIn ¶
TaskFindAllIn 查询多任务 - 任务运行
func (*JobsGorm) TaskFindAllSuccess ¶
TaskFindAllSuccess 查询多任务 - 任务完成
func (*JobsGorm) TaskFindAllTimeout ¶
TaskFindAllTimeout 查询多任务 - 任务超时
func (*JobsGorm) TaskFindAllWait ¶
TaskFindAllWait 查询多任务 - 任务等待
func (*JobsGorm) TaskIpInit ¶
TaskIpInit 实例任务ip
func (*JobsGorm) TaskIpUpdate ¶
TaskIpUpdate 更新ip
func (*JobsGorm) TaskLogRunTake ¶
func (j *JobsGorm) TaskLogRunTake(tx *gorm.DB, taskId uint, runId string) (result jobs_gorm_model.TaskLogRun)
TaskLogRunTake 查询任务执行日志
func (*JobsGorm) TaskTakeError ¶
TaskTakeError 查询单任务 - 任务异常
func (*JobsGorm) TaskTakeId ¶
TaskTakeId 查询单任务
func (*JobsGorm) TaskTakeIn ¶
TaskTakeIn 查询单任务 - 任务运行
func (*JobsGorm) TaskTakeSuccess ¶
TaskTakeSuccess 查询单任务 - 任务完成
func (*JobsGorm) TaskTakeTimeout ¶
TaskTakeTimeout 查询单任务 - 任务超时
func (*JobsGorm) TaskTakeWait ¶
TaskTakeWait 查询单任务 - 任务等待
func (*JobsGorm) TaskTypeTake ¶
TaskTypeTake 查询单任务
func (*JobsGorm) TaskTypeTakeError ¶
TaskTypeTakeError 查询单任务 - 任务异常
func (*JobsGorm) TaskTypeTakeIn ¶
TaskTypeTakeIn 查询单任务 - 任务运行
func (*JobsGorm) TaskTypeTakeSuccess ¶
TaskTypeTakeSuccess 查询单任务 - 任务完成
func (*JobsGorm) TaskTypeTakeTimeout ¶
TaskTypeTakeTimeout 查询单任务 - 任务超时
func (*JobsGorm) TaskTypeTakeWait ¶
TaskTypeTakeWait 查询单任务 - 任务等待
type OperationAttr ¶
type OperationAttr struct {
// contains filtered or unexported fields
}
OperationAttr 操作属性
func WithEtcdClient ¶
func WithEtcdClient(etcdClient *clientv3.Client) *OperationAttr
WithEtcdClient 设置分布式缓存驱动
func WithLockPrefix ¶
func WithLockPrefix(lockPrefix string) *OperationAttr
WithLockPrefix 设置锁Key前缀。redis:fmt.Sprintf("cron:lock:%v:%v", info.Type, id);etcd:fmt.Sprintf("cron/lock/%v/%v", info.Type, id)。
func WithRedisClient ¶
func WithRedisClient(redisClient *redis.Client) *OperationAttr
WithRedisClient 设置缓存数据库驱动
type Server ¶
type Server struct {
	ServerConfig                   // 配置
	Pub          *pubsub.Publisher // 订阅
	Conn         *grpc.Server      // 链接信息
}
Server 服务
type ServerConfig ¶
type ServerConfig struct {
	PublishTimeout time.Duration // 控制发布时最大阻塞时间
	PubBuffer      int           // 缓冲区大小,控制每个订阅者的chan缓冲区大小
	Address        string        // 服务端口 0.0.0.0:8888
}
ServerConfig 服务配置
type Worker ¶
type Worker struct {
	WorkerConfig                  // 配置
	Pub          pb.PubSubClient  // 订阅
	Conn         *grpc.ClientConn // 链接信息
}
Worker 工作
func (*Worker) SubscribeCron ¶
func (w *Worker) SubscribeCron() pb.PubSub_SubscribeClient
SubscribeCron 订阅服务
type WorkerConfig ¶
WorkerConfig 工作配置
Source Files ¶
- cron.go
- cron_spec.go
- etcd.go
- etcd_curd.go
- etcd_server.go
- etcd_worker.go
- grpc.go
- grpc_client.go
- grpc_cron.go
- grpc_server.go
- grpc_worker.go
- jobs.go
- jobs_beego_orm.go
- jobs_gorm.go
- jobs_gorm_check_task.go
- jobs_gorm_etcd.go
- jobs_gorm_get.go
- jobs_gorm_ip.go
- jobs_gorm_lock.go
- jobs_gorm_model.go
- jobs_gorm_run.go
- jobs_xorm.go
- jobs_zorm.go
- operation_attr.go