Documentation ¶
Index ¶
- Constants
- Variables
- func CheckCurrentTimeExistsInExcludeWindow(currentTime time.Time, windowStartTime, windowEndTime string) bool
- func CheckForWarehouseEnvVars() bool
- func CheckPGHealth(dbHandle *sql.DB) bool
- func ClauseQueryArgs(filterClauses ...FilterClause) (string, []interface{})
- func DeepCopy(src, dest interface{}) error
- func DurationBeforeNextAttempt(attempt int64) time.Duration
- func GetExcludeWindowStartEndTimes(excludeWindow map[string]interface{}) (string, string)
- func GetKeyAsBool(key string, conf map[string]interface{}) bool
- func GetPrevScheduledTime(syncFrequency, syncStartAt string, currTime time.Time) time.Time
- func HandleSchemaChange(existingDataType, currentDataType model.SchemaType, value any) (any, error)
- func Init()
- func Init2()
- func Init3()
- func Init4()
- func Init5()
- func Init6()
- func InitWarehouseAPI(dbHandle *sql.DB, log logger.Logger) error
- func PickupStagingConfiguration(job *Payload) bool
- func ScheduledTimes(syncFrequency, syncStartAt string) []int
- func Setup(ctx context.Context) error
- func Start(ctx context.Context, app app.App) error
- func TriggerUploadHandler(sourceID, destID string) error
- type AsyncJobRunResult
- type BatchRouterEventT
- type ConfigurationTestInput
- type ConfigurationTestOutput
- type ConstraintsI
- type ConstraintsViolationT
- type DB
- func (db *DB) GetLatestUploadStatus(ctx context.Context, destType, sourceID, destinationID string) (int64, string, int, error)
- func (db *DB) GetUploadsCount(ctx context.Context, filterClauses ...FilterClause) (count int64, err error)
- func (db *DB) RetryUploads(ctx context.Context, filterClauses ...FilterClause) (rowsAffected int64, err error)
- type DataT
- type ErrorResponseT
- type FilterClause
- type HandleT
- type IndexConstraintT
- type InvalidDestinationCredErr
- type JobIDT
- type JobRunT
- type LoadFileJobT
- type MetadataT
- type ObjectStorageValidationRequest
- type Payload
- type ProcessStagingFilesJobT
- type QueryInput
- type RetryRequest
- type RetryResponse
- type SchemaHandleT
- type TableSkipError
- type TableUploadIDInfoT
- type TableUploadReqT
- type TableUploadResT
- type TableUploadStatusInfoT
- type TableUploadStatusT
- type TableUploadT
- type TablesResT
- type Upload
- type UploadAPIT
- type UploadColumnT
- type UploadColumnsOpts
- type UploadJobT
- func (job *UploadJobT) Aborted(attempts int, startTime time.Time) bool
- func (job *UploadJobT) GetFirstLastEvent() (time.Time, time.Time)
- func (job *UploadJobT) GetLoadFileGenStartTIme() time.Time
- func (job *UploadJobT) GetLoadFileType() string
- func (job *UploadJobT) GetLoadFilesMetadata(options warehouseutils.GetLoadFilesOptionsT) (loadFiles []warehouseutils.LoadFileT)
- func (job *UploadJobT) GetLocalSchema() warehouseutils.SchemaT
- func (job *UploadJobT) GetSampleLoadFileLocation(tableName string) (location string, err error)
- func (job *UploadJobT) GetSchemaInWarehouse() (schema warehouseutils.SchemaT)
- func (job *UploadJobT) GetSingleLoadFile(tableName string) (warehouseutils.LoadFileT, error)
- func (job *UploadJobT) GetTableSchemaInUpload(tableName string) warehouseutils.TableSchemaT
- func (job *UploadJobT) GetTableSchemaInWarehouse(tableName string) warehouseutils.TableSchemaT
- func (job *UploadJobT) ShouldOnDedupUseNewRecord() bool
- func (job *UploadJobT) UpdateLocalSchema(schema warehouseutils.SchemaT) error
- func (job *UploadJobT) UseRudderStorage() bool
- type UploadPagination
- type UploadReqT
- type UploadResT
- type UploadStatusOpts
- type UploadsReqT
- type UploadsResT
- type WarehouseAdmin
- type WorkerIdentifierT
Constants ¶
const ( TriggeredSuccessfully = "Triggered successfully" NoPendingEvents = "No pending events to sync for this destination" DownloadFileNamePattern = "downloadfile.*.tmp" NoSuchSync = "No such sync exist" )
const ( STATS_WORKER_IDLE_TIME = "worker_idle_time" STATS_WORKER_CLAIM_PROCESSING_TIME = "worker_claim_processing_time" STATS_WORKER_CLAIM_PROCESSING_FAILED = "worker_claim_processing_failed" STATS_WORKER_CLAIM_PROCESSING_SUCCEEDED = "worker_claim_processing_succeeded" TAG_WORKERID = "workerId" )
const ( GeneratingStagingFileFailedState = "generating_staging_file_failed" GeneratedStagingFileState = "generated_staging_file" PopulatingHistoricIdentitiesState = "populating_historic_identities" PopulatingHistoricIdentitiesStateFailed = "populating_historic_identities_failed" FetchingRemoteSchemaFailed = "fetching_remote_schema_failed" InternalProcessingFailed = "internal_processing_failed" )
const ( TableUploadExecuting = "executing" TableUploadUpdatingSchema = "updating_schema" TableUploadUpdatingSchemaFailed = "updating_schema_failed" TableUploadUpdatedSchema = "updated_schema" TableUploadExporting = "exporting_data" TableUploadExportingFailed = "exporting_data_failed" UserTableUploadExportingFailed = "exporting_user_tables_failed" IdentityTableUploadExportingFailed = "exporting_identities_failed" TableUploadExported = "exported_data" )
Table Upload status
const ( UploadStatusField = "status" UploadStartLoadFileIDField = "start_load_file_id" UploadEndLoadFileIDField = "end_load_file_id" UploadUpdatedAtField = "updated_at" UploadTimingsField = "timings" UploadSchemaField = "schema" MergedSchemaField = "mergedschema" UploadLastExecAtField = "last_exec_at" UploadInProgress = "in_progress" )
const ( MasterMode = "master" SlaveMode = "slave" MasterSlaveMode = "master_and_slave" EmbeddedMode = "embedded" EmbeddedMasterMode = "embedded_master" )
Warehouse worker modes.
const (
CloudSourceCateogry = "cloud"
)
const (
DegradedMode = "degraded"
)
const (
WorkerProcessingDownloadStagingFileFailed = "worker_processing_download_staging_file_failed"
)
Variables ¶
var ( ErrIncompatibleSchemaConversion = errors.New("incompatible schema conversion") ErrSchemaConversionNotSupported = errors.New("schema conversion not supported") )
var (
ShouldForceSetLowerVersion bool
)
Functions ¶
func CheckCurrentTimeExistsInExcludeWindow ¶ added in v0.1.10
func CheckForWarehouseEnvVars ¶ added in v0.1.10
func CheckForWarehouseEnvVars() bool
CheckForWarehouseEnvVars checks whether all the environment variables required by the warehouse are present.
func CheckPGHealth ¶
func ClauseQueryArgs ¶ added in v1.2.0
func ClauseQueryArgs(filterClauses ...FilterClause) (string, []interface{})
func DurationBeforeNextAttempt ¶ added in v1.2.0
func GetExcludeWindowStartEndTimes ¶ added in v1.2.0
func GetKeyAsBool ¶ added in v1.5.0
func GetPrevScheduledTime ¶
GetPrevScheduledTime returns the closest previous scheduled time. For example, when syncing every 3 hours starting at 13:00 (scheduled times: 13:00, 16:00, 19:00, 22:00, 01:00, 04:00, 07:00, 10:00), the previous scheduled time for a current time of 18:00 is 16:00 on the same day, and for a current time of 00:30 it is 22:00 on the previous day.
func HandleSchemaChange ¶ added in v0.1.10
func HandleSchemaChange(existingDataType, currentDataType model.SchemaType, value any) (any, error)
func InitWarehouseAPI ¶ added in v0.1.10
func PickupStagingConfiguration ¶ added in v1.0.2
func ScheduledTimes ¶
ScheduledTimes returns all possible start times (in minutes from the start of the day) as per the schedule. For example, syncing every 3 hours starting at 13:00 gives the scheduled times 13:00, 16:00, 19:00, 22:00, 01:00, 04:00, 07:00, 10:00.
func Setup ¶ added in v1.4.0
Setup prepares the database connection for the warehouse service, verifies database compatibility, and creates the required tables.
func TriggerUploadHandler ¶ added in v0.1.10
Types ¶
type AsyncJobRunResult ¶ added in v1.2.0
type BatchRouterEventT ¶ added in v0.1.10
func (*BatchRouterEventT) GetColumnInfo ¶ added in v0.1.10
func (event *BatchRouterEventT) GetColumnInfo(columnName string) (columnInfo warehouseutils.ColumnInfo, ok bool)
type ConfigurationTestInput ¶ added in v1.0.2
type ConfigurationTestInput struct {
DestID string
}
type ConfigurationTestOutput ¶ added in v1.0.2
type ConstraintsI ¶ added in v0.1.10
type ConstraintsI interface {
// contains filtered or unexported methods
}
type ConstraintsViolationT ¶ added in v0.1.10
func ViolatedConstraints ¶ added in v0.1.10
func ViolatedConstraints(destinationType string, brEvent *BatchRouterEventT, columnName string) (cv *ConstraintsViolationT)
type DB ¶ added in v1.0.2
type DB struct {
// contains filtered or unexported fields
}
DB encapsulates interactions of warehouse operations with the database.
func NewWarehouseDB ¶ added in v1.0.2
func (*DB) GetLatestUploadStatus ¶ added in v1.0.2
func (*DB) GetUploadsCount ¶ added in v1.2.0
func (*DB) RetryUploads ¶ added in v1.2.0
type ErrorResponseT ¶
type ErrorResponseT struct {
Error string
}
type FilterClause ¶ added in v1.2.0
type FilterClause struct { Clause string ClauseArg interface{} }
type HandleT ¶
type HandleT struct { Now func() time.Time NowSQL string Logger logger.Logger // contains filtered or unexported fields }
func (*HandleT) CronTracker ¶ added in v1.5.0
type IndexConstraintT ¶ added in v0.1.10
type InvalidDestinationCredErr ¶ added in v1.2.0
func (InvalidDestinationCredErr) Error ¶ added in v1.2.0
func (err InvalidDestinationCredErr) Error() string
type JobRunT ¶ added in v0.1.10
type JobRunT struct {
// contains filtered or unexported fields
}
JobRunT is a temporary store used while processing staging files into load files.
func (*JobRunT) GetWriter ¶ added in v0.1.10
func (jobRun *JobRunT) GetWriter(tableName string) (warehouseutils.LoadFileWriterI, error)
type LoadFileJobT ¶
type ObjectStorageValidationRequest ¶ added in v1.2.0
type Payload ¶ added in v1.2.0
type Payload struct { BatchID string UploadID int64 StagingFileID int64 StagingFileLocation string UploadSchema map[string]map[string]string WorkspaceID string SourceID string SourceName string DestinationID string DestinationName string DestinationType string DestinationNamespace string DestinationRevisionID string StagingDestinationRevisionID string DestinationConfig map[string]interface{} StagingDestinationConfig interface{} UseRudderStorage bool StagingUseRudderStorage bool UniqueLoadGenID string RudderStoragePrefix string Output []loadFileUploadOutputT LoadFilePrefix string // prefix for the load file name LoadFileType string }
type ProcessStagingFilesJobT ¶
type ProcessStagingFilesJobT struct { Upload Upload List []*model.StagingFile Warehouse warehouseutils.Warehouse }
type QueryInput ¶ added in v0.1.10
type RetryRequest ¶ added in v0.1.10
type RetryRequest struct { WorkspaceID string SourceID string DestinationID string DestinationType string IntervalInHours int64 // Optional, if provided we will retry based on the interval provided UploadIds []int64 // Optional, if provided we will retry the upload ids provided ForceRetry bool API UploadAPIT }
func (*RetryRequest) RetryWHUploads ¶ added in v0.1.10
func (retryReq *RetryRequest) RetryWHUploads(ctx context.Context) (response RetryResponse, err error)
func (*RetryRequest) UploadsToRetry ¶ added in v1.2.0
func (retryReq *RetryRequest) UploadsToRetry(ctx context.Context) (response RetryResponse, err error)
type RetryResponse ¶ added in v0.1.10
type SchemaHandleT ¶ added in v0.1.10
type SchemaHandleT struct {
// contains filtered or unexported fields
}
type TableSkipError ¶ added in v0.1.10
type TableSkipError struct {
// contains filtered or unexported fields
}
TableSkipError is a custom error type to capture if a table load is skipped because of a previously failed table load
func (*TableSkipError) Error ¶ added in v0.1.10
func (tse *TableSkipError) Error() string
type TableUploadIDInfoT ¶ added in v0.1.10
type TableUploadIDInfoT struct {
// contains filtered or unexported fields
}
TableUploadIDInfoT captures the uploadID and error for [uploadID][tableName]
type TableUploadReqT ¶ added in v0.1.10
type TableUploadReqT struct { UploadID int64 Name string API UploadAPIT }
func (TableUploadReqT) GetWhTableUploads ¶ added in v0.1.10
func (tableUploadReq TableUploadReqT) GetWhTableUploads() ([]*proto.WHTable, error)
type TableUploadResT ¶ added in v0.1.10
type TableUploadStatusInfoT ¶ added in v0.1.10
type TableUploadStatusInfoT struct {
// contains filtered or unexported fields
}
TableUploadStatusInfoT captures the status and error for [uploadID][tableName]
type TableUploadStatusT ¶ added in v0.1.10
type TableUploadStatusT struct {
// contains filtered or unexported fields
}
TableUploadStatusT captures the status of each table upload along with its parent upload_job's info like destination_id and namespace
type TableUploadT ¶ added in v0.1.10
type TableUploadT struct {
// contains filtered or unexported fields
}
func NewTableUpload ¶ added in v0.1.10
func NewTableUpload(uploadID int64, tableName string) *TableUploadT
type TablesResT ¶ added in v0.1.10
type TablesResT struct {
Tables []TableUploadResT `json:"tables,omitempty"`
}
type Upload ¶ added in v1.2.0
type Upload struct { ID int64 Namespace string WorkspaceID string SourceID string SourceType string SourceCategory string DestinationID string DestinationType string StartStagingFileID int64 EndStagingFileID int64 StartLoadFileID int64 EndLoadFileID int64 Status string UploadSchema warehouseutils.SchemaT MergedSchema warehouseutils.SchemaT Error json.RawMessage Timings []map[string]string FirstAttemptAt time.Time LastAttemptAt time.Time Attempts int64 Metadata json.RawMessage FirstEventAt time.Time LastEventAt time.Time UseRudderStorage bool LoadFileGenStartTime time.Time TimingsObj sql.NullString Priority int // cloud sources specific info SourceBatchID string SourceTaskID string SourceTaskRunID string SourceJobID string SourceJobRunID string LoadFileType string }
type UploadAPIT ¶ added in v0.1.10
type UploadAPIT struct {
// contains filtered or unexported fields
}
var UploadAPI UploadAPIT
type UploadColumnT ¶ added in v0.1.10
type UploadColumnT struct { Column string Value interface{} }
type UploadColumnsOpts ¶ added in v0.1.10
type UploadColumnsOpts struct { Fields []UploadColumnT Txn *sql.Tx }
type UploadJobT ¶ added in v0.1.10
type UploadJobT struct {
// contains filtered or unexported fields
}
func (*UploadJobT) Aborted ¶ added in v0.1.10
func (job *UploadJobT) Aborted(attempts int, startTime time.Time) bool
Aborted checks whether the state of the job should be set to aborted.
func (*UploadJobT) GetFirstLastEvent ¶ added in v1.0.2
func (job *UploadJobT) GetFirstLastEvent() (time.Time, time.Time)
func (*UploadJobT) GetLoadFileGenStartTIme ¶ added in v0.1.10
func (job *UploadJobT) GetLoadFileGenStartTIme() time.Time
func (*UploadJobT) GetLoadFileType ¶ added in v0.1.10
func (job *UploadJobT) GetLoadFileType() string
func (*UploadJobT) GetLoadFilesMetadata ¶ added in v0.1.10
func (job *UploadJobT) GetLoadFilesMetadata(options warehouseutils.GetLoadFilesOptionsT) (loadFiles []warehouseutils.LoadFileT)
func (*UploadJobT) GetLocalSchema ¶ added in v0.1.10
func (job *UploadJobT) GetLocalSchema() warehouseutils.SchemaT
func (*UploadJobT) GetSampleLoadFileLocation ¶ added in v0.1.10
func (job *UploadJobT) GetSampleLoadFileLocation(tableName string) (location string, err error)
func (*UploadJobT) GetSchemaInWarehouse ¶ added in v0.1.10
func (job *UploadJobT) GetSchemaInWarehouse() (schema warehouseutils.SchemaT)
func (*UploadJobT) GetSingleLoadFile ¶ added in v0.1.10
func (job *UploadJobT) GetSingleLoadFile(tableName string) (warehouseutils.LoadFileT, error)
func (*UploadJobT) GetTableSchemaInUpload ¶ added in v0.1.10
func (job *UploadJobT) GetTableSchemaInUpload(tableName string) warehouseutils.TableSchemaT
func (*UploadJobT) GetTableSchemaInWarehouse ¶ added in v0.1.10
func (job *UploadJobT) GetTableSchemaInWarehouse(tableName string) warehouseutils.TableSchemaT
func (*UploadJobT) ShouldOnDedupUseNewRecord ¶ added in v0.1.10
func (job *UploadJobT) ShouldOnDedupUseNewRecord() bool
func (*UploadJobT) UpdateLocalSchema ¶ added in v0.1.10
func (job *UploadJobT) UpdateLocalSchema(schema warehouseutils.SchemaT) error
func (*UploadJobT) UseRudderStorage ¶ added in v0.1.10
func (job *UploadJobT) UseRudderStorage() bool
type UploadPagination ¶ added in v0.1.10
type UploadReqT ¶ added in v0.1.10
type UploadReqT struct { WorkspaceID string UploadId int64 API UploadAPIT }
func (UploadReqT) GetWHUpload ¶ added in v0.1.10
func (uploadReq UploadReqT) GetWHUpload() (*proto.WHUploadResponse, error)
func (UploadReqT) TriggerWHUpload ¶ added in v0.1.10
func (uploadReq UploadReqT) TriggerWHUpload() (response *proto.TriggerWhUploadsResponse, err error)
type UploadResT ¶ added in v0.1.10
type UploadResT struct { ID int64 `json:"id"` Namespace string `json:"namespace"` SourceID string `json:"source_id"` DestinationID string `json:"destination_id"` DestinationType string `json:"destination_type"` Status string `json:"status"` Error string `json:"error"` Attempt int32 `json:"attempt"` Duration int32 `json:"duration"` NextRetryTime string `json:"nextRetryTime"` FirstEventAt time.Time `json:"first_event_at"` LastEventAt time.Time `json:"last_event_at"` Tables []TableUploadResT `json:"tables,omitempty"` }
type UploadStatusOpts ¶ added in v0.1.10
type UploadStatusOpts struct { Status string AdditionalFields []UploadColumnT ReportingMetric types.PUReportedMetric }
type UploadsReqT ¶ added in v0.1.10
type UploadsReqT struct { WorkspaceID string SourceID string DestinationID string DestinationType string Status string Limit int32 Offset int32 API UploadAPIT }
func (*UploadsReqT) GetWhUploads ¶ added in v0.1.10
func (uploadsReq *UploadsReqT) GetWhUploads() (uploadsRes *proto.WHUploadsResponse, err error)
func (*UploadsReqT) TriggerWhUploads ¶ added in v0.1.10
func (uploadsReq *UploadsReqT) TriggerWhUploads() (response *proto.TriggerWhUploadsResponse, err error)
type UploadsResT ¶ added in v0.1.10
type UploadsResT struct { Uploads []UploadResT `json:"uploads"` Pagination UploadPagination `json:"pagination"` }
type WarehouseAdmin ¶ added in v0.1.10
type WarehouseAdmin struct{}
func (*WarehouseAdmin) ConfigurationTest ¶ added in v1.0.2
func (*WarehouseAdmin) ConfigurationTest(s ConfigurationTestInput, reply *ConfigurationTestOutput) error
ConfigurationTest tests the underlying warehouse destination.
func (*WarehouseAdmin) Query ¶ added in v0.1.10
func (*WarehouseAdmin) Query(s QueryInput, reply *warehouseutils.QueryResult) error
Query queries the underlying warehouse.
func (*WarehouseAdmin) TriggerUpload ¶ added in v0.1.10
func (*WarehouseAdmin) TriggerUpload(off bool, reply *string) error
TriggerUpload sets uploads to start without delay
type WorkerIdentifierT ¶ added in v0.1.10
type WorkerIdentifierT string