repo

package
v1.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2023 License: AGPL-3.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func StagingFileIDs added in v1.6.0

func StagingFileIDs(stagingFiles []*model.StagingFile) []int64

Types

type LoadFiles added in v1.6.0

type LoadFiles repo

func NewLoadFiles added in v1.6.0

func NewLoadFiles(db *sql.DB, opts ...Opt) *LoadFiles

func (*LoadFiles) DeleteByStagingFiles added in v1.6.0

func (repo *LoadFiles) DeleteByStagingFiles(ctx context.Context, stagingFileIDs []int64) error

DeleteByStagingFiles deletes load files associated with stagingFileIDs.

func (*LoadFiles) GetByStagingFiles added in v1.6.0

func (repo *LoadFiles) GetByStagingFiles(ctx context.Context, stagingFileIDs []int64) ([]model.LoadFile, error)

GetByStagingFiles returns all load files matching the staging file ids.

Ordered by id ascending.

func (*LoadFiles) Insert added in v1.6.0

func (repo *LoadFiles) Insert(ctx context.Context, loadFiles []model.LoadFile) (err error)

Insert loadFiles into the database.

type Opt added in v1.6.0

type Opt func(*repo)

func WithNow added in v1.6.0

func WithNow(now func() time.Time) Opt

type ProcessOptions added in v1.6.0

type ProcessOptions struct {
	SkipIdentifiers                   []string
	SkipWorkspaces                    []string
	AllowMultipleSourcesForJobsPickup bool
}

type StagingFiles

type StagingFiles repo

StagingFiles is a repository for inserting and querying staging files.

func NewStagingFiles added in v1.6.0

func NewStagingFiles(db *sql.DB, opts ...Opt) *StagingFiles

func (*StagingFiles) CountPendingForDestination added in v1.6.0

func (repo *StagingFiles) CountPendingForDestination(ctx context.Context, destinationID string) (int64, error)

func (*StagingFiles) CountPendingForSource added in v1.6.0

func (repo *StagingFiles) CountPendingForSource(ctx context.Context, sourceID string) (int64, error)

func (*StagingFiles) DestinationRevisionIDs added in v1.6.0

func (repo *StagingFiles) DestinationRevisionIDs(ctx context.Context, upload model.Upload) ([]string, error)

func (*StagingFiles) FirstEventForUpload added in v1.6.0

func (repo *StagingFiles) FirstEventForUpload(ctx context.Context, upload model.Upload) (time.Time, error)

func (*StagingFiles) GetAfterID

func (repo *StagingFiles) GetAfterID(ctx context.Context, sourceID, destinationID string, startID int64) ([]model.StagingFile, error)

GetAfterID returns staging files in (startID, +Inf) range.

func (*StagingFiles) GetByID

func (repo *StagingFiles) GetByID(ctx context.Context, ID int64) (model.StagingFile, error)

GetByID returns staging file with the given ID.

func (*StagingFiles) GetForUpload added in v1.6.0

func (repo *StagingFiles) GetForUpload(ctx context.Context, upload model.Upload) ([]model.StagingFile, error)

func (*StagingFiles) GetInRange

func (repo *StagingFiles) GetInRange(ctx context.Context, sourceID, destinationID string, startID, endID int64) ([]model.StagingFile, error)

GetInRange returns staging files in [startID, endID] range inclusive.

func (*StagingFiles) GetSchemaByID added in v1.4.3

func (repo *StagingFiles) GetSchemaByID(ctx context.Context, ID int64) (jsonstd.RawMessage, error)

GetSchemaByID returns staging file schema field the given ID.

func (*StagingFiles) Insert

func (repo *StagingFiles) Insert(ctx context.Context, stagingFile *model.StagingFileWithSchema) (int64, error)

Insert inserts a staging file into the staging files table. It returns the ID of the inserted staging file.

NOTE: The following fields are ignored and set by the database: - ID - Error - CreatedAt - UpdatedAt

func (*StagingFiles) Pending added in v1.6.0

func (repo *StagingFiles) Pending(ctx context.Context, sourceID, destinationID string) ([]model.StagingFile, error)

func (*StagingFiles) SetErrorStatus added in v1.6.0

func (repo *StagingFiles) SetErrorStatus(ctx context.Context, stagingFileID int64, stageFileErr error) error

func (*StagingFiles) SetStatuses added in v1.6.0

func (repo *StagingFiles) SetStatuses(ctx context.Context, ids []int64, status string) (err error)

func (*StagingFiles) TotalEventsForUpload added in v1.6.0

func (repo *StagingFiles) TotalEventsForUpload(ctx context.Context, upload model.Upload) (int64, error)

type UploadMetadata added in v1.6.0

type UploadMetadata struct {
	UseRudderStorage bool      `json:"use_rudder_storage"`
	SourceTaskRunID  string    `json:"source_task_run_id"`
	SourceJobID      string    `json:"source_job_id"`
	SourceJobRunID   string    `json:"source_job_run_id"`
	LoadFileType     string    `json:"load_file_type"`
	Retried          bool      `json:"retried"`
	Priority         int       `json:"priority"`
	NextRetryTime    time.Time `json:"nextRetryTime"`
	UseUploadID      bool      `json:"use_upload_id"`
}

func ExtractUploadMetadata added in v1.6.0

func ExtractUploadMetadata(upload model.Upload) UploadMetadata

type Uploads added in v1.6.0

type Uploads repo

func NewUploads added in v1.6.0

func NewUploads(db *sql.DB, opts ...Opt) *Uploads

func (*Uploads) CreateWithStagingFiles added in v1.6.0

func (uploads *Uploads) CreateWithStagingFiles(ctx context.Context, upload model.Upload, files []model.StagingFile) (int64, error)

func (*Uploads) DeleteWaiting added in v1.6.0

func (uploads *Uploads) DeleteWaiting(ctx context.Context, uploadID int64) error

func (*Uploads) Get added in v1.6.0

func (uploads *Uploads) Get(ctx context.Context, id int64) (model.Upload, error)

func (*Uploads) GetToProcess added in v1.6.0

func (uploads *Uploads) GetToProcess(ctx context.Context, destType string, limit int, opts ProcessOptions) ([]model.Upload, error)

func (*Uploads) InterruptedDestinations added in v1.6.0

func (uploads *Uploads) InterruptedDestinations(ctx context.Context, destinationType string) ([]string, error)

InterruptedDestinations returns a list of destination IDs, which have uploads was interrupted.

Interrupted upload might require cleanup of intermediate upload tables.

func (*Uploads) UploadJobsStats added in v1.6.0

func (uploads *Uploads) UploadJobsStats(ctx context.Context, destType string, opts ProcessOptions) (model.UploadJobsStats, error)

func (*Uploads) UploadTimings added in v1.6.0

func (uploads *Uploads) UploadTimings(ctx context.Context, uploadID int64) (model.Timings, error)

UploadTimings returns the timings for an upload.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL