Documentation ¶
Overview ¶
Package raft implements a Consensus component for IPFS Cluster which uses Raft (go-libp2p-raft).
Index ¶
- Variables
- func NewConsensusWithRPCClient(staging bool) ...
- func ValidateConfig(cfg *ClusterRaftConfig) error
- type ClusterRaftConfig
- type Consensus
- func (cc *Consensus) AddPeer(ctx context.Context, pid peer.ID) error
- func (cc *Consensus) Clean(ctx context.Context) error
- func (cc *Consensus) Commit(ctx context.Context, op *ConsensusOp) error
- func (cc *Consensus) Distrust(ctx context.Context, pid peer.ID) error
- func (cc *Consensus) IsLeader(ctx context.Context) bool
- func (cc *Consensus) IsTrustedPeer(ctx context.Context, p peer.ID) bool
- func (cc *Consensus) Leader(ctx context.Context) (peer.ID, error)
- func (cc *Consensus) Peers(ctx context.Context) ([]peer.ID, error)
- func (cc *Consensus) Ready(ctx context.Context) <-chan struct{}
- func (cc *Consensus) RedirectToLeader(method string, arg interface{}, ret interface{}) (bool, error)
- func (cc *Consensus) RmPeer(ctx context.Context, pid peer.ID) error
- func (cc *Consensus) Shutdown(ctx context.Context) error
- func (cc *Consensus) State(ctx context.Context) (*RaftState, error)
- func (cc *Consensus) Trust(ctx context.Context, pid peer.ID) error
- func (cc *Consensus) WaitForSync(ctx context.Context) error
- type ConsensusAPI
- type ConsensusOp
- type RaftState
Constants ¶
This section is empty.
Variables ¶
var (
	DefaultDataSubFolder        = "raft-cluster"
	DefaultWaitForLeaderTimeout = 15 * time.Second
	DefaultCommitRetries        = 1
	DefaultNetworkTimeout       = 100 * time.Second
	DefaultCommitRetryDelay     = 200 * time.Millisecond
	DefaultBackupsRotate        = 6
)
Configuration defaults
var RaftLogCacheSize = 512
RaftLogCacheSize is the maximum number of logs to cache in-memory. This is used to reduce disk I/O for the recently committed entries.
var RaftMaxSnapshots = 5
RaftMaxSnapshots indicates how many snapshots to keep in the consensus data folder. TODO: Maybe include this in Config. Not sure how useful it is to touch this anyways.
Functions ¶
func NewConsensusWithRPCClient ¶
func NewConsensusWithRPCClient(staging bool) func(
	host host.Host,
	cfg *ClusterRaftConfig,
	rpcClient *rpc.Client,
	mpool *messagepool.MessagePool,
	repo repo.LockedRepo,
) (*Consensus, error)
TODO: Merge with NewConsensus and remove the rpcReady chan
func ValidateConfig ¶
func ValidateConfig(cfg *ClusterRaftConfig) error
ValidateConfig checks that this configuration has working values, at least in appearance.
Types ¶
type ClusterRaftConfig ¶
type ClusterRaftConfig struct {
	// ClusterModeEnabled enables the node cluster with Raft consensus.
	ClusterModeEnabled bool
	// A folder to store Raft's data.
	DataFolder string
	// InitPeerset provides the list of initial cluster peers for new Raft
	// peers (with no prior state). It is ignored when Raft was already
	// initialized or when starting in staging mode.
	InitPeerset []string
	// WaitForLeaderTimeout specifies how long to wait for a leader before
	// failing an operation.
	WaitForLeaderTimeout time.Duration
	// NetworkTimeout specifies how long before a Raft network
	// operation is timed out.
	NetworkTimeout time.Duration
	// CommitRetries specifies how many times we retry a failed commit until
	// we give up.
	CommitRetries int
	// How long to wait between retries.
	CommitRetryDelay time.Duration
	// BackupsRotate specifies the maximum number of Raft's DataFolder
	// copies that we keep as backups (renaming) after cleanup.
	BackupsRotate int
	// A Hashicorp Raft's configuration object.
	RaftConfig *hraft.Config
	// Tracing enables propagation of contexts across binary boundaries.
	Tracing bool
}
ClusterRaftConfig configures the Raft Consensus component for the node cluster.
func DefaultClusterRaftConfig ¶
func DefaultClusterRaftConfig() *ClusterRaftConfig
func NewClusterRaftConfig ¶
func NewClusterRaftConfig(userRaftConfig *config.UserRaftConfig) *ClusterRaftConfig
func (*ClusterRaftConfig) GetDataFolder ¶
func (cfg *ClusterRaftConfig) GetDataFolder(repo repo.LockedRepo) string
GetDataFolder returns the Raft data folder that we are using.
type Consensus ¶
Consensus handles the work of keeping a shared state between the peers of a Lotus Cluster, as well as modifying that state and applying any updates in a thread-safe manner.
func NewConsensus ¶
func NewConsensus(host host.Host, cfg *ClusterRaftConfig, mpool *messagepool.MessagePool, repo repo.LockedRepo, staging bool) (*Consensus, error)
NewConsensus builds a new ClusterConsensus component using Raft.
Raft saves state snapshots regularly and persists log data in a bolt datastore. Therefore, unless memory usage is a concern, it is recommended to use an in-memory go-datastore as store parameter.
The staging parameter controls if the Raft peer should start in staging mode (used when joining a new Raft peerset with other peers).
func (*Consensus) AddPeer ¶
AddPeer adds a new peer to participate in this consensus. It will forward the operation to the leader if this is not it.
func (*Consensus) Commit ¶
func (cc *Consensus) Commit(ctx context.Context, op *ConsensusOp) error
Commit submits a cc.consensus commit. It retries upon failures.
func (*Consensus) IsTrustedPeer ¶
IsTrustedPeer returns true. In Raft we trust all peers.
func (*Consensus) Leader ¶
Leader returns the peerID of the Leader of the cluster. It returns an error when there is no leader.
func (*Consensus) Peers ¶
Peers returns the current list of peers in the consensus. The list will be sorted alphabetically.
func (*Consensus) Ready ¶
Ready returns a channel which is signaled when the Consensus algorithm has finished bootstrapping and is ready to use.
func (*Consensus) RedirectToLeader ¶
func (cc *Consensus) RedirectToLeader(method string, arg interface{}, ret interface{}) (bool, error)
RedirectToLeader returns true if the operation was redirected to the leader. Note that if the leader just disappeared, the RPC call will fail because we haven't heard that it's gone.
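The forwarding pattern behind this method can be sketched as follows. Everything here is a hypothetical stand-in: node, rpcCaller, and string peer names replace the package's Consensus, RPC client, and peer.ID so that the example is self-contained. How the real method behaves when no leader is known is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// rpcCaller is a hypothetical stand-in for an RPC client call to a peer.
type rpcCaller func(dest, method string, arg, ret interface{}) error

// node is a hypothetical stand-in for a consensus participant.
type node struct {
	self   string
	leader func() (string, error)
	call   rpcCaller
}

// redirectToLeader forwards the call to the leader when this node is not
// the leader. The boolean reports whether forwarding took place.
func (n *node) redirectToLeader(method string, arg, ret interface{}) (bool, error) {
	leader, err := n.leader()
	if err != nil {
		return false, err // assumed: no known leader means nothing was forwarded
	}
	if leader == n.self {
		return false, nil // we are the leader, so the caller handles it locally
	}
	// A stale leader shows up here as an RPC error.
	return true, n.call(leader, method, arg, ret)
}

func main() {
	n := &node{
		self:   "peer-a",
		leader: func() (string, error) { return "peer-b", nil },
		call: func(dest, method string, arg, ret interface{}) error {
			fmt.Printf("forwarding %s to %s\n", method, dest)
			return nil
		},
	}
	fmt.Println(n.redirectToLeader("AddPeer", "peer-c", nil))

	n.leader = func() (string, error) { return "", errors.New("no leader") }
	fmt.Println(n.redirectToLeader("AddPeer", "peer-c", nil))
}
```

A false result with a nil error tells the caller to perform the operation itself, which is how methods such as AddPeer and RmPeer can share one forwarding helper.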
func (*Consensus) RmPeer ¶
RmPeer removes a peer from this consensus. It will forward the operation to the leader if this is not it.
func (*Consensus) Shutdown ¶
Shutdown stops the component so it will not process any more updates. The underlying consensus is permanently shut down, along with the libp2p transport.
func (*Consensus) State ¶
State retrieves the current consensus RaftState. It may error if no RaftState has been agreed upon or the state is not consistent. The returned RaftState is the last agreed-upon RaftState known by this node. No writes are allowed, as all writes to the shared state should happen through the Consensus component methods.
type ConsensusAPI ¶
type ConsensusAPI interface {
	// Returns a channel to signal that the consensus layer is ready
	// allowing the main component to wait for it during start.
	Ready(context.Context) <-chan struct{}

	AddPeer(context.Context, peer.ID) error
	RmPeer(context.Context, peer.ID) error
	State(context.Context) (consensus.State, error)
	// Provides the node responsible for performing
	// specific tasks which must run in only one cluster peer.
	Leader(context.Context) (peer.ID, error)
	// Only returns when the consensus state has all log
	// updates applied to it.
	WaitForSync(context.Context) error
	// Clean removes all consensus data.
	Clean(context.Context) error
	// Peers returns the peerset participating in the Consensus.
	Peers(context.Context) ([]peer.ID, error)
	// IsTrustedPeer returns true if the given peer is "trusted".
	// This will grant access to more RPC endpoints than a
	// non-trusted one. This should be fast as it will be
	// called repeatedly for every remote RPC request.
	IsTrustedPeer(context.Context, peer.ID) bool
	// Trust marks a peer as "trusted".
	Trust(context.Context, peer.ID) error
	// Distrust removes a peer from the "trusted" set.
	Distrust(context.Context, peer.ID) error
	// Returns true if the current node is the cluster leader.
	IsLeader(ctx context.Context) bool

	Shutdown(context.Context) error
}
type ConsensusOp ¶
type RaftState ¶
type RaftState struct {
	NonceMap api.NonceMapType
	MsgUuids api.MsgUuidMapType

	// This is because the miner only stores signed CIDs but the message received in a
	// block will be unsigned (for BLS). Hence, the process relies on the node to store the
	// signed message which holds a copy of the unsigned message to properly perform all the
	// needed checks.
	Mpool *messagepool.MessagePool
}