work


README

DispatchMe/go-work

This is a fork of gocraft/work that removes its strange usage of custom contexts. The reflection needed to support that feature hurt efficiency and didn't feel very "go-like", so we made this change.

DispatchMe/go-work lets you enqueue and process background jobs in Go. Jobs are durable and backed by Redis. Very similar to Sidekiq for Go.

  • Fast and efficient. Faster than jrallison/go-workers, benmanns/goworker, and albrow/jobs. See below for benchmarks.
  • Reliable - don't lose jobs even if your process crashes.
  • Middleware on jobs -- good for metrics instrumentation, logging, etc.
  • If a job fails, it will be retried a specified number of times.
  • Schedule jobs to happen in the future.
  • Enqueue unique jobs so that only one job with a given name/arguments exists in the queue at once.
  • Web UI to manage failed jobs and observe the system.
  • Periodically enqueue jobs on a cron-like schedule.

Enqueue new jobs

To enqueue jobs, you need to make an Enqueuer with a redis namespace and a redigo pool. Each enqueued job has a name and takes an optional payload, which can be any JSON-serializable value (a struct, or a work.Q map); it is serialized as JSON internally.

package main

import (
	"log"

	"github.com/DispatchMe/go-work"
	"github.com/garyburd/redigo/redis"
)

// Make a redis pool
var redisPool = &redis.Pool{
	MaxActive: 5,
	MaxIdle: 5,
	Wait: true,
	Dial: func() (redis.Conn, error) {
		return redis.Dial("tcp", ":6379")
	},
}

type SendEmailJobParameters struct {
	Address string
	Subject string
	CustomerID int
}

// Make an enqueuer with a particular namespace
var enqueuer = work.NewEnqueuer("my_app_namespace", redisPool)

func main() {
	// Enqueue a job named "send_email" with the specified parameters.
	_, err := enqueuer.Enqueue("send_email", &SendEmailJobParameters{Address: "test@example.com", Subject: "hello world", CustomerID: 4})
	if err != nil {
		log.Fatal(err)
	}
}


Process jobs

In order to process jobs, you'll need to make a WorkerPool. Add middleware and jobs to the pool, and start the pool.

package main

import (
	"fmt"
	"os"
	"os/signal"

	"github.com/DispatchMe/go-work"
	"github.com/garyburd/redigo/redis"
)

// Make a redis pool
var redisPool = &redis.Pool{
	MaxActive: 5,
	MaxIdle: 5,
	Wait: true,
	Dial: func() (redis.Conn, error) {
		return redis.Dial("tcp", ":6379")
	},
}

func main() {
	// Make a new pool. Arguments:
	// 10 is the max concurrency
	// "my_app_namespace" is the Redis namespace
	// redisPool is a Redis pool
	pool := work.NewWorkerPool(10, "my_app_namespace", redisPool)

	// Add middleware that will be executed for each job
	pool.Middleware(LogMiddleware)

	// Map the name of jobs to handler functions
	pool.Job("send_email", SendEmailHandler)

	// Customize options:
	pool.JobWithOptions("export", JobOptions{Priority: 10, MaxFails: 1}, ExportHandler)

	// Start processing jobs
	pool.Start()

	// Wait for a signal to quit:
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt, os.Kill)
	<-signalChan

	// Stop the pool
	pool.Stop()
}

func LogMiddleware(ctx *work.Context, next work.NextMiddlewareFunc) error {
	fmt.Println("Starting job: ", ctx.Job.Name)
	return next()
}

func SendEmailHandler(ctx *work.Context) error {
	// Extract arguments:
	args := new(SendEmailJobParameters)
	err := ctx.Job.UnmarshalPayload(args)
	if err != nil {
		return err
	}

	// Go ahead and send the email...
	// sendEmailTo(args.Address, args.Subject)

	return nil
}

func ExportHandler(ctx *work.Context) error {
	return nil
}

Special Features

Check-ins

Since this is a background job processing library, it's fairly common to have jobs that take a long time to execute. Imagine you have a job that takes an hour to run. It can be frustrating not to know whether it's hung, about to finish, or has 30 more minutes to go.

To solve this, you can instrument your jobs to "check in" every so often with a string message. This check-in status will show up in the web UI. For instance, your job could look like this:

func ExportHandler(ctx *work.Context) error {
	rowsToExport := getRows()
	for i, row := range rowsToExport {
		exportRow(row)
		if i%1000 == 0 {
			ctx.Job.Checkin("i=" + fmt.Sprint(i)) // Here's the magic! This tells DispatchMe/go-work our status
		}
	}
	return nil
}

Then in the web UI, you'll see the status of the worker:

Name   | Arguments           | Started At          | Check-in At         | Check-in
export | {"account_id": 123} | 2016/07/09 04:16:51 | 2016/07/09 05:03:13 | i=335000

Scheduled Jobs

You can schedule jobs to be executed in the future. To do so, make a new Enqueuer and call its EnqueueIn method:

enqueuer := work.NewEnqueuer("my_app_namespace", redisPool)
secondsInTheFuture := int64(300)
_, err := enqueuer.EnqueueIn("send_welcome_email", secondsInTheFuture, work.Q{"address": "test@example.com"})

Unique Jobs

You can enqueue unique jobs so that only one job with a given name/arguments exists in the queue at once. For instance, you might have a worker that expires the cache of an object. It doesn't make sense for multiple such jobs to exist at once. Also note that unique jobs are supported for normal enqueues as well as scheduled enqueues.

enqueuer := work.NewEnqueuer("my_app_namespace", redisPool)
job, err := enqueuer.EnqueueUnique("clear_cache", work.Q{"object_id_": "123"}) // job returned
job, err = enqueuer.EnqueueUnique("clear_cache", work.Q{"object_id_": "123"}) // job == nil -- this duplicate job isn't enqueued.
job, err = enqueuer.EnqueueUniqueIn("clear_cache", 300, work.Q{"object_id_": "789"}) // job != nil (different arguments)

Periodic Enqueueing (Cron)

You can periodically enqueue jobs on your DispatchMe/go-work cluster using your worker pool. The scheduling specification uses a cron syntax where the fields represent seconds, minutes, hours, day of the month, month, and day of the week, respectively. Even if you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once.

pool := work.NewWorkerPool(10, "my_app_namespace", redisPool)
pool.PeriodicallyEnqueue("0 0 * * * *", "calculate_caches") // This will enqueue a "calculate_caches" job every hour
pool.Job("calculate_caches", CalculateCaches) // Still need to register a handler for this job separately

Run the Web UI

The web UI lets you view the state of your DispatchMe/go-work cluster, inspect queued jobs, and retry or delete dead jobs.

Build and install the binary:

go get github.com/DispatchMe/go-work/cmd/workwebui
go install github.com/DispatchMe/go-work/cmd/workwebui

Then, you can run it:

workwebui -redis=":6379" -ns="work" -listen=":5040"

Navigate to http://localhost:5040/.

You'll see a view that looks like this:

Web UI Screenshot

Design and concepts

Enqueueing jobs
  • When jobs are enqueued, they're serialized with JSON and added to a simple Redis list with LPUSH.
  • Jobs are added to a list with the same name as the job. Each job name gets its own queue. Whereas with other job systems you have to design which jobs go on which queues, there's no need for that here.
Scheduling algorithm
  • Each job lives in a list-based queue with the same name as the job.
  • Each of these queues can have an associated priority. The priority is a number from 1 to 100000.
  • Each time a worker pulls a job, it needs to choose a queue. It chooses a queue probabilistically based on its relative priority.
  • If the sum of priorities among all queues is 1000, and one queue has priority 100, jobs will be pulled from that queue 10% of the time.
  • Obviously if a queue is empty, it won't be considered.
  • The semantics of "always process X jobs before Y jobs" can be accurately approximated by giving X a large priority (like 10000) and Y a small one (like 1), as in the sketch below.
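
For illustration, a minimal sketch of two queues with very different priorities (the job names, handlers, and MaxFails values are purely illustrative):

// With priorities 10000 and 1, the "export" queue is chosen almost every time
// both queues have work, so exports are effectively processed first.
pool.JobWithOptions("export", work.JobOptions{Priority: 10000, MaxFails: 3}, ExportHandler)
pool.JobWithOptions("cleanup", work.JobOptions{Priority: 1, MaxFails: 3}, CleanupHandler)
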
Processing a job
  • To process a job, a worker will execute a Lua script to atomically move a job from its queue to an in-progress queue.
  • The worker will then run the job. The job will either finish successfully or result in an error or panic.
    • If the process completely crashes, the reaper will eventually find it in its in-progress queue and requeue it.
  • If the job is successful, we'll simply remove the job from the in-progress queue.
  • If the job returns an error or panic, we'll see how many retries a job has left. If it doesn't have any, we'll move it to the dead queue. If it has retries left, we'll consume a retry and add the job to the retry queue.
Workers and WorkerPools
  • WorkerPools provide the public API of DispatchMe/go-work.
    • You can attach jobs and middleware to them.
    • You can start and stop them.
    • Based on their concurrency setting, they'll spin up N worker goroutines.
  • Each worker is run in a goroutine. It will get a job from redis, run it, get the next job, etc.
    • Each worker is independent. They are not dispatched work -- they get their own work.
Retry job, scheduled jobs, and the requeuer
  • In addition to the normal list-based queues that normal jobs live in, there are two other types of queues: the retry queue and the scheduled job queue.
  • Both of these are implemented as Redis z-sets. The score is the unix timestamp when the job should be run. The value is the bytes of the job.
  • The requeuer will occasionally look for jobs in these queues that should be run now. If they should be, they'll be atomically moved to the normal list-based queue and eventually processed.
Dead jobs
  • After a job has failed a specified number of times, it will be added to the dead job queue.
  • The dead job queue is just a Redis z-set. The score is the timestamp it failed and the value is the job.
  • To retry failed jobs, use the UI or the Client API.
The reaper
  • If a process crashes hard (e.g., the power on the server turns off or the kernel freezes), some jobs may be in progress, and we won't want to lose them. They're safe in their in-progress queue.
  • The reaper will look for worker pools without a heartbeat. It will scan their in-progress queues and requeue anything it finds.
Unique jobs
  • You can enqueue unique jobs such that only one job with a given name/arguments is on the queue at once.
  • Both normal queues and the scheduled queue are considered.
  • When a unique job is enqueued, we'll atomically set a redis key that includes the job name and arguments and enqueue the job.
  • When the job is processed, we'll delete that key to permit another job to be enqueued.
Periodic Jobs
  • You can tell a worker pool to enqueue jobs periodically using a cron schedule.
  • Each worker pool will wake up every 2 minutes, and if jobs haven't been scheduled yet, it will schedule all the jobs that would be executed in the next five minutes.
  • Each periodic job that runs at a given time has a predictable byte pattern. Since jobs are scheduled on the scheduled job queue (a Redis z-set), if the same job is scheduled twice for a given time, it can only exist in the z-set once.
Terminology reference
  • "worker pool" - a pool of workers
  • "worker" - an individual worker in a single goroutine. Gets a job from redis, does job, gets next job...
  • "heartbeater" or "worker pool heartbeater" - goroutine owned by worker pool that runs concurrently with workers. Writes the worker pool's config/status (aka "heartbeat") every 5 seconds.
  • "heartbeat" - the status written by the heartbeater.
  • "observer" or "worker observer" - observes a worker. Writes stats. makes "observations".
  • "worker observation" - A snapshot made by an observer of what a worker is working on.
  • "periodic enqueuer" - A process that runs with a worker pool that periodically enqueues new jobs based on cron schedules.
  • "job" - the actual bundle of data that constitutes one job
  • "job name" - each job has a name, like "create_watch"
  • "job type" - backend/private nomenclature for the handler+options for processing a job
  • "queue" - each job creates a queue with the same name as the job. only jobs named X go into the X queue.
  • "retry jobs" - If a job fails and needs to be retried, it will be put on this queue.
  • "scheduled jobs" - Jobs enqueued to be run in th future will be put on a scheduled job queue.
  • "dead jobs" - If a job exceeds its MaxFails count, it will be put on the dead job queue.

Benchmarks

The benches folder contains various benchmark code. In each case, we enqueue 100k jobs across 5 queues. The jobs are almost no-op jobs: they simply increment an atomic counter. We then measure the rate of change of the counter to obtain our measurement.
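
For reference, the benchmarked handlers are near no-ops along these lines (a sketch, not the actual bench code; the counter and handler name are illustrative):

import (
	"sync/atomic"

	"github.com/DispatchMe/go-work"
)

var counter int64

// Almost a no-op: just bump an atomic counter so the benchmark can sample its rate of change.
func BenchJobHandler(ctx *work.Context) error {
	atomic.AddInt64(&counter, 1)
	return nil
}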

Library              | Speed
DispatchMe/go-work   | 20944 jobs/s
jrallison/go-workers | 19945 jobs/s
benmanns/goworker    | 10328.5 jobs/s
albrow/jobs          | 40 jobs/s

Authors

Documentation

Index

Constants

This section is empty.

Variables

var ErrNotDeleted = fmt.Errorf("nothing deleted")

ErrNotDeleted is returned by functions that delete jobs to indicate that although the redis commands were successful, no object was actually deleted by those commands.

var ErrNotRetried = fmt.Errorf("nothing retried")

ErrNotRetried is returned by functions that retry jobs to indicate that although the redis commands were successful, no object was actually retried by those commands.

Functions

This section is empty.

Types

type BackoffCalculator added in v0.6.0

type BackoffCalculator func(job *Job) int64

You may provide your own backoff function for retrying failed jobs or use the builtin one. Returns the number of seconds to wait until the next attempt.

The builtin backoff calculator provides an exponentially increasing wait function.
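
For example, a minimal sketch of a custom calculator (linear rather than exponential; the function name and the registration below are illustrative):

// Wait 30 seconds per prior failure before the next attempt.
func linearBackoff(job *work.Job) int64 {
	return 30 * job.Fails
}

// Attach it to a job via JobOptions.Backoff:
pool.JobWithOptions("export", work.JobOptions{MaxFails: 5, Backoff: linearBackoff}, ExportHandler)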

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client implements all of the functionality of the web UI. It can be used to inspect the status of a running cluster and retry dead jobs.
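
A brief sketch of retrying whatever is currently on the dead queue (assumes the same namespace and redisPool used by your workers, and the standard library log package):

client := work.NewClient("my_app_namespace", redisPool)

// Page 1 of dead jobs (20 per page), plus the total count.
deadJobs, total, err := client.DeadJobs(1)
if err != nil {
	log.Fatal(err)
}
log.Printf("%d dead jobs total", total)

// Put each one back on its normal work queue.
for _, dj := range deadJobs {
	if err := client.RetryDeadJob(dj.DiedAt, dj.ID); err != nil {
		log.Println("retry failed:", err)
	}
}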

func NewClient

func NewClient(namespace string, pool *redis.Pool) *Client

NewClient creates a new Client with the specified redis namespace and connection pool.

func (*Client) DeadJobs

func (c *Client) DeadJobs(page uint) ([]*DeadJob, int64, error)

DeadJobs returns a list of DeadJob's. The page param is 1-based; each page is 20 items. The total number of items (not pages) in the list of dead jobs is also returned.

func (*Client) DeleteAllDeadJobs

func (c *Client) DeleteAllDeadJobs() error

DeleteAllDeadJobs deletes all dead jobs.

func (*Client) DeleteDeadJob

func (c *Client) DeleteDeadJob(diedAt int64, jobID string) error

DeleteDeadJob deletes a dead job from Redis.

func (*Client) DeleteRetryJob

func (c *Client) DeleteRetryJob(retryAt int64, jobID string) error

DeleteRetryJob deletes a job in the retry queue.

func (*Client) DeleteScheduledJob

func (c *Client) DeleteScheduledJob(scheduledFor int64, jobID string) error

DeleteScheduledJob deletes a job in the scheduled queue.

func (*Client) Queues

func (c *Client) Queues() ([]*Queue, error)

Queues returns the Queue's it finds.

func (*Client) RetryAllDeadJobs

func (c *Client) RetryAllDeadJobs() error

RetryAllDeadJobs requeues all dead jobs. In other words, it puts them all back on the normal work queue for workers to pull from and process.

func (*Client) RetryDeadJob

func (c *Client) RetryDeadJob(diedAt int64, jobID string) error

RetryDeadJob retries a dead job. The job will be re-queued on the normal work queue for eventual processing by a worker.

func (*Client) RetryJobs

func (c *Client) RetryJobs(page uint) ([]*RetryJob, int64, error)

RetryJobs returns a list of RetryJob's. The page param is 1-based; each page is 20 items. The total number of items (not pages) in the list of retry jobs is also returned.

func (*Client) ScheduledJobs

func (c *Client) ScheduledJobs(page uint) ([]*ScheduledJob, int64, error)

ScheduledJobs returns a list of ScheduledJob's. The page param is 1-based; each page is 20 items. The total number of items (not pages) in the list of scheduled jobs is also returned.

func (*Client) WorkerObservations

func (c *Client) WorkerObservations() ([]*WorkerObservation, error)

WorkerObservations returns all of the WorkerObservation's it finds for all worker pools' workers.

func (*Client) WorkerPoolHeartbeats

func (c *Client) WorkerPoolHeartbeats() ([]*WorkerPoolHeartbeat, error)

WorkerPoolHeartbeats queries Redis and returns all WorkerPoolHeartbeat's it finds (even for those worker pools which don't have a current heartbeat).

type Context added in v0.6.0

type Context struct {
	Job *Job
	// contains filtered or unexported fields
}

func NewContext added in v0.6.0

func NewContext(job *Job) *Context

func (*Context) Get added in v0.6.0

func (c *Context) Get(key string) interface{}

func (*Context) Set added in v0.6.0

func (c *Context) Set(key string, value interface{})
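
Get and Set carry per-job values through the middleware chain into the handler. A small sketch (the key and values are illustrative):

func RequestIDMiddleware(ctx *work.Context, next work.NextMiddlewareFunc) error {
	// Stash a value for downstream middleware and the handler.
	ctx.Set("request_id", "abc123")
	return next()
}

func MyJobHandler(ctx *work.Context) error {
	// Read it back; Get returns interface{}, so assert the expected type.
	requestID, _ := ctx.Get("request_id").(string)
	fmt.Println("request id:", requestID)
	return nil
}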

type DeadJob

type DeadJob struct {
	DiedAt int64 `json:"died_at"`
	*Job
}

DeadJob represents a job in the dead queue.

type Enqueuer

type Enqueuer struct {
	Namespace string // eg, "myapp-work"
	Pool      *redis.Pool
	// contains filtered or unexported fields
}

Enqueuer can enqueue jobs.

func NewEnqueuer

func NewEnqueuer(namespace string, pool *redis.Pool) *Enqueuer

NewEnqueuer creates a new enqueuer with the specified Redis namespace and Redis pool.

func (*Enqueuer) Enqueue

func (e *Enqueuer) Enqueue(jobName string, payload interface{}) (*Job, error)

Enqueue will enqueue the specified job name and arguments. The args param can be nil if no args are needed. Example: e.Enqueue("send_email", work.Q{"addr": "test@example.com"})

func (*Enqueuer) EnqueueIn

func (e *Enqueuer) EnqueueIn(jobName string, secondsFromNow int64, payload interface{}) (*ScheduledJob, error)

EnqueueIn enqueues a job in the scheduled job queue for execution in secondsFromNow seconds.

func (*Enqueuer) EnqueueUnique

func (e *Enqueuer) EnqueueUnique(jobName string, payload interface{}) (*Job, error)

EnqueueUnique enqueues a job unless a job is already enqueued with the same name and arguments. The already-enqueued job can be in the normal work queue or in the scheduled job queue. Once a worker begins processing a job, another job with the same name and arguments can be enqueued again. Any failed jobs in the retry queue or dead queue don't count against the uniqueness -- so if a job fails and is retried, two unique jobs with the same name and arguments can be enqueued at once. In order to add robustness to the system, jobs are only unique for 24 hours after they're enqueued. This is mostly relevant for scheduled jobs. EnqueueUnique returns the job if it was enqueued, and nil if it wasn't.

func (*Enqueuer) EnqueueUniqueIn

func (e *Enqueuer) EnqueueUniqueIn(jobName string, secondsFromNow int64, payload interface{}) (*ScheduledJob, error)

EnqueueUniqueIn enqueues a unique job in the scheduled job queue for execution in secondsFromNow seconds. See EnqueueUnique for the semantics of unique jobs.

type Handler added in v0.6.0

type Handler func(ctx *Context) error

type Job

type Job struct {
	// Inputs when making a new job
	Name       string `json:"name,omitempty"`
	ID         string `json:"id"`
	EnqueuedAt int64  `json:"t"`

	// Payload can be any json-marshallable object. This leads to a
	// double json-marshal which could be a bit better optimized,
	// but makes it easier to have custom job definitions.
	Payload []byte `json:"payload"`

	Unique bool `json:"unique,omitempty"`

	// Inputs when retrying
	Fails    int64  `json:"fails,omitempty"` // number of times this job has failed
	LastErr  string `json:"err,omitempty"`
	FailedAt int64  `json:"failed_at,omitempty"`
	// contains filtered or unexported fields
}

Job represents a job.

func (*Job) Checkin

func (j *Job) Checkin(msg string)

Checkin will update the status of the executing job to the specified message. This message is visible within the web UI. This is useful for indicating some sort of progress on very long running jobs. For instance, on a job that has to process a million records over the course of an hour, the job could call Checkin with the current record number every 10k records.

func (*Job) SetPayload added in v0.6.0

func (j *Job) SetPayload(payload interface{}) error

func (*Job) UnmarshalPayload added in v0.6.0

func (j *Job) UnmarshalPayload(dest interface{}) error

type JobOptions

type JobOptions struct {
	Priority uint              // Priority from 1 to 10000
	MaxFails uint              // 1: send straight to dead (unless SkipDead)
	SkipDead bool              // If true, don't send failed jobs to the dead queue when retries are exhausted.
	Backoff  BackoffCalculator // If not set, uses the default backoff algorithm
}

JobOptions can be passed to JobWithOptions.

type Middleware added in v0.6.0

type Middleware func(ctx *Context, next NextMiddlewareFunc) error

type NextMiddlewareFunc

type NextMiddlewareFunc func() error

NextMiddlewareFunc is a function type (whose instances are named 'next') that you call to advance to the next middleware.

type Q

type Q map[string]interface{}

Q is a shortcut to easily specify arguments for jobs when enqueueing them. Example: e.Enqueue("send_email", work.Q{"addr": "test@example.com", "track": true})

type Queue

type Queue struct {
	JobName string `json:"job_name"`
	Count   int64  `json:"count"`
	Latency int64  `json:"latency"`
}

Queue represents a queue that holds jobs with the same name. It indicates their name, count, and latency (in seconds). Latency is a measurement of how long ago the next job to be processed was enqueued.
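
For example, a quick sketch that prints each queue's backlog via the Client API (assumes a client created with work.NewClient and the standard library log package):

queues, err := client.Queues()
if err != nil {
	log.Fatal(err)
}
for _, q := range queues {
	log.Printf("queue %s: %d jobs, latency %ds", q.JobName, q.Count, q.Latency)
}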

type RetryJob

type RetryJob struct {
	RetryAt int64 `json:"retry_at"`
	*Job
}

RetryJob represents a job in the retry queue.

type ScheduledJob

type ScheduledJob struct {
	RunAt int64 `json:"run_at"`
	*Job
}

ScheduledJob represents a job in the scheduled queue.

type WorkerObservation

type WorkerObservation struct {
	WorkerID string `json:"worker_id"`
	IsBusy   bool   `json:"is_busy"`

	// If IsBusy:
	JobName   string `json:"job_name"`
	JobID     string `json:"job_id"`
	StartedAt int64  `json:"started_at"`
	Payload   []byte `json:"payload"`
	Checkin   string `json:"checkin"`
	CheckinAt int64  `json:"checkin_at"`
}

WorkerObservation represents the latest observation taken from a worker. The observation indicates whether the worker is busy processing a job, and if so, information about that job.

type WorkerPool

type WorkerPool struct {
	// contains filtered or unexported fields
}

WorkerPool represents a pool of workers and forms the primary API of DispatchMe/go-work. You can attach jobs and middleware to a pool, and start and stop it. Based on its concurrency setting, it will spin up N worker goroutines.

func NewWorkerPool

func NewWorkerPool(concurrency uint, namespace string, pool *redis.Pool) *WorkerPool

NewWorkerPool creates a new worker pool with the specified Redis namespace and Redis pool. concurrency specifies how many workers to spin up - each worker can process jobs concurrently.

func (*WorkerPool) Drain

func (wp *WorkerPool) Drain()

Drain drains all jobs in the queue before returning. Note that if jobs are added faster than we can process them, this function won't return.

func (*WorkerPool) Job

func (wp *WorkerPool) Job(name string, handler Handler) *WorkerPool

Job registers the job name with the specified handler function. When workers pull jobs from the queue with that name, they'll be processed by that handler.

func (*WorkerPool) JobWithOptions

func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, handler Handler) *WorkerPool

JobWithOptions adds a handler for 'name' jobs as per the Job function, but permits you to specify additional options such as a job's priority, retry count, and whether to send dead jobs to the dead job queue or trash them.

func (*WorkerPool) Middleware

func (wp *WorkerPool) Middleware(mw Middleware) *WorkerPool

Middleware appends the specified function to the middleware chain. The middleware runs for every job; call next() to advance to the next middleware in the chain (and eventually the job's handler).

func (*WorkerPool) PeriodicallyEnqueue

func (wp *WorkerPool) PeriodicallyEnqueue(spec string, jobName string) *WorkerPool

PeriodicallyEnqueue will periodically enqueue jobName according to the cron-based spec. The spec format is based on https://godoc.org/github.com/robfig/cron, which is a relatively standard cron format. Note that the first value is the seconds! If you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once.

func (*WorkerPool) Start

func (wp *WorkerPool) Start()

Start starts the workers and associated processes.

func (*WorkerPool) Stop

func (wp *WorkerPool) Stop()

Stop stops the workers and associated processes.

type WorkerPoolHeartbeat

type WorkerPoolHeartbeat struct {
	WorkerPoolID string   `json:"worker_pool_id"`
	StartedAt    int64    `json:"started_at"`
	HeartbeatAt  int64    `json:"heartbeat_at"`
	JobNames     []string `json:"job_names"`
	Concurrency  uint     `json:"concurrency"`
	Host         string   `json:"host"`
	Pid          int      `json:"pid"`
	WorkerIDs    []string `json:"worker_ids"`
}

WorkerPoolHeartbeat represents the heartbeat from a worker pool. WorkerPools write a heartbeat every 5 seconds so we know they're alive; the heartbeat also includes configuration information.

Directories

Path Synopsis
benches
cmd
