repo

package
v1.8.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 9, 2023 License: AGPL-3.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func StagingFileIDs added in v1.6.0

func StagingFileIDs(stagingFiles []*model.StagingFile) []int64

Types

type FilterBy added in v1.7.0

type FilterBy struct {
	Key       string
	Value     interface{}
	NotEquals bool
}

type LoadFiles added in v1.6.0

type LoadFiles repo

func NewLoadFiles added in v1.6.0

func NewLoadFiles(db *sql.DB, opts ...Opt) *LoadFiles

func (*LoadFiles) DeleteByStagingFiles added in v1.6.0

func (repo *LoadFiles) DeleteByStagingFiles(ctx context.Context, stagingFileIDs []int64) error

DeleteByStagingFiles deletes load files associated with stagingFileIDs.

func (*LoadFiles) GetByStagingFiles added in v1.6.0

func (repo *LoadFiles) GetByStagingFiles(ctx context.Context, stagingFileIDs []int64) ([]model.LoadFile, error)

GetByStagingFiles returns all load files matching the staging file ids.

Ordered by id ascending.

func (*LoadFiles) Insert added in v1.6.0

func (repo *LoadFiles) Insert(ctx context.Context, loadFiles []model.LoadFile) (err error)

Insert loadFiles into the database.

type Opt added in v1.6.0

type Opt func(*repo)

func WithNow added in v1.6.0

func WithNow(now func() time.Time) Opt

type ProcessOptions added in v1.6.0

type ProcessOptions struct {
	SkipIdentifiers                   []string
	SkipWorkspaces                    []string
	AllowMultipleSourcesForJobsPickup bool
}

type StagingFiles

type StagingFiles repo

StagingFiles is a repository for inserting and querying staging files.

func NewStagingFiles added in v1.6.0

func NewStagingFiles(db *sql.DB, opts ...Opt) *StagingFiles

func (*StagingFiles) CountPendingForDestination added in v1.6.0

func (repo *StagingFiles) CountPendingForDestination(ctx context.Context, destinationID string) (int64, error)

func (*StagingFiles) CountPendingForSource added in v1.6.0

func (repo *StagingFiles) CountPendingForSource(ctx context.Context, sourceID string) (int64, error)

func (*StagingFiles) DestinationRevisionIDs added in v1.6.0

func (repo *StagingFiles) DestinationRevisionIDs(ctx context.Context, upload model.Upload) ([]string, error)

func (*StagingFiles) FirstEventForUpload added in v1.6.0

func (repo *StagingFiles) FirstEventForUpload(ctx context.Context, upload model.Upload) (time.Time, error)

func (*StagingFiles) GetByID

func (repo *StagingFiles) GetByID(ctx context.Context, ID int64) (model.StagingFile, error)

GetByID returns staging file with the given ID.

func (*StagingFiles) GetForUpload added in v1.6.0

func (repo *StagingFiles) GetForUpload(ctx context.Context, upload model.Upload) ([]*model.StagingFile, error)

func (*StagingFiles) GetForUploadID added in v1.7.0

func (repo *StagingFiles) GetForUploadID(ctx context.Context, sourceID, destinationID string, uploadId int64) ([]*model.StagingFile, error)

GetForUploadID returns all the staging files for that uploadID

func (*StagingFiles) GetSchemaByID added in v1.4.3

func (repo *StagingFiles) GetSchemaByID(ctx context.Context, ID int64) (jsonstd.RawMessage, error)

GetSchemaByID returns staging file schema field the given ID.

func (*StagingFiles) Insert

func (repo *StagingFiles) Insert(ctx context.Context, stagingFile *model.StagingFileWithSchema) (int64, error)

Insert inserts a staging file into the staging files table. It returns the ID of the inserted staging file.

NOTE: The following fields are ignored and set by the database: - ID - Error - CreatedAt - UpdatedAt

func (*StagingFiles) Pending added in v1.6.0

func (repo *StagingFiles) Pending(ctx context.Context, sourceID, destinationID string) ([]*model.StagingFile, error)

func (*StagingFiles) SetErrorStatus added in v1.6.0

func (repo *StagingFiles) SetErrorStatus(ctx context.Context, stagingFileID int64, stageFileErr error) error

func (*StagingFiles) SetStatuses added in v1.6.0

func (repo *StagingFiles) SetStatuses(ctx context.Context, ids []int64, status string) (err error)

func (*StagingFiles) TotalEventsForUpload added in v1.6.0

func (repo *StagingFiles) TotalEventsForUpload(ctx context.Context, upload model.Upload) (int64, error)

type TableUploadSetOptions added in v1.7.0

type TableUploadSetOptions struct {
	Status       *string
	Error        *string
	LastExecTime *time.Time
	Location     *string
	TotalEvents  *int64
}

type TableUploads added in v1.7.0

type TableUploads repo

TableUploads is a repository for table uploads

func NewTableUploads added in v1.7.0

func NewTableUploads(db *sql.DB, opts ...Opt) *TableUploads

func (*TableUploads) ExistsForUploadID added in v1.7.0

func (repo *TableUploads) ExistsForUploadID(ctx context.Context, uploadId int64) (bool, error)

func (*TableUploads) GetByUploadID added in v1.7.0

func (repo *TableUploads) GetByUploadID(ctx context.Context, uploadID int64) ([]model.TableUpload, error)

func (*TableUploads) GetByUploadIDAndTableName added in v1.7.0

func (repo *TableUploads) GetByUploadIDAndTableName(ctx context.Context, uploadID int64, tableName string) (model.TableUpload, error)

func (*TableUploads) Insert added in v1.7.0

func (repo *TableUploads) Insert(ctx context.Context, uploadID int64, tableNames []string) error

func (*TableUploads) PopulateTotalEventsFromStagingFileIDs added in v1.7.0

func (repo *TableUploads) PopulateTotalEventsFromStagingFileIDs(ctx context.Context, uploadId int64, tableName string, stagingFileIDs []int64) error

func (*TableUploads) Set added in v1.7.0

func (repo *TableUploads) Set(ctx context.Context, uploadId int64, tableName string, options TableUploadSetOptions) error

func (*TableUploads) TotalExportedEvents added in v1.7.0

func (repo *TableUploads) TotalExportedEvents(ctx context.Context, uploadId int64, skipTables []string) (int64, error)

type UploadMetadata added in v1.6.0

type UploadMetadata struct {
	UseRudderStorage bool      `json:"use_rudder_storage"`
	SourceTaskRunID  string    `json:"source_task_run_id"`
	SourceJobID      string    `json:"source_job_id"`
	SourceJobRunID   string    `json:"source_job_run_id"`
	LoadFileType     string    `json:"load_file_type"`
	Retried          bool      `json:"retried"`
	Priority         int       `json:"priority"`
	NextRetryTime    time.Time `json:"nextRetryTime"`
}

func ExtractUploadMetadata added in v1.6.0

func ExtractUploadMetadata(upload model.Upload) UploadMetadata

type Uploads added in v1.6.0

type Uploads repo

func NewUploads added in v1.6.0

func NewUploads(db *sql.DB, opts ...Opt) *Uploads

func (*Uploads) Count added in v1.7.0

func (uploads *Uploads) Count(ctx context.Context, filters ...FilterBy) (int64, error)

func (*Uploads) CreateWithStagingFiles added in v1.6.0

func (uploads *Uploads) CreateWithStagingFiles(ctx context.Context, upload model.Upload, files []*model.StagingFile) (int64, error)

func (*Uploads) DeleteWaiting added in v1.6.0

func (uploads *Uploads) DeleteWaiting(ctx context.Context, uploadID int64) error

func (*Uploads) Get added in v1.6.0

func (uploads *Uploads) Get(ctx context.Context, id int64) (model.Upload, error)

func (*Uploads) GetToProcess added in v1.6.0

func (uploads *Uploads) GetToProcess(ctx context.Context, destType string, limit int, opts ProcessOptions) ([]model.Upload, error)

func (*Uploads) InterruptedDestinations added in v1.6.0

func (uploads *Uploads) InterruptedDestinations(ctx context.Context, destinationType string) ([]string, error)

InterruptedDestinations returns a list of destination IDs, which have uploads was interrupted.

Interrupted upload might require cleanup of intermediate upload tables.

func (*Uploads) PendingTableUploads added in v1.7.0

func (uploads *Uploads) PendingTableUploads(ctx context.Context, namespace string, uploadID int64, destID string) ([]model.PendingTableUpload, error)

PendingTableUploads returns a list of pending table uploads for a given upload.

func (*Uploads) UploadJobsStats added in v1.6.0

func (uploads *Uploads) UploadJobsStats(ctx context.Context, destType string, opts ProcessOptions) (model.UploadJobsStats, error)

func (*Uploads) UploadTimings added in v1.6.0

func (uploads *Uploads) UploadTimings(ctx context.Context, uploadID int64) (model.Timings, error)

UploadTimings returns the timings for an upload.

type WHSchema added in v1.8.0

type WHSchema repo

func NewWHSchemas added in v1.8.0

func NewWHSchemas(db *sql.DB, opts ...Opt) *WHSchema

func (*WHSchema) GetForNamespace added in v1.8.0

func (repo *WHSchema) GetForNamespace(ctx context.Context, sourceID, destID, namespace string) (model.WHSchema, error)

func (*WHSchema) GetNamespace added in v1.8.0

func (repo *WHSchema) GetNamespace(ctx context.Context, sourceID, destID string) (string, error)

func (*WHSchema) Insert added in v1.8.0

func (repo *WHSchema) Insert(ctx context.Context, whSchema *model.WHSchema) (int64, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL