delegate

package
v0.0.0-...-70c88cb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2025 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DoJobAndReportStatus

func DoJobAndReportStatus(dbpool *pgxpool.Pool, ca *CommandArguments) error

Types

type BadRow

type BadRow struct {
	PEKey              sql.NullInt64
	GroupingKey        sql.NullString
	RowJetsKey         sql.NullString
	InputColumn        sql.NullString
	ErrorMessage       sql.NullString
	ReteSessionSaved   string
	ReteSessionTriples sql.NullString
}

func NewBadRow

func NewBadRow() BadRow

func (BadRow) String

func (br BadRow) String() string

type CommandArguments

type CommandArguments struct {
	AwsRegion         string
	LookupDb          string
	PipelineConfigKey int
	PipelineExecKey   int
	PoolSize          int
	OutSessionId      string
	Limit             int
	NbrShards         int
	ShardId           int
	CompletedMetric   string
	FailedMetric      string
	DevMode           bool
}

type ExecuteRulesResult

type ExecuteRulesResult struct {
	ExecuteRulesCount int
}

type PipelineConfig

type PipelineConfig struct {
	// contains filtered or unexported fields
}

Main data entity

func ReadPipelineConfig

func ReadPipelineConfig(dbpool *pgxpool.Pool, peKey int) (*PipelineConfig, error)

Main Pipeline Configuration Read Function -----------------------------------------

func (*PipelineConfig) GetProcessConfig

func (pc *PipelineConfig) GetProcessConfig() *ProcessConfig

func (*PipelineConfig) String

func (pc *PipelineConfig) String() string

type PipelineResult

type PipelineResult struct {
	Status             string
	InputRecordsCount  int
	ExecuteRulesCount  int
	OutputRecordsCount map[string]int64
	TotalOutputCount   int64
}

func (*PipelineResult) UpdatePipelineExecutionStatus

func (pr *PipelineResult) UpdatePipelineExecutionStatus(dbpool *pgxpool.Pool, pipelineExecutionKey int,
	shardId int, errMessage string) error

PipelineResult Method to update status Register the status details to pipeline_execution_details Lock the sessionId & Register output tables (register sessionId with session_registry) if not failed Do nothing if pipelineExecutionKey < 0

type ProcessConfig

type ProcessConfig struct {
	// contains filtered or unexported fields
}

type ProcessInput

type ProcessInput struct {
	// contains filtered or unexported fields
}

codeValueMapping structure:

{
	"acme:patientGender": {
		"0": "M",
		"1": "F"
	}
}

Where acme:patientGender is the domain property, "0" and "1" are the client-specific codes and "M", "F" are the canonical codes to use for the domain property

func (*ProcessInput) String

func (pi *ProcessInput) String() string

type ProcessMap

type ProcessMap struct {
	// contains filtered or unexported fields
}

type ReteInputContext

type ReteInputContext struct {
	// contains filtered or unexported fields
}

type ReteWorkspace

type ReteWorkspace struct {
	// contains filtered or unexported fields
}

func LoadReteWorkspace

func LoadReteWorkspace(
	lookupDb string,
	pipelineConfig *PipelineConfig) (*ReteWorkspace, error)

Load the rete workspace database via cgo

func (*ReteWorkspace) ExecuteRules

func (rw *ReteWorkspace) ExecuteRules(
	workerId int,
	dataInputc <-chan groupedJetRows,
	outputSpecs workspace.OutputTableSpecs,
	writeOutputc map[string][]chan []interface{}) (*ExecuteRulesResult, error)

main processing function to execute rules

func (*ReteWorkspace) GetRangeDataType

func (rw *ReteWorkspace) GetRangeDataType(dataProperty string) (string, bool, error)

GetRangeDataType: Get the data type for the range of the dataProperty arg

func (*ReteWorkspace) Release

func (rw *ReteWorkspace) Release() error

Terminate the c++ allocated resources

type RuleConfig

type RuleConfig struct {
	// contains filtered or unexported fields
}

type ServerContext

type ServerContext struct {
	// contains filtered or unexported fields
}

func (*ServerContext) ProcessData

func (ctx *ServerContext) ProcessData(reteWorkspace *ReteWorkspace) (*PipelineResult, error)

Main pipeline processing function Note: ALWAYS return a non nil *PipelineResult (needed to register result)

func (*ServerContext) ReadInput

func (ctx *ServerContext) ReadInput(done <-chan struct{}, mainInput *ProcessInput, reteWorkspace *ReteWorkspace) (<-chan groupedJetRows, <-chan readResult)

readInput read the input tables and groups the rows according to the grouping column this is the main read function

func (*ServerContext) ReadRow

func (ctx *ServerContext) ReadRow(rows *pgx.Rows, processInput *ProcessInput) ([]interface{}, error)

type ServerNodeArgs

type ServerNodeArgs struct {
	PipelineExecKey int `json:"pe"`
	NodeId          int `json:"id"`
}

func (*ServerNodeArgs) RunServer

func (args *ServerNodeArgs) RunServer(ctx context.Context, dsn string, dbpool *pgxpool.Pool) error

type WriteTableResult

type WriteTableResult struct {
	// contains filtered or unexported fields
}

type WriteTableSource

type WriteTableSource struct {
	// contains filtered or unexported fields
}

func (*WriteTableSource) Err

func (wt *WriteTableSource) Err() error

func (*WriteTableSource) Next

func (wt *WriteTableSource) Next() bool

pgx.CopyFromSource interface

func (*WriteTableSource) Values

func (wt *WriteTableSource) Values() ([]interface{}, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL