werk

module
v0.0.0-...-0f1f97a Latest
Published: Mar 5, 2021 License: Apache-2.0

werk

A basic template for a distributed worker queue built on Go's Machinery library.

Werk is a minimal refactoring of RichardKnop/machinery, an "asynchronous task queue/job queue based on distributed message passing." For those familiar, it's the Go version of Python's Celery.

Note: this is a template, not a Go package. Do not try to go get it.

What is an asynchronous task queue?

An asynchronous task queue uses a central message datastore (in this case, Redis) to allow one or more "callers" to register tasks in a central location, and one or more distributed "workers" to pull tasks from that queue, run them, and return the response.
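The flow described above can be sketched with the standard library alone. This is an illustration, not how Machinery or Werk is implemented: the broker type is hypothetical and keeps everything in memory, with a channel standing in for the Redis queue and a map standing in for the result store.

```go
package main

import (
	"fmt"
	"sync"
)

// task is a hypothetical unit of work: a name plus one integer argument.
type task struct {
	ID   string
	Name string
	Arg  int
}

// broker stands in for Redis: a queue of pending tasks and a result store.
type broker struct {
	queue   chan task
	mu      sync.Mutex
	results map[string]int
}

func newBroker() *broker {
	return &broker{queue: make(chan task, 16), results: make(map[string]int)}
}

// send is what a caller does: register a task in the central queue.
func (b *broker) send(t task) { b.queue <- t }

// work is what a worker does: pull tasks until the queue is closed,
// run each one, and store its result under the task ID.
func (b *broker) work(handlers map[string]func(int) int, wg *sync.WaitGroup) {
	defer wg.Done()
	for t := range b.queue {
		h, ok := handlers[t.Name]
		if !ok {
			continue // unknown task name; a real system would report an error
		}
		out := h(t.Arg)
		b.mu.Lock()
		b.results[t.ID] = out
		b.mu.Unlock()
	}
}

// result is how a caller retrieves the output later.
func (b *broker) result(id string) (int, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	v, ok := b.results[id]
	return v, ok
}

// runDemo sends n "double" tasks to the given number of workers
// and returns the sum of all results.
func runDemo(n, workers int) int {
	b := newBroker()
	handlers := map[string]func(int) int{"double": func(x int) int { return 2 * x }}
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go b.work(handlers, &wg)
	}
	for i := 1; i <= n; i++ {
		b.send(task{ID: fmt.Sprintf("task-%d", i), Name: "double", Arg: i})
	}
	close(b.queue)
	wg.Wait()
	sum := 0
	for i := 1; i <= n; i++ {
		v, _ := b.result(fmt.Sprintf("task-%d", i))
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(runDemo(5, 2)) // 30
}
```

Any number of workers can drain the same queue, which is the property that lets the workers scale independently of the callers.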

Why use an asynchronous task queue?

Some HTTP server tasks take longer than a single HTTP request should. Instead of blocking the request, you start the task, run it in the background, and retrieve the output later.

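In horizontally scaled systems, there's no guarantee that the load balancer will hit the same server instance when the client attempts to retrieve the output of a particular task. If the task's state lives only in the memory of the instance that started it, the other instances cannot return the result.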

Using Redis to register tasks ensures that all HTTP servers (the "callers") and the distributed "workers" can see all of the tasks, regardless of how many replicas are running.
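To make the difference concrete, here is a small sketch under stated assumptions: the instance and store types are hypothetical, a plain map stands in for Redis, and nothing here is safe for concurrent use. A result written to one replica's local memory is invisible to the other replica, while a result written to the shared store is visible to both.

```go
package main

import "fmt"

// store is a hypothetical result store keyed by task ID.
type store map[string]string

// instance models one HTTP server replica behind a load balancer.
type instance struct {
	local  store // memory private to this replica
	shared store // stands in for Redis, visible to every replica
}

func (in *instance) startLocal(id, out string)  { in.local[id] = out }
func (in *instance) startShared(id, out string) { in.shared[id] = out }

func (in *instance) fetchLocal(id string) (string, bool) {
	v, ok := in.local[id]
	return v, ok
}

func (in *instance) fetchShared(id string) (string, bool) {
	v, ok := in.shared[id]
	return v, ok
}

// newReplicas returns two replicas that share one store but have separate local memory.
func newReplicas() (*instance, *instance) {
	shared := store{}
	return &instance{local: store{}, shared: shared},
		&instance{local: store{}, shared: shared}
}

func main() {
	a, b := newReplicas()
	a.startLocal("t1", "done")  // recorded only in replica a's memory
	a.startShared("t2", "done") // recorded in the shared store

	_, okLocal := b.fetchLocal("t1")
	_, okShared := b.fetchShared("t2")
	fmt.Println(okLocal, okShared) // false true
}
```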

Why this repo?

This refactor was done because the Machinery examples launch both the worker and the caller processes from the same mixed codebase, making it difficult to view the two as separate, independently scalable services. The callers and workers can and should be run on multiple machines.

Werk splits the worker and caller logic into two separate folders, with a shared "machine" package that manages the shared components.

How to run Werk

1) Clone this repo

Note: You will need to update the imports if you're running in a custom module.

The cloned repo will come with a go.mod file that registers this repo as github.com/erik-dunteman/werk by default. Both caller.go and worker.go import github.com/erik-dunteman/werk/machine using this module name.

If you plan to rename your go module, change the github.com/erik-dunteman/werk/machine imports to your new name.

2) Start the Redis server, which each worker and caller will connect to.

Via docker:

docker run -p 6379:6379 redis

If not using Docker, follow the official Redis installation instructions to start a Redis server.

3) Start one (or more) workers

The worker folder contains the code to start a worker. Run:

go run worker/worker.go

4) Start one (or more) callers

The caller folder contains the code to start the caller. In this repo, it will place a new task into the queue, briefly wait for the worker to finish, and retrieve the output.

go run caller/caller.go

How to add tasks to Werk

1) Edit the shared machine package

You define your task handlers in tasks.go. First, define the function as xyzHandler:

// Do xyz stuff ...
func xyzHandler(t int) (bool, error) {
	// add logic here
	return true, nil
}

Second, register that task for caller/worker use by adding it to the Tasks map:

// Define the tasks for external use
var Tasks = map[string]interface{}{
	"sleep": sleepHandler,
	"hello": helloHandler,
	"xyz": xyzHandler, // remember the name "xyz", that's how we call this task later.
}

This is the core of how Werk works. Both the callers and the workers import this definition by accessing the Tasks map. When rolling out changes to production, the callers and workers must both be updated so that they share the same defined Tasks.
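The sketch below shows why the task name matters, using only the standard library. The call helper and its reflection-based dispatch are illustrative assumptions, not Machinery's actual implementation: a worker that receives the name "xyz" can only run the task if that name is present in its copy of the map.

```go
package main

import (
	"fmt"
	"reflect"
)

// xyzHandler mirrors the handler defined in tasks.go above.
func xyzHandler(t int) (bool, error) {
	return true, nil
}

// Tasks maps task names to handlers, as in the shared machine package.
var Tasks = map[string]interface{}{
	"xyz": xyzHandler,
}

// call is a hypothetical dispatcher: it looks up a handler by name,
// checks the arguments, and invokes the handler via reflection.
func call(name string, args ...interface{}) ([]interface{}, error) {
	h, ok := Tasks[name]
	if !ok {
		return nil, fmt.Errorf("task %q is not registered", name)
	}
	fn := reflect.ValueOf(h)
	if fn.Kind() != reflect.Func || fn.Type().NumIn() != len(args) {
		return nil, fmt.Errorf("task %q: handler does not accept %d argument(s)", name, len(args))
	}
	in := make([]reflect.Value, len(args))
	for i, a := range args {
		v := reflect.ValueOf(a)
		if !v.IsValid() || !v.Type().AssignableTo(fn.Type().In(i)) {
			return nil, fmt.Errorf("task %q: argument %d has the wrong type", name, i)
		}
		in[i] = v
	}
	out := fn.Call(in)

	// If the last return value is declared as error, surface it and drop it from the results.
	errType := reflect.TypeOf((*error)(nil)).Elem()
	if n := len(out); n > 0 && out[n-1].Type() == errType {
		if err, _ := out[n-1].Interface().(error); err != nil {
			return nil, err
		}
		out = out[:n-1]
	}
	results := make([]interface{}, len(out))
	for i, v := range out {
		results[i] = v.Interface()
	}
	return results, nil
}

func main() {
	out, err := call("xyz", 1)
	fmt.Println(out, err) // [true] <nil>

	_, err = call("abc", 1)
	fmt.Println(err) // task "abc" is not registered
}
```

If a caller is deployed with a task name that the workers do not yet know, the lookup fails in the same way as the "abc" call here.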

You can optionally edit server behavior (timeouts, Redis endpoint, etc.) in config.go.

2) Edit the caller in caller.go

Call a newly defined task:

// xyz
thisTask := tasks.Signature{
	Name: "xyz", // must be same task name as in machine.Tasks
	Args: []tasks.Arg{ // send in the task handler arguments here. In this case, it's just one int.
		{
			Type:  "int",
			Value: 1,
		},
	},
}

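asyncResult, err := server.SendTaskWithContext(ctx, &thisTask)
if err != nil {
	log.Fatalf("could not send task: %v", err) // requires the standard "log" package
}

// Normally we'd retrieve the result with a separate HTTP server call,
// but we'll simulate that here: Get blocks, checking for the result
// every 5 ms, until the worker has finished.

results, err := asyncResult.Get(5 * time.Millisecond)
if err != nil {
	log.Fatalf("could not get task result: %v", err)
}
log.Printf("xyz returned: %v", tasks.HumanReadableResults(results))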

Doing more with Machinery

Consult the original Machinery repo and its associated example to extend this template.
