fetch

package
v1.7.5-beta.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 16, 2024 License: MIT Imports: 33 Imported by: 1

Documentation

Overview

Package fetch contains the mechanism to fetch data from remote peers.

nolint

Index

Constants

View Source
const (
	OpnProtocol = "lp/2"

	RedundantPeers = 5
)
View Source
// MaxHashesInReq is the limit on the number of hashes in the response to a MeshHashRequest.
const MaxHashesInReq = 100

Variables

View Source
var (
	// ErrExceedMaxRetries is returned when MaxRetriesForRequest attempts have been made to fetch
	// data for a hash and all of them failed.
	ErrExceedMaxRetries = errors.New("fetch failed after max retries for request")
)
View Source
var ErrIgnore = errors.New("fetch: ignore")

Functions

This section is empty.

Types

type BatchError added in v1.4.0

type BatchError struct {
	Errors map[types.Hash32]error
	// contains filtered or unexported fields
}

func (*BatchError) Add added in v1.4.0

func (b *BatchError) Add(id types.Hash32, err error)

func (*BatchError) Empty added in v1.4.0

func (b *BatchError) Empty() bool

func (*BatchError) Error added in v1.4.0

func (b *BatchError) Error() string

func (*BatchError) Ignore added in v1.4.3

func (b *BatchError) Ignore() bool

func (*BatchError) Is added in v1.6.4

func (b *BatchError) Is(target error) bool

func (*BatchError) IsIgnored added in v1.4.3

func (b *BatchError) IsIgnored(hash types.Hash32) bool

type Config

type Config struct {
	BatchTimeout         time.Duration           `mapstructure:"batchtimeout"`
	BatchSize            int                     `mapstructure:"batchsize"`
	QueueSize            int                     `mapstructure:"queuesize"`
	MaxRetriesForRequest int                     `mapstructure:"maxretriesforrequest"`
	RequestTimeout       time.Duration           `mapstructure:"request-timeout"`
	RequestHardTimeout   time.Duration           `mapstructure:"request-hard-timeout"`
	EnableServerMetrics  bool                    `mapstructure:"servers-metrics"`
	ServersConfig        map[string]ServerConfig `mapstructure:"servers"`
	Streaming            bool                    `mapstructure:"streaming"`
	// The maximum number of concurrent requests to get ATXs.
	GetAtxsConcurrency   int64                  `mapstructure:"getatxsconcurrency"`
	DecayingTag          server.DecayingTagSpec `mapstructure:"decaying-tag"`
	LogPeerStatsInterval time.Duration          `mapstructure:"log-peer-stats-interval"`
}

Config is the configuration of the fetch component.

func DefaultConfig added in v1.0.0

func DefaultConfig() Config

DefaultConfig is the default config for the fetch component.

type EpochData added in v1.0.0

type EpochData struct {
	// When changing this value also check the size of
	// - `ResponseMessage.Data` above
	// - `Response.Data` in `p2p/server/server.go`
	// - `NodeIDs` in `MaliciousIDs` above
	// - `Set` in `EpochActiveSet` in common/types/activation.go
	// - `EligibilityProofs` in the type `Ballot` in common/types/ballot.go
	// - `Rewards` in the type `InnerBlock` in common/types/block.go
	// - `Ballots` in the type `LayerData` below
	// - `Proposals` in the type `Value` in hare3/types.go
	// - `Proposals` and `CompactProposals` in the type `Value` in hare4/types.go
	AtxIDs []types.ATXID `scale:"max=8000000"`
}

func (*EpochData) DecodeScale added in v1.0.0

func (t *EpochData) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*EpochData) EncodeScale added in v1.0.0

func (t *EpochData) EncodeScale(enc *scale.Encoder) (total int, err error)

type Fetch

type Fetch struct {
	// contains filtered or unexported fields
}

Fetch is the main struct that contains network peers and logic to batch and dispatch hash fetch requests.

func NewFetch

func NewFetch(
	cdb *datastore.CachedDB,
	proposals *store.Store,
	host *p2p.Host,
	opts ...Option,
) (*Fetch, error)

NewFetch creates a new Fetch struct.

func (*Fetch) GetActiveSet added in v1.1.5

func (f *Fetch) GetActiveSet(ctx context.Context, set types.Hash32) error

GetActiveSet downloads activeset.

func (*Fetch) GetAtxs added in v1.0.0

func (f *Fetch) GetAtxs(ctx context.Context, ids []types.ATXID, opts ...system.GetAtxOpt) error

GetAtxs gets the data for given atx IDs and validates them. Returns an error if at least one ATX cannot be fetched.

func (*Fetch) GetBallots added in v1.0.0

func (f *Fetch) GetBallots(ctx context.Context, ids []types.BallotID) error

GetBallots gets data for the specified BallotIDs and validates them.

func (*Fetch) GetBlockTxs added in v1.0.0

func (f *Fetch) GetBlockTxs(ctx context.Context, ids []types.TransactionID) error

GetBlockTxs fetches the txs provided as IDs and saves them; they will be validated before the block is applied.

func (*Fetch) GetBlocks added in v1.0.0

func (f *Fetch) GetBlocks(ctx context.Context, ids []types.BlockID) error

GetBlocks gets the data for given block IDs from peers.

func (*Fetch) GetCert added in v1.1.0

func (f *Fetch) GetCert(
	ctx context.Context,
	lid types.LayerID,
	bid types.BlockID,
	peers []p2p.Peer,
) (*types.Certificate, error)

func (*Fetch) GetLayerData added in v1.0.0

func (f *Fetch) GetLayerData(ctx context.Context, peer p2p.Peer, lid types.LayerID) ([]byte, error)

GetLayerData gets the layer data for the given layer from the specified peer.

func (*Fetch) GetLayerOpinions added in v1.0.0

func (f *Fetch) GetLayerOpinions(ctx context.Context, peer p2p.Peer, lid types.LayerID) ([]byte, error)

func (*Fetch) GetMalfeasanceProofs added in v1.0.0

func (f *Fetch) GetMalfeasanceProofs(ctx context.Context, ids []types.NodeID) error

GetMalfeasanceProofs gets malfeasance proofs for the specified NodeIDs and validates them.

func (*Fetch) GetMaliciousIDs added in v1.0.0

func (f *Fetch) GetMaliciousIDs(ctx context.Context, peer p2p.Peer) ([]types.NodeID, error)

func (*Fetch) GetPoetProof added in v1.0.0

func (f *Fetch) GetPoetProof(ctx context.Context, id types.Hash32) error

GetPoetProof gets poet proof from remote peer.

func (*Fetch) GetProposalTxs added in v1.0.0

func (f *Fetch) GetProposalTxs(ctx context.Context, ids []types.TransactionID) error

GetProposalTxs fetches the txs provided as IDs and validates them. It returns an error if any TX failed to be fetched.

func (*Fetch) GetProposals added in v1.0.0

func (f *Fetch) GetProposals(ctx context.Context, ids []types.ProposalID) error

GetProposals gets the data for given proposal IDs from peers.

func (*Fetch) PeerEpochInfo added in v1.0.0

func (f *Fetch) PeerEpochInfo(ctx context.Context, peer p2p.Peer, epoch types.EpochID) (*EpochData, error)

PeerEpochInfo gets the epoch info published in the given epoch from the specified peer.

func (*Fetch) PeerMeshHashes added in v1.0.0

func (f *Fetch) PeerMeshHashes(ctx context.Context, peer p2p.Peer, req *MeshHashRequest) (*MeshHashes, error)

func (*Fetch) RegisterPeerHash added in v1.4.1

func (f *Fetch) RegisterPeerHash(peer p2p.Peer, hash types.Hash32)

RegisterPeerHash registers the provided peer for a hash.

func (*Fetch) RegisterPeerHashes added in v1.0.0

func (f *Fetch) RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32)

RegisterPeerHashes registers provided peer for a list of hashes.

func (*Fetch) SelectBestShuffled added in v1.2.6

func (f *Fetch) SelectBestShuffled(n int) []p2p.Peer

func (*Fetch) SetValidators added in v1.0.0

func (f *Fetch) SetValidators(
	atx SyncValidator,
	poet SyncValidator,
	ballot SyncValidator,
	activeset SyncValidator,
	block SyncValidator,
	prop SyncValidator,
	txBlock SyncValidator,
	txProposal SyncValidator,
	mal SyncValidator,
)

SetValidators sets the handlers to validate various mesh data fetched from peers.

func (*Fetch) Start

func (f *Fetch) Start() error

Start starts handling fetch requests.

func (*Fetch) Stop

func (f *Fetch) Stop()

Stop stops handling fetch requests.

type HashPeers added in v1.0.0

type HashPeers map[p2p.Peer]struct{}

HashPeers holds registered peers for a hash.

type HashPeersCache added in v1.0.0

type HashPeersCache struct {
	*lru.Cache[types.Hash32, HashPeers]
	// contains filtered or unexported fields
}

HashPeersCache holds lru cache of peers to pull hash from.

func NewHashPeersCache added in v1.0.0

func NewHashPeersCache(size int) (*HashPeersCache, error)

NewHashPeersCache creates a new hash-to-peers cache.

func (*HashPeersCache) Add added in v1.0.0

func (hpc *HashPeersCache) Add(hash types.Hash32, peer p2p.Peer)

Add is a thread-safe version of add.

func (*HashPeersCache) GetRandom added in v1.0.0

func (hpc *HashPeersCache) GetRandom(hash types.Hash32, hint datastore.Hint, rng *rand.Rand) []p2p.Peer

GetRandom returns a randomized list of peers for a given hash.

func (*HashPeersCache) RegisterPeerHashes added in v1.0.0

func (hpc *HashPeersCache) RegisterPeerHashes(peer p2p.Peer, hashes []types.Hash32)

RegisterPeerHashes registers provided peer for a list of hashes.

type LayerData added in v1.0.0

type LayerData struct {
	// Ballots contains the ballots for the given layer.
	//
	// Worst case scenario is that a single smesher identity has > 99.97% of the total weight of the network.
	// In this case they will get all 50 available slots in all 4032 layers of the epoch.
	// Additionally every other identity on the network that successfully published an ATX will get 1 slot.
	//
	// If we expect 8.0 Mio ATXs that would be a total of 8.0 Mio + 50 * 4032 = 8 201 600 slots.
	// Since these are randomly distributed across the epoch, we can expect an average of n * p =
	// 8 201 600 / 4032 = 2034.1 ballots in a layer with a standard deviation of sqrt(n * p * (1 - p)) =
	// sqrt(8 201 600 * 1/4032 * 4031/4032) = 45.1
	//
	// This means that we can expect a maximum of 2034.1 + 6*45.1 = 2304.7 ballots per layer with
	// > 99.9997% probability.
	Ballots []types.BallotID `scale:"max=2350"`
}

LayerData is the data response for a given layer ID.

func (*LayerData) DecodeScale added in v1.0.0

func (t *LayerData) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*LayerData) EncodeScale added in v1.0.0

func (t *LayerData) EncodeScale(enc *scale.Encoder) (total int, err error)

type LayerOpinion added in v1.0.0

type LayerOpinion struct {
	PrevAggHash types.Hash32
	Certified   *types.BlockID
	// contains filtered or unexported fields
}

func (*LayerOpinion) DecodeScale added in v1.0.0

func (t *LayerOpinion) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*LayerOpinion) EncodeScale added in v1.0.0

func (t *LayerOpinion) EncodeScale(enc *scale.Encoder) (total int, err error)

func (*LayerOpinion) MarshalLogObject added in v1.0.0

func (lo *LayerOpinion) MarshalLogObject(encoder zapcore.ObjectEncoder) error

MarshalLogObject implements logging encoder for LayerOpinion.

func (*LayerOpinion) Peer added in v1.0.0

func (lo *LayerOpinion) Peer() p2p.Peer

Peer ...

func (*LayerOpinion) SetPeer added in v1.0.0

func (lo *LayerOpinion) SetPeer(p p2p.Peer)

SetPeer ...

type MaliciousIDs added in v1.0.0

type MaliciousIDs struct {
	NodeIDs []types.NodeID `scale:"max=8000000"` // to be in line with `EpochData.AtxIDs` below
}

func (*MaliciousIDs) DecodeScale added in v1.0.0

func (t *MaliciousIDs) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*MaliciousIDs) EncodeScale added in v1.0.0

func (t *MaliciousIDs) EncodeScale(enc *scale.Encoder) (total int, err error)

type MeshHashRequest added in v1.0.0

type MeshHashRequest struct {
	From, To types.LayerID
	Step     uint32
}

MeshHashRequest is used by ForkFinder to request the hashes of layers from a peer to find the layer at which a divergence occurred in the local mesh of the node.

From and To define the beginning and end layer to request. Step defines the increment between layers to limit the number of hashes in the response, e.g. 2 means only every other hash is requested. The number of hashes in the response is limited by `fetch.MaxHashesInReq`.

func NewMeshHashRequest added in v1.0.0

func NewMeshHashRequest(from, to types.LayerID) *MeshHashRequest

func (*MeshHashRequest) Count added in v1.0.0

func (r *MeshHashRequest) Count() uint

func (*MeshHashRequest) DecodeScale added in v1.0.0

func (t *MeshHashRequest) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*MeshHashRequest) EncodeScale added in v1.0.0

func (t *MeshHashRequest) EncodeScale(enc *scale.Encoder) (total int, err error)

func (*MeshHashRequest) MarshalLogObject added in v1.0.0

func (r *MeshHashRequest) MarshalLogObject(encoder zapcore.ObjectEncoder) error

func (*MeshHashRequest) Validate added in v1.0.0

func (r *MeshHashRequest) Validate() error

type MeshHashes added in v1.0.0

type MeshHashes struct {
	// depends on syncer Config `MaxHashesInReq`, defaults to 100, 1000 is a safe upper bound
	Hashes []types.Hash32 `scale:"max=1000"`
}

func (*MeshHashes) DecodeScale added in v1.0.0

func (t *MeshHashes) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*MeshHashes) EncodeScale added in v1.0.0

func (t *MeshHashes) EncodeScale(enc *scale.Encoder) (total int, err error)

type OpinionRequest added in v1.1.0

// OpinionRequest pairs a layer ID with an optional block ID.
// Block is a pointer, so it may be nil.
//
// NOTE(review): presumably a non-nil Block narrows the request to that specific
// block (e.g. its certificate, cf. Fetch.GetCert) — confirm against the handler.
type OpinionRequest struct {
	Layer types.LayerID
	Block *types.BlockID
}

func (*OpinionRequest) DecodeScale added in v1.1.0

func (t *OpinionRequest) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*OpinionRequest) EncodeScale added in v1.1.0

func (t *OpinionRequest) EncodeScale(enc *scale.Encoder) (total int, err error)

type Option added in v1.0.0

type Option func(*Fetch)

Option is a type to configure a fetcher.

func WithConfig added in v1.0.0

func WithConfig(c Config) Option

WithConfig configures the config for the fetcher.

func WithContext added in v1.0.0

func WithContext(ctx context.Context) Option

WithContext configures the shutdown context for the fetcher.

func WithLogger added in v1.0.0

func WithLogger(log *zap.Logger) Option

WithLogger configures logger for the fetcher.

type PoetValidator added in v1.0.0

// PoetValidator is the interface for a handler that, judging by its method name,
// validates and stores a message.
//
// ValidateAndStoreMsg takes a context, a hash, a peer and raw message bytes, and
// returns an error.
// NOTE(review): presumably the hash identifies the poet proof carried in the bytes
// and the peer is the sender — confirm against the implementation.
type PoetValidator interface {
	ValidateAndStoreMsg(context.Context, types.Hash32, p2p.Peer, []byte) error
}

type RequestBatch added in v1.0.0

type RequestBatch struct {
	ID types.Hash32
	// depends on fetch config `BatchSize` which defaults to 10, more than 100 seems unlikely
	Requests []RequestMessage `scale:"max=100"`
}

RequestBatch is a batch of requests and a hash of all requests as ID.

func (*RequestBatch) DecodeScale added in v1.0.0

func (t *RequestBatch) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*RequestBatch) EncodeScale added in v1.0.0

func (t *RequestBatch) EncodeScale(enc *scale.Encoder) (total int, err error)

type RequestMessage added in v1.0.0

type RequestMessage struct {
	Hint datastore.Hint `scale:"max=256"` // TODO(mafa): convert to an enum
	Hash types.Hash32
}

RequestMessage is sent to the peer for hash query.

func (*RequestMessage) DecodeScale added in v1.0.0

func (t *RequestMessage) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*RequestMessage) EncodeScale added in v1.0.0

func (t *RequestMessage) EncodeScale(enc *scale.Encoder) (total int, err error)

type ResponseBatch added in v1.0.0

type ResponseBatch struct {
	ID types.Hash32
	// depends on fetch config `BatchSize` which defaults to 10, more than 100 seems unlikely
	Responses []ResponseMessage `scale:"max=100"`
}

ResponseBatch is the response struct sent for a RequestBatch. The ResponseBatch ID must be the same as stated in RequestBatch even if not all Data is present.

func (*ResponseBatch) DecodeScale added in v1.0.0

func (t *ResponseBatch) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*ResponseBatch) EncodeScale added in v1.0.0

func (t *ResponseBatch) EncodeScale(enc *scale.Encoder) (total int, err error)

type ResponseMessage added in v1.0.0

type ResponseMessage struct {
	Hash types.Hash32
	// keep in line with limit of Response.Data in `p2p/server/server.go`
	Data []byte `scale:"max=272629760"` // 260 MiB > 8.0 mio ATX * 32 bytes per ID
}

ResponseMessage is sent to the node as a response.

func (*ResponseMessage) DecodeScale added in v1.0.0

func (t *ResponseMessage) DecodeScale(dec *scale.Decoder) (total int, err error)

func (*ResponseMessage) EncodeScale added in v1.0.0

func (t *ResponseMessage) EncodeScale(enc *scale.Encoder) (total int, err error)

type ServerConfig added in v1.3.0

// ServerConfig is a per-server configuration entry; Config.ServersConfig holds one
// ServerConfig per string key under the "servers" configuration key.
//
// NOTE(review): the field names suggest a request queue size (Queue) and a rate
// limit of Requests per Interval — confirm against the code that consumes them.
type ServerConfig struct {
	Queue    int           `mapstructure:"queue"`
	Requests int           `mapstructure:"requests"`
	Interval time.Duration `mapstructure:"interval"`
}

type SyncValidator added in v1.0.0

type SyncValidator interface {
	HandleMessage(context.Context, types.Hash32, p2p.Peer, []byte) error
}

SyncValidator exists to allow for mocking of GossipHandlers through the use of ValidatorFunc.

type ValidatorFunc added in v1.0.0

type ValidatorFunc pubsub.SyncHandler

The ValidatorFunc type is an adapter to allow the use of functions as SyncValidators so that we can mock the behavior of GossipHandlers. If we didn't need to mock GossipHandler behavior then we could use GossipHandlers directly and do away with both ValidatorFunc and SyncValidator.

func (ValidatorFunc) HandleMessage added in v1.0.0

func (f ValidatorFunc) HandleMessage(
	ctx context.Context,
	hash types.Hash32,
	peer p2p.Peer,
	msg []byte,
) error

Directories

Path Synopsis
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL