sharding

package
v1.23.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 18, 2023 License: BSD-3-Clause Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultVirtualPerPhysical = 128
	DefaultKey                = "_id"
	DefaultStrategy           = "hash"
	DefaultFunction           = "murmur3"
)

Variables

This section is empty.

Functions

func ValidateConfigUpdate

func ValidateConfigUpdate(old, updated Config, nodeCounter nodeCounter) error

Types

type Config

type Config struct {
	VirtualPerPhysical  int    `json:"virtualPerPhysical"`
	DesiredCount        int    `json:"desiredCount"`
	ActualCount         int    `json:"actualCount"`
	DesiredVirtualCount int    `json:"desiredVirtualCount"`
	ActualVirtualCount  int    `json:"actualVirtualCount"`
	Key                 string `json:"key"`
	Strategy            string `json:"strategy"`
	Function            string `json:"function"`
}

func ParseConfig

func ParseConfig(input interface{}, nodeCount int) (Config, error)

func (Config) DeepCopy

func (c Config) DeepCopy() Config

type Physical

type Physical struct {
	Name           string   `json:"name"`
	OwnsVirtual    []string `json:"ownsVirtual,omitempty"`
	OwnsPercentage float64  `json:"ownsPercentage"`

	LegacyBelongsToNodeForBackwardCompat string   `json:"belongsToNode,omitempty"`
	BelongsToNodes                       []string `json:"belongsToNodes,omitempty"`

	Status string `json:"status,omitempty"`
}

func (*Physical) ActivityStatus added in v1.21.0

func (p *Physical) ActivityStatus() string

func (*Physical) AdjustReplicas

func (p *Physical) AdjustReplicas(count int, nodes nodes) error

AdjustReplicas shrinks or extends the replica set (p.BelongsToNodes)

func (Physical) BelongsToNode

func (p Physical) BelongsToNode() string

BelongsToNode for backward-compatibility when there was no replication. It always returns the first node of the list

func (Physical) DeepCopy

func (p Physical) DeepCopy() Physical

type RemoteIncomingRepo

type RemoteIncomingRepo interface {
	GetIndexForIncoming(className schema.ClassName) RemoteIndexIncomingRepo
}

type RemoteIndex

type RemoteIndex struct {
	// contains filtered or unexported fields
}

func NewRemoteIndex

func NewRemoteIndex(className string,
	stateGetter shardingStateGetter, nodeResolver nodeResolver,
	client RemoteIndexClient,
) *RemoteIndex

func (*RemoteIndex) Aggregate

func (ri *RemoteIndex) Aggregate(
	ctx context.Context,
	shard string,
	params aggregation.Params,
) (*aggregation.Result, error)

func (*RemoteIndex) BatchAddReferences

func (ri *RemoteIndex) BatchAddReferences(ctx context.Context, shardName string,
	refs objects.BatchReferences,
) []error

func (*RemoteIndex) BatchPutObjects

func (ri *RemoteIndex) BatchPutObjects(ctx context.Context, shardName string,
	objs []*storobj.Object,
) []error

func (*RemoteIndex) DeleteObject

func (ri *RemoteIndex) DeleteObject(ctx context.Context, shardName string,
	id strfmt.UUID,
) error

func (*RemoteIndex) DeleteObjectBatch

func (ri *RemoteIndex) DeleteObjectBatch(ctx context.Context, shardName string,
	uuids []strfmt.UUID, dryRun bool,
) objects.BatchSimpleObjects

func (*RemoteIndex) Exists

func (ri *RemoteIndex) Exists(ctx context.Context, shardName string,
	id strfmt.UUID,
) (bool, error)

func (*RemoteIndex) FindUUIDs added in v1.22.8

func (ri *RemoteIndex) FindUUIDs(ctx context.Context, shardName string,
	filters *filters.LocalFilter,
) ([]strfmt.UUID, error)

func (*RemoteIndex) GetObject

func (ri *RemoteIndex) GetObject(ctx context.Context, shardName string,
	id strfmt.UUID, props search.SelectProperties,
	additional additional.Properties,
) (*storobj.Object, error)

func (*RemoteIndex) GetShardQueueSize added in v1.22.0

func (ri *RemoteIndex) GetShardQueueSize(ctx context.Context, shardName string) (int64, error)

func (*RemoteIndex) GetShardStatus

func (ri *RemoteIndex) GetShardStatus(ctx context.Context, shardName string) (string, error)

func (*RemoteIndex) MergeObject

func (ri *RemoteIndex) MergeObject(ctx context.Context, shardName string,
	mergeDoc objects.MergeDocument,
) error

func (*RemoteIndex) MultiGetObjects

func (ri *RemoteIndex) MultiGetObjects(ctx context.Context, shardName string,
	ids []strfmt.UUID,
) ([]*storobj.Object, error)

func (*RemoteIndex) PutObject

func (ri *RemoteIndex) PutObject(ctx context.Context, shardName string,
	obj *storobj.Object,
) error

func (*RemoteIndex) SearchShard

func (ri *RemoteIndex) SearchShard(ctx context.Context, shard string,
	queryVec []float32,
	limit int,
	filters *filters.LocalFilter,
	keywordRanking *searchparams.KeywordRanking,
	sort []filters.Sort,
	cursor *filters.Cursor,
	groupBy *searchparams.GroupBy,
	adds additional.Properties,
	replEnabled bool,
) ([]*storobj.Object, []float32, string, error)

func (*RemoteIndex) UpdateShardStatus

func (ri *RemoteIndex) UpdateShardStatus(ctx context.Context, shardName, targetStatus string) error

type RemoteIndexClient

type RemoteIndexClient interface {
	PutObject(ctx context.Context, hostName, indexName, shardName string,
		obj *storobj.Object) error
	BatchPutObjects(ctx context.Context, hostName, indexName, shardName string,
		objs []*storobj.Object, repl *additional.ReplicationProperties) []error
	BatchAddReferences(ctx context.Context, hostName, indexName, shardName string,
		refs objects.BatchReferences) []error
	GetObject(ctx context.Context, hostname, indexName, shardName string,
		id strfmt.UUID, props search.SelectProperties,
		additional additional.Properties) (*storobj.Object, error)
	Exists(ctx context.Context, hostname, indexName, shardName string,
		id strfmt.UUID) (bool, error)
	DeleteObject(ctx context.Context, hostname, indexName, shardName string,
		id strfmt.UUID) error
	MergeObject(ctx context.Context, hostname, indexName, shardName string,
		mergeDoc objects.MergeDocument) error
	MultiGetObjects(ctx context.Context, hostname, indexName, shardName string,
		ids []strfmt.UUID) ([]*storobj.Object, error)
	SearchShard(ctx context.Context, hostname, indexName, shardName string,
		searchVector []float32, limit int, filters *filters.LocalFilter,
		keywordRanking *searchparams.KeywordRanking, sort []filters.Sort,
		cursor *filters.Cursor, groupBy *searchparams.GroupBy,
		additional additional.Properties,
	) ([]*storobj.Object, []float32, error)
	Aggregate(ctx context.Context, hostname, indexName, shardName string,
		params aggregation.Params) (*aggregation.Result, error)
	FindUUIDs(ctx context.Context, hostName, indexName, shardName string,
		filters *filters.LocalFilter) ([]strfmt.UUID, error)
	DeleteObjectBatch(ctx context.Context, hostName, indexName, shardName string,
		uuids []strfmt.UUID, dryRun bool) objects.BatchSimpleObjects
	GetShardQueueSize(ctx context.Context, hostName, indexName, shardName string) (int64, error)
	GetShardStatus(ctx context.Context, hostName, indexName, shardName string) (string, error)
	UpdateShardStatus(ctx context.Context, hostName, indexName, shardName,
		targetStatus string) error

	PutFile(ctx context.Context, hostName, indexName, shardName, fileName string,
		payload io.ReadSeekCloser) error
}

type RemoteIndexIncoming

type RemoteIndexIncoming struct {
	// contains filtered or unexported fields
}

func NewRemoteIndexIncoming

func NewRemoteIndexIncoming(repo RemoteIncomingRepo) *RemoteIndexIncoming

func (*RemoteIndexIncoming) Aggregate

func (rii *RemoteIndexIncoming) Aggregate(ctx context.Context, indexName, shardName string,
	params aggregation.Params,
) (*aggregation.Result, error)

func (*RemoteIndexIncoming) BatchAddReferences

func (rii *RemoteIndexIncoming) BatchAddReferences(ctx context.Context, indexName,
	shardName string, refs objects.BatchReferences,
) []error

func (*RemoteIndexIncoming) BatchPutObjects

func (rii *RemoteIndexIncoming) BatchPutObjects(ctx context.Context, indexName,
	shardName string, objs []*storobj.Object,
) []error

func (*RemoteIndexIncoming) CreateShard

func (rii *RemoteIndexIncoming) CreateShard(ctx context.Context,
	indexName, shardName string,
) error

func (*RemoteIndexIncoming) DeleteObject

func (rii *RemoteIndexIncoming) DeleteObject(ctx context.Context, indexName,
	shardName string, id strfmt.UUID,
) error

func (*RemoteIndexIncoming) DeleteObjectBatch

func (rii *RemoteIndexIncoming) DeleteObjectBatch(ctx context.Context, indexName, shardName string,
	uuids []strfmt.UUID, dryRun bool,
) objects.BatchSimpleObjects

func (*RemoteIndexIncoming) DigestObjects added in v1.18.0

func (rii *RemoteIndexIncoming) DigestObjects(ctx context.Context,
	indexName, shardName string, ids []strfmt.UUID,
) ([]replica.RepairResponse, error)

func (*RemoteIndexIncoming) Exists

func (rii *RemoteIndexIncoming) Exists(ctx context.Context, indexName,
	shardName string, id strfmt.UUID,
) (bool, error)

func (*RemoteIndexIncoming) FilePutter

func (rii *RemoteIndexIncoming) FilePutter(ctx context.Context,
	indexName, shardName, filePath string,
) (io.WriteCloser, error)

func (*RemoteIndexIncoming) FindUUIDs added in v1.22.8

func (rii *RemoteIndexIncoming) FindUUIDs(ctx context.Context, indexName, shardName string,
	filters *filters.LocalFilter,
) ([]strfmt.UUID, error)

func (*RemoteIndexIncoming) GetObject

func (rii *RemoteIndexIncoming) GetObject(ctx context.Context, indexName,
	shardName string, id strfmt.UUID, selectProperties search.SelectProperties,
	additional additional.Properties,
) (*storobj.Object, error)

func (*RemoteIndexIncoming) GetShardQueueSize added in v1.22.0

func (rii *RemoteIndexIncoming) GetShardQueueSize(ctx context.Context,
	indexName, shardName string,
) (int64, error)

func (*RemoteIndexIncoming) GetShardStatus

func (rii *RemoteIndexIncoming) GetShardStatus(ctx context.Context,
	indexName, shardName string,
) (string, error)

func (*RemoteIndexIncoming) MergeObject

func (rii *RemoteIndexIncoming) MergeObject(ctx context.Context, indexName,
	shardName string, mergeDoc objects.MergeDocument,
) error

func (*RemoteIndexIncoming) MultiGetObjects

func (rii *RemoteIndexIncoming) MultiGetObjects(ctx context.Context, indexName,
	shardName string, ids []strfmt.UUID,
) ([]*storobj.Object, error)

func (*RemoteIndexIncoming) OverwriteObjects added in v1.18.0

func (rii *RemoteIndexIncoming) OverwriteObjects(ctx context.Context,
	indexName, shardName string, vobjects []*objects.VObject,
) ([]replica.RepairResponse, error)

func (*RemoteIndexIncoming) PutObject

func (rii *RemoteIndexIncoming) PutObject(ctx context.Context, indexName,
	shardName string, obj *storobj.Object,
) error

func (*RemoteIndexIncoming) ReInitShard

func (rii *RemoteIndexIncoming) ReInitShard(ctx context.Context,
	indexName, shardName string,
) error

func (*RemoteIndexIncoming) Search

func (rii *RemoteIndexIncoming) Search(ctx context.Context, indexName, shardName string,
	vector []float32, distance float32, limit int, filters *filters.LocalFilter,
	keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor,
	groupBy *searchparams.GroupBy, additional additional.Properties,
) ([]*storobj.Object, []float32, error)

func (*RemoteIndexIncoming) UpdateShardStatus

func (rii *RemoteIndexIncoming) UpdateShardStatus(ctx context.Context,
	indexName, shardName, targetStatus string,
) error

type RemoteIndexIncomingRepo

type RemoteIndexIncomingRepo interface {
	IncomingPutObject(ctx context.Context, shardName string,
		obj *storobj.Object) error
	IncomingBatchPutObjects(ctx context.Context, shardName string,
		objs []*storobj.Object) []error
	IncomingBatchAddReferences(ctx context.Context, shardName string,
		refs objects.BatchReferences) []error
	IncomingGetObject(ctx context.Context, shardName string, id strfmt.UUID,
		selectProperties search.SelectProperties,
		additional additional.Properties) (*storobj.Object, error)
	IncomingExists(ctx context.Context, shardName string,
		id strfmt.UUID) (bool, error)
	IncomingDeleteObject(ctx context.Context, shardName string,
		id strfmt.UUID) error
	IncomingMergeObject(ctx context.Context, shardName string,
		mergeDoc objects.MergeDocument) error
	IncomingMultiGetObjects(ctx context.Context, shardName string,
		ids []strfmt.UUID) ([]*storobj.Object, error)
	IncomingSearch(ctx context.Context, shardName string,
		vector []float32, distance float32, limit int, filters *filters.LocalFilter,
		keywordRanking *searchparams.KeywordRanking, sort []filters.Sort,
		cursor *filters.Cursor, groupBy *searchparams.GroupBy,
		additional additional.Properties,
	) ([]*storobj.Object, []float32, error)
	IncomingAggregate(ctx context.Context, shardName string,
		params aggregation.Params) (*aggregation.Result, error)

	IncomingFindUUIDs(ctx context.Context, shardName string,
		filters *filters.LocalFilter) ([]strfmt.UUID, error)
	IncomingDeleteObjectBatch(ctx context.Context, shardName string,
		uuids []strfmt.UUID, dryRun bool) objects.BatchSimpleObjects
	IncomingGetShardQueueSize(ctx context.Context, shardName string) (int64, error)
	IncomingGetShardStatus(ctx context.Context, shardName string) (string, error)
	IncomingUpdateShardStatus(ctx context.Context, shardName, targetStatus string) error
	IncomingOverwriteObjects(ctx context.Context, shard string,
		vobjects []*objects.VObject) ([]replica.RepairResponse, error)
	IncomingDigestObjects(ctx context.Context, shardName string,
		ids []strfmt.UUID) (result []replica.RepairResponse, err error)

	// Scale-Out Replication POC
	IncomingFilePutter(ctx context.Context, shardName,
		filePath string) (io.WriteCloser, error)
	IncomingCreateShard(ctx context.Context, className string, shardName string) error
	IncomingReinitShard(ctx context.Context, shardName string) error
}

type RemoteNode

type RemoteNode struct {
	// contains filtered or unexported fields
}

func NewRemoteNode

func NewRemoteNode(nodeResolver nodeResolver, client RemoteNodeClient) *RemoteNode

func (*RemoteNode) GetNodeStatus

func (rn *RemoteNode) GetNodeStatus(ctx context.Context, nodeName, className, output string) (*models.NodeStatus, error)

type RemoteNodeClient

type RemoteNodeClient interface {
	GetNodeStatus(ctx context.Context, hostName, className, output string) (*models.NodeStatus, error)
}

type RemoteNodeIncoming

type RemoteNodeIncoming struct {
	// contains filtered or unexported fields
}

func NewRemoteNodeIncoming

func NewRemoteNodeIncoming(repo RemoteNodeIncomingRepo) *RemoteNodeIncoming

func (*RemoteNodeIncoming) GetNodeStatus

func (rni *RemoteNodeIncoming) GetNodeStatus(ctx context.Context, className, output string) (*models.NodeStatus, error)

type RemoteNodeIncomingRepo

type RemoteNodeIncomingRepo interface {
	IncomingGetNodeStatus(ctx context.Context, className, output string) (*models.NodeStatus, error)
}

type State

type State struct {
	IndexID             string              `json:"indexID"` // for monitoring, reporting purposes. Does not influence the shard-calculations
	Config              Config              `json:"config"`
	Physical            map[string]Physical `json:"physical"`
	Virtual             []Virtual           `json:"virtual"`
	PartitioningEnabled bool                `json:"partitioningEnabled"`
	// contains filtered or unexported fields
}

func InitState

func InitState(id string, config Config, nodes nodes, replFactor int64, partitioningEnabled bool) (*State, error)

func StateFromJSON

func StateFromJSON(in []byte, nodes nodes) (*State, error)

func (*State) AddPartition added in v1.20.0

func (s *State) AddPartition(name string, nodes []string, status string) Physical

AddPartition to physical shards

func (*State) AllLocalPhysicalShards

func (s *State) AllLocalPhysicalShards() []string

func (*State) AllPhysicalShards

func (s *State) AllPhysicalShards() []string

func (*State) ApplyNodeMapping added in v1.22.0

func (s *State) ApplyNodeMapping(nodeMapping map[string]string)

ApplyNodeMapping replaces node names with their new value form nodeMapping in s. If s.LegacyBelongsToNodeForBackwardCompat is non empty, it will also perform node name replacement if present in nodeMapping.

func (*State) CountPhysicalShards

func (s *State) CountPhysicalShards() int

CountPhysicalShards return a count of physical shards

func (State) DeepCopy

func (s State) DeepCopy() State

func (*State) DeletePartition added in v1.20.0

func (s *State) DeletePartition(name string)

DeletePartition to physical shards

func (*State) GetPartitions added in v1.20.0

func (s *State) GetPartitions(nodes nodes, shards []string, replFactor int64) (map[string][]string, error)

GetPartitions based on the specified shards, available nodes, and replFactor It doesn't change the internal state

func (*State) IsLocalShard added in v1.20.0

func (s *State) IsLocalShard(name string) bool

func (*State) JSON

func (s *State) JSON() ([]byte, error)

func (*State) MigrateFromOldFormat

func (s *State) MigrateFromOldFormat()

MigrateFromOldFormat checks if the old (pre-v1.17) format was used and migrates it into the new format for backward-compatibility with all classes created before v1.17

func (*State) PhysicalShard

func (s *State) PhysicalShard(in []byte) string

func (*State) SetLocalName

func (s *State) SetLocalName(name string)

func (*State) Shard added in v1.20.0

func (s *State) Shard(partitionKey, objectID string) string

Shard returns the shard name if it exits and empty string otherwise

type Virtual

type Virtual struct {
	Name               string  `json:"name"`
	Upper              uint64  `json:"upper"`
	OwnsPercentage     float64 `json:"ownsPercentage"`
	AssignedToPhysical string  `json:"assignedToPhysical"`
}

func (Virtual) DeepCopy

func (v Virtual) DeepCopy() Virtual

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL