raftv2

package
v2.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 7, 2024 License: MIT Imports: 42 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MembersNameInit    = "init"
	MembersNameApplied = "applied"
	MembersNameRemoved = "removed"
	InvalidClusterID   = 0
)
View Source
const (
	DefaultBlockFactoryTickMs = 100
	MinBlockFactoryTickMs     = 10
	DefaultTickMS             = time.Millisecond * 50
	DefaultElectionTickCount  = 10
	DefaultSlowNodeGap        = 100
	DefaultSnapFrequency      = 30
)
View Source
const (
	BackendP2P  = "aergop2p"
	BackendHTTP = "http"
)
View Source
const (
	RaftServerStateRestart = iota
	RaftServerStateNewCluster
	RaftServerStateJoinCluster
)
View Source
const (
	HasNoLeader uint64 = 0
)

Variables

View Source
var (
	ErrClusterNotReady      = errors.New("cluster is not ready")
	ErrNotRaftLeader        = errors.New("this node is not leader")
	ErrInvalidConsensusName = errors.New("invalid consensus name")
	ErrCancelGenerate       = errors.New("cancel generating block because work becomes stale")
)
View Source
var (
	MaxConfChangeTimeOut = time.Second * 100

	ErrClusterHasNoMember   = errors.New("cluster has no member")
	ErrNotExistRaftMember   = errors.New("not exist member of raft cluster")
	ErrNoEnableSyncPeer     = errors.New("no peer to sync chain")
	ErrMemberAlreadyApplied = errors.New("member is already added")

	ErrInvalidMembershipReqType = errors.New("invalid type of membership change request")
	ErrPendingConfChange        = errors.New("pending membership change request is in progree. try again when it is finished")
	ErrConChangeTimeOut         = errors.New("timeouted membership change request")
	ErrConfChangeChannelBusy    = errors.New("channel of conf change propose is busy")
	ErrCCMemberIsNil            = errors.New("memeber is nil")
	ErrNotMatchedRaftName       = errors.New("mismatched name of raft identity")
	ErrNotMatchedRaftPeerID     = errors.New("mismatched peerid of raft identity")
	ErrNotExitRaftProgress      = errors.New("progress of this node doesn't exist")
	ErrUnhealtyNodeExist        = errors.New("can't add some node if unhealthy nodes exist")
	ErrRemoveHealthyNode        = errors.New("remove of a healthy node may cause the cluster to hang")
)
View Source
var (
	ErrEmptyBPs              = errors.New("BP list is empty")
	ErrNotIncludedRaftMember = errors.New("this node isn't included in initial raft members")
	ErrDupBP                 = errors.New("raft bp description is duplicated")
	ErrInvalidRaftPeerID     = errors.New("peerID of current raft bp is not equals to p2p configure")
)
View Source
var (
	RaftTick = DefaultTickMS
	// blockIntervalMs is the block genration interval in milli-seconds.
	RaftSkipEmptyBlock = false

	BlockFactoryTickMs time.Duration
	BlockIntervalMs    time.Duration

	ConfSnapFrequency           uint64 = DefaultSnapFrequency
	ConfSnapshotCatchUpEntriesN uint64 = ConfSnapFrequency

	ElectionTickCount        = DefaultElectionTickCount
	MaxSlowNodeGap    uint64 = DefaultSlowNodeGap // Criteria for determining whether the server is in a slow state
	StopDupCommit            = false
)
View Source
var (
	MaxTimeOutCluter = time.Second * 10
	MaxTryGetCluster = 3

	ErrGetClusterReplyC  = errors.New("reply channel of getcluster request is closed")
	ErrGetClusterTimeout = errors.New("timeout for getcluster")
	ErrGetClusterEmpty   = errors.New("getcluster reply is empty")
	ErrGetClusterFail    = errors.New("failed to get cluster info")
)
View Source
var (
	ErrRaftNotReady        = errors.New("raft library is not initialized")
	ErrCCAlreadyApplied    = errors.New("conf change entry is already applied")
	ErrInvalidMember       = errors.New("member of conf change is invalid")
	ErrCCAlreadyAdded      = errors.New("member has already added")
	ErrCCAlreadyRemoved    = errors.New("member has already removed")
	ErrCCNoMemberToRemove  = errors.New("there is no member to remove")
	ErrEmptySnapshot       = errors.New("received empty snapshot")
	ErrInvalidRaftIdentity = errors.New("raft identity is not set")
	ErrProposeNilBlock     = errors.New("proposed block is nil")
)
View Source
var (
	DfltTimeWaitPeerLive        = time.Second * 5
	ErrNotMsgSnap               = errors.New("not pb.MsgSnap")
	ErrClusterMismatchConfState = errors.New("members of cluster doesn't match with raft confstate")
)
View Source
var (
	ErrInvalidEntry       = errors.New("Invalid raftpb.entry")
	ErrWalEntryTooLowTerm = errors.New("term of wal entry is too low")
)
View Source
var (
	ErrWalGetHardState = errors.New("failed to read hard state")
	ErrWalGetLastIdx   = errors.New("failed to read last Idx")
)
View Source
var (
	DEBUG_PROPOSE_SLEEP = "DEBUG_PROPOSE_SLEEP"
)
View Source
var (
	ErrInvCCType = errors.New("change type of ")
)
View Source
var ErrInvalidWalEntry = errors.New("invalid wal entry")
View Source
var (
	ErrRaftStatusEmpty = errors.New("raft status is empty")
)
View Source
var (
	ErrUnmarshal = errors.New("failed to unmarshalEntryData log entry")
)
View Source
var ErrWalConvBlock = errors.New("failed to convert bytes of block from wal entry")
View Source
var (
	MemberProgressStateNames = map[MemberProgressState]string{
		MemberProgressStateHealthy: "MemberProgressStateHealthy",
		MemberProgressStateSlow:    "MemberProgressStateSlow",
		MemberProgressStateSyncing: "MemberProgressStateSyncing",
		MemberProgressStateUnknown: "MemberProgressStateUnknown",
	}
)

Functions

func EtcdIDToString

func EtcdIDToString(id uint64) string

func GetConstructor

GetConstructor build and returns consensus.Constructor from New function.

func GetName

func GetName() string

GetName returns the name of the consensus.

func Init

func Init(raftCfg *config.RaftConfig)

func MaxUint64

func MaxUint64(x, y uint64) uint64

func RecoverExit

func RecoverExit()

func ValidateGenesis

func ValidateGenesis(genesis *types.Genesis) error

Types

type BlockFactory

type BlockFactory struct {
	*component.ComponentHub
	consensus.ChainWAL

	ID string
	// contains filtered or unexported fields
}

BlockFactory implements a raft block factory which generate block each cfg.Consensus.BlockIntervalMs if this node is leader of raft

This can be used for testing purpose.

func New

New returns a BlockFactory.

func (*BlockFactory) BlockFactory

func (bf *BlockFactory) BlockFactory() consensus.BlockFactory

BlockFactory returns r itself.

func (*BlockFactory) ClusterInfo

func (bf *BlockFactory) ClusterInfo(bestBlockHash []byte) *types.GetClusterInfoResponse

ClusterInfo returns members of cluster and hardstate info corresponding to best block hash

func (*BlockFactory) ConfChange

func (bf *BlockFactory) ConfChange(req *types.MembershipChange) (*consensus.Member, error)

ConfChange change membership of raft cluster and returns new membership

func (*BlockFactory) ConfChangeInfo

func (bf *BlockFactory) ConfChangeInfo(requestID uint64) (*types.ConfChangeProgress, error)

ConfChangeInfo returns ConfChangeProgress queries by request ID of ConfChange

func (*BlockFactory) ConsensusInfo

func (bf *BlockFactory) ConsensusInfo() *types.ConsensusInfo

func (*BlockFactory) GetType

func (bf *BlockFactory) GetType() consensus.ConsensusType

func (*BlockFactory) HasWAL

func (bf *BlockFactory) HasWAL() bool

func (*BlockFactory) Info

func (bf *BlockFactory) Info() string

Info retuns an empty string.

func (*BlockFactory) InitCluster

func (bf *BlockFactory) InitCluster(cfg *config.Config) error

func (*BlockFactory) IsBlockValid

func (bf *BlockFactory) IsBlockValid(block *types.Block, bestBlock *types.Block) error

IsBlockValid checks the consensus level validity of a block.

func (*BlockFactory) IsConnectedBlock

func (bf *BlockFactory) IsConnectedBlock(block *types.Block) bool

check already connect block In raft, block hash may already have been writtern when writing log entry.

func (*BlockFactory) IsForkEnable

func (bf *BlockFactory) IsForkEnable() bool

func (*BlockFactory) IsTransactionValid

func (bf *BlockFactory) IsTransactionValid(tx *types.Tx) bool

IsTransactionValid checks the onsensus level validity of a transaction

func (*BlockFactory) JobQueue

func (bf *BlockFactory) JobQueue() chan<- interface{}

// waitUntilStartable wait until this chain synchronizes with more than half of all peers

func (bf *BlockFactory) waitSyncWithMajority() error {
	ticker := time.NewTicker(peerCheckInterval)

	for {
		select {
		case <-ticker.C:
			if synced, err := bf.cl.hasSynced(); err != nil {
				logger.Error().Err(err).Msg("failed to check sync with a majority of peers")
				return err
			} else if synced {
				return nil
			}

		case <-bf.QuitChan():
			logger.Info().Msg("quit while wait sync")
			return ErrBFQuit
		default:
		}
	}
}

JobQueue returns the queue for block production triggering.

func (*BlockFactory) MakeConfChangeProposal

func (bf *BlockFactory) MakeConfChangeProposal(req *types.MembershipChange) (*consensus.ConfChangePropose, error)

func (*BlockFactory) NeedNotify

func (bf *BlockFactory) NeedNotify() bool

func (*BlockFactory) NeedReorganization

func (bf *BlockFactory) NeedReorganization(rootNo types.BlockNo) bool

NeedReorganization has nothing to do.

func (*BlockFactory) QueueJob

func (bf *BlockFactory) QueueJob(now time.Time, jq chan<- interface{})

QueueJob send a block triggering information to jq, and hold to wait

func (*BlockFactory) QuitChan

func (bf *BlockFactory) QuitChan() chan interface{}

QuitChan returns the channel from which consensus-related goroutines check when shutdown is initiated.

func (*BlockFactory) RaftAccessor

func (bf *BlockFactory) RaftAccessor() consensus.AergoRaftAccessor

func (*BlockFactory) Save

func (bf *BlockFactory) Save(tx consensus.TxWriter) error

Save has nothging to do.

func (*BlockFactory) Start

func (bf *BlockFactory) Start()

Start run a raft block factory service.

func (*BlockFactory) Ticker

func (bf *BlockFactory) Ticker() *time.Ticker

Ticker returns a time.Ticker for the main consensus loop.

func (*BlockFactory) Update

func (bf *BlockFactory) Update(block *types.Block)

Update has nothging to do.

func (*BlockFactory) VerifySign

func (bf *BlockFactory) VerifySign(block *types.Block) error

VerifySign checks the consensus level validity of a block.

func (*BlockFactory) VerifyTimestamp

func (bf *BlockFactory) VerifyTimestamp(*types.Block) bool

VerifyTimestamp checks the validity of the block timestamp.

type ChainSnapshotter

type ChainSnapshotter struct {
	sync.Mutex

	*component.ComponentHub
	// contains filtered or unexported fields
}

func (*ChainSnapshotter) SaveFromRemote

func (chainsnap *ChainSnapshotter) SaveFromRemote(r io.Reader, id uint64, msg raftpb.Message) (int64, error)

chainSnapshotter rece ives snapshot from http request TODO replace rafthttp with p2p

type Cluster

type Cluster struct {
	component.ICompSyncRequester
	sync.Mutex

	Size uint32
	// contains filtered or unexported fields
}

raft cluster membership copy from dpos/bp TODO refactoring Cluster represents a cluster of block producers.

func GetClusterInfo

func GetClusterInfo(hs *component.ComponentHub, bestHash []byte) (*Cluster, *types.HardStateInfo, error)

GetBestBlock returns the current best block from chainservice

func NewCluster

func NewCluster(chainID []byte, bf *BlockFactory, raftName string, p2pPeerID types.PeerID, chainTimestamp int64, notifyFn NotifyFn) *Cluster

func NewClusterFromMemberAttrs

func NewClusterFromMemberAttrs(clusterID uint64, chainID []byte, memberAttrs []*types.MemberAttr) (*Cluster, error)

func (*Cluster) AddInitialMembers

func (cl *Cluster) AddInitialMembers(mbrs []*types.MemberAttr) error

func (*Cluster) AfterConfChange

func (cl *Cluster) AfterConfChange(cc *raftpb.ConfChange, member *consensus.Member, err error)

func (*Cluster) AppliedMembers

func (cl *Cluster) AppliedMembers() *Members

func (*Cluster) ChangeMembership

func (cl *Cluster) ChangeMembership(req *types.MembershipChange, nowait bool) (*consensus.Member, error)

func (*Cluster) ClusterID

func (cl *Cluster) ClusterID() uint64

func (*Cluster) GenerateID

func (cl *Cluster) GenerateID(useBackup bool)

GenerateID generate cluster ID by hashing IDs of all initial members

func (*Cluster) IsIDRemoved

func (cl *Cluster) IsIDRemoved(id uint64) bool

IsIDRemoved return true if given raft id is not exist in cluster

func (*Cluster) Members

func (cl *Cluster) Members() *Members

func (*Cluster) NewMemberFromAddReq

func (cl *Cluster) NewMemberFromAddReq(req *types.MembershipChange) (*consensus.Member, error)

func (*Cluster) NewMemberFromRemoveReq

func (cl *Cluster) NewMemberFromRemoveReq(req *types.MembershipChange) (*consensus.Member, error)

func (*Cluster) NodeID

func (cl *Cluster) NodeID() uint64

func (*Cluster) NodeName

func (cl *Cluster) NodeName() string

func (*Cluster) NodePeerID

func (cl *Cluster) NodePeerID() string

func (*Cluster) Quorum

func (cl *Cluster) Quorum() uint32

func (*Cluster) Recover

func (cl *Cluster) Recover(snapshot *raftpb.Snapshot) (bool, error)

func (*Cluster) RecoverIdentity

func (cl *Cluster) RecoverIdentity(id *consensus.RaftIdentity) error

RecoverIdentity reset node id and name of cluster. raft identity is saved in WAL and reset when server is restarted

func (*Cluster) RemovedMembers

func (cl *Cluster) RemovedMembers() *Members

func (*Cluster) ResetMembers

func (cl *Cluster) ResetMembers()

func (*Cluster) SetClusterID

func (cl *Cluster) SetClusterID(clusterid uint64)

func (*Cluster) SetNodeID

func (cl *Cluster) SetNodeID(nodeid uint64)

func (*Cluster) SetThisNodeID

func (cl *Cluster) SetThisNodeID() error

func (*Cluster) ValidateAndMergeExistingCluster

func (cl *Cluster) ValidateAndMergeExistingCluster(existingCl *Cluster) bool

ValidateAndMergeExistingCluster tests if members of existing cluster are matched with this cluster

type ClusterProgress

type ClusterProgress struct {
	N int

	MemberProgresses map[uint64]*MemberProgress
}

func (*ClusterProgress) ToString

func (cp *ClusterProgress) ToString() string

type CommitProgress

type CommitProgress struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*CommitProgress) GetConnect

func (cp *CommitProgress) GetConnect() *commitEntry

func (*CommitProgress) GetRequest

func (cp *CommitProgress) GetRequest() *commitEntry

func (*CommitProgress) IsReadyToPropose

func (cp *CommitProgress) IsReadyToPropose() bool

func (*CommitProgress) UpdateConnect

func (cp *CommitProgress) UpdateConnect(ce *commitEntry)

func (*CommitProgress) UpdateRequest

func (cp *CommitProgress) UpdateRequest(ce *commitEntry)

type ErrorMembershipChange

type ErrorMembershipChange struct {
	Err error
}

func (ErrorMembershipChange) Error

func (e ErrorMembershipChange) Error() string

type HttpTransportWrapper

type HttpTransportWrapper struct {
	rafthttp.Transport
}

func (*HttpTransportWrapper) AddPeer

func (t *HttpTransportWrapper) AddPeer(id rtypes.ID, peerID types.PeerID, urls []string)

type LeaderStatus

type LeaderStatus struct {
	sync.RWMutex
	Leader uint64
	Term   uint64

	IsLeader bool
	// contains filtered or unexported fields
}

type MemberProgress

type MemberProgress struct {
	MemberID      uint64
	Status        MemberProgressState
	LogDifference uint64
	// contains filtered or unexported fields
}

func (*MemberProgress) ToString

func (cp *MemberProgress) ToString() string

type MemberProgressState

type MemberProgressState int32
const (
	MemberProgressStateHealthy MemberProgressState = iota
	MemberProgressStateSlow
	MemberProgressStateSyncing
	MemberProgressStateUnknown
)

type Members

type Members struct {
	MapByID   map[uint64]*consensus.Member // restore from DB or snapshot
	MapByName map[string]*consensus.Member

	Index map[types.PeerID]uint64 // peer ID to raft ID mapping

	Addresses []string //for raft server TODO remove
	// contains filtered or unexported fields
}

func (*Members) ToArray

func (mbrs *Members) ToArray() []*consensus.Member

func (*Members) ToMemberAttrArray

func (mbrs *Members) ToMemberAttrArray() []*types.MemberAttr

type NotifyFn

type NotifyFn func(event *message.RaftClusterEvent)

type Proposed

type Proposed struct {
	// contains filtered or unexported fields
}

type RaftInfo

type RaftInfo struct {
	Leader string
	Total  uint32
	Name   string
	RaftId string
	Status *json.RawMessage
}

type RaftLogger

type RaftLogger struct {
	// contains filtered or unexported fields
}

Logger is a logging unit. It controls the flow of messages to a given (swappable) backend.

func NewRaftLogger

func NewRaftLogger(logger *log.Logger) *RaftLogger

func (*RaftLogger) Debug

func (l *RaftLogger) Debug(args ...interface{})

func (*RaftLogger) Debugf

func (l *RaftLogger) Debugf(format string, args ...interface{})

func (*RaftLogger) Error

func (l *RaftLogger) Error(args ...interface{})

func (*RaftLogger) Errorf

func (l *RaftLogger) Errorf(format string, args ...interface{})

func (RaftLogger) Fatal

func (l RaftLogger) Fatal(args ...interface{})

func (*RaftLogger) Fatalf

func (l *RaftLogger) Fatalf(format string, args ...interface{})

func (*RaftLogger) Info

func (l *RaftLogger) Info(args ...interface{})

func (*RaftLogger) Infof

func (l *RaftLogger) Infof(format string, args ...interface{})

func (*RaftLogger) Panic

func (l *RaftLogger) Panic(args ...interface{})

func (*RaftLogger) Panicf

func (l *RaftLogger) Panicf(format string, args ...interface{})

func (*RaftLogger) Warning

func (l *RaftLogger) Warning(args ...interface{})

func (*RaftLogger) Warningf

func (l *RaftLogger) Warningf(format string, args ...interface{})

type RaftOperator

type RaftOperator struct {
	// contains filtered or unexported fields
}

func (*RaftOperator) ProposeConfChange

func (rop *RaftOperator) ProposeConfChange(proposal *consensus.ConfChangePropose) error

type RaftServerState

type RaftServerState int

type Transporter

type Transporter interface {
	// Start starts the given Transporter.
	// Start MUST be called before calling other functions in the interface.
	Start() error
	// Handler returns the HTTP handler of the transporter.
	// A transporter HTTP handler handles the HTTP requests
	// from remote peers.
	// The handler MUST be used to handle RaftPrefix(/raft)
	// endpoint.
	Handler() http.Handler
	// Send sends out the given messages to the remote peers.
	// Each message has a To field, which is an id that maps
	// to an existing peer in the transport.
	// If the id cannot be found in the transport, the message
	// will be ignored.
	Send(m []raftpb.Message)
	// SendSnapshot sends out the given snapshot message to a remote peer.
	// The behavior of SendSnapshot is similar to Send.
	SendSnapshot(m snap.Message)

	// AddPeer adds a peer with given peer urls into the transport.
	// It is the caller's responsibility to ensure the urls are all valid,
	// or it panics.
	// Peer urls are used to connect to the remote peer.
	AddPeer(id rtypes.ID, peerID types.PeerID, urls []string)

	// RemovePeer removes the peer with given id.
	RemovePeer(id rtypes.ID)
	// RemoveAllPeers removes all the existing peers in the transport.
	RemoveAllPeers()
	// UpdatePeer updates the peer urls of the peer with the given id.
	// It is the caller's responsibility to ensure the urls are all valid,
	// or it panics.
	UpdatePeer(id rtypes.ID, urls []string)
	// ActiveSince returns the time that the connection with the peer
	// of the given id becomes active.
	// If the connection is active since peer was added, it returns the adding time.
	// If the connection is currently inactive, it returns zero time.
	ActiveSince(id rtypes.ID) time.Time
	// ActivePeers returns the number of active peers.
	ActivePeers() int
	// Stop closes the connections and stops the transporter.
	Stop()
}

type WalDB

type WalDB struct {
	consensus.ChainWAL
}

func NewWalDB

func NewWalDB(chainWal consensus.ChainWAL) *WalDB

func (*WalDB) ReadAll

func (wal *WalDB) ReadAll(snapshot *raftpb.Snapshot) (id *consensus.RaftIdentity, state *raftpb.HardState, ents []raftpb.Entry, err error)

ReadAll returns hard state, all uncommitted entries - read last hard state - read all uncommited entries after snapshot index

func (*WalDB) SaveEntry

func (wal *WalDB) SaveEntry(state raftpb.HardState, entries []raftpb.Entry) error

type Work

type Work struct {
	*types.Block
	// contains filtered or unexported fields
}

func (*Work) GetTimeout

func (work *Work) GetTimeout() time.Duration

func (*Work) ToString

func (work *Work) ToString() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL