Documentation ¶
Index ¶
- Constants
- func Batch(cb BatchFunc) error
- func NewConnection(uri string, dialect dialects.DialectName, config ConnectionConfig) (db *sqlx.DB, err error)
- func OpenUnlockedDB(appID uuid.UUID, cfg LockedDBConfig) (db *sqlx.DB, err error)
- func PrepareQueryRowx(q Queryer, sql string, dest interface{}, arg interface{}) error
- func SqlTransaction(ctx context.Context, rdb *sql.DB, lggr logger.Logger, ...) (err error)
- func SqlxTransaction(ctx context.Context, q Queryer, lggr logger.Logger, fc func(q Queryer) error, ...) (err error)
- func WrapDbWithSqlx(rdb *sql.DB) *sqlx.DB
- type BatchFunc
- type ConnectionConfig
- type LeaseLock
- type LeaseLockConfig
- type Limit
- type LockedDB
- type LockedDBConfig
- type Q
- func (q Q) Context() (context.Context, context.CancelFunc)
- func (q Q) ExecQ(query string, args ...interface{}) error
- func (q Q) ExecQIter(query string, args ...interface{}) (sql.Result, context.CancelFunc, error)
- func (q Q) ExecQNamed(query string, arg interface{}) (err error)
- func (q Q) Get(dest interface{}, query string, args ...interface{}) error
- func (q Q) GetNamed(sql string, dest interface{}, arg interface{}) error
- func (q Q) Select(dest interface{}, query string, args ...interface{}) error
- func (q Q) SelectNamed(dest interface{}, query string, arg interface{}) error
- func (q Q) Transaction(fc func(q Queryer) error, txOpts ...TxOption) error
- func (q Q) WithOpts(qopts ...QOpt) Q
- type QConfig
- type QOpt
- type Queryer
- type ReportFn
- type StatFn
- type StatsReporter
- type StatsReporterOpt
- type TxOption
Constants ¶
const BatchSize uint = 1000
BatchSize is the default number of DB records to access in one batch
const ( // DefaultQueryTimeout is a reasonable upper bound for how long a SQL query should take. // The configured value should be used instead of this if possible. DefaultQueryTimeout = 10 * time.Second )
Variables ¶
This section is empty.
Functions ¶
func NewConnection ¶
func NewConnection(uri string, dialect dialects.DialectName, config ConnectionConfig) (db *sqlx.DB, err error)
func OpenUnlockedDB ¶
OpenUnlockedDB just opens DB connection, without any DB locks. This should be used carefully, when we know we don't need any locks. Currently this is used by RebroadcastTransactions command only.
func PrepareQueryRowx ¶
func SqlTransaction ¶
func SqlxTransaction ¶
Types ¶
type BatchFunc ¶
BatchFunc is the function to execute on each batch of records, should return the count of records affected
type ConnectionConfig ¶
type LeaseLock ¶
LeaseLock handles taking an exclusive lease on database access. This is not enforced by any database primitives, but rather voluntarily respected by other instances of the Plugin application.
Plugin is designed to run as a single instance. Running multiple instances of Plugin on a single database at the same time is not supported and likely to lead to strange errors and possibly even data integrity failures.
With that being said, a common use case is to run multiple Plugin instances in failover mode. The first instance will take some kind of lock on the database and subsequent instances will wait trying to take this lock in case the first instance disappears or dies.
Traditionally Plugin has used an advisory lock to manage this. However, advisory locks come with several problems, notably: - Postgres does not really like it when you hold locks open for a very long time (hours/days). It hampers certain internal cleanup tasks and is explicitly discouraged by the postgres maintainers - The advisory lock can silently disappear on postgres upgrade - Advisory locks do not play nicely with pooling tools such as pgbouncer - If the application crashes, the advisory lock can be left hanging around for a while (sometimes hours) and can require manual intervention to remove it
For this reason, we now use a database leaseLock instead, which works as such: - Have one row in a database which is updated periodically with the client ID - CL node A will run a background process on start that updates this e.g. once per second - CL node B will spinlock, checking periodically to see if the update got too old. If it goes more than, say, 5s without updating, it assumes that node A is dead and takes over. Now CL node B is the owner of the row and it updates this every second - If CL node A comes back somehow, it will go to take out a lease and realise that the database has been leased to another process, so it will panic and quit immediately
func NewLeaseLock ¶
NewLeaseLock creates a "leaseLock" - an entity that tries to take an exclusive lease on the database
type LeaseLockConfig ¶
type Limit ¶
type Limit int
Limit is a helper driver.Valuer for LIMIT queries which uses nil/NULL for negative values.
type LockedDB ¶
LockedDB bounds DB connection and DB locks.
func NewLockedDB ¶
func NewLockedDB(appID uuid.UUID, cfg LockedDBConfig, lockCfg config.Lock, lggr logger.Logger) LockedDB
NewLockedDB creates a new instance of LockedDB.
type LockedDBConfig ¶
type LockedDBConfig interface { ConnectionConfig URL() url.URL DefaultQueryTimeout() time.Duration Dialect() dialects.DialectName }
type Q ¶
type Q struct { Queryer ParentCtx context.Context QueryTimeout time.Duration // contains filtered or unexported fields }
Q wraps an underlying queryer (either a *sqlx.DB or a *sqlx.Tx)
It is designed to make handling *sqlx.Tx or *sqlx.DB a little bit safer by preventing footguns such as having no deadline on contexts.
It also handles nesting transactions.
It automatically adds the default context deadline to all non-context queries (if you _really_ want to issue a query without a context, use the underlying Queryer)
This is not the prettiest construct but without macros its about the best we can do.
func (Q) ExecQIter ¶
CAUTION: A subtle problem lurks here, because the following code is buggy:
ctx, cancel := context.WithCancel(context.Background()) rows, err := db.QueryContext(ctx, "SELECT foo") cancel() // canceling here "poisons" the scan below for rows.Next() { rows.Scan(...) }
We must cancel the context only after we have completely finished using the returned rows or result from the query/exec
For this reasons, the following functions return a context.CancelFunc and it is up to the caller to ensure that cancel is called after it has finished
Generally speaking, it makes more sense to use Get/Select in most cases, which avoids this problem
func (Q) ExecQNamed ¶
func (Q) Select ¶
Select and Get are safe to wrap the context cancellation because the rows are entirely consumed within the call
func (Q) SelectNamed ¶
type QConfig ¶
func NewQConfig ¶
type QOpt ¶
type QOpt func(*Q)
QOpt pattern for ORM methods aims to clarify usage and remove some common footguns, notably:
1. It should be easy and obvious how to pass a parent context or a transaction into an ORM method 2. Simple queries should not be cluttered 3. It should have compile-time safety and be explicit 4. It should enforce some sort of context deadline on all queries by default 5. It should optimise for clarity and readability 6. It should mandate using sqlx everywhere, gorm is forbidden in new code 7. It should make using sqlx a little more convenient by wrapping certain methods 8. It allows easier mocking of DB calls (Queryer is an interface)
The two main concepts introduced are:
A `Q` struct that wraps a `sqlx.DB` or `sqlx.Tx` and implements the `pg.Queryer` interface.
This struct is initialised with `QOpts` which define how the queryer should behave. `QOpts` can define a parent context, an open transaction or other options to configure the Queryer.
A sample ORM method looks like this:
func (o *orm) GetFoo(id int64, qopts ...pg.QOpt) (Foo, error) { q := pg.NewQ(q, qopts...) return q.Exec(...) }
Now you can call it like so:
orm.GetFoo(1) // will automatically have default query timeout context set orm.GetFoo(1, pg.WithParentCtx(ctx)) // will wrap the supplied parent context with the default query context orm.GetFoo(1, pg.WithQueryer(tx)) // allows to pass in a running transaction or anything else that implements Queryer orm.GetFoo(q, pg.WithQueryer(tx), pg.WithParentCtx(ctx)) // options can be combined
func WithLongQueryTimeout ¶
func WithLongQueryTimeout() QOpt
WithLongQueryTimeout prevents the usage of the `DefaultQueryTimeout` duration and uses `OneMinuteQueryTimeout` instead Some queries need to take longer when operating over big chunks of data, like deleting jobs, but we need to keep some upper bound timeout
func WithParentCtx ¶
WithParentCtx sets or overwrites the parent ctx
func WithParentCtxInheritTimeout ¶
If the parent has a timeout, just use that instead of DefaultTimeout
type Queryer ¶
type Queryer interface { sqlx.Ext sqlx.ExtContext sqlx.Preparer sqlx.PreparerContext sqlx.Queryer Select(dest interface{}, query string, args ...interface{}) error SelectContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error PrepareNamed(query string) (*sqlx.NamedStmt, error) QueryRow(query string, args ...interface{}) *sql.Row QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row Get(dest interface{}, query string, args ...interface{}) error GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error NamedExec(query string, arg interface{}) (sql.Result, error) NamedQuery(query string, arg interface{}) (*sqlx.Rows, error) }
type StatsReporter ¶
type StatsReporter struct {
// contains filtered or unexported fields
}
func NewStatsReporter ¶
func NewStatsReporter(fn StatFn, lggr logger.Logger, opts ...StatsReporterOpt) *StatsReporter
func (*StatsReporter) Start ¶
func (r *StatsReporter) Start(ctx context.Context)
func (*StatsReporter) Stop ¶
func (r *StatsReporter) Stop()
Stop stops all resources owned by the reporter and waits for all of them to be done
type StatsReporterOpt ¶
type StatsReporterOpt func(*StatsReporter)
func StatsCustomReporterFn ¶
func StatsCustomReporterFn(fn ReportFn) StatsReporterOpt
func StatsInterval ¶
func StatsInterval(d time.Duration) StatsReporterOpt