Documentation ¶
Index ¶
- Constants
- Variables
- func AddMissingColumns(conn Connection, table Table, newCols iop.Columns) (ok bool, err error)
- func CleanSQL(conn Connection, sql string) string
- func CommonColumns(colNames1 []string, colNames2 []string) (commCols []string)
- func CopyFromAzure(conn Connection, tableFName, azPath string) (err error)
- func CopyFromS3(conn Connection, tableFName, s3Path string) (err error)
- func GenerateAlterDDL(conn Connection, table Table, newColumns iop.Columns) (bool, error)
- func GetQualifierQuote(dialect dbio.Type) string
- func InsertBatchStream(conn Connection, tx *BaseTransaction, tableFName string, ds *iop.Datastream) (count uint64, err error)
- func InsertStream(conn Connection, tx *BaseTransaction, tableFName string, ds *iop.Datastream) (count uint64, err error)
- func PK(obj interface{}) (pk []string)
- func ParseSQLMultiStatements(sql string) (sqls g.Strings)
- func SQLColumns(colTypes []ColumnType, conn Connection) (columns iop.Columns)
- func SplitTableFullName(tableName string) (string, string)
- func TestPermissions(conn Connection, tableName string) (err error)
- func UID(obj interface{}) string
- func Upsert(conn Connection, tx Transaction, sourceTable, targetTable string, ...) (count int64, err error)
- type BaseConn
- func (conn *BaseConn) AddLog(text string)
- func (conn *BaseConn) Base() *BaseConn
- func (conn *BaseConn) BaseURL() string
- func (conn *BaseConn) Begin(options ...*sql.TxOptions) (err error)
- func (conn *BaseConn) BeginContext(ctx context.Context, options ...*sql.TxOptions) (err error)
- func (conn *BaseConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
- func (conn *BaseConn) BulkExportFlowCSV(sqls ...string) (df *iop.Dataflow, err error)
- func (conn *BaseConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
- func (conn *BaseConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *BaseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BaseConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) string
- func (conn *BaseConn) CastColumnsForSelect(srcColumns iop.Columns, tgtColumns iop.Columns) []string
- func (conn *BaseConn) Close() error
- func (conn *BaseConn) Commit() (err error)
- func (conn *BaseConn) CompareChecksums(tableName string, columns iop.Columns) (err error)
- func (conn *BaseConn) ConnString() string
- func (conn *BaseConn) Connect(timeOut ...int) (err error)
- func (conn *BaseConn) Context() *g.Context
- func (conn *BaseConn) CreateTable(tableName string, cols iop.Columns, tableDDL string) (err error)
- func (conn *BaseConn) CreateTemporaryTable(tableName string, cols iop.Columns) (err error)
- func (conn *BaseConn) CurrentDatabase() (dbName string, err error)
- func (conn *BaseConn) Db() *sqlx.DB
- func (conn *BaseConn) DbX() *DbX
- func (conn *BaseConn) DropTable(tableNames ...string) (err error)
- func (conn *BaseConn) DropView(viewNames ...string) (err error)
- func (conn *BaseConn) Exec(sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *BaseConn) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (conn *BaseConn) ExecMulti(sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *BaseConn) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (conn *BaseConn) GenerateDDL(tableFName string, data iop.Dataset, temporary bool) (string, error)
- func (conn *BaseConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
- func (conn *BaseConn) GenerateUpsertExpressions(srcTable string, tgtTable string, pkFields []string) (exprs map[string]string, err error)
- func (conn *BaseConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *BaseConn) GetAnalysis(analysisName string, values map[string]interface{}) (sql string, err error)
- func (conn *BaseConn) GetColumnStats(tableName string, fields ...string) (columns iop.Columns, err error)
- func (conn *BaseConn) GetColumns(tableFName string, fields ...string) (columns iop.Columns, err error)
- func (conn *BaseConn) GetColumnsFull(tableFName string) (iop.Dataset, error)
- func (conn *BaseConn) GetCount(tableFName string) (uint64, error)
- func (conn *BaseConn) GetDDL(tableFName string) (string, error)
- func (conn *BaseConn) GetDatabases() (iop.Dataset, error)
- func (conn *BaseConn) GetGormConn(config *gorm.Config) (*gorm.DB, error)
- func (conn *BaseConn) GetIndexes(tableFName string) (iop.Dataset, error)
- func (conn *BaseConn) GetNativeType(col iop.Column) (nativeType string, err error)
- func (conn *BaseConn) GetObjects(schema string, objectType string) (iop.Dataset, error)
- func (conn *BaseConn) GetPrimaryKeys(tableFName string) (iop.Dataset, error)
- func (conn *BaseConn) GetProp(key string) string
- func (conn *BaseConn) GetSQLColumns(sqls ...string) (columns iop.Columns, err error)
- func (conn *BaseConn) GetSchemas() (iop.Dataset, error)
- func (conn *BaseConn) GetSchemata(schemaName, tableName string) (Schemata, error)
- func (conn *BaseConn) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error)
- func (conn *BaseConn) GetTables(schema string) (iop.Dataset, error)
- func (conn *BaseConn) GetTemplateValue(path string) (value string)
- func (conn *BaseConn) GetType() dbio.Type
- func (conn *BaseConn) GetURL(newURL ...string) string
- func (conn *BaseConn) GetViews(schema string) (iop.Dataset, error)
- func (conn *BaseConn) Import(data iop.Dataset, tableName string) error
- func (conn *BaseConn) Info() (ci ConnInfo)
- func (conn *BaseConn) Init() (err error)
- func (conn *BaseConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BaseConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BaseConn) Kill() error
- func (conn *BaseConn) LoadTemplates() error
- func (conn *BaseConn) MustExec(sql string, args ...interface{}) (result sql.Result)
- func (conn *BaseConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)
- func (conn *BaseConn) OptimizeTable(table *Table, newColumns iop.Columns) (ok bool, err error)
- func (conn *BaseConn) Prepare(query string) (stmt *sql.Stmt, err error)
- func (conn *BaseConn) ProcessTemplate(level, text string, values map[string]interface{}) (sql string, err error)
- func (conn *BaseConn) PropArr() []string
- func (conn *BaseConn) Props() map[string]string
- func (conn *BaseConn) Query(sql string, options ...map[string]interface{}) (data iop.Dataset, err error)
- func (conn *BaseConn) QueryContext(ctx context.Context, sql string, options ...map[string]interface{}) (iop.Dataset, error)
- func (conn *BaseConn) Quote(field string) string
- func (conn *BaseConn) Rollback() (err error)
- func (conn *BaseConn) RunAnalysis(analysisName string, values map[string]interface{}) (data iop.Dataset, err error)
- func (conn *BaseConn) Schemata() Schemata
- func (conn *BaseConn) Self() Connection
- func (conn *BaseConn) SetProp(key string, val string)
- func (conn *BaseConn) StreamRecords(sql string) (<-chan map[string]interface{}, error)
- func (conn *BaseConn) StreamRows(sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
- func (conn *BaseConn) StreamRowsContext(ctx context.Context, query string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
- func (conn *BaseConn) SumbitTemplate(level string, templateMap map[string]string, name string, ...) (data iop.Dataset, err error)
- func (conn *BaseConn) SwapTable(srcTable string, tgtTable string) (err error)
- func (conn *BaseConn) TableExists(tableFName string) (exists bool, err error)
- func (conn *BaseConn) Template() Template
- func (conn *BaseConn) Tx() Transaction
- func (conn *BaseConn) Unquote(field string) string
- func (conn *BaseConn) Upsert(srcTable string, tgtTable string, primKeys []string) (rowAffCnt int64, err error)
- func (conn *BaseConn) ValidateColumnNames(tgtColNames []string, colNames []string, quote bool) (newColNames []string, err error)
- type BaseTransaction
- func (t *BaseTransaction) Commit() (err error)
- func (t *BaseTransaction) Context() *g.Context
- func (t *BaseTransaction) DisableTrigger(tableName, triggerName string) (err error)
- func (t *BaseTransaction) EnableTrigger(tableName, triggerName string) (err error)
- func (t *BaseTransaction) Exec(sql string, args ...interface{}) (result sql.Result, err error)
- func (t *BaseTransaction) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *BaseTransaction) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *BaseTransaction) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (t *BaseTransaction) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (t *BaseTransaction) Prepare(query string) (stmt *sql.Stmt, err error)
- func (t *BaseTransaction) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)
- func (t *BaseTransaction) Rollback() (err error)
- func (t *BaseTransaction) Upsert(sourceTable, targetTable string, pkFields []string) (count uint64, err error)
- func (t *BaseTransaction) UpsertStream(tableFName string, ds *iop.Datastream, pk []string) (count uint64, err error)
- type BigQueryConn
- func (conn *BigQueryConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
- func (conn *BigQueryConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *BigQueryConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BigQueryConn) Close() error
- func (conn *BigQueryConn) Connect(timeOut ...int) error
- func (conn *BigQueryConn) CopyFromGCS(gcsURI string, tableFName string, dsColumns []iop.Column) error
- func (conn *BigQueryConn) CopyFromLocal(localURI string, tableFName string, dsColumns []iop.Column) error
- func (conn *BigQueryConn) CopyToGCS(table Table, gcsURI string) error
- func (conn *BigQueryConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *BigQueryConn) ExportToGCS(sql string, gcsURI string) error
- func (conn *BigQueryConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *BigQueryConn) GetDatabases() (iop.Dataset, error)
- func (conn *BigQueryConn) GetSchemas() (iop.Dataset, error)
- func (conn *BigQueryConn) GetSchemata(schemaName, tableName string) (Schemata, error)
- func (conn *BigQueryConn) Init() error
- func (conn *BigQueryConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BigQueryConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BigQueryConn) LoadCSVFromReader(tableFName string, reader io.Reader, dsColumns []iop.Column) error
- func (conn *BigQueryConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
- func (conn *BigQueryConn) StreamRowsContext(ctx context.Context, sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
- func (conn *BigQueryConn) Unload(sqls ...string) (gsPath string, err error)
- type BigTableAction
- type BigTableConn
- func (conn *BigTableConn) BulkExportFlow(tableNames ...string) (df *iop.Dataflow, err error)
- func (conn *BigTableConn) Close() error
- func (conn *BigTableConn) Connect(timeOut ...int) error
- func (conn *BigTableConn) ExecContext(ctx context.Context, payload string, args ...interface{}) (result sql.Result, err error)
- func (conn *BigTableConn) GetColumns(tableFName string, fields ...string) (columns iop.Columns, err error)
- func (conn *BigTableConn) GetSQLColumns(sqls ...string) (columns iop.Columns, err error)
- func (conn *BigTableConn) GetSchemata(schemaName, tableName string) (schemata Schemata, err error)
- func (conn *BigTableConn) GetTables(schema string) (data iop.Dataset, err error)
- func (conn *BigTableConn) GetViews(schema string) (data iop.Dataset, err error)
- func (conn *BigTableConn) Init() error
- func (conn *BigTableConn) InsertBatchStream(table string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BigTableConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
- func (conn *BigTableConn) StreamRowsContext(ctx context.Context, table string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
- type BigTableQuery
- type BlankTransaction
- func (t *BlankTransaction) Commit() (err error)
- func (t *BlankTransaction) Context() *g.Context
- func (t *BlankTransaction) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *BlankTransaction) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *BlankTransaction) Prepare(query string) (stmt *sql.Stmt, err error)
- func (t *BlankTransaction) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)
- func (t *BlankTransaction) Rollback() (err error)
- type ClickhouseConn
- func (conn *ClickhouseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *ClickhouseConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
- func (conn *ClickhouseConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *ClickhouseConn) Init() error
- func (conn *ClickhouseConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)
- type ColumnType
- type ConnInfo
- type Connection
- type DataAnalyzer
- func (da *DataAnalyzer) AnalyzeColumns(sampleSize int) (err error)
- func (da *DataAnalyzer) GetManyToMany(nonUniqueCols iop.Columns) (err error)
- func (da *DataAnalyzer) GetOneToMany(uniqueCols, nonUniqueCols iop.Columns) (err error)
- func (da *DataAnalyzer) GetOneToOne(uniqueCols iop.Columns) (err error)
- func (da *DataAnalyzer) GetSchemata(force bool) (err error)
- func (da *DataAnalyzer) ProcessRelations() (err error)
- type DataAnalyzerOptions
- type Database
- type DbX
- func (x *DbX) Delete(o interface{}) (cnt int, err error)
- func (x *DbX) Get(o interface{}, fields ...string) (err error)
- func (x *DbX) Insert(o interface{}, fields ...string) (err error)
- func (x *DbX) Select(o interface{}, fields ...string) (err error)
- func (x *DbX) TableName(o interface{}) (name string)
- func (x *DbX) Update(o interface{}, fields ...string) (cnt int, err error)
- func (x *DbX) Upsert(o interface{}, fields ...string) (cnt int, err error)
- func (x *DbX) Where(where ...interface{}) *DbX
- type ManualTransaction
- func (t *ManualTransaction) Commit() (err error)
- func (t *ManualTransaction) Context() *g.Context
- func (t *ManualTransaction) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *ManualTransaction) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *ManualTransaction) Prepare(query string) (stmt *sql.Stmt, err error)
- func (t *ManualTransaction) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)
- func (t *ManualTransaction) Rollback() (err error)
- type ModelDbX
- func (m *ModelDbX) Bind(bindFunc func(p interface{}) error, objPtr interface{}) (err error)
- func (m *ModelDbX) Delete(db *sqlx.DB) (err error)
- func (m *ModelDbX) Fields() (fields []string)
- func (m *ModelDbX) Get(db *sqlx.DB, fields ...string) (err error)
- func (m *ModelDbX) Insert(db *sqlx.DB, fields ...string) (err error)
- func (m *ModelDbX) Rec() map[string]interface{}
- func (m *ModelDbX) Select(db *sqlx.DB, objPtr interface{}, fields ...string) (err error)
- func (m *ModelDbX) TableName(objPtr interface{}) string
- func (m *ModelDbX) Update(db *sqlx.DB, fields ...string) (err error)
- func (m *ModelDbX) Values(fields []string) (values []interface{}, err error)
- func (m *ModelDbX) Where(where ...interface{}) *ModelDbX
- type MsSQLServerConn
- func (conn *MsSQLServerConn) BcpExport() (err error)
- func (conn *MsSQLServerConn) BcpImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MsSQLServerConn) BcpImportStreamParrallel(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MsSQLServerConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *MsSQLServerConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MsSQLServerConn) CopyFromAzure(tableFName, azPath string) (count uint64, err error)
- func (conn *MsSQLServerConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *MsSQLServerConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *MsSQLServerConn) GetURL(newURL ...string) string
- func (conn *MsSQLServerConn) Init() error
- type MySQLConn
- func (conn *MySQLConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
- func (conn *MySQLConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MySQLConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *MySQLConn) GetURL(newURL ...string) string
- func (conn *MySQLConn) Init() error
- func (conn *MySQLConn) LoadDataInFile(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MySQLConn) LoadDataOutFile(sql string) (stdOutReader io.Reader, err error)
- type OracleConn
- func (conn *OracleConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *OracleConn) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (conn *OracleConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
- func (conn *OracleConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *OracleConn) Init() error
- func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count uint64, err error)
- type Pool
- type PostgresConn
- func (conn *PostgresConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
- func (conn *PostgresConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *PostgresConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
- func (conn *PostgresConn) CopyToStdout(sql string) (stdOutReader io.Reader, err error)
- func (conn *PostgresConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *PostgresConn) Init() error
- type RedshiftConn
- func (conn *RedshiftConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
- func (conn *RedshiftConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
- func (conn *RedshiftConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *RedshiftConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *RedshiftConn) ConnString() string
- func (conn *RedshiftConn) CopyFromS3(tableFName, s3Path string) (count uint64, err error)
- func (conn *RedshiftConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *RedshiftConn) Init() error
- func (conn *RedshiftConn) Unload(sqls ...string) (s3Path string, err error)
- type Relation
- type Result
- type SQLiteConn
- type Schema
- type Schemata
- type SnowflakeConn
- func (conn *SnowflakeConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
- func (conn *SnowflakeConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *SnowflakeConn) ConnString() string
- func (conn *SnowflakeConn) Connect(timeOut ...int) error
- func (conn *SnowflakeConn) CopyFromAzure(tableFName, azPath string) (err error)
- func (conn *SnowflakeConn) CopyFromS3(tableFName, s3Path string) (err error)
- func (conn *SnowflakeConn) CopyToAzure(sqls ...string) (azPath string, err error)
- func (conn *SnowflakeConn) CopyToS3(sqls ...string) (s3Path string, err error)
- func (conn *SnowflakeConn) CopyViaAWS(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *SnowflakeConn) GetColumnsFull(tableFName string) (data iop.Dataset, err error)
- func (conn *SnowflakeConn) GetDatabases() (data iop.Dataset, err error)
- func (conn *SnowflakeConn) GetFile(internalStagePath, fPath string) (err error)
- func (conn *SnowflakeConn) Init() error
- func (conn *SnowflakeConn) PutFile(fPath string, internalStagePath string) (err error)
- func (conn *SnowflakeConn) UnloadViaStage(sqls ...string) (filePath string, err error)
- type StatFieldSQL
- type Table
- type Template
- type Transaction
- type User
- type WhereClause
Constants ¶
const RelationManyToMany = "many_to_many"
const RelationManyToOne = "many_to_one"
const RelationOneToMany = "one_to_many"
const RelationOneToOne = "one_to_one"
Variables ¶
var ( // UseBulkExportFlowCSV to use BulkExportFlowCSV UseBulkExportFlowCSV = false SampleSize = 900 )
var Debug = false
Debug prints queries when true
var InferDBStream = false
InferDBStream may need to be `true`, since precision and scale is not guaranteed. If `false`, will use the database stream source schema
Functions ¶
func AddMissingColumns ¶ added in v0.3.66
func CleanSQL ¶
func CleanSQL(conn Connection, sql string) string
CleanSQL removes creds from the query
func CommonColumns ¶
CommonColumns return common columns
func CopyFromAzure ¶
func CopyFromAzure(conn Connection, tableFName, azPath string) (err error)
CopyFromAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func CopyFromS3 ¶
func CopyFromS3(conn Connection, tableFName, s3Path string) (err error)
func GenerateAlterDDL ¶ added in v0.3.111
GenerateDDL genrate a DDL based on a dataset
func GetQualifierQuote ¶ added in v0.3.111
func InsertBatchStream ¶ added in v0.0.5
func InsertBatchStream(conn Connection, tx *BaseTransaction, tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func InsertStream ¶ added in v0.0.5
func InsertStream(conn Connection, tx *BaseTransaction, tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertStream inserts a stream
func PK ¶ added in v0.0.5
func PK(obj interface{}) (pk []string)
PK returns the primary keys of a model
func ParseSQLMultiStatements ¶ added in v0.0.5
ParseSQLMultiStatements splits a sql text into statements typically by a ';'
func SQLColumns ¶
func SQLColumns(colTypes []ColumnType, conn Connection) (columns iop.Columns)
SQLColumns returns the columns from database ColumnType
func SplitTableFullName ¶
SplitTableFullName retrusn the schema / table name
func TestPermissions ¶
func TestPermissions(conn Connection, tableName string) (err error)
TestPermissions tests the needed permissions in a given connection
func Upsert ¶ added in v0.0.5
func Upsert(conn Connection, tx Transaction, sourceTable, targetTable string, pkFields []string) (count int64, err error)
Upsert upserts from source table into target table
Types ¶
type BaseConn ¶
type BaseConn struct { Connection URL string Type dbio.Type // the type of database for sqlx: postgres, mysql, sqlite Data iop.Dataset Log []string // contains filtered or unexported fields }
BaseConn is a database connection
func (*BaseConn) BeginContext ¶ added in v0.0.5
BeginContext starts a connection wide transaction
func (*BaseConn) BulkExportFlow ¶
BulkExportFlow creates a dataflow from a sql query
func (*BaseConn) BulkExportFlowCSV ¶
BulkExportFlowCSV creates a dataflow from a sql query, using CSVs
func (*BaseConn) BulkExportStream ¶
func (conn *BaseConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
BulkExportStream streams the rows in bulk
func (*BaseConn) BulkImportFlow ¶
BulkImportFlow imports the streams rows in bulk concurrently using channels
func (*BaseConn) BulkImportStream ¶
func (conn *BaseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream import the stream rows in bulk
func (*BaseConn) CastColumnForSelect ¶
CastColumnForSelect casts to the correct target column type
func (*BaseConn) CastColumnsForSelect ¶
CastColumnsForSelect cast the source columns into the target Column types
func (*BaseConn) CompareChecksums ¶
CompareChecksums compares the checksum values from the database side to the checkum values from the StreamProcessor
func (*BaseConn) ConnString ¶ added in v0.2.0
ConnString returns the connection string needed for connection
func (*BaseConn) CreateTable ¶ added in v0.0.5
CreateTable creates a new table based on provided columns `tableName` should have 'schema.table' format
func (*BaseConn) CreateTemporaryTable ¶ added in v0.0.5
CreateTemporaryTable creates a temp table based on provided columns
func (*BaseConn) CurrentDatabase ¶ added in v0.2.0
CurrentDatabase returns the name of the current database
func (*BaseConn) ExecContext ¶
func (conn *BaseConn) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecContext runs a sql query with context, returns `error`
func (*BaseConn) ExecMultiContext ¶ added in v0.0.5
func (conn *BaseConn) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecMultiContext runs multiple sql queries with context, returns `error`
func (*BaseConn) GenerateDDL ¶
func (conn *BaseConn) GenerateDDL(tableFName string, data iop.Dataset, temporary bool) (string, error)
GenerateDDL genrate a DDL based on a dataset
func (*BaseConn) GenerateInsertStatement ¶
func (conn *BaseConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
GenerateInsertStatement returns the proper INSERT statement
func (*BaseConn) GenerateUpsertExpressions ¶
func (conn *BaseConn) GenerateUpsertExpressions(srcTable string, tgtTable string, pkFields []string) (exprs map[string]string, err error)
GenerateUpsertExpressions returns a map with needed expressions
func (*BaseConn) GenerateUpsertSQL ¶ added in v0.0.5
func (conn *BaseConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL returns a sql for upsert
func (*BaseConn) GetAnalysis ¶ added in v0.0.5
func (conn *BaseConn) GetAnalysis(analysisName string, values map[string]interface{}) (sql string, err error)
GetAnalysis runs an analysis
func (*BaseConn) GetColumnStats ¶
func (conn *BaseConn) GetColumnStats(tableName string, fields ...string) (columns iop.Columns, err error)
GetColumnStats analyzes the table and returns the column statistics
func (*BaseConn) GetColumns ¶
func (*BaseConn) GetColumnsFull ¶
GetColumnsFull returns columns for given table. `tableName` should include schema and table, example: `schema1.table2` fields should be `schema_name|table_name|table_type|column_name|data_type|column_id`
func (*BaseConn) GetDatabases ¶ added in v0.2.0
GetDatabases returns databases for given connection
func (*BaseConn) GetGormConn ¶
GetGormConn returns the gorm db connection
func (*BaseConn) GetIndexes ¶
GetIndexes returns indexes for given table.
func (*BaseConn) GetNativeType ¶ added in v0.0.5
GetNativeType returns the native column type from generic
func (*BaseConn) GetObjects ¶
GetObjects returns objects (tables or views) for given schema `objectType` can be either 'table', 'view' or 'all'
func (*BaseConn) GetPrimaryKeys ¶
GetPrimaryKeys returns primark keys for given table.
func (*BaseConn) GetSQLColumns ¶
GetSQLColumns return columns from a sql query result
func (*BaseConn) GetSchemas ¶
GetSchemas returns schemas
func (*BaseConn) GetSchemata ¶ added in v0.1.0
GetSchemata obtain full schemata info for a schema and/or table in current database
func (*BaseConn) GetTableColumns ¶ added in v0.3.111
func (conn *BaseConn) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error)
GetColumns returns columns for given table. `tableFName` should include schema and table, example: `schema1.table2` fields should be `column_name|data_type`
func (*BaseConn) GetTemplateValue ¶
GetTemplateValue returns the value of the path
func (*BaseConn) InsertBatchStream ¶
func (conn *BaseConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*BaseConn) InsertStream ¶
InsertStream inserts a stream into a table
func (*BaseConn) LoadTemplates ¶
LoadTemplates loads the appropriate yaml template
func (*BaseConn) MustExec ¶
MustExec execs the query using e and panics if there was an error. Any placeholder parameters are replaced with supplied args.
func (*BaseConn) NewTransaction ¶ added in v0.0.5
func (conn *BaseConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)
NewTransaction creates a new transaction
func (*BaseConn) OptimizeTable ¶
OptimizeTable analyzes the table and alters the table with the columns data type based on its analysis result if table is missing, it is created with a new DDl Hole in this: will truncate data points, since it is based only on new data being inserted... would need a complete stats of the target table to properly optimize.
func (*BaseConn) ProcessTemplate ¶ added in v0.1.0
func (conn *BaseConn) ProcessTemplate(level, text string, values map[string]interface{}) (sql string, err error)
ProcessTemplate processes a template SQL text at a given level
func (*BaseConn) Query ¶
func (conn *BaseConn) Query(sql string, options ...map[string]interface{}) (data iop.Dataset, err error)
Query runs a sql query, returns `result`, `error`
func (*BaseConn) QueryContext ¶
func (conn *BaseConn) QueryContext(ctx context.Context, sql string, options ...map[string]interface{}) (iop.Dataset, error)
QueryContext runs a sql query with ctx, returns `result`, `error`
func (*BaseConn) RunAnalysis ¶
func (conn *BaseConn) RunAnalysis(analysisName string, values map[string]interface{}) (data iop.Dataset, err error)
RunAnalysis runs an analysis
func (*BaseConn) Self ¶
func (conn *BaseConn) Self() Connection
Self returns the respective connection Instance This is useful to refer back to a subclass method from the superclass level. (Aka overloading)
func (*BaseConn) StreamRecords ¶
StreamRecords the records of a sql query, returns `result`, `error`
func (*BaseConn) StreamRows ¶
func (conn *BaseConn) StreamRows(sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
StreamRows the rows of a sql query, returns `result`, `error`
func (*BaseConn) StreamRowsContext ¶
func (conn *BaseConn) StreamRowsContext(ctx context.Context, query string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
StreamRowsContext streams the rows of a sql query with context, returns `result`, `error`
func (*BaseConn) SumbitTemplate ¶ added in v0.1.0
func (*BaseConn) TableExists ¶
TableExists returns true if the table exists
func (*BaseConn) Upsert ¶
func (conn *BaseConn) Upsert(srcTable string, tgtTable string, primKeys []string) (rowAffCnt int64, err error)
Upsert inserts / updates from a srcTable into a target table. Assuming the srcTable has some or all of the tgtTable fields with matching types
func (*BaseConn) ValidateColumnNames ¶
func (conn *BaseConn) ValidateColumnNames(tgtColNames []string, colNames []string, quote bool) (newColNames []string, err error)
ValidateColumnNames verifies that source fields are present in the target table It will return quoted field names as `newColNames`, the same length as `colNames`
type BaseTransaction ¶ added in v0.3.82
type BaseTransaction struct { Tx *sqlx.Tx Conn Connection // contains filtered or unexported fields }
BaseTransaction is a database transaction
func (*BaseTransaction) Commit ¶ added in v0.3.82
func (t *BaseTransaction) Commit() (err error)
func (*BaseTransaction) Context ¶ added in v0.3.82
func (t *BaseTransaction) Context() *g.Context
Commit commits connection wide transaction
func (*BaseTransaction) DisableTrigger ¶ added in v0.3.82
func (t *BaseTransaction) DisableTrigger(tableName, triggerName string) (err error)
DisableTrigger disables a trigger
func (*BaseTransaction) EnableTrigger ¶ added in v0.3.82
func (t *BaseTransaction) EnableTrigger(tableName, triggerName string) (err error)
EnableTrigger enables a trigger
func (*BaseTransaction) Exec ¶ added in v0.3.82
func (t *BaseTransaction) Exec(sql string, args ...interface{}) (result sql.Result, err error)
Exec runs a sql query, returns `error`
func (*BaseTransaction) ExecContext ¶ added in v0.3.82
func (t *BaseTransaction) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecContext runs a sql query with context, returns `error`
func (*BaseTransaction) ExecMultiContext ¶ added in v0.3.82
func (t *BaseTransaction) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecMultiContext runs multiple sql queries with context, returns `error`
func (*BaseTransaction) InsertBatchStream ¶ added in v0.3.82
func (t *BaseTransaction) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*BaseTransaction) InsertStream ¶ added in v0.3.82
func (t *BaseTransaction) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertStream inserts a stream into a table
func (*BaseTransaction) Prepare ¶ added in v0.3.82
func (t *BaseTransaction) Prepare(query string) (stmt *sql.Stmt, err error)
Prepare prepares the statement
func (*BaseTransaction) QueryContext ¶ added in v0.3.82
func (t *BaseTransaction) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)
QueryContext queries rows
func (*BaseTransaction) Rollback ¶ added in v0.3.82
func (t *BaseTransaction) Rollback() (err error)
Rollback rolls back connection wide transaction
func (*BaseTransaction) Upsert ¶ added in v0.3.82
func (t *BaseTransaction) Upsert(sourceTable, targetTable string, pkFields []string) (count uint64, err error)
Upsert does an upsert from source table into target table
func (*BaseTransaction) UpsertStream ¶ added in v0.3.82
func (t *BaseTransaction) UpsertStream(tableFName string, ds *iop.Datastream, pk []string) (count uint64, err error)
UpsertStream inserts a stream into a table in batch
type BigQueryConn ¶
type BigQueryConn struct { BaseConn URL string Client *bigquery.Client ProjectID string DatasetID string Location string Datasets []string }
BigQueryConn is a Google Big Query connection
func (*BigQueryConn) BulkExportFlow ¶
func (conn *BigQueryConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
BulkExportFlow reads in bulk
func (*BigQueryConn) BulkImportFlow ¶
func (conn *BigQueryConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow inserts a flow of streams into a table. For redshift we need to create CSVs in S3 and then use the COPY command.
func (*BigQueryConn) BulkImportStream ¶
func (conn *BigQueryConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream demonstrates loading data into a BigQuery table using a file on the local filesystem.
func (*BigQueryConn) Connect ¶
func (conn *BigQueryConn) Connect(timeOut ...int) error
Connect connects to the database
func (*BigQueryConn) CopyFromGCS ¶
func (*BigQueryConn) CopyFromLocal ¶ added in v0.3.121
func (conn *BigQueryConn) CopyFromLocal(localURI string, tableFName string, dsColumns []iop.Column) error
CopyFromGCS into bigquery from google storage
func (*BigQueryConn) CopyToGCS ¶
func (conn *BigQueryConn) CopyToGCS(table Table, gcsURI string) error
func (*BigQueryConn) ExecContext ¶
func (conn *BigQueryConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
ExecContext runs a sql query with context, returns `error`
func (*BigQueryConn) ExportToGCS ¶ added in v0.3.122
func (conn *BigQueryConn) ExportToGCS(sql string, gcsURI string) error
CopyToGCS Copy table to gc storage
func (*BigQueryConn) GenerateUpsertSQL ¶ added in v0.0.5
func (conn *BigQueryConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*BigQueryConn) GetDatabases ¶ added in v0.2.9
func (conn *BigQueryConn) GetDatabases() (iop.Dataset, error)
GetDatabases returns databases
func (*BigQueryConn) GetSchemas ¶ added in v0.2.9
func (conn *BigQueryConn) GetSchemas() (iop.Dataset, error)
GetSchemas returns schemas
func (*BigQueryConn) GetSchemata ¶ added in v0.2.9
func (conn *BigQueryConn) GetSchemata(schemaName, tableName string) (Schemata, error)
GetSchemata obtain full schemata info for a schema and/or table in current database
func (*BigQueryConn) InsertBatchStream ¶
func (conn *BigQueryConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*BigQueryConn) InsertStream ¶
func (conn *BigQueryConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertStream demonstrates loading data into a BigQuery table using a file on the local filesystem.
func (*BigQueryConn) LoadCSVFromReader ¶ added in v0.3.120
func (conn *BigQueryConn) LoadCSVFromReader(tableFName string, reader io.Reader, dsColumns []iop.Column) error
LoadCSVFromReader demonstrates loading data into a BigQuery table using a file on the local filesystem. https://cloud.google.com/bigquery/docs/batch-loading-data#loading_data_from_local_files
func (*BigQueryConn) NewTransaction ¶ added in v0.3.82
func (conn *BigQueryConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
NewTransaction creates a new transaction
func (*BigQueryConn) StreamRowsContext ¶
func (conn *BigQueryConn) StreamRowsContext(ctx context.Context, sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
type BigTableAction ¶ added in v0.3.126
type BigTableAction string
const BTCreateColumnFamily BigTableAction = "create_column_family"
const BTCreateTable BigTableAction = "create_table"
const BTDeleteTable BigTableAction = "delete_table"
const BTTableInfo BigTableAction = "table_info"
type BigTableConn ¶ added in v0.3.126
type BigTableConn struct { BaseConn URL string Client *bigtable.Client ProjectID string InstanceID string Location string }
BigTableConn is a Google Big Query connection
func (*BigTableConn) BulkExportFlow ¶ added in v0.3.126
func (conn *BigTableConn) BulkExportFlow(tableNames ...string) (df *iop.Dataflow, err error)
func (*BigTableConn) Close ¶ added in v0.3.126
func (conn *BigTableConn) Close() error
Close closes the connection
func (*BigTableConn) Connect ¶ added in v0.3.126
func (conn *BigTableConn) Connect(timeOut ...int) error
Connect connects to the database
func (*BigTableConn) ExecContext ¶ added in v0.3.126
func (*BigTableConn) GetColumns ¶ added in v0.3.126
func (*BigTableConn) GetSQLColumns ¶ added in v0.3.126
func (conn *BigTableConn) GetSQLColumns(sqls ...string) (columns iop.Columns, err error)
GetTables returns tables for given schema
func (*BigTableConn) GetSchemata ¶ added in v0.3.128
func (conn *BigTableConn) GetSchemata(schemaName, tableName string) (schemata Schemata, err error)
func (*BigTableConn) GetTables ¶ added in v0.3.126
func (conn *BigTableConn) GetTables(schema string) (data iop.Dataset, err error)
func (*BigTableConn) GetViews ¶ added in v0.3.126
func (conn *BigTableConn) GetViews(schema string) (data iop.Dataset, err error)
GetTables returns tables for given schema
func (*BigTableConn) Init ¶ added in v0.3.126
func (conn *BigTableConn) Init() error
Init initiates the object
func (*BigTableConn) InsertBatchStream ¶ added in v0.3.126
func (conn *BigTableConn) InsertBatchStream(table string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*BigTableConn) NewTransaction ¶ added in v0.3.126
func (conn *BigTableConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
NewTransaction creates a new transaction
func (*BigTableConn) StreamRowsContext ¶ added in v0.3.126
func (conn *BigTableConn) StreamRowsContext(ctx context.Context, table string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
type BigTableQuery ¶ added in v0.3.126
type BigTableQuery struct { Action BigTableAction `json:"action"` Table string `json:"table"` ColumnFamilies []string `json:"column_family"` }
type BlankTransaction ¶ added in v0.3.82
type BlankTransaction struct { Conn Connection // contains filtered or unexported fields }
func (*BlankTransaction) Commit ¶ added in v0.3.82
func (t *BlankTransaction) Commit() (err error)
func (*BlankTransaction) Context ¶ added in v0.3.82
func (t *BlankTransaction) Context() *g.Context
func (*BlankTransaction) ExecContext ¶ added in v0.3.82
func (*BlankTransaction) ExecMultiContext ¶ added in v0.3.82
func (*BlankTransaction) Prepare ¶ added in v0.3.82
func (t *BlankTransaction) Prepare(query string) (stmt *sql.Stmt, err error)
func (*BlankTransaction) QueryContext ¶ added in v0.3.82
func (*BlankTransaction) Rollback ¶ added in v0.3.82
func (t *BlankTransaction) Rollback() (err error)
type ClickhouseConn ¶ added in v0.1.0
ClickhouseConn is a Clikchouse connection
func (*ClickhouseConn) BulkImportStream ¶ added in v0.3.118
func (conn *ClickhouseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table
func (*ClickhouseConn) GenerateInsertStatement ¶ added in v0.3.118
func (conn *ClickhouseConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
GenerateInsertStatement returns the proper INSERT statement
func (*ClickhouseConn) GenerateUpsertSQL ¶ added in v0.1.0
func (conn *ClickhouseConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*ClickhouseConn) Init ¶ added in v0.1.0
func (conn *ClickhouseConn) Init() error
Init initiates the object
func (*ClickhouseConn) NewTransaction ¶ added in v0.3.84
func (conn *ClickhouseConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)
NewTransaction creates a new transaction
type ColumnType ¶ added in v0.3.84
type Connection ¶
type Connection interface { BaseURL() string ConnString() string Begin(options ...*sql.TxOptions) error BeginContext(ctx context.Context, options ...*sql.TxOptions) error BulkExportFlow(sqls ...string) (*iop.Dataflow, error) BulkExportStream(sql string) (*iop.Datastream, error) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error) CastColumnForSelect(srcColumn iop.Column, tgtColumn iop.Column) string CastColumnsForSelect(srcColumns iop.Columns, tgtColumns iop.Columns) []string Close() error Commit() error CompareChecksums(tableName string, columns iop.Columns) (err error) Connect(timeOut ...int) error Context() *g.Context CreateTemporaryTable(tableName string, cols iop.Columns) (err error) CreateTable(tableName string, cols iop.Columns, tableDDL string) (err error) Db() *sqlx.DB DbX() *DbX DropTable(...string) error DropView(...string) error ExecMulti(sql string, args ...interface{}) (result sql.Result, err error) ExecMultiContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error) Exec(sql string, args ...interface{}) (result sql.Result, err error) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error) GenerateDDL(tableFName string, data iop.Dataset, temporary bool) (string, error) GenerateInsertStatement(tableName string, fields []string, numRows int) string GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error) GetColumns(tableFName string, fields ...string) (iop.Columns, error) GetColumnsFull(string) (iop.Dataset, error) GetColumnStats(tableName string, fields ...string) (columns iop.Columns, err error) GetCount(string) (uint64, error) GetDDL(string) (string, error) GetGormConn(config *gorm.Config) (*gorm.DB, error) GetIndexes(string) (iop.Dataset, error) GetNativeType(col iop.Column) (nativeType string, err error) GetPrimaryKeys(string) (iop.Dataset, error) GetProp(string) string GetSchemata(schemaName, tableName string) (Schemata, error) CurrentDatabase() (string, error) GetDatabases() (iop.Dataset, error) GetSchemas() (iop.Dataset, error) GetSQLColumns(sqls ...string) (columns iop.Columns, err error) GetTables(string) (iop.Dataset, error) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error) GetTemplateValue(path string) (value string) GetType() dbio.Type GetURL(newURL ...string) string GetViews(string) (iop.Dataset, error) Init() error InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error) Kill() error LoadTemplates() error MustExec(sql string, args ...interface{}) (result sql.Result) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error) OptimizeTable(table *Table, columns iop.Columns) (ok bool, err error) Prepare(query string) (stmt *sql.Stmt, err error) Props() map[string]string PropsArr() []string Query(sql string, options ...map[string]interface{}) (iop.Dataset, error) QueryContext(ctx context.Context, sql string, options ...map[string]interface{}) (iop.Dataset, error) Quote(field string) string RenameTable(table string, newTable string) (err error) Rollback() error ProcessTemplate(level, text string, values map[string]interface{}) (sql string, err error) RunAnalysis(string, map[string]interface{}) (iop.Dataset, error) GetAnalysis(string, map[string]interface{}) (string, error) Schemata() Schemata Self() Connection SetProp(string, string) StreamRecords(sql string) (<-chan map[string]interface{}, error) StreamRows(sql string, options ...map[string]interface{}) (*iop.Datastream, error) StreamRowsContext(ctx context.Context, sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error) SwapTable(srcTable string, tgtTable string) (err error) TableExists(tableFName string) (exists bool, err error) Template() Template SumbitTemplate(level string, templateMap map[string]string, name string, values map[string]interface{}) (data iop.Dataset, err error) Tx() Transaction Unquote(string) string Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error) ValidateColumnNames(tgtColName []string, colNames []string, quote bool) (newColNames []string, err error) Base() *BaseConn Info() ConnInfo // contains filtered or unexported methods }
Connection is the Base interface for Connections
func Clone ¶ added in v0.3.111
func Clone(conn Connection) (newConn Connection, err error)
func NewConn ¶
func NewConn(URL string, props ...string) (Connection, error)
NewConn return the most proper connection for a given database
func NewConnContext ¶
NewConnContext return the most proper connection for a given database with context props are provided as `"Prop1=Value1", "Prop2=Value2", ...`
type DataAnalyzer ¶ added in v0.3.65
type DataAnalyzer struct { Conn Connection Schemata Schemata ColumnMap map[string]iop.Column RelationMap map[string]map[string]map[string]Relation // table > column A > column B > relation Options DataAnalyzerOptions }
func NewDataAnalyzer ¶ added in v0.3.65
func NewDataAnalyzer(conn Connection, opts DataAnalyzerOptions) (da *DataAnalyzer, err error)
func (*DataAnalyzer) AnalyzeColumns ¶ added in v0.3.65
func (da *DataAnalyzer) AnalyzeColumns(sampleSize int) (err error)
func (*DataAnalyzer) GetManyToMany ¶ added in v0.3.65
func (da *DataAnalyzer) GetManyToMany(nonUniqueCols iop.Columns) (err error)
func (*DataAnalyzer) GetOneToMany ¶ added in v0.3.65
func (da *DataAnalyzer) GetOneToMany(uniqueCols, nonUniqueCols iop.Columns) (err error)
func (*DataAnalyzer) GetOneToOne ¶ added in v0.3.65
func (da *DataAnalyzer) GetOneToOne(uniqueCols iop.Columns) (err error)
func (*DataAnalyzer) GetSchemata ¶ added in v0.3.65
func (da *DataAnalyzer) GetSchemata(force bool) (err error)
func (*DataAnalyzer) ProcessRelations ¶ added in v0.3.65
func (da *DataAnalyzer) ProcessRelations() (err error)
type DataAnalyzerOptions ¶ added in v0.3.65
type DbX ¶ added in v0.0.5
type DbX struct {
// contains filtered or unexported fields
}
DbX is db express
type ManualTransaction ¶ added in v0.3.82
type ManualTransaction struct { Conn Connection // contains filtered or unexported fields }
func (*ManualTransaction) Commit ¶ added in v0.3.82
func (t *ManualTransaction) Commit() (err error)
func (*ManualTransaction) Context ¶ added in v0.3.82
func (t *ManualTransaction) Context() *g.Context
func (*ManualTransaction) ExecContext ¶ added in v0.3.82
func (*ManualTransaction) ExecMultiContext ¶ added in v0.3.82
func (*ManualTransaction) Prepare ¶ added in v0.3.82
func (t *ManualTransaction) Prepare(query string) (stmt *sql.Stmt, err error)
func (*ManualTransaction) QueryContext ¶ added in v0.3.82
func (*ManualTransaction) Rollback ¶ added in v0.3.82
func (t *ManualTransaction) Rollback() (err error)
type ModelDbX ¶ added in v0.0.5
type ModelDbX struct { Ptr interface{} `json:"-"` RowsAffected int `json:"-"` // contains filtered or unexported fields }
ModelDbX is the base for any SQL model
func (*ModelDbX) TableName ¶ added in v0.0.5
TableName returns the table name of the underlying pointer
type MsSQLServerConn ¶
MsSQLServerConn is a Microsoft SQL Server connection
func (*MsSQLServerConn) BcpExport ¶
func (conn *MsSQLServerConn) BcpExport() (err error)
BcpExport exports data to datastream
func (*MsSQLServerConn) BcpImportStream ¶
func (conn *MsSQLServerConn) BcpImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BcpImportStream Import using bcp tool https://docs.microsoft.com/en-us/sql/tools/bcp-utility?view=sql-server-ver15 bcp dbo.test1 in '/tmp/LargeDataset.csv' -S tcp:sqlserver.host,51433 -d master -U sa -P 'password' -c -t ',' -b 5000 Limitation: if comma or delimite is in field, it will error. need to use delimiter not in field, or do some other transformation
func (*MsSQLServerConn) BcpImportStreamParrallel ¶
func (conn *MsSQLServerConn) BcpImportStreamParrallel(tableFName string, ds *iop.Datastream) (count uint64, err error)
BcpImportStreamParrallel uses goroutine to import partitioned files
func (*MsSQLServerConn) BulkImportFlow ¶
func (conn *MsSQLServerConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow bulk import flow
func (*MsSQLServerConn) BulkImportStream ¶
func (conn *MsSQLServerConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*MsSQLServerConn) CopyFromAzure ¶
func (conn *MsSQLServerConn) CopyFromAzure(tableFName, azPath string) (count uint64, err error)
CopyFromAzure uses the COPY INTO Table command from Azure https://docs.microsoft.com/en-us/sql/t-sql/statements/copy-into-transact-sql?view=azure-sqldw-latest
func (*MsSQLServerConn) CopyViaAzure ¶
func (conn *MsSQLServerConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaAzure uses the Azure DWH COPY INTO Table command
func (*MsSQLServerConn) GenerateUpsertSQL ¶ added in v0.0.5
func (conn *MsSQLServerConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*MsSQLServerConn) GetURL ¶
func (conn *MsSQLServerConn) GetURL(newURL ...string) string
GetURL returns the processed URL
type MySQLConn ¶
MySQLConn is a Postgres connection
func (*MySQLConn) BulkExportStream ¶
func (conn *MySQLConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
BulkExportStream bulk Export
func (*MySQLConn) BulkImportStream ¶
func (conn *MySQLConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*MySQLConn) GenerateUpsertSQL ¶ added in v0.0.5
func (conn *MySQLConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
UPSERT https://vladmihalcea.com/how-do-upsert-and-merge-work-in-oracle-sql-server-postgresql-and-mysql/ GenerateUpsertSQL generates the upsert SQL
func (*MySQLConn) LoadDataInFile ¶
func (conn *MySQLConn) LoadDataInFile(tableFName string, ds *iop.Datastream) (count uint64, err error)
LoadDataInFile Bulk Import
func (*MySQLConn) LoadDataOutFile ¶
LoadDataOutFile Bulk Export Possible error: ERROR 1227 (42000) at line 1: Access denied; you need (at least one of) the FILE privilege(s) for this operation File privilege needs to be granted to user also the --secure-file-priv option needs to be set properly for it to work. https://stackoverflow.com/questions/9819271/why-is-mysql-innodb-insert-so-slow to improve innodb insert speed
type OracleConn ¶
OracleConn is a Postgres connection
func (*OracleConn) BulkImportStream ¶
func (conn *OracleConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*OracleConn) ExecMultiContext ¶ added in v0.3.120
func (conn *OracleConn) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecMultiContext runs multiple sql queries with context, returns `error`
func (*OracleConn) GenerateInsertStatement ¶
func (conn *OracleConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
GenerateInsertStatement returns the proper INSERT statement
func (*OracleConn) GenerateUpsertSQL ¶ added in v0.0.5
func (conn *OracleConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*OracleConn) SQLLoad ¶
func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count uint64, err error)
SQLLoad uses sqlldr to Bulk Import cat test1.csv | sqlldr system/oracle@oracle.host:1521/xe control=sqlldr.ctl log=/dev/stdout bad=/dev/stderr cannot import when newline in value. Need to scan for new lines.
type PostgresConn ¶
PostgresConn is a Postgres connection
func (*PostgresConn) BulkExportStream ¶
func (conn *PostgresConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
BulkExportStream uses the bulk dumping (COPY)
func (*PostgresConn) BulkImportStream ¶
func (conn *PostgresConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table
func (*PostgresConn) CastColumnForSelect ¶
func (conn *PostgresConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
CastColumnForSelect casts to the correct target column type
func (*PostgresConn) CopyToStdout ¶
func (conn *PostgresConn) CopyToStdout(sql string) (stdOutReader io.Reader, err error)
CopyToStdout Copy TO STDOUT
func (*PostgresConn) GenerateUpsertSQL ¶ added in v0.0.5
func (conn *PostgresConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
type RedshiftConn ¶
RedshiftConn is a Redshift connection
func (*RedshiftConn) BulkExportFlow ¶
func (conn *RedshiftConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
BulkExportFlow reads in bulk
func (*RedshiftConn) BulkExportStream ¶
func (conn *RedshiftConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
BulkExportStream reads in bulk
func (*RedshiftConn) BulkImportFlow ¶
func (conn *RedshiftConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow inserts a flow of streams into a table. For redshift we need to create CSVs in S3 and then use the COPY command.
func (*RedshiftConn) BulkImportStream ¶
func (conn *RedshiftConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table. For redshift we need to create CSVs in S3 and then use the COPY command.
func (*RedshiftConn) ConnString ¶ added in v0.2.0
func (conn *RedshiftConn) ConnString() string
func (*RedshiftConn) CopyFromS3 ¶
func (conn *RedshiftConn) CopyFromS3(tableFName, s3Path string) (count uint64, err error)
CopyFromS3 uses the COPY INTO Table command from AWS S3
func (*RedshiftConn) GenerateUpsertSQL ¶ added in v0.0.5
func (conn *RedshiftConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
type Result ¶ added in v0.0.5
type Result struct {
// contains filtered or unexported fields
}
func (Result) LastInsertId ¶ added in v0.0.5
func (Result) RowsAffected ¶ added in v0.0.5
type SQLiteConn ¶
SQLiteConn is a Google Big Query connection
func (*SQLiteConn) GenerateUpsertSQL ¶ added in v0.0.5
func (conn *SQLiteConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*SQLiteConn) GetURL ¶ added in v0.3.113
func (conn *SQLiteConn) GetURL(newURL ...string) string
GetURL returns the processed URL
type Schemata ¶
Schemata contains the full schema for a connection
func GetSchemataAll ¶ added in v0.2.0
func GetSchemataAll(conn Connection) (schemata Schemata, err error)
GetSchemataAll obtains the schemata for all databases detected
func (*Schemata) LoadTablesJSON ¶ added in v0.2.0
LoadTablesJSON loads from a json string
type SnowflakeConn ¶
SnowflakeConn is a Snowflake connection
func (*SnowflakeConn) BulkExportFlow ¶
func (conn *SnowflakeConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
BulkExportFlow reads in bulk
func (*SnowflakeConn) BulkImportFlow ¶
func (conn *SnowflakeConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow bulk import flow
func (*SnowflakeConn) BulkImportStream ¶
func (conn *SnowflakeConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*SnowflakeConn) ConnString ¶ added in v0.2.0
func (conn *SnowflakeConn) ConnString() string
func (*SnowflakeConn) Connect ¶
func (conn *SnowflakeConn) Connect(timeOut ...int) error
Connect connects to the database
func (*SnowflakeConn) CopyFromAzure ¶
func (conn *SnowflakeConn) CopyFromAzure(tableFName, azPath string) (err error)
CopyFromAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) CopyFromS3 ¶
func (conn *SnowflakeConn) CopyFromS3(tableFName, s3Path string) (err error)
CopyFromS3 uses the Snowflake COPY INTO Table command from AWS S3 https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) CopyToAzure ¶
func (conn *SnowflakeConn) CopyToAzure(sqls ...string) (azPath string, err error)
CopyToAzure exports a query to an Azure location
func (*SnowflakeConn) CopyToS3 ¶
func (conn *SnowflakeConn) CopyToS3(sqls ...string) (s3Path string, err error)
CopyToS3 exports a query to an S3 location
func (*SnowflakeConn) CopyViaAWS ¶
func (conn *SnowflakeConn) CopyViaAWS(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaAWS uses the Snowflake COPY INTO Table command from AWS S3 https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) CopyViaAzure ¶
func (conn *SnowflakeConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) CopyViaStage ¶
func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaStage uses the Snowflake COPY INTO Table command https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) GenerateUpsertSQL ¶ added in v0.0.5
func (conn *SnowflakeConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*SnowflakeConn) GetColumnsFull ¶ added in v0.1.8
func (conn *SnowflakeConn) GetColumnsFull(tableFName string) (data iop.Dataset, err error)
GetColumnsFull returns columns for given table. `tableName` should include schema and table, example: `schema1.table2` fields should be `schema_name|table_name|table_type|column_name|data_type|column_id`
func (*SnowflakeConn) GetDatabases ¶ added in v0.2.0
func (conn *SnowflakeConn) GetDatabases() (data iop.Dataset, err error)
GetDatabases returns the list of databases
func (*SnowflakeConn) GetFile ¶ added in v0.1.0
func (conn *SnowflakeConn) GetFile(internalStagePath, fPath string) (err error)
GetFile Copies from a staging location to a local file or folder
func (*SnowflakeConn) PutFile ¶
func (conn *SnowflakeConn) PutFile(fPath string, internalStagePath string) (err error)
PutFile Copies a local file or folder into a staging location
func (*SnowflakeConn) UnloadViaStage ¶ added in v0.1.0
func (conn *SnowflakeConn) UnloadViaStage(sqls ...string) (filePath string, err error)
type StatFieldSQL ¶ added in v0.3.65
type Table ¶
type Table struct { Name string `json:"name"` Schema string `json:"schema"` Database string `json:"database,omitempty"` IsView bool `json:"is_view,omitempty"` // whether is a view SQL string `json:"sql,omitempty"` Dialect dbio.Type `json:"dialect,omitempty"` Columns iop.Columns `json:"columns,omitempty"` }
Table represents a schemata table
func ParseTableName ¶ added in v0.3.111
type Template ¶
type Template struct { Core map[string]string Metadata map[string]string Analysis map[string]string Function map[string]string `yaml:"function"` GeneralTypeMap map[string]string `yaml:"general_type_map"` NativeTypeMap map[string]string `yaml:"native_type_map"` NativeStatsMap map[string]bool `yaml:"native_stat_map"` Variable map[string]string }
Template is a database YAML template
type Transaction ¶ added in v0.0.5
type Transaction interface { Context() *g.Context Commit() (err error) Rollback() (err error) Prepare(query string) (stmt *sql.Stmt, err error) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error) }
type WhereClause ¶ added in v0.0.5
type WhereClause []interface{}
WhereClause is the where clause
func (WhereClause) Args ¶ added in v0.0.5
func (wc WhereClause) Args() []interface{}
Args returns the where clause arguments
func (WhereClause) Clause ¶ added in v0.0.5
func (wc WhereClause) Clause() string
Clause returns the string where clause