sqlitestream

package
v0.0.0-...-1ec411c Latest Latest
Published: Jun 12, 2024 License: MIT, Unlicense Imports: 10 Imported by: 0

Documentation

Overview

Package sqlitestream provides a streamable SQLite database abstraction.

This enables building reactive applications that respond whenever their data model changes, regardless of where it was changed (so long as the database abstraction provided here is used to make changes).

This work was developed initially by Chris Waldon and Jack Mordaunt within Plato Team. We thank Plato for open-sourcing the refined byproduct of many months of engineering on this system.

How it works and limitations

This approach is built atop SQLite's Update Hook. As such, it inherits some limitations from the update hook:

  • Tables created using WITHOUT ROWID do not cause the hook to fire, and therefore are incompatible with this package.
  • Queries using INSERT OR REPLACE, ON CONFLICT REPLACE, or any other REPLACE-based operation do not trigger updates for the rows acted upon, because they delete those rows and recreate them with a different rowid. An upsert written with ON CONFLICT ... DO UPDATE does work, though.
  • If a table is emptied via the Truncate Optimization, the hook is not invoked and stream queries will not see the values disappear.

API Structure

The DB type wraps the necessary logic to manage database streaming, but (due to limitations of Go generics) the methods for interacting with the database are all top-level generic functions that accept a DB as their first argument.
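The generics limitation mentioned above is that a Go method cannot declare type parameters of its own, so a result type T has to be introduced by a top-level function. The following is a minimal sketch of that pattern only; store, fakeTx, and read are hypothetical stand-ins and not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeTx is a hypothetical stand-in for a transaction type.
type fakeTx struct{ rows map[string]int }

// store is a hypothetical stand-in for a database wrapper such as DB.
type store struct{ rows map[string]int }

// A method cannot introduce a new type parameter, so this would not compile:
//
//	func (s *store) Read[T any](work func(fakeTx) (T, error)) (T, error)
//
// A top-level generic function that takes the store as its first argument
// expresses the same thing.
func read[T any](s *store, work func(fakeTx) (T, error)) (T, error) {
	if s == nil {
		var zero T
		return zero, errors.New("nil store")
	}
	return work(fakeTx{rows: s.rows})
}

func main() {
	s := &store{rows: map[string]int{"a": 1, "b": 2}}
	// T is inferred as int from the closure's return type.
	n, err := read(s, func(tx fakeTx) (int, error) { return len(tx.rows), nil })
	fmt.Println(n, err)
}
```

Because T is inferred from the closure, call sites rarely need to spell out the type arguments.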

Usage

This package is the high-level API for streaming from SQLite, but it is generic over SQLite database drivers and SQL query abstractions. The example package git.sr.ht/~gioverse/skel/stream/sqlitestream/mattn_sqlx_stream shows how to use it with mattn's sqlite driver and the sqlx query abstraction, but users can use any query abstraction and any SQLite database driver that supports the update hook.

See the example in the mattn_sqlx_stream package for a full tutorial on how to use this package in real code.

Functions

func Read

func Read[D DBInterface, X TXInterface, T any](db *DB[D, X], ctx context.Context, work func(X) (T, error)) (T, error)

Read is ReadParams without parameters.

func ReadParams

func ReadParams[D DBInterface, X TXInterface, P, T any](db *DB[D, X], ctx context.Context, params P, work func(X, P) (T, error)) (T, error)

ReadParams provides a convenient high-level API for read-only queries with parameters and returning data.
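One reason to prefer the Params variants is that a named query function can be reused with different arguments instead of being rewritten as a closure each time. The sketch below illustrates that relationship with hypothetical stand-ins (fakeTx, readParams, read, userName); it skips transaction management entirely, and the delegation from read to readParams is an assumption about how such a pair could be built, not a description of this package's source.

```go
package main

import "fmt"

// fakeTx is a hypothetical stand-in for a read-only transaction.
type fakeTx struct{ users map[int]string }

// readParams passes params through to the work function.
func readParams[P, T any](tx fakeTx, params P, work func(fakeTx, P) (T, error)) (T, error) {
	return work(tx, params)
}

// read is readParams with an empty parameter value.
func read[T any](tx fakeTx, work func(fakeTx) (T, error)) (T, error) {
	return readParams(tx, struct{}{}, func(tx fakeTx, _ struct{}) (T, error) {
		return work(tx)
	})
}

// userName is a reusable query that needs no closure to receive its argument.
func userName(tx fakeTx, id int) (string, error) {
	name, ok := tx.users[id]
	if !ok {
		return "", fmt.Errorf("no user with id %d", id)
	}
	return name, nil
}

func main() {
	tx := fakeTx{users: map[int]string{1: "ada", 2: "grace"}}

	name, err := readParams(tx, 2, userName)
	fmt.Println(name, err)

	count, err := read(tx, func(tx fakeTx) (int, error) { return len(tx.users), nil })
	fmt.Println(count, err)
}
```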

func Stream

func Stream[D DBInterface, X TXInterface, T any](
	db *DB[D, X],
	ctx context.Context,
	query func(X) (T, error),
	genWatchTables func(X) []sqlitewatch.Table,
) <-chan stream.Result[T]

Stream is StreamParams without parameters.

func StreamParams

func StreamParams[D DBInterface, X TXInterface, P, T any](
	db *DB[D, X],
	ctx context.Context,
	params P,
	query func(X, P) (T, error),
	genWatchTables func(X, P) []sqlitewatch.Table,
) <-chan stream.Result[T]

StreamParams returns a channel that emits the results of the provided query every time the watch tables (returned from the genWatchTables func) are modified. If genWatchTables does not return any tables, the stream will never update, though the query function will be invoked once at initialization.

When the context is cancelled, the goroutine serving the stream shuts down and the returned channel is closed.

func StreamTables

func StreamTables[D DBInterface, X TXInterface, T any](db *DB[D, X], ctx context.Context, query func(X) (T, error), watchTables ...string) <-chan stream.Result[T]

StreamTables is StreamTablesParams without parameters.

func StreamTablesParams

func StreamTablesParams[D DBInterface, X TXInterface, P, T any](db *DB[D, X], ctx context.Context, params P, query func(X, P) (T, error), watchTables ...string) <-chan stream.Result[T]

StreamTablesParams is StreamParams except it takes the names of tables to watch directly. This variant doesn't consider rowids within the tables.

func Write

func Write[D DBInterface, X TXInterface, T any](db *DB[D, X], ctx context.Context, work func(X) (T, error)) (T, error)

Write provides a convenient high-level API for write queries returning data.

func WriteParams

func WriteParams[D DBInterface, X TXInterface, P, T any](db *DB[D, X], ctx context.Context, params P, work func(X, P) (T, error)) (T, error)

WriteParams provides a convenient high-level API for write queries with parameters and returning data.

func WriteParamsVoid

func WriteParamsVoid[D DBInterface, X TXInterface, P any](db *DB[D, X], ctx context.Context, params P, work func(X, P) error) error

WriteParamsVoid is WriteParams returning no data.

func WriteVoid

func WriteVoid[D DBInterface, X TXInterface](db *DB[D, X], ctx context.Context, work func(X) error) error

WriteVoid provides a convenient high-level API for write queries returning no data.

Types

type DB

type DB[D DBInterface, X TXInterface] struct {
	sync.Mutex
	// contains filtered or unexported fields
}

DB manages a watchable SQLite database. It is designed for databases operating in WAL mode, which permits many concurrent readers but only one writer at a time. This type implements application-level locking so that write transactions never run simultaneously, which would otherwise fail with SQLITE_BUSY. Thus, all database transactions should be built using this type, via the Read, Write, and Stream helpers.

If all database interactions are correctly mediated by this type, the sqlitewatch.Watcher's commit detection logic will be properly invoked after every database transaction, ensuring consistent detection of database changes.

func NewDB

func NewDB[D DBInterface, X TXInterface](db D, watcher *sqlitewatch.Watcher, makeTx func(db D, ctx context.Context, readonly bool) (X, error)) *DB[D, X]

NewDB builds a DB using the provided concrete database and watcher. The makeTx function defines how to build read only and write transactions using the concrete database.
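As a rough illustration of what a makeTx function could look like, the sketch below uses only the standard database/sql package, with D as *sql.DB and X as *sql.Tx. The local dbInterface and txInterface copies exist only so the snippet compiles without importing this package, and txOptions is a hypothetical helper. Whether a given SQLite driver honors the ReadOnly option is driver-dependent, so treat the mapping as an assumption; the mattn_stdlib_stream example package is the authoritative reference.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
)

// Local copies of the interfaces documented on this page.
type dbInterface interface{ Close() error }

type txInterface interface {
	Commit() error
	Rollback() error
}

// Compile-time checks that the standard library types satisfy them.
var (
	_ dbInterface = (*sql.DB)(nil)
	_ txInterface = (*sql.Tx)(nil)
)

// txOptions maps the readonly flag onto database/sql transaction options.
func txOptions(readonly bool) *sql.TxOptions {
	return &sql.TxOptions{ReadOnly: readonly}
}

// makeTx has the shape of the makeTx argument described above,
// with D = *sql.DB and X = *sql.Tx.
func makeTx(db *sql.DB, ctx context.Context, readonly bool) (*sql.Tx, error) {
	return db.BeginTx(ctx, txOptions(readonly))
}

func main() {
	// No database is opened here; this only shows the option mapping.
	fmt.Println(txOptions(true).ReadOnly, txOptions(false).ReadOnly)
}
```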

func (*DB[D, X]) Close

func (d *DB[D, X]) Close() error

Close the underlying DBInterface, returning any error.

type DBInterface

type DBInterface interface {
	Close() error
}

DBInterface defines the minimal interface of a database instance needed in order for the DB type to manage interactions with the database.

type TXInterface

type TXInterface interface {
	// Commit the transaction, returning any errors.
	Commit() error
	// Rollback the transaction, returning any errors.
	Rollback() error
}

TXInterface defines the minimal interface of a database transaction needed in order for the DB type to manage the lifecycle of the transaction.

Directories

Path	Synopsis
mattn_sqlx_stream	Package mattn_sqlx_stream provides a concrete example of building a streaming SQLite API using the popular mattn sqlite3 driver and sqlx query helpers.
mattn_stdlib_stream	Package mattn_stdlib_stream provides a concrete example of building a streaming SQLite API using the popular mattn sqlite3 driver and Go stdlib database/sql package.
sqlitewatch	Package sqlitewatch implements low-level SQLite operation watching.
