Documentation ¶
Overview ¶
Package fetch contains the mechanism to fetch data from remote peers.
nolint
Index ¶
- Constants
- Variables
- type BatchError
- type Config
- type EpochData
- type Fetch
- func (f *Fetch) GetActiveSet(ctx context.Context, set types.Hash32) error
- func (f *Fetch) GetAtxs(ctx context.Context, ids []types.ATXID, opts ...system.GetAtxOpt) error
- func (f *Fetch) GetBallots(ctx context.Context, ids []types.BallotID) error
- func (f *Fetch) GetBlockTxs(ctx context.Context, ids []types.TransactionID) error
- func (f *Fetch) GetBlocks(ctx context.Context, ids []types.BlockID) error
- func (f *Fetch) GetCert(ctx context.Context, lid types.LayerID, bid types.BlockID, peers []p2p.Peer) (*types.Certificate, error)
- func (f *Fetch) GetLayerData(ctx context.Context, peer p2p.Peer, lid types.LayerID) ([]byte, error)
- func (f *Fetch) GetLayerOpinions(ctx context.Context, peer p2p.Peer, lid types.LayerID) ([]byte, error)
- func (f *Fetch) GetMalfeasanceProofs(ctx context.Context, ids []types.NodeID) error
- func (f *Fetch) GetMaliciousIDs(ctx context.Context, peer p2p.Peer) ([]types.NodeID, error)
- func (f *Fetch) GetPoetProof(ctx context.Context, id types.Hash32) error
- func (f *Fetch) GetProposalTxs(ctx context.Context, ids []types.TransactionID) error
- func (f *Fetch) GetProposals(ctx context.Context, ids []types.ProposalID) error
- func (f *Fetch) PeerEpochInfo(ctx context.Context, peer p2p.Peer, epoch types.EpochID) (*EpochData, error)
- func (f *Fetch) PeerMeshHashes(ctx context.Context, peer p2p.Peer, req *MeshHashRequest) (*MeshHashes, error)
- func (f *Fetch) RegisterPeerHash(peer p2p.Peer, hash types.Hash32)
- func (f *Fetch) RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32)
- func (f *Fetch) SelectBestShuffled(n int) []p2p.Peer
- func (f *Fetch) SetValidators(atx SyncValidator, poet SyncValidator, ballot SyncValidator, ...)
- func (f *Fetch) Start() error
- func (f *Fetch) Stop()
- type HashPeers
- type HashPeersCache
- type LayerData
- type LayerOpinion
- func (t *LayerOpinion) DecodeScale(dec *scale.Decoder) (total int, err error)
- func (t *LayerOpinion) EncodeScale(enc *scale.Encoder) (total int, err error)
- func (lo *LayerOpinion) MarshalLogObject(encoder zapcore.ObjectEncoder) error
- func (lo *LayerOpinion) Peer() p2p.Peer
- func (lo *LayerOpinion) SetPeer(p p2p.Peer)
- type MaliciousIDs
- type MeshHashRequest
- func (r *MeshHashRequest) Count() uint
- func (t *MeshHashRequest) DecodeScale(dec *scale.Decoder) (total int, err error)
- func (t *MeshHashRequest) EncodeScale(enc *scale.Encoder) (total int, err error)
- func (r *MeshHashRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error
- func (r *MeshHashRequest) Validate() error
- type MeshHashes
- type OpinionRequest
- type Option
- type PoetValidator
- type RequestBatch
- type RequestMessage
- type ResponseBatch
- type ResponseMessage
- type ServerConfig
- type SyncValidator
- type ValidatorFunc
Constants ¶
const ( OpnProtocol = "lp/2" RedundantPeers = 5 )
const MaxHashesInReq = 100
Variables ¶
var ( // ErrExceedMaxRetries is returned when MaxRetriesForRequest attempts have been made to fetch // data for a hash and failed. ErrExceedMaxRetries = errors.New("fetch failed after max retries for request") )
var ErrIgnore = errors.New("fetch: ignore")
Functions ¶
This section is empty.
Types ¶
type BatchError ¶ added in v1.4.0
func (*BatchError) Empty ¶ added in v1.4.0
func (b *BatchError) Empty() bool
func (*BatchError) Error ¶ added in v1.4.0
func (b *BatchError) Error() string
func (*BatchError) Ignore ¶ added in v1.4.3
func (b *BatchError) Ignore() bool
func (*BatchError) Is ¶ added in v1.6.4
func (b *BatchError) Is(target error) bool
type Config ¶
type Config struct { BatchTimeout time.Duration `mapstructure:"batchtimeout"` BatchSize int `mapstructure:"batchsize"` QueueSize int `mapstructure:"queuesize"` MaxRetriesForRequest int `mapstructure:"maxretriesforrequest"` RequestTimeout time.Duration `mapstructure:"request-timeout"` RequestHardTimeout time.Duration `mapstructure:"request-hard-timeout"` EnableServerMetrics bool `mapstructure:"servers-metrics"` ServersConfig map[string]ServerConfig `mapstructure:"servers"` Streaming bool `mapstructure:"streaming"` // The maximum number of concurrent requests to get ATXs. GetAtxsConcurrency int64 `mapstructure:"getatxsconcurrency"` DecayingTag server.DecayingTagSpec `mapstructure:"decaying-tag"` LogPeerStatsInterval time.Duration `mapstructure:"log-peer-stats-interval"` }
Config is the configuration of the Fetch component.
func DefaultConfig ¶ added in v1.0.0
func DefaultConfig() Config
DefaultConfig is the default config for the fetch component.
type EpochData ¶ added in v1.0.0
type EpochData struct { // When changing this value also check the size of // - `ResponseMessage.Data` above // - `Response.Data` in `p2p/server/server.go` // - `NodeIDs` in `MaliciousIDs` above // - `Set` in `EpochActiveSet` in common/types/activation.go // - `EligibilityProofs` in the type `Ballot` in common/types/ballot.go // - `Rewards` in the type `InnerBlock` in common/types/block.go // - `Ballots` in the type `LayerData` below // - `Proposals` in the type `Value` in hare3/types.go // - `Proposals` and `CompactProposals` in the type `Value` in hare4/types.go AtxIDs []types.ATXID `scale:"max=8000000"` }
func (*EpochData) DecodeScale ¶ added in v1.0.0
func (*EpochData) EncodeScale ¶ added in v1.0.0
type Fetch ¶
type Fetch struct {
// contains filtered or unexported fields
}
Fetch is the main struct that contains network peers and logic to batch and dispatch hash fetch requests.
func NewFetch ¶
func NewFetch( cdb *datastore.CachedDB, proposals *store.Store, host *p2p.Host, opts ...Option, ) (*Fetch, error)
NewFetch creates a new Fetch struct.
func (*Fetch) GetActiveSet ¶ added in v1.1.5
GetActiveSet downloads activeset.
func (*Fetch) GetAtxs ¶ added in v1.0.0
GetAtxs gets the data for given atx IDs and validates them. Returns an error if at least one ATX cannot be fetched.
func (*Fetch) GetBallots ¶ added in v1.0.0
GetBallots gets data for the specified BallotIDs and validates them.
func (*Fetch) GetBlockTxs ¶ added in v1.0.0
GetBlockTxs fetches the txs provided as IDs and saves them; they will be validated before the block is applied.
func (*Fetch) GetLayerData ¶ added in v1.0.0
GetLayerData gets layer data from peers.
func (*Fetch) GetLayerOpinions ¶ added in v1.0.0
func (*Fetch) GetMalfeasanceProofs ¶ added in v1.0.0
GetMalfeasanceProofs gets malfeasance proofs for the specified NodeIDs and validates them.
func (*Fetch) GetMaliciousIDs ¶ added in v1.0.0
func (*Fetch) GetPoetProof ¶ added in v1.0.0
GetPoetProof gets poet proof from remote peer.
func (*Fetch) GetProposalTxs ¶ added in v1.0.0
GetProposalTxs fetches the txs provided as IDs and validates them, returns an error if one TX failed to be fetched.
func (*Fetch) GetProposals ¶ added in v1.0.0
GetProposals gets the data for given proposal IDs from peers.
func (*Fetch) PeerEpochInfo ¶ added in v1.0.0
func (f *Fetch) PeerEpochInfo(ctx context.Context, peer p2p.Peer, epoch types.EpochID) (*EpochData, error)
PeerEpochInfo gets the epoch info published in the given epoch from the specified peer.
func (*Fetch) PeerMeshHashes ¶ added in v1.0.0
func (f *Fetch) PeerMeshHashes(ctx context.Context, peer p2p.Peer, req *MeshHashRequest) (*MeshHashes, error)
func (*Fetch) RegisterPeerHash ¶ added in v1.4.1
RegisterPeerHash registers provided peer for a hash.
func (*Fetch) RegisterPeerHashes ¶ added in v1.0.0
RegisterPeerHashes registers provided peer for a list of hashes.
func (*Fetch) SelectBestShuffled ¶ added in v1.2.6
func (*Fetch) SetValidators ¶ added in v1.0.0
func (f *Fetch) SetValidators( atx SyncValidator, poet SyncValidator, ballot SyncValidator, activeset SyncValidator, block SyncValidator, prop SyncValidator, txBlock SyncValidator, txProposal SyncValidator, mal SyncValidator, )
SetValidators sets the handlers to validate various mesh data fetched from peers.
type HashPeersCache ¶ added in v1.0.0
type HashPeersCache struct { *lru.Cache[types.Hash32, HashPeers] // contains filtered or unexported fields }
HashPeersCache holds lru cache of peers to pull hash from.
func NewHashPeersCache ¶ added in v1.0.0
func NewHashPeersCache(size int) (*HashPeersCache, error)
NewHashPeersCache creates a new hash-to-peers cache.
func (*HashPeersCache) Add ¶ added in v1.0.0
func (hpc *HashPeersCache) Add(hash types.Hash32, peer p2p.Peer)
Add is a thread-safe version of add.
func (*HashPeersCache) GetRandom ¶ added in v1.0.0
func (hpc *HashPeersCache) GetRandom(hash types.Hash32, hint datastore.Hint, rng *rand.Rand) []p2p.Peer
GetRandom returns a randomized list of peers for a given hash.
func (*HashPeersCache) RegisterPeerHashes ¶ added in v1.0.0
func (hpc *HashPeersCache) RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32)
RegisterPeerHashes registers provided peer for a list of hashes.
type LayerData ¶ added in v1.0.0
type LayerData struct { // Ballots contains the ballots for the given layer. // // Worst case scenario is that a single smesher identity has > 99.97% of the total weight of the network. // In this case they will get all 50 available slots in all 4032 layers of the epoch. // Additionally every other identity on the network that successfully published an ATX will get 1 slot. // // If we expect 8.0 Mio ATXs that would be a total of 8.0 Mio + 50 * 4032 = 8 201 600 slots. // Since these are randomly distributed across the epoch, we can expect an average of n * p = // 8 201 600 / 4032 = 2034.1 ballots in a layer with a standard deviation of sqrt(n * p * (1 - p)) = // sqrt(8 201 600 * 1/4032 * 4031/4032) = 45.1 // // This means that we can expect a maximum of 2034.1 + 6*45.1 = 2304.7 ballots per layer with // > 99.9997% probability. Ballots []types.BallotID `scale:"max=2350"` }
LayerData is the data response for a given layer ID.
func (*LayerData) DecodeScale ¶ added in v1.0.0
func (*LayerData) EncodeScale ¶ added in v1.0.0
type LayerOpinion ¶ added in v1.0.0
type LayerOpinion struct { PrevAggHash types.Hash32 Certified *types.BlockID // contains filtered or unexported fields }
func (*LayerOpinion) DecodeScale ¶ added in v1.0.0
func (t *LayerOpinion) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*LayerOpinion) EncodeScale ¶ added in v1.0.0
func (t *LayerOpinion) EncodeScale(enc *scale.Encoder) (total int, err error)
func (*LayerOpinion) MarshalLogObject ¶ added in v1.0.0
func (lo *LayerOpinion) MarshalLogObject(encoder zapcore.ObjectEncoder) error
MarshalLogObject implements logging encoder for LayerOpinion.
func (*LayerOpinion) SetPeer ¶ added in v1.0.0
func (lo *LayerOpinion) SetPeer(p p2p.Peer)
SetPeer sets the peer associated with this LayerOpinion.
type MaliciousIDs ¶ added in v1.0.0
type MaliciousIDs struct {
NodeIDs []types.NodeID `scale:"max=8000000"` // to be in line with `EpochData.AtxIDs` below
}
func (*MaliciousIDs) DecodeScale ¶ added in v1.0.0
func (t *MaliciousIDs) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*MaliciousIDs) EncodeScale ¶ added in v1.0.0
func (t *MaliciousIDs) EncodeScale(enc *scale.Encoder) (total int, err error)
type MeshHashRequest ¶ added in v1.0.0
MeshHashRequest is used by ForkFinder to request the hashes of layers from a peer to find the layer at which a divergence occurred in the local mesh of the node.
From and To define the beginning and end layer to request. By defines the increment between layers to limit the number of hashes in the response, e.g. 2 means only every other hash is requested. The number of hashes in the response is limited by `fetch.MaxHashesInReq`.
func NewMeshHashRequest ¶ added in v1.0.0
func NewMeshHashRequest(from, to types.LayerID) *MeshHashRequest
func (*MeshHashRequest) Count ¶ added in v1.0.0
func (r *MeshHashRequest) Count() uint
func (*MeshHashRequest) DecodeScale ¶ added in v1.0.0
func (t *MeshHashRequest) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*MeshHashRequest) EncodeScale ¶ added in v1.0.0
func (t *MeshHashRequest) EncodeScale(enc *scale.Encoder) (total int, err error)
func (*MeshHashRequest) MarshalLogObject ¶ added in v1.0.0
func (r *MeshHashRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error
func (*MeshHashRequest) Validate ¶ added in v1.0.0
func (r *MeshHashRequest) Validate() error
type MeshHashes ¶ added in v1.0.0
type MeshHashes struct { // depends on syncer Config `MaxHashesInReq`, defaults to 100, 1000 is a safe upper bound Hashes []types.Hash32 `scale:"max=1000"` }
func (*MeshHashes) DecodeScale ¶ added in v1.0.0
func (t *MeshHashes) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*MeshHashes) EncodeScale ¶ added in v1.0.0
func (t *MeshHashes) EncodeScale(enc *scale.Encoder) (total int, err error)
type OpinionRequest ¶ added in v1.1.0
func (*OpinionRequest) DecodeScale ¶ added in v1.1.0
func (t *OpinionRequest) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*OpinionRequest) EncodeScale ¶ added in v1.1.0
func (t *OpinionRequest) EncodeScale(enc *scale.Encoder) (total int, err error)
type Option ¶ added in v1.0.0
type Option func(*Fetch)
Option is a type to configure a fetcher.
func WithConfig ¶ added in v1.0.0
WithConfig configures the config for the fetcher.
func WithContext ¶ added in v1.0.0
WithContext configures the shutdown context for the fetcher.
func WithLogger ¶ added in v1.0.0
WithLogger configures logger for the fetcher.
type PoetValidator ¶ added in v1.0.0
type RequestBatch ¶ added in v1.0.0
type RequestBatch struct { ID types.Hash32 // depends on fetch config `BatchSize` which defaults to 10, more than 100 seems unlikely Requests []RequestMessage `scale:"max=100"` }
RequestBatch is a batch of requests and a hash of all requests as ID.
func (*RequestBatch) DecodeScale ¶ added in v1.0.0
func (t *RequestBatch) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*RequestBatch) EncodeScale ¶ added in v1.0.0
func (t *RequestBatch) EncodeScale(enc *scale.Encoder) (total int, err error)
type RequestMessage ¶ added in v1.0.0
type RequestMessage struct { Hint datastore.Hint `scale:"max=256"` // TODO(mafa): convert to an enum Hash types.Hash32 }
RequestMessage is sent to the peer for hash query.
func (*RequestMessage) DecodeScale ¶ added in v1.0.0
func (t *RequestMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*RequestMessage) EncodeScale ¶ added in v1.0.0
func (t *RequestMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
type ResponseBatch ¶ added in v1.0.0
type ResponseBatch struct { ID types.Hash32 // depends on fetch config `BatchSize` which defaults to 10, more than 100 seems unlikely Responses []ResponseMessage `scale:"max=100"` }
ResponseBatch is the response struct sent for a RequestBatch. The ResponseBatch ID must be the same as stated in RequestBatch even if not all Data is present.
func (*ResponseBatch) DecodeScale ¶ added in v1.0.0
func (t *ResponseBatch) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*ResponseBatch) EncodeScale ¶ added in v1.0.0
func (t *ResponseBatch) EncodeScale(enc *scale.Encoder) (total int, err error)
type ResponseMessage ¶ added in v1.0.0
type ResponseMessage struct { Hash types.Hash32 // keep in line with limit of Response.Data in `p2p/server/server.go` Data []byte `scale:"max=272629760"` // 260 MiB > 8.0 mio ATX * 32 bytes per ID }
ResponseMessage is sent to the node as a response.
func (*ResponseMessage) DecodeScale ¶ added in v1.0.0
func (t *ResponseMessage) DecodeScale(dec *scale.Decoder) (total int, err error)
func (*ResponseMessage) EncodeScale ¶ added in v1.0.0
func (t *ResponseMessage) EncodeScale(enc *scale.Encoder) (total int, err error)
type ServerConfig ¶ added in v1.3.0
type SyncValidator ¶ added in v1.0.0
type SyncValidator interface {
HandleMessage(context.Context, types.Hash32, p2p.Peer, []byte) error
}
SyncValidator exists to allow for mocking of GossipHandlers through the use of ValidatorFunc.
type ValidatorFunc ¶ added in v1.0.0
type ValidatorFunc pubsub.SyncHandler
The ValidatorFunc type is an adapter to allow the use of functions as SyncValidators so that we can mock the behavior of GossipHandlers. If we didn't need to mock GossipHandler behavior then we could use GossipHandlers directly and do away with both ValidatorFunc and SyncValidator.