node

package
v0.0.0-...-ac41614 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 9, 2022 License: MIT Imports: 38 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RaftStoreStoragePrefix = iota
	SiteStoreStoragePrefix = iota
	SnapshotMetadataPrefix = iota
)
View Source
const ClusterJoinRetryTimeout = 5
View Source
const SnapshotUUIDKey string = "UUID"

Variables

View Source
var EDecommissioned = errors.New("")
View Source
var ERemoved = errors.New("")
View Source
var ESnapshotsNotEnabled = errors.New("No snapshot directory configured")

Functions

This section is empty.

Types

type ClusterNode

type ClusterNode struct {
	// contains filtered or unexported fields
}

func New

func New(config ClusterNodeConfig) *ClusterNode

func (*ClusterNode) AcceptRelayConnection

func (node *ClusterNode) AcceptRelayConnection(conn *websocket.Conn, header http.Header)

func (*ClusterNode) Batch

func (node *ClusterNode) Batch(ctx context.Context, partitionNumber uint64, siteID string, bucketName string, updateBatch *UpdateBatch) (map[string]*SiblingSet, error)

func (*ClusterNode) ClusterConfigController

func (node *ClusterNode) ClusterConfigController() ClusterConfigController

func (*ClusterNode) ClusterIO

func (node *ClusterNode) ClusterIO() clusterio.ClusterIOAgent

func (*ClusterNode) DisconnectRelay

func (node *ClusterNode) DisconnectRelay(relayID string)

func (*ClusterNode) DisconnectRelayByPartition

func (node *ClusterNode) DisconnectRelayByPartition(partitionNumber uint64)

func (*ClusterNode) DisconnectRelayBySite

func (node *ClusterNode) DisconnectRelayBySite(siteID string)

func (*ClusterNode) Get

func (node *ClusterNode) Get(ctx context.Context, partitionNumber uint64, siteID string, bucketName string, keys [][]byte) ([]*SiblingSet, error)

func (*ClusterNode) GetMatches

func (node *ClusterNode) GetMatches(ctx context.Context, partitionNumber uint64, siteID string, bucketName string, keys [][]byte) (SiblingSetIterator, error)

func (*ClusterNode) ID

func (node *ClusterNode) ID() uint64

func (*ClusterNode) LeaveCluster

func (node *ClusterNode) LeaveCluster() (error, <-chan error)

func (*ClusterNode) Merge

func (node *ClusterNode) Merge(ctx context.Context, partitionNumber uint64, siteID string, bucketName string, patch map[string]*SiblingSet, broadcastToRelays bool) error

func (*ClusterNode) Name

func (node *ClusterNode) Name() string

func (*ClusterNode) OnInitialized

func (node *ClusterNode) OnInitialized(cb func())

func (*ClusterNode) RelayStatus

func (node *ClusterNode) RelayStatus(relayID string) (RelayStatus, error)

func (*ClusterNode) Start

func (node *ClusterNode) Start(options NodeInitializationOptions) error

func (*ClusterNode) Stop

func (node *ClusterNode) Stop()

func (*ClusterNode) UseRaftStore

func (node *ClusterNode) UseRaftStore(raftStore RaftNodeStorage)

type ClusterNodeConfig

type ClusterNodeConfig struct {
	StorageDriver StorageDriver
	CloudServer   *CloudServer
	MerkleDepth   uint8
	Capacity      uint64
	NoValidate    bool
}

type ClusterNodeFacade

type ClusterNodeFacade struct {
	// contains filtered or unexported fields
}

func (*ClusterNodeFacade) AcceptRelayConnection

func (clusterFacade *ClusterNodeFacade) AcceptRelayConnection(conn *websocket.Conn, header http.Header)

func (*ClusterNodeFacade) AddNode

func (clusterFacade *ClusterNodeFacade) AddNode(ctx context.Context, nodeConfig NodeConfig) error

func (*ClusterNodeFacade) AddRelay

func (clusterFacade *ClusterNodeFacade) AddRelay(ctx context.Context, relayID string) error

func (*ClusterNodeFacade) AddSite

func (clusterFacade *ClusterNodeFacade) AddSite(ctx context.Context, siteID string) error

func (*ClusterNodeFacade) Batch

func (clusterFacade *ClusterNodeFacade) Batch(siteID string, bucket string, updateBatch *UpdateBatch) (BatchResult, error)

func (*ClusterNodeFacade) CheckLocalSnapshotStatus

func (clusterFacade *ClusterNodeFacade) CheckLocalSnapshotStatus(snapshotId string) error

func (*ClusterNodeFacade) ClusterClient

func (clusterFacade *ClusterNodeFacade) ClusterClient() *client.Client

func (*ClusterNodeFacade) ClusterNodes

func (clusterFacade *ClusterNodeFacade) ClusterNodes() []NodeConfig

func (*ClusterNodeFacade) ClusterSettings

func (clusterFacade *ClusterNodeFacade) ClusterSettings() ClusterSettings

func (*ClusterNodeFacade) ClusterSnapshot

func (clusterFacade *ClusterNodeFacade) ClusterSnapshot(ctx context.Context) (Snapshot, error)

func (*ClusterNodeFacade) Decommission

func (clusterFacade *ClusterNodeFacade) Decommission() error

func (*ClusterNodeFacade) DecommissionPeer

func (clusterFacade *ClusterNodeFacade) DecommissionPeer(nodeID uint64) error

func (*ClusterNodeFacade) Get

func (clusterFacade *ClusterNodeFacade) Get(siteID string, bucket string, keys [][]byte) ([]*SiblingSet, error)

func (*ClusterNodeFacade) GetMatches

func (clusterFacade *ClusterNodeFacade) GetMatches(siteID string, bucket string, keys [][]byte) (SiblingSetIterator, error)

func (*ClusterNodeFacade) GetRelayStatus

func (clusterFacade *ClusterNodeFacade) GetRelayStatus(ctx context.Context, relayID string) (RelayStatus, error)

func (*ClusterNodeFacade) LocalBatch

func (clusterFacade *ClusterNodeFacade) LocalBatch(partitionNumber uint64, siteID string, bucketName string, updateBatch *UpdateBatch) (map[string]*SiblingSet, error)

func (*ClusterNodeFacade) LocalGet

func (clusterFacade *ClusterNodeFacade) LocalGet(partitionNumber uint64, siteID string, bucketName string, keys [][]byte) ([]*SiblingSet, error)

func (*ClusterNodeFacade) LocalGetMatches

func (clusterFacade *ClusterNodeFacade) LocalGetMatches(partitionNumber uint64, siteID string, bucketName string, keys [][]byte) (SiblingSetIterator, error)

func (*ClusterNodeFacade) LocalGetRelayStatus

func (clusterFacade *ClusterNodeFacade) LocalGetRelayStatus(relayID string) (RelayStatus, error)

func (*ClusterNodeFacade) LocalLogDump

func (clusterFacade *ClusterNodeFacade) LocalLogDump() (LogDump, error)

func (*ClusterNodeFacade) LocalMerge

func (clusterFacade *ClusterNodeFacade) LocalMerge(partitionNumber uint64, siteID string, bucketName string, patch map[string]*SiblingSet, broadcastToRelays bool) error

func (*ClusterNodeFacade) LocalNodeID

func (clusterFacade *ClusterNodeFacade) LocalNodeID() uint64

func (*ClusterNodeFacade) MoveRelay

func (clusterFacade *ClusterNodeFacade) MoveRelay(ctx context.Context, relayID string, siteID string) error

func (*ClusterNodeFacade) PartitionDistribution

func (clusterFacade *ClusterNodeFacade) PartitionDistribution() [][]uint64

func (*ClusterNodeFacade) PeerAddress

func (clusterFacade *ClusterNodeFacade) PeerAddress(nodeID uint64) PeerAddress

func (*ClusterNodeFacade) RemoveNode

func (clusterFacade *ClusterNodeFacade) RemoveNode(ctx context.Context, nodeID uint64) error

func (*ClusterNodeFacade) RemoveRelay

func (clusterFacade *ClusterNodeFacade) RemoveRelay(ctx context.Context, relayID string) error

func (*ClusterNodeFacade) RemoveSite

func (clusterFacade *ClusterNodeFacade) RemoveSite(ctx context.Context, siteID string) error

func (*ClusterNodeFacade) ReplaceNode

func (clusterFacade *ClusterNodeFacade) ReplaceNode(ctx context.Context, nodeID uint64, replacementNodeID uint64) error

func (*ClusterNodeFacade) TokenAssignments

func (clusterFacade *ClusterNodeFacade) TokenAssignments() []uint64

func (*ClusterNodeFacade) WriteLocalSnapshot

func (clusterFacade *ClusterNodeFacade) WriteLocalSnapshot(snapshotId string, w io.Writer) error

type ClusterNodePartitionUpdater

type ClusterNodePartitionUpdater interface {
	UpdatePartition(partitionNumber uint64)
}

type ClusterNodeStateCoordinator

type ClusterNodeStateCoordinator struct {
	// contains filtered or unexported fields
}

func NewClusterNodeStateCoordinator

func NewClusterNodeStateCoordinator(nodeFacade ClusterNodeCoordinatorFacade, partitionUpdater ClusterNodePartitionUpdater) *ClusterNodeStateCoordinator

func (*ClusterNodeStateCoordinator) InitializeNodeState

func (coordinator *ClusterNodeStateCoordinator) InitializeNodeState()

func (*ClusterNodeStateCoordinator) ProcessClusterUpdates

func (coordinator *ClusterNodeStateCoordinator) ProcessClusterUpdates(deltas []ClusterStateDelta)

type Node

type Node interface {
	// Start up the node.
	// Case 1) This node is not yet part of a cluster
	//   It will use the initialization options to figure out whether it should start a new cluster or join an existing one.
	// Case 2) This node is part of a cluster and the decomissioning flag is not set
	//   It should start up and resume its operations as a member of its cluster. Start will run until Stop is called,
	//   in which case it will return nil, or until the node is removed from the cluster in which case it returns ERemoved
	//   or EDecommissioned
	// Case 3) This node is part of a cluster and the decomissioning flag is set
	//   It should start up in decomissioning mode, allowing only operations
	//   which transfer its partitions to new owners. After it has been removed from the cluster
	//   Start returns EDecomissioned or ERemoved
	// EDecomissioned is returned when the node was removed from the cluster after successfully transferring away all its
	// data to other nodes in the cluster
	// ERemoved is returned when the node was removed from the cluster before successfully transferring away all its data
	// to other nodes in the cluster
	ID() uint64
	Start(options NodeInitializationOptions) error
	// Shut down the node
	Stop()
	Batch(ctx context.Context, partition uint64, siteID string, bucket string, updateBatch *UpdateBatch) (map[string]*SiblingSet, error)
	Merge(ctx context.Context, partition uint64, siteID string, bucket string, patch map[string]*SiblingSet, broadcastToRelays bool) error
	Get(ctx context.Context, partition uint64, siteID string, bucket string, keys [][]byte) ([]*SiblingSet, error)
	GetMatches(ctx context.Context, partition uint64, siteID string, bucket string, keys [][]byte) (SiblingSetIterator, error)
	RelayStatus(relayID string) (RelayStatus, error)
}

A Node coordinates interactions between internal node components

type NodeClient

type NodeClient struct {
	// contains filtered or unexported fields
}

func NewNodeClient

func NewNodeClient(localNode Node, configController ClusterConfigController) *NodeClient

func (*NodeClient) Batch

func (nodeClient *NodeClient) Batch(ctx context.Context, nodeID uint64, partition uint64, siteID string, bucket string, updateBatch *UpdateBatch) (map[string]*SiblingSet, error)

func (*NodeClient) Get

func (nodeClient *NodeClient) Get(ctx context.Context, nodeID uint64, partition uint64, siteID string, bucket string, keys [][]byte) ([]*SiblingSet, error)

func (*NodeClient) GetMatches

func (nodeClient *NodeClient) GetMatches(ctx context.Context, nodeID uint64, partition uint64, siteID string, bucket string, keys [][]byte) (SiblingSetIterator, error)

func (*NodeClient) LocalNodeID

func (nodeClient *NodeClient) LocalNodeID() uint64

func (*NodeClient) Merge

func (nodeClient *NodeClient) Merge(ctx context.Context, nodeID uint64, partition uint64, siteID string, bucket string, patch map[string]*SiblingSet, broadcastToRelays bool) error

func (*NodeClient) RelayStatus

func (nodeClient *NodeClient) RelayStatus(ctx context.Context, nodeID uint64, siteID string, relayID string) (RelayStatus, error)

type NodeCoordinatorFacade

type NodeCoordinatorFacade struct {
	// contains filtered or unexported fields
}

func (*NodeCoordinatorFacade) AddPartition

func (nodeFacade *NodeCoordinatorFacade) AddPartition(partitionNumber uint64)

func (*NodeCoordinatorFacade) AddRelay

func (nodeFacade *NodeCoordinatorFacade) AddRelay(relayID string)

func (*NodeCoordinatorFacade) AddSite

func (nodeFacade *NodeCoordinatorFacade) AddSite(siteID string)

func (*NodeCoordinatorFacade) DisableOutgoingTransfers

func (nodeFacade *NodeCoordinatorFacade) DisableOutgoingTransfers(partitionNumber uint64)

func (*NodeCoordinatorFacade) DisconnectRelays

func (nodeFacade *NodeCoordinatorFacade) DisconnectRelays(partitionNumber uint64)

func (*NodeCoordinatorFacade) EnableOutgoingTransfers

func (nodeFacade *NodeCoordinatorFacade) EnableOutgoingTransfers(partitionNumber uint64)

func (*NodeCoordinatorFacade) HeldPartitionReplicas

func (nodeFacade *NodeCoordinatorFacade) HeldPartitionReplicas() map[uint64]map[uint64]bool

func (*NodeCoordinatorFacade) ID

func (nodeFacade *NodeCoordinatorFacade) ID() uint64

func (*NodeCoordinatorFacade) LockPartitionReads

func (nodeFacade *NodeCoordinatorFacade) LockPartitionReads(partitionNumber uint64)

func (*NodeCoordinatorFacade) LockPartitionWrites

func (nodeFacade *NodeCoordinatorFacade) LockPartitionWrites(partitionNumber uint64)

func (*NodeCoordinatorFacade) MoveRelay

func (nodeFacade *NodeCoordinatorFacade) MoveRelay(relayID string, siteID string)

func (*NodeCoordinatorFacade) NeighborsWithCapacity

func (nodeFacade *NodeCoordinatorFacade) NeighborsWithCapacity() int

func (*NodeCoordinatorFacade) NotifyEmpty

func (nodeFacade *NodeCoordinatorFacade) NotifyEmpty()

func (*NodeCoordinatorFacade) NotifyJoinedCluster

func (nodeFacade *NodeCoordinatorFacade) NotifyJoinedCluster()

func (*NodeCoordinatorFacade) NotifyLeftCluster

func (nodeFacade *NodeCoordinatorFacade) NotifyLeftCluster()

func (*NodeCoordinatorFacade) OwnedPartitionReplicas

func (nodeFacade *NodeCoordinatorFacade) OwnedPartitionReplicas() map[uint64]map[uint64]bool

func (*NodeCoordinatorFacade) RemovePartition

func (nodeFacade *NodeCoordinatorFacade) RemovePartition(partitionNumber uint64)

func (*NodeCoordinatorFacade) RemoveRelay

func (nodeFacade *NodeCoordinatorFacade) RemoveRelay(relayID string)

func (*NodeCoordinatorFacade) RemoveSite

func (nodeFacade *NodeCoordinatorFacade) RemoveSite(siteID string)

func (*NodeCoordinatorFacade) StartIncomingTransfer

func (nodeFacade *NodeCoordinatorFacade) StartIncomingTransfer(partitionNumber uint64, replicaNumber uint64)

func (*NodeCoordinatorFacade) StopIncomingTransfer

func (nodeFacade *NodeCoordinatorFacade) StopIncomingTransfer(partitionNumber uint64, replicaNumber uint64)

func (*NodeCoordinatorFacade) UnlockPartitionReads

func (nodeFacade *NodeCoordinatorFacade) UnlockPartitionReads(partitionNumber uint64)

func (*NodeCoordinatorFacade) UnlockPartitionWrites

func (nodeFacade *NodeCoordinatorFacade) UnlockPartitionWrites(partitionNumber uint64)

type NodeInitializationOptions

type NodeInitializationOptions struct {
	StartCluster      bool
	JoinCluster       bool
	ClusterSettings   ClusterSettings
	SeedNodeHost      string
	SeedNodePort      int
	ClusterHost       string
	ClusterPort       int
	ExternalHost      string
	ExternalPort      int
	SyncMaxSessions   uint
	SyncPathLimit     uint32
	SyncPeriod        uint
	SnapshotDirectory string
}

func (NodeInitializationOptions) ClusterAddress

func (options NodeInitializationOptions) ClusterAddress() (host string, port int)

func (NodeInitializationOptions) ExternalAddress

func (options NodeInitializationOptions) ExternalAddress() (host string, port int)

func (NodeInitializationOptions) SeedNode

func (options NodeInitializationOptions) SeedNode() (host string, port int)

func (NodeInitializationOptions) ShouldJoinCluster

func (options NodeInitializationOptions) ShouldJoinCluster() bool

func (NodeInitializationOptions) ShouldStartCluster

func (options NodeInitializationOptions) ShouldStartCluster() bool

func (NodeInitializationOptions) SnapshotsEnabled

func (options NodeInitializationOptions) SnapshotsEnabled() bool

type NodePartitionUpdater

type NodePartitionUpdater struct {
	// contains filtered or unexported fields
}

func NewNodePartitionUpdater

func NewNodePartitionUpdater(nodeFacade ClusterNodeCoordinatorFacade) *NodePartitionUpdater

func (*NodePartitionUpdater) UpdatePartition

func (partitionUpdater *NodePartitionUpdater) UpdatePartition(partitionNumber uint64)

type PartitionResolver

type PartitionResolver struct {
	// contains filtered or unexported fields
}

func NewPartitionResolver

func NewPartitionResolver(configController ClusterConfigController) *PartitionResolver

func (*PartitionResolver) Partition

func (partitionResolver *PartitionResolver) Partition(partitioningKey string) uint64

func (*PartitionResolver) ReplicaNodes

func (partitionResolver *PartitionResolver) ReplicaNodes(partition uint64) []uint64

type Snapshotter

type Snapshotter struct {
	// contains filtered or unexported fields
}

func (*Snapshotter) CheckSnapshotStatus

func (snapshotter *Snapshotter) CheckSnapshotStatus(snapshotId string) error

func (*Snapshotter) Snapshot

func (snapshotter *Snapshotter) Snapshot(snapshotIndex uint64, snapshotId string) error

func (*Snapshotter) WriteSnapshot

func (snapshotter *Snapshotter) WriteSnapshot(snapshotId string, w io.Writer) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL