workers

package module
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 14, 2022 License: MIT Imports: 22 Imported by: 0

README

Build Status GoDoc

Sidekiq compatible background workers in golang.

  • reliable queueing for all queues using brpoplpush
  • handles retries
  • support custom middleware
  • customize concurrency per queue
  • responds to Unix signals to safely wait for jobs to finish before exiting.
  • provides stats on what jobs are currently running
  • well tested

Example usage:

package main

import (
	"github.com/jrallison/go-workers"
)

func myJob(message *workers.Msg) error {
  // do something with your message
  // message.Jid()
  // message.Args() is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson)
  return nil
}

type myMiddleware struct{}

func (r *myMiddleware) Call(queue string, message *workers.Msg, next func() bool) (acknowledge bool) {
  // do something before each message is processed
  acknowledge = next()
  // do something after each message is processed
  return
} 

func main() {
  workers.Configure(map[string]interface{}{
    // location of redis instance
    "server":  "localhost:6379",
    // instance of the database
    "database":  0,
    // number of connections to keep open with redis
    "pool":    30,
    // unique process id for this instance of workers (for proper recovery of inprogress jobs on crash)
    "process": "1",
  })

  workers.Middleware.Append(&myMiddleware{})

  // pull messages from "myqueue" with concurrency of 10
  workers.Process("myqueue", myJob, 10)

  // pull messages from "myqueue2" with concurrency of 20
  workers.Process("myqueue2", myJob, 20)

  // Add a job to a queue
  workers.Enqueue("myqueue3", "Add", []int{1, 2})

  // Add a job to a queue with retry
  workers.Enqueue("myqueue3", "Add", []int{1, 2}, workers.WithMaxRetries(25))

  // stats will be available at http://localhost:8080/stats
  go workers.StatsServer(8080)

  // Blocks until process is told to exit via unix signal
  workers.Run()
}

Initial development sponsored by Customer.io

Documentation

Index

Constants

View Source
const (
	DEFAULT_MAX_RETRIES = 25
)
View Source
const (
	LAYOUT = "2006-01-02 15:04:05 MST"
)
View Source
const (
	NanoSecondPrecision = 1000000000.0
)

Variables

View Source
var (
	RETRY_KEY           = "goretry"
	SCHEDULED_JOBS_KEY  = "schedule"
	INPROGRESS_JOBS_KEY = "inprogress"
	ARGV_VALUE_KEY      = "argvValue"
	CANCEL_KEY          = "cancel"
)
View Source
var Config *config
View Source
var (
	ErrJidExists = fmt.Errorf("jid has already exists")
)
View Source
var Logger = logrus.WithField("from", "go-workers")

Functions

func BeforeStart

func BeforeStart(f func())

func CancelJob added in v0.1.1

func CancelJob(jid string) error

func Configure

func Configure(options map[string]interface{})

func DuringDrain

func DuringDrain(f func())

func Enqueue

func Enqueue(queue, class string, args interface{}, opts ...EnqueueOptions) (string, error)

func JobExists

func JobExists(jid string) (bool, error)

func Process

func Process(queue string, job jobFunc, concurrency int, mids ...Action)

func Quit

func Quit()

func ResetManagers

func ResetManagers() error

func Run

func Run()

func ShouldRetry

func ShouldRetry(message *Msg) bool

func Start

func Start()

func Stats

func Stats(w http.ResponseWriter, req *http.Request)

func StatsServer

func StatsServer(port int)

Types

type Acknowledge

type Acknowledge struct {
	KeepData bool
	// contains filtered or unexported fields
}

type Action

type Action interface {
	Call(queue string, message *Msg, next func() CallResult) CallResult
}

type Args

type Args struct {
	// contains filtered or unexported fields
}

func (Args) Equals

func (d Args) Equals(other interface{}) bool

func (Args) ToJson

func (d Args) ToJson() string

type CallResult

type CallResult struct {
	Acknowledge bool
	KeepValue   bool
	Err         error
}

type EnqueueData

type EnqueueData struct {
	Queue      string      `json:"queue,omitempty"`
	Class      string      `json:"class"`
	Args       interface{} `json:"args"`
	Jid        string      `json:"jid"`
	EnqueuedAt float64     `json:"enqueued_at"`
	EnqueueParam
}

type EnqueueOptFunc

type EnqueueOptFunc func(param *EnqueueParam)

func WithAt

func WithAt(at time.Time) EnqueueOptFunc

func WithIn

func WithIn(in time.Duration) EnqueueOptFunc

func WithJid added in v0.1.2

func WithJid(jid string) EnqueueOptFunc

func WithMaxRetries

func WithMaxRetries(maxRetries int) EnqueueOptFunc

func WithRetry

func WithRetry() EnqueueOptFunc

func (EnqueueOptFunc) Apply

func (e EnqueueOptFunc) Apply(param *EnqueueParam)

type EnqueueOptions

type EnqueueOptions interface {
	Apply(param *EnqueueParam)
}

type EnqueueParam

type EnqueueParam struct {
	RetryCount int     `json:"retry_count,omitempty"`
	At         float64 `json:"at,omitempty"`
	MaxRetries int     `json:"max_retries,omitempty"`
	Jid        string  `json:"jid,omitempty"`
}

type Fetcher

type Fetcher interface {
	Queue() string
	Fetch()
	Acknowledge(*Acknowledge)
	Ready() chan bool
	FinishedWork() chan bool
	Messages() chan *Msg
	Close()
	Closed() bool
	HeartbeatJob(*Msg) chan bool
	Heartbeat(*Msg)
	Continue(*Msg) bool
}

func NewFetch

func NewFetch(queue string, messages chan *Msg, ready chan bool) Fetcher

type MiddlewareLogging

type MiddlewareLogging struct{}

func (*MiddlewareLogging) Call

func (l *MiddlewareLogging) Call(queue string, message *Msg, next func() CallResult) (result CallResult)

type MiddlewareRetry

type MiddlewareRetry struct{}

func (*MiddlewareRetry) Call

func (r *MiddlewareRetry) Call(queue string, message *Msg, next func() CallResult) (result CallResult)

type MiddlewareStats

type MiddlewareStats struct{}

func (*MiddlewareStats) Call

func (l *MiddlewareStats) Call(queue string, message *Msg, next func() CallResult) (result CallResult)

type Middlewares

type Middlewares struct {
	// contains filtered or unexported fields
}

func NewMiddleware

func NewMiddleware(actions ...Action) *Middlewares

func (*Middlewares) Append

func (m *Middlewares) Append(action Action)

func (*Middlewares) Prepend

func (m *Middlewares) Prepend(action Action)

type Msg

type Msg struct {
	Logger  *logrus.Entry
	Context context.Context
	// contains filtered or unexported fields
}

func NewMsg

func NewMsg(content string) (*Msg, error)

func (*Msg) Args

func (m *Msg) Args() *Args

func (*Msg) Args2Obj

func (s *Msg) Args2Obj(obj interface{}) error

func (Msg) Equals

func (d Msg) Equals(other interface{}) bool

func (*Msg) Jid

func (m *Msg) Jid() string

func (*Msg) OriginalJson

func (m *Msg) OriginalJson() string

func (Msg) ToJson

func (d Msg) ToJson() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL