README
go-raft
Overview
This is a Go implementation of the Raft distributed consensus protocol. Raft is a protocol by which a cluster of nodes can maintain a replicated state machine. The state machine is kept in sync through the use of a replicated log.
For more details on Raft, you can read "In Search of an Understandable Consensus Algorithm" by Diego Ongaro and John Ousterhout.
Project Status
This library is feature complete but should be considered experimental until it has seen more usage. If you have any questions about implementing go-raft in your project, please file an issue. There is an active community of developers who can help. go-raft is under the MIT license.
Features
- Leader election
- Log replication
- Configuration changes
- Log compaction
- Unit tests
- Fast Protobuf Log Encoding
- HTTP transport
Projects
These projects are built on go-raft:
- coreos/etcd - A highly-available key-value store for shared configuration and service discovery.
- goraft/raftd - A reference implementation for using the go-raft library for distributed consensus.
- skynetservices/skydns - DNS for skynet or any other service discovery.
- influxdb/influxdb - An open-source, distributed, time series, events, and metrics database.
- Weed File System - A scalable distributed key-to-file system with O(1) disk access for each read.
If you have a project that you're using go-raft in, please add it to this README so others can see implementation examples.
Contact and Resources
- raft-dev is a mailing list for discussion about best practices and implementation of Raft. It is not go-raft-specific, but it is helpful if you have questions.
- Slides from Ben's talk, which include easy-to-understand diagrams of leader election and replication.
- The Raft Consensus homepage has links to additional Raft implementations, slides from talks on Raft, and general information.
The Raft Protocol
This section provides a high-level summary of the Raft protocol. For a more detailed explanation of the failover process and election terms, please see the full paper describing the protocol: "In Search of an Understandable Consensus Algorithm".
Overview
Maintaining state in a single process on a single server is easy. Your process is a single point of authority so there are no conflicts when reading and writing state. Even multi-threaded processes can rely on locks or coroutines to serialize access to the data.
However, in a distributed system there is no single point of authority. Servers can crash, the network between two machines can become unavailable, or any number of other problems can occur.
A distributed consensus protocol is used to maintain a consistent state across multiple servers in a cluster. Many distributed systems are built upon the Paxos protocol, but Paxos can be difficult to understand and there are many gaps between Paxos and real-world implementations.
An alternative is the Raft distributed consensus protocol by Diego Ongaro and John Ousterhout. Raft is a protocol built with understandability as a primary tenet and it centers around two things:
- Leader Election
- Replicated Log
With these two constructs, you can build a system that maintains state across multiple servers, even in the event of multiple failures.
Leader Election
The Raft protocol effectively works as a leader-follower (master-slave) system: state changes are written to a single server in the cluster, the leader, and are then distributed to the rest of the servers in the cluster. This simplifies the protocol, since there is only one data authority and conflicting writes do not have to be resolved.
Raft ensures that there is only one leader at a time (more precisely, at most one leader per election term). It does this by holding elections among the nodes in the cluster and requiring that a node receive a majority of the votes in order to become leader. For example, in a 3-node cluster a candidate needs 2 votes, including its own, to become the leader; in a 5-node cluster it needs 3 votes.
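The majority rule can be sketched in a few lines of Go. This is a minimal illustration, not go-raft's election code: quorumSize and wonElection are hypothetical helpers, and the sketch assumes the candidate counts its own vote plus every VoteGranted result returned by its peers. The documented Server interface also has a QuorumSize() method, which presumably computes the same quantity for the live cluster.

```go
package main

import "fmt"

// quorumSize returns the number of votes needed for a majority of n nodes.
func quorumSize(n int) int {
	return n/2 + 1
}

// wonElection reports whether a candidate in an n-node cluster has won,
// given the VoteGranted results from its peers. The candidate always
// votes for itself, so it starts with one vote.
func wonElection(n int, granted []bool) bool {
	votes := 1 // the candidate's own vote
	for _, g := range granted {
		if g {
			votes++
		}
	}
	return votes >= quorumSize(n)
}

func main() {
	fmt.Println(quorumSize(3), quorumSize(5))                      // 2 3
	fmt.Println(wonElection(3, []bool{true, false}))               // true
	fmt.Println(wonElection(5, []bool{true, false, false, false})) // false
}
```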
Replicated Log
To maintain state, Raft keeps a log of commands. Each command changes the state of the server, and every command is deterministic. Because the log is replicated identically on all the nodes in the cluster, any node can reconstruct the state at any point in the log by applying the commands in order.
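A minimal sketch of why determinism matters, assuming a hypothetical setCommand that assigns a value to a key: replaying the same log always yields the same state, and replaying only a prefix of the log yields the state at that point. This illustrates the idea only and is not go-raft code.

```go
package main

import "fmt"

// setCommand is a hypothetical deterministic command: applying it to the
// same state always produces the same result.
type setCommand struct {
	Key   string
	Value int
}

// replay applies every command in log order to a fresh state map.
func replay(log []setCommand) map[string]int {
	state := make(map[string]int)
	for _, cmd := range log {
		state[cmd.Key] = cmd.Value
	}
	return state
}

func main() {
	log := []setCommand{{"x", 1}, {"y", 2}, {"x", 3}}

	// Two replicas that replay the same log end in the same state.
	a := replay(log)
	b := replay(log)
	fmt.Println(a["x"], a["y"], b["x"] == a["x"]) // 3 2 true

	// Replaying only a prefix reconstructs the state at that point in the log.
	fmt.Println(replay(log[:2])["x"]) // 1
}
```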
Under normal conditions, the leader replicates the log by sending an AppendEntries RPC to each of the other servers in the cluster (called peers).
Each peer appends the entries from the leader through a two-phase process (write, then commit), which ensures that an entry is only committed once a majority of servers in the cluster have written it to their logs.
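The majority part of the commit rule can be sketched as follows. majorityIndex is a hypothetical helper, not part of go-raft: given the highest log index each server (including the leader) is known to have written, it returns the highest index stored on a majority of servers. Full Raft additionally requires that the entry at that index belongs to the leader's current term before the leader commits it; the sketch omits that check.

```go
package main

import (
	"fmt"
	"sort"
)

// majorityIndex returns the highest log index that is stored on a majority
// of servers. matchIndex holds, for every server including the leader,
// the highest log index known to be written to that server's log.
// It assumes matchIndex is non-empty.
func majorityIndex(matchIndex []uint64) uint64 {
	sorted := append([]uint64(nil), matchIndex...)
	// Sort descending so that sorted[k] is replicated on at least k+1 servers.
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] > sorted[j] })
	quorum := len(sorted)/2 + 1
	return sorted[quorum-1]
}

func main() {
	// Leader at index 7, followers at 7, 5, 3 and 2 in a 5-node cluster.
	fmt.Println(majorityIndex([]uint64{7, 7, 5, 3, 2})) // 5
}
```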
Raft in Practice
Optimal Cluster Size
The primary consideration when choosing the node count in your Raft cluster is the number of nodes that can simultaneously fail.
Because Raft requires a majority of nodes to be available to make progress, the number of node failures an $n$-node cluster can tolerate is $\lfloor (n-1)/2 \rfloor$.
This means that a 3-node cluster can tolerate 1 node failure. If 2 nodes fail, the cluster can neither commit entries nor elect a new leader, so progress stops. A 5-node cluster can tolerate 2 node failures, and a 9-node cluster can tolerate 4. It is unlikely that 4 nodes will fail simultaneously, so clusters larger than 9 nodes are not common.
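The failure counts above can be checked with a short table. maxFailures is a hypothetical helper that computes (n - 1) / 2 with Go's integer division, which rounds down and reproduces the figures in the paragraph above (1 failure for 3 nodes, 2 for 5, 4 for 9); the quorum column uses the majority rule n/2 + 1.

```go
package main

import "fmt"

// maxFailures returns how many nodes an n-node cluster can lose while a
// majority (n/2 + 1) of nodes is still available.
func maxFailures(n int) int {
	return (n - 1) / 2
}

func main() {
	for _, n := range []int{3, 4, 5, 9} {
		fmt.Printf("%d nodes: quorum %d, tolerates %d failure(s)\n", n, n/2+1, maxFailures(n))
	}
}
```

The 4-node row tolerates the same single failure as the 3-node row: adding a node to an odd-sized cluster raises the quorum without increasing fault tolerance, which is one reason odd cluster sizes are typical.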
Another consideration is performance. The leader must replicate log entries to every follower, so in a large cluster under stress the leader's CPU and network resources can quickly become a bottleneck.
Scaling Raft
Once you grow beyond the practical maximum size of a single cluster, there are a few options for scaling Raft:
- Core nodes with dumb replication. This option requires you to maintain a small cluster (e.g. 5 nodes) that is involved in the Raft process and then replicate only committed log entries to the remaining nodes in the cluster. This works well if you have reads in your system that can be stale.
- Sharding. This option requires that you segment your data into different clusters. This option works well if you need very strong consistency and therefore need to read and write heavily from the leader.
If you have a very large cluster that you need to replicate to using the core-nodes option, you may want to look at performing hierarchical replication so that nodes can better share the load.
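For the sharding option, each key has to be routed to the cluster that owns it. A minimal sketch, assuming string keys and a fixed number of clusters: shardFor is a hypothetical helper that hashes the key with FNV-1a from the Go standard library and takes the result modulo the cluster count. Each shard would then run as its own independent Raft cluster.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor maps a key to one of numShards independent Raft clusters using
// an FNV-1a hash, so each key is always owned by the same cluster.
func shardFor(key string, numShards int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(numShards))
}

func main() {
	for _, key := range []string{"users/1", "users/2", "orders/9"} {
		fmt.Printf("%s -> cluster %d\n", key, shardFor(key, 3))
	}
}
```

Simple modulo hashing moves most keys to a different shard when the cluster count changes, so systems that reshard often tend to use consistent hashing or an explicit key-range map instead. Operations that span several shards also fall outside any single cluster's consistency guarantees.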
History
Ben Johnson started this library for use in his behavioral analytics database called Sky. He put it under the MIT license in the hopes that it would be useful for other projects too.
Documentation
Index
- Constants
- Variables
- func LogLevel() int
- func RegisterCommand(command Command)
- func SetLogLevel(level int)
- type AppendEntriesRequest
- type AppendEntriesResponse
- func (aer *AppendEntriesResponse) CommitIndex() uint64
- func (resp *AppendEntriesResponse) Decode(r io.Reader) (int, error)
- func (resp *AppendEntriesResponse) Encode(w io.Writer) (int, error)
- func (aer *AppendEntriesResponse) Index() uint64
- func (aer *AppendEntriesResponse) Success() bool
- func (aer *AppendEntriesResponse) Term() uint64
- type Command
- type CommandApply
- type CommandEncoder
- type Config
- type Context
- type DefaultJoinCommand
- type DefaultLeaveCommand
- type Event
- type EventListener
- type HTTPMuxer
- type HTTPTransporter
- func (t *HTTPTransporter) AppendEntriesPath() string
- func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer)
- func (t *HTTPTransporter) Prefix() string
- func (t *HTTPTransporter) RequestVotePath() string
- func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
- func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
- func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
- func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
- func (t *HTTPTransporter) SnapshotPath() string
- func (t *HTTPTransporter) SnapshotRecoveryPath() string
- type JoinCommand
- type LeaveCommand
- type Log
- type LogEntry
- type NOPCommand
- type Peer
- type RequestVoteRequest
- type RequestVoteResponse
- type Server
- type Snapshot
- type SnapshotRecoveryRequest
- type SnapshotRecoveryResponse
- type SnapshotRequest
- type SnapshotResponse
- type StateMachine
- type Transporter
Constants
const (
	Debug = 1
	Trace = 2
)
const (
	StateChangeEventType              = "stateChange"
	LeaderChangeEventType             = "leaderChange"
	TermChangeEventType               = "termChange"
	CommitEventType                   = "commit"
	AddPeerEventType                  = "addPeer"
	RemovePeerEventType               = "removePeer"
	RemovedEventType                  = "removed"
	HeartbeatIntervalEventType        = "heartbeatInterval"
	ElectionTimeoutThresholdEventType = "electionTimeoutThreshold"
	HeartbeatEventType                = "heartbeat"
)
const (
	Stopped      = "stopped"
	Initialized  = "initialized"
	Follower     = "follower"
	Candidate    = "candidate"
	Leader       = "leader"
	Snapshotting = "snapshotting"
)
const (
	MaxLogEntriesPerRequest         = 2000
	NumberOfLogEntriesAfterSnapshot = 200
)
const (
	// DefaultHeartbeatInterval is the interval that the leader will send
	// AppendEntriesRequests to followers to maintain leadership.
	DefaultHeartbeatInterval = 50 * time.Millisecond
	DefaultElectionTimeout   = 150 * time.Millisecond
)
const ElectionTimeoutThresholdPercent = 0.8
ElectionTimeoutThresholdPercent specifies the threshold at which the server will dispatch warning events that the heartbeat RTT is too close to the election timeout.
Variables
var CommandTimeoutError = errors.New("raft: Command timeout")
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
var NotLeaderError = errors.New("raft.Server: Not current leader")
var StopError = errors.New("raft: Has been stopped")
Functions
func RegisterCommand
func RegisterCommand(command Command)
Registers a command by storing a reference to an instance of it.
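A self-contained sketch of what storing a reference to an instance makes possible: a registry keeps one prototype per CommandName() and uses reflection to allocate a fresh value of the same type, for example before decoding a log entry into it. The registry, registerCommand, newCommand and setCommand are all hypothetical and go-raft's internals may differ; in an application using the library, the registration step would presumably be a call such as raft.RegisterCommand(&setCommand{}) at start-up.

```go
package main

import (
	"fmt"
	"reflect"
)

// Command mirrors the interface documented in this package.
type Command interface {
	CommandName() string
}

// commandTypes is a hypothetical registry keyed by command name.
var commandTypes = map[string]Command{}

// registerCommand stores a prototype instance under its command name.
func registerCommand(c Command) {
	if c == nil {
		panic("registerCommand: nil command")
	}
	if _, dup := commandTypes[c.CommandName()]; dup {
		panic("registerCommand: duplicate registration: " + c.CommandName())
	}
	commandTypes[c.CommandName()] = c
}

// newCommand creates a fresh, zero-valued instance of the command that was
// registered under name.
func newCommand(name string) (Command, error) {
	proto, ok := commandTypes[name]
	if !ok {
		return nil, fmt.Errorf("unregistered command: %s", name)
	}
	t := reflect.TypeOf(proto)
	if t.Kind() == reflect.Ptr {
		// Allocate a new value of the pointed-to type and return its pointer.
		return reflect.New(t.Elem()).Interface().(Command), nil
	}
	return reflect.New(t).Elem().Interface().(Command), nil
}

// setCommand is a hypothetical application command.
type setCommand struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

func (c *setCommand) CommandName() string { return "set" }

func main() {
	registerCommand(&setCommand{})
	cmd, err := newCommand("set")
	fmt.Printf("%T %v\n", cmd, err) // *main.setCommand <nil>
}
```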
func SetLogLevel
func SetLogLevel(level int)
Types
type AppendEntriesRequest
type AppendEntriesRequest struct {
	Term         uint64
	PrevLogIndex uint64
	PrevLogTerm  uint64
	CommitIndex  uint64
	LeaderName   string
	Entries      []*protobuf.LogEntry
}
The request sent to a server to append entries to the log.
type AppendEntriesResponse
type AppendEntriesResponse struct {
	// contains filtered or unexported fields
}
The response returned from a server appending entries to the log.
func (*AppendEntriesResponse) CommitIndex
func (aer *AppendEntriesResponse) CommitIndex() uint64
func (*AppendEntriesResponse) Decode
func (resp *AppendEntriesResponse) Decode(r io.Reader) (int, error)
Decodes the AppendEntriesResponse from a buffer. Returns the number of bytes read and any error that occurs.
func (*AppendEntriesResponse) Encode
func (resp *AppendEntriesResponse) Encode(w io.Writer) (int, error)
Encodes the AppendEntriesResponse to a buffer. Returns the number of bytes written and any error that may have occurred.
func (*AppendEntriesResponse) Index
func (aer *AppendEntriesResponse) Index() uint64
func (*AppendEntriesResponse) Success
func (aer *AppendEntriesResponse) Success() bool
func (*AppendEntriesResponse) Term
func (aer *AppendEntriesResponse) Term() uint64
type Command
type Command interface {
	CommandName() string
}
Command represents an action to be taken on the replicated state machine.
type CommandApply
CommandApply represents the interface to apply a command to the server.
type CommandEncoder
type Context
type Context interface {
	Server() Server
	CurrentTerm() uint64
	CurrentIndex() uint64
	CommitIndex() uint64
}
Context represents the current state of the server. It is passed into a command when the command is being applied since the server methods are locked.
type DefaultJoinCommand
type DefaultJoinCommand struct {
	Name             string `json:"name"`
	ConnectionString string `json:"connectionString"`
}
Join command
func (*DefaultJoinCommand) Apply
func (c *DefaultJoinCommand) Apply(server Server) (interface{}, error)
func (*DefaultJoinCommand) CommandName
func (c *DefaultJoinCommand) CommandName() string
The name of the Join command in the log
func (*DefaultJoinCommand) NodeName
func (c *DefaultJoinCommand) NodeName() string
type DefaultLeaveCommand
type DefaultLeaveCommand struct {
	Name string `json:"name"`
}
Leave command
func (*DefaultLeaveCommand) Apply
func (c *DefaultLeaveCommand) Apply(server Server) (interface{}, error)
func (*DefaultLeaveCommand) CommandName
func (c *DefaultLeaveCommand) CommandName() string
The name of the Leave command in the log
func (*DefaultLeaveCommand) NodeName
func (c *DefaultLeaveCommand) NodeName() string
type Event
type Event interface {
	Type() string
	Source() interface{}
	Value() interface{}
	PrevValue() interface{}
}
Event represents an action that occurred within the Raft library. Listeners can subscribe to event types by using the Server.AddEventListener() method.
type EventListener
type EventListener func(Event)
EventListener is a function that can receive event notifications.
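The subscription model can be sketched with a small dispatcher. dispatcher, event and listener are hypothetical stand-ins for the documented Server.AddEventListener(), Event and EventListener; the sketch keeps listeners in a map keyed by event type and calls them in registration order, which is an assumption about ordering rather than documented behavior.

```go
package main

import "fmt"

// event is a hypothetical stand-in for the Event interface.
type event struct {
	typ       string
	value     interface{}
	prevValue interface{}
}

// listener is a hypothetical stand-in for EventListener.
type listener func(event)

type dispatcher struct {
	listeners map[string][]listener
}

func newDispatcher() *dispatcher {
	return &dispatcher{listeners: make(map[string][]listener)}
}

// addEventListener subscribes fn to a single event type.
func (d *dispatcher) addEventListener(typ string, fn listener) {
	d.listeners[typ] = append(d.listeners[typ], fn)
}

// dispatch calls every listener registered for e.typ, in registration order.
func (d *dispatcher) dispatch(e event) {
	for _, fn := range d.listeners[e.typ] {
		fn(e)
	}
}

func main() {
	d := newDispatcher()
	d.addEventListener("stateChange", func(e event) {
		fmt.Printf("state: %v -> %v\n", e.prevValue, e.value)
	})
	d.dispatch(event{typ: "stateChange", value: "leader", prevValue: "candidate"})
	d.dispatch(event{typ: "termChange", value: 2, prevValue: 1}) // no listener, ignored
}
```

Against the real library, the equivalent subscription would presumably be server.AddEventListener(raft.StateChangeEventType, func(e raft.Event) { ... }), with e.Value() and e.PrevValue() carrying the new and old values.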
type HTTPMuxer
type HTTPMuxer interface {
	HandleFunc(string, func(http.ResponseWriter, *http.Request))
}
type HTTPTransporter
type HTTPTransporter struct {
	DisableKeepAlives bool
	Transport         *http.Transport
	// contains filtered or unexported fields
}
An HTTPTransporter is a default transport layer used to communicate between multiple servers.
func NewHTTPTransporter
func NewHTTPTransporter(prefix string, timeout time.Duration) *HTTPTransporter
Creates a new HTTP transporter with the given path prefix.
func (*HTTPTransporter) AppendEntriesPath
func (t *HTTPTransporter) AppendEntriesPath() string
Retrieves the AppendEntries path.
func (*HTTPTransporter) Install
func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer)
Applies Raft routes to an HTTP router for a given server.
func (*HTTPTransporter) Prefix
func (t *HTTPTransporter) Prefix() string
Retrieves the path prefix used by the transporter.
func (*HTTPTransporter) RequestVotePath
func (t *HTTPTransporter) RequestVotePath() string
Retrieves the RequestVote path.
func (*HTTPTransporter) SendAppendEntriesRequest
func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
Sends an AppendEntries RPC to a peer.
func (*HTTPTransporter) SendSnapshotRecoveryRequest
func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
Sends a SnapshotRecoveryRequest RPC to a peer.
func (*HTTPTransporter) SendSnapshotRequest
func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
Sends a SnapshotRequest RPC to a peer.
func (*HTTPTransporter) SendVoteRequest
func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
Sends a RequestVote RPC to a peer.
func (*HTTPTransporter) SnapshotPath
func (t *HTTPTransporter) SnapshotPath() string
Retrieves the Snapshot path.
func (*HTTPTransporter) SnapshotRecoveryPath
func (t *HTTPTransporter) SnapshotRecoveryPath() string
Retrieves the SnapshotRecovery path.
type LeaveCommand
Leave command interface
type Log
type Log struct {
	ApplyFunc func(*LogEntry, Command) (interface{}, error)
	// contains filtered or unexported fields
}
A log is a collection of log entries that are persisted to durable storage.
type LogEntry
type LogEntry struct {
	Position int64 // position in the log file
	// contains filtered or unexported fields
}
A log entry stores a single item in the log.
func (*LogEntry) CommandName
func (*LogEntry) Decode
Decodes the log entry from a buffer. Returns the number of bytes read and any error that occurs.
type NOPCommand
type NOPCommand struct{}
NOP command
func (NOPCommand) Apply
func (c NOPCommand) Apply(server Server) (interface{}, error)
func (NOPCommand) CommandName
func (c NOPCommand) CommandName() string
The name of the NOP command in the log
type Peer
type Peer struct {
	Name             string `json:"name"`
	ConnectionString string `json:"connectionString"`
	sync.RWMutex
	// contains filtered or unexported fields
}
A peer is a reference to another server involved in the consensus protocol.
func (*Peer) LastActivity
LastActivity returns the last time any response was received from the peer.
type RequestVoteRequest
type RequestVoteRequest struct {
	Term          uint64
	LastLogIndex  uint64
	LastLogTerm   uint64
	CandidateName string
	// contains filtered or unexported fields
}
The request sent to a server to vote for a candidate to become a leader.
type RequestVoteResponse
type RequestVoteResponse struct {
	Term        uint64
	VoteGranted bool
	// contains filtered or unexported fields
}
The response returned from a server after a vote for a candidate to become a leader.
type Server
type Server interface {
	Name() string
	Context() interface{}
	StateMachine() StateMachine
	Leader() string
	State() string
	Path() string
	LogPath() string
	SnapshotPath(lastIndex uint64, lastTerm uint64) string
	Term() uint64
	CommitIndex() uint64
	VotedFor() string
	MemberCount() int
	QuorumSize() int
	IsLogEmpty() bool
	LogEntries() []*LogEntry
	LastCommandName() string
	GetState() string
	ElectionTimeout() time.Duration
	SetElectionTimeout(duration time.Duration)
	HeartbeatInterval() time.Duration
	SetHeartbeatInterval(duration time.Duration)
	Transporter() Transporter
	SetTransporter(t Transporter)
	AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
	RequestVote(req *RequestVoteRequest) *RequestVoteResponse
	RequestSnapshot(req *SnapshotRequest) *SnapshotResponse
	SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
	AddPeer(name string, connectiongString string) error
	RemovePeer(name string) error
	Peers() map[string]*Peer
	Init() error
	Start() error
	Stop()
	Running() bool
	Do(command Command) (interface{}, error)
	TakeSnapshot() error
	LoadSnapshot() error
	AddEventListener(string, EventListener)
	FlushCommitIndex()
}
A server is involved in the consensus protocol and can act as a follower, a candidate, or a leader.
func NewServer
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, ctx interface{}, connectionString string) (Server, error)
Creates a new server with a log at the given path. transporter must not be nil. stateMachine can be nil if snapshotting and log compaction are to be disabled. ctx can be anything (including nil) and is not used by the raft package except to be returned by Server.Context(). connectionString can be anything.
type Snapshot
type Snapshot struct {
	LastIndex uint64 `json:"lastIndex"`
	LastTerm  uint64 `json:"lastTerm"`
	// Cluster configuration.
	Peers []*Peer `json:"peers"`
	State []byte  `json:"state"`
	Path  string  `json:"path"`
}
Snapshot is an in-memory representation of the current state of the system.
type SnapshotRecoveryRequest
type SnapshotRecoveryRequest struct {
	LeaderName string
	LastIndex  uint64
	LastTerm   uint64
	Peers      []*Peer
	State      []byte
}
The request sent to a server to start from the snapshot.
type SnapshotRecoveryResponse
The response returned from a server after it has started from the snapshot.
type SnapshotRequest
The request sent to a server to start from the snapshot.
type SnapshotResponse
type SnapshotResponse struct {
	Success bool `json:"success"`
}
The response returned if the follower entered snapshot state.
type StateMachine
StateMachine is the interface for allowing the host application to save and recover the state machine. This makes it possible to make snapshots and compact the log.
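The interface definition is not reproduced here, so the sketch below assumes a shape with a Save method that serializes the application state and a Recovery method that restores it; check the package source for the exact method set before relying on it. kvStore is a hypothetical key-value state machine that uses encoding/json for its snapshot bytes, which would presumably end up in the Snapshot.State field.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// StateMachine is an assumed shape for the interface described above.
type StateMachine interface {
	Save() ([]byte, error)
	Recovery([]byte) error
}

// kvStore is a hypothetical key-value state machine.
type kvStore struct {
	mu   sync.RWMutex
	data map[string]string
}

// Save serializes the whole store so it can be written into a snapshot.
func (s *kvStore) Save() ([]byte, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return json.Marshal(s.data)
}

// Recovery replaces the store's contents with a previously saved snapshot.
func (s *kvStore) Recovery(b []byte) error {
	data := make(map[string]string)
	if err := json.Unmarshal(b, &data); err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data = data
	return nil
}

// Compile-time check that kvStore satisfies the assumed interface.
var _ StateMachine = (*kvStore)(nil)

func main() {
	src := &kvStore{data: map[string]string{"a": "1"}}
	snap, err := src.Save()
	if err != nil {
		panic(err)
	}

	dst := &kvStore{}
	if err := dst.Recovery(snap); err != nil {
		panic(err)
	}
	fmt.Println(string(snap), dst.data["a"]) // {"a":"1"} 1
}
```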
type Transporter
type Transporter interface {
	SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
	SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
	SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
	SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
}
Transporter is the interface for allowing the host application to transport requests to other nodes.