rtask

package module
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2021 License: MIT Imports: 12 Imported by: 0

README

RTask

RTask 是一款基于 Redis 和 Cron 的 Golang 简易任务队列。

快速上手

您需要使用 Go Module 导入 RTask 工具包。

go get -u github.com/avtion/rtask

使用教程

package main

import (
	"context"
	"log"
	"time"

	"github.com/avtion/rtask"
	"github.com/go-redis/redis/v8"
)

// main wires a TaskController to a Redis client, queues three demo jobs and
// starts processing them.
func main() {
	// Handler for regular jobs: log the job and report success.
	jobHandler := func(ctx context.Context, currentJob *rtask.Job) error {
		log.Printf("process job, id: %s, payload: %s", currentJob.ID, string(currentJob.Payload))
		return nil
	}
	// Handler for timed-out jobs: log the job together with its schedule and TTL.
	timeoutJobHandler := func(ctx context.Context, currentJob *rtask.Job) error {
		log.Printf("process timeout job, id: %s , payload: %s, beginAt: %s, ttl: %s",
			currentJob.ID, string(currentJob.Payload), currentJob.BeginAt, currentJob.TTL,
		)
		return nil
	}

	// rtask needs Redis, so connect to Redis first (empty Options = client defaults).
	rc := redis.NewClient(&redis.Options{})

	// Then build a new task controller; time.Minute is its interval argument.
	// NOTE(review): WithBlock is left disabled here — confirm that Run blocks
	// without it, otherwise the process may exit before any job fires.
	tc, err := rtask.NewTaskController(rc, time.Minute,
		// rtask.WithBlock(),
		rtask.WithRedisKey("testRTask"),
		rtask.WithJobHandler(jobHandler),
		rtask.WithTimeoutJobHandler(timeoutJobHandler),
	)
	if err != nil {
		log.Fatalln(err)
	}

	// addJob schedules payload to begin after delay. A failed enqueue is logged
	// instead of being silently discarded, and the remaining jobs are still added.
	addJob := func(delay time.Duration, payload string, opts ...rtask.JobOption) {
		if _, err := tc.AddJob(time.Now().Add(delay), []byte(payload), opts...); err != nil {
			log.Printf("add job %q: %v", payload, err)
		}
	}

	// Add some jobs that need to be done.
	addJob(time.Minute, "after one min")
	addJob(time.Minute, "after one min, but timeout", rtask.WithTTL(time.Millisecond))
	addJob(2*time.Minute, "after two min")

	// Run the task handler.
	tc.Run()
}

许可协议

MIT License.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BuildTaskControllerOption

// BuildTaskControllerOption is a functional option for building a
// TaskController. It receives the controller and may return an error.
type BuildTaskControllerOption func(tc *TaskController) error

func WithBlock

func WithBlock() BuildTaskControllerOption

WithBlock sets the cron goroutine to block.

func WithJobHandler

func WithJobHandler(fn func(ctx context.Context, currentJob *Job) error) BuildTaskControllerOption

WithJobHandler sets the job handler.

func WithRedisKey

func WithRedisKey(key string) BuildTaskControllerOption

WithRedisKey sets the task controller's Redis key.

func WithTimeoutJobHandler

func WithTimeoutJobHandler(fn func(ctx context.Context, currentJob *Job) error) BuildTaskControllerOption

WithTimeoutJobHandler sets the timeout job handler.

type Job

// Job describes a scheduled task: its identifier, payload, start time and
// timeout. Fields are tagged for msgpack encoding.
type Job struct {
	// ID is the job's unique identifier, determined by Payload and BeginAt.
	ID string `msgpack:"id"`
	// Payload is the job payload.
	Payload []byte `msgpack:"payload"`
	// BeginAt is the time at which the job starts running.
	BeginAt time.Time `msgpack:"beginAt"`
	// TTL is the job timeout: if the polling run that picks up the job happens
	// later than BeginAt plus TTL, the job expires.
	// A TTL of 0 means the job never times out and is kept until it succeeds.
	TTL time.Duration `msgpack:"ttl"`
}

type JobOption

// JobOption is a functional option applied to a Job. It receives the job and
// may return an error.
type JobOption func(j *Job) error

func WithTTL

func WithTTL(ttl time.Duration) JobOption

WithTTL sets the job's TTL.

type TaskController

// TaskController holds the handlers used to process jobs and timed-out jobs.
type TaskController struct {

	// JobHandler is the job processing function; the task controller calls it
	// to process every Job.
	JobHandler func(ctx context.Context, currentJob *Job) error
	// TimeoutJobHandler is the processing function for timed-out jobs.
	TimeoutJobHandler func(ctx context.Context, currentJob *Job) error
	// contains filtered or unexported fields
}

func NewTaskController

func NewTaskController(rc *redis.Client, interval time.Duration, opts ...BuildTaskControllerOption) (*TaskController, error)

NewTaskController builds a new task controller.

func (*TaskController) AddJob

func (tc *TaskController) AddJob(beginAt time.Time, payload []byte, opts ...JobOption) (string, error)

AddJob adds a new job to the task controller.

func (*TaskController) ListJobs

func (tc *TaskController) ListJobs() ([]*Job, error)

ListJobs lists all jobs.

func (*TaskController) RemoveJobs

func (tc *TaskController) RemoveJobs(jobIDs ...string) (int64, error)

RemoveJobs removes the jobs with the given IDs.

func (*TaskController) Run

func (tc *TaskController) Run()

Run begins processing tasks.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL