Documentation ¶
Index ¶
- func StagingFileIDs(stagingFiles []*model.StagingFile) []int64
- type LoadFiles
- type Opt
- type ProcessOptions
- type StagingFiles
- func (repo *StagingFiles) CountPendingForDestination(ctx context.Context, destinationID string) (int64, error)
- func (repo *StagingFiles) CountPendingForSource(ctx context.Context, sourceID string) (int64, error)
- func (repo *StagingFiles) DestinationRevisionIDs(ctx context.Context, upload model.Upload) ([]string, error)
- func (repo *StagingFiles) FirstEventForUpload(ctx context.Context, upload model.Upload) (time.Time, error)
- func (repo *StagingFiles) GetAfterID(ctx context.Context, sourceID, destinationID string, startID int64) ([]model.StagingFile, error)
- func (repo *StagingFiles) GetByID(ctx context.Context, ID int64) (model.StagingFile, error)
- func (repo *StagingFiles) GetForUpload(ctx context.Context, upload model.Upload) ([]model.StagingFile, error)
- func (repo *StagingFiles) GetInRange(ctx context.Context, sourceID, destinationID string, startID, endID int64) ([]model.StagingFile, error)
- func (repo *StagingFiles) GetSchemaByID(ctx context.Context, ID int64) (jsonstd.RawMessage, error)
- func (repo *StagingFiles) Insert(ctx context.Context, stagingFile *model.StagingFileWithSchema) (int64, error)
- func (repo *StagingFiles) Pending(ctx context.Context, sourceID, destinationID string) ([]model.StagingFile, error)
- func (repo *StagingFiles) SetErrorStatus(ctx context.Context, stagingFileID int64, stageFileErr error) error
- func (repo *StagingFiles) SetStatuses(ctx context.Context, ids []int64, status string) (err error)
- func (repo *StagingFiles) TotalEventsForUpload(ctx context.Context, upload model.Upload) (int64, error)
- type UploadMetadata
- type Uploads
- func (uploads *Uploads) CreateWithStagingFiles(ctx context.Context, upload model.Upload, files []model.StagingFile) (int64, error)
- func (uploads *Uploads) DeleteWaiting(ctx context.Context, uploadID int64) error
- func (uploads *Uploads) Get(ctx context.Context, id int64) (model.Upload, error)
- func (uploads *Uploads) GetToProcess(ctx context.Context, destType string, limit int, opts ProcessOptions) ([]model.Upload, error)
- func (uploads *Uploads) InterruptedDestinations(ctx context.Context, destinationType string) ([]string, error)
- func (uploads *Uploads) UploadJobsStats(ctx context.Context, destType string, opts ProcessOptions) (model.UploadJobsStats, error)
- func (uploads *Uploads) UploadTimings(ctx context.Context, uploadID int64) (model.Timings, error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func StagingFileIDs ¶ added in v1.6.0
func StagingFileIDs(stagingFiles []*model.StagingFile) []int64
Types ¶
type LoadFiles ¶ added in v1.6.0
type LoadFiles repo
func (*LoadFiles) DeleteByStagingFiles ¶ added in v1.6.0
DeleteByStagingFiles deletes load files associated with stagingFileIDs.
type ProcessOptions ¶ added in v1.6.0
type StagingFiles ¶
type StagingFiles repo
StagingFiles is a repository for inserting and querying staging files.
func NewStagingFiles ¶ added in v1.6.0
func NewStagingFiles(db *sql.DB, opts ...Opt) *StagingFiles
func (*StagingFiles) CountPendingForDestination ¶ added in v1.6.0
func (*StagingFiles) CountPendingForSource ¶ added in v1.6.0
func (*StagingFiles) DestinationRevisionIDs ¶ added in v1.6.0
func (*StagingFiles) FirstEventForUpload ¶ added in v1.6.0
func (*StagingFiles) GetAfterID ¶
func (repo *StagingFiles) GetAfterID(ctx context.Context, sourceID, destinationID string, startID int64) ([]model.StagingFile, error)
GetAfterID returns staging files in (startID, +Inf) range.
func (*StagingFiles) GetByID ¶
func (repo *StagingFiles) GetByID(ctx context.Context, ID int64) (model.StagingFile, error)
GetByID returns the staging file with the given ID.
func (*StagingFiles) GetForUpload ¶ added in v1.6.0
func (repo *StagingFiles) GetForUpload(ctx context.Context, upload model.Upload) ([]model.StagingFile, error)
func (*StagingFiles) GetInRange ¶
func (repo *StagingFiles) GetInRange(ctx context.Context, sourceID, destinationID string, startID, endID int64) ([]model.StagingFile, error)
GetInRange returns staging files in [startID, endID] range inclusive.
func (*StagingFiles) GetSchemaByID ¶ added in v1.4.3
func (repo *StagingFiles) GetSchemaByID(ctx context.Context, ID int64) (jsonstd.RawMessage, error)
GetSchemaByID returns the schema field of the staging file with the given ID.
func (*StagingFiles) Insert ¶
func (repo *StagingFiles) Insert(ctx context.Context, stagingFile *model.StagingFileWithSchema) (int64, error)
Insert inserts a staging file into the staging files table. It returns the ID of the inserted staging file.
NOTE: The following fields are ignored and set by the database: ID, Error, CreatedAt, UpdatedAt.
func (*StagingFiles) Pending ¶ added in v1.6.0
func (repo *StagingFiles) Pending(ctx context.Context, sourceID, destinationID string) ([]model.StagingFile, error)
func (*StagingFiles) SetErrorStatus ¶ added in v1.6.0
func (*StagingFiles) SetStatuses ¶ added in v1.6.0
func (*StagingFiles) TotalEventsForUpload ¶ added in v1.6.0
type UploadMetadata ¶ added in v1.6.0
type UploadMetadata struct { UseRudderStorage bool `json:"use_rudder_storage"` SourceTaskRunID string `json:"source_task_run_id"` SourceJobID string `json:"source_job_id"` SourceJobRunID string `json:"source_job_run_id"` LoadFileType string `json:"load_file_type"` Retried bool `json:"retried"` Priority int `json:"priority"` NextRetryTime time.Time `json:"nextRetryTime"` UseUploadID bool `json:"use_upload_id"` }
func ExtractUploadMetadata ¶ added in v1.6.0
func ExtractUploadMetadata(upload model.Upload) UploadMetadata
type Uploads ¶ added in v1.6.0
type Uploads repo
func (*Uploads) CreateWithStagingFiles ¶ added in v1.6.0
func (*Uploads) DeleteWaiting ¶ added in v1.6.0
func (*Uploads) GetToProcess ¶ added in v1.6.0
func (*Uploads) InterruptedDestinations ¶ added in v1.6.0
func (uploads *Uploads) InterruptedDestinations(ctx context.Context, destinationType string) ([]string, error)
InterruptedDestinations returns a list of destination IDs that have interrupted uploads.
An interrupted upload might require cleanup of intermediate upload tables.
func (*Uploads) UploadJobsStats ¶ added in v1.6.0
func (uploads *Uploads) UploadJobsStats(ctx context.Context, destType string, opts ProcessOptions) (model.UploadJobsStats, error)