pg

package
v1.13.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2023 License: MIT Imports: 27 Imported by: 1

Documentation

Index

Constants

View Source
const BatchSize uint = 1000

BatchSize is the default number of DB records to access in one batch

View Source
const (
	ChannelInsertOnEthTx = "insert_on_eth_txes"
)

Postgres channel to listen for new eth_txes

View Source
const (
	// NOTE: This is the default level in Postgres anyway, we just make it
	// explicit here
	DefaultIsolation = sql.LevelReadCommitted
)

NOTE: In an ideal world the timeouts below would be set to something sane in the postgres configuration by the user. Since we do not live in an ideal world, it is necessary to override them here.

They cannot easily be set at a session level due to how Go's connection pooling works.

Variables

View Source
var (
	// DefaultQueryTimeout is a reasonable upper bound for how long a SQL query should take
	DefaultQueryTimeout = 10 * time.Second

	// DefaultLockTimeout controls the max time we will wait for any kind of database lock.
	// It's good to set this to _something_ because waiting for locks forever is really bad.
	DefaultLockTimeout = 15 * time.Second
	// DefaultIdleInTxSessionTimeout controls the max time we leave a transaction open and idle.
	// It's good to set this to _something_ because leaving transactions open forever is really bad.
	DefaultIdleInTxSessionTimeout = 1 * time.Hour
)

unexport and make constant after legacy config is removed https://app.shortcut.com/chainlinklabs/story/33622/remove-legacy-config

Functions

func Batch

func Batch(cb BatchFunc) error

Batch is an iterator for batches of records

func NewConnection

func NewConnection(uri string, dialect dialects.DialectName, config ConnectionConfig) (db *sqlx.DB, err error)

func NewEventBroadcaster

func NewEventBroadcaster(uri url.URL, minReconnectInterval time.Duration, maxReconnectDuration time.Duration, lggr logger.Logger, appID uuid.UUID) *eventBroadcaster

func OpenUnlockedDB added in v1.2.0

func OpenUnlockedDB(cfg LockedDBConfig) (db *sqlx.DB, err error)

OpenUnlockedDB just opens DB connection, without any DB locks. This should be used carefully, when we know we don't need any locks. Currently this is used by RebroadcastTransactions command only.

func PrepareQueryRowx

func PrepareQueryRowx(q Queryer, sql string, dest interface{}, arg interface{}) error

func SqlTransaction

func SqlTransaction(ctx context.Context, rdb *sql.DB, lggr logger.Logger, fn func(tx *sqlx.Tx) error, optss ...TxOptions) (err error)

func SqlxTransaction

func SqlxTransaction(ctx context.Context, q Queryer, lggr logger.Logger, fc func(q Queryer) error, txOpts ...TxOptions) (err error)

func WithLongQueryTimeout added in v1.3.0

func WithLongQueryTimeout() func(q *Q)

WithLongQueryTimeout prevents the usage of the `DefaultQueryTimeout` duration and uses `OneMinuteQueryTimeout` instead Some queries need to take longer when operating over big chunks of data, like deleting jobs, but we need to keep some upper bound timeout

func WithParentCtx

func WithParentCtx(ctx context.Context) func(q *Q)

WithParentCtx sets or overwrites the parent ctx

func WithParentCtxInheritTimeout added in v1.9.0

func WithParentCtxInheritTimeout(ctx context.Context) func(q *Q)

If the parent has a timeout, just use that instead of DefaultTimeout

func WithQueryer

func WithQueryer(queryer Queryer) func(q *Q)

WithQueryer sets the queryer

func WrapDbWithSqlx

func WrapDbWithSqlx(rdb *sql.DB) *sqlx.DB

Types

type AdvisoryLock added in v1.2.0

type AdvisoryLock interface {
	TakeAndHold(ctx context.Context) error
	Release()
}

AdvisoryLock is an interface for postgresql advisory locks.

func NewAdvisoryLock added in v1.2.0

func NewAdvisoryLock(db *sqlx.DB, lggr logger.Logger, cfg AdvisoryLockConfig) AdvisoryLock

NewAdvisoryLock returns an advisoryLocker

type AdvisoryLockConfig added in v1.10.0

type AdvisoryLockConfig interface {
	AdvisoryLockID() int64
	AdvisoryLockCheckInterval() time.Duration
	DatabaseDefaultQueryTimeout() time.Duration
}

type BatchFunc

type BatchFunc func(offset, limit uint) (count uint, err error)

BatchFunc is the function to execute on each batch of records, should return the count of records affected

type ConnectionConfig added in v1.10.0

type ConnectionConfig interface {
	DatabaseDefaultIdleInTxSessionTimeout() time.Duration
	DatabaseDefaultLockTimeout() time.Duration
	ORMMaxOpenConns() int
	ORMMaxIdleConns() int
}

type Event

type Event struct {
	Channel string
	Payload string
}

type EventBroadcaster

type EventBroadcaster interface {
	services.ServiceCtx
	Subscribe(channel, payloadFilter string) (Subscription, error)
	Notify(channel string, payload string) error
}

EventBroadcaster opaquely manages a collection of Postgres event listeners and broadcasts events to subscribers (with an optional payload filter).

type LeaseLock

type LeaseLock interface {
	TakeAndHold(ctx context.Context) error
	ClientID() uuid.UUID
	Release()
}

LeaseLock handles taking an exclusive lease on database access. This is not enforced by any database primitives, but rather voluntarily respected by other instances of the Chainlink application.

Chainlink is designed to run as a single instance. Running multiple instances of Chainlink on a single database at the same time is not supported and likely to lead to strange errors and possibly even data integrity failures.

With that being said, a common use case is to run multiple Chainlink instances in failover mode. The first instance will take some kind of lock on the database and subsequent instances will wait trying to take this lock in case the first instance disappears or dies.

Traditionally Chainlink has used an advisory lock to manage this. However, advisory locks come with several problems, notably: - Postgres does not really like it when you hold locks open for a very long time (hours/days). It hampers certain internal cleanup tasks and is explicitly discouraged by the postgres maintainers - The advisory lock can silently disappear on postgres upgrade - Advisory locks do not play nicely with pooling tools such as pgbouncer - If the application crashes, the advisory lock can be left hanging around for a while (sometimes hours) and can require manual intervention to remove it

For this reason, we now use a database leaseLock instead, which works as such: - Have one row in a database which is updated periodically with the client ID - CL node A will run a background process on start that updates this e.g. once per second - CL node B will spinlock, checking periodically to see if the update got too old. If it goes more than, say, 5s without updating, it assumes that node A is dead and takes over. Now CL node B is the owner of the row and it updates this every second - If CL node A comes back somehow, it will go to take out a lease and realise that the database has been leased to another process, so it will panic and quit immediately

func NewLeaseLock

func NewLeaseLock(db *sqlx.DB, appID uuid.UUID, lggr logger.Logger, cfg LeaseLockConfig) LeaseLock

NewLeaseLock creates a "leaseLock" - an entity that tries to take an exclusive lease on the database

type LeaseLockConfig added in v1.10.0

type LeaseLockConfig interface {
	DatabaseDefaultQueryTimeout() time.Duration
	LeaseLockDuration() time.Duration
	LeaseLockRefreshInterval() time.Duration
}

type Limit added in v1.4.0

type Limit int

Limit is a helper driver.Valuer for LIMIT queries which uses nil/NULL for negative values.

func (Limit) String added in v1.4.0

func (l Limit) String() string

func (Limit) Value added in v1.4.0

func (l Limit) Value() (driver.Value, error)

type LockedDB added in v1.2.0

type LockedDB interface {
	Open(ctx context.Context) error
	Close() error
	DB() *sqlx.DB
}

LockedDB bounds DB connection and DB locks.

func NewLockedDB added in v1.2.0

func NewLockedDB(cfg LockedDBConfig, lggr logger.Logger) LockedDB

NewLockedDB creates a new instance of LockedDB.

type LockedDBConfig added in v1.10.0

type LockedDBConfig interface {
	ConnectionConfig
	AdvisoryLockCheckInterval() time.Duration
	AdvisoryLockID() int64
	AppID() uuid.UUID
	DatabaseLockingMode() string
	DatabaseURL() url.URL
	DatabaseDefaultQueryTimeout() time.Duration
	LeaseLockDuration() time.Duration
	LeaseLockRefreshInterval() time.Duration
	GetDatabaseDialectConfiguredOrDefault() dialects.DialectName
	MigrateDatabase() bool
}

type NullEventBroadcaster

type NullEventBroadcaster struct {
	Sub *NullSubscription
}

NullEventBroadcaster implements null pattern for event broadcaster

func NewNullEventBroadcaster

func NewNullEventBroadcaster() *NullEventBroadcaster

func (*NullEventBroadcaster) Close

func (*NullEventBroadcaster) Close() error

Close does no-op.

func (*NullEventBroadcaster) HealthReport added in v1.13.0

func (*NullEventBroadcaster) HealthReport() map[string]error

HealthReport does no-op

func (*NullEventBroadcaster) Healthy

func (*NullEventBroadcaster) Healthy() error

Healthy does no-op.

func (*NullEventBroadcaster) Name added in v1.13.0

func (*NullEventBroadcaster) Name() string

func (*NullEventBroadcaster) Notify

func (*NullEventBroadcaster) Notify(channel string, payload string) error

func (*NullEventBroadcaster) Ready

func (*NullEventBroadcaster) Ready() error

Ready does no-op.

func (*NullEventBroadcaster) Start

Start does no-op.

func (*NullEventBroadcaster) Subscribe

func (ne *NullEventBroadcaster) Subscribe(channel, payloadFilter string) (Subscription, error)

type NullSubscription

type NullSubscription struct {
	Ch chan (Event)
}

func (*NullSubscription) ChannelName

func (ns *NullSubscription) ChannelName() string

func (*NullSubscription) Close

func (ns *NullSubscription) Close()

func (*NullSubscription) Events

func (ns *NullSubscription) Events() <-chan Event

func (*NullSubscription) InterestedIn

func (ns *NullSubscription) InterestedIn(event Event) bool

func (*NullSubscription) Send

func (ns *NullSubscription) Send(event Event)

type Q

type Q struct {
	Queryer
	ParentCtx context.Context

	QueryTimeout time.Duration
	// contains filtered or unexported fields
}

Q wraps an underlying queryer (either a *sqlx.DB or a *sqlx.Tx)

It is designed to make handling *sqlx.Tx or *sqlx.DB a little bit safer by preventing footguns such as having no deadline on contexts.

It also handles nesting transactions.

It automatically adds the default context deadline to all non-context queries (if you _really_ want to issue a query without a context, use the underlying Queryer)

This is not the prettiest construct but without macros its about the best we can do.

func NewQ

func NewQ(db *sqlx.DB, logger logger.Logger, config QConfig, qopts ...QOpt) (q Q)

func (Q) Context

func (q Q) Context() (context.Context, context.CancelFunc)

func (Q) ExecQ

func (q Q) ExecQ(query string, args ...interface{}) error

func (Q) ExecQIter

func (q Q) ExecQIter(query string, args ...interface{}) (sql.Result, context.CancelFunc, error)

CAUTION: A subtle problem lurks here, because the following code is buggy:

ctx, cancel := context.WithCancel(context.Background())
rows, err := db.QueryContext(ctx, "SELECT foo")
cancel() // canceling here "poisons" the scan below
for rows.Next() {
  rows.Scan(...)
}

We must cancel the context only after we have completely finished using the returned rows or result from the query/exec

For this reasons, the following functions return a context.CancelFunc and it is up to the caller to ensure that cancel is called after it has finished

Generally speaking, it makes more sense to use Get/Select in most cases, which avoids this problem

func (Q) ExecQNamed

func (q Q) ExecQNamed(query string, arg interface{}) (err error)

func (Q) Get

func (q Q) Get(dest interface{}, query string, args ...interface{}) error

func (Q) GetNamed

func (q Q) GetNamed(sql string, dest interface{}, arg interface{}) error

func (Q) Select

func (q Q) Select(dest interface{}, query string, args ...interface{}) error

Select and Get are safe to wrap the context cancellation because the rows are entirely consumed within the call

func (Q) Transaction

func (q Q) Transaction(fc func(q Queryer) error, txOpts ...TxOptions) error

func (Q) WithOpts

func (q Q) WithOpts(qopts ...QOpt) Q

type QConfig added in v1.10.0

type QConfig interface {
	LogSQL() bool
	DatabaseDefaultQueryTimeout() time.Duration
}

type QOpt

type QOpt func(*Q)

QOpt pattern for ORM methods aims to clarify usage and remove some common footguns, notably:

1. It should be easy and obvious how to pass a parent context or a transaction into an ORM method 2. Simple queries should not be cluttered 3. It should have compile-time safety and be explicit 4. It should enforce some sort of context deadline on all queries by default 5. It should optimise for clarity and readability 6. It should mandate using sqlx everywhere, gorm is forbidden in new code 7. It should make using sqlx a little more convenient by wrapping certain methods 8. It allows easier mocking of DB calls (Queryer is an interface)

The two main concepts introduced are:

A `Q` struct that wraps a `sqlx.DB` or `sqlx.Tx` and implements the `pg.Queryer` interface.

This struct is initialised with `QOpts` which define how the queryer should behave. `QOpts` can define a parent context, an open transaction or other options to configure the Queryer.

A sample ORM method looks like this:

func (o *orm) GetFoo(id int64, qopts ...pg.QOpt) (Foo, error) {
	q := pg.NewQ(q, qopts...)
	return q.Exec(...)
}

Now you can call it like so:

orm.GetFoo(1) // will automatically have default query timeout context set
orm.GetFoo(1, pg.WithParentCtx(ctx)) // will wrap the supplied parent context with the default query context
orm.GetFoo(1, pg.WithQueryer(tx)) // allows to pass in a running transaction or anything else that implements Queryer
orm.GetFoo(q, pg.WithQueryer(tx), pg.WithParentCtx(ctx)) // options can be combined

type Queryer

type Queryer interface {
	sqlx.Ext
	sqlx.ExtContext
	sqlx.Preparer
	sqlx.PreparerContext
	sqlx.Queryer
	Select(dest interface{}, query string, args ...interface{}) error
	SelectContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
	PrepareNamed(query string) (*sqlx.NamedStmt, error)
	QueryRow(query string, args ...interface{}) *sql.Row
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
	Get(dest interface{}, query string, args ...interface{}) error
	GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
	NamedExec(query string, arg interface{}) (sql.Result, error)
	NamedQuery(query string, arg interface{}) (*sqlx.Rows, error)
}

type ReportFn added in v1.13.0

type ReportFn func(sql.DBStats)

type StatFn added in v1.13.0

type StatFn func() sql.DBStats

type StatsReporter added in v1.13.0

type StatsReporter struct {
	// contains filtered or unexported fields
}

func NewStatsReporter added in v1.13.0

func NewStatsReporter(fn StatFn, lggr logger.Logger, opts ...StatsReporterOpt) *StatsReporter

func (*StatsReporter) Start added in v1.13.0

func (r *StatsReporter) Start(ctx context.Context)

func (*StatsReporter) Stop added in v1.13.0

func (r *StatsReporter) Stop()

Stop stops all resources owned by the reporter and waits for all of them to be done

type StatsReporterOpt added in v1.13.0

type StatsReporterOpt func(*StatsReporter)

func StatsCustomReporterFn added in v1.13.0

func StatsCustomReporterFn(fn ReportFn) StatsReporterOpt

func StatsInterval added in v1.13.0

func StatsInterval(d time.Duration) StatsReporterOpt

type Subscription

type Subscription interface {
	Events() <-chan Event
	Close()

	ChannelName() string
	InterestedIn(event Event) bool
	Send(event Event)
}

Subscription represents a subscription to a Postgres event channel

type TxBeginner

type TxBeginner interface {
	BeginTxx(context.Context, *sql.TxOptions) (*sqlx.Tx, error)
}

TxBeginner can be a db or a conn, anything that implements BeginTxx

type TxOptions

type TxOptions struct {
	sql.TxOptions
	LockTimeout            time.Duration
	IdleInTxSessionTimeout time.Duration
}

func OptReadOnlyTx

func OptReadOnlyTx() TxOptions

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL