queue

package
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2023 License: Apache-2.0 Imports: 10 Imported by: 0

README

Task Queue

This is an implementation of a task queue, which is a queue of tasks which are executed in order, with retry and expiry options. In order to create a task queue, use the New function with a Connection Pool and any additional options:


import (
    queue "github.com/mutablelogic/go-accessory/pkg/queue"
    . "github.com/mutablelogic/go-accessory"
)

func main() {
    var pool Pool // This is the connection pool
    var opts []queue.Option // Queue options

    queue := queue.New(pool, opts...)
    if queue == nil {
        panic("Unable to create queue")
    }

    // Use the task queue
}

The set of options you can pass are as follows:

Option Description
queue.OptNamespace(string) Set the namespace to use for the queue
queue.OptMaxAge(time.Duration) The maximum age for any task before expiry. By default, a task is retried without a deadline
queue.OptMaxRetries(uint) The maximum number of retries for any task. By default, a task is retried without a maximum retry count
queue.OptWorkers(uint, time.Duration) The number of simultaneous task workers, and the maximum time a worker is allowed to run for, or zero for no deadline. By default, the number of workers equals the number of CPU cores and there is no task deadline
queue.OptBackoff(time.Duration) The backoff time on task failure. On first retry is made after the backoff period, the second after two times the backoff period and so forth

Where a task queue is used on a single host, the tasks will be spread across the available workers. Where there are task queues executing on multiple hosts (using a MongoDB database, rather than sqlite), the tasks will be spread across the available workers on all hosts.

Tasks

A task consists of:

  • A unique identifier ("Key") which is used to identify the task;
  • Optionally, a task priority. Tasks are executed in priority order, with higher numbers being higher priority;
  • When the task was created, and the age of the task;
  • When the task expires if it is not completed, and the number of retries that have be attempted to complete the task;
  • The last error that was returned when attempting to complete the task;
  • A set of "tags" to identify the parameters used to execute the task.

In order to create a new task, use the queue.New function:

    var tags []Tag // The set of tags for the task
    task, err := queue.New(context.TODO(), tags...)
    if err != nil {
        panic("Unable to insert task")
    }

Workers

You define a task worker as a function passed to the queue.Run method:

    var worker queue.WorkerFn // The worker function

    if err := queue.Run(context.TODO(), worker); err != nil {
        panic(err)
    }

The signature of a worker function is func(context.Context, Task) error. The context should allow a worker to be cancelled and return an error for long-running tasks exceeding a deadline. The worker should return an error if a task could not be completed successfully, or nil if the task was completed successfully.

Your worker function can obtain task parameters using the task.Tags() method. The tag values are currently always strings, which can then be parsed into other types.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewQueue

func NewQueue(pool Pool, opts ...Option) TaskQueue

Create a new queue with the given namespace

func NewTask

func NewTask(namespace string) *task

NewTask creates a new task with the given namespace, priority, and when the task should expire. The task is scheduled to run immediately.

func NewTaskWithPriority

func NewTaskWithPriority(namespace string, priority int) *task

NewTaskWithPriority creates a new task with the given namespace and priority

func NewTaskWithPriorityAndExpiresAt

func NewTaskWithPriorityAndExpiresAt(namespace string, priority int, expires_at time.Time) *task

NewTaskWithPriorityAndExpiresAt creates a new task with the given namespace, priority, and when the task should expire.

Types

type Option

type Option func(*queue) error

func OptBackoff

func OptBackoff(v time.Duration) Option

Set the retry backoff value. The first retry is attempted after the backoff value, the second retry is attempted after 2*backoff, and so forth, until either the task age is reached or the maximum number of retires is reached.

func OptMaxAge

func OptMaxAge(v time.Duration) Option

Set the database name

func OptMaxRetries

func OptMaxRetries(v uint) Option

Set the maximum number of retries

func OptNamespace

func OptNamespace(v string) Option

Set the queue namespace

func OptWorkers

func OptWorkers(n uint, deadline time.Duration) Option

Set the maximum number of workers and work deadline

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL