tasque

package
v0.0.0-...-5d0e3c9 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 16, 2016 License: BSD-2-Clause Imports: 7 Imported by: 0

README

Tasque

Disque based remote task execution queue for Go

Tasque leverages Disque (https://github.com/antirez/disque) to create a simple and easy to use distributed task execution queue.

The idea is simple: you create TaskHandlers, which are callbacks that receive Tasks, and Tasks are simple execution context objects. Then you run a Worker process that can handle multiple TaskHandlers by name. You can then enqueue tasks for the handlers from any machine in your cluster using a Client, and they get executed.

Example - Creating a Worker


package main

import (
	"crypto/md5"
	"fmt"
	"io"
	"net/http"
	"os"

	"github.com/EverythingMe/go-disque/tasque"
)

// Step 1: Define a handler that has a unique name
var Downloader = tasque.FuncHandler(func(t *tasque.Task) error {

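	// Use the two-value type assertion so that a missing or non-string "url"
	// returns an error instead of panicking inside the worker
	u, ok := t.Params["url"].(string)
	if !ok {
		return fmt.Errorf("task %s: missing string parameter \"url\"", t.Name)
	}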
	res, err := http.Get(u)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	fp, err := os.Create(fmt.Sprintf("/tmp/%x", md5.Sum([]byte(u))))
	if err != nil {
		return err
	}
	defer fp.Close()

	if _, err := io.Copy(fp, res.Body); err != nil {
		return err
	}
	fmt.Printf("Downloaded %s successfully\n", u)

	return nil
	
}, "download")



// Step 2: Registering the handler and starting a Worker

func main() {
	
	// Worker with 10 concurrent goroutines. In real life scenarios set this to much higher values...
	worker := tasque.NewWorker(10, "127.0.0.1:7711")

	// register our downloader
	worker.Handle(Downloader)
	
	// Run the worker
	worker.Run()

}


Example - Enqueuing a task


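// This is a separate program from the worker above. It needs to import "time"
// and the tasque package, and it needs the Downloader handler (or its id, "download").
func main() {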
	
	client := tasque.NewClient(5*time.Second, "127.0.0.1:7711")

	task := tasque.NewTask(Downloader.Id()).Set("url", "http://google.com")
	err := client.Do(task)
	if err != nil {
		panic(err)
	}
}

Documentation

Overview

Package tasque implements a simple task processing framework on top of Disque. The idea is that you create "task handlers" that process tasks, similar to how HTTP handlers process requests. Each task handler must have a unique name or id that is used to enqueue tasks for it, much as an HTTP handler is routed to by a URL.

You create a Worker that is a server for executing task handlers, and register task handlers in it.

Then, for enqueueing tasks, you create a Client and enqueue them by name, optionally giving tasks properties.
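
The routing-by-name idea can be sketched without a Disque server. The following is a minimal, self-contained illustration and not tasque's actual implementation: Task and TaskHandler are local stand-ins that mirror the shapes documented on this page, while handlerFunc, registry, register and dispatch are hypothetical names introduced only for this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// Task is a local stand-in mirroring two of the exported fields documented for tasque.Task.
type Task struct {
	Name   string
	Params map[string]interface{}
}

// TaskHandler mirrors the documented tasque.TaskHandler interface.
type TaskHandler interface {
	Handle(*Task) error
	Id() string
}

// handlerFunc (hypothetical) adapts a plain func plus an id to TaskHandler.
type handlerFunc struct {
	f  func(*Task) error
	id string
}

func (h handlerFunc) Handle(t *Task) error { return h.f(t) }
func (h handlerFunc) Id() string           { return h.id }

// registry (hypothetical) maps handler ids to handlers, much as an HTTP mux
// maps URL paths to HTTP handlers.
type registry map[string]TaskHandler

// register stores h under its own id.
func (r registry) register(h TaskHandler) { r[h.Id()] = h }

var errNoHandler = errors.New("no handler registered for task")

// dispatch looks up the handler whose id equals the task name and runs it.
func (r registry) dispatch(t *Task) error {
	h, ok := r[t.Name]
	if !ok {
		return fmt.Errorf("%w: %q", errNoHandler, t.Name)
	}
	return h.Handle(t)
}

func main() {
	r := registry{}
	r.register(handlerFunc{id: "greet", f: func(t *Task) error {
		fmt.Println("hello,", t.Params["name"])
		return nil
	}})

	// A task whose name matches a registered id is routed to that handler.
	fmt.Println(r.dispatch(&Task{Name: "greet", Params: map[string]interface{}{"name": "gopher"}}))
	// A task with an unknown name yields an error instead of being executed.
	fmt.Println(r.dispatch(&Task{Name: "unknown"}))
}
```

In the real package the lookup happens inside the Worker after a job is fetched from Disque; the map here only shows why each handler id has to be unique.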

Example
worker := NewWorker(4, "127.0.0.1:7711")

worker.Handle(Downloader)

go worker.Run()

client := NewClient(5*time.Second, "127.0.0.1:7711")

task := NewTask(Downloader.Id()).Set("url", "http://google.com")
err := client.Do(task)
if err != nil {
	panic(err)
}

time.Sleep(2000 * time.Millisecond)
Output:

Downloaded http://google.com successfully


Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is used to enqueue tasks

func NewClient

func NewClient(enqueueTimeout time.Duration, addrs ...string) *Client

NewClient creates a new client for the given Disque addrs. enqueueTimeout is the amount of time after which an enqueue attempt fails

func (*Client) Delay

func (c *Client) Delay(t *Task, delay time.Duration) error

Delay puts the task in the queue for execution after the delay period of time. This also sets the jobId of the task

func (*Client) Do

func (c *Client) Do(t *Task) error

Do puts the task in the queue for immediate execution, and sets the task's jobId

type FuncTaskHandler

type FuncTaskHandler struct {
	// contains filtered or unexported fields
}

FuncTaskHandler wraps a func as a complete handler

func FuncHandler

func FuncHandler(f func(*Task) error, id string) FuncTaskHandler

FuncHandler takes a func and its id and converts them into a FuncTaskHandler

func (FuncTaskHandler) Handle

func (fh FuncTaskHandler) Handle(t *Task) error

Handle calls the underlying func to handle the task

func (FuncTaskHandler) Id

func (fh FuncTaskHandler) Id() string

Id returns the id of the handler

type Task

type Task struct {
	Name   string
	Params map[string]interface{}

	EnqueueTime time.Time
	// contains filtered or unexported fields
}

A Task represents the name and parameters of a task we want to execute on a worker. You can use it to pass parameters to the job executor
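
Because Params is a map[string]interface{}, a handler has to assert the type of every value it reads, and a bare assertion such as t.Params["url"].(string) panics when the key is missing or holds another type. The sketch below shows one defensive way to read parameters. stringParam and intParam are hypothetical helpers, not part of tasque, and the float64 branch rests on the assumption that parameters travel through the queue as JSON, which this page does not state.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stringParam (hypothetical helper) reads a string parameter without panicking.
func stringParam(params map[string]interface{}, key string) (string, error) {
	v, ok := params[key]
	if !ok {
		return "", fmt.Errorf("missing parameter %q", key)
	}
	s, ok := v.(string)
	if !ok {
		return "", fmt.Errorf("parameter %q is %T, not string", key, v)
	}
	return s, nil
}

// intParam (hypothetical helper) accepts an int as set by the caller, or a
// float64, which is what encoding/json produces when decoding a number into interface{}.
func intParam(params map[string]interface{}, key string) (int, error) {
	switch n := params[key].(type) {
	case int:
		return n, nil
	case float64:
		return int(n), nil
	case nil:
		return 0, fmt.Errorf("missing parameter %q", key)
	default:
		return 0, fmt.Errorf("parameter %q is %T, not a number", key, n)
	}
}

func main() {
	// Simulate a round trip through a JSON-encoded queue (an assumption, see above).
	sent := map[string]interface{}{"url": "http://example.com", "retries": 3}
	raw, err := json.Marshal(sent)
	if err != nil {
		panic(err)
	}
	var received map[string]interface{}
	if err := json.Unmarshal(raw, &received); err != nil {
		panic(err)
	}

	u, err := stringParam(received, "url")
	fmt.Println(u, err)
	n, err := intParam(received, "retries")
	fmt.Println(n, err)
	_, err = stringParam(received, "missing")
	fmt.Println(err)
}
```

Returning an error from the handler rather than panicking keeps a malformed task from taking down the goroutine that runs it.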

func NewTask

func NewTask(id string) *Task

Create a new task with a given id

func (*Task) Delay

func (t *Task) Delay(c *Client, d time.Duration) error

Delay executes the task after a delay of d

func (*Task) Do

func (t *Task) Do(c *Client) error

Execute the task on the client

func (Task) JobId

func (t Task) JobId() string

JobId returns the task's jobId

func (*Task) Set

func (t *Task) Set(k string, v interface{}) *Task

Set a property in the task

func (*Task) SetRetry

func (t *Task) SetRetry(d time.Duration) *Task

Set the retry timeout. This must be greater than 1. If the worker does not ACK the task in this timeout, disque will try to re-queue it
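
Since a job that is not acknowledged within the retry timeout is re-queued, a handler that runs slowly may see the same job more than once. One common response is to make handlers idempotent. The sketch below guards work by job id using in-process memory; onceByJob, newOnceByJob, run and errTransient are hypothetical names, and a deployment with several worker processes would need shared storage instead of a local map.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errTransient is a placeholder error used to simulate a failed attempt.
var errTransient = errors.New("transient failure")

// onceByJob (hypothetical) remembers which job ids have been claimed so that
// a redelivered job is skipped instead of being executed twice.
type onceByJob struct {
	mu      sync.Mutex
	claimed map[string]bool
}

func newOnceByJob() *onceByJob {
	return &onceByJob{claimed: make(map[string]bool)}
}

// run calls work unless jobID has already been claimed. It reports whether
// work was called. If work fails, the claim is released so a retry can run.
func (o *onceByJob) run(jobID string, work func() error) (bool, error) {
	o.mu.Lock()
	if o.claimed[jobID] {
		o.mu.Unlock()
		return false, nil
	}
	o.claimed[jobID] = true // claim before running, without holding the lock during work
	o.mu.Unlock()

	if err := work(); err != nil {
		o.mu.Lock()
		delete(o.claimed, jobID)
		o.mu.Unlock()
		return true, err
	}
	return true, nil
}

func main() {
	guard := newOnceByJob()
	calls := 0
	work := func() error { calls++; return nil }

	guard.run("job-1", work) // first delivery runs
	guard.run("job-1", work) // redelivery of the same job id is skipped
	guard.run("job-2", work) // a different job runs
	fmt.Println("work ran", calls, "times")

	_, err := guard.run("job-3", func() error { return errTransient })
	fmt.Println("job-3 first attempt:", err)
	ran, _ := guard.run("job-3", work)
	fmt.Println("job-3 retried:", ran)
}
```

Inside a tasque handler the key would presumably be the value returned by the task's JobId method.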

func (*Task) SetTTL

func (t *Task) SetTTL(ttl time.Duration) *Task

Set the task TTL. If the task has not succeeded after this time, Disque will give up on it

type TaskHandler

type TaskHandler interface {
	Handle(*Task) error
	Id() string
}

TaskHandler is the interface that handlers must provide
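
Any type with these two methods satisfies the interface, so a handler does not have to be built from a func. The sketch below shows a struct-based handler that carries its own state. Task and TaskHandler are local stand-ins mirroring the declarations on this page so that the example compiles on its own, and collector is a hypothetical type.

```go
package main

import (
	"fmt"
	"sync"
)

// Task is a local stand-in mirroring two of the exported fields documented for tasque.Task.
type Task struct {
	Name   string
	Params map[string]interface{}
}

// TaskHandler mirrors the documented tasque.TaskHandler interface.
type TaskHandler interface {
	Handle(*Task) error
	Id() string
}

// collector (hypothetical) is a handler with its own state: it records every
// "message" parameter it receives. The mutex matters because a worker may
// call Handle from several goroutines at once.
type collector struct {
	id   string
	mu   sync.Mutex
	msgs []string
}

// Compile-time check that *collector implements TaskHandler.
var _ TaskHandler = (*collector)(nil)

// Id returns the name under which tasks for this handler are enqueued.
func (c *collector) Id() string { return c.id }

// Handle validates the task's parameters and records the message.
func (c *collector) Handle(t *Task) error {
	msg, ok := t.Params["message"].(string)
	if !ok {
		return fmt.Errorf("task %q: parameter \"message\" is missing or not a string", t.Name)
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	c.msgs = append(c.msgs, msg)
	return nil
}

func main() {
	c := &collector{id: "collect"}
	var h TaskHandler = c

	err := h.Handle(&Task{Name: h.Id(), Params: map[string]interface{}{"message": "hello"}})
	fmt.Println(err, c.msgs)

	// A task without the expected parameter is rejected with an error.
	fmt.Println(h.Handle(&Task{Name: h.Id()}))
}
```

A struct is a reasonable choice when the handler needs configuration or dependencies, while FuncHandler is shorter for stateless callbacks.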

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

A Worker is the runner for a worker process that runs goroutines as worker threads. You register TaskHandlers to handle tasks in the worker

func NewWorker

func NewWorker(numGoroutines int, addrs ...string) *Worker

Create a new worker that runs numGoroutines concurrently, connecting to disque addrs
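
To illustrate what a fixed number of concurrent goroutines means in practice, here is a generic worker-pool sketch using only the standard library. It is not the package's implementation: a channel stands in for the Disque queue, and runPool is a hypothetical function.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool (hypothetical) starts n goroutines that all read from the same jobs
// channel, so at most n jobs are processed at a time. It returns the collected
// results once the channel is closed and drained. Result order is not defined.
func runPool(n int, jobs <-chan int, work func(int) int) []int {
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		results []int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				r := work(j)
				mu.Lock()
				results = append(results, r)
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return results
}

func main() {
	jobs := make(chan int)
	go func() {
		defer close(jobs)
		for i := 1; i <= 5; i++ {
			jobs <- i
		}
	}()

	results := runPool(3, jobs, func(x int) int { return x * x })
	fmt.Println(len(results), "jobs processed")
}
```

With I/O-bound handlers such as the downloader in the README, most goroutines spend their time waiting, which is why a larger pool is often appropriate.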

func (*Worker) Handle

func (w *Worker) Handle(h TaskHandler)

Register a task handler in the worker. This should

func (*Worker) Run

func (w *Worker) Run()

Run starts the worker and makes it request jobs

func (*Worker) Stop

func (w *Worker) Stop()

Stop stops the worker from processing new tasks
