Documentation ¶
Index ¶
- Variables
- func RetrievalProposalForAsk(ask *retrievalmarket.QueryResponse, c cid.Cid, optionalSelector ipld.Node) (*retrievalmarket.DealProposal, error)
- type ActiveRetrievalsManager
- func (arm *ActiveRetrievalsManager) GetActiveRetrievalCountFor(storageProviderId peer.ID) uint
- func (arm *ActiveRetrievalsManager) GetStatusFor(retrievalCid cid.Cid, phase eventpublisher.Phase) (uuid.UUID, cid.Cid, time.Time, bool)
- func (arm *ActiveRetrievalsManager) New(retrievalCid cid.Cid, queryCandidateCount int, retrievalCandidateCount int) (uuid.UUID, error)
- func (arm *ActiveRetrievalsManager) QueryCandidatedFinished(retrievalCid cid.Cid)
- func (arm *ActiveRetrievalsManager) RetrievalCandidatedFinished(retrievalCid cid.Cid, success bool)
- func (arm *ActiveRetrievalsManager) SetRetrievalCandidate(retrievalCid, rootCid cid.Cid, storageProviderId peer.ID, maxConcurrent uint) error
- func (arm *ActiveRetrievalsManager) SetRetrievalCandidateCount(retrievalCid cid.Cid, candidateCount int)
- type BlockConfirmer
- type CandidateCallback
- type CandidateErrorCallback
- type CandidateFinder
- type CounterCallback
- type ErrRetrievalAlreadyRunning
- type EventManager
- func (em *EventManager) FireQueryFailure(retrievalId uuid.UUID, requestedCid cid.Cid, phaseStartTime time.Time, ...)
- func (em *EventManager) FireQueryProgress(retrievalId uuid.UUID, requestedCid cid.Cid, phaseStartTime time.Time, ...)
- func (em *EventManager) FireQuerySuccess(retrievalId uuid.UUID, requestedCid cid.Cid, phaseStartTime time.Time, ...)
- func (em *EventManager) FireRetrievalFailure(retrievalId uuid.UUID, requestedCid cid.Cid, phaseStartTime time.Time, ...)
- func (em *EventManager) FireRetrievalProgress(retrievalId uuid.UUID, requestedCid cid.Cid, phaseStartTime time.Time, ...)
- func (em *EventManager) FireRetrievalSuccess(retrievalId uuid.UUID, requestedCid cid.Cid, phaseStartTime time.Time, ...)
- func (em *EventManager) RegisterListener(listener RetrievalEventListener) func()
- type GetStorageProviderTimeout
- type Instrumentation
- type IsAcceptableQueryResponse
- type IsAcceptableStorageProvider
- type MinerConfig
- type RetrievalCandidate
- type RetrievalClient
- type RetrievalConfig
- type RetrievalEventListener
- type RetrievalResult
- type RetrievalStats
- type RetrievalSubscriber
- type Retriever
- type RetrieverConfig
Constants ¶
This section is empty.
Variables ¶
var (
	ErrNoCandidates                = errors.New("no candidates")
	ErrUnexpectedRetrieval         = errors.New("unexpected active retrieval")
	ErrHitRetrievalLimit           = errors.New("hit retrieval limit")
	ErrProposalCreationFailed      = errors.New("proposal creation failed")
	ErrRetrievalRegistrationFailed = errors.New("retrieval registration failed")
	ErrRetrievalFailed             = errors.New("retrieval failed")
	ErrAllRetrievalsFailed         = errors.New("all retrievals failed")
	ErrQueryFailed                 = errors.New("query failed")
	ErrAllQueriesFailed            = errors.New("all queries failed")
	ErrRetrievalTimedOut           = errors.New("retrieval timed out")
)
Functions ¶
func RetrievalProposalForAsk ¶
func RetrievalProposalForAsk(ask *retrievalmarket.QueryResponse, c cid.Cid, optionalSelector ipld.Node) (*retrievalmarket.DealProposal, error)
Types ¶
type ActiveRetrievalsManager ¶
type ActiveRetrievalsManager struct {
// contains filtered or unexported fields
}
ActiveRetrievalsManager tracks the lifecycle of an active retrieval. This currently includes two distinct phases: query, and retrieval. Each phase can involve multiple storage providers. The query phase is assumed to be a parallel operation while the retrieval phase is serial and we can therefore track which single storage provider is currently involved in the retrieval.
One purpose of the retrieval manager is to track the concurrency of retrievals from individual storage providers. When setting the current storage provider for a retrieval with the SetRetrievalCandidate() call, the maximum number of concurrent retrievals from that storage provider can be provided. We can then scan the currently active retrievals and check that we don't exceed the maximum.
The second purpose is to attach a UUID to each retrieval and match that to events we receive from filclient so we can report each retrieval (all phases) using a single unique identifier. Note that it is possible for the retrieval phase to be operating against a different CID than the one originally requested (rootCid vs retrievalCid, when using Estuary rather than the indexer), so finding the right active retrieval is not straightforward.
Determining the "end" of a retrieval is non-trivial since we need to match the number of successes and failures of each phase against the expected number for that phase. See maybeFinish().
func NewActiveRetrievalsManager ¶
func NewActiveRetrievalsManager() *ActiveRetrievalsManager
NewActiveRetrievalsManager instantiates a new ActiveRetrievalsManager
func (*ActiveRetrievalsManager) GetActiveRetrievalCountFor ¶
func (arm *ActiveRetrievalsManager) GetActiveRetrievalCountFor(storageProviderId peer.ID) uint
GetActiveRetrievalCountFor returns the number of active retrievals for a storage provider. Active retrievals only count the retrieval phase, not the query phase requests.
func (*ActiveRetrievalsManager) GetStatusFor ¶
func (arm *ActiveRetrievalsManager) GetStatusFor(retrievalCid cid.Cid, phase eventpublisher.Phase) (uuid.UUID, cid.Cid, time.Time, bool)
GetStatusFor fetches basic information for a retrieval, identified by the original retrieval CID (which may be different to the CID a storage provider is being asked for). The phase is provided here in order to determine which start time to return (query or retrieval).
func (*ActiveRetrievalsManager) New ¶
func (arm *ActiveRetrievalsManager) New(retrievalCid cid.Cid, queryCandidateCount int, retrievalCandidateCount int) (uuid.UUID, error)
New registers a new retrieval, setting the expected number of storage provider candidates for each phase. Typically the retrievalCandidateCount is unknown but setting it to zero may mean that a retrieval is considered finished before we have a chance to set the correct number using a subsequent call to SetRetrievalCandidateCount().
A unique ID is returned for the new retrieval that can be used to identify it over the entire lifecycle.
func (*ActiveRetrievalsManager) QueryCandidatedFinished ¶
func (arm *ActiveRetrievalsManager) QueryCandidatedFinished(retrievalCid cid.Cid)
QueryCandidatedFinished registers that a query has finished with one of the query candidates. It's used to increment the number of finished queries and also check whether cleanup may be necessary.
func (*ActiveRetrievalsManager) RetrievalCandidatedFinished ¶
func (arm *ActiveRetrievalsManager) RetrievalCandidatedFinished(retrievalCid cid.Cid, success bool)
RetrievalCandidatedFinished registers that a retrieval has finished with one of the query candidates. In the case of a failure, it's used to increment the number of finished retrievals so we can check the finished count against the number of expected candidates for possible clean-up. In the case of success, we can assume no more retrievals will occur so we can jump straight to clean-up.
func (*ActiveRetrievalsManager) SetRetrievalCandidate ¶
func (arm *ActiveRetrievalsManager) SetRetrievalCandidate(retrievalCid, rootCid cid.Cid, storageProviderId peer.ID, maxConcurrent uint) error
SetRetrievalCandidate updates the current storage provider that we are performing a retrieval (not query) from. It also sets the root CID for this candidate, which may be different to the originally requested CID.
If the maxConcurrent is non-zero, the number of concurrent retrievals (not including queries) being performed from this storage provider is checked and if an additional retrieval would exceed this number ErrHitRetrievalLimit is returned without performing the update.
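The concurrency check described above can be sketched as follows. This is an illustration under stated assumptions, not the package's implementation: the tracker type and its fields are hypothetical, retrievals and providers are identified by plain strings instead of cid.Cid and peer.ID, and excluding the retrieval's own existing entry from the count is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errHitLimit stands in for the package's ErrHitRetrievalLimit.
var errHitLimit = errors.New("hit retrieval limit")

// tracker is a hypothetical, simplified stand-in for ActiveRetrievalsManager.
type tracker struct {
	mu       sync.Mutex
	provider map[string]string // retrieval ID -> provider currently retrieved from
}

func newTracker() *tracker {
	return &tracker{provider: make(map[string]string)}
}

// setCandidate records that retrieval is now fetching from provider. When
// maxConcurrent is non-zero it first counts the other retrievals assigned to
// that provider and returns errHitLimit, without updating, if one more
// retrieval would exceed the limit.
func (t *tracker) setCandidate(retrieval, provider string, maxConcurrent uint) error {
	t.mu.Lock()
	defer t.mu.Unlock()
	if maxConcurrent > 0 {
		var active uint
		for id, p := range t.provider {
			if p == provider && id != retrieval {
				active++
			}
		}
		if active >= maxConcurrent {
			return errHitLimit
		}
	}
	t.provider[retrieval] = provider
	return nil
}

func main() {
	t := newTracker()
	fmt.Println(t.setCandidate("r1", "sp1", 1)) // <nil>
	fmt.Println(t.setCandidate("r2", "sp1", 1)) // hit retrieval limit
	fmt.Println(t.setCandidate("r2", "sp1", 0)) // <nil>, zero disables the check
}
```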
func (*ActiveRetrievalsManager) SetRetrievalCandidateCount ¶
func (arm *ActiveRetrievalsManager) SetRetrievalCandidateCount(retrievalCid cid.Cid, candidateCount int)
SetRetrievalCandidateCount updates the number of storage provider candidates once we know the number. When the number of finished retrievals equals this number and the number of finished queries equals the query candidate count the full retrieval is considered complete and can be cleaned up.
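The completion rule can be sketched with a small counter record. The phaseCounts type and its done method are hypothetical names used only for illustration; the real bookkeeping lives in unexported fields and in maybeFinish(). The last case shows why leaving the retrieval candidate count at zero can mark a retrieval as finished too early.

```go
package main

import "fmt"

// phaseCounts is a hypothetical record of expected and finished candidate
// counts for a single retrieval.
type phaseCounts struct {
	queryExpected, queryFinished         int
	retrievalExpected, retrievalFinished int
	succeeded                            bool
}

// done reports whether the retrieval can be cleaned up: either a retrieval
// succeeded, or the finished count equals the expected count in both phases.
func (p phaseCounts) done() bool {
	if p.succeeded {
		return true
	}
	return p.queryFinished == p.queryExpected &&
		p.retrievalFinished == p.retrievalExpected
}

func main() {
	p := phaseCounts{queryExpected: 3, retrievalExpected: 1}
	p.queryFinished = 3
	fmt.Println(p.done()) // false: one retrieval candidate is still outstanding
	p.retrievalFinished = 1
	fmt.Println(p.done()) // true

	// With the retrieval count left at zero, finishing the queries alone
	// already satisfies the rule.
	early := phaseCounts{queryExpected: 1, queryFinished: 1}
	fmt.Println(early.done()) // true
}
```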
type BlockConfirmer ¶
type CandidateCallback ¶
type CandidateCallback func(RetrievalCandidate) error
type CandidateErrorCallback ¶
type CandidateErrorCallback func(RetrievalCandidate, error)
type CandidateFinder ¶
type CandidateFinder interface {
FindCandidates(context.Context, cid.Cid) ([]RetrievalCandidate, error)
}
type CounterCallback ¶
type ErrRetrievalAlreadyRunning ¶
type ErrRetrievalAlreadyRunning struct {
// contains filtered or unexported fields
}
func (ErrRetrievalAlreadyRunning) Error ¶
func (e ErrRetrievalAlreadyRunning) Error() string
type EventManager ¶
type EventManager struct {
// contains filtered or unexported fields
}
func NewEventManager ¶
func NewEventManager(ctx context.Context) *EventManager
func (*EventManager) FireQueryFailure ¶
func (em *EventManager) FireQueryFailure(retrievalId uuid.UUID, requestedCid cid.Cid, phaseStartTime time.Time, storageProviderId peer.ID, errString string)
FireQueryFailure calls QueryFailure for all listeners
func (*EventManager) FireQueryProgress ¶
func (em *EventManager) FireQueryProgress(retrievalId uuid.UUID, requestedCid cid.Cid, phaseStartTime time.Time, storageProviderId peer.ID, stage eventpublisher.Code)
FireQueryProgress calls QueryProgress for all listeners
func (*EventManager) FireQuerySuccess ¶
func (em *EventManager) FireQuerySuccess(retrievalId uuid.UUID, requestedCid cid.Cid, phaseStartTime time.Time, storageProviderId peer.ID, queryResponse retrievalmarket.QueryResponse)
FireQuerySuccess calls QuerySuccess ("query-asked") for all listeners
func (*EventManager) FireRetrievalFailure ¶
func (em *EventManager) FireRetrievalFailure(retrievalId uuid.UUID, requestedCid cid.Cid, phaseStartTime time.Time, storageProviderId peer.ID, errString string)
FireRetrievalFailure calls RetrievalFailure for all listeners
func (*EventManager) FireRetrievalProgress ¶
func (em *EventManager) FireRetrievalProgress(retrievalId uuid.UUID, requestedCid cid.Cid, phaseStartTime time.Time, storageProviderId peer.ID, stage eventpublisher.Code)
FireRetrievalProgress calls RetrievalProgress for all listeners
func (*EventManager) FireRetrievalSuccess ¶
func (em *EventManager) FireRetrievalSuccess(retrievalId uuid.UUID, requestedCid cid.Cid, phaseStartTime time.Time, storageProviderId peer.ID, receivedSize uint64, receivedCids int64, confirmed bool)
FireRetrievalSuccess calls RetrievalSuccess for all listeners
func (*EventManager) RegisterListener ¶
func (em *EventManager) RegisterListener(listener RetrievalEventListener) func()
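RegisterListener returns a func(), which presumably removes the listener again; the documentation does not say so explicitly, so treat that as an assumption. The sketch below shows that register-and-return-unregister pattern with a hypothetical registry type and a one-method listener interface. It dispatches synchronously, which may differ from how EventManager delivers events.

```go
package main

import (
	"fmt"
	"sync"
)

// listener is a hypothetical one-method stand-in for RetrievalEventListener.
type listener interface {
	QueryFailure(retrievalID, errString string)
}

// registry is a simplified stand-in for EventManager.
type registry struct {
	mu        sync.Mutex
	nextID    int
	listeners map[int]listener
}

func newRegistry() *registry {
	return &registry{listeners: make(map[int]listener)}
}

// RegisterListener adds l and returns a function that removes it again.
func (r *registry) RegisterListener(l listener) func() {
	r.mu.Lock()
	defer r.mu.Unlock()
	id := r.nextID
	r.nextID++
	r.listeners[id] = l
	return func() {
		r.mu.Lock()
		defer r.mu.Unlock()
		delete(r.listeners, id)
	}
}

// FireQueryFailure calls QueryFailure on every registered listener. It copies
// the listeners first so a callback may unregister without deadlocking.
func (r *registry) FireQueryFailure(retrievalID, errString string) {
	r.mu.Lock()
	snapshot := make([]listener, 0, len(r.listeners))
	for _, l := range r.listeners {
		snapshot = append(snapshot, l)
	}
	r.mu.Unlock()
	for _, l := range snapshot {
		l.QueryFailure(retrievalID, errString)
	}
}

// countingListener counts the failures it is told about.
type countingListener struct{ failures int }

func (c *countingListener) QueryFailure(_, _ string) { c.failures++ }

func main() {
	r := newRegistry()
	c := &countingListener{}
	unregister := r.RegisterListener(c)
	r.FireQueryFailure("id-1", "dial failed")
	unregister()
	r.FireQueryFailure("id-1", "dial failed") // no longer delivered
	fmt.Println(c.failures)                   // 1
}
```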
type Instrumentation ¶
type Instrumentation interface {
	// OnRetrievalCandidatesFound is called once after querying the indexer
	OnRetrievalCandidatesFound(foundCount int) error
	// OnRetrievalCandidatesFiltered is called once after filtering is applied to indexer candidates
	OnRetrievalCandidatesFiltered(filteredCount int) error
	// OnErrorQueryingRetrievalCandidate may be called up to once per retrieval candidate
	OnErrorQueryingRetrievalCandidate(candidate RetrievalCandidate, err error)
	// OnErrorRetrievingFromCandidate may be called up to once per retrieval candidate
	OnErrorRetrievingFromCandidate(candidate RetrievalCandidate, err error)
	// OnRetrievalQueryForCandidate may be called up to once per retrieval candidate
	OnRetrievalQueryForCandidate(candidate RetrievalCandidate, queryResponse *retrievalmarket.QueryResponse)
	// OnFilteredRetrievalQueryForCandidate may be called up to once per retrieval candidate
	OnFilteredRetrievalQueryForCandidate(candidate RetrievalCandidate, queryResponse *retrievalmarket.QueryResponse)
	// OnRetrievingFromCandidate may be called up to once per retrieval candidate
	OnRetrievingFromCandidate(candidate RetrievalCandidate)
}
type IsAcceptableQueryResponse ¶
type IsAcceptableQueryResponse func(*retrievalmarket.QueryResponse) bool
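An IsAcceptableQueryResponse is a predicate over a query response. As a rough sketch of how such a filter might be built, the example below rejects responses that are unavailable or that would cost something when paid retrievals are disabled. The queryResponse struct is a hypothetical, trimmed-down stand-in for retrievalmarket.QueryResponse with invented fields, and newPriceFilter is not part of the package.

```go
package main

import "fmt"

// queryResponse is a hypothetical stand-in for retrievalmarket.QueryResponse.
// Both fields are invented for this sketch.
type queryResponse struct {
	Available  bool
	TotalPrice uint64
}

// isAcceptable mirrors the shape of IsAcceptableQueryResponse.
type isAcceptable func(*queryResponse) bool

// newPriceFilter returns a predicate that accepts available responses and,
// unless paidRetrievals is true, only those with a zero price.
func newPriceFilter(paidRetrievals bool) isAcceptable {
	return func(q *queryResponse) bool {
		if q == nil || !q.Available {
			return false
		}
		return paidRetrievals || q.TotalPrice == 0
	}
}

func main() {
	freeOnly := newPriceFilter(false)
	fmt.Println(freeOnly(&queryResponse{Available: true}))                // true
	fmt.Println(freeOnly(&queryResponse{Available: true, TotalPrice: 5})) // false
}
```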
type MinerConfig ¶
type RetrievalCandidate ¶
type RetrievalClient ¶
type RetrievalClient interface {
	RetrievalQueryToPeer(ctx context.Context, minerPeer peer.AddrInfo, pcid cid.Cid) (*retrievalmarket.QueryResponse, error)
	RetrieveContentFromPeerAsync(
		ctx context.Context,
		peerID peer.ID,
		minerWallet address.Address,
		proposal *retrievalmarket.DealProposal,
	) (<-chan RetrievalResult, <-chan uint64, func())
	SubscribeToRetrievalEvents(subscriber eventpublisher.RetrievalSubscriber)
}
type RetrievalConfig ¶
type RetrievalConfig struct {
	Instrumentation             Instrumentation
	GetStorageProviderTimeout   GetStorageProviderTimeout
	IsAcceptableStorageProvider IsAcceptableStorageProvider
	IsAcceptableQueryResponse   IsAcceptableQueryResponse
	// contains filtered or unexported fields
}
type RetrievalEventListener ¶
type RetrievalEventListener interface {
	// QueryProgress events occur at stages during the query process.
	// Currently this should just include a "connected" event.
	QueryProgress(retrievalId uuid.UUID, phaseStartTime, eventTime time.Time, requestedCid cid.Cid, storageProviderId peer.ID, stage eventpublisher.Code)
	// QueryFailure events occur on the failure of querying a storage
	// provider. A query will result in either a QueryFailure or
	// a QuerySuccess event.
	QueryFailure(retrievalId uuid.UUID, phaseStartTime, eventTime time.Time, requestedCid cid.Cid, storageProviderId peer.ID, errString string)
	// QuerySuccess ("query-asked") events occur on successfully querying a storage
	// provider. A query will result in either a QueryFailure or
	// a QuerySuccess event.
	QuerySuccess(retrievalId uuid.UUID, phaseStartTime, eventTime time.Time, requestedCid cid.Cid, storageProviderId peer.ID, queryResponse retrievalmarket.QueryResponse)
	// RetrievalProgress events occur during the process of a retrieval. The
	// success and failure progress event types are not reported here, but are
	// signalled via RetrievalSuccess or RetrievalFailure.
	RetrievalProgress(retrievalId uuid.UUID, phaseStartTime, eventTime time.Time, requestedCid cid.Cid, storageProviderId peer.ID, stage eventpublisher.Code)
	// RetrievalSuccess events occur on the success of a retrieval. A retrieval
	// will result in either a RetrievalFailure or a RetrievalSuccess
	// event.
	RetrievalSuccess(retrievalId uuid.UUID, phaseStartTime, eventTime time.Time, requestedCid cid.Cid, storageProviderId peer.ID, receivedSize uint64, receivedCids int64, confirmed bool)
	// RetrievalFailure events occur on the failure of a retrieval. A retrieval
	// will result in either a RetrievalFailure or a RetrievalSuccess
	// event.
	RetrievalFailure(retrievalId uuid.UUID, phaseStartTime, eventTime time.Time, requestedCid cid.Cid, storageProviderId peer.ID, errString string)
}
RetrievalEventListener defines a type that receives events fired during a retrieval process, including the process of querying available storage providers to find compatible ones to attempt retrieval from.
type RetrievalResult ¶
type RetrievalResult struct {
	*RetrievalStats
	Err error
}
type RetrievalStats ¶
type RetrievalStats struct {
	StorageProviderId peer.ID
	RootCid           cid.Cid
	Size              uint64
	Duration          time.Duration
	AverageSpeed      uint64
	TotalPayment      abi.TokenAmount
	NumPayments       int
	AskPrice          abi.TokenAmount
}
func RetrieveFromCandidates ¶
func RetrieveFromCandidates(
	ctx context.Context,
	cfg *RetrievalConfig,
	candidateFinder CandidateFinder,
	client RetrievalClient,
	cid cid.Cid,
) (*RetrievalStats, error)
RetrieveFromCandidates performs a retrieval for a given CID by querying the indexer, then attempting to query all candidates and attempting to perform a full retrieval from the best and fastest storage provider as the queries are received.
type RetrievalSubscriber ¶
type RetrievalSubscriber interface {
OnRetrievalEvent(eventpublisher.RetrievalEvent)
}
type Retriever ¶
type Retriever struct {
// contains filtered or unexported fields
}
func NewRetriever ¶
func NewRetriever(
	ctx context.Context,
	config RetrieverConfig,
	client RetrievalClient,
	candidateFinder CandidateFinder,
	confirmer BlockConfirmer,
) (*Retriever, error)
func (*Retriever) OnRetrievalEvent ¶
func (retriever *Retriever) OnRetrievalEvent(event eventpublisher.RetrievalEvent)
OnRetrievalEvent implements RetrievalSubscriber.
func (*Retriever) RegisterListener ¶
func (retriever *Retriever) RegisterListener(listener RetrievalEventListener) func()
RegisterListener registers a listener to receive all events fired during the process of making a retrieval, including the process of querying available storage providers to find compatible ones to attempt retrieval from.
type RetrieverConfig ¶
type RetrieverConfig struct {
	MinerBlacklist     map[peer.ID]bool
	MinerWhitelist     map[peer.ID]bool
	DefaultMinerConfig MinerConfig
	MinerConfigs       map[peer.ID]MinerConfig
	PaidRetrievals     bool
}
All config values should be safe to leave uninitialized
func (*RetrieverConfig) GetMinerConfig ¶
func (cfg *RetrieverConfig) GetMinerConfig(peer peer.ID) MinerConfig
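GetMinerConfig is undocumented. Given the DefaultMinerConfig and MinerConfigs fields, a plausible reading is a per-miner lookup that falls back to the default, sketched below. This behavior is an assumption, the lowercase types are local stand-ins keyed by string instead of peer.ID, and the MaxConcurrentRetrievals field is hypothetical. The sketch also shows why an uninitialized config is safe to read: indexing a nil map in Go returns the zero value instead of panicking.

```go
package main

import "fmt"

// minerConfig is a stand-in for MinerConfig; its field is hypothetical.
type minerConfig struct {
	MaxConcurrentRetrievals uint
}

// retrieverConfig is a reduced stand-in for RetrieverConfig.
type retrieverConfig struct {
	DefaultMinerConfig minerConfig
	MinerConfigs       map[string]minerConfig
}

// getMinerConfig returns the override for peer if one exists and the default
// otherwise. Reading from a nil MinerConfigs map is safe.
func (cfg *retrieverConfig) getMinerConfig(peer string) minerConfig {
	if mc, ok := cfg.MinerConfigs[peer]; ok {
		return mc
	}
	return cfg.DefaultMinerConfig
}

func main() {
	var empty retrieverConfig // nothing initialized
	fmt.Println(empty.getMinerConfig("sp1").MaxConcurrentRetrievals) // 0

	cfg := retrieverConfig{
		DefaultMinerConfig: minerConfig{MaxConcurrentRetrievals: 2},
		MinerConfigs: map[string]minerConfig{
			"sp1": {MaxConcurrentRetrievals: 8},
		},
	}
	fmt.Println(cfg.getMinerConfig("sp1").MaxConcurrentRetrievals) // 8
	fmt.Println(cfg.getMinerConfig("sp2").MaxConcurrentRetrievals) // 2
}
```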