Documentation ¶
Overview ¶
Example (Worker) ¶
package main import "github.com/infinitytracking/beanstalkworker" import "context" import "os" import "os/signal" import "syscall" import "log" import "fmt" import "time" func main() { //Setup context for cancelling beanstalk Worker. ctx, cancel := context.WithCancel(context.Background()) //Start up signal handler that will cleanly shutdown beanstalk Worker. go signalHandler(cancel) //Define a new Worker process - how to connect to the beanstalkd server. bsWorker := beanstalkworker.NewWorker("127.0.0.1:11300") //Optional custom logger - see below. bsWorker.SetLogger(&MyLogger{}) //Set concurrent Worker threads to 2. bsWorker.SetNumWorkers(2) //Job is deleted from the queue if unmarshal error appears. We can //decide to bury or release (default behaviour) it as well. bsWorker.SetUnmarshalErrorAction(beanstalkworker.ActionDeleteJob) //Define a common value (example a shared database connection) commonVar := "some common value" //Add one or more subcriptions to specific tubes with a handler function. bsWorker.Subscribe("job1", func(jobMgr beanstalkworker.JobManager, jobData Job1Data) { //Create a fresh handler struct per job (this ensures fresh state for each job). handler := &Job1Handler{ JobManager: jobMgr, //Embed the JobManager into the handler. commonVar: commonVar, //Pass the commonVar into the handler. } handler.Run(jobData) }) //Run the beanstalk Worker, this blocks until the context is cancelled. //It will also handle reconnecting to beanstalkd server automatically. bsWorker.Run(ctx) } // signalHandler catches OS signals for program to end. func signalHandler(cancel context.CancelFunc) { sigc := make(chan os.Signal, 1) signal.Notify(sigc, syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT) for { <-sigc log.Print("Got signal, cancelling context") cancel() } } //Custom Logging Example // MyLogger provides custom logging. type MyLogger struct { } // Info logs a custom info message regarding the job. func (l *MyLogger) Info(v ...interface{}) { log.Print("MyInfo: ", fmt.Sprint(v...)) } // Infof logs a custom info message regarding the job. func (l *MyLogger) Infof(format string, v ...interface{}) { format = "MyInfof: " + format log.Print(fmt.Sprintf(format, v...)) } // Error logs a custom error message regarding the job. func (l *MyLogger) Error(v ...interface{}) { log.Print("MyError: ", fmt.Sprint(v...)) } // Errorf logs a custom error message regarding the job. func (l *MyLogger) Errorf(format string, v ...interface{}) { format = "MyErrorf: " + format log.Print(fmt.Sprintf(format, v...)) } //Job Handler // Job1Handler contains the business logic to handle the Job1 type jobs. type Job1Handler struct { beanstalkworker.JobManager commonVar string } // Job1Data is a struct that represents the Job1 data that arrives from the queue. type Job1Data struct { SomeField string `json:"someField"` SomeOtherField int `json:"someOtherField"` } // LogError example of overriding a function provided in beanstalkworker.JobManager // and calling the underlying function in order to add context. func (handler *Job1Handler) LogError(a ...interface{}) { handler.JobManager.LogError("Job1 error: ", fmt.Sprint(a...)) } // Run is executed by the beanstalk Worker when a Job1 type job is received. func (handler *Job1Handler) Run(jobData Job1Data) { handler.LogInfo("Starting job with commonVar value: ", handler.commonVar) handler.LogInfo("Job Data received: ", jobData) handler.LogInfo("Job Priority: ", handler.GetPriority()) handler.LogInfo("Job Releases: ", handler.GetReleases()) handler.LogInfo("Job Reserves: ", handler.GetReserves()) handler.LogInfo("Job Age: ", handler.GetAge()) handler.LogInfo("Job Delay: ", handler.GetDelay()) handler.LogInfo("Job Timeouts: ", handler.GetTimeouts()) handler.LogInfo("Job Tube: ", handler.GetTube()) // Retrieve the server's hostname where the job is running conn := handler.GetConn() stats, err := conn.Stats() if err != nil { handler.Release() return } handler.LogInfo("Hostname: ", stats["hostname"]) //Simulate job processing time time.Sleep(2 * time.Second) if handler.GetTimeouts() == 0 { handler.LogInfo("Simulating a timeout by not releasing/deleting job") return } if handler.GetReserves() == 2 { handler.LogInfo("Release without setting custom delay or priority") handler.Release() return } handler.SetReturnDelay(5 * time.Second) //Optional return delay (defaults to current delay) handler.SetReturnPriority(5) //Optional return priority (defaults to current priority) if handler.GetReleases() >= 3 { handler.Delete() handler.LogError("Deleting job as too many releases") return } handler.LogInfo("Releasing job to be retried...") handler.Release() //Pretend job process failed and needs retrying }
Output:
Index ¶
- Constants
- type CustomLogger
- type Handler
- type JobManager
- type Logger
- type MockJob
- func (job *MockJob) Delete()
- func (job *MockJob) GetAge() time.Duration
- func (job *MockJob) GetConn() *beanstalk.Conn
- func (job *MockJob) GetDelay() time.Duration
- func (job *MockJob) GetPriority() uint32
- func (job *MockJob) GetReleases() uint32
- func (job *MockJob) GetReserves() uint32
- func (job *MockJob) GetTimeouts() uint32
- func (job *MockJob) GetTube() string
- func (job *MockJob) LogError(a ...interface{})
- func (job *MockJob) LogInfo(a ...interface{})
- func (job *MockJob) Release()
- func (job *MockJob) SetReturnDelay(delay time.Duration)
- func (job *MockJob) SetReturnPriority(prio uint32)
- func (job *MockJob) Touch()
- type MockLogger
- type MockWorker
- type RawJob
- func (job *RawJob) Bury()
- func (job *RawJob) Delete()
- func (job *RawJob) GetAge() time.Duration
- func (job *RawJob) GetConn() *beanstalk.Conn
- func (job *RawJob) GetDelay() time.Duration
- func (job *RawJob) GetPriority() uint32
- func (job *RawJob) GetReleases() uint32
- func (job *RawJob) GetReserves() uint32
- func (job *RawJob) GetTimeouts() uint32
- func (job *RawJob) GetTube() string
- func (job *RawJob) LogError(a ...interface{})
- func (job *RawJob) LogInfo(a ...interface{})
- func (job *RawJob) Release()
- func (job *RawJob) SetReturnDelay(delay time.Duration)
- func (job *RawJob) SetReturnPriority(prio uint32)
- func (job *RawJob) Touch()
- type Worker
- type WorkerClient
Examples ¶
Constants ¶
const ( ActionDeleteJob = "delete" ActionBuryJob = "bury" ActionReleaseJob = "release" )
Actions the user can choose in case of an unmarshal error.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CustomLogger ¶
type CustomLogger interface { Info(v ...interface{}) Infof(format string, args ...interface{}) Error(v ...interface{}) Errorf(format string, args ...interface{}) }
CustomLogger provides support for the creation of custom logging.
type JobManager ¶
type JobManager interface { Delete() Touch() Release() LogError(a ...interface{}) LogInfo(a ...interface{}) GetAge() time.Duration GetPriority() uint32 GetReleases() uint32 GetReserves() uint32 GetTimeouts() uint32 GetDelay() time.Duration GetTube() string GetConn() *beanstalk.Conn SetReturnPriority(prio uint32) SetReturnDelay(delay time.Duration) }
JobManager interface represents a way to handle a job's lifecycle.
type Logger ¶
type Logger struct { Info func(v ...interface{}) Infof func(format string, v ...interface{}) Error func(v ...interface{}) Errorf func(format string, v ...interface{}) }
Logger provides support for standard logging.
func NewDefaultLogger ¶
func NewDefaultLogger() *Logger
NewDefaultLogger creates a new Logger initialised to use the global log package.
type MockJob ¶
type MockJob struct {
// contains filtered or unexported fields
}
func NewMockJob ¶
func NewWillDeleteMockJob ¶
func NewWillReleaseMockJob ¶
func NewWillTouchMockJob ¶
func (*MockJob) GetPriority ¶
func (*MockJob) GetReleases ¶
func (*MockJob) GetReserves ¶
func (*MockJob) GetTimeouts ¶
func (*MockJob) SetReturnDelay ¶
func (*MockJob) SetReturnPriority ¶
type MockLogger ¶
type MockLogger struct { }
MockLogger A custom logger that must implement Info() Infof(), Error() and Errorf() to implement CustomLogger
func (*MockLogger) Error ¶
func (l *MockLogger) Error(v ...interface{})
func (*MockLogger) Errorf ¶
func (l *MockLogger) Errorf(format string, v ...interface{})
func (*MockLogger) Info ¶
func (l *MockLogger) Info(v ...interface{})
func (*MockLogger) Infof ¶
func (l *MockLogger) Infof(format string, v ...interface{})
type MockWorker ¶
type MockWorker struct {
// contains filtered or unexported fields
}
func (*MockWorker) Run ¶
func (w *MockWorker) Run(ctx context.Context)
func (*MockWorker) SetLogger ¶
func (w *MockWorker) SetLogger(cl CustomLogger)
func (*MockWorker) SetNumWorkers ¶
func (w *MockWorker) SetNumWorkers(numWorkers int)
func (*MockWorker) SetUnmarshalErrorAction ¶
func (w *MockWorker) SetUnmarshalErrorAction(action string)
func (*MockWorker) Subscribe ¶
func (w *MockWorker) Subscribe(tube string, cb Handler)
type RawJob ¶
type RawJob struct {
// contains filtered or unexported fields
}
RawJob represents the raw job data that is returned by beanstalkd.
func NewEmptyJob ¶
func NewEmptyJob(cl CustomLogger) *RawJob
NewEmptyJob initialises a new empty RawJob with a custom logger. Useful for testing methods that log messages on the job.
func (*RawJob) GetConn ¶
func (job *RawJob) GetConn() *beanstalk.Conn
GetConn returns the beanstalk connection used to receive the job.
func (*RawJob) GetPriority ¶
GetPriority gets the priority of the job.
func (*RawJob) GetReleases ¶
GetReleases gets the count of release of the job.
func (*RawJob) GetReserves ¶
GetReserves gets the count of reserves of the job.
func (*RawJob) GetTimeouts ¶
GetTimeouts gets the count of timeouts of the job.
func (*RawJob) LogError ¶
func (job *RawJob) LogError(a ...interface{})
LogError function logs an error message regarding the job.
func (*RawJob) LogInfo ¶
func (job *RawJob) LogInfo(a ...interface{})
LogInfo function logs an info message regarding the job.
func (*RawJob) Release ¶
func (job *RawJob) Release()
Release function releases the job from the queue.
func (*RawJob) SetReturnDelay ¶
SetReturnDelay sets the return delay to use if a job is released back to queue.
func (*RawJob) SetReturnPriority ¶
SetReturnPriority sets the return priority to use if a job is released or buried.
type Worker ¶
type Worker struct {
// contains filtered or unexported fields
}
Worker represents a single process that is connecting to beanstalkd and is consuming jobs from one or more tubes.
func NewWorker ¶
NewWorker creates a new Worker process, but does not actually connect to beanstalkd server yet.
func (*Worker) Run ¶
Run starts one or more Worker threads based on the numWorkers value. If numWorkers is set to zero or less then 1 Worker is started.
func (*Worker) SetLogger ¶
func (w *Worker) SetLogger(cl CustomLogger)
SetLogger switches logging to use a custom Logger.
func (*Worker) SetNumWorkers ¶
SetNumWorkers sets the number of concurrent workers threads that should be started. Each thread establishes a separate connection to the beanstalkd server.
func (*Worker) SetUnmarshalErrorAction ¶
SetUnmarshalErrorAction defines what to do if there is an unmarshal error.
type WorkerClient ¶
type WorkerClient interface { Subscribe(tube string, cb Handler) SetNumWorkers(numWorkers int) SetLogger(cl CustomLogger) SetUnmarshalErrorAction(action string) Run(ctx context.Context) }
func NewMockWillDeleteWorker ¶
func NewMockWillDeleteWorker(tube, jobStr string) WorkerClient
func NewMockWillReleaseWorker ¶
func NewMockWillReleaseWorker(tube, jobStr string) WorkerClient
func NewMockWillTouchWorker ¶
func NewMockWillTouchWorker(tube, jobStr string) WorkerClient
func NewMockWorker ¶
func NewMockWorker() WorkerClient