audit

package
v1.29.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2021 License: AGPL-3.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ContainError is the containment errs class.
	ContainError = errs.Class("containment error")

	// ErrContainedNotFound is the errs class for when a pending audit isn't found.
	ErrContainedNotFound = errs.Class("pending audit not found")

	// ErrContainDelete is the errs class for when a pending audit can't be deleted.
	ErrContainDelete = errs.Class("unable to delete pending audit")
)
View Source
var (

	// ErrNotEnoughShares is the errs class for when not enough shares are available to do an audit.
	ErrNotEnoughShares = errs.Class("not enough shares for successful audit")
	// ErrSegmentDeleted is the errs class when the audited segment was deleted during the audit.
	ErrSegmentDeleted = errs.Class("segment deleted during audit")
	// ErrSegmentModified is the errs class used when a segment has been changed in any way.
	ErrSegmentModified = errs.Class("segment has been modified")
)
View Source
var ErrEmptyQueue = errs.Class("empty audit queue")

ErrEmptyQueue is used to indicate that the queue is empty.

View Source
var ErrPendingQueueInProgress = errs.Class("pending queue already in progress")

ErrPendingQueueInProgress means that a chore attempted to add a new pending queue when one was already being added.

View Source
var Error = errs.Class("audit error")

Error is the default audit errs class.

Functions

func GetRandomStripe added in v0.21.0

func GetRandomStripe(ctx context.Context, segment metabase.Segment) (index int32, err error)

GetRandomStripe takes a segment and returns a random stripe index within that segment.

Types

type Chore added in v0.21.0

type Chore struct {
	Loop *sync2.Cycle
	// contains filtered or unexported fields
}

Chore populates reservoirs and the audit queue.

architecture: Chore

func NewChore added in v0.21.0

func NewChore(log *zap.Logger, queues *Queues, metaLoop *metaloop.Service, config Config) *Chore

NewChore instantiates Chore.

func (*Chore) Close added in v0.21.0

func (chore *Chore) Close() error

Close closes chore.

func (*Chore) Run added in v0.21.0

func (chore *Chore) Run(ctx context.Context) (err error)

Run starts the chore.

type Collector added in v1.26.2

type Collector struct {
	Reservoirs map[storj.NodeID]*Reservoir
	// contains filtered or unexported fields
}

Collector uses the metainfo loop to add segments to node reservoirs.

func NewCollector added in v1.26.2

func NewCollector(reservoirSlots int, r *rand.Rand) *Collector

NewCollector instantiates a segment collector.

func (*Collector) InlineSegment added in v1.26.2

func (collector *Collector) InlineSegment(ctx context.Context, segment *metaloop.Segment) (err error)

InlineSegment returns nil because we're only auditing for storage nodes for now.

func (*Collector) LoopStarted added in v1.27.3

func (collector *Collector) LoopStarted(context.Context, metaloop.LoopInfo) (err error)

LoopStarted is called at each start of a loop.

func (*Collector) Object added in v1.26.2

func (collector *Collector) Object(ctx context.Context, object *metaloop.Object) (err error)

Object returns nil because the audit service does not interact with objects.

func (*Collector) RemoteSegment added in v1.26.2

func (collector *Collector) RemoteSegment(ctx context.Context, segment *metaloop.Segment) (err error)

RemoteSegment takes a remote segment found in metainfo and creates a reservoir for it if it doesn't exist already.

type Config

type Config struct {
	MaxRetriesStatDB   int           `help:"max number of times to attempt updating a statdb batch" default:"3"`
	MinBytesPerSecond  memory.Size   `help:"the minimum acceptable bytes that storage nodes can transfer per second to the satellite" default:"128B"`
	MinDownloadTimeout time.Duration `help:"the minimum duration for downloading a share from storage nodes before timing out" default:"5m0s"`
	MaxReverifyCount   int           `help:"limit above which we consider an audit is failed" default:"3"`

	ChoreInterval     time.Duration `help:"how often to run the reservoir chore" releaseDefault:"24h" devDefault:"1m"`
	QueueInterval     time.Duration `help:"how often to recheck an empty audit queue" releaseDefault:"1h" devDefault:"1m"`
	Slots             int           `help:"number of reservoir slots allotted for nodes, currently capped at 3" default:"3"`
	WorkerConcurrency int           `help:"number of workers to run audits on segments" default:"2"`
}

Config contains configurable values for audit chore and workers.

type Containment

type Containment interface {
	Get(ctx context.Context, nodeID pb.NodeID) (*PendingAudit, error)
	IncrementPending(ctx context.Context, pendingAudit *PendingAudit) error
	Delete(ctx context.Context, nodeID pb.NodeID) (bool, error)
}

Containment holds information about pending audits for contained nodes.

architecture: Database

type PendingAudit

type PendingAudit struct {
	NodeID            storj.NodeID
	PieceID           storj.PieceID
	StripeIndex       int32
	ShareSize         int32
	ExpectedShareHash []byte
	ReverifyCount     int32
	Segment           metabase.SegmentLocation
}

PendingAudit contains info needed for retrying an audit for a contained node.

type Queue added in v0.21.0

type Queue struct {
	// contains filtered or unexported fields
}

Queue is a list of segments to audit, shared between the reservoir chore and audit workers. It is not safe for concurrent use.

func NewQueue added in v1.12.1

func NewQueue(segments []Segment) *Queue

NewQueue creates a new audit queue.

func (*Queue) Next added in v0.21.0

func (q *Queue) Next() (Segment, error)

Next gets the next item in the queue.

func (*Queue) Size added in v0.21.0

func (q *Queue) Size() int

Size returns the size of the queue.

type Queues added in v1.12.1

type Queues struct {
	// contains filtered or unexported fields
}

Queues is a shared resource that keeps track of the next queue to be fetched and swaps with a new queue when ready.

func NewQueues added in v1.12.1

func NewQueues() *Queues

NewQueues creates a new Queues object.

func (*Queues) Fetch added in v1.12.1

func (queues *Queues) Fetch() *Queue

Fetch gets the active queue, clears it, and swaps a pending queue in as the new active queue if available.

func (*Queues) Push added in v1.12.1

func (queues *Queues) Push(pendingQueue []Segment) error

Push adds a pending queue to be swapped in once the next queue has been fetched. If nextQueue is empty, it immediately replaces the queue. Otherwise it creates a swapQueue callback to be called when nextQueue is fetched. Only one call to Push is permitted at a time; otherwise it returns ErrPendingQueueInProgress.

func (*Queues) WaitForSwap added in v1.12.1

func (queues *Queues) WaitForSwap(ctx context.Context) error

WaitForSwap blocks until the swapQueue callback is called or context is canceled. If there is no pending swap, it returns immediately.

type Report

type Report struct {
	Successes     storj.NodeIDList
	Fails         storj.NodeIDList
	Offlines      storj.NodeIDList
	PendingAudits []*PendingAudit
	Unknown       storj.NodeIDList
}

Report contains audit result lists for nodes that succeeded, failed, were offline, have pending audits, or failed for unknown reasons.

type Reporter

type Reporter struct {
	// contains filtered or unexported fields
}

Reporter records audit reports in overlay and implements the reporter interface.

architecture: Service

func NewReporter

func NewReporter(log *zap.Logger, overlay *overlay.Service, containment Containment, maxRetries int, maxReverifyCount int32) *Reporter

NewReporter instantiates a reporter.

func (*Reporter) RecordAudits

func (reporter *Reporter) RecordAudits(ctx context.Context, req Report) (_ Report, err error)

RecordAudits saves audit results to overlay. When there is no error, it returns an empty Report and a nil error; otherwise it returns the error together with the report whose fields are set to the values which have been saved.

type Reservoir added in v0.19.0

type Reservoir struct {
	Segments [maxReservoirSize]Segment
	// contains filtered or unexported fields
}

Reservoir holds a certain number of segments to reflect a random sample.

func NewReservoir added in v0.19.0

func NewReservoir(size int) *Reservoir

NewReservoir instantiates a Reservoir.

func (*Reservoir) Sample added in v0.19.0

func (reservoir *Reservoir) Sample(r *rand.Rand, segment Segment)

Sample ensures that, for every segment in metainfo from index i=size..n-1, a random number r = rand(0..i) is picked and, if r < size, reservoir.Segments[r] is replaced with segment.

type Segment added in v1.26.2

type Segment struct {
	metabase.SegmentLocation
	StreamID       uuid.UUID
	ExpirationDate time.Time
}

Segment is a segment to audit.

func NewSegment added in v1.26.2

func NewSegment(loopSegment *metaloop.Segment) Segment

NewSegment creates a new segment to audit from a metainfo loop segment.

func (*Segment) Expired added in v1.26.2

func (segment *Segment) Expired(now time.Time) bool

Expired checks if segment is expired relative to now.

type Share

type Share struct {
	Error    error
	PieceNum int
	NodeID   storj.NodeID
	Data     []byte
}

Share represents required information about an audited share.

type Verifier

type Verifier struct {
	OnTestingCheckSegmentAlteredHook func()
	// contains filtered or unexported fields
}

Verifier helps verify the correctness of a given stripe.

architecture: Worker

func NewVerifier

func NewVerifier(log *zap.Logger, metabase metainfo.MetabaseDB, dialer rpc.Dialer, overlay *overlay.Service, containment Containment, orders *orders.Service, id *identity.FullIdentity, minBytesPerSecond memory.Size, minDownloadTimeout time.Duration) *Verifier

NewVerifier creates a Verifier.

func (*Verifier) DownloadShares

func (verifier *Verifier) DownloadShares(ctx context.Context, limits []*pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPsAndPorts map[storj.NodeID]string, stripeIndex int32, shareSize int32) (shares map[int]Share, err error)

DownloadShares downloads shares from the nodes where remote pieces are located.

func (*Verifier) GetShare

func (verifier *Verifier) GetShare(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, stripeIndex, shareSize int32, pieceNum int) (share Share, err error)

GetShare uses the piece store client to download shares from nodes.

func (*Verifier) Reverify

func (verifier *Verifier) Reverify(ctx context.Context, segment Segment) (report Report, err error)

Reverify reverifies the contained nodes in the stripe.

func (*Verifier) SetNow added in v1.26.2

func (verifier *Verifier) SetNow(nowFn func() time.Time)

SetNow allows tests to have the server act as if the current time is whatever they want.

func (*Verifier) Verify

func (verifier *Verifier) Verify(ctx context.Context, segment Segment, skip map[storj.NodeID]bool) (report Report, err error)

Verify downloads shares then verifies the data correctness at a random stripe.

type Worker added in v0.21.0

type Worker struct {
	Loop *sync2.Cycle
	// contains filtered or unexported fields
}

Worker contains information for populating the audit queue and processing audits.

func NewWorker added in v0.21.0

func NewWorker(log *zap.Logger, queues *Queues, verifier *Verifier, reporter *Reporter, config Config) (*Worker, error)

NewWorker instantiates Worker.

func (*Worker) Close added in v0.21.0

func (worker *Worker) Close() error

Close halts the worker.

func (*Worker) Run added in v0.21.0

func (worker *Worker) Run(ctx context.Context) (err error)

Run runs audit service 2.0.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL