pg

package
v0.9.2
Warning

This package is not in the latest version of its module.

Published: Nov 18, 2024 License: Apache-2.0 Imports: 32 Imported by: 0

Documentation

Overview

Package pg defines the primary PostgreSQL-powered DB and Pool types used to support Kwil DB.

See the DB type for more information.

Constants

const (
	RelationType       = byte(0x01)
	ChangesetEntryType = byte(0x02)
	BlockSpendsType    = byte(0x03)
)
const (
	MessageTypePrepare          pglogrepl.MessageType = 'P' // this is not the prepare you're looking for
	MessageTypeBeginPrepare     pglogrepl.MessageType = 'b'
	MessageTypeCommitPrepared   pglogrepl.MessageType = 'K'
	MessageTypeRollbackPrepared pglogrepl.MessageType = 'r'
	MessageTypeStreamPrepare    pglogrepl.MessageType = 'p'
)
const DefaultSchemaFilterPrefix = "ds_"
const (
	InternalSchemaName = "kwild_internal"
)

Variables

var ErrNaN = errors.New("NaN")
var ErrUnsupportedOID = errors.New("unsupported OID")

Functions

func DecodeStreamPrefix added in v0.9.0

func DecodeStreamPrefix(b [5]byte) (csType byte, sz uint32)

DecodeStreamPrefix decodes prefix bytes for a changeset element. This mirrors the encoding convention in StreamElement.

func IsUnchanged added in v0.9.0

func IsUnchanged(v any) bool

func QueryRowFunc added in v0.9.0

func QueryRowFunc(ctx context.Context, tx sql.Executor, stmt string,
	scans []any, fn func() error, args ...any) error

QueryRowFunc attempts to execute an SQL statement, handling the rows and returned values as described by the sql.QueryScanner interface. If the provided Executor is also a sql.QueryScanner, that interface's method is used; otherwise, QueryRowFunc attempts to use the underlying DB connection. The latter is supported for all concrete transaction types in this package as well as instances of the pgx.Tx interface.

func QueryRowFuncAny added in v0.9.0

func QueryRowFuncAny(ctx context.Context, tx sql.Executor, stmt string,
	fn func([]any) error, args ...any) error

QueryRowFuncAny is similar to QueryRowFunc, except that no scan values slice is provided. The provided function is called for each row of the result. The caller does not determine the types of the Go variables in the values slice. In this way it behaves similarly to Execute, but provides "for each row" functionality so that the full result set does not need to be loaded into memory. See also QueryRowFunc, which allows the caller to provide a scan values slice.

func RowCount added in v0.9.0

func RowCount(ctx context.Context, qualifiedTable string, db sql.Executor) (int64, error)

RowCount gets a precise row count for the named fully qualified table. If the Executor satisfies the RowCounter interface, that method will be used directly. Otherwise a simple select query is used.
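The "use the optional interface if present, otherwise fall back" dispatch described above (and used similarly by TableStats and ColumnInfo) can be sketched as follows. This is only an illustration: the executor and rowCounter interfaces, the QueryInt method, and both DB types are hypothetical stand-ins, not the package's actual definitions.

```go
package main

import (
	"context"
	"fmt"
)

// executor is a hypothetical, reduced stand-in for sql.Executor.
type executor interface {
	QueryInt(ctx context.Context, stmt string) (int64, error)
}

// rowCounter is a hypothetical stand-in for the optional RowCounter interface.
type rowCounter interface {
	RowCount(ctx context.Context, qualifiedTable string) (int64, error)
}

// rowCount prefers the executor's own RowCount method when it has one and
// otherwise falls back to a plain count query.
func rowCount(ctx context.Context, qualifiedTable string, db executor) (int64, error) {
	if rc, ok := db.(rowCounter); ok {
		return rc.RowCount(ctx, qualifiedTable)
	}
	return db.QueryInt(ctx, "SELECT count(*) FROM "+qualifiedTable)
}

// plainDB implements only executor, so the fallback query path is taken.
type plainDB struct{ lastStmt string }

func (p *plainDB) QueryInt(_ context.Context, stmt string) (int64, error) {
	p.lastStmt = stmt
	return 3, nil // canned result standing in for a real query
}

// stubDB also implements rowCounter, as a test double might.
type stubDB struct{ plainDB }

func (s *stubDB) RowCount(context.Context, string) (int64, error) { return 42, nil }

// demoRowCount runs both paths and returns their results.
func demoRowCount() (fallback, direct int64) {
	ctx := context.Background()
	fallback, _ = rowCount(ctx, "ds_example.users", &plainDB{})
	direct, _ = rowCount(ctx, "ds_example.users", &stubDB{})
	return fallback, direct
}

func main() {
	fallback, direct := demoRowCount()
	fmt.Println(fallback, direct)
}
```

The type assertion keeps the exported function signature unchanged while letting a stub DB short-circuit the query in tests.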

func StreamElement added in v0.9.0

func StreamElement(w io.Writer, s ChangeStreamer) error

StreamElement writes the serialized changeset element to the writer, preceded by the type's prefix and serialized size. This supports streamed encoding. When decoding, use DecodeStreamPrefix to interpret the 5-byte prefixes before each encoded element.
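A minimal sketch of this framing, using local lowercase copies of the functions rather than the package itself. The 5-byte layout (one type byte followed by a uint32 size) follows from the signatures above, but the little-endian byte order is an assumption made for this sketch; the note type and readElements helper are hypothetical.

```go
package main

import (
	"bytes"
	"encoding"
	"encoding/binary"
	"fmt"
	"io"
)

// changeStreamer mirrors the ChangeStreamer interface documented in this package.
type changeStreamer interface {
	encoding.BinaryMarshaler
	Prefix() byte
}

// streamElement writes a 5-byte prefix (type byte, then uint32 size) followed
// by the payload. ASSUMPTION: the size is little-endian.
func streamElement(w io.Writer, s changeStreamer) error {
	data, err := s.MarshalBinary()
	if err != nil {
		return err
	}
	var prefix [5]byte
	prefix[0] = s.Prefix()
	binary.LittleEndian.PutUint32(prefix[1:], uint32(len(data)))
	if _, err := w.Write(prefix[:]); err != nil {
		return err
	}
	_, err = w.Write(data)
	return err
}

// decodeStreamPrefix is the inverse of the prefix encoding above.
func decodeStreamPrefix(b [5]byte) (csType byte, sz uint32) {
	return b[0], binary.LittleEndian.Uint32(b[1:])
}

// note is a hypothetical element type; 0x02 matches ChangesetEntryType.
type note string

func (n note) MarshalBinary() ([]byte, error) { return []byte(n), nil }
func (n note) Prefix() byte                   { return 0x02 }

// readElements decodes prefixed elements until the stream is exhausted.
func readElements(r io.Reader) ([]string, error) {
	var out []string
	for {
		var prefix [5]byte
		if _, err := io.ReadFull(r, prefix[:]); err != nil {
			if err == io.EOF {
				return out, nil // clean end of stream
			}
			return nil, err
		}
		typ, sz := decodeStreamPrefix(prefix)
		payload := make([]byte, sz)
		if _, err := io.ReadFull(r, payload); err != nil {
			return nil, err
		}
		out = append(out, fmt.Sprintf("%d:%s", typ, payload))
	}
}

// roundTrip streams the messages into a buffer and decodes them again.
func roundTrip(msgs ...string) ([]string, error) {
	var buf bytes.Buffer
	for _, m := range msgs {
		if err := streamElement(&buf, note(m)); err != nil {
			return nil, err
		}
	}
	return readElements(&buf)
}

func main() {
	fmt.Println(roundTrip("a", "bcd"))
}
```

Because each element carries its own size, a reader can process a changeset one element at a time without buffering the whole stream.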

func TableStats added in v0.9.0

func TableStats(ctx context.Context, schema, table string, db sql.Executor) (*sql.Statistics, error)

TableStats collects deterministic statistics for a table. If schema is empty, the "public" schema is assumed. This method is used to obtain the ground truth statistics for a table; incremental statistics updates should be preferred when possible. If the sql.Executor implementation is a TableStatser, its method is used directly. This is primarily to allow a stub DB for testing.

func TextValue added in v0.9.0

func TextValue(val any) (txt string, null bool, ok bool)

TextValue recognizes types used to return SQL TEXT values from SQL queries. Depending on the type and value, the value may represent a NULL as indicated by the null return.

func UseLogger

func UseLogger(log klog.Logger)

Types

type BeginPrepareMessageV3

type BeginPrepareMessageV3 struct {
	// Flags currently unused (must be 0).
	// Flags uint8
	// PrepareLSN is the LSN of the prepare.
	PrepareLSN pglogrepl.LSN
	// EndPrepareLSN is the end LSN of the prepared transaction.
	EndPrepareLSN pglogrepl.LSN
	// PrepareTime is the prepare timestamp of the transaction
	PrepareTime time.Time
	// Xid of the transaction
	Xid uint32
	// UserGID is the user-defined GID of the prepared transaction.
	UserGID string
}

BeginPrepareMessageV3 is the beginning of a prepared transaction message.

func (*BeginPrepareMessageV3) Decode

func (m *BeginPrepareMessageV3) Decode(src []byte) error

func (*BeginPrepareMessageV3) Type

type ChangeStreamer added in v0.9.0

type ChangeStreamer interface {
	encoding.BinaryMarshaler
	Prefix() byte
}

ChangeStreamer is a type that supports streaming with StreamElement. This and the associated helper functions could alternatively be in migrator.

type ChangesetEntry added in v0.9.0

type ChangesetEntry struct {
	// RelationIdx is the index in the full relation list for the changeset that
	// precedes the tuple change entries.
	RelationIdx uint32

	OldTuple []*TupleColumn // empty for insert
	NewTuple []*TupleColumn // empty for delete

}

func (*ChangesetEntry) ApplyChangesetEntry added in v0.9.0

func (ce *ChangesetEntry) ApplyChangesetEntry(ctx context.Context, tx sql.DB, relation *Relation) error

func (*ChangesetEntry) DecodeTuples added in v0.9.0

func (c *ChangesetEntry) DecodeTuples(relation *Relation) (oldValues, newValues []any, err error)

DecodeTuples decodes serialized tuple column values into their native types. Any value may be nil, depending on the ValueType.

func (*ChangesetEntry) Kind added in v0.9.0

func (ce *ChangesetEntry) Kind() string
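The field comments on ChangesetEntry (OldTuple is empty for an insert, NewTuple is empty for a delete) suggest how an entry can be classified. The sketch below uses local stand-in types, and the returned strings are hypothetical; the actual values returned by Kind are not documented here.

```go
package main

import "fmt"

// tupleColumn and changesetEntry are reduced local stand-ins for the
// package's TupleColumn and ChangesetEntry types.
type tupleColumn struct{ Data []byte }

type changesetEntry struct {
	OldTuple []*tupleColumn // empty for insert
	NewTuple []*tupleColumn // empty for delete
}

// kind classifies an entry by which tuples are present.
// ASSUMPTION: the label strings are illustrative only.
func kind(ce *changesetEntry) string {
	switch {
	case len(ce.OldTuple) == 0 && len(ce.NewTuple) == 0:
		return "empty"
	case len(ce.OldTuple) == 0:
		return "insert"
	case len(ce.NewTuple) == 0:
		return "delete"
	default:
		return "update"
	}
}

func main() {
	row := []*tupleColumn{{Data: []byte("v")}}
	fmt.Println(kind(&changesetEntry{NewTuple: row}))
	fmt.Println(kind(&changesetEntry{OldTuple: row}))
	fmt.Println(kind(&changesetEntry{OldTuple: row, NewTuple: row}))
}
```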

func (*ChangesetEntry) MarshalBinary added in v0.9.0

func (ce *ChangesetEntry) MarshalBinary() ([]byte, error)

func (*ChangesetEntry) Prefix added in v0.9.0

func (ce *ChangesetEntry) Prefix() byte

func (*ChangesetEntry) String added in v0.9.0

func (ce *ChangesetEntry) String() string

func (*ChangesetEntry) UnmarshalBinary added in v0.9.0

func (ce *ChangesetEntry) UnmarshalBinary(data []byte) error

type ColInfo added in v0.9.0

type ColInfo struct {
	Pos  int
	Name string
	// DataType is the string reported by information_schema.columns for the
	// column. Use the Type() method to return the ColType.
	DataType string
	Array    bool
	Nullable bool
	// contains filtered or unexported fields
}

ColInfo is used when ingesting column descriptions from PostgreSQL, such as from information_schema.columns. Use the Type method to return a canonical ColType.

func ColumnInfo added in v0.9.0

func ColumnInfo(ctx context.Context, tx sql.Executor, schema, tbl string) ([]ColInfo, error)

ColumnInfo attempts to describe the columns of a table in a specified PostgreSQL schema. The results are **as reported by information_schema.columns**.

If the provided sql.Executor is also a ColumnInfoer, its ColumnInfo method will be used. This is primarily for testing with a mocked DB transaction. Otherwise, the Executor must be one of the transaction types created by this package, which provide access to the underlying DB connection.

func (*ColInfo) IsBool added in v0.9.0

func (ci *ColInfo) IsBool() bool

func (*ColInfo) IsByteA added in v0.9.0

func (ci *ColInfo) IsByteA() bool

func (*ColInfo) IsFloat added in v0.9.0

func (ci *ColInfo) IsFloat() bool

func (*ColInfo) IsInt added in v0.9.0

func (ci *ColInfo) IsInt() bool

func (*ColInfo) IsNumeric added in v0.9.0

func (ci *ColInfo) IsNumeric() bool

func (*ColInfo) IsText added in v0.9.0

func (ci *ColInfo) IsText() bool

func (*ColInfo) IsTime added in v0.9.0

func (ci *ColInfo) IsTime() bool

func (*ColInfo) IsUINT256 added in v0.9.0

func (ci *ColInfo) IsUINT256() bool

func (*ColInfo) IsUUID added in v0.9.0

func (ci *ColInfo) IsUUID() bool

func (*ColInfo) Type added in v0.9.0

func (ci *ColInfo) Type() ColType

Type returns the canonical ColType based on the DataType, which is the type string reported by PostgreSQL from information_schema.columns.

type ColType added in v0.9.0

type ColType string

ColType is the type used to enumerate various known column types (and arrays of those types). These are used to describe tables characterized by the ColumnInfo function, and to support its ScanVal method.

const (
	ColTypeInt     ColType = "int"
	ColTypeText    ColType = "text"
	ColTypeBool    ColType = "bool"
	ColTypeByteA   ColType = "bytea"
	ColTypeUUID    ColType = "uuid"
	ColTypeNumeric ColType = "numeric"
	ColTypeUINT256 ColType = "uint256"
	ColTypeFloat   ColType = "float"
	ColTypeTime    ColType = "timestamp"

	ColTypeIntArray     ColType = "int[]"
	ColTypeTextArray    ColType = "text[]"
	ColTypeBoolArray    ColType = "bool[]"
	ColTypeByteAArray   ColType = "bytea[]"
	ColTypeUUIDArray    ColType = "uuid[]"
	ColTypeNumericArray ColType = "numeric[]"
	ColTypeUINT256Array ColType = "uint256[]"
	ColTypeFloatArray   ColType = "float[]"
	ColTypeTimeArray    ColType = "timestamp[]"

	ColTypeUnknown ColType = "unknown"
)

type Column added in v0.9.0

type Column struct {
	Name string
	Type *types.DataType
}

Column is a column name and type.

type CommitPreparedMessageV3

type CommitPreparedMessageV3 struct {
	// Flags currently unused (must be 0).
	Flags uint8
	// CommitLSN is the LSN of the commit of the prepared transaction.
	CommitLSN pglogrepl.LSN
	// EndCommitLSN is the end LSN of the commit of the prepared transaction.
	EndCommitLSN pglogrepl.LSN
	// CommitTime is the commit timestamp of the transaction
	CommitTime time.Time
	// Xid of the transaction
	Xid uint32
	// UserGID is the user-defined GID of the prepared transaction.
	UserGID string
}

CommitPreparedMessageV3 is a commit prepared message.

func (*CommitPreparedMessageV3) Decode

func (m *CommitPreparedMessageV3) Decode(src []byte) error

func (*CommitPreparedMessageV3) Type

type ConnConfig

type ConnConfig struct {
	// Host, Port, User, Pass, and DBName are used verbatim to create a
	// connection string in DSN format.
	// https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
	Host, Port string
	User, Pass string
	DBName     string
}

ConnConfig groups the basic connection settings used to construct the DSN "connection string" used to open a new connection to a postgres host. TODO: use this in the various constructors for DB, Pool, etc.

type DB

type DB struct {
	// contains filtered or unexported fields
}

DB is a session-aware wrapper that creates and stores a write Tx on request, and provides top level Exec/Set methods that error if no Tx exists. This design prevents any out-of-session write statements from executing, and makes uncommitted reads explicit (and impossible in the absence of an active transaction).

This type is tailored to use in kwild in the following ways:

  1. Controlled transactional interaction that requires beginning a transaction before using the Execute method, unless put in "autocommit" mode using the AutoCommit method. Use of the write connection when not executing a block's transactions is prevented.

  2. Using an underlying connection pool, with multiple readers and a single write connection to ensure all uses of Execute operate on the active transaction.

  3. Emulating SQLite changesets by collecting WAL data for updates from a dedicated logical replication connection and slot. The Precommit method is used to retrieve the commit ID prior to Commit.

DB requires a superuser connection to a Postgres database that can perform administrative actions on the database.
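The session rule in point 1 can be illustrated with a toy in-memory model. This is a sketch of the idea only: sessionDB, its fields, its Begin and Commit methods, and the error value are hypothetical and do not reflect how DB is implemented. The sketch is also not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoTx is a hypothetical error for writes outside of a session.
var errNoTx = errors.New("no transaction exists")

// sessionDB is a toy model of a session-aware writer.
type sessionDB struct {
	inTx       bool
	autoCommit bool
	pending    []string // statements in the active transaction
	committed  []string // statements that have been committed
}

// AutoCommit toggles the mode in which Execute works without a transaction.
func (db *sessionDB) AutoCommit(auto bool) { db.autoCommit = auto }

// Begin starts the single write transaction.
func (db *sessionDB) Begin() error {
	if db.inTx {
		return errors.New("transaction already exists")
	}
	db.inTx = true
	return nil
}

// Execute runs in the active transaction, or commits immediately in
// auto-commit mode, or fails if neither applies.
func (db *sessionDB) Execute(stmt string) error {
	switch {
	case db.inTx:
		db.pending = append(db.pending, stmt)
	case db.autoCommit:
		db.committed = append(db.committed, stmt)
	default:
		return errNoTx
	}
	return nil
}

// Commit makes the pending statements durable and ends the session.
func (db *sessionDB) Commit() error {
	if !db.inTx {
		return errNoTx
	}
	db.committed = append(db.committed, db.pending...)
	db.pending, db.inTx = nil, false
	return nil
}

// demoSession exercises the rejected, auto-commit, and transactional paths.
func demoSession() (rejected bool, committed []string) {
	db := &sessionDB{}
	rejected = errors.Is(db.Execute("INSERT 1"), errNoTx)
	db.AutoCommit(true)
	_ = db.Execute("CREATE TABLE t")
	db.AutoCommit(false)
	_ = db.Begin()
	_ = db.Execute("INSERT 2")
	_ = db.Commit()
	return rejected, db.committed
}

func main() {
	fmt.Println(demoSession())
}
```

The out-of-session write is rejected, while the auto-commit statement and the statement executed inside the transaction both end up committed.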

func NewDB

func NewDB(ctx context.Context, cfg *DBConfig) (*DB, error)

NewDB creates a new Kwil DB instance. On creation, it will connect to the configured postgres process, creating as many connections as specified by the PoolConfig plus a special connection for a logical replication slot receiver. The database user (postgresql "role") must be a superuser for several reasons: creating triggers, collations, and the replication publication.

WARNING: There must only be ONE instance of a DB for a given postgres database. Transactions that use the Precommit method update an internal table used to sequence transactions.

func (*DB) AutoCommit

func (db *DB) AutoCommit(auto bool)

AutoCommit toggles auto-commit mode, in which the Execute method may be used without having to begin/commit. This is to support startup and initialization tasks that occur prior to the start of the atomic commit process used while executing blocks.

func (*DB) BeginDelayedReadTx added in v0.9.0

func (db *DB) BeginDelayedReadTx() sql.OuterReadTx

BeginDelayedReadTx returns a valid SQL transaction, but will only start the transaction once the first query is executed. This is useful for when a calling module is expected to control the lifetime of a read transaction, but the implementation might not need to use the transaction.
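The delayed-start behavior can be sketched with a wrapper that begins the real transaction on the first query. The readTx interface, fakeTx, and delayedReadTx below are hypothetical simplifications and not the package's types.

```go
package main

import "fmt"

// readTx is a hypothetical, reduced read transaction interface.
type readTx interface {
	Query(stmt string) (string, error)
	Rollback() error
}

// fakeTx stands in for a real database transaction.
type fakeTx struct{}

func (fakeTx) Query(stmt string) (string, error) { return "result of " + stmt, nil }
func (fakeTx) Rollback() error                   { return nil }

// delayedReadTx defers starting the underlying transaction until first use.
type delayedReadTx struct {
	begin func() (readTx, error) // starts the real transaction
	tx    readTx                 // nil until the first query
}

// Query starts the transaction if needed, then forwards the statement.
func (d *delayedReadTx) Query(stmt string) (string, error) {
	if d.tx == nil {
		tx, err := d.begin()
		if err != nil {
			return "", err
		}
		d.tx = tx
	}
	return d.tx.Query(stmt)
}

// Rollback is a no-op if the transaction was never started.
func (d *delayedReadTx) Rollback() error {
	if d.tx == nil {
		return nil
	}
	return d.tx.Rollback()
}

// demoDelayed counts how often the real transaction is started.
func demoDelayed() (beforeQuery, afterQueries int) {
	begins := 0
	d := &delayedReadTx{begin: func() (readTx, error) {
		begins++
		return fakeTx{}, nil
	}}
	beforeQuery = begins
	_, _ = d.Query("SELECT 1")
	_, _ = d.Query("SELECT 2")
	afterQueries = begins
	_ = d.Rollback()
	return beforeQuery, afterQueries
}

func main() {
	fmt.Println(demoDelayed())
}
```

A caller that never queries never pays for a connection or a transaction, yet can still call Rollback unconditionally.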

func (*DB) BeginPreparedTx added in v0.9.0

func (db *DB) BeginPreparedTx(ctx context.Context) (sql.PreparedTx, error)

BeginPreparedTx makes the DB's singular transaction, which is used automatically by consumers of the Query and Execute methods. This is the mode of operation used by Kwil to have one system coordinating transaction lifetime, with one or more other systems implicitly using the transaction for their queries.

This method creates a sequenced transaction, and it should be committed with a prepared transaction (two-phase commit) using Precommit. Use BeginTx for a regular outer transaction without sequencing or a prepared transaction.

The returned transaction is also capable of creating nested transactions. This functionality is used to prevent user dataset query errors from rolling back the outermost transaction.

func (*DB) BeginReadTx

func (db *DB) BeginReadTx(ctx context.Context) (sql.OuterReadTx, error)

BeginReadTx creates a read-only transaction for the database. It obtains a read connection from the pool, which will be returned to the pool when the transaction is closed.

func (*DB) BeginReservedReadTx added in v0.8.1

func (db *DB) BeginReservedReadTx(ctx context.Context) (sql.Tx, error)

BeginReservedReadTx starts a read-only transaction using a reserved reader connection. This is to allow read-only consensus operations that operate outside of the write transaction's lifetime, such as proposal preparation and approval, to function without contention on the reader pool that services user requests.

func (*DB) BeginSnapshotTx added in v0.8.1

func (db *DB) BeginSnapshotTx(ctx context.Context) (sql.Tx, string, error)

BeginSnapshotTx creates a read-only transaction with serializable isolation level. This is used for taking a snapshot of the database.

func (*DB) BeginTx

func (db *DB) BeginTx(ctx context.Context) (sql.Tx, error)

BeginTx starts a regular read-write transaction. For a sequenced two-phase transaction, use BeginPreparedTx.

func (*DB) Close

func (db *DB) Close() error

Close shuts down the Kwil DB. This stops all connections and the WAL data receiver.

func (*DB) Done added in v0.8.1

func (db *DB) Done() <-chan struct{}

Done allows higher level systems to monitor the state of the DB backend connection and shut down (or restart) the application if necessary. Without this, the application hits an error the next time it uses the DB, which may be confusing and inopportune.

func (*DB) EnsureFullReplicaIdentityDatasets added in v0.9.0

func (db *DB) EnsureFullReplicaIdentityDatasets(ctx context.Context) error

EnsureFullReplicaIdentityDatasets should be used after the first time opening a database that was restored from a snapshot, which may have been created with an older version of kwild that did not set this on all tables.

func (*DB) Err added in v0.8.1

func (db *DB) Err() error

Err provides any error that caused the DB to shut down. This will return context.Canceled after Close has been called.
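A higher level system might combine Done and Err roughly as follows. The backend type here is a hypothetical stand-in that only mimics the two method signatures; the watch function and the error text are likewise illustrative.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// backend is a hypothetical stand-in exposing Done and Err like DB does.
type backend struct {
	done chan struct{}
	once sync.Once
	mu   sync.Mutex
	err  error
}

func newBackend() *backend { return &backend{done: make(chan struct{})} }

// shutdown records the cause and closes the done channel exactly once.
func (b *backend) shutdown(err error) {
	b.once.Do(func() {
		b.mu.Lock()
		b.err = err
		b.mu.Unlock()
		close(b.done)
	})
}

func (b *backend) Done() <-chan struct{} { return b.done }

func (b *backend) Err() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.err
}

// watch blocks until the backend stops or the context ends, and reports why.
func watch(ctx context.Context, b *backend) error {
	select {
	case <-b.Done():
		return b.Err()
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demoWatch simulates a backend failure observed by a watcher goroutine.
func demoWatch() string {
	b := newBackend()
	result := make(chan error, 1)
	go func() { result <- watch(context.Background(), b) }()
	b.shutdown(errors.New("replication connection lost"))
	return (<-result).Error()
}

func main() {
	fmt.Println(demoWatch())
}
```

With the real DB, the watcher would typically compare the error against context.Canceled to tell an orderly Close apart from an unexpected failure.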

func (*DB) Execute

func (db *DB) Execute(ctx context.Context, stmt string, args ...any) (*sql.ResultSet, error)

Execute runs a statement on an existing transaction, or on a short lived transaction from the write connection if in auto-commit mode.

func (*DB) Pool

func (db *DB) Pool() *Pool

Pool is a trapdoor to get the connection pool. Probably not for normal Kwil DB operation, but test setup/teardown.

func (*DB) Query

func (db *DB) Query(ctx context.Context, stmt string, args ...any) (*sql.ResultSet, error)

Query performs a read-only query on a read connection.

type DBConfig

type DBConfig struct {
	PoolConfig

	// SchemaFilter is used to include WAL data for certain *postgres* schema
	// (not Kwil schema). If nil, the default is to include updates to tables in
	// any schema prefixed by "ds_".
	// DEPRECATED: This has become baked into Kwil's DB conventions in many places.
	SchemaFilter func(string) bool
}

DBConfig is the configuration for the Kwil DB backend, which includes the connection parameters and a schema filter used to selectively include WAL data for certain PostgreSQL schemas in commit ID calculation.

type NamedArgs

type NamedArgs = pgx.NamedArgs

NamedArgs is a query rewriter that can be used as one of the first arguments in the []any provided to a query function so that the named arguments are automatically used to rewrite the SQL statement from named (using @argname) syntax to positional ($1, $2, etc.). IMPORTANT: Note that the input statement requires named arguments to use "@" instead of "$". Modify the SQL string as necessary to work with this rewriter.
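To illustrate what rewriting from named to positional syntax means, here is a deliberately simplified rewriter. It is not pgx's implementation: rewriteNamed is a hypothetical helper, it does not understand string literals or comments (so an "@" inside a quoted string would also be rewritten), and rejecting unknown names is a choice made for this sketch.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// namedArg matches @name placeholders. It is naive about SQL quoting.
var namedArg = regexp.MustCompile(`@[A-Za-z_][A-Za-z0-9_]*`)

// rewriteNamed replaces each @name with a positional $n placeholder and
// returns the arguments in positional order. Repeated names share a position.
func rewriteNamed(stmt string, named map[string]any) (string, []any, error) {
	positions := map[string]int{}
	var args []any
	var missing string
	out := namedArg.ReplaceAllStringFunc(stmt, func(m string) string {
		name := m[1:] // strip the leading '@'
		pos, seen := positions[name]
		if !seen {
			val, ok := named[name]
			if !ok {
				if missing == "" {
					missing = name
				}
				return m
			}
			args = append(args, val)
			pos = len(args)
			positions[name] = pos
		}
		return "$" + strconv.Itoa(pos)
	})
	if missing != "" {
		return "", nil, fmt.Errorf("no value for named argument %q", missing)
	}
	return out, args, nil
}

func main() {
	stmt := "SELECT * FROM t WHERE a = @a AND b = @b OR a = @a"
	fmt.Println(rewriteNamed(stmt, map[string]any{"a": 1, "b": "x"}))
}
```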

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool is a simple read connection pool with one dedicated writer connection. This type is relatively low level, and Kwil will generally use the DB type to manage sessions instead of this type directly. It is exported primarily for testing and reuse in more general use cases.

Pool supports Kwil's single transactional DB writer model:

  • a single writer connection, on which a transaction is created by a top level system during block execution (i.e. the AbciApp), and from which reads of uncommitted DB records may be performed.
  • multiple readers, which may service other asynchronous operations such as a gRPC user service.

The write methods from the Tx returned from the BeginTx method should be preferred over directly using the Pool's write methods. The DB type is the session-aware wrapper that creates and stores the write Tx, and provides top level Exec/Set methods that error if no Tx exists. Only use Pool as a building block or for testing individual systems outside of the context of a session.

func NewPool

func NewPool(ctx context.Context, cfg *PoolConfig) (*Pool, error)

NewPool creates a connection pool to a PostgreSQL database.

func (*Pool) BeginReadTx

func (p *Pool) BeginReadTx(ctx context.Context) (sql.OuterReadTx, error)

BeginReadTx starts a read-only transaction.

func (*Pool) BeginTx

func (p *Pool) BeginTx(ctx context.Context) (sql.Tx, error)

BeginTx starts a read-write transaction. It is an error to call this twice without first closing the initial transaction.

func (*Pool) Close

func (p *Pool) Close() error

func (*Pool) Execute

func (p *Pool) Execute(ctx context.Context, stmt string, args ...any) (*sql.ResultSet, error)

Execute performs a read-write query on the writer connection.

func (*Pool) Query

func (p *Pool) Query(ctx context.Context, stmt string, args ...any) (*sql.ResultSet, error)

Query performs a read-only query using the read connection pool. It is executed in a transaction with read only access mode to ensure there can be no modifications.

type PoolConfig

type PoolConfig struct {
	ConnConfig

	// MaxConns is the maximum number of allowable connections in the read pool.
	// This does not include the reserved connections for the consensus thread,
	// one of which is a writer.
	MaxConns uint32
}

PoolConfig combines a connection config with additional options for a pool of read connections and a single write connection, as required for kwild.

type PrepareMessageV3

type PrepareMessageV3 struct {
	// Flags currently unused (must be 0).
	Flags uint8
	// PrepareLSN is the LSN of the prepare.
	PrepareLSN pglogrepl.LSN
	// EndPrepareLSN is the end LSN of the prepared transaction.
	EndPrepareLSN pglogrepl.LSN
	// PrepareTime is the prepare timestamp of the transaction
	PrepareTime time.Time
	// Xid of the transaction
	Xid uint32
	// UserGID is the user-defined GID of the prepared transaction.
	UserGID string
}

PrepareMessageV3 is a prepared transaction message.

func (*PrepareMessageV3) Decode

func (m *PrepareMessageV3) Decode(src []byte) error

func (*PrepareMessageV3) Type

type QueryMode

type QueryMode = pgx.QueryExecMode

QueryMode is a type recognized by the query methods when it appears as one of the first arguments in the []any; it causes the query to be executed in a certain mode. Presently this is used to switch from the prepare/describe approach of determining input argument types to a simpler mode that infers the argument types from the passed Go variable types. This is helpful for "in-line expressions" such as `SELECT $1;` that convey no information on their own about the type of the argument, which results in an assumed type that may not match the type of the provided Go variable (an error in many cases).

const (
	// QueryModeDefault uses a prepare-query request cycle to determine arg
	// types (OID) using postgres to describe the statement. This may not be
	// helpful for in-line expressions that reference no known table. There must
	// be an encode/decode plan available for the OID and the Go type.
	QueryModeDefault QueryMode = pgx.QueryExecModeCacheStatement
	// QueryModeDescribeExec also uses a prepared statement, but it is unnamed
	// and not cached, and thus avoids the need to retry a query if the table
	// definitions are modified concurrently. Requires two round trips.
	QueryModeDescribeExec QueryMode = pgx.QueryExecModeDescribeExec
	// QueryModeExec still uses the extended protocol, but does not ask
	// postgresql to describe the statement to determine parameters types.
	// Instead, the types are determined from the Go variables.
	QueryModeExec QueryMode = pgx.QueryExecModeExec
	// QueryModeSimple is like QueryModeExec, except that it uses the "simple"
	// postgresql wire protocol. Prefer QueryModeExec if argument type inference
	// based on the Go variables is required since this forces everything into text.
	QueryModeSimple QueryMode = pgx.QueryExecModeSimpleProtocol

	// QueryModeInferredArgTypes runs the query in a special execution mode that
	// is like QueryModeExec except that it infers the argument OIDs from the Go
	// argument values AND asserts those types in preparing the statement, which
	// is necessary for our in-line expressions. QueryModeExec does not use
	// Parse/Describe messages; this mode does while asserting the param types.
	// It is like a hybrid between QueryModeDescribeExec and QueryModeExec. It
	// is incompatible with other special arguments like NamedArgs.
	QueryModeInferredArgTypes QueryMode = 1 << 16
)

type Relation added in v0.9.0

type Relation struct {
	Schema  string
	Table   string
	Columns []*Column
}

Relation is a table in a schema.

func (*Relation) MarshalBinary added in v0.9.0

func (r *Relation) MarshalBinary() ([]byte, error)

func (*Relation) Prefix added in v0.9.0

func (r *Relation) Prefix() byte

func (*Relation) String added in v0.9.0

func (r *Relation) String() string

func (*Relation) UnmarshalBinary added in v0.9.0

func (r *Relation) UnmarshalBinary(data []byte) error

type RollbackPreparedMessageV3

type RollbackPreparedMessageV3 struct {
	// Flags currently unused (must be 0).
	Flags uint8
	// EndLSN is the end LSN of the prepared transaction.
	EndLSN pglogrepl.LSN
	// RollbackLSN is the end LSN of the rollback of the prepared transaction.
	RollbackLSN pglogrepl.LSN
	// PrepareTime is the prepare timestamp of the transaction
	PrepareTime time.Time
	// RollbackTime is the rollback timestamp of the transaction
	RollbackTime time.Time
	// Xid of the transaction
	Xid uint32
	// UserGID is the user-defined GID of the prepared transaction.
	UserGID string
}

RollbackPreparedMessageV3 is a rollback prepared message.

func (*RollbackPreparedMessageV3) Decode

func (m *RollbackPreparedMessageV3) Decode(src []byte) error

func (*RollbackPreparedMessageV3) Type

type TableStatser added in v0.9.0

type TableStatser interface {
	TableStats(ctx context.Context, schema, table string) (*sql.Statistics, error)
}

TableStatser is an interface that the implementation of a sql.Executor may implement.

type Tuple added in v0.9.0

type Tuple struct {
	// RelationIdx is the index of the relation in the changeset metadata struct.
	RelationIdx uint32
	// Columns is a list of columns and their values.
	Columns []*TupleColumn
}

Tuple is a tuple of values.

type TupleColumn added in v0.9.0

type TupleColumn struct {
	// ValueType gives information on the type of data in the column. If the
	// type is of type Null, UnchangedUpdate, or Toast, the Data field will be nil.
	ValueType ValueType
	// Data is the actual data in the column.
	Data []byte
}

TupleColumn is a column within a tuple.

type ValueType added in v0.9.0

type ValueType uint8

ValueType gives information on the type of data in a tuple column.

const (
	// NullValue indicates a NULL value
	// (as opposed to something like an empty string).
	NullValue ValueType = iota
	// ToastValue indicates a column is a TOAST pointer,
	// and that the actual value is stored elsewhere and
	// was unchanged.
	ToastValue
	// SerializedValue indicates a column is a non-nil value
	// and can be deserialized.
	SerializedValue
	// UnchangedUpdate indicates a column was unchanged. This is used in the new
	// tuples in an UPDATE changeset entry.
	UnchangedUpdate
)
