beanstalkworker

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 1, 2024 License: GPL-2.0 Imports: 10 Imported by: 0

README

beanstalkworker

A helper library for creating beanstalkd consumer processes.

Usage

go get -u github.com/infinitytracking/beanstalkworker

Docs/Examples

Please see Go Docs for usage and examples:

https://godoc.org/github.com/infinitytracking/beanstalkworker

Aims

  • To provide a generic way for consuming beanstalkd jobs without all of the boilerplate code
  • To provide an easy way to spin up concurrent worker goroutines
  • To use Go's interfaces to make unit testing your workers easy

Details

The library is broken down into the following components:

  • JobManager interface - represents a way to handle a job's lifecycle.
  • RawJob - an implementation of JobManager for managing a raw job's lifecycle.
  • Worker - an implementation of a beanstalkd client process that consumes raw jobs from one or more tubes. It will automatically reconnect to the beanstalkd server if it loses the connection.

See also

This library is a wrapper around the low-level Beanstalkd client written in Go:

https://github.com/beanstalkd/go-beanstalk

This client talks to the Beanstalkd queue server:

https://beanstalkd.github.io/

Documentation

Overview

Example (Worker)
package main

import "github.com/infinitytracking/beanstalkworker"
import "context"
import "os"
import "os/signal"
import "syscall"
import "log"
import "fmt"
import "time"

func main() {
	// A cancellable context is the single shutdown switch for the Worker.
	ctx, cancel := context.WithCancel(context.Background())

	// Cancel that context from a background goroutine when an OS signal arrives.
	go signalHandler(cancel)

	// Describe where the beanstalkd server lives; nothing is dialled yet.
	worker := beanstalkworker.NewWorker("127.0.0.1:11300")

	// Optional: route log output through the custom logger defined below.
	worker.SetLogger(&MyLogger{})

	// Consume jobs with 2 concurrent Worker threads.
	worker.SetNumWorkers(2)

	// Delete jobs whose payload cannot be unmarshalled. Burying or releasing
	// (the default behaviour) are the alternatives.
	worker.SetUnmarshalErrorAction(beanstalkworker.ActionDeleteJob)

	// A value shared by every job, for example a database connection.
	shared := "some common value"

	// Callback for the "job1" tube. A brand new handler is built per job so
	// that no state leaks from one job into the next.
	onJob1 := func(mgr beanstalkworker.JobManager, data Job1Data) {
		h := &Job1Handler{
			JobManager: mgr,    // Embed the JobManager into the handler.
			commonVar:  shared, // Hand the shared value to the handler.
		}
		h.Run(data)
	}

	// Add one or more subscriptions to specific tubes with a handler function.
	worker.Subscribe("job1", onJob1)

	// Blocks until the context is cancelled; reconnecting to the beanstalkd
	// server is handled automatically while it runs.
	worker.Run(ctx)
}

// signalHandler waits for SIGHUP, SIGTERM or SIGINT and invokes cancel every
// time one of them is delivered. It never returns, so it is meant to be run
// in its own goroutine for the lifetime of the program.
func signalHandler(cancel context.CancelFunc) {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)

	// The channel is never closed, so this loop runs until the process exits.
	for range signals {
		log.Print("Got signal, cancelling context")
		cancel()
	}
}

//Custom Logging Example

// MyLogger is a stateless example logger that forwards every message to the
// standard log package, tagging it with the name of the level used.
type MyLogger struct{}

// Info writes the operands, formatted as fmt.Sprint would, behind a
// "MyInfo: " tag.
func (l *MyLogger) Info(v ...interface{}) {
	log.Print("MyInfo: " + fmt.Sprint(v...))
}

// Infof writes a message built from format and v behind a "MyInfof: " tag.
func (l *MyLogger) Infof(format string, v ...interface{}) {
	log.Printf("MyInfof: "+format, v...)
}

// Error writes the operands, formatted as fmt.Sprint would, behind a
// "MyError: " tag.
func (l *MyLogger) Error(v ...interface{}) {
	log.Print("MyError: " + fmt.Sprint(v...))
}

// Errorf writes a message built from format and v behind a "MyErrorf: " tag.
func (l *MyLogger) Errorf(format string, v ...interface{}) {
	log.Printf("MyErrorf: "+format, v...)
}

//Job Handler

// Job1Handler contains the business logic to handle the Job1 type jobs.
// The example builds a new Job1Handler for every job received, so no state
// carries over from one job to the next.
type Job1Handler struct {
	// Embedded so the job lifecycle and logging methods can be called
	// directly on the handler.
	beanstalkworker.JobManager
	// commonVar holds the value shared by all jobs; main supplies it.
	commonVar string
}

