sharding

package
v1.27.0-rc.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 10, 2024 License: BSD-3-Clause Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Physical

// Physical is the JSON-serializable description of a single physical shard.
// Values of this type are stored in State.Physical and returned by
// State.AddPartition.
type Physical struct {
	// Name is serialized under the "name" key.
	Name           string   `json:"name"`
	// OwnsVirtual is omitted from JSON when empty.
	// NOTE(review): presumably the names of the Virtual entries assigned to
	// this shard — confirm against the code that populates it.
	OwnsVirtual    []string `json:"ownsVirtual,omitempty"`
	// NOTE(review): presumably the share of the key space owned by this
	// shard; the unit/range is not visible here — confirm.
	OwnsPercentage float64  `json:"ownsPercentage"`

	// LegacyBelongsToNodeForBackwardCompat is serialized under the singular
	// "belongsToNode" key and, per its name, is kept only for backward
	// compatibility. New code should read BelongsToNodes (or the
	// BelongsToNode accessor) instead.
	LegacyBelongsToNodeForBackwardCompat string   `json:"belongsToNode,omitempty"`
	// BelongsToNodes is the replica set of this shard; AdjustReplicas shrinks
	// or extends it.
	BelongsToNodes                       []string `json:"belongsToNodes,omitempty"`

	// Status is omitted from JSON when empty.
	// NOTE(review): presumably related to what ActivityStatus reports —
	// confirm against its implementation.
	Status string `json:"status,omitempty"`
}

func (*Physical) ActivityStatus added in v1.21.0

func (p *Physical) ActivityStatus() string

func (*Physical) AdjustReplicas

func (p *Physical) AdjustReplicas(count int, nodes cluster.NodeSelector) error

AdjustReplicas shrinks or extends the replica set (p.BelongsToNodes)

func (Physical) BelongsToNode

func (p Physical) BelongsToNode() string

BelongsToNode exists for backward compatibility with the time when there was no replication. It always returns the first node of the list.

func (Physical) DeepCopy

func (p Physical) DeepCopy() Physical

type RemoteIncomingRepo

// RemoteIncomingRepo maps a class name to the RemoteIndexIncomingRepo that
// handles incoming sharding requests for that class. It is the repo
// dependency passed to NewRemoteIndexIncoming.
type RemoteIncomingRepo interface {
	// GetIndexForIncomingSharding returns the per-index handler for className.
	// NOTE(review): the result for an unknown class is not visible here
	// (possibly nil) — confirm against the implementation.
	GetIndexForIncomingSharding(className schema.ClassName) RemoteIndexIncomingRepo
}

type RemoteIncomingSchema added in v1.25.0

// RemoteIncomingSchema is the schema dependency passed to
// NewRemoteIndexIncoming.
type RemoteIncomingSchema interface {
	// ReadOnlyClassWithVersion looks up the class named class for the given
	// schema version.
	// NOTE(review): whether this blocks until the local schema reaches
	// version is not visible here — confirm against the implementation.
	ReadOnlyClassWithVersion(ctx context.Context, class string, version uint64) (*models.Class, error)
}

type RemoteIndex

// RemoteIndex exposes per-shard object, search, aggregation and shard-status
// operations for one class. It is created with NewRemoteIndex from a class
// name, a sharding-state getter, a node resolver and a RemoteIndexClient.
type RemoteIndex struct {
	// contains filtered or unexported fields
}

func NewRemoteIndex

func NewRemoteIndex(className string,
	stateGetter shardingStateGetter, nodeResolver nodeResolver,
	client RemoteIndexClient,
) *RemoteIndex

func (*RemoteIndex) Aggregate

func (ri *RemoteIndex) Aggregate(
	ctx context.Context,
	shard string,
	params aggregation.Params,
) (*aggregation.Result, error)

func (*RemoteIndex) BatchAddReferences

func (ri *RemoteIndex) BatchAddReferences(ctx context.Context, shardName string,
	refs objects.BatchReferences, schemaVersion uint64,
) []error

func (*RemoteIndex) BatchPutObjects

func (ri *RemoteIndex) BatchPutObjects(ctx context.Context, shardName string,
	objs []*storobj.Object, schemaVersion uint64,
) []error

func (*RemoteIndex) DeleteObject

func (ri *RemoteIndex) DeleteObject(ctx context.Context, shardName string,
	id strfmt.UUID, schemaVersion uint64,
) error

func (*RemoteIndex) DeleteObjectBatch

func (ri *RemoteIndex) DeleteObjectBatch(ctx context.Context, shardName string,
	uuids []strfmt.UUID, dryRun bool, schemaVersion uint64,
) objects.BatchSimpleObjects

func (*RemoteIndex) Exists

func (ri *RemoteIndex) Exists(ctx context.Context, shardName string,
	id strfmt.UUID,
) (bool, error)

func (*RemoteIndex) FindUUIDs added in v1.22.8

func (ri *RemoteIndex) FindUUIDs(ctx context.Context, shardName string,
	filters *filters.LocalFilter,
) ([]strfmt.UUID, error)

func (*RemoteIndex) GetObject

func (ri *RemoteIndex) GetObject(ctx context.Context, shardName string,
	id strfmt.UUID, props search.SelectProperties,
	additional additional.Properties,
) (*storobj.Object, error)

func (*RemoteIndex) GetShardQueueSize added in v1.22.0

func (ri *RemoteIndex) GetShardQueueSize(ctx context.Context, shardName string) (int64, error)

func (*RemoteIndex) GetShardStatus

func (ri *RemoteIndex) GetShardStatus(ctx context.Context, shardName string) (string, error)

func (*RemoteIndex) MergeObject

func (ri *RemoteIndex) MergeObject(ctx context.Context, shardName string,
	mergeDoc objects.MergeDocument, schemaVersion uint64,
) error

func (*RemoteIndex) MultiGetObjects

func (ri *RemoteIndex) MultiGetObjects(ctx context.Context, shardName string,
	ids []strfmt.UUID,
) ([]*storobj.Object, error)

func (*RemoteIndex) PutObject

func (ri *RemoteIndex) PutObject(ctx context.Context, shardName string,
	obj *storobj.Object, schemaVersion uint64,
) error

func (*RemoteIndex) SearchAllReplicas added in v1.24.20

func (ri *RemoteIndex) SearchAllReplicas(ctx context.Context,
	log logrus.FieldLogger,
	shard string,
	queryVec [][]float32,
	targetVector []string,
	limit int,
	filters *filters.LocalFilter,
	keywordRanking *searchparams.KeywordRanking,
	sort []filters.Sort,
	cursor *filters.Cursor,
	groupBy *searchparams.GroupBy,
	adds additional.Properties,
	replEnabled bool,
	localNode string,
	targetCombination *dto.TargetCombination,
	properties []string,
) ([]ReplicasSearchResult, error)

func (*RemoteIndex) SearchShard

func (ri *RemoteIndex) SearchShard(ctx context.Context, shard string,
	queryVec [][]float32,
	targetVector []string,
	limit int,
	filters *filters.LocalFilter,
	keywordRanking *searchparams.KeywordRanking,
	sort []filters.Sort,
	cursor *filters.Cursor,
	groupBy *searchparams.GroupBy,
	adds additional.Properties,
	replEnabled bool,
	targetCombination *dto.TargetCombination,
	properties []string,
) ([]*storobj.Object, []float32, string, error)

func (*RemoteIndex) UpdateShardStatus

func (ri *RemoteIndex) UpdateShardStatus(ctx context.Context, shardName, targetStatus string, schemaVersion uint64) error

type RemoteIndexClient

// RemoteIndexClient is the client dependency injected into NewRemoteIndex.
// Every method takes a host name, an index name and a shard name in addition
// to the operation-specific arguments. Most methods share their name with a
// RemoteIndex method; PutFile has no RemoteIndex counterpart in this listing,
// and RemoteIndex.SearchAllReplicas has no direct counterpart here.
//
// Mutating methods additionally carry a schemaVersion.
// NOTE(review): presumably the schema version the receiving node must have
// applied before executing the request — confirm.
type RemoteIndexClient interface {
	// Single-object and batch writes, reads and deletes.
	PutObject(ctx context.Context, hostName, indexName, shardName string,
		obj *storobj.Object, schemaVersion uint64) error
	BatchPutObjects(ctx context.Context, hostName, indexName, shardName string,
		objs []*storobj.Object, repl *additional.ReplicationProperties, schemaVersion uint64) []error
	BatchAddReferences(ctx context.Context, hostName, indexName, shardName string,
		refs objects.BatchReferences, schemaVersion uint64) []error
	GetObject(ctx context.Context, hostname, indexName, shardName string,
		id strfmt.UUID, props search.SelectProperties,
		additional additional.Properties) (*storobj.Object, error)
	Exists(ctx context.Context, hostname, indexName, shardName string,
		id strfmt.UUID) (bool, error)
	DeleteObject(ctx context.Context, hostname, indexName, shardName string,
		id strfmt.UUID, schemaVersion uint64) error
	MergeObject(ctx context.Context, hostname, indexName, shardName string,
		mergeDoc objects.MergeDocument, schemaVersion uint64) error
	MultiGetObjects(ctx context.Context, hostname, indexName, shardName string,
		ids []strfmt.UUID) ([]*storobj.Object, error)
	// SearchShard returns the matching objects together with a []float32.
	// NOTE(review): the float slice is presumably the per-object
	// distances/scores, aligned by index with the objects — confirm.
	SearchShard(ctx context.Context, hostname, indexName, shardName string,
		searchVector [][]float32, targetVector []string, limit int, filters *filters.LocalFilter,
		keywordRanking *searchparams.KeywordRanking, sort []filters.Sort,
		cursor *filters.Cursor, groupBy *searchparams.GroupBy,
		additional additional.Properties, targetCombination *dto.TargetCombination, properties []string,
	) ([]*storobj.Object, []float32, error)

	// Aggregation, filter-based lookups/deletes and shard status/queue access.
	Aggregate(ctx context.Context, hostname, indexName, shardName string,
		params aggregation.Params) (*aggregation.Result, error)
	FindUUIDs(ctx context.Context, hostName, indexName, shardName string,
		filters *filters.LocalFilter) ([]strfmt.UUID, error)
	DeleteObjectBatch(ctx context.Context, hostName, indexName, shardName string,
		uuids []strfmt.UUID, dryRun bool, schemaVersion uint64) objects.BatchSimpleObjects
	GetShardQueueSize(ctx context.Context, hostName, indexName, shardName string) (int64, error)
	GetShardStatus(ctx context.Context, hostName, indexName, shardName string) (string, error)
	UpdateShardStatus(ctx context.Context, hostName, indexName, shardName, targetStatus string, schemaVersion uint64) error

	// PutFile sends payload as fileName for the given shard on hostName.
	// NOTE(review): who closes payload (caller or implementation) is not
	// visible here — confirm before relying on either.
	PutFile(ctx context.Context, hostName, indexName, shardName, fileName string,
		payload io.ReadSeekCloser) error
}

type RemoteIndexIncoming

// RemoteIndexIncoming is the receiving-side counterpart of the remote index
// API: each of its methods takes an indexName and a shardName. It is created
// with NewRemoteIndexIncoming from a RemoteIncomingRepo, a
// RemoteIncomingSchema and an untyped modules value.
type RemoteIndexIncoming struct {
	// contains filtered or unexported fields
}

func NewRemoteIndexIncoming

func NewRemoteIndexIncoming(repo RemoteIncomingRepo, schema RemoteIncomingSchema, modules interface{}) *RemoteIndexIncoming

func (*RemoteIndexIncoming) Aggregate

func (rii *RemoteIndexIncoming) Aggregate(ctx context.Context, indexName, shardName string,
	params aggregation.Params,
) (*aggregation.Result, error)

func (*RemoteIndexIncoming) BatchAddReferences

func (rii *RemoteIndexIncoming) BatchAddReferences(ctx context.Context, indexName,
	shardName string, refs objects.BatchReferences, schemaVersion uint64,
) []error

func (*RemoteIndexIncoming) BatchPutObjects

func (rii *RemoteIndexIncoming) BatchPutObjects(ctx context.Context, indexName,
	shardName string, objs []*storobj.Object, schemaVersion uint64,
) []error

func (*RemoteIndexIncoming) CreateShard

func (rii *RemoteIndexIncoming) CreateShard(ctx context.Context,
	indexName, shardName string,
) error

func (*RemoteIndexIncoming) DeleteObject

func (rii *RemoteIndexIncoming) DeleteObject(ctx context.Context, indexName,
	shardName string, id strfmt.UUID, schemaVersion uint64,
) error

func (*RemoteIndexIncoming) DeleteObjectBatch

func (rii *RemoteIndexIncoming) DeleteObjectBatch(ctx context.Context, indexName, shardName string,
	uuids []strfmt.UUID, dryRun bool, schemaVersion uint64,
) objects.BatchSimpleObjects

func (*RemoteIndexIncoming) DigestObjects added in v1.18.0

func (rii *RemoteIndexIncoming) DigestObjects(ctx context.Context,
	indexName, shardName string, ids []strfmt.UUID,
) ([]replica.RepairResponse, error)

func (*RemoteIndexIncoming) DigestObjectsInTokenRange added in v1.26.0

func (rii *RemoteIndexIncoming) DigestObjectsInTokenRange(ctx context.Context,
	indexName, shardName string, initialToken, finalToken uint64, limit int,
) ([]replica.RepairResponse, uint64, error)

func (*RemoteIndexIncoming) Exists

func (rii *RemoteIndexIncoming) Exists(ctx context.Context, indexName,
	shardName string, id strfmt.UUID,
) (bool, error)

func (*RemoteIndexIncoming) FilePutter

func (rii *RemoteIndexIncoming) FilePutter(ctx context.Context,
	indexName, shardName, filePath string,
) (io.WriteCloser, error)

func (*RemoteIndexIncoming) FindUUIDs added in v1.22.8

func (rii *RemoteIndexIncoming) FindUUIDs(ctx context.Context, indexName, shardName string,
	filters *filters.LocalFilter,
) ([]strfmt.UUID, error)

func (*RemoteIndexIncoming) GetObject

func (rii *RemoteIndexIncoming) GetObject(ctx context.Context, indexName,
	shardName string, id strfmt.UUID, selectProperties search.SelectProperties,
	additional additional.Properties,
) (*storobj.Object, error)

func (*RemoteIndexIncoming) GetShardQueueSize added in v1.22.0

func (rii *RemoteIndexIncoming) GetShardQueueSize(ctx context.Context,
	indexName, shardName string,
) (int64, error)

func (*RemoteIndexIncoming) GetShardStatus

func (rii *RemoteIndexIncoming) GetShardStatus(ctx context.Context,
	indexName, shardName string,
) (string, error)

func (*RemoteIndexIncoming) HashTreeLevel added in v1.26.0

func (rii *RemoteIndexIncoming) HashTreeLevel(ctx context.Context,
	indexName, shardName string, level int, discriminant *hashtree.Bitset,
) (digests []hashtree.Digest, err error)

func (*RemoteIndexIncoming) MergeObject

func (rii *RemoteIndexIncoming) MergeObject(ctx context.Context, indexName,
	shardName string, mergeDoc objects.MergeDocument, schemaVersion uint64,
) error

func (*RemoteIndexIncoming) MultiGetObjects

func (rii *RemoteIndexIncoming) MultiGetObjects(ctx context.Context, indexName,
	shardName string, ids []strfmt.UUID,
) ([]*storobj.Object, error)

func (*RemoteIndexIncoming) OverwriteObjects added in v1.18.0

func (rii *RemoteIndexIncoming) OverwriteObjects(ctx context.Context,
	indexName, shardName string, vobjects []*objects.VObject,
) ([]replica.RepairResponse, error)

func (*RemoteIndexIncoming) PutObject

func (rii *RemoteIndexIncoming) PutObject(ctx context.Context, indexName,
	shardName string, obj *storobj.Object, schemaVersion uint64,
) error

func (*RemoteIndexIncoming) ReInitShard

func (rii *RemoteIndexIncoming) ReInitShard(ctx context.Context,
	indexName, shardName string,
) error

func (*RemoteIndexIncoming) Search

func (rii *RemoteIndexIncoming) Search(ctx context.Context, indexName, shardName string,
	vectors [][]float32, targetVectors []string, distance float32, limit int, filters *filters.LocalFilter,
	keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor,
	groupBy *searchparams.GroupBy, additional additional.Properties, targetCombination *dto.TargetCombination,
	properties []string,
) ([]*storobj.Object, []float32, error)

func (*RemoteIndexIncoming) UpdateShardStatus

func (rii *RemoteIndexIncoming) UpdateShardStatus(ctx context.Context,
	indexName, shardName, targetStatus string, schemaVersion uint64,
) error

type RemoteIndexIncomingRepo

// RemoteIndexIncomingRepo is the per-index handler returned by
// RemoteIncomingRepo.GetIndexForIncomingSharding. Its Incoming* methods
// parallel the methods of RemoteIndexIncoming; most take the same arguments
// minus indexName. Exceptions visible in the signatures: IncomingAggregate
// additionally receives a modules value, and IncomingCreateShard receives a
// className.
type RemoteIndexIncomingRepo interface {
	// Single-object and batch writes, reads and deletes.
	IncomingPutObject(ctx context.Context, shardName string,
		obj *storobj.Object, schemaVersion uint64) error
	IncomingBatchPutObjects(ctx context.Context, shardName string,
		objs []*storobj.Object, schemaVersion uint64) []error
	IncomingBatchAddReferences(ctx context.Context, shardName string,
		refs objects.BatchReferences, schemaVersion uint64) []error
	IncomingGetObject(ctx context.Context, shardName string, id strfmt.UUID,
		selectProperties search.SelectProperties,
		additional additional.Properties) (*storobj.Object, error)
	IncomingExists(ctx context.Context, shardName string,
		id strfmt.UUID) (bool, error)
	IncomingDeleteObject(ctx context.Context, shardName string,
		id strfmt.UUID, schemaVersion uint64) error
	IncomingMergeObject(ctx context.Context, shardName string,
		mergeDoc objects.MergeDocument, schemaVersion uint64) error
	IncomingMultiGetObjects(ctx context.Context, shardName string,
		ids []strfmt.UUID) ([]*storobj.Object, error)
	// IncomingSearch returns the matching objects together with a []float32.
	// NOTE(review): the float slice is presumably the per-object
	// distances/scores, aligned by index with the objects — confirm.
	IncomingSearch(ctx context.Context, shardName string,
		vectors [][]float32, targetVectors []string, distance float32, limit int,
		filters *filters.LocalFilter, keywordRanking *searchparams.KeywordRanking,
		sort []filters.Sort, cursor *filters.Cursor, groupBy *searchparams.GroupBy,
		additional additional.Properties, targetCombination *dto.TargetCombination, properties []string,
	) ([]*storobj.Object, []float32, error)
	IncomingAggregate(ctx context.Context, shardName string,
		params aggregation.Params, modules interface{}) (*aggregation.Result, error)

	// Filter-based lookups/deletes, shard status/queue access, and the
	// repair/digest/hash-tree operations.
	IncomingFindUUIDs(ctx context.Context, shardName string,
		filters *filters.LocalFilter) ([]strfmt.UUID, error)
	IncomingDeleteObjectBatch(ctx context.Context, shardName string,
		uuids []strfmt.UUID, dryRun bool, schemaVersion uint64) objects.BatchSimpleObjects
	IncomingGetShardQueueSize(ctx context.Context, shardName string) (int64, error)
	IncomingGetShardStatus(ctx context.Context, shardName string) (string, error)
	IncomingUpdateShardStatus(ctx context.Context, shardName, targetStatus string, schemaVersion uint64) error
	IncomingOverwriteObjects(ctx context.Context, shard string,
		vobjects []*objects.VObject) ([]replica.RepairResponse, error)
	IncomingDigestObjects(ctx context.Context, shardName string,
		ids []strfmt.UUID) (result []replica.RepairResponse, err error)
	// IncomingDigestObjectsInTokenRange additionally reports lastTokenRead,
	// the last token covered by the returned digests (per the result name).
	IncomingDigestObjectsInTokenRange(ctx context.Context, shardName string,
		initialToken, finalToken uint64, limit int) (result []replica.RepairResponse, lastTokenRead uint64, err error)
	IncomingHashTreeLevel(ctx context.Context, shardName string,
		level int, discriminant *hashtree.Bitset) (digests []hashtree.Digest, err error)

	// Scale-Out Replication POC
	IncomingFilePutter(ctx context.Context, shardName,
		filePath string) (io.WriteCloser, error)
	IncomingCreateShard(ctx context.Context, className string, shardName string) error
	IncomingReinitShard(ctx context.Context, shardName string) error
}

type RemoteNode

// RemoteNode exposes node status and statistics lookups addressed by node
// name. It is created with NewRemoteNode from a node resolver and a
// RemoteNodeClient.
type RemoteNode struct {
	// contains filtered or unexported fields
}

func NewRemoteNode

func NewRemoteNode(nodeResolver nodeResolver, client RemoteNodeClient) *RemoteNode

func (*RemoteNode) GetNodeStatus

func (rn *RemoteNode) GetNodeStatus(ctx context.Context, nodeName, className, output string) (*models.NodeStatus, error)

func (*RemoteNode) GetStatistics added in v1.25.0

func (rn *RemoteNode) GetStatistics(ctx context.Context, nodeName string) (*models.Statistics, error)

type RemoteNodeClient

// RemoteNodeClient is the client dependency injected into NewRemoteNode. Its
// methods mirror those of RemoteNode but are addressed by hostName rather
// than nodeName.
type RemoteNodeClient interface {
	// GetNodeStatus requests the node status from hostName.
	// NOTE(review): className presumably narrows the status to one class and
	// output presumably selects the verbosity — confirm.
	GetNodeStatus(ctx context.Context, hostName, className, output string) (*models.NodeStatus, error)
	// GetStatistics requests the statistics from hostName.
	GetStatistics(ctx context.Context, hostName string) (*models.Statistics, error)
}

type RemoteNodeIncoming

// RemoteNodeIncoming is the receiving-side counterpart of RemoteNode: its
// GetNodeStatus and GetStatistics methods take no node or host name. It is
// created with NewRemoteNodeIncoming from a RemoteNodeIncomingRepo.
type RemoteNodeIncoming struct {
	// contains filtered or unexported fields
}

func NewRemoteNodeIncoming

func NewRemoteNodeIncoming(repo RemoteNodeIncomingRepo) *RemoteNodeIncoming

func (*RemoteNodeIncoming) GetNodeStatus

func (rni *RemoteNodeIncoming) GetNodeStatus(ctx context.Context, className, output string) (*models.NodeStatus, error)

func (*RemoteNodeIncoming) GetStatistics added in v1.25.0

func (rni *RemoteNodeIncoming) GetStatistics(ctx context.Context) (*models.Statistics, error)

type RemoteNodeIncomingRepo

// RemoteNodeIncomingRepo is the repo dependency passed to
// NewRemoteNodeIncoming.
type RemoteNodeIncomingRepo interface {
	// IncomingGetNodeStatus takes the same arguments as
	// RemoteNodeIncoming.GetNodeStatus.
	IncomingGetNodeStatus(ctx context.Context, className, output string) (*models.NodeStatus, error)
	// IncomingGetNodeStatistics takes no context, unlike
	// RemoteNodeIncoming.GetStatistics, so it cannot observe cancellation.
	IncomingGetNodeStatistics() (*models.Statistics, error)
}

type ReplicasSearchResult added in v1.24.20

// ReplicasSearchResult is the element type of the slice returned by
// RemoteIndex.SearchAllReplicas.
type ReplicasSearchResult struct {
	// Objects holds the objects returned by the search.
	Objects []*storobj.Object
	// NOTE(review): presumably one score/distance per entry in Objects,
	// aligned by index — confirm.
	Scores  []float32
	// NOTE(review): presumably the name of the node (replica) that produced
	// this result — confirm.
	Node    string
}

type State

// State is the JSON-serializable sharding state of an index. It is created by
// InitState or StateFromJSON and serialized with its JSON method.
type State struct {
	IndexID             string              `json:"indexID"` // for monitoring, reporting purposes. Does not influence the shard-calculations
	// Config is the sharding configuration this state was built with.
	Config              config.Config       `json:"config"`
	// Physical holds the physical shards.
	// NOTE(review): presumably keyed by shard name (AddPartition and
	// DeletePartition take a name) — confirm.
	Physical            map[string]Physical `json:"physical"`
	// Virtual holds the virtual shards; VirtualByName searches it linearly.
	Virtual             []Virtual           `json:"virtual"`
	// PartitioningEnabled is set from the partitioningEnabled argument of
	// InitState.
	PartitioningEnabled bool                `json:"partitioningEnabled"`
	// contains filtered or unexported fields
}

func InitState

func InitState(id string, config config.Config, nodeLocalName string, names []string, replFactor int64, partitioningEnabled bool) (*State, error)

func StateFromJSON

func StateFromJSON(in []byte, nodes cluster.NodeSelector) (*State, error)

func (*State) AddPartition added in v1.20.0

func (s *State) AddPartition(name string, nodes []string, status string) Physical

AddPartition adds a partition to the physical shards.

func (*State) AllLocalPhysicalShards

func (s *State) AllLocalPhysicalShards() []string

func (*State) AllPhysicalShards

func (s *State) AllPhysicalShards() []string

func (*State) ApplyNodeMapping added in v1.22.0

func (s *State) ApplyNodeMapping(nodeMapping map[string]string)

ApplyNodeMapping replaces node names in s with their new values from nodeMapping. If s.LegacyBelongsToNodeForBackwardCompat is non-empty, it will also perform node name replacement if present in nodeMapping.

func (*State) CountPhysicalShards

func (s *State) CountPhysicalShards() int

CountPhysicalShards returns the number of physical shards.

func (State) DeepCopy

func (s State) DeepCopy() State

func (*State) DeletePartition added in v1.20.0

func (s *State) DeletePartition(name string)

DeletePartition removes a partition from the physical shards.

func (State) GetPartitions added in v1.20.0

func (s State) GetPartitions(nodes []string, shards []string, replFactor int64) (map[string][]string, error)

GetPartitions computes partitions based on the specified shards, available nodes, and replFactor. It doesn't change the internal state. TODO-RAFT: Ensure this function is higher-order; if the repartition result is changed, this will result in inconsistency when applying old log entries for add tenants.

func (*State) IsLocalShard added in v1.20.0

func (s *State) IsLocalShard(name string) bool

func (*State) JSON

func (s *State) JSON() ([]byte, error)

func (*State) MigrateFromOldFormat

func (s *State) MigrateFromOldFormat()

MigrateFromOldFormat checks if the old (pre-v1.17) format was used and migrates it into the new format for backward-compatibility with all classes created before v1.17

func (*State) PhysicalShard

func (s *State) PhysicalShard(in []byte) string

func (*State) SetLocalName

func (s *State) SetLocalName(name string)

func (*State) Shard added in v1.20.0

func (s *State) Shard(partitionKey, objectID string) string

Shard returns the shard name if it exists and an empty string otherwise.

func (*State) VirtualByName added in v1.26.0

func (s *State) VirtualByName(name string) *Virtual

VirtualByName uses linear search, but should only be used during shard init and update operations, not in regular operation.

type Virtual

// Virtual is the JSON-serializable description of a single virtual shard.
// Values of this type are stored in State.Virtual.
type Virtual struct {
	// Name is the key VirtualByName looks entries up by.
	Name               string  `json:"name"`
	// NOTE(review): presumably the upper bound of the token range covered by
	// this virtual shard — confirm.
	Upper              uint64  `json:"upper"`
	// NOTE(review): presumably the share of the key space owned by this
	// virtual shard; the unit/range is not visible here — confirm.
	OwnsPercentage     float64 `json:"ownsPercentage"`
	// AssignedToPhysical names the physical shard this virtual shard is
	// assigned to.
	AssignedToPhysical string  `json:"assignedToPhysical"`
}

func (Virtual) DeepCopy

func (v Virtual) DeepCopy() Virtual

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL