pgmq

package module
v0.0.0-...-02a8b7d Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 4, 2024 License: MIT Imports: 6 Imported by: 1

README

pgmq

A Go library for the pgmq message queue extension for PostgreSQL.

Features

  • All pgmq features
  • Type-safety
  • Transactions

Requirements

  • pgmq extension installed in your PostgreSQL database

Installation

go get github.com/joeychilson/pgmq

Example

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/joeychilson/pgmq"
)

// EmailNotification represents an email to be sent
type EmailNotification struct {
	To      string            `json:"to"`
	Subject string            `json:"subject"`
	Body    string            `json:"body"`
	Meta    map[string]string `json:"meta,omitempty"`
}

// EmailProcessor handles the email queue processing
type EmailProcessor struct {
	queue        *pgmq.Queue[EmailNotification]
	emailService EmailService
}

// EmailService interface defines methods for sending emails
type EmailService interface {
	Send(ctx context.Context, email EmailNotification) error
}

// MockEmailService implements EmailService for demonstration
type MockEmailService struct{}

func (m *MockEmailService) Send(ctx context.Context, email EmailNotification) error {
	log.Printf("Sending email to %s: %s\n", email.To, email.Subject)
	return nil
}

func main() {
	ctx := context.Background()

	// Connect to PostgreSQL
	db, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatalf("failed to connect to database: %v", err)
	}
	defer db.Close()

	// Create a new typed queue for email notifications
	emailQueue, err := pgmq.New[EmailNotification](ctx, db, "email_notifications")
	if err != nil {
		log.Fatalf("failed to create queue: %v", err)
	}

	// Create the queue table if it doesn't exist
	if err := emailQueue.Create(ctx); err != nil {
		log.Fatalf("failed to create queue table: %v", err)
	}

	// Initialize the email processor
	processor := &EmailProcessor{
		queue:        emailQueue,
		emailService: &MockEmailService{},
	}

	// Start the producer (simulating email requests)
	go func() {
		if err := processor.produceEmails(ctx); err != nil {
			log.Printf("producer error: %v", err)
		}
	}()

	// Start the consumer (processing emails)
	if err := processor.consumeEmails(ctx); err != nil {
		log.Fatalf("consumer error: %v", err)
	}
}

func (p *EmailProcessor) produceEmails(ctx context.Context) error {
	// Simulate sending different types of emails
	emails := []*EmailNotification{
		{
			To:      "user1@example.com",
			Subject: "Welcome to our platform!",
			Body:    "Thank you for signing up...",
			Meta: map[string]string{
				"type": "welcome",
			},
		},
		{
			To:      "user2@example.com",
			Subject: "Your order has shipped",
			Body:    "Your order #12345 is on its way...",
			Meta: map[string]string{
				"type":    "order_status",
				"orderID": "12345",
			},
		},
		{
			To:      "user3@example.com",
			Subject: "Password reset request",
			Body:    "Click here to reset your password...",
			Meta: map[string]string{
				"type": "security",
			},
		},
	}

	// Send emails to the queue
	msgIDs, err := p.queue.SendBatch(ctx, emails)
	if err != nil {
		return fmt.Errorf("failed to send batch: %w", err)
	}

	log.Printf("Successfully queued %d emails with IDs: %v", len(msgIDs), msgIDs)
	return nil
}

func (p *EmailProcessor) consumeEmails(ctx context.Context) error {
	log.Println("Starting email consumer...")

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			// Read a batch of up to 10 messages with a 30-second visibility timeout
			messages, err := p.queue.ReadBatch(ctx, 10, pgmq.VisibilityTimeoutDefault)
			if err != nil {
				log.Printf("error reading messages: %v", err)
				time.Sleep(5 * time.Second)
				continue
			}

			if len(messages) == 0 {
				log.Println("no messages available, waiting...")
				time.Sleep(5 * time.Second)
				continue
			}

			// Process each message
			var successIDs []int64
			for _, msg := range messages {
				err := p.emailService.Send(ctx, msg.Message)
				if err != nil {
					log.Printf("failed to send email ID %d: %v", msg.ID, err)
					continue
				}
				successIDs = append(successIDs, msg.ID)
			}

			// Archive successfully processed messages
			if len(successIDs) > 0 {
				if err := p.queue.ArchiveBatch(ctx, successIDs); err != nil {
					log.Printf("failed to archive messages: %v", err)
				}
			}

			// Print queue metrics periodically
			metrics, err := p.queue.Metrics(ctx)
			if err != nil {
				log.Printf("failed to get metrics: %v", err)
			} else {
				log.Printf("Queue metrics - Length: %d, Total Messages: %d",
					metrics.Length, metrics.TotalMessages)
			}
		}
	}
}

Documentation

Index

Constants

View Source
const (
	// ReadLimitDefault is the default maximum number of messages to read.
	ReadLimitDefault = 1
	// VisibilityTimeoutDefault is the default message visibility timeout.
	VisibilityTimeoutDefault = 30 * time.Second
	// PollTimeoutDefault is the default maximum time to wait for a message.
	PollTimeoutDefault = 5 * time.Second
	// PollIntervalDefault is the default time to wait between polling attempts.
	PollIntervalDefault = 250 * time.Millisecond
)

Variables

This section is empty.

Functions

func ListQueues

func ListQueues(ctx context.Context, querier Querier) ([]string, error)

ListQueues retrieves a list of all queues in the database.

func ValidateQueueName

func ValidateQueueName(ctx context.Context, querier Querier, name string) error

ValidateQueueName checks if the queue name length is within the allowed limits.

Types

type Message

type Message[T any] struct {
	ID         int64     `json:"msg_id"`
	ReadCount  int       `json:"read_ct"`
	EnqueuedAt time.Time `json:"enqueued_at"`
	VisibleAt  time.Time `json:"vt"`
	Message    T         `json:"message"`
}

Message represents a single message in the queue with metadata.

type Querier

type Querier interface {
	Exec(context.Context, string, ...any) (pgconn.CommandTag, error)
	Query(context.Context, string, ...any) (pgx.Rows, error)
	QueryRow(context.Context, string, ...any) pgx.Row
}

Querier interface defines the required database operations.

type Queue

type Queue[T any] struct {
	// contains filtered or unexported fields
}

Queue represents a typed message queue instance.

func New

func New[T any](ctx context.Context, querier Querier, name string) (*Queue[T], error)

New creates a new Queue instance with the specified name. The pgmq extension is automatically created if it does not exist.

func (*Queue[T]) Archive

func (q *Queue[T]) Archive(ctx context.Context, msgID int64) error

Archive moves a message to the archive table by its ID.

func (*Queue[T]) ArchiveBatch

func (q *Queue[T]) ArchiveBatch(ctx context.Context, msgIDs []int64) error

ArchiveBatch moves multiple messages to the archive table.

func (*Queue[T]) Create

func (q *Queue[T]) Create(ctx context.Context) error

Create initializes a new queue in the database.

func (*Queue[T]) CreatePartitioned

func (q *Queue[T]) CreatePartitioned(ctx context.Context, partitionInterval, retentionInterval int) error

CreatePartitioned initializes a new partitioned queue in the database.

func (*Queue[T]) CreateUnlogged

func (q *Queue[T]) CreateUnlogged(ctx context.Context) error

CreateUnlogged initializes a new unlogged queue in the database.

func (*Queue[T]) Delete

func (q *Queue[T]) Delete(ctx context.Context, msgID int64) error

Delete permanently removes a message from the queue by its ID.

func (*Queue[T]) DeleteBatch

func (q *Queue[T]) DeleteBatch(ctx context.Context, msgIDs []int64) error

DeleteBatch permanently removes multiple messages from the queue.

func (*Queue[T]) DetachArchive

func (q *Queue[T]) DetachArchive(ctx context.Context) error

DetachArchive detaches the archive table from the queue.

func (*Queue[T]) Drop

func (q *Queue[T]) Drop(ctx context.Context) error

Drop removes the entire queue and all its messages.

func (*Queue[T]) Metrics

func (q *Queue[T]) Metrics(ctx context.Context) (*QueueMetrics, error)

Metrics retrieves the current queue metrics.

func (*Queue[T]) Pop

func (q *Queue[T]) Pop(ctx context.Context) (*Message[T], error)

Pop removes and returns a single message from the queue.

func (*Queue[T]) Purge

func (q *Queue[T]) Purge(ctx context.Context) (int, error)

Purge removes all messages from the queue.

func (*Queue[T]) Read

func (q *Queue[T]) Read(ctx context.Context, visibilityTimeout time.Duration) (*Message[T], error)

Read retrieves a single message from the queue. visibilityTimeout is the time to lock the message in seconds.

func (*Queue[T]) ReadBatch

func (q *Queue[T]) ReadBatch(ctx context.Context, maxMessages int, visibilityTimeout time.Duration) ([]*Message[T], error)

ReadBatch retrieves multiple messages from the queue. maxMessages is the maximum number of messages to read. visibilityTimeout is the time to lock the messages in seconds.

func (*Queue[T]) ReadBatchWithPoll

func (q *Queue[T]) ReadBatchWithPoll(ctx context.Context, maxMessages int, visibilityTimeout, pollTimeout, pollInterval time.Duration) ([]*Message[T], error)

ReadBatchWithPoll retrieves a batch of messages with polling support. maxMessages is the maximum number of messages to read. visibilityTimeout is the time to lock the messages in seconds. pollTimeout is the maximum time to wait for a message in seconds. pollInterval is the time to wait between polling attempts in milliseconds.

func (*Queue[T]) ReadWithPoll

func (q *Queue[T]) ReadWithPoll(ctx context.Context, visibilityTimeout, pollTimeout, pollInterval time.Duration) (*Message[T], error)

ReadWithPoll retrieves a single message with polling support. visibilityTimeout is the time to lock the message in seconds. pollTimeout is the maximum time to wait for a message in seconds. pollInterval is the time to wait between polling attempts in milliseconds.

func (*Queue[T]) Send

func (q *Queue[T]) Send(ctx context.Context, message *T) (int64, error)

Send adds a new message to the queue.

func (*Queue[T]) SendBatch

func (q *Queue[T]) SendBatch(ctx context.Context, messages []*T) ([]int64, error)

SendBatch adds multiple messages to the queue in a single operation.

func (*Queue[T]) SendBatchDelayed

func (q *Queue[T]) SendBatchDelayed(ctx context.Context, messages []*T, delay time.Duration) ([]int64, error)

SendBatchDelayed adds multiple messages with a specified delay. delay is the time to wait before the messages become visible in seconds.

func (*Queue[T]) SendDelayed

func (q *Queue[T]) SendDelayed(ctx context.Context, message *T, delay time.Duration) (int64, error)

SendDelayed adds a new message with a specified delay. delay is the time to wait before the messages become visible in seconds.

func (*Queue[T]) SetVisibilityTimeout

func (q *Queue[T]) SetVisibilityTimeout(ctx context.Context, msgID int64, timeout time.Duration) error

SetVisibilityTimeout changes the visibility timeout of a message by its ID. timeout is the new visibility timeout in seconds.

func (*Queue[T]) WithTx

func (q *Queue[T]) WithTx(tx pgx.Tx) *Queue[T]

WithTx creates a new Queue instance with the specified transaction.

type QueueMetrics

type QueueMetrics struct {
	Name             string
	Length           int
	NewestMessageAge *time.Duration
	OldestMessageAge *time.Duration
	TotalMessages    int
	ScrapeTime       time.Time
}

QueueMetrics represents the current queue statistics.

func Metrics

func Metrics(ctx context.Context, querier Querier) ([]*QueueMetrics, error)

Metrics retrieves the metrics of all queues in the database.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL