gojobs

package
v1.0.47 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 20, 2022 License: MIT Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CodeAbnormal = 0                              // 异常
	CodeError    = http.StatusInternalServerError // 失败
	CodeSuccess  = http.StatusOK                  // 成功
	CodeEnd      = http.StatusCreated             // 结束
)
View Source
const (
	// JobSaveDir 定时任务任务保存目录
	JobSaveDir = "/cron/jobs/"
	// JobWorkerDir 服务注册目录
	JobWorkerDir = "/cron/workers/"
)
View Source
const (
	TASK_IN      = "IN"      // 任务运行
	TASK_SUCCESS = "SUCCESS" // 任务完成
	TASK_ERROR   = "ERROR"   // 任务异常
	TASK_TIMEOUT = "TIMEOUT" // 任务超时
	TASK_WAIT    = "WAIT"    // 任务等待
)

Variables

This section is empty.

Functions

func GetHour

func GetHour(n int64) *hour

GetHour 每天n点执行一次

func GetHourInterval

func GetHourInterval(n int64) *hourInterval

GetHourInterval 每隔n小时执行一次

func GetMinutes

func GetMinutes(n int64) *minutes

GetMinutes 每隔n分钟执行一次

func GetSeconds

func GetSeconds(n int64) *seconds

GetSeconds 每隔n秒执行一次

func NewJobsXorm

func NewJobsXorm(db *xorm.Engine) *jobsXorm

NewJobsXorm 初始化

func NewJobsZorm

func NewJobsZorm(db *zorm.DBDao) *jobsZorm

NewJobsZorm 初始化

Types

type Client

type Client struct {
	ClientConfig                  // 配置
	Conn         *grpc.ClientConn // 链接信息
}

Client 定时任务

func NewClient

func NewClient(config *ClientConfig) *Client

NewClient 创建客户端

type ClientConfig

type ClientConfig struct {
	Address string // 服务端口 127.0.0.1:8888
}

ClientConfig 客户端配置

type ConfigCreateInCustomId

type ConfigCreateInCustomId struct {
	Tx             *gorm.DB // 驱动
	Params         string   // 参数
	Frequency      int64    // 频率(秒单位)
	CustomId       string   // 自定义编号
	CustomSequence int64    // 自定义顺序
	Type           string   // 类型
	SpecifyIp      string   // 指定外网IP
	CurrentIp      string   // 当前ip
}

ConfigCreateInCustomId 创建正在运行任务

type ConfigCreateInCustomIdMaxNumber

type ConfigCreateInCustomIdMaxNumber struct {
	Tx             *gorm.DB // 驱动
	Params         string   // 参数
	Frequency      int64    // 频率(秒单位)
	MaxNumber      int64    // 最大次数
	CustomId       string   // 自定义编号
	CustomSequence int64    // 自定义顺序
	Type           string   // 类型
	SpecifyIp      string   // 指定外网IP
	CurrentIp      string   // 当前ip
}

ConfigCreateInCustomIdMaxNumber 创建正在运行任务并限制数量

type ConfigCreateInCustomIdMaxNumberOnly

type ConfigCreateInCustomIdMaxNumberOnly struct {
	Tx             *gorm.DB // 驱动
	Params         string   // 参数
	Frequency      int64    // 频率(秒单位)
	MaxNumber      int64    // 最大次数
	CustomId       string   // 自定义编号
	CustomSequence int64    // 自定义顺序
	Type           string   // 类型
	SpecifyIp      string   // 指定外网IP
	CurrentIp      string   // 当前ip
}

ConfigCreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量

type ConfigCreateInCustomIdOnly

type ConfigCreateInCustomIdOnly struct {
	Tx             *gorm.DB // 驱动
	Params         string   // 参数
	Frequency      int64    // 频率(秒单位)
	CustomId       string   // 自定义编号
	CustomSequence int64    // 自定义顺序
	Type           string   // 类型
	SpecifyIp      string   // 指定外网IP
	CurrentIp      string   // 当前ip
}

ConfigCreateInCustomIdOnly 创建正在运行唯一任务

type Cron

type Cron struct {
	// contains filtered or unexported fields
}

Cron 定时任务管理器

func NewCron

func NewCron() *Cron

NewCron 创建一个定时任务管理器

func (*Cron) AddJobByFunc

func (c *Cron) AddJobByFunc(id string, spec string, f func()) error

AddJobByFunc 添加函数作为定时任务 id:唯一任务id spec:配置定时执行时间表达式 f:需要执行的任务方法

func (*Cron) AddJobByInterface

func (c *Cron) AddJobByInterface(id string, spec string, cmd cron.Job) error

AddJobByInterface 实现接口的方式添加定时任务 id:唯一任务id spec:配置定时执行时间表达式 cmd:需要执行的任务方法

func (*Cron) DelByID

