raft

package
v0.0.0-...-1338f1b
Published: Mar 15, 2024 License: Apache-2.0 Imports: 11 Imported by: 1

Documentation

Overview

Package raft sends and receives messages in the Protocol Buffer format defined in the eraftpb package.

Raft is a protocol with which a cluster of nodes can maintain a replicated state machine. The state machine is kept in sync through the use of a replicated log. For more details on Raft, see "In Search of an Understandable Consensus Algorithm" (https://ramcloud.stanford.edu/raft.pdf) by Diego Ongaro and John Ousterhout.

Usage

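The primary object in raft is a Node. You either start a Node from scratch using raft.StartNode or start a Node from some initial state using raft.RestartNode. Note that the exported API listed below provides RawNode, created with NewRawNode, rather than Node, StartNode, and RestartNode; the examples in this overview use the Node-based names.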

To start a node from scratch:

storage := raft.NewMemoryStorage()
c := &raft.Config{
  ID:              0x01,
  ElectionTick:    10,
  HeartbeatTick:   1,
  Storage:         storage,
}
n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})

To restart a node from previous state:

storage := raft.NewMemoryStorage()

// recover the in-memory storage from persistent
// snapshot, state and entries.
storage.ApplySnapshot(snapshot)
storage.SetHardState(state)
storage.Append(entries)

c := &raft.Config{
  ID:              0x01,
  ElectionTick:    10,
  HeartbeatTick:   1,
  Storage:         storage,
}

// restart raft without peer information.
// peer information is already included in the storage.
n := raft.RestartNode(c)

Now that you are holding onto a Node, you have a few responsibilities:

First, you must read from the Node.Ready() channel and process the updates it contains. These steps may be performed in parallel, except as noted in step 2.

1. Write HardState, Entries, and Snapshot to persistent storage if they are not empty. Note that when writing an Entry with Index i, any previously-persisted entries with Index >= i must be discarded.

2. Send all Messages to the nodes named in the To field. It is important that no messages be sent until the latest HardState has been persisted to disk, along with all Entries written by any previous Ready batch (Messages may be sent while entries from the same batch are being persisted).

Note: Marshalling messages is not thread-safe; it is important that you make sure that no new entries are persisted while marshalling. The easiest way to achieve this is to serialize the messages directly inside your main raft loop.

3. Apply Snapshot (if any) and CommittedEntries to the state machine. If any committed Entry has Type EntryType_EntryConfChange, call Node.ApplyConfChange() to apply it to the node. The configuration change may be cancelled at this point by setting the NodeId field to zero before calling ApplyConfChange (but ApplyConfChange must be called one way or the other, and the decision to cancel must be based solely on the state machine and not external information such as the observed health of the node).

4. Call Node.Advance() to signal readiness for the next batch of updates. This may be done at any time after step 1, although all updates must be processed in the order they were returned by Ready.
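The four steps above can be sketched with simplified stand-in types. Entry, Ready, and app below are hypothetical and are not the package's real types; the sketch runs the steps sequentially, which trivially satisfies the ordering rules, and shows the step 1 rule that persisting an entry at Index i discards any previously persisted entries with Index >= i.

```go
package main

import "fmt"

// Entry and Ready are hypothetical, simplified stand-ins for eraftpb.Entry
// and raft.Ready; they are not the package's real types.
type Entry struct {
	Index, Term uint64
	Data        string
}

type Ready struct {
	Entries          []Entry
	CommittedEntries []Entry
	Messages         []string
}

// app is a hypothetical application: log[i-1] holds the entry with Index i.
type app struct {
	log     []Entry
	applied []string
	trace   []string
}

// persist implements step 1: before appending, discard any persisted entries
// whose Index is >= the first new Index. It assumes the new entries start at
// most one past the current last index.
func (a *app) persist(rd Ready) {
	if len(rd.Entries) > 0 {
		first := rd.Entries[0].Index
		if uint64(len(a.log)) >= first {
			a.log = a.log[:first-1]
		}
		a.log = append(a.log, rd.Entries...)
	}
	a.trace = append(a.trace, "persist")
}

// handle runs steps 1-4 in order: persist, send, apply, advance.
func (a *app) handle(rd Ready) {
	a.persist(rd)
	a.trace = append(a.trace, fmt.Sprintf("send %d", len(rd.Messages)))
	for _, e := range rd.CommittedEntries {
		a.applied = append(a.applied, e.Data)
	}
	a.trace = append(a.trace, "apply")
	a.trace = append(a.trace, "advance") // stands in for Node.Advance()
}

func main() {
	a := &app{}
	a.handle(Ready{Entries: []Entry{{1, 1, "x"}, {2, 1, "y"}}})
	// A new leader overwrites index 2 with an entry from term 2.
	a.handle(Ready{
		Entries:          []Entry{{2, 2, "z"}},
		CommittedEntries: []Entry{{1, 1, "x"}},
		Messages:         []string{"MsgAppend"},
	})
	fmt.Println(len(a.log), a.log[1].Data, a.applied) // 2 z [x]
	fmt.Println(a.trace)
}
```

A real application may overlap these steps (for example, sending messages from the same batch while entries are still being written), as long as the constraints in steps 2 and 4 hold.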

Second, all persisted log entries must be made available via an implementation of the Storage interface. The provided MemoryStorage type can be used for this (if you repopulate its state upon a restart), or you can supply your own disk-backed implementation.

Third, when you receive a message from another node, pass it to Node.Step:

func recvRaftRPC(ctx context.Context, m eraftpb.Message) {
	n.Step(ctx, m)
}

Finally, you need to call Node.Tick() at regular intervals (probably via a time.Ticker). Raft has two important timeouts: the heartbeat timeout and the election timeout. However, internally to the raft package, time is represented by an abstract "tick".

The total state machine handling loop will look something like this:

for {
  select {
  case <-s.Ticker:
    n.Tick()
  case rd := <-s.Node.Ready():
    saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
    send(rd.Messages)
    if !raft.IsEmptySnap(&rd.Snapshot) {
      processSnapshot(rd.Snapshot)
    }
    for _, entry := range rd.CommittedEntries {
      process(entry)
      if entry.Type == eraftpb.EntryType_EntryConfChange {
        var cc eraftpb.ConfChange
        cc.Unmarshal(entry.Data)
        s.Node.ApplyConfChange(cc)
      }
    }
    s.Node.Advance()
  case <-s.done:
    return
  }
}

To propose changes to the state machine from your node, take your application data, serialize it into a byte slice, and call:

n.Propose(data)

If the proposal is committed, data will appear in committed entries with type eraftpb.EntryType_EntryNormal. There is no guarantee that a proposed command will be committed; you may have to re-propose after a timeout.

To add or remove a node in a cluster, build a ConfChange struct 'cc' and call:

n.ProposeConfChange(cc)

After the config change is committed, a committed entry with type eraftpb.EntryType_EntryConfChange will be returned. You must apply it to the node through:

var cc eraftpb.ConfChange
cc.Unmarshal(entry.Data)
n.ApplyConfChange(cc)

Note: An ID represents a unique node in a cluster for all time. A given ID MUST be used only once even if the old node has been removed. This means that for example IP addresses make poor node IDs since they may be reused. Node IDs must be non-zero.
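A simple way to satisfy these rules is a monotonic counter that starts at 1 and is never rewound. The idAllocator type below is a hypothetical helper, not part of this package; a real deployment would persist the counter so IDs are not reused after a restart.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// idAllocator hands out node IDs that are never reused and never zero.
type idAllocator struct {
	mu   sync.Mutex
	next uint64 // last ID handed out; should be persisted in real use
}

var errExhausted = errors.New("node IDs exhausted")

// Allocate returns a fresh ID. Because the counter starts at 0 and is
// incremented before use, 0 (raft.None) is never returned.
func (a *idAllocator) Allocate() (uint64, error) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.next == ^uint64(0) {
		return 0, errExhausted
	}
	a.next++
	return a.next, nil
}

func main() {
	var a idAllocator
	x, _ := a.Allocate()
	y, _ := a.Allocate()
	fmt.Println(x, y) // 1 2
}
```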

Implementation notes

This implementation is up to date with the final Raft thesis (https://ramcloud.stanford.edu/~ongaro/thesis.pdf), although our implementation of the membership change protocol differs somewhat from that described in chapter 4. The key invariant that membership changes happen one node at a time is preserved, but in our implementation the membership change takes effect when its entry is applied, not when it is added to the log (so the entry is committed under the old membership instead of the new). This is equivalent in terms of safety, since the old and new configurations are guaranteed to overlap.

To ensure that we do not attempt to commit two membership changes at once by matching log positions (which would be unsafe since they should have different quorum requirements), we simply disallow any proposed membership change while any uncommitted change appears in the leader's log.
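The "one uncommitted change at a time" rule can be sketched with a gate on PendingConfIndex (the Raft struct field documented below). In this sketch the leader type is hypothetical, and a change is treated as no longer pending once the applied index reaches PendingConfIndex, i.e. the gate rejects while PendingConfIndex > applied.

```go
package main

import (
	"errors"
	"fmt"
)

var errConfPending = errors.New("a configuration change is already pending")

// leader is a hypothetical, stripped-down view of the fields involved.
type leader struct {
	applied          uint64 // last applied index
	lastIndex        uint64 // index of the last log entry
	PendingConfIndex uint64 // mirrors Raft.PendingConfIndex
}

// proposeConfChange appends a conf change entry only if the previous conf
// change entry (at PendingConfIndex) has already been applied.
func (l *leader) proposeConfChange() (uint64, error) {
	if l.PendingConfIndex > l.applied {
		return 0, errConfPending
	}
	l.lastIndex++
	l.PendingConfIndex = l.lastIndex
	return l.lastIndex, nil
}

func main() {
	l := &leader{applied: 5, lastIndex: 5}
	i, err := l.proposeConfChange()
	fmt.Println(i, err) // 6 <nil>
	_, err = l.proposeConfChange()
	fmt.Println(err) // a configuration change is already pending
	l.applied = 6 // the conf change entry at index 6 has been applied
	i, err = l.proposeConfChange()
	fmt.Println(i, err) // 7 <nil>
}
```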

This approach introduces a problem when you try to remove a member from a two-member cluster: If one of the members dies before the other one receives the commit of the confchange entry, then the member cannot be removed any more since the cluster cannot make progress. For this reason it is highly recommended to use three or more nodes in every cluster.
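The arithmetic behind this recommendation: a majority of n members is n/2 + 1 (integer division), so a two-member cluster needs both members to commit anything, including the entry that would remove one of them. The helper names below are illustrative.

```go
package main

import "fmt"

// quorum returns the number of members needed for a majority of n.
func quorum(n int) int { return n/2 + 1 }

// faultTolerance is how many members can fail while a majority survives.
func faultTolerance(n int) int { return n - quorum(n) }

func main() {
	for _, n := range []int{1, 2, 3, 5} {
		fmt.Printf("members=%d quorum=%d tolerates=%d\n", n, quorum(n), faultTolerance(n))
	}
	// members=1 quorum=1 tolerates=0
	// members=2 quorum=2 tolerates=0
	// members=3 quorum=2 tolerates=1
	// members=5 quorum=3 tolerates=2
}
```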

MessageType

Package raft sends and receives messages in the Protocol Buffer format defined in the eraftpb package. Each state (follower, candidate, leader) implements its own 'step' method ('stepFollower', 'stepCandidate', 'stepLeader') for advancing with a given eraftpb.Message. Each step is determined by its eraftpb.MessageType. Note that every step is checked by one common method, 'Step', which safety-checks the terms of the node and the incoming message to prevent stale log entries:

'MessageType_MsgHup' is used for election. If a node is a follower or candidate, the
'tick' function in the 'raft' struct is set as 'tickElection'. If a follower or
candidate has not received any heartbeat before the election timeout, it
passes 'MessageType_MsgHup' to its Step method and becomes (or remains) a candidate to
start a new election.

'MessageType_MsgBeat' is an internal type that signals the leader to send a heartbeat of
the 'MessageType_MsgHeartbeat' type. If a node is a leader, the 'tick' function in
the 'raft' struct is set as 'tickHeartbeat', and triggers the leader to
send periodic 'MessageType_MsgHeartbeat' messages to its followers.
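The two tick functions described above can be sketched as one dispatcher that advances an elapsed-tick counter and steps a local message when it reaches its timeout. The node type and its fields are hypothetical simplifications. A real implementation also resets electionElapsed when it hears from a valid leader, and the Raft paper recommends randomizing the election timeout to reduce split votes.

```go
package main

import "fmt"

// Hypothetical stand-ins for the eraftpb message types and raft states.
type MessageType int

const (
	MsgHup MessageType = iota
	MsgBeat
)

type StateType int

const (
	StateFollower StateType = iota
	StateCandidate
	StateLeader
)

// node keeps the elapsed-tick counters; steps records local messages.
type node struct {
	state            StateType
	electionTimeout  int
	heartbeatTimeout int
	electionElapsed  int
	heartbeatElapsed int
	steps            []MessageType
}

func (n *node) step(m MessageType) { n.steps = append(n.steps, m) }

// tick acts as tickHeartbeat for a leader and tickElection otherwise.
func (n *node) tick() {
	if n.state == StateLeader {
		n.heartbeatElapsed++
		if n.heartbeatElapsed >= n.heartbeatTimeout {
			n.heartbeatElapsed = 0
			n.step(MsgBeat) // leader: time to send heartbeats
		}
		return
	}
	n.electionElapsed++
	if n.electionElapsed >= n.electionTimeout {
		n.electionElapsed = 0
		n.step(MsgHup) // follower/candidate: start an election
	}
}

func main() {
	f := &node{state: StateFollower, electionTimeout: 10, heartbeatTimeout: 1}
	for i := 0; i < 10; i++ {
		f.tick()
	}
	fmt.Println(len(f.steps), f.steps[0] == MsgHup) // 1 true

	l := &node{state: StateLeader, electionTimeout: 10, heartbeatTimeout: 1}
	for i := 0; i < 3; i++ {
		l.tick()
	}
	fmt.Println(len(l.steps)) // 3
}
```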

'MessageType_MsgPropose' proposes to append data to its log entries. This is a special
type to redirect proposals to the leader. Therefore, the send method overwrites
eraftpb.Message's term with its HardState's term to avoid attaching its
local term to 'MessageType_MsgPropose'. When 'MessageType_MsgPropose' is passed to the leader's 'Step'
method, the leader first calls the 'appendEntry' method to append entries
to its log, and then calls the 'bcastAppend' method to send those entries to
its peers. When passed to a candidate, 'MessageType_MsgPropose' is dropped. When passed to a
follower, 'MessageType_MsgPropose' is stored in the follower's mailbox (msgs) by the send
method. It is stored with the sender's ID and later forwarded to the leader by
the rafthttp package.
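The per-role handling of 'MessageType_MsgPropose' can be summarized as a switch on the node's state. The handlePropose function and its string results are hypothetical; the case of a follower that knows no leader (lead == 0, i.e. raft.None) is an assumption of this sketch, where the proposal is simply dropped.

```go
package main

import "fmt"

type StateType int

const (
	StateFollower StateType = iota
	StateCandidate
	StateLeader
)

// action describes what a node does with a MsgPropose in this sketch.
type action string

// handlePropose: the leader appends and broadcasts, a follower forwards to
// the leader it knows about, and a candidate drops the proposal.
func handlePropose(state StateType, lead uint64) action {
	switch state {
	case StateLeader:
		return "appendEntry+bcastAppend"
	case StateFollower:
		if lead == 0 { // assumption: no known leader, nothing to forward to
			return "drop (no known leader)"
		}
		return action(fmt.Sprintf("forward to %d", lead))
	default: // StateCandidate
		return "drop"
	}
}

func main() {
	fmt.Println(handlePropose(StateLeader, 1))    // appendEntry+bcastAppend
	fmt.Println(handlePropose(StateFollower, 1))  // forward to 1
	fmt.Println(handlePropose(StateCandidate, 0)) // drop
}
```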

'MessageType_MsgAppend' contains log entries to replicate. A leader calls bcastAppend,
which calls sendAppend, which sends soon-to-be-replicated logs in 'MessageType_MsgAppend'
type. When 'MessageType_MsgAppend' is passed to candidate's Step method, candidate reverts
back to follower, because it indicates that there is a valid leader sending
'MessageType_MsgAppend' messages. Candidate and follower respond to this message in
'MessageType_MsgAppendResponse' type.

'MessageType_MsgAppendResponse' is the response to a log replication request ('MessageType_MsgAppend'). When
'MessageType_MsgAppend' is passed to a candidate's or follower's Step method, it responds by
calling the 'handleAppendEntries' method, which sends 'MessageType_MsgAppendResponse' to the raft
mailbox.
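The core of a follower's append handling is the standard Raft log-matching check from the Raft paper: accept only if the local log has an entry at the leader's previous index with the same term, then drop any conflicting suffix and append. The follower type, the dummy entry at index 0, and the (reject, lastIndex) return shape are assumptions of this sketch rather than this package's handleAppendEntries.

```go
package main

import "fmt"

type Entry struct{ Index, Term uint64 }

// follower holds a log where log[0] is a dummy entry (index 0, term 0),
// so log[i] has Index i in this sketch.
type follower struct {
	log       []Entry
	committed uint64
}

// handleAppend assumes ents are consecutive, starting at prevIndex+1.
// It returns (reject, lastIndex) for the MsgAppendResponse.
func (f *follower) handleAppend(prevIndex, prevTerm uint64, ents []Entry, leaderCommit uint64) (bool, uint64) {
	if prevIndex >= uint64(len(f.log)) || f.log[prevIndex].Term != prevTerm {
		return true, uint64(len(f.log) - 1) // logs do not match at prevIndex
	}
	for i, e := range ents {
		if e.Index < uint64(len(f.log)) {
			if f.log[e.Index].Term == e.Term {
				continue // already have this entry
			}
			f.log = f.log[:e.Index] // conflict: drop it and everything after it
		}
		f.log = append(f.log, ents[i:]...)
		break
	}
	// Commit up to the leader's commit index, but not past the last entry
	// covered by this message, and never move committed backwards.
	newCommit := leaderCommit
	if last := prevIndex + uint64(len(ents)); newCommit > last {
		newCommit = last
	}
	if newCommit > f.committed {
		f.committed = newCommit
	}
	return false, uint64(len(f.log) - 1)
}

func main() {
	f := &follower{log: []Entry{{0, 0}, {1, 1}, {2, 1}, {3, 1}}}
	// Leader (term 2): after (index 2, term 1) come entries 3 and 4 from term 2.
	rej, last := f.handleAppend(2, 1, []Entry{{3, 2}, {4, 2}}, 3)
	fmt.Println(rej, last, f.log[3].Term, f.committed) // false 4 2 3
	rej, _ = f.handleAppend(9, 2, nil, 3)
	fmt.Println(rej) // true
}
```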

'MessageType_MsgRequestVote' requests votes for election. When a node is a follower or
candidate and 'MessageType_MsgHup' is passed to its Step method, then the node calls
'campaign' method to campaign itself to become a leader. Once 'campaign'
method is called, the node becomes candidate and sends 'MessageType_MsgRequestVote' to peers
in cluster to request votes. When passed to the leader or candidate's Step
method and the message's Term is lower than leader's or candidate's,
'MessageType_MsgRequestVote' will be rejected ('MessageType_MsgRequestVoteResponse' is returned with Reject true).
If leader or candidate receives 'MessageType_MsgRequestVote' with higher term, it will revert
back to follower. When 'MessageType_MsgRequestVote' is passed to follower, it votes for the
sender only if it has not already voted for another candidate in the current term and the
sender's log is at least as up-to-date as its own: the sender's last log term is greater than
the follower's last log term, or the two last log terms are equal and the sender's last log
index is greater than or equal to the follower's.
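The voting rule above is the "election restriction" from section 5.4.1 of the Raft paper. A minimal sketch, assuming the message term has already been checked by Step; the function names are illustrative, and vote == 0 stands for "has not voted" (raft.None).

```go
package main

import "fmt"

// isUpToDate reports whether a candidate whose log ends at (lastTerm,
// lastIndex) is at least as up-to-date as a voter whose log ends at
// (myTerm, myIndex).
func isUpToDate(lastTerm, lastIndex, myTerm, myIndex uint64) bool {
	return lastTerm > myTerm || (lastTerm == myTerm && lastIndex >= myIndex)
}

// grantVote adds the one-vote-per-term rule: vote is the ID the voter has
// already voted for in this term (0 means none).
func grantVote(vote, candidate uint64, lastTerm, lastIndex, myTerm, myIndex uint64) bool {
	return (vote == 0 || vote == candidate) && isUpToDate(lastTerm, lastIndex, myTerm, myIndex)
}

func main() {
	fmt.Println(isUpToDate(3, 5, 2, 9))      // true: higher last term wins
	fmt.Println(isUpToDate(2, 8, 2, 9))      // false: same term, shorter log
	fmt.Println(grantVote(7, 3, 3, 5, 2, 9)) // false: already voted for 7
}
```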

'MessageType_MsgRequestVoteResponse' contains the response to a vote request. When 'MessageType_MsgRequestVoteResponse' is
passed to a candidate, the candidate calculates how many votes it has won. If
it has won a majority (quorum), it becomes leader and calls 'bcastAppend'.
If the candidate receives a majority of rejections, it reverts to
follower.
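Counting the responses might look like this sketch, where the votes map (voter ID to granted or rejected, including the candidate's own vote) and the outcome type are hypothetical. Until either side reaches a quorum, the election stays pending.

```go
package main

import "fmt"

type outcome int

const (
	pending outcome = iota
	won
	lost
)

// tally decides an election among n voters from the responses so far.
func tally(votes map[uint64]bool, n int) outcome {
	granted, rejected := 0, 0
	for _, v := range votes {
		if v {
			granted++
		} else {
			rejected++
		}
	}
	q := n/2 + 1 // quorum
	switch {
	case granted >= q:
		return won // become leader, then bcastAppend
	case rejected >= q:
		return lost // revert to follower
	default:
		return pending
	}
}

func main() {
	votes := map[uint64]bool{1: true} // the candidate votes for itself
	fmt.Println(tally(votes, 5) == pending) // true
	votes[2], votes[3] = true, true
	fmt.Println(tally(votes, 5) == won) // true
	fmt.Println(tally(map[uint64]bool{1: true, 2: false, 3: false, 4: false}, 5) == lost) // true
}
```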

'MessageType_MsgSnapshot' requests the installation of a snapshot. When a node has just
become a leader or the leader receives a 'MessageType_MsgPropose' message, it calls the
'bcastAppend' method, which then calls the 'sendAppend' method for each
follower. In 'sendAppend', if the leader fails to get the term or entries it needs
(for example, because they have been compacted), it sends a snapshot to the follower
in a 'MessageType_MsgSnapshot' message instead.
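The snapshot fallback follows from the Storage contract documented below: Term(i) is available only for i in [FirstIndex()-1, LastIndex()], and Entries needs lo >= FirstIndex(). So if a follower's Next is below FirstIndex, the leader cannot build a MsgAppend. The leaderLog type and nextMessage function are hypothetical, and the sketch assumes next <= lastIndex+1.

```go
package main

import "fmt"

// leaderLog is a hypothetical view of the leader's log after compaction:
// entries before firstIndex exist only in the latest snapshot.
type leaderLog struct {
	firstIndex uint64
	lastIndex  uint64
}

// nextMessage decides what sendAppend would send to a follower whose
// Progress.Next is next.
func nextMessage(l leaderLog, next uint64) string {
	// Term(next-1) needs next-1 >= firstIndex-1, and Entries(next, ...)
	// needs next >= firstIndex; both reduce to next >= firstIndex.
	if next < l.firstIndex {
		return "MsgSnapshot"
	}
	return fmt.Sprintf("MsgAppend with %d entries", l.lastIndex-next+1)
}

func main() {
	l := leaderLog{firstIndex: 50, lastIndex: 60}
	fmt.Println(nextMessage(l, 10)) // MsgSnapshot
	fmt.Println(nextMessage(l, 55)) // MsgAppend with 6 entries
	fmt.Println(nextMessage(l, 61)) // MsgAppend with 0 entries
}
```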

'MessageType_MsgHeartbeat' sends heartbeat from leader. When 'MessageType_MsgHeartbeat' is passed
to candidate and message's term is higher than candidate's, the candidate
reverts back to follower and updates its committed index from the one in
this heartbeat. It then sends a response message to its mailbox. When
'MessageType_MsgHeartbeat' is passed to follower's Step method and message's term is
higher than follower's, the follower updates its leaderID with the ID
from the message.

'MessageType_MsgHeartbeatResponse' is a response to 'MessageType_MsgHeartbeat'. When 'MessageType_MsgHeartbeatResponse'
is passed to the leader's Step method, the leader knows which follower
responded.


Constants

const None uint64 = 0

None is a placeholder node ID used when there is no leader.

Variables

var ErrCompacted = errors.New("requested index is unavailable due to compaction")

ErrCompacted is returned by Storage.Entries/Compact when a requested index is unavailable because it predates the last snapshot.

var ErrProposalDropped = errors.New("raft proposal dropped")

ErrProposalDropped is returned when a proposal is ignored in certain cases, so that the proposer can be notified and fail fast.

var ErrSnapOutOfDate = errors.New("requested index is older than the existing snapshot")

ErrSnapOutOfDate is returned by Storage.CreateSnapshot when a requested index is older than the existing snapshot.

var ErrSnapshotTemporarilyUnavailable = errors.New("snapshot is temporarily unavailable")

ErrSnapshotTemporarilyUnavailable is returned by the Storage interface when the required snapshot is temporarily unavailable.

var ErrStepLocalMsg = errors.New("raft: cannot step raft local message")

ErrStepLocalMsg is returned when trying to step a local raft message.

var ErrStepPeerNotFound = errors.New("raft: cannot step as peer not found")

ErrStepPeerNotFound is returned when trying to step a response message but no peer is found in raft.Prs for that node.

var ErrUnavailable = errors.New("requested entry at index is unavailable")

ErrUnavailable is returned by the Storage interface when the requested log entries are unavailable.

Functions

func IsEmptyHardState

func IsEmptyHardState(st pb.HardState) bool

IsEmptyHardState returns true if the given HardState is empty.

func IsEmptySnap

func IsEmptySnap(sp *pb.Snapshot) bool

IsEmptySnap returns true if the given Snapshot is empty.

func IsLocalMsg

func IsLocalMsg(msgt pb.MessageType) bool

func IsResponseMsg

func IsResponseMsg(msgt pb.MessageType) bool

Types

type Config

type Config struct {
	// ID is the identity of the local raft. ID cannot be 0.
	ID uint64

	// ElectionTick is the number of Node.Tick invocations that must pass between
	// elections. That is, if a follower does not receive any message from the
	// leader of current term before ElectionTick has elapsed, it will become
	// candidate and start an election. ElectionTick must be greater than
	// HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid
	// unnecessary leader switching.
	ElectionTick int
	// HeartbeatTick is the number of Node.Tick invocations that must pass between
	// heartbeats. That is, a leader sends heartbeat messages to maintain its
	// leadership every HeartbeatTick ticks.
	HeartbeatTick int

	// Storage is the storage for raft. raft generates entries and states to be
	// stored in storage. raft reads the persisted entries and states out of
	// Storage when it needs them. raft reads the previous state and configuration
	// out of storage when restarting.
	Storage Storage
	// Applied is the last applied index. It should only be set when restarting
	// raft. raft will not return entries with an index smaller than or equal to
	// Applied to the application. If Applied is unset when restarting, raft might
	// return previously applied entries. This is a very application-dependent
	// configuration.
	Applied uint64
	// contains filtered or unexported fields
}

Config contains the parameters to start a raft.

type MemoryStorage

type MemoryStorage struct {
	// Protects access to all fields. Most methods of MemoryStorage are
	// run on the raft goroutine, but Append() is run on an application
	// goroutine.
	sync.Mutex
	// contains filtered or unexported fields
}

MemoryStorage implements the Storage interface backed by an in-memory array.

func NewMemoryStorage

func NewMemoryStorage() *MemoryStorage

NewMemoryStorage creates an empty MemoryStorage.

func (*MemoryStorage) Append

func (ms *MemoryStorage) Append(entries []pb.Entry) error

Append appends the new entries to storage. TODO (xiangli): ensure the entries are continuous and entries[0].Index > ms.entries[0].Index

func (*MemoryStorage) ApplySnapshot

func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error

ApplySnapshot overwrites the contents of this Storage object with those of the given snapshot.

func (*MemoryStorage) Compact

func (ms *MemoryStorage) Compact(compactIndex uint64) error

Compact discards all log entries prior to compactIndex. It is the application's responsibility to not attempt to compact an index greater than raftLog.applied.

func (*MemoryStorage) CreateSnapshot

func (ms *MemoryStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) (pb.Snapshot, error)

CreateSnapshot makes a snapshot which can be retrieved with Snapshot() and can be used to reconstruct the state at that point. If any configuration changes have been made since the last compaction, the result of the last ApplyConfChange must be passed in.

func (*MemoryStorage) Entries

func (ms *MemoryStorage) Entries(lo, hi uint64) ([]pb.Entry, error)

Entries implements the Storage interface.

func (*MemoryStorage) FirstIndex

func (ms *MemoryStorage) FirstIndex() (uint64, error)

FirstIndex implements the Storage interface.

func (*MemoryStorage) InitialState

func (ms *MemoryStorage) InitialState() (pb.HardState, pb.ConfState, error)

InitialState implements the Storage interface.

func (*MemoryStorage) LastIndex

func (ms *MemoryStorage) LastIndex() (uint64, error)

LastIndex implements the Storage interface.

func (*MemoryStorage) SetHardState

func (ms *MemoryStorage) SetHardState(st pb.HardState) error

SetHardState saves the current HardState.

func (*MemoryStorage) Snapshot

func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error)

Snapshot implements the Storage interface.

func (*MemoryStorage) Term

func (ms *MemoryStorage) Term(i uint64) (uint64, error)

Term implements the Storage interface.

type Progress

type Progress struct {
	Match, Next uint64
}

Progress represents a follower’s progress in the view of the leader. The leader maintains the progress of all followers and sends entries to each follower based on its progress.
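One common use of Match is advancing the commit index: the leader commits the highest index that a majority of voters have matched, provided that entry is from the leader's current term (Raft paper, section 5.4.2). This is a sketch; commitIndex and the termAt callback are hypothetical, and prs is assumed to include the leader itself with Match equal to its last index.

```go
package main

import (
	"fmt"
	"sort"
)

type Progress struct{ Match, Next uint64 }

// commitIndex returns the new commit index. prs must be non-empty; termAt
// returns the term of the log entry at an index.
func commitIndex(prs map[uint64]*Progress, committed, curTerm uint64, termAt func(uint64) uint64) uint64 {
	matches := make([]uint64, 0, len(prs))
	for _, p := range prs {
		matches = append(matches, p.Match)
	}
	// Sort descending: matches[k] is replicated on at least k+1 voters.
	sort.Slice(matches, func(i, j int) bool { return matches[i] > matches[j] })
	n := matches[len(matches)/2] // replicated on a majority (len/2 + 1 voters)
	// Only entries from the current term are committed by counting replicas.
	if n > committed && termAt(n) == curTerm {
		return n
	}
	return committed
}

func main() {
	prs := map[uint64]*Progress{1: {Match: 9, Next: 10}, 2: {Match: 7, Next: 8}, 3: {Match: 5, Next: 6}}
	termAt := func(i uint64) uint64 {
		if i >= 6 {
			return 2
		}
		return 1
	}
	fmt.Println(commitIndex(prs, 4, 2, termAt)) // 7
	fmt.Println(commitIndex(prs, 4, 3, termAt)) // 4
}
```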

type Raft

type Raft struct {
	Term uint64
	Vote uint64

	// the log
	RaftLog *RaftLog

	// log replication progress of each peer
	Prs map[uint64]*Progress

	// this peer's role
	State StateType

	// the leader id
	Lead uint64

	// Only one conf change may be pending (in the log, but not yet
	// applied) at a time. This is enforced via PendingConfIndex, which
	// is set to a value >= the log index of the latest pending
	// configuration change (if any). Config changes are only allowed to
	// be proposed if the leader's applied index is greater than this
	// value.
	// (Used in 3A conf change)
	PendingConfIndex uint64
	// contains filtered or unexported fields
}

func (*Raft) Step

func (r *Raft) Step(m pb.Message) error

Step is the entry point for handling messages; see `MessageType` in `eraftpb.proto` for which messages should be handled.

type RaftLog

type RaftLog struct {
	// contains filtered or unexported fields
}

RaftLog manages the log entries. Its structure looks like this:

snapshot/first.....applied....committed....stabled.....last
--------|------------------------------------------------|
                          log entries

For simplicity, the RaftLog implementation should manage all log entries that have not been truncated.
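The diagram maps naturally onto a slice plus an offset. In this hypothetical sketch, entries[0] has index first, the markers are absolute indexes, and two derived views are shown: entries after stabled (still to be persisted) and entries in (applied, committed] (ready to apply). The method names echo common raft code but are assumptions here; first >= 1 is assumed.

```go
package main

import "fmt"

type Entry struct{ Index, Term uint64 }

// raftLog holds every untruncated entry; entries[0] has Index first.
type raftLog struct {
	first                       uint64
	entries                     []Entry
	applied, committed, stabled uint64
}

func (l *raftLog) lastIndex() uint64 { return l.first + uint64(len(l.entries)) - 1 }

// slice returns entries with indexes in (lo, hi]; it assumes
// first-1 <= lo and hi <= lastIndex().
func (l *raftLog) slice(lo, hi uint64) []Entry {
	if hi <= lo {
		return nil
	}
	return l.entries[lo+1-l.first : hi+1-l.first]
}

// unstableEntries are entries after stabled that still need persisting.
func (l *raftLog) unstableEntries() []Entry { return l.slice(l.stabled, l.lastIndex()) }

// nextEnts are committed entries that have not yet been applied.
func (l *raftLog) nextEnts() []Entry { return l.slice(l.applied, l.committed) }

func main() {
	l := &raftLog{first: 5, entries: []Entry{{5, 1}, {6, 1}, {7, 2}, {8, 2}},
		applied: 5, committed: 7, stabled: 6}
	fmt.Println(l.lastIndex(), l.unstableEntries(), l.nextEnts())
	// 8 [{7 2} {8 2}] [{6 1} {7 2}]
}
```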

func (*RaftLog) LastIndex

func (l *RaftLog) LastIndex() uint64

LastIndex returns the last index of the log entries.

func (*RaftLog) Term

func (l *RaftLog) Term(i uint64) (uint64, error)

Term returns the term of the entry at the given index.

type RawNode

type RawNode struct {
	Raft *Raft
}

RawNode is a wrapper of Raft.

func NewRawNode

func NewRawNode(config *Config) (*RawNode, error)

NewRawNode returns a new RawNode given a configuration and a list of raft peers.

func (*RawNode) Advance

func (rn *RawNode) Advance(rd Ready)

Advance notifies the RawNode that the application has applied and saved progress in the last Ready results.

func (*RawNode) ApplyConfChange

func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState

ApplyConfChange applies a config change to the local node.

func (*RawNode) Campaign

func (rn *RawNode) Campaign() error

Campaign causes this RawNode to transition to candidate state.

func (*RawNode) GetProgress

func (rn *RawNode) GetProgress() map[uint64]Progress

GetProgress returns the Progress of this node and its peers, if this node is the leader.

func (*RawNode) HasReady

func (rn *RawNode) HasReady() bool

HasReady is called when the RawNode user needs to check whether any Ready is pending.

func (*RawNode) Propose

func (rn *RawNode) Propose(data []byte) error

Propose proposes data be appended to the raft log.

func (*RawNode) ProposeConfChange

func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error

ProposeConfChange proposes a config change.

func (*RawNode) Ready

func (rn *RawNode) Ready() Ready

Ready returns the current point-in-time state of this RawNode.

func (*RawNode) Step

func (rn *RawNode) Step(m pb.Message) error

Step advances the state machine using the given message.

func (*RawNode) Tick

func (rn *RawNode) Tick()

Tick advances the internal logical clock by a single tick.

func (*RawNode) TransferLeader

func (rn *RawNode) TransferLeader(transferee uint64)

TransferLeader tries to transfer leadership to the given transferee.

type Ready

type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update.
	// It is not required to consume or store SoftState.
	*SoftState

	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update.
	pb.HardState

	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	Snapshot pb.Snapshot

	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry

	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	// If it contains a MessageType_MsgSnapshot message, the application MUST report back to raft
	// when the snapshot has been received or has failed by calling ReportSnapshot.
	Messages []pb.Message
}

Ready encapsulates the entries and messages that are ready to read, be saved to stable storage, committed or sent to other peers. All fields in Ready are read-only.

type SoftState

type SoftState struct {
	Lead      uint64
	RaftState StateType
}

SoftState provides state that is volatile and does not need to be persisted to the WAL.

type StateType

type StateType uint64

StateType represents the role of a node in a cluster.

const (
	StateFollower StateType = iota
	StateCandidate
	StateLeader
)

func (StateType) String

func (st StateType) String() string

type Storage

type Storage interface {
	// InitialState returns the saved HardState and ConfState information.
	InitialState() (pb.HardState, pb.ConfState, error)
	// Entries returns a slice of log entries in the range [lo,hi).
	Entries(lo, hi uint64) ([]pb.Entry, error)
	// Term returns the term of entry i, which must be in the range
	// [FirstIndex()-1, LastIndex()]. The term of the entry before
	// FirstIndex is retained for matching purposes even though the
	// rest of that entry may not be available.
	Term(i uint64) (uint64, error)
	// LastIndex returns the index of the last entry in the log.
	LastIndex() (uint64, error)
	// FirstIndex returns the index of the first log entry that is
	// possibly available via Entries (older entries have been incorporated
	// into the latest Snapshot; if storage only contains the dummy entry the
	// first log entry is not available).
	FirstIndex() (uint64, error)
	// Snapshot returns the most recent snapshot.
	// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
	// so raft state machine could know that Storage needs some time to prepare
	// snapshot and call Snapshot later.
	Snapshot() (pb.Snapshot, error)
}

Storage is an interface that may be implemented by the application to retrieve log entries from storage.

If any Storage method returns an error, the raft instance will become inoperable and refuse to participate in elections; the application is responsible for cleanup and recovery in this case.
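The index arithmetic behind FirstIndex, LastIndex, Term, and Entries is easiest to see with a dummy entry at position 0 that records the index and term of the last compacted entry, which is why Term works down to FirstIndex()-1. This memLog type is a simplified, hypothetical stand-in, not the package's MemoryStorage; its Compact keeps the entry at compactIndex as the new dummy.

```go
package main

import (
	"errors"
	"fmt"
)

type Entry struct{ Index, Term uint64 }

var (
	ErrCompacted   = errors.New("requested index is unavailable due to compaction")
	ErrUnavailable = errors.New("requested entry at index is unavailable")
)

// memLog: ents[0] is a dummy entry (0/0 initially); FirstIndex = ents[0].Index+1.
type memLog struct{ ents []Entry }

func newMemLog() *memLog { return &memLog{ents: []Entry{{0, 0}}} }

func (m *memLog) FirstIndex() uint64 { return m.ents[0].Index + 1 }
func (m *memLog) LastIndex() uint64  { return m.ents[0].Index + uint64(len(m.ents)) - 1 }

// Term is valid for i in [FirstIndex()-1, LastIndex()].
func (m *memLog) Term(i uint64) (uint64, error) {
	off := m.ents[0].Index
	if i < off {
		return 0, ErrCompacted
	}
	if i > m.LastIndex() {
		return 0, ErrUnavailable
	}
	return m.ents[i-off].Term, nil
}

// Entries returns entries in [lo, hi); it assumes lo <= hi.
func (m *memLog) Entries(lo, hi uint64) ([]Entry, error) {
	off := m.ents[0].Index
	if lo <= off {
		return nil, ErrCompacted
	}
	if hi > m.LastIndex()+1 {
		return nil, ErrUnavailable
	}
	return m.ents[lo-off : hi-off], nil
}

// Compact discards entries before compactIndex; the entry at compactIndex
// becomes the new dummy entry, so its term stays queryable.
func (m *memLog) Compact(compactIndex uint64) error {
	off := m.ents[0].Index
	if compactIndex <= off {
		return ErrCompacted
	}
	if compactIndex > m.LastIndex() {
		return ErrUnavailable
	}
	m.ents = append([]Entry{}, m.ents[compactIndex-off:]...)
	return nil
}

func main() {
	m := newMemLog()
	m.ents = append(m.ents, Entry{1, 1}, Entry{2, 1}, Entry{3, 2}, Entry{4, 2})
	_ = m.Compact(2)
	fmt.Println(m.FirstIndex(), m.LastIndex()) // 3 4
	t, _ := m.Term(2)
	_, err := m.Term(1)
	es, _ := m.Entries(3, 5)
	fmt.Println(t, err == ErrCompacted, es) // 1 true [{3 2} {4 2}]
}
```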
