goworker

v0.0.0-...-3fc4c60
Published: May 30, 2023 | License: MIT | Imports: 21 | Imported by: 0

README

goworker

goworker is a Resque-compatible, Go-based background worker. It allows you to push jobs into a queue using an expressive language like Ruby while harnessing the efficiency and concurrency of Go to minimize job latency and cost.

goworker workers can run alongside Ruby Resque clients so that you can keep all but your most resource-intensive jobs in Ruby.

Installation

To install goworker, use

go get github.com/benmanns/goworker

and then import it in your worker code:

import "github.com/benmanns/goworker"

Getting Started

To create a worker, write a function matching the signature

func(string, ...interface{}) error

and register it using

goworker.Register("MyClass", myFunc)

Here is a simple worker that prints its arguments:

package main

import (
	"fmt"
	"github.com/benmanns/goworker"
)

func myFunc(queue string, args ...interface{}) error {
	fmt.Printf("From %s, %v\n", queue, args)
	return nil
}

func init() {
	goworker.Register("MyClass", myFunc)
}

func main() {
	if err := goworker.Work(); err != nil {
		fmt.Println("Error:", err)
	}
}

To create workers that share a database pool or other resources, use a closure to share variables.

package main

import (
	"fmt"
	"github.com/benmanns/goworker"
)

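// newMyFunc returns a worker that closes over foo, so every job shares it.
// NewFoo and Bar stand in for your own resource type (for example, a database pool).
func newMyFunc(uri string) func(queue string, args ...interface{}) error {
	foo := NewFoo(uri)
	return func(queue string, args ...interface{}) error {
		foo.Bar(args)
		return nil
	}
}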

func init() {
	goworker.Register("MyClass", newMyFunc("http://www.example.com/"))
}

func main() {
	if err := goworker.Work(); err != nil {
		fmt.Println("Error:", err)
	}
}

Here is a simple worker with settings:

package main

import (
	"fmt"
	"github.com/benmanns/goworker"
)

func myFunc(queue string, args ...interface{}) error {
	fmt.Printf("From %s, %v\n", queue, args)
	return nil
}

func init() {
	settings := goworker.WorkerSettings{
		URI:            "redis://localhost:6379/",
		Connections:    100,
		Queues:         []string{"myqueue", "delimited", "queues"},
		UseNumber:      true,
		ExitOnComplete: false,
		Concurrency:    2,
		Namespace:      "resque:",
		Interval:       5.0,
	}
	goworker.SetSettings(settings)
	goworker.Register("MyClass", myFunc)
}

func main() {
	if err := goworker.Work(); err != nil {
		fmt.Println("Error:", err)
	}
}

goworker worker functions receive the queue they are serving and a slice of interfaces. To use them as parameters to other functions, use Go type assertions to convert them into usable types.

// Expecting (int, string, float64).
// Numbers arrive as json.Number only when the use-number setting is enabled.
func myFunc(queue string, args ...interface{}) error {
	// Guard against short payloads before indexing into args.
	if len(args) < 3 {
		return errorInvalidParam
	}
	idNum, ok := args[0].(json.Number)
	if !ok {
		return errorInvalidParam
	}
	id, err := idNum.Int64()
	if err != nil {
		return errorInvalidParam
	}
	name, ok := args[1].(string)
	if !ok {
		return errorInvalidParam
	}
	weightNum, ok := args[2].(json.Number)
	if !ok {
		return errorInvalidParam
	}
	weight, err := weightNum.Float64()
	if err != nil {
		return errorInvalidParam
	}
	doSomething(id, name, weight)
	return nil
}

For testing, it is helpful to use the redis-cli program to insert jobs onto the Redis queue:

redis-cli -r 100 RPUSH resque:queue:myqueue '{"class":"MyClass","args":["hi","there"]}'

will insert 100 jobs for the MyClass worker onto the myqueue queue. It is equivalent to:

class MyClass
  @queue = :myqueue
end

100.times do
  Resque.enqueue MyClass, 'hi', 'there'
end

or

goworker.Enqueue(&goworker.Job{
    Queue: "myqueue",
    Payload: goworker.Payload{
        Class: "MyClass",
        Args: []interface{}{"hi", "there"},
    },
})
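To make the equivalence concrete, here is a minimal sketch, using only the standard library, of the Redis key and JSON value that such a job corresponds to. The payload type is a local copy of the Payload struct from the package documentation; queueKey and encodeJob are hypothetical helpers written for illustration and are not part of goworker.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// payload mirrors the documented Payload struct (local copy for this sketch).
type payload struct {
	Class string        `json:"class"`
	Args  []interface{} `json:"args"`
}

// queueKey builds the Redis list key for a queue (hypothetical helper).
// The namespace is expected to carry its trailing colon, e.g. "resque:".
func queueKey(namespace, queue string) string {
	return namespace + "queue:" + queue
}

// encodeJob renders the JSON value pushed onto the list (hypothetical helper).
func encodeJob(class string, args ...interface{}) (string, error) {
	b, err := json.Marshal(payload{Class: class, Args: args})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	value, err := encodeJob("MyClass", "hi", "there")
	if err != nil {
		fmt.Println("Error:", err)
		return
	}
	// Prints the redis-cli style command for one job.
	fmt.Println("RPUSH", queueKey("resque:", "myqueue"), value)
}
```

The printed key and value match the arguments of the redis-cli command shown earlier, which is why all three forms enqueue the same job.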

Flags

There are several flags which control the operation of the goworker client.

  • -queues="comma,delimited,queues" — This is the only required flag. The recommended practice is to separate your Resque workers from your goworkers with different queues. Otherwise, Resque worker classes that have no goworker analog will cause the goworker process to fail the jobs. Because of this, there is no default queue, nor is there a way to select all queues (à la Resque's * queue). If you have multiple queues you can assign them weights. A queue with a weight of 2 will be checked twice as often as a queue with a weight of 1: -queues='high=2,low=1'.
  • -interval=5.0 — Specifies the wait period between polling if no job was in the queue the last time one was requested.
  • -concurrency=25 — Specifies the number of concurrently executing workers. This number can be as low as 1 or rather comfortably as high as 100,000, and should be tuned to your workflow and the availability of outside resources.
  • -connections=2 — Specifies the maximum number of Redis connections that goworker will consume between the poller and all workers. There is not much performance gain over two and a slight penalty when using only one. This is configurable in case you need to keep connection counts low for cloud Redis providers who limit plans on maxclients.
  • -uri=redis://localhost:6379/ — Specifies the URI of the Redis database from which goworker polls for jobs. Accepts URIs of the format redis://user:pass@host:port/db or unix:///path/to/redis.sock. The flag may also be set by the environment variable $($REDIS_PROVIDER) or $REDIS_URL. E.g. set $REDIS_PROVIDER to REDISTOGO_URL on Heroku to let the Redis To Go add-on configure the Redis database.
  • -namespace=resque: — Specifies the namespace from which goworker retrieves jobs and stores stats on workers.
  • -exit-on-complete=false — Exits goworker when there are no jobs left in the queue. This is helpful in conjunction with the time command to benchmark different configurations.

You can also configure your own flags for use within your workers. Be sure to define them before calling goworker.Work(). It is okay to call flag.Parse() before calling goworker.Work() if you need to do additional processing on your flags.

Signal Handling in goworker

To stop goworker, send a QUIT, TERM, or INT signal to the process. This will immediately stop job polling. There can be up to $CONCURRENCY jobs currently running, which will continue to run until they are finished.

Failure Modes

Like Resque, goworker makes no guarantees about the safety of jobs in the event of process shutdown. Workers must be both idempotent and tolerant to loss of the job in the event of failure.

If the process is killed with a KILL or by a system failure, there may be one job that is currently in the poller's buffer that will be lost without any representation in either the queue or the worker variable.

If you are running goworker on a system like Heroku, which sends a TERM to signal a process that it needs to stop and then sends a KILL ten seconds later to force it to stop, your jobs must finish within 10 seconds or they may be lost. Jobs will be recoverable from the Redis database under

resque:worker:<hostname>:<process-id>-<worker-id>:<queues>

as a JSON object with keys queue, run_at, and payload, but the process is manual. Additionally, there is no guarantee that the job in Redis under the worker key has not finished, if the process is killed before goworker can flush the update to Redis.

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request

Documentation

Overview

Package goworker is a Resque-compatible, Go-based background worker. It allows you to push jobs into a queue using an expressive language like Ruby while harnessing the efficiency and concurrency of Go to minimize job latency and cost.

goworker workers can run alongside Ruby Resque clients so that you can keep all but your most resource-intensive jobs in Ruby.

Running goworker

After building your workers, you will have an executable that you can run which will automatically poll a Redis server and call your workers as jobs arrive.

Flags

There are several flags which control the operation of the goworker client.

-queues="comma,delimited,queues" — This is the only required flag. The recommended practice is to separate your Resque workers from your goworkers with different queues. Otherwise, Resque worker classes that have no goworker analog will cause the goworker process to fail the jobs. Because of this, there is no default queue, nor is there a way to select all queues (à la Resque's * queue). Queues are processed in the order they are specified. If you have multiple queues you can assign them weights. A queue with a weight of 2 will be checked twice as often as a queue with a weight of 1: -queues='high=2,low=1'.

-interval=5.0 — Specifies the wait period between polling if no job was in the queue the last time one was requested.

-concurrency=25 — Specifies the number of concurrently executing workers. This number can be as low as 1 or rather comfortably as high as 100,000, and should be tuned to your workflow and the availability of outside resources.

-connections=2 — Specifies the maximum number of Redis connections that goworker will consume between the poller and all workers. There is not much performance gain over two and a slight penalty when using only one. This is configurable in case you need to keep connection counts low for cloud Redis providers who limit plans on maxclients.

-uri=redis://localhost:6379/ — Specifies the URI of the Redis database from which goworker polls for jobs. Accepts URIs of the format redis://user:pass@host:port/db or unix:///path/to/redis.sock. The flag may also be set by the environment variable $($REDIS_PROVIDER) or $REDIS_URL. E.g. set $REDIS_PROVIDER to REDISTOGO_URL on Heroku to let the Redis To Go add-on configure the Redis database.

-namespace=resque: — Specifies the namespace from which goworker retrieves jobs and stores stats on workers.

-exit-on-complete=false — Exits goworker when there are no jobs left in the queue. This is helpful in conjunction with the time command to benchmark different configurations.

-use-number=false — Uses json.Number when decoding numbers in the job payloads. This avoids issues that occur when goworker and the json package decode large numbers as floats, which then get encoded in scientific notation, losing precision. This will default to true soon.

You can also configure your own flags for use within your workers. Be sure to define them before calling goworker.Work(). It is okay to call flag.Parse() before calling goworker.Work() if you need to do additional processing on your flags.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Close

func Close()

Close cleans up resources initialized by goworker. This will be called by Work when cleaning up. However, if you are using the Init function to access goworker functions and configuration without processing jobs by calling Work, you should run this function when cleaning up. For example,

if err := goworker.Init(); err != nil {
	fmt.Println("Error:", err)
}
defer goworker.Close()

func Enqueue

func Enqueue(job *Job) error

func Init

func Init() error

Init initializes the goworker process. This will be called by the Work function, but may be used by programs that wish to access goworker functions and configuration without actually processing jobs.

func Namespace

func Namespace() string

Namespace returns the namespace flag for goworker. You can use this with the GetConn and PutConn functions to operate on the same namespace that goworker uses.

func PutConn

func PutConn(conn *RedisConn)

PutConn puts a connection back into the connection pool. Run this as soon as you finish using a connection that you got from GetConn. Expect this API to change drastically.

func Register

func Register(class string, worker workerFunc)

Register registers a goworker worker function. Class refers to the Ruby name of the class which enqueues the job. Worker is a function which accepts a queue and an arbitrary array of interfaces as arguments.

func SetSettings

func SetSettings(settings WorkerSettings)

func Work

func Work() error

Work starts the goworker process. Check for errors in the return value. Work will take over the Go executable and will run until a QUIT, INT, or TERM signal is received, or until the queues are empty if the -exit-on-complete flag is set.

Types

type Job

type Job struct {
	Queue   string
	Payload Payload
}

type Payload

type Payload struct {
	Class string        `json:"class"`
	Args  []interface{} `json:"args"`
}

type RedisConn

type RedisConn struct {
	redis.Conn
}

func GetConn

func GetConn() (*RedisConn, error)

GetConn returns a connection from the goworker Redis connection pool. When using the pool, check in connections as quickly as possible, because holding a connection will cause concurrent worker functions to lock while they wait for an available connection. Expect this API to change drastically.

func (*RedisConn) Close

func (r *RedisConn) Close()

type WorkerSettings

type WorkerSettings struct {
	QueuesString   string
	Queues         queuesFlag
	IntervalFloat  float64
	Interval       intervalFlag
	Concurrency    int
	Connections    int
	URI            string
	Namespace      string
	ExitOnComplete bool
	IsStrict       bool
	UseNumber      bool
	SkipTLSVerify  bool
	TLSCertPath    string
}
