Documentation ¶
Index ¶
- Variables
- func GetRandomStripe(ctx context.Context, segment metabase.Segment) (index int32, err error)
- type Chore
- type Collector
- type Config
- type Containment
- type Outcome
- type PendingAudit
- type PieceAudit
- type PieceLocator
- type Queue
- type Queues
- type Report
- type Reporter
- type Reservoir
- type ReverificationJob
- type ReverifyQueue
- type Segment
- type Share
- type Verifier
- func (verifier *Verifier) DoReverifyPiece(ctx context.Context, logger *zap.Logger, locator PieceLocator) (outcome Outcome, err error)
- func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, ...) (shares map[int]Share, err error)
- func (verifier *Verifier) GetPiece(ctx context.Context, limit *pb.AddressedOrderLimit, ...) (pieceData []byte, hash *pb.PieceHash, origLimit *pb.OrderLimit, err error)
- func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, ...) (share Share, err error)
- func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report Report, err error)
- func (verifier *Verifier) ReverifyPiece(ctx context.Context, locator PieceLocator) (keepInQueue bool)
- func (verifier *Verifier) SetNow(nowFn func() time.Time)
- func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[storj.NodeID]bool) (report Report, err error)
- type VerifyQueue
- type Worker
Constants ¶
This section is empty.
Variables ¶
var (
	// ContainError is the containment errs class.
	ContainError = errs.Class("containment")

	// ErrContainedNotFound is the errs class for when a pending audit isn't found.
	ErrContainedNotFound = errs.Class("pending audit not found")

	// ErrContainDelete is the errs class for when a pending audit can't be deleted.
	ErrContainDelete = errs.Class("unable to delete pending audit")
)
var (
	ErrNotEnoughShares = errs.Class("not enough shares for successful audit")

	// ErrSegmentDeleted is the errs class when the audited segment was deleted during the audit.
	ErrSegmentDeleted = errs.Class("segment deleted during audit")

	// ErrSegmentModified is the errs class used when a segment has been changed in any way.
	ErrSegmentModified = errs.Class("segment has been modified")
)
var ErrEmptyQueue = errs.Class("empty audit queue")
ErrEmptyQueue is used to indicate that the queue is empty.
var ErrPendingQueueInProgress = errs.Class("pending queue already in progress")
ErrPendingQueueInProgress means that a chore attempted to add a new pending queue when one was already being added.
var Error = errs.Class("audit")
Error is the default audit errs class.
Functions ¶
Types ¶
type Chore ¶ added in v0.21.0
Chore populates reservoirs and the audit queue.
architecture: Chore
type Collector ¶ added in v1.26.2
type Collector struct {
	Reservoirs map[storj.NodeID]*Reservoir
	// contains filtered or unexported fields
}
Collector uses the segment loop to add segments to node reservoirs.
func NewCollector ¶ added in v1.26.2
NewCollector instantiates a segment collector.
func (*Collector) InlineSegment ¶ added in v1.26.2
func (collector *Collector) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error)
InlineSegment returns nil because we're only auditing for storage nodes for now.
func (*Collector) LoopStarted ¶ added in v1.27.3
LoopStarted is called at each start of a loop.
func (*Collector) RemoteSegment ¶ added in v1.26.2
RemoteSegment takes a remote segment found in metainfo and creates a reservoir for it if it doesn't exist already.
type Config ¶
type Config struct {
	MaxRetriesStatDB int `help:"max number of times to attempt updating a statdb batch" default:"3"`
	MinBytesPerSecond memory.Size `` /* 132-byte string literal not displayed */
	MinDownloadTimeout time.Duration `help:"the minimum duration for downloading a share from storage nodes before timing out" default:"5m0s" testDefault:"5s"`
	MaxReverifyCount int `help:"limit above which we consider an audit is failed" default:"3"`

	ChoreInterval time.Duration `help:"how often to run the reservoir chore" releaseDefault:"24h" devDefault:"1m" testDefault:"$TESTINTERVAL"`
	QueueInterval time.Duration `help:"how often to recheck an empty audit queue" releaseDefault:"1h" devDefault:"1m" testDefault:"$TESTINTERVAL"`
	Slots int `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"`
	WorkerConcurrency int `help:"number of workers to run audits on segments" default:"2"`
}
Config contains configurable values for audit chore and workers.
type Containment ¶
type Containment interface {
	Get(ctx context.Context, nodeID pb.NodeID) (*PendingAudit, error)
	IncrementPending(ctx context.Context, pendingAudit *PendingAudit) error
	Delete(ctx context.Context, nodeID pb.NodeID) (bool, error)
}
Containment holds information about pending audits for contained nodes.
architecture: Database
type Outcome ¶ added in v1.67.1
type Outcome int
Outcome enumerates the possible results of a piecewise audit.
Note that it is very similar to reputation.AuditType, but it is different in scope and needs a slightly different set of values.
const (
	// OutcomeNotPerformed indicates an audit was not performed, for any of a
	// variety of reasons, but that it should be reattempted later.
	OutcomeNotPerformed Outcome = iota
	// OutcomeNotNecessary indicates that an audit is no longer required,
	// for example because the segment has been updated or no longer exists.
	OutcomeNotNecessary
	// OutcomeSuccess indicates that an audit took place and the piece was
	// fully validated.
	OutcomeSuccess
	// OutcomeFailure indicates that an audit took place but that the node
	// failed the audit, either because it did not have the piece or the
	// data was incorrect.
	OutcomeFailure
	// OutcomeTimedOut indicates the audit could not be completed because
	// it took too long. The audit should be retried later.
	OutcomeTimedOut
	// OutcomeNodeOffline indicates that the audit could not be completed
	// because the node could not be contacted. The audit should be
	// retried later.
	OutcomeNodeOffline
	// OutcomeUnknownError indicates that the audit could not be completed
	// because of an error not otherwise expected or recognized. The
	// audit should be retried later.
	OutcomeUnknownError
)
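The comments on these constants say which outcomes call for another attempt. The following is a minimal sketch of that classification. It assumes a local copy of the enum so that it compiles on its own, and `shouldRetry` is a hypothetical helper that is not part of this package; the package's own retry decision may take more into account.

```go
package main

import "fmt"

// Outcome is redeclared here only so the sketch is self-contained;
// the values mirror the constants documented above.
type Outcome int

const (
	OutcomeNotPerformed Outcome = iota
	OutcomeNotNecessary
	OutcomeSuccess
	OutcomeFailure
	OutcomeTimedOut
	OutcomeNodeOffline
	OutcomeUnknownError
)

// shouldRetry is a hypothetical helper. It reports whether the doc comment
// for the outcome says the audit should be reattempted or retried later.
func shouldRetry(o Outcome) bool {
	switch o {
	case OutcomeNotPerformed, OutcomeTimedOut, OutcomeNodeOffline, OutcomeUnknownError:
		return true
	default:
		// Success, failure, and "not necessary" are final results.
		return false
	}
}

func main() {
	for _, o := range []Outcome{OutcomeSuccess, OutcomeTimedOut, OutcomeNotNecessary} {
		fmt.Println(int(o), shouldRetry(o))
	}
}
```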
type PendingAudit ¶
type PendingAudit struct {
	NodeID storj.NodeID
	PieceID storj.PieceID
	StripeIndex int32
	ReverifyCount int32
	StreamID uuid.UUID
	Position metabase.SegmentPosition
}
PendingAudit contains info needed for retrying an audit for a contained node.
type PieceAudit ¶ added in v1.40.3
type PieceAudit int
PieceAudit is piece audit status.
const (
	// PieceAuditUnknown is unknown piece audit.
	PieceAuditUnknown PieceAudit = iota
	// PieceAuditFailure is failed piece audit.
	PieceAuditFailure
	// PieceAuditOffline is offline node piece audit.
	PieceAuditOffline
	// PieceAuditContained is online but unresponsive node piece audit.
	PieceAuditContained
	// PieceAuditSuccess is successful piece audit.
	PieceAuditSuccess
)
func PieceAuditFromErr ¶ added in v1.40.3
func PieceAuditFromErr(err error) PieceAudit
PieceAuditFromErr returns piece audit based on error.
type PieceLocator ¶ added in v1.67.1
type PieceLocator struct {
	StreamID uuid.UUID
	Position metabase.SegmentPosition
	NodeID storj.NodeID
	PieceNum int
}
PieceLocator specifies all information necessary to look up a particular piece on a particular satellite.
type Queue ¶ added in v0.21.0
type Queue struct {
// contains filtered or unexported fields
}
Queue is a list of segments to audit, shared between the reservoir chore and audit workers. It is not safe for concurrent use.
type Queues ¶ added in v1.12.1
type Queues struct {
// contains filtered or unexported fields
}
Queues is a shared resource that keeps track of the next queue to be fetched and swaps with a new queue when ready.
func (*Queues) Fetch ¶ added in v1.12.1
Fetch gets the active queue, clears it, and swaps a pending queue in as the new active queue if available.
func (*Queues) Push ¶ added in v1.12.1
Push adds a pending queue to be swapped in when ready. If nextQueue is empty, the pending queue replaces it immediately. Otherwise Push creates a swapQueue callback that is called when nextQueue is fetched, so the swap happens only after the current queue has been handed out. Only one call to Push is permitted at a time; a second call while a pending queue is still waiting returns ErrPendingQueueInProgress.
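The interplay between Push and Fetch can be illustrated with a small, non-blocking sketch. Everything here is an assumption made for illustration: the `queues` type, its string items, and the `errPendingInProgress` value stand in for the package's unexported fields and for ErrPendingQueueInProgress, and the real type uses a callback rather than a stored slice.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errPendingInProgress stands in for ErrPendingQueueInProgress.
var errPendingInProgress = errors.New("pending queue already in progress")

// queues is a hypothetical, simplified model of Queues.
type queues struct {
	mu         sync.Mutex
	active     []string // queue handed out by the next fetch
	pending    []string // queue waiting to become active
	hasPending bool
}

// push installs next immediately if the active queue is empty; otherwise it
// parks next as the pending queue. A second parked queue is rejected.
func (q *queues) push(next []string) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.hasPending {
		return errPendingInProgress
	}
	if len(q.active) == 0 {
		q.active = next
		return nil
	}
	q.pending, q.hasPending = next, true
	return nil
}

// fetch returns the active queue, clears it, and promotes the pending queue
// (if any) to be the new active queue.
func (q *queues) fetch() []string {
	q.mu.Lock()
	defer q.mu.Unlock()
	out := q.active
	q.active = nil
	if q.hasPending {
		q.active, q.pending, q.hasPending = q.pending, nil, false
	}
	return out
}

func main() {
	q := &queues{}
	fmt.Println(q.push([]string{"seg-1", "seg-2"})) // installed as active
	fmt.Println(q.push([]string{"seg-3"}))          // parked as pending
	fmt.Println(q.push([]string{"seg-4"}))          // rejected
	fmt.Println(q.fetch(), q.fetch())
}
```

Rejecting a second pending queue instead of overwriting the first keeps a slow consumer from silently losing a whole batch of segments.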
type Report ¶
type Report struct {
	Successes storj.NodeIDList
	Fails storj.NodeIDList
	Offlines storj.NodeIDList
	PendingAudits []*PendingAudit
	Unknown storj.NodeIDList
	NodesReputation map[storj.NodeID]overlay.ReputationStatus
}
Report contains the result of an audit. It records whether the audit could be completed, the total number of pieces the audit covered, and lists of nodes that succeeded, failed, were offline, have pending audits, or failed for unknown reasons, together with their current reputation status.
type Reporter ¶
Reporter records audit reports in the overlay and database.
func NewReporter ¶
func NewReporter(log *zap.Logger, reputations *reputation.Service, containment Containment, maxRetries int, maxReverifyCount int32) Reporter
NewReporter instantiates a reporter.
type Reservoir ¶ added in v0.19.0
type Reservoir struct {
	Segments [maxReservoirSize]segmentloop.Segment
	// contains filtered or unexported fields
}
Reservoir holds a certain number of segments to reflect a random sample.
func NewReservoir ¶ added in v0.19.0
NewReservoir instantiates a Reservoir.
func (*Reservoir) Sample ¶ added in v0.19.0
func (reservoir *Reservoir) Sample(r *rand.Rand, segment *segmentloop.Segment)
Sample handles every segment in metainfo from index i=size..n-1 as follows: it computes the relative weight of the segment based on segment size and picks a random floating point number r = rand(0..1); if r < the relative weight of the segment, it uniformly selects a random segment reservoir.Segments[rand(0..i)] to replace with segment. See https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Chao for the algorithm used.
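The description above can be sketched as a small size-weighted reservoir. This is an illustration under stated assumptions, not the package's implementation: `item`, `reservoir`, `sample`, and `sampleAll` are hypothetical names, the relative weight is taken to be the segment size divided by the running sum of sizes seen so far, and the replacement slot is drawn uniformly from the reservoir's slots.

```go
package main

import (
	"fmt"
	"math/rand"
)

// item is a hypothetical stand-in for segmentloop.Segment; only Size matters.
type item struct {
	ID   int
	Size int64
}

// reservoir is a simplified model of Reservoir (assumed layout).
type reservoir struct {
	items []item
	size  int   // number of slots
	wSum  int64 // running sum of sizes seen so far
}

// sample fills the reservoir first, then replaces a uniformly chosen slot
// with probability size/wSum (the relative weight assumed in the lead-in).
func (rs *reservoir) sample(r *rand.Rand, it item) {
	if rs.size <= 0 {
		return
	}
	rs.wSum += it.Size
	if len(rs.items) < rs.size {
		rs.items = append(rs.items, it)
		return
	}
	if rs.wSum <= 0 {
		return // all sizes so far were zero; nothing to weigh against
	}
	p := float64(it.Size) / float64(rs.wSum)
	if r.Float64() < p {
		rs.items[r.Intn(rs.size)] = it
	}
}

// sampleAll feeds every item through a fresh reservoir with a seeded source.
func sampleAll(seed int64, size int, items []item) *reservoir {
	r := rand.New(rand.NewSource(seed))
	rs := &reservoir{size: size}
	for _, it := range items {
		rs.sample(r, it)
	}
	return rs
}

func main() {
	items := make([]item, 0, 100)
	for i := 1; i <= 100; i++ {
		items = append(items, item{ID: i, Size: int64(i) * 1024})
	}
	rs := sampleAll(42, 3, items)
	fmt.Println(len(rs.items), rs.wSum)
}
```

Because the acceptance probability scales with size, larger segments are more likely to end up in the sample than small ones seen at the same point in the stream.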
type ReverificationJob ¶ added in v1.67.1
type ReverificationJob struct {
	Locator PieceLocator
	InsertedAt time.Time
	ReverifyCount int
}
ReverificationJob represents a job as received from the reverification audit queue.
type ReverifyQueue ¶ added in v1.67.1
type ReverifyQueue interface {
	Insert(ctx context.Context, piece PieceLocator) (err error)
	GetNextJob(ctx context.Context) (job ReverificationJob, err error)
	Remove(ctx context.Context, piece PieceLocator) (wasDeleted bool, err error)
}
ReverifyQueue controls manipulation of a queue of pieces to be _re_verified; that is, a node timed out when we requested an audit of the piece, and now we need to follow up with that node until we get a proper answer to the audit. (Or until we try too many times, and disqualify the node.)
type Segment ¶ added in v1.26.2
type Segment struct {
	StreamID uuid.UUID
	Position metabase.SegmentPosition
	ExpiresAt *time.Time
	EncryptedSize int32 // size of the whole segment (not a piece)
}
Segment is a segment to audit.
func NewSegment ¶ added in v1.26.2
func NewSegment(loopSegment segmentloop.Segment) Segment
NewSegment creates a new segment to audit from a metainfo loop segment.
type Verifier ¶
type Verifier struct {
	OnTestingCheckSegmentAlteredHook func()
	// contains filtered or unexported fields
}
Verifier helps verify the correctness of a given stripe.
architecture: Worker
func NewVerifier ¶
func NewVerifier(log *zap.Logger, metabase *metabase.DB, dialer rpc.Dialer, overlay *overlay.Service, containment Containment, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size, minDownloadTimeout time.Duration) *Verifier
NewVerifier creates a Verifier.
func (*Verifier) DoReverifyPiece ¶ added in v1.67.1
func (verifier *Verifier) DoReverifyPiece(ctx context.Context, logger *zap.Logger, locator PieceLocator) (outcome Outcome, err error)
DoReverifyPiece acquires a piece from a single node and verifies its contents, its hash, and its order limit.
func (*Verifier) DownloadShares ¶
func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, stripeIndex int32, shareSize int32) (shares map[int]Share, err error)
DownloadShares downloads shares from the nodes where remote pieces are located.
func (*Verifier) GetPiece ¶ added in v1.67.1
func (verifier *Verifier) GetPiece(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, pieceSize int32) (pieceData []byte, hash *pb.PieceHash, origLimit *pb.OrderLimit, err error)
GetPiece uses the piecestore client to download a piece (and the associated original OrderLimit and PieceHash) from a node.
func (*Verifier) GetShare ¶
func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, stripeIndex, shareSize int32, pieceNum int) (share Share, err error)
GetShare uses the piecestore client to download a share from a node.
func (*Verifier) ReverifyPiece ¶ added in v1.67.1
func (verifier *Verifier) ReverifyPiece(ctx context.Context, locator PieceLocator) (keepInQueue bool)
ReverifyPiece acquires a piece from a single node and verifies its contents, its hash, and its order limit.
type VerifyQueue ¶ added in v1.67.1
type VerifyQueue interface {
	Push(ctx context.Context, segments []Segment) (err error)
	Next(ctx context.Context) (Segment, error)
}
VerifyQueue controls manipulation of a database-based queue of segments to be verified; that is, segments chosen at random from all segments on the satellite, for which workers should perform audits. We will try to download a stripe of data across all pieces in the segment and ensure that all pieces conform to the same polynomial.
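To show the shape of the interface, here is a minimal in-memory sketch of a queue with the same two methods. It rests on several assumptions: the real queue is database-backed, `Segment` is reduced to two placeholder fields, `memVerifyQueue` and `pushAndDrain` are hypothetical names, and Next is assumed to signal an empty queue with an error comparable to ErrEmptyQueue.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Segment is a reduced stand-in for the Segment type documented above.
type Segment struct {
	StreamID string // placeholder for uuid.UUID
	Position uint64 // placeholder for metabase.SegmentPosition
}

// errEmptyQueue stands in for ErrEmptyQueue (assumed to be what Next returns
// when nothing is queued).
var errEmptyQueue = errors.New("empty audit queue")

// memVerifyQueue is a hypothetical FIFO implementation kept in memory.
type memVerifyQueue struct {
	mu       sync.Mutex
	segments []Segment
}

// Push appends segments to the back of the queue.
func (q *memVerifyQueue) Push(ctx context.Context, segments []Segment) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	q.mu.Lock()
	defer q.mu.Unlock()
	q.segments = append(q.segments, segments...)
	return nil
}

// Next removes and returns the segment at the front of the queue.
func (q *memVerifyQueue) Next(ctx context.Context) (Segment, error) {
	if err := ctx.Err(); err != nil {
		return Segment{}, err
	}
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.segments) == 0 {
		return Segment{}, errEmptyQueue
	}
	seg := q.segments[0]
	q.segments = q.segments[1:]
	return seg, nil
}

// pushAndDrain pushes segments and then reads until the queue reports empty,
// which is roughly how a worker loop would consume it.
func pushAndDrain(q *memVerifyQueue, segments []Segment) ([]Segment, error) {
	ctx := context.Background()
	if err := q.Push(ctx, segments); err != nil {
		return nil, err
	}
	var out []Segment
	for {
		seg, err := q.Next(ctx)
		if errors.Is(err, errEmptyQueue) {
			return out, nil
		}
		if err != nil {
			return out, err
		}
		out = append(out, seg)
	}
}

func main() {
	q := &memVerifyQueue{}
	got, err := pushAndDrain(q, []Segment{{StreamID: "a"}, {StreamID: "b", Position: 1}})
	fmt.Println(len(got), err)
}
```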