Documentation ¶
Index ¶
- Constants
- type Compaction
- type CompactionType
- type FileCompaction
- func (f *FileCompaction) Compact(compact []parts.Part, options ...parts.Option) ([]parts.Part, int64, int64, error)
- func (f *FileCompaction) MaxSize() int64
- func (f *FileCompaction) Reset()
- func (f *FileCompaction) Snapshot(_ []parts.Part, _ func(parts.Part) error, dir string) error
- func (f *FileCompaction) Sync() error
- type LSM
- func (l *LSM) Add(tx uint64, record arrow.Record)
- func (l *LSM) Close() error
- func (l *LSM) EnsureCompaction() error
- func (l *LSM) InsertPart(part parts.Part)
- func (l *LSM) Iterate(iter func(node *Node) bool)
- func (l *LSM) LevelSize(t SentinelType) int64
- func (l *LSM) MaxLevel() SentinelType
- func (l *LSM) Prefixes(_ context.Context, _ string) ([]string, error)
- func (l *LSM) Rotate(externalWriter func([]parts.Part) (parts.Part, int64, int64, error)) error
- func (l *LSM) Scan(ctx context.Context, _ string, _ *dynparquet.Schema, filter logicalplan.Expr, ...) error
- func (l *LSM) Size() int64
- func (l *LSM) Snapshot(writer func(parts.Part) error, dir string) error
- func (l *LSM) String() string
- func (l *LSM) WaitForPendingCompactions()
- type LSMMetrics
- type LSMOption
- type Level
- type LevelConfig
- type Node
- type ReleaseableRowGroup
- type SentinelType
Constants ¶
const ( IndexFileExtension = ".idx" ParquetCompactionTXKey = "compaction_tx" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Compaction ¶
type CompactionType ¶
type CompactionType int
const ( CompactionTypeUnknown CompactionType = iota // CompactionTypeParquetDisk is a compaction type that will compact the parts into a Parquet file on disk. CompactionTypeParquetDisk // CompactionTypeParquetMemory is a compaction type that will compact the parts into a Parquet file in memory. CompactionTypeParquetMemory )
type FileCompaction ¶
type FileCompaction struct {
// contains filtered or unexported fields
}
func NewFileCompaction ¶
func NewFileCompaction(dir string, maxSize int64, compact Compaction, logger log.Logger) (*FileCompaction, error)
func (*FileCompaction) Compact ¶
func (f *FileCompaction) Compact(compact []parts.Part, options ...parts.Option) ([]parts.Part, int64, int64, error)
Compact will compact the given parts into a Parquet file written to the next level file.
func (*FileCompaction) MaxSize ¶
func (f *FileCompaction) MaxSize() int64
func (*FileCompaction) Reset ¶
func (f *FileCompaction) Reset()
Reset is called when the level no longer has active parts in it at the end of a compaction.
func (*FileCompaction) Snapshot ¶
Snapshot takes a snapshot of the current level. It ignores the parts and just hard links the files into the snapshot directory. It will rotate the active file if it has data in it, rendering all snapshotted files immutable.
func (*FileCompaction) Sync ¶
func (f *FileCompaction) Sync() error
Sync calls Sync on the underlying file.
type LSM ¶
LSM is a log-structured merge-tree-like index. It is implemented as a singly linked list of parts.
Arrow records are always added to the L0 list. When a list reaches its configured max size, it is compacted by calling the level's Compact function and is then added as a new part to the next level.
L0->[record]->[record]->[L1]->[record/parquet]->[record/parquet] etc.
func NewLSM ¶
func NewLSM(dir string, schema *dynparquet.Schema, levels []*LevelConfig, wait func(uint64), options ...LSMOption) (*LSM, error)
NewLSM returns an LSM-like index of len(levels) levels. wait is a function that will block until the given transaction has been committed; this is used only during compaction to ensure that all the transactions in the level up to the compaction tx have been committed before compacting.
func (*LSM) EnsureCompaction ¶
EnsureCompaction forces a compaction of all levels, regardless of whether the levels are below the target size.
func (*LSM) InsertPart ¶
InsertPart inserts a part into the LSM tree. It will be inserted into the correct level. It does not check if the insert should cause a compaction. This should only be used during snapshot recovery. It will silently drop the insert if the part is older than a part in the next level of the LSM. This indicates that this part is already accounted for in the next level via compaction.
func (*LSM) LevelSize ¶
func (l *LSM) LevelSize(t SentinelType) int64
LevelSize returns the size of a specific level in bytes.
func (*LSM) MaxLevel ¶
func (l *LSM) MaxLevel() SentinelType
func (*LSM) Rotate ¶
Rotate will write all parts in the LSM into the external writer. No changes are made to the LSM.
func (*LSM) Snapshot ¶
Snapshot creates a snapshot of the index at the given transaction. It will call the writer function with the parts in the index that are in-memory.
func (*LSM) WaitForPendingCompactions ¶
func (l *LSM) WaitForPendingCompactions()
type LSMMetrics ¶
type LSMMetrics struct { Compactions *prometheus.CounterVec LevelSize *prometheus.GaugeVec CompactionDuration prometheus.Histogram }
LSMMetrics are the metrics for an LSM index.
func NewLSMMetrics ¶
func NewLSMMetrics(reg prometheus.Registerer) *LSMMetrics
type LSMOption ¶
type LSMOption func(*LSM)
func LSMWithLogger ¶
func LSMWithMetrics ¶
func LSMWithMetrics(metrics *LSMMetrics) LSMOption
type LevelConfig ¶
type LevelConfig struct { Level SentinelType MaxSize int64 Type CompactionType Compact Compaction }
LevelConfig is the configuration for a level in the LSM. The MaxSize is the maximum size of the level in bytes before it triggers a compaction into the next level.
type Node ¶
type Node struct {
// contains filtered or unexported fields
}
Node is a Part that is a part of a linked-list.
func NewList ¶
func NewList(sentinel SentinelType) *Node
NewList creates a new part list using atomic constructs.
func (*Node) Sentinel ¶
func (n *Node) Sentinel(s SentinelType) *Node
Sentinel adds a new sentinel node to the list, and returns the sub list starting from that sentinel.
type ReleaseableRowGroup ¶
type ReleaseableRowGroup interface { dynparquet.DynamicRowGroup Release() }
type SentinelType ¶
type SentinelType int
const ( L0 SentinelType = iota L1 L2 )
func (SentinelType) String ¶
func (s SentinelType) String() string