sc

package
v1.1.15 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2023 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
// File creator type codes. All three are typed int constants assigned
// consecutively from zero; CreatorFileTypeUnknown (0) is the zero value.
const (
	CreatorFileTypeUnknown int = iota // 0
	CreatorFileTypeCsv                // 1
	CreatorFileTypeParquet            // 2
)
View Source
// File reader type codes. All three are typed int constants assigned
// consecutively from zero; ReaderFileTypeUnknown (0) is the zero value.
const (
	ReaderFileTypeUnknown int = iota // 0
	ReaderFileTypeCsv                // 1
	ReaderFileTypeParquet            // 2
)
View Source
const (
	DefaultStringComponentLen int64 = 64
	MinStringComponentLen     int64 = 16
	MaxStringComponentLen     int64 = 1024
)
View Source
const (
	ReservedParamBatchIdx string = "{batch_idx|string}"
	ReservedParamRunId    string = "{run_id|string}"
)
View Source
const (
	HandlerExeTypeGeneric  string = "capi_daemon"
	HandlerExeTypeToolbelt string = "capi_toolbelt"
	HandlerExeTypeWebapi   string = "capi_webapi"
)
View Source
// AllowedIdxNameRegex is a regexp source string: the literal prefix "idx"
// followed by one or more ASCII letters, digits or underscores.
// NOTE(review): only the start of input is anchored ("^" without "$"), so an
// unanchored match such as regexp.MatchString also accepts names that carry
// trailing characters outside that class (e.g. "idxA-b"). Confirm that callers
// validate the whole name.
const AllowedIdxNameRegex = "^idx[A-Za-z0-9_]+"
View Source
// AllowedTableNameRegex is a regexp source string matching one or more ASCII
// letters, digits or underscores.
// NOTE(review): the pattern has no "^"/"$" anchors, so an unanchored match such
// as regexp.MatchString succeeds for any string that merely contains one such
// character (e.g. "bad-name!"). Confirm that callers anchor it or compare the
// full match against the whole name.
const AllowedTableNameRegex = "[A-Za-z0-9_]+"
View Source
// BeginningOfTimeMicro is midnight UTC on January 1 of year 1 (the instant held
// by Go's zero time.Time), expressed in microseconds relative to the Unix epoch.
// It equals time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC).UnixMicro().
const BeginningOfTimeMicro int64 = -62_135_596_800_000_000
View Source
const CassandraDatetimeFormat string = "2006-01-02T15:04:05.000-07:00"

Cassandra timestamps have millisecond precision; microseconds are not supported. On writes, Cassandra:
- allows (but does not require) ":" in the timezone offset;
- allows (but does not require) "T" as the date/time separator.

View Source
const CreatorAlias string = "w"
View Source
const CustomProcessorAlias string = "p"
View Source
const DefaultBool bool = false
View Source
// DefaultFloat is the typed float64 zero value.
const DefaultFloat float64 = 0
View Source
// DefaultInt is the typed int64 zero value.
const DefaultInt int64 = 0
View Source
const DefaultPolicyCheckerConf string = `` /* 1464-byte string literal not displayed */

This conf should never be referenced in production code: the actual configuration is always supplied in config.json, or by the unit tests.

View Source
const DefaultRowsetSize int = 1000
View Source
const DefaultString string = ""
View Source
// FieldNameUnknown is an untyped string constant holding the placeholder
// field name "unknown_field_name".
const FieldNameUnknown = "unknown_field_name"
View Source
const LookupAlias string = "l"
View Source
const MaxAcceptedBatchesByTableReader int = 1000000
View Source
const MaxFileCreatorTopLimit int = 500000
View Source
const MaxRowsetSize int = 100000
View Source
// ProhibitedTableNameRegex is a regexp source string that matches any string
// beginning with "idx", "wf" or "system": each alternative is anchored only at
// the start with "^", so longer names such as "wfoo" or "systematic" match too.
// Go regexp matching is case-sensitive by default, so "IDX..." does not match.
// Presumably a table name that matches this pattern is rejected; verify
// against callers.
const ProhibitedTableNameRegex = "^idx|^wf|^system"
View Source
const ReaderAlias string = "r"

Variables

This section is empty.

Functions

func BuildKey

func BuildKey(fieldMap map[string]any, idxDef *IdxDef) (string, error)

func CalculateFieldValue

func CalculateFieldValue(fieldName string, fieldDef *WriteTableFieldDef, srcVars eval.VarValuesMap, canUseAggFunc bool) (any, error)

func CheckValueType

func CheckValueType(val any, fieldType TableFieldType) error

func DefaultCassandraDecimal2

func DefaultCassandraDecimal2() *inf.Dec

func DefaultDateTime

func DefaultDateTime() time.Time

func DefaultDecimal2

func DefaultDecimal2() decimal.Decimal

func GetDefaultFieldTypeValue

func GetDefaultFieldTypeValue(fieldType TableFieldType) any

func IsValidFieldType

func IsValidFieldType(fieldType TableFieldType) bool

func NewScriptFromFileBytes added in v1.1.13

func NewScriptFromFileBytes(caPath string, privateKeys map[string]string, scriptUri string, jsonBytesScript []byte, scriptParamsUri string, jsonBytesParams []byte, customProcessorDefFactoryInstance CustomProcessorDefFactory, customProcessorsSettings map[string]json.RawMessage) (*ScriptDef, ScriptInitProblemType, error)

func NewScriptFromFiles

func NewScriptFromFiles(caPath string, privateKeys map[string]string, scriptUri string, scriptParamsUri string, customProcessorDefFactoryInstance CustomProcessorDefFactory, customProcessorsSettings map[string]json.RawMessage) (*ScriptDef, ScriptInitProblemType, error)

func ParseRawGolangExpressionStringAndHarvestFieldRefs

func ParseRawGolangExpressionStringAndHarvestFieldRefs(strExp string, usedFields *FieldRefs) (ast.Expr, error)

func ParseRawRelaxedGolangExpressionStringAndHarvestFieldRefs added in v1.1.4

func ParseRawRelaxedGolangExpressionStringAndHarvestFieldRefs(strExp string, usedFields *FieldRefs, parserFlags FieldRefParserFlag) (ast.Expr, error)

func ValidateNodeType

func ValidateNodeType(nodeType NodeType) error

func ValidateRerunPolicy

func ValidateRerunPolicy(rerunPolicy NodeRerunPolicy) error

func ValidateStartPolicy

func ValidateStartPolicy(startPolicy NodeStartPolicy) error

Types

type AggFinderVisitor

type AggFinderVisitor struct {
	Error error
}

func (*AggFinderVisitor) Visit

func (v *AggFinderVisitor) Visit(node ast.Node) ast.Visitor

type CsvCreatorSettings added in v1.1.9

type CsvCreatorSettings struct {
	Separator string `json:"separator"`
}

type CsvReaderColumnSettings added in v1.1.9

type CsvReaderColumnSettings struct {
	SrcColIdx    int    `json:"col_idx"`
	SrcColHeader string `json:"col_hdr"`
	SrcColFormat string `json:"col_format"` // Optional for all except datetime
}

type CsvReaderSettings added in v1.1.9

type CsvReaderSettings struct {
	SrcFileHdrLineIdx       int    `json:"hdr_line_idx"`
	SrcFileFirstDataLineIdx int    `json:"first_data_line_idx"`
	Separator               string `json:"separator"`
	ColumnIndexingMode      FileColumnIndexingMode
}

type CustomProcessorDef

type CustomProcessorDef interface {
	Deserialize(raw json.RawMessage, customProcSettings json.RawMessage, caPath string, privateKeys map[string]string) error
	GetFieldRefs() *FieldRefs
	GetUsedInTargetExpressionsFields() *FieldRefs
}

type CustomProcessorDefFactory

type CustomProcessorDefFactory interface {
	Create(processorType string) (CustomProcessorDef, bool)
}

type DependencyPolicyDef

type DependencyPolicyDef struct {
	EventPriorityOrderString string           `json:"event_priority_order"`
	IsDefault                bool             `json:"is_default"`
	Rules                    []DependencyRule `json:"rules"`
	OrderIdxDef              IdxDef
}

func (*DependencyPolicyDef) Deserialize

func (polDef *DependencyPolicyDef) Deserialize(rawPol json.RawMessage) error

type DependencyRule

type DependencyRule struct {
	Cmd              ReadyToRunNodeCmdType `json:"cmd"`
	RawExpression    string                `json:"expression"`
	ParsedExpression ast.Expr
}

type FieldRef

type FieldRef struct {
	TableName string
	FieldName string
	FieldType TableFieldType
}

func IdxKeyFieldRef

func IdxKeyFieldRef() FieldRef
// RunBatchKeyTokenFieldRef returns a FieldRef whose table name is "db_system"
// and whose field name is the expression "token(run_id,batch_idx,key)",
// typed as FieldTypeInt. A fresh value is returned on every call.
func RunBatchKeyTokenFieldRef() FieldRef {
	var ref FieldRef
	ref.TableName = "db_system"
	ref.FieldName = "token(run_id,batch_idx,key)"
	ref.FieldType = FieldTypeInt
	return ref
}

func KeyTokenFieldRef

func KeyTokenFieldRef() FieldRef

func RowidFieldRef

func RowidFieldRef(tableName string) FieldRef

func RowidTokenFieldRef

func RowidTokenFieldRef() FieldRef

func (*FieldRef) GetAliasHash

func (fr *FieldRef) GetAliasHash() string

type FieldRefParserFlag added in v1.1.4

// FieldRefParserFlag is a set of option bits, accepted by
// ParseRawRelaxedGolangExpressionStringAndHarvestFieldRefs through its
// parserFlags parameter and queried with HasFlag.
type FieldRefParserFlag uint32

// Parser flags. FieldRefStrict is the empty set (no bits).
// Bit 0 is unassigned: the first flag is 1<<1 (2) and the next is 1<<2 (4).
// These values were historically produced by `1 << iota` placed on the second
// line of the block (where iota is already 1). They are spelled out explicitly
// here so nobody misreads them; keep them stable, since callers may depend on
// the exact numbers.
const (
	FieldRefStrict                               FieldRefParserFlag = 0
	FieldRefAllowUnknownIdents                   FieldRefParserFlag = 1 << 1 // 2
	FieldRefAllowWhateverFeatureYouAreAddingHere FieldRefParserFlag = 1 << 2 // 4
)

func (FieldRefParserFlag) HasFlag added in v1.1.4

func (f FieldRefParserFlag) HasFlag(flag FieldRefParserFlag) bool

type FieldRefs

type FieldRefs []FieldRef

func GetFieldRefsUsedInAllTargetExpressions

func GetFieldRefsUsedInAllTargetExpressions(fieldDefMap map[string]*WriteTableFieldDef) FieldRefs

func JoinFieldRefs

func JoinFieldRefs(srcFieldRefs ...*FieldRefs) *FieldRefs

func NewFieldRefsFromNodeEvent

func NewFieldRefsFromNodeEvent() *FieldRefs

func (*FieldRefs) Append

func (fieldRefs *FieldRefs) Append(otherFieldRefs FieldRefs)

func (*FieldRefs) AppendWithFilter

func (fieldRefs *FieldRefs) AppendWithFilter(otherFieldRefs FieldRefs, tableFilter string)

func (*FieldRefs) FindByFieldName

func (fieldRefs *FieldRefs) FindByFieldName(fieldName string) (*FieldRef, bool)

func (*FieldRefs) HasFieldsWithTableAlias

func (fieldRefs *FieldRefs) HasFieldsWithTableAlias(tableAlias string) bool

type FileColumnIndexingMode

type FileColumnIndexingMode string
const (
	FileColumnIndexingName    FileColumnIndexingMode = "name"
	FileColumnIndexingIdx     FileColumnIndexingMode = "idx"
	FileColumnIndexingUnknown FileColumnIndexingMode = "unknown"
)

type FileCreatorDef

type FileCreatorDef struct {
	RawHaving                     string `json:"having"`
	Having                        ast.Expr
	UsedInHavingFields            FieldRefs
	UsedInTargetExpressionsFields FieldRefs
	Columns                       []WriteFileColumnDef   `json:"columns"`
	UrlTemplate                   string                 `json:"url_template"`
	Top                           TopDef                 `json:"top"`
	Csv                           CsvCreatorSettings     `json:"csv,omitempty"`
	Parquet                       ParquetCreatorSettings `json:"parquet,omitempty"`
	CreatorFileType               int
}

func (*FileCreatorDef) CalculateFileRecordFromSrcVars

func (creatorDef *FileCreatorDef) CalculateFileRecordFromSrcVars(srcVars eval.VarValuesMap) ([]any, error)

func (*FileCreatorDef) CheckFileRecordHavingCondition

func (creatorDef *FileCreatorDef) CheckFileRecordHavingCondition(fileRecord []any) (bool, error)

func (*FileCreatorDef) Deserialize

func (creatorDef *FileCreatorDef) Deserialize(rawWriter json.RawMessage) error

func (*FileCreatorDef) GetFieldRefsUsedInAllTargetFileExpressions

func (creatorDef *FileCreatorDef) GetFieldRefsUsedInAllTargetFileExpressions() FieldRefs

func (*FileCreatorDef) HasTop

func (creatorDef *FileCreatorDef) HasTop() bool

type FileReaderColumnDef

type FileReaderColumnDef struct {
	DefaultValue string                      `json:"col_default_value"` // Optional. If omitted, zero value is used
	Type         TableFieldType              `json:"col_type"`
	Csv          CsvReaderColumnSettings     `json:"csv,omitempty"`
	Parquet      ParquetReaderColumnSettings `json:"parquet,omitempty"`
}

type FileReaderDef

type FileReaderDef struct {
	SrcFileUrls    []string                        `json:"urls"`
	Columns        map[string]*FileReaderColumnDef `json:"columns"` // Keys are names used in table writer
	Csv            CsvReaderSettings               `json:"csv,omitempty"`
	ReaderFileType int
}

func (*FileReaderDef) Deserialize

func (frDef *FileReaderDef) Deserialize(rawReader json.RawMessage) error

func (*FileReaderDef) ReadCsvLineToValuesMap added in v1.1.9

func (frDef *FileReaderDef) ReadCsvLineToValuesMap(line *[]string, colVars eval.VarValuesMap) error

func (*FileReaderDef) ResolveCsvColumnIndexesFromNames added in v1.1.9

func (frDef *FileReaderDef) ResolveCsvColumnIndexesFromNames(srcHdrLine []string) error

type IdxCaseSensitivity

type IdxCaseSensitivity string
const (
	IdxCaseSensitive          IdxCaseSensitivity = "case_sensitive"
	IdxIgnoreCase             IdxCaseSensitivity = "ignore_case"
	IdxCaseSensitivityUnknown IdxCaseSensitivity = "case_sensitivity_unknown"
)

type IdxComponentDef

type IdxComponentDef struct {
	FieldName       string
	CaseSensitivity IdxCaseSensitivity
	SortOrder       IdxSortOrder
	StringLen       int64          // For string fields only, default 64
	FieldType       TableFieldType // Populated from tgt_table def
}

type IdxDef

type IdxDef struct {
	Uniqueness IdxUniqueness
	Components []IdxComponentDef
}

type IdxDefMap

type IdxDefMap map[string]*IdxDef

type IdxSortOrder

type IdxSortOrder string
const (
	IdxSortAsc     IdxSortOrder = "asc"
	IdxSortDesc    IdxSortOrder = "desc"
	IdxSortUnknown IdxSortOrder = "unknown"
)

type IdxUniqueness

type IdxUniqueness string
const (
	IdxUnique            IdxUniqueness = "unique"
	IdxNonUnique         IdxUniqueness = "non_unique"
	IdxUniquenessUnknown IdxUniqueness = "unknown"
)

type IndexRef

type IndexRef struct {
	TableName string
	IdxName   string
}

type LookupDef

// LookupDef describes the lookup ("l") section of a script node: the index to
// join against, the join condition and type, an optional filter expression
// and paging batch sizes. The first group of fields is decoded from the JSON
// keys named in their tags.
type LookupDef struct {
	IndexName                string         `json:"index_name"`
	RawJoinOn                string         `json:"join_on"`
	IsGroup                  bool           `json:"group"`
	RawFilter                string         `json:"filter"`
	LookupJoin               LookupJoinType `json:"join_type"`
	IdxReadBatchSize         int            `json:"idx_read_batch_size"`
	RightLookupReadBatchSize int            `json:"right_lookup_read_batch_size"`

	// NOTE(review): the fields below carry no json tags and appear to be
	// derived state filled in after deserialization (TableCreator is stated
	// to be populated later); confirm before relying on them being set.
	LeftTableFields    FieldRefs        // In the same order as lookup idx - important
	TableCreator       *TableCreatorDef // Populated when walking through all nodes
	UsedInFilterFields FieldRefs
	Filter             ast.Expr
}

func (*LookupDef) CheckFilterCondition

func (lkpDef *LookupDef) CheckFilterCondition(varsFromLookup eval.VarValuesMap) (bool, error)

func (*LookupDef) CheckPagedBatchSize

func (lkpDef *LookupDef) CheckPagedBatchSize() error

func (*LookupDef) ParseFilter

func (lkpDef *LookupDef) ParseFilter() error

func (*LookupDef) UsesFilter

func (lkpDef *LookupDef) UsesFilter() bool

func (*LookupDef) ValidateJoinType

func (lkpDef *LookupDef) ValidateJoinType() error

type LookupJoinType

type LookupJoinType string
const (
	LookupJoinInner LookupJoinType = "inner"
	LookupJoinLeft  LookupJoinType = "left"
)

type NodeRerunPolicy

type NodeRerunPolicy string
const (
	NodeRerun NodeRerunPolicy = "rerun" // Default
	NodeFail  NodeRerunPolicy = "fail"
)

type NodeStartPolicy

type NodeStartPolicy string
const (
	NodeStartManual NodeStartPolicy = "manual"
	NodeStartAuto   NodeStartPolicy = "auto" // Default
)

type NodeType

type NodeType string
const (
	NodeTypeNone                NodeType = "none"
	NodeTypeFileTable           NodeType = "file_table"
	NodeTypeTableTable          NodeType = "table_table"
	NodeTypeTableLookupTable    NodeType = "table_lookup_table"
	NodeTypeTableFile           NodeType = "table_file"
	NodeTypeTableCustomTfmTable NodeType = "table_custom_tfm_table"
)

type ParquetCodecType added in v1.1.9

type ParquetCodecType string
const (
	ParquetCodecGzip         ParquetCodecType = "gzip"
	ParquetCodecSnappy       ParquetCodecType = "snappy"
	ParquetCodecUncompressed ParquetCodecType = "uncompressed"
)

type ParquetCreatorSettings added in v1.1.9

type ParquetCreatorSettings struct {
	Codec ParquetCodecType `json:"codec"`
}

type ParquetReaderColumnSettings added in v1.1.9

type ParquetReaderColumnSettings struct {
	SrcColName string `json:"col_name"`
}

type ReadyToRunNodeCmdType

type ReadyToRunNodeCmdType string
const (
	NodeNone ReadyToRunNodeCmdType = "none"
	NodeGo   ReadyToRunNodeCmdType = "go"
	NodeWait ReadyToRunNodeCmdType = "wait"
	NodeNogo ReadyToRunNodeCmdType = "nogo"
)

type ScriptDef

type ScriptDef struct {
	ScriptNodes           map[string]*ScriptNodeDef  `json:"nodes"`
	RawDependencyPolicies map[string]json.RawMessage `json:"dependency_policies"`
	TableCreatorNodeMap   map[string](*ScriptNodeDef)
	IndexNodeMap          map[string](*ScriptNodeDef)
}

func (*ScriptDef) Deserialize

func (scriptDef *ScriptDef) Deserialize(jsonBytesScript []byte, customProcessorDefFactory CustomProcessorDefFactory, customProcessorsSettings map[string]json.RawMessage, caPath string, privateKeys map[string]string) error

func (*ScriptDef) GetAffectedNodes

func (scriptDef *ScriptDef) GetAffectedNodes(startNodeNames []string) []string

type ScriptInitProblemType added in v1.1.3

// ScriptInitProblemType classifies the outcome of script initialization; it is
// the middle return value of NewScriptFromFiles and NewScriptFromFileBytes.
type ScriptInitProblemType int

// Script initialization outcomes, listed in numeric order.
const (
	ScriptInitNoProblem           ScriptInitProblemType = 0
	ScriptInitUrlProblem          ScriptInitProblemType = 1
	ScriptInitContentProblem      ScriptInitProblemType = 2
	ScriptInitConnectivityProblem ScriptInitProblemType = 3
)

type ScriptNodeDef

type ScriptNodeDef struct {
	Name                string          // Get it from the key
	Type                NodeType        `json:"type"`
	Desc                string          `json:"desc"`
	StartPolicy         NodeStartPolicy `json:"start_policy"`
	RerunPolicy         NodeRerunPolicy `json:"rerun_policy"`
	CustomProcessorType string          `json:"custom_proc_type"`
	HandlerExeType      string          `json:"handler_exe_type"`

	RawReader   json.RawMessage `json:"r"` // This depends on tfm type
	TableReader TableReaderDef
	FileReader  FileReaderDef

	Lookup LookupDef `json:"l"`

	RawProcessorDef json.RawMessage    `json:"p"` // This depends on tfm type
	CustomProcessor CustomProcessorDef // Also should implement CustomProcessorRunner

	RawWriter            json.RawMessage `json:"w"` // This depends on tfm type
	DependencyPolicyName string          `json:"dependency_policy"`
	TableCreator         TableCreatorDef
	TableUpdater         TableUpdaterDef
	FileCreator          FileCreatorDef
	DepPolDef            *DependencyPolicyDef
}

func (*ScriptNodeDef) Deserialize

func (node *ScriptNodeDef) Deserialize(customProcessorDefFactory CustomProcessorDefFactory, customProcessorsSettings map[string]json.RawMessage, caPath string, privateKeys map[string]string) error

func (*ScriptNodeDef) GetTargetName

func (node *ScriptNodeDef) GetTargetName() string

func (*ScriptNodeDef) GetTokenIntervalsByNumberOfBatches

func (node *ScriptNodeDef) GetTokenIntervalsByNumberOfBatches() ([][]int64, error)

func (*ScriptNodeDef) GetUniqueIndexesFieldRefs

func (node *ScriptNodeDef) GetUniqueIndexesFieldRefs() *FieldRefs

func (*ScriptNodeDef) HasCustomProcessor

func (node *ScriptNodeDef) HasCustomProcessor() bool

func (*ScriptNodeDef) HasFileCreator

func (node *ScriptNodeDef) HasFileCreator() bool

func (*ScriptNodeDef) HasFileReader

func (node *ScriptNodeDef) HasFileReader() bool

func (*ScriptNodeDef) HasLookup

func (node *ScriptNodeDef) HasLookup() bool

func (*ScriptNodeDef) HasTableCreator

func (node *ScriptNodeDef) HasTableCreator() bool

func (*ScriptNodeDef) HasTableReader

func (node *ScriptNodeDef) HasTableReader() bool

type TableCreatorDef

type TableCreatorDef struct {
	Name                          string `json:"name"`
	RawHaving                     string `json:"having"`
	Having                        ast.Expr
	UsedInHavingFields            FieldRefs
	UsedInTargetExpressionsFields FieldRefs
	Fields                        map[string]*WriteTableFieldDef `json:"fields"`
	RawIndexes                    map[string]string              `json:"indexes"`
	Indexes                       IdxDefMap
}

func (*TableCreatorDef) CalculateTableRecordFromSrcVars

func (tcDef *TableCreatorDef) CalculateTableRecordFromSrcVars(canUseAggFunc bool, srcVars eval.VarValuesMap) (map[string]any, error)

func (*TableCreatorDef) CheckTableRecordHavingCondition

func (tcDef *TableCreatorDef) CheckTableRecordHavingCondition(tableRecord map[string]any) (bool, error)

func (*TableCreatorDef) Deserialize

func (tcDef *TableCreatorDef) Deserialize(rawWriter json.RawMessage) error

func (*TableCreatorDef) GetFieldDefaultReadyForDb

func (tcDef *TableCreatorDef) GetFieldDefaultReadyForDb(fieldName string) (any, error)

func (*TableCreatorDef) GetFieldRefs

func (tcDef *TableCreatorDef) GetFieldRefs() *FieldRefs

func (*TableCreatorDef) GetFieldRefsWithAlias

func (tcDef *TableCreatorDef) GetFieldRefsWithAlias(useTableAlias string) *FieldRefs

type TableFieldType

type TableFieldType string
const (
	FieldTypeString   TableFieldType = "string"
	FieldTypeInt      TableFieldType = "int"      // sign+18digit string
	FieldTypeFloat    TableFieldType = "float"    // sign+64digit string, 32 digits after point
	FieldTypeBool     TableFieldType = "bool"     // F or T
	FieldTypeDecimal2 TableFieldType = "decimal2" // sign + 18digit+point+2
	FieldTypeDateTime TableFieldType = "datetime" // int unix epoch milliseconds
	FieldTypeUnknown  TableFieldType = "unknown"
)

type TableReaderDef

type TableReaderDef struct {
	TableName            string `json:"table"`
	ExpectedBatchesTotal int    `json:"expected_batches_total"`
	RowsetSize           int    `json:"rowset_size"` // DefaultRowsetSize = 1000
	TableCreator         *TableCreatorDef
}

type TableUpdaterDef

type TableUpdaterDef struct {
	Fields map[string]*WriteTableFieldDef `json:"fields"`
}

type TopDef

type TopDef struct {
	Limit       int    `json:"limit"`
	RawOrder    string `json:"order"`
	OrderIdxDef IdxDef // Not an index really, we just re-use IdxDef infrastructure
}

type WriteCsvColumnSettings added in v1.1.9

type WriteCsvColumnSettings struct {
	Format string `json:"format"`
	Header string `json:"header"`
}

type WriteFileColumnDef

type WriteFileColumnDef struct {
	RawExpression    string                     `json:"expression"`
	Name             string                     `json:"name"` // To be used in Having
	Type             TableFieldType             `json:"type"` // To be checked when checking expressions and to be used in Having
	Csv              WriteCsvColumnSettings     `json:"csv,omitempty"`
	Parquet          WriteParquetColumnSettings `json:"parquet,omitempty"`
	ParsedExpression ast.Expr
	UsedFields       FieldRefs
}

type WriteParquetColumnSettings added in v1.1.9

type WriteParquetColumnSettings struct {
	ColumnName string `json:"column_name"`
}

type WriteTableFieldDef

type WriteTableFieldDef struct {
	RawExpression    string         `json:"expression"`
	Type             TableFieldType `json:"type"`
	DefaultValue     string         `json:"default_value"` // Optional. If omitted, default zero value is used
	ParsedExpression ast.Expr
	UsedFields       FieldRefs
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL