bgjob

package module
v1.4.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 1, 2024 License: MIT Imports: 9 Imported by: 1

README

bgjob

Build and test codecov Go Report Card

Tiny library to handle background jobs.

Uses PostgreSQL to organize job queues.

Highly inspired by gue

Features

  • Durable job storage
  • At-least-ones execution
  • Job uniqueness
  • Delayed execution
  • Retries with dynamic timeout
  • Enqueuing jobs in transaction
  • DLQ
  • Zero dependencies (database/sql is used, to log events you can use Observer)

Why ?

  • You need flexible and robust job processing tool
  • You already use PostgreSQL
  • You require strong guaranties for task processing and consistency
  • You have a quite small load. Queues on database usually can handle around 1000 rps

State

  • The package has been used in production for 3 years with a small load

Install

  1. go get github.com/txix-open/bgjob
  2. Add to your db migration tool sql from migration/init.sql

Complete example

package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log"
	"runtime"
	"time"

	_ "github.com/jackc/pgx/v5/stdlib"

	"github.com/txix-open/bgjob"
)

type Observer struct {
	bgjob.NoopObserver //"extend" NoopObserver to override method you need
}

func (o Observer) WorkerError(ctx context.Context, err error) {
	log.Printf("worker: %v", err)
}

func main() {
	dsn := "postgres://test:test@localhost:5432/test"
	db, err := sql.Open("pgx", dsn) //be sure you add tables and indexes from migration/init.sql
	if err != nil {
		panic(err)
	}

	handleComplete := bgjob.HandlerFunc(func(ctx context.Context, job bgjob.Job) bgjob.Result {
		fmt.Printf("%s\n", job.Arg)
		return bgjob.Complete() //complete job, job will be deleted
	})
	handleRetry := bgjob.HandlerFunc(func(ctx context.Context, job bgjob.Job) bgjob.Result {
		if job.Attempt == 2 {
			return bgjob.Complete()
		}
		//here you have attempts counter, you can easily do backoff retries
		return bgjob.Retry(1*time.Second, errors.New("some error"))
	})
	handleMoveToDlq := bgjob.HandlerFunc(func(ctx context.Context, job bgjob.Job) bgjob.Result {
		//you can move the job to permanent storage if you can't handle it
		return bgjob.MoveToDlq(errors.New("move to dlq"))
	})

	cli := bgjob.NewClient(bgjob.NewPgStore(db))

	//if handler for job type wasn't provided, job will be moved to dlq
	handler := bgjob.NewMux().
		Register("complete_me", handleComplete).
		Register("retry_me", handleRetry).
		Register("move_to_dlq", handleMoveToDlq)
	queueName := "test"
	observer := Observer{}
	worker := bgjob.NewWorker(
		cli,
		queueName,
		handler,
		bgjob.WithObserver(observer),            //default noop
		bgjob.WithConcurrency(runtime.NumCPU()), //default 1
		bgjob.WithPollInterval(500*time.Millisecond), //default 1s
	)
	ctx := context.Background()
	worker.Run(ctx) //call ones, non-blocking

	err = cli.Enqueue(ctx, bgjob.EnqueueRequest{
		Id:    "test", //you can provide you own id
		Queue: queueName,
		Type:  "complete_me",
		Arg:   []byte(`{"simpleJson": 1}`), //it can be json or protobuf or a simple string
		Delay: 1 * time.Second,             //you can delay job execution
	})
	if err != nil {
		panic(err)
	}

	err = cli.Enqueue(ctx, bgjob.EnqueueRequest{
		Queue: queueName,
		Type:  "retry_me",
		//those fields must be specified
	})
	if err != nil {
		panic(err)
	}

	err = cli.Enqueue(ctx, bgjob.EnqueueRequest{
		Queue: queueName,
		Type:  "move_to_dlq",
	})
	if err != nil {
		panic(err)
	}

	time.Sleep(1 * time.Second)
	worker.Shutdown() //graceful shutdown, call ones
}


Enqueue job in transaction

  • bgjob.Enqueue and bgjob.BulkEnqueue can help
package main

func enqueueInTx(db *sql.DB) (err error) {
	tx, err := db.Begin()
	if err != nil {
		return fmt.Errorf("begin tx: %w", err)
	}
	defer func() {
		if err != nil {
			_ = tx.Rollback()
		}
	}()
	
	// YOUR BUSINESS TRANSACTION HERE
	
	err = bgjob.Enqueue(context.Background(), tx, bgjob.EnqueueRequest{
		Queue: "work",
		Type:  "send_email",
	})
	if err != nil {
		return fmt.Errorf("begin tx: %w", err)
	}
	
	err = tx.Commit()
	if err != nil {
		return fmt.Errorf("commit tx: %w", err)
	}
	
	return nil
}

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrQueueIsRequired = errors.New("queue is required")
	ErrTypeIsRequired  = errors.New("type is required")
	ErrEmptyQueue      = errors.New("queue is empty")
	ErrJobAlreadyExist = errors.New("job already exist")

	ErrUnknownType = errors.New("unknown type")
)

Functions

func BulkEnqueue

func BulkEnqueue(ctx context.Context, e ExecerContext, list []EnqueueRequest) error

func Enqueue

func Enqueue(ctx context.Context, e ExecerContext, req EnqueueRequest) error

func NewPgStore

func NewPgStore(db *sql.DB) *pgStore

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(store Store) *Client

func (*Client) BulkEnqueue

func (c *Client) BulkEnqueue(ctx context.Context, list []EnqueueRequest) error

func (*Client) Do

func (c *Client) Do(ctx context.Context, queue string, f func(ctx context.Context, job Job) Result) error

func (*Client) Enqueue

func (c *Client) Enqueue(ctx context.Context, req EnqueueRequest) error

type EnqueueRequest

type EnqueueRequest struct {
	Id    string        //optional
	Queue string        //required
	Type  string        //required
	Arg   []byte        //optional
	Delay time.Duration //optional
}

type ExecerContext

type ExecerContext interface {
	ExecContext(ctx context.Context, s string, args ...interface{}) (sql.Result, error)
}

type Handler

type Handler interface {
	Handle(ctx context.Context, job Job) Result
}

type HandlerFunc

type HandlerFunc func(ctx context.Context, job Job) Result

func (HandlerFunc) Handle

func (h HandlerFunc) Handle(ctx context.Context, job Job) Result

type Job

type Job struct {
	Id        string
	Queue     string
	Type      string
	Arg       []byte
	Attempt   int32
	LastError *string
	NextRunAt int64
	CreatedAt time.Time
	UpdatedAt time.Time
}

type Mux

type Mux struct {
	// contains filtered or unexported fields
}

func NewMux

func NewMux() *Mux

func (*Mux) Handle

func (m *Mux) Handle(ctx context.Context, job Job) Result

func (*Mux) Register

func (m *Mux) Register(jobType string, handler Handler) *Mux

type NoopObserver

type NoopObserver struct {
}

func NewNoopObserver

func NewNoopObserver() NoopObserver

func (NoopObserver) JobCompleted

func (n NoopObserver) JobCompleted(ctx context.Context, job Job)

func (NoopObserver) JobMovedToDlq

func (n NoopObserver) JobMovedToDlq(ctx context.Context, job Job, err error)

func (NoopObserver) JobRescheduled

func (n NoopObserver) JobRescheduled(ctx context.Context, job Job, after time.Duration)

func (NoopObserver) JobStarted

func (n NoopObserver) JobStarted(ctx context.Context, job Job)

func (NoopObserver) JobWillBeRetried

func (n NoopObserver) JobWillBeRetried(ctx context.Context, job Job, after time.Duration, err error)

func (NoopObserver) QueueIsEmpty

func (n NoopObserver) QueueIsEmpty(ctx context.Context)

func (NoopObserver) WorkerError

func (n NoopObserver) WorkerError(ctx context.Context, err error)

type Observer

type Observer interface {
	JobStarted(ctx context.Context, job Job)
	JobCompleted(ctx context.Context, job Job)
	JobRescheduled(ctx context.Context, job Job, after time.Duration)
	JobWillBeRetried(ctx context.Context, job Job, after time.Duration, err error)
	JobMovedToDlq(ctx context.Context, job Job, err error)
	QueueIsEmpty(ctx context.Context)
	WorkerError(ctx context.Context, err error)
}

type Result

type Result struct {
	// contains filtered or unexported fields
}

func Complete

func Complete() Result

func MoveToDlq

func MoveToDlq(err error) Result

func Reschedule

func Reschedule(after time.Duration) Result

func Retry

func Retry(after time.Duration, err error) Result

type Store

type Store interface {
	Acquire(ctx context.Context, queue string, tx func(tx Tx) error) error
	BulkInsert(ctx context.Context, jobs []Job) error
}

type Tx

type Tx interface {
	Job() Job
	Update(ctx context.Context, id string, attempt int32, lastError string, nextRunAt int64) error
	UpdateNextRun(ctx context.Context, id string, nextRunAt int64) error
	Delete(ctx context.Context, id string) error
	SaveInDlq(ctx context.Context, job Job) error
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(cli *Client, queue string, handler Handler, opts ...WorkerOption) *Worker

func (*Worker) Run

func (w *Worker) Run(ctx context.Context)

func (*Worker) Shutdown

func (w *Worker) Shutdown()

type WorkerOption

type WorkerOption func(w *Worker)

func WithConcurrency

func WithConcurrency(value int) WorkerOption

func WithObserver

func WithObserver(observer Observer) WorkerOption

func WithPollInterval

func WithPollInterval(interval time.Duration) WorkerOption

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL