retriever

package
v0.2.0
Warning

This package is not in the latest version of its module.

Published: Jan 13, 2023 License: Apache-2.0, MIT Imports: 25 Imported by: 3

Documentation

Constants

This section is empty.

Variables

var (
	ErrNoCandidates                = errors.New("no candidates")
	ErrUnexpectedRetrieval         = errors.New("unexpected active retrieval")
	ErrHitRetrievalLimit           = errors.New("hit retrieval limit")
	ErrProposalCreationFailed      = errors.New("proposal creation failed")
	ErrRetrievalRegistrationFailed = errors.New("retrieval registration failed")
	ErrRetrievalFailed             = errors.New("retrieval failed")
	ErrAllRetrievalsFailed         = errors.New("all retrievals failed")
	ErrQueryFailed                 = errors.New("query failed")
	ErrAllQueriesFailed            = errors.New("all queries failed")
	ErrRetrievalTimedOut           = errors.New("retrieval timed out")
)

Functions

func RetrievalProposalForAsk

func RetrievalProposalForAsk(ask *retrievalmarket.QueryResponse, c cid.Cid, optionalSelector ipld.Node) (*retrievalmarket.DealProposal, error)

Types

type ActiveRetrievalsManager

type ActiveRetrievalsManager struct {
	// contains filtered or unexported fields
}

ActiveRetrievalsManager tracks the lifecycle of an active retrieval. This currently includes two distinct phases: query, and retrieval. Each phase can involve multiple storage providers. The query phase is assumed to be a parallel operation while the retrieval phase is serial and we can therefore track which single storage provider is currently involved in the retrieval.

One purpose of the retrieval manager is to track the concurrency of retrievals from individual storage providers. When setting the current storage provider for a retrieval with SetRetrievalCandidate(), the maximum number of concurrent retrievals from that storage provider can be provided. We can then scan the currently active retrievals and check that we don't exceed the maximum.

The second purpose is to attach a UUID to each retrieval and match that to events we receive from filclient so we can report each retrieval (all phases) using a single unique identifier. Note that the retrieval phase may operate against a different CID than the one originally requested (rootCid vs retrievalCid, when using Estuary rather than the indexer), so finding the right active retrieval is not straightforward.

Determining the "end" of a retrieval is non-trivial since we need to match the number of successes and failures of each phase against the expected number for that phase. See maybeFinish().
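
The completion rule described above can be sketched as a small counter check. This is a simplified illustration, not the package's maybeFinish(): the retrievalState type and its fields are hypothetical, and the sketch uses >= rather than strict equality as a defensive choice.

```go
package main

import "fmt"

// retrievalState is a hypothetical, simplified record of one active retrieval.
type retrievalState struct {
	queryCandidateCount     int
	queriesFinished         int
	retrievalCandidateCount int
	retrievalsFinished      int
	succeeded               bool
}

// finished reports whether the retrieval can be cleaned up: either one
// retrieval succeeded, or every expected query and retrieval has finished.
func (s *retrievalState) finished() bool {
	if s.succeeded {
		return true
	}
	return s.queriesFinished >= s.queryCandidateCount &&
		s.retrievalsFinished >= s.retrievalCandidateCount
}

func main() {
	s := &retrievalState{queryCandidateCount: 2, retrievalCandidateCount: 1}
	s.queriesFinished++
	fmt.Println(s.finished()) // false: one query is still outstanding
	s.queriesFinished++
	s.retrievalsFinished++
	fmt.Println(s.finished()) // true: all expected work is accounted for
}
```

With retrievalCandidateCount left at zero, this check passes as soon as the last query finishes, which is the premature-completion risk that the New documentation warns about.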

func NewActiveRetrievalsManager

func NewActiveRetrievalsManager() *ActiveRetrievalsManager

NewActiveRetrievalsManager instantiates a new ActiveRetrievalsManager.

func (*ActiveRetrievalsManager) GetActiveRetrievalCountFor

func (arm *ActiveRetrievalsManager) GetActiveRetrievalCountFor(storageProviderId peer.ID) uint

GetActiveRetrievalCountFor returns the number of active retrievals for a storage provider. Active retrievals only count the retrieval phase, not the query phase requests.

func (*ActiveRetrievalsManager) GetStatusFor

func (arm *ActiveRetrievalsManager) GetStatusFor(retrievalCid cid.Cid, phase eventpublisher.Phase) (uuid.UUID, cid.Cid, time.Time, bool)

GetStatusFor fetches basic information for a retrieval, identified by the original retrieval CID (which may be different to the CID a storage provider is being asked for). The phase is provided here in order to determine which start time to return (query or retrieval).

func (*ActiveRetrievalsManager) New

func (arm *ActiveRetrievalsManager) New(retrievalCid cid.Cid, queryCandidateCount int, retrievalCandidateCount int) (uuid.UUID, error)

New registers a new retrieval, setting the expected number of storage provider candidates for each phase. Typically the retrievalCandidateCount is unknown but setting it to zero may mean that a retrieval is considered finished before we have a chance to set the correct number using a subsequent call to SetRetrievalCandidateCount().

A unique ID is returned for the new retrieval that can be used to identify it over the entire lifecycle.

func (*ActiveRetrievalsManager) QueryCandidatedFinished

func (arm *ActiveRetrievalsManager) QueryCandidatedFinished(retrievalCid cid.Cid)

QueryCandidatedFinished registers that a query has finished with one of the query candidates. It's used to increment the number of finished queries and also check whether cleanup may be necessary.

func (*ActiveRetrievalsManager) RetrievalCandidatedFinished

func (arm *ActiveRetrievalsManager) RetrievalCandidatedFinished(retrievalCid cid.Cid, success bool)

RetrievalCandidatedFinished registers that a retrieval has finished with one of the retrieval candidates. In the case of a failure, it's used to increment the number of finished retrievals so we can check the finished count against the number of expected candidates for possible clean-up. In the case of success, we can assume no more retrievals will occur so we can jump straight to clean-up.

func (*ActiveRetrievalsManager) SetRetrievalCandidate

func (arm *ActiveRetrievalsManager) SetRetrievalCandidate(retrievalCid, rootCid cid.Cid, storageProviderId peer.ID, maxConcurrent uint) error

SetRetrievalCandidate updates the current storage provider that we are performing a retrieval (not query) from. It also sets the root CID for this candidate, which may be different to the originally requested CID.

If maxConcurrent is non-zero, the number of concurrent retrievals (not including queries) being performed from this storage provider is checked and, if an additional retrieval would exceed this number, ErrHitRetrievalLimit is returned without performing the update.

func (*ActiveRetrievalsManager) SetRetrievalCandidateCount

func (arm *ActiveRetrievalsManager) SetRetrievalCandidateCount(retrievalCid cid.Cid, candidateCount int)

SetRetrievalCandidateCount updates the number of storage provider candidates once we know the number. When the number of finished retrievals equals this number, and the number of finished queries equals the query candidate count, the full retrieval is considered complete and can be cleaned up.

type BlockConfirmer

type BlockConfirmer func(c cid.Cid) (bool, error)

type CandidateCallback

type CandidateCallback func(RetrievalCandidate) error

type CandidateErrorCallback

type CandidateErrorCallback func(RetrievalCandidate, error)

type CandidateFinder

type CandidateFinder interface {
	FindCandidates(context.Context, cid.Cid) ([]RetrievalCandidate, error)
}

type CounterCallback

type CounterCallback func(int) error

type ErrRetrievalAlreadyRunning

type ErrRetrievalAlreadyRunning struct {
	// contains filtered or unexported fields
}

func (ErrRetrievalAlreadyRunning) Error

type EventManager

type EventManager struct {
	// contains filtered or unexported fields
}

func NewEventManager

func NewEventManager(ctx context.Context) *EventManager

func (*EventManager) FireQueryFailure

func (em *EventManager) FireQueryFailure(retrievalId uuid.UUID, requestedCid cid.Cid, phaseStartTime time.Time, storageProviderId peer.ID, errString string)

FireQueryFailure calls QueryFailure for all listeners

func (*EventManager) FireQueryProgress

func (em *EventManager) FireQueryProgress(retrievalId uuid.UUID, requestedCid cid.Cid, phaseStartTime time.Time, storageProviderId peer.ID, stage eventpublisher.Code)

FireQueryProgress calls QueryProgress for all listeners

func (*EventManager) FireQuerySuccess

func (em *EventManager) FireQuerySuccess(retrievalId uuid.UUID, requestedCid cid.Cid, phaseStartTime time.Time, storageProviderId peer.ID, queryResponse retrievalmarket.QueryResponse)

FireQuerySuccess calls QuerySuccess ("query-asked") for all listeners

func (*EventManager) FireRetrievalFailure

func (em *EventManager) FireRetrievalFailure(retrievalId uuid.UUID, requestedCid cid.Cid, phaseStartTime time.Time, storageProviderId peer.ID, errString string)

FireRetrievalFailure calls RetrievalFailure for all listeners

func (*EventManager) FireRetrievalProgress

func (em *EventManager) FireRetrievalProgress(retrievalId uuid.UUID, requestedCid cid.Cid, phaseStartTime time.Time, storageProviderId peer.ID, stage eventpublisher.Code)

FireRetrievalProgress calls RetrievalProgress for all listeners

func (*EventManager) FireRetrievalSuccess

func (em *EventManager) FireRetrievalSuccess(retrievalId uuid.UUID, requestedCid cid.Cid, phaseStartTime time.Time, storageProviderId peer.ID, receivedSize uint64, receivedCids int64, confirmed bool)

FireRetrievalSuccess calls RetrievalSuccess for all listeners

func (*EventManager) RegisterListener

func (em *EventManager) RegisterListener(listener RetrievalEventListener) func()
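
RegisterListener returns a func() that is not documented here. A common Go pattern for this shape is to return a closure that unregisters the listener; the sketch below shows that pattern with a hypothetical registry type and a simplified string event. Treat the unregister behavior as an assumption about this package rather than a description of it.

```go
package main

import (
	"fmt"
	"sync"
)

// listener is a simplified stand-in for RetrievalEventListener.
type listener func(event string)

// registry is a hypothetical listener registry.
type registry struct {
	mu        sync.RWMutex
	nextID    int
	listeners map[int]listener
}

func newRegistry() *registry {
	return &registry{listeners: map[int]listener{}}
}

// register adds l and returns a function that removes it again.
func (r *registry) register(l listener) func() {
	r.mu.Lock()
	defer r.mu.Unlock()
	id := r.nextID
	r.nextID++
	r.listeners[id] = l
	return func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		delete(r.listeners, id)
	}
}

// fire calls every registered listener. The listeners are copied first so a
// listener may unregister itself without deadlocking on the mutex.
func (r *registry) fire(event string) {
	r.mu.RLock()
	ls := make([]listener, 0, len(r.listeners))
	for _, l := range r.listeners {
		ls = append(ls, l)
	}
	r.mu.RUnlock()
	for _, l := range ls {
		l(event)
	}
}

func main() {
	r := newRegistry()
	unregister := r.register(func(e string) { fmt.Println("got", e) })
	r.fire("query-asked") // prints "got query-asked"
	unregister()
	r.fire("ignored") // prints nothing
}
```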

type GetStorageProviderTimeout

type GetStorageProviderTimeout func(peer peer.ID) time.Duration

type Instrumentation

type Instrumentation interface {
	// OnRetrievalCandidatesFound is called once after querying the indexer
	OnRetrievalCandidatesFound(foundCount int) error
	// OnRetrievalCandidatesFiltered is called once after filtering is applied to indexer candidates
	OnRetrievalCandidatesFiltered(filteredCount int) error
	// OnErrorQueryingRetrievalCandidate may be called up to once per retrieval candidate
	OnErrorQueryingRetrievalCandidate(candidate RetrievalCandidate, err error)
	// OnErrorRetrievingFromCandidate may be called up to once per retrieval candidate
	OnErrorRetrievingFromCandidate(candidate RetrievalCandidate, err error)
	// OnRetrievalQueryForCandidate may be called up to once per retrieval candidate
	OnRetrievalQueryForCandidate(candidate RetrievalCandidate, queryResponse *retrievalmarket.QueryResponse)
	// OnFilteredRetrievalQueryForCandidate may be called up to once per retrieval candidate
	OnFilteredRetrievalQueryForCandidate(candidate RetrievalCandidate, queryResponse *retrievalmarket.QueryResponse)
	// OnRetrievingFromCandidate may be called up to once per retrieval candidate
	OnRetrievingFromCandidate(candidate RetrievalCandidate)
}

type IsAcceptableQueryResponse

type IsAcceptableQueryResponse func(*retrievalmarket.QueryResponse) bool

type IsAcceptableStorageProvider

type IsAcceptableStorageProvider func(peer peer.ID) bool

type MinerConfig

type MinerConfig struct {
	RetrievalTimeout        time.Duration
	MaxConcurrentRetrievals uint
}

type RetrievalCandidate

type RetrievalCandidate struct {
	MinerPeer peer.AddrInfo
	RootCid   cid.Cid
}

type RetrievalClient

type RetrievalClient interface {
	RetrievalQueryToPeer(ctx context.Context, minerPeer peer.AddrInfo, pcid cid.Cid) (*retrievalmarket.QueryResponse, error)

	RetrieveContentFromPeerAsync(
		ctx context.Context,
		peerID peer.ID,
		minerWallet address.Address,
		proposal *retrievalmarket.DealProposal,
	) (<-chan RetrievalResult, <-chan uint64, func())

	SubscribeToRetrievalEvents(subscriber eventpublisher.RetrievalSubscriber)
}

type RetrievalConfig

type RetrievalConfig struct {
	Instrumentation             Instrumentation
	GetStorageProviderTimeout   GetStorageProviderTimeout
	IsAcceptableStorageProvider IsAcceptableStorageProvider
	IsAcceptableQueryResponse   IsAcceptableQueryResponse
	// contains filtered or unexported fields
}

type RetrievalEventListener

type RetrievalEventListener interface {
	// QueryProgress events occur at stages during the query process.
	// Currently this should just include a "connected" event.
	QueryProgress(retrievalId uuid.UUID, phaseStartTime, eventTime time.Time, requestedCid cid.Cid, storageProviderId peer.ID, stage eventpublisher.Code)

	// QueryFailure events occur on the failure of querying a storage
	// provider. A query will result in either a QueryFailure or
	// a QuerySuccess event.
	QueryFailure(retrievalId uuid.UUID, phaseStartTime, eventTime time.Time, requestedCid cid.Cid, storageProviderId peer.ID, errString string)

	// QuerySuccess ("query-asked") events occur on successfully querying a storage
	// provider. A query will result in either a QueryFailure or
	// a QuerySuccess event.
	QuerySuccess(retrievalId uuid.UUID, phaseStartTime, eventTime time.Time, requestedCid cid.Cid, storageProviderId peer.ID, queryResponse retrievalmarket.QueryResponse)

	// RetrievalProgress events occur during the process of a retrieval. The
	// Success and failure progress event types are not reported here, but are
	// signalled via RetrievalSuccess or RetrievalFailure.
	RetrievalProgress(retrievalId uuid.UUID, phaseStartTime, eventTime time.Time, requestedCid cid.Cid, storageProviderId peer.ID, stage eventpublisher.Code)

	// RetrievalSuccess events occur on the success of a retrieval. A retrieval
	// will result in either a RetrievalFailure or a RetrievalSuccess
	// event.
	RetrievalSuccess(retrievalId uuid.UUID, phaseStartTime, eventTime time.Time, requestedCid cid.Cid, storageProviderId peer.ID, receivedSize uint64, receivedCids int64, confirmed bool)

	// RetrievalFailure events occur on the failure of a retrieval. A retrieval
	// will result in either a RetrievalFailure or a RetrievalSuccess
	// event.
	RetrievalFailure(retrievalId uuid.UUID, phaseStartTime, eventTime time.Time, requestedCid cid.Cid, storageProviderId peer.ID, errString string)
}

RetrievalEventListener defines a type that receives events fired during a retrieval process, including the process of querying available storage providers to find compatible ones to attempt retrieval from.

type RetrievalResult

type RetrievalResult struct {
	*RetrievalStats
	Err error
}

type RetrievalStats

type RetrievalStats struct {
	StorageProviderId peer.ID
	RootCid           cid.Cid
	Size              uint64
	Duration          time.Duration
	AverageSpeed      uint64
	TotalPayment      abi.TokenAmount
	NumPayments       int
	AskPrice          abi.TokenAmount
}

func RetrieveFromCandidates

func RetrieveFromCandidates(
	ctx context.Context,
	cfg *RetrievalConfig,
	candidateFinder CandidateFinder,
	client RetrievalClient,
	cid cid.Cid,
) (*RetrievalStats, error)

RetrieveFromCandidates performs a retrieval for a given CID by querying the indexer for candidates, then querying all candidates in parallel and attempting a full retrieval from the best and fastest storage provider as the query responses are received.

type RetrievalSubscriber

type RetrievalSubscriber interface {
	OnRetrievalEvent(eventpublisher.RetrievalEvent)
}

type Retriever

type Retriever struct {
	// contains filtered or unexported fields
}

func NewRetriever

func NewRetriever(
	ctx context.Context,
	config RetrieverConfig,
	client RetrievalClient,
	candidateFinder CandidateFinder,
	confirmer BlockConfirmer,
) (*Retriever, error)

func (*Retriever) OnRetrievalEvent

func (retriever *Retriever) OnRetrievalEvent(event eventpublisher.RetrievalEvent)

OnRetrievalEvent implements RetrievalSubscriber.

func (*Retriever) RegisterListener

func (retriever *Retriever) RegisterListener(listener RetrievalEventListener) func()

RegisterListener registers a listener to receive all events fired during the process of making a retrieval, including the process of querying available storage providers to find compatible ones to attempt retrieval from.

func (*Retriever) Retrieve

func (retriever *Retriever) Retrieve(ctx context.Context, cid cid.Cid) (*RetrievalStats, error)

Retrieve attempts to retrieve the given CID using the configured CandidateFinder to find storage providers that should have the CID.

type RetrieverConfig

type RetrieverConfig struct {
	MinerBlacklist     map[peer.ID]bool
	MinerWhitelist     map[peer.ID]bool
	DefaultMinerConfig MinerConfig
	MinerConfigs       map[peer.ID]MinerConfig
	PaidRetrievals     bool
}

All config values should be safe to leave uninitialized.
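
How MinerBlacklist and MinerWhitelist are applied is not documented here. One plausible reading, shown below as a sketch, is that a blacklisted provider is always rejected and a non-empty whitelist restricts retrievals to its members. The lists type, the isAcceptable method, and the string peer IDs are hypothetical; the real maps are keyed by peer.ID.

```go
package main

import "fmt"

// lists is a simplified stand-in for the two filter maps in RetrieverConfig.
type lists struct {
	Blacklist map[string]bool
	Whitelist map[string]bool
}

// isAcceptable applies the assumed rule: the blacklist always wins, and a
// non-empty whitelist admits only its members. Nil maps behave as empty,
// so the zero value accepts every provider.
func (l lists) isAcceptable(peerID string) bool {
	if l.Blacklist[peerID] {
		return false
	}
	if len(l.Whitelist) > 0 && !l.Whitelist[peerID] {
		return false
	}
	return true
}

func main() {
	var open lists
	fmt.Println(open.isAcceptable("sp-a")) // true

	strict := lists{
		Blacklist: map[string]bool{"sp-bad": true},
		Whitelist: map[string]bool{"sp-a": true, "sp-bad": true},
	}
	fmt.Println(strict.isAcceptable("sp-a"))   // true
	fmt.Println(strict.isAcceptable("sp-b"))   // false: not whitelisted
	fmt.Println(strict.isAcceptable("sp-bad")) // false: blacklist wins
}
```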

func (*RetrieverConfig) GetMinerConfig

func (cfg *RetrieverConfig) GetMinerConfig(peer peer.ID) MinerConfig
