Documentation ¶
Index ¶
- Constants
- func CreateDataTableCql(keyspace string, runId int16, tableCreator *sc.TableCreatorDef) string
- func CreateIdxTableCql(keyspace string, runId int16, idxName string, idxDef *sc.IdxDef) string
- func DeleteDataAndUniqueIndexesByBatchIdx(logger *l.Logger, pCtx *ctx.MessageProcessingContext) error
- type BatchStats
- func RunCreateFile(envConfig *env.EnvConfig, logger *l.Logger, pCtx *ctx.MessageProcessingContext, ...) (BatchStats, error)
- func RunCreateTableForBatch(envConfig *env.EnvConfig, logger *l.Logger, pCtx *ctx.MessageProcessingContext, ...) (BatchStats, error)
- func RunCreateTableForCustomProcessorForBatch(envConfig *env.EnvConfig, logger *l.Logger, pCtx *ctx.MessageProcessingContext, ...) (BatchStats, error)
- func RunCreateTableRelForBatch(envConfig *env.EnvConfig, logger *l.Logger, pCtx *ctx.MessageProcessingContext, ...) (BatchStats, error)
- func RunReadFileForBatch(envConfig *env.EnvConfig, logger *l.Logger, pCtx *ctx.MessageProcessingContext, ...) (BatchStats, error)
- type CustomProcessorRunner
- type FileInserter
- type FileRecordHeap
- type FileRecordHeapItem
- type Rowset
- func (rs *Rowset) AppendFieldRefs(fieldRefs *sc.FieldRefs)
- func (rs *Rowset) AppendFieldRefsWithFilter(fieldRefs *sc.FieldRefs, tableFilter string)
- func (rs *Rowset) ArrangeByRowid(rowids []int64) error
- func (rs *Rowset) ExportToVars(rowIdx int, vars *eval.VarValuesMap) error
- func (rs *Rowset) ExportToVarsWithAlias(rowIdx int, vars *eval.VarValuesMap, useTableAlias string) error
- func (rs *Rowset) GetFieldNames() *[]string
- func (rs *Rowset) GetTableRecord(rowIdx int) (map[string]interface{}, error)
- func (rs *Rowset) InitRows(capacity int) error
- func (rs *Rowset) ToString() string
- type TableInserter
- type TableRecord
- type TableRecordBatch
- type TableRecordPtr
- type WriteChannelItem
- type WriteFileBatch
Constants ¶
View Source
const DefaultFileInserterBatchCapacity int = 1000
View Source
const DefaultInserterBatchSize int = 5000
View Source
const HarvestForDeleteRowsetSize = 1000 // Deliberately not user-configurable: the value may be too sensitive to tune
Variables ¶
This section is empty.
Functions ¶
func CreateDataTableCql ¶
func CreateDataTableCql(keyspace string, runId int16, tableCreator *sc.TableCreatorDef) string
func CreateIdxTableCql ¶
func DeleteDataAndUniqueIndexesByBatchIdx ¶
func DeleteDataAndUniqueIndexesByBatchIdx(logger *l.Logger, pCtx *ctx.MessageProcessingContext) error
Types ¶
type BatchStats ¶ added in v1.1.1
func RunCreateFile ¶
func RunCreateTableForBatch ¶
func RunReadFileForBatch ¶
func RunReadFileForBatch(envConfig *env.EnvConfig, logger *l.Logger, pCtx *ctx.MessageProcessingContext, srcFileIdx int) (BatchStats, error)
func (*BatchStats) ToString ¶ added in v1.1.1
func (bs *BatchStats) ToString() string
type CustomProcessorRunner ¶
type CustomProcessorRunner interface {
Run(logger *l.Logger, pCtx *ctx.MessageProcessingContext, rsIn *Rowset, flushVarsArray func(varsArray []*eval.VarValuesMap, varsArrayCount int) error) error
}
type FileInserter ¶
type FileInserter struct { PCtx *ctx.MessageProcessingContext FileCreator *sc.FileCreatorDef CurrentBatch *WriteFileBatch BatchCapacity int BatchesIn chan *WriteFileBatch ErrorsOut chan error BatchesSent int FinalFileUrl string TempFilePath string }
type FileRecordHeap ¶
type FileRecordHeap []*FileRecordHeapItem
func (FileRecordHeap) Len ¶
func (h FileRecordHeap) Len() int
func (FileRecordHeap) Less ¶
func (h FileRecordHeap) Less(i, j int) bool
func (*FileRecordHeap) Pop ¶
func (h *FileRecordHeap) Pop() interface{}
func (*FileRecordHeap) Push ¶
func (h *FileRecordHeap) Push(x interface{})
func (FileRecordHeap) Swap ¶
func (h FileRecordHeap) Swap(i, j int)
type FileRecordHeapItem ¶
type FileRecordHeapItem struct { FileRecord *[]interface{} Key string }
type Rowset ¶
type Rowset struct { Fields []sc.FieldRef FieldsByFullAliasName map[string]int FieldsByFieldName map[string]int Rows []*[]interface{} RowCount int }
func NewRowsetFromFieldRefs ¶
func (*Rowset) AppendFieldRefs ¶
func (*Rowset) AppendFieldRefsWithFilter ¶
func (*Rowset) ArrangeByRowid ¶
func (*Rowset) ExportToVars ¶
func (rs *Rowset) ExportToVars(rowIdx int, vars *eval.VarValuesMap) error
func (*Rowset) ExportToVarsWithAlias ¶
func (*Rowset) GetFieldNames ¶
func (*Rowset) GetTableRecord ¶
type TableInserter ¶
type TableInserter struct { PCtx *ctx.MessageProcessingContext TableCreator *sc.TableCreatorDef BatchSize int RecordsIn chan WriteChannelItem // Channel to pass records from the main function like RunCreateTableForBatch, using add(), to TableInserter ErrorsOut chan error RowidRand *rand.Rand RandMutex sync.Mutex NumWorkers int MinInserterRate int WorkerWaitGroup sync.WaitGroup RecordsSent int // Records sent to RecordsIn // TODO: the only reason we have this is because we decided to end handlers // with "defer instr.waitForWorkersAndCloseErrorsOut(logger, pCtx)" - not the cleanest way, get rid of this bool thingy. // That defer is convenient because there are so many early returns. RecordsInOpen bool }
type TableRecord ¶
type TableRecord map[string]interface{}
type TableRecordBatch ¶
type TableRecordBatch []TableRecordPtr
type TableRecordPtr ¶
type TableRecordPtr *map[string]interface{}
type WriteChannelItem ¶
type WriteChannelItem struct { TableRecord *TableRecord IndexKeyMap map[string]string }
type WriteFileBatch ¶
type WriteFileBatch struct { Rows [][]interface{} RowCount int }
Click to show internal directories.
Click to hide internal directories.