README
¶
Sidekiq-compatible background workers in Go.
- reliable queueing for all queues, using the Redis BRPOPLPUSH command
- handles retries
- supports custom middleware
- customizable concurrency per queue
- responds to Unix signals to safely wait for jobs to finish before exiting.
- provides stats on what jobs are currently running
- well tested
Example usage:
package main
import (
"github.com/jrallison/go-wrkrs"
)
func myJob(message *wrkrs.Msg) {
// do something with your message
// message.Jid()
// message.Args() is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson)
}
type myMiddleware struct{}
func (r *myMiddleware) Call(queue string, message *wrkrs.Msg, next func() bool) (acknowledge bool) {
// do something before each message is processed
acknowledge = next()
// do something after each message is processed
return
}
func main() {
wrkrs.Configure(map[string]string{
// location of redis instance
"server": "localhost:6379",
// instance of the database
"database": "0",
// number of connections to keep open with redis
"pool": "30",
// unique process id for this instance of wrkrs (for proper recovery of in-progress jobs on crash)
"process": "1",
})
wrkrs.Middleware.Append(&myMiddleware{})
// pull messages from "myqueue" with concurrency of 10
wrkrs.Process("myqueue", myJob, 10)
// pull messages from "myqueue2" with concurrency of 20
wrkrs.Process("myqueue2", myJob, 20)
// Add a job to a queue
wrkrs.Enqueue("myqueue3", "Add", []int{1, 2})
// Add a job to a queue with retry
wrkrs.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, wrkrs.EnqueueOptions{Retry: true})
// stats will be available at http://localhost:8080/stats
go wrkrs.StatsServer(8080)
// Blocks until process is told to exit via unix signal
wrkrs.Run()
}
Initial development sponsored by Customer.io
Documentation
¶
Index ¶
- Constants
- Variables
- func BeforeStart(f func())
- func Configure(options map[string]string)
- func DuringDrain(f func())
- func Enqueue(queue, class string, args interface{}) (string, error)
- func EnqueueAt(queue, class string, at time.Time, args interface{}) (string, error)
- func EnqueueIn(queue, class string, in float64, args interface{}) (string, error)
- func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error)
- func Process(queue string, job jobFunc, concurrency int, mids ...Action)
- func Quit()
- func ResetManagers() error
- func Run()
- func Start()
- func Stats(w http.ResponseWriter, req *http.Request)
- func StatsServer(port int)
- type Action
- type Args
- type EnqueueData
- type EnqueueOptions
- type Fetcher
- type MiddlewareLogging
- type MiddlewareRetry
- type MiddlewareStats
- type Middlewares
- type Msg
- type WorkersLogger
Constants ¶
View Source
const ( DEFAULT_MAX_RETRY = 25 LAYOUT = "2006-01-02 15:04:05 MST" )
View Source
const ( RETRY_KEY = "goretry" SCHEDULED_JOBS_KEY = "schedule" )
View Source
const (
NanoSecondPrecision = 1000000000.0
)
Variables ¶
View Source
var Config *config
View Source
var Middleware = NewMiddleware( &MiddlewareLogging{}, &MiddlewareRetry{}, &MiddlewareStats{}, )
Functions ¶
func BeforeStart ¶
func BeforeStart(f func())
func DuringDrain ¶
func DuringDrain(f func())
func EnqueueWithOptions ¶
func EnqueueWithOptions(queue, class string, args interface{}, opts EnqueueOptions) (string, error)
func ResetManagers ¶
func ResetManagers() error
func StatsServer ¶
func StatsServer(port int)
Types ¶
type EnqueueData ¶
type EnqueueData struct { Queue string `json:"queue,omitempty"` Class string `json:"class"` Args interface{} `json:"args"` Jid string `json:"jid"` EnqueuedAt float64 `json:"enqueued_at"` EnqueueOptions }
type EnqueueOptions ¶
type Fetcher ¶
type MiddlewareLogging ¶
type MiddlewareLogging struct{}
type MiddlewareRetry ¶
type MiddlewareRetry struct{}
type MiddlewareStats ¶
type MiddlewareStats struct{}
type Middlewares ¶
type Middlewares struct {
// contains filtered or unexported fields
}
func NewMiddleware ¶
func NewMiddleware(actions ...Action) *Middlewares
func (*Middlewares) Append ¶
func (m *Middlewares) Append(action Action)
func (*Middlewares) Prepend ¶
func (m *Middlewares) Prepend(action Action)
type WorkersLogger ¶
type WorkersLogger interface { Println(...interface{}) Printf(string, ...interface{}) }
var Logger WorkersLogger = log.New(os.Stdout, "wrkrs: ", log.Ldate|log.Lmicroseconds)
Click to show internal directories.
Click to hide internal directories.