sql

package
v0.0.0-...-8b3bc45 Latest
Published: Jan 27, 2022 License: Apache-2.0 Imports: 15 Imported by: 3

Documentation

Constants

const (
	// DefaultFlowCheckpoints is the default table for checkpoints.
	DefaultFlowCheckpoints = "flow_checkpoints_v1"
	// DefaultFlowMaterializations is the default table for materialization specs.
	DefaultFlowMaterializations = "flow_materializations_v2"
)
const TypeLengthPlaceholder = "?"

TypeLengthPlaceholder is the placeholder string that may appear in the SQL string, which will be replaced by the MaxLength of the string.
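
The substitution described above can be sketched as follows. This is a minimal illustration rather than the package's implementation: `withLength` is a hypothetical helper, and replacing only the first placeholder is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// typeLengthPlaceholder mirrors the documented constant value "?".
const typeLengthPlaceholder = "?"

// withLength is a hypothetical helper (not part of the package). It replaces
// the first placeholder in a SQL type template with the maximum length.
func withLength(sqlTemplate string, maxLength uint32) string {
	length := strconv.FormatUint(uint64(maxLength), 10)
	return strings.Replace(sqlTemplate, typeLengthPlaceholder, length, 1)
}

func main() {
	fmt.Println(withLength("VARCHAR(?)", 42))
}
```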

Variables

var (
	// DefaultUnwrappedIdentifiers is a SkipWrapper function that checks for identifiers that
	// typically do not need wrapping.
	DefaultUnwrappedIdentifiers = regexp.MustCompile(`^[_\pL]+[_\pL\pN]*$`).MatchString

	// DefaultQuoteSanitizer used for sanitizing fields in SQL.
	DefaultQuoteSanitizer = strings.NewReplacer("'", "''").Replace
)
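
To show what these two values do, the sketch below rebuilds them locally from the same standard-library expressions shown above. The lowercase variable names are local stand-ins, not package identifiers.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

var (
	// Same expression as DefaultUnwrappedIdentifiers: one or more underscores
	// or letters, followed by any mix of underscores, letters, and digits.
	unwrapped = regexp.MustCompile(`^[_\pL]+[_\pL\pN]*$`).MatchString

	// Same expression as DefaultQuoteSanitizer: doubles each single quote.
	sanitize = strings.NewReplacer("'", "''").Replace
)

func main() {
	for _, id := range []string{"user_id", "col2", "2nd_col", "my column"} {
		fmt.Printf("%q needs no wrapping: %v\n", id, unwrapped(id))
	}
	fmt.Println(sanitize("O'Brien"))
}
```

Identifiers that start with a digit or contain spaces fail the match, so a renderer would be expected to quote them.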

Functions

func BackticksWrapper

func BackticksWrapper() func(text string) string

BackticksWrapper returns a wrapper function with a single backtick character on both the Left and the Right.

func DoubleQuotesWrapper

func DoubleQuotesWrapper() func(text string) string

DoubleQuotesWrapper returns a wrapper function with a single double quote character on both the Left and the Right.

func DumpTables

func DumpTables(db *sql.DB, tables ...*Table) (string, error)

DumpTables is a convenience for testing which dumps the contents of the given tables into a debug string suitable for snapshotting.

func Identity

func Identity(elem interface{}) (interface{}, error)

Identity is an identity function for no-op conversions of tuple elements to `interface{}` values that are suitable for use as sql parameters.

func PostgresParameterPlaceholder

func PostgresParameterPlaceholder(parameterIndex int) string

PostgresParameterPlaceholder returns $N style parameters where N is the parameter number starting at 1.

func QuestionMarkPlaceholder

func QuestionMarkPlaceholder(_ int) string

QuestionMarkPlaceholder returns the constant string "?".
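
The two placeholder styles can be compared with a small local sketch. The functions below are stand-ins written for illustration. Treating the incoming index as zero-based (so that numbering starts at $1) is an assumption, since the documentation only says that N starts at 1.

```go
package main

import "fmt"

// postgresPlaceholder sketches $N-style placeholders.
// ASSUMPTION: parameterIndex is zero-based, so 1 is added to start at $1.
func postgresPlaceholder(parameterIndex int) string {
	return fmt.Sprintf("$%d", parameterIndex+1)
}

// questionMarkPlaceholder ignores the index and always returns "?".
func questionMarkPlaceholder(_ int) string {
	return "?"
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(postgresPlaceholder(i), questionMarkPlaceholder(i))
	}
}
```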

func SingleQuotesWrapper

func SingleQuotesWrapper() func(text string) string

SingleQuotesWrapper returns a wrapper function with one single quote character on both the Left and the Right.

func ValidateMatchesExisting

func ValidateMatchesExisting(existing *pf.MaterializationSpec_Binding, proposed *pf.CollectionSpec) map[string]*pm.Constraint

ValidateMatchesExisting returns a set of constraints to use when there is a new proposed CollectionSpec for a materialization that is already running, or has been Applied. The returned constraints will explicitly require all fields that are currently materialized, as long as they are not unsatisfiable, and forbid any fields that are not currently materialized.

func ValidateNewSQLProjections

func ValidateNewSQLProjections(proposed *pf.CollectionSpec, deltaUpdates bool) map[string]*pm.Constraint

ValidateNewSQLProjections returns a set of constraints for a proposed flow collection for a **new** materialization (one that is not running and has never been Applied). Note that this will "recommend" all projections of single scalar types, which is what drives the default field selection in flowctl.

func ValidateSelectedFields

func ValidateSelectedFields(constraints map[string]*pm.Constraint, proposed *pf.MaterializationSpec_Binding) error

ValidateSelectedFields validates a proposed MaterializationSpec against a set of constraints. If any constraints would be violated, then an error is returned.

Types

type Column

type Column struct {
	// The Name of the column, which is used when referencing columns by name when calling SQL
	// generation functions. This value should not include any quotes or escape characters.
	Name string
	// Identifier is the final form of the column name, exactly as it should be represented in SQL
	// statements. If quoting is necessary, then the quotes must be included here.
	Identifier string
	// Comment is optional text that will be used only on CREATE TABLE statements
	Comment string
	// PrimaryKey is true if this column is the primary key, or if it is part of a composite key.
	PrimaryKey bool
	// Type is the application type of the data. This corresponds closely to JSON types, but
	// includes "binary" and excludes "null". Unlike Flow Projections, a Column may only have a
	// single type, and nullability is represented as a separate boolean rather than a type itself.
	Type ColumnType
	// StringType is optional additional type information for strings.
	StringType *StringTypeInfo
	// NotNull is true if the database columns should disallow null values.
	NotNull bool
}

Column describes a SQL table column that will hold JSON values

func ColumnForProjection

func ColumnForProjection(projection *pf.Projection, identifierRenderer *Renderer) Column

ColumnForProjection returns a Column that is appropriate for storing values from the given Projection.

type ColumnType

type ColumnType string

ColumnType represents a minimal set of database-agnostic types that we may try to store and query. This set of types is slightly different than the set of JSON types. This has a "binary" type for dealing with byte slices, and there is no "null" type, since nullability is modeled separately.

const (
	STRING  ColumnType = "string"
	BOOLEAN ColumnType = "boolean"
	INTEGER ColumnType = "integer"
	NUMBER  ColumnType = "number"
	OBJECT  ColumnType = "object"
	ARRAY   ColumnType = "array"
	BINARY  ColumnType = "binary"
)

ColumnType constants that are used by ColumnTypeMapper

type ColumnTypeMapper

type ColumnTypeMapper map[ColumnType]TypeMapper

ColumnTypeMapper selects a specific TypeMapper based on the type of the data that will be passed as a parameter for inserts or updates to the column.

func (ColumnTypeMapper) GetColumnType

func (amap ColumnTypeMapper) GetColumnType(col *Column) (*ResolvedColumnType, error)

GetColumnType implements the TypeMapper interface

type CommentConfig

type CommentConfig struct {
	// Linewise determines whether to render line or block comments. If it is true, then each line
	// of comment text will be wrapped separately. If false, then the entire multi-line block of
	// comment text will be wrapped once.
	Linewise bool
	// Wrap holds the strings that will bound the beginning and end of the comment.
	Wrap TokenPair
}

CommentConfig determines how SQL comments are rendered.

func LineComment

func LineComment() CommentConfig

LineComment returns a CommentConfig configured for standard SQL line comments, which begin each line with a double dash ("-- ").

type CommentRenderer

type CommentRenderer struct {
	// Linewise determines whether to render line or block comments. If it is true, then each line
	// of comment text will be wrapped separately. If false, then the entire multi-line block of
	// comment text will be wrapped once.
	Linewise bool
	// Wrap holds the strings that will bound the beginning and end of the comment.
	Wrap *TokenPair
}

CommentRenderer is used to render comments in SQL.

func LineCommentRenderer

func LineCommentRenderer() *CommentRenderer

LineCommentRenderer returns a per line comment valid for standard SQL.

func (*CommentRenderer) Render

func (cr *CommentRenderer) Render(text string) string

Render takes a string and renders it as a comment based on its configuration.

func (*CommentRenderer) Write

func (cr *CommentRenderer) Write(w io.Writer, text string, indent string) (int, error)

Write renders a comment based on the rules.
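
The difference between linewise and block rendering can be sketched as below. The types are local stand-ins that mirror the documented Linewise and Wrap fields. The rendering logic is an assumption based on the field comments, not the package's code.

```go
package main

import (
	"fmt"
	"strings"
)

// tokenPair and commentRenderer are local stand-ins for the documented types.
type tokenPair struct{ Left, Right string }

type commentRenderer struct {
	Linewise bool
	Wrap     tokenPair
}

// render wraps each line separately when Linewise is true, and wraps the
// whole text once otherwise.
func (cr commentRenderer) render(text string) string {
	if !cr.Linewise {
		return cr.Wrap.Left + text + cr.Wrap.Right
	}
	lines := strings.Split(text, "\n")
	for i, line := range lines {
		lines[i] = cr.Wrap.Left + line + cr.Wrap.Right
	}
	return strings.Join(lines, "\n")
}

func main() {
	line := commentRenderer{Linewise: true, Wrap: tokenPair{Left: "-- "}}
	block := commentRenderer{Linewise: false, Wrap: tokenPair{Left: "/* ", Right: " */"}}
	fmt.Println(line.render("first\nsecond"))
	fmt.Println(block.render("first\nsecond"))
}
```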

type ConstColumnType

type ConstColumnType ResolvedColumnType

ConstColumnType is a ResolvedColumnType that is known statically at compile time.

func RawConstColumnType

func RawConstColumnType(sql string) ConstColumnType

RawConstColumnType returns a ConstColumnType that always uses the given sql string as DDL and performs a no-op value conversion.

func (ConstColumnType) GetColumnType

func (c ConstColumnType) GetColumnType(col *Column) (*ResolvedColumnType, error)

GetColumnType implements the TypeMapper interface

type Driver

type Driver struct {
	// URL at which documentation for the driver may be found.
	DocumentationURL string
	// Instance of the type into which endpoint specifications are parsed.
	EndpointSpecType interface{}
	// Instance of the type into which resource specifications are parsed.
	ResourceSpecType Resource
	// NewEndpoint returns an Endpoint, which will be used to handle interactions with the database.
	NewEndpoint func(context.Context, json.RawMessage) (Endpoint, error)
	// NewResource returns an uninitialized Resource which may be parsed into.
	NewResource func(ep Endpoint) Resource
	// NewTransactor returns a Transactor ready for pm.RunTransactions.
	NewTransactor func(context.Context, Endpoint, *pf.MaterializationSpec, Fence, []Resource) (pm.Transactor, error)
}

Driver implements the pm.DriverServer interface.

func (*Driver) ApplyDelete

func (d *Driver) ApplyDelete(ctx context.Context, req *pm.ApplyRequest) (*pm.ApplyResponse, error)

ApplyDelete implements the DriverServer interface.

func (*Driver) ApplyUpsert

func (d *Driver) ApplyUpsert(ctx context.Context, req *pm.ApplyRequest) (*pm.ApplyResponse, error)

ApplyUpsert implements the DriverServer interface.

func (*Driver) Spec

func (d *Driver) Spec(ctx context.Context, req *pm.SpecRequest) (*pm.SpecResponse, error)

Spec implements the DriverServer interface.

func (*Driver) Transactions

func (d *Driver) Transactions(stream pm.Driver_TransactionsServer) error

Transactions implements the DriverServer interface.

func (*Driver) Validate

func (d *Driver) Validate(ctx context.Context, req *pm.ValidateRequest) (*pm.ValidateResponse, error)

Validate implements the DriverServer interface.

type Endpoint

type Endpoint interface {

	// LoadSpec loads the named MaterializationSpec and its version that's stored within the Endpoint, if any.
	LoadSpec(ctx context.Context, materialization pf.Materialization) (string, *pf.MaterializationSpec, error)

	// CreateTableStatement returns the SQL statement to create the specified table in the correct dialect.
	CreateTableStatement(table *Table) (string, error)

	// ExecuteStatements takes a slice of SQL statements and executes them as a single transaction
	// (or as multiple transactions if it's not possible for the implementation) and rolls back
	// if there is a failure.
	ExecuteStatements(ctx context.Context, statements []string) error

	// NewFence installs and returns a new endpoint specific Fence implementation. On return, all
	// older endpoints with matching materialization name and overlapping key-range will be
	// blocked from further database operations. This prevents rogue endpoints from committing
	// further transactions.
	NewFence(ctx context.Context, materialization pf.Materialization, keyBegin, keyEnd uint32) (Fence, error)

	// Generator returns the dialect specific SQL generator for the endpoint.
	Generator() *Generator

	// FlowTables returns the FlowTables definitions for this endpoint.
	FlowTables() *FlowTables
}

Endpoint is a SQL-compatible endpoint that supports dialect-specific tasks and generators.

type ExecFn

type ExecFn func(ctx context.Context, sql string, arguments ...interface{}) (rowsAffected int64, _ error)

ExecFn executes a |sql| statement with |arguments|, and returns the number of rows affected.

type Fence

type Fence interface {
	// Fetch the current checkpoint.
	Checkpoint() []byte
	// SetCheckpoint sets the current checkpoint.
	SetCheckpoint(checkpoint []byte)
	// LogEntry returns a logger Entry with context of the current fence to differentiate
	// concurrent threads in the logs.
	LogEntry() *logrus.Entry
}

Fence is an installed barrier in a shared checkpoints table which prevents other sessions from committing transactions under the fenced ID -- and prevents this Fence from committing where another session has in turn fenced this instance off.

type FlowTables

type FlowTables struct {
	Checkpoints *Table // Table of Flow checkpoints.
	Specs       *Table // Table of MaterializationSpecs.
}

FlowTables holds the table specifications for Flow.

func DefaultFlowTables

func DefaultFlowTables(prefix string) FlowTables

DefaultFlowTables returns the default Flow *Table configurations and names with optional prefix. The prefix can be used to prepend pre-table identifiers such as schema names.

type Generator

type Generator struct {
	Placeholder        func(int) string
	CommentRenderer    *CommentRenderer
	IdentifierRenderer *Renderer
	ValueRenderer      *Renderer
	TypeMappings       TypeMapper
}

Generator generates SQL for a large variety of SQL dialects using various configuration parameters.

func PostgresSQLGenerator

func PostgresSQLGenerator() Generator

PostgresSQLGenerator returns a Generator for the PostgreSQL dialect.

func SQLiteSQLGenerator

func SQLiteSQLGenerator() Generator

SQLiteSQLGenerator returns a Generator for the SQLite dialect.

func (*Generator) InsertStatement

func (gen *Generator) InsertStatement(table *Table) (string, ParametersConverter, error)

InsertStatement returns an insert statement for the given table that includes all columns. The returned sql will have a parameter placeholder for every column in the order they appear in the Table. This should generate a plain insert statement, not an upsert, since we'll know in advance whether each document exists or not, and only use the InsertStatement when we know the document does not exist.
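
A plain insert with one placeholder per column, in column order, might be assembled roughly as follows. `insertSQL` and `questionMark` are hypothetical local helpers, and the exact text the real Generator emits (quoting, whitespace, terminator) is not specified here.

```go
package main

import (
	"fmt"
	"strings"
)

// questionMark is a local placeholder function that always yields "?".
func questionMark(_ int) string { return "?" }

// insertSQL is a hypothetical helper. It builds a plain INSERT (not an
// upsert) with one placeholder per column, in the order the columns are given.
func insertSQL(table string, columns []string, placeholder func(int) string) string {
	params := make([]string, len(columns))
	for i := range columns {
		params[i] = placeholder(i)
	}
	return fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s);",
		table, strings.Join(columns, ", "), strings.Join(params, ", "))
}

func main() {
	fmt.Println(insertSQL("docs", []string{"id", "flow_document"}, questionMark))
}
```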

func (*Generator) QueryOnPrimaryKey

func (gen *Generator) QueryOnPrimaryKey(table *Table, selectColumns ...string) (string, ParametersConverter, error)

QueryOnPrimaryKey generates a query that has a placeholder parameter for each primary key in the order given in the table. Only selectColumns will be selected in the same order as provided.

func (*Generator) UpdateStatement

func (gen *Generator) UpdateStatement(table *Table, setColumns []string, whereColumns []string) (string, ParametersConverter, error)

UpdateStatement returns an update statement for the given table that sets the columns given in setColumns and matches based on the columns in whereColumns. The returned statement will have a placeholder parameter for each of the setColumns in the order given, followed by a parameter for each of the whereColumns in the order given.
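
The parameter ordering described above (setColumns first, then whereColumns) can be illustrated with a local sketch. `updateSQL` and `dollar` are hypothetical helpers. The zero-based index passed to the placeholder function is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// dollar yields $N placeholders. ASSUMPTION: the index is zero-based.
func dollar(i int) string { return fmt.Sprintf("$%d", i+1) }

// updateSQL is a hypothetical helper. Placeholders are numbered across the
// SET columns first and continue through the WHERE columns.
func updateSQL(table string, setColumns, whereColumns []string, placeholder func(int) string) string {
	n := 0
	sets := make([]string, len(setColumns))
	for i, c := range setColumns {
		sets[i] = c + " = " + placeholder(n)
		n++
	}
	wheres := make([]string, len(whereColumns))
	for i, c := range whereColumns {
		wheres[i] = c + " = " + placeholder(n)
		n++
	}
	return fmt.Sprintf("UPDATE %s SET %s WHERE %s;",
		table, strings.Join(sets, ", "), strings.Join(wheres, " AND "))
}

func main() {
	fmt.Println(updateSQL("docs", []string{"flow_document"}, []string{"id"}, dollar))
}
```

Callers must then supply arguments in that same order: values for the set columns, followed by values for the where columns.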

type LengthConstrainedColumnType

type LengthConstrainedColumnType ResolvedColumnType

LengthConstrainedColumnType is a TypeMapper that must always have a length argument, e.g. "VARCHAR(42)"

func (LengthConstrainedColumnType) GetColumnType

func (c LengthConstrainedColumnType) GetColumnType(col *Column) (*ResolvedColumnType, error)

GetColumnType implements the TypeMapper interface

type MaxLengthableColumnType

type MaxLengthableColumnType struct {
	WithoutLength *ConstColumnType
	WithLength    *LengthConstrainedColumnType
}

MaxLengthableColumnType is a TypeMapper that supports column types that may have a length argument (e.g. "VARCHAR(76)").
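
One plausible reading of this type is a choice between two DDL forms depending on whether the column carries a maximum length. The sketch below uses local stand-in types and plain strings in place of the real mappers. Treating a zero MaxLength as "no length" is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Local stand-ins for the documented StringTypeInfo and Column types.
type stringTypeInfo struct{ MaxLength uint32 }

type column struct{ StringType *stringTypeInfo }

// maxLengthable holds two SQL type templates; WithLength contains a "?".
type maxLengthable struct {
	WithoutLength string
	WithLength    string
}

// resolve picks the length-constrained form only when a length is known.
// ASSUMPTION: a nil StringType or a MaxLength of zero means "no length".
func (m maxLengthable) resolve(col column) string {
	if col.StringType == nil || col.StringType.MaxLength == 0 {
		return m.WithoutLength
	}
	length := strconv.FormatUint(uint64(col.StringType.MaxLength), 10)
	return strings.Replace(m.WithLength, "?", length, 1)
}

func main() {
	m := maxLengthable{WithoutLength: "TEXT", WithLength: "VARCHAR(?)"}
	fmt.Println(m.resolve(column{}))
	fmt.Println(m.resolve(column{StringType: &stringTypeInfo{MaxLength: 76}}))
}
```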

func (MaxLengthableColumnType) GetColumnType

func (c MaxLengthableColumnType) GetColumnType(col *Column) (*ResolvedColumnType, error)

GetColumnType implements the TypeMapper interface

type NullableTypeMapping

type NullableTypeMapping struct {
	NotNullText  string
	NullableText string
	Inner        TypeMapper
}

NullableTypeMapping wraps a TypeMapper to add "NULL" and/or "NOT NULL" to the generated SQL type depending on the nullability of the column. Most databases assume that a column may contain null unless it is declared with a NOT NULL constraint, but some databases (e.g. Microsoft SQL Server) make that behavior configurable, requiring the DDL to explicitly declare a column with NULL if it may contain null values. This wrapper handles either or both cases.
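
The wrapping behavior can be sketched with a small decorator. All types here are local stand-ins that return plain strings instead of a ResolvedColumnType. Omitting the suffix when the configured text is empty is an assumption.

```go
package main

import "fmt"

// column and typeMapper are simplified local stand-ins.
type column struct{ NotNull bool }

type typeMapper interface {
	sqlType(col column) (string, error)
}

// constType always resolves to the same SQL type.
type constType string

func (c constType) sqlType(column) (string, error) { return string(c), nil }

// nullable decorates an inner mapper with nullability text.
type nullable struct {
	NotNullText  string
	NullableText string
	Inner        typeMapper
}

func (n nullable) sqlType(col column) (string, error) {
	base, err := n.Inner.sqlType(col)
	if err != nil {
		return "", err
	}
	suffix := n.NullableText
	if col.NotNull {
		suffix = n.NotNullText
	}
	if suffix == "" { // ASSUMPTION: empty text means nothing is appended.
		return base, nil
	}
	return base + " " + suffix, nil
}

func main() {
	m := nullable{NotNullText: "NOT NULL", NullableText: "NULL", Inner: constType("TEXT")}
	required, _ := m.sqlType(column{NotNull: true})
	optional, _ := m.sqlType(column{NotNull: false})
	fmt.Println(required)
	fmt.Println(optional)
}
```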

func (NullableTypeMapping) GetColumnType

func (mapper NullableTypeMapping) GetColumnType(col *Column) (*ResolvedColumnType, error)

GetColumnType implements the TypeMapper interface

type ParametersConverter

type ParametersConverter []func(interface{}) (interface{}, error)

ParametersConverter is a slice of functions that can be used to convert a Tuple into an []interface{} that can be passed to the database driver. This conversion may be different depending on the specific driver, so instances of ParametersConverter should be obtained from the Generator when generating a sql statement.

func (ParametersConverter) Convert

func (c ParametersConverter) Convert(tup tuple.Tuple) ([]interface{}, error)

Convert transforms the given Tuple into an []interface{} that can be used to execute SQL statements.
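
A minimal sketch of the per-element conversion follows. It uses a plain []interface{} in place of tuple.Tuple, and the length check is an assumption. `boolToInt` is a hypothetical converter included only to show a conversion that is not the identity.

```go
package main

import "fmt"

// parametersConverter is a local stand-in with the documented underlying type.
type parametersConverter []func(interface{}) (interface{}, error)

// convert applies the i-th function to the i-th element.
// ASSUMPTION: a length mismatch is reported as an error.
func (c parametersConverter) convert(tup []interface{}) ([]interface{}, error) {
	if len(tup) != len(c) {
		return nil, fmt.Errorf("expected %d elements, got %d", len(c), len(tup))
	}
	out := make([]interface{}, len(tup))
	for i, elem := range tup {
		v, err := c[i](elem)
		if err != nil {
			return nil, fmt.Errorf("converting element %d: %w", i, err)
		}
		out[i] = v
	}
	return out, nil
}

// identity passes the element through unchanged.
func identity(elem interface{}) (interface{}, error) { return elem, nil }

// boolToInt is a hypothetical converter for drivers that store booleans as integers.
func boolToInt(elem interface{}) (interface{}, error) {
	b, ok := elem.(bool)
	if !ok {
		return nil, fmt.Errorf("expected bool, got %T", elem)
	}
	if b {
		return 1, nil
	}
	return 0, nil
}

func main() {
	c := parametersConverter{identity, boolToInt}
	fmt.Println(c.convert([]interface{}{"a", true}))
}
```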

type Renderer

type Renderer struct {
	// contains filtered or unexported fields
}

Renderer is used for naming things inside of SQL. It can be used for fields or values to handle sanitization and quoting.

func NewRenderer

func NewRenderer(sanitizer func(string) string, wrapper func(string) string, skipWrapper func(string) bool) *Renderer

NewRenderer returns a configured renderer instance.

func (*Renderer) Render

func (r *Renderer) Render(text string) string

Render takes a string and renders text based on its configuration.

func (*Renderer) Sanitize

func (r *Renderer) Sanitize(text string) string

Sanitize uses a SanitizerFunc or returns the original string if it's nil.

func (*Renderer) Wrap

func (r *Renderer) Wrap(text string) string

Wrap uses a WrapperFunc or returns the original string if it's nil.

func (*Renderer) Write

func (r *Renderer) Write(w io.Writer, text string) (int, error)

Write takes a writer and renders text based on its configuration.
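
How a sanitizer, a wrapper, and a skipWrapper might combine is sketched below with a local stand-in type. The order of operations (sanitize first, then decide whether to wrap) is an assumption, and the identifier sanitizer that doubles embedded double quotes is a hypothetical choice for this example.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// renderer is a local stand-in holding the three functions NewRenderer accepts.
type renderer struct {
	sanitizer   func(string) string
	wrapper     func(string) string
	skipWrapper func(string) bool
}

// render sanitizes the text, then wraps it unless skipWrapper says otherwise.
func (r renderer) render(text string) string {
	if r.sanitizer != nil {
		text = r.sanitizer(text)
	}
	if r.wrapper == nil || (r.skipWrapper != nil && r.skipWrapper(text)) {
		return text
	}
	return r.wrapper(text)
}

// newIdentifierRenderer builds a double-quoting identifier renderer.
func newIdentifierRenderer() renderer {
	return renderer{
		sanitizer:   strings.NewReplacer(`"`, `""`).Replace,
		wrapper:     func(text string) string { return `"` + text + `"` },
		skipWrapper: regexp.MustCompile(`^[_\pL]+[_\pL\pN]*$`).MatchString,
	}
}

func main() {
	r := newIdentifierRenderer()
	fmt.Println(r.render("user_id"))
	fmt.Println(r.render("my column"))
}
```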

type ResolvedColumnType

type ResolvedColumnType struct {
	SQLType        string
	ValueConverter func(interface{}) (interface{}, error)
}

ResolvedColumnType represents the result of successfully mapping a Column to SQL DDL and a function that can be used to convert a Tuple element into a type that is appropriate for the driver.

type Resource

type Resource interface {
	// Validate returns an error if the Resource is malformed.
	Validate() error
	// Path returns the fully qualified name of the resource, as '.'-separated components.
	Path() ResourcePath
	// DeltaUpdates is true if the resource should be materialized using delta updates.
	DeltaUpdates() bool
}

Resource is a driver-provided type which represents the SQL resource (for example, a table) bound to by a binding.

type ResourcePath

type ResourcePath []string

ResourcePath is '.'-separated path components of a fully qualified database resource.

func (ResourcePath) Join

func (p ResourcePath) Join() string

Join the ResourcePath into a '.'-separated string.

type StdEndpoint

type StdEndpoint struct {
	// contains filtered or unexported fields
}

StdEndpoint is the *database/sql.DB standard implementation of an endpoint.

func NewStdEndpoint

func NewStdEndpoint(config interface{}, db *sql.DB, generator Generator, flowTables FlowTables) *StdEndpoint

NewStdEndpoint composes a new StdEndpoint suitable for sql.DB compatible databases.

func (*StdEndpoint) Config

func (e *StdEndpoint) Config() interface{}

Config returns the endpoint's config value.

func (*StdEndpoint) CreateTableStatement

func (e *StdEndpoint) CreateTableStatement(table *Table) (string, error)

CreateTableStatement generates a CREATE TABLE statement for the given table. The returned statement must not contain any parameter placeholders.

func (*StdEndpoint) DB

func (e *StdEndpoint) DB() *sql.DB

DB returns the embedded *sql.DB.

func (*StdEndpoint) ExecuteStatements

func (e *StdEndpoint) ExecuteStatements(ctx context.Context, statements []string) error

ExecuteStatements executes all of the statements provided in a single transaction.

func (*StdEndpoint) FlowTables

func (e *StdEndpoint) FlowTables() *FlowTables

FlowTables returns the Flow Tables configurations.

func (*StdEndpoint) Generator

func (e *StdEndpoint) Generator() *Generator

Generator returns the SQL generator.

func (*StdEndpoint) LoadSpec

func (e *StdEndpoint) LoadSpec(ctx context.Context, materialization pf.Materialization) (version string, _ *pf.MaterializationSpec, _ error)

LoadSpec loads the named MaterializationSpec and its version that's stored within the Endpoint, if any.

func (*StdEndpoint) NewFence

func (e *StdEndpoint) NewFence(ctx context.Context, materialization pf.Materialization, keyBegin, keyEnd uint32) (Fence, error)

NewFence installs and returns a new *StdFence. On return, all older fences of this |shardFqn| have been fenced off from committing further transactions.

type StdFence

type StdFence struct {
	// contains filtered or unexported fields
}

StdFence is an installed barrier in a shared checkpoints table which prevents other sessions from committing transactions under the fenced ID -- and prevents this Fence from committing where another session has in turn fenced this instance off. This implementation of the Fence interface is for standard *sql.DB compatible databases.

func (*StdFence) Checkpoint

func (f *StdFence) Checkpoint() []byte

Checkpoint returns the current checkpoint.

func (*StdFence) LogEntry

func (f *StdFence) LogEntry() *log.Entry

LogEntry returns a log.Entry with pre-set fields that identify the Shard ID and Fence.

func (*StdFence) SetCheckpoint

func (f *StdFence) SetCheckpoint(checkpoint []byte)

SetCheckpoint sets the current checkpoint.

func (*StdFence) Update

func (f *StdFence) Update(ctx context.Context, execFn ExecFn) error

Update the fence and its Checkpoint, returning an error if this Fence has in turn been fenced off by another. Update takes an ExecFn callback which should be scoped to a database transaction, such as sql.Tx or a database-specific transaction implementation.

type StringTypeInfo

type StringTypeInfo struct {
	Format      string
	ContentType string
	MaxLength   uint32
}

StringTypeInfo holds optional additional type information for string columns

type StringTypeMapping

type StringTypeMapping struct {
	Default       TypeMapper
	ByFormat      map[string]TypeMapper
	ByContentType map[string]TypeMapper
}

StringTypeMapping is a special TypeMapper for string type columns, which can take the format and/or content type into account when deciding what sql column type to generate.

func (StringTypeMapping) GetColumnType

func (mapping StringTypeMapping) GetColumnType(col *Column) (*ResolvedColumnType, error)

GetColumnType implements the TypeMapper interface
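
A lookup of this kind might resolve a string column as sketched below. The types are local stand-ins that map to plain SQL type strings. Checking the format before the content type is an assumption, since the documentation does not state a precedence.

```go
package main

import "fmt"

// stringInfo mirrors the Format and ContentType fields of StringTypeInfo.
type stringInfo struct {
	Format      string
	ContentType string
}

// stringMapping is a simplified stand-in that maps to SQL type strings.
type stringMapping struct {
	Default       string
	ByFormat      map[string]string
	ByContentType map[string]string
}

// resolve falls back to Default when nothing more specific matches.
// ASSUMPTION: ByFormat is consulted before ByContentType.
func (m stringMapping) resolve(info *stringInfo) string {
	if info != nil {
		if t, ok := m.ByFormat[info.Format]; ok {
			return t
		}
		if t, ok := m.ByContentType[info.ContentType]; ok {
			return t
		}
	}
	return m.Default
}

func main() {
	m := stringMapping{
		Default:       "TEXT",
		ByFormat:      map[string]string{"date-time": "TIMESTAMPTZ"},
		ByContentType: map[string]string{"application/json": "JSON"},
	}
	fmt.Println(m.resolve(nil))
	fmt.Println(m.resolve(&stringInfo{Format: "date-time"}))
	fmt.Println(m.resolve(&stringInfo{ContentType: "application/json"}))
}
```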

type Table

type Table struct {
	// The Name of the table before sanitization and quoting.
	Name string
	// Identifier is the final form of the table name, exactly as it should be represented in SQL
	// statements. If quoting is necessary, then the quotes must be included here.
	Identifier string
	// Optional Comment to add to create table statements.
	Comment string
	// The complete list of columns that should be created for the table and used in insert statements. This does
	// not need to include "automatic" columns (e.g. rowid), but only columns that should be
	// explicitly created and inserted into.
	Columns []Column
	// If IfNotExists is true then the create table statement will include an "IF NOT EXISTS" (or
	// equivalent).
	IfNotExists bool

	// Whether this is a temporary table. If true, then this will be created with the "TEMP(ORARY)" keyword.
	Temporary bool
	// If this is a temporary table, then this may optionally specify the ON COMMIT behavior. If
	// left blank, then no "ON COMMIT" clause will be added.
	TempOnCommit string
}

Table describes a database table, which can be used to generate various types of SQL statements.

func FlowCheckpointsTable

func FlowCheckpointsTable(name string) *Table

FlowCheckpointsTable returns the Table description for the table that holds the checkpoint and nonce values for each materialization shard.

func FlowMaterializationsTable

func FlowMaterializationsTable(name string) *Table

FlowMaterializationsTable returns the Table description for the table that holds the MaterializationSpec that corresponds to each target table. This state is used both for sql generation and for validation.

func TableForMaterialization

func TableForMaterialization(name string, comment string, identifierRenderer *Renderer, spec *pf.MaterializationSpec_Binding) *Table

TableForMaterialization converts a MaterializationSpec into the Table representation that's used by Generator. This assumes that the MaterializationSpec has already been validated to ensure that each projection has exactly one type besides "null".

func (Table) GetColumn

func (t Table) GetColumn(name string) *Column

GetColumn returns the Column with the given Name (not to be confused with Identifier). This can be used, for example, to map from the Name to the Identifier for a column.

type TokenPair

type TokenPair struct {
	Left  string
	Right string
}

TokenPair is a generic way of representing strings that can be used to surround some text for quoting and commenting.

func NewTokenPair

func NewTokenPair(l, r string) *TokenPair

NewTokenPair returns a TokenPair with the left and right tokens specified.

func (*TokenPair) Wrap

func (p *TokenPair) Wrap(text string) string

Wrap returns the given string surrounded by the strings in this TokenPair.

func (*TokenPair) Write

func (p *TokenPair) Write(w io.Writer, text string) (int, error)

Write takes an io.Writer and writes out the wrapped text. This function is leveraged for writing comments.

type TypeMapper

type TypeMapper interface {
	// GetColumnType resolves a Column to a specific SQL type. For example, for all "string"
	// type Columns, it may return the "TEXT" sql type. An implementation may take into account as
	// much or as little information as it wants to about a particular column, and some may not
	// inspect the column at all.
	GetColumnType(column *Column) (*ResolvedColumnType, error)
}

A TypeMapper resolves a Column to a specific base SQL type. For example, for all "string" type Columns, it may return the "TEXT" sql type. We use a decorator pattern to compose TypeMappers.
