jobs

package module
v0.4.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 6, 2021 License: MIT Imports: 13 Imported by: 0

README

Jobs

Development Status

Jobs is no longer being actively developed. I will still try my best to respond to issues and pull requests, but in general you should not expect much support. No new features will be added. Still, Jobs is reasonably well-tested, and it is probably fine to use it for low-traffic hobby sites. If you are looking for something for more serious, production use-cases, consider alternatives such as RabbitMQ.

Jobs follows semantic versioning but offers no guarantees of backwards compatibility until version 1.0.

About

Jobs is a persistent and flexible background jobs library for go.

Version Circle CI GoDoc

Jobs is powered by Redis and supports the following features:

  • A job can encapsulate any arbitrary functionality. A job can do anything which can be done in a go function.
  • A job can be one-off (only executed once) or recurring (scheduled to execute at a specific interval).
  • A job can be retried a specified number of times if it fails.
  • A job is persistent, with protections against power loss and other worst case scenarios. (See the Guarantees section below)
  • Work on jobs can be spread amongst any number of concurrent workers across any number of machines.
  • Provided it is persisted to disk, every job will be executed at least once, and in ideal network conditions will be executed exactly once. (See the Guarantees section below)
  • You can query the database to find out e.g. the number of jobs that are currently executing or how long a particular job took to execute.
  • Any job that permanently fails will have its error captured and stored.

Why is it Useful?

Jobs is intended to be used in web applications. It is useful for cases where you need to execute some long-running code, but you don't want your users to wait for the code to execute before rendering a response. A good example is sending a welcome email to your users after they sign up. You can use Jobs to schedule the email to be sent asynchronously, and render a response to your user without waiting for the email to be sent. You could use a goroutine to accomplish the same thing, but in the event of a server restart or power loss, the email might never be sent. Jobs guarantees that the email will be sent at some time and allows you to spread the work between different machines.

Installation

Jobs requires Go version >= 1.2. If you do not already have it, follow these instructions:

Jobs requires access to a Redis database. If you plan to have multiple worker pools spread out across different machines, they should all connect to the same Redis database. If you only want to run one worker pool, it is safe to install Redis locally and run it on the same machine. In either case, if you need to install Redis, follow these instructions:

  • Install Redis.
  • Follow the instructions in the section called Installing Redis more properly.
  • Make sure you understand how Redis Persistence works and have edited your config file to get your desired persistence. We recommend using both RDB and AOF and setting fsync to either "always" or "everysec".

After that, you can install Jobs like you would any other go package: go get github.com/albrow/jobs. If you want to update the package later, use go get -u github.com/albrow/jobs. Then you can import Jobs like you would any other go package by adding import github.com/albrow/jobs to your go source file.

Quickstart Guide

Connecting to Redis

You can configure the connection to Redis by editing Config.Db. Here are the options:

  • Address is the address of the redis database to connect to. Default is "localhost:6379".
  • Network is the type of network to use to connect to the redis database Default is "tcp".
  • Database is the redis database number to use for storing all data. Default is 0.
  • Password is a password to use for connecting to a redis database via the AUTH command. If empty, Jobs will not attempt to authenticate. Default is "" (an empty string).

You should edit Config.Db during program initialization, before running Pool.Start or scheduling any jobs. Here's an example of how to configure Jobs to use databse #10 and authenticate with the password "foobar":

func main() {
	// Configure database options at the start of your application
	jobs.Config.Db.Database = 10
	jobs.Config.Db.Password = "foobar"
}
Registering Job Types

Jobs must be organized into discrete types. Here's an example of how to register a job which sends a welcome email to users:

// We'll specify that we want the job to be retried 3 times before finally failing
welcomeEmailJobs, err := jobs.RegisterType("welcomeEmail", 3, func(user *User) error {
	msg := fmt.Sprintf("Hello, %s! Thanks for signing up for foo.com.", user.Name)
	if err := emails.Send(user.EmailAddress, msg); err != nil {
		// The returned error will be captured by a worker, which will then log the error
		// in the database and trigger up to 3 retries.
		return err
	}
})

The final argument to the RegisterType function is a HandlerFunc which will be executed when the job runs. HandlerFunc must be a function which accepts either zero or one arguments and returns an error.

Scheduling a Job

After registering a job type, you can schedule a job using the Schedule or ScheduleRecurring methods like so:

// The priority argument lets you choose how important the job is. Higher
// priority jobs will be executed first.
job, err := welcomeEmailJobs.Schedule(100, time.Now(), &User{EmailAddress: "foo@example.com"})
if err != nil {
	// Handle err
}

You can use the Job object returned by Schedule or ScheduleRecurring to check on the status of the job or cancel it manually.

Starting and Configuring Worker Pools

You can schedule any number of worker pools across any number of machines, provided every machine agrees on the definition of the job types. If you want, you can start a worker pool on the same machines that are scheduling jobs, or you can have each worker pool running on a designated machine. Since each pool is assigned an id based on a unique hardware identifier, you must only run one worker pool per machine.

To create a new pool with the default configuration, just pass in nil:

pool, err := jobs.NewPool(nil)
if err != nil {
	// Handle err
}

You can also specify a different configuration by passing in *PoolConfig. Any zero values in the config you pass in will fallback to the default values. So here's how you could start a pool with 10 workers and a batch size of 10, while letting the other options remain the default.

pool, err := jobs.NewPool(&jobs.PoolConfig{
	NumWorkers: 10,
	BatchSize: 10,
})
if err != nil {
	// Handle err
}

After you have created a pool, you can start it with the Start method. Once started, the pool will continuously query the database for new jobs and delegate those jobs to workers. Any program that calls Pool.Start() should also wait for the workers to finish before exiting. You can do so by wrapping Close and Wait in a defer statement. Typical usage looks something like this:

func main() {
	pool, err := jobs.NewPool(nil)
	if err != nil {
		// Handle err
	}
	defer func() {
		pool.Close()
		if err := pool.Wait(); err != nil {
			// Handle err
		}
	}()
	if err := pool.Start(); err != nil {
		// Handle err
	}
}

You can also call Close and Wait at any time to manually stop the pool from executing new jobs. In this case, any jobs that are currently being executed will still finish.

Testing

To run the tests, make sure you have Redis running and accepting unix socket connections on the address /tmp/redis.sock. The tests will use database #14. WARNING: After each test is run, database #14 will be completely erased, so make sure you do not have any important data stored there.

To run the tests just run go test . If anything fails, please report an issue and describe what happened.

Contributing

See Contributing.md

Guarantees

Persistence

Since jobs is powered by Redis, there is a chance that you can lose data with the default Redis configuration. To get the best persistence guarantees, you should set Redis to use both AOF and RDB persistence modes and set fsync to "always". With these settings, Redis is more or less as persistent as a database like postgres. If want better performance and are okay with a slightly greater chance of losing data (i.e. jobs not executing), you can set fsync to "everysec".

Read more about Redis persistence.

Atomicity

Jobs is carefully written using Redis transactions and lua scripting so that all database changes are atomic. If Redis crashes in the middle of a transaction or script execution, it is possible that your AOF file can become corrupted. If this happens, Redis will refuse to start until the AOF file is fixed. It is relatively easy to fix the problem with the redis-check-aof tool, which will remove the partial transaction from the AOF file. In effect, this guarantees that modifications of the database are atomic, even in the event of a power loss or hard reset, with the caveat that you may need to use the redis-check-aof tool in the worst case scenario.

Read more about Redis transactions and scripts.

Job Execution

Jobs guarantees that a job will be executed at least once, provided it has been persisted on disk. (See the section on Persistence directly above). A job can only picked up by one pool at a time because a pool atomically pops (gets and immediately moves) the next available jobs from the database. A job can only be executed by one worker at a time because the jobs are delegated to workers via a shared channel. Each worker pool checks on the health of all the other pools when it starts. If a pool crashes or is otherwise disconnected, any jobs it had grabbed from the database that did not yet finish will be re-queued and picked up by a different pool.

This is in no way an exhaustive list, but here are some known examples of scenarios that may cause a job to be executed more than once:

  1. If there is a power failure or hard reset while a worker is in the middle of executing a job, the job may be stuck in a half-executed state. Since there is no way to know how much of the job was successfully completed, the job will be re-queued and picked up by a different pool, where it may be partially or fully executed more than once.
  2. If a pool becomes disconnected, it will be considered stale and its jobs will be re-queued and reclaimed by a different pool. However, if the stale pool is able to partly or fully execute jobs without a reliable internet connection, any jobs belonging to the stale pool might be executed more than once. You can increase the StaleTimeout parameter for a pool to make this scenario less likely.

License

Jobs is licensed under the MIT License. See the LICENSE file for more information.

Documentation

Overview

Package jobs is a persistent and flexible background jobs library.

Version: 0.4.2

Jobs is powered by redis and supports the following features:

  • A job can encapsulate any arbitrary functionality. A job can do anything which can be done in a go function.
  • A job can be one-off (only executed once) or recurring (scheduled to execute at a specific interval).
  • A job can be retried a specified number of times if it fails.
  • A job is persistent, with protections against power loss and other worst case scenarios.
  • Jobs can be executed by any number of concurrent workers accross any number of machines.
  • Provided it is persisted to disk, every job will be executed *at least* once, and in ideal network conditions will be executed *exactly* once.
  • You can query the database to find out e.g. the number of jobs that are currently executing or how long a particular job took to execute.
  • Any job that permanently fails will have its error captured and stored.

Why is it Useful

Jobs is intended to be used in web applications. It is useful for cases where you need to execute some long-running code, but you don't want your users to wait for the code to execute before rendering a response. A good example is sending a welcome email to your users after they sign up. You can use Jobs to schedule the email to be sent asynchronously, and render a response to your user without waiting for the email to be sent. You could use a goroutine to accomplish the same thing, but in the event of a server restart or power loss, the email might never be sent. Jobs guarantees that the email will be sent at some time, and allows you to spread the work between different machines.

More Information

Visit https://github.com/albrow/jobs for a Quickstart Guide, code examples, and more information.

Index

Constants

This section is empty.

Variables

View Source
var Config = configType{
	Db: databaseConfig{

		Address: "localhost:6379",

		Network: "tcp",

		Database: 0,

		Password: "",
	},
}

Config is where all configuration variables are stored. You may modify Config directly in order to change config variables, and should only do so at the start of your program.

View Source
var DefaultPoolConfig = &PoolConfig{
	NumWorkers:   runtime.GOMAXPROCS(0),
	BatchSize:    runtime.GOMAXPROCS(0),
	MinWait:      200 * time.Millisecond,
	StaleTimeout: 30 * time.Second,
}

DefaultPoolConfig is the default config for pools. You can override any values by passing in a *PoolConfig to NewPool. Any zero values in PoolConfig will be interpreted as the default.

View Source
var Keys = struct {
	// jobsTimeIndex is the key for a sorted set which keeps all outstanding
	// jobs sorted by their time field.
	JobsTimeIndex string
	// jobsTemp is the key for a temporary set which is created and then destroyed
	// during the process of getting the next jobs in the queue.
	JobsTemp string
	// activePools is the key for a set which holds the pool ids for all active
	// pools.
	ActivePools string
}{
	JobsTimeIndex: "jobs:time",
	JobsTemp:      "jobs:temp",
	ActivePools:   "pools:active",
}

keys stores any constant redis keys. By storing them all here, we avoid using string literals which are prone to typos.

View Source
var Types = map[string]*Type{}

Types is map of job type names to *Type

Functions

This section is empty.

Types

type ErrorJobNotFound added in v0.3.1

type ErrorJobNotFound struct {
	// contains filtered or unexported fields
}

ErrorJobNotFound is returned whenever a specific job is not found, e.g. from the FindById function.

func (ErrorJobNotFound) Error added in v0.3.1

func (e ErrorJobNotFound) Error() string

type ErrorNameAlreadyRegistered

type ErrorNameAlreadyRegistered struct {
	// contains filtered or unexported fields
}

ErrorNameAlreadyRegistered is returned whenever RegisterType is called with a name that has already been registered.

func (ErrorNameAlreadyRegistered) Error

Error satisfies the error interface.

type HandlerFunc added in v0.2.0

type HandlerFunc interface{}

A HandlerFunc is a function which accepts ether zero or one arguments and returns an error. The function will be executed by a worker. If the function returns a non-nil error or causes a panic, the worker will capture and log the error, and if applicable the job may be queued for retry.

type Job

type Job struct {
	// contains filtered or unexported fields
}

Job represents a discrete piece of work to be done by a worker.

func FindById added in v0.1.0

func FindById(id string) (*Job, error)

FindById returns the job with the given id or an error if the job cannot be found (in which case the error will have type ErrorJobNotFound) or there was a problem connecting to the database.

func (*Job) Cancel

func (j *Job) Cancel() error

Cancel cancels the job, but does not remove it from the database. It will be added to a list of cancelled jobs. If you wish to remove it from the database, use the Destroy method. Attempting to cancel a destroyed job will have no effect.

func (*Job) Data added in v0.3.2

func (j *Job) Data() []byte

Data returns the gob-encoded data of the job

func (*Job) Destroy

func (j *Job) Destroy() error

Destroy removes all traces of the job from the database. If the job is currently being executed by a worker, the worker may still finish the job. Attempting to destroy a job that has already been destroyed will have no effect, so it is safe to call Destroy multiple times.

func (*Job) Duration

func (j *Job) Duration() time.Duration

Duration returns how long the job took to execute with nanosecond precision. I.e. the difference between j.Finished() and j.Started(). It returns a duration of zero if the job has not finished yet.

func (*Job) Error

func (j *Job) Error() error

Error returns the last error that arose during execution of the job. It is only non-nil if the job has failed at some point.

func (*Job) Finished

func (j *Job) Finished() time.Time

Finished returns the time that the job finished executing (in local time with nanosecond precision) or the zero time if the job has not finished executing yet.

func (*Job) Freq added in v0.3.2

func (j *Job) Freq() int64

Freq returns the frequency at which the job should be executed. Specifically it returns the number of nanoseconds between each scheduled execution.

func (*Job) Id

func (j *Job) Id() string

Id returns the unique identifier used for the job. If the job has not yet been saved to the database, it may return an empty string.

func (*Job) IsRecurring added in v0.3.0

func (j *Job) IsRecurring() bool

IsRecurring returns true iff the job is recurring

func (*Job) Key added in v0.3.0

func (j *Job) Key() string

Key returns the key used for the hash in redis which stores all the fields for this job.

func (*Job) NextTime added in v0.3.0

func (j *Job) NextTime() int64

NextTime returns the time (unix UTC with nanosecond precision) that the job should execute next, if it is a recurring job, and 0 if it is not.

func (*Job) PoolId added in v0.3.2

func (j *Job) PoolId() string

PoolId returns the pool id of the job if it is currently being executed or has been executed and at some point has been assigned to a specific pool. Otherwise, it returns an empty string.

func (*Job) Priority added in v0.3.2

func (j *Job) Priority() int

Priority returns the job's priority.

func (*Job) Refresh

func (j *Job) Refresh() error

Refresh mutates the job by setting its fields to the most recent data found in the database. It returns an error if there was a problem connecting to the database or if the job was destroyed.

func (*Job) Reschedule added in v0.1.0

func (j *Job) Reschedule(time time.Time) error

Reschedule reschedules the job with the given time. It can be used to reschedule cancelled jobs. It may also be used to reschedule finished or failed jobs, however, in most cases if you want to reschedule finished jobs you should use the ScheduleRecurring method and if you want to reschedule failed jobs, you should set the number of retries > 0 when registering the job type. Attempting to reschedule a destroyed job will have no effect. Reschedule returns an error if there was a problem connecting to the database.

func (*Job) Retries added in v0.3.2

func (j *Job) Retries() uint

Retries returns the number of remaining retries for the job.

func (*Job) Started

func (j *Job) Started() time.Time

Started returns the time that the job started executing (in local time with nanosecond precision) or the zero time if the job has not started executing yet.

func (*Job) Status

func (j *Job) Status() Status

Status returns the status of the job.

func (*Job) Time added in v0.3.2

func (j *Job) Time() int64

Time returns the time at which the job should be executed in UTC UNIX format with nanosecond precision.

type Pool

type Pool struct {

	// RWMutex is only used during testing when we need to
	// change some of the fields for the pool after it was started.
	// NOTE: currently only used in one test (TestStalePoolsArePurged)
	// and might be removed if we refactor later.
	sync.RWMutex
	// contains filtered or unexported fields
}

Pool is a pool of workers. Pool will query the database for queued jobs and delegate those jobs to some number of workers. It will do this continuously until the main program exits or you call Pool.Close().

func NewPool

func NewPool(config *PoolConfig) (*Pool, error)

NewPool creates and returns a new pool with the given configuration. You can pass in nil to use the default values. Otherwise, any zero values in config will be interpreted as the default value.

func (*Pool) Close

func (p *Pool) Close()

Close closes the worker pool and prevents it from delegating any new jobs. However, any jobs that are currently being executed will still be executed. Close returns immediately. If you want to wait until all workers are done executing their current jobs, use the Wait method.

func (*Pool) SetAfterFunc added in v0.4.0

func (p *Pool) SetAfterFunc(f func(*Job))

SetAfterFunc will assign a function that will be executed each time a job is finished.

func (*Pool) Start

func (p *Pool) Start() error

Start starts the worker pool. This means the pool will initialize workers, continuously query the database for queued jobs, and delegate those jobs to the workers.

func (*Pool) Wait

func (p *Pool) Wait() error

Wait will return when all workers are done executing their jobs. Wait can only possibly return after you have called Close. To prevent errors due to partially-executed jobs, any go program which starts a worker pool should call Wait (and Close before that if needed) before exiting.

type PoolConfig

type PoolConfig struct {
	// NumWorkers is the number of workers to run
	// Each worker will run inside its own goroutine
	// and execute jobs asynchronously. Default is
	// runtime.GOMAXPROCS.
	NumWorkers int
	// BatchSize is the number of jobs to send through
	// the jobs channel at once. Increasing BatchSize means
	// the worker pool will query the database less frequently,
	// so you would get higher performance. However this comes
	// at the cost that jobs with lower priority may sometimes be
	// executed before jobs with higher priority, because the jobs
	// with higher priority were not ready yet the last time the pool
	// queried the database. Decreasing BatchSize means more
	// frequent queries to the database and lower performance, but
	// greater likelihood of executing jobs in perfect order with regards
	// to priority. Setting BatchSize to 1 gaurantees that higher priority
	// jobs are always executed first as soon as they are ready. Default is
	// runtime.GOMAXPROCS.
	BatchSize int
	// MinWait is the minimum amount of time the pool will wait before checking
	// the database for queued jobs. The pool may take longer to query the database
	// if the jobs channel is blocking (i.e. if no workers are ready to execute new
	// jobs). Default is 200ms.
	MinWait time.Duration
	// StaleTimeout is the amount of time to wait for a pool to reply to a ping request
	// before considering it stale. Stale pools will be purged and if they have any
	// corresponding jobs in the executing set, those jobs will be requeued. Default
	// is 30 seconds.
	StaleTimeout time.Duration
}

PoolConfig is a set of configuration options for pools. Setting any value to the zero value will be interpretted as the default.

type Status added in v0.1.0

type Status string

Status represents the different statuses a job can have.

const (
	// StatusSaved is the status of any job that has been saved into the database but not yet queued
	StatusSaved Status = "saved"
	// StatusQueued is the status of any job that has been queued for execution but not yet selected
	StatusQueued Status = "queued"
	// StatusExecuting is the status of any job that has been selected for execution and is being delegated
	// to some worker and any job that is currently being executed by some worker.
	StatusExecuting Status = "executing"
	// StatusFinished is the status of any job that has been successfully executed.
	StatusFinished Status = "finished"
	// StatusFailed is the status of any job that failed to execute and for which there are no remaining retries.
	StatusFailed Status = "failed"
	// StatusCancelled is the status of any job that was manually cancelled.
	StatusCancelled Status = "cancelled"
	// StatusDestroyed is the status of any job that has been destroyed, i.e. completely removed
	// from the database.
	StatusDestroyed Status = "destroyed"
)

func (Status) Count added in v0.1.0

func (status Status) Count() (int, error)

Count returns the number of jobs that currently have the given status or an error if there was a problem connecting to the database.

func (Status) JobIds added in v0.1.0

func (status Status) JobIds() ([]string, error)

JobIds returns the ids of all jobs that have the given status, ordered by priority or an error if there was a problem connecting to the database.

func (Status) Jobs added in v0.1.0

func (status Status) Jobs() ([]*Job, error)

Jobs returns all jobs that have the given status, ordered by priority or an error if there was a problem connecting to the database.

func (Status) Key added in v0.3.0

func (status Status) Key() string

key returns the key used for the sorted set in redis which will hold all jobs with this status.

type Type added in v0.1.0

type Type struct {
	// contains filtered or unexported fields
}

Type represents a type of job that can be executed by workers

func RegisterType added in v0.1.0

func RegisterType(name string, retries uint, handler HandlerFunc) (*Type, error)

RegisterType registers a new type of job that can be executed by workers. name should be a unique string identifier for the job. retries is the number of times this type of job should be retried if it fails. handler is a function that a worker will call in order to execute the job. handler should be a function which accepts either 0 or 1 arguments of any type, corresponding to the data for a job of this type. All jobs of this type must have data with the same type as the first argument to handler, or nil if the handler accepts no arguments.

func (*Type) Schedule added in v0.1.0

func (jt *Type) Schedule(priority int, time time.Time, data interface{}) (*Job, error)

Schedule schedules a on-off job of the given type with the given parameters. Jobs with a higher priority will be executed first. The job will not be executed until after time. data is the data associated with this particular job and should have the same type as the first argument to the handler for this Type.

func (*Type) ScheduleRecurring added in v0.1.0

func (jt *Type) ScheduleRecurring(priority int, time time.Time, freq time.Duration, data interface{}) (*Job, error)

ScheduleRecurring schedules a recurring job of the given type with the given parameters. Jobs with a higher priority will be executed first. The job will not be executed until after time. After time, the job will be executed with a frequency specified by freq. data is the data associated with this particular job and should have the same type as the first argument to the handler for this Type. Every recurring execution of the job will use the same data.

func (*Type) String added in v0.1.0

func (jt *Type) String() string

String satisfies the Stringer interface and returns the name of the Type.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL