audit

package
v0.28.3-rc-crdb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 19, 2019 License: AGPL-3.0 Imports: 26 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ContainError is the containment errs class
	ContainError = errs.Class("containment error")

	// ErrContainedNotFound is the errs class for when a pending audit isn't found
	ErrContainedNotFound = errs.Class("pending audit not found")

	// ErrContainDelete is the errs class for when a pending audit can't be deleted
	ErrContainDelete = errs.Class("unable to delete pending audit")

	// ErrAlreadyExists is the errs class for when a pending audit with the same nodeID but different share data already exists
	ErrAlreadyExists = errs.Class("pending audit already exists for nodeID")
)
View Source
var (

	// ErrNotEnoughShares is the errs class for when not enough shares are available to do an audit
	ErrNotEnoughShares = errs.Class("not enough shares for successful audit")
	// ErrSegmentDeleted is the errs class when the audited segment was deleted during the audit
	ErrSegmentDeleted = errs.Class("segment deleted during audit")
	// ErrSegmentExpired is the errs class used when a segment to audit has already expired.
	ErrSegmentExpired = errs.Class("segment expired before audit")
)
View Source
var ErrEmptyQueue = errs.Class("empty audit queue")

ErrEmptyQueue is used to indicate that the queue is empty.

View Source
var Error = errs.Class("audit error")

Error is the default audit errs class.

Functions

func GetRandomStripe added in v0.21.0

func GetRandomStripe(ctx context.Context, pointer *pb.Pointer) (index int64, err error)

GetRandomStripe takes a pointer and returns a random stripe index within that pointer.

Types

type Chore added in v0.21.0

type Chore struct {
	Loop sync2.Cycle
	// contains filtered or unexported fields
}

Chore populates reservoirs and the audit queue.

architecture: Chore

func NewChore added in v0.21.0

func NewChore(log *zap.Logger, queue *Queue, metaLoop *metainfo.Loop, config Config) *Chore

NewChore instantiates Chore.

func (*Chore) Close added in v0.21.0

func (chore *Chore) Close() error

Close closes chore.

func (*Chore) Run added in v0.21.0

func (chore *Chore) Run(ctx context.Context) (err error)

Run starts the chore.

type Config

type Config struct {
	MaxRetriesStatDB   int           `help:"max number of times to attempt updating a statdb batch" default:"3"`
	MinBytesPerSecond  memory.Size   `help:"the minimum acceptable bytes that storage nodes can transfer per second to the satellite" default:"128B"`
	MinDownloadTimeout time.Duration `help:"the minimum duration for downloading a share from storage nodes before timing out" default:"5m0s"`
	MaxReverifyCount   int           `help:"limit above which we consider an audit is failed" default:"3"`

	ChoreInterval     time.Duration `help:"how often to run the reservoir chore" releaseDefault:"24h" devDefault:"1m"`
	QueueInterval     time.Duration `help:"how often to recheck an empty audit queue" releaseDefault:"1h" devDefault:"1m"`
	Slots             int           `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"`
	WorkerConcurrency int           `help:"number of workers to run audits on paths" default:"1"`
}

Config contains configurable values for audit chore and workers.

type Containment

type Containment interface {
	Get(ctx context.Context, nodeID pb.NodeID) (*PendingAudit, error)
	IncrementPending(ctx context.Context, pendingAudit *PendingAudit) error
	Delete(ctx context.Context, nodeID pb.NodeID) (bool, error)
}

Containment holds information about pending audits for contained nodes

architecture: Database

type PathCollector added in v0.19.0

type PathCollector struct {
	Reservoirs map[storj.NodeID]*Reservoir
	// contains filtered or unexported fields
}

PathCollector uses the metainfo loop to add paths to node reservoirs

architecture: Observer

func NewPathCollector added in v0.19.0

func NewPathCollector(reservoirSlots int, r *rand.Rand) *PathCollector

NewPathCollector instantiates a path collector

func (*PathCollector) InlineSegment added in v0.19.0

func (collector *PathCollector) InlineSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error)

InlineSegment returns nil because audits currently cover only storage nodes.

func (*PathCollector) Object added in v0.21.0

func (collector *PathCollector) Object(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error)

Object returns nil because the audit service does not interact with objects

func (*PathCollector) RemoteSegment added in v0.19.0

func (collector *PathCollector) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error)

RemoteSegment takes a remote segment found in metainfo and creates a reservoir for it if it doesn't exist already

type PendingAudit

type PendingAudit struct {
	NodeID            storj.NodeID
	PieceID           storj.PieceID
	StripeIndex       int64
	ShareSize         int32
	ExpectedShareHash []byte
	ReverifyCount     int32
	Path              storj.Path
}

PendingAudit contains info needed for retrying an audit for a contained node

type Queue added in v0.21.0

type Queue struct {
	// contains filtered or unexported fields
}

Queue is a list of paths to audit, shared between the reservoir chore and audit workers.

func (*Queue) Next added in v0.21.0

func (q *Queue) Next() (storj.Path, error)

Next gets the next item in the queue.

func (*Queue) Size added in v0.21.0

func (q *Queue) Size() int

Size returns the size of the queue.

func (*Queue) Swap added in v0.21.0

func (q *Queue) Swap(newQueue []storj.Path)

Swap switches the backing queue slice with a new queue slice.

type Report

type Report struct {
	Successes     storj.NodeIDList
	Fails         storj.NodeIDList
	Offlines      storj.NodeIDList
	PendingAudits []*PendingAudit
	Unknown       storj.NodeIDList
}

Report contains audit result lists for nodes that succeeded, failed, were offline, have pending audits, or failed for unknown reasons

type Reporter

type Reporter struct {
	// contains filtered or unexported fields
}

Reporter records audit reports in overlay and implements the reporter interface

architecture: Service

func NewReporter

func NewReporter(log *zap.Logger, overlay *overlay.Service, containment Containment, maxRetries int, maxReverifyCount int32) *Reporter

NewReporter instantiates a reporter

func (*Reporter) RecordAudits

func (reporter *Reporter) RecordAudits(ctx context.Context, req Report, path storj.Path) (_ Report, err error)

RecordAudits saves audit results to overlay. When there is no error, it returns an empty Report and a nil error; otherwise it returns the error together with a Report whose fields are set to the values which have been saved.

type Reservoir added in v0.19.0

type Reservoir struct {
	Paths [maxReservoirSize]storj.Path
	// contains filtered or unexported fields
}

Reservoir holds a certain number of segments to reflect a random sample

func NewReservoir added in v0.19.0

func NewReservoir(size int) *Reservoir

NewReservoir instantiates a Reservoir

func (*Reservoir) Sample added in v0.19.0

func (reservoir *Reservoir) Sample(r *rand.Rand, path storj.Path)

Sample ensures that, for every segment in metainfo at index i = size..n-1, a random number r = rand(0..i) is picked and, if r < size, reservoir.Paths[r] is replaced with that segment's path.

type Share

type Share struct {
	Error    error
	PieceNum int
	NodeID   storj.NodeID
	Data     []byte
}

Share represents required information about an audited share

type Verifier

type Verifier struct {
	OnTestingCheckSegmentAlteredHook func()
	// contains filtered or unexported fields
}

Verifier helps verify the correctness of a given stripe

architecture: Worker

func NewVerifier

func NewVerifier(log *zap.Logger, metainfo *metainfo.Service, dialer rpc.Dialer, overlay *overlay.Service, containment Containment, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size, minDownloadTimeout time.Duration) *Verifier

NewVerifier creates a Verifier

func (*Verifier) DownloadShares

func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, stripeIndex int64, shareSize int32) (shares map[int]Share, err error)

DownloadShares downloads shares from the nodes where remote pieces are located

func (*Verifier) GetShare

func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, stripeIndex int64, shareSize int32, pieceNum int) (share Share, err error)

GetShare uses the piece store client to download a share from a node.

func (*Verifier) Reverify

func (verifier *Verifier) Reverify(ctx context.Context, path storj.Path) (report Report, err error)

Reverify reverifies the contained nodes in the stripe

func (*Verifier) Verify

func (verifier *Verifier) Verify(ctx context.Context, path storj.Path, skip map[storj.NodeID]bool) (report Report, err error)

Verify downloads shares then verifies the data correctness at a random stripe.

type Worker added in v0.21.0

type Worker struct {
	Loop sync2.Cycle
	// contains filtered or unexported fields
}

Worker contains information for populating audit queue and processing audits.

func NewWorker added in v0.21.0

func NewWorker(log *zap.Logger, queue *Queue, verifier *Verifier, reporter *Reporter, config Config) (*Worker, error)

NewWorker instantiates Worker.

func (*Worker) Close added in v0.21.0

func (worker *Worker) Close() error

Close halts the worker.

func (*Worker) Run added in v0.21.0

func (worker *Worker) Run(ctx context.Context) (err error)

Run runs audit service 2.0.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL