Documentation ¶
Index ¶
- Constants
- Variables
- func GetConstructor(cfg *config.Config, hub *component.ComponentHub, cdb consensus.ChainWAL, ...) consensus.Constructor
- func GetName() string
- func Init(raftCfg *config.RaftConfig)
- func MaxUint64(x, y uint64) uint64
- func MemberIDToString(id uint64) string
- func RecoverExit()
- type BlockFactory
- func (bf *BlockFactory) BlockFactory() consensus.BlockFactory
- func (bf *BlockFactory) ClusterInfo() ([]*types.MemberAttr, []byte, error)
- func (bf *BlockFactory) ConfChange(req *types.MembershipChange) (*consensus.Member, error)
- func (bf *BlockFactory) ConsensusInfo() *types.ConsensusInfo
- func (bf *BlockFactory) GetType() consensus.ConsensusType
- func (bf *BlockFactory) HasWAL() bool
- func (bf *BlockFactory) Info() string
- func (bf *BlockFactory) InitCluster(cfg *config.Config) error
- func (bf *BlockFactory) IsBlockValid(block *types.Block, bestBlock *types.Block) error
- func (bf *BlockFactory) IsTransactionValid(tx *types.Tx) bool
- func (bf *BlockFactory) JobQueue() chan<- interface{}
- func (bf *BlockFactory) NeedNotify() bool
- func (bf *BlockFactory) NeedReorganization(rootNo types.BlockNo) bool
- func (bf *BlockFactory) QueueJob(now time.Time, jq chan<- interface{})
- func (bf *BlockFactory) QuitChan() chan interface{}
- func (bf *BlockFactory) Save(tx consensus.TxWriter) error
- func (bf *BlockFactory) Start()
- func (bf *BlockFactory) Ticker() *time.Ticker
- func (bf *BlockFactory) Update(block *types.Block)
- func (bf *BlockFactory) VerifySign(block *types.Block) error
- func (bf *BlockFactory) VerifyTimestamp(*types.Block) bool
- type ChainSnapshotter
- type Cluster
- func (cl *Cluster) AddInitialMembers(raftCfg *config.RaftConfig, useTls bool) error
- func (cl *Cluster) AppliedMembers() *Members
- func (cl *Cluster) ChangeMembership(req *types.MembershipChange) (*consensus.Member, error)
- func (cl *Cluster) IsIDRemoved(id uint64) bool
- func (cl *Cluster) Members() *Members
- func (cl *Cluster) NewMemberFromAddReq(req *types.MembershipChange) (*consensus.Member, error)
- func (cl *Cluster) NewMemberFromRemoveReq(req *types.MembershipChange) (*consensus.Member, error)
- func (cl *Cluster) NodeID() uint64
- func (cl *Cluster) NodeName() string
- func (cl *Cluster) Quorum() uint32
- func (cl *Cluster) Recover(snapshot *raftpb.Snapshot) error
- func (cl *Cluster) RecoverIdentity(id *consensus.RaftIdentity) error
- func (cl *Cluster) RemovedMembers() *Members
- func (cl *Cluster) ResetMembers()
- func (cl *Cluster) SetNodeID(nodeid uint64)
- func (cl *Cluster) SetThisNodeID() error
- func (cl *Cluster) ValidateAndMergeExistingCluster(existingCl *Cluster) bool
- type CommitProgress
- type ErrorMembershipChange
- type LeaderStatus
- type Members
- type Proposed
- type RaftInfo
- type RaftLogger
- func (l *RaftLogger) Debug(args ...interface{})
- func (l *RaftLogger) Debugf(format string, args ...interface{})
- func (l *RaftLogger) Error(args ...interface{})
- func (l *RaftLogger) Errorf(format string, args ...interface{})
- func (l RaftLogger) Fatal(args ...interface{})
- func (l *RaftLogger) Fatalf(format string, args ...interface{})
- func (l *RaftLogger) Info(args ...interface{})
- func (l *RaftLogger) Infof(format string, args ...interface{})
- func (l *RaftLogger) Panic(args ...interface{})
- func (l *RaftLogger) Panicf(format string, args ...interface{})
- func (l *RaftLogger) Warning(args ...interface{})
- func (l *RaftLogger) Warningf(format string, args ...interface{})
- type RaftOperator
- type RaftServerState
- type WalDB
- type Work
Constants ¶
const ( MembersNameInit = "init" MembersNameApplied = "applied" MembersNameRemoved = "removed" )
const ( RaftServerStateRestart = iota RaftServerStateNewCluster RaftServerStateJoinCluster )
const (
DefaultCommitQueueLen = 10
)
const (
DefaultTickMS = time.Millisecond * 30
)
const (
HasNoLeader uint64 = 0
)
Variables ¶
var ( // blockIntervalMs is the block genration interval in milli-seconds. RaftTick = DefaultTickMS RaftSkipEmptyBlock = false MaxCommitQueueLen = DefaultCommitQueueLen BlockIntervalMs time.Duration BlockTimeoutMs time.Duration )
var ( ErrClusterNotReady = errors.New("cluster is not ready") ErrNotRaftLeader = errors.New("this node is not leader") )
var ( MaxConfChangeTimeOut = time.Second * 10 ErrClusterHasNoMember = errors.New("cluster has no member") ErrNotExistRaftMember = errors.New("not exist member of raft cluster") ErrNoEnableSyncPeer = errors.New("no peer to sync chain") ErrNotExistMembers = errors.New("not exist members of cluster") ErrMemberAlreadyApplied = errors.New("member is already added") ErrInvalidMembershipReqType = errors.New("invalid type of membership change request") ErrPendingConfChange = errors.New("pending membership change request is in progree. try again when it is finished") ErrConChangeTimeOut = errors.New("timeouted membership change request") ErrConfChangeChannelBusy = errors.New("channel of conf change propose is busy") ErrCCMemberIsNil = errors.New("memeber is nil") ErrNotMatchedRaftName = errors.New("mismatched name of raft identity") )
var ( ErrNotIncludedRaftMember = errors.New("this node isn't included in initial raft members") ErrRaftEmptyTLSFile = errors.New("cert or key file name is empty") ErrNotHttpsURL = errors.New("url scheme is not https") ErrDupBP = errors.New("raft bp description is duplicated") ErrInvalidRaftPeerID = errors.New("peerID of current raft bp is not equals to p2p configure") )
var ( MaxTimeOutCluter = time.Second * 10 MaxTryGetCluster = 3 ErrGetClusterReplyC = errors.New("reply channel of getcluster request is closed") ErrGetClusterTimeout = errors.New("timeout for getcluster") ErrGetClusterEmpty = errors.New("getcluster reply is empty") ErrGetClusterFail = errors.New("failed to get cluster info") )
var ( ConfSnapFrequency uint64 = 10 ConfSnapshotCatchUpEntriesN uint64 = ConfSnapFrequency )
noinspection ALL
var ( ErrNoSnapshot = errors.New("no snapshot") ErrCCAlreadyApplied = errors.New("conf change entry is already applied") ErrInvalidMember = errors.New("member of conf change is invalid") ErrCCAlreadyAdded = errors.New("member has already added") ErrCCAlreadyRemoved = errors.New("member has already removed") ErrCCNoMemberToRemove = errors.New("there is no member to remove") ErrEmptySnapshot = errors.New("received empty snapshot") ErrInvalidRaftIdentity = errors.New("raft identity is not set") )
var ( DfltTimeWaitPeerLive = time.Second * 5 ErrNotMsgSnap = errors.New("not pb.MsgSnap") ErrClusterMismatchConfState = errors.New("members of cluster doesn't match with raft confstate") )
var ( ErrInvalidEntry = errors.New("Invalid raftpb.entry") ErrWalEntryTooLowTerm = errors.New("term of wal entry is too low") )
var ( ErrWalGetHardState = errors.New("failed to read hard state") ErrWalGetLastIdx = errors.New("failed to read last Idx") )
var (
ErrInvCCType = errors.New("change type of ")
)
var ErrInvalidWalEntry = errors.New("invalid wal entry")
var (
ErrUnmarshal = errors.New("failed to unmarshalEntryData log entry")
)
var ErrWalConvBlock = errors.New("failed to convert bytes of block from wal entry")
Functions ¶
func GetConstructor ¶
func GetConstructor(cfg *config.Config, hub *component.ComponentHub, cdb consensus.ChainWAL, sdb *state.ChainStateDB, pa p2pcommon.PeerAccessor) consensus.Constructor
GetConstructor builds and returns a consensus.Constructor from the New function.
func Init ¶
func Init(raftCfg *config.RaftConfig)
func MemberIDToString ¶
func RecoverExit ¶
func RecoverExit()
Types ¶
type BlockFactory ¶
type BlockFactory struct { *component.ComponentHub consensus.ChainWAL ID string // contains filtered or unexported fields }
BlockFactory implements a raft block factory which generates a block every cfg.Consensus.BlockInterval if this node is the raft leader.
This can be used for testing purpose.
func New ¶
func New(cfg *config.Config, hub *component.ComponentHub, cdb consensus.ChainWAL, sdb *state.ChainStateDB, pa p2pcommon.PeerAccessor) (*BlockFactory, error)
New returns a BlockFactory.
func (*BlockFactory) BlockFactory ¶
func (bf *BlockFactory) BlockFactory() consensus.BlockFactory
BlockFactory returns bf itself.
func (*BlockFactory) ClusterInfo ¶
func (bf *BlockFactory) ClusterInfo() ([]*types.MemberAttr, []byte, error)
func (*BlockFactory) ConfChange ¶
func (bf *BlockFactory) ConfChange(req *types.MembershipChange) (*consensus.Member, error)
ConfChange changes the membership of the raft cluster and returns the new membership.
func (*BlockFactory) ConsensusInfo ¶
func (bf *BlockFactory) ConsensusInfo() *types.ConsensusInfo
func (*BlockFactory) GetType ¶
func (bf *BlockFactory) GetType() consensus.ConsensusType
func (*BlockFactory) HasWAL ¶
func (bf *BlockFactory) HasWAL() bool
func (*BlockFactory) InitCluster ¶
func (bf *BlockFactory) InitCluster(cfg *config.Config) error
func (*BlockFactory) IsBlockValid ¶
IsBlockValid checks the consensus level validity of a block.
func (*BlockFactory) IsTransactionValid ¶
func (bf *BlockFactory) IsTransactionValid(tx *types.Tx) bool
IsTransactionValid checks the consensus-level validity of a transaction.
func (*BlockFactory) JobQueue ¶
func (bf *BlockFactory) JobQueue() chan<- interface{}
// waitUntilStartable wait until this chain synchronizes with more than half of all peers
func (bf *BlockFactory) waitSyncWithMajority() error { ticker := time.NewTicker(peerCheckInterval) for { select { case <-ticker.C: if synced, err := bf.bpc.hasSynced(); err != nil { logger.Error().Err(err).Msg("failed to check sync with a majority of peers") return err } else if synced { return nil } case <-bf.QuitChan(): logger.Info().Msg("quit while wait sync") return ErrBFQuit default: } } }
JobQueue returns the queue for block production triggering.
func (*BlockFactory) NeedNotify ¶
func (bf *BlockFactory) NeedNotify() bool
func (*BlockFactory) NeedReorganization ¶
func (bf *BlockFactory) NeedReorganization(rootNo types.BlockNo) bool
NeedReorganization has nothing to do.
func (*BlockFactory) QueueJob ¶
func (bf *BlockFactory) QueueJob(now time.Time, jq chan<- interface{})
QueueJob sends block-triggering information to jq.
func (*BlockFactory) QuitChan ¶
func (bf *BlockFactory) QuitChan() chan interface{}
QuitChan returns the channel from which consensus-related goroutines check when shutdown is initiated.
func (*BlockFactory) Save ¶
func (bf *BlockFactory) Save(tx consensus.TxWriter) error
Save has nothing to do.
func (*BlockFactory) Start ¶
func (bf *BlockFactory) Start()
Start runs the raft block factory service.
func (*BlockFactory) Ticker ¶
func (bf *BlockFactory) Ticker() *time.Ticker
Ticker returns a time.Ticker for the main consensus loop.
func (*BlockFactory) Update ¶
func (bf *BlockFactory) Update(block *types.Block)
Update has nothing to do.
func (*BlockFactory) VerifySign ¶
func (bf *BlockFactory) VerifySign(block *types.Block) error
VerifySign checks the consensus level validity of a block.
func (*BlockFactory) VerifyTimestamp ¶
func (bf *BlockFactory) VerifyTimestamp(*types.Block) bool
VerifyTimestamp checks the validity of the block timestamp.
type ChainSnapshotter ¶
type ChainSnapshotter struct { sync.Mutex *component.ComponentHub // contains filtered or unexported fields }
func (*ChainSnapshotter) SaveFromRemote ¶
func (chainsnap *ChainSnapshotter) SaveFromRemote(r io.Reader, id uint64, msg raftpb.Message) (int64, error)
ChainSnapshotter receives a snapshot from an HTTP request. TODO: replace rafthttp with p2p.
type Cluster ¶
type Cluster struct { component.ICompSyncRequester sync.Mutex Size uint32 // contains filtered or unexported fields }
Raft cluster membership, copied from dpos/bp. TODO: refactoring. Cluster represents a cluster of block producers.
func GetClusterInfo ¶
func GetClusterInfo(hs *component.ComponentHub) (*Cluster, error)
GetClusterInfo returns the current cluster information from chainservice.
func NewCluster ¶
func NewCluster(chainID []byte, bf *BlockFactory, raftName string, chainTimestamp int64) *Cluster
func NewClusterFromMemberAttrs ¶
func NewClusterFromMemberAttrs(chainID []byte, memberAttrs []*types.MemberAttr) (*Cluster, error)
func (*Cluster) AddInitialMembers ¶
func (cl *Cluster) AddInitialMembers(raftCfg *config.RaftConfig, useTls bool) error
func (*Cluster) AppliedMembers ¶
func (*Cluster) ChangeMembership ¶
func (*Cluster) IsIDRemoved ¶
IsIDRemoved returns true if the given raft ID does not exist in the cluster.
func (*Cluster) NewMemberFromAddReq ¶
func (*Cluster) NewMemberFromRemoveReq ¶
func (*Cluster) RecoverIdentity ¶
func (cl *Cluster) RecoverIdentity(id *consensus.RaftIdentity) error
RecoverIdentity resets the node ID and name of the cluster. The raft identity is saved in the WAL and reset when the server is restarted.
func (*Cluster) RemovedMembers ¶
func (*Cluster) ResetMembers ¶
func (cl *Cluster) ResetMembers()
func (*Cluster) SetThisNodeID ¶
func (*Cluster) ValidateAndMergeExistingCluster ¶
ValidateAndMergeExistingCluster tests whether the members of the existing cluster match this cluster.
type CommitProgress ¶
func (*CommitProgress) GetConnect ¶
func (cp *CommitProgress) GetConnect() *commitEntry
func (*CommitProgress) GetRequest ¶
func (cp *CommitProgress) GetRequest() *commitEntry
func (*CommitProgress) IsReadyToPropose ¶
func (cp *CommitProgress) IsReadyToPropose() bool
func (*CommitProgress) UpdateConnect ¶
func (cp *CommitProgress) UpdateConnect(ce *commitEntry)
func (*CommitProgress) UpdateRequest ¶
func (cp *CommitProgress) UpdateRequest(ce *commitEntry)
type ErrorMembershipChange ¶
type ErrorMembershipChange struct {
Err error
}
func (ErrorMembershipChange) Error ¶
func (e ErrorMembershipChange) Error() string
type LeaderStatus ¶
type LeaderStatus struct {
// contains filtered or unexported fields
}
type Members ¶
type RaftLogger ¶
type RaftLogger struct {
// contains filtered or unexported fields
}
RaftLogger is a logging unit. It controls the flow of messages to a given (swappable) backend.
func NewRaftLogger ¶
func NewRaftLogger(logger *log.Logger) *RaftLogger
func (*RaftLogger) Debug ¶
func (l *RaftLogger) Debug(args ...interface{})
func (*RaftLogger) Debugf ¶
func (l *RaftLogger) Debugf(format string, args ...interface{})
func (*RaftLogger) Error ¶
func (l *RaftLogger) Error(args ...interface{})
func (*RaftLogger) Errorf ¶
func (l *RaftLogger) Errorf(format string, args ...interface{})
func (RaftLogger) Fatal ¶
func (l RaftLogger) Fatal(args ...interface{})
func (*RaftLogger) Fatalf ¶
func (l *RaftLogger) Fatalf(format string, args ...interface{})
func (*RaftLogger) Info ¶
func (l *RaftLogger) Info(args ...interface{})
func (*RaftLogger) Infof ¶
func (l *RaftLogger) Infof(format string, args ...interface{})
func (*RaftLogger) Panic ¶
func (l *RaftLogger) Panic(args ...interface{})
func (*RaftLogger) Panicf ¶
func (l *RaftLogger) Panicf(format string, args ...interface{})
func (*RaftLogger) Warning ¶
func (l *RaftLogger) Warning(args ...interface{})
func (*RaftLogger) Warningf ¶
func (l *RaftLogger) Warningf(format string, args ...interface{})
type RaftOperator ¶
type RaftOperator struct {
// contains filtered or unexported fields
}
type RaftServerState ¶
type RaftServerState int