craft

package module
v0.0.0-...-2333129 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 27, 2019 License: MPL-2.0 Imports: 28 Imported by: 0

README

craft

craft is a Go library that manages a replicated log and can be used with a finite state machine to manage replicated state machines. It is a library for providing consensus.

The use cases for such a library are far-reaching as replicated state machines are a key component of many distributed systems. They enable building Consistent, Partition Tolerant (CP) systems, with limited fault tolerance as well.

Building

If you wish to build craft you'll need Go version 1.2+ installed.

Build the library with:

go build

Please check your installation with:

go version

Documentation

TODO: Update links.

For complete documentation, see the associated Godoc.

Getting started

craft relies on clock synchronization. To get the best performance, the synchronization error should be smaller than the one-way delay between any two nodes in the cluster.

An example use of craft is provided here, which is a replicated key-value store.

Protocol

craft is based on the CRaft consensus protocol, which is based on "Raft: In Search of an Understandable Consensus Algorithm"

A high level overview of the CRaft protocol is described below, but for details please read the full paper. This section will first describe the Raft protocol, and then the CRaft protocol.

Raft

Raft servers are always in one of three states: follower, candidate or leader. All servers initially start out as a follower. In this state, servers can accept log entries from a leader and cast votes. If no entries are received for some time, servers self-promote to the candidate state. In the candidate state servers request votes from their peers. If a candidate receives a quorum of votes, then it is promoted to a leader. The leader must accept new log entries and replicate to all the other followers. In addition, if stale reads are not acceptable, all queries must also be performed on the leader.

Once a cluster has a leader, it is able to accept new log entries. A client can request that a leader append a new log entry, which is an opaque binary blob to Raft. The leader then writes the entry to durable storage and attempts to replicate to a quorum of followers. Once the log entry is considered committed, it can be applied to a finite state machine. The finite state machine is application specific, and is implemented using an interface.

An obvious question relates to the unbounded nature of a replicated log. Raft provides a mechanism by which the current state is snapshotted, and the log is compacted. Because of the FSM abstraction, restoring the state of the FSM must result in the same state as a replay of old logs. This allows Raft to capture the FSM state at a point in time, and then remove all the logs that were used to reach that state. This is performed automatically without user intervention, and prevents unbounded disk usage as well as minimizing time spent replaying logs.

Lastly, there is the issue of updating the peer set when new servers are joining or existing servers are leaving. As long as a quorum of servers is available, this is not an issue as Raft provides mechanisms to dynamically update the peer set. If a quorum of servers is unavailable, then this becomes a very challenging issue. For example, suppose there are only 2 peers, A and B. The quorum size is also 2, meaning both servers must agree to commit a log entry. If either A or B fails, it is now impossible to reach quorum. This means the cluster is unable to add, or remove a server, or commit any additional log entries. This results in unavailability. At this point, manual intervention would be required to remove either A or B, and to restart the remaining server in bootstrap mode.

A Raft cluster of 3 servers can tolerate a single server failure, while a cluster of 5 can tolerate 2 server failures. The recommended configuration is to either run 3 or 5 raft servers. This maximizes availability without greatly sacrificing performance.

In terms of performance, Raft is comparable to Paxos. Assuming stable leadership, committing a log entry requires a single round trip to half of the cluster.

CRaft

CRaft is a multi-leader extension to Raft. It tries to solve the single leader bottleneck, and is suitable for uses where high throughput is demanded.

CRaft runs multiple groups of Raft concurrently. Each group operates as normal Raft: it elects a leader, and manages a replicated log. A server may be a leader for a group, and a follower for other groups at the same time. It may also be a follower for every group. The leader server of each group can handle client requests for that group.

Each log entry contains a timestamp taken by the leader of its group. The log entries from different groups are merged into a merged log based on timestamps of entries. The merging happens locally on each server. Each server's state machine applies entries in its merged log in order.

During normal operation, a client can send requests to any leader server (request leader). The workflow for handling a client request is as follows:

  1. The leader of a group receives a command from the client.
  2. The leader creates a log entry with the command, and assigns a timestamp. It appends the entry to its local log, and replicates it to followers. The entry is committed when it is replicated on a majority of servers.
  3. The entry is merged into the merged log.
  4. The leader executes the command in its state machine.
  5. The leader returns the execution result to the client.

CRaft provides the same safety guarantee as Raft.

An optimization for write requests is that the requests may be responded before they are executed, but CRaft guarantees that any later reads will see the most recent writes.

Documentation

Index

Constants

View Source
const (

	// DefaultTimeoutScale is the default TimeoutScale in a NetworkTransport.
	DefaultTimeoutScale = 256 * 1024 // 256KB

)

Variables

View Source
var (
	// ErrLeader is returned when an operation can't be completed on a
	// leader node.
	ErrLeader = errors.New("node is the leader")

	// ErrNotLeader is returned when an operation can't be completed on a
	// follower or candidate node.
	ErrNotLeader = errors.New("node is not the leader")

	// ErrLeadershipLost is returned when a leader fails to commit a log entry
	// because it's been deposed in the process.
	ErrLeadershipLost = errors.New("leadership lost while committing log")

	// ErrAbortedByRestore is returned when a leader fails to commit a log
	// entry because it's been superseded by a user snapshot restore.
	ErrAbortedByRestore = errors.New("snapshot restored while committing log")

	// ErrRaftShutdown is returned when operations are requested against an
	// inactive Raft.
	ErrRaftShutdown = errors.New("raft is already shutdown")

	// ErrEnqueueTimeout is returned when a command fails due to a timeout.
	ErrEnqueueTimeout = errors.New("timed out enqueuing operation")

	// ErrNothingNewToSnapshot is returned when trying to create a snapshot
	// but there's nothing new commited to the FSM since we started.
	ErrNothingNewToSnapshot = errors.New("nothing new to snapshot")

	// ErrUnsupportedProtocol is returned when an operation is attempted
	// that's not supported by the current protocol version.
	ErrUnsupportedProtocol = errors.New("operation not supported with current protocol version")

	// ErrCantBootstrap is returned when attempt is made to bootstrap a
	// cluster that already has state present.
	ErrCantBootstrap = errors.New("bootstrap only works on new clusters")

	// craft
	// ErrRejected is returned when the leader rejects requests because
	// it is stepping down, or is waiting out clock error
	ErrRejected = errors.New("leader rejects the request")

	// ErrWrongConfiguration is returned when craft is not correctly configured
	ErrWrongConfiguration = errors.New("craft not correctly configured")
)
View Source
var (
	// ErrTransportShutdown is returned when operations on a transport are
	// invoked after it's been terminated.
	ErrTransportShutdown = errors.New("transport shutdown")

	// ErrPipelineShutdown is returned when the pipeline is closed.
	ErrPipelineShutdown = errors.New("append pipeline closed")
)
View Source
var (
	// ErrLogNotFound indicates a given log entry is not available.
	ErrLogNotFound = errors.New("log not found")

	// ErrPipelineReplicationNotSupported can be returned by the transport to
	// signal that pipeline replication is not supported in general, and that
	// no error message should be produced.
	ErrPipelineReplicationNotSupported = errors.New("pipeline replication not supported")
)

Functions

func BootstrapCluster

func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
	snaps SnapshotStore, trans Transport, configuration Configuration) error

BootstrapCluster initializes a server's storage with the given cluster configuration. This should only be called at the beginning of time for the cluster, and you absolutely must make sure that you call it with the same configuration on all the Voter servers. There is no need to bootstrap Nonvoter and Staging servers.

One sane approach is to bootstrap a single server with a configuration listing just itself as a Voter, then invoke AddVoter() on it to add other servers to the cluster.

func HasExistingState

func HasExistingState(logs LogStore, stable StableStore, snaps SnapshotStore) (bool, error)

HasExistingState returns true if the server has any existing state (logs, knowledge of a current term, or any snapshots).

func NewInmemTransport

func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport)

NewInmemTransport is used to initialize a new transport and generates a random local address if none is specified

func NewInmemTransportWithTimeout

func NewInmemTransportWithTimeout(addr ServerAddress, timeout time.Duration) (ServerAddress, *InmemTransport)

NewInmemTransportWithTimeout is used to initialize a new transport and generates a random local address if none is specified. The given timeout will be used to decide how long to wait for a connected peer to process the RPCs that we're sending it. See also Connect() and Consumer().

func RecoverCluster

func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
	snaps SnapshotStore, trans Transport, configuration Configuration) error

RecoverCluster is used to manually force a new configuration in order to recover from a loss of quorum where the current configuration cannot be restored, such as when several servers die at the same time. This works by reading all the current state for this server, creating a snapshot with the supplied configuration, and then truncating the Raft log. This is the only safe way to force a given configuration without actually altering the log to insert any new entries, which could cause conflicts with other servers with different state.

WARNING! This operation implicitly commits all entries in the Raft log, so in general this is an extremely unsafe operation. If you've lost your other servers and are performing a manual recovery, then you've also lost the commit information, so this is likely the best you can do, but you should be aware that calling this can cause Raft log entries that were in the process of being replicated but not yet be committed to be committed.

Note the FSM passed here is used for the snapshot operations and will be left in a state that should not be used by the application. Be sure to discard this FSM and any associated state and provide a fresh one when calling NewRaft later.

A typical way to recover the cluster is to shut down all servers and then run RecoverCluster on every server using an identical configuration. When the cluster is then restarted, and election should occur and then Raft will resume normal operation. If it's desired to make a particular server the leader, this can be used to inject a new configuration with that server as the sole voter, and then join up other new clean-state peer servers using the usual APIs in order to bring the cluster back into a known state.

func ValidateConfig

func ValidateConfig(config *Config) error

ValidateConfig is used to validate a sane configuration

Types

type AppendEntriesRequest

type AppendEntriesRequest struct {
	RPCHeader

	// Provide the current term and leader
	Term   uint64
	Leader []byte

	// Provide the previous entries for integrity checking
	PrevLogEntry uint64
	PrevLogTerm  uint64

	// New entries to commit
	Entries []*Log

	// Commit index on the leader
	LeaderCommitIndex uint64

	// craft
	// apply indexes for local replicas
	ApplyIndexes []uint64

	// craft
	// next safe time for this group after replicating entries in this request
	NextSafeTime int64
}

AppendEntriesRequest is the command used to append entries to the replicated log.

func (*AppendEntriesRequest) GetRPCHeader

func (r *AppendEntriesRequest) GetRPCHeader() RPCHeader

See WithRPCHeader.

type AppendEntriesResponse

type AppendEntriesResponse struct {
	RPCHeader

	// Newer term if leader is out of date
	Term uint64

	// Last Log is a hint to help accelerate rebuilding slow nodes
	LastLog uint64

	// We may not succeed if we have a conflicting entry
	Success bool

	// There are scenarios where this request didn't succeed
	// but there's no need to wait/back-off the next attempt.
	NoRetryBackoff bool

	// craft
	LocalTerms    []uint64
	NextSafeTimes []int64
	Timestamp     int64
}

AppendEntriesResponse is the response returned from an AppendEntriesRequest.

func (*AppendEntriesResponse) GetRPCHeader

func (r *AppendEntriesResponse) GetRPCHeader() RPCHeader

See WithRPCHeader.

type AppendFuture

type AppendFuture interface {
	Future

	// Start returns the time that the append request was started.
	// It is always OK to call this method.
	Start() time.Time

	// Request holds the parameters of the AppendEntries call.
	// It is always OK to call this method.
	Request() *AppendEntriesRequest

	// Response holds the results of the AppendEntries call.
	// This method must only be called after the Error
	// method returns, and will only be valid on success.
	Response() *AppendEntriesResponse
}

AppendFuture is used to return information about a pipelined AppendEntries request.

type AppendPipeline

type AppendPipeline interface {
	// AppendEntries is used to add another request to the pipeline.
	// The send may block which is an effective form of back-pressure.
	AppendEntries(args *AppendEntriesRequest, resp *AppendEntriesResponse) (AppendFuture, error)

	// Consumer returns a channel that can be used to consume
	// response futures when they are ready.
	Consumer() <-chan AppendFuture

	// Close closes the pipeline and cancels all inflight RPCs
	Close() error
}

AppendPipeline is used for pipelining AppendEntries requests. It is used to increase the replication throughput by masking latency and better utilizing bandwidth.

type ApplyFuture

type ApplyFuture interface {
	IndexFuture

	// Response returns the FSM response as returned
	// by the FSM.Apply method. This must not be called
	// until after the Error method has returned.
	Response() interface{}
	// craft
	// Wait waits for a log entry gets executed
	Wait()
	// Complete notifies the completion of execution
	Complete()
}

ApplyFuture is used for Apply and can return the FSM response.

type CFSM

type CFSM interface {
	// Apply log is invoked once a log entry is merged.
	// It returns a value which will be made available in the
	// Future in MergerEntry
	Apply(*MergerEntry) interface{}
}

CFSM provides an interface that can be implemented by clients to make use of the replicated log. This is the state machine that actually applies the log, whereas FSM is the state machine of each Raft instance.

type Clock

type Clock interface {
	// GetClockUncertainty returns the current clock uncertainty, i.e., max possible clock offsets
	// between any two clocks in the network, in the exponent of 10� nanoseconds.
	// For example, a return value of 5 means the current uncertainty is 10^5ns = 100us
	GetClockUncertainty() int
}

craft Clock is an interface that should be implemented to provide the clock uncertainty

type Config

type Config struct {
	// ProtocolVersion allows a Raft server to inter-operate with older
	// Raft servers running an older version of the code. This is used to
	// version the wire protocol as well as Raft-specific log entries that
	// the server uses when _speaking_ to other servers. There is currently
	// no auto-negotiation of versions so all servers must be manually
	// configured with compatible versions. See ProtocolVersionMin and
	// ProtocolVersionMax for the versions of the protocol that this server
	// can _understand_.
	ProtocolVersion ProtocolVersion

	// HeartbeatTimeout specifies the time in follower state without
	// a leader before we attempt an election.
	HeartbeatTimeout time.Duration

	// ElectionTimeout specifies the time in candidate state without
	// a leader before we attempt an election.
	ElectionTimeout time.Duration

	// CommitTimeout controls the time without an Apply() operation
	// before we heartbeat to ensure a timely commit. Due to random
	// staggering, may be delayed as much as 2x this value.
	CommitTimeout time.Duration

	// MaxAppendEntries controls the maximum number of append entries
	// to send at once. We want to strike a balance between efficiency
	// and avoiding waste if the follower is going to reject because of
	// an inconsistent log.
	MaxAppendEntries int

	// If we are a member of a cluster, and RemovePeer is invoked for the
	// local node, then we forget all peers and transition into the follower state.
	// If ShutdownOnRemove is is set, we additional shutdown Raft. Otherwise,
	// we can become a leader of a cluster containing only this node.
	ShutdownOnRemove bool

	// TrailingLogs controls how many logs we leave after a snapshot. This is
	// used so that we can quickly replay logs on a follower instead of being
	// forced to send an entire snapshot.
	TrailingLogs uint64

	// SnapshotInterval controls how often we check if we should perform a snapshot.
	// We randomly stagger between this value and 2x this value to avoid the entire
	// cluster from performing a snapshot at once.
	SnapshotInterval time.Duration

	// SnapshotThreshold controls how many outstanding logs there must be before
	// we perform a snapshot. This is to prevent excessive snapshots when we can
	// just replay a small set of logs.
	SnapshotThreshold uint64

	// LeaderLeaseTimeout is used to control how long the "lease" lasts
	// for being the leader without being able to contact a quorum
	// of nodes. If we reach this interval without contact, we will
	// step down as leader.
	LeaderLeaseTimeout time.Duration

	// StartAsLeader forces Raft to start in the leader state. This should
	// never be used except for testing purposes, as it can cause a split-brain.
	StartAsLeader bool

	// The unique ID for this server across all time. When running with
	// ProtocolVersion < 3, you must set this to be the same as the network
	// address of your transport.
	LocalID ServerID

	// NotifyCh is used to provide a channel that will be notified of leadership
	// changes. Raft will block writing to this channel, so it should either be
	// buffered or aggressively consumed.
	NotifyCh chan<- bool

	// LogOutput is used as a sink for logs, unless Logger is specified.
	// Defaults to os.Stderr.
	LogOutput io.Writer

	// Logger is a user-provided logger. If nil, a logger writing to LogOutput
	// is used.
	Logger *log.Logger

	// craft
	// MaxClockUncertainty is the maximum accepting clock error bound in fast update,
	// expressed in terms of power of 10, in nanoseconds
	MaxClockUncertainty int
}

Config provides any necessary configuration for the Raft server.

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a Config with usable defaults.

type Configuration

type Configuration struct {
	Servers []Server
}

Configuration tracks which servers are in the cluster, and whether they have votes. This should include the local server, if it's a member of the cluster. The servers are listed no particular order, but each should only appear once. These entries are appended to the log during membership changes.

func ReadConfigJSON

func ReadConfigJSON(path string) (Configuration, error)

ReadConfigJSON reads a new-style peers.json and returns a configuration structure. This can be used to perform manual recovery when running protocol versions that use server IDs.

func ReadPeersJSON

func ReadPeersJSON(path string) (Configuration, error)

ReadPeersJSON consumes a legacy peers.json file in the format of the old JSON peer store and creates a new-style configuration structure. This can be used to migrate this data or perform manual recovery when running protocol versions that can interoperate with older, unversioned Raft servers. This should not be used once server IDs are in use, because the old peers.json file didn't have support for these, nor non-voter suffrage types.

func (*Configuration) Clone

func (c *Configuration) Clone() (copy Configuration)

Clone makes a deep copy of a Configuration.

type ConfigurationChangeCommand

type ConfigurationChangeCommand uint8

ConfigurationChangeCommand is the different ways to change the cluster configuration.

const (
	// AddStaging makes a server Staging unless its Voter.
	AddStaging ConfigurationChangeCommand = iota
	// AddNonvoter makes a server Nonvoter unless its Staging or Voter.
	AddNonvoter
	// DemoteVoter makes a server Nonvoter unless its absent.
	DemoteVoter
	// RemoveServer removes a server entirely from the cluster membership.
	RemoveServer
	// Promote is created automatically by a leader; it turns a Staging server
	// into a Voter.
	Promote
)

func (ConfigurationChangeCommand) String

type ConfigurationFuture

type ConfigurationFuture interface {
	IndexFuture

	// Configuration contains the latest configuration. This must
	// not be called until after the Error method has returned.
	Configuration() Configuration
}

ConfigurationFuture is used for GetConfiguration and can return the latest configuration in use by Raft.

type DiscardSnapshotSink

type DiscardSnapshotSink struct{}

func (*DiscardSnapshotSink) Cancel

func (d *DiscardSnapshotSink) Cancel() error

func (*DiscardSnapshotSink) Close

func (d *DiscardSnapshotSink) Close() error

func (*DiscardSnapshotSink) ID

func (d *DiscardSnapshotSink) ID() string

func (*DiscardSnapshotSink) Write

func (d *DiscardSnapshotSink) Write(b []byte) (int, error)

type DiscardSnapshotStore

type DiscardSnapshotStore struct{}

DiscardSnapshotStore is used to successfully snapshot while always discarding the snapshot. This is useful for when the log should be truncated but no snapshot should be retained. This should never be used for production use, and is only suitable for testing.

func NewDiscardSnapshotStore

func NewDiscardSnapshotStore() *DiscardSnapshotStore

NewDiscardSnapshotStore is used to create a new DiscardSnapshotStore.

func (*DiscardSnapshotStore) Create

func (d *DiscardSnapshotStore) Create(version SnapshotVersion, index, term uint64,
	configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error)

func (*DiscardSnapshotStore) List

func (d *DiscardSnapshotStore) List() ([]*SnapshotMeta, error)

func (*DiscardSnapshotStore) Open

type FSM

type FSM interface {
	// Apply log is invoked once a log entry is committed.
	// It returns a value which will be made available in the
	// ApplyFuture returned by Raft.Apply method if that
	// method was called on the same Raft node as the FSM.
	Apply(*Log) interface{}

	// Snapshot is used to support log compaction. This call should
	// return an FSMSnapshot which can be used to save a point-in-time
	// snapshot of the FSM. Apply and Snapshot are not called in multiple
	// threads, but Apply will be called concurrently with Persist. This means
	// the FSM should be implemented in a fashion that allows for concurrent
	// updates while a snapshot is happening.
	Snapshot() (FSMSnapshot, error)

	// Restore is used to restore an FSM from a snapshot. It is not called
	// concurrently with any other command. The FSM must discard all previous
	// state.
	Restore(io.ReadCloser) error
}

FSM provides an interface that can be implemented by clients to make use of the replicated log.

type FSMSnapshot

type FSMSnapshot interface {
	// Persist should dump all necessary state to the WriteCloser 'sink',
	// and call sink.Close() when finished or call sink.Cancel() on error.
	Persist(sink SnapshotSink) error

	// Release is invoked when we are finished with the snapshot.
	Release()
}

FSMSnapshot is returned by an FSM in response to a Snapshot It must be safe to invoke FSMSnapshot methods with concurrent calls to Apply.

type FileSnapshotSink

type FileSnapshotSink struct {
	// contains filtered or unexported fields
}

FileSnapshotSink implements SnapshotSink with a file.

func (*FileSnapshotSink) Cancel

func (s *FileSnapshotSink) Cancel() error

Cancel is used to indicate an unsuccessful end.

func (*FileSnapshotSink) Close

func (s *FileSnapshotSink) Close() error

Close is used to indicate a successful end.

func (*FileSnapshotSink) ID

func (s *FileSnapshotSink) ID() string

ID returns the ID of the snapshot, can be used with Open() after the snapshot is finalized.

func (*FileSnapshotSink) Write

func (s *FileSnapshotSink) Write(b []byte) (int, error)

Write is used to append to the state file. We write to the buffered IO object to reduce the amount of context switches.

type FileSnapshotStore

type FileSnapshotStore struct {
	// contains filtered or unexported fields
}

FileSnapshotStore implements the SnapshotStore interface and allows snapshots to be made on the local disk.

func NewFileSnapshotStore

func NewFileSnapshotStore(base string, retain int, logOutput io.Writer) (*FileSnapshotStore, error)

NewFileSnapshotStore creates a new FileSnapshotStore based on a base directory. The `retain` parameter controls how many snapshots are retained. Must be at least 1.

func NewFileSnapshotStoreWithLogger

func NewFileSnapshotStoreWithLogger(base string, retain int, logger *log.Logger) (*FileSnapshotStore, error)

NewFileSnapshotStoreWithLogger creates a new FileSnapshotStore based on a base directory. The `retain` parameter controls how many snapshots are retained. Must be at least 1.

func (*FileSnapshotStore) Create

func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64,
	configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error)

Create is used to start a new snapshot

func (*FileSnapshotStore) List

func (f *FileSnapshotStore) List() ([]*SnapshotMeta, error)

List returns available snapshots in the store.

func (*FileSnapshotStore) Open

Open takes a snapshot ID and returns a ReadCloser for that snapshot.

func (*FileSnapshotStore) ReapSnapshots

func (f *FileSnapshotStore) ReapSnapshots() error

ReapSnapshots reaps any snapshots beyond the retain count.

type FilterFn

type FilterFn func(o *Observation) bool

FilterFn is a function that can be registered in order to filter observations. The function reports whether the observation should be included - if it returns false, the observation will be filtered out.

type Future

type Future interface {
	// Error blocks until the future arrives and then
	// returns the error status of the future.
	// This may be called any number of times - all
	// calls will return the same value.
	// Note that it is not OK to call this method
	// twice concurrently on the same Future instance.
	Error() error
}

Future is used to represent an action that may occur in the future.

type IndexFuture

type IndexFuture interface {
	Future

	// Index holds the index of the newly applied log entry.
	// This must not be called until after the Error method has returned.
	Index() uint64
}

IndexFuture is used for future actions that can result in a raft log entry being created.

type InmemSnapshotSink

type InmemSnapshotSink struct {
	// contains filtered or unexported fields
}

InmemSnapshotSink implements SnapshotSink in memory

func (*InmemSnapshotSink) Cancel

func (s *InmemSnapshotSink) Cancel() error

func (*InmemSnapshotSink) Close

func (s *InmemSnapshotSink) Close() error

Close updates the Size and is otherwise a no-op

func (*InmemSnapshotSink) ID

func (s *InmemSnapshotSink) ID() string

func (*InmemSnapshotSink) Write

func (s *InmemSnapshotSink) Write(p []byte) (n int, err error)

Write appends the given bytes to the snapshot contents

type InmemSnapshotStore

type InmemSnapshotStore struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

InmemSnapshotStore implements the SnapshotStore interface and retains only the most recent snapshot

func NewInmemSnapshotStore

func NewInmemSnapshotStore() *InmemSnapshotStore

NewInmemSnapshotStore creates a blank new InmemSnapshotStore

func (*InmemSnapshotStore) Create

func (m *InmemSnapshotStore) Create(version SnapshotVersion, index, term uint64,
	configuration Configuration, configurationIndex uint64, trans Transport) (SnapshotSink, error)

Create replaces the stored snapshot with a new one using the given args

func (*InmemSnapshotStore) List

func (m *InmemSnapshotStore) List() ([]*SnapshotMeta, error)

List returns the latest snapshot taken

func (*InmemSnapshotStore) Open

Open wraps an io.ReadCloser around the snapshot contents

type InmemStore

type InmemStore struct {
	// contains filtered or unexported fields
}

InmemStore implements the LogStore and StableStore interface. It should NOT EVER be used for production. It is used only for unit tests. Use the MDBStore implementation instead.

func NewInmemStore

func NewInmemStore() *InmemStore

NewInmemStore returns a new in-memory backend. Do not ever use for production. Only for testing.

func (*InmemStore) DeleteRange

func (i *InmemStore) DeleteRange(min, max uint64) error

DeleteRange implements the LogStore interface.

func (*InmemStore) FirstIndex

func (i *InmemStore) FirstIndex() (uint64, error)

FirstIndex implements the LogStore interface.

func (*InmemStore) Get

func (i *InmemStore) Get(key []byte) ([]byte, error)

Get implements the StableStore interface.

func (*InmemStore) GetLog

func (i *InmemStore) GetLog(index uint64, log *Log) error

GetLog implements the LogStore interface.

func (*InmemStore) GetUint64

func (i *InmemStore) GetUint64(key []byte) (uint64, error)

GetUint64 implements the StableStore interface.

func (*InmemStore) LastIndex

func (i *InmemStore) LastIndex() (uint64, error)

LastIndex implements the LogStore interface.

func (*InmemStore) Set

func (i *InmemStore) Set(key []byte, val []byte) error

Set implements the StableStore interface.

func (*InmemStore) SetUint64

func (i *InmemStore) SetUint64(key []byte, val uint64) error

SetUint64 implements the StableStore interface.

func (*InmemStore) StoreLog

func (i *InmemStore) StoreLog(log *Log) error

StoreLog implements the LogStore interface.

func (*InmemStore) StoreLogs

func (i *InmemStore) StoreLogs(logs []*Log) error

StoreLogs implements the LogStore interface.

type InmemTransport

type InmemTransport struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

InmemTransport Implements the Transport interface, to allow Raft to be tested in-memory without going over a network.

func (*InmemTransport) AppendEntries

func (i *InmemTransport) AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error

AppendEntries implements the Transport interface.

func (*InmemTransport) AppendEntriesPipeline

func (i *InmemTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error)

AppendEntriesPipeline returns an interface that can be used to pipeline AppendEntries requests.

func (*InmemTransport) Close

func (i *InmemTransport) Close() error

Close is used to permanently disable the transport

func (*InmemTransport) Connect

func (i *InmemTransport) Connect(peer ServerAddress, t Transport)

Connect is used to connect this transport to another transport for a given peer name. This allows for local routing.

func (*InmemTransport) Consumer

func (i *InmemTransport) Consumer() <-chan RPC

Consumer implements the Transport interface.

func (*InmemTransport) DecodePeer

func (i *InmemTransport) DecodePeer(buf []byte) ServerAddress

DecodePeer implements the Transport interface.

func (*InmemTransport) Disconnect

func (i *InmemTransport) Disconnect(peer ServerAddress)

Disconnect is used to remove the ability to route to a given peer.

func (*InmemTransport) DisconnectAll

func (i *InmemTransport) DisconnectAll()

DisconnectAll is used to remove all routes to peers.

func (*InmemTransport) EncodePeer

func (i *InmemTransport) EncodePeer(id ServerID, p ServerAddress) []byte

EncodePeer implements the Transport interface.

func (*InmemTransport) InstallSnapshot

func (i *InmemTransport) InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error

InstallSnapshot implements the Transport interface.

func (*InmemTransport) LocalAddr

func (i *InmemTransport) LocalAddr() ServerAddress

LocalAddr implements the Transport interface.

func (*InmemTransport) RequestVote

func (i *InmemTransport) RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error

RequestVote implements the Transport interface.

func (*InmemTransport) SetHeartbeatHandler

func (i *InmemTransport) SetHeartbeatHandler(cb func(RPC))

SetHeartbeatHandler is used to set optional fast-path for heartbeats, not supported for this transport.

type InstallSnapshotRequest

type InstallSnapshotRequest struct {
	RPCHeader
	SnapshotVersion SnapshotVersion

	Term   uint64
	Leader []byte

	// These are the last index/term included in the snapshot
	LastLogIndex uint64
	LastLogTerm  uint64

	// Peer Set in the snapshot. This is deprecated in favor of Configuration
	// but remains here in case we receive an InstallSnapshot from a leader
	// that's running old code.
	Peers []byte

	// Cluster membership.
	Configuration []byte
	// Log index where 'Configuration' entry was originally written.
	ConfigurationIndex uint64

	// Size of the snapshot
	Size int64
}

InstallSnapshotRequest is the command sent to a Raft peer to bootstrap its log (and state machine) from a snapshot on another peer.

func (*InstallSnapshotRequest) GetRPCHeader

func (r *InstallSnapshotRequest) GetRPCHeader() RPCHeader

See WithRPCHeader.

type InstallSnapshotResponse

type InstallSnapshotResponse struct {
	RPCHeader

	Term    uint64
	Success bool
}

InstallSnapshotResponse is the response returned from an InstallSnapshotRequest.

func (*InstallSnapshotResponse) GetRPCHeader

func (r *InstallSnapshotResponse) GetRPCHeader() RPCHeader

See WithRPCHeader.

type LeaderObservation

type LeaderObservation struct {
	// contains filtered or unexported fields
}

LeaderObservation is used for the data when leadership changes.

type Log

type Log struct {
	// Index holds the index of the log entry.
	Index uint64

	// Term holds the election term of the log entry.
	Term uint64

	// Type holds the type of the log entry.
	Type LogType

	// Data holds the log entry's type-specific data.
	Data []byte

	// craft
	// Timestamp of the entry
	Timestamp int64
	FastPath  bool
}

Log entries are replicated to all members of the Raft cluster and form the heart of the replicated state machine.

type LogCache

type LogCache struct {
	// contains filtered or unexported fields
}

LogCache wraps any LogStore implementation to provide an in-memory ring buffer. This is used to cache access to the recently written entries. For implementations that do not cache themselves, this can provide a substantial boost by avoiding disk I/O on recent entries.

func NewLogCache

func NewLogCache(capacity int, store LogStore) (*LogCache, error)

NewLogCache is used to create a new LogCache with the given capacity and backend store.

func (*LogCache) DeleteRange

func (c *LogCache) DeleteRange(min, max uint64) error

func (*LogCache) FirstIndex

func (c *LogCache) FirstIndex() (uint64, error)

func (*LogCache) GetLog

func (c *LogCache) GetLog(idx uint64, log *Log) error

func (*LogCache) LastIndex

func (c *LogCache) LastIndex() (uint64, error)

func (*LogCache) StoreLog

func (c *LogCache) StoreLog(log *Log) error

func (*LogCache) StoreLogs

func (c *LogCache) StoreLogs(logs []*Log) error

type LogFuture

type LogFuture struct {
	// contains filtered or unexported fields
}

LogFuture is used to apply a log entry and waits until the log is considered committed.

func (*LogFuture) Complete

func (l *LogFuture) Complete()

craft Complete is called when the entry is executed

func (*LogFuture) Error

func (d *LogFuture) Error() error

func (*LogFuture) Index

func (l *LogFuture) Index() uint64

Index returns the log index

func (*LogFuture) Response

func (l *LogFuture) Response() interface{}

Response returns the response from fsm

func (*LogFuture) SetResponse

func (l *LogFuture) SetResponse(resp interface{})

SetResponse sets the response of the future

func (*LogFuture) Wait

func (l *LogFuture) Wait()

craft Wait waits for execution

type LogStore

type LogStore interface {
	// FirstIndex returns the first index written. 0 for no entries.
	FirstIndex() (uint64, error)

	// LastIndex returns the last index written. 0 for no entries.
	LastIndex() (uint64, error)

	// GetLog gets a log entry at a given index.
	GetLog(index uint64, log *Log) error

	// StoreLog stores a log entry.
	StoreLog(log *Log) error

	// StoreLogs stores multiple log entries.
	StoreLogs(logs []*Log) error

	// DeleteRange deletes a range of log entries. The range is inclusive.
	DeleteRange(min, max uint64) error
}

LogStore is used to provide an interface for storing and retrieving logs in a durable fashion.

type LogType

type LogType uint8

LogType describes various types of log entries.

const (
	// LogCommand is applied to a user FSM.
	LogCommand LogType = iota

	// LogNoop is used to assert leadership.
	LogNoop

	// LogAddPeer is used to add a new peer. This should only be used with
	// older protocol versions designed to be compatible with unversioned
	// Raft servers. See comments in config.go for details.
	LogAddPeerDeprecated

	// LogRemovePeer is used to remove an existing peer. This should only be
	// used with older protocol versions designed to be compatible with
	// unversioned Raft servers. See comments in config.go for details.
	LogRemovePeerDeprecated

	// LogBarrier is used to ensure all preceding operations have been
	// applied to the FSM. It is similar to LogNoop, but instead of returning
	// once committed, it only returns once the FSM manager acks it. Otherwise
	// it is possible there are operations committed but not yet applied to
	// the FSM.
	LogBarrier

	// LogConfiguration establishes a membership change configuration. It is
	// created when a server is added, removed, promoted, etc. Only used
	// when protocol version 1 or greater is in use.
	LogConfiguration
)

type LoopbackTransport

type LoopbackTransport interface {
	Transport // Embedded transport reference
	WithPeers // Embedded peer management
	WithClose // with a close routine
}

LoopbackTransport is an interface that provides a loopback transport suitable for testing e.g. InmemTransport. It's there so we don't have to rewrite tests.

type Merger

type Merger struct {
	// contains filtered or unexported fields
}

Merger implements log merger

func NewMerger

func NewMerger(nGroups int, fsm CFSM) *Merger

NewMerger returns a new Merger

func (*Merger) AddFutureSafeTime

func (m *Merger) AddFutureSafeTime(groupID int, index uint64, safeTime int64)

AddFutureSafeTime adds a future safe time update

func (*Merger) Enqueue

func (m *Merger) Enqueue(groupID int, entry *MergerEntry)

Enqueue puts an CommitTuple into queue

func (*Merger) GetApplyIndexes

func (m *Merger) GetApplyIndexes() []uint64

GetApplyIndexes returns a list of apply indexes

func (*Merger) NeedSync

func (m *Merger) NeedSync() bool

NeedSync returns whether merging is blocked and needs a sync req

func (*Merger) UpdateSafeTime

func (m *Merger) UpdateSafeTime(groupID int, timestamp int64)

UpdateSafeTime updates safe time of a given group

type MergerEntry

type MergerEntry struct {
	Log    *Log
	Future *LogFuture
}

MergerEntry in merger queue

type MergerGroupLog

type MergerGroupLog struct {
	// contains filtered or unexported fields
}

MergerGroupLog manages log and metadata for a group

type NetworkTransport

type NetworkTransport struct {
	TimeoutScale int
	// contains filtered or unexported fields
}

NetworkTransport provides a network based transport that can be used to communicate with Raft on remote machines. It requires an underlying stream layer to provide a stream abstraction, which can be simple TCP, TLS, etc.

This transport is very simple and lightweight. Each RPC request is framed by sending a byte that indicates the message type, followed by the MsgPack encoded request.

The response is an error string followed by the response object, both are encoded using MsgPack.

InstallSnapshot is special, in that after the RPC request we stream the entire state. That socket is not re-used as the connection state is not known if there is an error.

func NewNetworkTransport

func NewNetworkTransport(
	stream StreamLayer,
	maxPool int,
	timeout time.Duration,
	logOutput io.Writer,
) *NetworkTransport

NewNetworkTransport creates a new network transport with the given dialer and listener. The maxPool controls how many connections we will pool. The timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply the timeout by (SnapshotSize / TimeoutScale).

func NewNetworkTransportWithConfig

func NewNetworkTransportWithConfig(
	config *NetworkTransportConfig,
) *NetworkTransport

NewNetworkTransportWithConfig creates a new network transport with the given config struct

func NewNetworkTransportWithLogger

func NewNetworkTransportWithLogger(
	stream StreamLayer,
	maxPool int,
	timeout time.Duration,
	logger *log.Logger,
) *NetworkTransport

NewNetworkTransportWithLogger creates a new network transport with the given logger, dialer and listener. The maxPool controls how many connections we will pool. The timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply the timeout by (SnapshotSize / TimeoutScale).

func NewTCPTransport

func NewTCPTransport(
	bindAddr string,
	advertise net.Addr,
	maxPool int,
	timeout time.Duration,
	logOutput io.Writer,
) (*NetworkTransport, error)

NewTCPTransport returns a NetworkTransport that is built on top of a TCP streaming transport layer.

func NewTCPTransportWithConfig

func NewTCPTransportWithConfig(
	bindAddr string,
	advertise net.Addr,
	config *NetworkTransportConfig,
) (*NetworkTransport, error)

NewTCPTransportWithConfig returns a NetworkTransport that is built on top of a TCP streaming transport layer, using the given config struct.

func NewTCPTransportWithLogger

func NewTCPTransportWithLogger(
	bindAddr string,
	advertise net.Addr,
	maxPool int,
	timeout time.Duration,
	logger *log.Logger,
) (*NetworkTransport, error)

NewTCPTransportWithLogger returns a NetworkTransport that is built on top of a TCP streaming transport layer, with log output going to the supplied Logger

func (*NetworkTransport) AppendEntries

func (n *NetworkTransport) AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error

AppendEntries implements the Transport interface.

func (*NetworkTransport) AppendEntriesPipeline

func (n *NetworkTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error)

AppendEntriesPipeline returns an interface that can be used to pipeline AppendEntries requests.

func (*NetworkTransport) Close

func (n *NetworkTransport) Close() error

Close is used to stop the network transport.

func (*NetworkTransport) CloseStreams

func (n *NetworkTransport) CloseStreams()

CloseStreams closes the current streams.

func (*NetworkTransport) Consumer

func (n *NetworkTransport) Consumer() <-chan RPC

Consumer implements the Transport interface.

func (*NetworkTransport) DecodePeer

func (n *NetworkTransport) DecodePeer(buf []byte) ServerAddress

DecodePeer implements the Transport interface.

func (*NetworkTransport) EncodePeer

func (n *NetworkTransport) EncodePeer(id ServerID, p ServerAddress) []byte

EncodePeer implements the Transport interface.

func (*NetworkTransport) InstallSnapshot

func (n *NetworkTransport) InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error

InstallSnapshot implements the Transport interface.

func (*NetworkTransport) IsShutdown

func (n *NetworkTransport) IsShutdown() bool

IsShutdown is used to check if the transport is shutdown.

func (*NetworkTransport) LocalAddr

func (n *NetworkTransport) LocalAddr() ServerAddress

LocalAddr implements the Transport interface.

func (*NetworkTransport) RequestVote

func (n *NetworkTransport) RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error

RequestVote implements the Transport interface.

func (*NetworkTransport) SetHeartbeatHandler

func (n *NetworkTransport) SetHeartbeatHandler(cb func(rpc RPC))

SetHeartbeatHandler is used to setup a heartbeat handler as a fast-pass. This is to avoid head-of-line blocking from disk IO.

type NetworkTransportConfig

type NetworkTransportConfig struct {
	// ServerAddressProvider is used to override the target address when establishing a connection to invoke an RPC
	ServerAddressProvider ServerAddressProvider

	Logger *log.Logger

	// Dialer
	Stream StreamLayer

	// MaxPool controls how many connections we will pool
	MaxPool int

	// Timeout is used to apply I/O deadlines. For InstallSnapshot, we multiply
	// the timeout by (SnapshotSize / TimeoutScale).
	Timeout time.Duration
}

NetworkTransportConfig encapsulates configuration for the network transport layer.

type Observation

type Observation struct {
	// Raft holds the Raft instance generating the observation.
	Raft *Raft
	// Data holds observation-specific data. Possible types are
	// *RequestVoteRequest and RaftState.
	Data interface{}
}

Observation is sent along the given channel to observers when an event occurs.

type Observer

type Observer struct {
	// contains filtered or unexported fields
}

Observer describes what to do with a given observation.

func NewObserver

func NewObserver(channel chan Observation, blocking bool, filter FilterFn) *Observer

NewObserver creates a new observer that can be registered to make observations on a Raft instance. Observations will be sent on the given channel if they satisfy the given filter.

If blocking is true, the observer will block when it can't send on the channel, otherwise it may discard events.

func (*Observer) GetNumDropped

func (or *Observer) GetNumDropped() uint64

GetNumDropped returns the number of dropped observations due to blocking.

func (*Observer) GetNumObserved

func (or *Observer) GetNumObserved() uint64

GetNumObserved returns the number of observations.

type ProtocolVersion

type ProtocolVersion int

These are the versions of the protocol (which includes RPC messages as well as Raft-specific log entries) that this server can _understand_. Use the ProtocolVersion member of the Config object to control the version of the protocol to use when _speaking_ to other servers. Note that depending on the protocol version being spoken, some otherwise understood RPC messages may be refused. See dispositionRPC for details of this logic.

There are notes about the upgrade path in the description of the versions below. If you are starting a fresh cluster then there's no reason not to jump right to the latest protocol version. If you need to interoperate with older, version 0 Raft servers you'll need to drive the cluster through the different versions in order.

The version details are complicated, but here's a summary of what's required to get from a version 0 cluster to version 3:

  1. In version N of your app that starts using the new Raft library with versioning, set ProtocolVersion to 1.
  2. Make version N+1 of your app require version N as a prerequisite (all servers must be upgraded). For version N+1 of your app set ProtocolVersion to 2.
  3. Similarly, make version N+2 of your app require version N+1 as a prerequisite. For version N+2 of your app, set ProtocolVersion to 3.

During this upgrade, older cluster members will still have Server IDs equal to their network addresses. To upgrade an older member and give it an ID, it needs to leave the cluster and re-enter:

  1. Remove the server from the cluster with RemoveServer, using its network address as its ServerID.
  2. Update the server's config to a better ID (restarting the server).
  3. Add the server back to the cluster with AddVoter, using its new ID.

You can do this during the rolling upgrade from N+1 to N+2 of your app, or as a rolling change at any time after the upgrade.

Version History

0: Original Raft library before versioning was added. Servers running this

version of the Raft library use AddPeerDeprecated/RemovePeerDeprecated
for all configuration changes, and have no support for LogConfiguration.

1: First versioned protocol, used to interoperate with old servers, and begin

the migration path to newer versions of the protocol. Under this version
all configuration changes are propagated using the now-deprecated
RemovePeerDeprecated Raft log entry. This means that server IDs are always
set to be the same as the server addresses (since the old log entry type
cannot transmit an ID), and only AddPeer/RemovePeer APIs are supported.
Servers running this version of the protocol can understand the new
LogConfiguration Raft log entry but will never generate one so they can
remain compatible with version 0 Raft servers in the cluster.

2: Transitional protocol used when migrating an existing cluster to the new

server ID system. Server IDs are still set to be the same as server
addresses, but all configuration changes are propagated using the new
LogConfiguration Raft log entry type, which can carry full ID information.
This version supports the old AddPeer/RemovePeer APIs as well as the new
ID-based AddVoter/RemoveServer APIs which should be used when adding
version 3 servers to the cluster later. This version sheds all
interoperability with version 0 servers, but can interoperate with newer
Raft servers running with protocol version 1 since they can understand the
new LogConfiguration Raft log entry, and this version can still understand
their RemovePeerDeprecated Raft log entries. We need this protocol version
as an intermediate step between 1 and 3 so that servers will propagate the
ID information that will come from newly-added (or -rolled) servers using
protocol version 3, but since they are still using their address-based IDs
from the previous step they will still be able to track commitments and
their own voting status properly. If we skipped this step, servers would
be started with their new IDs, but they wouldn't see themselves in the old
address-based configuration, so none of the servers would think they had a
vote.

3: Protocol adding full support for server IDs and new ID-based server APIs

(AddVoter, AddNonvoter, etc.), old AddPeer/RemovePeer APIs are no longer
supported. Version 2 servers should be swapped out by removing them from
the cluster one-by-one and re-adding them with updated configuration for
this protocol version, along with their server ID. The remove/add cycle
is required to populate their server ID. Note that removing must be done
by ID, which will be the old server's address.
const (
	ProtocolVersionMin ProtocolVersion = 0
	ProtocolVersionMax                 = 3
)

type RPC

type RPC struct {
	Command  interface{}
	Reader   io.Reader // Set only for InstallSnapshot
	RespChan chan<- RPCResponse
}

RPC has a command, and provides a response mechanism.

func (*RPC) Respond

func (r *RPC) Respond(resp interface{}, err error)

Respond is used to respond with a response, error or both

type RPCHeader

type RPCHeader struct {
	// ProtocolVersion is the version of the protocol the sender is
	// speaking.
	ProtocolVersion ProtocolVersion
}

RPCHeader is a common sub-structure used to pass along protocol version and other information about the cluster. For older Raft implementations before versioning was added this will default to a zero-valued structure when read by newer Raft versions.

type RPCResponse

type RPCResponse struct {
	Response interface{}
	Error    error
}

RPCResponse captures both a response and a potential error.

type Raft

type Raft struct {
	// contains filtered or unexported fields
}

Raft implements a Raft node.

func NewRaft

func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error)

NewRaft is used to construct a new Raft node. It takes a configuration, as well as implementations of various interfaces that are required. If we have any old state, such as snapshots, logs, peers, etc, all those will be restored when creating the Raft node.

func (*Raft) AddNonvoter

func (r *Raft) AddNonvoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture

AddNonvoter will add the given server to the cluster but won't assign it a vote. The server will receive log entries, but it won't participate in elections or log entry commitment. If the server is already in the cluster, this updates the server's address. This must be run on the leader or it will fail. For prevIndex and timeout, see AddVoter.

func (*Raft) AddPeer

func (r *Raft) AddPeer(peer ServerAddress) Future

AddPeer (deprecated) is used to add a new peer into the cluster. This must be run on the leader or it will fail. Use AddVoter/AddNonvoter instead.

func (*Raft) AddVoter

func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture

AddVoter will add the given server to the cluster as a staging server. If the server is already in the cluster as a voter, this updates the server's address. This must be run on the leader or it will fail. The leader will promote the staging server to a voter once that server is ready. If nonzero, prevIndex is the index of the only configuration upon which this change may be applied; if another configuration entry has been added in the meantime, this request will fail. If nonzero, timeout is how long this server should wait before the configuration change log entry is appended.

func (*Raft) AppliedIndex

func (r *Raft) AppliedIndex() uint64

AppliedIndex returns the last index applied to the FSM. This is generally lagging behind the last index, especially for indexes that are persisted but have not yet been considered committed by the leader. NOTE - this reflects the last index that was sent to the application's FSM over the apply channel but DOES NOT mean that the application's FSM has yet consumed it and applied it to its internal state. Thus, the application's state may lag behind this index.

func (*Raft) Apply

func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture

Apply is used to apply a command to the FSM in a highly consistent manner. This returns a future that can be used to wait on the application. An optional timeout can be provided to limit the amount of time we wait for the command to be started. This must be run on the leader or it will fail.

func (*Raft) ApplyFastPath

func (r *Raft) ApplyFastPath(cmd []byte, timeout time.Duration) ApplyFuture

craft ApplyFastPath is used to apply a command to the FSM in a highly consistent manner, using fast path. This returns a future that can be used to wait on the application. An optional timeout can be provided to limit the amount of time we wait for the command to be started. This must be run on the leader or it will fail.

func (*Raft) Barrier

func (r *Raft) Barrier(timeout time.Duration) Future

Barrier is used to issue a command that blocks until all preceeding operations have been applied to the FSM. It can be used to ensure the FSM reflects all queued writes. An optional timeout can be provided to limit the amount of time we wait for the command to be started. This must be run on the leader or it will fail.

func (*Raft) BootstrapCluster

func (r *Raft) BootstrapCluster(configuration Configuration) Future

BootstrapCluster is equivalent to non-member BootstrapCluster but can be called on an un-bootstrapped Raft instance after it has been created. This should only be called at the beginning of time for the cluster, and you absolutely must make sure that you call it with the same configuration on all the Voter servers. There is no need to bootstrap Nonvoter and Staging servers.

func (*Raft) ConfigureGroups

func (r *Raft) ConfigureGroups(groupID int, localReplicas []*Raft, merger *Merger, clock Clock) error

craft ConfigureGroups configures CRaft group for the Raft instance. It sets the group id, local Raft instances on the same server, the merger, and the clock interface implementation.

func (*Raft) DemoteVoter

func (r *Raft) DemoteVoter(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture

DemoteVoter will take away a server's vote, if it has one. If present, the server will continue to receive log entries, but it won't participate in elections or log entry commitment. If the server is not in the cluster, this does nothing. This must be run on the leader or it will fail. For prevIndex and timeout, see AddVoter.

func (*Raft) DeregisterObserver

func (r *Raft) DeregisterObserver(or *Observer)

DeregisterObserver deregisters an observer.

func (*Raft) GetConfiguration

func (r *Raft) GetConfiguration() ConfigurationFuture

GetConfiguration returns the latest configuration and its associated index currently in use. This may not yet be committed. This must not be called on the main thread (which can access the information directly).

func (*Raft) IsLeader

func (r *Raft) IsLeader() bool

IsLeader returns whether the replica is leader

func (*Raft) LastContact

func (r *Raft) LastContact() time.Time

LastContact returns the time of last contact by a leader. This only makes sense if we are currently a follower.

func (*Raft) LastIndex

func (r *Raft) LastIndex() uint64

LastIndex returns the last index in stable storage, either from the last log or from the last snapshot.

func (*Raft) Leader

func (r *Raft) Leader() ServerAddress

Leader is used to return the current leader of the cluster. It may return empty string if there is no current leader or the leader is unknown.

func (*Raft) LeaderCh

func (r *Raft) LeaderCh() <-chan bool

LeaderCh is used to get a channel which delivers signals on acquiring or losing leadership. It sends true if we become the leader, and false if we lose it. The channel is not buffered, and does not block on writes.

func (*Raft) RegisterObserver

func (r *Raft) RegisterObserver(or *Observer)

RegisterObserver registers a new observer.

func (*Raft) RemovePeer

func (r *Raft) RemovePeer(peer ServerAddress) Future

RemovePeer (deprecated) is used to remove a peer from the cluster. If the current leader is being removed, it will cause a new election to occur. This must be run on the leader or it will fail. Use RemoveServer instead.

func (*Raft) RemoveServer

func (r *Raft) RemoveServer(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture

RemoveServer will remove the given server from the cluster. If the current leader is being removed, it will cause a new election to occur. This must be run on the leader or it will fail. For prevIndex and timeout, see AddVoter.

func (*Raft) Restore

func (r *Raft) Restore(meta *SnapshotMeta, reader io.Reader, timeout time.Duration) error

Restore is used to manually force Raft to consume an external snapshot, such as if restoring from a backup. We will use the current Raft configuration, not the one from the snapshot, so that we can restore into a new cluster. We will also use the higher of the index of the snapshot, or the current index, and then add 1 to that, so we force a new state with a hole in the Raft log, so that the snapshot will be sent to followers and used for any new joiners. This can only be run on the leader, and blocks until the restore is complete or an error occurs.

WARNING! This operation has the leader take on the state of the snapshot and then sets itself up so that it replicates that to its followers though the install snapshot process. This involves a potentially dangerous period where the leader commits ahead of its followers, so should only be used for disaster recovery into a fresh cluster, and should not be used in normal operations.

func (*Raft) Shutdown

func (r *Raft) Shutdown() Future

Shutdown is used to stop the Raft background routines. This is not a graceful operation. Provides a future that can be used to block until all background routines have exited.

func (*Raft) Snapshot

func (r *Raft) Snapshot() SnapshotFuture

Snapshot is used to manually force Raft to take a snapshot. Returns a future that can be used to block until complete, and that contains a function that can be used to open the snapshot.

func (*Raft) State

func (r *Raft) State() RaftState

State is used to return the current raft state.

func (*Raft) Stats

func (r *Raft) Stats() map[string]string

Stats is used to return a map of various internal stats. This should only be used for informative purposes or debugging.

Keys are: "state", "term", "last_log_index", "last_log_term", "commit_index", "applied_index", "fsm_pending", "last_snapshot_index", "last_snapshot_term", "latest_configuration", "last_contact", and "num_peers".

The value of "state" is a numerical value representing a RaftState const.

The value of "latest_configuration" is a string which contains the id of each server, its suffrage status, and its address.

The value of "last_contact" is either "never" if there has been no contact with a leader, "0" if the node is in the leader state, or the time since last contact with a leader formatted as a string.

The value of "num_peers" is the number of other voting servers in the cluster, not including this node. If this node isn't part of the configuration then this will be "0".

All other values are uint64s, formatted as strings.

func (*Raft) String

func (r *Raft) String() string

String returns a string representation of this Raft node.

func (*Raft) VerifyLeader

func (r *Raft) VerifyLeader() Future

VerifyLeader is used to ensure the current node is still the leader. This can be done to prevent stale reads when a new leader has potentially been elected.

type RaftState

type RaftState uint32

RaftState captures the state of a Raft node: Follower, Candidate, Leader, or Shutdown.

const (
	// Follower is the initial state of a Raft node.
	Follower RaftState = iota

	// Candidate is one of the valid states of a Raft node.
	Candidate

	// Leader is one of the valid states of a Raft node.
	Leader

	// Shutdown is the terminal state of a Raft node.
	Shutdown
)

func (RaftState) String

func (s RaftState) String() string

type RequestVoteRequest

type RequestVoteRequest struct {
	RPCHeader

	// Provide the term and our id
	Term      uint64
	Candidate []byte

	// Used to ensure safety
	LastLogIndex uint64
	LastLogTerm  uint64
}

RequestVoteRequest is the command used by a candidate to ask a Raft peer for a vote in an election.

func (*RequestVoteRequest) GetRPCHeader

func (r *RequestVoteRequest) GetRPCHeader() RPCHeader

See WithRPCHeader.

type RequestVoteResponse

type RequestVoteResponse struct {
	RPCHeader

	// Newer term if leader is out of date.
	Term uint64

	// Peers is deprecated, but required by servers that only understand
	// protocol version 0. This is not populated in protocol version 2
	// and later.
	Peers []byte

	// Is the vote granted.
	Granted bool
}

RequestVoteResponse is the response returned from a RequestVoteRequest.

func (*RequestVoteResponse) GetRPCHeader

func (r *RequestVoteResponse) GetRPCHeader() RPCHeader

See WithRPCHeader.

type Server

type Server struct {
	// Suffrage determines whether the server gets a vote.
	Suffrage ServerSuffrage
	// ID is a unique string identifying this server for all time.
	ID ServerID
	// Address is its network address that a transport can contact.
	Address ServerAddress
	// craft
	// election priority
	Priority int
}

Server tracks the information about a single server in a configuration.

type ServerAddress

type ServerAddress string

ServerAddress is a network address for a server that a transport can contact.

func NewInmemAddr

func NewInmemAddr() ServerAddress

NewInmemAddr returns a new in-memory addr with a randomly generate UUID as the ID.

type ServerAddressProvider

type ServerAddressProvider interface {
	ServerAddr(id ServerID) (ServerAddress, error)
}

type ServerID

type ServerID string

ServerID is a unique string identifying a server for all time.

type ServerSuffrage

type ServerSuffrage int

ServerSuffrage determines whether a Server in a Configuration gets a vote.

const (
	// Voter is a server whose vote is counted in elections and whose match index
	// is used in advancing the leader's commit index.
	Voter ServerSuffrage = iota
	// Nonvoter is a server that receives log entries but is not considered for
	// elections or commitment purposes.
	Nonvoter
	// Staging is a server that acts like a nonvoter with one exception: once a
	// staging server receives enough log entries to be sufficiently caught up to
	// the leader's log, the leader will invoke a  membership change to change
	// the Staging server to a Voter.
	Staging
)

Note: Don't renumber these, since the numbers are written into the log.

func (ServerSuffrage) String

func (s ServerSuffrage) String() string

type SnapshotFuture

type SnapshotFuture interface {
	Future

	// Open is a function you can call to access the underlying snapshot and
	// its metadata. This must not be called until after the Error method
	// has returned.
	Open() (*SnapshotMeta, io.ReadCloser, error)
}

SnapshotFuture is used for waiting on a user-triggered snapshot to complete.

type SnapshotMeta

type SnapshotMeta struct {
	// Version is the version number of the snapshot metadata. This does not cover
	// the application's data in the snapshot, that should be versioned
	// separately.
	Version SnapshotVersion

	// ID is opaque to the store, and is used for opening.
	ID string

	// Index and Term store when the snapshot was taken.
	Index uint64
	Term  uint64

	// Peers is deprecated and used to support version 0 snapshots, but will
	// be populated in version 1 snapshots as well to help with upgrades.
	Peers []byte

	// Configuration and ConfigurationIndex are present in version 1
	// snapshots and later.
	Configuration      Configuration
	ConfigurationIndex uint64

	// Size is the size of the snapshot in bytes.
	Size int64
}

SnapshotMeta is for metadata of a snapshot.

type SnapshotSink

type SnapshotSink interface {
	io.WriteCloser
	ID() string
	Cancel() error
}

SnapshotSink is returned by StartSnapshot. The FSM will Write state to the sink and call Close on completion. On error, Cancel will be invoked.

type SnapshotStore

type SnapshotStore interface {
	// Create is used to begin a snapshot at a given index and term, and with
	// the given committed configuration. The version parameter controls
	// which snapshot version to create.
	Create(version SnapshotVersion, index, term uint64, configuration Configuration,
		configurationIndex uint64, trans Transport) (SnapshotSink, error)

	// List is used to list the available snapshots in the store.
	// It should return then in descending order, with the highest index first.
	List() ([]*SnapshotMeta, error)

	// Open takes a snapshot ID and provides a ReadCloser. Once close is
	// called it is assumed the snapshot is no longer needed.
	Open(id string) (*SnapshotMeta, io.ReadCloser, error)
}

SnapshotStore interface is used to allow for flexible implementations of snapshot storage and retrieval. For example, a client could implement a shared state store such as S3, allowing new nodes to restore snapshots without streaming from the leader.

type SnapshotVersion

type SnapshotVersion int

These are versions of snapshots that this server can _understand_. Currently, it is always assumed that this server generates the latest version, though this may be changed in the future to include a configurable version.

Version History

0: Original Raft library before versioning was added. The peers portion of

these snapshots is encoded in the legacy format which requires decodePeers
to parse. This version of snapshots should only be produced by the
unversioned Raft library.

1: New format which adds support for a full configuration structure and its

associated log index, with support for server IDs and non-voting server
modes. To ease upgrades, this also includes the legacy peers structure but
that will never be used by servers that understand version 1 snapshots.
Since the original Raft library didn't enforce any versioning, we must
include the legacy peers structure for this version, but we can deprecate
it in the next snapshot version.
const (
	SnapshotVersionMin SnapshotVersion = 0
	SnapshotVersionMax                 = 1
)

type StableStore

type StableStore interface {
	Set(key []byte, val []byte) error

	// Get returns the value for key, or an empty byte slice if key was not found.
	Get(key []byte) ([]byte, error)

	SetUint64(key []byte, val uint64) error

	// GetUint64 returns the uint64 value for key, or 0 if key was not found.
	GetUint64(key []byte) (uint64, error)
}

StableStore is used to provide stable storage of key configurations to ensure safety.

type StreamLayer

type StreamLayer interface {
	net.Listener

	// Dial is used to create a new outgoing connection
	Dial(address ServerAddress, timeout time.Duration) (net.Conn, error)
}

StreamLayer is used with the NetworkTransport to provide the low level stream abstraction.

type TCPStreamLayer

type TCPStreamLayer struct {
	// contains filtered or unexported fields
}

TCPStreamLayer implements StreamLayer interface for plain TCP.

func (*TCPStreamLayer) Accept

func (t *TCPStreamLayer) Accept() (c net.Conn, err error)

Accept implements the net.Listener interface.

func (*TCPStreamLayer) Addr

func (t *TCPStreamLayer) Addr() net.Addr

Addr implements the net.Listener interface.

func (*TCPStreamLayer) Close

func (t *TCPStreamLayer) Close() (err error)

Close implements the net.Listener interface.

func (*TCPStreamLayer) Dial

func (t *TCPStreamLayer) Dial(address ServerAddress, timeout time.Duration) (net.Conn, error)

Dial implements the StreamLayer interface.

type Transport

type Transport interface {
	// Consumer returns a channel that can be used to
	// consume and respond to RPC requests.
	Consumer() <-chan RPC

	// LocalAddr is used to return our local address to distinguish from our peers.
	LocalAddr() ServerAddress

	// AppendEntriesPipeline returns an interface that can be used to pipeline
	// AppendEntries requests.
	AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error)

	// AppendEntries sends the appropriate RPC to the target node.
	AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error

	// RequestVote sends the appropriate RPC to the target node.
	RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error

	// InstallSnapshot is used to push a snapshot down to a follower. The data is read from
	// the ReadCloser and streamed to the client.
	InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error

	// EncodePeer is used to serialize a peer's address.
	EncodePeer(id ServerID, addr ServerAddress) []byte

	// DecodePeer is used to deserialize a peer's address.
	DecodePeer([]byte) ServerAddress

	// SetHeartbeatHandler is used to setup a heartbeat handler
	// as a fast-pass. This is to avoid head-of-line blocking from
	// disk IO. If a Transport does not support this, it can simply
	// ignore the call, and push the heartbeat onto the Consumer channel.
	SetHeartbeatHandler(cb func(rpc RPC))
}

Transport provides an interface for network transports to allow Raft to communicate with other nodes.

type WithClose

type WithClose interface {
	// Close permanently closes a transport, stopping
	// any associated goroutines and freeing other resources.
	Close() error
}

WithClose is an interface that a transport may provide which allows a transport to be shut down cleanly when a Raft instance shuts down.

It is defined separately from Transport as unfortunately it wasn't in the original interface specification.

type WithPeers

type WithPeers interface {
	Connect(peer ServerAddress, t Transport) // Connect a peer
	Disconnect(peer ServerAddress)           // Disconnect a given peer
	DisconnectAll()                          // Disconnect all peers, possibly to reconnect them later
}

WithPeers is an interface that a transport may provide which allows for connection and disconnection. Unless the transport is a loopback transport, the transport specified to "Connect" is likely to be nil.

type WithRPCHeader

type WithRPCHeader interface {
	GetRPCHeader() RPCHeader
}

WithRPCHeader is an interface that exposes the RPC header.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL