audit

package
v1.67.2 Latest
Warning

This package is not in the latest version of its module.

Published: Nov 17, 2022 License: AGPL-3.0 Imports: 32 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// ContainError is the containment errs class.
	ContainError = errs.Class("containment")

	// ErrContainedNotFound is the errs class for when a pending audit isn't found.
	ErrContainedNotFound = errs.Class("pending audit not found")

	// ErrContainDelete is the errs class for when a pending audit can't be deleted.
	ErrContainDelete = errs.Class("unable to delete pending audit")
)
var (

	// ErrNotEnoughShares is the errs class for when not enough shares are available to do an audit.
	ErrNotEnoughShares = errs.Class("not enough shares for successful audit")
	// ErrSegmentDeleted is the errs class when the audited segment was deleted during the audit.
	ErrSegmentDeleted = errs.Class("segment deleted during audit")
	// ErrSegmentModified is the errs class used when a segment has been changed in any way.
	ErrSegmentModified = errs.Class("segment has been modified")
)
var ErrEmptyQueue = errs.Class("empty audit queue")

ErrEmptyQueue is used to indicate that the queue is empty.

var ErrPendingQueueInProgress = errs.Class("pending queue already in progress")

ErrPendingQueueInProgress means that a chore attempted to add a new pending queue when one was already being added.

var Error = errs.Class("audit")

Error is the default audit errs class.

Functions

func GetRandomStripe added in v0.21.0

func GetRandomStripe(ctx context.Context, segment metabase.Segment) (index int32, err error)

GetRandomStripe takes a segment and returns a random stripe index within that segment.
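
As an illustration of the idea, the following is a minimal sketch of picking a random stripe index, not the package's actual implementation. The function name `randomStripeIndex` and its two parameters are hypothetical stand-ins for values the real function would derive from `metabase.Segment`; the sketch assumes the number of stripes is the encrypted segment size divided by the stripe size.

```go
package main

import (
	"crypto/rand"
	"fmt"
	"math/big"
)

// randomStripeIndex returns a uniformly random stripe index in
// [0, numStripes), where numStripes = encryptedSize / stripeSize.
// Both parameters are hypothetical; the real GetRandomStripe reads
// its inputs from a metabase.Segment.
func randomStripeIndex(encryptedSize, stripeSize int32) (int32, error) {
	if stripeSize <= 0 {
		return 0, fmt.Errorf("invalid stripe size %d", stripeSize)
	}
	numStripes := encryptedSize / stripeSize
	if numStripes < 1 {
		// Assumption: a segment smaller than one stripe still has stripe 0.
		numStripes = 1
	}
	n, err := rand.Int(rand.Reader, big.NewInt(int64(numStripes)))
	if err != nil {
		return 0, err
	}
	return int32(n.Int64()), nil
}

func main() {
	index, err := randomStripeIndex(10*1024, 1024)
	if err != nil {
		panic(err)
	}
	fmt.Println("stripe index in [0, 10):", index)
}
```

The sketch draws from `crypto/rand` so that the chosen stripe is hard for a node to predict; whether the package does the same is an assumption here.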

Types

type Chore added in v0.21.0

type Chore struct {
	Loop *sync2.Cycle
	// contains filtered or unexported fields
}

Chore populates reservoirs and the audit queue.

architecture: Chore

func NewChore added in v0.21.0

func NewChore(log *zap.Logger, queues *Queues, loop *segmentloop.Service, config Config) *Chore

NewChore instantiates Chore.

func (*Chore) Close added in v0.21.0

func (chore *Chore) Close() error

Close closes chore.

func (*Chore) Run added in v0.21.0

func (chore *Chore) Run(ctx context.Context) (err error)

Run starts the chore.

type Collector added in v1.26.2

type Collector struct {
	Reservoirs map[storj.NodeID]*Reservoir
	// contains filtered or unexported fields
}

Collector uses the segment loop to add segments to node reservoirs.

func NewCollector added in v1.26.2

func NewCollector(reservoirSlots int, r *rand.Rand) *Collector

NewCollector instantiates a segment collector.

func (*Collector) InlineSegment added in v1.26.2

func (collector *Collector) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error)

InlineSegment returns nil because only storage nodes are audited for now.

func (*Collector) LoopStarted added in v1.27.3

func (collector *Collector) LoopStarted(context.Context, segmentloop.LoopInfo) (err error)

LoopStarted is called at each start of a loop.

func (*Collector) RemoteSegment added in v1.26.2

func (collector *Collector) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error

RemoteSegment takes a remote segment found in metainfo and creates a reservoir for it if it doesn't exist already.

type Config

type Config struct {
	MaxRetriesStatDB   int           `help:"max number of times to attempt updating a statdb batch" default:"3"`
	MinBytesPerSecond  memory.Size   `` /* 132-byte string literal not displayed */
	MinDownloadTimeout time.Duration `help:"the minimum duration for downloading a share from storage nodes before timing out" default:"5m0s" testDefault:"5s"`
	MaxReverifyCount   int           `help:"limit above which we consider an audit is failed" default:"3"`

	ChoreInterval     time.Duration `help:"how often to run the reservoir chore" releaseDefault:"24h" devDefault:"1m" testDefault:"$TESTINTERVAL"`
	QueueInterval     time.Duration `help:"how often to recheck an empty audit queue" releaseDefault:"1h" devDefault:"1m" testDefault:"$TESTINTERVAL"`
	Slots             int           `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"`
	WorkerConcurrency int           `help:"number of workers to run audits on segments" default:"2"`
}

Config contains configurable values for audit chore and workers.

type Containment

type Containment interface {
	Get(ctx context.Context, nodeID pb.NodeID) (*PendingAudit, error)
	IncrementPending(ctx context.Context, pendingAudit *PendingAudit) error
	Delete(ctx context.Context, nodeID pb.NodeID) (bool, error)
}

Containment holds information about pending audits for contained nodes.

architecture: Database

type Outcome added in v1.67.1

type Outcome int

Outcome enumerates the possible results of a piecewise audit.

Note that it is very similar to reputation.AuditType, but it is different in scope and needs a slightly different set of values.

const (
	// OutcomeNotPerformed indicates an audit was not performed, for any of a
	// variety of reasons, but that it should be reattempted later.
	OutcomeNotPerformed Outcome = iota
	// OutcomeNotNecessary indicates that an audit is no longer required,
	// for example because the segment has been updated or no longer exists.
	OutcomeNotNecessary
	// OutcomeSuccess indicates that an audit took place and the piece was
	// fully validated.
	OutcomeSuccess
	// OutcomeFailure indicates that an audit took place but that the node
	// failed the audit, either because it did not have the piece or the
	// data was incorrect.
	OutcomeFailure
	// OutcomeTimedOut indicates the audit could not be completed because
	// it took too long. The audit should be retried later.
	OutcomeTimedOut
	// OutcomeNodeOffline indicates that the audit could not be completed
	// because the node could not be contacted. The audit should be
	// retried later.
	OutcomeNodeOffline
	// OutcomeUnknownError indicates that the audit could not be completed
	// because of an error not otherwise expected or recognized. The
	// audit should be retried later.
	OutcomeUnknownError
)
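
The comments above say which outcomes call for a later retry. A minimal sketch of how a caller might act on that follows; the enum is redeclared locally so the example is self-contained, and `shouldRetry` is a hypothetical helper, not part of this package.

```go
package main

import "fmt"

// Outcome mirrors the enumeration documented above, redeclared
// locally so this sketch compiles on its own.
type Outcome int

const (
	OutcomeNotPerformed Outcome = iota
	OutcomeNotNecessary
	OutcomeSuccess
	OutcomeFailure
	OutcomeTimedOut
	OutcomeNodeOffline
	OutcomeUnknownError
)

// shouldRetry is a hypothetical helper. It reports whether the
// constant's comment says the audit should be attempted again later.
func shouldRetry(o Outcome) bool {
	switch o {
	case OutcomeNotPerformed, OutcomeTimedOut, OutcomeNodeOffline, OutcomeUnknownError:
		return true
	default:
		// NotNecessary, Success, and Failure are final results.
		return false
	}
}

func main() {
	fmt.Println(shouldRetry(OutcomeTimedOut)) // true
	fmt.Println(shouldRetry(OutcomeSuccess))  // false
}
```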

type PendingAudit

type PendingAudit struct {
	NodeID            storj.NodeID
	PieceID           storj.PieceID
	StripeIndex       int32
	ShareSize         int32
	ExpectedShareHash []byte
	ReverifyCount     int32
	StreamID          uuid.UUID
	Position          metabase.SegmentPosition
}

PendingAudit contains info needed for retrying an audit for a contained node.

type PieceAudit added in v1.40.3

type PieceAudit int

PieceAudit is piece audit status.

const (
	// PieceAuditUnknown is unknown piece audit.
	PieceAuditUnknown PieceAudit = iota
	// PieceAuditFailure is failed piece audit.
	PieceAuditFailure
	// PieceAuditOffline is offline node piece audit.
	PieceAuditOffline
	// PieceAuditContained is online but unresponsive node piece audit.
	PieceAuditContained
	// PieceAuditSuccess is successful piece audit.
	PieceAuditSuccess
)

func PieceAuditFromErr added in v1.40.3

func PieceAuditFromErr(err error) PieceAudit

PieceAuditFromErr returns piece audit based on error.

type PieceLocator added in v1.67.1

type PieceLocator struct {
	StreamID uuid.UUID
	Position metabase.SegmentPosition
	NodeID   storj.NodeID
	PieceNum int
}

PieceLocator specifies all information necessary to look up a particular piece on a particular satellite.

type Queue added in v0.21.0

type Queue struct {
	// contains filtered or unexported fields
}

Queue is a list of segments to audit, shared between the reservoir chore and audit workers. It is not safe for concurrent use.

func NewQueue added in v1.12.1

func NewQueue(segments []Segment) *Queue

NewQueue creates a new audit queue.

func (*Queue) Next added in v0.21.0

func (q *Queue) Next() (Segment, error)

Next gets the next item in the queue.

func (*Queue) Size added in v0.21.0

func (q *Queue) Size() int

Size returns the size of the queue.

type Queues added in v1.12.1

type Queues struct {
	// contains filtered or unexported fields
}

Queues is a shared resource that keeps track of the next queue to be fetched and swaps with a new queue when ready.

func NewQueues added in v1.12.1

func NewQueues() *Queues

NewQueues creates a new Queues object.

func (*Queues) Fetch added in v1.12.1

func (queues *Queues) Fetch() *Queue

Fetch gets the active queue, clears it, and swaps a pending queue in as the new active queue if available.

func (*Queues) Push added in v1.12.1

func (queues *Queues) Push(pendingQueue []Segment) error

Push adds a pending queue to be swapped in when ready. If nextQueue is empty, the pending queue replaces it immediately. Otherwise, Push creates a swapQueue callback that is called when nextQueue is fetched. Only one call to Push is permitted at a time; a call made while another pending queue is waiting returns ErrPendingQueueInProgress.

func (*Queues) WaitForSwap added in v1.12.1

func (queues *Queues) WaitForSwap(ctx context.Context) error

WaitForSwap blocks until the swapQueue callback is called or context is canceled. If there is no pending swap, it returns immediately.

type Report

type Report struct {
	Successes       storj.NodeIDList
	Fails           storj.NodeIDList
	Offlines        storj.NodeIDList
	PendingAudits   []*PendingAudit
	Unknown         storj.NodeIDList
	NodesReputation map[storj.NodeID]overlay.ReputationStatus
}

Report contains the result of an audit. It records whether an audit was able to be completed and the total number of pieces the audit covered, and it lists the nodes that succeeded, failed, were offline, have pending audits, or failed for unknown reasons, along with their current reputation status.

type Reporter

type Reporter interface {
	RecordAudits(ctx context.Context, req Report) (_ Report, err error)
}

Reporter records audit reports in the overlay and database.

func NewReporter

func NewReporter(log *zap.Logger, reputations *reputation.Service, containment Containment, maxRetries int, maxReverifyCount int32) Reporter

NewReporter instantiates a reporter.

type Reservoir added in v0.19.0

type Reservoir struct {
	Segments [maxReservoirSize]segmentloop.Segment
	// contains filtered or unexported fields
}

Reservoir holds a certain number of segments to reflect a random sample.

func NewReservoir added in v0.19.0

func NewReservoir(size int) *Reservoir

NewReservoir instantiates a Reservoir.

func (*Reservoir) Sample added in v0.19.0

func (reservoir *Reservoir) Sample(r *rand.Rand, segment *segmentloop.Segment)

For every segment in metainfo from index i = size..n-1, Sample computes the relative weight based on segment size and picks a random floating-point number r = rand(0..1). If r is less than the relative weight of the segment, it uniformly selects a random segment reservoir.Segments[rand(0..i)] to replace with the new segment. See https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Chao for the algorithm used.

type ReverificationJob added in v1.67.1

type ReverificationJob struct {
	Locator       PieceLocator
	InsertedAt    time.Time
	ReverifyCount int
}

ReverificationJob represents a job as received from the reverification audit queue.

type ReverifyQueue added in v1.67.1

type ReverifyQueue interface {
	Insert(ctx context.Context, piece PieceLocator) (err error)
	GetNextJob(ctx context.Context) (job ReverificationJob, err error)
	Remove(ctx context.Context, piece PieceLocator) (wasDeleted bool, err error)
}

ReverifyQueue controls manipulation of a queue of pieces to be _re_verified; that is, a node timed out when we requested an audit of the piece, and now we need to follow up with that node until we get a proper answer to the audit. (Or until we try too many times, and disqualify the node.)

type Segment added in v1.26.2

type Segment struct {
	StreamID      uuid.UUID
	Position      metabase.SegmentPosition
	ExpiresAt     *time.Time
	EncryptedSize int32 // size of the whole segment (not a piece)
}

Segment is a segment to audit.

func NewSegment added in v1.26.2

func NewSegment(loopSegment segmentloop.Segment) Segment

NewSegment creates a new segment to audit from a metainfo loop segment.

func (*Segment) Expired added in v1.26.2

func (segment *Segment) Expired(now time.Time) bool

Expired checks if segment is expired relative to now.

type Share

type Share struct {
	Error    error
	PieceNum int
	NodeID   storj.NodeID
	Data     []byte
}

Share represents required information about an audited share.

type Verifier

type Verifier struct {
	OnTestingCheckSegmentAlteredHook func()
	// contains filtered or unexported fields
}

Verifier helps verify the correctness of a given stripe.

architecture: Worker

func NewVerifier

func NewVerifier(log *zap.Logger, metabase *metabase.DB, dialer rpc.Dialer, overlay *overlay.Service, containment Containment, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size, minDownloadTimeout time.Duration) *Verifier

NewVerifier creates a Verifier.

func (*Verifier) DoReverifyPiece added in v1.67.1

func (verifier *Verifier) DoReverifyPiece(ctx context.Context, logger *zap.Logger, locator PieceLocator) (outcome Outcome, err error)

DoReverifyPiece acquires a piece from a single node and verifies its contents, its hash, and its order limit.

func (*Verifier) DownloadShares

func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, stripeIndex int32, shareSize int32) (shares map[int]Share, err error)

DownloadShares downloads shares from the nodes where remote pieces are located.

func (*Verifier) GetPiece added in v1.67.1

func (verifier *Verifier) GetPiece(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, pieceSize int32) (pieceData []byte, hash *pb.PieceHash, origLimit *pb.OrderLimit, err error)

GetPiece uses the piecestore client to download a piece (and the associated original OrderLimit and PieceHash) from a node.

func (*Verifier) GetShare

func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, stripeIndex, shareSize int32, pieceNum int) (share Share, err error)

GetShare uses the piecestore client to download a share from a node.

func (*Verifier) Reverify

func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report Report, err error)

Reverify reverifies the contained nodes in the stripe.

func (*Verifier) ReverifyPiece added in v1.67.1

func (verifier *Verifier) ReverifyPiece(ctx context.Context, locator PieceLocator) (keepInQueue bool)

ReverifyPiece acquires a piece from a single node and verifies its contents, its hash, and its order limit.

func (*Verifier) SetNow added in v1.26.2

func (verifier *Verifier) SetNow(nowFn func() time.Time)

SetNow allows tests to have the server act as if the current time is whatever they want.

func (*Verifier) Verify

func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[storj.NodeID]bool) (report Report, err error)

Verify downloads shares then verifies the data correctness at a random stripe.

type VerifyQueue added in v1.67.1

type VerifyQueue interface {
	Push(ctx context.Context, segments []Segment) (err error)
	Next(ctx context.Context) (Segment, error)
}

VerifyQueue controls manipulation of a database-based queue of segments to be verified; that is, segments chosen at random from all segments on the satellite, for which workers should perform audits. We will try to download a stripe of data across all pieces in the segment and ensure that all pieces conform to the same polynomial.

type Worker added in v0.21.0

type Worker struct {
	Loop *sync2.Cycle
	// contains filtered or unexported fields
}

Worker contains information for populating the audit queue and processing audits.

func NewWorker added in v0.21.0

func NewWorker(log *zap.Logger, queues *Queues, verifier *Verifier, reporter Reporter, config Config) (*Worker, error)

NewWorker instantiates Worker.

func (*Worker) Close added in v0.21.0

func (worker *Worker) Close() error

Close halts the worker.

func (*Worker) Run added in v0.21.0

func (worker *Worker) Run(ctx context.Context) (err error)

Run runs audit service 2.0.
