jobber

v3.0.0
Warning

This package is not in the latest version of its module.

Published: Feb 16, 2021 License: Apache-2.0 Imports: 9 Imported by: 0

README

jobber

Little job queue thing for golang

A very basic example is provided in main

Overview

At a high level, you create a single Boss, add named Workers to the Boss, and add Jobs to the Workers.

Boss

The Boss is the central controller for any / all workers. You can have as many workers as you want under a single boss.

Worker

Worker is responsible for completing a specific type of work. The general idea is that workers have a unique name within a given Boss and they only do one type of job. You can, of course, do whatever you want.

The default Worker is called PitDroid and can be used out of the box or as a prototype for your own worker implementation.

Job

A Job can be any unit of work you wish to have executed by a Worker. Its interface is designed to be minimal. The output of Process() is passed directly to the channel returned by RespondTo(), unless the Worker has been terminated, in which case it is recommended that a standard error of some sort be passed in instead.
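As a rough illustration of the Job contract described above, here is a minimal sketch. The Job and JobResponse types are trimmed local copies of the ones documented further down (so the file compiles without importing jobber), and divideJob and runJob are hypothetical names. runJob only imitates what a worker is described as doing; it is not jobber's implementation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// JobResponse and Job are local copies of the documented types, reproduced so
// this sketch compiles on its own. Real code would use jobber's definitions.
type JobResponse struct {
	JobType string
	Start   time.Time
	End     time.Time
	Worker  string
	Err     error
}

type Job interface {
	Process() error
	RespondTo() chan<- *JobResponse
}

// divideJob is a hypothetical job that divides two integers.
type divideJob struct {
	a, b   int
	result int
	resp   chan *JobResponse
}

// Process does the work and returns any error it produced.
func (j *divideJob) Process() error {
	if j.b == 0 {
		return errors.New("division by zero")
	}
	j.result = j.a / j.b
	return nil
}

// RespondTo exposes the channel on which the outcome is delivered.
func (j *divideJob) RespondTo() chan<- *JobResponse { return j.resp }

// runJob stands in for a worker: it calls Process and passes the resulting
// error to the RespondTo channel. This is an assumption about worker behavior.
func runJob(workerName string, j Job) {
	start := time.Now()
	err := j.Process()
	j.RespondTo() <- &JobResponse{JobType: "divide", Worker: workerName, Start: start, End: time.Now(), Err: err}
}

func main() {
	job := &divideJob{a: 10, b: 2, resp: make(chan *JobResponse, 1)}
	runJob("math", job)
	resp := <-job.resp
	fmt.Println(job.result, resp.Err)
}
```

The response channel is buffered with capacity 1 so that delivering the response never blocks the worker if the caller is slow to read it.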

Documentation

Constants

This section is empty.

Variables

var (
	WorkerStopping     = errors.New("worker is stopping")
	WorkerTerminated   = errors.New("worker terminated")
	WorkerJobQueueFull = errors.New("worker job queue full")
)
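These are sentinel errors, so callers can compare against them with errors.Is. The sketch below assumes that job-adding calls return one of these values, possibly wrapped; the page does not say which function returns which error. The variables are local copies so the example compiles alone, and shouldRetry is a hypothetical policy rather than part of jobber.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the sentinel errors listed above.
var (
	WorkerStopping     = errors.New("worker is stopping")
	WorkerTerminated   = errors.New("worker terminated")
	WorkerJobQueueFull = errors.New("worker job queue full")
)

// shouldRetry is a hypothetical policy: a full queue may drain, so retrying
// can help, while a stopping or terminated worker will not accept the job.
func shouldRetry(err error) bool {
	return errors.Is(err, WorkerJobQueueFull)
}

func main() {
	// errors.Is also matches when the sentinel has been wrapped with %w.
	wrapped := fmt.Errorf("adding job: %w", WorkerJobQueueFull)
	fmt.Println(shouldRetry(wrapped), shouldRetry(WorkerTerminated))
}
```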

Functions

This section is empty.

Types

type Boss

type Boss struct {
	// contains filtered or unexported fields
}

Boss controls the life of the workers

func NewBoss

func NewBoss(conf *BossConf) *Boss

NewBoss will create a new Boss with a background context

func (*Boss) AddJob

func (b *Boss) AddJob(workerName string, j Job) error

AddJob will push a new job to a worker's queue with no ttl

func (*Boss) AddJobCtx

func (b *Boss) AddJobCtx(ctx context.Context, workerName string, j Job) error

func (*Boss) HasWorker

func (b *Boss) HasWorker(name string) bool

func (*Boss) HireWorker

func (b *Boss) HireWorker(name string, queueLength int) error

HireWorker will attempt to hire a new worker using the specified HiringAgency and add them to the job pool.

func (*Boss) PlaceWorker

func (b *Boss) PlaceWorker(worker Worker) error

PlaceWorker will attempt to add a hired worker to the job pool, if one doesn't already exist with that name

func (*Boss) RemoveStaleWorkers

func (b *Boss) RemoveStaleWorkers(deadline time.Time) int

RemoveStaleWorkers terminates workers that have not processed a job after the provided deadline. Note: this requires that workers implement the StatWorker interface. Otherwise, this func does nothing but block for a short while.
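The staleness check can be pictured as an optional-interface test followed by a time comparison. The sketch below is one reading of the description above, not jobber's code: a worker counts as stale when its last completion time is not after the deadline, and workers without Statistics() are skipped. All type and function names here are local stand-ins.

```go
package main

import (
	"fmt"
	"time"
)

// WorkerStatistics is a trimmed local copy of the documented struct.
type WorkerStatistics struct {
	WorkerName            string
	LastJobCompletionTime time.Time
}

// namedWorker is a cut-down stand-in for Worker; only Name is needed here.
type namedWorker interface{ Name() string }

// statWorker mirrors the shape of StatWorker: a worker that also reports statistics.
type statWorker interface {
	namedWorker
	Statistics() WorkerStatistics
}

type plain struct{ name string }

func (p plain) Name() string { return p.name }

type tracked struct {
	plain
	last time.Time
}

func (t tracked) Statistics() WorkerStatistics {
	return WorkerStatistics{WorkerName: t.name, LastJobCompletionTime: t.last}
}

// staleNames returns the names of workers whose last completed job is not
// after deadline. Workers that do not report statistics are ignored.
func staleNames(workers []namedWorker, deadline time.Time) []string {
	var stale []string
	for _, w := range workers {
		sw, ok := w.(statWorker)
		if !ok {
			continue
		}
		if !sw.Statistics().LastJobCompletionTime.After(deadline) {
			stale = append(stale, w.Name())
		}
	}
	return stale
}

// demo builds three workers and checks them against a deadline one minute ago.
func demo() []string {
	now := time.Now()
	workers := []namedWorker{
		plain{name: "no-stats"},
		tracked{plain: plain{name: "idle"}, last: now.Add(-time.Hour)},
		tracked{plain: plain{name: "busy"}, last: now},
	}
	return staleNames(workers, now.Add(-time.Minute))
}

func main() {
	fmt.Println(demo())
}
```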

func (*Boss) RemoveWorker

func (b *Boss) RemoveWorker(worker Worker)

func (*Boss) ScaleDownAll

func (b *Boss) ScaleDownAll()

ScaleDownAll will attempt to gracefully shut down all workers, completing all currently queued jobs but no longer accepting new ones

func (*Boss) ScaleDownWorker

func (b *Boss) ScaleDownWorker(workerName string) error

ScaleDownWorker will tell a worker to finish up their queue then remove them

func (*Boss) TerminateAll

func (b *Boss) TerminateAll()

TerminateAll will immediately fire all current workers, dropping all currently queued jobs.

func (*Boss) TerminateWorker

func (b *Boss) TerminateWorker(workerName string) (err error)

TerminateWorker will remove the worker immediately, effectively cancelling all queued work.

func (*Boss) Worker

func (b *Boss) Worker(name string) (worker Worker)

Worker will attempt to return to you a worker by name

type BossConf

type BossConf struct {
	Log *log.Logger
}

func DefaultConfig

func DefaultConfig() *BossConf

type HR

type HR func(Worker)

HR is where workers are sent when they are done and should be removed from the Boss

type HiringAgencyFunc

type HiringAgencyFunc func(name string, queueLength int) Worker

var HiringAgency HiringAgencyFunc = NewPitDroid

HiringAgency allows you to create your own worker hiring function in case you don't like PitDroids.

type Job

type Job interface {
	// Process must contain whatever logic is needed to perform the job, returning whatever error is generated while
	// processing (if any)
	Process() error
	// RespondTo must be passed whatever output came from Process()
	RespondTo() chan<- *JobResponse
}

Job represents any unit of work that you'd like to task a Worker with. Any context handling should be done in your Process() implementation.

type JobResponse

type JobResponse struct {
	JobType string
	Placed  time.Time
	Start   time.Time
	End     time.Time
	Worker  string
	Err     error
	// contains filtered or unexported fields
}

JobResponse wraps the output of a job upon completion, whether it is successful or not.

type LoggingWorker

type LoggingWorker interface {
	Worker
	SetLogger(*log.Logger)
}

type ManagedWorker

type ManagedWorker interface {
	Worker
	SetHR(HR)
}

type PitDroid

type PitDroid struct {
	// contains filtered or unexported fields
}

PitDroids are simple workers that will do as instructed.

func (*PitDroid) AddJob

func (pd *PitDroid) AddJob(ctx context.Context, j Job) error

AddJob will append a job to this worker's queue. The context is used to limit how long a worker can try to push a job onto its queue before failing. It does not limit the job's execution time.
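One common way to bound an enqueue attempt with a context is a select over the channel send and ctx.Done(). The sketch below shows that pattern under the assumption that the queue is a buffered channel; it is not PitDroid's actual code, and enqueue, fillThenPush, and errQueueFull are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errQueueFull is a local stand-in for jobber's WorkerJobQueueFull.
var errQueueFull = errors.New("worker job queue full")

// enqueue tries to push v onto queue and gives up once ctx is done.
// The context bounds only the wait for queue space, not any later processing.
func enqueue(ctx context.Context, queue chan<- string, v string) error {
	select {
	case queue <- v:
		return nil
	case <-ctx.Done():
		return errQueueFull
	}
}

// fillThenPush fills a queue of capacity 1, then attempts a second push that
// can only fail, because nothing is reading from the queue.
func fillThenPush() (first, second error) {
	queue := make(chan string, 1)
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()
	first = enqueue(ctx, queue, "a")
	second = enqueue(ctx, queue, "b")
	return first, second
}

func main() {
	fmt.Println(fillThenPush())
}
```

Passing a context with a short timeout turns a full queue into a quick failure, while context.Background() makes the caller wait until space appears.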

func (*PitDroid) Length

func (pd *PitDroid) Length() int

Length returns the current number of items this worker has in its queue

func (*PitDroid) Name

func (pd *PitDroid) Name() string

Name returns the name of this worker

func (*PitDroid) ScaleDown

func (pd *PitDroid) ScaleDown()

ScaleDown will tell this worker to stop accepting new jobs, complete all jobs left in its queue, then send itself to HR

func (*PitDroid) SetLogger

func (pd *PitDroid) SetLogger(log *log.Logger)

func (*PitDroid) Statistics

func (pd *PitDroid) Statistics() WorkerStatistics

func (*PitDroid) Terminate

func (pd *PitDroid) Terminate()

Terminate will tell this worker to stop accepting new jobs, flush all current jobs from its queue, then send itself to HR

type StatWorker

type StatWorker interface {
	Worker
	Statistics() WorkerStatistics
}

type Worker

type Worker interface {
	// Name must return the name of the worker. This must be unique across all workers managed by the boss
	Name() string
	// Length must return the size of the current queue of work for this worker.
	Length() int
	// AddJob must attempt to add a new job to the worker's queue, failing if the worker has been told to stop
	AddJob(context.Context, Job) error
	// ScaleDown must mark the worker as stopped and process any and all jobs remaining in its queue
	ScaleDown()
	// Terminate must send an error message to all remaining jobs in this worker's queue
	Terminate()
}
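For anyone writing their own worker, the following is a minimal sketch of a type that satisfies the Worker interface using a buffered channel as its queue. Everything here is an assumption made for illustration: the types are trimmed local copies, simpleWorker and noopJob are hypothetical, and PitDroid may work quite differently (for example, this sketch has no HR or logger support).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// Trimmed local copies of the documented types, so the sketch compiles alone.
type JobResponse struct {
	Worker string
	Err    error
}

type Job interface {
	Process() error
	RespondTo() chan<- *JobResponse
}

type Worker interface {
	Name() string
	Length() int
	AddJob(context.Context, Job) error
	ScaleDown()
	Terminate()
}

// Stand-ins for jobber's WorkerStopping and WorkerTerminated.
var errStopping = errors.New("worker is stopping")
var errTerminated = errors.New("worker terminated")

// simpleWorker is a hypothetical worker backed by a buffered channel.
type simpleWorker struct {
	name       string
	mu         sync.Mutex // guards stopped and the closing of queue
	stopped    bool
	terminated atomic.Bool
	queue      chan Job
	done       chan struct{} // closed once the queue has been drained
}

var _ Worker = (*simpleWorker)(nil) // compile-time interface check

func newSimpleWorker(name string, queueLength int) *simpleWorker {
	w := &simpleWorker{name: name, queue: make(chan Job, queueLength), done: make(chan struct{})}
	go w.run()
	return w
}

// run is the only consumer of the queue. After Terminate, remaining jobs
// receive errTerminated instead of being processed.
func (w *simpleWorker) run() {
	defer close(w.done)
	for j := range w.queue {
		err := errTerminated
		if !w.terminated.Load() {
			err = j.Process()
		}
		j.RespondTo() <- &JobResponse{Worker: w.name, Err: err}
	}
}

func (w *simpleWorker) Name() string { return w.name }
func (w *simpleWorker) Length() int  { return len(w.queue) }

// AddJob refuses new work once stopped; otherwise it waits for queue space
// until ctx is done. Holding mu keeps a send from racing with close.
func (w *simpleWorker) AddJob(ctx context.Context, j Job) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.stopped {
		return errStopping
	}
	select {
	case w.queue <- j:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (w *simpleWorker) stop(terminate bool) {
	if terminate {
		w.terminated.Store(true)
	}
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.stopped {
		w.stopped = true
		close(w.queue)
	}
}

func (w *simpleWorker) ScaleDown() { w.stop(false) }
func (w *simpleWorker) Terminate() { w.stop(true) }

type noopJob struct{ resp chan *JobResponse }

func (j noopJob) Process() error                 { return nil }
func (j noopJob) RespondTo() chan<- *JobResponse { return j.resp }

// demo queues one job, scales the worker down, then tries to add another.
func demo() (jobErr, lateErr error) {
	w := newSimpleWorker("noop", 4)
	j := noopJob{resp: make(chan *JobResponse, 1)}
	_ = w.AddJob(context.Background(), j)
	w.ScaleDown()
	<-w.done
	return (<-j.resp).Err, w.AddJob(context.Background(), j)
}

func main() { fmt.Println(demo()) }
```

Keeping a single consumer goroutine and closing the queue under the same mutex that guards sends is one way to avoid sending on a closed channel. The atomic.Bool type requires Go 1.19 or later.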

func NewPitDroid

func NewPitDroid(name string, queueLength int) Worker

NewPitDroid will return to you a new PitDroid, the default worker prototype for jobber

type WorkerStatistics

type WorkerStatistics struct {
	WorkerName            string    `json:"worker_name"`
	JobsCompleted         uint64    `json:"jobs_completed"`
	LastJobCompletionTime time.Time `json:"last_job_completion_time"`
}
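The struct tags determine the field names used when the statistics are encoded as JSON. A small sketch, using a local copy of the struct and hypothetical sample values:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// WorkerStatistics is a local copy of the documented struct, tags included.
type WorkerStatistics struct {
	WorkerName            string    `json:"worker_name"`
	JobsCompleted         uint64    `json:"jobs_completed"`
	LastJobCompletionTime time.Time `json:"last_job_completion_time"`
}

// sample returns made-up statistics for illustration.
func sample() WorkerStatistics {
	return WorkerStatistics{
		WorkerName:            "math",
		JobsCompleted:         3,
		LastJobCompletionTime: time.Date(2021, time.February, 16, 12, 0, 0, 0, time.UTC),
	}
}

// encode marshals the statistics using the tag names above.
func encode(s WorkerStatistics) (string, error) {
	b, err := json.Marshal(s)
	return string(b), err
}

func main() {
	out, err := encode(sample())
	fmt.Println(out, err)
}
```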
