warehouseutils

package
v1.0.0 (latest version of the module)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 14, 2021 License: AGPL-3.0 Imports: 23 Imported by: 1

Documentation

Index

Constants

View Source
// Parquet column type descriptors. Every descriptor declares the column
// with repetitiontype=OPTIONAL.
const (
	// Plain physical types.
	PARQUET_BOOLEAN = "type=BOOLEAN, repetitiontype=OPTIONAL"
	PARQUET_DOUBLE  = "type=DOUBLE, repetitiontype=OPTIONAL"
	PARQUET_INT_64  = "type=INT64, repetitiontype=OPTIONAL"

	// Physical types annotated with a converted (logical) type.
	PARQUET_STRING           = "type=BYTE_ARRAY, convertedtype=UTF8, repetitiontype=OPTIONAL"
	PARQUET_TIMESTAMP_MICROS = "type=INT64, convertedtype=TIMESTAMP_MICROS, repetitiontype=OPTIONAL"
)
View Source
// Warehouse destination types. RS is Amazon Redshift and BQ is Google
// BigQuery; the remaining identifiers spell out the product name.
const (
	AZURE_SYNAPSE = "AZURE_SYNAPSE"
	BQ            = "BQ"
	CLICKHOUSE    = "CLICKHOUSE"
	MSSQL         = "MSSQL"
	POSTGRES      = "POSTGRES"
	RS            = "RS"
	SNOWFLAKE     = "SNOWFLAKE"
)
View Source
// Values recorded as the state of a staging file.
const (
	StagingFileAbortedState   = "aborted"
	StagingFileExecutingState = "executing"
	StagingFileFailedState    = "failed"
	StagingFileSucceededState = "succeeded"
	StagingFileWaitingState   = "waiting"
)
View Source
// Names of the tables that track warehouse staging files, load files,
// uploads (overall and per table) and schemas.
const (
	WarehouseLoadFilesTable    = "wh_load_files"
	WarehouseSchemasTable      = "wh_schemas"
	WarehouseStagingFilesTable = "wh_staging_files"
	WarehouseTableUploadsTable = "wh_table_uploads"
	WarehouseUploadsTable      = "wh_uploads"
)

warehouse table names

View Source
// Table names for discarded values and identity resolution.
const (
	DiscardsTable           = "rudder_discards"
	IdentityMappingsTable   = "rudder_identity_mappings"
	IdentityMergeRulesTable = "rudder_identity_merge_rules"
)

// Configuration keys for sync scheduling and exclusion windows.
const (
	ExcludeWindow          = "excludeWindow"
	ExcludeWindowEndTime   = "excludeWindowEndTime"
	ExcludeWindowStartTime = "excludeWindowStartTime"
	SyncFrequency          = "syncFrequency"
	SyncStartAt            = "syncStartAt"
)
View Source
// Names of the identifies and users tables.
const (
	IdentifiesTable = "identifies"
	UsersTable      = "users"
)
View Source
// Layouts for time.Format / time.Parse (Go reference-time notation).
const (
	// BigQuery layouts. ".999999" keeps up to microseconds and trims
	// trailing zeros. The trailing "Z" is a literal character, not a zone
	// directive, so times should be converted to UTC before formatting.
	BQLoadedAtFormat = "2006-01-02 15:04:05.999999 Z"
	BQUuidTSFormat   = "2006-01-02 15:04:05 Z"

	// DatalakeTimeWindowFormat renders an hour-granularity path such as
	// "2021/10/14/09".
	DatalakeTimeWindowFormat = "2006/01/02/15"
)
View Source
// Load file formats. This package provides a loader type for each of them:
// ParquetLoader, JsonLoader and CsvLoader.
const (
	LOAD_FILE_TYPE_PARQUET = "parquet"
	LOAD_FILE_TYPE_JSON    = "json"
	LOAD_FILE_TYPE_CSV     = "csv"
)
View Source
// LOADED_AT_COLUMN is the column name "loaded_at".
const (
	LOADED_AT_COLUMN = "loaded_at"
)
View Source
// UUID_TS_COLUMN is the column name "uuid_ts".
const (
	UUID_TS_COLUMN = "uuid_ts"
)

Variables

View Source
// IdentityEnabledWarehouses lists warehouse types (presumably the destination
// type constants such as RS or SNOWFLAKE) that have identity resolution
// enabled. It starts out nil and is expected to be filled in elsewhere.
var IdentityEnabledWarehouses []string

// AWSCredsExpiryInS is, judging by its name, the lifetime in seconds of
// temporary AWS credentials. It starts at 0 — confirm where it is set.
var AWSCredsExpiryInS int64
View Source
// DiscardsSchema maps each column of the discards table to its warehouse
// data type: four string columns locating the value (presumably a value that
// could not be loaded as-is) plus two datetime columns.
var DiscardsSchema = map[string]string{
	"column_name":  "string",
	"column_value": "string",
	"row_id":       "string",
	"table_name":   "string",

	"received_at": "datetime",
	"uuid_ts":     "datetime",
}
View Source
// ObjectStorageMap maps a destination type to the object storage type it is
// paired with. Types not listed here have no entry.
var ObjectStorageMap = map[string]string{
	"BQ": "GCS",

	"RS":          "S3",
	"S3_DATALAKE": "S3",
}
View Source
// ReservedKeywords maps a warehouse type (SNOWFLAKE, RS, POSTGRES, BQ) to the
// set of SQL keywords that warehouse reserves. Keys in each set are upper
// case and are matched exactly, so they must not carry surrounding
// whitespace. Look names up in upper case, e.g. ReservedKeywords["RS"]["SELECT"].
//
// NOTE(review): the POSTGRES set omits some words PostgreSQL documents as
// reserved (e.g. WITH, WINDOW, FETCH, LATERAL, RETURNING, VARIADIC). The RS
// set keeps MOSTLY13 as listed, while Redshift's compression encoding is
// named MOSTLY16. Extending either set changes the names ToSafeNamespace
// produces, so confirm before adding entries.
var ReservedKeywords = map[string]map[string]bool{
	"SNOWFLAKE": {
		"ACCOUNT":           true,
		"ALL":               true,
		"ALTER":             true,
		"AND":               true,
		"ANY":               true,
		"AS":                true,
		"BETWEEN":           true,
		"BY":                true,
		"CASE":              true,
		"CAST":              true,
		"CHECK":             true,
		"COLUMN":            true,
		"CONNECT":           true,
		"CONNECTION":        true,
		"CONSTRAINT":        true,
		"CREATE":            true,
		"CROSS":             true,
		"CURRENT":           true,
		"CURRENT_DATE":      true,
		"CURRENT_TIME":      true,
		"CURRENT_TIMESTAMP": true,
		"CURRENT_USER":      true,
		"DATABASE":          true,
		"DELETE":            true,
		"DISTINCT":          true,
		"DROP":              true,
		"ELSE":              true,
		"EXISTS":            true,
		"FALSE":             true,
		"FOLLOWING":         true,
		"FOR":               true,
		"FROM":              true,
		"FULL":              true,
		"GRANT":             true,
		"GROUP":             true,
		"GSCLUSTER":         true,
		"HAVING":            true,
		"ILIKE":             true,
		"IN":                true,
		"INCREMENT":         true,
		"INNER":             true,
		"INSERT":            true,
		"INTERSECT":         true,
		"INTO":              true,
		"IS":                true,
		"ISSUE":             true,
		"JOIN":              true,
		"LATERAL":           true,
		"LEFT":              true,
		"LIKE":              true,
		"LOCALTIME":         true,
		"LOCALTIMESTAMP":    true,
		"MINUS":             true,
		"NATURAL":           true,
		"NOT":               true,
		"NULL":              true,
		"OF":                true,
		"ON":                true,
		"OR":                true,
		"ORDER":             true,
		"ORGANIZATION":      true,
		"QUALIFY":           true,
		"REGEXP":            true,
		"REVOKE":            true,
		"RIGHT":             true,
		"RLIKE":             true,
		"ROW":               true,
		"ROWS":              true,
		"SAMPLE":            true,
		"SCHEMA":            true,
		"SELECT":            true,
		"SET":               true,
		"SOME":              true,
		"START":             true,
		"TABLE":             true,
		"TABLESAMPLE":       true,
		"THEN":              true,
		"TO":                true,
		"TRIGGER":           true,
		"TRUE":              true,
		"TRY_CAST":          true,
		"UNION":             true,
		"UNIQUE":            true,
		"UPDATE":            true,
		"USING":             true,
		"VALUES":            true,
		"VIEW":              true,
		"WHEN":              true,
		"WHENEVER":          true,
		"WHERE":             true,
		"WITH":              true,
	},
	"RS": {
		"AES128":            true,
		"AES256":            true,
		"ALL":               true,
		"ALLOWOVERWRITE":    true,
		"ANALYSE":           true,
		"ANALYZE":           true,
		"AND":               true,
		"ANY":               true,
		"ARRAY":             true,
		"AS":                true,
		"ASC":               true,
		"AUTHORIZATION":     true,
		"AZ64":              true,
		"BACKUP":            true,
		"BETWEEN":           true,
		"BINARY":            true,
		"BLANKSASNULL":      true,
		"BOTH":              true,
		"BYTEDICT":          true,
		"BZIP2":             true,
		"CASE":              true,
		"CAST":              true,
		"CHECK":             true,
		"COLLATE":           true,
		"COLUMN":            true,
		"CONSTRAINT":        true,
		"CREATE":            true,
		"CREDENTIALS":       true,
		"CROSS":             true,
		"CURRENT_DATE":      true,
		"CURRENT_TIME":      true,
		"CURRENT_TIMESTAMP": true,
		"CURRENT_USER":      true,
		"CURRENT_USER_ID":   true,
		"DEFAULT":           true,
		"DEFERRABLE":        true,
		"DEFLATE":           true,
		"DEFRAG":            true,
		"DELTA":             true,
		"DELTA32K":          true,
		"DESC":              true,
		"DISABLE":           true,
		"DISTINCT":          true,
		"DO":                true,
		"ELSE":              true,
		"EMPTYASNULL":       true,
		"ENABLE":            true,
		"ENCODE":            true,
		"ENCRYPT":           true,
		"ENCRYPTION":        true,
		"END":               true,
		"EXCEPT":            true,
		"EXPLICIT":          true,
		"FALSE":             true,
		"FOR":               true,
		"FOREIGN":           true,
		"FREEZE":            true,
		"FROM":              true,
		"FULL":              true,
		"GLOBALDICT256":     true,
		"GLOBALDICT64K":     true,
		"GRANT":             true,
		"GROUP":             true,
		"GZIP":              true,
		"HAVING":            true,
		"IDENTITY":          true,
		"IGNORE":            true,
		"ILIKE":             true,
		"IN":                true,
		"INITIALLY":         true,
		"INNER":             true,
		"INTERSECT":         true,
		"INTO":              true,
		"IS":                true,
		"ISNULL":            true,
		"JOIN":              true,
		"LANGUAGE":          true,
		"LEADING":           true,
		"LEFT":              true,
		"LIKE":              true,
		"LIMIT":             true,
		"LOCALTIME":         true,
		"LOCALTIMESTAMP":    true,
		"LUN":               true,
		"LUNS":              true,
		"LZO":               true,
		"LZOP":              true,
		"MINUS":             true,
		"MOSTLY13":          true,
		"MOSTLY32":          true,
		"MOSTLY8":           true,
		"NATURAL":           true,
		"NEW":               true,
		"NOT":               true,
		"NOTNULL":           true,
		"NULL":              true,
		"NULLS":             true,
		"OFF":               true,
		"OFFLINE":           true,
		"OFFSET":            true,
		"OID":               true,
		"OLD":               true,
		"ON":                true,
		"ONLY":              true,
		"OPEN":              true,
		"OR":                true,
		"ORDER":             true,
		"OUTER":             true,
		"OVERLAPS":          true,
		"PARALLEL":          true,
		"PARTITION":         true,
		"PERCENT":           true,
		"PERMISSIONS":       true,
		"PLACING":           true,
		"PRIMARY":           true,
		"RAW":               true,
		"READRATIO":         true,
		"RECOVER":           true,
		"REFERENCES":        true,
		"RESPECT":           true,
		"REJECTLOG":         true,
		"RESORT":            true,
		"RESTORE":           true,
		"RIGHT":             true,
		"SELECT":            true,
		"SESSION_USER":      true,
		"SIMILAR":           true,
		"SNAPSHOT":          true,
		"SOME":              true,
		"SYSDATE":           true,
		"SYSTEM":            true,
		"TABLE":             true,
		"TAG":               true,
		"TDES":              true,
		"TEXT255":           true,
		"TEXT32K":           true,
		"THEN":              true,
		"TIMESTAMP":         true,
		"TO":                true,
		"TOP":               true,
		"TRAILING":          true,
		"TRUE":              true,
		"TRUNCATECOLUMNS":   true,
		"UNION":             true,
		"UNIQUE":            true,
		"USER":              true,
		"USING":             true,
		"VERBOSE":           true,
		"WALLET":            true,
		"WHEN":              true,
		"WHERE":             true,
		"WITH":              true,
		"WITHOUT":           true,
	},
	"POSTGRES": {
		"ALL":               true,
		"ANALYSE":           true,
		"ANALYZE":           true,
		"AND":               true,
		"ANY":               true,
		"ARRAY":             true,
		"AS":                true,
		"ASC":               true,
		"ASYMMETRIC":        true,
		"AUTHORIZATION":     true,
		"BETWEEN":           true,
		"BINARY":            true,
		"BOTH":              true,
		"CASE":              true,
		"CAST":              true,
		"CHECK":             true,
		"COLLATE":           true,
		"COLUMN":            true,
		"CONSTRAINT":        true,
		"CREATE":            true,
		"CROSS":             true,
		"CURRENT_DATE":      true,
		"CURRENT_ROLE":      true,
		"CURRENT_TIME":      true,
		"CURRENT_TIMESTAMP": true,
		"CURRENT_USER":      true,
		"DEFAULT":           true,
		"DEFERRABLE":        true,
		"DESC":              true,
		"DISTINCT":          true,
		"DO":                true,
		"ELSE":              true,
		"END":               true,
		"EXCEPT":            true,
		"FALSE":             true,
		"FOR":               true,
		"FOREIGN":           true,
		"FREEZE":            true,
		"FROM":              true,
		"FULL":              true,
		"GRANT":             true,
		"GROUP":             true,
		"HAVING":            true,
		"ILIKE":             true,
		"IN":                true,
		"INITIALLY":         true,
		"INNER":             true,
		"INTERSECT":         true,
		"INTO":              true,
		"IS":                true,
		"ISNULL":            true,
		"JOIN":              true,
		"LEADING":           true,
		"LEFT":              true,
		"LIKE":              true,
		"LIMIT":             true,
		"LOCALTIME":         true,
		"LOCALTIMESTAMP":    true,
		"NATURAL":           true,
		"NEW":               true,
		"NOT":               true,
		"NOTNULL":           true,
		"NULL":              true,
		"OFF":               true,
		"OFFSET":            true,
		"OLD":               true,
		"ON":                true,
		"ONLY":              true,
		"OR":                true,
		"ORDER":             true,
		"OUTER":             true,
		"OVERLAPS":          true,
		"PLACING":           true,
		"PRIMARY":           true,
		"REFERENCES":        true,
		"RIGHT":             true,
		"SELECT":            true,
		"SESSION_USER":      true,
		"SIMILAR":           true,
		"SOME":              true,
		"SYMMETRIC":         true,
		"TABLE":             true,
		"THEN":              true,
		"TRAILING":          true,
		"TRUE":              true,
		"UNION":             true,
		"UNIQUE":            true,
		"USER":              true,
		"USING":             true,
		"VERBOSE":           true,
		"WHEN":              true,
		"WHERE":             true,
	},
	"BQ": {
		"ALL":                  true,
		"AND":                  true,
		"ANY":                  true,
		"ARRAY":                true,
		"AS":                   true,
		"ASC":                  true,
		"ASSERT_ROWS_MODIFIED": true,
		"AT":                   true,
		"BETWEEN":              true,
		"BY":                   true,
		"CASE":                 true,
		"CAST":                 true,
		"COLLATE":              true,
		"CONTAINS":             true,
		"CREATE":               true,
		"CROSS":                true,
		"CUBE":                 true,
		"CURRENT":              true,
		"DEFAULT":              true,
		"DEFINE":               true,
		"DESC":                 true,
		"DISTINCT":             true,
		"ELSE":                 true,
		"END":                  true,
		"ENUM":                 true,
		"ESCAPE":               true,
		"EXCEPT":               true,
		"EXCLUDE":              true,
		"EXISTS":               true,
		"EXTRACT":              true,
		"FALSE":                true,
		"FETCH":                true,
		"FOLLOWING":            true,
		"FOR":                  true,
		"FROM":                 true,
		"FULL":                 true,
		"GROUP":                true,
		"GROUPING":             true,
		"GROUPS":               true,
		"HASH":                 true,
		"HAVING":               true,
		"IF":                   true,
		"IGNORE":               true,
		"IN":                   true,
		"INNER":                true,
		"INTERSECT":            true,
		"INTERVAL":             true,
		"INTO":                 true,
		"IS":                   true,
		"JOIN":                 true,
		"LATERAL":              true,
		"LEFT":                 true,
		"LIKE":                 true,
		"LIMIT":                true,
		"LOOKUP":               true,
		"MERGE":                true,
		"NATURAL":              true,
		"NEW":                  true,
		"NO":                   true,
		"NOT":                  true,
		"NULL":                 true,
		"NULLS":                true,
		"OF":                   true,
		"ON":                   true,
		"OR":                   true,
		"ORDER":                true,
		"OUTER":                true,
		"OVER":                 true,
		"PARTITION":            true,
		"PRECEDING":            true,
		"PROTO":                true,
		"RANGE":                true,
		"RECURSIVE":            true,
		"RESPECT":              true,
		"RIGHT":                true,
		"ROLLUP":               true,
		"ROWS":                 true,
		"SELECT":               true,
		"SET":                  true,
		"SOME":                 true,
		"STRUCT":               true,
		"TABLESAMPLE":          true,
		"THEN":                 true,
		"TO":                   true,
		"TREAT":                true,
		"TRUE":                 true,
		"UNBOUNDED":            true,
		"UNION":                true,
		"UNNEST":               true,
		"USING":                true,
		"WHEN":                 true,
		"WHERE":                true,
		"WINDOW":               true,
		"WITH":                 true,
		"WITHIN":               true,
	},
}
View Source
// SnowflakeStorageMap maps the cloud provider a Snowflake account is hosted
// on to the object storage type used alongside it.
var SnowflakeStorageMap = map[string]string{
	"AZURE": "AZURE_BLOB",
	"GCP":   "GCS",
	"AWS":   "S3",
}

Functions

func Datatype

func Datatype(in interface{}) string

func DestStat

func DestStat(statType string, statName string, id string) stats.RudderStats

func DoubleQuoteAndJoinByComma added in v0.1.10

func DoubleQuoteAndJoinByComma(strs []string) string

func GetAzureBlobLocation

func GetAzureBlobLocation(location string) string

GetAzureBlobLocation parses path-style location http url to return in azure:// format https://myproject.blob.core.windows.net/test-bucket/test-object.csv --> azure://myproject.blob.core.windows.net/test-bucket/test-object.csv

func GetAzureBlobLocationFolder

func GetAzureBlobLocationFolder(location string) string

GetAzureBlobLocationFolder returns the folder path for an Azure storage object, e.g. https://myproject.blob.core.windows.net/test-bucket/myfolder/test-object.csv --> azure://myproject.blob.core.windows.net/test-bucket/myfolder

func GetConfigValue

func GetConfigValue(key string, warehouse WarehouseT) (val string)

func GetConfigValueAsMap added in v0.1.10

func GetConfigValueAsMap(key string, config map[string]interface{}) map[string]interface{}

func GetConfigValueBoolString added in v0.1.10

func GetConfigValueBoolString(key string, warehouse WarehouseT) string

func GetFirstTiming added in v0.1.10

func GetFirstTiming(str sql.NullString) (status string, recordedTime time.Time)

func GetGCSLocation

func GetGCSLocation(location string, options GCSLocationOptionsT) string

GetGCSLocation converts a path-style HTTP location URL into gcs:// form, e.g. https://storage.googleapis.com/test-bucket/test-object.csv --> gcs://test-bucket/test-object.csv. options.TLDFormat sets the scheme of the result ("<tldFormat>://...").

func GetGCSLocationFolder

func GetGCSLocationFolder(location string, options GCSLocationOptionsT) string

GetGCSLocationFolder returns the folder path for a GCS object, e.g. https://storage.googleapis.com/test-bucket/myfolder/test-object.csv --> gcs://test-bucket/myfolder

func GetGCSLocations

func GetGCSLocations(loadFiles []LoadFileT, options GCSLocationOptionsT) (gcsLocations []string)

func GetIP

func GetIP() string

func GetLastFailedStatus added in v0.1.10

func GetLastFailedStatus(str sql.NullString) (status string)

func GetLastTiming added in v0.1.10

func GetLastTiming(str sql.NullString) (status string, recordedTime time.Time)

func GetLoadFileGenTime added in v0.1.10

func GetLoadFileGenTime(str sql.NullString) (t time.Time)

func GetNamespace added in v0.1.10

func GetNamespace(source backendconfig.SourceT, destination backendconfig.DestinationT, dbHandle *sql.DB) (namespace string, exists bool)

func GetObjectFolder

func GetObjectFolder(provider string, location string) (folder string)

GetObjectFolder returns the folder path for a storage object based on the storage provider, e.g. for provider S3: https://test-bucket.s3.amazonaws.com/myfolder/test-object.csv --> s3://test-bucket/myfolder

func GetObjectLocation added in v0.1.10

func GetObjectLocation(provider string, location string) (folder string)

GetObjectLocation returns the location of a storage object in the storage provider's own URL format, e.g. for provider S3: https://test-bucket.s3.amazonaws.com/test-object.csv --> s3://test-bucket/test-object.csv

func GetObjectName added in v0.1.10

func GetObjectName(location string, providerConfig interface{}, objectProvider string) (objectName string, err error)

GetObjectName extracts the object name (key) from a bucket location URL, e.g. https://bucket-endpoint/bucket-name/object -> object

func GetParquetValue added in v0.1.10

func GetParquetValue(val interface{}, colType string) (retVal interface{}, err error)

func GetS3Location

func GetS3Location(location string) (s3Location string, region string)

GetS3Location parses path-style location http url to return in s3:// format https://test-bucket.s3.amazonaws.com/test-object.csv --> s3://test-bucket/test-object.csv

func GetS3LocationFolder

func GetS3LocationFolder(location string) string

GetS3LocationFolder returns the folder path for an s3 object https://test-bucket.s3.amazonaws.com/myfolder/test-object.csv --> s3://test-bucket/myfolder

func GetSlaveWorkerId

func GetSlaveWorkerId(workerIdx int, slaveID string) string

func GetTableFirstEventAt added in v0.1.10

func GetTableFirstEventAt(dbHandle *sql.DB, sourceId string, destinationId string, tableName string, start, end int64) (firstEventAt string)

func GetTablePathInObjectStorage added in v0.1.10

func GetTablePathInObjectStorage(namespace string, tableName string) string

GetTablePathInObjectStorage returns the path of the table relative to the object storage bucket. For example, for the location "s3://testbucket/rudder-datalake/namespace/tableName/" it returns "rudder-datalake/namespace/tableName".

func GetTempFileExtension added in v0.1.10

func GetTempFileExtension(destType string) string

func GetTimeWindow added in v0.1.10

func GetTimeWindow(ts time.Time) time.Time

func GetWarehouseIdentifier added in v0.1.10

func GetWarehouseIdentifier(destType string, sourceID string, destinationID string) string

func IDResolutionEnabled added in v0.1.10

func IDResolutionEnabled() bool

func IdentityMappingsTableName added in v0.1.10

func IdentityMappingsTableName(warehouse WarehouseT) string

func IdentityMappingsUniqueMappingConstraintName added in v0.1.10

func IdentityMappingsUniqueMappingConstraintName(warehouse WarehouseT) string

func IdentityMappingsWarehouseTableName added in v0.1.10

func IdentityMappingsWarehouseTableName(provider string) string

func IdentityMergeRulesTableName added in v0.1.10

func IdentityMergeRulesTableName(warehouse WarehouseT) string

func IdentityMergeRulesWarehouseTableName added in v0.1.10

func IdentityMergeRulesWarehouseTableName(provider string) string

func JSONSchemaToMap

func JSONSchemaToMap(rawMsg json.RawMessage) map[string]map[string]string

func JSONTimingsToMap added in v0.1.10

func JSONTimingsToMap(rawMsg json.RawMessage) []map[string]string

func NewCsvReader added in v0.1.10

func NewCsvReader(r io.Reader) *csvReader

func NewEventReader added in v0.1.10

func NewEventReader(r io.Reader, provider string) eventReader

func NewJSONReader added in v0.1.10

func NewJSONReader(r io.Reader) *jsonReader

func ObjectStorageType

func ObjectStorageType(destType string, config interface{}, useRudderStorage bool) string

func SnowflakeCloudProvider

func SnowflakeCloudProvider(config interface{}) string

func SortColumnKeysFromColumnMap added in v0.1.10

func SortColumnKeysFromColumnMap(columnMap map[string]string) []string

func TimingFromJSONString added in v0.1.10

func TimingFromJSONString(str sql.NullString) (status string, recordedTime time.Time)

func ToProviderCase added in v0.1.10

func ToProviderCase(provider string, str string) string

ToProviderCase converts the given string to the case the warehouse generally expects for table, column and schema names: for example, names are upper-cased for SNOWFLAKE and lower-cased for REDSHIFT, BIGQUERY and others.

func ToSafeNamespace added in v0.1.10

func ToSafeNamespace(provider string, name string) string

ToSafeNamespace converts a namespace name into one the warehouse accepts: 1. removes symbols, joins consecutive runs of letters and numbers with a single underscore, and prepends an underscore if the first character is a number; 2. adds an underscore if the name is a reserved keyword in the warehouse; 3. truncates the namespace to 127 characters; 4. returns "stringempty" if the name is empty after conversion. Examples: omega -> omega; omega v2 -> omega_v2; 9mega -> _9mega; mega& -> mega; ome$ga -> ome_ga; omega$ -> omega; ome_ ga -> ome_ga; 9mega________-________90 -> _9mega_90; Cízǔ -> C_z.

Types

type CsvLoader added in v0.1.10

type CsvLoader struct {
	// contains filtered or unexported fields
}

CsvLoader is common for non-BQ warehouses. If you need any custom logic, either extend this or use destType and if/else/switch.

func NewCSVLoader added in v0.1.10

func NewCSVLoader(destType string, writer LoadFileWriterI) *CsvLoader

func (*CsvLoader) AddColumn added in v0.1.10

func (loader *CsvLoader) AddColumn(columnName string, columnType string, val interface{})

func (*CsvLoader) AddEmptyColumn added in v0.1.10

func (loader *CsvLoader) AddEmptyColumn(columnName string)

func (*CsvLoader) AddRow added in v0.1.10

func (loader *CsvLoader) AddRow(columnNames []string, row []string)

func (*CsvLoader) GetLoadTimeFomat added in v0.1.10

func (loader *CsvLoader) GetLoadTimeFomat(columnName string) string

func (*CsvLoader) IsLoadTimeColumn added in v0.1.10

func (loader *CsvLoader) IsLoadTimeColumn(columnName string) bool

func (*CsvLoader) Write added in v0.1.10

func (loader *CsvLoader) Write() error

func (*CsvLoader) WriteToString added in v0.1.10

func (loader *CsvLoader) WriteToString() (string, error)

type DestinationT

// DestinationT pairs a backend-config source with a backend-config
// destination.
type DestinationT struct {
	Source      backendconfig.SourceT
	Destination backendconfig.DestinationT
}

type EventLoader added in v0.1.10

// EventLoader builds load-file rows column by column and writes them out.
// CsvLoader, JsonLoader and ParquetLoader all provide these methods.
type EventLoader interface {
	// Row construction.
	AddColumn(columnName string, columnType string, val interface{})
	AddEmptyColumn(columnName string)
	AddRow(columnNames []string, values []string)

	// Load-time column handling (the "Fomat" spelling is part of the API).
	IsLoadTimeColumn(columnName string) bool
	GetLoadTimeFomat(columnName string) string

	// Output.
	Write() error
	WriteToString() (string, error)
}

func GetNewEventLoader added in v0.1.10

func GetNewEventLoader(destinationType, loadFileType string, w LoadFileWriterI) EventLoader

type GCSLocationOptionsT

// GCSLocationOptionsT configures GetGCSLocation, GetGCSLocationFolder and
// GetGCSLocations. TLDFormat is the scheme placed in front of the returned
// location, i.e. the result has the form "<TLDFormat>://...".
type GCSLocationOptionsT struct {
	TLDFormat string
}

type GetLoadFilesOptionsT added in v0.1.10

// GetLoadFilesOptionsT is the filter passed to
// UploaderI.GetLoadFilesMetadata. Judging by the field names, Table selects a
// table, StartID/EndID bound the load-file IDs, and Limit caps the number of
// results.
// NOTE(review): confirm how zero values (empty Table, 0 Limit) are treated by
// implementations.
type GetLoadFilesOptionsT struct {
	Table   string
	StartID int64
	EndID   int64
	Limit   int64
}

type JsonLoader added in v0.1.10

type JsonLoader struct {
	// contains filtered or unexported fields
}

JsonLoader is currently used only for BQ; treat it as a custom BQ loader. If more warehouses are added in the future, change this accordingly.

func NewJSONLoader added in v0.1.10

func NewJSONLoader(destType string, writer LoadFileWriterI) *JsonLoader

func (*JsonLoader) AddColumn added in v0.1.10

func (loader *JsonLoader) AddColumn(columnName string, columnType string, val interface{})

func (*JsonLoader) AddEmptyColumn added in v0.1.10

func (loader *JsonLoader) AddEmptyColumn(columnName string)

func (*JsonLoader) AddRow added in v0.1.10

func (loader *JsonLoader) AddRow(columnNames []string, row []string)

func (*JsonLoader) GetLoadTimeFomat added in v0.1.10

func (loader *JsonLoader) GetLoadTimeFomat(columnName string) string

func (*JsonLoader) IsLoadTimeColumn added in v0.1.10

func (loader *JsonLoader) IsLoadTimeColumn(columnName string) bool

func (*JsonLoader) Write added in v0.1.10

func (loader *JsonLoader) Write() error

func (*JsonLoader) WriteToString added in v0.1.10

func (loader *JsonLoader) WriteToString() (string, error)

type LoadFileT added in v0.1.10

// LoadFileT is a load file's storage location plus its metadata, kept as raw
// JSON so that decoding is left to the consumer.
type LoadFileT struct {
	Location string
	Metadata json.RawMessage
}

func GetS3Locations

func GetS3Locations(loadFiles []LoadFileT) []LoadFileT

type LoadFileWriterI added in v0.1.10

// LoadFileWriterI is the output sink handed to the loader constructors
// (NewCSVLoader, NewJSONLoader, NewParquetLoader). ParquetWriter implements it.
type LoadFileWriterI interface {
	// Writing data.
	Write(p []byte) (int, error)
	WriteGZ(s string) error
	WriteRow(r []interface{}) error

	// Access to the underlying file and cleanup.
	GetLoadFile() *os.File
	Close() error
}

type ParquetLoader added in v0.1.10

type ParquetLoader struct {
	Schema     []string
	Values     []interface{}
	FileWriter LoadFileWriterI
	// contains filtered or unexported fields
}

ParquetLoader is used for generating parquet load files.

func NewParquetLoader added in v0.1.10

func NewParquetLoader(destType string, w LoadFileWriterI) *ParquetLoader

func (*ParquetLoader) AddColumn added in v0.1.10

func (loader *ParquetLoader) AddColumn(columnName string, colType string, val interface{})

func (*ParquetLoader) AddEmptyColumn added in v0.1.10

func (loader *ParquetLoader) AddEmptyColumn(columnName string)

func (*ParquetLoader) AddRow added in v0.1.10

func (loader *ParquetLoader) AddRow(columnNames []string, row []string)

func (*ParquetLoader) GetLoadTimeFomat added in v0.1.10

func (loader *ParquetLoader) GetLoadTimeFomat(columnName string) string

func (*ParquetLoader) IsLoadTimeColumn added in v0.1.10

func (loader *ParquetLoader) IsLoadTimeColumn(columnName string) bool

func (*ParquetLoader) Write added in v0.1.10

func (loader *ParquetLoader) Write() error

func (*ParquetLoader) WriteToString added in v0.1.10

func (loader *ParquetLoader) WriteToString() (string, error)

type ParquetWriter added in v0.1.10

type ParquetWriter struct {
	// contains filtered or unexported fields
}

func CreateParquetWriter added in v0.1.10

func CreateParquetWriter(schema TableSchemaT, outputFilePath string, destType string) (*ParquetWriter, error)

func (*ParquetWriter) Close added in v0.1.10

func (p *ParquetWriter) Close() error

func (*ParquetWriter) GetLoadFile added in v0.1.10

func (p *ParquetWriter) GetLoadFile() *os.File

func (*ParquetWriter) Write added in v0.1.10

func (p *ParquetWriter) Write(b []byte) (int, error)

func (*ParquetWriter) WriteGZ added in v0.1.10

func (p *ParquetWriter) WriteGZ(s string) error

func (*ParquetWriter) WriteRow added in v0.1.10

func (p *ParquetWriter) WriteRow(row []interface{}) error

type PendingEventsRequestT added in v0.1.10

// PendingEventsRequestT is a JSON request payload carrying a source ID and a
// task run ID, serialized under the keys "source_id" and "task_run_id".
type PendingEventsRequestT struct {
	SourceID  string `json:"source_id"`
	TaskRunID string `json:"task_run_id"`
}

type PendingEventsResponseT added in v0.1.10

// PendingEventsResponseT is the JSON response payload reporting pending work.
// The wire keys differ from the field names: PendingStagingFilesCount is
// serialized as "pending_staging_files" and PendingUploadCount as
// "pending_uploads". Keep the tags as-is, since clients depend on them.
type PendingEventsResponseT struct {
	PendingEvents            bool  `json:"pending_events"`
	PendingStagingFilesCount int64 `json:"pending_staging_files"`
	PendingUploadCount       int64 `json:"pending_uploads"`
}

type QueryResult added in v0.1.10

// QueryResult holds a tabular query result as strings: column names plus one
// string slice per row (presumably each row has len(Columns) entries —
// confirm with producers).
type QueryResult struct {
	Columns []string
	Values  [][]string
}

type SchemaT added in v0.1.10

// SchemaT is a two-level map whose inner maps have the same shape as
// TableSchemaT; presumably it maps table name -> column name -> data type.
type SchemaT map[string]map[string]string

type StagingFileT

// StagingFileT describes a staging file: the schema of the events it holds
// (two-level map of untyped values), the source/destination it belongs to,
// its Location, the first/last event timestamps as strings, the event count
// and whether RudderStack-managed storage is used. The Source* fields carry
// cloud-source identifiers. TimeWindow is, judging by GetTimeWindow and
// DatalakeTimeWindowFormat, the time bucket the file falls into — confirm.
type StagingFileT struct {
	Schema           map[string]map[string]interface{}
	BatchDestination DestinationT
	Location         string
	FirstEventAt     string
	LastEventAt      string
	TotalEvents      int
	UseRudderStorage bool
	// cloud sources specific info
	SourceBatchID   string
	SourceTaskID    string
	SourceTaskRunID string
	SourceJobID     string
	SourceJobRunID  string
	TimeWindow      time.Time
}

type TableSchemaDiffT added in v0.1.10

// TableSchemaDiffT summarizes the difference between two versions of a table
// schema. Judging by the field names: Exists reports whether any difference
// was found, TableToBeCreated whether the table is new, ColumnMap holds the
// columns to add (name -> type), UpdatedSchema the resulting schema, and
// StringColumnsToBeAlteredToText the string columns that need widening to a
// text type.
// NOTE(review): these meanings are inferred from names only — verify against
// the code that builds this value.
type TableSchemaDiffT struct {
	Exists                         bool
	TableToBeCreated               bool
	ColumnMap                      map[string]string
	UpdatedSchema                  map[string]string
	StringColumnsToBeAlteredToText []string
}

type TableSchemaT added in v0.1.10

// TableSchemaT maps a column name to its data type (presumably the same
// "string"/"datetime"-style type names used in DiscardsSchema).
type TableSchemaT map[string]string

type TriggerUploadRequestT added in v0.1.10

// TriggerUploadRequestT is a JSON request payload identifying a source and
// destination, serialized under the keys "source_id" and "destination_id".
type TriggerUploadRequestT struct {
	SourceID      string `json:"source_id"`
	DestinationID string `json:"destination_id"`
}

type UploaderI added in v0.1.10

// UploaderI exposes an upload's schemas, load files and settings to the code
// that consumes it. The "TIme" spelling in GetLoadFileGenStartTIme is part of
// the API.
type UploaderI interface {
	// Schemas.
	GetLocalSchema() SchemaT
	UpdateLocalSchema(schema SchemaT) error
	GetSchemaInWarehouse() SchemaT
	GetTableSchemaInUpload(tableName string) TableSchemaT
	GetTableSchemaInWarehouse(tableName string) TableSchemaT

	// Load files.
	GetLoadFileType() string
	GetLoadFileGenStartTIme() time.Time
	GetLoadFilesMetadata(options GetLoadFilesOptionsT) []LoadFileT
	GetSampleLoadFileLocation(tableName string) (string, error)
	GetSingleLoadFile(tableName string) (LoadFileT, error)

	// Settings.
	ShouldOnDedupUseNewRecord() bool
	UseRudderStorage() bool
}

type WarehouseT

// WarehouseT describes one warehouse target: the backend-config source and
// destination, the Namespace to load into, the warehouse Type (e.g. RS, BQ,
// SNOWFLAKE) and an Identifier (presumably as built by
// GetWarehouseIdentifier — confirm).
type WarehouseT struct {
	Source      backendconfig.SourceT
	Destination backendconfig.DestinationT
	Namespace   string
	Type        string
	Identifier  string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL