Documentation ¶
Overview ¶
This module bridges the gap between the cluster configuration controller and the raft library
Index ¶
- Constants
- Variables
- func DecodeClusterCommandBody(command ClusterCommand) (interface{}, error)
- func EncodeClusterCommand(command ClusterCommand) ([]byte, error)
- func EncodeClusterCommandBody(body interface{}) ([]byte, error)
- type ClusterAddNodeBody
- type ClusterAddRelayBody
- type ClusterAddSiteBody
- type ClusterCommand
- type ClusterCommandType
- type ClusterConfigController
- type ClusterConfigControllerBuilder
- type ClusterController
- func (clusterController *ClusterController) AddNode(clusterCommand ClusterAddNodeBody) error
- func (clusterController *ClusterController) AddRelay(clusterCommand ClusterAddRelayBody) error
- func (clusterController *ClusterController) AddSite(clusterCommand ClusterAddSiteBody) error
- func (clusterController *ClusterController) ApplySnapshot(snap []byte) error
- func (clusterController *ClusterController) ClusterIsInitialized() bool
- func (clusterController *ClusterController) ClusterMemberAddress(nodeID uint64) raft.PeerAddress
- func (clusterController *ClusterController) ClusterNodeConfigs() []NodeConfig
- func (clusterController *ClusterController) ClusterNodes() map[uint64]bool
- func (clusterController *ClusterController) Deltas() []ClusterStateDelta
- func (clusterController *ClusterController) DisableNotifications()
- func (clusterController *ClusterController) EnableNotifications()
- func (clusterController *ClusterController) LocalNodeConfig() *NodeConfig
- func (clusterController *ClusterController) LocalNodeHeldPartitionReplicas() []PartitionReplica
- func (clusterController *ClusterController) LocalNodeHoldsPartition(partition uint64) bool
- func (clusterController *ClusterController) LocalNodeIsInCluster() bool
- func (clusterController *ClusterController) LocalNodeOwnedPartitionReplicas() []PartitionReplica
- func (clusterController *ClusterController) LocalNodeWasRemovedFromCluster() bool
- func (clusterController *ClusterController) LocalPartitionReplicasCount() int
- func (clusterController *ClusterController) Lock()
- func (clusterController *ClusterController) MoveRelay(clusterCommand ClusterMoveRelayBody) error
- func (clusterController *ClusterController) NodeIsInCluster(nodeID uint64) bool
- func (clusterController *ClusterController) Partition(key string) uint64
- func (clusterController *ClusterController) PartitionHolders(partition uint64) []uint64
- func (clusterController *ClusterController) PartitionOwners(partition uint64) []uint64
- func (clusterController *ClusterController) RelaySite(relayID string) string
- func (clusterController *ClusterController) RemoveNode(clusterCommand ClusterRemoveNodeBody) error
- func (clusterController *ClusterController) RemoveRelay(clusterCommand ClusterRemoveRelayBody) error
- func (clusterController *ClusterController) RemoveSite(clusterCommand ClusterRemoveSiteBody) error
- func (clusterController *ClusterController) SetPartitionCount(clusterCommand ClusterSetPartitionCountBody) error
- func (clusterController *ClusterController) SetReplicationFactor(clusterCommand ClusterSetReplicationFactorBody) error
- func (clusterController *ClusterController) SiteExists(siteID string) bool
- func (clusterController *ClusterController) Step(clusterCommand ClusterCommand) ([]ClusterStateDelta, error)
- func (clusterController *ClusterController) TakePartitionReplica(clusterCommand ClusterTakePartitionReplicaBody) error
- func (clusterController *ClusterController) Unlock()
- func (clusterController *ClusterController) UpdateNodeConfig(clusterCommand ClusterUpdateNodeBody) error
- type ClusterMoveRelayBody
- type ClusterRemoveNodeBody
- type ClusterRemoveRelayBody
- type ClusterRemoveSiteBody
- type ClusterSetPartitionCountBody
- type ClusterSetReplicationFactorBody
- type ClusterSettings
- type ClusterSnapshotBody
- type ClusterState
- func (clusterState *ClusterState) AddNode(nodeConfig NodeConfig)
- func (clusterState *ClusterState) AddRelay(relayID string)
- func (clusterState *ClusterState) AddSite(siteID string)
- func (clusterState *ClusterState) AssignPartitionReplica(partition, replica, node uint64) error
- func (clusterState *ClusterState) AssignPartitionReplicaOwnership(partition, replica, node uint64) error
- func (clusterState *ClusterState) AssignToken(node, token uint64) error
- func (clusterState *ClusterState) Initialize()
- func (clusterState *ClusterState) MoveRelay(relayID, siteID string)
- func (clusterState *ClusterState) Recover(snapshot []byte) error
- func (clusterState *ClusterState) RemoveNode(node uint64)
- func (clusterState *ClusterState) RemoveRelay(relayID string)
- func (clusterState *ClusterState) RemoveSite(siteID string)
- func (clusterState *ClusterState) SiteExists(siteID string) bool
- func (clusterState *ClusterState) Snapshot() ([]byte, error)
- type ClusterStateDelta
- type ClusterStateDeltaRange
- type ClusterStateDeltaType
- type ClusterTakePartitionReplicaBody
- type ClusterUpdateNodeBody
- type ConfigController
- func (cc *ConfigController) AddNode(ctx context.Context, nodeConfig NodeConfig) error
- func (cc *ConfigController) CancelProposals()
- func (cc *ConfigController) ClusterCommand(ctx context.Context, commandBody interface{}) error
- func (cc *ConfigController) ClusterController() *ClusterController
- func (cc *ConfigController) LogDump() (raftpb.Snapshot, []raftpb.Entry, error)
- func (cc *ConfigController) OnClusterSnapshot(cb func(snapshotIndex uint64, snapshotId string))
- func (cc *ConfigController) OnLocalUpdates(cb func(deltas []ClusterStateDelta))
- func (cc *ConfigController) RemoveNode(ctx context.Context, nodeID uint64) error
- func (cc *ConfigController) ReplaceNode(ctx context.Context, replacedNodeID uint64, replacementNodeID uint64) error
- func (cc *ConfigController) Start() error
- func (cc *ConfigController) Stop()
- type ConfigControllerBuilder
- func (builder *ConfigControllerBuilder) Create() ClusterConfigController
- func (builder *ConfigControllerBuilder) SetCreateNewCluster(b bool) ClusterConfigControllerBuilder
- func (builder *ConfigControllerBuilder) SetLocalNodeAddress(address PeerAddress) ClusterConfigControllerBuilder
- func (builder *ConfigControllerBuilder) SetRaftNodeStorage(raftStorage RaftNodeStorage) ClusterConfigControllerBuilder
- func (builder *ConfigControllerBuilder) SetRaftNodeTransport(transport *TransportHub) ClusterConfigControllerBuilder
- type NodeAdd
- type NodeAddress
- type NodeConfig
- type NodeConfigList
- type NodeGainPartitionReplica
- type NodeGainPartitionReplicaOwnership
- type NodeGainToken
- type NodeLosePartitionReplica
- type NodeLosePartitionReplicaOwnership
- type NodeLoseToken
- type NodeRemove
- type NodeTokenCount
- type NodeTokenCountHeap
- func (nodeTokenCountHeap NodeTokenCountHeap) Len() int
- func (nodeTokenCountHeap NodeTokenCountHeap) Less(i, j int) bool
- func (nodeTokenCountHeap *NodeTokenCountHeap) Pop() interface{}
- func (nodeTokenCountHeap *NodeTokenCountHeap) Push(x interface{})
- func (nodeTokenCountHeap NodeTokenCountHeap) Swap(i, j int)
- type PartitionReplica
- type PartitioningStrategy
- type RelayAdded
- type RelayMoved
- type RelayRemoved
- type SimplePartitioningStrategy
- func (ps *SimplePartitioningStrategy) AssignPartitions(nodes []NodeConfig, currentPartitionAssignment [][]uint64)
- func (ps *SimplePartitioningStrategy) AssignTokens(nodes []NodeConfig, currentAssignments []uint64, partitions uint64) ([]uint64, error)
- func (ps *SimplePartitioningStrategy) CalculateShiftAmount(partitionCount uint64) int
- func (ps *SimplePartitioningStrategy) Owners(tokenAssignment []uint64, partition uint64, replicationFactor uint64) []uint64
- func (ps *SimplePartitioningStrategy) Partition(key string, partitionCount uint64) uint64
- type SiteAdded
- type SiteRemoved
Constants ¶
View Source
const DefaultPartitionCount uint64 = 1024
View Source
const MaxPartitionCount uint64 = 65536
View Source
const MinPartitionCount uint64 = 64
View Source
const ProposalRetryPeriodSeconds = 15
Variables ¶
View Source
var EBadContext = errors.New("The node addition or removal had an invalid context")
View Source
var ECancelled = errors.New("The request was cancelled")
View Source
var ECouldNotParseCommand = errors.New("The cluster command data was not properly formatted. Unable to parse it.")
View Source
var ENoNodesAvailable = errors.New("Unable to assign tokens because there are no available nodes in the cluster")
View Source
var ENoSuchCommand = errors.New("The cluster command type is not supported")
View Source
var ENoSuchNode = errors.New("The node specified in the update does not exist")
View Source
var ENoSuchPartition = errors.New("The specified partition does not exist")
View Source
var ENoSuchRelay = errors.New("The specified relay does not exist")
View Source
var ENoSuchReplica = errors.New("The specified partition replica does not exist")
View Source
var ENoSuchSite = errors.New("The specified site does not exist")
View Source
var ENoSuchToken = errors.New("The specified token does not exist")
View Source
var ENodeDoesNotOwnReplica = errors.New("A node tried to transfer a partition replica to itself but it no longer owns that replica")
View Source
var EPreconditionFailed = errors.New("Unable to validate precondition")
View Source
var ERaftNodeStartup = errors.New("Encountered an error while starting up raft controller")
View Source
var ERaftProtocolError = errors.New("Raft controller encountered a protocol error")
View Source
var EReplicaNumberInvalid = errors.New("The command specified an invalid replica number for a partition.")
View Source
var EStopped = errors.New("The server was stopped")
Functions ¶
func DecodeClusterCommandBody ¶
func DecodeClusterCommandBody(command ClusterCommand) (interface{}, error)
func EncodeClusterCommand ¶
func EncodeClusterCommand(command ClusterCommand) ([]byte, error)
Types ¶
type ClusterAddNodeBody ¶
type ClusterAddNodeBody struct { NodeID uint64 NodeConfig NodeConfig }
type ClusterAddRelayBody ¶
type ClusterAddRelayBody struct {
RelayID string
}
type ClusterAddSiteBody ¶
type ClusterAddSiteBody struct {
SiteID string
}
type ClusterCommand ¶
type ClusterCommand struct { Type ClusterCommandType SubmitterID uint64 CommandID uint64 Data []byte }
func CreateClusterCommand ¶
func CreateClusterCommand(commandType ClusterCommandType, body interface{}) (ClusterCommand, error)
func DecodeClusterCommand ¶
func DecodeClusterCommand(encodedCommand []byte) (ClusterCommand, error)
type ClusterCommandType ¶
type ClusterCommandType int
const ( ClusterUpdateNode ClusterCommandType = iota ClusterAddNode ClusterCommandType = iota ClusterRemoveNode ClusterCommandType = iota ClusterTakePartitionReplica ClusterCommandType = iota ClusterSetReplicationFactor ClusterCommandType = iota ClusterSetPartitionCount ClusterCommandType = iota ClusterAddSite ClusterCommandType = iota ClusterRemoveSite ClusterCommandType = iota ClusterAddRelay ClusterCommandType = iota ClusterRemoveRelay ClusterCommandType = iota ClusterMoveRelay ClusterCommandType = iota ClusterSnapshot ClusterCommandType = iota )
type ClusterConfigController ¶
type ClusterConfigController interface { LogDump() (raftpb.Snapshot, []raftpb.Entry, error) AddNode(ctx context.Context, nodeConfig NodeConfig) error ReplaceNode(ctx context.Context, replacedNodeID uint64, replacementNodeID uint64) error RemoveNode(ctx context.Context, nodeID uint64) error ClusterCommand(ctx context.Context, commandBody interface{}) error OnLocalUpdates(cb func(deltas []ClusterStateDelta)) OnClusterSnapshot(cb func(snapshotIndex uint64, snapshotId string)) ClusterController() *ClusterController Start() error Stop() CancelProposals() }
type ClusterConfigControllerBuilder ¶
type ClusterConfigControllerBuilder interface { SetCreateNewCluster(b bool) ClusterConfigControllerBuilder SetLocalNodeAddress(peerAddress PeerAddress) ClusterConfigControllerBuilder SetRaftNodeStorage(raftStorage RaftNodeStorage) ClusterConfigControllerBuilder SetRaftNodeTransport(transport *TransportHub) ClusterConfigControllerBuilder Create() ClusterConfigController }
type ClusterController ¶
type ClusterController struct { LocalNodeID uint64 State ClusterState PartitioningStrategy PartitioningStrategy LocalUpdates chan []ClusterStateDelta // contains filtered or unexported fields }
func (*ClusterController) AddNode ¶
func (clusterController *ClusterController) AddNode(clusterCommand ClusterAddNodeBody) error
func (*ClusterController) AddRelay ¶
func (clusterController *ClusterController) AddRelay(clusterCommand ClusterAddRelayBody) error
func (*ClusterController) AddSite ¶
func (clusterController *ClusterController) AddSite(clusterCommand ClusterAddSiteBody) error
func (*ClusterController) ApplySnapshot ¶
func (clusterController *ClusterController) ApplySnapshot(snap []byte) error
Apply a snapshot to the state and notify on the local updates channel of any relevant changes
func (*ClusterController) ClusterIsInitialized ¶
func (clusterController *ClusterController) ClusterIsInitialized() bool
func (*ClusterController) ClusterMemberAddress ¶
func (clusterController *ClusterController) ClusterMemberAddress(nodeID uint64) raft.PeerAddress
func (*ClusterController) ClusterNodeConfigs ¶
func (clusterController *ClusterController) ClusterNodeConfigs() []NodeConfig
func (*ClusterController) ClusterNodes ¶
func (clusterController *ClusterController) ClusterNodes() map[uint64]bool
func (*ClusterController) Deltas ¶
func (clusterController *ClusterController) Deltas() []ClusterStateDelta
func (*ClusterController) DisableNotifications ¶
func (clusterController *ClusterController) DisableNotifications()
func (*ClusterController) EnableNotifications ¶
func (clusterController *ClusterController) EnableNotifications()
func (*ClusterController) LocalNodeConfig ¶
func (clusterController *ClusterController) LocalNodeConfig() *NodeConfig
func (*ClusterController) LocalNodeHeldPartitionReplicas ¶
func (clusterController *ClusterController) LocalNodeHeldPartitionReplicas() []PartitionReplica
func (*ClusterController) LocalNodeHoldsPartition ¶
func (clusterController *ClusterController) LocalNodeHoldsPartition(partition uint64) bool
func (*ClusterController) LocalNodeIsInCluster ¶
func (clusterController *ClusterController) LocalNodeIsInCluster() bool
func (*ClusterController) LocalNodeOwnedPartitionReplicas ¶
func (clusterController *ClusterController) LocalNodeOwnedPartitionReplicas() []PartitionReplica
func (*ClusterController) LocalNodeWasRemovedFromCluster ¶
func (clusterController *ClusterController) LocalNodeWasRemovedFromCluster() bool
func (*ClusterController) LocalPartitionReplicasCount ¶
func (clusterController *ClusterController) LocalPartitionReplicasCount() int
func (*ClusterController) Lock ¶
func (clusterController *ClusterController) Lock()
func (*ClusterController) MoveRelay ¶
func (clusterController *ClusterController) MoveRelay(clusterCommand ClusterMoveRelayBody) error
func (*ClusterController) NodeIsInCluster ¶
func (clusterController *ClusterController) NodeIsInCluster(nodeID uint64) bool
func (*ClusterController) Partition ¶
func (clusterController *ClusterController) Partition(key string) uint64
func (*ClusterController) PartitionHolders ¶
func (clusterController *ClusterController) PartitionHolders(partition uint64) []uint64
func (*ClusterController) PartitionOwners ¶
func (clusterController *ClusterController) PartitionOwners(partition uint64) []uint64
func (*ClusterController) RelaySite ¶
func (clusterController *ClusterController) RelaySite(relayID string) string
func (*ClusterController) RemoveNode ¶
func (clusterController *ClusterController) RemoveNode(clusterCommand ClusterRemoveNodeBody) error
func (*ClusterController) RemoveRelay ¶
func (clusterController *ClusterController) RemoveRelay(clusterCommand ClusterRemoveRelayBody) error
func (*ClusterController) RemoveSite ¶
func (clusterController *ClusterController) RemoveSite(clusterCommand ClusterRemoveSiteBody) error
func (*ClusterController) SetPartitionCount ¶
func (clusterController *ClusterController) SetPartitionCount(clusterCommand ClusterSetPartitionCountBody) error
func (*ClusterController) SetReplicationFactor ¶
func (clusterController *ClusterController) SetReplicationFactor(clusterCommand ClusterSetReplicationFactorBody) error
func (*ClusterController) SiteExists ¶
func (clusterController *ClusterController) SiteExists(siteID string) bool
func (*ClusterController) Step ¶
func (clusterController *ClusterController) Step(clusterCommand ClusterCommand) ([]ClusterStateDelta, error)
func (*ClusterController) TakePartitionReplica ¶
func (clusterController *ClusterController) TakePartitionReplica(clusterCommand ClusterTakePartitionReplicaBody) error
func (*ClusterController) Unlock ¶
func (clusterController *ClusterController) Unlock()
func (*ClusterController) UpdateNodeConfig ¶
func (clusterController *ClusterController) UpdateNodeConfig(clusterCommand ClusterUpdateNodeBody) error
type ClusterMoveRelayBody ¶
type ClusterRemoveNodeBody ¶
type ClusterRemoveRelayBody ¶
type ClusterRemoveRelayBody struct {
RelayID string
}
type ClusterRemoveSiteBody ¶
type ClusterRemoveSiteBody struct {
SiteID string
}
type ClusterSetPartitionCountBody ¶
type ClusterSetPartitionCountBody struct {
Partitions uint64
}
type ClusterSetReplicationFactorBody ¶
type ClusterSetReplicationFactorBody struct {
ReplicationFactor uint64
}
type ClusterSettings ¶
type ClusterSettings struct { // The replication factor of this cluster ReplicationFactor uint64 // The number of partitions in the hash space Partitions uint64 }
func (*ClusterSettings) AreInitialized ¶
func (clusterSettings *ClusterSettings) AreInitialized() bool
type ClusterSnapshotBody ¶
type ClusterSnapshotBody struct {
UUID string
}
type ClusterState ¶
type ClusterState struct { // A set of nodes IDs of nodes that were previously cluster members // but were since removed RemovedNodes map[uint64]bool // Ring members and their configuration Nodes map[uint64]*NodeConfig // A mapping between tokens and the node that owns them Tokens []uint64 // The partition replicas in this node Partitions [][]*PartitionReplica // Global cluster settings that must be initialized before the cluster is // initialized ClusterSettings ClusterSettings Sites map[string]bool Relays map[string]string }
func (*ClusterState) AddNode ¶
func (clusterState *ClusterState) AddNode(nodeConfig NodeConfig)
func (*ClusterState) AddRelay ¶
func (clusterState *ClusterState) AddRelay(relayID string)
func (*ClusterState) AddSite ¶
func (clusterState *ClusterState) AddSite(siteID string)
func (*ClusterState) AssignPartitionReplica ¶
func (clusterState *ClusterState) AssignPartitionReplica(partition, replica, node uint64) error
change the holder of a partition replica
func (*ClusterState) AssignPartitionReplicaOwnership ¶
func (clusterState *ClusterState) AssignPartitionReplicaOwnership(partition, replica, node uint64) error
change the owner of a partition replicas
func (*ClusterState) AssignToken ¶
func (clusterState *ClusterState) AssignToken(node, token uint64) error
change the owner of a token
func (*ClusterState) Initialize ¶
func (clusterState *ClusterState) Initialize()
func (*ClusterState) MoveRelay ¶
func (clusterState *ClusterState) MoveRelay(relayID, siteID string)
func (*ClusterState) Recover ¶
func (clusterState *ClusterState) Recover(snapshot []byte) error
func (*ClusterState) RemoveNode ¶
func (clusterState *ClusterState) RemoveNode(node uint64)
func (*ClusterState) RemoveRelay ¶
func (clusterState *ClusterState) RemoveRelay(relayID string)
func (*ClusterState) RemoveSite ¶
func (clusterState *ClusterState) RemoveSite(siteID string)
func (*ClusterState) SiteExists ¶
func (clusterState *ClusterState) SiteExists(siteID string) bool
func (*ClusterState) Snapshot ¶
func (clusterState *ClusterState) Snapshot() ([]byte, error)
type ClusterStateDelta ¶
type ClusterStateDelta struct { Type ClusterStateDeltaType Delta interface{} }
type ClusterStateDeltaRange ¶
type ClusterStateDeltaRange []ClusterStateDelta
func (ClusterStateDeltaRange) Len ¶
func (r ClusterStateDeltaRange) Len() int
func (ClusterStateDeltaRange) Less ¶
func (r ClusterStateDeltaRange) Less(i, j int) bool
func (ClusterStateDeltaRange) Swap ¶
func (r ClusterStateDeltaRange) Swap(i, j int)
type ClusterStateDeltaType ¶
type ClusterStateDeltaType int
const ( DeltaNodeAdd ClusterStateDeltaType = iota DeltaNodeRemove ClusterStateDeltaType = iota DeltaNodeLoseToken ClusterStateDeltaType = iota DeltaNodeGainToken ClusterStateDeltaType = iota DeltaNodeGainPartitionReplicaOwnership ClusterStateDeltaType = iota DeltaNodeLosePartitionReplicaOwnership ClusterStateDeltaType = iota DeltaNodeLosePartitionReplica ClusterStateDeltaType = iota DeltaNodeGainPartitionReplica ClusterStateDeltaType = iota DeltaSiteAdded ClusterStateDeltaType = iota DeltaSiteRemoved ClusterStateDeltaType = iota DeltaRelayAdded ClusterStateDeltaType = iota DeltaRelayRemoved ClusterStateDeltaType = iota DeltaRelayMoved ClusterStateDeltaType = iota )
type ClusterUpdateNodeBody ¶
type ClusterUpdateNodeBody struct { NodeID uint64 NodeConfig NodeConfig }
type ConfigController ¶
type ConfigController struct {
// contains filtered or unexported fields
}
func NewConfigController ¶
func NewConfigController(raftNode *raft.RaftNode, raftTransport *raft.TransportHub, clusterController *ClusterController) *ConfigController
func (*ConfigController) AddNode ¶
func (cc *ConfigController) AddNode(ctx context.Context, nodeConfig NodeConfig) error
func (*ConfigController) CancelProposals ¶
func (cc *ConfigController) CancelProposals()
func (*ConfigController) ClusterCommand ¶
func (cc *ConfigController) ClusterCommand(ctx context.Context, commandBody interface{}) error
func (*ConfigController) ClusterController ¶
func (cc *ConfigController) ClusterController() *ClusterController
func (*ConfigController) OnClusterSnapshot ¶
func (cc *ConfigController) OnClusterSnapshot(cb func(snapshotIndex uint64, snapshotId string))
func (*ConfigController) OnLocalUpdates ¶
func (cc *ConfigController) OnLocalUpdates(cb func(deltas []ClusterStateDelta))
func (*ConfigController) RemoveNode ¶
func (cc *ConfigController) RemoveNode(ctx context.Context, nodeID uint64) error
func (*ConfigController) ReplaceNode ¶
func (*ConfigController) Start ¶
func (cc *ConfigController) Start() error
func (*ConfigController) Stop ¶
func (cc *ConfigController) Stop()
type ConfigControllerBuilder ¶
type ConfigControllerBuilder struct {
// contains filtered or unexported fields
}
func (*ConfigControllerBuilder) Create ¶
func (builder *ConfigControllerBuilder) Create() ClusterConfigController
func (*ConfigControllerBuilder) SetCreateNewCluster ¶
func (builder *ConfigControllerBuilder) SetCreateNewCluster(b bool) ClusterConfigControllerBuilder
func (*ConfigControllerBuilder) SetLocalNodeAddress ¶
func (builder *ConfigControllerBuilder) SetLocalNodeAddress(address PeerAddress) ClusterConfigControllerBuilder
func (*ConfigControllerBuilder) SetRaftNodeStorage ¶
func (builder *ConfigControllerBuilder) SetRaftNodeStorage(raftStorage RaftNodeStorage) ClusterConfigControllerBuilder
func (*ConfigControllerBuilder) SetRaftNodeTransport ¶
func (builder *ConfigControllerBuilder) SetRaftNodeTransport(transport *TransportHub) ClusterConfigControllerBuilder
type NodeAdd ¶
type NodeAdd struct { NodeID uint64 NodeConfig NodeConfig }
type NodeAddress ¶
type NodeAddress struct { NodeID uint64 Address ddbRaft.PeerAddress }
type NodeConfig ¶
type NodeConfig struct { // The network address of the node Address ddbRaft.PeerAddress // Node capacity in bytes Capacity uint64 // The tokens owned by this node Tokens map[uint64]bool // a set of partition replicas owned by this node OwnedPartitionReplicas map[uint64]map[uint64]bool // a set of partition replicas held by this node. This is derived from the cluster state and is used // only internally for quick lookup. It is not stored or transferred as part of a node's configuration PartitionReplicas map[uint64]map[uint64]bool }
type NodeConfigList ¶
type NodeConfigList []NodeConfig
func (NodeConfigList) Len ¶
func (nodeConfigList NodeConfigList) Len() int
func (NodeConfigList) Less ¶
func (nodeConfigList NodeConfigList) Less(i, j int) bool
func (NodeConfigList) Swap ¶
func (nodeConfigList NodeConfigList) Swap(i, j int)
type NodeGainToken ¶
type NodeLoseToken ¶
type NodeRemove ¶
type NodeRemove struct {
NodeID uint64
}
type NodeTokenCount ¶
type NodeTokenCountHeap ¶
type NodeTokenCountHeap []NodeTokenCount
func (NodeTokenCountHeap) Len ¶
func (nodeTokenCountHeap NodeTokenCountHeap) Len() int
func (NodeTokenCountHeap) Less ¶
func (nodeTokenCountHeap NodeTokenCountHeap) Less(i, j int) bool
func (*NodeTokenCountHeap) Pop ¶
func (nodeTokenCountHeap *NodeTokenCountHeap) Pop() interface{}
func (*NodeTokenCountHeap) Push ¶
func (nodeTokenCountHeap *NodeTokenCountHeap) Push(x interface{})
func (NodeTokenCountHeap) Swap ¶
func (nodeTokenCountHeap NodeTokenCountHeap) Swap(i, j int)
type PartitionReplica ¶
type PartitionReplica struct { // The partition number. The partition number combined with the total number of partitions and the range of the hash // space define a contiguous range in the hash space for which this partition is responsible Partition uint64 // The index of this partition replica. If the replication factor is set to 3 this number will range from 0 to 2 // The 0th partition replica represents the primary replica for that partition. The owner of the primary replica // will be the only node able to accept writes for this partition. The other replicas for this partition serve // only as backups Replica uint64 // The ID of the node that holds this partition replica. The holder can differ from the owner if the cluster is in // a transitional state and the partition replica is being transferred to a new node. The owner is based only on // the current token assignments Holder uint64 // The ID of the node that owns this partition Owner uint64 }
type PartitioningStrategy ¶
type PartitioningStrategy interface { AssignTokens(nodes []NodeConfig, currentTokenAssignment []uint64, partitions uint64) ([]uint64, error) AssignPartitions(nodes []NodeConfig, currentPartitionAssignment [][]uint64) Owners(tokenAssignment []uint64, partition uint64, replicationFactor uint64) []uint64 Partition(key string, partitionCount uint64) uint64 }
type RelayAdded ¶
type RelayAdded struct {
RelayID string
}
type RelayMoved ¶
type RelayRemoved ¶
type RelayRemoved struct {
RelayID string
}
type SimplePartitioningStrategy ¶
type SimplePartitioningStrategy struct {
// contains filtered or unexported fields
}
Simple replication strategy that does not account for capacity other than finding nodes that are marked as having 0 capacity to account for decomissioned nodes. Other than that It just tries to assign as close to an even amount of tokens to each node as possible
func (*SimplePartitioningStrategy) AssignPartitions ¶
func (ps *SimplePartitioningStrategy) AssignPartitions(nodes []NodeConfig, currentPartitionAssignment [][]uint64)
func (*SimplePartitioningStrategy) AssignTokens ¶
func (ps *SimplePartitioningStrategy) AssignTokens(nodes []NodeConfig, currentAssignments []uint64, partitions uint64) ([]uint64, error)
func (*SimplePartitioningStrategy) CalculateShiftAmount ¶
func (ps *SimplePartitioningStrategy) CalculateShiftAmount(partitionCount uint64) int
type SiteRemoved ¶
type SiteRemoved struct {
SiteID string
}
Click to show internal directories.
Click to hide internal directories.