Documentation ¶
Index ¶
- Constants
- Variables
- type AggregatedSignatures
- func (as *AggregatedSignatures) ChunksWithoutAggregatedSignature() []uint64
- func (as *AggregatedSignatures) Collect() []flow.AggregatedSignature
- func (as *AggregatedSignatures) HasSignature(chunkIndex uint64) bool
- func (as *AggregatedSignatures) PutSignature(chunkIndex uint64, aggregatedSignature flow.AggregatedSignature) (uint64, error)
- type ApprovalCollector
- func (c *ApprovalCollector) CollectMissingVerifiers() map[uint64]flow.IdentifierList
- func (c *ApprovalCollector) IncorporatedBlock() *flow.Header
- func (c *ApprovalCollector) IncorporatedBlockID() flow.Identifier
- func (c *ApprovalCollector) IncorporatedResult() *flow.IncorporatedResult
- func (c *ApprovalCollector) ProcessApproval(approval *flow.ResultApproval) error
- func (c *ApprovalCollector) SealResult() error
- type ApprovalsCache
- type AssignmentCollector
- type AssignmentCollectorBase
- func (cb *AssignmentCollectorBase) Block() *flow.Header
- func (cb *AssignmentCollectorBase) BlockID() flow.Identifier
- func (cb *AssignmentCollectorBase) OnInvalidApproval(approval *flow.ResultApproval, err error)
- func (cb *AssignmentCollectorBase) Result() *flow.ExecutionResult
- func (cb *AssignmentCollectorBase) ResultID() flow.Identifier
- type AssignmentCollectorState
- type AssignmentCollectorStateMachine
- func (asm *AssignmentCollectorStateMachine) ChangeProcessingStatus(expectedCurrentStatus, newStatus ProcessingStatus) error
- func (asm *AssignmentCollectorStateMachine) CheckEmergencySealing(observer consensus.SealingObservation, finalizedBlockHeight uint64) error
- func (asm *AssignmentCollectorStateMachine) ProcessApproval(approval *flow.ResultApproval) error
- func (asm *AssignmentCollectorStateMachine) ProcessIncorporatedResult(incorporatedResult *flow.IncorporatedResult) error
- func (asm *AssignmentCollectorStateMachine) ProcessingStatus() ProcessingStatus
- func (asm *AssignmentCollectorStateMachine) RequestMissingApprovals(observer consensus.SealingObservation, maxHeightForRequesting uint64) (uint, error)
- type AssignmentCollectorTree
- func (t *AssignmentCollectorTree) FinalizeForkAtLevel(finalized *flow.Header, sealed *flow.Header) error
- func (t *AssignmentCollectorTree) GetCollector(resultID flow.Identifier) AssignmentCollector
- func (t *AssignmentCollectorTree) GetCollectorsByInterval(from, to uint64) []AssignmentCollector
- func (t *AssignmentCollectorTree) GetOrCreateCollector(result *flow.ExecutionResult) (*LazyInitCollector, error)
- func (t *AssignmentCollectorTree) GetSize() uint64
- type BaseApprovalsTestSuite
- type BaseAssignmentCollectorTestSuite
- type CachingAssignmentCollector
- func (ac *CachingAssignmentCollector) CheckEmergencySealing(consensus.SealingObservation, uint64) error
- func (ac *CachingAssignmentCollector) GetApprovals() []*flow.ResultApproval
- func (ac *CachingAssignmentCollector) GetIncorporatedResults() []*flow.IncorporatedResult
- func (ac *CachingAssignmentCollector) ProcessApproval(approval *flow.ResultApproval) error
- func (ac *CachingAssignmentCollector) ProcessIncorporatedResult(incorporatedResult *flow.IncorporatedResult) error
- func (ac *CachingAssignmentCollector) ProcessingStatus() ProcessingStatus
- func (ac *CachingAssignmentCollector) RequestMissingApprovals(consensus.SealingObservation, uint64) (uint, error)
- type ChunkApprovalCollector
- type IncorporatedResultsCache
- type LazyInitCollector
- type LruCache
- type NewCollectorFactoryMethod
- type OrphanAssignmentCollector
- func (oc *OrphanAssignmentCollector) CheckEmergencySealing(consensus.SealingObservation, uint64) error
- func (oc *OrphanAssignmentCollector) ProcessApproval(*flow.ResultApproval) error
- func (oc *OrphanAssignmentCollector) ProcessIncorporatedResult(*flow.IncorporatedResult) error
- func (oc *OrphanAssignmentCollector) ProcessingStatus() ProcessingStatus
- func (oc *OrphanAssignmentCollector) RequestMissingApprovals(consensus.SealingObservation, uint64) (uint, error)
- type ProcessingStatus
- type RequestTracker
- func (rt *RequestTracker) GetAllIds() []flow.Identifier
- func (rt *RequestTracker) PruneUpToHeight(height uint64) error
- func (rt *RequestTracker) Remove(resultIDs ...flow.Identifier)
- func (rt *RequestTracker) TryUpdate(result *flow.ExecutionResult, incorporatedBlockID flow.Identifier, ...) (RequestTrackerItem, bool, error)
- type RequestTrackerItem
- type SignatureCollector
- func (c *SignatureCollector) Add(signerID flow.Identifier, signature crypto.Signature)
- func (c *SignatureCollector) BySigner(signerID flow.Identifier) (*crypto.Signature, bool)
- func (c *SignatureCollector) HasSigned(signerID flow.Identifier) bool
- func (c *SignatureCollector) NumberSignatures() uint
- func (c *SignatureCollector) ToAggregatedSignature() flow.AggregatedSignature
- type VerifyingAssignmentCollector
- func (ac *VerifyingAssignmentCollector) CheckEmergencySealing(observer consensus.SealingObservation, finalizedBlockHeight uint64) error
- func (ac *VerifyingAssignmentCollector) ProcessApproval(approval *flow.ResultApproval) error
- func (ac *VerifyingAssignmentCollector) ProcessIncorporatedResult(incorporatedResult *flow.IncorporatedResult) error
- func (ac *VerifyingAssignmentCollector) ProcessingStatus() ProcessingStatus
- func (ac *VerifyingAssignmentCollector) RequestMissingApprovals(observation consensus.SealingObservation, maxHeightForRequesting uint64) (uint, error)
Constants ¶
const DefaultEmergencySealingThresholdForFinalization = 100
DefaultEmergencySealingThresholdForFinalization is the minimal number of unsealed but finalized descendants that a block must have in order to be eligible for emergency sealing (further conditions apply for emergency sealing).
const DefaultEmergencySealingThresholdForVerification = 25
DefaultEmergencySealingThresholdForVerification is the minimal number of finalized descendants that the block _incorporating_ an Execution Result [ER] must have for the ER to be eligible for emergency sealing (further conditions apply for emergency sealing).
Variables ¶
var ( ErrInvalidCollectorStateTransition = errors.New("invalid state transition") ErrDifferentCollectorState = errors.New("different state") )
Functions ¶
This section is empty.
Types ¶
type AggregatedSignatures ¶
type AggregatedSignatures struct {
// contains filtered or unexported fields
}
AggregatedSignatures is a utility struct that provides concurrency-safe access to a map of aggregated signatures indexed by chunk index.
func NewAggregatedSignatures ¶
func NewAggregatedSignatures(chunks uint64) (*AggregatedSignatures, error)
NewAggregatedSignatures instantiates an AggregatedSignatures. Requires that the number of chunks is a positive integer. Errors otherwise.
func (*AggregatedSignatures) ChunksWithoutAggregatedSignature ¶
func (as *AggregatedSignatures) ChunksWithoutAggregatedSignature() []uint64
ChunksWithoutAggregatedSignature returns indexes of chunks that don't have an aggregated signature
func (*AggregatedSignatures) Collect ¶
func (as *AggregatedSignatures) Collect() []flow.AggregatedSignature
Collect returns array with aggregated signature for each chunk
func (*AggregatedSignatures) HasSignature ¶
func (as *AggregatedSignatures) HasSignature(chunkIndex uint64) bool
HasSignature reports whether a signature is stored for the given chunk index.
func (*AggregatedSignatures) PutSignature ¶
func (as *AggregatedSignatures) PutSignature(chunkIndex uint64, aggregatedSignature flow.AggregatedSignature) (uint64, error)
PutSignature adds the AggregatedSignature from the collector to `aggregatedSignatures`. The returned uint64 is the resulting number of approved chunks. Errors if the chunk index exceeds the valid range.
type ApprovalCollector ¶
type ApprovalCollector struct {
// contains filtered or unexported fields
}
ApprovalCollector is responsible for distributing work to chunk collectorTree, collecting aggregated signatures for chunks that reached seal construction threshold, creating and submitting seal candidates once signatures for every chunk are aggregated.
func NewApprovalCollector ¶
func NewApprovalCollector( log zerolog.Logger, result *flow.IncorporatedResult, incorporatedBlock *flow.Header, executedBlock *flow.Header, assignment *chunks.Assignment, seals mempool.IncorporatedResultSeals, requiredApprovalsForSealConstruction uint, ) (*ApprovalCollector, error)
func (*ApprovalCollector) CollectMissingVerifiers ¶
func (c *ApprovalCollector) CollectMissingVerifiers() map[uint64]flow.IdentifierList
CollectMissingVerifiers collects the IDs of verifiers who haven't provided an approval for a particular chunk. Returns: map { ChunkIndex -> []VerifierId }
func (*ApprovalCollector) IncorporatedBlock ¶
func (c *ApprovalCollector) IncorporatedBlock() *flow.Header
IncorporatedBlock returns the block which incorporates execution result
func (*ApprovalCollector) IncorporatedBlockID ¶
func (c *ApprovalCollector) IncorporatedBlockID() flow.Identifier
IncorporatedBlockID returns the ID of block which incorporates execution result
func (*ApprovalCollector) IncorporatedResult ¶
func (c *ApprovalCollector) IncorporatedResult() *flow.IncorporatedResult
IncorporatedResult returns the incorporated Result this ApprovalCollector is for
func (*ApprovalCollector) ProcessApproval ¶
func (c *ApprovalCollector) ProcessApproval(approval *flow.ResultApproval) error
ProcessApproval performs processing of result approvals and bookkeeping of aggregated signatures for every chunk. Triggers sealing of the execution result once the last result approval needed for sealing has been processed. Returns: - engine.InvalidInputError - result approval is invalid - exception in case of any other error; usually this is not expected - nil on success
func (*ApprovalCollector) SealResult ¶
func (c *ApprovalCollector) SealResult() error
type ApprovalsCache ¶
type ApprovalsCache struct {
// contains filtered or unexported fields
}
ApprovalsCache encapsulates a map for storing result approvals indexed by approval partial ID to provide concurrent access.
func NewApprovalsCache ¶
func NewApprovalsCache(sizeHint uint) *ApprovalsCache
func (*ApprovalsCache) All ¶
func (c *ApprovalsCache) All() []*flow.ResultApproval
All returns all stored approvals
func (*ApprovalsCache) Get ¶
func (c *ApprovalsCache) Get(approvalID flow.Identifier) *flow.ResultApproval
Get returns ResultApproval for the given ID (or nil if none is stored)
func (*ApprovalsCache) Put ¶
func (c *ApprovalsCache) Put(approvalID flow.Identifier, approval *flow.ResultApproval) bool
Put saves approval into cache; returns true iff approval was newly added. The approvalID should be calculated as `approval.Body.PartialID()`; it is taken as an input so that the caller can optimize by reusing the calculated approvalID.
type AssignmentCollector ¶
type AssignmentCollector interface { AssignmentCollectorState // ChangeProcessingStatus changes the AssignmentCollector's internal processing // status. The operation is implemented as an atomic compare-and-swap, i.e. the // state transition is only executed if AssignmentCollector's internal state is // equal to `expectedValue`. The return indicates whether the state was updated. // The implementation only allows the transitions // CachingApprovals -> VerifyingApprovals // and VerifyingApprovals -> Orphaned // Error returns: // * nil if the state transition was successfully executed // * ErrDifferentCollectorState if the AssignmentCollector's state is different than expectedCurrentStatus // * ErrInvalidCollectorStateTransition if the given state transition is impossible // * all other errors are unexpected and potential symptoms of internal bugs or state corruption (fatal) ChangeProcessingStatus(expectedValue, newValue ProcessingStatus) error }
AssignmentCollector tracks the known verifier assignments for a particular execution result. For the same result, there can be multiple different assignments. This happens if the result is incorporated in different forks, because in each fork a unique verifier assignment is generated using the incorporating block's unique 'source of randomness'. An AssignmentCollector can be in different states (enumerated by ProcessingStatus). State transitions are atomic and concurrency safe. The high-level AssignmentCollector implements the logic for state transitions, while it delegates the state-specific portion of the logic to `AssignmentCollectorState`.
type AssignmentCollectorBase ¶
type AssignmentCollectorBase struct {
// contains filtered or unexported fields
}
AssignmentCollectorBase holds the shared data and functionality for implementations of the AssignmentCollectorState: common dependencies and immutable values that are shared by the different states of an AssignmentCollector. It is intended as the base struct for the different `AssignmentCollectorState` implementations.
func NewAssignmentCollectorBase ¶
func NewAssignmentCollectorBase(logger zerolog.Logger, workerPool *workerpool.WorkerPool, result *flow.ExecutionResult, state protocol.State, headers storage.Headers, assigner module.ChunkAssigner, seals mempool.IncorporatedResultSeals, sigHasher hash.Hasher, approvalConduit network.Conduit, requestTracker *RequestTracker, requiredApprovalsForSealConstruction uint, ) (AssignmentCollectorBase, error)
func (*AssignmentCollectorBase) Block ¶
func (cb *AssignmentCollectorBase) Block() *flow.Header
func (*AssignmentCollectorBase) BlockID ¶
func (cb *AssignmentCollectorBase) BlockID() flow.Identifier
func (*AssignmentCollectorBase) OnInvalidApproval ¶
func (cb *AssignmentCollectorBase) OnInvalidApproval(approval *flow.ResultApproval, err error)
OnInvalidApproval logs an invalid approval.
func (*AssignmentCollectorBase) Result ¶
func (cb *AssignmentCollectorBase) Result() *flow.ExecutionResult
func (*AssignmentCollectorBase) ResultID ¶
func (cb *AssignmentCollectorBase) ResultID() flow.Identifier
type AssignmentCollectorState ¶
type AssignmentCollectorState interface { // BlockID returns the ID of the executed block. BlockID() flow.Identifier // Block returns the header of the executed block. Block() *flow.Header // ResultID returns the ID of the result this assignment collector tracks. ResultID() flow.Identifier // Result returns the result this assignment collector tracks. Result() *flow.ExecutionResult // ProcessIncorporatedResult starts tracking the approval for IncorporatedResult. // Method is idempotent. // Error Returns: // * no errors expected during normal operation; // errors might be symptoms of bugs or internal state corruption (fatal) ProcessIncorporatedResult(incorporatedResult *flow.IncorporatedResult) error // ProcessApproval ingests Result Approvals and triggers sealing of execution result // when sufficient approvals have arrived. Method is idempotent. // Error Returns: // * nil in case of success (outdated approvals might be silently discarded) // * engine.InvalidInputError if the result approval is invalid // * any other errors might be symptoms of bugs or internal state corruption (fatal) ProcessApproval(approval *flow.ResultApproval) error // CheckEmergencySealing checks whether this AssignmentCollector can be emergency // sealed. If this is the case, the AssignmentCollector produces a candidate seal // as part of this method call. No errors are expected during normal operations. CheckEmergencySealing(observer consensus.SealingObservation, finalizedBlockHeight uint64) error // RequestMissingApprovals sends requests for missing approvals to the respective // verification nodes. Returns number of requests made. No errors are expected // during normal operations. RequestMissingApprovals(observer consensus.SealingObservation, maxHeightForRequesting uint64) (uint, error) // ProcessingStatus returns the AssignmentCollector's ProcessingStatus (state descriptor). ProcessingStatus() ProcessingStatus }
AssignmentCollectorState represents an AssignmentCollector in one specific state (without any knowledge about state transitions).
func NewOrphanAssignmentCollector ¶
func NewOrphanAssignmentCollector(collectorBase AssignmentCollectorBase) AssignmentCollectorState
type AssignmentCollectorStateMachine ¶
type AssignmentCollectorStateMachine struct { AssignmentCollectorBase sync.Mutex // contains filtered or unexported fields }
AssignmentCollectorStateMachine implements the `AssignmentCollector` interface. It wraps the current `AssignmentCollectorState` and provides logic for state transitions. Any state-specific logic is delegated to the state-specific instance. AssignmentCollectorStateMachine is fully concurrent.
Comment on concurrency safety for state-specific business logic:
- AssignmentCollectorStateMachine processes state updates concurrently with state-specific business logic. Hence, it can happen that we update a stale state.
- To guarantee that we hand inputs to the latest state, we employ a "Compare And Repeat Pattern": we atomically read the state before and after the operation. If the state changed, we updated a stale state. We repeat until we confirm that the latest state was updated.
func NewAssignmentCollectorStateMachine ¶
func NewAssignmentCollectorStateMachine(collectorBase AssignmentCollectorBase) *AssignmentCollectorStateMachine
func (*AssignmentCollectorStateMachine) ChangeProcessingStatus ¶
func (asm *AssignmentCollectorStateMachine) ChangeProcessingStatus(expectedCurrentStatus, newStatus ProcessingStatus) error
ChangeProcessingStatus changes the AssignmentCollector's internal processing status. The operation is implemented as an atomic compare-and-swap, i.e. the state transition is only executed if AssignmentCollector's internal state is equal to `expectedCurrentStatus`. The return indicates whether the state was updated. The implementation only allows the following transitions:
CachingApprovals -> VerifyingApprovals CachingApprovals -> Orphaned VerifyingApprovals -> Orphaned
Error returns: * nil if the state transition was successfully executed * ErrDifferentCollectorState if the AssignmentCollector's state is different than expectedCurrentStatus * ErrInvalidCollectorStateTransition if the given state transition is impossible * all other errors are unexpected and potential symptoms of internal bugs or state corruption (fatal)
func (*AssignmentCollectorStateMachine) CheckEmergencySealing ¶
func (asm *AssignmentCollectorStateMachine) CheckEmergencySealing(observer consensus.SealingObservation, finalizedBlockHeight uint64) error
CheckEmergencySealing checks whether this AssignmentCollector can be emergency sealed. If this is the case, the AssignmentCollector produces a candidate seal as part of this method call. No errors are expected during normal operations.
func (*AssignmentCollectorStateMachine) ProcessApproval ¶
func (asm *AssignmentCollectorStateMachine) ProcessApproval(approval *flow.ResultApproval) error
ProcessApproval ingests Result Approvals and triggers sealing of execution result when sufficient approvals have arrived. Error Returns:
- nil in case of success (outdated approvals might be silently discarded)
- engine.InvalidInputError if the result approval is invalid
- any other errors might be symptoms of bugs or internal state corruption (fatal)
func (*AssignmentCollectorStateMachine) ProcessIncorporatedResult ¶
func (asm *AssignmentCollectorStateMachine) ProcessIncorporatedResult(incorporatedResult *flow.IncorporatedResult) error
ProcessIncorporatedResult starts tracking the approval for IncorporatedResult. Method is idempotent. Error Returns:
- no errors expected during normal operation; errors might be symptoms of bugs or internal state corruption (fatal)
func (*AssignmentCollectorStateMachine) ProcessingStatus ¶
func (asm *AssignmentCollectorStateMachine) ProcessingStatus() ProcessingStatus
ProcessingStatus returns the AssignmentCollector's ProcessingStatus (state descriptor).
func (*AssignmentCollectorStateMachine) RequestMissingApprovals ¶
func (asm *AssignmentCollectorStateMachine) RequestMissingApprovals(observer consensus.SealingObservation, maxHeightForRequesting uint64) (uint, error)
RequestMissingApprovals sends requests for missing approvals to the respective verification nodes. Returns number of requests made. No errors are expected during normal operations.
type AssignmentCollectorTree ¶
type AssignmentCollectorTree struct {
// contains filtered or unexported fields
}
AssignmentCollectorTree is a mempool holding assignment collectors, which is aware of the tree structure formed by the execution results. The mempool supports pruning by height: only collectors descending from the latest sealed and finalized result are relevant. Safe for concurrent access. Internally, the mempool utilizes the LevelledForest.
func NewAssignmentCollectorTree ¶
func NewAssignmentCollectorTree(lastSealed *flow.Header, headers storage.Headers, createCollector NewCollectorFactoryMethod) *AssignmentCollectorTree
func (*AssignmentCollectorTree) FinalizeForkAtLevel ¶
func (t *AssignmentCollectorTree) FinalizeForkAtLevel(finalized *flow.Header, sealed *flow.Header) error
FinalizeForkAtLevel orphans forks in the AssignmentCollectorTree and prunes levels below the sealed finalized height. When a block is finalized we can mark results for conflicting forks as orphaned and stop processing approvals for them. Eventually all forks will be cleaned up by height.
func (*AssignmentCollectorTree) GetCollector ¶
func (t *AssignmentCollectorTree) GetCollector(resultID flow.Identifier) AssignmentCollector
GetCollector returns assignment collector for the given result.
func (*AssignmentCollectorTree) GetCollectorsByInterval ¶
func (t *AssignmentCollectorTree) GetCollectorsByInterval(from, to uint64) []AssignmentCollector
GetCollectorsByInterval returns all collectors in state `VerifyingApprovals` whose executed block has height in [from; to)
func (*AssignmentCollectorTree) GetOrCreateCollector ¶
func (t *AssignmentCollectorTree) GetOrCreateCollector(result *flow.ExecutionResult) (*LazyInitCollector, error)
GetOrCreateCollector performs lazy initialization of AssignmentCollector using double-checked locking.
func (*AssignmentCollectorTree) GetSize ¶
func (t *AssignmentCollectorTree) GetSize() uint64
type BaseApprovalsTestSuite ¶
type BaseApprovalsTestSuite struct { suite.Suite ParentBlock *flow.Header // parent of sealing candidate Block *flow.Header // candidate for sealing IncorporatedBlock *flow.Header // block that incorporated result VerID flow.Identifier // for convenience, node id of first verifier Chunks flow.ChunkList // list of chunks of execution result ChunksAssignment *chunks.Assignment AuthorizedVerifiers map[flow.Identifier]*flow.Identity // map of authorized verifier identities for execution result PublicKey *module.PublicKey // public key used to mock signature verifications SigHasher hash.Hasher // used to verify signatures IncorporatedResult *flow.IncorporatedResult }
BaseApprovalsTestSuite is a base suite for testing functionality related to approvals processing. In a nutshell, it generates mock data that can be used to create approvals and provides all the data needed to validate those approvals for the respective execution result.
func (*BaseApprovalsTestSuite) SetupTest ¶
func (s *BaseApprovalsTestSuite) SetupTest()
type BaseAssignmentCollectorTestSuite ¶
type BaseAssignmentCollectorTestSuite struct { BaseApprovalsTestSuite WorkerPool *workerpool.WorkerPool Blocks map[flow.Identifier]*flow.Header State *protocol.State Snapshots map[flow.Identifier]*protocol.Snapshot Headers *storage.Headers Assigner *module.ChunkAssigner SealsPL *mempool.IncorporatedResultSeals Conduit *mocknetwork.Conduit FinalizedAtHeight map[uint64]*flow.Header IdentitiesCache map[flow.Identifier]map[flow.Identifier]*flow.Identity // helper map to store identities for given block RequestTracker *RequestTracker }
BaseAssignmentCollectorTestSuite is a base suite for testing assignment collectors, contains mocks for all classes that are used in base assignment collector and can be reused in different test suites.
func (*BaseAssignmentCollectorTestSuite) MarkFinalized ¶
func (s *BaseAssignmentCollectorTestSuite) MarkFinalized(block *flow.Header)
func (*BaseAssignmentCollectorTestSuite) SetupTest ¶
func (s *BaseAssignmentCollectorTestSuite) SetupTest()
func (*BaseAssignmentCollectorTestSuite) TearDownTest ¶
func (s *BaseAssignmentCollectorTestSuite) TearDownTest()
type CachingAssignmentCollector ¶
type CachingAssignmentCollector struct { AssignmentCollectorBase // contains filtered or unexported fields }
CachingAssignmentCollector is an AssignmentCollectorState with the fixed `ProcessingStatus` of `CachingApprovals`.
func NewCachingAssignmentCollector ¶
func NewCachingAssignmentCollector(collectorBase AssignmentCollectorBase) *CachingAssignmentCollector
func (*CachingAssignmentCollector) CheckEmergencySealing ¶
func (ac *CachingAssignmentCollector) CheckEmergencySealing(consensus.SealingObservation, uint64) error
func (*CachingAssignmentCollector) GetApprovals ¶
func (ac *CachingAssignmentCollector) GetApprovals() []*flow.ResultApproval
func (*CachingAssignmentCollector) GetIncorporatedResults ¶
func (ac *CachingAssignmentCollector) GetIncorporatedResults() []*flow.IncorporatedResult
func (*CachingAssignmentCollector) ProcessApproval ¶
func (ac *CachingAssignmentCollector) ProcessApproval(approval *flow.ResultApproval) error
ProcessApproval ingests Result Approvals and triggers sealing of execution result when sufficient approvals have arrived. Error Returns:
- nil in case of success (outdated approvals might be silently discarded)
- engine.InvalidInputError if the result approval is invalid
- any other errors might be symptoms of bugs or internal state corruption (fatal)
func (*CachingAssignmentCollector) ProcessIncorporatedResult ¶
func (ac *CachingAssignmentCollector) ProcessIncorporatedResult(incorporatedResult *flow.IncorporatedResult) error
ProcessIncorporatedResult starts tracking the approval for IncorporatedResult. Method is idempotent. Error Returns:
- no errors expected during normal operation; errors might be symptoms of bugs or internal state corruption (fatal)
func (*CachingAssignmentCollector) ProcessingStatus ¶
func (ac *CachingAssignmentCollector) ProcessingStatus() ProcessingStatus
func (*CachingAssignmentCollector) RequestMissingApprovals ¶
func (ac *CachingAssignmentCollector) RequestMissingApprovals(consensus.SealingObservation, uint64) (uint, error)
type ChunkApprovalCollector ¶
type ChunkApprovalCollector struct {
// contains filtered or unexported fields
}
ChunkApprovalCollector implements logic for checking chunks against assignments as well as accumulating signatures of already checked approvals.
func NewChunkApprovalCollector ¶
func NewChunkApprovalCollector(assignment map[flow.Identifier]struct{}, requiredApprovalsForSealConstruction uint) *ChunkApprovalCollector
func (*ChunkApprovalCollector) GetMissingSigners ¶
func (c *ChunkApprovalCollector) GetMissingSigners() flow.IdentifierList
GetMissingSigners returns ids of approvers that are present in assignment but didn't provide approvals
func (*ChunkApprovalCollector) ProcessApproval ¶
func (c *ChunkApprovalCollector) ProcessApproval(approval *flow.ResultApproval) (flow.AggregatedSignature, bool)
ProcessApproval performs processing and bookkeeping of single approval
type IncorporatedResultsCache ¶
type IncorporatedResultsCache struct {
// contains filtered or unexported fields
}
IncorporatedResultsCache encapsulates a map for storing IncorporatedResults to provide concurrent access.
func NewIncorporatedResultsCache ¶
func NewIncorporatedResultsCache(sizeHint uint) *IncorporatedResultsCache
func (*IncorporatedResultsCache) All ¶
func (c *IncorporatedResultsCache) All() []*flow.IncorporatedResult
All returns all stored IncorporatedResults
func (*IncorporatedResultsCache) Get ¶
func (c *IncorporatedResultsCache) Get(key flow.Identifier) *flow.IncorporatedResult
Get returns IncorporatedResult for the given ID (or nil if none is stored)
func (*IncorporatedResultsCache) Put ¶
func (c *IncorporatedResultsCache) Put(key flow.Identifier, incRes *flow.IncorporatedResult) bool
Put saves IncorporatedResult into cache; returns true iff IncorporatedResult was newly added
type LazyInitCollector ¶
type LazyInitCollector struct { Collector AssignmentCollector Created bool // whether collector was created or retrieved from cache }
LazyInitCollector is a helper structure that is used to return a lazily initialized collector.
type LruCache ¶
type LruCache struct {
// contains filtered or unexported fields
}
LruCache is a wrapper over `simplelru.LRUCache` that provides the API needed for processing result approvals. It extends the functionality of `simplelru.LRUCache` by introducing an additional index for quicker access.
func NewApprovalsLRUCache ¶
func (*LruCache) Get ¶
func (c *LruCache) Get(approvalID flow.Identifier) *flow.ResultApproval
func (*LruCache) Peek ¶
func (c *LruCache) Peek(approvalID flow.Identifier) *flow.ResultApproval
func (*LruCache) Put ¶
func (c *LruCache) Put(approval *flow.ResultApproval)
func (*LruCache) TakeByResultID ¶
func (c *LruCache) TakeByResultID(resultID flow.Identifier) []*flow.ResultApproval
type NewCollectorFactoryMethod ¶
type NewCollectorFactoryMethod = func(result *flow.ExecutionResult) (AssignmentCollector, error)
NewCollectorFactoryMethod is a factory method to generate an AssignmentCollector for an execution result
type OrphanAssignmentCollector ¶
type OrphanAssignmentCollector struct {
AssignmentCollectorBase
}
OrphanAssignmentCollector is an AssignmentCollectorState with the fixed `ProcessingStatus` of `Orphaned`.
func (*OrphanAssignmentCollector) CheckEmergencySealing ¶
func (oc *OrphanAssignmentCollector) CheckEmergencySealing(consensus.SealingObservation, uint64) error
func (*OrphanAssignmentCollector) ProcessApproval ¶
func (oc *OrphanAssignmentCollector) ProcessApproval(*flow.ResultApproval) error
func (*OrphanAssignmentCollector) ProcessIncorporatedResult ¶
func (oc *OrphanAssignmentCollector) ProcessIncorporatedResult(*flow.IncorporatedResult) error
func (*OrphanAssignmentCollector) ProcessingStatus ¶
func (oc *OrphanAssignmentCollector) ProcessingStatus() ProcessingStatus
func (*OrphanAssignmentCollector) RequestMissingApprovals ¶
func (oc *OrphanAssignmentCollector) RequestMissingApprovals(consensus.SealingObservation, uint64) (uint, error)
type ProcessingStatus ¶
type ProcessingStatus int
ProcessingStatus is a state descriptor for the AssignmentCollector.
const ( // CachingApprovals is a state descriptor for the AssignmentCollector. In this state, // the collector is currently caching approvals but _not_ yet processing them. CachingApprovals ProcessingStatus = iota // VerifyingApprovals is a state descriptor for the AssignmentCollector. In this state, // the collector is processing approvals. VerifyingApprovals // Orphaned is a state descriptor for the AssignmentCollector. In this state, // the collector discards all approvals. Orphaned )
func (ProcessingStatus) String ¶
func (ps ProcessingStatus) String() string
type RequestTracker ¶
type RequestTracker struct {
// contains filtered or unexported fields
}
RequestTracker is an index of RequestTrackerItems for execution results, keyed by result ID, incorporated block ID and chunk index. It is concurrency-safe.
func NewRequestTracker ¶
func NewRequestTracker(headers storage.Headers, blackoutPeriodMin, blackoutPeriodMax int) *RequestTracker
NewRequestTracker instantiates a new RequestTracker with blackout periods between min and max seconds.
func (*RequestTracker) GetAllIds ¶
func (rt *RequestTracker) GetAllIds() []flow.Identifier
GetAllIds returns all result IDs that we are indexing
func (*RequestTracker) PruneUpToHeight ¶
func (rt *RequestTracker) PruneUpToHeight(height uint64) error
PruneUpToHeight removes all tracker items for blocks whose height is strictly smaller than `height`. Note: items for blocks at `height` are retained. After pruning, items for blocks below the given height are dropped.
Monotonicity Requirement: The pruned height cannot decrease, as we cannot recover already pruned elements. If `height` is smaller than the previous value, the previous value is kept and the sentinel mempool.DecreasingPruningHeightError is returned.
func (*RequestTracker) Remove ¶
func (rt *RequestTracker) Remove(resultIDs ...flow.Identifier)
Remove removes all entries pertaining to an execution result
func (*RequestTracker) TryUpdate ¶
func (rt *RequestTracker) TryUpdate(result *flow.ExecutionResult, incorporatedBlockID flow.Identifier, chunkIndex uint64) (RequestTrackerItem, bool, error)
TryUpdate tries to update the tracker item if it is not in its blackout period. It returns the tracker item for a specific chunk (creating it if it doesn't exist) and whether the request item was successfully updated or not. Since RequestTracker prunes items by height, it can't accept items for a height lower than the cached lowest height. If the height of the executed block pointed to by the execution result is smaller than the lowest height, the sentinel mempool.DecreasingPruningHeightError is returned. In case the execution result points to an unknown executed block, an exception will be returned.
type RequestTrackerItem ¶
type RequestTrackerItem struct { Requests uint NextTimeout time.Time // contains filtered or unexported fields }
RequestTrackerItem is an object that keeps track of how many times a request has been made, as well as the time until a new request can be made. It is not concurrency-safe.
func NewRequestTrackerItem ¶
func NewRequestTrackerItem(blackoutPeriodMin, blackoutPeriodMax int) RequestTrackerItem
NewRequestTrackerItem instantiates a new RequestTrackerItem where the NextTimeout is evaluated to the current time plus a random blackout period contained between min and max.
func (RequestTrackerItem) IsBlackout ¶
func (i RequestTrackerItem) IsBlackout() bool
func (RequestTrackerItem) Update ¶
func (i RequestTrackerItem) Update() RequestTrackerItem
Update creates a _new_ RequestTrackerItem with incremented request number and updated NextTimeout.
type SignatureCollector ¶
type SignatureCollector struct {
// contains filtered or unexported fields
}
SignatureCollector contains a set of signatures from verifiers attesting to the validity of an execution result chunk. NOT concurrency safe. TODO: this will be replaced with stateful BLS aggregation
func NewSignatureCollector ¶
func NewSignatureCollector() SignatureCollector
NewSignatureCollector instantiates a new SignatureCollector
func (*SignatureCollector) Add ¶
func (c *SignatureCollector) Add(signerID flow.Identifier, signature crypto.Signature)
Add appends a signature. Only the _first_ signature is retained for each signerID.
func (*SignatureCollector) BySigner ¶
func (c *SignatureCollector) BySigner(signerID flow.Identifier) (*crypto.Signature, bool)
BySigner returns a signer's signature if it exists
func (*SignatureCollector) HasSigned ¶
func (c *SignatureCollector) HasSigned(signerID flow.Identifier) bool
HasSigned checks if signer has already provided a signature
func (*SignatureCollector) NumberSignatures ¶
func (c *SignatureCollector) NumberSignatures() uint
NumberSignatures returns the number of stored (distinct) signatures
func (*SignatureCollector) ToAggregatedSignature ¶
func (c *SignatureCollector) ToAggregatedSignature() flow.AggregatedSignature
ToAggregatedSignature generates an aggregated signature from all signatures in the SignatureCollector
type VerifyingAssignmentCollector ¶
type VerifyingAssignmentCollector struct { AssignmentCollectorBase // contains filtered or unexported fields }
VerifyingAssignmentCollector Context:
- When the same result is incorporated in multiple different forks, unique verifier assignment is determined for each fork.
- The assignment collector is intended to encapsulate the known assignments for a particular execution result.
VerifyingAssignmentCollector has a strict ordering of processing: before processing approvals, at least one incorporated result has to be processed. VerifyingAssignmentCollector takes advantage of internal caching to speed up processing approvals for different assignments. VerifyingAssignmentCollector is responsible for validating approvals on result-level (checking signature, identity).
func NewVerifyingAssignmentCollector ¶
func NewVerifyingAssignmentCollector(collectorBase AssignmentCollectorBase) (*VerifyingAssignmentCollector, error)
NewVerifyingAssignmentCollector instantiates a new VerifyingAssignmentCollector. All errors are unexpected and potential symptoms of internal bugs or state corruption (fatal).
func (*VerifyingAssignmentCollector) CheckEmergencySealing ¶
func (ac *VerifyingAssignmentCollector) CheckEmergencySealing(observer consensus.SealingObservation, finalizedBlockHeight uint64) error
CheckEmergencySealing checks whether the results of the managed assignments can be emergency sealed, and seals the results where possible. It returns an error when running into any exception. It returns nil once the checking is done, regardless of whether any results were emergency sealed or not.
func (*VerifyingAssignmentCollector) ProcessApproval ¶
func (ac *VerifyingAssignmentCollector) ProcessApproval(approval *flow.ResultApproval) error
ProcessApproval ingests Result Approvals and triggers sealing of execution result when sufficient approvals have arrived. Error Returns:
- nil in case of success (outdated approvals might be silently discarded)
- engine.InvalidInputError if the result approval is invalid
- any other errors might be symptoms of bugs or internal state corruption (fatal)
func (*VerifyingAssignmentCollector) ProcessIncorporatedResult ¶
func (ac *VerifyingAssignmentCollector) ProcessIncorporatedResult(incorporatedResult *flow.IncorporatedResult) error
ProcessIncorporatedResult starts tracking the approval for IncorporatedResult. Method is idempotent. Error Returns:
- no errors expected during normal operation; errors might be symptoms of bugs or internal state corruption (fatal)
func (*VerifyingAssignmentCollector) ProcessingStatus ¶
func (ac *VerifyingAssignmentCollector) ProcessingStatus() ProcessingStatus
func (*VerifyingAssignmentCollector) RequestMissingApprovals ¶
func (ac *VerifyingAssignmentCollector) RequestMissingApprovals(observation consensus.SealingObservation, maxHeightForRequesting uint64) (uint, error)
RequestMissingApprovals traverses all collectors and requests missing approvals for every chunk that didn't get enough approvals from verifiers. Returns the number of requests made, and an error in case something goes wrong.
Source Files ¶
- aggregated_signatures.go
- approval_collector.go
- approvals_lru_cache.go
- assignment_collector.go
- assignment_collector_base.go
- assignment_collector_statemachine.go
- assignment_collector_tree.go
- caches.go
- caching_assignment_collector.go
- chunk_collector.go
- orphan_assignment_collector.go
- request_tracker.go
- signature_collector.go
- testutil.go
- verifying_assignment_collector.go