pgnotifier

package module
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 18, 2024 License: MIT Imports: 4 Imported by: 0

README

pgnotifier

The pgnotifier package provides a Go interface for handling PostgreSQL notifications. It offers a simple way to subscribe to, receive, and manage notifications from a PostgreSQL database. This package supports both blocking and non-blocking modes for notification handling.

Features

  • Subscribe to Notifications: Easily listen to notifications on specific channels.
  • Real-Time Handling: Receive notifications synchronously (blocking) or asynchronously (non-blocking).
  • Graceful Shutdown: Cleanly stop notifications and release resources.
  • Error Handling: Receive errors related to notifications through dedicated channels.

Installation

To install the pgnotifier package, run:

go get github.com/Siva-0310/pgnotifier

Usage

Create a New Notifier

First, set up a PostgreSQL connection pool and create a new notifier instance.

import (
    "context"
    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/Siva-0310/pgnotifier"
)

func main() {
    ctx := context.Background()

    // Set up PostgreSQL connection pool
    pool, err := pgxpool.New(ctx, "postgres://user:password@localhost:5432/alerts")
    if err != nil {
        panic(err)
    }
    defer pool.Close()

    // Create a new notifier
    notifier, err := pgnotifier.New(ctx, pool)
    if err != nil {
        panic(err)
    }
    defer notifier.Close(ctx, "test_channel")
}
Subscribe to Notifications

To start listening to notifications on a specific topic:

err := notifier.Listen(ctx, "test_channel")
if err != nil {
    panic(err)
}
Receiving Notifications

Blocking Mode

Blocks until a notification is received or the context is canceled.

err := notifier.Blocking(ctx)
if err != nil {
    panic(err)
}

Non-Blocking Mode

Starts a goroutine to listen for notifications without blocking the main thread.

notifier.NonBlocking(ctx)

select {
case notification := <-notifier.NotificationChannel():
    fmt.Println("Received notification:", notification.Payload)
case err := <-notifier.ErrorChannel():
    fmt.Println("Error:", err)
}
Waiting for a Notification

Waits for a single notification, blocking until one is received or an error occurs.

notification, err := notifier.Wait(ctx)
if err != nil {
    panic(err)
}
fmt.Println("Received notification:", notification.Payload)
Unsubscribing from Notifications

To stop listening to notifications on a specific topic:

err := notifier.UnListen(ctx, "test_channel")
if err != nil {
    panic(err)
}
Closing the Notifier

Closes the notifier, unsubscribing from all topics and releasing resources.

err := notifier.Close(ctx, "test_channel")
if err != nil {
    panic(err)
}

Interface Methods

Notifier
  • Listen(ctx context.Context, topic string) error: Subscribes to notifications on the specified topic.
  • UnListen(ctx context.Context, topic string) error: Unsubscribes from notifications on the specified topic.
  • Blocking(ctx context.Context) error: Blocks until a notification is received or the context is canceled.
  • NonBlocking(ctx context.Context): Starts a goroutine for non-blocking notification listening.
  • Wait(ctx context.Context) (*pgconn.Notification, error): Waits for a single notification.
  • NotificationChannel() chan *pgconn.Notification: Returns a channel for receiving notifications.
  • ErrorChannel() chan error: Returns a channel for receiving errors.
  • StopBlocking(): Stops the blocking operation.
  • Close(ctx context.Context, topic string) error: Closes the notifier, releasing resources.

Example

Here's a complete example demonstrating how to use the pgnotifier package:

package main

import (
    "context"
    "fmt"
    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/Siva-0310/pgnotifier"
    "log"
)

func main() {
    ctx := context.Background()

    // Set up PostgreSQL connection pool
    pool, err := pgxpool.New(ctx, "postgres://user:password@localhost:5432/alerts")
    if err != nil {
        log.Fatalf("Unable to connect to database: %v", err)
    }
    defer pool.Close()

    // Create a new notifier
    notifier, err := pgnotifier.New(ctx, pool)
    if err != nil {
        log.Fatalf("Failed to create notifier: %v", err)
    }
    defer notifier.Close(ctx, "test_channel")

    // Subscribe to a notification topic
    err = notifier.Listen(ctx, "test_channel")
    if err != nil {
        log.Fatalf("Failed to listen to test_channel: %v", err)
    }

    // Start non-blocking notification listening
    notifier.NonBlocking(ctx)

    // Send a notification
    _, err = pool.Exec(ctx, "NOTIFY test_channel, 'test_payload'")
    if err != nil {
        log.Fatalf("Sending notification failed: %v", err)
    }

    // Wait for the notification or an error
    select {
    case notification := <-notifier.NotificationChannel():
        fmt.Println("Notification received:", notification.Payload)
    case err := <-notifier.ErrorChannel():
        log.Fatalf("Received error: %v", err)
    case <-ctx.Done():
        log.Fatalf("Timed out waiting for notification")
    }

    // Clean up
    err = notifier.UnListen(ctx, "test_channel")
    if err != nil {
        log.Fatalf("UnListen failed: %v", err)
    }
}

Contributing

Contributions are welcome! Please open an issue or submit a pull request on GitHub.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Notifier

type Notifier interface {
	// Listen subscribes to notifications on the specified topic.
	// It begins listening for notifications on the given topic.
	// Returns an error if the subscription fails.
	Listen(ctx context.Context, topic string) error

	// UnListen unsubscribes from notifications on the specified topic.
	// It stops listening for notifications on the given topic.
	// Returns an error if the unsubscription fails.
	UnListen(ctx context.Context, topic string) error

	// Blocking blocks until a notification is received or the context is canceled.
	// Notifications are sent to the notifierChannel, and the method returns
	// an error if one occurs or the context is canceled.
	Blocking(ctx context.Context) error

	// NonBlocking starts a goroutine that listens for notifications in a non-blocking manner.
	// It uses the Blocking method internally and sends any encountered errors to the errorChannel.
	// The method returns immediately after starting the goroutine.
	NonBlocking(ctx context.Context)

	// Wait waits for a single notification.
	// It blocks until a notification is received or an error occurs.
	// Returns the notification or an error if one occurs.
	Wait(ctx context.Context) (*pgconn.Notification, error)

	// NotificationChannel returns a channel for receiving notifications.
	// Notifications are sent to this channel by the Blocking or NonBlocking methods.
	NotificationChannel() chan *pgconn.Notification

	// ErrorChannel returns a channel for receiving errors.
	// Errors encountered during notification handling are sent to this channel.
	ErrorChannel() chan error

	// StopBlocking stops the blocking operation initiated by the Blocking method.
	// It signals cancellation to stop the blocking operation.
	StopBlocking()

	// Close closes the notifier, unsubscribing from all topics and releasing resources.
	// It signals cancellation to stop any running blocking operations, closes channels,
	// and releases the connection back to the pool.
	// Returns an error if any issues occur during the closure process.
	Close(ctx context.Context, topic string) error
}

Notifier defines the interface for handling PostgreSQL notifications.

func New

func New(ctx context.Context, pool *pgxpool.Pool) (Notifier, error)

New creates a new notifier instance using the provided connection pool. It acquires a connection from the pool and initializes the notifier instance. Returns the notifier instance and an error if the connection acquisition fails.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL