actions

package
v0.0.0-...-3fc42f3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 23, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DownloadS3Object

func DownloadS3Object(s3Key, localDir string, minSize int64) (string, error)

func GetFileKeys

func GetFileKeys(ctx context.Context, dbpool *pgxpool.Pool, sessionId string, nodeId int) ([]string, error)

Get the file_key(s) assigned to nodeId -- these are the input multipart files

func ShardFileKeys

func ShardFileKeys(exeCtx context.Context, dbpool *pgxpool.Pool, baseFileKey string, sessionId string, clusterConfig *compute_pipes.ClusterSpec) (int, error)

Action to assign input file keys to nodes aka shards. Assign file_key to shard (node_id, sc_node_id, sc_id) into jetsapi.compute_pipes_shard_registry. This is done in 2 steps: first load the file_key and file_size into the table, then allocate the file_key using a round robin to sc_id and sc_node_id in decreasing order of file size.

func ShardFileKeysP1

func ShardFileKeysP1(exeCtx context.Context, dbpool *pgxpool.Pool, baseFileKey string, sessionId string) (int, int64, error)

Part 1

func ShardFileKeysP2

func ShardFileKeysP2(exeCtx context.Context, dbpool *pgxpool.Pool, baseFileKey string, sessionId string, nbrNodes, nbrSubClusters int) error

Part 2

Types

type CompiledPartFileComponent

type CompiledPartFileComponent struct {
	ColumnName string
	Regex      *regexp.Regexp
}

type ComputePipesArgs

type ComputePipesArgs struct {
	NodeId             int      `json:"node_id"`
	CpipesMode         string   `json:"cpipes_mode"`
	NbrNodes           int      `json:"nbr_nodes"`
	JetsPartitionLabel string   `json:"jets_partition_label"`
	Client             string   `json:"client"`
	Org                string   `json:"org"`
	ObjectType         string   `json:"object_type"`
	InputSessionId     string   `json:"input_session_id"`
	SessionId          string   `json:"session_id"`
	SourcePeriodKey    int      `json:"source_period_key"`
	ProcessName        string   `json:"process_name"`
	FileKey            string   `json:"file_key"`
	InputColumns       []string `json:"input_columns"`
	PipelineExecKey    int      `json:"pipeline_execution_key"`
	PipelineConfigKey  int      `json:"pipeline_config_key"`
	UserEmail          string   `json:"user_email"`
}

Argument to cp_node for sharding and reducing

func (*ComputePipesArgs) CoordinateComputePipes

func (args *ComputePipesArgs) CoordinateComputePipes(ctx context.Context, dsn string) error

type ComputePipesContext

type ComputePipesContext struct {
	ComputePipesArgs
	CpConfig              *compute_pipes.ComputePipesConfig
	FileKeyComponents     map[string]interface{}
	PartFileKeyComponents []CompiledPartFileComponent
	EnvSettings           map[string]interface{}
	ChResults             *compute_pipes.ChannelResults
	Done                  chan struct{}
	ErrCh                 chan error
	FileNamesCh           chan FileName
	DownloadS3ResultCh    chan DownloadS3Result // avoid modifying ChannelResult for now...
}

func (*ComputePipesContext) DownloadS3Files

func (cpCtx *ComputePipesContext) DownloadS3Files(inFolderPath string, fileKeys []string) error

Input arg: done: unbuffered channel to indicate to stop downloading files (must be an error downstream) Returned values: headersFileCh: channel having the first file name to get headers from fileNamesCh: channel having all file names (incl header file), one at a time downloadS3ResultCh: channel indicating the downloader results (nbr of files downloaded or error) inFolderPath: temp folder containing the downloaded files error when setting up the downloader Special case:

Case of multipart files using distribute_data operator where this shardId contains no files in
table compute_pipes_shard_registry:
	- Use of global cpipesShardWithNoFileKeys == true to indicate this situation
	- Case cpipes "reducing": inFile contains the file_key folder of another shardId to get the headers file from it.
		This will get a file to get the headers from but the file list to process (in fileNamesCh) will be empty.
	- Case cpipes "sharding": cpipesFileKeys will contain one file to use to obtain the headers from,
		but the file list to process (in fileNamesCh) will be empty
This is needed to set up the header structure and domain key info so that records
obtained from peer nodes can be processed even when this node had no file assigned to it originally.

func (*ComputePipesContext) LoadFiles

func (cpCtx *ComputePipesContext) LoadFiles(ctx context.Context, dbpool *pgxpool.Pool)

func (*ComputePipesContext) ProcessFilesAndReportStatus

func (cpCtx *ComputePipesContext) ProcessFilesAndReportStatus(ctx context.Context, dbpool *pgxpool.Pool,
	inFolderPath string) error

func (*ComputePipesContext) ReadCsvFile

func (cpCtx *ComputePipesContext) ReadCsvFile(filePath *FileName, computePipesInputCh chan<- []interface{}) (int64, error)

func (*ComputePipesContext) ReadParquetFile

func (cpCtx *ComputePipesContext) ReadParquetFile(filePath *FileName, computePipesInputCh chan<- []interface{}) (int64, error)

func (*ComputePipesContext) UpdatePipelineExecutionStatus

func (cpCtx *ComputePipesContext) UpdatePipelineExecutionStatus(dbpool *pgxpool.Pool, inputRowCount, outputRowCount int, status, errMessage string) error

Register the CPIPES execution status details to pipeline_execution_details

type ComputePipesRun

type ComputePipesRun struct {
	CpipesCommands []ComputePipesArgs     `json:"cpipesCommands"`
	StartReducing  StartComputePipesArgs  `json:"startReducing"`
	IsLastReducing bool                   `json:"isLastReducing"`
	ReportsCommand []string               `json:"reportsCommand"`
	SuccessUpdate  map[string]interface{} `json:"successUpdate"`
	ErrorUpdate    map[string]interface{} `json:"errorUpdate"`
}

Returned by the cp_starter for a cpipes run

type DownloadS3Result

type DownloadS3Result struct {
	InputFilesCount int
	Err             error
}

type FileName

type FileName struct {
	LocalFileName string
	InFileKey     string
}

type InputStats

type InputStats struct {
	TotalPartfileCount int
	TotalSizeMb        int
}

type StartComputePipesArgs

type StartComputePipesArgs struct {
	PipelineExecKey int     `json:"pipeline_execution_key"`
	FileKey         string  `json:"file_key"`
	SessionId       string  `json:"session_id"`
	InputStepId     *string `json:"input_step_id"`
	NbrPartitions   *int    `json:"nbr_partitions"`
	CurrentStep     *int    `json:"current_step"`
}

Argument to start_cp for starting the cp cluster

func (*StartComputePipesArgs) StartReducingComputePipes

func (args *StartComputePipesArgs) StartReducingComputePipes(ctx context.Context, dsn string, defaultNbrNodes int) (result ComputePipesRun, err error)

func (*StartComputePipesArgs) StartShardingComputePipes

func (args *StartComputePipesArgs) StartShardingComputePipes(ctx context.Context, dsn string, defaultNbrNodes int) (result ComputePipesRun, err error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL