raft

package v0.1.2

Warning: This package is not in the latest version of its module.

Published: Oct 10, 2013 | License: Apache-2.0, MIT | Imports: 19 | Imported by: 0

README

go-raft

Overview

This is a Go implementation of the Raft distributed consensus protocol. Raft is a protocol by which a cluster of nodes can maintain a replicated state machine. The state machine is kept in sync through the use of a replicated log.

For more details on Raft, read "In Search of an Understandable Consensus Algorithm" by Diego Ongaro and John Ousterhout.

Project Status

This library is feature-complete but should be considered experimental until it has seen more usage. If you have any questions about implementing go-raft in your project, please file an issue; there is an active community of developers who can help. go-raft is released under the MIT license.

Features
  • Leader election
  • Log replication
  • Configuration changes
  • Log compaction
  • Unit tests
  • Fast Protobuf Log Encoding
  • HTTP transport

Projects

These projects are built on go-raft:

  • coreos/etcd - A highly-available key value store for shared configuration and service discovery
  • benbjohnson/raftd - A reference implementation for using the go-raft library for distributed consensus.

If you have a project that you're using go-raft in, please add it to this README so others can see implementation examples.

The Raft Protocol

This section provides a high-level summary of the Raft protocol. For a more detailed explanation of the failover process and election terms, see the full paper describing the protocol: "In Search of an Understandable Consensus Algorithm".

Overview

Maintaining state in a single process on a single server is easy. Your process is the single point of authority, so there are no conflicts when reading and writing state. Even multi-threaded processes can rely on locks or coroutines to serialize access to the data.

However, in a distributed system there is no single point of authority. Servers can crash, the network between two machines can become unavailable, or any number of other problems can occur.

A distributed consensus protocol is used to maintain a consistent state across multiple servers in a cluster. Many distributed systems are built upon the Paxos protocol, but Paxos can be difficult to understand and there are many gaps between Paxos and real-world implementations.

An alternative is the Raft distributed consensus protocol by Diego Ongaro and John Ousterhout. Raft is a protocol built with understandability as a primary tenet, and it centers around two things:

  1. Leader Election
  2. Replicated Log

With these two constructs, you can build a system that maintains state across multiple servers, even when some of those servers fail, as long as a majority of them remain available.

Leader Election

The Raft protocol effectively works as a master-slave system: state changes are written to a single server in the cluster (the leader) and distributed to the rest of the servers. This simplifies the protocol, since there is only one data authority and conflicts do not have to be resolved.

Raft ensures that there is only one leader at a time. It does this by holding elections among the nodes in the cluster and requiring a node to receive a majority of the votes in order to become leader. For example, in a 3-node cluster a candidate needs 2 votes to become leader; in a 5-node cluster it needs 3.
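The majority rule reduces to simple integer arithmetic. The sketch below computes n/2 + 1 for a cluster of n servers; quorumSize is a hypothetical helper written for illustration, not the library's Server.QuorumSize, though it expresses the same idea.

```go
package main

import "fmt"

// quorumSize returns the number of votes a candidate needs to win an
// election in a cluster of n servers: a strict majority, n/2 + 1.
func quorumSize(n int) int {
	return n/2 + 1
}

func main() {
	for _, n := range []int{1, 3, 4, 5} {
		// n - quorumSize(n) servers can fail while a majority still remains.
		fmt.Printf("%d servers -> %d votes needed, tolerates %d failures\n",
			n, quorumSize(n), n-quorumSize(n))
	}
}
```

Note that a 4-node cluster needs 3 votes and, like a 3-node cluster, tolerates only one failure, which is why odd cluster sizes are usually preferred.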

Replicated Log

To maintain state, the cluster keeps a log of commands. Each command changes the state of the server and is deterministic. Because this log is replicated identically on all the nodes in the cluster, any node can reconstruct the state at any point in the log by running each command, in order, up to that point.
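A minimal sketch of rebuilding state by replaying a log, assuming each hypothetical command simply sets a key to a value. Because every command is deterministic, any node that replays the same prefix of the log arrives at the same state.

```go
package main

import "fmt"

// command is a hypothetical deterministic state-machine command: applying
// the same command to the same state always yields the same new state.
type command struct {
	key   string
	value int
}

// replay rebuilds the state as of log position upTo (exclusive) by applying
// each command in order, starting from an empty state.
func replay(log []command, upTo int) map[string]int {
	state := make(map[string]int)
	for _, c := range log[:upTo] {
		state[c.key] = c.value
	}
	return state
}

func main() {
	log := []command{{"x", 1}, {"y", 2}, {"x", 3}}
	fmt.Println(replay(log, 2)) // map[x:1 y:2]
	fmt.Println(replay(log, 3)) // map[x:3 y:2]
}
```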

Under normal conditions, the log is replicated by sending an AppendEntries RPC from the leader to each of the other servers in the cluster (called peers). Each peer appends the entries from the leader through a 2-phase commit process, which ensures that an entry is committed only once a majority of servers in the cluster have written it to their logs.
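The majority requirement can be expressed as: an index may be treated as committed once it is stored on a majority of servers. This sketch assumes the leader tracks the last index written on each server, including itself; majorityIndex is a hypothetical helper. The Raft paper adds one condition not modeled here: a leader only advances its commit index directly for entries from its own current term.

```go
package main

import (
	"fmt"
	"sort"
)

// majorityIndex returns the highest log index that is stored on a majority
// of servers. indexes holds the last log index known to be written on each
// server, including the leader itself. It assumes indexes is non-empty.
func majorityIndex(indexes []uint64) uint64 {
	sorted := append([]uint64(nil), indexes...) // copy so the caller's slice is untouched
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] > sorted[j] })
	quorum := len(sorted)/2 + 1
	// After sorting in descending order, the value at position quorum-1 is
	// less than or equal to the last index of at least quorum servers.
	return sorted[quorum-1]
}

func main() {
	// Leader at index 10; peers at 10, 7, 4 and 2 in a 5-server cluster.
	fmt.Println(majorityIndex([]uint64{10, 10, 7, 4, 2})) // 7
}
```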

History

Ben Johnson started this library for use in his behavioral analytics database called Sky. He put it under the MIT license in the hopes that it would be useful for other projects too.

Documentation

Constants

const (
	Debug = 1
	Trace = 2
)
const (
	Stopped      = "stopped"
	Follower     = "follower"
	Candidate    = "candidate"
	Leader       = "leader"
	Snapshotting = "snapshotting"
)
const (
	MaxLogEntriesPerRequest         = 2000
	NumberOfLogEntriesAfterSnapshot = 200
)
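MaxLogEntriesPerRequest appears to cap how many entries a single AppendEntries request carries. The sketch below shows one way a leader could split a large backlog into batches of at most that size; batches is a hypothetical helper, and entries are represented only by their positions.

```go
package main

import "fmt"

// maxLogEntriesPerRequest mirrors the MaxLogEntriesPerRequest constant above.
const maxLogEntriesPerRequest = 2000

// batches splits n pending entries into consecutive half-open ranges
// [start, end) of at most max entries each.
func batches(n, max int) [][2]int {
	var out [][2]int
	for start := 0; start < n; start += max {
		end := start + max
		if end > n {
			end = n
		}
		out = append(out, [2]int{start, end})
	}
	return out
}

func main() {
	fmt.Println(batches(4500, maxLogEntriesPerRequest)) // [[0 2000] [2000 4000] [4000 4500]]
}
```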
const (
	DefaultHeartbeatTimeout = 50 * time.Millisecond
	DefaultElectionTimeout  = 150 * time.Millisecond
)
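The Raft paper recommends randomized election timeouts so that followers rarely time out at the same moment and split the vote. A sketch, assuming the timeout is drawn uniformly from [DefaultElectionTimeout, 2 * DefaultElectionTimeout); that range is an assumption of this example rather than something stated in this documentation, and randomElectionTimeout is hypothetical.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Local copies of the default timeouts above.
const (
	defaultHeartbeatTimeout = 50 * time.Millisecond
	defaultElectionTimeout  = 150 * time.Millisecond
)

// randomElectionTimeout picks a timeout uniformly in [base, 2*base).
// Randomizing the timeout makes it unlikely that several followers become
// candidates at the same moment and split the vote.
func randomElectionTimeout(base time.Duration) time.Duration {
	return base + time.Duration(rand.Int63n(int64(base)))
}

func main() {
	// The heartbeat interval must be well below the election timeout, or
	// followers would start elections while a healthy leader exists.
	fmt.Println(defaultHeartbeatTimeout < defaultElectionTimeout) // true
	d := randomElectionTimeout(defaultElectionTimeout)
	fmt.Println(d >= defaultElectionTimeout && d < 2*defaultElectionTimeout) // true
}
```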
const (
	STOPPED = iota
	READY
	RUNNING
)

Variables

var CommandTimeoutError = errors.New("raft: Command timeout")
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
var NotLeaderError = errors.New("raft.Server: Not current leader")
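Because these errors are package-level sentinel values, callers can compare a returned error against them directly. A hedged sketch of one common pattern, retrying a command on the leader after a not-leader error; fakeServer, do, and submit are hypothetical and not part of go-raft (with the library itself, the leader's name is available from Server.Leader()).

```go
package main

import (
	"errors"
	"fmt"
)

// notLeaderError is a local stand-in for the NotLeaderError variable above.
var notLeaderError = errors.New("raft.Server: Not current leader")

// fakeServer is a hypothetical stand-in for a raft server that knows its own
// name and the name of the current leader; only the leader accepts commands.
type fakeServer struct {
	name   string
	leader string
}

func (s *fakeServer) do(cmd string) (string, error) {
	if s.name != s.leader {
		return "", notLeaderError
	}
	return "applied " + cmd, nil
}

// submit tries the local server first. Sentinel errors are compared by
// identity, so a not-leader failure is detected with == and retried once
// against the server the local node believes is the leader.
func submit(local *fakeServer, servers map[string]*fakeServer, cmd string) (string, error) {
	result, err := local.do(cmd)
	if err == notLeaderError {
		leader, ok := servers[local.leader]
		if !ok {
			return "", err // no known leader yet; the caller should retry later
		}
		return leader.do(cmd)
	}
	return result, err
}

func main() {
	servers := map[string]*fakeServer{
		"a": {name: "a", leader: "b"},
		"b": {name: "b", leader: "b"},
	}
	fmt.Println(submit(servers["a"], servers, "set x=1")) // applied set x=1 <nil>
}
```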

Functions

func LogLevel

func LogLevel() int

func RegisterCommand

func RegisterCommand(command Command)

Registers a command by storing a reference to an instance of it.

func SetLogLevel

func SetLogLevel(level int)

Types

type AppendEntriesRequest

type AppendEntriesRequest struct {
	Term         uint64
	PrevLogIndex uint64
	PrevLogTerm  uint64
	CommitIndex  uint64
	LeaderName   string
	Entries      []*LogEntry
}

The request sent to a server to append entries to the log.

func (*AppendEntriesRequest) Decode added in v0.1.2

func (req *AppendEntriesRequest) Decode(r io.Reader) (int, error)

Decodes the AppendEntriesRequest from a buffer. Returns the number of bytes read and any error that occurs.

func (*AppendEntriesRequest) Encode added in v0.1.2

func (req *AppendEntriesRequest) Encode(w io.Writer) (int, error)

Encodes the AppendEntriesRequest to a buffer. Returns the number of bytes written and any error that may have occurred.

type AppendEntriesResponse

type AppendEntriesResponse struct {
	Term uint64
	// the current index of the server
	Index       uint64
	Success     bool
	CommitIndex uint64
	// contains filtered or unexported fields
}

The response returned from a server appending entries to the log.

func (*AppendEntriesResponse) Decode added in v0.1.2

func (resp *AppendEntriesResponse) Decode(r io.Reader) (int, error)

Decodes the AppendEntriesResponse from a buffer. Returns the number of bytes read and any error that occurs.

func (*AppendEntriesResponse) Encode added in v0.1.2

func (resp *AppendEntriesResponse) Encode(w io.Writer) (int, error)

Encodes the AppendEntriesResponse to a buffer. Returns the number of bytes written and any error that may have occurred.

type Command

type Command interface {
	CommandName() string
	Apply(server *Server) (interface{}, error)
}

A command represents an action to be taken on the replicated state machine.
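A sketch of a user-defined command. To keep it runnable without the library, server below is a hypothetical stand-in for *raft.Server that models only Context(), and command is a local copy of the interface shape. With go-raft itself, Apply would receive the real *Server, the command type would be registered with RegisterCommand, and it would be submitted through Server.Do. db and writeCommand are likewise illustrative.

```go
package main

import "fmt"

// server is a hypothetical stand-in for *raft.Server; only Context is modeled.
type server struct{ context interface{} }

func (s *server) Context() interface{} { return s.context }

// command mirrors the shape of the Command interface, using the stand-in server.
type command interface {
	CommandName() string
	Apply(s *server) (interface{}, error)
}

// db is a hypothetical application state carried in the server's context.
type db struct{ data map[string]string }

// writeCommand sets a key. It is deterministic: applying it to the same state
// always produces the same result, which replication relies on.
type writeCommand struct {
	Key   string
	Value string
}

func (c *writeCommand) CommandName() string { return "write" }

func (c *writeCommand) Apply(s *server) (interface{}, error) {
	d, ok := s.Context().(*db)
	if !ok {
		return nil, fmt.Errorf("unexpected context type %T", s.Context())
	}
	d.data[c.Key] = c.Value
	return c.Value, nil
}

// Compile-time check that *writeCommand satisfies the interface.
var _ command = (*writeCommand)(nil)

func main() {
	d := &db{data: map[string]string{}}
	out, err := (&writeCommand{Key: "foo", Value: "bar"}).Apply(&server{context: d})
	fmt.Println(out, err, d.data["foo"]) // bar <nil> bar
}
```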

type CommandEncoder

type CommandEncoder interface {
	Encode(w io.Writer) error
	Decode(r io.Reader) error
}

type Config added in v0.1.1

type Config struct {
	CommitIndex uint64 `json:"commitIndex"`
	// TODO decide what we need to store in peer struct
	Peers []*Peer `json:"peers"`
}

type DefaultJoinCommand

type DefaultJoinCommand struct {
	Name             string `json:"name"`
	ConnectionString string `json:"connectionString"`
}

The default command for joining a server to the cluster.

func (*DefaultJoinCommand) Apply

func (c *DefaultJoinCommand) Apply(server *Server) (interface{}, error)

func (*DefaultJoinCommand) CommandName

func (c *DefaultJoinCommand) CommandName() string

The name of the Join command in the log.

func (*DefaultJoinCommand) NodeName

func (c *DefaultJoinCommand) NodeName() string

type DefaultLeaveCommand

type DefaultLeaveCommand struct {
	Name string `json:"name"`
}

The default command for removing a server from the cluster.

func (*DefaultLeaveCommand) Apply

func (c *DefaultLeaveCommand) Apply(server *Server) (interface{}, error)

func (*DefaultLeaveCommand) CommandName

func (c *DefaultLeaveCommand) CommandName() string

The name of the Leave command in the log.

func (*DefaultLeaveCommand) NodeName

func (c *DefaultLeaveCommand) NodeName() string

type HTTPMuxer

type HTTPMuxer interface {
	HandleFunc(string, func(http.ResponseWriter, *http.Request))
}

type HTTPTransporter

type HTTPTransporter struct {
	DisableKeepAlives bool
	// contains filtered or unexported fields
}

An HTTPTransporter is a default transport layer used to communicate between multiple servers.

func NewHTTPTransporter

func NewHTTPTransporter(prefix string) *HTTPTransporter

Creates a new HTTP transporter with the given path prefix.

func (*HTTPTransporter) AppendEntriesPath

func (t *HTTPTransporter) AppendEntriesPath() string

Retrieves the AppendEntries path.

func (*HTTPTransporter) Install

func (t *HTTPTransporter) Install(server *Server, mux HTTPMuxer)

Applies Raft routes to an HTTP router for a given server.

func (*HTTPTransporter) Prefix

func (t *HTTPTransporter) Prefix() string

Retrieves the path prefix used by the transporter.

func (*HTTPTransporter) RequestVotePath

func (t *HTTPTransporter) RequestVotePath() string

Retrieves the RequestVote path.

func (*HTTPTransporter) SendAppendEntriesRequest

func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse

Sends an AppendEntries RPC to a peer.

func (*HTTPTransporter) SendSnapshotRecoveryRequest

func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server *Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse

Sends a SnapshotRecoveryRequest RPC to a peer.

func (*HTTPTransporter) SendSnapshotRequest

func (t *HTTPTransporter) SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse

Sends a SnapshotRequest RPC to a peer.

func (*HTTPTransporter) SendVoteRequest

func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse

Sends a RequestVote RPC to a peer.

type JoinCommand

type JoinCommand interface {
	CommandName() string
	Apply(server *Server) (interface{}, error)
	NodeName() string
}

JoinCommand is the interface implemented by commands that join a server to the cluster.

type LeaveCommand

type LeaveCommand interface {
	CommandName() string
	Apply(server *Server) (interface{}, error)
	NodeName() string
}

LeaveCommand is the interface implemented by commands that remove a server from the cluster.

type Log

type Log struct {
	ApplyFunc func(Command) (interface{}, error)
	// contains filtered or unexported fields
}

A log is a collection of log entries that are persisted to durable storage.

func (*Log) CommitIndex

func (l *Log) CommitIndex() uint64

The last committed index in the log.

type LogEntry

type LogEntry struct {
	Index       uint64
	Term        uint64
	CommandName string
	Command     []byte
	Position    int64 // position in the log file
	// contains filtered or unexported fields
}

A log entry stores a single item in the log.

type NOPCommand

type NOPCommand struct {
}

NOPCommand is a command that performs no operation.

func (NOPCommand) Apply

func (c NOPCommand) Apply(server *Server) (interface{}, error)

func (NOPCommand) CommandName

func (c NOPCommand) CommandName() string

The name of the NOP command in the log.

func (NOPCommand) Decode

func (c NOPCommand) Decode(r io.Reader) error

func (NOPCommand) Encode

func (c NOPCommand) Encode(w io.Writer) error

type Peer

type Peer struct {
	Name             string `json:"name"`
	ConnectionString string `json:"connectionString"`
	// contains filtered or unexported fields
}

A peer is a reference to another server involved in the consensus protocol.

type RequestVoteRequest

type RequestVoteRequest struct {
	Term          uint64
	LastLogIndex  uint64
	LastLogTerm   uint64
	CandidateName string
	// contains filtered or unexported fields
}

The request sent to a server to vote for a candidate to become a leader.

func (*RequestVoteRequest) Decode added in v0.1.2

func (req *RequestVoteRequest) Decode(r io.Reader) (int, error)

Decodes the RequestVoteRequest from a buffer. Returns the number of bytes read and any error that occurs.

func (*RequestVoteRequest) Encode added in v0.1.2

func (req *RequestVoteRequest) Encode(w io.Writer) (int, error)

Encodes the RequestVoteRequest to a buffer. Returns the number of bytes written and any error that may have occurred.

type RequestVoteResponse

type RequestVoteResponse struct {
	Term        uint64
	VoteGranted bool
	// contains filtered or unexported fields
}

The response returned from a server after a vote for a candidate to become a leader.

func (*RequestVoteResponse) Decode added in v0.1.2

func (resp *RequestVoteResponse) Decode(r io.Reader) (int, error)

Decodes the RequestVoteResponse from a buffer. Returns the number of bytes read and any error that occurs.

func (*RequestVoteResponse) Encode added in v0.1.2

func (resp *RequestVoteResponse) Encode(w io.Writer) (int, error)

Encodes the RequestVoteResponse to a buffer. Returns the number of bytes written and any error that may have occurred.

type Server

type Server struct {
	// contains filtered or unexported fields
}

A server is involved in the consensus protocol and can act as a follower, a candidate, or a leader.

func NewServer

func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectionString string) (*Server, error)

Creates a new server with a log at the given path.

func (*Server) AddPeer

func (s *Server) AddPeer(name string, connectionString string) error

Adds a peer to the server.

func (*Server) AppendEntries

func (s *Server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse

Appends zero or more log entries from the leader to this server.

func (*Server) CommitIndex

func (s *Server) CommitIndex() uint64

Retrieves the current commit index of the server.

func (*Server) Context

func (s *Server) Context() interface{}

Retrieves the context passed into the constructor.

func (*Server) Do

func (s *Server) Do(command Command) (interface{}, error)

func (*Server) ElectionTimeout

func (s *Server) ElectionTimeout() time.Duration

Retrieves the election timeout.

func (*Server) GetState

func (s *Server) GetState() string

Retrieves the state of the server for debugging.

func (*Server) HeartbeatTimeout

func (s *Server) HeartbeatTimeout() time.Duration

Retrieves the heartbeat timeout.

func (*Server) IsLogEmpty

func (s *Server) IsLogEmpty() bool

Retrieves whether the server's log has no entries.

func (*Server) LastCommandName

func (s *Server) LastCommandName() string

Retrieves the command name of the last log entry.

func (*Server) Leader

func (s *Server) Leader() string

The name of the current leader.

func (*Server) LoadSnapshot

func (s *Server) LoadSnapshot() error

Loads a snapshot at restart.

func (*Server) LogEntries

func (s *Server) LogEntries() []*LogEntry

A list of all the log entries. This should only be used for debugging purposes.

func (*Server) LogPath

func (s *Server) LogPath() string

Retrieves the log path for the server.

func (*Server) MemberCount

func (s *Server) MemberCount() int

Retrieves the number of member servers in the consensus.

func (*Server) Name

func (s *Server) Name() string

Retrieves the name of the server.

func (*Server) Path

func (s *Server) Path() string

Retrieves the storage path for the server.

func (*Server) Peers

func (s *Server) Peers() map[string]*Peer

Retrieves a copy of the peer data.

func (*Server) QuorumSize

func (s *Server) QuorumSize() int

Retrieves the number of servers required to make a quorum.

func (*Server) RemovePeer

func (s *Server) RemovePeer(name string) error

Removes a peer from the server.

func (*Server) RequestSnapshot

func (s *Server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse

func (*Server) RequestVote

func (s *Server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse

Requests a vote from a server. A vote can be granted if the request's term equals the server's current term and the server has not yet voted in that term. A vote can also be granted if the request's term is greater than the server's current term.
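A sketch of the voting rule described above, plus the log up-to-date check from the Raft paper (a voter refuses a candidate whose log is behind its own). voter, voteRequest, and handleVote are hypothetical; voteRequest mirrors the exported fields of RequestVoteRequest.

```go
package main

import "fmt"

// voter holds the hypothetical per-server state needed to decide a vote.
type voter struct {
	currentTerm  uint64
	votedFor     string // "" means no vote cast in currentTerm
	lastLogIndex uint64
	lastLogTerm  uint64
}

// voteRequest mirrors the exported fields of RequestVoteRequest.
type voteRequest struct {
	Term          uint64
	LastLogIndex  uint64
	LastLogTerm   uint64
	CandidateName string
}

// handleVote returns the voter's term and whether the vote is granted.
func (v *voter) handleVote(req voteRequest) (uint64, bool) {
	if req.Term < v.currentTerm {
		return v.currentTerm, false // stale candidate
	}
	if req.Term > v.currentTerm {
		// A newer term resets this server's vote.
		v.currentTerm = req.Term
		v.votedFor = ""
	}
	if v.votedFor != "" && v.votedFor != req.CandidateName {
		return v.currentTerm, false // already voted for someone else this term
	}
	// Raft paper restriction: the candidate's log must be at least as
	// up-to-date as the voter's (compare last term, then last index).
	upToDate := req.LastLogTerm > v.lastLogTerm ||
		(req.LastLogTerm == v.lastLogTerm && req.LastLogIndex >= v.lastLogIndex)
	if !upToDate {
		return v.currentTerm, false
	}
	v.votedFor = req.CandidateName
	return v.currentTerm, true
}

func main() {
	v := &voter{currentTerm: 2, lastLogIndex: 5, lastLogTerm: 2}
	fmt.Println(v.handleVote(voteRequest{Term: 3, LastLogIndex: 5, LastLogTerm: 2, CandidateName: "a"})) // 3 true
	fmt.Println(v.handleVote(voteRequest{Term: 3, LastLogIndex: 9, LastLogTerm: 2, CandidateName: "b"})) // 3 false
}
```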

func (*Server) Running

func (s *Server) Running() bool

Checks if the server is currently running.

func (*Server) SetElectionTimeout

func (s *Server) SetElectionTimeout(duration time.Duration)

Sets the election timeout.

func (*Server) SetHeartbeatTimeout

func (s *Server) SetHeartbeatTimeout(duration time.Duration)

Sets the heartbeat timeout.

func (*Server) SetTransporter

func (s *Server) SetTransporter(t Transporter)

func (*Server) SnapshotPath

func (s *Server) SnapshotPath(lastIndex uint64, lastTerm uint64) string

Retrieves the path of the snapshot file for the given last index and term.

func (*Server) SnapshotRecoveryRequest

func (s *Server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse

func (*Server) Start

func (s *Server) Start() error

func (*Server) State

func (s *Server) State() string

Retrieves the current state of the server.

func (*Server) Stop

func (s *Server) Stop()

Shuts down the server.

func (*Server) TakeSnapshot added in v0.1.1

func (s *Server) TakeSnapshot() error

func (*Server) Term

func (s *Server) Term() uint64

Retrieves the current term of the server.

func (*Server) Transporter

func (s *Server) Transporter() Transporter

Retrieves the object that transports requests.

func (*Server) VotedFor

func (s *Server) VotedFor() string

Retrieves the name of the candidate this server voted for in this term.

type Snapshot

type Snapshot struct {
	LastIndex uint64 `json:"lastIndex"`
	LastTerm  uint64 `json:"lastTerm"`
	// cluster configuration.
	Peers []*Peer `json: "peers"`
	State []byte  `json: "state"`
	Path  string  `json: "path"`
}

The in-memory Snapshot struct. TODO: add cluster configuration.

type SnapshotRecoveryRequest

type SnapshotRecoveryRequest struct {
	LeaderName string
	LastIndex  uint64
	LastTerm   uint64
	Peers      []*Peer
	State      []byte
}

The request sent to a server to recover its state from a snapshot.

func (*SnapshotRecoveryRequest) Decode added in v0.1.2

func (req *SnapshotRecoveryRequest) Decode(r io.Reader) (int, error)

Decodes the SnapshotRecoveryRequest from a buffer. Returns the number of bytes read and any error that occurs.

func (*SnapshotRecoveryRequest) Encode added in v0.1.2

func (req *SnapshotRecoveryRequest) Encode(w io.Writer) (int, error)

Encodes the SnapshotRecoveryRequest to a buffer. Returns the number of bytes written and any error that may have occurred.

type SnapshotRecoveryResponse

type SnapshotRecoveryResponse struct {
	Term        uint64
	Success     bool
	CommitIndex uint64
}

The response returned from a server after recovering from a snapshot.

func (*SnapshotRecoveryResponse) Decode added in v0.1.2

func (req *SnapshotRecoveryResponse) Decode(r io.Reader) (int, error)

Decodes the SnapshotRecoveryResponse from a buffer. Returns the number of bytes read and any error that occurs.

func (*SnapshotRecoveryResponse) Encode added in v0.1.2

func (req *SnapshotRecoveryResponse) Encode(w io.Writer) (int, error)

Encodes the SnapshotRecoveryResponse to a buffer. Returns the number of bytes written and any error that may have occurred.

type SnapshotRequest

type SnapshotRequest struct {
	LeaderName string
	LastIndex  uint64
	LastTerm   uint64
}

The request sent to a server to prepare it to recover from a snapshot.

func (*SnapshotRequest) Decode added in v0.1.2

func (req *SnapshotRequest) Decode(r io.Reader) (int, error)

Decodes the SnapshotRequest from a buffer. Returns the number of bytes read and any error that occurs.

func (*SnapshotRequest) Encode added in v0.1.2

func (req *SnapshotRequest) Encode(w io.Writer) (int, error)

Encodes the SnapshotRequest to a buffer. Returns the number of bytes written and any error that may have occurred.

type SnapshotResponse

type SnapshotResponse struct {
	Success bool `json:"success"`
}

The response returned if the follower entered the snapshotting state.

func (*SnapshotResponse) Decode added in v0.1.2

func (resp *SnapshotResponse) Decode(r io.Reader) (int, error)

Decodes the SnapshotResponse from a buffer. Returns the number of bytes read and any error that occurs.

func (*SnapshotResponse) Encode added in v0.1.2

func (resp *SnapshotResponse) Encode(w io.Writer) (int, error)

Encodes the SnapshotResponse to a buffer. Returns the number of bytes written and any error that may have occurred.

type StateMachine

type StateMachine interface {
	Save() ([]byte, error)
	Recovery([]byte) error
}

StateMachine is the interface that allows the host application to save and recover the state machine.
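A sketch of a StateMachine for a hypothetical key-value store. Save serializes the whole store and Recovery replaces it from those bytes; the JSON format and the mutex (added on the assumption that snapshotting may run concurrently with other access) are choices of this example. stateMachine is a local copy of the interface so the code compiles on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// stateMachine is a local copy of the StateMachine interface above.
type stateMachine interface {
	Save() ([]byte, error)
	Recovery([]byte) error
}

// kvStore is a hypothetical application state machine: a string map
// guarded by a mutex.
type kvStore struct {
	mu   sync.Mutex
	data map[string]string
}

// Save serializes the whole store; the bytes become the snapshot state.
func (s *kvStore) Save() ([]byte, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return json.Marshal(s.data)
}

// Recovery replaces the store's contents with a previously saved snapshot.
// On a decode error the existing contents are left untouched.
func (s *kvStore) Recovery(b []byte) error {
	data := make(map[string]string)
	if err := json.Unmarshal(b, &data); err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data = data
	return nil
}

var _ stateMachine = (*kvStore)(nil)

func main() {
	a := &kvStore{data: map[string]string{"x": "1"}}
	snap, err := a.Save()
	if err != nil {
		panic(err)
	}
	b := &kvStore{}
	if err := b.Recovery(snap); err != nil {
		panic(err)
	}
	fmt.Println(b.data["x"]) // 1
}
```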

type Transporter

type Transporter interface {
	SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
	SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
	SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
	SendSnapshotRecoveryRequest(server *Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
}

Transporter is the interface for allowing the host application to transport requests to other nodes.
