pg

package
v2.13.0-beta0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 10, 2024 License: MIT Imports: 27 Imported by: 2

Documentation

Index

Constants

View Source
const BatchSize uint = 1000

BatchSize is the default number of DB records to access in one batch

Variables

View Source
var MinRequiredPGVersion = 110000

Functions

func Batch

func Batch(cb BatchFunc) error

Batch is an iterator for batches of records

func NewConnection

func NewConnection(uri string, dialect dialects.DialectName, config ConnectionConfig) (*sqlx.DB, error)

func OpenUnlockedDB

func OpenUnlockedDB(appID uuid.UUID, cfg LockedDBConfig) (db *sqlx.DB, err error)

OpenUnlockedDB just opens DB connection, without any DB locks. This should be used carefully, when we know we don't need any locks. Currently this is used by RebroadcastTransactions command only.

func WrapDbWithSqlx

func WrapDbWithSqlx(rdb *sql.DB) *sqlx.DB

Types

type BatchFunc

type BatchFunc func(offset, limit uint) (count uint, err error)

BatchFunc is the function to execute on each batch of records, should return the count of records affected

type ConnectionConfig

type ConnectionConfig interface {
	DefaultIdleInTxSessionTimeout() time.Duration
	DefaultLockTimeout() time.Duration
	MaxOpenConns() int
	MaxIdleConns() int
}

type Getter added in v2.10.0

type Getter interface {
	Get(dest interface{}, query string, args ...interface{}) error
}

type LeaseLock

type LeaseLock interface {
	TakeAndHold(ctx context.Context) error
	ClientID() uuid.UUID
	Release()
}

LeaseLock handles taking an exclusive lease on database access. This is not enforced by any database primitives, but rather voluntarily respected by other instances of the Chainlink application.

Chainlink is designed to run as a single instance. Running multiple instances of Chainlink on a single database at the same time is not supported and likely to lead to strange errors and possibly even data integrity failures.

With that being said, a common use case is to run multiple Chainlink instances in failover mode. The first instance will take some kind of lock on the database and subsequent instances will wait trying to take this lock in case the first instance disappears or dies.

Traditionally Chainlink has used an advisory lock to manage this. However, advisory locks come with several problems, notably: - Postgres does not really like it when you hold locks open for a very long time (hours/days). It hampers certain internal cleanup tasks and is explicitly discouraged by the postgres maintainers - The advisory lock can silently disappear on postgres upgrade - Advisory locks do not play nicely with pooling tools such as pgbouncer - If the application crashes, the advisory lock can be left hanging around for a while (sometimes hours) and can require manual intervention to remove it

For this reason, we now use a database leaseLock instead, which works as such: - Have one row in a database which is updated periodically with the client ID - CL node A will run a background process on start that updates this e.g. once per second - CL node B will spinlock, checking periodically to see if the update got too old. If it goes more than, say, 5s without updating, it assumes that node A is dead and takes over. Now CL node B is the owner of the row and it updates this every second - If CL node A comes back somehow, it will go to take out a lease and realise that the database has been leased to another process, so it will panic and quit immediately

func NewLeaseLock

func NewLeaseLock(db *sqlx.DB, appID uuid.UUID, lggr logger.Logger, cfg LeaseLockConfig) LeaseLock

NewLeaseLock creates a "leaseLock" - an entity that tries to take an exclusive lease on the database

type LeaseLockConfig

type LeaseLockConfig struct {
	DefaultQueryTimeout  time.Duration
	LeaseDuration        time.Duration
	LeaseRefreshInterval time.Duration
}

type LockedDB

type LockedDB interface {
	Open(ctx context.Context) error
	Close() error
	DB() *sqlx.DB
}

LockedDB bounds DB connection and DB locks.

func NewLockedDB

func NewLockedDB(appID uuid.UUID, cfg LockedDBConfig, lockCfg config.Lock, lggr logger.Logger) LockedDB

NewLockedDB creates a new instance of LockedDB.

type LockedDBConfig

type LockedDBConfig interface {
	ConnectionConfig
	URL() url.URL
	DefaultQueryTimeout() time.Duration
	Dialect() dialects.DialectName
}

type Queryer

type Queryer interface {
	sqlx.Ext
	sqlx.ExtContext
	sqlx.Preparer
	sqlx.PreparerContext
	sqlx.Queryer
	Select(dest interface{}, query string, args ...interface{}) error
	SelectContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
	PrepareNamed(query string) (*sqlx.NamedStmt, error)
	QueryRow(query string, args ...interface{}) *sql.Row
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
	Get(dest interface{}, query string, args ...interface{}) error
	GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
	NamedExec(query string, arg interface{}) (sql.Result, error)
	NamedQuery(query string, arg interface{}) (*sqlx.Rows, error)
}

Queryer is deprecated. Use sqlutil.DataSource instead

type ReportFn

type ReportFn func(sql.DBStats)

type StatFn

type StatFn func() sql.DBStats

type StatsReporter

type StatsReporter struct {
	// contains filtered or unexported fields
}

func NewStatsReporter

func NewStatsReporter(fn StatFn, lggr logger.Logger, opts ...StatsReporterOpt) *StatsReporter

func (*StatsReporter) Start

func (r *StatsReporter) Start(ctx context.Context)

func (*StatsReporter) Stop

func (r *StatsReporter) Stop()

Stop stops all resources owned by the reporter and waits for all of them to be done

type StatsReporterOpt

type StatsReporterOpt func(*StatsReporter)

func StatsCustomReporterFn

func StatsCustomReporterFn(fn ReportFn) StatsReporterOpt

func StatsInterval

func StatsInterval(d time.Duration) StatsReporterOpt

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL