Documentation ¶
Overview ¶
Package database adds some useful functionality to a sql.DB. It is independent of the database driver and the DB schema.
Index ¶
- Constants
- Variables
- func Collect1[T any](ctx context.Context, db *DB, query string, args ...any) (ts []T, err error)
- func CollectStructPtrs[T any](ctx context.Context, db *DB, query string, args ...any) ([]*T, error)
- func CollectStructs[T any](ctx context.Context, db *DB, query string, args ...any) ([]T, error)
- func ConnectAndExecute(uri string, dbFunc func(*sql.DB) error) (outerErr error)
- func CopyFromChan(c <-chan RowItem) pgx.CopyFromSource
- func CreateDB(dbName string) error
- func CreateDBIfNotExists(dbName string) error
- func DBConnURI(dbName string) string
- func DropDB(dbName string) error
- func NullIsEmpty(s *string) sql.Scanner
- func NullPtr(p any) nullPtr
- func RegisterOCWrapper(driverName string, opts ...ocsql.TraceOption) (string, error)
- func ResetDB(ctx context.Context, db *DB) error
- func StructScanner[T any]() func(p *T) []any
- func TryToMigrate(dbName string) (isMigrationError bool, outerErr error)
- type DB
- func (db *DB) BulkInsert(ctx context.Context, table string, columns []string, values []any, ...) (err error)
- func (db *DB) BulkInsertReturning(ctx context.Context, table string, columns []string, values []any, ...) (err error)
- func (db *DB) BulkUpdate(ctx context.Context, table string, columns, types []string, values [][]any) (err error)
- func (db *DB) BulkUpsert(ctx context.Context, table string, columns []string, values []any, ...) error
- func (db *DB) BulkUpsertReturning(ctx context.Context, table string, columns []string, values []any, ...) error
- func (db *DB) Close() error
- func (db *DB) CopyInsert(ctx context.Context, table string, columns []string, src pgx.CopyFromSource, ...) (err error)
- func (db *DB) CopyUpsert(ctx context.Context, table string, columns []string, src pgx.CopyFromSource, ...) (err error)
- func (db *DB) Exec(ctx context.Context, query string, args ...any) (_ int64, err error)
- func (db *DB) InTransaction() bool
- func (db *DB) IsRetryable() bool
- func (db *DB) MaxRetries() int
- func (db *DB) Ping() error
- func (db *DB) Prepare(ctx context.Context, query string) (*sql.Stmt, error)
- func (db *DB) Query(ctx context.Context, query string, args ...any) (_ *sql.Rows, err error)
- func (db *DB) QueryRow(ctx context.Context, query string, args ...any) *sql.Row
- func (db *DB) RunQuery(ctx context.Context, query string, f func(*sql.Rows) error, params ...any) error
- func (db *DB) RunQueryIncrementally(ctx context.Context, query string, batchSize int, f func(*sql.Rows) error, ...) (err error)
- func (db *DB) Transact(ctx context.Context, iso sql.IsolationLevel, txFunc func(*DB) error) (err error)
- func (db *DB) WithPGXConn(f func(conn *pgx.Conn) error) error
- type MultiErr
- type RowItem
Constants ¶
const OnConflictDoNothing = "ON CONFLICT DO NOTHING"
Variables ¶
var QueryLoggingDisabled bool
QueryLoggingDisabled stops logging of queries when true. For use in tests only: not concurrency-safe.
Functions ¶
func Collect1 ¶
Collect1 runs the query, which must select a single column that can be scanned into a value of type T, and returns a slice of the resulting values.
func CollectStructPtrs ¶
func CollectStructs ¶
CollectStructs scans the rows from the query into structs and returns a slice of them. Example:
type Player struct { Name string; Score int }
players, err := database.CollectStructs[Player](ctx, db, "SELECT name, score FROM players")
func ConnectAndExecute ¶
ConnectAndExecute connects to the postgres database specified by uri and executes dbFunc, then cleans up the database connection. It returns an error that Is derrors.NotFound if no connection could be made.
func CopyFromChan ¶
func CopyFromChan(c <-chan RowItem) pgx.CopyFromSource
CopyFromChan returns a CopyFromSource that gets its rows from a channel.
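As a rough illustration of the idea, the sketch below adapts a channel to the three methods that pgx.CopyFromSource requires (Next, Values, Err). It does not import pgx; the rowItem fields, the chanSource type and the drain helper are assumptions made for this example and are not taken from the package.

```go
package main

import "fmt"

// rowItem is a hypothetical stand-in for RowItem: one row of values,
// or an error that should abort the copy.
type rowItem struct {
	Values []any
	Err    error
}

// chanSource yields rows received from a channel. Its method set mirrors
// pgx.CopyFromSource: Next, Values and Err.
type chanSource struct {
	c    <-chan rowItem
	next rowItem
}

// Next blocks until a row arrives and reports false once the channel is closed.
func (s *chanSource) Next() bool {
	item, ok := <-s.c
	if !ok {
		return false
	}
	s.next = item
	return true
}

// Values returns the most recently received row, or its error.
func (s *chanSource) Values() ([]any, error) { return s.next.Values, s.next.Err }

// Err always returns nil in this sketch: row errors are reported by Values.
func (s *chanSource) Err() error { return nil }

// drain consumes a source the way a copy operation would.
func drain(src *chanSource) ([][]any, error) {
	var rows [][]any
	for src.Next() {
		vals, err := src.Values()
		if err != nil {
			return rows, err
		}
		rows = append(rows, vals)
	}
	return rows, src.Err()
}

func main() {
	c := make(chan rowItem, 2)
	c <- rowItem{Values: []any{"alice", 10}}
	c <- rowItem{Values: []any{"bob", 7}}
	close(c) // closing the channel ends the copy
	rows, err := drain(&chanSource{c: c})
	fmt.Println(len(rows), err)
}
```

The producer signals the end of input by closing the channel, so a goroutine can stream rows into a copy without building the whole slice first.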
func CreateDBIfNotExists ¶
CreateDBIfNotExists checks whether the given dbName is an existing database, and creates one if not.
func DBConnURI ¶
DBConnURI generates a postgres connection string in URI format. This is necessary as migrate expects a URI.
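For readers unfamiliar with the URI form of a postgres connection string, here is a minimal sketch built with net/url. The connURI function, its parameters and the sslmode=disable option are assumptions for illustration; how DBConnURI obtains its user, password, host and port is not described here.

```go
package main

import (
	"fmt"
	"net/url"
)

// connURI builds a postgres connection URI from its parts.
// The parameter list and the sslmode option are illustrative assumptions.
func connURI(user, password, host, port, dbName string) string {
	u := url.URL{
		Scheme:   "postgres",
		User:     url.UserPassword(user, password), // escapes special characters
		Host:     host + ":" + port,
		Path:     "/" + dbName,
		RawQuery: "sslmode=disable",
	}
	return u.String()
}

func main() {
	fmt.Println(connURI("postgres", "s3cret", "localhost", "5432", "mydb"))
}
```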
func NullIsEmpty ¶
NullIsEmpty returns a sql.Scanner that writes the empty string to s if the scanned column value is NULL.
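One way such a scanner could be written is sketched below. The emptyOnNull type and the nullIsEmpty constructor are hypothetical names, and delegating to sql.NullString is an assumption about the approach, not a description of the package's code.

```go
package main

import (
	"database/sql"
	"fmt"
)

// emptyOnNull is a hypothetical scanner that stores "" for NULL columns.
type emptyOnNull struct{ s *string }

// Compile-time check that emptyOnNull satisfies sql.Scanner.
var _ sql.Scanner = emptyOnNull{}

// nullIsEmpty wraps s so it can be passed to Scan.
func nullIsEmpty(s *string) emptyOnNull { return emptyOnNull{s} }

// Scan converts value with sql.NullString, then copies the result into s.
func (n emptyOnNull) Scan(value any) error {
	var ns sql.NullString
	if err := ns.Scan(value); err != nil {
		return err
	}
	*n.s = ns.String // ns.String is "" when the column was NULL
	return nil
}

func main() {
	s := "stale"
	if err := nullIsEmpty(&s).Scan(nil); err != nil { // nil models a NULL column
		fmt.Println("scan failed:", err)
		return
	}
	fmt.Printf("%q\n", s)
}
```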
func NullPtr ¶
func NullPtr(p any) nullPtr
NullPtr is for scanning nullable database columns into pointer variables or fields. When given a pointer to a pointer to some type T, it returns a value that can be passed to a Scan function. If the corresponding column is NULL, the variable will be set to nil. Otherwise, it will be set to a newly allocated pointer to the column value.
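The pointer-to-pointer behavior can be sketched with a small generic scanner. This is a simplified illustration, not the package's implementation: the nullPtr and newNullPtr names below are hypothetical, and the sketch uses a plain type assertion, so it only accepts values that already have type T and skips the conversions database/sql normally performs.

```go
package main

import "fmt"

// nullPtr is a hypothetical generic scanner holding a pointer to a *T.
type nullPtr[T any] struct{ pp **T }

// newNullPtr wraps pp so it can be passed to Scan.
func newNullPtr[T any](pp **T) nullPtr[T] { return nullPtr[T]{pp} }

// Scan sets *pp to nil for a NULL column, or to a freshly allocated copy
// of the value otherwise.
func (n nullPtr[T]) Scan(value any) error {
	if value == nil {
		*n.pp = nil
		return nil
	}
	v, ok := value.(T) // simplification: no type conversion is attempted
	if !ok {
		return fmt.Errorf("cannot scan %T into %T", value, *new(T))
	}
	*n.pp = &v
	return nil
}

func main() {
	var name *string
	if err := newNullPtr(&name).Scan("alice"); err != nil {
		fmt.Println("scan failed:", err)
		return
	}
	fmt.Println(*name)

	if err := newNullPtr(&name).Scan(nil); err != nil { // NULL column
		fmt.Println("scan failed:", err)
		return
	}
	fmt.Println(name == nil)
}
```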
func RegisterOCWrapper ¶
func RegisterOCWrapper(driverName string, opts ...ocsql.TraceOption) (string, error)
RegisterOCWrapper registers a driver that wraps the OpenCensus driver, which in turn wraps the driver named as the first argument.
func ResetDB ¶
ResetDB truncates all data from the given test DB. It should be called after every test that mutates the database.
func StructScanner ¶
StructScanner returns a function that, when called on a pointer to a struct of its type argument T, returns a slice of arguments suitable for Row.Scan or Rows.Scan. The call to either Scan will populate the exported fields of the struct in the order they appear in the type definition.
StructScanner panics if T is not a struct type. The function it returns will panic if its argument is not a pointer to a struct.
Example:
type Player struct { Name string; Score int }
playerScanArgs := database.StructScanner[Player]()
err := db.RunQuery(ctx, "SELECT name, score FROM players", func(rows *sql.Rows) error {
	var p Player
	if err := rows.Scan(playerScanArgs(&p)...); err != nil {
		return err
	}
	// use p
	return nil
})
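To make the mechanism concrete, the following sketch shows one way to produce Scan arguments from a struct's exported fields with reflection. The scanArgs function and the player type are hypothetical; the real StructScanner may handle more cases than this simplified version does.

```go
package main

import (
	"fmt"
	"reflect"
)

// scanArgs returns a function that maps a *T to pointers to T's exported
// fields, in declaration order. It panics if T is not a struct.
func scanArgs[T any]() func(p *T) []any {
	t := reflect.TypeOf((*T)(nil)).Elem()
	if t.Kind() != reflect.Struct {
		panic(fmt.Sprintf("%s is not a struct", t))
	}
	// Compute the exported field indexes once, not on every call.
	var indexes []int
	for i := 0; i < t.NumField(); i++ {
		if t.Field(i).IsExported() {
			indexes = append(indexes, i)
		}
	}
	return func(p *T) []any {
		v := reflect.ValueOf(p).Elem()
		args := make([]any, 0, len(indexes))
		for _, i := range indexes {
			args = append(args, v.Field(i).Addr().Interface())
		}
		return args
	}
}

type player struct {
	Name  string
	Score int
	note  string // unexported, so it is skipped
}

func main() {
	args := scanArgs[player]()
	var p player
	ptrs := args(&p)
	// Writing through the pointers is what Rows.Scan would do.
	*ptrs[0].(*string) = "alice"
	*ptrs[1].(*int) = 10
	fmt.Println(len(ptrs), p.Name, p.Score)
}
```

Doing the reflection once per type and reusing the returned function keeps the per-row cost down to taking field addresses.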
func TryToMigrate ¶
TryToMigrate attempts to migrate the database named dbName to the latest migration. If this operation fails in the migration step, it returns isMigrationError=true to signal that the database should be recreated.
Types ¶
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
DB wraps a sql.DB. The methods it exports correspond closely to those of sql.DB. They enhance the original by requiring a context argument, and by logging the query and any resulting errors.
A DB may represent a transaction. If so, its execution and query methods operate within the transaction.
func (*DB) BulkInsert ¶
func (db *DB) BulkInsert(ctx context.Context, table string, columns []string, values []any, conflictAction string) (err error)
BulkInsert constructs and executes a multi-value insert statement. The query is constructed using the format:
INSERT INTO <table> (<columns>) VALUES (<placeholders-for-each-item-in-values>)
If conflictAction is not empty, it is appended to the statement.
The query is executed using a PREPARE statement with the provided values.
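The statement format above can be sketched as a small query builder. The buildInsertQuery function is a hypothetical helper written for this example; it only produces the SQL text with numbered placeholders and does not execute anything.

```go
package main

import (
	"fmt"
	"strings"
)

// buildInsertQuery returns a multi-value INSERT with one placeholder per
// value. nvalues must be a positive multiple of len(columns).
func buildInsertQuery(table string, columns []string, nvalues int, conflictAction string) (string, error) {
	if len(columns) == 0 || nvalues == 0 || nvalues%len(columns) != 0 {
		return "", fmt.Errorf("%d values cannot fill rows of %d columns", nvalues, len(columns))
	}
	rows := make([]string, nvalues/len(columns))
	n := 1 // next placeholder number
	for r := range rows {
		ph := make([]string, len(columns))
		for c := range ph {
			ph[c] = fmt.Sprintf("$%d", n)
			n++
		}
		rows[r] = "(" + strings.Join(ph, ", ") + ")"
	}
	q := fmt.Sprintf("INSERT INTO %s (%s) VALUES %s",
		table, strings.Join(columns, ", "), strings.Join(rows, ", "))
	if conflictAction != "" {
		q += " " + conflictAction
	}
	return q, nil
}

func main() {
	q, err := buildInsertQuery("players", []string{"name", "score"}, 4, "ON CONFLICT DO NOTHING")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(q)
}
```

With two columns and four interleaved values, the builder emits two row groups: ($1, $2), ($3, $4).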
func (*DB) BulkInsertReturning ¶
func (db *DB) BulkInsertReturning(ctx context.Context, table string, columns []string, values []any, conflictAction string, returningColumns []string, scanFunc func(*sql.Rows) error) (err error)
BulkInsertReturning is like BulkInsert, but supports returning values from the INSERT statement. In addition to the arguments of BulkInsert, it takes a list of columns to return and a function to scan those columns. To get the returned values, provide a function that scans them as if they were the selected columns of a query. See TestBulkInsert for an example.
func (*DB) BulkUpdate ¶
func (db *DB) BulkUpdate(ctx context.Context, table string, columns, types []string, values [][]any) (err error)
BulkUpdate executes multiple UPDATE statements in a transaction.
Columns must contain the names of some of table's columns. The first is treated as a key; that is, the values to update are matched with existing rows by comparing the values of the first column.
Types holds the database type of each column. For example,
[]string{"INT", "TEXT"}
Values contains one slice of values per column. (Note that this is unlike BulkInsert, which takes a single slice of interleaved values.)
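The difference between the two layouts is easy to get wrong, so here is a small sketch that converts interleaved values (the BulkInsert layout) into one slice per column (the BulkUpdate layout). The columnar helper is hypothetical and is not part of the package.

```go
package main

import "fmt"

// columnar regroups row-major, interleaved values into one slice per column.
func columnar(ncols int, interleaved []any) ([][]any, error) {
	if ncols <= 0 || len(interleaved)%ncols != 0 {
		return nil, fmt.Errorf("%d values cannot fill rows of %d columns", len(interleaved), ncols)
	}
	cols := make([][]any, ncols)
	for i, v := range interleaved {
		cols[i%ncols] = append(cols[i%ncols], v)
	}
	return cols, nil
}

func main() {
	// Two rows of (id, name), interleaved as BulkInsert expects them.
	interleaved := []any{1, "alice", 2, "bob"}
	cols, err := columnar(2, interleaved)
	if err != nil {
		fmt.Println(err)
		return
	}
	// cols[0] holds the key column, cols[1] the column to update.
	fmt.Println(cols[0], cols[1])
}
```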
func (*DB) BulkUpsert ¶
func (db *DB) BulkUpsert(ctx context.Context, table string, columns []string, values []any, conflictColumns []string) error
BulkUpsert is like BulkInsert, but instead of a conflict action, a list of conflicting columns is provided. An "ON CONFLICT (conflict_columns) DO UPDATE" clause is added to the statement, with assignments "c=excluded.c" for every column c.
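The conflict clause described above could be assembled as follows. The upsertClause function is a hypothetical helper for illustration; it shows only the text of the clause, with one "c=excluded.c" assignment per column.

```go
package main

import (
	"fmt"
	"strings"
)

// upsertClause builds an ON CONFLICT ... DO UPDATE clause that overwrites
// every listed column with the value from the row that failed to insert.
func upsertClause(columns, conflictColumns []string) string {
	sets := make([]string, len(columns))
	for i, c := range columns {
		sets[i] = fmt.Sprintf("%s=excluded.%s", c, c)
	}
	return fmt.Sprintf("ON CONFLICT (%s) DO UPDATE SET %s",
		strings.Join(conflictColumns, ", "), strings.Join(sets, ", "))
}

func main() {
	fmt.Println(upsertClause([]string{"name", "score"}, []string{"name"}))
}
```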
func (*DB) BulkUpsertReturning ¶
func (db *DB) BulkUpsertReturning(ctx context.Context, table string, columns []string, values []any, conflictColumns, returningColumns []string, scanFunc func(*sql.Rows) error) error
BulkUpsertReturning is like BulkInsertReturning, but performs an upsert like BulkUpsert.
func (*DB) CopyInsert ¶
func (db *DB) CopyInsert(ctx context.Context, table string, columns []string, src pgx.CopyFromSource, dropColumn string) (err error)
CopyInsert inserts rows into table using the pgx driver's CopyFrom method. It returns an error if the underlying driver is not pgx. columns is the list of columns to insert. src is the source of the rows to insert. If dropColumn is non-empty, that column will be dropped from the temporary table before copying. Use dropColumn for generated ID columns.
CopyInsert works by first creating a temporary table, populating it with CopyFrom, and then running an INSERT...SELECT... to insert its rows into the original table.
func (*DB) CopyUpsert ¶
func (db *DB) CopyUpsert(ctx context.Context, table string, columns []string, src pgx.CopyFromSource, conflictColumns []string, dropColumn string) (err error)
CopyUpsert upserts rows into table using the pgx driver's CopyFrom method. It returns an error if the underlying driver is not pgx. columns is the list of columns to upsert. src is the source of the rows to upsert. conflictColumns are the columns that might conflict (i.e. that have a UNIQUE constraint). If dropColumn is non-empty, that column will be dropped from the temporary table before copying. Use dropColumn for generated ID columns.
CopyUpsert works by first creating a temporary table, populating it with CopyFrom, and then running an INSERT...SELECT...ON CONFLICT to upsert its rows into the original table.
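The temporary-table flow can be sketched as the SQL that might run before and after the CopyFrom step. Everything here is an assumption for illustration: the copyStatements helper, the temporary table name, and the exact statement text are not taken from the package.

```go
package main

import (
	"fmt"
	"strings"
)

// copyStatements returns the statements to run before the copy (create the
// temporary table, optionally drop a generated column) and the statement to
// run after it (move the rows into the real table).
func copyStatements(table string, columns []string, dropColumn, conflictClause string) (before []string, after string) {
	temp := "tmp_" + table + "_copy" // hypothetical naming scheme
	before = append(before,
		fmt.Sprintf("CREATE TEMPORARY TABLE %s (LIKE %s) ON COMMIT DROP", temp, table))
	if dropColumn != "" {
		before = append(before,
			fmt.Sprintf("ALTER TABLE %s DROP COLUMN %s", temp, dropColumn))
	}
	cols := strings.Join(columns, ", ")
	after = fmt.Sprintf("INSERT INTO %s (%s) SELECT %s FROM %s", table, cols, cols, temp)
	if conflictClause != "" {
		after += " " + conflictClause // empty for a plain insert
	}
	return before, after
}

func main() {
	before, after := copyStatements("players", []string{"name", "score"}, "id",
		"ON CONFLICT (name) DO UPDATE SET score=excluded.score")
	for _, s := range before {
		fmt.Println(s)
	}
	fmt.Println("-- rows are copied into the temporary table here --")
	fmt.Println(after)
}
```

Copying into a temporary table first lets the fast copy path be combined with conflict handling, which a direct copy into the target table would not provide.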
func (*DB) InTransaction ¶
func (*DB) IsRetryable ¶
func (*DB) MaxRetries ¶
MaxRetries returns the maximum number of times that a serializable transaction was retried.
func (*DB) RunQuery ¶
func (db *DB) RunQuery(ctx context.Context, query string, f func(*sql.Rows) error, params ...any) error
RunQuery executes query, then calls f on each row. It stops when there are no more rows or f returns a non-nil error.
func (*DB) RunQueryIncrementally ¶
func (db *DB) RunQueryIncrementally(ctx context.Context, query string, batchSize int, f func(*sql.Rows) error, params ...any) (err error)
RunQueryIncrementally executes query, then calls f on each row. It fetches rows in groups of size batchSize. It stops when there are no more rows, or when f returns io.EOF.
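The stopping rule can be illustrated without a database. In the sketch below an in-memory slice stands in for the result set, and the forEachInBatches and collectUntil helpers are hypothetical; the point is that io.EOF from the callback ends iteration without being reported as an error, while any other error is returned.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// forEachInBatches calls f on each item, taking items batchSize at a time.
// It returns the number of batches fetched. io.EOF from f stops iteration
// cleanly; any other error is returned to the caller.
func forEachInBatches(items []int, batchSize int, f func(int) error) (batches int, err error) {
	if batchSize <= 0 {
		return 0, errors.New("batchSize must be positive")
	}
	for start := 0; start < len(items); start += batchSize {
		end := start + batchSize
		if end > len(items) {
			end = len(items)
		}
		batches++ // models one fetch of up to batchSize rows
		for _, it := range items[start:end] {
			if err := f(it); errors.Is(err, io.EOF) {
				return batches, nil
			} else if err != nil {
				return batches, err
			}
		}
	}
	return batches, nil
}

// collectUntil returns a callback that records items and asks to stop,
// by returning io.EOF, once it has seen limit.
func collectUntil(limit int, seen *[]int) func(int) error {
	return func(n int) error {
		*seen = append(*seen, n)
		if n == limit {
			return io.EOF
		}
		return nil
	}
}

func main() {
	var seen []int
	batches, err := forEachInBatches([]int{1, 2, 3, 4, 5}, 2, collectUntil(3, &seen))
	fmt.Println(seen, batches, err)
}
```

Stopping early this way means later batches are never fetched, which is the benefit of reading incrementally.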
func (*DB) Transact ¶
func (db *DB) Transact(ctx context.Context, iso sql.IsolationLevel, txFunc func(*DB) error) (err error)
Transact executes the given function in the context of a SQL transaction at the given isolation level, rolling back the transaction if the function panics or returns an error.
The given function is called with a DB that is associated with a transaction. The DB should be used only inside the function; if it is used to access the database after the function returns, the calls will return errors.
If the isolation level requires it, Transact will retry the transaction upon serialization failure, so txFunc may be called more than once.
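Because txFunc may run more than once, it should avoid side effects outside the transaction. The retry behavior can be sketched as follows; the errSerialization sentinel, the transactWithRetry helper and the attempt limit are assumptions for this example (a real implementation would begin and roll back a transaction on each attempt and inspect the driver's error, for instance PostgreSQL's SQLSTATE 40001).

```go
package main

import (
	"errors"
	"fmt"
)

// errSerialization is a hypothetical sentinel standing in for the driver's
// serialization-failure error.
var errSerialization = errors.New("serialization failure")

// transactWithRetry runs txFunc until it succeeds, fails with an error
// other than a serialization failure, or maxAttempts is reached.
// It returns the number of attempts made and the final error.
func transactWithRetry(maxAttempts int, txFunc func() error) (attempts int, err error) {
	for attempts = 1; attempts <= maxAttempts; attempts++ {
		// A real implementation would begin a transaction here and roll
		// it back when txFunc fails.
		err = txFunc()
		if !errors.Is(err, errSerialization) {
			return attempts, err // success or a non-retryable error
		}
	}
	return maxAttempts, err
}

func main() {
	calls := 0
	attempts, err := transactWithRetry(5, func() error {
		calls++
		if calls < 3 {
			return errSerialization // first two attempts conflict
		}
		return nil
	})
	fmt.Println(attempts, err)
}
```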