cluster

package
v0.0.0-...-ac41614 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 9, 2022 License: MIT Imports: 12 Imported by: 18

Documentation

Overview

This module bridges the gap between the cluster configuration controller and the raft library

Index

Constants

View Source
const DefaultPartitionCount uint64 = 1024
View Source
const MaxPartitionCount uint64 = 65536
View Source
const MinPartitionCount uint64 = 64
View Source
const ProposalRetryPeriodSeconds = 15

Variables

View Source
var EBadContext = errors.New("The node addition or removal had an invalid context")
View Source
var ECancelled = errors.New("The request was cancelled")
View Source
var ECouldNotParseCommand = errors.New("The cluster command data was not properly formatted. Unable to parse it.")
View Source
var ENoNodesAvailable = errors.New("Unable to assign tokens because there are no available nodes in the cluster")
View Source
var ENoSuchCommand = errors.New("The cluster command type is not supported")
View Source
var ENoSuchNode = errors.New("The node specified in the update does not exist")
View Source
var ENoSuchPartition = errors.New("The specified partition does not exist")
View Source
var ENoSuchRelay = errors.New("The specified relay does not exist")
View Source
var ENoSuchReplica = errors.New("The specified partition replica does not exist")
View Source
var ENoSuchSite = errors.New("The specified site does not exist")
View Source
var ENoSuchToken = errors.New("The specified token does not exist")
View Source
var ENodeDoesNotOwnReplica = errors.New("A node tried to transfer a partition replica to itself but it no longer owns that replica")
View Source
var EPreconditionFailed = errors.New("Unable to validate precondition")
View Source
var ERaftNodeStartup = errors.New("Encountered an error while starting up raft controller")
View Source
var ERaftProtocolError = errors.New("Raft controller encountered a protocol error")
View Source
var EReplicaNumberInvalid = errors.New("The command specified an invalid replica number for a partition.")
View Source
var EStopped = errors.New("The server was stopped")

Functions

func DecodeClusterCommandBody

func DecodeClusterCommandBody(command ClusterCommand) (interface{}, error)

func EncodeClusterCommand

func EncodeClusterCommand(command ClusterCommand) ([]byte, error)

func EncodeClusterCommandBody

func EncodeClusterCommandBody(body interface{}) ([]byte, error)

Types

type ClusterAddNodeBody

type ClusterAddNodeBody struct {
	NodeID     uint64
	NodeConfig NodeConfig
}

type ClusterAddRelayBody

type ClusterAddRelayBody struct {
	RelayID string
}

type ClusterAddSiteBody

type ClusterAddSiteBody struct {
	SiteID string
}

type ClusterCommand

type ClusterCommand struct {
	Type        ClusterCommandType
	SubmitterID uint64
	CommandID   uint64
	Data        []byte
}

func CreateClusterCommand

func CreateClusterCommand(commandType ClusterCommandType, body interface{}) (ClusterCommand, error)

func DecodeClusterCommand

func DecodeClusterCommand(encodedCommand []byte) (ClusterCommand, error)

type ClusterCommandType

type ClusterCommandType int
const (
	ClusterUpdateNode           ClusterCommandType = iota
	ClusterAddNode              ClusterCommandType = iota
	ClusterRemoveNode           ClusterCommandType = iota
	ClusterTakePartitionReplica ClusterCommandType = iota
	ClusterSetReplicationFactor ClusterCommandType = iota
	ClusterSetPartitionCount    ClusterCommandType = iota

	ClusterAddSite     ClusterCommandType = iota
	ClusterRemoveSite  ClusterCommandType = iota
	ClusterAddRelay    ClusterCommandType = iota
	ClusterRemoveRelay ClusterCommandType = iota
	ClusterMoveRelay   ClusterCommandType = iota
	ClusterSnapshot    ClusterCommandType = iota
)

type ClusterConfigController

type ClusterConfigController interface {
	LogDump() (raftpb.Snapshot, []raftpb.Entry, error)
	AddNode(ctx context.Context, nodeConfig NodeConfig) error
	ReplaceNode(ctx context.Context, replacedNodeID uint64, replacementNodeID uint64) error
	RemoveNode(ctx context.Context, nodeID uint64) error
	ClusterCommand(ctx context.Context, commandBody interface{}) error
	OnLocalUpdates(cb func(deltas []ClusterStateDelta))
	OnClusterSnapshot(cb func(snapshotIndex uint64, snapshotId string))
	ClusterController() *ClusterController
	Start() error
	Stop()
	CancelProposals()
}

type ClusterConfigControllerBuilder

type ClusterConfigControllerBuilder interface {
	SetCreateNewCluster(b bool) ClusterConfigControllerBuilder
	SetLocalNodeAddress(peerAddress PeerAddress) ClusterConfigControllerBuilder
	SetRaftNodeStorage(raftStorage RaftNodeStorage) ClusterConfigControllerBuilder
	SetRaftNodeTransport(transport *TransportHub) ClusterConfigControllerBuilder
	Create() ClusterConfigController
}

type ClusterController

type ClusterController struct {
	LocalNodeID          uint64
	State                ClusterState
	PartitioningStrategy PartitioningStrategy
	LocalUpdates         chan []ClusterStateDelta
	// contains filtered or unexported fields
}

func (*ClusterController) AddNode

func (clusterController *ClusterController) AddNode(clusterCommand ClusterAddNodeBody) error

func (*ClusterController) AddRelay

func (clusterController *ClusterController) AddRelay(clusterCommand ClusterAddRelayBody) error

func (*ClusterController) AddSite

func (clusterController *ClusterController) AddSite(clusterCommand ClusterAddSiteBody) error

func (*ClusterController) ApplySnapshot

func (clusterController *ClusterController) ApplySnapshot(snap []byte) error

Apply a snapshot to the state and notify on the local updates channel of any relevant changes

func (*ClusterController) ClusterIsInitialized

func (clusterController *ClusterController) ClusterIsInitialized() bool

func (*ClusterController) ClusterMemberAddress

func (clusterController *ClusterController) ClusterMemberAddress(nodeID uint64) raft.PeerAddress

func (*ClusterController) ClusterNodeConfigs

func (clusterController *ClusterController) ClusterNodeConfigs() []NodeConfig

func (*ClusterController) ClusterNodes

func (clusterController *ClusterController) ClusterNodes() map[uint64]bool

func (*ClusterController) Deltas

func (clusterController *ClusterController) Deltas() []ClusterStateDelta

func (*ClusterController) DisableNotifications

func (clusterController *ClusterController) DisableNotifications()

func (*ClusterController) EnableNotifications

func (clusterController *ClusterController) EnableNotifications()

func (*ClusterController) LocalNodeConfig

func (clusterController *ClusterController) LocalNodeConfig() *NodeConfig

func (*ClusterController) LocalNodeHeldPartitionReplicas

func (clusterController *ClusterController) LocalNodeHeldPartitionReplicas() []PartitionReplica

func (*ClusterController) LocalNodeHoldsPartition

func (clusterController *ClusterController) LocalNodeHoldsPartition(partition uint64) bool

func (*ClusterController) LocalNodeIsInCluster

func (clusterController *ClusterController) LocalNodeIsInCluster() bool

func (*ClusterController) LocalNodeOwnedPartitionReplicas

func (clusterController *ClusterController) LocalNodeOwnedPartitionReplicas() []PartitionReplica

func (*ClusterController) LocalNodeWasRemovedFromCluster

func (clusterController *ClusterController) LocalNodeWasRemovedFromCluster() bool

func (*ClusterController) LocalPartitionReplicasCount

func (clusterController *ClusterController) LocalPartitionReplicasCount() int

func (*ClusterController) Lock

func (clusterController *ClusterController) Lock()

func (*ClusterController) MoveRelay

func (clusterController *ClusterController) MoveRelay(clusterCommand ClusterMoveRelayBody) error

func (*ClusterController) NodeIsInCluster

func (clusterController *ClusterController) NodeIsInCluster(nodeID uint64) bool

func (*ClusterController) Partition

func (clusterController *ClusterController) Partition(key string) uint64

func (*ClusterController) PartitionHolders

func (clusterController *ClusterController) PartitionHolders(partition uint64) []uint64

func (*ClusterController) PartitionOwners

func (clusterController *ClusterController) PartitionOwners(partition uint64) []uint64

func (*ClusterController) RelaySite

func (clusterController *ClusterController) RelaySite(relayID string) string

func (*ClusterController) RemoveNode

func (clusterController *ClusterController) RemoveNode(clusterCommand ClusterRemoveNodeBody) error

func (*ClusterController) RemoveRelay

func (clusterController *ClusterController) RemoveRelay(clusterCommand ClusterRemoveRelayBody) error

func (*ClusterController) RemoveSite

func (clusterController *ClusterController) RemoveSite(clusterCommand ClusterRemoveSiteBody) error

func (*ClusterController) SetPartitionCount

func (clusterController *ClusterController) SetPartitionCount(clusterCommand ClusterSetPartitionCountBody) error

func (*ClusterController) SetReplicationFactor

func (clusterController *ClusterController) SetReplicationFactor(clusterCommand ClusterSetReplicationFactorBody) error

func (*ClusterController) SiteExists

func (clusterController *ClusterController) SiteExists(siteID string) bool

func (*ClusterController) Step

func (clusterController *ClusterController) Step(clusterCommand ClusterCommand) ([]ClusterStateDelta, error)

func (*ClusterController) TakePartitionReplica

func (clusterController *ClusterController) TakePartitionReplica(clusterCommand ClusterTakePartitionReplicaBody) error

func (*ClusterController) Unlock

func (clusterController *ClusterController) Unlock()

func (*ClusterController) UpdateNodeConfig

func (clusterController *ClusterController) UpdateNodeConfig(clusterCommand ClusterUpdateNodeBody) error

type ClusterMoveRelayBody

type ClusterMoveRelayBody struct {
	RelayID string
	SiteID  string
}

type ClusterRemoveNodeBody

type ClusterRemoveNodeBody struct {
	NodeID            uint64
	ReplacementNodeID uint64
}

type ClusterRemoveRelayBody

type ClusterRemoveRelayBody struct {
	RelayID string
}

type ClusterRemoveSiteBody

type ClusterRemoveSiteBody struct {
	SiteID string
}

type ClusterSetPartitionCountBody

type ClusterSetPartitionCountBody struct {
	Partitions uint64
}

type ClusterSetReplicationFactorBody

type ClusterSetReplicationFactorBody struct {
	ReplicationFactor uint64
}

type ClusterSettings

type ClusterSettings struct {
	// The replication factor of this cluster
	ReplicationFactor uint64
	// The number of partitions in the hash space
	Partitions uint64
}

func (*ClusterSettings) AreInitialized

func (clusterSettings *ClusterSettings) AreInitialized() bool

type ClusterSnapshotBody

type ClusterSnapshotBody struct {
	UUID string
}

type ClusterState

type ClusterState struct {
	// A set of nodes IDs of nodes that were previously cluster members
	// but were since removed
	RemovedNodes map[uint64]bool
	// Ring members and their configuration
	Nodes map[uint64]*NodeConfig
	// A mapping between tokens and the node that owns them
	Tokens []uint64
	// The partition replicas in this node
	Partitions [][]*PartitionReplica
	// Global cluster settings that must be initialized before the cluster is
	// initialized
	ClusterSettings ClusterSettings
	Sites           map[string]bool
	Relays          map[string]string
}

func (*ClusterState) AddNode

func (clusterState *ClusterState) AddNode(nodeConfig NodeConfig)

func (*ClusterState) AddRelay

func (clusterState *ClusterState) AddRelay(relayID string)

func (*ClusterState) AddSite

func (clusterState *ClusterState) AddSite(siteID string)

func (*ClusterState) AssignPartitionReplica

func (clusterState *ClusterState) AssignPartitionReplica(partition, replica, node uint64) error

change the holder of a partition replica

func (*ClusterState) AssignPartitionReplicaOwnership

func (clusterState *ClusterState) AssignPartitionReplicaOwnership(partition, replica, node uint64) error

change the owner of a partition replicas

func (*ClusterState) AssignToken

func (clusterState *ClusterState) AssignToken(node, token uint64) error

change the owner of a token

func (*ClusterState) Initialize

func (clusterState *ClusterState) Initialize()

func (*ClusterState) MoveRelay

func (clusterState *ClusterState) MoveRelay(relayID, siteID string)

func (*ClusterState) Recover

func (clusterState *ClusterState) Recover(snapshot []byte) error

func (*ClusterState) RemoveNode

func (clusterState *ClusterState) RemoveNode(node uint64)

func (*ClusterState) RemoveRelay

func (clusterState *ClusterState) RemoveRelay(relayID string)

func (*ClusterState) RemoveSite

func (clusterState *ClusterState) RemoveSite(siteID string)

func (*ClusterState) SiteExists

func (clusterState *ClusterState) SiteExists(siteID string) bool

func (*ClusterState) Snapshot

func (clusterState *ClusterState) Snapshot() ([]byte, error)

type ClusterStateDelta

type ClusterStateDelta struct {
	Type  ClusterStateDeltaType
	Delta interface{}
}

type ClusterStateDeltaRange

type ClusterStateDeltaRange []ClusterStateDelta

func (ClusterStateDeltaRange) Len

func (r ClusterStateDeltaRange) Len() int

func (ClusterStateDeltaRange) Less

func (r ClusterStateDeltaRange) Less(i, j int) bool

func (ClusterStateDeltaRange) Swap

func (r ClusterStateDeltaRange) Swap(i, j int)

type ClusterStateDeltaType

type ClusterStateDeltaType int
const (
	DeltaNodeAdd                           ClusterStateDeltaType = iota
	DeltaNodeRemove                        ClusterStateDeltaType = iota
	DeltaNodeLoseToken                     ClusterStateDeltaType = iota
	DeltaNodeGainToken                     ClusterStateDeltaType = iota
	DeltaNodeGainPartitionReplicaOwnership ClusterStateDeltaType = iota
	DeltaNodeLosePartitionReplicaOwnership ClusterStateDeltaType = iota
	DeltaNodeLosePartitionReplica          ClusterStateDeltaType = iota
	DeltaNodeGainPartitionReplica          ClusterStateDeltaType = iota
	DeltaSiteAdded                         ClusterStateDeltaType = iota
	DeltaSiteRemoved                       ClusterStateDeltaType = iota
	DeltaRelayAdded                        ClusterStateDeltaType = iota
	DeltaRelayRemoved                      ClusterStateDeltaType = iota
	DeltaRelayMoved                        ClusterStateDeltaType = iota
)

type ClusterTakePartitionReplicaBody

type ClusterTakePartitionReplicaBody struct {
	Partition uint64
	Replica   uint64
	NodeID    uint64
}

type ClusterUpdateNodeBody

type ClusterUpdateNodeBody struct {
	NodeID     uint64
	NodeConfig NodeConfig
}

type ConfigController

type ConfigController struct {
	// contains filtered or unexported fields
}

func NewConfigController

func NewConfigController(raftNode *raft.RaftNode, raftTransport *raft.TransportHub, clusterController *ClusterController) *ConfigController

func (*ConfigController) AddNode

func (cc *ConfigController) AddNode(ctx context.Context, nodeConfig NodeConfig) error

func (*ConfigController) CancelProposals

func (cc *ConfigController) CancelProposals()

func (*ConfigController) ClusterCommand

func (cc *ConfigController) ClusterCommand(ctx context.Context, commandBody interface{}) error

func (*ConfigController) ClusterController

func (cc *ConfigController) ClusterController() *ClusterController

func (*ConfigController) LogDump

func (cc *ConfigController) LogDump() (raftpb.Snapshot, []raftpb.Entry, error)

func (*ConfigController) OnClusterSnapshot

func (cc *ConfigController) OnClusterSnapshot(cb func(snapshotIndex uint64, snapshotId string))

func (*ConfigController) OnLocalUpdates

func (cc *ConfigController) OnLocalUpdates(cb func(deltas []ClusterStateDelta))

func (*ConfigController) RemoveNode

func (cc *ConfigController) RemoveNode(ctx context.Context, nodeID uint64) error

func (*ConfigController) ReplaceNode

func (cc *ConfigController) ReplaceNode(ctx context.Context, replacedNodeID uint64, replacementNodeID uint64) error

func (*ConfigController) Start

func (cc *ConfigController) Start() error

func (*ConfigController) Stop

func (cc *ConfigController) Stop()

type ConfigControllerBuilder

type ConfigControllerBuilder struct {
	// contains filtered or unexported fields
}

func (*ConfigControllerBuilder) Create

func (*ConfigControllerBuilder) SetCreateNewCluster

func (builder *ConfigControllerBuilder) SetCreateNewCluster(b bool) ClusterConfigControllerBuilder

func (*ConfigControllerBuilder) SetLocalNodeAddress

func (builder *ConfigControllerBuilder) SetLocalNodeAddress(address PeerAddress) ClusterConfigControllerBuilder

func (*ConfigControllerBuilder) SetRaftNodeStorage

func (builder *ConfigControllerBuilder) SetRaftNodeStorage(raftStorage RaftNodeStorage) ClusterConfigControllerBuilder

func (*ConfigControllerBuilder) SetRaftNodeTransport

func (builder *ConfigControllerBuilder) SetRaftNodeTransport(transport *TransportHub) ClusterConfigControllerBuilder

type NodeAdd

type NodeAdd struct {
	NodeID     uint64
	NodeConfig NodeConfig
}

type NodeAddress

type NodeAddress struct {
	NodeID  uint64
	Address ddbRaft.PeerAddress
}

type NodeConfig

type NodeConfig struct {
	// The network address of the node
	Address ddbRaft.PeerAddress
	// Node capacity in bytes
	Capacity uint64
	// The tokens owned by this node
	Tokens map[uint64]bool
	// a set of partition replicas owned by this node
	OwnedPartitionReplicas map[uint64]map[uint64]bool
	// a set of partition replicas held by this node. This is derived from the cluster state and is used
	// only internally for quick lookup. It is not stored or transferred as part of a node's configuration
	PartitionReplicas map[uint64]map[uint64]bool
}

type NodeConfigList

type NodeConfigList []NodeConfig

func (NodeConfigList) Len

func (nodeConfigList NodeConfigList) Len() int

func (NodeConfigList) Less

func (nodeConfigList NodeConfigList) Less(i, j int) bool

func (NodeConfigList) Swap

func (nodeConfigList NodeConfigList) Swap(i, j int)

type NodeGainPartitionReplica

type NodeGainPartitionReplica struct {
	NodeID    uint64
	Partition uint64
	Replica   uint64
}

type NodeGainPartitionReplicaOwnership

type NodeGainPartitionReplicaOwnership struct {
	NodeID    uint64
	Partition uint64
	Replica   uint64
}

type NodeGainToken

type NodeGainToken struct {
	NodeID uint64
	Token  uint64
}

type NodeLosePartitionReplica

type NodeLosePartitionReplica struct {
	NodeID    uint64
	Partition uint64
	Replica   uint64
}

type NodeLosePartitionReplicaOwnership

type NodeLosePartitionReplicaOwnership struct {
	NodeID    uint64
	Partition uint64
	Replica   uint64
}

type NodeLoseToken

type NodeLoseToken struct {
	NodeID uint64
	Token  uint64
}

type NodeRemove

type NodeRemove struct {
	NodeID uint64
}

type NodeTokenCount

type NodeTokenCount struct {
	NodeID     uint64
	TokenCount int
}

type NodeTokenCountHeap

type NodeTokenCountHeap []NodeTokenCount

func (NodeTokenCountHeap) Len

func (nodeTokenCountHeap NodeTokenCountHeap) Len() int

func (NodeTokenCountHeap) Less

func (nodeTokenCountHeap NodeTokenCountHeap) Less(i, j int) bool

func (*NodeTokenCountHeap) Pop

func (nodeTokenCountHeap *NodeTokenCountHeap) Pop() interface{}

func (*NodeTokenCountHeap) Push

func (nodeTokenCountHeap *NodeTokenCountHeap) Push(x interface{})

func (NodeTokenCountHeap) Swap

func (nodeTokenCountHeap NodeTokenCountHeap) Swap(i, j int)

type PartitionReplica

type PartitionReplica struct {
	// The partition number. The partition number combined with the total number of partitions and the range of the hash
	// space define a contiguous range in the hash space for which this partition is responsible
	Partition uint64
	// The index of this partition replica. If the replication factor is set to 3 this number will range from 0 to 2
	// The 0th partition replica represents the primary replica for that partition. The owner of the primary replica
	// will be the only node able to accept writes for this partition. The other replicas for this partition serve
	// only as backups
	Replica uint64
	// The ID of the node that holds this partition replica. The holder can differ from the owner if the cluster is in
	// a transitional state and the partition replica is being transferred to a new node. The owner is based only on
	// the current token assignments
	Holder uint64
	// The ID of the node that owns this partition
	Owner uint64
}

type PartitioningStrategy

type PartitioningStrategy interface {
	AssignTokens(nodes []NodeConfig, currentTokenAssignment []uint64, partitions uint64) ([]uint64, error)
	AssignPartitions(nodes []NodeConfig, currentPartitionAssignment [][]uint64)
	Owners(tokenAssignment []uint64, partition uint64, replicationFactor uint64) []uint64
	Partition(key string, partitionCount uint64) uint64
}

type RelayAdded

type RelayAdded struct {
	RelayID string
}

type RelayMoved

type RelayMoved struct {
	RelayID string
	SiteID  string
}

type RelayRemoved

type RelayRemoved struct {
	RelayID string
}

type SimplePartitioningStrategy

type SimplePartitioningStrategy struct {
	// contains filtered or unexported fields
}

Simple replication strategy that does not account for capacity other than finding nodes that are marked as having 0 capacity to account for decomissioned nodes. Other than that It just tries to assign as close to an even amount of tokens to each node as possible

func (*SimplePartitioningStrategy) AssignPartitions

func (ps *SimplePartitioningStrategy) AssignPartitions(nodes []NodeConfig, currentPartitionAssignment [][]uint64)

func (*SimplePartitioningStrategy) AssignTokens

func (ps *SimplePartitioningStrategy) AssignTokens(nodes []NodeConfig, currentAssignments []uint64, partitions uint64) ([]uint64, error)

func (*SimplePartitioningStrategy) CalculateShiftAmount

func (ps *SimplePartitioningStrategy) CalculateShiftAmount(partitionCount uint64) int

func (*SimplePartitioningStrategy) Owners

func (ps *SimplePartitioningStrategy) Owners(tokenAssignment []uint64, partition uint64, replicationFactor uint64) []uint64

func (*SimplePartitioningStrategy) Partition

func (ps *SimplePartitioningStrategy) Partition(key string, partitionCount uint64) uint64

type SiteAdded

type SiteAdded struct {
	SiteID string
}

type SiteRemoved

type SiteRemoved struct {
	SiteID string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL