raft

Version: v0.2.8
Published: Jun 13, 2024 License: MIT Imports: 32 Imported by: 0

README

Raft

This package provides a simple, easy-to-understand, and reliable implementation of Raft using Go. Raft is a consensus protocol designed to manage replicated logs in a distributed system. Its purpose is to ensure fault-tolerant coordination and consistency among a group of nodes, making it suitable for building reliable systems. Potential use cases include distributed file systems, consistent key-value stores, and service discovery.

Features

The following features are currently supported:

  • Automated Snapshots
  • Concurrent Snapshot Transfer from Leader to Followers
  • Dynamic Membership Changes
  • Linearizable and Lease-Based Read-Only Operations
  • Prevote and Leader Stickiness

Protocol Overview

Raft is based on a leader-follower model, where one node is elected as the leader and coordinates the replication process. Time is divided into terms, and the leader is elected for each term through a leader election process. The leader receives client requests, which are then replicated to other nodes called followers. The followers maintain a log of all state changes, and the leader's responsibility is to ensure that all followers have consistent logs by sending them entries to append. Safety is guaranteed by requiring a majority of nodes to agree on the state changes, ensuring that no conflicting states are committed.
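The majority rule described above can be sketched as a small helper. This is only an illustration of the idea, not the package's own code: the function name majorityMatchIndex and the slice-based input are assumptions made for the example. A real leader would also check that the entry at the returned index belongs to its current term before treating it as committed.

```go
package main

import (
	"fmt"
	"sort"
)

// majorityMatchIndex returns the highest log index that is stored on a
// majority of voting nodes. matchIndex holds, for every voter (including the
// leader itself), the highest index known to be replicated on that node.
// The name and signature are hypothetical.
func majorityMatchIndex(matchIndex []uint64) uint64 {
	if len(matchIndex) == 0 {
		return 0
	}
	sorted := append([]uint64(nil), matchIndex...)
	// Sort descending so that sorted[k] is stored on at least k+1 nodes.
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] > sorted[j] })
	quorum := len(sorted)/2 + 1
	return sorted[quorum-1]
}

func main() {
	// Five voters: index 5 is the highest index present on three of them.
	fmt.Println(majorityMatchIndex([]uint64{9, 7, 5, 3, 2})) // 5
}
```

Sorting a copy keeps the caller's slice untouched, which matters if the leader keeps per-follower progress in that slice.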

Installation and Usage

First, make sure you have Go 1.20 or a higher version installed on your system. You can download and install Go from the official Go website.

Then, install the raft package by running

go get -u github.com/dgate-io/raft

Once you have the package installed, you may refer to the raft package reference page for basic usage. An example of how to use raft as well as a set of Jepsen tests can be found here.

Contributing

Other developers are encouraged to contribute to this project and pull requests are welcome. Please read these guidelines if you are interested in contributing.

Documentation

Overview

Package raft provides a simple, easy-to-understand, and reliable implementation of Raft.

Raft is a consensus protocol designed to manage replicated logs in a distributed system. Its purpose is to ensure fault-tolerant coordination and consistency among a group of nodes, making it suitable for building reliable systems. Potential use cases include distributed file systems, consistent key-value stores, and service discovery.

Constants

This section is empty.

Variables

var (
	// ErrNotLeader is returned when an operation or configuration change is
	// submitted to a node that is not a leader. Operations may only be submitted
	// to a node that is a leader.
	ErrNotLeader = errors.New("this node is not the leader")

	// ErrInvalidLease is returned when a lease-based read-only operation is
	// rejected due to the leader's lease having expired. The operation likely
	// needs to be submitted to a different node in the cluster.
	ErrInvalidLease = errors.New("this node does not have a valid lease")

	// ErrPendingConfiguration is returned when a membership change is requested and there
	// is a pending membership change that has not yet been committed. The membership change
	// request may be submitted once the pending membership change is committed.
	ErrPendingConfiguration = errors.New("only one membership change may be committed at a time")

	// ErrNoCommitThisTerm is returned when a membership change is requested but a log entry has yet
	// to be committed in the current term. The membership change may be submitted once a log entry
	// has been committed this term.
	ErrNoCommitThisTerm = errors.New("a log entry has not been committed in this term")
)
var ErrTimeout = errors.New(
	"a timeout occurred while waiting for the result - try submitting the operation again",
)

ErrTimeout is returned when a future timed out waiting for a result.

Functions

This section is empty.

Types

type AppendEntriesRequest

type AppendEntriesRequest struct {
	// The leader's ID. Allows followers to redirect clients.
	LeaderID string

	// The leader's Term.
	Term uint64

	// The leader's commit index.
	LeaderCommit uint64

	// The index of the log entry immediately preceding the new ones.
	PrevLogIndex uint64

	// The term of the log entry immediately preceding the new ones.
	PrevLogTerm uint64

	// Contains the log Entries to store (empty for heartbeat).
	Entries []*LogEntry
}

AppendEntriesRequest is a request invoked by the leader to replicate log entries and also serves as a heartbeat.

type AppendEntriesResponse

type AppendEntriesResponse struct {
	// The term of the server that received the request.
	Term uint64

	// Indicates whether the request to append entries was successful.
	Success bool

	// The conflicting Index if there is one.
	Index uint64
}

AppendEntriesResponse is a response to a request to replicate log entries.

type Configuration

type Configuration struct {
	// All members of the cluster. Maps node ID to address.
	Members map[string]string

	// Maps node ID to a boolean that indicates whether the node
	// is a voting member or not. Voting members are those that
	// have their vote counted in elections and their match index
	// considered when the leader is advancing the commit index.
	// Non-voting members merely receive log entries. They are
	// not considered for election or commitment purposes.
	IsVoter map[string]bool

	// The log index of the configuration.
	Index uint64
}

Configuration represents a cluster of nodes.

func NewConfiguration

func NewConfiguration(index uint64, members map[string]string) *Configuration

NewConfiguration creates a new configuration with the provided members and index. By default, all members in the returned configuration will have voter status.

func (*Configuration) Clone

func (c *Configuration) Clone() Configuration

Clone creates a deep-copy of the configuration.
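To make the deep-copy guarantee concrete, here is a minimal sketch using a local stand-in type with the same three fields as Configuration. The lowercase names configuration and clone are hypothetical and exist only for this example; the package's own Clone may be implemented differently.

```go
package main

import "fmt"

// configuration is a local stand-in that mirrors the documented fields.
type configuration struct {
	Members map[string]string
	IsVoter map[string]bool
	Index   uint64
}

// clone copies both maps so the copy shares no state with the original.
func (c *configuration) clone() configuration {
	out := configuration{
		Members: make(map[string]string, len(c.Members)),
		IsVoter: make(map[string]bool, len(c.IsVoter)),
		Index:   c.Index,
	}
	for id, addr := range c.Members {
		out.Members[id] = addr
	}
	for id, voter := range c.IsVoter {
		out.IsVoter[id] = voter
	}
	return out
}

func main() {
	orig := configuration{
		Members: map[string]string{"a": "127.0.0.1:8080"},
		IsVoter: map[string]bool{"a": true},
		Index:   1,
	}
	cp := orig.clone()
	cp.Members["b"] = "127.0.0.1:8081" // does not touch orig
	fmt.Println(len(orig.Members), len(cp.Members)) // 1 2
}
```

A plain struct assignment would copy only the map headers, so both values would still point at the same underlying maps.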

func (*Configuration) String

func (c *Configuration) String() string

String returns a string representation of the configuration.

type Future

type Future[T Response] interface {
	// Await retrieves the result of the future.
	Await() Result[T]
}

Future represents an operation that will occur at a later point in time.
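One common way to build such a future in Go is a buffered channel plus a timeout. The sketch below is an assumption about how a future of this shape could work, not the package's implementation; the names future, result, newFuture, resolve, and errTimeout are invented for the example. It follows the documented convention of checking Error before reading Success.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimeout stands in for the package's ErrTimeout.
var errTimeout = errors.New("timed out waiting for the result")

// result pairs a value with an error.
type result[T any] struct {
	value T
	err   error
}

func (r result[T]) Success() T   { return r.value }
func (r result[T]) Error() error { return r.err }

// future is resolved at most once through its buffered channel.
type future[T any] struct {
	ch      chan result[T]
	timeout time.Duration
}

func newFuture[T any](timeout time.Duration) *future[T] {
	return &future[T]{ch: make(chan result[T], 1), timeout: timeout}
}

// resolve publishes the outcome; calls after the first are dropped.
func (f *future[T]) resolve(value T, err error) {
	select {
	case f.ch <- result[T]{value: value, err: err}:
	default:
	}
}

// Await blocks until the future is resolved or the timeout elapses.
// In this sketch the result is consumed, so Await should be called once.
func (f *future[T]) Await() result[T] {
	select {
	case r := <-f.ch:
		return r
	case <-time.After(f.timeout):
		return result[T]{err: errTimeout}
	}
}

func main() {
	f := newFuture[string](time.Second)
	go f.resolve("applied", nil)

	r := f.Await()
	if err := r.Error(); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(r.Success())
}
```

The channel has capacity one so that the resolving goroutine never blocks, even if nobody ever calls Await.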

type InstallSnapshotRequest

type InstallSnapshotRequest struct {
	// The leader's ID.
	LeaderID string

	// The leader's Term.
	Term uint64

	// The snapshot replaces all entries up to and including
	// this index.
	LastIncludedIndex uint64

	// The term associated with the last included index.
	LastIncludedTerm uint64

	// The last configuration included in the snapshot.
	Configuration []byte

	// A chunk of the snapshot.
	Bytes []byte

	// The offset in the snapshot file.
	Offset int64

	// Indicates whether this is the last chunk of the snapshot.
	Done bool
}

InstallSnapshotRequest is invoked by the leader to send a snapshot to a follower.
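The Bytes, Offset, and Done fields suggest that a snapshot is sent as a sequence of chunks. The following sketch shows one way such a sequence could be produced from an in-memory snapshot. The type snapshotChunk, the function splitSnapshot, and the chunk size are assumptions for illustration; the package's actual chunking strategy is not documented here.

```go
package main

import "fmt"

// snapshotChunk mirrors the chunk-related fields of InstallSnapshotRequest.
type snapshotChunk struct {
	Bytes  []byte
	Offset int64
	Done   bool
}

// splitSnapshot cuts data into chunks of at most chunkSize bytes. Each chunk
// records its offset in the snapshot, and only the final chunk has Done set.
// An empty snapshot or a non-positive chunk size yields no chunks.
func splitSnapshot(data []byte, chunkSize int) []snapshotChunk {
	if chunkSize <= 0 {
		return nil
	}
	var chunks []snapshotChunk
	for offset := 0; offset < len(data); offset += chunkSize {
		end := offset + chunkSize
		if end > len(data) {
			end = len(data)
		}
		chunks = append(chunks, snapshotChunk{
			Bytes:  data[offset:end],
			Offset: int64(offset),
			Done:   end == len(data),
		})
	}
	return chunks
}

func main() {
	for _, c := range splitSnapshot([]byte("abcdefghij"), 4) {
		fmt.Printf("offset=%d bytes=%q done=%t\n", c.Offset, c.Bytes, c.Done)
	}
}
```

A receiver can compare the BytesWritten value it reports against the length of each chunk to detect a short write.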

type InstallSnapshotResponse

type InstallSnapshotResponse struct {
	// The term of the server that received the request.
	Term uint64

	// The number of bytes written by the receiver. If the
	// request is successful, this should be the number of bytes
	// in the request.
	BytesWritten int64
}

InstallSnapshotResponse is a response to a snapshot installation.

type Log

type Log interface {
	// Open opens the log and prepares it for reads and writes.
	Open() error

	// Replay replays the content of the log on disk into memory.
	Replay() error

	// Close closes the log.
	Close() error

	// GetEntry returns the log entry located at the specified index.
	GetEntry(index uint64) (*LogEntry, error)

	// AppendEntry appends a log entry to the log.
	AppendEntry(entry *LogEntry) error

	// AppendEntries appends multiple log entries to the log.
	AppendEntries(entries []*LogEntry) error

	// Truncate deletes all log entries with index greater than
	// or equal to the provided index.
	Truncate(index uint64) error

	// DiscardEntries deletes all in-memory and persistent data in the
	// log. The provided term and index indicate at what term and index
	// the now empty log will start at. Primarily intended to be used
	// for snapshotting.
	DiscardEntries(index uint64, term uint64) error

	// Compact deletes all log entries with index less than
	// or equal to the provided index.
	Compact(index uint64) error

	// Contains checks if the log contains an entry at the specified index.
	Contains(index uint64) bool

	// LastIndex returns the largest index that exists in the log and zero
	// if the log is empty.
	LastIndex() uint64

	// LastTerm returns the largest term in the log and zero if the log
	// is empty.
	LastTerm() uint64

	// NextIndex returns the next index to append to the log.
	NextIndex() uint64

	// Size returns the number of entries in the log.
	Size() int
}

Log represents the internal component of Raft that is responsible for persistently storing and retrieving log entries.
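Truncate and Compact remove entries from opposite ends of the log, which is easy to mix up. The in-memory sketch below illustrates the documented boundaries (Truncate removes indexes greater than or equal to the argument, Compact removes indexes less than or equal to it). The memLog type and its methods are hypothetical and cover only a small part of the Log interface.

```go
package main

import "fmt"

// entry is a local stand-in holding only the fields this sketch needs.
type entry struct {
	Index uint64
	Term  uint64
}

// memLog keeps entries in index order; it is an in-memory illustration only.
type memLog struct {
	entries []entry
}

// newMemLog builds a log with indexes 1..n, all in term 1.
func newMemLog(n uint64) *memLog {
	l := &memLog{}
	for i := uint64(1); i <= n; i++ {
		l.entries = append(l.entries, entry{Index: i, Term: 1})
	}
	return l
}

// truncate drops every entry whose index is greater than or equal to index.
func (l *memLog) truncate(index uint64) {
	kept := l.entries[:0]
	for _, e := range l.entries {
		if e.Index < index {
			kept = append(kept, e)
		}
	}
	l.entries = kept
}

// compact drops every entry whose index is less than or equal to index.
func (l *memLog) compact(index uint64) {
	kept := l.entries[:0]
	for _, e := range l.entries {
		if e.Index > index {
			kept = append(kept, e)
		}
	}
	l.entries = kept
}

// lastIndex returns the largest index in the log, or zero if it is empty.
func (l *memLog) lastIndex() uint64 {
	if len(l.entries) == 0 {
		return 0
	}
	return l.entries[len(l.entries)-1].Index
}

func main() {
	l := newMemLog(5)
	l.truncate(4) // keeps 1, 2, 3
	l.compact(1)  // keeps 2, 3
	fmt.Println(len(l.entries), l.lastIndex()) // 2 3
}
```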

func NewLog

func NewLog(path string) (Log, error)

NewLog creates a new Log instance.

The file containing the log will be created at path/log/log.bin. Any directories on the path that do not exist will be created.

type LogEntry

type LogEntry struct {
	// The index of the log entry.
	Index uint64

	// The term of the log entry.
	Term uint64

	// The offset of the log entry.
	Offset int64

	// The data of the log entry.
	Data []byte

	// The type of the log entry.
	EntryType LogEntryType
}

LogEntry is a log entry in the log.

func NewLogEntry

func NewLogEntry(index uint64, term uint64, data []byte, entryType LogEntryType) *LogEntry

NewLogEntry creates a new instance of a LogEntry with the provided index, term, data, and type.

func (*LogEntry) IsConflict

func (e *LogEntry) IsConflict(other *LogEntry) bool

IsConflict checks whether the current log entry conflicts with another log entry. Two log entries are considered conflicting if they have the same index but different terms.
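The conflict rule can be written in a single expression. The sketch below restates it on a local logEntry type and adds a hypothetical helper, firstConflict, showing how a follower might locate the first incoming entry that disagrees with what it already stores. Neither name comes from the package.

```go
package main

import "fmt"

// logEntry is a local stand-in with only the fields the rule depends on.
type logEntry struct {
	Index uint64
	Term  uint64
}

// isConflict reports whether two entries share an index but differ in term.
func isConflict(a, b *logEntry) bool {
	return a.Index == b.Index && a.Term != b.Term
}

// firstConflict returns the index of the first incoming entry that conflicts
// with an entry already stored at the same index.
func firstConflict(stored map[uint64]logEntry, incoming []logEntry) (uint64, bool) {
	for i := range incoming {
		have, ok := stored[incoming[i].Index]
		if ok && isConflict(&have, &incoming[i]) {
			return incoming[i].Index, true
		}
	}
	return 0, false
}

func main() {
	stored := map[uint64]logEntry{
		1: {Index: 1, Term: 1},
		2: {Index: 2, Term: 1},
		3: {Index: 3, Term: 2},
	}
	incoming := []logEntry{{Index: 2, Term: 1}, {Index: 3, Term: 3}}
	fmt.Println(firstConflict(stored, incoming)) // 3 true
}
```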

type LogEntryType

type LogEntryType uint32

LogEntryType is the type of the log entry.

const (
	// NoOpEntry log entries are those that do not contain an operation.
	NoOpEntry LogEntryType = iota

	// OperationEntry log entries are those that do contain an operation that
	// will be applied to the state machine.
	OperationEntry

	// ConfigurationEntry are log entries which contain a cluster configuration.
	ConfigurationEntry
)

type Operation

type Operation struct {
	// The operation as bytes. The provided state machine should be capable
	// of decoding these bytes.
	Bytes []byte

	// The type of the operation.
	OperationType OperationType

	// The log entry index associated with the operation.
	// Valid only if this is a replicated operation and the operation was successful.
	LogIndex uint64

	// The log entry term associated with the operation.
	// Valid only if this is a replicated operation and the operation was successful.
	LogTerm uint64
	// contains filtered or unexported fields
}

Operation is an operation that will be applied to the state machine. An operation must be deterministic.

type OperationResponse

type OperationResponse struct {
	// The operation applied to the state machine.
	Operation Operation

	// The response returned by the state machine after applying the operation.
	ApplicationResponse interface{}
}

OperationResponse is the response that is generated after applying an operation to the state machine.

type OperationType

type OperationType uint32

OperationType is the type of the operation that is being submitted to raft.

const (
	// Replicated indicates that the provided operation will be written to the
	// log and guarantees linearizable semantics.
	Replicated OperationType = iota

	// LinearizableReadOnly indicates that the provided operation will not be written
	// to the log and requires that the receiving server verify its leadership through
	// a round of heartbeats to its peers. Guarantees linearizable semantics.
	LinearizableReadOnly

	// LeaseBasedReadOnly indicates that the provided operation will not be written
	// to the log and requires that the server verify its leadership via its lease.
	// This operation type does not guarantee linearizable semantics.
	LeaseBasedReadOnly

	// Broadcasted indicates that the provided operation will be written to the log and
	// broadcasted to all followers and guarantees linearizable semantics.
	Broadcasted
)

func (OperationType) String

func (o OperationType) String() string

String converts an OperationType to a string.

type Option

type Option func(options *options) error

Option is a function that updates the options associated with Raft.
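Option follows the functional options pattern. The sketch below shows the general shape of that pattern with a local options struct. The field names, default values, and the positive-duration check are all assumptions made for illustration and are not taken from the package.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// options is a local stand-in for the package's unexported options type.
type options struct {
	electionTimeout   time.Duration
	heartbeatInterval time.Duration
}

// option updates options and may reject an invalid value.
type option func(*options) error

func withElectionTimeout(d time.Duration) option {
	return func(o *options) error {
		if d <= 0 { // hypothetical validation
			return errors.New("election timeout must be positive")
		}
		o.electionTimeout = d
		return nil
	}
}

func withHeartbeatInterval(d time.Duration) option {
	return func(o *options) error {
		if d <= 0 { // hypothetical validation
			return errors.New("heartbeat interval must be positive")
		}
		o.heartbeatInterval = d
		return nil
	}
}

// newOptions starts from arbitrary defaults and applies each option in order.
func newOptions(opts ...option) (options, error) {
	o := options{
		electionTimeout:   500 * time.Millisecond,
		heartbeatInterval: 50 * time.Millisecond,
	}
	for _, apply := range opts {
		if err := apply(&o); err != nil {
			return options{}, err
		}
	}
	return o, nil
}

func main() {
	o, err := newOptions(withElectionTimeout(2 * time.Second))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(o.electionTimeout, o.heartbeatInterval)
}
```

Because each option returns an error, a constructor built this way can fail early on a bad value instead of starting a node with it.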

func WithElectionTimeout

func WithElectionTimeout(time time.Duration) Option

WithElectionTimeout sets the election timeout for raft.

func WithHeartbeatInterval

func WithHeartbeatInterval(time time.Duration) Option

WithHeartbeatInterval sets the heartbeat interval for raft.

func WithLeaseDuration

func WithLeaseDuration(leaseDuration time.Duration) Option

WithLeaseDuration sets the duration for which a lease remains valid upon renewal. The lease should generally remain valid for a much smaller amount of time than the election timeout.

func WithLog

func WithLog(log Log) Option

WithLog sets the log that will be used by raft. This is useful if you wish to use your own implementation of a log.

func WithLogLevel

func WithLogLevel(level logging.Level) Option

WithLogLevel sets the log level used by raft.

func WithSnapshotStorage

func WithSnapshotStorage(snapshotStorage SnapshotStorage) Option

WithSnapshotStorage sets the snapshot storage that will be used by raft. This is useful if you wish to use your own implementation of a snapshot storage.

func WithStateStorage

func WithStateStorage(stateStorage StateStorage) Option

WithStateStorage sets the state storage that will be used by raft. This is useful if you wish to use your own implementation of a state storage.

func WithTransport

func WithTransport(transport Transport) Option

WithTransport sets the network transport that will be used by raft. This is useful if you wish to use your own implementation of a transport.

type Raft

type Raft struct {
	// contains filtered or unexported fields
}

Raft implements the raft consensus protocol.

func NewRaft

func NewRaft(
	id string,
	address string,
	fsm StateMachine,
	dataPath string,
	opts ...Option,
) (*Raft, error)

NewRaft creates a new instance of Raft with the provided ID and address. The dataPath is the top-level directory where all state for this node will be persisted.

func (*Raft) AddServer

func (r *Raft) AddServer(
	id string,
	address string,
	isVoter bool,
	timeout time.Duration,
) Future[Configuration]

AddServer will add a node with the provided ID and address to the cluster and return a future for the resulting configuration. It is generally recommended to add a new node as a non-voting member before adding it as a voting member so that it can become synced with the rest of the cluster.

The provided ID must be unique from the existing nodes in the cluster. If the configuration change was not successful, the returned future will be populated with an error. It may be necessary to resubmit the configuration change to this node or a different node. It is safe to call this function as many times as necessary.

It is the caller's responsibility to implement retry logic.

func (*Raft) AppendEntries

func (r *Raft) AppendEntries(request *AppendEntriesRequest, response *AppendEntriesResponse) error

AppendEntries handles log replication requests from the leader. It takes a request to append entries and fills the response with the result of the append operation. This will return an error if the node is shut down.

func (*Raft) Bootstrap

func (r *Raft) Bootstrap(configuration map[string]string) error

Bootstrap initializes this node with a cluster configuration. The configuration must contain the ID and address of all nodes in the cluster including this one.

This function should only be called when starting a cluster for the first time and there is no existing configuration. This should only be called on a single, voting member of the cluster.

func (*Raft) Configuration

func (r *Raft) Configuration() Configuration

Configuration returns the most current configuration of this node. This configuration may or may not have been committed yet. If there is no configuration, an empty configuration is returned.

func (*Raft) InstallSnapshot

func (r *Raft) InstallSnapshot(
	request *InstallSnapshotRequest,
	response *InstallSnapshotResponse,
) error

InstallSnapshot handles snapshot installation requests from the leader. It takes a request to install a snapshot and fills the response with the result of the installation. This will return an error if the node is shut down.

func (*Raft) LeaderID added in v0.2.1

func (r *Raft) LeaderID() string

LeaderID returns the ID of the leader of the cluster.

func (*Raft) RemoveServer

func (r *Raft) RemoveServer(id string, timeout time.Duration) Future[Configuration]

RemoveServer will remove the node with the provided ID from the cluster and return a future for the resulting configuration. Once removed, the node will remain online as a non-voter and may safely be shut down.

If the configuration change was not successful, the returned future will be populated with an error. It may be necessary to resubmit the configuration change to this node or a different node. It is safe to call this function as many times as necessary.

It is the caller's responsibility to implement retry logic.

func (*Raft) RequestVote

func (r *Raft) RequestVote(request *RequestVoteRequest, response *RequestVoteResponse) error

RequestVote handles vote requests from other nodes during elections. It takes a vote request and fills the response with the result of the vote. This will return an error if the node is shut down.

func (*Raft) Restart

func (r *Raft) Restart() error

Restart starts a node that has been started and stopped before.

The difference between Restart and Start is that Restart restores the state of the node from non-volatile storage, whereas Start does not. Restart should not be called if the node is being started for the first time.

func (*Raft) SetStateCallback added in v0.2.3

func (r *Raft) SetStateCallback(callback func(leaderId string, state State))

func (*Raft) Start

func (r *Raft) Start() error

Start starts this node if it has not already been started.

If the node does not have an existing configuration, a configuration will be created that only includes this node as a non-voter. If the node has been started before, Restart should be called instead.

func (*Raft) Status

func (r *Raft) Status() Status

Status returns the status of this node. The status includes the ID, address, term, commit index, last applied index, and state of this node.

func (*Raft) Stop

func (r *Raft) Stop()

Stop stops this node if it is not already stopped.

func (*Raft) SubmitOperation

func (r *Raft) SubmitOperation(
	operation []byte,
	operationType OperationType,
	timeout time.Duration,
) Future[OperationResponse]

SubmitOperation accepts an operation for application to the state machine and returns a future for the response to the operation. Once the operation has been applied to the state machine, the returned future will be populated with the response.

Even if the operation is submitted successfully, it is not guaranteed that it will be applied to the state machine if there are failures. If the operation could not be applied to the state machine or the operation times out, the future will be populated with an error.

It may be necessary to resubmit the operation to this node or a different node if it failed. It is the caller's responsibility to implement retry logic and to handle duplicate operations.
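Since retrying is left to the caller, a client typically wraps submission in a loop that moves on to another node when it sees a retryable error. The sketch below shows one possible shape for such a loop. The submitFunc type, submitWithRetry, and the local error values are assumptions for this example; in real code the callback would call SubmitOperation on the node at the given address, await the future, and return ErrNotLeader or ErrTimeout as appropriate.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's ErrNotLeader and ErrTimeout.
var (
	errNotLeader = errors.New("this node is not the leader")
	errTimeout   = errors.New("timed out waiting for the result")
)

// submitFunc is a hypothetical wrapper that submits one operation to the
// node at address and waits for the outcome.
type submitFunc func(address string) error

// submitWithRetry cycles through addresses until the operation succeeds,
// a non-retryable error occurs, or maxAttempts is reached. It returns the
// address that accepted the operation.
func submitWithRetry(addresses []string, maxAttempts int, submit submitFunc) (string, error) {
	if len(addresses) == 0 || maxAttempts <= 0 {
		return "", errors.New("need at least one address and one attempt")
	}
	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		addr := addresses[attempt%len(addresses)]
		err := submit(addr)
		if err == nil {
			return addr, nil
		}
		// Only leadership and timeout errors are worth retrying here.
		if !errors.Is(err, errNotLeader) && !errors.Is(err, errTimeout) {
			return "", err
		}
		lastErr = err
	}
	return "", fmt.Errorf("gave up after %d attempts: %w", maxAttempts, lastErr)
}

func main() {
	addr, err := submitWithRetry([]string{"a:8080", "b:8080"}, 4, func(address string) error {
		if address == "b:8080" { // pretend b is the leader
			return nil
		}
		return errNotLeader
	})
	fmt.Println(addr, err)
}
```

A timeout does not prove the operation was dropped, so a retry may apply it twice. One common remedy, not provided by this sketch, is to embed a client-chosen request ID in the operation bytes and have the state machine ignore IDs it has already applied.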

func (*Raft) WaitForStableState added in v0.2.3

func (r *Raft) WaitForStableState()

WaitForStableState blocks until this node is in a stable state.

type RequestVoteRequest

type RequestVoteRequest struct {
	// The ID of the candidate requesting the vote.
	CandidateID string

	// The candidate's term.
	Term uint64

	// The index of the candidate's last log entry.
	LastLogIndex uint64

	// The term of the candidate's last log entry.
	LastLogTerm uint64

	// Indicates whether this request is for a prevote.
	Prevote bool
}

RequestVoteRequest is a request invoked by candidates to gather votes.

type RequestVoteResponse

type RequestVoteResponse struct {
	// The term of the server that received the request.
	Term uint64

	// Indicates whether the vote request was successful.
	VoteGranted bool
}

RequestVoteResponse is a response to a request for a vote.

type Response

type Response interface {
	OperationResponse | Configuration
}

Response is the concrete result produced by a node after processing a client submitted operation.

type Result

type Result[T Response] interface {
	// Success returns the response associated with an operation.
	// Error should always be called before Success - the result
	// returned by Success is only valid if Error returns nil.
	Success() T

	// Error returns any error that occurred during the
	// operation that was to produce the response.
	Error() error
}

Result represents an abstract result produced by a node after processing a client submitted operation.

type SnapshotFile

type SnapshotFile interface {
	io.ReadWriteSeeker
	io.Closer

	// Metadata returns the metadata associated with the snapshot file.
	Metadata() SnapshotMetadata

	// Discard deletes the snapshot and its metadata if it is incomplete.
	Discard() error
}

SnapshotFile represents a component for reading and writing snapshots.

type SnapshotMetadata

type SnapshotMetadata struct {
	// The last log index included in the snapshot.
	LastIncludedIndex uint64 `json:"last_included_index"`

	// The last log term included in the snapshot.
	LastIncludedTerm uint64 `json:"last_included_term"`

	// The most up-to-date configuration in the snapshot.
	Configuration []byte `json:"configuration"`
}

SnapshotMetadata contains all metadata associated with a snapshot.

type SnapshotStorage

type SnapshotStorage interface {
	// NewSnapshotFile creates a new snapshot file. It is the caller's responsibility to
	// close the file or discard it when they are done with it.
	NewSnapshotFile(
		lastIncludedIndex uint64,
		lastIncludedTerm uint64,
		configuration []byte,
	) (SnapshotFile, error)

	// SnapshotFile returns the most recent snapshot file. It is the
	// caller's responsibility to close the file when they are done with it.
	SnapshotFile() (SnapshotFile, error)
}

SnapshotStorage represents the component of Raft that manages snapshots created by the state machine.

func NewSnapshotStorage

func NewSnapshotStorage(path string) (SnapshotStorage, error)

NewSnapshotStorage creates a new SnapshotStorage instance.

Snapshots will be stored at path/snapshots. Any directories on the path that do not exist will be created. Each snapshot that is created will have its own directory that is named using a timestamp taken at the time of its creation. Each of these directories will contain two separate files - one for the content of the snapshot and one for its metadata.

type State

type State uint32

State represents the current state of a node. A node may be the leader, a follower, a pre-candidate, a candidate, or shut down.

const (
	// Leader is a state indicating that the node is responsible for replicating and
	// committing log entries. The leader will accept operations and membership change
	// requests.
	Leader State = iota

	// Follower is a state indicating that a node is responsible for accepting log entries replicated
	// by the leader. A node in the follower state will not accept operations or membership change
	// requests.
	Follower

	// PreCandidate is a state indicating this node is holding a prevote. If this node is able to
	// receive successful RequestVote RPC responses from the majority of the cluster, it will enter the
	// candidate state.
	PreCandidate

	// Candidate is a state indicating that this node is currently holding an election. A node will
	// remain in this state until it is either elected the leader or another node is elected leader
	// thereby causing this node to step down to the follower state.
	Candidate

	// Shutdown is a state indicating that the node is currently offline.
	Shutdown
)

func (State) String

func (s State) String() string

String converts a State into a string.

type StateMachine

type StateMachine interface {
	// Apply applies the given operation to the state machine.
	Apply(operation *Operation) interface{}

	// Snapshot returns a snapshot of the current state of the state machine
	// using the provided writer.
	Snapshot(snapshotWriter io.Writer) error

	// Restore recovers the state of the state machine given a reader for a previously
	// created snapshot.
	Restore(snapshotReader io.Reader) error

	// NeedSnapshot returns true if a snapshot should be taken of the state machine and false
	// otherwise. The provided log size is the number of entries currently in the log.
	NeedSnapshot(logSize int) bool
}

StateMachine is an interface representing a replicated state machine. The implementation must be concurrent safe.
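As a rough illustration of what an implementation might look like, here is a toy key-value store with the same four methods. It uses a local operation type instead of the package's Operation, so it does not satisfy the real interface as written. The command format ("set key value", "get key"), the JSON snapshot encoding, and the snapshot threshold of 1000 entries are all assumptions chosen for the example.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"strings"
	"sync"
)

// operation is a local stand-in for the documented Operation type.
type operation struct {
	Bytes []byte
}

// kvStore is a toy state machine guarded by a mutex for concurrent use.
type kvStore struct {
	mu   sync.Mutex
	data map[string]string
}

func newKVStore() *kvStore { return &kvStore{data: map[string]string{}} }

// Apply executes "set key value" or "get key" and returns the outcome.
func (s *kvStore) Apply(op *operation) interface{} {
	fields := strings.Fields(string(op.Bytes))
	s.mu.Lock()
	defer s.mu.Unlock()
	switch {
	case len(fields) == 3 && fields[0] == "set":
		s.data[fields[1]] = fields[2]
		return "ok"
	case len(fields) == 2 && fields[0] == "get":
		return s.data[fields[1]]
	default:
		return fmt.Errorf("unknown command %q", op.Bytes)
	}
}

// Snapshot writes the whole map as JSON.
func (s *kvStore) Snapshot(w io.Writer) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return json.NewEncoder(w).Encode(s.data)
}

// Restore replaces the current contents with a previously written snapshot.
func (s *kvStore) Restore(r io.Reader) error {
	data := map[string]string{}
	if err := json.NewDecoder(r).Decode(&data); err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data = data
	return nil
}

// NeedSnapshot asks for a snapshot once the log reaches an arbitrary size.
func (s *kvStore) NeedSnapshot(logSize int) bool { return logSize >= 1000 }

// copyViaSnapshot snapshots src into memory and restores it into a new store.
func copyViaSnapshot(src *kvStore) (*kvStore, error) {
	var buf bytes.Buffer
	if err := src.Snapshot(&buf); err != nil {
		return nil, err
	}
	dst := newKVStore()
	return dst, dst.Restore(&buf)
}

func main() {
	a := newKVStore()
	a.Apply(&operation{Bytes: []byte("set color blue")})
	b, err := copyViaSnapshot(a)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(b.Apply(&operation{Bytes: []byte("get color")})) // blue
}
```

Apply depends only on the operation bytes and the current map, which keeps it deterministic as Operation requires.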

type StateStorage

type StateStorage interface {
	// SetState persists the provided term and vote.
	SetState(term uint64, vote string) error

	// State returns the most recently persisted term and vote in the storage.
	// If there is no pre-existing state, zero and an empty string will be returned.
	State() (uint64, string, error)
}

StateStorage represents the component of Raft responsible for persistently storing term and vote.
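For readers who want to supply their own storage through WithStateStorage, the following sketch shows a small file-backed type with the same two methods. The JSON encoding, the file name, and the write-then-rename approach are assumptions for illustration; the package's own storage format is not described here. A production implementation would also need to sync the file to disk before renaming it.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"path/filepath"
)

// persistentState is the on-disk form used by this sketch only.
type persistentState struct {
	Term uint64 `json:"term"`
	Vote string `json:"vote"`
}

// fileStateStorage stores the term and vote in a single file.
type fileStateStorage struct {
	path string
}

// SetState writes to a temporary file and renames it over the old one so
// that a reader never observes a partially written state.
func (s *fileStateStorage) SetState(term uint64, vote string) error {
	data, err := json.Marshal(persistentState{Term: term, Vote: vote})
	if err != nil {
		return err
	}
	tmp := s.path + ".tmp"
	if err := os.WriteFile(tmp, data, 0o600); err != nil {
		return err
	}
	return os.Rename(tmp, s.path)
}

// State returns zero and an empty string when nothing has been persisted.
func (s *fileStateStorage) State() (uint64, string, error) {
	data, err := os.ReadFile(s.path)
	if errors.Is(err, os.ErrNotExist) {
		return 0, "", nil
	}
	if err != nil {
		return 0, "", err
	}
	var st persistentState
	if err := json.Unmarshal(data, &st); err != nil {
		return 0, "", err
	}
	return st.Term, st.Vote, nil
}

// demoRoundTrip persists a term and vote in a temporary directory and reads
// them back.
func demoRoundTrip() (uint64, string, error) {
	dir, err := os.MkdirTemp("", "state-sketch")
	if err != nil {
		return 0, "", err
	}
	defer os.RemoveAll(dir)

	s := &fileStateStorage{path: filepath.Join(dir, "state.json")}
	if err := s.SetState(3, "node-a"); err != nil {
		return 0, "", err
	}
	return s.State()
}

func main() {
	fmt.Println(demoRoundTrip())
}
```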

func NewStateStorage

func NewStateStorage(path string) (StateStorage, error)

NewStateStorage creates a new instance of a StateStorage.

The file containing the state will be located at path/state/state.bin. Any directories on path that do not exist will be created.

type Status

type Status struct {
	// The unique identifier of this node.
	ID string

	// The address of the node.
	Address string

	// The current term.
	Term uint64

	// The current commit index.
	CommitIndex uint64

	// The index of the last log entry applied to the state machine.
	LastApplied uint64

	// The current state of the node, for example leader, follower, or shutdown.
	State State
}

Status is the status of a node.

type Transport

type Transport interface {
	// Run will start serving incoming RPCs received at the local network address.
	Run() error

	// Shutdown will stop the serving of incoming RPCs.
	Shutdown() error

	// SendAppendEntries sends an append entries request to the provided address.
	SendAppendEntries(address string, request AppendEntriesRequest) (AppendEntriesResponse, error)

	// SendRequestVote sends a vote request to the peer at the provided address.
	SendRequestVote(address string, request RequestVoteRequest) (RequestVoteResponse, error)

	// SendInstallSnapshot sends an install snapshot request to the provided address.
	SendInstallSnapshot(
		address string,
		request InstallSnapshotRequest,
	) (InstallSnapshotResponse, error)

	// RegisterAppendEntriesHandler registers the function that will be called when an
	// AppendEntries RPC is received.
	RegisterAppendEntriesHandler(handler func(*AppendEntriesRequest, *AppendEntriesResponse) error)

	// RegisterRequestVoteHandler registers the function that will be called when a
	// RequestVote RPC is received.
	RegisterRequestVoteHandler(handler func(*RequestVoteRequest, *RequestVoteResponse) error)

	// RegisterInstallSnapshotHandler registers the function that will be called when an
	// InstallSnapshot RPC is received.
	RegisterInstallSnapshotHandler(
		handler func(*InstallSnapshotRequest, *InstallSnapshotResponse) error,
	)

	// EncodeConfiguration accepts a configuration and encodes it such that it can be
	// decoded by DecodeConfiguration.
	EncodeConfiguration(configuration *Configuration) ([]byte, error)

	// DecodeConfiguration accepts a byte representation of a configuration and decodes
	// it into a configuration.
	DecodeConfiguration(data []byte) (Configuration, error)

	// Address returns the local network address.
	Address() string
}

Transport represents the underlying transport mechanism used by a node in a cluster to send and receive RPCs. It is the implementer's responsibility to provide functions that invoke the registered handlers.

func NewTransport

func NewTransport(address string) (Transport, error)

NewTransport creates a new Transport instance.

