Documentation ¶
Index ¶
- Constants
- func BuildKey(fieldMap map[string]interface{}, idxDef *IdxDef) (string, error)
- func CalculateFieldValue(fieldName string, fieldDef *WriteTableFieldDef, srcVars eval.VarValuesMap, ...) (interface{}, error)
- func CheckValueType(val interface{}, fieldType TableFieldType) error
- func DefaultCassandraDecimal2() *inf.Dec
- func DefaultDateTime() time.Time
- func DefaultDecimal2() decimal.Decimal
- func GetDefaultFieldTypeValue(fieldType TableFieldType) interface{}
- func IsValidFieldType(fieldType TableFieldType) bool
- func NewScriptFromFiles(caPath string, privateKeys map[string]string, scriptUri string, ...) (*ScriptDef, error, ScriptInitProblemType)
- func ParseRawGolangExpressionStringAndHarvestFieldRefs(strExp string, usedFields *FieldRefs) (ast.Expr, error)
- func ParseRawRelaxedGolangExpressionStringAndHarvestFieldRefs(strExp string, usedFields *FieldRefs, parserFlags FieldRefParserFlag) (ast.Expr, error)
- func ValidateNodeType(nodeType NodeType) error
- func ValidateRerunPolicy(rerunPolicy NodeRerunPolicy) error
- func ValidateStartPolicy(startPolicy NodeStartPolicy) error
- type AggFinderVisitor
- type CustomProcessorDef
- type CustomProcessorDefFactory
- type DependencyPolicyDef
- type DependencyRule
- type FieldRef
- type FieldRefParserFlag
- type FieldRefs
- func (fieldRefs *FieldRefs) Append(otherFieldRefs FieldRefs)
- func (fieldRefs *FieldRefs) AppendWithFilter(otherFieldRefs FieldRefs, tableFilter string)
- func (fieldRefs *FieldRefs) FindByFieldName(fieldName string) (*FieldRef, bool)
- func (fieldRefs *FieldRefs) HasFieldsWithTableAlias(tableAlias string) bool
- type FileColumnIndexingMode
- type FileCreatorDef
- func (creatorDef *FileCreatorDef) CalculateFileRecordFromSrcVars(srcVars eval.VarValuesMap) ([]interface{}, error)
- func (creatorDef *FileCreatorDef) CheckFileRecordHavingCondition(fileRecord []interface{}) (bool, error)
- func (creatorDef *FileCreatorDef) Deserialize(rawWriter json.RawMessage) error
- func (creatorDef *FileCreatorDef) GetFieldRefsUsedInAllTargetFileExpressions() FieldRefs
- func (creatorDef *FileCreatorDef) HasTop() bool
- type FileReaderColumnDef
- type FileReaderDef
- type IdxCaseSensitivity
- type IdxComponentDef
- type IdxDef
- type IdxDefMap
- type IdxSortOrder
- type IdxUniqueness
- type IndexRef
- type LookupDef
- type LookupJoinType
- type NodeRerunPolicy
- type NodeStartPolicy
- type NodeType
- type ReadyToRunNodeCmdType
- type ScriptDef
- type ScriptInitProblemType
- type ScriptNodeDef
- func (node *ScriptNodeDef) Deserialize(customProcessorDefFactory CustomProcessorDefFactory, ...) error
- func (node *ScriptNodeDef) GetTargetName() string
- func (node *ScriptNodeDef) GetTokenIntervalsByNumberOfBatches() ([][]int64, error)
- func (node *ScriptNodeDef) GetUniqueIndexesFieldRefs() *FieldRefs
- func (node *ScriptNodeDef) HasCustomProcessor() bool
- func (node *ScriptNodeDef) HasFileCreator() bool
- func (node *ScriptNodeDef) HasFileReader() bool
- func (node *ScriptNodeDef) HasLookup() bool
- func (node *ScriptNodeDef) HasTableCreator() bool
- func (node *ScriptNodeDef) HasTableReader() bool
- type TableCreatorDef
- func (creatorDef *TableCreatorDef) CalculateTableRecordFromSrcVars(canUseAggFunc bool, srcVars eval.VarValuesMap) (map[string]interface{}, error)
- func (creatorDef *TableCreatorDef) CheckTableRecordHavingCondition(tableRecord map[string]interface{}) (bool, error)
- func (tcDef *TableCreatorDef) Deserialize(rawWriter json.RawMessage) error
- func (creatorDef *TableCreatorDef) GetFieldDefaultReadyForDb(fieldName string) (interface{}, error)
- func (tcDef *TableCreatorDef) GetFieldRefs() *FieldRefs
- func (tcDef *TableCreatorDef) GetFieldRefsWithAlias(useTableAlias string) *FieldRefs
- type TableFieldType
- type TableReaderDef
- type TableUpdaterDef
- type TopDef
- type WriteFileColumnDef
- type WriteTableFieldDef
Constants ¶
View Source
const ( DefaultStringComponentLen int64 = 64 MinStringComponentLen int64 = 16 MaxStringComponentLen int64 = 1024 )
View Source
const ( ReservedParamBatchIdx string = "{batch_idx|string}" ReservedParamRunId string = "{run_id|string}" )
View Source
const ( HandlerExeTypeGeneric string = "capi_daemon" HandlerExeTypeToolbelt string = "capi_toolbelt" HandlerExeTypeWebapi string = "capi_webapi" )
View Source
const AllowedIdxNameRegex = "^idx[A-Za-z0-9_]+"
View Source
const AllowedTableNameRegex = "[A-Za-z0-9_]+"
View Source
const BeginningOfTimeMicro = int64(-62135596800000000) // time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC).UnixMicro()
View Source
const CassandraDatetimeFormat string = "2006-01-02T15:04:05.000-07:00"
Cassandra timestamps are milliseconds. No microsecond support. On writes: - allows (but does not require) ":" in the timezone - allows (but does not require) "T" as the date/time separator
View Source
const CreatorAlias string = "w"
View Source
const CustomProcessorAlias string = "p"
View Source
const DefaultBool bool = false
View Source
const DefaultFloat float64 = float64(0.0)
View Source
const DefaultInt int64 = int64(0)
View Source
const DefaultRowsetSize int = 1000
View Source
const DefaultString string = ""
View Source
const (
FieldNameUnknown = "unknown_field_name"
)
View Source
const LookupAlias string = "l"
View Source
const MaxAcceptedBatchesByTableReader int = 1000000
View Source
const MaxFileCreatorTopLimit int = 500000
View Source
const MaxRowsetSize int = 100000
View Source
const ProhibitedTableNameRegex = "^idx|^wf|^system"
View Source
const ReaderAlias string = "r"
Variables ¶
This section is empty.
Functions ¶
func CalculateFieldValue ¶
func CalculateFieldValue(fieldName string, fieldDef *WriteTableFieldDef, srcVars eval.VarValuesMap, canUseAggFunc bool) (interface{}, error)
func CheckValueType ¶
func CheckValueType(val interface{}, fieldType TableFieldType) error
func DefaultCassandraDecimal2 ¶
func DefaultCassandraDecimal2() *inf.Dec
func DefaultDateTime ¶
func DefaultDateTime() time.Time
func DefaultDecimal2 ¶
func DefaultDecimal2() decimal.Decimal
func GetDefaultFieldTypeValue ¶
func GetDefaultFieldTypeValue(fieldType TableFieldType) interface{}
func IsValidFieldType ¶
func IsValidFieldType(fieldType TableFieldType) bool
func NewScriptFromFiles ¶
func NewScriptFromFiles(caPath string, privateKeys map[string]string, scriptUri string, scriptParamsUri string, customProcessorDefFactoryInstance CustomProcessorDefFactory, customProcessorsSettings map[string]json.RawMessage) (*ScriptDef, error, ScriptInitProblemType)
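func ParseRawRelaxedGolangExpressionStringAndHarvestFieldRefs ¶ added in v1.1.4
func ParseRawRelaxedGolangExpressionStringAndHarvestFieldRefs(strExp string, usedFields *FieldRefs, parserFlags FieldRefParserFlag) (ast.Expr, error)
func ValidateNodeType ¶
func ValidateNodeType(nodeType NodeType) error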
func ValidateRerunPolicy ¶
func ValidateRerunPolicy(rerunPolicy NodeRerunPolicy) error
func ValidateStartPolicy ¶
func ValidateStartPolicy(startPolicy NodeStartPolicy) error
Types ¶
type AggFinderVisitor ¶
type AggFinderVisitor struct {
Error error
}
type CustomProcessorDef ¶
type CustomProcessorDef interface { Deserialize(raw json.RawMessage, customProcSettings json.RawMessage, caPath string, privateKeys map[string]string) error GetFieldRefs() *FieldRefs GetUsedInTargetExpressionsFields() *FieldRefs }
type CustomProcessorDefFactory ¶
type CustomProcessorDefFactory interface {
Create(processorType string) (CustomProcessorDef, bool)
}
type DependencyPolicyDef ¶
type DependencyPolicyDef struct { EventPriorityOrderString string `json:"event_priority_order"` IsDefault bool `json:"is_default"` Rules []DependencyRule `json:"rules"` OrderIdxDef IdxDef }
func (*DependencyPolicyDef) Deserialize ¶
func (polDef *DependencyPolicyDef) Deserialize(rawPol json.RawMessage) error
type DependencyRule ¶
type DependencyRule struct { Cmd ReadyToRunNodeCmdType `json:"cmd"` RawExpression string `json:"expression"` ParsedExpression ast.Expr }
type FieldRef ¶
type FieldRef struct { TableName string FieldName string FieldType TableFieldType }
func IdxKeyFieldRef ¶
func IdxKeyFieldRef() FieldRef
func RunBatchKeyTokenFieldRef() FieldRef { return FieldRef{ TableName: "db_system", FieldName: "token(run_id,batch_idx,key)", FieldType: FieldTypeInt} }
func KeyTokenFieldRef ¶
func KeyTokenFieldRef() FieldRef
func RowidFieldRef ¶
func RowidTokenFieldRef ¶
func RowidTokenFieldRef() FieldRef
func (*FieldRef) GetAliasHash ¶
type FieldRefParserFlag ¶ added in v1.1.4
type FieldRefParserFlag uint32
const ( FieldRefStrict FieldRefParserFlag = 0 FieldRefAllowUnknownIdents FieldRefParserFlag = 1 << iota FieldRefAllowWhateverFeatureYouAreAddingHere )
func (*FieldRefParserFlag) AddFlag ¶ added in v1.1.4
func (f *FieldRefParserFlag) AddFlag(flag FieldRefParserFlag)
func (*FieldRefParserFlag) ClearFlag ¶ added in v1.1.4
func (f *FieldRefParserFlag) ClearFlag(flag FieldRefParserFlag)
func (FieldRefParserFlag) HasFlag ¶ added in v1.1.4
func (f FieldRefParserFlag) HasFlag(flag FieldRefParserFlag) bool
func (*FieldRefParserFlag) ToggleFlag ¶ added in v1.1.4
func (f *FieldRefParserFlag) ToggleFlag(flag FieldRefParserFlag)
type FieldRefs ¶
type FieldRefs []FieldRef
func GetFieldRefsUsedInAllTargetExpressions ¶
func GetFieldRefsUsedInAllTargetExpressions(fieldDefMap map[string]*WriteTableFieldDef) FieldRefs
func JoinFieldRefs ¶
func NewFieldRefsFromNodeEvent ¶
func NewFieldRefsFromNodeEvent() *FieldRefs
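func (*FieldRefs) AppendWithFilter ¶
func (fieldRefs *FieldRefs) AppendWithFilter(otherFieldRefs FieldRefs, tableFilter string)
func (*FieldRefs) FindByFieldName ¶
func (fieldRefs *FieldRefs) FindByFieldName(fieldName string) (*FieldRef, bool)
func (*FieldRefs) HasFieldsWithTableAlias ¶
func (fieldRefs *FieldRefs) HasFieldsWithTableAlias(tableAlias string) bool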
type FileColumnIndexingMode ¶
type FileColumnIndexingMode string
const ( FileColumnIndexingName FileColumnIndexingMode = "name" FileColumnIndexingIdx FileColumnIndexingMode = "idx" FileColumnIndexingUnknown FileColumnIndexingMode = "unknown" )
type FileCreatorDef ¶
type FileCreatorDef struct { RawHaving string `json:"having"` Having ast.Expr UsedInHavingFields FieldRefs UsedInTargetExpressionsFields FieldRefs Columns []WriteFileColumnDef `json:"columns"` UrlTemplate string `json:"url_template"` Top TopDef `json:"top"` Separator string `json:"separator"` }
func (*FileCreatorDef) CalculateFileRecordFromSrcVars ¶
func (creatorDef *FileCreatorDef) CalculateFileRecordFromSrcVars(srcVars eval.VarValuesMap) ([]interface{}, error)
func (*FileCreatorDef) CheckFileRecordHavingCondition ¶
func (creatorDef *FileCreatorDef) CheckFileRecordHavingCondition(fileRecord []interface{}) (bool, error)
func (*FileCreatorDef) Deserialize ¶
func (creatorDef *FileCreatorDef) Deserialize(rawWriter json.RawMessage) error
func (*FileCreatorDef) GetFieldRefsUsedInAllTargetFileExpressions ¶
func (creatorDef *FileCreatorDef) GetFieldRefsUsedInAllTargetFileExpressions() FieldRefs
func (*FileCreatorDef) HasTop ¶
func (creatorDef *FileCreatorDef) HasTop() bool
type FileReaderColumnDef ¶
type FileReaderColumnDef struct { SrcColIdx int `json:"col_idx"` SrcColHeader string `json:"col_hdr"` SrcColFormat string `json:"col_format"` // Optional for all except datetime DefaultValue string `json:"col_default_value"` // Optional. If omitted, zero value is used Type TableFieldType `json:"col_type"` }
type FileReaderDef ¶
type FileReaderDef struct { SrcFileUrls []string `json:"urls"` SrcFileHdrLineIdx int `json:"hdr_line_idx"` SrcFileFirstDataLineIdx int `json:"first_data_line_idx"` Columns map[string]*FileReaderColumnDef `json:"columns"` // Keys are names used in table writer Separator string `json:"separator"` ColumnIndexingMode FileColumnIndexingMode }
func (*FileReaderDef) Deserialize ¶
func (frDef *FileReaderDef) Deserialize(rawReader json.RawMessage) error
func (*FileReaderDef) ReadLineToValuesMap ¶
func (frDef *FileReaderDef) ReadLineToValuesMap(line *[]string, colVars eval.VarValuesMap) error
func (*FileReaderDef) ResolveColumnIndexesFromNames ¶
func (frDef *FileReaderDef) ResolveColumnIndexesFromNames(srcHdrLine []string) error
type IdxCaseSensitivity ¶
type IdxCaseSensitivity string
const ( IdxCaseSensitive IdxCaseSensitivity = "case_sensitive" IdxIgnoreCase IdxCaseSensitivity = "ignore_case" IdxCaseSensitivityUnknown IdxCaseSensitivity = "case_sensitivity_unknown" )
type IdxComponentDef ¶
type IdxComponentDef struct { FieldName string CaseSensitivity IdxCaseSensitivity SortOrder IdxSortOrder StringLen int64 // For string fields only, default 64 FieldType TableFieldType // Populated from tgt_table def }
type IdxDef ¶
type IdxDef struct { Uniqueness IdxUniqueness Components []IdxComponentDef }
type IdxSortOrder ¶
type IdxSortOrder string
const ( IdxSortAsc IdxSortOrder = "asc" IdxSortDesc IdxSortOrder = "desc" IdxSortUnknown IdxSortOrder = "unknown" )
type IdxUniqueness ¶
type IdxUniqueness string
const ( IdxUnique IdxUniqueness = "unique" IdxNonUnique IdxUniqueness = "non_unique" IdxUniquenessUnknown IdxUniqueness = "unknown" )
type LookupDef ¶
type LookupDef struct { IndexName string `json:"index_name"` RawJoinOn string `json:"join_on"` IsGroup bool `json:"group"` RawFilter string `json:"filter"` LookupJoin LookupJoinType `json:"join_type"` IdxReadBatchSize int `json:"idx_read_batch_size"` RightLookupReadBatchSize int `json:"right_lookup_read_batch_size"` LeftTableFields FieldRefs // In the same order as lookup idx - important TableCreator *TableCreatorDef // Populated when walking through all nodes UsedInFilterFields FieldRefs Filter ast.Expr }
func (*LookupDef) CheckFilterCondition ¶
func (lkpDef *LookupDef) CheckFilterCondition(varsFromLookup eval.VarValuesMap) (bool, error)
func (*LookupDef) CheckPagedBatchSize ¶
func (*LookupDef) ParseFilter ¶
func (*LookupDef) UsesFilter ¶
func (*LookupDef) ValidateJoinType ¶
type LookupJoinType ¶
type LookupJoinType string
const ( LookupJoinInner LookupJoinType = "inner" LookupJoinLeft LookupJoinType = "left" )
type NodeRerunPolicy ¶
type NodeRerunPolicy string
const ( NodeRerun NodeRerunPolicy = "rerun" // Default NodeFail NodeRerunPolicy = "fail" )
type NodeStartPolicy ¶
type NodeStartPolicy string
const ( NodeStartManual NodeStartPolicy = "manual" NodeStartAuto NodeStartPolicy = "auto" // Default )
type ReadyToRunNodeCmdType ¶
type ReadyToRunNodeCmdType string
const ( NodeNone ReadyToRunNodeCmdType = "none" NodeGo ReadyToRunNodeCmdType = "go" NodeWait ReadyToRunNodeCmdType = "wait" NodeNogo ReadyToRunNodeCmdType = "nogo" )
type ScriptDef ¶
type ScriptDef struct { ScriptNodes map[string]*ScriptNodeDef `json:"nodes"` RawDependencyPolicies map[string]json.RawMessage `json:"dependency_policies"` TableCreatorNodeMap map[string](*ScriptNodeDef) IndexNodeMap map[string](*ScriptNodeDef) }
func (*ScriptDef) Deserialize ¶
func (scriptDef *ScriptDef) Deserialize(jsonBytesScript []byte, customProcessorDefFactory CustomProcessorDefFactory, customProcessorsSettings map[string]json.RawMessage, caPath string, privateKeys map[string]string) error
func (*ScriptDef) GetAffectedNodes ¶
type ScriptInitProblemType ¶ added in v1.1.3
type ScriptInitProblemType int
const ScriptInitConnectivityProblem ScriptInitProblemType = 3
const ScriptInitContentProblem ScriptInitProblemType = 2
const ScriptInitNoProblem ScriptInitProblemType = 0
const ScriptInitUrlProblem ScriptInitProblemType = 1
type ScriptNodeDef ¶
type ScriptNodeDef struct { Name string // Get it from the key Type NodeType `json:"type"` Desc string `json:"desc"` StartPolicy NodeStartPolicy `json:"start_policy"` RerunPolicy NodeRerunPolicy `json:"rerun_policy"` CustomProcessorType string `json:"custom_proc_type"` HandlerExeType string `json:"handler_exe_type"` RawReader json.RawMessage `json:"r"` // This depends on tfm type TableReader TableReaderDef FileReader FileReaderDef Lookup LookupDef `json:"l"` RawProcessorDef json.RawMessage `json:"p"` // This depends on tfm type CustomProcessor CustomProcessorDef // Also should implement CustomProcessorRunner RawWriter json.RawMessage `json:"w"` // This depends on tfm type DependencyPolicyName string `json:"dependency_policy"` TableCreator TableCreatorDef TableUpdater TableUpdaterDef FileCreator FileCreatorDef DepPolDef *DependencyPolicyDef }
func (*ScriptNodeDef) Deserialize ¶
func (node *ScriptNodeDef) Deserialize(customProcessorDefFactory CustomProcessorDefFactory, customProcessorsSettings map[string]json.RawMessage, caPath string, privateKeys map[string]string) error
func (*ScriptNodeDef) GetTargetName ¶
func (node *ScriptNodeDef) GetTargetName() string
func (*ScriptNodeDef) GetTokenIntervalsByNumberOfBatches ¶
func (node *ScriptNodeDef) GetTokenIntervalsByNumberOfBatches() ([][]int64, error)
func (*ScriptNodeDef) GetUniqueIndexesFieldRefs ¶
func (node *ScriptNodeDef) GetUniqueIndexesFieldRefs() *FieldRefs
func (*ScriptNodeDef) HasCustomProcessor ¶
func (node *ScriptNodeDef) HasCustomProcessor() bool
func (*ScriptNodeDef) HasFileCreator ¶
func (node *ScriptNodeDef) HasFileCreator() bool
func (*ScriptNodeDef) HasFileReader ¶
func (node *ScriptNodeDef) HasFileReader() bool
func (*ScriptNodeDef) HasLookup ¶
func (node *ScriptNodeDef) HasLookup() bool
func (*ScriptNodeDef) HasTableCreator ¶
func (node *ScriptNodeDef) HasTableCreator() bool
func (*ScriptNodeDef) HasTableReader ¶
func (node *ScriptNodeDef) HasTableReader() bool
type TableCreatorDef ¶
type TableCreatorDef struct { Name string `json:"name"` RawHaving string `json:"having"` Having ast.Expr UsedInHavingFields FieldRefs UsedInTargetExpressionsFields FieldRefs Fields map[string]*WriteTableFieldDef `json:"fields"` RawIndexes map[string]string `json:"indexes"` Indexes IdxDefMap }
func (*TableCreatorDef) CalculateTableRecordFromSrcVars ¶
func (creatorDef *TableCreatorDef) CalculateTableRecordFromSrcVars(canUseAggFunc bool, srcVars eval.VarValuesMap) (map[string]interface{}, error)
func (*TableCreatorDef) CheckTableRecordHavingCondition ¶
func (creatorDef *TableCreatorDef) CheckTableRecordHavingCondition(tableRecord map[string]interface{}) (bool, error)
func (*TableCreatorDef) Deserialize ¶
func (tcDef *TableCreatorDef) Deserialize(rawWriter json.RawMessage) error
func (*TableCreatorDef) GetFieldDefaultReadyForDb ¶
func (creatorDef *TableCreatorDef) GetFieldDefaultReadyForDb(fieldName string) (interface{}, error)
func (*TableCreatorDef) GetFieldRefs ¶
func (tcDef *TableCreatorDef) GetFieldRefs() *FieldRefs
func (*TableCreatorDef) GetFieldRefsWithAlias ¶
func (tcDef *TableCreatorDef) GetFieldRefsWithAlias(useTableAlias string) *FieldRefs
type TableFieldType ¶
type TableFieldType string
const ( FieldTypeString TableFieldType = "string" FieldTypeInt TableFieldType = "int" // sign+18digit string FieldTypeFloat TableFieldType = "float" // sign+64digit string, 32 digits after point FieldTypeBool TableFieldType = "bool" // F or T FieldTypeDecimal2 TableFieldType = "decimal2" // sign + 18digit+point+2 FieldTypeDateTime TableFieldType = "datetime" // int unix epoch milliseconds FieldTypeUnknown TableFieldType = "unknown" )
type TableReaderDef ¶
type TableReaderDef struct { TableName string `json:"table"` ExpectedBatchesTotal int `json:"expected_batches_total"` RowsetSize int `json:"rowset_size"` // DefaultRowsetSize = 1000 TableCreator *TableCreatorDef }
type TableUpdaterDef ¶
type TableUpdaterDef struct {
Fields map[string]*WriteTableFieldDef `json:"fields"`
}
type WriteFileColumnDef ¶
type WriteFileColumnDef struct { RawExpression string `json:"expression"` Format string `json:"format"` Header string `json:"header"` Name string `json:"name"` // To be used in Having Type TableFieldType `json:"type"` // To be checked when checking expressions and to be used in Having ParsedExpression ast.Expr UsedFields FieldRefs }
type WriteTableFieldDef ¶
Click to show internal directories.
Click to hide internal directories.