Documentation ¶
Index ¶
- Constants
- func Batch(cb BatchFunc) error
- func NewConnection(uri string, dialect dialects.DialectName, config ConnectionConfig) (db *sqlx.DB, err error)
- func NewEventBroadcaster(uri url.URL, minReconnectInterval time.Duration, ...) *eventBroadcaster
- func OpenUnlockedDB(cfg LockedDBConfig) (db *sqlx.DB, err error)
- func PrepareQueryRowx(q Queryer, sql string, dest interface{}, arg interface{}) error
- func SqlTransaction(ctx context.Context, rdb *sql.DB, lggr logger.Logger, ...) (err error)
- func SqlxTransaction(ctx context.Context, q Queryer, lggr logger.Logger, fc func(q Queryer) error, ...) (err error)
- func WrapDbWithSqlx(rdb *sql.DB) *sqlx.DB
- type BatchFunc
- type ConnectionConfig
- type Event
- type EventBroadcaster
- type LeaseLock
- type LeaseLockConfig
- type Limit
- type LockedDB
- type LockedDBConfig
- type NullEventBroadcaster
- func (*NullEventBroadcaster) Close() error
- func (*NullEventBroadcaster) HealthReport() map[string]error
- func (*NullEventBroadcaster) Name() string
- func (*NullEventBroadcaster) Notify(channel string, payload string) error
- func (*NullEventBroadcaster) Ready() error
- func (*NullEventBroadcaster) Start(context.Context) error
- func (ne *NullEventBroadcaster) Subscribe(channel, payloadFilter string) (Subscription, error)
- type NullSubscription
- type Q
- func (q Q) Context() (context.Context, context.CancelFunc)
- func (q Q) ExecQ(query string, args ...interface{}) error
- func (q Q) ExecQIter(query string, args ...interface{}) (sql.Result, context.CancelFunc, error)
- func (q Q) ExecQNamed(query string, arg interface{}) (err error)
- func (q Q) Get(dest interface{}, query string, args ...interface{}) error
- func (q Q) GetNamed(sql string, dest interface{}, arg interface{}) error
- func (q Q) Select(dest interface{}, query string, args ...interface{}) error
- func (q Q) Transaction(fc func(q Queryer) error, txOpts ...TxOptions) error
- func (q Q) WithOpts(qopts ...QOpt) Q
- type QConfig
- type QOpt
- type Queryer
- type ReportFn
- type StatFn
- type StatsReporter
- type StatsReporterOpt
- type Subscription
- type TxBeginner
- type TxOptions
Constants ¶
const (
	ChannelInsertOnEthTx     = "insert_on_eth_txes"
	ChannelInsertOnCosmosMsg = "insert_on_cosmos_msg"
)
Postgres channel to listen for new eth_txes
const BatchSize uint = 1000
BatchSize is the default number of DB records to access in one batch
const (
	// NOTE: This is the default level in Postgres anyway, we just make it
	// explicit here
	DefaultIsolation = sql.LevelReadCommitted
)
NOTE: In an ideal world the timeouts below would be set to something sane in the postgres configuration by the user. Since we do not live in an ideal world, it is necessary to override them here.
They cannot easily be set at a session level due to how Go's connection pooling works.
const (
	// DefaultQueryTimeout is a reasonable upper bound for how long a SQL query should take.
	// The configured value should be used instead of this if possible.
	DefaultQueryTimeout = 10 * time.Second
)
Variables ¶
This section is empty.
Functions ¶
func NewConnection ¶
func NewConnection(uri string, dialect dialects.DialectName, config ConnectionConfig) (db *sqlx.DB, err error)
func NewEventBroadcaster ¶
func OpenUnlockedDB ¶
func OpenUnlockedDB(cfg LockedDBConfig) (db *sqlx.DB, err error)
OpenUnlockedDB just opens a DB connection without taking any DB locks. It should be used carefully, and only when we know we don't need any locks. Currently it is used by the RebroadcastTransactions command only.
func PrepareQueryRowx ¶
func SqlTransaction ¶
func SqlxTransaction ¶
Types ¶
type BatchFunc ¶
BatchFunc is the function to execute on each batch of records. It should return the count of records affected.
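The batching contract described here can be sketched as a loop that keeps asking for the next window until a callback reports fewer records than the batch size. This is only an illustration: the `batchFunc` signature (offset and limit in, count out), `runBatches` and `countCalls` are assumptions made for the example, not the package's actual definitions.

```go
package main

import "fmt"

// batchFunc is a hypothetical stand-in for pg.BatchFunc: it processes one
// window of records and returns how many records it actually touched.
type batchFunc func(offset, limit uint) (count uint, err error)

// batchSize mirrors the documented default of 1000 records per batch.
const batchSize uint = 1000

// runBatches calls cb on successive windows and stops after the first
// short batch, which signals that no more records are left.
func runBatches(cb batchFunc) error {
	offset := uint(0)
	for {
		count, err := cb(offset, batchSize)
		if err != nil {
			return fmt.Errorf("batch at offset %d: %w", offset, err)
		}
		if count < batchSize {
			return nil
		}
		offset += batchSize
	}
}

// countCalls simulates a table with total records and reports how many
// times the callback was invoked.
func countCalls(total uint) (calls int, err error) {
	err = runBatches(func(offset, limit uint) (uint, error) {
		calls++
		if offset >= total {
			return 0, nil
		}
		if remaining := total - offset; remaining < limit {
			return remaining, nil
		}
		return limit, nil
	})
	return calls, err
}

func main() {
	calls, err := countCalls(2500)
	fmt.Println(calls, err)
}
```

Note that a table whose size is an exact multiple of the batch size costs one extra, empty call under this stopping rule.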
type ConnectionConfig ¶
type EventBroadcaster ¶
type EventBroadcaster interface {
	services.ServiceCtx
	Subscribe(channel, payloadFilter string) (Subscription, error)
	Notify(channel string, payload string) error
}
EventBroadcaster opaquely manages a collection of Postgres event listeners and broadcasts events to subscribers (with an optional payload filter).
type LeaseLock ¶
LeaseLock handles taking an exclusive lease on database access. This is not enforced by any database primitives, but rather voluntarily respected by other instances of the Chainlink application.
Chainlink is designed to run as a single instance. Running multiple instances of Chainlink on a single database at the same time is not supported and likely to lead to strange errors and possibly even data integrity failures.
With that being said, a common use case is to run multiple Chainlink instances in failover mode. The first instance will take some kind of lock on the database and subsequent instances will wait trying to take this lock in case the first instance disappears or dies.
Traditionally Chainlink has used an advisory lock to manage this. However, advisory locks come with several problems, notably:
- Postgres does not really like it when you hold locks open for a very long time (hours/days). It hampers certain internal cleanup tasks and is explicitly discouraged by the postgres maintainers
- The advisory lock can silently disappear on postgres upgrade
- Advisory locks do not play nicely with pooling tools such as pgbouncer
- If the application crashes, the advisory lock can be left hanging around for a while (sometimes hours) and can require manual intervention to remove it

For this reason, we now use a database leaseLock instead, which works as such:
- Have one row in a database which is updated periodically with the client ID
- CL node A will run a background process on start that updates this e.g. once per second
- CL node B will spinlock, checking periodically to see if the update got too old. If it goes more than, say, 5s without updating, it assumes that node A is dead and takes over. Now CL node B is the owner of the row and it updates this every second
- If CL node A comes back somehow, it will go to take out a lease and realise that the database has been leased to another process, so it will panic and quit immediately
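The decision each node makes when it looks at the lease row can be sketched as a small pure function. Everything below is a hypothetical model for illustration (the `lease` struct, the `action` values, `decide`, and the 5s expiry taken from the example above); the real LeaseLock works against a database row and is not shown here.

```go
package main

import (
	"fmt"
	"time"
)

// lease is a hypothetical in-memory picture of the single lease row.
type lease struct {
	ownerID   string    // client ID that last refreshed the row
	updatedAt time.Time // when that refresh happened
}

// action is what a node should do after inspecting the lease row.
type action string

const (
	refresh  action = "refresh"   // we own the lease: bump the timestamp
	takeOver action = "take over" // owner looks dead: claim the row
	wait     action = "wait"      // owner is alive: keep polling
	quit     action = "quit"      // we lost a lease we once held: exit
)

// decide applies the rules from the description above.
func decide(l lease, clientID string, heldBefore bool, now time.Time, maxAge time.Duration) action {
	switch {
	case l.ownerID == clientID:
		return refresh
	case heldBefore:
		return quit
	case now.Sub(l.updatedAt) > maxAge:
		return takeOver
	default:
		return wait
	}
}

// decideAfter is a convenience wrapper: the row was last refreshed
// ageSeconds ago and the lease is considered stale after 5 seconds.
func decideAfter(ownerID, clientID string, heldBefore bool, ageSeconds int) action {
	now := time.Now()
	l := lease{ownerID: ownerID, updatedAt: now.Add(-time.Duration(ageSeconds) * time.Second)}
	return decide(l, clientID, heldBefore, now, 5*time.Second)
}

func main() {
	fmt.Println(decideAfter("A", "A", true, 1))  // node A refreshing its own lease
	fmt.Println(decideAfter("A", "B", false, 2)) // node B polling while A is alive
	fmt.Println(decideAfter("A", "B", false, 6)) // node B after A went silent
	fmt.Println(decideAfter("B", "A", true, 1))  // node A returning after B took over
}
```

In a real deployment the check and the update would need to happen atomically in the database; this sketch only covers the decision logic.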
func NewLeaseLock ¶
NewLeaseLock creates a "leaseLock" - an entity that tries to take an exclusive lease on the database
type LeaseLockConfig ¶
type Limit ¶
type Limit int
Limit is a helper driver.Valuer for LIMIT queries which uses nil/NULL for negative values.
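A minimal sketch of how such a `driver.Valuer` might look is given below. The lowercase `limit` type and the `describe` helper are illustrative names, and the body is an assumption based only on the sentence above rather than the package's source. It relies on the PostgreSQL behaviour that `LIMIT NULL` is the same as omitting the LIMIT clause.

```go
package main

import (
	"database/sql/driver"
	"fmt"
)

// limit is a hypothetical re-creation of the documented Limit type.
type limit int

// Value sends NULL for negative limits (meaning "no limit" in Postgres)
// and the plain integer otherwise.
func (l limit) Value() (driver.Value, error) {
	if l < 0 {
		return nil, nil
	}
	return int64(l), nil
}

// Compile-time check that limit satisfies driver.Valuer.
var _ driver.Valuer = limit(0)

// describe renders the value the driver would receive, for demonstration.
func describe(l limit) string {
	v, _ := l.Value()
	if v == nil {
		return "NULL"
	}
	return fmt.Sprintf("%d", v)
}

func main() {
	fmt.Println(describe(limit(10)))
	fmt.Println(describe(limit(-1)))
}
```

With a type like this, a single query of the form `SELECT ... LIMIT $1` can serve both the bounded and the unbounded case.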
type LockedDB ¶
LockedDB bundles a DB connection together with its DB locks.
func NewLockedDB ¶
func NewLockedDB(cfg LockedDBConfig, lggr logger.Logger) LockedDB
NewLockedDB creates a new instance of LockedDB.
type LockedDBConfig ¶
type LockedDBConfig interface {
	ConnectionConfig
	AppID() uuid.UUID
	DatabaseLockingMode() string
	DatabaseURL() url.URL
	DatabaseDefaultQueryTimeout() time.Duration
	LeaseLockDuration() time.Duration
	LeaseLockRefreshInterval() time.Duration
	GetDatabaseDialectConfiguredOrDefault() dialects.DialectName
	MigrateDatabase() bool
}
type NullEventBroadcaster ¶
type NullEventBroadcaster struct {
Sub *NullSubscription
}
NullEventBroadcaster implements the null object pattern for EventBroadcaster.
func NewNullEventBroadcaster ¶
func NewNullEventBroadcaster() *NullEventBroadcaster
func (*NullEventBroadcaster) HealthReport ¶
func (*NullEventBroadcaster) HealthReport() map[string]error
HealthReport is a no-op.
func (*NullEventBroadcaster) Name ¶
func (*NullEventBroadcaster) Name() string
func (*NullEventBroadcaster) Notify ¶
func (*NullEventBroadcaster) Notify(channel string, payload string) error
func (*NullEventBroadcaster) Start ¶
func (*NullEventBroadcaster) Start(context.Context) error
Start is a no-op.
func (*NullEventBroadcaster) Subscribe ¶
func (ne *NullEventBroadcaster) Subscribe(channel, payloadFilter string) (Subscription, error)
type NullSubscription ¶
type NullSubscription struct {
Ch chan (Event)
}
func (*NullSubscription) ChannelName ¶
func (ns *NullSubscription) ChannelName() string
func (*NullSubscription) Close ¶
func (ns *NullSubscription) Close()
func (*NullSubscription) Events ¶
func (ns *NullSubscription) Events() <-chan Event
func (*NullSubscription) InterestedIn ¶
func (ns *NullSubscription) InterestedIn(event Event) bool
func (*NullSubscription) Send ¶
func (ns *NullSubscription) Send(event Event)
type Q ¶
type Q struct {
	Queryer
	ParentCtx    context.Context
	QueryTimeout time.Duration
	// contains filtered or unexported fields
}
Q wraps an underlying queryer (either a *sqlx.DB or a *sqlx.Tx)
It is designed to make handling *sqlx.Tx or *sqlx.DB a little bit safer by preventing footguns such as having no deadline on contexts.
It also handles nesting transactions.
It automatically adds the default context deadline to all non-context queries (if you _really_ want to issue a query without a context, use the underlying Queryer)
This is not the prettiest construct, but without macros it's about the best we can do.
func (Q) ExecQIter ¶
CAUTION: A subtle problem lurks here, because the following code is buggy:
ctx, cancel := context.WithCancel(context.Background())
rows, err := db.QueryContext(ctx, "SELECT foo")
cancel() // canceling here "poisons" the scan below
for rows.Next() {
	rows.Scan(...)
}
We must cancel the context only after we have completely finished using the returned rows or result from the query/exec.
For this reason, the following functions return a context.CancelFunc, and it is up to the caller to ensure that cancel is called only after it has finished with the result.
Generally speaking, it makes more sense to use Get/Select in most cases, which avoids this problem
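The ordering problem can be reproduced with a toy model. The `fakeRows` type below is a deliberately simplified stand-in, not `database/sql` itself: it simply refuses to yield rows once its context is done. The two functions differ only in when `cancel` is called.

```go
package main

import (
	"context"
	"fmt"
)

// fakeRows is a toy stand-in for a row cursor tied to a query context.
type fakeRows struct {
	ctx  context.Context
	left int
}

// Next yields a row unless the context is done or the rows are exhausted.
func (r *fakeRows) Next() bool {
	if r.ctx.Err() != nil || r.left == 0 {
		return false
	}
	r.left--
	return true
}

// query pretends to run a statement that returns n rows.
func query(ctx context.Context, n int) *fakeRows {
	return &fakeRows{ctx: ctx, left: n}
}

// scanBuggy cancels before iterating, so the loop sees no rows.
func scanBuggy(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	rows := query(ctx, n)
	cancel() // too early: the cursor is still in use below
	seen := 0
	for rows.Next() {
		seen++
	}
	return seen
}

// scanCorrect defers cancel until the rows have been fully consumed.
func scanCorrect(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // runs only after the loop has finished
	rows := query(ctx, n)
	seen := 0
	for rows.Next() {
		seen++
	}
	return seen
}

func main() {
	fmt.Println("buggy:", scanBuggy(3))
	fmt.Println("correct:", scanCorrect(3))
}
```

With a real driver the early cancel may fail less predictably than in this model, which is part of what makes the bug subtle.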
func (Q) ExecQNamed ¶
type QOpt ¶
type QOpt func(*Q)
QOpt pattern for ORM methods aims to clarify usage and remove some common footguns, notably:
1. It should be easy and obvious how to pass a parent context or a transaction into an ORM method
2. Simple queries should not be cluttered
3. It should have compile-time safety and be explicit
4. It should enforce some sort of context deadline on all queries by default
5. It should optimise for clarity and readability
6. It should mandate using sqlx everywhere, gorm is forbidden in new code
7. It should make using sqlx a little more convenient by wrapping certain methods
8. It allows easier mocking of DB calls (Queryer is an interface)
The two main concepts introduced are:
A `Q` struct that wraps a `sqlx.DB` or `sqlx.Tx` and implements the `pg.Queryer` interface.
This struct is initialised with `QOpts` which define how the queryer should behave. `QOpts` can define a parent context, an open transaction or other options to configure the Queryer.
A sample ORM method looks like this:
func (o *orm) GetFoo(id int64, qopts ...pg.QOpt) (Foo, error) {
	q := pg.NewQ(q, qopts...)
	return q.Exec(...)
}
Now you can call it like so:
orm.GetFoo(1) // will automatically have default query timeout context set
orm.GetFoo(1, pg.WithParentCtx(ctx)) // will wrap the supplied parent context with the default query context
orm.GetFoo(1, pg.WithQueryer(tx)) // allows to pass in a running transaction or anything else that implements Queryer
orm.GetFoo(1, pg.WithQueryer(tx), pg.WithParentCtx(ctx)) // options can be combined
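For readers unfamiliar with the functional options pattern that QOpt follows, here is a self-contained sketch. The names `miniQ`, `miniOpt`, `newMiniQ` and the one-minute value for the long timeout are all assumptions made for illustration; they mirror the shape of the API described above without reproducing its implementation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

const (
	defaultQueryTimeout = 10 * time.Second // mirrors DefaultQueryTimeout
	longQueryTimeout    = time.Minute      // assumed value for the long timeout
)

// miniQ is a hypothetical, stripped-down analogue of Q.
type miniQ struct {
	parentCtx    context.Context
	queryTimeout time.Duration
}

// miniOpt mutates a miniQ while it is being constructed.
type miniOpt func(*miniQ)

// withParentCtx sets or overwrites the parent context.
func withParentCtx(ctx context.Context) miniOpt {
	return func(q *miniQ) { q.parentCtx = ctx }
}

// withLongQueryTimeout swaps the default timeout for the longer one.
func withLongQueryTimeout() miniOpt {
	return func(q *miniQ) { q.queryTimeout = longQueryTimeout }
}

// newMiniQ starts from safe defaults and then applies each option in order.
func newMiniQ(opts ...miniOpt) miniQ {
	q := miniQ{parentCtx: context.Background(), queryTimeout: defaultQueryTimeout}
	for _, opt := range opts {
		opt(&q)
	}
	return q
}

// queryContext derives the per-query context, so every query has a deadline.
func (q miniQ) queryContext() (context.Context, context.CancelFunc) {
	return context.WithTimeout(q.parentCtx, q.queryTimeout)
}

// timeoutSeconds reports the effective timeout for a set of options.
func timeoutSeconds(opts ...miniOpt) int {
	return int(newMiniQ(opts...).queryTimeout / time.Second)
}

func main() {
	fmt.Println(timeoutSeconds())
	fmt.Println(timeoutSeconds(withLongQueryTimeout()))
	ctx, cancel := newMiniQ(withParentCtx(context.Background())).queryContext()
	defer cancel()
	_, hasDeadline := ctx.Deadline()
	fmt.Println(hasDeadline)
}
```

Because defaults are applied before the options, a caller that passes no options still gets a deadline, which is the property goal 4 in the list above asks for.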
func WithLongQueryTimeout ¶
func WithLongQueryTimeout() QOpt
WithLongQueryTimeout replaces the `DefaultQueryTimeout` duration with `OneMinuteQueryTimeout`. Some queries need longer when operating over big chunks of data, such as deleting jobs, but we still need to keep some upper bound on the timeout.
func WithParentCtx ¶
WithParentCtx sets or overwrites the parent ctx
func WithParentCtxInheritTimeout ¶
If the parent ctx has a timeout, just use that instead of DefaultQueryTimeout.
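The inherit-or-default rule can be expressed with the standard `context` package alone. The function name `inheritOrDefault` and the two checking helpers are hypothetical; this sketch shows one way the behaviour could be implemented, not how the package does it.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// defaultTimeout mirrors the documented 10s default query timeout.
const defaultTimeout = 10 * time.Second

// inheritOrDefault keeps the parent's deadline when it has one and
// otherwise applies the default timeout.
func inheritOrDefault(parent context.Context) (context.Context, context.CancelFunc) {
	if _, ok := parent.Deadline(); ok {
		return context.WithCancel(parent)
	}
	return context.WithTimeout(parent, defaultTimeout)
}

// keepsParentDeadline checks that a one-hour parent deadline passes
// through unchanged instead of being shortened to the default.
func keepsParentDeadline() bool {
	parent, cancelParent := context.WithTimeout(context.Background(), time.Hour)
	defer cancelParent()
	ctx, cancel := inheritOrDefault(parent)
	defer cancel()
	want, _ := parent.Deadline()
	got, ok := ctx.Deadline()
	return ok && got.Equal(want)
}

// appliesDefault checks that a parent without a deadline gets one that
// is no further away than the default timeout.
func appliesDefault() bool {
	start := time.Now()
	ctx, cancel := inheritOrDefault(context.Background())
	defer cancel()
	got, ok := ctx.Deadline()
	return ok && got.After(start) && !got.After(time.Now().Add(defaultTimeout))
}

func main() {
	fmt.Println(keepsParentDeadline(), appliesDefault())
}
```

The difference from plainly wrapping the parent with `context.WithTimeout` is that a long-lived parent deadline is not silently cut down to the default.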
type Queryer ¶
type Queryer interface {
	sqlx.Ext
	sqlx.ExtContext
	sqlx.Preparer
	sqlx.PreparerContext
	sqlx.Queryer
	Select(dest interface{}, query string, args ...interface{}) error
	SelectContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
	PrepareNamed(query string) (*sqlx.NamedStmt, error)
	QueryRow(query string, args ...interface{}) *sql.Row
	QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
	Get(dest interface{}, query string, args ...interface{}) error
	GetContext(ctx context.Context, dest interface{}, query string, args ...interface{}) error
	NamedExec(query string, arg interface{}) (sql.Result, error)
	NamedQuery(query string, arg interface{}) (*sqlx.Rows, error)
}
type StatsReporter ¶
type StatsReporter struct {
// contains filtered or unexported fields
}
func NewStatsReporter ¶
func NewStatsReporter(fn StatFn, lggr logger.Logger, opts ...StatsReporterOpt) *StatsReporter
func (*StatsReporter) Start ¶
func (r *StatsReporter) Start(ctx context.Context)
func (*StatsReporter) Stop ¶
func (r *StatsReporter) Stop()
Stop stops all resources owned by the reporter and waits for all of them to be done
type StatsReporterOpt ¶
type StatsReporterOpt func(*StatsReporter)
func StatsCustomReporterFn ¶
func StatsCustomReporterFn(fn ReportFn) StatsReporterOpt
func StatsInterval ¶
func StatsInterval(d time.Duration) StatsReporterOpt
type Subscription ¶
type Subscription interface {
	Events() <-chan Event
	Close()
	ChannelName() string
	InterestedIn(event Event) bool
	Send(event Event)
}
Subscription represents a subscription to a Postgres event channel
type TxBeginner ¶
TxBeginner can be a db or a conn, anything that implements BeginTxx