Documentation ¶
Overview ¶
Package proc contains the types and functions that process script nodes and write results to tables or files.
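The Run* batch processors below share a common calling pattern. A minimal sketch, assuming the capillaries import paths shown and a caller that has already built the EnvConfig, CapiLogger and MessageProcessingContext; processFileBatch is a hypothetical wrapper for illustration only:

package example

import (
	"fmt"

	"github.com/capillariesio/capillaries/pkg/ctx"
	"github.com/capillariesio/capillaries/pkg/env"
	"github.com/capillariesio/capillaries/pkg/l"
	"github.com/capillariesio/capillaries/pkg/proc"
)

// processFileBatch is a hypothetical wrapper: the caller supplies an
// already-built EnvConfig, CapiLogger and MessageProcessingContext and
// receives BatchStats for the processed batch.
func processFileBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, srcFileIdx int) error {
	stats, err := proc.RunReadFileForBatch(envConfig, logger, pCtx, srcFileIdx)
	if err != nil {
		return fmt.Errorf("RunReadFileForBatch: %w", err)
	}
	fmt.Println(stats.ToString())
	return nil
}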
Index ¶
- Constants
- Variables
- func CreateDataTableCql(keyspace string, runId int16, tableCreator *sc.TableCreatorDef) string
- func CreateIdxTableCql(keyspace string, runId int16, idxName string, idxDef *sc.IdxDef) string
- func DeleteDataAndUniqueIndexesByBatchIdx(logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext) error
- type BatchStats
- func RunCreateDistinctTableForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, ...) (BatchStats, error)
- func RunCreateFile(envConfig *env.EnvConfig, logger *l.CapiLogger, ...) (BatchStats, error)
- func RunCreateTableForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, ...) (BatchStats, error)
- func RunCreateTableForCustomProcessorForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, ...) (BatchStats, error)
- func RunCreateTableRelForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, ...) (BatchStats, error)
- func RunReadFileForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, ...) (BatchStats, error)
- type CustomProcessorRunner
- type DataIdxSeqModeType
- type FileInserter
- type FileRecordHeap
- type FileRecordHeapItem
- type PreparedQuery
- type Rowset
- func (rs *Rowset) AppendFieldRefs(fieldRefs *sc.FieldRefs)
- func (rs *Rowset) AppendFieldRefsWithFilter(fieldRefs *sc.FieldRefs, tableFilter string)
- func (rs *Rowset) ArrangeByRowid(rowids []int64) error
- func (rs *Rowset) ExportToVars(rowIdx int, vars *eval.VarValuesMap) error
- func (rs *Rowset) ExportToVarsWithAlias(rowIdx int, vars *eval.VarValuesMap, useTableAlias string) error
- func (rs *Rowset) GetFieldNames() *[]string
- func (rs *Rowset) GetTableRecord(rowIdx int) (map[string]any, error)
- func (rs *Rowset) InitRows(capacity int) error
- func (rs *Rowset) ToString() string
- type TableInserter
- type TableRecord
- type TableRecordBatch
- type TableRecordPtr
- type WriteChannelItem
- type WriteFileBatch
Constants ¶
const DefaultFileInserterBatchCapacity int = 1000
const DefaultInserterBatchSize int = 5000
const HarvestForDeleteRowsetSize = 1000 // Do not let users tweak it, maybe too sensitive
Variables ¶
var ErrDuplicateKey = errors.New("duplicate key")
var ErrDuplicateRowid = errors.New("duplicate rowid")
Functions ¶
func CreateDataTableCql ¶
func CreateDataTableCql(keyspace string, runId int16, tableCreator *sc.TableCreatorDef) string
func CreateIdxTableCql ¶
func CreateIdxTableCql(keyspace string, runId int16, idxName string, idxDef *sc.IdxDef) string
func DeleteDataAndUniqueIndexesByBatchIdx ¶
func DeleteDataAndUniqueIndexesByBatchIdx(logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext) error
To test it, see the comments at the end of RunCreateTableRelForBatch.
Types ¶
type BatchStats ¶ added in v1.1.1
func RunCreateDistinctTableForBatch ¶ added in v1.1.18
func RunCreateDistinctTableForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, readerNodeRunId int16, startLeftToken int64, endLeftToken int64) (BatchStats, error)
func RunCreateFile ¶
func RunCreateFile(envConfig *env.EnvConfig, logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, readerNodeRunId int16, startToken int64, endToken int64) (BatchStats, error)
func RunCreateTableForBatch ¶
func RunCreateTableForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, readerNodeRunId int16, startLeftToken int64, endLeftToken int64) (BatchStats, error)
func RunCreateTableForCustomProcessorForBatch ¶
func RunCreateTableForCustomProcessorForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, readerNodeRunId int16, startLeftToken int64, endLeftToken int64) (BatchStats, error)
func RunCreateTableRelForBatch ¶
func RunCreateTableRelForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, readerNodeRunId int16, lookupNodeRunId int16, startLeftToken int64, endLeftToken int64) (BatchStats, error)
func RunReadFileForBatch ¶
func RunReadFileForBatch(envConfig *env.EnvConfig, logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, srcFileIdx int) (BatchStats, error)
func (*BatchStats) ToString ¶ added in v1.1.1
func (bs *BatchStats) ToString() string
type CustomProcessorRunner ¶
type CustomProcessorRunner interface {
Run(logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, rsIn *Rowset, flushVarsArray func(varsArray []*eval.VarValuesMap, varsArrayCount int) error) error
}
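A minimal sketch of implementing this interface, assuming the capillaries import paths shown; PassThroughRunner is a hypothetical processor that exports each incoming row to a vars map unchanged and flushes the whole array once:

package example

import (
	"github.com/capillariesio/capillaries/pkg/ctx"
	"github.com/capillariesio/capillaries/pkg/eval"
	"github.com/capillariesio/capillaries/pkg/l"
	"github.com/capillariesio/capillaries/pkg/proc"
)

// PassThroughRunner is a hypothetical no-op custom processor.
type PassThroughRunner struct{}

// Compile-time check that the interface is satisfied.
var _ proc.CustomProcessorRunner = (*PassThroughRunner)(nil)

// Run exports every row of rsIn into a vars map and hands the whole
// array to flushVarsArray in one call.
func (r *PassThroughRunner) Run(logger *l.CapiLogger, pCtx *ctx.MessageProcessingContext, rsIn *proc.Rowset, flushVarsArray func(varsArray []*eval.VarValuesMap, varsArrayCount int) error) error {
	varsArray := make([]*eval.VarValuesMap, rsIn.RowCount)
	for rowIdx := 0; rowIdx < rsIn.RowCount; rowIdx++ {
		vars := eval.VarValuesMap{}
		if err := rsIn.ExportToVars(rowIdx, &vars); err != nil {
			return err
		}
		varsArray[rowIdx] = &vars
	}
	return flushVarsArray(varsArray, rsIn.RowCount)
}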
type DataIdxSeqModeType ¶ added in v1.1.18
type DataIdxSeqModeType int
const (
	DataIdxSeqModeDataFirst DataIdxSeqModeType = iota
	DataIdxSeqModeDistinctIdxFirst
)
type FileInserter ¶
type FileInserter struct {
	PCtx                  *ctx.MessageProcessingContext
	FileCreator           *sc.FileCreatorDef
	CurrentBatch          *WriteFileBatch
	BatchCapacity         int
	BatchesIn             chan *WriteFileBatch
	RecordWrittenStatuses chan error
	BatchesSent           int
	FinalFileUrl          string
	TempFilePath          string
}
type FileRecordHeap ¶
type FileRecordHeap []*FileRecordHeapItem
func (FileRecordHeap) Len ¶
func (h FileRecordHeap) Len() int
func (FileRecordHeap) Less ¶
func (h FileRecordHeap) Less(i, j int) bool
func (*FileRecordHeap) Pop ¶
func (h *FileRecordHeap) Pop() any
func (*FileRecordHeap) Push ¶
func (h *FileRecordHeap) Push(x any)
func (FileRecordHeap) Swap ¶
func (h FileRecordHeap) Swap(i, j int)
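The Len/Less/Swap/Push/Pop methods above satisfy container/heap.Interface. A minimal sketch, assuming the capillaries import path; popAll is a hypothetical helper that drains an already-initialized heap in its priority order:

package example

import (
	"container/heap"

	"github.com/capillariesio/capillaries/pkg/proc"
)

// Compile-time check: *FileRecordHeap is a valid heap.Interface implementation.
var _ heap.Interface = &proc.FileRecordHeap{}

// popAll removes items one by one, returning them in heap order.
func popAll(h *proc.FileRecordHeap) []*proc.FileRecordHeapItem {
	items := make([]*proc.FileRecordHeapItem, 0, h.Len())
	for h.Len() > 0 {
		items = append(items, heap.Pop(h).(*proc.FileRecordHeapItem))
	}
	return items
}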
type FileRecordHeapItem ¶
type PreparedQuery ¶ added in v1.1.15
type PreparedQuery struct {
	Qb    *cql.QueryBuilder
	Query string
}
type Rowset ¶
type Rowset struct {
	Fields                []sc.FieldRef
	FieldsByFullAliasName map[string]int
	FieldsByFieldName     map[string]int
	Rows                  []*[]any
	RowCount              int
}
func NewRowsetFromFieldRefs ¶
func (*Rowset) AppendFieldRefs ¶
func (*Rowset) AppendFieldRefsWithFilter ¶
func (*Rowset) ArrangeByRowid ¶
func (*Rowset) ExportToVars ¶
func (rs *Rowset) ExportToVars(rowIdx int, vars *eval.VarValuesMap) error
func (*Rowset) ExportToVarsWithAlias ¶
func (*Rowset) GetFieldNames ¶
func (*Rowset) GetTableRecord ¶
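A minimal sketch of reading a Rowset through the members listed above (the RowCount field, GetFieldNames and GetTableRecord), assuming the capillaries import path; dumpRowset is a hypothetical helper:

package example

import (
	"fmt"

	"github.com/capillariesio/capillaries/pkg/proc"
)

// dumpRowset prints the field names once, then each row as a map keyed by field name.
func dumpRowset(rs *proc.Rowset) error {
	fmt.Println("fields:", *rs.GetFieldNames())
	for rowIdx := 0; rowIdx < rs.RowCount; rowIdx++ {
		rec, err := rs.GetTableRecord(rowIdx)
		if err != nil {
			return err
		}
		fmt.Println(rec)
	}
	return nil
}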
type TableInserter ¶
type TableInserter struct {
	PCtx                       *ctx.MessageProcessingContext
	TableCreator               *sc.TableCreatorDef
	RecordsIn                  chan WriteChannelItem // Channel to pass records from the main function like RunCreateTableForBatch, using add(), to TableInserter
	RecordWrittenStatuses      chan error
	RecordWrittenStatusesMutex sync.Mutex
	MachineHash                int64
	NumWorkers                 int
	MinInserterRate            int
	WorkerWaitGroup            sync.WaitGroup
	RecordsSent                int // Records sent to RecordsIn
	RecordsProcessed           int // Number of items received in RecordWrittenStatuses
	DoesNotExistPause          float32
	OperationTimedOutPause     float32
	DataIdxSeqMode             DataIdxSeqModeType
}
type TableRecord ¶
type TableRecordBatch ¶
type TableRecordBatch []TableRecordPtr
type TableRecordPtr ¶
type WriteChannelItem ¶
type WriteChannelItem struct {
	TableRecord *TableRecord
	IndexKeyMap map[string]string
}
type WriteFileBatch ¶