Documentation ¶
Index ¶
- Constants
- Variables
- func GranularityToPartitionIds(g Granularity, t time.Time) []string
- func InitTypes(dataTypes map[types2.DataType][]string, supportsJSON bool) (typesMapping map[types2.DataType]string, ...)
- func NewBigquery(bulkerConfig bulker.Config) (bulker.Bulker, error)
- func NewClickHouse(bulkerConfig bulkerlib.Config) (bulkerlib.Bulker, error)
- func NewMySQL(bulkerConfig bulker.Config) (bulker.Bulker, error)
- func NewPostgres(bulkerConfig bulker.Config) (bulker.Bulker, error)
- func NewRedshift(bulkerConfig bulker.Config) (bulker.Bulker, error)
- func NewRedshiftClassic(bulkerConfig bulker.Config) (bulker.Bulker, error)
- func NewRedshiftIAM(bulkerConfig bulker.Config) (bulker.Bulker, error)
- func NewSnowflake(bulkerConfig bulker.Config) (bulker.Bulker, error)
- func ProcessSSL(dir string, dsc *PostgresConfig) error
- func WithColumnType(columnName, sqlType string) bulker.StreamOption
- func WithColumnTypeDDL(columnName, sqlType, ddlType string) bulker.StreamOption
- func WithColumnTypes(fields types.SQLTypes) bulker.StreamOption
- func WithDeduplicateWindow(deduplicateWindow int) bulker.StreamOption
- func WithMaxColumnsCount(maxColumnsCount int) bulker.StreamOption
- func WithOmitNils() bulker.StreamOption
- func WithSchemaFreeze() bulker.StreamOption
- func WithTemporaryBatchSize(temporaryBatchSize int) bulker.StreamOption
- func WithoutOmitNils() bulker.StreamOption
- type AbstractSQLStream
- type AbstractTransactionalSQLStream
- func (ps *AbstractTransactionalSQLStream) Abort(ctx context.Context) (state bulker.State)
- func (ps *AbstractTransactionalSQLStream) Consume(ctx context.Context, object types.Object) (state bulker.State, processedObject types.Object, err error)
- func (ps *AbstractTransactionalSQLStream) ConsumeJSON(ctx context.Context, json []byte) (state bulker.State, processedObject types.Object, err error)
- func (ps *AbstractTransactionalSQLStream) ConsumeMap(ctx context.Context, mp map[string]any) (state bulker.State, processedObject types.Object, err error)
- type AutoCommitStream
- func (ps *AutoCommitStream) Abort(ctx context.Context) (state bulker.State)
- func (ps *AutoCommitStream) Complete(ctx context.Context) (state bulker.State, err error)
- func (ps *AutoCommitStream) Consume(ctx context.Context, object types.Object) (state bulker.State, processedObject types.Object, err error)
- func (ps *AutoCommitStream) ConsumeJSON(ctx context.Context, json []byte) (state bulker.State, processedObject types.Object, err error)
- func (ps *AutoCommitStream) ConsumeMap(ctx context.Context, mp map[string]any) (state bulker.State, processedObject types.Object, err error)
- type BQItem
- type BigQuery
- func (bq *BigQuery) BuildConstraintName(tableName string) string
- func (bq *BigQuery) Close() error
- func (bq *BigQuery) ColumnName(identifier string) string
- func (bq *BigQuery) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulker.WarehouseState, err error)
- func (bq *BigQuery) Count(ctx context.Context, namespace string, tableName string, ...) (int, error)
- func (bq *BigQuery) CreateStream(id, tableName string, mode bulker.BulkMode, ...) (bulker.BulkerStream, error)
- func (bq *BigQuery) CreateTable(ctx context.Context, table *Table) (err error)
- func (bq *BigQuery) DefaultNamespace() string
- func (bq *BigQuery) Delete(ctx context.Context, namespace string, tableName string, ...) (err error)
- func (bq *BigQuery) DeletePartition(ctx context.Context, namespace, tableName string, datePartiton *DatePartition) error
- func (bq *BigQuery) Drop(ctx context.Context, table *Table, ifExists bool) error
- func (bq *BigQuery) DropTable(ctx context.Context, namespace string, tableName string, ifExists bool) error
- func (bq *BigQuery) GetAvroSchema(table *Table) *types2.AvroSchema
- func (bq *BigQuery) GetAvroType(sqlType string) (any, bool)
- func (bq *BigQuery) GetBatchFileCompression() types2.FileCompression
- func (bq *BigQuery) GetBatchFileFormat() types2.FileFormat
- func (bq *BigQuery) GetDataType(sqlType string) (types2.DataType, bool)
- func (bq *BigQuery) GetSQLType(dataType types2.DataType) (string, bool)
- func (bq *BigQuery) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)
- func (bq *BigQuery) InitDatabase(ctx context.Context) error
- func (bq *BigQuery) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) (err error)
- func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)
- func (bq *BigQuery) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
- func (bq *BigQuery) PatchTableSchema(ctx context.Context, patchSchema *Table) error
- func (bq *BigQuery) Ping(ctx context.Context) error
- func (bq *BigQuery) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, ...) (err error)
- func (bq *BigQuery) RunJob(ctx context.Context, runner JobRunner, jobDescription string) (job *bigquery.Job, state bulker.WarehouseState, err error)
- func (bq *BigQuery) Select(ctx context.Context, namespace string, tableName string, ...) ([]map[string]any, error)
- func (bq *BigQuery) StringifyObjects() bool
- func (bq *BigQuery) TableHelper() *TableHelper
- func (bq *BigQuery) TableName(identifier string) string
- func (bq *BigQuery) TmpNamespace(namespace string) string
- func (bq *BigQuery) TruncateTable(ctx context.Context, namespace string, tableName string) error
- func (bq *BigQuery) Type() string
- func (bq *BigQuery) Update(ctx context.Context, namespace, tableName string, object types2.Object, ...) (err error)
- type ClickHouse
- func (ch *ClickHouse) Config() *ClickHouseConfig
- func (ch *ClickHouse) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulkerlib.WarehouseState, err error)
- func (ch *ClickHouse) Count(ctx context.Context, namespace string, tableName string, ...) (int, error)
- func (ch *ClickHouse) CreateStream(id, tableName string, mode bulkerlib.BulkMode, ...) (bulkerlib.BulkerStream, error)
- func (ch *ClickHouse) CreateTable(ctx context.Context, table *Table) error
- func (ch *ClickHouse) Delete(ctx context.Context, namespace string, tableName string, ...) error
- func (ch *ClickHouse) Drop(ctx context.Context, table *Table, ifExists bool) error
- func (ch *ClickHouse) DropTable(ctx context.Context, namespace string, tableName string, ifExists bool) error
- func (ch *ClickHouse) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)
- func (ch *ClickHouse) InitDatabase(ctx context.Context) error
- func (ch *ClickHouse) Insert(ctx context.Context, table *Table, _ bool, objects ...types.Object) (err error)
- func (ch *ClickHouse) IsDistributed() bool
- func (ch *ClickHouse) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulkerlib.WarehouseState, err error)
- func (ch *ClickHouse) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
- func (ch *ClickHouse) PatchTableSchema(ctx context.Context, patchSchema *Table) error
- func (ch *ClickHouse) Ping(_ context.Context) error
- func (ch *ClickHouse) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, ...) (err error)
- func (ch *ClickHouse) Select(ctx context.Context, namespace string, tableName string, ...) ([]map[string]any, error)
- func (ch *ClickHouse) TmpNamespace(string) string
- func (ch *ClickHouse) TruncateTable(ctx context.Context, namespace string, tableName string) error
- func (ch *ClickHouse) Type() string
- type ClickHouseCluster
- type ClickHouseConfig
- type ClickHouseProtocol
- type ColumnDDLFunction
- type ColumnScanner
- type Columns
- type ConWithDB
- func (c *ConWithDB) Close() error
- func (c *ConWithDB) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
- func (c *ConWithDB) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
- func (c *ConWithDB) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
- func (c *ConWithDB) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
- type DB
- type DataSourceConfig
- type DatePartition
- type DbConnectFunction
- type DeduplicationLine
- type DummyTypeResolver
- type EngineConfig
- type ErrorAdapter
- type Field
- type FieldConfig
- type Fields
- type Granularity
- type IdentifierFunction
- type JobRunner
- type LoadSource
- type LoadSourceType
- type MySQL
- func (m *MySQL) BuildConstraintName(tableName string) string
- func (m *MySQL) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (bulker.WarehouseState, error)
- func (m *MySQL) CreateStream(id, tableName string, mode bulker.BulkMode, ...) (bulker.BulkerStream, error)
- func (m *MySQL) CreateTable(ctx context.Context, schemaToCreate *Table) error
- func (m *MySQL) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)
- func (m *MySQL) InitDatabase(ctx context.Context) error
- func (m *MySQL) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error
- func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)
- func (m *MySQL) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
- func (m *MySQL) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, ...) (err error)
- type ParameterPlaceholder
- type Postgres
- func (p *Postgres) Close() error
- func (p *Postgres) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (bulker.WarehouseState, error)
- func (p *Postgres) CreateStream(id, tableName string, mode bulker.BulkMode, ...) (bulker.BulkerStream, error)
- func (p *Postgres) CreateTable(ctx context.Context, schemaToCreate *Table) error
- func (p *Postgres) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)
- func (p *Postgres) InitDatabase(ctx context.Context) error
- func (p *Postgres) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error
- func (p *Postgres) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)
- func (p *Postgres) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
- func (p *Postgres) Ping(ctx context.Context) error
- func (p *Postgres) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, ...) (err error)
- type PostgresConfig
- type QueryPayload
- type Redshift
- func (p *Redshift) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulker.WarehouseState, err error)
- func (p *Redshift) CreateStream(id, tableName string, mode bulker.BulkMode, ...) (bulker.BulkerStream, error)
- func (p *Redshift) CreateTable(ctx context.Context, schemaToCreate *Table) error
- func (p *Redshift) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)
- func (p *Redshift) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error
- func (p *Redshift) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)
- func (p *Redshift) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
- func (p *Redshift) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, ...) (err error)
- func (p *Redshift) TmpNamespace(string) string
- func (p *Redshift) Type() string
- type RedshiftIAM
- func (p *RedshiftIAM) Close() error
- func (p *RedshiftIAM) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulker.WarehouseState, err error)
- func (p *RedshiftIAM) CreateStream(id, tableName string, mode bulker.BulkMode, ...) (bulker.BulkerStream, error)
- func (p *RedshiftIAM) CreateTable(ctx context.Context, schemaToCreate *Table) error
- func (p *RedshiftIAM) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)
- func (p *RedshiftIAM) InitDatabase(ctx context.Context) error
- func (p *RedshiftIAM) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error
- func (p *RedshiftIAM) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)
- func (p *RedshiftIAM) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
- func (p *RedshiftIAM) Ping(ctx context.Context) error
- func (p *RedshiftIAM) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, ...) (err error)
- func (p *RedshiftIAM) Type() string
- type ReplacePartitionStream
- func (ps *ReplacePartitionStream) Complete(ctx context.Context) (state bulker.State, err error)
- func (ps *ReplacePartitionStream) Consume(ctx context.Context, object types.Object) (state bulker.State, processedObjects types.Object, err error)
- func (ps *ReplacePartitionStream) ConsumeJSON(ctx context.Context, json []byte) (state bulker.State, processedObject types.Object, err error)
- func (ps *ReplacePartitionStream) ConsumeMap(ctx context.Context, mp map[string]any) (state bulker.State, processedObject types.Object, err error)
- type ReplaceTableStream
- type RepresentationTable
- type S3OptionConfig
- type SQLAdapter
- type SQLAdapterBase
- func (b *SQLAdapterBase[T]) BuildConstraintName(tableName string) string
- func (b *SQLAdapterBase[T]) Close() error
- func (b *SQLAdapterBase[T]) ColumnName(identifier string) string
- func (b *SQLAdapterBase[T]) Count(ctx context.Context, namespace string, tableName string, ...) (int, error)
- func (b *SQLAdapterBase[T]) CreateTable(ctx context.Context, schemaToCreate *Table) error
- func (b *SQLAdapterBase[T]) DefaultNamespace() string
- func (b *SQLAdapterBase[T]) Delete(ctx context.Context, namespace string, tableName string, ...) error
- func (b *SQLAdapterBase[T]) DeleteAll(ctx context.Context, namespace, tableName string) error
- func (b *SQLAdapterBase[T]) Drop(ctx context.Context, table *Table, ifExists bool) error
- func (b *SQLAdapterBase[T]) DropTable(ctx context.Context, namespace string, tableName string, ifExists bool) error
- func (b *SQLAdapterBase[T]) GetAvroSchema(table *Table) *types2.AvroSchema
- func (b *SQLAdapterBase[T]) GetAvroType(sqlType string) (any, bool)
- func (b *SQLAdapterBase[T]) GetBatchFileCompression() types2.FileCompression
- func (b *SQLAdapterBase[T]) GetBatchFileFormat() types2.FileFormat
- func (b *SQLAdapterBase[T]) GetDataType(sqlType string) (types2.DataType, bool)
- func (b *SQLAdapterBase[T]) GetSQLType(dataType types2.DataType) (string, bool)
- func (b *SQLAdapterBase[T]) PatchTableSchema(ctx context.Context, patchTable *Table) error
- func (b *SQLAdapterBase[T]) Ping(ctx context.Context) error
- func (b *SQLAdapterBase[T]) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, ...) (err error)
- func (b *SQLAdapterBase[T]) Select(ctx context.Context, namespace string, tableName string, ...) ([]map[string]any, error)
- func (b *SQLAdapterBase[T]) StringifyObjects() bool
- func (b *SQLAdapterBase[T]) TableHelper() *TableHelper
- func (b *SQLAdapterBase[T]) TableName(identifier string) string
- func (b *SQLAdapterBase[T]) TmpNamespace(targetNamespace string) string
- func (b *SQLAdapterBase[T]) ToWhenConditions(conditions *WhenConditions, paramExpression ParameterPlaceholder, ...) (string, []any)
- func (b *SQLAdapterBase[T]) TruncateTable(ctx context.Context, namespace string, tableName string) error
- func (b *SQLAdapterBase[T]) Type() string
- func (b *SQLAdapterBase[T]) Update(ctx context.Context, table *Table, object types2.Object, ...) error
- type SSLConfig
- type Snowflake
- func (s *Snowflake) BuildConstraintName(tableName string) string
- func (s *Snowflake) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (bulker.WarehouseState, error)
- func (s *Snowflake) CreateStream(id, tableName string, mode bulker.BulkMode, ...) (bulker.BulkerStream, error)
- func (s *Snowflake) CreateTable(ctx context.Context, schemaToCreate *Table) error
- func (s *Snowflake) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)
- func (s *Snowflake) InitDatabase(ctx context.Context) error
- func (s *Snowflake) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error
- func (s *Snowflake) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)
- func (s *Snowflake) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
- func (s *Snowflake) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, ...) (err error)
- func (s *Snowflake) Select(ctx context.Context, namespace string, tableName string, ...) ([]map[string]any, error)
- type SnowflakeConfig
- type Table
- func (t *Table) CleanClone() *Table
- func (t *Table) Clone() *Table
- func (t *Table) CloneIfNeeded() *Table
- func (t *Table) ColumnNames() []string
- func (t *Table) ColumnsCount() int
- func (t *Table) Diff(sqlAdapter SQLAdapter, another *Table) *Table
- func (t *Table) Exists() bool
- func (t *Table) GetPKFields() []string
- func (t *Table) GetPKFieldsSet() types2.OrderedSet[string]
- func (t *Table) MappedColumnNames(f func(string) string) []string
- func (t *Table) MappedColumns(f func(string, types.SQLColumn) string) []string
- func (t *Table) ToArray() []types.SQLColumn
- func (t *Table) ToSimpleMap() *types2.OrderedMap[string, any]
- func (t *Table) WithoutColumns() *Table
- type TableField
- type TableHelper
- func (th *TableHelper) ColumnName(columnName string) string
- func (th *TableHelper) EnsureTableWithCaching(ctx context.Context, sqlAdapter SQLAdapter, destinationID string, ...) (*Table, error)
- func (th *TableHelper) EnsureTableWithoutCaching(ctx context.Context, sqlAdapter SQLAdapter, destinationID string, ...) (*Table, error)
- func (th *TableHelper) Get(ctx context.Context, sqlAdapter SQLAdapter, namespace string, tableName string, ...) (*Table, error)
- func (th *TableHelper) GetCached(tableName string) (*Table, bool)
- func (th *TableHelper) MapSchema(sqlAdapter SQLAdapter, schema types2.Schema, ...) *Table
- func (th *TableHelper) MapTableSchema(sqlAdapter SQLAdapter, batchHeader *TypesHeader, object types2.Object, ...) (*Table, types2.Object)
- func (th *TableHelper) TableName(tableName string) string
- type TableStatementFactory
- type TransactionalStream
- type TxOrDB
- type TxSQLAdapter
- func (tx *TxSQLAdapter) BuildConstraintName(tableName string) string
- func (tx *TxSQLAdapter) ColumnName(identifier string) string
- func (tx *TxSQLAdapter) Commit() error
- func (tx *TxSQLAdapter) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (bulker.WarehouseState, error)
- func (tx *TxSQLAdapter) Count(ctx context.Context, namespace string, tableName string, ...) (int, error)
- func (tx *TxSQLAdapter) CreateTable(ctx context.Context, schemaToCreate *Table) error
- func (tx *TxSQLAdapter) DefaultNamespace() string
- func (tx *TxSQLAdapter) Delete(ctx context.Context, namespace string, tableName string, ...) error
- func (tx *TxSQLAdapter) Drop(ctx context.Context, table *Table, ifExists bool) error
- func (tx *TxSQLAdapter) DropTable(ctx context.Context, namespace string, tableName string, ifExists bool) error
- func (tx *TxSQLAdapter) GetAvroSchema(table *Table) *types2.AvroSchema
- func (tx *TxSQLAdapter) GetAvroType(sqlType string) (any, bool)
- func (tx *TxSQLAdapter) GetBatchFileCompression() types2.FileCompression
- func (tx *TxSQLAdapter) GetBatchFileFormat() types2.FileFormat
- func (tx *TxSQLAdapter) GetDataType(sqlType string) (types2.DataType, bool)
- func (tx *TxSQLAdapter) GetSQLType(dataType types2.DataType) (string, bool)
- func (tx *TxSQLAdapter) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)
- func (tx *TxSQLAdapter) InitDatabase(ctx context.Context) error
- func (tx *TxSQLAdapter) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error
- func (tx *TxSQLAdapter) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (bulker.WarehouseState, error)
- func (tx *TxSQLAdapter) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
- func (tx *TxSQLAdapter) PatchTableSchema(ctx context.Context, patchTable *Table) error
- func (tx *TxSQLAdapter) Ping(ctx context.Context) error
- func (tx *TxSQLAdapter) ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, ...) error
- func (tx *TxSQLAdapter) Rollback() error
- func (tx *TxSQLAdapter) Select(ctx context.Context, namespace string, tableName string, ...) ([]map[string]any, error)
- func (tx *TxSQLAdapter) StringifyObjects() bool
- func (tx *TxSQLAdapter) TableHelper() *TableHelper
- func (tx *TxSQLAdapter) TableName(identifier string) string
- func (tx *TxSQLAdapter) TmpNamespace(targetNamespace string) string
- func (tx *TxSQLAdapter) TruncateTable(ctx context.Context, namespace string, tableName string) error
- func (tx *TxSQLAdapter) Type() string
- type TxWrapper
- func NewCustomWrapper(dbType string, custom io.Closer, err error) *TxWrapper
- func NewDbWrapper(dbType string, db DB, queryLogger *logging.QueryLogger, ...) *TxWrapper
- func NewDummyTxWrapper(dbType string) *TxWrapper
- func NewTxWrapper(dbType string, tx *sql.Tx, queryLogger *logging.QueryLogger, ...) *TxWrapper
- func (t *TxWrapper) Commit() error
- func (t *TxWrapper) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
- func (t *TxWrapper) GetCustom() (io.Closer, error)
- func (t *TxWrapper) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
- func (t *TxWrapper) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
- func (t *TxWrapper) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
- func (t *TxWrapper) Rollback() error
- type TypeCastFunction
- type TypeResolver
- type TypeResolverImpl
- type TypesHeader
- type ValueMappingFunction
- type WhenCondition
- type WhenConditions
Constants ¶
const (
    BigQueryAutocommitUnsupported = "BigQuery bulker doesn't support auto commit mode as not efficient"
    BigqueryBulkerTypeId          = "bigquery"
)
const (
    SSLModeRequire     string = "require"
    SSLModeDisable     string = "disable"
    SSLModeVerifyCA    string = "verify-ca"
    SSLModeVerifyFull  string = "verify-full"
    SSLModeNotProvided string = ""
)
const BulkerManagedPkConstraintPrefix = "jitsu_pk_"
const (
ClickHouseBulkerTypeId = "clickhouse"
)
const ContextTransactionKey = "transaction"
const (
MySQLBulkerTypeId = "mysql"
)
const (
    // NoNamespaceValue indicates that the table must not use a namespace (schema or db) in queries,
    // e.g. for Redshift, where temporary tables don't belong to any schema
    NoNamespaceValue = "__jitsu_no_namespace__"
)
const PartitonIdKeyword = "__partition_id"
const (
PostgresBulkerTypeId = "postgres"
)
const (
RedshiftBulkerTypeId = "redshift"
)
const (
SnowflakeBulkerTypeId = "snowflake"
)
Variables ¶
var (
    ColumnTypesOption = bulker.ImplementationOption[types.SQLTypes]{
        Key:          "columnTypes",
        DefaultValue: types.SQLTypes{},
        AdvancedParseFunc: func(o *bulker.ImplementationOption[types.SQLTypes], serializedValue any) (bulker.StreamOption, error) {
            switch v := serializedValue.(type) {
            case map[string]any:
                sqlTypes := types.SQLTypes{}
                for key, value := range v {
                    switch t := value.(type) {
                    case string:
                        sqlTypes.With(key, t)
                    case []string:
                        if len(t) == 1 {
                            sqlTypes.With(key, t[0])
                        } else if len(t) == 2 {
                            sqlTypes.WithDDL(key, t[0], t[1])
                        } else {
                            return nil, fmt.Errorf("failed to parse 'columnTypes' option: %v incorrect number of elements. expected 1 or 2", v)
                        }
                    }
                }
                return withColumnTypes(o, sqlTypes), nil
            default:
                return nil, fmt.Errorf("failed to parse 'columnTypes' option: %v incorrect type: %T expected map[string]any", v, v)
            }
        },
    }

    DeduplicateWindow = bulker.ImplementationOption[int]{
        Key:          "deduplicateWindow",
        DefaultValue: 31,
        ParseFunc:    utils.ParseInt,
    }

    OmitNilsOption = bulker.ImplementationOption[bool]{
        Key:          "omitNils",
        DefaultValue: true,
        ParseFunc:    utils.ParseBool,
    }

    SchemaFreezeOption = bulker.ImplementationOption[bool]{
        Key:          "schemaFreeze",
        DefaultValue: false,
        ParseFunc:    utils.ParseBool,
    }

    MaxColumnsCount = bulker.ImplementationOption[int]{
        Key:          "maxColumnsCount",
        DefaultValue: 5000,
        ParseFunc:    utils.ParseInt,
    }

    TemporaryBatchSizeOption = bulker.ImplementationOption[int]{
        Key:          "temporaryBatchSize",
        DefaultValue: 0,
        ParseFunc:    utils.ParseInt,
    }
)
var BigQueryPartitonIdRegex = regexp.MustCompile("(\\w+)/(\\d\\d\\d\\d-\\d\\d-\\d\\dT\\d\\d:\\d\\d:\\d\\dZ)")
var DefaultTypeResolver = NewTypeResolver()
var ErrTableNotExist = errors.New("table doesn't exist")
var IndexParameterPlaceholder = func(i int, name string) string { return "$" + strconv.Itoa(i) }
var NamedParameterPlaceholder = func(i int, name string) string {
return "@" + name
}
var QuestionMarkParameterPlaceholder = func(i int, name string) string {
return "?"
}
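For illustration, each placeholder function above yields a different parameter syntax (the index and column name are arbitrary):

IndexParameterPlaceholder(1, "user_id")        // "$1"       (positional, e.g. Postgres-style)
NamedParameterPlaceholder(1, "user_id")        // "@user_id" (named parameters)
QuestionMarkParameterPlaceholder(1, "user_id") // "?"        (question-mark parameters)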
Functions ¶
func GranularityToPartitionIds ¶
func GranularityToPartitionIds(g Granularity, t time.Time) []string
func NewBigquery ¶
NewBigquery returns configured BigQuery bulker.Bulker instance
func NewClickHouse ¶
NewClickHouse returns configured ClickHouse adapter instance
func NewPostgres ¶
NewPostgres returns configured Postgres bulker.Bulker instance
func NewRedshiftClassic ¶
NewRedshiftClassic returns configured Redshift adapter instance
func NewRedshiftIAM ¶
NewRedshiftIAM returns configured Redshift (IAM authentication) bulker.Bulker instance
func NewSnowflake ¶
NewSnowflake returns configured Snowflake adapter instance
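A minimal sketch of wiring one of these constructors into a write path. The bulker.Config field names and the bulker.Batch mode constant are assumptions; only the NewPostgres, CreateStream, ConsumeJSON and Complete signatures documented on this page are taken as given:

cfg := bulker.Config{
    Id:                "postgres-destination",                                           // assumed field name
    BulkerType:        PostgresBulkerTypeId,                                             // assumed field name
    DestinationConfig: map[string]any{"host": "localhost", "port": 5432, "database": "analytics"}, // assumed field name
}
blk, err := NewPostgres(cfg)
if err != nil {
    panic(err)
}
stream, err := blk.CreateStream("import-1", "events", bulker.Batch) // bulker.Batch is an assumed constant
if err != nil {
    panic(err)
}
ctx := context.Background()
if _, _, err := stream.ConsumeJSON(ctx, []byte(`{"id": 1, "event": "signup"}`)); err != nil {
    panic(err)
}
state, err := stream.Complete(ctx)
_ = state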
func ProcessSSL ¶
func ProcessSSL(dir string, dsc *PostgresConfig) error
ProcessSSL serializes the SSL payload (CA, client cert, key) into files and enriches the input DataSourceConfig parameters with the SSL settings. Each SSL configuration value may be a file path or the literal string content.
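A hedged sketch of preparing a PostgresConfig whose SSL material is processed before connecting (directory, host, and credential values are placeholders):

cfg := &PostgresConfig{
    DataSourceConfig: DataSourceConfig{
        Host:     "db.internal",
        Port:     5432,
        Db:       "analytics",
        Username: "loader",
        Password: "secret",
    },
    SSLConfig: SSLConfig{
        SSLMode:     SSLModeVerifyCA,
        SSLServerCA: "/etc/ssl/certs/server-ca.pem", // may be a file path or literal PEM content
    },
}
// Serialize SSL material into the given directory and enrich connection parameters.
if err := ProcessSSL("/tmp/pg-ssl", cfg); err != nil {
    panic(err)
}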
func WithColumnType ¶
func WithColumnType(columnName, sqlType string) bulker.StreamOption
WithColumnType overrides the SQL type of a single column for the current BulkerStream object fields
func WithColumnTypeDDL ¶
func WithColumnTypeDDL(columnName, sqlType, ddlType string) bulker.StreamOption
WithColumnTypeDDL overrides the SQL type and the DDL type of a single column for the current BulkerStream object fields
func WithColumnTypes ¶
func WithColumnTypes(fields types.SQLTypes) bulker.StreamOption
WithColumnTypes overrides the column types of the current BulkerStream object fields
func WithDeduplicateWindow ¶
func WithDeduplicateWindow(deduplicateWindow int) bulker.StreamOption
func WithMaxColumnsCount ¶
func WithMaxColumnsCount(maxColumnsCount int) bulker.StreamOption
func WithOmitNils ¶
func WithOmitNils() bulker.StreamOption
func WithSchemaFreeze ¶
func WithSchemaFreeze() bulker.StreamOption
func WithTemporaryBatchSize ¶
func WithTemporaryBatchSize(temporaryBatchSize int) bulker.StreamOption
func WithoutOmitNils ¶
func WithoutOmitNils() bulker.StreamOption
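These options are passed as trailing arguments to CreateStream on any adapter. An illustrative combination, assuming blk is a bulker.Bulker obtained from one of the constructors above (table and column names are made up, and bulker.Batch is an assumed mode constant):

stream, err := blk.CreateStream("import-42", "orders", bulker.Batch,
    WithColumnType("metadata", "jsonb"),
    WithColumnTypeDDL("created_at", "timestamp", "timestamp not null"),
    WithDeduplicateWindow(7),
    WithSchemaFreeze(),
    WithMaxColumnsCount(1000),
)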
Types ¶
type AbstractSQLStream ¶
type AbstractSQLStream struct {
// contains filtered or unexported fields
}
type AbstractTransactionalSQLStream ¶
type AbstractTransactionalSQLStream struct {
    *AbstractSQLStream
    // contains filtered or unexported fields
}
func (*AbstractTransactionalSQLStream) Abort ¶
func (ps *AbstractTransactionalSQLStream) Abort(ctx context.Context) (state bulker.State)
func (*AbstractTransactionalSQLStream) ConsumeJSON ¶
type AutoCommitStream ¶
type AutoCommitStream struct {
*AbstractSQLStream
}
func (*AutoCommitStream) Abort ¶
func (ps *AutoCommitStream) Abort(ctx context.Context) (state bulker.State)
func (*AutoCommitStream) ConsumeJSON ¶
type BQItem ¶
type BQItem struct {
// contains filtered or unexported fields
}
BQItem struct for streaming inserts to BigQuery
type BigQuery ¶
BigQuery is an adapter for creating and patching tables (schema or table), inserting data, and copying data from GCS to BigQuery
func (*BigQuery) BuildConstraintName ¶
func (*BigQuery) ColumnName ¶
func (*BigQuery) CopyTables ¶
func (*BigQuery) CreateStream ¶
func (bq *BigQuery) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)
func (*BigQuery) CreateTable ¶
CreateTable creates a Google BigQuery table from the provided Table representation
func (*BigQuery) DefaultNamespace ¶
func (*BigQuery) DeletePartition ¶
func (*BigQuery) DropTable ¶
func (bq *BigQuery) DropTable(ctx context.Context, namespace string, tableName string, ifExists bool) error
DropTable drops table from BigQuery
func (*BigQuery) GetAvroSchema ¶
func (bq *BigQuery) GetAvroSchema(table *Table) *types2.AvroSchema
func (*BigQuery) GetBatchFileCompression ¶
func (bq *BigQuery) GetBatchFileCompression() types2.FileCompression
func (*BigQuery) GetBatchFileFormat ¶
func (bq *BigQuery) GetBatchFileFormat() types2.FileFormat
func (*BigQuery) GetDataType ¶
func (*BigQuery) GetSQLType ¶
func (*BigQuery) GetTableSchema ¶
func (bq *BigQuery) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)
GetTableSchema returns the Google BigQuery table representation (name, columns) wrapped in a Table struct
func (*BigQuery) InitDatabase ¶
InitDatabase creates the Google BigQuery dataset if it doesn't exist
func (*BigQuery) LoadTable ¶
func (bq *BigQuery) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)
func (*BigQuery) PatchTableSchema ¶
PatchTableSchema adds the Table's columns to the Google BigQuery table
func (*BigQuery) ReplaceTable ¶
func (*BigQuery) StringifyObjects ¶
func (*BigQuery) TableHelper ¶
func (bq *BigQuery) TableHelper() *TableHelper
func (*BigQuery) TmpNamespace ¶
func (*BigQuery) TruncateTable ¶
TruncateTable deletes all records in the tableName table
type ClickHouse ¶
type ClickHouse struct {
    *SQLAdapterBase[ClickHouseConfig]
    // contains filtered or unexported fields
}
ClickHouse is an adapter for creating and patching tables (schema or table) and inserting data into ClickHouse
func (*ClickHouse) Config ¶
func (ch *ClickHouse) Config() *ClickHouseConfig
func (*ClickHouse) CopyTables ¶
func (ch *ClickHouse) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulkerlib.WarehouseState, err error)
func (*ClickHouse) Count ¶
func (ch *ClickHouse) Count(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions) (int, error)
func (*ClickHouse) CreateStream ¶
func (ch *ClickHouse) CreateStream(id, tableName string, mode bulkerlib.BulkMode, streamOptions ...bulkerlib.StreamOption) (bulkerlib.BulkerStream, error)
func (*ClickHouse) CreateTable ¶
func (ch *ClickHouse) CreateTable(ctx context.Context, table *Table) error
CreateTable creates a database table with the name and columns provided in the Table representation. New tables use the MergeTree() or ReplicatedMergeTree() engine, depending on whether config.cluster is empty or not.
func (*ClickHouse) Delete ¶
func (ch *ClickHouse) Delete(ctx context.Context, namespace string, tableName string, deleteConditions *WhenConditions) error
func (*ClickHouse) GetTableSchema ¶
func (ch *ClickHouse) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)
GetTableSchema returns the table representation (name, columns with names and types) wrapped in a Table struct
func (*ClickHouse) InitDatabase ¶
func (ch *ClickHouse) InitDatabase(ctx context.Context) error
InitDatabase creates the database instance if it doesn't exist
func (*ClickHouse) IsDistributed ¶
func (ch *ClickHouse) IsDistributed() bool
func (*ClickHouse) LoadTable ¶
func (ch *ClickHouse) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulkerlib.WarehouseState, err error)
LoadTable transfers data from a local file to a ClickHouse table
func (*ClickHouse) OpenTx ¶
func (ch *ClickHouse) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
OpenTx relies on a ClickHouse session: it creates a new connection and wraps it with a TxSQLAdapter, ensuring that all activity happens in a single connection.
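A sketch of the resulting commit/rollback pattern inside a function returning error (only methods documented on TxSQLAdapter are used; table and objects are assumed to exist):

tx, err := ch.OpenTx(ctx)
if err != nil {
    return err
}
if err := tx.Insert(ctx, table, false, objects...); err != nil {
    _ = tx.Rollback()
    return err
}
return tx.Commit()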
func (*ClickHouse) PatchTableSchema ¶
func (ch *ClickHouse) PatchTableSchema(ctx context.Context, patchSchema *Table) error
PatchTableSchema adds new columns (from the provided Table) to the existing table, then drops and recreates the distributed table
func (*ClickHouse) ReplaceTable ¶
func (*ClickHouse) Select ¶
func (ch *ClickHouse) Select(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)
func (*ClickHouse) TmpNamespace ¶
func (ch *ClickHouse) TmpNamespace(string) string
func (*ClickHouse) TruncateTable ¶
TruncateTable deletes all records in the tableName table
func (*ClickHouse) Type ¶
func (ch *ClickHouse) Type() string
type ClickHouseCluster ¶
type ClickHouseCluster interface {
    IsDistributed() bool
    Config() *ClickHouseConfig
}
type ClickHouseConfig ¶
type ClickHouseConfig struct {
    Protocol   ClickHouseProtocol `mapstructure:"protocol,omitempty" json:"protocol,omitempty" yaml:"protocol,omitempty"`
    Hosts      []string           `mapstructure:"hosts,omitempty" json:"hosts,omitempty" yaml:"hosts,omitempty"`
    Parameters map[string]string  `mapstructure:"parameters,omitempty" json:"parameters,omitempty" yaml:"parameters,omitempty"`
    Username   string             `mapstructure:"username,omitempty" json:"username,omitempty" yaml:"username,omitempty"`
    Password   string             `mapstructure:"password,omitempty" json:"password,omitempty" yaml:"password,omitempty"`
    Database   string             `mapstructure:"database,omitempty" json:"database,omitempty" yaml:"database,omitempty"`
    Cluster    string             `mapstructure:"cluster,omitempty" json:"cluster,omitempty" yaml:"cluster,omitempty"`
    TLS        map[string]string  `mapstructure:"tls,omitempty" json:"tls,omitempty" yaml:"tls,omitempty"`
    Engine     *EngineConfig      `mapstructure:"engine,omitempty" json:"engine,omitempty" yaml:"engine,omitempty"`
    LoadAsJSON bool               `mapstructure:"loadAsJson,omitempty" json:"loadAsJson,omitempty" yaml:"loadAsJson,omitempty"`
}
ClickHouseConfig dto for deserialized clickhouse config
func (*ClickHouseConfig) Validate ¶
func (chc *ClickHouseConfig) Validate() error
Validate required fields in ClickHouseConfig
type ClickHouseProtocol ¶
type ClickHouseProtocol string
const (
    ClickHouseProtocolNative ClickHouseProtocol = "clickhouse"
    ClickHouseProtocolSecure ClickHouseProtocol = "clickhouse-secure"
    ClickHouseProtocolHTTP   ClickHouseProtocol = "http"
    ClickHouseProtocolHTTPS  ClickHouseProtocol = "https"
)
type ColumnDDLFunction ¶
ColumnDDLFunction generates column DDL for a CREATE TABLE statement based on the column type (SQLColumn) and whether it is used for the PK
type ColumnScanner ¶
type ColumnScanner struct {
    ColumnType *sql.ColumnType
    // contains filtered or unexported fields
}
func (*ColumnScanner) Get ¶
func (s *ColumnScanner) Get() any
func (*ColumnScanner) Scan ¶
func (s *ColumnScanner) Scan(src any) error
type Columns ¶
type Columns = *types2.OrderedMap[string, types.SQLColumn]
Columns is an ordered map representation of a table's columns
func NewColumns ¶
func NewColumns() Columns
func NewColumnsFromArrays ¶
type ConWithDB ¶
type ConWithDB struct {
// contains filtered or unexported fields
}
func (*ConWithDB) ExecContext ¶
func (*ConWithDB) PrepareContext ¶
func (*ConWithDB) QueryContext ¶
type DataSourceConfig ¶
type DataSourceConfig struct {
    Host       string            `mapstructure:"host,omitempty" json:"host,omitempty" yaml:"host,omitempty"`
    Port       int               `mapstructure:"port,omitempty" json:"port,omitempty" yaml:"port,omitempty"`
    Db         string            `mapstructure:"database,omitempty" json:"database,omitempty" yaml:"database,omitempty"`
    Schema     string            `mapstructure:"defaultSchema,omitempty" json:"defaultSchema,omitempty" yaml:"defaultSchema,omitempty"`
    Username   string            `mapstructure:"username,omitempty" json:"username,omitempty" yaml:"username,omitempty"`
    Password   string            `mapstructure:"password,omitempty" json:"password,omitempty" yaml:"password,omitempty"`
    Parameters map[string]string `mapstructure:"parameters,omitempty" json:"parameters,omitempty" yaml:"parameters,omitempty"`
}
DataSourceConfig dto for deserialized datasource config (e.g. in Postgres or AwsRedshift destination)
func (*DataSourceConfig) Validate ¶
func (dsc *DataSourceConfig) Validate() error
Validate required fields in DataSourceConfig
type DatePartition ¶
type DatePartition struct {
    Field       string
    Value       time.Time
    Granularity Granularity
}
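For example, a month-grained partition descriptor can be passed to BigQuery.DeletePartition to drop one month of data (namespace, table, and field names are illustrative):

partition := &DatePartition{
    Field:       "timestamp",
    Value:       time.Date(2024, time.March, 1, 0, 0, 0, 0, time.UTC),
    Granularity: MONTH,
}
if err := bq.DeletePartition(ctx, "analytics", "events", partition); err != nil {
    return err
}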
type DbConnectFunction ¶
DbConnectFunction is used to connect to the database
type DeduplicationLine ¶
type DeduplicationLine struct {
// contains filtered or unexported fields
}
type DummyTypeResolver ¶
type DummyTypeResolver struct { }
DummyTypeResolver doesn't do anything
func NewDummyTypeResolver ¶
func NewDummyTypeResolver() *DummyTypeResolver
NewDummyTypeResolver returns a DummyTypeResolver
type EngineConfig ¶
type EngineConfig struct {
    RawStatement    string        `mapstructure:"rawStatement,omitempty" json:"rawStatement,omitempty" yaml:"rawStatement,omitempty"`
    NullableFields  []string      `mapstructure:"nullableFields,omitempty" json:"nullableFields,omitempty" yaml:"nullableFields,omitempty"`
    PartitionFields []FieldConfig `mapstructure:"partitionFields,omitempty" json:"partitionFields,omitempty" yaml:"partitionFields,omitempty"`
    OrderFields     []FieldConfig `mapstructure:"orderFields,omitempty" json:"orderFields,omitempty" yaml:"orderFields,omitempty"`
    PrimaryKeys     []string      `mapstructure:"primaryKeys,omitempty" json:"primaryKeys,omitempty" yaml:"primaryKeys,omitempty"`
}
EngineConfig dto for deserialized clickhouse engine config
type ErrorAdapter ¶
ErrorAdapter extracts an implementation-specific payload and adapts it to a standard error
type Field ¶
type Field struct {
// contains filtered or unexported fields
}
Field is a data type holder with sql type suggestion
func NewFieldWithSQLType ¶
NewFieldWithSQLType returns Field instance with configured suggested sql types
func (Field) GetSuggestedSQLType ¶
GetSuggestedSQLType returns suggested SQL type if configured
type FieldConfig ¶
type FieldConfig struct {
    Function string `mapstructure:"function,omitempty" json:"function,omitempty" yaml:"function,omitempty"`
    Field    string `mapstructure:"field,omitempty" json:"field,omitempty" yaml:"field,omitempty"`
}
FieldConfig dto for deserialized clickhouse engine fields
type Granularity ¶
type Granularity string
Granularity is a granularity of TimeInterval
const (
    HOUR    Granularity = "HOUR"
    DAY     Granularity = "DAY"
    WEEK    Granularity = "WEEK"
    MONTH   Granularity = "MONTH"
    QUARTER Granularity = "QUARTER"
    YEAR    Granularity = "YEAR"
    ALL     Granularity = "ALL"
)
func ParseGranularity ¶
func ParseGranularity(s string) (Granularity, error)
ParseGranularity returns Granularity value from string
func (Granularity) Format ¶
func (g Granularity) Format(t time.Time) string
Format returns formatted string value representation
func (Granularity) Lower ¶
func (g Granularity) Lower(t time.Time) time.Time
Lower returns the lower value of interval
func (Granularity) String ¶
func (g Granularity) String() string
String returns string value representation
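A short sketch of working with granularities inside a function returning error (the exact formats of the results are not specified on this page, so the comments only describe their role):

g, err := ParseGranularity("DAY")
if err != nil {
    return err
}
t := time.Date(2024, time.March, 15, 10, 30, 0, 0, time.UTC)
start := g.Lower(t)                    // lower bound of the interval containing t
label := g.Format(t)                   // formatted value for a DAY-grained partition
ids := GranularityToPartitionIds(g, t) // partition id strings derived from t
_, _, _ = start, label, ids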
type IdentifierFunction ¶
type IdentifierFunction func(identifier string, alphanumeric bool) (adapted string, needQuotes bool)
IdentifierFunction adapts an identifier name to the format required by the database, e.g. masks or escapes special characters
type LoadSource ¶
type LoadSource struct {
    Type     LoadSourceType
    Format   types2.FileFormat
    Path     string
    S3Config *S3OptionConfig
}
type LoadSourceType ¶
type LoadSourceType string
const (
    LocalFile        LoadSourceType = "local_file"
    GoogleCloudStore LoadSourceType = "google_cloud_store"
    AmazonS3         LoadSourceType = "amazon_s3"
)
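An illustrative LoadSource for a local batch file, fed to an adapter's LoadTable (the file path, target table, and surrounding function are assumptions; the format matches what the adapter reports for its batch files):

src := &LoadSource{
    Type:   LocalFile,
    Format: adapter.GetBatchFileFormat(),
    Path:   "/tmp/batch-000001.ndjson",
}
state, err := adapter.LoadTable(ctx, targetTable, src)
if err != nil {
    return err
}
_ = state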
type MySQL ¶
type MySQL struct {
    *SQLAdapterBase[DataSourceConfig]
    // contains filtered or unexported fields
}
MySQL is an adapter for creating and patching tables (schema or table) and inserting data into a MySQL database
func (*MySQL) BuildConstraintName ¶
func (*MySQL) CopyTables ¶
func (*MySQL) CreateStream ¶
func (m *MySQL) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)
func (*MySQL) CreateTable ¶
func (*MySQL) GetTableSchema ¶
func (m *MySQL) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)
GetTableSchema returns the table representation (name, columns with names and types) wrapped in a Table struct
func (*MySQL) InitDatabase ¶
InitDatabase creates the database instance if it doesn't exist
func (*MySQL) LoadTable ¶
func (m *MySQL) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)
type ParameterPlaceholder ¶
type Postgres ¶
type Postgres struct {
    *SQLAdapterBase[PostgresConfig]
    // contains filtered or unexported fields
}
Postgres is an adapter for creating and patching tables (schema or table) and inserting data into Postgres
func (*Postgres) CopyTables ¶
func (*Postgres) CreateStream ¶
func (p *Postgres) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)
func (*Postgres) CreateTable ¶
func (*Postgres) GetTableSchema ¶
func (p *Postgres) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)
GetTableSchema returns the table representation (name, columns with names and types) wrapped in a Table struct
func (*Postgres) InitDatabase ¶
InitDatabase creates the database schema if it doesn't exist
func (*Postgres) LoadTable ¶
func (p *Postgres) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)
type PostgresConfig ¶
type PostgresConfig struct {
    DataSourceConfig `mapstructure:",squash"`
    SSLConfig        `mapstructure:",squash"`
}
type QueryPayload ¶
type Redshift ¶
type Redshift struct {
    // Aws Redshift uses Postgres fork under the hood
    *Postgres
    // contains filtered or unexported fields
}
Redshift is an adapter for creating and patching tables (schema or table), inserting data, and copying data from S3 to Redshift
func (*Redshift) CopyTables ¶
func (*Redshift) CreateStream ¶
func (p *Redshift) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)
func (*Redshift) CreateTable ¶
func (*Redshift) GetTableSchema ¶
func (p *Redshift) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)
GetTableSchema returns the table representation (name, columns, primary key) wrapped in a Table struct
func (*Redshift) LoadTable ¶
func (p *Redshift) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)
LoadTable transfers data from S3 to Redshift by issuing a COPY request to Redshift
func (*Redshift) OpenTx ¶
func (p *Redshift) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
OpenTx opens an underlying SQL transaction and returns the wrapped instance
func (*Redshift) ReplaceTable ¶
func (*Redshift) TmpNamespace ¶
type RedshiftIAM ¶
type RedshiftIAM struct {
    *SQLAdapterBase[driver.RedshiftConfig]
}
RedshiftIAM is an adapter for creating and patching tables (schema or table) and inserting data into Redshift using IAM authentication
func (*RedshiftIAM) CopyTables ¶
func (p *RedshiftIAM) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulker.WarehouseState, err error)
func (*RedshiftIAM) CreateStream ¶
func (p *RedshiftIAM) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)
func (*RedshiftIAM) CreateTable ¶
func (p *RedshiftIAM) CreateTable(ctx context.Context, schemaToCreate *Table) error
func (*RedshiftIAM) GetTableSchema ¶
func (p *RedshiftIAM) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)
GetTableSchema returns the table representation (name, columns, primary key) wrapped in a Table struct
func (*RedshiftIAM) InitDatabase ¶
func (p *RedshiftIAM) InitDatabase(ctx context.Context) error
InitDatabase creates the database schema if it doesn't exist
func (*RedshiftIAM) LoadTable ¶
func (p *RedshiftIAM) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)
LoadTable transfers data from S3 to Redshift by issuing a COPY request to Redshift
func (*RedshiftIAM) OpenTx ¶
func (p *RedshiftIAM) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
OpenTx opens an underlying SQL transaction and returns the wrapped instance
func (*RedshiftIAM) ReplaceTable ¶
func (*RedshiftIAM) Type ¶
func (p *RedshiftIAM) Type() string
type ReplacePartitionStream ¶
type ReplacePartitionStream struct {
    *AbstractTransactionalSQLStream
    // contains filtered or unexported fields
}
func (*ReplacePartitionStream) ConsumeJSON ¶
type ReplaceTableStream ¶
type ReplaceTableStream struct {
*AbstractTransactionalSQLStream
}
type RepresentationTable ¶
type S3OptionConfig ¶
type S3OptionConfig struct {
    AuthenticationMethod string        `mapstructure:"authenticationMethod,omitempty" json:"authenticationMethod,omitempty" yaml:"authenticationMethod,omitempty"`
    AccessKeyID          string        `mapstructure:"accessKeyId,omitempty" json:"accessKeyId,omitempty" yaml:"accessKeyId,omitempty"`
    SecretAccessKey      string        `mapstructure:"secretAccessKey,omitempty" json:"secretAccessKey,omitempty" yaml:"secretAccessKey,omitempty"`
    RoleARN              string        `mapstructure:"roleARN,omitempty" json:"roleARN,omitempty" yaml:"roleARN,omitempty"`
    RoleARNExpiry        time.Duration `mapstructure:"roleARNExpiry,omitempty" json:"roleARNExpiry,omitempty" yaml:"roleARNExpiry,omitempty"`
    ExternalID           string        `mapstructure:"externalID,omitempty" json:"externalID,omitempty" yaml:"externalID,omitempty"`
    Bucket               string        `mapstructure:"bucket,omitempty" json:"bucket,omitempty" yaml:"bucket,omitempty"`
    Region               string        `mapstructure:"region,omitempty" json:"region,omitempty" yaml:"region,omitempty"`
    Folder               string        `mapstructure:"folder,omitempty" json:"folder,omitempty" yaml:"folder,omitempty"`
}
type SQLAdapter ¶
type SQLAdapter interface {
    Type() string

    //GetSQLType return mapping from generic bulker type to SQL type specific for this database
    GetSQLType(dataType types2.DataType) (string, bool)
    GetDataType(sqlType string) (types2.DataType, bool)
    GetAvroType(sqlType string) (any, bool)
    GetAvroSchema(table *Table) *types2.AvroSchema
    GetBatchFileFormat() types2.FileFormat
    GetBatchFileCompression() types2.FileCompression
    StringifyObjects() bool
    OpenTx(ctx context.Context) (*TxSQLAdapter, error)
    Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error
    Ping(ctx context.Context) error
    // InitDatabase setups required db objects like 'schema' or 'dataset' if they don't exist
    InitDatabase(ctx context.Context) error
    TableHelper() *TableHelper
    GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)
    CreateTable(ctx context.Context, schemaToCreate *Table) error
    CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (state bulker.WarehouseState, err error)
    LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)
    PatchTableSchema(ctx context.Context, patchTable *Table) error
    TruncateTable(ctx context.Context, namespace string, tableName string) error
    //(ctx context.Context, tableName string, object types.Object, whenConditions *WhenConditions) error
    Delete(ctx context.Context, namespace string, tableName string, deleteConditions *WhenConditions) error
    DropTable(ctx context.Context, namespace string, tableName string, ifExists bool) error
    Drop(ctx context.Context, table *Table, ifExists bool) error
    ReplaceTable(ctx context.Context, targetTableName string, replacementTable *Table, dropOldTable bool) error
    Select(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)
    Count(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions) (int, error)
    // ColumnName adapts column name to sql identifier rules of database
    ColumnName(rawColumn string) string
    // TableName adapts table name to sql identifier rules of database
    TableName(rawTableName string) string
    BuildConstraintName(tableName string) string
    DefaultNamespace() string
    // TmpNamespace returns namespace used by temporary tables, e.g. for warehouses where temporary tables
    // must not be specified with schema or db prefix NoNamespaceValue constant must be used
    TmpNamespace(targetNamespace string) string
}
SQLAdapter is a manager for DWH tables
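Because the interface is shared by all warehouses, cross-warehouse helpers can be written against it. Below is a minimal sketch (not the package's own implementation; TableHelper covers this more completely) that creates a missing table or patches in any new columns:

// ensureTable is an illustrative helper built only on the SQLAdapter interface.
func ensureTable(ctx context.Context, adapter SQLAdapter, desired *Table) (*Table, error) {
    existing, err := adapter.GetTableSchema(ctx, desired.Namespace, desired.Name)
    if err != nil {
        return nil, err
    }
    if !existing.Exists() {
        // Table is missing entirely: create it from the desired schema.
        if err := adapter.CreateTable(ctx, desired); err != nil {
            return nil, err
        }
        return desired, nil
    }
    // Diff returns the columns that must be added to 'existing' to match 'desired'.
    diff := existing.Diff(adapter, desired)
    if diff.ColumnsCount() > 0 {
        if err := adapter.PatchTableSchema(ctx, diff); err != nil {
            return nil, err
        }
    }
    return adapter.GetTableSchema(ctx, desired.Namespace, desired.Name)
}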
type SQLAdapterBase ¶
func (*SQLAdapterBase[T]) BuildConstraintName ¶
func (b *SQLAdapterBase[T]) BuildConstraintName(tableName string) string
func (*SQLAdapterBase[T]) ColumnName ¶
func (b *SQLAdapterBase[T]) ColumnName(identifier string) string
func (*SQLAdapterBase[T]) Count ¶
func (b *SQLAdapterBase[T]) Count(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions) (int, error)
func (*SQLAdapterBase[T]) CreateTable ¶
func (b *SQLAdapterBase[T]) CreateTable(ctx context.Context, schemaToCreate *Table) error
CreateTable creates the table with its columns and primary key. It overrides input table SQL types with the configured cast types and makes fields from the Table's PkFields 'not null'.
func (*SQLAdapterBase[T]) DefaultNamespace ¶
func (b *SQLAdapterBase[T]) DefaultNamespace() string
func (*SQLAdapterBase[T]) Delete ¶
func (b *SQLAdapterBase[T]) Delete(ctx context.Context, namespace string, tableName string, deleteConditions *WhenConditions) error
func (*SQLAdapterBase[T]) DeleteAll ¶
func (b *SQLAdapterBase[T]) DeleteAll(ctx context.Context, namespace, tableName string) error
DeleteAll deletes all records in the tableName table
func (*SQLAdapterBase[T]) GetAvroSchema ¶
func (b *SQLAdapterBase[T]) GetAvroSchema(table *Table) *types2.AvroSchema
func (*SQLAdapterBase[T]) GetAvroType ¶
func (b *SQLAdapterBase[T]) GetAvroType(sqlType string) (any, bool)
func (*SQLAdapterBase[T]) GetBatchFileCompression ¶
func (b *SQLAdapterBase[T]) GetBatchFileCompression() types2.FileCompression
func (*SQLAdapterBase[T]) GetBatchFileFormat ¶
func (b *SQLAdapterBase[T]) GetBatchFileFormat() types2.FileFormat
func (*SQLAdapterBase[T]) GetDataType ¶
func (b *SQLAdapterBase[T]) GetDataType(sqlType string) (types2.DataType, bool)
func (*SQLAdapterBase[T]) GetSQLType ¶
func (b *SQLAdapterBase[T]) GetSQLType(dataType types2.DataType) (string, bool)
func (*SQLAdapterBase[T]) PatchTableSchema ¶
func (b *SQLAdapterBase[T]) PatchTableSchema(ctx context.Context, patchTable *Table) error
PatchTableSchema alters the table by adding columns (if not empty) and recreating the primary key (if not empty), or deletes the primary key if Table.DeletePrimaryKeyNamed is set
func (*SQLAdapterBase[T]) ReplaceTable ¶
func (*SQLAdapterBase[T]) Select ¶
func (b *SQLAdapterBase[T]) Select(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)
func (*SQLAdapterBase[T]) StringifyObjects ¶
func (b *SQLAdapterBase[T]) StringifyObjects() bool
func (*SQLAdapterBase[T]) TableHelper ¶
func (b *SQLAdapterBase[T]) TableHelper() *TableHelper
func (*SQLAdapterBase[T]) TableName ¶
func (b *SQLAdapterBase[T]) TableName(identifier string) string
func (*SQLAdapterBase[T]) TmpNamespace ¶
func (b *SQLAdapterBase[T]) TmpNamespace(targetNamespace string) string
func (*SQLAdapterBase[T]) ToWhenConditions ¶
func (b *SQLAdapterBase[T]) ToWhenConditions(conditions *WhenConditions, paramExpression ParameterPlaceholder, valuesShift int) (string, []any)
ToWhenConditions generates the WHEN clause for an SQL query based on the provided WhenConditions
paramExpression - function that produces a parameter placeholder for a parameterized query; depending on the database it can be IndexParameterPlaceholder, QuestionMarkParameterPlaceholder, or NamedParameterPlaceholder
valuesShift - for a parameterized query, the index of the first WHEN clause value among all values provided to the query (for UPDATE queries, 'valuesShift' = len(object fields))
func (*SQLAdapterBase[T]) TruncateTable ¶
func (*SQLAdapterBase[T]) Type ¶
func (b *SQLAdapterBase[T]) Type() string
Type returns the database adapter type
func (*SQLAdapterBase[T]) Update ¶
func (b *SQLAdapterBase[T]) Update(ctx context.Context, table *Table, object types2.Object, whenConditions *WhenConditions) error
type SSLConfig ¶
type SSLConfig struct {
    SSLMode       string `mapstructure:"sslMode,omitempty"`
    SSLServerCA   string `mapstructure:"sslServerCA,omitempty"`
    SSLClientCert string `mapstructure:"sslClientCert,omitempty"`
    SSLClientKey  string `mapstructure:"sslClientKey,omitempty"`
}
SSLConfig is a dto for deserialized SSL configuration for Postgres
func (*SSLConfig) ValidateSSL ¶
ValidateSSL returns an error if the SSL configuration is invalid
type Snowflake ¶
type Snowflake struct {
    *SQLAdapterBase[SnowflakeConfig]
}
Snowflake is an adapter for creating and patching tables (schema or table) and inserting data into Snowflake
func (*Snowflake) BuildConstraintName ¶
func (*Snowflake) CopyTables ¶
func (*Snowflake) CreateStream ¶
func (s *Snowflake) CreateStream(id, tableName string, mode bulker.BulkMode, streamOptions ...bulker.StreamOption) (bulker.BulkerStream, error)
func (*Snowflake) CreateTable ¶
func (*Snowflake) GetTableSchema ¶
func (s *Snowflake) GetTableSchema(ctx context.Context, namespace string, tableName string) (*Table, error)
GetTableSchema returns the table representation (name, columns with names and types) wrapped in a Table struct
func (*Snowflake) InitDatabase ¶
InitDatabase creates the database schema if it doesn't exist
func (*Snowflake) Insert ¶
func (s *Snowflake) Insert(ctx context.Context, table *Table, merge bool, objects ...types2.Object) error
Insert inserts data into Snowflake as a single object or a batch using InsertContext
func (*Snowflake) LoadTable ¶
func (s *Snowflake) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (state bulker.WarehouseState, err error)
LoadTable transfers data from a local file to Snowflake by issuing a COPY request to Snowflake
func (*Snowflake) OpenTx ¶
func (s *Snowflake) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
OpenTx opens an underlying SQL transaction and returns the wrapped instance
func (*Snowflake) ReplaceTable ¶
type SnowflakeConfig ¶
type SnowflakeConfig struct {
    Account    string             `mapstructure:"account,omitempty" json:"account,omitempty" yaml:"account,omitempty"`
    Port       int                `mapstructure:"port,omitempty" json:"port,omitempty" yaml:"port,omitempty"`
    Db         string             `mapstructure:"database,omitempty" json:"database,omitempty" yaml:"database,omitempty"`
    Schema     string             `mapstructure:"defaultSchema,omitempty" json:"defaultSchema,omitempty" yaml:"defaultSchema,omitempty"`
    Username   string             `mapstructure:"username,omitempty" json:"username,omitempty" yaml:"username,omitempty"`
    Password   string             `mapstructure:"password,omitempty" json:"password,omitempty" yaml:"password,omitempty"`
    Warehouse  string             `mapstructure:"warehouse,omitempty" json:"warehouse,omitempty" yaml:"warehouse,omitempty"`
    Parameters map[string]*string `mapstructure:"parameters,omitempty" json:"parameters,omitempty" yaml:"parameters,omitempty"`
}
SnowflakeConfig dto for deserialized datasource config for Snowflake
func (*SnowflakeConfig) Validate ¶
func (sc *SnowflakeConfig) Validate() error
Validate required fields in SnowflakeConfig
type Table ¶
type Table struct {
    Name string
    // database or schema depending on warehouse
    Namespace             string
    Temporary             bool
    Cached                bool
    Columns               Columns
    PKFields              types2.OrderedSet[string]
    PrimaryKeyName        string
    TimestampColumn       string
    Partition             DatePartition
    DeletePrimaryKeyNamed string
}
Table is a dto for DWH Table representation
func (*Table) CleanClone ¶
CleanClone returns a clone of the current table without the 'New' or 'Override' flags
func (*Table) CloneIfNeeded ¶
func (*Table) ColumnNames ¶
ColumnNames returns the column names as an array
func (*Table) ColumnsCount ¶
func (*Table) Diff ¶
func (t *Table) Diff(sqlAdapter SQLAdapter, another *Table) *Table
Diff calculates the diff between the current schema and another one. It returns the schema to add to the current schema (to make them equal), or an empty schema if 1) the other schema is empty, or 2) all fields of the other schema already exist in the current schema. NOTE: the Diff method doesn't take types into account.
func (*Table) GetPKFields ¶
GetPKFields returns primary keys list
func (*Table) GetPKFieldsSet ¶
func (t *Table) GetPKFieldsSet() types2.OrderedSet[string]
func (*Table) MappedColumns ¶
func (*Table) ToSimpleMap ¶
func (t *Table) ToSimpleMap() *types2.OrderedMap[string, any]
func (*Table) WithoutColumns ¶
type TableField ¶
type TableField struct {
    Field string `json:"field,omitempty"`
    Type  string `json:"type,omitempty"`
    Value any    `json:"value,omitempty"`
}
TableField is a table column representation
type TableHelper ¶
TableHelper keeps table schema state in memory and updates it according to incoming data, assuming that all tables are in one destination schema. Note: after any outside changes to the db, the table version in the Service must be incremented.
func NewTableHelper ¶
func NewTableHelper(maxIdentifierLength int, identifierQuoteChar rune) TableHelper
NewTableHelper returns a configured TableHelper instance. Note: columnTypesMapping must not be empty (otherwise fields will be ignored)
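A small hedged sketch of identifier normalization; the 63-character limit and double-quote rune are hypothetical values roughly matching a Postgres-like destination:

func normalizedIdentifiers() (string, string) {
	// Hypothetical limits: 63-character identifiers quoted with double quotes.
	th := NewTableHelper(63, '"')
	// TableName and ColumnName adjust raw names to the destination's identifier rules.
	return th.TableName("User Events"), th.ColumnName("utm source")
}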
func (*TableHelper) ColumnName ¶
func (th *TableHelper) ColumnName(columnName string) string
func (*TableHelper) EnsureTableWithCaching ¶
func (th *TableHelper) EnsureTableWithCaching(ctx context.Context, sqlAdapter SQLAdapter, destinationID string, dataSchema *Table) (*Table, error)
EnsureTableWithCaching calls ensureTable with cacheTable = true. It is used in stream destinations (there is no time to select the table schema, but errors are retried)
func (*TableHelper) EnsureTableWithoutCaching ¶
func (th *TableHelper) EnsureTableWithoutCaching(ctx context.Context, sqlAdapter SQLAdapter, destinationID string, dataSchema *Table) (*Table, error)
EnsureTableWithoutCaching calls ensureTable with cacheTable = false. It is used in batch destinations and syncStore (there is time to select the table schema)
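A hedged sketch choosing between the two variants, assuming th, adapter and schema were prepared elsewhere:

func ensureTable(ctx context.Context, th *TableHelper, adapter SQLAdapter, destinationID string, schema *Table, batchMode bool) (*Table, error) {
	if batchMode {
		// Batch path: there is time to read the live table schema on each call.
		return th.EnsureTableWithoutCaching(ctx, adapter, destinationID, schema)
	}
	// Stream path: rely on the cached schema; failures are retried upstream.
	return th.EnsureTableWithCaching(ctx, adapter, destinationID, schema)
}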
func (*TableHelper) Get ¶
func (th *TableHelper) Get(ctx context.Context, sqlAdapter SQLAdapter, namespace string, tableName string, cacheTable bool) (*Table, error)
func (*TableHelper) MapSchema ¶
func (th *TableHelper) MapSchema(sqlAdapter SQLAdapter, schema types2.Schema, nameTransformer func(string) string) *Table
MapSchema maps types.Schema into types.Table (structure with SQL types)
func (*TableHelper) MapTableSchema ¶
func (th *TableHelper) MapTableSchema(sqlAdapter SQLAdapter, batchHeader *TypesHeader, object types2.Object, pkColumns []string, timestampColumn string, namespace string) (*Table, types2.Object)
MapTableSchema maps a types.TypesHeader (JSON structure with JSON data types) into a types.Table (structure with SQL types). It applies the column type mappings and adjusts object property names to column names
func (*TableHelper) TableName ¶
func (th *TableHelper) TableName(tableName string) string
type TableStatementFactory ¶
type TableStatementFactory struct {
// contains filtered or unexported fields
}
TableStatementFactory is used for creating CREATE TABLE statements depending on the config
func NewTableStatementFactory ¶
func NewTableStatementFactory(ch ClickHouseCluster) *TableStatementFactory
func (TableStatementFactory) CreateTableStatement ¶
func (tsf TableStatementFactory) CreateTableStatement(namespacePrefix, quotedTableName, tableName, columnsClause string, table *Table) string
CreateTableStatement returns ClickHouse DDL for a CREATE TABLE statement
type TransactionalStream ¶
type TransactionalStream struct {
*AbstractTransactionalSQLStream
}
TODO: Use real temporary tables
type TxOrDB ¶
type TxOrDB interface {
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
	PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
}
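Both *sql.DB and *sql.Tx from database/sql provide exactly these four methods, so either can back the interface. Compile-time assertions (inside the package, with database/sql imported as sql, as the signatures above suggest) make that explicit:

var (
	_ TxOrDB = (*sql.DB)(nil) // a plain connection pool satisfies the interface
	_ TxOrDB = (*sql.Tx)(nil) // and so does an open transaction
)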
type TxSQLAdapter ¶
type TxSQLAdapter struct {
// contains filtered or unexported fields
}
func (*TxSQLAdapter) BuildConstraintName ¶
func (tx *TxSQLAdapter) BuildConstraintName(tableName string) string
func (*TxSQLAdapter) ColumnName ¶
func (tx *TxSQLAdapter) ColumnName(identifier string) string
func (*TxSQLAdapter) Commit ¶
func (tx *TxSQLAdapter) Commit() error
func (*TxSQLAdapter) CopyTables ¶
func (tx *TxSQLAdapter) CopyTables(ctx context.Context, targetTable *Table, sourceTable *Table, mergeWindow int) (bulker.WarehouseState, error)
func (*TxSQLAdapter) Count ¶
func (tx *TxSQLAdapter) Count(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions) (int, error)
func (*TxSQLAdapter) CreateTable ¶
func (tx *TxSQLAdapter) CreateTable(ctx context.Context, schemaToCreate *Table) error
func (*TxSQLAdapter) DefaultNamespace ¶
func (tx *TxSQLAdapter) DefaultNamespace() string
func (*TxSQLAdapter) Delete ¶
func (tx *TxSQLAdapter) Delete(ctx context.Context, namespace string, tableName string, deleteConditions *WhenConditions) error
func (*TxSQLAdapter) Update ¶
func (tx *TxSQLAdapter) Update(ctx context.Context, tableName string, object types.Object, whenConditions *WhenConditions) error {
	ctx = context.WithValue(ctx, ContextTransactionKey, tx.tx)
	return tx.sqlAdapter.Update(ctx, tableName, object, whenConditions)
}
func (*TxSQLAdapter) GetAvroSchema ¶
func (tx *TxSQLAdapter) GetAvroSchema(table *Table) *types2.AvroSchema
func (*TxSQLAdapter) GetAvroType ¶
func (tx *TxSQLAdapter) GetAvroType(sqlType string) (any, bool)
func (*TxSQLAdapter) GetBatchFileCompression ¶
func (tx *TxSQLAdapter) GetBatchFileCompression() types2.FileCompression
func (*TxSQLAdapter) GetBatchFileFormat ¶
func (tx *TxSQLAdapter) GetBatchFileFormat() types2.FileFormat
func (*TxSQLAdapter) GetDataType ¶
func (tx *TxSQLAdapter) GetDataType(sqlType string) (types2.DataType, bool)
func (*TxSQLAdapter) GetSQLType ¶
func (tx *TxSQLAdapter) GetSQLType(dataType types2.DataType) (string, bool)
func (*TxSQLAdapter) GetTableSchema ¶
func (*TxSQLAdapter) InitDatabase ¶
func (tx *TxSQLAdapter) InitDatabase(ctx context.Context) error
func (*TxSQLAdapter) LoadTable ¶
func (tx *TxSQLAdapter) LoadTable(ctx context.Context, targetTable *Table, loadSource *LoadSource) (bulker.WarehouseState, error)
func (*TxSQLAdapter) OpenTx ¶
func (tx *TxSQLAdapter) OpenTx(ctx context.Context) (*TxSQLAdapter, error)
func (*TxSQLAdapter) PatchTableSchema ¶
func (tx *TxSQLAdapter) PatchTableSchema(ctx context.Context, patchTable *Table) error
func (*TxSQLAdapter) ReplaceTable ¶
func (*TxSQLAdapter) Rollback ¶
func (tx *TxSQLAdapter) Rollback() error
func (*TxSQLAdapter) Select ¶
func (tx *TxSQLAdapter) Select(ctx context.Context, namespace string, tableName string, whenConditions *WhenConditions, orderBy []string) ([]map[string]any, error)
func (*TxSQLAdapter) StringifyObjects ¶
func (tx *TxSQLAdapter) StringifyObjects() bool
func (*TxSQLAdapter) TableHelper ¶
func (tx *TxSQLAdapter) TableHelper() *TableHelper
func (*TxSQLAdapter) TableName ¶
func (tx *TxSQLAdapter) TableName(identifier string) string
func (*TxSQLAdapter) TmpNamespace ¶
func (tx *TxSQLAdapter) TmpNamespace(targetNamespace string) string
func (*TxSQLAdapter) TruncateTable ¶
func (*TxSQLAdapter) Type ¶
func (tx *TxSQLAdapter) Type() string
type TxWrapper ¶
type TxWrapper struct {
// contains filtered or unexported fields
}
TxWrapper is a SQL transaction wrapper. It is used to handle and log errors together with the db type (postgres, mySQL, redshift or snowflake) on Commit() and Rollback() calls
func NewCustomWrapper ¶
func NewDbWrapper ¶
func NewDbWrapper(dbType string, db DB, queryLogger *logging.QueryLogger, errorAdapter ErrorAdapter, closeDb bool) *TxWrapper
func NewDummyTxWrapper ¶
func NewTxWrapper ¶
func NewTxWrapper(dbType string, tx *sql.Tx, queryLogger *logging.QueryLogger, errorAdapter ErrorAdapter) *TxWrapper
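A hedged construction sketch; it assumes a nil *logging.QueryLogger and nil ErrorAdapter are tolerated (pass real ones to get query logging and driver-specific error mapping):

func beginWrapped(ctx context.Context, db *sql.DB) (*TxWrapper, error) {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// "postgres" tags errors logged on Commit()/Rollback() with the db type.
	return NewTxWrapper("postgres", tx, nil, nil), nil
}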
func (*TxWrapper) ExecContext ¶
ExecContext executes a query that doesn't return rows. For example: an INSERT or UPDATE.
func (*TxWrapper) PrepareContext ¶
PrepareContext creates a prepared statement for use within a transaction.
The returned statement operates within the transaction and will be closed when the transaction has been committed or rolled back.
To use an existing prepared statement on this transaction, see Tx.Stmt.
The provided context will be used for the preparation of the statement, not for the execution of the returned statement. The returned statement will run in the transaction context.
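A hedged sketch of the prepare-then-execute pattern; the target table and placeholder style are hypothetical, and the statement is typed against TxOrDB so any transaction or connection with these methods works:

func insertBatch(ctx context.Context, tx TxOrDB, rows [][]any) error {
	// Prepared once, executed per row; closed before the transaction ends.
	stmt, err := tx.PrepareContext(ctx, "INSERT INTO events (id, payload) VALUES ($1, $2)")
	if err != nil {
		return err
	}
	defer stmt.Close()
	for _, row := range rows {
		if _, err := stmt.ExecContext(ctx, row...); err != nil {
			return err
		}
	}
	return nil
}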
func (*TxWrapper) QueryContext ¶
QueryContext executes a query that returns rows, typically a SELECT.
func (*TxWrapper) QueryRowContext ¶
QueryRowContext executes a query that is expected to return at most one row. QueryRowContext always returns a non-nil value. Errors are deferred until Row's Scan method is called. If the query selects no rows, the *Row's Scan will return ErrNoRows. Otherwise, the *Row's Scan scans the first selected row and discards the rest.
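A hedged sketch of the deferred-error behavior described above; the table and column names are hypothetical and "errors" and "database/sql" are assumed to be imported:

func lookupID(ctx context.Context, q TxOrDB, key string) (int64, bool, error) {
	var id int64
	// Errors, including sql.ErrNoRows, only surface when Scan is called.
	err := q.QueryRowContext(ctx, "SELECT id FROM events WHERE key = $1", key).Scan(&id)
	if errors.Is(err, sql.ErrNoRows) {
		return 0, false, nil // no matching row
	}
	if err != nil {
		return 0, false, err
	}
	return id, true, nil
}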
type TypeCastFunction ¶
TypeCastFunction wraps a parameter (or placeholder) in a type cast expression when necessary (e.g. for type overrides)
type TypeResolver ¶
type TypeResolver interface {
Resolve(object map[string]any, sqlTypeHints types2.SQLTypes) (Fields, error)
}
TypeResolver resolves types.Fields from input object
type TypeResolverImpl ¶
type TypeResolverImpl struct { }
TypeResolverImpl resolves types based on converter.go rules
func NewTypeResolver ¶
func NewTypeResolver() *TypeResolverImpl
NewTypeResolver returns TypeResolverImpl
func (*TypeResolverImpl) Resolve ¶
func (tr *TypeResolverImpl) Resolve(object types2.Object, sqlTypeHints types2.SQLTypes) (Fields, error)
Resolve returns the types.Fields representation of the input object. It applies the default typecast and defines column types: json.Number values are converted to int64 or float64 and written back, and strings containing timestamps are converted to time.Time and written back
type TypesHeader ¶
type TypesHeader struct {
	TableName string
	Fields    Fields
	Partition DatePartition
}
TypesHeader is the schema result of parsing JSON objects
func ProcessEvents ¶
func ProcessEvents(tableName string, event types.Object, customTypes types.SQLTypes, nameTransformer func(string) string, omitNils bool, stringifyObjects bool, notFlatteningKeys types2.Set[string]) (*TypesHeader, types.Object, error)
ProcessEvents processes event objects without applying mapping rules. It returns the table header and the processed object, or an error if one occurred
func (*TypesHeader) Exists ¶
func (bh *TypesHeader) Exists() bool
Exists returns true if there is at least one field
type ValueMappingFunction ¶
ValueMappingFunction maps an object value to a database value, e.g. substituting a default value for null or missing values
type WhenCondition ¶
WhenCondition is a representation of a SQL delete condition
type WhenConditions ¶
type WhenConditions struct {
	Conditions    []WhenCondition
	JoinCondition string
}
WhenConditions is a dto for multiple WhenCondition instances combined with a join condition
func ByPartitionId ¶
func ByPartitionId(partitonId string) *WhenConditions
ByPartitionId returns a delete condition that removes objects based on the __partition_id value, or an empty condition if partitonId is empty
func NewWhenConditions ¶
func NewWhenConditions(field string, clause string, value any) *WhenConditions
func (*WhenConditions) Add ¶
func (dc *WhenConditions) Add(field string, clause string, value any) *WhenConditions
func (*WhenConditions) IsEmpty ¶
func (dc *WhenConditions) IsEmpty() bool
IsEmpty returns true if there are no conditions
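A hedged sketch of building and applying conditions; the column names and values are hypothetical, and it assumes SQLAdapter exposes Delete the way TxSQLAdapter does:

func deleteStale(ctx context.Context, adapter SQLAdapter, namespace, table string) error {
	// Seed the first condition with NewWhenConditions, append more with Add.
	conds := NewWhenConditions("status", "=", "stale").Add("retries", ">", 3)
	if conds.IsEmpty() {
		return nil
	}
	// Alternatively, ByPartitionId(partitionID) builds a __partition_id-based condition.
	return adapter.Delete(ctx, namespace, table, conds)
}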
Source Files ¶
- abstract.go
- abstract_transactional.go
- autocommit_stream.go
- batch_header.go
- bigquery.go
- clickhouse.go
- datasource_config.go
- delete_condition.go
- mysql.go
- options.go
- postgres.go
- processor.go
- redshift.go
- redshift_iam.go
- replacepartition_stream.go
- replacetable_stream.go
- snowflake.go
- sql_adapter.go
- sql_adapter_base.go
- table.go
- table_helper.go
- transactional_stream.go
- tx_wrapper.go
- type_resolver.go
- utils.go