pgmq

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 4, 2023 License: MIT Imports: 7 Imported by: 1

README

pgmq-go

Go Reference Go Report Card CI codecov

A Go (Golang) client for Postgres Message Queue (PGMQ). Based loosely on the Rust client.

Usage

Start a Postgres instance with the PGMQ extension installed:

docker run -d --name postgres -e POSTGRES_PASSWORD=password -p 5432:5432 quay.io/tembo/pgmq-pg:latest

Then

package main

import (
    "context"
    "fmt"

    "github.com/craigpastro/pgmq-go"
)

func main() {
    ctx := context.Background()

    q, err := pgmq.New(ctx, "postgres://postgres:password@localhost:5432/postgres")
    if err != nil {
        panic(err)
    }

    err = q.CreateQueue(ctx, "my_queue")
    if err != nil {
        panic(err)
    }

    id, err := q.Send(ctx, "my_queue", map[string]any{"foo": "bar"})
    if err != nil {
        panic(err)
    }

    msg, err := q.Read(ctx, "my_queue", 30)
    if err != nil {
        panic(err)
    }

    // Archive the message by moving it to the "pgmq_<queue_name>_archive" table.
    // Alternatively, you can `Delete` the message, or read and delete in one
    // call by using `Pop`.
    _, err = q.Archive(ctx, "my_queue", id)
    if err != nil {
        panic(err)
    }
}

Contributions

We ❤ contributions.

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrNoRows = errors.New("pgmq: no rows in result set")
	ErrPing   = errors.New("pgmq: failed to ping db")
)

Functions

This section is empty.

Types

type DB

type DB interface {
	Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)
	Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
	Close()
}

type Message

type Message struct {
	MsgID      int64
	ReadCount  int64
	EnqueuedAt time.Time
	// VT is "visibility time". The UTC timestamp at which the message will
	// be available for reading again.
	VT      time.Time
	Message map[string]any
}

type PGMQ

type PGMQ struct {
	// contains filtered or unexported fields
}

func MustNew

func MustNew(ctx context.Context, connString string) *PGMQ

MustNew is similar to New, but panics if it encounters an error.

func New

func New(ctx context.Context, connString string) (*PGMQ, error)

New establishes a connection to Postgres given by the connString, checks connection, if check is failed, returns ErrPing, that can be retried, then creates the pgmq extension if it does not already exist.

func (*PGMQ) Archive

func (p *PGMQ) Archive(ctx context.Context, queue string, msgID int64) (bool, error)

Archive moves a message from the queue table to the archive table by its id. View messages on the archive table with sql:

select * from pgmq_<queue_name>_archive;

func (*PGMQ) ArchiveBatch

func (p *PGMQ) ArchiveBatch(ctx context.Context, queue string, msgIDs []int64) ([]bool, error)

ArchiveBatch moves a batch of messages from the queue table to the archive table by their ids. View messages on the archive table with sql:

select * from pgmq_<queue_name>_archive;

func (*PGMQ) Close

func (p *PGMQ) Close()

Close closes the underlying connection pool.

func (*PGMQ) CreateQueue

func (p *PGMQ) CreateQueue(ctx context.Context, queue string) error

CreateQueue creates a new queue. This sets up the queue's tables, indexes, and metadata.

func (*PGMQ) Delete

func (p *PGMQ) Delete(ctx context.Context, queue string, msgID int64) (bool, error)

Delete deletes a message from the queue by its id. This is a permanent delete and cannot be undone. If you want to retain a log of the message, use the Archive method.

func (*PGMQ) DeleteBatch

func (p *PGMQ) DeleteBatch(ctx context.Context, queue string, msgIDs []int64) ([]bool, error)

DeleteBatch deletes a batch of messages from the queue by their ids. This is a permanent delete and cannot be undone. If you want to retain a log of the messages, use the ArchiveBatch method.

func (*PGMQ) DropQueue

func (p *PGMQ) DropQueue(ctx context.Context, queue string) error

DropQueue deletes the given queue. It deletes the queue's tables, indices, and metadata. It will return an error if the queue does not exist.

func (*PGMQ) Pop

func (p *PGMQ) Pop(ctx context.Context, queue string) (*Message, error)

Pop reads single message from the queue and deletes it at the same time. Similar to Read and ReadBatch if no messages are available an ErrNoRows is returned. Unlike these methods, the visibility timeout does not apply. This is because the message is immediately deleted.

func (*PGMQ) Read

func (p *PGMQ) Read(ctx context.Context, queue string, vt int64) (*Message, error)

Read a single message from the queue. If the queue is empty or all messages are invisible, an ErrNoRows errors is returned. If a message is returned, it is made invisible for the duration of the visibility timeout (vt) in seconds.

func (*PGMQ) ReadBatch

func (p *PGMQ) ReadBatch(ctx context.Context, queue string, vt int64, numMsgs int64) ([]*Message, error)

ReadBatch reads a specified number of messages from the queue. Any messages that are returned are made invisible for the duration of the visibility timeout (vt) in seconds. If vt is 0 it will be set to the default value, vtDefault.

If the queue is empty or all messages are invisible an ErrNoRows error is returned.

func (*PGMQ) Send

func (p *PGMQ) Send(ctx context.Context, queue string, msg map[string]any) (int64, error)

Send sends a single message to a queue. The message id, unique to the queue, is returned.

func (*PGMQ) SendBatch

func (p *PGMQ) SendBatch(ctx context.Context, queue string, msgs []map[string]any) ([]int64, error)

SendBatch sends a batch of messages to a queue. The message ids, unique to the queue, are returned.

Directories

Path Synopsis
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL