db

package
v1.24.24
Published: Sep 6, 2024 License: BSD-3-Clause Imports: 86 Imported by: 1

Documentation

Overview

Some standard accessors for the shard struct. It is important to NEVER access the shard struct directly, because we lazy load shards, so the information might not be there.
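
Why accessors matter with lazy loading can be illustrated with a minimal, self-contained sketch. The names below (lazyShard, shardData, loader, ObjectCount on this type) are hypothetical and are not the package's implementation; the sketch assumes loading is triggered on first access and guarded by sync.Once.

```go
package main

import (
	"fmt"
	"sync"
)

// shardData stands in for the expensive-to-load state of a shard (hypothetical).
type shardData struct {
	objectCount int
}

// lazyShard defers loading shardData until an accessor first needs it (hypothetical).
type lazyShard struct {
	once    sync.Once
	data    *shardData // nil until loaded; callers should not read this directly
	loadErr error
	loader  func() (*shardData, error)
}

// load runs the loader at most once, even under concurrent access.
func (l *lazyShard) load() error {
	l.once.Do(func() {
		l.data, l.loadErr = l.loader()
	})
	return l.loadErr
}

// ObjectCount is an accessor: it makes sure the shard is loaded before reading.
func (l *lazyShard) ObjectCount() (int, error) {
	if err := l.load(); err != nil {
		return 0, err
	}
	return l.data.objectCount, nil
}

func main() {
	s := &lazyShard{loader: func() (*shardData, error) {
		return &shardData{objectCount: 42}, nil
	}}
	fmt.Println(s.data == nil) // true: direct field access sees nothing before loading
	n, err := s.ObjectCount()
	fmt.Println(n, err) // 42 <nil>
}
```

Reading the field directly before any accessor has run yields nil, which is the failure mode the overview warns about.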

Index

Constants

const (
	ShardCodeBaseVersion                  = uint16(2)
	ShardCodeBaseMinimumVersionForStartup = uint16(1)
)

ShardCodeBaseVersion must be increased whenever there are breaking changes, including those that we can handle in a non-breaking way. The version checker can then decide on init whether it should prevent startup completely. If it does not prevent startup, but there is still a version mismatch, the version can be used to make specific decisions.
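
The decision described above could look roughly like the following sketch. The two constants are copied from this page; checkShardVersion, errTooOld, and the needsCompat flag are hypothetical names introduced for illustration, and the actual version checker may behave differently.

```go
package main

import (
	"errors"
	"fmt"
)

const (
	ShardCodeBaseVersion                  = uint16(2)
	ShardCodeBaseMinimumVersionForStartup = uint16(1)
)

// errTooOld is a hypothetical sentinel error for this sketch.
var errTooOld = errors.New("shard version below minimum for startup")

// checkShardVersion is a hypothetical init-time check. It returns an error
// when startup must be prevented, and needsCompat=true when startup is
// allowed but the stored version differs from the current one.
func checkShardVersion(stored uint16) (needsCompat bool, err error) {
	if stored < ShardCodeBaseMinimumVersionForStartup {
		return false, fmt.Errorf("stored version %d: %w", stored, errTooOld)
	}
	return stored != ShardCodeBaseVersion, nil
}

func main() {
	for _, v := range []uint16{0, 1, 2} {
		compat, err := checkShardVersion(v)
		fmt.Println(v, compat, err)
	}
}
```

A caller would typically branch on needsCompat to pick version-specific read and write paths.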

CHANGELOG

  • Version 1 - Everything up until Weaviate v1.10.1 inclusive
  • Version 2 - The inverted index is now always stored in sorted order, and doc ids are stored as big endian. To remain backward-compatible with v1, doc ids need to be read and written as little endian. In addition, an extra sort step is required in three places: during a MapList call, during a Map Cursor, and during Map Compactions. BM25 is entirely disabled prior to this version.
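
The link between sorted storage and byte order in the Version 2 entry can be seen in a small sketch. It only demonstrates a general property of the two encodings using encoding/binary; it is not the package's storage code, and the helper names (encodeBE, encodeLE, lessBytes) are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// encodeBE returns the 8-byte big-endian encoding of id.
func encodeBE(id uint64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, id)
	return b
}

// encodeLE returns the 8-byte little-endian encoding of id.
func encodeLE(id uint64) []byte {
	b := make([]byte, 8)
	binary.LittleEndian.PutUint64(b, id)
	return b
}

// lessBytes reports whether x sorts before y in byte-wise order.
func lessBytes(x, y []byte) bool {
	return bytes.Compare(x, y) < 0
}

func main() {
	a, b := uint64(1), uint64(256) // a < b numerically

	// Big endian: byte-wise order matches numeric order.
	fmt.Println(lessBytes(encodeBE(a), encodeBE(b))) // true

	// Little endian: byte-wise order does not match numeric order here.
	fmt.Println(lessBytes(encodeLE(a), encodeLE(b))) // false
}
```

With big-endian keys a byte-wise sorted store is also numerically sorted, which is consistent with the extra sort step being needed only on the little-endian (v1) path.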
const IdLockPoolSize = 128

Variables

This section is empty.

Functions

This section is empty.

Types

type BackupState

type BackupState struct {
	BackupID   string
	InProgress bool
}

type Config

type Config struct {
	RootPath                  string
	QueryLimit                int64
	QueryMaximumResults       int64
	QueryNestedRefLimit       int64
	ResourceUsage             config.ResourceUsage
	MaxImportGoroutinesFactor float64
	MemtablesFlushDirtyAfter  int
	MemtablesInitialSizeMB    int
	MemtablesMaxSizeMB        int
	MemtablesMinActiveSeconds int
	MemtablesMaxActiveSeconds int
	MaxSegmentSize            int64
	HNSWMaxLogSize            int64
	HNSWWaitForCachePrefill   bool
	TrackVectorDimensions     bool
	ServerVersion             string
	GitHash                   string
	AvoidMMap                 bool
	DisableLazyLoadShards     bool
	ForceFullReplicasSearch   bool
	Replication               replication.GlobalConfig
}

type DB

type DB struct {
	// contains filtered or unexported fields
}

func New

func New(logger logrus.FieldLogger, config Config,
	remoteIndex sharding.RemoteIndexClient, nodeResolver nodeResolver,
	remoteNodesClient sharding.RemoteNodeClient, replicaClient replica.Client,
	promMetrics *monitoring.PrometheusMetrics, memMonitor *memwatch.Monitor,
) (*DB, error)

func (*DB) AbortReplication

func (db *DB) AbortReplication(class,
	shard, requestID string,
) interface{}

func (*DB) AddBatchReferences

func (db *DB) AddBatchReferences(ctx context.Context, references objects.BatchReferences,
	repl *additional.ReplicationProperties,
) (objects.BatchReferences, error)

func (*DB) AddReference

func (db *DB) AddReference(ctx context.Context, source *crossref.RefSource, target *crossref.Ref,
	repl *additional.ReplicationProperties, tenant string,
) error

func (*DB) Aggregate

func (db *DB) Aggregate(ctx context.Context,
	params aggregation.Params,
) (*aggregation.Result, error)

func (*DB) AggregateNeighbors

func (db *DB) AggregateNeighbors(ctx context.Context, vector []float32,
	class string, properties []string, k int,
	filter *libfilters.LocalFilter,
) ([]classification.NeighborRef, error)

TODO: why is this logic in the persistence package? This is business-logic, move out of here!

func (*DB) BackupDescriptors

func (db *DB) BackupDescriptors(ctx context.Context, bakid string, classes []string,
) <-chan backup.ClassDescriptor

BackupDescriptors returns a channel of class descriptors. A class descriptor records everything needed to restore a class. If an error happens, a descriptor with an error will be written to the channel just before closing it.
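
A consumer of such a channel might be structured as in the sketch below. The classDescriptor type is a hypothetical stand-in for backup.ClassDescriptor that models only a name and an error, and produce is a toy producer that imitates the documented contract; neither is taken from the package.

```go
package main

import (
	"errors"
	"fmt"
)

// classDescriptor is a hypothetical stand-in for backup.ClassDescriptor;
// only the fields needed for this sketch are modeled.
type classDescriptor struct {
	Name  string
	Error error
}

// produce imitates the documented contract: descriptors are streamed, and on
// failure a descriptor carrying the error is sent just before the channel closes.
func produce(classes []string, failAt string) <-chan classDescriptor {
	ch := make(chan classDescriptor)
	go func() {
		defer close(ch)
		for _, c := range classes {
			if c == failAt {
				ch <- classDescriptor{Name: c, Error: errors.New("cannot describe class")}
				return
			}
			ch <- classDescriptor{Name: c}
		}
	}()
	return ch
}

// collect reads until the channel closes or an error descriptor arrives.
func collect(ch <-chan classDescriptor) ([]string, error) {
	var names []string
	for d := range ch {
		if d.Error != nil {
			return names, fmt.Errorf("class %q: %w", d.Name, d.Error)
		}
		names = append(names, d.Name)
	}
	return names, nil
}

func main() {
	names, err := collect(produce([]string{"A", "B", "C"}, "C"))
	fmt.Println(names, err)
}
```

Because the error descriptor is the last value sent, returning early from collect does not leave the producer goroutine blocked.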

func (*DB) Backupable

func (db *DB) Backupable(ctx context.Context, classes []string) error

Backupable reports, via its error return, whether all given classes can be backed up.

func (*DB) BatchDeleteObjects

func (db *DB) BatchDeleteObjects(ctx context.Context, params objects.BatchDeleteParams,
	repl *additional.ReplicationProperties, tenant string,
) (objects.BatchDeleteResult, error)

func (*DB) BatchPutObjects

func (db *DB) BatchPutObjects(ctx context.Context, objs objects.BatchObjects,
	repl *additional.ReplicationProperties,
) (objects.BatchObjects, error)

func (*DB) ClassExists

func (db *DB) ClassExists(name string) bool

func (*DB) CommitReplication

func (db *DB) CommitReplication(class,
	shard, requestID string,
) interface{}

func (*DB) CrossClassVectorSearch added in v1.20.0

func (db *DB) CrossClassVectorSearch(ctx context.Context, vector []float32, targetVector string, offset, limit int,
	filters *filters.LocalFilter,
) ([]search.Result, error)

func (*DB) DeleteIndex

func (db *DB) DeleteIndex(className schema.ClassName) error

DeleteIndex deletes the index

func (*DB) DeleteObject

func (db *DB) DeleteObject(ctx context.Context, class string, id strfmt.UUID,
	repl *additional.ReplicationProperties, tenant string,
) error

DeleteObject deletes an object of a specific class given its ID.

func (*DB) DenseObjectSearch added in v1.20.0

func (db *DB) DenseObjectSearch(ctx context.Context, class string, vector []float32,
	targetVector string, offset int, limit int, filters *filters.LocalFilter,
	addl additional.Properties, tenant string,
) ([]*storobj.Object, []float32, error)

DenseObjectSearch is used to perform a vector search on the db

Earlier use cases required only []search.Result as a return value from the db, and the Class VectorSearch method fit this need. Later on, other use cases presented the need for the raw storage objects, such as hybrid search.

func (*DB) DigestObjects added in v1.18.0

func (db *DB) DigestObjects(ctx context.Context,
	class, shardName string, ids []strfmt.UUID,
) (result []replica.RepairResponse, err error)

func (*DB) Exists

func (db *DB) Exists(ctx context.Context, class string, id strfmt.UUID,
	repl *additional.ReplicationProperties, tenant string,
) (bool, error)

func (*DB) FetchObject added in v1.18.0

func (db *DB) FetchObject(ctx context.Context,
	class, shardName string, id strfmt.UUID,
) (objects.Replica, error)

func (*DB) FetchObjects added in v1.18.0

func (db *DB) FetchObjects(ctx context.Context,
	class, shardName string, ids []strfmt.UUID,
) ([]objects.Replica, error)

func (*DB) GetConfig added in v1.22.0

func (db *DB) GetConfig() Config

func (*DB) GetIndex

func (db *DB) GetIndex(className schema.ClassName) *Index

GetIndex returns the index if it exists or nil if it doesn't

func (*DB) GetIndexForIncoming

func (db *DB) GetIndexForIncoming(className schema.ClassName) sharding.RemoteIndexIncomingRepo

GetIndexForIncoming returns the index if it exists or nil if it doesn't

func (*DB) GetIndices added in v1.22.0

func (db *DB) GetIndices() []*Index

func (*DB) GetNodeStatus added in v1.20.0

func (db *DB) GetNodeStatus(ctx context.Context, className string, verbosity string) ([]*models.NodeStatus, error)

GetNodeStatus returns the status of all Weaviate nodes.

func (*DB) GetQueryMaximumResults

func (db *DB) GetQueryMaximumResults() int

func (*DB) GetRemoteIndex added in v1.22.0

func (db *DB) GetRemoteIndex() sharding.RemoteIndexClient

func (*DB) GetSchema added in v1.22.0

func (db *DB) GetSchema() schema.Schema

func (*DB) GetSchemaGetter added in v1.22.0

func (db *DB) GetSchemaGetter() schemaUC.SchemaGetter

func (*DB) GetUnclassified

func (db *DB) GetUnclassified(ctx context.Context, class string,
	properties []string, filter *libfilters.LocalFilter,
) ([]search.Result, error)

TODO: why is this logic in the persistence package? This is business-logic, move out of here!

func (*DB) IncomingGetNodeStatus

func (db *DB) IncomingGetNodeStatus(ctx context.Context, className, verbosity string) (*models.NodeStatus, error)

IncomingGetNodeStatus returns the status of this node in response to a request from another node.

func (*DB) IndexExists added in v1.18.5

func (db *DB) IndexExists(className schema.ClassName) bool

IndexExists reports whether an index exists for the given class.

func (*DB) ListBackupable

func (db *DB) ListBackupable() []string

ListBackupable returns a list of all classes which can be backed up.

func (*DB) ListClasses

func (db *DB) ListClasses(ctx context.Context) []string

func (*DB) LocalNodeStatus added in v1.24.0

func (db *DB) LocalNodeStatus(ctx context.Context, className, output string) *models.NodeStatus

func (*DB) Merge

func (db *DB) Merge(ctx context.Context, merge objects.MergeDocument,
	repl *additional.ReplicationProperties, tenant string,
) error

func (*DB) MultiGet

func (db *DB) MultiGet(ctx context.Context, query []multi.Identifier,
	additional additional.Properties, tenant string,
) ([]search.Result, error)

func (*DB) Object

Object gets the object with the given id from the index of the specified class.

func (*DB) ObjectByID

func (db *DB) ObjectByID(ctx context.Context, id strfmt.UUID,
	props search.SelectProperties, additional additional.Properties,
	tenant string,
) (*search.Result, error)

ObjectByID checks every index of the particular kind for the ID.

Warning: this function is deprecated in favor of Object().

func (*DB) ObjectSearch

func (db *DB) ObjectSearch(ctx context.Context, offset, limit int,
	filters *filters.LocalFilter, sort []filters.Sort,
	additional additional.Properties, tenant string,
) (search.Results, error)

ObjectSearch searches each index. It is deprecated in favor of Query, which searches a specific index.

func (*DB) ObjectsByID

func (db *DB) ObjectsByID(ctx context.Context, id strfmt.UUID,
	props search.SelectProperties, additional additional.Properties,
	tenant string,
) (search.Results, error)

ObjectsByID checks every index of the particular kind for the ID. This method is only used for Explore queries, where we don't have a class context.

func (*DB) OverwriteObjects added in v1.18.0

func (db *DB) OverwriteObjects(ctx context.Context,
	class, shardName string, vobjects []*objects.VObject,
) ([]replica.RepairResponse, error)

func (*DB) PutObject

func (db *DB) PutObject(ctx context.Context, obj *models.Object,
	vector []float32, vectors models.Vectors, repl *additional.ReplicationProperties,
) error

func (*DB) Query

func (db *DB) Query(ctx context.Context, q *objects.QueryInput) (search.Results, *objects.Error)

Query a specific class

func (*DB) ReleaseBackup

func (db *DB) ReleaseBackup(ctx context.Context, bakID, class string) (err error)

ReleaseBackup releases resources acquired by the index during backup.

func (*DB) ReplicateDeletion

func (db *DB) ReplicateDeletion(ctx context.Context, class,
	shard, requestID string, uuid strfmt.UUID,
) replica.SimpleResponse

func (*DB) ReplicateDeletions

func (db *DB) ReplicateDeletions(ctx context.Context, class,
	shard, requestID string, uuids []strfmt.UUID, dryRun bool,
) replica.SimpleResponse

func (*DB) ReplicateObject

func (db *DB) ReplicateObject(ctx context.Context, class,
	shard, requestID string, object *storobj.Object,
) replica.SimpleResponse

func (*DB) ReplicateObjects

func (db *DB) ReplicateObjects(ctx context.Context, class,
	shard, requestID string, objects []*storobj.Object,
) replica.SimpleResponse

func (*DB) ReplicateReferences

func (db *DB) ReplicateReferences(ctx context.Context, class,
	shard, requestID string, refs []objects.BatchReference,
) replica.SimpleResponse

func (*DB) ReplicateUpdate

func (db *DB) ReplicateUpdate(ctx context.Context, class,
	shard, requestID string, mergeDoc *objects.MergeDocument,
) replica.SimpleResponse

func (*DB) ResolveReferences

func (db *DB) ResolveReferences(ctx context.Context, objs search.Results,
	props search.SelectProperties, groupBy *searchparams.GroupBy,
	addl additional.Properties, tenant string,
) (search.Results, error)

ResolveReferences takes a list of search results and enriches them with any referenced objects

func (*DB) Search added in v1.20.0

func (db *DB) Search(ctx context.Context, params dto.GetParams) ([]search.Result, error)

func (*DB) SetSchemaGetter

func (db *DB) SetSchemaGetter(sg schemaUC.SchemaGetter)

func (*DB) Shards

func (db *DB) Shards(ctx context.Context, class string) ([]string, error)

Shards returns the list of nodes where shards of the class are contained. If there are no shards for the class, it returns an empty list. If there are shards for the class but no nodes are found, it returns an error.

func (*DB) ShardsBackup

func (db *DB) ShardsBackup(
	ctx context.Context, bakID, class string, shards []string,
) (_ backup.ClassDescriptor, err error)

func (*DB) Shutdown

func (db *DB) Shutdown(ctx context.Context) error

func (*DB) SparseObjectSearch added in v1.20.0

func (db *DB) SparseObjectSearch(ctx context.Context, params dto.GetParams) ([]*storobj.Object, []float32, error)

SparseObjectSearch is used to perform an inverted index search on the db

Earlier use cases required only []search.Result as a return value from the db, and the Class ClassSearch method fit this need. Later on, other use cases presented the need for the raw storage objects, such as hybrid search.

func (*DB) StartupComplete

func (db *DB) StartupComplete() bool

func (*DB) VectorSearch

func (db *DB) VectorSearch(ctx context.Context,
	params dto.GetParams,
) ([]search.Result, error)

func (*DB) WaitForStartup

func (db *DB) WaitForStartup(ctx context.Context) error

func (*DB) ZeroShotSearch

func (db *DB) ZeroShotSearch(ctx context.Context, vector []float32,
	class string, properties []string,
	filter *libfilters.LocalFilter,
) ([]search.Result, error)

TODO: why is this logic in the persistence package? This is business-logic, move out of here!

type DimensionCategory added in v1.24.0

type DimensionCategory int
const (
	DimensionCategoryStandard DimensionCategory = iota
	DimensionCategoryPQ
	DimensionCategoryBQ
)

type Index

type Index struct {
	Config IndexConfig
	// contains filtered or unexported fields
}

Index is the logical unit which contains all the data for one particular class. An index can be further broken up into self-contained units, called Shards, to allow for easy distribution across Nodes

func NewIndex

func NewIndex(ctx context.Context, cfg IndexConfig,
	shardState *sharding.State, invertedIndexConfig schema.InvertedIndexConfig,
	vectorIndexUserConfig schema.VectorIndexConfig,
	vectorIndexUserConfigs map[string]schema.VectorIndexConfig,
	sg schemaUC.SchemaGetter,
	cs inverted.ClassSearcher, logger logrus.FieldLogger,
	nodeResolver nodeResolver, remoteClient sharding.RemoteIndexClient,
	replicaClient replica.Client,
	promMetrics *monitoring.PrometheusMetrics, class *models.Class, jobQueueCh chan job,
	indexCheckpoints *indexcheckpoint.Checkpoints,
	allocChecker memwatch.AllocChecker,
) (*Index, error)

NewIndex creates an index with the specified number of shards, using only the shards that are local to a node.

func (*Index) AbortReplication

func (i *Index) AbortReplication(shard, requestID string) interface{}

func (*Index) AddReferencesBatch added in v1.23.0

func (i *Index) AddReferencesBatch(ctx context.Context, refs objects.BatchReferences,
	replProps *additional.ReplicationProperties,
) []error

The returned []error gives the error for each reference at the position (index) at which it was received.

func (*Index) CommitReplication

func (i *Index) CommitReplication(shard, requestID string) interface{}

func (*Index) DebugRepairIndex added in v1.24.23

func (i *Index) DebugRepairIndex(ctx context.Context, shardName, targetVector string) error

func (*Index) DebugResetVectorIndex added in v1.24.22

func (i *Index) DebugResetVectorIndex(ctx context.Context, shardName, targetVector string) error

IMPORTANT: DebugResetVectorIndex is intended to be used for debugging purposes only. It drops the selected vector index, creates a new one, then reindexes it in the background. This function assumes the node is not receiving any traffic besides the debug endpoints and that async indexing is enabled.

func (*Index) ForEachShard added in v1.19.6

func (i *Index) ForEachShard(f func(name string, shard ShardLike) error) error

func (*Index) ForEachShardConcurrently added in v1.21.8

func (i *Index) ForEachShardConcurrently(f func(name string, shard ShardLike) error) error

func (*Index) GetShard added in v1.24.22

func (i *Index) GetShard(name string) ShardLike

func (*Index) GetShards added in v1.22.0

func (i *Index) GetShards() []ShardLike

func (*Index) ID

func (i *Index) ID() string

func (*Index) IncomingAggregate

func (i *Index) IncomingAggregate(ctx context.Context, shardName string,
	params aggregation.Params,
) (*aggregation.Result, error)

func (*Index) IncomingBatchAddReferences

func (i *Index) IncomingBatchAddReferences(ctx context.Context, shardName string,
	refs objects.BatchReferences,
) []error

func (*Index) IncomingBatchPutObjects

func (i *Index) IncomingBatchPutObjects(ctx context.Context, shardName string,
	objects []*storobj.Object,
) []error

func (*Index) IncomingCreateShard

func (i *Index) IncomingCreateShard(ctx context.Context, className string, shardName string) error

func (*Index) IncomingDeleteObject

func (i *Index) IncomingDeleteObject(ctx context.Context, shardName string,
	id strfmt.UUID,
) error

func (*Index) IncomingDeleteObjectBatch

func (i *Index) IncomingDeleteObjectBatch(ctx context.Context, shardName string,
	uuids []strfmt.UUID, dryRun bool,
) objects.BatchSimpleObjects

func (*Index) IncomingDigestObjects added in v1.18.0

func (i *Index) IncomingDigestObjects(ctx context.Context,
	shardName string, ids []strfmt.UUID,
) (result []replica.RepairResponse, err error)

func (*Index) IncomingExists

func (i *Index) IncomingExists(ctx context.Context, shardName string,
	id strfmt.UUID,
) (bool, error)

func (*Index) IncomingFilePutter

func (i *Index) IncomingFilePutter(ctx context.Context, shardName,
	filePath string,
) (io.WriteCloser, error)

func (*Index) IncomingFindUUIDs added in v1.22.8

func (i *Index) IncomingFindUUIDs(ctx context.Context, shardName string,
	filters *filters.LocalFilter,
) ([]strfmt.UUID, error)

func (*Index) IncomingGetObject

func (i *Index) IncomingGetObject(ctx context.Context, shardName string,
	id strfmt.UUID, props search.SelectProperties,
	additional additional.Properties,
) (*storobj.Object, error)

func (*Index) IncomingGetShardQueueSize added in v1.22.0

func (i *Index) IncomingGetShardQueueSize(ctx context.Context, shardName string) (int64, error)

func (*Index) IncomingGetShardStatus

func (i *Index) IncomingGetShardStatus(ctx context.Context, shardName string) (string, error)

func (*Index) IncomingMergeObject

func (i *Index) IncomingMergeObject(ctx context.Context, shardName string,
	mergeDoc objects.MergeDocument,
) error

func (*Index) IncomingMultiGetObjects

func (i *Index) IncomingMultiGetObjects(ctx context.Context, shardName string,
	ids []strfmt.UUID,
) ([]*storobj.Object, error)

func (*Index) IncomingOverwriteObjects added in v1.18.0

func (i *Index) IncomingOverwriteObjects(ctx context.Context,
	shardName string, vobjects []*objects.VObject,
) ([]replica.RepairResponse, error)

func (*Index) IncomingPutObject

func (i *Index) IncomingPutObject(ctx context.Context, shardName string,
	object *storobj.Object,
) error

func (*Index) IncomingReinitShard

func (i *Index) IncomingReinitShard(ctx context.Context,
	shardName string,
) error

func (*Index) IncomingSearch

func (i *Index) IncomingSearch(ctx context.Context, shardName string,
	searchVector []float32, targetVector string, distance float32, limit int,
	filters *filters.LocalFilter, keywordRanking *searchparams.KeywordRanking,
	sort []filters.Sort, cursor *filters.Cursor, groupBy *searchparams.GroupBy,
	additional additional.Properties,
) ([]*storobj.Object, []float32, error)

func (*Index) IncomingUpdateShardStatus

func (i *Index) IncomingUpdateShardStatus(ctx context.Context, shardName, targetStatus string) error

func (*Index) IterateObjects

func (i *Index) IterateObjects(ctx context.Context, cb func(index *Index, shard ShardLike, object *storobj.Object) error) (err error)

Iterate over all objects in the index, applying the callback function to each one. Adding or removing objects during iteration is not supported.

func (*Index) IterateShards added in v1.19.0

func (i *Index) IterateShards(ctx context.Context, cb func(index *Index, shard ShardLike) error) (err error)

IterateShards iterates over all shards in the index, applying the callback function to each one. Adding or removing objects during iteration is not supported.

func (*Index) ReleaseBackup

func (i *Index) ReleaseBackup(ctx context.Context, id string) error

ReleaseBackup marks the specified backup as inactive and restarts all async background and maintenance processes. It errors if the backup does not exist or is already inactive.
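
The error conditions described above can be sketched with the BackupState struct documented earlier on this page. The tracker type and its release method are hypothetical; the real index also restarts background and maintenance processes, which this sketch only notes in a comment.

```go
package main

import (
	"fmt"
	"sync"
)

// BackupState mirrors the exported struct documented on this page.
type BackupState struct {
	BackupID   string
	InProgress bool
}

// tracker is a hypothetical holder for the backup state of one index.
type tracker struct {
	mu    sync.Mutex
	state BackupState
}

// release marks the backup as inactive. It returns an error if the backup
// id is unknown or the backup is already inactive.
func (t *tracker) release(id string) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.state.BackupID != id {
		return fmt.Errorf("backup %q does not exist", id)
	}
	if !t.state.InProgress {
		return fmt.Errorf("backup %q is already inactive", id)
	}
	t.state.InProgress = false
	// A real implementation would resume background processes here.
	return nil
}

func main() {
	tr := &tracker{state: BackupState{BackupID: "b1", InProgress: true}}
	fmt.Println(tr.release("b1")) // <nil>
	fmt.Println(tr.release("b1")) // backup "b1" is already inactive
	fmt.Println(tr.release("b2")) // backup "b2" does not exist
}
```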

func (*Index) ReplicateDeletion

func (i *Index) ReplicateDeletion(ctx context.Context, shard, requestID string, uuid strfmt.UUID) replica.SimpleResponse

func (*Index) ReplicateDeletions

func (i *Index) ReplicateDeletions(ctx context.Context, shard, requestID string, uuids []strfmt.UUID, dryRun bool) replica.SimpleResponse

func (*Index) ReplicateObject

func (i *Index) ReplicateObject(ctx context.Context, shard, requestID string, object *storobj.Object) replica.SimpleResponse

func (*Index) ReplicateObjects

func (i *Index) ReplicateObjects(ctx context.Context, shard, requestID string, objects []*storobj.Object) replica.SimpleResponse

func (*Index) ReplicateReferences

func (i *Index) ReplicateReferences(ctx context.Context, shard, requestID string, refs []objects.BatchReference) replica.SimpleResponse

func (*Index) ReplicateUpdate

func (i *Index) ReplicateUpdate(ctx context.Context, shard, requestID string, doc *objects.MergeDocument) replica.SimpleResponse

func (*Index) Shutdown

func (i *Index) Shutdown(ctx context.Context) error

type IndexConfig

type IndexConfig struct {
	RootPath                  string
	ClassName                 schema.ClassName
	QueryMaximumResults       int64
	QueryNestedRefLimit       int64
	ResourceUsage             config.ResourceUsage
	MemtablesFlushDirtyAfter  int
	MemtablesInitialSizeMB    int
	MemtablesMaxSizeMB        int
	MemtablesMinActiveSeconds int
	MemtablesMaxActiveSeconds int
	MaxSegmentSize            int64
	HNSWMaxLogSize            int64
	HNSWWaitForCachePrefill   bool
	ReplicationFactor         int64
	AvoidMMap                 bool
	DisableLazyLoadShards     bool
	ForceFullReplicasSearch   bool

	TrackVectorDimensions bool
}

type IndexQueue added in v1.22.0

type IndexQueue struct {
	Shard shardStatusUpdater

	IndexQueueOptions

	Checkpoints *indexcheckpoint.Checkpoints
	// contains filtered or unexported fields
}

IndexQueue is an in-memory queue of vectors to index. It batches vectors together before sending them to the indexing workers. It is safe to use concurrently.
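
The batching idea can be sketched as follows. This is a simplified, hypothetical model (batchQueue, newBatchQueue, and the flush callback are not part of the package): it flushes only when a full batch has accumulated and omits timers, persistence, and workers.

```go
package main

import (
	"fmt"
	"sync"
)

// batchQueue is a hypothetical, simplified in-memory queue that hands full
// batches to a flush callback. It is safe for concurrent use.
type batchQueue struct {
	mu        sync.Mutex
	batchSize int
	pending   []uint64
	flush     func(batch []uint64)
}

// newBatchQueue clamps batchSize to at least 1 so Push always makes progress.
func newBatchQueue(batchSize int, flush func([]uint64)) *batchQueue {
	if batchSize < 1 {
		batchSize = 1
	}
	return &batchQueue{batchSize: batchSize, flush: flush}
}

// Push enqueues ids and flushes every time a full batch has accumulated.
// flush runs while the lock is held, so it must not call back into the queue.
func (q *batchQueue) Push(ids ...uint64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.pending = append(q.pending, ids...)
	for len(q.pending) >= q.batchSize {
		batch := make([]uint64, q.batchSize)
		copy(batch, q.pending[:q.batchSize])
		q.pending = q.pending[q.batchSize:]
		q.flush(batch)
	}
}

// Size returns the number of ids still waiting for a full batch.
func (q *batchQueue) Size() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.pending)
}

func main() {
	var batches [][]uint64
	q := newBatchQueue(2, func(b []uint64) { batches = append(batches, b) })
	q.Push(1, 2, 3, 4, 5)
	fmt.Println(batches, q.Size()) // [[1 2] [3 4]] 1
}
```

A time-based flush, in the spirit of the IndexInterval option, would additionally send the partial batch when a ticker fires.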

func NewIndexQueue added in v1.22.0

func NewIndexQueue(
	className string,
	shardID string,
	targetVector string,
	shard shardStatusUpdater,
	index batchIndexer,
	centralJobQueue chan job,
	checkpoints *indexcheckpoint.Checkpoints,
	opts IndexQueueOptions,
	promMetrics *monitoring.PrometheusMetrics,
) (*IndexQueue, error)

func (*IndexQueue) Close added in v1.22.0

func (q *IndexQueue) Close() error

Close immediately closes the queue and waits for workers to finish their current tasks. Any pending vectors are discarded.

func (*IndexQueue) Delete added in v1.22.0

func (q *IndexQueue) Delete(ids ...uint64) error

Delete removes the vectors from the index synchronously if they are already indexed; otherwise it marks them as deleted in the queue. If async indexing is disabled, it calls the index delete method directly.
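
The two delete paths can be illustrated with a small single-goroutine model. Everything here (pendingQueue, its maps, and drain) is hypothetical and heavily simplified; it assumes async indexing is enabled and ignores locking and persistence.

```go
package main

import "fmt"

// pendingQueue is a hypothetical, single-goroutine model of the delete path.
type pendingQueue struct {
	indexed map[uint64]bool // ids already present in the vector index
	pending map[uint64]bool // ids queued but not yet indexed
	deleted map[uint64]bool // queued ids marked as deleted
}

// Delete removes indexed ids immediately and marks queued ids as deleted.
func (q *pendingQueue) Delete(ids ...uint64) {
	for _, id := range ids {
		switch {
		case q.indexed[id]:
			delete(q.indexed, id) // synchronous removal from the index
		case q.pending[id]:
			q.deleted[id] = true // skipped later, when the id is dequeued
		}
	}
}

// drain moves queued ids into the index, skipping those marked as deleted.
func (q *pendingQueue) drain() {
	for id := range q.pending {
		if !q.deleted[id] {
			q.indexed[id] = true
		}
		delete(q.pending, id)
		delete(q.deleted, id)
	}
}

func main() {
	q := &pendingQueue{
		indexed: map[uint64]bool{1: true},
		pending: map[uint64]bool{2: true, 3: true},
		deleted: map[uint64]bool{},
	}
	q.Delete(1, 2)
	q.drain()
	fmt.Println(q.indexed) // map[3:true]
}
```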

func (*IndexQueue) Drop added in v1.22.0

func (q *IndexQueue) Drop() error

Drop removes all persisted data related to the queue. It closes the queue if not already. It does not remove the index. It should be called only when the index is dropped.

func (*IndexQueue) PauseIndexing added in v1.24.22

func (q *IndexQueue) PauseIndexing()

PauseIndexing pauses indexing and waits for the workers to finish their current tasks related to this queue.

func (*IndexQueue) Push added in v1.22.0

func (q *IndexQueue) Push(ctx context.Context, vectors ...vectorDescriptor) error

Push adds a list of vectors to the queue.

func (*IndexQueue) ResetWith added in v1.24.22

func (q *IndexQueue) ResetWith(v batchIndexer) error

Reset the queue with the given VectorIndex:

  • discard any pending vectors
  • reset the checkpoint to 0

Requires the queue to be paused. It does not resume the queue.

func (*IndexQueue) ResumeIndexing added in v1.24.22

func (q *IndexQueue) ResumeIndexing()

ResumeIndexing resumes indexing.

func (*IndexQueue) SearchByVector added in v1.22.0

func (q *IndexQueue) SearchByVector(vector []float32, k int, allowList helpers.AllowList) ([]uint64, []float32, error)

SearchByVector performs the search through the index first, then uses brute force to query unindexed vectors.
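
Combining the two result sources could work roughly as in this sketch. The hit type, the squared-L2 distance, and the helpers bruteForce and mergeTopK are assumptions made for illustration; the package's actual distance function and merge logic are not shown on this page.

```go
package main

import (
	"fmt"
	"sort"
)

// hit pairs a doc id with its distance to the query (hypothetical type).
type hit struct {
	ID   uint64
	Dist float32
}

// l2 returns the squared Euclidean distance between equal-length vectors.
func l2(a, b []float32) float32 {
	var sum float32
	for i := range a {
		d := a[i] - b[i]
		sum += d * d
	}
	return sum
}

// bruteForce scans every unindexed vector and computes its distance to query.
func bruteForce(query []float32, unindexed map[uint64][]float32) []hit {
	hits := make([]hit, 0, len(unindexed))
	for id, v := range unindexed {
		hits = append(hits, hit{ID: id, Dist: l2(query, v)})
	}
	return hits
}

// mergeTopK combines index hits with brute-force hits and keeps the k closest.
func mergeTopK(indexHits, bruteHits []hit, k int) []hit {
	if k < 0 {
		k = 0
	}
	all := append(append([]hit{}, indexHits...), bruteHits...)
	sort.Slice(all, func(i, j int) bool {
		if all[i].Dist != all[j].Dist {
			return all[i].Dist < all[j].Dist
		}
		return all[i].ID < all[j].ID // tie-break for a deterministic order
	})
	if len(all) > k {
		all = all[:k]
	}
	return all
}

func main() {
	query := []float32{0, 0}
	indexHits := []hit{{ID: 1, Dist: 1}, {ID: 2, Dist: 4}} // pretend index results
	unindexed := map[uint64][]float32{3: {0.5, 0}, 4: {3, 0}}
	fmt.Println(mergeTopK(indexHits, bruteForce(query, unindexed), 3)) // [{3 0.25} {1 1} {2 4}]
}
```

The BruteForceSearchLimit option suggests the real queue caps how many unindexed vectors it scans; the sketch scans all of them.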

func (*IndexQueue) SearchByVectorDistance added in v1.22.0

func (q *IndexQueue) SearchByVectorDistance(vector []float32, dist float32, maxLimit int64, allowList helpers.AllowList) ([]uint64, []float32, error)

SearchByVectorDistance performs the search through the index first, then uses brute force to query unindexed vectors.

func (*IndexQueue) Size added in v1.22.0

func (q *IndexQueue) Size() int64

Size returns the number of vectors waiting to be indexed.

func (*IndexQueue) Wait added in v1.24.22

func (q *IndexQueue) Wait()

Wait waits for the workers to finish their current indexing tasks. It does not pause the queue. This method can potentially block for a long time if the queue is receiving vectors.

type IndexQueueMetrics added in v1.24.19

type IndexQueueMetrics struct {
	// contains filtered or unexported fields
}

func NewIndexQueueMetrics added in v1.24.19

func NewIndexQueueMetrics(
	logger logrus.FieldLogger, prom *monitoring.PrometheusMetrics,
	className, shardName string, targetVector string,
) *IndexQueueMetrics

func (*IndexQueueMetrics) Delete added in v1.24.19

func (m *IndexQueueMetrics) Delete(start time.Time, count int)

func (*IndexQueueMetrics) DeleteShardLabels added in v1.24.19

func (m *IndexQueueMetrics) DeleteShardLabels(class, shard string)

func (*IndexQueueMetrics) Paused added in v1.24.19

func (m *IndexQueueMetrics) Paused()

func (*IndexQueueMetrics) Preload added in v1.24.19

func (m *IndexQueueMetrics) Preload(start time.Time, count int)

func (*IndexQueueMetrics) Push added in v1.24.19

func (m *IndexQueueMetrics) Push(start time.Time, size int)

func (*IndexQueueMetrics) Resumed added in v1.24.19

func (m *IndexQueueMetrics) Resumed()

func (*IndexQueueMetrics) Search added in v1.24.19

func (m *IndexQueueMetrics) Search(start time.Time)

func (*IndexQueueMetrics) Size added in v1.24.19

func (m *IndexQueueMetrics) Size(size int64)

func (*IndexQueueMetrics) Stale added in v1.24.19

func (m *IndexQueueMetrics) Stale()

func (*IndexQueueMetrics) VectorsDequeued added in v1.24.19

func (m *IndexQueueMetrics) VectorsDequeued(count int64)

func (*IndexQueueMetrics) Wait added in v1.24.19

func (m *IndexQueueMetrics) Wait(start time.Time)

type IndexQueueOptions added in v1.22.0

type IndexQueueOptions struct {
	// BatchSize is the number of vectors to batch together
	// before sending them to the indexing worker.
	BatchSize int

	// IndexInterval is the maximum time to wait before sending
	// the pending vectors to the indexing worker.
	IndexInterval time.Duration

	// Max time a vector can stay in the queue before being indexed.
	StaleTimeout time.Duration

	// Logger is the logger used by the queue.
	Logger logrus.FieldLogger

	// Maximum number of vectors to use for brute force search
	// when vectors are not indexed.
	BruteForceSearchLimit int

	// Maximum number of chunks to send to the workers in a single tick.
	MaxChunksPerTick int

	// Enable throttling of the indexing process.
	Throttle bool
}

type KnnAggregator

type KnnAggregator struct {
	// contains filtered or unexported fields
}

TODO: this is business logic, move out of here

func NewKnnAggregator

func NewKnnAggregator(input search.Results, sourceVector []float32) *KnnAggregator

func (*KnnAggregator) Aggregate

func (a *KnnAggregator) Aggregate(k int, properties []string) ([]classification.NeighborRef, error)

type LazyLoadShard added in v1.23.0

type LazyLoadShard struct {
	// contains filtered or unexported fields
}

func NewLazyLoadShard added in v1.23.0

func NewLazyLoadShard(ctx context.Context, promMetrics *monitoring.PrometheusMetrics,
	shardName string, index *Index, class *models.Class, jobQueueCh chan job,
	indexCheckpoints *indexcheckpoint.Checkpoints, memMonitor memwatch.AllocChecker,
) *LazyLoadShard

func (*LazyLoadShard) AddReferencesBatch added in v1.23.0

func (l *LazyLoadShard) AddReferencesBatch(ctx context.Context, refs objects.BatchReferences) []error

func (*LazyLoadShard) Aggregate added in v1.23.0

func (l *LazyLoadShard) Aggregate(ctx context.Context, params aggregation.Params) (*aggregation.Result, error)

func (*LazyLoadShard) AnalyzeObject added in v1.23.0

func (l *LazyLoadShard) AnalyzeObject(object *storobj.Object) ([]inverted.Property, []inverted.NilProperty, error)

func (*LazyLoadShard) BeginBackup added in v1.23.0

func (l *LazyLoadShard) BeginBackup(ctx context.Context) error

func (*LazyLoadShard) Counter added in v1.23.0

func (l *LazyLoadShard) Counter() *indexcounter.Counter

func (*LazyLoadShard) DebugResetVectorIndex added in v1.24.22

func (l *LazyLoadShard) DebugResetVectorIndex(ctx context.Context, targetVector string) error

func (*LazyLoadShard) DeleteObject added in v1.23.0

func (l *LazyLoadShard) DeleteObject(ctx context.Context, id strfmt.UUID) error

func (*LazyLoadShard) DeleteObjectBatch added in v1.23.0

func (l *LazyLoadShard) DeleteObjectBatch(ctx context.Context, ids []strfmt.UUID, dryRun bool) objects.BatchSimpleObjects

func (*LazyLoadShard) Dimensions added in v1.23.0

func (l *LazyLoadShard) Dimensions(ctx context.Context) int

func (*LazyLoadShard) Exists added in v1.23.0

func (l *LazyLoadShard) Exists(ctx context.Context, id strfmt.UUID) (bool, error)

func (*LazyLoadShard) FindUUIDs added in v1.23.0

func (l *LazyLoadShard) FindUUIDs(ctx context.Context, filters *filters.LocalFilter) ([]strfmt.UUID, error)

func (*LazyLoadShard) GetPropertyLengthTracker added in v1.23.0

func (l *LazyLoadShard) GetPropertyLengthTracker() *inverted.JsonShardMetaData

func (*LazyLoadShard) GetStatus added in v1.23.0

func (l *LazyLoadShard) GetStatus() storagestate.Status

func (*LazyLoadShard) GetStatusNoLoad added in v1.24.20

func (l *LazyLoadShard) GetStatusNoLoad() storagestate.Status

func (*LazyLoadShard) ID added in v1.23.0

func (l *LazyLoadShard) ID() string

func (*LazyLoadShard) Index added in v1.23.0

func (l *LazyLoadShard) Index() *Index

func (*LazyLoadShard) ListBackupFiles added in v1.23.0

func (l *LazyLoadShard) ListBackupFiles(ctx context.Context, ret *backup.ShardDescriptor) error

func (*LazyLoadShard) Load added in v1.23.0

func (l *LazyLoadShard) Load(ctx context.Context) error

func (*LazyLoadShard) MergeObject added in v1.23.0

func (l *LazyLoadShard) MergeObject(ctx context.Context, object objects.MergeDocument) error

func (*LazyLoadShard) Metrics added in v1.23.0

func (l *LazyLoadShard) Metrics() *Metrics

func (*LazyLoadShard) MultiObjectByID added in v1.23.0

func (l *LazyLoadShard) MultiObjectByID(ctx context.Context, query []multi.Identifier) ([]*storobj.Object, error)

func (*LazyLoadShard) Name added in v1.23.0

func (l *LazyLoadShard) Name() string

func (*LazyLoadShard) NotifyReady added in v1.23.0

func (l *LazyLoadShard) NotifyReady()

func (*LazyLoadShard) ObjectByID added in v1.23.0

func (l *LazyLoadShard) ObjectByID(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error)

func (*LazyLoadShard) ObjectByIDErrDeleted added in v1.23.15

func (l *LazyLoadShard) ObjectByIDErrDeleted(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error)

func (*LazyLoadShard) ObjectCount added in v1.23.0

func (l *LazyLoadShard) ObjectCount() int

func (*LazyLoadShard) ObjectCountAsync added in v1.23.10

func (l *LazyLoadShard) ObjectCountAsync() int

func (*LazyLoadShard) ObjectList added in v1.23.0

func (l *LazyLoadShard) ObjectList(ctx context.Context, limit int, sort []filters.Sort, cursor *filters.Cursor, additional additional.Properties, className schema.ClassName) ([]*storobj.Object, error)

func (*LazyLoadShard) ObjectSearch added in v1.23.0

func (l *LazyLoadShard) ObjectSearch(ctx context.Context, limit int, filters *filters.LocalFilter, keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor, additional additional.Properties) ([]*storobj.Object, []float32, error)

func (*LazyLoadShard) ObjectVectorSearch added in v1.23.0

func (l *LazyLoadShard) ObjectVectorSearch(ctx context.Context, searchVector []float32, targetVector string, targetDist float32, limit int, filters *filters.LocalFilter, sort []filters.Sort, groupBy *searchparams.GroupBy, additional additional.Properties) ([]*storobj.Object, []float32, error)

func (*LazyLoadShard) PreloadQueue added in v1.24.23

func (l *LazyLoadShard) PreloadQueue(targetVector string) error

func (*LazyLoadShard) PutObject added in v1.23.0

func (l *LazyLoadShard) PutObject(ctx context.Context, object *storobj.Object) error

func (*LazyLoadShard) PutObjectBatch added in v1.23.0

func (l *LazyLoadShard) PutObjectBatch(ctx context.Context, objects []*storobj.Object) []error

func (*LazyLoadShard) QuantizedDimensions added in v1.23.0

func (l *LazyLoadShard) QuantizedDimensions(ctx context.Context, segments int) int

func (*LazyLoadShard) Queue added in v1.23.0

func (l *LazyLoadShard) Queue() *IndexQueue

func (*LazyLoadShard) Queues added in v1.24.0

func (l *LazyLoadShard) Queues() map[string]*IndexQueue

func (*LazyLoadShard) RepairIndex added in v1.24.23

func (l *LazyLoadShard) RepairIndex(ctx context.Context, targetVector string) error

func (*LazyLoadShard) SetPropertyLengths added in v1.23.0

func (l *LazyLoadShard) SetPropertyLengths(props []inverted.Property) error

func (*LazyLoadShard) Shutdown added in v1.23.0

func (l *LazyLoadShard) Shutdown(ctx context.Context) error

func (*LazyLoadShard) Store added in v1.23.0

func (l *LazyLoadShard) Store() *lsmkv.Store

func (*LazyLoadShard) UpdateStatus added in v1.23.0

func (l *LazyLoadShard) UpdateStatus(status string) error

func (*LazyLoadShard) UpdateVectorIndexConfig added in v1.23.0

func (l *LazyLoadShard) UpdateVectorIndexConfig(ctx context.Context, updated schema.VectorIndexConfig) error

func (*LazyLoadShard) UpdateVectorIndexConfigs added in v1.24.1

func (l *LazyLoadShard) UpdateVectorIndexConfigs(ctx context.Context, updated map[string]schema.VectorIndexConfig) error

func (*LazyLoadShard) VectorIndex added in v1.23.0

func (l *LazyLoadShard) VectorIndex() VectorIndex

func (*LazyLoadShard) VectorIndexes added in v1.24.0

func (l *LazyLoadShard) VectorIndexes() map[string]VectorIndex

func (*LazyLoadShard) Versioner added in v1.23.0

func (l *LazyLoadShard) Versioner() *shardVersioner

func (*LazyLoadShard) WasDeleted added in v1.23.0

func (l *LazyLoadShard) WasDeleted(ctx context.Context, id strfmt.UUID) (bool, error)

type Metrics

type Metrics struct {
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics(
	logger logrus.FieldLogger, prom *monitoring.PrometheusMetrics,
	className, shardName string,
) *Metrics

func (*Metrics) BatchDelete

func (m *Metrics) BatchDelete(start time.Time, op string)

func (*Metrics) BatchObject

func (m *Metrics) BatchObject(start time.Time, size int)

func (*Metrics) DeleteShardLabels added in v1.21.7

func (m *Metrics) DeleteShardLabels(class, shard string)

func (*Metrics) FilteredVectorFilter added in v1.18.0

func (m *Metrics) FilteredVectorFilter(dur time.Duration)

func (*Metrics) FilteredVectorObjects added in v1.18.0

func (m *Metrics) FilteredVectorObjects(dur time.Duration)

func (*Metrics) FilteredVectorSort added in v1.18.0

func (m *Metrics) FilteredVectorSort(dur time.Duration)

func (*Metrics) FilteredVectorVector added in v1.18.0

func (m *Metrics) FilteredVectorVector(dur time.Duration)

func (*Metrics) InvertedDeleteDelta

func (m *Metrics) InvertedDeleteDelta(start time.Time)

func (*Metrics) InvertedDeleteOld

func (m *Metrics) InvertedDeleteOld(start time.Time)

func (*Metrics) InvertedExtend

func (m *Metrics) InvertedExtend(start time.Time, propCount int)

func (*Metrics) ObjectStore

func (m *Metrics) ObjectStore(start time.Time)

func (*Metrics) PutObject

func (m *Metrics) PutObject(start time.Time)

func (*Metrics) PutObjectDetermineStatus

func (m *Metrics) PutObjectDetermineStatus(start time.Time)

func (*Metrics) PutObjectUpdateInverted

func (m *Metrics) PutObjectUpdateInverted(start time.Time)

func (*Metrics) PutObjectUpsertObject

func (m *Metrics) PutObjectUpsertObject(start time.Time)

func (*Metrics) ShardStartup

func (m *Metrics) ShardStartup(start time.Time)

func (*Metrics) VectorIndex

func (m *Metrics) VectorIndex(start time.Time)

type Migrator

type Migrator struct {
	// contains filtered or unexported fields
}

func NewMigrator

func NewMigrator(db *DB, logger logrus.FieldLogger) *Migrator

func (*Migrator) AddClass

func (m *Migrator) AddClass(ctx context.Context, class *models.Class,
	shardState *sharding.State,
) error

func (*Migrator) AddProperty

func (m *Migrator) AddProperty(ctx context.Context, className string, prop *models.Property) error

func (*Migrator) AdjustFilterablePropSettings added in v1.19.0

func (m *Migrator) AdjustFilterablePropSettings(ctx context.Context) error

As of v1.19, a property's IndexInverted setting is replaced by IndexFilterable and IndexSearchable. Filterable buckets use the roaring set strategy and searchable buckets use the map strategy (and are therefore applicable only to text/text[]). Since both types of buckets can coexist for text/text[] properties, they need to be distinguished by name: the searchable bucket has a "searchable" suffix. Up until v1.19, the default bucket strategy for text/text[]/string/string[] (string/string[] are deprecated since v1.19) was map, so migrating from pre-v1.19 to v1.19 needs to properly handle existing text/text[] buckets that use the map strategy but carry the filterable bucket name.

An enabled IndexInverted translates in v1.19 to both IndexFilterable and IndexSearchable being enabled, but since only the searchable bucket exists (under the filterable name), it has to be renamed to the searchable bucket name. Although the IndexFilterable setting is enabled, the filterable index does not exist, so shards are switched into fallback mode and use searchable buckets instead of filterable ones whenever filterable ones are expected. Fallback mode effectively sets IndexFilterable to false, although it stays enabled according to the schema.

If the filterable indexes are created later (it is up to the user to decide whether missing indexes should be created), shards will no longer work in fallback mode, and the actual filterable index will be used when needed.
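The bucket choice described above can be sketched roughly as follows. This is an illustration only: propState, filterBucket, and the exact "_searchable" suffix are assumptions made for the example and are not identifiers from this package.

```go
package main

import "fmt"

// propState is a hypothetical stand-in for what a shard knows about one
// text property; it is not a type from this package.
type propState struct {
	name            string
	indexFilterable bool // as declared in the schema
	hasFilterable   bool // a filterable (roaring set) bucket exists
	hasSearchable   bool // a searchable (map) bucket exists
}

// searchableSuffix is an assumed spelling; the documentation only states
// that the searchable bucket name carries a "searchable" suffix.
const searchableSuffix = "_searchable"

// filterBucket picks the bucket a filter should read from. When the schema
// enables filtering but no filterable bucket exists, it falls back to the
// searchable bucket and reports fallback = true.
func filterBucket(p propState) (bucket string, fallback bool, ok bool) {
	switch {
	case !p.indexFilterable:
		return "", false, false
	case p.hasFilterable:
		return p.name, false, true
	case p.hasSearchable:
		return p.name + searchableSuffix, true, true
	default:
		return "", false, false
	}
}

func main() {
	// A property migrated from before v1.19: only the renamed map bucket exists.
	migrated := propState{name: "title", indexFilterable: true, hasSearchable: true}
	bucket, fallback, ok := filterBucket(migrated)
	fmt.Println(bucket, fallback, ok) // prints: title_searchable true true
}
```

Once a filterable bucket has been built, the same call returns the filterable bucket and fallback is false.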

func (*Migrator) DeleteTenants added in v1.20.0

func (m *Migrator) DeleteTenants(ctx context.Context, class *models.Class, tenants []string) (commit func(success bool), err error)

DeleteTenants deletes tenants and returns a commit func that can be used to either commit or rollback deletion
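The commit func pattern shared by DeleteTenants, NewTenants, and UpdateTenants can be illustrated with a small in-memory sketch. The tenantStore type and its deleteTenants method are hypothetical; how the real Migrator stages its changes is not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

var errUnknownTenant = errors.New("unknown tenant")

// tenantStore is a hypothetical in-memory stand-in for tenant partitions.
type tenantStore struct {
	tenants map[string]bool
}

// deleteTenants validates the request and returns a commit func.
// commit(true) applies the deletion; commit(false) discards it.
func (s *tenantStore) deleteTenants(names []string) (commit func(success bool), err error) {
	for _, n := range names {
		if !s.tenants[n] {
			return nil, fmt.Errorf("%w: %s", errUnknownTenant, n)
		}
	}
	staged := append([]string(nil), names...) // copy so later caller edits do not leak in
	return func(success bool) {
		if !success {
			return // rollback: nothing has been applied yet
		}
		for _, n := range staged {
			delete(s.tenants, n)
		}
	}, nil
}

func main() {
	s := &tenantStore{tenants: map[string]bool{"a": true, "b": true}}
	commit, err := s.deleteTenants([]string{"a"})
	if err != nil {
		fmt.Println("prepare failed:", err)
		return
	}
	commit(true)
	fmt.Println(len(s.tenants)) // prints: 1
}
```

A caller would typically invoke commit(true) only after every participant has prepared successfully, and commit(false) otherwise.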

func (*Migrator) DropClass

func (m *Migrator) DropClass(ctx context.Context, className string) error

func (*Migrator) DropProperty

func (m *Migrator) DropProperty(ctx context.Context, className string, propertyName string) error

DropProperty is ignored; it exists only for API compliance.

func (*Migrator) GetShardsQueueSize added in v1.22.0

func (m *Migrator) GetShardsQueueSize(ctx context.Context, className, tenant string) (map[string]int64, error)

func (*Migrator) GetShardsStatus

func (m *Migrator) GetShardsStatus(ctx context.Context, className, tenant string) (map[string]string, error)

func (*Migrator) InvertedReindex added in v1.18.0

func (m *Migrator) InvertedReindex(ctx context.Context, taskNames ...string) error

func (*Migrator) NewTenants added in v1.20.0

func (m *Migrator) NewTenants(ctx context.Context, class *models.Class, creates []*migrate.CreateTenantPayload) (commit func(success bool), err error)

NewTenants creates new partitions and returns a commit func that can be used to either commit or rollback the partitions

func (*Migrator) RecalculateVectorDimensions

func (m *Migrator) RecalculateVectorDimensions(ctx context.Context) error

func (*Migrator) RecountProperties added in v1.19.0

func (m *Migrator) RecountProperties(ctx context.Context) error

func (*Migrator) UpdateClass

func (m *Migrator) UpdateClass(ctx context.Context, className string, newClassName *string) error

func (*Migrator) UpdateInvertedIndexConfig

func (m *Migrator) UpdateInvertedIndexConfig(ctx context.Context, className string,
	updated *models.InvertedIndexConfig,
) error

func (*Migrator) UpdateProperty

func (m *Migrator) UpdateProperty(ctx context.Context, className string, propName string, newName *string) error

func (*Migrator) UpdateShardStatus

func (m *Migrator) UpdateShardStatus(ctx context.Context, className, shardName, targetStatus string) error

func (*Migrator) UpdateTenants added in v1.21.0

func (m *Migrator) UpdateTenants(ctx context.Context, class *models.Class, updates []*migrate.UpdateTenantPayload) (commit func(success bool), err error)

UpdateTenants activates or deactivates tenant partitions and returns a commit func that can be used to either commit or rollback the changes

func (*Migrator) UpdateVectorIndexConfig

func (m *Migrator) UpdateVectorIndexConfig(ctx context.Context,
	className string, updated schema.VectorIndexConfig,
) error

func (*Migrator) UpdateVectorIndexConfigs added in v1.24.0

func (m *Migrator) UpdateVectorIndexConfigs(ctx context.Context,
	className string, updated map[string]schema.VectorIndexConfig,
) error

func (*Migrator) ValidateInvertedIndexConfigUpdate

func (m *Migrator) ValidateInvertedIndexConfigUpdate(ctx context.Context,
	old, updated *models.InvertedIndexConfig,
) error

func (*Migrator) ValidateVectorIndexConfigUpdate

func (m *Migrator) ValidateVectorIndexConfigUpdate(ctx context.Context,
	old, updated schema.VectorIndexConfig,
) error

func (*Migrator) ValidateVectorIndexConfigsUpdate added in v1.24.0

func (m *Migrator) ValidateVectorIndexConfigsUpdate(ctx context.Context,
	old, updated map[string]schema.VectorIndexConfig,
) error

type PropertyIndexType added in v1.18.0

type PropertyIndexType uint8
const (
	IndexTypePropValue PropertyIndexType = iota + 1
	IndexTypePropLength
	IndexTypePropNull
	IndexTypePropSearchableValue
)
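Because the constants start at iota + 1, the zero value of PropertyIndexType does not correspond to any declared index type. The sketch below reproduces the declaration to show the resulting values; isDefined is a hypothetical helper added for illustration and is not part of this package.

```go
package main

import "fmt"

type PropertyIndexType uint8

const (
	IndexTypePropValue PropertyIndexType = iota + 1
	IndexTypePropLength
	IndexTypePropNull
	IndexTypePropSearchableValue
)

// isDefined reports whether t is one of the four declared constants.
// Hypothetical helper, not part of the package.
func isDefined(t PropertyIndexType) bool {
	return t >= IndexTypePropValue && t <= IndexTypePropSearchableValue
}

func main() {
	var zero PropertyIndexType
	fmt.Println(isDefined(zero))              // prints: false
	fmt.Println(IndexTypePropValue)           // prints: 1
	fmt.Println(IndexTypePropSearchableValue) // prints: 4
}
```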

func GetPropNameAndIndexTypeFromBucketName added in v1.18.0

func GetPropNameAndIndexTypeFromBucketName(bucketName string) (string, PropertyIndexType)

type ReindexableProperty added in v1.18.0

type ReindexableProperty struct {
	PropertyName    string
	IndexType       PropertyIndexType
	NewIndex        bool // is new index, there is no bucket to replace with
	DesiredStrategy string
	BucketOptions   []lsmkv.BucketOption
}

type Replicator

type Replicator interface {
	ReplicateObject(ctx context.Context, shardName, requestID string,
		object *storobj.Object) replica.SimpleResponse
	ReplicateObjects(ctx context.Context, shardName, requestID string,
		objects []*storobj.Object) replica.SimpleResponse
	ReplicateUpdate(ctx context.Context, shard, requestID string,
		doc *objects.MergeDocument) replica.SimpleResponse
	ReplicateDeletion(ctx context.Context, shardName, requestID string,
		uuid strfmt.UUID) replica.SimpleResponse
	ReplicateDeletions(ctx context.Context, shardName, requestID string,
		uuids []strfmt.UUID, dryRun bool) replica.SimpleResponse
	ReplicateReferences(ctx context.Context, shard, requestID string,
		refs []objects.BatchReference) replica.SimpleResponse
	CommitReplication(shard,
		requestID string) interface{}
	AbortReplication(shardName,
		requestID string) interface{}
}

type Shard

type Shard struct {
	// contains filtered or unexported fields
}

Shard is the smallest completely contained index unit. A shard manages database files for all the objects it owns. How a shard is determined for a target object (e.g. Murmur hash, etc.) is still open at this point.

func NewShard

func NewShard(ctx context.Context, promMetrics *monitoring.PrometheusMetrics,
	shardName string, index *Index, class *models.Class, jobQueueCh chan job,
	indexCheckpoints *indexcheckpoint.Checkpoints,
) (_ *Shard, err error)

func (*Shard) AddReferencesBatch added in v1.23.0

func (s *Shard) AddReferencesBatch(ctx context.Context, refs objects.BatchReferences) []error

The returned []error gives, at each index, the error for the reference received at that index

func (*Shard) Aggregate added in v1.23.0

func (s *Shard) Aggregate(ctx context.Context, params aggregation.Params) (*aggregation.Result, error)

func (*Shard) AnalyzeObject added in v1.23.0

func (s *Shard) AnalyzeObject(object *storobj.Object) ([]inverted.Property, []inverted.NilProperty, error)

func (*Shard) BeginBackup added in v1.23.0

func (s *Shard) BeginBackup(ctx context.Context) (err error)

BeginBackup stops compaction and flushes the memtable and commit log in preparation for the backup

func (*Shard) Counter added in v1.23.0

func (s *Shard) Counter() *indexcounter.Counter

func (*Shard) DebugResetVectorIndex added in v1.24.22

func (s *Shard) DebugResetVectorIndex(ctx context.Context, targetVector string) error

IMPORTANT: DebugResetVectorIndex is intended to be used for debugging purposes only. It creates a new vector index and replaces the existing one if any. This function assumes the node is not receiving any traffic besides the debug endpoints and that async indexing is enabled.

func (*Shard) DeleteObject added in v1.23.0

func (s *Shard) DeleteObject(ctx context.Context, id strfmt.UUID) error

func (*Shard) DeleteObjectBatch added in v1.22.8

func (s *Shard) DeleteObjectBatch(ctx context.Context, uuids []strfmt.UUID, dryRun bool) objects.BatchSimpleObjects

The returned objects.BatchSimpleObjects gives, at each index, the result for the UUID received at that index

func (*Shard) Dimensions

func (s *Shard) Dimensions(ctx context.Context) int

func (*Shard) DimensionsForVec added in v1.24.0

func (s *Shard) DimensionsForVec(ctx context.Context, vecName string) int

func (*Shard) Exists added in v1.23.0

func (s *Shard) Exists(ctx context.Context, id strfmt.UUID) (bool, error)

TODO: This does an actual read which is not really needed, if we see this come up in profiling, we could optimize this by adding an explicit Exists() on the LSMKV which only checks the bloom filters, which at least in the case of a true negative would be considerably faster. For a (false) positive, we'd still need to check, though.
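The optimization mentioned in the TODO can be sketched with a toy Bloom filter placed in front of a map. None of these types exist in this package or in lsmkv under these names; the filter size, hash choice, and the store type are assumptions made for the example.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// bloom is a toy Bloom filter with k probes derived by double hashing.
type bloom struct {
	bits []bool
	k    int
}

func newBloom(m, k int) *bloom {
	return &bloom{bits: make([]bool, m), k: k}
}

func (b *bloom) positions(key string) []int {
	h := fnv.New64a()
	h.Write([]byte(key))
	sum := h.Sum64()
	h1, h2 := uint32(sum), uint32(sum>>32)
	pos := make([]int, b.k)
	for i := range pos {
		pos[i] = int((h1 + uint32(i)*h2) % uint32(len(b.bits)))
	}
	return pos
}

func (b *bloom) add(key string) {
	for _, p := range b.positions(key) {
		b.bits[p] = true
	}
}

// mayContain never returns false for a key that was added.
func (b *bloom) mayContain(key string) bool {
	for _, p := range b.positions(key) {
		if !b.bits[p] {
			return false
		}
	}
	return true
}

// store is a hypothetical key-value store guarded by the filter.
type store struct {
	filter *bloom
	data   map[string][]byte
}

func newStore() *store {
	return &store{filter: newBloom(1024, 3), data: make(map[string][]byte)}
}

func (s *store) put(key string, value []byte) {
	s.filter.add(key)
	s.data[key] = value
}

// exists skips the real read on a filter miss (true negative) and still
// verifies against the data on a filter hit, which may be a false positive.
func (s *store) exists(key string) bool {
	if !s.filter.mayContain(key) {
		return false
	}
	_, ok := s.data[key]
	return ok
}

func main() {
	s := newStore()
	s.put("object-1", []byte("payload"))
	fmt.Println(s.exists("object-1"), s.exists("object-2"))
}
```

The saving comes from the early return: a miss in the filter costs a few hash probes instead of a full lookup.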

func (*Shard) FindUUIDs added in v1.22.8

func (s *Shard) FindUUIDs(ctx context.Context, filters *filters.LocalFilter) ([]strfmt.UUID, error)

func (*Shard) GetPropertyLengthTracker added in v1.23.0

func (s *Shard) GetPropertyLengthTracker() *inverted.JsonShardMetaData

Tracks the lengths of all properties. Must be updated on inserts/deletes.

func (*Shard) GetStatus added in v1.22.0

func (s *Shard) GetStatus() storagestate.Status

func (*Shard) GetStatusNoLoad added in v1.24.20

func (s *Shard) GetStatusNoLoad() storagestate.Status

Same implementation as GetStatus for a regular shard; this only differs in lazy-loaded shards
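The difference for lazy-loaded shards can be pictured with the sketch below. It is a guess at the general shape only: lazyShard, its fields, and the "NOT_LOADED" placeholder are hypothetical, and the status the real LazyLoadShard reports before loading is not stated in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// statusNotLoaded is a hypothetical placeholder value.
const statusNotLoaded = "NOT_LOADED"

// lazyShard is a hypothetical wrapper that defers loading until first use.
type lazyShard struct {
	mu     sync.Mutex
	loaded bool
	status string
	loads  int // counts how often the expensive load actually ran
}

func (l *lazyShard) load() {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.loaded {
		return
	}
	l.loaded = true
	l.status = "READY"
	l.loads++
}

// GetStatus loads the shard if needed, then reports its status.
func (l *lazyShard) GetStatus() string {
	l.load()
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.status
}

// GetStatusNoLoad reports the status without triggering a load.
func (l *lazyShard) GetStatusNoLoad() string {
	l.mu.Lock()
	defer l.mu.Unlock()
	if !l.loaded {
		return statusNotLoaded
	}
	return l.status
}

func main() {
	s := &lazyShard{}
	fmt.Println(s.GetStatusNoLoad()) // prints: NOT_LOADED
	fmt.Println(s.GetStatus())       // prints: READY
	fmt.Println(s.GetStatusNoLoad()) // prints: READY
}
```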

func (*Shard) ID

func (s *Shard) ID() string

func (*Shard) Index added in v1.23.0

func (s *Shard) Index() *Index

func (*Shard) ListBackupFiles added in v1.23.0

func (s *Shard) ListBackupFiles(ctx context.Context, ret *backup.ShardDescriptor) error

ListBackupFiles lists all files used to backup a shard

func (*Shard) MergeObject added in v1.23.0

func (s *Shard) MergeObject(ctx context.Context, merge objects.MergeDocument) error

func (*Shard) Metrics added in v1.23.0

func (s *Shard) Metrics() *Metrics

Grafana metrics

func (*Shard) MultiObjectByID added in v1.23.0

func (s *Shard) MultiObjectByID(ctx context.Context, query []multi.Identifier) ([]*storobj.Object, error)

func (*Shard) Name added in v1.23.0

func (s *Shard) Name() string

Shard name(identifier?)

func (*Shard) NotifyReady added in v1.23.0

func (s *Shard) NotifyReady()

func (*Shard) ObjectByID added in v1.23.0

func (s *Shard) ObjectByID(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error)

func (*Shard) ObjectByIDErrDeleted added in v1.23.15

func (s *Shard) ObjectByIDErrDeleted(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error)

func (*Shard) ObjectCount added in v1.23.0

func (s *Shard) ObjectCount() int

ObjectCount returns the exact count at any moment

func (*Shard) ObjectCountAsync added in v1.23.10

func (s *Shard) ObjectCountAsync() int

ObjectCountAsync returns the eventually consistent "async" count which is much cheaper to obtain

func (*Shard) ObjectList added in v1.23.0

func (s *Shard) ObjectList(ctx context.Context, limit int, sort []filters.Sort, cursor *filters.Cursor, additional additional.Properties, className schema.ClassName) ([]*storobj.Object, error)

func (*Shard) ObjectSearch added in v1.23.0

func (s *Shard) ObjectSearch(ctx context.Context, limit int, filters *filters.LocalFilter,
	keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor,
	additional additional.Properties,
) ([]*storobj.Object, []float32, error)

func (*Shard) ObjectVectorSearch added in v1.23.0

func (s *Shard) ObjectVectorSearch(ctx context.Context, searchVector []float32, targetVector string, targetDist float32, limit int, filters *filters.LocalFilter, sort []filters.Sort, groupBy *searchparams.GroupBy, additional additional.Properties) ([]*storobj.Object, []float32, error)

func (*Shard) PreloadQueue added in v1.24.23

func (s *Shard) PreloadQueue(targetVector string) error

PreloadQueue goes through the LSM store from the last checkpoint and enqueues any unindexed vector.
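As a rough illustration of scanning from a checkpoint, the sketch below walks a sorted list of stored vectors and collects the ones that still need indexing. The storedVector type, the use of a doc ID as the checkpoint, and treating the checkpoint as exclusive are all assumptions made for the example.

```go
package main

import "fmt"

// storedVector is a hypothetical record as it might be read from the store.
type storedVector struct {
	docID  uint64
	vector []float32
}

// preloadQueue returns the doc IDs that come after the checkpoint and are not
// yet indexed. objects is assumed to be sorted by docID.
func preloadQueue(objects []storedVector, checkpoint uint64, indexed map[uint64]bool) []uint64 {
	var queue []uint64
	for _, o := range objects {
		if o.docID <= checkpoint {
			continue // already covered by the checkpoint
		}
		if indexed[o.docID] {
			continue // already present in the vector index
		}
		queue = append(queue, o.docID)
	}
	return queue
}

func main() {
	objects := []storedVector{{docID: 1}, {docID: 2}, {docID: 3}, {docID: 4}, {docID: 5}}
	indexed := map[uint64]bool{4: true}
	fmt.Println(preloadQueue(objects, 2, indexed)) // prints: [3 5]
}
```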

func (*Shard) PutObject added in v1.23.0

func (s *Shard) PutObject(ctx context.Context, object *storobj.Object) error

func (*Shard) PutObjectBatch added in v1.23.0

func (s *Shard) PutObjectBatch(ctx context.Context,
	objects []*storobj.Object,
) []error

The returned []error gives, at each index, the error for the object received at that index
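Since the signature returns []error, a caller can pair each entry with the input at the same position. The sketch below shows one way to do that; putBatch is a hypothetical stand-in for the real method and works on plain strings rather than *storobj.Object.

```go
package main

import (
	"errors"
	"fmt"
)

// putBatch is a hypothetical stand-in: it returns one error slot per input,
// with nil marking success.
func putBatch(ids []string) []error {
	errs := make([]error, len(ids))
	for i, id := range ids {
		if id == "" {
			errs[i] = errors.New("empty id")
		}
	}
	return errs
}

// failedPositions returns the input positions whose error slot is non-nil.
func failedPositions(errs []error) []int {
	var failed []int
	for i, err := range errs {
		if err != nil {
			failed = append(failed, i)
		}
	}
	return failed
}

func main() {
	ids := []string{"a", "", "c", ""}
	errs := putBatch(ids)
	fmt.Println(failedPositions(errs)) // prints: [1 3]
}
```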

func (*Shard) QuantizedDimensions added in v1.23.0

func (s *Shard) QuantizedDimensions(ctx context.Context, segments int) int

func (*Shard) QuantizedDimensionsForVec added in v1.24.0

func (s *Shard) QuantizedDimensionsForVec(ctx context.Context, segments int, vecName string) int

func (*Shard) Queue added in v1.23.0

func (s *Shard) Queue() *IndexQueue

func (*Shard) Queues added in v1.24.0

func (s *Shard) Queues() map[string]*IndexQueue

func (*Shard) RepairIndex added in v1.24.23

func (s *Shard) RepairIndex(ctx context.Context, targetVector string) error

RepairIndex ensures the vector index is consistent with the LSM store. It goes through the LSM store and enqueues any unindexed vector, and it also removes any indexed vector that is not in the LSM store. It is safe to call or interrupt this method at any time. If ASYNC_INDEXING is disabled, it's a no-op.
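The two-way reconciliation can be sketched as a set difference in both directions. The repair function and the map-based representation are assumptions for illustration; the real method works against the LSM store and the index queue.

```go
package main

import (
	"fmt"
	"sort"
)

// repair compares the IDs in the store with the IDs in the index. It returns
// the IDs to enqueue (stored but not indexed) and the IDs to remove from the
// index (indexed but not stored).
func repair(stored, indexed map[uint64]bool) (enqueue, remove []uint64) {
	for id := range stored {
		if !indexed[id] {
			enqueue = append(enqueue, id)
		}
	}
	for id := range indexed {
		if !stored[id] {
			remove = append(remove, id)
		}
	}
	// Map iteration order is random, so sort for a stable result.
	sort.Slice(enqueue, func(i, j int) bool { return enqueue[i] < enqueue[j] })
	sort.Slice(remove, func(i, j int) bool { return remove[i] < remove[j] })
	return enqueue, remove
}

func main() {
	stored := map[uint64]bool{1: true, 2: true, 3: true}
	indexed := map[uint64]bool{2: true, 3: true, 9: true}
	enqueue, remove := repair(stored, indexed)
	fmt.Println(enqueue, remove) // prints: [1] [9]
}
```

Because the result depends only on the current contents of both sides, running the comparison again after an interruption simply recomputes whatever work is still outstanding.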

func (*Shard) SetPropertyLengthTracker added in v1.23.0

func (s *Shard) SetPropertyLengthTracker(tracker *inverted.JsonShardMetaData)

Tracks the lengths of all properties. Must be updated on inserts/deletes.

func (*Shard) SetPropertyLengths added in v1.23.0

func (s *Shard) SetPropertyLengths(props []inverted.Property) error

func (*Shard) Shutdown added in v1.23.0

func (s *Shard) Shutdown(ctx context.Context) error

Shutdown needs to be idempotent, so it can also deal with a partial initialization. In some parts, it relies on the underlying structs to have idempotent Shutdown methods. In other parts, it explicitly checks if a component was initialized. If not, it turns it into a noop to prevent blocking.
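The two techniques named above, relying on idempotent components and checking for uninitialized ones, can be sketched as follows. The component and shard types here are hypothetical simplifications, not the package's own.

```go
package main

import "fmt"

// component is a hypothetical sub-structure with an idempotent Shutdown.
type component struct {
	closed bool
	closes int // counts how often real shutdown work ran
}

func (c *component) Shutdown() error {
	if c.closed {
		return nil // second call is a no-op
	}
	c.closed = true
	c.closes++
	return nil
}

// shard is a hypothetical shard whose fields may be nil after a partial
// initialization.
type shard struct {
	store *component
	index *component
}

// Shutdown skips components that were never initialized and delegates to the
// idempotent Shutdown of those that were.
func (s *shard) Shutdown() error {
	if s.index != nil {
		if err := s.index.Shutdown(); err != nil {
			return fmt.Errorf("shut down index: %w", err)
		}
	}
	if s.store != nil {
		if err := s.store.Shutdown(); err != nil {
			return fmt.Errorf("shut down store: %w", err)
		}
	}
	return nil
}

func main() {
	// The index was never created, as after a failed initialization.
	partial := &shard{store: &component{}}
	err1 := partial.Shutdown()
	err2 := partial.Shutdown()
	fmt.Println(err1, err2)
	fmt.Println(partial.store.closes) // prints: 1
}
```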

func (*Shard) Store added in v1.23.0

func (s *Shard) Store() *lsmkv.Store

The physical data store

func (*Shard) UpdateStatus added in v1.23.0

func (s *Shard) UpdateStatus(in string) error

func (*Shard) UpdateVectorIndexConfig added in v1.23.0

func (s *Shard) UpdateVectorIndexConfig(ctx context.Context, updated schema.VectorIndexConfig) error

func (*Shard) UpdateVectorIndexConfigs added in v1.24.1

func (s *Shard) UpdateVectorIndexConfigs(ctx context.Context, updated map[string]schema.VectorIndexConfig) error

func (*Shard) VectorIndex added in v1.23.0

func (s *Shard) VectorIndex() VectorIndex

func (*Shard) VectorIndexForName added in v1.24.0

func (s *Shard) VectorIndexForName(targetVector string) VectorIndex

func (*Shard) VectorIndexes added in v1.24.0

func (s *Shard) VectorIndexes() map[string]VectorIndex

func (*Shard) Versioner added in v1.23.0

func (s *Shard) Versioner() *shardVersioner

func (*Shard) WasDeleted added in v1.23.0

func (s *Shard) WasDeleted(ctx context.Context, id strfmt.UUID) (bool, error)

type ShardInvertedReindexTask added in v1.18.0

type ShardInvertedReindexTask interface {
	GetPropertiesToReindex(ctx context.Context, shard ShardLike,
	) ([]ReindexableProperty, error)
	// right now only OnResume is needed, but in the future more
	// callbacks could be added
	// (like OnPrePauseStore, OnPostPauseStore, OnPreResumeStore, etc)
	OnPostResumeStore(ctx context.Context, shard ShardLike) error
}

type ShardInvertedReindexTaskSetToRoaringSet added in v1.18.0

type ShardInvertedReindexTaskSetToRoaringSet struct{}

func (*ShardInvertedReindexTaskSetToRoaringSet) GetPropertiesToReindex added in v1.18.0

func (t *ShardInvertedReindexTaskSetToRoaringSet) GetPropertiesToReindex(ctx context.Context,
	shard ShardLike,
) ([]ReindexableProperty, error)

func (*ShardInvertedReindexTaskSetToRoaringSet) OnPostResumeStore added in v1.19.0

func (t *ShardInvertedReindexTaskSetToRoaringSet) OnPostResumeStore(ctx context.Context, shard ShardLike) error

type ShardInvertedReindexer added in v1.18.0

type ShardInvertedReindexer struct {
	// contains filtered or unexported fields
}

func NewShardInvertedReindexer added in v1.18.0

func NewShardInvertedReindexer(shard ShardLike, logger logrus.FieldLogger) *ShardInvertedReindexer

func (*ShardInvertedReindexer) AddTask added in v1.18.0

func (*ShardInvertedReindexer) Do added in v1.18.0

type ShardLike added in v1.23.0

type ShardLike interface {
	Index() *Index                  // Get the parent index
	Name() string                   // Get the shard name
	Store() *lsmkv.Store            // Get the underlying store
	NotifyReady()                   // Set shard status to ready
	GetStatus() storagestate.Status // Return the shard status
	GetStatusNoLoad() storagestate.Status
	UpdateStatus(status string) error                                                   // Set shard status
	FindUUIDs(ctx context.Context, filters *filters.LocalFilter) ([]strfmt.UUID, error) // Search and return document ids

	Counter() *indexcounter.Counter
	ObjectCount() int
	ObjectCountAsync() int
	GetPropertyLengthTracker() *inverted.JsonShardMetaData

	PutObject(context.Context, *storobj.Object) error
	PutObjectBatch(context.Context, []*storobj.Object) []error
	ObjectByID(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error)
	ObjectByIDErrDeleted(ctx context.Context, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error)
	Exists(ctx context.Context, id strfmt.UUID) (bool, error)
	ObjectSearch(ctx context.Context, limit int, filters *filters.LocalFilter, keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor, additional additional.Properties) ([]*storobj.Object, []float32, error)
	ObjectVectorSearch(ctx context.Context, searchVector []float32, targetVector string, targetDist float32, limit int, filters *filters.LocalFilter, sort []filters.Sort, groupBy *searchparams.GroupBy, additional additional.Properties) ([]*storobj.Object, []float32, error)
	UpdateVectorIndexConfig(ctx context.Context, updated schema.VectorIndexConfig) error
	UpdateVectorIndexConfigs(ctx context.Context, updated map[string]schema.VectorIndexConfig) error
	AddReferencesBatch(ctx context.Context, refs objects.BatchReferences) []error
	DeleteObjectBatch(ctx context.Context, ids []strfmt.UUID, dryRun bool) objects.BatchSimpleObjects // Delete many objects by id
	DeleteObject(ctx context.Context, id strfmt.UUID) error                                           // Delete object by id
	MultiObjectByID(ctx context.Context, query []multi.Identifier) ([]*storobj.Object, error)
	ID() string // Get the shard id

	BeginBackup(ctx context.Context) error
	ListBackupFiles(ctx context.Context, ret *backup.ShardDescriptor) error

	SetPropertyLengths(props []inverted.Property) error
	AnalyzeObject(*storobj.Object) ([]inverted.Property, []inverted.NilProperty, error)

	Aggregate(ctx context.Context, params aggregation.Params) (*aggregation.Result, error)
	MergeObject(ctx context.Context, object objects.MergeDocument) error
	Queue() *IndexQueue
	Queues() map[string]*IndexQueue
	PreloadQueue(targetVector string) error
	Shutdown(context.Context) error // Shutdown the shard
	// TODO tests only
	ObjectList(ctx context.Context, limit int, sort []filters.Sort, cursor *filters.Cursor,
		additional additional.Properties, className schema.ClassName) ([]*storobj.Object, error) // Search and return objects
	WasDeleted(ctx context.Context, id strfmt.UUID) (bool, error) // Check if an object was deleted
	VectorIndex() VectorIndex                                     // Get the vector index
	VectorIndexes() map[string]VectorIndex                        // Get the vector indexes

	// TODO tests only
	Versioner() *shardVersioner // Get the shard versioner

	// TODO tests only
	Dimensions(ctx context.Context) int // dim(vector)*number vectors
	// TODO tests only
	QuantizedDimensions(ctx context.Context, segments int) int

	Metrics() *Metrics

	// Debug methods
	DebugResetVectorIndex(ctx context.Context, targetVector string) error
	RepairIndex(ctx context.Context, targetVector string) error
	// contains filtered or unexported methods
}

type VectorIndex

type VectorIndex interface {
	Dump(labels ...string)
	Add(id uint64, vector []float32) error
	AddBatch(ctx context.Context, id []uint64, vector [][]float32) error
	Delete(id ...uint64) error
	SearchByVector(vector []float32, k int, allow helpers.AllowList) ([]uint64, []float32, error)
	SearchByVectorDistance(vector []float32, dist float32,
		maxLimit int64, allow helpers.AllowList) ([]uint64, []float32, error)
	UpdateUserConfig(updated schema.VectorIndexConfig, callback func()) error
	Drop(ctx context.Context) error
	Shutdown(ctx context.Context) error
	Flush() error
	SwitchCommitLogs(ctx context.Context) error
	ListFiles(ctx context.Context, basePath string) ([]string, error)
	PostStartup()
	Compressed() bool
	ValidateBeforeInsert(vector []float32) error
	DistanceBetweenVectors(x, y []float32) (float32, error)
	// ContainsNode returns true if the index contains the node with the given id.
	// It must return false if the node does not exist, or has a tombstone.
	ContainsNode(id uint64) bool
	// Iterate over all nodes in the index.
	// Consistency is not guaranteed, as the
	// index may be concurrently modified.
	// If the callback returns false, the iteration will stop.
	Iterate(fn func(id uint64) bool)
	DistancerProvider() distancer.Provider
	Stats() (common.IndexStats, error)
}

VectorIndex is anything that indexes vectors efficiently. For an example, see ./vector/hnsw/index.go.
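The ContainsNode and Iterate contracts from the interface comments can be illustrated with a small in-memory sketch. memIndex is hypothetical and implements only these two methods; skipping tombstoned nodes during iteration is an assumption, as the interface comment does not say how Iterate treats them.

```go
package main

import "fmt"

// memIndex is a hypothetical, minimal index holding only node IDs.
type memIndex struct {
	ids        []uint64
	tombstones map[uint64]bool
}

// ContainsNode returns false for unknown IDs and for tombstoned IDs.
func (m *memIndex) ContainsNode(id uint64) bool {
	if m.tombstones[id] {
		return false
	}
	for _, existing := range m.ids {
		if existing == id {
			return true
		}
	}
	return false
}

// Iterate calls fn for each live node and stops as soon as fn returns false.
func (m *memIndex) Iterate(fn func(id uint64) bool) {
	for _, id := range m.ids {
		if m.tombstones[id] {
			continue
		}
		if !fn(id) {
			return
		}
	}
}

// firstN collects at most n IDs by stopping the iteration early.
func firstN(m *memIndex, n int) []uint64 {
	var out []uint64
	if n <= 0 {
		return out
	}
	m.Iterate(func(id uint64) bool {
		out = append(out, id)
		return len(out) < n
	})
	return out
}

func main() {
	idx := &memIndex{
		ids:        []uint64{1, 2, 3, 4},
		tombstones: map[uint64]bool{2: true},
	}
	fmt.Println(firstN(idx, 2))      // prints: [1 3]
	fmt.Println(idx.ContainsNode(2)) // prints: false
}
```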

Directories

Path Synopsis
clusterintegrationtest acts as a test package to provide a component test spanning multiple parts of the application, including everything that's required for a distributed setup.
entities
ent contains common types used throughout various lsmkv (sub-)packages
Package roaringset contains all the LSM business logic that is unique to the "RoaringSet" strategy
vector
geo
hnsw/distancer/asm
asm only has amd64 specific implementations at the moment
