warehouse

package
Version: v1.12.0-rc.2
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 11, 2023 | License: AGPL-3.0 | Imports: 81 | Imported by: 0

Documentation

Index

Constants

View Source
const (
	TriggeredSuccessfully   = "Triggered successfully"
	NoPendingEvents         = "No pending events to sync for this destination"
	DownloadFileNamePattern = "downloadfile.*.tmp"
	NoSuchSync              = "No such sync exist"
)
View Source
const (
	GeneratingStagingFileFailedState = "generating_staging_file_failed"
	GeneratedStagingFileState        = "generated_staging_file"
	FetchingRemoteSchemaFailed       = "fetching_remote_schema_failed"
	InternalProcessingFailed         = "internal_processing_failed"
)
View Source
const (
	CloudSourceCategory          = "cloud"
	SingerProtocolSourceCategory = "singer-protocol"
)
View Source
const (
	UploadStatusField          = "status"
	UploadStartLoadFileIDField = "start_load_file_id"
	UploadEndLoadFileIDField   = "end_load_file_id"
	UploadUpdatedAtField       = "updated_at"
	UploadTimingsField         = "timings"
	UploadSchemaField          = "schema"
	UploadLastExecAtField      = "last_exec_at"
	UploadInProgress           = "in_progress"
)
View Source
const (
	EmbeddedMode       = "embedded"
	EmbeddedMasterMode = "embedded_master"
)

Warehouse worker modes.

View Source
const (
	DegradedMode = "degraded"
)

Variables

View Source
var (
	ShouldForceSetLowerVersion bool
)

Functions

func CheckForWarehouseEnvVars added in v0.1.10

func CheckForWarehouseEnvVars() bool

CheckForWarehouseEnvVars checks whether all the environment variables required by the warehouse are present.

func CheckPGHealth

func CheckPGHealth(ctx context.Context, db *sql.DB) bool

func ClauseQueryArgs added in v1.2.0

func ClauseQueryArgs(filterClauses ...FilterClause) (string, []interface{})

func Init added in v0.1.10

func Init()

func Init4 added in v0.1.10

func Init4()

func Init6 added in v0.1.10

func Init6()

func InitWarehouseAPI added in v0.1.10

func InitWarehouseAPI(dbHandle *sql.DB, bcManager *backendConfigManager, log logger.Logger) error

func PickupStagingConfiguration added in v1.0.2

func PickupStagingConfiguration(job *Payload) bool

func RegisterAdmin added in v1.12.0

func RegisterAdmin(bcManager *backendConfigManager, logger logger.Logger)

func Setup added in v1.4.0

func Setup(ctx context.Context) error

Setup prepares the database connection for warehouse service, verifies database compatibility and creates the required tables

func Start

func Start(ctx context.Context, app app.App) error

Start starts the warehouse service

func TriggerUploadHandler added in v0.1.10

func TriggerUploadHandler(sourceID, destID string) error

Types

type Admin added in v1.12.0

type Admin struct {
	// contains filtered or unexported fields
}

func (*Admin) ConfigurationTest added in v1.12.0

func (a *Admin) ConfigurationTest(s ConfigurationTestInput, reply *ConfigurationTestOutput) error

ConfigurationTest tests the underlying warehouse destination.

func (*Admin) Query added in v1.12.0

func (a *Admin) Query(s QueryInput, reply *warehouseutils.QueryResult) error

Query runs a query against the underlying warehouse.

func (*Admin) TriggerUpload added in v1.12.0

func (*Admin) TriggerUpload(off bool, reply *string) error

TriggerUpload sets uploads to start without delay

type BatchRouterEvent added in v1.7.0

type BatchRouterEvent struct {
	Metadata Metadata `json:"metadata"`
	Data     Data     `json:"data"`
}

func (*BatchRouterEvent) GetColumnInfo added in v1.7.0

func (event *BatchRouterEvent) GetColumnInfo(columnName string) (columnInfo warehouseutils.ColumnInfo, ok bool)

type ConfigurationTestInput added in v1.0.2

type ConfigurationTestInput struct {
	DestID string
}

type ConfigurationTestOutput added in v1.0.2

type ConfigurationTestOutput struct {
	Valid bool
	Error string
}

type Constraints added in v1.7.0

type Constraints interface {
	// contains filtered or unexported methods
}

type ConstraintsViolation added in v1.7.0

type ConstraintsViolation struct {
	IsViolated         bool
	ViolatedIdentifier string
}

func ViolatedConstraints added in v0.1.10

func ViolatedConstraints(destinationType string, brEvent *BatchRouterEvent, columnName string) (cv *ConstraintsViolation)

type DB added in v1.0.2

type DB struct {
	// contains filtered or unexported fields
}

DB encapsulates the interactions of warehouse operations with the database.

func NewWarehouseDB added in v1.0.2

func NewWarehouseDB(handle *sqlquerywrapper.DB) *DB

func (*DB) GetLatestUploadStatus added in v1.0.2

func (db *DB) GetLatestUploadStatus(ctx context.Context, sourceID, destinationID string) (int64, string, int, error)

func (*DB) GetUploadsCount added in v1.2.0

func (db *DB) GetUploadsCount(ctx context.Context, filterClauses ...FilterClause) (count int64, err error)

func (*DB) RetryUploads added in v1.2.0

func (db *DB) RetryUploads(ctx context.Context, filterClauses ...FilterClause) (rowsAffected int64, err error)

type Data added in v1.7.0

type Data map[string]interface{}

type ErrorHandler added in v1.6.0

type ErrorHandler struct {
	Manager manager.Manager
}

func (*ErrorHandler) MatchErrorMappings added in v1.6.0

func (e *ErrorHandler) MatchErrorMappings(err error) warehouseutils.Tag

MatchErrorMappings matches the error against the error mappings defined in the integrations and returns the corresponding joins of the matched error types; if no mapping matches, it returns UnknownError.

type FilterClause added in v1.2.0

type FilterClause struct {
	Clause    string
	ClauseArg interface{}
}

type IndexConstraint added in v1.7.0

type IndexConstraint struct {
	TableName    string
	ColumnName   string
	IndexColumns []string
	Limit        int
}

type InvalidDestinationCredErr added in v1.2.0

type InvalidDestinationCredErr struct {
	Base      error
	Operation string
}

func (InvalidDestinationCredErr) Error added in v1.2.0

func (err InvalidDestinationCredErr) Error() string

type JobID added in v1.7.0

type JobID int64

type LoadFileJob added in v1.7.0

type LoadFileJob struct {
	StagingFile                *model.StagingFile
	Schema                     model.Schema
	Warehouse                  model.Warehouse
	Wg                         *misc.WaitGroup
	LoadFileIDsChan            chan []int64
	TableToBucketFolderMap     map[string]string
	TableToBucketFolderMapLock *sync.RWMutex
}

type Metadata added in v1.7.0

type Metadata struct {
	Table        string            `json:"table"`
	Columns      map[string]string `json:"columns"`
	IsMergeRule  bool              `json:"isMergeRule"`
	ReceivedAt   time.Time         `json:"receivedAt"`
	MergePropOne string            `json:"mergePropOne"`
	MergePropTwo string            `json:"mergePropTwo"`
}

type ObjectStorageValidationRequest added in v1.2.0

type ObjectStorageValidationRequest struct {
	Type   string                 `json:"type"`
	Config map[string]interface{} `json:"config"`
}

type Payload added in v1.2.0

type Payload struct {
	BatchID                      string
	UploadID                     int64
	StagingFileID                int64
	StagingFileLocation          string
	UploadSchema                 model.Schema
	WorkspaceID                  string
	SourceID                     string
	SourceName                   string
	DestinationID                string
	DestinationName              string
	DestinationType              string
	DestinationNamespace         string
	DestinationRevisionID        string
	StagingDestinationRevisionID string
	DestinationConfig            map[string]interface{}
	StagingDestinationConfig     interface{}
	UseRudderStorage             bool
	StagingUseRudderStorage      bool
	UniqueLoadGenID              string
	RudderStoragePrefix          string
	Output                       []uploadResult
	LoadFilePrefix               string // prefix for the load file name
	LoadFileType                 string
}

type QueryInput added in v0.1.10

type QueryInput struct {
	DestID       string
	SourceID     string
	SQLStatement string
}

type RetryRequest added in v0.1.10

type RetryRequest struct {
	WorkspaceID     string
	SourceID        string
	DestinationID   string
	DestinationType string
	IntervalInHours int64   // Optional, if provided we will retry based on the interval provided
	UploadIds       []int64 // Optional, if provided we will retry the upload ids provided
	ForceRetry      bool
	API             UploadAPIT
}

func (*RetryRequest) RetryWHUploads added in v0.1.10

func (retryReq *RetryRequest) RetryWHUploads(ctx context.Context) (response RetryResponse, err error)

func (*RetryRequest) UploadsToRetry added in v1.2.0

func (retryReq *RetryRequest) UploadsToRetry(ctx context.Context) (response RetryResponse, err error)

type RetryResponse added in v0.1.10

type RetryResponse struct {
	Count      int64
	Message    string
	StatusCode int32
}

type Schema added in v1.10.0

type Schema struct {
	// contains filtered or unexported fields
}

func NewSchema added in v1.10.0

func NewSchema(
	db *sqlquerywrapper.DB,
	warehouse model.Warehouse,
	conf *config.Config,
) *Schema

type TableUploadReq added in v1.7.0

type TableUploadReq struct {
	UploadID int64
	Name     string
	API      UploadAPIT
}

func (TableUploadReq) GetWhTableUploads added in v1.7.0

func (tableUploadReq TableUploadReq) GetWhTableUploads(ctx context.Context) ([]*proto.WHTable, error)

type TableUploadRes added in v1.7.0

type TableUploadRes struct {
	ID         int64     `json:"id"`
	UploadID   int64     `json:"upload_id"`
	Name       string    `json:"name"`
	Error      string    `json:"error"`
	Status     string    `json:"status"`
	Count      int32     `json:"count"`
	LastExecAt time.Time `json:"last_exec_at"`
	Duration   int32     `json:"duration"`
}

type TablesRes added in v1.7.0

type TablesRes struct {
	Tables []TableUploadRes `json:"tables,omitempty"`
}

type UploadAPIT added in v0.1.10

type UploadAPIT struct {
	// contains filtered or unexported fields
}
var UploadAPI UploadAPIT

type UploadColumn added in v1.7.0

type UploadColumn struct {
	Column string
	Value  interface{}
}

type UploadColumnsOpts added in v0.1.10

type UploadColumnsOpts struct {
	Fields []UploadColumn
	Txn    *sqlmiddleware.Tx
}

type UploadJob added in v1.7.0

type UploadJob struct {
	LoadFileGenStartTime time.Time
	// contains filtered or unexported fields
}

func (*UploadJob) Aborted added in v1.7.0

func (job *UploadJob) Aborted(attempts int, startTime time.Time) bool

Aborted returns true if the job has been aborted

func (*UploadJob) DTO added in v1.7.0

func (job *UploadJob) DTO() *model.UploadJob

func (*UploadJob) GetFirstLastEvent added in v1.7.0

func (job *UploadJob) GetFirstLastEvent() (time.Time, time.Time)

func (*UploadJob) GetLoadFileGenStartTIme added in v1.7.0

func (job *UploadJob) GetLoadFileGenStartTIme() time.Time

func (*UploadJob) GetLoadFileType added in v1.7.0

func (job *UploadJob) GetLoadFileType() string

func (*UploadJob) GetLoadFilesMetadata added in v1.7.0

func (job *UploadJob) GetLoadFilesMetadata(ctx context.Context, options warehouseutils.GetLoadFilesOptions) (loadFiles []warehouseutils.LoadFile)

func (*UploadJob) GetLocalSchema added in v1.7.0

func (job *UploadJob) GetLocalSchema(ctx context.Context) (model.Schema, error)

func (*UploadJob) GetSampleLoadFileLocation added in v1.7.0

func (job *UploadJob) GetSampleLoadFileLocation(ctx context.Context, tableName string) (location string, err error)

func (*UploadJob) GetSchemaInWarehouse added in v1.7.0

func (job *UploadJob) GetSchemaInWarehouse() (schema model.Schema)

func (*UploadJob) GetSingleLoadFile added in v1.7.0

func (job *UploadJob) GetSingleLoadFile(ctx context.Context, tableName string) (warehouseutils.LoadFile, error)

func (*UploadJob) GetTableSchemaInUpload added in v1.7.0

func (job *UploadJob) GetTableSchemaInUpload(tableName string) model.TableSchema

func (*UploadJob) GetTableSchemaInWarehouse added in v1.7.0

func (job *UploadJob) GetTableSchemaInWarehouse(tableName string) model.TableSchema

func (*UploadJob) RefreshPartitions added in v1.7.0

func (job *UploadJob) RefreshPartitions(loadFileStartID, loadFileEndID int64) error

func (*UploadJob) ShouldOnDedupUseNewRecord added in v1.7.0

func (job *UploadJob) ShouldOnDedupUseNewRecord() bool

func (*UploadJob) TablesToSkip added in v1.7.0

func (job *UploadJob) TablesToSkip() (map[string]model.PendingTableUpload, map[string]model.PendingTableUpload, error)

func (*UploadJob) UpdateLocalSchema added in v1.7.0

func (job *UploadJob) UpdateLocalSchema(ctx context.Context, schema model.Schema) error

func (*UploadJob) UpdateTableSchema added in v1.7.0

func (job *UploadJob) UpdateTableSchema(tName string, tableSchemaDiff warehouseutils.TableSchemaDiff) (err error)

func (*UploadJob) UseRudderStorage added in v1.7.0

func (job *UploadJob) UseRudderStorage() bool

type UploadJobFactory added in v1.6.0

type UploadJobFactory struct {
	// contains filtered or unexported fields
}

func (*UploadJobFactory) NewUploadJob added in v1.6.0

func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJob, whManager manager.Manager) *UploadJob

type UploadPagination added in v0.1.10

type UploadPagination struct {
	Total  int32 `json:"total"`
	Limit  int32 `json:"limit"`
	Offset int32 `json:"offset"`
}

type UploadReq added in v1.7.0

type UploadReq struct {
	WorkspaceID string
	UploadId    int64
	API         UploadAPIT
}

func (*UploadReq) GetWHUpload added in v1.7.0

func (uploadReq *UploadReq) GetWHUpload(ctx context.Context) (*proto.WHUploadResponse, error)

func (*UploadReq) TriggerWHUpload added in v1.7.0

func (uploadReq *UploadReq) TriggerWHUpload(ctx context.Context) (response *proto.TriggerWhUploadsResponse, err error)

type UploadRes added in v1.7.0

type UploadRes struct {
	ID              int64            `json:"id"`
	Namespace       string           `json:"namespace"`
	SourceID        string           `json:"source_id"`
	DestinationID   string           `json:"destination_id"`
	DestinationType string           `json:"destination_type"`
	Status          string           `json:"status"`
	Error           string           `json:"error"`
	Attempt         int32            `json:"attempt"`
	Duration        int32            `json:"duration"`
	NextRetryTime   string           `json:"nextRetryTime"`
	FirstEventAt    time.Time        `json:"first_event_at"`
	LastEventAt     time.Time        `json:"last_event_at"`
	Tables          []TableUploadRes `json:"tables,omitempty"`
}

type UploadStatusOpts added in v0.1.10

type UploadStatusOpts struct {
	Status           string
	AdditionalFields []UploadColumn
	ReportingMetric  types.PUReportedMetric
}

type UploadsReq added in v1.7.0

type UploadsReq struct {
	WorkspaceID     string
	SourceID        string
	DestinationID   string
	DestinationType string
	Status          string
	Limit           int32
	Offset          int32
	API             UploadAPIT
}

func (*UploadsReq) GetWhUploads added in v1.7.0

func (uploadsReq *UploadsReq) GetWhUploads(ctx context.Context) (uploadsRes *proto.WHUploadsResponse, err error)

func (*UploadsReq) TriggerWhUploads added in v1.7.0

func (uploadsReq *UploadsReq) TriggerWhUploads(ctx context.Context) (response *proto.TriggerWhUploadsResponse, err error)

type UploadsRes added in v1.7.0

type UploadsRes struct {
	Uploads    []UploadRes      `json:"uploads"`
	Pagination UploadPagination `json:"pagination"`
}

type WorkerIdentifierT added in v0.1.10

type WorkerIdentifierT string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL