Documentation ¶
Overview ¶
Package pg defines the primary PostgreSQL-powered DB and Pool types used to support Kwil DB.
See the DB type for more information.
Index ¶
- Constants
- Variables
- func DecodeStreamPrefix(b [5]byte) (csType byte, sz uint32)
- func IsUnchanged(v any) bool
- func QueryRowFunc(ctx context.Context, tx sql.Executor, stmt string, scans []any, ...) error
- func QueryRowFuncAny(ctx context.Context, tx sql.Executor, stmt string, fn func([]any) error, ...) error
- func RowCount(ctx context.Context, qualifiedTable string, db sql.Executor) (int64, error)
- func StreamElement(w io.Writer, s ChangeStreamer) error
- func TableStats(ctx context.Context, schema, table string, db sql.Executor) (*sql.Statistics, error)
- func TextValue(val any) (txt string, null bool, ok bool)
- func UseLogger(log klog.Logger)
- type BeginPrepareMessageV3
- type ChangeStreamer
- type ChangesetEntry
- func (ce *ChangesetEntry) ApplyChangesetEntry(ctx context.Context, tx sql.DB, relation *Relation) error
- func (c *ChangesetEntry) DecodeTuples(relation *Relation) (oldValues, newValues []any, err error)
- func (ce *ChangesetEntry) Kind() string
- func (ce *ChangesetEntry) MarshalBinary() ([]byte, error)
- func (ce *ChangesetEntry) Prefix() byte
- func (ce *ChangesetEntry) String() string
- func (ce *ChangesetEntry) UnmarshalBinary(data []byte) error
- type ColInfo
- func (ci *ColInfo) IsBool() bool
- func (ci *ColInfo) IsByteA() bool
- func (ci *ColInfo) IsFloat() bool
- func (ci *ColInfo) IsInt() bool
- func (ci *ColInfo) IsNumeric() bool
- func (ci *ColInfo) IsText() bool
- func (ci *ColInfo) IsTime() bool
- func (ci *ColInfo) IsUINT256() bool
- func (ci *ColInfo) IsUUID() bool
- func (ci *ColInfo) Type() ColType
- type ColType
- type Column
- type CommitPreparedMessageV3
- type ConnConfig
- type DB
- func (db *DB) AutoCommit(auto bool)
- func (db *DB) BeginDelayedReadTx() sql.OuterReadTx
- func (db *DB) BeginPreparedTx(ctx context.Context) (sql.PreparedTx, error)
- func (db *DB) BeginReadTx(ctx context.Context) (sql.OuterReadTx, error)
- func (db *DB) BeginReservedReadTx(ctx context.Context) (sql.Tx, error)
- func (db *DB) BeginSnapshotTx(ctx context.Context) (sql.Tx, string, error)
- func (db *DB) BeginTx(ctx context.Context) (sql.Tx, error)
- func (db *DB) Close() error
- func (db *DB) Done() <-chan struct{}
- func (db *DB) EnsureFullReplicaIdentityDatasets(ctx context.Context) error
- func (db *DB) Err() error
- func (db *DB) Execute(ctx context.Context, stmt string, args ...any) (*sql.ResultSet, error)
- func (db *DB) Pool() *Pool
- func (db *DB) Query(ctx context.Context, stmt string, args ...any) (*sql.ResultSet, error)
- type DBConfig
- type NamedArgs
- type Pool
- func (p *Pool) BeginReadTx(ctx context.Context) (sql.OuterReadTx, error)
- func (p *Pool) BeginTx(ctx context.Context) (sql.Tx, error)
- func (p *Pool) Close() error
- func (p *Pool) Execute(ctx context.Context, stmt string, args ...any) (*sql.ResultSet, error)
- func (p *Pool) Query(ctx context.Context, stmt string, args ...any) (*sql.ResultSet, error)
- type PoolConfig
- type PrepareMessageV3
- type QueryMode
- type Relation
- type RollbackPreparedMessageV3
- type TableStatser
- type Tuple
- type TupleColumn
- type ValueType
Constants ¶
const (
	RelationType       = byte(0x01)
	ChangesetEntryType = byte(0x02)
	BlockSpendsType    = byte(0x03)
)
const (
	MessageTypePrepare          pglogrepl.MessageType = 'P' // this is not the prepare you're looking for
	MessageTypeBeginPrepare     pglogrepl.MessageType = 'b'
	MessageTypeCommitPrepared   pglogrepl.MessageType = 'K'
	MessageTypeRollbackPrepared pglogrepl.MessageType = 'r'
	MessageTypeStreamPrepare    pglogrepl.MessageType = 'p'
)
const DefaultSchemaFilterPrefix = "ds_"
const (
InternalSchemaName = "kwild_internal"
)
Variables ¶
var ErrNaN = errors.New("NaN")
var ErrUnsupportedOID = errors.New("unsupported OID")
Functions ¶
func DecodeStreamPrefix ¶ added in v0.9.0
DecodeStreamPrefix decodes prefix bytes for a changeset element. This mirrors the encoding convention in StreamElement.
func IsUnchanged ¶ added in v0.9.0
func QueryRowFunc ¶ added in v0.9.0
func QueryRowFunc(ctx context.Context, tx sql.Executor, stmt string, scans []any, fn func() error, args ...any) error
QueryRowFunc will attempt to execute an SQL statement, handling the rows and returned values as described by the sql.QueryScanner interface. If the provided Executor is also a sql.QueryScanner, that method will be used, otherwise, it will attempt to use the underlying DB connection. The latter is supported for all concrete transaction types in this package as well as instances of the pgx.Tx interface.
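The calling convention (a slice of scan targets plus a callback invoked once per row) can be illustrated without a database. The following is a minimal sketch that assumes the scan targets are pointers filled in before each callback; forEachRow and collectNames are hypothetical stand-ins operating on in-memory rows, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// forEachRow is a hypothetical stand-in for a row-scanning query helper. For
// each in-memory row it fills the caller's scan targets, then calls fn.
func forEachRow(rows [][]any, scans []any, fn func() error) error {
	for _, row := range rows {
		if len(row) != len(scans) {
			return errors.New("column count does not match scan targets")
		}
		for i, v := range row {
			switch dst := scans[i].(type) {
			case *int64:
				val, ok := v.(int64)
				if !ok {
					return fmt.Errorf("column %d: cannot scan %T into *int64", i, v)
				}
				*dst = val
			case *string:
				val, ok := v.(string)
				if !ok {
					return fmt.Errorf("column %d: cannot scan %T into *string", i, v)
				}
				*dst = val
			default:
				return fmt.Errorf("column %d: unsupported scan target %T", i, dst)
			}
		}
		if err := fn(); err != nil {
			return err // stop at the first callback error
		}
	}
	return nil
}

// collectNames shows the caller side: the same two variables are reused for
// every row, and the callback reads them after each scan.
func collectNames(rows [][]any) ([]string, error) {
	var id int64
	var name string
	var out []string
	err := forEachRow(rows, []any{&id, &name}, func() error {
		out = append(out, fmt.Sprintf("%d:%s", id, name))
		return nil
	})
	return out, err
}

func main() {
	rows := [][]any{{int64(1), "alice"}, {int64(2), "bob"}}
	out, err := collectNames(rows)
	fmt.Println(out, err)
}
```

Because the scan variables are reused, the callback should copy any value it needs to keep beyond the current row.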
func QueryRowFuncAny ¶ added in v0.9.0
func QueryRowFuncAny(ctx context.Context, tx sql.Executor, stmt string, fn func([]any) error, args ...any) error
QueryRowFuncAny is similar to QueryRowFunc, except that no scan values slice is provided. The provided function is called for each row of the result. The caller does not determine the types of the Go variables in the values slice. In this way it behaves similarly to Execute, but provides "for each row" functionality so that every row does not need to be loaded into memory. See also QueryRowFunc, which allows the caller to provide a scan values slice.
func RowCount ¶ added in v0.9.0
RowCount gets a precise row count for the named fully qualified table. If the Executor satisfies the RowCounter interface, that method will be used directly. Otherwise a simple select query is used.
func StreamElement ¶ added in v0.9.0
func StreamElement(w io.Writer, s ChangeStreamer) error
StreamElement writes the serialized changeset element to the writer, preceded by the type's prefix and serialized size. This supports streamed encoding. When decoding, use DecodeStreamPrefix to interpret the 5-byte prefixes before each encoded element.
func TableStats ¶ added in v0.9.0
func TableStats(ctx context.Context, schema, table string, db sql.Executor) (*sql.Statistics, error)
TableStats collects deterministic statistics for a table. If schema is empty, the "public" schema is assumed. This method is used to obtain the ground truth statistics for a table; incremental statistics updates should be preferred when possible. If the sql.Executor implementation is a TableStatser, its method is used directly. This is primarily to allow a stub DB for testing.
Types ¶
type BeginPrepareMessageV3 ¶
type BeginPrepareMessageV3 struct {
	// Flags currently unused (must be 0).
	// Flags uint8
	// PrepareLSN is the LSN of the prepare.
	PrepareLSN pglogrepl.LSN
	// EndPrepareLSN is the end LSN of the prepared transaction.
	EndPrepareLSN pglogrepl.LSN
	// PrepareTime is the prepare timestamp of the transaction
	PrepareTime time.Time
	// Xid of the transaction
	Xid uint32
	// UserGID is the user-defined GID of the prepared transaction.
	UserGID string
}
BeginPrepareMessageV3 is the beginning of a prepared transaction message.
func (*BeginPrepareMessageV3) Decode ¶
func (m *BeginPrepareMessageV3) Decode(src []byte) error
func (*BeginPrepareMessageV3) Type ¶
func (m *BeginPrepareMessageV3) Type() pglogrepl.MessageType
type ChangeStreamer ¶ added in v0.9.0
type ChangeStreamer interface {
	encoding.BinaryMarshaler
	Prefix() byte
}
ChangeStreamer is a type that supports streaming with StreamElement. This and the associated helper functions could alternatively be in migrator.
type ChangesetEntry ¶ added in v0.9.0
type ChangesetEntry struct {
	// RelationIdx is the index in the full relation list for the changeset that
	// precedes the tuple change entries.
	RelationIdx uint32
	OldTuple []*TupleColumn // empty for insert
	NewTuple []*TupleColumn // empty for delete
}
func (*ChangesetEntry) ApplyChangesetEntry ¶ added in v0.9.0
func (*ChangesetEntry) DecodeTuples ¶ added in v0.9.0
func (c *ChangesetEntry) DecodeTuples(relation *Relation) (oldValues, newValues []any, err error)
DecodeTuples decodes serialized tuple column values into their native types. Any value may be nil, depending on the ValueType.
func (*ChangesetEntry) Kind ¶ added in v0.9.0
func (ce *ChangesetEntry) Kind() string
func (*ChangesetEntry) MarshalBinary ¶ added in v0.9.0
func (ce *ChangesetEntry) MarshalBinary() ([]byte, error)
func (*ChangesetEntry) Prefix ¶ added in v0.9.0
func (ce *ChangesetEntry) Prefix() byte
func (*ChangesetEntry) String ¶ added in v0.9.0
func (ce *ChangesetEntry) String() string
func (*ChangesetEntry) UnmarshalBinary ¶ added in v0.9.0
func (ce *ChangesetEntry) UnmarshalBinary(data []byte) error
type ColInfo ¶ added in v0.9.0
type ColInfo struct {
	Pos  int
	Name string
	// DataType is the string reported by information_schema.columns for the
	// column. Use the Type() method to return the ColType.
	DataType string
	Array    bool
	Nullable bool
	// contains filtered or unexported fields
}
ColInfo is used when ingesting column descriptions from PostgreSQL, such as from information_schema.columns. Use the Type method to return a canonical ColType.
func ColumnInfo ¶ added in v0.9.0
ColumnInfo attempts to describe the columns of a table in a specified PostgreSQL schema. The results are **as reported by information_schema.columns**.
If the provided sql.Executor is also a ColumnInfoer, its ColumnInfo method will be used. This is primarily for testing with a mocked DB transaction. Otherwise, the Executor must be one of the transaction types created by this package, which provide access to the underlying DB connection.
type ColType ¶ added in v0.9.0
type ColType string
ColType is the type used to enumerate various known column types (and arrays of those types). These are used to describe tables characterized by the ColumnInfo function, and to support its ScanVal method.
const (
	ColTypeInt          ColType = "int"
	ColTypeText         ColType = "text"
	ColTypeBool         ColType = "bool"
	ColTypeByteA        ColType = "bytea"
	ColTypeUUID         ColType = "uuid"
	ColTypeNumeric      ColType = "numeric"
	ColTypeUINT256      ColType = "uint256"
	ColTypeFloat        ColType = "float"
	ColTypeTime         ColType = "timestamp"
	ColTypeIntArray     ColType = "int[]"
	ColTypeTextArray    ColType = "text[]"
	ColTypeBoolArray    ColType = "bool[]"
	ColTypeByteAArray   ColType = "bytea[]"
	ColTypeUUIDArray    ColType = "uuid[]"
	ColTypeNumericArray ColType = "numeric[]"
	ColTypeUINT256Array ColType = "uint256[]"
	ColTypeFloatArray   ColType = "float[]"
	ColTypeTimeArray    ColType = "timestamp[]"
	ColTypeUnknown      ColType = "unknown"
)
type CommitPreparedMessageV3 ¶
type CommitPreparedMessageV3 struct {
	// Flags currently unused (must be 0).
	Flags uint8
	// CommitLSN is the LSN of the commit of the prepared transaction.
	CommitLSN pglogrepl.LSN
	// EndCommitLSN is the end LSN of the commit of the prepared transaction.
	EndCommitLSN pglogrepl.LSN
	// CommitTime is the commit timestamp of the transaction
	CommitTime time.Time
	// Xid of the transaction
	Xid uint32
	// UserGID is the user-defined GID of the prepared transaction.
	UserGID string
}
CommitPreparedMessageV3 is a commit prepared message.
func (*CommitPreparedMessageV3) Decode ¶
func (m *CommitPreparedMessageV3) Decode(src []byte) error
func (*CommitPreparedMessageV3) Type ¶
func (m *CommitPreparedMessageV3) Type() pglogrepl.MessageType
type ConnConfig ¶
type ConnConfig struct {
	// Host, Port, User, Pass, and DBName are used verbatim to create a
	// connection string in DSN format.
	// https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
	Host, Port string
	User, Pass string
	DBName     string
}
ConnConfig groups the basic connection settings used to construct the DSN "connection string" used to open a new connection to a postgres host. TODO: use this in the various constructors for DB, Pool, etc.
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
DB is a session-aware wrapper that creates and stores a write Tx on request, and provides top level Exec/Set methods that error if no Tx exists. This design prevents any out-of-session write statements from executing, and makes uncommitted reads explicit (and impossible in the absence of an active transaction).
This type is tailored to use in kwild in the following ways:
- Controlled transactional interaction that requires beginning a transaction before using the Exec method, unless put in "autocommit" mode using the AutoCommit method. Use of the write connection when not executing a block's transactions is prevented.
- Using an underlying connection pool, with multiple readers and a single write connection to ensure all uses of Execute operate on the active transaction.
- Emulating SQLite changesets by collecting WAL data for updates from a dedicated logical replication connection and slot. The Precommit method is used to retrieve the commit ID prior to Commit.
DB requires a superuser connection to a Postgres database that can perform administrative actions on the database.
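The session-aware behavior described for this type (writes fail unless a transaction is active or auto-commit mode is on) can be sketched as a small in-memory toy. sessionDB and its methods are hypothetical and only model the rule. They do not reflect how DB is implemented.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errNoTx     = errors.New("no active transaction")
	errTxExists = errors.New("write transaction already exists")
)

// sessionDB is a hypothetical toy that records statements instead of running
// them. It only models when a write is allowed.
type sessionDB struct {
	mtx        sync.Mutex
	inTx       bool
	autoCommit bool
	pending    []string // statements in the active transaction
	committed  []string // statements that have been committed
}

// AutoCommit toggles the mode in which Execute works without Begin/Commit.
func (db *sessionDB) AutoCommit(auto bool) {
	db.mtx.Lock()
	defer db.mtx.Unlock()
	db.autoCommit = auto
}

// Begin opens the single write transaction.
func (db *sessionDB) Begin() error {
	db.mtx.Lock()
	defer db.mtx.Unlock()
	if db.inTx {
		return errTxExists
	}
	db.inTx = true
	return nil
}

// Execute uses the active transaction if there is one, commits immediately in
// auto-commit mode, and otherwise refuses the write.
func (db *sessionDB) Execute(stmt string) error {
	db.mtx.Lock()
	defer db.mtx.Unlock()
	switch {
	case db.inTx:
		db.pending = append(db.pending, stmt)
	case db.autoCommit:
		db.committed = append(db.committed, stmt)
	default:
		return errNoTx
	}
	return nil
}

// Commit makes the pending statements durable and ends the transaction.
func (db *sessionDB) Commit() error {
	db.mtx.Lock()
	defer db.mtx.Unlock()
	if !db.inTx {
		return errNoTx
	}
	db.committed = append(db.committed, db.pending...)
	db.pending = nil
	db.inTx = false
	return nil
}

func main() {
	db := &sessionDB{}
	fmt.Println("outside a session:", db.Execute("INSERT 1"))
	if err := db.Begin(); err != nil {
		panic(err)
	}
	fmt.Println("inside a session:", db.Execute("INSERT 2"))
	fmt.Println("commit:", db.Commit(), db.committed)
}
```

Rejecting out-of-session writes at this layer means a caller cannot accidentally modify state between blocks.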
func NewDB ¶
NewDB creates a new Kwil DB instance. On creation, it will connect to the configured postgres process, creating as many connections as specified by the PoolConfig plus a special connection for a logical replication slot receiver. The database user (postgresql "role") must be a superuser for several reasons: creating triggers, collations, and the replication publication.
WARNING: There must only be ONE instance of a DB for a given postgres database. Transactions that use the Precommit method update an internal table used to sequence transactions.
func (*DB) AutoCommit ¶
AutoCommit toggles auto-commit mode, in which the Execute method may be used without having to begin/commit. This is to support startup and initialization tasks that occur prior to the start of the atomic commit process used while executing blocks.
func (*DB) BeginDelayedReadTx ¶ added in v0.9.0
func (db *DB) BeginDelayedReadTx() sql.OuterReadTx
BeginDelayedReadTx returns a valid SQL transaction, but will only start the transaction once the first query is executed. This is useful for when a calling module is expected to control the lifetime of a read transaction, but the implementation might not need to use the transaction.
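The lazy-start idea can be sketched as a wrapper that opens the underlying transaction on the first query only. This is a toy under assumptions: readTx, fakeTx, delayedReadTx, and newCountingTx are hypothetical names, and the behavior after Rollback is a choice made for this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// readTx is a hypothetical minimal read transaction for this sketch.
type readTx interface {
	Query(stmt string) (string, error)
	Rollback() error
}

// fakeTx stands in for a real database transaction.
type fakeTx struct{}

func (fakeTx) Query(stmt string) (string, error) { return "result of " + stmt, nil }
func (fakeTx) Rollback() error                   { return nil }

// delayedReadTx defers opening the underlying transaction until first use.
type delayedReadTx struct {
	begin  func() (readTx, error) // opens the real transaction
	tx     readTx                 // nil until the first query
	closed bool
}

// Query starts the real transaction on first use, then forwards the query.
func (d *delayedReadTx) Query(stmt string) (string, error) {
	if d.closed {
		return "", errors.New("transaction closed")
	}
	if d.tx == nil {
		tx, err := d.begin()
		if err != nil {
			return "", err
		}
		d.tx = tx
	}
	return d.tx.Query(stmt)
}

// Rollback is a no-op if the transaction was never started.
func (d *delayedReadTx) Rollback() error {
	d.closed = true
	if d.tx == nil {
		return nil
	}
	return d.tx.Rollback()
}

// newCountingTx returns a delayed transaction and a counter of real begins.
func newCountingTx() (*delayedReadTx, *int) {
	begins := new(int)
	d := &delayedReadTx{begin: func() (readTx, error) {
		*begins++
		return fakeTx{}, nil
	}}
	return d, begins
}

func main() {
	d, begins := newCountingTx()
	fmt.Println("begins before first query:", *begins)
	res, err := d.Query("SELECT 1")
	fmt.Println(res, err, "begins:", *begins)
	fmt.Println("rollback:", d.Rollback())
}
```

A caller that never queries pays no connection cost, which matches the use case described above where the implementation might not need the transaction.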
func (*DB) BeginPreparedTx ¶ added in v0.9.0
BeginPreparedTx makes the DB's singular transaction, which is used automatically by consumers of the Query and Execute methods. This is the mode of operation used by Kwil to have one system coordinating transaction lifetime, with one or more other systems implicitly using the transaction for their queries.
This method creates a sequenced transaction, and it should be committed with a prepared transaction (two-phase commit) using Precommit. Use BeginTx for a regular outer transaction without sequencing or a prepared transaction.
The returned transaction is also capable of creating nested transactions. This functionality is used to prevent user dataset query errors from rolling back the outermost transaction.
func (*DB) BeginReadTx ¶
BeginReadTx creates a read-only transaction for the database. It obtains a read connection from the pool, which will be returned to the pool when the transaction is closed.
func (*DB) BeginReservedReadTx ¶ added in v0.8.1
BeginReservedReadTx starts a read-only transaction using a reserved reader connection. This is to allow read-only consensus operations that operate outside of the write transaction's lifetime, such as proposal preparation and approval, to function without contention on the reader pool that services user requests.
func (*DB) BeginSnapshotTx ¶ added in v0.8.1
BeginSnapshotTx creates a read-only transaction with serializable isolation level. This is used for taking a snapshot of the database.
func (*DB) BeginTx ¶
BeginTx starts a regular read-write transaction. For a sequenced two-phase transaction, use BeginPreparedTx.
func (*DB) Close ¶
Close shuts down the Kwil DB. This stops all connections and the WAL data receiver.
func (*DB) Done ¶ added in v0.8.1
func (db *DB) Done() <-chan struct{}
Done allows higher level systems to monitor the state of the DB backend connection and shut down (or restart) the application if necessary. Without this, the application hits an error the next time it uses the DB, which may be confusing and inopportune.
func (*DB) EnsureFullReplicaIdentityDatasets ¶ added in v0.9.0
EnsureFullReplicaIdentityDatasets should be used after the first time opening a database that was restored from a snapshot, which may have been created with an older version of kwild that did not set this on all tables.
func (*DB) Err ¶ added in v0.8.1
Err provides any error that caused the DB to shut down. This will return context.Canceled after Close has been called.
func (*DB) Execute ¶
Execute runs a statement on an existing transaction, or on a short lived transaction from the write connection if in auto-commit mode.
type DBConfig ¶
type DBConfig struct {
	PoolConfig
	// SchemaFilter is used to include WAL data for certain *postgres* schema
	// (not Kwil schema). If nil, the default is to include updates to tables in
	// any schema prefixed by "ds_".
	// DEPRECATED: This has become baked into Kwil's DB conventions in many places.
	SchemaFilter func(string) bool
}
DBConfig is the configuration for the Kwil DB backend, which includes the connection parameters and a schema filter used to selectively include WAL data for certain PostgreSQL schemas in commit ID calculation.
type NamedArgs ¶
type NamedArgs = pgx.NamedArgs
NamedArgs is a query rewriter that can be used as one of the first arguments in the []any provided to a query function so that the named arguments are automatically used to rewrite the SQL statement from named (using @argname) syntax to positional ($1, $2, etc.). IMPORTANT: Note that the input statement requires named arguments to use "@" instead of "$" for the named arguments. Modify the SQL string as necessary to work with this rewriter.
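To make the named-to-positional rewrite concrete, here is a deliberately simplified sketch using a regular expression. It is not the pgx implementation: rewriteNamed is a hypothetical helper, it does not skip string literals or comments, and rejecting a missing name is a choice made for this sketch only.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// namedArg matches @name tokens. This naive pattern would also match inside
// string literals, which a real rewriter must avoid.
var namedArg = regexp.MustCompile(`@([A-Za-z_][A-Za-z0-9_]*)`)

// rewriteNamed replaces each @name with $N, numbering names in order of first
// appearance, and returns the positional argument list.
func rewriteNamed(stmt string, named map[string]any) (string, []any, error) {
	positions := map[string]int{}
	var args []any
	var missing string
	out := namedArg.ReplaceAllStringFunc(stmt, func(m string) string {
		name := m[1:] // strip the leading '@'
		pos, seen := positions[name]
		if !seen {
			val, ok := named[name]
			if !ok {
				if missing == "" {
					missing = name
				}
				return m
			}
			args = append(args, val)
			pos = len(args)
			positions[name] = pos
		}
		return "$" + strconv.Itoa(pos)
	})
	if missing != "" {
		return "", nil, fmt.Errorf("no value for named argument %q", missing)
	}
	return out, args, nil
}

func main() {
	stmt := "SELECT * FROM t WHERE a = @a AND b = @b OR c = @a"
	out, args, err := rewriteNamed(stmt, map[string]any{"a": 1, "b": "x"})
	fmt.Println(out, args, err)
}
```

Repeated names share a position, so a value referenced several times in the statement is passed only once.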
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool is a simple read connection pool with one dedicated writer connection. This type is relatively low level, and Kwil will generally use the DB type to manage sessions instead of this type directly. It is exported primarily for testing and reuse in more general use cases.
Pool supports Kwil's single transactional DB writer model:
- a single writer connection, on which a transaction is created by a top level system during block execution (i.e. the AbciApp), and from which reads of uncommitted DB records may be performed.
- multiple readers, which may service other asynchronous operations such as a gRPC user service.
The write methods from the Tx returned from the BeginTx method should be preferred over directly using the Pool's write methods. The DB type is the session-aware wrapper that creates and stores the write Tx, and provides top level Exec/Set methods that error if no Tx exists. Only use Pool as a building block or for testing individual systems outside of the context of a session.
func NewPool ¶
func NewPool(ctx context.Context, cfg *PoolConfig) (*Pool, error)
NewPool creates a connection pool to a PostgreSQL database.
func (*Pool) BeginReadTx ¶
BeginReadTx starts a read-only transaction.
func (*Pool) BeginTx ¶
BeginTx starts a read-write transaction. It is an error to call this twice without first closing the initial transaction.
type PoolConfig ¶
type PoolConfig struct {
	ConnConfig
	// MaxConns is the maximum number of allowable connections in the read pool.
	// This does not include the reserved connections for the consensus thread,
	// one of which is a writer.
	MaxConns uint32
}
PoolConfig combines a connection config with additional options for a pool of read connections and a single write connection, as required for kwild.
type PrepareMessageV3 ¶
type PrepareMessageV3 struct {
	// Flags currently unused (must be 0).
	Flags uint8
	// PrepareLSN is the LSN of the prepare.
	PrepareLSN pglogrepl.LSN
	// EndPrepareLSN is the end LSN of the prepared transaction.
	EndPrepareLSN pglogrepl.LSN
	// PrepareTime is the prepare timestamp of the transaction
	PrepareTime time.Time
	// Xid of the transaction
	Xid uint32
	// UserGID is the user-defined GID of the prepared transaction.
	UserGID string
}
PrepareMessageV3 is a prepared transaction message.
func (*PrepareMessageV3) Decode ¶
func (m *PrepareMessageV3) Decode(src []byte) error
func (*PrepareMessageV3) Type ¶
func (m *PrepareMessageV3) Type() pglogrepl.MessageType
type QueryMode ¶
type QueryMode = pgx.QueryExecMode
QueryMode is a type recognized by the query methods when it appears as one of the first arguments in the []any, causing the query to be executed in a certain mode. Presently this is used to change from the prepare/describe approaches for determining input argument types to a simpler mode that infers the argument types from the passed Go variable types. This is helpful for "in-line expressions" such as `SELECT $1;` that convey no information on their own about the type of the argument, resulting in an assumed type that may not match the type of the provided Go variable (an error in many cases).
const (
	// QueryModeDefault uses a prepare-query request cycle to determine arg
	// types (OID) using postgres to describe the statement. This may not be
	// helpful for in-line expressions that reference no known table. There must
	// be an encode/decode plan available for the OID and the Go type.
	QueryModeDefault QueryMode = pgx.QueryExecModeCacheStatement
	// QueryModeDescribeExec also uses a prepared statement, but it is unnamed
	// and not cached, and thus avoids the need to retry a query if the table
	// definitions are modified concurrently. Requires two round trips.
	QueryModeDescribeExec QueryMode = pgx.QueryExecModeDescribeExec
	// QueryModeExec still uses the extended protocol, but does not ask
	// postgresql to describe the statement to determine parameter types.
	// Instead, the types are determined from the Go variables.
	QueryModeExec QueryMode = pgx.QueryExecModeExec
	// QueryModeSimple is like QueryModeExec, except that it uses the "simple"
	// postgresql wire protocol. Prefer QueryModeExec if argument type inference
	// based on the Go variables is required since this forces everything into text.
	QueryModeSimple QueryMode = pgx.QueryExecModeSimpleProtocol
	// QueryModeInferredArgTypes runs the query in a special execution mode that
	// is like QueryModeExec except that it infers the argument OIDs from the Go
	// argument values AND asserts those types in preparing the statement, which
	// is necessary for our in-line expressions. QueryModeExec does not use
	// Parse/Describe messages; this mode does while asserting the param types.
	// It is like a hybrid between QueryModeDescribeExec and QueryModeExec. It
	// is incompatible with other special arguments like NamedArgs.
	QueryModeInferredArgTypes QueryMode = 1 << 16
)
type Relation ¶ added in v0.9.0
Relation is a table in a schema.
func (*Relation) MarshalBinary ¶ added in v0.9.0
func (*Relation) UnmarshalBinary ¶ added in v0.9.0
type RollbackPreparedMessageV3 ¶
type RollbackPreparedMessageV3 struct {
	// Flags currently unused (must be 0).
	Flags uint8
	// EndLSN is the end LSN of the prepared transaction.
	EndLSN pglogrepl.LSN
	// RollbackLSN is the end LSN of the rollback of the prepared transaction.
	RollbackLSN pglogrepl.LSN
	// PrepareTime is the prepare timestamp of the transaction
	PrepareTime time.Time
	// RollbackTime is the rollback timestamp of the transaction
	RollbackTime time.Time
	// Xid of the transaction
	Xid uint32
	// UserGID is the user-defined GID of the prepared transaction.
	UserGID string
}
RollbackPreparedMessageV3 is a rollback prepared message.
func (*RollbackPreparedMessageV3) Decode ¶
func (m *RollbackPreparedMessageV3) Decode(src []byte) error
func (*RollbackPreparedMessageV3) Type ¶
func (m *RollbackPreparedMessageV3) Type() pglogrepl.MessageType
type TableStatser ¶ added in v0.9.0
type TableStatser interface {
TableStats(ctx context.Context, schema, table string) (*sql.Statistics, error)
}
TableStatser is an interface that the implementation of a sql.Executor may implement.
type Tuple ¶ added in v0.9.0
type Tuple struct {
	// RelationIdx is the index of the relation in the changeset metadata struct.
	RelationIdx uint32
	// Columns is a list of columns and their values.
	Columns []*TupleColumn
}
Tuple is a tuple of values.
type TupleColumn ¶ added in v0.9.0
type TupleColumn struct {
	// ValueType gives information on the type of data in the column. If the
	// type is of type Null, UnchangedUpdate, or Toast, the Data field will be nil.
	ValueType ValueType
	// Data is the actual data in the column.
	Data []byte
}
TupleColumn is a column within a tuple.
type ValueType ¶ added in v0.9.0
type ValueType uint8
ValueType gives information on the type of data in a tuple column.
const (
	// NullValue indicates a NULL value
	// (as opposed to something like an empty string).
	NullValue ValueType = iota
	// ToastValue indicates a column is a TOAST pointer,
	// and that the actual value is stored elsewhere and
	// was unchanged.
	ToastValue
	// SerializedValue indicates a column is a non-nil value
	// and can be deserialized.
	SerializedValue
	// UnchangedUpdate indicates a column was unchanged. This is used in the new
	// tuples in an UPDATE changeset entry.
	UnchangedUpdate
)
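A consumer of tuple columns typically switches on the value type before touching Data. The sketch below mirrors the constants and the TupleColumn shape with local lowercase types. The describe and kind helpers, and the strings they return, are hypothetical and do not reproduce the output of ChangesetEntry.Kind.

```go
package main

import "fmt"

// valueType mirrors the ValueType constants documented in this package.
type valueType uint8

const (
	nullValue valueType = iota
	toastValue
	serializedValue
	unchangedUpdate
)

// tupleColumn mirrors the TupleColumn shape.
type tupleColumn struct {
	ValueType valueType
	Data      []byte
}

// describe reports how a column should be handled. Only serialized values
// carry data, so Data is read in that case alone.
func describe(col *tupleColumn) string {
	switch col.ValueType {
	case nullValue:
		return "NULL"
	case toastValue:
		return "unchanged TOAST value"
	case serializedValue:
		return fmt.Sprintf("%d serialized bytes", len(col.Data))
	case unchangedUpdate:
		return "unchanged"
	default:
		return "unknown"
	}
}

// kind classifies a change from which tuples are present: an insert has no
// old tuple and a delete has no new tuple. The labels are this sketch's own.
func kind(oldTuple, newTuple []*tupleColumn) string {
	switch {
	case len(oldTuple) == 0:
		return "insert"
	case len(newTuple) == 0:
		return "delete"
	default:
		return "update"
	}
}

func main() {
	oldTuple := []*tupleColumn{{ValueType: serializedValue, Data: []byte("a")}}
	newTuple := []*tupleColumn{{ValueType: unchangedUpdate}}
	fmt.Println(kind(oldTuple, newTuple), describe(oldTuple[0]), describe(newTuple[0]))
}
```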