proc

package
v1.1.13 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2023 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
// DefaultFileInserterBatchCapacity is a package-level default of 1000.
// NOTE(review): presumably the initial value for FileInserter.BatchCapacity
// (rows per WriteFileBatch) — confirm where it is assigned.
const DefaultFileInserterBatchCapacity int = 1000
View Source
// DefaultInserterBatchSize is a package-level default of 5000.
// NOTE(review): presumably the initial value for TableInserter.BatchSize —
// confirm where it is assigned.
const DefaultInserterBatchSize int = 5000
View Source
// HarvestForDeleteRowsetSize is a hard-coded rowset size of 1000 that is
// intentionally not exposed as a user-configurable setting.
// NOTE(review): presumably consumed by DeleteDataAndUniqueIndexesByBatchIdx —
// confirm against the implementation.
const HarvestForDeleteRowsetSize = 1000 // Do not let users tweak it, maybe too sensitive

Variables

This section is empty.

Functions

func CreateDataTableCql

func CreateDataTableCql(keyspace string, runId int16, tableCreator *sc.TableCreatorDef) string

func CreateIdxTableCql

func CreateIdxTableCql(keyspace string, runId int16, idxName string, idxDef *sc.IdxDef) string

func DeleteDataAndUniqueIndexesByBatchIdx

func DeleteDataAndUniqueIndexesByBatchIdx(logger *l.Logger, pCtx *ctx.MessageProcessingContext) error

Types

type BatchStats added in v1.1.1

// BatchStats is the statistics value returned, together with an error, by the
// Run* batch functions of this package (RunCreateFile, RunCreateTableForBatch,
// RunCreateTableForCustomProcessorForBatch, RunCreateTableRelForBatch and
// RunReadFileForBatch). It records a source and a destination label, the
// number of rows read and written, and the elapsed time.
// NOTE(review): what Src and Dst actually contain (table names, file URLs, ...)
// is not visible here — confirm in the Run* implementations.
type BatchStats struct {
	Src         string
	Dst         string
	RowsRead    int
	RowsWritten int
	Elapsed     time.Duration
}

func RunCreateFile

func RunCreateFile(envConfig *env.EnvConfig,
	logger *l.Logger,
	pCtx *ctx.MessageProcessingContext,
	readerNodeRunId int16,
	startToken int64,
	endToken int64) (BatchStats, error)

func RunCreateTableForBatch

func RunCreateTableForBatch(envConfig *env.EnvConfig,
	logger *l.Logger,
	pCtx *ctx.MessageProcessingContext,
	readerNodeRunId int16,
	startLeftToken int64,
	endLeftToken int64) (BatchStats, error)

func RunCreateTableForCustomProcessorForBatch

func RunCreateTableForCustomProcessorForBatch(envConfig *env.EnvConfig,
	logger *l.Logger,
	pCtx *ctx.MessageProcessingContext,
	readerNodeRunId int16,
	startLeftToken int64,
	endLeftToken int64) (BatchStats, error)

func RunCreateTableRelForBatch

func RunCreateTableRelForBatch(envConfig *env.EnvConfig,
	logger *l.Logger,
	pCtx *ctx.MessageProcessingContext,
	readerNodeRunId int16,
	lookupNodeRunId int16,
	startLeftToken int64,
	endLeftToken int64) (BatchStats, error)

func RunReadFileForBatch

func RunReadFileForBatch(envConfig *env.EnvConfig, logger *l.Logger, pCtx *ctx.MessageProcessingContext, srcFileIdx int) (BatchStats, error)

func (*BatchStats) ToString added in v1.1.1

func (bs *BatchStats) ToString() string

type CustomProcessorRunner

// CustomProcessorRunner is a single-method interface for custom processors.
//
// Run receives a logger, the message processing context, an input Rowset and
// a flushVarsArray callback. The callback takes a slice of variable-value maps
// plus a count and returns an error; Run itself returns an error.
// NOTE(review): presumably implementations call flushVarsArray to hand their
// produced rows back to the caller, and varsArrayCount is the number of valid
// leading entries in varsArray — confirm against
// RunCreateTableForCustomProcessorForBatch.
type CustomProcessorRunner interface {
	Run(logger *l.Logger, pCtx *ctx.MessageProcessingContext, rsIn *Rowset, flushVarsArray func(varsArray []*eval.VarValuesMap, varsArrayCount int) error) error
}

type FileInserter

// FileInserter groups the state used to write rows to a file in batches: the
// message processing context, the file creator definition, the batch currently
// being filled, a batch capacity, a channel of *WriteFileBatch, a channel of
// errors, a counter of batches sent, and two file locations.
// NOTE(review): presumably rows accumulate in CurrentBatch until BatchCapacity
// is reached, full batches are sent through BatchesIn to a writer that reports
// on ErrorsOut, and the file is first written to TempFilePath before ending up
// at FinalFileUrl — none of this is visible here, confirm in the
// implementation.
type FileInserter struct {
	PCtx          *ctx.MessageProcessingContext
	FileCreator   *sc.FileCreatorDef
	CurrentBatch  *WriteFileBatch
	BatchCapacity int
	BatchesIn     chan *WriteFileBatch
	ErrorsOut     chan error
	BatchesSent   int
	FinalFileUrl  string
	TempFilePath  string
}

type FileRecordHeap

// FileRecordHeap is a slice of *FileRecordHeapItem. Its declared methods
// (Len, Less, Swap, Push and Pop) match the method set required by
// container/heap.Interface.
// NOTE(review): presumably ordered by FileRecordHeapItem.Key — confirm in Less.
type FileRecordHeap []*FileRecordHeapItem

func (FileRecordHeap) Len

func (h FileRecordHeap) Len() int

func (FileRecordHeap) Less

func (h FileRecordHeap) Less(i, j int) bool

func (*FileRecordHeap) Pop

func (h *FileRecordHeap) Pop() interface{}

func (*FileRecordHeap) Push

func (h *FileRecordHeap) Push(x interface{})

func (FileRecordHeap) Swap

func (h FileRecordHeap) Swap(i, j int)

type FileRecordHeapItem

// FileRecordHeapItem is the element type of FileRecordHeap: a pointer to one
// record (a slice of arbitrary values) paired with a string key.
// NOTE(review): presumably Key is the sort key compared by
// FileRecordHeap.Less — confirm in the implementation.
type FileRecordHeapItem struct {
	FileRecord *[]interface{}
	Key        string
}

type Rowset

// Rowset holds a list of field references, two name-to-int lookup maps, a
// slice of row pointers (each row being a slice of arbitrary values) and a
// row count. It is created with NewRowsetFromFieldRefs and is the input type
// of CustomProcessorRunner.Run.
// NOTE(review): presumably the map values are positions in Fields (and thus
// column positions within each row), and RowCount is the number of populated
// entries in Rows, which InitRows allocates up to a capacity — confirm in the
// method implementations.
type Rowset struct {
	Fields                []sc.FieldRef
	FieldsByFullAliasName map[string]int
	FieldsByFieldName     map[string]int
	Rows                  []*[]interface{}
	RowCount              int
}

func NewRowsetFromFieldRefs

func NewRowsetFromFieldRefs(fieldRefsList ...sc.FieldRefs) *Rowset

func (*Rowset) AppendFieldRefs

func (rs *Rowset) AppendFieldRefs(fieldRefs *sc.FieldRefs)

func (*Rowset) AppendFieldRefsWithFilter

func (rs *Rowset) AppendFieldRefsWithFilter(fieldRefs *sc.FieldRefs, tableFilter string)

func (*Rowset) ArrangeByRowid

func (rs *Rowset) ArrangeByRowid(rowids []int64) error

func (*Rowset) ExportToVars

func (rs *Rowset) ExportToVars(rowIdx int, vars *eval.VarValuesMap) error

func (*Rowset) ExportToVarsWithAlias

func (rs *Rowset) ExportToVarsWithAlias(rowIdx int, vars *eval.VarValuesMap, useTableAlias string) error

func (*Rowset) GetFieldNames

func (rs *Rowset) GetFieldNames() *[]string

func (*Rowset) GetTableRecord

func (rs *Rowset) GetTableRecord(rowIdx int) (map[string]interface{}, error)

func (*Rowset) InitRows

func (rs *Rowset) InitRows(capacity int) error

func (*Rowset) ToString

func (rs *Rowset) ToString() string

type TableInserter

// TableInserter groups the state used to insert records into a table: the
// message processing context, the table creator definition, a batch size, the
// RecordsIn channel feeding records to the inserter, an error channel, a
// random source with its mutex, worker settings with a WaitGroup, and
// bookkeeping for the records sent.
// The struct contains a sync.Mutex and a sync.WaitGroup, so it must not be
// copied after first use; pass it by pointer.
// NOTE(review): presumably RowidRand generates row ids and RandMutex guards it
// across the NumWorkers workers; the meaning and unit of MinInserterRate is
// not visible here — confirm in the implementation.
type TableInserter struct {
	PCtx            *ctx.MessageProcessingContext
	TableCreator    *sc.TableCreatorDef
	BatchSize       int
	RecordsIn       chan WriteChannelItem // Channel to pass records from the main function like RunCreateTableForBatch, using add(), to TableInserter
	ErrorsOut       chan error
	RowidRand       *rand.Rand
	RandMutex       sync.Mutex
	NumWorkers      int
	MinInserterRate int
	WorkerWaitGroup sync.WaitGroup
	RecordsSent     int // Records sent to RecordsIn
	// TODO: the only reason we have this is because we decided to end handlers
	// with "defer instr.waitForWorkersAndCloseErrorsOut(logger, pCtx)" - not the cleanest way, get rid of this bool thingy.
	// That defer is convenient because there are so many early returns.
	RecordsInOpen bool
}

type TableRecord

// TableRecord is a single record represented as a map from string keys to
// arbitrary values.
// NOTE(review): presumably the keys are field names — confirm against
// Rowset.GetTableRecord and the inserter code.
type TableRecord map[string]interface{}

type TableRecordBatch

// TableRecordBatch is a slice of TableRecordPtr values.
type TableRecordBatch []TableRecordPtr

type TableRecordPtr

// TableRecordPtr is a pointer to a map from string keys to arbitrary values.
// Note that it is declared over the plain map type, not over TableRecord, so
// it is a distinct type from *TableRecord.
type TableRecordPtr *map[string]interface{}

type WriteChannelItem

// WriteChannelItem is the element type of the TableInserter.RecordsIn channel:
// a pointer to a TableRecord together with a string-to-string IndexKeyMap.
// NOTE(review): presumably IndexKeyMap maps an index name to the key computed
// for this record — confirm where items are built.
type WriteChannelItem struct {
	TableRecord *TableRecord
	IndexKeyMap map[string]string
}

type WriteFileBatch

// WriteFileBatch is the batch type used by FileInserter (CurrentBatch and the
// BatchesIn channel): a slice of rows, each a slice of arbitrary values, plus
// a row count.
// NOTE(review): presumably RowCount is the number of populated entries in
// Rows, which may be pre-allocated to the batch capacity — confirm in the
// FileInserter implementation.
type WriteFileBatch struct {
	Rows     [][]interface{}
	RowCount int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL