Versions in this module: v1 — v1.2.3 (Nov 12, 2020). Changes in this version: + const DefaultTimeoutScale + const SuggestedMaxDataSize + var ErrAbortedByRestore = errors.New("snapshot restored while committing log") + var ErrCantBootstrap = errors.New("bootstrap only works on new clusters") + var ErrEnqueueTimeout = errors.New("timed out enqueuing operation") + var ErrLeader = errors.New("node is the leader") + var ErrLeadershipLost = errors.New("leadership lost while committing log") + var ErrLeadershipTransferInProgress = errors.New("leadership transfer in progress") + var ErrLogNotFound = errors.New("log not found") + var ErrNotLeader = errors.New("node is not the leader") + var ErrNothingNewToSnapshot = errors.New("nothing new to snapshot") + var ErrPipelineReplicationNotSupported = errors.New("pipeline replication not supported") + var ErrPipelineShutdown = errors.New("append pipeline closed") + var ErrRaftShutdown = errors.New("raft is already shutdown") + var ErrTransportShutdown = errors.New("transport shutdown") + var ErrUnsupportedProtocol = errors.New("operation not supported with current protocol version") + func BootstrapCluster(conf *Config, logs LogStore, stable StableStore, snaps SnapshotStore, ...) error + func EncodeConfiguration(configuration Configuration) []byte + func HasExistingState(logs LogStore, stable StableStore, snaps SnapshotStore) (bool, error) + func MakeCluster(n int, t *testing.T, conf *Config) *cluster + func MakeClusterCustom(t *testing.T, opts *MakeClusterOpts) *cluster + func MakeClusterNoBootstrap(n int, t *testing.T, conf *Config) *cluster + func NewInmemTransport(addr ServerAddress) (ServerAddress, *InmemTransport) + func NewInmemTransportWithTimeout(addr ServerAddress, timeout time.Duration) (ServerAddress, *InmemTransport) + func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, ...) 
error + func ValidateConfig(config *Config) error + type AppendEntriesRequest struct + Entries []*Log + Leader []byte + LeaderCommitIndex uint64 + PrevLogEntry uint64 + PrevLogTerm uint64 + Term uint64 + func (r *AppendEntriesRequest) GetRPCHeader() RPCHeader + type AppendEntriesResponse struct + LastLog uint64 + NoRetryBackoff bool + Success bool + Term uint64 + func (r *AppendEntriesResponse) GetRPCHeader() RPCHeader + type AppendFuture interface + Request func() *AppendEntriesRequest + Response func() *AppendEntriesResponse + Start func() time.Time + type AppendPipeline interface + AppendEntries func(args *AppendEntriesRequest, resp *AppendEntriesResponse) (AppendFuture, error) + Close func() error + Consumer func() <-chan AppendFuture + type ApplyFuture interface + Response func() interface{} + type BatchingFSM interface + ApplyBatch func([]*Log) []interface{} + type Config struct + CommitTimeout time.Duration + ElectionTimeout time.Duration + HeartbeatTimeout time.Duration + LeaderLeaseTimeout time.Duration + LocalID ServerID + LogLevel string + LogOutput io.Writer + Logger hclog.Logger + MaxAppendEntries int + NoSnapshotRestoreOnStart bool + NotifyCh chan<- bool + ProtocolVersion ProtocolVersion + ShutdownOnRemove bool + SnapshotInterval time.Duration + SnapshotThreshold uint64 + TrailingLogs uint64 + func DefaultConfig() *Config + type Configuration struct + Servers []Server + func DecodeConfiguration(buf []byte) Configuration + func GetConfiguration(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, ...) 
(Configuration, error) + func ReadConfigJSON(path string) (Configuration, error) + func ReadPeersJSON(path string) (Configuration, error) + func (c *Configuration) Clone() (copy Configuration) + type ConfigurationChangeCommand uint8 + const AddNonvoter + const AddStaging + const DemoteVoter + const Promote + const RemoveServer + func (c ConfigurationChangeCommand) String() string + type ConfigurationFuture interface + Configuration func() Configuration + type ConfigurationStore interface + StoreConfiguration func(index uint64, configuration Configuration) + type DiscardSnapshotSink struct + func (d *DiscardSnapshotSink) Cancel() error + func (d *DiscardSnapshotSink) Close() error + func (d *DiscardSnapshotSink) ID() string + func (d *DiscardSnapshotSink) Write(b []byte) (int, error) + type DiscardSnapshotStore struct + func NewDiscardSnapshotStore() *DiscardSnapshotStore + func (d *DiscardSnapshotStore) Create(version SnapshotVersion, index, term uint64, configuration Configuration, ...) 
(SnapshotSink, error) + func (d *DiscardSnapshotStore) List() ([]*SnapshotMeta, error) + func (d *DiscardSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error) + type FSM interface + Apply func(*Log) interface{} + Restore func(io.ReadCloser) error + Snapshot func() (FSMSnapshot, error) + type FSMSnapshot interface + Persist func(sink SnapshotSink) error + Release func() + type FileSnapshotSink struct + func (s *FileSnapshotSink) Cancel() error + func (s *FileSnapshotSink) Close() error + func (s *FileSnapshotSink) ID() string + func (s *FileSnapshotSink) Write(b []byte) (int, error) + type FileSnapshotStore struct + func FileSnapTest(t *testing.T) (string, *FileSnapshotStore) + func NewFileSnapshotStore(base string, retain int, logOutput io.Writer) (*FileSnapshotStore, error) + func NewFileSnapshotStoreWithLogger(base string, retain int, logger hclog.Logger) (*FileSnapshotStore, error) + func (f *FileSnapshotStore) Create(version SnapshotVersion, index, term uint64, configuration Configuration, ...) (SnapshotSink, error) + func (f *FileSnapshotStore) List() ([]*SnapshotMeta, error) + func (f *FileSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error) + func (f *FileSnapshotStore) ReapSnapshots() error + type FilterFn func(o *Observation) bool + type Future interface + Error func() error + type IndexFuture interface + Index func() uint64 + type InmemSnapshotSink struct + func (s *InmemSnapshotSink) Cancel() error + func (s *InmemSnapshotSink) Close() error + func (s *InmemSnapshotSink) ID() string + func (s *InmemSnapshotSink) Write(p []byte) (n int, err error) + type InmemSnapshotStore struct + func NewInmemSnapshotStore() *InmemSnapshotStore + func (m *InmemSnapshotStore) Create(version SnapshotVersion, index, term uint64, configuration Configuration, ...) 
(SnapshotSink, error) + func (m *InmemSnapshotStore) List() ([]*SnapshotMeta, error) + func (m *InmemSnapshotStore) Open(id string) (*SnapshotMeta, io.ReadCloser, error) + type InmemStore struct + func NewInmemStore() *InmemStore + func (i *InmemStore) DeleteRange(min, max uint64) error + func (i *InmemStore) FirstIndex() (uint64, error) + func (i *InmemStore) Get(key []byte) ([]byte, error) + func (i *InmemStore) GetLog(index uint64, log *Log) error + func (i *InmemStore) GetUint64(key []byte) (uint64, error) + func (i *InmemStore) LastIndex() (uint64, error) + func (i *InmemStore) Set(key []byte, val []byte) error + func (i *InmemStore) SetUint64(key []byte, val uint64) error + func (i *InmemStore) StoreLog(log *Log) error + func (i *InmemStore) StoreLogs(logs []*Log) error + type InmemTransport struct + func (i *InmemTransport) AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, ...) error + func (i *InmemTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) + func (i *InmemTransport) Close() error + func (i *InmemTransport) Connect(peer ServerAddress, t Transport) + func (i *InmemTransport) Consumer() <-chan RPC + func (i *InmemTransport) DecodePeer(buf []byte) ServerAddress + func (i *InmemTransport) Disconnect(peer ServerAddress) + func (i *InmemTransport) DisconnectAll() + func (i *InmemTransport) EncodePeer(id ServerID, p ServerAddress) []byte + func (i *InmemTransport) InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, ...) error + func (i *InmemTransport) LocalAddr() ServerAddress + func (i *InmemTransport) RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, ...) error + func (i *InmemTransport) SetHeartbeatHandler(cb func(RPC)) + func (i *InmemTransport) TimeoutNow(id ServerID, target ServerAddress, args *TimeoutNowRequest, ...) 
error + type InstallSnapshotRequest struct + Configuration []byte + ConfigurationIndex uint64 + LastLogIndex uint64 + LastLogTerm uint64 + Leader []byte + Peers []byte + Size int64 + SnapshotVersion SnapshotVersion + Term uint64 + func (r *InstallSnapshotRequest) GetRPCHeader() RPCHeader + type InstallSnapshotResponse struct + Success bool + Term uint64 + func (r *InstallSnapshotResponse) GetRPCHeader() RPCHeader + type LeaderObservation struct + Leader ServerAddress + type LeadershipTransferFuture interface + type Log struct + Data []byte + Extensions []byte + Index uint64 + Term uint64 + Type LogType + type LogCache struct + func NewLogCache(capacity int, store LogStore) (*LogCache, error) + func (c *LogCache) DeleteRange(min, max uint64) error + func (c *LogCache) FirstIndex() (uint64, error) + func (c *LogCache) GetLog(idx uint64, log *Log) error + func (c *LogCache) LastIndex() (uint64, error) + func (c *LogCache) StoreLog(log *Log) error + func (c *LogCache) StoreLogs(logs []*Log) error + type LogStore interface + DeleteRange func(min, max uint64) error + FirstIndex func() (uint64, error) + GetLog func(index uint64, log *Log) error + LastIndex func() (uint64, error) + StoreLog func(log *Log) error + StoreLogs func(logs []*Log) error + type LogType uint8 + const LogAddPeerDeprecated + const LogBarrier + const LogCommand + const LogConfiguration + const LogNoop + const LogRemovePeerDeprecated + type LoopbackTransport interface + type MakeClusterOpts struct + Bootstrap bool + Conf *Config + ConfigStoreFSM bool + LongstopTimeout time.Duration + MakeFSMFunc func() FSM + Peers int + type MockFSM struct + func (m *MockFSM) Apply(log *Log) interface{} + func (m *MockFSM) Logs() [][]byte + func (m *MockFSM) Restore(inp io.ReadCloser) error + func (m *MockFSM) Snapshot() (FSMSnapshot, error) + type MockFSMConfigStore struct + func (m *MockFSMConfigStore) StoreConfiguration(index uint64, config Configuration) + type MockSnapshot struct + func (m *MockSnapshot) 
Persist(sink SnapshotSink) error + func (m *MockSnapshot) Release() + type NetworkTransport struct + TimeoutScale int + func NewNetworkTransport(stream StreamLayer, maxPool int, timeout time.Duration, logOutput io.Writer) *NetworkTransport + func NewNetworkTransportWithConfig(config *NetworkTransportConfig) *NetworkTransport + func NewNetworkTransportWithLogger(stream StreamLayer, maxPool int, timeout time.Duration, logger hclog.Logger) *NetworkTransport + func NewTCPTransport(bindAddr string, advertise net.Addr, maxPool int, timeout time.Duration, ...) (*NetworkTransport, error) + func NewTCPTransportWithConfig(bindAddr string, advertise net.Addr, config *NetworkTransportConfig) (*NetworkTransport, error) + func NewTCPTransportWithLogger(bindAddr string, advertise net.Addr, maxPool int, timeout time.Duration, ...) (*NetworkTransport, error) + func (n *NetworkTransport) AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, ...) error + func (n *NetworkTransport) AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) + func (n *NetworkTransport) Close() error + func (n *NetworkTransport) CloseStreams() + func (n *NetworkTransport) Consumer() <-chan RPC + func (n *NetworkTransport) DecodePeer(buf []byte) ServerAddress + func (n *NetworkTransport) EncodePeer(id ServerID, p ServerAddress) []byte + func (n *NetworkTransport) InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, ...) error + func (n *NetworkTransport) IsShutdown() bool + func (n *NetworkTransport) LocalAddr() ServerAddress + func (n *NetworkTransport) RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, ...) error + func (n *NetworkTransport) SetHeartbeatHandler(cb func(rpc RPC)) + func (n *NetworkTransport) TimeoutNow(id ServerID, target ServerAddress, args *TimeoutNowRequest, ...) 
error + type NetworkTransportConfig struct + Logger hclog.Logger + MaxPool int + ServerAddressProvider ServerAddressProvider + Stream StreamLayer + Timeout time.Duration + type Observation struct + Data interface{} + Raft *Raft + type Observer struct + func NewObserver(channel chan Observation, blocking bool, filter FilterFn) *Observer + func (or *Observer) GetNumDropped() uint64 + func (or *Observer) GetNumObserved() uint64 + type PeerObservation struct + Peer Server + Removed bool + type ProtocolVersion int + const ProtocolVersionMax + const ProtocolVersionMin + type RPC struct + Command interface{} + Reader io.Reader + RespChan chan<- RPCResponse + func (r *RPC) Respond(resp interface{}, err error) + type RPCHeader struct + ProtocolVersion ProtocolVersion + type RPCResponse struct + Error error + Response interface{} + type Raft struct + func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, ...) (*Raft, error) + func (r *Raft) AddNonvoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture + func (r *Raft) AddPeer(peer ServerAddress) Future + func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture + func (r *Raft) AppliedIndex() uint64 + func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture + func (r *Raft) ApplyLog(log Log, timeout time.Duration) ApplyFuture + func (r *Raft) Barrier(timeout time.Duration) Future + func (r *Raft) BootstrapCluster(configuration Configuration) Future + func (r *Raft) DemoteVoter(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture + func (r *Raft) DeregisterObserver(or *Observer) + func (r *Raft) GetConfiguration() ConfigurationFuture + func (r *Raft) LastContact() time.Time + func (r *Raft) LastIndex() uint64 + func (r *Raft) Leader() ServerAddress + func (r *Raft) LeaderCh() <-chan bool + func (r *Raft) LeadershipTransfer() Future + func (r *Raft) 
LeadershipTransferToServer(id ServerID, address ServerAddress) Future + func (r *Raft) RegisterObserver(or *Observer) + func (r *Raft) RemovePeer(peer ServerAddress) Future + func (r *Raft) RemoveServer(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture + func (r *Raft) Restore(meta *SnapshotMeta, reader io.Reader, timeout time.Duration) error + func (r *Raft) Shutdown() Future + func (r *Raft) Snapshot() SnapshotFuture + func (r *Raft) State() RaftState + func (r *Raft) Stats() map[string]string + func (r *Raft) String() string + func (r *Raft) VerifyLeader() Future + type RaftState uint32 + const Candidate + const Follower + const Leader + const Shutdown + func (s RaftState) String() string + type RequestVoteRequest struct + Candidate []byte + LastLogIndex uint64 + LastLogTerm uint64 + LeadershipTransfer bool + Term uint64 + func (r *RequestVoteRequest) GetRPCHeader() RPCHeader + type RequestVoteResponse struct + Granted bool + Peers []byte + Term uint64 + func (r *RequestVoteResponse) GetRPCHeader() RPCHeader + type Server struct + Address ServerAddress + ID ServerID + Suffrage ServerSuffrage + type ServerAddress string + func NewInmemAddr() ServerAddress + type ServerAddressProvider interface + ServerAddr func(id ServerID) (ServerAddress, error) + type ServerID string + type ServerSuffrage int + const Nonvoter + const Staging + const Voter + func (s ServerSuffrage) String() string + type SnapshotFuture interface + Open func() (*SnapshotMeta, io.ReadCloser, error) + type SnapshotMeta struct + Configuration Configuration + ConfigurationIndex uint64 + ID string + Index uint64 + Peers []byte + Size int64 + Term uint64 + Version SnapshotVersion + type SnapshotSink interface + Cancel func() error + ID func() string + type SnapshotStore interface + Create func(version SnapshotVersion, index, term uint64, configuration Configuration, ...) 
(SnapshotSink, error) + List func() ([]*SnapshotMeta, error) + Open func(id string) (*SnapshotMeta, io.ReadCloser, error) + type SnapshotVersion int + const SnapshotVersionMax + const SnapshotVersionMin + type StableStore interface + Get func(key []byte) ([]byte, error) + GetUint64 func(key []byte) (uint64, error) + Set func(key []byte, val []byte) error + SetUint64 func(key []byte, val uint64) error + type StreamLayer interface + Dial func(address ServerAddress, timeout time.Duration) (net.Conn, error) + type TCPStreamLayer struct + func (t *TCPStreamLayer) Accept() (c net.Conn, err error) + func (t *TCPStreamLayer) Addr() net.Addr + func (t *TCPStreamLayer) Close() (err error) + func (t *TCPStreamLayer) Dial(address ServerAddress, timeout time.Duration) (net.Conn, error) + type TimeoutNowRequest struct + func (r *TimeoutNowRequest) GetRPCHeader() RPCHeader + type TimeoutNowResponse struct + func (r *TimeoutNowResponse) GetRPCHeader() RPCHeader + type Transport interface + AppendEntries func(id ServerID, target ServerAddress, args *AppendEntriesRequest, ...) error + AppendEntriesPipeline func(id ServerID, target ServerAddress) (AppendPipeline, error) + Consumer func() <-chan RPC + DecodePeer func([]byte) ServerAddress + EncodePeer func(id ServerID, addr ServerAddress) []byte + InstallSnapshot func(id ServerID, target ServerAddress, args *InstallSnapshotRequest, ...) error + LocalAddr func() ServerAddress + RequestVote func(id ServerID, target ServerAddress, args *RequestVoteRequest, ...) error + SetHeartbeatHandler func(cb func(rpc RPC)) + TimeoutNow func(id ServerID, target ServerAddress, args *TimeoutNowRequest, ...) error + type WithClose interface + Close func() error + type WithPeers interface + Connect func(peer ServerAddress, t Transport) + Disconnect func(peer ServerAddress) + DisconnectAll func() + type WithRPCHeader interface + GetRPCHeader func() RPCHeader + type WrappingFSM interface + Underlying func() FSM