Documentation ¶
Index ¶
- Constants
- Variables
- func CheckCurrentTimeExistsInExcludeWindow(currentTime time.Time, windowStartTime, windowEndTime string) bool
- func CheckForWarehouseEnvVars() bool
- func CheckPGHealth(dbHandle *sql.DB) bool
- func ClauseQueryArgs(filterClauses ...FilterClause) (string, []interface{})
- func DurationBeforeNextAttempt(attempt int64) time.Duration
- func GetExcludeWindowStartEndTimes(excludeWindow map[string]interface{}) (string, string)
- func GetPrevScheduledTime(syncFrequency, syncStartAt string, currTime time.Time) time.Time
- func HandleSchemaChange(existingDataType, currentDataType model.SchemaType, value any) (any, error)
- func Init()
- func Init2()
- func Init3()
- func Init4()
- func Init5()
- func Init6()
- func InitWarehouseAPI(dbHandle *sql.DB, log logger.Logger) error
- func MergeSchema(currentSchema warehouseutils.SchemaT, schemaList []warehouseutils.SchemaT, ...) warehouseutils.SchemaT
- func PickupStagingConfiguration(job *Payload) bool
- func ScheduledTimes(syncFrequency, syncStartAt string) []int
- func Setup(ctx context.Context) error
- func Start(ctx context.Context, app app.App) error
- func TriggerUploadHandler(sourceID, destID string) error
- type AsyncJobRunResult
- type BatchRouterEventT
- type ConfigurationTestInput
- type ConfigurationTestOutput
- type ConstraintsI
- type ConstraintsViolationT
- type DB
- func (db *DB) GetLatestUploadStatus(ctx context.Context, destType, sourceID, destinationID string) (int64, string, int, error)
- func (db *DB) GetUploadsCount(ctx context.Context, filterClauses ...FilterClause) (count int64, err error)
- func (db *DB) RetryUploads(ctx context.Context, filterClauses ...FilterClause) (rowsAffected int64, err error)
- type DataT
- type ErrorHandler
- type ErrorResponseT
- type FilterClause
- type HandleT
- type IndexConstraintT
- type InvalidDestinationCredErr
- type JobIDT
- type JobRunT
- type LoadFileJobT
- type MetadataT
- type ObjectStorageValidationRequest
- type Payload
- type QueryInput
- type RetryRequest
- type RetryResponse
- type SchemaHandleT
- type TableSkipError
- type TableUploadIDInfoT
- type TableUploadReqT
- type TableUploadResT
- type TableUploadStatusInfoT
- type TableUploadStatusT
- type TableUploadT
- type TablesResT
- type Tag
- type UploadAPIT
- type UploadColumnT
- type UploadColumnsOpts
- type UploadJobFactory
- type UploadJobT
- func (job *UploadJobT) Aborted(attempts int, startTime time.Time) bool
- func (job *UploadJobT) DTO() *model.UploadJob
- func (job *UploadJobT) GetFirstLastEvent() (time.Time, time.Time)
- func (job *UploadJobT) GetLoadFileGenStartTIme() time.Time
- func (job *UploadJobT) GetLoadFileType() string
- func (job *UploadJobT) GetLoadFilesMetadata(options warehouseutils.GetLoadFilesOptionsT) (loadFiles []warehouseutils.LoadFileT)
- func (job *UploadJobT) GetLocalSchema() warehouseutils.SchemaT
- func (job *UploadJobT) GetSampleLoadFileLocation(tableName string) (location string, err error)
- func (job *UploadJobT) GetSchemaInWarehouse() (schema warehouseutils.SchemaT)
- func (job *UploadJobT) GetSingleLoadFile(tableName string) (warehouseutils.LoadFileT, error)
- func (job *UploadJobT) GetTableSchemaInUpload(tableName string) warehouseutils.TableSchemaT
- func (job *UploadJobT) GetTableSchemaInWarehouse(tableName string) warehouseutils.TableSchemaT
- func (job *UploadJobT) RefreshPartitions(loadFileStartID, loadFileEndID int64) error
- func (job *UploadJobT) ShouldOnDedupUseNewRecord() bool
- func (job *UploadJobT) UpdateLocalSchema(schema warehouseutils.SchemaT) error
- func (job *UploadJobT) UpdateTableSchema(tName string, tableSchemaDiff warehouseutils.TableSchemaDiffT) (err error)
- func (job *UploadJobT) UseRudderStorage() bool
- type UploadPagination
- type UploadReqT
- type UploadResT
- type UploadStatusOpts
- type UploadsReqT
- type UploadsResT
- type WarehouseAdmin
- type WorkerIdentifierT
Constants ¶
const ( TriggeredSuccessfully = "Triggered successfully" NoPendingEvents = "No pending events to sync for this destination" DownloadFileNamePattern = "downloadfile.*.tmp" NoSuchSync = "No such sync exist" )
const ( STATS_WORKER_IDLE_TIME = "worker_idle_time" STATS_WORKER_CLAIM_PROCESSING_TIME = "worker_claim_processing_time" STATS_WORKER_CLAIM_PROCESSING_FAILED = "worker_claim_processing_failed" STATS_WORKER_CLAIM_PROCESSING_SUCCEEDED = "worker_claim_processing_succeeded" TAG_WORKERID = "workerId" )
const ( GeneratingStagingFileFailedState = "generating_staging_file_failed" GeneratedStagingFileState = "generated_staging_file" FetchingRemoteSchemaFailed = "fetching_remote_schema_failed" InternalProcessingFailed = "internal_processing_failed" )
const ( TableUploadExecuting = "executing" TableUploadUpdatingSchemaFailed = "updating_schema_failed" TableUploadUpdatedSchema = "updated_schema" TableUploadExportingFailed = "exporting_data_failed" UserTableUploadExportingFailed = "exporting_user_tables_failed" IdentityTableUploadExportingFailed = "exporting_identities_failed" TableUploadExported = "exported_data" )
Table Upload status
const ( CloudSourceCategory = "cloud" SingerProtocolSourceCategory = "singer-protocol" )
const ( UploadStatusField = "status" UploadStartLoadFileIDField = "start_load_file_id" UploadEndLoadFileIDField = "end_load_file_id" UploadUpdatedAtField = "updated_at" UploadTimingsField = "timings" UploadSchemaField = "schema" MergedSchemaField = "mergedschema" UploadLastExecAtField = "last_exec_at" UploadInProgress = "in_progress" )
const ( EmbeddedMode = "embedded" EmbeddedMasterMode = "embedded_master" )
Warehouse worker modes.
const (
DegradedMode = "degraded"
)
const (
WorkerProcessingDownloadStagingFileFailed = "worker_processing_download_staging_file_failed"
)
Variables ¶
var ( ErrIncompatibleSchemaConversion = errors.New("incompatible schema conversion") ErrSchemaConversionNotSupported = errors.New("schema conversion not supported") )
var (
ShouldForceSetLowerVersion bool
)
Functions ¶
func CheckCurrentTimeExistsInExcludeWindow ¶ added in v0.1.10
func CheckForWarehouseEnvVars ¶ added in v0.1.10
func CheckForWarehouseEnvVars() bool
CheckForWarehouseEnvVars checks whether all the environment variables required by the warehouse are present.
func CheckPGHealth ¶
func ClauseQueryArgs ¶ added in v1.2.0
func ClauseQueryArgs(filterClauses ...FilterClause) (string, []interface{})
func DurationBeforeNextAttempt ¶ added in v1.2.0
func GetExcludeWindowStartEndTimes ¶ added in v1.2.0
func GetPrevScheduledTime ¶
GetPrevScheduledTime returns the closest previous scheduled time. For example, when syncing every 3 hours starting at 13:00 (scheduled times: 13:00, 16:00, 19:00, 22:00, 01:00, 04:00, 07:00, 10:00), the previous scheduled time for a current time of 18:00 is 16:00 on the same day, and for 00:30 it is 22:00 on the previous day.
func HandleSchemaChange ¶ added in v0.1.10
func HandleSchemaChange(existingDataType, currentDataType model.SchemaType, value any) (any, error)
func InitWarehouseAPI ¶ added in v0.1.10
func MergeSchema ¶ added in v1.6.0
func MergeSchema(currentSchema warehouseutils.SchemaT, schemaList []warehouseutils.SchemaT, currentMergedSchema warehouseutils.SchemaT, warehouseType string) warehouseutils.SchemaT
func PickupStagingConfiguration ¶ added in v1.0.2
func ScheduledTimes ¶
ScheduledTimes returns all possible start times (in minutes from the start of the day) as per the schedule. For example, syncing every 3 hours starting at 13:00 gives the scheduled times 13:00, 16:00, 19:00, 22:00, 01:00, 04:00, 07:00, 10:00.
func Setup ¶ added in v1.4.0
Setup prepares the database connection for the warehouse service, verifies database compatibility, and creates the required tables.
func TriggerUploadHandler ¶ added in v0.1.10
Types ¶
type AsyncJobRunResult ¶ added in v1.2.0
type BatchRouterEventT ¶ added in v0.1.10
func (*BatchRouterEventT) GetColumnInfo ¶ added in v0.1.10
func (event *BatchRouterEventT) GetColumnInfo(columnName string) (columnInfo warehouseutils.ColumnInfo, ok bool)
type ConfigurationTestInput ¶ added in v1.0.2
type ConfigurationTestInput struct {
DestID string
}
type ConfigurationTestOutput ¶ added in v1.0.2
type ConstraintsI ¶ added in v0.1.10
type ConstraintsI interface {
// contains filtered or unexported methods
}
type ConstraintsViolationT ¶ added in v0.1.10
func ViolatedConstraints ¶ added in v0.1.10
func ViolatedConstraints(destinationType string, brEvent *BatchRouterEventT, columnName string) (cv *ConstraintsViolationT)
type DB ¶ added in v1.0.2
type DB struct {
// contains filtered or unexported fields
}
DB encapsulates the interactions of warehouse operations with the database.
func NewWarehouseDB ¶ added in v1.0.2
func (*DB) GetLatestUploadStatus ¶ added in v1.0.2
func (*DB) GetUploadsCount ¶ added in v1.2.0
func (*DB) RetryUploads ¶ added in v1.2.0
type ErrorHandler ¶ added in v1.6.0
func (*ErrorHandler) MatchErrorMappings ¶ added in v1.6.0
func (e *ErrorHandler) MatchErrorMappings(err error) Tag
MatchErrorMappings matches the error against the error mappings defined in the integrations and returns the corresponding joins of the matched error type; otherwise it returns UnknownError.
type ErrorResponseT ¶
type ErrorResponseT struct {
Error string
}
type FilterClause ¶ added in v1.2.0
type FilterClause struct { Clause string ClauseArg interface{} }
type HandleT ¶
type HandleT struct { Now func() time.Time NowSQL string Logger logger.Logger // contains filtered or unexported fields }
func (*HandleT) CronTracker ¶ added in v1.5.0
type IndexConstraintT ¶ added in v0.1.10
type InvalidDestinationCredErr ¶ added in v1.2.0
func (InvalidDestinationCredErr) Error ¶ added in v1.2.0
func (err InvalidDestinationCredErr) Error() string
type JobRunT ¶ added in v0.1.10
type JobRunT struct {
// contains filtered or unexported fields
}
JobRunT is a temporary store for processing a staging file into a load file.
func (*JobRunT) GetWriter ¶ added in v0.1.10
func (jobRun *JobRunT) GetWriter(tableName string) (warehouseutils.LoadFileWriterI, error)
type LoadFileJobT ¶
type ObjectStorageValidationRequest ¶ added in v1.2.0
type Payload ¶ added in v1.2.0
type Payload struct { BatchID string UploadID int64 StagingFileID int64 StagingFileLocation string UploadSchema map[string]map[string]string WorkspaceID string SourceID string SourceName string DestinationID string DestinationName string DestinationType string DestinationNamespace string DestinationRevisionID string StagingDestinationRevisionID string DestinationConfig map[string]interface{} StagingDestinationConfig interface{} UseRudderStorage bool StagingUseRudderStorage bool UniqueLoadGenID string RudderStoragePrefix string Output []loadFileUploadOutputT LoadFilePrefix string // prefix for the load file name LoadFileType string }
type QueryInput ¶ added in v0.1.10
type RetryRequest ¶ added in v0.1.10
type RetryRequest struct { WorkspaceID string SourceID string DestinationID string DestinationType string IntervalInHours int64 // Optional, if provided we will retry based on the interval provided UploadIds []int64 // Optional, if provided we will retry the upload ids provided ForceRetry bool API UploadAPIT }
func (*RetryRequest) RetryWHUploads ¶ added in v0.1.10
func (retryReq *RetryRequest) RetryWHUploads(ctx context.Context) (response RetryResponse, err error)
func (*RetryRequest) UploadsToRetry ¶ added in v1.2.0
func (retryReq *RetryRequest) UploadsToRetry(ctx context.Context) (response RetryResponse, err error)
type RetryResponse ¶ added in v0.1.10
type SchemaHandleT ¶ added in v0.1.10
type SchemaHandleT struct {
// contains filtered or unexported fields
}
func (*SchemaHandleT) SkipDeprecatedColumns ¶ added in v1.6.0
func (sh *SchemaHandleT) SkipDeprecatedColumns(schema warehouseutils.SchemaT)
type TableSkipError ¶ added in v0.1.10
type TableSkipError struct {
// contains filtered or unexported fields
}
TableSkipError is a custom error type to capture if a table load is skipped because of a previously failed table load
func (*TableSkipError) Error ¶ added in v0.1.10
func (tse *TableSkipError) Error() string
type TableUploadIDInfoT ¶ added in v0.1.10
type TableUploadIDInfoT struct {
// contains filtered or unexported fields
}
TableUploadIDInfoT captures the uploadID and error for [uploadID][tableName]
type TableUploadReqT ¶ added in v0.1.10
type TableUploadReqT struct { UploadID int64 Name string API UploadAPIT }
func (TableUploadReqT) GetWhTableUploads ¶ added in v0.1.10
func (tableUploadReq TableUploadReqT) GetWhTableUploads() ([]*proto.WHTable, error)
type TableUploadResT ¶ added in v0.1.10
type TableUploadStatusInfoT ¶ added in v0.1.10
type TableUploadStatusInfoT struct {
// contains filtered or unexported fields
}
TableUploadStatusInfoT captures the status and error for [uploadID][tableName]
type TableUploadStatusT ¶ added in v0.1.10
type TableUploadStatusT struct {
// contains filtered or unexported fields
}
TableUploadStatusT captures the status of each table upload along with its parent upload_job's info like destination_id and namespace
type TableUploadT ¶ added in v0.1.10
type TableUploadT struct {
// contains filtered or unexported fields
}
func NewTableUpload ¶ added in v0.1.10
func NewTableUpload(uploadID int64, tableName string) *TableUploadT
type TablesResT ¶ added in v0.1.10
type TablesResT struct {
Tables []TableUploadResT `json:"tables,omitempty"`
}
type UploadAPIT ¶ added in v0.1.10
type UploadAPIT struct {
// contains filtered or unexported fields
}
var UploadAPI UploadAPIT
type UploadColumnT ¶ added in v0.1.10
type UploadColumnT struct { Column string Value interface{} }
type UploadColumnsOpts ¶ added in v0.1.10
type UploadColumnsOpts struct { Fields []UploadColumnT Txn *sql.Tx }
type UploadJobFactory ¶ added in v1.6.0
type UploadJobFactory struct {
// contains filtered or unexported fields
}
func (*UploadJobFactory) NewUploadJob ¶ added in v1.6.0
func (f *UploadJobFactory) NewUploadJob(dto *model.UploadJob, whManager manager.Manager) *UploadJobT
type UploadJobT ¶ added in v0.1.10
type UploadJobT struct { LoadFileGenStartTime time.Time RefreshPartitionBatchSize int AlertSender alerta.AlertSender ErrorHandler ErrorHandler // contains filtered or unexported fields }
func (*UploadJobT) Aborted ¶ added in v0.1.10
func (job *UploadJobT) Aborted(attempts int, startTime time.Time) bool
Aborted checks whether the state of the job should be aborted.
func (*UploadJobT) DTO ¶ added in v1.6.0
func (job *UploadJobT) DTO() *model.UploadJob
func (*UploadJobT) GetFirstLastEvent ¶ added in v1.0.2
func (job *UploadJobT) GetFirstLastEvent() (time.Time, time.Time)
func (*UploadJobT) GetLoadFileGenStartTIme ¶ added in v0.1.10
func (job *UploadJobT) GetLoadFileGenStartTIme() time.Time
func (*UploadJobT) GetLoadFileType ¶ added in v0.1.10
func (job *UploadJobT) GetLoadFileType() string
func (*UploadJobT) GetLoadFilesMetadata ¶ added in v0.1.10
func (job *UploadJobT) GetLoadFilesMetadata(options warehouseutils.GetLoadFilesOptionsT) (loadFiles []warehouseutils.LoadFileT)
func (*UploadJobT) GetLocalSchema ¶ added in v0.1.10
func (job *UploadJobT) GetLocalSchema() warehouseutils.SchemaT
func (*UploadJobT) GetSampleLoadFileLocation ¶ added in v0.1.10
func (job *UploadJobT) GetSampleLoadFileLocation(tableName string) (location string, err error)
func (*UploadJobT) GetSchemaInWarehouse ¶ added in v0.1.10
func (job *UploadJobT) GetSchemaInWarehouse() (schema warehouseutils.SchemaT)
func (*UploadJobT) GetSingleLoadFile ¶ added in v0.1.10
func (job *UploadJobT) GetSingleLoadFile(tableName string) (warehouseutils.LoadFileT, error)
func (*UploadJobT) GetTableSchemaInUpload ¶ added in v0.1.10
func (job *UploadJobT) GetTableSchemaInUpload(tableName string) warehouseutils.TableSchemaT
func (*UploadJobT) GetTableSchemaInWarehouse ¶ added in v0.1.10
func (job *UploadJobT) GetTableSchemaInWarehouse(tableName string) warehouseutils.TableSchemaT
func (*UploadJobT) RefreshPartitions ¶ added in v1.6.0
func (job *UploadJobT) RefreshPartitions(loadFileStartID, loadFileEndID int64) error
func (*UploadJobT) ShouldOnDedupUseNewRecord ¶ added in v0.1.10
func (job *UploadJobT) ShouldOnDedupUseNewRecord() bool
func (*UploadJobT) UpdateLocalSchema ¶ added in v0.1.10
func (job *UploadJobT) UpdateLocalSchema(schema warehouseutils.SchemaT) error
func (*UploadJobT) UpdateTableSchema ¶ added in v1.6.0
func (job *UploadJobT) UpdateTableSchema(tName string, tableSchemaDiff warehouseutils.TableSchemaDiffT) (err error)
func (*UploadJobT) UseRudderStorage ¶ added in v0.1.10
func (job *UploadJobT) UseRudderStorage() bool
type UploadPagination ¶ added in v0.1.10
type UploadReqT ¶ added in v0.1.10
type UploadReqT struct { WorkspaceID string UploadId int64 API UploadAPIT }
func (*UploadReqT) GetWHUpload ¶ added in v0.1.10
func (uploadReq *UploadReqT) GetWHUpload() (*proto.WHUploadResponse, error)
func (*UploadReqT) TriggerWHUpload ¶ added in v0.1.10
func (uploadReq *UploadReqT) TriggerWHUpload() (response *proto.TriggerWhUploadsResponse, err error)
type UploadResT ¶ added in v0.1.10
type UploadResT struct { ID int64 `json:"id"` Namespace string `json:"namespace"` SourceID string `json:"source_id"` DestinationID string `json:"destination_id"` DestinationType string `json:"destination_type"` Status string `json:"status"` Error string `json:"error"` Attempt int32 `json:"attempt"` Duration int32 `json:"duration"` NextRetryTime string `json:"nextRetryTime"` FirstEventAt time.Time `json:"first_event_at"` LastEventAt time.Time `json:"last_event_at"` Tables []TableUploadResT `json:"tables,omitempty"` }
type UploadStatusOpts ¶ added in v0.1.10
type UploadStatusOpts struct { Status string AdditionalFields []UploadColumnT ReportingMetric types.PUReportedMetric }
type UploadsReqT ¶ added in v0.1.10
type UploadsReqT struct { WorkspaceID string SourceID string DestinationID string DestinationType string Status string Limit int32 Offset int32 API UploadAPIT }
func (*UploadsReqT) GetWhUploads ¶ added in v0.1.10
func (uploadsReq *UploadsReqT) GetWhUploads() (uploadsRes *proto.WHUploadsResponse, err error)
func (*UploadsReqT) TriggerWhUploads ¶ added in v0.1.10
func (uploadsReq *UploadsReqT) TriggerWhUploads() (response *proto.TriggerWhUploadsResponse, err error)
type UploadsResT ¶ added in v0.1.10
type UploadsResT struct { Uploads []UploadResT `json:"uploads"` Pagination UploadPagination `json:"pagination"` }
type WarehouseAdmin ¶ added in v0.1.10
type WarehouseAdmin struct{}
func (*WarehouseAdmin) ConfigurationTest ¶ added in v1.0.2
func (*WarehouseAdmin) ConfigurationTest(s ConfigurationTestInput, reply *ConfigurationTestOutput) error
ConfigurationTest tests the underlying warehouse destination.
func (*WarehouseAdmin) Query ¶ added in v0.1.10
func (*WarehouseAdmin) Query(s QueryInput, reply *warehouseutils.QueryResult) error
Query queries the underlying warehouse.
func (*WarehouseAdmin) TriggerUpload ¶ added in v0.1.10
func (*WarehouseAdmin) TriggerUpload(off bool, reply *string) error
TriggerUpload sets uploads to start without delay
type WorkerIdentifierT ¶ added in v0.1.10
type WorkerIdentifierT string