Documentation ¶
Index ¶
- Variables
- func GetRandomStripe(ctx context.Context, segment metabase.Segment) (index int32, err error)
- type ByStreamIDAndPosition
- type Config
- type Containment
- type ContainmentSyncChore
- type FailurePhase
- type Observer
- func (obs *Observer) Finish(ctx context.Context) (err error)
- func (obs *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error)
- func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error)
- func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error)
- type Outcome
- type PieceAudit
- type PieceLocator
- type Report
- type Reporter
- type Reservoir
- type ReverificationJob
- type Reverifier
- func (reverifier *Reverifier) DoReverifyPiece(ctx context.Context, logger *zap.Logger, locator *PieceLocator) (outcome Outcome, reputation overlay.ReputationStatus, err error)
- func (reverifier *Reverifier) GetPiece(ctx context.Context, limit *pb.AddressedOrderLimit, ...) (pieceData []byte, hash *pb.PieceHash, origLimit *pb.OrderLimit, err error)
- func (reverifier *Reverifier) ReverifyPiece(ctx context.Context, logger *zap.Logger, locator *PieceLocator) (outcome Outcome, reputation overlay.ReputationStatus)
- type ReverifyQueue
- type ReverifyWorker
- type Segment
- type Share
- type Verifier
- func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, ...) (shares map[int]Share, err error)
- func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, ...) (share Share)
- func (verifier *Verifier) IdentifyContainedNodes(ctx context.Context, segment Segment) (skipList map[storj.NodeID]bool, err error)
- func (verifier *Verifier) SetNow(nowFn func() time.Time)
- func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[storj.NodeID]bool) (report Report, err error)
- type VerifyQueue
- type Worker
Constants ¶
This section is empty.
Variables ¶
var (
	// ContainError is the containment errs class.
	ContainError = errs.Class("containment")
	// ErrContainedNotFound is the errs class for when a pending audit isn't found.
	ErrContainedNotFound = errs.Class("pending audit not found")
	// ErrContainDelete is the errs class for when a pending audit can't be deleted.
	ErrContainDelete = errs.Class("unable to delete pending audit")
)
var (
	ErrNotEnoughShares = errs.Class("not enough shares for successful audit")
	// ErrSegmentDeleted is the errs class when the audited segment was deleted during the audit.
	ErrSegmentDeleted = errs.Class("segment deleted during audit")
	// ErrSegmentModified is the errs class used when a segment has been changed in any way.
	ErrSegmentModified = errs.Class("segment has been modified")
)
var ErrEmptyQueue = errs.Class("empty audit queue")
ErrEmptyQueue is used to indicate that the queue is empty.
var Error = errs.Class("audit")
Error is the default audit errs class.
Functions ¶
Types ¶
type ByStreamIDAndPosition ¶ added in v1.68.2
type ByStreamIDAndPosition []Segment
ByStreamIDAndPosition allows sorting of a slice of segments by stream ID and position.
func (ByStreamIDAndPosition) Len ¶ added in v1.68.2
func (b ByStreamIDAndPosition) Len() int
func (ByStreamIDAndPosition) Less ¶ added in v1.68.2
func (b ByStreamIDAndPosition) Less(i, j int) bool
func (ByStreamIDAndPosition) Swap ¶ added in v1.68.2
func (b ByStreamIDAndPosition) Swap(i, j int)
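The three methods above satisfy `sort.Interface`, so a slice of segments can be passed to `sort.Sort`. The following is a minimal sketch of that pattern, not the package's code: the `segment` type, its `Part` and `Index` fields, and the exact tie-breaking order in `Less` are assumptions made for illustration.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// segment is a hypothetical stand-in for audit.Segment; only the fields
// needed for ordering are kept.
type segment struct {
	StreamID [16]byte // stand-in for uuid.UUID
	Part     uint32   // stand-in for the position's part number
	Index    uint32   // stand-in for the position's index within the part
}

// byStreamIDAndPosition mirrors the shape of ByStreamIDAndPosition.
type byStreamIDAndPosition []segment

func (b byStreamIDAndPosition) Len() int      { return len(b) }
func (b byStreamIDAndPosition) Swap(i, j int) { b[i], b[j] = b[j], b[i] }

// Less orders by stream ID first, then by part, then by index.
// This ordering is an assumption, not taken from the package source.
func (b byStreamIDAndPosition) Less(i, j int) bool {
	if c := bytes.Compare(b[i].StreamID[:], b[j].StreamID[:]); c != 0 {
		return c < 0
	}
	if b[i].Part != b[j].Part {
		return b[i].Part < b[j].Part
	}
	return b[i].Index < b[j].Index
}

// sortedSegments returns a sorted copy and leaves the input untouched.
func sortedSegments(in []segment) []segment {
	out := append([]segment(nil), in...)
	sort.Sort(byStreamIDAndPosition(out))
	return out
}

func main() {
	segs := []segment{
		{StreamID: [16]byte{2}, Part: 0, Index: 1},
		{StreamID: [16]byte{1}, Part: 1, Index: 0},
		{StreamID: [16]byte{1}, Part: 0, Index: 5},
	}
	for _, s := range sortedSegments(segs) {
		fmt.Println(s.StreamID[0], s.Part, s.Index)
	}
}
```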
type Config ¶
type Config struct {
	MaxRetriesStatDB int `help:"max number of times to attempt updating a statdb batch" default:"3"`
	MinBytesPerSecond memory.Size `` /* 132-byte string literal not displayed */
	MinDownloadTimeout time.Duration `help:"the minimum duration for downloading a share from storage nodes before timing out" default:"5m0s" testDefault:"5s"`
	MaxReverifyCount int `help:"limit above which we consider an audit is failed" default:"3"`
	ChoreInterval time.Duration `help:"how often to run the reservoir chore" releaseDefault:"24h" devDefault:"1m" testDefault:"$TESTINTERVAL"`
	QueueInterval time.Duration `help:"how often to recheck an empty audit queue" releaseDefault:"1h" devDefault:"1m" testDefault:"$TESTINTERVAL"`
	Slots int `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"`
	VerificationPushBatchSize int `help:"number of audit jobs to push at once to the verification queue" devDefault:"10" releaseDefault:"4096"`
	WorkerConcurrency int `help:"number of workers to run audits on segments" default:"2"`
	UseRangedLoop bool `help:"whether use Audit observer with ranged loop." default:"true"`
	ReverifyWorkerConcurrency int `help:"number of workers to run reverify audits on pieces" default:"2"`
	ReverificationRetryInterval time.Duration `` /* 135-byte string literal not displayed */
	ContainmentSyncChoreInterval time.Duration `help:"how often to run the containment-sync chore" releaseDefault:"2h" devDefault:"2m" testDefault:"$TESTINTERVAL"`
}
Config contains configurable values for audit chore and workers.
type Containment ¶
type Containment interface {
	Get(ctx context.Context, nodeID pb.NodeID) (*ReverificationJob, error)
	Insert(ctx context.Context, job *PieceLocator) error
	Delete(ctx context.Context, job *PieceLocator) (wasDeleted, nodeStillContained bool, err error)
	GetAllContainedNodes(ctx context.Context) ([]pb.NodeID, error)
}
Containment holds information about pending audits for contained nodes.
architecture: Database
type ContainmentSyncChore ¶ added in v1.73.4
ContainmentSyncChore is a chore to update the set of contained nodes in the overlay cache. This is necessary because it is possible for the "contained" field in the nodes table to disagree with whether a node appears in the reverification queue. We make an effort to keep them in sync when making changes to the reverification queue, but this infrequent chore will clean up any inconsistencies that creep in (because we can't maintain perfect consistency while the reverification queue and the nodes table may be in separate databases). Fortunately, it is acceptable for a node's containment status to be out of date for some amount of time.
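The reconciliation described above amounts to a set difference between the nodes that appear in the reverification queue and the nodes flagged as contained. The sketch below illustrates that idea only. The `reconcile` function, its parameters, and the use of plain strings as node IDs are hypothetical and do not reflect how the chore is actually implemented.

```go
package main

import (
	"fmt"
	"sort"
)

// reconcile compares the nodes present in the reverification queue with the
// nodes currently flagged as contained, and reports which flags must change.
// Node IDs are plain strings here purely for illustration.
func reconcile(inQueue, flagged []string) (toContain, toRelease []string) {
	queued := make(map[string]bool, len(inQueue))
	for _, id := range inQueue {
		queued[id] = true
	}
	marked := make(map[string]bool, len(flagged))
	for _, id := range flagged {
		marked[id] = true
	}
	for id := range queued {
		if !marked[id] {
			toContain = append(toContain, id) // has a pending job, flag missing
		}
	}
	for id := range marked {
		if !queued[id] {
			toRelease = append(toRelease, id) // flagged, but no pending job
		}
	}
	// Map iteration order is random, so sort for stable output.
	sort.Strings(toContain)
	sort.Strings(toRelease)
	return toContain, toRelease
}

func main() {
	toContain, toRelease := reconcile(
		[]string{"node-a", "node-b"},
		[]string{"node-b", "node-c"},
	)
	fmt.Println("mark as contained:", toContain)
	fmt.Println("clear contained flag:", toRelease)
}
```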
func NewContainmentSyncChore ¶ added in v1.73.4
func NewContainmentSyncChore(log *zap.Logger, queue ReverifyQueue, overlay overlay.DB, interval time.Duration) *ContainmentSyncChore
NewContainmentSyncChore creates a new ContainmentSyncChore.
type FailurePhase ¶ added in v1.80.3
type FailurePhase int
FailurePhase indicates during which phase a GET_SHARE operation failed.
const (
	// NoFailure indicates there was no failure during a GET_SHARE operation.
	NoFailure FailurePhase = 0
	// DialFailure indicates a GET_SHARE operation failed during Dial.
	DialFailure FailurePhase = 1
	// RequestFailure indicates a GET_SHARE operation failed to make its RPC request, or the request failed.
	RequestFailure FailurePhase = 2
)
type Observer ¶ added in v1.70.1
type Observer struct {
	// The following fields are reset on each segment loop cycle.
	Reservoirs map[metabase.NodeAlias]*Reservoir
	// contains filtered or unexported fields
}
Observer populates reservoirs and the audit queue.
architecture: Observer
func NewObserver ¶ added in v1.70.1
func NewObserver(log *zap.Logger, queue VerifyQueue, config Config) *Observer
NewObserver instantiates Observer.
func (*Observer) Finish ¶ added in v1.70.1
Finish builds and dedups an audit queue from the merged per-node reservoirs.
func (*Observer) Fork ¶ added in v1.70.1
Fork returns a new audit reservoir collector for the range.
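Start, Fork, Join and Finish follow a fork/join pattern: each range gets its own collector, the collectors are merged back into the observer, and the final queue is built once at the end. The sketch below shows that flow with simplified stand-ins. The `observer` and `collector` types, the integer node aliases and the string segment keys are all hypothetical, and plain slices replace the per-node reservoirs.

```go
package main

import (
	"fmt"
	"sort"
)

// collector is a hypothetical per-range partial: it records, for each node,
// the segment keys picked while processing that range.
type collector struct {
	perNode map[int][]string // node alias -> segment keys
}

// observer is a hypothetical stand-in that accumulates merged results.
type observer struct {
	perNode map[int][]string
}

// start resets the state kept between loop cycles.
func (o *observer) start() { o.perNode = map[int][]string{} }

// fork hands out an independent collector for one range.
func (o *observer) fork() *collector {
	return &collector{perNode: map[int][]string{}}
}

// process records one segment for one node inside a range.
func (c *collector) process(node int, segmentKey string) {
	c.perNode[node] = append(c.perNode[node], segmentKey)
}

// join merges a finished collector back into the observer.
func (o *observer) join(c *collector) {
	for node, keys := range c.perNode {
		o.perNode[node] = append(o.perNode[node], keys...)
	}
}

// finish builds a deduplicated queue: a segment picked for several nodes
// is queued only once.
func (o *observer) finish() []string {
	seen := map[string]bool{}
	var queue []string
	for _, keys := range o.perNode {
		for _, k := range keys {
			if !seen[k] {
				seen[k] = true
				queue = append(queue, k)
			}
		}
	}
	sort.Strings(queue) // stable output for the example
	return queue
}

func main() {
	o := &observer{}
	o.start()
	a, b := o.fork(), o.fork()
	a.process(1, "segment-1")
	b.process(2, "segment-1")
	b.process(2, "segment-2")
	o.join(a)
	o.join(b)
	fmt.Println(o.finish())
}
```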
type Outcome ¶ added in v1.67.1
type Outcome int
Outcome enumerates the possible results of a piecewise audit.
Note that it is very similar to reputation.AuditType, but it is different in scope and needs a slightly different set of values.
const (
	// OutcomeNotPerformed indicates an audit was not performed, for any of a
	// variety of reasons, but that it should be reattempted later.
	OutcomeNotPerformed Outcome = iota
	// OutcomeNotNecessary indicates that an audit is no longer required,
	// for example because the segment has been updated or no longer exists.
	OutcomeNotNecessary
	// OutcomeSuccess indicates that an audit took place and the piece was
	// fully validated.
	OutcomeSuccess
	// OutcomeFailure indicates that an audit took place but that the node
	// failed the audit, either because it did not have the piece or the
	// data was incorrect.
	OutcomeFailure
	// OutcomeTimedOut indicates the audit could not be completed because
	// it took too long. The audit should be retried later.
	OutcomeTimedOut
	// OutcomeNodeOffline indicates that the audit could not be completed
	// because the node could not be contacted. The audit should be
	// retried later.
	OutcomeNodeOffline
	// OutcomeUnknownError indicates that the audit could not be completed
	// because of an error not otherwise expected or recognized. The
	// audit should be retried later.
	OutcomeUnknownError
)
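The comments on the Outcome values state which results call for another attempt. A caller might encode that as a small helper, sketched below. The local `Outcome` type copies the constants so the example is self-contained, and `shouldRetry` is a hypothetical function that is not part of the package.

```go
package main

import "fmt"

// Outcome is a local copy of the documented enumeration so that the
// example compiles on its own.
type Outcome int

const (
	OutcomeNotPerformed Outcome = iota
	OutcomeNotNecessary
	OutcomeSuccess
	OutcomeFailure
	OutcomeTimedOut
	OutcomeNodeOffline
	OutcomeUnknownError
)

// shouldRetry reports whether the documented meaning of the outcome is
// "try again later". It is a hypothetical helper for illustration.
func shouldRetry(o Outcome) bool {
	switch o {
	case OutcomeNotPerformed, OutcomeTimedOut, OutcomeNodeOffline, OutcomeUnknownError:
		return true
	default:
		// NotNecessary, Success and Failure are final results.
		return false
	}
}

func main() {
	for _, o := range []Outcome{OutcomeSuccess, OutcomeTimedOut, OutcomeNotNecessary} {
		fmt.Printf("outcome %d retry=%v\n", o, shouldRetry(o))
	}
}
```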
type PieceAudit ¶ added in v1.40.3
type PieceAudit int
PieceAudit is piece audit status.
const (
	// PieceAuditUnknown is unknown piece audit.
	PieceAuditUnknown PieceAudit = iota
	// PieceAuditFailure is failed piece audit.
	PieceAuditFailure
	// PieceAuditOffline is offline node piece audit.
	PieceAuditOffline
	// PieceAuditContained is online but unresponsive node piece audit.
	PieceAuditContained
	// PieceAuditSuccess is successful piece audit.
	PieceAuditSuccess
)
func PieceAuditFromErr ¶ added in v1.40.3
func PieceAuditFromErr(err error) PieceAudit
PieceAuditFromErr returns piece audit based on error.
type PieceLocator ¶ added in v1.67.1
type PieceLocator struct {
	StreamID uuid.UUID
	Position metabase.SegmentPosition
	NodeID   storj.NodeID
	PieceNum int
}
PieceLocator specifies all information necessary to look up a particular piece on a particular satellite.
type Report ¶
type Report struct {
	Successes       storj.NodeIDList
	Fails           storj.NodeIDList
	Offlines        storj.NodeIDList
	PendingAudits   []*ReverificationJob
	Unknown         storj.NodeIDList
	NodesReputation map[storj.NodeID]overlay.ReputationStatus
}
Report contains the result of an audit. It records whether an audit could be completed, the total number of pieces the audit was conducted for, and lists of nodes that succeeded, failed, were offline, have pending audits, or failed for unknown reasons, together with their current reputation status.
type Reporter ¶
type Reporter interface {
	RecordAudits(ctx context.Context, req Report)
	ReportReverificationNeeded(ctx context.Context, piece *PieceLocator) (err error)
	RecordReverificationResult(ctx context.Context, pendingJob *ReverificationJob, outcome Outcome, reputation overlay.ReputationStatus) (err error)
}
Reporter records audit reports in the overlay and database.
func NewReporter ¶
func NewReporter(log *zap.Logger, reputations *reputation.Service, overlay *overlay.Service, containment Containment, maxRetries int, maxReverifyCount int32) Reporter
NewReporter instantiates a reporter.
type Reservoir ¶ added in v0.19.0
type Reservoir struct {
// contains filtered or unexported fields
}
Reservoir holds a certain number of segments to reflect a random sample.
func NewReservoir ¶ added in v0.19.0
NewReservoir instantiates a Reservoir.
func (*Reservoir) Keys ¶ added in v1.70.1
Keys returns the keys for the segments picked by the reservoir.
func (*Reservoir) Merge ¶ added in v1.70.1
Merge merges the given reservoir into the first. Both reservoirs must have the same size.
func (*Reservoir) Sample ¶ added in v0.19.0
func (reservoir *Reservoir) Sample(r *rand.Rand, segment rangedloop.Segment)
Sample tries to ensure that each segment passed in has a chance (proportional to its size) to be in the reservoir when sampling is complete.
The tricky part is that we do not know ahead of time how many segments will be passed in. The way this is accomplished is known as _Reservoir Sampling_. The specific algorithm we are using here is called A-Res on the Wikipedia article: https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Res
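As a rough illustration of A-Res: each item receives the key u^(1/w), where u is drawn uniformly from [0, 1) and w is the item's weight, and the reservoir keeps the items with the largest keys. The sketch below is a minimal, unoptimized version under those assumptions. The `item` and `reservoir` types and all function names are hypothetical, and the package's own implementation may differ in details such as how keys are computed or stored. Merging is shown as keeping the largest keys of both reservoirs.

```go
package main

import (
	"fmt"
	"math"
	"math/rand"
)

// item is a hypothetical stand-in for a segment; Weight plays the role of
// the segment size.
type item struct {
	ID     string
	Weight float64
}

// reservoir keeps up to size items together with their A-Res keys.
type reservoir struct {
	size  int
	items []item
	keys  []float64
}

func newReservoir(size int) *reservoir { return &reservoir{size: size} }

// offer keeps the item if there is room, or if its key beats the smallest
// key currently held.
func (r *reservoir) offer(it item, key float64) {
	if r.size <= 0 {
		return
	}
	if len(r.items) < r.size {
		r.items = append(r.items, it)
		r.keys = append(r.keys, key)
		return
	}
	lowest := 0
	for i, k := range r.keys {
		if k < r.keys[lowest] {
			lowest = i
		}
	}
	if key > r.keys[lowest] {
		r.items[lowest], r.keys[lowest] = it, key
	}
}

// sample draws u uniformly from [0, 1) and uses u^(1/weight) as the key,
// so heavier items tend to receive larger keys.
func (r *reservoir) sample(rng *rand.Rand, it item) {
	if it.Weight <= 0 {
		return // items without positive weight are never selected
	}
	r.offer(it, math.Pow(rng.Float64(), 1/it.Weight))
}

// merge folds other into r, keeping the largest keys of both.
func (r *reservoir) merge(other *reservoir) error {
	if r.size != other.size {
		return fmt.Errorf("reservoir size mismatch: %d != %d", r.size, other.size)
	}
	for i, it := range other.items {
		r.offer(it, other.keys[i])
	}
	return nil
}

// sampleAll feeds every item into a fresh reservoir using a seeded source.
func sampleAll(seed int64, size int, items []item) *reservoir {
	rng := rand.New(rand.NewSource(seed))
	r := newReservoir(size)
	for _, it := range items {
		r.sample(rng, it)
	}
	return r
}

func main() {
	var items []item
	for i := 0; i < 50; i++ {
		items = append(items, item{ID: fmt.Sprintf("seg-%d", i), Weight: float64(1 + i%4)})
	}
	a := sampleAll(1, 3, items[:25])
	b := sampleAll(2, 3, items[25:])
	if err := a.merge(b); err != nil {
		panic(err)
	}
	fmt.Println(len(a.items), "items kept after merge")
}
```

Because the number of items is not known in advance, the reservoir only ever compares the incoming key against the smallest key it holds, which is what allows sampling in a single pass.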
func (*Reservoir) Segments ¶ added in v1.26.2
func (reservoir *Reservoir) Segments() []rangedloop.Segment
Segments returns the segments picked by the reservoir.
type ReverificationJob ¶ added in v1.67.1
type ReverificationJob struct {
	Locator       PieceLocator
	InsertedAt    time.Time
	ReverifyCount int
	LastAttempt   time.Time
}
ReverificationJob represents a job as received from the reverification audit queue.
type Reverifier ¶ added in v1.69.1
type Reverifier struct {
	*Verifier
	// contains filtered or unexported fields
}
Reverifier pulls jobs from the reverification queue and fulfills them by performing the requested reverifications.
architecture: Worker
func NewReverifier ¶ added in v1.69.1
func NewReverifier(log *zap.Logger, verifier *Verifier, db ReverifyQueue, config Config) *Reverifier
NewReverifier creates a Reverifier.
func (*Reverifier) DoReverifyPiece ¶ added in v1.69.1
func (reverifier *Reverifier) DoReverifyPiece(ctx context.Context, logger *zap.Logger, locator *PieceLocator) (outcome Outcome, reputation overlay.ReputationStatus, err error)
DoReverifyPiece acquires a piece from a single node and verifies its contents, its hash, and its order limit.
func (*Reverifier) GetPiece ¶ added in v1.69.1
func (reverifier *Reverifier) GetPiece(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, pieceSize int32) (pieceData []byte, hash *pb.PieceHash, origLimit *pb.OrderLimit, err error)
GetPiece uses the piecestore client to download a piece (and the associated original OrderLimit and PieceHash) from a node.
func (*Reverifier) ReverifyPiece ¶ added in v1.69.1
func (reverifier *Reverifier) ReverifyPiece(ctx context.Context, logger *zap.Logger, locator *PieceLocator) (outcome Outcome, reputation overlay.ReputationStatus)
ReverifyPiece acquires a piece from a single node and verifies its contents, its hash, and its order limit.
type ReverifyQueue ¶ added in v1.67.1
type ReverifyQueue interface {
	Insert(ctx context.Context, piece *PieceLocator) (err error)
	GetNextJob(ctx context.Context, retryInterval time.Duration) (job *ReverificationJob, err error)
	Remove(ctx context.Context, piece *PieceLocator) (wasDeleted bool, err error)
	GetByNodeID(ctx context.Context, nodeID storj.NodeID) (audit *ReverificationJob, err error)
	GetAllContainedNodes(ctx context.Context) ([]storj.NodeID, error)
}
ReverifyQueue controls manipulation of a queue of pieces to be _re_verified; that is, a node timed out when we requested an audit of the piece, and now we need to follow up with that node until we get a proper answer to the audit. (Or until we try too many times, and disqualify the node.)
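One plausible reading of the `retryInterval` parameter of GetNextJob is that a job becomes eligible again only once its last attempt lies at least that far in the past. The sketch below illustrates that reading with an in-memory slice. The `job` type, the `nextJob` function and the oldest-first selection rule are assumptions made for illustration, not the queue's documented behavior.

```go
package main

import (
	"fmt"
	"time"
)

// job is a hypothetical, trimmed-down stand-in for ReverificationJob.
type job struct {
	NodeID        string
	InsertedAt    time.Time
	LastAttempt   time.Time // zero value means the job was never attempted
	ReverifyCount int
}

// nextJob returns the index of the oldest job whose last attempt is at
// least retryInterval in the past, or -1 when no job is due yet.
func nextJob(jobs []job, now time.Time, retryInterval time.Duration) int {
	best := -1
	for i, j := range jobs {
		if !j.LastAttempt.IsZero() && now.Sub(j.LastAttempt) < retryInterval {
			continue // attempted too recently
		}
		if best == -1 || j.InsertedAt.Before(jobs[best].InsertedAt) {
			best = i
		}
	}
	return best
}

// minutes and at are small helpers for building example durations and
// timestamps relative to the Unix epoch.
func minutes(n int) time.Duration { return time.Duration(n) * time.Minute }
func at(n int) time.Time          { return time.Unix(0, 0).Add(minutes(n)) }

func main() {
	jobs := []job{
		{NodeID: "node-a", InsertedAt: at(0), LastAttempt: at(50), ReverifyCount: 1},
		{NodeID: "node-b", InsertedAt: at(10)},
	}
	// At minute 60, node-a was tried 10 minutes ago, so node-b is next.
	fmt.Println(jobs[nextJob(jobs, at(60), minutes(30))].NodeID)
	// At minute 100, node-a is due again and is the older job.
	fmt.Println(jobs[nextJob(jobs, at(100), minutes(30))].NodeID)
}
```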
type ReverifyWorker ¶ added in v1.70.1
ReverifyWorker processes reverifications (retrying piece audits against nodes that timed out during a Verification).
func NewReverifyWorker ¶ added in v1.70.1
func NewReverifyWorker(log *zap.Logger, queue ReverifyQueue, reverifier *Reverifier, reporter Reporter, config Config) *ReverifyWorker
NewReverifyWorker creates a new ReverifyWorker.
func (*ReverifyWorker) Close ¶ added in v1.70.1
func (worker *ReverifyWorker) Close() error
Close halts the worker.
type Segment ¶ added in v1.26.2
type Segment struct {
	StreamID      uuid.UUID
	Position      metabase.SegmentPosition
	ExpiresAt     *time.Time
	EncryptedSize int32 // size of the whole segment (not a piece)
}
Segment is a segment to audit.
func NewSegment ¶ added in v1.26.2
func NewSegment(loopSegment rangedloop.Segment) Segment
NewSegment creates a new segment to audit from a metainfo loop segment.
type Verifier ¶
type Verifier struct {
	OnTestingCheckSegmentAlteredHook func()
	// contains filtered or unexported fields
}
Verifier helps verify the correctness of a given stripe.
architecture: Worker
func NewVerifier ¶
func NewVerifier(log *zap.Logger, metabase *metabase.DB, dialer rpc.Dialer, overlay *overlay.Service, containment Containment, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size, minDownloadTimeout time.Duration) *Verifier
NewVerifier creates a Verifier.
func (*Verifier) DownloadShares ¶
func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, stripeIndex int32, shareSize int32) (shares map[int]Share, err error)
DownloadShares downloads shares from the nodes where remote pieces are located.
func (*Verifier) GetShare ¶
func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, stripeIndex, shareSize int32, pieceNum int) (share Share)
GetShare uses the piece store client to download shares from nodes.
func (*Verifier) IdentifyContainedNodes ¶ added in v1.70.1
func (verifier *Verifier) IdentifyContainedNodes(ctx context.Context, segment Segment) (skipList map[storj.NodeID]bool, err error)
IdentifyContainedNodes returns the set of all contained nodes out of the holders of pieces in the given segment.
type VerifyQueue ¶ added in v1.67.1
type VerifyQueue interface {
	Push(ctx context.Context, segments []Segment, maxBatchSize int) (err error)
	Next(ctx context.Context) (Segment, error)
}
VerifyQueue controls manipulation of a database-based queue of segments to be verified; that is, segments chosen at random from all segments on the satellite, for which workers should perform audits. We will try to download a stripe of data across all pieces in the segment and ensure that all pieces conform to the same polynomial.
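The Push and Next contract can be pictured with an in-memory queue, which may also be handy as a test double. The sketch below is only an approximation: `memQueue`, its string segments and the `errEmptyQueue` sentinel are hypothetical. It also assumes that Next signals an empty queue with an error and that `maxBatchSize` limits how many segments are written per batch.

```go
package main

import (
	"errors"
	"fmt"
)

// errEmptyQueue is a hypothetical sentinel standing in for ErrEmptyQueue.
var errEmptyQueue = errors.New("empty audit queue")

// memQueue is a hypothetical in-memory queue; segments are plain strings.
type memQueue struct {
	items   []string
	batches int // number of batches written, kept for illustration
}

// push appends segments in chunks of at most maxBatchSize.
func (q *memQueue) push(segments []string, maxBatchSize int) error {
	if maxBatchSize <= 0 {
		return errors.New("batch size must be positive")
	}
	for len(segments) > 0 {
		n := maxBatchSize
		if n > len(segments) {
			n = len(segments)
		}
		q.items = append(q.items, segments[:n]...)
		q.batches++
		segments = segments[n:]
	}
	return nil
}

// next removes and returns the segment at the head of the queue.
func (q *memQueue) next() (string, error) {
	if len(q.items) == 0 {
		return "", errEmptyQueue
	}
	head := q.items[0]
	q.items = q.items[1:]
	return head, nil
}

func main() {
	q := &memQueue{}
	if err := q.push([]string{"s1", "s2", "s3"}, 2); err != nil {
		panic(err)
	}
	for {
		seg, err := q.next()
		if err != nil {
			fmt.Println("stopping:", err)
			return
		}
		fmt.Println("auditing", seg)
	}
}
```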
type Worker ¶ added in v0.21.0
Worker contains information for populating audit queue and processing audits.
func NewWorker ¶ added in v0.21.0
func NewWorker(log *zap.Logger, queue VerifyQueue, verifier *Verifier, reverifyQueue ReverifyQueue, reporter Reporter, config Config) *Worker
NewWorker instantiates Worker.