Documentation ¶
Index ¶
- Variables
- func CleanSQL(conn Connection, sql string) string
- func ColumnNames(columns []iop.Column) (colNames []string)
- func CommonColumns(colNames1 []string, colNames2 []string) (commCols []string)
- func CopyFromAzure(conn Connection, tableFName, azPath string) (err error)
- func CopyFromS3(conn Connection, tableFName, s3Path string) (err error)
- func GetSlingEnv() map[string]string
- func SQLColumns(colTypes []*sql.ColumnType, NativeTypeMap map[string]string) (columns []iop.Column)
- func SplitTableFullName(tableName string) (string, string)
- func TestPermissions(conn Connection, tableName string) (err error)
- type BaseConn
- func (conn *BaseConn) BaseURL() string
- func (conn *BaseConn) Begin(options ...*sql.TxOptions) (err error)
- func (conn *BaseConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
- func (conn *BaseConn) BulkExportFlowCSV(sqls ...string) (df *iop.Dataflow, err error)
- func (conn *BaseConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
- func (conn *BaseConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *BaseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BaseConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) string
- func (conn *BaseConn) CastColumnsForSelect(srcColumns []iop.Column, tgtColumns []iop.Column) []string
- func (conn *BaseConn) Close() error
- func (conn *BaseConn) Commit() (err error)
- func (conn *BaseConn) CompareChecksums(tableName string, columns []iop.Column) (err error)
- func (conn *BaseConn) Connect(timeOut ...int) (err error)
- func (conn *BaseConn) Context() *g.Context
- func (conn *BaseConn) Db() *sqlx.DB
- func (conn *BaseConn) DropTable(tableNames ...string) (err error)
- func (conn *BaseConn) DropView(viewNames ...string) (err error)
- func (conn *BaseConn) Exec(sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *BaseConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *BaseConn) GenerateDDL(tableFName string, data iop.Dataset) (string, error)
- func (conn *BaseConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
- func (conn *BaseConn) GenerateUpsertExpressions(srcTable string, tgtTable string, pkFields []string) (exprs map[string]string, err error)
- func (conn *BaseConn) GetColumnStats(tableName string, fields ...string) (columns []iop.Column, err error)
- func (conn *BaseConn) GetColumns(tableFName string, fields ...string) (columns []iop.Column, err error)
- func (conn *BaseConn) GetColumnsFull(tableFName string) (iop.Dataset, error)
- func (conn *BaseConn) GetCount(tableFName string) (uint64, error)
- func (conn *BaseConn) GetDDL(tableFName string) (string, error)
- func (conn *BaseConn) GetGormConn(config *gorm.Config) (*gorm.DB, error)
- func (conn *BaseConn) GetIndexes(tableFName string) (iop.Dataset, error)
- func (conn *BaseConn) GetObjects(schema string, objectType string) (iop.Dataset, error)
- func (conn *BaseConn) GetPrimaryKeys(tableFName string) (iop.Dataset, error)
- func (conn *BaseConn) GetProp(key string) string
- func (conn *BaseConn) GetSQLColumns(sqls ...string) (columns []iop.Column, err error)
- func (conn *BaseConn) GetSchemaObjects(schemaName string) (Schema, error)
- func (conn *BaseConn) GetSchemas() (iop.Dataset, error)
- func (conn *BaseConn) GetTables(schema string) (iop.Dataset, error)
- func (conn *BaseConn) GetTemplateValue(path string) (value string)
- func (conn *BaseConn) GetType() dbio.Type
- func (conn *BaseConn) GetURL(newURL ...string) string
- func (conn *BaseConn) GetViews(schema string) (iop.Dataset, error)
- func (conn *BaseConn) Import(data iop.Dataset, tableName string) error
- func (conn *BaseConn) Init() (err error)
- func (conn *BaseConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BaseConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BaseConn) Kill() error
- func (conn *BaseConn) LoadTemplates() error
- func (conn *BaseConn) MustExec(sql string, args ...interface{}) (result sql.Result)
- func (conn *BaseConn) OptimizeTable(tableName string, newColStats []iop.Column) (err error)
- func (conn *BaseConn) Prepare(ctx context.Context, query string) (stmt *sql.Stmt, err error)
- func (conn *BaseConn) PropArr() []string
- func (conn *BaseConn) Props() map[string]string
- func (conn *BaseConn) Query(sql string, limit ...int) (data iop.Dataset, err error)
- func (conn *BaseConn) QueryContext(ctx context.Context, sql string, limit ...int) (iop.Dataset, error)
- func (conn *BaseConn) Quote(field string) string
- func (conn *BaseConn) Rollback() (err error)
- func (conn *BaseConn) RunAnalysis(analysisName string, values map[string]interface{}) (iop.Dataset, error)
- func (conn *BaseConn) RunAnalysisField(analysisName string, tableFName string, fields ...string) (iop.Dataset, error)
- func (conn *BaseConn) RunAnalysisTable(analysisName string, tableFNames ...string) (iop.Dataset, error)
- func (conn *BaseConn) Schemata() Schemata
- func (conn *BaseConn) Self() Connection
- func (conn *BaseConn) SetProp(key string, val string)
- func (conn *BaseConn) StreamRecords(sql string) (<-chan map[string]interface{}, error)
- func (conn *BaseConn) StreamRows(sql string, limit ...int) (ds *iop.Datastream, err error)
- func (conn *BaseConn) StreamRowsContext(ctx context.Context, sql string, limit ...int) (ds *iop.Datastream, err error)
- func (conn *BaseConn) SwapTable(srcTable string, tgtTable string) (err error)
- func (conn *BaseConn) TableExists(tableFName string) (exists bool, err error)
- func (conn *BaseConn) Template() Template
- func (conn *BaseConn) Tx() *sqlx.Tx
- func (conn *BaseConn) Unquote(field string) string
- func (conn *BaseConn) Upsert(srcTable string, tgtTable string, primKeys []string) (rowAffCnt int64, err error)
- func (conn *BaseConn) ValidateColumnNames(tgtColNames []string, colNames []string, quote bool) (newColNames []string, err error)
- type BigQueryConn
- func (conn *BigQueryConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
- func (conn *BigQueryConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *BigQueryConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BigQueryConn) Close() error
- func (conn *BigQueryConn) Connect(timeOut ...int) error
- func (conn *BigQueryConn) CopyFromGCS(gcsURI string, tableFName string, dsColumns []iop.Column) error
- func (conn *BigQueryConn) CopyToGCS(tableFName string, gcsURI string) error
- func (conn *BigQueryConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *BigQueryConn) Init() error
- func (conn *BigQueryConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BigQueryConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BigQueryConn) StreamRowsContext(ctx context.Context, sql string, limit ...int) (ds *iop.Datastream, err error)
- func (conn *BigQueryConn) Unload(sqls ...string) (gsPath string, err error)
- func (conn *BigQueryConn) Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)
- type Connection
- type MsSQLServerConn
- func (conn *MsSQLServerConn) BcpExport() (err error)
- func (conn *MsSQLServerConn) BcpImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MsSQLServerConn) BcpImportStreamParrallel(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MsSQLServerConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *MsSQLServerConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MsSQLServerConn) CopyFromAzure(tableFName, azPath string) (count uint64, err error)
- func (conn *MsSQLServerConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *MsSQLServerConn) GetURL(newURL ...string) string
- func (conn *MsSQLServerConn) Init() error
- func (conn *MsSQLServerConn) Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)
- type MySQLConn
- func (conn *MySQLConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
- func (conn *MySQLConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MySQLConn) GetURL(newURL ...string) string
- func (conn *MySQLConn) Init() error
- func (conn *MySQLConn) LoadDataInFile(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MySQLConn) LoadDataOutFile(sql string) (stdOutReader io.Reader, err error)
- func (conn *MySQLConn) Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)
- type OracleConn
- func (conn *OracleConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *OracleConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
- func (conn *OracleConn) Init() error
- func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *OracleConn) Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)
- type Pool
- type PostgresConn
- func (conn *PostgresConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
- func (conn *PostgresConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *PostgresConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
- func (conn *PostgresConn) CopyToStdout(sql string) (stdOutReader io.Reader, err error)
- func (conn *PostgresConn) Init() error
- func (conn *PostgresConn) Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)
- type RedshiftConn
- func (conn *RedshiftConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
- func (conn *RedshiftConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
- func (conn *RedshiftConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *RedshiftConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *RedshiftConn) CopyFromS3(tableFName, s3Path string) (count uint64, err error)
- func (conn *RedshiftConn) Init() error
- func (conn *RedshiftConn) Unload(sqls ...string) (s3Path string, err error)
- func (conn *RedshiftConn) Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)
- type SQLiteConn
- type Schema
- type Schemata
- type SnowflakeConn
- func (conn *SnowflakeConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
- func (conn *SnowflakeConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *SnowflakeConn) Connect(timeOut ...int) error
- func (conn *SnowflakeConn) CopyFromAzure(tableFName, azPath string) (err error)
- func (conn *SnowflakeConn) CopyFromS3(tableFName, s3Path string) (err error)
- func (conn *SnowflakeConn) CopyToAzure(sqls ...string) (azPath string, err error)
- func (conn *SnowflakeConn) CopyToS3(sqls ...string) (s3Path string, err error)
- func (conn *SnowflakeConn) CopyViaAWS(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) GetSchemas() (iop.Dataset, error)
- func (conn *SnowflakeConn) GetTables(schema string) (iop.Dataset, error)
- func (conn *SnowflakeConn) Init() error
- func (conn *SnowflakeConn) PutFile(fPath string, internalStagePath string) (err error)
- func (conn *SnowflakeConn) Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)
- type Table
- type Template
Constants ¶
This section is empty.
Variables ¶
var (
	// UseBulkExportFlowCSV to use BulkExportFlowCSV
	UseBulkExportFlowCSV = false

	SampleSize = 900
)
Functions ¶
func CleanSQL ¶
func CleanSQL(conn Connection, sql string) string
CleanSQL removes credentials from the query
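A minimal usage sketch for scrubbing a query before logging it. The import path and the helper name are assumptions for illustration only:

	package example

	import (
		"log"

		"github.com/flarco/dbio/database" // import path assumed
	)

	// logQuery logs a credential-free version of the query.
	func logQuery(conn database.Connection, sql string) {
		log.Println(database.CleanSQL(conn, sql))
	}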
func ColumnNames ¶
func ColumnNames(columns []iop.Column) (colNames []string)
ColumnNames returns the column names of a column array
func CommonColumns ¶
func CommonColumns(colNames1 []string, colNames2 []string) (commCols []string)
CommonColumns returns the columns common to both name lists
func CopyFromAzure ¶
func CopyFromAzure(conn Connection, tableFName, azPath string) (err error)
CopyFromAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func CopyFromS3 ¶
func CopyFromS3(conn Connection, tableFName, s3Path string) (err error)
func SQLColumns ¶
func SQLColumns(colTypes []*sql.ColumnType, NativeTypeMap map[string]string) (columns []iop.Column)
SQLColumns returns the columns derived from database ColumnType values
func SplitTableFullName ¶
func SplitTableFullName(tableName string) (string, string)
SplitTableFullName returns the schema and table name
func TestPermissions ¶
func TestPermissions(conn Connection, tableName string) (err error)
TestPermissions tests the needed permissions in a given connection
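A sketch of running the check before a load; the table name used here is purely illustrative, and the import path is an assumption:

	package example

	import "github.com/flarco/dbio/database" // import path assumed

	// canLoad checks the permissions needed on an illustrative table name.
	func canLoad(conn database.Connection) error {
		return database.TestPermissions(conn, "public.tmp_permissions_test")
	}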
Types ¶
type BaseConn ¶
type BaseConn struct {
	Connection
	URL  string
	Type dbio.Type // the type of database for sqlx: postgres, mysql, sqlite
	Data iop.Dataset
	// contains filtered or unexported fields
}
BaseConn is a database connection
func (*BaseConn) BulkExportFlow ¶
BulkExportFlow creates a dataflow from a sql query
func (*BaseConn) BulkExportFlowCSV ¶
BulkExportFlowCSV creates a dataflow from a sql query, using CSVs
func (*BaseConn) BulkExportStream ¶
func (conn *BaseConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
BulkExportStream streams the rows in bulk
func (*BaseConn) BulkImportFlow ¶
BulkImportFlow imports the streams' rows in bulk, concurrently, using channels
func (*BaseConn) BulkImportStream ¶
func (conn *BaseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream imports the stream rows in bulk
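A hedged sketch of piping one connection's bulk export into another's bulk import, grounded only in the signatures above (query, table name, and import path are placeholders):

	package example

	import "github.com/flarco/dbio/database" // import path assumed

	// transfer streams the result of a query on src into tgtTable on tgt.
	func transfer(src, tgt database.Connection, sql, tgtTable string) (uint64, error) {
		ds, err := src.BulkExportStream(sql)
		if err != nil {
			return 0, err
		}
		// BulkImportStream consumes the datastream and returns the row count.
		return tgt.BulkImportStream(tgtTable, ds)
	}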
func (*BaseConn) CastColumnForSelect ¶
CastColumnForSelect casts to the correct target column type
func (*BaseConn) CastColumnsForSelect ¶
func (conn *BaseConn) CastColumnsForSelect(srcColumns []iop.Column, tgtColumns []iop.Column) []string
CastColumnsForSelect casts the source columns into the target column types
func (*BaseConn) CompareChecksums ¶
CompareChecksums compares the checksum values from the database side to the checksum values from the StreamProcessor
func (*BaseConn) ExecContext ¶
func (conn *BaseConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
ExecContext runs a sql query with context, returns `error`
func (*BaseConn) GenerateDDL ¶
GenerateDDL generates a DDL statement based on a dataset
func (*BaseConn) GenerateInsertStatement ¶
func (conn *BaseConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
GenerateInsertStatement returns the proper INSERT statement
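As a sketch, the generated statement can be fed to Exec; the table and field names are placeholders, and the exact placeholder syntax of the statement depends on the connection's template:

	package example

	import "github.com/flarco/dbio/database" // import path assumed

	// insertTwoRows builds an INSERT for 2 rows x 2 fields and executes it.
	func insertTwoRows(conn database.Connection, vals []interface{}) error {
		sql := conn.GenerateInsertStatement("schema1.table2", []string{"id", "name"}, 2)
		_, err := conn.Exec(sql, vals...) // vals should match the 4 placeholders (2 rows x 2 fields)
		return err
	}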
func (*BaseConn) GenerateUpsertExpressions ¶
func (conn *BaseConn) GenerateUpsertExpressions(srcTable string, tgtTable string, pkFields []string) (exprs map[string]string, err error)
GenerateUpsertExpressions returns a map with needed expressions
func (*BaseConn) GetColumnStats ¶
func (conn *BaseConn) GetColumnStats(tableName string, fields ...string) (columns []iop.Column, err error)
GetColumnStats analyzes the table and returns the column statistics
func (*BaseConn) GetColumns ¶
func (conn *BaseConn) GetColumns(tableFName string, fields ...string) (columns []iop.Column, err error)
GetColumns returns columns for a given table. `tableFName` should include schema and table, for example: `schema1.table2`. Fields should be `column_name|data_type`.
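For instance (the table name is illustrative, the import path is assumed, and a Name field on iop.Column is assumed):

	package example

	import (
		"fmt"

		"github.com/flarco/dbio/database" // import path assumed
	)

	// printColumnNames lists the column names of an illustrative table.
	func printColumnNames(conn database.Connection) error {
		cols, err := conn.GetColumns("schema1.table2")
		if err != nil {
			return err
		}
		for _, col := range cols {
			fmt.Println(col.Name) // assumes iop.Column exposes a Name field
		}
		return nil
	}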
func (*BaseConn) GetColumnsFull ¶
GetColumnsFull returns columns for a given table. `tableName` should include schema and table, for example: `schema1.table2`. Fields should be `schema_name|table_name|table_type|column_name|data_type|column_id`.
func (*BaseConn) GetGormConn ¶
GetGormConn returns the gorm db connection
func (*BaseConn) GetIndexes ¶
GetIndexes returns indexes for given table.
func (*BaseConn) GetObjects ¶
GetObjects returns objects (tables or views) for a given schema. `objectType` can be either 'table', 'view' or 'all'.
func (*BaseConn) GetPrimaryKeys ¶
GetPrimaryKeys returns primary keys for given table.
func (*BaseConn) GetSQLColumns ¶
GetSQLColumns returns the columns from a sql query result
func (*BaseConn) GetSchemaObjects ¶
GetSchemaObjects obtains the full schemata info for a given schema
func (*BaseConn) GetSchemas ¶
GetSchemas returns schemas
func (*BaseConn) GetTemplateValue ¶
GetTemplateValue returns the value of the path
func (*BaseConn) InsertBatchStream ¶
func (conn *BaseConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*BaseConn) InsertStream ¶
InsertStream inserts a stream into a table
func (*BaseConn) LoadTemplates ¶
LoadTemplates loads the appropriate yaml template
func (*BaseConn) MustExec ¶
MustExec executes the query and panics if there is an error. Any placeholder parameters are replaced with supplied args.
func (*BaseConn) OptimizeTable ¶
OptimizeTable analyzes the table and alters its columns' data types based on the analysis result. If the table is missing, it is created with a new DDL. Caveat: this can truncate data points, since it is based only on the new data being inserted; complete stats of the target table would be needed to optimize properly.
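A sketch combining it with GetColumnStats, grounded only in the signatures above; in a real load the stats would more likely come from the incoming data rather than the table itself:

	package example

	import "github.com/flarco/dbio/database" // import path assumed

	// reoptimize recomputes column stats for a table and applies them.
	func reoptimize(conn database.Connection, tableFName string) error {
		stats, err := conn.GetColumnStats(tableFName)
		if err != nil {
			return err
		}
		return conn.OptimizeTable(tableFName, stats)
	}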
func (*BaseConn) QueryContext ¶
func (conn *BaseConn) QueryContext(ctx context.Context, sql string, limit ...int) (iop.Dataset, error)
QueryContext runs a sql query with ctx, returns `result`, `error`
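A minimal sketch of bounding a query with a timeout via the standard context package (import paths assumed):

	package example

	import (
		"context"
		"time"

		"github.com/flarco/dbio/database" // import path assumed
		"github.com/flarco/dbio/iop"      // import path assumed
	)

	// queryWithTimeout cancels the query if it runs longer than 30 seconds.
	func queryWithTimeout(conn database.Connection, sql string) (iop.Dataset, error) {
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		return conn.QueryContext(ctx, sql)
	}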
func (*BaseConn) RunAnalysis ¶
func (conn *BaseConn) RunAnalysis(analysisName string, values map[string]interface{}) (iop.Dataset, error)
RunAnalysis runs an analysis
func (*BaseConn) RunAnalysisField ¶
func (conn *BaseConn) RunAnalysisField(analysisName string, tableFName string, fields ...string) (iop.Dataset, error)
RunAnalysisField runs a field level analysis
func (*BaseConn) RunAnalysisTable ¶
func (conn *BaseConn) RunAnalysisTable(analysisName string, tableFNames ...string) (iop.Dataset, error)
RunAnalysisTable runs a table level analysis
func (*BaseConn) Self ¶
func (conn *BaseConn) Self() Connection
Self returns the respective connection instance. This is useful to refer back to a subclass method from the superclass level (akin to method overriding).
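A sketch of the pattern: calling through Self() lets a specialized connection's own implementation take effect even from generic code (import path assumed; Quote and Self are both part of the Connection interface):

	package example

	import "github.com/flarco/dbio/database" // import path assumed

	// quoteAll quotes field names through Self(), so a specialized
	// connection's own Quote implementation is used when available.
	func quoteAll(conn database.Connection, fields []string) []string {
		quoted := make([]string, len(fields))
		for i, f := range fields {
			quoted[i] = conn.Self().Quote(f)
		}
		return quoted
	}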
func (*BaseConn) StreamRecords ¶
StreamRecords streams the records of a sql query, returns `result`, `error`
func (*BaseConn) StreamRows ¶
StreamRows streams the rows of a sql query, returns `result`, `error`
func (*BaseConn) StreamRowsContext ¶
func (conn *BaseConn) StreamRowsContext(ctx context.Context, sql string, limit ...int) (ds *iop.Datastream, err error)
StreamRowsContext streams the rows of a sql query with context, returns `result`, `error`
func (*BaseConn) TableExists ¶
TableExists returns true if the table exists
func (*BaseConn) Upsert ¶
func (conn *BaseConn) Upsert(srcTable string, tgtTable string, primKeys []string) (rowAffCnt int64, err error)
Upsert inserts / updates from a srcTable into a target table, assuming the srcTable has some or all of the tgtTable fields with matching types.
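A sketch of merging a staging table into a target on a single primary key; the table and key names are illustrative and the import path is an assumption:

	package example

	import "github.com/flarco/dbio/database" // import path assumed

	// mergeStaging upserts rows from an illustrative staging table into
	// the target, matching on the "id" primary key.
	func mergeStaging(conn database.Connection) (int64, error) {
		return conn.Upsert("staging.orders_tmp", "public.orders", []string{"id"})
	}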
func (*BaseConn) ValidateColumnNames ¶
func (conn *BaseConn) ValidateColumnNames(tgtColNames []string, colNames []string, quote bool) (newColNames []string, err error)
ValidateColumnNames verifies that source fields are present in the target table. It will return quoted field names as `newColNames`, the same length as `colNames`.
type BigQueryConn ¶
type BigQueryConn struct {
	BaseConn
	URL       string
	Client    *bigquery.Client
	ProjectID string
	DatasetID string
	Location  string
	Datasets  []string
}
BigQueryConn is a Google Big Query connection
func (*BigQueryConn) BulkExportFlow ¶
func (conn *BigQueryConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
BulkExportFlow reads in bulk
func (*BigQueryConn) BulkImportFlow ¶
func (conn *BigQueryConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow inserts a flow of streams into a table.
func (*BigQueryConn) BulkImportStream ¶
func (conn *BigQueryConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream loads data into a BigQuery table using a file on the local filesystem.
func (*BigQueryConn) Connect ¶
func (conn *BigQueryConn) Connect(timeOut ...int) error
Connect connects to the database
func (*BigQueryConn) CopyFromGCS ¶
func (conn *BigQueryConn) CopyFromGCS(gcsURI string, tableFName string, dsColumns []iop.Column) error
CopyFromGCS copies into BigQuery from Google Cloud Storage
func (*BigQueryConn) CopyToGCS ¶
func (conn *BigQueryConn) CopyToGCS(tableFName string, gcsURI string) error
CopyToGCS copies a table to Google Cloud Storage
func (*BigQueryConn) ExecContext ¶
func (conn *BigQueryConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
ExecContext runs a sql query with context, returns `error`
func (*BigQueryConn) InsertBatchStream ¶
func (conn *BigQueryConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*BigQueryConn) InsertStream ¶
func (conn *BigQueryConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertStream loads data into a BigQuery table using a file on the local filesystem.
func (*BigQueryConn) StreamRowsContext ¶
func (conn *BigQueryConn) StreamRowsContext(ctx context.Context, sql string, limit ...int) (ds *iop.Datastream, err error)
StreamRowsContext streams the rows of a sql query with context, returns `result`, `error`
type Connection ¶
type Connection interface {
	Self() Connection
	Init() error
	Connect(timeOut ...int) error
	Kill() error
	Close() error
	GetType() dbio.Type
	GetGormConn(config *gorm.Config) (*gorm.DB, error)
	LoadTemplates() error
	GetURL(newURL ...string) string
	StreamRows(sql string, limit ...int) (*iop.Datastream, error)
	StreamRowsContext(ctx context.Context, sql string, limit ...int) (ds *iop.Datastream, err error)
	BulkExportStream(sql string) (*iop.Datastream, error)
	BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
	BulkExportFlow(sqls ...string) (*iop.Dataflow, error)
	BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
	Begin(options ...*sql.TxOptions) error
	Commit() error
	Rollback() error
	Query(sql string, limit ...int) (iop.Dataset, error)
	QueryContext(ctx context.Context, sql string, limit ...int) (iop.Dataset, error)
	GenerateDDL(tableFName string, data iop.Dataset) (string, error)
	Quote(string) string
	Unquote(string) string
	GenerateInsertStatement(tableName string, fields []string, numRows int) string
	GenerateUpsertExpressions(srcTable string, tgtTable string, pkFields []string) (exprs map[string]string, err error)
	DropTable(...string) error
	DropView(...string) error
	InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
	InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
	Db() *sqlx.DB
	Tx() *sqlx.Tx
	Exec(sql string, args ...interface{}) (result sql.Result, err error)
	ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
	MustExec(sql string, args ...interface{}) (result sql.Result)
	Schemata() Schemata
	Template() Template
	SetProp(string, string)
	GetProp(string) string
	PropsArr() []string
	Props() map[string]string
	GetTemplateValue(path string) (value string)
	Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error)
	SwapTable(srcTable string, tgtTable string) (err error)
	RenameTable(table string, newTable string) (err error)
	Context() *g.Context
	StreamRecords(sql string) (<-chan map[string]interface{}, error)
	GetDDL(string) (string, error)
	GetSchemaObjects(string) (Schema, error)
	GetSchemas() (iop.Dataset, error)
	GetTables(string) (iop.Dataset, error)
	GetViews(string) (iop.Dataset, error)
	GetSQLColumns(sqls ...string) (columns []iop.Column, err error)
	TableExists(tableFName string) (exists bool, err error)
	GetColumns(tableFName string, fields ...string) ([]iop.Column, error)
	GetColumnStats(tableName string, fields ...string) (columns []iop.Column, err error)
	GetPrimaryKeys(string) (iop.Dataset, error)
	GetIndexes(string) (iop.Dataset, error)
	GetColumnsFull(string) (iop.Dataset, error)
	GetCount(string) (uint64, error)
	RunAnalysis(string, map[string]interface{}) (iop.Dataset, error)
	RunAnalysisTable(string, ...string) (iop.Dataset, error)
	RunAnalysisField(string, string, ...string) (iop.Dataset, error)
	CastColumnForSelect(srcColumn iop.Column, tgtColumn iop.Column) string
	CastColumnsForSelect(srcColumns []iop.Column, tgtColumns []iop.Column) []string
	ValidateColumnNames(tgtColName []string, colNames []string, quote bool) (newColNames []string, err error)
	OptimizeTable(tableName string, columns []iop.Column) (err error)
	CompareChecksums(tableName string, columns []iop.Column) (err error)
	BaseURL() string
	// contains filtered or unexported methods
}
Connection is the Base interface for Connections
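Because every connection type implements this interface, helpers can be written once and reused across databases. A minimal sketch (import path assumed):

	package example

	import "github.com/flarco/dbio/database" // import path assumed

	// rowCount works for any Connection implementation (Postgres,
	// Snowflake, BigQuery, etc.) since GetCount is part of the interface.
	func rowCount(conn database.Connection, tableFName string) (uint64, error) {
		return conn.GetCount(tableFName)
	}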
func NewConn ¶
func NewConn(URL string, props ...string) (Connection, error)
NewConn returns the most appropriate connection for a given database
func NewConnContext ¶
NewConnContext returns the most appropriate connection for a given database, using the provided context. Props are provided as `"Prop1=Value1", "Prop2=Value2", ...`
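A sketch of opening a connection, querying, and closing. The URL, the prop key/value, the import path, and the assumption that iop.Dataset exposes a Rows field are all illustrative, not confirmed by this documentation:

	package main

	import (
		"log"

		"github.com/flarco/dbio/database" // import path assumed
	)

	func main() {
		// URL and prop values are placeholders.
		conn, err := database.NewConn(
			"postgres://user:pass@host:5432/db?sslmode=disable",
			"schema=public",
		)
		if err != nil {
			log.Fatal(err)
		}
		defer conn.Close()

		if err = conn.Connect(); err != nil {
			log.Fatal(err)
		}

		data, err := conn.Query("select 1 as a")
		if err != nil {
			log.Fatal(err)
		}
		log.Println(data.Rows) // assumes iop.Dataset exposes a Rows field
	}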
type MsSQLServerConn ¶
MsSQLServerConn is a Microsoft SQL Server connection
func (*MsSQLServerConn) BcpExport ¶
func (conn *MsSQLServerConn) BcpExport() (err error)
BcpExport exports data to datastream
func (*MsSQLServerConn) BcpImportStream ¶
func (conn *MsSQLServerConn) BcpImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BcpImportStream imports using the bcp tool: https://docs.microsoft.com/en-us/sql/tools/bcp-utility?view=sql-server-ver15. Example: bcp dbo.test1 in '/tmp/LargeDataset.csv' -S tcp:sqlserver.host,51433 -d master -U sa -P 'password' -c -t ',' -b 5000. Limitation: if the delimiter (e.g. a comma) appears in a field value, it will error; use a delimiter not present in the data, or apply some other transformation.
func (*MsSQLServerConn) BcpImportStreamParrallel ¶
func (conn *MsSQLServerConn) BcpImportStreamParrallel(tableFName string, ds *iop.Datastream) (count uint64, err error)
BcpImportStreamParrallel uses goroutines to import partitioned files
func (*MsSQLServerConn) BulkImportFlow ¶
func (conn *MsSQLServerConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow bulk import flow
func (*MsSQLServerConn) BulkImportStream ¶
func (conn *MsSQLServerConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*MsSQLServerConn) CopyFromAzure ¶
func (conn *MsSQLServerConn) CopyFromAzure(tableFName, azPath string) (count uint64, err error)
CopyFromAzure uses the COPY INTO Table command from Azure https://docs.microsoft.com/en-us/sql/t-sql/statements/copy-into-transact-sql?view=azure-sqldw-latest
func (*MsSQLServerConn) CopyViaAzure ¶
func (conn *MsSQLServerConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaAzure uses the Azure DWH COPY INTO Table command
func (*MsSQLServerConn) GetURL ¶
func (conn *MsSQLServerConn) GetURL(newURL ...string) string
GetURL returns the processed URL
type MySQLConn ¶
MySQLConn is a MySQL connection
func (*MySQLConn) BulkExportStream ¶
func (conn *MySQLConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
BulkExportStream bulk Export
func (*MySQLConn) BulkImportStream ¶
func (conn *MySQLConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*MySQLConn) LoadDataInFile ¶
func (conn *MySQLConn) LoadDataInFile(tableFName string, ds *iop.Datastream) (count uint64, err error)
LoadDataInFile Bulk Import
func (*MySQLConn) LoadDataOutFile ¶
LoadDataOutFile bulk exports. Possible error: `ERROR 1227 (42000) at line 1: Access denied; you need (at least one of) the FILE privilege(s) for this operation`. The FILE privilege needs to be granted to the user, and the --secure-file-priv option needs to be set properly for it to work. See https://stackoverflow.com/questions/9819271/why-is-mysql-innodb-insert-so-slow to improve InnoDB insert speed.
type OracleConn ¶
OracleConn is an Oracle connection
func (*OracleConn) BulkImportStream ¶
func (conn *OracleConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*OracleConn) GenerateInsertStatement ¶
func (conn *OracleConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
GenerateInsertStatement returns the proper INSERT statement
func (*OracleConn) SQLLoad ¶
func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count uint64, err error)
SQLLoad uses sqlldr to bulk import. Example: `cat test1.csv | sqlldr system/oracle@oracle.host:1521/xe control=sqlldr.ctl log=/dev/stdout bad=/dev/stderr`. It cannot import when a newline is in a value; it needs to scan for new lines.
type PostgresConn ¶
PostgresConn is a Postgres connection
func (*PostgresConn) BulkExportStream ¶
func (conn *PostgresConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
BulkExportStream uses the bulk dumping (COPY)
func (*PostgresConn) BulkImportStream ¶
func (conn *PostgresConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table
func (*PostgresConn) CastColumnForSelect ¶
func (conn *PostgresConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
CastColumnForSelect casts to the correct target column type
func (*PostgresConn) CopyToStdout ¶
func (conn *PostgresConn) CopyToStdout(sql string) (stdOutReader io.Reader, err error)
CopyToStdout copies the query output to STDOUT (COPY TO STDOUT)
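A sketch of consuming the returned reader by writing the COPY output to a local file; the import path is an assumption, and the output format depends on the COPY options used internally:

	package example

	import (
		"io"
		"os"

		"github.com/flarco/dbio/database" // import path assumed
	)

	// dumpToFile writes the COPY output of a query to a local file.
	func dumpToFile(conn *database.PostgresConn, sql, path string) error {
		reader, err := conn.CopyToStdout(sql)
		if err != nil {
			return err
		}
		f, err := os.Create(path)
		if err != nil {
			return err
		}
		defer f.Close()
		_, err = io.Copy(f, reader)
		return err
	}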
type RedshiftConn ¶
RedshiftConn is a Redshift connection
func (*RedshiftConn) BulkExportFlow ¶
func (conn *RedshiftConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
BulkExportFlow reads in bulk
func (*RedshiftConn) BulkExportStream ¶
func (conn *RedshiftConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
BulkExportStream reads in bulk
func (*RedshiftConn) BulkImportFlow ¶
func (conn *RedshiftConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow inserts a flow of streams into a table. For redshift we need to create CSVs in S3 and then use the COPY command.
func (*RedshiftConn) BulkImportStream ¶
func (conn *RedshiftConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table. For redshift we need to create CSVs in S3 and then use the COPY command.
func (*RedshiftConn) CopyFromS3 ¶
func (conn *RedshiftConn) CopyFromS3(tableFName, s3Path string) (count uint64, err error)
CopyFromS3 uses the COPY INTO Table command from AWS S3
type SQLiteConn ¶
SQLiteConn is a SQLite connection
type Schemata ¶
type Schemata struct {
	Schemas map[string]Schema
	Tables  map[string]*Table // all tables with full name lower case (schema.table)
}
Schemata contains the full schema for a connection
type SnowflakeConn ¶
SnowflakeConn is a Snowflake connection
func (*SnowflakeConn) BulkExportFlow ¶
func (conn *SnowflakeConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
BulkExportFlow reads in bulk
func (*SnowflakeConn) BulkImportFlow ¶
func (conn *SnowflakeConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow bulk import flow
func (*SnowflakeConn) BulkImportStream ¶
func (conn *SnowflakeConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*SnowflakeConn) Connect ¶
func (conn *SnowflakeConn) Connect(timeOut ...int) error
Connect connects to the database
func (*SnowflakeConn) CopyFromAzure ¶
func (conn *SnowflakeConn) CopyFromAzure(tableFName, azPath string) (err error)
CopyFromAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) CopyFromS3 ¶
func (conn *SnowflakeConn) CopyFromS3(tableFName, s3Path string) (err error)
CopyFromS3 uses the Snowflake COPY INTO Table command from AWS S3 https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) CopyToAzure ¶
func (conn *SnowflakeConn) CopyToAzure(sqls ...string) (azPath string, err error)
CopyToAzure exports a query to an Azure location
func (*SnowflakeConn) CopyToS3 ¶
func (conn *SnowflakeConn) CopyToS3(sqls ...string) (s3Path string, err error)
CopyToS3 exports a query to an S3 location
func (*SnowflakeConn) CopyViaAWS ¶
func (conn *SnowflakeConn) CopyViaAWS(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaAWS uses the Snowflake COPY INTO Table command from AWS S3 https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) CopyViaAzure ¶
func (conn *SnowflakeConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) CopyViaStage ¶
func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaStage uses the Snowflake COPY INTO Table command https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) GetSchemas ¶
func (conn *SnowflakeConn) GetSchemas() (iop.Dataset, error)
GetSchemas returns schemas
func (*SnowflakeConn) GetTables ¶
func (conn *SnowflakeConn) GetTables(schema string) (iop.Dataset, error)
GetTables returns tables for given schema
type Table ¶
type Table struct {
	Name       string `json:"name"`
	FullName   string `json:"full_name"`
	IsView     bool   `json:"is_view"` // whether is a view
	Columns    []iop.Column
	ColumnsMap map[string]*iop.Column
}
Table represents a schemata table
type Template ¶
type Template struct {
	Core           map[string]string
	Metadata       map[string]string
	Analysis       map[string]string
	Function       map[string]string `yaml:"function"`
	GeneralTypeMap map[string]string `yaml:"general_type_map"`
	NativeTypeMap  map[string]string `yaml:"native_type_map"`
	NativeStatsMap map[string]bool   `yaml:"native_stat_map"`
	Variable       map[string]string
}
Template is a database YAML template