audit

package
v1.95.0-rc
Warning

This package is not in the latest version of its module.

Published: Jan 3, 2024 License: AGPL-3.0 Imports: 34 Imported by: 0

Documentation


Constants

This section is empty.

Variables

var (
	// ContainError is the containment errs class.
	ContainError = errs.Class("containment")

	// ErrContainedNotFound is the errs class for when a pending audit isn't found.
	ErrContainedNotFound = errs.Class("pending audit not found")

	// ErrContainDelete is the errs class for when a pending audit can't be deleted.
	ErrContainDelete = errs.Class("unable to delete pending audit")
)
var (

	// ErrNotEnoughShares is the errs class for when not enough shares are available to do an audit.
	ErrNotEnoughShares = errs.Class("not enough shares for successful audit")
	// ErrSegmentDeleted is the errs class when the audited segment was deleted during the audit.
	ErrSegmentDeleted = errs.Class("segment deleted during audit")
	// ErrSegmentModified is the errs class used when a segment has been changed in any way.
	ErrSegmentModified = errs.Class("segment has been modified")
)
var ErrEmptyQueue = errs.Class("empty audit queue")

ErrEmptyQueue is used to indicate that the queue is empty.

var Error = errs.Class("audit")

Error is the default audit errs class.

Functions

func GetRandomStripe added in v0.21.0

func GetRandomStripe(ctx context.Context, segment metabase.Segment) (index int32, err error)

GetRandomStripe takes a segment and returns a random stripe index within that segment.
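
As a rough illustration of what choosing a random stripe can involve, the sketch below derives a stripe count from a segment's encrypted size and a stripe size, then draws a uniform index. The helper name `randomStripeIndex` and both of its parameters are assumptions made for this example; it is not the package's implementation, which works on a `metabase.Segment`.

```go
package main

import (
	"crypto/rand"
	"fmt"
	"math/big"
)

// randomStripeIndex is a hypothetical helper for illustration only.
// It returns a uniformly random index in [0, encryptedSize/stripeSize),
// or 0 when the segment is smaller than a single stripe.
func randomStripeIndex(encryptedSize, stripeSize int32) (int32, error) {
	if stripeSize <= 0 {
		return 0, fmt.Errorf("invalid stripe size %d", stripeSize)
	}
	numStripes := encryptedSize / stripeSize
	if numStripes <= 0 {
		return 0, nil
	}
	// rand.Int returns a uniform value in [0, numStripes).
	n, err := rand.Int(rand.Reader, big.NewInt(int64(numStripes)))
	if err != nil {
		return 0, err
	}
	return int32(n.Int64()), nil
}

func main() {
	idx, err := randomStripeIndex(1<<20, 1024)
	if err != nil {
		panic(err)
	}
	fmt.Println("stripe index:", idx)
}
```

Using `crypto/rand` here is a design choice of the sketch: it keeps the audited stripe hard to predict.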

Types

type ByStreamIDAndPosition added in v1.68.2

type ByStreamIDAndPosition []Segment

ByStreamIDAndPosition allows sorting of a slice of segments by stream ID and position.

func (ByStreamIDAndPosition) Len added in v1.68.2

func (b ByStreamIDAndPosition) Len() int

func (ByStreamIDAndPosition) Less added in v1.68.2

func (b ByStreamIDAndPosition) Less(i, j int) bool

func (ByStreamIDAndPosition) Swap added in v1.68.2

func (b ByStreamIDAndPosition) Swap(i, j int)

type Config

type Config struct {
	MaxRetriesStatDB   int           `help:"max number of times to attempt updating a statdb batch" default:"3"`
	MinBytesPerSecond  memory.Size   `` /* 132-byte string literal not displayed */
	MinDownloadTimeout time.Duration `help:"the minimum duration for downloading a share from storage nodes before timing out" default:"5m0s" testDefault:"5s"`
	MaxReverifyCount   int           `help:"limit above which we consider an audit is failed" default:"3"`

	ChoreInterval             time.Duration `help:"how often to run the reservoir chore" releaseDefault:"24h" devDefault:"1m" testDefault:"$TESTINTERVAL"`
	QueueInterval             time.Duration `help:"how often to recheck an empty audit queue" releaseDefault:"1h" devDefault:"1m" testDefault:"$TESTINTERVAL"`
	Slots                     int           `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"`
	VerificationPushBatchSize int           `help:"number of audit jobs to push at once to the verification queue" devDefault:"10" releaseDefault:"4096"`
	WorkerConcurrency         int           `help:"number of workers to run audits on segments" default:"2"`
	UseRangedLoop             bool          `help:"whether use Audit observer with ranged loop." default:"true"`

	ReverifyWorkerConcurrency   int           `help:"number of workers to run reverify audits on pieces" default:"2"`
	ReverificationRetryInterval time.Duration `` /* 135-byte string literal not displayed */

	ContainmentSyncChoreInterval time.Duration `help:"how often to run the containment-sync chore" releaseDefault:"2h" devDefault:"2m" testDefault:"$TESTINTERVAL"`
}

Config contains configurable values for audit chore and workers.

type Containment

type Containment interface {
	Get(ctx context.Context, nodeID pb.NodeID) (*ReverificationJob, error)
	Insert(ctx context.Context, job *PieceLocator) error
	Delete(ctx context.Context, job *PieceLocator) (wasDeleted, nodeStillContained bool, err error)
	GetAllContainedNodes(ctx context.Context) ([]pb.NodeID, error)
}

Containment holds information about pending audits for contained nodes.

architecture: Database

type ContainmentSyncChore added in v1.73.4

type ContainmentSyncChore struct {
	Loop *sync2.Cycle
	// contains filtered or unexported fields
}

ContainmentSyncChore is a chore to update the set of contained nodes in the overlay cache. This is necessary because it is possible for the "contained" field in the nodes table to disagree with whether a node appears in the reverification queue. We make an effort to keep them in sync when making changes to the reverification queue, but this infrequent chore will clean up any inconsistencies that creep in (because we can't maintain perfect consistency while the reverification queue and the nodes table may be in separate databases). Fortunately, it is acceptable for a node's containment status to be out of date for some amount of time.
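
The reconciliation described above amounts to comparing two sets of node IDs. The following is a minimal sketch of that idea, assuming string node IDs and a hypothetical `reconcile` helper; how the chore actually reads the queue and updates the overlay is not shown here.

```go
package main

import (
	"fmt"
	"sort"
)

// reconcile is a hypothetical helper. It compares the node IDs present in a
// reverification queue with the node IDs currently flagged as contained, and
// reports which flags would need to be set and which would need to be cleared.
func reconcile(inQueue, flagged []string) (toFlag, toUnflag []string) {
	queued := make(map[string]bool, len(inQueue))
	for _, id := range inQueue {
		queued[id] = true
	}
	marked := make(map[string]bool, len(flagged))
	for _, id := range flagged {
		marked[id] = true
	}
	for id := range queued {
		if !marked[id] {
			toFlag = append(toFlag, id)
		}
	}
	for id := range marked {
		if !queued[id] {
			toUnflag = append(toUnflag, id)
		}
	}
	// Map iteration order is random, so sort for stable output.
	sort.Strings(toFlag)
	sort.Strings(toUnflag)
	return toFlag, toUnflag
}

func main() {
	toFlag, toUnflag := reconcile([]string{"a", "b"}, []string{"b", "c"})
	fmt.Println("mark contained:", toFlag)
	fmt.Println("clear contained:", toUnflag)
}
```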

func NewContainmentSyncChore added in v1.73.4

func NewContainmentSyncChore(log *zap.Logger, queue ReverifyQueue, overlay overlay.DB, interval time.Duration) *ContainmentSyncChore

NewContainmentSyncChore creates a new ContainmentSyncChore.

func (*ContainmentSyncChore) Run added in v1.73.4

func (rc *ContainmentSyncChore) Run(ctx context.Context) (err error)

Run runs the containment-sync chore.

type FailurePhase added in v1.80.3

type FailurePhase int

FailurePhase indicates during which phase a GET_SHARE operation failed.

const (
	// NoFailure indicates there was no failure during a GET_SHARE operation.
	NoFailure FailurePhase = 0
	// DialFailure indicates a GET_SHARE operation failed during Dial.
	DialFailure FailurePhase = 1
	// RequestFailure indicates a GET_SHARE operation failed to make its RPC request, or the request failed.
	RequestFailure FailurePhase = 2
)

type Observer added in v1.70.1

type Observer struct {

	// The following fields are reset on each segment loop cycle.
	Reservoirs map[metabase.NodeAlias]*Reservoir
	// contains filtered or unexported fields
}

Observer populates reservoirs and the audit queue.

architecture: Observer

func NewObserver added in v1.70.1

func NewObserver(log *zap.Logger, queue VerifyQueue, config Config) *Observer

NewObserver instantiates Observer.

func (*Observer) Finish added in v1.70.1

func (obs *Observer) Finish(ctx context.Context) (err error)

Finish builds and deduplicates an audit queue from the merged per-node reservoirs.

func (*Observer) Fork added in v1.70.1

func (obs *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error)

Fork returns a new audit reservoir collector for the range.

func (*Observer) Join added in v1.70.1

func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error)

Join merges the audit reservoir collector into the per-node reservoirs.

func (*Observer) Start added in v1.70.1

func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error)

Start prepares the observer for audit segment collection.

type Outcome added in v1.67.1

type Outcome int

Outcome enumerates the possible results of a piecewise audit.

Note that it is very similar to reputation.AuditType, but it is different in scope and needs a slightly different set of values.

const (
	// OutcomeNotPerformed indicates an audit was not performed, for any of a
	// variety of reasons, but that it should be reattempted later.
	OutcomeNotPerformed Outcome = iota
	// OutcomeNotNecessary indicates that an audit is no longer required,
	// for example because the segment has been updated or no longer exists.
	OutcomeNotNecessary
	// OutcomeSuccess indicates that an audit took place and the piece was
	// fully validated.
	OutcomeSuccess
	// OutcomeFailure indicates that an audit took place but that the node
	// failed the audit, either because it did not have the piece or the
	// data was incorrect.
	OutcomeFailure
	// OutcomeTimedOut indicates the audit could not be completed because
	// it took too long. The audit should be retried later.
	OutcomeTimedOut
	// OutcomeNodeOffline indicates that the audit could not be completed
	// because the node could not be contacted. The audit should be
	// retried later.
	OutcomeNodeOffline
	// OutcomeUnknownError indicates that the audit could not be completed
	// because of an error not otherwise expected or recognized. The
	// audit should be retried later.
	OutcomeUnknownError
)
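
The comments on these constants say which outcomes call for a later retry. The sketch below encodes that reading as a small helper; the lowercase `outcome` type and `shouldRetry` are hypothetical names for this example, and the package may handle each outcome differently in practice.

```go
package main

import "fmt"

// outcome mirrors the order of the documented constants, for illustration only.
type outcome int

const (
	notPerformed outcome = iota
	notNecessary
	success
	failure
	timedOut
	nodeOffline
	unknownError
)

// shouldRetry reports whether the documented comment for an outcome says the
// audit should be attempted again later.
func shouldRetry(o outcome) bool {
	switch o {
	case notPerformed, timedOut, nodeOffline, unknownError:
		return true
	default:
		// notNecessary, success, and failure are final.
		return false
	}
}

func main() {
	for _, o := range []outcome{success, timedOut, notNecessary} {
		fmt.Println(o, shouldRetry(o))
	}
}
```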

type PieceAudit added in v1.40.3

type PieceAudit int

PieceAudit is piece audit status.

const (
	// PieceAuditUnknown is unknown piece audit.
	PieceAuditUnknown PieceAudit = iota
	// PieceAuditFailure is failed piece audit.
	PieceAuditFailure
	// PieceAuditOffline is offline node piece audit.
	PieceAuditOffline
	// PieceAuditContained is online but unresponsive node piece audit.
	PieceAuditContained
	// PieceAuditSuccess is successful piece audit.
	PieceAuditSuccess
)

func PieceAuditFromErr added in v1.40.3

func PieceAuditFromErr(err error) PieceAudit

PieceAuditFromErr returns piece audit based on error.

type PieceLocator added in v1.67.1

type PieceLocator struct {
	StreamID uuid.UUID
	Position metabase.SegmentPosition
	NodeID   storj.NodeID
	PieceNum int
}

PieceLocator specifies all information necessary to look up a particular piece on a particular satellite.

type Report

type Report struct {
	Segment *metabase.Segment

	Successes       storj.NodeIDList
	Fails           metabase.Pieces
	Offlines        storj.NodeIDList
	PendingAudits   []*ReverificationJob
	Unknown         storj.NodeIDList
	NodesReputation map[storj.NodeID]overlay.ReputationStatus
}

Report contains the result of an audit. It holds the audited segment, the lists of nodes that succeeded, failed, were offline, have pending audits, or failed for unknown reasons, and the current reputation status of those nodes.

type Reporter

type Reporter interface {
	RecordAudits(ctx context.Context, req Report)
	ReportReverificationNeeded(ctx context.Context, piece *PieceLocator) (err error)
	RecordReverificationResult(ctx context.Context, pendingJob *ReverificationJob, outcome Outcome, reputation overlay.ReputationStatus) (err error)
}

Reporter records audit reports in the overlay and database.

func NewReporter

func NewReporter(log *zap.Logger, reputations *reputation.Service, overlay *overlay.Service, metabase *metabase.DB, containment Containment, maxRetries int, maxReverifyCount int32) Reporter

NewReporter instantiates a reporter.

type Reservoir added in v0.19.0

type Reservoir struct {
	// contains filtered or unexported fields
}

Reservoir holds a certain number of segments to reflect a random sample.

func NewReservoir added in v0.19.0

func NewReservoir(size int) *Reservoir

NewReservoir instantiates a Reservoir.

func (*Reservoir) Keys added in v1.70.1

func (reservoir *Reservoir) Keys() []float64

Keys returns the keys for the segments picked by the reservoir.

func (*Reservoir) Merge added in v1.70.1

func (reservoir *Reservoir) Merge(operand *Reservoir) error

Merge merges the given reservoir into the first. Both reservoirs must have the same size.

func (*Reservoir) Sample added in v0.19.0

func (reservoir *Reservoir) Sample(r *rand.Rand, segment rangedloop.Segment)

Sample tries to ensure that each segment passed in has a chance (proportional to its size) to be in the reservoir when sampling is complete.

The tricky part is that we do not know ahead of time how many segments will be passed in. The way this is accomplished is known as _Reservoir Sampling_. The specific algorithm we are using here is called A-Res on the Wikipedia article: https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Res

func (*Reservoir) Segments added in v1.26.2

func (reservoir *Reservoir) Segments() []rangedloop.Segment

Segments returns the segments picked by the reservoir.

type ReverificationJob added in v1.67.1

type ReverificationJob struct {
	Locator       PieceLocator
	InsertedAt    time.Time
	ReverifyCount int
	LastAttempt   time.Time
}

ReverificationJob represents a job as received from the reverification audit queue.

type Reverifier added in v1.69.1

type Reverifier struct {
	*Verifier
	// contains filtered or unexported fields
}

Reverifier pulls jobs from the reverification queue and fulfills them by performing the requested reverifications.

architecture: Worker

func NewReverifier added in v1.69.1

func NewReverifier(log *zap.Logger, verifier *Verifier, db ReverifyQueue, config Config) *Reverifier

NewReverifier creates a Reverifier.

func (*Reverifier) DoReverifyPiece added in v1.69.1

func (reverifier *Reverifier) DoReverifyPiece(ctx context.Context, logger *zap.Logger, locator *PieceLocator) (outcome Outcome, reputation overlay.ReputationStatus, err error)

DoReverifyPiece acquires a piece from a single node and verifies its contents, its hash, and its order limit.

func (*Reverifier) GetPiece added in v1.69.1

func (reverifier *Reverifier) GetPiece(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, pieceSize int32) (pieceData []byte, hash *pb.PieceHash, origLimit *pb.OrderLimit, err error)

GetPiece uses the piecestore client to download a piece (and the associated original OrderLimit and PieceHash) from a node.

func (*Reverifier) ReverifyPiece added in v1.69.1

func (reverifier *Reverifier) ReverifyPiece(ctx context.Context, logger *zap.Logger, locator *PieceLocator) (outcome Outcome, reputation overlay.ReputationStatus)

ReverifyPiece acquires a piece from a single node and verifies its contents, its hash, and its order limit.

type ReverifyQueue added in v1.67.1

type ReverifyQueue interface {
	Insert(ctx context.Context, piece *PieceLocator) (err error)
	GetNextJob(ctx context.Context, retryInterval time.Duration) (job *ReverificationJob, err error)
	Remove(ctx context.Context, piece *PieceLocator) (wasDeleted bool, err error)
	GetByNodeID(ctx context.Context, nodeID storj.NodeID) (audit *ReverificationJob, err error)
	GetAllContainedNodes(ctx context.Context) ([]storj.NodeID, error)
}

ReverifyQueue controls manipulation of a queue of pieces to be _re_verified; that is, a node timed out when we requested an audit of the piece, and now we need to follow up with that node until we get a proper answer to the audit. (Or until we try too many times, and disqualify the node.)

type ReverifyWorker added in v1.70.1

type ReverifyWorker struct {
	Loop *sync2.Cycle
	// contains filtered or unexported fields
}

ReverifyWorker processes reverifications (retrying piece audits against nodes that timed out during a Verification).

func NewReverifyWorker added in v1.70.1

func NewReverifyWorker(log *zap.Logger, queue ReverifyQueue, reverifier *Reverifier, reporter Reporter, config Config) *ReverifyWorker

NewReverifyWorker creates a new ReverifyWorker.

func (*ReverifyWorker) Close added in v1.70.1

func (worker *ReverifyWorker) Close() error

Close halts the worker.

func (*ReverifyWorker) Run added in v1.70.1

func (worker *ReverifyWorker) Run(ctx context.Context) (err error)

Run runs a ReverifyWorker.

type Segment added in v1.26.2

type Segment struct {
	StreamID      uuid.UUID
	Position      metabase.SegmentPosition
	ExpiresAt     *time.Time
	EncryptedSize int32 // size of the whole segment (not a piece)
}

Segment is a segment to audit.

func NewSegment added in v1.26.2

func NewSegment(loopSegment rangedloop.Segment) Segment

NewSegment creates a new segment to audit from a ranged loop segment.

func (*Segment) Expired added in v1.26.2

func (segment *Segment) Expired(now time.Time) bool

Expired checks if segment is expired relative to now.

type Share

type Share struct {
	Error        error
	FailurePhase FailurePhase
	PieceNum     int
	NodeID       storj.NodeID
	Data         []byte
}

Share represents required information about an audited share.

type Verifier

type Verifier struct {
	OnTestingCheckSegmentAlteredHook func()
	// contains filtered or unexported fields
}

Verifier helps verify the correctness of a given stripe.

architecture: Worker

func NewVerifier

func NewVerifier(log *zap.Logger, metabase *metabase.DB, dialer rpc.Dialer, overlay *overlay.Service, containment Containment, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size, minDownloadTimeout time.Duration) *Verifier

NewVerifier creates a Verifier.

func (*Verifier) DownloadShares

func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, stripeIndex int32, shareSize int32) (shares map[int]Share, err error)

DownloadShares downloads shares from the nodes where remote pieces are located.

func (*Verifier) GetShare

func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, stripeIndex, shareSize int32, pieceNum int) (share Share)

GetShare uses the piecestore client to download a share from a node.

func (*Verifier) IdentifyContainedNodes added in v1.70.1

func (verifier *Verifier) IdentifyContainedNodes(ctx context.Context, segment Segment) (skipList map[storj.NodeID]bool, err error)

IdentifyContainedNodes returns the set of all contained nodes out of the holders of pieces in the given segment.

func (*Verifier) SetNow added in v1.26.2

func (verifier *Verifier) SetNow(nowFn func() time.Time)

SetNow allows tests to have the server act as if the current time is whatever they want.

func (*Verifier) Verify

func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[storj.NodeID]bool) (report Report, err error)

Verify downloads shares then verifies the data correctness at a random stripe.

type VerifyQueue added in v1.67.1

type VerifyQueue interface {
	Push(ctx context.Context, segments []Segment, maxBatchSize int) (err error)
	Next(ctx context.Context) (Segment, error)
}

VerifyQueue controls manipulation of a database-based queue of segments to be verified; that is, segments chosen at random from all segments on the satellite, for which workers should perform audits. We will try to download a stripe of data across all pieces in the segment and ensure that all pieces conform to the same polynomial.

type Worker added in v0.21.0

type Worker struct {
	Loop *sync2.Cycle
	// contains filtered or unexported fields
}

Worker contains information for populating audit queue and processing audits.

func NewWorker added in v0.21.0

func NewWorker(log *zap.Logger, queue VerifyQueue, verifier *Verifier, reverifyQueue ReverifyQueue, reporter Reporter, config Config) *Worker

NewWorker instantiates Worker.

func (*Worker) Close added in v0.21.0

func (worker *Worker) Close() error

Close halts the worker.

func (*Worker) Run added in v0.21.0

func (worker *Worker) Run(ctx context.Context) (err error)

Run runs audit service 2.0.
