drivers

package
v0.1.10-alpha Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 9, 2025 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Driver

// Driver is the set of database operations the job queue needs from a
// backend. PgxDriver and SQLDriver each declare methods with these
// signatures.
type Driver interface {
	// WithTx hands a Transaction to fn. How the transaction is begun, and
	// whether an error from fn triggers a rollback, is decided by the
	// implementation and cannot be read from this declaration.
	WithTx(ctx context.Context, fn func(tx Transaction) error) error
	// Basic operations: the same Exec/Query/QueryRow trio that Transaction
	// declares, with results expressed as this package's Rows and Row types.
	Exec(ctx context.Context, sql string, args ...interface{}) error
	Query(ctx context.Context, sql string, args ...interface{}) (Rows, error)
	QueryRow(ctx context.Context, sql string, args ...interface{}) Row

	// Postgres-specific operations. Listen and Notify both take a channel
	// name; presumably they correspond to PostgreSQL LISTEN and NOTIFY
	// (confirm against the implementations).
	Listen(ctx context.Context, channel string) error
	Notify(ctx context.Context, channel string, payload string) error
	// AddJobWithTx handles external transactions: tx is supplied by the
	// caller as interface{} and is returned wrapped as a Transaction.
	// NOTE(review): the accepted concrete types are implementation-specific,
	// presumably PgxTx for PgxDriver and SQLTx for SQLDriver; confirm.
	AddJobWithTx(ctx context.Context, tx interface{}) (Transaction, error)
	// WaitForNotification waits for a notification on any channel the
	// connection is listening on and returns it as a *Notification.
	WaitForNotification(ctx context.Context) (*Notification, error)
}

Driver defines the core database operations needed for the job queue.

func NewPgxDriver

func NewPgxDriver(pool interface{}) (Driver, error)

NewPgxDriver creates a new pgx-based driver implementation for PostgreSQL. It uses pgx's native connection pool for better performance and features like automatic connection recovery, statement caching, and native LISTEN/NOTIFY support.

Parameters:

  • pool: A *pgxpool.Pool instance. Must be initialized and connected. The pool manages the connection lifecycle and reuses connections for optimal performance.

Returns:

  • Driver: The database driver implementation
  • error: Non-nil if the pool is nil or of the wrong type

Example:

config, _ := pgxpool.ParseConfig("postgres://localhost:5432/myapp")
pool, _ := pgxpool.NewWithConfig(context.Background(), config)
driver, err := NewPgxDriver(pool)

func NewSQLDriver

func NewSQLDriver(db *sql.DB, connStr string) (Driver, error)

NewSQLDriver creates a new database/sql driver implementation for PostgreSQL. It requires both a database connection and the original connection string because the lib/pq notification listener needs the connection string for establishing its own connection.

Parameters:

  • db: An initialized *sql.DB connection pool
  • connStr: The PostgreSQL connection string (e.g., "postgres://user:pass@localhost:5432/dbname")

Returns:

  • Driver: The database driver implementation
  • error: Non-nil if the database connection is nil

Example:

db, _ := sql.Open("postgres", "postgres://localhost:5432/myapp")
driver, err := NewSQLDriver(db, "postgres://localhost:5432/myapp")

type Notification

// Notification represents a PostgreSQL notification. It is the value
// returned (by pointer) from Driver.WaitForNotification.
type Notification struct {
	// Channel is presumably the name of the channel the notification was
	// delivered on (the same kind of name passed to Listen/Notify); confirm.
	Channel string
	// Payload is presumably the payload string supplied to Notify; confirm.
	Payload string
}

Notification represents a PostgreSQL notification

type PgxDriver

// PgxDriver declares the full Driver method set on its pointer receiver
// (WithTx, Exec, Query, QueryRow, Listen, Notify, AddJobWithTx,
// WaitForNotification).
// NOTE(review): presumably the concrete type behind NewPgxDriver; confirm.
type PgxDriver struct {
	// contains filtered or unexported fields
}

func (*PgxDriver) AddJobWithTx

func (d *PgxDriver) AddJobWithTx(ctx context.Context, tx interface{}) (Transaction, error)

AddJobWithTx accepts an external pgx transaction and wraps it in our Transaction interface

func (*PgxDriver) Exec

func (d *PgxDriver) Exec(ctx context.Context, sql string, args ...interface{}) error

func (*PgxDriver) Listen

func (d *PgxDriver) Listen(ctx context.Context, channel string) error

func (*PgxDriver) Notify

func (d *PgxDriver) Notify(ctx context.Context, channel string, payload string) error

func (*PgxDriver) Query

func (d *PgxDriver) Query(ctx context.Context, sql string, args ...interface{}) (Rows, error)

func (*PgxDriver) QueryRow

func (d *PgxDriver) QueryRow(ctx context.Context, sql string, args ...interface{}) Row

func (*PgxDriver) WaitForNotification

func (d *PgxDriver) WaitForNotification(ctx context.Context) (*Notification, error)

WaitForNotification waits for a notification on any channel this connection is listening on

func (*PgxDriver) WithTx

func (d *PgxDriver) WithTx(ctx context.Context, fn func(tx Transaction) error) error

type PgxTx

// PgxTx represents a pgx transaction that can be passed in. It requires only
// the three query methods below, which return pgx-native result types
// (pgconn.CommandTag, pgx.Rows, pgx.Row) rather than this package's Rows/Row.
// NOTE(review): presumably the type PgxDriver.AddJobWithTx expects behind its
// interface{} parameter; confirm against the implementation.
type PgxTx interface {
	Exec(ctx context.Context, sql string, args ...interface{}) (pgconn.CommandTag, error)
	Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...interface{}) pgx.Row
}

PgxTx represents a pgx transaction that can be passed in

type Row

// Row is the minimal single-row result interface: the only capability it
// requires is Scan. It is the return type of the QueryRow methods declared on
// Driver and Transaction.
type Row interface {
	// Scan takes a variadic list of destinations and reports an error.
	Scan(dest ...interface{}) error
}

Row and Rows are minimal result interfaces that expose only the functionality this package requires.

type Rows

// Rows is the minimal multi-row result interface. It is the return type of
// the Query methods declared on Driver and Transaction.
type Rows interface {
	// Next reports a bool; presumably true while another row is available,
	// in the style of database/sql — confirm against the implementations.
	Next() bool
	// Scan takes a variadic list of destinations and reports an error.
	Scan(dest ...interface{}) error
	// Close returns an error, so implementations must supply one even if the
	// underlying driver's close operation does not.
	Close() error
}

type SQLDriver

// SQLDriver declares the full Driver method set on its pointer receiver
// (WithTx, Exec, Query, QueryRow, Listen, Notify, AddJobWithTx,
// WaitForNotification).
// NOTE(review): presumably the concrete type behind NewSQLDriver; confirm.
type SQLDriver struct {
	// contains filtered or unexported fields
}

func (*SQLDriver) AddJobWithTx

func (d *SQLDriver) AddJobWithTx(ctx context.Context, tx interface{}) (Transaction, error)

AddJobWithTx accepts an external database/sql transaction and wraps it in our Transaction interface

func (*SQLDriver) Exec

func (d *SQLDriver) Exec(ctx context.Context, sql string, args ...interface{}) error

func (*SQLDriver) Listen

func (d *SQLDriver) Listen(ctx context.Context, channel string) error

func (*SQLDriver) Notify

func (d *SQLDriver) Notify(ctx context.Context, channel string, payload string) error

func (*SQLDriver) Query

func (d *SQLDriver) Query(ctx context.Context, sql string, args ...interface{}) (Rows, error)

func (*SQLDriver) QueryRow

func (d *SQLDriver) QueryRow(ctx context.Context, sql string, args ...interface{}) Row

func (*SQLDriver) WaitForNotification

func (d *SQLDriver) WaitForNotification(ctx context.Context) (*Notification, error)

WaitForNotification waits for a notification on any channel this connection is listening on

func (*SQLDriver) WithTx

func (d *SQLDriver) WithTx(ctx context.Context, fn func(tx Transaction) error) error

type SQLTx

// SQLTx represents a database/sql transaction that can be passed in. It
// requires only the three context-aware query methods below, which return
// database/sql result types (sql.Result, *sql.Rows, *sql.Row) rather than
// this package's Rows/Row.
// NOTE(review): presumably the type SQLDriver.AddJobWithTx expects behind its
// interface{} parameter; confirm against the implementation.
type SQLTx interface {
	ExecContext(ctx context.Context, sql string, args ...interface{}) (sql.Result, error)
	QueryContext(ctx context.Context, sql string, args ...interface{}) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, sql string, args ...interface{}) *sql.Row
}

SQLTx represents a database/sql transaction that can be passed in

type Transaction

// Transaction represents the package's internal transaction interface. It is
// what WithTx passes to its callback and what AddJobWithTx returns. Its three
// methods have the same signatures as the basic operations on Driver.
type Transaction interface {
	// Exec takes a SQL string and arguments and reports only an error; no
	// result value is exposed.
	Exec(ctx context.Context, sql string, args ...interface{}) error
	// Query returns a Rows value together with an error.
	Query(ctx context.Context, sql string, args ...interface{}) (Rows, error)
	// QueryRow returns a Row and no separate error value.
	QueryRow(ctx context.Context, sql string, args ...interface{}) Row
}

Transaction represents our internal transaction interface

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL