Documentation ¶
Index ¶
- Constants
- Variables
- func FlushToDisk(encoder Encoder, currentKey []byte, b Buffer, tmpdir string) (dataProvider, error)
- func KeepInRAM(buffer Buffer) dataProvider
- func NewAppendBuffer(bufferOptimalSize datasize.ByteSize) *appendSortableBuffer
- func NewOldestEntryBuffer(bufferOptimalSize datasize.ByteSize) *oldestEntrySortableBuffer
- func NewSortableBuffer(bufferOptimalSize datasize.ByteSize) *sortableBuffer
- func NextKey(key []byte) ([]byte, error)
- func ProgressFromKey(k []byte) int
- func Transform(logPrefix string, db ethdb.RwTx, fromBucket string, toBucket string, ...) error
- type AdditionalLogArguments
- type Buffer
- type Collector
- type CurrentTableReader
- type Decoder
- type Encoder
- type ExtractFunc
- type ExtractNextFunc
- type Heap
- type HeapElem
- type LoadCommitHandler
- type LoadFunc
- type LoadNextFunc
- type TransformArgs
Constants ¶
const (
	// SortableSliceBuffer - just a simple slice
	SortableSliceBuffer = iota
	// SortableAppendBuffer - map[k] -> [v1 v2 v3]
	SortableAppendBuffer
	// SortableOldestAppearedBuffer - buffer that keeps only the oldest entries.
	// If v1 was added first under key K and then v2, only v1 will stay.
	SortableOldestAppearedBuffer

	BufIOSize = 64 * 4096 // 64 pages | default is 1 page | increasing further doesn't show speedup on SSD
)
const TmpDirName = "etl-temp"
Variables ¶
var BufferOptimalSize = 256 * datasize.MB /* var because we sometimes want to change it from tests or command-line flags */
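The buffer type constants above select which in-RAM buffer is used, and the New*Buffer constructors listed in the index build them. A minimal sketch, assuming the package is imported as etl (the import path shown is illustrative, not taken from this page):

import (
	"github.com/c2h5oh/datasize"

	"github.com/ledgerwatch/turbo-geth/common/etl" // illustrative import path
)

func chooseBuffer() {
	// Plain sortable buffer: entries are kept sorted in RAM.
	sortable := etl.NewSortableBuffer(etl.BufferOptimalSize)

	// Append buffer: values collected under the same key are appended (map[k] -> [v1 v2 v3]).
	appending := etl.NewAppendBuffer(etl.BufferOptimalSize)

	// Oldest-entry buffer: only the first value seen for a key is kept.
	oldest := etl.NewOldestEntryBuffer(256 * datasize.MB)

	_, _, _ = sortable, appending, oldest
}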
Functions ¶
func FlushToDisk ¶
func NewAppendBuffer ¶
func NewOldestEntryBuffer ¶
func NewSortableBuffer ¶
func NextKey ¶
NextKey generates the possible next key without changing the key length. For [0x01, 0x01, 0x01] it will generate [0x01, 0x01, 0x02], etc.
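A hypothetical usage sketch, using fmt and the etl import from the sketch above. The overflow behaviour is an assumption inferred from the error return value:

func nextKeyExample() error {
	k := []byte{0x01, 0x01, 0x01}
	next, err := etl.NextKey(k)
	if err != nil {
		// assumed: an error is returned when the key cannot be incremented
		// without growing it (e.g. [0xff, 0xff, 0xff])
		return err
	}
	fmt.Printf("%x\n", next) // 010102
	return nil
}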
func ProgressFromKey ¶
Types ¶
type AdditionalLogArguments ¶
type AdditionalLogArguments func(k, v []byte) (additionalLogArguments []interface{})
type Collector ¶
type Collector struct {
// contains filtered or unexported fields
}
Collector performs the job of ETL Transform, but can also be used without the "E" (Extract) part, as a Collect-Transform-Load.
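A minimal Collect-then-Load sketch. The NewCollector, Collect and Load signatures below are assumptions (they are not shown on this page); the buffer constructor, IdentityLoadFunc and TransformArgs come from the index above.

func collectAndLoad(db ethdb.RwTx, toBucket, tmpdir string, quit <-chan struct{}) error {
	// assumed signature: NewCollector(tmpdir string, b Buffer) *Collector
	collector := etl.NewCollector(tmpdir, etl.NewSortableBuffer(etl.BufferOptimalSize))

	// Collect buffers key/value pairs in RAM and flushes them to temporary files in tmpdir when the buffer is full.
	if err := collector.Collect([]byte("key-1"), []byte("value-1")); err != nil { // assumed signature
		return err
	}

	// Load merges the sorted entries and writes them into toBucket unchanged.
	// assumed signature: Load(logPrefix string, db ethdb.RwTx, toBucket string, loadFunc LoadFunc, args TransformArgs) error
	return collector.Load("example", db, toBucket, etl.IdentityLoadFunc, etl.TransformArgs{Quit: quit})
}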
func NewCollector ¶
func NewCollectorFromFiles ¶
NewCollectorFromFiles creates a collector from existing files (left over from a previous unsuccessful loading).
func NewCriticalCollector ¶
NewCriticalCollector does not clean up temporary files if loading has failed
type CurrentTableReader ¶
type ExtractFunc ¶
type ExtractFunc func(k []byte, v []byte, next ExtractNextFunc) error
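An ExtractFunc sketch that re-keys entries before they are buffered. The (originalKey, newKey, value) parameter order of ExtractNextFunc is an assumption; only the ExtractFunc signature itself is shown on this page.

var reKey etl.ExtractFunc = func(k []byte, v []byte, next etl.ExtractNextFunc) error {
	newKey := append([]byte("prefix-"), k...) // hypothetical re-keying
	return next(k, newKey, v)                 // assumed parameter order: (originalKey, newKey, value)
}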
type ExtractNextFunc ¶
type Heap ¶
type Heap struct {
// contains filtered or unexported fields
}
func (Heap) SetComparator ¶
type LoadCommitHandler ¶
LoadCommitHandler is a callback called each time a new batch is being loaded from files into a DB
- `key`: the last committed key written to the database (use the etl.NextKey helper to turn it into a LoadStartKey)
- `isDone`: true if everything is processed
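A sketch of a commit handler that records where to resume. The exact LoadCommitHandler signature is not shown on this page; the sketch assumes the handler receives at least the `key` and `isDone` values described above, and saveCheckpoint is a hypothetical persistence helper.

func onBatchCommitted(key []byte, isDone bool) error {
	if isDone {
		return nil // everything is processed
	}
	// Derive the key to restart from (see etl.NextKey and LoadStartKey above).
	resumeFrom, err := etl.NextKey(key)
	if err != nil {
		return err
	}
	return saveCheckpoint(resumeFrom) // hypothetical persistence helper
}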
type LoadFunc ¶
type LoadFunc func(k []byte, value []byte, table CurrentTableReader, next LoadNextFunc) error
var IdentityLoadFunc LoadFunc = func(k []byte, value []byte, _ CurrentTableReader, next LoadNextFunc) error {
return next(k, k, value)
}
IdentityLoadFunc loads entries as they are, without transformation
type LoadNextFunc ¶
type TransformArgs ¶
type TransformArgs struct {
	ExtractStartKey   []byte
	ExtractEndKey     []byte
	FixedBits         int
	BufferType        int
	BufferSize        int
	Quit              <-chan struct{}
	LogDetailsExtract AdditionalLogArguments
	LogDetailsLoad    AdditionalLogArguments
	Comparator        dbutils.CmpFunc
}
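A full E-T-L sketch using the arguments above. The Transform parameters after toBucket are elided in the index ("..."); the tmpdir, extract and load parameters below, and the bucket names, are assumptions used only for illustration.

func runTransform(db ethdb.RwTx, tmpdir string, quit <-chan struct{}) error {
	extract := func(k, v []byte, next etl.ExtractNextFunc) error {
		return next(k, k, v) // pass entries through unchanged (parameter order assumed)
	}
	return etl.Transform(
		"example",      // logPrefix
		db,             // ethdb.RwTx
		"SourceBucket", // fromBucket (hypothetical name)
		"TargetBucket", // toBucket (hypothetical name)
		tmpdir,         // assumed parameter
		extract,        // assumed parameter: ExtractFunc
		etl.IdentityLoadFunc,
		etl.TransformArgs{
			BufferType: etl.SortableAppendBuffer,
			Quit:       quit,
		},
	)
}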