warehouseutils

package
v0.1.9-patch-1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 16, 2020 License: AGPL-3.0 Imports: 13 Imported by: 1

Documentation

Index

Constants

View Source
const (
	WaitingState                  = "waiting"
	ExecutingState                = "executing"
	GeneratingLoadFileState       = "generating_load_file"
	GeneratingLoadFileFailedState = "generating_load_file_failed"
	GeneratedLoadFileState        = "generated_load_file"
	UpdatingSchemaState           = "updating_schema"
	UpdatingSchemaFailedState     = "updating_schema_failed"
	UpdatedSchemaState            = "updated_schema"
	ExportingDataState            = "exporting_data"
	ExportingDataFailedState      = "exporting_data_failed"
	ExportedDataState             = "exported_data"
	AbortedState                  = "aborted"
	GeneratingStagingFileFailed   = "generating_staging_file_failed"
	GeneratedStagingFile          = "generated_staging_file"
)
View Source
const (
	StagingFileSucceededState = "succeeded"
	StagingFileFailedState    = "failed"
	StagingFileExecutingState = "executing"
	StagingFileAbortedState   = "aborted"
	StagingFileWaitingState   = "waiting"
)
View Source
const (
	DiscardsTable = "rudder_discards"
	SyncFrequency = "syncFrequency"
	SyncStartAt   = "syncStartAt"
)
View Source
const (
	UploadStatusField          = "status"
	UploadStartLoadFileIDField = "start_load_file_id"
	UploadEndLoadFileIDField   = "end_load_file_id"
	UploadUpdatedAtField       = "updated_at"
	UploadTimingsField         = "timings"
	UploadSchemaField          = "schema"
	UploadLastExecAtField      = "last_exec_at"
)

Variables

View Source
var ObjectStorageMap = map[string]string{
	"RS": "S3",
	"BQ": "GCS",
}
View Source
var ReservedKeywords = map[string]map[string]bool{
	"SNOWFLAKE": {
		"ACCOUNT":           true,
		"ALL":               true,
		"ALTER":             true,
		"AND":               true,
		"ANY":               true,
		"AS":                true,
		"BETWEEN":           true,
		"BY":                true,
		"CASE":              true,
		"CAST":              true,
		"CHECK":             true,
		"COLUMN":            true,
		"CONNECT":           true,
		"CONNECTION":        true,
		"CONSTRAINT":        true,
		"CREATE":            true,
		"CROSS":             true,
		"CURRENT":           true,
		"CURRENT_DATE":      true,
		"CURRENT_TIME":      true,
		"CURRENT_TIMESTAMP": true,
		"CURRENT_USER":      true,
		"DATABASE":          true,
		"DELETE":            true,
		"DISTINCT":          true,
		"DROP":              true,
		"ELSE":              true,
		"EXISTS":            true,
		"FALSE":             true,
		"FOLLOWING":         true,
		"FOR":               true,
		"FROM":              true,
		"FULL":              true,
		"GRANT":             true,
		"GROUP":             true,
		"GSCLUSTER":         true,
		"HAVING":            true,
		"ILIKE":             true,
		"IN":                true,
		"INCREMENT":         true,
		"INNER":             true,
		"INSERT":            true,
		"INTERSECT":         true,
		"INTO":              true,
		"IS":                true,
		"ISSUE":             true,
		"JOIN":              true,
		"LATERAL":           true,
		"LEFT":              true,
		"LIKE":              true,
		"LOCALTIME":         true,
		"LOCALTIMESTAMP":    true,
		"MINUS":             true,
		"NATURAL":           true,
		"NOT":               true,
		"NULL":              true,
		"OF":                true,
		"ON":                true,
		"OR":                true,
		"ORDER":             true,
		"ORGANIZATION":      true,
		"QUALIFY":           true,
		"REGEXP":            true,
		"REVOKE":            true,
		"RIGHT":             true,
		"RLIKE":             true,
		"ROW":               true,
		"ROWS":              true,
		"SAMPLE":            true,
		"SCHEMA":            true,
		"SELECT":            true,
		"SET":               true,
		"SOME":              true,
		"START":             true,
		"TABLE":             true,
		"TABLESAMPLE":       true,
		"THEN":              true,
		"TO":                true,
		"TRIGGER":           true,
		"TRUE":              true,
		"TRY_CAST":          true,
		"UNION":             true,
		"UNIQUE":            true,
		"UPDATE":            true,
		"USING":             true,
		"VALUES":            true,
		"VIEW":              true,
		"WHEN":              true,
		"WHENEVER":          true,
		"WHERE":             true,
		"WITH":              true,
	},
	"RS": {
		"AES128":            true,
		"AES256":            true,
		"ALL":               true,
		"ALLOWOVERWRITE":    true,
		"ANALYSE":           true,
		"ANALYZE":           true,
		"AND":               true,
		"ANY":               true,
		"ARRAY":             true,
		"AS":                true,
		"ASC":               true,
		"AUTHORIZATION":     true,
		"AZ64":              true,
		"BACKUP":            true,
		"BETWEEN":           true,
		"BINARY":            true,
		"BLANKSASNULL":      true,
		"BOTH":              true,
		"BYTEDICT":          true,
		"BZIP2":             true,
		"CASE":              true,
		"CAST":              true,
		"CHECK":             true,
		"COLLATE":           true,
		"COLUMN":            true,
		"CONSTRAINT":        true,
		"CREATE":            true,
		"CREDENTIALS":       true,
		"CROSS":             true,
		"CURRENT_DATE":      true,
		"CURRENT_TIME":      true,
		"CURRENT_TIMESTAMP": true,
		"CURRENT_USER":      true,
		"CURRENT_USER_ID":   true,
		"DEFAULT":           true,
		"DEFERRABLE":        true,
		"DEFLATE":           true,
		"DEFRAG":            true,
		"DELTA":             true,
		"DELTA32K":          true,
		"DESC":              true,
		"DISABLE":           true,
		"DISTINCT":          true,
		"DO":                true,
		"ELSE":              true,
		"EMPTYASNULL":       true,
		"ENABLE":            true,
		"ENCODE":            true,
		"ENCRYPT     ":      true,
		"ENCRYPTION":        true,
		"END":               true,
		"EXCEPT":            true,
		"EXPLICIT":          true,
		"FALSE":             true,
		"FOR":               true,
		"FOREIGN":           true,
		"FREEZE":            true,
		"FROM":              true,
		"FULL":              true,
		"GLOBALDICT256":     true,
		"GLOBALDICT64K":     true,
		"GRANT":             true,
		"GROUP":             true,
		"GZIP":              true,
		"HAVING":            true,
		"IDENTITY":          true,
		"IGNORE":            true,
		"ILIKE":             true,
		"IN":                true,
		"INITIALLY":         true,
		"INNER":             true,
		"INTERSECT":         true,
		"INTO":              true,
		"IS":                true,
		"ISNULL":            true,
		"JOIN":              true,
		"LANGUAGE":          true,
		"LEADING":           true,
		"LEFT":              true,
		"LIKE":              true,
		"LIMIT":             true,
		"LOCALTIME":         true,
		"LOCALTIMESTAMP":    true,
		"LUN":               true,
		"LUNS":              true,
		"LZO":               true,
		"LZOP":              true,
		"MINUS":             true,
		"MOSTLY13":          true,
		"MOSTLY32":          true,
		"MOSTLY8":           true,
		"NATURAL":           true,
		"NEW":               true,
		"NOT":               true,
		"NOTNULL":           true,
		"NULL":              true,
		"NULLS":             true,
		"OFF":               true,
		"OFFLINE":           true,
		"OFFSET":            true,
		"OID":               true,
		"OLD":               true,
		"ON":                true,
		"ONLY":              true,
		"OPEN":              true,
		"OR":                true,
		"ORDER":             true,
		"OUTER":             true,
		"OVERLAPS":          true,
		"PARALLEL":          true,
		"PARTITION":         true,
		"PERCENT":           true,
		"PERMISSIONS":       true,
		"PLACING":           true,
		"PRIMARY":           true,
		"RAW":               true,
		"READRATIO":         true,
		"RECOVER":           true,
		"REFERENCES":        true,
		"RESPECT":           true,
		"REJECTLOG":         true,
		"RESORT":            true,
		"RESTORE":           true,
		"RIGHT":             true,
		"SELECT":            true,
		"SESSION_USER":      true,
		"SIMILAR":           true,
		"SNAPSHOT ":         true,
		"SOME":              true,
		"SYSDATE":           true,
		"SYSTEM":            true,
		"TABLE":             true,
		"TAG":               true,
		"TDES":              true,
		"TEXT255":           true,
		"TEXT32K":           true,
		"THEN":              true,
		"TIMESTAMP":         true,
		"TO":                true,
		"TOP":               true,
		"TRAILING":          true,
		"TRUE":              true,
		"TRUNCATECOLUMNS":   true,
		"UNION":             true,
		"UNIQUE":            true,
		"USER":              true,
		"USING":             true,
		"VERBOSE":           true,
		"WALLET":            true,
		"WHEN":              true,
		"WHERE":             true,
		"WITH":              true,
		"WITHOUT":           true,
	},
	"BQ": {
		"ALL":                  true,
		"AND":                  true,
		"ANY":                  true,
		"ARRAY":                true,
		"AS":                   true,
		"ASC":                  true,
		"ASSERT_ROWS_MODIFIED": true,
		"AT":                   true,
		"BETWEEN":              true,
		"BY":                   true,
		"CASE":                 true,
		"CAST":                 true,
		"COLLATE":              true,
		"CONTAINS":             true,
		"CREATE":               true,
		"CROSS":                true,
		"CUBE":                 true,
		"CURRENT":              true,
		"DEFAULT":              true,
		"DEFINE":               true,
		"DESC":                 true,
		"DISTINCT":             true,
		"ELSE":                 true,
		"END":                  true,
		"ENUM":                 true,
		"ESCAPE":               true,
		"EXCEPT":               true,
		"EXCLUDE":              true,
		"EXISTS":               true,
		"EXTRACT":              true,
		"FALSE":                true,
		"FETCH":                true,
		"FOLLOWING":            true,
		"FOR":                  true,
		"FROM":                 true,
		"FULL":                 true,
		"GROUP":                true,
		"GROUPING":             true,
		"GROUPS":               true,
		"HASH":                 true,
		"HAVING":               true,
		"IF":                   true,
		"IGNORE":               true,
		"IN":                   true,
		"INNER":                true,
		"INTERSECT":            true,
		"INTERVAL":             true,
		"INTO":                 true,
		"IS":                   true,
		"JOIN":                 true,
		"LATERAL":              true,
		"LEFT":                 true,
		"LIKE":                 true,
		"LIMIT":                true,
		"LOOKUP":               true,
		"MERGE":                true,
		"NATURAL":              true,
		"NEW":                  true,
		"NO":                   true,
		"NOT":                  true,
		"NULL":                 true,
		"NULLS":                true,
		"OF":                   true,
		"ON":                   true,
		"OR":                   true,
		"ORDER":                true,
		"OUTER":                true,
		"OVER":                 true,
		"PARTITION":            true,
		"PRECEDING":            true,
		"PROTO":                true,
		"RANGE":                true,
		"RECURSIVE":            true,
		"RESPECT":              true,
		"RIGHT":                true,
		"ROLLUP":               true,
		"ROWS":                 true,
		"SELECT":               true,
		"SET":                  true,
		"SOME":                 true,
		"STRUCT":               true,
		"TABLESAMPLE":          true,
		"THEN":                 true,
		"TO":                   true,
		"TREAT":                true,
		"TRUE":                 true,
		"UNBOUNDED":            true,
		"UNION":                true,
		"UNNEST":               true,
		"USING":                true,
		"WHEN":                 true,
		"WHERE":                true,
		"WINDOW":               true,
		"WITH":                 true,
		"WITHIN":               true,
	},
}
View Source
var SnowflakeStorageMap = map[string]string{
	"AWS":   "S3",
	"GCP":   "GCS",
	"AZURE": "AZURE_BLOB",
}

Functions

func Datatype

func Datatype(in interface{}) string

func DestStat

func DestStat(statType string, statName string, id string) *stats.RudderStats

func GetAzureBlobLocation

func GetAzureBlobLocation(location string) string

GetAzureBlobLocation parses path-style location http url to return in azure:// format https://myproject.blob.core.windows.net/test-bucket/test-object.csv --> azure://myproject.blob.core.windows.net/test-bucket/test-object.csv

func GetAzureBlobLocationFolder

func GetAzureBlobLocationFolder(location string) string

GetAzureBlobLocationFolder returns the folder path for an azure storage object https://myproject.blob.core.windows.net/test-bucket/myfolder/test-object.csv --> azure://myproject.blob.core.windows.net/myfolder

func GetConfigValue

func GetConfigValue(key string, warehouse WarehouseT) (val string)

func GetGCSLocation

func GetGCSLocation(location string, options GCSLocationOptionsT) string

GetGCSLocation parses path-style location http url to return in gcs:// format https://storage.googleapis.com/test-bucket/test-object.csv --> gcs://test-bucket/test-object.csv tldFormat is used to set return format "<tldFormat>://..."

func GetGCSLocationFolder

func GetGCSLocationFolder(location string, options GCSLocationOptionsT) string

GetGCSLocationFolder returns the folder path for an gcs object https://storage.googleapis.com/test-bucket/myfolder/test-object.csv --> gcs://test-bucket/myfolder

func GetGCSLocations

func GetGCSLocations(locations []string, options GCSLocationOptionsT) (gcsLocations []string)

func GetIP

func GetIP() string

func GetLoadFileLocation

func GetLoadFileLocation(dbHandle *sql.DB, sourceId string, destinationId string, tableName string, start, end int64) (location string, err error)

func GetLoadFileLocations

func GetLoadFileLocations(dbHandle *sql.DB, sourceId string, destinationId string, tableName string, start, end int64) (locations []string, err error)

func GetNewTimings

func GetNewTimings(upload UploadT, dbHandle *sql.DB, status string) []byte

GetNewTimings appends current status with current time to timings column eg. status: exported_data, timings: [{exporting_data: 2020-04-21 15:16:19.687716] -> [{exporting_data: 2020-04-21 15:16:19.687716, exported_data: 2020-04-21 15:26:34.344356}]

func GetObjectFolder

func GetObjectFolder(provider string, location string) (folder string)

GetObjectFolder returns the folder path for the storage object based on the storage provider eg. For provider as S3: https://test-bucket.s3.amazonaws.com/test-object.csv --> s3://test-bucket/test-object.csv

func GetS3Location

func GetS3Location(location string) (s3Location string, region string)

GetS3Location parses path-style location http url to return in s3:// format https://test-bucket.s3.amazonaws.com/test-object.csv --> s3://test-bucket/test-object.csv

func GetS3LocationFolder

func GetS3LocationFolder(location string) string

GetS3LocationFolder returns the folder path for an s3 object https://test-bucket.s3.amazonaws.com/myfolder/test-object.csv --> s3://test-bucket/myfolder

func GetS3Locations

func GetS3Locations(locations []string) (s3Locations []string)

func GetSlaveWorkerId

func GetSlaveWorkerId(workerIdx int, slaveID string) string

func GetTableUploadStatus

func GetTableUploadStatus(uploadID int64, tableName string, dbHandle *sql.DB) (status string, err error)

func GetUploadTimings

func GetUploadTimings(upload UploadT, dbHandle *sql.DB) (timings []map[string]string)

GetUploadTimings returns timings json column eg. timings: [{exporting_data: 2020-04-21 15:16:19.687716, exported_data: 2020-04-21 15:26:34.344356}]

func HasLoadFiles

func HasLoadFiles(dbHandle *sql.DB, sourceId string, destinationId string, tableName string, start, end int64) bool

func JSONSchemaToMap

func JSONSchemaToMap(rawMsg json.RawMessage) map[string]map[string]string

func ObjectStorageType

func ObjectStorageType(destType string, config interface{}) string

func SetStagingFilesError

func SetStagingFilesError(ids []int64, status string, dbHandle *sql.DB, statusError error) (err error)

func SetStagingFilesStatus

func SetStagingFilesStatus(ids []int64, status string, dbHandle *sql.DB) (err error)

func SetTableUploadError

func SetTableUploadError(status string, uploadID int64, tableName string, statusError error, dbHandle *sql.DB) (err error)

func SetTableUploadStatus

func SetTableUploadStatus(status string, uploadID int64, tableName string, dbHandle *sql.DB) (err error)

func SetUploadColumns

func SetUploadColumns(upload UploadT, dbHandle *sql.DB, fields ...UploadColumnT) (err error)

SetUploadColumns sets any column values passed as args in UploadColumnT format for warehouseUploadsTable

func SetUploadError

func SetUploadError(upload UploadT, statusError error, state string, dbHandle *sql.DB) (err error)

func SetUploadStatus

func SetUploadStatus(upload UploadT, status string, dbHandle *sql.DB, additionalFields ...UploadColumnT) (err error)

func SnowflakeCloudProvider

func SnowflakeCloudProvider(config interface{}) string

func ToCase

func ToCase(provider string, str string) string

func ToSafeDBString

func ToSafeDBString(provider string, str string) string

ToSafeDBString to remove special characters

func UpdateCurrentSchema

func UpdateCurrentSchema(namespace string, wh WarehouseT, uploadID int64, currentSchema, schema map[string]map[string]string, dbHandle *sql.DB) (err error)

Types

type ConfigT

type ConfigT struct {
	DbHandle  *sql.DB
	Upload    UploadT
	Warehouse WarehouseT
	Stage     string
}

type CurrentSchemaT

type CurrentSchemaT struct {
	Namespace string
	Schema    map[string]map[string]string
}

func GetCurrentSchema

func GetCurrentSchema(dbHandle *sql.DB, warehouse WarehouseT) (CurrentSchemaT, error)

type DestinationT

type DestinationT struct {
	Source      backendconfig.SourceT
	Destination backendconfig.DestinationT
}

type GCSLocationOptionsT

type GCSLocationOptionsT struct {
	TLDFormat string
}

type SchemaDiffT

type SchemaDiffT struct {
	Tables        []string
	ColumnMaps    map[string]map[string]string
	UpdatedSchema map[string]map[string]string
}

func GetSchemaDiff

func GetSchemaDiff(currentSchema, uploadSchema map[string]map[string]string) (diff SchemaDiffT)

type StagingFileT

type StagingFileT struct {
	Schema           map[string]map[string]interface{}
	BatchDestination DestinationT
	Location         string
	FirstEventAt     string
	LastEventAt      string
	TotalEvents      int
}

type UploadColumnT

type UploadColumnT struct {
	Column string
	Value  interface{}
}

type UploadT

type UploadT struct {
	ID                 int64
	Namespace          string
	SourceID           string
	DestinationID      string
	DestinationType    string
	StartStagingFileID int64
	EndStagingFileID   int64
	StartLoadFileID    int64
	EndLoadFileID      int64
	Status             string
	Schema             map[string]map[string]string
	Error              json.RawMessage
	Timings            []map[string]string
}

type WarehouseT

type WarehouseT struct {
	Source      backendconfig.SourceT
	Destination backendconfig.DestinationT
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL