xmachinery
介绍
xmachinery 是github.com/RichardKnop/machinery 的扩展包,添加定时任务管理API,扩展支持广播任务
核心代码
定时任务管理
func (server *XServer) registerScheduledTask(task ScheduledTask) error {
//检查cron表达式
schedule, err := secondsParser.Parse(task.Spec)
if err != nil {
return err
}
//移除旧任务
server.removeScheduledTask(task.Id)
//包装任务函数
f := func() {
//抢占任务锁
err := server.machineryLock.LockWithRetries(utils.GetLockName(task.TaskCode, task.Spec), schedule.Next(time.Now()).UnixNano()-1)
if err != nil {
return
}
//创建任务签名
signature := task.Signature()
//发送任务
_, err = server.SendTask(signature)
if err != nil {
log.ERROR.Printf("scheduled task failed. task id is: %s. task name is: %s. error is %s", task.Id, task.TaskCode, err.Error())
}
}
//添加定时任务
entryId, err := server.scheduler.AddFunc(task.Spec, f)
newTask := NewScheduledTask(task.Id, task.TaskCode, task.Spec, task.TaskQueue, task.Args...)
newTask.entryId = entryId
//存储定时任务信息
server.registeredScheduledTasks[task.Id] = newTask
return err
}
广播任务broker
func (b *BrokerBroadcast) nextBroadCastTask(queue string) (result []byte, err error) {
//默认拉取消息间隔为1000ms
pollPeriodMilliseconds := 1000
if b.GetConfig().Redis != nil {
configuredPollPeriod := b.GetConfig().Redis.NormalTasksPollPeriod
if configuredPollPeriod > 0 {
pollPeriodMilliseconds = configuredPollPeriod
}
}
pollPeriod := time.Duration(pollPeriodMilliseconds) * time.Millisecond
if b.lastBroadcastMsgId == "" {
//若客户端记录的最新消息id为空,则从流中最新的消息id
msgs, err := b.rclient.XRevRangeN(context.Background(), queue, "+", "-", 1).Result()
if err != nil {
return []byte{}, err
}
if len(msgs) == 0 {
b.lastBroadcastMsgId = "0"
return []byte{}, redis.Nil
}
b.lastBroadcastMsgId = msgs[0].ID
}
//消费广播消息,一次一条
streams, err := b.rclient.XRead(context.Background(), &redis.XReadArgs{
Streams: []string{queue, b.lastBroadcastMsgId},
Count: 1,
Block: pollPeriod,
}).Result()
if err != nil {
return []byte{}, err
}
if len(streams) == 0 || len(streams[0].Messages) == 0 {
return []byte{}, redis.Nil
}
msg := streams[0].Messages[0]
b.lastBroadcastMsgId = msg.ID
signatureV := msg.Values[defaultStreamBroadcastMsgKey]
signatureStr, ok := signatureV.(string)
if ok {
return []byte(signatureStr), nil
}
return []byte{}, fmt.Errorf("not support msg type")
}
func (b *BrokerBroadcast) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) (bool, error) {
...
// 监听广播任务的协程
// 任务到达时直接投递给消费者
b.broadcastWG.Add(1)
go func() {
defer b.broadcastWG.Done()
for {
select {
// 监听消费者是否结束
case <-b.GetStopChan():
return
default:
task, _ := b.nextBroadCastTask(b.redisBroadcastTasksKey)
if len(task) > 0 {
deliveries <- task
}
}
}
}()
...
}
示例
定时任务管理api
package main
import (
"fmt"
"gitee.com/sqxwww/xmachinery"
"github.com/RichardKnop/machinery/v2"
redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
redisbroker "github.com/RichardKnop/machinery/v2/brokers/redis"
"github.com/RichardKnop/machinery/v2/config"
eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
"github.com/RichardKnop/machinery/v2/tasks"
)
var server *xmachinery.XServer
func init() {
server, _ = startServer()
}
func main() {
server.RegisterScheduledTask(&xmachinery.ScheduledTask{
Id: "countDoneScheduler",
TaskCode: "countDown",
Spec: "0/2 * * * * ?",
Args: []tasks.Arg{{Type: "int", Value: 5}},
})
worker()
}
func countDown(count int) error {
if count <= 0 {
fmt.Println("removing countDoneScheduler")
//移除定时任务
server.RemoveScheduledTask("countDoneScheduler")
return nil
}
fmt.Println("current count is ", count)
count--
//替换定时任务
server.RegisterScheduledTask(&xmachinery.ScheduledTask{
Id: "countDoneScheduler",
TaskCode: "countDown",
Spec: "0/2 * * * * ?",
Args: []tasks.Arg{{Type: "int", Value: count}},
})
return nil
}
func startServer() (*xmachinery.XServer, error) {
cnf := &config.Config{
DefaultQueue: "machinery_tasks",
ResultsExpireIn: 3600,
Redis: &config.RedisConfig{
MaxIdle: 3,
IdleTimeout: 240,
ReadTimeout: 15,
WriteTimeout: 15,
ConnectTimeout: 15,
NormalTasksPollPeriod: 1000,
DelayedTasksPollPeriod: 500,
},
}
broker := redisbroker.New(cnf, "localhost:6379", "", "", 0)
backend := redisbackend.New(cnf, "localhost:6379", "", "", 0)
lock := eagerlock.New()
tmp := machinery.NewServer(cnf, broker, backend, lock)
server := xmachinery.NewServer(tmp)
tasks := map[string]interface{}{
"countDown": countDown,
}
return server, server.RegisterTasks(tasks)
}
func worker() error {
consumerTag := "machinery_worker"
worker := server.NewWorker(consumerTag, 0)
return worker.Launch()
}
广播任务broker
package main
import (
"fmt"
broadcastbroker "gitee.com/sqxwww/xmachinery/brokers/redis"
"github.com/RichardKnop/machinery/v2"
redisbackend "github.com/RichardKnop/machinery/v2/backends/redis"
"github.com/RichardKnop/machinery/v2/config"
eagerlock "github.com/RichardKnop/machinery/v2/locks/eager"
"github.com/RichardKnop/machinery/v2/tasks"
"sync"
"time"
)
var server *machinery.Server
func init() {
server, _ = startServer()
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
defer wg.Done()
worker()
}()
}
time.Sleep(time.Second)
singature, _ := tasks.NewSignature("hello", nil)
//设置广播任务头
singature.Headers = map[string]interface{}{"broadcastTask": ""}
server.SendTask(singature)
wg.Wait()
}
func startServer() (*machinery.Server, error) {
cnf := &config.Config{
DefaultQueue: "machinery_tasks",
ResultsExpireIn: 3600,
Redis: &config.RedisConfig{
MaxIdle: 3,
IdleTimeout: 240,
ReadTimeout: 15,
WriteTimeout: 15,
ConnectTimeout: 15,
NormalTasksPollPeriod: 1000,
DelayedTasksPollPeriod: 500,
},
}
//使用支持广播的broker
broker := broadcastbroker.New(cnf, []string{"localhost:6379"}, 0)
backend := redisbackend.New(cnf, "localhost:6379", "", "", 0)
lock := eagerlock.New()
server := machinery.NewServer(cnf, broker, backend, lock)
tasks := map[string]interface{}{
"hello": func() error {
fmt.Println("hello broadcast broker")
return nil
},
}
return server, server.RegisterTasks(tasks)
}
func worker() error {
consumerTag := "machinery_worker"
worker := server.NewWorker(consumerTag, 0)
errorsChan := make(chan error)
worker.LaunchAsync(errorsChan)
return <-errorsChan
}