sstables

package
v1.6.0
Published: Nov 7, 2024 License: Apache-2.0 Imports: 15 Imported by: 0

README

Using SSTables

SSTables allow you to store a large amount of key/value data on disk and query it efficiently by key or by key range. Unsurprisingly, this format is at the heart of many NoSQL databases (e.g. HBase and Cassandra).

The flavor implemented in this library favours small keys and large values (e.g. images), since it keeps the key index in memory while the values remain on disk. A fully out-of-core version and secondary indices are currently not implemented. Supporting features such as a bloom filter for faster key look-ups are already in place, so adding them later should not be too difficult.

Writing an SSTable

All files (key index, bloom filter, metadata info) that are necessary to store an SSTable are found under a given basePath in your filesystem. This means we can start writing by creating a directory and appending some key/value pairs.

In the previous section we already saw how to transform a memstore into an SSTable.
This example shows how to stream already sorted data into a file:


path := "/tmp/sstable_example/"
os.MkdirAll(path, 0777)
defer os.RemoveAll(path)

writer, err := sstables.NewSSTableStreamWriter(
    sstables.WriteBasePath(path),
    sstables.WithKeyComparator(skiplist.BytesComparator{}))
if err != nil { log.Fatalf("error: %v", err) }

err = writer.Open()
if err != nil { log.Fatalf("error: %v", err) }

// error checks omitted
err = writer.WriteNext([]byte{1}, []byte{1})
err = writer.WriteNext([]byte{2}, []byte{2})
err = writer.WriteNext([]byte{3}, []byte{3})

err = writer.Close()
if err != nil { log.Fatalf("error: %v", err) }

Keep in mind that streaming data requires a comparator as a safety check: a write whose key is out of order returns an error.

Since that is somewhat cumbersome, you can also directly write a full skip list using the SimpleWriter:

path := "/tmp/sstable_example/"
os.MkdirAll(path, 0777)
defer os.RemoveAll(path)

writer, err := sstables.NewSSTableSimpleWriter(
    sstables.WriteBasePath(path),
    sstables.WithKeyComparator(skiplist.BytesComparator{}))
if err != nil { log.Fatalf("error: %v", err) }

skipListMap := skiplist.NewSkipListMap(skiplist.BytesComparator{})
skipListMap.Insert([]byte{1}, []byte{1})
skipListMap.Insert([]byte{2}, []byte{2})
skipListMap.Insert([]byte{3}, []byte{3})

err = writer.WriteSkipListMap(skipListMap)
if err != nil { log.Fatalf("error: %v", err) }

Reading an SSTable

Reading requires only the path and the respective comparator. The example below shows what metadata is available, how to get values and check whether they exist, and how to do a range scan.

reader, err := sstables.NewSSTableReader(
    sstables.ReadBasePath("/tmp/sstable_example/"),
    sstables.ReadWithKeyComparator(skiplist.BytesComparator{}))
if err != nil { log.Fatalf("error: %v", err) }
defer reader.Close()

metadata := reader.MetaData()
log.Printf("reading table with %d records, minKey %d and maxKey %d", metadata.NumRecords, metadata.MinKey, metadata.MaxKey)

contains := reader.Contains([]byte{1})
val, err := reader.Get([]byte{1})
if err != nil { log.Fatalf("error: %v", err) }
log.Printf("table contains value for key? %t = %d", contains, val)

it, err := reader.ScanRange([]byte{1}, []byte{2})
if err != nil { log.Fatalf("error: %v", err) }
for {
    k, v, err := it.Next()
    // sstables.Done signals that no records are left to be read
    if errors.Is(err, sstables.Done) {
        break
    }
    if err != nil { log.Fatalf("error: %v", err) }

    log.Printf("%d = %d", k, v)
}

You can get the full example from examples/sstables.go.
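The scan loop relies on a sentinel error to mark the end of iteration. The pattern can be tried without the library; in this sketch `errDone`, `sliceIterator` and `drain` are illustrative stand-ins, not part of the package.

```go
package main

import (
	"errors"
	"fmt"
)

// errDone plays the role of a "no more items" sentinel error.
var errDone = errors.New("no more items in iterator")

// sliceIterator is a hypothetical iterator over in-memory key/value pairs.
type sliceIterator struct {
	keys, values [][]byte
	pos          int
}

// Next returns the next pair, or errDone once all pairs were returned.
func (it *sliceIterator) Next() ([]byte, []byte, error) {
	if it.pos >= len(it.keys) {
		return nil, nil, errDone
	}
	k, v := it.keys[it.pos], it.values[it.pos]
	it.pos++
	return k, v, nil
}

// drain consumes the iterator and reports how many pairs it yielded.
func drain(it *sliceIterator) (int, error) {
	n := 0
	for {
		_, _, err := it.Next()
		if errors.Is(err, errDone) {
			return n, nil // exhaustion is the normal way to finish
		}
		if err != nil {
			return n, err // any other error is a real failure
		}
		n++
	}
}

func main() {
	it := &sliceIterator{
		keys:   [][]byte{{1}, {2}},
		values: [][]byte{{10}, {20}},
	}
	n, err := drain(it)
	fmt.Println(n, err)
}
```

Checking the sentinel with errors.Is before the generic error check keeps normal exhaustion separate from real failures, even if the sentinel gets wrapped.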

Merging two (or more) SSTables

One of the great features of SSTables is that you can merge them in linear time and in a sequential fashion, which needs only a constant amount of space.

In this library, a merge can be composed from full-table scanners and a writer that outputs the resulting merged table:

var iterators []sstables.SSTableMergeIteratorContext
for i := 0; i < numFiles; i++ {
    reader, err := sstables.NewSSTableReader(
        sstables.ReadBasePath(sstablePath),
        sstables.ReadWithKeyComparator(skiplist.BytesComparator{}))
    if err != nil { log.Fatalf("error: %v", err) }
    defer reader.Close()

    it, err := reader.Scan()
    if err != nil { log.Fatalf("error: %v", err) }

    iterators = append(iterators, sstables.NewMergeIteratorContext(i, it))
}

// Merge takes an SSTableStreamWriterI, so the output uses a stream writer
outWriter, err := sstables.NewSSTableStreamWriter(
    sstables.WriteBasePath(path),
    sstables.WithKeyComparator(skiplist.BytesComparator{}))
if err != nil { log.Fatalf("error: %v", err) }

merger := sstables.NewSSTableMerger(skiplist.BytesComparator{})
// merge takes care of opening/closing itself
err = merger.Merge(iterators, outWriter)
if err != nil { log.Fatalf("error: %v", err) }

// do something with the merged sstable

The merge logic itself is based on a heap, so it can scale to thousands of files easily.
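For readers unfamiliar with heap-based merging, the following standalone sketch merges several sorted key sequences with container/heap. It illustrates the general technique only; `entry`, `entryHeap` and `mergeSorted` are hypothetical names and the library's internal priority queue may work differently.

```go
package main

import (
	"bytes"
	"container/heap"
	"fmt"
)

// entry is the head element of one input sequence.
type entry struct {
	key    []byte
	source int // index of the input the key came from
	pos    int // position of the key inside that input
}

// entryHeap is a min-heap ordered by key, then by source index.
type entryHeap []entry

func (h entryHeap) Len() int { return len(h) }
func (h entryHeap) Less(i, j int) bool {
	if c := bytes.Compare(h[i].key, h[j].key); c != 0 {
		return c < 0
	}
	return h[i].source < h[j].source
}
func (h entryHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *entryHeap) Push(x any)   { *h = append(*h, x.(entry)) }
func (h *entryHeap) Pop() any {
	old := *h
	last := old[len(old)-1]
	*h = old[:len(old)-1]
	return last
}

// mergeSorted merges already sorted inputs into one sorted output.
// The heap never holds more than one entry per input.
func mergeSorted(inputs [][][]byte) [][]byte {
	h := &entryHeap{}
	for i, in := range inputs {
		if len(in) > 0 {
			heap.Push(h, entry{key: in[0], source: i})
		}
	}
	var out [][]byte
	for h.Len() > 0 {
		e := heap.Pop(h).(entry)
		out = append(out, e.key)
		// Refill the heap from the input that just produced the smallest key.
		if next := e.pos + 1; next < len(inputs[e.source]) {
			heap.Push(h, entry{key: inputs[e.source][next], source: e.source, pos: next})
		}
	}
	return out
}

func main() {
	merged := mergeSorted([][][]byte{
		{{1}, {4}},
		{{2}, {3}},
	})
	fmt.Println(merged)
}
```

With k inputs and n records in total, each record costs one push and one pop on a heap of at most k entries, so the work is O(n log k).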

There might be cases where you want to compact while you are merging the files. This is where MergeCompact comes in handy: you can supply a simple reduce function that compacts the values for a given key. The example below illustrates this functionality:

reduceFunc := func(key []byte, values [][]byte, context []int) ([]byte, []byte) {
    // always pick the first one
    return key, values[0]
}

merger := sstables.NewSSTableMerger(skiplist.BytesComparator{})
err = merger.MergeCompact(iterators, outWriter, reduceFunc)

The context gives you the ability to figure out which value originated from which file/iterator. The context slice is parallel to the values slice, so the value at index 0 originated from the context at index 0.
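To show how the parallel values and context slices reach a reduce function, here is a standalone sketch that groups a key-sorted stream by key and calls the reducer once per distinct key. The `record` type and the `compact` helper are assumptions for illustration; only the shape of the reduce function mirrors the example above.

```go
package main

import (
	"bytes"
	"fmt"
)

// record is a hypothetical merged-stream element tagged with its origin.
type record struct {
	key, value []byte
	context    int
}

type reduceFunc func(key []byte, values [][]byte, context []int) ([]byte, []byte)

// firstWins mirrors the "always pick the first one" reducer.
func firstWins(key []byte, values [][]byte, context []int) ([]byte, []byte) {
	return key, values[0]
}

// compact walks records sorted by key and reduces every run of equal keys
// to a single record.
func compact(sorted []record, reduce reduceFunc) []record {
	var out []record
	for i := 0; i < len(sorted); {
		var values [][]byte
		var contexts []int
		j := i
		for j < len(sorted) && bytes.Equal(sorted[j].key, sorted[i].key) {
			// values[n] and contexts[n] always describe the same record.
			values = append(values, sorted[j].value)
			contexts = append(contexts, sorted[j].context)
			j++
		}
		k, v := reduce(sorted[i].key, values, contexts)
		out = append(out, record{key: k, value: v})
		i = j
	}
	return out
}

func main() {
	in := []record{
		{key: []byte{1}, value: []byte("a"), context: 0},
		{key: []byte{1}, value: []byte("b"), context: 1},
		{key: []byte{2}, value: []byte("c"), context: 0},
	}
	for _, r := range compact(in, firstWins) {
		fmt.Printf("%v = %s\n", r.key, r.value)
	}
}
```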

Documentation

Index

Constants

This section is empty.

Variables

var BloomFileName = "bloom.bf.gz"
var ChecksumErr = ChecksumError{}
var DataFileName = "data.rio"
var Done = errors.New("no more items in iterator")

Done indicates an iterator has returned all items. https://github.com/GoogleCloudPlatform/google-cloud-go/wiki/Iterator-Guidelines

var IndexFileName = "index.rio"
var MetaFileName = "meta.pb.bin"
var NotFound = errors.New("key was not found")
var Version = uint32(1)

Functions

func ScanReduceLatestWins added in v1.3.0

func ScanReduceLatestWins(key []byte, values [][]byte, context []int) ([]byte, []byte)

ScanReduceLatestWins is a simple version of a merge where the latest value always wins. Latest is determined by looping over the context and finding the biggest value denoted by integers (assuming context is actually []int).
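A reducer with this behavior could look like the sketch below. It is written from the description above, not copied from the library; `latestWins` is a hypothetical name and the handling of empty input is an assumption.

```go
package main

import "fmt"

// latestWins returns the value whose context integer is the largest.
// values and context are expected to be parallel slices of equal length.
func latestWins(key []byte, values [][]byte, context []int) ([]byte, []byte) {
	if len(values) == 0 || len(values) != len(context) {
		return key, nil // assumption: nothing sensible to pick from
	}
	best := 0
	for i := range context {
		if context[i] > context[best] {
			best = i
		}
	}
	return key, values[best]
}

func main() {
	_, v := latestWins(
		[]byte{1},
		[][]byte{[]byte("old"), []byte("new"), []byte("mid")},
		[]int{0, 2, 1},
	)
	fmt.Printf("%s\n", v)
}
```

For this to mean "latest", the context integers must grow with recency, for example by numbering the iterators from oldest to newest table.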

Types

type ChecksumError added in v1.5.0

type ChecksumError struct {
	// contains filtered or unexported fields
}

func (ChecksumError) Error added in v1.5.0

func (e ChecksumError) Error() string

func (ChecksumError) Is added in v1.5.0

func (e ChecksumError) Is(err error) bool

type EmptySSTableIterator added in v1.3.0

type EmptySSTableIterator struct{}

func (EmptySSTableIterator) Next added in v1.3.0

func (EmptySSTableIterator) Next() ([]byte, []byte, error)

type EmptySStableReader added in v1.3.0

type EmptySStableReader struct{}

func (EmptySStableReader) BasePath added in v1.3.0

func (EmptySStableReader) BasePath() string

func (EmptySStableReader) Close added in v1.3.0

func (EmptySStableReader) Close() error

func (EmptySStableReader) Contains added in v1.3.0

func (EmptySStableReader) Contains(_ []byte) bool

func (EmptySStableReader) Get added in v1.3.0

func (EmptySStableReader) Get(_ []byte) ([]byte, error)

func (EmptySStableReader) MetaData added in v1.3.0

func (EmptySStableReader) MetaData() *proto.MetaData

func (EmptySStableReader) Scan added in v1.3.0

func (EmptySStableReader) Scan() (SSTableIteratorI, error)

func (EmptySStableReader) ScanRange added in v1.3.0

func (EmptySStableReader) ScanRange(_ []byte, _ []byte) (SSTableIteratorI, error)

func (EmptySStableReader) ScanStartingAt added in v1.3.0

func (EmptySStableReader) ScanStartingAt(_ []byte) (SSTableIteratorI, error)

type MergeCompactionIterator added in v1.3.0

type MergeCompactionIterator struct {
	// contains filtered or unexported fields
}

func (*MergeCompactionIterator) Next added in v1.3.0

func (m *MergeCompactionIterator) Next() ([]byte, []byte, error)

type ReadOption

type ReadOption func(*SSTableReaderOptions)

func EnableHashCheckOnReads added in v1.5.0

func EnableHashCheckOnReads() ReadOption

EnableHashCheckOnReads will check data integrity everywhere the value is retrieved, e.g. when getting and scanning. This is off by default, in favor of checking the data integrity during load time.

func ReadBasePath

func ReadBasePath(p string) ReadOption

func ReadWithKeyComparator

func ReadWithKeyComparator(cmp skiplist.Comparator[[]byte]) ReadOption

func SkipHashCheckOnLoad added in v1.5.0

func SkipHashCheckOnLoad() ReadOption

SkipHashCheckOnLoad will not check hashes against data read from the datafile when loading.

func SkipInvalidHashesOnLoad added in v1.5.0

func SkipInvalidHashesOnLoad() ReadOption

SkipInvalidHashesOnLoad will not index key/value pairs that have a hash mismatch in them. The database will pretend it does not know those records.
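The difference between failing on a hash mismatch and skipping the affected records can be sketched as follows. The checksum algorithm (CRC-32 via hash/crc32), the `storedRecord` type and `loadIndex` are assumptions for illustration; this page does not state which hash the library uses.

```go
package main

import (
	"errors"
	"fmt"
	"hash/crc32"
)

// errChecksum is a hypothetical stand-in for a checksum error.
var errChecksum = errors.New("checksum mismatch")

// storedRecord is a hypothetical record carrying a checksum of its value.
type storedRecord struct {
	key, value []byte
	checksum   uint32
}

func newStoredRecord(key, value []byte) storedRecord {
	return storedRecord{key: key, value: value, checksum: crc32.ChecksumIEEE(value)}
}

// verify recomputes the checksum and compares it with the stored one.
func verify(r storedRecord) error {
	if crc32.ChecksumIEEE(r.value) != r.checksum {
		return fmt.Errorf("%w for key %v", errChecksum, r.key)
	}
	return nil
}

// loadIndex verifies every record. With skipInvalid set, mismatching records
// are left out of the index; otherwise the first mismatch aborts the load.
func loadIndex(records []storedRecord, skipInvalid bool) ([]storedRecord, error) {
	var indexed []storedRecord
	for _, r := range records {
		if err := verify(r); err != nil {
			if skipInvalid {
				continue
			}
			return nil, err
		}
		indexed = append(indexed, r)
	}
	return indexed, nil
}

func main() {
	good := newStoredRecord([]byte{1}, []byte("ok"))
	bad := newStoredRecord([]byte{2}, []byte("ok"))
	bad.value = []byte("corrupted") // simulate on-disk corruption

	kept, err := loadIndex([]storedRecord{good, bad}, true)
	fmt.Println(len(kept), err)
	_, err = loadIndex([]storedRecord{good, bad}, false)
	fmt.Println(err)
}
```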

type ReduceFunc added in v1.5.0

type ReduceFunc func([]byte, [][]byte, []int) ([]byte, []byte)

type SSTableFullScanIterator added in v1.3.0

type SSTableFullScanIterator struct {
	// contains filtered or unexported fields
}

SSTableFullScanIterator is an optimized iterator that does a sequential read over the index and data files, instead of a sequential read on the index with a random access lookup on the data file via mmap.

func (*SSTableFullScanIterator) Next added in v1.3.0

func (it *SSTableFullScanIterator) Next() ([]byte, []byte, error)

type SSTableIterator

type SSTableIterator struct {
	// contains filtered or unexported fields
}

func (*SSTableIterator) Next

func (it *SSTableIterator) Next() ([]byte, []byte, error)

type SSTableIteratorI

type SSTableIteratorI interface {
	// Next returns the next key, value in sequence.
	// Returns Done as the error when the iterator is exhausted
	Next() ([]byte, []byte, error)
}

type SSTableMergeIteratorContext added in v1.4.0

type SSTableMergeIteratorContext struct {
	// contains filtered or unexported fields
}

func NewMergeIteratorContext added in v1.4.0

func NewMergeIteratorContext(context int, iterator SSTableIteratorI) SSTableMergeIteratorContext

func (SSTableMergeIteratorContext) Context added in v1.4.0

func (s SSTableMergeIteratorContext) Context() int

func (SSTableMergeIteratorContext) Next added in v1.4.0

func (s SSTableMergeIteratorContext) Next() ([]byte, []byte, error)

type SSTableMerger added in v1.1.0

type SSTableMerger struct {
	// contains filtered or unexported fields
}

func NewSSTableMerger added in v1.1.0

func NewSSTableMerger(comp skiplist.Comparator[[]byte]) SSTableMerger

func (SSTableMerger) Merge added in v1.1.0

func (m SSTableMerger) Merge(iterators []SSTableMergeIteratorContext, writer SSTableStreamWriterI) (err error)

Merge accepts a slice of sstable iterators to merge into an already opened writer. The caller needs to close the writer.

func (SSTableMerger) MergeCompact added in v1.3.0

func (m SSTableMerger) MergeCompact(iterators []SSTableMergeIteratorContext, writer SSTableStreamWriterI, reduce ReduceFunc) (err error)

MergeCompact accepts a slice of sstable iterators to merge into an already opened writer. The caller needs to close the writer.

func (SSTableMerger) MergeCompactIterator added in v1.3.0

func (m SSTableMerger) MergeCompactIterator(iterators []SSTableMergeIteratorContext, reduce ReduceFunc) (SSTableIteratorI, error)

type SSTableMergerI added in v1.1.0

type SSTableMergerI interface {
	// Merge merges/writes the given Iterators into a single sorted SSTable
	Merge(iterators []SSTableIteratorI, writer SSTableStreamWriterI) error
	// MergeCompact is like merge, but accumulates values for the same key and presents it as a
	// "reduction" function to compact values for the same key.
	// reduce receives a key and a slice of values - it then needs to return a single key and value.
	MergeCompact(iterators []SSTableIteratorI, writer SSTableStreamWriterI,
		reduce func([]byte, [][]byte) ([]byte, []byte)) error
}

type SSTableReader

type SSTableReader struct {
	// contains filtered or unexported fields
}

func (*SSTableReader) BasePath added in v1.3.0

func (reader *SSTableReader) BasePath() string

func (*SSTableReader) Close

func (reader *SSTableReader) Close() (err error)

func (*SSTableReader) Contains

func (reader *SSTableReader) Contains(key []byte) bool

func (*SSTableReader) Get

func (reader *SSTableReader) Get(key []byte) ([]byte, error)

func (*SSTableReader) MetaData

func (reader *SSTableReader) MetaData() *proto.MetaData

func (*SSTableReader) Scan added in v1.1.0

func (reader *SSTableReader) Scan() (SSTableIteratorI, error)

func (*SSTableReader) ScanRange

func (reader *SSTableReader) ScanRange(keyLower []byte, keyHigher []byte) (SSTableIteratorI, error)

func (*SSTableReader) ScanStartingAt

func (reader *SSTableReader) ScanStartingAt(key []byte) (SSTableIteratorI, error)

type SSTableReaderI

type SSTableReaderI interface {
	// Contains returns true when the given key exists, false otherwise
	Contains(key []byte) bool
	// Get returns the value associated with the given key, NotFound as the error otherwise
	Get(key []byte) ([]byte, error)
	// Scan returns an iterator over the whole sorted sequence. Scan uses a more optimized version that iterates the
	// data file sequentially, whereas the other Scan* functions use the index and random access using mmap.
	Scan() (SSTableIteratorI, error)
	// ScanStartingAt returns an iterator over the sorted sequence starting at the given key (inclusive if key is in the list).
	// Using a key that is out of the sequence range will result in either an empty iterator or the full sequence.
	ScanStartingAt(key []byte) (SSTableIteratorI, error)
	// ScanRange returns an iterator over the sorted sequence starting at the given keyLower (inclusive if key is in the list)
	// and until the given keyHigher was reached (inclusive if key is in the list).
	// Using keys that are out of the sequence range will result in either an empty iterator or the full sequence.
	// If keyHigher is lower than keyLower an error will be returned.
	ScanRange(keyLower []byte, keyHigher []byte) (SSTableIteratorI, error)
	// Close closes this sstable reader
	Close() error
	// MetaData returns the metadata of this sstable
	MetaData() *proto.MetaData
	// BasePath returns the base path / root path of this sstable that contains all the files.
	BasePath() string
}
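The inclusive bounds described for ScanRange can be illustrated on a plain sorted slice with sort.Search. This is a sketch of the documented semantics, not the library's index lookup; `scanRange` is a hypothetical helper.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"sort"
)

// scanRange returns all keys k with lower <= k <= higher from a sorted slice.
func scanRange(sortedKeys [][]byte, lower, higher []byte) ([][]byte, error) {
	if bytes.Compare(higher, lower) < 0 {
		return nil, errors.New("keyHigher is lower than keyLower")
	}
	// First index whose key is >= lower (inclusive lower bound).
	start := sort.Search(len(sortedKeys), func(i int) bool {
		return bytes.Compare(sortedKeys[i], lower) >= 0
	})
	// First index whose key is > higher, so higher itself stays included.
	end := sort.Search(len(sortedKeys), func(i int) bool {
		return bytes.Compare(sortedKeys[i], higher) > 0
	})
	return sortedKeys[start:end], nil
}

func main() {
	keys := [][]byte{{1}, {3}, {5}, {7}}

	inRange, err := scanRange(keys, []byte{2}, []byte{5})
	fmt.Println(inRange, err)

	// Bounds outside the stored keys yield nothing or everything.
	none, _ := scanRange(keys, []byte{8}, []byte{9})
	all, _ := scanRange(keys, []byte{0}, []byte{9})
	fmt.Println(len(none), len(all))
}
```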

func NewSSTableReader

func NewSSTableReader(readerOptions ...ReadOption) (SSTableReaderI, error)

NewSSTableReader creates a new reader. The sstable base path and comparator are mandatory:

sstables.NewSSTableReader(sstables.ReadBasePath("some_path"), sstables.ReadWithKeyComparator(some_comp))

This function will check hashes and validity of the datafile matching the index file.

type SSTableReaderOptions

type SSTableReaderOptions struct {
	// contains filtered or unexported fields
}

SSTableReaderOptions contains both read/write options

type SSTableSimpleWriter

type SSTableSimpleWriter struct {
	// contains filtered or unexported fields
}

func NewSSTableSimpleWriter

func NewSSTableSimpleWriter(writerOptions ...WriterOption) (*SSTableSimpleWriter, error)

func (*SSTableSimpleWriter) WriteSkipListMap

func (writer *SSTableSimpleWriter) WriteSkipListMap(skipListMap skiplist.MapI[[]byte, []byte]) (err error)

type SSTableSimpleWriterI

type SSTableSimpleWriterI interface {
	// WriteSkipList writes all records of that SkipList to a sstable disk structure, expects []byte as key and value
	WriteSkipList(skipListMap *skiplist.MapI[[]byte, []byte]) error
}

type SSTableStreamWriter

type SSTableStreamWriter struct {
	// contains filtered or unexported fields
}

func NewSSTableStreamWriter

func NewSSTableStreamWriter(writerOptions ...WriterOption) (*SSTableStreamWriter, error)

NewSSTableStreamWriter creates a new streamed writer. The minimum options required are the base path and the comparator:

sstables.NewSSTableStreamWriter(sstables.WriteBasePath("some_existing_folder"), sstables.WithKeyComparator(some_comparator))

func (*SSTableStreamWriter) Close

func (writer *SSTableStreamWriter) Close() (err error)

func (*SSTableStreamWriter) Open

func (writer *SSTableStreamWriter) Open() error

func (*SSTableStreamWriter) WriteNext

func (writer *SSTableStreamWriter) WriteNext(key []byte, value []byte) error

type SSTableStreamWriterI

type SSTableStreamWriterI interface {
	// Open opens the sstable files.
	Open() error
	// WriteNext writes the next record to a sstable disk structure, expects keys to be ordered.
	WriteNext(key []byte, value []byte) error
	// Close closes the sstable files.
	Close() error
}

type SSTableWriterOptions

type SSTableWriterOptions struct {
	// contains filtered or unexported fields
}

type SuperSSTableReader added in v1.3.0

type SuperSSTableReader struct {
	// contains filtered or unexported fields
}

SuperSSTableReader unifies several sstables under one single reader with the same interface. The ordering of the readers matters: it is assumed that older readers come before newer ones (ascending order).
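One plausible reason the ordering matters is that a newer table should shadow an older one when both contain the same key. The sketch below shows that lookup strategy on plain maps; it is an assumption about the intent, not the library's implementation, and `table`, `get` and `errNotFound` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("key was not found")

// table is a hypothetical stand-in for a single sstable reader.
type table map[string]string

// get searches the tables from newest (last) to oldest (first) and returns
// the first hit, so newer tables shadow older ones.
func get(tables []table, key string) (string, error) {
	for i := len(tables) - 1; i >= 0; i-- {
		if v, ok := tables[i][key]; ok {
			return v, nil
		}
	}
	return "", errNotFound
}

func main() {
	tables := []table{
		{"a": "old", "b": "only in old"}, // older table first
		{"a": "new"},                     // newer table last
	}
	fmt.Println(get(tables, "a"))
	fmt.Println(get(tables, "b"))
	fmt.Println(get(tables, "c"))
}
```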

func NewSuperSSTableReader added in v1.3.0

func NewSuperSSTableReader(readers []SSTableReaderI, comp skiplist.Comparator[[]byte]) *SuperSSTableReader

func (SuperSSTableReader) BasePath added in v1.3.0

func (s SuperSSTableReader) BasePath() string

func (SuperSSTableReader) Close added in v1.3.0

func (s SuperSSTableReader) Close() (err error)

func (SuperSSTableReader) Contains added in v1.3.0

func (s SuperSSTableReader) Contains(key []byte) bool

func (SuperSSTableReader) Get added in v1.3.0

func (s SuperSSTableReader) Get(key []byte) ([]byte, error)

func (SuperSSTableReader) MetaData added in v1.3.0

func (s SuperSSTableReader) MetaData() *proto.MetaData

func (SuperSSTableReader) Scan added in v1.3.0

func (s SuperSSTableReader) Scan() (SSTableIteratorI, error)

func (SuperSSTableReader) ScanRange added in v1.3.0

func (s SuperSSTableReader) ScanRange(keyLower []byte, keyHigher []byte) (SSTableIteratorI, error)

func (SuperSSTableReader) ScanStartingAt added in v1.3.0

func (s SuperSSTableReader) ScanStartingAt(key []byte) (SSTableIteratorI, error)

type V0SSTableFullScanIterator added in v1.3.0

type V0SSTableFullScanIterator struct {
	// contains filtered or unexported fields
}

V0SSTableFullScanIterator is deprecated, since it is for the v0 protobuf-based sstables. It is an optimized iterator that does a sequential read over the index and data files, instead of a sequential read on the index with a random access lookup on the data file via mmap.

func (*V0SSTableFullScanIterator) Next added in v1.3.0

func (it *V0SSTableFullScanIterator) Next() ([]byte, []byte, error)

type WriterOption

type WriterOption func(*SSTableWriterOptions)

func BloomExpectedNumberOfElements

func BloomExpectedNumberOfElements(n uint64) WriterOption

func BloomFalsePositiveProbability

func BloomFalsePositiveProbability(fpProbability float64) WriterOption
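The two bloom filter options correspond to the inputs of the textbook sizing formulas: for n expected elements and a target false positive probability p, the optimal number of bits is m = -n ln(p) / (ln 2)^2 and the optimal number of hash functions is k = (m / n) ln 2. The sketch below evaluates them; whether the library sizes its filter exactly this way is an assumption, and `bloomParameters` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"math"
)

// bloomParameters returns the textbook-optimal number of bits and hash
// functions for n expected elements and false positive probability p.
// It returns zeros for inputs outside the valid range.
func bloomParameters(n uint64, p float64) (bits uint64, hashes uint64) {
	if n == 0 || p <= 0 || p >= 1 {
		return 0, 0
	}
	m := math.Ceil(-float64(n) * math.Log(p) / (math.Ln2 * math.Ln2))
	k := math.Round(m / float64(n) * math.Ln2)
	if k < 1 {
		k = 1
	}
	return uint64(m), uint64(k)
}

func main() {
	bits, hashes := bloomParameters(1000, 0.01)
	fmt.Printf("bits=%d (%.1f per element), hashes=%d\n",
		bits, float64(bits)/1000, hashes)
}
```

A lower false positive probability costs more bits per element, so the two options trade memory and disk space for fewer unnecessary look-ups.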

func DataCompressionType

func DataCompressionType(p int) WriterOption

func EnableBloomFilter

func EnableBloomFilter() WriterOption

func IndexCompressionType

func IndexCompressionType(p int) WriterOption

func WithKeyComparator

func WithKeyComparator(cmp skiplist.Comparator[[]byte]) WriterOption

func WriteBasePath

func WriteBasePath(p string) WriterOption

func WriteBufferSizeBytes added in v1.3.0

func WriteBufferSizeBytes(bufSizeBytes int) WriterOption
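WriterOption and ReadOption follow Go's functional options pattern: each option is a function that mutates an options struct, and the constructor applies them in order. The sketch below reproduces the pattern in isolation; all names, the default buffer size and the validation rule are hypothetical and not taken from the library.

```go
package main

import (
	"errors"
	"fmt"
)

// writerOptions is a hypothetical options struct.
type writerOptions struct {
	basePath        string
	bufferSizeBytes int
	bloomEnabled    bool
}

// writerOption mutates the options, mirroring the shape of WriterOption.
type writerOption func(*writerOptions)

func withBasePath(p string) writerOption {
	return func(o *writerOptions) { o.basePath = p }
}

func withBufferSizeBytes(n int) writerOption {
	return func(o *writerOptions) { o.bufferSizeBytes = n }
}

func withBloomFilter() writerOption {
	return func(o *writerOptions) { o.bloomEnabled = true }
}

// newWriterOptions starts from defaults, applies the options in order and
// validates the mandatory settings afterwards.
func newWriterOptions(opts ...writerOption) (*writerOptions, error) {
	o := &writerOptions{bufferSizeBytes: 4096} // hypothetical default
	for _, opt := range opts {
		opt(o)
	}
	if o.basePath == "" {
		return nil, errors.New("base path is mandatory")
	}
	return o, nil
}

func main() {
	o, err := newWriterOptions(withBasePath("/tmp/sstable_example/"), withBloomFilter())
	fmt.Printf("%+v %v\n", o, err)

	_, err = newWriterOptions(withBufferSizeBytes(1024))
	fmt.Println(err)
}
```

The pattern lets a constructor gain new settings without changing its signature, which fits an API where only a couple of options are mandatory.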
