Documentation ¶
Index ¶
- Constants
- Variables
- func BuildConstraintName(tableName string) string
- func GranularityToPartitionIds(g Granularity, t time.Time) []string
- func NewBigquery(bulkerConfig bulker.Config) (bulker.Bulker, error)
- func NewClickHouse(bulkerConfig bulker.Config) (bulker.Bulker, error)
- func NewMySQL(bulkerConfig bulker.Config) (bulker.Bulker, error)
- func NewPostgres(bulkerConfig bulker.Config) (bulker.Bulker, error)
- func NewRedshift(bulkerConfig bulker.Config) (bulker.Bulker, error)
- func NewSnowflake(bulkerConfig bulker.Config) (bulker.Bulker, error)
- func ProcessSSL(dir string, dsc *PostgresConfig) error
- func WithColumnType(columnName, sqlType string) bulker.StreamOption
- func WithColumnTypeDDL(columnName, sqlType, ddlType string) bulker.StreamOption
- func WithColumnTypes(fields types.SQLTypes) bulker.StreamOption
- type AbstractSQLStream
- type AbstractTransactionalSQLStream
- type AutoCommitStream
- func (ps *AutoCommitStream) Abort(ctx context.Context) (state bulker.State, err error)
- func (ps *AutoCommitStream) Complete(ctx context.Context) (state bulker.State, err error)
- func (ps *AutoCommitStream) Consume(ctx context.Context, object types.Object) (state bulker.State, processedObjects []types.Object, err error)
- type BQItem
- type BigQuery
- func (bq *BigQuery) Close() error
- func (bq *BigQuery) ColumnName(identifier string) string
- func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, merge bool) (err error)
- func (bq *BigQuery) Count(ctx context.Context, tableName string, whenConditions *WhenConditions) (int, error)
- func (bq *BigQuery) CreateStream(id, tableName string, mode bulker.BulkMode, ...) (bulker.BulkerStream, error)
- func (bq *BigQuery) CreateTable(ctx context.Context, table *Table) (err error)
- func (bq *BigQuery) Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) (err error)
- func (bq *BigQuery) DeletePartition(ctx context.Context, tableName string, datePartiton *DatePartition) error
- func (bq *BigQuery) Drop(ctx context.Context, table *Table, ifExists bool) error
- func (bq *BigQuery) DropTable(ctx context.Context, tableName string, ifExists bool) error
- func (bq *BigQuery) GetBatchFileCompression() types.FileCompression
- func (bq *BigQuery) GetBatchFileFormat() types.FileFormat
- func (bq *BigQuery) GetDataType(sqlType string) (types.DataType, bool)
- func (bq *BigQuery) GetSQLType(dataType types.DataType) (string, bool)
- func (bq *BigQuery) GetTableSchema(ctx context.Context, tableName string) (*Table, error)
- func (bq *BigQuery) InitDatabase(ctx context.Context) error
- func (bq *BigQuery) Insert(ctx context.Context, table *Table, merge bool, objects ...types.Object) (err error)
- func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (err error)
- func (bq *BigQuery) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
- func (bq *BigQuery) PatchTableSchema(ctx context.Context, patchSchema *Table) error
- func (bq *BigQuery) Ping(ctx context.Context) error
- func (bq *BigQuery) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, ...) (err error)
- func (bq *BigQuery) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, ...) ([]map[string]any, error)
- func (bq *BigQuery) TableHelper() *TableHelper
- func (bq *BigQuery) TableName(identifier string) string
- func (bq *BigQuery) TruncateTable(ctx context.Context, tableName string) error
- func (bq *BigQuery) Type() string
- func (bq *BigQuery) Update(ctx context.Context, tableName string, object types.Object, ...) (err error)
- type ClickHouse
- func (ch *ClickHouse) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, merge bool) (err error)
- func (ch *ClickHouse) Count(ctx context.Context, tableName string, whenConditions *WhenConditions) (int, error)
- func (ch *ClickHouse) CreateStream(id, tableName string, mode bulker.BulkMode, ...) (bulker.BulkerStream, error)
- func (ch *ClickHouse) CreateTable(ctx context.Context, table *Table) error
- func (ch *ClickHouse) Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) error
- func (ch *ClickHouse) Drop(ctx context.Context, table *Table, ifExists bool) error
- func (ch *ClickHouse) DropTable(ctx context.Context, tableName string, ifExists bool) error
- func (ch *ClickHouse) GetTableSchema(ctx context.Context, tableName string) (*Table, error)
- func (ch *ClickHouse) InitDatabase(ctx context.Context) error
- func (ch *ClickHouse) Insert(ctx context.Context, table *Table, merge bool, objects ...types.Object) (err error)
- func (ch *ClickHouse) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (err error)
- func (ch *ClickHouse) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
- func (ch *ClickHouse) PatchTableSchema(ctx context.Context, patchSchema *Table) error
- func (ch *ClickHouse) Ping(ctx context.Context) error
- func (ch *ClickHouse) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, ...) (err error)
- func (ch *ClickHouse) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, ...) ([]map[string]any, error)
- func (ch *ClickHouse) TruncateTable(ctx context.Context, tableName string) error
- func (ch *ClickHouse) Type() string
- type ClickHouseConfig
- type ClickHouseProtocol
- type ColumnDDLFunction
- type ColumnScanner
- type Columns
- type DataSourceConfig
- type DatePartition
- type DbConnectFunction
- type DummyTypeResolver
- type EngineConfig
- type ErrorAdapter
- type Field
- type FieldConfig
- type Fields
- type Granularity
- type IdentifierFunction
- type LoadSource
- type LoadSourceType
- type MySQL
- func (m *MySQL) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, merge bool) error
- func (m *MySQL) CreateStream(id, tableName string, mode bulker.BulkMode, ...) (bulker.BulkerStream, error)
- func (m *MySQL) CreateTable(ctx context.Context, schemaToCreate *Table) error
- func (m *MySQL) GetTableSchema(ctx context.Context, tableName string) (*Table, error)
- func (m *MySQL) InitDatabase(ctx context.Context) error
- func (m *MySQL) Insert(ctx context.Context, table *Table, merge bool, objects ...types.Object) error
- func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (err error)
- func (m *MySQL) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
- func (m *MySQL) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, ...) (err error)
- type ParameterPlaceholder
- type Postgres
- func (p *Postgres) Close() error
- func (p *Postgres) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, merge bool) error
- func (p *Postgres) CreateStream(id, tableName string, mode bulker.BulkMode, ...) (bulker.BulkerStream, error)
- func (p *Postgres) CreateTable(ctx context.Context, schemaToCreate *Table) error
- func (p *Postgres) GetTableSchema(ctx context.Context, tableName string) (*Table, error)
- func (p *Postgres) InitDatabase(ctx context.Context) error
- func (p *Postgres) Insert(ctx context.Context, table *Table, merge bool, objects ...types.Object) error
- func (p *Postgres) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (err error)
- func (p *Postgres) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
- type PostgresConfig
- type QueryPayload
- type Redshift
- func (p *Redshift) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, merge bool) (err error)
- func (p *Redshift) CreateStream(id, tableName string, mode bulker.BulkMode, ...) (bulker.BulkerStream, error)
- func (p *Redshift) CreateTable(ctx context.Context, schemaToCreate *Table) error
- func (p *Redshift) GetTableSchema(ctx context.Context, tableName string) (*Table, error)
- func (p *Redshift) Insert(ctx context.Context, table *Table, merge bool, objects ...types.Object) error
- func (p *Redshift) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (err error)
- func (p *Redshift) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
- func (p *Redshift) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, ...) (err error)
- func (p *Redshift) Type() string
- type RedshiftConfig
- type ReplacePartitionStream
- type ReplaceTableStream
- type RepresentationTable
- type S3OptionConfig
- type SQLAdapter
- type SQLAdapterBase
- func (b *SQLAdapterBase[T]) Close() error
- func (b *SQLAdapterBase[T]) ColumnName(identifier string) string
- func (b *SQLAdapterBase[T]) Count(ctx context.Context, tableName string, whenConditions *WhenConditions) (int, error)
- func (b *SQLAdapterBase[T]) CreateTable(ctx context.Context, schemaToCreate *Table) error
- func (b *SQLAdapterBase[T]) Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) error
- func (b *SQLAdapterBase[T]) Drop(ctx context.Context, table *Table, ifExists bool) error
- func (b *SQLAdapterBase[T]) DropTable(ctx context.Context, tableName string, ifExists bool) error
- func (b *SQLAdapterBase[T]) GetBatchFileCompression() types.FileCompression
- func (b *SQLAdapterBase[T]) GetBatchFileFormat() types.FileFormat
- func (b *SQLAdapterBase[T]) GetDataType(sqlType string) (types.DataType, bool)
- func (b *SQLAdapterBase[T]) GetSQLType(dataType types.DataType) (string, bool)
- func (b *SQLAdapterBase[T]) PatchTableSchema(ctx context.Context, patchTable *Table) error
- func (b *SQLAdapterBase[T]) Ping(ctx context.Context) error
- func (b *SQLAdapterBase[T]) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, ...) (err error)
- func (b *SQLAdapterBase[T]) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, ...) ([]map[string]any, error)
- func (b *SQLAdapterBase[T]) TableHelper() *TableHelper
- func (b *SQLAdapterBase[T]) TableName(identifier string) string
- func (b *SQLAdapterBase[T]) ToWhenConditions(conditions *WhenConditions, paramExpression ParameterPlaceholder, ...) (string, []any)
- func (b *SQLAdapterBase[T]) TruncateTable(ctx context.Context, tableName string) error
- func (b *SQLAdapterBase[T]) Type() string
- func (b *SQLAdapterBase[T]) Update(ctx context.Context, table *Table, object types.Object, ...) error
- type SSLConfig
- type Snowflake
- func (s *Snowflake) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, merge bool) error
- func (s *Snowflake) CreateStream(id, tableName string, mode bulker.BulkMode, ...) (bulker.BulkerStream, error)
- func (s *Snowflake) CreateTable(ctx context.Context, schemaToCreate *Table) error
- func (s *Snowflake) GetTableSchema(ctx context.Context, tableName string) (*Table, error)
- func (s *Snowflake) InitDatabase(ctx context.Context) error
- func (s *Snowflake) Insert(ctx context.Context, table *Table, merge bool, objects ...types.Object) error
- func (s *Snowflake) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (err error)
- func (s *Snowflake) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
- func (s *Snowflake) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, ...) error
- func (s *Snowflake) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, ...) ([]map[string]any, error)
- type SnowflakeConfig
- type Table
- type TableField
- type TableHelper
- func (th *TableHelper) ColumnName(columnName string) string
- func (th *TableHelper) EnsureTableWithCaching(ctx context.Context, destinationID string, dataSchema *Table) (*Table, error)
- func (th *TableHelper) EnsureTableWithoutCaching(ctx context.Context, destinationID string, dataSchema *Table) (*Table, error)
- func (th *TableHelper) GetCached(tableName string) (*Table, bool)
- func (th *TableHelper) MapTableSchema(batchHeader *TypesHeader, object types.Object, pkFields utils.Set[string], ...) (*Table, types.Object)
- func (th *TableHelper) SetSQLAdapter(adapter SQLAdapter)
- func (th *TableHelper) TableName(tableName string) string
- type TableStatementFactory
- type TransactionalStream
- type TxOrDB
- type TxSQLAdapter
- func (tx *TxSQLAdapter) ColumnName(identifier string) string
- func (tx *TxSQLAdapter) Commit() error
- func (tx *TxSQLAdapter) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, merge bool) error
- func (tx *TxSQLAdapter) Count(ctx context.Context, tableName string, whenConditions *WhenConditions) (int, error)
- func (tx *TxSQLAdapter) CreateTable(ctx context.Context, schemaToCreate *Table) error
- func (tx *TxSQLAdapter) Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) error
- func (tx *TxSQLAdapter) Drop(ctx context.Context, table *Table, ifExists bool) error
- func (tx *TxSQLAdapter) DropTable(ctx context.Context, tableName string, ifExists bool) error
- func (tx *TxSQLAdapter) GetBatchFileCompression() types.FileCompression
- func (tx *TxSQLAdapter) GetBatchFileFormat() types.FileFormat
- func (tx *TxSQLAdapter) GetDataType(sqlType string) (types.DataType, bool)
- func (tx *TxSQLAdapter) GetSQLType(dataType types.DataType) (string, bool)
- func (tx *TxSQLAdapter) GetTableSchema(ctx context.Context, tableName string) (*Table, error)
- func (tx *TxSQLAdapter) InitDatabase(ctx context.Context) error
- func (tx *TxSQLAdapter) Insert(ctx context.Context, table *Table, merge bool, objects ...types.Object) error
- func (tx *TxSQLAdapter) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) error
- func (tx *TxSQLAdapter) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
- func (tx *TxSQLAdapter) PatchTableSchema(ctx context.Context, patchTable *Table) error
- func (tx *TxSQLAdapter) Ping(ctx context.Context) error
- func (tx *TxSQLAdapter) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, ...) error
- func (tx *TxSQLAdapter) Rollback() error
- func (tx *TxSQLAdapter) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, ...) ([]map[string]any, error)
- func (tx *TxSQLAdapter) TableHelper() *TableHelper
- func (tx *TxSQLAdapter) TableName(identifier string) string
- func (tx *TxSQLAdapter) TruncateTable(ctx context.Context, tableName string) error
- func (tx *TxSQLAdapter) Type() string
- type TxWrapper
- func (t *TxWrapper) Commit() error
- func (t *TxWrapper) Exec(query string, args ...any) (sql.Result, error)
- func (t *TxWrapper) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
- func (t *TxWrapper) Prepare(query string) (*sql.Stmt, error)
- func (t *TxWrapper) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
- func (t *TxWrapper) Query(query string, args ...any) (*sql.Rows, error)
- func (t *TxWrapper) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
- func (t *TxWrapper) QueryRow(query string, args ...any) *sql.Row
- func (t *TxWrapper) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
- func (t *TxWrapper) Rollback() error
- type TypeCastFunction
- type TypeResolver
- type TypeResolverImpl
- type TypesHeader
- type ValueMappingFunction
- type WhenCondition
- type WhenConditions
Constants ¶
const ( BigQueryAutocommitUnsupported = "BigQuery bulker doesn't support auto commit mode as not efficient" BigqueryBulkerTypeId = "bigquery" )
const ( SSLModeRequire string = "require" SSLModeDisable string = "disable" SSLModeVerifyCA string = "verify-ca" SSLModeVerifyFull string = "verify-full" SSLModeNotProvided string = "" )
const BulkerManagedPkConstraintPrefix = "bulker_managed_"
const (
ClickHouseBulkerTypeId = "clickhouse"
)
TODO tmp table
const ContextTransactionKey = "transaction"
const (
MySQLBulkerTypeId = "mysql"
)
const PartitonIdKeyword = "__partition_id"
const (
PostgresBulkerTypeId = "postgres"
)
const (
RedshiftBulkerTypeId = "redshift"
)
const (
SnowflakeBulkerTypeId = "snowflake"
)
const SqlTypePrefix = "__sql_type"
Variables ¶
var BigQueryPartitonIdRegex = regexp.MustCompile("(\\w+)/(\\d\\d\\d\\d-\\d\\d-\\d\\dT\\d\\d:\\d\\d:\\d\\dZ)")
var ( ColumnTypesOption = bulker.ImplementationOption[types.SQLTypes]{ Key: "columnTypes", DefaultValue: types.SQLTypes{}, AdvancedParseFunc: func(o *bulker.ImplementationOption[types.SQLTypes], serializedValue any) (bulker.StreamOption, error) { switch v := serializedValue.(type) { case map[string]any: sqlTypes := types.SQLTypes{} for key, value := range v { switch t := value.(type) { case string: sqlTypes.With(key, t) case []string: if len(t) == 1 { sqlTypes.With(key, t[0]) } else if len(t) == 2 { sqlTypes.WithDDL(key, t[0], t[1]) } else { return nil, fmt.Errorf("failed to parse 'columnTypes' option: %v incorrect number of elements. expected 1 or 2", v) } } } return withColumnTypes(o, sqlTypes), nil default: return nil, fmt.Errorf("failed to parse 'columnTypes' option: %v incorrect type: %T expected map[string]any", v, v) } }, } )
var DefaultTypeResolver = NewTypeResolver()
var ErrTableNotExist = errors.New("table doesn't exist")
var IndexParameterPlaceholder = func(i int, name string) string { return "$" + strconv.Itoa(i) }
var NamedParameterPlaceholder = func(i int, name string) string {
return "@" + name
}
var QuestionMarkParameterPlaceholder = func(i int, name string) string {
return "?"
}
Functions ¶
func BuildConstraintName ¶
func GranularityToPartitionIds ¶
func GranularityToPartitionIds(g Granularity, t time.Time) []string
func NewBigquery ¶
NewBigquery returns a configured BigQuery bulker.Bulker instance
func NewClickHouse ¶
NewClickHouse returns configured ClickHouse adapter instance
func NewPostgres ¶
NewPostgres returns a configured Postgres bulker.Bulker instance
func NewRedshift ¶
NewRedshift returns configured Redshift adapter instance
func NewSnowflake ¶
NewSnowflake returns configured Snowflake adapter instance
func ProcessSSL ¶
func ProcessSSL(dir string, dsc *PostgresConfig) error
ProcessSSL serializes the SSL payload (CA, client cert, key) into files and enriches the input DataSourceConfig parameters with the SSL config. Each SSL configuration value may be a file path as well as string content.
func WithColumnType ¶
func WithColumnType(columnName, sqlType string) bulker.StreamOption
WithColumnType provides overrides for column type of single column for current BulkerStream object fields
func WithColumnTypeDDL ¶
func WithColumnTypeDDL(columnName, sqlType, ddlType string) bulker.StreamOption
WithColumnTypeDDL provides overrides for column type and DDL type of single column for current BulkerStream object fields
func WithColumnTypes ¶
func WithColumnTypes(fields types.SQLTypes) bulker.StreamOption
WithColumnTypes provides overrides for column types of current BulkerStream object fields
Types ¶
type AbstractSQLStream ¶
type AbstractSQLStream struct {
// contains filtered or unexported fields
}
type AbstractTransactionalSQLStream ¶
type AbstractTransactionalSQLStream struct { AbstractSQLStream // contains filtered or unexported fields }
type AutoCommitStream ¶
type AutoCommitStream struct {
AbstractSQLStream
}
type BQItem ¶
type BQItem struct {
// contains filtered or unexported fields
}
BQItem struct for streaming inserts to BigQuery
type BigQuery ¶
type BigQuery struct { objects.ServiceBase // contains filtered or unexported fields }
BigQuery is an adapter for creating, patching (schema or table), inserting and copying data from GCS to BigQuery
func (*BigQuery) ColumnName ¶
func (*BigQuery) CopyTables ¶
func (*BigQuery) CreateStream ¶
func (bq *BigQuery) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)
func (*BigQuery) CreateTable ¶
CreateTable creates google BigQuery table from Table
func (*BigQuery) DeletePartition ¶
func (*BigQuery) GetBatchFileCompression ¶
func (bq *BigQuery) GetBatchFileCompression() types.FileCompression
func (*BigQuery) GetBatchFileFormat ¶
func (bq *BigQuery) GetBatchFileFormat() types.FileFormat
func (*BigQuery) GetDataType ¶
func (*BigQuery) GetSQLType ¶
func (*BigQuery) GetTableSchema ¶
GetTableSchema returns the Google BigQuery table (name, columns) representation wrapped in a Table struct
func (*BigQuery) InitDatabase ¶
InitDatabase creates the Google BigQuery Dataset if it doesn't exist
func (*BigQuery) PatchTableSchema ¶
PatchTableSchema adds Table columns to google BigQuery table
func (*BigQuery) ReplaceTable ¶
func (*BigQuery) TableHelper ¶
func (bq *BigQuery) TableHelper() *TableHelper
func (*BigQuery) TruncateTable ¶
TruncateTable deletes all records in tableName table
type ClickHouse ¶
type ClickHouse struct { SQLAdapterBase[ClickHouseConfig] // contains filtered or unexported fields }
ClickHouse is an adapter for creating, patching (schema or table), and inserting data to ClickHouse
func (*ClickHouse) CopyTables ¶
func (*ClickHouse) Count ¶
func (ch *ClickHouse) Count(ctx context.Context, tableName string, whenConditions *WhenConditions) (int, error)
func (*ClickHouse) CreateStream ¶
func (ch *ClickHouse) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)
func (*ClickHouse) CreateTable ¶
func (ch *ClickHouse) CreateTable(ctx context.Context, table *Table) error
CreateTable creates a database table with the name and columns provided in the Table representation. New tables will use the MergeTree() or ReplicatedMergeTree() engine, depending on whether config.cluster is empty or not.
func (*ClickHouse) Delete ¶
func (ch *ClickHouse) Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) error
func (*ClickHouse) GetTableSchema ¶
GetTableSchema returns the table (name, columns with name and types) representation wrapped in a Table struct
func (*ClickHouse) InitDatabase ¶
func (ch *ClickHouse) InitDatabase(ctx context.Context) error
InitDatabase creates the database instance if it doesn't exist
func (*ClickHouse) LoadTable ¶
func (ch *ClickHouse) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (err error)
LoadTable transfers data from a local file to a ClickHouse table
func (*ClickHouse) OpenTx ¶
func (ch *ClickHouse) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
OpenTx opens the underlying SQL transaction and returns a wrapped instance
func (*ClickHouse) PatchTableSchema ¶
func (ch *ClickHouse) PatchTableSchema(ctx context.Context, patchSchema *Table) error
PatchTableSchema adds new columns (from the provided Table) to the existing table, then drops and creates the distributed table
func (*ClickHouse) ReplaceTable ¶
func (*ClickHouse) Select ¶
func (ch *ClickHouse) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)
func (*ClickHouse) TruncateTable ¶
func (ch *ClickHouse) TruncateTable(ctx context.Context, tableName string) error
TruncateTable deletes all records in tableName table
func (*ClickHouse) Type ¶
func (ch *ClickHouse) Type() string
type ClickHouseConfig ¶
type ClickHouseConfig struct { Protocol ClickHouseProtocol `mapstructure:"protocol,omitempty" json:"protocol,omitempty" yaml:"protocol,omitempty"` Hosts []string `mapstructure:"hosts,omitempty" json:"hosts,omitempty" yaml:"hosts,omitempty"` Parameters map[string]string `mapstructure:"parameters,omitempty" json:"parameters,omitempty" yaml:"parameters,omitempty"` Username string `mapstructure:"username,omitempty" json:"username,omitempty" yaml:"username,omitempty"` Password string `mapstructure:"password,omitempty" json:"password,omitempty" yaml:"password,omitempty"` Database string `mapstructure:"database,omitempty" json:"database,omitempty" yaml:"database,omitempty"` Cluster string `mapstructure:"cluster,omitempty" json:"cluster,omitempty" yaml:"cluster,omitempty"` TLS map[string]string `mapstructure:"tls,omitempty" json:"tls,omitempty" yaml:"tls,omitempty"` Engine *EngineConfig `mapstructure:"engine,omitempty" json:"engine,omitempty" yaml:"engine,omitempty"` }
ClickHouseConfig dto for deserialized clickhouse config
func (*ClickHouseConfig) Validate ¶
func (chc *ClickHouseConfig) Validate() error
Validate required fields in ClickHouseConfig
type ClickHouseProtocol ¶
type ClickHouseProtocol string
const ( ClickHouseProtocolNative ClickHouseProtocol = "clickhouse" ClickHouseProtocolSecure ClickHouseProtocol = "clickhouse-secure" ClickHouseProtocolHTTP ClickHouseProtocol = "http" ClickHouseProtocolHTTPS ClickHouseProtocol = "https" )
type ColumnDDLFunction ¶
ColumnDDLFunction generates the column DDL for a CREATE TABLE statement based on the type (SQLColumn) and whether it is used for the PK
type ColumnScanner ¶
type ColumnScanner struct { ColumnType *sql.ColumnType // contains filtered or unexported fields }
func (*ColumnScanner) Get ¶
func (s *ColumnScanner) Get() any
func (*ColumnScanner) Scan ¶
func (s *ColumnScanner) Scan(src any) error
type DataSourceConfig ¶
type DataSourceConfig struct { Host string `mapstructure:"host,omitempty" json:"host,omitempty" yaml:"host,omitempty"` Port int `mapstructure:"port,omitempty" json:"port,omitempty" yaml:"port,omitempty"` Db string `mapstructure:"database,omitempty" json:"database,omitempty" yaml:"database,omitempty"` Schema string `mapstructure:"defaultSchema,omitempty" json:"defaultSchema,omitempty" yaml:"defaultSchema,omitempty"` Username string `mapstructure:"username,omitempty" json:"username,omitempty" yaml:"username,omitempty"` Password string `mapstructure:"password,omitempty" json:"password,omitempty" yaml:"password,omitempty"` Parameters map[string]string `mapstructure:"parameters,omitempty" json:"parameters,omitempty" yaml:"parameters,omitempty"` }
DataSourceConfig dto for deserialized datasource config (e.g. in Postgres or AwsRedshift destination)
func (*DataSourceConfig) Validate ¶
func (dsc *DataSourceConfig) Validate() error
Validate required fields in DataSourceConfig
type DatePartition ¶
type DatePartition struct { Field string Value time.Time Granularity Granularity }
type DbConnectFunction ¶
DbConnectFunction function is used to connect to database
type DummyTypeResolver ¶
type DummyTypeResolver struct { }
DummyTypeResolver doesn't do anything
func NewDummyTypeResolver ¶
func NewDummyTypeResolver() *DummyTypeResolver
NewDummyTypeResolver returns a DummyTypeResolver
type EngineConfig ¶
type EngineConfig struct { RawStatement string `mapstructure:"rawStatement,omitempty" json:"rawStatement,omitempty" yaml:"rawStatement,omitempty"` NullableFields []string `mapstructure:"nullableFields,omitempty" json:"nullableFields,omitempty" yaml:"nullableFields,omitempty"` PartitionFields []FieldConfig `mapstructure:"partitionFields,omitempty" json:"partitionFields,omitempty" yaml:"partitionFields,omitempty"` OrderFields []FieldConfig `mapstructure:"orderFields,omitempty" json:"orderFields,omitempty" yaml:"orderFields,omitempty"` PrimaryKeys []string `mapstructure:"primaryKeys,omitempty" json:"primaryKeys,omitempty" yaml:"primaryKeys,omitempty"` }
EngineConfig dto for deserialized clickhouse engine config
type ErrorAdapter ¶
ErrorAdapter is used to extract implementation specific payload and adapt to standard error
type Field ¶
type Field struct {
// contains filtered or unexported fields
}
Field is a data type holder with sql type suggestion
func NewFieldWithSQLType ¶
NewFieldWithSQLType returns Field instance with configured suggested sql types
func (Field) GetSuggestedSQLType ¶
GetSuggestedSQLType returns suggested SQL type if configured
type FieldConfig ¶
type FieldConfig struct { Function string `mapstructure:"function,omitempty" json:"function,omitempty" yaml:"function,omitempty"` Field string `mapstructure:"field,omitempty" json:"field,omitempty" yaml:"field,omitempty"` }
FieldConfig dto for deserialized clickhouse engine fields
type Fields ¶
func (Fields) OverrideTypes ¶
type Granularity ¶
type Granularity string
Granularity is a granularity of TimeInterval
const ( HOUR Granularity = "HOUR" DAY Granularity = "DAY" WEEK Granularity = "WEEK" MONTH Granularity = "MONTH" QUARTER Granularity = "QUARTER" YEAR Granularity = "YEAR" ALL Granularity = "ALL" )
func ParseGranularity ¶
func ParseGranularity(s string) (Granularity, error)
ParseGranularity returns Granularity value from string
func (Granularity) Format ¶
func (g Granularity) Format(t time.Time) string
Format returns formatted string value representation
func (Granularity) Lower ¶
func (g Granularity) Lower(t time.Time) time.Time
Lower returns the lower value of interval
func (Granularity) String ¶
func (g Granularity) String() string
String returns string value representation
type IdentifierFunction ¶
IdentifierFunction adapts identifier name to format required by database e.g. masks or escapes special characters
type LoadSource ¶
type LoadSource struct { Type LoadSourceType Format types.FileFormat Path string S3Config *S3OptionConfig }
type LoadSourceType ¶
type LoadSourceType string
const ( LocalFile LoadSourceType = "local_file" GoogleCloudStore LoadSourceType = "google_cloud_store" AmazonS3 LoadSourceType = "amazon_s3" )
type MySQL ¶
type MySQL struct { SQLAdapterBase[DataSourceConfig] // contains filtered or unexported fields }
MySQL is adapter for creating, patching (schema or table), inserting data to mySQL database
func (*MySQL) CopyTables ¶
func (*MySQL) CreateStream ¶
func (m *MySQL) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)
func (*MySQL) CreateTable ¶
func (*MySQL) GetTableSchema ¶
GetTableSchema returns table (name,columns with name and types) representation wrapped in Table struct
func (*MySQL) InitDatabase ¶
InitDatabase creates database instance if doesn't exist
type ParameterPlaceholder ¶
type Postgres ¶
type Postgres struct { SQLAdapterBase[PostgresConfig] // contains filtered or unexported fields }
Postgres is an adapter for creating, patching (schema or table), and inserting data to Postgres
func (*Postgres) CopyTables ¶
func (*Postgres) CreateStream ¶
func (p *Postgres) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)
func (*Postgres) CreateTable ¶
func (*Postgres) GetTableSchema ¶
GetTableSchema returns table (name,columns with name and types) representation wrapped in Table struct
func (*Postgres) InitDatabase ¶
InitDatabase creates database schema instance if doesn't exist
type PostgresConfig ¶
type PostgresConfig struct { DataSourceConfig `mapstructure:",squash"` SSLConfig `mapstructure:",squash"` }
type QueryPayload ¶
type Redshift ¶
type Redshift struct { //Aws Redshift uses Postgres fork under the hood *Postgres // contains filtered or unexported fields }
Redshift is an adapter for creating, patching (schema or table), inserting and copying data from S3 to Redshift
func (*Redshift) CopyTables ¶
func (*Redshift) CreateStream ¶
func (p *Redshift) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)
func (*Redshift) CreateTable ¶
func (*Redshift) GetTableSchema ¶
GetTableSchema returns the table (name, columns, primary key) representation wrapped in a Table struct
func (*Redshift) LoadTable ¶
func (p *Redshift) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (err error)
LoadTable transfers data from S3 to Redshift by passing a COPY request to Redshift
func (*Redshift) OpenTx ¶
func (p *Redshift) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
OpenTx opens the underlying SQL transaction and returns a wrapped instance
func (*Redshift) ReplaceTable ¶
type RedshiftConfig ¶
type RedshiftConfig struct { DataSourceConfig `mapstructure:",squash"` S3OptionConfig `mapstructure:",squash" yaml:"-,inline"` }
type ReplacePartitionStream ¶
type ReplacePartitionStream struct { AbstractTransactionalSQLStream // contains filtered or unexported fields }
type ReplaceTableStream ¶
type ReplaceTableStream struct {
AbstractTransactionalSQLStream
}
type RepresentationTable ¶
type S3OptionConfig ¶
type S3OptionConfig struct { AccessKeyID string `mapstructure:"accessKeyId,omitempty" json:"accessKeyId,omitempty" yaml:"accessKeyId,omitempty"` SecretKey string `mapstructure:"secretAccessKey,omitempty" json:"secretAccessKey,omitempty" yaml:"secretAccessKey,omitempty"` Bucket string `mapstructure:"bucket,omitempty" json:"bucket,omitempty" yaml:"bucket,omitempty"` Region string `mapstructure:"region,omitempty" json:"region,omitempty" yaml:"region,omitempty"` Folder string `mapstructure:"folder,omitempty" json:"folder,omitempty" yaml:"folder,omitempty"` }
type SQLAdapter ¶
type SQLAdapter interface { Type() string //GetSQLType return mapping from generic bulker type to SQL type specific for this database GetSQLType(dataType types.DataType) (string, bool) //GetDataType return mapping from sql type to generic bulker type GetDataType(sqlType string) (types.DataType, bool) GetBatchFileFormat() types.FileFormat GetBatchFileCompression() types.FileCompression OpenTx(ctx context.Context) (*TxSQLAdapter, error) Insert(ctx context.Context, table *Table, merge bool, objects ...types.Object) error Ping(ctx context.Context) error // InitDatabase setups required db objects like 'schema' or 'dataset' if they don't exist InitDatabase(ctx context.Context) error TableHelper() *TableHelper GetTableSchema(ctx context.Context, tableName string) (*Table, error) CreateTable(ctx context.Context, schemaToCreate *Table) error CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, merge bool) error LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) error PatchTableSchema(ctx context.Context, patchTable *Table) error TruncateTable(ctx context.Context, tableName string) error //(ctx context.Context, tableName string, object types.Object, whenConditions *WhenConditions) error Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) error DropTable(ctx context.Context, tableName string, ifExists bool) error Drop(ctx context.Context, table *Table, ifExists bool) error ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) error Select(ctx context.Context, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error) Count(ctx context.Context, tableName string, whenConditions *WhenConditions) (int, error) // ColumnName adapts column name to sql identifier rules of database ColumnName(rawColumn string) string // TableName adapts table name to sql identifier rules of database TableName(rawTableName string) string }
SQLAdapter is a manager for DWH tables
type SQLAdapterBase ¶
type SQLAdapterBase[T any] struct { objects.ServiceBase // contains filtered or unexported fields }
func (*SQLAdapterBase[T]) ColumnName ¶
func (b *SQLAdapterBase[T]) ColumnName(identifier string) string
func (*SQLAdapterBase[T]) Count ¶
func (b *SQLAdapterBase[T]) Count(ctx context.Context, tableName string, whenConditions *WhenConditions) (int, error)
func (*SQLAdapterBase[T]) CreateTable ¶
func (b *SQLAdapterBase[T]) CreateTable(ctx context.Context, schemaToCreate *Table) error
CreateTable creates the table columns and primary key. It overrides the input table SQL types with the configured cast types and makes the fields from Table.PKFields 'not null'.
func (*SQLAdapterBase[T]) Delete ¶
func (b *SQLAdapterBase[T]) Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) error
func (*SQLAdapterBase[T]) GetBatchFileCompression ¶
func (b *SQLAdapterBase[T]) GetBatchFileCompression() types.FileCompression
func (*SQLAdapterBase[T]) GetBatchFileFormat ¶
func (b *SQLAdapterBase[T]) GetBatchFileFormat() types.FileFormat
func (*SQLAdapterBase[T]) GetDataType ¶
func (b *SQLAdapterBase[T]) GetDataType(sqlType string) (types.DataType, bool)
func (*SQLAdapterBase[T]) GetSQLType ¶
func (b *SQLAdapterBase[T]) GetSQLType(dataType types.DataType) (string, bool)
func (*SQLAdapterBase[T]) PatchTableSchema ¶
func (b *SQLAdapterBase[T]) PatchTableSchema(ctx context.Context, patchTable *Table) error
PatchTableSchema alters the table with columns (if not empty), and recreates the primary key (if not empty) or deletes the primary key if Table.DeletePkFields is true.
func (*SQLAdapterBase[T]) ReplaceTable ¶
func (*SQLAdapterBase[T]) Select ¶
func (b *SQLAdapterBase[T]) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)
func (*SQLAdapterBase[T]) TableHelper ¶
func (b *SQLAdapterBase[T]) TableHelper() *TableHelper
func (*SQLAdapterBase[T]) TableName ¶
func (b *SQLAdapterBase[T]) TableName(identifier string) string
func (*SQLAdapterBase[T]) ToWhenConditions ¶
func (b *SQLAdapterBase[T]) ToWhenConditions(conditions *WhenConditions, paramExpression ParameterPlaceholder, valuesShift int) (string, []any)
ToWhenConditions generates WHEN clause for SQL query based on provided WhenConditions
paramExpression - SQLParameterExpression function that produces a parameter placeholder for a parametrized query; depending on the database it can be: IndexParameterPlaceholder, QuestionMarkParameterPlaceholder, NamedParameterPlaceholder
valuesShift - for parametrized query index of first when clause value in all values provided to query (for UPDATE queries 'valuesShift' = len(object fields))
func (*SQLAdapterBase[T]) TruncateTable ¶
func (b *SQLAdapterBase[T]) TruncateTable(ctx context.Context, tableName string) error
TruncateTable deletes all records in tableName table
func (*SQLAdapterBase[T]) Type ¶
func (b *SQLAdapterBase[T]) Type() string
Type returns the database type identifier of this adapter.
func (*SQLAdapterBase[T]) Update ¶
func (b *SQLAdapterBase[T]) Update(ctx context.Context, table *Table, object types.Object, whenConditions *WhenConditions) error
type SSLConfig ¶
type SSLConfig struct { SSLMode string `mapstructure:"sslMode,omitempty"` SSLServerCA string `mapstructure:"sslServerCA,omitempty"` SSLClientCert string `mapstructure:"sslClientCert,omitempty"` SSLClientKey string `mapstructure:"sslClientKey,omitempty"` }
SSLConfig is a dto for deserialized SSL configuration for Postgres
func (*SSLConfig) ValidateSSL ¶
ValidateSSL returns err if the ssl configuration is invalid
type Snowflake ¶
type Snowflake struct { SQLAdapterBase[SnowflakeConfig] }
Snowflake is an adapter for creating, patching (schema or table), and inserting data to Snowflake.
func (*Snowflake) CopyTables ¶
func (*Snowflake) CreateStream ¶
func (s *Snowflake) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)
func (*Snowflake) CreateTable ¶
func (*Snowflake) GetTableSchema ¶
GetTableSchema returns table (name,columns with name and types) representation wrapped in Table struct
func (*Snowflake) InitDatabase ¶
InitDatabase creates the database schema instance if it doesn't exist.
func (*Snowflake) Insert ¶
func (s *Snowflake) Insert(ctx context.Context, table *Table, merge bool, objects ...types.Object) error
Insert inserts data with InsertContext as a single object or a batch into Snowflake
func (*Snowflake) LoadTable ¶
func (s *Snowflake) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (err error)
LoadTable transfers data from a local file to Snowflake by passing a COPY request to Snowflake.
func (*Snowflake) OpenTx ¶
func (s *Snowflake) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
OpenTx opens the underlying SQL transaction and returns a wrapped instance.
func (*Snowflake) ReplaceTable ¶
type SnowflakeConfig ¶
type SnowflakeConfig struct { Account string `mapstructure:"account,omitempty" json:"account,omitempty" yaml:"account,omitempty"` Port int `mapstructure:"port,omitempty" json:"port,omitempty" yaml:"port,omitempty"` Db string `mapstructure:"database,omitempty" json:"database,omitempty" yaml:"database,omitempty"` Schema string `mapstructure:"defaultSchema,omitempty" json:"defaultSchema,omitempty" yaml:"defaultSchema,omitempty"` Username string `mapstructure:"username,omitempty" json:"username,omitempty" yaml:"username,omitempty"` Password string `mapstructure:"password,omitempty" json:"password,omitempty" yaml:"password,omitempty"` Warehouse string `mapstructure:"warehouse,omitempty" json:"warehouse,omitempty" yaml:"warehouse,omitempty"` Parameters map[string]*string `mapstructure:"parameters,omitempty" json:"parameters,omitempty" yaml:"parameters,omitempty"` }
SnowflakeConfig dto for deserialized datasource config for Snowflake
func (*SnowflakeConfig) Validate ¶
func (sc *SnowflakeConfig) Validate() error
Validate required fields in SnowflakeConfig
type Table ¶
type Table struct { Name string Temporary bool Cached bool Columns Columns PKFields utils.Set[string] PrimaryKeyName string TimestampColumn string Partition DatePartition DeletePkFields bool }
Table is a dto for DWH Table representation
func (*Table) Diff ¶
Diff calculates the diff between the current schema and another one. It returns the schema to add to the current schema (to make them equal), or an empty one if 1) the other schema is empty or 2) all fields from the other schema exist in the current schema. NOTE: the Diff method doesn't take types into account.
func (*Table) FitsToTable ¶
FitsToTable checks that current table fits to the destination table column-wise (doesn't have new columns)
func (*Table) GetPKFields ¶
GetPKFields returns primary keys list
func (*Table) GetPKFieldsSet ¶
GetPKFieldsSet returns primary keys set
func (*Table) SortedColumnNames ¶
SortedColumnNames returns column names sorted in alphabetical order.
type TableField ¶
type TableField struct { Field string `json:"field,omitempty"` Type string `json:"type,omitempty"` Value any `json:"value,omitempty"` }
TableField is a table column representation
type TableHelper ¶
TableHelper keeps table schema state in memory and updates it according to incoming new data. It considers that all tables are in one destination schema. Note: assume that after any outer changes in the db we need to increment the table version in Service.
func NewTableHelper ¶
func NewTableHelper(sqlAdapter SQLAdapter, maxIdentifierLength int, identifierQuoteChar rune) TableHelper
NewTableHelper returns a configured TableHelper instance. Note: columnTypesMapping must not be empty (or fields will be ignored).
func (*TableHelper) ColumnName ¶
func (th *TableHelper) ColumnName(columnName string) string
func (*TableHelper) EnsureTableWithCaching ¶
func (th *TableHelper) EnsureTableWithCaching(ctx context.Context, destinationID string, dataSchema *Table) (*Table, error)
EnsureTableWithCaching calls ensureTable with cacheTable = true. It is used in stream destinations (because we don't have time to select the table schema, but there is a retry on error).
func (*TableHelper) EnsureTableWithoutCaching ¶
func (th *TableHelper) EnsureTableWithoutCaching(ctx context.Context, destinationID string, dataSchema *Table) (*Table, error)
EnsureTableWithoutCaching calls ensureTable with cacheTable = false. It is used in batch destinations and syncStore (because we have time to select the table schema).
func (*TableHelper) MapTableSchema ¶
func (th *TableHelper) MapTableSchema(batchHeader *TypesHeader, object types.Object, pkFields utils.Set[string], timestampColumn string) (*Table, types.Object)
MapTableSchema maps types.TypesHeader (JSON structure with JSON data types) into types.Table (structure with SQL types), applies column types mapping, and adjusts object property names to column names.
func (*TableHelper) SetSQLAdapter ¶
func (th *TableHelper) SetSQLAdapter(adapter SQLAdapter)
func (*TableHelper) TableName ¶
func (th *TableHelper) TableName(tableName string) string
type TableStatementFactory ¶
type TableStatementFactory struct {
// contains filtered or unexported fields
}
TableStatementFactory is used for creating CREATE TABLE statements depending on config.
func NewTableStatementFactory ¶
func NewTableStatementFactory(config *ClickHouseConfig) (*TableStatementFactory, error)
func (TableStatementFactory) CreateTableStatement ¶
func (tsf TableStatementFactory) CreateTableStatement(quotedTableName, tableName, columnsClause string, table *Table) string
CreateTableStatement returns the ClickHouse DDL statement for creating a table.
type TransactionalStream ¶
type TransactionalStream struct {
AbstractTransactionalSQLStream
}
TODO: Use real temporary tables
type TxOrDB ¶
type TxOrDB interface { ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) Exec(query string, args ...any) (sql.Result, error) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) Query(query string, args ...any) (*sql.Rows, error) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row QueryRow(query string, args ...any) *sql.Row PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) Prepare(query string) (*sql.Stmt, error) }
type TxSQLAdapter ¶
type TxSQLAdapter struct {
// contains filtered or unexported fields
}
func (*TxSQLAdapter) ColumnName ¶
func (tx *TxSQLAdapter) ColumnName(identifier string) string
func (*TxSQLAdapter) Commit ¶
func (tx *TxSQLAdapter) Commit() error
func (*TxSQLAdapter) CopyTables ¶
func (*TxSQLAdapter) Count ¶
func (tx *TxSQLAdapter) Count(ctx context.Context, tableName string, whenConditions *WhenConditions) (int, error)
func (*TxSQLAdapter) CreateTable ¶
func (tx *TxSQLAdapter) CreateTable(ctx context.Context, schemaToCreate *Table) error
func (*TxSQLAdapter) Delete ¶
func (tx *TxSQLAdapter) Delete(ctx context.Context, tableName string, deleteConditions *WhenConditions) error
func (tx *TxSQLAdapter) Update(ctx context.Context, tableName string, object types.Object, whenConditions *WhenConditions) error { ctx = context.WithValue(ctx, ContextTransactionKey, tx.tx) return tx.sqlAdapter.Update(ctx, tableName, object, whenConditions) }
func (*TxSQLAdapter) GetBatchFileCompression ¶
func (tx *TxSQLAdapter) GetBatchFileCompression() types.FileCompression
func (*TxSQLAdapter) GetBatchFileFormat ¶
func (tx *TxSQLAdapter) GetBatchFileFormat() types.FileFormat
func (*TxSQLAdapter) GetDataType ¶
func (tx *TxSQLAdapter) GetDataType(sqlType string) (types.DataType, bool)
func (*TxSQLAdapter) GetSQLType ¶
func (tx *TxSQLAdapter) GetSQLType(dataType types.DataType) (string, bool)
func (*TxSQLAdapter) GetTableSchema ¶
func (*TxSQLAdapter) InitDatabase ¶
func (tx *TxSQLAdapter) InitDatabase(ctx context.Context) error
func (*TxSQLAdapter) LoadTable ¶
func (tx *TxSQLAdapter) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) error
func (*TxSQLAdapter) OpenTx ¶
func (tx *TxSQLAdapter) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
func (*TxSQLAdapter) PatchTableSchema ¶
func (tx *TxSQLAdapter) PatchTableSchema(ctx context.Context, patchTable *Table) error
func (*TxSQLAdapter) ReplaceTable ¶
func (*TxSQLAdapter) Rollback ¶
func (tx *TxSQLAdapter) Rollback() error
func (*TxSQLAdapter) Select ¶
func (tx *TxSQLAdapter) Select(ctx context.Context, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)
func (*TxSQLAdapter) TableHelper ¶
func (tx *TxSQLAdapter) TableHelper() *TableHelper
func (*TxSQLAdapter) TableName ¶
func (tx *TxSQLAdapter) TableName(identifier string) string
func (*TxSQLAdapter) TruncateTable ¶
func (tx *TxSQLAdapter) TruncateTable(ctx context.Context, tableName string) error
func (*TxSQLAdapter) Type ¶
func (tx *TxSQLAdapter) Type() string
type TxWrapper ¶
type TxWrapper struct {
// contains filtered or unexported fields
}
TxWrapper is a SQL transaction wrapper. It is used for handling and logging errors with the db type (postgres, mySQL, redshift or snowflake) on Commit() and Rollback() calls.
func NewDbWrapper ¶
func NewDbWrapper(dbType string, db *sql.DB, queryLogger *logging.QueryLogger, errorAdapter ErrorAdapter) *TxWrapper
func NewDummyTxWrapper ¶
func NewTxWrapper ¶
func NewTxWrapper(dbType string, tx *sql.Tx, queryLogger *logging.QueryLogger, errorAdapter ErrorAdapter) *TxWrapper
func (*TxWrapper) Exec ¶
Exec executes a query that doesn't return rows. For example: an INSERT and UPDATE.
Exec uses context.Background internally; to specify the context, use ExecContext.
func (*TxWrapper) ExecContext ¶
ExecContext executes a query that doesn't return rows. For example: an INSERT and UPDATE.
func (*TxWrapper) Prepare ¶
Prepare creates a prepared statement for use within a transaction.
The returned statement operates within the transaction and will be closed when the transaction has been committed or rolled back.
To use an existing prepared statement on this transaction, see Tx.Stmt.
Prepare uses context.Background internally; to specify the context, use PrepareContext.
func (*TxWrapper) PrepareContext ¶
PrepareContext creates a prepared statement for use within a transaction.
The returned statement operates within the transaction and will be closed when the transaction has been committed or rolled back.
To use an existing prepared statement on this transaction, see Tx.Stmt.
The provided context will be used for the preparation of the statement, not for the execution of the returned statement. The returned statement will run in the transaction context.
func (*TxWrapper) Query ¶
Query executes a query that returns rows, typically a SELECT.
Query uses context.Background internally; to specify the context, use QueryContext.
func (*TxWrapper) QueryContext ¶
QueryContext executes a query that returns rows, typically a SELECT.
func (*TxWrapper) QueryRow ¶
QueryRow executes a query that is expected to return at most one row. QueryRow always returns a non-nil value. Errors are deferred until Row's Scan method is called. If the query selects no rows, the *Row's Scan will return ErrNoRows. Otherwise, the *Row's Scan scans the first selected row and discards the rest.
QueryRow uses context.Background internally; to specify the context, use QueryRowContext.
func (*TxWrapper) QueryRowContext ¶
QueryRowContext executes a query that is expected to return at most one row. QueryRowContext always returns a non-nil value. Errors are deferred until Row's Scan method is called. If the query selects no rows, the *Row's Scan will return ErrNoRows. Otherwise, the *Row's Scan scans the first selected row and discards the rest.
type TypeCastFunction ¶
TypeCastFunction wraps parameter(or placeholder) to a type cast expression if it is necessary (e.g. on types overrides)
type TypeResolver ¶
type TypeResolver interface {
Resolve(object map[string]any, sqlTypeHints types.SQLTypes) (Fields, error)
}
TypeResolver resolves types.Fields from input object
type TypeResolverImpl ¶
type TypeResolverImpl struct { }
TypeResolverImpl resolves types based on converter.go rules
func NewTypeResolver ¶
func NewTypeResolver() *TypeResolverImpl
NewTypeResolver returns TypeResolverImpl
func (*TypeResolverImpl) Resolve ¶
func (tr *TypeResolverImpl) Resolve(object map[string]any, sqlTypeHints types.SQLTypes) (Fields, error)
Resolve returns the types.Fields representation of the input object. It applies the default typecast and defines column types: values are reformatted from json.Number into int64 or float64 and put back, and from strings with a timestamp into time.Time and put back.
type TypesHeader ¶
type TypesHeader struct { TableName string Fields Fields Partition DatePartition }
TypesHeader is the schema result of parsing JSON objects
func ProcessEvents ¶
func ProcessEvents(tableName string, event types.Object, customTypes types.SQLTypes) (*TypesHeader, types.Object, error)
ProcessEvents processes an event object without applying mapping rules. It returns the table header, the processed object, or an error if one occurred.
func (*TypesHeader) Exists ¶
func (bh *TypesHeader) Exists() bool
Exists returns true if there is at least one field
type ValueMappingFunction ¶
ValueMappingFunction maps an object value to a database value, for cases such as default value substitution for null or missing values.
type WhenCondition ¶
WhenCondition is a representation of SQL delete condition
type WhenConditions ¶
type WhenConditions struct { Conditions []WhenCondition JoinCondition string }
WhenConditions is a dto for multiple WhenCondition instances with Joiner
func ByPartitionId ¶
func ByPartitionId(partitonId string) *WhenConditions
ByPartitionId returns a delete condition that removes objects based on the __partition_id value, or an empty condition if partitonId is empty.
func NewWhenConditions ¶
func NewWhenConditions(field string, clause string, value any) *WhenConditions
func (*WhenConditions) Add ¶
func (dc *WhenConditions) Add(field string, clause string, value any) *WhenConditions
func (*WhenConditions) IsEmpty ¶
func (dc *WhenConditions) IsEmpty() bool
IsEmpty returns true if there are no conditions.
Source Files ¶
- abstract.go
- abstract_transactional.go
- autocommit_stream.go
- batch_header.go
- bigquery.go
- clickhouse.go
- datasource_config.go
- delete_condition.go
- mysql.go
- options.go
- postgres.go
- processor.go
- redshift.go
- replacepartition_stream.go
- replacetable_stream.go
- snowflake.go
- sql_adapter.go
- sql_adapter_base.go
- table.go
- table_helper.go
- transactional_stream.go
- tx_wrapper.go
- type_resolver.go
- utils.go