Documentation ¶
Index ¶
- Constants
- Variables
- func CheckCurrentTimeExistsInExcludeWindow(currentTime time.Time, windowStartTime, windowEndTime string) bool
- func CheckForWarehouseEnvVars() bool
- func CheckPGHealth(dbHandle *sql.DB) bool
- func GetExludeWindowStartEndTimes(excludeWindow map[string]interface{}) (string, string)
- func GetPrevScheduledTime(syncFrequency, syncStartAt string, currTime time.Time) time.Time
- func HandleSchemaChange(existingDataType, columnType string, columnVal interface{}) (newColumnVal interface{}, ok bool)
- func Init()
- func Init2()
- func Init3()
- func Init4()
- func Init5()
- func Init6()
- func InitWarehouseAPI(dbHandle *sql.DB, log logger.LoggerI)
- func PickupStagingConfiguration(job *PayloadT) bool
- func ScheduledTimes(syncFrequency, syncStartAt string) []int
- func Start(ctx context.Context, app app.Interface) error
- func TriggerUploadHandler(sourceID, destID string) error
- type BatchRouterEventT
- type ColumnInfoT
- type ConfigurationTestInput
- type ConfigurationTestOutput
- type ConstraintsI
- type ConstraintsViolationT
- type DB
- type DataT
- type ErrorResponseT
- type HandleT
- type IndexConstraintT
- type JobIDT
- type JobRunT
- type LoadFileJobT
- type MetadataT
- type PayloadT
- type ProcessStagingFilesJobT
- type QueryInput
- type RetryRequest
- type RetryResponse
- type SchemaHandleT
- type StagingFileT
- type TableSkipError
- type TableUploadIDInfoT
- type TableUploadReqT
- type TableUploadResT
- type TableUploadStatusInfoT
- type TableUploadStatusT
- type TableUploadT
- type TablesResT
- type UploadAPIT
- type UploadColumnT
- type UploadColumnsOpts
- type UploadJobT
- func (job *UploadJobT) Aborted(attempts int, startTime time.Time) bool
- func (job *UploadJobT) GetFirstLastEvent() (time.Time, time.Time)
- func (job *UploadJobT) GetLoadFileGenStartTIme() time.Time
- func (job *UploadJobT) GetLoadFileType() string
- func (job *UploadJobT) GetLoadFilesMetadata(options warehouseutils.GetLoadFilesOptionsT) (loadFiles []warehouseutils.LoadFileT)
- func (job *UploadJobT) GetLocalSchema() warehouseutils.SchemaT
- func (job *UploadJobT) GetSampleLoadFileLocation(tableName string) (location string, err error)
- func (job *UploadJobT) GetSchemaInWarehouse() (schema warehouseutils.SchemaT)
- func (job *UploadJobT) GetSingleLoadFile(tableName string) (warehouseutils.LoadFileT, error)
- func (job *UploadJobT) GetTableSchemaInUpload(tableName string) warehouseutils.TableSchemaT
- func (job *UploadJobT) GetTableSchemaInWarehouse(tableName string) warehouseutils.TableSchemaT
- func (job *UploadJobT) ShouldOnDedupUseNewRecord() bool
- func (job *UploadJobT) UpdateLocalSchema(schema warehouseutils.SchemaT) error
- func (job *UploadJobT) UseRudderStorage() bool
- type UploadPagination
- type UploadReqT
- type UploadResT
- type UploadStatusOpts
- type UploadT
- type UploadsReqT
- type UploadsResT
- type WarehouseAdmin
- type WorkerIdentifierT
Constants ¶
const ( STATS_WORKER_IDLE_TIME = "worker_idle_time" STATS_WORKER_CLAIM_PROCESSING_TIME = "worker_claim_processing_time" STATS_WORKER_CLAIM_PROCESSING_FAILED = "worker_claim_processing_failed" STATS_WORKER_CLAIM_PROCESSING_SUCCEEDED = "worker_claim_processing_succeeded" TAG_WORKERID = "workerId" )
const ( Waiting = "waiting" GeneratedUploadSchema = "generated_upload_schema" CreatedTableUploads = "created_table_uploads" GeneratedLoadFiles = "generated_load_files" UpdatedTableUploadsCounts = "updated_table_uploads_counts" CreatedRemoteSchema = "created_remote_schema" ExportedUserTables = "exported_user_tables" ExportedData = "exported_data" ExportedIdentities = "exported_identities" Aborted = "aborted" )
Upload Status
const ( GeneratingStagingFileFailedState = "generating_staging_file_failed" GeneratedStagingFileState = "generated_staging_file" PopulatingHistoricIdentitiesState = "populating_historic_identities" PopulatingHistoricIdentitiesStateFailed = "populating_historic_identities_failed" FetchingRemoteSchemaFailed = "fetching_remote_schema_failed" InternalProcessingFailed = "internal_processing_failed" )
const ( TableUploadExecuting = "executing" TableUploadUpdatingSchema = "updating_schema" TableUploadUpdatingSchemaFailed = "updating_schema_failed" TableUploadUpdatedSchema = "updated_schema" TableUploadExporting = "exporting_data" TableUploadExportingFailed = "exporting_data_failed" UserTableUploadExportingFailed = "exporting_user_tables_failed" IdentityTableUploadExportingFailed = "exporting_identities_failed" TableUploadExported = "exported_data" )
Table Upload status
const ( UploadStatusField = "status" UploadStartLoadFileIDField = "start_load_file_id" UploadEndLoadFileIDField = "end_load_file_id" UploadUpdatedAtField = "updated_at" UploadTimingsField = "timings" UploadSchemaField = "schema" MergedSchemaField = "mergedschema" UploadLastExecAtField = "last_exec_at" UploadInProgress = "in_progress" )
const ( MasterMode = "master" SlaveMode = "slave" MasterSlaveMode = "master_and_slave" EmbeddedMode = "embedded" PooledWHSlaveMode = "embedded_master" )
Warehouse worker modes
const (
CloudSourceCateogry = "cloud"
)
const (
DegradedMode = "degraded"
)
const (
WorkerProcessingDownloadStagingFileFailed = "worker_processing_download_staging_file_failed"
)
Variables ¶
var (
ShouldForceSetLowerVersion bool
)
Functions ¶
func CheckForWarehouseEnvVars ¶
func CheckForWarehouseEnvVars() bool
CheckForWarehouseEnvVars checks whether all the environment variables required by the warehouse are present.
func CheckPGHealth ¶
func GetPrevScheduledTime ¶
GetPrevScheduledTime returns the closest previous scheduled time. For example, when syncing every 3 hours starting at 13:00 (scheduled times: 13:00, 16:00, 19:00, 22:00, 01:00, 04:00, 07:00, 10:00), the previous scheduled time for a current time of 18:00 is 16:00 on the same day, and for 00:30 it is 22:00 on the previous day.
func HandleSchemaChange ¶
func ScheduledTimes ¶
ScheduledTimes returns all possible start times, expressed as minutes from the start of the day, according to the schedule. For example, when syncing every 3 hours starting at 13:00, the scheduled times are 13:00, 16:00, 19:00, 22:00, 01:00, 04:00, 07:00 and 10:00.
func TriggerUploadHandler ¶
Types ¶
type BatchRouterEventT ¶
func (*BatchRouterEventT) GetColumnInfo ¶
func (event *BatchRouterEventT) GetColumnInfo(columnName string) (columnInfo ColumnInfoT, ok bool)
type ColumnInfoT ¶
type ColumnInfoT struct { ColumnVal interface{} ColumnType string }
type ConfigurationTestInput ¶
type ConfigurationTestInput struct {
DestID string
}
type ConfigurationTestOutput ¶
type ConstraintsI ¶
type ConstraintsI interface {
// contains filtered or unexported methods
}
type ConstraintsViolationT ¶
type ConstraintsViolationT struct {
// contains filtered or unexported fields
}
func ViolatedConstraints ¶
func ViolatedConstraints(destinationType string, brEvent *BatchRouterEventT, columnName string) (cv *ConstraintsViolationT)
type DB ¶
type DB struct {
// contains filtered or unexported fields
}
DB encapsulates the interactions of warehouse operations with the database.
func NewWarehouseDB ¶
type ErrorResponseT ¶
type ErrorResponseT struct {
Error string
}
type IndexConstraintT ¶
type JobRunT ¶
type JobRunT struct {
// contains filtered or unexported fields
}
JobRunT is a temporary store used while processing a staging file into a load file.
func (*JobRunT) GetWriter ¶
func (jobRun *JobRunT) GetWriter(tableName string) (warehouseutils.LoadFileWriterI, error)
type LoadFileJobT ¶
type LoadFileJobT struct { Upload UploadT StagingFile *StagingFileT Schema map[string]map[string]string Warehouse warehouseutils.WarehouseT Wg *misc.WaitGroup LoadFileIDsChan chan []int64 TableToBucketFolderMap map[string]string TableToBucketFolderMapLock *sync.RWMutex }
type PayloadT ¶
type PayloadT struct { BatchID string UploadID int64 StagingFileID int64 StagingFileLocation string UploadSchema map[string]map[string]string SourceID string SourceName string DestinationID string DestinationName string DestinationType string DestinationNamespace string DestinationRevisionID string StagingDestinationRevisionID string DestinationConfig interface{} StagingDestinationConfig interface{} UseRudderStorage bool StagingUseRudderStorage bool UniqueLoadGenID string RudderStoragePrefix string Output []loadFileUploadOutputT LoadFilePrefix string // prefix for the load file name LoadFileType string }
type ProcessStagingFilesJobT ¶
type ProcessStagingFilesJobT struct { Upload UploadT List []*StagingFileT Warehouse warehouseutils.WarehouseT }
type QueryInput ¶
type RetryRequest ¶
type RetryRequest struct { WorkspaceID string SourceID string DestinationID string DestinationType string IntervalInHours int64 // Optional, if provided we will retry based on the interval provided UploadIds []int64 // Optional, if provided we will retry the upload ids provided ForceRetry bool API UploadAPIT }
func (*RetryRequest) RetryWHUploads ¶
func (retryReq *RetryRequest) RetryWHUploads() (response RetryResponse, err error)
type RetryResponse ¶
type SchemaHandleT ¶
type SchemaHandleT struct {
// contains filtered or unexported fields
}
type StagingFileT ¶
type StagingFileT struct { ID int64 Location string SourceID string Schema json.RawMessage Status string // enum CreatedAt time.Time FirstEventAt time.Time LastEventAt time.Time UseRudderStorage bool DestinationRevisionID string // cloud sources specific info SourceBatchID string SourceTaskID string SourceTaskRunID string SourceJobID string SourceJobRunID string TimeWindow time.Time }
type TableSkipError ¶
type TableSkipError struct {
// contains filtered or unexported fields
}
TableSkipError is a custom error type indicating that a table load was skipped because of a previously failed table load.
func (*TableSkipError) Error ¶
func (tse *TableSkipError) Error() string
type TableUploadIDInfoT ¶
type TableUploadIDInfoT struct {
// contains filtered or unexported fields
}
TableUploadIDInfoT captures the uploadID and error for [uploadID][tableName]
type TableUploadReqT ¶
type TableUploadReqT struct { UploadID int64 Name string API UploadAPIT }
func (TableUploadReqT) GetWhTableUploads ¶
func (tableUploadReq TableUploadReqT) GetWhTableUploads() ([]*proto.WHTable, error)
type TableUploadResT ¶
type TableUploadStatusInfoT ¶
type TableUploadStatusInfoT struct {
// contains filtered or unexported fields
}
TableUploadStatusInfoT captures the status and error for [uploadID][tableName]
type TableUploadStatusT ¶
type TableUploadStatusT struct {
// contains filtered or unexported fields
}
TableUploadStatusT captures the status of each table upload along with its parent upload job's info, such as destination_id and namespace.
type TableUploadT ¶
type TableUploadT struct {
// contains filtered or unexported fields
}
func NewTableUpload ¶
func NewTableUpload(uploadID int64, tableName string) *TableUploadT
type TablesResT ¶
type TablesResT struct {
Tables []TableUploadResT `json:"tables,omitempty"`
}
type UploadAPIT ¶
type UploadAPIT struct {
// contains filtered or unexported fields
}
var UploadAPI UploadAPIT
type UploadColumnT ¶
type UploadColumnT struct { Column string Value interface{} }
type UploadColumnsOpts ¶
type UploadColumnsOpts struct { Fields []UploadColumnT Txn *sql.Tx }
type UploadJobT ¶
type UploadJobT struct {
// contains filtered or unexported fields
}
func (*UploadJobT) Aborted ¶
func (job *UploadJobT) Aborted(attempts int, startTime time.Time) bool
Aborted reports whether the job's state should be set to aborted.
func (*UploadJobT) GetFirstLastEvent ¶
func (job *UploadJobT) GetFirstLastEvent() (time.Time, time.Time)
func (*UploadJobT) GetLoadFileGenStartTIme ¶
func (job *UploadJobT) GetLoadFileGenStartTIme() time.Time
func (*UploadJobT) GetLoadFileType ¶
func (job *UploadJobT) GetLoadFileType() string
func (*UploadJobT) GetLoadFilesMetadata ¶
func (job *UploadJobT) GetLoadFilesMetadata(options warehouseutils.GetLoadFilesOptionsT) (loadFiles []warehouseutils.LoadFileT)
func (*UploadJobT) GetLocalSchema ¶
func (job *UploadJobT) GetLocalSchema() warehouseutils.SchemaT
func (*UploadJobT) GetSampleLoadFileLocation ¶
func (job *UploadJobT) GetSampleLoadFileLocation(tableName string) (location string, err error)
func (*UploadJobT) GetSchemaInWarehouse ¶
func (job *UploadJobT) GetSchemaInWarehouse() (schema warehouseutils.SchemaT)
func (*UploadJobT) GetSingleLoadFile ¶
func (job *UploadJobT) GetSingleLoadFile(tableName string) (warehouseutils.LoadFileT, error)
func (*UploadJobT) GetTableSchemaInUpload ¶
func (job *UploadJobT) GetTableSchemaInUpload(tableName string) warehouseutils.TableSchemaT
func (*UploadJobT) GetTableSchemaInWarehouse ¶
func (job *UploadJobT) GetTableSchemaInWarehouse(tableName string) warehouseutils.TableSchemaT
func (*UploadJobT) ShouldOnDedupUseNewRecord ¶
func (job *UploadJobT) ShouldOnDedupUseNewRecord() bool
func (*UploadJobT) UpdateLocalSchema ¶
func (job *UploadJobT) UpdateLocalSchema(schema warehouseutils.SchemaT) error
func (*UploadJobT) UseRudderStorage ¶
func (job *UploadJobT) UseRudderStorage() bool
type UploadPagination ¶
type UploadReqT ¶
type UploadReqT struct { WorkspaceID string UploadId int64 API UploadAPIT }
func (UploadReqT) GetWHUpload ¶
func (uploadReq UploadReqT) GetWHUpload() (*proto.WHUploadResponse, error)
func (UploadReqT) TriggerWHUpload ¶
func (uploadReq UploadReqT) TriggerWHUpload() (response *proto.TriggerWhUploadsResponse, err error)
type UploadResT ¶
type UploadResT struct { ID int64 `json:"id"` Namespace string `json:"namespace"` SourceID string `json:"source_id"` DestinationID string `json:"destination_id"` DestinationType string `json:"destination_type"` Status string `json:"status"` Error string `json:"error"` Attempt int32 `json:"attempt"` Duration int32 `json:"duration"` NextRetryTime string `json:"nextRetryTime"` FirstEventAt time.Time `json:"first_event_at"` LastEventAt time.Time `json:"last_event_at"` Tables []TableUploadResT `json:"tables,omitempty"` }
type UploadStatusOpts ¶
type UploadStatusOpts struct { Status string AdditionalFields []UploadColumnT ReportingMetric types.PUReportedMetric }
type UploadT ¶
type UploadT struct { ID int64 Namespace string SourceID string SourceType string SourceCategory string DestinationID string DestinationType string StartStagingFileID int64 EndStagingFileID int64 StartLoadFileID int64 EndLoadFileID int64 Status string UploadSchema warehouseutils.SchemaT MergedSchema warehouseutils.SchemaT Error json.RawMessage Timings []map[string]string FirstAttemptAt time.Time LastAttemptAt time.Time Attempts int64 Metadata json.RawMessage FirstEventAt time.Time LastEventAt time.Time UseRudderStorage bool LoadFileGenStartTime time.Time TimingsObj sql.NullString Priority int // cloud sources specific info SourceBatchID string SourceTaskID string SourceTaskRunID string SourceJobID string SourceJobRunID string LoadFileType string }
type UploadsReqT ¶
type UploadsReqT struct { WorkspaceID string SourceID string DestinationID string DestinationType string Status string Limit int32 Offset int32 API UploadAPIT }
func (*UploadsReqT) GetWhUploads ¶
func (uploadsReq *UploadsReqT) GetWhUploads() (uploadsRes *proto.WHUploadsResponse, err error)
func (*UploadsReqT) TriggerWhUploads ¶
func (uploadsReq *UploadsReqT) TriggerWhUploads() (response *proto.TriggerWhUploadsResponse, err error)
type UploadsResT ¶
type UploadsResT struct { Uploads []UploadResT `json:"uploads"` Pagination UploadPagination `json:"pagination"` }
type WarehouseAdmin ¶
type WarehouseAdmin struct{}
func (*WarehouseAdmin) ConfigurationTest ¶
func (wh *WarehouseAdmin) ConfigurationTest(s ConfigurationTestInput, reply *ConfigurationTestOutput) error
ConfigurationTest tests the underlying warehouse destination.
func (*WarehouseAdmin) Query ¶
func (wh *WarehouseAdmin) Query(s QueryInput, reply *warehouseutils.QueryResult) error
Query runs a query against the underlying warehouse.
func (*WarehouseAdmin) TriggerUpload ¶
func (wh *WarehouseAdmin) TriggerUpload(off bool, reply *string) error
TriggerUpload sets uploads to start without delay
type WorkerIdentifierT ¶
type WorkerIdentifierT string