audit

package
v1.62.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 25, 2022 License: AGPL-3.0 Imports: 29 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ContainError is the containment errs class.
	ContainError = errs.Class("containment")

	// ErrContainedNotFound is the errs class for when a pending audit isn't found.
	ErrContainedNotFound = errs.Class("pending audit not found")

	// ErrContainDelete is the errs class for when a pending audit can't be deleted.
	ErrContainDelete = errs.Class("unable to delete pending audit")
)
View Source
var (

	// ErrNotEnoughShares is the errs class for when not enough shares are available to do an audit.
	ErrNotEnoughShares = errs.Class("not enough shares for successful audit")
	// ErrSegmentDeleted is the errs class when the audited segment was deleted during the audit.
	ErrSegmentDeleted = errs.Class("segment deleted during audit")
	// ErrSegmentModified is the errs class used when a segment has been changed in any way.
	ErrSegmentModified = errs.Class("segment has been modified")
)
View Source
var ErrEmptyQueue = errs.Class("empty audit queue")

ErrEmptyQueue is used to indicate that the queue is empty.

View Source
var ErrPendingQueueInProgress = errs.Class("pending queue already in progress")

ErrPendingQueueInProgress means that a chore attempted to add a new pending queue when one was already being added.

View Source
var Error = errs.Class("audit")

Error is the default audit errs class.

Functions

func GetRandomStripe added in v0.21.0

func GetRandomStripe(ctx context.Context, segment metabase.Segment) (index int32, err error)

GetRandomStripe takes a segment and returns a random stripe index within that segment.

Types

type Chore added in v0.21.0

type Chore struct {
	Loop *sync2.Cycle
	// contains filtered or unexported fields
}

Chore populates reservoirs and the audit queue.

architecture: Chore

func NewChore added in v0.21.0

func NewChore(log *zap.Logger, queues *Queues, loop *segmentloop.Service, config Config) *Chore

NewChore instantiates Chore.

func (*Chore) Close added in v0.21.0

func (chore *Chore) Close() error

Close closes chore.

func (*Chore) Run added in v0.21.0

func (chore *Chore) Run(ctx context.Context) (err error)

Run starts the chore.

type Collector added in v1.26.2

type Collector struct {
	Reservoirs map[storj.NodeID]*Reservoir
	// contains filtered or unexported fields
}

Collector uses the segment loop to add segments to node reservoirs.

func NewCollector added in v1.26.2

func NewCollector(reservoirSlots int, r *rand.Rand) *Collector

NewCollector instantiates a segment collector.

func (*Collector) InlineSegment added in v1.26.2

func (collector *Collector) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error)

InlineSegment returns nil because we're only auditing for storage nodes for now.

func (*Collector) LoopStarted added in v1.27.3

func (collector *Collector) LoopStarted(context.Context, segmentloop.LoopInfo) (err error)

LoopStarted is called at each start of a loop.

func (*Collector) RemoteSegment added in v1.26.2

func (collector *Collector) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) error

RemoteSegment takes a remote segment found in metainfo and creates a reservoir for it if it doesn't exist already.

type Config

type Config struct {
	MaxRetriesStatDB   int           `help:"max number of times to attempt updating a statdb batch" default:"3"`
	MinBytesPerSecond  memory.Size   `` /* 132-byte string literal not displayed */
	MinDownloadTimeout time.Duration `help:"the minimum duration for downloading a share from storage nodes before timing out" default:"5m0s" testDefault:"5s"`
	MaxReverifyCount   int           `help:"limit above which we consider an audit is failed" default:"3"`

	ChoreInterval     time.Duration `help:"how often to run the reservoir chore" releaseDefault:"24h" devDefault:"1m" testDefault:"$TESTINTERVAL"`
	QueueInterval     time.Duration `help:"how often to recheck an empty audit queue" releaseDefault:"1h" devDefault:"1m" testDefault:"$TESTINTERVAL"`
	Slots             int           `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"`
	WorkerConcurrency int           `help:"number of workers to run audits on segments" default:"2"`
}

Config contains configurable values for audit chore and workers.

type Containment

type Containment interface {
	Get(ctx context.Context, nodeID pb.NodeID) (*PendingAudit, error)
	IncrementPending(ctx context.Context, pendingAudit *PendingAudit) error
	Delete(ctx context.Context, nodeID pb.NodeID) (bool, error)
}

Containment holds information about pending audits for contained nodes.

architecture: Database

type PendingAudit

type PendingAudit struct {
	NodeID            storj.NodeID
	PieceID           storj.PieceID
	StripeIndex       int32
	ShareSize         int32
	ExpectedShareHash []byte
	ReverifyCount     int32
	StreamID          uuid.UUID
	Position          metabase.SegmentPosition
}

PendingAudit contains info needed for retrying an audit for a contained node.

type PieceAudit added in v1.40.3

type PieceAudit int

PieceAudit is piece audit status.

const (
	// PieceAuditUnknown is unknown piece audit.
	PieceAuditUnknown PieceAudit = iota
	// PieceAuditFailure is failed piece audit.
	PieceAuditFailure
	// PieceAuditOffline is offline node piece audit.
	PieceAuditOffline
	// PieceAuditContained is online but unresponsive node piece audit.
	PieceAuditContained
	// PieceAuditSuccess is successful piece audit.
	PieceAuditSuccess
)

func PieceAuditFromErr added in v1.40.3

func PieceAuditFromErr(err error) PieceAudit

PieceAuditFromErr returns piece audit based on error.

type Pieces added in v1.40.3

type Pieces struct {
	Successful metabase.Pieces
	Failed     metabase.Pieces
	Offline    metabase.Pieces
	Contained  metabase.Pieces
	Unknown    metabase.Pieces
}

Pieces contains pieces structured by piece audit.

type Queue added in v0.21.0

type Queue struct {
	// contains filtered or unexported fields
}

Queue is a list of segments to audit, shared between the reservoir chore and audit workers. It is not safe for concurrent use.

func NewQueue added in v1.12.1

func NewQueue(segments []Segment) *Queue

NewQueue creates a new audit queue.

func (*Queue) Next added in v0.21.0

func (q *Queue) Next() (Segment, error)

Next gets the next item in the queue.

func (*Queue) Size added in v0.21.0

func (q *Queue) Size() int

Size returns the size of the queue.

type Queues added in v1.12.1

type Queues struct {
	// contains filtered or unexported fields
}

Queues is a shared resource that keeps track of the next queue to be fetched and swaps with a new queue when ready.

func NewQueues added in v1.12.1

func NewQueues() *Queues

NewQueues creates a new Queues object.

func (*Queues) Fetch added in v1.12.1

func (queues *Queues) Fetch() *Queue

Fetch gets the active queue, clears it, and swaps a pending queue in as the new active queue if available.

func (*Queues) Push added in v1.12.1

func (queues *Queues) Push(pendingQueue []Segment) error

Push adds a pending queue to be swapped in once the next queue has been fetched. If nextQueue is empty, the pending queue replaces it immediately. Otherwise, Push creates a swapQueue callback that is called when nextQueue is fetched, at which point the provided pending queue is swapped in. Only one call to Push is permitted at a time; a call made while another is in progress returns ErrPendingQueueInProgress.

func (*Queues) WaitForSwap added in v1.12.1

func (queues *Queues) WaitForSwap(ctx context.Context) error

WaitForSwap blocks until the swapQueue callback is called or context is canceled. If there is no pending swap, it returns immediately.

type Report

type Report struct {
	Successes       storj.NodeIDList
	Fails           storj.NodeIDList
	Offlines        storj.NodeIDList
	PendingAudits   []*PendingAudit
	Unknown         storj.NodeIDList
	NodesReputation map[storj.NodeID]overlay.ReputationStatus
}

Report contains the result of an audit. It records whether the audit was able to be completed, the total number of pieces the audit was conducted for, and lists of the nodes that succeeded, failed, were offline, have pending audits, or failed for unknown reasons, together with their current reputation status.

type Reporter

type Reporter interface {
	RecordAudits(ctx context.Context, req Report) (_ Report, err error)
}

Reporter records audit reports in the overlay and database.

func NewReporter

func NewReporter(log *zap.Logger, reputations *reputation.Service, containment Containment, maxRetries int, maxReverifyCount int32) Reporter

NewReporter instantiates a reporter.

type Reservoir added in v0.19.0

type Reservoir struct {
	Segments [maxReservoirSize]Segment
	// contains filtered or unexported fields
}

Reservoir holds a certain number of segments to reflect a random sample.

func NewReservoir added in v0.19.0

func NewReservoir(size int) *Reservoir

NewReservoir instantiates a Reservoir.

func (*Reservoir) Sample added in v0.19.0

func (reservoir *Reservoir) Sample(r *rand.Rand, segment Segment)

Sample ensures that, for every segment in metainfo from index i=size..n-1, the relative weight of the segment is computed based on segment size and a random floating point number r = rand(0..1) is picked; if r < the relative weight of the segment, a segment reservoir.Segments[rand(0..i)] is selected uniformly at random and replaced with segment. See https://en.wikipedia.org/wiki/Reservoir_sampling#Algorithm_A-Chao for the algorithm used.

type Segment added in v1.26.2

type Segment struct {
	StreamID      uuid.UUID
	Position      metabase.SegmentPosition
	ExpiresAt     *time.Time
	EncryptedSize int32 // size of the whole segment (not a piece)
}

Segment is a segment to audit.

func NewSegment added in v1.26.2

func NewSegment(loopSegment *segmentloop.Segment) Segment

NewSegment creates a new segment to audit from a metainfo loop segment.

func (*Segment) Expired added in v1.26.2

func (segment *Segment) Expired(now time.Time) bool

Expired checks if segment is expired relative to now.

type Share

type Share struct {
	Error    error
	PieceNum int
	NodeID   storj.NodeID
	Data     []byte
}

Share represents required information about an audited share.

type Verifier

type Verifier struct {
	OnTestingCheckSegmentAlteredHook func()
	// contains filtered or unexported fields
}

Verifier helps verify the correctness of a given stripe.

architecture: Worker

func NewVerifier

func NewVerifier(log *zap.Logger, metabase *metabase.DB, dialer rpc.Dialer, overlay *overlay.Service, containment Containment, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size, minDownloadTimeout time.Duration) *Verifier

NewVerifier creates a Verifier.

func (*Verifier) DownloadShares

func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, stripeIndex int32, shareSize int32) (shares map[int]Share, err error)

DownloadShares downloads shares from the nodes where remote pieces are located.

func (*Verifier) GetShare

func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, stripeIndex, shareSize int32, pieceNum int) (share Share, err error)

GetShare uses the piece store client to download shares from nodes.

func (*Verifier) Reverify

func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report Report, err error)

Reverify reverifies the contained nodes in the stripe.

func (*Verifier) SetNow added in v1.26.2

func (verifier *Verifier) SetNow(nowFn func() time.Time)

SetNow allows tests to have the server act as if the current time is whatever they want.

func (*Verifier) Verify

func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[storj.NodeID]bool) (report Report, err error)

Verify downloads shares then verifies the data correctness at a random stripe.

type Worker added in v0.21.0

type Worker struct {
	Loop *sync2.Cycle
	// contains filtered or unexported fields
}

Worker contains information for populating the audit queue and processing audits.

func NewWorker added in v0.21.0

func NewWorker(log *zap.Logger, queues *Queues, verifier *Verifier, reporter Reporter, config Config) (*Worker, error)

NewWorker instantiates Worker.

func (*Worker) Close added in v0.21.0

func (worker *Worker) Close() error

Close halts the worker.

func (*Worker) Run added in v0.21.0

func (worker *Worker) Run(ctx context.Context) (err error)

Run runs audit service 2.0.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL