Documentation ¶
Overview ¶
Package mattn_stdlib_stream provides a concrete example of building a streaming SQLite API using the popular mattn sqlite3 driver and the Go standard library's database/sql package.
To use different database drivers or query helpers, copy this (small) package and reimplement OpenDB with your preferred types.
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DB ¶
func OpenDB ¶
OpenDB connects the various parts of the SQLite streaming infrastructure together. It builds a watcher, defines a unique sql driver name bound to that watcher, and ensures that all database connections with that driver name properly notify the watcher of database changes. It then builds the appropriate DB structure for streaming queries.
Example ¶
ExampleOpenDB demonstrates the creation and use of a streamable SQLite database.
package main

import (
	"context"
	"database/sql"
	"fmt"
	"os"

	"git.sr.ht/~gioverse/skel/stream/sqlitestream"
	"git.sr.ht/~gioverse/skel/stream/sqlitestream/mattn_stdlib_stream"
	"git.sr.ht/~gioverse/skel/stream/sqlitestream/sqlitewatch"
)

// main demonstrates the creation and use of a streamable SQLite database:
// it opens the database, populates a table, streams the whole table and a
// single row of it, and then performs writes to show which streams are
// notified by which changes.
func main() {
	// Create a context to manage the lifetime of our database streaming.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create a streamable database.
	db, err := mattn_stdlib_stream.OpenDB(ctx, "sqlite3", "database.sqlite?_journal_mode=WAL")
	if err != nil {
		panic(err)
	}
	// Clean up after ourselves.
	defer os.Remove("database.sqlite")
	// Ensure we close the DB before deleting it (deferred calls run in
	// reverse order, so this runs before the os.Remove above).
	defer db.Close()

	// Populate the DB with some data.
	err = sqlitestream.WriteVoid(db, ctx, func(tx *sql.Tx) error {
		_, err := tx.Exec(`CREATE TABLE foo (a INTEGER); INSERT INTO foo (a) VALUES (5),(6),(7);`)
		return err
	})
	if err != nil {
		panic(err)
	}

	// Create a stream of the data in the foo table using the sqlitestream helpers.
	fooChan := sqlitestream.StreamTables(db, ctx, func(tx *sql.Tx) ([]int, error) {
		var foos []int
		rows, err := tx.Query(`SELECT * from foo;`)
		if err != nil {
			return nil, err
		}
		defer rows.Close()
		for rows.Next() {
			var f int
			if err := rows.Scan(&f); err != nil {
				return nil, err
			}
			foos = append(foos, f)
		}
		// rows.Next returns false both at end-of-data and on failure, so
		// surface any iteration error instead of a truncated result.
		return foos, rows.Err()
	}, "foo") // Note the "foo" here. This ensures that we get updates on this channel when this table changes.

	// We should get an initial value with the current data in the table.
	foos := <-fooChan
	fmt.Printf("initial foos: %v\n", foos.V)

	// Resolve the rowid of a row we care about. We can use this to watch changes to that particular row
	// without being notified of changes elsewhere in the table.
	rowid, err := sqlitestream.ReadParams(db, ctx, int(7), func(tx *sql.Tx, val int) (int64, error) {
		// Look up the rowid of a particular row in the table. We will only be notified of
		// changes to *that* row.
		var rowid int64
		row := tx.QueryRow(`SELECT rowid FROM foo WHERE a = ?;`, val)
		if err := row.Scan(&rowid); err != nil {
			return 0, err
		}
		return rowid, nil
	})
	if err != nil {
		panic(err)
	}

	// Use the rowid to stream that table row.
	specificChan := sqlitestream.StreamParams(db, ctx, rowid, func(tx *sql.Tx, rowid int64) ([]int, error) {
		var foos []int
		rows, err := tx.Query(`SELECT * FROM foo WHERE rowid = ?;`, rowid)
		if err != nil {
			return nil, err
		}
		defer rows.Close()
		for rows.Next() {
			var f int
			if err := rows.Scan(&f); err != nil {
				return nil, err
			}
			foos = append(foos, f)
		}
		// As above: report iteration errors rather than a partial slice.
		return foos, rows.Err()
	}, func(tx *sql.Tx, rowid int64) []sqlitewatch.Table {
		// Depending on your use-case, you can sometimes resolve the rowid in here.
		return []sqlitewatch.Table{{Name: "foo", RowIDs: []int64{rowid}}}
	})

	// We should get an initial value with the current data in the table.
	foos = <-specificChan
	fmt.Printf("initial specific foos: %v\n", foos.V)

	// Update the data in the table.
	err = sqlitestream.WriteVoid(db, ctx, func(tx *sql.Tx) error {
		_, err := tx.Exec(` UPDATE foo SET a = 9 WHERE a = 6; INSERT INTO foo (a) VALUES (100); `)
		return err
	})
	if err != nil {
		panic(err)
	}

	// We should get an updated value matching the new database contents.
	foos = <-fooChan
	fmt.Printf("updated foos: %v\n", foos.V)

	// We should *not* get an updated value on the specificChan, since we didn't change the row
	// it cares about.
	select {
	case unexpected := <-specificChan:
		// Print the value that actually arrived, not the stale table-wide one.
		fmt.Printf("unexpected foos: %v\n", unexpected.V)
	default:
	}

	// Update the data in the table.
	err = sqlitestream.WriteParamsVoid(db, ctx, 7, func(tx *sql.Tx, val int) error {
		_, err := tx.Exec(` UPDATE foo SET a = 1 WHERE a = ?; `, val)
		return err
	})
	if err != nil {
		panic(err)
	}

	// We should get an updated value matching the new database contents.
	foos = <-fooChan
	fmt.Printf("updated foos: %v\n", foos.V)

	// We should get an updated value matching the new database contents.
	foos = <-specificChan
	fmt.Printf("updated specific foos: %v\n", foos.V)
}
Output: initial foos: [5 6 7] initial specific foos: [7] updated foos: [5 9 7 100] updated foos: [5 9 1 100] updated specific foos: [1]