database

package
v0.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2021 License: GPL-3.0 Imports: 44 Imported by: 3

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// UseBulkExportFlowCSV switches bulk exports to BulkExportFlowCSV, which
	// builds a dataflow from a SQL query using CSVs. It is disabled (false)
	// by default.
	UseBulkExportFlowCSV bool

	// SampleSize is a package-level sample size setting, 900 by default.
	// NOTE(review): its consumer is not visible here — presumably the number
	// of rows sampled (e.g. for type inference); confirm.
	SampleSize = 900
)

Functions

func CleanSQL

func CleanSQL(conn Connection, sql string) string

CleanSQL removes credentials from the query.

func ColumnNames

func ColumnNames(columns []iop.Column) (colNames []string)

ColumnNames returns the names of the given columns.

func CommonColumns

func CommonColumns(colNames1 []string, colNames2 []string) (commCols []string)

CommonColumns returns the column names common to colNames1 and colNames2.

func CopyFromAzure

func CopyFromAzure(conn Connection, tableFName, azPath string) (err error)

CopyFromAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html

func CopyFromS3

func CopyFromS3(conn Connection, tableFName, s3Path string) (err error)

func GetSlingEnv

func GetSlingEnv() map[string]string

GetSlingEnv returns the sling environment data.

func SQLColumns

func SQLColumns(colTypes []*sql.ColumnType, NativeTypeMap map[string]string) (columns []iop.Column)

SQLColumns returns the columns from database ColumnType

func SplitTableFullName

func SplitTableFullName(tableName string) (string, string)

SplitTableFullName returns the schema and table name parts of a full table name.

func TestPermissions

func TestPermissions(conn Connection, tableName string) (err error)

TestPermissions tests the needed permissions in a given connection

Types

type BaseConn

// BaseConn is a database connection. The database-specific connection types
// (BigQueryConn, PostgresConn, SnowflakeConn, ...) embed it.
type BaseConn struct {
	// The embedded Connection supplies any Connection method that BaseConn does
	// not define itself (e.g. PropsArr and RenameTable, which are not among
	// BaseConn's exported methods). Calling such a method while the embedded
	// value is nil panics.
	// NOTE(review): presumably set to the concrete connection so that Self()
	// can return it — confirm.
	Connection
	URL  string
	Type dbio.Type // the type of database for sqlx: postgres, mysql, sqlite

	Data iop.Dataset
	// contains filtered or unexported fields
}

BaseConn is a database connection

func (*BaseConn) BaseURL

func (conn *BaseConn) BaseURL() string

BaseURL returns the base URL with default port

func (*BaseConn) Begin

func (conn *BaseConn) Begin(options ...*sql.TxOptions) (err error)

Begin starts a connection wide transaction

func (*BaseConn) BulkExportFlow

func (conn *BaseConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)

BulkExportFlow creates a dataflow from a sql query

func (*BaseConn) BulkExportFlowCSV

func (conn *BaseConn) BulkExportFlowCSV(sqls ...string) (df *iop.Dataflow, err error)

BulkExportFlowCSV creates a dataflow from a sql query, using CSVs

func (*BaseConn) BulkExportStream

func (conn *BaseConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)

BulkExportStream streams the rows in bulk

func (*BaseConn) BulkImportFlow

func (conn *BaseConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)

BulkImportFlow imports the streams' rows in bulk, concurrently, using channels.

func (*BaseConn) BulkImportStream

func (conn *BaseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream imports the stream rows in bulk.

func (*BaseConn) CastColumnForSelect

func (conn *BaseConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) string

CastColumnForSelect casts to the correct target column type

func (*BaseConn) CastColumnsForSelect

func (conn *BaseConn) CastColumnsForSelect(srcColumns []iop.Column, tgtColumns []iop.Column) []string

CastColumnsForSelect casts the source columns into the target column types.

func (*BaseConn) Close

func (conn *BaseConn) Close() error

Close closes the connection

func (*BaseConn) Commit

func (conn *BaseConn) Commit() (err error)

Commit commits connection wide transaction

func (*BaseConn) CompareChecksums

func (conn *BaseConn) CompareChecksums(tableName string, columns []iop.Column) (err error)

CompareChecksums compares the checksum values from the database side to the checksum values from the StreamProcessor.

func (*BaseConn) Connect

func (conn *BaseConn) Connect(timeOut ...int) (err error)

Connect connects to the database

func (*BaseConn) Context

func (conn *BaseConn) Context() *g.Context

Context returns the db context

func (*BaseConn) Db

func (conn *BaseConn) Db() *sqlx.DB

Db returns the sqlx db object

func (*BaseConn) DropTable

func (conn *BaseConn) DropTable(tableNames ...string) (err error)

DropTable drops the given tables.

func (*BaseConn) DropView

func (conn *BaseConn) DropView(viewNames ...string) (err error)

DropView drops the given views.

func (*BaseConn) Exec

func (conn *BaseConn) Exec(sql string, args ...interface{}) (result sql.Result, err error)

Exec runs a SQL statement and returns its `sql.Result` and an `error`.

func (*BaseConn) ExecContext

func (conn *BaseConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)

ExecContext runs a SQL statement with context and returns its `sql.Result` and an `error`.

func (*BaseConn) GenerateDDL

func (conn *BaseConn) GenerateDDL(tableFName string, data iop.Dataset) (string, error)

GenerateDDL generates a DDL statement based on a dataset.

func (*BaseConn) GenerateInsertStatement

func (conn *BaseConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string

GenerateInsertStatement returns the proper INSERT statement

func (*BaseConn) GenerateUpsertExpressions

func (conn *BaseConn) GenerateUpsertExpressions(srcTable string, tgtTable string, pkFields []string) (exprs map[string]string, err error)

GenerateUpsertExpressions returns a map with needed expressions

func (*BaseConn) GetColumnStats

func (conn *BaseConn) GetColumnStats(tableName string, fields ...string) (columns []iop.Column, err error)

GetColumnStats analyzes the table and returns the column statistics

func (*BaseConn) GetColumns

func (conn *BaseConn) GetColumns(tableFName string, fields ...string) (columns []iop.Column, err error)

GetColumns returns columns for the given table. `tableFName` should include schema and table, for example `schema1.table2`. Fields should be `column_name|data_type`.

func (*BaseConn) GetColumnsFull

func (conn *BaseConn) GetColumnsFull(tableFName string) (iop.Dataset, error)

GetColumnsFull returns columns for the given table. `tableFName` should include schema and table, for example `schema1.table2`. Fields should be `schema_name|table_name|table_type|column_name|data_type|column_id`.

func (*BaseConn) GetCount

func (conn *BaseConn) GetCount(tableFName string) (uint64, error)

GetCount returns count of records

func (*BaseConn) GetDDL

func (conn *BaseConn) GetDDL(tableFName string) (string, error)

GetDDL returns DDL for given table.

func (*BaseConn) GetGormConn

func (conn *BaseConn) GetGormConn(config *gorm.Config) (*gorm.DB, error)

GetGormConn returns the gorm db connection

func (*BaseConn) GetIndexes

func (conn *BaseConn) GetIndexes(tableFName string) (iop.Dataset, error)

GetIndexes returns indexes for given table.

func (*BaseConn) GetObjects

func (conn *BaseConn) GetObjects(schema string, objectType string) (iop.Dataset, error)

GetObjects returns objects (tables or views) for the given schema. `objectType` can be either 'table', 'view' or 'all'.

func (*BaseConn) GetPrimaryKeys

func (conn *BaseConn) GetPrimaryKeys(tableFName string) (iop.Dataset, error)

GetPrimaryKeys returns the primary keys for the given table.

func (*BaseConn) GetProp

func (conn *BaseConn) GetProp(key string) string

GetProp returns the value of a property

func (*BaseConn) GetSQLColumns

func (conn *BaseConn) GetSQLColumns(sqls ...string) (columns []iop.Column, err error)

GetSQLColumns returns the columns of a SQL query result.

func (*BaseConn) GetSchemaObjects

func (conn *BaseConn) GetSchemaObjects(schemaName string) (Schema, error)

GetSchemaObjects obtains the full schemata info for the given schema.

func (*BaseConn) GetSchemas

func (conn *BaseConn) GetSchemas() (iop.Dataset, error)

GetSchemas returns schemas

func (*BaseConn) GetTables

func (conn *BaseConn) GetTables(schema string) (iop.Dataset, error)

GetTables returns tables for given schema

func (*BaseConn) GetTemplateValue

func (conn *BaseConn) GetTemplateValue(path string) (value string)

GetTemplateValue returns the value of the path

func (*BaseConn) GetType

func (conn *BaseConn) GetType() dbio.Type

GetType returns the database type of the connection.

func (*BaseConn) GetURL

func (conn *BaseConn) GetURL(newURL ...string) string

GetURL returns the processed URL

func (*BaseConn) GetViews

func (conn *BaseConn) GetViews(schema string) (iop.Dataset, error)

GetViews returns views for given schema

func (*BaseConn) Import

func (conn *BaseConn) Import(data iop.Dataset, tableName string) error

Import imports `data` into `tableName`

func (*BaseConn) Init

func (conn *BaseConn) Init() (err error)

Init initiates the connection object and adds the default port if it is missing.

func (*BaseConn) InsertBatchStream

func (conn *BaseConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertBatchStream inserts a stream into a table in batch

func (*BaseConn) InsertStream

func (conn *BaseConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertStream inserts a stream into a table

func (*BaseConn) Kill

func (conn *BaseConn) Kill() error

Kill kills the database connection.

func (*BaseConn) LoadTemplates

func (conn *BaseConn) LoadTemplates() error

LoadTemplates loads the appropriate yaml template

func (*BaseConn) MustExec

func (conn *BaseConn) MustExec(sql string, args ...interface{}) (result sql.Result)

MustExec executes the query and panics if there was an error. Any placeholder parameters are replaced with the supplied args.

func (*BaseConn) OptimizeTable

func (conn *BaseConn) OptimizeTable(tableName string, newColStats []iop.Column) (err error)

OptimizeTable analyzes the table and alters its columns' data types based on the analysis result. If the table is missing, it is created with a new DDL. Caveat: this can truncate data points, since the analysis is based only on the new data being inserted; complete stats of the target table would be needed to optimize it properly.

func (*BaseConn) Prepare

func (conn *BaseConn) Prepare(ctx context.Context, query string) (stmt *sql.Stmt, err error)

Prepare prepares the statement

func (*BaseConn) PropArr

func (conn *BaseConn) PropArr() []string

PropArr returns an array of properties

func (*BaseConn) Props

func (conn *BaseConn) Props() map[string]string

Props returns a map of properties.

func (*BaseConn) Query

func (conn *BaseConn) Query(sql string, limit ...int) (data iop.Dataset, err error)

Query runs a SQL query and returns the resulting dataset and an `error`.

func (*BaseConn) QueryContext

func (conn *BaseConn) QueryContext(ctx context.Context, sql string, limit ...int) (iop.Dataset, error)

QueryContext runs a SQL query with ctx and returns the resulting dataset and an `error`.

func (*BaseConn) Quote

func (conn *BaseConn) Quote(field string) string

Quote adds quotes to the field name

func (*BaseConn) Rollback

func (conn *BaseConn) Rollback() (err error)

Rollback rolls back connection wide transaction

func (*BaseConn) RunAnalysis

func (conn *BaseConn) RunAnalysis(analysisName string, values map[string]interface{}) (iop.Dataset, error)

RunAnalysis runs an analysis

func (*BaseConn) RunAnalysisField

func (conn *BaseConn) RunAnalysisField(analysisName string, tableFName string, fields ...string) (iop.Dataset, error)

RunAnalysisField runs a field level analysis

func (*BaseConn) RunAnalysisTable

func (conn *BaseConn) RunAnalysisTable(analysisName string, tableFNames ...string) (iop.Dataset, error)

RunAnalysisTable runs a table level analysis

func (*BaseConn) Schemata

func (conn *BaseConn) Schemata() Schemata

Schemata returns the Schemata object

func (*BaseConn) Self

func (conn *BaseConn) Self() Connection

Self returns the respective connection instance. This is useful for referring back to a subclass method from the superclass level (aka overloading).

func (*BaseConn) SetProp

func (conn *BaseConn) SetProp(key string, val string)

SetProp sets the value of a property

func (*BaseConn) StreamRecords

func (conn *BaseConn) StreamRecords(sql string) (<-chan map[string]interface{}, error)

StreamRecords streams the records of a SQL query, returning a channel of records and an `error`.

func (*BaseConn) StreamRows

func (conn *BaseConn) StreamRows(sql string, limit ...int) (ds *iop.Datastream, err error)

StreamRows streams the rows of a SQL query, returning a datastream and an `error`.

func (*BaseConn) StreamRowsContext

func (conn *BaseConn) StreamRowsContext(ctx context.Context, sql string, limit ...int) (ds *iop.Datastream, err error)

StreamRowsContext streams the rows of a SQL query with context, returning a datastream and an `error`.

func (*BaseConn) SwapTable

func (conn *BaseConn) SwapTable(srcTable string, tgtTable string) (err error)

SwapTable swaps two tables.

func (*BaseConn) TableExists

func (conn *BaseConn) TableExists(tableFName string) (exists bool, err error)

TableExists returns true if the table exists

func (*BaseConn) Template

func (conn *BaseConn) Template() Template

Template returns the Template object

func (*BaseConn) Tx

func (conn *BaseConn) Tx() *sqlx.Tx

Tx returns the current sqlx tx object

func (*BaseConn) Unquote

func (conn *BaseConn) Unquote(field string) string

Unquote removes quotes from the field name.

func (*BaseConn) Upsert

func (conn *BaseConn) Upsert(srcTable string, tgtTable string, primKeys []string) (rowAffCnt int64, err error)

Upsert inserts or updates rows from srcTable into the target table tgtTable. It assumes srcTable has some or all of the tgtTable fields, with matching types.

func (*BaseConn) ValidateColumnNames

func (conn *BaseConn) ValidateColumnNames(tgtColNames []string, colNames []string, quote bool) (newColNames []string, err error)

ValidateColumnNames verifies that source fields are present in the target table. It returns quoted field names as `newColNames`, the same length as `colNames`.

type BigQueryConn

// BigQueryConn is a Google BigQuery connection.
type BigQueryConn struct {
	BaseConn
	// URL shadows the embedded BaseConn.URL: conn.URL refers to this field,
	// while methods promoted from BaseConn read BaseConn.URL.
	// NOTE(review): confirm Init keeps the two in sync.
	URL       string
	Client    *bigquery.Client
	ProjectID string
	DatasetID string
	Location  string
	Datasets  []string
}

BigQueryConn is a Google Big Query connection

func (*BigQueryConn) BulkExportFlow

func (conn *BigQueryConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)

BulkExportFlow reads in bulk

func (*BigQueryConn) BulkImportFlow

func (conn *BigQueryConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)

BulkImportFlow inserts a flow of streams into a BigQuery table.

func (*BigQueryConn) BulkImportStream

func (conn *BigQueryConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream loads the stream into a BigQuery table using a file on the local filesystem.

func (*BigQueryConn) Close

func (conn *BigQueryConn) Close() error

Close closes the connection

func (*BigQueryConn) Connect

func (conn *BigQueryConn) Connect(timeOut ...int) error

Connect connects to the database

func (*BigQueryConn) CopyFromGCS

func (conn *BigQueryConn) CopyFromGCS(gcsURI string, tableFName string, dsColumns []iop.Column) error

CopyFromGCS into bigquery from google storage

func (*BigQueryConn) CopyToGCS

func (conn *BigQueryConn) CopyToGCS(tableFName string, gcsURI string) error

CopyToGCS copies a table to Google Cloud Storage.

func (*BigQueryConn) ExecContext

func (conn *BigQueryConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)

ExecContext runs a sql query with context, returns `error`

func (*BigQueryConn) Init

func (conn *BigQueryConn) Init() error

Init initiates the object

func (*BigQueryConn) InsertBatchStream

func (conn *BigQueryConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertBatchStream inserts a stream into a table in batch

func (*BigQueryConn) InsertStream

func (conn *BigQueryConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

InsertStream loads the stream into a BigQuery table using a file on the local filesystem.

func (*BigQueryConn) StreamRowsContext

func (conn *BigQueryConn) StreamRowsContext(ctx context.Context, sql string, limit ...int) (ds *iop.Datastream, err error)

StreamRowsContext streams the rows of a SQL query with context, returning a datastream and an `error`.

func (*BigQueryConn) Unload

func (conn *BigQueryConn) Unload(sqls ...string) (gsPath string, err error)

Unload unloads the query results to Google Cloud Storage and returns the storage path.

func (*BigQueryConn) Upsert

func (conn *BigQueryConn) Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)

Upsert inserts or updates rows from srcTable into the target table tgtTable. It assumes srcTable has some or all of the tgtTable fields, with matching types.

type Connection

// Connection is the base interface for all database connections. NewConn and
// NewConnContext return a Connection.
type Connection interface {
	Self() Connection
	Init() error
	Connect(timeOut ...int) error
	Kill() error
	Close() error
	GetType() dbio.Type
	GetGormConn(config *gorm.Config) (*gorm.DB, error)
	LoadTemplates() error
	GetURL(newURL ...string) string
	StreamRows(sql string, limit ...int) (*iop.Datastream, error)
	StreamRowsContext(ctx context.Context, sql string, limit ...int) (ds *iop.Datastream, err error)
	BulkExportStream(sql string) (*iop.Datastream, error)
	BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
	BulkExportFlow(sqls ...string) (*iop.Dataflow, error)
	BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
	Begin(options ...*sql.TxOptions) error
	Commit() error
	Rollback() error
	Query(sql string, limit ...int) (iop.Dataset, error)
	QueryContext(ctx context.Context, sql string, limit ...int) (iop.Dataset, error)
	GenerateDDL(tableFName string, data iop.Dataset) (string, error)
	Quote(string) string
	Unquote(string) string
	GenerateInsertStatement(tableName string, fields []string, numRows int) string
	GenerateUpsertExpressions(srcTable string, tgtTable string, pkFields []string) (exprs map[string]string, err error)
	DropTable(...string) error
	DropView(...string) error
	InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
	InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
	Db() *sqlx.DB
	Tx() *sqlx.Tx
	Exec(sql string, args ...interface{}) (result sql.Result, err error)
	ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
	MustExec(sql string, args ...interface{}) (result sql.Result)
	Schemata() Schemata
	Template() Template
	SetProp(string, string)
	GetProp(string) string
	// NOTE(review): BaseConn's exported method is named PropArr, not PropsArr.
	// BaseConn therefore satisfies PropsArr only through its embedded
	// Connection value (nil → panic). Confirm the name mismatch is intended.
	PropsArr() []string
	Props() map[string]string
	GetTemplateValue(path string) (value string)
	Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)
	SwapTable(srcTable string, tgtTable string) (err error)
	// NOTE(review): no connection type exposes an exported RenameTable method.
	// Implementations presumably obtain it via the embedded Connection —
	// confirm.
	RenameTable(table string, newTable string) (err error)
	Context() *g.Context

	StreamRecords(sql string) (<-chan map[string]interface{}, error)
	GetDDL(string) (string, error)
	GetSchemaObjects(string) (Schema, error)
	GetSchemas() (iop.Dataset, error)
	GetTables(string) (iop.Dataset, error)
	GetViews(string) (iop.Dataset, error)
	GetSQLColumns(sqls ...string) (columns []iop.Column, err error)
	TableExists(tableFName string) (exists bool, err error)
	GetColumns(tableFName string, fields ...string) ([]iop.Column, error)
	GetColumnStats(tableName string, fields ...string) (columns []iop.Column, err error)
	GetPrimaryKeys(string) (iop.Dataset, error)
	GetIndexes(string) (iop.Dataset, error)
	GetColumnsFull(string) (iop.Dataset, error)
	GetCount(string) (uint64, error)
	RunAnalysis(string, map[string]interface{}) (iop.Dataset, error)
	RunAnalysisTable(string, ...string) (iop.Dataset, error)
	RunAnalysisField(string, string, ...string) (iop.Dataset, error)
	CastColumnForSelect(srcColumn iop.Column, tgtColumn iop.Column) string
	CastColumnsForSelect(srcColumns []iop.Column, tgtColumns []iop.Column) []string
	ValidateColumnNames(tgtColName []string, colNames []string, quote bool) (newColNames []string, err error)
	OptimizeTable(tableName string, columns []iop.Column) (err error)
	CompareChecksums(tableName string, columns []iop.Column) (err error)
	BaseURL() string
	// contains filtered or unexported methods
}

Connection is the base interface for connections.

func NewConn

func NewConn(URL string, props ...string) (Connection, error)

NewConn returns the most appropriate connection for the given database URL.

func NewConnContext

func NewConnContext(ctx context.Context, URL string, props ...string) (Connection, error)

NewConnContext returns the most appropriate connection for the given database URL, using the given context. Props are provided as `"Prop1=Value1", "Prop2=Value2", ...`.

type MsSQLServerConn

// MsSQLServerConn is a Microsoft SQL Server connection.
type MsSQLServerConn struct {
	BaseConn
	// URL shadows the embedded BaseConn.URL: conn.URL refers to this field,
	// while methods promoted from BaseConn read BaseConn.URL.
	// NOTE(review): confirm Init keeps the two in sync.
	URL string
	// contains filtered or unexported fields
}

MsSQLServerConn is a Microsoft SQL Server connection

func (*MsSQLServerConn) BcpExport

func (conn *MsSQLServerConn) BcpExport() (err error)

BcpExport exports data to a datastream.

func (*MsSQLServerConn) BcpImportStream

func (conn *MsSQLServerConn) BcpImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BcpImportStream imports using the bcp tool (https://docs.microsoft.com/en-us/sql/tools/bcp-utility?view=sql-server-ver15), e.g. `bcp dbo.test1 in '/tmp/LargeDataset.csv' -S tcp:sqlserver.host,51433 -d master -U sa -P 'password' -c -t ',' -b 5000`. Limitation: if a comma or the delimiter appears in a field value, it will error. Use a delimiter that does not occur in the fields, or apply some other transformation.

func (*MsSQLServerConn) BcpImportStreamParrallel

func (conn *MsSQLServerConn) BcpImportStreamParrallel(tableFName string, ds *iop.Datastream) (count uint64, err error)

BcpImportStreamParrallel uses goroutines to import partitioned files.

func (*MsSQLServerConn) BulkImportFlow

func (conn *MsSQLServerConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)

BulkImportFlow bulk import flow

func (*MsSQLServerConn) BulkImportStream

func (conn *MsSQLServerConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream bulk import stream

func (*MsSQLServerConn) CopyFromAzure

func (conn *MsSQLServerConn) CopyFromAzure(tableFName, azPath string) (count uint64, err error)

CopyFromAzure uses the COPY INTO Table command from Azure https://docs.microsoft.com/en-us/sql/t-sql/statements/copy-into-transact-sql?view=azure-sqldw-latest

func (*MsSQLServerConn) CopyViaAzure

func (conn *MsSQLServerConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)

CopyViaAzure uses the Azure DWH COPY INTO Table command

func (*MsSQLServerConn) GetURL

func (conn *MsSQLServerConn) GetURL(newURL ...string) string

GetURL returns the processed URL

func (*MsSQLServerConn) Init

func (conn *MsSQLServerConn) Init() error

Init initiates the object

func (*MsSQLServerConn) Upsert

func (conn *MsSQLServerConn) Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)

Upsert inserts or updates rows from srcTable into the target table tgtTable. It assumes srcTable has some or all of the tgtTable fields, with matching types.

type MySQLConn

// MySQLConn is a MySQL connection.
type MySQLConn struct {
	BaseConn
	// URL shadows the embedded BaseConn.URL: conn.URL refers to this field,
	// while methods promoted from BaseConn read BaseConn.URL.
	// NOTE(review): confirm Init keeps the two in sync.
	URL string
}

MySQLConn is a MySQL connection.

func (*MySQLConn) BulkExportStream

func (conn *MySQLConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)

BulkExportStream bulk Export

func (*MySQLConn) BulkImportStream

func (conn *MySQLConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream bulk import stream

func (*MySQLConn) GetURL

func (conn *MySQLConn) GetURL(newURL ...string) string

GetURL returns the processed URL

func (*MySQLConn) Init

func (conn *MySQLConn) Init() error

Init initiates the object

func (*MySQLConn) LoadDataInFile

func (conn *MySQLConn) LoadDataInFile(tableFName string, ds *iop.Datastream) (count uint64, err error)

LoadDataInFile Bulk Import

func (*MySQLConn) LoadDataOutFile

func (conn *MySQLConn) LoadDataOutFile(sql string) (stdOutReader io.Reader, err error)

LoadDataOutFile performs a bulk export. Possible error: `ERROR 1227 (42000) at line 1: Access denied; you need (at least one of) the FILE privilege(s) for this operation`. The FILE privilege needs to be granted to the user, and the --secure-file-priv option needs to be set properly for this to work. To improve InnoDB insert speed, see https://stackoverflow.com/questions/9819271/why-is-mysql-innodb-insert-so-slow.

func (*MySQLConn) Upsert

func (conn *MySQLConn) Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)

Upsert inserts or updates rows from srcTable into the target table tgtTable. It assumes srcTable has some or all of the tgtTable fields, with matching types.

type OracleConn

// OracleConn is an Oracle connection.
type OracleConn struct {
	BaseConn
	// URL shadows the embedded BaseConn.URL: conn.URL refers to this field,
	// while methods promoted from BaseConn read BaseConn.URL.
	// NOTE(review): confirm Init keeps the two in sync.
	URL string
}

OracleConn is an Oracle connection.

func (*OracleConn) BulkImportStream

func (conn *OracleConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream bulk import stream

func (*OracleConn) GenerateInsertStatement

func (conn *OracleConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string

GenerateInsertStatement returns the proper INSERT statement

func (*OracleConn) Init

func (conn *OracleConn) Init() error

Init initiates the object

func (*OracleConn) SQLLoad

func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count uint64, err error)

SQLLoad uses sqlldr to bulk import, e.g. `cat test1.csv | sqlldr system/oracle@oracle.host:1521/xe control=sqlldr.ctl log=/dev/stdout bad=/dev/stderr`. Limitation: it cannot import values that contain a newline, so the data needs to be scanned for new lines.

func (*OracleConn) Upsert

func (conn *OracleConn) Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)

Upsert inserts or updates rows from srcTable into the target table tgtTable. It assumes srcTable has some or all of the tgtTable fields, with matching types.

type Pool

// Pool is a pool of connections.
//
// Pool holds a sync.Mutex by value, so a Pool must not be copied after first
// use; share it as *Pool.
type Pool struct {
	// Dbs holds the pooled sqlx handles.
	// NOTE(review): the key format is not visible here (presumably a
	// connection URL) — confirm.
	// Mux presumably guards Dbs — confirm against callers.
	Dbs map[string]*sqlx.DB
	Mux sync.Mutex
}

Pool is a pool of connections

type PostgresConn

// PostgresConn is a Postgres connection.
type PostgresConn struct {
	BaseConn
	// URL shadows the embedded BaseConn.URL: conn.URL refers to this field,
	// while methods promoted from BaseConn read BaseConn.URL.
	// NOTE(review): confirm Init keeps the two in sync.
	URL string
}

PostgresConn is a Postgres connection

func (*PostgresConn) BulkExportStream

func (conn *PostgresConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)

BulkExportStream uses the bulk dumping (COPY)

func (*PostgresConn) BulkImportStream

func (conn *PostgresConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream inserts a stream into a table

func (*PostgresConn) CastColumnForSelect

func (conn *PostgresConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)

CastColumnForSelect casts to the correct target column type

func (*PostgresConn) CopyToStdout

func (conn *PostgresConn) CopyToStdout(sql string) (stdOutReader io.Reader, err error)

CopyToStdout runs `COPY ... TO STDOUT` for the given SQL and returns a reader over its output.

func (*PostgresConn) Init

func (conn *PostgresConn) Init() error

Init initiates the object

func (*PostgresConn) Upsert

func (conn *PostgresConn) Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)

Upsert inserts or updates rows from srcTable into the target table tgtTable. It assumes srcTable has some or all of the tgtTable fields, with matching types.

type RedshiftConn

// RedshiftConn is a Redshift connection.
type RedshiftConn struct {
	BaseConn
	// URL shadows the embedded BaseConn.URL: conn.URL refers to this field,
	// while methods promoted from BaseConn read BaseConn.URL.
	// NOTE(review): confirm Init keeps the two in sync.
	URL string
}

RedshiftConn is a Redshift connection

func (*RedshiftConn) BulkExportFlow

func (conn *RedshiftConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)

BulkExportFlow reads in bulk

func (*RedshiftConn) BulkExportStream

func (conn *RedshiftConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)

BulkExportStream reads in bulk

func (*RedshiftConn) BulkImportFlow

func (conn *RedshiftConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)

BulkImportFlow inserts a flow of streams into a table. For redshift we need to create CSVs in S3 and then use the COPY command.

func (*RedshiftConn) BulkImportStream

func (conn *RedshiftConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream inserts a stream into a table. For redshift we need to create CSVs in S3 and then use the COPY command.

func (*RedshiftConn) CopyFromS3

func (conn *RedshiftConn) CopyFromS3(tableFName, s3Path string) (count uint64, err error)

CopyFromS3 uses the COPY INTO Table command from AWS S3

func (*RedshiftConn) Init

func (conn *RedshiftConn) Init() error

Init initiates the object

func (*RedshiftConn) Unload

func (conn *RedshiftConn) Unload(sqls ...string) (s3Path string, err error)

Unload unloads a query to S3

func (*RedshiftConn) Upsert

func (conn *RedshiftConn) Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)

Upsert inserts or updates rows from srcTable into the target table tgtTable. It assumes srcTable has some or all of the tgtTable fields, with matching types.

type SQLiteConn

// SQLiteConn is a SQLite connection.
type SQLiteConn struct {
	BaseConn
	// URL shadows the embedded BaseConn.URL: conn.URL refers to this field,
	// while methods promoted from BaseConn read BaseConn.URL.
	// NOTE(review): confirm Init keeps the two in sync.
	URL string
}

SQLiteConn is a SQLite connection.

func (*SQLiteConn) Init

func (conn *SQLiteConn) Init() error

Init initiates the object

func (*SQLiteConn) Upsert

func (conn *SQLiteConn) Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)

Upsert inserts or updates rows from srcTable into the target table tgtTable. It assumes srcTable has some or all of the tgtTable fields, with matching types.

type Schema

// Schema represents a database schema: its name and its tables.
//
// NOTE(review): Tables has no json tag, so encoding/json emits it under the
// key "Tables", while Name is emitted as "name". Confirm this is intended.
// The Tables map key is presumably the table name — confirm.
type Schema struct {
	Name   string `json:"name"`
	Tables map[string]Table
}

Schema represents a database schema and its tables.

func (*Schema) ToData

func (schema *Schema) ToData() (data iop.Dataset)

ToData converts schema objects to tabular format

type Schemata

// Schemata contains the full schema information for a connection.
// Schemas is presumably keyed by schema name — confirm.
type Schemata struct {
	Schemas map[string]Schema
	Tables  map[string]*Table // all tables with full name lower case (schema.table)
}

Schemata contains the full schema for a connection

type SnowflakeConn

// SnowflakeConn is a Snowflake connection.
type SnowflakeConn struct {
	BaseConn
	// URL shadows the embedded BaseConn.URL: conn.URL refers to this field,
	// while methods promoted from BaseConn read BaseConn.URL.
	// NOTE(review): confirm Init keeps the two in sync.
	URL        string
	Warehouse  string
	CopyMethod string
}

SnowflakeConn is a Snowflake connection

func (*SnowflakeConn) BulkExportFlow

func (conn *SnowflakeConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)

BulkExportFlow reads in bulk

func (*SnowflakeConn) BulkImportFlow

func (conn *SnowflakeConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)

BulkImportFlow bulk import flow

func (*SnowflakeConn) BulkImportStream

func (conn *SnowflakeConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)

BulkImportStream bulk import stream

func (*SnowflakeConn) Connect

func (conn *SnowflakeConn) Connect(timeOut ...int) error

Connect connects to the database

func (*SnowflakeConn) CopyFromAzure

func (conn *SnowflakeConn) CopyFromAzure(tableFName, azPath string) (err error)

CopyFromAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html

func (*SnowflakeConn) CopyFromS3

func (conn *SnowflakeConn) CopyFromS3(tableFName, s3Path string) (err error)

CopyFromS3 uses the Snowflake COPY INTO Table command from AWS S3 https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html

func (*SnowflakeConn) CopyToAzure

func (conn *SnowflakeConn) CopyToAzure(sqls ...string) (azPath string, err error)

CopyToAzure exports a query to an Azure location

func (*SnowflakeConn) CopyToS3

func (conn *SnowflakeConn) CopyToS3(sqls ...string) (s3Path string, err error)

CopyToS3 exports a query to an S3 location

func (*SnowflakeConn) CopyViaAWS

func (conn *SnowflakeConn) CopyViaAWS(tableFName string, df *iop.Dataflow) (count uint64, err error)

CopyViaAWS uses the Snowflake COPY INTO Table command from AWS S3 https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html

func (*SnowflakeConn) CopyViaAzure

func (conn *SnowflakeConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)

CopyViaAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html

func (*SnowflakeConn) CopyViaStage

func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (count uint64, err error)

CopyViaStage uses the Snowflake COPY INTO Table command https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html

func (*SnowflakeConn) GetSchemas

func (conn *SnowflakeConn) GetSchemas() (iop.Dataset, error)

GetSchemas returns schemas

func (*SnowflakeConn) GetTables

func (conn *SnowflakeConn) GetTables(schema string) (iop.Dataset, error)

GetTables returns tables for given schema

func (*SnowflakeConn) Init

func (conn *SnowflakeConn) Init() error

Init initiates the object

func (*SnowflakeConn) PutFile

func (conn *SnowflakeConn) PutFile(fPath string, internalStagePath string) (err error)

PutFile copies a local file or folder into a staging location.

func (*SnowflakeConn) Upsert

func (conn *SnowflakeConn) Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)

Upsert inserts or updates rows from srcTable into the target table tgtTable. It assumes srcTable has some or all of the tgtTable fields, with matching types.

type Table

// Table represents a table or view within the schemata.
// ColumnsMap presumably indexes Columns by column name — confirm.
type Table struct {
	Name       string `json:"name"`
	FullName   string `json:"full_name"`
	IsView     bool   `json:"is_view"` // whether the object is a view rather than a table
	Columns    []iop.Column
	ColumnsMap map[string]*iop.Column
}

Table represents a schemata table

type Template

// Template is a database YAML template. LoadTemplates loads the appropriate
// template for a connection.
//
// NOTE(review): the NativeStatsMap field is tagged `native_stat_map`
// (singular "stat"). Confirm this matches the key used in the YAML templates.
type Template struct {
	Core           map[string]string
	Metadata       map[string]string
	Analysis       map[string]string
	Function       map[string]string `yaml:"function"`
	GeneralTypeMap map[string]string `yaml:"general_type_map"`
	NativeTypeMap  map[string]string `yaml:"native_type_map"`
	NativeStatsMap map[string]bool   `yaml:"native_stat_map"`
	Variable       map[string]string
}

Template is a database YAML template

func (Template) ToData

func (template Template) ToData() (data iop.Dataset)

ToData converts the template to a dataset.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL