Documentation ¶
Index ¶
- func CalculatePeriod(year, month, day int) (monthPeriod, weekPeriod, dayPeriod int)
- func DoNotifyApiGateway(fileKey, apiEndpoint, apiEndpointJson, notificationTemplate string, ...) error
- func GetLastComponent(path string) (result string)
- func InsertSourcePeriod(dbpool *pgxpool.Pool, year, month, day int) (int, error)
- func PipelineConfigReady2Execute(dbpool *pgxpool.Pool, processInputKeys *[]int, pipelineConfigKeys *[]int) (*[]int, error)
- func RegisterDomainTables(dbpool *pgxpool.Pool, usingSshTunnel bool, pipelineExecutionKey int) error
- func RunUpdateDb(workspaceName string, serverArgs *[]string) (string, error)
- func SplitFileKeyIntoComponents(keyMap map[string]interface{}, fileKey *string) map[string]interface{}
- func UnitTestWorkspaceAction(ctx *Context, dataTableAction *DataTableAction, token string)
- type Column
- type Context
- func (ctx *Context) AddWorkspaceFile(dataTableAction *DataTableAction, token string) (rb *[]byte, httpStatus int, err error)
- func (ctx *Context) DeleteAllWorkspaceChanges(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *Context) DeleteWorkspaceChanges(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *Context) DeleteWorkspaceFile(dataTableAction *DataTableAction, token string) (rb *[]byte, httpStatus int, err error)
- func (ctx *Context) DoPreviewFileAction(dataTableAction *DataTableAction, token string) (*map[string]interface{}, int, error)
- func (ctx *Context) DoReadAction(dataTableAction *DataTableAction, token string) (*map[string]interface{}, int, error)
- func (ctx *Context) DoWorkspaceReadAction(dataTableAction *DataTableAction, token string) (*map[string]interface{}, int, error)
- func (ctx *Context) DropTable(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *Context) ExecDataManagementStatement(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *Context) ExecRawQuery(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *Context) ExecRawQueryMap(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *Context) GetWorkspaceFileContent(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *Context) InsertRawRows(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *Context) InsertRows(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *Context) LoadAllFiles(registerFileKeyAction *RegisterFileKeyAction, token string) (*map[string]interface{}, int, error)
- func (ctx *Context) PutSchemaEventToS3(action *RegisterFileKeyAction, token string) (*map[string]interface{}, int, error)
- func (ctx *Context) RegisterFileKeys(registerFileKeyAction *RegisterFileKeyAction, token string) (*map[string]interface{}, int, error)
- func (ctx *Context) SaveWorkspaceClientConfig(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *Context) SaveWorkspaceFileContent(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *Context) StartPipelineOnInputRegistryInsert(registerFileKeyAction *RegisterFileKeyAction, token string) (*[]map[string]interface{}, int, error)
- func (ctx *Context) SyncFileKeys(registerFileKeyAction *RegisterFileKeyAction, token string) (*map[string]interface{}, int, error)
- func (ctx *Context) VerifyUserPermission(sqlStmt *SqlInsertDefinition, token string) (*user.User, error)
- func (ctx *Context) WorkspaceInsertRows(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *Context) WorkspaceQueryStructure(dataTableAction *DataTableAction, token string) (results *[]byte, httpStatus int, err error)
- type DataTableAction
- type DataTableColumnDef
- type FromClause
- type RegisterFileKeyAction
- type SourcePeriod
- type SqlInsertDefinition
- type StatusUpdate
- type WhereClause
- type WithClause
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CalculatePeriod ¶
Calculate the month, week, and day period since unix epoch (1/1/1970)
func DoNotifyApiGateway ¶
func GetLastComponent ¶
func InsertSourcePeriod ¶
Insert into source_period and returns the source_period.key If row already exist on table, return the key of that row without inserting a new one
func PipelineConfigReady2Execute ¶
func PipelineConfigReady2Execute(dbpool *pgxpool.Pool, processInputKeys *[]int, pipelineConfigKeys *[]int) (*[]int, error)
Find all pipeline_config ready to go among those in pipelineConfigKeys (to ensure we kick off pipeline with one of inputRegistryKeys)
func RegisterDomainTables ¶
func RegisterDomainTables(dbpool *pgxpool.Pool, usingSshTunnel bool, pipelineExecutionKey int) error
Register Domain Table with input_registry
func RunUpdateDb ¶
Run update_db - function used by apiserver and server
func UnitTestWorkspaceAction ¶
func UnitTestWorkspaceAction(ctx *Context, dataTableAction *DataTableAction, token string)
Execute pipeline in unit test mode
Types ¶
type Context ¶
type Context struct { Dbpool *pgxpool.Pool DevMode bool UsingSshTunnel bool UnitTestDir *string NbrShards int AdminEmail *string }
Environment and settings needed
func NewContext ¶
func (*Context) AddWorkspaceFile ¶
func (ctx *Context) AddWorkspaceFile(dataTableAction *DataTableAction, token string) (rb *[]byte, httpStatus int, err error)
AddWorkspaceFile
func (*Context) DeleteAllWorkspaceChanges ¶
func (ctx *Context) DeleteAllWorkspaceChanges(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
DeleteAllWorkspaceChanges -------------------------------------------------------------------------- Function to delete workspace file changes based on rows in workspace_changes Delete the workspace_changes row and the associated large object
func (*Context) DeleteWorkspaceChanges ¶
func (ctx *Context) DeleteWorkspaceChanges(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
DeleteWorkspaceChanges -------------------------------------------------------------------------- Function to delete workspace file changes based on rows in workspace_changes Delete the workspace_changes row and the associated large object Restaure files from stash, except for .db and .tgz files
func (*Context) DeleteWorkspaceFile ¶
func (ctx *Context) DeleteWorkspaceFile(dataTableAction *DataTableAction, token string) (rb *[]byte, httpStatus int, err error)
DeleteWorkspaceFile
func (*Context) DoPreviewFileAction ¶
func (ctx *Context) DoPreviewFileAction(dataTableAction *DataTableAction, token string) (*map[string]interface{}, int, error)
DoPreviewFileAction ------------------------------------------------------
func (*Context) DoReadAction ¶
func (ctx *Context) DoReadAction(dataTableAction *DataTableAction, token string) (*map[string]interface{}, int, error)
DoReadAction ------------------------------------------------------
func (*Context) DoWorkspaceReadAction ¶
func (ctx *Context) DoWorkspaceReadAction(dataTableAction *DataTableAction, token string) (*map[string]interface{}, int, error)
DoWorkspaceReadAction ------------------------------------------------------
func (*Context) DropTable ¶
func (ctx *Context) DropTable(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
DropTable ------------------------------------------------------ These are queries to load reference data for widget, e.g. dropdown list of items
func (*Context) ExecDataManagementStatement ¶
func (*Context) ExecRawQuery ¶
func (ctx *Context) ExecRawQuery(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
ExecRawQuery ------------------------------------------------------ These are queries to load reference data for widget, e.g. dropdown list of items
func (*Context) ExecRawQueryMap ¶
func (ctx *Context) ExecRawQueryMap(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
ExecRawQueryMap ------------------------------------------------------ These are queries to load reference data for widget, e.g. dropdown list of items
func (*Context) GetWorkspaceFileContent ¶
func (ctx *Context) GetWorkspaceFileContent(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
GetWorkspaceFileContent -------------------------------------------------------------------------- Function to get the workspace file content based on relative file name Read the file from the workspace on file system since it's already in sync with database
func (*Context) InsertRawRows ¶
func (ctx *Context) InsertRawRows(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
InsertRawRows ------------------------------------------------------ Insert row function using a raw text buffer containing cst/tsv rows Delegates to InsertRows
func (*Context) InsertRows ¶
func (ctx *Context) InsertRows(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
InsertRows ------------------------------------------------------ Main insert row function with pre processing hooks for validating/authorizing the request Main insert row function with post processing hooks for starting pipelines Inserting rows using pre-defined sql statements, keyed by table name provided in dataTableAction
func (*Context) LoadAllFiles ¶
func (ctx *Context) LoadAllFiles(registerFileKeyAction *RegisterFileKeyAction, token string) (*map[string]interface{}, int, error)
Load All Files for client/org/object_type from a given day_period
func (*Context) PutSchemaEventToS3 ¶
func (ctx *Context) PutSchemaEventToS3(action *RegisterFileKeyAction, token string) (*map[string]interface{}, int, error)
Submit Schema Event to S3 (which will call RegisterFileKEys as side effect)
func (*Context) RegisterFileKeys ¶
func (ctx *Context) RegisterFileKeys(registerFileKeyAction *RegisterFileKeyAction, token string) (*map[string]interface{}, int, error)
Register file_key with file_key_staging table
func (*Context) SaveWorkspaceClientConfig ¶
func (ctx *Context) SaveWorkspaceClientConfig(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
SaveWorkspaceClientConfig -------------------------------------------------------------------------- Function to save the workspace file content in local workspace file system and in database
func (*Context) SaveWorkspaceFileContent ¶
func (ctx *Context) SaveWorkspaceFileContent(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
SaveWorkspaceFileContent -------------------------------------------------------------------------- Function to save the workspace file content in local workspace file system and in database
func (*Context) StartPipelineOnInputRegistryInsert ¶
func (ctx *Context) StartPipelineOnInputRegistryInsert(registerFileKeyAction *RegisterFileKeyAction, token string) (*[]map[string]interface{}, int, error)
Start process based on matching criteria:
- find processes that are ready to start with one of the input input_registry key.
- Pipeline must have automated flag on
func (*Context) SyncFileKeys ¶
func (ctx *Context) SyncFileKeys(registerFileKeyAction *RegisterFileKeyAction, token string) (*map[string]interface{}, int, error)
SyncFileKeys ------------------------------------------------------ 12/17/2023: Replacing all keys in file_key_staging to be able to reset keys from source_config that are Part File sources
func (*Context) VerifyUserPermission ¶
func (ctx *Context) VerifyUserPermission(sqlStmt *SqlInsertDefinition, token string) (*user.User, error)
Check that the user has the required permission to execute the action
func (*Context) WorkspaceInsertRows ¶
func (ctx *Context) WorkspaceInsertRows(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
WorkspaceInsertRows ------------------------------------------------------ Main insert row function with pre processing hooks for validating/authorizing the request Main insert row function with post processing hooks to perform work async Inserting rows using pre-defined sql statements, keyed by table name provided in dataTableAction
func (*Context) WorkspaceQueryStructure ¶
func (ctx *Context) WorkspaceQueryStructure(dataTableAction *DataTableAction, token string) (results *[]byte, httpStatus int, err error)
WorkspaceQueryStructure ------------------------------------------------------ Function to query the workspace structure, it returns a hierarchical structure modeled based on ui MenuEntry class. It uses a virtual table name to indicate the level of granularity of the structure dataTableAction.FromClauses[0].Table:
case "workspace_file_structure": structure based on files of the workspace case "workspace_object_structure": structure based on object (rule, lookup, class, etc) of the workspace
Initial implementation use workspace_file_structure NOTE: routePath must correspond to the parametrized url (needed by ui MenuEntry) NOTE: routeParam contains the routePath parameters (needed by ui MenuEntry) Input dataTableAction.Data:
[ { "key": "123", "workspace_name": "jets_ws", "user_email": "email here" } ]
Output results:
{ "key": "123", "workspace_name": "jets_ws", "result_type": "workspace_file_structure", "result_data": [ { "key": "a1", "type": "dir", "label": "Jet Rules", "route_path": "/workspace/:workspace_name/jetRules", "route_params": { "workspace_name": "jets_ws", }, "children": [ { "key": "a1.1", "type": "dir", "label": "folder name", "children": [ { "key": "a1.1.1", "type": "file", "label": "mapping_rules.jr", "route_path": "/workspace/:workspace_name/wsFile/:file_name", "route_params": { "workspace_name": "jets_ws", "file_name": "jet_rules%03mapping_rules.jr", } } ] } ] } ] }
type DataTableAction ¶
type DataTableAction struct { Action string `json:"action"` WorkspaceName string `json:"workspaceName"` WorkspaceBranch string `json:"workspaceBranch"` FeatureBranch string `json:"featureBranch"` RawQuery string `json:"query"` RawQueryMap map[string]string `json:"query_map"` Columns []Column `json:"columns"` FromClauses []FromClause `json:"fromClauses"` WhereClauses []WhereClause `json:"whereClauses"` WithClauses []WithClause `json:"withClauses"` DistinctOnClauses []string `json:"distinctOnClauses"` SortColumn string `json:"sortColumn"` SortColumnTable string `json:"sortColumnTable"` SortAscending bool `json:"sortAscending"` Offset int `json:"offset"` Limit int `json:"limit"` // used for raw_query & raw_query_tool action only RequestColumnDef bool `json:"requestColumnDef"` Data []map[string]interface{} `json:"data"` }
sql access builder
type DataTableColumnDef ¶
type DataTableColumnDef struct { Index int `json:"index"` Name string `json:"name"` Label string `json:"label"` Tooltips string `json:"tooltips"` IsNumeric bool `json:"isnumeric"` }
DataTableColumnDef used when returning the column definition obtained from db catalog
func (*DataTableColumnDef) String ¶
func (dc *DataTableColumnDef) String() string
type FromClause ¶
type RegisterFileKeyAction ¶
type SourcePeriod ¶
type SourcePeriod struct { Key int `json:"key"` Year int `json:"year"` Month int `json:"month"` Day int `json:"day"` MonthPeriod int `json:"month_period"` WeekPeriod int `json:"week_period"` DayPeriod int `json:"day_period"` }
* TODO refactor to use SourcePeriod entity
func LoadSourcePeriod ¶
func LoadSourcePeriod(dbpool *pgxpool.Pool, key int) (sp SourcePeriod, err error)
Load source period info from database by key
type SqlInsertDefinition ¶
type SqlInsertDefinition struct { Stmt string ColumnKeys []string AdminOnly bool Capability string }
Simple definition of sql statement for insert
type StatusUpdate ¶
type StatusUpdate struct { CpipesMode bool CpipesEnv map[string]any AwsDsnSecret string DbPoolSize int UsingSshTunnel bool AwsRegion string Dsn string Dbpool *pgxpool.Pool PeKey int Status string FileKey string FailureDetails string }
Status Update command line arguments When used as a delegate from apiserver Dbpool is non nil and then the connection properties (AwsDsnSecret, DbPoolSize, UsingSshTunnel, AwsRegion) are not needed.
func (*StatusUpdate) CoordinateWork ¶
func (ca *StatusUpdate) CoordinateWork() error
func (*StatusUpdate) ValidateArguments ¶
func (ca *StatusUpdate) ValidateArguments() []string
Package Main Functions --------------------------------------------------------------------------------------