pgln

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 19, 2024 License: MIT Imports: 10 Imported by: 3

README

pgln - PostgreSQL Listen/Notify Library

A robust PostgreSQL Listen/Notify library built on top of pgx.

⭐️ Please Star This Project

If you find this project useful, please consider giving it a star ⭐️ on GitHub. It helps others find the project and shows your support!

Table of Contents

Motivation

PostgreSQL's Listen/Notify feature provides a basic pub/sub mechanism. One common use case is simple cache synchronization for downstream services without the need for additional services like Redis or RabbitMQ.

However, when a connection disconnects, notifications are lost as PostgreSQL doesn't store them. This library addresses this issue by providing an out-of-sync BLOCKING callback to rebuild your state while maintaining an active listening connection, ensuring no new notifications are missed.

Use Case

For a detailed explanation of use cases and implementation details, please read our article on LinkedIn: PGLN: PostgreSQL Listen/Notify Please note the examples are more current in this repository, as the code continues to improve.

Installation

go get github.com/tzahifadida/pgln

Features

  • Supports *sql.DB connections using the pgx driver in stdlib mode
  • Automatic reconnection
  • Single connection for multiple Listen channels (Notify operations acquire, use, and release an additional connection)
  • Out-of-sync callback for reconnects, allowing cache rebuilding without losing notifications
  • Support for LISTEN and NOTIFY operations
  • Transaction-safe notify operations with NotifyQuery
  • Safe unlisten operations with completion signaling
  • Blocking callbacks, giving users full control over concurrency management

Important Note on Callbacks

All callbacks provided to this library (NotificationCallback, DoneCallback, ErrorCallback, and OutOfSyncBlockingCallback) are BLOCKING. This means that when a callback is invoked, it will block the library's internal operations until the callback completes.

It is the responsibility of the library user to decide whether to perform operations synchronously within the callback or to use goroutines for concurrent execution. If you need to perform long-running or potentially blocking operations in a callback, consider wrapping the operation in a goroutine to avoid blocking the library's internal processes.

Example of non-blocking callback usage:

NotificationCallback: func(channel string, payload string) {
go func() {
// Perform potentially long-running operations here
processNotification(channel, payload)
}()
},

By making callbacks blocking, this library provides you with full control over concurrency management and the ability to ensure operations are completed before proceeding, if necessary.

Major Methods and Usage

NewPGListenNotifyBuilder()

Creates a new builder for configuring the PGListenNotify instance.

builder := pgln.NewPGListenNotifyBuilder()
Builder Methods
  • SetContext(ctx context.Context): Sets the context for the PGListenNotify instance.
  • SetDB(db *sql.DB): Sets the database connection (must be a *sql.DB using pgx driver).
  • SetReconnectInterval(reconnectInterval time.Duration): Sets the interval for reconnection attempts.
  • SetHealthCheckTimeout(timeout time.Duration): Sets the timeout for health checks.
  • Build(): Builds and returns the PGListenNotify instance.
PGListenNotify Methods
  • Start(): Starts the listening process. Must be called before any Listen operations.
  • Shutdown(): Gracefully shuts down the PGListenNotify instance.
  • Listen(channel string, options ListenOptions) (chan error, error): Starts listening on a channel (non-blocking).
  • ListenAndWaitForListening(channel string, options ListenOptions) error: Starts listening on a channel and waits for it to be ready (blocking).
  • UnListen(channel string) (chan struct{}, error): Stops listening on a channel and returns a channel that will be closed when the unlisten operation is complete.
  • UnlistenAndWaitForUnlistening(channel string) error: Stops listening on a channel and waits until it's completely removed (blocking).
  • Notify(channel string, payload string) error: Sends a notification to a channel.
  • NotifyQuery(channel string, payload string) NotifyQueryResult: Returns a query and parameters for sending a notification within a transaction.

Example Usage

This example demonstrates how to use the pgln library, including the use of NotifyQuery for transaction-safe notifications and proper error handling. It also shows how to use callbacks safely.

package main

import (
	"context"
	"database/sql"
	"fmt"
	"github.com/tzahifadida/pgln"
	_ "github.com/jackc/pgx/v5/stdlib" // pgx driver for database/sql
	"os"
	"strings"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	connectionString := os.Getenv("PGLN_CONNECTION_STRING")

	// Open a database connection using pgx driver
	db, err := sql.Open("pgx", connectionString)
	if err != nil {
		fmt.Printf("Failed to open database: %v\n", err)
		return
	}
	defer db.Close()

	builder := pgln.NewPGListenNotifyBuilder().
		SetContext(ctx).
		SetReconnectInterval(5 * time.Second).
		SetHealthCheckTimeout(2 * time.Second).
		SetDB(db)

	r, err := builder.Build()
	if err != nil {
		fmt.Printf("Build error: %v\n", err)
		return
	}
	err = r.Start()
	if err != nil {
		fmt.Printf("Start error: %v\n", err)
		return
	}

	defer func() {
		_, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer shutdownCancel()

		err := r.UnlistenAndWaitForUnlistening("pgln_foo")
		if err != nil {
			if err == context.DeadlineExceeded {
				fmt.Println("UnListen timed out")
			} else if err != context.Canceled {
				fmt.Printf("UnListen error: %v\n", err)
			}
		}
		r.Shutdown()
	}()

	notificationReceived := make(chan string, 1)

	err = r.ListenAndWaitForListening("pgln_foo", pgln.ListenOptions{
		NotificationCallback: func(channel string, payload string) {
			// This callback is blocking. For long-running operations, consider using a goroutine:
			go func() {
				fmt.Printf("Notification received: %s - %s\n", channel, payload)
				select {
				case notificationReceived <- payload:
				default:
					fmt.Println("Notification channel full, discarding payload")
				}
			}()
		},
		ErrorCallback: func(channel string, err error) {
			if !strings.Contains(err.Error(), "context canceled") {
				fmt.Printf("Error: %s - %s\n", channel, err)
			}
		},
		OutOfSyncBlockingCallback: func(channel string) error {
			// This callback is intentionally blocking to ensure sync before proceeding
			fmt.Printf("Out-of-sync: %s\n", channel)
			return nil
		},
	})
	if err != nil {
		fmt.Printf("Listen error: %v\n", err)
		return
	}

	// Start a transaction
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		fmt.Printf("Failed to begin transaction: %v\n", err)
		return
	}
	defer tx.Rollback() // Rollback if not committed

	// Use NotifyQuery to get the notification query
	notifyQuery := r.NotifyQuery("pgln_foo", "Transaction notification")

	// Execute the notification query within the transaction
	_, err = tx.ExecContext(ctx, notifyQuery.Query, notifyQuery.Params...)
	if err != nil {
		fmt.Printf("Failed to execute notify query: %v\n", err)
		return
	}

	// Commit the transaction
	err = tx.Commit()
	if err != nil {
		fmt.Printf("Failed to commit transaction: %v\n", err)
		return
	}

	// Wait for the notification or timeout
	select {
	case payload := <-notificationReceived:
		fmt.Printf("Received notification payload: %s\n", payload)
	case <-time.After(5 * time.Second):
		fmt.Println("Timed out waiting for notification")
	case <-ctx.Done():
		fmt.Println("Context cancelled")
	}
}
Important Note on NotifyQuery

The NotifyQuery method is particularly useful when you need to ensure that a notification is sent only if a transaction is successfully committed. This is because PostgreSQL executes NOTIFY commands at commit time, not at the time they are issued within a transaction.

By using NotifyQuery, you can:

  1. Include the notification as part of a larger transaction.
  2. Ensure that the notification is sent only if all other operations in the transaction succeed.
  3. Avoid sending notifications for operations that may be rolled back.

This makes NotifyQuery ideal for scenarios where you want to notify other parts of your system about changes, but only if those changes are successfully persisted to the database.

Testing

Run tests using the go test command.

For more detailed examples, refer to builder_test.go in the repository.

Status and Support

This library uses the pgx driver in stdlib mode. For issues related to the driver itself, please contact the pgx maintainers.

For questions or issues specific to pgln that are not related to the pgx driver, feel free to open an issue in this repository.

Community contributions and help with reported issues are welcome and encouraged.

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

License

This project is licensed under the MIT License - see the LICENSE file for details.


We hope you find pgln useful for your PostgreSQL Listen/Notify needs. If you have any questions, suggestions, or encounter any issues, please don't hesitate to open an issue or contribute to the project.

Documentation

Overview

Package pgln provides functionality for PostgreSQL LISTEN/NOTIFY operations.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DoneCallbackType

type DoneCallbackType func(channel string)

DoneCallbackType is a function type called when listening is done.

type ErrorCallbackType

type ErrorCallbackType func(channel string, err error)

ErrorCallbackType is a function type for handling errors.

type ListenOptions

type ListenOptions struct {
	// NotificationCallback is an optional BLOCKING callback that is called for each channel when a notification arrives.
	// Blocking means that we leave it up to the callback to decide if everything should wait or to spawn a goroutine.
	NotificationCallback NotificationCallbackType
	// DoneCallback is an optional BLOCKING callback that is called for each channel when we are done listening.
	// Blocking means that we leave it up to the callback to decide if everything should wait or to spawn a goroutine.
	DoneCallback DoneCallbackType

	// ErrorCallback is an optional BLOCKING callback that is called for each channel when there is an unexpected error.
	// Blocking means that we leave it up to the callback to decide if everything should wait or to spawn a goroutine.
	ErrorCallback ErrorCallbackType
	// OutOfSyncBlockingCallback is an optional BLOCKING callback that is called for each channel when we first connect
	// and when we reconnect. It is called just before receiving the first notification.
	// It allows you to catch up or rebuild your caches while we are listening for new notifications, so you do not lose your messages
	// while rebuilding.
	// Blocking means that we leave it up to the callback to decide if everything should wait or to spawn a goroutine.
	OutOfSyncBlockingCallback OutOfSyncBlockingCallbackType
	// contains filtered or unexported fields
}

ListenOptions is used when Listen is called. It also maintains internal state.

type NotificationCallbackType

type NotificationCallbackType func(channel string, payload string)

NotificationCallbackType is a function type for handling notifications.

type NotifyQueryResult added in v0.1.4

type NotifyQueryResult struct {
	Query  string // The SQL query to be executed
	Params []any  // The parameters for the query
}

NotifyQueryResult represents the result of a NotifyQuery call.

type OutOfSyncBlockingCallbackType

type OutOfSyncBlockingCallbackType func(channel string) error

OutOfSyncBlockingCallbackType is a function type for handling out-of-sync situations.

type PGListenNotify

type PGListenNotify struct {
	// contains filtered or unexported fields
}

PGListenNotify handles listening and notifying operations. It is generated by the Builder.

func (*PGListenNotify) Listen

func (r *PGListenNotify) Listen(channel string, options ListenOptions) (chan error, error)

Listen will "listen" to a channel specified in the options. It returns a channel that will receive an error if there's a failure to listen. To stop listening for a channel, use UnListen.

func (*PGListenNotify) ListenAndWaitForListening added in v0.1.4

func (r *PGListenNotify) ListenAndWaitForListening(channel string, options ListenOptions) error

ListenAndWaitForListening combines Listen and waiting for the listening to start. It returns an error if there was a failure in listening or if the context is cancelled.

func (*PGListenNotify) Notify

func (r *PGListenNotify) Notify(channel string, payload string) error

Notify allows you to send a notification to a specified channel with a payload.

func (*PGListenNotify) NotifyQuery added in v0.1.4

func (r *PGListenNotify) NotifyQuery(channel string, payload string) NotifyQueryResult

NotifyQuery returns the query string and parameters for notifying a channel. This allows the caller to run the query themselves in their own transaction. The main reason is that pg_notify is run only if the transaction commits. If the transaction fails, it will not run, which is usually what we want.

func (*PGListenNotify) Shutdown added in v0.1.4

func (r *PGListenNotify) Shutdown()

Shutdown releases resources that were created internally.

func (*PGListenNotify) Start added in v0.1.4

func (r *PGListenNotify) Start() error

Start begins the monitoring process. It should be called after Build().

func (*PGListenNotify) UnListen

func (r *PGListenNotify) UnListen(channel string) (chan struct{}, error)

UnListen stops listening for the specified channel. It returns a channel that will be closed when the unlisten operation is complete.

func (*PGListenNotify) UnlistenAndWaitForUnlistening added in v0.1.6

func (r *PGListenNotify) UnlistenAndWaitForUnlistening(channel string) error

UnlistenAndWaitForUnlistening stops listening for the specified channel and waits until it's completely removed. This function is important for safe shutdown as it ensures that no more notifications will be processed for this channel.

type PGListenNotifyBuilder

type PGListenNotifyBuilder interface {
	// SetContext allows you to set your own custom context, so we can stop listening when the context is done.
	SetContext(ctx context.Context) PGListenNotifyBuilder

	// SetDB allows you to set your own sql.DB connection.
	SetDB(db *sql.DB) PGListenNotifyBuilder

	// SetReconnectInterval sets the interval in milliseconds to wait between reconnection attempts.
	SetReconnectInterval(reconnectInterval time.Duration) PGListenNotifyBuilder

	// SetHealthCheckTimeout sets the timeout for health check operations.
	SetHealthCheckTimeout(timeout time.Duration) PGListenNotifyBuilder

	// Build is the final call to construct the PGListenNotify structure.
	Build() (*PGListenNotify, error)
}

PGListenNotifyBuilder builds a PGListenNotify structure you can use to listen for new notifications.

func NewPGListenNotifyBuilder

func NewPGListenNotifyBuilder() PGListenNotifyBuilder

NewPGListenNotifyBuilder creates a new PGListenNotifyBuilder to configure the listener.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL