Documentation ¶
Index ¶
- Constants
- Variables
- func EtcdIDToString(id uint64) string
- func GetConstructor(cfg *config.Config, hub *component.ComponentHub, cdb consensus.ChainWAL, ...) consensus.Constructor
- func GetName() string
- func Init(raftCfg *config.RaftConfig)
- func MaxUint64(x, y uint64) uint64
- func RecoverExit()
- func ValidateGenesis(genesis *types.Genesis) error
- type BlockFactory
- func (bf *BlockFactory) BlockFactory() consensus.BlockFactory
- func (bf *BlockFactory) ClusterInfo(bestBlockHash []byte) *types.GetClusterInfoResponse
- func (bf *BlockFactory) ConfChange(req *types.MembershipChange) (*consensus.Member, error)
- func (bf *BlockFactory) ConfChangeInfo(requestID uint64) (*types.ConfChangeProgress, error)
- func (bf *BlockFactory) ConsensusInfo() *types.ConsensusInfo
- func (bf *BlockFactory) GetType() consensus.ConsensusType
- func (bf *BlockFactory) HasWAL() bool
- func (bf *BlockFactory) Info() string
- func (bf *BlockFactory) InitCluster(cfg *config.Config) error
- func (bf *BlockFactory) IsBlockValid(block *types.Block, bestBlock *types.Block) error
- func (bf *BlockFactory) IsConnectedBlock(block *types.Block) bool
- func (bf *BlockFactory) IsForkEnable() bool
- func (bf *BlockFactory) IsTransactionValid(tx *types.Tx) bool
- func (bf *BlockFactory) JobQueue() chan<- interface{}
- func (bf *BlockFactory) MakeConfChangeProposal(req *types.MembershipChange) (*consensus.ConfChangePropose, error)
- func (bf *BlockFactory) NeedNotify() bool
- func (bf *BlockFactory) NeedReorganization(rootNo types.BlockNo) bool
- func (bf *BlockFactory) QueueJob(now time.Time, jq chan<- interface{})
- func (bf *BlockFactory) QuitChan() chan interface{}
- func (bf *BlockFactory) RaftAccessor() consensus.AergoRaftAccessor
- func (bf *BlockFactory) Save(tx consensus.TxWriter) error
- func (bf *BlockFactory) Start()
- func (bf *BlockFactory) Ticker() *time.Ticker
- func (bf *BlockFactory) Update(block *types.Block)
- func (bf *BlockFactory) VerifySign(block *types.Block) error
- func (bf *BlockFactory) VerifyTimestamp(*types.Block) bool
- type ChainSnapshotter
- type Cluster
- func GetClusterInfo(hs *component.ComponentHub, bestHash []byte) (*Cluster, *types.HardStateInfo, error)
- func NewCluster(chainID []byte, bf *BlockFactory, raftName string, p2pPeerID types.PeerID, ...) *Cluster
- func NewClusterFromMemberAttrs(clusterID uint64, chainID []byte, memberAttrs []*types.MemberAttr) (*Cluster, error)
- func (cl *Cluster) AddInitialMembers(mbrs []*types.MemberAttr) error
- func (cl *Cluster) AfterConfChange(cc *raftpb.ConfChange, member *consensus.Member, err error)
- func (cl *Cluster) AppliedMembers() *Members
- func (cl *Cluster) ChangeMembership(req *types.MembershipChange, nowait bool) (*consensus.Member, error)
- func (cl *Cluster) ClusterID() uint64
- func (cl *Cluster) GenerateID(useBackup bool)
- func (cl *Cluster) IsIDRemoved(id uint64) bool
- func (cl *Cluster) Members() *Members
- func (cl *Cluster) NewMemberFromAddReq(req *types.MembershipChange) (*consensus.Member, error)
- func (cl *Cluster) NewMemberFromRemoveReq(req *types.MembershipChange) (*consensus.Member, error)
- func (cl *Cluster) NodeID() uint64
- func (cl *Cluster) NodeName() string
- func (cl *Cluster) NodePeerID() string
- func (cl *Cluster) Quorum() uint32
- func (cl *Cluster) Recover(snapshot *raftpb.Snapshot) (bool, error)
- func (cl *Cluster) RecoverIdentity(id *consensus.RaftIdentity) error
- func (cl *Cluster) RemovedMembers() *Members
- func (cl *Cluster) ResetMembers()
- func (cl *Cluster) SetClusterID(clusterid uint64)
- func (cl *Cluster) SetNodeID(nodeid uint64)
- func (cl *Cluster) SetThisNodeID() error
- func (cl *Cluster) ValidateAndMergeExistingCluster(existingCl *Cluster) bool
- type ClusterProgress
- type CommitProgress
- type ErrorMembershipChange
- type HttpTransportWrapper
- type LeaderStatus
- type MemberProgress
- type MemberProgressState
- type Members
- type NotifyFn
- type Proposed
- type RaftInfo
- type RaftLogger
- func (l *RaftLogger) Debug(args ...interface{})
- func (l *RaftLogger) Debugf(format string, args ...interface{})
- func (l *RaftLogger) Error(args ...interface{})
- func (l *RaftLogger) Errorf(format string, args ...interface{})
- func (l RaftLogger) Fatal(args ...interface{})
- func (l *RaftLogger) Fatalf(format string, args ...interface{})
- func (l *RaftLogger) Info(args ...interface{})
- func (l *RaftLogger) Infof(format string, args ...interface{})
- func (l *RaftLogger) Panic(args ...interface{})
- func (l *RaftLogger) Panicf(format string, args ...interface{})
- func (l *RaftLogger) Warning(args ...interface{})
- func (l *RaftLogger) Warningf(format string, args ...interface{})
- type RaftOperator
- type RaftServerState
- type Transporter
- type WalDB
- type Work
Constants ¶
const ( MembersNameInit = "init" MembersNameApplied = "applied" MembersNameRemoved = "removed" InvalidClusterID = 0 )
const ( DefaultBlockFactoryTickMs = 100 MinBlockFactoryTickMs = 10 DefaultTickMS = time.Millisecond * 50 DefaultElectionTickCount = 10 DefaultSlowNodeGap = 100 DefaultSnapFrequency = 30 )
const ( BackendP2P = "aergop2p" BackendHTTP = "http" )
const ( RaftServerStateRestart = iota RaftServerStateNewCluster RaftServerStateJoinCluster )
const (
HasNoLeader uint64 = 0
)
Variables ¶
var ( ErrClusterNotReady = errors.New("cluster is not ready") ErrNotRaftLeader = errors.New("this node is not leader") ErrInvalidConsensusName = errors.New("invalid consensus name") ErrCancelGenerate = errors.New("cancel generating block because work becomes stale") )
var ( MaxConfChangeTimeOut = time.Second * 100 ErrClusterHasNoMember = errors.New("cluster has no member") ErrNotExistRaftMember = errors.New("not exist member of raft cluster") ErrNoEnableSyncPeer = errors.New("no peer to sync chain") ErrMemberAlreadyApplied = errors.New("member is already added") ErrInvalidMembershipReqType = errors.New("invalid type of membership change request") ErrPendingConfChange = errors.New("pending membership change request is in progree. try again when it is finished") ErrConChangeTimeOut = errors.New("timeouted membership change request") ErrConfChangeChannelBusy = errors.New("channel of conf change propose is busy") ErrCCMemberIsNil = errors.New("memeber is nil") ErrNotMatchedRaftName = errors.New("mismatched name of raft identity") ErrNotMatchedRaftPeerID = errors.New("mismatched peerid of raft identity") ErrNotExitRaftProgress = errors.New("progress of this node doesn't exist") ErrUnhealtyNodeExist = errors.New("can't add some node if unhealthy nodes exist") ErrRemoveHealthyNode = errors.New("remove of a healthy node may cause the cluster to hang") )
var ( ErrEmptyBPs = errors.New("BP list is empty") ErrNotIncludedRaftMember = errors.New("this node isn't included in initial raft members") ErrDupBP = errors.New("raft bp description is duplicated") ErrInvalidRaftPeerID = errors.New("peerID of current raft bp is not equals to p2p configure") )
var ( RaftTick = DefaultTickMS // blockIntervalMs is the block generation interval in milliseconds. RaftSkipEmptyBlock = false BlockFactoryTickMs time.Duration BlockIntervalMs time.Duration ConfSnapFrequency uint64 = DefaultSnapFrequency ConfSnapshotCatchUpEntriesN uint64 = ConfSnapFrequency ElectionTickCount = DefaultElectionTickCount MaxSlowNodeGap uint64 = DefaultSlowNodeGap // Criteria for determining whether the server is in a slow state StopDupCommit = false )
var ( MaxTimeOutCluter = time.Second * 10 MaxTryGetCluster = 3 ErrGetClusterReplyC = errors.New("reply channel of getcluster request is closed") ErrGetClusterTimeout = errors.New("timeout for getcluster") ErrGetClusterEmpty = errors.New("getcluster reply is empty") ErrGetClusterFail = errors.New("failed to get cluster info") )
var ( ErrRaftNotReady = errors.New("raft library is not initialized") ErrCCAlreadyApplied = errors.New("conf change entry is already applied") ErrInvalidMember = errors.New("member of conf change is invalid") ErrCCAlreadyAdded = errors.New("member has already added") ErrCCAlreadyRemoved = errors.New("member has already removed") ErrCCNoMemberToRemove = errors.New("there is no member to remove") ErrEmptySnapshot = errors.New("received empty snapshot") ErrInvalidRaftIdentity = errors.New("raft identity is not set") ErrProposeNilBlock = errors.New("proposed block is nil") )
var ( DfltTimeWaitPeerLive = time.Second * 5 ErrNotMsgSnap = errors.New("not pb.MsgSnap") ErrClusterMismatchConfState = errors.New("members of cluster doesn't match with raft confstate") )
var ( ErrInvalidEntry = errors.New("Invalid raftpb.entry") ErrWalEntryTooLowTerm = errors.New("term of wal entry is too low") )
var ( ErrWalGetHardState = errors.New("failed to read hard state") ErrWalGetLastIdx = errors.New("failed to read last Idx") )
var (
DEBUG_PROPOSE_SLEEP = "DEBUG_PROPOSE_SLEEP"
)
var (
ErrInvCCType = errors.New("change type of ")
)
var ErrInvalidWalEntry = errors.New("invalid wal entry")
var (
ErrRaftStatusEmpty = errors.New("raft status is empty")
)
var (
ErrUnmarshal = errors.New("failed to unmarshalEntryData log entry")
)
var ErrWalConvBlock = errors.New("failed to convert bytes of block from wal entry")
var ( MemberProgressStateNames = map[MemberProgressState]string{ MemberProgressStateHealthy: "MemberProgressStateHealthy", MemberProgressStateSlow: "MemberProgressStateSlow", MemberProgressStateSyncing: "MemberProgressStateSyncing", MemberProgressStateUnknown: "MemberProgressStateUnknown", } )
Functions ¶
func EtcdIDToString ¶
func GetConstructor ¶
func GetConstructor(cfg *config.Config, hub *component.ComponentHub, cdb consensus.ChainWAL, sdb *state.ChainStateDB, pa p2pcommon.PeerAccessor) consensus.Constructor
GetConstructor builds and returns a consensus.Constructor from the New function.
func Init ¶
func Init(raftCfg *config.RaftConfig)
func RecoverExit ¶
func RecoverExit()
func ValidateGenesis ¶
Types ¶
type BlockFactory ¶
type BlockFactory struct { *component.ComponentHub consensus.ChainWAL ID string // contains filtered or unexported fields }
BlockFactory implements a raft block factory that generates a block every cfg.Consensus.BlockIntervalMs if this node is the raft leader.
This can be used for testing purposes.
func New ¶
func New(cfg *config.Config, hub *component.ComponentHub, cdb consensus.ChainWAL, sdb *state.ChainStateDB, pa p2pcommon.PeerAccessor) (*BlockFactory, error)
New returns a BlockFactory.
func (*BlockFactory) BlockFactory ¶
func (bf *BlockFactory) BlockFactory() consensus.BlockFactory
BlockFactory returns bf itself.
func (*BlockFactory) ClusterInfo ¶
func (bf *BlockFactory) ClusterInfo(bestBlockHash []byte) *types.GetClusterInfoResponse
ClusterInfo returns members of cluster and hardstate info corresponding to best block hash
func (*BlockFactory) ConfChange ¶
func (bf *BlockFactory) ConfChange(req *types.MembershipChange) (*consensus.Member, error)
ConfChange changes the membership of the raft cluster and returns the new membership.
func (*BlockFactory) ConfChangeInfo ¶
func (bf *BlockFactory) ConfChangeInfo(requestID uint64) (*types.ConfChangeProgress, error)
ConfChangeInfo returns the ConfChangeProgress queried by the request ID of a ConfChange.
func (*BlockFactory) ConsensusInfo ¶
func (bf *BlockFactory) ConsensusInfo() *types.ConsensusInfo
func (*BlockFactory) GetType ¶
func (bf *BlockFactory) GetType() consensus.ConsensusType
func (*BlockFactory) HasWAL ¶
func (bf *BlockFactory) HasWAL() bool
func (*BlockFactory) InitCluster ¶
func (bf *BlockFactory) InitCluster(cfg *config.Config) error
func (*BlockFactory) IsBlockValid ¶
IsBlockValid checks the consensus level validity of a block.
func (*BlockFactory) IsConnectedBlock ¶
func (bf *BlockFactory) IsConnectedBlock(block *types.Block) bool
IsConnectedBlock checks whether the block is already connected. In raft, the block hash may already have been written when writing the log entry.
func (*BlockFactory) IsForkEnable ¶
func (bf *BlockFactory) IsForkEnable() bool
func (*BlockFactory) IsTransactionValid ¶
func (bf *BlockFactory) IsTransactionValid(tx *types.Tx) bool
IsTransactionValid checks the consensus level validity of a transaction.
func (*BlockFactory) JobQueue ¶
func (bf *BlockFactory) JobQueue() chan<- interface{}
// waitUntilStartable wait until this chain synchronizes with more than half of all peers
func (bf *BlockFactory) waitSyncWithMajority() error { ticker := time.NewTicker(peerCheckInterval) for { select { case <-ticker.C: if synced, err := bf.cl.hasSynced(); err != nil { logger.Error().Err(err).Msg("failed to check sync with a majority of peers") return err } else if synced { return nil } case <-bf.QuitChan(): logger.Info().Msg("quit while wait sync") return ErrBFQuit default: } } }
JobQueue returns the queue for block production triggering.
func (*BlockFactory) MakeConfChangeProposal ¶
func (bf *BlockFactory) MakeConfChangeProposal(req *types.MembershipChange) (*consensus.ConfChangePropose, error)
func (*BlockFactory) NeedNotify ¶
func (bf *BlockFactory) NeedNotify() bool
func (*BlockFactory) NeedReorganization ¶
func (bf *BlockFactory) NeedReorganization(rootNo types.BlockNo) bool
NeedReorganization has nothing to do.
func (*BlockFactory) QueueJob ¶
func (bf *BlockFactory) QueueJob(now time.Time, jq chan<- interface{})
QueueJob sends block triggering information to jq.
func (*BlockFactory) QuitChan ¶
func (bf *BlockFactory) QuitChan() chan interface{}
QuitChan returns the channel from which consensus-related goroutines check when shutdown is initiated.
func (*BlockFactory) RaftAccessor ¶
func (bf *BlockFactory) RaftAccessor() consensus.AergoRaftAccessor
func (*BlockFactory) Save ¶
func (bf *BlockFactory) Save(tx consensus.TxWriter) error
Save has nothing to do.
func (*BlockFactory) Start ¶
func (bf *BlockFactory) Start()
Start runs a raft block factory service.
func (*BlockFactory) Ticker ¶
func (bf *BlockFactory) Ticker() *time.Ticker
Ticker returns a time.Ticker for the main consensus loop.
func (*BlockFactory) Update ¶
func (bf *BlockFactory) Update(block *types.Block)
Update has nothing to do.
func (*BlockFactory) VerifySign ¶
func (bf *BlockFactory) VerifySign(block *types.Block) error
VerifySign checks the consensus level validity of a block.
func (*BlockFactory) VerifyTimestamp ¶
func (bf *BlockFactory) VerifyTimestamp(*types.Block) bool
VerifyTimestamp checks the validity of the block timestamp.
type ChainSnapshotter ¶
type ChainSnapshotter struct { sync.Mutex *component.ComponentHub // contains filtered or unexported fields }
func (*ChainSnapshotter) SaveFromRemote ¶
func (chainsnap *ChainSnapshotter) SaveFromRemote(r io.Reader, id uint64, msg raftpb.Message) (int64, error)
SaveFromRemote receives a snapshot from an HTTP request. TODO: replace rafthttp with p2p.
type Cluster ¶
type Cluster struct { component.ICompSyncRequester sync.Mutex Size uint32 // contains filtered or unexported fields }
Raft cluster membership, copied from dpos/bp. TODO: refactoring. Cluster represents a cluster of block producers.
func GetClusterInfo ¶
func GetClusterInfo(hs *component.ComponentHub, bestHash []byte) (*Cluster, *types.HardStateInfo, error)
GetClusterInfo requests the cluster membership and hard state info corresponding to the given best block hash through the component hub.
func NewCluster ¶
func (*Cluster) AddInitialMembers ¶
func (cl *Cluster) AddInitialMembers(mbrs []*types.MemberAttr) error
func (*Cluster) AfterConfChange ¶
func (*Cluster) AppliedMembers ¶
func (*Cluster) ChangeMembership ¶
func (*Cluster) GenerateID ¶
GenerateID generates the cluster ID by hashing the IDs of all initial members.
func (*Cluster) IsIDRemoved ¶
IsIDRemoved returns true if the given raft ID does not exist in the cluster.
func (*Cluster) NewMemberFromAddReq ¶
func (*Cluster) NewMemberFromRemoveReq ¶
func (*Cluster) NodePeerID ¶
func (*Cluster) RecoverIdentity ¶
func (cl *Cluster) RecoverIdentity(id *consensus.RaftIdentity) error
RecoverIdentity resets the node ID and name of the cluster. The raft identity is saved in the WAL and is set again from it when the server is restarted.
func (*Cluster) RemovedMembers ¶
func (*Cluster) ResetMembers ¶
func (cl *Cluster) ResetMembers()
func (*Cluster) SetClusterID ¶
func (*Cluster) SetThisNodeID ¶
func (*Cluster) ValidateAndMergeExistingCluster ¶
ValidateAndMergeExistingCluster tests whether the members of the existing cluster match those of this cluster.
type ClusterProgress ¶
type ClusterProgress struct { N int MemberProgresses map[uint64]*MemberProgress }
func (*ClusterProgress) ToString ¶
func (cp *ClusterProgress) ToString() string
type CommitProgress ¶
func (*CommitProgress) GetConnect ¶
func (cp *CommitProgress) GetConnect() *commitEntry
func (*CommitProgress) GetRequest ¶
func (cp *CommitProgress) GetRequest() *commitEntry
func (*CommitProgress) IsReadyToPropose ¶
func (cp *CommitProgress) IsReadyToPropose() bool
func (*CommitProgress) UpdateConnect ¶
func (cp *CommitProgress) UpdateConnect(ce *commitEntry)
func (*CommitProgress) UpdateRequest ¶
func (cp *CommitProgress) UpdateRequest(ce *commitEntry)
type ErrorMembershipChange ¶
type ErrorMembershipChange struct {
Err error
}
func (ErrorMembershipChange) Error ¶
func (e ErrorMembershipChange) Error() string
type HttpTransportWrapper ¶
type LeaderStatus ¶
type MemberProgress ¶
type MemberProgress struct { MemberID uint64 Status MemberProgressState LogDifference uint64 // contains filtered or unexported fields }
func (*MemberProgress) ToString ¶
func (cp *MemberProgress) ToString() string
type MemberProgressState ¶
type MemberProgressState int32
const ( MemberProgressStateHealthy MemberProgressState = iota MemberProgressStateSlow MemberProgressStateSyncing MemberProgressStateUnknown )
type Members ¶
type Members struct { MapByID map[uint64]*consensus.Member // restore from DB or snapshot MapByName map[string]*consensus.Member Index map[types.PeerID]uint64 // peer ID to raft ID mapping Addresses []string //for raft server TODO remove // contains filtered or unexported fields }
func (*Members) ToMemberAttrArray ¶
func (mbrs *Members) ToMemberAttrArray() []*types.MemberAttr
type NotifyFn ¶
type NotifyFn func(event *message.RaftClusterEvent)
type RaftLogger ¶
type RaftLogger struct {
// contains filtered or unexported fields
}
RaftLogger is a logging unit. It controls the flow of messages to a given (swappable) backend.
func NewRaftLogger ¶
func NewRaftLogger(logger *log.Logger) *RaftLogger
func (*RaftLogger) Debug ¶
func (l *RaftLogger) Debug(args ...interface{})
func (*RaftLogger) Debugf ¶
func (l *RaftLogger) Debugf(format string, args ...interface{})
func (*RaftLogger) Error ¶
func (l *RaftLogger) Error(args ...interface{})
func (*RaftLogger) Errorf ¶
func (l *RaftLogger) Errorf(format string, args ...interface{})
func (RaftLogger) Fatal ¶
func (l RaftLogger) Fatal(args ...interface{})
func (*RaftLogger) Fatalf ¶
func (l *RaftLogger) Fatalf(format string, args ...interface{})
func (*RaftLogger) Info ¶
func (l *RaftLogger) Info(args ...interface{})
func (*RaftLogger) Infof ¶
func (l *RaftLogger) Infof(format string, args ...interface{})
func (*RaftLogger) Panic ¶
func (l *RaftLogger) Panic(args ...interface{})
func (*RaftLogger) Panicf ¶
func (l *RaftLogger) Panicf(format string, args ...interface{})
func (*RaftLogger) Warning ¶
func (l *RaftLogger) Warning(args ...interface{})
func (*RaftLogger) Warningf ¶
func (l *RaftLogger) Warningf(format string, args ...interface{})
type RaftOperator ¶
type RaftOperator struct {
// contains filtered or unexported fields
}
func (*RaftOperator) ProposeConfChange ¶
func (rop *RaftOperator) ProposeConfChange(proposal *consensus.ConfChangePropose) error
type RaftServerState ¶
type RaftServerState int
type Transporter ¶
type Transporter interface { // Start starts the given Transporter. // Start MUST be called before calling other functions in the interface. Start() error // Handler returns the HTTP handler of the transporter. // A transporter HTTP handler handles the HTTP requests // from remote peers. // The handler MUST be used to handle RaftPrefix(/raft) // endpoint. Handler() http.Handler // Send sends out the given messages to the remote peers. // Each message has a To field, which is an id that maps // to an existing peer in the transport. // If the id cannot be found in the transport, the message // will be ignored. Send(m []raftpb.Message) // SendSnapshot sends out the given snapshot message to a remote peer. // The behavior of SendSnapshot is similar to Send. SendSnapshot(m snap.Message) // AddPeer adds a peer with given peer urls into the transport. // It is the caller's responsibility to ensure the urls are all valid, // or it panics. // Peer urls are used to connect to the remote peer. AddPeer(id rtypes.ID, peerID types.PeerID, urls []string) // RemovePeer removes the peer with given id. RemovePeer(id rtypes.ID) // RemoveAllPeers removes all the existing peers in the transport. RemoveAllPeers() // UpdatePeer updates the peer urls of the peer with the given id. // It is the caller's responsibility to ensure the urls are all valid, // or it panics. UpdatePeer(id rtypes.ID, urls []string) // ActiveSince returns the time that the connection with the peer // of the given id becomes active. // If the connection is active since peer was added, it returns the adding time. // If the connection is currently inactive, it returns zero time. ActiveSince(id rtypes.ID) time.Time // ActivePeers returns the number of active peers. ActivePeers() int // Stop closes the connections and stops the transporter. Stop() }