Documentation ¶
Overview ¶
Package libp2praft implements the go-libp2p-consensus interface wrapping the github.com/hashicorp/raft implementation, providing a custom generic FSM to handle states and generic operations and giving the user a go-libp2p-based network transport to use.
The main entry points to this library are the Consensus and Actor types. Usually, the first step is to create the Consensus, then use the *Consensus.FSM() object to initialize a Raft instance, along with the Libp2pTransport. With a Raft instance, an Actor can be created and then used with Consensus.SetActor(). From this point, the consensus system is ready to use.
It is IMPORTANT to make a few notes about the types of objects to be used as consensus.State and consensus.Op, since the go-libp2p-consensus interface does not make many assumptions about them (consensus.State being an empty interface).
Raft will need to send serialized version of the state and the operation objects. Default serialization uses MsgPack and requires that relevant fields are exported. Unexported fields will not be serialized and therefore not received in other nodes. Their local value will never change either. This includes the fields from children structs etc. Therefore, it is recommended to simplify user defined types like consensus.Op and consensus.State as much as possible and declare all relevant fields as exported.
Alternative, it is possible to use a custom serialization and deserialization mechanism by having consensus.State and consensus.Op implement the Marshable interface. This provides full control about how things are sent on the wire.
A consensus.Op ApplyTo() operation may return an error. This means that, while the operation is agreed-upon, the resulting state cannot be produced. This marks the state in that node as dirty but does not removes the operation itself. See CommitOp() for more details.
The underlying state for consensus.State should be a pointer, otherwise some operations won't work. Once provided, the state should only be modifed by this library.
Example (Consensus) ¶
package main import ( "context" "fmt" "io" "log" "testing" "time" "github.com/hashicorp/raft" "github.com/libp2p/go-libp2p" consensus "github.com/libp2p/go-libp2p-consensus" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" ) var raftTmpFolder = "testing_tmp" var raftQuiet = true type raftState struct { Msg string } type testOperation struct { Append string } func (o testOperation) ApplyTo(s consensus.State) (consensus.State, error) { raftSt := s.(*raftState) return &raftState{Msg: raftSt.Msg + o.Append}, nil } // wait 10 seconds for a leader. func waitForLeader(t *testing.T, r *raft.Raft) { obsCh := make(chan raft.Observation, 1) observer := raft.NewObserver(obsCh, false, nil) r.RegisterObserver(observer) defer r.DeregisterObserver(observer) // New Raft does not allow leader observation directy // What's worse, there will be no notification that a new // leader was elected because observations are set before // setting the Leader and only when the RaftState has changed. // Therefore, we need a ticker. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() ticker := time.NewTicker(time.Second / 2) defer ticker.Stop() for { select { case obs := <-obsCh: switch obs.Data.(type) { case raft.RaftState: if leaderAddr, _ := r.LeaderWithID(); leaderAddr != "" { return } } case <-ticker.C: if leaderAddr, _ := r.LeaderWithID(); leaderAddr != "" { return } case <-ctx.Done(): t.Fatal("timed out waiting for Leader") } } } func shutdown(t *testing.T, r *raft.Raft) { err := r.Shutdown().Error() if err != nil { t.Fatal(err) } } // Create a quick raft instance func makeTestingRaft(t *testing.T, h host.Host, pids []peer.ID, op consensus.Op) (*raft.Raft, *Consensus, *raft.NetworkTransport) { // -- Create the consensus with no actor attached var consensus *Consensus if op != nil { consensus = NewOpLog(&raftState{}, op) } else { consensus = NewConsensus(&raftState{"i am not consensuated"}) } // -- // -- Create Raft servers configuration servers := make([]raft.Server, len(pids)) for i, pid := range pids { servers[i] = raft.Server{ Suffrage: raft.Voter, ID: raft.ServerID(pid.String()), Address: raft.ServerAddress(pid.String()), } } serverConfig := raft.Configuration{ Servers: servers, } // -- // -- Create LibP2P transports Raft transport, err := NewLibp2pTransport(h, 2*time.Second) if err != nil { t.Fatal(err) } // -- // -- Configuration config := raft.DefaultConfig() if raftQuiet { config.LogOutput = io.Discard config.Logger = nil } config.LocalID = raft.ServerID(h.ID().String()) // -- // -- SnapshotStore snapshots, err := raft.NewFileSnapshotStore(raftTmpFolder, 3, nil) if err != nil { t.Fatal(err) } // -- Log store and stable store: we use inmem. logStore := raft.NewInmemStore() // -- // -- Boostrap everything if necessary bootstrapped, err := raft.HasExistingState(logStore, logStore, snapshots) if err != nil { t.Fatal(err) } if !bootstrapped { // Bootstrap cluster first raft.BootstrapCluster(config, logStore, logStore, snapshots, transport, serverConfig) } else { t.Log("Already initialized!!") } // -- // Create Raft instance. Our consensus.FSM() provides raft.FSM // implementation raft, err := raft.NewRaft(config, consensus.FSM(), logStore, logStore, snapshots, transport) if err != nil { t.Fatal(err) } return raft, consensus, transport } func main() { // This example shows how to use go-libp2p-raft to create a cluster // which agrees on a State. In order to do it, it defines a state, // creates three Raft nodes and launches them. We call a function which // lets the cluster leader repeteadly update the state. At the // end of the execution we verify that all members have agreed on the // same state. // // Some error handling has been excluded for simplicity // Declare an object which represents the State. // Note that State objects should have public/exported fields, // as they are [de]serialized. type raftState struct { Value int } // error handling ommitted newPeer := func(listenPort int) host.Host { h, _ := libp2p.New( libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)), ) return h } // Create peers and make sure they know about each others. peer1 := newPeer(9997) peer2 := newPeer(9998) peer3 := newPeer(9999) defer peer1.Close() defer peer2.Close() defer peer3.Close() peer1.Peerstore().AddAddrs(peer2.ID(), peer2.Addrs(), peerstore.PermanentAddrTTL) peer1.Peerstore().AddAddrs(peer3.ID(), peer3.Addrs(), peerstore.PermanentAddrTTL) peer2.Peerstore().AddAddrs(peer1.ID(), peer1.Addrs(), peerstore.PermanentAddrTTL) peer2.Peerstore().AddAddrs(peer3.ID(), peer3.Addrs(), peerstore.PermanentAddrTTL) peer3.Peerstore().AddAddrs(peer1.ID(), peer1.Addrs(), peerstore.PermanentAddrTTL) peer3.Peerstore().AddAddrs(peer2.ID(), peer2.Addrs(), peerstore.PermanentAddrTTL) // Create the consensus instances and initialize them with a state. // Note that state is just used for local initialization, and that, // only states submitted via CommitState() alters the state of the // cluster. consensus1 := NewConsensus(&raftState{3}) consensus2 := NewConsensus(&raftState{3}) consensus3 := NewConsensus(&raftState{3}) // Create LibP2P transports Raft transport1, err := NewLibp2pTransport(peer1, time.Minute) if err != nil { fmt.Println(err) return } transport2, err := NewLibp2pTransport(peer2, time.Minute) if err != nil { fmt.Println(err) return } transport3, err := NewLibp2pTransport(peer3, time.Minute) if err != nil { fmt.Println(err) return } defer transport1.Close() defer transport2.Close() defer transport3.Close() // Create Raft servers configuration for bootstrapping the cluster // Note that both IDs and Address are set to the Peer ID. servers := make([]raft.Server, 0) for _, h := range []host.Host{peer1, peer2, peer3} { servers = append(servers, raft.Server{ Suffrage: raft.Voter, ID: raft.ServerID(h.ID().String()), Address: raft.ServerAddress(h.ID().String()), }) } serversCfg := raft.Configuration{Servers: servers} // Create Raft Configs. The Local ID is the PeerOID config1 := raft.DefaultConfig() config1.LogOutput = io.Discard config1.Logger = nil config1.LocalID = raft.ServerID(peer1.ID().String()) config2 := raft.DefaultConfig() config2.LogOutput = io.Discard config2.Logger = nil config2.LocalID = raft.ServerID(peer2.ID().String()) config3 := raft.DefaultConfig() config3.LogOutput = io.Discard config3.Logger = nil config3.LocalID = raft.ServerID(peer3.ID().String()) // Create snapshotStores. Use FileSnapshotStore in production. snapshots1 := raft.NewInmemSnapshotStore() snapshots2 := raft.NewInmemSnapshotStore() snapshots3 := raft.NewInmemSnapshotStore() // Create the InmemStores for use as log store and stable store. logStore1 := raft.NewInmemStore() logStore2 := raft.NewInmemStore() logStore3 := raft.NewInmemStore() // Bootsrap the stores with the serverConfigs raft.BootstrapCluster(config1, logStore1, logStore1, snapshots1, transport1, serversCfg.Clone()) raft.BootstrapCluster(config2, logStore2, logStore2, snapshots2, transport2, serversCfg.Clone()) raft.BootstrapCluster(config3, logStore3, logStore3, snapshots3, transport3, serversCfg.Clone()) // Create Raft objects. Our consensus provides an implementation of // Raft.FSM raft1, err := raft.NewRaft(config1, consensus1.FSM(), logStore1, logStore1, snapshots1, transport1) if err != nil { log.Fatal(err) } raft2, err := raft.NewRaft(config2, consensus2.FSM(), logStore2, logStore2, snapshots2, transport2) if err != nil { log.Fatal(err) } raft3, err := raft.NewRaft(config3, consensus3.FSM(), logStore3, logStore3, snapshots3, transport3) if err != nil { log.Fatal(err) } // Create the actors using the Raft nodes actor1 := NewActor(raft1) actor2 := NewActor(raft2) actor3 := NewActor(raft3) // Set the actors so that we can CommitState() and GetCurrentState() consensus1.SetActor(actor1) consensus2.SetActor(actor2) consensus3.SetActor(actor3) // This function updates the cluster state commiting 1000 updates. updateState := func(c *Consensus) { nUpdates := 0 for { if nUpdates >= 1000 { break } newState := &raftState{nUpdates * 2} // CommitState() blocks until the state has been // agreed upon by everyone agreedState, err := c.CommitState(newState) if err != nil { fmt.Println(err) continue } if agreedState == nil { fmt.Println("agreedState is nil: commited on a non-leader?") continue } agreedRaftState := agreedState.(*raftState) nUpdates++ if nUpdates%200 == 0 { fmt.Printf("Performed %d updates. Current state value: %d\n", nUpdates, agreedRaftState.Value) } } } // Provide some time for leader election time.Sleep(5 * time.Second) // Run the 1000 updates on the leader // Barrier() will wait until updates have been applied if actor1.IsLeader() { updateState(consensus1) } else if actor2.IsLeader() { updateState(consensus2) } else if actor3.IsLeader() { updateState(consensus3) } // Wait for updates to arrive. time.Sleep(5 * time.Second) // Shutdown raft and wait for it to complete // (ignoring errors) raft1.Shutdown().Error() raft2.Shutdown().Error() raft3.Shutdown().Error() // Final states finalState1, err := consensus1.GetCurrentState() if err != nil { fmt.Println(err) return } finalState2, err := consensus2.GetCurrentState() if err != nil { fmt.Println(err) return } finalState3, err := consensus3.GetCurrentState() if err != nil { fmt.Println(err) return } finalRaftState1 := finalState1.(*raftState) finalRaftState2 := finalState2.(*raftState) finalRaftState3 := finalState3.(*raftState) fmt.Printf("Raft1 final state: %d\n", finalRaftState1.Value) fmt.Printf("Raft2 final state: %d\n", finalRaftState2.Value) fmt.Printf("Raft3 final state: %d\n", finalRaftState3.Value) }
Output: Performed 200 updates. Current state value: 398 Performed 400 updates. Current state value: 798 Performed 600 updates. Current state value: 1198 Performed 800 updates. Current state value: 1598 Performed 1000 updates. Current state value: 1998 Raft1 final state: 1998 Raft2 final state: 1998 Raft3 final state: 1998
Index ¶
- Constants
- Variables
- func DecodeSnapshot(state consensus.State, r io.Reader) error
- func EncodeSnapshot(state consensus.State, w io.Writer) error
- func NewLibp2pTransport(h host.Host, timeout time.Duration) (*raft.NetworkTransport, error)
- type Actor
- type Consensus
- func (opLog *Consensus) CommitOp(op consensus.Op) (consensus.State, error)
- func (c *Consensus) CommitState(state consensus.State) (consensus.State, error)
- func (c *Consensus) FSM() *FSM
- func (c *Consensus) GetCurrentState() (consensus.State, error)
- func (opLog *Consensus) GetLogHead() (consensus.State, error)
- func (opLog *Consensus) Rollback(state consensus.State) error
- func (c *Consensus) SetActor(actor consensus.Actor)
- func (c *Consensus) Subscribe() <-chan struct{}
- func (c *Consensus) Unsubscribe()
- type FSM
- type HcLogToLogger
- func (log *HcLogToLogger) Debug(msg string, args ...interface{})
- func (log *HcLogToLogger) Error(msg string, args ...interface{})
- func (l *HcLogToLogger) GetLevel() hclog.Level
- func (log *HcLogToLogger) ImpliedArgs() []interface{}
- func (log *HcLogToLogger) Info(msg string, args ...interface{})
- func (log *HcLogToLogger) IsDebug() bool
- func (log *HcLogToLogger) IsError() bool
- func (log *HcLogToLogger) IsInfo() bool
- func (log *HcLogToLogger) IsTrace() bool
- func (log *HcLogToLogger) IsWarn() bool
- func (log *HcLogToLogger) Log(level hclog.Level, msg string, args ...interface{})
- func (log *HcLogToLogger) Name() string
- func (log *HcLogToLogger) Named(name string) hclog.Logger
- func (log *HcLogToLogger) ResetNamed(name string) hclog.Logger
- func (log *HcLogToLogger) SetLevel(level hclog.Level)
- func (log *HcLogToLogger) StandardLogger(opts *hclog.StandardLoggerOptions) *log.Logger
- func (log *HcLogToLogger) StandardWriter(opts *hclog.StandardLoggerOptions) io.Writer
- func (log *HcLogToLogger) Trace(msg string, args ...interface{})
- func (log *HcLogToLogger) Warn(msg string, args ...interface{})
- func (log *HcLogToLogger) With(args ...interface{}) hclog.Logger
- type Marshable
Examples ¶
Constants ¶
const RaftProtocol protocol.ID = "/raft/1.0.0/rpc"
Variables ¶
var ErrNoState = errors.New("no state has been agreed upon yet")
ErrNoState is returned when no state has been agreed upon by the consensus protocol
var MaxSubscriberCh = 128
MaxSubscriberCh indicates how much buffering the subscriber channel has.
var SetStateTimeout = 1 * time.Second
SetStateTimeout specifies how long before giving up on setting a state
Functions ¶
func DecodeSnapshot ¶
DecodeSnapshot de-serializes a state encoded with EncodeSnapshot onto the given state. It is used by our raft's FSM implementation which allows raft to read snapshots. If state implements Marshable, it will use the Unmarshal function. Please make sure that the state's underlying type is a pointer.
func EncodeSnapshot ¶
EncodeSnapshot serializes a state and is used by our raft's FSM implementation to describe the format raft stores snapshots on disk. In the state implements Marshable, it will use the Marshal function.
func NewLibp2pTransport ¶
Types ¶
type Actor ¶
Actor implements a consensus.Actor, allowing to SetState in a libp2p Consensus system. In order to do this it uses hashicorp/raft implementation of the Raft algorithm.
func (*Actor) Leader ¶
Leader returns the LibP2P ID of the Raft leader or an error if there is no leader.
func (*Actor) SetState ¶
SetState attempts to set the state of the cluster to the state represented by the given Node. It will block until the state is commited, and will then return then the new state.
This does not mean that the new state is already available in all the nodes in the cluster, but that it will be at some point because it is part of the authoritative log.
Only the Raft leader can set the state. Otherwise, an error will be returned.
type Consensus ¶
type Consensus struct {
// contains filtered or unexported fields
}
Consensus implements both the go-libp2p-consensus Consensus and the OpLogConsensus interfaces. This is because the Consensus interface is just a particular case of the OpLog interface, where the operation applied holds the new version of the state and replaces the current one with it.
func NewConsensus ¶
NewConsensus returns a new consensus. The state is provided so that the appropiate internal structures can be initialized. The underlying State type should be a pointer, otherwise some operations will not work.
Note that this initial state is not agreed upon in the cluster and that GetCurrentState() will return an error until a state is agreed upon. Only states submitted via CommitState() are agreed upon.
The state can optionally implement the Marshable interface. The methods will be used to serialize and deserialize Raft snapshots. If Marshable is not implemented, the state will be [de]serialized using Msgpack.
We recommend using OpLog when handling very big states, as otherwise the full state will need to be dump into memory on every commit, before being sent on the wire as a single Raft log entry.
func NewOpLog ¶
NewOpLog returns a new OpLog. Because the State and the Ops are generic, and we know nothing about them, they need to be provided in order to initialize internal structures with the right types. Both the state and the op parameters are not used nor agreed upon just yet. Both the state and the op underlying types should be pointers to something, otherwise expect some misbehaviours.
It is important to note that the system agrees on an operation log, but the resulting state can only be considered agreed-upon if all the operations in the log can be or were successfully applied to it (with Op.ApplyTo()). See the notes in CommitOp() for more information.
The state and the op can optionally implement the Marshable interface which allows user-provided object to decide how they are serialized and deserialized.
We recommend keeping operations as small as possible. Note that operations need to be fully serialized copied on memory before being sent (due to Raft requirements).
func (*Consensus) CommitOp ¶
CommitOp submits a new operation to the system. If the operation is agreed-upon, then and only then will it call ApplyTo() and modify the current state (log head).
If ApplyTo() fails, the operation stays nevertheless in the log, but the state of the system is marked as invalid and nil is returned. From that point the state is marked as inconsistent and calls to GetLogHead() will fail for this node, even though updates will be processed as usual.
Inconsistent states can be rescued using Rollback(). The underlying object to the returned State will be the one provided during initialization.
func (*Consensus) CommitState ¶
CommitState pushes a new state to the system and returns the state the system has agreed upon. It will block until that happens.
Note that only the Raft leader can commit a state.
func (*Consensus) FSM ¶
FSM returns the raft.FSM implementation provided by go-libp2p-raft. It is necessary to initialize raft with this FSM.
func (*Consensus) GetCurrentState ¶
GetCurrentState returns the upon-agreed state of the system. It will return an error when no state has been agreed upon or when the state cannot be ensured to be that on which the rest of the system has agreed-upon. The underlying state will be the same as provided initialization.
func (*Consensus) GetLogHead ¶
GetLogHead returns the newest known state of the log. It will return an error when no state has been agreed upon or when the state cannot be ensured to be that on which the rest of the system has agreed-upon.
func (*Consensus) Rollback ¶
Rollback hammers the provided state into the system. It does not un-do any operations nor checks that the given state was previously agreed-upon. A special rollback operation gets added to the log, like any other operation. A successful rollback marks an inconsistent state as valid again.
Note that the full state needs to be loaded onto memory (like an operation) so this is potentially dangerous with very large states.
func (*Consensus) SetActor ¶
SetActor changes the actor in charge of submitting new states to the system.
func (*Consensus) Subscribe ¶
func (c *Consensus) Subscribe() <-chan struct{}
Subscribe returns a channel which is notified on every state update.
func (*Consensus) Unsubscribe ¶
func (c *Consensus) Unsubscribe()
Unsubscribe closes the channel returned upon Subscribe() (if any).
type FSM ¶
type FSM struct {
// contains filtered or unexported fields
}
FSM implements a minimal raft.FSM that holds a generic consensus.State and applies generic Ops to it. The state can be serialized/deserialized, snappshotted and restored. FSM is used by Consensus to keep track of the state of an OpLog. Please use the value returned by Consensus.FSM() to initialize Raft. Do not use this object directly.
type HcLogToLogger ¶ added in v0.3.0
type HcLogToLogger struct {
// contains filtered or unexported fields
}
HcLogToLogger implements github.com/hashicorp/go-hclog
func (*HcLogToLogger) Debug ¶ added in v0.3.0
func (log *HcLogToLogger) Debug(msg string, args ...interface{})
func (*HcLogToLogger) Error ¶ added in v0.3.0
func (log *HcLogToLogger) Error(msg string, args ...interface{})
func (*HcLogToLogger) GetLevel ¶ added in v0.5.0
func (l *HcLogToLogger) GetLevel() hclog.Level
func (*HcLogToLogger) ImpliedArgs ¶ added in v0.3.0
func (log *HcLogToLogger) ImpliedArgs() []interface{}
func (*HcLogToLogger) Info ¶ added in v0.3.0
func (log *HcLogToLogger) Info(msg string, args ...interface{})
func (*HcLogToLogger) IsDebug ¶ added in v0.3.0
func (log *HcLogToLogger) IsDebug() bool
func (*HcLogToLogger) IsError ¶ added in v0.3.0
func (log *HcLogToLogger) IsError() bool
func (*HcLogToLogger) IsInfo ¶ added in v0.3.0
func (log *HcLogToLogger) IsInfo() bool
func (*HcLogToLogger) IsTrace ¶ added in v0.3.0
func (log *HcLogToLogger) IsTrace() bool
func (*HcLogToLogger) IsWarn ¶ added in v0.3.0
func (log *HcLogToLogger) IsWarn() bool
func (*HcLogToLogger) Log ¶ added in v0.3.0
func (log *HcLogToLogger) Log(level hclog.Level, msg string, args ...interface{})
func (*HcLogToLogger) Name ¶ added in v0.3.0
func (log *HcLogToLogger) Name() string
func (*HcLogToLogger) Named ¶ added in v0.3.0
func (log *HcLogToLogger) Named(name string) hclog.Logger
func (*HcLogToLogger) ResetNamed ¶ added in v0.3.0
func (log *HcLogToLogger) ResetNamed(name string) hclog.Logger
func (*HcLogToLogger) SetLevel ¶ added in v0.3.0
func (log *HcLogToLogger) SetLevel(level hclog.Level)
func (*HcLogToLogger) StandardLogger ¶ added in v0.3.0
func (log *HcLogToLogger) StandardLogger(opts *hclog.StandardLoggerOptions) *log.Logger
func (*HcLogToLogger) StandardWriter ¶ added in v0.3.0
func (log *HcLogToLogger) StandardWriter(opts *hclog.StandardLoggerOptions) io.Writer
func (*HcLogToLogger) Trace ¶ added in v0.3.0
func (log *HcLogToLogger) Trace(msg string, args ...interface{})
func (*HcLogToLogger) Warn ¶ added in v0.3.0
func (log *HcLogToLogger) Warn(msg string, args ...interface{})
func (*HcLogToLogger) With ¶ added in v0.3.0
func (log *HcLogToLogger) With(args ...interface{}) hclog.Logger
type Marshable ¶
type Marshable interface { // Marshal serializes the state and writes it in the Writer. Marshal(io.Writer) error // Unmarshal deserializes the state from the given Reader. The // unmarshaled object must fully replace the original data. We // recommend that Unmarshal errors when the object being deserialized // produces a non-consistent result (for example by trying to // deserialized a struct with wrong fields onto a different struct. // Rollback operations will attempt to deserialize State objects on Op // objects first, thus this case must error. Unmarshal(io.Reader) error }
Marshable is an interface to be implemented by consensus.States and consensus.Op objects that wish to choose their serialization format for Raft snapshots. When needing to serialize States or Ops, Marhsal and Unmarshal methods will be used when provided. Otherwise, a default serialization strategy will be used (using Msgpack).