Documentation ¶
Index ¶
- Constants
- Variables
- func FlushToDisk(logPrefix string, b Buffer, tmpdir string, doFsync bool, lvl log.Lvl) (dataProvider, error)
- func KeepInRAM(buffer Buffer) dataProvider
- func NewAppendBuffer(bufferOptimalSize datasize.ByteSize) *appendSortableBuffer
- func NewLatestMergedEntryMergedBuffer(bufferOptimalSize datasize.ByteSize, merger func([]byte, []byte) []byte) *oldestMergedEntrySortableBuffer
- func NewOldestEntryBuffer(bufferOptimalSize datasize.ByteSize) *oldestEntrySortableBuffer
- func NewSortableBuffer(bufferOptimalSize datasize.ByteSize) *sortableBuffer
- func NextKey(key []byte) ([]byte, error)
- func ProgressFromKey(k []byte) int
- func Transform(logPrefix string, db kv.RwTx, fromBucket string, toBucket string, ...) error
- type AdditionalLogArguments
- type Buffer
- type Collector
- type CurrentTableReader
- type ExtractFunc
- type ExtractNextFunc
- type Heap
- type HeapElem
- type LoadCommitHandler
- type LoadFunc
- type LoadNextFunc
- type TransformArgs
Constants ¶
const (
	//SliceBuffer - just simple slice w
	SortableSliceBuffer = iota
	//SortableAppendBuffer - map[k] [v1 v2 v3]
	SortableAppendBuffer
	// SortableOldestAppearedBuffer - buffer that keeps only the oldest entries.
	// if first v1 was added under key K, then v2; only v1 will stay
	SortableOldestAppearedBuffer
	SortableMergeBuffer

	//BufIOSize - 128 pages | default is 1 page | increasing over `64 * 4096` doesn't show speedup on SSD/NVMe, but shows speedup on cloud drives
	BufIOSize = 128 * 4096
)
Variables ¶
var BufferOptimalSize = 256 * datasize.MB /* var because we want to sometimes change it from tests or command-line flags */
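A minimal sketch of feeding this size to the exported buffer constructors listed in the index (the etl import path is an assumption; adjust it to the module that vendors this package):

import "github.com/ledgerwatch/erigon-lib/etl"

func newBuffers() {
	// Plain sortable buffer: a simple slice of key/value pairs.
	sortable := etl.NewSortableBuffer(etl.BufferOptimalSize)
	// Append buffer: values collected under one key are concatenated (map[k] -> v1 v2 v3).
	appendBuf := etl.NewAppendBuffer(etl.BufferOptimalSize)
	// Oldest-entry buffer: only the first value seen for a key survives.
	oldest := etl.NewOldestEntryBuffer(etl.BufferOptimalSize)
	_, _, _ = sortable, appendBuf, oldest
}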
Functions ¶
func FlushToDisk ¶
func FlushToDisk(logPrefix string, b Buffer, tmpdir string, doFsync bool, lvl log.Lvl) (dataProvider, error)
FlushToDisk - `doFsync` is true only for 'critical' collectors (which must not lose data).
func NewAppendBuffer ¶
func NewOldestEntryBuffer ¶
func NewSortableBuffer ¶
func NextKey ¶
NextKey generates the possible next key without changing the key length. For [0x01, 0x01, 0x01] it will generate [0x01, 0x01, 0x02], etc.
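A usage sketch; the carry case in the second call is an assumption derived from the "same length" rule above, not something stated explicitly:

func nextKeyExamples() {
	next, _ := etl.NextKey([]byte{0x01, 0x01, 0x01})
	// next == []byte{0x01, 0x01, 0x02}

	// Assumed carry when the last byte is 0xff: the length is preserved and the higher byte is incremented.
	carried, _ := etl.NextKey([]byte{0x01, 0x01, 0xff})
	// carried == []byte{0x01, 0x02, 0x00}

	_, _ = next, carried
}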
func ProgressFromKey ¶
Types ¶
type AdditionalLogArguments ¶
type AdditionalLogArguments func(k, v []byte) (additionalLogArguments []interface{})
type Collector ¶
type Collector struct {
// contains filtered or unexported fields
}
Collector performs the job of ETL Transform, but can also be used without the "E" (Extract) part, as a Collect-Transform-Load.
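A sketch of the Collect-then-Load cycle. The NewCollector signature is assumed to mirror NewCriticalCollector below, the Collect, Load and Close methods are assumed from typical usage of this package, and the import paths are assumptions as well:

import (
	"encoding/binary"
	"os"

	"github.com/ledgerwatch/erigon-lib/etl"
	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/log/v3"
)

func collectAndLoad(logger log.Logger, tx kv.RwTx) error {
	c := etl.NewCollector("[example]", os.TempDir(), etl.NewSortableBuffer(etl.BufferOptimalSize), logger)
	defer c.Close() // removes any temporary files the collector created

	// "C": feed unsorted key/value pairs; once the buffer exceeds its optimal size it is sorted and spilled to tmpdir.
	for i := uint64(0); i < 1_000_000; i++ {
		k := make([]byte, 8)
		binary.BigEndian.PutUint64(k, i)
		if err := c.Collect(k, []byte("v")); err != nil {
			return err
		}
	}

	// "L": merge the sorted runs and write them, in key order, into the destination table.
	return c.Load(tx, "DestinationTable", etl.IdentityLoadFunc, etl.TransformArgs{})
}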
func NewCollector ¶
func NewCollectorFromFiles ¶
NewCollectorFromFiles creates a collector from existing files (left over from a previous unsuccessful loading)
func NewCriticalCollector ¶
func NewCriticalCollector(logPrefix, tmpdir string, sortableBuffer Buffer, logger log.Logger) *Collector
NewCriticalCollector does not clean up temporary files if loading has failed
func (*Collector) Flush ¶
Flush - an optional method (usually the user doesn't need to call it) that forces sort+flush of the current buffer. It triggers the background sort and flush, reducing the amount of RAM held. It is useful when working with many collectors: calling it on each triggers a background sort for all of them.
type CurrentTableReader ¶
type ExtractFunc ¶
type ExtractFunc func(k []byte, v []byte, next ExtractNextFunc) error
type ExtractNextFunc ¶
type LoadCommitHandler ¶
LoadCommitHandler is a callback called each time a new batch is loaded from files into a DB
- `key`: last committed key to the database (use the etl.NextKey helper to derive LoadStartKey)
- `isDone`: true if everything is processed
type LoadFunc ¶
type LoadFunc func(k, v []byte, table CurrentTableReader, next LoadNextFunc) error
var IdentityLoadFunc LoadFunc = func(k []byte, value []byte, _ CurrentTableReader, next LoadNextFunc) error {
return next(k, k, value)
}
IdentityLoadFunc loads entries as they are, without transformation
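A sketch of a non-identity LoadFunc (imports bytes and the etl package); the (originalK, k, v) argument order of `next` is inferred from IdentityLoadFunc above rather than documented here:

// upperCaseLoadFunc keeps the key as-is but writes the value upper-cased.
var upperCaseLoadFunc etl.LoadFunc = func(k, v []byte, _ etl.CurrentTableReader, next etl.LoadNextFunc) error {
	return next(k, k, bytes.ToUpper(v))
}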
type LoadNextFunc ¶
type TransformArgs ¶
type TransformArgs struct {
	Quit              <-chan struct{}
	LogDetailsExtract AdditionalLogArguments
	LogDetailsLoad    AdditionalLogArguments

	// [ExtractStartKey, ExtractEndKey)
	ExtractStartKey []byte
	ExtractEndKey   []byte
	BufferType      int
	BufferSize      int
}
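A sketch of filling these arguments for a bounded, cancellable run; only fields shown in the struct above are used, and the Quit and BufferSize semantics noted in the comments are assumptions:

func boundedArgs(quit <-chan struct{}) etl.TransformArgs {
	return etl.TransformArgs{
		Quit:            quit,                       // assumed: signalling this channel aborts the run
		ExtractStartKey: []byte{0x01},               // inclusive lower bound
		ExtractEndKey:   []byte{0x02},               // exclusive upper bound: [ExtractStartKey, ExtractEndKey)
		BufferType:      etl.SortableAppendBuffer,   // one of the buffer-type constants above
		BufferSize:      int(etl.BufferOptimalSize), // assumed to be in bytes
	}
}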