README ¶
raft
raft is a Go library that manages a replicated log and can be used with an FSM to manage replicated state machines. It is a library for providing consensus.
The use cases for such a library are far-reaching as replicated state machines are a key component of many distributed systems. They enable building Consistent, Partition Tolerant (CP) systems, with limited fault tolerance as well.
Building
If you wish to build raft you'll need Go version 1.2+ installed.
Please check your installation with:
go version
Documentation
For complete documentation, see the associated Godoc.
To prevent complications with cgo, the primary backend MDBStore is in a separate repository, called raft-mdb. That is the recommended implementation for the LogStore and StableStore.
A pure Go backend using BoltDB is also available, called raft-boltdb. It can also be used as a LogStore and StableStore.
Protocol
raft is based on "Raft: In Search of an Understandable Consensus Algorithm".
A high-level overview of the Raft protocol is described below, but for details please read the full Raft paper followed by the raft source. Any questions about the raft protocol should be sent to the raft-dev mailing list.
Protocol Description
Raft nodes are always in one of three states: follower, candidate, or leader. All nodes initially start out as a follower. In this state, nodes can accept log entries from a leader and cast votes. If no entries are received for some time, nodes self-promote to the candidate state. In the candidate state, nodes request votes from their peers. If a candidate receives a quorum of votes, then it is promoted to a leader. The leader must accept new log entries and replicate them to all the other followers. In addition, if stale reads are not acceptable, all queries must also be performed on the leader.
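The transitions above can be sketched as a small state machine. This is a simplified illustration under stated assumptions. The nodeState and event names are hypothetical, and terms and vote counting are left out. The library's real RaftState handling is more involved.

```go
package main

import "fmt"

// nodeState mirrors the three protocol states (hypothetical names).
type nodeState int

const (
	follower nodeState = iota
	candidate
	leader
)

func (s nodeState) String() string {
	switch s {
	case follower:
		return "follower"
	case candidate:
		return "candidate"
	case leader:
		return "leader"
	}
	return "unknown"
}

// event is a simplified stimulus that can drive a state change.
type event int

const (
	heartbeatTimeout event = iota // nothing heard from a leader for a while
	wonQuorum                     // candidate collected votes from a quorum
	sawHigherTerm                 // another node has a newer term
)

// next returns the state after an event; events that do not apply to the
// current state leave it unchanged.
func next(s nodeState, e event) nodeState {
	switch {
	case s == follower && e == heartbeatTimeout:
		return candidate // self-promote and request votes
	case s == candidate && e == heartbeatTimeout:
		return candidate // election timed out: start another one
	case s == candidate && e == wonQuorum:
		return leader
	case e == sawHigherTerm:
		return follower // any state steps down on a newer term
	}
	return s
}

func main() {
	s := follower
	for _, e := range []event{heartbeatTimeout, wonQuorum, sawHigherTerm} {
		s = next(s, e)
		fmt.Println(s) // candidate, then leader, then follower
	}
}
```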
Once a cluster has a leader, it is able to accept new log entries. A client can request that a leader append a new log entry, which is an opaque binary blob to Raft. The leader then writes the entry to durable storage and attempts to replicate it to a quorum of followers. Once the log entry is considered committed, it can be applied to a finite state machine. The finite state machine is application specific, and is implemented using an interface.
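The commit rule can be illustrated with a short calculation. An entry counts as committed once it is stored on a quorum, so the leader can take the quorum-th highest log index across all nodes, counting itself. This is a minimal sketch with a hypothetical commitIndex helper. It omits the full Raft rule that a leader only advances its commit index this way for entries from its current term.

```go
package main

import (
	"fmt"
	"sort"
)

// commitIndex returns the highest log index known to be stored on a quorum,
// given the last durable index on each node (the leader's own included).
func commitIndex(matchIndex []uint64) uint64 {
	if len(matchIndex) == 0 {
		return 0
	}
	sorted := append([]uint64(nil), matchIndex...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] > sorted[j] })
	quorum := len(sorted)/2 + 1
	// The quorum-th largest value is present on at least `quorum` nodes.
	return sorted[quorum-1]
}

func main() {
	// Leader at 10, followers at 9, 7, 4 and 3 in a 5-node cluster.
	fmt.Println(commitIndex([]uint64{10, 9, 7, 4, 3})) // 7
}
```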
An obvious question relates to the unbounded nature of a replicated log. Raft provides a mechanism by which the current state is snapshotted, and the log is compacted. Because of the FSM abstraction, restoring the state of the FSM must result in the same state as a replay of old logs. This allows Raft to capture the FSM state at a point in time, and then remove all the logs that were used to reach that state. This is performed automatically without user intervention, and prevents unbounded disk usage as well as minimizing time spent replaying logs.
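As a rough illustration of why a deterministic FSM makes compaction safe, the sketch below applies a few hypothetical "set" commands and snapshots the resulting map as JSON. It then checks that restoring the snapshot produces the same state as replaying the logs. The entry and kv types are assumptions made for this example and are not part of the raft API.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// entry is a hypothetical command: set Key to Value.
type entry struct {
	Key, Value string
}

// kv is deterministic: the same entries in the same order give the same map.
type kv map[string]string

func (s kv) apply(e entry) { s[e.Key] = e.Value }

// snapshot serializes the current state.
func (s kv) snapshot() ([]byte, error) { return json.Marshal(s) }

// restore rebuilds state from scratch using only the snapshot bytes.
func restore(b []byte) (kv, error) {
	s := kv{}
	err := json.Unmarshal(b, &s)
	return s, err
}

func main() {
	logs := []entry{{"a", "1"}, {"b", "2"}, {"a", "3"}}

	replayed := kv{}
	for _, e := range logs {
		replayed.apply(e)
	}

	snap, err := replayed.snapshot()
	if err != nil {
		panic(err)
	}
	// Once the snapshot is durable, the three log entries are no longer needed.
	restored, err := restore(snap)
	if err != nil {
		panic(err)
	}
	fmt.Println(reflect.DeepEqual(replayed, restored)) // true
}
```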
Lastly, there is the issue of updating the peer set when new servers are joining or existing servers are leaving. As long as a quorum of nodes is available, this is not an issue, as Raft provides mechanisms to dynamically update the peer set. If a quorum of nodes is unavailable, then this becomes a very challenging issue. For example, suppose there are only 2 peers, A and B. The quorum size is also 2, meaning both nodes must agree to commit a log entry. If either A or B fails, it is now impossible to reach quorum. This means the cluster is unable to add or remove a node, or to commit any additional log entries, which results in unavailability. At this point, manual intervention would be required to remove either A or B, and to restart the remaining node in bootstrap mode.
A Raft cluster of 3 nodes can tolerate a single node failure, while a cluster of 5 can tolerate 2 node failures. The recommended configuration is to run either 3 or 5 raft servers. This maximizes availability without greatly sacrificing performance.
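The quorum arithmetic behind these recommendations is simple enough to compute directly. In this sketch (the helper names are hypothetical), a 4-node cluster tolerates only one failure, the same as 3 nodes, which is why odd cluster sizes are preferred. The 2-node row matches the A/B example above: a quorum of 2 and no tolerated failures.

```go
package main

import "fmt"

// quorumSize is the number of votes needed for a majority of n voters.
func quorumSize(n int) int { return n/2 + 1 }

// tolerableFailures is how many voters can fail while a quorum remains.
func tolerableFailures(n int) int { return n - quorumSize(n) }

func main() {
	for _, n := range []int{1, 2, 3, 4, 5} {
		fmt.Printf("nodes=%d quorum=%d tolerates=%d\n", n, quorumSize(n), tolerableFailures(n))
	}
}
```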
In terms of performance, Raft is comparable to Paxos. Assuming stable leadership, committing a log entry requires a single round trip to half of the cluster. Thus performance is bounded by disk I/O and network latency.
Documentation ¶
Index ¶
- Constants
- Variables
- func AddUniquePeer(peers []net.Addr, peer net.Addr) []net.Addr
- func ExcludePeer(peers []net.Addr, peer net.Addr) []net.Addr
- func NewInmemTransport() (*InmemAddr, *InmemTransport)
- func PeerContained(peers []net.Addr, peer net.Addr) bool
- func ValidateConfig(config *Config) error
- type AppendEntriesRequest
- type AppendEntriesResponse
- type AppendFuture
- type AppendPipeline
- type ApplyFuture
- type Config
- type FSM
- type FSMSnapshot
- type FileSnapshotSink
- type FileSnapshotStore
- type Future
- type InmemAddr
- type InmemStore
- func (i *InmemStore) DeleteRange(min, max uint64) error
- func (i *InmemStore) FirstIndex() (uint64, error)
- func (i *InmemStore) Get(key []byte) ([]byte, error)
- func (i *InmemStore) GetLog(index uint64, log *Log) error
- func (i *InmemStore) GetUint64(key []byte) (uint64, error)
- func (i *InmemStore) LastIndex() (uint64, error)
- func (i *InmemStore) Set(key []byte, val []byte) error
- func (i *InmemStore) SetUint64(key []byte, val uint64) error
- func (i *InmemStore) StoreLog(log *Log) error
- func (i *InmemStore) StoreLogs(logs []*Log) error
- type InmemTransport
- func (i *InmemTransport) AppendEntries(target net.Addr, args *AppendEntriesRequest, resp *AppendEntriesResponse) error
- func (i *InmemTransport) AppendEntriesPipeline(target net.Addr) (AppendPipeline, error)
- func (i *InmemTransport) Connect(peer net.Addr, trans *InmemTransport)
- func (i *InmemTransport) Consumer() <-chan RPC
- func (i *InmemTransport) DecodePeer(buf []byte) net.Addr
- func (i *InmemTransport) Disconnect(peer net.Addr)
- func (i *InmemTransport) DisconnectAll()
- func (i *InmemTransport) EncodePeer(p net.Addr) []byte
- func (i *InmemTransport) InstallSnapshot(target net.Addr, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, ...) error
- func (i *InmemTransport) LocalAddr() net.Addr
- func (i *InmemTransport) RequestVote(target net.Addr, args *RequestVoteRequest, resp *RequestVoteResponse) error
- func (i *InmemTransport) SetHeartbeatHandler(cb func(RPC))
- type InstallSnapshotRequest
- type InstallSnapshotResponse
- type JSONPeers
- type Log
- type LogCache
- type LogStore
- type LogType
- type NetworkTransport
- func (n *NetworkTransport) AppendEntries(target net.Addr, args *AppendEntriesRequest, resp *AppendEntriesResponse) error
- func (n *NetworkTransport) AppendEntriesPipeline(target net.Addr) (AppendPipeline, error)
- func (n *NetworkTransport) Close() error
- func (n *NetworkTransport) Consumer() <-chan RPC
- func (n *NetworkTransport) DecodePeer(buf []byte) net.Addr
- func (n *NetworkTransport) EncodePeer(p net.Addr) []byte
- func (n *NetworkTransport) InstallSnapshot(target net.Addr, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, ...) error
- func (n *NetworkTransport) IsShutdown() bool
- func (n *NetworkTransport) LocalAddr() net.Addr
- func (n *NetworkTransport) RequestVote(target net.Addr, args *RequestVoteRequest, resp *RequestVoteResponse) error
- func (n *NetworkTransport) SetHeartbeatHandler(cb func(rpc RPC))
- type PeerStore
- type RPC
- type RPCResponse
- type Raft
- func (r *Raft) AddPeer(peer net.Addr) Future
- func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture
- func (r *Raft) Barrier(timeout time.Duration) Future
- func (r *Raft) LastContact() time.Time
- func (r *Raft) LastIndex() uint64
- func (r *Raft) Leader() net.Addr
- func (r *Raft) LeaderCh() <-chan bool
- func (r *Raft) RemovePeer(peer net.Addr) Future
- func (r *Raft) SetPeers(p []net.Addr) Future
- func (r *Raft) Shutdown() Future
- func (r *Raft) Snapshot() Future
- func (r *Raft) State() RaftState
- func (r *Raft) Stats() map[string]string
- func (r *Raft) String() string
- func (r *Raft) VerifyLeader() Future
- type RaftState
- type RequestVoteRequest
- type RequestVoteResponse
- type SnapshotMeta
- type SnapshotSink
- type SnapshotStore
- type StableStore
- type StaticPeers
- type StreamLayer
- type TCPStreamLayer
- type Transport
Constants ¶
const (
	// DefaultTimeoutScale is the default TimeoutScale in a NetworkTransport.
	DefaultTimeoutScale = 256 * 1024 // 256KB
)
Variables ¶
var (
	// ErrTransportShutdown is returned when operations on a transport are
	// invoked after it's been terminated.
	ErrTransportShutdown = errors.New("transport shutdown")

	// ErrPipelineShutdown is returned when the pipeline is closed.
	ErrPipelineShutdown = errors.New("append pipeline closed")
)
var (
	// ErrLeader is returned when an operation can't be completed on a
	// leader node.
	ErrLeader = errors.New("node is the leader")

	// ErrNotLeader is returned when an operation can't be completed on a
	// follower or candidate node.
	ErrNotLeader = errors.New("node is not the leader")

	// ErrLeadershipLost is returned when a leader fails to commit a log entry
	// because it's been deposed in the process.
	ErrLeadershipLost = errors.New("leadership lost while committing log")

	// ErrRaftShutdown is returned when operations are requested against an
	// inactive Raft.
	ErrRaftShutdown = errors.New("raft is already shutdown")

	// ErrEnqueueTimeout is returned when a command fails due to a timeout.
	ErrEnqueueTimeout = errors.New("timed out enqueuing operation")

	// ErrKnownPeer is returned when trying to add a peer to the configuration
	// that already exists.
	ErrKnownPeer = errors.New("peer already known")

	// ErrUnknownPeer is returned when trying to remove a peer from the
	// configuration that doesn't exist.
	ErrUnknownPeer = errors.New("peer is unknown")
)
var (
	// ErrLogNotFound indicates a given log entry is not available.
	ErrLogNotFound = errors.New("log not found")

	// ErrPipelineReplicationNotSupported can be returned by the transport to
	// signal that pipeline replication is not supported in general, and that
	// no error message should be produced.
	ErrPipelineReplicationNotSupported = errors.New("pipeline replication not supported")
)
Functions ¶
func AddUniquePeer ¶
func AddUniquePeer(peers []net.Addr, peer net.Addr) []net.Addr
AddUniquePeer is used to add a peer to a list of existing peers only if it is not already contained in the list.
func ExcludePeer ¶
func ExcludePeer(peers []net.Addr, peer net.Addr) []net.Addr
ExcludePeer is used to exclude a single peer from a list of peers.
func NewInmemTransport ¶
func NewInmemTransport() (*InmemAddr, *InmemTransport)
NewInmemTransport is used to initialize a new transport and generates a random local address.
func PeerContained ¶
func PeerContained(peers []net.Addr, peer net.Addr) bool
PeerContained checks if a given peer is contained in a list.
func ValidateConfig ¶
func ValidateConfig(config *Config) error
ValidateConfig is used to validate that a configuration is sane.
Types ¶
type AppendEntriesRequest ¶
type AppendEntriesRequest struct {
	// Provide the current term and leader
	Term   uint64
	Leader []byte

	// Provide the previous entries for integrity checking
	PrevLogEntry uint64
	PrevLogTerm  uint64

	// New entries to commit
	Entries []*Log

	// Commit index on the leader
	LeaderCommitIndex uint64
}
AppendEntriesRequest is the command used to append entries to the replicated log.
type AppendEntriesResponse ¶
type AppendEntriesResponse struct {
	// Newer term if leader is out of date
	Term uint64

	// Last Log is a hint to help accelerate rebuilding slow nodes
	LastLog uint64

	// We may not succeed if we have a conflicting entry
	Success bool
}
AppendEntriesResponse is the response returned from an AppendEntriesRequest.
type AppendFuture ¶
type AppendFuture interface {
	Future
	Start() time.Time
	Request() *AppendEntriesRequest
	Response() *AppendEntriesResponse
}
AppendFuture is used to return information about a pipelined AppendEntries request
type AppendPipeline ¶
type AppendPipeline interface {
	// AppendEntries is used to add another request to the pipeline.
	// The send may block which is an effective form of back-pressure.
	AppendEntries(args *AppendEntriesRequest, resp *AppendEntriesResponse) (AppendFuture, error)

	// Consumer returns a channel that can be used to consume
	// response futures when they are ready
	Consumer() <-chan AppendFuture

	// Closes pipeline and cancels all inflight RPCs
	Close() error
}
AppendPipeline is used for pipelining AppendEntries requests. It is used to increase the replication throughput by masking latency and better utilizing bandwidth.
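The pipelining idea can be illustrated with channels. In this hypothetical sketch, the sender enqueues requests without waiting for each reply, and a buffered channel bounds how many are in flight. Replies are delivered in order on a consumer channel, loosely mirroring the AppendEntries/Consumer/Close shape of AppendPipeline. The follower is simulated in-process, and no real RPCs are made.

```go
package main

import "fmt"

// request and response are simplified stand-ins for AppendEntriesRequest
// and AppendEntriesResponse.
type request struct {
	Index uint64
}

type response struct {
	Index   uint64
	Success bool
}

// pipeline: sends do not wait for replies, the buffer depth bounds the number
// of requests in flight (back-pressure), and replies arrive in send order.
type pipeline struct {
	inflight chan request
	done     chan response
}

func newPipeline(depth int) *pipeline {
	p := &pipeline{
		inflight: make(chan request, depth),
		done:     make(chan response, depth),
	}
	// Simulated follower: handles requests strictly in order.
	go func() {
		for req := range p.inflight {
			p.done <- response{Index: req.Index, Success: true}
		}
		close(p.done)
	}()
	return p
}

// appendEntries blocks only when the pipeline is full.
func (p *pipeline) appendEntries(req request) { p.inflight <- req }

// consumer delivers replies as they become ready.
func (p *pipeline) consumer() <-chan response { return p.done }

// shutdown stops accepting requests; the consumer channel closes once drained.
func (p *pipeline) shutdown() { close(p.inflight) }

func main() {
	p := newPipeline(4)
	go func() {
		for i := uint64(1); i <= 3; i++ {
			p.appendEntries(request{Index: i})
		}
		p.shutdown()
	}()
	for resp := range p.consumer() {
		fmt.Println(resp.Index, resp.Success)
	}
}
```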
type ApplyFuture ¶
type ApplyFuture interface {
	Future
	Response() interface{}
}
ApplyFuture is used for Apply() and can return the FSM response.
type Config ¶
type Config struct {
	// Time in follower state without a leader before we attempt an election
	HeartbeatTimeout time.Duration

	// Time in candidate state without a leader before we attempt an election
	ElectionTimeout time.Duration

	// Time without an Apply() operation before we heartbeat to ensure
	// a timely commit. Due to random staggering, may be delayed as much as
	// 2x this value.
	CommitTimeout time.Duration

	// MaxAppendEntries controls the maximum number of append entries
	// to send at once. We want to strike a balance between efficiency
	// and avoiding waste if the follower is going to reject because of
	// an inconsistent log.
	MaxAppendEntries int

	// If we are a member of a cluster, and RemovePeer is invoked for the
	// local node, then we forget all peers and transition into the follower state.
	// If ShutdownOnRemove is set, we additionally shut down Raft. Otherwise,
	// we can become a leader of a cluster containing only this node.
	ShutdownOnRemove bool

	// DisableBootstrapAfterElect is used to turn off EnableSingleNode
	// after the node is elected. This is used to prevent self-election
	// if the node is removed from the Raft cluster via RemovePeer. Setting
	// it to false will keep the bootstrap mode, allowing the node to self-elect
	// and potentially bootstrap a separate cluster.
	DisableBootstrapAfterElect bool

	// TrailingLogs controls how many logs we leave after a snapshot. This is
	// used so that we can quickly replay logs on a follower instead of being
	// forced to send an entire snapshot.
	TrailingLogs uint64

	// SnapshotInterval controls how often we check if we should perform a snapshot.
	// We randomly stagger between this value and 2x this value to prevent the entire
	// cluster from performing a snapshot at once.
	SnapshotInterval time.Duration

	// SnapshotThreshold controls how many outstanding logs there must be before
	// we perform a snapshot. This is to prevent excessive snapshots when we can
	// just replay a small set of logs.
	SnapshotThreshold uint64

	// EnableSingleNode allows for a single node mode of operation. This
	// is false by default, which prevents a lone node from electing itself
	// leader.
	EnableSingleNode bool

	// LeaderLeaseTimeout is used to control how long the "lease" lasts
	// for being the leader without being able to contact a quorum
	// of nodes. If we reach this interval without contact, we will
	// step down as leader.
	LeaderLeaseTimeout time.Duration

	// LogOutput is used as a sink for logs, unless Logger is specified.
	// Defaults to os.Stderr.
	LogOutput io.Writer

	// Logger is a user-provided logger. If nil, a logger writing to LogOutput
	// is used.
	Logger *log.Logger
}
Config provides any necessary configuration to the Raft server.
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a Config with usable defaults.
type FSM ¶
type FSM interface {
	// Apply log is invoked once a log entry is committed.
	Apply(*Log) interface{}

	// Snapshot is used to support log compaction. This call should
	// return an FSMSnapshot which can be used to save a point-in-time
	// snapshot of the FSM. Apply and Snapshot are not called in multiple
	// threads, but Apply will be called concurrently with Persist. This means
	// the FSM should be implemented in a fashion that allows for concurrent
	// updates while a snapshot is happening.
	Snapshot() (FSMSnapshot, error)

	// Restore is used to restore an FSM from a snapshot. It is not called
	// concurrently with any other command. The FSM must discard all previous
	// state.
	Restore(io.ReadCloser) error
}
FSM provides an interface that can be implemented by clients to make use of the replicated log.
type FSMSnapshot ¶
type FSMSnapshot interface {
	// Persist should dump all necessary state to the WriteCloser 'sink',
	// and call sink.Close() when finished or call sink.Cancel() on error.
	Persist(sink SnapshotSink) error

	// Release is invoked when we are finished with the snapshot
	Release()
}
FSMSnapshot is returned by an FSM in response to a Snapshot call. It must be safe to invoke FSMSnapshot methods with concurrent calls to Apply.
type FileSnapshotSink ¶
type FileSnapshotSink struct {
// contains filtered or unexported fields
}
FileSnapshotSink implements SnapshotSink with a file.
func (*FileSnapshotSink) Cancel ¶
func (s *FileSnapshotSink) Cancel() error
Cancel is used to indicate an unsuccessful end.
func (*FileSnapshotSink) Close ¶
func (s *FileSnapshotSink) Close() error
Close is used to indicate a successful end.
func (*FileSnapshotSink) ID ¶
func (s *FileSnapshotSink) ID() string
ID returns the ID of the snapshot, can be used with Open() after the snapshot is finalized.
type FileSnapshotStore ¶
type FileSnapshotStore struct {
// contains filtered or unexported fields
}
FileSnapshotStore implements the SnapshotStore interface and allows snapshots to be made on the local disk.
func NewFileSnapshotStore ¶
NewFileSnapshotStore creates a new FileSnapshotStore based on a base directory. The `retain` parameter controls how many snapshots are retained. Must be at least 1.
func (*FileSnapshotStore) Create ¶
func (f *FileSnapshotStore) Create(index, term uint64, peers []byte) (SnapshotSink, error)
Create is used to start a new snapshot.
func (*FileSnapshotStore) List ¶
func (f *FileSnapshotStore) List() ([]*SnapshotMeta, error)
List returns available snapshots in the store.
func (*FileSnapshotStore) Open ¶
func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error)
Open takes a snapshot ID and returns a ReadCloser for that snapshot.
func (*FileSnapshotStore) ReapSnapshots ¶
func (f *FileSnapshotStore) ReapSnapshots() error
ReapSnapshots reaps any snapshots beyond the retain count.
type Future ¶
type Future interface {
Error() error
}
Future is used to represent an action that may occur in the future.
type InmemAddr ¶
type InmemAddr struct {
ID string
}
InmemAddr implements the net.Addr interface.
func NewInmemAddr ¶
func NewInmemAddr() *InmemAddr
NewInmemAddr returns a new in-memory addr with a randomly generated UUID as the ID.
type InmemStore ¶
type InmemStore struct {
// contains filtered or unexported fields
}
InmemStore implements the LogStore and StableStore interfaces. It should NEVER be used in production; it is used only for unit tests. Use the MDBStore implementation instead.
func NewInmemStore ¶
func NewInmemStore() *InmemStore
NewInmemStore returns a new in-memory backend. Never use it in production; it is only for testing.
func (*InmemStore) DeleteRange ¶
func (i *InmemStore) DeleteRange(min, max uint64) error
DeleteRange implements the LogStore interface.
func (*InmemStore) FirstIndex ¶
func (i *InmemStore) FirstIndex() (uint64, error)
FirstIndex implements the LogStore interface.
func (*InmemStore) Get ¶
func (i *InmemStore) Get(key []byte) ([]byte, error)
Get implements the StableStore interface.
func (*InmemStore) GetLog ¶
func (i *InmemStore) GetLog(index uint64, log *Log) error
GetLog implements the LogStore interface.
func (*InmemStore) GetUint64 ¶
func (i *InmemStore) GetUint64(key []byte) (uint64, error)
GetUint64 implements the StableStore interface.
func (*InmemStore) LastIndex ¶
func (i *InmemStore) LastIndex() (uint64, error)
LastIndex implements the LogStore interface.
func (*InmemStore) Set ¶
func (i *InmemStore) Set(key []byte, val []byte) error
Set implements the StableStore interface.
func (*InmemStore) SetUint64 ¶
func (i *InmemStore) SetUint64(key []byte, val uint64) error
SetUint64 implements the StableStore interface.
func (*InmemStore) StoreLog ¶
func (i *InmemStore) StoreLog(log *Log) error
StoreLog implements the LogStore interface.
func (*InmemStore) StoreLogs ¶
func (i *InmemStore) StoreLogs(logs []*Log) error
StoreLogs implements the LogStore interface.
type InmemTransport ¶
InmemTransport implements the Transport interface, to allow Raft to be tested in memory without going over a network.
func (*InmemTransport) AppendEntries ¶
func (i *InmemTransport) AppendEntries(target net.Addr, args *AppendEntriesRequest, resp *AppendEntriesResponse) error
AppendEntries implements the Transport interface.
func (*InmemTransport) AppendEntriesPipeline ¶
func (i *InmemTransport) AppendEntriesPipeline(target net.Addr) (AppendPipeline, error)
AppendEntriesPipeline returns an interface that can be used to pipeline AppendEntries requests.
func (*InmemTransport) Connect ¶
func (i *InmemTransport) Connect(peer net.Addr, trans *InmemTransport)
Connect is used to connect this transport to another transport for a given peer name. This allows for local routing.
func (*InmemTransport) Consumer ¶
func (i *InmemTransport) Consumer() <-chan RPC
Consumer implements the Transport interface.
func (*InmemTransport) DecodePeer ¶
func (i *InmemTransport) DecodePeer(buf []byte) net.Addr
DecodePeer implements the Transport interface. It wraps the UUID in an InmemAddr.
func (*InmemTransport) Disconnect ¶
func (i *InmemTransport) Disconnect(peer net.Addr)
Disconnect is used to remove the ability to route to a given peer.
func (*InmemTransport) DisconnectAll ¶
func (i *InmemTransport) DisconnectAll()
DisconnectAll is used to remove all routes to peers.
func (*InmemTransport) EncodePeer ¶
func (i *InmemTransport) EncodePeer(p net.Addr) []byte
EncodePeer implements the Transport interface. It uses the UUID as the address directly.
func (*InmemTransport) InstallSnapshot ¶
func (i *InmemTransport) InstallSnapshot(target net.Addr, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error
InstallSnapshot implements the Transport interface.
func (*InmemTransport) LocalAddr ¶
func (i *InmemTransport) LocalAddr() net.Addr
LocalAddr implements the Transport interface.
func (*InmemTransport) RequestVote ¶
func (i *InmemTransport) RequestVote(target net.Addr, args *RequestVoteRequest, resp *RequestVoteResponse) error
RequestVote implements the Transport interface.
func (*InmemTransport) SetHeartbeatHandler ¶
func (i *InmemTransport) SetHeartbeatHandler(cb func(RPC))
SetHeartbeatHandler is used to set an optional fast path for heartbeats. It is not supported by this transport.
type InstallSnapshotRequest ¶
type InstallSnapshotRequest struct {
	Term   uint64
	Leader []byte

	// These are the last index/term included in the snapshot
	LastLogIndex uint64
	LastLogTerm  uint64

	// Peer Set in the snapshot
	Peers []byte

	// Size of the snapshot
	Size int64
}
InstallSnapshotRequest is the command sent to a Raft peer to bootstrap its log (and state machine) from a snapshot on another peer.
type InstallSnapshotResponse ¶
InstallSnapshotResponse is the response returned from an InstallSnapshotRequest.
type JSONPeers ¶
type JSONPeers struct {
// contains filtered or unexported fields
}
JSONPeers is used to provide peer persistence on disk in the form of a JSON file. This allows human operators to manipulate the file.
func NewJSONPeers ¶
NewJSONPeers creates a new JSONPeers store. It requires a transport to handle the serialization of network addresses.
type Log ¶
type Log struct {
	Index uint64
	Term  uint64
	Type  LogType
	Data  []byte
	// contains filtered or unexported fields
}
Log entries are replicated to all members of the Raft cluster and form the heart of the replicated state machine.
type LogCache ¶
type LogCache struct {
// contains filtered or unexported fields
}
LogCache wraps any LogStore implementation to provide an in-memory ring buffer. This is used to cache access to the recently written entries. For implementations that do not cache themselves, this can provide a substantial boost by avoiding disk I/O on recent entries.
func NewLogCache ¶
NewLogCache is used to create a new LogCache with the given capacity and backend store.
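The ring-buffer idea can be sketched as follows. Recent entries live in a fixed-size slice indexed by Index % capacity, and reads that miss fall through to the backend. Deletions clear the cache so stale entries are never served. All names here are hypothetical, and the backend is a plain map standing in for a real LogStore. The library's LogCache may handle deletions differently.

```go
package main

import (
	"errors"
	"fmt"
)

var errLogNotFound = errors.New("log not found")

type logEntry struct {
	Index uint64
	Data  []byte
}

// store is a plain map standing in for a slower LogStore backend.
type store map[uint64]*logEntry

// logCache keeps recent entries in a fixed-size ring indexed by
// Index % capacity and falls back to the backend on a miss.
type logCache struct {
	backend store
	ring    []*logEntry
}

// newLogCache expects capacity > 0.
func newLogCache(capacity int, backend store) *logCache {
	return &logCache{backend: backend, ring: make([]*logEntry, capacity)}
}

// storeLog writes through to the backend, then overwrites the ring slot.
func (c *logCache) storeLog(e *logEntry) {
	c.backend[e.Index] = e
	c.ring[e.Index%uint64(len(c.ring))] = e
}

// getLog also reports whether the entry was served from the cache.
func (c *logCache) getLog(index uint64) (*logEntry, bool, error) {
	if e := c.ring[index%uint64(len(c.ring))]; e != nil && e.Index == index {
		return e, true, nil
	}
	if e, ok := c.backend[index]; ok {
		return e, false, nil
	}
	return nil, false, errLogNotFound
}

// deleteRange removes lo..hi (inclusive) from the backend and empties the
// ring so a deleted entry can never be served from the cache.
func (c *logCache) deleteRange(lo, hi uint64) {
	for i := lo; i <= hi; i++ {
		delete(c.backend, i)
	}
	for i := range c.ring {
		c.ring[i] = nil
	}
}

func main() {
	c := newLogCache(2, store{})
	for i := uint64(1); i <= 3; i++ {
		c.storeLog(&logEntry{Index: i})
	}
	_, hit3, _ := c.getLog(3)
	_, hit1, _ := c.getLog(1)
	fmt.Println(hit3, hit1) // true false: entry 1's slot now holds entry 3
}
```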
func (*LogCache) DeleteRange ¶
func (*LogCache) FirstIndex ¶
type LogStore ¶
type LogStore interface {
	// Returns the first index written. 0 for no entries.
	FirstIndex() (uint64, error)

	// Returns the last index written. 0 for no entries.
	LastIndex() (uint64, error)

	// Gets a log entry at a given index
	GetLog(index uint64, log *Log) error

	// Stores a log entry
	StoreLog(log *Log) error

	// Stores multiple log entries
	StoreLogs(logs []*Log) error

	// Deletes a range of log entries. The range is inclusive.
	DeleteRange(min, max uint64) error
}
LogStore is used to provide an interface for storing and retrieving logs in a durable fashion.
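The sketch below shows a minimal in-memory implementation of this method set, plus one possible compaction step. It declares its own Log type rather than importing raft. The compact helper assumes a policy of deleting everything up to the snapshot index minus a TrailingLogs-style count. That policy is an assumption, and the library's exact compaction rule may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// Log is a local stand-in for the documented Log type.
type Log struct {
	Index uint64
	Term  uint64
	Data  []byte
}

var errLogNotFound = errors.New("log not found")

// memLogStore provides the documented LogStore method set over a map.
// Like InmemStore, it is only suitable for tests.
type memLogStore struct {
	logs        map[uint64]*Log
	first, last uint64 // 0 means "no entries"
}

func newMemLogStore() *memLogStore { return &memLogStore{logs: map[uint64]*Log{}} }

func (m *memLogStore) FirstIndex() (uint64, error) { return m.first, nil }

func (m *memLogStore) LastIndex() (uint64, error) { return m.last, nil }

func (m *memLogStore) GetLog(index uint64, out *Log) error {
	l, ok := m.logs[index]
	if !ok {
		return errLogNotFound
	}
	*out = *l
	return nil
}

func (m *memLogStore) StoreLog(l *Log) error { return m.StoreLogs([]*Log{l}) }

func (m *memLogStore) StoreLogs(logs []*Log) error {
	for _, l := range logs {
		m.logs[l.Index] = l
		m.track(l.Index)
	}
	return nil
}

// DeleteRange removes lo..hi inclusive and recomputes the bounds.
func (m *memLogStore) DeleteRange(lo, hi uint64) error {
	for i := lo; i <= hi; i++ {
		delete(m.logs, i)
	}
	m.first, m.last = 0, 0
	for i := range m.logs {
		m.track(i)
	}
	return nil
}

// track widens first/last to include index.
func (m *memLogStore) track(index uint64) {
	if m.first == 0 || index < m.first {
		m.first = index
	}
	if index > m.last {
		m.last = index
	}
}

// compact drops logs covered by a snapshot at snapIndex but keeps the
// `trailing` entries just behind it (an assumed TrailingLogs-style policy).
func compact(s *memLogStore, snapIndex, trailing uint64) error {
	first, err := s.FirstIndex()
	if err != nil || first == 0 || snapIndex <= trailing {
		return err
	}
	end := snapIndex - trailing
	if end < first {
		return nil
	}
	return s.DeleteRange(first, end)
}

func main() {
	s := newMemLogStore()
	for i := uint64(1); i <= 10; i++ {
		s.StoreLog(&Log{Index: i, Term: 1})
	}
	// Snapshot covers 1..8; keep 3 trailing entries (6..8) plus 9..10.
	if err := compact(s, 8, 3); err != nil {
		panic(err)
	}
	first, _ := s.FirstIndex()
	last, _ := s.LastIndex()
	fmt.Println(first, last) // 6 10
}
```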
type LogType ¶
type LogType uint8
LogType describes various types of log entries.
const (
	// LogCommand is applied to a user FSM.
	LogCommand LogType = iota

	// LogNoop is used to assert leadership.
	LogNoop

	// LogAddPeer is used to add a new peer.
	LogAddPeer

	// LogRemovePeer is used to remove an existing peer.
	LogRemovePeer

	// LogBarrier is used to ensure all preceding operations have been
	// applied to the FSM. It is similar to LogNoop, but instead of returning
	// once committed, it only returns once the FSM manager acks it. Otherwise
	// it is possible there are operations committed but not yet applied to
	// the FSM.
	LogBarrier
)
type NetworkTransport ¶
type NetworkTransport struct {
	TimeoutScale int
	// contains filtered or unexported fields
}
NetworkTransport provides a network based transport that can be used to communicate with Raft on remote machines. It requires an underlying stream layer to provide a stream abstraction, which can be simple TCP, TLS, etc. Underlying addresses must be castable to TCPAddr.
This transport is very simple and lightweight. Each RPC request is framed by sending a byte that indicates the message type, followed by the MsgPack encoded request.
The response is an error string followed by the response object; both are encoded using MsgPack.
InstallSnapshot is special, in that after the RPC request we stream the entire state. That socket is not reused, because the connection state is unknown if there is an error.
func NewNetworkTransport ¶
func NewNetworkTransport(
	stream StreamLayer,
	maxPool int,
	timeout time.Duration,
	logOutput io.Writer,
) *NetworkTransport
NewNetworkTransport creates a new network transport with the given dialer and listener. The maxPool controls how many connections we will pool. The timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply the timeout by (SnapshotSize / TimeoutScale).
func NewTCPTransport ¶
func NewTCPTransport(
	bindAddr string,
	advertise net.Addr,
	maxPool int,
	timeout time.Duration,
	logOutput io.Writer,
) (*NetworkTransport, error)
NewTCPTransport returns a NetworkTransport that is built on top of a TCP streaming transport layer.
func (*NetworkTransport) AppendEntries ¶
func (n *NetworkTransport) AppendEntries(target net.Addr, args *AppendEntriesRequest, resp *AppendEntriesResponse) error
AppendEntries implements the Transport interface.
func (*NetworkTransport) AppendEntriesPipeline ¶
func (n *NetworkTransport) AppendEntriesPipeline(target net.Addr) (AppendPipeline, error)
AppendEntriesPipeline returns an interface that can be used to pipeline AppendEntries requests.
func (*NetworkTransport) Close ¶
func (n *NetworkTransport) Close() error
Close is used to stop the network transport.
func (*NetworkTransport) Consumer ¶
func (n *NetworkTransport) Consumer() <-chan RPC
Consumer implements the Transport interface.
func (*NetworkTransport) DecodePeer ¶
func (n *NetworkTransport) DecodePeer(buf []byte) net.Addr
DecodePeer implements the Transport interface.
func (*NetworkTransport) EncodePeer ¶
func (n *NetworkTransport) EncodePeer(p net.Addr) []byte
EncodePeer implements the Transport interface.
func (*NetworkTransport) InstallSnapshot ¶
func (n *NetworkTransport) InstallSnapshot(target net.Addr, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error
InstallSnapshot implements the Transport interface.
func (*NetworkTransport) IsShutdown ¶
func (n *NetworkTransport) IsShutdown() bool
IsShutdown is used to check if the transport is shutdown.
func (*NetworkTransport) LocalAddr ¶
func (n *NetworkTransport) LocalAddr() net.Addr
LocalAddr implements the Transport interface.
func (*NetworkTransport) RequestVote ¶
func (n *NetworkTransport) RequestVote(target net.Addr, args *RequestVoteRequest, resp *RequestVoteResponse) error
RequestVote implements the Transport interface.
func (*NetworkTransport) SetHeartbeatHandler ¶
func (n *NetworkTransport) SetHeartbeatHandler(cb func(rpc RPC))
SetHeartbeatHandler is used to set up a heartbeat handler as a fast path. This is to avoid head-of-line blocking from disk I/O.
type PeerStore ¶
type PeerStore interface {
	// Peers returns the list of known peers.
	Peers() ([]net.Addr, error)

	// SetPeers sets the list of known peers. This is invoked when a peer is
	// added or removed.
	SetPeers([]net.Addr) error
}
PeerStore provides an interface for persistent storage and retrieval of peers. We use a separate interface from StableStore since the peers may need to be edited by a human operator. For example, in a two-node cluster, the failure of either node requires human intervention since consensus is impossible.
type RPC ¶
type RPC struct {
	Command  interface{}
	Reader   io.Reader // Set only for InstallSnapshot
	RespChan chan<- RPCResponse
}
RPC has a command, and provides a response mechanism.
type RPCResponse ¶
type RPCResponse struct {
	Response interface{}
	Error    error
}
RPCResponse captures both a response and a potential error
type Raft ¶
type Raft struct {
// contains filtered or unexported fields
}
Raft implements a Raft node.
func NewRaft ¶
func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, peerStore PeerStore, trans Transport) (*Raft, error)
NewRaft is used to construct a new Raft node. It takes a configuration, as well as implementations of various interfaces that are required. Any existing state, such as snapshots, logs, and peers, is restored when the Raft node is created.
func (*Raft) AddPeer ¶
func (r *Raft) AddPeer(peer net.Addr) Future
AddPeer is used to add a new peer into the cluster. This must be run on the leader or it will fail.
func (*Raft) Apply ¶
func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture
Apply is used to apply a command to the FSM in a highly consistent manner. This returns a future that can be used to wait on the application. An optional timeout can be provided to limit the amount of time we wait for the command to be started. This must be run on the leader or it will fail.
func (*Raft) Barrier ¶
func (r *Raft) Barrier(timeout time.Duration) Future
Barrier is used to issue a command that blocks until all preceding operations have been applied to the FSM. It can be used to ensure the FSM reflects all queued writes. An optional timeout can be provided to limit the amount of time we wait for the command to be started. This must be run on the leader or it will fail.
func (*Raft) LastContact ¶
func (r *Raft) LastContact() time.Time
LastContact returns the time of last contact by a leader. This only makes sense if we are currently a follower.
func (*Raft) LastIndex ¶
func (r *Raft) LastIndex() uint64
LastIndex returns the last index in stable storage, either from the last log or from the last snapshot.
func (*Raft) Leader ¶
func (r *Raft) Leader() net.Addr
Leader is used to return the current leader of the cluster. It may return nil if there is no current leader or the leader is unknown.
func (*Raft) LeaderCh ¶
func (r *Raft) LeaderCh() <-chan bool
LeaderCh is used to get a channel which delivers signals on acquiring or losing leadership. It sends true if we become the leader, and false if we lose it. The channel is not buffered, and does not block on writes.
func (*Raft) RemovePeer ¶
func (r *Raft) RemovePeer(peer net.Addr) Future
RemovePeer is used to remove a peer from the cluster. If the current leader is being removed, it will cause a new election to occur. This must be run on the leader or it will fail.
func (*Raft) SetPeers ¶
func (r *Raft) SetPeers(p []net.Addr) Future
SetPeers is used to forcibly replace the set of internal peers and the peerstore with the ones specified. This can be considered unsafe.
func (*Raft) Shutdown ¶
func (r *Raft) Shutdown() Future
Shutdown is used to stop the Raft background routines. This is not a graceful operation. It returns a future that can be used to block until all background routines have exited.
func (*Raft) Snapshot ¶
Snapshot is used to manually force Raft to take a snapshot. It returns a future that can be used to block until the snapshot is complete.
func (*Raft) Stats ¶
Stats is used to return a map of various internal stats. This should only be used for informational purposes or debugging.
func (*Raft) VerifyLeader ¶
VerifyLeader is used to ensure the current node is still the leader. This can be done to prevent stale reads when a new leader has potentially been elected.
type RaftState ¶
type RaftState uint32
RaftState captures the state of a Raft node: Follower, Candidate, Leader, or Shutdown.
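A sketch of how such a state type is commonly declared in Go. The constant order, the `String` method, and the `stateHolder` wrapper are assumptions for illustration, not the library's definitions. One plausible reason for the `uint32` underlying type is that the state can then be read and written atomically from several goroutines.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// RaftState mirrors the documented type; the constant order is assumed.
type RaftState uint32

const (
	Follower RaftState = iota // zero value, so a new node starts as a follower
	Candidate
	Leader
	Shutdown
)

func (s RaftState) String() string {
	switch s {
	case Follower:
		return "Follower"
	case Candidate:
		return "Candidate"
	case Leader:
		return "Leader"
	case Shutdown:
		return "Shutdown"
	default:
		return "Unknown"
	}
}

// stateHolder is hypothetical: a uint32-backed state can be read and written
// atomically by different goroutines without a mutex.
type stateHolder struct{ v uint32 }

func (h *stateHolder) get() RaftState  { return RaftState(atomic.LoadUint32(&h.v)) }
func (h *stateHolder) set(s RaftState) { atomic.StoreUint32(&h.v, uint32(s)) }

func main() {
	var h stateHolder
	fmt.Println(h.get()) // Follower
	h.set(Leader)
	fmt.Println(h.get()) // Leader
}
```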
type RequestVoteRequest ¶
type RequestVoteRequest struct {
	// Provide the term and our id
	Term      uint64
	Candidate []byte

	// Used to ensure safety
	LastLogIndex uint64
	LastLogTerm  uint64
}
RequestVoteRequest is the command used by a candidate to ask a Raft peer for a vote in an election.
type RequestVoteResponse ¶
type RequestVoteResponse struct {
	// Newer term if leader is out of date
	Term uint64

	// Return the peers, so that a node can shutdown on removal
	Peers []byte

	// Is the vote granted
	Granted bool
}
RequestVoteResponse is the response returned from a RequestVoteRequest.
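The LastLogIndex and LastLogTerm fields let a voter refuse candidates whose logs are behind its own. The sketch below follows the voting rules from the Raft paper: reject stale terms, grant at most one vote per term, and compare the last log term first and the last log index second. The message types are copied locally so it compiles without the library, and `voterState`/`decideVote` are hypothetical names. It omits persisting the vote and stepping down to follower on a newer term, both of which a real node must do.

```go
package main

import (
	"bytes"
	"fmt"
)

// Local copies of the documented message types.
type RequestVoteRequest struct {
	Term         uint64
	Candidate    []byte
	LastLogIndex uint64
	LastLogTerm  uint64
}

type RequestVoteResponse struct {
	Term    uint64
	Peers   []byte
	Granted bool
}

// voterState is a hypothetical view of the local node's state.
type voterState struct {
	CurrentTerm  uint64
	VotedFor     []byte // nil if no vote has been cast in CurrentTerm
	LastLogIndex uint64
	LastLogTerm  uint64
}

// decideVote grants a vote only to a current-term candidate whose log is at
// least as up-to-date as ours, and at most once per term.
func decideVote(s *voterState, req *RequestVoteRequest) RequestVoteResponse {
	if req.Term < s.CurrentTerm {
		return RequestVoteResponse{Term: s.CurrentTerm}
	}
	if req.Term > s.CurrentTerm {
		// A newer term clears any vote cast in the old term.
		s.CurrentTerm = req.Term
		s.VotedFor = nil
	}
	if s.VotedFor != nil && !bytes.Equal(s.VotedFor, req.Candidate) {
		return RequestVoteResponse{Term: s.CurrentTerm}
	}
	upToDate := req.LastLogTerm > s.LastLogTerm ||
		(req.LastLogTerm == s.LastLogTerm && req.LastLogIndex >= s.LastLogIndex)
	if !upToDate {
		return RequestVoteResponse{Term: s.CurrentTerm}
	}
	s.VotedFor = req.Candidate
	return RequestVoteResponse{Term: s.CurrentTerm, Granted: true}
}

func main() {
	s := &voterState{CurrentTerm: 2, LastLogIndex: 10, LastLogTerm: 2}
	r := decideVote(s, &RequestVoteRequest{Term: 3, Candidate: []byte("a"), LastLogIndex: 9, LastLogTerm: 2})
	fmt.Println(r.Granted) // false: same last term, but the candidate's log is shorter
}
```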
type SnapshotMeta ¶
type SnapshotMeta struct {
	ID    string // ID is opaque to the store, and is used for opening
	Index uint64
	Term  uint64
	Peers []byte
	Size  int64
}
SnapshotMeta holds the metadata of a snapshot.
type SnapshotSink ¶
type SnapshotSink interface {
	io.WriteCloser
	ID() string
	Cancel() error
}
SnapshotSink is returned by SnapshotStore.Create. The FSM writes its state to the sink and calls Close on completion. On error, Cancel is invoked.
type SnapshotStore ¶
type SnapshotStore interface {
	// Create is used to begin a snapshot at a given index and term,
	// with the current peer set already encoded
	Create(index, term uint64, peers []byte) (SnapshotSink, error)

	// List is used to list the available snapshots in the store.
	// It should return them in descending order, with the highest index first.
	List() ([]*SnapshotMeta, error)

	// Open takes a snapshot ID and provides a ReadCloser. Once Close is
	// called it is assumed the snapshot is no longer needed.
	Open(id string) (*SnapshotMeta, io.ReadCloser, error)
}
The SnapshotStore interface allows flexible implementations of snapshot storage and retrieval. For example, a client could implement a shared state store such as S3, allowing new nodes to restore snapshots without streaming from the leader.
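A minimal in-memory SnapshotStore, of the kind that might be used in tests. It uses local copies of the documented types and hypothetical names (`memStore`, `memSink`). A snapshot becomes visible to List and Open only after its sink is closed, so a canceled sink is simply never committed. It is not durable and is no substitute for a file- or S3-backed store.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"sort"
	"sync"
)

// Local copies of the documented types so the sketch compiles on its own.
type SnapshotMeta struct {
	ID    string
	Index uint64
	Term  uint64
	Peers []byte
	Size  int64
}

type SnapshotSink interface {
	io.WriteCloser
	ID() string
	Cancel() error
}

// memStore is a hypothetical, non-durable SnapshotStore keyed by snapshot ID.
type memStore struct {
	mu    sync.Mutex
	snaps map[string]*memSink
}

// memSink buffers writes; the snapshot becomes visible only after Close.
type memSink struct {
	store *memStore
	meta  SnapshotMeta
	buf   bytes.Buffer
}

func newMemStore() *memStore { return &memStore{snaps: map[string]*memSink{}} }

func (s *memStore) Create(index, term uint64, peers []byte) (SnapshotSink, error) {
	id := fmt.Sprintf("%d-%d", term, index) // same term and index share an ID
	return &memSink{store: s, meta: SnapshotMeta{ID: id, Index: index, Term: term, Peers: peers}}, nil
}

func (k *memSink) Write(p []byte) (int, error) { return k.buf.Write(p) }
func (k *memSink) ID() string                  { return k.meta.ID }

// Cancel has nothing to undo: an uncommitted sink is simply dropped.
func (k *memSink) Cancel() error { return nil }

// Close commits the buffered snapshot so List and Open can see it.
func (k *memSink) Close() error {
	k.meta.Size = int64(k.buf.Len())
	k.store.mu.Lock()
	defer k.store.mu.Unlock()
	k.store.snaps[k.meta.ID] = k
	return nil
}

// List returns metadata with the highest index first.
func (s *memStore) List() ([]*SnapshotMeta, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make([]*SnapshotMeta, 0, len(s.snaps))
	for _, k := range s.snaps {
		m := k.meta // copy so callers cannot mutate stored metadata
		out = append(out, &m)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Index > out[j].Index })
	return out, nil
}

func (s *memStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	k, ok := s.snaps[id]
	if !ok {
		return nil, nil, errors.New("unknown snapshot " + id)
	}
	m := k.meta
	return &m, io.NopCloser(bytes.NewReader(k.buf.Bytes())), nil
}

func main() {
	store := newMemStore()
	sink, _ := store.Create(12, 1, nil)
	sink.Write([]byte("state"))
	sink.Close()
	metas, _ := store.List()
	_, rc, _ := store.Open(metas[0].ID)
	data, _ := io.ReadAll(rc)
	fmt.Println(metas[0].Index, string(data)) // 12 state
}
```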
type StableStore ¶
type StableStore interface {
	Set(key []byte, val []byte) error

	// Get returns the value for key, or an empty byte slice if key was not found.
	Get(key []byte) ([]byte, error)

	SetUint64(key []byte, val uint64) error

	// GetUint64 returns the uint64 value for key, or 0 if key was not found.
	GetUint64(key []byte) (uint64, error)
}
StableStore is used to provide stable storage of key configurations to ensure safety.
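A non-durable, in-memory StableStore sketch that follows the documented missing-key behavior: an empty slice from Get and 0 from GetUint64. The type name `memStable`, the big-endian uint64 encoding, and the `CurrentTerm` key used in `main` are assumptions. Because StableStore exists to preserve safety across restarts, something like this is only suitable for tests.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"sync"
)

// StableStore is a local copy of the documented interface.
type StableStore interface {
	Set(key []byte, val []byte) error
	Get(key []byte) ([]byte, error)
	SetUint64(key []byte, val uint64) error
	GetUint64(key []byte) (uint64, error)
}

// memStable is a hypothetical in-memory StableStore; nothing survives a restart.
type memStable struct {
	mu sync.Mutex
	kv map[string][]byte
}

var _ StableStore = (*memStable)(nil)

func newMemStable() *memStable { return &memStable{kv: map[string][]byte{}} }

func (m *memStable) Set(key []byte, val []byte) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	// Copy the value so later changes to the caller's slice are not observed.
	m.kv[string(key)] = append([]byte(nil), val...)
	return nil
}

// Get returns an empty slice, not an error, when the key is missing.
func (m *memStable) Get(key []byte) ([]byte, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	v, ok := m.kv[string(key)]
	if !ok {
		return []byte{}, nil
	}
	return append([]byte(nil), v...), nil
}

// SetUint64 stores val as 8 big-endian bytes (an assumed encoding).
func (m *memStable) SetUint64(key []byte, val uint64) error {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], val)
	return m.Set(key, buf[:])
}

// GetUint64 returns 0, not an error, when the key is missing.
func (m *memStable) GetUint64(key []byte) (uint64, error) {
	v, err := m.Get(key)
	if err != nil || len(v) == 0 {
		return 0, err
	}
	if len(v) != 8 {
		return 0, fmt.Errorf("value for %q is not an encoded uint64", key)
	}
	return binary.BigEndian.Uint64(v), nil
}

func main() {
	s := newMemStable()
	s.SetUint64([]byte("CurrentTerm"), 4) // hypothetical key name
	term, _ := s.GetUint64([]byte("CurrentTerm"))
	fmt.Println(term) // 4
}
```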
type StaticPeers ¶
StaticPeers is used to provide a static list of peers.
type StreamLayer ¶
type StreamLayer interface {
	net.Listener

	// Dial is used to create a new outgoing connection
	Dial(address string, timeout time.Duration) (net.Conn, error)
}
StreamLayer is used with the NetworkTransport to provide the low-level stream abstraction.
type TCPStreamLayer ¶
type TCPStreamLayer struct {
// contains filtered or unexported fields
}
TCPStreamLayer implements the StreamLayer interface for plain TCP.
func (*TCPStreamLayer) Accept ¶
func (t *TCPStreamLayer) Accept() (c net.Conn, err error)
Accept implements the net.Listener interface.
func (*TCPStreamLayer) Addr ¶
func (t *TCPStreamLayer) Addr() net.Addr
Addr implements the net.Listener interface.
func (*TCPStreamLayer) Close ¶
func (t *TCPStreamLayer) Close() (err error)
Close implements the net.Listener interface.
type Transport ¶
type Transport interface {
	// Consumer returns a channel that can be used to
	// consume and respond to RPC requests.
	Consumer() <-chan RPC

	// LocalAddr is used to return our local address to distinguish from our peers
	LocalAddr() net.Addr

	// AppendEntriesPipeline returns an interface that can be used to pipeline
	// AppendEntries requests.
	AppendEntriesPipeline(target net.Addr) (AppendPipeline, error)

	// AppendEntries sends the appropriate RPC to the target node
	AppendEntries(target net.Addr, args *AppendEntriesRequest, resp *AppendEntriesResponse) error

	// RequestVote sends the appropriate RPC to the target node
	RequestVote(target net.Addr, args *RequestVoteRequest, resp *RequestVoteResponse) error

	// InstallSnapshot is used to push a snapshot down to a follower. The data is read from
	// the io.Reader and streamed to the client.
	InstallSnapshot(target net.Addr, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error

	// EncodePeer is used to serialize a peer name
	EncodePeer(net.Addr) []byte

	// DecodePeer is used to deserialize a peer name
	DecodePeer([]byte) net.Addr

	// SetHeartbeatHandler is used to set up a heartbeat handler
	// as a fast path. This is to avoid head-of-line blocking from
	// disk IO. If a Transport does not support this, it can simply
	// ignore the call, and push the heartbeat onto the Consumer channel.
	SetHeartbeatHandler(cb func(rpc RPC))
}
Transport provides an interface for network transports to allow Raft to communicate with other nodes.