// Job1Data is a struct that represents the Job1 data that arrives from the queue.
// The struct tags map each field to its JSON key; main deletes jobs that fail
// to unmarshal (see its SetUnmarshalErrorAction call).
type Job1Data struct {
	SomeField      string `json:"someField"`
	SomeOtherField int    `json:"someOtherField"`
}

// LogError shadows the LogError method promoted from the embedded
// beanstalkworker.JobManager. It flattens its operands into one string and
// forwards that to the embedded implementation behind a "Job1 error: "
// prefix, so every error logged by this handler carries that context.
func (handler *Job1Handler) LogError(a ...interface{}) {
	msg := fmt.Sprint(a...)
	handler.JobManager.LogError("Job1 error: ", msg)
}

// Run is executed by the beanstalk Worker when a Job1 type job is received.
//
// It logs the job's metadata and the beanstalkd server hostname, sleeps to
// simulate work, and then walks through the possible outcomes of a job:
//   - no timeouts so far: return without releasing or deleting the job, so
//     that it times out;
//   - exactly 2 reserves: release with the current delay and priority;
//   - 3 or more releases: delete the job;
//   - otherwise: release with a custom return delay and priority.
//
// If the server stats cannot be fetched, the error is logged and the job is
// released.
func (handler *Job1Handler) Run(jobData Job1Data) {
	handler.LogInfo("Starting job with commonVar value: ", handler.commonVar)
	handler.LogInfo("Job Data received: ", jobData)
	handler.LogInfo("Job Priority: ", handler.GetPriority())
	handler.LogInfo("Job Releases: ", handler.GetReleases())
	handler.LogInfo("Job Reserves: ", handler.GetReserves())
	handler.LogInfo("Job Age: ", handler.GetAge())
	handler.LogInfo("Job Delay: ", handler.GetDelay())
	handler.LogInfo("Job Timeouts: ", handler.GetTimeouts())
	handler.LogInfo("Job Tube: ", handler.GetTube())

	// Retrieve the hostname reported by the beanstalkd server this job came from.
	conn := handler.GetConn()
	stats, err := conn.Stats()
	if err != nil {
		// Say why the job is being handed back rather than dropping the error.
		handler.LogError("Could not fetch server stats, releasing job: ", err)
		handler.Release()
		return
	}
	handler.LogInfo("Hostname: ", stats["hostname"])

	//Simulate job processing time
	time.Sleep(2 * time.Second)

	// First pass: leave the job reserved so that it times out.
	if handler.GetTimeouts() == 0 {
		handler.LogInfo("Simulating a timeout by not releasing/deleting job")
		return
	}

	if handler.GetReserves() == 2 {
		handler.LogInfo("Release without setting custom delay or priority")
		handler.Release()
		return
	}

	handler.SetReturnDelay(5 * time.Second) //Optional return delay (defaults to current delay)
	handler.SetReturnPriority(5)            //Optional return priority (defaults to current priority)

	// Give up on a job that keeps coming back.
	if handler.GetReleases() >= 3 {
		handler.Delete()
		handler.LogError("Deleting job as too many releases")
		return
	}

	handler.LogInfo("Releasing job to be retried...")
	handler.Release() //Pretend job process failed and needs retrying
}
Output:

Index

Examples

Constants

View Source
// Values accepted by SetUnmarshalErrorAction, selecting what happens to a job
// whose payload fails to unmarshal.
const (
	// ActionDeleteJob deletes the job.
	ActionDeleteJob  = "delete"
	// ActionBuryJob buries the job.
	ActionBuryJob    = "bury"
	// ActionReleaseJob releases the job; the worker example describes this
	// as the default behaviour.
	ActionReleaseJob = "release"
)

Actions the user can choose in case of an unmarshal error.

Variables

This section is empty.

Functions

This section is empty.

Types

type CustomLogger

// CustomLogger is the set of methods a logger must provide to be accepted by
// SetLogger and NewEmptyJob.
type CustomLogger interface {
	// Info logs an info message built from its operands.
	Info(v ...interface{})
	// Infof logs an info message built from a format string and arguments.
	Infof(format string, args ...interface{})
	// Error logs an error message built from its operands.
	Error(v ...interface{})
	// Errorf logs an error message built from a format string and arguments.
	Errorf(format string, args ...interface{})
}

CustomLogger provides support for the creation of custom logging.

type Handler

// Handler is the type of the callback passed to Subscribe. It is an empty
// interface, so the compiler accepts any value; the worker example passes a
// func(JobManager, <job data struct>).
// NOTE(review): the accepted callback shapes are not visible here — confirm
// against the Subscribe implementation.
type Handler interface{}

Handler provides an interface type for callback functions.

type JobManager

// JobManager represents a way to handle a job's lifecycle. The per-method
// notes below follow the documentation of the RawJob implementation.
type JobManager interface {
	// Delete deletes the job from the queue.
	Delete()
	// Touch touches the job.
	Touch()
	// Release releases the job back to the queue.
	Release()
	// LogError logs an error message regarding the job.
	LogError(a ...interface{})
	// LogInfo logs an info message regarding the job.
	LogInfo(a ...interface{})
	// GetAge returns the age of the job from the job stats.
	GetAge() time.Duration
	// GetPriority returns the priority of the job.
	GetPriority() uint32
	// GetReleases returns how many times the job has been released.
	GetReleases() uint32
	// GetReserves returns how many times the job has been reserved.
	GetReserves() uint32
	// GetTimeouts returns how many times the job has timed out.
	GetTimeouts() uint32
	// GetDelay returns the delay of the job from the job stats.
	GetDelay() time.Duration
	// GetTube returns the name of the tube the job came from.
	GetTube() string
	// GetConn returns the beanstalk connection used to receive the job.
	GetConn() *beanstalk.Conn
	// SetReturnPriority sets the priority to use if the job is released or buried.
	SetReturnPriority(prio uint32)
	// SetReturnDelay sets the delay to use if the job is released back to the queue.
	SetReturnDelay(delay time.Duration)
}

JobManager interface represents a way to handle a job's lifecycle.

type Logger

// Logger bundles the four logging functions as plain function fields, so each
// one can be pointed at a different implementation. NewDefaultLogger returns a
// Logger initialised to use the global log package.
type Logger struct {
	// Info logs an info message built from its operands.
	Info   func(v ...interface{})
	// Infof logs an info message built from a format string and arguments.
	Infof  func(format string, v ...interface{})
	// Error logs an error message built from its operands.
	Error  func(v ...interface{})
	// Errorf logs an error message built from a format string and arguments.
	Errorf func(format string, v ...interface{})
}

Logger provides support for standard logging.

func NewDefaultLogger

func NewDefaultLogger() *Logger

NewDefaultLogger creates a new Logger initialised to use the global log package.

type MockJob

type MockJob struct {
	// contains filtered or unexported fields
}

func NewMockJob

func NewMockJob(body string) *MockJob

func NewWillDeleteMockJob

func NewWillDeleteMockJob(body string) *MockJob

func NewWillReleaseMockJob

func NewWillReleaseMockJob(body string) *MockJob

func NewWillTouchMockJob

func NewWillTouchMockJob(body string) *MockJob

func (*MockJob) Delete

func (job *MockJob) Delete()

func (*MockJob) GetAge

func (job *MockJob) GetAge() time.Duration

func (*MockJob) GetConn

func (job *MockJob) GetConn() *beanstalk.Conn

func (*MockJob) GetDelay

func (job *MockJob) GetDelay() time.Duration

func (*MockJob) GetPriority

func (job *MockJob) GetPriority() uint32

func (*MockJob) GetReleases

func (job *MockJob) GetReleases() uint32

func (*MockJob) GetReserves

func (job *MockJob) GetReserves() uint32

func (*MockJob) GetTimeouts

func (job *MockJob) GetTimeouts() uint32

func (*MockJob) GetTube

func (job *MockJob) GetTube() string

func (*MockJob) LogError

func (job *MockJob) LogError(a ...interface{})

func (*MockJob) LogInfo

func (job *MockJob) LogInfo(a ...interface{})

func (*MockJob) Release

func (job *MockJob) Release()

func (*MockJob) SetReturnDelay

func (job *MockJob) SetReturnDelay(delay time.Duration)

func (*MockJob) SetReturnPriority

func (job *MockJob) SetReturnPriority(prio uint32)

func (*MockJob) Touch

func (job *MockJob) Touch()

type MockLogger

type MockLogger struct {
}

MockLogger is a custom logger; it implements Info(), Infof(), Error() and Errorf() in order to satisfy CustomLogger.

func (*MockLogger) Error

func (l *MockLogger) Error(v ...interface{})

func (*MockLogger) Errorf

func (l *MockLogger) Errorf(format string, v ...interface{})

func (*MockLogger) Info

func (l *MockLogger) Info(v ...interface{})

func (*MockLogger) Infof

func (l *MockLogger) Infof(format string, v ...interface{})

type MockWorker

type MockWorker struct {
	// contains filtered or unexported fields
}

func (*MockWorker) Run

func (w *MockWorker) Run(ctx context.Context)

func (*MockWorker) SetLogger

func (w *MockWorker) SetLogger(cl CustomLogger)

func (*MockWorker) SetNumWorkers

func (w *MockWorker) SetNumWorkers(numWorkers int)

func (*MockWorker) SetUnmarshalErrorAction

func (w *MockWorker) SetUnmarshalErrorAction(action string)

func (*MockWorker) Subscribe

func (w *MockWorker) Subscribe(tube string, cb Handler)

type RawJob

type RawJob struct {
	// contains filtered or unexported fields
}

RawJob represents the raw job data that is returned by beanstalkd.

func NewEmptyJob

func NewEmptyJob(cl CustomLogger) *RawJob

NewEmptyJob initialises a new empty RawJob with a custom logger. Useful for testing methods that log messages on the job.

func (*RawJob) Bury

func (job *RawJob) Bury()

Bury function buries the job from the queue.

func (*RawJob) Delete

func (job *RawJob) Delete()

Delete function deletes the job from the queue.

func (*RawJob) GetAge

func (job *RawJob) GetAge() time.Duration

GetAge gets the age of the job from the job stats.

func (*RawJob) GetConn

func (job *RawJob) GetConn() *beanstalk.Conn

GetConn returns the beanstalk connection used to receive the job.

func (*RawJob) GetDelay

func (job *RawJob) GetDelay() time.Duration

GetDelay gets the delay of the job from the job stats.

func (*RawJob) GetPriority

func (job *RawJob) GetPriority() uint32

GetPriority gets the priority of the job.

func (*RawJob) GetReleases

func (job *RawJob) GetReleases() uint32

GetReleases gets the number of times the job has been released.

func (*RawJob) GetReserves

func (job *RawJob) GetReserves() uint32

GetReserves gets the count of reserves of the job.

func (*RawJob) GetTimeouts

func (job *RawJob) GetTimeouts() uint32

GetTimeouts gets the count of timeouts of the job.

func (*RawJob) GetTube

func (job *RawJob) GetTube() string

GetTube returns the tube name we got this job from.

func (*RawJob) LogError

func (job *RawJob) LogError(a ...interface{})

LogError function logs an error message regarding the job.

func (*RawJob) LogInfo

func (job *RawJob) LogInfo(a ...interface{})

LogInfo function logs an info message regarding the job.

func (*RawJob) Release

func (job *RawJob) Release()

Release function releases the job back to the queue.

func (*RawJob) SetReturnDelay

func (job *RawJob) SetReturnDelay(delay time.Duration)

SetReturnDelay sets the return delay to use if a job is released back to queue.

func (*RawJob) SetReturnPriority

func (job *RawJob) SetReturnPriority(prio uint32)

SetReturnPriority sets the return priority to use if a job is released or buried.

func (*RawJob) Touch

func (job *RawJob) Touch()

Touch function touches the job from the queue.

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker represents a single process that is connecting to beanstalkd and is consuming jobs from one or more tubes.

func NewWorker

func NewWorker(addr string) *Worker

NewWorker creates a new Worker process, but does not actually connect to beanstalkd server yet.

func (*Worker) Run

func (w *Worker) Run(ctx context.Context)

Run starts one or more Worker threads based on the numWorkers value. If numWorkers is set to zero or less, then 1 Worker is started.

func (*Worker) SetLogger

func (w *Worker) SetLogger(cl CustomLogger)

SetLogger switches logging to use a custom Logger.

func (*Worker) SetNumWorkers

func (w *Worker) SetNumWorkers(numWorkers int)

SetNumWorkers sets the number of concurrent worker threads that should be started. Each thread establishes a separate connection to the beanstalkd server.

func (*Worker) SetUnmarshalErrorAction

func (w *Worker) SetUnmarshalErrorAction(action string)

SetUnmarshalErrorAction defines what to do if there is an unmarshal error.

func (*Worker) Subscribe

func (w *Worker) Subscribe(tube string, cb Handler)

Subscribe adds a handler function to be run for jobs coming from a particular tube.

type WorkerClient

// WorkerClient is the method set shared by Worker and MockWorker. The
// NewMock*Worker constructors return this interface. The per-method notes
// below follow the documentation of Worker.
type WorkerClient interface {
	// Subscribe adds a handler function to be run for jobs coming from a
	// particular tube.
	Subscribe(tube string, cb Handler)
	// SetNumWorkers sets the number of concurrent worker threads to start.
	SetNumWorkers(numWorkers int)
	// SetLogger switches logging to use a custom logger.
	SetLogger(cl CustomLogger)
	// SetUnmarshalErrorAction defines what to do if there is an unmarshal error.
	SetUnmarshalErrorAction(action string)
	// Run starts the worker threads; the worker example describes it as
	// blocking until ctx is cancelled.
	Run(ctx context.Context)
}

func NewMockWillDeleteWorker

func NewMockWillDeleteWorker(tube, jobStr string) WorkerClient

func NewMockWillReleaseWorker

func NewMockWillReleaseWorker(tube, jobStr string) WorkerClient

func NewMockWillTouchWorker

func NewMockWillTouchWorker(tube, jobStr string) WorkerClient

func NewMockWorker

func NewMockWorker() WorkerClient

Directories

Path Synopsis
examples

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL