raft

package module
v0.0.0-...-55a3443 Latest
Published: Mar 22, 2019 License: MIT Imports: 21 Imported by: 0

README

Raft

This library provides a simple, easy-to-understand, and reliable implementation of Raft using Go. Raft is a consensus protocol designed to manage replicated logs in a distributed system. Its purpose is to ensure fault-tolerant coordination and consistency among a group of nodes, making it suitable for building reliable systems. Potential use cases include distributed file systems, consistent key-value stores, and service discovery.

Protocol Overview

Raft is based on a leader-follower model, where one node is elected as the leader and coordinates the replication process. Time is divided into terms, and the leader is elected for each term through a leader election process. The leader receives client requests, which are then replicated to other nodes called followers. The followers maintain a log of all state changes, and the leader's responsibility is to ensure that all followers have consistent logs by sending them entries to append. Safety is guaranteed by requiring a majority of nodes to agree on the state changes, ensuring that no conflicting states are committed.
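The majority requirement described above can be sketched in a few lines. This is only an illustration and not the library's internal code: `quorum` and `commitIndex` are hypothetical helpers, and `matchIndex` is an assumed slice holding, for every server in the cluster (leader included), the highest log index known to be stored on that server.

```go
package main

import (
	"fmt"
	"sort"
)

// quorum returns the smallest number of servers that forms a majority
// of a cluster with n members.
func quorum(n int) int {
	return n/2 + 1
}

// commitIndex returns the highest log index that is stored on a majority
// of servers. matchIndex holds one value per server in the cluster.
func commitIndex(matchIndex []uint64) uint64 {
	if len(matchIndex) == 0 {
		return 0
	}
	// Copy before sorting so the caller's slice is left untouched.
	sorted := append([]uint64(nil), matchIndex...)
	// Sort in descending order so that sorted[k-1] is stored on at least k servers.
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] > sorted[j] })
	return sorted[quorum(len(sorted))-1]
}

func main() {
	fmt.Println(quorum(5))
	fmt.Println(commitIndex([]uint64{7, 5, 5, 3, 2}))
}
```

In the full protocol a leader additionally commits an entry by counting replicas only when the entry belongs to its current term; the sketch leaves that check out for brevity.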

Future Features

The following features are currently in development or are intended to be added in the future:

  • Batched log writes to improve disk I/O
  • Membership changes
  • Read-only operations

Other developers are encouraged to contribute to this project and pull requests are welcome.

Documentation

Overview

There are two ways that this library can be used. The first way is to use the Raft implementation to create a custom server. This may be useful if you wish to use a different communication protocol or the provided storage implementations are not sufficient for your use case. The second way is to use the provided Server implementation. This is covered below.

To set up a server, the first step is to define the state machine that is to be replicated. This state machine must implement the StateMachine interface, and it must be safe for concurrent use. Here is an example of a type that implements the StateMachine interface.

// Op represents an operation on the state machine.
type Op int

const (
    // Increment increments the counter by one.
    Increment Op = iota

    // Decrement decrements the counter by one.
    Decrement
)

// Result represents the result of applying an operation
// to the state machine.
type Result struct {
    // The value of the counter after applying the operation.
    Value int

    // Any errors encountered while applying the operation.
    Err error
}

// StateMachine represents a simple counter.
type StateMachine struct {
    // The current count.
    count int

    // The last index applied to the state machine. Used for snapshots.
    lastIndex uint64

    // The term associated with the last applied index. Used for snapshots.
    lastTerm uint64

    // Makes the state machine concurrent safe.
    mu sync.Mutex
}

func (sm *StateMachine) Apply(entry *raft.LogEntry) interface{} {
    sm.mu.Lock()
    defer sm.mu.Unlock()

    // Save the term and index of the last seen entry for snapshotting.
    sm.lastIndex = entry.Index
    sm.lastTerm = entry.Term

    // Decode the operation. Decode needs a pointer to the destination value.
    var op Op
    buf := bytes.NewBuffer(entry.Data)
    dec := gob.NewDecoder(buf)
    if err := dec.Decode(&op); err != nil {
        return Result{Err: err}
    }

    // Apply the operation.
    switch op {
    case Increment:
        sm.count++
    case Decrement:
        sm.count--
    }

    return Result{Value: sm.count, Err: nil}
}

func (sm *StateMachine) Snapshot() (raft.Snapshot, error) {
    sm.mu.Lock()
    defer sm.mu.Unlock()

    // Encode the state of the state machine.
    var buf bytes.Buffer
    enc := gob.NewEncoder(&buf)
    if err := enc.Encode(sm.count); err != nil {
        return raft.Snapshot{}, err
    }

    // Create the snapshot.
    snapshot := raft.Snapshot{
        LastIncludedIndex: sm.lastIndex,
        LastIncludedTerm:  sm.lastTerm,
        Data:              buf.Bytes(),
    }

    return snapshot, nil
}

func (sm *StateMachine) Restore(snapshot *raft.Snapshot) error {
    sm.mu.Lock()
    defer sm.mu.Unlock()

    // Decode the bytes of the snapshot.
    var count int
    buf := bytes.NewBuffer(snapshot.Data)
    dec := gob.NewDecoder(buf)
    if err := dec.Decode(&count); err != nil {
        return err
    }

    // Restore the state of the state machine.
    sm.count = count

    // Update the last seen index and term since the state has been
    // restored up to this point.
    sm.lastIndex = snapshot.LastIncludedIndex
    sm.lastTerm = snapshot.LastIncludedTerm

    return nil
}

func (sm *StateMachine) NeedSnapshot() bool {
    sm.mu.Lock()
    defer sm.mu.Unlock()

    // This probably is not a realistic condition for needing a snapshot, but
    // this state machine is only a counter.
    return sm.lastIndex%100 == 0
}
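The Apply method above expects entry.Data to hold a gob-encoded Op, so a client has to produce bytes in that format. The following is a minimal sketch of that round trip using only the standard library. `encodeOp` and `decodeOp` are hypothetical helpers, and `Op` is redeclared here only so the snippet compiles on its own.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Op mirrors the operation type from the example above.
type Op int

const (
	Increment Op = iota
	Decrement
)

// encodeOp serializes an operation with gob, the same format that the
// example Apply method decodes.
func encodeOp(op Op) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(op); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeOp reverses encodeOp. Decode is given a pointer to the destination.
func decodeOp(data []byte) (Op, error) {
	var op Op
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&op)
	return op, err
}

func main() {
	data, err := encodeOp(Decrement)
	if err != nil {
		panic(err)
	}
	op, err := decodeOp(data)
	if err != nil {
		panic(err)
	}
	fmt.Println(op == Decrement)
}
```

The encoded bytes would presumably be placed in the Bytes field of an Operation before it is submitted; that mapping from Operation.Bytes to LogEntry.Data is an assumption based on the type descriptions further down.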

Now, create a map from server IDs to their respective addresses. This map should contain the ID and address of every server in the cluster, including this one.

peers := map[string]string{
    "raft-1": "127.0.0.0:8080",
    "raft-2": "127.0.0.1:8080",
    "raft-3": "127.0.0.2:8080",
}

Then, select the paths where the server persists its state. Note that if a file already exists at a path, the server will read it into memory and initialize itself with its contents. Otherwise, the server will create the file. The server expects three paths: the log path, the storage path, and the snapshot path. The log path specifies where the server will persist its log entries. The storage path specifies where the server will persist its term and vote. The snapshot path specifies where the server will persist any snapshots that it takes.

logPath := "raft-1-log"
storagePath := "raft-1-storage"
snapshotPath := "raft-1-snapshots"

Now, create the channel that responses from the state machine will be relayed over. Note that, when the server is started, it is important that this channel is always being monitored. Otherwise, the internal Raft implementation will become blocked.

responseCh := make(chan raft.OperationResponse)

Next, create an instance of the state machine implementation.

fsm := new(StateMachine)

The server may now be created.

server, err := raft.NewServer("raft-1", peers, fsm, logPath, storagePath, snapshotPath, responseCh)

Here is how to start the server.

// Closing this channel signals the Raft implementation to start.
readyCh := make(chan interface{})

// Once Start is called, the server is prepared to start receiving RPCs.
if err := server.Start(readyCh); err != nil {
    panic(err)
}

// Start a goroutine in the background to intercept responses from the state machine.
go func() {
    for response := range responseCh {
        // Handle responses...
        _ = response
    }
}()

// Start Raft.
close(readyCh)

Types

type AppendEntriesRequest

type AppendEntriesRequest struct {
	// The leader's ID. Allows followers to redirect clients.
	LeaderID string

	// The leader's Term.
	Term uint64

	// The leader's commit index.
	LeaderCommit uint64

	// The index of the log entry immediately preceding the new ones.
	PrevLogIndex uint64

	// The term of the log entry immediately preceding the new ones.
	PrevLogTerm uint64

	// Contains the log Entries to store (empty for heartbeat).
	Entries []*LogEntry
}

AppendEntriesRequest is a request invoked by the leader to replicate log entries and also serves as a heartbeat.

type AppendEntriesResponse

type AppendEntriesResponse struct {
	// The term of the server that received the request.
	Term uint64

	// Indicates whether the request to append entries was successful.
	Success bool

	// The conflicting Index if there is one.
	Index uint64
}

AppendEntriesResponse is a response to a request to replicate log entries.

type InstallSnapshotRequest

type InstallSnapshotRequest struct {
	// The leader's ID.
	LeaderID string

	// The leader's Term.
	Term uint64

	// The snapshot replaces all entries up to and including
	// this index.
	LastIncludedIndex uint64

	// The term associated with the last included index.
	LastIncludedTerm uint64

	// The state of the state machine in Bytes.
	Bytes []byte
}

InstallSnapshotRequest is invoked by the leader to send a snapshot to a follower.

type InstallSnapshotResponse

type InstallSnapshotResponse struct {
	// The term of the server that received the request.
	Term uint64
}

InstallSnapshotResponse is a response to a snapshot installation.

type Log

type Log interface {
	// Open opens the log for reading and writing.
	Open() error

	// Replay reads the persisted state of the log into
	// memory. Log must be open.
	Replay() error

	// Close closes the log.
	Close() error

	// GetEntry returns the log entry located at the specified index.
	GetEntry(index uint64) (*LogEntry, error)

	// AppendEntry appends a log entry to the log.
	AppendEntry(entry *LogEntry) error

	// AppendEntries appends multiple log entries to the log.
	AppendEntries(entries []*LogEntry) error

	// Truncate deletes all log entries with index greater than
	// or equal to the provided index.
	Truncate(index uint64) error

	// DiscardEntries deletes all in-memory and persistent data in the
	// log. The provided term and index indicate at what term and index
	// the now empty log will start at. Primarily intended to be used
	// for snapshotting.
	DiscardEntries(index uint64, term uint64) error

	// Compact deletes all log entries with index less than
	// or equal to the provided index.
	Compact(index uint64) error

	// Contains checks if the log contains an entry at the specified index.
	Contains(index uint64) bool

	// LastIndex returns the largest index that exists in the log and zero
	// if the log is empty.
	LastIndex() uint64

	// LastTerm returns the largest term in the log and zero if the log
	// is empty.
	LastTerm() uint64

	// NextIndex returns the next index to append to the log.
	NextIndex() uint64
}

Log is an interface representing the internal component of Raft that is responsible for durably storing and retrieving log entries.

type LogEntry

type LogEntry struct {
	// The Index of the log entry.
	Index uint64

	// The Term of the log entry.
	Term uint64

	// The Offset of the log entry.
	Offset int64

	// The Data of the log entry.
	Data []byte
}

LogEntry represents a log entry in the log.

func NewLogEntry

func NewLogEntry(index uint64, term uint64, data []byte) *LogEntry

NewLogEntry creates a new instance of LogEntry with the provided index, term, and data.

func (*LogEntry) IsConflict

func (e *LogEntry) IsConflict(other *LogEntry) bool

IsConflict checks whether the current log entry conflicts with another log entry. Two log entries are considered conflicting if they have the same index but different terms.
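The conflict rule can be illustrated with a small sketch. The types and helpers here are hypothetical stand-ins rather than the library's code: `logEntry` carries only an index and a term, `isConflict` restates the rule described above, and `firstConflict` shows one way a follower might locate the point at which incoming entries diverge from its own log.

```go
package main

import "fmt"

// logEntry is a stand-in for LogEntry with only the fields that matter here.
type logEntry struct {
	Index uint64
	Term  uint64
}

// isConflict reports whether a and b occupy the same index but were
// created in different terms.
func isConflict(a, b logEntry) bool {
	return a.Index == b.Index && a.Term != b.Term
}

// firstConflict returns the index of the first incoming entry that
// conflicts with an entry already stored at the same index. The second
// result is false when no incoming entry conflicts.
func firstConflict(existing map[uint64]logEntry, incoming []logEntry) (uint64, bool) {
	for _, in := range incoming {
		if cur, ok := existing[in.Index]; ok && isConflict(cur, in) {
			return in.Index, true
		}
	}
	return 0, false
}

func main() {
	existing := map[uint64]logEntry{
		1: {Index: 1, Term: 1},
		2: {Index: 2, Term: 1},
		3: {Index: 3, Term: 1},
	}
	incoming := []logEntry{{Index: 2, Term: 1}, {Index: 3, Term: 2}, {Index: 4, Term: 2}}
	fmt.Println(firstConflict(existing, incoming))
}
```

Here the entry at index 3 conflicts because the stored entry has term 1 and the incoming one has term 2, so a follower would discard its entries from index 3 onward before appending the incoming ones.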

type Logger

type Logger interface {
	// Debug logs a message at debug level.
	Debug(args ...interface{})

	// Debugf logs a formatted message at debug level.
	Debugf(format string, args ...interface{})

	// Info logs a message at info level.
	Info(args ...interface{})

	// Infof logs a formatted message at info level.
	Infof(format string, args ...interface{})

	// Warn logs a message at warn level.
	Warn(args ...interface{})

	// Warnf logs a formatted message at warn level.
	Warnf(format string, args ...interface{})

	// Error logs a message at error level.
	Error(args ...interface{})

	// Errorf logs a formatted message at error level.
	Errorf(format string, args ...interface{})

	// Fatal logs a message at fatal level.
	Fatal(args ...interface{})

	// Fatalf logs a formatted message at fatal level.
	Fatalf(format string, args ...interface{})
}

Logger supports logging messages at the debug, info, warn, error, and fatal level.

type Operation

type Operation struct {
	// The operation as bytes. The provided state machine should be capable
	// of decoding these bytes.
	Bytes []byte
}

Operation is an operation that will be applied to the state machine. An operation must be deterministic.

type OperationResponse

type OperationResponse struct {
	// The term of the log entry containing the applied operation.
	Term uint64

	// The index of the log entry containing the applied operation.
	Index uint64

	// The bytes of the operation applied to the state machine.
	Operation []byte

	// The response returned by the state machine after applying the operation.
	Response interface{}
}

OperationResponse is the response that is generated after applying an operation to the state machine.

type Option

type Option func(options *options) error

Option is a function that updates the options associated with Raft.

func WithElectionTimeout

func WithElectionTimeout(time time.Duration) Option

WithElectionTimeout sets the election timeout for the Raft server.

func WithHeartbeatInterval

func WithHeartbeatInterval(time time.Duration) Option

WithHeartbeatInterval sets the heartbeat interval for the Raft server.

func WithLogger

func WithLogger(logger Logger) Option

WithLogger sets the logger used by the Raft server.

func WithMaxEntriesPerRPC

func WithMaxEntriesPerRPC(maxEntriesPerRPC int) Option

WithMaxEntriesPerRPC sets the maximum number of log entries that can be transmitted via an AppendEntries RPC.
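The Option functions above follow Go's functional options pattern. The sketch below shows how such options are commonly defined and applied. It is written from scratch for illustration: the `options` fields, the default values, the validation, and the `newOptions` helper are all assumptions and may differ from what the library does internally.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// options holds the configurable settings. The field names are assumed.
type options struct {
	electionTimeout   time.Duration
	heartbeatInterval time.Duration
}

// Option updates the options and may reject an invalid value.
type Option func(*options) error

// WithElectionTimeout sets the election timeout.
func WithElectionTimeout(d time.Duration) Option {
	return func(o *options) error {
		if d <= 0 {
			return errors.New("election timeout must be positive")
		}
		o.electionTimeout = d
		return nil
	}
}

// WithHeartbeatInterval sets the heartbeat interval.
func WithHeartbeatInterval(d time.Duration) Option {
	return func(o *options) error {
		if d <= 0 {
			return errors.New("heartbeat interval must be positive")
		}
		o.heartbeatInterval = d
		return nil
	}
}

// newOptions starts from defaults (assumed values) and applies each
// option in order, stopping at the first error.
func newOptions(opts ...Option) (options, error) {
	o := options{
		electionTimeout:   300 * time.Millisecond,
		heartbeatInterval: 50 * time.Millisecond,
	}
	for _, opt := range opts {
		if err := opt(&o); err != nil {
			return options{}, err
		}
	}
	return o, nil
}

func main() {
	o, err := newOptions(WithElectionTimeout(time.Second))
	if err != nil {
		panic(err)
	}
	fmt.Println(o.electionTimeout, o.heartbeatInterval)
}
```

With the real library, options are passed as the trailing variadic arguments of NewRaft or NewServer. As a general rule in Raft, the heartbeat interval should be considerably shorter than the election timeout so that followers do not start elections while the leader is healthy.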

type Peer

type Peer interface {
	// ID returns the ID of the peer.
	ID() string

	// Connect establishes a connection with the peer.
	Connect() error

	// Disconnect tears down a connection with the peer.
	Disconnect() error

	// AppendEntries sends an AppendEntriesRequest to the peer and returns an AppendEntriesResponse and an error
	// if the request was unsuccessful.
	AppendEntries(request AppendEntriesRequest) (AppendEntriesResponse, error)

	// RequestVote sends a RequestVoteRequest to the peer and returns a RequestVoteResponse and an error
	// if the request was unsuccessful.
	RequestVote(request RequestVoteRequest) (RequestVoteResponse, error)

	// InstallSnapshot sends an InstallSnapshotRequest to the peer and returns an InstallSnapshotResponse and an error
	// if the request was unsuccessful.
	InstallSnapshot(request InstallSnapshotRequest) (InstallSnapshotResponse, error)
}

Peer is an interface representing a component responsible for establishing a connection with and making RPCs to a Raft server.

type PersistentState

type PersistentState struct {
	// The term of the associated Raft instance.
	Term uint64

	// The vote of the associated Raft instance.
	VotedFor string
}

PersistentState is the state that must be persisted in Raft.

type Raft

type Raft struct {
	// contains filtered or unexported fields
}

Raft represents the consensus module in the Raft architecture, a distributed consensus algorithm designed for fault-tolerant systems. This implementation of Raft should be utilized as the internal logic for an actual server, as it solely encapsulates the core functionality of Raft and cannot operate as a standalone server.

func NewRaft

func NewRaft(id string, peers map[string]Peer, log Log, storage Storage, snapshotStorage SnapshotStorage,
	fsm StateMachine, responseCh chan<- OperationResponse, opts ...Option) (*Raft, error)

NewRaft creates a new instance of Raft that is configured with the provided options. Responses from applying operations to the state machine will be sent over the provided response channel. If the log, storage, or snapshot storage contains any persisted state, it will be read and this Raft instance will be initialized with that state.

func (*Raft) AppendEntries

func (r *Raft) AppendEntries(request *AppendEntriesRequest, response *AppendEntriesResponse) error

AppendEntries is invoked by the leader to replicate log entries.

func (*Raft) InstallSnapshot

func (r *Raft) InstallSnapshot(request *InstallSnapshotRequest, response *InstallSnapshotResponse) error

InstallSnapshot is invoked by the leader to send a snapshot to a follower.

func (*Raft) ListSnapshots

func (r *Raft) ListSnapshots() []Snapshot

ListSnapshots returns an array of all the snapshots that have been taken.

func (*Raft) RequestVote

func (r *Raft) RequestVote(request *RequestVoteRequest, response *RequestVoteResponse) error

RequestVote is invoked by the candidate server to gather a vote from this server.

func (*Raft) Start

func (r *Raft) Start() error

Start starts the Raft instance if it is not already started. Once started, the Raft instance transitions to the follower state and is ready to start sending and receiving RPCs.

func (*Raft) Status

func (r *Raft) Status() Status

Status returns the status of the Raft instance. The status includes the ID, term, commit index, last applied index, and state.

func (*Raft) Stop

func (r *Raft) Stop() error

Stop stops the Raft instance if it is not already stopped.

func (*Raft) SubmitOperation

func (r *Raft) SubmitOperation(operation Operation) (uint64, uint64, error)

SubmitOperation accepts an operation from a client for replication and returns the log index assigned to the operation, the term assigned to the operation, and an error if this server is not the leader. Note that submitting an operation does not guarantee that it will be replicated: if failures occur, the operation may be lost. Once the operation has been replicated and applied to the state machine, the response will be sent over the provided response channel.
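Because the result of an operation arrives asynchronously on the response channel, callers usually need a way to match a response to the index returned by SubmitOperation. One possible approach is sketched below. The `dispatcher` type and its methods are hypothetical and not part of this package, and `OperationResponse` is redeclared with the documented fields only so the snippet compiles on its own. The sketch assumes at most one response is delivered per index.

```go
package main

import (
	"fmt"
	"sync"
)

// OperationResponse mirrors the fields of the documented type.
type OperationResponse struct {
	Term      uint64
	Index     uint64
	Operation []byte
	Response  interface{}
}

// dispatcher hands each response to the caller waiting on its log index.
type dispatcher struct {
	mu    sync.Mutex
	slots map[uint64]chan OperationResponse
}

func newDispatcher() *dispatcher {
	return &dispatcher{slots: make(map[uint64]chan OperationResponse)}
}

// slot returns the channel for index, creating it on first use. The buffer
// of one lets deliver run before or after await without blocking.
func (d *dispatcher) slot(index uint64) chan OperationResponse {
	d.mu.Lock()
	defer d.mu.Unlock()
	ch, ok := d.slots[index]
	if !ok {
		ch = make(chan OperationResponse, 1)
		d.slots[index] = ch
	}
	return ch
}

// deliver routes a response that was read from the response channel.
func (d *dispatcher) deliver(resp OperationResponse) {
	d.slot(resp.Index) <- resp
}

// await blocks until the response for index arrives, then frees the slot.
func (d *dispatcher) await(index uint64) OperationResponse {
	resp := <-d.slot(index)
	d.mu.Lock()
	delete(d.slots, index)
	d.mu.Unlock()
	return resp
}

func main() {
	d := newDispatcher()
	go d.deliver(OperationResponse{Term: 1, Index: 3, Response: "ok"})
	resp := d.await(3)
	fmt.Println(resp.Index, resp.Response)
}
```

In a real program the goroutine that drains the response channel would call deliver for every response, and the submitting code would call await with the returned index. The caller should also compare the Term of the response with the term returned by SubmitOperation: a different term at the same index means a different entry was committed there and the submitted operation was lost.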

type RequestVoteRequest

type RequestVoteRequest struct {
	// The ID of the candidate requesting the vote.
	CandidateID string

	// The candidate's term.
	Term uint64

	// The index of the candidate's last log entry.
	LastLogIndex uint64

	// The term of the candidate's last log entry.
	LastLogTerm uint64
}

RequestVoteRequest is a request invoked by candidates to gather votes.

type RequestVoteResponse

type RequestVoteResponse struct {
	// The term of the server that received the request.
	Term uint64

	// Indicates whether the vote request was successful.
	VoteGranted bool
}

RequestVoteResponse is a response to a request for a vote.

type Server

type Server struct {
	pb.UnimplementedRaftServer
	// contains filtered or unexported fields
}

Server is a wrapper for a Raft instance that implements the logic of the Raft consensus algorithm. It serves requests for Raft using protobuf and gRPC, making it capable of providing replication and fault tolerance.

func NewServer

func NewServer(id string, peers map[string]string, fsm StateMachine, logPath string, storagePath string, snapshotPath string,
	responseCh chan<- OperationResponse, opts ...Option) (*Server, error)

NewServer creates a new instance of a Server with the provided ID. The provided peers are the peers that will make up the cluster, including the ID and network address of this server. The log path, storage path, and snapshot path specify the locations where the underlying Raft instance persists its state. If the state is already persisted at these paths, it will be read into memory and Raft will be initialized with that state. Otherwise, new files will be created at those paths. Responses from the state machine after applying an operation will be sent over the provided response channel. The response channel must be monitored; otherwise, the server may be blocked.

func (*Server) AppendEntries

func (s *Server) AppendEntries(ctx context.Context, request *pb.AppendEntriesRequest) (*pb.AppendEntriesResponse, error)

AppendEntries handles the AppendEntries gRPC request. It converts the request to the internal representation, invokes the AppendEntries function on the Raft instance, and returns the response.

func (*Server) InstallSnapshot

func (s *Server) InstallSnapshot(ctx context.Context, request *pb.InstallSnapshotRequest) (*pb.InstallSnapshotResponse, error)

InstallSnapshot handles the InstallSnapshot gRPC request. It converts the request to the internal representation, invokes the InstallSnapshot function on the Raft instance, and returns the response.

func (*Server) IsStarted

func (s *Server) IsStarted() bool

IsStarted checks if the server is started. It returns true if the server is started, false otherwise.

func (*Server) ListSnapshots

func (s *Server) ListSnapshots() []Snapshot

ListSnapshots returns an array of all the snapshots that the underlying Raft instance has taken.

func (*Server) RequestVote

func (s *Server) RequestVote(ctx context.Context, request *pb.RequestVoteRequest) (*pb.RequestVoteResponse, error)

RequestVote handles the RequestVote gRPC request. It converts the request to the internal representation, invokes the RequestVote function on the Raft instance, and returns the response.

func (*Server) Start

func (s *Server) Start(ready <-chan interface{}) error

Start starts the server. It listens for incoming connections on the configured network address, starts the Raft instance, and serves gRPC requests on the listener. The provided channel is used to signal the server to start serving requests.

func (*Server) Status

func (s *Server) Status() Status

Status returns the status of the server. It retrieves the status from the underlying Raft instance. The status includes the ID, commit index, last-applied index, term, and state of the Raft instance.

func (*Server) Stop

func (s *Server) Stop()

Stop stops the server. It gracefully stops the gRPC server, stops the Raft instance, and closes the listener.

func (*Server) SubmitOperation

func (s *Server) SubmitOperation(operation Operation) (uint64, uint64, error)

SubmitOperation submits an operation to the server for processing. It forwards the operation to the underlying Raft instance for handling and returns the index and term assigned to the operation, as well as an error if submitting the operation failed.

type Snapshot

type Snapshot struct {
	// Last index included in snapshot.
	LastIncludedIndex uint64

	// Last term included in snapshot.
	LastIncludedTerm uint64

	// State of replicated state machine.
	Data []byte
}

Snapshot represents a snapshot of the replicated state machine.

func NewSnapshot

func NewSnapshot(lastIncludedIndex uint64, lastIncludedTerm uint64, data []byte) *Snapshot

NewSnapshot creates a new Snapshot instance with the provided state data from the replicated state machine.

type SnapshotStorage

type SnapshotStorage interface {
	// Open opens the snapshot storage for reading and writing snapshots.
	Open() error

	// Replay reads the persisted state of the snapshot store
	// into memory.
	Replay() error

	// Close closes the snapshot storage.
	Close() error

	// LastSnapshot gets the most recently saved snapshot if it exists.
	LastSnapshot() (Snapshot, bool)

	// SaveSnapshot saves the provided snapshot to durable storage.
	SaveSnapshot(snapshot *Snapshot) error

	// ListSnapshots returns an array of the snapshots that have been saved.
	ListSnapshots() []Snapshot
}

SnapshotStorage is an interface representing the internal component of Raft that is responsible for managing snapshots.

type State

type State uint32

State represents the current state of a Raft server. A Raft server may be the leader, a follower, or shut down.

const (
	// Leader is a state indicating that a server is responsible for replicating and
	// committing log entries. Typically, only one server in a cluster will be the leader.
	// However, if there are partitions or other failures, it is possible there is more than
	// one leader.
	Leader State = iota

	// Follower is a state indicating that a server is responsible for accepting log entries replicated
	// by the leader.
	// A server in the follower state may not accept operations for replication.
	Follower

	// Shutdown is a state indicating that the server is currently offline.
	Shutdown
)

func (State) String

func (s State) String() string

String converts a State into a string.

type StateMachine

type StateMachine interface {
	// Apply applies the given log entry to the state machine.
	Apply(entry *LogEntry) interface{}

	// Snapshot returns a snapshot of the current state of the state machine.
	// The bytes contained in the snapshot must be serialized in a way that
	// the Restore function can understand.
	Snapshot() (Snapshot, error)

	// Restore recovers the state of the state machine given a snapshot that was produced
	// by Snapshot.
	Restore(snapshot *Snapshot) error

	// NeedSnapshot returns true if a snapshot should be taken of the state machine and false
	// otherwise.
	NeedSnapshot() bool
}

StateMachine is an interface representing a replicated state machine. The implementation must be concurrent safe.

type Status

type Status struct {
	// The ID of the Raft instance.
	ID string

	// The current term.
	Term uint64

	// The current commit index.
	CommitIndex uint64

	// The index of the last log entry applied to the state machine.
	LastApplied uint64

	// The current state of the Raft instance: leader, follower, or shutdown.
	State State
}

Status is the status of a Raft instance.

type Storage

type Storage interface {
	// Open opens the storage for reading and writing persistent state.
	Open() error

	// Close closes the storage.
	Close() error

	// SetState persists the provided state in the storage. Storage must be open.
	SetState(persistentState *PersistentState) error

	// GetState recovers the state from the storage. Storage must be open.
	GetState() (PersistentState, error)
}

Storage is an interface representing the internal component of Raft that is responsible for durably storing the vote and term.

Directories

Path Synopsis
internal
