pgq

README

PGQ - go queues on top of postgres

PGQ is a Go package that provides a queuing mechanism for your Go applications. It is built on top of the postgres database and enables developers to implement efficient, reliable, yet simple message queues for their service-based architectures using the familiar postgres infrastructure.

Features

  • Postgres-backed: Leverages the power of SQL to store and manage queues.
  • Reliable: Guarantees message persistence and delivery even in the face of various failures.
  • Transactional: Supports transactional message handling, ensuring consistency.
  • Simple usage: Provides a clean and easy-to-use API for interacting with the queue.
  • Efficient: Optimized for medium-to-long job durations.
  • Scheduled Messages: Schedule messages to be processed at a specific future time.

Why PGQ?

  • Postgres: Postgres just works; it is feature rich, scalable, and performant.
  • SQL: You are already familiar with SQL, right? No need to learn anything new.
  • Stack: You do not have to maintain/manage/administer/patch any additional message broker component.
  • Simplicity: pgq is a simple and straightforward solution for your messaging needs.
  • Usability: pgq fits many scenarios.

Of course, you can write your own implementation or use other messaging technologies, but why would you do that when you can use postgres, which is already there and is a proven technology?

When to pick PGQ?

Even though there are other great technologies and tools for complex messaging, including robust routing configuration, sometimes you do not need all of that and can be just fine with simpler tooling.

Pick pgq if you:

  • need to distribute the traffic fairly among your app replicas
  • need to protect your services from overload
  • want out-of-the-box observability of your queues
  • want to use SQL for managing your queues
  • already use postgres and you want to keep your tech stack simple
  • don't want to manage another technology and learn its specifics
  • need to schedule messages to be processed at a specific time

No need to bring a new technology to your existing stack when you can be pretty satisfied with postgres. Write the consumers and publishers in various languages around one simple idea: use a postgres table as a queue. While using pgq you have superb observability of the queue.
You can easily see the payloads of the messages waiting to be processed, and moreover the payloads of both the currently processed and the already processed messages. You can get the processing results, durations, and other statistics quite simply. As the pgq queue table also contains the records of already processed jobs, you get the historical statistics of all messages out of the box and can view them effortlessly using simple SQL queries.

Pgq is intended to replace specialized message brokers in environments where you already use postgres and want clean, simple, and straightforward communication among your services.

Basic principles

  • Every queue is a single postgres table.
  • You maintain the table on your own and can extend it as you need.
  • Publishers add new rows to the queue table.
  • Consumers update the mandatory pgq fields of the rows in the queue table.

Installation

To install PGQ, use the go get command:

go get go.dataddo.com/pgq@latest

Setup

Prerequisites:

In order to make pgq functional, a postgres table with all the mandatory pgq fields must exist. You can create the table on your own with a classic CREATE TABLE ..., or you can use the query generator to generate the query for you. The generated query creates the queue table along with indexes which improve the performance of the consumer queries.

You usually run the setup commands just once during the queue setup.

package main

import (
	"fmt"
	"go.dataddo.com/pgq/x/schema"
)

func main() {
	queueName := "my_queue"

	// create contains the "CREATE TABLE queueName ..." statement,
	// which you may use for table and index creation.
	create := schema.GenerateCreateTableQuery(queueName)
	fmt.Println(create)

	// You may also use the "GenerateDropTableQuery" for dropping all the pgq artifacts (down migration)
}

Usage

Publishing the message
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"

	_ "github.com/jackc/pgx/v4/stdlib"

	"go.dataddo.com/pgq"
)

func main() {
	postgresDSN := "your_postgres_dsn"
	queueName := "your_queue_name"

	// create a new postgres connection 
	db, err := sql.Open("pgx", postgresDSN)
	if err != nil {
		panic(err.Error())
	}
	defer db.Close()

	// create the publisher which may be reused for multiple messages
	// you may pass the optional PublisherOptions when creating it
	publisher := pgq.NewPublisher(db)

	// publish the message to the queue,
	// providing the payload (a JSON object)
	// and optional metadata (a map[string]string)
	msg := &pgq.MessageOutgoing{
		Payload: json.RawMessage(`{"foo":"bar"}`),
	}
	msgId, err := publisher.Publish(context.Background(), queueName, msg)
	if err != nil {
		panic(err.Error())
	}

	fmt.Println("Message published with ID:", msgId)
}

After the message is successfully published, you can see a new row with the given msgId in the queue table.
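
You can verify this with plain SQL; a minimal check, assuming the queue table from the example is named your_queue_name:

select id, payload, created_at from your_queue_name order by created_at desc limit 1;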

Publisher options

Very often you want some metadata to be part of the message so that you can filter the messages in the queue table by it. Metadata can be any additional information you consider worth carrying with the message but do not want in the payload: the publisher app name/version, payload schema version, customer identifiers, etc.

You can simply attach metadata to a single message:

metadata := pgq.Metadata{
	"publisherHost": "localhost",
	"payloadVersion": "v1.0",
}

or you can configure the publisher to attach the metadata to all messages it publishes:

opts := []pgq.PublisherOption{
	pgq.WithMetaInjectors(
		pgq.StaticMetaInjector(
			pgq.Metadata{
				"publisherHost":    "localhost",
				"publisherVersion": "commitRSA",
			},
		),
	),
}

publisher := pgq.NewPublisher(db, opts...)
msg := &pgq.MessageOutgoing{
	Metadata: pgq.Metadata{
		"payloadVersion": "v1.0", // message-specific meta field
	},
	Payload: json.RawMessage(`{"foo":"bar"}`),
}
Consuming the messages
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"go.dataddo.com/pgq"
	_ "github.com/jackc/pgx/v4/stdlib"
)

func main() {
	postgresDSN := "your_postgres_dsn"
	queueName := "your_queue_name"

	// create a new postgres connection
	db, err := sql.Open("pgx", postgresDSN)
	if err != nil {
		panic(err.Error())
	}
	defer db.Close()

	// create the consumer attached to the handler defined below
	h := &handler{}
	consumer, err := pgq.NewConsumer(db, queueName, h)
	if err != nil {
		panic(err.Error())
	}

	err = consumer.Run(context.Background())
	if err != nil {
		panic(err.Error())
	}
}

// we must provide a message handler, which implements a simple interface
type handler struct {}
func (h *handler) HandleMessage(_ context.Context, msg *pgq.MessageIncoming) (processed bool, err error) {
	fmt.Println("Message payload:", string(msg.Payload))
	return true, nil
}
Consumer options

You can configure the consumer by passing the optional ConsumeOptions when creating it.

| Option | Description |
| --- | --- |
| WithLogger | Provide your own *slog.Logger to keep the pgq logs under control. |
| WithMaxParallelMessages | Set how many messages you want to process concurrently in your app. |
| WithLockDuration | Set the effective lock duration (time.Duration) according to your needs. If you handle messages quickly, set it to seconds or minutes. If you run long-duration jobs, set it to more than your longest job takes to process. |
| WithPollingInterval | Define how frequently the consumer asks the postgres table for new messages (time.Duration). |
| WithInvalidMessageCallback | Handle invalid messages which may appear in the queue, e.g. by re-publishing them to some junk queue. |
| WithHistoryLimit | Define how far back in history to search for messages in the queue. Sometimes you want to ignore messages created days ago even though they are unprocessed. |
| WithMetrics | Attach your own metrics provider (Prometheus, ...) here. |
| WithMetadataFilter | Filter consumed messages by metadata. At this point OpEqual and OpNotEqual are supported. |
consumer, err := pgq.NewConsumer(db, queueName, handler,
	pgq.WithLogger(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))),
	pgq.WithLockDuration(10*time.Minute),
	pgq.WithPollingInterval(2*time.Second),
	pgq.WithMaxParallelMessages(1),
	pgq.WithMetrics(noop.Meter{}),
)

For more detailed usage examples and API documentation, please refer to the Dataddo pgq GoDoc page.

Message

The message is the essential structure for communication between services using pgq. The message struct matches the postgres table schema. You can modify the table structure on your own by adding extra columns, but pgq depends only on the following mandatory fields:

| Field | Description |
| --- | --- |
| id | The unique ID of the message in the db. |
| payload | User's custom message content in JSON format. |
| metadata | User's custom metadata about the message in JSON format, so the payload stays clean of unnecessary data. A good place for information like the publisher app name, payload schema version, customer-related information for easier debugging, etc. |
| created_at | The timestamp when the record was created in the db (message received into the queue). |
| started_at | Timestamp indicating when the consumer started to process the message. |
| scheduled_for | Timestamp to delay message processing until a specific time. If NULL, the message is processed immediately. |
| locked_until | The consumer lock validity timestamp. While this field is set and has not expired, no other consumer can process the message. |
| processed_at | Timestamp when the message was processed (either success or failure). |
| error_detail | The reason why the processing of the message failed, provided by the consumer. NULL means no error. |
| consumed_count | An integer incremented on every consumption attempt; it prevents repeated consumption of a message that causes infinite processing loops (OOM errors etc.). |
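
The scheduled_for field corresponds to the ScheduledFor field of MessageOutgoing (documented below), so delaying a message only takes setting a timestamp. A minimal sketch, reusing the publisher and queueName from the examples above:

runAt := time.Now().Add(15 * time.Minute)
msg := &pgq.MessageOutgoing{
	ScheduledFor: &runAt, // consumers will not pick the message up before runAt
	Payload:      json.RawMessage(`{"foo":"bar"}`),
}
ids, err := publisher.Publish(context.Background(), queueName, msg)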

Handy pgq SQL queries

Queue size

Get the messages waiting in the queue to be fetched by a consumer.

select * from queue_name where processed_at is null and locked_until is null;

Get the number of messages waiting in the queue. A good candidate for the queue length metric in your monitoring system.

select count(*) from queue_name where processed_at is null and locked_until is null;
Messages currently being processed

Get the messages being processed at the moment.

select * from queue_name where processed_at is null and locked_until is not null;

Get the number of messages currently being processed. Another good candidate for metric in your monitoring system.

select count(*) from queue_name where processed_at is null and locked_until is not null;

Tip: You can use the pgq table as a source for your monitoring system and enable alerting on suspicious values. It is usually good to monitor not only the peak size but also the empty queues, which may indicate some trouble on the publishers' side. (Example: the queue length and messages currently being processed shown in a Grafana panel, populated by Prometheus data fetched from the postgres queue table.)

Processed messages

The messages which have already been successfully processed.

select * from queue_name where processed_at is not null and error_detail is null;

The messages which have already been processed, but ended with an error.

select * from queue_name where processed_at is not null and error_detail is not null;
Other useful queries
-- messages created in last 1 day which have not been processed yet
select * from queue_name where processed_at is null and created_at > NOW() - INTERVAL '1 DAY';

-- messages causing unexpected failures of consumers (usually OOM)
select * from queue_name where consumed_count > 1;

-- top 10 slowest processed messages
-- top 10 slowest processed messages
select id, processed_at - started_at as duration from queue_name where processed_at is not null and started_at is not null order by duration desc limit 10;

Under the hood

The pgq internally uses the classic UPDATE + SELECT ... FOR UPDATE postgres statement, which takes a transactional lock on the selected rows and lets the table behave like a queue. The SELECT statement uses the SKIP LOCKED clause, so a consumer fetches messages in the order they were created and does not get stuck on locked rows.
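
Conceptually, the claim step looks something like the following sketch. This is an illustration of the SKIP LOCKED pattern, not pgq's exact query:

-- atomically claim the oldest unclaimed message and set its lock
UPDATE my_queue_name
SET locked_until = now() + interval '10 minutes',
    started_at = now(),
    consumed_count = consumed_count + 1
WHERE id IN (
    SELECT id FROM my_queue_name
    WHERE processed_at IS NULL
      AND (locked_until IS NULL OR locked_until < now())
    ORDER BY created_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING id, payload, metadata;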

Consumer loop
(Diagram: the consumer loop)

Consumers periodically ask the queue table for new messages to be processed.
  • When there is no message to be processed, the consumer idles for the polling interval and then tries again.
  • When the consumer finds a message to be processed, it locks it by updating the locked_until field with the lock-duration timestamp.
  • If the consumer fails to update the locked_until field, another consumer has already locked the message, and the current consumer tries to find a message again.
  • If the consumer successfully locks the message, it starts processing it.
  • When the consumer finishes processing, it updates the processed_at field with the current timestamp.

Optimizing performance

When using pgq in a production environment, you should focus on the following areas to improve performance:

Queue table Indexes

Having indexes on the fields used for searching is essential for good performance. When postgres lacks these indexes, queue lookups can degrade severely, which may slow down the whole database instance.

Each queue table should have at least the following indexes:

CREATE INDEX IDX_CREATED_AT ON my_queue_name (created_at);
CREATE INDEX IDX_PROCESSED_AT_CONSUMED_COUNT ON my_queue_name (consumed_count, processed_at) WHERE (processed_at IS NULL);

These indexes are automatically part of the query generated by the GenerateCreateTableQuery function. But if you create tables on your own, please make sure you have them.

Queue table partitioning

Usually you do not need to keep the full history of the queue table in the database for months back. You may delete old rows with the DELETE command in a cron job, but do not forget that DELETE is a very expensive operation in postgres, and it may affect insertions and updates on the table.

The better solution is postgres table partitioning. The easiest way to set up the partitioning is to use the pg_partman postgres extension.

Check whether the pg_partman extension is available:

SELECT count(name) FROM pg_available_extensions where name = 'pg_partman';

If the query returns 0, you need to install the extension first; otherwise you're ready to partition.

  1. We create the template table to be used for the creation of new partitions:

The template table must have exactly the same structure as the original queue table, and its name carries the _template suffix. It must also contain the indexes so that the derived partition tables have them too.

CREATE TABLE my_queue_name_template (
    id UUID NOT NULL DEFAULT gen_random_uuid(),
    created_at TIMESTAMP(0) WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
    payload JSONB DEFAULT NULL,
    metadata JSONB DEFAULT NULL,
    locked_until TIMESTAMP(0) WITH TIME ZONE DEFAULT NULL,
    processed_at TIMESTAMP(0) WITH TIME ZONE DEFAULT NULL,
    error_detail TEXT DEFAULT NULL,
    started_at TIMESTAMP(0) WITH TIME ZONE DEFAULT NULL,
    consumed_count INT DEFAULT 0 NOT NULL,
    PRIMARY KEY(id, created_at)
);
CREATE INDEX IDX_CREATED_AT_TPL ON my_queue_name_template (created_at);
CREATE INDEX IDX_PROCESSED_AT_TPL ON my_queue_name_template (consumed_count, processed_at) WHERE (processed_at IS NULL);
  2. We create the partitioned table with the same structure as the template table, but with the partitioning key:
-- DROP the table if it already exists
DROP table IF EXISTS my_queue_name;
-- and let it be created like partman does it. This is the default queue to be used when no partitioned one is matched
CREATE TABLE IF NOT EXISTS my_queue_name
(LIKE my_queue_name_template INCLUDING DEFAULTS INCLUDING CONSTRAINTS INCLUDING INDEXES INCLUDING COMMENTS) PARTITION BY RANGE (created_at);
  3. We instruct partman to create the partitions every day automatically:
SELECT partman.create_parent('my_queue_name', 'created_at', 'native', 'daily', p_template_table := 'my_queue_name_template');
  4. We configure how partman rotates the tables, setting a 14-day retention period:
UPDATE partman.part_config
SET infinite_time_partitions = true,
    retention = '14 days',
    retention_keep_table = false,
    retention_keep_index = false
WHERE parent_table = 'my_queue_name';

Contribution

We are open to contributions to the pgq package, but since we use it in our production environment, we have to be careful about changes. We are reluctant to add new features, but we welcome bug fixes, performance improvements, and documentation enhancements.

Run integration tests

The unit tests run without any additional setup, but the integration tests require a running postgres instance, otherwise they are skipped.

In one shell start the postgres docker container:

docker run --rm -it -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres:15-alpine

In another shell run the tests:

TEST_POSTGRES_DSN=postgres://postgres:postgres@localhost:5432/postgres go test ./...

Documentation

Constants

const (
	// MessageProcessed signals that message was processed and shouldn't be processed
	// again. If processed with an error, it is expected permanent, and new run would
	// result in the same error.
	MessageProcessed = true
	// MessageNotProcessed signals that message wasn't processed and can be processed
	// again. The error interrupting processing is considered temporary.
	MessageNotProcessed = false
)

Variables

This section is empty.

Functions

func StaticMetaInjector

func StaticMetaInjector(m Metadata) func(context.Context, Metadata)

StaticMetaInjector returns a Metadata injector that injects given Metadata.

func ValidateFields

func ValidateFields(ctx context.Context, db *sqlx.DB, queueName string) error

ValidateFields checks if required fields exist

func ValidateIndexes

func ValidateIndexes(ctx context.Context, db *sqlx.DB, queueName string) error

ValidateIndexes checks if required indexes exist
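
Both validators are handy as a startup sanity check. A minimal sketch, assuming the *sqlx.DB in these signatures is github.com/jmoiron/sqlx:

db, err := sqlx.Open("pgx", postgresDSN)
if err != nil {
	panic(err.Error())
}
ctx := context.Background()
// fail fast if the queue table lacks the mandatory pgq fields or indexes
if err := pgq.ValidateFields(ctx, db, "my_queue_name"); err != nil {
	panic(err.Error())
}
if err := pgq.ValidateIndexes(ctx, db, "my_queue_name"); err != nil {
	panic(err.Error())
}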

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is the preconfigured subscriber of messages from the queue table.

Example
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"os"
	"os/signal"

	"go.dataddo.com/pgq"
)

type Handler struct{}

func (h *Handler) HandleMessage(ctx context.Context, msg *pgq.MessageIncoming) (res bool, err error) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		log.Println("Recovered in 'Handler.HandleMessage()'", r)
		// nack the message, it will be retried
		res = pgq.MessageNotProcessed
		if e, ok := r.(error); ok {
			err = e
		} else {
			err = fmt.Errorf("%v", r)
		}
	}()
	if msg.Metadata["heaviness"] == "heavy" {
		// nack the message, it will be retried
		// Message won't contain error detail in the database.
		return pgq.MessageNotProcessed, nil
	}
	var myPayload struct {
		Foo string `json:"foo"`
	}
	if err := json.Unmarshal(msg.Payload, &myPayload); err != nil {
		// discard the message, it will not be retried
		// Message will contain error detail in the database.
		return pgq.MessageProcessed, fmt.Errorf("invalid payload: %v", err)
	}
	// doSomethingWithThePayload(ctx, myPayload)
	return pgq.MessageProcessed, nil
}

func main() {
	db, err := sql.Open("postgres", "user=postgres password=postgres host=localhost port=5432 dbname=postgres")
	if err != nil {
		log.Fatal("Error opening database:", err)
	}
	defer db.Close()
	const queueName = "test_queue"
	c, err := pgq.NewConsumer(db, queueName, &Handler{})
	if err != nil {
		log.Fatal("Error creating consumer:", err)
	}
	ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
	if err := c.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
		log.Fatal("Error running consumer:", err)
	}
}

func NewConsumer

func NewConsumer(db *sql.DB, queueName string, handler MessageHandler, opts ...ConsumerOption) (*Consumer, error)

NewConsumer creates Consumer with proper settings

Example
slogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
c, err := pgq.NewConsumer(db, "queue_name", &Handler{},
	pgq.WithLockDuration(10*time.Minute),
	pgq.WithPollingInterval(500*time.Millisecond),
	pgq.WithAckTimeout(5*time.Second),
	pgq.WithMessageProcessingReserveDuration(5*time.Second),
	pgq.WithMaxParallelMessages(42),
	pgq.WithMetrics(noop.Meter{}),
	pgq.WithHistoryLimit(24*time.Hour),
	pgq.WithLogger(slogger),
	pgq.WithInvalidMessageCallback(func(ctx context.Context, msg pgq.InvalidMessage, err error) {
		// message Payload and/or Metadata are not JSON object.
		// The message will be discarded.
		slogger.Warn("invalid message",
			"error", err,
			"msg.id", msg.ID,
		)
	}),
)
_, _ = c, err

func NewConsumerExt

func NewConsumerExt(db *sqlx.DB, queueName string, handler MessageHandler, opts ...ConsumerOption) (*Consumer, error)

NewConsumerExt creates Consumer with proper settings, using sqlx.DB (until refactored to use pgx directly)

func (*Consumer) Run

func (c *Consumer) Run(ctx context.Context) error

Run consumes messages until the context isn't cancelled.

type ConsumerOption

type ConsumerOption func(c *consumerConfig)

ConsumerOption applies option to consumerConfig.

func WithAckTimeout

func WithAckTimeout(d time.Duration) ConsumerOption

WithAckTimeout sets the timeout for updating the message status when message is processed.

func WithHistoryLimit

func WithHistoryLimit(d time.Duration) ConsumerOption

WithHistoryLimit sets how far back in history to search for unprocessed messages (default is no limit). If not set, the consumer looks for messages in the whole table. You may set this value when using a partitioned table to search just the partitions you are interested in.

func WithInvalidMessageCallback

func WithInvalidMessageCallback(fn InvalidMessageCallback) ConsumerOption

WithInvalidMessageCallback sets callback for invalid messages.

func WithLockDuration

func WithLockDuration(d time.Duration) ConsumerOption

WithLockDuration sets the maximal duration for how long the message remains locked for other consumers.

func WithLogger

func WithLogger(logger *slog.Logger) ConsumerOption

WithLogger sets logger. Default is no logging.

func WithMaxConsumeCount

func WithMaxConsumeCount(max uint) ConsumerOption

WithMaxConsumeCount sets the maximal number of times a message can be consumed before it is ignored. An unhandled SIGKILL, an uncaught panic, an OOM error etc. could otherwise put the consumer into an infinite failure loop. Setting this value to greater than 0 prevents that loop. Setting it to 0 disables this safety mechanism.

func WithMaxParallelMessages

func WithMaxParallelMessages(n int) ConsumerOption

WithMaxParallelMessages sets how many jobs a single consumer can process simultaneously.

func WithMessageProcessingReserveDuration

func WithMessageProcessingReserveDuration(d time.Duration) ConsumerOption

WithMessageProcessingReserveDuration sets the duration for which the message is reserved for handling result state.

func WithMetadataFilter

func WithMetadataFilter(filter *MetadataFilter) ConsumerOption

func WithMetrics

func WithMetrics(m metric.Meter) ConsumerOption

WithMetrics sets metrics meter. Default is noop.Meter{}.

func WithPollingInterval

func WithPollingInterval(d time.Duration) ConsumerOption

WithPollingInterval sets how frequently consumer checks the queue for new messages.

type InvalidMessage

type InvalidMessage struct {
	ID       string
	Metadata json.RawMessage
	Payload  json.RawMessage
}

InvalidMessage is definition of invalid message used as argument for InvalidMessageCallback by Consumer.

type InvalidMessageCallback

type InvalidMessageCallback func(ctx context.Context, msg InvalidMessage, err error)

InvalidMessageCallback defines what should happen to messages which are identified as invalid. Such messages usually have missing or malformed required fields.

type MessageHandler

type MessageHandler interface {
	HandleMessage(context.Context, *MessageIncoming) (processed bool, err error)
}

MessageHandler handles message received from queue. Returning false means message wasn't processed correctly and shouldn't be acknowledged. Error contains additional information.

Possible combinations:

// | processed | err        | description                                          |
// | --------- | ---------- | ---------------------------------------------------- |
// | false     | <nil>      | missing failure info, but the message can be retried |
// | false     | some error | not processed, because of some error, can be retried |
// | true      | <nil>      | processed, no error.                                 |
// | true      | some error | processed, ended with error. Don't retry!            |

type MessageHandlerFunc

type MessageHandlerFunc func(context.Context, *MessageIncoming) (processed bool, err error)

MessageHandlerFunc is MessageHandler implementation by simple function.
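
For example, a plain function can be passed wherever a MessageHandler is expected; a minimal sketch:

handler := pgq.MessageHandlerFunc(func(ctx context.Context, msg *pgq.MessageIncoming) (bool, error) {
	// see the combinations table above for the meaning of the return values
	fmt.Println("payload:", string(msg.Payload))
	return pgq.MessageProcessed, nil
})
consumer, err := pgq.NewConsumer(db, "my_queue_name", handler)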

func (MessageHandlerFunc) HandleMessage

func (fn MessageHandlerFunc) HandleMessage(ctx context.Context, msg *MessageIncoming) (processed bool, err error)

HandleMessage calls self. It also implements MessageHandler interface.

type MessageIncoming

type MessageIncoming struct {

	// Metadata contains the message Metadata.
	Metadata Metadata
	// Payload is the message's Payload.
	Payload json.RawMessage
	// Attempt number, counts from 1. It is incremented every time the message is
	// consumed.
	Attempt int
	// Deadline is the time when the message will be returned to the queue if not
	// finished. It is set by the queue when the message is consumed.
	Deadline time.Time
	// contains filtered or unexported fields
}

MessageIncoming is a record retrieved from the queue table in Postgres

func NewMessage

func NewMessage(meta Metadata, payload json.RawMessage, attempt int, maxConsumedCount uint) *MessageIncoming

NewMessage creates new message that satisfies Message interface.
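
This is presumably most useful for exercising a MessageHandler in unit tests without a database; a minimal sketch with arbitrary values, reusing the handler type from the README example:

msg := pgq.NewMessage(
	pgq.Metadata{"payloadVersion": "v1.0"}, // metadata
	json.RawMessage(`{"foo":"bar"}`),       // payload
	1,                                      // attempt, counts from 1
	3,                                      // maxConsumedCount
)
processed, err := (&handler{}).HandleMessage(context.Background(), msg)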

func (*MessageIncoming) LastAttempt

func (m *MessageIncoming) LastAttempt() bool

LastAttempt returns true if the message is being consumed for the last time according to the maxConsumedCount setting. If the Consumer is not configured to limit the number of attempts (WithMaxConsumeCount set to zero), it always returns false.
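
Combined with WithMaxConsumeCount, this allows a handler to divert a message on its final attempt. A sketch; the process function and junk-queue forwarding are hypothetical, not part of pgq:

func (h *handler) HandleMessage(ctx context.Context, msg *pgq.MessageIncoming) (bool, error) {
	err := process(ctx, msg) // hypothetical business logic
	if err == nil {
		return pgq.MessageProcessed, nil
	}
	if msg.LastAttempt() {
		// hypothetical: keep the payload in a junk queue before the message is given up on
		_, _ = junkPublisher.Publish(ctx, "junk_queue", &pgq.MessageOutgoing{Payload: msg.Payload})
		return pgq.MessageProcessed, err
	}
	return pgq.MessageNotProcessed, err // will be retried
}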

type MessageOutgoing

type MessageOutgoing struct {
	// ScheduledFor is the time when the message should be processed. If nil, the message
	// gets processed immediately.
	ScheduledFor *time.Time `db:"scheduled_for"`
	// Payload is the message's Payload.
	Payload json.RawMessage `db:"payload"`
	// Metadata contains the message Metadata.
	Metadata Metadata `db:"metadata"`
}

MessageOutgoing is a record to be inserted into the queue table in Postgres

type Metadata

type Metadata map[string]string

Metadata is message Metadata definition.

type MetadataFilter

type MetadataFilter struct {
	Key       string
	Operation MetadataOperation
	Value     string
}

MetadataFilter is a filter for metadata. Right now, only direct matching of key/value is supported.

type MetadataOperation

type MetadataOperation string

MetadataOperation is the comparison operation used by MetadataFilter.

const (
	OpEqual    MetadataOperation = "="
	OpNotEqual MetadataOperation = "<>"
)
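
A consumer can thus restrict itself to messages carrying a particular metadata value; a minimal sketch with a hypothetical "region" key:

consumer, err := pgq.NewConsumer(db, "my_queue_name", handler,
	pgq.WithMetadataFilter(&pgq.MetadataFilter{
		Key:       "region", // hypothetical metadata key
		Operation: pgq.OpEqual,
		Value:     "eu",
	}),
)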

type Publisher

type Publisher interface {
	Publish(ctx context.Context, queue string, msg ...*MessageOutgoing) ([]uuid.UUID, error)
}

Publisher publishes messages to Postgres queue.

Example
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"log"
	"time"

	"go.dataddo.com/pgq"
)

type PayloadStruct struct {
	Foo string `json:"foo"`
}

func main() {
	db, err := sql.Open("postgres", "user=postgres password=postgres host=localhost port=5432 dbname=postgres")
	if err != nil {
		log.Fatal("Error opening database:", err)
	}
	defer db.Close()
	const queueName = "test_queue"
	p := pgq.NewPublisher(db)
	payload, _ := json.Marshal(PayloadStruct{Foo: "bar"})
	messages := []*pgq.MessageOutgoing{
		{
			Metadata: pgq.Metadata{
				"version": "1.0",
			},
			Payload: json.RawMessage(payload),
		},
		{
			Metadata: pgq.Metadata{
				"version": "1.0",
			},
			Payload: json.RawMessage(payload),
		},
		{
			Metadata: pgq.Metadata{
				"version": "1.0",
			},
			Payload: json.RawMessage(payload),
		},
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	ids, err := p.Publish(ctx, queueName, messages...)
	if err != nil {
		log.Fatal("Error publishing message:", err)
	}
	log.Println("Published messages with ids:", ids)
}

func NewPublisher

func NewPublisher(db *sql.DB, opts ...PublisherOption) Publisher

NewPublisher initializes the publisher with given *sql.DB client.

Example
package main

import (
	"database/sql"
	"os"

	"go.dataddo.com/pgq"
)

var db *sql.DB

func main() {
	hostname, _ := os.Hostname()
	p := pgq.NewPublisher(db,
		pgq.WithMetaInjectors(
			pgq.StaticMetaInjector(pgq.Metadata{"publisher-id": hostname}),
		),
	)
	_ = p
}

func NewPublisherExt

func NewPublisherExt(db *sqlx.DB, opts ...PublisherOption) Publisher

NewPublisherExt initializes the publisher with the given *sqlx.DB client

type PublisherOption

type PublisherOption func(*publisherConfig)

PublisherOption configures the publisher. Multiple options can be passed to NewPublisher. Options are applied in the order they are given. The last option overrides any previous ones. If no options are passed to NewPublisher, the default values are used.

func WithMetaInjectors

func WithMetaInjectors(injectors ...func(context.Context, Metadata)) PublisherOption

WithMetaInjectors adds Metadata injectors to the publisher. Injectors are run in the order they are given.

Directories

Path Synopsis
internal
    pg
    require    Package require is a minimal alternative to github.com/stretchr/testify/require.
x
    schema     Package schema is a place where to put general functions and constants relevant to the postgres table schema and pgq setup
