Documentation ¶
Overview ¶
LSMKV (= Log-structured Merge-Tree Key-Value Store) ¶
This package contains Weaviate's custom LSM store. While it is modeled after the use cases Weaviate requires to be fast, reliable, and scalable, it is technically completely independent. You could build your own database on top of this key-value store.
Covering the architecture of LSM stores in general is beyond the scope of this documentation. Instead, it highlights the things that are specific to this implementation.
Strategies ¶
To understand the different types of buckets in this store, you need to familiarize yourself with the following strategies. Each strategy defines a different use case for a Bucket.
"Replace"
Replace resembles the classical key-value store. Each key has exactly one value. A subsequent PUT on an existing key replaces the value (hence the name "replace"). Once replaced, a former value can no longer be retrieved and will eventually be removed in compactions.
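To make the "latest write wins" behaviour concrete across segments, here is a minimal Go sketch. It assumes a drastically simplified model in which each segment is an in-memory map and segments are ordered oldest to newest; the names `segment`, `getReplace`, and `compactReplace` are hypothetical and not part of this package.

```go
package main

import "fmt"

// segment is a hypothetical, simplified stand-in for one immutable
// on-disk segment: a sorted file in the real store, a plain map here.
type segment map[string]string

// getReplace resolves a key under Replace semantics: segments are ordered
// oldest to newest, and the newest segment containing the key wins.
func getReplace(segments []segment, key string) (string, bool) {
	for i := len(segments) - 1; i >= 0; i-- {
		if v, ok := segments[i][key]; ok {
			return v, true
		}
	}
	return "", false
}

// compactReplace merges segments into one, keeping only the latest value
// per key. Superseded values are dropped here, not at write time.
func compactReplace(segments []segment) segment {
	out := segment{}
	for _, seg := range segments { // oldest first, so newer values overwrite
		for k, v := range seg {
			out[k] = v
		}
	}
	return out
}

func main() {
	segs := []segment{
		{"foo": "v1", "bar": "x"}, // older segment
		{"foo": "v2"},             // newer segment
	}
	v, _ := getReplace(segs, "foo")
	fmt.Println(v)                         // v2
	fmt.Println(len(compactReplace(segs))) // 2
}
```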
"Set" (aka "SetCollection")
A set behaves like an unordered collection of independent values. In other words, a single key has multiple values. For example, for key "foo", you could have values "bar1", "bar2", "bazzinga". A bucket of this type is optimized for cheap writes that add new set elements: adding another element has a fixed cost, independent of the existing set's length. This makes it very well suited for building an inverted index.
Retrieving a Set has a slight cost if the set is spread across multiple segments. This cost decreases as more and more compactions happen. In the ideal case (a fully compacted DB), retrieving a Set requires just a single disk read.
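The write/read cost asymmetry can be illustrated with a minimal sketch, assuming each segment is a plain map from key to the elements written while that segment was active. `setSegment`, `setAdd`, and `setList` are hypothetical names, and deletions (tombstones) are ignored for brevity.

```go
package main

import "fmt"

// setSegment is a hypothetical simplification of one segment for the Set
// strategy: each key holds only the elements written to that segment.
type setSegment map[string][]string

// setAdd appends elements to the newest segment only. The cost depends on
// the number of new elements, not on how large the set already is.
func setAdd(segments []setSegment, key string, values ...string) {
	newest := segments[len(segments)-1]
	newest[key] = append(newest[key], values...)
}

// setList must visit every segment that may hold part of the set, which
// is why reads get cheaper as compaction reduces the segment count.
func setList(segments []setSegment, key string) []string {
	var out []string
	for _, seg := range segments {
		out = append(out, seg[key]...)
	}
	return out
}

func main() {
	segs := []setSegment{{"foo": {"bar1"}}, {}}
	setAdd(segs, "foo", "bar2", "bazzinga")
	fmt.Println(setList(segs, "foo")) // [bar1 bar2 bazzinga]
}
```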
"Map" (aka "MapCollection")
Maps are similar to Sets in the sense that a single key has multiple values. However, each value is itself a key-value pair, which makes this type very similar to a dict or hashmap. For example, for key "foo", you could have the value pairs "bar":17 and "baz":19.
This makes a map a great fit for an inverted index that needs to store additional information beyond the docid pointer, such as a BM25 index, where the term frequency needs to be stored.
The same performance considerations as for Sets apply.
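For the BM25 case, the row key would be a term and each map pair would hold a doc ID and its term frequency. The sketch below assumes an 8-byte big-endian doc ID and a 4-byte big-endian frequency; that layout, `mapPair`, `postingPair`, and `mergeMap` are illustrative assumptions, not the package's on-disk format or API. Pairs from newer segments override older pairs with the same map key.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// mapPair mirrors the idea of a MapPair: a key-value pair stored under a
// row key. This type is a hypothetical stand-in, not lsmkv.MapPair.
type mapPair struct {
	Key   []byte
	Value []byte
}

// postingPair encodes one inverted-index posting: map key = doc ID,
// value = term frequency (encoding chosen only for illustration).
func postingPair(docID uint64, termFreq uint32) mapPair {
	k := make([]byte, 8)
	binary.BigEndian.PutUint64(k, docID)
	v := make([]byte, 4)
	binary.BigEndian.PutUint32(v, termFreq)
	return mapPair{Key: k, Value: v}
}

// mergeMap combines one row's pairs from several segments (oldest first);
// a later pair with the same map key replaces an earlier one.
func mergeMap(segments ...[]mapPair) map[uint64]uint32 {
	out := map[uint64]uint32{}
	for _, seg := range segments {
		for _, p := range seg {
			out[binary.BigEndian.Uint64(p.Key)] = binary.BigEndian.Uint32(p.Value)
		}
	}
	return out
}

func main() {
	older := []mapPair{postingPair(1, 3), postingPair(2, 1)}
	newer := []mapPair{postingPair(2, 5)} // doc 2 was re-indexed
	fmt.Println(mergeMap(older, newer))   // map[1:3 2:5]
}
```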
Navigate around these docs ¶
Good entry points to learn more about how this package works include Store with New and Store.CreateOrLoadBucket, as well as Bucket with Bucket.Get, Bucket.GetBySecondary, Bucket.Put, etc.
Each strategy also supports cursor types: CursorReplace can be created using Bucket.Cursor, CursorSet can be created with Bucket.SetCursor, and CursorMap can be created with Bucket.MapCursor.
LSMKV is Weaviate's basic data store.
Index ¶
- Constants
- Variables
- func CheckExpectedStrategy(strategy string, expectedStrategies ...string) error
- func CheckStrategyRoaringSet(strategy string) error
- func CheckStrategyRoaringSetRange(strategy string) error
- func IsExpectedStrategy(strategy string, expectedStrategies ...string) bool
- func MustBeExpectedStrategy(strategy string, expectedStrategies ...string)
- func NewNullLogger() *logrus.Logger
- func ParseCollectionNode(r io.Reader) (segmentCollectionNode, error)
- func ParseCollectionNodeInto(r io.Reader, node *segmentCollectionNode) error
- func ParseReplaceNode(r io.Reader, secondaryIndexCount uint16) (segmentReplaceNode, error)
- func ParseReplaceNodeIntoMMAP(r *byteops.ReadWriter, secondaryIndexCount uint16, out *segmentReplaceNode) error
- func ParseReplaceNodeIntoPread(r io.Reader, secondaryIndexCount uint16, out *segmentReplaceNode) (err error)
- func SegmentStrategyFromString(in string) segmentindex.Strategy
- type Bucket
- func (b *Bucket) Count() int
- func (b *Bucket) CountAsync() int
- func (b *Bucket) Cursor() *CursorReplace
- func (b *Bucket) CursorRoaringSet() CursorRoaringSet
- func (b *Bucket) CursorRoaringSetKeyOnly() CursorRoaringSet
- func (b *Bucket) CursorRoaringSetRange() CursorRoaringSetRange
- func (b *Bucket) CursorWithSecondaryIndex(pos int) *CursorReplace
- func (b *Bucket) Delete(key []byte, opts ...SecondaryKeyOption) error
- func (b *Bucket) DesiredStrategy() string
- func (b *Bucket) FlushAndSwitch() error
- func (b *Bucket) FlushMemtable() error
- func (b *Bucket) Get(key []byte) ([]byte, error)
- func (b *Bucket) GetBySecondary(pos int, key []byte) ([]byte, error)
- func (b *Bucket) GetBySecondaryIntoMemory(pos int, key []byte, buffer []byte) ([]byte, []byte, error)
- func (b *Bucket) GetBySecondaryWithBuffer(pos int, key []byte, buf []byte) ([]byte, []byte, error)
- func (b *Bucket) GetDesiredStrategy() string
- func (b *Bucket) GetDir() string
- func (b *Bucket) GetErrDeleted(key []byte) ([]byte, error)
- func (b *Bucket) GetFlushCallbackCtrl() cyclemanager.CycleCallbackCtrl
- func (b *Bucket) GetMemtableThreshold() uint64
- func (b *Bucket) GetRootDir() string
- func (b *Bucket) GetSecondaryIndices() uint16
- func (b *Bucket) GetStatus() storagestate.Status
- func (b *Bucket) GetStrategy() string
- func (b *Bucket) GetWalThreshold() uint64
- func (b *Bucket) IterateMapObjects(ctx context.Context, f func([]byte, []byte, []byte, bool) error) error
- func (b *Bucket) IterateObjects(ctx context.Context, f func(object *storobj.Object) error) error
- func (b *Bucket) ListFiles(ctx context.Context, basePath string) ([]string, error)
- func (b *Bucket) MapCursor(cfgs ...MapListOption) *CursorMap
- func (b *Bucket) MapCursorKeyOnly(cfgs ...MapListOption) *CursorMap
- func (b *Bucket) MapDeleteKey(rowKey, mapKey []byte) error
- func (b *Bucket) MapList(ctx context.Context, key []byte, cfgs ...MapListOption) ([]MapPair, error)
- func (b *Bucket) MapSet(rowKey []byte, kv MapPair) error
- func (b *Bucket) MapSetMulti(rowKey []byte, kvs []MapPair) error
- func (*Bucket) NewBucket(ctx context.Context, dir, rootDir string, logger logrus.FieldLogger, ...) (*Bucket, error)
- func (b *Bucket) Put(key, value []byte, opts ...SecondaryKeyOption) error
- func (b *Bucket) QuantileKeys(q int) [][]byte
- func (b *Bucket) RoaringSetAddBitmap(key []byte, bm *sroar.Bitmap) error
- func (b *Bucket) RoaringSetAddList(key []byte, values []uint64) error
- func (b *Bucket) RoaringSetAddOne(key []byte, value uint64) error
- func (b *Bucket) RoaringSetGet(key []byte) (*sroar.Bitmap, error)
- func (b *Bucket) RoaringSetRangeAdd(key uint64, values ...uint64) error
- func (b *Bucket) RoaringSetRangeRemove(key uint64, values ...uint64) error
- func (b *Bucket) RoaringSetRemoveOne(key []byte, value uint64) error
- func (b *Bucket) SetAdd(key []byte, values [][]byte) error
- func (b *Bucket) SetCursor() *CursorSet
- func (b *Bucket) SetCursorKeyOnly() *CursorSet
- func (b *Bucket) SetDeleteSingle(key []byte, valueToDelete []byte) error
- func (b *Bucket) SetList(key []byte) ([][]byte, error)
- func (b *Bucket) SetMemtableThreshold(size uint64)
- func (b *Bucket) Shutdown(ctx context.Context) error
- func (b *Bucket) Strategy() string
- func (b *Bucket) UpdateStatus(status storagestate.Status)
- func (b *Bucket) WasDeleted(key []byte) (bool, error)
- func (b *Bucket) WriteWAL() error
- type BucketCreator
- type BucketOption
- func WithAllocChecker(mm memwatch.AllocChecker) BucketOption
- func WithCalcCountNetAdditions(calcCountNetAdditions bool) BucketOption
- func WithDirtyThreshold(threshold time.Duration) BucketOption
- func WithDynamicMemtableSizing(initialMB, maxMB, minActiveSeconds, maxActiveSeconds int) BucketOption
- func WithForceCompation(opt bool) BucketOption
- func WithKeepTombstones(keepTombstones bool) BucketOption
- func WithLegacyMapSorting() BucketOption
- func WithMaxSegmentSize(maxSegmentSize int64) BucketOption
- func WithMemtableThreshold(threshold uint64) BucketOption
- func WithMonitorCount() BucketOption
- func WithPread(with bool) BucketOption
- func WithSecondaryIndices(count uint16) BucketOption
- func WithStrategy(strategy string) BucketOption
- func WithUseBloomFilter(useBloomFilter bool) BucketOption
- func WithWalThreshold(threshold uint64) BucketOption
- type BucketReaderRoaringSetRange
- type CommitType
- type CursorMap
- type CursorReplace
- type CursorRoaringSet
- type CursorRoaringSetRange
- type CursorSet
- type MapListOption
- type MapListOptionConfig
- type MapPair
- type Memtable
- type ReplaceableBucket
- func (b *ReplaceableBucket) Compact()
- func (b *ReplaceableBucket) Count() int
- func (b *ReplaceableBucket) Delete(key []byte) error
- func (b *ReplaceableBucket) Exists(key []byte) bool
- func (b *ReplaceableBucket) Flush()
- func (b *ReplaceableBucket) Get(key []byte) ([]byte, error)
- func (b *ReplaceableBucket) Put(key []byte, value []byte) error
- func (b *ReplaceableBucket) Range(f func(object *storobj.Object) error) error
- func (b *ReplaceableBucket) Set(key []byte, value []byte) error
- func (b *ReplaceableBucket) Shutdown(ctx context.Context) error
- type RoaringBucket
- func (b *RoaringBucket) AddListToSet(key []byte, values []uint64) error
- func (b *RoaringBucket) AddToSet(key []byte, value uint64) error
- func (b *RoaringBucket) Compact()
- func (b *RoaringBucket) Count() int
- func (b *RoaringBucket) Cursor() CursorRoaringSet
- func (b *RoaringBucket) Delete(key []byte) error
- func (b *RoaringBucket) Exists(key []byte) bool
- func (b *RoaringBucket) Flush()
- func (b *RoaringBucket) Get(key []byte) (*sroar.Bitmap, error)
- func (b *RoaringBucket) Put(key []byte, bm *sroar.Bitmap) error
- func (b *RoaringBucket) Range(f func(key []byte, bm *sroar.Bitmap) error) error
- func (b *RoaringBucket) RemoveFromSet(key []byte, value uint64) error
- func (b *RoaringBucket) Set(key []byte, bm *sroar.Bitmap) error
- func (b *RoaringBucket) Shutdown(ctx context.Context) error
- type SecondaryKeyOption
- type SegmentGroup
- type Store
- func (s *Store) Bucket(name string) *Bucket
- func (s *Store) CreateBucket(ctx context.Context, bucketName string, opts ...BucketOption) error
- func (s *Store) CreateOrLoadBucket(ctx context.Context, bucketName string, opts ...BucketOption) (err error)
- func (s *Store) FlushMemtables(ctx context.Context) error
- func (s *Store) GetBucketsByName() map[string]*Bucket
- func (s *Store) ListFiles(ctx context.Context, basePath string) ([]string, error)
- func (s *Store) PauseCompaction(ctx context.Context) error
- func (s *Store) RenameBucket(ctx context.Context, bucketName, newBucketName string) error
- func (s *Store) ReplaceBuckets(ctx context.Context, bucketName, replacementBucketName string) error
- func (s *Store) ResumeCompaction(ctx context.Context) error
- func (s *Store) Shutdown(ctx context.Context) error
- func (s *Store) UpdateBucketsStatus(targetStatus storagestate.Status) error
- func (s *Store) WriteWALs() error
Constants ¶
    const (
        // StrategyReplace allows for idempotent PUT where the latest write takes precedence
        StrategyReplace         = "replace"
        StrategySetCollection   = "setcollection"
        StrategyMapCollection   = "mapcollection"
        StrategyRoaringSet      = "roaringset"
        StrategyRoaringSetRange = "roaringsetrange"
    )
const CurrentVersion uint8 = 1
const FlushAfterDirtyDefault = 60 * time.Second
Variables ¶
var ErrAlreadyClosed = errors.New("store already closed")
var ErrBucketAlreadyRegistered = errors.New("bucket already registered")
var ErrInvalidChecksum = errors.New("invalid checksum")
ErrInvalidChecksum indicates that the read file should not be trusted. For any pre-computed data this is a recoverable issue, as the data can simply be re-computed at read-time.
var GlobalBucketRegistry *globalBucketRegistry
Functions ¶
func CheckExpectedStrategy ¶
func CheckStrategyRoaringSet ¶
func IsExpectedStrategy ¶
func MustBeExpectedStrategy ¶
func NewNullLogger ¶
NewNullLogger creates a discarding logger and installs the test hook.
func ParseCollectionNode ¶
ParseCollectionNode reads from r and parses the collection values into a segmentCollectionNode.
When only given an offset, r is constructed as a *bufio.Reader to avoid first reading the entire segment (which could be GBs). Each consecutive read is buffered to avoid excessive syscalls.
When we already have a finite and manageable []byte (i.e. when we have already located an lsmkv node and know its start and end offsets), r should be constructed as a *bytes.Reader, since the contents have already been `pread` from the segment contentFile.
func ParseCollectionNodeInto ¶
ParseCollectionNodeInto takes the []byte slice and parses it into the specified node. It does not perform any copies and the caller must be aware that memory may be shared between the two. As a result, the caller must make sure that they do not modify "in" while "node" is still in use. A safer alternative is to use ParseCollectionNode.
The primary intention of this function is to provide a way to reuse buffers when the lifetime is controlled tightly, for example in cursors used within compactions. Use at your own risk!
If the buffers of the provided node have enough capacity, they are reused. Only if the capacity is insufficient does an allocation occur. This allocation adds 25% overhead to avoid future allocations for nodes of similar size.
As a result, calling this method only makes sense if you plan on calling it multiple times. Calling it just once on an uninitialized node has no major advantage over calling ParseCollectionNode.
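The capacity rule described above fits in a few lines. This is a minimal sketch of the growth policy only; `reuseOrGrow` is a hypothetical helper, not the package's code.

```go
package main

import "fmt"

// reuseOrGrow returns a slice of length n, reusing buf when its capacity
// suffices. Otherwise it allocates with 25% extra capacity so that nodes
// of similar size parsed later can reuse the new buffer.
func reuseOrGrow(buf []byte, n int) []byte {
	if cap(buf) >= n {
		return buf[:n]
	}
	return make([]byte, n, n+n/4)
}

func main() {
	var buf []byte
	buf = reuseOrGrow(buf, 100) // allocates: len 100, cap 125
	first := &buf[0]
	buf = reuseOrGrow(buf, 120) // fits in cap 125: no allocation
	fmt.Println(len(buf), cap(buf), first == &buf[0]) // 120 125 true
}
```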
func ParseReplaceNode ¶
func ParseReplaceNodeIntoMMAP ¶
func ParseReplaceNodeIntoMMAP(r *byteops.ReadWriter, secondaryIndexCount uint16, out *segmentReplaceNode) error
func SegmentStrategyFromString ¶
func SegmentStrategyFromString(in string) segmentindex.Strategy
Types ¶
type Bucket ¶
type Bucket struct {
// contains filtered or unexported fields
}
func MustNewBucket ¶
func MustNewBucket(ctx context.Context, dir, rootDir string, logger logrus.FieldLogger, compactionCallbacks, flushCallbacks cyclemanager.CycleCallbackGroup, opts ...BucketOption) *Bucket
func NewBucketCreator ¶
func NewBucketCreator() *Bucket
func (*Bucket) CountAsync ¶
CountAsync ignores the current memtable, which makes it asynchronous: it only reflects what has already been flushed. This in turn makes it very cheap to call, so it can be used for observability purposes where eventual consistency of the count is acceptable but a large cost is not.
func (*Bucket) Cursor ¶
func (b *Bucket) Cursor() *CursorReplace
Cursor holds an RLock on the flushing state. It must be closed using its .Close() method; otherwise the lock is never released.
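The locking contract can be modelled with a `sync.RWMutex`: each open cursor holds a read lock, and a flush needs the write lock. The types below (`bucketSketch`, `cursorSketch`) are a hypothetical sketch, not the package's internals. In real code, calling `defer c.Close()` right after creating a cursor is the usual way to guarantee release.

```go
package main

import (
	"fmt"
	"sync"
)

// bucketSketch is a hypothetical bucket whose flush lock is shared with
// open cursors.
type bucketSketch struct {
	flushLock sync.RWMutex
	keys      []string
}

type cursorSketch struct {
	b   *bucketSketch
	pos int
}

// cursor takes a read lock that is held until Close is called, so a flush
// (which needs the write lock) waits while any cursor is open.
func (b *bucketSketch) cursor() *cursorSketch {
	b.flushLock.RLock()
	return &cursorSketch{b: b}
}

func (c *cursorSketch) next() (string, bool) {
	if c.pos >= len(c.b.keys) {
		return "", false
	}
	c.pos++
	return c.b.keys[c.pos-1], true
}

// Close releases the read lock; forgetting it blocks flushing forever.
func (c *cursorSketch) Close() { c.b.flushLock.RUnlock() }

// flush needs exclusive access, mirroring a memtable switch.
func (b *bucketSketch) flush() {
	b.flushLock.Lock()
	defer b.flushLock.Unlock()
}

func main() {
	b := &bucketSketch{keys: []string{"a", "b"}}
	c := b.cursor()
	for k, ok := c.next(); ok; k, ok = c.next() {
		fmt.Println(k)
	}
	c.Close() // without this, the flush below would deadlock
	b.flush()
}
```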
func (*Bucket) CursorRoaringSet ¶
func (b *Bucket) CursorRoaringSet() CursorRoaringSet
func (*Bucket) CursorRoaringSetKeyOnly ¶
func (b *Bucket) CursorRoaringSetKeyOnly() CursorRoaringSet
func (*Bucket) CursorRoaringSetRange ¶
func (b *Bucket) CursorRoaringSetRange() CursorRoaringSetRange
func (*Bucket) CursorWithSecondaryIndex ¶
func (b *Bucket) CursorWithSecondaryIndex(pos int) *CursorReplace
CursorWithSecondaryIndex holds an RLock on the flushing state. It must be closed using its .Close() method; otherwise the lock is never released.
func (*Bucket) Delete ¶
func (b *Bucket) Delete(key []byte, opts ...SecondaryKeyOption) error
Delete removes the given row. Note that LSM stores are append only, thus internally this action appends a tombstone. The entry will not be removed until a compaction has run, and even then a compaction does not guarantee the removal of the data right away. This is because an entry could have been created in an older segment than those present in the compaction. This can be seen as an implementation detail, unless the caller expects to free disk space by calling this method. Such freeing is not guaranteed.
Delete is specific to the Replace Strategy. For Maps, you can use Bucket.MapDeleteKey to delete a single key-value pair, for Sets use Bucket.SetDeleteSingle to delete a single set element.
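Why a tombstone must survive a compaction that does not include the oldest segments can be shown with a small model. The sketch assumes each segment is a map from key to either a value or a tombstone; `entry`, `seg`, `get`, and `compactPair` are hypothetical names.

```go
package main

import "fmt"

// entry is a hypothetical record in an append-only segment: either a
// value or a tombstone marking the key as deleted.
type entry struct {
	value     string
	tombstone bool
}

type seg map[string]entry

// get scans from newest to oldest; the first record found decides.
func get(segments []seg, key string) (string, bool) {
	for i := len(segments) - 1; i >= 0; i-- {
		if e, ok := segments[i][key]; ok {
			if e.tombstone {
				return "", false
			}
			return e.value, true
		}
	}
	return "", false
}

// compactPair merges two adjacent segments. The tombstone is kept because
// a segment older than both may still hold the key; dropping it here
// would resurrect that older value.
func compactPair(older, newer seg) seg {
	out := seg{}
	for k, e := range older {
		out[k] = e
	}
	for k, e := range newer {
		out[k] = e
	}
	return out
}

func main() {
	segs := []seg{
		{"foo": {value: "v1"}},     // oldest; not part of the compaction
		{"bar": {value: "x"}},
		{"foo": {tombstone: true}}, // Delete appended a tombstone
	}
	merged := compactPair(segs[1], segs[2])
	segs = []seg{segs[0], merged}
	_, found := get(segs, "foo")
	fmt.Println(found) // false: still deleted, yet "v1" still occupies disk
}
```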
func (*Bucket) DesiredStrategy ¶
func (*Bucket) FlushAndSwitch ¶
FlushAndSwitch is typically called periodically and does not require manual calling, but there are some situations where this might be intended, such as in test scenarios or when a force flush is desired.
func (*Bucket) FlushMemtable ¶
FlushMemtable flushes any active memtable and returns only once the memtable has been fully flushed and a stable state on disk has been reached.
This is a preparatory stage for creating backups.
This method should only be run if flushCycle is not running (it was never started, it is stopped, or a no-op implementation is provided).
func (*Bucket) Get ¶
Get retrieves the single value for the given key.
Get is specific to ReplaceStrategy and cannot be used with any of the other strategies. Use Bucket.SetList or Bucket.MapList instead.
Get uses the regular or "primary" key for an object. If a bucket has secondary indexes, use Bucket.GetBySecondary to retrieve an object using its secondary key.
func (*Bucket) GetBySecondary ¶
GetBySecondary retrieves an object using one of its secondary keys. A bucket can have multiple secondary keys (see WithSecondaryIndices). Specify the secondary key position as the first argument.
A real-life example of secondary keys is the Weaviate object store. Objects are stored with the user-facing ID as their primary key and with the doc-id (an ever-increasing uint64) as the secondary key.
Similar to Bucket.Get, GetBySecondary is limited to ReplaceStrategy. No equivalent exists for Set and Map, as those do not support secondary indexes.
func (*Bucket) GetBySecondaryIntoMemory ¶
func (b *Bucket) GetBySecondaryIntoMemory(pos int, key []byte, buffer []byte) ([]byte, []byte, error)
GetBySecondaryIntoMemory retrieves an object using one of its secondary keys and copies it into the specified memory. A bucket can have multiple secondary keys (see WithSecondaryIndices). Specify the secondary key position as the first argument.
A real-life example of secondary keys is the Weaviate object store. Objects are stored with the user-facing ID as their primary key and with the doc-id (an ever-increasing uint64) as the secondary key.
Similar to Bucket.Get, GetBySecondaryIntoMemory is limited to ReplaceStrategy. No equivalent exists for Set and Map, as those do not support secondary indexes.
func (*Bucket) GetBySecondaryWithBuffer ¶
GetBySecondaryWithBuffer is like Bucket.GetBySecondary, but also takes a buffer. It is the caller's responsibility to pool the buffer, since the bucket does not know when the caller is done using it. The returned bytes will likely point to the same memory that is part of the buffer. However, if the buffer is too small, a larger buffer may be returned instead (second return value).
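Caller-side pooling might look like the following minimal sketch, which uses `sync.Pool` from the standard library. `readWithBuffer` is a hypothetical stand-in with the same value/buffer return shape as GetBySecondaryWithBuffer; it does not call the real method.

```go
package main

import (
	"fmt"
	"sync"
)

// readWithBuffer is a hypothetical stand-in: it copies the stored value
// into buf and returns the value plus the buffer that now backs it. If
// buf is too small, a larger buffer is allocated and returned instead.
func readWithBuffer(stored, buf []byte) (value, backing []byte) {
	if cap(buf) < len(stored) {
		buf = make([]byte, len(stored))
	}
	buf = buf[:len(stored)]
	copy(buf, stored)
	return buf, buf
}

// The caller owns pooling: only the caller knows when the value is no
// longer in use. Pointers to slices avoid an allocation on Put.
var bufPool = sync.Pool{New: func() any { b := make([]byte, 0, 16); return &b }}

func main() {
	stored := []byte("a value longer than sixteen bytes")

	bp := bufPool.Get().(*[]byte)
	value, backing := readWithBuffer(stored, *bp)
	fmt.Println(string(value))

	// Return the (possibly grown) buffer only once value is no longer used.
	*bp = backing[:0]
	bufPool.Put(bp)
}
```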
func (*Bucket) GetDesiredStrategy ¶
func (*Bucket) GetFlushCallbackCtrl ¶
func (b *Bucket) GetFlushCallbackCtrl() cyclemanager.CycleCallbackCtrl
func (*Bucket) GetMemtableThreshold ¶
func (*Bucket) GetRootDir ¶
func (*Bucket) GetSecondaryIndices ¶
func (*Bucket) GetStatus ¶
func (b *Bucket) GetStatus() storagestate.Status
func (*Bucket) GetStrategy ¶
func (*Bucket) GetWalThreshold ¶
func (*Bucket) IterateMapObjects ¶
func (*Bucket) IterateObjects ¶
func (*Bucket) ListFiles ¶
ListFiles lists all files that currently exist in the Bucket. The files are only in a stable state if the memtable is empty and compactions are paused. If either condition is not met, it returns an error.
func (*Bucket) MapCursor ¶
func (b *Bucket) MapCursor(cfgs ...MapListOption) *CursorMap
func (*Bucket) MapCursorKeyOnly ¶
func (b *Bucket) MapCursorKeyOnly(cfgs ...MapListOption) *CursorMap
func (*Bucket) MapDeleteKey ¶
MapDeleteKey removes one key-value pair from the given map row. Note that LSM stores are append only, thus internally this action appends a tombstone. The entry will not be removed until a compaction has run, and even then a compaction does not guarantee the removal of the data right away. This is because an entry could have been created in an older segment than those present in the compaction. This can be seen as an implementation detail, unless the caller expects to free disk space by calling this method. Such freeing is not guaranteed.
MapDeleteKey is specific to the Map Strategy. For Replace, you can use Bucket.Delete to delete the entire row, for Sets use Bucket.SetDeleteSingle to delete a single set element.
func (*Bucket) MapList ¶
MapList returns all map entries for a given row key. The order of map pairs has no specific meaning. For efficient merge operations, pair entries are stored sorted on disk, however that is an implementation detail and not a caller-facing guarantee.
MapList is specific to the Map strategy; for Sets use Bucket.SetList, and for Replace use Bucket.Get.
func (*Bucket) MapSet ¶
MapSet writes one MapPair into the map for the given row key. It is agnostic of whether the row key already exists, as well as of whether the map key already exists: it creates the entry if it does not exist and overwrites it if it does.
Example to add a new MapPair:
    pair := MapPair{Key: []byte("Jane"), Value: []byte("Backend")}
    err := bucket.MapSet([]byte("developers"), pair)
    if err != nil {
        /* do something */
    }
MapSet is specific to the Map Strategy; for Replace use Bucket.Put, and for Set use Bucket.SetAdd instead.
func (*Bucket) MapSetMulti ¶
MapSetMulti is the same as Bucket.MapSet, except that it takes in multiple MapPair objects at the same time.
func (*Bucket) NewBucket ¶
func (*Bucket) NewBucket(ctx context.Context, dir, rootDir string, logger logrus.FieldLogger,
    compactionCallbacks, flushCallbacks cyclemanager.CycleCallbackGroup,
    opts ...BucketOption,
) (*Bucket, error)
NewBucket initializes a new bucket. It either loads the state from disk if it exists, or initializes new state.
You never need to call NewBucket() yourself if you are using a Store. In that case the Store manages buckets for you, using methods such as CreateOrLoadBucket().
func (*Bucket) Put ¶
func (b *Bucket) Put(key, value []byte, opts ...SecondaryKeyOption) error
Put creates or replaces a single value for a given key.
    err := bucket.Put([]byte("my_key"), []byte("my_value"))
    if err != nil {
        /* do something */
    }
If a bucket has a secondary index configured, you can also specify one or more secondary keys, like so:
    err := bucket.Put([]byte("my_key"), []byte("my_value"),
        WithSecondaryKey(0, []byte("my_alternative_key")),
    )
    if err != nil {
        /* do something */
    }
Put is limited to ReplaceStrategy. Use Bucket.SetAdd for Set, or Bucket.MapSet and Bucket.MapSetMulti for Map.
func (*Bucket) QuantileKeys ¶
QuantileKeys returns an approximation of the keys that make up the specified quantiles. This can be used to start parallel cursors at fairly evenly distributed positions in the segment.
To understand the approximation, check out [lsmkv.segmentindex.DiskTree.QuantileKeys], which runs on each segment.
Some things to keep in mind:
- It may return fewer keys than requested (including 0) if the segment contains fewer entries
- It may return keys that do not exist, for example because they are tombstoned. This is acceptable, as a key does not have to exist to be used as part of .Seek() in a cursor.
- It will never return duplicates, to make sure all parallel cursors return unique values.
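The three properties listed above can be illustrated with a simplified sketch that samples evenly spaced keys from each sorted segment. It works on plain key slices rather than on a segment's disk index, and `quantileKeysSketch` is a hypothetical name. Because it samples each segment separately, it can return more than q keys in total when there are several segments.

```go
package main

import (
	"fmt"
	"sort"
)

// quantileKeysSketch picks up to q evenly spaced keys from each sorted
// segment, then merges, sorts, and de-duplicates them.
func quantileKeysSketch(segments [][]string, q int) []string {
	var picked []string
	for _, keys := range segments {
		n := len(keys)
		if q >= n { // fewer entries than requested: take them all
			picked = append(picked, keys...)
			continue
		}
		for i := 1; i <= q; i++ {
			picked = append(picked, keys[i*n/(q+1)])
		}
	}
	sort.Strings(picked)
	// Drop duplicates so parallel cursors never start at the same key.
	out := picked[:0]
	for _, k := range picked {
		if len(out) == 0 || out[len(out)-1] != k {
			out = append(out, k)
		}
	}
	return out
}

func main() {
	seg1 := []string{"a", "b", "c", "d", "e", "f", "g", "h"}
	seg2 := []string{"c", "x"}
	fmt.Println(quantileKeysSketch([][]string{seg1, seg2}, 3)) // [c e g x]
}
```

Each returned key would then serve as the `.Seek()` target of one parallel cursor, which is why a key that no longer exists is harmless.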
func (*Bucket) RoaringSetAddBitmap ¶
func (*Bucket) RoaringSetAddList ¶
func (*Bucket) RoaringSetRangeAdd ¶
func (*Bucket) RoaringSetRangeRemove ¶
func (*Bucket) RoaringSetRemoveOne ¶
func (*Bucket) SetAdd ¶
SetAdd adds one or more Set entries to a Set for the given key. SetAdd is entirely agnostic of existing entries; it acts as append-only. This also makes it agnostic of whether the key already exists or not.
Example to add two entries to a set:
    err := bucket.SetAdd([]byte("my_key"), [][]byte{
        []byte("one-set-element"),
        []byte("another-set-element"),
    })
    if err != nil {
        /* do something */
    }
SetAdd is specific to the Set strategy. For Replace, use Bucket.Put, for Map use either Bucket.MapSet or Bucket.MapSetMulti.
func (*Bucket) SetCursor ¶
SetCursor holds an RLock on the flushing state. It must be closed using its .Close() method; otherwise the lock is never released.
func (*Bucket) SetCursorKeyOnly ¶
SetCursorKeyOnly returns nil for all values. It has no control over the underlying "inner" cursors, which may still retrieve a value that is then discarded. It does, however, omit any handling of values, such as decoding, making it considerably more efficient if only keys are required.
The same locking rules as for SetCursor apply.
func (*Bucket) SetDeleteSingle ¶
SetDeleteSingle removes one Set element from the given key. Note that LSM stores are append only, thus internally this action appends a tombstone. The entry will not be removed until a compaction has run, and even then a compaction does not guarantee the removal of the data right away. This is because an entry could have been created in an older segment than those present in the compaction. This can be seen as an implementation detail, unless the caller expects to free disk space by calling this method. Such freeing is not guaranteed.
SetDeleteSingle is specific to the Set Strategy. For Replace, you can use Bucket.Delete to delete the entire row, for Maps use Bucket.MapDeleteKey to delete a single map entry.
func (*Bucket) SetList ¶
SetList returns all Set entries for a given key.
SetList is specific to the Set Strategy; for Map use Bucket.MapList, and for Replace use Bucket.Get.
func (*Bucket) SetMemtableThreshold ¶
func (*Bucket) UpdateStatus ¶
func (b *Bucket) UpdateStatus(status storagestate.Status)
UpdateStatus is used by the parent shard to communicate to the bucket when the shard has been set to readonly, or when it is ready for writes.
func (*Bucket) WasDeleted ¶
WasDeleted determines whether an object used to exist in the LSM store.
There are 3 different locations where the key must be checked, in this order: the active memtable, the flushing memtable, and the disk segments.
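The lookup order can be sketched as follows, assuming each location is reduced to a map from key to "the latest record here is a tombstone". `layer` and `wasDeleted` are hypothetical names, and the real method also returns an error, which this sketch omits.

```go
package main

import "fmt"

// layer is a hypothetical view of one place a key can live; true means
// the latest record for the key in that layer is a tombstone.
type layer map[string]bool

// wasDeleted checks the active memtable, then the flushing memtable, then
// disk segments from newest to oldest. The first layer that knows the key
// decides.
func wasDeleted(active, flushing layer, segments []layer, key string) bool {
	// Memtables first; reading a nil map (no flush in progress) is safe.
	for _, l := range []layer{active, flushing} {
		if tomb, ok := l[key]; ok {
			return tomb
		}
	}
	for i := len(segments) - 1; i >= 0; i-- {
		if tomb, ok := segments[i][key]; ok {
			return tomb
		}
	}
	return false // never seen anywhere, so it was never deleted
}

func main() {
	segments := []layer{{"a": false}, {"a": true}} // "a" put, later deleted
	active := layer{"b": false}
	fmt.Println(wasDeleted(active, nil, segments, "a")) // true
	fmt.Println(wasDeleted(active, nil, segments, "b")) // false
}
```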
func (*Bucket) WriteWAL ¶
WriteWAL explicitly writes the WAL buffer. The WAL uses a buffer and isn't written until the buffer size is crossed or this function is explicitly called. This avoids unnecessary disk writes in larger operations, such as batches, where it is sufficient to call write on the WAL just once. This does not make a batch atomic, but it guarantees that the WAL is written before a successful response is returned to the user.
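The buffering behaviour can be approximated with `bufio.Writer` from the standard library. This is a minimal sketch, assuming an in-memory `bytes.Buffer` in place of the WAL file; `walSketch`, `add`, and `writeWAL` are hypothetical names.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
)

// walSketch is a hypothetical buffered write-ahead log: records sit in a
// bufio.Writer until its buffer fills up or writeWAL is called.
type walSketch struct {
	disk *bytes.Buffer // stands in for the WAL file on disk
	buf  *bufio.Writer
}

func newWAL(bufSize int) *walSketch {
	disk := &bytes.Buffer{}
	return &walSketch{disk: disk, buf: bufio.NewWriterSize(disk, bufSize)}
}

// add buffers one record; bufio only writes through to disk on its own
// when the buffer size is crossed.
func (w *walSketch) add(record string) error {
	_, err := w.buf.WriteString(record)
	return err
}

// writeWAL flushes everything still buffered. A batch calls it once,
// after appending all records and before reporting success.
func (w *walSketch) writeWAL() error { return w.buf.Flush() }

func main() {
	w := newWAL(4096)
	for _, r := range []string{"put a\n", "put b\n", "put c\n"} {
		if err := w.add(r); err != nil {
			panic(err)
		}
	}
	fmt.Println(w.disk.Len()) // 0: still buffered in memory
	if err := w.writeWAL(); err != nil {
		panic(err)
	}
	fmt.Println(w.disk.Len()) // 18: one write for the whole batch
}
```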
type BucketCreator ¶
    type BucketCreator interface {
        NewBucket(ctx context.Context, dir, rootDir string, logger logrus.FieldLogger,
            compactionCallbacks, flushCallbacks cyclemanager.CycleCallbackGroup,
            opts ...BucketOption,
        ) (*Bucket, error)
    }
type BucketOption ¶
func WithAllocChecker ¶
func WithAllocChecker(mm memwatch.AllocChecker) BucketOption
func WithCalcCountNetAdditions ¶
func WithCalcCountNetAdditions(calcCountNetAdditions bool) BucketOption
func WithDirtyThreshold ¶
func WithDirtyThreshold(threshold time.Duration) BucketOption
func WithDynamicMemtableSizing ¶
func WithDynamicMemtableSizing(
    initialMB, maxMB, minActiveSeconds, maxActiveSeconds int,
) BucketOption
func WithForceCompation ¶
func WithForceCompation(opt bool) BucketOption
Background for this option:
We use the LSM store in two places:
- Our existing key/value and inverted buckets
- As part of the new brute-force based index (to be built this week)
Brute-force index
This is a simple disk index where we use a cursor to iterate over all objects. This is what we need the force-compaction for. Experimentation so far has shown that the cursor is much more performant on a single segment than on multiple segments. With a single segment, the data is essentially one contiguous chunk on disk that we read through. With multiple segments (and an unpredictable order), it ends up being many tiny, inefficient reads.
Existing uses of the LSM store
For existing uses, e.g. the object store, we don't want to force-compact, because these buckets can grow massive. For example, you could have a 100GB segment, and then a new write leads to a new segment that is just a few bytes. If we force-compacted those two, we would rewrite 100GB every time the user sends a few bytes to Weaviate. In this case, the existing tiered compaction strategy makes more sense.
Configurability of buckets
func WithKeepTombstones ¶
func WithKeepTombstones(keepTombstones bool) BucketOption
func WithLegacyMapSorting ¶
func WithLegacyMapSorting() BucketOption
func WithMaxSegmentSize ¶
func WithMaxSegmentSize(maxSegmentSize int64) BucketOption
func WithMemtableThreshold ¶
func WithMemtableThreshold(threshold uint64) BucketOption
func WithMonitorCount ¶
func WithMonitorCount() BucketOption
func WithPread ¶
func WithPread(with bool) BucketOption
func WithSecondaryIndices ¶
func WithSecondaryIndices(count uint16) BucketOption
func WithStrategy ¶
func WithStrategy(strategy string) BucketOption
func WithUseBloomFilter ¶
func WithUseBloomFilter(useBloomFilter bool) BucketOption
func WithWalThreshold ¶
func WithWalThreshold(threshold uint64) BucketOption
type BucketReaderRoaringSetRange ¶
type BucketReaderRoaringSetRange struct {
// contains filtered or unexported fields
}
func NewBucketReaderRoaringSetRange ¶
func NewBucketReaderRoaringSetRange(cursorFn func() CursorRoaringSetRange, logger logrus.FieldLogger,
) *BucketReaderRoaringSetRange
type CommitType ¶
type CommitType uint8
    const (
        CommitTypeReplace CommitType = iota // replace strategy

        // collection strategy - this can handle all cases as updates and deletes are
        // only appends in a collection strategy
        CommitTypeCollection
        CommitTypeRoaringSet

        // new version of roaringset that stores data as a list of uint64 values,
        // instead of a roaring bitmap
        CommitTypeRoaringSetList
    )
func (CommitType) Is ¶
func (ct CommitType) Is(checkedCommitType CommitType) bool
func (CommitType) String ¶
func (ct CommitType) String() string
type CursorReplace ¶
type CursorReplace struct {
// contains filtered or unexported fields
}
func (*CursorReplace) Close ¶
func (c *CursorReplace) Close()
func (*CursorReplace) First ¶
func (c *CursorReplace) First() ([]byte, []byte)
func (*CursorReplace) Next ¶
func (c *CursorReplace) Next() ([]byte, []byte)
type CursorRoaringSet ¶
type CursorRoaringSetRange ¶
type MapListOption ¶
type MapListOption func(c *MapListOptionConfig)
func MapListAcceptDuplicates ¶
func MapListAcceptDuplicates() MapListOption
func MapListLegacySortingRequired ¶
func MapListLegacySortingRequired() MapListOption
type MapListOptionConfig ¶
type MapListOptionConfig struct {
// contains filtered or unexported fields
}
type Memtable ¶
func (*Memtable) ActiveDuration ¶
func (*Memtable) DirtyDuration ¶
DirtyDuration returns how long the memtable has been dirty, i.e. the time since the first write occurred, or 0 if it is clean.
type ReplaceableBucket ¶
type ReplaceableBucket struct {
// contains filtered or unexported fields
}
ReplaceableBucket is the "traditional" kv store. You can set and get key-value pairs.
func NewReplaceableBucket ¶
func NewReplaceableBucket(dir string) *ReplaceableBucket
func (*ReplaceableBucket) Compact ¶
func (b *ReplaceableBucket) Compact()
Lsmkv creates temporary files to rapidly store incoming data. Compact merges these files, which makes them quicker and more efficient to query. It must be called explicitly, as it does not run automatically in the background. It is thread-safe, so you can start your own background goroutine to call it.
func (*ReplaceableBucket) Count ¶
func (b *ReplaceableBucket) Count() int
Count the number of keys in the store.
func (*ReplaceableBucket) Delete ¶
func (b *ReplaceableBucket) Delete(key []byte) error
Delete a key from the store.
func (*ReplaceableBucket) Exists ¶
func (b *ReplaceableBucket) Exists(key []byte) bool
Check if a key exists in the store.
func (*ReplaceableBucket) Flush ¶
func (b *ReplaceableBucket) Flush()
func (*ReplaceableBucket) Get ¶
func (b *ReplaceableBucket) Get(key []byte) ([]byte, error)
Get a value from the store.
func (*ReplaceableBucket) Put ¶
func (b *ReplaceableBucket) Put(key []byte, value []byte) error
Set a value in the store. If the key already exists, it will be overwritten.
func (*ReplaceableBucket) Range ¶
func (b *ReplaceableBucket) Range(f func(object *storobj.Object) error) error
Iterate over all the keys in the store.
type RoaringBucket ¶
type RoaringBucket struct {
// contains filtered or unexported fields
}
RoaringBucket is a kv store that uses roaring bitmaps to store sets of uint64. You can add and remove values from the set, and check if a value is in the set. Roaring bitmaps are fast and memory efficient.
func NewRoaringBucket ¶
func NewRoaringBucket(dir string) *RoaringBucket
Create a new roaring set bucket.
func (*RoaringBucket) AddListToSet ¶
func (b *RoaringBucket) AddListToSet(key []byte, values []uint64) error
Add a list of values to a roaring bitmap in the store. If the set does not already exist, it will be created.
func (*RoaringBucket) AddToSet ¶
func (b *RoaringBucket) AddToSet(key []byte, value uint64) error
Add a value to a roaring bitmap in the store. If the set does not already exist, it will be created.
func (*RoaringBucket) Compact ¶
func (b *RoaringBucket) Compact()
Lsmkv creates temporary files to store incoming data quickly. Compact merges these files, which makes them faster and more efficient to query. Compact must be called explicitly; it does not run automatically in the background. It is thread-safe, so you can start your own background goroutine to call it.
func (*RoaringBucket) Count ¶
func (b *RoaringBucket) Count() int
func (*RoaringBucket) Cursor ¶
func (b *RoaringBucket) Cursor() CursorRoaringSet
func (*RoaringBucket) Delete ¶
func (b *RoaringBucket) Delete(key []byte) error
func (*RoaringBucket) Exists ¶
func (b *RoaringBucket) Exists(key []byte) bool
func (*RoaringBucket) Flush ¶
func (b *RoaringBucket) Flush()
func (*RoaringBucket) Get ¶
func (b *RoaringBucket) Get(key []byte) (*sroar.Bitmap, error)
Get returns the roaring bitmap stored under key. The individual values are then read through the returned *sroar.Bitmap.
func (*RoaringBucket) Put ¶
func (b *RoaringBucket) Put(key []byte, bm *sroar.Bitmap) error
Set a roaring bitmap in the store. If the key already exists, it will be overwritten.
func (*RoaringBucket) RemoveFromSet ¶
func (b *RoaringBucket) RemoveFromSet(key []byte, value uint64) error
Remove a value from a roaring bitmap in the store.
type SecondaryKeyOption ¶
type SecondaryKeyOption func(s secondaryIndexKeys) error
func WithSecondaryKey ¶
func WithSecondaryKey(pos int, key []byte) SecondaryKeyOption
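SecondaryKeyOption follows Go's functional-options pattern: WithSecondaryKey returns a closure that fills slot `pos` of the secondary keys for a write. The unexported `secondaryIndexKeys` type is not documented here, so the sketch below assumes it is a fixed-length slice with one key per configured secondary index, and that an out-of-range position is an error. All names are local stand-ins; `applySecondaryKeys` is a hypothetical consumer.

```go
package main

import "fmt"

// secondaryIndexKeys is assumed to hold one key per configured secondary index.
type secondaryIndexKeys [][]byte

type SecondaryKeyOption func(s secondaryIndexKeys) error

// WithSecondaryKey returns an option that sets the key for secondary index pos.
func WithSecondaryKey(pos int, key []byte) SecondaryKeyOption {
	return func(s secondaryIndexKeys) error {
		if pos < 0 || pos >= len(s) {
			return fmt.Errorf("no secondary index at position %d", pos)
		}
		s[pos] = key // the slice shares its backing array with the caller
		return nil
	}
}

// applySecondaryKeys shows how a Put-like method could consume the options.
func applySecondaryKeys(numIndexes int, opts ...SecondaryKeyOption) (secondaryIndexKeys, error) {
	keys := make(secondaryIndexKeys, numIndexes)
	for _, opt := range opts {
		if err := opt(keys); err != nil {
			return nil, err
		}
	}
	return keys, nil
}

func main() {
	keys, err := applySecondaryKeys(2, WithSecondaryKey(1, []byte("uuid-123")))
	fmt.Println(string(keys[1]), err)
	_, err = applySecondaryKeys(1, WithSecondaryKey(3, []byte("x")))
	fmt.Println(err)
}
```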
type SegmentGroup ¶
type SegmentGroup struct {
// contains filtered or unexported fields
}
func (*SegmentGroup) Len ¶
func (sg *SegmentGroup) Len() int
func (*SegmentGroup) UpdateStatus ¶
func (sg *SegmentGroup) UpdateStatus(status storagestate.Status)
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store groups multiple buckets together; it "owns" one folder on the file system.
func New ¶
func New(dir, rootDir string, logger logrus.FieldLogger, shardCompactionCallbacks, shardFlushCallbacks cyclemanager.CycleCallbackGroup) (*Store, error)
New initializes a new Store based on the root dir. If state is present on disk, it is loaded; if the folder is empty, a new store is initialized there.
func (*Store) CreateBucket ¶
CreateBucket creates a bucket, first removing any of its files that already exist. The bucket must not already be registered in bucketsByName when those files are removed.
func (*Store) CreateOrLoadBucket ¶
func (s *Store) CreateOrLoadBucket(ctx context.Context, bucketName string, opts ...BucketOption) (err error)
CreateOrLoadBucket registers a bucket with the given name. If state for this bucket exists on disk, it is loaded; otherwise the bucket is created. Pass BucketOption values (for example WithStrategy) to configure the strategy of a bucket. The strategy defaults to "replace". For example, to load or create a map-type bucket, do:

    ctx := context.Background()
    err := store.CreateOrLoadBucket(ctx, "my_bucket_name", WithStrategy(StrategyMapCollection))
    if err != nil {
        // handle error
    }
    // you can now access the bucket using store.Bucket()
    b := store.Bucket("my_bucket_name")
func (*Store) FlushMemtables ¶
FlushMemtables flushes any active memtable and returns only once the memtable has been fully flushed and a stable state on disk has been reached.
This is a preparatory stage for creating backups.
A timeout should be set on the input context because some flushes are long-running; in that case it may be better to fail the backup attempt and retry later than to block indefinitely.
func (*Store) GetBucketsByName ¶
func (*Store) PauseCompaction ¶
PauseCompaction waits for all ongoing compactions to finish, then makes sure that no new compaction can be started.
This is a preparatory stage for creating backups.
A timeout should be set on the input context because some compactions are long-running; in that case it may be better to fail the backup attempt and retry later than to block indefinitely.
func (*Store) RenameBucket ¶
func (*Store) ReplaceBuckets ¶
ReplaceBuckets replaces the 1st bucket with the 2nd one. Both buckets have to be registered in bucketsByName. The 2nd bucket takes the 1st one's place in bucketsByName under the 1st one's name, and the 2nd one's name is deleted. The dir path of the 2nd bucket is changed to the dir of the 1st bucket, as are all other related paths of the bucket's resources (segment group, memtables, commit log). The dir path of the 1st bucket is temporarily suffixed with "___del"; later on, that bucket is shut down and its files are deleted. In effect, the 2nd bucket becomes the 1st bucket.
func (*Store) ResumeCompaction ¶
ResumeCompaction starts the compaction cycle again. It returns an error if compactions were not paused.
func (*Store) UpdateBucketsStatus ¶
func (s *Store) UpdateBucketsStatus(targetStatus storagestate.Status) error
Source Files ¶
- binary_search_tree.go
- binary_search_tree_map.go
- binary_search_tree_multi.go
- bucket.go
- bucket_backup.go
- bucket_options.go
- bucket_reader_roaring_set_range.go
- bucket_recover_from_wal.go
- bucket_roaring_set.go
- bucket_roaring_set_range.go
- commitlogger.go
- commitlogger_parser.go
- commitlogger_parser_collection.go
- commitlogger_parser_replace.go
- commitlogger_parser_roaring_set.go
- commitlogger_parser_roaring_set_range.go
- compactor_map.go
- compactor_map_reusable_pairs.go
- compactor_replace.go
- compactor_set.go
- cursor_bucket_map.go
- cursor_bucket_replace.go
- cursor_bucket_roaring_set.go
- cursor_bucket_roaring_set_range.go
- cursor_bucket_set.go
- cursor_memtable_collection.go
- cursor_memtable_map.go
- cursor_memtable_replace.go
- cursor_memtable_roaring_set.go
- cursor_memtable_roaring_set_range.go
- cursor_segment_collection.go
- cursor_segment_collection_reusable.go
- cursor_segment_map.go
- cursor_segment_replace.go
- cursor_segment_roaring_set.go
- cursor_segment_roaring_set_range.go
- doc.go
- file_utils.go
- global_bucket_registry.go
- memtable.go
- memtable_flush.go
- memtable_flush_roaring_set.go
- memtable_flush_roaring_set_range.go
- memtable_roaring_set.go
- memtable_roaring_set_range.go
- memtable_size_advisor.go
- quantile_keys.go
- segment.go
- segment_bloom_filters.go
- segment_collection_strategy.go
- segment_group.go
- segment_group_compaction.go
- segment_key_and_tombstone_extractor.go
- segment_net_count_additions.go
- segment_precompute_for_compaction.go
- segment_replace_strategy.go
- segment_roaring_set_strategy.go
- segment_serialization.go
- store.go
- store_backup.go
- store_cyclecallbacks.go
- strategies.go
- strategies_map.go
- strategies_map_sorted_merger.go
- strategies_set.go
- typed_buckets.go
Directories ¶
Path | Synopsis |
---|---|
ent contains common types used throughout various lsmkv (sub-)packages |
Package roaringset contains all the LSM business logic that is unique to the "RoaringSet" strategy |