gozzzworker

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 14, 2021 License: MIT Imports: 11 Imported by: 0

README

gozzzworker

Go Report Card

gozzzworker is Go-based background tasks worker.

Now:

  • Run worker to execute task
  • Specify execution time
  • Supported redis
  • Return json message after task finished
  • Task priority

Future:

  • Retry task when failed
  • Versioning?
  • RabbitMQ?

Installation

To install
go get github.com/zondatw/gozzzworker

To import
import "github.com/zondatw/gozzzworker"

Quickstart

task function need to follow rule:

func(args json.RawMessage) (interface{}, error)

and register function using

workerObj.RegisterTaskFunction("Task Name", taskFunction)

WorkerSetting:

&gozzzworker.WorkerSetting{
    Size:     3,                   // How many concurrent workers do you want
    Address:  "localhost:6379",    // Redis path
    Password: "",                  // Redis password, set empty string if no password 
    DB:       0,                   // Redis DB number
}

Example quicker start:

package main

import (
	"encoding/json"
	"errors"
	"fmt"

	"github.com/zondatw/gozzzworker"
)

func task1(args json.RawMessage) (interface{}, error) {
	type ArgType struct {
		A int    `json:"a"`
		B string `json:"b"`
	}
	var argData ArgType
	json.Unmarshal(args, &argData)
	fmt.Println("Task 1:", argData.A, argData.B)

	type retType struct {
		C int    `json:"c"`
		D string `json:"d"`
	}
	ret := &retType{
		C: 123456,
		D: "Yooooooooooooooooooooooooooooooooooo",
	}
	return ret, nil
}

func task2(args json.RawMessage) (interface{}, error) {
	fmt.Println("Task 2:", args)
	return "", errors.New("yooooooooo")
}

func main() {
	w := gozzzworker.NewWorker(&gozzzworker.WorkerSetting{
		Size:     3,
		Address:  "localhost:6379",
		Password: "",
		DB:       0,
	})
	w.RegisterTaskFunction("Task 1", task1)
	w.RegisterTaskFunction("Task 2", task2)
	w.RegisterTaskFunction("Task 3", task3)
	w.Run()
}

And you can push test data to redis, just follow rule:

# HASH type
key: gozzzworker:task:msg
field: 1 (task id need match gozzzworker:task:queue value)
value: '{"task":"Task 1","args":{"a":1,"b":"yoooo"},"priority":0}' (json format args)

# ZSet
key: gozzzworker:task:queue
value: 1 (task id need match gozzzworker:task:msg field)
score: 123 (timestamp what executed time do you want)

example redis command:

HSET gozzzworker:task:msg 1 '{"task":"Task 1","args":{"a":1,"b":"yoooo"},"priority":0}'
ZAdd gozzzworker:task:queue 123 1

Get return message after execute task

type retMsgType struct {
	Status string `json:"status"` // complete execution
	Msg    string `json:"msg"` // return message json type
}
# HASH type
key: gozzzworker:task:ret
field: 1 (task id need match gozzzworker:task:msg field)

example redis command:

HGET gozzzworker:task:ret 1

Return message example:

  • success
    {"status": "Success", "msg": {"c":123456,"d":"Yooooooooooooooooooooooooooooooooooo"}}
    
  • fail
    {"status": "Fail", "msg": {"Error": "yooooooooo"}}
    

or you can using gozzzproducer

Close

You can send following signals to close worker

  • os.Interrupt
  • SIGTERM
  • SIGINT
  • SIGQUIT

Reference

dramatiq
The Case For A Go Worker Pool

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker is broker struct

func NewBroker

func NewBroker(addTask func(string, string, json.RawMessage, int), taskRetChan <-chan *taskRetData, taskRetWg *sync.WaitGroup, address string, password string, db int) *Broker

NewBroker will initialize a new broker

func (*Broker) Run

func (b *Broker) Run()

Run get due tasks

type Pool

type Pool struct {
	TaskRetChan chan *taskRetData
	TaskRetWg   sync.WaitGroup
	// contains filtered or unexported fields
}

Pool is worker pool struct

func NewPool

func NewPool(size int) *Pool

NewPool will initialize a new pool

func (*Pool) AddTask

func (p *Pool) AddTask(taskID string, funcName string, args json.RawMessage, priority int)

AddTask to task chan

func (*Pool) End

func (p *Pool) End()

End safely close chan

func (*Pool) RegisterTaskFunction

func (p *Pool) RegisterTaskFunction(funcName string, function taskFuncType)

RegisterTaskFunction mapping

func (*Pool) Run

func (p *Pool) Run()

Run create pool to run all work

type PriorityQueue added in v0.0.3

type PriorityQueue []*TaskNode

PriorityQueue using minheap

func (PriorityQueue) Len added in v0.0.3

func (pq PriorityQueue) Len() int

func (PriorityQueue) Less added in v0.0.3

func (pq PriorityQueue) Less(next, current int) bool

func (*PriorityQueue) Pop added in v0.0.3

func (pq *PriorityQueue) Pop() interface{}

func (*PriorityQueue) Push added in v0.0.3

func (pq *PriorityQueue) Push(x interface{})

func (PriorityQueue) Swap added in v0.0.3

func (pq PriorityQueue) Swap(next, current int)

type RedisConn

type RedisConn struct {
	// contains filtered or unexported fields
}

RedisConn is redis connection struct

func NewRedisConn

func NewRedisConn(address string, password string, db int) *RedisConn

NewRedisConn create redis connection

func (*RedisConn) GetHashValue

func (rc *RedisConn) GetHashValue(key string, field string) (value string, retErr error)

GetHashValue get hash value

func (*RedisConn) GetZRangeByScoreLessThan

func (rc *RedisConn) GetZRangeByScoreLessThan(key string, number string) (valueArray []string, retErr error)

GetZRangeByScoreLessThan get ZSet values whose the score is less than number

func (*RedisConn) RemoveHash

func (rc *RedisConn) RemoveHash(key string, field string) (retErr error)

RemoveHash remove hash

func (*RedisConn) RemoveZSet

func (rc *RedisConn) RemoveZSet(key string, valueArray []string) (retErr error)

RemoveZSet remove values from ZSet

func (*RedisConn) SetHashValue

func (rc *RedisConn) SetHashValue(key string, field string, value string) (retErr error)

SetHashValue set Hash field and value

type Task

type Task struct {
	// contains filtered or unexported fields
}

Task struct

func NewTask

func NewTask(id string, function taskFuncType, args json.RawMessage) *Task

NewTask create new task

func (*Task) Run

func (t *Task) Run(wg *sync.WaitGroup) (retMsg interface{}, err error)

Run task function

type TaskJSONType

type TaskJSONType struct {
	Task     string          `json:"task"`
	Args     json.RawMessage `json:"args"`
	Priority int             `json:"priority"`
}

TaskJSONType is register task json schema

type TaskNode added in v0.0.3

type TaskNode struct {
	// contains filtered or unexported fields
}

TaskNode for PriorityQueue

type TaskSetting

type TaskSetting struct {
	// contains filtered or unexported fields
}

TaskSetting is a setting about task function mapping

func NewTaskSetting

func NewTaskSetting() *TaskSetting

NewTaskSetting create new task setting

func (*TaskSetting) Register

func (ts *TaskSetting) Register(funcName string, function taskFuncType)

Register function mapping

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker is worker struct

func NewWorker

func NewWorker(setting *WorkerSetting) *Worker

NewWorker will initialize a new worker

func (*Worker) BrokerRun

func (w *Worker) BrokerRun()

BrokerRun will run broker

func (*Worker) RegisterTaskFunction

func (w *Worker) RegisterTaskFunction(funcName string, function taskFuncType)

RegisterTaskFunction mapping

func (*Worker) Run

func (w *Worker) Run()

Run worker

type WorkerSetting

type WorkerSetting struct {
	Size     int
	Address  string
	Password string
	DB       int
}

WorkerSetting is worker setting

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL