Documentation ¶
Index ¶
- func DownloadS3Object(s3Key, localDir string, minSize int64) (string, error)
- func GetFileKeys(ctx context.Context, dbpool *pgxpool.Pool, sessionId string, nodeId int) ([]string, error)
- func ShardFileKeys(exeCtx context.Context, dbpool *pgxpool.Pool, baseFileKey string, ...) (int, error)
- func ShardFileKeysP1(exeCtx context.Context, dbpool *pgxpool.Pool, baseFileKey string, ...) (int, int64, error)
- func ShardFileKeysP2(exeCtx context.Context, dbpool *pgxpool.Pool, baseFileKey string, ...) error
- type CompiledPartFileComponent
- type ComputePipesArgs
- type ComputePipesContext
- func (cpCtx *ComputePipesContext) DownloadS3Files(inFolderPath string, fileKeys []string) error
- func (cpCtx *ComputePipesContext) LoadFiles(ctx context.Context, dbpool *pgxpool.Pool)
- func (cpCtx *ComputePipesContext) ProcessFilesAndReportStatus(ctx context.Context, dbpool *pgxpool.Pool, inFolderPath string) error
- func (cpCtx *ComputePipesContext) ReadCsvFile(filePath *FileName, computePipesInputCh chan<- []interface{}) (int64, error)
- func (cpCtx *ComputePipesContext) ReadParquetFile(filePath *FileName, computePipesInputCh chan<- []interface{}) (int64, error)
- func (cpCtx *ComputePipesContext) UpdatePipelineExecutionStatus(dbpool *pgxpool.Pool, inputRowCount, outputRowCount int, ...) error
- type ComputePipesRun
- type DownloadS3Result
- type FileName
- type InputStats
- type StartComputePipesArgs
- func (args *StartComputePipesArgs) StartReducingComputePipes(ctx context.Context, dsn string, defaultNbrNodes int) (result ComputePipesRun, err error)
- func (args *StartComputePipesArgs) StartShardingComputePipes(ctx context.Context, dsn string, defaultNbrNodes int) (result ComputePipesRun, err error)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DownloadS3Object ¶
func GetFileKeys ¶
func GetFileKeys(ctx context.Context, dbpool *pgxpool.Pool, sessionId string, nodeId int) ([]string, error)
Get the file_key(s) assigned to nodeId -- these are the input multipart files
func ShardFileKeys ¶
func ShardFileKeys(exeCtx context.Context, dbpool *pgxpool.Pool, baseFileKey string, sessionId string, clusterConfig *compute_pipes.ClusterSpec) (int, error)
Action to assign input file keys to nodes aka shards. Assign file_key to shard (node_id, sc_node_id, sc_id) into jetsapi.compute_pipes_shard_registry. This is done in 2 steps: first load the file_key and file_size into the table, then allocate the file_key using a round robin to sc_id and sc_node_id in decreasing order of file size.
Types ¶
type ComputePipesArgs ¶
type ComputePipesArgs struct { NodeId int `json:"node_id"` CpipesMode string `json:"cpipes_mode"` NbrNodes int `json:"nbr_nodes"` JetsPartitionLabel string `json:"jets_partition_label"` Client string `json:"client"` Org string `json:"org"` ObjectType string `json:"object_type"` InputSessionId string `json:"input_session_id"` SessionId string `json:"session_id"` SourcePeriodKey int `json:"source_period_key"` ProcessName string `json:"process_name"` FileKey string `json:"file_key"` InputColumns []string `json:"input_columns"` PipelineExecKey int `json:"pipeline_execution_key"` PipelineConfigKey int `json:"pipeline_config_key"` UserEmail string `json:"user_email"` }
Argument to cp_node for sharding and reducing
func (*ComputePipesArgs) CoordinateComputePipes ¶
func (args *ComputePipesArgs) CoordinateComputePipes(ctx context.Context, dsn string) error
type ComputePipesContext ¶
type ComputePipesContext struct { ComputePipesArgs CpConfig *compute_pipes.ComputePipesConfig FileKeyComponents map[string]interface{} PartFileKeyComponents []CompiledPartFileComponent EnvSettings map[string]interface{} ChResults *compute_pipes.ChannelResults Done chan struct{} ErrCh chan error FileNamesCh chan FileName DownloadS3ResultCh chan DownloadS3Result // avoid to modify ChannelResult for now... }
func (*ComputePipesContext) DownloadS3Files ¶
func (cpCtx *ComputePipesContext) DownloadS3Files(inFolderPath string, fileKeys []string) error
Input arg: done: unbuffered channel to indicate to stop downloading files (must be an error downstream). Returned values: headersFileCh: channel having the first file name to get headers from; fileNamesCh: channel having all file names (incl header file), one at a time; downloadS3ResultCh: channel indicating the downloader results (nbr of files downloaded or error); inFolderPath: temp folder containing the downloaded files; error when setting up the downloader. Special case:
Case of multipart files using distribute_data operator where this shardId contains no files in table compute_pipes_shard_registry: - Use of global cpipesShardWithNoFileKeys == true to indicate this situation - Case cpipes "reducing": inFile contains the file_key folder of another shardId to get the headers file from it. This will get a file to get the headers from, but the file list to process (in fileNamesCh) will be empty. - Case cpipes "sharding": cpipesFileKeys will contain one file to use to obtain the headers from, but the file list to process (in fileNamesCh) will be empty. This is needed to be able to set up the header structure and domain key info so that records obtained from peer nodes can be processed even when this node had no file assigned to it originally.
func (*ComputePipesContext) LoadFiles ¶
func (cpCtx *ComputePipesContext) LoadFiles(ctx context.Context, dbpool *pgxpool.Pool)
func (*ComputePipesContext) ProcessFilesAndReportStatus ¶
func (*ComputePipesContext) ReadCsvFile ¶
func (cpCtx *ComputePipesContext) ReadCsvFile(filePath *FileName, computePipesInputCh chan<- []interface{}) (int64, error)
func (*ComputePipesContext) ReadParquetFile ¶
func (cpCtx *ComputePipesContext) ReadParquetFile(filePath *FileName, computePipesInputCh chan<- []interface{}) (int64, error)
func (*ComputePipesContext) UpdatePipelineExecutionStatus ¶
func (cpCtx *ComputePipesContext) UpdatePipelineExecutionStatus(dbpool *pgxpool.Pool, inputRowCount, outputRowCount int, status, errMessage string) error
Register the CPIPES execution status details to pipeline_execution_details
type ComputePipesRun ¶
type ComputePipesRun struct { CpipesCommands []ComputePipesArgs `json:"cpipesCommands"` StartReducing StartComputePipesArgs `json:"startReducing"` IsLastReducing bool `json:"isLastReducing"` ReportsCommand []string `json:"reportsCommand"` SuccessUpdate map[string]interface{} `json:"successUpdate"` ErrorUpdate map[string]interface{} `json:"errorUpdate"` }
Returned by the cp_starter for a cpipes run
type DownloadS3Result ¶
type InputStats ¶
type StartComputePipesArgs ¶
type StartComputePipesArgs struct { PipelineExecKey int `json:"pipeline_execution_key"` FileKey string `json:"file_key"` SessionId string `json:"session_id"` InputStepId *string `json:"input_step_id"` NbrPartitions *int `json:"nbr_partitions"` CurrentStep *int `json:"current_step"` }
Argument to start_cp for starting the cp cluster
func (*StartComputePipesArgs) StartReducingComputePipes ¶
func (args *StartComputePipesArgs) StartReducingComputePipes(ctx context.Context, dsn string, defaultNbrNodes int) (result ComputePipesRun, err error)
func (*StartComputePipesArgs) StartShardingComputePipes ¶
func (args *StartComputePipesArgs) StartShardingComputePipes(ctx context.Context, dsn string, defaultNbrNodes int) (result ComputePipesRun, err error)