sql

package
v0.0.0-...-ec019eb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 29, 2024 License: MIT Imports: 46 Imported by: 2

Documentation

Index

Constants

View Source
// Identifiers and messages specific to the BigQuery bulker.
const (
	// BigqueryBulkerTypeId is the type identifier of the BigQuery bulker.
	BigqueryBulkerTypeId = "bigquery"
	// BigQueryAutocommitUnsupported is the message stating that the BigQuery
	// bulker does not support auto commit mode.
	BigQueryAutocommitUnsupported = "BigQuery bulker doesn't support auto commit mode as not efficient"
)
View Source
// SSL mode names. The strings mirror libpq's sslmode values and are listed
// here from least to most strict.
const (
	// SSLModeDisable is the "disable" SSL mode.
	SSLModeDisable string = "disable"
	// SSLModeRequire is the "require" SSL mode.
	SSLModeRequire string = "require"
	// SSLModeVerifyCA is the "verify-ca" SSL mode.
	SSLModeVerifyCA string = "verify-ca"
	// SSLModeVerifyFull is the "verify-full" SSL mode.
	SSLModeVerifyFull string = "verify-full"

	// SSLModeNotProvided is the empty value used when no SSL mode is set.
	SSLModeNotProvided string = ""
)
View Source
const (
	// BulkerManagedPkConstraintPrefix is the name prefix of primary-key
	// constraints managed by bulker.
	BulkerManagedPkConstraintPrefix = "jitsu_pk_"
)
View Source
// ClickHouseBulkerTypeId is the type identifier of the ClickHouse bulker.
const ClickHouseBulkerTypeId = "clickhouse"
View Source
const (
	// ContextTransactionKey is the key string "transaction". Its name suggests
	// it keys the current transaction in a context; confirm at the call sites.
	ContextTransactionKey = "transaction"
)
View Source
// MySQLBulkerTypeId is the type identifier of the MySQL bulker.
const MySQLBulkerTypeId = "mysql"
View Source
// NoNamespaceValue indicates that a table must not use a namespace (schema or
// db) in queries, e.g. Redshift temporary tables, which don't belong to any
// schema.
const NoNamespaceValue = "__jitsu_no_namespace__"
View Source
const (
	// PartitonIdKeyword is the "__partition_id" keyword. The identifier's
	// misspelling is kept because the name is part of the exported API.
	PartitonIdKeyword = "__partition_id"
)
View Source
// PostgresBulkerTypeId is the type identifier of the Postgres bulker.
const PostgresBulkerTypeId = "postgres"
View Source
// RedshiftBulkerTypeId is the type identifier of the Redshift bulker.
const RedshiftBulkerTypeId = "redshift"
View Source
// SnowflakeBulkerTypeId is the type identifier of the Snowflake bulker.
const SnowflakeBulkerTypeId = "snowflake"

Variables

View Source
var (
	// ColumnTypesOption overrides the SQL types of individual columns.
	//
	// The serialized value must be a map from column name to one of:
	//   - a string: the SQL type;
	//   - a list of 1 string: the SQL type;
	//   - a list of 2 strings: the SQL type and the DDL type.
	//
	// Lists are accepted both as []string and as []any holding strings (the
	// form encoding/json produces for arrays). A nil value leaves the column
	// without an override. Any other value type is rejected with an error
	// instead of being silently ignored.
	ColumnTypesOption = bulker.ImplementationOption[types.SQLTypes]{
		Key:          "columnTypes",
		DefaultValue: types.SQLTypes{},
		AdvancedParseFunc: func(o *bulker.ImplementationOption[types.SQLTypes], serializedValue any) (bulker.StreamOption, error) {
			v, ok := serializedValue.(map[string]any)
			if !ok {
				return nil, fmt.Errorf("failed to parse 'columnTypes' option: %v incorrect type: %T expected map[string]any", serializedValue, serializedValue)
			}
			sqlTypes := types.SQLTypes{}
			for key, value := range v {
				var typeDefs []string
				switch t := value.(type) {
				case nil:
					// Explicit null: no override for this column (nulls were
					// previously ignored as well).
					continue
				case string:
					typeDefs = []string{t}
				case []string:
					typeDefs = t
				case []any:
					// JSON-decoded arrays arrive as []any rather than []string.
					typeDefs = make([]string, 0, len(t))
					for _, el := range t {
						s, isString := el.(string)
						if !isString {
							return nil, fmt.Errorf("failed to parse 'columnTypes' option: column %q: element %v has type %T expected string", key, el, el)
						}
						typeDefs = append(typeDefs, s)
					}
				default:
					return nil, fmt.Errorf("failed to parse 'columnTypes' option: column %q: %v incorrect type: %T expected string or list of strings", key, value, value)
				}
				switch len(typeDefs) {
				case 1:
					sqlTypes.With(key, typeDefs[0])
				case 2:
					sqlTypes.WithDDL(key, typeDefs[0], typeDefs[1])
				default:
					return nil, fmt.Errorf("failed to parse 'columnTypes' option: column %q: %v incorrect number of elements. expected 1 or 2", key, value)
				}
			}
			return withColumnTypes(o, sqlTypes), nil
		},
	}

	// DeduplicateWindow is the "deduplicateWindow" option, parsed as an int
	// with default 31. The unit is not visible here (presumably days; TODO
	// confirm against CopyTables' mergeWindow usage).
	DeduplicateWindow = bulker.ImplementationOption[int]{
		Key:          "deduplicateWindow",
		DefaultValue: 31,
		ParseFunc:    utils.ParseInt,
	}

	// OmitNilsOption is the boolean "omitNils" option; enabled by default.
	OmitNilsOption = bulker.ImplementationOption[bool]{
		Key:          "omitNils",
		DefaultValue: true,
		ParseFunc:    utils.ParseBool,
	}

	// SchemaFreezeOption is the boolean "schemaFreeze" option; disabled by default.
	SchemaFreezeOption = bulker.ImplementationOption[bool]{
		Key:          "schemaFreeze",
		DefaultValue: false,
		ParseFunc:    utils.ParseBool,
	}

	// MaxColumnsCount is the "maxColumnsCount" option, parsed as an int with
	// default 5000.
	MaxColumnsCount = bulker.ImplementationOption[int]{
		Key:          "maxColumnsCount",
		DefaultValue: 5000,
		ParseFunc:    utils.ParseInt,
	}

	// TemporaryBatchSizeOption is the "temporaryBatchSize" option, parsed as an
	// int with default 0.
	TemporaryBatchSizeOption = bulker.ImplementationOption[int]{
		Key:          "temporaryBatchSize",
		DefaultValue: 0,
		ParseFunc:    utils.ParseInt,
	}
)
View Source
// BigQueryPartitonIdRegex matches "<word>/<timestamp>" partition ids. It
// captures the word and a UTC timestamp of the form 2006-01-02T15:04:05Z.
// The pattern is not anchored, so it also matches inside longer strings.
var BigQueryPartitonIdRegex = regexp.MustCompile(`(\w+)/(\d\d\d\d-\d\d-\d\dT\d\d:\d\d:\d\dZ)`)
View Source
var (
	// DefaultTypeResolver is the package-level resolver created by NewTypeResolver.
	DefaultTypeResolver = NewTypeResolver()
)
View Source
var (
	// ErrTableNotExist is the sentinel error reporting that a table doesn't
	// exist; match it with errors.Is.
	ErrTableNotExist = errors.New("table doesn't exist")
)
View Source
// IndexParameterPlaceholder renders a positional "$i" placeholder; the
// parameter name is ignored.
var IndexParameterPlaceholder = func(i int, _ string) string {
	position := strconv.Itoa(i)
	return "$" + position
}
View Source
// NamedParameterPlaceholder renders a named "@name" placeholder; the
// parameter position is ignored.
var NamedParameterPlaceholder = func(_ int, name string) string {
	const prefix = "@"
	return prefix + name
}
View Source
// QuestionMarkParameterPlaceholder renders every parameter as an anonymous
// "?" placeholder, regardless of its position or name.
var QuestionMarkParameterPlaceholder = func(int, string) string {
	return "?"
}

Functions

func GranularityToPartitionIds

func GranularityToPartitionIds(g Granularity, t time.Time) []string

func InitTypes

func InitTypes(dataTypes map[types2.DataType][]string, supportsJSON bool) (typesMapping map[types2.DataType]string, reverseTypesMapping map[string]types2.DataType)

func NewBigquery

func NewBigquery(bulkerConfig bulker.Config) (bulker.Bulker, error)

NewBigquery returns a configured BigQuery bulker.Bulker instance.

func NewClickHouse

func NewClickHouse(bulkerConfig bulkerlib.Config) (bulkerlib.Bulker, error)

NewClickHouse returns a configured ClickHouse adapter instance.

func NewMySQL

func NewMySQL(bulkerConfig bulker.Config) (bulker.Bulker, error)

NewMySQL returns a configured MySQL adapter instance.

func NewPostgres

func NewPostgres(bulkerConfig bulker.Config) (bulker.Bulker, error)

NewPostgres returns a configured Postgres bulker.Bulker instance.

func NewRedshift

func NewRedshift(bulkerConfig bulker.Config) (bulker.Bulker, error)

func NewRedshiftClassic

func NewRedshiftClassic(bulkerConfig bulker.Config) (bulker.Bulker, error)

NewRedshiftClassic returns a configured Redshift adapter instance.

func NewRedshiftIAM

func NewRedshiftIAM(bulkerConfig bulker.Config) (bulker.Bulker, error)

NewRedshiftIAM returns a configured RedshiftIAM bulker.Bulker instance.

func NewSnowflake

func NewSnowflake(bulkerConfig bulker.Config) (bulker.Bulker, error)

NewSnowflake returns a configured Snowflake adapter instance.

func ProcessSSL

func ProcessSSL(dir string, dsc *PostgresConfig) error

ProcessSSL serializes the SSL payload (CA, client cert, key) into files and enriches the input DataSourceConfig parameters with the SSL config. Each SSL setting may be either a file path or the string content itself.

func WithColumnType

func WithColumnType(columnName, sqlType string) bulker.StreamOption

WithColumnType provides overrides for column type of single column for current BulkerStream object fields

func WithColumnTypeDDL

func WithColumnTypeDDL(columnName, sqlType, ddlType string) bulker.StreamOption

WithColumnTypeDDL provides overrides for column type and DDL type of single column for current BulkerStream object fields

func WithColumnTypes

func WithColumnTypes(fields types.SQLTypes) bulker.StreamOption

WithColumnTypes provides overrides for column types of current BulkerStream object fields

func WithDeduplicateWindow

func WithDeduplicateWindow(deduplicateWindow int) bulker.StreamOption

func WithMaxColumnsCount

func WithMaxColumnsCount(maxColumnsCount int) bulker.StreamOption

func WithOmitNils

func WithOmitNils() bulker.StreamOption

func WithSchemaFreeze

func WithSchemaFreeze() bulker.StreamOption

func WithTemporaryBatchSize

func WithTemporaryBatchSize(temporaryBatchSize int) bulker.StreamOption

func WithoutOmitNils

func WithoutOmitNils() bulker.StreamOption

Types

type AbstractSQLStream

type AbstractSQLStream struct {
	// contains filtered or unexported fields
}

type AbstractTransactionalSQLStream

type AbstractTransactionalSQLStream struct {
	*AbstractSQLStream
	// contains filtered or unexported fields
}

func (*AbstractTransactionalSQLStream) Abort

func (*AbstractTransactionalSQLStream) Consume

func (ps *AbstractTransactionalSQLStream) Consume(ctx context.Context, object types.Object) (state bulker.State, processedObject types.Object, err error)

func (*AbstractTransactionalSQLStream) ConsumeJSON

func (ps *AbstractTransactionalSQLStream) ConsumeJSON(ctx context.Context, json []byte) (state bulker.State, processedObject types.Object, err error)

func (*AbstractTransactionalSQLStream) ConsumeMap

func (ps *AbstractTransactionalSQLStream) ConsumeMap(ctx context.Context, mp map[string]any) (state bulker.State, processedObject types.Object, err error)

type AutoCommitStream

// AutoCommitStream is a stream type built on an embedded AbstractSQLStream and
// implements Consume, ConsumeJSON, ConsumeMap, Complete and Abort.
// NOTE(review): the name suggests each consumed object is committed
// immediately rather than batched — confirm in Consume.
type AutoCommitStream struct {
	*AbstractSQLStream
}

func (*AutoCommitStream) Abort

func (ps *AutoCommitStream) Abort(ctx context.Context) (state bulker.State)

func (*AutoCommitStream) Complete

func (ps *AutoCommitStream) Complete(ctx context.Context) (state bulker.State, err error)

func (*AutoCommitStream) Consume

func (ps *AutoCommitStream) Consume(ctx context.Context, object types.Object) (state bulker.State, processedObject types.Object, err error)

func (*AutoCommitStream) ConsumeJSON

func (ps *AutoCommitStream) ConsumeJSON(ctx context.Context, json []byte) (state bulker.State, processedObject types.Object, err error)

func (*AutoCommitStream) ConsumeMap

func (ps *AutoCommitStream) ConsumeMap(ctx context.Context, mp map[string]any) (state bulker.State, processedObject types.Object, err error)

type BQItem

type BQItem struct {
	// contains filtered or unexported fields
}

BQItem is a struct for streaming inserts into BigQuery.

func (*BQItem) Save

func (bqi *BQItem) Save() (row map[string]bigquery.Value, insertID string, err error)

type BigQuery

type BigQuery struct {
	appbase.Service
	// contains filtered or unexported fields
}

BigQuery is an adapter for creating, patching (schema or table), inserting, and copying data from GCS to BigQuery.

func (*BigQuery) BuildConstraintName

func (bq *BigQuery) BuildConstraintName(tableName string) string

func (*BigQuery) Close

func (bq *BigQuery) Close() error

func (*BigQuery) ColumnName

func (bq *BigQuery) ColumnName(identifier string) string

func (*BigQuery) CopyTables

func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulker.WarehouseState, err error)

func (*BigQuery) Count

func (bq *BigQuery) Count(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions) (int, error)

func (*BigQuery) CreateStream

func (bq *BigQuery) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)

func (*BigQuery) CreateTable

func (bq *BigQuery) CreateTable(ctx context.Context, table *Table) (err error)

CreateTable creates a Google BigQuery table from the Table definition.

func (*BigQuery) DefaultNamespace

func (bq *BigQuery) DefaultNamespace() string

func (*BigQuery) Delete

func (bq *BigQuery) Delete(ctx context.Context, namespace string, tableName string, deleteConditions *WhenConditions) (err error)

func (*BigQuery) DeletePartition

func (bq *BigQuery) DeletePartition(ctx context.Context, namespace, tableName string, datePartiton *DatePartition) error

func (*BigQuery) Drop

func (bq *BigQuery) Drop(ctx context.Context, table *Table, ifExists bool) error

func (*BigQuery) DropTable

func (bq *BigQuery) DropTable(ctx context.Context, namespace string, tableName string, ifExists bool) error

DropTable drops table from BigQuery

func (*BigQuery) GetAvroSchema

func (bq *BigQuery) GetAvroSchema(table *Table) *types2.AvroSchema

func (*BigQuery) GetAvroType

func (bq *BigQuery) GetAvroType(sqlType string) (any, bool)

func (*BigQuery) GetBatchFileCompression

func (bq *BigQuery) GetBatchFileCompression() types2.FileCompression

func (*BigQuery) GetBatchFileFormat

func (bq *BigQuery) GetBatchFileFormat() types2.FileFormat

func (*BigQuery) GetDataType

func (bq *BigQuery) GetDataType(sqlType string) (types2.DataType, bool)

func (*BigQuery) GetSQLType

func (bq *BigQuery) GetSQLType(dataType types2.DataType) (string, bool)

func (*BigQuery) GetTableSchema

func (bq *BigQuery) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)

GetTableSchema returns the Google BigQuery table (name, columns) representation wrapped in a Table struct.

func (*BigQuery) InitDatabase

func (bq *BigQuery) InitDatabase(ctx context.Context) error

InitDatabase creates the Google BigQuery dataset if it doesn't exist.

func (*BigQuery) Insert

func (bq *BigQuery) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) (err error)

func (*BigQuery) LoadTable

func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)

func (*BigQuery) OpenTx

func (bq *BigQuery) OpenTx(ctx context.Context) (*TxSQLAdapter, error)

func (*BigQuery) PatchTableSchema

func (bq *BigQuery) PatchTableSchema(ctx context.Context, patchSchema *Table) error

PatchTableSchema adds the Table's columns to the Google BigQuery table.

func (*BigQuery) Ping

func (bq *BigQuery) Ping(ctx context.Context) error

func (*BigQuery) ReplaceTable

func (bq *BigQuery) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error)

func (*BigQuery) RunJob

func (bq *BigQuery) RunJob(ctx context.Context, runner JobRunner, jobDescription string) (job *bigquery.Job, state bulker.WarehouseState, err error)

func (*BigQuery) Select

func (bq *BigQuery) Select(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)

func (*BigQuery) StringifyObjects

func (bq *BigQuery) StringifyObjects() bool

func (*BigQuery) TableHelper

func (bq *BigQuery) TableHelper() *TableHelper

func (*BigQuery) TableName

func (bq *BigQuery) TableName(identifier string) string

func (*BigQuery) TmpNamespace

func (bq *BigQuery) TmpNamespace(namespace string) string

func (*BigQuery) TruncateTable

func (bq *BigQuery) TruncateTable(ctx context.Context, namespace string, tableName string) error

TruncateTable deletes all records in tableName table

func (*BigQuery) Type

func (bq *BigQuery) Type() string

func (*BigQuery) Update

func (bq *BigQuery) Update(ctx context.Context, namespace, tableName string, object types2.Object, whenConditions *WhenConditions) (err error)

type ClickHouse

type ClickHouse struct {
	*SQLAdapterBase[ClickHouseConfig]
	// contains filtered or unexported fields
}

ClickHouse is an adapter for creating, patching (schema or table), and inserting data into ClickHouse.

func (*ClickHouse) Config

func (ch *ClickHouse) Config() *ClickHouseConfig

func (*ClickHouse) CopyTables

func (ch *ClickHouse) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulkerlib.WarehouseState, err error)

func (*ClickHouse) Count

func (ch *ClickHouse) Count(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions) (int, error)

func (*ClickHouse) CreateStream

func (ch *ClickHouse) CreateStream(id, tableName string, mode bulkerlib.BulkMode, streamOptions ...bulkerlib.StreamOption) (bulkerlib.BulkerStream, error)

func (*ClickHouse) CreateTable

func (ch *ClickHouse) CreateTable(ctx context.Context, table *Table) error

CreateTable creates a database table with the name and columns provided in the Table representation. New tables use the MergeTree() engine when config.cluster is empty and the ReplicatedMergeTree() engine otherwise.

func (*ClickHouse) Delete

func (ch *ClickHouse) Delete(ctx context.Context, namespace string, tableName string, deleteConditions *WhenConditions) error

func (*ClickHouse) Drop

func (ch *ClickHouse) Drop(ctx context.Context, table *Table, ifExists bool) error

func (*ClickHouse) DropTable

func (ch *ClickHouse) DropTable(ctx context.Context, namespace string, tableName string, ifExists bool) error

func (*ClickHouse) GetTableSchema

func (ch *ClickHouse) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)

GetTableSchema returns the table representation (name, columns with names and types) wrapped in a Table struct.

func (*ClickHouse) InitDatabase

func (ch *ClickHouse) InitDatabase(ctx context.Context) error

InitDatabase creates the database instance if it doesn't exist.

func (*ClickHouse) Insert

func (ch *ClickHouse) Insert(ctx context.Context, table *Table, _ bool, objects ...types.Object) (err error)

func (*ClickHouse) IsDistributed

func (ch *ClickHouse) IsDistributed() bool

func (*ClickHouse) LoadTable

func (ch *ClickHouse) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulkerlib.WarehouseState, err error)

LoadTable transfers data from a local file to a ClickHouse table.

func (*ClickHouse) OpenTx

func (ch *ClickHouse) OpenTx(ctx context.Context) (*TxSQLAdapter, error)

OpenTx relies on a ClickHouse session: it creates a new connection and wraps it with TxSQLAdapter, which makes sure that all activity happens in one connection.

func (*ClickHouse) PatchTableSchema

func (ch *ClickHouse) PatchTableSchema(ctx context.Context, patchSchema *Table) error

PatchTableSchema adds new columns (from the provided Table) to the existing table, then drops and recreates the distributed table.

func (*ClickHouse) Ping

func (ch *ClickHouse) Ping(_ context.Context) error

func (*ClickHouse) ReplaceTable

func (ch *ClickHouse) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error)

func (*ClickHouse) Select

func (ch *ClickHouse) Select(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)

func (*ClickHouse) TmpNamespace

func (ch *ClickHouse) TmpNamespace(string) string

func (*ClickHouse) TruncateTable

func (ch *ClickHouse) TruncateTable(ctx context.Context, namespace string, tableName string) error

TruncateTable deletes all records in tableName table

func (*ClickHouse) Type

func (ch *ClickHouse) Type() string

type ClickHouseCluster

// ClickHouseCluster exposes the cluster-related view of a ClickHouse adapter:
// its configuration and whether it runs in distributed mode.
type ClickHouseCluster interface {
	// Config returns the ClickHouse configuration.
	Config() *ClickHouseConfig
	// IsDistributed reports whether the setup is distributed.
	IsDistributed() bool
}

type ClickHouseConfig

// ClickHouseConfig holds the connection and table-creation settings of a
// ClickHouse destination. It is decoded via mapstructure/json/yaml using the
// tags below.
type ClickHouseConfig struct {
	// Protocol is one of the ClickHouseProtocol values.
	Protocol   ClickHouseProtocol `mapstructure:"protocol,omitempty" json:"protocol,omitempty" yaml:"protocol,omitempty"`
	Hosts      []string           `mapstructure:"hosts,omitempty" json:"hosts,omitempty" yaml:"hosts,omitempty"`
	// Parameters holds extra connection parameters (presumably appended to the DSN — confirm).
	Parameters map[string]string  `mapstructure:"parameters,omitempty" json:"parameters,omitempty" yaml:"parameters,omitempty"`
	Username   string             `mapstructure:"username,omitempty" json:"username,omitempty" yaml:"username,omitempty"`
	Password   string             `mapstructure:"password,omitempty" json:"password,omitempty" yaml:"password,omitempty"`
	Database   string             `mapstructure:"database,omitempty" json:"database,omitempty" yaml:"database,omitempty"`
	// Cluster, when non-empty, selects a ReplicatedMergeTree() engine for new
	// tables instead of MergeTree() (per CreateTable's documentation).
	Cluster    string             `mapstructure:"cluster,omitempty" json:"cluster,omitempty" yaml:"cluster,omitempty"`
	// TLS holds TLS settings; the accepted keys are not visible here.
	TLS        map[string]string  `mapstructure:"tls,omitempty" json:"tls,omitempty" yaml:"tls,omitempty"`
	// Engine optionally customizes the table engine (see EngineConfig).
	Engine     *EngineConfig      `mapstructure:"engine,omitempty" json:"engine,omitempty" yaml:"engine,omitempty"`
	// LoadAsJSON presumably makes LoadTable load data in JSON format — confirm.
	LoadAsJSON bool               `mapstructure:"loadAsJson,omitempty" json:"loadAsJson,omitempty" yaml:"loadAsJson,omitempty"`
}

ClickHouseConfig is a DTO for the deserialized ClickHouse config.

func (*ClickHouseConfig) Validate

func (chc *ClickHouseConfig) Validate() error

Validate checks the required fields of ClickHouseConfig.

type ClickHouseProtocol

// ClickHouseProtocol is the protocol used to connect to ClickHouse.
type ClickHouseProtocol string

// Supported ClickHouse protocols: the native protocol and HTTP, each with a
// plain and a secure variant.
const (
	ClickHouseProtocolNative ClickHouseProtocol = "clickhouse"
	ClickHouseProtocolSecure ClickHouseProtocol = "clickhouse-secure"

	ClickHouseProtocolHTTP  ClickHouseProtocol = "http"
	ClickHouseProtocolHTTPS ClickHouseProtocol = "https"
)

type ColumnDDLFunction

// ColumnDDLFunction builds the DDL for one column of a CREATE TABLE statement
// from the column's quoted and raw names, its table and its SQLColumn type.
type ColumnDDLFunction func(
	quotedName, name string,
	table *Table,
	column types2.SQLColumn,
) string

ColumnDDLFunction generates the column DDL for a CREATE TABLE statement based on the column type (SQLColumn) and whether the column is used in the PK.

type ColumnScanner

type ColumnScanner struct {
	ColumnType *sql.ColumnType
	// contains filtered or unexported fields
}

func (*ColumnScanner) Get

func (s *ColumnScanner) Get() any

func (*ColumnScanner) Scan

func (s *ColumnScanner) Scan(src any) error

type Columns

// Columns is an ordered map of SQL column definitions keyed by string
// (presumably the column name — see NewColumnsFromMap).
type Columns = *types2.OrderedMap[string, types.SQLColumn]

Columns is an ordered map of SQL column definitions keyed by column name.

func NewColumns

func NewColumns() Columns

func NewColumnsFromArrays

func NewColumnsFromArrays(arr []types2.El[string, types.SQLColumn]) Columns

func NewColumnsFromMap

func NewColumnsFromMap(mp map[string]types.SQLColumn) Columns

type ConWithDB

type ConWithDB struct {
	// contains filtered or unexported fields
}

func NewConWithDB

func NewConWithDB(db *sql.DB, con *sql.Conn) *ConWithDB

func (*ConWithDB) Close

func (c *ConWithDB) Close() error

func (*ConWithDB) ExecContext

func (c *ConWithDB) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)

func (*ConWithDB) PrepareContext

func (c *ConWithDB) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)

func (*ConWithDB) QueryContext

func (c *ConWithDB) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)

func (*ConWithDB) QueryRowContext

func (c *ConWithDB) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row

type DB

// DB is a TxOrDB that can also be closed.
type DB interface {
	io.Closer
	TxOrDB
}

type DataSourceConfig

// DataSourceConfig holds connection settings for SQL destinations such as
// Postgres or Redshift. It is decoded via mapstructure/json/yaml using the
// tags below.
type DataSourceConfig struct {
	Host       string            `mapstructure:"host,omitempty" json:"host,omitempty" yaml:"host,omitempty"`
	Port       int               `mapstructure:"port,omitempty" json:"port,omitempty" yaml:"port,omitempty"`
	Db         string            `mapstructure:"database,omitempty" json:"database,omitempty" yaml:"database,omitempty"`
	// Schema is read from the "defaultSchema" key, not "schema".
	Schema     string            `mapstructure:"defaultSchema,omitempty" json:"defaultSchema,omitempty" yaml:"defaultSchema,omitempty"`
	Username   string            `mapstructure:"username,omitempty" json:"username,omitempty" yaml:"username,omitempty"`
	Password   string            `mapstructure:"password,omitempty" json:"password,omitempty" yaml:"password,omitempty"`
	// Parameters holds extra connection parameters; ProcessSSL adds SSL settings here.
	Parameters map[string]string `mapstructure:"parameters,omitempty" json:"parameters,omitempty" yaml:"parameters,omitempty"`
}

DataSourceConfig is a DTO for a deserialized data source config (e.g. in a Postgres or AWS Redshift destination).

func (*DataSourceConfig) Validate

func (dsc *DataSourceConfig) Validate() error

Validate checks the required fields of DataSourceConfig.

type DatePartition

// DatePartition identifies a time-based partition of a table (see
// BigQuery.DeletePartition).
type DatePartition struct {
	// Field presumably names the partitioning column — confirm at call sites.
	Field       string
	// Value is a point in time inside the partition.
	Value       time.Time
	// Granularity is the partition's bucket size (HOUR, DAY, ...).
	Granularity Granularity
}

type DbConnectFunction

// DbConnectFunction opens a *sql.DB from a configuration of type T.
type DbConnectFunction[T any] func(cfg *T) (*sql.DB, error)

DbConnectFunction is used to connect to the database.

type DeduplicationLine

type DeduplicationLine struct {
	// contains filtered or unexported fields
}

type DummyTypeResolver

// DummyTypeResolver is a stateless, no-op type resolver. Its Resolve method
// returns a single dummy field so that the resulting Fields is not empty.
type DummyTypeResolver struct {
}

DummyTypeResolver doesn't do anything

func NewDummyTypeResolver

func NewDummyTypeResolver() *DummyTypeResolver

NewDummyTypeResolver returns a DummyTypeResolver.

func (*DummyTypeResolver) Resolve

func (dtr *DummyTypeResolver) Resolve(object map[string]any, sqlTypeHints types2.SQLTypes) (Fields, error)

Resolve returns one dummy field so that the resulting Fields is not empty (it is used in the Facebook destination).

type EngineConfig

// EngineConfig customizes the ClickHouse table engine. It is decoded via
// mapstructure/json/yaml using the tags below.
type EngineConfig struct {
	// RawStatement presumably supplies a literal engine clause — confirm in CreateTable.
	RawStatement    string        `mapstructure:"rawStatement,omitempty" json:"rawStatement,omitempty" yaml:"rawStatement,omitempty"`
	NullableFields  []string      `mapstructure:"nullableFields,omitempty" json:"nullableFields,omitempty" yaml:"nullableFields,omitempty"`
	PartitionFields []FieldConfig `mapstructure:"partitionFields,omitempty" json:"partitionFields,omitempty" yaml:"partitionFields,omitempty"`
	OrderFields     []FieldConfig `mapstructure:"orderFields,omitempty" json:"orderFields,omitempty" yaml:"orderFields,omitempty"`
	PrimaryKeys     []string      `mapstructure:"primaryKeys,omitempty" json:"primaryKeys,omitempty" yaml:"primaryKeys,omitempty"`
}

EngineConfig is a DTO for the deserialized ClickHouse engine config.

type ErrorAdapter

// ErrorAdapter converts an implementation-specific error into a standard one.
type ErrorAdapter func(err error) error

ErrorAdapter is used to extract implementation specific payload and adapt to standard error

type Field

type Field struct {
	// contains filtered or unexported fields
}

Field is a data type holder with sql type suggestion

func NewField

func NewField(t types2.DataType) Field

NewField returns Field instance

func NewFieldWithSQLType

func NewFieldWithSQLType(t types2.DataType, suggestedType *types2.SQLColumn) Field

NewFieldWithSQLType returns Field instance with configured suggested sql types

func (Field) GetSuggestedSQLType

func (f Field) GetSuggestedSQLType() (types2.SQLColumn, bool)

GetSuggestedSQLType returns suggested SQL type if configured

func (Field) GetType

func (f Field) GetType() types2.DataType

GetType returns the field type based on its occurrences in one file, lazily computing the common ancestor type (typing.GetCommonAncestorType).

type FieldConfig

// FieldConfig is one entry of EngineConfig's partition or order field list.
type FieldConfig struct {
	// Function presumably wraps Field in a ClickHouse function (e.g. in
	// PARTITION BY) — confirm in CreateTable.
	Function string `mapstructure:"function,omitempty" json:"function,omitempty" yaml:"function,omitempty"`
	// Field is the column name.
	Field    string `mapstructure:"field,omitempty" json:"field,omitempty" yaml:"field,omitempty"`
}

FieldConfig is a DTO for deserialized ClickHouse engine fields.

type Fields

// Fields is an ordered map of Field values keyed by string (presumably the
// field name — confirm in TypeResolver implementations).
type Fields = *types.OrderedMap[string, Field]

type Granularity

// Granularity is the size of a time bucket (HOUR ... YEAR, or ALL), used for
// time intervals and date partitions.
type Granularity string

Granularity is a granularity of TimeInterval

// Granularity values. HOUR through YEAR are listed from finest to coarsest.
const (
	HOUR    Granularity = "HOUR"
	DAY     Granularity = "DAY"
	WEEK    Granularity = "WEEK"
	MONTH   Granularity = "MONTH"
	QUARTER Granularity = "QUARTER"
	YEAR    Granularity = "YEAR"

	// ALL is the catch-all granularity.
	ALL Granularity = "ALL"
)

func ParseGranularity

func ParseGranularity(s string) (Granularity, error)

ParseGranularity returns Granularity value from string

func (Granularity) Format

func (g Granularity) Format(t time.Time) string

Format returns formatted string value representation

func (Granularity) Lower

func (g Granularity) Lower(t time.Time) time.Time

Lower returns the lower value of interval

func (Granularity) String

func (g Granularity) String() string

String returns string value representation

func (Granularity) Upper

func (g Granularity) Upper(t time.Time) time.Time

Upper returns the upper value of interval

type IdentifierFunction

// IdentifierFunction adapts an identifier to the database's format and
// reports whether the adapted identifier must be quoted.
type IdentifierFunction func(
	identifier string,
	alphanumeric bool,
) (adapted string, needQuotes bool)

IdentifierFunction adapts identifier name to format required by database e.g. masks or escapes special characters

type JobRunner

// JobRunner runs a BigQuery job; BigQuery.RunJob accepts one.
type JobRunner interface {
	// Run runs the job and returns it, or an error.
	Run(context.Context) (*bigquery.Job, error)
}

type LoadSource

// LoadSource describes where and in which format LoadTable reads its data.
type LoadSource struct {
	// Type selects the source kind (local file, Google Cloud Storage, Amazon S3).
	Type     LoadSourceType
	Format   types2.FileFormat
	Path     string
	// S3Config presumably applies only when Type is AmazonS3 — confirm.
	S3Config *S3OptionConfig
}

type LoadSourceType

// LoadSourceType identifies the kind of storage a LoadSource points to.
type LoadSourceType string

// Supported load source types.
const (
	AmazonS3         LoadSourceType = "amazon_s3"
	GoogleCloudStore LoadSourceType = "google_cloud_store"
	LocalFile        LoadSourceType = "local_file"
)

type MySQL

type MySQL struct {
	*SQLAdapterBase[DataSourceConfig]
	// contains filtered or unexported fields
}

MySQL is an adapter for creating, patching (schema or table), and inserting data into a MySQL database.

func (*MySQL) BuildConstraintName

func (m *MySQL) BuildConstraintName(tableName string) string

func (*MySQL) CopyTables

func (m *MySQL) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (bulker.WarehouseState, error)

func (*MySQL) CreateStream

func (m *MySQL) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)

func (*MySQL) CreateTable

func (m *MySQL) CreateTable(ctx context.Context, schemaToCreate *Table) error

func (*MySQL) GetTableSchema

func (m *MySQL) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)

GetTableSchema returns table (name,columns with name and types) representation wrapped in Table struct

func (*MySQL) InitDatabase

func (m *MySQL) InitDatabase(ctx context.Context) error

InitDatabase creates the database instance if it doesn't exist.

func (*MySQL) Insert

func (m *MySQL) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error

func (*MySQL) LoadTable

func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)

func (*MySQL) OpenTx

func (m *MySQL) OpenTx(ctx context.Context) (*TxSQLAdapter, error)

OpenTx opens the underlying SQL transaction and returns a wrapped instance.

func (*MySQL) ReplaceTable

func (m *MySQL) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error)

type ParameterPlaceholder

// ParameterPlaceholder renders the placeholder for a query parameter, given
// its position and name.
type ParameterPlaceholder func(index int, name string) string

type Postgres

type Postgres struct {
	*SQLAdapterBase[PostgresConfig]
	// contains filtered or unexported fields
}

Postgres is an adapter for creating, patching (schema or table), and inserting data into Postgres.

func (*Postgres) Close

func (p *Postgres) Close() error

Close closes the underlying sql.DB.

func (*Postgres) CopyTables

func (p *Postgres) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (bulker.WarehouseState, error)

func (*Postgres) CreateStream

func (p *Postgres) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)

func (*Postgres) CreateTable

func (p *Postgres) CreateTable(ctx context.Context, schemaToCreate *Table) error

func (*Postgres) GetTableSchema

func (p *Postgres) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)

GetTableSchema returns table (name,columns with name and types) representation wrapped in Table struct

func (*Postgres) InitDatabase

func (p *Postgres) InitDatabase(ctx context.Context) error

InitDatabase creates the database schema if it doesn't exist.

func (*Postgres) Insert

func (p *Postgres) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error

func (*Postgres) LoadTable

func (p *Postgres) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)

func (*Postgres) OpenTx

func (p *Postgres) OpenTx(ctx context.Context) (*TxSQLAdapter, error)

OpenTx opens the underlying SQL transaction and returns a wrapped instance.

func (*Postgres) Ping

func (p *Postgres) Ping(ctx context.Context) error

func (*Postgres) ReplaceTable

func (p *Postgres) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error)

type PostgresConfig

// PostgresConfig combines the generic data source settings with SSL settings.
// ",squash" makes mapstructure decode both embedded structs from the same
// flat map instead of nested keys.
type PostgresConfig struct {
	DataSourceConfig `mapstructure:",squash"`
	SSLConfig        `mapstructure:",squash"`
}

type QueryPayload

// QueryPayload carries pre-rendered fragments for building SQL statements.
// NOTE(review): presumably used as the data for SQL text templates — confirm
// at the template call sites.
type QueryPayload struct {
	Namespace      string
	NamespaceFrom  string
	TableName      string
	Columns        string
	Placeholders   string
	PrimaryKeyName string
	UpdateSet      string

	// Fields below appear intended for table-to-table statements (e.g. copy/merge).
	TableTo        string
	TableFrom      string
	JoinConditions string
	SourceColumns  string
}

type Redshift

type Redshift struct {
	//Aws Redshift uses Postgres fork under the hood
	*Postgres
	// contains filtered or unexported fields
}

Redshift is an adapter for creating, patching (schema or table), inserting, and copying data from S3 to Redshift.

func (*Redshift) CopyTables

func (p *Redshift) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulker.WarehouseState, err error)

func (*Redshift) CreateStream

func (p *Redshift) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)

func (*Redshift) CreateTable

func (p *Redshift) CreateTable(ctx context.Context, schemaToCreate *Table) error

func (*Redshift) GetTableSchema

func (p *Redshift) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)

GetTableSchema returns the table representation (name, columns, primary key) wrapped in a Table struct.

func (*Redshift) Insert

func (p *Redshift) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error

func (*Redshift) LoadTable

func (p *Redshift) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)

LoadTable transfers data from S3 to Redshift by issuing a COPY request to Redshift.

func (*Redshift) OpenTx

func (p *Redshift) OpenTx(ctx context.Context) (*TxSQLAdapter, error)

OpenTx opens the underlying SQL transaction and returns a wrapped instance.

func (*Redshift) ReplaceTable

func (p *Redshift) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error)

func (*Redshift) TmpNamespace

func (p *Redshift) TmpNamespace(string) string

func (*Redshift) Type

func (p *Redshift) Type() string

Type returns Postgres type

type RedshiftIAM

// RedshiftIAM is a Redshift adapter built on SQLAdapterBase and configured
// with a driver.RedshiftConfig.
type RedshiftIAM struct {
	*SQLAdapterBase[driver.RedshiftConfig]
}

RedshiftIAM is an adapter for creating, patching (schema or table), and inserting data into Redshift.

func (*RedshiftIAM) Close

func (p *RedshiftIAM) Close() error

Close closes the underlying sql.DB.

func (*RedshiftIAM) CopyTables

func (p *RedshiftIAM) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulker.WarehouseState, err error)

func (*RedshiftIAM) CreateStream

func (p *RedshiftIAM) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)

func (*RedshiftIAM) CreateTable

func (p *RedshiftIAM) CreateTable(ctx context.Context, schemaToCreate *Table) error

func (*RedshiftIAM) GetTableSchema

func (p *RedshiftIAM) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)

GetTableSchema returns the table representation (name, columns, primary key) wrapped in a Table struct.

func (*RedshiftIAM) InitDatabase

func (p *RedshiftIAM) InitDatabase(ctx context.Context) error

InitDatabase creates the database schema if it doesn't exist.

func (*RedshiftIAM) Insert

func (p *RedshiftIAM) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error

func (*RedshiftIAM) LoadTable

func (p *RedshiftIAM) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)

LoadTable transfers data from S3 to Redshift by issuing a COPY request to Redshift.

func (*RedshiftIAM) OpenTx

func (p *RedshiftIAM) OpenTx(ctx context.Context) (*TxSQLAdapter, error)

OpenTx opens the underlying SQL transaction and returns a wrapped instance.

func (*RedshiftIAM) Ping

func (p *RedshiftIAM) Ping(ctx context.Context) error

func (*RedshiftIAM) ReplaceTable

func (p *RedshiftIAM) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error)

func (*RedshiftIAM) Type

func (p *RedshiftIAM) Type() string

type ReplacePartitionStream

type ReplacePartitionStream struct {
	*AbstractTransactionalSQLStream
	// contains filtered or unexported fields
}

func (*ReplacePartitionStream) Complete

func (ps *ReplacePartitionStream) Complete(ctx context.Context) (state bulker.State, err error)

func (*ReplacePartitionStream) Consume

func (ps *ReplacePartitionStream) Consume(ctx context.Context, object types.Object) (state bulker.State, processedObjects types.Object, err error)

func (*ReplacePartitionStream) ConsumeJSON

func (ps *ReplacePartitionStream) ConsumeJSON(ctx context.Context, json []byte) (state bulker.State, processedObject types.Object, err error)

func (*ReplacePartitionStream) ConsumeMap

func (ps *ReplacePartitionStream) ConsumeMap(ctx context.Context, mp map[string]any) (state bulker.State, processedObject types.Object, err error)

type ReplaceTableStream

type ReplaceTableStream struct {
	*AbstractTransactionalSQLStream
}

func (*ReplaceTableStream) Complete

func (ps *ReplaceTableStream) Complete(ctx context.Context) (state bulker.State, err error)

type RepresentationTable

type RepresentationTable struct {
	Name             string                          `json:"name"`
	Schema           *types2.OrderedMap[string, any] `json:"schema"`
	PrimaryKeyFields []string                        `json:"primaryKeyFields,omitempty"`
	PrimaryKeyName   string                          `json:"primaryKeyName,omitempty"`
	Temporary        bool                            `json:"temporary,omitempty"`
}

type S3OptionConfig

type S3OptionConfig struct {
	AuthenticationMethod string        `mapstructure:"authenticationMethod,omitempty" json:"authenticationMethod,omitempty" yaml:"authenticationMethod,omitempty"`
	AccessKeyID          string        `mapstructure:"accessKeyId,omitempty" json:"accessKeyId,omitempty" yaml:"accessKeyId,omitempty"`
	SecretAccessKey      string        `mapstructure:"secretAccessKey,omitempty" json:"secretAccessKey,omitempty" yaml:"secretAccessKey,omitempty"`
	RoleARN              string        `mapstructure:"roleARN,omitempty" json:"roleARN,omitempty" yaml:"roleARN,omitempty"`
	RoleARNExpiry        time.Duration `mapstructure:"roleARNExpiry,omitempty" json:"roleARNExpiry,omitempty" yaml:"roleARNExpiry,omitempty"`
	ExternalID           string        `mapstructure:"externalID,omitempty" json:"externalID,omitempty" yaml:"externalID,omitempty"`
	Bucket               string        `mapstructure:"bucket,omitempty" json:"bucket,omitempty" yaml:"bucket,omitempty"`
	Region               string        `mapstructure:"region,omitempty" json:"region,omitempty" yaml:"region,omitempty"`
	Folder               string        `mapstructure:"folder,omitempty" json:"folder,omitempty" yaml:"folder,omitempty"`
}

type SQLAdapter

type SQLAdapter interface {
	Type() string

	//GetSQLType return mapping from generic bulker type to SQL type specific for this database
	GetSQLType(dataType types2.DataType) (string, bool)
	GetDataType(sqlType string) (types2.DataType, bool)
	GetAvroType(sqlType string) (any, bool)
	GetAvroSchema(table *Table) *types2.AvroSchema
	GetBatchFileFormat() types2.FileFormat
	GetBatchFileCompression() types2.FileCompression
	StringifyObjects() bool
	OpenTx(ctx context.Context) (*TxSQLAdapter, error)
	Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error
	Ping(ctx context.Context) error
	// InitDatabase setups required db objects like 'schema' or 'dataset' if they don't exist
	InitDatabase(ctx context.Context) error
	TableHelper() *TableHelper
	GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)
	CreateTable(ctx context.Context, schemaToCreate *Table) error
	CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulker.WarehouseState, err error)
	LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)
	PatchTableSchema(ctx context.Context, patchTable *Table) error
	TruncateTable(ctx context.Context, namespace string, tableName string) error
	//(ctx context.Context, tableName string, object types.Object, whenConditions *WhenConditions) error
	Delete(ctx context.Context, namespace string, tableName string, deleteConditions *WhenConditions) error
	DropTable(ctx context.Context, namespace string, tableName string, ifExists bool) error
	Drop(ctx context.Context, table *Table, ifExists bool) error

	ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) error

	Select(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)
	Count(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions) (int, error)

	// ColumnName adapts column name to sql identifier rules of database
	ColumnName(rawColumn string) string
	// TableName adapts table name to sql identifier rules of database
	TableName(rawTableName string) string
	BuildConstraintName(tableName string) string
	DefaultNamespace() string
	// TmpNamespace returns namespace used by temporary tables, e.g. for warehouses where temporary tables
	// must not be specified with schema or db prefix NoNamespaceValue constant must be used
	TmpNamespace(targetNamespace string) string
}

SQLAdapter is a manager for DWH tables

type SQLAdapterBase

type SQLAdapterBase[T any] struct {
	appbase.Service
	// contains filtered or unexported fields
}

func (*SQLAdapterBase[T]) BuildConstraintName

func (b *SQLAdapterBase[T]) BuildConstraintName(tableName string) string

func (*SQLAdapterBase[T]) Close

func (b *SQLAdapterBase[T]) Close() error

Close closes the underlying sql.DB.

func (*SQLAdapterBase[T]) ColumnName

func (b *SQLAdapterBase[T]) ColumnName(identifier string) string

func (*SQLAdapterBase[T]) Count

func (b *SQLAdapterBase[T]) Count(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions) (int, error)

func (*SQLAdapterBase[T]) CreateTable

func (b *SQLAdapterBase[T]) CreateTable(ctx context.Context, schemaToCreate *Table) error

CreateTable creates a table with its columns and primary key. Input column SQL types are overridden with the configured cast types, and fields listed in Table.PKFields are made NOT NULL.

func (*SQLAdapterBase[T]) DefaultNamespace

func (b *SQLAdapterBase[T]) DefaultNamespace() string

func (*SQLAdapterBase[T]) Delete

func (b *SQLAdapterBase[T]) Delete(ctx context.Context, namespace string, tableName string, deleteConditions *WhenConditions) error

func (*SQLAdapterBase[T]) DeleteAll

func (b *SQLAdapterBase[T]) DeleteAll(ctx context.Context, namespace, tableName string) error

DeleteAll deletes all records in tableName table

func (*SQLAdapterBase[T]) Drop

func (b *SQLAdapterBase[T]) Drop(ctx context.Context, table *Table, ifExists bool) error

func (*SQLAdapterBase[T]) DropTable

func (b *SQLAdapterBase[T]) DropTable(ctx context.Context, namespace string, tableName string, ifExists bool) error

func (*SQLAdapterBase[T]) GetAvroSchema

func (b *SQLAdapterBase[T]) GetAvroSchema(table *Table) *types2.AvroSchema

func (*SQLAdapterBase[T]) GetAvroType

func (b *SQLAdapterBase[T]) GetAvroType(sqlType string) (any, bool)

func (*SQLAdapterBase[T]) GetBatchFileCompression

func (b *SQLAdapterBase[T]) GetBatchFileCompression() types2.FileCompression

func (*SQLAdapterBase[T]) GetBatchFileFormat

func (b *SQLAdapterBase[T]) GetBatchFileFormat() types2.FileFormat

func (*SQLAdapterBase[T]) GetDataType

func (b *SQLAdapterBase[T]) GetDataType(sqlType string) (types2.DataType, bool)

func (*SQLAdapterBase[T]) GetSQLType

func (b *SQLAdapterBase[T]) GetSQLType(dataType types2.DataType) (string, bool)

func (*SQLAdapterBase[T]) PatchTableSchema

func (b *SQLAdapterBase[T]) PatchTableSchema(ctx context.Context, patchTable *Table) error

PatchTableSchema alters the table with the given columns (if not empty), recreates the primary key (if provided), or deletes the primary key if Table.DeletePrimaryKeyNamed is non-empty.

func (*SQLAdapterBase[T]) Ping

func (b *SQLAdapterBase[T]) Ping(ctx context.Context) error

func (*SQLAdapterBase[T]) ReplaceTable

func (b *SQLAdapterBase[T]) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error)

func (*SQLAdapterBase[T]) Select

func (b *SQLAdapterBase[T]) Select(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)

func (*SQLAdapterBase[T]) StringifyObjects

func (b *SQLAdapterBase[T]) StringifyObjects() bool

func (*SQLAdapterBase[T]) TableHelper

func (b *SQLAdapterBase[T]) TableHelper() *TableHelper

func (*SQLAdapterBase[T]) TableName

func (b *SQLAdapterBase[T]) TableName(identifier string) string

func (*SQLAdapterBase[T]) TmpNamespace

func (b *SQLAdapterBase[T]) TmpNamespace(targetNamespace string) string

func (*SQLAdapterBase[T]) ToWhenConditions

func (b *SQLAdapterBase[T]) ToWhenConditions(conditions *WhenConditions, paramExpression ParameterPlaceholder, valuesShift int) (string, []any)

ToWhenConditions generates the conditions clause of an SQL query from the provided WhenConditions.

paramExpression - a ParameterPlaceholder function that produces the parameter placeholder for a parametrized query; depending on the database it can be IndexParameterPlaceholder, QuestionMarkParameterPlaceholder, or NamedParameterPlaceholder.

valuesShift - for a parametrized query, the index of the first condition value among all values provided to the query (for UPDATE queries, 'valuesShift' = len(object fields)).

func (*SQLAdapterBase[T]) TruncateTable

func (b *SQLAdapterBase[T]) TruncateTable(ctx context.Context, namespace string, tableName string) error

func (*SQLAdapterBase[T]) Type

func (b *SQLAdapterBase[T]) Type() string

Type returns the type of the adapter.

func (*SQLAdapterBase[T]) Update

func (b *SQLAdapterBase[T]) Update(ctx context.Context, table *Table, object types2.Object, whenConditions *WhenConditions) error

type SSLConfig

type SSLConfig struct {
	SSLMode       string `mapstructure:"sslMode,omitempty"`
	SSLServerCA   string `mapstructure:"sslServerCA,omitempty"`
	SSLClientCert string `mapstructure:"sslClientCert,omitempty"`
	SSLClientKey  string `mapstructure:"sslClientKey,omitempty"`
}

SSLConfig is a dto for deserialized SSL configuration for Postgres

func (*SSLConfig) ValidateSSL

func (sc *SSLConfig) ValidateSSL() error

ValidateSSL returns an error if the SSL configuration is invalid.

type Snowflake

type Snowflake struct {
	*SQLAdapterBase[SnowflakeConfig]
}

Snowflake is an adapter for creating and patching (schemas or tables) and inserting data into Snowflake.

func (*Snowflake) BuildConstraintName

func (s *Snowflake) BuildConstraintName(tableName string) string

func (*Snowflake) CopyTables

func (s *Snowflake) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (bulker.WarehouseState, error)

func (*Snowflake) CreateStream

func (s *Snowflake) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)

func (*Snowflake) CreateTable

func (s *Snowflake) CreateTable(ctx context.Context, schemaToCreate *Table) error

func (*Snowflake) GetTableSchema

func (s *Snowflake) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)

GetTableSchema returns the table representation (name, and columns with their names and types) wrapped in a Table struct.

func (*Snowflake) InitDatabase

func (s *Snowflake) InitDatabase(ctx context.Context) error

InitDatabase creates the database schema if it doesn't exist.

func (*Snowflake) Insert

func (s *Snowflake) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error

Insert inserts data with InsertContext as a single object or a batch into Snowflake

func (*Snowflake) LoadTable

func (s *Snowflake) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)

LoadTable transfers data from a local file to Snowflake by issuing a COPY request to Snowflake.

func (*Snowflake) OpenTx

func (s *Snowflake) OpenTx(ctx context.Context) (*TxSQLAdapter, error)

OpenTx opens the underlying SQL transaction and returns a wrapped instance.

func (*Snowflake) ReplaceTable

func (s *Snowflake) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) (err error)

func (*Snowflake) Select

func (s *Snowflake) Select(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)

type SnowflakeConfig

type SnowflakeConfig struct {
	Account    string             `mapstructure:"account,omitempty" json:"account,omitempty" yaml:"account,omitempty"`
	Port       int                `mapstructure:"port,omitempty" json:"port,omitempty" yaml:"port,omitempty"`
	Db         string             `mapstructure:"database,omitempty" json:"database,omitempty" yaml:"database,omitempty"`
	Schema     string             `mapstructure:"defaultSchema,omitempty" json:"defaultSchema,omitempty" yaml:"defaultSchema,omitempty"`
	Username   string             `mapstructure:"username,omitempty" json:"username,omitempty" yaml:"username,omitempty"`
	Password   string             `mapstructure:"password,omitempty" json:"password,omitempty" yaml:"password,omitempty"`
	Warehouse  string             `mapstructure:"warehouse,omitempty" json:"warehouse,omitempty" yaml:"warehouse,omitempty"`
	Parameters map[string]*string `mapstructure:"parameters,omitempty" json:"parameters,omitempty" yaml:"parameters,omitempty"`
}

SnowflakeConfig is a DTO for the deserialized datasource config for Snowflake.

func (*SnowflakeConfig) Validate

func (sc *SnowflakeConfig) Validate() error

Validate checks the required fields in SnowflakeConfig.

type Table

type Table struct {
	Name string
	// database or schema depending on warehouse
	Namespace string
	Temporary bool
	Cached    bool

	Columns         Columns
	PKFields        types2.OrderedSet[string]
	PrimaryKeyName  string
	TimestampColumn string

	Partition DatePartition

	DeletePrimaryKeyNamed string
}

Table is a dto for DWH Table representation

func (*Table) CleanClone

func (t *Table) CleanClone() *Table

CleanClone returns a clone of the current table without 'New' or 'Override' flags.

func (*Table) Clone

func (t *Table) Clone() *Table

func (*Table) CloneIfNeeded

func (t *Table) CloneIfNeeded() *Table

func (*Table) ColumnNames

func (t *Table) ColumnNames() []string

ColumnNames returns the column names as a slice.

func (*Table) ColumnsCount

func (t *Table) ColumnsCount() int

func (*Table) Diff

func (t *Table) Diff(sqlAdapter SQLAdapter, another *Table) *Table

Diff calculates the difference between the current schema and another one. It returns the schema that must be added to the current schema to make them equal, or an empty schema if 1) the other schema is empty, or 2) all fields from the other schema already exist in the current schema. NOTE: Diff doesn't take column types into account.

func (*Table) Exists

func (t *Table) Exists() bool

Exists returns true if there is at least one column

func (*Table) GetPKFields

func (t *Table) GetPKFields() []string

GetPKFields returns the list of primary key fields.

func (*Table) GetPKFieldsSet

func (t *Table) GetPKFieldsSet() types2.OrderedSet[string]

func (*Table) MappedColumnNames

func (t *Table) MappedColumnNames(f func(string) string) []string

func (*Table) MappedColumns

func (t *Table) MappedColumns(f func(string, types.SQLColumn) string) []string

func (*Table) ToArray

func (t *Table) ToArray() []types.SQLColumn

func (*Table) ToSimpleMap

func (t *Table) ToSimpleMap() *types2.OrderedMap[string, any]

func (*Table) WithoutColumns

func (t *Table) WithoutColumns() *Table

type TableField

type TableField struct {
	Field string `json:"field,omitempty"`
	Type  string `json:"type,omitempty"`
	Value any    `json:"value,omitempty"`
}

TableField is a table column representation

type TableHelper

type TableHelper struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

TableHelper keeps table schema state in memory and updates it according to incoming new data. It assumes that all tables are in one destination schema. Note: after any external changes in the database, the table version in Service needs to be incremented.

func NewTableHelper

func NewTableHelper(maxIdentifierLength int, identifierQuoteChar rune) TableHelper

NewTableHelper returns a configured TableHelper instance. Note: columnTypesMapping must not be empty (or fields will be ignored).

func (*TableHelper) ColumnName

func (th *TableHelper) ColumnName(columnName string) string

func (*TableHelper) EnsureTableWithCaching

func (th *TableHelper) EnsureTableWithCaching(ctx context.Context, sqlAdapter SQLAdapter, destinationID string, dataSchema *Table) (*Table, error)

EnsureTableWithCaching calls ensureTable with cacheTable = true. It is used in stream destinations (because there is no time to select the table schema, but there is a retry on error).

func (*TableHelper) EnsureTableWithoutCaching

func (th *TableHelper) EnsureTableWithoutCaching(ctx context.Context, sqlAdapter SQLAdapter, destinationID string, dataSchema *Table) (*Table, error)

EnsureTableWithoutCaching calls ensureTable with cacheTable = false. It is used in batch destinations and syncStore (because there is time to select the table schema).

func (*TableHelper) Get

func (th *TableHelper) Get(ctx context.Context, sqlAdapter SQLAdapter, namespace string, tableName string, cacheTable bool) (*Table, error)

func (*TableHelper) GetCached

func (th *TableHelper) GetCached(tableName string) (*Table, bool)

func (*TableHelper) MapSchema

func (th *TableHelper) MapSchema(sqlAdapter SQLAdapter, schema types2.Schema, nameTransformer func(string) string) *Table

MapSchema maps a types.Schema into a Table (structure with SQL types).

func (*TableHelper) MapTableSchema

func (th *TableHelper) MapTableSchema(sqlAdapter SQLAdapter, batchHeader *TypesHeader, object types2.Object, pkColumns []string, timestampColumn string, namespace string) (*Table, types2.Object)

MapTableSchema maps a TypesHeader (JSON structure with JSON data types) into a Table (structure with SQL types), applies the column types mapping, and adjusts object property names to column names.

func (*TableHelper) TableName

func (th *TableHelper) TableName(tableName string) string

type TableStatementFactory

type TableStatementFactory struct {
	// contains filtered or unexported fields
}

TableStatementFactory is used for creating CREATE TABLE statements depending on config.

func NewTableStatementFactory

func NewTableStatementFactory(ch ClickHouseCluster) *TableStatementFactory

func (TableStatementFactory) CreateTableStatement

func (tsf TableStatementFactory) CreateTableStatement(namespacePrefix, quotedTableName, tableName, columnsClause string, table *Table) string

CreateTableStatement returns the ClickHouse DDL for a CREATE TABLE statement.

type TransactionalStream

type TransactionalStream struct {
	*AbstractTransactionalSQLStream
}

TODO: Use real temporary tables

func (*TransactionalStream) Complete

func (ps *TransactionalStream) Complete(ctx context.Context) (state bulker.State, err error)

type TxOrDB

type TxOrDB interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
	PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
}

type TxSQLAdapter

type TxSQLAdapter struct {
	// contains filtered or unexported fields
}

func (*TxSQLAdapter) BuildConstraintName

func (tx *TxSQLAdapter) BuildConstraintName(tableName string) string

func (*TxSQLAdapter) ColumnName

func (tx *TxSQLAdapter) ColumnName(identifier string) string

func (*TxSQLAdapter) Commit

func (tx *TxSQLAdapter) Commit() error

func (*TxSQLAdapter) CopyTables

func (tx *TxSQLAdapter) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (bulker.WarehouseState, error)

func (*TxSQLAdapter) Count

func (tx *TxSQLAdapter) Count(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions) (int, error)

func (*TxSQLAdapter) CreateTable

func (tx *TxSQLAdapter) CreateTable(ctx context.Context, schemaToCreate *Table) error

func (*TxSQLAdapter) DefaultNamespace

func (tx *TxSQLAdapter) DefaultNamespace() string

func (*TxSQLAdapter) Delete

func (tx *TxSQLAdapter) Delete(ctx context.Context, namespace string, tableName string, deleteConditions *WhenConditions) error
// Update delegates to the wrapped adapter's Update, binding the call to this
// transaction by storing tx.tx in the context under ContextTransactionKey.
//
// NOTE(review): the SQLAdapter interface in this package declares no Update
// method, and SQLAdapterBase.Update takes a *Table rather than a table name —
// confirm that tx.sqlAdapter actually exposes this exact signature.
func (tx *TxSQLAdapter) Update(ctx context.Context, tableName string, object types.Object, whenConditions *WhenConditions) error {
	return tx.sqlAdapter.Update(context.WithValue(ctx, ContextTransactionKey, tx.tx), tableName, object, whenConditions)
}

func (*TxSQLAdapter) Drop

func (tx *TxSQLAdapter) Drop(ctx context.Context, table *Table, ifExists bool) error

func (*TxSQLAdapter) DropTable

func (tx *TxSQLAdapter) DropTable(ctx context.Context, namespace string, tableName string, ifExists bool) error

func (*TxSQLAdapter) GetAvroSchema

func (tx *TxSQLAdapter) GetAvroSchema(table *Table) *types2.AvroSchema

func (*TxSQLAdapter) GetAvroType

func (tx *TxSQLAdapter) GetAvroType(sqlType string) (any, bool)

func (*TxSQLAdapter) GetBatchFileCompression

func (tx *TxSQLAdapter) GetBatchFileCompression() types2.FileCompression

func (*TxSQLAdapter) GetBatchFileFormat

func (tx *TxSQLAdapter) GetBatchFileFormat() types2.FileFormat

func (*TxSQLAdapter) GetDataType

func (tx *TxSQLAdapter) GetDataType(sqlType string) (types2.DataType, bool)

func (*TxSQLAdapter) GetSQLType

func (tx *TxSQLAdapter) GetSQLType(dataType types2.DataType) (string, bool)

func (*TxSQLAdapter) GetTableSchema

func (tx *TxSQLAdapter) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)

func (*TxSQLAdapter) InitDatabase

func (tx *TxSQLAdapter) InitDatabase(ctx context.Context) error

func (*TxSQLAdapter) Insert

func (tx *TxSQLAdapter) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error

func (*TxSQLAdapter) LoadTable

func (tx *TxSQLAdapter) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (bulker.WarehouseState, error)

func (*TxSQLAdapter) OpenTx

func (tx *TxSQLAdapter) OpenTx(ctx context.Context) (*TxSQLAdapter, error)

func (*TxSQLAdapter) PatchTableSchema

func (tx *TxSQLAdapter) PatchTableSchema(ctx context.Context, patchTable *Table) error

func (*TxSQLAdapter) Ping

func (tx *TxSQLAdapter) Ping(ctx context.Context) error

func (*TxSQLAdapter) ReplaceTable

func (tx *TxSQLAdapter) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) error

func (*TxSQLAdapter) Rollback

func (tx *TxSQLAdapter) Rollback() error

func (*TxSQLAdapter) Select

func (tx *TxSQLAdapter) Select(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)

func (*TxSQLAdapter) StringifyObjects

func (tx *TxSQLAdapter) StringifyObjects() bool

func (*TxSQLAdapter) TableHelper

func (tx *TxSQLAdapter) TableHelper() *TableHelper

func (*TxSQLAdapter) TableName

func (tx *TxSQLAdapter) TableName(identifier string) string

func (*TxSQLAdapter) TmpNamespace

func (tx *TxSQLAdapter) TmpNamespace(targetNamespace string) string

func (*TxSQLAdapter) TruncateTable

func (tx *TxSQLAdapter) TruncateTable(ctx context.Context, namespace string, tableName string) error

func (*TxSQLAdapter) Type

func (tx *TxSQLAdapter) Type() string

type TxWrapper

type TxWrapper struct {
	// contains filtered or unexported fields
}

TxWrapper is an SQL transaction wrapper. It is used for handling and logging errors with the db type (postgres, mySQL, redshift or snowflake) on Commit() and Rollback() calls.

func NewCustomWrapper

func NewCustomWrapper(dbType string, custom io.Closer, err error) *TxWrapper

func NewDbWrapper

func NewDbWrapper(dbType string, db DB, queryLogger *logging.QueryLogger, errorAdapter ErrorAdapter, closeDb bool) *TxWrapper

func NewDummyTxWrapper

func NewDummyTxWrapper(dbType string) *TxWrapper

func NewTxWrapper

func NewTxWrapper(dbType string, tx *sql.Tx, queryLogger *logging.QueryLogger, errorAdapter ErrorAdapter) *TxWrapper

func (*TxWrapper) Commit

func (t *TxWrapper) Commit() error

Commit commits the underlying transaction and returns an error if one occurred.

func (*TxWrapper) ExecContext

func (t *TxWrapper) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)

ExecContext executes a query that doesn't return rows. For example: an INSERT and UPDATE.

func (*TxWrapper) GetCustom

func (t *TxWrapper) GetCustom() (io.Closer, error)

func (*TxWrapper) PrepareContext

func (t *TxWrapper) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)

PrepareContext creates a prepared statement for use within a transaction.

The returned statement operates within the transaction and will be closed when the transaction has been committed or rolled back.

To use an existing prepared statement on this transaction, see Tx.Stmt.

The provided context will be used for the preparation of the statement, not for the execution of the returned statement. The returned statement will run in the transaction context.

func (*TxWrapper) QueryContext

func (t *TxWrapper) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)

QueryContext executes a query that returns rows, typically a SELECT.

func (*TxWrapper) QueryRowContext

func (t *TxWrapper) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row

QueryRowContext executes a query that is expected to return at most one row. QueryRowContext always returns a non-nil value. Errors are deferred until Row's Scan method is called. If the query selects no rows, the *Row's Scan will return ErrNoRows. Otherwise, the *Row's Scan scans the first selected row and discards the rest.

func (*TxWrapper) Rollback

func (t *TxWrapper) Rollback() error

Rollback cancels the underlying transaction and logs a system error if one occurred.

type TypeCastFunction

type TypeCastFunction func(placeholder string, column types2.SQLColumn) string

TypeCastFunction wraps a parameter (or placeholder) in a type cast expression if necessary (e.g. on type overrides).

type TypeResolver

type TypeResolver interface {
	Resolve(object map[string]any, sqlTypeHints types2.SQLTypes) (Fields, error)
}

TypeResolver resolves Fields from an input object.

type TypeResolverImpl

type TypeResolverImpl struct {
}

TypeResolverImpl resolves types based on converter.go rules

func NewTypeResolver

func NewTypeResolver() *TypeResolverImpl

NewTypeResolver returns TypeResolverImpl

func (*TypeResolverImpl) Resolve

func (tr *TypeResolverImpl) Resolve(object types2.Object, sqlTypeHints types2.SQLTypes) (Fields, error)

Resolve returns the Fields representation of the input object. It applies the default typecast and defines column types, reformats json.Number values into int64 or float64 and puts them back, and reformats strings containing timestamps into time.Time and puts them back.

type TypesHeader

type TypesHeader struct {
	TableName string
	Fields    Fields
	Partition DatePartition
}

TypesHeader is the schema result of parsing JSON objects

func ProcessEvents

func ProcessEvents(tableName string, event types.Object, customTypes types.SQLTypes, nameTransformer func(string) string, omitNils bool, stringifyObjects bool, notFlatteningKeys types2.Set[string]) (*TypesHeader, types.Object, error)

ProcessEvents processes an event object without applying mapping rules. It returns the table header and the processed object, or an error if one occurred.

func (*TypesHeader) Exists

func (bh *TypesHeader) Exists() bool

Exists returns true if there is at least one field

type ValueMappingFunction

type ValueMappingFunction func(value any, valuePresent bool, column types2.SQLColumn) any

ValueMappingFunction maps an object value to a database value, for cases such as default value substitution for null or missing values.

type WhenCondition

type WhenCondition struct {
	Field  string
	Value  any
	Clause string
}

WhenCondition is a representation of an SQL delete condition.

type WhenConditions

type WhenConditions struct {
	Conditions    []WhenCondition
	JoinCondition string
}

WhenConditions is a DTO for multiple WhenCondition instances joined with JoinCondition.

func ByPartitionId

func ByPartitionId(partitonId string) *WhenConditions

ByPartitionId returns a delete condition that removes objects based on the __partition_id value, or an empty condition if partitonId is empty.

func NewWhenConditions

func NewWhenConditions(field string, clause string, value any) *WhenConditions

func (*WhenConditions) Add

func (dc *WhenConditions) Add(field string, clause string, value any) *WhenConditions

func (*WhenConditions) IsEmpty

func (dc *WhenConditions) IsEmpty() bool

IsEmpty returns true if there are no conditions.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL