Documentation ¶
Overview ¶
Package raft implements a streaming version of the Raft protocol.
The Raft distributed consensus protocol is used for maintaining consistency in a distributed system. It implements a replicated log between multiple nodes. This log is a series of entries that represent commands to be executed on the node's state. By executing the same commands on every node in the same order, the state of each node is consistent.
For a more detailed understanding of the Raft protocol, please see the original Raft paper by Diego Ongaro:
https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf
Streaming Raft ¶
The standard implementation of Raft implements a two RPC calls between the leader and the followers: AppendEntries & RequestVote. The RequestVote is the same as in the standard implementation. The AppendEntries RPC is split into two calls: a streaming log and a heartbeat.
One issue with the original Raft implementation is that AppendEntries calls were required to wait between calls. This would cause a delay of a few milliseconds but would ultimitely limit the throughput that Raft could acheive. This can be mitigated by pipelining requests so they don't have to wait for one another. Pipelining helps significantly but there is still overhead in tracking each request and there's a limit on outstanding requests.
Another issue with AppendEntries is that it combines the log replication with the heartbeat mechanism. This is beneficial in that it causes log replication issues such as disk write errors to cause heartbeat errors but it also has the problem that replicating too many log entries will cause a heartbeat to be missed.
Streaming Raft tries to mitigate by separating log replication from heartbeats. When a node becomes a leader, it begins sending heartbeats to establish dominance. When a follower node recognizes a new leader, it connects to the leader using a long-running connection that simply streams the log from where the follower left off. The handshake process required by the standard Raft implementation on every AppendEntries request is now handled once upon connection.
Index ¶
- Constants
- Variables
- type Clock
- type Config
- type ConfigDecoder
- type ConfigEncoder
- type ConfigNode
- type FSM
- type HTTPTransport
- func (t *HTTPTransport) Heartbeat(uri url.URL, term, commitIndex, leaderID uint64) (uint64, error)
- func (t *HTTPTransport) Join(uri url.URL, nodeURL url.URL) (uint64, uint64, *Config, error)
- func (t *HTTPTransport) Leave(uri url.URL, id uint64) error
- func (t *HTTPTransport) ReadFrom(uri url.URL, id, term, index uint64) (io.ReadCloser, error)
- func (t *HTTPTransport) RequestVote(uri url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error)
- type Handler
- type Log
- func (l *Log) AddPeer(u url.URL) (uint64, uint64, *Config, error)
- func (l *Log) Apply(command []byte) (uint64, error)
- func (l *Log) Close() error
- func (l *Log) ClusterID() uint64
- func (l *Log) CommitIndex() uint64
- func (l *Log) Config() *Config
- func (l *Log) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64, err error)
- func (l *Log) ID() uint64
- func (l *Log) Initialize() error
- func (l *Log) IsLeader() bool
- func (l *Log) Join(u url.URL) error
- func (l *Log) LastLogIndexTerm() (index, term uint64)
- func (l *Log) Leader() (id uint64, u url.URL)
- func (l *Log) Leave() error
- func (l *Log) Open(path string) error
- func (l *Log) Opened() bool
- func (l *Log) Path() string
- func (l *Log) ReadFrom(r io.ReadCloser) error
- func (l *Log) RemovePeer(id uint64) error
- func (l *Log) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (peerTerm uint64, err error)
- func (l *Log) SetURL(u url.URL)
- func (l *Log) State() State
- func (l *Log) Term() uint64
- func (l *Log) URL() url.URL
- func (l *Log) URLs() []url.URL
- func (l *Log) Wait(idx uint64) error
- func (l *Log) WriteEntriesTo(w io.Writer, id, term, index uint64) error
- type LogEntry
- type LogEntryDecoder
- type LogEntryEncoder
- type LogEntryType
- type State
Constants ¶
const ( // DefaultApplyInterval is the default time between checks to apply commands. DefaultApplyInterval = 10 * time.Millisecond // DefaultElectionTimeout is the default time before starting an election. DefaultElectionTimeout = 5 * time.Second // DefaultHeartbeatInterval is the default time to wait between heartbeats. DefaultHeartbeatInterval = 100 * time.Millisecond // DefaultReconnectTimeout is the default time to wait before reconnecting. DefaultReconnectTimeout = 10 * time.Millisecond )
const (
// DefaultLogEntryCacheSize is the default number of entries to keep before trimming.
DefaultLogEntryCacheSize = 1000
)
const WaitInterval = 1 * time.Millisecond
WaitInterval represents the amount of time between checks to the applied index. This is used by clients wanting to wait until a given index is processed.
Variables ¶
var ( // ErrClosed is returned when the log is closed. ErrClosed = errors.New("log closed") // ErrOpen is returned when opening a log that is already open. ErrOpen = errors.New("log already open") // ErrInitialized is returned when initializing a log that is already a // member of a cluster. ErrInitialized = errors.New("log already initialized") // ErrURLRequired is returned when opening a log without a URL set. ErrURLRequired = errors.New("url required") // ErrLogExists is returned when initializing an already existing log. ErrLogExists = errors.New("log exists") // ErrNotLeader is returned performing leader operations on a non-leader. ErrNotLeader = errors.New("not leader") // ErrStaleTerm is returned when a term is before the current term. ErrStaleTerm = errors.New("stale term") // ErrOutOfDateLog is returned when a candidate's log is not up to date. ErrOutOfDateLog = errors.New("out of date log") // ErrAlreadyVoted is returned when a vote has already been cast for // a different candidate in the same election term. ErrAlreadyVoted = errors.New("already voted") // ErrNodeNotFound is returned when referencing a non-existent node. ErrNodeNotFound = errors.New("node not found") // ErrInvalidNodeID is returned when using a node id of zero. ErrInvalidNodeID = errors.New("invalid node id") // ErrNodeURLRequired is returned a node config has no URL set. ErrNodeURLRequired = errors.New("node url required") // ErrDuplicateNodeID is returned when adding a node with an existing id. ErrDuplicateNodeID = errors.New("duplicate node id") // ErrDuplicateNodeURL is returned when adding a node with an existing URL. ErrDuplicateNodeURL = errors.New("duplicate node url") // ErrSnapshotRequired returned when reading from an out-of-order log. // The snapshot will be retrieved on the next reader request. ErrSnapshotRequired = errors.New("snapshot required") // ErrSnapshotting is returned when an action cannot be performed because // the log is in the middle of a snapshot. ErrSnapshotting = errors.New("snapshotting") )
Functions ¶
This section is empty.
Types ¶
type Clock ¶
type Clock struct { ApplyInterval time.Duration ElectionTimeout time.Duration HeartbeatInterval time.Duration ReconnectTimeout time.Duration }
Clock implements an interface to the real-time clock.
func (*Clock) AfterApplyInterval ¶
func (c *Clock) AfterApplyInterval() <-chan chan struct{}
AfterApplyInterval returns a channel that fires after the apply interval.
func (*Clock) AfterElectionTimeout ¶
func (c *Clock) AfterElectionTimeout() <-chan chan struct{}
AfterElectionTimeout returns a channel that fires after a duration that is between the election timeout and double the election timeout.
func (*Clock) AfterHeartbeatInterval ¶
func (c *Clock) AfterHeartbeatInterval() <-chan chan struct{}
AfterHeartbeatInterval returns a channel that fires after the heartbeat interval.
func (*Clock) AfterReconnectTimeout ¶
func (c *Clock) AfterReconnectTimeout() <-chan chan struct{}
AfterReconnectTimeout returns a channel that fires after the reconnection timeout.
type Config ¶
type Config struct { // Cluster identifier. Used to prevent separate clusters from // accidentally communicating with one another. ClusterID uint64 // Index is the last log index when the configuration was updated. Index uint64 // MaxNodeID is the largest node identifier generated for this config. MaxNodeID uint64 // List of nodes in the cluster. Nodes []*ConfigNode }
Config represents the configuration for the log.
func (*Config) NodeByID ¶
func (c *Config) NodeByID(id uint64) *ConfigNode
NodeByID returns a node by identifier.
func (*Config) NodeByURL ¶
func (c *Config) NodeByURL(u url.URL) *ConfigNode
NodeByURL returns a node by URL.
func (*Config) RemoveNode ¶
RemoveNode removes a node by id. Returns ErrNodeNotFound if the node does not exist.
type ConfigDecoder ¶
type ConfigDecoder struct {
// contains filtered or unexported fields
}
ConfigDecoder decodes a config from a reader.
func NewConfigDecoder ¶
func NewConfigDecoder(r io.Reader) *ConfigDecoder
NewConfigDecoder returns a new instance of ConfigDecoder attached to a reader.
func (*ConfigDecoder) Decode ¶
func (dec *ConfigDecoder) Decode(c *Config) error
Decode marshals the configuration to the decoder's reader.
type ConfigEncoder ¶
type ConfigEncoder struct {
// contains filtered or unexported fields
}
ConfigEncoder encodes a config to a writer.
func NewConfigEncoder ¶
func NewConfigEncoder(w io.Writer) *ConfigEncoder
NewConfigEncoder returns a new instance of ConfigEncoder attached to a writer.
func (*ConfigEncoder) Encode ¶
func (enc *ConfigEncoder) Encode(c *Config) error
Encode marshals the configuration to the encoder's writer.
type ConfigNode ¶
type ConfigNode struct { ID uint64 URL url.URL LastHeartbeatError int64 // the time a heartbeat returned an error HeartbeatErrorCount int64 // the number of times in a row that a heartbeat has failed }
ConfigNode represents a single machine in the raft configuration.
type FSM ¶
type FSM interface { // These implement the snapshot and restore. io.WriterTo io.ReaderFrom // Executes a log entry against the state machine. // Non-repeatable errors such as system and disk errors must panic. Apply(*LogEntry) error // Returns the applied index saved to the state machine. Index() uint64 }
FSM represents the state machine that the log is applied to. The FSM must maintain the highest index that it has seen.
type HTTPTransport ¶
type HTTPTransport struct{}
HTTPTransport represents a transport for sending RPCs over the HTTP protocol.
func (*HTTPTransport) Leave ¶
func (t *HTTPTransport) Leave(uri url.URL, id uint64) error
Leave removes a node from a cluster's membership.
func (*HTTPTransport) ReadFrom ¶
func (t *HTTPTransport) ReadFrom(uri url.URL, id, term, index uint64) (io.ReadCloser, error)
ReadFrom streams the log from a leader.
func (*HTTPTransport) RequestVote ¶
func (t *HTTPTransport) RequestVote(uri url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) (uint64, error)
RequestVote requests a vote for a candidate in a given term.
type Handler ¶
type Handler struct { Log interface { AddPeer(u url.URL) (id uint64, leaderID uint64, config *Config, err error) RemovePeer(id uint64) error Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64, err error) WriteEntriesTo(w io.Writer, id, term, index uint64) error RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (peerTerm uint64, err error) } }
Handler represents an HTTP endpoint for Raft to communicate over.
type Log ¶
type Log struct { // The state machine that log entries will be applied to. FSM FSM // LogEntryCacheSize is the minimum number of log entries to keep before // trimming the log. These entries are kept in case a node disconnects // momentarily. Otherwise a reconnecting node would have to resnapshot. LogEntryCacheSize int // The transport used to communicate with other nodes in the cluster. Transport interface { Join(u url.URL, nodeURL url.URL) (id uint64, leaderID uint64, config *Config, err error) Leave(u url.URL, id uint64) error Heartbeat(u url.URL, term, commitIndex, leaderID uint64) (lastIndex uint64, err error) ReadFrom(u url.URL, id, term, index uint64) (io.ReadCloser, error) RequestVote(u url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) (peerTerm uint64, err error) } // Clock is an abstraction of time. Clock interface { Now() time.Time AfterApplyInterval() <-chan chan struct{} AfterElectionTimeout() <-chan chan struct{} AfterHeartbeatInterval() <-chan chan struct{} AfterReconnectTimeout() <-chan chan struct{} } // Rand returns a random number. Rand func() int64 // Sets whether trace messages are logged. DebugEnabled bool // This logs some asynchronous errors that occur within the log. Logger *log.Logger // contains filtered or unexported fields }
Log represents a replicated log of commands based on the Raft protocol.
The log can exist in one of four states that transition based on the following rules:
┌───────────┐ ┌─▶│ Stopped │ │ └───────────┘ │ │ │ ▼ │ ┌───────────┐ ├──│ Follower │◀─┐ │ └───────────┘ │ close │ │ │ log │ ▼ │ │ ┌───────────┐ │ ├──│ Candidate │──┤ higher │ └───────────┘ │ term │ │ │ │ ▼ │ │ ┌───────────┐ │ └──│ Leader │──┘ └───────────┘ - Stopped moves to Follower when initialized or joined. - Follower moves to Candidate after election timeout. - Candidate moves to Leader after a quorum of votes. - Leader or Candidate moves to Follower if higher term seen. - Any state moves to Stopped if log is closed.
func (*Log) AddPeer ¶
AddPeer creates a new peer in the cluster. Returns the new peer's identifier and the current configuration.
func (*Log) Apply ¶
Apply executes a command against the log. This function returns once the command has been committed to the log.
func (*Log) ClusterID ¶
ClusterID returns the identifier for the cluster. Returns zero if the cluster has not been initialized yet.
func (*Log) CommitIndex ¶
CommtIndex returns the highest committed index.
func (*Log) Heartbeat ¶
Heartbeat establishes dominance by the current leader. Returns the current term and highest written log entry index.
func (*Log) Initialize ¶
Initialize a new log. Returns an error if log data already exists.
func (*Log) Join ¶
Join contacts a node in the cluster to request membership. A log cannot join a cluster if it has already been initialized.
func (*Log) LastLogIndexTerm ¶
LastLogIndexTerm returns the last index & term from the log.
func (*Log) Leader ¶
Leader returns the id and URL associated with the current leader. Returns zero if there is no current leader.
func (*Log) Open ¶
Open initializes the log from a path. If the path does not exist then it is created.
func (*Log) Path ¶
Path returns the data path of the Raft log. Returns an empty string if the log is closed.
func (*Log) ReadFrom ¶
func (l *Log) ReadFrom(r io.ReadCloser) error
ReadFrom continually reads log entries from a reader.
func (*Log) RemovePeer ¶
RemovePeer removes an existing peer from the cluster by id.
func (*Log) RequestVote ¶
func (l *Log) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (peerTerm uint64, err error)
RequestVote requests a vote from the log.
type LogEntry ¶
type LogEntry struct { Type LogEntryType Index uint64 Term uint64 Data []byte }
LogEntry represents a single command within the log.
type LogEntryDecoder ¶
type LogEntryDecoder struct {
// contains filtered or unexported fields
}
LogEntryDecoder decodes entries from a reader.
func NewLogEntryDecoder ¶
func NewLogEntryDecoder(r io.Reader) *LogEntryDecoder
NewLogEntryDecoder returns a new instance of the LogEntryDecoder that will decode from a reader.
func (*LogEntryDecoder) Decode ¶
func (dec *LogEntryDecoder) Decode(e *LogEntry) error
Decode reads a log entry from the decoder's reader.
type LogEntryEncoder ¶
type LogEntryEncoder struct {
// contains filtered or unexported fields
}
LogEntryEncoder encodes entries to a writer.
func NewLogEntryEncoder ¶
func NewLogEntryEncoder(w io.Writer) *LogEntryEncoder
NewLogEntryEncoder returns a new instance of the LogEntryEncoder that will encode to a writer.
func (*LogEntryEncoder) Encode ¶
func (enc *LogEntryEncoder) Encode(e *LogEntry) error
Encode writes a log entry to the encoder's writer.
type LogEntryType ¶
type LogEntryType uint8
LogEntryType serves as an internal marker for log entries. Non-command entry types are handled by the library itself.
const ( LogEntryCommand LogEntryType = iota LogEntryNop LogEntryInitialize LogEntryAddPeer LogEntryRemovePeer )