Documentation ¶
Overview ¶
Package raft provides a simple, easy-to-understand, and reliable implementation of Raft.
Raft is a consensus protocol designed to manage replicated logs in a distributed system. Its purpose is to ensure fault-tolerant coordination and consistency among a group of nodes, making it suitable for building reliable systems. Potential use cases include distributed file systems, consistent key-value stores, and service discovery.
Index ¶
- Variables
- type AppendEntriesRequest
- type AppendEntriesResponse
- type Configuration
- type Future
- type InstallSnapshotRequest
- type InstallSnapshotResponse
- type Log
- type LogEntry
- type LogEntryType
- type Operation
- type OperationResponse
- type OperationType
- type Option
- func WithElectionTimeout(time time.Duration) Option
- func WithHeartbeatInterval(time time.Duration) Option
- func WithLeaseDuration(leaseDuration time.Duration) Option
- func WithLog(log Log) Option
- func WithLogLevel(level logging.Level) Option
- func WithSnapshotStorage(snapshotStorage SnapshotStorage) Option
- func WithStateStorage(stateStorage StateStorage) Option
- func WithTransport(transport Transport) Option
- type Raft
- func (r *Raft) AddServer(id string, address string, isVoter bool, timeout time.Duration) Future[Configuration]
- func (r *Raft) AppendEntries(request *AppendEntriesRequest, response *AppendEntriesResponse) error
- func (r *Raft) Bootstrap(configuration map[string]string) error
- func (r *Raft) Configuration() Configuration
- func (r *Raft) InstallSnapshot(request *InstallSnapshotRequest, response *InstallSnapshotResponse) error
- func (r *Raft) LeaderID() string
- func (r *Raft) RemoveServer(id string, timeout time.Duration) Future[Configuration]
- func (r *Raft) RequestVote(request *RequestVoteRequest, response *RequestVoteResponse) error
- func (r *Raft) Restart() error
- func (r *Raft) SetStateCallback(callback func(leaderId string, state State))
- func (r *Raft) Start() error
- func (r *Raft) Status() Status
- func (r *Raft) Stop()
- func (r *Raft) SubmitOperation(operation []byte, operationType OperationType, timeout time.Duration) Future[OperationResponse]
- func (r *Raft) WaitForStableState()
- type RequestVoteRequest
- type RequestVoteResponse
- type Response
- type Result
- type SnapshotFile
- type SnapshotMetadata
- type SnapshotStorage
- type State
- type StateMachine
- type StateStorage
- type Status
- type Transport
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrNotLeader is returned when an operation or configuration change is
	// submitted to a node that is not a leader. Operations may only be submitted
	// to a node that is a leader.
	ErrNotLeader = errors.New("this node is not the leader")
	// ErrInvalidLease is returned when a lease-based read-only operation is
	// rejected due to the leader's lease having expired. The operation likely
	// needs to be submitted to a different node in the cluster.
	ErrInvalidLease = errors.New("this node does not have a valid lease")
	// ErrPendingConfiguration is returned when a membership change is requested and there
	// is a pending membership change that has not yet been committed. The membership change
	// request may be submitted once the pending membership change is committed.
	ErrPendingConfiguration = errors.New("only one membership change may be committed at a time")
	// ErrNoCommitThisTerm is returned when a membership change is requested but a log entry has yet
	// to be committed in the current term. The membership change may be submitted once a log entry
	// has been committed this term.
	ErrNoCommitThisTerm = errors.New("a log entry has not been committed in this term")
)
var ErrTimeout = errors.New(
"a timeout occurred while waiting for the result - try submitting the operation again",
)
ErrTimeout is returned when a future timed out waiting for a result.
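Because these errors are sentinel values created with errors.New, callers can recognize them with errors.Is, even when they arrive wrapped. The sketch below redeclares them locally so it compiles on its own; real code would compare against raft.ErrNotLeader and so on. The three-way policy (retryAction and classify are hypothetical names) is one reasonable reading of the comments above, not something the package defines.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's sentinel errors. Real code would compare
// against raft.ErrNotLeader, raft.ErrTimeout, and so on.
var (
	ErrNotLeader            = errors.New("this node is not the leader")
	ErrInvalidLease         = errors.New("this node does not have a valid lease")
	ErrPendingConfiguration = errors.New("only one membership change may be committed at a time")
	ErrNoCommitThisTerm     = errors.New("a log entry has not been committed in this term")
	ErrTimeout              = errors.New("a timeout occurred while waiting for the result - try submitting the operation again")
)

// retryAction is a hypothetical classification; the package does not define it.
type retryAction int

const (
	giveUp        retryAction = iota // unrecognized error: report it to the caller
	tryOtherNode                     // this node cannot serve the request right now
	retrySameNode                    // the condition is expected to clear by itself
)

// classify uses errors.Is so that wrapped errors are recognized as well.
func classify(err error) retryAction {
	switch {
	case errors.Is(err, ErrNotLeader), errors.Is(err, ErrInvalidLease):
		return tryOtherNode
	case errors.Is(err, ErrPendingConfiguration),
		errors.Is(err, ErrNoCommitThisTerm),
		errors.Is(err, ErrTimeout):
		return retrySameNode
	default:
		return giveUp
	}
}

func main() {
	wrapped := fmt.Errorf("submit failed: %w", ErrNotLeader)
	fmt.Println(classify(wrapped) == tryOtherNode)           // true
	fmt.Println(classify(errors.New("disk full")) == giveUp) // true
}
```

A timed-out Replicated operation may already have been applied, so retrying it after ErrTimeout can produce a duplicate; the SubmitOperation documentation leaves duplicate handling to the caller.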
Functions ¶
This section is empty.
Types ¶
type AppendEntriesRequest ¶
type AppendEntriesRequest struct {
	// The leader's ID. Allows followers to redirect clients.
	LeaderID string
	// The leader's Term.
	Term uint64
	// The leader's commit index.
	LeaderCommit uint64
	// The index of the log entry immediately preceding the new ones.
	PrevLogIndex uint64
	// The term of the log entry immediately preceding the new ones.
	PrevLogTerm uint64
	// Contains the log Entries to store (empty for heartbeat).
	Entries []*LogEntry
}
AppendEntriesRequest is a request invoked by the leader to replicate log entries and also serves as a heartbeat.
type AppendEntriesResponse ¶
type AppendEntriesResponse struct {
	// The term of the server that received the request.
	Term uint64
	// Indicates whether the request to append entries was successful.
	Success bool
	// The conflicting Index if there is one.
	Index uint64
}
AppendEntriesResponse is a response to a request to replicate log entries.
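On the receiving side, PrevLogIndex and PrevLogTerm drive the Raft log-matching check: a follower rejects the request unless its own log holds an entry at PrevLogIndex whose term is PrevLogTerm. The sketch below shows that check with trimmed-down local copies of the two types. How this package fills in the conflicting Index is not documented here, so the strategy used (report the first index of the conflicting term, or the follower's next index when its log is too short) is an assumption borrowed from a common optimization of the protocol. Appending and truncating entries are omitted, and follower and check are hypothetical names.

```go
package main

import "fmt"

// Trimmed-down stand-ins for the request and response types above.
type AppendEntriesRequest struct {
	Term         uint64
	PrevLogIndex uint64
	PrevLogTerm  uint64
}

type AppendEntriesResponse struct {
	Term    uint64
	Success bool
	Index   uint64
}

// follower keeps only what the consistency check needs. terms[i] is the term
// of the entry at log index i; terms[0] is a sentinel (index 0, term 0).
type follower struct {
	currentTerm uint64
	terms       []uint64
}

// check applies the Raft log-matching test. Appending the new entries is omitted.
func (f *follower) check(req *AppendEntriesRequest, resp *AppendEntriesResponse) {
	if req.Term > f.currentTerm {
		f.currentTerm = req.Term
	}
	resp.Term, resp.Success, resp.Index = f.currentTerm, false, 0
	if req.Term < f.currentTerm {
		return // request from a stale leader
	}
	lastIndex := uint64(len(f.terms) - 1)
	if req.PrevLogIndex > lastIndex {
		resp.Index = lastIndex + 1 // log too short: ask the leader to resend from here
		return
	}
	if conflictTerm := f.terms[req.PrevLogIndex]; conflictTerm != req.PrevLogTerm {
		// Report the first index of the conflicting term so the leader can
		// skip the whole term instead of backing up one entry at a time.
		i := req.PrevLogIndex
		for i > 1 && f.terms[i-1] == conflictTerm {
			i--
		}
		resp.Index = i
		return
	}
	resp.Success = true
}

func main() {
	f := &follower{currentTerm: 2, terms: []uint64{0, 1, 1, 2, 2, 2}}
	var resp AppendEntriesResponse
	f.check(&AppendEntriesRequest{Term: 3, PrevLogIndex: 5, PrevLogTerm: 3}, &resp)
	fmt.Printf("%+v\n", resp) // {Term:3 Success:false Index:3}
}
```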
type Configuration ¶
type Configuration struct {
	// All members of the cluster. Maps node ID to address.
	Members map[string]string
	// Maps node ID to a boolean that indicates whether the node
	// is a voting member or not. Voting members are those that
	// have their vote counted in elections and their match index
	// considered when the leader is advancing the commit index.
	// Non-voting members merely receive log entries. They are
	// not considered for election or commitment purposes.
	IsVoter map[string]bool
	// The log index of the configuration.
	Index uint64
}
Configuration represents a cluster of nodes.
func NewConfiguration ¶
func NewConfiguration(index uint64, members map[string]string) *Configuration
NewConfiguration creates a new configuration with the provided members and index. By default, all members in the returned configuration will have voter status.
func (*Configuration) Clone ¶
func (c *Configuration) Clone() Configuration
Clone creates a deep-copy of the configuration.
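Since both Members and IsVoter are maps, a shallow copy of Configuration would share them with the original. The sketch below shows what the voter default of NewConfiguration and a deep Clone amount to, using a local copy of the struct. The lower-case function names are hypothetical, and whether the real NewConfiguration copies the caller's map is not stated, so this version copies it to be safe.

```go
package main

import "fmt"

// Local stand-in mirroring the documented Configuration fields.
type Configuration struct {
	Members map[string]string
	IsVoter map[string]bool
	Index   uint64
}

// newConfiguration mirrors the documented default: every member is a voter.
func newConfiguration(index uint64, members map[string]string) *Configuration {
	c := &Configuration{
		Members: make(map[string]string, len(members)),
		IsVoter: make(map[string]bool, len(members)),
		Index:   index,
	}
	for id, addr := range members {
		c.Members[id] = addr
		c.IsVoter[id] = true
	}
	return c
}

// clone copies both maps so that mutating the copy cannot affect the original.
func (c *Configuration) clone() Configuration {
	out := Configuration{
		Members: make(map[string]string, len(c.Members)),
		IsVoter: make(map[string]bool, len(c.IsVoter)),
		Index:   c.Index,
	}
	for id, addr := range c.Members {
		out.Members[id] = addr
	}
	for id, voter := range c.IsVoter {
		out.IsVoter[id] = voter
	}
	return out
}

func main() {
	orig := newConfiguration(1, map[string]string{"n1": "127.0.0.1:8080"})
	cp := orig.clone()
	cp.IsVoter["n1"] = false
	fmt.Println(orig.IsVoter["n1"], cp.IsVoter["n1"]) // true false
}
```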
func (*Configuration) String ¶
func (c *Configuration) String() string
String returns a string representation of the configuration.
type Future ¶
type Future[T Response] interface {
	// Await retrieves the result of the future.
	Await() Result[T]
}
Future represents the result of an operation that will become available at a later point in time.
type InstallSnapshotRequest ¶
type InstallSnapshotRequest struct {
	// The leader's ID.
	LeaderID string
	// The leader's Term.
	Term uint64
	// The snapshot replaces all entries up to and including
	// this index.
	LastIncludedIndex uint64
	// The term associated with the last included index.
	LastIncludedTerm uint64
	// The last configuration included in the snapshot.
	Configuration []byte
	// A chunk of the snapshot.
	Bytes []byte
	// The offset in the snapshot file.
	Offset int64
	// Indicates whether this is the last chunk of the snapshot.
	Done bool
}
InstallSnapshotRequest is invoked by the leader to send a snapshot to a follower.
type InstallSnapshotResponse ¶
type InstallSnapshotResponse struct {
	// The term of the server that received the request.
	Term uint64
	// The number of bytes written by the receiver. If the
	// request is successful, this should be the number of bytes
	// in the request.
	BytesWritten int64
}
InstallSnapshotResponse is a response to a snapshot installation.
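The Offset and Done fields suggest that a leader streams a snapshot as a sequence of InstallSnapshotRequest chunks. A minimal sketch of the splitting step, assuming a fixed chunk size chosen by the sender (splitSnapshot and chunk are hypothetical names, and the package's real chunk size is not documented here):

```go
package main

import "fmt"

// chunk mirrors the Bytes, Offset, and Done fields of InstallSnapshotRequest;
// the leader ID, term, and configuration fields are omitted.
type chunk struct {
	Bytes  []byte
	Offset int64
	Done   bool
}

// splitSnapshot cuts a snapshot into chunks of at most size bytes. Each chunk
// records where it starts in the snapshot file, and only the final chunk has
// Done set. An empty snapshot still yields one empty chunk with Done set.
func splitSnapshot(snapshot []byte, size int) []chunk {
	if size <= 0 {
		panic("chunk size must be positive")
	}
	var chunks []chunk
	for off := 0; off < len(snapshot) || len(chunks) == 0; off += size {
		end := off + size
		if end > len(snapshot) {
			end = len(snapshot)
		}
		chunks = append(chunks, chunk{
			Bytes:  snapshot[off:end],
			Offset: int64(off),
			Done:   end == len(snapshot),
		})
	}
	return chunks
}

func main() {
	for _, c := range splitSnapshot([]byte("0123456789"), 4) {
		fmt.Printf("offset=%d bytes=%q done=%v\n", c.Offset, c.Bytes, c.Done)
	}
}
```

Since the response reports BytesWritten, a sender could also advance its offset by that amount after each reply instead of assuming every chunk was written in full.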
type Log ¶
type Log interface {
	// Open opens the log and prepares it for reads and writes.
	Open() error
	// Replay replays the content of the log on disk into memory.
	Replay() error
	// Close closes the log.
	Close() error
	// GetEntry returns the log entry located at the specified index.
	GetEntry(index uint64) (*LogEntry, error)
	// AppendEntry appends a log entry to the log.
	AppendEntry(entry *LogEntry) error
	// AppendEntries appends multiple log entries to the log.
	AppendEntries(entries []*LogEntry) error
	// Truncate deletes all log entries with index greater than
	// or equal to the provided index.
	Truncate(index uint64) error
	// DiscardEntries deletes all in-memory and persistent data in the
	// log. The provided index and term indicate the index and term at
	// which the now-empty log will start. Primarily intended to be used
	// for snapshotting.
	DiscardEntries(index uint64, term uint64) error
	// Compact deletes all log entries with index less than
	// or equal to the provided index.
	Compact(index uint64) error
	// Contains checks if the log contains an entry at the specified index.
	Contains(index uint64) bool
	// LastIndex returns the largest index that exists in the log and zero
	// if the log is empty.
	LastIndex() uint64
	// LastTerm returns the largest term in the log and zero if the log
	// is empty.
	LastTerm() uint64
	// NextIndex returns the next index to append to the log.
	NextIndex() uint64
	// Size returns the number of entries in the log.
	Size() int
}
Log represents the internal component of Raft that is responsible for persistently storing and retrieving log entries.
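The index arithmetic in Truncate and Compact is easy to get backwards: Truncate removes a suffix (the given index and above), while Compact removes a prefix (the given index and below). The in-memory sketch below implements only that bookkeeping part of the interface, with no Open, Replay, Close, GetEntry, AppendEntries, DiscardEntries, or persistence. memLog and errNoEntry are hypothetical names, and the sketch assumes entry indexes are contiguous.

```go
package main

import (
	"errors"
	"fmt"
)

// LogEntry is trimmed to the fields this sketch uses.
type LogEntry struct {
	Index uint64
	Term  uint64
}

var errNoEntry = errors.New("no entry at index") // hypothetical error value

// memLog is a hypothetical in-memory log. start is the index of entries[0],
// or the index the next append must use when the log is empty.
type memLog struct {
	start   uint64
	entries []*LogEntry
}

func newMemLog() *memLog { return &memLog{start: 1} }

// position maps a log index to a slice position.
func (l *memLog) position(index uint64) (int, bool) {
	if index < l.start || index >= l.NextIndex() {
		return 0, false
	}
	return int(index - l.start), true
}

func (l *memLog) Contains(index uint64) bool {
	_, ok := l.position(index)
	return ok
}

func (l *memLog) AppendEntry(e *LogEntry) error {
	if e.Index != l.NextIndex() {
		return fmt.Errorf("expected index %d, got %d", l.NextIndex(), e.Index)
	}
	l.entries = append(l.entries, e)
	return nil
}

// Truncate removes the suffix starting at index (entries with index >= index).
func (l *memLog) Truncate(index uint64) error {
	pos, ok := l.position(index)
	if !ok {
		return errNoEntry
	}
	l.entries = l.entries[:pos]
	return nil
}

// Compact removes the prefix ending at index (entries with index <= index),
// typically because a snapshot now covers them.
func (l *memLog) Compact(index uint64) error {
	pos, ok := l.position(index)
	if !ok {
		return errNoEntry
	}
	l.entries = append([]*LogEntry(nil), l.entries[pos+1:]...)
	l.start = index + 1
	return nil
}

// LastIndex and LastTerm return zero for an empty log, as documented.
func (l *memLog) LastIndex() uint64 {
	if len(l.entries) == 0 {
		return 0
	}
	return l.entries[len(l.entries)-1].Index
}

func (l *memLog) LastTerm() uint64 {
	if len(l.entries) == 0 {
		return 0
	}
	return l.entries[len(l.entries)-1].Term
}

func (l *memLog) NextIndex() uint64 { return l.start + uint64(len(l.entries)) }
func (l *memLog) Size() int         { return len(l.entries) }

func main() {
	l := newMemLog()
	for i := uint64(1); i <= 5; i++ {
		_ = l.AppendEntry(&LogEntry{Index: i, Term: 1})
	}
	_ = l.Truncate(4)                                    // drops 4 and 5
	_ = l.Compact(2)                                     // drops 1 and 2
	fmt.Println(l.Size(), l.LastIndex(), l.NextIndex()) // 1 3 4
}
```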
type LogEntry ¶
type LogEntry struct {
	// The index of the log entry.
	Index uint64
	// The term of the log entry.
	Term uint64
	// The offset of the log entry.
	Offset int64
	// The data of the log entry.
	Data []byte
	// The type of the log entry.
	EntryType LogEntryType
}
LogEntry is a log entry in the log.
func NewLogEntry ¶
func NewLogEntry(index uint64, term uint64, data []byte, entryType LogEntryType) *LogEntry
NewLogEntry creates a new instance of a LogEntry with the provided index, term, data, and type.
func (*LogEntry) IsConflict ¶
IsConflict checks whether the current log entry conflicts with another log entry. Two log entries are considered conflicting if they have the same index but different terms.
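The definition above translates directly into code. The sketch uses a free function isConflict because the method's exact signature is not shown here, and adds a hypothetical firstConflict helper illustrating where a follower would typically use the check: find the first incoming entry that conflicts with its own log, then truncate from that index.

```go
package main

import "fmt"

type LogEntry struct {
	Index uint64
	Term  uint64
}

// isConflict applies the documented rule: same index, different term.
func isConflict(a, b *LogEntry) bool {
	return a.Index == b.Index && a.Term != b.Term
}

// firstConflict compares incoming entries with a local log in which local[i]
// holds index i+1. It returns the index of the first conflicting entry, which
// is where a follower would call Truncate before appending the rest.
func firstConflict(local, incoming []*LogEntry) (uint64, bool) {
	for _, e := range incoming {
		if e.Index == 0 || e.Index > uint64(len(local)) {
			break // past the end of the local log: nothing left to compare
		}
		if isConflict(local[e.Index-1], e) {
			return e.Index, true
		}
	}
	return 0, false
}

func main() {
	local := []*LogEntry{{Index: 1, Term: 1}, {Index: 2, Term: 1}, {Index: 3, Term: 2}}
	incoming := []*LogEntry{{Index: 2, Term: 1}, {Index: 3, Term: 3}, {Index: 4, Term: 3}}
	fmt.Println(firstConflict(local, incoming)) // 3 true
}
```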
type LogEntryType ¶
type LogEntryType uint32
LogEntryType is the type of the log entry.
const (
	// NoOpEntry log entries are those that do not contain an operation.
	NoOpEntry LogEntryType = iota
	// OperationEntry log entries are those that do contain an operation that
	// will be applied to the state machine.
	OperationEntry
	// ConfigurationEntry log entries are those that contain a cluster configuration.
	ConfigurationEntry
)
type Operation ¶
type Operation struct {
	// The operation as bytes. The provided state machine should be capable
	// of decoding these bytes.
	Bytes []byte
	// The type of the operation.
	OperationType OperationType
	// The log entry index associated with the operation.
	// Valid only if this is a replicated operation and the operation was successful.
	LogIndex uint64
	// The log entry term associated with the operation.
	// Valid only if this is a replicated operation and the operation was successful.
	LogTerm uint64
	// contains filtered or unexported fields
}
Operation is an operation that will be applied to the state machine. An operation must be deterministic.
type OperationResponse ¶
type OperationResponse struct {
	// The operation applied to the state machine.
	Operation Operation
	// The response returned by the state machine after applying the operation.
	ApplicationResponse interface{}
}
OperationResponse is the response that is generated after applying an operation to the state machine.
type OperationType ¶
type OperationType uint32
OperationType is the type of the operation that is being submitted to raft.
const (
	// Replicated indicates that the provided operation will be written to the
	// log and guarantees linearizable semantics.
	Replicated OperationType = iota
	// LinearizableReadOnly indicates that the provided operation will not be written
	// to the log and requires that the receiving server verify its leadership through
	// a round of heartbeats to its peers. Guarantees linearizable semantics.
	LinearizableReadOnly
	// LeaseBasedReadOnly indicates that the provided operation will not be written
	// to the log and requires that the server verify its leadership via its lease.
	// This operation type does not guarantee linearizable semantics.
	LeaseBasedReadOnly
	// Broadcasted indicates that the provided operation will be written to the log and
	// broadcasted to all followers and guarantees linearizable semantics.
	Broadcasted
)
func (OperationType) String ¶
func (o OperationType) String() string
String converts an OperationType to a string.
type Option ¶
type Option func(options *options) error
Option is a function that updates the options associated with Raft.
func WithElectionTimeout ¶
func WithElectionTimeout(time time.Duration) Option
WithElectionTimeout sets the election timeout for raft.
func WithHeartbeatInterval ¶
func WithHeartbeatInterval(time time.Duration) Option
WithHeartbeatInterval sets the heartbeat interval for raft.
func WithLeaseDuration ¶
func WithLeaseDuration(leaseDuration time.Duration) Option
WithLeaseDuration sets the duration for which a lease remains valid upon renewal. The lease should generally remain valid for a much smaller amount of time than the election timeout.
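Option is a functional option that can fail, so validation can live inside each option and in a final cross-field check. The sketch below re-declares two options against a hypothetical options struct (its fields and default values are invented) and turns the lease guidance into a hard rule that the lease be strictly shorter than the election timeout. That is a weaker, illustrative check; the documentation only says the lease should generally be much smaller.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// options is a hypothetical stand-in for the package's unexported options
// struct; the field names and default values below are invented.
type options struct {
	electionTimeout   time.Duration
	heartbeatInterval time.Duration
	leaseDuration     time.Duration
}

type Option func(options *options) error

func WithElectionTimeout(d time.Duration) Option {
	return func(o *options) error {
		if d <= 0 {
			return errors.New("election timeout must be positive")
		}
		o.electionTimeout = d
		return nil
	}
}

func WithLeaseDuration(d time.Duration) Option {
	return func(o *options) error {
		if d <= 0 {
			return errors.New("lease duration must be positive")
		}
		o.leaseDuration = d
		return nil
	}
}

// buildOptions applies each Option over the defaults, then enforces the
// cross-field rule that the lease be shorter than the election timeout.
func buildOptions(opts ...Option) (*options, error) {
	o := &options{
		electionTimeout:   300 * time.Millisecond,
		heartbeatInterval: 50 * time.Millisecond,
		leaseDuration:     100 * time.Millisecond,
	}
	for _, opt := range opts {
		if err := opt(o); err != nil {
			return nil, err
		}
	}
	if o.leaseDuration >= o.electionTimeout {
		return nil, fmt.Errorf("lease duration %v must be shorter than election timeout %v",
			o.leaseDuration, o.electionTimeout)
	}
	return o, nil
}

func main() {
	_, err := buildOptions(WithLeaseDuration(time.Second))
	fmt.Println(err) // lease duration 1s must be shorter than election timeout 300ms
}
```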
func WithLog ¶
func WithLog(log Log) Option
WithLog sets the log that will be used by raft. This is useful if you wish to use your own implementation of a log.
func WithLogLevel ¶
func WithLogLevel(level logging.Level) Option
WithLogLevel sets the log level used by raft.
func WithSnapshotStorage ¶
func WithSnapshotStorage(snapshotStorage SnapshotStorage) Option
WithSnapshotStorage sets the snapshot storage that will be used by raft. This is useful if you wish to use your own implementation of a snapshot storage.
func WithStateStorage ¶
func WithStateStorage(stateStorage StateStorage) Option
WithStateStorage sets the state storage that will be used by raft. This is useful if you wish to use your own implementation of a state storage.
func WithTransport ¶
func WithTransport(transport Transport) Option
WithTransport sets the network transport that will be used by raft. This is useful if you wish to use your own implementation of a transport.
type Raft ¶
type Raft struct {
// contains filtered or unexported fields
}
Raft implements the raft consensus protocol.
func NewRaft ¶
func NewRaft(
	id string,
	address string,
	fsm StateMachine,
	dataPath string,
	opts ...Option,
) (*Raft, error)
NewRaft creates a new instance of Raft with the provided ID and address. The dataPath argument is the top-level directory where all state for this node will be persisted.
func (*Raft) AddServer ¶
func (r *Raft) AddServer(
	id string,
	address string,
	isVoter bool,
	timeout time.Duration,
) Future[Configuration]
AddServer adds a node with the provided ID and address to the cluster and returns a future for the resulting configuration. It is generally recommended to add a new node as a non-voting member before adding it as a voting member, so that it can catch up with the rest of the cluster first.
The provided ID must be distinct from the IDs of the existing nodes in the cluster. If the configuration change is not successful, the returned future will be populated with an error, and it may be necessary to resubmit the change to this node or to a different node. It is safe to call this function as many times as necessary.
It is the caller's responsibility to implement retry logic.
func (*Raft) AppendEntries ¶
func (r *Raft) AppendEntries(request *AppendEntriesRequest, response *AppendEntriesResponse) error
AppendEntries handles log replication requests from the leader. It takes a request to append entries and fills the response with the result of the append operation. This will return an error if the node is shut down.
func (*Raft) Bootstrap ¶
func (r *Raft) Bootstrap(configuration map[string]string) error
Bootstrap initializes this node with a cluster configuration. The configuration must contain the ID and address of every node in the cluster, including this one.
This function should only be called when starting a cluster for the first time and there is no existing configuration. This should only be called on a single, voting member of the cluster.
func (*Raft) Configuration ¶
func (r *Raft) Configuration() Configuration
Configuration returns the most current configuration of this node. This configuration may or may not have been committed yet. If there is no configuration, an empty configuration is returned.
func (*Raft) InstallSnapshot ¶
func (r *Raft) InstallSnapshot(
	request *InstallSnapshotRequest,
	response *InstallSnapshotResponse,
) error
InstallSnapshot handles snapshot installation requests from the leader. It takes a request to install a snapshot and fills the response with the result of the installation. This will return an error if the node is shut down.
func (*Raft) RemoveServer ¶
func (r *Raft) RemoveServer(id string, timeout time.Duration) Future[Configuration]
RemoveServer removes the node with the provided ID from the cluster and returns a future for the resulting configuration. Once removed, the node will remain online as a non-voter and may safely be shut down.
If the configuration change was not successful, the returned future will be populated with an error. It may be necessary to resubmit the configuration change to this node or a different node. It is safe to call this function as many times as necessary.
It is the caller's responsibility to implement retry logic.
func (*Raft) RequestVote ¶
func (r *Raft) RequestVote(request *RequestVoteRequest, response *RequestVoteResponse) error
RequestVote handles vote requests from other nodes during elections. It takes a vote request and fills the response with the result of the vote. This will return an error if the node is shut down.
func (*Raft) Restart ¶
func (r *Raft) Restart() error
Restart starts a node that has been started and stopped before.
The difference between Restart and Start is that Restart restores the state of the node from non-volatile storage, whereas Start does not. Restart should not be called if the node is being started for the first time.
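A caller that does not track whether a node has run before has to choose between Start and Restart at startup. A sketch of one way to decide, assuming the node's StateStorage was created with NewStateStorage(dataPath), so that its file sits at dataPath/state/state.bin as documented under NewStateStorage. The starter interface and fakeNode type are hypothetical, and Bootstrap is left out.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// starter is the subset of *raft.Raft used here.
type starter interface {
	Start() error
	Restart() error
}

// startOrRestart calls Restart when persisted state appears to exist and
// Start otherwise. It assumes the state file lives at dataPath/state/state.bin.
func startOrRestart(n starter, dataPath string) error {
	_, err := os.Stat(filepath.Join(dataPath, "state", "state.bin"))
	switch {
	case err == nil:
		return n.Restart()
	case errors.Is(err, fs.ErrNotExist):
		return n.Start()
	default:
		return err // the check itself failed; do not guess
	}
}

// fakeNode records which method was called. It stands in for *raft.Raft.
type fakeNode struct{ called string }

func (f *fakeNode) Start() error {
	f.called = "start"
	return nil
}

func (f *fakeNode) Restart() error {
	f.called = "restart"
	return nil
}

func main() {
	dir, err := os.MkdirTemp("", "raft-sketch")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	n := &fakeNode{}
	_ = startOrRestart(n, dir) // nothing persisted yet
	fmt.Println(n.called)      // start

	_ = os.MkdirAll(filepath.Join(dir, "state"), 0o755)
	_ = os.WriteFile(filepath.Join(dir, "state", "state.bin"), nil, 0o644)
	_ = startOrRestart(n, dir)
	fmt.Println(n.called) // restart
}
```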
func (*Raft) SetStateCallback ¶ added in v0.2.3
func (r *Raft) SetStateCallback(callback func(leaderId string, state State))
func (*Raft) Start ¶
func (r *Raft) Start() error
Start starts this node if it has not already been started.
If the node does not have an existing configuration, a configuration will be created that only includes this node as a non-voter. If the node has been started before, Restart should be called instead.
func (*Raft) Status ¶
func (r *Raft) Status() Status
Status returns the status of this node. The status includes the ID, address, term, commit index, last applied index, and state of this node.
func (*Raft) SubmitOperation ¶
func (r *Raft) SubmitOperation(
	operation []byte,
	operationType OperationType,
	timeout time.Duration,
) Future[OperationResponse]
SubmitOperation accepts an operation for application to the state machine and returns a future for the response to the operation. Once the operation has been applied to the state machine, the returned future will be populated with the response.
Even if the operation is submitted successfully, it is not guaranteed that it will be applied to the state machine if there are failures. If the operation could not be applied to the state machine or the operation times out, the future will be populated with an error.
It may be necessary to resubmit the operation to this node or a different node if it failed. It is the caller's responsibility to implement retry logic and to handle duplicate operations.
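Since retry logic is left to the caller, here is a minimal sketch of one policy: move to another node on ErrNotLeader and back off exponentially on ErrTimeout. submitFunc and submitWithRetry are hypothetical; submitFunc stands in for calling SubmitOperation on a given node and checking the Error of the awaited Result, and the two errors are redeclared locally.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Local stand-ins for the package's sentinel errors.
var (
	ErrNotLeader = errors.New("this node is not the leader")
	ErrTimeout   = errors.New("a timeout occurred while waiting for the result - try submitting the operation again")
)

// submitFunc stands in for "call SubmitOperation on node i and Await the
// future", returning the Result's Error().
type submitFunc func(node int) error

// submitWithRetry walks the nodes round-robin on ErrNotLeader and backs off
// exponentially on ErrTimeout. Any other error is returned immediately.
func submitWithRetry(submit submitFunc, nodes, maxAttempts int, backoff time.Duration) error {
	node := 0
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		err = submit(node)
		switch {
		case err == nil:
			return nil
		case errors.Is(err, ErrNotLeader):
			node = (node + 1) % nodes
		case errors.Is(err, ErrTimeout):
			time.Sleep(backoff)
			backoff *= 2
		default:
			return err
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

func main() {
	err := submitWithRetry(func(node int) error {
		if node != 2 {
			return ErrNotLeader
		}
		return nil // node 2 is the leader in this simulation
	}, 3, 5, 10*time.Millisecond)
	fmt.Println(err) // <nil>
}
```

Retrying after ErrTimeout can resubmit an operation that was in fact applied, so the state machine should be able to recognize duplicates, for example through a request ID embedded in the operation bytes (a scheme this package does not provide).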
func (*Raft) WaitForStableState ¶ added in v0.2.3
func (r *Raft) WaitForStableState()
WaitForStableState blocks until this node is in a stable state.
type RequestVoteRequest ¶
type RequestVoteRequest struct {
	// The ID of the candidate requesting the vote.
	CandidateID string
	// The candidate's term.
	Term uint64
	// The index of the candidate's last log entry.
	LastLogIndex uint64
	// The term of the candidate's last log entry.
	LastLogTerm uint64
	// Indicates whether this request is for a prevote.
	Prevote bool
}
RequestVoteRequest is a request invoked by candidates to gather votes.
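LastLogIndex and LastLogTerm exist so that a voter can refuse candidates whose logs are behind its own. The comparison below is the standard election restriction from the Raft paper (section 5.4.1); that this package applies exactly this rule, and how it treats the Prevote flag, is not stated in this documentation. candidateUpToDate is a hypothetical name.

```go
package main

import "fmt"

// candidateUpToDate reports whether a candidate's log, described by the
// LastLogTerm and LastLogIndex of its RequestVoteRequest, is at least as
// up-to-date as the voter's own log. A higher last term wins; with equal
// last terms, the longer log wins.
func candidateUpToDate(candLastTerm, candLastIndex, myLastTerm, myLastIndex uint64) bool {
	if candLastTerm != myLastTerm {
		return candLastTerm > myLastTerm
	}
	return candLastIndex >= myLastIndex
}

func main() {
	fmt.Println(candidateUpToDate(3, 1, 2, 10)) // true: a newer last term beats a longer log
	fmt.Println(candidateUpToDate(2, 9, 2, 10)) // false: same last term, shorter log
}
```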
type RequestVoteResponse ¶
type RequestVoteResponse struct {
	// The term of the server that received the request.
	Term uint64
	// Indicates whether the vote request was successful.
	VoteGranted bool
}
RequestVoteResponse is a response to a request for a vote.
type Response ¶
type Response interface { OperationResponse | Configuration }
Response is the concrete result produced by a node after processing a client-submitted operation.
type Result ¶
type Result[T Response] interface {
	// Success returns the response associated with an operation.
	// Error should always be called before Success - the result
	// returned by Success is only valid if Error returns nil.
	Success() T
	// Error returns any error that occurred during the
	// operation that was to produce the response.
	Error() error
}
Result represents an abstract result produced by a node after processing a client-submitted operation.
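Future and Result separate waiting for an outcome from inspecting it. A minimal channel-based sketch of the two interfaces, with tiny local stand-ins for OperationResponse and Configuration; future, result, newFuture, respond, and errTimeout are hypothetical names, and the package's own implementation may differ.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Minimal stand-ins for the two documented Response types.
type OperationResponse struct{ ApplicationResponse interface{} }
type Configuration struct{ Index uint64 }

type Response interface {
	OperationResponse | Configuration
}

type Result[T Response] interface {
	Success() T
	Error() error
}

type Future[T Response] interface {
	Await() Result[T]
}

type result[T Response] struct {
	success T
	err     error
}

func (r *result[T]) Success() T   { return r.success }
func (r *result[T]) Error() error { return r.err }

var errTimeout = errors.New("timed out waiting for the result")

// future receives exactly one result on a buffered channel. Await is meant
// to be called once; it blocks until the result arrives or the timeout elapses.
type future[T Response] struct {
	timeout time.Duration
	ch      chan Result[T]
}

func newFuture[T Response](timeout time.Duration) *future[T] {
	return &future[T]{timeout: timeout, ch: make(chan Result[T], 1)}
}

// respond delivers the result; the one-slot buffer lets the producer move on
// even if nobody is waiting yet.
func (f *future[T]) respond(v T, err error) {
	f.ch <- &result[T]{success: v, err: err}
}

func (f *future[T]) Await() Result[T] {
	select {
	case r := <-f.ch:
		return r
	case <-time.After(f.timeout):
		return &result[T]{err: errTimeout}
	}
}

// Compile-time check that *future satisfies the Future interface.
var _ Future[Configuration] = (*future[Configuration])(nil)

func main() {
	f := newFuture[OperationResponse](time.Second)
	go f.respond(OperationResponse{ApplicationResponse: "ok"}, nil)

	r := f.Await()
	if err := r.Error(); err != nil { // check Error before Success, as documented
		fmt.Println("error:", err)
		return
	}
	fmt.Println(r.Success().ApplicationResponse) // ok
}
```

Because of the one-slot buffer, a producer that responds after the caller has already timed out does not block forever.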
type SnapshotFile ¶
type SnapshotFile interface {
	io.ReadWriteSeeker
	io.Closer
	// Metadata returns the metadata associated with the snapshot file.
	Metadata() SnapshotMetadata
	// Discard deletes the snapshot and its metadata if it is incomplete.
	Discard() error
}
SnapshotFile represents a component for reading and writing snapshots.
type SnapshotMetadata ¶
type SnapshotMetadata struct {
	// The last log index included in the snapshot.
	LastIncludedIndex uint64 `json:"last_included_index"`
	// The last log term included in the snapshot.
	LastIncludedTerm uint64 `json:"last_included_term"`
	// The most up-to-date configuration in the snapshot.
	Configuration []byte `json:"configuration"`
}
SnapshotMetadata contains all metadata associated with a snapshot.
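The struct tags mean that encoding/json would write this metadata with snake_case keys, and, as encoding/json does for every []byte field, it would base64-encode Configuration. Whether the package stores metadata with encoding/json is an inference from the tags; the sketch below only shows what that encoding looks like, with hypothetical helper names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Copy of the documented struct, including its JSON tags.
type SnapshotMetadata struct {
	LastIncludedIndex uint64 `json:"last_included_index"`
	LastIncludedTerm  uint64 `json:"last_included_term"`
	Configuration     []byte `json:"configuration"`
}

func encodeMetadata(m SnapshotMetadata) (string, error) {
	data, err := json.Marshal(m)
	return string(data), err
}

func decodeMetadata(s string) (SnapshotMetadata, error) {
	var m SnapshotMetadata
	err := json.Unmarshal([]byte(s), &m)
	return m, err
}

func main() {
	s, err := encodeMetadata(SnapshotMetadata{
		LastIncludedIndex: 10,
		LastIncludedTerm:  2,
		Configuration:     []byte("hi"),
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // {"last_included_index":10,"last_included_term":2,"configuration":"aGk="}

	m, err := decodeMetadata(s)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(m.Configuration)) // hi
}
```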
type SnapshotStorage ¶
type SnapshotStorage interface {
	// NewSnapshotFile creates a new snapshot file. It is the caller's responsibility to
	// close the file or discard it when they are done with it.
	NewSnapshotFile(
		lastIncludedIndex uint64,
		lastIncludedTerm uint64,
		configuration []byte,
	) (SnapshotFile, error)
	// SnapshotFile returns the most recent snapshot file. It is the
	// caller's responsibility to close the file when they are done with it.
	SnapshotFile() (SnapshotFile, error)
}
SnapshotStorage represents the component of Raft that manages snapshots created by the state machine.
func NewSnapshotStorage ¶
func NewSnapshotStorage(path string) (SnapshotStorage, error)
NewSnapshotStorage creates a new SnapshotStorage instance.
Snapshots will be stored at path/snapshots. Any directories on the path that do not exist will be created. Each snapshot that is created will have its own directory that is named using a timestamp taken at the time of its creation. Each of these directories will contain two separate files - one for the content of the snapshot and one for its metadata.
type State ¶
type State uint32
State represents the current state of a node. A node may be the leader, a follower, a pre-candidate, a candidate, or shut down.
const (
	// Leader is a state indicating that the node is responsible for replicating and
	// committing log entries. The leader will accept operations and membership change
	// requests.
	Leader State = iota
	// Follower is a state indicating that a node is responsible for accepting log entries replicated
	// by the leader. A node in the follower state will not accept operations or membership change
	// requests.
	Follower
	// PreCandidate is a state indicating this node is holding a prevote. If this node is able to
	// receive successful RequestVote RPC responses from the majority of the cluster, it will enter the
	// candidate state.
	PreCandidate
	// Candidate is a state indicating that this node is currently holding an election. A node will
	// remain in this state until it is either elected the leader or another node is elected leader
	// thereby causing this node to step down to the follower state.
	Candidate
	// Shutdown is a state indicating that the node is currently offline.
	Shutdown
)
type StateMachine ¶
type StateMachine interface {
	// Apply applies the given operation to the state machine.
	Apply(operation *Operation) interface{}
	// Snapshot writes a snapshot of the current state of the state machine
	// to the provided writer.
	Snapshot(snapshotWriter io.Writer) error
	// Restore recovers the state of the state machine given a reader for a previously
	// created snapshot.
	Restore(snapshotReader io.Reader) error
	// NeedSnapshot returns true if a snapshot should be taken of the state machine and false
	// otherwise. The provided log size is the number of entries currently in the log.
	NeedSnapshot(logSize int) bool
}
StateMachine is an interface representing a replicated state machine. The implementation must be safe for concurrent use.
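A minimal StateMachine sketch: a key-value store guarded by a mutex. The JSON command format, the kvStore and command names, and the 1000-entry snapshot threshold are all assumptions made for illustration; raft only passes Operation.Bytes through and does not dictate an encoding. Operation is trimmed to its Bytes field.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"sync"
)

// Operation is trimmed to the field this sketch reads.
type Operation struct {
	Bytes []byte
}

// command is a hypothetical operation encoding chosen for this sketch; raft
// itself only carries the bytes and never interprets them.
type command struct {
	Op    string `json:"op"` // "set" or "get"
	Key   string `json:"key"`
	Value string `json:"value,omitempty"`
}

// kvStore guards its map with a mutex because the StateMachine
// implementation must be safe for concurrent use.
type kvStore struct {
	mu   sync.Mutex
	data map[string]string
}

func newKVStore() *kvStore { return &kvStore{data: make(map[string]string)} }

// Apply is deterministic: the same sequence of operations yields the same
// map on every node.
func (s *kvStore) Apply(op *Operation) interface{} {
	var cmd command
	if err := json.Unmarshal(op.Bytes, &cmd); err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	switch cmd.Op {
	case "set":
		s.data[cmd.Key] = cmd.Value
		return nil
	case "get":
		return s.data[cmd.Key]
	default:
		return fmt.Errorf("unknown op %q", cmd.Op)
	}
}

func (s *kvStore) Snapshot(w io.Writer) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return json.NewEncoder(w).Encode(s.data)
}

func (s *kvStore) Restore(r io.Reader) error {
	data := make(map[string]string)
	if err := json.NewDecoder(r).Decode(&data); err != nil {
		return err
	}
	s.mu.Lock()
	s.data = data
	s.mu.Unlock()
	return nil
}

// NeedSnapshot uses an arbitrary threshold of 1000 log entries.
func (s *kvStore) NeedSnapshot(logSize int) bool { return logSize >= 1000 }

func main() {
	a := newKVStore()
	a.Apply(&Operation{Bytes: []byte(`{"op":"set","key":"x","value":"1"}`)})

	var snap bytes.Buffer
	if err := a.Snapshot(&snap); err != nil {
		panic(err)
	}
	b := newKVStore()
	if err := b.Restore(&snap); err != nil {
		panic(err)
	}
	fmt.Println(b.Apply(&Operation{Bytes: []byte(`{"op":"get","key":"x"}`)})) // 1
}
```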
type StateStorage ¶
type StateStorage interface {
	// SetState persists the provided term and vote.
	SetState(term uint64, vote string) error
	// State returns the most recently persisted term and vote in the storage.
	// If there is no pre-existing state, zero and an empty string will be returned.
	State() (uint64, string, error)
}
StateStorage represents the component of Raft responsible for persistently storing term and vote.
func NewStateStorage ¶
func NewStateStorage(path string) (StateStorage, error)
NewStateStorage creates a new instance of a StateStorage.
The file containing the state will be located at path/state/state.bin. Any directories on path that do not exist will be created.
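A file-backed StateStorage sketch that keeps the documented path/state/state.bin location but invents its own format (an 8-byte big-endian term followed by the vote). It writes to a temporary file, syncs it, and renames it into place, so a crash should leave either the old or the new state rather than a partial write. fileStateStorage, newFileStateStorage, and demo are hypothetical names.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// fileStateStorage is a hypothetical StateStorage with an invented format.
type fileStateStorage struct {
	file string
}

func newFileStateStorage(path string) (*fileStateStorage, error) {
	dir := filepath.Join(path, "state")
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return nil, err
	}
	return &fileStateStorage{file: filepath.Join(dir, "state.bin")}, nil
}

// SetState writes to a temporary file, syncs it, then renames it over the
// old file so readers never observe a half-written state.
func (s *fileStateStorage) SetState(term uint64, vote string) error {
	buf := make([]byte, 8+len(vote))
	binary.BigEndian.PutUint64(buf, term)
	copy(buf[8:], vote)

	tmp := s.file + ".tmp"
	f, err := os.Create(tmp)
	if err != nil {
		return err
	}
	if _, err := f.Write(buf); err != nil {
		f.Close()
		return err
	}
	if err := f.Sync(); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	return os.Rename(tmp, s.file)
}

// State returns zero and "" when nothing has been persisted yet.
func (s *fileStateStorage) State() (uint64, string, error) {
	buf, err := os.ReadFile(s.file)
	if errors.Is(err, fs.ErrNotExist) {
		return 0, "", nil
	}
	if err != nil {
		return 0, "", err
	}
	if len(buf) < 8 {
		return 0, "", errors.New("state file is corrupt")
	}
	return binary.BigEndian.Uint64(buf[:8]), string(buf[8:]), nil
}

// demo persists a term and vote in a temporary directory and reads them back.
func demo() (uint64, string, error) {
	dir, err := os.MkdirTemp("", "raft-state")
	if err != nil {
		return 0, "", err
	}
	defer os.RemoveAll(dir)
	s, err := newFileStateStorage(dir)
	if err != nil {
		return 0, "", err
	}
	if term, vote, err := s.State(); err != nil || term != 0 || vote != "" {
		return 0, "", fmt.Errorf("expected empty state, got %d %q %v", term, vote, err)
	}
	if err := s.SetState(5, "node-2"); err != nil {
		return 0, "", err
	}
	return s.State()
}

func main() {
	term, vote, err := demo()
	fmt.Println(term, vote, err) // 5 node-2 <nil>
}
```

On POSIX systems a production version would also fsync the containing directory after the rename so that the rename itself is durable.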
type Status ¶
type Status struct {
	// The unique identifier of this node.
	ID string
	// The address of the node.
	Address string
	// The current term.
	Term uint64
	// The current commit index.
	CommitIndex uint64
	// The index of the last log entry applied to the state machine.
	LastApplied uint64
	// The current state of the node: leader, follower, pre-candidate, candidate, or shutdown.
	State State
}
Status is the status of a node.
type Transport ¶
type Transport interface {
	// Run will start serving incoming RPCs received at the local network address.
	Run() error
	// Shutdown will stop the serving of incoming RPCs.
	Shutdown() error
	// SendAppendEntries sends an append entries request to the provided address.
	SendAppendEntries(address string, request AppendEntriesRequest) (AppendEntriesResponse, error)
	// SendRequestVote sends a request vote request to the provided address.
	SendRequestVote(address string, request RequestVoteRequest) (RequestVoteResponse, error)
	// SendInstallSnapshot sends an install snapshot request to the provided address.
	SendInstallSnapshot(
		address string,
		request InstallSnapshotRequest,
	) (InstallSnapshotResponse, error)
	// RegisterAppendEntriesHandler registers the function that will be called when an
	// AppendEntries RPC is received.
	RegisterAppendEntriesHandler(handler func(*AppendEntriesRequest, *AppendEntriesResponse) error)
	// RegisterRequestVoteHandler registers the function that will be called when a
	// RequestVote RPC is received.
	RegisterRequestVoteHandler(handler func(*RequestVoteRequest, *RequestVoteResponse) error)
	// RegisterInstallSnapshotHandler registers the function that will be called when an
	// InstallSnapshot RPC is received.
	RegisterInstallSnapshotHandler(
		handler func(*InstallSnapshotRequest, *InstallSnapshotResponse) error,
	)
	// EncodeConfiguration accepts a configuration and encodes it such that it can be
	// decoded by DecodeConfiguration.
	EncodeConfiguration(configuration *Configuration) ([]byte, error)
	// DecodeConfiguration accepts a byte representation of a configuration and decodes
	// it into a configuration.
	DecodeConfiguration(data []byte) (Configuration, error)
	// Address returns the local network address.
	Address() string
}
Transport represents the underlying transport mechanism used by a node in a cluster to send and receive RPCs. It is the implementer's responsibility to provide functions that invoke the registered handlers.
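To make "invoke the registered handlers" concrete, here is an in-process sketch covering only the AppendEntries path, which can be handy for tests. network, memTransport, and newNetwork are hypothetical names; a real transport would carry the request over the network and invoke the handler in the receiving process.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Trimmed-down stand-ins for the RPC types.
type AppendEntriesRequest struct {
	LeaderID string
	Term     uint64
}

type AppendEntriesResponse struct {
	Term    uint64
	Success bool
}

type appendEntriesHandler func(*AppendEntriesRequest, *AppendEntriesResponse) error

// network is a hypothetical in-process "network" mapping addresses to transports.
type network struct {
	mu    sync.RWMutex
	nodes map[string]*memTransport
}

func newNetwork() *network { return &network{nodes: make(map[string]*memTransport)} }

// memTransport implements only the AppendEntries half of Transport; the
// RequestVote and InstallSnapshot methods would follow the same pattern.
// Handlers are assumed to be registered before any RPC is sent.
type memTransport struct {
	address       string
	net           *network
	appendEntries appendEntriesHandler
}

func (n *network) newTransport(address string) *memTransport {
	t := &memTransport{address: address, net: n}
	n.mu.Lock()
	n.nodes[address] = t
	n.mu.Unlock()
	return t
}

func (t *memTransport) Address() string { return t.address }

func (t *memTransport) RegisterAppendEntriesHandler(h func(*AppendEntriesRequest, *AppendEntriesResponse) error) {
	t.appendEntries = h
}

// SendAppendEntries delivers the request by invoking the handler that the
// receiving node registered.
func (t *memTransport) SendAppendEntries(address string, req AppendEntriesRequest) (AppendEntriesResponse, error) {
	t.net.mu.RLock()
	peer, ok := t.net.nodes[address]
	t.net.mu.RUnlock()
	if !ok || peer.appendEntries == nil {
		return AppendEntriesResponse{}, errors.New("no handler registered at " + address)
	}
	var resp AppendEntriesResponse
	err := peer.appendEntries(&req, &resp)
	return resp, err
}

func main() {
	n := newNetwork()
	leader, follower := n.newTransport("a"), n.newTransport("b")
	follower.RegisterAppendEntriesHandler(func(req *AppendEntriesRequest, resp *AppendEntriesResponse) error {
		resp.Term, resp.Success = req.Term, true
		return nil
	})
	resp, err := leader.SendAppendEntries("b", AppendEntriesRequest{LeaderID: leader.Address(), Term: 4})
	fmt.Println(resp, err) // {4 true} <nil>
}
```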
func NewTransport ¶
NewTransport creates a new Transport instance.