Documentation ¶
Index ¶
- func StagingFileIDs(stagingFiles []*model.StagingFile) []int64
- type FilterBy
- type LoadFiles
- type Opt
- type ProcessOptions
- type StagingFiles
- func (repo *StagingFiles) CountPendingForDestination(ctx context.Context, destinationID string) (int64, error)
- func (repo *StagingFiles) CountPendingForSource(ctx context.Context, sourceID string) (int64, error)
- func (repo *StagingFiles) DestinationRevisionIDs(ctx context.Context, upload model.Upload) ([]string, error)
- func (repo *StagingFiles) FirstEventForUpload(ctx context.Context, upload model.Upload) (time.Time, error)
- func (repo *StagingFiles) GetByID(ctx context.Context, ID int64) (model.StagingFile, error)
- func (repo *StagingFiles) GetForUpload(ctx context.Context, upload model.Upload) ([]*model.StagingFile, error)
- func (repo *StagingFiles) GetForUploadID(ctx context.Context, sourceID, destinationID string, uploadId int64) ([]*model.StagingFile, error)
- func (repo *StagingFiles) GetSchemasByIDs(ctx context.Context, ids []int64) ([]model.Schema, error)
- func (repo *StagingFiles) Insert(ctx context.Context, stagingFile *model.StagingFileWithSchema) (int64, error)
- func (repo *StagingFiles) Pending(ctx context.Context, sourceID, destinationID string) ([]*model.StagingFile, error)
- func (repo *StagingFiles) SetErrorStatus(ctx context.Context, stagingFileID int64, stageFileErr error) error
- func (repo *StagingFiles) SetStatuses(ctx context.Context, ids []int64, status string) (err error)
- func (repo *StagingFiles) TotalEventsForUpload(ctx context.Context, upload model.Upload) (int64, error)
- type TableUploadSetOptions
- type TableUploads
- func (repo *TableUploads) ExistsForUploadID(ctx context.Context, uploadId int64) (bool, error)
- func (repo *TableUploads) GetByUploadID(ctx context.Context, uploadID int64) ([]model.TableUpload, error)
- func (repo *TableUploads) GetByUploadIDAndTableName(ctx context.Context, uploadID int64, tableName string) (model.TableUpload, error)
- func (repo *TableUploads) Insert(ctx context.Context, uploadID int64, tableNames []string) error
- func (repo *TableUploads) PopulateTotalEventsFromStagingFileIDs(ctx context.Context, uploadId int64, tableName string, stagingFileIDs []int64) error
- func (repo *TableUploads) Set(ctx context.Context, uploadId int64, tableName string, ...) error
- func (repo *TableUploads) TotalExportedEvents(ctx context.Context, uploadId int64, skipTables []string) (int64, error)
- type UploadMetadata
- type Uploads
- func (uploads *Uploads) Count(ctx context.Context, filters ...FilterBy) (int64, error)
- func (uploads *Uploads) CreateWithStagingFiles(ctx context.Context, upload model.Upload, files []*model.StagingFile) (int64, error)
- func (uploads *Uploads) DeleteWaiting(ctx context.Context, uploadID int64) error
- func (uploads *Uploads) Get(ctx context.Context, id int64) (model.Upload, error)
- func (uploads *Uploads) GetToProcess(ctx context.Context, destType string, limit int, opts ProcessOptions) ([]model.Upload, error)
- func (uploads *Uploads) InterruptedDestinations(ctx context.Context, destinationType string) ([]string, error)
- func (uploads *Uploads) PendingTableUploads(ctx context.Context, namespace string, uploadID int64, destID string) ([]model.PendingTableUpload, error)
- func (uploads *Uploads) UploadJobsStats(ctx context.Context, destType string, opts ProcessOptions) (model.UploadJobsStats, error)
- func (uploads *Uploads) UploadTimings(ctx context.Context, uploadID int64) (model.Timings, error)
- type WHSchema
- func (repo *WHSchema) GetForNamespace(ctx context.Context, sourceID, destID, namespace string) (model.WHSchema, error)
- func (repo *WHSchema) GetNamespace(ctx context.Context, sourceID, destID string) (string, error)
- func (repo *WHSchema) GetTablesForConnection(ctx context.Context, connections []warehouseutils.SourceIDDestinationID) ([]warehouseutils.FetchTableInfo, error)
- func (repo *WHSchema) Insert(ctx context.Context, whSchema *model.WHSchema) (int64, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func StagingFileIDs ¶ added in v1.6.0
func StagingFileIDs(stagingFiles []*model.StagingFile) []int64
Types ¶
type LoadFiles ¶ added in v1.6.0
type LoadFiles repo
func (*LoadFiles) DeleteByStagingFiles ¶ added in v1.6.0
DeleteByStagingFiles deletes load files associated with stagingFileIDs.
type ProcessOptions ¶ added in v1.6.0
type StagingFiles ¶
type StagingFiles repo
StagingFiles is a repository for inserting and querying staging files.
func NewStagingFiles ¶ added in v1.6.0
func NewStagingFiles(db *sql.DB, opts ...Opt) *StagingFiles
func (*StagingFiles) CountPendingForDestination ¶ added in v1.6.0
func (*StagingFiles) CountPendingForSource ¶ added in v1.6.0
func (*StagingFiles) DestinationRevisionIDs ¶ added in v1.6.0
func (*StagingFiles) FirstEventForUpload ¶ added in v1.6.0
func (*StagingFiles) GetByID ¶
func (repo *StagingFiles) GetByID(ctx context.Context, ID int64) (model.StagingFile, error)
GetByID returns the staging file with the given ID.
func (*StagingFiles) GetForUpload ¶ added in v1.6.0
func (repo *StagingFiles) GetForUpload(ctx context.Context, upload model.Upload) ([]*model.StagingFile, error)
func (*StagingFiles) GetForUploadID ¶ added in v1.7.0
func (repo *StagingFiles) GetForUploadID(ctx context.Context, sourceID, destinationID string, uploadId int64) ([]*model.StagingFile, error)
GetForUploadID returns all the staging files for the given upload ID.
func (*StagingFiles) GetSchemasByIDs ¶ added in v1.10.0
GetSchemasByIDs returns staging file schemas for the given IDs.
func (*StagingFiles) Insert ¶
func (repo *StagingFiles) Insert(ctx context.Context, stagingFile *model.StagingFileWithSchema) (int64, error)
Insert inserts a staging file into the staging files table. It returns the ID of the inserted staging file.
NOTE: The following fields are ignored and set by the database: ID, Error, CreatedAt, and UpdatedAt.
func (*StagingFiles) Pending ¶ added in v1.6.0
func (repo *StagingFiles) Pending(ctx context.Context, sourceID, destinationID string) ([]*model.StagingFile, error)
func (*StagingFiles) SetErrorStatus ¶ added in v1.6.0
func (*StagingFiles) SetStatuses ¶ added in v1.6.0
func (*StagingFiles) TotalEventsForUpload ¶ added in v1.6.0
type TableUploadSetOptions ¶ added in v1.7.0
type TableUploads ¶ added in v1.7.0
type TableUploads repo
TableUploads is a repository for table uploads.
func NewTableUploads ¶ added in v1.7.0
func NewTableUploads(db *sql.DB, opts ...Opt) *TableUploads
func (*TableUploads) ExistsForUploadID ¶ added in v1.7.0
func (*TableUploads) GetByUploadID ¶ added in v1.7.0
func (repo *TableUploads) GetByUploadID(ctx context.Context, uploadID int64) ([]model.TableUpload, error)
func (*TableUploads) GetByUploadIDAndTableName ¶ added in v1.7.0
func (repo *TableUploads) GetByUploadIDAndTableName(ctx context.Context, uploadID int64, tableName string) (model.TableUpload, error)
func (*TableUploads) PopulateTotalEventsFromStagingFileIDs ¶ added in v1.7.0
func (*TableUploads) Set ¶ added in v1.7.0
func (repo *TableUploads) Set(ctx context.Context, uploadId int64, tableName string, options TableUploadSetOptions) error
func (*TableUploads) TotalExportedEvents ¶ added in v1.7.0
type UploadMetadata ¶ added in v1.6.0
type UploadMetadata struct { UseRudderStorage bool `json:"use_rudder_storage"` SourceTaskRunID string `json:"source_task_run_id"` SourceJobID string `json:"source_job_id"` SourceJobRunID string `json:"source_job_run_id"` LoadFileType string `json:"load_file_type"` Retried bool `json:"retried"` Priority int `json:"priority"` NextRetryTime time.Time `json:"nextRetryTime"` }
func ExtractUploadMetadata ¶ added in v1.6.0
func ExtractUploadMetadata(upload model.Upload) UploadMetadata
type Uploads ¶ added in v1.6.0
type Uploads repo
func (*Uploads) CreateWithStagingFiles ¶ added in v1.6.0
func (*Uploads) DeleteWaiting ¶ added in v1.6.0
func (*Uploads) GetToProcess ¶ added in v1.6.0
func (*Uploads) InterruptedDestinations ¶ added in v1.6.0
func (uploads *Uploads) InterruptedDestinations(ctx context.Context, destinationType string) ([]string, error)
InterruptedDestinations returns a list of destination IDs that have interrupted uploads.
Interrupted uploads might require cleanup of intermediate upload tables.
func (*Uploads) PendingTableUploads ¶ added in v1.7.0
func (uploads *Uploads) PendingTableUploads(ctx context.Context, namespace string, uploadID int64, destID string) ([]model.PendingTableUpload, error)
PendingTableUploads returns a list of pending table uploads for a given upload.
func (*Uploads) UploadJobsStats ¶ added in v1.6.0
func (uploads *Uploads) UploadJobsStats(ctx context.Context, destType string, opts ProcessOptions) (model.UploadJobsStats, error)
type WHSchema ¶ added in v1.8.0
type WHSchema repo
func (*WHSchema) GetForNamespace ¶ added in v1.8.0
func (*WHSchema) GetNamespace ¶ added in v1.8.0
func (*WHSchema) GetTablesForConnection ¶ added in v1.9.0
func (repo *WHSchema) GetTablesForConnection(ctx context.Context, connections []warehouseutils.SourceIDDestinationID) ([]warehouseutils.FetchTableInfo, error)