lq

package module
v0.0.0-...-8c9f8af Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 22, 2024 License: MIT Imports: 11 Imported by: 0

README

lq

lq is a lightweight queue runner package for Go, inspired by the Laravel queue. Because it is built on GORM, it supports every database GORM supports: MySQL, PostgreSQL, SQLite, SQL Server and TiDB (https://gorm.io/docs/connecting_to_the_database.html). Workers load queued jobs by pulling them from the database.

Features

  • Supports MySQL, PostgreSQL, SQLite, SQL Server and TiDB, since it is based on GORM
  • Supports multiple workers
  • Graceful shutdown

Sample Implementation

package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	lq "github.com/oneapplab/lq"
)

// main is the queue command. It works similarly to the Laravel queue runner:
// it listens on the jobs database and hands jobs to a pool of workers until
// the process receives SIGINT or SIGTERM, then shuts the workers down.
//
// Flags:
//
//	-sqlite  path of the SQLite database file (default <cwd>/storage/jobs.db)
//	-worker  number of workers, 0-255 (default 5)
func main() {
	root, err := os.Getwd()
	if err != nil {
		log.Fatalf("resolving working directory: %v", err)
	}

	sqlitePtr := flag.String("sqlite", filepath.Join(root, "storage", "jobs.db"), "path of the sqlite database file")
	workerPtr := flag.Uint("worker", 5, "number of workers (0-255)")
	flag.Parse()

	// The processor takes the worker count as a uint8. A plain conversion
	// would silently wrap around (256 -> 0), so reject out-of-range values.
	if *workerPtr > 255 {
		log.Fatalf("invalid -worker value %d: must not exceed 255", *workerPtr)
	}

	// Cancel ctx on SIGINT/SIGTERM for graceful shutdown. SIGKILL is not
	// listed because it cannot be caught by a program.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Init accepts either a *gorm.DB or, when that is nil, the path of the
	// file in which the sqlite database should be stored.
	config := lq.Init(nil, *sqlitePtr, 5, 10)
	job := (&lq.Job{}).Init(config)

	// Define how to process jobs
	jobProcessor := lq.NewJobProcessor(
		&ctx,
		GenericHandler,    // handler
		"default",         // queue
		uint8(*workerPtr), // worker count from -worker
		config,            // configuration returned by lq.Init
	)

	// Re-run failed jobs with max attempts 5 (each failed job with fewer
	// than 5 attempts runs again).
	job.DispatchFailedJobs("default", 5)

	// sample:
	// Add some jobs to the queue
	for i := 0; i < 100; i++ {
		_, err := job.Create("default", "test-action", fmt.Sprintf(`{"task":"send_another_email","user_id":%d}`, i+1), time.Now())

		if err != nil {
			fmt.Printf("Failed to add job: %v\n", err)
		}
	}

	// Start processing jobs in the background and block until a shutdown
	// signal cancels ctx.
	jobProcessor.Start()
	<-ctx.Done()
	jobProcessor.Stop()
}

// GenericHandler is the dispatcher handed to the job processor: it inspects
// the job's Handler name and forwards the job to the matching function.
func GenericHandler(job lq.JobModel, workerID uint8) error {
	if job.Handler == "test-handler" {
		return TestHandler(job, workerID)
	}

	// Any other handler name is routed to TestHandler as well.
	return TestHandler(job, workerID)
}

// TestHandler is a sample job handler: it pauses for three seconds, logs the
// job ID together with the ID of the worker that handled it, and reports
// success.
func TestHandler(job lq.JobModel, workerID uint8) error {
	const pause = 3 * time.Second
	time.Sleep(pause)

	jobID := job.ID.String()
	log.Printf("--------- Test Handled: ---- %s ----- worker ID: %d", jobID, workerID)

	return nil
}

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	// contains filtered or unexported fields
}

func GetConfig

func GetConfig() (*Config, error)

func Init

func Init(db *gorm.DB, sqliteDBPath string, sleep uint8, interval uint16) *Config

type Job

type Job struct {
	// contains filtered or unexported fields
}

func (*Job) Create

func (j *Job) Create(
	queue string,
	handler string,
	payload string,
	available time.Time,
) (*JobModel, error)

Create creates a new job in the database

func (*Job) DeleteOlderJobs

func (j *Job) DeleteOlderJobs(
	date time.Time,
)

DeleteOlderJobs deletes jobs that are older than the passed date.

func (*Job) DispatchFailedJobs

func (j *Job) DispatchFailedJobs(
	queue string,
	maxAttempts int,
)

DispatchFailedJobs updates failed jobs so that workers run them again.

func (*Job) Init

func (j *Job) Init(config *Config) *Job

type JobModel

// JobModel is the GORM model for a single queued job.
type JobModel struct {
	// ID is the primary key, stored with the uuid column type.
	ID          uuid.UUID `gorm:"primaryKey;type:uuid;"`
	// Queue is the name of the queue the job belongs to.
	// NOTE(review): GORM separates tag settings with ';'. The ',' between
	// index and default here probably keeps both from being applied — confirm.
	Queue       string    `gorm:"index,default:'default'"`
	// Handler is the handler name; the README sample switches on it to pick
	// the function that processes the job.
	Handler     string
	// Payload is the job payload, stored as longtext (the README sample
	// passes a JSON string).
	Payload     string `gorm:"type:longtext"`
	// Error is stored as longtext; presumably the message of the last
	// failure — verify against the processor.
	Error       string `gorm:"type:longtext"`
	// Attempts defaults to 0; presumably the number of runs so far — confirm.
	Attempts    uint8  `gorm:"default:0"`
	// ReservedAt is nullable; presumably the time a worker picked the job
	// up — TODO confirm meaning and unit.
	ReservedAt  sql.NullInt64
	// FailedAt is nullable; presumably the time the job failed — TODO
	// confirm meaning and unit.
	FailedAt    sql.NullInt64
	// AvailableAt is presumably derived from the `available` time passed to
	// Job.Create — TODO confirm unit.
	AvailableAt int64
	// CreatedAt is presumably the creation timestamp — TODO confirm unit.
	CreatedAt   int64
}

func (JobModel) TableName

func (JobModel) TableName() string

type JobProcessor

// JobProcessor holds the configuration for processing jobs with a worker
// pool. Instances are created with NewJobProcessor.
type JobProcessor struct {
	DB      *gorm.DB                                 // database instance
	Handler func(job JobModel, workerId uint8) error // Function to process each job
	Queue   string                                   // The queue to process
	Workers uint8                                    // Number of concurrent workers
	// contains filtered or unexported fields
}

JobProcessor holds the configuration for processing jobs with a worker pool

func NewJobProcessor

func NewJobProcessor(
	ctx *context.Context,
	handler func(job JobModel, workerId uint8) error,
	queue string,
	workers uint8,
	config *Config,
) *JobProcessor

NewJobProcessor creates a new instance of the job processor

func (*JobProcessor) Start

func (p *JobProcessor) Start()

Start starts the worker pool and begins processing jobs concurrently.

func (*JobProcessor) Stop

func (p *JobProcessor) Stop()

Stop stops the job processor by closing the job channel and waiting for all workers to finish

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL