turbine

module
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 1, 2025 License: Apache-2.0

README

Turbine

GoDoc Version Build Status Go Report Card Codecov

Fast, distributed message queue for Golang and MongoDB

There are many, many message queue tools. Perhaps you should use one of those instead. But Turbine fills a unique niche in that it:

  1. is a distributed queue that can be shared between several producer and consumer servers simultaneously
  2. supports swappable storage providers, with the first provider being MongoDB.
  3. supports fast, in-memory queues using Golang channels
  4. can retry failed jobs (with exponential backoff)
  5. can schedule jobs to in the future

Pushing Tasks to the Queue

// Create a Task with any parameters
task := queue.NewTask(
    "TaskName" // Task Name
    map[string]any{ // Parameters
        "foo": "bar",
        "baz": 42,
    }
)

// Publish the task to the Queue (this will happen quickly)
if err := queue.Publish(task); err != nil {
    // only errors related to queuing the task
}

Run the Queue

// Create and start a queue
q := queue.New(
    queue.WithConsumers(), // one or more "consumer" functions (below)
    queue.WithStorage(),   // optional storage adapter persists tasks 
    queue.WithTimeout(),   // other configs, like timeouts, retries, etc
)

Consuming Tasks from the Queue

When the turbine queue receives a task, it tries to execute it using one or more Consumer functions, which have the following signature:

func Consumer(name string, args map[string]any) (bool, error)

It is the consumer's job to identify the task by name, and decide if it can execute it or not. You can configure any number of consumer functions, so if a task is not recognized, it is passed to the next consumer until a match is found.

If the consumer DOES recognize the Task, then it executes the job and returns TRUE, along with an error result (or nil if the task was successful).

When a task return an error, it is re-queued according to Turbine's exponential backoff logic, and will be re-run at some point in the future.

Mongo Storage Provider

Turbine is built to support pluggable storage providers, so that any datastore can be used to manage queued tasks.

Currently, there is a single storage provider for Mongodb, which safely queues and dequeues tasks for any number of distributed queue workers. However, storage providers implement a simple interface, so it is simple to create a new storage provider for any back end that you want to use.

IMPORTANT: If you do not use a storage provider, the Turbine queue will still work, but will only work in memory. This means that items cannot be queued for future dates, and will not have a retry delay.

To initialize the storage provider, use the following code:

import (
    "github.com/benpate/turbine/queue"
    "github.com/benpate/turbine/queue_mongo"
)

// Create a MongoDB database connection
connection := mongo.Connect(...) 

// Pass the DB connection into the storage provider
provider := queue_mongo.New(connection)

// Initialize the queue with the storage provider
q := queue.New(queue.WithStorage(provider))

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL