Documentation ¶
Index ¶
- Constants
- Variables
- func Datatype(in interface{}) string
- func DestStat(statType string, statName string, id string) *stats.RudderStats
- func GetAzureBlobLocation(location string) string
- func GetAzureBlobLocationFolder(location string) string
- func GetConfigValue(key string, warehouse WarehouseT) (val string)
- func GetGCSLocation(location string, options GCSLocationOptionsT) string
- func GetGCSLocationFolder(location string, options GCSLocationOptionsT) string
- func GetGCSLocations(locations []string, options GCSLocationOptionsT) (gcsLocations []string)
- func GetIP() string
- func GetLoadFileLocation(dbHandle *sql.DB, sourceId string, destinationId string, tableName string, ...) (location string, err error)
- func GetLoadFileLocations(dbHandle *sql.DB, sourceId string, destinationId string, tableName string, ...) (locations []string, err error)
- func GetNewTimings(upload UploadT, dbHandle *sql.DB, status string) []byte
- func GetObjectFolder(provider string, location string) (folder string)
- func GetS3Location(location string) (s3Location string, region string)
- func GetS3LocationFolder(location string) string
- func GetS3Locations(locations []string) (s3Locations []string)
- func GetSlaveWorkerId(workerIdx int, slaveID string) string
- func GetTableUploadStatus(uploadID int64, tableName string, dbHandle *sql.DB) (status string, err error)
- func GetUploadTimings(upload UploadT, dbHandle *sql.DB) (timings []map[string]string)
- func HasLoadFiles(dbHandle *sql.DB, sourceId string, destinationId string, tableName string, ...) bool
- func JSONSchemaToMap(rawMsg json.RawMessage) map[string]map[string]string
- func ObjectStorageType(destType string, config interface{}) string
- func SetStagingFilesError(ids []int64, status string, dbHandle *sql.DB, statusError error) (err error)
- func SetStagingFilesStatus(ids []int64, status string, dbHandle *sql.DB) (err error)
- func SetTableUploadError(status string, uploadID int64, tableName string, statusError error, ...) (err error)
- func SetTableUploadStatus(status string, uploadID int64, tableName string, dbHandle *sql.DB) (err error)
- func SetUploadColumns(upload UploadT, dbHandle *sql.DB, fields ...UploadColumnT) (err error)
- func SetUploadError(upload UploadT, statusError error, state string, dbHandle *sql.DB) (err error)
- func SetUploadStatus(upload UploadT, status string, dbHandle *sql.DB, ...) (err error)
- func SnowflakeCloudProvider(config interface{}) string
- func ToCase(provider string, str string) string
- func ToSafeDBString(provider string, str string) string
- func UpdateCurrentSchema(namespace string, wh WarehouseT, uploadID int64, ...) (err error)
- type ConfigT
- type CurrentSchemaT
- type DestinationT
- type GCSLocationOptionsT
- type SchemaDiffT
- type StagingFileT
- type UploadColumnT
- type UploadT
- type WarehouseT
Constants ¶
const ( WaitingState = "waiting" ExecutingState = "executing" GeneratingLoadFileState = "generating_load_file" GeneratingLoadFileFailedState = "generating_load_file_failed" GeneratedLoadFileState = "generated_load_file" UpdatingSchemaState = "updating_schema" UpdatingSchemaFailedState = "updating_schema_failed" UpdatedSchemaState = "updated_schema" ExportingDataState = "exporting_data" ExportingDataFailedState = "exporting_data_failed" ExportedDataState = "exported_data" AbortedState = "aborted" GeneratingStagingFileFailed = "generating_staging_file_failed" GeneratedStagingFile = "generated_staging_file" )
const ( StagingFileSucceededState = "succeeded" StagingFileFailedState = "failed" StagingFileExecutingState = "executing" StagingFileAbortedState = "aborted" StagingFileWaitingState = "waiting" )
const ( DiscardsTable = "rudder_discards" SyncFrequency = "syncFrequency" SyncStartAt = "syncStartAt" )
const ( UploadStatusField = "status" UploadStartLoadFileIDField = "start_load_file_id" UploadEndLoadFileIDField = "end_load_file_id" UploadUpdatedAtField = "updated_at" UploadTimingsField = "timings" UploadSchemaField = "schema" UploadLastExecAtField = "last_exec_at" )
Variables ¶
var ObjectStorageMap = map[string]string{
"RS": "S3",
"BQ": "GCS",
}
var ReservedKeywords = map[string]map[string]bool{ "SNOWFLAKE": { "ACCOUNT": true, "ALL": true, "ALTER": true, "AND": true, "ANY": true, "AS": true, "BETWEEN": true, "BY": true, "CASE": true, "CAST": true, "CHECK": true, "COLUMN": true, "CONNECT": true, "CONNECTION": true, "CONSTRAINT": true, "CREATE": true, "CROSS": true, "CURRENT": true, "CURRENT_DATE": true, "CURRENT_TIME": true, "CURRENT_TIMESTAMP": true, "CURRENT_USER": true, "DATABASE": true, "DELETE": true, "DISTINCT": true, "DROP": true, "ELSE": true, "EXISTS": true, "FALSE": true, "FOLLOWING": true, "FOR": true, "FROM": true, "FULL": true, "GRANT": true, "GROUP": true, "GSCLUSTER": true, "HAVING": true, "ILIKE": true, "IN": true, "INCREMENT": true, "INNER": true, "INSERT": true, "INTERSECT": true, "INTO": true, "IS": true, "ISSUE": true, "JOIN": true, "LATERAL": true, "LEFT": true, "LIKE": true, "LOCALTIME": true, "LOCALTIMESTAMP": true, "MINUS": true, "NATURAL": true, "NOT": true, "NULL": true, "OF": true, "ON": true, "OR": true, "ORDER": true, "ORGANIZATION": true, "QUALIFY": true, "REGEXP": true, "REVOKE": true, "RIGHT": true, "RLIKE": true, "ROW": true, "ROWS": true, "SAMPLE": true, "SCHEMA": true, "SELECT": true, "SET": true, "SOME": true, "START": true, "TABLE": true, "TABLESAMPLE": true, "THEN": true, "TO": true, "TRIGGER": true, "TRUE": true, "TRY_CAST": true, "UNION": true, "UNIQUE": true, "UPDATE": true, "USING": true, "VALUES": true, "VIEW": true, "WHEN": true, "WHENEVER": true, "WHERE": true, "WITH": true, }, "RS": { "AES128": true, "AES256": true, "ALL": true, "ALLOWOVERWRITE": true, "ANALYSE": true, "ANALYZE": true, "AND": true, "ANY": true, "ARRAY": true, "AS": true, "ASC": true, "AUTHORIZATION": true, "AZ64": true, "BACKUP": true, "BETWEEN": true, "BINARY": true, "BLANKSASNULL": true, "BOTH": true, "BYTEDICT": true, "BZIP2": true, "CASE": true, "CAST": true, "CHECK": true, "COLLATE": true, "COLUMN": true, "CONSTRAINT": true, "CREATE": true, "CREDENTIALS": true, "CROSS": true, 
"CURRENT_DATE": true, "CURRENT_TIME": true, "CURRENT_TIMESTAMP": true, "CURRENT_USER": true, "CURRENT_USER_ID": true, "DEFAULT": true, "DEFERRABLE": true, "DEFLATE": true, "DEFRAG": true, "DELTA": true, "DELTA32K": true, "DESC": true, "DISABLE": true, "DISTINCT": true, "DO": true, "ELSE": true, "EMPTYASNULL": true, "ENABLE": true, "ENCODE": true, "ENCRYPT ": true, "ENCRYPTION": true, "END": true, "EXCEPT": true, "EXPLICIT": true, "FALSE": true, "FOR": true, "FOREIGN": true, "FREEZE": true, "FROM": true, "FULL": true, "GLOBALDICT256": true, "GLOBALDICT64K": true, "GRANT": true, "GROUP": true, "GZIP": true, "HAVING": true, "IDENTITY": true, "IGNORE": true, "ILIKE": true, "IN": true, "INITIALLY": true, "INNER": true, "INTERSECT": true, "INTO": true, "IS": true, "ISNULL": true, "JOIN": true, "LANGUAGE": true, "LEADING": true, "LEFT": true, "LIKE": true, "LIMIT": true, "LOCALTIME": true, "LOCALTIMESTAMP": true, "LUN": true, "LUNS": true, "LZO": true, "LZOP": true, "MINUS": true, "MOSTLY13": true, "MOSTLY32": true, "MOSTLY8": true, "NATURAL": true, "NEW": true, "NOT": true, "NOTNULL": true, "NULL": true, "NULLS": true, "OFF": true, "OFFLINE": true, "OFFSET": true, "OID": true, "OLD": true, "ON": true, "ONLY": true, "OPEN": true, "OR": true, "ORDER": true, "OUTER": true, "OVERLAPS": true, "PARALLEL": true, "PARTITION": true, "PERCENT": true, "PERMISSIONS": true, "PLACING": true, "PRIMARY": true, "RAW": true, "READRATIO": true, "RECOVER": true, "REFERENCES": true, "RESPECT": true, "REJECTLOG": true, "RESORT": true, "RESTORE": true, "RIGHT": true, "SELECT": true, "SESSION_USER": true, "SIMILAR": true, "SNAPSHOT ": true, "SOME": true, "SYSDATE": true, "SYSTEM": true, "TABLE": true, "TAG": true, "TDES": true, "TEXT255": true, "TEXT32K": true, "THEN": true, "TIMESTAMP": true, "TO": true, "TOP": true, "TRAILING": true, "TRUE": true, "TRUNCATECOLUMNS": true, "UNION": true, "UNIQUE": true, "USER": true, "USING": true, "VERBOSE": true, "WALLET": true, "WHEN": true, "WHERE": true, 
"WITH": true, "WITHOUT": true, }, "BQ": { "ALL": true, "AND": true, "ANY": true, "ARRAY": true, "AS": true, "ASC": true, "ASSERT_ROWS_MODIFIED": true, "AT": true, "BETWEEN": true, "BY": true, "CASE": true, "CAST": true, "COLLATE": true, "CONTAINS": true, "CREATE": true, "CROSS": true, "CUBE": true, "CURRENT": true, "DEFAULT": true, "DEFINE": true, "DESC": true, "DISTINCT": true, "ELSE": true, "END": true, "ENUM": true, "ESCAPE": true, "EXCEPT": true, "EXCLUDE": true, "EXISTS": true, "EXTRACT": true, "FALSE": true, "FETCH": true, "FOLLOWING": true, "FOR": true, "FROM": true, "FULL": true, "GROUP": true, "GROUPING": true, "GROUPS": true, "HASH": true, "HAVING": true, "IF": true, "IGNORE": true, "IN": true, "INNER": true, "INTERSECT": true, "INTERVAL": true, "INTO": true, "IS": true, "JOIN": true, "LATERAL": true, "LEFT": true, "LIKE": true, "LIMIT": true, "LOOKUP": true, "MERGE": true, "NATURAL": true, "NEW": true, "NO": true, "NOT": true, "NULL": true, "NULLS": true, "OF": true, "ON": true, "OR": true, "ORDER": true, "OUTER": true, "OVER": true, "PARTITION": true, "PRECEDING": true, "PROTO": true, "RANGE": true, "RECURSIVE": true, "RESPECT": true, "RIGHT": true, "ROLLUP": true, "ROWS": true, "SELECT": true, "SET": true, "SOME": true, "STRUCT": true, "TABLESAMPLE": true, "THEN": true, "TO": true, "TREAT": true, "TRUE": true, "UNBOUNDED": true, "UNION": true, "UNNEST": true, "USING": true, "WHEN": true, "WHERE": true, "WINDOW": true, "WITH": true, "WITHIN": true, }, }
var SnowflakeStorageMap = map[string]string{
"AWS": "S3",
"GCP": "GCS",
"AZURE": "AZURE_BLOB",
}
Functions ¶
func GetAzureBlobLocation ¶
GetAzureBlobLocation converts a path-style HTTP(S) location URL into azure:// format, e.g. https://myproject.blob.core.windows.net/test-bucket/test-object.csv --> azure://myproject.blob.core.windows.net/test-bucket/test-object.csv
func GetAzureBlobLocationFolder ¶
GetAzureBlobLocationFolder returns the folder path for an Azure storage object, e.g. https://myproject.blob.core.windows.net/test-bucket/myfolder/test-object.csv --> azure://myproject.blob.core.windows.net/test-bucket/myfolder
func GetConfigValue ¶
func GetConfigValue(key string, warehouse WarehouseT) (val string)
func GetGCSLocation ¶
func GetGCSLocation(location string, options GCSLocationOptionsT) string
GetGCSLocation converts a path-style HTTP(S) location URL into gcs:// format, e.g. https://storage.googleapis.com/test-bucket/test-object.csv --> gcs://test-bucket/test-object.csv. The TLDFormat option is used to set the return format "<TLDFormat>://..."
func GetGCSLocationFolder ¶
func GetGCSLocationFolder(location string, options GCSLocationOptionsT) string
GetGCSLocationFolder returns the folder path for a GCS object, e.g. https://storage.googleapis.com/test-bucket/myfolder/test-object.csv --> gcs://test-bucket/myfolder
func GetGCSLocations ¶
func GetGCSLocations(locations []string, options GCSLocationOptionsT) (gcsLocations []string)
func GetLoadFileLocation ¶
func GetLoadFileLocations ¶
func GetNewTimings ¶
GetNewTimings appends the current status, together with the current time, to the timings column, e.g. status: exported_data, timings: [{exporting_data: 2020-04-21 15:16:19.687716}] -> [{exporting_data: 2020-04-21 15:16:19.687716, exported_data: 2020-04-21 15:26:34.344356}]
func GetObjectFolder ¶
GetObjectFolder returns the folder path for the storage object based on the storage provider, e.g. for provider S3: https://test-bucket.s3.amazonaws.com/myfolder/test-object.csv --> s3://test-bucket/myfolder
func GetS3Location ¶
GetS3Location converts an HTTP(S) location URL into s3:// format, e.g. https://test-bucket.s3.amazonaws.com/test-object.csv --> s3://test-bucket/test-object.csv. The region is returned as the second value.
func GetS3LocationFolder ¶
GetS3LocationFolder returns the folder path for an S3 object, e.g. https://test-bucket.s3.amazonaws.com/myfolder/test-object.csv --> s3://test-bucket/myfolder
func GetS3Locations ¶
func GetSlaveWorkerId ¶
func GetTableUploadStatus ¶
func GetUploadTimings ¶
GetUploadTimings returns the timings JSON column, e.g. timings: [{exporting_data: 2020-04-21 15:16:19.687716, exported_data: 2020-04-21 15:26:34.344356}]
func HasLoadFiles ¶
func JSONSchemaToMap ¶
func JSONSchemaToMap(rawMsg json.RawMessage) map[string]map[string]string
func ObjectStorageType ¶
func SetStagingFilesError ¶
func SetStagingFilesStatus ¶
func SetTableUploadError ¶
func SetTableUploadStatus ¶
func SetUploadColumns ¶
func SetUploadColumns(upload UploadT, dbHandle *sql.DB, fields ...UploadColumnT) (err error)
SetUploadColumns sets any column values passed as args in UploadColumnT format for warehouseUploadsTable
func SetUploadError ¶
func SetUploadStatus ¶
func SnowflakeCloudProvider ¶
func SnowflakeCloudProvider(config interface{}) string
func ToSafeDBString ¶
ToSafeDBString removes special characters from str.
Types ¶
type ConfigT ¶
type ConfigT struct { DbHandle *sql.DB Upload UploadT Warehouse WarehouseT Stage string }
type CurrentSchemaT ¶
func GetCurrentSchema ¶
func GetCurrentSchema(dbHandle *sql.DB, warehouse WarehouseT) (CurrentSchemaT, error)
type DestinationT ¶
type DestinationT struct { Source backendconfig.SourceT Destination backendconfig.DestinationT }
type GCSLocationOptionsT ¶
type GCSLocationOptionsT struct {
TLDFormat string
}
type SchemaDiffT ¶
type SchemaDiffT struct { Tables []string ColumnMaps map[string]map[string]string UpdatedSchema map[string]map[string]string }
func GetSchemaDiff ¶
func GetSchemaDiff(currentSchema, uploadSchema map[string]map[string]string) (diff SchemaDiffT)
type StagingFileT ¶
type UploadColumnT ¶
type UploadColumnT struct { Column string Value interface{} }
type WarehouseT ¶
type WarehouseT struct { Source backendconfig.SourceT Destination backendconfig.DestinationT }