taskqueue

package module
v0.0.0-...-00b9358 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 16, 2024 License: MIT Imports: 12 Imported by: 0

README

TaskQueue

A flexible and robust background task queue system for Go applications, built with PostgreSQL and GORM.

Features

  • Multiple Queue Support: Organize tasks in different queues
  • Priority Levels: Three priority levels (Low, Normal, High)
  • Task Dependencies: Tasks can depend on other tasks
  • Flexible Scheduling:
    • Immediate execution
    • Delayed execution (executeAt)
    • Periodic tasks
  • Task Management:
    • Task cancellation
    • Queue pause/resume
    • Task status tracking
  • Worker Management:
    • Configurable number of workers
    • Dynamic worker scaling
    • Graceful shutdown
  • Advanced Features:
    • Pre and Post execution hooks
    • Task retention management
    • Database-backed persistence
    • JSON payload support
    • Task dependencies resolution
  • Timeout & Retry Handling:
    • Task-level execution timeout
    • Exponential backoff retry strategy
    • Configurable retry parameters
    • Error tracking and reporting

Requirements

  • Go 1.16 or higher
  • PostgreSQL 10 or higher
  • Dependencies:
    github.com/lib/pq
    gorm.io/gorm
    gorm.io/driver/postgres
    

Installation

go get github.com/choirulanwar/taskqueue

Quick Start

package main

import (
    "context"
    "time"
    "github.com/choirulanwar/taskqueue"
    "gorm.io/gorm"
    "gorm.io/driver/postgres"
)

func main() {
    // Connect to PostgreSQL
    db, err := gorm.Open(postgres.Open("host=localhost user=postgres dbname=taskqueue"), &gorm.Config{})
    if err != nil {
        panic(err)
    }

    // Create configuration
    config := &taskqueue.Config{
        DB:              db,
        NumWorkers:      3,
        RetentionPeriod: 24 * time.Hour,
        DefaultTimeout:  5 * time.Minute,
        DefaultMaxRetries: 3,
        DefaultInitialBackoff: 1 * time.Second,
        DefaultMaxBackoff: 1 * time.Hour,
        DefaultBackoffFactor: 2.0,
    }

    // Create client and dispatcher
    client, _ := taskqueue.NewClient(config)
    dispatcher, _ := taskqueue.NewDispatcher(config)

    // Start the dispatcher
    ctx := context.Background()
    dispatcher.Start(ctx)

    // Enqueue a task
    payload := []byte(`{"key": "value"}`)
    task, err := client.EnqueueTask(ctx, "default_queue", payload,
        taskqueue.WithPriority(taskqueue.PriorityHigh),
        taskqueue.WithTimeout(30 * time.Second),
        taskqueue.WithRetry(5, 2*time.Second, 30*time.Minute, 2.0),
    )
}

Usage Examples

Basic Task Enqueuing
// Simple task
task, err := client.EnqueueTask(ctx, "email_queue", payload)

// High priority task
task, err := client.EnqueueTask(ctx, "email_queue", payload,
    taskqueue.WithPriority(taskqueue.PriorityHigh))

// Delayed task
executeAt := time.Now().Add(1 * time.Hour)
task, err := client.EnqueueTask(ctx, "email_queue", payload,
    taskqueue.WithExecuteAt(executeAt))
Task Dependencies
// Task that depends on another task
task, err := client.EnqueueTask(ctx, "process_queue", payload,
    taskqueue.WithDependencies([]string{previousTaskID}))
Periodic Tasks
// Schedule a task to run every hour
task, err := client.SchedulePeriodicTask(ctx, "cleanup_queue", payload,
    1*time.Hour,
    taskqueue.WithRetentionTime(7*24*time.Hour))
Queue Management
// Pause a queue
err := client.PauseQueue(ctx, "email_queue")

// Resume a queue
err := client.ResumeQueue(ctx, "email_queue")

// List tasks in a queue
tasks, err := client.ListTasks(ctx, "email_queue")

// Cancel a specific task
err := client.CancelTask(ctx, taskID)
Worker Management
// Add a new worker
err := dispatcher.AddWorker()

// Remove a worker
err := dispatcher.RemoveWorker()
Using Hooks
config := &taskqueue.Config{
    DB:         db,
    NumWorkers: 3,
    PreHooks: []taskqueue.Hook{
        func(ctx context.Context, task *taskqueue.Task) error {
            log.Printf("Processing task: %s", task.ID)
            return nil
        },
    },
    PostHooks: []taskqueue.Hook{
        func(ctx context.Context, task *taskqueue.Task) error {
            log.Printf("Completed task: %s", task.ID)
            return nil
        },
    },
}
Task Retention
// Set retention time when enqueueing task
task, err := client.EnqueueTask(ctx, "email_queue", payload,
    taskqueue.WithRetentionTime(24*time.Hour))

// Cleanup old tasks
err := client.CleanupTasks(ctx)
Timeout and Retry Mechanism
Timeout Handling
  • Each task can have its own execution timeout
  • If not specified, uses DefaultTimeout from config
  • Task is automatically cancelled when timeout is reached
  • Failed tasks can be retried based on retry configuration
Retry with Exponential Backoff
  • Configurable retry attempts (MaxRetries)
  • Initial delay between retries (InitialBackoff)
  • Maximum delay cap (MaxBackoff)
  • Exponential growth factor (BackoffFactor)

Example backoff calculation with InitialBackoff=1s and BackoffFactor=2.0:

Retry #1: 1s
Retry #2: 2s
Retry #3: 4s
Retry #4: 8s
Retry #5: 16s

Task Status Lifecycle

  1. TaskStatusPending: Initial state when task is created
  2. TaskStatusRunning: Task is being processed by a worker
  3. TaskStatusCompleted: Task has been successfully completed
  4. TaskStatusFailed: Task failed during execution
  5. TaskStatusCanceled: Task was manually canceled

Best Practices

  1. Error Handling: Always check returned errors from all operations
  2. Graceful Shutdown: Implement proper shutdown handling in your application
  3. Monitoring: Use hooks to implement monitoring and logging
  4. Task Payload: Keep task payloads small and structured
  5. Dependencies: Avoid creating circular dependencies between tasks
  6. Retention: Set appropriate retention periods based on your needs
  7. Timeout Configuration: Set reasonable timeouts based on task complexity
  8. Retry Strategy: Use shorter initial backoff for quick-retry tasks

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Index

Constants

View Source
const (
	PriorityLow    = 1
	PriorityNormal = 2
	PriorityHigh   = 3
)

Priority levels for tasks

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client interface {
	// Task management
	EnqueueTask(ctx context.Context, queueName string, payload []byte, opts ...TaskOption) (*Task, error)
	CancelTask(ctx context.Context, taskID string) error
	GetTask(ctx context.Context, taskID string) (*Task, error)
	ListTasks(ctx context.Context, queueName string) ([]*Task, error)

	// Queue management
	PauseQueue(ctx context.Context, queueName string) error
	ResumeQueue(ctx context.Context, queueName string) error

	// Periodic tasks
	SchedulePeriodicTask(ctx context.Context, queueName string, payload []byte, interval time.Duration, opts ...TaskOption) (*Task, error)

	// Cleanup
	CleanupTasks(ctx context.Context) error

	// GetConfig returns the current configuration
	GetConfig() *Config
}

Client interface defines methods for interacting with the task queue

func NewClient

func NewClient(config *Config) (Client, error)

NewClient creates a new task queue client

type Config

type Config struct {
	DB              *gorm.DB
	NumWorkers      int
	RetentionPeriod time.Duration
	PreHooks        []Hook
	PostHooks       []Hook

	ServerID              string        // Unique ID for this server instance
	LockTTL               time.Duration // How long a task lock is valid
	DefaultTimeout        time.Duration
	DefaultMaxRetries     int
	DefaultInitialBackoff time.Duration
	DefaultMaxBackoff     time.Duration
	DefaultBackoffFactor  float64
	// contains filtered or unexported fields
}

Config represents the configuration for the task queue

type Dispatcher

type Dispatcher interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	AddWorker() error
	RemoveWorker() error
}

Dispatcher interface defines methods for task dispatching and worker management

func NewDispatcher

func NewDispatcher(config *Config) (Dispatcher, error)

NewDispatcher creates a new task dispatcher

type Hook

type Hook func(ctx context.Context, task *Task) error

Hook represents a function that can be executed before or after task execution

type Task

type Task struct {
	ID            string          `gorm:"primarykey"`
	QueueName     string          `gorm:"index"`
	Status        TaskStatus      `gorm:"index"`
	Priority      int             `gorm:"index"`
	Payload       json.RawMessage `gorm:"type:jsonb"`
	Dependencies  pq.StringArray  `gorm:"type:text[]"` // Using pq.StringArray for proper PostgreSQL array handling
	ExecuteAt     *time.Time      `gorm:"index"`
	RetentionTime *time.Time
	CreatedAt     time.Time
	UpdatedAt     time.Time
	DeletedAt     gorm.DeletedAt `gorm:"index"`
	LockedBy      string         `gorm:"index"` // Server ID that locked this task
	LockedAt      *time.Time     // When the task was locked
	LockTimeout   *time.Time     // When the lock expires

	// Timeout settings
	Timeout time.Duration // Maximum execution time for the task

	// Retry settings
	RetryCount     int           `gorm:"default:0"` // Current retry attempt
	MaxRetries     int           `gorm:"default:3"` // Maximum number of retry attempts
	LastError      string        // Last error message
	NextRetryAt    *time.Time    `gorm:"index"` // When to retry next
	InitialBackoff time.Duration // Initial backoff duration
	MaxBackoff     time.Duration // Maximum backoff duration
	BackoffFactor  float64       // Multiplier for exponential backoff
}

Task represents a background task

type TaskOption

type TaskOption func(*Task)

TaskOption represents options for task creation

func WithDependencies

func WithDependencies(dependencies []string) TaskOption

WithDependencies sets the dependencies for a task

func WithExecuteAt

func WithExecuteAt(executeAt time.Time) TaskOption

WithExecuteAt sets the execution time for a task

func WithPriority

func WithPriority(priority int) TaskOption

WithPriority sets the priority of a task

func WithRetentionTime

func WithRetentionTime(retention time.Duration) TaskOption

WithRetentionTime sets the retention time for a task

func WithRetry

func WithRetry(maxRetries int, initialBackoff, maxBackoff time.Duration, backoffFactor float64) TaskOption

WithRetry sets the retry configuration for a task

func WithTimeout

func WithTimeout(timeout time.Duration) TaskOption

WithTimeout sets the maximum execution time for a task

type TaskStatus

type TaskStatus string

TaskStatus represents the current state of a task

const (
	TaskStatusPending   TaskStatus = "pending"
	TaskStatusRunning   TaskStatus = "running"
	TaskStatusCompleted TaskStatus = "completed"
	TaskStatusFailed    TaskStatus = "failed"
	TaskStatusCanceled  TaskStatus = "canceled"
)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL