driver

package
v0.33.0
Published: Apr 15, 2023 License: MIT Imports: 22 Imported by: 0

Documentation

Constants

const Comma = ", "

Comma is the comma string to use in SQL queries.

Variables

var Tuning = struct {
	// ErrgroupLimit is passed to errgroup.Group.SetLimit.
	// Note that this is the limit for any one errgroup, but
	// not a ceiling on the total number of goroutines spawned,
	// as some errgroups may themselves start an errgroup.
	ErrgroupLimit int

	// RecordChSize is the size of the buffer chan for record
	// insertion/writing.
	RecordChSize int

	// SampleSize is the number of samples that a detector should
	// take to determine type.
	SampleSize int

	// MaxRetryInterval is the maximum interval to wait between retries.
	MaxRetryInterval time.Duration

	// SQLConfig holds config for sql.DB.
	SQLConfig *SQLConfig
}{
	ErrgroupLimit:    16,
	RecordChSize:     1024,
	SampleSize:       1024,
	MaxRetryInterval: time.Second * 3,
	SQLConfig: &SQLConfig{
		MaxOpenConns:    50,
		MaxIdleConns:    8,
		ConnMaxIdleTime: time.Second * 10,
		ConnMaxLifetime: 0,
	},
}

Tuning holds tuning params. Ultimately these params could come from user config, or be dynamically calculated or adjusted.

Functions

func MaxBatchRows

func MaxBatchRows(drvr SQLDriver, numCols int) int

MaxBatchRows returns the maximum number of rows allowed for a batch insert for drvr. Note that the returned value may differ for each database driver.
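
As a rough illustration of why the value depends on both the driver and numCols, the sketch below assumes that a driver caps the number of bind parameters per statement, so that the row count is that cap divided by the column count. The helper maxBatchRows and the limits used in main are hypothetical; the package's actual calculation is not shown in this documentation.

```go
package main

import "fmt"

// maxBatchRows is a hypothetical helper: given an assumed per-statement
// limit on bind parameters (maxParams) and the number of columns per row,
// it returns how many rows fit in a single batch insert.
func maxBatchRows(maxParams, numCols int) int {
	if maxParams <= 0 || numCols <= 0 {
		return 0
	}
	return maxParams / numCols
}

func main() {
	// The limits below are illustrative values, not taken from the package.
	for _, limit := range []int{999, 2100} {
		fmt.Printf("limit=%d cols=10 rows=%d\n", limit, maxBatchRows(limit, 10))
	}
}
```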

func NewRecordFromScanRow

func NewRecordFromScanRow(meta sqlz.RecordMeta, row []any, skip []int) (rec sqlz.Record, skipped []int)

NewRecordFromScanRow iterates over the elements of the row slice from rows.Scan, and returns a new (record) slice, replacing any wrapper types such as sql.NullString with the unboxed value, and other similar sanitization. For example, it will make a copy of any sql.RawBytes. The row slice can be reused by rows.Scan after this function returns.

Any row elements specified in skip will not be processed; the value will be copied directly from row[i] into rec[i]. If any element of row otherwise cannot be processed, its value is copied directly into rec, and its index is returned in skipped. The caller must take appropriate action to deal with all elements of rec listed in skipped.

REVISIT: Do we need the skip mechanism at all?
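
The unboxing idea can be sketched with only the standard library. The function unboxRow below is a simplified, hypothetical stand-in: it ignores the meta and skip arguments, handles just three wrapper types, and assumes the scan row holds pointers as passed to rows.Scan. It is not the package's implementation.

```go
package main

import (
	"database/sql"
	"fmt"
)

// unboxRow returns a new slice in which known wrapper types are replaced by
// plain pointers or nil. Indices of elements it does not know how to handle
// are returned in skipped, with the original value passed through.
func unboxRow(row []any) (rec []any, skipped []int) {
	rec = make([]any, len(row))
	for i, elem := range row {
		switch v := elem.(type) {
		case nil:
			rec[i] = nil
		case *sql.NullString:
			if v.Valid {
				s := v.String
				rec[i] = &s
			}
		case *sql.NullInt64:
			if v.Valid {
				n := v.Int64
				rec[i] = &n
			}
		case *sql.RawBytes:
			if *v != nil {
				// RawBytes memory is owned by the driver, so copy it.
				b := make([]byte, len(*v))
				copy(b, *v)
				rec[i] = &b
			}
		default:
			rec[i] = elem
			skipped = append(skipped, i)
		}
	}
	return rec, skipped
}

// sampleRow builds a scan row like one that rows.Scan might have populated.
func sampleRow() []any {
	raw := sql.RawBytes("abc")
	return []any{
		&sql.NullString{String: "alice", Valid: true},
		&sql.NullInt64{}, // Valid is false, i.e. NULL
		&raw,
		struct{}{}, // a type that unboxRow does not handle
	}
}

func main() {
	rec, skipped := unboxRow(sampleRow())
	fmt.Println(*rec[0].(*string), rec[1] == nil, string(*rec[2].(*[]byte)), skipped)
}
```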

func PrepareInsertStmt

func PrepareInsertStmt(ctx context.Context, drvr SQLDriver, db sqlz.Preparer, destTbl string, destCols []string,
	numRows int,
) (stmt *sql.Stmt, err error)

PrepareInsertStmt prepares an insert statement using driver-specific syntax from drvr. numRows specifies how many rows of values are inserted by each execution of the insert statement (1 row being the prototypical usage).
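
To make the role of numRows concrete, here is a minimal sketch of the kind of SQL text such a statement might contain. The helper buildInsertSQL is hypothetical: it uses "?" placeholders and performs no identifier quoting, whereas the real function delegates to driver-specific syntax.

```go
package main

import (
	"fmt"
	"strings"
)

// comma mirrors the package's Comma constant.
const comma = ", "

// buildInsertSQL renders a multi-row INSERT with one parenthesized group
// of "?" placeholders per row. Quoting and dialect differences are omitted.
func buildInsertSQL(tbl string, cols []string, numRows int) string {
	// Build one placeholder group, e.g. "(?, ?)" for two columns.
	group := "(" + strings.TrimSuffix(strings.Repeat("?"+comma, len(cols)), comma) + ")"

	groups := make([]string, numRows)
	for i := range groups {
		groups[i] = group
	}

	return "INSERT INTO " + tbl +
		" (" + strings.Join(cols, comma) + ") VALUES " +
		strings.Join(groups, comma)
}

func main() {
	cols := []string{"first_name", "last_name"}
	fmt.Println(buildInsertSQL("actor", cols, 1))
	fmt.Println(buildInsertSQL("actor", cols, 2))
}
```

Each execution of the two-row statement would then take four arguments: the values for row one followed by the values for row two.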

Types

type BatchInsert

type BatchInsert struct {
	// RecordCh is the channel that the caller sends records on. The
	// caller must close RecordCh when done.
	RecordCh chan<- []any

	// ErrCh returns any errors that occur during insert. ErrCh is
	// closed by BatchInsert when processing is complete.
	ErrCh <-chan error
	// contains filtered or unexported fields
}

BatchInsert encapsulates inserting records to a db. The caller sends records on RecordCh; each record's values should first be munged via the Munge method. Records are written to db in batches of batchSize, as passed to NewBatchInsert (the final batch may be smaller than batchSize). The caller must close RecordCh to indicate that all records have been sent, or cancel the ctx passed to NewBatchInsert to stop the insertion goroutine. Any error is returned on ErrCh. Processing is complete when ErrCh is closed: the caller must select on ErrCh.
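
The channel protocol described above can be sketched without a database. In the example below, startBatcher, batchWriter and insertAll are hypothetical names, and the write step is a plain function rather than a prepared statement; it only illustrates the send, close, and wait-for-close sequence, and is not the package's implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
)

// batchWriter is a stand-in for the database write; it receives one batch.
type batchWriter func(ctx context.Context, batch [][]any) error

// startBatcher starts a goroutine that reads records from the returned
// channel, writes them in batches of batchSize, and reports any error on
// errCh. errCh is closed when processing is complete.
func startBatcher(ctx context.Context, batchSize int, write batchWriter) (chan<- []any, <-chan error, *atomic.Int64) {
	recCh := make(chan []any, batchSize)
	errCh := make(chan error, 1)
	written := &atomic.Int64{}

	go func() {
		defer close(errCh)
		batch := make([][]any, 0, batchSize)
		flush := func() error {
			if len(batch) == 0 {
				return nil
			}
			if err := write(ctx, batch); err != nil {
				return err
			}
			written.Add(int64(len(batch)))
			batch = batch[:0]
			return nil
		}
		for {
			select {
			case <-ctx.Done():
				errCh <- ctx.Err()
				return
			case rec, ok := <-recCh:
				if !ok {
					// Sender closed the channel: write the final partial batch.
					if err := flush(); err != nil {
						errCh <- err
					}
					return
				}
				batch = append(batch, rec)
				if len(batch) == batchSize {
					if err := flush(); err != nil {
						errCh <- err
						return
					}
				}
			}
		}
	}()
	return recCh, errCh, written
}

// insertAll sends n single-value records and returns the observed batch sizes.
func insertAll(n, batchSize int) (sizes []int, written int64, err error) {
	write := func(_ context.Context, batch [][]any) error {
		sizes = append(sizes, len(batch))
		return nil
	}
	recCh, errCh, counter := startBatcher(context.Background(), batchSize, write)
	for i := 0; i < n; i++ {
		select {
		case recCh <- []any{i}:
		case err = <-errCh: // The batcher stopped early.
			return sizes, counter.Load(), err
		}
	}
	close(recCh)
	err = <-errCh // Yields nil if errCh was closed without an error.
	return sizes, counter.Load(), err
}

func main() {
	sizes, written, err := insertAll(5, 2)
	fmt.Println(sizes, written, err)
}
```

Reading the written count only after the error channel is closed matches the guidance given for the Written method.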

func NewBatchInsert

func NewBatchInsert(ctx context.Context, drvr SQLDriver, db sqlz.DB,
	destTbl string, destColNames []string, batchSize int,
) (*BatchInsert, error)

NewBatchInsert returns a new BatchInsert instance. The internal goroutine is started.

Note that the db arg must guarantee a single connection: that is, it must be a sql.Conn or sql.Tx.

func (BatchInsert) Munge

func (bi BatchInsert) Munge(rec []any) error

Munge should be invoked on every record before sending on RecordCh.

func (*BatchInsert) Written

func (bi *BatchInsert) Written() int64

Written returns the number of records inserted (at the time of invocation). For the final value, Written should be invoked after ErrCh is closed.

type Database

type Database interface {
	// DB returns the sql.DB object for this Database.
	DB() *sql.DB

	// SQLDriver returns the underlying database driver. This
	// may be different from the type reported by the
	// Database source.
	SQLDriver() SQLDriver

	// Source returns the data source for which this connection was opened.
	Source() *source.Source

	// SourceMetadata returns metadata about the data source.
	SourceMetadata(ctx context.Context) (*source.Metadata, error)

	// TableMetadata returns metadata for the specified table in the data source.
	TableMetadata(ctx context.Context, tblName string) (*source.TableMetadata, error)

	// Close is invoked to close and release any underlying resources.
	Close() error
}

Database models a database handle. It is conceptually equivalent to stdlib sql.DB, and in fact encapsulates a sql.DB instance. The realized sql.DB instance can be accessed via the DB method.

type DatabaseOpener

type DatabaseOpener interface {
	// Open returns a Database instance for src.
	Open(ctx context.Context, src *source.Source) (Database, error)
}

DatabaseOpener opens a Database.

type Databases

type Databases struct {
	// contains filtered or unexported fields
}

Databases provides a mechanism for getting Database instances. Note that at this time instances returned by Open are cached and then closed by Close. This may be a bad approach.

func NewDatabases

func NewDatabases(log *slog.Logger, drvrs Provider, scratchSrcFn ScratchSrcFunc) *Databases

NewDatabases returns a Databases instance.

func (*Databases) Close

func (d *Databases) Close() error

Close closes d, invoking Close on any instances opened via d.Open.

func (*Databases) Open

func (d *Databases) Open(ctx context.Context, src *source.Source) (Database, error)

Open returns an opened Database for src. The returned Database may be cached and returned on future invocations for the same handle. Thus, the caller should typically not close the Database: it will be closed via d.Close.

NOTE: This entire logic re caching/not-closing is a bit sketchy, and needs to be revisited.

Open implements DatabaseOpener.

func (*Databases) OpenJoin

func (d *Databases) OpenJoin(ctx context.Context, src1, src2 *source.Source, srcN ...*source.Source) (Database, error)

OpenJoin opens an appropriate database for use as a work DB for joining across sources.

Note: There is much work to be done on this method. At this time, only two sources are supported. Ultimately OpenJoin should be able to inspect the join srcs and use heuristics to determine the best location for the join to occur (to minimize copying of data for the join etc.). Currently the implementation simply delegates to OpenScratch.

OpenJoin implements JoinDatabaseOpener.

func (*Databases) OpenScratch

func (d *Databases) OpenScratch(ctx context.Context, name string) (Database, error)

OpenScratch returns a scratch database instance. It is not necessary for the caller to close the returned Database as its Close method will be invoked by d.Close.

OpenScratch implements ScratchDatabaseOpener.

type Driver

type Driver interface {
	DatabaseOpener

	// DriverMetadata returns driver metadata.
	DriverMetadata() Metadata

	// ValidateSource verifies that the source is valid for this driver. It
	// may transform the source into a canonical form, which is returned in
	// the "src" return value (the original source is not changed). An error
	// is returned if the source is invalid.
	ValidateSource(source *source.Source) (src *source.Source, err error)

	// Ping verifies that the source is reachable, or returns an error if not.
	// The exact behavior of Ping() is driver-dependent.
	Ping(ctx context.Context, src *source.Source) error

	// Truncate truncates tbl in src. If arg reset is true, the
	// identity counter for tbl should be reset, if supported
	// by the driver. Some DB impls may reset the identity
	// counter regardless of the val of reset.
	Truncate(ctx context.Context, src *source.Source, tbl string, reset bool) (affected int64, err error)
}

Driver is the interface that must be implemented for a data source type.

type InsertMungeFunc

type InsertMungeFunc func(vals sqlz.Record) error

InsertMungeFunc is invoked on vals before insertion (or update, despite the name). Note that InsertMungeFunc operates on the vals slice, while NewRecordFunc returns a new slice.

func DefaultInsertMungeFunc

func DefaultInsertMungeFunc(destTbl string, destMeta sqlz.RecordMeta) InsertMungeFunc

DefaultInsertMungeFunc returns an InsertMungeFunc that checks the values of rec against destMeta and performs necessary munging. For example, if any element is a ptr to an empty string and the dest type is not of kind Text, the empty string was probably intended to mean nil. This happens when the original source doesn't handle nil, e.g. with CSV, where nil is effectively represented by "".

The returned InsertMungeFunc accounts for common cases, but it's possible that certain databases will require a custom InsertMungeFunc.
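
The empty-string case can be illustrated as follows. The kind type and the mungeEmptyStrings function are hypothetical stand-ins for the package's kind and RecordMeta types; only the one rule described above is implemented.

```go
package main

import "fmt"

// kind is a hypothetical stand-in for the destination column kind.
type kind int

const (
	kindText kind = iota
	kindInt
)

// mungeEmptyStrings replaces, in place, any pointer to an empty string with
// nil when the destination column is not of kind text.
func mungeEmptyStrings(destKinds []kind, vals []any) error {
	if len(vals) != len(destKinds) {
		return fmt.Errorf("expected %d values but got %d", len(destKinds), len(vals))
	}
	for i, v := range vals {
		s, ok := v.(*string)
		if ok && s != nil && *s == "" && destKinds[i] != kindText {
			vals[i] = nil
		}
	}
	return nil
}

func main() {
	empty := ""
	vals := []any{&empty, &empty}

	// Column 0 is text, so "" is kept; column 1 is an int, so "" becomes nil.
	if err := mungeEmptyStrings([]kind{kindText, kindInt}, vals); err != nil {
		fmt.Println("munge failed:", err)
		return
	}
	fmt.Println(vals[0] != nil, vals[1] == nil)
}
```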

type JoinDatabaseOpener

type JoinDatabaseOpener interface {
	// OpenJoin opens an appropriate Database for use as
	// a work DB for joining across sources.
	OpenJoin(ctx context.Context, src1, src2 *source.Source, srcN ...*source.Source) (Database, error)
}

JoinDatabaseOpener can open a join database.

type Metadata

type Metadata struct {
	// Type is the driver source type, e.g. "mysql" or "csv", etc.
	Type source.Type `json:"type"`

	// Description is typically the long name of the driver, e.g.
	// "MySQL" or "Microsoft Excel XLSX".
	Description string `json:"description"`

	// Doc is optional documentation, typically a URL.
	Doc string `json:"doc,omitempty"`

	// UserDefined is true if this driver is the product of a
	// user driver definition, and false if built-in.
	UserDefined bool `json:"user_defined"`

	// IsSQL is true if this driver is a SQL driver.
	IsSQL bool `json:"is_sql"`

	// Monotable is true if this is a non-SQL document type that
	// effectively has a single table, such as CSV.
	Monotable bool `json:"monotable"`
}

Metadata holds driver metadata.

type NewRecordFunc

type NewRecordFunc func(scanRow []any) (rec sqlz.Record, err error)

NewRecordFunc is invoked on a query result row (scanRow) to normalize and standardize the data, returning a new record. The provided scanRow arg is available for reuse after this func returns.

Ultimately rec should only contain:

nil, *int64, *bool, *float64, *string, *[]byte, *time.Time

Thus a func instance might unbox sql.NullString et al, or deal with any driver specific quirks.

type Provider

type Provider interface {
	// DriverFor returns a driver instance for the given type.
	DriverFor(typ source.Type) (Driver, error)
}

Provider is a factory that returns Driver instances.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry provides access to driver implementations.

func NewRegistry

func NewRegistry(log *slog.Logger) *Registry

NewRegistry returns a new Registry instance that provides access to driver implementations. Note that Registry implements Provider.

func (*Registry) AddProvider

func (r *Registry) AddProvider(typ source.Type, p Provider)

AddProvider registers the provider for the specified driver type. This method has no effect if there's already a provider for typ.

func (*Registry) DriverFor

func (r *Registry) DriverFor(typ source.Type) (Driver, error)

DriverFor implements Provider.

func (*Registry) Drivers

func (r *Registry) Drivers() []Driver

Drivers returns the registered drivers.

func (*Registry) DriversMetadata

func (r *Registry) DriversMetadata() []Metadata

DriversMetadata returns metadata for each registered driver type.

func (*Registry) ProviderFor

func (r *Registry) ProviderFor(typ source.Type) Provider

ProviderFor returns the provider for typ, or nil if no registered provider.
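
The documented Registry behavior (AddProvider is a no-op for an already registered type, ProviderFor returns nil when nothing is registered) can be modeled with a small map-backed type. Everything in this sketch is hypothetical, including the use of a mutex and the string stand-in for a Driver; it is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// driverType is a stand-in for source.Type.
type driverType string

// provider is a trimmed-down counterpart of Provider; the "driver" it
// returns is just a string for the purposes of this sketch.
type provider interface {
	DriverFor(typ driverType) (string, error)
}

// staticProvider always returns its own value as the driver.
type staticProvider string

func (p staticProvider) DriverFor(driverType) (string, error) { return string(p), nil }

type registry struct {
	mu        sync.Mutex
	providers map[driverType]provider
}

func newRegistry() *registry {
	return &registry{providers: map[driverType]provider{}}
}

// AddProvider registers p for typ. It has no effect if typ already has one.
func (r *registry) AddProvider(typ driverType, p provider) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.providers[typ]; ok {
		return
	}
	r.providers[typ] = p
}

// ProviderFor returns the provider for typ, or nil if none is registered.
func (r *registry) ProviderFor(typ driverType) provider {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.providers[typ]
}

// DriverFor makes registry itself satisfy provider by delegating.
func (r *registry) DriverFor(typ driverType) (string, error) {
	p := r.ProviderFor(typ)
	if p == nil {
		return "", fmt.Errorf("no registered driver for %q", typ)
	}
	return p.DriverFor(typ)
}

func main() {
	r := newRegistry()
	r.AddProvider("csv", staticProvider("first"))
	r.AddProvider("csv", staticProvider("second")) // Ignored: already registered.

	d, err := r.DriverFor("csv")
	fmt.Println(d, err)

	_, err = r.DriverFor("mysql")
	fmt.Println(err)
}
```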

type SQLConfig added in v0.31.0

type SQLConfig struct {
	MaxOpenConns    int
	MaxIdleConns    int
	ConnMaxIdleTime time.Duration
	ConnMaxLifetime time.Duration
}

SQLConfig encapsulates settings for sql.DB.

func (*SQLConfig) Apply added in v0.31.0

func (c *SQLConfig) Apply(db *sql.DB)

Apply applies c to db.
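
A plausible reading of Apply is that it forwards each field to the corresponding sql.DB setter. The sketch below assumes exactly that; the lowercase sqlConfig type, the apply method, and the stub connector (used so the example runs without a real database) are all hypothetical.

```go
package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"errors"
	"fmt"
	"time"
)

// sqlConfig mirrors the fields of SQLConfig.
type sqlConfig struct {
	MaxOpenConns    int
	MaxIdleConns    int
	ConnMaxIdleTime time.Duration
	ConnMaxLifetime time.Duration
}

// apply forwards each setting to the matching sql.DB setter.
func (c *sqlConfig) apply(db *sql.DB) {
	db.SetMaxOpenConns(c.MaxOpenConns)
	db.SetMaxIdleConns(c.MaxIdleConns)
	db.SetConnMaxIdleTime(c.ConnMaxIdleTime)
	db.SetConnMaxLifetime(c.ConnMaxLifetime)
}

// stubConnector lets us create a *sql.DB without a real database.
// It never produces a usable connection.
type stubConnector struct{}

func (stubConnector) Connect(context.Context) (driver.Conn, error) {
	return nil, errors.New("stub: no real database")
}

func (stubConnector) Driver() driver.Driver { return stubDriver{} }

type stubDriver struct{}

func (stubDriver) Open(string) (driver.Conn, error) {
	return nil, errors.New("stub: no real database")
}

// newStubDB returns a pool handle backed by the stub connector.
func newStubDB() *sql.DB { return sql.OpenDB(stubConnector{}) }

func main() {
	db := newStubDB()
	defer db.Close()

	// These values match the defaults shown in the Tuning variable.
	cfg := &sqlConfig{
		MaxOpenConns:    50,
		MaxIdleConns:    8,
		ConnMaxIdleTime: 10 * time.Second,
		ConnMaxLifetime: 0,
	}
	cfg.apply(db)

	fmt.Println("max open connections:", db.Stats().MaxOpenConnections)
}
```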

type SQLDriver

type SQLDriver interface {
	Driver

	// Dialect returns the SQL dialect.
	Dialect() dialect.Dialect

	// Renderer returns the SQL renderer for this driver.
	Renderer() *render.Renderer

	// CurrentSchema returns the current schema name.
	CurrentSchema(ctx context.Context, db sqlz.DB) (string, error)

	// TableColumnTypes returns the column type info from
	// the SQL driver. If len(colNames) is 0, info is returned
	// for all columns in the table.
	TableColumnTypes(ctx context.Context, db sqlz.DB, tblName string, colNames []string) ([]*sql.ColumnType, error)

	// RecordMeta returns the result metadata (the metadata for
	// each col) from colTypes. RecordMeta is preferred over
	// sql.Rows.ColumnTypes because of the inconsistent behavior
	// of various SQL driver implementations wrt reporting
	// "nullable" information and other quirks. The returned
	// metadata may differ from the original metadata returned
	// by rows.ColumnTypes.
	//
	// The caller should typically invoke rows.Next before
	// this method is invoked, as some implementations do not return
	// complete column type info until after the first call to rows.Next.
	//
	// RecordMeta also returns a NewRecordFunc which can be
	// applied to the scan row from sql.Rows.
	RecordMeta(colTypes []*sql.ColumnType) (sqlz.RecordMeta, NewRecordFunc, error)

	// PrepareInsertStmt prepares a statement for inserting
	// values to destColNames in destTbl. numRows specifies
	// how many rows of values are inserted by each execution of
	// the insert statement (1 row being the prototypical usage).
	// It is the caller's responsibility to close the execer.
	//
	// Note that db must guarantee a single connection: that is, db
	// must be a sql.Conn or sql.Tx.
	PrepareInsertStmt(ctx context.Context, db sqlz.DB, destTbl string, destColNames []string, numRows int) (*StmtExecer,
		error)

	// PrepareUpdateStmt prepares a statement for updating destColNames in
	// destTbl, using the supplied where clause (which may be empty).
	// The where arg should use question mark "?" as the placeholder: it will
	// be translated to the appropriate driver-specific placeholder. For example,
	// the where arg could be:
	//
	//   "actor_id = ? AND first_name = ?".
	//
	// Use the returned StmtExecer per its documentation. It is the caller's
	// responsibility to close the execer.
	//
	// Note that db must guarantee a single connection: that is, db
	// must be a sql.Conn or sql.Tx.
	PrepareUpdateStmt(ctx context.Context, db sqlz.DB, destTbl string, destColNames []string,
		where string) (*StmtExecer, error)

	// CreateTable creates the table defined by tblDef. Some implementations
	// may not honor all of the fields of tblDef, e.g. an impl might not
	// build the foreign key constraints. At a minimum the implementation
	// must honor the table name and column names and kinds from tblDef.
	CreateTable(ctx context.Context, db sqlz.DB, tblDef *sqlmodel.TableDef) error

	// TableExists returns true if there's an existing table tbl in db.
	TableExists(ctx context.Context, db sqlz.DB, tbl string) (bool, error)

	// CopyTable copies fromTable into a new table toTable.
	// If copyData is true, fromTable's data is also copied.
	// Constraints (keys, defaults etc.) may not be copied. The
	// number of copied rows is returned in copied.
	CopyTable(ctx context.Context, db sqlz.DB, fromTable, toTable string, copyData bool) (copied int64, err error)

	// DropTable drops tbl from db. If ifExists is true, an "IF EXISTS"
	// or equivalent clause is added, if supported.
	DropTable(ctx context.Context, db sqlz.DB, tbl string, ifExists bool) error

	// AlterTableRename renames a table.
	AlterTableRename(ctx context.Context, db sqlz.DB, tbl, newName string) error

	// AlterTableAddColumn adds column col to tbl. The column is appended
	// to the list of columns (that is, the column position cannot be
	// specified).
	AlterTableAddColumn(ctx context.Context, db sqlz.DB, tbl, col string, knd kind.Kind) error

	// AlterTableRenameColumn renames a column.
	AlterTableRenameColumn(ctx context.Context, db sqlz.DB, tbl, col, newName string) error
}

SQLDriver is implemented by Driver instances for SQL databases.
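
The PrepareUpdateStmt comment mentions translating "?" in the where clause to a driver-specific placeholder. As one illustration, the sketch below rewrites question marks as numbered dollar placeholders, the style used by PostgreSQL. The function toDollarPlaceholders is hypothetical and deliberately naive: it does not skip question marks inside quoted string literals.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// toDollarPlaceholders rewrites each "?" in where as a numbered
// placeholder: $1, $2, and so on, in order of appearance.
func toDollarPlaceholders(where string) string {
	var sb strings.Builder
	n := 0
	for _, r := range where {
		if r == '?' {
			n++
			sb.WriteString("$" + strconv.Itoa(n))
			continue
		}
		sb.WriteRune(r)
	}
	return sb.String()
}

func main() {
	fmt.Println(toDollarPlaceholders("actor_id = ? AND first_name = ?"))
}
```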

type ScratchDatabaseOpener

type ScratchDatabaseOpener interface {
	// OpenScratch returns a database for scratch use.
	OpenScratch(ctx context.Context, name string) (Database, error)
}

ScratchDatabaseOpener opens a scratch database. A scratch database is typically a short-lived database used as a target for loading non-SQL data (such as CSV).

type ScratchSrcFunc

type ScratchSrcFunc func(log *slog.Logger, name string) (src *source.Source, cleanFn func() error, err error)

ScratchSrcFunc is a function that returns a scratch source. The caller is responsible for invoking cleanFn.

type StmtExecFunc

type StmtExecFunc func(ctx context.Context, args ...any) (affected int64, err error)

StmtExecFunc is provided by driver implementations to wrap execution of a prepared statement. Typically the func will perform some driver-specific action, such as managing retryable errors.
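
One way a driver might manage retryable errors is to wrap the underlying exec call in a retry loop whose delay is capped, in the spirit of Tuning.MaxRetryInterval. The sketch below is an assumption about the general shape only: execFunc, withRetry, errBusy and runDemo are hypothetical names, and the doubling backoff is a design choice of this example rather than something stated in the documentation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// execFunc has the same shape as StmtExecFunc.
type execFunc func(ctx context.Context, args ...any) (affected int64, err error)

// errBusy is a hypothetical retryable error.
var errBusy = errors.New("database is busy")

// withRetry wraps fn so that errBusy is retried with a doubling delay
// capped at maxInterval, until fn succeeds or ctx is done.
func withRetry(fn execFunc, maxInterval time.Duration) execFunc {
	return func(ctx context.Context, args ...any) (int64, error) {
		delay := time.Millisecond
		for {
			affected, err := fn(ctx, args...)
			if err == nil || !errors.Is(err, errBusy) {
				return affected, err
			}
			select {
			case <-ctx.Done():
				return 0, ctx.Err()
			case <-time.After(delay):
			}
			delay *= 2
			if delay > maxInterval {
				delay = maxInterval
			}
		}
	}
}

// runDemo executes a func that fails with errBusy the first `failures` times.
func runDemo(failures int) (affected int64, calls int, err error) {
	flaky := func(context.Context, ...any) (int64, error) {
		calls++
		if calls <= failures {
			return 0, errBusy
		}
		return 1, nil
	}
	affected, err = withRetry(flaky, 5*time.Millisecond)(context.Background(), "alice")
	return affected, calls, err
}

func main() {
	affected, calls, err := runDemo(2)
	fmt.Println(affected, calls, err)
}
```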

type StmtExecer

type StmtExecer struct {
	// contains filtered or unexported fields
}

StmtExecer encapsulates the elements required to execute a SQL statement. Typically the statement is an INSERT. The Munge method should be applied to each row of values prior to invoking Exec. The caller is responsible for invoking Close.

func NewStmtExecer

func NewStmtExecer(stmt *sql.Stmt, mungeFn InsertMungeFunc, execFn StmtExecFunc, destMeta sqlz.RecordMeta) *StmtExecer

NewStmtExecer returns a new StmtExecer instance. The caller is responsible for invoking Close on the returned StmtExecer.

func (*StmtExecer) Close

func (x *StmtExecer) Close() error

Close closes x's statement.

func (*StmtExecer) DestMeta

func (x *StmtExecer) DestMeta() sqlz.RecordMeta

DestMeta returns the RecordMeta for the destination table columns.

func (*StmtExecer) Exec

func (x *StmtExecer) Exec(ctx context.Context, args ...any) (affected int64, err error)

Exec executes the statement. The caller should invoke Munge on each row of values prior to passing those values to Exec.

func (*StmtExecer) Munge

func (x *StmtExecer) Munge(rec []any) error

Munge should be applied to each row of values prior to invoking Exec.

Directories

Path Synopsis
Package dialect contains functionality for SQL dialects.