database

package
v0.0.0-...-642d756 Latest
Warning

This package is not in the latest version of its module.

Published: Jun 21, 2024 License: BSD-3-Clause Imports: 29 Imported by: 0

Documentation

Overview

Package database adds some useful functionality to a sql.DB. It is independent of the database driver and the DB schema.

Constants

const OnConflictDoNothing = "ON CONFLICT DO NOTHING"

Variables

var QueryLoggingDisabled bool

QueryLoggingDisabled stops logging of queries when true. For use in tests only: not concurrency-safe.

Functions

func Collect1

func Collect1[T any](ctx context.Context, db *DB, query string, args ...any) (ts []T, err error)

Collect1 runs the query, which must select a single column that can be scanned into a value of type T, and returns a slice of the resulting values.

func CollectStructPtrs

func CollectStructPtrs[T any](ctx context.Context, db *DB, query string, args ...any) ([]*T, error)

func CollectStructs

func CollectStructs[T any](ctx context.Context, db *DB, query string, args ...any) ([]T, error)

CollectStructs scans the rows from the query into structs and returns a slice of them. Example:

type Player struct { Name string; Score int }
players, err := database.CollectStructs[Player](ctx, db, "SELECT name, score FROM players")

func ConnectAndExecute

func ConnectAndExecute(uri string, dbFunc func(*sql.DB) error) (outerErr error)

ConnectAndExecute connects to the postgres database specified by uri and executes dbFunc, then cleans up the database connection. It returns an error that Is derrors.NotFound if no connection could be made.

func CopyFromChan

func CopyFromChan(c <-chan RowItem) pgx.CopyFromSource

CopyFromChan returns a CopyFromSource that gets its rows from a channel.
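
The following is a minimal sketch of how a channel can back a copy source, not the package's actual implementation. It assumes the source interface has the methods Next, Values, and Err, as pgx.CopyFromSource does. The names copyFromSource, chanSource, rowItem, newChanSource, and drain are hypothetical stand-ins defined here so that the example runs with the standard library alone.

```go
package main

import (
	"errors"
	"fmt"
)

// rowItem mirrors the documented RowItem: a row of values or an error.
type rowItem struct {
	Values []any
	Err    error
}

// copyFromSource is a local stand-in with the same method set as
// pgx.CopyFromSource, so the sketch needs no third-party imports.
type copyFromSource interface {
	Next() bool
	Values() ([]any, error)
	Err() error
}

// chanSource is a hypothetical channel-backed source.
type chanSource struct {
	c    <-chan rowItem
	next rowItem
}

// Next blocks until a row arrives and reports false once the channel is closed.
func (s *chanSource) Next() bool {
	item, ok := <-s.c
	if !ok {
		return false
	}
	s.next = item
	return true
}

// Values returns the most recently received row, or its error.
func (s *chanSource) Values() ([]any, error) { return s.next.Values, s.next.Err }

// Err always returns nil in this sketch: per-row errors are reported by Values.
func (s *chanSource) Err() error { return nil }

func newChanSource(c <-chan rowItem) copyFromSource { return &chanSource{c: c} }

// drain consumes a source the way a copy operation might, stopping at the
// first row error.
func drain(src copyFromSource) ([][]any, error) {
	var rows [][]any
	for src.Next() {
		vals, err := src.Values()
		if err != nil {
			return rows, err
		}
		rows = append(rows, vals)
	}
	return rows, src.Err()
}

// errProducer is example data: an error sent by the producing goroutine.
var errProducer = errors.New("producer failed")

func main() {
	c := make(chan rowItem, 3)
	c <- rowItem{Values: []any{"alice", 10}}
	c <- rowItem{Values: []any{"bob", 20}}
	c <- rowItem{Err: errProducer}
	close(c)

	rows, err := drain(newChanSource(c))
	fmt.Println(len(rows), err)
}
```

Sending errors through the same channel as rows lets a producer goroutine report a failure in order, after the rows that preceded it.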

func CreateDB

func CreateDB(dbName string) error

CreateDB creates a new database dbName.

func CreateDBIfNotExists

func CreateDBIfNotExists(dbName string) error

CreateDBIfNotExists checks whether the given dbName is an existing database, and creates one if not.

func DBConnURI

func DBConnURI(dbName string) string

DBConnURI generates a postgres connection string in URI format. This is necessary as migrate expects a URI.
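
As a rough illustration of the URI format, the sketch below builds a postgres connection URI with net/url. The function connURI and every parameter other than dbName are hypothetical; the documented function takes only dbName and obtains the remaining settings elsewhere. The sslmode=disable option is also an assumption made for the example.

```go
package main

import (
	"fmt"
	"net/url"
)

// connURI builds a postgres connection string in URI form.
// user, password, host, and port are hypothetical parameters.
func connURI(user, password, host, port, dbName string) string {
	u := url.URL{
		Scheme: "postgres",
		// UserPassword escapes characters such as '@' in the credentials.
		User: url.UserPassword(user, password),
		Host: host + ":" + port,
		Path: "/" + dbName,
		// Assumed option, shown only to illustrate query parameters.
		RawQuery: url.Values{"sslmode": {"disable"}}.Encode(),
	}
	return u.String()
}

func main() {
	fmt.Println(connURI("postgres", "secret", "localhost", "5432", "discovery_test"))
}
```

Building the URI through url.URL rather than string concatenation keeps credentials with special characters from corrupting the result.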

func DropDB

func DropDB(dbName string) error

DropDB drops the database named dbName.

func NullIsEmpty

func NullIsEmpty(s *string) sql.Scanner

NullIsEmpty returns a sql.Scanner that writes the empty string to s if the scanned column value is NULL.
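
A scanner with this behavior could be sketched as follows. The type nullIsEmpty is hypothetical and is not taken from the package source; it delegates to sql.NullString from the standard library, which already maps NULL to an empty String field.

```go
package main

import (
	"database/sql"
	"fmt"
)

// nullIsEmpty is a hypothetical scanner that treats NULL as "".
type nullIsEmpty struct {
	s *string
}

// Scan implements sql.Scanner. The driver passes nil for a NULL column.
func (n nullIsEmpty) Scan(value any) error {
	var ns sql.NullString
	if err := ns.Scan(value); err != nil {
		return err
	}
	// ns.String is "" when the value was NULL.
	*n.s = ns.String
	return nil
}

// Compile-time check that nullIsEmpty satisfies sql.Scanner.
var _ sql.Scanner = nullIsEmpty{}

func main() {
	name := "stale"
	_ = nullIsEmpty{s: &name}.Scan(nil) // simulate a NULL column
	fmt.Printf("%q\n", name)
	_ = nullIsEmpty{s: &name}.Scan("gopher") // simulate a text column
	fmt.Printf("%q\n", name)
}
```

In a query loop, such a scanner would be passed to Scan in place of a *string destination, so that the caller does not need a separate sql.NullString variable.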

func NullPtr

func NullPtr(p any) nullPtr

NullPtr is for scanning nullable database columns into pointer variables or fields. When given a pointer to a pointer to some type T, it returns a value that can be passed to a Scan function. If the corresponding column is NULL, the variable will be set to nil. Otherwise, it will be set to a newly allocated pointer to the column value.
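
The pointer-to-pointer behavior can be illustrated with a simplified generic scanner. This is a sketch under stated assumptions: the type nullPtr[T] below is hypothetical, uses a type parameter where the documented function takes an any, and accepts only values whose dynamic type is exactly T, with none of the conversions a production scanner would need.

```go
package main

import (
	"database/sql"
	"fmt"
)

// nullPtr is a hypothetical generic scanner for nullable columns.
type nullPtr[T any] struct {
	pp **T
}

// Scan implements sql.Scanner. A NULL column (nil value) sets the target
// pointer to nil; any other value is stored in a newly allocated T.
func (n nullPtr[T]) Scan(value any) error {
	if value == nil {
		*n.pp = nil
		return nil
	}
	v, ok := value.(T)
	if !ok {
		return fmt.Errorf("nullPtr: cannot scan %T into %T", value, *new(T))
	}
	*n.pp = &v
	return nil
}

// Compile-time check that nullPtr satisfies sql.Scanner.
var _ sql.Scanner = nullPtr[string]{}

func main() {
	var nick *string
	_ = nullPtr[string]{pp: &nick}.Scan("gopher") // non-NULL column
	fmt.Println(*nick)
	_ = nullPtr[string]{pp: &nick}.Scan(nil) // NULL column
	fmt.Println(nick == nil)
}
```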

func RegisterOCWrapper

func RegisterOCWrapper(driverName string, opts ...ocsql.TraceOption) (string, error)

RegisterOCWrapper registers a driver that wraps the OpenCensus driver, which in turn wraps the driver named as the first argument.

func ResetDB

func ResetDB(ctx context.Context, db *DB) error

ResetDB truncates all data from the given test DB. It should be called after every test that mutates the database.

func StructScanner

func StructScanner[T any]() func(p *T) []any

StructScanner returns a function that, when called on a pointer to a struct of type T, returns a slice of arguments suitable for Row.Scan or Rows.Scan. The call to either Scan will populate the exported fields of the struct in the order they appear in the type definition.

StructScanner panics if T is not a struct type. The function it returns will panic if its argument is not a pointer to a struct.

Example:

type Player struct { Name string; Score int }
playerScanArgs := database.StructScanner[Player]()
err := db.RunQuery(ctx, "SELECT name, score FROM players", func(rows *sql.Rows) error {
    var p Player
    if err := rows.Scan(playerScanArgs(&p)...); err != nil {
        return err
    }
    // use p
    return nil
})
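
To show how such a function can produce scan arguments, here is a reflection-based sketch. The function structScanner and the type player are hypothetical and written for this example; the package's own implementation may differ. Writing through the returned pointers stands in for a call to rows.Scan.

```go
package main

import (
	"fmt"
	"reflect"
)

// structScanner is a hypothetical reimplementation of the documented
// behavior. It panics if T is not a struct type.
func structScanner[T any]() func(p *T) []any {
	t := reflect.TypeOf((*T)(nil)).Elem()
	if t.Kind() != reflect.Struct {
		panic(fmt.Sprintf("%s is not a struct", t))
	}
	// Record the indexes of exported fields once, in declaration order.
	var exported []int
	for i := 0; i < t.NumField(); i++ {
		if t.Field(i).IsExported() {
			exported = append(exported, i)
		}
	}
	return func(p *T) []any {
		v := reflect.ValueOf(p).Elem()
		args := make([]any, 0, len(exported))
		for _, i := range exported {
			// Each argument is a pointer to one exported field of *p.
			args = append(args, v.Field(i).Addr().Interface())
		}
		return args
	}
}

type player struct {
	Name  string
	note  string // unexported, so it is skipped
	Score int
}

func main() {
	scanArgs := structScanner[player]()
	var p player
	args := scanArgs(&p)
	// Stand in for rows.Scan by writing through the returned pointers.
	*args[0].(*string) = "alice"
	*args[1].(*int) = 10
	fmt.Println(len(args), p.Name, p.Score)
}
```

Doing the reflection once, outside the per-row closure, keeps the cost of inspecting the type out of the row loop.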

func TryToMigrate

func TryToMigrate(dbName string) (isMigrationError bool, outerErr error)

TryToMigrate attempts to migrate the database named dbName to the latest migration. If this operation fails in the migration step, it returns isMigrationError=true to signal that the database should be recreated.

Types

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB wraps a sql.DB. The methods it exports correspond closely to those of sql.DB. They enhance the original by requiring a context argument, and by logging the query and any resulting errors.

A DB may represent a transaction. If so, its execution and query methods operate within the transaction.

func New

func New(db *sql.DB, instanceID string) *DB

New creates a new DB from a sql.DB.

func Open

func Open(driverName, dbinfo, instanceID string) (_ *DB, err error)

Open creates a new DB for the given connection string.

func (*DB) BulkInsert

func (db *DB) BulkInsert(ctx context.Context, table string, columns []string, values []any, conflictAction string) (err error)

BulkInsert constructs and executes a multi-value insert statement. The query is constructed using the format:

INSERT INTO <table> (<columns>) VALUES (<placeholders-for-each-item-in-values>)

If conflictAction is not empty, it is appended to the statement.

The query is executed using a PREPARE statement with the provided values.
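
The statement format described above can be sketched as a string-building helper. This is an illustration only: buildInsertQuery is a hypothetical name, the numbered $n placeholders assume PostgreSQL-style parameters, and the helper takes the number of values rather than the values themselves.

```go
package main

import (
	"fmt"
	"strings"
)

// buildInsertQuery is a hypothetical helper. The values are interleaved by
// row, so nvalues must be a positive multiple of len(columns).
func buildInsertQuery(table string, columns []string, nvalues int, conflictAction string) (string, error) {
	ncols := len(columns)
	if ncols == 0 || nvalues == 0 || nvalues%ncols != 0 {
		return "", fmt.Errorf("cannot arrange %d values into rows of %d columns", nvalues, ncols)
	}
	rows := make([]string, nvalues/ncols)
	for r := range rows {
		placeholders := make([]string, ncols)
		for c := range placeholders {
			// Placeholders are numbered consecutively across all rows.
			placeholders[c] = fmt.Sprintf("$%d", r*ncols+c+1)
		}
		rows[r] = "(" + strings.Join(placeholders, ", ") + ")"
	}
	query := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s",
		table, strings.Join(columns, ", "), strings.Join(rows, ", "))
	if conflictAction != "" {
		query += " " + conflictAction
	}
	return query, nil
}

func main() {
	q, err := buildInsertQuery("players", []string{"name", "score"}, 4, "ON CONFLICT DO NOTHING")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(q)
}
```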

func (*DB) BulkInsertReturning

func (db *DB) BulkInsertReturning(ctx context.Context, table string, columns []string, values []any, conflictAction string, returningColumns []string, scanFunc func(*sql.Rows) error) (err error)

BulkInsertReturning is like BulkInsert, but supports returning values from the INSERT statement. In addition to the arguments of BulkInsert, it takes a list of columns to return and a function to scan those columns. To get the returned values, provide a function that scans them as if they were the selected columns of a query. See TestBulkInsert for an example.

func (*DB) BulkUpdate

func (db *DB) BulkUpdate(ctx context.Context, table string, columns, types []string, values [][]any) (err error)

BulkUpdate executes multiple UPDATE statements in a transaction.

Columns must contain the names of some of table's columns. The first is treated as a key; that is, the values to update are matched with existing rows by comparing the values of the first column.

Types holds the database type of each column. For example,

[]string{"INT", "TEXT"}

Values contains one slice of values per column. (Note that this is unlike BulkInsert, which takes a single slice of interleaved values.)
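
Because the two methods expect different layouts, the sketch below converts interleaved row values (the BulkInsert layout) into one slice per column (the BulkUpdate layout). The helper toColumns is hypothetical and is not part of the package.

```go
package main

import "fmt"

// toColumns converts interleaved row values into one slice per column.
// It is a hypothetical helper written for this example.
func toColumns(interleaved []any, ncols int) ([][]any, error) {
	if ncols <= 0 || len(interleaved)%ncols != 0 {
		return nil, fmt.Errorf("cannot split %d values into %d columns", len(interleaved), ncols)
	}
	cols := make([][]any, ncols)
	for i, v := range interleaved {
		// Value i belongs to column i modulo ncols.
		cols[i%ncols] = append(cols[i%ncols], v)
	}
	return cols, nil
}

func main() {
	// Two rows of (id, name), interleaved as BulkInsert expects.
	interleaved := []any{1, "alice", 2, "bob"}
	cols, err := toColumns(interleaved, 2)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(cols[0], cols[1])
}
```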

func (*DB) BulkUpsert

func (db *DB) BulkUpsert(ctx context.Context, table string, columns []string, values []any, conflictColumns []string) error

BulkUpsert is like BulkInsert, but instead of a conflict action, a list of conflicting columns is provided. An "ON CONFLICT (conflict_columns) DO UPDATE" clause is added to the statement, with assignments "c=excluded.c" for every column c.
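
The conflict clause described above could be generated along these lines. The helper upsertConflictAction is hypothetical, and the exact spacing and separators of the generated clause are assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// upsertConflictAction is a hypothetical helper that builds an
// "ON CONFLICT (...) DO UPDATE" clause assigning c=excluded.c for every
// column c.
func upsertConflictAction(columns, conflictColumns []string) string {
	sets := make([]string, len(columns))
	for i, c := range columns {
		// "excluded" refers to the row that was proposed for insertion.
		sets[i] = fmt.Sprintf("%s=excluded.%s", c, c)
	}
	return fmt.Sprintf("ON CONFLICT (%s) DO UPDATE SET %s",
		strings.Join(conflictColumns, ", "), strings.Join(sets, ", "))
}

func main() {
	fmt.Println(upsertConflictAction(
		[]string{"path", "version", "status"},
		[]string{"path", "version"},
	))
}
```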

func (*DB) BulkUpsertReturning

func (db *DB) BulkUpsertReturning(ctx context.Context, table string, columns []string, values []any, conflictColumns, returningColumns []string, scanFunc func(*sql.Rows) error) error

BulkUpsertReturning is like BulkInsertReturning, but performs an upsert like BulkUpsert.

func (*DB) Close

func (db *DB) Close() error

Close closes the database connection.

func (*DB) CopyInsert

func (db *DB) CopyInsert(ctx context.Context, table string, columns []string, src pgx.CopyFromSource, dropColumn string) (err error)

CopyInsert inserts rows into table using the pgx driver's CopyFrom method. It returns an error if the underlying driver is not pgx. columns is the list of columns to insert. src is the source of the rows to insert. If dropColumn is non-empty, that column will be dropped from the temporary table before copying. Use dropColumn for generated ID columns.

CopyInsert works by first creating a temporary table, populating it with CopyFrom, and then running an INSERT...SELECT... to insert its rows into the original table.

func (*DB) CopyUpsert

func (db *DB) CopyUpsert(ctx context.Context, table string, columns []string, src pgx.CopyFromSource, conflictColumns []string, dropColumn string) (err error)

CopyUpsert upserts rows into table using the pgx driver's CopyFrom method. It returns an error if the underlying driver is not pgx. columns is the list of columns to upsert. src is the source of the rows to upsert. conflictColumns are the columns that might conflict (i.e. that have a UNIQUE constraint). If dropColumn is non-empty, that column will be dropped from the temporary table before copying. Use dropColumn for generated ID columns.

CopyUpsert works by first creating a temporary table, populating it with CopyFrom, and then running an INSERT...SELECT...ON CONFLICT to upsert its rows into the original table.

func (*DB) Exec

func (db *DB) Exec(ctx context.Context, query string, args ...any) (_ int64, err error)

Exec executes a SQL statement and returns the number of rows it affected.

func (*DB) InTransaction

func (db *DB) InTransaction() bool

func (*DB) IsRetryable

func (db *DB) IsRetryable() bool

func (*DB) MaxRetries

func (db *DB) MaxRetries() int

MaxRetries returns the maximum number of times that a serializable transaction was retried.

func (*DB) Ping

func (db *DB) Ping() error

func (*DB) Prepare

func (db *DB) Prepare(ctx context.Context, query string) (*sql.Stmt, error)

func (*DB) Query

func (db *DB) Query(ctx context.Context, query string, args ...any) (_ *sql.Rows, err error)

Query runs the query and returns the resulting rows.

func (*DB) QueryRow

func (db *DB) QueryRow(ctx context.Context, query string, args ...any) *sql.Row

QueryRow runs the query and returns a single row.

func (*DB) RunQuery

func (db *DB) RunQuery(ctx context.Context, query string, f func(*sql.Rows) error, params ...any) error

RunQuery executes query, then calls f on each row. It stops when there are no more rows or f returns a non-nil error.

func (*DB) RunQueryIncrementally

func (db *DB) RunQueryIncrementally(ctx context.Context, query string, batchSize int, f func(*sql.Rows) error, params ...any) (err error)

RunQueryIncrementally executes query, then calls f on each row. It fetches rows in groups of size batchSize. It stops when there are no more rows, or when f returns io.EOF.
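
The io.EOF stopping convention can be illustrated without a database. In the sketch below, forEachInBatches is a hypothetical in-memory stand-in for a batched query loop, and collectFirst shows a callback that returns io.EOF once it has seen enough rows. Returning other non-nil errors to the caller is an assumption of this example.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// forEachInBatches visits items in groups of batchSize, calling f on each.
// Returning io.EOF from f ends the iteration early without an error; any
// other non-nil error is returned to the caller (an assumption here).
func forEachInBatches[T any](items []T, batchSize int, f func(T) error) error {
	if batchSize <= 0 {
		return fmt.Errorf("batchSize must be positive, got %d", batchSize)
	}
	for start := 0; start < len(items); start += batchSize {
		end := start + batchSize
		if end > len(items) {
			end = len(items)
		}
		for _, item := range items[start:end] {
			if err := f(item); err != nil {
				if errors.Is(err, io.EOF) {
					return nil // the callback asked to stop
				}
				return err
			}
		}
	}
	return nil
}

// collectFirst gathers at most n items, stopping the loop with io.EOF.
func collectFirst(items []int, batchSize, n int) ([]int, error) {
	var got []int
	err := forEachInBatches(items, batchSize, func(v int) error {
		got = append(got, v)
		if len(got) == n {
			return io.EOF // signals "enough", not a failure
		}
		return nil
	})
	return got, err
}

func main() {
	got, err := collectFirst([]int{1, 2, 3, 4, 5}, 2, 3)
	fmt.Println(got, err)
}
```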

func (*DB) Transact

func (db *DB) Transact(ctx context.Context, iso sql.IsolationLevel, txFunc func(*DB) error) (err error)

Transact executes the given function in the context of a SQL transaction at the given isolation level, rolling back the transaction if the function panics or returns an error.

The given function is called with a DB that is associated with a transaction. The DB should be used only inside the function; if it is used to access the database after the function returns, the calls will return errors.

If the isolation level requires it, Transact will retry the transaction upon serialization failure, so txFunc may be called more than once.
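
Since txFunc may run more than once, it should be safe to repeat. The retry behavior can be sketched as below. This is not the package's code: transactWithRetry, errSerialization, and the maxAttempts parameter are hypothetical, and the sentinel error stands in for a driver error carrying SQLSTATE 40001, the PostgreSQL code for serialization_failure.

```go
package main

import (
	"errors"
	"fmt"
)

// errSerialization is a hypothetical sentinel standing in for a driver error
// with SQLSTATE 40001 (serialization_failure in PostgreSQL).
var errSerialization = errors.New("serialization failure")

// transactWithRetry runs txFunc up to maxAttempts times, retrying only when
// it fails with a serialization failure. A real implementation would begin a
// transaction before each attempt and roll it back on error or panic.
func transactWithRetry(maxAttempts int, txFunc func() error) (attempts int, err error) {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	for attempts < maxAttempts {
		attempts++
		err = txFunc()
		if !errors.Is(err, errSerialization) {
			return attempts, err // success, or an error that is not retried
		}
	}
	return attempts, fmt.Errorf("giving up after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	attempts, err := transactWithRetry(5, func() error {
		calls++
		if calls < 3 {
			return errSerialization // the first two attempts conflict
		}
		return nil
	})
	fmt.Println(attempts, err)
}
```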

func (*DB) WithPGXConn

func (db *DB) WithPGXConn(f func(conn *pgx.Conn) error) error

type MultiErr

type MultiErr []error

MultiErr can be used to combine one or more errors into a single error.

func (MultiErr) Error

func (m MultiErr) Error() string
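
A slice-of-errors type like this one can be sketched as follows. The names multiErr and combine are hypothetical, and joining the messages with "; " is an assumption, since the source does not specify how the combined message is formatted.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// multiErr is a hypothetical counterpart to the documented MultiErr.
type multiErr []error

// Error joins the messages of all contained errors.
// The "; " separator is an assumption made for this sketch.
func (m multiErr) Error() string {
	msgs := make([]string, len(m))
	for i, e := range m {
		msgs[i] = e.Error()
	}
	return strings.Join(msgs, "; ")
}

// combine drops nil errors and returns nil when nothing remains, so that
// callers never receive a non-nil error wrapping an empty slice.
func combine(errs ...error) error {
	var m multiErr
	for _, e := range errs {
		if e != nil {
			m = append(m, e)
		}
	}
	if len(m) == 0 {
		return nil
	}
	return m
}

// Example errors used by main.
var (
	errClose = errors.New("close failed")
	errFlush = errors.New("flush failed")
)

func main() {
	fmt.Println(combine(errClose, nil, errFlush))
	fmt.Println(combine(nil) == nil)
}
```

Returning a plain nil from combine matters because an empty multiErr stored in an error interface would compare as non-nil.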

type RowItem

type RowItem struct {
	Values []any
	Err    error
}

A RowItem is a row of values or an error.
