Documentation ¶
Index ¶
- Variables
- func CleanSQL(conn Connection, sql string) string
- func CommonColumns(colNames1 []string, colNames2 []string) (commCols []string)
- func CopyFromAzure(conn Connection, tableFName, azPath string) (err error)
- func CopyFromS3(conn Connection, tableFName, s3Path string) (err error)
- func GetSlingEnv() map[string]string
- func InsertBatchStream(conn Connection, tx *Transaction, tableFName string, ds *iop.Datastream) (count uint64, err error)
- func InsertStream(conn Connection, tx *Transaction, tableFName string, ds *iop.Datastream) (count uint64, err error)
- func PK(obj interface{}) (pk []string)
- func ParseSQLMultiStatements(sql string) (sqls g.Strings)
- func SQLColumns(colTypes []*sql.ColumnType, NativeTypeMap map[string]string) (columns iop.Columns)
- func SplitTableFullName(tableName string) (string, string)
- func TestPermissions(conn Connection, tableName string) (err error)
- func UID(obj interface{}) string
- func Upsert(conn Connection, tx *Transaction, sourceTable, targetTable string, ...) (count int64, err error)
- type BaseConn
- func (conn *BaseConn) BaseURL() string
- func (conn *BaseConn) Begin(options ...*sql.TxOptions) (err error)
- func (conn *BaseConn) BeginContext(ctx context.Context, options ...*sql.TxOptions) (err error)
- func (conn *BaseConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
- func (conn *BaseConn) BulkExportFlowCSV(sqls ...string) (df *iop.Dataflow, err error)
- func (conn *BaseConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
- func (conn *BaseConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *BaseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BaseConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) string
- func (conn *BaseConn) CastColumnsForSelect(srcColumns iop.Columns, tgtColumns iop.Columns) []string
- func (conn *BaseConn) Close() error
- func (conn *BaseConn) Commit() (err error)
- func (conn *BaseConn) CompareChecksums(tableName string, columns iop.Columns) (err error)
- func (conn *BaseConn) Connect(timeOut ...int) (err error)
- func (conn *BaseConn) Context() *g.Context
- func (conn *BaseConn) CreateTable(tableName string, cols iop.Columns, tableDDL string) (err error)
- func (conn *BaseConn) CreateTemporaryTable(tableName string, cols iop.Columns) (err error)
- func (conn *BaseConn) Db() *sqlx.DB
- func (conn *BaseConn) DbX() *DbX
- func (conn *BaseConn) DropTable(tableNames ...string) (err error)
- func (conn *BaseConn) DropView(viewNames ...string) (err error)
- func (conn *BaseConn) Exec(sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *BaseConn) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (conn *BaseConn) ExecMulti(sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *BaseConn) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (conn *BaseConn) GenerateDDL(tableFName string, data iop.Dataset, temporary bool) (string, error)
- func (conn *BaseConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
- func (conn *BaseConn) GenerateUpsertExpressions(srcTable string, tgtTable string, pkFields []string) (exprs map[string]string, err error)
- func (conn *BaseConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *BaseConn) GetAnalysis(analysisName string, values map[string]interface{}) (sql string, err error)
- func (conn *BaseConn) GetAnalysisField(analysisName string, tableFName string, fields ...string) (sql string, err error)
- func (conn *BaseConn) GetAnalysisTable(analysisName string, tableFNames ...string) (sql string, err error)
- func (conn *BaseConn) GetColumnStats(tableName string, fields ...string) (columns iop.Columns, err error)
- func (conn *BaseConn) GetColumns(tableFName string, fields ...string) (columns iop.Columns, err error)
- func (conn *BaseConn) GetColumnsFull(tableFName string) (iop.Dataset, error)
- func (conn *BaseConn) GetCount(tableFName string) (uint64, error)
- func (conn *BaseConn) GetDDL(tableFName string) (string, error)
- func (conn *BaseConn) GetGormConn(config *gorm.Config) (*gorm.DB, error)
- func (conn *BaseConn) GetIndexes(tableFName string) (iop.Dataset, error)
- func (conn *BaseConn) GetNativeType(col iop.Column) (nativeType string, err error)
- func (conn *BaseConn) GetObjects(schema string, objectType string) (iop.Dataset, error)
- func (conn *BaseConn) GetPrimaryKeys(tableFName string) (iop.Dataset, error)
- func (conn *BaseConn) GetProp(key string) string
- func (conn *BaseConn) GetSQLColumns(sqls ...string) (columns iop.Columns, err error)
- func (conn *BaseConn) GetSchemaObjects(schemaName string) (Schema, error)
- func (conn *BaseConn) GetSchemas() (iop.Dataset, error)
- func (conn *BaseConn) GetTables(schema string) (iop.Dataset, error)
- func (conn *BaseConn) GetTemplateValue(path string) (value string)
- func (conn *BaseConn) GetType() dbio.Type
- func (conn *BaseConn) GetURL(newURL ...string) string
- func (conn *BaseConn) GetViews(schema string) (iop.Dataset, error)
- func (conn *BaseConn) Import(data iop.Dataset, tableName string) error
- func (conn *BaseConn) Init() (err error)
- func (conn *BaseConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BaseConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BaseConn) Kill() error
- func (conn *BaseConn) LoadTemplates() error
- func (conn *BaseConn) MustExec(sql string, args ...interface{}) (result sql.Result)
- func (conn *BaseConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (*Transaction, error)
- func (conn *BaseConn) OptimizeTable(tableName string, newColStats iop.Columns) (err error)
- func (conn *BaseConn) Prepare(query string) (stmt *sql.Stmt, err error)
- func (conn *BaseConn) PropArr() []string
- func (conn *BaseConn) Props() map[string]string
- func (conn *BaseConn) Query(sql string, limit ...int) (data iop.Dataset, err error)
- func (conn *BaseConn) QueryContext(ctx context.Context, sql string, limit ...int) (iop.Dataset, error)
- func (conn *BaseConn) Quote(field string, normalize ...bool) string
- func (conn *BaseConn) Rollback() (err error)
- func (conn *BaseConn) RunAnalysis(analysisName string, values map[string]interface{}) (data iop.Dataset, err error)
- func (conn *BaseConn) Schemata() Schemata
- func (conn *BaseConn) Self() Connection
- func (conn *BaseConn) SetProp(key string, val string)
- func (conn *BaseConn) SetTx(tx *Transaction)
- func (conn *BaseConn) StreamRecords(sql string) (<-chan map[string]interface{}, error)
- func (conn *BaseConn) StreamRows(sql string, limit ...int) (ds *iop.Datastream, err error)
- func (conn *BaseConn) StreamRowsContext(ctx context.Context, sql string, limit ...int) (ds *iop.Datastream, err error)
- func (conn *BaseConn) SwapTable(srcTable string, tgtTable string) (err error)
- func (conn *BaseConn) TableExists(tableFName string) (exists bool, err error)
- func (conn *BaseConn) Template() Template
- func (conn *BaseConn) Tx() *Transaction
- func (conn *BaseConn) Unquote(field string) string
- func (conn *BaseConn) Upsert(srcTable string, tgtTable string, primKeys []string) (rowAffCnt int64, err error)
- func (conn *BaseConn) ValidateColumnNames(tgtColNames []string, colNames []string, quote bool) (newColNames []string, err error)
- type BigQueryConn
- func (conn *BigQueryConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
- func (conn *BigQueryConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *BigQueryConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BigQueryConn) Close() error
- func (conn *BigQueryConn) Connect(timeOut ...int) error
- func (conn *BigQueryConn) CopyFromGCS(gcsURI string, tableFName string, dsColumns []iop.Column) error
- func (conn *BigQueryConn) CopyToGCS(tableFName string, gcsURI string) error
- func (conn *BigQueryConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
- func (conn *BigQueryConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *BigQueryConn) Init() error
- func (conn *BigQueryConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BigQueryConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *BigQueryConn) StreamRowsContext(ctx context.Context, sql string, limit ...int) (ds *iop.Datastream, err error)
- func (conn *BigQueryConn) Unload(sqls ...string) (gsPath string, err error)
- type Connection
- type DbX
- func (x *DbX) Delete(o interface{}) (cnt int, err error)
- func (x *DbX) Get(o interface{}, fields ...string) (err error)
- func (x *DbX) Insert(o interface{}, fields ...string) (err error)
- func (x *DbX) Select(o interface{}, fields ...string) (err error)
- func (x *DbX) TableName(o interface{}) (name string)
- func (x *DbX) Update(o interface{}, fields ...string) (cnt int, err error)
- func (x *DbX) Upsert(o interface{}, fields ...string) (cnt int, err error)
- func (x *DbX) Where(where ...interface{}) *DbX
- type ModelDbX
- func (m *ModelDbX) Bind(bindFunc func(p interface{}) error, objPtr interface{}) (err error)
- func (m *ModelDbX) Delete(db *sqlx.DB) (err error)
- func (m *ModelDbX) Fields() (fields []string)
- func (m *ModelDbX) Get(db *sqlx.DB, fields ...string) (err error)
- func (m *ModelDbX) Insert(db *sqlx.DB, fields ...string) (err error)
- func (m *ModelDbX) Rec() map[string]interface{}
- func (m *ModelDbX) Select(db *sqlx.DB, objPtr interface{}, fields ...string) (err error)
- func (m *ModelDbX) TableName(objPtr interface{}) string
- func (m *ModelDbX) Update(db *sqlx.DB, fields ...string) (err error)
- func (m *ModelDbX) Values(fields []string) (values []interface{}, err error)
- func (m *ModelDbX) Where(where ...interface{}) *ModelDbX
- type MsSQLServerConn
- func (conn *MsSQLServerConn) BcpExport() (err error)
- func (conn *MsSQLServerConn) BcpImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MsSQLServerConn) BcpImportStreamParrallel(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MsSQLServerConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *MsSQLServerConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MsSQLServerConn) CopyFromAzure(tableFName, azPath string) (count uint64, err error)
- func (conn *MsSQLServerConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *MsSQLServerConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *MsSQLServerConn) GetURL(newURL ...string) string
- func (conn *MsSQLServerConn) Init() error
- type MySQLConn
- func (conn *MySQLConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
- func (conn *MySQLConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MySQLConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *MySQLConn) GetURL(newURL ...string) string
- func (conn *MySQLConn) Init() error
- func (conn *MySQLConn) LoadDataInFile(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *MySQLConn) LoadDataOutFile(sql string) (stdOutReader io.Reader, err error)
- type OracleConn
- func (conn *OracleConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *OracleConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
- func (conn *OracleConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *OracleConn) Init() error
- func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count uint64, err error)
- type Pool
- type PostgresConn
- func (conn *PostgresConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
- func (conn *PostgresConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *PostgresConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
- func (conn *PostgresConn) CopyToStdout(sql string) (stdOutReader io.Reader, err error)
- func (conn *PostgresConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *PostgresConn) Init() error
- type RedshiftConn
- func (conn *RedshiftConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
- func (conn *RedshiftConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
- func (conn *RedshiftConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *RedshiftConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *RedshiftConn) CopyFromS3(tableFName, s3Path string) (count uint64, err error)
- func (conn *RedshiftConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *RedshiftConn) Init() error
- func (conn *RedshiftConn) Unload(sqls ...string) (s3Path string, err error)
- type Result
- type SQLiteConn
- type Schema
- type Schemata
- type SnowflakeConn
- func (conn *SnowflakeConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
- func (conn *SnowflakeConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (conn *SnowflakeConn) Connect(timeOut ...int) error
- func (conn *SnowflakeConn) CopyFromAzure(tableFName, azPath string) (err error)
- func (conn *SnowflakeConn) CopyFromS3(tableFName, s3Path string) (err error)
- func (conn *SnowflakeConn) CopyToAzure(sqls ...string) (azPath string, err error)
- func (conn *SnowflakeConn) CopyToS3(sqls ...string) (s3Path string, err error)
- func (conn *SnowflakeConn) CopyViaAWS(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (count uint64, err error)
- func (conn *SnowflakeConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
- func (conn *SnowflakeConn) Init() error
- func (conn *SnowflakeConn) PutFile(fPath string, internalStagePath string) (err error)
- type Table
- type Template
- type Transaction
- func (t *Transaction) Commit() (err error)
- func (t *Transaction) DisableTrigger(tableName, triggerName string) (err error)
- func (t *Transaction) EnableTrigger(tableName, triggerName string) (err error)
- func (t *Transaction) Exec(sql string, args ...interface{}) (result sql.Result, err error)
- func (t *Transaction) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *Transaction) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
- func (t *Transaction) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (t *Transaction) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
- func (t *Transaction) Prepare(query string) (stmt *sql.Stmt, err error)
- func (t *Transaction) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)
- func (t *Transaction) Rollback() (err error)
- func (t *Transaction) Upsert(sourceTable, targetTable string, pkFields []string) (count uint64, err error)
- func (t *Transaction) UpsertStream(tableFName string, ds *iop.Datastream, pk []string) (count uint64, err error)
- type User
- type WhereClause
Constants ¶
This section is empty.
Variables ¶
var ( // UseBulkExportFlowCSV to use BulkExportFlowCSV UseBulkExportFlowCSV = false SampleSize = 900 )
var Debug = false
Debug prints queries when true
Functions ¶
func CleanSQL ¶
func CleanSQL(conn Connection, sql string) string
CleanSQL removes creds from the query
func CommonColumns ¶
CommonColumns return common columns
func CopyFromAzure ¶
func CopyFromAzure(conn Connection, tableFName, azPath string) (err error)
CopyFromAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func CopyFromS3 ¶
func CopyFromS3(conn Connection, tableFName, s3Path string) (err error)
func InsertBatchStream ¶ added in v0.0.5
func InsertBatchStream(conn Connection, tx *Transaction, tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func InsertStream ¶ added in v0.0.5
func InsertStream(conn Connection, tx *Transaction, tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertStream inserts a stream
func PK ¶ added in v0.0.5
func PK(obj interface{}) (pk []string)
PK returns the primary keys of a model
func ParseSQLMultiStatements ¶ added in v0.0.5
ParseSQLMultiStatements splits a sql text into statements typically by a ';'
func SQLColumns ¶
SQLColumns returns the columns from database ColumnType
func SplitTableFullName ¶
SplitTableFullName retrusn the schema / table name
func TestPermissions ¶
func TestPermissions(conn Connection, tableName string) (err error)
TestPermissions tests the needed permissions in a given connection
func Upsert ¶ added in v0.0.5
func Upsert(conn Connection, tx *Transaction, sourceTable, targetTable string, pkFields []string) (count int64, err error)
Upsert upserts from source table into target table
Types ¶
type BaseConn ¶
type BaseConn struct { Connection URL string Type dbio.Type // the type of database for sqlx: postgres, mysql, sqlite Data iop.Dataset // contains filtered or unexported fields }
BaseConn is a database connection
func (*BaseConn) BeginContext ¶ added in v0.0.5
BeginContext starts a connection wide transaction
func (*BaseConn) BulkExportFlow ¶
BulkExportFlow creates a dataflow from a sql query
func (*BaseConn) BulkExportFlowCSV ¶
BulkExportFlowCSV creates a dataflow from a sql query, using CSVs
func (*BaseConn) BulkExportStream ¶
func (conn *BaseConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
BulkExportStream streams the rows in bulk
func (*BaseConn) BulkImportFlow ¶
BulkImportFlow imports the streams rows in bulk concurrently using channels
func (*BaseConn) BulkImportStream ¶
func (conn *BaseConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream import the stream rows in bulk
func (*BaseConn) CastColumnForSelect ¶
CastColumnForSelect casts to the correct target column type
func (*BaseConn) CastColumnsForSelect ¶
CastColumnsForSelect cast the source columns into the target Column types
func (*BaseConn) CompareChecksums ¶
CompareChecksums compares the checksum values from the database side to the checkum values from the StreamProcessor
func (*BaseConn) CreateTable ¶ added in v0.0.5
CreateTable creates a new table based on provided columns `tableName` should have 'schema.table' format
func (*BaseConn) CreateTemporaryTable ¶ added in v0.0.5
CreateTemporaryTable creates a temp table based on provided columns
func (*BaseConn) ExecContext ¶
func (conn *BaseConn) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecContext runs a sql query with context, returns `error`
func (*BaseConn) ExecMultiContext ¶ added in v0.0.5
func (conn *BaseConn) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecMultiContext runs multiple sql queries with context, returns `error`
func (*BaseConn) GenerateDDL ¶
func (conn *BaseConn) GenerateDDL(tableFName string, data iop.Dataset, temporary bool) (string, error)
GenerateDDL genrate a DDL based on a dataset
func (*BaseConn) GenerateInsertStatement ¶
func (conn *BaseConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
GenerateInsertStatement returns the proper INSERT statement
func (*BaseConn) GenerateUpsertExpressions ¶
func (conn *BaseConn) GenerateUpsertExpressions(srcTable string, tgtTable string, pkFields []string) (exprs map[string]string, err error)
GenerateUpsertExpressions returns a map with needed expressions
func (*BaseConn) GenerateUpsertSQL ¶ added in v0.0.5
func (conn *BaseConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL returns a sql for upsert
func (*BaseConn) GetAnalysis ¶ added in v0.0.5
func (conn *BaseConn) GetAnalysis(analysisName string, values map[string]interface{}) (sql string, err error)
GetAnalysis runs an analysis
func (*BaseConn) GetAnalysisField ¶ added in v0.0.5
func (conn *BaseConn) GetAnalysisField(analysisName string, tableFName string, fields ...string) (sql string, err error)
GetAnalysisField runs a field level analysis
func (*BaseConn) GetAnalysisTable ¶ added in v0.0.5
func (conn *BaseConn) GetAnalysisTable(analysisName string, tableFNames ...string) (sql string, err error)
GetAnalysisTable runs a table level analysis
func (*BaseConn) GetColumnStats ¶
func (conn *BaseConn) GetColumnStats(tableName string, fields ...string) (columns iop.Columns, err error)
GetColumnStats analyzes the table and returns the column statistics
func (*BaseConn) GetColumns ¶
func (conn *BaseConn) GetColumns(tableFName string, fields ...string) (columns iop.Columns, err error)
GetColumns returns columns for given table. `tableFName` should include schema and table, example: `schema1.table2` fields should be `column_name|data_type`
func (*BaseConn) GetColumnsFull ¶
GetColumnsFull returns columns for given table. `tableName` should include schema and table, example: `schema1.table2` fields should be `schema_name|table_name|table_type|column_name|data_type|column_id`
func (*BaseConn) GetGormConn ¶
GetGormConn returns the gorm db connection
func (*BaseConn) GetIndexes ¶
GetIndexes returns indexes for given table.
func (*BaseConn) GetNativeType ¶ added in v0.0.5
GetNativeType returns the native column type from generic
func (*BaseConn) GetObjects ¶
GetObjects returns objects (tables or views) for given schema `objectType` can be either 'table', 'view' or 'all'
func (*BaseConn) GetPrimaryKeys ¶
GetPrimaryKeys returns primark keys for given table.
func (*BaseConn) GetSQLColumns ¶
GetSQLColumns return columns from a sql query result
func (*BaseConn) GetSchemaObjects ¶
GetSchemaObjects obtain full schemata info
func (*BaseConn) GetSchemas ¶
GetSchemas returns schemas
func (*BaseConn) GetTemplateValue ¶
GetTemplateValue returns the value of the path
func (*BaseConn) InsertBatchStream ¶
func (conn *BaseConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*BaseConn) InsertStream ¶
InsertStream inserts a stream into a table
func (*BaseConn) LoadTemplates ¶
LoadTemplates loads the appropriate yaml template
func (*BaseConn) MustExec ¶
MustExec execs the query using e and panics if there was an error. Any placeholder parameters are replaced with supplied args.
func (*BaseConn) NewTransaction ¶ added in v0.0.5
func (conn *BaseConn) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (*Transaction, error)
NewTransaction creates a new transaction
func (*BaseConn) OptimizeTable ¶
OptimizeTable analyzes the table and alters the table with the columns data type based on its analysis result if table is missing, it is created with a new DDl Hole in this: will truncate data points, since it is based only on new data being inserted... would need a complete stats of the target table to properly optimize.
func (*BaseConn) QueryContext ¶
func (conn *BaseConn) QueryContext(ctx context.Context, sql string, limit ...int) (iop.Dataset, error)
QueryContext runs a sql query with ctx, returns `result`, `error`
func (*BaseConn) RunAnalysis ¶
func (conn *BaseConn) RunAnalysis(analysisName string, values map[string]interface{}) (data iop.Dataset, err error)
RunAnalysis runs an analysis
func (*BaseConn) Self ¶
func (conn *BaseConn) Self() Connection
Self returns the respective connection Instance This is useful to refer back to a subclass method from the superclass level. (Aka overloading)
func (*BaseConn) SetTx ¶ added in v0.0.5
func (conn *BaseConn) SetTx(tx *Transaction)
SetTx sets the transaction
func (*BaseConn) StreamRecords ¶
StreamRecords the records of a sql query, returns `result`, `error`
func (*BaseConn) StreamRows ¶
StreamRows the rows of a sql query, returns `result`, `error`
func (*BaseConn) StreamRowsContext ¶
func (conn *BaseConn) StreamRowsContext(ctx context.Context, sql string, limit ...int) (ds *iop.Datastream, err error)
StreamRowsContext streams the rows of a sql query with context, returns `result`, `error`
func (*BaseConn) TableExists ¶
TableExists returns true if the table exists
func (*BaseConn) Upsert ¶
func (conn *BaseConn) Upsert(srcTable string, tgtTable string, primKeys []string) (rowAffCnt int64, err error)
Upsert inserts / updates from a srcTable into a target table. Assuming the srcTable has some or all of the tgtTable fields with matching types
func (*BaseConn) ValidateColumnNames ¶
func (conn *BaseConn) ValidateColumnNames(tgtColNames []string, colNames []string, quote bool) (newColNames []string, err error)
ValidateColumnNames verifies that source fields are present in the target table It will return quoted field names as `newColNames`, the same length as `colNames`
type BigQueryConn ¶
type BigQueryConn struct { BaseConn URL string Client *bigquery.Client ProjectID string DatasetID string Location string Datasets []string }
BigQueryConn is a Google Big Query connection
func (*BigQueryConn) BulkExportFlow ¶
func (conn *BigQueryConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
BulkExportFlow reads in bulk
func (*BigQueryConn) BulkImportFlow ¶
func (conn *BigQueryConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow inserts a flow of streams into a table. For redshift we need to create CSVs in S3 and then use the COPY command.
func (*BigQueryConn) BulkImportStream ¶
func (conn *BigQueryConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream demonstrates loading data into a BigQuery table using a file on the local filesystem.
func (*BigQueryConn) Connect ¶
func (conn *BigQueryConn) Connect(timeOut ...int) error
Connect connects to the database
func (*BigQueryConn) CopyFromGCS ¶
func (conn *BigQueryConn) CopyFromGCS(gcsURI string, tableFName string, dsColumns []iop.Column) error
CopyFromGCS into bigquery from google storage
func (*BigQueryConn) CopyToGCS ¶
func (conn *BigQueryConn) CopyToGCS(tableFName string, gcsURI string) error
CopyToGCS Copy table to gc storage
func (*BigQueryConn) ExecContext ¶
func (conn *BigQueryConn) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error)
ExecContext runs a sql query with context, returns `error`
func (*BigQueryConn) GenerateUpsertSQL ¶ added in v0.0.5
func (conn *BigQueryConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*BigQueryConn) InsertBatchStream ¶
func (conn *BigQueryConn) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*BigQueryConn) InsertStream ¶
func (conn *BigQueryConn) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertStream demonstrates loading data into a BigQuery table using a file on the local filesystem.
func (*BigQueryConn) StreamRowsContext ¶
func (conn *BigQueryConn) StreamRowsContext(ctx context.Context, sql string, limit ...int) (ds *iop.Datastream, err error)
StreamRowsContext streams the rows of a sql query with context, returns `result`, `error`
type Connection ¶
type Connection interface { BaseURL() string Begin(options ...*sql.TxOptions) error BeginContext(ctx context.Context, options ...*sql.TxOptions) error BulkExportFlow(sqls ...string) (*iop.Dataflow, error) BulkExportStream(sql string) (*iop.Datastream, error) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error) CastColumnForSelect(srcColumn iop.Column, tgtColumn iop.Column) string CastColumnsForSelect(srcColumns iop.Columns, tgtColumns iop.Columns) []string Close() error Commit() error CompareChecksums(tableName string, columns iop.Columns) (err error) Connect(timeOut ...int) error Context() *g.Context CreateTemporaryTable(tableName string, cols iop.Columns) (err error) CreateTable(tableName string, cols iop.Columns, tableDDL string) (err error) Db() *sqlx.DB DbX() *DbX DropTable(...string) error DropView(...string) error ExecMulti(sql string, args ...interface{}) (result sql.Result, err error) ExecMultiContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error) Exec(sql string, args ...interface{}) (result sql.Result, err error) ExecContext(ctx context.Context, sql string, args ...interface{}) (result sql.Result, err error) GenerateDDL(tableFName string, data iop.Dataset, temporary bool) (string, error) GenerateInsertStatement(tableName string, fields []string, numRows int) string GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error) GetColumns(tableFName string, fields ...string) (iop.Columns, error) GetColumnsFull(string) (iop.Dataset, error) GetColumnStats(tableName string, fields ...string) (columns iop.Columns, err error) GetCount(string) (uint64, error) GetDDL(string) (string, error) GetGormConn(config *gorm.Config) (*gorm.DB, error) GetIndexes(string) (iop.Dataset, error) GetNativeType(col iop.Column) (nativeType string, err error) GetPrimaryKeys(string) (iop.Dataset, error) GetProp(string) string GetSchemaObjects(string) (Schema, error) GetSchemas() (iop.Dataset, error) GetSQLColumns(sqls ...string) (columns iop.Columns, err error) GetTables(string) (iop.Dataset, error) GetTemplateValue(path string) (value string) GetType() dbio.Type GetURL(newURL ...string) string GetViews(string) (iop.Dataset, error) Init() error InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error) Kill() error LoadTemplates() error MustExec(sql string, args ...interface{}) (result sql.Result) NewTransaction(ctx context.Context, options ...*sql.TxOptions) (*Transaction, error) OptimizeTable(tableName string, columns iop.Columns) (err error) Prepare(query string) (stmt *sql.Stmt, err error) Props() map[string]string PropsArr() []string Query(sql string, limit ...int) (iop.Dataset, error) QueryContext(ctx context.Context, sql string, limit ...int) (iop.Dataset, error) Quote(field string, normalize ...bool) string RenameTable(table string, newTable string) (err error) Rollback() error RunAnalysis(string, map[string]interface{}) (iop.Dataset, error) GetAnalysis(string, map[string]interface{}) (string, error) GetAnalysisField(string, string, ...string) (string, error) GetAnalysisTable(string, ...string) (string, error) Schemata() Schemata Self() Connection SetProp(string, string) SetTx(*Transaction) StreamRecords(sql string) (<-chan map[string]interface{}, error) StreamRows(sql string, limit ...int) (*iop.Datastream, error) StreamRowsContext(ctx context.Context, sql string, limit ...int) (ds *iop.Datastream, err error) SwapTable(srcTable string, tgtTable string) (err error) TableExists(tableFName string) (exists bool, err error) Template() Template Tx() *Transaction Unquote(string) string Upsert(srcTable string, tgtTable string, pkFields []string) (rowAffCnt int64, err error) ValidateColumnNames(tgtColName []string, colNames []string, quote bool) (newColNames []string, err error) // contains filtered or unexported methods }
Connection is the Base interface for Connections
func NewConn ¶
func NewConn(URL string, props ...string) (Connection, error)
NewConn return the most proper connection for a given database
func NewConnContext ¶
NewConnContext return the most proper connection for a given database with context props are provided as `"Prop1=Value1", "Prop2=Value2", ...`
type DbX ¶ added in v0.0.5
type DbX struct {
// contains filtered or unexported fields
}
DbX is db express
type ModelDbX ¶ added in v0.0.5
type ModelDbX struct { Ptr interface{} `json:"-"` RowsAffected int `json:"-"` // contains filtered or unexported fields }
ModelDbX is the base for any SQL model
func (*ModelDbX) TableName ¶ added in v0.0.5
TableName returns the table name of the underlying pointer
type MsSQLServerConn ¶
MsSQLServerConn is a Microsoft SQL Server connection
func (*MsSQLServerConn) BcpExport ¶
func (conn *MsSQLServerConn) BcpExport() (err error)
BcpExport exports data to datastream
func (*MsSQLServerConn) BcpImportStream ¶
func (conn *MsSQLServerConn) BcpImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BcpImportStream Import using bcp tool https://docs.microsoft.com/en-us/sql/tools/bcp-utility?view=sql-server-ver15 bcp dbo.test1 in '/tmp/LargeDataset.csv' -S tcp:sqlserver.host,51433 -d master -U sa -P 'password' -c -t ',' -b 5000 Limitation: if comma or delimite is in field, it will error. need to use delimiter not in field, or do some other transformation
func (*MsSQLServerConn) BcpImportStreamParrallel ¶
func (conn *MsSQLServerConn) BcpImportStreamParrallel(tableFName string, ds *iop.Datastream) (count uint64, err error)
BcpImportStreamParrallel uses goroutine to import partitioned files
func (*MsSQLServerConn) BulkImportFlow ¶
func (conn *MsSQLServerConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow bulk import flow
func (*MsSQLServerConn) BulkImportStream ¶
func (conn *MsSQLServerConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*MsSQLServerConn) CopyFromAzure ¶
func (conn *MsSQLServerConn) CopyFromAzure(tableFName, azPath string) (count uint64, err error)
CopyFromAzure uses the COPY INTO Table command from Azure https://docs.microsoft.com/en-us/sql/t-sql/statements/copy-into-transact-sql?view=azure-sqldw-latest
func (*MsSQLServerConn) CopyViaAzure ¶
func (conn *MsSQLServerConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaAzure uses the Azure DWH COPY INTO Table command
func (*MsSQLServerConn) GenerateUpsertSQL ¶ added in v0.0.5
func (conn *MsSQLServerConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*MsSQLServerConn) GetURL ¶
func (conn *MsSQLServerConn) GetURL(newURL ...string) string
GetURL returns the processed URL
type MySQLConn ¶
MySQLConn is a Postgres connection
func (*MySQLConn) BulkExportStream ¶
func (conn *MySQLConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
BulkExportStream bulk Export
func (*MySQLConn) BulkImportStream ¶
func (conn *MySQLConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*MySQLConn) GenerateUpsertSQL ¶ added in v0.0.5
func (conn *MySQLConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
UPSERT https://vladmihalcea.com/how-do-upsert-and-merge-work-in-oracle-sql-server-postgresql-and-mysql/ GenerateUpsertSQL generates the upsert SQL
func (*MySQLConn) LoadDataInFile ¶
func (conn *MySQLConn) LoadDataInFile(tableFName string, ds *iop.Datastream) (count uint64, err error)
LoadDataInFile Bulk Import
func (*MySQLConn) LoadDataOutFile ¶
LoadDataOutFile Bulk Export Possible error: ERROR 1227 (42000) at line 1: Access denied; you need (at least one of) the FILE privilege(s) for this operation File privilege needs to be granted to user also the --secure-file-priv option needs to be set properly for it to work. https://stackoverflow.com/questions/9819271/why-is-mysql-innodb-insert-so-slow to improve innodb insert speed
type OracleConn ¶
OracleConn is a Postgres connection
func (*OracleConn) BulkImportStream ¶
func (conn *OracleConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*OracleConn) GenerateInsertStatement ¶
func (conn *OracleConn) GenerateInsertStatement(tableName string, fields []string, numRows int) string
GenerateInsertStatement returns the proper INSERT statement
func (*OracleConn) GenerateUpsertSQL ¶ added in v0.0.5
func (conn *OracleConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
func (*OracleConn) SQLLoad ¶
func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count uint64, err error)
SQLLoad uses sqlldr to Bulk Import cat test1.csv | sqlldr system/oracle@oracle.host:1521/xe control=sqlldr.ctl log=/dev/stdout bad=/dev/stderr cannot import when newline in value. Need to scan for new lines.
type PostgresConn ¶
PostgresConn is a Postgres connection
func (*PostgresConn) BulkExportStream ¶
func (conn *PostgresConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
BulkExportStream uses the bulk dumping (COPY)
func (*PostgresConn) BulkImportStream ¶
func (conn *PostgresConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table
func (*PostgresConn) CastColumnForSelect ¶
func (conn *PostgresConn) CastColumnForSelect(srcCol iop.Column, tgtCol iop.Column) (selectStr string)
CastColumnForSelect casts to the correct target column type
func (*PostgresConn) CopyToStdout ¶
func (conn *PostgresConn) CopyToStdout(sql string) (stdOutReader io.Reader, err error)
CopyToStdout Copy TO STDOUT
func (*PostgresConn) GenerateUpsertSQL ¶ added in v0.0.5
func (conn *PostgresConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
type RedshiftConn ¶
RedshiftConn is a Redshift connection
func (*RedshiftConn) BulkExportFlow ¶
func (conn *RedshiftConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
BulkExportFlow reads in bulk
func (*RedshiftConn) BulkExportStream ¶
func (conn *RedshiftConn) BulkExportStream(sql string) (ds *iop.Datastream, err error)
BulkExportStream reads in bulk
func (*RedshiftConn) BulkImportFlow ¶
func (conn *RedshiftConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow inserts a flow of streams into a table. For redshift we need to create CSVs in S3 and then use the COPY command.
func (*RedshiftConn) BulkImportStream ¶
func (conn *RedshiftConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream inserts a stream into a table. For redshift we need to create CSVs in S3 and then use the COPY command.
func (*RedshiftConn) CopyFromS3 ¶
func (conn *RedshiftConn) CopyFromS3(tableFName, s3Path string) (count uint64, err error)
CopyFromS3 uses the COPY INTO Table command from AWS S3
func (*RedshiftConn) GenerateUpsertSQL ¶ added in v0.0.5
func (conn *RedshiftConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
type Result ¶ added in v0.0.5
type Result struct {
// contains filtered or unexported fields
}
func (Result) LastInsertId ¶ added in v0.0.5
func (Result) RowsAffected ¶ added in v0.0.5
type SQLiteConn ¶
SQLiteConn is a Google Big Query connection
func (*SQLiteConn) GenerateUpsertSQL ¶ added in v0.0.5
func (conn *SQLiteConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
type Schemata ¶
type Schemata struct { Schemas map[string]Schema Tables map[string]*Table // all tables with full name lower case (schema.table) }
Schemata contains the full schema for a connection
type SnowflakeConn ¶
SnowflakeConn is a Snowflake connection
func (*SnowflakeConn) BulkExportFlow ¶
func (conn *SnowflakeConn) BulkExportFlow(sqls ...string) (df *iop.Dataflow, err error)
BulkExportFlow reads in bulk
func (*SnowflakeConn) BulkImportFlow ¶
func (conn *SnowflakeConn) BulkImportFlow(tableFName string, df *iop.Dataflow) (count uint64, err error)
BulkImportFlow bulk import flow
func (*SnowflakeConn) BulkImportStream ¶
func (conn *SnowflakeConn) BulkImportStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
BulkImportStream bulk import stream
func (*SnowflakeConn) Connect ¶
func (conn *SnowflakeConn) Connect(timeOut ...int) error
Connect connects to the database
func (*SnowflakeConn) CopyFromAzure ¶
func (conn *SnowflakeConn) CopyFromAzure(tableFName, azPath string) (err error)
CopyFromAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) CopyFromS3 ¶
func (conn *SnowflakeConn) CopyFromS3(tableFName, s3Path string) (err error)
CopyFromS3 uses the Snowflake COPY INTO Table command from AWS S3 https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) CopyToAzure ¶
func (conn *SnowflakeConn) CopyToAzure(sqls ...string) (azPath string, err error)
CopyToAzure exports a query to an Azure location
func (*SnowflakeConn) CopyToS3 ¶
func (conn *SnowflakeConn) CopyToS3(sqls ...string) (s3Path string, err error)
CopyToS3 exports a query to an S3 location
func (*SnowflakeConn) CopyViaAWS ¶
func (conn *SnowflakeConn) CopyViaAWS(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaAWS uses the Snowflake COPY INTO Table command from AWS S3 https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) CopyViaAzure ¶
func (conn *SnowflakeConn) CopyViaAzure(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaAzure uses the Snowflake COPY INTO Table command from Azure https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) CopyViaStage ¶
func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (count uint64, err error)
CopyViaStage uses the Snowflake COPY INTO Table command https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html
func (*SnowflakeConn) GenerateUpsertSQL ¶ added in v0.0.5
func (conn *SnowflakeConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFields []string) (sql string, err error)
GenerateUpsertSQL generates the upsert SQL
type Table ¶
type Table struct { Name string `json:"name"` FullName string `json:"full_name"` IsView bool `json:"is_view"` // whether is a view Columns iop.Columns ColumnsMap map[string]*iop.Column }
Table represents a schemata table
type Template ¶
type Template struct { Core map[string]string Metadata map[string]string Analysis map[string]string Function map[string]string `yaml:"function"` GeneralTypeMap map[string]string `yaml:"general_type_map"` NativeTypeMap map[string]string `yaml:"native_type_map"` NativeStatsMap map[string]bool `yaml:"native_stat_map"` Variable map[string]string }
Template is a database YAML template
type Transaction ¶ added in v0.0.5
type Transaction struct { Tx *sqlx.Tx Context *g.Context Conn Connection // contains filtered or unexported fields }
Transaction is a database transaction
func (*Transaction) Commit ¶ added in v0.0.5
func (t *Transaction) Commit() (err error)
Commit commits connection wide transaction
func (*Transaction) DisableTrigger ¶ added in v0.0.5
func (t *Transaction) DisableTrigger(tableName, triggerName string) (err error)
DisableTrigger disables a trigger
func (*Transaction) EnableTrigger ¶ added in v0.0.5
func (t *Transaction) EnableTrigger(tableName, triggerName string) (err error)
EnableTrigger enables a trigger
func (*Transaction) Exec ¶ added in v0.0.5
func (t *Transaction) Exec(sql string, args ...interface{}) (result sql.Result, err error)
Exec runs a sql query, returns `error`
func (*Transaction) ExecContext ¶ added in v0.0.5
func (t *Transaction) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecContext runs a sql query with context, returns `error`
func (*Transaction) ExecMultiContext ¶ added in v0.0.5
func (t *Transaction) ExecMultiContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error)
ExecMultiContext runs multiple sql queries with context, returns `error`
func (*Transaction) InsertBatchStream ¶ added in v0.0.5
func (t *Transaction) InsertBatchStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertBatchStream inserts a stream into a table in batch
func (*Transaction) InsertStream ¶ added in v0.0.5
func (t *Transaction) InsertStream(tableFName string, ds *iop.Datastream) (count uint64, err error)
InsertStream inserts a stream into a table
func (*Transaction) Prepare ¶ added in v0.0.5
func (t *Transaction) Prepare(query string) (stmt *sql.Stmt, err error)
Prepare prepares the statement
func (*Transaction) QueryContext ¶ added in v0.0.5
func (t *Transaction) QueryContext(ctx context.Context, q string, args ...interface{}) (result *sqlx.Rows, err error)
QueryContext queries rows
func (*Transaction) Rollback ¶ added in v0.0.5
func (t *Transaction) Rollback() (err error)
Rollback rolls back connection wide transaction
func (*Transaction) Upsert ¶ added in v0.0.5
func (t *Transaction) Upsert(sourceTable, targetTable string, pkFields []string) (count uint64, err error)
Upsert does an upsert from source table into target table
func (*Transaction) UpsertStream ¶ added in v0.0.5
func (t *Transaction) UpsertStream(tableFName string, ds *iop.Datastream, pk []string) (count uint64, err error)
UpsertStream inserts a stream into a table in batch
type WhereClause ¶ added in v0.0.5
type WhereClause []interface{}
WhereClause is the where clause
func (WhereClause) Args ¶ added in v0.0.5
func (wc WhereClause) Args() []interface{}
Args returns the where clause arguments
func (WhereClause) Clause ¶ added in v0.0.5
func (wc WhereClause) Clause() string
Clause returns the string where clause