Index ¶
- Constants
- Variables
- func ChangeColumnTypeViaAdd(conn Connection, table Table, col iop.Column) (err error)
- func CleanSQL(conn Connection, sql string) string
- func CommonColumns(colNames1 []string, colNames2 []string) (commCols []string)
- func CopyFromAzure(conn Connection, tableFName, azPath string) (err error)
- func CopyFromS3(conn Connection, tableFName, s3Path string) (err error)
- func EnsureBinDuckDB(version string) (binPath string, err error)
- func EnsureBinSQLite() (binPath string, err error)
- func GenerateAlterDDL(conn Connection, table Table, newColumns iop.Columns) (bool, error)
- func GetOptimizeTableStatements(conn Connection, table *Table, newColumns iop.Columns, isTemp bool) (ok bool, ddlParts []string, err error)
- func GetQualifierQuote(dialect dbio.Type) string
- func HasVariedCase(text string) bool
- func InsertBatchStream(conn Connection, tx Transaction, tableFName string, ds *iop.Datastream) (count uint64, err error)
- func InsertStream(conn Connection, tx *BaseTransaction, tableFName string, ds *iop.Datastream) (count uint64, err error)
- func NativeTypeToGeneral(name, dbType string, conn Connection) (colType iop.ColumnType)
- func PK(obj interface{}) (pk []string)
- func ParseColumnName(text string, dialect dbio.Type) (colName string, err error)
- func ParseSQLMultiStatements(sql string, Dialect ...dbio.Type) (sqls g.Strings)
- func QuoteNames(dialect dbio.Type, names ...string) (newNames []string)
- func SQLColumns(colTypes []ColumnType, conn Connection) (columns iop.Columns)
- func SplitTableFullName(tableName string) (string, string)
- func TableExists(conn Connection, tableFName string) (exists bool, err error)
- func TestPermissions(conn Connection, tableName string) (err error)
- func UID(obj interface{}) string
- func Upsert(conn Connection, tx Transaction, sourceTable, targetTable string, ...) (count int64, err error)
- type BaseConn
- func (conn *BaseConn) AddMissingColumns(table Table, newCols iop.Columns) (ok bool, err error)
- func (conn *BaseConn) Base() *BaseConn
- func (conn *BaseConn) BaseURL() string
- func (conn *BaseConn) Begin(options ...*sql.TxOptions) (err error)
- func (conn *BaseConn) BeginContext(ctx context.Context, options ...*sql.TxOptions) (err error)
- func (conn *BaseConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
- func (conn *BaseConn) BulkExportFlowCSV(table Table) (df *iop.Dataflow, err error)
- func (conn *BaseConn) BulkExportStream(table Table) (ds *iop.Datastream, err error)
- func (conn *BaseConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *BaseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BaseConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) string
- func (conn *BaseConn) CastColumnsForSelect(srcColumns iop.Columns, tgtColumns iop.Columns) []string
- func (conn *BaseConn) Close() error
- func (conn *BaseConn) Commit() (err error)
- func (conn *BaseConn) CompareChecksums(tableName string, columns iop.Columns) (err error)
- func (conn *BaseConn) ConnString() string
- func (conn *BaseConn) Connect(timeOut ...int) (err error)
- func (conn *BaseConn) Context() *g.Context
- func (conn *BaseConn) CreateTable(tableName string, cols iop.Columns, tableDDL string) (err error)
- func (conn *BaseConn) CreateTemporaryTable(tableName string, cols iop.Columns) (err error)
- func (conn *BaseConn) CurrentDatabase() (dbName string, err error)
- func (conn *BaseConn) Db() *sqlx.DB
- func (conn *BaseConn) DbX() *DbX
- func (conn *BaseConn) DropTable(tableNames ...string) (err error)
- func (conn *BaseConn) DropView(viewNames ...string) (err error)
- func (conn *BaseConn) Exec(sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *BaseConn) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (conn *BaseConn) ExecMulti(sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *BaseConn) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (conn *BaseConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (string, error)
- func (conn *BaseConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
- func (conn *BaseConn) GenerateUpsertExpressions(srcTable string, tgtTable string, pkFields []string) (exprs map[string]string, err error)
- func (conn *BaseConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *BaseConn) GetAnalysis(analysisName string, values map[string]interface{}) (sql string, err error)
- func (conn *BaseConn) GetColumnStats(tableName string, fields ...string) (columns iop.Columns, err error)
- func (conn *BaseConn) GetColumns(tableFName string, fields ...string) (columns iop.Columns, err error)
- func (conn *BaseConn) GetColumnsFull(tableFName string) (iop.Dataset, error)
- func (conn *BaseConn) GetCount(tableFName string) (uint64, error)
- func (conn *BaseConn) GetDDL(tableFName string) (string, error)
- func (conn *BaseConn) GetDatabases() (iop.Dataset, error)
- func (conn *BaseConn) GetGormConn(config *gorm.Config) (*gorm.DB, error)
- func (conn *BaseConn) GetIndexes(tableFName string) (iop.Dataset, error)
- func (conn *BaseConn) GetNativeType(col iop.Column) (nativeType string, err error)
- func (conn *BaseConn) GetObjects(schema string, objectType string) (iop.Dataset, error)
- func (conn *BaseConn) GetPrimaryKeys(tableFName string) (iop.Dataset, error)
- func (conn *BaseConn) GetProp(key string) string
- func (conn *BaseConn) GetSQLColumns(table Table) (columns iop.Columns, err error)
- func (conn *BaseConn) GetSchemas() (iop.Dataset, error)
- func (conn *BaseConn) GetSchemata(schemaName string, tableNames ...string) (Schemata, error)
- func (conn *BaseConn) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error)
- func (conn *BaseConn) GetTables(schema string) (iop.Dataset, error)
- func (conn *BaseConn) GetTemplateValue(path string) (value string)
- func (conn *BaseConn) GetType() dbio.Type
- func (conn *BaseConn) GetURL(newURL ...string) string
- func (conn *BaseConn) GetViews(schema string) (iop.Dataset, error)
- func (conn *BaseConn) Import(data iop.Dataset, tableName string) error
- func (conn *BaseConn) Info() (ci ConnInfo)
- func (conn *BaseConn) Init() (err error)
- func (conn *BaseConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BaseConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BaseConn) Kill() error
- func (conn *BaseConn) LoadTemplates() (err error)
- func (conn *BaseConn) LogSQL(query string, args ...any)
- func (conn *BaseConn) MustExec(sql string, args ...interface{}) (result sql.Result)
- func (conn *BaseConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)
- func (conn *BaseConn) OptimizeTable(table *Table, newColumns iop.Columns, isTemp ...bool) (ok bool, err error)
- func (conn *BaseConn) Prepare(query string) (stmt *sql.Stmt, err error)
- func (conn *BaseConn) ProcessTemplate(level, text string, values map[string]interface{}) (sql string, err error)
- func (conn *BaseConn) PropArr() []string
- func (conn *BaseConn) PropArrExclude(exclude ...string) []string
- func (conn *BaseConn) Props() map[string]string
- func (conn *BaseConn) Query(sql string, options ...map[string]interface{}) (data iop.Dataset, err error)
- func (conn *BaseConn) QueryContext(ctx context.Context, sql string, options ...map[string]interface{}) (data iop.Dataset, err error)
- func (conn *BaseConn) Quote(field string, normalize ...bool) string
- func (conn *BaseConn) Rollback() (err error)
- func (conn *BaseConn) RunAnalysis(analysisName string, values map[string]interface{}) (data iop.Dataset, err error)
- func (conn *BaseConn) Schemata() Schemata
- func (conn *BaseConn) Self() Connection
- func (conn *BaseConn) SetProp(key string, val string)
- func (conn *BaseConn) StreamRecords(sql string) (<-chan map[string]interface{}, error)
- func (conn *BaseConn) StreamRows(sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
- func (conn *BaseConn) StreamRowsContext(ctx context.Context, query string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
- func (conn *BaseConn) SubmitTemplate(level string, templateMap map[string]string, name string, ...) (data iop.Dataset, err error)
- func (conn *BaseConn) SwapTable(srcTable string, tgtTable string) (err error)
- func (conn *BaseConn) Template() dbio.Template
- func (conn *BaseConn) Tx() Transaction
- func (conn *BaseConn) Unquote(field string) string
- func (conn *BaseConn) Upsert(srcTable string, tgtTable string, primKeys []string) (rowAffCnt int64, err error)
- func (conn *BaseConn) ValidateColumnNames(tgtColNames []string, colNames []string, quote bool) (newColNames []string, err error)
- type BaseTransaction
- func (t *BaseTransaction) Commit() (err error)
- func (t *BaseTransaction) Connection() Connection
- func (t *BaseTransaction) Context() *g.Context
- func (t *BaseTransaction) DisableTrigger(tableName, triggerName string) (err error)
- func (t *BaseTransaction) EnableTrigger(tableName, triggerName string) (err error)
- func (t *BaseTransaction) Exec(sql string, args ...interface{}) (result sql.Result, err error)
- func (t *BaseTransaction) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *BaseTransaction) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *BaseTransaction) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (t *BaseTransaction) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (t *BaseTransaction) Prepare(query string) (stmt *sql.Stmt, err error)
- func (t *BaseTransaction) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)
- func (t *BaseTransaction) Rollback() (err error)
- func (t *BaseTransaction) Upsert(sourceTable, targetTable string, pkFields []string) (count uint64, err error)
- func (t *BaseTransaction) UpsertStream(tableFName string, ds *iop.Datastream, pk []string) (count uint64, err error)
- type BigQueryConn
- func (conn *BigQueryConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
- func (conn *BigQueryConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *BigQueryConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BigQueryConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
- func (conn *BigQueryConn) Close() error
- func (conn *BigQueryConn) Connect(timeOut ...int) error
- func (conn *BigQueryConn) CopyFromGCS(gcsURI string, table Table, dsColumns []iop.Column) error
- func (conn *BigQueryConn) CopyFromLocal(localURI string, table Table, dsColumns []iop.Column) error
- func (conn *BigQueryConn) CopyToGCS(table Table, gcsURI string) error
- func (conn *BigQueryConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *BigQueryConn) ExportToGCS(sql string, gcsURI string) error
- func (conn *BigQueryConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
- func (conn *BigQueryConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *BigQueryConn) GetDatabases() (iop.Dataset, error)
- func (conn *BigQueryConn) GetSchemas() (iop.Dataset, error)
- func (conn *BigQueryConn) GetSchemata(schemaName string, tableNames ...string) (Schemata, error)
- func (conn *BigQueryConn) Init() error
- func (conn *BigQueryConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BigQueryConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BigQueryConn) LoadCSVFromReader(table Table, reader io.Reader, dsColumns []iop.Column) error
- func (conn *BigQueryConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
- func (conn *BigQueryConn) StreamRowsContext(ctx context.Context, sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
- func (conn *BigQueryConn) Unload(tables ...Table) (gsPath string, err error)
- type BigTableAction
- type BigTableConn
- func (conn *BigTableConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
- func (conn *BigTableConn) Close() error
- func (conn *BigTableConn) Connect(timeOut ...int) error
- func (conn *BigTableConn) ExecContext(ctx context.Context, payload string, args ...interface{}) (result sql.Result, err error)
- func (conn *BigTableConn) GetColumns(tableFName string, fields ...string) (columns iop.Columns, err error)
- func (conn *BigTableConn) GetColumnsFull(tableFName string) (iop.Dataset, error)
- func (conn *BigTableConn) GetSQLColumns(table Table) (columns iop.Columns, err error)
- func (conn *BigTableConn) GetSchemas() (iop.Dataset, error)
- func (conn *BigTableConn) GetSchemata(schemaName string, tableNames ...string) (schemata Schemata, err error)
- func (conn *BigTableConn) GetTables(schema string) (data iop.Dataset, err error)
- func (conn *BigTableConn) GetViews(schema string) (data iop.Dataset, err error)
- func (conn *BigTableConn) Init() error
- func (conn *BigTableConn) InsertBatchStream(table string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BigTableConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
- func (conn *BigTableConn) StreamRowsContext(ctx context.Context, table string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
- type BigTableQuery
- type BlankTransaction
- func (t *BlankTransaction) Commit() (err error)
- func (t *BlankTransaction) Connection() Connection
- func (t *BlankTransaction) Context() *g.Context
- func (t *BlankTransaction) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *BlankTransaction) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *BlankTransaction) Prepare(query string) (stmt *sql.Stmt, err error)
- func (t *BlankTransaction) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)
- func (t *BlankTransaction) Rollback() (err error)
- type ClickhouseConn
- func (conn *ClickhouseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *ClickhouseConn) ConnString() string
- func (conn *ClickhouseConn) Connect(timeOut ...int) (err error)
- func (conn *ClickhouseConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
- func (conn *ClickhouseConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
- func (conn *ClickhouseConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *ClickhouseConn) Init() error
- func (conn *ClickhouseConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)
- type ColumnType
- type ConnInfo
- type Connection
- type DataAnalyzer
- func (da *DataAnalyzer) AnalyzeColumns(sampleSize int, includeViews bool) (err error)
- func (da *DataAnalyzer) GetManyToMany(nonUniqueCols iop.Columns, asString bool) (err error)
- func (da *DataAnalyzer) GetOneToMany(uniqueCols, nonUniqueCols iop.Columns, asString bool) (err error)
- func (da *DataAnalyzer) GetOneToOne(uniqueCols iop.Columns, asString bool) (err error)
- func (da *DataAnalyzer) GetSchemata(force bool) (err error)
- func (da *DataAnalyzer) ProcessRelations() (err error)
- func (da *DataAnalyzer) ProcessRelationsInteger() (err error)
- func (da *DataAnalyzer) ProcessRelationsString() (err error)
- func (da *DataAnalyzer) WriteRelationsYaml(path string) (err error)
- type DataAnalyzerOptions
- type Database
- type DbX
- func (x *DbX) Delete(o interface{}) (cnt int, err error)
- func (x *DbX) Get(o interface{}, fields ...string) (err error)
- func (x *DbX) Insert(o interface{}, fields ...string) (err error)
- func (x *DbX) Select(o interface{}, fields ...string) (err error)
- func (x *DbX) TableName(o interface{}) (name string)
- func (x *DbX) Update(o interface{}, fields ...string) (cnt int, err error)
- func (x *DbX) Upsert(o interface{}, fields ...string) (cnt int, err error)
- func (x *DbX) Where(where ...interface{}) *DbX
- type DuckDbConn
- func (conn *DuckDbConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *DuckDbConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
- func (conn *DuckDbConn) Close() error
- func (conn *DuckDbConn) Connect(timeOut ...int) (err error)
- func (conn *DuckDbConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *DuckDbConn) ExecMultiContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *DuckDbConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *DuckDbConn) GetURL(newURL ...string) string
- func (conn *DuckDbConn) Init() error
- func (conn *DuckDbConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *DuckDbConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *DuckDbConn) StreamRowsContext(ctx context.Context, sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
- type ManualTransaction
- func (t *ManualTransaction) Commit() (err error)
- func (t *ManualTransaction) Context() *g.Context
- func (t *ManualTransaction) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *ManualTransaction) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *ManualTransaction) Prepare(query string) (stmt *sql.Stmt, err error)
- func (t *ManualTransaction) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)
- func (t *ManualTransaction) Rollback() (err error)
- type ModelDbX
- func (m *ModelDbX) Bind(bindFunc func(p interface{}) error, objPtr interface{}) (err error)
- func (m *ModelDbX) Delete(db *sqlx.DB) (err error)
- func (m *ModelDbX) Fields() (fields []string)
- func (m *ModelDbX) Get(db *sqlx.DB, fields ...string) (err error)
- func (m *ModelDbX) Insert(db *sqlx.DB, fields ...string) (err error)
- func (m *ModelDbX) Rec() map[string]interface{}
- func (m *ModelDbX) Select(db *sqlx.DB, objPtr interface{}, fields ...string) (err error)
- func (m *ModelDbX) TableName(objPtr interface{}) string
- func (m *ModelDbX) Update(db *sqlx.DB, fields ...string) (err error)
- func (m *ModelDbX) Values(fields []string) (values []interface{}, err error)
- func (m *ModelDbX) Where(where ...interface{}) *ModelDbX
- type MongoDBConn
- func (conn *MongoDBConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
- func (conn *MongoDBConn) Close() error
- func (conn *MongoDBConn) Connect(timeOut ...int) error
- func (conn *MongoDBConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *MongoDBConn) GetSchemas() (data iop.Dataset, err error)
- func (conn *MongoDBConn) GetSchemata(schemaName string, tableNames ...string) (Schemata, error)
- func (conn *MongoDBConn) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error)
- func (conn *MongoDBConn) GetTables(schema string) (data iop.Dataset, err error)
- func (conn *MongoDBConn) Init() error
- func (conn *MongoDBConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
- func (conn *MongoDBConn) StreamRowsContext(ctx context.Context, collectionName string, Opts ...map[string]interface{}) (ds *iop.Datastream, err error)
- type MsSQLServerConn
- func (conn *MsSQLServerConn) BcpExport() (err error)
- func (conn *MsSQLServerConn) BcpImportFile(tableFName, filePath string) (count uint64, err error)
- func (conn *MsSQLServerConn) BcpImportFileParrallel(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MsSQLServerConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *MsSQLServerConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MsSQLServerConn) ConnString() string
- func (conn *MsSQLServerConn) Connect(timeOut ...int) (err error)
- func (conn *MsSQLServerConn) CopyFromAzure(tableFName, azPath string) (count uint64, err error)
- func (conn *MsSQLServerConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *MsSQLServerConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *MsSQLServerConn) GetURL(newURL ...string) string
- func (conn *MsSQLServerConn) Init() error
- type MySQLConn
- func (conn *MySQLConn) BulkExportStream(table Table) (ds *iop.Datastream, err error)
- func (conn *MySQLConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MySQLConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *MySQLConn) GetURL(newURL ...string) string
- func (conn *MySQLConn) Init() error
- func (conn *MySQLConn) LoadDataInFile(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MySQLConn) LoadDataOutFile(ctx *g.Context, sql string) (stdOutReader io.Reader, err error)
- type OracleConn
- func (conn *OracleConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *OracleConn) ConnString() string
- func (conn *OracleConn) Connect(timeOut ...int) (err error)
- func (conn *OracleConn) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (conn *OracleConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
- func (conn *OracleConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *OracleConn) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error)
- func (conn *OracleConn) Init() error
- func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *OracleConn) SubmitTemplate(level string, templateMap map[string]string, name string, ...) (data iop.Dataset, err error)
- func (conn *OracleConn) Version() int
- type Pool
- type PostgresConn
- func (conn *PostgresConn) BulkExportStream(table Table) (ds *iop.Datastream, err error)
- func (conn *PostgresConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *PostgresConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
- func (conn *PostgresConn) CopyToStdout(ctx *g.Context, sql string) (stdOutReader io.Reader, err error)
- func (conn *PostgresConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
- func (conn *PostgresConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *PostgresConn) Init() error
- type PrometheusConn
- func (conn *PrometheusConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
- func (conn *PrometheusConn) Close() error
- func (conn *PrometheusConn) Connect(timeOut ...int) error
- func (conn *PrometheusConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *PrometheusConn) GetSQLColumns(table Table) (columns iop.Columns, err error)
- func (conn *PrometheusConn) GetSchemas() (data iop.Dataset, err error)
- func (conn *PrometheusConn) GetSchemata(schemaName string, tableNames ...string) (Schemata, error)
- func (conn *PrometheusConn) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error)
- func (conn *PrometheusConn) GetTables(schema string) (data iop.Dataset, err error)
- func (conn *PrometheusConn) Init() error
- func (conn *PrometheusConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
- func (conn *PrometheusConn) StreamRowsContext(ctx context.Context, query string, Opts ...map[string]interface{}) (ds *iop.Datastream, err error)
- type ProtonConn
- func (conn *ProtonConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *ProtonConn) ConnString() string
- func (conn *ProtonConn) Connect(timeOut ...int) (err error)
- func (conn *ProtonConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
- func (conn *ProtonConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
- func (conn *ProtonConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *ProtonConn) GetCount(tableFName string) (uint64, error)
- func (conn *ProtonConn) Init() error
- func (conn *ProtonConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)
- type RedshiftConn
- func (conn *RedshiftConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
- func (conn *RedshiftConn) BulkExportStream(table Table) (ds *iop.Datastream, err error)
- func (conn *RedshiftConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *RedshiftConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *RedshiftConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
- func (conn *RedshiftConn) ConnString() string
- func (conn *RedshiftConn) CopyFromS3(tableFName, s3Path string, columns iop.Columns) (count uint64, err error)
- func (conn *RedshiftConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
- func (conn *RedshiftConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *RedshiftConn) Init() error
- func (conn *RedshiftConn) OptimizeTable(table *Table, newColumns iop.Columns, isTemp ...bool) (ok bool, err error)
- func (conn *RedshiftConn) Unload(ctx *g.Context, tables ...Table) (s3Path string, err error)
- func (conn *RedshiftConn) WarnStlLoadErrors(err error)
- type Relation
- type Result
- type SQLiteConn
- func (conn *SQLiteConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *SQLiteConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *SQLiteConn) GetSchemata(schemaName string, tableNames ...string) (Schemata, error)
- func (conn *SQLiteConn) GetURL(newURL ...string) string
- func (conn *SQLiteConn) Init() error
- type Schema
- type Schemata
- func (s *Schemata) Columns(filters ...string) map[string]iop.Column
- func (s *Schemata) Database() Database
- func (s *Schemata) Filtered(columnLevel bool, filters ...string) (ns Schemata)
- func (s *Schemata) LoadTablesJSON(payload string) error
- func (s *Schemata) Tables(filters ...string) map[string]Table
- type SnowflakeConn
- func (conn *SnowflakeConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
- func (conn *SnowflakeConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *SnowflakeConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
- func (conn *SnowflakeConn) ConnString() string
- func (conn *SnowflakeConn) Connect(timeOut ...int) error
- func (conn *SnowflakeConn) CopyFromAzure(tableFName, azPath string) (err error)
- func (conn *SnowflakeConn) CopyFromS3(tableFName, s3Path string) (err error)
- func (conn *SnowflakeConn) CopyToAzure(tables ...Table) (azPath string, err error)
- func (conn *SnowflakeConn) CopyToS3(tables ...Table) (s3Path string, err error)
- func (conn *SnowflakeConn) CopyViaAWS(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
- func (conn *SnowflakeConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *SnowflakeConn) GetColumnsFull(tableFName string) (data iop.Dataset, err error)
- func (conn *SnowflakeConn) GetDatabases() (data iop.Dataset, err error)
- func (conn *SnowflakeConn) GetSchemas() (data iop.Dataset, err error)
- func (conn *SnowflakeConn) GetTables(schema string) (data iop.Dataset, err error)
- func (conn *SnowflakeConn) GetViews(schema string) (data iop.Dataset, err error)
- func (conn *SnowflakeConn) Init() error
- func (conn *SnowflakeConn) StageGET(internalStagePath, folderPath string) (filePaths []string, err error)
- func (conn *SnowflakeConn) StagePUT(fileURI string, internalStagePath string) (err error)
- func (conn *SnowflakeConn) UnloadViaStage(tables ...Table) (filePath string, err error)
- type StarRocksConn
- func (conn *StarRocksConn) AddMissingColumns(table Table, newCols iop.Columns) (ok bool, err error)
- func (conn *StarRocksConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *StarRocksConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (string, error)
- func (conn *StarRocksConn) GetDatabases() (data iop.Dataset, err error)
- func (conn *StarRocksConn) GetURL(newURL ...string) string
- func (conn *StarRocksConn) Init() error
- func (conn *StarRocksConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *StarRocksConn) OptimizeTable(table *Table, newColumns iop.Columns, isTemp ...bool) (ok bool, err error)
- func (conn *StarRocksConn) StreamLoad(feURL, tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *StarRocksConn) WaitAlterTable(table Table) (err error)
- type StatFieldSQL
- type Table
- func (t *Table) Clone() Table
- func (t *Table) ColumnsMap() map[string]iop.Column
- func (t *Table) DatabaseQ() string
- func (t *Table) FDQN() string
- func (t *Table) FullName() string
- func (t *Table) IsQuery() bool
- func (t *Table) NameQ() string
- func (t *Table) SchemaQ() string
- func (t *Table) Select(limit, offset int, fields ...string) (sql string)
- func (t *Table) SetKeys(pkCols []string, updateCol string, otherKeys TableKeys) error
- type TableKeys
- type Transaction
- type TrinoConn
- func (conn *TrinoConn) ConnString() string
- func (conn *TrinoConn) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (conn *TrinoConn) Init() error
- func (conn *TrinoConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
- type User
- type WhereClause
Constants ¶
const RelationManyToMany = "many_to_many"
const RelationManyToOne = "many_to_one"
const RelationOneToMany = "one_to_many"
const RelationOneToOne = "one_to_one"
Variables ¶
var Debug = false
Debug prints queries when true
var DuckDbFileCmd = map[string]*exec.Cmd{}
var DuckDbFileContext = map[string]*g.Context{} // so that collision doesn't happen
var DuckDbMux = sync.Mutex{}
var DuckDbUseTempFile = false
var DuckDbVersion = "1.0.0"
var InferDBStream = false
InferDBStream may need to be `true`, since precision and scale are not guaranteed. If `false`, the database stream source schema is used.
var PartitionByColumn = func(conn Connection, table Table, c string, p int) ([]Table, error) { return []Table{table}, nil }
var PartitionByOffset = func(conn Connection, table Table, l int) ([]Table, error) { return []Table{table}, nil }
var SQLiteVersion = "3.41"
var ( // UseBulkExportFlowCSV to use BulkExportFlowCSV UseBulkExportFlowCSV = false )
Functions ¶
func ChangeColumnTypeViaAdd ¶
func ChangeColumnTypeViaAdd(conn Connection, table Table, col iop.Column) (err error)
ChangeColumnTypeViaAdd swaps a new column with the old one in order to change the type. This is needed with Snowflake when changing from date to string, or number to string.
func CleanSQL ¶
func CleanSQL(conn Connection, sql string) string
CleanSQL removes creds from the query
func CommonColumns ¶
CommonColumns return common columns
func CopyFromAzure ¶
func CopyFromAzure(conn Connection, tableFName, azPath string) (err error)
CopyFromAzure uses the Snowflake COPY INTO Table command from Azure
func CopyFromS3 ¶
func CopyFromS3(conn Connection, tableFName, s3Path string) (err error)
func EnsureBinDuckDB ¶
EnsureBinDuckDB ensures the duckdb binary exists; if it is missing, it is downloaded and used.
func EnsureBinSQLite ¶
EnsureBinSQLite ensures the sqlite binary exists; if it is missing, it is downloaded and used.
func GenerateAlterDDL ¶
GenerateAlterDDL generates a DDL based on a dataset
func GetOptimizeTableStatements ¶ added in v1.1.8
func GetOptimizeTableStatements(conn Connection, table *Table, newColumns iop.Columns, isTemp bool) (ok bool, ddlParts []string, err error)
GetOptimizeTableStatements analyzes the table and alters the table with the columns' data types based on its analysis result. If the table is missing, it is created with a new DDL. Hole in this: it will truncate data points, since it is based only on new data being inserted. Complete stats of the target table would be needed to properly optimize.
func GetQualifierQuote ¶
func HasVariedCase ¶
func InsertBatchStream ¶
func InsertBatchStream(conn Connection, tx Transaction, tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func InsertStream ¶
func InsertStream(conn Connection, tx *BaseTransaction, tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertStream inserts a stream
func NativeTypeToGeneral ¶
func NativeTypeToGeneral(name, dbType string, conn Connection) (colType iop.ColumnType)
func ParseColumnName ¶
func ParseSQLMultiStatements ¶
ParseSQLMultiStatements splits a sql text into statements typically by a ';'
func QuoteNames ¶ added in v1.1.8
func SQLColumns ¶
func SQLColumns(colTypes []ColumnType, conn Connection) (columns iop.Columns)
SQLColumns returns the columns from database ColumnType
func SplitTableFullName ¶
SplitTableFullName returns the schema / table name
func TableExists ¶
func TableExists(conn Connection, tableFName string) (exists bool, err error)
TableExists returns true if the table exists
func TestPermissions ¶
func TestPermissions(conn Connection, tableName string) (err error)
TestPermissions tests the needed permissions in a given connection
func Upsert ¶
func Upsert(conn Connection, tx Transaction, sourceTable, targetTable string, pkFields []string) (count int64, err error)
Upsert upserts from source table into target table
Types ¶
type BaseConn ¶
type BaseConn struct { Connection URL string Type dbio.Type // the type of database for sqlx: postgres, mysql, sqlite Data iop.Dataset Log []string // contains filtered or unexported fields }
BaseConn is a database connection
func (*BaseConn) AddMissingColumns ¶ added in v1.1.8
func (*BaseConn) BeginContext ¶
BeginContext starts a connection wide transaction
func (*BaseConn) BulkExportFlow ¶
BulkExportFlow creates a dataflow from a sql query
func (*BaseConn) BulkExportFlowCSV ¶
BulkExportFlowCSV creates a dataflow from a sql query, using CSVs
func (*BaseConn) BulkExportStream ¶
func (conn *BaseConn) BulkExportStream(table Table) (ds *iop.Datastream, err error)
BulkExportStream streams the rows in bulk
func (*BaseConn) BulkImportFlow ¶
BulkImportFlow imports the streams rows in bulk concurrently using channels
func (*BaseConn) BulkImportStream ¶
func (conn *BaseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream import the stream rows in bulk
func (*BaseConn) CastColumnForSelect ¶
CastColumnForSelect casts to the correct target column type
func (*BaseConn) CastColumnsForSelect ¶
CastColumnsForSelect cast the source columns into the target Column types
func (*BaseConn) CompareChecksums ¶
CompareChecksums compares the checksum values from the database side to the checksum values from the StreamProcessor
func (*BaseConn) ConnString ¶
ConnString returns the connection string needed for connection
func (*BaseConn) CreateTable ¶
CreateTable creates a new table based on provided columns. `tableName` should have 'schema.table' format
func (*BaseConn) CreateTemporaryTable ¶
CreateTemporaryTable creates a temp table based on provided columns
func (*BaseConn) CurrentDatabase ¶
CurrentDatabase returns the name of the current database
func (*BaseConn) ExecContext ¶
func (conn *BaseConn) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecContext runs a sql query with context, returns `error`
func (*BaseConn) ExecMultiContext ¶
func (conn *BaseConn) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecMultiContext runs multiple sql queries with context, returns `error`
func (*BaseConn) GenerateDDL ¶
GenerateDDL generates a DDL based on a dataset
func (*BaseConn) GenerateInsertStatement ¶
func (conn *BaseConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
GenerateInsertStatement returns the proper INSERT statement
func (*BaseConn) GenerateUpsertExpressions ¶
func (conn *BaseConn) GenerateUpsertExpressions(srcTable string, tgtTable string, pkFields []string) (exprs map[string]string, err error)
GenerateUpsertExpressions returns a map with needed expressions
func (*BaseConn) GenerateUpsertSQL ¶
func (conn *BaseConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL returns a sql for upsert
func (*BaseConn) GetAnalysis ¶
func (conn *BaseConn) GetAnalysis(analysisName string, values map[string]interface{}) (sql string, err error)
GetAnalysis runs an analysis
func (*BaseConn) GetColumnStats ¶
func (conn *BaseConn) GetColumnStats(tableName string, fields ...string) (columns iop.Columns, err error)
GetColumnStats analyzes the table and returns the column statistics
func (*BaseConn) GetColumns ¶
func (*BaseConn) GetColumnsFull ¶
GetColumnsFull returns columns for given table. `tableName` should include schema and table, example: `schema1.table2`. Fields should be `schema_name|table_name|table_type|column_name|data_type|column_id`
func (*BaseConn) GetDatabases ¶
GetDatabases returns databases for given connection
func (*BaseConn) GetGormConn ¶
GetGormConn returns the gorm db connection
func (*BaseConn) GetIndexes ¶
GetIndexes returns indexes for given table.
func (*BaseConn) GetNativeType ¶
GetNativeType returns the native column type from generic
func (*BaseConn) GetObjects ¶
GetObjects returns objects (tables or views) for given schema. `objectType` can be either 'table', 'view' or 'all'
func (*BaseConn) GetPrimaryKeys ¶
GetPrimaryKeys returns primary keys for given table.
func (*BaseConn) GetSQLColumns ¶
GetSQLColumns return columns from a sql query result
func (*BaseConn) GetSchemas ¶
GetSchemas returns schemas
func (*BaseConn) GetSchemata ¶
GetSchemata obtain full schemata info for a schema and/or table in current database
func (*BaseConn) GetTableColumns ¶
func (conn *BaseConn) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error)
GetTableColumns returns columns for given table. `tableFName` should include schema and table, example: `schema1.table2`. Fields should be `column_name|data_type`
func (*BaseConn) GetTemplateValue ¶
GetTemplateValue returns the value of the path
func (*BaseConn) InsertBatchStream ¶
func (conn *BaseConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*BaseConn) InsertStream ¶
InsertStream inserts a stream into a table
func (*BaseConn) LoadTemplates ¶
LoadTemplates loads the appropriate yaml template
func (*BaseConn) MustExec ¶
MustExec execs the query using e and panics if there was an error. Any placeholder parameters are replaced with supplied args.
func (*BaseConn) NewTransaction ¶
func (conn *BaseConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)
NewTransaction creates a new transaction
func (*BaseConn) OptimizeTable ¶
func (*BaseConn) ProcessTemplate ¶
func (conn *BaseConn) ProcessTemplate(level, text string, values map[string]interface{}) (sql string, err error)
ProcessTemplate processes a template SQL text at a given level
func (*BaseConn) PropArrExclude ¶ added in v1.2.14
func (*BaseConn) Query ¶
func (conn *BaseConn) Query(sql string, options ...map[string]interface{}) (data iop.Dataset, err error)
Query runs a sql query, returns `result`, `error`
func (*BaseConn) QueryContext ¶
func (conn *BaseConn) QueryContext(ctx context.Context, sql string, options ...map[string]interface{}) (data iop.Dataset, err error)
QueryContext runs a sql query with ctx, returns `result`, `error`
func (*BaseConn) RunAnalysis ¶
func (conn *BaseConn) RunAnalysis(analysisName string, values map[string]interface{}) (data iop.Dataset, err error)
RunAnalysis runs an analysis
func (*BaseConn) Self ¶
func (conn *BaseConn) Self() Connection
Self returns the respective connection instance. This is useful to refer back to a subclass method from the superclass level. (Aka overloading)
func (*BaseConn) StreamRecords ¶
StreamRecords the records of a sql query, returns `result`, `error`
func (*BaseConn) StreamRows ¶
func (conn *BaseConn) StreamRows(sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
StreamRows the rows of a sql query, returns `result`, `error`
func (*BaseConn) StreamRowsContext ¶
func (conn *BaseConn) StreamRowsContext(ctx context.Context, query string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
StreamRowsContext streams the rows of a sql query with context, returns `result`, `error`
func (*BaseConn) SubmitTemplate ¶ added in v1.2.4
func (*BaseConn) Upsert ¶
func (conn *BaseConn) Upsert(srcTable string, tgtTable string, primKeys []string) (rowAffCnt int64, err error)
Upsert inserts / updates from a srcTable into a target table. Assuming the srcTable has some or all of the tgtTable fields with matching types
func (*BaseConn) ValidateColumnNames ¶
func (conn *BaseConn) ValidateColumnNames(tgtColNames []string, colNames []string, quote bool) (newColNames []string, err error)
ValidateColumnNames verifies that source fields are present in the target table. It will return quoted field names as `newColNames`, the same length as `colNames`
type BaseTransaction ¶
type BaseTransaction struct { Tx *sqlx.Tx Conn Connection // contains filtered or unexported fields }
BaseTransaction is a database transaction
func (*BaseTransaction) Commit ¶
func (t *BaseTransaction) Commit() (err error)
func (*BaseTransaction) Connection ¶
func (t *BaseTransaction) Connection() Connection
Connection return the connection
func (*BaseTransaction) Context ¶
func (t *BaseTransaction) Context() *g.Context
Commit commits connection wide transaction
func (*BaseTransaction) DisableTrigger ¶
func (t *BaseTransaction) DisableTrigger(tableName, triggerName string) (err error)
DisableTrigger disables a trigger
func (*BaseTransaction) EnableTrigger ¶
func (t *BaseTransaction) EnableTrigger(tableName, triggerName string) (err error)
EnableTrigger enables a trigger
func (*BaseTransaction) Exec ¶
func (t *BaseTransaction) Exec(sql string, args ...interface{}) (result sql.Result, err error)
Exec runs a sql query, returns `error`
func (*BaseTransaction) ExecContext ¶
func (t *BaseTransaction) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecContext runs a sql query with context, returns `error`
func (*BaseTransaction) ExecMultiContext ¶
func (t *BaseTransaction) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecMultiContext runs multiple sql queries with context, returns `error`
func (*BaseTransaction) InsertBatchStream ¶
func (t *BaseTransaction) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*BaseTransaction) InsertStream ¶
func (t *BaseTransaction) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertStream inserts a stream into a table
func (*BaseTransaction) Prepare ¶
func (t *BaseTransaction) Prepare(query string) (stmt *sql.Stmt, err error)
Prepare prepares the statement
func (*BaseTransaction) QueryContext ¶
func (t *BaseTransaction) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)
QueryContext queries rows
func (*BaseTransaction) Rollback ¶
func (t *BaseTransaction) Rollback() (err error)
Rollback rolls back connection wide transaction
func (*BaseTransaction) Upsert ¶
func (t *BaseTransaction) Upsert(sourceTable, targetTable string, pkFields []string) (count uint64, err error)
Upsert does an upsert from source table into target table
func (*BaseTransaction) UpsertStream ¶
func (t *BaseTransaction) UpsertStream(tableFName string, ds *iop.Datastream, pk []string) (count uint64, err error)
UpsertStream inserts a stream into a table in batch
type BigQueryConn ¶
type BigQueryConn struct { BaseConn URL string Client *bigquery.Client ProjectID string DatasetID string Location string Datasets []string Mux sync.Mutex }
BigQueryConn is a Google Big Query connection
func (*BigQueryConn) BulkExportFlow ¶
func (conn *BigQueryConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
BulkExportFlow reads in bulk
func (*BigQueryConn) BulkImportFlow ¶
func (conn *BigQueryConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow inserts a flow of streams into a table. For redshift we need to create CSVs in GCS and then use the COPY command.
func (*BigQueryConn) BulkImportStream ¶
func (conn *BigQueryConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream demonstrates loading data into a BigQuery table using a file on the local filesystem.
func (*BigQueryConn) CastColumnForSelect ¶
func (conn *BigQueryConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
CastColumnForSelect casts to the correct target column type
func (*BigQueryConn) Connect ¶
func (conn *BigQueryConn) Connect(timeOut ...int) error
Connect connects to the database
func (*BigQueryConn) CopyFromGCS ¶
func (*BigQueryConn) CopyFromLocal ¶
CopyFromGCS into bigquery from google storage
func (*BigQueryConn) CopyToGCS ¶
func (conn *BigQueryConn) CopyToGCS(table Table, gcsURI string) error
func (*BigQueryConn) ExecContext ¶
func (*BigQueryConn) ExportToGCS ¶
func (conn *BigQueryConn) ExportToGCS(sql string, gcsURI string) error
CopyToGCS Copy table to gc storage
func (*BigQueryConn) GenerateDDL ¶ added in v1.1.4
func (conn *BigQueryConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
GenerateDDL generates a DDL based on a dataset
func (*BigQueryConn) GenerateUpsertSQL ¶
func (conn *BigQueryConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*BigQueryConn) GetDatabases ¶
func (conn *BigQueryConn) GetDatabases() (iop.Dataset, error)
GetDatabases returns databases
func (*BigQueryConn) GetSchemas ¶
func (conn *BigQueryConn) GetSchemas() (iop.Dataset, error)
GetSchemas returns schemas
func (*BigQueryConn) GetSchemata ¶
func (conn *BigQueryConn) GetSchemata(schemaName string, tableNames ...string) (Schemata, error)
GetSchemata obtain full schemata info for a schema and/or table in current database
func (*BigQueryConn) InsertBatchStream ¶
func (conn *BigQueryConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*BigQueryConn) InsertStream ¶
func (conn *BigQueryConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertStream demonstrates loading data into a BigQuery table using a file on the local filesystem.
func (*BigQueryConn) LoadCSVFromReader ¶
func (conn *BigQueryConn) LoadCSVFromReader(table Table, reader io.Reader, dsColumns []iop.Column) error
LoadCSVFromReader demonstrates loading data into a BigQuery table using a file on the local filesystem.
func (*BigQueryConn) NewTransaction ¶
func (conn *BigQueryConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
NewTransaction creates a new transaction
func (*BigQueryConn) StreamRowsContext ¶
func (conn *BigQueryConn) StreamRowsContext(ctx context.Context, sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
type BigTableAction ¶
type BigTableAction string
const BTCreateColumnFamily BigTableAction = "create_column_family"
const BTCreateTable BigTableAction = "create_table"
const BTDeleteTable BigTableAction = "delete_table"
const BTTableInfo BigTableAction = "table_info"
type BigTableConn ¶
type BigTableConn struct { BaseConn URL string Client *bigtable.Client ProjectID string InstanceID string Location string }
BigTableConn is a Google Bigtable connection
func (*BigTableConn) BulkExportFlow ¶
func (conn *BigTableConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
func (*BigTableConn) Connect ¶
func (conn *BigTableConn) Connect(timeOut ...int) error
Connect connects to the database
func (*BigTableConn) ExecContext ¶
func (*BigTableConn) GetColumns ¶
func (*BigTableConn) GetColumnsFull ¶
func (conn *BigTableConn) GetColumnsFull(tableFName string) (iop.Dataset, error)
func (*BigTableConn) GetSQLColumns ¶
func (conn *BigTableConn) GetSQLColumns(table Table) (columns iop.Columns, err error)
GetTables returns tables for given schema
func (*BigTableConn) GetSchemas ¶
func (conn *BigTableConn) GetSchemas() (iop.Dataset, error)
func (*BigTableConn) GetSchemata ¶
func (conn *BigTableConn) GetSchemata(schemaName string, tableNames ...string) (schemata Schemata, err error)
func (*BigTableConn) GetTables ¶
func (conn *BigTableConn) GetTables(schema string) (data iop.Dataset, err error)
func (*BigTableConn) GetViews ¶
func (conn *BigTableConn) GetViews(schema string) (data iop.Dataset, err error)
GetTables returns tables for given schema
func (*BigTableConn) InsertBatchStream ¶
func (conn *BigTableConn) InsertBatchStream(table string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*BigTableConn) NewTransaction ¶
func (conn *BigTableConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
NewTransaction creates a new transaction
func (*BigTableConn) StreamRowsContext ¶
func (conn *BigTableConn) StreamRowsContext(ctx context.Context, table string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
type BigTableQuery ¶
type BigTableQuery struct { Action BigTableAction `json:"action"` Table string `json:"table"` ColumnFamilies []string `json:"column_family"` }
type BlankTransaction ¶
type BlankTransaction struct { Conn Connection // contains filtered or unexported fields }
func (*BlankTransaction) Commit ¶
func (t *BlankTransaction) Commit() (err error)
func (*BlankTransaction) Connection ¶
func (t *BlankTransaction) Connection() Connection
func (*BlankTransaction) Context ¶
func (t *BlankTransaction) Context() *g.Context
func (*BlankTransaction) ExecContext ¶
func (*BlankTransaction) ExecMultiContext ¶
func (*BlankTransaction) Prepare ¶
func (t *BlankTransaction) Prepare(query string) (stmt *sql.Stmt, err error)
func (*BlankTransaction) QueryContext ¶
func (*BlankTransaction) Rollback ¶
func (t *BlankTransaction) Rollback() (err error)
type ClickhouseConn ¶
ClickhouseConn is a ClickHouse connection
func (*ClickhouseConn) BulkImportStream ¶
func (conn *ClickhouseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table
func (*ClickhouseConn) ConnString ¶
func (conn *ClickhouseConn) ConnString() string
func (*ClickhouseConn) Connect ¶ added in v1.1.12
func (conn *ClickhouseConn) Connect(timeOut ...int) (err error)
func (*ClickhouseConn) GenerateDDL ¶ added in v1.1.4
func (conn *ClickhouseConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
GenerateDDL generates a DDL based on a dataset
func (*ClickhouseConn) GenerateInsertStatement ¶
func (conn *ClickhouseConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
GenerateInsertStatement returns the proper INSERT statement
func (*ClickhouseConn) GenerateUpsertSQL ¶
func (conn *ClickhouseConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*ClickhouseConn) NewTransaction ¶
func (conn *ClickhouseConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)
NewTransaction creates a new transaction
type ColumnType ¶
type Connection ¶
type Connection interface { Base() *BaseConn BaseURL() string Begin(options ...*sql.TxOptions) error BeginContext(ctx context.Context, options ...*sql.TxOptions) error BulkExportFlow(table Table) (*iop.Dataflow, error) BulkExportStream(table Table) (*iop.Datastream, error) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error) CastColumnForSelect(srcColumn iop.Column, tgtColumn iop.Column) string CastColumnsForSelect(srcColumns iop.Columns, tgtColumns iop.Columns) []string Close() error Commit() error CompareChecksums(tableName string, columns iop.Columns) (err error) Connect(timeOut ...int) error ConnString() string Context() *g.Context CreateTable(tableName string, cols iop.Columns, tableDDL string) (err error) CreateTemporaryTable(tableName string, cols iop.Columns) (err error) CurrentDatabase() (string, error) Db() *sqlx.DB DbX() *DbX DropTable(...string) error DropView(...string) error Exec(sql string, args ...interface{}) (result sql.Result, err error) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error) ExecMulti(sql string, args ...interface{}) (result sql.Result, err error) ExecMultiContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error) GenerateDDL(table Table, data iop.Dataset, temporary bool) (string, error) GenerateInsertStatement(tableName string, fields []string, numRows int) string GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error) GetAnalysis(string, map[string]interface{}) (string, error) GetColumns(tableFName string, fields ...string) (iop.Columns, error) GetColumnsFull(string) (iop.Dataset, error) GetColumnStats(tableName string, fields ...string) (columns iop.Columns, err error) GetCount(string) (uint64, error) GetDatabases() (iop.Dataset, error) GetDDL(string) (string, error) GetGormConn(config *gorm.Config) (*gorm.DB, error) 
GetIndexes(string) (iop.Dataset, error) GetNativeType(col iop.Column) (nativeType string, err error) GetPrimaryKeys(string) (iop.Dataset, error) GetProp(string) string GetSchemas() (iop.Dataset, error) GetSchemata(schemaName string, tableNames ...string) (Schemata, error) GetSQLColumns(table Table) (columns iop.Columns, err error) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error) GetTables(string) (iop.Dataset, error) GetTemplateValue(path string) (value string) GetType() dbio.Type GetURL(newURL ...string) string GetViews(string) (iop.Dataset, error) Info() ConnInfo Init() error InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error) Kill() error LoadTemplates() error MustExec(sql string, args ...interface{}) (result sql.Result) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error) OptimizeTable(table *Table, columns iop.Columns, isTemp ...bool) (ok bool, err error) Prepare(query string) (stmt *sql.Stmt, err error) ProcessTemplate(level, text string, values map[string]interface{}) (sql string, err error) Props() map[string]string PropsArr() []string Query(sql string, options ...map[string]interface{}) (iop.Dataset, error) QueryContext(ctx context.Context, sql string, options ...map[string]interface{}) (iop.Dataset, error) Quote(field string, normalize ...bool) string RenameTable(table string, newTable string) (err error) Rollback() error RunAnalysis(string, map[string]interface{}) (iop.Dataset, error) Schemata() Schemata Self() Connection SetProp(string, string) StreamRecords(sql string) (<-chan map[string]interface{}, error) StreamRows(sql string, options ...map[string]interface{}) (*iop.Datastream, error) StreamRowsContext(ctx context.Context, sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error) SubmitTemplate(level string, templateMap map[string]string, name string, values map[string]interface{}) 
(data iop.Dataset, err error) SwapTable(srcTable string, tgtTable string) (err error) Template() dbio.Template Tx() Transaction Unquote(string) string Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error) ValidateColumnNames(tgtColName []string, colNames []string, quote bool) (newColNames []string, err error) AddMissingColumns(table Table, newCols iop.Columns) (ok bool, err error) }
Connection is the Base interface for Connections
func Clone ¶
func Clone(conn Connection) (newConn Connection, err error)
func NewConn ¶
func NewConn(URL string, props ...string) (Connection, error)
NewConn return the most proper connection for a given database
func NewConnContext ¶
NewConnContext returns the most proper connection for a given database with context. Props are provided as `"Prop1=Value1", "Prop2=Value2", ...`
type DataAnalyzer ¶
type DataAnalyzer struct { Conn Connection Schemata Schemata ColumnMap map[string]iop.Column RelationMap map[string]map[string]map[string]Relation // table > column A > column B > relation Options DataAnalyzerOptions }
func NewDataAnalyzer ¶
func NewDataAnalyzer(conn Connection, opts DataAnalyzerOptions) (da *DataAnalyzer, err error)
func (*DataAnalyzer) AnalyzeColumns ¶
func (da *DataAnalyzer) AnalyzeColumns(sampleSize int, includeViews bool) (err error)
func (*DataAnalyzer) GetManyToMany ¶
func (da *DataAnalyzer) GetManyToMany(nonUniqueCols iop.Columns, asString bool) (err error)
func (*DataAnalyzer) GetOneToMany ¶
func (da *DataAnalyzer) GetOneToMany(uniqueCols, nonUniqueCols iop.Columns, asString bool) (err error)
func (*DataAnalyzer) GetOneToOne ¶
func (da *DataAnalyzer) GetOneToOne(uniqueCols iop.Columns, asString bool) (err error)
func (*DataAnalyzer) GetSchemata ¶
func (da *DataAnalyzer) GetSchemata(force bool) (err error)
func (*DataAnalyzer) ProcessRelations ¶
func (da *DataAnalyzer) ProcessRelations() (err error)
func (*DataAnalyzer) ProcessRelationsInteger ¶
func (da *DataAnalyzer) ProcessRelationsInteger() (err error)
func (*DataAnalyzer) ProcessRelationsString ¶
func (da *DataAnalyzer) ProcessRelationsString() (err error)
func (*DataAnalyzer) WriteRelationsYaml ¶
func (da *DataAnalyzer) WriteRelationsYaml(path string) (err error)
type DataAnalyzerOptions ¶
type DbX ¶
type DbX struct { // contains filtered or unexported fields }
DbX is db express
type DuckDbConn ¶
DuckDbConn is a Duck DB connection
func (*DuckDbConn) BulkImportStream ¶
func (conn *DuckDbConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table
func (*DuckDbConn) CastColumnForSelect ¶ added in v1.2.10
func (conn *DuckDbConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
CastColumnForSelect casts to the correct target column type
func (*DuckDbConn) Connect ¶
func (conn *DuckDbConn) Connect(timeOut ...int) (err error)
func (*DuckDbConn) ExecContext ¶
func (*DuckDbConn) ExecMultiContext ¶
func (conn *DuckDbConn) ExecMultiContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
ExecContext runs a sql query with context, returns `error`
func (*DuckDbConn) GenerateUpsertSQL ¶
func (conn *DuckDbConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*DuckDbConn) GetURL ¶
func (conn *DuckDbConn) GetURL(newURL ...string) string
GetURL returns the processed URL
func (*DuckDbConn) InsertBatchStream ¶
func (conn *DuckDbConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*DuckDbConn) InsertStream ¶
func (conn *DuckDbConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertStream demonstrates loading data into a BigQuery table using a file on the local filesystem.
func (*DuckDbConn) StreamRowsContext ¶
func (conn *DuckDbConn) StreamRowsContext(ctx context.Context, sql string, options ...map[string]interface{}) (ds *iop.Datastream, err error)
type ManualTransaction ¶
type ManualTransaction struct { Conn Connection // contains filtered or unexported fields }
func (*ManualTransaction) Commit ¶
func (t *ManualTransaction) Commit() (err error)
func (*ManualTransaction) Context ¶
func (t *ManualTransaction) Context() *g.Context
func (*ManualTransaction) ExecContext ¶
func (*ManualTransaction) ExecMultiContext ¶
func (*ManualTransaction) Prepare ¶
func (t *ManualTransaction) Prepare(query string) (stmt *sql.Stmt, err error)
func (*ManualTransaction) QueryContext ¶
func (*ManualTransaction) Rollback ¶
func (t *ManualTransaction) Rollback() (err error)
type ModelDbX ¶
type ModelDbX struct { Ptr interface{} `json:"-"` RowsAffected int `json:"-"` // contains filtered or unexported fields }
ModelDbX is the base for any SQL model
type MongoDBConn ¶ added in v1.1.14
MongoDBConn is a Mongo connection
func (*MongoDBConn) BulkExportFlow ¶ added in v1.1.14
func (conn *MongoDBConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
func (*MongoDBConn) Close ¶ added in v1.1.14
func (conn *MongoDBConn) Close() error
func (*MongoDBConn) Connect ¶ added in v1.1.14
func (conn *MongoDBConn) Connect(timeOut ...int) error
Connect connects to the database
func (*MongoDBConn) ExecContext ¶ added in v1.1.14
func (*MongoDBConn) GetSchemas ¶ added in v1.1.14
func (conn *MongoDBConn) GetSchemas() (data iop.Dataset, err error)
GetSchemas returns schemas
func (*MongoDBConn) GetSchemata ¶ added in v1.1.15
func (conn *MongoDBConn) GetSchemata(schemaName string, tableNames ...string) (Schemata, error)
GetSchemata obtain full schemata info for a schema and/or table in current database
func (*MongoDBConn) GetTableColumns ¶ added in v1.1.14
func (conn *MongoDBConn) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error)
NewTransaction creates a new transaction
func (*MongoDBConn) GetTables ¶ added in v1.1.14
func (conn *MongoDBConn) GetTables(schema string) (data iop.Dataset, err error)
GetSchemas returns schemas
func (*MongoDBConn) Init ¶ added in v1.1.14
func (conn *MongoDBConn) Init() error
Init initiates the object
func (*MongoDBConn) NewTransaction ¶ added in v1.1.14
func (conn *MongoDBConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
NewTransaction creates a new transaction
func (*MongoDBConn) StreamRowsContext ¶ added in v1.1.14
func (conn *MongoDBConn) StreamRowsContext(ctx context.Context, collectionName string, Opts ...map[string]interface{}) (ds *iop.Datastream, err error)
type MsSQLServerConn ¶
MsSQLServerConn is a Microsoft SQL Server connection
func (*MsSQLServerConn) BcpExport ¶
func (conn *MsSQLServerConn) BcpExport() (err error)
BcpExport exports data to datastream
func (*MsSQLServerConn) BcpImportFile ¶
func (conn *MsSQLServerConn) BcpImportFile(tableFName, filePath string) (count uint64, err error)
BcpImportFile imports a file using the bcp tool, e.g. `bcp dbo.test1 in '/tmp/LargeDataset.csv' -S,51433 -d master -U sa -P 'password' -c -t ',' -b 5000`. Limitation: if a comma or the delimiter appears inside a field, the import will error; use a delimiter that does not occur in any field, or apply some other transformation.
func (*MsSQLServerConn) BcpImportFileParrallel ¶
func (conn *MsSQLServerConn) BcpImportFileParrallel(tableFName string, ds *iop.Datastream) (count uint64, err error)
BcpImportFileParrallel uses goroutine to import partitioned files
func (*MsSQLServerConn) BulkImportFlow ¶
func (conn *MsSQLServerConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow bulk import flow
func (*MsSQLServerConn) BulkImportStream ¶
func (conn *MsSQLServerConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*MsSQLServerConn) ConnString ¶ added in v1.2.3
func (conn *MsSQLServerConn) ConnString() string
func (*MsSQLServerConn) Connect ¶ added in v1.2.3
func (conn *MsSQLServerConn) Connect(timeOut ...int) (err error)
func (*MsSQLServerConn) CopyFromAzure ¶
func (conn *MsSQLServerConn) CopyFromAzure(tableFName, azPath string) (count uint64, err error)
CopyFromAzure uses the COPY INTO Table command from Azure
func (*MsSQLServerConn) CopyViaAzure ¶
func (conn *MsSQLServerConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaAzure uses the Azure DWH COPY INTO Table command
func (*MsSQLServerConn) GenerateUpsertSQL ¶
func (conn *MsSQLServerConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*MsSQLServerConn) GetURL ¶
func (conn *MsSQLServerConn) GetURL(newURL ...string) string
GetURL returns the processed URL
type MySQLConn ¶
MySQLConn is a MySQL or MariaDB connection
func (*MySQLConn) BulkExportStream ¶
func (conn *MySQLConn) BulkExportStream(table Table) (ds *iop.Datastream, err error)
BulkExportStream bulk Export
func (*MySQLConn) BulkImportStream ¶
func (conn *MySQLConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*MySQLConn) GenerateUpsertSQL ¶
func (conn *MySQLConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*MySQLConn) LoadDataInFile ¶
func (conn *MySQLConn) LoadDataInFile(tableFName string, ds *iop.Datastream) (count uint64, err error)
LoadDataInFile Bulk Import
func (*MySQLConn) LoadDataOutFile ¶
func (conn *MySQLConn) LoadDataOutFile(ctx *g.Context, sql string) (stdOutReader io.Reader, err error)
LoadDataOutFile Bulk Export Possible error: ERROR 1227 (42000) at line 1: Access denied; you need (at least one of) the FILE privilege(s) for this operation File privilege needs to be granted to user also the --secure-file-priv option needs to be set properly for it to work. to improve innodb insert speed
type OracleConn ¶
OracleConn is an Oracle connection
func (*OracleConn) BulkImportStream ¶
func (conn *OracleConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*OracleConn) ConnString ¶ added in v1.1.14
func (conn *OracleConn) ConnString() string
func (*OracleConn) Connect ¶ added in v1.1.14
func (conn *OracleConn) Connect(timeOut ...int) (err error)
func (*OracleConn) ExecMultiContext ¶
func (conn *OracleConn) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecMultiContext runs multiple sql queries with context, returns `error`
func (*OracleConn) GenerateInsertStatement ¶
func (conn *OracleConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
GenerateInsertStatement returns the proper INSERT statement
func (*OracleConn) GenerateUpsertSQL ¶
func (conn *OracleConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*OracleConn) GetTableColumns ¶ added in v1.1.8
func (*OracleConn) SQLLoad ¶
func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count uint64, err error)
SQLLoad uses sqlldr to bulk import, e.g. `cat test1.csv | sqlldr system/ control=sqlldr.ctl log=/dev/stdout bad=/dev/stderr`. It cannot import when a value contains a newline, so values need to be scanned for newlines.
func (*OracleConn) SubmitTemplate ¶ added in v1.2.4
func (*OracleConn) Version ¶ added in v1.1.14
func (conn *OracleConn) Version() int
type PostgresConn ¶
PostgresConn is a Postgres connection
func (*PostgresConn) BulkExportStream ¶
func (conn *PostgresConn) BulkExportStream(table Table) (ds *iop.Datastream, err error)
BulkExportStream uses the bulk dumping (COPY)
func (*PostgresConn) BulkImportStream ¶
func (conn *PostgresConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table
func (*PostgresConn) CastColumnForSelect ¶
func (conn *PostgresConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
CastColumnForSelect casts to the correct target column type
func (*PostgresConn) CopyToStdout ¶
func (conn *PostgresConn) CopyToStdout(ctx *g.Context, sql string) (stdOutReader io.Reader, err error)
CopyToStdout Copy TO STDOUT
func (*PostgresConn) GenerateDDL ¶ added in v1.1.4
func (conn *PostgresConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
GenerateDDL generates a DDL based on a dataset
func (*PostgresConn) GenerateUpsertSQL ¶
func (conn *PostgresConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
type PrometheusConn ¶ added in v1.2.2
PrometheusConn is a Prometheus connection
func (*PrometheusConn) BulkExportFlow ¶ added in v1.2.2
func (conn *PrometheusConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
func (*PrometheusConn) Close ¶ added in v1.2.2
func (conn *PrometheusConn) Close() error
func (*PrometheusConn) Connect ¶ added in v1.2.2
func (conn *PrometheusConn) Connect(timeOut ...int) error
Connect connects to the database
func (*PrometheusConn) ExecContext ¶ added in v1.2.2
func (*PrometheusConn) GetSQLColumns ¶ added in v1.2.2
func (conn *PrometheusConn) GetSQLColumns(table Table) (columns iop.Columns, err error)
func (*PrometheusConn) GetSchemas ¶ added in v1.2.2
func (conn *PrometheusConn) GetSchemas() (data iop.Dataset, err error)
GetSchemas returns schemas
func (*PrometheusConn) GetSchemata ¶ added in v1.2.2
func (conn *PrometheusConn) GetSchemata(schemaName string, tableNames ...string) (Schemata, error)
GetSchemata obtain full schemata info for a schema and/or table in current database
func (*PrometheusConn) GetTableColumns ¶ added in v1.2.2
func (conn *PrometheusConn) GetTableColumns(table *Table, fields ...string) (columns iop.Columns, err error)
GetTableColumns returns the columns for the given table
func (*PrometheusConn) GetTables ¶ added in v1.2.2
func (conn *PrometheusConn) GetTables(schema string) (data iop.Dataset, err error)
GetTables returns tables
func (*PrometheusConn) Init ¶ added in v1.2.2
func (conn *PrometheusConn) Init() error
Init initiates the object
func (*PrometheusConn) NewTransaction ¶ added in v1.2.2
func (conn *PrometheusConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
NewTransaction creates a new transaction
func (*PrometheusConn) StreamRowsContext ¶ added in v1.2.2
func (conn *PrometheusConn) StreamRowsContext(ctx context.Context, query string, Opts ...map[string]interface{}) (ds *iop.Datastream, err error)
type ProtonConn ¶ added in v1.2.7
ProtonConn is a Proton connection
func (*ProtonConn) BulkImportStream ¶ added in v1.2.7
func (conn *ProtonConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table
func (*ProtonConn) ConnString ¶ added in v1.2.7
func (conn *ProtonConn) ConnString() string
func (*ProtonConn) Connect ¶ added in v1.2.7
func (conn *ProtonConn) Connect(timeOut ...int) (err error)
func (*ProtonConn) GenerateDDL ¶ added in v1.2.7
func (conn *ProtonConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
GenerateDDL generates a DDL based on a dataset
func (*ProtonConn) GenerateInsertStatement ¶ added in v1.2.7
func (conn *ProtonConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
GenerateInsertStatement returns the proper INSERT statement
func (*ProtonConn) GenerateUpsertSQL ¶ added in v1.2.7
func (conn *ProtonConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*ProtonConn) GetCount ¶ added in v1.2.14
func (conn *ProtonConn) GetCount(tableFName string) (uint64, error)
GetCount returns count of records
func (*ProtonConn) Init ¶ added in v1.2.7
func (conn *ProtonConn) Init() error
Init initiates the object
func (*ProtonConn) NewTransaction ¶ added in v1.2.7
func (conn *ProtonConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (Transaction, error)
NewTransaction creates a new transaction
type RedshiftConn ¶
RedshiftConn is a Redshift connection
func (*RedshiftConn) BulkExportFlow ¶
func (conn *RedshiftConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
BulkExportFlow reads in bulk
func (*RedshiftConn) BulkExportStream ¶
func (conn *RedshiftConn) BulkExportStream(table Table) (ds *iop.Datastream, err error)
BulkExportStream reads in bulk
func (*RedshiftConn) BulkImportFlow ¶
func (conn *RedshiftConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow inserts a flow of streams into a table. For redshift we need to create CSVs in S3 and then use the COPY command.
func (*RedshiftConn) BulkImportStream ¶
func (conn *RedshiftConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table. For redshift we need to create CSVs in S3 and then use the COPY command.
func (*RedshiftConn) CastColumnForSelect ¶ added in v1.2.10
func (conn *RedshiftConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
CastColumnForSelect casts to the correct target column type
func (*RedshiftConn) ConnString ¶
func (conn *RedshiftConn) ConnString() string
func (*RedshiftConn) CopyFromS3 ¶
func (conn *RedshiftConn) CopyFromS3(tableFName, s3Path string, columns iop.Columns) (count uint64, err error)
CopyFromS3 uses the COPY INTO Table command from AWS S3
func (*RedshiftConn) GenerateDDL ¶ added in v1.1.4
func (conn *RedshiftConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
GenerateDDL generates a DDL based on a dataset
func (*RedshiftConn) GenerateUpsertSQL ¶
func (conn *RedshiftConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*RedshiftConn) OptimizeTable ¶ added in v1.1.9
func (*RedshiftConn) WarnStlLoadErrors ¶ added in v1.1.9
func (conn *RedshiftConn) WarnStlLoadErrors(err error)
type Result ¶
type Result struct { // contains filtered or unexported fields }
func (Result) LastInsertId ¶
func (Result) RowsAffected ¶
type SQLiteConn ¶
SQLiteConn is a SQLite connection
func (*SQLiteConn) BulkImportStream ¶
func (conn *SQLiteConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table
func (*SQLiteConn) GenerateUpsertSQL ¶
func (conn *SQLiteConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*SQLiteConn) GetSchemata ¶
func (conn *SQLiteConn) GetSchemata(schemaName string, tableNames ...string) (Schemata, error)
GetSchemata obtain full schemata info for a schema and/or table in current database
func (*SQLiteConn) GetURL ¶
func (conn *SQLiteConn) GetURL(newURL ...string) string
GetURL returns the processed URL
type Schemata ¶
type Schemata struct { Databases map[string]Database `json:"databases"` // contains filtered or unexported fields }
Schemata contains the full schema for a connection
func GetSchemataAll ¶
func GetSchemataAll(conn Connection) (schemata Schemata, err error)
GetSchemataAll obtains the schemata for all databases detected
func GetTablesSchemata ¶
func GetTablesSchemata(conn Connection, tableNames ...string) (schemata Schemata, err error)
GetTablesSchemata obtains the schemata for specified tables
func (*Schemata) LoadTablesJSON ¶
LoadTablesJSON loads from a json string
type SnowflakeConn ¶
SnowflakeConn is a Snowflake connection
func (*SnowflakeConn) BulkExportFlow ¶
func (conn *SnowflakeConn) BulkExportFlow(table Table) (df *iop.Dataflow, err error)
BulkExportFlow reads in bulk
func (*SnowflakeConn) BulkImportFlow ¶
func (conn *SnowflakeConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow bulk import flow
func (*SnowflakeConn) BulkImportStream ¶
func (conn *SnowflakeConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*SnowflakeConn) CastColumnForSelect ¶ added in v1.1.15
func (conn *SnowflakeConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
CastColumnForSelect casts to the correct target column type
func (*SnowflakeConn) ConnString ¶
func (conn *SnowflakeConn) ConnString() string
func (*SnowflakeConn) Connect ¶
func (conn *SnowflakeConn) Connect(timeOut ...int) error
Connect connects to the database
func (*SnowflakeConn) CopyFromAzure ¶
func (conn *SnowflakeConn) CopyFromAzure(tableFName, azPath string) (err error)
CopyFromAzure uses the Snowflake COPY INTO Table command from Azure
func (*SnowflakeConn) CopyFromS3 ¶
func (conn *SnowflakeConn) CopyFromS3(tableFName, s3Path string) (err error)
CopyFromS3 uses the Snowflake COPY INTO Table command from AWS S3
func (*SnowflakeConn) CopyToAzure ¶
func (conn *SnowflakeConn) CopyToAzure(tables ...Table) (azPath string, err error)
CopyToAzure exports a query to an Azure location
func (*SnowflakeConn) CopyToS3 ¶
func (conn *SnowflakeConn) CopyToS3(tables ...Table) (s3Path string, err error)
CopyToS3 exports a query to an S3 location
func (*SnowflakeConn) CopyViaAWS ¶
func (conn *SnowflakeConn) CopyViaAWS(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaAWS uses the Snowflake COPY INTO Table command from AWS S3
func (*SnowflakeConn) CopyViaAzure ¶
func (conn *SnowflakeConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaAzure uses the Snowflake COPY INTO Table command from Azure
func (*SnowflakeConn) CopyViaStage ¶
func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaStage uses the Snowflake COPY INTO Table command
func (*SnowflakeConn) GenerateDDL ¶ added in v1.1.4
func (conn *SnowflakeConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (sql string, err error)
GenerateDDL generates a DDL based on a dataset
func (*SnowflakeConn) GenerateUpsertSQL ¶
func (conn *SnowflakeConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*SnowflakeConn) GetColumnsFull ¶
func (conn *SnowflakeConn) GetColumnsFull(tableFName string) (data iop.Dataset, err error)
GetColumnsFull returns columns for the given table. `tableFName` should include schema and table, for example `schema1.table2`. The returned fields should be `schema_name|table_name|table_type|column_name|data_type|column_id`.
func (*SnowflakeConn) GetDatabases ¶
func (conn *SnowflakeConn) GetDatabases() (data iop.Dataset, err error)
GetDatabases returns the list of databases
func (*SnowflakeConn) GetSchemas ¶
func (conn *SnowflakeConn) GetSchemas() (data iop.Dataset, err error)
GetSchemas returns schemas
func (*SnowflakeConn) GetTables ¶
func (conn *SnowflakeConn) GetTables(schema string) (data iop.Dataset, err error)
GetTables returns tables
func (*SnowflakeConn) GetViews ¶
func (conn *SnowflakeConn) GetViews(schema string) (data iop.Dataset, err error)
GetViews returns views
func (*SnowflakeConn) StageGET ¶ added in v1.2.14
func (conn *SnowflakeConn) StageGET(internalStagePath, folderPath string) (filePaths []string, err error)
StageGET Copies from a staging location to a local file or folder
func (*SnowflakeConn) StagePUT ¶ added in v1.2.14
func (conn *SnowflakeConn) StagePUT(fileURI string, internalStagePath string) (err error)
StagePUT Copies a local file or folder into a staging location
func (*SnowflakeConn) UnloadViaStage ¶
func (conn *SnowflakeConn) UnloadViaStage(tables ...Table) (filePath string, err error)
type StarRocksConn ¶
StarRocksConn is a StarRocks connection
func (*StarRocksConn) AddMissingColumns ¶ added in v1.1.8
func (*StarRocksConn) BulkImportFlow ¶
func (conn *StarRocksConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow inserts a flow of streams into a table.
func (*StarRocksConn) GenerateDDL ¶
func (conn *StarRocksConn) GenerateDDL(table Table, data iop.Dataset, temporary bool) (string, error)
GenerateDDL generates a DDL based on a dataset
func (*StarRocksConn) GetDatabases ¶ added in v1.1.9
func (conn *StarRocksConn) GetDatabases() (data iop.Dataset, err error)
GetDatabases returns the list of databases
func (*StarRocksConn) GetURL ¶
func (conn *StarRocksConn) GetURL(newURL ...string) string
GetURL returns the processed URL
func (*StarRocksConn) InsertBatchStream ¶
func (conn *StarRocksConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*StarRocksConn) OptimizeTable ¶ added in v1.1.8
func (*StarRocksConn) StreamLoad ¶
func (conn *StarRocksConn) StreamLoad(feURL, tableFName string, df *iop.Dataflow) (count uint64, err error)
StreamLoad bulk loads
func (*StarRocksConn) WaitAlterTable ¶ added in v1.1.8
func (conn *StarRocksConn) WaitAlterTable(table Table) (err error)
type StatFieldSQL ¶
type Table ¶
type Table struct { Name string `json:"name"` Schema string `json:"schema"` Database string `json:"database,omitempty"` IsView bool `json:"is_view,omitempty"` // whether is a view SQL string `json:"sql,omitempty"` DDL string `json:"ddl,omitempty"` Dialect dbio.Type `json:"dialect,omitempty"` Columns iop.Columns `json:"columns,omitempty"` Keys TableKeys `json:"keys,omitempty"` Raw string `json:"raw"` // contains filtered or unexported fields }
Table represents a schemata table
type Transaction ¶
type Transaction interface { Connection() Connection Context() *g.Context Commit() (err error) Rollback() (err error) Prepare(query string) (stmt *sql.Stmt, err error) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error) }
type TrinoConn ¶ added in v1.1.14
TrinoConn is a Trino connection
func (*TrinoConn) ConnString ¶ added in v1.1.14
func (*TrinoConn) ExecContext ¶ added in v1.1.14
func (conn *TrinoConn) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecContext runs a sql query with context, returns `error`
func (*TrinoConn) NewTransaction ¶ added in v1.1.14
func (conn *TrinoConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (tx Transaction, err error)
NewTransaction creates a new transaction
type WhereClause ¶
type WhereClause []interface{}
WhereClause is the where clause
func (WhereClause) Args ¶
func (wc WhereClause) Args() []interface{}
Args returns the where clause arguments
func (WhereClause) Clause ¶
func (wc WhereClause) Clause() string
Clause returns the string where clause
Source Files
- analyzer.go
- database.go
- database_bigquery.go
- database_bigtable.go
- database_clickhouse.go
- database_duckdb.go
- database_mongo.go
- database_mysql.go
- database_oracle.go
- database_postgres.go
- database_prometheus.go
- database_proton.go
- database_redshift.go
- database_snowflake.go
- database_sqlite.go
- database_sqlserver.go
- database_starrocks.go
- database_trino.go
- dbx.go
- schemata.go
- transaction.go