db

package
v0.0.0-...-f09cf9b
Published: Jan 13, 2025 License: BSD-3-Clause Imports: 101 Imported by: 0

Documentation

Overview

Some standard accessors for the shard struct. It is important to NEVER access the shard struct directly: shards are lazy-loaded, so the information might not be there yet.

Index

Constants

const (
	ShardCodeBaseVersion                  = uint16(2)
	ShardCodeBaseMinimumVersionForStartup = uint16(1)
)

ShardCodeBaseVersion must be increased whenever there are breaking changes, including those that we can handle in a non-breaking way. The version checker can then decide on init whether it should prevent startup completely. If it does not prevent startup but there is still a version mismatch, the version can be used to make specific decisions. A hedged sketch of such a check follows the changelog below.

CHANGELOG

  • Version 1 - Everything up until Weaviate v1.10.1 inclusive
  • Version 2 - The Inverted Index is now stored in an always-sorted fashion and doc ids are stored as BigEndian. To make this backward-compatible with v1, doc ids need to be read and written as Little Endian. In addition, an extra sort step is required in three places: during a MapList call, during a Map Cursor, and during Map Compactions. BM25 is entirely disabled prior to this version.
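The version constants above gate startup. Below is a minimal sketch of such a check, assuming a hypothetical checkShardCodeBaseVersion helper; the actual version checker is internal to the package.

package db

import "fmt"

// checkShardCodeBaseVersion illustrates the gate described above: refuse to
// start on versions older than the minimum, tolerate (but allow callers to
// branch on) a mismatch below the current version.
func checkShardCodeBaseVersion(versionOnDisk uint16) error {
	if versionOnDisk < ShardCodeBaseMinimumVersionForStartup {
		return fmt.Errorf("shard codebase version %d is below minimum startup version %d",
			versionOnDisk, ShardCodeBaseMinimumVersionForStartup)
	}
	// A version at or above the minimum but below ShardCodeBaseVersion does not
	// prevent startup; callers can still branch on the mismatch, e.g. to keep
	// reading doc ids as Little Endian for version 1 data.
	return nil
}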
const IdLockPoolSize = 128

Variables

var (
	ErrShardNotFound = errors.New("shard not found")
)

Functions

func CombineMultiTargetResults

func CombineMultiTargetResults(ctx context.Context, shard DistanceForVector, logger logrus.FieldLogger, results [][]uint64, dists [][]float32, targetVectors []string, searchVectors []models.Vector, targetCombination *dto.TargetCombination, limit int, targetDist float32) ([]uint64, []float32, error)

func NewAtomicInt64

func NewAtomicInt64(val int64) *atomic.Int64

Types

type BackupState

type BackupState struct {
	BackupID   string
	InProgress bool
}

type Config

type Config struct {
	RootPath                       string
	QueryLimit                     int64
	QueryMaximumResults            int64
	QueryNestedRefLimit            int64
	ResourceUsage                  config.ResourceUsage
	MaxImportGoroutinesFactor      float64
	MemtablesFlushDirtyAfter       int
	MemtablesInitialSizeMB         int
	MemtablesMaxSizeMB             int
	MemtablesMinActiveSeconds      int
	MemtablesMaxActiveSeconds      int
	SegmentsCleanupIntervalSeconds int
	SeparateObjectsCompactions     bool
	MaxSegmentSize                 int64
	HNSWMaxLogSize                 int64
	HNSWWaitForCachePrefill        bool
	HNSWFlatSearchConcurrency      int
	VisitedListPoolMaxSize         int
	TrackVectorDimensions          bool
	ServerVersion                  string
	GitHash                        string
	AvoidMMap                      bool
	DisableLazyLoadShards          bool
	ForceFullReplicasSearch        bool
	Replication                    replication.GlobalConfig
}

type DB

type DB struct {
	// contains filtered or unexported fields
}

func New

func New(logger logrus.FieldLogger, config Config,
	remoteIndex sharding.RemoteIndexClient, nodeResolver nodeResolver,
	remoteNodesClient sharding.RemoteNodeClient, replicaClient replica.Client,
	promMetrics *monitoring.PrometheusMetrics, memMonitor *memwatch.Monitor,
) (*DB, error)

func (*DB) AbortReplication

func (db *DB) AbortReplication(class,
	shard, requestID string,
) interface{}

func (*DB) AddBatchReferences

func (db *DB) AddBatchReferences(ctx context.Context, references objects.BatchReferences,
	repl *additional.ReplicationProperties, schemaVersion uint64,
) (objects.BatchReferences, error)

func (*DB) AddReference

func (db *DB) AddReference(ctx context.Context, source *crossref.RefSource, target *crossref.Ref,
	repl *additional.ReplicationProperties, tenant string, schemaVersion uint64,
) error

func (*DB) Aggregate

func (db *DB) Aggregate(ctx context.Context,
	params aggregation.Params, modules *modules.Provider,
) (*aggregation.Result, error)

func (*DB) AggregateNeighbors

func (db *DB) AggregateNeighbors(ctx context.Context, vector []float32,
	class string, properties []string, k int,
	filter *libfilters.LocalFilter,
) ([]classification.NeighborRef, error)

TODO: why is this logic in the persistence package? This is business-logic, move out of here!

func (*DB) BackupDescriptors

func (db *DB) BackupDescriptors(ctx context.Context, bakid string, classes []string,
) <-chan backup.ClassDescriptor

BackupDescriptors returns a channel of class descriptors. A class descriptor records everything needed to restore a class. If an error happens, a descriptor carrying that error is written to the channel just before it is closed.
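A hedged consumer sketch, assuming ClassDescriptor exposes an Error field as the doc comment implies; the helper name is hypothetical.

package db

import "context"

// drainBackupDescriptors processes descriptors until the channel closes,
// returning early on the error descriptor that is sent last.
func drainBackupDescriptors(ctx context.Context, db *DB, backupID string, classes []string) error {
	for desc := range db.BackupDescriptors(ctx, backupID, classes) {
		if desc.Error != nil {
			// the error descriptor is written just before the channel is closed
			return desc.Error
		}
		// hand desc off to the backup backend here
	}
	return nil
}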

func (*DB) Backupable

func (db *DB) Backupable(ctx context.Context, classes []string) error

Backupable returns whether all given classes can be backed up.

func (*DB) BatchDeleteObjects

func (db *DB) BatchDeleteObjects(ctx context.Context, params objects.BatchDeleteParams,
	deletionTime time.Time, repl *additional.ReplicationProperties, tenant string, schemaVersion uint64,
) (objects.BatchDeleteResult, error)

func (*DB) BatchPutObjects

func (db *DB) BatchPutObjects(ctx context.Context, objs objects.BatchObjects,
	repl *additional.ReplicationProperties, schemaVersion uint64,
) (objects.BatchObjects, error)

func (*DB) ClassExists

func (db *DB) ClassExists(name string) bool

func (*DB) CommitReplication

func (db *DB) CommitReplication(class,
	shard, requestID string,
) interface{}

func (*DB) CrossClassVectorSearch

func (db *DB) CrossClassVectorSearch(ctx context.Context, vector models.Vector, targetVector string, offset, limit int,
	filters *filters.LocalFilter,
) ([]search.Result, error)

func (*DB) DeleteIndex

func (db *DB) DeleteIndex(className schema.ClassName) error

DeleteIndex deletes the index

func (*DB) DeleteObject

func (db *DB) DeleteObject(ctx context.Context, class string, id strfmt.UUID,
	deletionTime time.Time, repl *additional.ReplicationProperties, tenant string, schemaVersion uint64,
) error

DeleteObject deletes an object of a specific class, given its ID

func (*DB) Exists

func (db *DB) Exists(ctx context.Context, class string, id strfmt.UUID,
	repl *additional.ReplicationProperties, tenant string,
) (bool, error)

func (*DB) GetConfig

func (db *DB) GetConfig() Config

func (*DB) GetIndex

func (db *DB) GetIndex(className schema.ClassName) *Index

GetIndex returns the index if it exists, or nil if it doesn't. By default it will retry 3 times between 0-150 ms to get the index, to handle eventual consistency.
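Since the result can still be nil after the built-in retries, callers should guard against it. A minimal hedged sketch with a hypothetical helper, assuming the usual entities/schema import:

package db

import (
	"fmt"

	"github.com/weaviate/weaviate/entities/schema"
)

// indexID resolves the index for a class and returns its ID, or an error if
// the index is still not available after GetIndex's internal retries.
func indexID(db *DB, className schema.ClassName) (string, error) {
	idx := db.GetIndex(className)
	if idx == nil {
		return "", fmt.Errorf("index for class %q not found", className)
	}
	return idx.ID(), nil
}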

func (*DB) GetIndexForIncomingReplica

func (db *DB) GetIndexForIncomingReplica(className schema.ClassName) replica.RemoteIndexIncomingRepo

GetIndexForIncomingReplica returns the index if it exists, or nil if it doesn't. By default it will retry 3 times between 0-150 ms to get the index, to handle eventual consistency.

func (*DB) GetIndexForIncomingSharding

func (db *DB) GetIndexForIncomingSharding(className schema.ClassName) sharding.RemoteIndexIncomingRepo

GetIndexForIncomingSharding returns the index if it exists, or nil if it doesn't. By default it will retry 3 times between 0-150 ms to get the index, to handle eventual consistency.

func (*DB) GetNodeStatistics

func (db *DB) GetNodeStatistics(ctx context.Context) ([]*models.Statistics, error)

func (*DB) GetNodeStatus

func (db *DB) GetNodeStatus(ctx context.Context, className string, verbosity string) ([]*models.NodeStatus, error)

GetNodeStatus returns the status of all Weaviate nodes.

func (*DB) GetQueryMaximumResults

func (db *DB) GetQueryMaximumResults() int

func (*DB) GetRemoteIndex

func (db *DB) GetRemoteIndex() sharding.RemoteIndexClient

func (*DB) GetScheduler

func (db *DB) GetScheduler() *queue.Scheduler

func (*DB) GetSchema

func (db *DB) GetSchema() schema.Schema

func (*DB) GetSchemaGetter

func (db *DB) GetSchemaGetter() schemaUC.SchemaGetter

func (*DB) GetUnclassified

func (db *DB) GetUnclassified(ctx context.Context, className string,
	properties []string, propsToReturn []string, filter *libfilters.LocalFilter,
) ([]search.Result, error)

TODO: why is this logic in the persistence package? This is business-logic, move out of here!

func (*DB) IncomingGetNodeStatistics

func (db *DB) IncomingGetNodeStatistics() (*models.Statistics, error)

func (*DB) IncomingGetNodeStatus

func (db *DB) IncomingGetNodeStatus(ctx context.Context, className, verbosity string) (*models.NodeStatus, error)

IncomingGetNodeStatus returns the node status if it exists, or nil if it doesn't

func (*DB) IndexExists

func (db *DB) IndexExists(className schema.ClassName) bool

IndexExists reports whether an index exists

func (*DB) ListBackupable

func (db *DB) ListBackupable() []string

ListBackupable returns a list of all classes which can be backed up.

func (*DB) ListClasses

func (db *DB) ListClasses(ctx context.Context) []string

func (*DB) LocalNodeStatus

func (db *DB) LocalNodeStatus(ctx context.Context, className, output string) *models.NodeStatus

func (*DB) LocalTenantActivity

func (db *DB) LocalTenantActivity() tenantactivity.ByCollection

func (*DB) Merge

func (db *DB) Merge(ctx context.Context, merge objects.MergeDocument,
	repl *additional.ReplicationProperties, tenant string, schemaVersion uint64,
) error

func (*DB) MultiGet

func (db *DB) MultiGet(ctx context.Context, query []multi.Identifier,
	additional additional.Properties, tenant string,
) ([]search.Result, error)

func (*DB) Object

Object gets object with id from index of specified class.

func (*DB) ObjectByID

func (db *DB) ObjectByID(ctx context.Context, id strfmt.UUID,
	props search.SelectProperties, additional additional.Properties,
	tenant string,
) (*search.Result, error)

ObjectByID checks every index of the particular kind for the ID

@warning: this function is deprecated by Object()

func (*DB) ObjectSearch

func (db *DB) ObjectSearch(ctx context.Context, offset, limit int,
	filters *filters.LocalFilter, sort []filters.Sort,
	additional additional.Properties, tenant string,
) (search.Results, error)

ObjectSearch searches each index. Deprecated: use Query, which searches a specific index

func (*DB) ObjectsByID

func (db *DB) ObjectsByID(ctx context.Context, id strfmt.UUID,
	props search.SelectProperties, additional additional.Properties,
	tenant string,
) (search.Results, error)

ObjectsByID checks every index of the particular kind for the ID. This method is only used for Explore queries, where we don't have a class context

func (*DB) PutObject

func (db *DB) PutObject(ctx context.Context, obj *models.Object,
	vector []float32, vectors map[string][]float32, multivectors map[string][][]float32,
	repl *additional.ReplicationProperties,
	schemaVersion uint64,
) error

func (*DB) Query

func (db *DB) Query(ctx context.Context, q *objects.QueryInput) (search.Results, *objects.Error)

Query a specific class

func (*DB) ReleaseBackup

func (db *DB) ReleaseBackup(ctx context.Context, bakID, class string) (err error)

ReleaseBackup releases resources acquired by the index during backup

func (*DB) ReplicateDeletion

func (db *DB) ReplicateDeletion(ctx context.Context, class,
	shard, requestID string, uuid strfmt.UUID, deletionTime time.Time,
) replica.SimpleResponse

func (*DB) ReplicateDeletions

func (db *DB) ReplicateDeletions(ctx context.Context, class,
	shard, requestID string, uuids []strfmt.UUID, deletionTime time.Time, dryRun bool, schemaVersion uint64,
) replica.SimpleResponse

func (*DB) ReplicateObject

func (db *DB) ReplicateObject(ctx context.Context, class,
	shard, requestID string, object *storobj.Object,
) replica.SimpleResponse

func (*DB) ReplicateObjects

func (db *DB) ReplicateObjects(ctx context.Context, class,
	shard, requestID string, objects []*storobj.Object, schemaVersion uint64,
) replica.SimpleResponse

func (*DB) ReplicateReferences

func (db *DB) ReplicateReferences(ctx context.Context, class,
	shard, requestID string, refs []objects.BatchReference,
) replica.SimpleResponse

func (*DB) ReplicateUpdate

func (db *DB) ReplicateUpdate(ctx context.Context, class,
	shard, requestID string, mergeDoc *objects.MergeDocument,
) replica.SimpleResponse

func (*DB) ResolveReferences

func (db *DB) ResolveReferences(ctx context.Context, objs search.Results,
	props search.SelectProperties, groupBy *searchparams.GroupBy,
	addl additional.Properties, tenant string,
) (search.Results, error)

ResolveReferences takes a list of search results and enriches them with any referenced objects

func (*DB) Search

func (db *DB) Search(ctx context.Context, params dto.GetParams) ([]search.Result, error)

func (*DB) SetSchemaGetter

func (db *DB) SetSchemaGetter(sg schemaUC.SchemaGetter)

func (*DB) Shards

func (db *DB) Shards(ctx context.Context, class string) ([]string, error)

Shards returns the list of nodes that contain shards of the class. If there are no shards for the class, it returns an empty list. If there are shards for the class but no nodes are found, it returns an error.
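A small hedged sketch of how a caller might distinguish the two cases (the helper is hypothetical):

package db

import "context"

// nodesForClass separates the "class has no shards" case from a genuine
// resolution failure, following the contract described above.
func nodesForClass(ctx context.Context, db *DB, class string) ([]string, error) {
	nodes, err := db.Shards(ctx, class)
	if err != nil {
		// shards exist but their nodes could not be resolved
		return nil, err
	}
	if len(nodes) == 0 {
		// the class simply has no shards
		return nil, nil
	}
	return nodes, nil
}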

func (*DB) ShardsBackup

func (db *DB) ShardsBackup(
	ctx context.Context, bakID, class string, shards []string,
) (_ backup.ClassDescriptor, err error)

func (*DB) Shutdown

func (db *DB) Shutdown(ctx context.Context) error

func (*DB) SparseObjectSearch

func (db *DB) SparseObjectSearch(ctx context.Context, params dto.GetParams) ([]*storobj.Object, []float32, error)

SparseObjectSearch is used to perform an inverted index search on the db

Earlier use cases required only []search.Result as a return value from the db, and the Class ClassSearch method fit this need. Later on, other use cases presented the need for the raw storage objects, such as hybrid search.

func (*DB) StartupComplete

func (db *DB) StartupComplete() bool

func (*DB) VectorSearch

func (db *DB) VectorSearch(ctx context.Context,
	params dto.GetParams, targetVectors []string, searchVectors []models.Vector,
) ([]search.Result, error)

func (*DB) WaitForStartup

func (db *DB) WaitForStartup(ctx context.Context) error

func (*DB) ZeroShotSearch

func (db *DB) ZeroShotSearch(ctx context.Context, vector []float32,
	class string, properties []string,
	filter *libfilters.LocalFilter,
) ([]search.Result, error)

TODO: why is this logic in the persistence package? This is business-logic, move out of here!

type DimensionCategory

type DimensionCategory int
const (
	DimensionCategoryStandard DimensionCategory = iota
	DimensionCategoryPQ
	DimensionCategoryBQ
)

type DistanceForVector

type DistanceForVector interface {
	VectorDistanceForQuery(ctx context.Context, id uint64, searchVectors []models.Vector, targets []string) ([]float32, error)
}

type Index

type Index struct {
	Config IndexConfig
	// contains filtered or unexported fields
}

Index is the logical unit which contains all the data for one particular class. An index can be further broken up into self-contained units, called Shards, to allow for easy distribution across Nodes

func NewIndex

func NewIndex(ctx context.Context, cfg IndexConfig,
	shardState *sharding.State, invertedIndexConfig schema.InvertedIndexConfig,
	vectorIndexUserConfig schemaConfig.VectorIndexConfig,
	vectorIndexUserConfigs map[string]schemaConfig.VectorIndexConfig,
	sg schemaUC.SchemaGetter,
	cs inverted.ClassSearcher, logger logrus.FieldLogger,
	nodeResolver nodeResolver, remoteClient sharding.RemoteIndexClient,
	replicaClient replica.Client,
	promMetrics *monitoring.PrometheusMetrics, class *models.Class, jobQueueCh chan job,
	scheduler *queue.Scheduler,
	indexCheckpoints *indexcheckpoint.Checkpoints,
	allocChecker memwatch.AllocChecker,
) (*Index, error)

NewIndex creates an index with the specified number of shards, using only the shards that are local to a node

func (*Index) AbortReplication

func (i *Index) AbortReplication(shard, requestID string) interface{}

func (*Index) AddReferencesBatch

func (i *Index) AddReferencesBatch(ctx context.Context, refs objects.BatchReferences,
	replProps *additional.ReplicationProperties, schemaVersion uint64,
) []error

The returned []error gives the error for each input element at the index position in which it was received

func (*Index) CommitReplication

func (i *Index) CommitReplication(shard, requestID string) interface{}

func (*Index) DebugRepairIndex

func (i *Index) DebugRepairIndex(ctx context.Context, shardName, targetVector string) error

func (*Index) DebugResetVectorIndex

func (i *Index) DebugResetVectorIndex(ctx context.Context, shardName, targetVector string) error

IMPORTANT: DebugResetVectorIndex is intended to be used for debugging purposes only. It drops the selected vector index, creates a new one, then reindexes it in the background. This function assumes the node is not receiving any traffic besides the debug endpoints and that async indexing is enabled.

func (*Index) DigestObjects

func (i *Index) DigestObjects(ctx context.Context,
	shardName string, ids []strfmt.UUID,
) (result []replica.RepairResponse, err error)

func (*Index) DigestObjectsInTokenRange

func (i *Index) DigestObjectsInTokenRange(ctx context.Context,
	shardName string, initialToken, finalToken uint64, limit int,
) (result []replica.RepairResponse, lastTokenRead uint64, err error)

func (*Index) FetchObject

func (i *Index) FetchObject(ctx context.Context,
	shardName string, id strfmt.UUID,
) (objects.Replica, error)

func (*Index) FetchObjects

func (i *Index) FetchObjects(ctx context.Context,
	shardName string, ids []strfmt.UUID,
) ([]objects.Replica, error)

func (*Index) ForEachLoadedShard

func (i *Index) ForEachLoadedShard(f func(name string, shard ShardLike) error) error

func (*Index) ForEachShard

func (i *Index) ForEachShard(f func(name string, shard ShardLike) error) error

ForEachShard applies func f on each shard in the index.

WARNING: only use this if you expect all LazyLoadShards to be loaded! Calling this method may lead to shards being force-loaded, causing unexpected CPU spikes. If you only want to apply f on loaded shards, call ForEachLoadedShard instead.
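To make the difference concrete, here is a hedged sketch that aggregates the cheap async object count without force-loading lazy shards (the helper is hypothetical):

package db

// countLoadedObjectsAsync sums the eventually consistent object counts of the
// shards that are already loaded; lazy shards that were never loaded are skipped.
func countLoadedObjectsAsync(idx *Index) (int, error) {
	total := 0
	err := idx.ForEachLoadedShard(func(name string, shard ShardLike) error {
		total += shard.ObjectCountAsync()
		return nil
	})
	return total, err
}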

func (*Index) ForEachShardConcurrently

func (i *Index) ForEachShardConcurrently(f func(name string, shard ShardLike) error) error

func (*Index) GetShard

func (i *Index) GetShard(ctx context.Context, shardName string) (
	shard ShardLike, release func(), err error,
)

func (*Index) HashTreeLevel

func (i *Index) HashTreeLevel(ctx context.Context,
	shardName string, level int, discriminant *hashtree.Bitset,
) (digests []hashtree.Digest, err error)

func (*Index) ID

func (i *Index) ID() string

func (*Index) IncomingAggregate

func (i *Index) IncomingAggregate(ctx context.Context, shardName string,
	params aggregation.Params, mods interface{},
) (*aggregation.Result, error)

func (*Index) IncomingBatchAddReferences

func (i *Index) IncomingBatchAddReferences(ctx context.Context, shardName string,
	refs objects.BatchReferences, schemaVersion uint64,
) []error

func (*Index) IncomingBatchPutObjects

func (i *Index) IncomingBatchPutObjects(ctx context.Context, shardName string,
	objects []*storobj.Object, schemaVersion uint64,
) []error

func (*Index) IncomingCreateShard

func (i *Index) IncomingCreateShard(ctx context.Context, className string, shardName string) error

func (*Index) IncomingDeleteObject

func (i *Index) IncomingDeleteObject(ctx context.Context, shardName string,
	id strfmt.UUID, deletionTime time.Time, schemaVersion uint64,
) error

func (*Index) IncomingDeleteObjectBatch

func (i *Index) IncomingDeleteObjectBatch(ctx context.Context, shardName string,
	uuids []strfmt.UUID, deletionTime time.Time, dryRun bool, schemaVersion uint64,
) objects.BatchSimpleObjects

func (*Index) IncomingDigestObjects

func (i *Index) IncomingDigestObjects(ctx context.Context,
	shardName string, ids []strfmt.UUID,
) (result []replica.RepairResponse, err error)

func (*Index) IncomingDigestObjectsInTokenRange

func (i *Index) IncomingDigestObjectsInTokenRange(ctx context.Context,
	shardName string, initialToken, finalToken uint64, limit int,
) (result []replica.RepairResponse, lastTokenRead uint64, err error)

func (*Index) IncomingExists

func (i *Index) IncomingExists(ctx context.Context, shardName string,
	id strfmt.UUID,
) (bool, error)

func (*Index) IncomingFilePutter

func (i *Index) IncomingFilePutter(ctx context.Context, shardName,
	filePath string,
) (io.WriteCloser, error)

func (*Index) IncomingFindUUIDs

func (i *Index) IncomingFindUUIDs(ctx context.Context, shardName string,
	filters *filters.LocalFilter,
) ([]strfmt.UUID, error)

func (*Index) IncomingGetObject

func (i *Index) IncomingGetObject(ctx context.Context, shardName string,
	id strfmt.UUID, props search.SelectProperties,
	additional additional.Properties,
) (*storobj.Object, error)

func (*Index) IncomingGetShardQueueSize

func (i *Index) IncomingGetShardQueueSize(ctx context.Context, shardName string) (int64, error)

func (*Index) IncomingGetShardStatus

func (i *Index) IncomingGetShardStatus(ctx context.Context, shardName string) (string, error)

func (*Index) IncomingHashTreeLevel

func (i *Index) IncomingHashTreeLevel(ctx context.Context,
	shardName string, level int, discriminant *hashtree.Bitset,
) (digests []hashtree.Digest, err error)

func (*Index) IncomingMergeObject

func (i *Index) IncomingMergeObject(ctx context.Context, shardName string,
	mergeDoc objects.MergeDocument, schemaVersion uint64,
) error

func (*Index) IncomingMultiGetObjects

func (i *Index) IncomingMultiGetObjects(ctx context.Context, shardName string,
	ids []strfmt.UUID,
) ([]*storobj.Object, error)

func (*Index) IncomingOverwriteObjects

func (i *Index) IncomingOverwriteObjects(ctx context.Context,
	shardName string, vobjects []*objects.VObject,
) ([]replica.RepairResponse, error)

func (*Index) IncomingPutObject

func (i *Index) IncomingPutObject(ctx context.Context, shardName string,
	object *storobj.Object, schemaVersion uint64,
) error

func (*Index) IncomingReinitShard

func (i *Index) IncomingReinitShard(ctx context.Context, shardName string) error

func (*Index) IncomingSearch

func (i *Index) IncomingSearch(ctx context.Context, shardName string,
	searchVectors []models.Vector, targetVectors []string, distance float32, limit int,
	filters *filters.LocalFilter, keywordRanking *searchparams.KeywordRanking,
	sort []filters.Sort, cursor *filters.Cursor, groupBy *searchparams.GroupBy,
	additional additional.Properties, targetCombination *dto.TargetCombination, properties []string,
) ([]*storobj.Object, []float32, error)

func (*Index) IncomingUpdateShardStatus

func (i *Index) IncomingUpdateShardStatus(ctx context.Context, shardName, targetStatus string, schemaVersion uint64) error

func (*Index) IterateObjects

func (i *Index) IterateObjects(ctx context.Context, cb func(index *Index, shard ShardLike, object *storobj.Object) error) (err error)

Iterate over all objects in the index, applying the callback function to each one. Adding or removing objects during iteration is not supported.
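A minimal read-only usage sketch; the helper is hypothetical and assumes the package's usual storobj import:

package db

import (
	"context"

	"github.com/weaviate/weaviate/entities/storobj"
)

// countObjects counts all objects across all shards of the index. The callback
// only reads; adding or removing objects during iteration is not supported.
func countObjects(ctx context.Context, idx *Index) (int, error) {
	count := 0
	err := idx.IterateObjects(ctx, func(index *Index, shard ShardLike, object *storobj.Object) error {
		count++
		return nil
	})
	return count, err
}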

func (*Index) IterateShards

func (i *Index) IterateShards(ctx context.Context, cb func(index *Index, shard ShardLike) error) (err error)

Iterate over all shards in the index, applying the callback function to each one. Adding or removing shards during iteration is not supported.

func (*Index) OverwriteObjects

func (idx *Index) OverwriteObjects(ctx context.Context,
	shard string, updates []*objects.VObject,
) ([]replica.RepairResponse, error)

OverwriteObjects overwrites objects if their state didn't change in the meantime. It returns nil if all objects have been successfully overwritten, and otherwise a list of failed operations.

func (*Index) ReleaseBackup

func (i *Index) ReleaseBackup(ctx context.Context, id string) error

ReleaseBackup marks the specified backup as inactive and restarts all async background and maintenance processes. It errors if the backup does not exist or is already inactive.

func (*Index) ReplicateDeletion

func (i *Index) ReplicateDeletion(ctx context.Context, shard, requestID string, uuid strfmt.UUID, deletionTime time.Time) replica.SimpleResponse

func (*Index) ReplicateDeletions

func (i *Index) ReplicateDeletions(ctx context.Context, shard, requestID string,
	uuids []strfmt.UUID, deletionTime time.Time, dryRun bool, schemaVersion uint64,
) replica.SimpleResponse

func (*Index) ReplicateObject

func (i *Index) ReplicateObject(ctx context.Context, shard, requestID string, object *storobj.Object) replica.SimpleResponse

func (*Index) ReplicateObjects

func (i *Index) ReplicateObjects(ctx context.Context, shard, requestID string, objects []*storobj.Object, schemaVersion uint64) replica.SimpleResponse

func (*Index) ReplicateReferences

func (i *Index) ReplicateReferences(ctx context.Context, shard, requestID string, refs []objects.BatchReference) replica.SimpleResponse

func (*Index) ReplicateUpdate

func (i *Index) ReplicateUpdate(ctx context.Context, shard, requestID string, doc *objects.MergeDocument) replica.SimpleResponse

func (*Index) Shutdown

func (i *Index) Shutdown(ctx context.Context) error

type IndexConfig

type IndexConfig struct {
	RootPath                       string
	ClassName                      schema.ClassName
	QueryMaximumResults            int64
	QueryNestedRefLimit            int64
	ResourceUsage                  config.ResourceUsage
	MemtablesFlushDirtyAfter       int
	MemtablesInitialSizeMB         int
	MemtablesMaxSizeMB             int
	MemtablesMinActiveSeconds      int
	MemtablesMaxActiveSeconds      int
	SegmentsCleanupIntervalSeconds int
	SeparateObjectsCompactions     bool
	MaxSegmentSize                 int64
	HNSWMaxLogSize                 int64
	HNSWWaitForCachePrefill        bool
	HNSWFlatSearchConcurrency      int
	VisitedListPoolMaxSize         int
	ReplicationFactor              *atomic.Int64
	DeletionStrategy               string
	AsyncReplicationEnabled        bool
	AvoidMMap                      bool
	DisableLazyLoadShards          bool
	ForceFullReplicasSearch        bool

	TrackVectorDimensions bool
}

type KnnAggregator

type KnnAggregator struct {
	// contains filtered or unexported fields
}

TODO: this is business logic, move out of here

func NewKnnAggregator

func NewKnnAggregator(input search.Results, sourceVector []float32) *KnnAggregator

func (*KnnAggregator) Aggregate

func (a *KnnAggregator) Aggregate(k int, properties []string) ([]classification.NeighborRef, error)

type LazyLoadShard

type LazyLoadShard struct {
	// contains filtered or unexported fields
}

func NewLazyLoadShard

func NewLazyLoadShard(ctx context.Context, promMetrics *monitoring.PrometheusMetrics,
	shardName string, index *Index, class *models.Class, jobQueueCh chan job,
	indexCheckpoints *indexcheckpoint.Checkpoints, memMonitor memwatch.AllocChecker,
) *LazyLoadShard

func (*LazyLoadShard) Activity

func (l *LazyLoadShard) Activity() int32

func (*LazyLoadShard) AddReferencesBatch

func (l *LazyLoadShard) AddReferencesBatch(ctx context.Context, refs objects.BatchReferences) []error

func (*LazyLoadShard) Aggregate

func (l *LazyLoadShard) Aggregate(ctx context.Context, params aggregation.Params, modules *modules.Provider) (*aggregation.Result, error)

func (*LazyLoadShard) AnalyzeObject

func (l *LazyLoadShard) AnalyzeObject(object *storobj.Object) ([]inverted.Property, []inverted.NilProperty, error)

func (*LazyLoadShard) ConvertQueue

func (l *LazyLoadShard) ConvertQueue(targetVector string) error

func (*LazyLoadShard) Counter

func (l *LazyLoadShard) Counter() *indexcounter.Counter

func (*LazyLoadShard) DebugResetVectorIndex

func (l *LazyLoadShard) DebugResetVectorIndex(ctx context.Context, targetVector string) error

func (*LazyLoadShard) DeleteObject

func (l *LazyLoadShard) DeleteObject(ctx context.Context, id strfmt.UUID, deletionTime time.Time) error

func (*LazyLoadShard) DeleteObjectBatch

func (l *LazyLoadShard) DeleteObjectBatch(ctx context.Context, ids []strfmt.UUID, deletionTime time.Time, dryRun bool) objects.BatchSimpleObjects

func (*LazyLoadShard) Dimensions

func (l *LazyLoadShard) Dimensions(ctx context.Context) int

func (*LazyLoadShard) Exists

func (l *LazyLoadShard) Exists(ctx context.Context, id strfmt.UUID) (bool, error)

func (*LazyLoadShard) FillQueue

func (l *LazyLoadShard) FillQueue(targetVector string, from uint64) error

func (*LazyLoadShard) FindUUIDs

func (l *LazyLoadShard) FindUUIDs(ctx context.Context, filters *filters.LocalFilter) ([]strfmt.UUID, error)

func (*LazyLoadShard) GetPropertyLengthTracker

func (l *LazyLoadShard) GetPropertyLengthTracker() *inverted.JsonShardMetaData

func (*LazyLoadShard) GetStatus

func (l *LazyLoadShard) GetStatus() storagestate.Status

func (*LazyLoadShard) GetStatusNoLoad

func (l *LazyLoadShard) GetStatusNoLoad() storagestate.Status

func (*LazyLoadShard) HaltForTransfer

func (l *LazyLoadShard) HaltForTransfer(ctx context.Context) error

func (*LazyLoadShard) HashTreeLevel

func (l *LazyLoadShard) HashTreeLevel(ctx context.Context, level int, discriminant *hashtree.Bitset) (digests []hashtree.Digest, err error)

func (*LazyLoadShard) ID

func (l *LazyLoadShard) ID() string

func (*LazyLoadShard) Index

func (l *LazyLoadShard) Index() *Index

func (*LazyLoadShard) ListBackupFiles

func (l *LazyLoadShard) ListBackupFiles(ctx context.Context, ret *backup.ShardDescriptor) error

func (*LazyLoadShard) Load

func (l *LazyLoadShard) Load(ctx context.Context) error

func (*LazyLoadShard) MergeObject

func (l *LazyLoadShard) MergeObject(ctx context.Context, object objects.MergeDocument) error

func (*LazyLoadShard) Metrics

func (l *LazyLoadShard) Metrics() *Metrics

func (*LazyLoadShard) MultiObjectByID

func (l *LazyLoadShard) MultiObjectByID(ctx context.Context, query []multi.Identifier) ([]*storobj.Object, error)

func (*LazyLoadShard) Name

func (l *LazyLoadShard) Name() string

func (*LazyLoadShard) NotifyReady

func (l *LazyLoadShard) NotifyReady()

func (*LazyLoadShard) ObjectByID

func (l *LazyLoadShard) ObjectByID(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error)

func (*LazyLoadShard) ObjectByIDErrDeleted

func (l *LazyLoadShard) ObjectByIDErrDeleted(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error)

func (*LazyLoadShard) ObjectCount

func (l *LazyLoadShard) ObjectCount() int

func (*LazyLoadShard) ObjectCountAsync

func (l *LazyLoadShard) ObjectCountAsync() int

func (*LazyLoadShard) ObjectDigestsByTokenRange

func (l *LazyLoadShard) ObjectDigestsByTokenRange(ctx context.Context,
	initialToken, finalToken uint64, limit int,
) (objs []replica.RepairResponse, lastTokenRead uint64, err error)

func (*LazyLoadShard) ObjectList

func (l *LazyLoadShard) ObjectList(ctx context.Context, limit int, sort []filters.Sort, cursor *filters.Cursor, additional additional.Properties, className schema.ClassName) ([]*storobj.Object, error)

func (*LazyLoadShard) ObjectSearch

func (l *LazyLoadShard) ObjectSearch(ctx context.Context, limit int, filters *filters.LocalFilter, keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor, additional additional.Properties, properties []string) ([]*storobj.Object, []float32, error)

func (*LazyLoadShard) ObjectVectorSearch

func (l *LazyLoadShard) ObjectVectorSearch(ctx context.Context, searchVectors []models.Vector, targetVectors []string, targetDist float32, limit int, filters *filters.LocalFilter, sort []filters.Sort, groupBy *searchparams.GroupBy, additional additional.Properties, targetCombination *dto.TargetCombination, properties []string) ([]*storobj.Object, []float32, error)

func (*LazyLoadShard) PutObject

func (l *LazyLoadShard) PutObject(ctx context.Context, object *storobj.Object) error

func (*LazyLoadShard) PutObjectBatch

func (l *LazyLoadShard) PutObjectBatch(ctx context.Context, objects []*storobj.Object) []error

func (*LazyLoadShard) QuantizedDimensions

func (l *LazyLoadShard) QuantizedDimensions(ctx context.Context, segments int) int

func (*LazyLoadShard) Queue

func (l *LazyLoadShard) Queue() *VectorIndexQueue

func (*LazyLoadShard) Queues

func (l *LazyLoadShard) Queues() map[string]*VectorIndexQueue

func (*LazyLoadShard) RepairIndex

func (l *LazyLoadShard) RepairIndex(ctx context.Context, targetVector string) error

func (*LazyLoadShard) SetPropertyLengths

func (l *LazyLoadShard) SetPropertyLengths(props []inverted.Property) error

func (*LazyLoadShard) SetStatusReadonly

func (l *LazyLoadShard) SetStatusReadonly(reason string) error

func (*LazyLoadShard) Shutdown

func (l *LazyLoadShard) Shutdown(ctx context.Context) error

func (*LazyLoadShard) Store

func (l *LazyLoadShard) Store() *lsmkv.Store

func (*LazyLoadShard) UpdateAsyncReplication

func (l *LazyLoadShard) UpdateAsyncReplication(ctx context.Context, enabled bool) error

func (*LazyLoadShard) UpdateStatus

func (l *LazyLoadShard) UpdateStatus(status string) error

func (*LazyLoadShard) UpdateVectorIndexConfig

func (l *LazyLoadShard) UpdateVectorIndexConfig(ctx context.Context, updated schemaConfig.VectorIndexConfig) error

func (*LazyLoadShard) UpdateVectorIndexConfigs

func (l *LazyLoadShard) UpdateVectorIndexConfigs(ctx context.Context, updated map[string]schemaConfig.VectorIndexConfig) error

func (*LazyLoadShard) VectorDistanceForQuery

func (l *LazyLoadShard) VectorDistanceForQuery(ctx context.Context, id uint64, searchVectors []models.Vector, targets []string) ([]float32, error)

func (*LazyLoadShard) VectorIndex

func (l *LazyLoadShard) VectorIndex() VectorIndex

func (*LazyLoadShard) VectorIndexes

func (l *LazyLoadShard) VectorIndexes() map[string]VectorIndex

func (*LazyLoadShard) Versioner

func (l *LazyLoadShard) Versioner() *shardVersioner

func (*LazyLoadShard) WasDeleted

func (l *LazyLoadShard) WasDeleted(ctx context.Context, id strfmt.UUID) (bool, time.Time, error)

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics(
	logger logrus.FieldLogger, prom *monitoring.PrometheusMetrics,
	className, shardName string,
) *Metrics

func (*Metrics) BatchCount

func (m *Metrics) BatchCount(size int)

func (*Metrics) BatchCountBytes

func (m *Metrics) BatchCountBytes(size int64)

func (*Metrics) BatchDelete

func (m *Metrics) BatchDelete(start time.Time, op string)

func (*Metrics) BatchObject

func (m *Metrics) BatchObject(start time.Time, size int)

func (*Metrics) DeleteShardLabels

func (m *Metrics) DeleteShardLabels(class, shard string)

func (*Metrics) FilteredVectorFilter

func (m *Metrics) FilteredVectorFilter(dur time.Duration)

func (*Metrics) FilteredVectorObjects

func (m *Metrics) FilteredVectorObjects(dur time.Duration)

func (*Metrics) FilteredVectorSort

func (m *Metrics) FilteredVectorSort(dur time.Duration)

func (*Metrics) FilteredVectorVector

func (m *Metrics) FilteredVectorVector(dur time.Duration)

func (*Metrics) InvertedDeleteDelta

func (m *Metrics) InvertedDeleteDelta(start time.Time)

func (*Metrics) InvertedDeleteOld

func (m *Metrics) InvertedDeleteOld(start time.Time)

func (*Metrics) InvertedExtend

func (m *Metrics) InvertedExtend(start time.Time, propCount int)

func (*Metrics) ObjectStore

func (m *Metrics) ObjectStore(start time.Time)

func (*Metrics) PutObject

func (m *Metrics) PutObject(start time.Time)

func (*Metrics) PutObjectDetermineStatus

func (m *Metrics) PutObjectDetermineStatus(start time.Time)

func (*Metrics) PutObjectUpdateInverted

func (m *Metrics) PutObjectUpdateInverted(start time.Time)

func (*Metrics) PutObjectUpsertObject

func (m *Metrics) PutObjectUpsertObject(start time.Time)

func (*Metrics) ShardStartup

func (m *Metrics) ShardStartup(start time.Time)

func (*Metrics) VectorIndex

func (m *Metrics) VectorIndex(start time.Time)

type Migrator

type Migrator struct {
	// contains filtered or unexported fields
}

func NewMigrator

func NewMigrator(db *DB, logger logrus.FieldLogger) *Migrator

func (*Migrator) AddClass

func (m *Migrator) AddClass(ctx context.Context, class *models.Class,
	shardState *sharding.State,
) error

func (*Migrator) AddProperty

func (m *Migrator) AddProperty(ctx context.Context, className string, prop ...*models.Property) error

func (*Migrator) AdjustFilterablePropSettings

func (m *Migrator) AdjustFilterablePropSettings(ctx context.Context) error

As of v1.19, a property's IndexInverted setting is replaced with IndexFilterable and IndexSearchable. Filterable buckets use the roaring set strategy and searchable ones use the map strategy (and are therefore applicable only to text/text[]). Since both types of buckets can coexist for text/text[] props, they need to be distinguished by their name: the searchable bucket has a "searchable" suffix. Up until v1.19, the default strategy for text/text[]/string/string[] buckets (string/string[] deprecated since v1.19) was map, so migrating from pre-v1.19 to v1.19 needs to properly handle existing text/text[] buckets of map strategy that carry the filterable bucket name.

An enabled InvertedIndex translates in v1.19 to both IndexFilterable and IndexSearchable being enabled. But since only the searchable bucket exists (under the filterable name), it has to be renamed to a searchable bucket. Although the IndexFilterable setting is enabled, the filterable index does not exist, therefore shards are switched into fallback mode, using searchable buckets instead of filterable ones whenever filterable ones are expected. Fallback mode effectively sets IndexFilterable to false, although it stays enabled according to the schema.

If filterable indexes are created later (it is up to the user to decide whether missing indexes should be created), shards will no longer work in fallback mode, and the actual filterable index will be used when needed.

func (*Migrator) DeleteTenants

func (m *Migrator) DeleteTenants(ctx context.Context, class string, tenants []string) error

DeleteTenants deletes tenants. CAUTION: it will not delete inactive tenants (shard files will not be removed)

func (*Migrator) DropClass

func (m *Migrator) DropClass(ctx context.Context, className string, hasFrozen bool) error

func (*Migrator) DropProperty

func (m *Migrator) DropProperty(ctx context.Context, className string, propertyName string) error

DropProperty is ignored; it exists only to keep the API compliant

func (*Migrator) GetShardsQueueSize

func (m *Migrator) GetShardsQueueSize(ctx context.Context, className, tenant string) (map[string]int64, error)

func (*Migrator) GetShardsStatus

func (m *Migrator) GetShardsStatus(ctx context.Context, className, tenant string) (map[string]string, error)

func (*Migrator) InvertedReindex

func (m *Migrator) InvertedReindex(ctx context.Context, taskNamesWithArgs map[string]any) error

func (*Migrator) NewTenants

func (m *Migrator) NewTenants(ctx context.Context, class *models.Class, creates []*schemaUC.CreateTenantPayload) error

NewTenants creates new partitions

func (*Migrator) RecalculateVectorDimensions

func (m *Migrator) RecalculateVectorDimensions(ctx context.Context) error

func (*Migrator) RecountProperties

func (m *Migrator) RecountProperties(ctx context.Context) error

func (*Migrator) SetCluster

func (m *Migrator) SetCluster(c processor)

func (*Migrator) SetNode

func (m *Migrator) SetNode(nodeID string)

func (*Migrator) SetOffloadProvider

func (m *Migrator) SetOffloadProvider(provider provider, moduleName string)

func (*Migrator) Shutdown

func (m *Migrator) Shutdown(ctx context.Context) error

Shutdown is a no-op if the db was never loaded

func (*Migrator) UpdateClass

func (m *Migrator) UpdateClass(ctx context.Context, className string, newClassName *string) error

func (*Migrator) UpdateIndex

func (m *Migrator) UpdateIndex(ctx context.Context, incomingClass *models.Class,
	incomingSS *sharding.State,
) error

UpdateIndex ensures that the local index is up to date with the latest sharding state (shards/tenants) and index properties that may have been added while the local node was down during a class update operation.

This method is relevant when the local node is a part of a cluster, particularly with the introduction of the v2 RAFT-based schema

func (*Migrator) UpdateInvertedIndexConfig

func (m *Migrator) UpdateInvertedIndexConfig(ctx context.Context, className string,
	updated *models.InvertedIndexConfig,
) error

func (*Migrator) UpdateProperty

func (m *Migrator) UpdateProperty(ctx context.Context, className string, propName string, newName *string) error

func (*Migrator) UpdateReplicationConfig

func (m *Migrator) UpdateReplicationConfig(ctx context.Context, className string, cfg *models.ReplicationConfig) error

func (*Migrator) UpdateShardStatus

func (m *Migrator) UpdateShardStatus(ctx context.Context, className, shardName, targetStatus string, schemaVersion uint64) error

func (*Migrator) UpdateTenants

func (m *Migrator) UpdateTenants(ctx context.Context, class *models.Class, updates []*schemaUC.UpdateTenantPayload) error

UpdateTenants activates or deactivates tenant partitions and returns a commit func that can be used to either commit or roll back the changes

func (*Migrator) UpdateVectorIndexConfig

func (m *Migrator) UpdateVectorIndexConfig(ctx context.Context,
	className string, updated schemaConfig.VectorIndexConfig,
) error

func (*Migrator) UpdateVectorIndexConfigs

func (m *Migrator) UpdateVectorIndexConfigs(ctx context.Context,
	className string, updated map[string]schemaConfig.VectorIndexConfig,
) error

func (*Migrator) ValidateInvertedIndexConfigUpdate

func (m *Migrator) ValidateInvertedIndexConfigUpdate(old, updated *models.InvertedIndexConfig,
) error

func (*Migrator) ValidateVectorIndexConfigUpdate

func (m *Migrator) ValidateVectorIndexConfigUpdate(
	old, updated schemaConfig.VectorIndexConfig,
) error

func (*Migrator) ValidateVectorIndexConfigsUpdate

func (m *Migrator) ValidateVectorIndexConfigsUpdate(old, updated map[string]schemaConfig.VectorIndexConfig,
) error

func (*Migrator) WaitForStartup

func (m *Migrator) WaitForStartup(ctx context.Context) error

type PropertyIndexType

type PropertyIndexType uint8
const (
	IndexTypePropValue PropertyIndexType = iota + 1
	IndexTypePropLength
	IndexTypePropNull
	IndexTypePropSearchableValue
	IndexTypePropMetaCount
)

func GetPropNameAndIndexTypeFromBucketName

func GetPropNameAndIndexTypeFromBucketName(bucketName string) (string, PropertyIndexType)
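A hedged sketch of dispatching on the decoded index type; the helper and its labels are illustrative only:

package db

// describeBucket decodes a bucket name into its property name and index type
// and renders a human-readable label for it.
func describeBucket(bucketName string) string {
	propName, indexType := GetPropNameAndIndexTypeFromBucketName(bucketName)
	switch indexType {
	case IndexTypePropValue:
		return propName + " (value index)"
	case IndexTypePropSearchableValue:
		return propName + " (searchable value index)"
	case IndexTypePropLength:
		return propName + " (property length index)"
	case IndexTypePropNull:
		return propName + " (null index)"
	case IndexTypePropMetaCount:
		return propName + " (meta count index)"
	default:
		return propName + " (unknown index type)"
	}
}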

type ReindexableProperty

type ReindexableProperty struct {
	PropertyName    string
	IndexType       PropertyIndexType
	NewIndex        bool // is new index, there is no bucket to replace with
	DesiredStrategy string
	BucketOptions   []lsmkv.BucketOption
}

type Replicator

type Replicator interface {
	ReplicateObject(ctx context.Context, shardName, requestID string,
		object *storobj.Object) replica.SimpleResponse
	ReplicateObjects(ctx context.Context, shardName, requestID string,
		objects []*storobj.Object) replica.SimpleResponse
	ReplicateUpdate(ctx context.Context, shard, requestID string,
		doc *objects.MergeDocument) replica.SimpleResponse
	ReplicateDeletion(ctx context.Context, shardName, requestID string,
		uuid strfmt.UUID, deletionTime time.Time) replica.SimpleResponse
	ReplicateDeletions(ctx context.Context, shardName, requestID string,
		uuids []strfmt.UUID, deletionTime time.Time, dryRun bool, schemaVersion uint64) replica.SimpleResponse
	ReplicateReferences(ctx context.Context, shard, requestID string,
		refs []objects.BatchReference) replica.SimpleResponse
	CommitReplication(shard,
		requestID string) interface{}
	AbortReplication(shardName,
		requestID string) interface{}
}

type ResultContainer

type ResultContainer interface {
	AddScores(id uint64, targets []string, distances []float32, weights []float32)
	RemoveIdFromResult(id uint64)
}

type ResultContainerHybrid

type ResultContainerHybrid struct {
	ResultsIn []*search.Result

	IDsToRemove map[uint64]struct{}
	// contains filtered or unexported fields
}

func (*ResultContainerHybrid) AddScores

func (r *ResultContainerHybrid) AddScores(id uint64, targets []string, distances []float32, weights []float32)

func (*ResultContainerHybrid) RemoveIdFromResult

func (r *ResultContainerHybrid) RemoveIdFromResult(id uint64)

type ResultContainerStandard

type ResultContainerStandard struct {
	ResultsIn map[uint64]idAndDistance
}

func (*ResultContainerStandard) AddScores

func (r *ResultContainerStandard) AddScores(id uint64, targets []string, distances []float32, weights []float32)

func (*ResultContainerStandard) RemoveIdFromResult

func (r *ResultContainerStandard) RemoveIdFromResult(id uint64)

type Shard

type Shard struct {
	// contains filtered or unexported fields
}

Shard is the smallest completely-contained index unit. A shard manages database files for all the objects it owns. How a shard is determined for a target object (e.g. Murmur hash, etc.) is still open at this point

func NewShard

func NewShard(ctx context.Context, promMetrics *monitoring.PrometheusMetrics,
	shardName string, index *Index, class *models.Class, jobQueueCh chan job,
	scheduler *queue.Scheduler,
	indexCheckpoints *indexcheckpoint.Checkpoints,
) (_ *Shard, err error)

func (*Shard) Activity

func (s *Shard) Activity() int32

func (*Shard) AddReferencesBatch

func (s *Shard) AddReferencesBatch(ctx context.Context, refs objects.BatchReferences) []error

The returned []error gives the error for each input element at the index position in which it was received

func (*Shard) Aggregate

func (s *Shard) Aggregate(ctx context.Context, params aggregation.Params, modules *modules.Provider) (*aggregation.Result, error)

func (*Shard) AnalyzeObject

func (s *Shard) AnalyzeObject(object *storobj.Object) ([]inverted.Property, []inverted.NilProperty, error)

func (*Shard) ConvertQueue

func (s *Shard) ConvertQueue(targetVector string) error

ConvertQueue converts a legacy in-memory queue to an on-disk queue. If the queue has a checkpoint, it enqueues all the remaining vectors to the on-disk queue, then deletes the checkpoint.

func (*Shard) Counter

func (s *Shard) Counter() *indexcounter.Counter

func (*Shard) DebugResetVectorIndex

func (s *Shard) DebugResetVectorIndex(ctx context.Context, targetVector string) error

IMPORTANT: DebugResetVectorIndex is intended to be used for debugging purposes only. It creates a new vector index and replaces the existing one if any. This function assumes the node is not receiving any traffic besides the debug endpoints and that async indexing is enabled.

func (*Shard) DeleteObject

func (s *Shard) DeleteObject(ctx context.Context, id strfmt.UUID, deletionTime time.Time) error

func (*Shard) DeleteObjectBatch

func (s *Shard) DeleteObjectBatch(ctx context.Context, uuids []strfmt.UUID, deletionTime time.Time, dryRun bool) objects.BatchSimpleObjects

The return value reports, for each input position, the result for the corresponding element

func (*Shard) Dimensions

func (s *Shard) Dimensions(ctx context.Context) int

func (*Shard) DimensionsForVec

func (s *Shard) DimensionsForVec(ctx context.Context, vecName string) int

func (*Shard) Exists

func (s *Shard) Exists(ctx context.Context, id strfmt.UUID) (bool, error)

TODO: This does an actual read, which is not really needed. If we see this come up in profiling, we could optimize it by adding an explicit Exists() on the LSMKV which only checks the bloom filters; at least in the case of a true negative that would be considerably faster. For a (false) positive, we'd still need to check, though.

func (*Shard) FillQueue

func (s *Shard) FillQueue(targetVector string, from uint64) error

FillQueue is a helper function that enqueues all vectors from the LSM store to the on-disk queue.

func (*Shard) FindUUIDs

func (s *Shard) FindUUIDs(ctx context.Context, filters *filters.LocalFilter) ([]strfmt.UUID, error)

func (*Shard) GetPropertyLengthTracker

func (s *Shard) GetPropertyLengthTracker() *inverted.JsonShardMetaData

Tracks the lengths of all properties. Must be updated on inserts/deletes.

func (*Shard) GetStatus

func (s *Shard) GetStatus() storagestate.Status

func (*Shard) GetStatusNoLoad

func (s *Shard) GetStatusNoLoad() storagestate.Status

Same implementation as for a regular shard; this only differs for lazy-loaded shards

func (*Shard) HaltForTransfer

func (s *Shard) HaltForTransfer(ctx context.Context) (err error)

HaltForTransfer stops compaction and flushing of the memtable and commit log, in preparation for a backup or cloud offload

func (*Shard) HashTreeLevel

func (s *Shard) HashTreeLevel(ctx context.Context, level int, discriminant *hashtree.Bitset) (digests []hashtree.Digest, err error)

func (*Shard) ID

func (s *Shard) ID() string

func (*Shard) Index

func (s *Shard) Index() *Index

func (*Shard) ListBackupFiles

func (s *Shard) ListBackupFiles(ctx context.Context, ret *backup.ShardDescriptor) error

ListBackupFiles lists all files used to backup a shard

func (*Shard) MergeObject

func (s *Shard) MergeObject(ctx context.Context, merge objects.MergeDocument) error

func (*Shard) Metrics

func (s *Shard) Metrics() *Metrics

Grafana metrics

func (*Shard) MultiObjectByID

func (s *Shard) MultiObjectByID(ctx context.Context, query []multi.Identifier) ([]*storobj.Object, error)

func (*Shard) Name

func (s *Shard) Name() string

Shard name (identifier?)

func (*Shard) NotifyReady

func (s *Shard) NotifyReady()

func (*Shard) ObjectByID

func (s *Shard) ObjectByID(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error)

func (*Shard) ObjectByIDErrDeleted

func (s *Shard) ObjectByIDErrDeleted(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error)

func (*Shard) ObjectCount

func (s *Shard) ObjectCount() int

ObjectCount returns the exact count at any moment

func (*Shard) ObjectCountAsync

func (s *Shard) ObjectCountAsync() int

ObjectCountAsync returns the eventually consistent "async" count which is much cheaper to obtain

func (*Shard) ObjectDigestsByTokenRange

func (s *Shard) ObjectDigestsByTokenRange(ctx context.Context,
	initialToken, finalToken uint64, limit int) (
	res []replica.RepairResponse, lastTokenRead uint64, err error,
)

func (*Shard) ObjectList

func (s *Shard) ObjectList(ctx context.Context, limit int, sort []filters.Sort, cursor *filters.Cursor, additional additional.Properties, className schema.ClassName) ([]*storobj.Object, error)

func (*Shard) ObjectSearch

func (s *Shard) ObjectSearch(ctx context.Context, limit int, filters *filters.LocalFilter,
	keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor,
	additional additional.Properties, properties []string,
) ([]*storobj.Object, []float32, error)

func (*Shard) ObjectVectorSearch

func (s *Shard) ObjectVectorSearch(ctx context.Context, searchVectors []models.Vector, targetVectors []string, targetDist float32, limit int, filters *filters.LocalFilter, sort []filters.Sort, groupBy *searchparams.GroupBy, additional additional.Properties, targetCombination *dto.TargetCombination, properties []string) ([]*storobj.Object, []float32, error)

func (*Shard) PutObject

func (s *Shard) PutObject(ctx context.Context, object *storobj.Object) error

func (*Shard) PutObjectBatch

func (s *Shard) PutObjectBatch(ctx context.Context,
	objects []*storobj.Object,
) []error

The returned []error gives the error for each input element at the index position in which it was received

func (*Shard) QuantizedDimensions

func (s *Shard) QuantizedDimensions(ctx context.Context, segments int) int

func (*Shard) QuantizedDimensionsForVec

func (s *Shard) QuantizedDimensionsForVec(ctx context.Context, segments int, vecName string) int

func (*Shard) Queue

func (s *Shard) Queue() *VectorIndexQueue

func (*Shard) QueueForName

func (s *Shard) QueueForName(targetVector string) *VectorIndexQueue

func (*Shard) Queues

func (s *Shard) Queues() map[string]*VectorIndexQueue

func (*Shard) RepairIndex

func (s *Shard) RepairIndex(ctx context.Context, targetVector string) error

RepairIndex ensures the vector index is consistent with the LSM store. It goes through the LSM store and enqueues any unindexed vector, and it also removes any indexed vector that is not in the LSM store. It is safe to call or interrupt this method at any time. If ASYNC_INDEXING is disabled, it's a no-op.

func (*Shard) SetPropertyLengthTracker

func (s *Shard) SetPropertyLengthTracker(tracker *inverted.JsonShardMetaData)

Tracks the lengths of all properties. Must be updated on inserts/deletes.

func (*Shard) SetPropertyLengths

func (s *Shard) SetPropertyLengths(props []inverted.Property) error

func (*Shard) SetStatusReadonly

func (s *Shard) SetStatusReadonly(reason string) error

func (*Shard) Shutdown

func (s *Shard) Shutdown(ctx context.Context) (err error)
batch:
	if shut == false:
		in_use++
		defer in_use--
	if shut == true:
		fail request

shutdown:
	loop with timeout:
		if shut == true:
			fail request
		if in_use == 0 && shut == false:
			shut = true

Shutdown needs to be idempotent, so it can also deal with a partial initialization. In some parts, it relies on the underlying structs to have idempotent Shutdown methods. In other parts, it explicitly checks if a component was initialized. If not, it turns it into a noop to prevent blocking.
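A hedged Go illustration of the coordination the pseudocode above describes; the type and field names are hypothetical, not the actual Shard internals:

package db

import (
	"errors"
	"sync"
)

// shutdownGate mirrors the batch/shutdown handshake sketched above: batches
// register themselves while the shard is open, and shutdown only proceeds once
// no batch is in flight.
type shutdownGate struct {
	mu    sync.Mutex
	inUse int
	shut  bool
}

func (g *shutdownGate) beginBatch() error {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.shut {
		return errors.New("shard is shut down") // batch fails once shut == true
	}
	g.inUse++ // callers must pair this with endBatch, typically via defer
	return nil
}

func (g *shutdownGate) endBatch() {
	g.mu.Lock()
	g.inUse--
	g.mu.Unlock()
}

// tryShutdown reports whether the gate transitioned to shut; callers loop
// with a timeout until it succeeds or the deadline expires.
func (g *shutdownGate) tryShutdown() bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.shut || g.inUse != 0 {
		return false
	}
	g.shut = true
	return true
}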

func (*Shard) Store

func (s *Shard) Store() *lsmkv.Store

The physical data store

func (*Shard) UpdateAsyncReplication

func (s *Shard) UpdateAsyncReplication(ctx context.Context, enabled bool) error

func (*Shard) UpdateStatus

func (s *Shard) UpdateStatus(in string) error

func (*Shard) UpdateVectorIndexConfig

func (s *Shard) UpdateVectorIndexConfig(ctx context.Context, updated schemaConfig.VectorIndexConfig) error

func (*Shard) UpdateVectorIndexConfigs

func (s *Shard) UpdateVectorIndexConfigs(ctx context.Context, updated map[string]schemaConfig.VectorIndexConfig) error

func (*Shard) VectorDistanceForQuery

func (s *Shard) VectorDistanceForQuery(ctx context.Context, docId uint64, searchVectors []models.Vector, targetVectors []string) ([]float32, error)

func (*Shard) VectorIndex

func (s *Shard) VectorIndex() VectorIndex

func (*Shard) VectorIndexForName

func (s *Shard) VectorIndexForName(targetVector string) VectorIndex

func (*Shard) VectorIndexes

func (s *Shard) VectorIndexes() map[string]VectorIndex

func (*Shard) Versioner

func (s *Shard) Versioner() *shardVersioner

func (*Shard) WasDeleted

func (s *Shard) WasDeleted(ctx context.Context, id strfmt.UUID) (bool, time.Time, error)

type ShardInvertedReindexTask

type ShardInvertedReindexTask interface {
	GetPropertiesToReindex(ctx context.Context, shard ShardLike,
	) ([]ReindexableProperty, error)
	// right now only OnResume is needed, but in the future more
	// callbacks could be added
	// (like OnPrePauseStore, OnPostPauseStore, OnPreResumeStore, etc)
	OnPostResumeStore(ctx context.Context, shard ShardLike) error
	ObjectsIterator(shard ShardLike) objectsIterator
}
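A hedged, in-package skeleton of an implementation (a no-op task; since objectsIterator is unexported, this only compiles inside the package, and the iterator is borrowed from the roaring-set task as a placeholder):

package db

import "context"

// noopReindexTask satisfies ShardInvertedReindexTask without doing any work.
type noopReindexTask struct{}

var _ ShardInvertedReindexTask = (*noopReindexTask)(nil)

func (t *noopReindexTask) GetPropertiesToReindex(ctx context.Context, shard ShardLike) ([]ReindexableProperty, error) {
	return nil, nil // nothing to reindex
}

func (t *noopReindexTask) OnPostResumeStore(ctx context.Context, shard ShardLike) error {
	return nil // no work needed after the store resumes
}

func (t *noopReindexTask) ObjectsIterator(shard ShardLike) objectsIterator {
	// delegate to an existing task's iterator as a placeholder
	return (&ShardInvertedReindexTaskSetToRoaringSet{}).ObjectsIterator(shard)
}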

type ShardInvertedReindexTaskSetToRoaringSet

type ShardInvertedReindexTaskSetToRoaringSet struct{}

func (*ShardInvertedReindexTaskSetToRoaringSet) GetPropertiesToReindex

func (t *ShardInvertedReindexTaskSetToRoaringSet) GetPropertiesToReindex(ctx context.Context,
	shard ShardLike,
) ([]ReindexableProperty, error)

func (*ShardInvertedReindexTaskSetToRoaringSet) ObjectsIterator

func (t *ShardInvertedReindexTaskSetToRoaringSet) ObjectsIterator(shard ShardLike) objectsIterator

func (*ShardInvertedReindexTaskSetToRoaringSet) OnPostResumeStore

func (t *ShardInvertedReindexTaskSetToRoaringSet) OnPostResumeStore(ctx context.Context, shard ShardLike) error

type ShardInvertedReindexTask_SpecifiedIndex

type ShardInvertedReindexTask_SpecifiedIndex struct {
	// contains filtered or unexported fields
}

func (*ShardInvertedReindexTask_SpecifiedIndex) GetPropertiesToReindex

func (t *ShardInvertedReindexTask_SpecifiedIndex) GetPropertiesToReindex(ctx context.Context,
	shard ShardLike,
) ([]ReindexableProperty, error)

func (*ShardInvertedReindexTask_SpecifiedIndex) ObjectsIterator

func (t *ShardInvertedReindexTask_SpecifiedIndex) ObjectsIterator(shard ShardLike) objectsIterator

func (*ShardInvertedReindexTask_SpecifiedIndex) OnPostResumeStore

func (t *ShardInvertedReindexTask_SpecifiedIndex) OnPostResumeStore(ctx context.Context, shard ShardLike) error

type ShardInvertedReindexer

type ShardInvertedReindexer struct {
	// contains filtered or unexported fields
}

func NewShardInvertedReindexer

func NewShardInvertedReindexer(shard ShardLike, logger logrus.FieldLogger) *ShardInvertedReindexer

func (*ShardInvertedReindexer) AddTask

func (*ShardInvertedReindexer) Do

type ShardLike

type ShardLike interface {
	Index() *Index                  // Get the parent index
	Name() string                   // Get the shard name
	Store() *lsmkv.Store            // Get the underlying store
	NotifyReady()                   // Set shard status to ready
	GetStatus() storagestate.Status // Return the shard status
	GetStatusNoLoad() storagestate.Status
	UpdateStatus(status string) error                                                   // Set shard status
	SetStatusReadonly(reason string) error                                              // Set shard status to readonly with reason
	FindUUIDs(ctx context.Context, filters *filters.LocalFilter) ([]strfmt.UUID, error) // Search and return document ids

	Counter() *indexcounter.Counter
	ObjectCount() int
	ObjectCountAsync() int
	GetPropertyLengthTracker() *inverted.JsonShardMetaData

	PutObject(context.Context, *storobj.Object) error
	PutObjectBatch(context.Context, []*storobj.Object) []error
	ObjectByID(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error)
	ObjectByIDErrDeleted(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error)
	Exists(ctx context.Context, id strfmt.UUID) (bool, error)
	ObjectSearch(ctx context.Context, limit int, filters *filters.LocalFilter, keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor, additional additional.Properties, properties []string) ([]*storobj.Object, []float32, error)
	ObjectVectorSearch(ctx context.Context, searchVectors []models.Vector, targetVectors []string, targetDist float32, limit int, filters *filters.LocalFilter, sort []filters.Sort, groupBy *searchparams.GroupBy, additional additional.Properties, targetCombination *dto.TargetCombination, properties []string) ([]*storobj.Object, []float32, error)
	UpdateVectorIndexConfig(ctx context.Context, updated schemaConfig.VectorIndexConfig) error
	UpdateVectorIndexConfigs(ctx context.Context, updated map[string]schemaConfig.VectorIndexConfig) error
	UpdateAsyncReplication(ctx context.Context, enabled bool) error
	AddReferencesBatch(ctx context.Context, refs objects.BatchReferences) []error
	DeleteObjectBatch(ctx context.Context, ids []strfmt.UUID, deletionTime time.Time, dryRun bool) objects.BatchSimpleObjects // Delete many objects by id
	DeleteObject(ctx context.Context, id strfmt.UUID, deletionTime time.Time) error                                           // Delete object by id
	MultiObjectByID(ctx context.Context, query []multi.Identifier) ([]*storobj.Object, error)
	ObjectDigestsByTokenRange(ctx context.Context, initialToken, finalToken uint64, limit int) (objs []replica.RepairResponse, lastTokenRead uint64, err error)
	ID() string // Get the shard id

	HaltForTransfer(ctx context.Context) error

	ListBackupFiles(ctx context.Context, ret *backup.ShardDescriptor) error

	SetPropertyLengths(props []inverted.Property) error
	AnalyzeObject(*storobj.Object) ([]inverted.Property, []inverted.NilProperty, error)
	Aggregate(ctx context.Context, params aggregation.Params, modules *modules.Provider) (*aggregation.Result, error)
	HashTreeLevel(ctx context.Context, level int, discriminant *hashtree.Bitset) (digests []hashtree.Digest, err error)
	MergeObject(ctx context.Context, object objects.MergeDocument) error
	Queue() *VectorIndexQueue
	Queues() map[string]*VectorIndexQueue
	VectorDistanceForQuery(ctx context.Context, id uint64, searchVectors []models.Vector, targets []string) ([]float32, error)
	ConvertQueue(targetVector string) error
	FillQueue(targetVector string, from uint64) error
	Shutdown(context.Context) error // Shutdown the shard

	// TODO tests only
	ObjectList(ctx context.Context, limit int, sort []filters.Sort, cursor *filters.Cursor,
		additional additional.Properties, className schema.ClassName) ([]*storobj.Object, error) // Search and return objects
	WasDeleted(ctx context.Context, id strfmt.UUID) (bool, time.Time, error) // Check if an object was deleted
	VectorIndex() VectorIndex                                                // Get the vector index
	VectorIndexes() map[string]VectorIndex                                   // Get the vector indexes

	// TODO tests only
	Versioner() *shardVersioner // Get the shard versioner

	// TODO tests only
	Dimensions(ctx context.Context) int // dim(vector)*number vectors
	// TODO tests only
	QuantizedDimensions(ctx context.Context, segments int) int

	Metrics() *Metrics

	// A thread-safe counter that is incremented whenever there is activity on
	// this shard. The absolute value has no meaning; its only purpose is to let
	// callers compare a previous reading to the current one (see the sketch
	// after this interface).
	Activity() int32
	// Debug methods
	DebugResetVectorIndex(ctx context.Context, targetVector string) error
	RepairIndex(ctx context.Context, targetVector string) error
	// contains filtered or unexported methods
}
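
A minimal usage sketch for the Activity counter mentioned above, assuming it is placed inside this package (ShardLike is referenced unqualified) and uses only context and time from the standard library; waitForIdle and the polling interval are illustrative helpers, not part of the package API.

// waitForIdle samples the Activity counter and returns once two consecutive
// samples are equal, i.e. no activity was observed during one interval.
func waitForIdle(ctx context.Context, shard ShardLike, interval time.Duration) error {
	last := shard.Activity()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(interval):
		}
		cur := shard.Activity()
		if cur == last {
			return nil // counter unchanged: the shard was idle for one interval
		}
		last = cur
	}
}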

type ShardStatus

type ShardStatus struct {
	Status storagestate.Status
	Reason string
}

func NewShardStatus

func NewShardStatus() ShardStatus

func (*ShardStatus) Init

func (s *ShardStatus) Init()

type Task

type Task[T dto.Embedding] struct {
	// contains filtered or unexported fields
}

func (*Task[T]) Execute

func (t *Task[T]) Execute(ctx context.Context) error

func (*Task[T]) Key

func (t *Task[T]) Key() uint64

func (*Task[T]) NewGroup

func (t *Task[T]) NewGroup(op uint8, tasks ...queue.Task) queue.Task

func (*Task[T]) Op

func (t *Task[T]) Op() uint8

type TaskGroup

type TaskGroup[T dto.Embedding] struct {
	// contains filtered or unexported fields
}

func (*TaskGroup[T]) Execute

func (t *TaskGroup[T]) Execute(ctx context.Context) error

func (*TaskGroup[T]) Key

func (t *TaskGroup[T]) Key() uint64

func (*TaskGroup[T]) Op

func (t *TaskGroup[T]) Op() uint8

type VectorIndex

type VectorIndex interface {
	Dump(labels ...string)
	Add(ctx context.Context, id uint64, vector []float32) error
	AddMulti(ctx context.Context, docId uint64, vector [][]float32) error
	AddBatch(ctx context.Context, ids []uint64, vector [][]float32) error
	AddMultiBatch(ctx context.Context, docIds []uint64, vectors [][][]float32) error
	Delete(id ...uint64) error
	DeleteMulti(id ...uint64) error
	SearchByVector(ctx context.Context, vector []float32, k int, allow helpers.AllowList) ([]uint64, []float32, error)
	SearchByVectorDistance(ctx context.Context, vector []float32, dist float32,
		maxLimit int64, allow helpers.AllowList) ([]uint64, []float32, error)
	SearchByMultiVector(ctx context.Context, vector [][]float32, k int, allow helpers.AllowList) ([]uint64, []float32, error)
	SearchByMultiVectorDistance(ctx context.Context, vector [][]float32, dist float32,
		maxLimit int64, allow helpers.AllowList) ([]uint64, []float32, error)
	UpdateUserConfig(updated schemaConfig.VectorIndexConfig, callback func()) error
	GetKeys(id uint64) (uint64, uint64, error)
	Drop(ctx context.Context) error
	Shutdown(ctx context.Context) error
	Flush() error
	SwitchCommitLogs(ctx context.Context) error
	ListFiles(ctx context.Context, basePath string) ([]string, error)
	PostStartup()
	Compressed() bool
	Multivector() bool
	ValidateBeforeInsert(vector []float32) error
	ValidateMultiBeforeInsert(vector [][]float32) error
	DistanceBetweenVectors(x, y []float32) (float32, error)
	// ContainsNode returns true if the index contains the node with the given id.
	// It must return false if the node does not exist or has a tombstone.
	ContainsNode(id uint64) bool
	AlreadyIndexed() uint64
	// Iterate over all nodes in the index.
	// Consistency is not guaranteed, as the
	// index may be concurrently modified.
	// If the callback returns false, the iteration will stop.
	Iterate(fn func(id uint64) bool)
	DistancerProvider() distancer.Provider
	QueryVectorDistancer(queryVector []float32) common.QueryVectorDistancer
	QueryMultiVectorDistancer(queryVector [][]float32) common.QueryVectorDistancer
	Stats() (common.IndexStats, error)
}

VectorIndex is anything that indexes vectors efficiently. For an example implementation, see ./vector/hnsw/index.go.
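
A hedged usage sketch, assuming it lives inside this package: index one vector and run a k-nearest-neighbour search against any VectorIndex implementation. The doc id, vector values, and k are illustrative, and passing a nil allow list is assumed to mean "no filtering".

// addAndSearch indexes a single vector and then queries for its ten nearest
// neighbours without any allow-list filtering.
func addAndSearch(ctx context.Context, idx VectorIndex) ([]uint64, []float32, error) {
	if err := idx.Add(ctx, 42, []float32{0.1, 0.2, 0.3}); err != nil {
		return nil, nil, err
	}
	// nil allow list: assumption is that implementations treat this as unfiltered.
	return idx.SearchByVector(ctx, []float32{0.1, 0.2, 0.3}, 10, nil)
}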

type VectorIndexQueue

type VectorIndexQueue struct {
	*queue.DiskQueue
	// contains filtered or unexported fields
}

func NewVectorIndexQueue

func NewVectorIndexQueue(
	shard *Shard,
	targetVector string,
	index VectorIndex,
) (*VectorIndexQueue, error)

func (*VectorIndexQueue) BeforeSchedule

func (iq *VectorIndexQueue) BeforeSchedule() (skip bool)

func (*VectorIndexQueue) Close

func (iq *VectorIndexQueue) Close() error

func (*VectorIndexQueue) Delete

func (iq *VectorIndexQueue) Delete(ids ...uint64) error

func (*VectorIndexQueue) Flush

func (iq *VectorIndexQueue) Flush() error

func (*VectorIndexQueue) Insert

func (iq *VectorIndexQueue) Insert(ctx context.Context, vectors ...common.VectorRecord) error

func (*VectorIndexQueue) OnBatchProcessed

func (iq *VectorIndexQueue) OnBatchProcessed()

OnBatchProcessed flushes the vector index after a batch is processed.

func (*VectorIndexQueue) ResetWith

func (iq *VectorIndexQueue) ResetWith(vidx VectorIndex)

ResetWith resets the queue with the given vector index. The queue must be paused before calling this method.
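
A hedged sketch of the reset flow described above, assuming the embedded *queue.DiskQueue exposes Pause and Resume (neither is listed on this page); swapIndex is an illustrative helper, not part of the package.

// swapIndex pauses the queue, swaps in a freshly built vector index, and
// resumes processing.
func swapIndex(q *VectorIndexQueue, fresh VectorIndex) {
	q.Pause()          // assumption: pausing is provided by the embedded *queue.DiskQueue
	q.ResetWith(fresh) // documented: the queue must be paused before this call
	q.Resume()         // assumption: resume scheduling once the new index is in place
}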

type VectorIndexQueueMetrics

type VectorIndexQueueMetrics struct {
	// contains filtered or unexported fields
}

func NewVectorIndexQueueMetrics

func NewVectorIndexQueueMetrics(
	logger logrus.FieldLogger, prom *monitoring.PrometheusMetrics,
	className, shardName string, targetVector string,
) *VectorIndexQueueMetrics

func (*VectorIndexQueueMetrics) Delete

func (m *VectorIndexQueueMetrics) Delete(start time.Time, count int)

func (*VectorIndexQueueMetrics) DeleteShardLabels

func (m *VectorIndexQueueMetrics) DeleteShardLabels(class, shard string)

func (*VectorIndexQueueMetrics) Insert

func (m *VectorIndexQueueMetrics) Insert(start time.Time, count int)

func (*VectorIndexQueueMetrics) QueueMetrics

func (m *VectorIndexQueueMetrics) QueueMetrics() *queue.Metrics

Directories

Path Synopsis
clusterintegrationtest acts as a test package to provide a component test spanning multiple parts of the application, including everything that's required for a distributed setup.
entities
ent contains common types used throughout various lsmkv (sub-)packages
Package roaringset contains all the LSM business logic that is unique to the "RoaringSet" strategy
vector
geo
hnsw/distancer/asm
asm only has amd64 specific implementations at the moment
