Documentation
¶
Index ¶
- func DoJobAndReportStatus(dbpool *pgxpool.Pool, ca *CommandArguments) error
- type BadRow
- type CommandArguments
- type ExecuteRulesResult
- type PipelineConfig
- type PipelineResult
- type ProcessConfig
- type ProcessInput
- type ProcessMap
- type ReteInputContext
- type ReteWorkspace
- type RuleConfig
- type ServerContext
- func (ctx *ServerContext) ProcessData(reteWorkspace *ReteWorkspace) (*PipelineResult, error)
- func (ctx *ServerContext) ReadInput(done <-chan struct{}, mainInput *ProcessInput, reteWorkspace *ReteWorkspace) (<-chan groupedJetRows, <-chan readResult)
- func (ctx *ServerContext) ReadRow(rows *pgx.Rows, processInput *ProcessInput) ([]interface{}, error)
- type ServerNodeArgs
- type WriteTableResult
- type WriteTableSource
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func DoJobAndReportStatus ¶
func DoJobAndReportStatus(dbpool *pgxpool.Pool, ca *CommandArguments) error
Types ¶
type BadRow ¶
type BadRow struct { PEKey sql.NullInt64 GroupingKey sql.NullString RowJetsKey sql.NullString InputColumn sql.NullString ErrorMessage sql.NullString ReteSessionSaved string ReteSessionTriples sql.NullString }
type CommandArguments ¶
type ExecuteRulesResult ¶
type ExecuteRulesResult struct {
ExecuteRulesCount int
}
type PipelineConfig ¶
type PipelineConfig struct {
// contains filtered or unexported fields
}
Main data entity
func ReadPipelineConfig ¶
func ReadPipelineConfig(dbpool *pgxpool.Pool, peKey int) (*PipelineConfig, error)
Main pipeline configuration read function.
func (*PipelineConfig) GetProcessConfig ¶
func (pc *PipelineConfig) GetProcessConfig() *ProcessConfig
func (*PipelineConfig) String ¶
func (pc *PipelineConfig) String() string
type PipelineResult ¶
type PipelineResult struct { Status string InputRecordsCount int ExecuteRulesCount int OutputRecordsCount map[string]int64 TotalOutputCount int64 }
func (*PipelineResult) UpdatePipelineExecutionStatus ¶
func (pr *PipelineResult) UpdatePipelineExecutionStatus(dbpool *pgxpool.Pool, pipelineExecutionKey int, shardId int, errMessage string) error
PipelineResult method to update the status. Registers the status details to pipeline_execution_details. Locks the sessionId and registers the output tables (registers the sessionId with session_registry) if not failed. Does nothing if pipelineExecutionKey < 0.
type ProcessConfig ¶
type ProcessConfig struct {
// contains filtered or unexported fields
}
type ProcessInput ¶
type ProcessInput struct {
// contains filtered or unexported fields
}
codeValueMapping structure:
{ "acme:patientGender": { "0": "M", "1": "F" } }
Where acme:patientGender is the domain property, "0" and "1" are the client-specific codes, and "M" and "F" are the canonical codes to use for the domain property.
func (*ProcessInput) String ¶
func (pi *ProcessInput) String() string
type ProcessMap ¶
type ProcessMap struct {
// contains filtered or unexported fields
}
type ReteInputContext ¶
type ReteInputContext struct {
// contains filtered or unexported fields
}
type ReteWorkspace ¶
type ReteWorkspace struct {
// contains filtered or unexported fields
}
func LoadReteWorkspace ¶
func LoadReteWorkspace( lookupDb string, pipelineConfig *PipelineConfig) (*ReteWorkspace, error)
Load the rete workspace database via cgo
func (*ReteWorkspace) ExecuteRules ¶
func (rw *ReteWorkspace) ExecuteRules( workerId int, dataInputc <-chan groupedJetRows, outputSpecs workspace.OutputTableSpecs, writeOutputc map[string][]chan []interface{}) (*ExecuteRulesResult, error)
Main processing function to execute rules.
func (*ReteWorkspace) GetRangeDataType ¶
func (rw *ReteWorkspace) GetRangeDataType(dataProperty string) (string, bool, error)
GetRangeDataType: Get the data type for the range of the dataProperty arg
func (*ReteWorkspace) Release ¶
func (rw *ReteWorkspace) Release() error
Release the C++-allocated resources.
type RuleConfig ¶
type RuleConfig struct {
// contains filtered or unexported fields
}
type ServerContext ¶
type ServerContext struct {
// contains filtered or unexported fields
}
func (*ServerContext) ProcessData ¶
func (ctx *ServerContext) ProcessData(reteWorkspace *ReteWorkspace) (*PipelineResult, error)
Main pipeline processing function. Note: ALWAYS return a non-nil *PipelineResult (needed to register the result).
func (*ServerContext) ReadInput ¶
func (ctx *ServerContext) ReadInput(done <-chan struct{}, mainInput *ProcessInput, reteWorkspace *ReteWorkspace) (<-chan groupedJetRows, <-chan readResult)
ReadInput reads the input tables and groups the rows according to the grouping column. This is the main read function.
func (*ServerContext) ReadRow ¶
func (ctx *ServerContext) ReadRow(rows *pgx.Rows, processInput *ProcessInput) ([]interface{}, error)
type ServerNodeArgs ¶
type WriteTableResult ¶
type WriteTableResult struct {
// contains filtered or unexported fields
}
type WriteTableSource ¶
type WriteTableSource struct {
// contains filtered or unexported fields
}
func (*WriteTableSource) Err ¶
func (wt *WriteTableSource) Err() error
func (*WriteTableSource) Next ¶
func (wt *WriteTableSource) Next() bool
pgx.CopyFromSource interface
func (*WriteTableSource) Values ¶
func (wt *WriteTableSource) Values() ([]interface{}, error)