proc

package
Version: v1.1.0
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 19, 2022 | License: MIT | Imports: 19 | Imported by: 0

Documentation

Index

Constants

View Source
// DefaultFileInserterBatchCapacity is a typed int constant equal to 1000.
// NOTE(review): presumably the default value for FileInserter.BatchCapacity — confirm against the code that constructs FileInserter.
const DefaultFileInserterBatchCapacity int = 1000
View Source
// DefaultInserterBatchSize is a typed int constant equal to 5000.
// NOTE(review): presumably the default value for TableInserter.BatchSize — confirm against the code that constructs TableInserter.
const DefaultInserterBatchSize int = 5000
View Source
// HarvestForDeleteRowsetSize is an untyped constant equal to 1000. It is
// intentionally a constant rather than a user-facing setting.
// NOTE(review): presumably the rowset size used while collecting rows for deletion — confirm at the usage site.
const HarvestForDeleteRowsetSize = 1000 // Do not let users tweak it, maybe too sensitive

Variables

This section is empty.

Functions

func CreateDataTableCql

func CreateDataTableCql(keyspace string, runId int16, tableCreator *sc.TableCreatorDef) string

func CreateIdxTableCql

func CreateIdxTableCql(keyspace string, runId int16, idxName string, idxDef *sc.IdxDef) string

func DeleteDataAndUniqueIndexesByBatchIdx

func DeleteDataAndUniqueIndexesByBatchIdx(logger *l.Logger, pCtx *ctx.MessageProcessingContext) error

func RunCreateFile

func RunCreateFile(logger *l.Logger,
	pCtx *ctx.MessageProcessingContext,
	readerNodeRunId int16,
	startToken int64,
	endToken int64) error

func RunCreateTableForBatch

func RunCreateTableForBatch(envConfig *env.EnvConfig,
	logger *l.Logger,
	pCtx *ctx.MessageProcessingContext,
	readerNodeRunId int16,
	startLeftToken int64,
	endLeftToken int64) error

func RunCreateTableForCustomProcessorForBatch

func RunCreateTableForCustomProcessorForBatch(envConfig *env.EnvConfig,
	logger *l.Logger,
	pCtx *ctx.MessageProcessingContext,
	readerNodeRunId int16,
	startLeftToken int64,
	endLeftToken int64) error

func RunCreateTableRelForBatch

func RunCreateTableRelForBatch(envConfig *env.EnvConfig,
	logger *l.Logger,
	pCtx *ctx.MessageProcessingContext,
	readerNodeRunId int16,
	lookupNodeRunId int16,
	startLeftToken int64,
	endLeftToken int64) error

func RunReadFileForBatch

func RunReadFileForBatch(envConfig *env.EnvConfig, logger *l.Logger, pCtx *ctx.MessageProcessingContext, srcFileIdx int) error

Types

type CustomProcessorRunner

// CustomProcessorRunner is the single-method interface a custom processor
// has to satisfy.
//
// Run receives a logger, the message processing context, an input rowset
// (rsIn) and a flushVarsArray callback; it returns an error. The callback
// takes a slice of *eval.VarValuesMap together with an explicit element count
// (varsArrayCount) and itself returns an error.
// NOTE(review): presumably implementations call flushVarsArray to hand
// produced variable maps back to the caller, and varsArrayCount may be smaller
// than len(varsArray) — confirm against an implementation.
type CustomProcessorRunner interface {
	Run(logger *l.Logger, pCtx *ctx.MessageProcessingContext, rsIn *Rowset, flushVarsArray func(varsArray []*eval.VarValuesMap, varsArrayCount int) error) error
}

type FileInserter

// FileInserter bundles the state used to write file rows in batches: the
// message processing context, the file creator definition, a pointer to a
// current WriteFileBatch with its capacity, a channel carrying
// *WriteFileBatch values (BatchesIn), a channel carrying errors (ErrorsOut),
// a sent-batch counter and a flag for BatchesIn.
// NOTE(review): presumably BatchesIn feeds a writer goroutine that reports on
// ErrorsOut, and BatchesInOpen records whether BatchesIn has been closed yet —
// confirm against the implementation.
type FileInserter struct {
	PCtx          *ctx.MessageProcessingContext
	FileCreator   *sc.FileCreatorDef
	CurrentBatch  *WriteFileBatch
	BatchCapacity int
	BatchesIn     chan *WriteFileBatch
	ErrorsOut     chan error
	BatchesSent   int
	BatchesInOpen bool
}

type FileRecordHeap

// FileRecordHeap is a slice of *FileRecordHeapItem. Its method set (Len, Less,
// Swap on the value receiver; Push, Pop on the pointer receiver) has the same
// signatures as container/heap.Interface, so *FileRecordHeap can be passed to
// the container/heap functions.
type FileRecordHeap []*FileRecordHeapItem

func (FileRecordHeap) Len

func (h FileRecordHeap) Len() int

func (FileRecordHeap) Less

func (h FileRecordHeap) Less(i, j int) bool

func (*FileRecordHeap) Pop

func (h *FileRecordHeap) Pop() interface{}

func (*FileRecordHeap) Push

func (h *FileRecordHeap) Push(x interface{})

func (FileRecordHeap) Swap

func (h FileRecordHeap) Swap(i, j int)

type FileRecordHeapItem

// FileRecordHeapItem is the element type of FileRecordHeap: a pointer to a
// record (a slice of arbitrary values) paired with a string Key.
// NOTE(review): presumably Key is what FileRecordHeap.Less compares — confirm
// in the Less implementation.
type FileRecordHeapItem struct {
	FileRecord *[]interface{}
	Key        string
}

type Rowset

// Rowset holds a list of field references, two string-to-int lookup maps
// (FieldsByFullAliasName and FieldsByFieldName), a slice of row pointers where
// each row is a slice of arbitrary values, and an explicit RowCount.
// NOTE(review): presumably the map values are indexes into Fields (and into
// each row), and RowCount is the number of populated rows, which may be less
// than len(Rows) because InitRows takes a capacity — confirm against the
// method implementations.
type Rowset struct {
	Fields                []sc.FieldRef
	FieldsByFullAliasName map[string]int
	FieldsByFieldName     map[string]int
	Rows                  []*[]interface{}
	RowCount              int
}

func NewRowsetFromFieldRefs

func NewRowsetFromFieldRefs(fieldRefsList ...sc.FieldRefs) *Rowset

func (*Rowset) AppendFieldRefs

func (rs *Rowset) AppendFieldRefs(fieldRefs *sc.FieldRefs)

func (*Rowset) AppendFieldRefsWithFilter

func (rs *Rowset) AppendFieldRefsWithFilter(fieldRefs *sc.FieldRefs, tableFilter string)

func (*Rowset) ArrangeByRowid

func (rs *Rowset) ArrangeByRowid(rowids []int64) error

func (*Rowset) ExportToVars

func (rs *Rowset) ExportToVars(rowIdx int, vars *eval.VarValuesMap) error

func (*Rowset) ExportToVarsWithAlias

func (rs *Rowset) ExportToVarsWithAlias(rowIdx int, vars *eval.VarValuesMap, useTableAlias string) error

func (*Rowset) GetFieldNames

func (rs *Rowset) GetFieldNames() *[]string

func (*Rowset) GetTableRecord

func (rs *Rowset) GetTableRecord(rowIdx int) (map[string]interface{}, error)

func (*Rowset) InitRows

func (rs *Rowset) InitRows(capacity int) error

func (*Rowset) ToString

func (rs *Rowset) ToString() string

type TableInserter

// TableInserter bundles the state used to write table records: the message
// processing context, the table creator definition, a batch size, a channel of
// WriteChannelItem values (RecordsIn), a channel of errors (ErrorsOut), a
// *rand.Rand with an accompanying mutex, a worker count, a sent-record counter
// and a flag for RecordsIn.
//
// RandMutex is a sync.Mutex stored by value, so a TableInserter must not be
// copied after first use; pass it around by pointer.
// NOTE(review): presumably RandMutex guards RowidRand (a *rand.Rand is not
// safe for concurrent use) across NumWorkers goroutines, and RecordsInOpen
// records whether RecordsIn has been closed yet — confirm against the
// implementation.
type TableInserter struct {
	PCtx          *ctx.MessageProcessingContext
	TableCreator  *sc.TableCreatorDef
	BatchSize     int
	RecordsIn     chan WriteChannelItem
	ErrorsOut     chan error
	RowidRand     *rand.Rand
	RandMutex     sync.Mutex
	NumWorkers    int
	RecordsSent   int
	RecordsInOpen bool
}

type TableRecord

// TableRecord is a map from string keys to arbitrary values.
// NOTE(review): presumably keys are table field names — confirm at the call sites.
type TableRecord map[string]interface{}

type TableRecordBatch

// TableRecordBatch is a slice of TableRecordPtr values.
type TableRecordBatch []TableRecordPtr

type TableRecordPtr

// TableRecordPtr is a named pointer type over the unnamed map type
// map[string]interface{}. Note that it is declared over the plain map type,
// not over TableRecord, so it is a different type from *TableRecord.
type TableRecordPtr *map[string]interface{}

type WriteChannelItem

// WriteChannelItem is the element type of TableInserter.RecordsIn: a pointer
// to a TableRecord paired with a pointer to a slice of strings (IndexKeys).
// NOTE(review): presumably IndexKeys holds the index key values that belong to
// the record — confirm against the code that produces these items.
type WriteChannelItem struct {
	TableRecord *TableRecord
	IndexKeys   *[]string
}

type WriteFileBatch

// WriteFileBatch is a batch of rows, each row a slice of arbitrary values,
// with an explicit RowCount. It is the type carried by FileInserter.BatchesIn
// and pointed to by FileInserter.CurrentBatch.
// NOTE(review): presumably RowCount is the number of populated rows and may be
// less than len(Rows) when Rows is preallocated — confirm against the code
// that fills the batch.
type WriteFileBatch struct {
	Rows     [][]interface{}
	RowCount int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL