mattn_stdlib_stream

package
v0.0.0-...-96c2757 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 30, 2024 License: MIT, Unlicense Imports: 7 Imported by: 0

Documentation

Overview

Package mattn_stdlib_stream provides a concrete example of building a streaming SQLite API using the popular mattn sqlite3 driver and the Go standard library's database/sql package.

To use different database drivers or query helpers, copy this (small) package and reimplement OpenDB with your preferred types.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DB

type DB = sqlitestream.DB[*sql.DB, *sql.Tx]

func OpenDB

func OpenDB(ctx context.Context, driverName, dsn string) (*DB, error)

OpenDB connects the various parts of the SQLite streaming infrastructure together. It builds a watcher, defines a unique sql driver name bound to that watcher, and ensures that all database connections with that driver name properly notify the watcher of database changes. It then builds the appropriate DB structure for streaming queries.

Example

ExampleOpenDB demonstrates the creation and use of a streamable SQLite database.

package main

import (
	"context"
	"database/sql"
	"fmt"
	"os"

	"git.sr.ht/~gioverse/skel/stream/sqlitestream"
	"git.sr.ht/~gioverse/skel/stream/sqlitestream/mattn_stdlib_stream"
	"git.sr.ht/~gioverse/skel/stream/sqlitestream/sqlitewatch"
)

// main demonstrates the creation and use of a streamable SQLite database:
// it opens the database, populates a table, streams both the whole table and
// a single row of it, and then performs writes to show which streams are
// notified. Any setup, read, or write error aborts the example with a panic.
func main() {
	// Create a context to manage the lifetime of our database streaming.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Create a streamable database.
	db, err := mattn_stdlib_stream.OpenDB(ctx, "sqlite3", "database.sqlite?_journal_mode=WAL")
	if err != nil {
		panic(err)
	}
	// Clean up after ourselves.
	defer os.Remove("database.sqlite")
	// Ensure we close the DB before deleting it.
	defer db.Close()

	// Populate the DB with some data.
	err = sqlitestream.WriteVoid(db, ctx, func(tx *sql.Tx) error {
		_, err := tx.Exec(`CREATE TABLE foo (a INTEGER); INSERT INTO foo (a) VALUES (5),(6),(7);`)
		return err
	})
	if err != nil {
		panic(err)
	}

	// Create a stream of the data in the foo table using the sqlitestream helpers.
	fooChan := sqlitestream.StreamTables(db, ctx, func(tx *sql.Tx) ([]int, error) {
		var foos []int
		rows, err := tx.Query(`SELECT * from foo;`)
		if err != nil {
			return nil, err
		}
		defer rows.Close()
		for rows.Next() {
			var f int
			if err := rows.Scan(&f); err != nil {
				return nil, err
			}
			foos = append(foos, f)
		}
		// rows.Next returning false can mean an iteration error rather than
		// the end of the result set, so check before reporting success.
		if err := rows.Err(); err != nil {
			return nil, err
		}
		return foos, nil
	}, "foo") // Note the "foo" here. This ensures that we get updates on this channel when this table changes.

	// We should get an initial value with the current data in the table.
	foos := <-fooChan
	fmt.Printf("initial foos: %v\n", foos.V)

	// Resolve the rowid of a row we care about. We can use this to watch changes to that particular row
	// without being notified of changes elsewhere in the table.
	rowid, err := sqlitestream.ReadParams(db, ctx, int(7), func(tx *sql.Tx, val int) (int64, error) {
		// Look up the rowid of a particular row in the table. We will only be notified of
		// changes to *that* row.
		var rowid int64
		row := tx.QueryRow(`SELECT rowid FROM foo WHERE a = ?;`, val)
		if err := row.Scan(&rowid); err != nil {
			return 0, err
		}
		// Return an explicit nil rather than the enclosing function's err variable.
		return rowid, nil
	})
	if err != nil {
		panic(err)
	}
	// Use the rowid to stream that table row.
	specificChan := sqlitestream.StreamParams(db, ctx, rowid, func(tx *sql.Tx, rowid int64) ([]int, error) {
		var foos []int
		rows, err := tx.Query(`SELECT * FROM foo WHERE rowid = ?;`, rowid)
		if err != nil {
			return nil, err
		}
		defer rows.Close()
		for rows.Next() {
			var f int
			if err := rows.Scan(&f); err != nil {
				return nil, err
			}
			foos = append(foos, f)
		}
		// As above: surface any error that terminated iteration early.
		if err := rows.Err(); err != nil {
			return nil, err
		}
		return foos, nil
	}, func(tx *sql.Tx, rowid int64) []sqlitewatch.Table {
		// Depending on your use-case, you can sometimes resolve the rowid in here.
		return []sqlitewatch.Table{{Name: "foo", RowIDs: []int64{rowid}}}
	})

	// We should get an initial value with the current data in the row.
	foos = <-specificChan
	fmt.Printf("initial specific foos: %v\n", foos.V)

	// Update the data in the table.
	err = sqlitestream.WriteVoid(db, ctx, func(tx *sql.Tx) error {
		_, err := tx.Exec(`
    		UPDATE foo SET a = 9 WHERE a = 6;
    		INSERT INTO foo (a) VALUES (100);
		`)
		return err
	})
	// A failed write would leave the receive below waiting for an update
	// that never arrives, so stop here instead.
	if err != nil {
		panic(err)
	}

	// We should get an updated value matching the new database contents.
	foos = <-fooChan
	fmt.Printf("updated foos: %v\n", foos.V)

	// We should *not* get an updated value on the specificChan, since we didn't change the row
	// it cares about.
	select {
	case unexpected := <-specificChan:
		// Print the value actually received, not the stale one from fooChan.
		fmt.Printf("unexpected foos: %v\n", unexpected.V)
	default:
	}

	// Update the data in the table, this time touching the watched row.
	err = sqlitestream.WriteParamsVoid(db, ctx, 7, func(tx *sql.Tx, val int) error {
		_, err := tx.Exec(` UPDATE foo SET a = 1 WHERE a = ?; `, val)
		return err
	})
	if err != nil {
		panic(err)
	}

	// We should get an updated value matching the new database contents.
	foos = <-fooChan
	fmt.Printf("updated foos: %v\n", foos.V)
	// We should get an updated value matching the new database contents.
	foos = <-specificChan
	fmt.Printf("updated specific foos: %v\n", foos.V)

}
Output:

initial foos: [5 6 7]
initial specific foos: [7]
updated foos: [5 9 7 100]
updated foos: [5 9 1 100]
updated specific foos: [1]

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL