Documentation ¶
Index ¶
- Constants
- func ValidateConfigUpdate(old, updated Config, nodeCounter nodeCounter) error
- type Config
- type Physical
- type RemoteIncomingRepo
- type RemoteIndex
- func (ri *RemoteIndex) Aggregate(ctx context.Context, shardName string, params aggregation.Params) (*aggregation.Result, error)
- func (ri *RemoteIndex) BatchAddReferences(ctx context.Context, shardName string, refs objects.BatchReferences) []error
- func (ri *RemoteIndex) BatchPutObjects(ctx context.Context, shardName string, objs []*storobj.Object) []error
- func (ri *RemoteIndex) DeleteObject(ctx context.Context, shardName string, id strfmt.UUID) error
- func (ri *RemoteIndex) DeleteObjectBatch(ctx context.Context, shardName string, docIDs []uint64, dryRun bool) objects.BatchSimpleObjects
- func (ri *RemoteIndex) Exists(ctx context.Context, shardName string, id strfmt.UUID) (bool, error)
- func (ri *RemoteIndex) FindDocIDs(ctx context.Context, shardName string, filters *filters.LocalFilter) ([]uint64, error)
- func (ri *RemoteIndex) GetObject(ctx context.Context, shardName string, id strfmt.UUID, ...) (*storobj.Object, error)
- func (ri *RemoteIndex) GetShardStatus(ctx context.Context, shardName string) (string, error)
- func (ri *RemoteIndex) MergeObject(ctx context.Context, shardName string, mergeDoc objects.MergeDocument) error
- func (ri *RemoteIndex) MultiGetObjects(ctx context.Context, shardName string, ids []strfmt.UUID) ([]*storobj.Object, error)
- func (ri *RemoteIndex) PutObject(ctx context.Context, shardName string, obj *storobj.Object) error
- func (ri *RemoteIndex) SearchShard(ctx context.Context, shardName string, searchVector []float32, limit int, ...) ([]*storobj.Object, []float32, error)
- func (ri *RemoteIndex) UpdateShardStatus(ctx context.Context, shardName, targetStatus string) error
- type RemoteIndexClient
- type RemoteIndexIncoming
- func (rii *RemoteIndexIncoming) Aggregate(ctx context.Context, indexName, shardName string, params aggregation.Params) (*aggregation.Result, error)
- func (rii *RemoteIndexIncoming) BatchAddReferences(ctx context.Context, indexName, shardName string, refs objects.BatchReferences) []error
- func (rii *RemoteIndexIncoming) BatchPutObjects(ctx context.Context, indexName, shardName string, objs []*storobj.Object) []error
- func (rii *RemoteIndexIncoming) CreateShard(ctx context.Context, indexName, shardName string) error
- func (rii *RemoteIndexIncoming) DeleteObject(ctx context.Context, indexName, shardName string, id strfmt.UUID) error
- func (rii *RemoteIndexIncoming) DeleteObjectBatch(ctx context.Context, indexName, shardName string, docIDs []uint64, dryRun bool) objects.BatchSimpleObjects
- func (rii *RemoteIndexIncoming) DigestObjects(ctx context.Context, indexName, shardName string, ids []strfmt.UUID) ([]replica.RepairResponse, error)
- func (rii *RemoteIndexIncoming) Exists(ctx context.Context, indexName, shardName string, id strfmt.UUID) (bool, error)
- func (rii *RemoteIndexIncoming) FilePutter(ctx context.Context, indexName, shardName, filePath string) (io.WriteCloser, error)
- func (rii *RemoteIndexIncoming) FindDocIDs(ctx context.Context, indexName, shardName string, filters *filters.LocalFilter) ([]uint64, error)
- func (rii *RemoteIndexIncoming) GetObject(ctx context.Context, indexName, shardName string, id strfmt.UUID, ...) (*storobj.Object, error)
- func (rii *RemoteIndexIncoming) GetShardStatus(ctx context.Context, indexName, shardName string) (string, error)
- func (rii *RemoteIndexIncoming) MergeObject(ctx context.Context, indexName, shardName string, ...) error
- func (rii *RemoteIndexIncoming) MultiGetObjects(ctx context.Context, indexName, shardName string, ids []strfmt.UUID) ([]*storobj.Object, error)
- func (rii *RemoteIndexIncoming) OverwriteObjects(ctx context.Context, indexName, shardName string, vobjects []*objects.VObject) ([]replica.RepairResponse, error)
- func (rii *RemoteIndexIncoming) PutObject(ctx context.Context, indexName, shardName string, obj *storobj.Object) error
- func (rii *RemoteIndexIncoming) ReInitShard(ctx context.Context, indexName, shardName string) error
- func (rii *RemoteIndexIncoming) Search(ctx context.Context, indexName, shardName string, vector []float32, ...) ([]*storobj.Object, []float32, error)
- func (rii *RemoteIndexIncoming) UpdateShardStatus(ctx context.Context, indexName, shardName, targetStatus string) error
- type RemoteIndexIncomingRepo
- type RemoteNode
- type RemoteNodeClient
- type RemoteNodeIncoming
- type RemoteNodeIncomingRepo
- type State
- func (s *State) AllLocalPhysicalShards() []string
- func (s *State) AllPhysicalShards() []string
- func (s *State) CountPhysicalShards() int
- func (s State) DeepCopy() State
- func (s *State) IsShardLocal(name string) bool
- func (s *State) JSON() ([]byte, error)
- func (s *State) MigrateFromOldFormat()
- func (s *State) PhysicalShard(in []byte) string
- func (s *State) SetLocalName(name string)
- type Virtual
Constants ¶
const ( DefaultVirtualPerPhysical = 128 DefaultKey = "_id" DefaultStrategy = "hash" DefaultFunction = "murmur3" )
Variables ¶
This section is empty.
Functions ¶
func ValidateConfigUpdate ¶
Types ¶
type Config ¶
type Config struct { VirtualPerPhysical int `json:"virtualPerPhysical"` DesiredCount int `json:"desiredCount"` ActualCount int `json:"actualCount"` DesiredVirtualCount int `json:"desiredVirtualCount"` ActualVirtualCount int `json:"actualVirtualCount"` Key string `json:"key"` Strategy string `json:"strategy"` Function string `json:"function"` }
func ParseConfig ¶
type Physical ¶
type Physical struct { Name string `json:"name"` OwnsVirtual []string `json:"ownsVirtual"` OwnsPercentage float64 `json:"ownsPercentage"` LegacyBelongsToNodeForBackwardCompat string `json:"belongsToNode,omitempty"` BelongsToNodes []string `json:"belongsToNodes"` }
func (*Physical) AdjustReplicas ¶
Adjust Replicas uses a NodeIterator to add new nodes (scale out) or remove existing nodes (scale in) from the "BelongsToNodes" mappings. This is used as part of dynamically changing the replication factor. This method basically controls where a shard will land in the cluster. If we want to add some kind of node bias while scaling in the future, it would probably go here.
func (Physical) BelongsToNode ¶
BelongsToNode for backward-compatibility when there was no replication. It always returns the first node of the list
type RemoteIncomingRepo ¶
type RemoteIncomingRepo interface {
GetIndexForIncoming(className schema.ClassName) RemoteIndexIncomingRepo
}
type RemoteIndex ¶
type RemoteIndex struct {
// contains filtered or unexported fields
}
func NewRemoteIndex ¶
func NewRemoteIndex(className string, stateGetter shardingStateGetter, nodeResolver nodeResolver, client RemoteIndexClient, ) *RemoteIndex
func (*RemoteIndex) Aggregate ¶
func (ri *RemoteIndex) Aggregate(ctx context.Context, shardName string, params aggregation.Params, ) (*aggregation.Result, error)
func (*RemoteIndex) BatchAddReferences ¶
func (ri *RemoteIndex) BatchAddReferences(ctx context.Context, shardName string, refs objects.BatchReferences, ) []error
func (*RemoteIndex) BatchPutObjects ¶
func (*RemoteIndex) DeleteObject ¶
func (*RemoteIndex) DeleteObjectBatch ¶
func (ri *RemoteIndex) DeleteObjectBatch(ctx context.Context, shardName string, docIDs []uint64, dryRun bool, ) objects.BatchSimpleObjects
func (*RemoteIndex) FindDocIDs ¶
func (ri *RemoteIndex) FindDocIDs(ctx context.Context, shardName string, filters *filters.LocalFilter, ) ([]uint64, error)
func (*RemoteIndex) GetObject ¶
func (ri *RemoteIndex) GetObject(ctx context.Context, shardName string, id strfmt.UUID, props search.SelectProperties, additional additional.Properties, ) (*storobj.Object, error)
func (*RemoteIndex) GetShardStatus ¶
func (*RemoteIndex) MergeObject ¶
func (ri *RemoteIndex) MergeObject(ctx context.Context, shardName string, mergeDoc objects.MergeDocument, ) error
func (*RemoteIndex) MultiGetObjects ¶
func (*RemoteIndex) SearchShard ¶
func (ri *RemoteIndex) SearchShard(ctx context.Context, shardName string, searchVector []float32, limit int, filters *filters.LocalFilter, keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor, additional additional.Properties, ) ([]*storobj.Object, []float32, error)
func (*RemoteIndex) UpdateShardStatus ¶
func (ri *RemoteIndex) UpdateShardStatus(ctx context.Context, shardName, targetStatus string) error
type RemoteIndexClient ¶
type RemoteIndexClient interface { PutObject(ctx context.Context, hostName, indexName, shardName string, obj *storobj.Object) error BatchPutObjects(ctx context.Context, hostName, indexName, shardName string, objs []*storobj.Object, repl *additional.ReplicationProperties) []error BatchAddReferences(ctx context.Context, hostName, indexName, shardName string, refs objects.BatchReferences) []error GetObject(ctx context.Context, hostname, indexName, shardName string, id strfmt.UUID, props search.SelectProperties, additional additional.Properties) (*storobj.Object, error) Exists(ctx context.Context, hostname, indexName, shardName string, id strfmt.UUID) (bool, error) DeleteObject(ctx context.Context, hostname, indexName, shardName string, id strfmt.UUID) error MergeObject(ctx context.Context, hostname, indexName, shardName string, mergeDoc objects.MergeDocument) error MultiGetObjects(ctx context.Context, hostname, indexName, shardName string, ids []strfmt.UUID) ([]*storobj.Object, error) SearchShard(ctx context.Context, hostname, indexName, shardName string, searchVector []float32, limit int, filters *filters.LocalFilter, keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor, additional additional.Properties, ) ([]*storobj.Object, []float32, error) Aggregate(ctx context.Context, hostname, indexName, shardName string, params aggregation.Params) (*aggregation.Result, error) FindDocIDs(ctx context.Context, hostName, indexName, shardName string, filters *filters.LocalFilter) ([]uint64, error) DeleteObjectBatch(ctx context.Context, hostName, indexName, shardName string, docIDs []uint64, dryRun bool) objects.BatchSimpleObjects GetShardStatus(ctx context.Context, hostName, indexName, shardName string) (string, error) UpdateShardStatus(ctx context.Context, hostName, indexName, shardName, targetStatus string) error PutFile(ctx context.Context, hostName, indexName, shardName, fileName string, payload io.ReadSeekCloser) error }
type RemoteIndexIncoming ¶
type RemoteIndexIncoming struct {
// contains filtered or unexported fields
}
func NewRemoteIndexIncoming ¶
func NewRemoteIndexIncoming(repo RemoteIncomingRepo) *RemoteIndexIncoming
func (*RemoteIndexIncoming) Aggregate ¶
func (rii *RemoteIndexIncoming) Aggregate(ctx context.Context, indexName, shardName string, params aggregation.Params, ) (*aggregation.Result, error)
func (*RemoteIndexIncoming) BatchAddReferences ¶
func (rii *RemoteIndexIncoming) BatchAddReferences(ctx context.Context, indexName, shardName string, refs objects.BatchReferences, ) []error
func (*RemoteIndexIncoming) BatchPutObjects ¶
func (*RemoteIndexIncoming) CreateShard ¶
func (rii *RemoteIndexIncoming) CreateShard(ctx context.Context, indexName, shardName string, ) error
func (*RemoteIndexIncoming) DeleteObject ¶
func (*RemoteIndexIncoming) DeleteObjectBatch ¶
func (rii *RemoteIndexIncoming) DeleteObjectBatch(ctx context.Context, indexName, shardName string, docIDs []uint64, dryRun bool, ) objects.BatchSimpleObjects
func (*RemoteIndexIncoming) DigestObjects ¶ added in v1.18.0
func (rii *RemoteIndexIncoming) DigestObjects(ctx context.Context, indexName, shardName string, ids []strfmt.UUID, ) ([]replica.RepairResponse, error)
func (*RemoteIndexIncoming) FilePutter ¶
func (rii *RemoteIndexIncoming) FilePutter(ctx context.Context, indexName, shardName, filePath string, ) (io.WriteCloser, error)
func (*RemoteIndexIncoming) FindDocIDs ¶
func (rii *RemoteIndexIncoming) FindDocIDs(ctx context.Context, indexName, shardName string, filters *filters.LocalFilter, ) ([]uint64, error)
func (*RemoteIndexIncoming) GetObject ¶
func (rii *RemoteIndexIncoming) GetObject(ctx context.Context, indexName, shardName string, id strfmt.UUID, selectProperties search.SelectProperties, additional additional.Properties, ) (*storobj.Object, error)
func (*RemoteIndexIncoming) GetShardStatus ¶
func (*RemoteIndexIncoming) MergeObject ¶
func (rii *RemoteIndexIncoming) MergeObject(ctx context.Context, indexName, shardName string, mergeDoc objects.MergeDocument, ) error
func (*RemoteIndexIncoming) MultiGetObjects ¶
func (*RemoteIndexIncoming) OverwriteObjects ¶ added in v1.18.0
func (rii *RemoteIndexIncoming) OverwriteObjects(ctx context.Context, indexName, shardName string, vobjects []*objects.VObject, ) ([]replica.RepairResponse, error)
func (*RemoteIndexIncoming) ReInitShard ¶
func (rii *RemoteIndexIncoming) ReInitShard(ctx context.Context, indexName, shardName string, ) error
func (*RemoteIndexIncoming) Search ¶
func (rii *RemoteIndexIncoming) Search(ctx context.Context, indexName, shardName string, vector []float32, distance float32, limit int, filters *filters.LocalFilter, keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor, additional additional.Properties, ) ([]*storobj.Object, []float32, error)
func (*RemoteIndexIncoming) UpdateShardStatus ¶
func (rii *RemoteIndexIncoming) UpdateShardStatus(ctx context.Context, indexName, shardName, targetStatus string, ) error
type RemoteIndexIncomingRepo ¶
type RemoteIndexIncomingRepo interface { IncomingPutObject(ctx context.Context, shardName string, obj *storobj.Object) error IncomingBatchPutObjects(ctx context.Context, shardName string, objs []*storobj.Object) []error IncomingBatchAddReferences(ctx context.Context, shardName string, refs objects.BatchReferences) []error IncomingGetObject(ctx context.Context, shardName string, id strfmt.UUID, selectProperties search.SelectProperties, additional additional.Properties) (*storobj.Object, error) IncomingExists(ctx context.Context, shardName string, id strfmt.UUID) (bool, error) IncomingDeleteObject(ctx context.Context, shardName string, id strfmt.UUID) error IncomingMergeObject(ctx context.Context, shardName string, mergeDoc objects.MergeDocument) error IncomingMultiGetObjects(ctx context.Context, shardName string, ids []strfmt.UUID) ([]*storobj.Object, error) IncomingSearch(ctx context.Context, shardName string, vector []float32, distance float32, limit int, filters *filters.LocalFilter, keywordRanking *searchparams.KeywordRanking, sort []filters.Sort, cursor *filters.Cursor, additional additional.Properties, ) ([]*storobj.Object, []float32, error) IncomingAggregate(ctx context.Context, shardName string, params aggregation.Params) (*aggregation.Result, error) IncomingFindDocIDs(ctx context.Context, shardName string, filters *filters.LocalFilter) ([]uint64, error) IncomingDeleteObjectBatch(ctx context.Context, shardName string, docIDs []uint64, dryRun bool) objects.BatchSimpleObjects IncomingGetShardStatus(ctx context.Context, shardName string) (string, error) IncomingUpdateShardStatus(ctx context.Context, shardName, targetStatus string) error IncomingOverwriteObjects(ctx context.Context, shard string, vobjects []*objects.VObject) ([]replica.RepairResponse, error) IncomingDigestObjects(ctx context.Context, shardName string, ids []strfmt.UUID) (result []replica.RepairResponse, err error) // Scale-Out Replication POC IncomingFilePutter(ctx context.Context, shardName, filePath string) (io.WriteCloser, error) IncomingCreateShard(ctx context.Context, shardName string) error IncomingReinitShard(ctx context.Context, shardName string) error }
type RemoteNode ¶
type RemoteNode struct {
// contains filtered or unexported fields
}
func NewRemoteNode ¶
func NewRemoteNode(nodeResolver nodeResolver, client RemoteNodeClient) *RemoteNode
func (*RemoteNode) GetNodeStatus ¶
func (rn *RemoteNode) GetNodeStatus(ctx context.Context, nodeName string) (*models.NodeStatus, error)
type RemoteNodeClient ¶
type RemoteNodeIncoming ¶
type RemoteNodeIncoming struct {
// contains filtered or unexported fields
}
func NewRemoteNodeIncoming ¶
func NewRemoteNodeIncoming(repo RemoteNodeIncomingRepo) *RemoteNodeIncoming
func (*RemoteNodeIncoming) GetNodeStatus ¶
func (rni *RemoteNodeIncoming) GetNodeStatus(ctx context.Context) (*models.NodeStatus, error)
type RemoteNodeIncomingRepo ¶
type RemoteNodeIncomingRepo interface {
IncomingGetNodeStatus(ctx context.Context) (*models.NodeStatus, error)
}
type State ¶
type State struct { IndexID string `json:"indexID"` // for monitoring, reporting purposes. Does not influence the shard-calculations Config Config `json:"config"` Physical map[string]Physical `json:"physical"` Virtual []Virtual `json:"virtual"` // contains filtered or unexported fields }
func StateFromJSON ¶
func (*State) AllLocalPhysicalShards ¶
func (*State) AllPhysicalShards ¶
func (*State) CountPhysicalShards ¶
CountPhysicalShards return a count of pysical shards
func (*State) IsShardLocal ¶
func (*State) MigrateFromOldFormat ¶
func (s *State) MigrateFromOldFormat()
MigrateFromOldFormat checks if the old (pre-v1.17) format was used and migrates it into the new format for backward-compatibility with all classes created before v1.17