Documentation ¶
Index ¶
- Constants
- Variables
- func CheckConfigMetadata(metadata *etcdraft.ConfigMetadata) error
- func ConfChange(blockMetadata *etcdraft.BlockMetadata, confState *raftpb.ConfState) *raftpb.ConfChange
- func ConfigChannelHeader(block *common.Block) (hdr *common.ChannelHeader, err error)
- func ConfigEnvelopeFromBlock(block *common.Block) (*common.Envelope, error)
- func ConsensusMetadataFromConfigBlock(block *common.Block) (*etcdraft.ConfigMetadata, error)
- func ConsentersToMap(consenters []*etcdraft.Consenter) map[string]struct{}
- func CreateConsentersMap(blockMetadata *etcdraft.BlockMetadata, configMetadata *etcdraft.ConfigMetadata) map[uint64]*etcdraft.Consenter
- func EndpointconfigFromSupport(support consensus.ConsenterSupport, bccsp bccsp.BCCSP) ([]cluster.EndpointCriteria, error)
- func ListSnapshots(logger *flogging.FabricLogger, snapDir string) []uint64
- func MembershipByCert(consenters map[uint64]*etcdraft.Consenter) map[string]uint64
- func MetadataFromConfigUpdate(update *common.ConfigUpdate) (*etcdraft.ConfigMetadata, error)
- func MetadataFromConfigValue(configValue *common.ConfigValue) (*etcdraft.ConfigMetadata, error)
- func MetadataHasDuplication(md *etcdraft.ConfigMetadata) error
- func NodeExists(id uint64, nodes []uint64) bool
- func RaftPeers(consenterIDs []uint64) []raft.Peer
- func ReadBlockMetadata(blockMetadata *common.Metadata, configMetadata *etcdraft.ConfigMetadata) (*etcdraft.BlockMetadata, error)
- type BlockPuller
- type Chain
- func (c *Chain) Configure(env *common.Envelope, configSeq uint64) error
- func (c *Chain) Consensus(req *orderer.ConsensusRequest, sender uint64) error
- func (c *Chain) Errored() <-chan struct{}
- func (c *Chain) Halt()
- func (c *Chain) Order(env *common.Envelope, configSeq uint64) error
- func (c *Chain) Start()
- func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error
- func (c *Chain) ValidateConsensusMetadata(oldMetadataBytes, newMetadataBytes []byte, newChannel bool) error
- func (c *Chain) WaitReady() error
- type ChainGetter
- type Config
- type Configurator
- type Consenter
- type ConsenterCertificate
- type CreateBlockPuller
- type Dispatcher
- type Disseminator
- type InactiveChainRegistry
- type LedgerBlockPuller
- type MembershipChanges
- type MemoryStorage
- type MessageReceiver
- type Metrics
- type Options
- type PeriodicCheck
- type RPC
- type RaftStorage
- func (rs *RaftStorage) ApplySnapshot(snap raftpb.Snapshot)
- func (rs *RaftStorage) Close() error
- func (rs *RaftStorage) Snapshot() raftpb.Snapshot
- func (rs *RaftStorage) Store(entries []raftpb.Entry, hardstate raftpb.HardState, snapshot raftpb.Snapshot) error
- func (rs *RaftStorage) TakeSnapshot(i uint64, cs raftpb.ConfState, data []byte) error
- type ReceiverGetter
- type Tracker
Constants ¶
const ( BYTE = 1 << (10 * iota) KILOBYTE MEGABYTE GIGABYTE TERABYTE )
const ( // DefaultSnapshotCatchUpEntries is the default number of entries // to preserve in memory when a snapshot is taken. This is for // slow followers to catch up. DefaultSnapshotCatchUpEntries = uint64(4) // DefaultSnapshotIntervalSize is the default snapshot interval. It is // used if SnapshotIntervalSize is not provided in channel config options. // It is needed to enforce snapshot being set. DefaultSnapshotIntervalSize = 16 * MEGABYTE // DefaultEvictionSuspicion is the threshold that a node will start // suspecting its own eviction if it has been leaderless for this // period of time. DefaultEvictionSuspicion = time.Minute * 10 // DefaultLeaderlessCheckInterval is the interval that a chain checks // its own leadership status. DefaultLeaderlessCheckInterval = time.Second * 10 )
Variables ¶
var ( ActiveNodesOpts = metrics.GaugeOpts{ Namespace: "consensus", Subsystem: "etcdraft", Name: "active_nodes", Help: "Number of active nodes in this channel.", LabelNames: []string{"channel"}, StatsdFormat: "%{#fqname}.%{channel}", } )
var MaxSnapshotFiles = 4
MaxSnapshotFiles defines the maximum number of etcd/raft snapshot files to retain on the filesystem. Snapshot files are read from newest to oldest, until the first intact file is found. The more snapshot files we keep around, the more we mitigate the impact of corrupted snapshots. This is exported for testing purposes. This MUST be greater than or equal to 1.
Functions ¶
func CheckConfigMetadata ¶
func CheckConfigMetadata(metadata *etcdraft.ConfigMetadata) error
CheckConfigMetadata validates Raft config metadata
func ConfChange ¶
func ConfChange(blockMetadata *etcdraft.BlockMetadata, confState *raftpb.ConfState) *raftpb.ConfChange
ConfChange computes Raft configuration changes based on the current Raft configuration state and the consenter IDs stored in the block metadata.
func ConfigChannelHeader ¶
func ConfigChannelHeader(block *common.Block) (hdr *common.ChannelHeader, err error)
ConfigChannelHeader expects a config block and returns the header type of the config envelope wrapped in it, e.g. HeaderType_ORDERER_TRANSACTION
func ConfigEnvelopeFromBlock ¶
ConfigEnvelopeFromBlock extracts configuration envelope from the block based on the config type, i.e. HeaderType_ORDERER_TRANSACTION or HeaderType_CONFIG
func ConsensusMetadataFromConfigBlock ¶
func ConsensusMetadataFromConfigBlock(block *common.Block) (*etcdraft.ConfigMetadata, error)
ConsensusMetadataFromConfigBlock reads consensus metadata updates from the configuration block
func ConsentersToMap ¶
ConsentersToMap maps consenters into a set whose keys are the consenters' client TLS certificates
func CreateConsentersMap ¶
func CreateConsentersMap(blockMetadata *etcdraft.BlockMetadata, configMetadata *etcdraft.ConfigMetadata) map[uint64]*etcdraft.Consenter
CreateConsentersMap creates a map of Raft Node IDs to Consenter given the block metadata and the config metadata.
func EndpointconfigFromSupport ¶
func EndpointconfigFromSupport(support consensus.ConsenterSupport, bccsp bccsp.BCCSP) ([]cluster.EndpointCriteria, error)
EndpointconfigFromSupport extracts TLS CA certificates and endpoints from the ConsenterSupport
func ListSnapshots ¶
func ListSnapshots(logger *flogging.FabricLogger, snapDir string) []uint64
ListSnapshots returns a list of RaftIndex of snapshots stored on disk. If a file is corrupted, it is renamed.
func MembershipByCert ¶
MembershipByCert converts the consenters map into a set, encapsulated by a map, where the key is the client TLS certificate
func MetadataFromConfigUpdate ¶
func MetadataFromConfigUpdate(update *common.ConfigUpdate) (*etcdraft.ConfigMetadata, error)
MetadataFromConfigUpdate extracts consensus metadata from config update
func MetadataFromConfigValue ¶
func MetadataFromConfigValue(configValue *common.ConfigValue) (*etcdraft.ConfigMetadata, error)
MetadataFromConfigValue reads and translates configuration updates from config value into raft metadata
func MetadataHasDuplication ¶
func MetadataHasDuplication(md *etcdraft.ConfigMetadata) error
MetadataHasDuplication returns an error if the metadata has duplication of consenters. A duplication is defined by having a server or a client TLS certificate that is found in two different consenters, regardless of the type of certificate (client/server).
func NodeExists ¶
NodeExists returns true if the node id exists in the slice and false otherwise
func ReadBlockMetadata ¶
func ReadBlockMetadata(blockMetadata *common.Metadata, configMetadata *etcdraft.ConfigMetadata) (*etcdraft.BlockMetadata, error)
ReadBlockMetadata attempts to read raft metadata from block metadata, if available. Otherwise, it reads raft metadata from the config metadata supplied.
Types ¶
type BlockPuller ¶
type BlockPuller interface { PullBlock(seq uint64) *common.Block HeightsByEndpoints() (map[string]uint64, error) Close() }
BlockPuller is used to pull blocks from other OSN
func NewBlockPuller ¶
func NewBlockPuller(support consensus.ConsenterSupport, baseDialer *cluster.PredicateDialer, clusterConfig localconfig.Cluster, bccsp bccsp.BCCSP, ) (BlockPuller, error)
NewBlockPuller creates a new block puller
type Chain ¶
type Chain struct { ActiveNodes atomic.Value // this is exported so that test can use `Node.Status()` to get raft node status. Node *node Metrics *Metrics // BCCSP instance CryptoProvider bccsp.BCCSP // contains filtered or unexported fields }
Chain implements consensus.Chain interface.
func NewChain ¶
func NewChain( support consensus.ConsenterSupport, opts Options, conf Configurator, rpc RPC, cryptoProvider bccsp.BCCSP, f CreateBlockPuller, haltCallback func(), observeC chan<- raft.SoftState, ) (*Chain, error)
NewChain constructs a chain object.
func (*Chain) Consensus ¶
func (c *Chain) Consensus(req *orderer.ConsensusRequest, sender uint64) error
Consensus passes the given ConsensusRequest message to the raft.Node instance
func (*Chain) Errored ¶
func (c *Chain) Errored() <-chan struct{}
Errored returns a channel that closes when the chain stops.
func (*Chain) Start ¶
func (c *Chain) Start()
Start instructs the orderer to begin serving the chain and keep it current.
func (*Chain) Submit ¶
func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error
Submit forwards the incoming request to: - the local run goroutine, if this node is the leader; - the actual leader via the transport mechanism, otherwise. The call fails if there's no leader elected yet.
type ChainGetter ¶
type ChainGetter interface { // GetChain obtains the ChainSupport for the given channel. // Returns nil when the ChainSupport for the given channel // isn't found. GetChain(chainID string) *multichannel.ChainSupport }
ChainGetter obtains instances of ChainSupport for the given channel
type Config ¶
type Config struct { WALDir string // WAL data of <my-channel> is stored in WALDir/<my-channel> SnapDir string // Snapshots of <my-channel> are stored in SnapDir/<my-channel> EvictionSuspicion string // Duration threshold that the node samples in order to suspect its eviction from the channel. }
Config contains etcdraft configurations
type Configurator ¶
type Configurator interface {
Configure(channel string, newNodes []cluster.RemoteNode)
}
Configurator is used to configure the communication layer when the chain starts.
type Consenter ¶
type Consenter struct { CreateChain func(chainName string) InactiveChainRegistry InactiveChainRegistry Dialer *cluster.PredicateDialer Communication cluster.Communicator *Dispatcher Chains ChainGetter Logger *flogging.FabricLogger EtcdRaftConfig Config OrdererConfig localconfig.TopLevel Cert []byte Metrics *Metrics BCCSP bccsp.BCCSP }
Consenter implements etcdraft consenter
func New ¶
func New( clusterDialer *cluster.PredicateDialer, conf *localconfig.TopLevel, srvConf comm.ServerConfig, srv *comm.GRPCServer, r *multichannel.Registrar, icr InactiveChainRegistry, metricsProvider metrics.Provider, bccsp bccsp.BCCSP, ) *Consenter
New creates an etcdraft Consenter
func (*Consenter) HandleChain ¶
func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *common.Metadata) (consensus.Chain, error)
HandleChain returns a new Chain instance or an error upon failure
func (*Consenter) ReceiverByChain ¶
func (c *Consenter) ReceiverByChain(channelID string) MessageReceiver
ReceiverByChain returns the MessageReceiver for the given channelID or nil if not found.
type ConsenterCertificate ¶
ConsenterCertificate denotes a TLS certificate of a consenter
func (ConsenterCertificate) IsConsenterOfChannel ¶
func (conCert ConsenterCertificate) IsConsenterOfChannel(configBlock *common.Block) error
IsConsenterOfChannel returns whether the caller is a consenter of a channel by inspecting the given configuration block. It returns nil if true, else returns an error.
type CreateBlockPuller ¶
type CreateBlockPuller func() (BlockPuller, error)
CreateBlockPuller is a function to create BlockPuller on demand. It is passed into chain initializer so that tests could mock this.
type Dispatcher ¶
type Dispatcher struct { Logger *flogging.FabricLogger ChainSelector ReceiverGetter }
Dispatcher dispatches Submit and Step requests to the designated per chain instances
func (*Dispatcher) OnConsensus ¶
func (d *Dispatcher) OnConsensus(channel string, sender uint64, request *orderer.ConsensusRequest) error
OnConsensus notifies the Dispatcher of the reception of a ConsensusRequest from a given sender on a given channel
func (*Dispatcher) OnSubmit ¶
func (d *Dispatcher) OnSubmit(channel string, sender uint64, request *orderer.SubmitRequest) error
OnSubmit notifies the Dispatcher of the reception of a SubmitRequest from a given sender on a given channel
type Disseminator ¶
type Disseminator struct { RPC // contains filtered or unexported fields }
Disseminator piggybacks cluster metadata, if any, to egress messages.
func (*Disseminator) SendConsensus ¶
func (d *Disseminator) SendConsensus(dest uint64, msg *orderer.ConsensusRequest) error
func (*Disseminator) UpdateMetadata ¶
func (d *Disseminator) UpdateMetadata(m []byte)
type InactiveChainRegistry ¶
type InactiveChainRegistry interface { // TrackChain tracks a chain with the given name, and calls the given callback // when this chain should be created. TrackChain(chainName string, genesisBlock *common.Block, createChain func()) }
InactiveChainRegistry registers chains that are inactive
type LedgerBlockPuller ¶
type LedgerBlockPuller struct { BlockPuller BlockRetriever cluster.BlockRetriever Height func() uint64 }
LedgerBlockPuller pulls blocks upon demand, or fetches them from the ledger
type MembershipChanges ¶
type MembershipChanges struct { NewBlockMetadata *etcdraft.BlockMetadata NewConsenters map[uint64]*etcdraft.Consenter AddedNodes []*etcdraft.Consenter RemovedNodes []*etcdraft.Consenter ConfChange *raftpb.ConfChange RotatedNode uint64 }
MembershipChanges keeps information about membership changes introduced during configuration update
func ComputeMembershipChanges ¶
func ComputeMembershipChanges(oldMetadata *etcdraft.BlockMetadata, oldConsenters map[uint64]*etcdraft.Consenter, newConsenters []*etcdraft.Consenter, ordererConfig channelconfig.Orderer) (mc *MembershipChanges, err error)
ComputeMembershipChanges computes the membership update based on information about the new consenters. It returns a MembershipChanges value, which includes a slice of added consenters and a slice of consenters to be removed.
func (*MembershipChanges) Changed ¶
func (mc *MembershipChanges) Changed() bool
Changed indicates whether these changes actually do anything
func (*MembershipChanges) Rotated ¶
func (mc *MembershipChanges) Rotated() bool
Rotated indicates whether the change was a rotation
func (*MembershipChanges) String ¶
func (mc *MembershipChanges) String() string
String implements the fmt.Stringer interface
func (*MembershipChanges) UnacceptableQuorumLoss ¶
func (mc *MembershipChanges) UnacceptableQuorumLoss(active []uint64) bool
UnacceptableQuorumLoss returns true if the membership change will result in avoidable quorum loss, given the current number of active nodes in the cluster. Avoidable means that more nodes can be started to prevent quorum loss. Sometimes quorum loss is inevitable, for example when expanding a 1-node cluster.
type MemoryStorage ¶
type MemoryStorage interface { raft.Storage Append(entries []raftpb.Entry) error SetHardState(st raftpb.HardState) error CreateSnapshot(i uint64, cs *raftpb.ConfState, data []byte) (raftpb.Snapshot, error) Compact(compactIndex uint64) error ApplySnapshot(snap raftpb.Snapshot) error }
MemoryStorage is currently backed by etcd/raft.MemoryStorage. This interface is defined to expose dependencies of fsm so that it may be swapped in the future. TODO(jay) Add other necessary methods to this interface once we need them in implementation, e.g. ApplySnapshot.
type MessageReceiver ¶
type MessageReceiver interface { // Consensus passes the given ConsensusRequest message to the MessageReceiver Consensus(req *orderer.ConsensusRequest, sender uint64) error // Submit passes the given SubmitRequest message to the MessageReceiver Submit(req *orderer.SubmitRequest, sender uint64) error }
MessageReceiver receives messages
type Metrics ¶
type Metrics struct { ClusterSize metrics.Gauge IsLeader metrics.Gauge ActiveNodes metrics.Gauge CommittedBlockNumber metrics.Gauge SnapshotBlockNumber metrics.Gauge LeaderChanges metrics.Counter ProposalFailures metrics.Counter DataPersistDuration metrics.Histogram NormalProposalsReceived metrics.Counter ConfigProposalsReceived metrics.Counter }
func NewMetrics ¶
type Options ¶
type Options struct { RaftID uint64 Clock clock.Clock WALDir string SnapDir string SnapshotIntervalSize uint32 // This is configurable mainly for testing purpose. Users are not // expected to alter this. Instead, DefaultSnapshotCatchUpEntries is used. SnapshotCatchUpEntries uint64 MemoryStorage MemoryStorage Logger *flogging.FabricLogger TickInterval time.Duration ElectionTick int HeartbeatTick int MaxSizePerMsg uint64 MaxInflightBlocks int // BlockMetadata and Consenters should only be modified while under lock // of raftMetadataLock BlockMetadata *etcdraft.BlockMetadata Consenters map[uint64]*etcdraft.Consenter // MigrationInit is set when the node starts right after consensus-type migration MigrationInit bool Metrics *Metrics Cert []byte EvictionSuspicion time.Duration LeaderCheckInterval time.Duration }
Options contains all the configurations relevant to the chain.
type PeriodicCheck ¶
type PeriodicCheck struct { Logger *flogging.FabricLogger CheckInterval time.Duration Condition func() bool Report func(cumulativePeriod time.Duration) // contains filtered or unexported fields }
PeriodicCheck periodically checks a condition, and reports the cumulative consecutive period during which the condition was fulfilled.
type RPC ¶
type RPC interface { SendConsensus(dest uint64, msg *orderer.ConsensusRequest) error SendSubmit(dest uint64, request *orderer.SubmitRequest) error }
RPC is used to mock the transport layer in tests.
type RaftStorage ¶
type RaftStorage struct { SnapshotCatchUpEntries uint64 // contains filtered or unexported fields }
RaftStorage encapsulates storages needed for etcd/raft data, i.e. memory, wal
func CreateStorage ¶
func CreateStorage( lg *flogging.FabricLogger, walDir string, snapDir string, ram MemoryStorage, ) (*RaftStorage, error)
CreateStorage attempts to create a storage to persist etcd/raft data. If data is already present on the specified disk, it is loaded to reconstruct the storage state.
func (*RaftStorage) ApplySnapshot ¶
func (rs *RaftStorage) ApplySnapshot(snap raftpb.Snapshot)
ApplySnapshot applies snapshot to local memory storage
func (*RaftStorage) Snapshot ¶
func (rs *RaftStorage) Snapshot() raftpb.Snapshot
Snapshot returns the latest snapshot stored in memory
func (*RaftStorage) Store ¶
func (rs *RaftStorage) Store(entries []raftpb.Entry, hardstate raftpb.HardState, snapshot raftpb.Snapshot) error
Store persists etcd/raft data
func (*RaftStorage) TakeSnapshot ¶
TakeSnapshot takes a snapshot at index i from MemoryStorage, and persists it to wal and disk.
type ReceiverGetter ¶
type ReceiverGetter interface { // ReceiverByChain returns the MessageReceiver if it exists, or nil if it doesn't ReceiverByChain(channelID string) MessageReceiver }
ReceiverGetter obtains instances of MessageReceiver given a channel ID