sc

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 19, 2022 License: MIT Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultStringComponentLen int64 = 64
	MinStringComponentLen     int64 = 16
	MaxStringComponentLen     int64 = 1024
)
View Source
const (
	ReservedParamBatchIdx string = "{batch_idx|string}"
	ReservedParamRunId    string = "{run_id|string}"
)
View Source
const (
	HandlerExeTypeGeneric  string = "capi_daemon"
	HandlerExeTypeToolbelt string = "capi_toolbelt"
	HandlerExeTypeWebapi   string = "capi_webapi"
)
View Source
const AllowedIdxNameRegex = "^idx[A-Za-z0-9_]+"
View Source
const AllowedTableNameRegex = "[A-Za-z0-9_]+"
View Source
const BeginningOfTimeMicro = int64(-62135596800000000) // time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC).UnixMicro()
View Source
const CassandraDatetimeFormat string = "2006-01-02T15:04:05.000-07:00"

Cassandra timestamps are milliseconds. No microsecond support. On writes: - allows (but not requires) ":" in the timezone - allows (but not requires) "T" in as date/time separator

View Source
const CreatorAlias string = "w"
View Source
const CustomProcessorAlias string = "p"
View Source
const DefaultBool bool = false
View Source
const DefaultFloat float64 = float64(0.0)
View Source
const DefaultInt int64 = int64(0)
View Source
const DefaultRowsetSize int = 1000
View Source
const DefaultString string = ""
View Source
const (
	FieldNameUnknown = "unknown_field_name"
)
View Source
const LookupAlias string = "l"
View Source
const MaxAcceptedBatchesByTableReader int = 1000000
View Source
const MaxFileCreatorTopLimit int = 500000
View Source
const MaxRowsetSize int = 100000
View Source
const ProhibitedTableNameRegex = "^idx|^wf|^system"
View Source
const ReaderAlias string = "r"
View Source
const UriSchemeFile string = "file"
View Source
const UriSchemeHttp string = "http"
View Source
const UriSchemeHttps string = "https"

Variables

This section is empty.

Functions

func BuildKey

func BuildKey(fieldMap map[string]interface{}, idxDef *IdxDef) (string, error)

func CalculateFieldValue

func CalculateFieldValue(fieldName string, fieldDef *WriteTableFieldDef, srcVars eval.VarValuesMap, canUseAggFunc bool) (interface{}, error)

func CheckValueType

func CheckValueType(val interface{}, fieldType TableFieldType) error

func DefaultCassandraDecimal2

func DefaultCassandraDecimal2() *inf.Dec

func DefaultDateTime

func DefaultDateTime() time.Time

func DefaultDecimal2

func DefaultDecimal2() decimal.Decimal

func GetDefaultFieldTypeValue

func GetDefaultFieldTypeValue(fieldType TableFieldType) interface{}

func GetFileBytes

func GetFileBytes(uri string, certPath string) ([]byte, error)

func GetHttpReadCloser added in v1.0.1

func GetHttpReadCloser(uri string, scheme string, certDir string) (io.ReadCloser, error)

func IsValidFieldType

func IsValidFieldType(fieldType TableFieldType) bool

func ParseRawGolangExpressionStringAndHarvestFieldRefs

func ParseRawGolangExpressionStringAndHarvestFieldRefs(strExp string, usedFields *FieldRefs) (ast.Expr, error)

func ValidateNodeType

func ValidateNodeType(nodeType NodeType) error

func ValidateRerunPolicy

func ValidateRerunPolicy(rerunPolicy NodeRerunPolicy) error

func ValidateStartPolicy

func ValidateStartPolicy(startPolicy NodeStartPolicy) error

Types

type AggFinderVisitor

type AggFinderVisitor struct {
	Error error
}

func (*AggFinderVisitor) Visit

func (v *AggFinderVisitor) Visit(node ast.Node) ast.Visitor

type CustomProcessorDef

type CustomProcessorDef interface {
	Deserialize(raw json.RawMessage, customProcSettings json.RawMessage, caPath string) error
	GetFieldRefs() *FieldRefs
	GetUsedInTargetExpressionsFields() *FieldRefs
}

type CustomProcessorDefFactory

type CustomProcessorDefFactory interface {
	Create(processorType string) (CustomProcessorDef, bool)
}

type DependencyPolicyDef

type DependencyPolicyDef struct {
	EventPriorityOrderString string           `json:"event_priority_order"`
	IsDefault                bool             `json:"is_default"`
	Rules                    []DependencyRule `json:"rules"`
	OrderIdxDef              IdxDef
}

func (*DependencyPolicyDef) Deserialize

func (polDef *DependencyPolicyDef) Deserialize(rawPol json.RawMessage) error

type DependencyRule

type DependencyRule struct {
	Cmd              ReadyToRunNodeCmdType `json:"cmd"`
	RawExpression    string                `json:"expression"`
	ParsedExpression ast.Expr
}

type FieldRef

type FieldRef struct {
	TableName string
	FieldName string
	FieldType TableFieldType
}

func IdxKeyFieldRef

func IdxKeyFieldRef() FieldRef
func RunBatchKeyTokenFieldRef() FieldRef {
	return FieldRef{
		TableName: "db_system",
		FieldName: "token(run_id,batch_idx,key)",
		FieldType: FieldTypeInt}
}

func KeyTokenFieldRef

func KeyTokenFieldRef() FieldRef

func RowidFieldRef

func RowidFieldRef(tableName string) FieldRef

func RowidTokenFieldRef

func RowidTokenFieldRef() FieldRef

func (*FieldRef) GetAliasHash

func (fr *FieldRef) GetAliasHash() string

type FieldRefs

type FieldRefs []FieldRef

func GetFieldRefsUsedInAllTargetExpressions

func GetFieldRefsUsedInAllTargetExpressions(fieldDefMap map[string]*WriteTableFieldDef) FieldRefs

func JoinFieldRefs

func JoinFieldRefs(srcFieldRefs ...*FieldRefs) *FieldRefs

func NewFieldRefsFromNodeEvent

func NewFieldRefsFromNodeEvent() *FieldRefs

func (*FieldRefs) Append

func (fieldRefs *FieldRefs) Append(otherFieldRefs FieldRefs)

func (*FieldRefs) AppendWithFilter

func (fieldRefs *FieldRefs) AppendWithFilter(otherFieldRefs FieldRefs, tableFilter string)

func (*FieldRefs) FindByFieldName

func (fieldRefs *FieldRefs) FindByFieldName(fieldName string) (*FieldRef, bool)

func (*FieldRefs) HasFieldsWithTableAlias

func (fieldRefs *FieldRefs) HasFieldsWithTableAlias(tableAlias string) bool

type FileColumnIndexingMode

type FileColumnIndexingMode string
const (
	FileColumnIndexingName    FileColumnIndexingMode = "name"
	FileColumnIndexingIdx     FileColumnIndexingMode = "idx"
	FileColumnIndexingUnknown FileColumnIndexingMode = "unknown"
)

type FileCreatorDef

type FileCreatorDef struct {
	RawHaving                     string `json:"having"`
	Having                        ast.Expr
	UsedInHavingFields            FieldRefs
	UsedInTargetExpressionsFields FieldRefs
	Columns                       []WriteFileColumnDef `json:"columns"`
	UrlTemplate                   string               `json:"url_template"`
	Top                           TopDef               `json:"top"`
	Separator                     string               `json:"separator"`
}

func (*FileCreatorDef) CalculateFileRecordFromSrcVars

func (creatorDef *FileCreatorDef) CalculateFileRecordFromSrcVars(srcVars eval.VarValuesMap) ([]interface{}, error)

func (*FileCreatorDef) CheckFileRecordHavingCondition

func (creatorDef *FileCreatorDef) CheckFileRecordHavingCondition(fileRecord []interface{}) (bool, error)

func (*FileCreatorDef) Deserialize

func (creatorDef *FileCreatorDef) Deserialize(rawWriter json.RawMessage) error

func (*FileCreatorDef) GetFieldRefsUsedInAllTargetFileExpressions

func (creatorDef *FileCreatorDef) GetFieldRefsUsedInAllTargetFileExpressions() FieldRefs

func (*FileCreatorDef) HasTop

func (creatorDef *FileCreatorDef) HasTop() bool

type FileReaderColumnDef

type FileReaderColumnDef struct {
	SrcColIdx    int            `json:"col_idx"`
	SrcColHeader string         `json:"col_hdr"`
	SrcColFormat string         `json:"col_format"`        // Optional for all except datetime
	DefaultValue string         `json:"col_default_value"` // Optional. If omitted, zero value is used
	Type         TableFieldType `json:"col_type"`
}

type FileReaderDef

type FileReaderDef struct {
	SrcFileUrls             []string                        `json:"urls"`
	SrcFileHdrLineIdx       int                             `json:"hdr_line_idx"`
	SrcFileFirstDataLineIdx int                             `json:"first_data_line_idx"`
	Columns                 map[string]*FileReaderColumnDef `json:"columns"` // Keys are names used in table writer
	Separator               string                          `json:"separator"`
	ColumnIndexingMode      FileColumnIndexingMode
}

func (*FileReaderDef) Deserialize

func (frDef *FileReaderDef) Deserialize(rawReader json.RawMessage) error

func (*FileReaderDef) ReadLineToValuesMap

func (frDef *FileReaderDef) ReadLineToValuesMap(line *[]string, colVars eval.VarValuesMap) error

func (*FileReaderDef) ResolveColumnIndexesFromNames

func (frDef *FileReaderDef) ResolveColumnIndexesFromNames(srcHdrLine []string) error

type IdxCaseSensitivity

type IdxCaseSensitivity string
const (
	IdxCaseSensitive          IdxCaseSensitivity = "case_sensitive"
	IdxIgnoreCase             IdxCaseSensitivity = "ignore_case"
	IdxCaseSensitivityUnknown IdxCaseSensitivity = "case_sensitivity_unknown"
)

type IdxComponentDef

type IdxComponentDef struct {
	FieldName       string
	CaseSensitivity IdxCaseSensitivity
	SortOrder       IdxSortOrder
	StringLen       int64          // For string fields only, default 64
	FieldType       TableFieldType // Populated from tgt_table def
}

type IdxDef

type IdxDef struct {
	Uniqueness IdxUniqueness
	Components []IdxComponentDef
}

type IdxDefMap

type IdxDefMap map[string]*IdxDef

type IdxSortOrder

type IdxSortOrder string
const (
	IdxSortAsc     IdxSortOrder = "asc"
	IdxSortDesc    IdxSortOrder = "desc"
	IdxSortUnknown IdxSortOrder = "unknown"
)

type IdxUniqueness

type IdxUniqueness string
const (
	IdxUnique            IdxUniqueness = "unique"
	IdxNonUnique         IdxUniqueness = "non_unique"
	IdxUniquenessUnknown IdxUniqueness = "unknown"
)

type IndexRef

type IndexRef struct {
	TableName string
	IdxName   string
}

type LookupDef

type LookupDef struct {
	IndexName                string         `json:"index_name"`
	RawJoinOn                string         `json:"join_on"`
	IsGroup                  bool           `json:"group"`
	RawFilter                string         `json:"filter"`
	LookupJoin               LookupJoinType `json:"join_type"`
	IdxReadBatchSize         int            `json:"idx_read_batch_size"`
	RightLookupReadBatchSize int            `json:"right_lookup_read_batch_size"`

	LeftTableFields    FieldRefs        // In the same order as lookup idx - important
	TableCreator       *TableCreatorDef // Populated when walking through al nodes
	UsedInFilterFields FieldRefs
	Filter             ast.Expr
}

func (*LookupDef) CheckFilterCondition

func (lkpDef *LookupDef) CheckFilterCondition(varsFromLookup eval.VarValuesMap) (bool, error)

func (*LookupDef) CheckPagedBatchSize

func (lkpDef *LookupDef) CheckPagedBatchSize() error

func (*LookupDef) ParseFilter

func (lkpDef *LookupDef) ParseFilter() error

func (*LookupDef) UsesFilter

func (lkpDef *LookupDef) UsesFilter() bool

func (*LookupDef) ValidateJoinType

func (lkpDef *LookupDef) ValidateJoinType() error

type LookupJoinType

type LookupJoinType string
const (
	LookupJoinInner LookupJoinType = "inner"
	LookupJoinLeft  LookupJoinType = "left"
)

type NodeRerunPolicy

type NodeRerunPolicy string
const (
	NodeRerun NodeRerunPolicy = "rerun" // Default
	NodeFail  NodeRerunPolicy = "fail"
)

type NodeStartPolicy

type NodeStartPolicy string
const (
	NodeStartManual NodeStartPolicy = "manual"
	NodeStartAuto   NodeStartPolicy = "auto" // Default
)

type NodeType

type NodeType string
const (
	NodeTypeNone                NodeType = "none"
	NodeTypeFileTable           NodeType = "file_table"
	NodeTypeTableTable          NodeType = "table_table"
	NodeTypeTableLookupTable    NodeType = "table_lookup_table"
	NodeTypeTableFile           NodeType = "table_file"
	NodeTypeTableCustomTfmTable NodeType = "table_custom_tfm_table"
)

type ReadyToRunNodeCmdType

type ReadyToRunNodeCmdType string
const (
	NodeNone ReadyToRunNodeCmdType = "none"
	NodeGo   ReadyToRunNodeCmdType = "go"
	NodeWait ReadyToRunNodeCmdType = "wait"
	NodeNogo ReadyToRunNodeCmdType = "nogo"
)

type ScriptDef

type ScriptDef struct {
	ScriptNodes           map[string]*ScriptNodeDef  `json:"nodes"`
	RawDependencyPolicies map[string]json.RawMessage `json:"dependency_policies"`
	TableCreatorNodeMap   map[string](*ScriptNodeDef)
	IndexNodeMap          map[string](*ScriptNodeDef)
}

func NewScriptFromFiles

func NewScriptFromFiles(caPath string, scriptUri string, scriptParamsUri string, customProcessorDefFactoryInstance CustomProcessorDefFactory, customProcessorsSettings map[string]json.RawMessage) (*ScriptDef, error)

func (*ScriptDef) Deserialize

func (scriptDef *ScriptDef) Deserialize(jsonBytesScript []byte, customProcessorDefFactory CustomProcessorDefFactory, customProcessorsSettings map[string]json.RawMessage, caPath string) error

func (*ScriptDef) GetAffectedNodes

func (scriptDef *ScriptDef) GetAffectedNodes(startNodeNames []string) []string

type ScriptNodeDef

type ScriptNodeDef struct {
	Name                string          // Get it from the key
	Type                NodeType        `json:"type"`
	Desc                string          `json:"desc"`
	StartPolicy         NodeStartPolicy `json:"start_policy"`
	RerunPolicy         NodeRerunPolicy `json:"rerun_policy"`
	CustomProcessorType string          `json:"custom_proc_type"`
	HandlerExeType      string          `json:"handler_exe_type"`

	RawReader   json.RawMessage `json:"r"` // This depends on tfm type
	TableReader TableReaderDef
	FileReader  FileReaderDef

	Lookup LookupDef `json:"l"`

	RawProcessorDef json.RawMessage    `json:"p"` // This depends on tfm type
	CustomProcessor CustomProcessorDef // Also should implement CustomProcessorRunner

	RawWriter            json.RawMessage `json:"w"` // This depends on tfm type
	DependencyPolicyName string          `json:"dependency_policy"`
	TableCreator         TableCreatorDef
	TableUpdater         TableUpdaterDef
	FileCreator          FileCreatorDef
	DepPolDef            *DependencyPolicyDef
}

func (*ScriptNodeDef) Deserialize

func (node *ScriptNodeDef) Deserialize(customProcessorDefFactory CustomProcessorDefFactory, customProcessorsSettings map[string]json.RawMessage, caPath string) error

func (*ScriptNodeDef) GetTargetName

func (node *ScriptNodeDef) GetTargetName() string

func (*ScriptNodeDef) GetTokenIntervalsByNumberOfBatches

func (node *ScriptNodeDef) GetTokenIntervalsByNumberOfBatches() ([][]int64, error)

func (*ScriptNodeDef) GetUniqueIndexesFieldRefs

func (node *ScriptNodeDef) GetUniqueIndexesFieldRefs() *FieldRefs

func (*ScriptNodeDef) HasCustomProcessor

func (node *ScriptNodeDef) HasCustomProcessor() bool

func (*ScriptNodeDef) HasFileCreator

func (node *ScriptNodeDef) HasFileCreator() bool

func (*ScriptNodeDef) HasFileReader

func (node *ScriptNodeDef) HasFileReader() bool

func (*ScriptNodeDef) HasLookup

func (node *ScriptNodeDef) HasLookup() bool

func (*ScriptNodeDef) HasTableCreator

func (node *ScriptNodeDef) HasTableCreator() bool

func (*ScriptNodeDef) HasTableReader

func (node *ScriptNodeDef) HasTableReader() bool

type TableCreatorDef

type TableCreatorDef struct {
	Name                          string `json:"name"`
	RawHaving                     string `json:"having"`
	Having                        ast.Expr
	UsedInHavingFields            FieldRefs
	UsedInTargetExpressionsFields FieldRefs
	Fields                        map[string]*WriteTableFieldDef `json:"fields"`
	RawIndexes                    map[string]string              `json:"indexes"`
	Indexes                       IdxDefMap
}

func (*TableCreatorDef) CalculateTableRecordFromSrcVars

func (creatorDef *TableCreatorDef) CalculateTableRecordFromSrcVars(canUseAggFunc bool, srcVars eval.VarValuesMap) (map[string]interface{}, error)

func (*TableCreatorDef) CheckTableRecordHavingCondition

func (creatorDef *TableCreatorDef) CheckTableRecordHavingCondition(tableRecord map[string]interface{}) (bool, error)

func (*TableCreatorDef) Deserialize

func (tcDef *TableCreatorDef) Deserialize(rawWriter json.RawMessage) error

func (*TableCreatorDef) GetFieldDefaultReadyForDb

func (creatorDef *TableCreatorDef) GetFieldDefaultReadyForDb(fieldName string) (interface{}, error)

func (*TableCreatorDef) GetFieldRefs

func (tcDef *TableCreatorDef) GetFieldRefs() *FieldRefs

func (*TableCreatorDef) GetFieldRefsWithAlias

func (tcDef *TableCreatorDef) GetFieldRefsWithAlias(useTableAlias string) *FieldRefs

type TableFieldType

type TableFieldType string
const (
	FieldTypeString   TableFieldType = "string"
	FieldTypeInt      TableFieldType = "int"      // sign+18digit string
	FieldTypeFloat    TableFieldType = "float"    // sign+64digit string, 32 digits after point
	FieldTypeBool     TableFieldType = "bool"     // F or T
	FieldTypeDecimal2 TableFieldType = "decimal2" // sign + 18digit+point+2
	FieldTypeDateTime TableFieldType = "datetime" // int unix epoch milliseconds
	FieldTypeUnknown  TableFieldType = "unknown"
)

type TableReaderDef

type TableReaderDef struct {
	TableName            string `json:"table"`
	ExpectedBatchesTotal int    `json:"expected_batches_total"`
	RowsetSize           int    `json:"rowset_size"` // DefaultRowsetSize = 1000
	TableCreator         *TableCreatorDef
}

type TableUpdaterDef

type TableUpdaterDef struct {
	Fields map[string]*WriteTableFieldDef `json:"fields"`
}

type TopDef

type TopDef struct {
	Limit       int    `json:"limit"`
	RawOrder    string `json:"order"`
	OrderIdxDef IdxDef // Not an index really, we just re-use IdxDef infrastructure
}

type WriteFileColumnDef

type WriteFileColumnDef struct {
	RawExpression    string         `json:"expression"`
	Format           string         `json:"format"`
	Header           string         `json:"header"`
	Name             string         `json:"name"` // To be used in Having
	Type             TableFieldType `json:"type"` // To be checked when checking expressions and to be used in Having
	ParsedExpression ast.Expr
	UsedFields       FieldRefs
}

type WriteTableFieldDef

type WriteTableFieldDef struct {
	RawExpression    string         `json:"expression"`
	Type             TableFieldType `json:"type"`
	DefaultValue     string         `json:"default_value"` // Optional. If omitted, default zero value is used
	ParsedExpression ast.Expr
	UsedFields       FieldRefs
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL