Documentation ¶
Overview ¶
Package raft provides an implementation of the raft consensus algorithm.
Usage ¶
The primary object in raft is a Node. You either start a Node from scratch using raft.StartNode or start a Node from some initial state using raft.RestartNode.
storage := raft.NewMemoryStorage() c := &Config{ ID: 0x01, ElectionTick: 10, HeartbeatTick: 1, Storage: storage, MaxSizePerMsg: 4096, MaxInflightMsgs: 256, } n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})
Now that you are holding onto a Node you have a few responsibilities:
First, you must read from the Node.Ready() channel and process the updates it contains. These steps may be performed in parallel, except as noted in step 2.
1. Write HardState, Entries, and Snapshot to persistent storage if they are not empty. Note that when writing an Entry with Index i, any previously-persisted entries with Index >= i must be discarded.
2. Send all Messages to the nodes named in the To field. It is important that no messages be sent until after the latest HardState has been persisted to disk, and all Entries written by any previous Ready batch (Messages may be sent while entries from the same batch are being persisted). If any Message has type MsgSnap, call Node.ReportSnapshot() after it has been sent (these messages may be large).
3. Apply Snapshot (if any) and CommittedEntries to the state machine. If any committed Entry has Type EntryConfChange, call Node.ApplyConfChange() to apply it to the node. The configuration change may be cancelled at this point by setting the NodeID field to zero before calling ApplyConfChange (but ApplyConfChange must be called one way or the other, and the decision to cancel must be based solely on the state machine and not external information such as the observed health of the node).
4. Call Node.Advance() to signal readiness for the next batch of updates. This may be done at any time after step 1, although all updates must be processed in the order they were returned by Ready.
Second, all persisted log entries must be made available via an implementation of the Storage interface. The provided MemoryStorage type can be used for this (if you repopulate its state upon a restart), or you can supply your own disk-backed implementation.
Third, when you receive a message from another node, pass it to Node.Step:
func recvRaftRPC(ctx context.Context, m raftpb.Message) { n.Step(ctx, m) }
Finally, you need to call Node.Tick() at regular intervals (probably via a time.Ticker). Raft has two important timeouts: heartbeat and the election timeout. However, internally to the raft package time is represented by an abstract "tick".
The total state machine handling loop will look something like this:
for { select { case <-s.Ticker: n.Tick() case rd := <-s.Node.Ready(): saveToStorage(rd.State, rd.Entries, rd.Snapshot) send(rd.Messages) if !raft.IsEmptySnap(rd.Snapshot) { processSnapshot(rd.Snapshot) } for _, entry := range rd.CommittedEntries { process(entry) if entry.Type == raftpb.EntryConfChange { var cc raftpb.ConfChange cc.Unmarshal(entry.Data) s.Node.ApplyConfChange(cc) } s.Node.Advance() case <-s.done: return } }
To propose changes to the state machine from your node take your application data, serialize it into a byte slice and call:
n.Propose(ctx, data)
If the proposal is committed, data will appear in committed entries with type raftpb.EntryNormal. There is no guarantee that a proposed command will be committed; you may have to re-propose after a timeout.
To add or remove node in a cluster, build ConfChange struct 'cc' and call:
n.ProposeConfChange(ctx, cc)
After config change is committed, some committed entry with type raftpb.EntryConfChange will be returned. You must apply it to node through:
var cc raftpb.ConfChange cc.Unmarshal(data) n.ApplyConfChange(cc)
Note: An ID represents a unique node in a cluster for all time. A given ID MUST be used only once even if the old node has been removed. This means that for example IP addresses make poor node IDs since they may be reused. Node IDs must be non-zero.
Implementation notes ¶
This implementation is up to date with the final Raft thesis (https://ramcloud.stanford.edu/~ongaro/thesis.pdf), although our implementation of the membership change protocol differs somewhat from that described in chapter 4. The key invariant that membership changes happen one node at a time is preserved, but in our implementation the membership change takes effect when its entry is applied, not when it is added to the log (so the entry is committed under the old membership instead of the new). This is equivalent in terms of safety, since the old and new configurations are guaranteed to overlap.
To ensure that we do not attempt to commit two membership changes at once by matching log positions (which would be unsafe since they should have different quorum requirements), we simply disallow any proposed membership change while any uncommitted change appears in the leader's log.
This approach introduces a problem when you try to remove a member from a two-member cluster: If one of the members dies before the other one receives the commit of the confchange entry, then the member cannot be removed any more since the cluster cannot make progress. For this reason it is highly recommended to use three or more nodes in every cluster.
Index ¶
- Constants
- Variables
- func DescribeEntry(e pb.Entry, f EntryFormatter) string
- func DescribeMessage(m pb.Message, f EntryFormatter) string
- func IsEmptyHardState(st pb.HardState) bool
- func IsEmptySnap(sp pb.Snapshot) bool
- func IsLocalMsg(m pb.Message) bool
- func IsResponseMsg(m pb.Message) bool
- func SetLogger(l Logger)
- type Config
- type DefaultLogger
- func (l *DefaultLogger) Debug(v ...interface{})
- func (l *DefaultLogger) Debugf(format string, v ...interface{})
- func (l *DefaultLogger) EnableDebug()
- func (l *DefaultLogger) EnableTimestamps()
- func (l *DefaultLogger) Error(v ...interface{})
- func (l *DefaultLogger) Errorf(format string, v ...interface{})
- func (l *DefaultLogger) Fatal(v ...interface{})
- func (l *DefaultLogger) Fatalf(format string, v ...interface{})
- func (l *DefaultLogger) Info(v ...interface{})
- func (l *DefaultLogger) Infof(format string, v ...interface{})
- func (l *DefaultLogger) Panic(v ...interface{})
- func (l *DefaultLogger) Panicf(format string, v ...interface{})
- func (l *DefaultLogger) Warning(v ...interface{})
- func (l *DefaultLogger) Warningf(format string, v ...interface{})
- type EntryFormatter
- type Logger
- type MemoryStorage
- func (ms *MemoryStorage) Append(entries []pb.Entry) error
- func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error
- func (ms *MemoryStorage) Compact(compactIndex uint64) error
- func (ms *MemoryStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) (pb.Snapshot, error)
- func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
- func (ms *MemoryStorage) FirstIndex() (uint64, error)
- func (ms *MemoryStorage) InitialState() (pb.HardState, pb.ConfState, error)
- func (ms *MemoryStorage) LastIndex() (uint64, error)
- func (ms *MemoryStorage) SetHardState(st pb.HardState) error
- func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error)
- func (ms *MemoryStorage) Term(i uint64) (uint64, error)
- type MultiNode
- type Node
- type Peer
- type Progress
- type ProgressStateType
- type Ready
- type SnapshotStatus
- type SoftState
- type StateType
- type Status
- type Storage
Examples ¶
Constants ¶
const None uint64 = 0
None is a placeholder node ID used when there is no leader.
Variables ¶
var ErrCompacted = errors.New("requested index is unavailable due to compaction")
ErrCompacted is returned by Storage.Entries/Compact when a requested index is unavailable because it predates the last snapshot.
var ErrSnapOutOfDate = errors.New("requested index is older than the existing snapshot")
ErrOutOfDataSnap is returned by Storage.CreateSnapshot when a requested index is older than the existing snapshot.
var ( // ErrStopped is returned by methods on Nodes that have been stopped. ErrStopped = errors.New("raft: stopped") )
Functions ¶
func DescribeEntry ¶
func DescribeEntry(e pb.Entry, f EntryFormatter) string
DescribeEntry returns a concise human-readable description of an Entry for debugging.
func DescribeMessage ¶
func DescribeMessage(m pb.Message, f EntryFormatter) string
DescribeMessage returns a concise human-readable description of a Message for debugging.
func IsEmptyHardState ¶
IsEmptyHardState returns true if the given HardState is empty.
func IsEmptySnap ¶
IsEmptySnap returns true if the given Snapshot is empty.
func IsLocalMsg ¶
func IsResponseMsg ¶
Types ¶
type Config ¶
type Config struct { // ID is the identity of the local raft. ID cannot be 0. ID uint64 // ElectionTick is the election timeout. If a follower does not // receive any message from the leader of current term during // ElectionTick, it will become candidate and start an election. // ElectionTick must be greater than HeartbeatTick. We suggest // to use ElectionTick = 10 * HeartbeatTick to avoid unnecessary // leader switching. ElectionTick int // HeartbeatTick is the heartbeat interval. A leader sends heartbeat // message to maintain the leadership every heartbeat interval. HeartbeatTick int // Storage is the storage for raft. raft generates entires and // states to be stored in storage. raft reads the persisted entires // and states out of Storage when it needs. raft reads out the previous // state and configuration out of storage when restarting. Storage Storage // Applied is the last applied index. It should only be set when restarting // raft. raft will not return entries to the application smaller or equal to Applied. // If Applied is unset when restarting, raft might return previous applied entries. // This is a very application dependent configuration. Applied uint64 // MaxSizePerMsg limits the max size of each append message. Smaller value lowers // the raft recovery cost(initial probing and message lost during normal operation). // On the other side, it might affect the throughput during normal replication. // Note: math.MaxUint64 for unlimited, 0 for at most one entry per message. MaxSizePerMsg uint64 // MaxInflightMsgs limits the max number of in-flight append messages during optimistic // replication phase. The application transportation layer usually has its own sending // buffer over TCP/UDP. Setting MaxInflightMsgs to avoid overflowing that sending buffer. // TODO (xiangli): feedback to application to limit the proposal rate? MaxInflightMsgs int // logger is the logger used for raft log. For multinode which // can host multiple raft group, each raft group can have its // own logger Logger Logger // contains filtered or unexported fields }
Config contains the parameters to start a raft.
type DefaultLogger ¶
DefaultLogger is a defualt implementation of the Logger interface.
func (*DefaultLogger) Debug ¶
func (l *DefaultLogger) Debug(v ...interface{})
func (*DefaultLogger) Debugf ¶
func (l *DefaultLogger) Debugf(format string, v ...interface{})
func (*DefaultLogger) EnableDebug ¶
func (l *DefaultLogger) EnableDebug()
func (*DefaultLogger) EnableTimestamps ¶
func (l *DefaultLogger) EnableTimestamps()
func (*DefaultLogger) Error ¶
func (l *DefaultLogger) Error(v ...interface{})
func (*DefaultLogger) Errorf ¶
func (l *DefaultLogger) Errorf(format string, v ...interface{})
func (*DefaultLogger) Fatal ¶
func (l *DefaultLogger) Fatal(v ...interface{})
func (*DefaultLogger) Fatalf ¶
func (l *DefaultLogger) Fatalf(format string, v ...interface{})
func (*DefaultLogger) Info ¶
func (l *DefaultLogger) Info(v ...interface{})
func (*DefaultLogger) Infof ¶
func (l *DefaultLogger) Infof(format string, v ...interface{})
func (*DefaultLogger) Panic ¶
func (l *DefaultLogger) Panic(v ...interface{})
func (*DefaultLogger) Panicf ¶
func (l *DefaultLogger) Panicf(format string, v ...interface{})
func (*DefaultLogger) Warning ¶
func (l *DefaultLogger) Warning(v ...interface{})
func (*DefaultLogger) Warningf ¶
func (l *DefaultLogger) Warningf(format string, v ...interface{})
type EntryFormatter ¶
EntryFormatter can be implemented by the application to provide human-readable formatting of entry data. Nil is a valid EntryFormatter and will use a default format.
type Logger ¶
type Logger interface { Debug(v ...interface{}) Debugf(format string, v ...interface{}) Error(v ...interface{}) Errorf(format string, v ...interface{}) Info(v ...interface{}) Infof(format string, v ...interface{}) Warning(v ...interface{}) Warningf(format string, v ...interface{}) Fatal(v ...interface{}) Fatalf(format string, v ...interface{}) Panic(v ...interface{}) Panicf(format string, v ...interface{}) }
type MemoryStorage ¶
type MemoryStorage struct { // Protects access to all fields. Most methods of MemoryStorage are // run on the raft goroutine, but Append() is run on an application // goroutine. sync.Mutex // contains filtered or unexported fields }
MemoryStorage implements the Storage interface backed by an in-memory array.
func NewMemoryStorage ¶
func NewMemoryStorage() *MemoryStorage
NewMemoryStorage creates an empty MemoryStorage.
func (*MemoryStorage) Append ¶
func (ms *MemoryStorage) Append(entries []pb.Entry) error
Append the new entries to storage. TODO (xiangli): ensure the entries are continuous and entries[0].Index > ms.entries[0].Index
func (*MemoryStorage) ApplySnapshot ¶
func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error
ApplySnapshot overwrites the contents of this Storage object with those of the given snapshot.
func (*MemoryStorage) Compact ¶
func (ms *MemoryStorage) Compact(compactIndex uint64) error
Compact discards all log entries prior to compactIndex. It is the application's responsibility to not attempt to compact an index greater than raftLog.applied.
func (*MemoryStorage) CreateSnapshot ¶
func (ms *MemoryStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) (pb.Snapshot, error)
Creates a snapshot which can be retrieved with the Snapshot() method and can be used to reconstruct the state at that point. If any configuration changes have been made since the last compaction, the result of the last ApplyConfChange must be passed in.
func (*MemoryStorage) Entries ¶
func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
Entries implements the Storage interface.
func (*MemoryStorage) FirstIndex ¶
func (ms *MemoryStorage) FirstIndex() (uint64, error)
FirstIndex implements the Storage interface.
func (*MemoryStorage) InitialState ¶
InitialState implements the Storage interface.
func (*MemoryStorage) LastIndex ¶
func (ms *MemoryStorage) LastIndex() (uint64, error)
LastIndex implements the Storage interface.
func (*MemoryStorage) SetHardState ¶
func (ms *MemoryStorage) SetHardState(st pb.HardState) error
SetHardState saves the current HardState.
type MultiNode ¶
type MultiNode interface { // CreateGroup adds a new group to the MultiNode. The application must call CreateGroup // on each particpating node with the same group ID; it may create groups on demand as it // receives messages. If the given storage contains existing log entries the list of peers // may be empty. If Config.ID field is zero it will be replaced by the ID passed // to StartMultiNode. CreateGroup(group uint64, c *Config, peers []Peer) error // RemoveGroup removes a group from the MultiNode. RemoveGroup(group uint64) error // Tick advances the internal logical clock by a single tick. Tick() // Campaign causes this MultiNode to transition to candidate state in the given group. Campaign(ctx context.Context, group uint64) error // Propose proposes that data be appended to the given group's log. Propose(ctx context.Context, group uint64, data []byte) error // ProposeConfChange proposes a config change. ProposeConfChange(ctx context.Context, group uint64, cc pb.ConfChange) error // ApplyConfChange applies a config change to the local node. ApplyConfChange(group uint64, cc pb.ConfChange) *pb.ConfState // Step advances the state machine using the given message. Step(ctx context.Context, group uint64, msg pb.Message) error // Ready returns a channel that returns the current point-in-time state of any ready // groups. Only groups with something to report will appear in the map. Ready() <-chan map[uint64]Ready // Advance notifies the node that the application has applied and saved progress in the // last Ready results. It must be called with the last value returned from the Ready() // channel. Advance(map[uint64]Ready) // Status returns the current status of the given group. Returns nil if no such group // exists. Status(group uint64) *Status // Report reports the given node is not reachable for the last send. ReportUnreachable(id, groupID uint64) // ReportSnapshot reports the stutus of the sent snapshot. ReportSnapshot(id, groupID uint64, status SnapshotStatus) // Stop performs any necessary termination of the MultiNode. Stop() }
MultiNode represents a node that is participating in multiple consensus groups. A MultiNode is more efficient than a collection of Nodes. The methods of this interface correspond to the methods of Node and are described more fully there.
func StartMultiNode ¶
StartMultiNode creates a MultiNode and starts its background goroutine. If id is non-zero it identifies this node and will be used as its node ID in all groups. The election and heartbeat timers are in units of ticks.
type Node ¶
type Node interface { // Tick increments the internal logical clock for the Node by a single tick. Election // timeouts and heartbeat timeouts are in units of ticks. Tick() // Campaign causes the Node to transition to candidate state and start campaigning to become leader. Campaign(ctx context.Context) error // Propose proposes that data be appended to the log. Propose(ctx context.Context, data []byte) error // ProposeConfChange proposes config change. // At most one ConfChange can be in the process of going through consensus. // Application needs to call ApplyConfChange when applying EntryConfChange type entry. ProposeConfChange(ctx context.Context, cc pb.ConfChange) error // Step advances the state machine using the given message. ctx.Err() will be returned, if any. Step(ctx context.Context, msg pb.Message) error // Ready returns a channel that returns the current point-in-time state. // Users of the Node must call Advance after applying the state returned by Ready. Ready() <-chan Ready // Advance notifies the Node that the application has applied and saved progress up to the last Ready. // It prepares the node to return the next available Ready. Advance() // ApplyConfChange applies config change to the local node. // Returns an opaque ConfState protobuf which must be recorded // in snapshots. Will never return nil; it returns a pointer only // to match MemoryStorage.Compact. ApplyConfChange(cc pb.ConfChange) *pb.ConfState // Status returns the current status of the raft state machine. Status() Status // Report reports the given node is not reachable for the last send. ReportUnreachable(id uint64) // ReportSnapshot reports the status of the sent snapshot. ReportSnapshot(id uint64, status SnapshotStatus) // Stop performs any necessary termination of the Node. Stop() }
Node represents a node in a raft cluster.
Example ¶
package main import ( pb "github.com/coreos/etcd/raft/raftpb" ) func applyToStore(ents []pb.Entry) {} func sendMessages(msgs []pb.Message) {} func saveStateToDisk(st pb.HardState) {} func saveToDisk(ents []pb.Entry) {} func main() { c := &Config{} n := StartNode(c, nil) // stuff to n happens in other goroutines // the last known state var prev pb.HardState for { // ReadState blocks until there is new state ready. rd := <-n.Ready() if !isHardStateEqual(prev, rd.HardState) { saveStateToDisk(rd.HardState) prev = rd.HardState } saveToDisk(rd.Entries) go applyToStore(rd.CommittedEntries) sendMessages(rd.Messages) } }
Output:
func RestartNode ¶
RestartNode is similar to StartNode but does not take a list of peers. The current membership of the cluster will be restored from the Storage. If the caller has an existing state machine, pass in the last log index that has been applied to it; otherwise use zero.
type Progress ¶
type Progress struct {
Match, Next uint64
// When in ProgressStateProbe, leader sends at most one replication message
// per heartbeat interval. It also probes actual progress of the follower.
//
// When in ProgressStateReplicate, leader optimistically increases next
// to the latest entry sent after sending replication message. This is
// an optimized state for fast replicating log entries to the follower.
//
// When in ProgressStateSnapshot, leader should have sent out snapshot
// before and stops sending any replication message.
State ProgressStateType
// Paused is used in ProgressStateProbe.
// When Paused is true, raft should pause sending replication message to this peer.
Paused bool
// PendingSnapshot is used in ProgressStateSnapshot.
// If there is a pending snapshot, the pendingSnapshot will be set to the
// index of the snapshot. If pendingSnapshot is set, the replication process of
// this Progress will be paused. raft will not resend snapshot until the pending one
// is reported to be failed.
PendingSnapshot uint64
// contains filtered or unexported fields
}
Progress represents a follower’s progress in the view of the leader. Leader maintains progresses of all followers, and sends entries to the follower based on its progress.
type ProgressStateType ¶
type ProgressStateType uint64
const ( ProgressStateProbe ProgressStateType = iota ProgressStateReplicate ProgressStateSnapshot )
func (ProgressStateType) String ¶
func (st ProgressStateType) String() string
type Ready ¶
type Ready struct { // The current volatile state of a Node. // SoftState will be nil if there is no update. // It is not required to consume or store SoftState. *SoftState // The current state of a Node to be saved to stable storage BEFORE // Messages are sent. // HardState will be equal to empty state if there is no update. pb.HardState // Entries specifies entries to be saved to stable storage BEFORE // Messages are sent. Entries []pb.Entry // Snapshot specifies the snapshot to be saved to stable storage. Snapshot pb.Snapshot // CommittedEntries specifies entries to be committed to a // store/state-machine. These have previously been committed to stable // store. CommittedEntries []pb.Entry // Messages specifies outbound messages to be sent AFTER Entries are // committed to stable storage. // If it contains a MsgSnap message, the application MUST report back to raft // when the snapshot has been received or has failed by calling ReportSnapshot. Messages []pb.Message }
Ready encapsulates the entries and messages that are ready to read, be saved to stable storage, committed or sent to other peers. All fields in Ready are read-only.
type SnapshotStatus ¶
type SnapshotStatus int
const ( SnapshotFinish SnapshotStatus = 1 SnapshotFailure SnapshotStatus = 2 )
type SoftState ¶
SoftState provides state that is useful for logging and debugging. The state is volatile and does not need to be persisted to the WAL.
type StateType ¶
type StateType uint64
StateType represents the role of a node in a cluster.
func (StateType) MarshalJSON ¶
type Status ¶
func (Status) MarshalJSON ¶
TODO: try to simplify this by introducing ID type into raft
type Storage ¶
type Storage interface { // InitialState returns the saved HardState and ConfState information. InitialState() (pb.HardState, pb.ConfState, error) // Entries returns a slice of log entries in the range [lo,hi). // MaxSize limits the total size of the log entries returned, but // Entries returns at least one entry if any. Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) // Term returns the term of entry i, which must be in the range // [FirstIndex()-1, LastIndex()]. The term of the entry before // FirstIndex is retained for matching purposes even though the // rest of that entry may not be available. Term(i uint64) (uint64, error) // LastIndex returns the index of the last entry in the log. LastIndex() (uint64, error) // FirstIndex returns the index of the first log entry that is // possibly available via Entries (older entries have been incorporated // into the latest Snapshot; if storage only contains the dummy entry the // first log entry is not available). FirstIndex() (uint64, error) // Snapshot returns the most recent snapshot. // If snapshot is temporarily unavailable, it should return ErrTemporarilyUnavailable, // so raft state machine could know that Storage needs some time to prepare // snapshot and call Snapshot later. Snapshot() (pb.Snapshot, error) }
Storage is an interface that may be implemented by the application to retrieve log entries from storage.
If any Storage method returns an error, the raft instance will become inoperable and refuse to participate in elections; the application is responsible for cleanup and recovery in this case.