pgxlisten

package module
v0.0.0-...-4e78fc3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 28, 2024 License: MIT Imports: 7 Imported by: 0

README

pgx listener

Documentation

Overview

Example
// create a new listener
listen := pgxlisten.New(
	// channel names to listen on
	[]string{"listen1", "listen2", "listen3"},
	// notification handler function
	func(payload *pgconn.Notification) {
		slog.Info("notification",
			slog.String("channel", payload.Channel),
			slog.String("payload", string(payload.Payload)))
	},
	// after connect logger
	pgxlisten.WithAfterConnect(func(_ context.Context, conn *pgx.Conn) error {
		slog.Info("listener connected")
		return nil
	}),
	// connection error logger
	pgxlisten.WithOnError(func(err error) {
		slog.Error("listener", slog.String("error", err.Error()))
	}),
	// reconnect delay (default 5 sec)
	pgxlisten.WithReconnectDelay(time.Second),
)

ctx, done := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer done()

const dsn = "postgres://localhost:5432/test?sslmode=disable"

// connect and start listening; auto restart on error
if err := listen.Run(ctx, dsn); err != nil {
	slog.Error("listen", slog.String("error", err.Error()))
}
Output:

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Listener

type Listener struct {
	// contains filtered or unexported fields
}

Listener represents a PostgreSQL notification listener.

func New

func New(channels []string, handler func(payload *pgconn.Notification), opts ...Option) *Listener

New creates a new Listener with the provided channels and handler. It panics if channels is empty or handler is nil.

func (*Listener) Listen

func (l *Listener) Listen(ctx context.Context, conn *pgx.Conn) error

Listen starts listening for notifications on the provided channels. It will stop listening when the provided context is canceled or an error occurs.

func (*Listener) Run

func (l *Listener) Run(ctx context.Context, dsn string) error

Run starts listening for notifications on the provided dsn. It will reconnect after the provided delay. It will stop listening only when the provided context is canceled.

func (*Listener) RunFromPool

func (l *Listener) RunFromPool(ctx context.Context, pool *pgxpool.Pool) error

RunFromPool starts listening for notifications on the provided pool connection. It will stop listening when the provided context is canceled or an error occurs. It will automatically reconnect after the provided delay.

type Option

type Option func(*Listener)

Option represents an option for the Listen function.

func WithAfterConnect

func WithAfterConnect(afterConnect func(context.Context, *pgx.Conn) error) Option

WithAfterConnect adds a function that will be called after the connection is established. It can be used to perform additional setup. If it returns an error, the connection will be closed and listening will be stopped.

func WithOnError

func WithOnError(onError func(error)) Option

WithOnError add a function that will be called when a connection error occurs during listening. It can be used to handle connection errors during listening.

func WithReconnectDelay

func WithReconnectDelay(delay time.Duration) Option

WithReconnectDelay sets the delay between reconnect attempts. The default delay is 5 seconds.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL