Documentation ¶
Index ¶
- Constants
- Variables
- func CheckCurrentTimeExistsInExcludeWindow(currentTime time.Time, windowStartTime, windowEndTime string) bool
- func CheckForWarehouseEnvVars() bool
- func CheckPGHealth(dbHandle *sql.DB) bool
- func ClauseQueryArgs(filterClauses ...FilterClause) (string, []interface{})
- func DurationBeforeNextAttempt(attempt int64) time.Duration
- func GetExcludeWindowStartEndTimes(excludeWindow map[string]interface{}) (string, string)
- func GetPrevScheduledTime(syncFrequency, syncStartAt string, currTime time.Time) time.Time
- func HandleSchemaChange(existingDataType, currentDataType model.SchemaType, value any) (any, error)
- func Init()
- func Init2()
- func Init3()
- func Init4()
- func Init5()
- func Init6()
- func InitWarehouseAPI(dbHandle *sql.DB, log logger.Logger) error
- func MergeSchema(currentSchema model.Schema, schemaList []model.Schema, ...) model.Schema
- func PickupStagingConfiguration(job *Payload) bool
- func ScheduledTimes(syncFrequency, syncStartAt string) []int
- func Setup(ctx context.Context) error
- func Start(ctx context.Context, app app.App) error
- func TriggerUploadHandler(sourceID, destID string) error
- type AsyncJobRunResult
- type BatchRouterEvent
- type ConfigurationTestInput
- type ConfigurationTestOutput
- type Constraints
- type ConstraintsViolation
- type DB
- func (db *DB) GetLatestUploadStatus(ctx context.Context, destType, sourceID, destinationID string) (int64, string, int, error)
- func (db *DB) GetUploadsCount(ctx context.Context, filterClauses ...FilterClause) (count int64, err error)
- func (db *DB) RetryUploads(ctx context.Context, filterClauses ...FilterClause) (rowsAffected int64, err error)
- type Data
- type ErrorHandler
- type FilterClause
- type HandleT
- type IndexConstraint
- type InvalidDestinationCredErr
- type JobID
- type JobRun
- type LoadFileJob
- type Metadata
- type ObjectStorageValidationRequest
- type Payload
- type QueryInput
- type RetryRequest
- type RetryResponse
- type SchemaHandle
- type TableUploadReq
- type TableUploadRes
- type TablesRes
- type Tag
- type UploadAPIT
- type UploadColumn
- type UploadColumnsOpts
- type UploadJob
- func (job *UploadJob) Aborted(attempts int, startTime time.Time) bool
- func (job *UploadJob) DTO() *model.UploadJob
- func (job *UploadJob) GetFirstLastEvent() (time.Time, time.Time)
- func (job *UploadJob) GetLoadFileGenStartTIme() time.Time
- func (job *UploadJob) GetLoadFileType() string
- func (job *UploadJob) GetLoadFilesMetadata(options warehouseutils.GetLoadFilesOptions) (loadFiles []warehouseutils.LoadFile)
- func (job *UploadJob) GetLocalSchema() (model.Schema, error)
- func (job *UploadJob) GetSampleLoadFileLocation(tableName string) (location string, err error)
- func (job *UploadJob) GetSchemaInWarehouse() (schema model.Schema)
- func (job *UploadJob) GetSingleLoadFile(tableName string) (warehouseutils.LoadFile, error)
- func (job *UploadJob) GetTableSchemaInUpload(tableName string) model.TableSchema
- func (job *UploadJob) GetTableSchemaInWarehouse(tableName string) model.TableSchema
- func (job *UploadJob) RefreshPartitions(loadFileStartID, loadFileEndID int64) error
- func (job *UploadJob) ShouldOnDedupUseNewRecord() bool
- func (job *UploadJob) TablesToSkip() (map[string]model.PendingTableUpload, map[string]model.PendingTableUpload, ...)
- func (job *UploadJob) UpdateLocalSchema(schema model.Schema) error
- func (job *UploadJob) UpdateTableSchema(tName string, tableSchemaDiff warehouseutils.TableSchemaDiff) (err error)
- func (job *UploadJob) UseRudderStorage() bool
- type UploadJobFactory
- type UploadPagination
- type UploadReq
- type UploadRes
- type UploadStatusOpts
- type UploadsReq
- type UploadsRes
- type WarehouseAdmin
- type WorkerIdentifierT
Constants ¶
const (
    TriggeredSuccessfully   = "Triggered successfully"
    NoPendingEvents         = "No pending events to sync for this destination"
    DownloadFileNamePattern = "downloadfile.*.tmp"
    NoSuchSync              = "No such sync exist"
)
const (
    GeneratingStagingFileFailedState = "generating_staging_file_failed"
    GeneratedStagingFileState        = "generated_staging_file"
    FetchingRemoteSchemaFailed       = "fetching_remote_schema_failed"
    InternalProcessingFailed         = "internal_processing_failed"
)
const (
    CloudSourceCategory          = "cloud"
    SingerProtocolSourceCategory = "singer-protocol"
)
const (
    UploadStatusField          = "status"
    UploadStartLoadFileIDField = "start_load_file_id"
    UploadEndLoadFileIDField   = "end_load_file_id"
    UploadUpdatedAtField       = "updated_at"
    UploadTimingsField         = "timings"
    UploadSchemaField          = "schema"
    MergedSchemaField          = "mergedschema"
    UploadLastExecAtField      = "last_exec_at"
    UploadInProgress           = "in_progress"
)
const (
    EmbeddedMode       = "embedded"
    EmbeddedMasterMode = "embedded_master"
)
Warehouse worker modes.
const (
    DegradedMode = "degraded"
)
Variables ¶
var (
    ErrIncompatibleSchemaConversion = errors.New("incompatible schema conversion")
    ErrSchemaConversionNotSupported = errors.New("schema conversion not supported")
    ErrCancellingStatement          = errors.New("[error] pq: canceling statement due to user request")
)
var (
    ShouldForceSetLowerVersion bool
)
Functions ¶
func CheckCurrentTimeExistsInExcludeWindow ¶ added in v0.1.10
func CheckForWarehouseEnvVars ¶ added in v0.1.10
func CheckForWarehouseEnvVars() bool
CheckForWarehouseEnvVars checks whether all the environment variables required by the warehouse service are present.
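A minimal guard sketch, assuming the package is imported as warehouse; startIfConfigured is a hypothetical wrapper, and which variables the function inspects is not listed on this page.

func startIfConfigured() bool {
    // Bail out early when the required warehouse environment variables are missing.
    if !warehouse.CheckForWarehouseEnvVars() {
        log.Println("required warehouse environment variables are missing; skipping warehouse startup")
        return false
    }
    return true
}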
func CheckPGHealth ¶
func ClauseQueryArgs ¶ added in v1.2.0
func ClauseQueryArgs(filterClauses ...FilterClause) (string, []interface{})
func DurationBeforeNextAttempt ¶ added in v1.2.0
func GetExcludeWindowStartEndTimes ¶ added in v1.2.0
func GetPrevScheduledTime ¶
GetPrevScheduledTime returns the closest scheduled time before currTime. For example, when syncing every 3 hours starting at 13:00 (scheduled times: 13:00, 16:00, 19:00, 22:00, 01:00, 04:00, 07:00, 10:00), the previous scheduled time for 18:00 is 16:00 the same day, and for 00:30 it is 22:00 the previous day.
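A short usage sketch of the example above, written godoc-example style. Assumptions: syncFrequency is the frequency in minutes as a string (here "180" for every 3 hours) and syncStartAt is an "HH:MM" string; check both formats against your destination configuration.

func ExampleGetPrevScheduledTime() {
    now := time.Date(2023, time.May, 1, 18, 0, 0, 0, time.UTC)
    prev := warehouse.GetPrevScheduledTime("180", "13:00", now)
    // Per the doc comment above, the closest scheduled time before 18:00 is 16:00 the same day.
    fmt.Println(prev.Format("15:04"))
}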
func HandleSchemaChange ¶ added in v0.1.10
func HandleSchemaChange(existingDataType, currentDataType model.SchemaType, value any) (any, error)
func InitWarehouseAPI ¶ added in v0.1.10
func MergeSchema ¶ added in v1.6.0
func PickupStagingConfiguration ¶ added in v1.0.2
func ScheduledTimes ¶
ScheduledTimes returns all possible sync start times for the given schedule, expressed as minutes from the start of the day. For example, syncing every 3 hours starting at 13:00 yields the scheduled times 13:00, 16:00, 19:00, 22:00, 01:00, 04:00, 07:00, and 10:00.
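A companion sketch under the same format assumptions as the GetPrevScheduledTime example above; the returned slice holds minutes counted from the start of the day.

func ExampleScheduledTimes() {
    minutes := warehouse.ScheduledTimes("180", "13:00")
    for _, m := range minutes {
        // Convert minutes-from-midnight back to HH:MM, e.g. 60 -> 01:00, 780 -> 13:00.
        fmt.Printf("%02d:%02d ", m/60, m%60)
    }
}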
func Setup ¶ added in v1.4.0
Setup prepares the database connection for the warehouse service, verifies database compatibility, and creates the required tables.
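A minimal start-up sketch, assuming the warehouse import path and an app.App value supplied by the embedding service; runWarehouse is a hypothetical wrapper showing the call order, not the project's actual bootstrap code.

func runWarehouse(ctx context.Context, application app.App) error {
    // Prepare the warehouse database: connection, compatibility checks, required tables.
    if err := warehouse.Setup(ctx); err != nil {
        return fmt.Errorf("warehouse setup: %w", err)
    }
    // Start the warehouse service with the prepared database (assumed to run until ctx is done).
    return warehouse.Start(ctx, application)
}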
func TriggerUploadHandler ¶ added in v0.1.10
Types ¶
type AsyncJobRunResult ¶ added in v1.2.0
type BatchRouterEvent ¶ added in v1.7.0
func (*BatchRouterEvent) GetColumnInfo ¶ added in v1.7.0
func (event *BatchRouterEvent) GetColumnInfo(columnName string) (columnInfo warehouseutils.ColumnInfo, ok bool)
type ConfigurationTestInput ¶ added in v1.0.2
type ConfigurationTestInput struct {
    DestID string
}
type ConfigurationTestOutput ¶ added in v1.0.2
type Constraints ¶ added in v1.7.0
type Constraints interface {
    // contains filtered or unexported methods
}
type ConstraintsViolation ¶ added in v1.7.0
func ViolatedConstraints ¶ added in v0.1.10
func ViolatedConstraints(destinationType string, brEvent *BatchRouterEvent, columnName string) (cv *ConstraintsViolation)
type DB ¶ added in v1.0.2
type DB struct {
    // contains filtered or unexported fields
}
DB encapsulates the interactions of warehouse operations with the database; a usage sketch follows the method list below.
func NewWarehouseDB ¶ added in v1.0.2
func (*DB) GetLatestUploadStatus ¶ added in v1.0.2
func (*DB) GetUploadsCount ¶ added in v1.2.0
func (*DB) RetryUploads ¶ added in v1.2.0
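A usage sketch for DB. Assumptions: NewWarehouseDB wraps an existing *sql.DB handle, and the three non-error results of GetLatestUploadStatus are an upload ID, a status string, and a priority (names inferred from the signature, not confirmed here); latestStatus is a hypothetical helper.

func latestStatus(ctx context.Context, sqlDB *sql.DB, sourceID, destinationID string) error {
    whDB := warehouse.NewWarehouseDB(sqlDB) // constructor argument is an assumption
    uploadID, status, priority, err := whDB.GetLatestUploadStatus(ctx, "POSTGRES", sourceID, destinationID)
    if err != nil {
        return err
    }
    fmt.Printf("upload=%d status=%s priority=%d\n", uploadID, status, priority)
    return nil
}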
type ErrorHandler ¶ added in v1.6.0
func (*ErrorHandler) MatchErrorMappings ¶ added in v1.6.0
func (e *ErrorHandler) MatchErrorMappings(err error) Tag
MatchErrorMappings matches the error against the error mappings defined in the integrations and returns the matched error types joined together; if no mapping matches, it returns UnknownError.
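A hedged classification sketch: how an ErrorHandler is constructed is not shown on this page, so the handler parameter is only a placeholder, and no Tag fields are assumed.

func classifyUploadError(handler *warehouse.ErrorHandler, uploadErr error) {
    // Returns the tag for the matched error type, or UnknownError when nothing matches.
    tag := handler.MatchErrorMappings(uploadErr)
    fmt.Printf("%+v\n", tag)
}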
type FilterClause ¶ added in v1.2.0
type FilterClause struct {
    Clause    string
    ClauseArg interface{}
}
type HandleT ¶
type HandleT struct {
    Now    func() time.Time
    NowSQL string
    Logger logger.Logger
    // contains filtered or unexported fields
}
func (*HandleT) CronTracker ¶ added in v1.5.0
type IndexConstraint ¶ added in v1.7.0
type InvalidDestinationCredErr ¶ added in v1.2.0
func (InvalidDestinationCredErr) Error ¶ added in v1.2.0
func (err InvalidDestinationCredErr) Error() string
type JobRun ¶ added in v1.7.0
type JobRun struct {
    // contains filtered or unexported fields
}
JobRun is a temporary store used while processing a staging file into load files.
type LoadFileJob ¶ added in v1.7.0
type ObjectStorageValidationRequest ¶ added in v1.2.0
type Payload ¶ added in v1.2.0
type Payload struct {
    BatchID                      string
    UploadID                     int64
    StagingFileID                int64
    StagingFileLocation          string
    UploadSchema                 model.Schema
    WorkspaceID                  string
    SourceID                     string
    SourceName                   string
    DestinationID                string
    DestinationName              string
    DestinationType              string
    DestinationNamespace         string
    DestinationRevisionID        string
    StagingDestinationRevisionID string
    DestinationConfig            map[string]interface{}
    StagingDestinationConfig     interface{}
    UseRudderStorage             bool
    StagingUseRudderStorage      bool
    UniqueLoadGenID              string
    RudderStoragePrefix          string
    Output                       []loadFileUploadOutput
    LoadFilePrefix               string // prefix for the load file name
    LoadFileType                 string
}
type QueryInput ¶ added in v0.1.10
type RetryRequest ¶ added in v0.1.10
type RetryRequest struct {
    WorkspaceID     string
    SourceID        string
    DestinationID   string
    DestinationType string
    IntervalInHours int64   // Optional, if provided we will retry based on the interval provided
    UploadIds       []int64 // Optional, if provided we will retry the upload ids provided
    ForceRetry      bool
    API             UploadAPIT
}
func (*RetryRequest) RetryWHUploads ¶ added in v0.1.10
func (retryReq *RetryRequest) RetryWHUploads(ctx context.Context) (response RetryResponse, err error)
func (*RetryRequest) UploadsToRetry ¶ added in v1.2.0
func (retryReq *RetryRequest) UploadsToRetry(ctx context.Context) (response RetryResponse, err error)
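A retry sketch following the field comments above (IntervalInHours and UploadIds are alternative ways to select uploads). Assumptions: the API field is the package-level UploadAPI populated by InitWarehouseAPI, and retryRecentFailures is a hypothetical helper; RetryResponse's fields are not listed here, so the response is only printed.

func retryRecentFailures(ctx context.Context, workspaceID, sourceID, destinationID string) error {
    req := warehouse.RetryRequest{
        WorkspaceID:     workspaceID,
        SourceID:        sourceID,
        DestinationID:   destinationID,
        DestinationType: "POSTGRES",
        IntervalInHours: 24, // retry uploads from the last 24 hours, per the field comment
        ForceRetry:      true,
        API:             warehouse.UploadAPI, // assumption: set up beforehand via InitWarehouseAPI
    }
    resp, err := req.RetryWHUploads(ctx)
    if err != nil {
        return err
    }
    fmt.Printf("%+v\n", resp)
    return nil
}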
type RetryResponse ¶ added in v0.1.10
type SchemaHandle ¶ added in v1.7.0
type SchemaHandle struct {
    // contains filtered or unexported fields
}
func (*SchemaHandle) SkipDeprecatedColumns ¶ added in v1.7.0
func (sh *SchemaHandle) SkipDeprecatedColumns(schema model.Schema)
type TableUploadReq ¶ added in v1.7.0
type TableUploadReq struct {
    UploadID int64
    Name     string
    API      UploadAPIT
}
func (TableUploadReq) GetWhTableUploads ¶ added in v1.7.0
func (tableUploadReq TableUploadReq) GetWhTableUploads() ([]*proto.WHTable, error)
type TableUploadRes ¶ added in v1.7.0
type TablesRes ¶ added in v1.7.0
type TablesRes struct {
    Tables []TableUploadRes `json:"tables,omitempty"`
}
type UploadAPIT ¶ added in v0.1.10
type UploadAPIT struct {
    // contains filtered or unexported fields
}
var UploadAPI UploadAPIT
type UploadColumn ¶ added in v1.7.0
type UploadColumn struct {
    Column string
    Value  interface{}
}
type UploadColumnsOpts ¶ added in v0.1.10
type UploadColumnsOpts struct {
    Fields []UploadColumn
    Txn    *sql.Tx
}
type UploadJob ¶ added in v1.7.0
type UploadJob struct {
    LoadFileGenStartTime      time.Time
    AlertSender               alerta.AlertSender
    Now                       func() time.Time
    RefreshPartitionBatchSize int
    RetryTimeWindow           time.Duration
    MinRetryAttempts          int
    DisableAlter              bool
    ErrorHandler              ErrorHandler
    // contains filtered or unexported fields
}
func (*UploadJob) GetFirstLastEvent ¶ added in v1.7.0
func (*UploadJob) GetLoadFileGenStartTIme ¶ added in v1.7.0
func (*UploadJob) GetLoadFileType ¶ added in v1.7.0
func (*UploadJob) GetLoadFilesMetadata ¶ added in v1.7.0
func (job *UploadJob) GetLoadFilesMetadata(options warehouseutils.GetLoadFilesOptions) (loadFiles []warehouseutils.LoadFile)
func (*UploadJob) GetLocalSchema ¶ added in v1.7.0
func (*UploadJob) GetSampleLoadFileLocation ¶ added in v1.7.0
func (*UploadJob) GetSchemaInWarehouse ¶ added in v1.7.0
func (*UploadJob) GetSingleLoadFile ¶ added in v1.7.0
func (job *UploadJob) GetSingleLoadFile(tableName string) (warehouseutils.LoadFile, error)
func (*UploadJob) GetTableSchemaInUpload ¶ added in v1.7.0
func (job *UploadJob) GetTableSchemaInUpload(tableName string) model.TableSchema
func (*UploadJob) GetTableSchemaInWarehouse ¶ added in v1.7.0
func (job *UploadJob) GetTableSchemaInWarehouse(tableName string) model.TableSchema
func (*UploadJob) RefreshPartitions ¶ added in v1.7.0
func (*UploadJob) ShouldOnDedupUseNewRecord ¶ added in v1.7.0
func (*UploadJob) TablesToSkip ¶ added in v1.7.0
func (job *UploadJob) TablesToSkip() (map[string]model.PendingTableUpload, map[string]model.PendingTableUpload, error)
func (*UploadJob) UpdateLocalSchema ¶ added in v1.7.0
func (*UploadJob) UpdateTableSchema ¶ added in v1.7.0
func (job *UploadJob) UpdateTableSchema(tName string, tableSchemaDiff warehouseutils.TableSchemaDiff) (err error)
func (*UploadJob) UseRudderStorage ¶ added in v1.7.0
type UploadJobFactory ¶ added in v1.6.0
type UploadJobFactory struct {
    // contains filtered or unexported fields
}
func (*UploadJobFactory) NewUploadJob ¶ added in v1.6.0
type UploadPagination ¶ added in v0.1.10
type UploadReq ¶ added in v1.7.0
type UploadReq struct {
    WorkspaceID string
    UploadId    int64
    API         UploadAPIT
}
func (*UploadReq) GetWHUpload ¶ added in v1.7.0
func (uploadReq *UploadReq) GetWHUpload() (*proto.WHUploadResponse, error)
func (*UploadReq) TriggerWHUpload ¶ added in v1.7.0
func (uploadReq *UploadReq) TriggerWHUpload() (response *proto.TriggerWhUploadsResponse, err error)
type UploadRes ¶ added in v1.7.0
type UploadRes struct {
    ID              int64            `json:"id"`
    Namespace       string           `json:"namespace"`
    SourceID        string           `json:"source_id"`
    DestinationID   string           `json:"destination_id"`
    DestinationType string           `json:"destination_type"`
    Status          string           `json:"status"`
    Error           string           `json:"error"`
    Attempt         int32            `json:"attempt"`
    Duration        int32            `json:"duration"`
    NextRetryTime   string           `json:"nextRetryTime"`
    FirstEventAt    time.Time        `json:"first_event_at"`
    LastEventAt     time.Time        `json:"last_event_at"`
    Tables          []TableUploadRes `json:"tables,omitempty"`
}
type UploadStatusOpts ¶ added in v0.1.10
type UploadStatusOpts struct {
    Status           string
    AdditionalFields []UploadColumn
    ReportingMetric  types.PUReportedMetric
}
type UploadsReq ¶ added in v1.7.0
type UploadsReq struct {
    WorkspaceID     string
    SourceID        string
    DestinationID   string
    DestinationType string
    Status          string
    Limit           int32
    Offset          int32
    API             UploadAPIT
}
func (*UploadsReq) GetWhUploads ¶ added in v1.7.0
func (uploadsReq *UploadsReq) GetWhUploads() (uploadsRes *proto.WHUploadsResponse, err error)
func (*UploadsReq) TriggerWhUploads ¶ added in v1.7.0
func (uploadsReq *UploadsReq) TriggerWhUploads() (response *proto.TriggerWhUploadsResponse, err error)
type UploadsRes ¶ added in v1.7.0
type UploadsRes struct {
    Uploads    []UploadRes      `json:"uploads"`
    Pagination UploadPagination `json:"pagination"`
}
type WarehouseAdmin ¶ added in v0.1.10
type WarehouseAdmin struct{}
func (*WarehouseAdmin) ConfigurationTest ¶ added in v1.0.2
func (*WarehouseAdmin) ConfigurationTest(s ConfigurationTestInput, reply *ConfigurationTestOutput) error
ConfigurationTest tests the underlying warehouse destination.
func (*WarehouseAdmin) Query ¶ added in v0.1.10
func (*WarehouseAdmin) Query(s QueryInput, reply *warehouseutils.QueryResult) error
Query the underlying warehouse
func (*WarehouseAdmin) TriggerUpload ¶ added in v0.1.10
func (*WarehouseAdmin) TriggerUpload(off bool, reply *string) error
TriggerUpload sets uploads to start without delay
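WarehouseAdmin's methods use the value-argument, pointer-reply shape typical of net/rpc handlers; whether and where they are registered as RPC endpoints is not stated on this page. The sketch below simply calls ConfigurationTest in-process with the documented ConfigurationTestInput; testDestination is a hypothetical helper, and no ConfigurationTestOutput fields are assumed.

func testDestination(destID string) error {
    var admin warehouse.WarehouseAdmin
    var out warehouse.ConfigurationTestOutput
    // Run the destination configuration test for the given destination ID.
    if err := admin.ConfigurationTest(warehouse.ConfigurationTestInput{DestID: destID}, &out); err != nil {
        return err
    }
    fmt.Printf("%+v\n", out)
    return nil
}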
type WorkerIdentifierT ¶ added in v0.1.10
type WorkerIdentifierT string