func (c *Cron) DelByID(id string)

DelByID 删除任务 id:唯一任务id

func (*Cron) Ids

func (c *Cron) Ids() []string

Ids ...

func (*Cron) IsExistsJob

func (c *Cron) IsExistsJob(id string) bool

IsExistsJob 判断是否存在任务 id:唯一任务id

func (*Cron) Start

func (c *Cron) Start()

Start 启动任务

func (*Cron) Stop

func (c *Cron) Stop()

Stop 关闭任务

type CronConfig

type CronConfig struct {
	Address string // 服务端口 127.0.0.1:8888
}

CronConfig 定时任务配置

type Etcd

type Etcd struct {
	EtcdConfig                  // 配置
	Client     *clientv3.Client // 驱动
	Kv         clientv3.KV      // kv API子集
	Lease      clientv3.Lease   // lease(租约)对象
	// contains filtered or unexported fields
}

Etcd etcd

func NewEtcdServer

func NewEtcdServer(config *EtcdConfig) (*Etcd, error)

NewEtcdServer 创建 etcd server

func NewEtcdWorker

func NewEtcdWorker(config *EtcdConfig) (*Etcd, error)

NewEtcdWorker 创建 etcd Worker

func (Etcd) Close

func (e Etcd) Close()

Close 关闭

func (Etcd) Create

func (e Etcd) Create(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error)

Create 创建

func (Etcd) Delete

func (e Etcd) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error)

Delete 删除

func (Etcd) ExtractWorkerIP

func (e Etcd) ExtractWorkerIP(regKey string) string

ExtractWorkerIP 提取worker的IP

func (Etcd) Get

func (e Etcd) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error)

Get 获取

func (Etcd) GetWatchKey

func (e Etcd) GetWatchKey() string

GetWatchKey 监听的key

func (Etcd) IssueWatchKey

func (e Etcd) IssueWatchKey(ip string) string

IssueWatchKey 下发的key

func (Etcd) ListWorkers

func (e Etcd) ListWorkers() (workerArr []string, err error)

ListWorkers 获取在线worker列表

func (Etcd) RegisterWorker

func (e Etcd) RegisterWorker()

RegisterWorker 注册worker

func (Etcd) Update

func (e Etcd) Update(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error)

Update 更新

func (Etcd) Watch

func (e Etcd) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan

Watch 监听

type EtcdConfig

type EtcdConfig struct {
	Endpoints       []string      // 接口 []string{"http://127.0.0.1:2379"}
	DialTimeout     time.Duration // time.Second * 5
	LocalIP         string        // 本机IP
	Username        string        // 用户名
	Password        string        // 密码
	CustomDirectory string        // 自定义目录,后面不需要/
	Debug           bool          // 是否打印
}

EtcdConfig etcd配置

type GrpcCron

type GrpcCron struct {
	CronConfig                  // 配置
	Pub        pb.PubSubClient  // 订阅
	Conn       *grpc.ClientConn // 链接信息
}

GrpcCron 定时任务

func NewGrpcCron

func NewGrpcCron(config *CronConfig) *GrpcCron

NewGrpcCron 创建定时任务

func (*GrpcCron) Send

func (c *GrpcCron) Send(in *pb.PublishRequest) (*pb.PublishResponse, error)

Send 发送

type JobsBeegoOrm

type JobsBeegoOrm struct {
	Db *orm.Ormer
}

func NewJobsBeegoOrm

func NewJobsBeegoOrm(db *orm.Ormer) *JobsBeegoOrm

type JobsGorm

type JobsGorm struct {
	// contains filtered or unexported fields
}

JobsGorm Gorm数据库驱动

func NewJobsGorm

func NewJobsGorm(attrs ...*OperationAttr) (*JobsGorm, error)

NewJobsGorm 初始化 WithGormClient && WithRedisClient && WithLockPrefix && WithOutsideIp WithGormClient && WithEtcdClient && WithLockPrefix && WithOutsideIp

func (*JobsGorm) CheckManyTask

func (j *JobsGorm) CheckManyTask(tx *gorm.DB, vs []jobs_gorm_model.Task)

CheckManyTask 多任务检查

func (*JobsGorm) CheckSingleTask

func (j *JobsGorm) CheckSingleTask(tx *gorm.DB, v jobs_gorm_model.Task)

CheckSingleTask 单任务检查

func (*JobsGorm) CreateInCustomId

func (j *JobsGorm) CreateInCustomId(config *ConfigCreateInCustomId) error

CreateInCustomId 创建正在运行任务

func (*JobsGorm) CreateInCustomIdMaxNumber

func (j *JobsGorm) CreateInCustomIdMaxNumber(config *ConfigCreateInCustomIdMaxNumber) error

CreateInCustomIdMaxNumber 创建正在运行任务并限制数量

func (*JobsGorm) CreateInCustomIdMaxNumberOnly

func (j *JobsGorm) CreateInCustomIdMaxNumberOnly(config *ConfigCreateInCustomIdMaxNumberOnly) error

CreateInCustomIdMaxNumberOnly 创建正在运行唯一任务并限制数量

func (*JobsGorm) CreateInCustomIdOnly

func (j *JobsGorm) CreateInCustomIdOnly(config *ConfigCreateInCustomIdOnly) error

CreateInCustomIdOnly 创建正在运行唯一任务

func (*JobsGorm) EditTask

func (j *JobsGorm) EditTask(tx *gorm.DB, id uint) *gorm.DB

EditTask 任务修改

func (*JobsGorm) GetDb

func (j *JobsGorm) GetDb() *gorm.DB

GetDb 数据库驱动

func (*JobsGorm) GetEtcd

func (j *JobsGorm) GetEtcd() *clientv3.Client

GetEtcd 分布式缓存驱动

func (*JobsGorm) GetEtcdIssueAddress

func (j *JobsGorm) GetEtcdIssueAddress(server *Etcd, v *jobs_gorm_model.Task) (address string, err error)

GetEtcdIssueAddress 获取ETCD下发地址

func (*JobsGorm) GetRedis

func (j *JobsGorm) GetRedis() *redis.Client

GetRedis 缓存数据库驱动

func (*JobsGorm) Lock

func (j *JobsGorm) Lock(info jobs_gorm_model.Task, id any) (string, error)

Lock 上锁

func (*JobsGorm) LockForever

func (j *JobsGorm) LockForever(info jobs_gorm_model.Task, id any) (string, error)

LockForever 永远上锁

func (*JobsGorm) RefreshIp

func (j *JobsGorm) RefreshIp(tx *gorm.DB)

RefreshIp 刷新Ip

func (*JobsGorm) Run

func (j *JobsGorm) Run(info jobs_gorm_model.Task, status int, desc string)

Run 运行

func (*JobsGorm) RunAddLog

func (j *JobsGorm) RunAddLog(id uint, runId string) error

RunAddLog 任务执行日志

func (*JobsGorm) TaskFindAll

func (j *JobsGorm) TaskFindAll(tx *gorm.DB, frequency int64) (results []jobs_gorm_model.Task)

TaskFindAll 查询多任务

func (*JobsGorm) TaskFindAllError

func (j *JobsGorm) TaskFindAllError(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task

TaskFindAllError 查询多任务 - 任务异常

func (*JobsGorm) TaskFindAllIn

func (j *JobsGorm) TaskFindAllIn(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task

TaskFindAllIn 查询多任务 - 任务运行

func (*JobsGorm) TaskFindAllSuccess

func (j *JobsGorm) TaskFindAllSuccess(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task

TaskFindAllSuccess 查询多任务 - 任务完成

func (*JobsGorm) TaskFindAllTimeout

func (j *JobsGorm) TaskFindAllTimeout(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task

TaskFindAllTimeout 查询多任务 - 任务超时

func (*JobsGorm) TaskFindAllWait

func (j *JobsGorm) TaskFindAllWait(tx *gorm.DB, frequency int64) []jobs_gorm_model.Task

TaskFindAllWait 查询多任务 - 任务等待

func (*JobsGorm) TaskIpInit

func (j *JobsGorm) TaskIpInit(tx *gorm.DB, ips map[string]string) bool

TaskIpInit 实例任务ip

func (*JobsGorm) TaskIpUpdate

func (j *JobsGorm) TaskIpUpdate(tx *gorm.DB, taskType, ips string) *gorm.DB

TaskIpUpdate 更新ip

func (*JobsGorm) TaskLogRunTake

func (j *JobsGorm) TaskLogRunTake(tx *gorm.DB, taskId uint, runId string) (result jobs_gorm_model.TaskLogRun)

TaskLogRunTake 查询任务执行日志

func (*JobsGorm) TaskTake

func (j *JobsGorm) TaskTake(tx *gorm.DB, customId string) (result jobs_gorm_model.Task)

TaskTake 查询单任务

func (*JobsGorm) TaskTakeError

func (j *JobsGorm) TaskTakeError(tx *gorm.DB, customId string) jobs_gorm_model.Task

TaskTakeError 查询单任务 - 任务异常

func (*JobsGorm) TaskTakeId

func (j *JobsGorm) TaskTakeId(tx *gorm.DB, id uint) (result jobs_gorm_model.Task)

TaskTakeId 查询单任务

func (*JobsGorm) TaskTakeIn

func (j *JobsGorm) TaskTakeIn(tx *gorm.DB, customId string) jobs_gorm_model.Task

TaskTakeIn 查询单任务 - 任务运行

func (*JobsGorm) TaskTakeSuccess

func (j *JobsGorm) TaskTakeSuccess(tx *gorm.DB, customId string) jobs_gorm_model.Task

TaskTakeSuccess 查询单任务 - 任务完成

func (*JobsGorm) TaskTakeTimeout

func (j *JobsGorm) TaskTakeTimeout(tx *gorm.DB, customId string) jobs_gorm_model.Task

TaskTakeTimeout 查询单任务 - 任务超时

func (*JobsGorm) TaskTakeWait

func (j *JobsGorm) TaskTakeWait(tx *gorm.DB, customId string) jobs_gorm_model.Task

TaskTakeWait 查询单任务 - 任务等待

func (*JobsGorm) TaskTypeTake

func (j *JobsGorm) TaskTypeTake(tx *gorm.DB, customId, Type string) (result jobs_gorm_model.Task)

TaskTypeTake 查询单任务

func (*JobsGorm) TaskTypeTakeError

func (j *JobsGorm) TaskTypeTakeError(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task

TaskTypeTakeError 查询单任务 - 任务异常

func (*JobsGorm) TaskTypeTakeIn

func (j *JobsGorm) TaskTypeTakeIn(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task

TaskTypeTakeIn 查询单任务 - 任务运行

func (*JobsGorm) TaskTypeTakeSuccess

func (j *JobsGorm) TaskTypeTakeSuccess(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task

TaskTypeTakeSuccess 查询单任务 - 任务完成

func (*JobsGorm) TaskTypeTakeTimeout

func (j *JobsGorm) TaskTypeTakeTimeout(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task

TaskTypeTakeTimeout 查询单任务 - 任务超时

func (*JobsGorm) TaskTypeTakeWait

func (j *JobsGorm) TaskTypeTakeWait(tx *gorm.DB, customId, Type string) jobs_gorm_model.Task

TaskTypeTakeWait 查询单任务 - 任务等待

func (*JobsGorm) Unlock

func (j *JobsGorm) Unlock(info jobs_gorm_model.Task, id any) error

Unlock Lock 解锁

func (*JobsGorm) UpdateFrequency

func (j *JobsGorm) UpdateFrequency(tx *gorm.DB, id uint, frequency int64) *gorm.DB

UpdateFrequency 更新任务频率

type OperationAttr

type OperationAttr struct {
	// contains filtered or unexported fields
}

OperationAttr 操作属性

func WithEtcdClient

func WithEtcdClient(etcdClient *clientv3.Client) *OperationAttr

WithEtcdClient 设置分布式缓存驱动

func WithGormClient

func WithGormClient(client *gorm.DB) *OperationAttr

WithGormClient 设置数据库驱动

func WithIpService

func WithIpService(ipService *goip.Client) *OperationAttr

WithIpService 设置ip服务

func WithLockPrefix

func WithLockPrefix(lockPrefix string) *OperationAttr

WithLockPrefix 设置锁Key前缀 redis:fmt.Sprintf("cron:lock:%v:%v", info.Type, id) etcd:fmt.Sprintf("cron/lock/%v/%v", info.Type, id)

func WithOutsideIp

func WithOutsideIp(outsideIp string) *OperationAttr

WithOutsideIp 设置外网ip

func WithRedisClient

func WithRedisClient(redisClient *redis.Client) *OperationAttr

WithRedisClient 设置缓存数据库驱动

type Server

type Server struct {
	ServerConfig                   // 配置
	Pub          *pubsub.Publisher // 订阅
	Conn         *grpc.Server      // 链接信息
}

Server 服务

func NewServer

func NewServer(config *ServerConfig) *Server

NewServer 创建服务和注册

func (*Server) StartCron

func (s *Server) StartCron()

StartCron 启动定时任务

func (*Server) StartUp

func (s *Server) StartUp()

StartUp 启动服务

type ServerConfig

type ServerConfig struct {
	PublishTimeout time.Duration // 控制发布时最大阻塞时间
	PubBuffer      int           // 缓冲区大小,控制每个订阅者的chan缓冲区大小
	Address        string        // 服务端口 0.0.0.0:8888
}

ServerConfig 服务配置

type Worker

type Worker struct {
	WorkerConfig                  // 配置
	Pub          pb.PubSubClient  // 订阅
	Conn         *grpc.ClientConn // 链接信息
}

Worker 工作

func NewWorker

func NewWorker(config *WorkerConfig) *Worker

NewWorker 创建工作

func (*Worker) StartCron

func (w *Worker) StartCron() pb.PubSub_SubscribeClient

StartCron 启动任务

func (*Worker) SubscribeCron

func (w *Worker) SubscribeCron() pb.PubSub_SubscribeClient

SubscribeCron 订阅服务

type WorkerConfig

type WorkerConfig struct {
	Address  string // 服务端口 127.0.0.1:8888
	ClientIp string // 自己的ip地址
}

WorkerConfig 工作配置

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL