Documentation ¶
Overview ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Copyright (c) 2014 Couchbase, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Index ¶
- Constants
- Variables
- func GetAllDeletedIndexInstancesId(mgr *IndexManager, buckets []string) ([]uint64, error)
- func GetChangeRecordAsProtoMsg(mgr *IndexManager, changes []*changeRecord, port string) ([]*protobuf.Instance, error)
- func GetIndexInstanceAsProtoMsg(mgr *IndexManager, bucket string, defnId common.IndexDefnId, port string) ([]*protobuf.Instance, error)
- func GetIndexInstancesIdByDefn(mgr *IndexManager, bucket string, defnId common.IndexDefnId) ([]uint64, error)
- func MarshallIndexTopology(topology *IndexTopology) ([]byte, error)
- func NewLocalMetadataRepo(msgAddr string, eventMgr *eventManager, ...) (*MetadataRepo, RequestServer, error)
- type BackupResponse
- type ClusterIndexMetadata
- type Coordinator
- func (c *Coordinator) Abort(fid string, reqId uint64, err string) error
- func (c *Coordinator) AddPendingRequest(handle *protocol.RequestHandle)
- func (c *Coordinator) CleanupOnError()
- func (c *Coordinator) Commit(txid common.Txnid) error
- func (c *Coordinator) GetAcceptedEpoch() (uint32, error)
- func (c *Coordinator) GetCommitedEntries(txid1, txid2 common.Txnid) (<-chan protocol.LogEntryMsg, <-chan error, chan<- bool, error)
- func (c *Coordinator) GetCurrentEpoch() (uint32, error)
- func (c *Coordinator) GetEnsembleSize() uint64
- func (c *Coordinator) GetFollowerId() string
- func (c *Coordinator) GetLastCommittedTxid() (common.Txnid, error)
- func (c *Coordinator) GetLastLoggedTxid() (common.Txnid, error)
- func (c *Coordinator) GetNextTxnId() common.Txnid
- func (c *Coordinator) GetQuorumVerifier() protocol.QuorumVerifier
- func (c *Coordinator) GetRequestChannel() <-chan *protocol.RequestHandle
- func (c *Coordinator) GetStatus() protocol.PeerStatus
- func (c *Coordinator) HasQuorum(count int) bool
- func (s *Coordinator) IsDone() bool
- func (c *Coordinator) LogAndCommit(txid common.Txnid, op uint32, key string, content []byte, toCommit bool) error
- func (c *Coordinator) LogProposal(proposal protocol.ProposalMsg) error
- func (s *Coordinator) NewRequest(opCode uint32, key string, content []byte) bool
- func (c *Coordinator) NotifyNewAcceptedEpoch(epoch uint32) error
- func (c *Coordinator) NotifyNewCurrentEpoch(epoch uint32) error
- func (c *Coordinator) Respond(fid string, reqId uint64, err string, content []byte) error
- func (s *Coordinator) Run(config string)
- func (s *Coordinator) Terminate()
- type CoordinatorState
- type Error
- func NewError(code errCode, severity errSeverity, category errCategory, cause error, ...) Error
- func NewError2(code errCode, category errCategory) Error
- func NewError3(code errCode, severity errSeverity, category errCategory) Error
- func NewError4(code errCode, severity errSeverity, category errCategory, msg string) Error
- type EventType
- type GlobalTopology
- type IndexDefnDistribution
- type IndexInstDistribution
- type IndexKeyPartDistribution
- type IndexManager
- func (m *IndexManager) CleanupIndex(defnId common.IndexDefnId) error
- func (m *IndexManager) Close()
- func (m *IndexManager) DeleteIndexForBucket(bucket string, streamId common.StreamId) error
- func (m *IndexManager) DeleteLocalValue(key string) error
- func (m *IndexManager) FetchNewClusterInfoCache() (*common.ClusterInfoCache, error)
- func (m *IndexManager) GetGlobalTopology() (*GlobalTopology, error)
- func (m *IndexManager) GetIndexDefnById(id common.IndexDefnId) (*common.IndexDefn, error)
- func (m *IndexManager) GetLocalValue(key string) (string, error)
- func (m *IndexManager) GetMemoryQuota() uint64
- func (m *IndexManager) GetTopologyByBucket(bucket string) (*IndexTopology, error)
- func (m *IndexManager) HandleBuildIndexDDL(indexIds client.IndexIdList) error
- func (m *IndexManager) HandleCreateIndexDDL(defn *common.IndexDefn, isRebalReq bool) error
- func (m *IndexManager) HandleDeleteIndexDDL(defnId common.IndexDefnId) error
- func (m *IndexManager) IsClose() bool
- func (m *IndexManager) NewIndexDefnIterator() (*MetaIterator, error)
- func (m *IndexManager) NotifyConfigUpdate(config common.Config) error
- func (m *IndexManager) NotifyIndexerReady() error
- func (m *IndexManager) NotifyStats(stats common.Statistics) error
- func (m *IndexManager) RegisterNotifier(notifier MetadataNotifier)
- func (m *IndexManager) ResetIndex(index common.IndexDefn) error
- func (m *IndexManager) SetLocalValue(key string, value string) error
- func (m *IndexManager) SetTopologyByBucket(bucket string, topology *IndexTopology) error
- func (mgr *IndexManager) StartCoordinator(config string)
- func (m *IndexManager) StartListenIndexCreate(id string) (<-chan interface{}, error)
- func (m *IndexManager) StartListenIndexDelete(id string) (<-chan interface{}, error)
- func (m *IndexManager) StartListenTopologyUpdate(id string) (<-chan interface{}, error)
- func (m *IndexManager) StopListenIndexCreate(id string)
- func (m *IndexManager) StopListenIndexDelete(id string)
- func (m *IndexManager) StopListenTopologyUpdate(id string)
- func (m *IndexManager) UpdateIndexInstance(bucket string, defnId common.IndexDefnId, state common.IndexState, ...) error
- func (m *IndexManager) UpdateIndexInstanceSync(bucket string, defnId common.IndexDefnId, state common.IndexState, ...) error
- type IndexPartDistribution
- type IndexRequest
- type IndexResponse
- type IndexSinglePartDistribution
- type IndexSliceLocator
- type IndexStatus
- type IndexStatusResponse
- type IndexTopology
- func (t *IndexTopology) AddIndexDefinition(bucket string, name string, defnId uint64, instId uint64, state uint32, ...)
- func (t *IndexTopology) ChangeStateForIndexInstByDefn(defnId common.IndexDefnId, fromState, toState common.IndexState)
- func (t *IndexTopology) FindIndexDefinition(bucket string, name string) *IndexDefnDistribution
- func (t *IndexTopology) FindIndexDefinitionById(id common.IndexDefnId) *IndexDefnDistribution
- func (t *IndexTopology) GetIndexInstByDefn(defnId common.IndexDefnId) *IndexInstDistribution
- func (t *IndexTopology) GetIndexInstancesByDefn(defnId common.IndexDefnId) []IndexInstDistribution
- func (t *IndexTopology) GetRStatusByDefn(defnId common.IndexDefnId) common.RebalanceState
- func (t *IndexTopology) GetStatusByDefn(defnId common.IndexDefnId) (common.IndexState, string)
- func (t *IndexTopology) RemoveIndexDefinition(bucket string, name string)
- func (t *IndexTopology) RemoveIndexDefinitionById(id common.IndexDefnId)
- func (t *IndexTopology) SetErrorForIndexInstByDefn(defnId common.IndexDefnId, errorStr string) bool
- func (t *IndexTopology) UpdateOldStorageModeForIndexInstByDefn(defnId common.IndexDefnId, storageMode string) bool
- func (t *IndexTopology) UpdateRebalanceStateForIndexInstByDefn(defnId common.IndexDefnId, state common.RebalanceState) bool
- func (t *IndexTopology) UpdateScheduledFlagForIndexInstByDefn(defnId common.IndexDefnId, scheduled bool) bool
- func (t *IndexTopology) UpdateStateForIndexInstByDefn(defnId common.IndexDefnId, state common.IndexState) bool
- func (t *IndexTopology) UpdateStorageModeForIndexInstByDefn(defnId common.IndexDefnId, storageMode string) bool
- func (t *IndexTopology) UpdateStreamForIndexInstByDefn(defnId common.IndexDefnId, stream common.StreamId) bool
- type LifecycleMgr
- func (m *LifecycleMgr) BuildIndexes(ids []common.IndexDefnId, reqCtx *common.MetadataRequestContext, retry bool) ([]*common.IndexDefn, []common.IndexDefnId, []error)
- func (m *LifecycleMgr) CreateIndex(defn *common.IndexDefn, scheduled bool, reqCtx *common.MetadataRequestContext) error
- func (m *LifecycleMgr) DeleteIndex(id common.IndexDefnId, notify bool, reqCtx *common.MetadataRequestContext) error
- func (m *LifecycleMgr) FindLocalIndexInst(bucket string, defnId common.IndexDefnId) (*IndexInstDistribution, error)
- func (m *LifecycleMgr) GetResponseChannel() <-chan c.Packet
- func (m *LifecycleMgr) OnNewRequest(fid string, request protocol.RequestMsg)
- func (m *LifecycleMgr) RegisterNotifier(notifier MetadataNotifier)
- func (m *LifecycleMgr) Run(repo *MetadataRepo, requestServer RequestServer)
- func (m *LifecycleMgr) SetScheduledFlag(bucket string, defnId common.IndexDefnId, scheduled bool) error
- func (m *LifecycleMgr) Terminate()
- func (m *LifecycleMgr) UpdateIndexInstance(bucket string, defnId common.IndexDefnId, state common.IndexState, ...) error
- type LocalIndexMetadata
- type LocalRepoRef
- type MetaIterator
- type MetadataKind
- type MetadataNotifier
- type MetadataRepo
- func (c *MetadataRepo) BroadcastIndexStats(stats *client.IndexStats) error
- func (c *MetadataRepo) BroadcastServiceMap(serviceMap *client.ServiceMap) error
- func (c *MetadataRepo) Close()
- func (c *MetadataRepo) CreateIndex(defn *common.IndexDefn) error
- func (c *MetadataRepo) DeleteLocalValue(key string) error
- func (c *MetadataRepo) DropIndexById(id common.IndexDefnId) error
- func (c *MetadataRepo) GetGlobalTopology() (*GlobalTopology, error)
- func (c *MetadataRepo) GetIndexDefnById(id common.IndexDefnId) (*common.IndexDefn, error)
- func (c *MetadataRepo) GetIndexDefnByName(bucket string, name string) (*common.IndexDefn, error)
- func (c *MetadataRepo) GetLocalIndexerId() (common.IndexerId, error)
- func (c *MetadataRepo) GetLocalNodeUUID() (string, error)
- func (c *MetadataRepo) GetLocalValue(key string) (string, error)
- func (c *MetadataRepo) GetNextIndexInstId() (common.IndexInstId, error)
- func (c *MetadataRepo) GetNextPartitionId() (common.PartitionId, error)
- func (c *MetadataRepo) GetTopologyByBucket(bucket string) (*IndexTopology, error)
- func (c *MetadataRepo) NewIterator() (*MetaIterator, error)
- func (c *MetadataRepo) NewTopologyIterator() (*TopologyIterator, error)
- func (c *MetadataRepo) RegisterNotifier(notifier MetadataNotifier)
- func (c *MetadataRepo) SetGlobalTopology(topology *GlobalTopology) error
- func (c *MetadataRepo) SetLocalValue(key string, value string) error
- func (c *MetadataRepo) SetTopologyByBucket(bucket string, topology *IndexTopology) error
- func (c *MetadataRepo) UpdateIndex(defn *common.IndexDefn) error
- type RemoteRepoRef
- type Reply
- type RepoRef
- type Request
- type RequestServer
- type RequestType
- type RestoreContext
- type RestoreResponse
- type TopologyIterator
Constants ¶
const ( OPCODE_ADD_IDX_DEFN common.OpCode = iota OPCODE_DEL_IDX_DEFN )
const ( // generic (0 - 50) ERROR_PANIC errCode = 0 ERROR_ARGUMENTS = 1 // MetadataRepo (51-100) ERROR_META_WRONG_KEY = 51 ERROR_META_IDX_DEFN_EXIST = 52 ERROR_META_IDX_DEFN_NOT_EXIST = 53 ERROR_META_FAIL_TO_PARSE_INT = 54 // Event Manager (101-150) ERROR_EVT_DUPLICATE_NOTIFIER = 101 // Index Manager (151-200) ERROR_MGR_DDL_CREATE_IDX = 151 ERROR_MGR_DDL_DROP_IDX = 152 // Coordinator (201-250) ERROR_COOR_LISTENER_FAIL = 201 ERROR_COOR_ELECTION_FAIL = 202 // Watcher (251 - 300) ERROR_WATCH_NO_ADDR_AVAIL = 251 // Stream (301-350) ERROR_STREAM_INVALID_ARGUMENT = 301 ERROR_STREAM_NOT_OPEN = 302 ERROR_STREAM_REQUEST_ERROR = 303 ERROR_STREAM_WRONG_VBUCKET = 304 ERROR_STREAM_INVALID_TIMESTAMP = 305 ERROR_STREAM_PROJECTOR_TIMEOUT = 306 ERROR_STREAM_INVALID_KVADDRS = 307 ERROR_STREAM_STREAM_END = 308 ERROR_STREAM_FEEDER = 309 ERROR_STREAM_INCONSISTENT_VBMAP = 310 )
const ( FATAL errSeverity = iota NORMAL )
const ( GENERIC errCategory = iota COORDINATOR INDEX_MANAGER METADATA_REPO REQUEST_HANDLER EVENT_MANAGER WATCHER STREAM )
const ( RESP_SUCCESS string = "success" RESP_ERROR string = "error" )
const CATCHUP_TOPIC = "CATCHUP_STREAM_TOPIC"
const COORDINATOR_CONFIG_STORE = "IndexCoordinatorConfigStore"
Coordinator
const COORD_INIT_STREAM_PORT = ":9335"
const COORD_MAINT_STREAM_PORT = ":9334"
Coordinator
const COUCHBASE_INTERNAL_BUCKET_URL = "http://localhost:11209/"
Stream Manager
const DEFAULT_BUCKET_NAME = "default"
const DEFAULT_EVT_QUEUE_SIZE = 20
Event Manager
const DEFAULT_NOTIFIER_QUEUE_SIZE = 5
const DEFAULT_POOL_NAME = "default"
const HTTP_PREFIX = "http://"
Common
const INDEX_INSTANCE_ID = "IndexInstanceId"
Index Definition
const INDEX_PARTITION_ID = "IndexPartitionId"
const INIT_TOPIC = "INIT_STREAM_TOPIC"
const KV_DCP_PORT = "11210"
const KV_DCP_PORT_CLUSTER_RUN = "12000"
const LOCALHOST = "127.0.0.1"
const MAINT_TOPIC = "MAINT_STREAM_TOPIC"
Stream Manager
const MAX_PROJECTOR_RETRY_ELAPSED_TIME = int64(time.Minute) * 5
const PROJECTOR_PORT = "9999"
const TESTING = true
/////////////////////////////////////////// Constant for Testing ///////////////////////////////////////////
const TIMESTAMP_CHANNEL_SIZE = 30
const TIMESTAMP_HISTORY_COUNT = 10
Timer
const TIMESTAMP_NOTIFY_CH_SIZE = 100
const TIMESTAMP_PERSIST_INTERVAL = uint64(time.Minute)
Variables ¶
var MONITOR_INTERVAL = time.Duration(120000) * time.Millisecond
Stream Monitor (2m)
var NUM_VB = 1024
Common
var TIME_INTERVAL = time.Duration(2000) * time.Millisecond
Timer (2s)
var USE_MASTER_REPO = false
Functions ¶
func GetAllDeletedIndexInstancesId ¶
func GetAllDeletedIndexInstancesId(mgr *IndexManager, buckets []string) ([]uint64, error)
Get all deleted index instance Id's
func GetChangeRecordAsProtoMsg ¶
func GetChangeRecordAsProtoMsg(mgr *IndexManager, changes []*changeRecord, port string) ([]*protobuf.Instance, error)
This function creates a protobuf message for the index instance in the list of change record.
func GetIndexInstanceAsProtoMsg ¶
func GetIndexInstanceAsProtoMsg(mgr *IndexManager, bucket string, defnId common.IndexDefnId, port string) ([]*protobuf.Instance, error)
Get all index instances for a specific defnition as protobuf message
func GetIndexInstancesIdByDefn ¶
func GetIndexInstancesIdByDefn(mgr *IndexManager, bucket string, defnId common.IndexDefnId) ([]uint64, error)
Get all index instance Id's for a specific defnition
func MarshallIndexTopology ¶
func MarshallIndexTopology(topology *IndexTopology) ([]byte, error)
func NewLocalMetadataRepo ¶
func NewLocalMetadataRepo(msgAddr string, eventMgr *eventManager, reqHandler protocol.CustomRequestHandler, repoName string, quota uint64) (*MetadataRepo, RequestServer, error)
Types ¶
type BackupResponse ¶
type BackupResponse struct { Version uint64 `json:"version,omitempty"` Code string `json:"code,omitempty"` Error string `json:"error,omitempty"` Result ClusterIndexMetadata `json:"result,omitempty"` }
type ClusterIndexMetadata ¶
type ClusterIndexMetadata struct {
Metadata []LocalIndexMetadata `json:"metadata,omitempty"`
}
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
func NewCoordinator ¶
func NewCoordinator(repo *MetadataRepo, idxMgr *IndexManager, basepath string) *Coordinator
func (*Coordinator) AddPendingRequest ¶
func (c *Coordinator) AddPendingRequest(handle *protocol.RequestHandle)
This is called when the leader has de-queued the request for processing.
func (*Coordinator) CleanupOnError ¶
func (c *Coordinator) CleanupOnError()
func (*Coordinator) GetAcceptedEpoch ¶
func (c *Coordinator) GetAcceptedEpoch() (uint32, error)
func (*Coordinator) GetCommitedEntries ¶
func (c *Coordinator) GetCommitedEntries(txid1, txid2 common.Txnid) (<-chan protocol.LogEntryMsg, <-chan error, chan<- bool, error)
func (*Coordinator) GetCurrentEpoch ¶
func (c *Coordinator) GetCurrentEpoch() (uint32, error)
func (*Coordinator) GetEnsembleSize ¶
func (c *Coordinator) GetEnsembleSize() uint64
func (*Coordinator) GetFollowerId ¶
func (c *Coordinator) GetFollowerId() string
func (*Coordinator) GetLastCommittedTxid ¶
func (c *Coordinator) GetLastCommittedTxid() (common.Txnid, error)
func (*Coordinator) GetLastLoggedTxid ¶
func (c *Coordinator) GetLastLoggedTxid() (common.Txnid, error)
func (*Coordinator) GetNextTxnId ¶
func (c *Coordinator) GetNextTxnId() common.Txnid
func (*Coordinator) GetQuorumVerifier ¶
func (c *Coordinator) GetQuorumVerifier() protocol.QuorumVerifier
func (*Coordinator) GetRequestChannel ¶
func (c *Coordinator) GetRequestChannel() <-chan *protocol.RequestHandle
Return a channel of request for the leader to process on.
func (*Coordinator) GetStatus ¶
func (c *Coordinator) GetStatus() protocol.PeerStatus
func (*Coordinator) HasQuorum ¶
func (c *Coordinator) HasQuorum(count int) bool
TODO : Quorum should be based on active participants
func (*Coordinator) LogAndCommit ¶
func (*Coordinator) LogProposal ¶
func (c *Coordinator) LogProposal(proposal protocol.ProposalMsg) error
TODO : what to do if createIndex returns error
func (*Coordinator) NewRequest ¶
func (s *Coordinator) NewRequest(opCode uint32, key string, content []byte) bool
Handle a new request. This function will block until the request is being processed (by returning true) or until the request is being interrupted (by returning false). If request is interrupted, then the request may still be processed by some other nodes. So the outcome of the request is unknown when this function returns false.
func (*Coordinator) NotifyNewAcceptedEpoch ¶
func (c *Coordinator) NotifyNewAcceptedEpoch(epoch uint32) error
func (*Coordinator) NotifyNewCurrentEpoch ¶
func (c *Coordinator) NotifyNewCurrentEpoch(epoch uint32) error
type CoordinatorState ¶
type CoordinatorState struct {
// contains filtered or unexported fields
}
type Error ¶
type Error struct {
// contains filtered or unexported fields
}
type GlobalTopology ¶
type GlobalTopology struct {
TopologyKeys []string `json:"topologyKeys,omitempty"`
}
func (*GlobalTopology) AddTopologyKeyIfNecessary ¶
func (g *GlobalTopology) AddTopologyKeyIfNecessary(key string) bool
Add a topology key
func (*GlobalTopology) RemoveTopologyKey ¶
func (g *GlobalTopology) RemoveTopologyKey(key string)
Remove a topology key
type IndexDefnDistribution ¶
type IndexDefnDistribution struct { Bucket string `json:"bucket,omitempty"` Name string `json:"name,omitempty"` DefnId uint64 `json:"defnId,omitempty"` Instances []IndexInstDistribution `json:"instances,omitempty"` }
type IndexInstDistribution ¶
type IndexInstDistribution struct { InstId uint64 `json:"instId,omitempty"` State uint32 `json:"state,omitempty"` StreamId uint32 `json:"steamId,omitempty"` Error string `json:"error,omitempty"` Partitions []IndexPartDistribution `json:"partitions,omitempty"` RState uint32 `json:"rRtate,omitempty"` Version uint64 `json:"version,omitempty"` ReplicaId uint64 `json:"replicaId,omitempty"` Scheduled bool `json:"scheduled,omitempty"` StorageMode string `json:"storageMode,omitempty"` OldStorageMode string `json:"oldStorageMode,omitempty"` }
type IndexKeyPartDistribution ¶
type IndexKeyPartDistribution struct { Keys []string `json:"keys,omitempty"` SinglePartitions []IndexSinglePartDistribution `json:"singlePartitions,omitempty"` }
type IndexManager ¶
type IndexManager struct {
// contains filtered or unexported fields
}
func NewIndexManager ¶
func NewIndexManager(config common.Config) (mgr *IndexManager, err error)
Create a new IndexManager
func NewIndexManagerInternal ¶
func NewIndexManagerInternal(config common.Config) (mgr *IndexManager, err error)
Create a new IndexManager
func (*IndexManager) CleanupIndex ¶
func (m *IndexManager) CleanupIndex(defnId common.IndexDefnId) error
func (*IndexManager) DeleteIndexForBucket ¶
func (m *IndexManager) DeleteIndexForBucket(bucket string, streamId common.StreamId) error
func (*IndexManager) DeleteLocalValue ¶
func (m *IndexManager) DeleteLocalValue(key string) error
func (*IndexManager) FetchNewClusterInfoCache ¶
func (m *IndexManager) FetchNewClusterInfoCache() (*common.ClusterInfoCache, error)
func (*IndexManager) GetGlobalTopology ¶
func (m *IndexManager) GetGlobalTopology() (*GlobalTopology, error)
Get the global topology
func (*IndexManager) GetIndexDefnById ¶
func (m *IndexManager) GetIndexDefnById(id common.IndexDefnId) (*common.IndexDefn, error)
Get an index definiton by id
func (*IndexManager) GetLocalValue ¶
func (m *IndexManager) GetLocalValue(key string) (string, error)
func (*IndexManager) GetMemoryQuota ¶
func (m *IndexManager) GetMemoryQuota() uint64
func (*IndexManager) GetTopologyByBucket ¶
func (m *IndexManager) GetTopologyByBucket(bucket string) (*IndexTopology, error)
Get Topology from dictionary
func (*IndexManager) HandleBuildIndexDDL ¶
func (m *IndexManager) HandleBuildIndexDDL(indexIds client.IndexIdList) error
func (*IndexManager) HandleCreateIndexDDL ¶
func (m *IndexManager) HandleCreateIndexDDL(defn *common.IndexDefn, isRebalReq bool) error
Handle Create Index DDL. This function will block until
- The index defn is persisted durably in the dictionary
- The index defn is applied locally to each "active" indexer node. An active node is a running node that is in the same network partition as the leader. A leader is always in the majority partition.
This function will return an error if the outcome of the request is not known (e.g. the node is partitioned from the network). It may still mean that the request is able to go through (processed by some other nodes).
A Index DDL can be processed by any node. If this node is a leader, then the DDL request will be processed by the leader. If it is a follower, it will forward the request to the leader.
This function will not be processed until the index manager is either a leader or follower. Therefore, if (1) the node is in the minority partition after network partition or (2) the leader dies, this node will unblock any in-flight request initiated by this node (by returning error). The node will run leader election again. Until this node has became a leader or follower, it will not be able to handle another request.
If this node is partitioned from its leader, it can still recieve updates from the dictionary if this node still connects to it.
func (*IndexManager) HandleDeleteIndexDDL ¶
func (m *IndexManager) HandleDeleteIndexDDL(defnId common.IndexDefnId) error
func (*IndexManager) IsClose ¶
func (m *IndexManager) IsClose() bool
func (*IndexManager) NewIndexDefnIterator ¶
func (m *IndexManager) NewIndexDefnIterator() (*MetaIterator, error)
Get Metadata Iterator for index definition
func (*IndexManager) NotifyConfigUpdate ¶
func (m *IndexManager) NotifyConfigUpdate(config common.Config) error
func (*IndexManager) NotifyIndexerReady ¶
func (m *IndexManager) NotifyIndexerReady() error
func (*IndexManager) NotifyStats ¶
func (m *IndexManager) NotifyStats(stats common.Statistics) error
func (*IndexManager) RegisterNotifier ¶
func (m *IndexManager) RegisterNotifier(notifier MetadataNotifier)
func (*IndexManager) ResetIndex ¶
func (m *IndexManager) ResetIndex(index common.IndexDefn) error
func (*IndexManager) SetLocalValue ¶
func (m *IndexManager) SetLocalValue(key string, value string) error
func (*IndexManager) SetTopologyByBucket ¶
func (m *IndexManager) SetTopologyByBucket(bucket string, topology *IndexTopology) error
Set Topology to dictionary
func (*IndexManager) StartCoordinator ¶
func (mgr *IndexManager) StartCoordinator(config string)
func (*IndexManager) StartListenIndexCreate ¶
func (m *IndexManager) StartListenIndexCreate(id string) (<-chan interface{}, error)
Listen to create Index Request
func (*IndexManager) StartListenIndexDelete ¶
func (m *IndexManager) StartListenIndexDelete(id string) (<-chan interface{}, error)
Listen to delete Index Request
func (*IndexManager) StartListenTopologyUpdate ¶
func (m *IndexManager) StartListenTopologyUpdate(id string) (<-chan interface{}, error)
Listen to update Topology Request
func (*IndexManager) StopListenIndexCreate ¶
func (m *IndexManager) StopListenIndexCreate(id string)
Stop Listen to create Index Request
func (*IndexManager) StopListenIndexDelete ¶
func (m *IndexManager) StopListenIndexDelete(id string)
Stop Listen to delete Index Request
func (*IndexManager) StopListenTopologyUpdate ¶
func (m *IndexManager) StopListenTopologyUpdate(id string)
Stop Listen to update Topology Request
func (*IndexManager) UpdateIndexInstance ¶
func (m *IndexManager) UpdateIndexInstance(bucket string, defnId common.IndexDefnId, state common.IndexState, streamId common.StreamId, err string, buildTime []uint64, rState common.RebalanceState) error
func (*IndexManager) UpdateIndexInstanceSync ¶
func (m *IndexManager) UpdateIndexInstanceSync(bucket string, defnId common.IndexDefnId, state common.IndexState, streamId common.StreamId, err string, buildTime []uint64, rState common.RebalanceState) error
type IndexPartDistribution ¶
type IndexPartDistribution struct { PartId uint64 `json:"partId,omitempty"` SinglePartition IndexSinglePartDistribution `json:"singlePartition,omitempty"` KeyPartition IndexKeyPartDistribution `json:"keyPartition,omitempty"` }
type IndexRequest ¶
type IndexRequest struct { Version uint64 `json:"version,omitempty"` Type RequestType `json:"type,omitempty"` Index common.IndexDefn `json:"index,omitempty"` IndexIds client.IndexIdList `json:indexIds,omitempty"` Plan map[string]interface{} `json:plan,omitempty"` }
type IndexResponse ¶
type IndexSinglePartDistribution ¶
type IndexSinglePartDistribution struct {
Slices []IndexSliceLocator `json:"slices,omitempty"`
}
type IndexSliceLocator ¶
type IndexStatus ¶
type IndexStatus struct { DefnId common.IndexDefnId `json:"defnId,omitempty"` Name string `json:"name,omitempty"` Bucket string `json:"bucket,omitempty"` IsPrimary bool `json:"isPrimary,omitempty"` SecExprs []string `json:"secExprs,omitempty"` WhereExpr string `json:"where,omitempty"` IndexType string `json:"indexType,omitempty"` Status string `json:"status,omitempty"` Definition string `json:"definition"` Hosts []string `json:"hosts,omitempty"` Error string `json:"error,omitempty"` Completion int `json:"completion"` Progress float64 `json:"progress"` Scheduled bool `json:"scheduled"` }
type IndexStatusResponse ¶
type IndexStatusResponse struct { Version uint64 `json:"version,omitempty"` Code string `json:"code,omitempty"` Error string `json:"error,omitempty"` FailedNodes []string `json:"failedNodes,omitempty"` Status []IndexStatus `json:"status,omitempty"` }
type IndexTopology ¶
type IndexTopology struct { Version uint64 `json:"version,omitempty"` Bucket string `json:"bucket,omitempty"` Definitions []IndexDefnDistribution `json:"definitions,omitempty"` }
func GetTopologyAsInstanceProtoMsg ¶
func GetTopologyAsInstanceProtoMsg(mgr *IndexManager, bucket string, port string) ([]*protobuf.Instance, *IndexTopology, error)
Get all index instances for the topology as protobuf message
func (*IndexTopology) AddIndexDefinition ¶
func (t *IndexTopology) AddIndexDefinition(bucket string, name string, defnId uint64, instId uint64, state uint32, indexerId string, instVersion uint64, rState uint32, replicaId uint64, scheduled bool, storageMode string)
Add an index definition to Topology.
func (*IndexTopology) ChangeStateForIndexInstByDefn ¶
func (t *IndexTopology) ChangeStateForIndexInstByDefn(defnId common.IndexDefnId, fromState, toState common.IndexState)
Update Index Status on instance
func (*IndexTopology) FindIndexDefinition ¶
func (t *IndexTopology) FindIndexDefinition(bucket string, name string) *IndexDefnDistribution
Get all index instance Id's for a specific defnition
func (*IndexTopology) FindIndexDefinitionById ¶
func (t *IndexTopology) FindIndexDefinitionById(id common.IndexDefnId) *IndexDefnDistribution
Get all index instance Id's for a specific defnition
func (*IndexTopology) GetIndexInstByDefn ¶
func (t *IndexTopology) GetIndexInstByDefn(defnId common.IndexDefnId) *IndexInstDistribution
Update Index Status on instance
func (*IndexTopology) GetIndexInstancesByDefn ¶
func (t *IndexTopology) GetIndexInstancesByDefn(defnId common.IndexDefnId) []IndexInstDistribution
Update Index Status on instance
func (*IndexTopology) GetRStatusByDefn ¶
func (t *IndexTopology) GetRStatusByDefn(defnId common.IndexDefnId) common.RebalanceState
func (*IndexTopology) GetStatusByDefn ¶
func (t *IndexTopology) GetStatusByDefn(defnId common.IndexDefnId) (common.IndexState, string)
Update Index Status on instance
func (*IndexTopology) RemoveIndexDefinition ¶
func (t *IndexTopology) RemoveIndexDefinition(bucket string, name string)
Remove an index definition to Topology.
func (*IndexTopology) RemoveIndexDefinitionById ¶
func (t *IndexTopology) RemoveIndexDefinitionById(id common.IndexDefnId)
func (*IndexTopology) SetErrorForIndexInstByDefn ¶
func (t *IndexTopology) SetErrorForIndexInstByDefn(defnId common.IndexDefnId, errorStr string) bool
Set Error on instance
func (*IndexTopology) UpdateOldStorageModeForIndexInstByDefn ¶
func (t *IndexTopology) UpdateOldStorageModeForIndexInstByDefn(defnId common.IndexDefnId, storageMode string) bool
Update Old Storage Mode on instance
func (*IndexTopology) UpdateRebalanceStateForIndexInstByDefn ¶
func (t *IndexTopology) UpdateRebalanceStateForIndexInstByDefn(defnId common.IndexDefnId, state common.RebalanceState) bool
Update Index Rebalance Status on instance
func (*IndexTopology) UpdateScheduledFlagForIndexInstByDefn ¶
func (t *IndexTopology) UpdateScheduledFlagForIndexInstByDefn(defnId common.IndexDefnId, scheduled bool) bool
Set scheduled flag
func (*IndexTopology) UpdateStateForIndexInstByDefn ¶
func (t *IndexTopology) UpdateStateForIndexInstByDefn(defnId common.IndexDefnId, state common.IndexState) bool
Update Index Status on instance
func (*IndexTopology) UpdateStorageModeForIndexInstByDefn ¶
func (t *IndexTopology) UpdateStorageModeForIndexInstByDefn(defnId common.IndexDefnId, storageMode string) bool
Update Storage Mode on instance
func (*IndexTopology) UpdateStreamForIndexInstByDefn ¶
func (t *IndexTopology) UpdateStreamForIndexInstByDefn(defnId common.IndexDefnId, stream common.StreamId) bool
Update StreamId on instance
type LifecycleMgr ¶
type LifecycleMgr struct {
// contains filtered or unexported fields
}
func NewLifecycleMgr ¶
func NewLifecycleMgr(notifier MetadataNotifier, clusterURL string) (*LifecycleMgr, error)
func (*LifecycleMgr) BuildIndexes ¶
func (m *LifecycleMgr) BuildIndexes(ids []common.IndexDefnId, reqCtx *common.MetadataRequestContext, retry bool) ([]*common.IndexDefn, []common.IndexDefnId, []error)
func (*LifecycleMgr) CreateIndex ¶
func (m *LifecycleMgr) CreateIndex(defn *common.IndexDefn, scheduled bool, reqCtx *common.MetadataRequestContext) error
func (*LifecycleMgr) DeleteIndex ¶
func (m *LifecycleMgr) DeleteIndex(id common.IndexDefnId, notify bool, reqCtx *common.MetadataRequestContext) error
func (*LifecycleMgr) FindLocalIndexInst ¶
func (m *LifecycleMgr) FindLocalIndexInst(bucket string, defnId common.IndexDefnId) (*IndexInstDistribution, error)
func (*LifecycleMgr) GetResponseChannel ¶
func (m *LifecycleMgr) GetResponseChannel() <-chan c.Packet
func (*LifecycleMgr) OnNewRequest ¶
func (m *LifecycleMgr) OnNewRequest(fid string, request protocol.RequestMsg)
This is the main event processing loop. It is important not to having any blocking call in this function (e.g. mutex). If this function is blocked, it will also block gometa event processing loop.
func (*LifecycleMgr) RegisterNotifier ¶
func (m *LifecycleMgr) RegisterNotifier(notifier MetadataNotifier)
func (*LifecycleMgr) Run ¶
func (m *LifecycleMgr) Run(repo *MetadataRepo, requestServer RequestServer)
func (*LifecycleMgr) SetScheduledFlag ¶
func (m *LifecycleMgr) SetScheduledFlag(bucket string, defnId common.IndexDefnId, scheduled bool) error
func (*LifecycleMgr) Terminate ¶
func (m *LifecycleMgr) Terminate()
func (*LifecycleMgr) UpdateIndexInstance ¶
func (m *LifecycleMgr) UpdateIndexInstance(bucket string, defnId common.IndexDefnId, state common.IndexState, streamId common.StreamId, errStr string, buildTime []uint64, rState uint32) error
type LocalIndexMetadata ¶
type LocalIndexMetadata struct { IndexerId string `json:"indexerId,omitempty"` NodeUUID string `json:"nodeUUID,omitempty"` StorageMode string `json:"storageMode,omitempty"` IndexTopologies []IndexTopology `json:"topologies,omitempty"` IndexDefinitions []common.IndexDefn `json:"definitions,omitempty"` }
type LocalRepoRef ¶
type LocalRepoRef struct {
// contains filtered or unexported fields
}
type MetaIterator ¶
type MetaIterator struct {
// contains filtered or unexported fields
}
type MetadataKind ¶
type MetadataKind byte
const ( KIND_UNKNOWN MetadataKind = iota KIND_INDEX_DEFN KIND_TOPOLOGY KIND_GLOBAL_TOPOLOGY KIND_STABILITY_TIMESTAMP )
type MetadataNotifier ¶
type MetadataNotifier interface { OnIndexCreate(*common.IndexDefn, common.IndexInstId, int, *common.MetadataRequestContext) error OnIndexDelete(common.IndexInstId, string, *common.MetadataRequestContext) error OnIndexBuild([]common.IndexInstId, []string, *common.MetadataRequestContext) map[common.IndexInstId]error OnFetchStats() error }
Index Lifecycle
- Index Creation A) When an index is created, the index definition is assigned to a 64 bits UUID (IndexDefnId). B) IndexManager will persist the index definition. C) IndexManager will persist the index instance with INDEX_STATE_CREATED status. Each instance is assigned a 64 bits IndexInstId. For the first instance of an index, the IndexInstId is equal to the IndexDefnId. D) IndexManager will invovke MetadataNotifier.OnIndexCreate(). E) IndexManager will update instance to status INDEX_STATE_READY. F) If there is any error in (1B) - (1E), IndexManager will cleanup by deleting index definition and index instance. Since there is no atomic transaction, cleanup may not be completed, and the index will be left in an invalid state. See (5) for conditions where the index is considered valid. G) If there is any error in (1E), IndexManager will also invoke OnIndexDelete() H) Any error from (1A) or (1F), the error will be reported back to MetadataProvider.
- Immediate Index Build (index definition is persisted successfully and deferred build flag is false) A) MetadataNotifier.OnIndexBuild() is invoked. OnIndexBuild() is responsible for updating the state of the index instance (e.g. from READY to INITIAL). B) If there is an error in (2A), the error will be returned to the MetadataProvider. C) No cleanup will be perfromed by IndexManager if OnIndexBuild() fails. In other words, the index can be left in INDEX_STATE_READY. The user should be able to kick off index build again using deferred build. D) OnIndexBuild() can be running on a separate go-rountine. It can invoke UpdateIndexInstance() at any time during index build. This update will be queued serially and apply to the topology specific for that index instance (will not affect any other index instance). The new index state will be returned to the MetadataProvider asynchronously.
- Deferred Index Build A) For Deferred Index Build, it will follow step (2A) - (2D).
- Index Deletion A) When an index is deleted, IndexManager will set the index to INDEX_STATE_DELETED. B) If (4A) fails, the error will be returned and the index is considered as NOT deleted. C) IndexManager will then invoke MetadataNotifier.OnIndexDelete(). D) The IndexManager will delete the index definition first before deleting the index instance. since there is no atomic transaction, the cleanup may not be completed, and index can be in inconsistent state. See (5) for valid index state. E) Any error returned from (4C) to (4D) will not be returned to the client (since these are cleanup steps)
- Valid Index States A) Both index definition and index instance exist. B) Index Instance is not in INDEX_STATE_CREATE or INDEX_STATE_DELETED.
type MetadataRepo ¶
type MetadataRepo struct {
// contains filtered or unexported fields
}
func NewMetadataRepo ¶
func NewMetadataRepo(requestAddr string, leaderAddr string, config string, mgr *IndexManager) (*MetadataRepo, error)
func (*MetadataRepo) BroadcastIndexStats ¶
func (c *MetadataRepo) BroadcastIndexStats(stats *client.IndexStats) error
func (*MetadataRepo) BroadcastServiceMap ¶
func (c *MetadataRepo) BroadcastServiceMap(serviceMap *client.ServiceMap) error
func (*MetadataRepo) Close ¶
func (c *MetadataRepo) Close()
func (*MetadataRepo) CreateIndex ¶
func (c *MetadataRepo) CreateIndex(defn *common.IndexDefn) error
TODO: This function is not transactional.
func (*MetadataRepo) DeleteLocalValue ¶
func (c *MetadataRepo) DeleteLocalValue(key string) error
func (*MetadataRepo) DropIndexById ¶
func (c *MetadataRepo) DropIndexById(id common.IndexDefnId) error
func (*MetadataRepo) GetGlobalTopology ¶
func (c *MetadataRepo) GetGlobalTopology() (*GlobalTopology, error)
func (*MetadataRepo) GetIndexDefnById ¶
func (c *MetadataRepo) GetIndexDefnById(id common.IndexDefnId) (*common.IndexDefn, error)
func (*MetadataRepo) GetIndexDefnByName ¶
func (*MetadataRepo) GetLocalIndexerId ¶
func (c *MetadataRepo) GetLocalIndexerId() (common.IndexerId, error)
func (*MetadataRepo) GetLocalNodeUUID ¶
func (c *MetadataRepo) GetLocalNodeUUID() (string, error)
func (*MetadataRepo) GetLocalValue ¶
func (c *MetadataRepo) GetLocalValue(key string) (string, error)
func (*MetadataRepo) GetNextIndexInstId ¶
func (c *MetadataRepo) GetNextIndexInstId() (common.IndexInstId, error)
func (*MetadataRepo) GetNextPartitionId ¶
func (c *MetadataRepo) GetNextPartitionId() (common.PartitionId, error)
func (*MetadataRepo) GetTopologyByBucket ¶
func (c *MetadataRepo) GetTopologyByBucket(bucket string) (*IndexTopology, error)
func (*MetadataRepo) NewIterator ¶
func (c *MetadataRepo) NewIterator() (*MetaIterator, error)
Create a new iterator
func (*MetadataRepo) NewTopologyIterator ¶
func (c *MetadataRepo) NewTopologyIterator() (*TopologyIterator, error)
Create a new topology iterator
func (*MetadataRepo) RegisterNotifier ¶
func (c *MetadataRepo) RegisterNotifier(notifier MetadataNotifier)
func (*MetadataRepo) SetGlobalTopology ¶
func (c *MetadataRepo) SetGlobalTopology(topology *GlobalTopology) error
func (*MetadataRepo) SetLocalValue ¶
func (c *MetadataRepo) SetLocalValue(key string, value string) error
func (*MetadataRepo) SetTopologyByBucket ¶
func (c *MetadataRepo) SetTopologyByBucket(bucket string, topology *IndexTopology) error
func (*MetadataRepo) UpdateIndex ¶
func (c *MetadataRepo) UpdateIndex(defn *common.IndexDefn) error
type RemoteRepoRef ¶
type RemoteRepoRef struct {
// contains filtered or unexported fields
}
type RequestServer ¶
type RequestType ¶
type RequestType string
const ( CREATE RequestType = "create" DROP RequestType = "drop" BUILD RequestType = "build" )
type RestoreContext ¶
type RestoreContext struct {
// contains filtered or unexported fields
}
type RestoreResponse ¶
type TopologyIterator ¶
type TopologyIterator struct {
// contains filtered or unexported fields
}
func (*TopologyIterator) Next ¶
func (i *TopologyIterator) Next() (*IndexTopology, error)
Get value from iterator