Introduction
Go has lots of synchronization mechanisms ranging from language primitives such as mutexes, channels, atomic functions,
and to a more complicated components such as content.Context. Unfortunately,
using them to solve problems arising in day-to-day programming might be quite challenging, especially for novices.
The goal of the Job design pattern and this implementation is to provide an easy-to-use and solid foundation
for solving problems involving concurrent executions and their control. The Job pattern in many cases can be viewed as
an alternative to content.Context, though it's not meant to completely replace it.
Documentation
What is a Job?
A job is a set of concurrently running tasks, execution of which depends on each other. If one task fails, the whole job
execution fails too.
myjob := job.NewJob(nil)
myjob.AddTask(task1)
myjob.AddTask(task2)
<-myjob.Run()
// Let's process the result
What is a Task?
A single task consists of the three routines: an initialization routine, a recurrent routine, and a finalization routine:
func (stream *stream) ReadOnStreamTask(j job.Job) (job.Init, job.Run, job.Finalize) {
init := func(task job.Task){
// Do some initialization
}
run := func(task job.Task) {
read(stream, task)
task.Tick()
}
fin := func(task job.Task) {
readCancel(stream, task)
}
return init, run, fin
}
The recurrent routine is running in an indefinite loop. It represents well-known for { select { } }
statements in
Go. The recurrent routine calls three special methods:
- .Tick() - to proceed task execution.
- .Done() - to finish task execution (or .FinishJob() to finish job execution as well).
- .Idle() - to tell that a task has nothing to do, and as a result it might be finished by reaching the idle timeout.
Tasks can assert some conditions, replacing if err != nil { panic(err) }
by a more terse way:
func (m *MyTask) MyTask(j job.JobInterface) (job.Init, job.Run, job.Finalize) {
run := func(task *job.TaskInfo) {
_, err := os.Open("badfile")
task.Assert(err)
}
}
Every failed assertion will result in the cancellation of job execution, and invocation of the finalization routines of all
tasks of the job being cancelled.
There are two types of tasks: an ordinary task (or recurrent), and oneshot task. The main purpose of an oneshot task is
to start off its job execution once the task is finished:
mainJob := j.NewJob(nil)
mainJob.AddOneshotTask(mngr.ConnectTask)
mainJob.AddTask(netmanager.ReadTask)
mainJob.AddTask(netmanager.WriteTask)
mainJob.AddTask(imgResizer.ScanForImagesTask)
mainJob.AddTask(imgResizer.SaveResizedImageTask)
<-mainJob.Run()
In the example above the job won't start until a network connection established. A job can have only one oneshot task.
For data sharing tasks should employ (although it's not an obligation) a ping/pong synchronization using two channels,
where the first one is being used to receive data and the second one - to notify the sender that data processing is completed.
run := func(task job.Task) {
select {
case data := <- p.conn.Upstream().RecvRaw(): // Receive data from upstream server
p.conn.Downstream().Write() <- data // Write data to downstream server
p.conn.Downstream().WriteSync() // sync with downstream data receiver
p.conn.Upstream().RecvRawSync() // sync with upstream data sender
}
task.Tick()
}
To prevent a task from being blocked for good, be sure to close all channels it's using in its finalization routine.
Real life example
Now, when you have a basic understanding, let's put the given pattern to use and take a look at a
real life example:
the implementation of a reverse proxy working as layer 4 load balancer, a backend server resizing images, and a simple
client that would scan a specified directory for images and send them through the proxy server for resizing.
The code will speak for itself, so that you will quickly get the idea of how to use the given pattern.
API reference
Public functions
- NewJob(value interface{}) *job - creates a new job with the given job value.
- RegisterDefaultLogger(logger Logger) - registers a fallback logger for all jobs.
Job
Task
- Being called by the recurrent routine.