README
¶
codingXiang/go-worker
Introduction
go-worker is a distributed task system
for Go
that implements a master-worker architecture. The project provides the following features:
- highly available architecture
- schedule task with cron expression
- dynamic task assign
- simple RESTful API
- command line interface
Architecture
Single Master
Multiple Master
Installation
go get github.com/codingXiang/go-worker
Usage
master
package main
import (
	"log"
	"time"

	"github.com/codingXiang/go-worker"
	"github.com/coreos/etcd/clientv3"
	"github.com/gocraft/work"
	"github.com/gomodule/redigo/redis"
)
// main starts a master node in cluster mode, registers a "send_email" task
// with a cron spec, executes that task by id, and then blocks forever.
// Any failure while adding or executing the task terminates the program.
func main() {
	// Make a Redis connection pool.
	redisPool := &redis.Pool{
		MaxActive: 5,
		MaxIdle:   5,
		Wait:      true,
		Dial: func() (redis.Conn, error) {
			return redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword("a12345"))
		},
	}

	// Alternative: create a single master node.
	//master := go_worker.NewMaster(redisPool, "demo", &go_worker.MasterOption{
	//	IsCluster: false,
	//})

	// Create a master node that participates in a multi-master cluster.
	master := go_worker.NewMaster(redisPool, "demo", &go_worker.MasterOption{
		IsCluster: true,
		ETCDConfig: clientv3.Config{
			Endpoints:   []string{"localhost:32773", "localhost:32769", "localhost:32772"},
			DialTimeout: 5 * time.Second,
		},
	})

	// Initialize master settings.
	master.Init()

	// Add a task with a cron spec. Stop on failure rather than going on to
	// execute an id that was never registered.
	id, err := master.AddTask("*/3 * * * * *", "send_email", work.Q{"address": "test@example.com", "subject": "hello world", "customer_id": 4})
	if err != nil {
		log.Fatalf("add task: %v", err)
	}

	// Execute the task by id.
	// NOTE(review): the documented Master interface declares AddTask as
	// returning *TaskInfo while ExecTask takes a string id — confirm how the
	// id should be obtained from the returned value for the version in use.
	if err := master.ExecTask(id); err != nil {
		log.Fatalf("exec task: %v", err)
	}

	// Block forever so the process stays alive.
	select {}
}
worker
package main
import (
go_worker "github.com/codingXiang/go-worker"
"github.com/gocraft/work"
"github.com/gomodule/redigo/redis"
"log"
)
// CustomJob is an example of a user-defined job type. The worker example
// registers an instance of it under the "send_email" job name.
type CustomJob struct {
customerID int64
Pool *work.WorkerPool
Cloud string `json:"cloud"`
Area string `json:"area"`
Namespace string `json:"namespace"`
Spec string `json:"spec"`
}
// Do handles one job by logging its name and always reports success.
// A custom job type must implement a method with the
// `Do(job *work.Job) error` signature.
func (c *CustomJob) Do(job *work.Job) error {
	name := job.Name
	log.Println(name)
	return nil
}
// main builds a Redis-backed worker, registers a CustomJob under the
// "send_email" job name, starts the worker, and then blocks forever.
func main() {
	// Dial function the pool uses to open each Redis connection.
	dial := func() (redis.Conn, error) {
		return redis.Dial("tcp", ":6379", redis.DialPassword("a12345"))
	}

	// Redis connection pool shared by the worker.
	pool := &redis.Pool{
		Dial:      dial,
		Wait:      true,
		MaxIdle:   5,
		MaxActive: 5,
	}

	// Create the worker instance.
	// NOTE(review): 10 is presumably the concurrency and "demo" the
	// namespace (matching the master example) — confirm against NewWorker.
	w := go_worker.NewWorker(10, "demo", pool)

	// Register the job with the worker: name, instance, and options (none).
	handler := &CustomJob{}
	w.AddJob("send_email", handler, nil)

	// Start the worker.
	w.Start()

	// Block forever so the process stays alive.
	select {}
}
Dependencies
Documentation
¶
Index ¶
- Constants
- func BuildKeyPath(basePath string, paths ...string) string
- type Enqueue
- type EnqueueEntity
- func (g *EnqueueEntity) AddArgs(key string, value interface{}) Enqueue
- func (g *EnqueueEntity) Do() (*work.Job, error)
- func (g *EnqueueEntity) GetArgs() map[string]interface{}
- func (g *EnqueueEntity) GetEntryID() cronV3.EntryID
- func (g *EnqueueEntity) GetID() string
- func (g *EnqueueEntity) GetInstance() *EnqueueEntity
- func (g *EnqueueEntity) GetJobName() string
- func (g *EnqueueEntity) GetSpec() string
- func (g *EnqueueEntity) RemoveArgs(key string) Enqueue
- func (g *EnqueueEntity) Run()
- func (g *EnqueueEntity) SetEntryID(id cronV3.EntryID) Enqueue
- func (g *EnqueueEntity) SetJobName(JobName string) Enqueue
- func (g *EnqueueEntity) SetSpec(Spec string) Enqueue
- type Job
- type Master
- type MasterClusterEntity
- func (g *MasterClusterEntity) AddTask(Spec string, JobName string, Args map[string]interface{}) (*TaskInfo, error)
- func (g *MasterClusterEntity) ExecTask(id string) error
- func (g *MasterClusterEntity) Init() Master
- func (g *MasterClusterEntity) RemoveTask(id string) error
- func (g *MasterClusterEntity) WatchMaster() error
- func (g *MasterClusterEntity) WatchTask() error
- type MasterEntity
- func (g *MasterEntity) AddTask(Spec string, JobName string, Args map[string]interface{}) (*TaskInfo, error)
- func (g *MasterEntity) ExecTask(id string) error
- func (g *MasterEntity) GetBusyWorkers() ([]*work.WorkerObservation, error)
- func (g *MasterEntity) GetEnqueue(id string) (Enqueue, error)
- func (g *MasterEntity) GetEnqueues() map[string]Enqueue
- func (g *MasterEntity) GetID() string
- func (g *MasterEntity) GetQueues() ([]*work.Queue, error)
- func (g *MasterEntity) GetWorkerHeartbeats() ([]*work.WorkerPoolHeartbeat, error)
- func (g *MasterEntity) Init() Master
- func (g *MasterEntity) RemoveTask(id string) error
- type MasterOption
- type TaskInfo
- type Worker
- type WorkerEntity
Constants ¶
View Source
const ( MASTER = "master" TASK = "task" LOCK = "lock" )
View Source
const (
Now = "now"
)
Variables ¶
This section is empty.
Functions ¶
func BuildKeyPath ¶
BuildKeyPath 組合 etcd key 的路徑
Types ¶
type Enqueue ¶
type Enqueue interface { GetInstance() *EnqueueEntity GetID() string GetEntryID() cronV3.EntryID SetEntryID(id cronV3.EntryID) Enqueue GetSpec() string SetSpec(string) Enqueue GetJobName() string SetJobName(string) Enqueue GetArgs() map[string]interface{} AddArgs(key string, value interface{}) Enqueue RemoveArgs(key string) Enqueue Do() (*work.Job, error) Run() }
Enqueue 封裝 EnqueueEntity 方法的 interface
type EnqueueEntity ¶
type EnqueueEntity struct { ID string `json:"id"` Engine *work.Enqueuer `json:"-"` Spec string `json:"Spec"` EntryID cronV3.EntryID `json:"EntryID"` JobName string `json:"JobName"` Args map[string]interface{} `json:"Args"` }
EnqueueEntity 實例
func (*EnqueueEntity) AddArgs ¶
func (g *EnqueueEntity) AddArgs(key string, value interface{}) Enqueue
func (*EnqueueEntity) GetArgs ¶
func (g *EnqueueEntity) GetArgs() map[string]interface{}
func (*EnqueueEntity) GetEntryID ¶
func (g *EnqueueEntity) GetEntryID() cronV3.EntryID
func (*EnqueueEntity) GetID ¶
func (g *EnqueueEntity) GetID() string
func (*EnqueueEntity) GetInstance ¶
func (g *EnqueueEntity) GetInstance() *EnqueueEntity
func (*EnqueueEntity) GetJobName ¶
func (g *EnqueueEntity) GetJobName() string
func (*EnqueueEntity) GetSpec ¶
func (g *EnqueueEntity) GetSpec() string
func (*EnqueueEntity) RemoveArgs ¶
func (g *EnqueueEntity) RemoveArgs(key string) Enqueue
func (*EnqueueEntity) SetEntryID ¶
func (g *EnqueueEntity) SetEntryID(id cronV3.EntryID) Enqueue
func (*EnqueueEntity) SetJobName ¶
func (g *EnqueueEntity) SetJobName(JobName string) Enqueue
func (*EnqueueEntity) SetSpec ¶
func (g *EnqueueEntity) SetSpec(Spec string) Enqueue
type Master ¶
type Master interface { Init() Master GetID() string AddTask(Spec string, JobName string, Args map[string]interface{}) (*TaskInfo, error) GetEnqueues() map[string]Enqueue GetEnqueue(id string) (Enqueue, error) GetWorkerHeartbeats() ([]*work.WorkerPoolHeartbeat, error) GetBusyWorkers() ([]*work.WorkerObservation, error) GetQueues() ([]*work.Queue, error) ExecTask(id string) error RemoveTask(id string) error }
func NewMaster ¶
func NewMaster(pool *redis.Pool, namespace string, option *MasterOption) Master
NewMaster 建立 Master 實例
func NewMasterCluster ¶
func NewMasterCluster(base *MasterEntity, option *MasterOption) Master
NewMasterCluster 建立集群版本 Master Instance
type MasterClusterEntity ¶
type MasterClusterEntity struct { *MasterEntity // contains filtered or unexported fields }
func (*MasterClusterEntity) AddTask ¶
func (g *MasterClusterEntity) AddTask(Spec string, JobName string, Args map[string]interface{}) (*TaskInfo, error)
AddTask 新增任務
func (*MasterClusterEntity) ExecTask ¶ added in v0.0.5
func (g *MasterClusterEntity) ExecTask(id string) error
func (*MasterClusterEntity) RemoveTask ¶
func (g *MasterClusterEntity) RemoveTask(id string) error
RemoveTask 移除任務
func (*MasterClusterEntity) WatchMaster ¶
func (g *MasterClusterEntity) WatchMaster() error
WatchMaster 集群監聽 Master
func (*MasterClusterEntity) WatchTask ¶
func (g *MasterClusterEntity) WatchTask() error
WatchTask 集群監聽任務
type MasterEntity ¶
type MasterEntity struct {
// contains filtered or unexported fields
}
Master 實例
func (*MasterEntity) AddTask ¶
func (g *MasterEntity) AddTask(Spec string, JobName string, Args map[string]interface{}) (*TaskInfo, error)
AddTask 加入任務
func (*MasterEntity) GetBusyWorkers ¶
func (g *MasterEntity) GetBusyWorkers() ([]*work.WorkerObservation, error)
func (*MasterEntity) GetEnqueue ¶
func (g *MasterEntity) GetEnqueue(id string) (Enqueue, error)
func (*MasterEntity) GetEnqueues ¶
func (g *MasterEntity) GetEnqueues() map[string]Enqueue
func (*MasterEntity) GetID ¶
func (g *MasterEntity) GetID() string
func (*MasterEntity) GetWorkerHeartbeats ¶
func (g *MasterEntity) GetWorkerHeartbeats() ([]*work.WorkerPoolHeartbeat, error)
GetWorkerHeartbeats 取得 worker heartbeats 陣列
func (*MasterEntity) Init ¶
func (g *MasterEntity) Init() Master
type MasterOption ¶
type Worker ¶
type WorkerEntity ¶
type WorkerEntity struct {
// contains filtered or unexported fields
}
func (*WorkerEntity) AddJob ¶
func (g *WorkerEntity) AddJob(name string, job Job, option *work.JobOptions) Worker
func (*WorkerEntity) Start ¶
func (g *WorkerEntity) Start() Worker
func (*WorkerEntity) Stop ¶
func (g *WorkerEntity) Stop() Worker
Click to show internal directories.
Click to hide internal directories.