Documentation ¶
Index ¶
- Constants
- Variables
- func Datatype(in interface{}) string
- func DestStat(statType string, statName string, id string) stats.RudderStats
- func DoubleQuoteAndJoinByComma(strs []string) string
- func GetAzureBlobLocation(location string) string
- func GetAzureBlobLocationFolder(location string) string
- func GetConfigValue(key string, warehouse WarehouseT) (val string)
- func GetConfigValueAsMap(key string, config map[string]interface{}) map[string]interface{}
- func GetConfigValueBoolString(key string, warehouse WarehouseT) string
- func GetFirstTiming(str sql.NullString) (status string, recordedTime time.Time)
- func GetGCSLocation(location string, options GCSLocationOptionsT) string
- func GetGCSLocationFolder(location string, options GCSLocationOptionsT) string
- func GetGCSLocations(loadFiles []LoadFileT, options GCSLocationOptionsT) (gcsLocations []string)
- func GetIP() string
- func GetLastFailedStatus(str sql.NullString) (status string)
- func GetLastTiming(str sql.NullString) (status string, recordedTime time.Time)
- func GetLoadFileGenTime(str sql.NullString) (t time.Time)
- func GetNamespace(source backendconfig.SourceT, destination backendconfig.DestinationT, ...) (namespace string, exists bool)
- func GetObjectFolder(provider string, location string) (folder string)
- func GetObjectLocation(provider string, location string) (folder string)
- func GetObjectName(location string, providerConfig interface{}, objectProvider string) (objectName string, err error)
- func GetParquetValue(val interface{}, colType string) (retVal interface{}, err error)
- func GetS3Location(location string) (s3Location string, region string)
- func GetS3LocationFolder(location string) string
- func GetSlaveWorkerId(workerIdx int, slaveID string) string
- func GetTableFirstEventAt(dbHandle *sql.DB, sourceId string, destinationId string, tableName string, ...) (firstEventAt string)
- func GetTablePathInObjectStorage(namespace string, tableName string) string
- func GetTempFileExtension(destType string) string
- func GetTimeWindow(ts time.Time) time.Time
- func GetWarehouseIdentifier(destType string, sourceID string, destinationID string) string
- func IDResolutionEnabled() bool
- func IdentityMappingsTableName(warehouse WarehouseT) string
- func IdentityMappingsUniqueMappingConstraintName(warehouse WarehouseT) string
- func IdentityMappingsWarehouseTableName(provider string) string
- func IdentityMergeRulesTableName(warehouse WarehouseT) string
- func IdentityMergeRulesWarehouseTableName(provider string) string
- func JSONSchemaToMap(rawMsg json.RawMessage) map[string]map[string]string
- func JSONTimingsToMap(rawMsg json.RawMessage) []map[string]string
- func NewCsvReader(r io.Reader) *csvReader
- func NewEventReader(r io.Reader, provider string) eventReader
- func NewJSONReader(r io.Reader) *jsonReader
- func ObjectStorageType(destType string, config interface{}, useRudderStorage bool) string
- func SnowflakeCloudProvider(config interface{}) string
- func SortColumnKeysFromColumnMap(columnMap map[string]string) []string
- func TimingFromJSONString(str sql.NullString) (status string, recordedTime time.Time)
- func ToProviderCase(provider string, str string) string
- func ToSafeNamespace(provider string, name string) string
- type CsvLoader
- func (loader *CsvLoader) AddColumn(columnName string, columnType string, val interface{})
- func (loader *CsvLoader) AddEmptyColumn(columnName string)
- func (loader *CsvLoader) AddRow(columnNames []string, row []string)
- func (loader *CsvLoader) GetLoadTimeFomat(columnName string) string
- func (loader *CsvLoader) IsLoadTimeColumn(columnName string) bool
- func (loader *CsvLoader) Write() error
- func (loader *CsvLoader) WriteToString() (string, error)
- type DestinationT
- type EventLoader
- type GCSLocationOptionsT
- type GetLoadFilesOptionsT
- type JsonLoader
- func (loader *JsonLoader) AddColumn(columnName string, columnType string, val interface{})
- func (loader *JsonLoader) AddEmptyColumn(columnName string)
- func (loader *JsonLoader) AddRow(columnNames []string, row []string)
- func (loader *JsonLoader) GetLoadTimeFomat(columnName string) string
- func (loader *JsonLoader) IsLoadTimeColumn(columnName string) bool
- func (loader *JsonLoader) Write() error
- func (loader *JsonLoader) WriteToString() (string, error)
- type LoadFileT
- type LoadFileWriterI
- type ParquetLoader
- func (loader *ParquetLoader) AddColumn(columnName string, colType string, val interface{})
- func (loader *ParquetLoader) AddEmptyColumn(columnName string)
- func (loader *ParquetLoader) AddRow(columnNames []string, row []string)
- func (loader *ParquetLoader) GetLoadTimeFomat(columnName string) string
- func (loader *ParquetLoader) IsLoadTimeColumn(columnName string) bool
- func (loader *ParquetLoader) Write() error
- func (loader *ParquetLoader) WriteToString() (string, error)
- type ParquetWriter
- type PendingEventsRequestT
- type PendingEventsResponseT
- type QueryResult
- type SchemaT
- type StagingFileT
- type TableSchemaDiffT
- type TableSchemaT
- type TriggerUploadRequestT
- type UploaderI
- type WarehouseT
Constants ¶
const ( PARQUET_INT_64 = "type=INT64, repetitiontype=OPTIONAL" PARQUET_BOOLEAN = "type=BOOLEAN, repetitiontype=OPTIONAL" PARQUET_DOUBLE = "type=DOUBLE, repetitiontype=OPTIONAL" PARQUET_STRING = "type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL" PARQUET_TIMESTAMP_MICROS = "type=INT64, convertedtype=TIMESTAMP_MICROS, repetitiontype=OPTIONAL" )
const ( RS = "RS" BQ = "BQ" SNOWFLAKE = "SNOWFLAKE" POSTGRES = "POSTGRES" CLICKHOUSE = "CLICKHOUSE" MSSQL = "MSSQL" AZURE_SYNAPSE = "AZURE_SYNAPSE" )
const ( StagingFileSucceededState = "succeeded" StagingFileFailedState = "failed" StagingFileExecutingState = "executing" StagingFileAbortedState = "aborted" StagingFileWaitingState = "waiting" )
const ( WarehouseStagingFilesTable = "wh_staging_files" WarehouseLoadFilesTable = "wh_load_files" WarehouseUploadsTable = "wh_uploads" WarehouseTableUploadsTable = "wh_table_uploads" WarehouseSchemasTable = "wh_schemas" )
warehouse table names
const ( DiscardsTable = "rudder_discards" IdentityMergeRulesTable = "rudder_identity_merge_rules" IdentityMappingsTable = "rudder_identity_mappings" SyncFrequency = "syncFrequency" SyncStartAt = "syncStartAt" ExcludeWindow = "excludeWindow" ExcludeWindowStartTime = "excludeWindowStartTime" ExcludeWindowEndTime = "excludeWindowEndTime" )
const ( UsersTable = "users" IdentifiesTable = "identifies" )
const ( BQLoadedAtFormat = "2006-01-02 15:04:05.999999 Z" BQUuidTSFormat = "2006-01-02 15:04:05 Z" DatalakeTimeWindowFormat = "2006/01/02/15" )
const ( LOAD_FILE_TYPE_CSV = "csv" LOAD_FILE_TYPE_JSON = "json" LOAD_FILE_TYPE_PARQUET = "parquet" )
const LOADED_AT_COLUMN = "loaded_at"
const UUID_TS_COLUMN = "uuid_ts"
Variables ¶
var ( IdentityEnabledWarehouses []string AWSCredsExpiryInS int64 )
var DiscardsSchema = map[string]string{
"table_name": "string",
"row_id": "string",
"column_name": "string",
"column_value": "string",
"received_at": "datetime",
"uuid_ts": "datetime",
}
var ObjectStorageMap = map[string]string{
"RS": "S3",
"S3_DATALAKE": "S3",
"BQ": "GCS",
}
var ReservedKeywords = map[string]map[string]bool{ "SNOWFLAKE": { "ACCOUNT": true, "ALL": true, "ALTER": true, "AND": true, "ANY": true, "AS": true, "BETWEEN": true, "BY": true, "CASE": true, "CAST": true, "CHECK": true, "COLUMN": true, "CONNECT": true, "CONNECTION": true, "CONSTRAINT": true, "CREATE": true, "CROSS": true, "CURRENT": true, "CURRENT_DATE": true, "CURRENT_TIME": true, "CURRENT_TIMESTAMP": true, "CURRENT_USER": true, "DATABASE": true, "DELETE": true, "DISTINCT": true, "DROP": true, "ELSE": true, "EXISTS": true, "FALSE": true, "FOLLOWING": true, "FOR": true, "FROM": true, "FULL": true, "GRANT": true, "GROUP": true, "GSCLUSTER": true, "HAVING": true, "ILIKE": true, "IN": true, "INCREMENT": true, "INNER": true, "INSERT": true, "INTERSECT": true, "INTO": true, "IS": true, "ISSUE": true, "JOIN": true, "LATERAL": true, "LEFT": true, "LIKE": true, "LOCALTIME": true, "LOCALTIMESTAMP": true, "MINUS": true, "NATURAL": true, "NOT": true, "NULL": true, "OF": true, "ON": true, "OR": true, "ORDER": true, "ORGANIZATION": true, "QUALIFY": true, "REGEXP": true, "REVOKE": true, "RIGHT": true, "RLIKE": true, "ROW": true, "ROWS": true, "SAMPLE": true, "SCHEMA": true, "SELECT": true, "SET": true, "SOME": true, "START": true, "TABLE": true, "TABLESAMPLE": true, "THEN": true, "TO": true, "TRIGGER": true, "TRUE": true, "TRY_CAST": true, "UNION": true, "UNIQUE": true, "UPDATE": true, "USING": true, "VALUES": true, "VIEW": true, "WHEN": true, "WHENEVER": true, "WHERE": true, "WITH": true, }, "RS": { "AES128": true, "AES256": true, "ALL": true, "ALLOWOVERWRITE": true, "ANALYSE": true, "ANALYZE": true, "AND": true, "ANY": true, "ARRAY": true, "AS": true, "ASC": true, "AUTHORIZATION": true, "AZ64": true, "BACKUP": true, "BETWEEN": true, "BINARY": true, "BLANKSASNULL": true, "BOTH": true, "BYTEDICT": true, "BZIP2": true, "CASE": true, "CAST": true, "CHECK": true, "COLLATE": true, "COLUMN": true, "CONSTRAINT": true, "CREATE": true, "CREDENTIALS": true, "CROSS": true, 
"CURRENT_DATE": true, "CURRENT_TIME": true, "CURRENT_TIMESTAMP": true, "CURRENT_USER": true, "CURRENT_USER_ID": true, "DEFAULT": true, "DEFERRABLE": true, "DEFLATE": true, "DEFRAG": true, "DELTA": true, "DELTA32K": true, "DESC": true, "DISABLE": true, "DISTINCT": true, "DO": true, "ELSE": true, "EMPTYASNULL": true, "ENABLE": true, "ENCODE": true, "ENCRYPT ": true, "ENCRYPTION": true, "END": true, "EXCEPT": true, "EXPLICIT": true, "FALSE": true, "FOR": true, "FOREIGN": true, "FREEZE": true, "FROM": true, "FULL": true, "GLOBALDICT256": true, "GLOBALDICT64K": true, "GRANT": true, "GROUP": true, "GZIP": true, "HAVING": true, "IDENTITY": true, "IGNORE": true, "ILIKE": true, "IN": true, "INITIALLY": true, "INNER": true, "INTERSECT": true, "INTO": true, "IS": true, "ISNULL": true, "JOIN": true, "LANGUAGE": true, "LEADING": true, "LEFT": true, "LIKE": true, "LIMIT": true, "LOCALTIME": true, "LOCALTIMESTAMP": true, "LUN": true, "LUNS": true, "LZO": true, "LZOP": true, "MINUS": true, "MOSTLY13": true, "MOSTLY32": true, "MOSTLY8": true, "NATURAL": true, "NEW": true, "NOT": true, "NOTNULL": true, "NULL": true, "NULLS": true, "OFF": true, "OFFLINE": true, "OFFSET": true, "OID": true, "OLD": true, "ON": true, "ONLY": true, "OPEN": true, "OR": true, "ORDER": true, "OUTER": true, "OVERLAPS": true, "PARALLEL": true, "PARTITION": true, "PERCENT": true, "PERMISSIONS": true, "PLACING": true, "PRIMARY": true, "RAW": true, "READRATIO": true, "RECOVER": true, "REFERENCES": true, "RESPECT": true, "REJECTLOG": true, "RESORT": true, "RESTORE": true, "RIGHT": true, "SELECT": true, "SESSION_USER": true, "SIMILAR": true, "SNAPSHOT ": true, "SOME": true, "SYSDATE": true, "SYSTEM": true, "TABLE": true, "TAG": true, "TDES": true, "TEXT255": true, "TEXT32K": true, "THEN": true, "TIMESTAMP": true, "TO": true, "TOP": true, "TRAILING": true, "TRUE": true, "TRUNCATECOLUMNS": true, "UNION": true, "UNIQUE": true, "USER": true, "USING": true, "VERBOSE": true, "WALLET": true, "WHEN": true, "WHERE": true, 
"WITH": true, "WITHOUT": true, }, "POSTGRES": { "ALL": true, "ANALYSE": true, "ANALYZE": true, "AND": true, "ANY": true, "ARRAY": true, "AS": true, "ASC": true, "ASYMMETRIC": true, "AUTHORIZATION": true, "BETWEEN": true, "BINARY": true, "BOTH": true, "CASE": true, "CAST": true, "CHECK": true, "COLLATE": true, "COLUMN": true, "CONSTRAINT": true, "CREATE": true, "CROSS": true, "CURRENT_DATE": true, "CURRENT_ROLE": true, "CURRENT_TIME": true, "CURRENT_TIMESTAMP": true, "CURRENT_USER": true, "DEFAULT": true, "DEFERRABLE": true, "DESC": true, "DISTINCT": true, "DO": true, "ELSE": true, "END": true, "EXCEPT": true, "FALSE": true, "FOR": true, "FOREIGN": true, "FREEZE": true, "FROM": true, "FULL": true, "GRANT": true, "GROUP": true, "HAVING": true, "ILIKE": true, "IN": true, "INITIALLY": true, "INNER": true, "INTERSECT": true, "INTO": true, "IS": true, "ISNULL": true, "JOIN": true, "LEADING": true, "LEFT": true, "LIKE": true, "LIMIT": true, "LOCALTIME": true, "LOCALTIMESTAMP": true, "NATURAL": true, "NEW": true, "NOT": true, "NOTNULL": true, "NULL": true, "OFF": true, "OFFSET": true, "OLD": true, "ON": true, "ONLY": true, "OR": true, "ORDER": true, "OUTER": true, "OVERLAPS": true, "PLACING": true, "PRIMARY": true, "REFERENCES": true, "RIGHT": true, "SELECT": true, "SESSION_USER": true, "SIMILAR": true, "SOME": true, "SYMMETRIC": true, "TABLE": true, "THEN": true, "TRAILING": true, "TRUE": true, "UNION": true, "UNIQUE": true, "USER": true, "USING": true, "VERBOSE": true, "WHEN": true, "WHERE": true, }, "BQ": { "ALL": true, "AND": true, "ANY": true, "ARRAY": true, "AS": true, "ASC": true, "ASSERT_ROWS_MODIFIED": true, "AT": true, "BETWEEN": true, "BY": true, "CASE": true, "CAST": true, "COLLATE": true, "CONTAINS": true, "CREATE": true, "CROSS": true, "CUBE": true, "CURRENT": true, "DEFAULT": true, "DEFINE": true, "DESC": true, "DISTINCT": true, "ELSE": true, "END": true, "ENUM": true, "ESCAPE": true, "EXCEPT": true, "EXCLUDE": true, "EXISTS": true, "EXTRACT": true, "FALSE": 
true, "FETCH": true, "FOLLOWING": true, "FOR": true, "FROM": true, "FULL": true, "GROUP": true, "GROUPING": true, "GROUPS": true, "HASH": true, "HAVING": true, "IF": true, "IGNORE": true, "IN": true, "INNER": true, "INTERSECT": true, "INTERVAL": true, "INTO": true, "IS": true, "JOIN": true, "LATERAL": true, "LEFT": true, "LIKE": true, "LIMIT": true, "LOOKUP": true, "MERGE": true, "NATURAL": true, "NEW": true, "NO": true, "NOT": true, "NULL": true, "NULLS": true, "OF": true, "ON": true, "OR": true, "ORDER": true, "OUTER": true, "OVER": true, "PARTITION": true, "PRECEDING": true, "PROTO": true, "RANGE": true, "RECURSIVE": true, "RESPECT": true, "RIGHT": true, "ROLLUP": true, "ROWS": true, "SELECT": true, "SET": true, "SOME": true, "STRUCT": true, "TABLESAMPLE": true, "THEN": true, "TO": true, "TREAT": true, "TRUE": true, "UNBOUNDED": true, "UNION": true, "UNNEST": true, "USING": true, "WHEN": true, "WHERE": true, "WINDOW": true, "WITH": true, "WITHIN": true, }, }
var SnowflakeStorageMap = map[string]string{
"AWS": "S3",
"GCP": "GCS",
"AZURE": "AZURE_BLOB",
}
Functions ¶
func DoubleQuoteAndJoinByComma ¶ added in v0.1.10
func GetAzureBlobLocation ¶
GetAzureBlobLocation parses path-style location http url to return in azure:// format https://myproject.blob.core.windows.net/test-bucket/test-object.csv --> azure://myproject.blob.core.windows.net/test-bucket/test-object.csv
func GetAzureBlobLocationFolder ¶
GetAzureBlobLocationFolder returns the folder path for an azure storage object https://myproject.blob.core.windows.net/test-bucket/myfolder/test-object.csv --> azure://myproject.blob.core.windows.net/test-bucket/myfolder
func GetConfigValue ¶
func GetConfigValue(key string, warehouse WarehouseT) (val string)
func GetConfigValueAsMap ¶ added in v0.1.10
func GetConfigValueBoolString ¶ added in v0.1.10
func GetConfigValueBoolString(key string, warehouse WarehouseT) string
func GetFirstTiming ¶ added in v0.1.10
func GetFirstTiming(str sql.NullString) (status string, recordedTime time.Time)
func GetGCSLocation ¶
func GetGCSLocation(location string, options GCSLocationOptionsT) string
GetGCSLocation parses a path-style location HTTP URL and returns it in gcs:// format, e.g. https://storage.googleapis.com/test-bucket/test-object.csv --> gcs://test-bucket/test-object.csv. The TLDFormat option is used to set the return format "<TLDFormat>://..."
func GetGCSLocationFolder ¶
func GetGCSLocationFolder(location string, options GCSLocationOptionsT) string
GetGCSLocationFolder returns the folder path for a GCS object https://storage.googleapis.com/test-bucket/myfolder/test-object.csv --> gcs://test-bucket/myfolder
func GetGCSLocations ¶
func GetGCSLocations(loadFiles []LoadFileT, options GCSLocationOptionsT) (gcsLocations []string)
func GetLastFailedStatus ¶ added in v0.1.10
func GetLastFailedStatus(str sql.NullString) (status string)
func GetLastTiming ¶ added in v0.1.10
func GetLastTiming(str sql.NullString) (status string, recordedTime time.Time)
func GetLoadFileGenTime ¶ added in v0.1.10
func GetLoadFileGenTime(str sql.NullString) (t time.Time)
func GetNamespace ¶ added in v0.1.10
func GetNamespace(source backendconfig.SourceT, destination backendconfig.DestinationT, dbHandle *sql.DB) (namespace string, exists bool)
func GetObjectFolder ¶
GetObjectFolder returns the folder path for the storage object based on the storage provider eg. For provider as S3: https://test-bucket.s3.amazonaws.com/myfolder/test-object.csv --> s3://test-bucket/myfolder
func GetObjectLocation ¶ added in v0.1.10
GetObjectLocation returns the location of the storage object based on the storage provider eg. For provider as S3: https://test-bucket.s3.amazonaws.com/test-object.csv --> s3://test-bucket/test-object.csv
func GetObjectName ¶ added in v0.1.10
func GetObjectName(location string, providerConfig interface{}, objectProvider string) (objectName string, err error)
GetObjectName extracts the object/key name from different bucket locations, ex: https://bucket-endpoint/bucket-name/object -> object
func GetParquetValue ¶ added in v0.1.10
func GetS3Location ¶
GetS3Location parses path-style location http url to return in s3:// format https://test-bucket.s3.amazonaws.com/test-object.csv --> s3://test-bucket/test-object.csv
func GetS3LocationFolder ¶
GetS3LocationFolder returns the folder path for an s3 object https://test-bucket.s3.amazonaws.com/myfolder/test-object.csv --> s3://test-bucket/myfolder
func GetSlaveWorkerId ¶
func GetTableFirstEventAt ¶ added in v0.1.10
func GetTablePathInObjectStorage ¶ added in v0.1.10
GetTablePathInObjectStorage returns the path of the table relative to the object storage bucket for location - "s3://testbucket/rudder-datalake/namespace/tableName/" - it returns "rudder-datalake/namespace/tableName"
func GetTempFileExtension ¶ added in v0.1.10
func GetWarehouseIdentifier ¶ added in v0.1.10
func IDResolutionEnabled ¶ added in v0.1.10
func IDResolutionEnabled() bool
func IdentityMappingsTableName ¶ added in v0.1.10
func IdentityMappingsTableName(warehouse WarehouseT) string
func IdentityMappingsUniqueMappingConstraintName ¶ added in v0.1.10
func IdentityMappingsUniqueMappingConstraintName(warehouse WarehouseT) string
func IdentityMappingsWarehouseTableName ¶ added in v0.1.10
func IdentityMergeRulesTableName ¶ added in v0.1.10
func IdentityMergeRulesTableName(warehouse WarehouseT) string
func IdentityMergeRulesWarehouseTableName ¶ added in v0.1.10
func JSONSchemaToMap ¶
func JSONSchemaToMap(rawMsg json.RawMessage) map[string]map[string]string
func JSONTimingsToMap ¶ added in v0.1.10
func JSONTimingsToMap(rawMsg json.RawMessage) []map[string]string
func NewCsvReader ¶ added in v0.1.10
func NewEventReader ¶ added in v0.1.10
func NewJSONReader ¶ added in v0.1.10
func ObjectStorageType ¶
func SnowflakeCloudProvider ¶
func SnowflakeCloudProvider(config interface{}) string
func SortColumnKeysFromColumnMap ¶ added in v0.1.10
func TimingFromJSONString ¶ added in v0.1.10
func TimingFromJSONString(str sql.NullString) (status string, recordedTime time.Time)
func ToProviderCase ¶ added in v0.1.10
ToProviderCase converts the provided string to the case generally accepted in the warehouse for table, column, schema names etc. eg. columns are uppercased in SNOWFLAKE and lowercased in REDSHIFT, BIGQUERY etc.
func ToSafeNamespace ¶ added in v0.1.10
ToSafeNamespace converts the name of the namespace to one acceptable by the warehouse: 1. removes symbols and joins continuous letters and numbers with a single underscore; if the first char is a number, an underscore is prepended before the first number 2. adds an underscore if the name is a reserved keyword in the warehouse 3. truncates the length of the namespace to 127 characters 4. returns "stringempty" if the name is empty after conversion. Examples: omega to omega, omega v2 to omega_v2, 9mega to _9mega, mega& to mega, ome$ga to ome_ga, omega$ to omega, ome_ ga to ome_ga, 9mega________-________90 to _9mega_90, Cízǔ to C_z
Types ¶
type CsvLoader ¶ added in v0.1.10
type CsvLoader struct {
// contains filtered or unexported fields
}
CsvLoader is common for non-BQ warehouses. If you need any custom logic, either extend this or use destType and if/else/switch.
func NewCSVLoader ¶ added in v0.1.10
func NewCSVLoader(destType string, writer LoadFileWriterI) *CsvLoader
func (*CsvLoader) AddEmptyColumn ¶ added in v0.1.10
func (*CsvLoader) GetLoadTimeFomat ¶ added in v0.1.10
func (*CsvLoader) IsLoadTimeColumn ¶ added in v0.1.10
func (*CsvLoader) WriteToString ¶ added in v0.1.10
type DestinationT ¶
type DestinationT struct { Source backendconfig.SourceT Destination backendconfig.DestinationT }
type EventLoader ¶ added in v0.1.10
type EventLoader interface { IsLoadTimeColumn(columnName string) bool GetLoadTimeFomat(columnName string) string AddColumn(columnName string, columnType string, val interface{}) AddRow(columnNames []string, values []string) AddEmptyColumn(columnName string) WriteToString() (string, error) Write() error }
func GetNewEventLoader ¶ added in v0.1.10
func GetNewEventLoader(destinationType, loadFileType string, w LoadFileWriterI) EventLoader
type GCSLocationOptionsT ¶
type GCSLocationOptionsT struct {
TLDFormat string
}
type GetLoadFilesOptionsT ¶ added in v0.1.10
type JsonLoader ¶ added in v0.1.10
type JsonLoader struct {
// contains filtered or unexported fields
}
JsonLoader is only for BQ now. Treat this as a custom BQ loader. If more warehouses are added in the future, change this accordingly.
func NewJSONLoader ¶ added in v0.1.10
func NewJSONLoader(destType string, writer LoadFileWriterI) *JsonLoader
func (*JsonLoader) AddColumn ¶ added in v0.1.10
func (loader *JsonLoader) AddColumn(columnName string, columnType string, val interface{})
func (*JsonLoader) AddEmptyColumn ¶ added in v0.1.10
func (loader *JsonLoader) AddEmptyColumn(columnName string)
func (*JsonLoader) AddRow ¶ added in v0.1.10
func (loader *JsonLoader) AddRow(columnNames []string, row []string)
func (*JsonLoader) GetLoadTimeFomat ¶ added in v0.1.10
func (loader *JsonLoader) GetLoadTimeFomat(columnName string) string
func (*JsonLoader) IsLoadTimeColumn ¶ added in v0.1.10
func (loader *JsonLoader) IsLoadTimeColumn(columnName string) bool
func (*JsonLoader) Write ¶ added in v0.1.10
func (loader *JsonLoader) Write() error
func (*JsonLoader) WriteToString ¶ added in v0.1.10
func (loader *JsonLoader) WriteToString() (string, error)
type LoadFileT ¶ added in v0.1.10
type LoadFileT struct { Location string Metadata json.RawMessage }
func GetS3Locations ¶
type LoadFileWriterI ¶ added in v0.1.10
type ParquetLoader ¶ added in v0.1.10
type ParquetLoader struct { Schema []string Values []interface{} FileWriter LoadFileWriterI // contains filtered or unexported fields }
ParquetLoader is used for generating parquet load files.
func NewParquetLoader ¶ added in v0.1.10
func NewParquetLoader(destType string, w LoadFileWriterI) *ParquetLoader
func (*ParquetLoader) AddColumn ¶ added in v0.1.10
func (loader *ParquetLoader) AddColumn(columnName string, colType string, val interface{})
func (*ParquetLoader) AddEmptyColumn ¶ added in v0.1.10
func (loader *ParquetLoader) AddEmptyColumn(columnName string)
func (*ParquetLoader) AddRow ¶ added in v0.1.10
func (loader *ParquetLoader) AddRow(columnNames []string, row []string)
func (*ParquetLoader) GetLoadTimeFomat ¶ added in v0.1.10
func (loader *ParquetLoader) GetLoadTimeFomat(columnName string) string
func (*ParquetLoader) IsLoadTimeColumn ¶ added in v0.1.10
func (loader *ParquetLoader) IsLoadTimeColumn(columnName string) bool
func (*ParquetLoader) Write ¶ added in v0.1.10
func (loader *ParquetLoader) Write() error
func (*ParquetLoader) WriteToString ¶ added in v0.1.10
func (loader *ParquetLoader) WriteToString() (string, error)
type ParquetWriter ¶ added in v0.1.10
type ParquetWriter struct {
// contains filtered or unexported fields
}
func CreateParquetWriter ¶ added in v0.1.10
func CreateParquetWriter(schema TableSchemaT, outputFilePath string, destType string) (*ParquetWriter, error)
func (*ParquetWriter) Close ¶ added in v0.1.10
func (p *ParquetWriter) Close() error
func (*ParquetWriter) GetLoadFile ¶ added in v0.1.10
func (p *ParquetWriter) GetLoadFile() *os.File
func (*ParquetWriter) WriteGZ ¶ added in v0.1.10
func (p *ParquetWriter) WriteGZ(s string) error
func (*ParquetWriter) WriteRow ¶ added in v0.1.10
func (p *ParquetWriter) WriteRow(row []interface{}) error
type PendingEventsRequestT ¶ added in v0.1.10
type PendingEventsResponseT ¶ added in v0.1.10
type QueryResult ¶ added in v0.1.10
type StagingFileT ¶
type StagingFileT struct { Schema map[string]map[string]interface{} BatchDestination DestinationT Location string FirstEventAt string LastEventAt string TotalEvents int UseRudderStorage bool // cloud sources specific info SourceBatchID string SourceTaskID string SourceTaskRunID string SourceJobID string SourceJobRunID string TimeWindow time.Time }
type TableSchemaDiffT ¶ added in v0.1.10
type TableSchemaT ¶ added in v0.1.10
type TriggerUploadRequestT ¶ added in v0.1.10
type UploaderI ¶ added in v0.1.10
type UploaderI interface { GetSchemaInWarehouse() SchemaT GetLocalSchema() SchemaT UpdateLocalSchema(schema SchemaT) error GetTableSchemaInWarehouse(tableName string) TableSchemaT GetTableSchemaInUpload(tableName string) TableSchemaT GetLoadFilesMetadata(options GetLoadFilesOptionsT) []LoadFileT GetSampleLoadFileLocation(tableName string) (string, error) GetSingleLoadFile(tableName string) (LoadFileT, error) ShouldOnDedupUseNewRecord() bool UseRudderStorage() bool GetLoadFileGenStartTIme() time.Time GetLoadFileType() string }
type WarehouseT ¶
type WarehouseT struct { Source backendconfig.SourceT Destination backendconfig.DestinationT Namespace string Type string Identifier string }