pg

package
v2.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2024 License: MIT Imports: 26 Imported by: 1

Documentation

Index

Constants

View Source
const BatchSize uint = 1000

BatchSize is the default number of DB records to access in one batch

View Source
const (
	// DefaultQueryTimeout is a reasonable upper bound for how long a SQL query should take.
	// The configured value should be used instead of this if possible.
	DefaultQueryTimeout = 10 * time.Second
)

Variables

This section is empty.

Functions

func Batch

func Batch(cb BatchFunc) error

Batch is an iterator for batches of records

func NewConnection

func NewConnection(uri string, dialect dialects.DialectName, config ConnectionConfig) (db *sqlx.DB, err error)

func OpenUnlockedDB

func OpenUnlockedDB(appID uuid.UUID, cfg LockedDBConfig) (db *sqlx.DB, err error)

OpenUnlockedDB just opens DB connection, without any DB locks. This should be used carefully, when we know we don't need any locks. Currently this is used by RebroadcastTransactions command only.

func PrepareQueryRowx

func PrepareQueryRowx(q Queryer, sql string, dest interface{}, arg interface{}) error

func SqlTransaction

func SqlTransaction(ctx context.Context, rdb *sql.DB, lggr logger.Logger, fn func(tx *sqlx.Tx) error, opts ...TxOption) (err error)

func SqlxTransaction

func SqlxTransaction(ctx context.Context, q Queryer, lggr logger.Logger, fc func(q Queryer) error, txOpts ...TxOption) (err error)

func WrapDbWithSqlx

func WrapDbWithSqlx(rdb *sql.DB) *sqlx.DB

Types

type BatchFunc

type BatchFunc func(offset, limit uint) (count uint, err error)

BatchFunc is the function to execute on each batch of records, should return the count of records affected

type ConnectionConfig

type ConnectionConfig interface {
	DefaultIdleInTxSessionTimeout() time.Duration
	DefaultLockTimeout() time.Duration
	MaxOpenConns() int
	MaxIdleConns() int
}

type LeaseLock

type LeaseLock interface {
	TakeAndHold(ctx context.Context) error
	ClientID() uuid.UUID
	Release()
}

LeaseLock handles taking an exclusive lease on database access. This is not enforced by any database primitives, but rather voluntarily respected by other instances of the Plugin application.

Plugin is designed to run as a single instance. Running multiple instances of Plugin on a single database at the same time is not supported and likely to lead to strange errors and possibly even data integrity failures.

With that being said, a common use case is to run multiple Plugin instances in failover mode. The first instance will take some kind of lock on the database and subsequent instances will wait trying to take this lock in case the first instance disappears or dies.

Traditionally Plugin has used an advisory lock to manage this. However, advisory locks come with several problems, notably: - Postgres does not really like it when you hold locks open for a very long time (hours/days). It hampers certain internal cleanup tasks and is explicitly discouraged by the postgres maintainers - The advisory lock can silently disappear on postgres upgrade - Advisory locks do not play nicely with pooling tools such as pgbouncer - If the application crashes, the advisory lock can be left hanging around for a while (sometimes hours) and can require manual intervention to remove it

For this reason, we now use a database leaseLock instead, which works as such: - Have one row in a database which is updated periodically with the client ID - CL node A will run a background process on start that updates this e.g. once per second - CL node B will spinlock, checking periodically to see if the update got too old. If it goes more than, say, 5s without updating, it assumes that node A is dead and takes over. Now CL node B is the owner of the row and it updates this every second - If CL node A comes back somehow, it will go to take out a lease and realise that the database has been leased to another process, so it will panic and quit immediately

func NewLeaseLock

func NewLeaseLock(db *sqlx.DB, appID uuid.UUID, lggr logger.Logger, cfg LeaseLockConfig) LeaseLock

NewLeaseLock creates a "leaseLock" - an entity that tries to take an exclusive lease on the database

type LeaseLockConfig

type LeaseLockConfig struct {
	DefaultQueryTimeout  time.Duration
	LeaseDuration        time.Duration
	LeaseRefreshInterval time.Duration
}

type Limit

type Limit int

Limit is a helper driver.Valuer for LIMIT queries which uses nil/NULL for negative values.

func (Limit) String

func (l Limit) String() string

func (Limit) Value

func (l Limit) Value() (driver.Value, error)

type LockedDB

type LockedDB interface {
	Open(ctx context.Context) error
	Close() error
	DB() *sqlx.DB
}

LockedDB bounds DB connection and DB locks.

func NewLockedDB

func NewLockedDB(appID uuid.UUID, cfg LockedDBConfig, lockCfg config.Lock, lggr logger.Logger) LockedDB

NewLockedDB creates a new instance of LockedDB.

type LockedDBConfig

type LockedDBConfig interface {
	ConnectionConfig
	URL() url.URL
	DefaultQueryTimeout() time.Duration
	Dialect() dialects.DialectName
}

type Q

type Q struct {
	Queryer
	ParentCtx context.Context

	QueryTimeout time.Duration
	// contains filtered or unexported fields
}

Q wraps an underlying queryer (either a *sqlx.DB or a *sqlx.Tx)

It is designed to make handling *sqlx.Tx or *sqlx.DB a little bit safer by preventing footguns such as having no deadline on contexts.

It also handles nesting transactions.

It automatically adds the default context deadline to all non-context queries (if you _really_ want to issue a query without a context, use the underlying Queryer)

This is not the prettiest construct but without macros its about the best we can do.

func NewQ

func NewQ(db *sqlx.DB, lggr logger.Logger, config QConfig, qopts ...QOpt) (q Q)

func (Q) Context

func (q Q) Context() (context.Context, context.CancelFunc)

func (Q) ExecQ

func (q Q) ExecQ(query string, args ...interface{}) error

func (Q) ExecQIter

func (q Q) ExecQIter(query string, args ...interface{}) (sql.Result, context.CancelFunc, error)

CAUTION: A subtle problem lurks here, because the following code is buggy:

ctx, cancel := context.WithCancel(context.Background())
rows, err := db.QueryContext(ctx, "SELECT foo")
cancel() // canceling here "poisons" the scan below
for rows.Next() {
  rows.Scan(...)
}

We must cancel the context only after we have completely finished using the returned rows or result from the query/exec

For this reasons, the following functions return a context.CancelFunc and it is up to the caller to ensure that cancel is called after it has finished

Generally speaking, it makes more sense to use Get/Select in most cases, which avoids this problem

func (Q) ExecQNamed

func (q Q) ExecQNamed(query string, arg interface{}) (err error)

func (Q) Get

func (q Q) Get(dest interface{}, query string, args ...interface{}) error

func (Q) GetNamed

func (q Q) GetNamed(sql string, dest interface{}, arg interface{}) error

func (Q) Select

func (q Q) Select(dest interface{}, query string, args ...interface{}) error

Select and Get are safe to wrap the context cancellation because the rows are entirely consumed within the call

func (Q) SelectNamed

func (q Q) SelectNamed(dest interface{}, query string, arg interface{}) error

func (Q) Transaction

func (q Q) Transaction(fc func(q Queryer) error, txOpts ...TxOption) error

func (Q) WithOpts

func (q Q) WithOpts(qopts ...QOpt) Q

type QConfig

type QConfig interface {
	LogSQL() bool
	DefaultQueryTimeout() time.Duration
}

func NewQConfig

func NewQConfig(logSQL bool) QConfig

type QOpt

type QOpt func(*Q)

QOpt pattern for ORM methods aims to clarify usage and remove some common footguns, notably:

1. It should be easy and obvious how to pass a parent context or a transaction into an ORM method 2. Simple queries should not be cluttered 3. It should have compile-time safety and be explicit 4. It should enforce some sort of context deadline on all queries by default 5. It should optimise for clarity and readability 6. It should mandate using sqlx everywhere, gorm is forbidden in new code 7. It should make using sqlx a little more convenient by wrapping certain methods 8. It allows easier mocking of DB calls (Queryer is an interface)

The two main concepts introduced are:

A `Q` struct that wraps a `sqlx.DB` or `sqlx.Tx` and implements the `pg.Queryer` interface.

This struct is initialised with `QOpts` which define how the queryer should behave. `QOpts` can define a parent context, an open transaction or other options to configure the Queryer.

A sample ORM method looks like this:

func (o *orm) GetFoo(id int64, qopts ...pg.QOpt) (Foo, error) {
	q := pg.NewQ(q, qopts...)
	return q.Exec(...)
}

Now you can call it like so:

orm.GetFoo(1) // will automatically have default query timeout context set
orm.GetFoo(1, pg.WithParentCtx(ctx)) // will wrap the supplied parent context with the default query context
orm.GetFoo(1, pg.WithQueryer(tx)) // allows to pass in a running transaction or anything else that implements Queryer
orm.GetFoo(q, pg.WithQueryer(tx), pg.WithParentCtx(ctx)) // options can be combined

func WithLongQueryTimeout

func WithLongQueryTimeout() QOpt

WithLongQueryTimeout prevents the usage of the `DefaultQueryTimeout` duration and uses `OneMinuteQueryTimeout` instead Some queries need to take longer when operating over big chunks of data, like deleting jobs, but we need to keep some upper bound timeout

func WithParentCtx

func WithParentCtx(ctx context.Context) QOpt

WithParentCtx sets or overwrites the parent ctx

func WithParentCtxInheritTimeout

func WithParentCtxInheritTimeout(ctx context.Context) QOpt

If the parent has a timeout, just use that instead of DefaultTimeout

func WithQueryer

func WithQueryer(queryer Queryer) QOpt

WithQueryer sets the queryer

type Queryer

type Queryer interface {
	sqlx.Ext
	sqlx.ExtContext
	sqlx.Preparer
	sqlx.PreparerContext
	sqlx.Queryer
	Select(dest interface{}, query string, args ...interface{}) error
	SelectContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
	PrepareNamed(query string) (*sqlx.NamedStmt, error)
	QueryRow(query string, args ...interface{}) *sql.Row
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
	Get(dest interface{}, query string, args ...interface{}) error
	GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
	NamedExec(query string, arg interface{}) (sql.Result, error)
	NamedQuery(query string, arg interface{}) (*sqlx.Rows, error)
}

type ReportFn

type ReportFn func(sql.DBStats)

type StatFn

type StatFn func() sql.DBStats

type StatsReporter

type StatsReporter struct {
	// contains filtered or unexported fields
}

func NewStatsReporter

func NewStatsReporter(fn StatFn, lggr logger.Logger, opts ...StatsReporterOpt) *StatsReporter

func (*StatsReporter) Start

func (r *StatsReporter) Start(ctx context.Context)

func (*StatsReporter) Stop

func (r *StatsReporter) Stop()

Stop stops all resources owned by the reporter and waits for all of them to be done

type StatsReporterOpt

type StatsReporterOpt func(*StatsReporter)

func StatsCustomReporterFn

func StatsCustomReporterFn(fn ReportFn) StatsReporterOpt

func StatsInterval

func StatsInterval(d time.Duration) StatsReporterOpt

type TxOption

type TxOption func(*sql.TxOptions)

TxOption is a functional option for SQL transactions.

func OptReadOnlyTx

func OptReadOnlyTx() TxOption

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL