Documentation
¶
Index ¶
- Constants
- Variables
- func ChangeColumnTypeViaAdd(conn Connection, table Table, col iop.Column) (err error)
- func CommonColumns(colNames1 []string, colNames2 []string) (commCols []string)
- func CopyFromAzure(conn Connection, tableFName, azPath string) (err error)
- func CopyFromS3(conn Connection, tableFName, s3Path string) (err error)
- func EnsureBinSQLite() (binPath string, err error)
- func GenerateAlterDDL(conn Connection, table Table, newColumns iop.Columns) (bool, error)
- func GetOptimizeTableStatements(conn Connection, table *Table, newColumns iop.Columns, isTemp bool) (ok bool, ddlParts []string, err error)
- func GetQualifierQuote(dialect dbio.Type) string
- func InsertBatchStream(conn Connection, tx Transaction, tableFName string, ds *iop.Datastream) (count uint64, err error)
- func InsertStream(conn Connection, tx *BaseTransaction, tableFName string, ds *iop.Datastream) (count uint64, err error)
- func NativeTypeToGeneral(name, dbType string, conn Connection) (colType iop.ColumnType)
- func PK(obj interface{}) (pk []string)
- func ParseColumnName(text string, dialect dbio.Type) (colName string, err error)
- func ParseSQLMultiStatements(sql string, Dialect ...dbio.Type) (sqls []string)
- func SQLColumns(colTypes []ColumnType, conn Connection) (columns iop.Columns)
- func SplitTableFullName(tableName string) (string, string)
- func TableExists(conn Connection, tableFName string) (exists bool, err error)
- func TestPermissions(conn Connection, tableName string) (err error)
- func UID(obj interface{}) string
- func Upsert(conn Connection, tx Transaction, sourceTable, targetTable string, ...) (count int64, err error)
- type BaseConn
- func (conn *BaseConn) AddMissingColumns(table Table, newCols iop.Columns) (ok bool, err error)
- func (conn *BaseConn) Base() *BaseConn
- func (conn *BaseConn) BaseURL() string
- func (conn *BaseConn) Begin(options ...*sql.TxOptions) (err error)
- func (conn *BaseConn) BeginContext(ctx context.Context, options ...*sql.TxOptions) (err error)
- func (conn *BaseConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
- func (conn *BaseConn) BulkExportFlowCSV(table Table) (df *iop.Dataflow, err error)
- func (conn *BaseConn) BulkExportStream(table Table) (ds *iop.Datastream, err error)
- func (conn *BaseConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *BaseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BaseConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) string
- func (conn *BaseConn) CastColumnsForSelect(srcColumns iop.Columns, tgtColumns iop.Columns) []string
- func (conn *BaseConn) Close() error
- func (conn *BaseConn) Commit() (err error)
- func (conn *BaseConn) CompareChecksums(tableName string, columns iop.Columns) (err error)
- func (conn *BaseConn) ConnString() string
- func (conn *BaseConn) Connect(timeOut ...int) (err error)
- func (conn *BaseConn) Context() *g.Context
- func (conn *BaseConn) CreateTable(tableName string, cols iop.Columns, tableDDL string) (err error)
- func (conn *BaseConn) CreateTemporaryTable(tableName string, cols iop.Columns) (err error)
- func (conn *BaseConn) CurrentDatabase() (dbName string, err error)
- func (conn *BaseConn) Db() *sqlx.DB
- func (conn *BaseConn) DbX() *DbX
- func (conn *BaseConn) DropTable(tableNames ...string) (err error)
- func (conn *BaseConn) DropView(viewNames ...string) (err error)
- func (conn *BaseConn) Exec(sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *BaseConn) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (conn *BaseConn) ExecMulti(sqls ...string) (result sql.Result, err error)
- func (conn *BaseConn) ExecMultiContext(ctx context.Context, qs ...string) (result sql.Result, err error)
- func (conn *BaseConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (string, error)
- func (conn *BaseConn) GenerateInsertStatement(tableName string, cols iop.Columns, numRows int) string
- func (conn *BaseConn) GenerateUpsertExpressions(srcTable string, tgtTable string, pkFields []string) (exprs map[string]string, err error)
- func (conn *BaseConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *BaseConn) GetAnalysis(analysisName string, values map[string]interface{}) (sql string, err error)
- func (conn *BaseConn) GetColumnStats(tableName string, fields ...string) (columns iop.Columns, err error)
- func (conn *BaseConn) GetColumns(tableFName string, fields ...string) (columns iop.Columns, err error)
- func (conn *BaseConn) GetColumnsFull(tableFName string) (iop.Dataset, error)
- func (conn *BaseConn) GetCount(tableFName string) (uint64, error)
- func (conn *BaseConn) GetDDL(tableFName string) (string, error)
- func (conn *BaseConn) GetDatabases() (iop.Dataset, error)
- func (conn *BaseConn) GetGormConn(config *gorm.Config) (*gorm.DB, error)
- func (conn *BaseConn) GetIndexes(tableFName string) (iop.Dataset, error)
- func (conn *BaseConn) GetNativeType(col iop.Column) (nativeType string, err error)
- func (conn *BaseConn) GetObjects(schema string, objectType string) (iop.Dataset, error)
- func (conn *BaseConn) GetPrimaryKeys(tableFName string) (iop.Dataset, error)
- func (conn *BaseConn) GetProp(key ...string) string
- func (conn *BaseConn) GetSQLColumns(table Table) (columns iop.Columns, err error)
- func (conn *BaseConn) GetSchemas() (iop.Dataset, error)
- func (conn *BaseConn) GetSchemata(level SchemataLevel, schemaName string, tableNames ...string) (Schemata, error)
- func (conn *BaseConn) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error)
- func (conn *BaseConn) GetTables(schema string) (iop.Dataset, error)
- func (conn *BaseConn) GetTablesAndViews(schema string) (iop.Dataset, error)
- func (conn *BaseConn) GetTemplateValue(path string) (value string)
- func (conn *BaseConn) GetType() dbio.Type
- func (conn *BaseConn) GetURL(newURL ...string) string
- func (conn *BaseConn) GetViews(schema string) (iop.Dataset, error)
- func (conn *BaseConn) Import(data iop.Dataset, tableName string) error
- func (conn *BaseConn) Info() (ci ConnInfo)
- func (conn *BaseConn) Init() (err error)
- func (conn *BaseConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BaseConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BaseConn) Kill() error
- func (conn *BaseConn) LoadTemplates() (err error)
- func (conn *BaseConn) LogSQL(query string, args ...any)
- func (conn *BaseConn) MustExec(sql string, args ...interface{}) (result sql.Result)
- func (conn *BaseConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)
- func (conn *BaseConn) OptimizeTable(table *Table, newColumns iop.Columns, isTemp ...bool) (ok bool, err error)
- func (conn *BaseConn) Prepare(query string) (stmt *sql.Stmt, err error)
- func (conn *BaseConn) ProcessTemplate(level, text string, values map[string]interface{}) (sql string, err error)
- func (conn *BaseConn) PropArr() []string
- func (conn *BaseConn) PropArrExclude(exclude ...string) []string
- func (conn *BaseConn) Props() map[string]string
- func (conn *BaseConn) Query(sql string, options ...map[string]interface{}) (data iop.Dataset, err error)
- func (conn *BaseConn) QueryContext(ctx context.Context, sql string, options ...map[string]interface{}) (data iop.Dataset, err error)
- func (conn *BaseConn) Quote(field string, normalize ...bool) string
- func (conn *BaseConn) ReplaceProps(newProps map[string]string)
- func (conn *BaseConn) Rollback() (err error)
- func (conn *BaseConn) RunAnalysis(analysisName string, values map[string]interface{}) (data iop.Dataset, err error)
- func (conn *BaseConn) Schemata() Schemata
- func (conn *BaseConn) Self() Connection
- func (conn *BaseConn) SetProp(key string, val string)
- func (conn *BaseConn) StreamRecords(sql string) (<-chan map[string]interface{}, error)
- func (conn *BaseConn) StreamRows(sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
- func (conn *BaseConn) StreamRowsContext(ctx context.Context, query string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
- func (conn *BaseConn) SubmitTemplate(level string, templateMap map[string]string, name string, ...) (data iop.Dataset, err error)
- func (conn *BaseConn) SwapTable(srcTable string, tgtTable string) (err error)
- func (conn *BaseConn) Template() dbio.Template
- func (conn *BaseConn) Tx() Transaction
- func (conn *BaseConn) Unquote(field string) string
- func (conn *BaseConn) Upsert(srcTable string, tgtTable string, primKeys []string) (rowAffCnt int64, err error)
- func (conn *BaseConn) ValidateColumnNames(tgtCols iop.Columns, colNames []string, quote bool) (newCols iop.Columns, err error)
- type BaseTransaction
- func (t *BaseTransaction) Commit() (err error)
- func (t *BaseTransaction) Connection() Connection
- func (t *BaseTransaction) Context() *g.Context
- func (t *BaseTransaction) DisableTrigger(tableName, triggerName string) (err error)
- func (t *BaseTransaction) EnableTrigger(tableName, triggerName string) (err error)
- func (t *BaseTransaction) Exec(sql string, args ...interface{}) (result sql.Result, err error)
- func (t *BaseTransaction) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *BaseTransaction) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *BaseTransaction) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (t *BaseTransaction) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (t *BaseTransaction) Prepare(query string) (stmt *sql.Stmt, err error)
- func (t *BaseTransaction) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)
- func (t *BaseTransaction) Rollback() (err error)
- func (t *BaseTransaction) Upsert(sourceTable, targetTable string, pkFields []string) (count uint64, err error)
- func (t *BaseTransaction) UpsertStream(tableFName string, ds *iop.Datastream, pk []string) (count uint64, err error)
- type BigQueryConn
- func (conn *BigQueryConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
- func (conn *BigQueryConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *BigQueryConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BigQueryConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
- func (conn *BigQueryConn) Close() error
- func (conn *BigQueryConn) Connect(timeOut ...int) error
- func (conn *BigQueryConn) CopyFromGCS(gcsURI string, table Table, dsColumns []iop.Column) error
- func (conn *BigQueryConn) CopyFromLocal(localURI string, table Table, dsColumns []iop.Column) error
- func (conn *BigQueryConn) CopyToGCS(table Table, gcsURI string) error
- func (conn *BigQueryConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *BigQueryConn) ExportToGCS(sql string, gcsURI string) error
- func (conn *BigQueryConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
- func (conn *BigQueryConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *BigQueryConn) GetDatabases() (iop.Dataset, error)
- func (conn *BigQueryConn) GetSchemas() (iop.Dataset, error)
- func (conn *BigQueryConn) GetSchemata(level SchemataLevel, schemaName string, tableNames ...string) (Schemata, error)
- func (conn *BigQueryConn) Init() error
- func (conn *BigQueryConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BigQueryConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BigQueryConn) LoadCSVFromReader(table Table, reader io.Reader, dsColumns []iop.Column) error
- func (conn *BigQueryConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
- func (conn *BigQueryConn) StreamRowsContext(ctx context.Context, sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
- func (conn *BigQueryConn) Unload(tables ...Table) (gsPath string, err error)
- type BigTableAction
- type BigTableConn
- func (conn *BigTableConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
- func (conn *BigTableConn) Close() error
- func (conn *BigTableConn) Connect(timeOut ...int) error
- func (conn *BigTableConn) ExecContext(ctx context.Context, payload string, args ...interface{}) (result sql.Result, err error)
- func (conn *BigTableConn) GetColumns(tableFName string, fields ...string) (columns iop.Columns, err error)
- func (conn *BigTableConn) GetColumnsFull(tableFName string) (iop.Dataset, error)
- func (conn *BigTableConn) GetSQLColumns(table Table) (columns iop.Columns, err error)
- func (conn *BigTableConn) GetSchemas() (iop.Dataset, error)
- func (conn *BigTableConn) GetSchemata(level SchemataLevel, schemaName string, tableNames ...string) (schemata Schemata, err error)
- func (conn *BigTableConn) GetTables(schema string) (data iop.Dataset, err error)
- func (conn *BigTableConn) GetViews(schema string) (data iop.Dataset, err error)
- func (conn *BigTableConn) Init() error
- func (conn *BigTableConn) InsertBatchStream(table string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BigTableConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
- func (conn *BigTableConn) StreamRowsContext(ctx context.Context, table string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
- type BigTableQuery
- type BlankTransaction
- func (t *BlankTransaction) Commit() (err error)
- func (t *BlankTransaction) Connection() Connection
- func (t *BlankTransaction) Context() *g.Context
- func (t *BlankTransaction) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *BlankTransaction) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *BlankTransaction) Prepare(query string) (stmt *sql.Stmt, err error)
- func (t *BlankTransaction) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)
- func (t *BlankTransaction) Rollback() (err error)
- type ClickhouseConn
- func (conn *ClickhouseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *ClickhouseConn) ConnString() string
- func (conn *ClickhouseConn) Connect(timeOut ...int) (err error)
- func (conn *ClickhouseConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (ddl string, err error)
- func (conn *ClickhouseConn) GenerateInsertStatement(tableName string, cols iop.Columns, numRows int) string
- func (conn *ClickhouseConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *ClickhouseConn) GetNativeType(col iop.Column) (nativeType string, err error)
- func (conn *ClickhouseConn) Init() error
- func (conn *ClickhouseConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)
- type ColumnType
- type ConnInfo
- type Connection
- type DataAnalyzer
- func (da *DataAnalyzer) AnalyzeColumns(sampleSize int, includeViews bool) (err error)
- func (da *DataAnalyzer) GetManyToMany(nonUniqueCols iop.Columns, asString bool) (err error)
- func (da *DataAnalyzer) GetOneToMany(uniqueCols, nonUniqueCols iop.Columns, asString bool) (err error)
- func (da *DataAnalyzer) GetOneToOne(uniqueCols iop.Columns, asString bool) (err error)
- func (da *DataAnalyzer) GetSchemata(force bool) (err error)
- func (da *DataAnalyzer) ProcessRelations() (err error)
- func (da *DataAnalyzer) ProcessRelationsInteger() (err error)
- func (da *DataAnalyzer) ProcessRelationsString() (err error)
- func (da *DataAnalyzer) WriteRelationsYaml(path string) (err error)
- type DataAnalyzerOptions
- type Database
- type DbX
- func (x *DbX) Delete(o interface{}) (cnt int, err error)
- func (x *DbX) Get(o interface{}, fields ...string) (err error)
- func (x *DbX) Insert(o interface{}, fields ...string) (err error)
- func (x *DbX) Select(o interface{}, fields ...string) (err error)
- func (x *DbX) TableName(o interface{}) (name string)
- func (x *DbX) Update(o interface{}, fields ...string) (cnt int, err error)
- func (x *DbX) Upsert(o interface{}, fields ...string) (cnt int, err error)
- func (x *DbX) Where(where ...interface{}) *DbX
- type DuckDbConn
- func (conn *DuckDbConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *DuckDbConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
- func (conn *DuckDbConn) Close() (err error)
- func (conn *DuckDbConn) Connect(timeOut ...int) (err error)
- func (conn *DuckDbConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *DuckDbConn) ExecMultiContext(ctx context.Context, sqls ...string) (result sql.Result, err error)
- func (conn *DuckDbConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *DuckDbConn) GetURL(newURL ...string) string
- func (conn *DuckDbConn) Init() error
- func (conn *DuckDbConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *DuckDbConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *DuckDbConn) StreamRowsContext(ctx context.Context, sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
- type ManualTransaction
- func (t *ManualTransaction) Commit() (err error)
- func (t *ManualTransaction) Context() *g.Context
- func (t *ManualTransaction) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *ManualTransaction) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *ManualTransaction) Prepare(query string) (stmt *sql.Stmt, err error)
- func (t *ManualTransaction) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)
- func (t *ManualTransaction) Rollback() (err error)
- type ModelDbX
- func (m *ModelDbX) Bind(bindFunc func(p interface{}) error, objPtr interface{}) (err error)
- func (m *ModelDbX) Delete(db *sqlx.DB) (err error)
- func (m *ModelDbX) Fields() (fields []string)
- func (m *ModelDbX) Get(db *sqlx.DB, fields ...string) (err error)
- func (m *ModelDbX) Insert(db *sqlx.DB, fields ...string) (err error)
- func (m *ModelDbX) Rec() map[string]interface{}
- func (m *ModelDbX) Select(db *sqlx.DB, objPtr interface{}, fields ...string) (err error)
- func (m *ModelDbX) TableName(objPtr interface{}) string
- func (m *ModelDbX) Update(db *sqlx.DB, fields ...string) (err error)
- func (m *ModelDbX) Values(fields []string) (values []interface{}, err error)
- func (m *ModelDbX) Where(where ...interface{}) *ModelDbX
- type MongoDBConn
- func (conn *MongoDBConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
- func (conn *MongoDBConn) Close() error
- func (conn *MongoDBConn) Connect(timeOut ...int) error
- func (conn *MongoDBConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *MongoDBConn) GetSchemas() (data iop.Dataset, err error)
- func (conn *MongoDBConn) GetSchemata(level SchemataLevel, schemaName string, tableNames ...string) (Schemata, error)
- func (conn *MongoDBConn) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error)
- func (conn *MongoDBConn) GetTables(schema string) (data iop.Dataset, err error)
- func (conn *MongoDBConn) Init() error
- func (conn *MongoDBConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
- func (conn *MongoDBConn) StreamRowsContext(ctx context.Context, collectionName string, Opts ...map[string]interface{}) (ds *iop.Datastream, err error)
- type MsSQLServerConn
- func (conn *MsSQLServerConn) BcpExport() (err error)
- func (conn *MsSQLServerConn) BcpImportFile(tableFName, filePath string) (count uint64, err error)
- func (conn *MsSQLServerConn) BcpImportFileParrallel(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MsSQLServerConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *MsSQLServerConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MsSQLServerConn) ConnString() string
- func (conn *MsSQLServerConn) Connect(timeOut ...int) (err error)
- func (conn *MsSQLServerConn) CopyFromAzure(tableFName, azPath string) (count uint64, err error)
- func (conn *MsSQLServerConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *MsSQLServerConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (string, error)
- func (conn *MsSQLServerConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *MsSQLServerConn) GetURL(newURL ...string) string
- func (conn *MsSQLServerConn) Init() error
- type MySQLConn
- func (conn *MySQLConn) BulkExportStream(table Table) (ds *iop.Datastream, err error)
- func (conn *MySQLConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MySQLConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (string, error)
- func (conn *MySQLConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *MySQLConn) GetURL(newURL ...string) string
- func (conn *MySQLConn) Init() error
- func (conn *MySQLConn) LoadDataInFile(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MySQLConn) LoadDataOutFile(ctx *g.Context, sql string) (stdOutReader io.Reader, err error)
- type OracleConn
- func (conn *OracleConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *OracleConn) ConnString() string
- func (conn *OracleConn) Connect(timeOut ...int) (err error)
- func (conn *OracleConn) ExecMultiContext(ctx context.Context, qs ...string) (result sql.Result, err error)
- func (conn *OracleConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (string, error)
- func (conn *OracleConn) GenerateInsertStatement(tableName string, cols iop.Columns, numRows int) string
- func (conn *OracleConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *OracleConn) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error)
- func (conn *OracleConn) Init() error
- func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *OracleConn) SubmitTemplate(level string, templateMap map[string]string, name string, ...) (data iop.Dataset, err error)
- func (conn *OracleConn) Version() int
- type Pool
- type PostgresConn
- func (conn *PostgresConn) BulkExportStream(table Table) (ds *iop.Datastream, err error)
- func (conn *PostgresConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *PostgresConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
- func (conn *PostgresConn) CopyToStdout(ctx *g.Context, sql string) (stdOutReader io.Reader, err error)
- func (conn *PostgresConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (ddl string, err error)
- func (conn *PostgresConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *PostgresConn) Init() error
- type PrometheusConn
- func (conn *PrometheusConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
- func (conn *PrometheusConn) Close() error
- func (conn *PrometheusConn) Connect(timeOut ...int) error
- func (conn *PrometheusConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *PrometheusConn) GetSQLColumns(table Table) (columns iop.Columns, err error)
- func (conn *PrometheusConn) GetSchemas() (data iop.Dataset, err error)
- func (conn *PrometheusConn) GetSchemata(level SchemataLevel, schemaName string, tableNames ...string) (Schemata, error)
- func (conn *PrometheusConn) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error)
- func (conn *PrometheusConn) GetTables(schema string) (data iop.Dataset, err error)
- func (conn *PrometheusConn) Init() error
- func (conn *PrometheusConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
- func (conn *PrometheusConn) StreamRowsContext(ctx context.Context, query string, Opts ...map[string]interface{}) (ds *iop.Datastream, err error)
- type ProtonConn
- func (conn *ProtonConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *ProtonConn) ConnString() string
- func (conn *ProtonConn) Connect(timeOut ...int) (err error)
- func (conn *ProtonConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
- func (conn *ProtonConn) GenerateInsertStatement(tableName string, cols iop.Columns, numRows int) string
- func (conn *ProtonConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *ProtonConn) GetCount(tableFName string) (uint64, error)
- func (conn *ProtonConn) Init() error
- func (conn *ProtonConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)
- type RedshiftConn
- func (conn *RedshiftConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
- func (conn *RedshiftConn) BulkExportStream(table Table) (ds *iop.Datastream, err error)
- func (conn *RedshiftConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *RedshiftConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *RedshiftConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
- func (conn *RedshiftConn) ConnString() string
- func (conn *RedshiftConn) CopyFromS3(tableFName, s3Path string, columns iop.Columns) (count uint64, err error)
- func (conn *RedshiftConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
- func (conn *RedshiftConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *RedshiftConn) Init() error
- func (conn *RedshiftConn) OptimizeTable(table *Table, newColumns iop.Columns, isTemp ...bool) (ok bool, err error)
- func (conn *RedshiftConn) Unload(ctx *g.Context, tables ...Table) (s3Path string, err error)
- func (conn *RedshiftConn) WarnStlLoadErrors(err error)
- type Relation
- type Result
- type SQLiteConn
- func (conn *SQLiteConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *SQLiteConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (string, error)
- func (conn *SQLiteConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *SQLiteConn) GetSchemata(level SchemataLevel, schemaName string, tableNames ...string) (Schemata, error)
- func (conn *SQLiteConn) GetURL(newURL ...string) string
- func (conn *SQLiteConn) Init() error
- type Schema
- type Schemata
- func (s *Schemata) Columns(filters ...string) map[string]iop.Column
- func (s *Schemata) Database() Database
- func (s *Schemata) Filtered(columnLevel bool, filters ...string) (ns Schemata)
- func (s *Schemata) LoadTablesJSON(payload string) error
- func (s *Schemata) Tables(filters ...string) map[string]Table
- type SchemataLevel
- type SnowflakeConn
- func (conn *SnowflakeConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
- func (conn *SnowflakeConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *SnowflakeConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
- func (conn *SnowflakeConn) ConnString() string
- func (conn *SnowflakeConn) Connect(timeOut ...int) error
- func (conn *SnowflakeConn) CopyFromAzure(tableFName, azPath string) (err error)
- func (conn *SnowflakeConn) CopyFromS3(tableFName, s3Path string) (err error)
- func (conn *SnowflakeConn) CopyToAzure(tables ...Table) (azPath string, err error)
- func (conn *SnowflakeConn) CopyToS3(tables ...Table) (s3Path string, err error)
- func (conn *SnowflakeConn) CopyViaAWS(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
- func (conn *SnowflakeConn) GenerateInsertStatement(tableName string, cols iop.Columns, numRows int) string
- func (conn *SnowflakeConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *SnowflakeConn) GetColumnsFull(tableFName string) (data iop.Dataset, err error)
- func (conn *SnowflakeConn) GetDatabases() (data iop.Dataset, err error)
- func (conn *SnowflakeConn) GetSchemas() (data iop.Dataset, err error)
- func (conn *SnowflakeConn) GetTables(schema string) (data iop.Dataset, err error)
- func (conn *SnowflakeConn) GetTablesAndViews(schema string) (iop.Dataset, error)
- func (conn *SnowflakeConn) GetViews(schema string) (data iop.Dataset, err error)
- func (conn *SnowflakeConn) Init() error
- func (conn *SnowflakeConn) StageGET(internalStagePath, folderPath string) (filePaths []string, err error)
- func (conn *SnowflakeConn) StagePUT(fileURI string, internalStagePath string) (err error)
- func (conn *SnowflakeConn) UnloadViaStage(tables ...Table) (filePath string, unloaded int64, err error)
- type StarRocksConn
- func (conn *StarRocksConn) AddMissingColumns(table Table, newCols iop.Columns) (ok bool, err error)
- func (conn *StarRocksConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *StarRocksConn) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (conn *StarRocksConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (string, error)
- func (conn *StarRocksConn) GetDatabases() (data iop.Dataset, err error)
- func (conn *StarRocksConn) GetURL(newURL ...string) string
- func (conn *StarRocksConn) Init() error
- func (conn *StarRocksConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *StarRocksConn) OptimizeTable(table *Table, newColumns iop.Columns, isTemp ...bool) (ok bool, err error)
- func (conn *StarRocksConn) StreamLoad(feURL, tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *StarRocksConn) WaitAlterTable(table Table) (err error)
- type StatFieldSQL
- type Table
- func (t *Table) AddPrimaryKeyToDDL(ddl string, columns iop.Columns) (string, error)
- func (t *Table) Clone() Table
- func (t *Table) ColumnsMap() map[string]iop.Column
- func (t *Table) DatabaseQ() string
- func (t *Table) FDQN() string
- func (t *Table) FullName() string
- func (t *Table) Indexes(columns iop.Columns) (indexes []TableIndex)
- func (t *Table) IsQuery() bool
- func (t *Table) MarshalJSON() ([]byte, error)
- func (t *Table) NameQ() string
- func (t *Table) SchemaQ() string
- func (t *Table) Select(limit, offset int, fields ...string) (sql string)
- func (t *Table) SetKeys(sourcePKCols []string, updateCol string, tableKeys TableKeys) error
- type TableIndex
- type TableKeys
- type Transaction
- type TrinoConn
- func (conn *TrinoConn) ConnString() string
- func (conn *TrinoConn) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (conn *TrinoConn) Init() error
- func (conn *TrinoConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
- type User
- type WhereClause
Constants ¶
const RelationManyToMany = "many_to_many"
const RelationManyToOne = "many_to_one"
const RelationOneToMany = "one_to_many"
const RelationOneToOne = "one_to_one"
Variables ¶
var Debug = false
Debug prints queries when true
var DuckDbFileCmd = map[string]*exec.Cmd{}
var DuckDbFileContext = map[string]*g.Context{} // so that collision doesn't happen
var DuckDbMux = sync.Mutex{}
var DuckDbUseTempFile = false
var InferDBStream = false
InferDBStream may need to be `true`, since precision and scale are not guaranteed. If `false`, will use the database stream source schema
var PartitionByColumn = func(conn Connection, table Table, c string, p int) ([]Table, error) { return []Table{table}, nil }
var PartitionByOffset = func(conn Connection, table Table, l int) ([]Table, error) { return []Table{table}, nil }
var SQLiteVersion = "3.41"
var (
	// UseBulkExportFlowCSV to use BulkExportFlowCSV
	UseBulkExportFlowCSV = false
)
Functions ¶
func ChangeColumnTypeViaAdd ¶
func ChangeColumnTypeViaAdd(conn Connection, table Table, col iop.Column) (err error)
ChangeColumnTypeViaAdd swaps a new column with the old in order to change the type. Need to use this with Snowflake when changing from date to string, or number to string.
func CommonColumns ¶
CommonColumns return common columns
func CopyFromAzure ¶
func CopyFromAzure(conn Connection, tableFName, azPath string) (err error)
CopyFromAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func CopyFromS3 ¶
func CopyFromS3(conn Connection, tableFName, s3Path string) (err error)
func EnsureBinSQLite ¶
EnsureBinSQLite ensures the sqlite binary exists. If missing, downloads and uses it.
func GenerateAlterDDL ¶
GenerateAlterDDL generate a DDL based on a dataset
func GetOptimizeTableStatements ¶ added in v1.1.8
func GetOptimizeTableStatements(conn Connection, table *Table, newColumns iop.Columns, isTemp bool) (ok bool, ddlParts []string, err error)
GetOptimizeTableStatements analyzes the table and alters the table with the columns data type based on its analysis result. If the table is missing, it is created with a new DDL. Hole in this: will truncate data points, since it is based only on new data being inserted... would need a complete stats of the target table to properly optimize.
func GetQualifierQuote ¶
func InsertBatchStream ¶
func InsertBatchStream(conn Connection, tx Transaction, tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func InsertStream ¶
func InsertStream(conn Connection, tx *BaseTransaction, tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertStream inserts a stream
func NativeTypeToGeneral ¶
func NativeTypeToGeneral(name, dbType string, conn Connection) (colType iop.ColumnType)
func ParseColumnName ¶
func ParseSQLMultiStatements ¶
ParseSQLMultiStatements splits a sql text into statements typically by a ';'
func SQLColumns ¶
func SQLColumns(colTypes []ColumnType, conn Connection) (columns iop.Columns)
SQLColumns returns the columns from database ColumnType
func SplitTableFullName ¶
SplitTableFullName returns the schema / table name
func TableExists ¶
func TableExists(conn Connection, tableFName string) (exists bool, err error)
TableExists returns true if the table exists
func TestPermissions ¶
func TestPermissions(conn Connection, tableName string) (err error)
TestPermissions tests the needed permissions in a given connection
func Upsert ¶
func Upsert(conn Connection, tx Transaction, sourceTable, targetTable string, pkFields []string) (count int64, err error)
Upsert upserts from source table into target table
Types ¶
type BaseConn ¶
type BaseConn struct { Connection URL string Type dbio.Type // the type of database for sqlx: postgres, mysql, sqlite Data iop.Dataset Log []string // contains filtered or unexported fields }
BaseConn is a database connection
func (*BaseConn) AddMissingColumns ¶ added in v1.1.8
func (*BaseConn) BeginContext ¶
BeginContext starts a connection wide transaction
func (*BaseConn) BulkExportFlow ¶
BulkExportFlow creates a dataflow from a sql query
func (*BaseConn) BulkExportFlowCSV ¶
BulkExportFlowCSV creates a dataflow from a sql query, using CSVs
func (*BaseConn) BulkExportStream ¶
func (conn *BaseConn) BulkExportStream(table Table) (ds *iop.Datastream, err error)
BulkExportStream streams the rows in bulk
func (*BaseConn) BulkImportFlow ¶
BulkImportFlow imports the streams rows in bulk concurrently using channels
func (*BaseConn) BulkImportStream ¶
func (conn *BaseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream import the stream rows in bulk
func (*BaseConn) CastColumnForSelect ¶
CastColumnForSelect casts to the correct target column type
func (*BaseConn) CastColumnsForSelect ¶
CastColumnsForSelect cast the source columns into the target Column types
func (*BaseConn) CompareChecksums ¶
CompareChecksums compares the checksum values from the database side to the checksum values from the StreamProcessor
func (*BaseConn) ConnString ¶
ConnString returns the connection string needed for connection
func (*BaseConn) CreateTable ¶
CreateTable creates a new table based on provided columns. `tableName` should have 'schema.table' format
func (*BaseConn) CreateTemporaryTable ¶
CreateTemporaryTable creates a temp table based on provided columns
func (*BaseConn) CurrentDatabase ¶
CurrentDatabase returns the name of the current database
func (*BaseConn) ExecContext ¶
func (conn *BaseConn) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecContext runs a sql query with context, returns `error`
func (*BaseConn) ExecMultiContext ¶
func (conn *BaseConn) ExecMultiContext(ctx context.Context, qs ...string) (result sql.Result, err error)
ExecMultiContext runs multiple sql queries with context, returns `error`
func (*BaseConn) GenerateDDL ¶
GenerateDDL generates a DDL based on a dataset
func (*BaseConn) GenerateInsertStatement ¶
func (conn *BaseConn) GenerateInsertStatement(tableName string, cols iop.Columns, numRows int) string
GenerateInsertStatement returns the proper INSERT statement
func (*BaseConn) GenerateUpsertExpressions ¶
func (conn *BaseConn) GenerateUpsertExpressions(srcTable string, tgtTable string, pkFields []string) (exprs map[string]string, err error)
GenerateUpsertExpressions returns a map with needed expressions
func (*BaseConn) GenerateUpsertSQL ¶
func (conn *BaseConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL returns a sql for upsert
func (*BaseConn) GetAnalysis ¶
func (conn *BaseConn) GetAnalysis(analysisName string, values map[string]interface{}) (sql string, err error)
GetAnalysis runs an analysis
func (*BaseConn) GetColumnStats ¶
func (conn *BaseConn) GetColumnStats(tableName string, fields ...string) (columns iop.Columns, err error)
GetColumnStats analyzes the table and returns the column statistics
func (*BaseConn) GetColumns ¶
func (*BaseConn) GetColumnsFull ¶
GetColumnsFull returns columns for given table. `tableName` should include schema and table, example: `schema1.table2`. Fields should be `schema_name|table_name|table_type|column_name|data_type|column_id`
func (*BaseConn) GetDatabases ¶
GetDatabases returns databases for given connection
func (*BaseConn) GetGormConn ¶
GetGormConn returns the gorm db connection
func (*BaseConn) GetIndexes ¶
GetIndexes returns indexes for given table.
func (*BaseConn) GetNativeType ¶
GetNativeType returns the native column type from generic
func (*BaseConn) GetObjects ¶
GetObjects returns objects (tables or views) for given schema. `objectType` can be either 'table', 'view' or 'all'
func (*BaseConn) GetPrimaryKeys ¶
GetPrimaryKeys returns primary keys for given table.
func (*BaseConn) GetSQLColumns ¶
GetSQLColumns return columns from a sql query result
func (*BaseConn) GetSchemas ¶
GetSchemas returns schemas
func (*BaseConn) GetSchemata ¶
func (conn *BaseConn) GetSchemata(level SchemataLevel, schemaName string, tableNames ...string) (Schemata, error)
GetSchemata obtain full schemata info for a schema and/or table in current database
func (*BaseConn) GetTableColumns ¶
func (conn *BaseConn) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error)
GetTableColumns returns columns for given table. `tableFName` should include schema and table, example: `schema1.table2`. Fields should be `column_name|data_type`
func (*BaseConn) GetTablesAndViews ¶ added in v1.2.19
GetTablesAndViews returns tables/views for given schema
func (*BaseConn) GetTemplateValue ¶
GetTemplateValue returns the value of the path
func (*BaseConn) InsertBatchStream ¶
func (conn *BaseConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*BaseConn) InsertStream ¶
InsertStream inserts a stream into a table
func (*BaseConn) LoadTemplates ¶
LoadTemplates loads the appropriate yaml template
func (*BaseConn) MustExec ¶
MustExec execs the query using e and panics if there was an error. Any placeholder parameters are replaced with supplied args.
func (*BaseConn) NewTransaction ¶
func (conn *BaseConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)
NewTransaction creates a new transaction
func (*BaseConn) OptimizeTable ¶
func (*BaseConn) ProcessTemplate ¶
func (conn *BaseConn) ProcessTemplate(level, text string, values map[string]interface{}) (sql string, err error)
ProcessTemplate processes a template SQL text at a given level
func (*BaseConn) PropArrExclude ¶ added in v1.2.14
func (*BaseConn) Query ¶
func (conn *BaseConn) Query(sql string, options ...map[string]interface{}) (data iop.Dataset, err error)
Query runs a sql query, returns `result`, `error`
func (*BaseConn) QueryContext ¶
func (conn *BaseConn) QueryContext(ctx context.Context, sql string, options ...map[string]interface{}) (data iop.Dataset, err error)
QueryContext runs a sql query with ctx, returns `result`, `error`
func (*BaseConn) ReplaceProps ¶ added in v1.2.16
ReplaceProps used when reusing a connection since the provided props can change, this is used to delete old original props and set new ones
func (*BaseConn) RunAnalysis ¶
func (conn *BaseConn) RunAnalysis(analysisName string, values map[string]interface{}) (data iop.Dataset, err error)
RunAnalysis runs an analysis
func (*BaseConn) Self ¶
func (conn *BaseConn) Self() Connection
Self returns the respective connection instance. This is useful to refer back to a subclass method from the superclass level (aka overloading).
func (*BaseConn) StreamRecords ¶
StreamRecords streams the records of a sql query, returns `result`, `error`
func (*BaseConn) StreamRows ¶
func (conn *BaseConn) StreamRows(sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
StreamRows streams the rows of a sql query, returns `result`, `error`
func (*BaseConn) StreamRowsContext ¶
func (conn *BaseConn) StreamRowsContext(ctx context.Context, query string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
StreamRowsContext streams the rows of a sql query with context, returns `result`, `error`
func (*BaseConn) SubmitTemplate ¶ added in v1.2.4
func (*BaseConn) Upsert ¶
func (conn *BaseConn) Upsert(srcTable string, tgtTable string, primKeys []string) (rowAffCnt int64, err error)
Upsert inserts / updates from a srcTable into a target table. Assuming the srcTable has some or all of the tgtTable fields with matching types
func (*BaseConn) ValidateColumnNames ¶
func (conn *BaseConn) ValidateColumnNames(tgtCols iop.Columns, colNames []string, quote bool) (newCols iop.Columns, err error)
ValidateColumnNames verifies that source fields are present in the target table. It will return quoted field names as `newColNames`, the same length as `colNames`
type BaseTransaction ¶
type BaseTransaction struct { Tx *sqlx.Tx Conn Connection // contains filtered or unexported fields }
BaseTransaction is a database transaction
func (*BaseTransaction) Commit ¶
func (t *BaseTransaction) Commit() (err error)
func (*BaseTransaction) Connection ¶
func (t *BaseTransaction) Connection() Connection
Connection return the connection
func (*BaseTransaction) Context ¶
func (t *BaseTransaction) Context() *g.Context
Context returns the context of the transaction
func (*BaseTransaction) DisableTrigger ¶
func (t *BaseTransaction) DisableTrigger(tableName, triggerName string) (err error)
DisableTrigger disables a trigger
func (*BaseTransaction) EnableTrigger ¶
func (t *BaseTransaction) EnableTrigger(tableName, triggerName string) (err error)
EnableTrigger enables a trigger
func (*BaseTransaction) Exec ¶
func (t *BaseTransaction) Exec(sql string, args ...interface{}) (result sql.Result, err error)
Exec runs a sql query, returns `error`
func (*BaseTransaction) ExecContext ¶
func (t *BaseTransaction) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecContext runs a sql query with context, returns `error`
func (*BaseTransaction) ExecMultiContext ¶
func (t *BaseTransaction) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecMultiContext runs multiple sql queries with context, returns `error`
func (*BaseTransaction) InsertBatchStream ¶
func (t *BaseTransaction) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*BaseTransaction) InsertStream ¶
func (t *BaseTransaction) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertStream inserts a stream into a table
func (*BaseTransaction) Prepare ¶
func (t *BaseTransaction) Prepare(query string) (stmt *sql.Stmt, err error)
Prepare prepares the statement
func (*BaseTransaction) QueryContext ¶
func (t *BaseTransaction) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)
QueryContext queries rows
func (*BaseTransaction) Rollback ¶
func (t *BaseTransaction) Rollback() (err error)
Rollback rolls back connection wide transaction
func (*BaseTransaction) Upsert ¶
func (t *BaseTransaction) Upsert(sourceTable, targetTable string, pkFields []string) (count uint64, err error)
Upsert does an upsert from source table into target table
func (*BaseTransaction) UpsertStream ¶
func (t *BaseTransaction) UpsertStream(tableFName string, ds *iop.Datastream, pk []string) (count uint64, err error)
UpsertStream inserts a stream into a table in batch
type BigQueryConn ¶
type BigQueryConn struct { BaseConn URL string Client *bigquery.Client ProjectID string DatasetID string Location string Datasets []string Mux sync.Mutex }
BigQueryConn is a Google Big Query connection
func (*BigQueryConn) BulkExportFlow ¶
func (conn *BigQueryConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
BulkExportFlow reads in bulk
func (*BigQueryConn) BulkImportFlow ¶
func (conn *BigQueryConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow inserts a flow of streams into a table. For redshift we need to create CSVs in GCS and then use the COPY command.
func (*BigQueryConn) BulkImportStream ¶
func (conn *BigQueryConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream demonstrates loading data into a BigQuery table using a file on the local filesystem.
func (*BigQueryConn) CastColumnForSelect ¶
func (conn *BigQueryConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
CastColumnForSelect casts to the correct target column type
func (*BigQueryConn) Connect ¶
func (conn *BigQueryConn) Connect(timeOut ...int) error
Connect connects to the database
func (*BigQueryConn) CopyFromGCS ¶
func (*BigQueryConn) CopyFromLocal ¶
CopyFromGCS into bigquery from google storage
func (*BigQueryConn) CopyToGCS ¶
func (conn *BigQueryConn) CopyToGCS(table Table, gcsURI string) error
func (*BigQueryConn) ExecContext ¶
func (*BigQueryConn) ExportToGCS ¶
func (conn *BigQueryConn) ExportToGCS(sql string, gcsURI string) error
CopyToGCS Copy table to gc storage
func (*BigQueryConn) GenerateDDL ¶ added in v1.1.4
func (conn *BigQueryConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
GenerateDDL generates a DDL based on a dataset
func (*BigQueryConn) GenerateUpsertSQL ¶
func (conn *BigQueryConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*BigQueryConn) GetDatabases ¶
func (conn *BigQueryConn) GetDatabases() (iop.Dataset, error)
GetDatabases returns databases
func (*BigQueryConn) GetSchemas ¶
func (conn *BigQueryConn) GetSchemas() (iop.Dataset, error)
GetSchemas returns schemas
func (*BigQueryConn) GetSchemata ¶
func (conn *BigQueryConn) GetSchemata(level SchemataLevel, schemaName string, tableNames ...string) (Schemata, error)
GetSchemata obtain full schemata info for a schema and/or table in current database
func (*BigQueryConn) InsertBatchStream ¶
func (conn *BigQueryConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*BigQueryConn) InsertStream ¶
func (conn *BigQueryConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertStream demonstrates loading data into a BigQuery table using a file on the local filesystem.
func (*BigQueryConn) LoadCSVFromReader ¶
func (conn *BigQueryConn) LoadCSVFromReader(table Table, reader io.Reader, dsColumns []iop.Column) error
LoadCSVFromReader demonstrates loading data into a BigQuery table using a file on the local filesystem. https://cloud.google.com/bigquery/docs/batch-loading-data#loading_data_from_local_files
func (*BigQueryConn) NewTransaction ¶
func (conn *BigQueryConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
NewTransaction creates a new transaction
func (*BigQueryConn) StreamRowsContext ¶
func (conn *BigQueryConn) StreamRowsContext(ctx context.Context, sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
type BigTableAction ¶
type BigTableAction string
const BTCreateColumnFamily BigTableAction = "create_column_family"
const BTCreateTable BigTableAction = "create_table"
const BTDeleteTable BigTableAction = "delete_table"
const BTTableInfo BigTableAction = "table_info"
type BigTableConn ¶
type BigTableConn struct { BaseConn URL string Client *bigtable.Client ProjectID string InstanceID string Location string }
BigTableConn is a Google Bigtable connection
func (*BigTableConn) BulkExportFlow ¶
func (conn *BigTableConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
func (*BigTableConn) Connect ¶
func (conn *BigTableConn) Connect(timeOut ...int) error
Connect connects to the database
func (*BigTableConn) ExecContext ¶
func (*BigTableConn) GetColumns ¶
func (*BigTableConn) GetColumnsFull ¶
func (conn *BigTableConn) GetColumnsFull(tableFName string) (iop.Dataset, error)
func (*BigTableConn) GetSQLColumns ¶
func (conn *BigTableConn) GetSQLColumns(table Table) (columns iop.Columns, err error)
GetTables returns tables for given schema
func (*BigTableConn) GetSchemas ¶
func (conn *BigTableConn) GetSchemas() (iop.Dataset, error)
func (*BigTableConn) GetSchemata ¶
func (conn *BigTableConn) GetSchemata(level SchemataLevel, schemaName string, tableNames ...string) (schemata Schemata, err error)
func (*BigTableConn) GetTables ¶
func (conn *BigTableConn) GetTables(schema string) (data iop.Dataset, err error)
func (*BigTableConn) GetViews ¶
func (conn *BigTableConn) GetViews(schema string) (data iop.Dataset, err error)
GetViews returns views for given schema
func (*BigTableConn) InsertBatchStream ¶
func (conn *BigTableConn) InsertBatchStream(table string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*BigTableConn) NewTransaction ¶
func (conn *BigTableConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
NewTransaction creates a new transaction
func (*BigTableConn) StreamRowsContext ¶
func (conn *BigTableConn) StreamRowsContext(ctx context.Context, table string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
type BigTableQuery ¶
type BigTableQuery struct { Action BigTableAction `json:"action"` Table string `json:"table"` ColumnFamilies []string `json:"column_family"` }
type BlankTransaction ¶
type BlankTransaction struct { Conn Connection // contains filtered or unexported fields }
func (*BlankTransaction) Commit ¶
func (t *BlankTransaction) Commit() (err error)
func (*BlankTransaction) Connection ¶
func (t *BlankTransaction) Connection() Connection
func (*BlankTransaction) Context ¶
func (t *BlankTransaction) Context() *g.Context
func (*BlankTransaction) ExecContext ¶
func (*BlankTransaction) ExecMultiContext ¶
func (*BlankTransaction) Prepare ¶
func (t *BlankTransaction) Prepare(query string) (stmt *sql.Stmt, err error)
func (*BlankTransaction) QueryContext ¶
func (*BlankTransaction) Rollback ¶
func (t *BlankTransaction) Rollback() (err error)
type ClickhouseConn ¶
ClickhouseConn is a ClickHouse connection
func (*ClickhouseConn) BulkImportStream ¶
func (conn *ClickhouseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table
func (*ClickhouseConn) ConnString ¶
func (conn *ClickhouseConn) ConnString() string
func (*ClickhouseConn) Connect ¶ added in v1.1.12
func (conn *ClickhouseConn) Connect(timeOut ...int) (err error)
func (*ClickhouseConn) GenerateDDL ¶ added in v1.1.4
func (conn *ClickhouseConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (ddl string, err error)
GenerateDDL generates a DDL based on a dataset
func (*ClickhouseConn) GenerateInsertStatement ¶
func (conn *ClickhouseConn) GenerateInsertStatement(tableName string, cols iop.Columns, numRows int) string
GenerateInsertStatement returns the proper INSERT statement
func (*ClickhouseConn) GenerateUpsertSQL ¶
func (conn *ClickhouseConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*ClickhouseConn) GetNativeType ¶ added in v1.2.15
func (conn *ClickhouseConn) GetNativeType(col iop.Column) (nativeType string, err error)
func (*ClickhouseConn) NewTransaction ¶
func (conn *ClickhouseConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)
NewTransaction creates a new transaction
type ColumnType ¶
type Connection ¶
type Connection interface { Base() *BaseConn BaseURL() string Begin(options ...*sql.TxOptions) error BeginContext(ctx context.Context, options ...*sql.TxOptions) error BulkExportFlow(table Table) (*iop.Dataflow, error) BulkExportStream(table Table) (*iop.Datastream, error) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error) CastColumnForSelect(srcColumn iop.Column, tgtColumn iop.Column) string CastColumnsForSelect(srcColumns iop.Columns, tgtColumns iop.Columns) []string Close() error Commit() error CompareChecksums(tableName string, columns iop.Columns) (err error) Connect(timeOut ...int) error ConnString() string Context() *g.Context CreateTable(tableName string, cols iop.Columns, tableDDL string) (err error) CreateTemporaryTable(tableName string, cols iop.Columns) (err error) CurrentDatabase() (string, error) Db() *sqlx.DB DbX() *DbX DropTable(...string) error DropView(...string) error Exec(sql string, args ...interface{}) (result sql.Result, err error) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error) ExecMulti(sqls ...string) (result sql.Result, err error) ExecMultiContext(ctx context.Context, sqls ...string) (result sql.Result, err error) GenerateDDL(table Table, data iop.Dataset, temporary bool) (string, error) GenerateInsertStatement(tableName string, cols iop.Columns, numRows int) string GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error) GetAnalysis(string, map[string]interface{}) (string, error) GetColumns(tableFName string, fields ...string) (iop.Columns, error) GetColumnsFull(string) (iop.Dataset, error) GetColumnStats(tableName string, fields ...string) (columns iop.Columns, err error) GetCount(string) (uint64, error) GetDatabases() (iop.Dataset, error) GetDDL(string) (string, error) GetGormConn(config *gorm.Config) (*gorm.DB, error) GetIndexes(string) 
(iop.Dataset, error) GetNativeType(col iop.Column) (nativeType string, err error) GetPrimaryKeys(string) (iop.Dataset, error) GetProp(...string) string GetSchemas() (iop.Dataset, error) GetSchemata(level SchemataLevel, schemaName string, tableNames ...string) (Schemata, error) GetSQLColumns(table Table) (columns iop.Columns, err error) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error) GetTablesAndViews(string) (iop.Dataset, error) GetTables(string) (iop.Dataset, error) GetTemplateValue(path string) (value string) GetType() dbio.Type GetURL(newURL ...string) string GetViews(string) (iop.Dataset, error) Info() ConnInfo Init() error InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error) Kill() error LoadTemplates() error MustExec(sql string, args ...interface{}) (result sql.Result) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error) OptimizeTable(table *Table, columns iop.Columns, isTemp ...bool) (ok bool, err error) Prepare(query string) (stmt *sql.Stmt, err error) ProcessTemplate(level, text string, values map[string]interface{}) (sql string, err error) Props() map[string]string PropsArr() []string Query(sql string, options ...map[string]interface{}) (iop.Dataset, error) QueryContext(ctx context.Context, sql string, options ...map[string]interface{}) (iop.Dataset, error) Quote(field string, normalize ...bool) string RenameTable(table string, newTable string) (err error) Rollback() error RunAnalysis(string, map[string]interface{}) (iop.Dataset, error) Schemata() Schemata Self() Connection SetProp(string, string) StreamRecords(sql string) (<-chan map[string]interface{}, error) StreamRows(sql string, options ...map[string]interface{}) (*iop.Datastream, error) StreamRowsContext(ctx context.Context, sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error) SubmitTemplate(level string, 
templateMap map[string]string, name string, values map[string]interface{}) (data iop.Dataset, err error) SwapTable(srcTable string, tgtTable string) (err error) Template() dbio.Template Tx() Transaction Unquote(string) string Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error) ValidateColumnNames(tgtCols iop.Columns, colNames []string, quote bool) (newCols iop.Columns, err error) AddMissingColumns(table Table, newCols iop.Columns) (ok bool, err error) }
Connection is the Base interface for Connections
func Clone ¶
func Clone(conn Connection) (newConn Connection, err error)
func NewConn ¶
func NewConn(URL string, props ...string) (Connection, error)
NewConn return the most proper connection for a given database
func NewConnContext ¶
NewConnContext returns the most proper connection for a given database with context. Props are provided as `"Prop1=Value1", "Prop2=Value2", ...`
type DataAnalyzer ¶
type DataAnalyzer struct { Conn Connection Schemata Schemata ColumnMap map[string]iop.Column RelationMap map[string]map[string]map[string]Relation // table > column A > column B > relation Options DataAnalyzerOptions }
func NewDataAnalyzer ¶
func NewDataAnalyzer(conn Connection, opts DataAnalyzerOptions) (da *DataAnalyzer, err error)
func (*DataAnalyzer) AnalyzeColumns ¶
func (da *DataAnalyzer) AnalyzeColumns(sampleSize int, includeViews bool) (err error)
func (*DataAnalyzer) GetManyToMany ¶
func (da *DataAnalyzer) GetManyToMany(nonUniqueCols iop.Columns, asString bool) (err error)
func (*DataAnalyzer) GetOneToMany ¶
func (da *DataAnalyzer) GetOneToMany(uniqueCols, nonUniqueCols iop.Columns, asString bool) (err error)
func (*DataAnalyzer) GetOneToOne ¶
func (da *DataAnalyzer) GetOneToOne(uniqueCols iop.Columns, asString bool) (err error)
func (*DataAnalyzer) GetSchemata ¶
func (da *DataAnalyzer) GetSchemata(force bool) (err error)
func (*DataAnalyzer) ProcessRelations ¶
func (da *DataAnalyzer) ProcessRelations() (err error)
func (*DataAnalyzer) ProcessRelationsInteger ¶
func (da *DataAnalyzer) ProcessRelationsInteger() (err error)
func (*DataAnalyzer) ProcessRelationsString ¶
func (da *DataAnalyzer) ProcessRelationsString() (err error)
func (*DataAnalyzer) WriteRelationsYaml ¶
func (da *DataAnalyzer) WriteRelationsYaml(path string) (err error)
type DataAnalyzerOptions ¶
type DbX ¶
type DbX struct {
// contains filtered or unexported fields
}
DbX is db express
type DuckDbConn ¶
DuckDbConn is a Duck DB connection
func (*DuckDbConn) BulkImportFlow ¶ added in v1.2.19
func (*DuckDbConn) CastColumnForSelect ¶ added in v1.2.10
func (conn *DuckDbConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
CastColumnForSelect casts to the correct target column type
func (*DuckDbConn) Close ¶
func (conn *DuckDbConn) Close() (err error)
func (*DuckDbConn) Connect ¶
func (conn *DuckDbConn) Connect(timeOut ...int) (err error)
func (*DuckDbConn) ExecContext ¶
func (*DuckDbConn) ExecMultiContext ¶
func (conn *DuckDbConn) ExecMultiContext(ctx context.Context, sqls ...string) (result sql.Result, err error)
ExecMultiContext runs multiple sql queries with context, returns `error`
func (*DuckDbConn) GenerateUpsertSQL ¶
func (conn *DuckDbConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*DuckDbConn) GetURL ¶
func (conn *DuckDbConn) GetURL(newURL ...string) string
GetURL returns the processed URL
func (*DuckDbConn) InsertBatchStream ¶
func (conn *DuckDbConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*DuckDbConn) InsertStream ¶
func (conn *DuckDbConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertStream demonstrates loading data into a BigQuery table using a file on the local filesystem.
func (*DuckDbConn) StreamRowsContext ¶
func (conn *DuckDbConn) StreamRowsContext(ctx context.Context, sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
type ManualTransaction ¶
type ManualTransaction struct { Conn Connection // contains filtered or unexported fields }
func (*ManualTransaction) Commit ¶
func (t *ManualTransaction) Commit() (err error)
func (*ManualTransaction) Context ¶
func (t *ManualTransaction) Context() *g.Context
func (*ManualTransaction) ExecContext ¶
func (*ManualTransaction) ExecMultiContext ¶
func (*ManualTransaction) Prepare ¶
func (t *ManualTransaction) Prepare(query string) (stmt *sql.Stmt, err error)
func (*ManualTransaction) QueryContext ¶
func (*ManualTransaction) Rollback ¶
func (t *ManualTransaction) Rollback() (err error)
type ModelDbX ¶
type ModelDbX struct { Ptr interface{} `json:"-"` RowsAffected int `json:"-"` // contains filtered or unexported fields }
ModelDbX is the base for any SQL model
type MongoDBConn ¶ added in v1.1.14
MongoDBConn is a Mongo connection
func (*MongoDBConn) BulkExportFlow ¶ added in v1.1.14
func (conn *MongoDBConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
func (*MongoDBConn) Close ¶ added in v1.1.14
func (conn *MongoDBConn) Close() error
func (*MongoDBConn) Connect ¶ added in v1.1.14
func (conn *MongoDBConn) Connect(timeOut ...int) error
Connect connects to the database
func (*MongoDBConn) ExecContext ¶ added in v1.1.14
func (*MongoDBConn) GetSchemas ¶ added in v1.1.14
func (conn *MongoDBConn) GetSchemas() (data iop.Dataset, err error)
GetSchemas returns schemas
func (*MongoDBConn) GetSchemata ¶ added in v1.1.15
func (conn *MongoDBConn) GetSchemata(level SchemataLevel, schemaName string, tableNames ...string) (Schemata, error)
GetSchemata obtains full schemata info for a schema and/or table in the current database
func (*MongoDBConn) GetTableColumns ¶ added in v1.1.14
func (conn *MongoDBConn) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error)
GetTableColumns returns the columns for the given table
func (*MongoDBConn) GetTables ¶ added in v1.1.14
func (conn *MongoDBConn) GetTables(schema string) (data iop.Dataset, err error)
GetTables returns tables
func (*MongoDBConn) Init ¶ added in v1.1.14
func (conn *MongoDBConn) Init() error
Init initiates the object
func (*MongoDBConn) NewTransaction ¶ added in v1.1.14
func (conn *MongoDBConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
NewTransaction creates a new transaction
func (*MongoDBConn) StreamRowsContext ¶ added in v1.1.14
func (conn *MongoDBConn) StreamRowsContext(ctx context.Context, collectionName string, Opts ...map[string]interface{}) (ds *iop.Datastream, err error)
type MsSQLServerConn ¶
MsSQLServerConn is a Microsoft SQL Server connection
func (*MsSQLServerConn) BcpExport ¶
func (conn *MsSQLServerConn) BcpExport() (err error)
BcpExport exports data to datastream
func (*MsSQLServerConn) BcpImportFile ¶
func (conn *MsSQLServerConn) BcpImportFile(tableFName, filePath string) (count uint64, err error)
BcpImportFile imports using the bcp tool: https://docs.microsoft.com/en-us/sql/tools/bcp-utility?view=sql-server-ver15 Example: bcp dbo.test1 in '/tmp/LargeDataset.csv' -S tcp:sqlserver.host,51433 -d master -U sa -P 'password' -c -t ',' -b 5000 Limitation: if a comma or the delimiter appears in a field, it will error. Need to use a delimiter that is not in any field, or do some other transformation
func (*MsSQLServerConn) BcpImportFileParrallel ¶
func (conn *MsSQLServerConn) BcpImportFileParrallel(tableFName string, ds *iop.Datastream) (count uint64, err error)
BcpImportFileParrallel uses goroutine to import partitioned files
func (*MsSQLServerConn) BulkImportFlow ¶
func (conn *MsSQLServerConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow bulk import flow
func (*MsSQLServerConn) BulkImportStream ¶
func (conn *MsSQLServerConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*MsSQLServerConn) ConnString ¶ added in v1.2.3
func (conn *MsSQLServerConn) ConnString() string
func (*MsSQLServerConn) Connect ¶ added in v1.2.3
func (conn *MsSQLServerConn) Connect(timeOut ...int) (err error)
func (*MsSQLServerConn) CopyFromAzure ¶
func (conn *MsSQLServerConn) CopyFromAzure(tableFName, azPath string) (count uint64, err error)
CopyFromAzure uses the COPY INTO Table command from Azure https://docs.microsoft.com/en-us/sql/t-sql/statements/copy-into-transact-sql?view=azure-sqldw-latest
func (*MsSQLServerConn) CopyViaAzure ¶
func (conn *MsSQLServerConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaAzure uses the Azure DWH COPY INTO Table command
func (*MsSQLServerConn) GenerateDDL ¶ added in v1.2.15
func (*MsSQLServerConn) GenerateUpsertSQL ¶
func (conn *MsSQLServerConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*MsSQLServerConn) GetURL ¶
func (conn *MsSQLServerConn) GetURL(newURL ...string) string
GetURL returns the processed URL
type MySQLConn ¶
MySQLConn is a MySQL or MariaDB connection
func (*MySQLConn) BulkExportStream ¶
func (conn *MySQLConn) BulkExportStream(table Table) (ds *iop.Datastream, err error)
BulkExportStream bulk Export
func (*MySQLConn) BulkImportStream ¶
func (conn *MySQLConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*MySQLConn) GenerateDDL ¶ added in v1.2.15
func (*MySQLConn) GenerateUpsertSQL ¶
func (conn *MySQLConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
UPSERT https://vladmihalcea.com/how-do-upsert-and-merge-work-in-oracle-sql-server-postgresql-and-mysql/ GenerateUpsertSQL generates the upsert SQL
func (*MySQLConn) LoadDataInFile ¶
func (conn *MySQLConn) LoadDataInFile(tableFName string, ds *iop.Datastream) (count uint64, err error)
LoadDataInFile Bulk Import
func (*MySQLConn) LoadDataOutFile ¶
func (conn *MySQLConn) LoadDataOutFile(ctx *g.Context, sql string) (stdOutReader io.Reader, err error)
LoadDataOutFile Bulk Export. Possible error: ERROR 1227 (42000) at line 1: Access denied; you need (at least one of) the FILE privilege(s) for this operation. The FILE privilege needs to be granted to the user; also, the --secure-file-priv option needs to be set properly for it to work. See https://stackoverflow.com/questions/9819271/why-is-mysql-innodb-insert-so-slow to improve InnoDB insert speed
type OracleConn ¶
OracleConn is an Oracle connection
func (*OracleConn) BulkImportStream ¶
func (conn *OracleConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*OracleConn) ConnString ¶ added in v1.1.14
func (conn *OracleConn) ConnString() string
func (*OracleConn) Connect ¶ added in v1.1.14
func (conn *OracleConn) Connect(timeOut ...int) (err error)
func (*OracleConn) ExecMultiContext ¶
func (conn *OracleConn) ExecMultiContext(ctx context.Context, qs ...string) (result sql.Result, err error)
ExecMultiContext runs multiple sql queries with context, returns `error`
func (*OracleConn) GenerateDDL ¶ added in v1.2.15
func (*OracleConn) GenerateInsertStatement ¶
func (conn *OracleConn) GenerateInsertStatement(tableName string, cols iop.Columns, numRows int) string
GenerateInsertStatement returns the proper INSERT statement
func (*OracleConn) GenerateUpsertSQL ¶
func (conn *OracleConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*OracleConn) GetTableColumns ¶ added in v1.1.8
func (*OracleConn) SQLLoad ¶
func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count uint64, err error)
SQLLoad uses sqlldr to bulk import. Example: cat test1.csv | sqlldr system/oracle@oracle.host:1521/xe control=sqlldr.ctl log=/dev/stdout bad=/dev/stderr Limitation: cannot import when a value contains a newline. Need to scan for new lines.
func (*OracleConn) SubmitTemplate ¶ added in v1.2.4
func (*OracleConn) Version ¶ added in v1.1.14
func (conn *OracleConn) Version() int
type PostgresConn ¶
PostgresConn is a Postgres connection
func (*PostgresConn) BulkExportStream ¶
func (conn *PostgresConn) BulkExportStream(table Table) (ds *iop.Datastream, err error)
BulkExportStream uses the bulk dumping (COPY)
func (*PostgresConn) BulkImportStream ¶
func (conn *PostgresConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table
func (*PostgresConn) CastColumnForSelect ¶
func (conn *PostgresConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
CastColumnForSelect casts to the correct target column type
func (*PostgresConn) CopyToStdout ¶
func (conn *PostgresConn) CopyToStdout(ctx *g.Context, sql string) (stdOutReader io.Reader, err error)
CopyToStdout Copy TO STDOUT
func (*PostgresConn) GenerateDDL ¶ added in v1.1.4
func (conn *PostgresConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (ddl string, err error)
GenerateDDL generates a DDL based on a dataset
func (*PostgresConn) GenerateUpsertSQL ¶
func (conn *PostgresConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
type PrometheusConn ¶ added in v1.2.2
PrometheusConn is a Prometheus connection
func (*PrometheusConn) BulkExportFlow ¶ added in v1.2.2
func (conn *PrometheusConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
func (*PrometheusConn) Close ¶ added in v1.2.2
func (conn *PrometheusConn) Close() error
func (*PrometheusConn) Connect ¶ added in v1.2.2
func (conn *PrometheusConn) Connect(timeOut ...int) error
Connect connects to the database
func (*PrometheusConn) ExecContext ¶ added in v1.2.2
func (*PrometheusConn) GetSQLColumns ¶ added in v1.2.2
func (conn *PrometheusConn) GetSQLColumns(table Table) (columns iop.Columns, err error)
func (*PrometheusConn) GetSchemas ¶ added in v1.2.2
func (conn *PrometheusConn) GetSchemas() (data iop.Dataset, err error)
GetSchemas returns schemas
func (*PrometheusConn) GetSchemata ¶ added in v1.2.2
func (conn *PrometheusConn) GetSchemata(level SchemataLevel, schemaName string, tableNames ...string) (Schemata, error)
GetSchemata obtains full schemata info for a schema and/or table in the current database
func (*PrometheusConn) GetTableColumns ¶ added in v1.2.2
func (conn *PrometheusConn) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error)
GetTableColumns returns the columns for the given table
func (*PrometheusConn) GetTables ¶ added in v1.2.2
func (conn *PrometheusConn) GetTables(schema string) (data iop.Dataset, err error)
GetTables returns tables
func (*PrometheusConn) Init ¶ added in v1.2.2
func (conn *PrometheusConn) Init() error
Init initiates the object
func (*PrometheusConn) NewTransaction ¶ added in v1.2.2
func (conn *PrometheusConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
NewTransaction creates a new transaction
func (*PrometheusConn) StreamRowsContext ¶ added in v1.2.2
func (conn *PrometheusConn) StreamRowsContext(ctx context.Context, query string, Opts ...map[string]interface{}) (ds *iop.Datastream, err error)
type ProtonConn ¶ added in v1.2.7
ProtonConn is a Proton connection
func (*ProtonConn) BulkImportStream ¶ added in v1.2.7
func (conn *ProtonConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table
func (*ProtonConn) ConnString ¶ added in v1.2.7
func (conn *ProtonConn) ConnString() string
func (*ProtonConn) Connect ¶ added in v1.2.7
func (conn *ProtonConn) Connect(timeOut ...int) (err error)
func (*ProtonConn) GenerateDDL ¶ added in v1.2.7
func (conn *ProtonConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
GenerateDDL generates a DDL based on a dataset
func (*ProtonConn) GenerateInsertStatement ¶ added in v1.2.7
func (conn *ProtonConn) GenerateInsertStatement(tableName string, cols iop.Columns, numRows int) string
GenerateInsertStatement returns the proper INSERT statement
func (*ProtonConn) GenerateUpsertSQL ¶ added in v1.2.7
func (conn *ProtonConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*ProtonConn) GetCount ¶ added in v1.2.14
func (conn *ProtonConn) GetCount(tableFName string) (uint64, error)
GetCount returns count of records
func (*ProtonConn) Init ¶ added in v1.2.7
func (conn *ProtonConn) Init() error
Init initiates the object
func (*ProtonConn) NewTransaction ¶ added in v1.2.7
func (conn *ProtonConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)
NewTransaction creates a new transaction
type RedshiftConn ¶
RedshiftConn is a Redshift connection
func (*RedshiftConn) BulkExportFlow ¶
func (conn *RedshiftConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
BulkExportFlow reads in bulk
func (*RedshiftConn) BulkExportStream ¶
func (conn *RedshiftConn) BulkExportStream(table Table) (ds *iop.Datastream, err error)
BulkExportStream reads in bulk
func (*RedshiftConn) BulkImportFlow ¶
func (conn *RedshiftConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow inserts a flow of streams into a table. For redshift we need to create CSVs in S3 and then use the COPY command.
func (*RedshiftConn) BulkImportStream ¶
func (conn *RedshiftConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table. For redshift we need to create CSVs in S3 and then use the COPY command.
func (*RedshiftConn) CastColumnForSelect ¶ added in v1.2.10
func (conn *RedshiftConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
CastColumnForSelect casts to the correct target column type
func (*RedshiftConn) ConnString ¶
func (conn *RedshiftConn) ConnString() string
func (*RedshiftConn) CopyFromS3 ¶
func (conn *RedshiftConn) CopyFromS3(tableFName, s3Path string, columns iop.Columns) (count uint64, err error)
CopyFromS3 uses the COPY INTO Table command from AWS S3
func (*RedshiftConn) GenerateDDL ¶ added in v1.1.4
func (conn *RedshiftConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
GenerateDDL generates a DDL based on a dataset
func (*RedshiftConn) GenerateUpsertSQL ¶
func (conn *RedshiftConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*RedshiftConn) OptimizeTable ¶ added in v1.1.9
func (*RedshiftConn) WarnStlLoadErrors ¶ added in v1.1.9
func (conn *RedshiftConn) WarnStlLoadErrors(err error)
type Result ¶
type Result struct {
// contains filtered or unexported fields
}
func (Result) LastInsertId ¶
func (Result) RowsAffected ¶
type SQLiteConn ¶
SQLiteConn is a SQLite connection
func (*SQLiteConn) BulkImportStream ¶
func (conn *SQLiteConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table
func (*SQLiteConn) GenerateDDL ¶ added in v1.2.15
func (*SQLiteConn) GenerateUpsertSQL ¶
func (conn *SQLiteConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*SQLiteConn) GetSchemata ¶
func (conn *SQLiteConn) GetSchemata(level SchemataLevel, schemaName string, tableNames ...string) (Schemata, error)
GetSchemata obtains full schemata info for a schema and/or table in the current database
func (*SQLiteConn) GetURL ¶
func (conn *SQLiteConn) GetURL(newURL ...string) string
GetURL returns the processed URL
type Schema ¶
type Schema struct { Name string `json:"name"` Database string `json:"database"` Tables map[string]Table `json:"tables"` }
Schema represents a schemata schema
type Schemata ¶
type Schemata struct { Databases map[string]Database `json:"databases"` // contains filtered or unexported fields }
Schemata contains the full schema for a connection
func GetSchemataAll ¶
func GetSchemataAll(conn Connection) (schemata Schemata, err error)
GetSchemataAll obtains the schemata for all databases detected
func GetTablesSchemata ¶
func GetTablesSchemata(conn Connection, tableNames ...string) (schemata Schemata, err error)
GetTablesSchemata obtains the schemata for specified tables
func (*Schemata) LoadTablesJSON ¶
LoadTablesJSON loads from a json string
type SchemataLevel ¶ added in v1.2.19
type SchemataLevel string
const ( SchemataLevelSchema SchemataLevel = "schema" SchemataLevelTable SchemataLevel = "table" SchemataLevelColumn SchemataLevel = "column" )
type SnowflakeConn ¶
SnowflakeConn is a Snowflake connection
func (*SnowflakeConn) BulkExportFlow ¶
func (conn *SnowflakeConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
BulkExportFlow reads in bulk
func (*SnowflakeConn) BulkImportFlow ¶
func (conn *SnowflakeConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow bulk import flow
func (*SnowflakeConn) BulkImportStream ¶
func (conn *SnowflakeConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*SnowflakeConn) CastColumnForSelect ¶ added in v1.1.15
func (conn *SnowflakeConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
CastColumnForSelect casts to the correct target column type
func (*SnowflakeConn) ConnString ¶
func (conn *SnowflakeConn) ConnString() string
func (*SnowflakeConn) Connect ¶
func (conn *SnowflakeConn) Connect(timeOut ...int) error
Connect connects to the database
func (*SnowflakeConn) CopyFromAzure ¶
func (conn *SnowflakeConn) CopyFromAzure(tableFName, azPath string) (err error)
CopyFromAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) CopyFromS3 ¶
func (conn *SnowflakeConn) CopyFromS3(tableFName, s3Path string) (err error)
CopyFromS3 uses the Snowflake COPY INTO Table command from AWS S3 https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) CopyToAzure ¶
func (conn *SnowflakeConn) CopyToAzure(tables ...Table) (azPath string, err error)
CopyToAzure exports a query to an Azure location
func (*SnowflakeConn) CopyToS3 ¶
func (conn *SnowflakeConn) CopyToS3(tables ...Table) (s3Path string, err error)
CopyToS3 exports a query to an S3 location
func (*SnowflakeConn) CopyViaAWS ¶
func (conn *SnowflakeConn) CopyViaAWS(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaAWS uses the Snowflake COPY INTO Table command from AWS S3 https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) CopyViaAzure ¶
func (conn *SnowflakeConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) CopyViaStage ¶
func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaStage uses the Snowflake COPY INTO Table command https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) GenerateDDL ¶ added in v1.1.4
func (conn *SnowflakeConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
GenerateDDL generates a DDL based on a dataset
func (*SnowflakeConn) GenerateInsertStatement ¶ added in v1.2.19
func (conn *SnowflakeConn) GenerateInsertStatement(tableName string, cols iop.Columns, numRows int) string
GenerateInsertStatement returns the proper INSERT statement
func (*SnowflakeConn) GenerateUpsertSQL ¶
func (conn *SnowflakeConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*SnowflakeConn) GetColumnsFull ¶
func (conn *SnowflakeConn) GetColumnsFull(tableFName string) (data iop.Dataset, err error)
GetColumnsFull returns columns for given table. `tableName` should include schema and table, example: `schema1.table2` fields should be `schema_name|table_name|table_type|column_name|data_type|column_id`
func (*SnowflakeConn) GetDatabases ¶
func (conn *SnowflakeConn) GetDatabases() (data iop.Dataset, err error)
GetDatabases returns the list of databases
func (*SnowflakeConn) GetSchemas ¶
func (conn *SnowflakeConn) GetSchemas() (data iop.Dataset, err error)
GetSchemas returns schemas
func (*SnowflakeConn) GetTables ¶
func (conn *SnowflakeConn) GetTables(schema string) (data iop.Dataset, err error)
GetTables returns tables
func (*SnowflakeConn) GetTablesAndViews ¶ added in v1.2.19
func (conn *SnowflakeConn) GetTablesAndViews(schema string) (iop.Dataset, error)
GetTablesAndViews returns tables/views for given schema
func (*SnowflakeConn) GetViews ¶
func (conn *SnowflakeConn) GetViews(schema string) (data iop.Dataset, err error)
GetViews returns views
func (*SnowflakeConn) StageGET ¶ added in v1.2.14
func (conn *SnowflakeConn) StageGET(internalStagePath, folderPath string) (filePaths []string, err error)
StageGET Copies from a staging location to a local file or folder
func (*SnowflakeConn) StagePUT ¶ added in v1.2.14
func (conn *SnowflakeConn) StagePUT(fileURI string, internalStagePath string) (err error)
StagePUT Copies a local file or folder into a staging location
func (*SnowflakeConn) UnloadViaStage ¶
func (conn *SnowflakeConn) UnloadViaStage(tables ...Table) (filePath string, unloaded int64, err error)
type StarRocksConn ¶
StarRocksConn is a StarRocks connection
func (*StarRocksConn) AddMissingColumns ¶ added in v1.1.8
func (*StarRocksConn) BulkImportFlow ¶
func (conn *StarRocksConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow inserts a flow of streams into a table.
func (*StarRocksConn) ExecContext ¶ added in v1.2.19
func (conn *StarRocksConn) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecContext runs a sql query with context, returns `error`
func (*StarRocksConn) GenerateDDL ¶
func (conn *StarRocksConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (string, error)
GenerateDDL generates a DDL based on a dataset
func (*StarRocksConn) GetDatabases ¶ added in v1.1.9
func (conn *StarRocksConn) GetDatabases() (data iop.Dataset, err error)
GetDatabases returns the list of databases
func (*StarRocksConn) GetURL ¶
func (conn *StarRocksConn) GetURL(newURL ...string) string
GetURL returns the processed URL
func (*StarRocksConn) InsertBatchStream ¶
func (conn *StarRocksConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*StarRocksConn) OptimizeTable ¶ added in v1.1.8
func (*StarRocksConn) StreamLoad ¶
func (conn *StarRocksConn) StreamLoad(feURL, tableFName string, df *iop.Dataflow) (count uint64, err error)
StreamLoad bulk loads https://docs.starrocks.io/docs/loading/StreamLoad/ https://docs.starrocks.io/docs/sql-reference/sql-statements/data-manipulation/STREAM_LOAD/
func (*StarRocksConn) WaitAlterTable ¶ added in v1.1.8
func (conn *StarRocksConn) WaitAlterTable(table Table) (err error)
type StatFieldSQL ¶
type Table ¶
type Table struct { Name string `json:"name"` Schema string `json:"schema"` Database string `json:"database,omitempty"` IsView bool `json:"is_view,omitempty"` // whether is a view SQL string `json:"sql,omitempty"` DDL string `json:"ddl,omitempty"` Dialect dbio.Type `json:"dialect,omitempty"` Columns iop.Columns `json:"columns,omitempty"` Keys TableKeys `json:"keys,omitempty"` Raw string `json:"raw"` // contains filtered or unexported fields }
Table represents a schemata table
func (*Table) AddPrimaryKeyToDDL ¶ added in v1.2.15
AddPrimaryKeyToDDL adds a primary key to the table
func (*Table) Indexes ¶ added in v1.2.15
func (t *Table) Indexes(columns iop.Columns) (indexes []TableIndex)
func (*Table) MarshalJSON ¶ added in v1.2.19
type TableIndex ¶ added in v1.2.15
func (*TableIndex) CreateDDL ¶ added in v1.2.15
func (ti *TableIndex) CreateDDL() string
func (*TableIndex) DropDDL ¶ added in v1.2.15
func (ti *TableIndex) DropDDL() string
type Transaction ¶
type Transaction interface { Connection() Connection Context() *g.Context Commit() (err error) Rollback() (err error) Prepare(query string) (stmt *sql.Stmt, err error) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error) }
type TrinoConn ¶ added in v1.1.14
TrinoConn is a Trino connection
func (*TrinoConn) ConnString ¶ added in v1.1.14
func (*TrinoConn) ExecContext ¶ added in v1.1.14
func (conn *TrinoConn) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecContext runs a sql query with context, returns `error`
func (*TrinoConn) NewTransaction ¶ added in v1.1.14
func (conn *TrinoConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
NewTransaction creates a new transaction
type WhereClause ¶
type WhereClause []interface{}
WhereClause is the where clause
func (WhereClause) Args ¶
func (wc WhereClause) Args() []interface{}
Args returns the where clause arguments
func (WhereClause) Clause ¶
func (wc WhereClause) Clause() string
Clause returns the string where clause
Source Files
¶
- analyzer.go
- database.go
- database_bigquery.go
- database_bigtable.go
- database_clickhouse.go
- database_duckdb.go
- database_duckdb_unix.go
- database_mongo.go
- database_mysql.go
- database_oracle.go
- database_postgres.go
- database_prometheus.go
- database_proton.go
- database_redshift.go
- database_snowflake.go
- database_sqlite.go
- database_sqlserver.go
- database_starrocks.go
- database_trino.go
- dbx.go
- schemata.go
- transaction.go