loadfiles

package
v1.7.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 20, 2023 License: AGPL-3.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetLoadFilePrefix

func GetLoadFilePrefix(timeWindow time.Time, warehouse model.Warehouse) string

func WithConfig

func WithConfig(ld *LoadFileGenerator, config *config.Config)

Types

type ControlPlaneClient

// ControlPlaneClient is the single-method interface stored in
// LoadFileGenerator.ControlPlaneClient. It resolves a destination from a
// revision identifier.
type ControlPlaneClient interface {
	// DestinationHistory takes a context and a revision ID and returns the
	// matching backendconfig.DestinationT, or an error.
	// NOTE(review): presumably this fetches a historical destination
	// configuration from the control plane — confirm against the implementation.
	DestinationHistory(ctx context.Context, revisionID string) (backendconfig.DestinationT, error)
}

type LoadFileGenerator

// LoadFileGenerator is the receiver type for CreateLoadFiles and
// ForceCreateLoadFiles. Its collaborators are exposed as exported fields;
// it also has unexported fields that are not shown here.
// NOTE(review): WithConfig accepts a *LoadFileGenerator and a *config.Config,
// so it presumably populates the unexported fields — confirm.
type LoadFileGenerator struct {
	// Logger is the logger.Logger available to the generator.
	Logger   logger.Logger
	// Notifier is the Notifier implementation (a single Publish method).
	Notifier Notifier

	// StageRepo updates staging-file statuses (see StageFileRepo).
	StageRepo StageFileRepo
	// LoadRepo inserts, deletes and reads load files by staging file ID
	// (see LoadFileRepo).
	LoadRepo  LoadFileRepo

	// ControlPlaneClient looks up a destination by revision ID
	// (see the ControlPlaneClient interface).
	ControlPlaneClient ControlPlaneClient
	// contains filtered or unexported fields
}

func (*LoadFileGenerator) CreateLoadFiles

func (lf *LoadFileGenerator) CreateLoadFiles(ctx context.Context, job *model.UploadJob) (int64, int64, error)

CreateLoadFiles creates load files for the staging files that have not been successfully processed.

func (*LoadFileGenerator) ForceCreateLoadFiles

func (lf *LoadFileGenerator) ForceCreateLoadFiles(ctx context.Context, job *model.UploadJob) (int64, int64, error)

ForceCreateLoadFiles creates load files for the staging files, regardless of whether they have already been successfully processed.

type LoadFileRepo

// LoadFileRepo is the storage interface for model.LoadFile records, keyed by
// staging file ID. It is the type of LoadFileGenerator.LoadRepo.
type LoadFileRepo interface {
	// Insert stores the given load files and returns an error on failure.
	Insert(ctx context.Context, loadFiles []model.LoadFile) error
	// DeleteByStagingFiles removes load files associated with the given
	// staging file IDs.
	DeleteByStagingFiles(ctx context.Context, stagingFileIDs []int64) error
	// GetByStagingFiles returns the load files associated with the given
	// staging file IDs.
	GetByStagingFiles(ctx context.Context, stagingFileIDs []int64) ([]model.LoadFile, error)
}

type Notifier

// Notifier is the publishing interface stored in LoadFileGenerator.Notifier.
type Notifier interface {
	// Publish submits a pgnotifier.MessagePayload together with a schema and a
	// priority. It returns a channel that carries slices of pgnotifier.Response,
	// plus an error.
	// NOTE(review): how many values arrive on the channel and whether it is
	// closed afterwards is not visible here — confirm against pgnotifier.
	Publish(payload pgnotifier.MessagePayload, schema *warehouseutils.Schema, priority int) (ch chan []pgnotifier.Response, err error)
}

type StageFileRepo

// StageFileRepo is the interface used to update the status of staging files.
// It is the type of LoadFileGenerator.StageRepo.
type StageFileRepo interface {
	// SetStatuses applies a single status string to every staging file in ids.
	// NOTE(review): the set of valid status values is not shown here.
	SetStatuses(ctx context.Context, ids []int64, status string) (err error)
	// SetErrorStatus records stageFileErr against one staging file.
	SetErrorStatus(ctx context.Context, stagingFileID int64, stageFileErr error) error
}

type WorkerJobRequest

// WorkerJobRequest bundles the identifiers and settings for one staging file
// (StagingFileID, StagingFileLocation) within an upload (UploadID), together
// with workspace, source and destination details.
// NOTE(review): presumably this is the payload sent to workers through
// Notifier.Publish — confirm against the call site, which is not shown here.
type WorkerJobRequest struct {
	BatchID                      string
	UploadID                     int64
	StagingFileID                int64
	StagingFileLocation          string
	UploadSchema                 model.Schema
	WorkspaceID                  string
	SourceID                     string
	SourceName                   string
	DestinationID                string
	DestinationName              string
	DestinationType              string
	DestinationNamespace         string
	// DestinationRevisionID and StagingDestinationRevisionID are carried
	// separately, as are the two destination configs and the two
	// UseRudderStorage flags below.
	// NOTE(review): the "Staging" variants presumably describe the destination
	// as it was when the staging file was written — confirm.
	DestinationRevisionID        string
	StagingDestinationRevisionID string
	// NOTE(review): DestinationConfig is a map while StagingDestinationConfig is
	// an untyped interface{}; the two are not declared with the same type.
	DestinationConfig            map[string]interface{}
	StagingDestinationConfig     interface{}
	UseRudderStorage             bool
	StagingUseRudderStorage      bool
	UniqueLoadGenID              string
	RudderStoragePrefix          string
	// Output holds WorkerJobResponse values on the request itself.
	// NOTE(review): presumably filled in by the worker — confirm.
	Output                       []WorkerJobResponse
	LoadFilePrefix               string // prefix for the load file name
	LoadFileType                 string
}

type WorkerJobResponse

// WorkerJobResponse describes one produced file for a table: its location,
// row count and content length, plus the staging file and destination
// revision it belongs to. It is the element type of WorkerJobRequest.Output.
type WorkerJobResponse struct {
	TableName             string
	Location              string
	// TotalRows is an int while ContentLength is an int64.
	// NOTE(review): the unit of ContentLength (presumably bytes) is not
	// stated here — confirm.
	TotalRows             int
	ContentLength         int64
	StagingFileID         int64
	DestinationRevisionID string
	UseRudderStorage      bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL