pg

package
v0.7.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 24, 2024 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package pg defines the primary PostgreSQL-powered DB and Pool types used to support Kwil DB.

See the DB type for more information.

Index

Constants

View Source
const (
	MessageTypePrepare          pglogrepl.MessageType = 'P' // this is not the prepare you're looking for
	MessageTypeBeginPrepare     pglogrepl.MessageType = 'b'
	MessageTypeCommitPrepared   pglogrepl.MessageType = 'K'
	MessageTypeRollbackPrepared pglogrepl.MessageType = 'r'
)
View Source
const DefaultSchemaFilterPrefix = "ds_"
View Source
const (
	InternalSchemaName = "kwild_internal"
)

Variables

This section is empty.

Functions

func UseLogger

func UseLogger(log klog.Logger)

Types

type BeginPrepareMessageV3

type BeginPrepareMessageV3 struct {
	// Flags currently unused (must be 0).
	// Flags uint8
	// PrepareLSN is the LSN of the prepare.
	PrepareLSN pglogrepl.LSN
	// EndPrepareLSN is the end LSN of the prepared transaction.
	EndPrepareLSN pglogrepl.LSN
	// PrepareTime is the prepare timestamp of the transaction
	PrepareTime time.Time
	// Xid of the transaction
	Xid uint32
	// UserGID ius the user-defined GID of the prepared transaction.
	UserGID string
}

BeginPrepareMessageV3 is the beginning of a prepared transaction message.

func (*BeginPrepareMessageV3) Decode

func (m *BeginPrepareMessageV3) Decode(src []byte) error

func (*BeginPrepareMessageV3) Type

type CommitPreparedMessageV3

type CommitPreparedMessageV3 struct {
	// Flags currently unused (must be 0).
	Flags uint8
	// CommitLSN is the LSN of the commit of the prepared transaction.
	CommitLSN pglogrepl.LSN
	// EndCommitLSN is the end LSN of the commit of the prepared transaction.
	EndCommitLSN pglogrepl.LSN
	// CommitTime is the commit timestamp of the transaction
	CommitTime time.Time
	// Xid of the transaction
	Xid uint32
	// UserGID is the user-defined GID of the prepared transaction.
	UserGID string
}

CommitPreparedMessageV3 is a commit prepared message.

func (*CommitPreparedMessageV3) Decode

func (m *CommitPreparedMessageV3) Decode(src []byte) error

func (*CommitPreparedMessageV3) Type

type ConnConfig

type ConnConfig struct {
	// Host, Port, User, Pass, and DBName are used verbatim to create a
	// connection string in DSN format.
	// https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
	Host, Port string
	User, Pass string
	DBName     string
}

ConnConfig groups the basic connection settings used to construct the DSN "connection string" used to open a new connection to a postgres host. TODO: use this in the various constructors for DB, Pool, etc.

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB is a session-aware wrapper that creates and stores a write Tx on request, and provides top level Exec/Set methods that error if no Tx exists. This design prevents any out-of-session write statements from executing, and makes uncommitted reads explicit (and impossible in the absence of an active transaction).

This type is tailored to use in kwild in the following ways:

  1. Controlled transactional interaction that requires beginning a transaction before using the Exec method, unless put in "autocommit" mode using the AutoCommit method. Use of the write connection when not executing a block's transactions is prevented.

  2. Using an underlying connection pool, with multiple readers and a single write connection to ensure all uses of Execute operate on the active transaction.

  3. Emulating SQLite changesets by collecting WAL data for updates from a dedicated logical replication connection and slot. The Precommit method is used to retrieve the commit ID prior to Commit.

DB requires a superuser connection to a Postgres database that can perform administrative actions on the database.

func NewDB

func NewDB(ctx context.Context, cfg *DBConfig) (*DB, error)

NewDB creates a new Kwil DB instance. On creation, it will connect to the configured postgres process, creating as many connections as specified by the PoolConfig plus a special connection for a logical replication slot receiver. The database user (postgresql "role") must be a super user for several reasons: creating triggers, collations, and the replication publication.

WARNING: There must only be ONE instance of a DB for a given postgres database. Transactions that use the Precommit method update an internal table used to sequence transactions.

func (*DB) AutoCommit

func (db *DB) AutoCommit(auto bool)

AutoCommit toggles auto-commit mode, in which the Execute method may be used without having to begin/commit. This is to support startup and initialization tasks that occur prior to the start of the atomic commit process used while executing blocks.

func (*DB) BeginOuterTx

func (db *DB) BeginOuterTx(ctx context.Context) (sql.OuterTx, error)

BeginTx makes the DB's singular transaction, which is used automatically by consumers of the Query and Execute methods. This is the mode of operation used by Kwil to have one system coordinating transaction lifetime, with one or more other systems implicitly using the transaction for their queries.

The returned transaction is also capable of creating nested transactions. This functionality is used to prevent user dataset query errors from rolling back the outermost transaction.

func (*DB) BeginReadTx

func (db *DB) BeginReadTx(ctx context.Context) (sql.Tx, error)

ReadTx creates a read-only transaction for the database. It obtains a read connection from the pool, which will be returned to the pool when the transaction is closed.

func (*DB) BeginTx

func (db *DB) BeginTx(ctx context.Context) (sql.Tx, error)

func (*DB) Close

func (db *DB) Close() error

Close shuts down the Kwil DB. This stops all connections and the WAL data receiver.

func (*DB) Execute

func (db *DB) Execute(ctx context.Context, stmt string, args ...any) (*sql.ResultSet, error)

Execute runs a statement on an existing transaction, or on a short lived transaction from the write connection if in auto-commit mode.

func (*DB) Pool

func (db *DB) Pool() *Pool

Pool is a trapdoor to get the connection pool. Probably not for normal Kwil DB operation, but test setup/teardown.

func (*DB) Query

func (db *DB) Query(ctx context.Context, stmt string, args ...any) (*sql.ResultSet, error)

Query performs a read-only query on a read connection.

type DBConfig

type DBConfig struct {
	PoolConfig

	// SchemaFilter is used to include WAL data for certain *postgres* schema
	// (not Kwil schema). If nil, the default is to include updates to tables in
	// any schema prefixed by "ds_".
	SchemaFilter func(string) bool
}

DBConfig is the configuration for the Kwil DB backend, which includes the connection parameters and a schema filter used to selectively include WAL data for certain PostgreSQL schemas in commit ID calculation.

type NamedArgs

type NamedArgs = pgx.NamedArgs

NamedArgs is a query rewriter that can be used as one of the first arguments in the []any provided to a query function so that the named arguments are automatically used to rewrite the SQL statement from named (using @argname) syntax to positional ($1, $2, etc.). IMPORTANT: Note that the input statement requires named arguments to us "@" instead of "$" for the named arguments. Modify the SQL string as necessary to work with this rewriter.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a simple read connection pool with one dedicated writer connection. This type is relatively low level, and Kwil will generally use the DB type to manage sessions instead of this type directly. It is exported primarily for testing and reuse in more general use cases.

Pool supports Kwil's single transactional DB writer model:

  • a single writer connection, on which a transaction is created by a top level system during block execution (i.e. the AbciApp), and from which reads of uncommitted DB records may be performed.
  • multiple readers, which may service other asynchronous operations such as a gRPC user service.

The write methods from the Tx returned from the BeginTx method should be preferred over directly using the Pool's write methods. The DB type is the session-aware wrapper that creates and stores the write Tx, and provides top level Exec/Set methods that error if no Tx exists. Only use Pool as a building block or for testing individual systems outside of the context of a session.

func NewPool

func NewPool(ctx context.Context, cfg *PoolConfig) (*Pool, error)

NewPool creates a connection pool to a PostgreSQL database.

func (*Pool) BeginReadTx

func (p *Pool) BeginReadTx(ctx context.Context) (sql.Tx, error)

BeginReadTx starts a read-only transaction.

func (*Pool) BeginTx

func (p *Pool) BeginTx(ctx context.Context) (sql.Tx, error)

BeginTx starts a read-write transaction. It is an error to call this twice without first closing the initial transaction.

func (*Pool) Close

func (p *Pool) Close() error

func (*Pool) Execute

func (p *Pool) Execute(ctx context.Context, stmt string, args ...any) (*sql.ResultSet, error)

Execute performs a read-write query on the writer connection.

func (*Pool) Query

func (p *Pool) Query(ctx context.Context, stmt string, args ...any) (*sql.ResultSet, error)

Query performs a read-only query using the read connection pool. It is executed in a transaction with read only access mode to ensure there can be no modifications.

type PoolConfig

type PoolConfig struct {
	ConnConfig

	// MaxConns is the maximum number of allowable connections, including the
	// one write connection. Thus there will be MaxConns-1 readers.
	MaxConns uint32
}

PoolConfig combines a connection config with additional options for a pool of read connections and a single write connection, as required for kwild.MaxConns

type PrepareMessageV3

type PrepareMessageV3 struct {
	// Flags currently unused (must be 0).
	Flags uint8
	// PrepareLSN is the LSN of the prepare.
	PrepareLSN pglogrepl.LSN
	// EndPrepareLSN is the end LSN of the prepared transaction.
	EndPrepareLSN pglogrepl.LSN
	// PrepareTime is the prepare timestamp of the transaction
	PrepareTime time.Time
	// Xid of the transaction
	Xid uint32
	// UserGID ius the user-defined GID of the prepared transaction.
	UserGID string
}

PrepareMessageV3 is the a prepared transaction message.

func (*PrepareMessageV3) Decode

func (m *PrepareMessageV3) Decode(src []byte) error

func (*PrepareMessageV3) Type

type QueryMode

type QueryMode = pgx.QueryExecMode

QueryMode is a type recognized by the query methods when in one of the first arguments in the []any that causes the query to be executed in a certain mode. Presently this is used to change from the prepare/describe approaches to determining input argument type to a simpler mode that infers the argument types from the passed Go variable types, which is helpful for "in-line expressions" such as `SELECT $1;` that convey no information on their own about the type of the argument, resulting in an assumed type that may not match the type of provided Go variable (error in many cases).

const (
	// QueryModeDefault uses a prepare-query request cycle to determine arg
	// types (OID) using postgres to describe the statement. This may not be
	// helpful for in-line expressions that reference no known table. There must
	// be an encode/decode plan available for the OID and the Go type.
	QueryModeDefault QueryMode = pgx.QueryExecModeCacheStatement
	// QueryModeExec still uses the extended protocol, but does not ask
	// postgresql to describe the statement to determine types.
	QueryModeExec QueryMode = pgx.QueryExecModeExec
	// QueryModeSimple is like QueryModeExec, except that it uses the "simple"
	// postgresql wire protocol. Prefer QueryModeExec if argument type inference
	// based on the Go variables is required since this forces everything into text.
	QueryModeSimple QueryMode = pgx.QueryExecModeSimpleProtocol

	// QueryModeInferredArgTypes runs the query in a special execution mode that
	// is like QueryExecModeExec except that it infers the argument OIDs from
	// the Go argument values and asserts those types in preparing the
	// statement, which is necessary for our in-line expressions. It is
	// incompatible with other special arguments like NamedArgs.
	QueryModeInferredArgTypes QueryMode = 1 << 16
)

type RollbackPreparedMessageV3

type RollbackPreparedMessageV3 struct {
	// Flags currently unused (must be 0).
	Flags uint8
	// EndLSN is the end LSN of the prepared transaction.
	EndLSN pglogrepl.LSN
	// RollbackLSN is the end LSN of the rollback of the prepared transaction.
	RollbackLSN pglogrepl.LSN
	// PrepareTime is the prepare timestamp of the transaction
	PrepareTime time.Time
	// RollbackTime is the rollback timestamp of the transaction
	RollbackTime time.Time
	// Xid of the transaction
	Xid uint32
	// UserGID ius the user-defined GID of the prepared transaction.
	UserGID string
}

RollbackPreparedMessageV3 is a rollback prepared message.

func (*RollbackPreparedMessageV3) Decode

func (m *RollbackPreparedMessageV3) Decode(src []byte) error

func (*RollbackPreparedMessageV3) Type

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL