nanoq

v0.0.0-...-c445dbc

This package is not in the latest version of its module.

Published: Aug 22, 2024 | License: 0BSD | Imports: 18 | Imported by: 0

README

NanoQ

NanoQ is a MySQL-powered task queue, implemented in ~400 lines of code.

While it can be used as-is, you are encouraged to copy-paste it into your project and customize it according to your tastes and needs.

Requirements

Due to its use of SKIP LOCKED, NanoQ requires MySQL 8.0 or newer / MariaDB 10.6.0 or newer.

Features

  1. Delayed tasks
  2. Unique tasks (automatic fingerprinting based on task type and payload)
  3. Per-task timeouts
  4. Retries (with exponential backoff and jitter)
  5. Processor with global and per-handler middleware

Failed and completed tasks are not retained in the database.

Usage

Add table.sql to your migrations.

Define a task and a handler to process it:

type recalculateStockPayload struct {
	ProductID string `json:"product_id"`
}

func NewRecalculateStockTask(productID string) nanoq.Task {
	payload, _ := json.Marshal(recalculateStockPayload{
		ProductID: productID,
	})
	return nanoq.NewTask("recalculate-stock", payload,
		nanoq.WithTimeout(15*time.Second),
		nanoq.WithScheduledIn(5*time.Minute),
	)
}

// This could also be a method on a Handler struct containing dependencies.
func RecalculateStock(logger *slog.Logger) nanoq.Handler {
	return func(ctx context.Context, t nanoq.Task) error {
		var payload recalculateStockPayload
		if err := json.Unmarshal(t.Payload, &payload); err != nil {
			return fmt.Errorf("json unmarshal: %v: %w", err, nanoq.ErrSkipRetry)
		}

		// Do your thing here.

		logger.Info("Task completed",
			slog.String("task_type", "recalculate-stock"),
			slog.String("product_id", payload.ProductID),
		)

		return nil
	}
}

Create a task (usually in an HTTP handler):

// Usually provided to the HTTP handler.
queueClient := nanoq.NewClient(db)

// The transaction (tx) usually already exists. Otherwise, queueClient.RunTransaction() can be used to start one.
t := NewRecalculateStockTask("my-product")
if err := queueClient.CreateTask(ctx, tx, t); err != nil && !errors.Is(err, nanoq.ErrDuplicateTask) {
	// Handle unexpected errors.
	// Ignores ErrDuplicateTask because multiple HTTP requests can require the same stock recalculation.
}

Finally, initialize the processor:

// logger is an existing *slog.Logger.
processor := nanoq.NewProcessor(nanoq.NewClient(db), logger)

// The default retry policy uses an exponential backoff with jitter,
// but callers can provide their own if necessary.
processor.RetryPolicy(func(t nanoq.Task) time.Duration {
	// First retry in 5s, every next retry in 1h.
	if t.Retries == 0 {
		return 5 * time.Second
	}
	return 1 * time.Hour
})
processor.OnError(func(ctx context.Context, t nanoq.Task, err error) {
	// Log each failed task.
	// Idea: Send to Sentry when t.Retries == t.MaxRetries.
	logger.Error(err.Error(),
		slog.String("task_type", t.Type),
		slog.String("attempt", fmt.Sprintf("%v/%v", t.Retries, t.MaxRetries)),
	)
})
processor.Handle("recalculate-stock", RecalculateStock(logger))

// Use as many workers as we have CPUs.
processor.Run(context.Background(), runtime.NumCPU(), 5*time.Second)

Documentation

Constants

This section is empty.

Variables

var (
	// ErrDuplicateTask indicates that an identical task already exists in the queue.
	// Tasks are considered identical if they have the same fingerprint.
	ErrDuplicateTask = errors.New("duplicate task")

	// ErrNoTasks indicates that no tasks are available for processing.
	ErrNoTasks = errors.New("no tasks available")

	// ErrSkipRetry indicates that the task should not be retried.
	ErrSkipRetry = errors.New("skip retry")

	// ErrTaskTimeout indicates that the task timeout has been exceeded.
	ErrTaskTimeout = errors.New("task timeout exceeded")
)

Functions

func DefaultRetryPolicy

func DefaultRetryPolicy(t Task) time.Duration

DefaultRetryPolicy uses an exponential base delay with jitter. Approximate delays for successive retries: 7s, 50s, 5min, 20min, 50min, 2h, 4h, 9h, 16h, 27h.
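
From the third value onward, the approximate delays listed above are roughly in line with a fifth-power curve in whole seconds (243s, 1024s, 3125s, and so on) plus some jitter. The sketch below illustrates that shape only; it is an inference from the listed values, not the package's source. The names `sketchBackoff` and `sketchRetryDelay`, the cap, and the jitter bound are all assumptions.

```go
package main

import (
	"math/rand"
	"time"
)

// sketchBackoff returns a base delay of (retries+1)^5 seconds.
// Hypothetical helper; the real formula used by nanoq may differ.
func sketchBackoff(retries uint8) time.Duration {
	n := int64(retries) + 1
	if n > 50 {
		n = 50 // Cap the input so the multiplication cannot overflow int64.
	}
	return time.Duration(n*n*n*n*n) * time.Second
}

// sketchRetryDelay adds random jitter in [0, maxJitter) to the base delay so
// that tasks which failed together do not all retry at the same instant.
func sketchRetryDelay(retries uint8, maxJitter time.Duration) time.Duration {
	base := sketchBackoff(retries)
	if maxJitter <= 0 {
		return base
	}
	return base + time.Duration(rand.Int63n(int64(maxJitter)))
}
```

A function with the `func(t Task) time.Duration` shape could wrap this by passing `t.Retries` and a jitter bound of its choosing.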

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a database-backed queue client.

func NewClient

func NewClient(db *sqlx.DB) *Client

NewClient creates a new client.

func (*Client) ClaimTask

func (c *Client) ClaimTask(ctx context.Context) (Task, error)

ClaimTask claims a task for processing.

Returns ErrNoTasks if no tasks are available.

func (*Client) CreateTask

func (c *Client) CreateTask(ctx context.Context, tx *sqlx.Tx, t Task) error

CreateTask creates the given task.

Expected to run in an existing transaction. Returns ErrDuplicateTask if a task with the same fingerprint already exists.

func (*Client) DeleteTask

func (c *Client) DeleteTask(ctx context.Context, t Task) error

DeleteTask deletes the given task.

func (*Client) ReleaseTask

func (c *Client) ReleaseTask(ctx context.Context, t Task) error

ReleaseTask releases the given task, allowing it to be claimed again.

func (*Client) RetryTask

func (c *Client) RetryTask(ctx context.Context, t Task, retryIn time.Duration) error

RetryTask schedules a retry of the given task.

func (*Client) RunTransaction

func (c *Client) RunTransaction(ctx context.Context, fn func(tx *sqlx.Tx) error) error

RunTransaction runs the given function in a transaction.

The transaction will be rolled back if the function returns an error. Otherwise, it will be committed.
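
The commit-or-rollback rule can be sketched without a database. This is a minimal illustration, not RunTransaction itself: the `tx` interface, `fakeTx`, `errInsertFailed`, and `runInTx` are hypothetical names, and the transaction is passed in rather than started by the helper.

```go
package main

import "errors"

// tx is the subset of *sql.Tx (and therefore *sqlx.Tx) that the sketch needs.
type tx interface {
	Commit() error
	Rollback() error
}

// fakeTx records which of Commit or Rollback was called.
type fakeTx struct{ committed, rolledBack bool }

func (f *fakeTx) Commit() error   { f.committed = true; return nil }
func (f *fakeTx) Rollback() error { f.rolledBack = true; return nil }

// errInsertFailed is a placeholder error for the example.
var errInsertFailed = errors.New("insert failed")

// runInTx calls fn, then commits on success or rolls back on error.
func runInTx(t tx, fn func(t tx) error) error {
	if err := fn(t); err != nil {
		// Keep the original error even if the rollback also fails.
		return errors.Join(err, t.Rollback())
	}
	return t.Commit()
}
```

Passing a function that returns `errInsertFailed` leaves `fakeTx.rolledBack` set and `committed` unset; returning nil does the opposite.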

type ErrorHandler

type ErrorHandler func(ctx context.Context, t Task, err error)

ErrorHandler handles a task processing error.

type Handler

type Handler func(ctx context.Context, t Task) error

Handler processes tasks of a specific type.

A handler should return nil if the task was processed successfully. If it returns an error, the task is retried after a delay until the maximum number of retries is reached.

Return ErrSkipRetry (or an error that wraps it) to skip any remaining retries and remove the task from the queue.
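
The rules above can be written as a small decision function. This is a sketch under assumptions, not the processor's code: `errSkipRetry` stands in for ErrSkipRetry, and `decide`, `wrapPermanent`, `errTemporary`, and the `outcome` type are hypothetical. Detecting the sentinel with `errors.Is` is what lets a handler wrap it with `%w` and still add context to the message.

```go
package main

import (
	"errors"
	"fmt"
)

// errSkipRetry stands in for nanoq.ErrSkipRetry so the sketch has no
// external dependencies.
var errSkipRetry = errors.New("skip retry")

// errTemporary is a placeholder for an ordinary, retryable failure.
var errTemporary = errors.New("temporary failure")

// outcome names what a processor could do after a handler returns.
type outcome string

const (
	outcomeDelete outcome = "delete" // Done, skipped, or out of retries.
	outcomeRetry  outcome = "retry"  // Failed with retries remaining.
)

// decide is a hypothetical helper that mirrors the documented rules.
func decide(err error, retries, maxRetries uint8) outcome {
	switch {
	case err == nil:
		return outcomeDelete
	case errors.Is(err, errSkipRetry):
		return outcomeDelete
	case retries >= maxRetries:
		return outcomeDelete
	default:
		return outcomeRetry
	}
}

// wrapPermanent marks err as non-retryable while keeping its message.
func wrapPermanent(err error) error {
	return fmt.Errorf("%v: %w", err, errSkipRetry)
}
```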

type Middleware

type Middleware func(next Handler) Handler

Middleware wraps a handler in order to run logic before/after it.
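
A minimal sketch of the wrapping pattern, using local stand-ins (`task`, `handler`, `middleware`) shaped like the package's types. The `trace`, `chain`, and `runExample` helpers are hypothetical, and the ordering shown (global middleware outermost) is an assumption; the documentation does not state the order in which middleware is applied.

```go
package main

import "context"

// task, handler, and middleware are local stand-ins shaped like
// nanoq.Task, nanoq.Handler, and nanoq.Middleware.
type task struct {
	Type string
}

type handler func(ctx context.Context, t task) error

type middleware func(next handler) handler

// trace returns a middleware that records an event before and after the
// wrapped handler runs.
func trace(name string, events *[]string) middleware {
	return func(next handler) handler {
		return func(ctx context.Context, t task) error {
			*events = append(*events, name+":before")
			err := next(ctx, t)
			*events = append(*events, name+":after")
			return err
		}
	}
}

// chain wraps h so that the first middleware in ms is the outermost one.
func chain(h handler, ms ...middleware) handler {
	for i := len(ms) - 1; i >= 0; i-- {
		h = ms[i](h)
	}
	return h
}

// runExample runs a no-op handler behind two trace middleware and returns
// the recorded order of events.
func runExample() []string {
	var events []string
	h := chain(
		func(ctx context.Context, t task) error {
			events = append(events, "handler")
			return nil
		},
		trace("global", &events),
		trace("per-handler", &events),
	)
	_ = h(context.Background(), task{Type: "recalculate-stock"})
	return events
}
```

The same shape suits logging, metrics, or panic recovery: do the setup, call `next`, then inspect the returned error.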

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor represents the queue processor.

func NewProcessor

func NewProcessor(client *Client, logger *slog.Logger) *Processor

NewProcessor creates a new processor.

func (*Processor) Handle

func (p *Processor) Handle(taskType string, h Handler, ms ...Middleware)

Handle registers the handler for a task type.

func (*Processor) OnError

func (p *Processor) OnError(h ErrorHandler)

OnError registers the given error handler.

func (*Processor) RetryPolicy

func (p *Processor) RetryPolicy(rp RetryPolicy)

RetryPolicy registers the given retry policy.

func (*Processor) Run

func (p *Processor) Run(ctx context.Context, concurrency int, shutdownTimeout time.Duration)

Run starts the processor and blocks until a shutdown signal (SIGINT/SIGTERM) is received.

Once the shutdown signal is received, workers stop claiming new tasks. The tasks that are still being processed are then given until shutdownTimeout to complete, after which they are stopped (via canceled context).
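
The two-phase shutdown described above can be sketched with two contexts: one that tells workers to stop picking up work, and one that interrupts work still running after the timeout. This is an illustration under assumptions, not the processor's implementation; `runWorkers` and `demo` are hypothetical, and the sketch assumes the work function returns promptly once its context is canceled.

```go
package main

import (
	"context"
	"sync"
	"time"
)

// runWorkers starts n workers that call work repeatedly until stop is
// canceled. It then waits up to shutdownTimeout for in-flight work and, if
// that is not enough, cancels the context passed to work. It reports whether
// all workers finished before the timeout.
func runWorkers(stop context.Context, n int, shutdownTimeout time.Duration, work func(ctx context.Context)) bool {
	workCtx, cancelWork := context.WithCancel(context.Background())
	defer cancelWork()

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for stop.Err() == nil {
				work(workCtx)
			}
		}()
	}

	<-stop.Done() // Shutdown requested: workers stop picking up new work.

	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		return true
	case <-time.After(shutdownTimeout):
		cancelWork() // Interrupt work that is still running.
		<-done
		return false
	}
}

// demo requests shutdown after 10ms and reports whether workers whose work
// takes taskMillis finished within timeoutMillis.
func demo(taskMillis, timeoutMillis int) bool {
	stop, cancel := context.WithCancel(context.Background())
	defer cancel()
	time.AfterFunc(10*time.Millisecond, cancel)
	return runWorkers(stop, 2, time.Duration(timeoutMillis)*time.Millisecond, func(ctx context.Context) {
		select {
		case <-time.After(time.Duration(taskMillis) * time.Millisecond):
		case <-ctx.Done():
		}
	})
}
```

In a real program the `stop` context would typically come from `signal.NotifyContext` in the standard `os/signal` package, registered for SIGINT and SIGTERM.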

func (*Processor) Use

func (p *Processor) Use(m Middleware)

Use registers a middleware that runs for all task types.

type RetryPolicy

type RetryPolicy func(t Task) time.Duration

RetryPolicy determines the retry delay for a given task.

type Task

type Task struct {
	ID             string     `db:"id"`
	Fingerprint    string     `db:"fingerprint"`
	Type           string     `db:"type"`
	Payload        []byte     `db:"payload"`
	Retries        uint8      `db:"retries"`
	MaxRetries     uint8      `db:"max_retries"`
	TimeoutSeconds int32      `db:"timeout_seconds"`
	CreatedAt      time.Time  `db:"created_at"`
	ScheduledAt    time.Time  `db:"scheduled_at"`
	ClaimedAt      *time.Time `db:"claimed_at"`
}

Task represents a task.

func NewTask

func NewTask(taskType string, payload []byte, opts ...TaskOption) Task

NewTask creates a new task.

func (Task) Timeout

func (t Task) Timeout() time.Duration

Timeout returns the task timeout as a time.Duration.

type TaskOption

type TaskOption func(t *Task)

TaskOption represents a task option.
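
TaskOption follows the functional options pattern: each option is a function that mutates the task after defaults have been set. The sketch below uses local stand-ins (`task`, `taskOption`, `newTask`, and the `with...` helpers are hypothetical) and borrows the documented defaults of 10 retries and a 60s timeout; scheduling for "now" by default is an assumption.

```go
package main

import "time"

// task is a trimmed-down stand-in for nanoq.Task.
type task struct {
	Type           string
	MaxRetries     uint8
	TimeoutSeconds int32
	ScheduledAt    time.Time
}

// taskOption mirrors the shape of nanoq.TaskOption.
type taskOption func(t *task)

// withMaxRetries sets how many times the task may be retried.
func withMaxRetries(n uint8) taskOption {
	return func(t *task) { t.MaxRetries = n }
}

// withTimeout stores the timeout as whole seconds.
func withTimeout(d time.Duration) taskOption {
	return func(t *task) { t.TimeoutSeconds = int32(d.Seconds()) }
}

// withScheduledIn schedules the task d from now.
func withScheduledIn(d time.Duration) taskOption {
	return func(t *task) { t.ScheduledAt = time.Now().Add(d) }
}

// newTask applies defaults first, then each option in order, so later
// options override earlier ones.
func newTask(taskType string, opts ...taskOption) task {
	t := task{
		Type:           taskType,
		MaxRetries:     10,
		TimeoutSeconds: 60,
		ScheduledAt:    time.Now(),
	}
	for _, opt := range opts {
		opt(&t)
	}
	return t
}
```

The pattern keeps the constructor signature stable as options are added, which suits a package meant to be copied and customized.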

func WithFingerprintData

func WithFingerprintData(data []byte) TaskOption

WithFingerprintData provides the data used to fingerprint the task.

By default, the entire task payload is used.
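
To show why two tasks of the same type with the same payload collide, here is one way a fingerprint could be derived from the task type and the fingerprint data. The hash function, separator, and hex encoding are assumptions for illustration; the package's actual fingerprint format is not documented here, and `fingerprint` is a hypothetical helper.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
)

// fingerprint hashes the task type together with the fingerprint data.
// Identical inputs always produce the same string, which is what allows a
// unique index on the fingerprint column to reject duplicates.
func fingerprint(taskType string, data []byte) string {
	h := sha256.New()
	h.Write([]byte(taskType))
	h.Write([]byte{0}) // Separator so ("ab", "c") and ("a", "bc") differ.
	h.Write(data)
	return hex.EncodeToString(h.Sum(nil))
}
```

Supplying narrower fingerprint data, for example only a product ID, would make tasks with otherwise different payloads count as duplicates.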

func WithMaxRetries

func WithMaxRetries(maxRetries uint8) TaskOption

WithMaxRetries allows the task to be retried the given number of times.

Defaults to 10. Use 0 to disallow retries.

func WithScheduledAt

func WithScheduledAt(scheduledAt time.Time) TaskOption

WithScheduledAt schedules the task at the given time.

func WithScheduledIn

func WithScheduledIn(scheduledIn time.Duration) TaskOption

WithScheduledIn schedules the task after the given duration.

func WithTimeout

func WithTimeout(timeout time.Duration) TaskOption

WithTimeout sets the task timeout.

Must be at least 1s. Defaults to 60s.
