Documentation ¶
Index ¶
- Constants
- func ValidateConfigUpdate(old, updated Config, nodeCounter nodeCounter) error
- type Config
- type Physical
- type RemoteIncomingRepo
- type RemoteIndex
- func (ri *RemoteIndex) Aggregate(ctx context.Context, shard string, params aggregation.Params) (*aggregation.Result, error)
- func (ri *RemoteIndex) BatchAddReferences(ctx context.Context, shardName string, refs objects.BatchReferences) []error
- func (ri *RemoteIndex) BatchPutObjects(ctx context.Context, shardName string, objs []*storobj.Object) []error
- func (ri *RemoteIndex) DeleteObject(ctx context.Context, shardName string, id strfmt.UUID) error
- func (ri *RemoteIndex) DeleteObjectBatch(ctx context.Context, shardName string, docIDs []uint64, dryRun bool) objects.BatchSimpleObjects
- func (ri *RemoteIndex) Exists(ctx context.Context, shardName string, id strfmt.UUID) (bool, error)
- func (ri *RemoteIndex) FindDocIDs(ctx context.Context, shardName string, filters *filters.LocalFilter) ([]uint64, error)
- func (ri *RemoteIndex) GetObject(ctx context.Context, shardName string, id strfmt.UUID, ...) (*storobj.Object, error)
- func (ri *RemoteIndex) GetShardStatus(ctx context.Context, shardName string) (string, error)
- func (ri *RemoteIndex) MergeObject(ctx context.Context, shardName string, mergeDoc objects.MergeDocument) error
- func (ri *RemoteIndex) MultiGetObjects(ctx context.Context, shardName string, ids []strfmt.UUID) ([]*storobj.Object, error)
- func (ri *RemoteIndex) PutObject(ctx context.Context, shardName string, obj *storobj.Object) error
- func (ri *RemoteIndex) SearchShard(ctx context.Context, shard string, queryVec []float32, limit int, ...) ([]*storobj.Object, []float32, error)
- func (ri *RemoteIndex) UpdateShardStatus(ctx context.Context, shardName, targetStatus string) error
- type RemoteIndexClient
- type RemoteIndexIncoming
- func (rii *RemoteIndexIncoming) Aggregate(ctx context.Context, indexName, shardName string, params aggregation.Params) (*aggregation.Result, error)
- func (rii *RemoteIndexIncoming) BatchAddReferences(ctx context.Context, indexName, shardName string, refs objects.BatchReferences) []error
- func (rii *RemoteIndexIncoming) BatchPutObjects(ctx context.Context, indexName, shardName string, objs []*storobj.Object) []error
- func (rii *RemoteIndexIncoming) CreateShard(ctx context.Context, indexName, shardName string) error
- func (rii *RemoteIndexIncoming) DeleteObject(ctx context.Context, indexName, shardName string, id strfmt.UUID) error
- func (rii *RemoteIndexIncoming) DeleteObjectBatch(ctx context.Context, indexName, shardName string, docIDs []uint64, dryRun bool) objects.BatchSimpleObjects
- func (rii *RemoteIndexIncoming) DigestObjects(ctx context.Context, indexName, shardName string, ids []strfmt.UUID) ([]replica.RepairResponse, error)
- func (rii *RemoteIndexIncoming) Exists(ctx context.Context, indexName, shardName string, id strfmt.UUID) (bool, error)
- func (rii *RemoteIndexIncoming) FilePutter(ctx context.Context, indexName, shardName, filePath string) (io.WriteCloser, error)
- func (rii *RemoteIndexIncoming) FindDocIDs(ctx context.Context, indexName, shardName string, filters *filters.LocalFilter) ([]uint64, error)
- func (rii *RemoteIndexIncoming) GetObject(ctx context.Context, indexName, shardName string, id strfmt.UUID, ...) (*storobj.Object, error)
- func (rii *RemoteIndexIncoming) GetShardStatus(ctx context.Context, indexName, shardName string) (string, error)
- func (rii *RemoteIndexIncoming) MergeObject(ctx context.Context, indexName, shardName string, ...) error
- func (rii *RemoteIndexIncoming) MultiGetObjects(ctx context.Context, indexName, shardName string, ids []strfmt.UUID) ([]*storobj.Object, error)
- func (rii *RemoteIndexIncoming) OverwriteObjects(ctx context.Context, indexName, shardName string, vobjects []*objects.VObject) ([]replica.RepairResponse, error)
- func (rii *RemoteIndexIncoming) PutObject(ctx context.Context, indexName, shardName string, obj *storobj.Object) error
- func (rii *RemoteIndexIncoming) ReInitShard(ctx context.Context, indexName, shardName string) error
- func (rii *RemoteIndexIncoming) Search(ctx context.Context, indexName, shardName string, vector []float32, ...) ([]*storobj.Object, []float32, error)
- func (rii *RemoteIndexIncoming) UpdateShardStatus(ctx context.Context, indexName, shardName, targetStatus string) error
- type RemoteIndexIncomingRepo
- type RemoteNode
- type RemoteNodeClient
- type RemoteNodeIncoming
- type RemoteNodeIncomingRepo
- type State
- func (s *State) AddPartition(name string, nodes []string, status string) Physical
- func (s *State) AllLocalPhysicalShards() []string
- func (s *State) AllPhysicalShards() []string
- func (s *State) CountPhysicalShards() int
- func (s State) DeepCopy() State
- func (s *State) DeletePartition(name string)
- func (s *State) GetPartitions(nodes nodes, shards []string, replFactor int64) (map[string][]string, error)
- func (s *State) IsLocalShard(name string) bool
- func (s *State) JSON() ([]byte, error)
- func (s *State) MigrateFromOldFormat()
- func (s *State) PhysicalShard(in []byte) string
- func (s *State) SetLocalName(name string)
- func (s *State) Shard(partitionKey, objectID string) string
- type Virtual
Constants ¶
View Source
const ( DefaultVirtualPerPhysical = 128 DefaultKey = "_id" DefaultStrategy = "hash" DefaultFunction = "murmur3" )
Variables ¶
This section is empty.
Functions ¶
func ValidateConfigUpdate ¶
Types ¶
type Config ¶
type Config struct { VirtualPerPhysical int `json:"virtualPerPhysical"` DesiredCount int `json:"desiredCount"` ActualCount int `json:"actualCount"` DesiredVirtualCount int `json:"desiredVirtualCount"` ActualVirtualCount int `json:"actualVirtualCount"` Key string `json:"key"` Strategy string `json:"strategy"` Function string `json:"function"` }
func ParseConfig ¶
type Physical ¶
type Physical struct { Name string `json:"name"` OwnsVirtual []string `json:"ownsVirtual,omitempty"` OwnsPercentage float64 `json:"ownsPercentage"` LegacyBelongsToNodeForBackwardCompat string `json:"belongsToNode,omitempty"` BelongsToNodes []string `json:"belongsToNodes,omitempty"` Status string `json:"status,omitempty"` }
func (*Physical) ActivityStatus ¶ added in v1.21.0
func (*Physical) AdjustReplicas ¶
AdjustReplicas shrinks or extends the replica set (p.BelongsToNodes)
func (Physical) BelongsToNode ¶
BelongsToNode exists for backward compatibility with the time when there was no replication. It always returns the first node of the list.
type RemoteIncomingRepo ¶
type RemoteIncomingRepo interface {
GetIndexForIncoming(className schema.ClassName) RemoteIndexIncomingRepo
}
type RemoteIndex ¶
type RemoteIndex struct {
// contains filtered or unexported fields
}
func NewRemoteIndex ¶
func NewRemoteIndex(className string, stateGetter shardingStateGetter, nodeResolver nodeResolver, client RemoteIndexClient, ) *RemoteIndex
func (*RemoteIndex) Aggregate ¶
func (ri *RemoteIndex) Aggregate( ctx context.Context, shard string, params aggregation.Params, ) (*aggregation.Result, error)
func (*RemoteIndex) BatchAddReferences ¶
func (ri *RemoteIndex) BatchAddReferences(ctx context.Context, shardName string, refs objects.BatchReferences, ) []error
func (*RemoteIndex) BatchPutObjects ¶
func (*RemoteIndex) DeleteObject ¶
func (*RemoteIndex) DeleteObjectBatch ¶
func (ri *RemoteIndex) DeleteObjectBatch(ctx context.Context, shardName string, docIDs []uint64, dryRun bool, ) objects.BatchSimpleObjects
func (*RemoteIndex) FindDocIDs ¶
func (ri *RemoteIndex) FindDocIDs(ctx context.Context, shardName string, filters *filters.LocalFilter, ) ([]uint64, error)
func (*RemoteIndex) GetObject ¶
func (ri *RemoteIndex) GetObject(ctx context.Context, shardName string, id strfmt.UUID, props search.SelectProperties, additional additional.Properties, ) (*storobj.Object, error)
func (*RemoteIndex) GetShardStatus ¶
func (*RemoteIndex) MergeObject ¶
func (ri *RemoteIndex) MergeObject(ctx context.Context, shardName string, mergeDoc objects.MergeDocument, ) error
func (*RemoteIndex) MultiGetObjects ¶
func (*RemoteIndex) SearchShard ¶
func (ri *RemoteIndex) SearchShard(ctx context.Context, shard string, queryVec []float32, limit int, filters *filters.LocalFilter, keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor, groupBy *searchparams.GroupBy, adds additional.Properties, replEnabled bool, ) ([]*storobj.Object, []float32, error)
func (*RemoteIndex) UpdateShardStatus ¶
func (ri *RemoteIndex) UpdateShardStatus(ctx context.Context, shardName, targetStatus string) error
type RemoteIndexClient ¶
type RemoteIndexClient interface { PutObject(ctx context.Context, hostName, indexName, shardName string, obj *storobj.Object) error BatchPutObjects(ctx context.Context, hostName, indexName, shardName string, objs []*storobj.Object, repl *additional.ReplicationProperties) []error BatchAddReferences(ctx context.Context, hostName, indexName, shardName string, refs objects.BatchReferences) []error GetObject(ctx context.Context, hostname, indexName, shardName string, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error) Exists(ctx context.Context, hostname, indexName, shardName string, id strfmt.UUID) (bool, error) DeleteObject(ctx context.Context, hostname, indexName, shardName string, id strfmt.UUID) error MergeObject(ctx context.Context, hostname, indexName, shardName string, mergeDoc objects.MergeDocument) error MultiGetObjects(ctx context.Context, hostname, indexName, shardName string, ids []strfmt.UUID) ([]*storobj.Object, error) SearchShard(ctx context.Context, hostname, indexName, shardName string, searchVector []float32, limit int, filters *filters.LocalFilter, keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor, groupBy *searchparams.GroupBy, additional additional.Properties, ) ([]*storobj.Object, []float32, error) Aggregate(ctx context.Context, hostname, indexName, shardName string, params aggregation.Params) (*aggregation.Result, error) FindDocIDs(ctx context.Context, hostName, indexName, shardName string, filters *filters.LocalFilter) ([]uint64, error) DeleteObjectBatch(ctx context.Context, hostName, indexName, shardName string, docIDs []uint64, dryRun bool) objects.BatchSimpleObjects GetShardStatus(ctx context.Context, hostName, indexName, shardName string) (string, error) UpdateShardStatus(ctx context.Context, hostName, indexName, shardName, targetStatus string) error PutFile(ctx context.Context, hostName, indexName, shardName, fileName string, payload 
io.ReadSeekCloser) error }
type RemoteIndexIncoming ¶
type RemoteIndexIncoming struct {
// contains filtered or unexported fields
}
func NewRemoteIndexIncoming ¶
func NewRemoteIndexIncoming(repo RemoteIncomingRepo) *RemoteIndexIncoming
func (*RemoteIndexIncoming) Aggregate ¶
func (rii *RemoteIndexIncoming) Aggregate(ctx context.Context, indexName, shardName string, params aggregation.Params, ) (*aggregation.Result, error)
func (*RemoteIndexIncoming) BatchAddReferences ¶
func (rii *RemoteIndexIncoming) BatchAddReferences(ctx context.Context, indexName, shardName string, refs objects.BatchReferences, ) []error
func (*RemoteIndexIncoming) BatchPutObjects ¶
func (*RemoteIndexIncoming) CreateShard ¶
func (rii *RemoteIndexIncoming) CreateShard(ctx context.Context, indexName, shardName string, ) error
func (*RemoteIndexIncoming) DeleteObject ¶
func (*RemoteIndexIncoming) DeleteObjectBatch ¶
func (rii *RemoteIndexIncoming) DeleteObjectBatch(ctx context.Context, indexName, shardName string, docIDs []uint64, dryRun bool, ) objects.BatchSimpleObjects
func (*RemoteIndexIncoming) DigestObjects ¶ added in v1.18.0
func (rii *RemoteIndexIncoming) DigestObjects(ctx context.Context, indexName, shardName string, ids []strfmt.UUID, ) ([]replica.RepairResponse, error)
func (*RemoteIndexIncoming) FilePutter ¶
func (rii *RemoteIndexIncoming) FilePutter(ctx context.Context, indexName, shardName, filePath string, ) (io.WriteCloser, error)
func (*RemoteIndexIncoming) FindDocIDs ¶
func (rii *RemoteIndexIncoming) FindDocIDs(ctx context.Context, indexName, shardName string, filters *filters.LocalFilter, ) ([]uint64, error)
func (*RemoteIndexIncoming) GetObject ¶
func (rii *RemoteIndexIncoming) GetObject(ctx context.Context, indexName, shardName string, id strfmt.UUID, selectProperties search.SelectProperties, additional additional.Properties, ) (*storobj.Object, error)
func (*RemoteIndexIncoming) GetShardStatus ¶
func (*RemoteIndexIncoming) MergeObject ¶
func (rii *RemoteIndexIncoming) MergeObject(ctx context.Context, indexName, shardName string, mergeDoc objects.MergeDocument, ) error
func (*RemoteIndexIncoming) MultiGetObjects ¶
func (*RemoteIndexIncoming) OverwriteObjects ¶ added in v1.18.0
func (rii *RemoteIndexIncoming) OverwriteObjects(ctx context.Context, indexName, shardName string, vobjects []*objects.VObject, ) ([]replica.RepairResponse, error)
func (*RemoteIndexIncoming) ReInitShard ¶
func (rii *RemoteIndexIncoming) ReInitShard(ctx context.Context, indexName, shardName string, ) error
func (*RemoteIndexIncoming) Search ¶
func (rii *RemoteIndexIncoming) Search(ctx context.Context, indexName, shardName string, vector []float32, distance float32, limit int, filters *filters.LocalFilter, keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor, groupBy *searchparams.GroupBy, additional additional.Properties, ) ([]*storobj.Object, []float32, error)
func (*RemoteIndexIncoming) UpdateShardStatus ¶
func (rii *RemoteIndexIncoming) UpdateShardStatus(ctx context.Context, indexName, shardName, targetStatus string, ) error
type RemoteIndexIncomingRepo ¶
type RemoteIndexIncomingRepo interface { IncomingPutObject(ctx context.Context, shardName string, obj *storobj.Object) error IncomingBatchPutObjects(ctx context.Context, shardName string, objs []*storobj.Object) []error IncomingBatchAddReferences(ctx context.Context, shardName string, refs objects.BatchReferences) []error IncomingGetObject(ctx context.Context, shardName string, id strfmt.UUID, selectProperties search.SelectProperties, additional additional.Properties) (*storobj.Object, error) IncomingExists(ctx context.Context, shardName string, id strfmt.UUID) (bool, error) IncomingDeleteObject(ctx context.Context, shardName string, id strfmt.UUID) error IncomingMergeObject(ctx context.Context, shardName string, mergeDoc objects.MergeDocument) error IncomingMultiGetObjects(ctx context.Context, shardName string, ids []strfmt.UUID) ([]*storobj.Object, error) IncomingSearch(ctx context.Context, shardName string, vector []float32, distance float32, limit int, filters *filters.LocalFilter, keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor, groupBy *searchparams.GroupBy, additional additional.Properties, ) ([]*storobj.Object, []float32, error) IncomingAggregate(ctx context.Context, shardName string, params aggregation.Params) (*aggregation.Result, error) IncomingFindDocIDs(ctx context.Context, shardName string, filters *filters.LocalFilter) ([]uint64, error) IncomingDeleteObjectBatch(ctx context.Context, shardName string, docIDs []uint64, dryRun bool) objects.BatchSimpleObjects IncomingGetShardStatus(ctx context.Context, shardName string) (string, error) IncomingUpdateShardStatus(ctx context.Context, shardName, targetStatus string) error IncomingOverwriteObjects(ctx context.Context, shard string, vobjects []*objects.VObject) ([]replica.RepairResponse, error) IncomingDigestObjects(ctx context.Context, shardName string, ids []strfmt.UUID) (result []replica.RepairResponse, err error) // Scale-Out Replication POC IncomingFilePutter(ctx 
context.Context, shardName, filePath string) (io.WriteCloser, error) IncomingCreateShard(ctx context.Context, shardName string) error IncomingReinitShard(ctx context.Context, shardName string) error }
type RemoteNode ¶
type RemoteNode struct {
// contains filtered or unexported fields
}
func NewRemoteNode ¶
func NewRemoteNode(nodeResolver nodeResolver, client RemoteNodeClient) *RemoteNode
func (*RemoteNode) GetNodeStatus ¶
func (rn *RemoteNode) GetNodeStatus(ctx context.Context, nodeName string, className string) (*models.NodeStatus, error)
type RemoteNodeClient ¶
type RemoteNodeIncoming ¶
type RemoteNodeIncoming struct {
// contains filtered or unexported fields
}
func NewRemoteNodeIncoming ¶
func NewRemoteNodeIncoming(repo RemoteNodeIncomingRepo) *RemoteNodeIncoming
func (*RemoteNodeIncoming) GetNodeStatus ¶
func (rni *RemoteNodeIncoming) GetNodeStatus(ctx context.Context, className string) (*models.NodeStatus, error)
type RemoteNodeIncomingRepo ¶
type State ¶
type State struct { IndexID string `json:"indexID"` // for monitoring, reporting purposes. Does not influence the shard-calculations Config Config `json:"config"` Physical map[string]Physical `json:"physical"` Virtual []Virtual `json:"virtual"` PartitioningEnabled bool `json:"partitioningEnabled"` // contains filtered or unexported fields }
func StateFromJSON ¶
func (*State) AddPartition ¶ added in v1.20.0
AddPartition adds a partition to the physical shards.
func (*State) AllLocalPhysicalShards ¶
func (*State) AllPhysicalShards ¶
func (*State) CountPhysicalShards ¶
CountPhysicalShards returns the number of physical shards.
func (*State) DeletePartition ¶ added in v1.20.0
DeletePartition removes a partition from the physical shards.
func (*State) GetPartitions ¶ added in v1.20.0
func (s *State) GetPartitions(nodes nodes, shards []string, replFactor int64) (map[string][]string, error)
GetPartitions computes partitions based on the specified shards, available nodes, and replFactor. It doesn't change the internal state.
func (*State) IsLocalShard ¶ added in v1.20.0
func (*State) MigrateFromOldFormat ¶
func (s *State) MigrateFromOldFormat()
MigrateFromOldFormat checks whether the old (pre-v1.17) format was used and, if so, migrates it to the new format for backward compatibility with all classes created before v1.17.
func (*State) PhysicalShard ¶
func (*State) SetLocalName ¶
Click to show internal directories.
Click to hide internal directories.