yaqpg - Yet Another Queue for PostgreSQL

YAQPg is a simple PostgreSQL locking (and skipping) queue for local use. It is pronounced "yack-pee-gee" -- or jákpídzí or อยากผีจี if you prefer.

It is intended for small, simple workloads. If you want to build a system with a queue that might one day expand to use RabbitMQ or Amazon SQS, but right now you just want to run it on a single host, YAQPg might be of help. If you want to run a bigger queue on your PostgreSQL database, and you know what you're doing, you should consider using the original pgq. If you want to try a different, possibly more mature implementation of a PostgreSQL queue in Go, there are many packages available.

The idea came (to me) from these two authors:

David Christensen

Andrew Stuart

This implementation is biased towards what we consider the "normal" use-case at TONSAI LLC. Your definition of "normal" may vary!

WARNING! ALPHA SOFTWARE!

This package is new (as of June 2023) and has not been tested much. Like all software, it probably contains bugs, and like all new software it probably contains a lot of them. 🪲🪲🪲

Example Usage


package main

import (
    "context"
    "log"
    "time"

    "github.com/biztos/yaqpg"
)

func main() {

    queue := yaqpg.MustStartNamedQueue("example")
    if err := queue.Add("ex1", []byte("any payload")); err != nil {
        log.Fatal(err)
    }
    if err := queue.AddDelayed("ex2", []byte("later payload"), time.Second); err != nil {
        log.Fatal(err)
    }
    proc := yaqpg.FunctionProcessor{
        func(ctx context.Context, item *yaqpg.Item) error {
            log.Println("processing", item.Ident, string(item.Payload))
            return nil
        }}
    queue.LogCounts()       // shows 1 ready, 1 pending
    time.Sleep(time.Second) // wait out the pending item
    if err := queue.Process(2, proc); err != nil {
        log.Fatal(err)
    }
    queue.LogCounts() // shows queue is empty

}

How It Works

YAQPg inserts items into a queue table in your PostgreSQL database. Within a queue table, there can be many named queues.

Each item consists of an Identifier and a binary Payload. An item also has a ReadyAt timestamp, indicating the earliest time at which it can be processed; and an Attempts count to allow giving up.

When you Process items in the queue, each item is handled asynchronously by the Processor's Process function, which receives the Item and a timeout context. If that function returns an error, processing is assumed to have failed and the item is released back into the queue, subject to the MaxAttempts value of the Queue.

If the function returns nil or MaxAttempts is exceeded, the item is dropped from the queue, i.e. deleted from the database.
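
To illustrate the retry behavior, here is a minimal sketch (continuing from the setup in the example above, with the standard context, errors, and log imports assumed; the queue name and error message are arbitrary) of a processor that always fails, so the item is retried until MaxAttempts is exceeded and then dropped:

queue := yaqpg.MustStartNamedQueue("retry-demo")
queue.MaxAttempts = 3
if err := queue.Add("doomed", []byte("will never succeed")); err != nil {
    log.Fatal(err)
}
failing := yaqpg.FunctionProcessor{
    func(ctx context.Context, item *yaqpg.Item) error {
        // A non-nil error releases the item back into the queue with
        // Attempts incremented, until MaxAttempts is exceeded.
        log.Println("attempt", item.Attempts, "for", item.Ident)
        return errors.New("simulated failure")
    }}
if err := queue.Process(1, failing); err != nil {
    log.Fatal(err)
}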

Limitations

Duplicates Allowed

Preventing duplicates is a tricky problem, because:

  • Duplicates of what exactly? Identifier? Payload?
  • Always? Optionally?
  • For how long?
  • At what performance cost?
  • And what should you do when you get a dupe anyway?

Instead of giving opinionated answers to these questions, this package simply allows duplicates. Queue processing should be idempotent(ish), and if it can't be then build that logic into your processor!
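
If you do need a best-effort guard, one hedged sketch is to track recently seen identifiers inside your own processor. This is purely in-process (it does not survive restarts and does not coordinate across workers), and handle below is a hypothetical function doing the real work:

var mu sync.Mutex
seen := make(map[string]bool)
dedupe := yaqpg.FunctionProcessor{
    func(ctx context.Context, item *yaqpg.Item) error {
        mu.Lock()
        dup := seen[item.Ident]
        seen[item.Ident] = true
        mu.Unlock()
        if dup {
            return nil // already handled: returning nil drops the duplicate
        }
        return handle(ctx, item) // hypothetical: your actual processing
    }}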

Binary Payload

The payload is just a []byte -- that is, a BYTEA in the database. This is convenient or inconvenient, depending on your use case.

The rationale is pretty simple: you should not be examining the payload on the database side anyway, so opacity is not an issue; you would have to marshal behind the scenes anyway if the database side were jsonb and the Go side some object; and pure bytes leaves the door open to sending actual binary payloads and/or compressing them.

It might change to interface{} (aka any) in the future, but probably not.
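
If you want structured payloads, a minimal sketch is to marshal to JSON on the way in and unmarshal in the processor; the Job type here is hypothetical and queue is assumed to be a connected *yaqpg.Queue:

type Job struct {
    UserID int    `json:"user_id"`
    Action string `json:"action"`
}

payload, err := json.Marshal(Job{UserID: 42, Action: "welcome-email"})
if err != nil {
    log.Fatal(err)
}
if err := queue.Add("job-42", payload); err != nil {
    log.Fatal(err)
}

// In the processor, reverse the marshaling:
proc := yaqpg.FunctionProcessor{
    func(ctx context.Context, item *yaqpg.Item) error {
        var job Job
        if err := json.Unmarshal(item.Payload, &job); err != nil {
            return err
        }
        log.Println("processing", job.Action, "for user", job.UserID)
        return nil
    }}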

Recommendations (not enforced)

Use autovacuum.

Not using autovacuum will very likely cause pain!

Use a dedicated database.

Because the insert/update/delete activity on a job queue can become very intense, and require a lot of vacuuming, you should usually keep the queue(s) in a separate database from your main database (if you have one).

That way, you can examine the database storage and watch for problems. This being experimental software, there might be problems!

You may also wish to use a dedicated table per queue, or even a dedicated database per queue. But out of the box, yaqpg will handle multiple named queues in a single table.

You shouldn't need a dedicated server unless you are running high volumes, in which case you probably shouldn't be using this package. (Or maybe yes?)

Don't put too much in the Payload.

That's not what a queue is for! And your database may not like it.

There are no hard limits here, so $TOO_MUCH is Up To You. But you have been warned.
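
A common pattern is to enqueue only a reference (an ID or key) and have the processor fetch the real data when it runs; a hedged sketch, where loadDocument and render are hypothetical functions of your own:

// Enqueue only the document ID, not the document itself.
if err := queue.Add("doc-123", []byte("123")); err != nil {
    log.Fatal(err)
}
proc := yaqpg.FunctionProcessor{
    func(ctx context.Context, item *yaqpg.Item) error {
        doc, err := loadDocument(ctx, string(item.Payload)) // hypothetical lookup
        if err != nil {
            return err // retried later, subject to MaxAttempts
        }
        return render(doc) // hypothetical work
    }}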

Schema

This package includes a file schema.sql which will create the default table named yaqpg_queue in your database. You can also get this SQL as a string from the Schema function.

You can make your own queue table, but it needs to have all the columns in the schema SQL or it will not work with this package.
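
A minimal sketch of bootstrapping the default table, assuming DATABASE_URL points at a database you are allowed to modify:

queue, err := yaqpg.StartNamedQueue("example")
if err != nil {
    log.Fatal(err)
}
// Inspect the SQL if you prefer to run it yourself, e.g. in a migration:
log.Println(yaqpg.Schema(queue))
// Or have the package create the table and indexes (if they do not exist):
if err := queue.CreateSchema(); err != nil {
    log.Fatal(err)
}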

Documentation

Overview

Package yaqpg provides a simple locking queue for PostgreSQL databases. It is intended for light, local workloads.

Index

Constants

const MaxProcessLimit = 1000

Variables

var DefaultMaxAttempts = 5
var DefaultMaxConnections = 8
var DefaultMaxReprocessDelay = 1 * time.Minute
var DefaultProcessContextTimeout = 1 * time.Minute
var DefaultQueueName = "jobs"
var DefaultReprocessDelay = 2 * time.Second
var DefaultTableName = "yaqpg_queue"
var DelayTime = func(d time.Duration) time.Time {
	return Now().Add(d)
}
var Exit = os.Exit
var FillBatchSize = 100
var Now = time.Now

Overridable with custom "now" for testing.

var StartWithPlaceholder = true

Functions

func DefaultBackoffDelay

func DefaultBackoffDelay(attempts int) time.Duration

DefaultBackoffDelay returns DefaultReprocessDelay, doubled for every attempt over one, up to DefaultMaxReprocessDelay.

func DefaultStableDelay

func DefaultStableDelay(attempts int) time.Duration

DefaultStableDelay returns DefaultReprocessDelay regardless of attempts.
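
Both are candidates for a Queue's ReprocessDelayFunc. A minimal sketch of wiring one in, or supplying your own delay curve (the ten-second step and five-minute cap here are arbitrary):

queue := yaqpg.MustStartNamedQueue("example")

// Exponential backoff between retries:
queue.ReprocessDelayFunc = yaqpg.DefaultBackoffDelay

// Or a custom linear curve: 10s, 20s, 30s... capped at five minutes.
queue.ReprocessDelayFunc = func(attempts int) time.Duration {
    d := time.Duration(attempts) * 10 * time.Second
    if d > 5*time.Minute {
        d = 5 * time.Minute
    }
    return d
}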

func Schema

func Schema(q *Queue) string

Schema returns the SQL statements needed to (re)create the database table for this queue.

Types

type FunctionProcessor

type FunctionProcessor struct {
	Function func(context.Context, *Item) error
}

FunctionProcessor is a Processor that uses a simple function to process each item.

func (FunctionProcessor) Process

func (p FunctionProcessor) Process(ctx context.Context, item *Item) error

Process implements Processor for FunctionProcessor using a thin wrapper.

type Item

type Item struct {
	BatchId  ulid.ULID // *should* be included in logs by the Processor.
	Ident    string
	Attempts int
	Payload  []byte
	// contains filtered or unexported fields
}

Item represents a queue item as used in Process and batch adding.

func NewItem

func NewItem(ident string, payload []byte) *Item

NewItem returns an item for AddBatch processing.

func (*Item) Scan

func (it *Item) Scan(row pgx.Row) error

Scan puts a database row into the item values.

type Logger

type Logger interface {
	Println(...interface{})
}

Logger is the simplest logger used by Queue.

var DefaultLogger Logger = log.Default()

type Processor

type Processor interface {
	// Process processes an item.  Returning error will invoke the re-queueing
	// logic, subject to MaxAttempts in the Queue.
	Process(context.Context, *Item) error
}

Processor is the thing that processes an item, returning an error if not completed successfully.

type Queue

type Queue struct {
	Name                  string
	TableName             string
	MaxAttempts           int
	ProcessContextTimeout time.Duration
	ReprocessDelayFunc    func(attempts int) time.Duration
	Pool                  *pgxpool.Pool
	Logger                Logger
	Silent                bool
}

Queue represents a named queue in a specific database table.

func MustStartDefaultQueue

func MustStartDefaultQueue() *Queue

MustStartDefaultQueue calls StartDefaultQueue and panics on error.

func MustStartNamedQueue

func MustStartNamedQueue(name string) *Queue

MustStartNamedQueue calls StartNamedQueue and panics on error.

func NewDefaultQueue

func NewDefaultQueue() *Queue

NewDefaultQueue returns a new queue with all defaults.

func NewNamedQueue

func NewNamedQueue(name string) *Queue

NewNamedQueue returns a new queue with all defaults except Name.

func StartDefaultQueue

func StartDefaultQueue() (*Queue, error)

StartDefaultQueue creates a NewDefaultQueue and calls Connect with DefaultMaxConnections.

func StartNamedQueue

func StartNamedQueue(name string) (*Queue, error)

StartNamedQueue creates a NewNamedQueue and calls Connect with DefaultMaxConnections.

func (*Queue) Add

func (q *Queue) Add(ident string, payload []byte) error

Add adds an item to the queue. The item will be immediately available.

func (*Queue) AddBatch

func (q *Queue) AddBatch(items []*Item) error

AddBatch adds a set of items in a transaction for immediate availability when finished. Every *Item in items will be given the BatchId unique to this call.

func (*Queue) AddBatchDelayed

func (q *Queue) AddBatchDelayed(items []*Item, delay time.Duration) error

AddBatchDelayed adds a set of items in a transaction for the given delay. The delay will be the same for all items. Every *Item in items will be given the BatchId unique to this call.
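
A minimal sketch of both calls, assuming a connected queue:

items := []*yaqpg.Item{
    yaqpg.NewItem("batch-1", []byte("first")),
    yaqpg.NewItem("batch-2", []byte("second")),
}
// Immediately available; both items share one BatchId.
if err := queue.AddBatch(items); err != nil {
    log.Fatal(err)
}
// Hold a second batch back for five minutes.
later := []*yaqpg.Item{yaqpg.NewItem("batch-3", []byte("third"))}
if err := queue.AddBatchDelayed(later, 5*time.Minute); err != nil {
    log.Fatal(err)
}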

func (*Queue) AddDelayed

func (q *Queue) AddDelayed(ident string, payload []byte, delay time.Duration) error

AddDelayed adds an item to the queue with the specified delay.

func (*Queue) Connect

func (q *Queue) Connect(max_connections int) error

Connect creates a connection pool for the database defined in the DATABASE_URL environment variable and stores the connection in q.Pool.
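
A minimal sketch of connecting explicitly rather than via one of the Start helpers, assuming DATABASE_URL is set in the environment:

queue := yaqpg.NewNamedQueue("example")
if err := queue.Connect(yaqpg.DefaultMaxConnections); err != nil {
    log.Fatal(err)
}
defer queue.Pool.Close()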

func (*Queue) Count

func (q *Queue) Count() (int, error)

Count returns the total number of items in the queue, regardless of ready_at values.

func (*Queue) CountPending

func (q *Queue) CountPending() (int, error)

CountPending returns the total number of items not yet ready in the queue, i.e. those with a ready_at later than the current time. For obvious reasons, this number may be inaccurate by the time you consume it.

func (*Queue) CountReady

func (q *Queue) CountReady() (int, error)

CountReady returns the total number of ready items in the queue, including items currently being processed. For obvious reasons, this number may be inaccurate by the time you consume it.

(Excluding items in flight at the db level is too inefficient and anyway none of this remains accurate. If we find a need to know the total number of items being processed, this can be handled on the code side easily enough, but is probably not worth the trouble.)
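
A hedged sketch of checking the counts before deciding how much to process (proc is assumed to be any Processor; the counts are only a snapshot):

ready, err := queue.CountReady()
if err != nil {
    log.Fatal(err)
}
if ready > yaqpg.MaxProcessLimit {
    ready = yaqpg.MaxProcessLimit
}
if ready > 0 {
    if err := queue.Process(ready, proc); err != nil {
        log.Fatal(err)
    }
}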

func (*Queue) CreateSchema

func (q *Queue) CreateSchema() error

CreateSchema connects and creates the queue table and indexes if needed. Existing relations are NOT dropped! The queue must already have connected to the database.

func (*Queue) Fill

func (q *Queue) Fill(count int, payload_size int, delay_max time.Duration) error

Fill fills the queue with count items of test data, with a randomized delay up to delay_max. The payload will be payload_size bytes of random data. Every item will have the same payload.

Fill concurrently adds batches of up to FillBatchSize items, all of which will have the same delay time. More randomness can be obtained by setting FillBatchSize to a lower value.

This is (arguably) useful for testing!

NOTE: variable payload size is not supported at this time.
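
A minimal sketch, loading a test queue with 10,000 items of 64 random bytes each, spread over delays of up to one minute:

if err := queue.Fill(10000, 64, time.Minute); err != nil {
    log.Fatal(err)
}
queue.LogCounts() // most items will show up as pending at first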

func (*Queue) Log

func (q *Queue) Log(v ...interface{})

Log writes v to the logger, with the queue [Name] as a prefix.

func (*Queue) LogCounts

func (q *Queue) LogCounts()

LogCounts retrieves and logs the counts, and panics if any count returns an error.

func (*Queue) Logf

func (q *Queue) Logf(f string, v ...interface{})

Logf calls Log in Printf style.

func (*Queue) MustCount

func (q *Queue) MustCount() int

MustCount calls Count and panics on error.

func (*Queue) MustCountPending

func (q *Queue) MustCountPending() int

MustCountPending calls CountPending and panics on error.

func (*Queue) MustCountReady

func (q *Queue) MustCountReady() int

MustCountReady calls CountReady and panics on error.

func (*Queue) Process

func (q *Queue) Process(limit int, proc Processor) error

Process gets up to limit items from the queue and processes them in goroutines. The items are updated after processing completes, and the transaction (batch) is committed if there were no database errors, or rolled back if there were. The Processor's Process function is passed a timeout context which it should respect. Returning an error from that function will result in the item being released back into the queue with its Attempts incremented and its ReadyAt pushed out by the reprocess delay; or deleted if MaxAttempts is exceeded.

func (*Queue) ProcessReady

func (q *Queue) ProcessReady(limit int, proc Processor) error

ProcessReady calls Process with limit concurrently as many times as needed for the currently ready set of items. The number of ready items may not be zero after completion: pending items may become available, and new items may be added by other processes.

WARNING: this can be dangerous if you have a large queue!
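
A hedged sketch of a simple polling worker built on ProcessReady, draining in modest batches (the batch size and sleep interval are arbitrary):

proc := yaqpg.FunctionProcessor{
    func(ctx context.Context, item *yaqpg.Item) error {
        log.Println("draining", item.Ident)
        return nil
    }}
for {
    if err := queue.ProcessReady(100, proc); err != nil {
        log.Fatal(err)
    }
    time.Sleep(time.Second) // pending items become ready over time
}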

Directories

filldrain: fill and then batch-drain the queue
