repairer

package
v1.120.1
Warning

This package is not in the latest version of its module.

Published: Jan 8, 2025 | License: AGPL-3.0 | Imports: 41 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// ErrPieceHashVerifyFailed is the errs class when a piece hash downloaded from storagenode fails to match the original hash.
	ErrPieceHashVerifyFailed = errs.Class("piece hashes don't match")

	// ErrDialFailed is the errs class when a failure happens during Dial.
	ErrDialFailed = errs.Class("dial failure")
)
var (
	Error = errs.Class("repairer")
)

Error is a standard error class for this package.

Functions

This section is empty.

Types

type AdminFetchInfo added in v1.49.3

type AdminFetchInfo struct {
	Reader        io.ReadCloser
	Hash          *pb.PieceHash
	GetLimit      *pb.AddressedOrderLimit
	OriginalLimit *pb.OrderLimit
	FetchError    error
}

AdminFetchInfo groups together all the information about a piece that should be retrievable from storage nodes.

type Config

type Config struct {
	MaxRepair                     int           `help:"maximum segments that can be repaired concurrently" releaseDefault:"5" devDefault:"1" testDefault:"10"`
	SegmentsSelectBatchSize       int           `help:"how many injured segments will be read from repair queue in single request" releaseDefault:"1" devDefault:"10"`
	Interval                      time.Duration `` /* 130-byte string literal not displayed */
	DialTimeout                   time.Duration `help:"time limit for dialing storage node" default:"5s"`
	Timeout                       time.Duration `help:"time limit for uploading repaired pieces to new storage nodes" default:"5m0s" testDefault:"1m"`
	DownloadTimeout               time.Duration `help:"time limit for downloading pieces from a node for repair" default:"5m0s" testDefault:"1m"`
	TotalTimeout                  time.Duration `help:"time limit for an entire repair job, from queue pop to upload completion" default:"45m" testDefault:"10m"`
	MaxBufferMem                  memory.Size   `help:"maximum buffer memory (in bytes) to be allocated for read buffers" default:"4.0 MiB"`
	MaxExcessRateOptimalThreshold float64       `` /* 135-byte string literal not displayed */
	InMemoryRepair                bool          `help:"whether to download pieces for repair in memory (true) or download to disk (false)" default:"false"`
	InMemoryUpload                bool          `help:"whether to upload pieces for repair using memory (true) or disk (false)" default:"false"`
	ReputationUpdateEnabled       bool          `help:"whether the audit score of nodes should be updated as a part of repair" default:"false"`
	UseRangedLoop                 bool          `help:"whether to enable repair checker observer with ranged loop" default:"true"`
	RepairExcludedCountryCodes    []string      `help:"list of country codes to treat node from this country as offline" default:"" hidden:"true"`
	DoDeclumping                  bool          `help:"repair pieces on the same network to other nodes" default:"true"`
	DoPlacementCheck              bool          `help:"repair pieces out of segment placement" default:"true"`

	IncludedPlacements PlacementList `` /* 126-byte string literal not displayed */
	ExcludedPlacements PlacementList `help:"comma separated placement IDs (numbers), placements which should be ignored by the repairer" default:""`
}

Config contains configurable values for repairer.

type ECRepairer added in v0.21.0

type ECRepairer struct {
	// contains filtered or unexported fields
}

ECRepairer allows the repairer to download and verify pieces from storage nodes and to upload repaired pieces to new storage nodes.

func NewECRepairer added in v0.21.0

func NewECRepairer(dialer rpc.Dialer, satelliteSignee signing.Signee, dialTimeout time.Duration, downloadTimeout time.Duration,
	inmemoryDownload, inmemoryUpload bool) *ECRepairer

NewECRepairer creates a new repairer for interfacing with storagenodes.

func (*ECRepairer) Get added in v0.21.0

func (ec *ECRepairer) Get(ctx context.Context, log *zap.Logger, limits []*pb.AddressedOrderLimit, cachedNodesInfo map[storj.NodeID]overlay.NodeReputation, privateKey storj.PiecePrivateKey, es eestream.ErasureScheme, dataSize int64) (_ io.ReadCloser, _ FetchResultReport, err error)

Get downloads pieces from storage nodes using the provided order limits and decodes them into a segment. It attempts to download only the minimum number of pieces required by the redundancy scheme. For testing purposes, it additionally waits for error/failure results up to minFailures; under normal conditions, minFailures is 0.

After downloading a piece, the ECRepairer will verify the hash and original order limit for that piece. If verification fails, another piece will be downloaded until we reach the minimum required or run out of order limits. If piece hash verification fails, it will return all failed node IDs.

func (*ECRepairer) Repair added in v0.21.0

func (ec *ECRepairer) Repair(ctx context.Context, log *zap.Logger, limits []*pb.AddressedOrderLimit, privateKey storj.PiecePrivateKey, rs eestream.RedundancyStrategy, data io.Reader, timeout time.Duration, successfulNeeded int) (successfulNodes []*pb.Node, successfulHashes []*pb.PieceHash, err error)

Repair takes a provided segment, encodes it with the provided redundancy strategy, and uploads the pieces in need of repair to new nodes provided by order limits.

func (*ECRepairer) TestingSetMinFailures added in v1.77.2

func (ec *ECRepairer) TestingSetMinFailures(minFailures int)

TestingSetMinFailures sets the minFailures attribute, which tells the Repair machinery that we _expect_ there to be failures and that we should wait for them if necessary. This is only used in tests.

type FetchResultReport added in v1.65.1

type FetchResultReport struct {
	Successful []PieceFetchResult
	Failed     []PieceFetchResult
	Offline    []PieceFetchResult
	Contained  []PieceFetchResult
	Unknown    []PieceFetchResult
}

FetchResultReport contains a categorization of a set of pieces based on the results of GET operations.

type PieceFetchResult added in v1.65.1

type PieceFetchResult struct {
	Piece metabase.Piece
	Err   error
}

PieceFetchResult combines a piece pointer with the error we got when we tried to acquire that piece.

type PlacementList added in v1.89.2

type PlacementList struct {
	Placements []storj.PlacementConstraint
}

PlacementList is a configurable, comma-separated list of PlacementConstraint IDs.

func (*PlacementList) Set added in v1.89.2

func (p *PlacementList) Set(s string) error

Set implements pflag.Value.

func (*PlacementList) String added in v1.89.2

func (p *PlacementList) String() string

String implements pflag.Value.

func (PlacementList) Type added in v1.89.2

func (p PlacementList) Type() string

Type implements pflag.Value.
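Because PlacementList implements pflag.Value (String, Set and Type), a flag value such as "0,12,13" can be parsed along the lines of this sketch. placementList is a hypothetical stand-in, storj.PlacementConstraint is assumed to be a uint16 ID, and the Type string is invented for illustration.

```go
package main

import (
	"strconv"
	"strings"
)

// placementConstraint stands in for storj.PlacementConstraint (assumed uint16).
type placementConstraint uint16

// placementList is a hypothetical stand-in for PlacementList.
type placementList struct {
	Placements []placementConstraint
}

// Set parses a comma-separated list of IDs; empty input yields an empty list.
func (p *placementList) Set(s string) error {
	p.Placements = nil
	for _, part := range strings.Split(s, ",") {
		part = strings.TrimSpace(part)
		if part == "" {
			continue
		}
		id, err := strconv.ParseUint(part, 10, 16)
		if err != nil {
			return err
		}
		p.Placements = append(p.Placements, placementConstraint(id))
	}
	return nil
}

// String renders the list back into comma-separated form.
func (p *placementList) String() string {
	parts := make([]string, len(p.Placements))
	for i, pl := range p.Placements {
		parts[i] = strconv.FormatUint(uint64(pl), 10)
	}
	return strings.Join(parts, ",")
}

// Type names the flag type in help output (hypothetical value).
func (p placementList) Type() string { return "placementList" }
```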

type QueueStat added in v1.94.1

type QueueStat struct {
	Loop *sync2.Cycle
	// contains filtered or unexported fields
}

QueueStat contains the information and variables needed to collect and report repair queue statistics.

func NewQueueStat added in v1.94.1

func NewQueueStat(log *zap.Logger, registry *monkit.Registry, placements []storj.PlacementConstraint, db queue.RepairQueue, checkInterval time.Duration) *QueueStat

NewQueueStat creates a chore that collects repair queue statistics.

func (*QueueStat) Run added in v1.94.1

func (c *QueueStat) Run(ctx context.Context) (err error)

Run runs the chore loop, periodically refreshing the repair queue statistics.

func (*QueueStat) RunOnce added in v1.94.1

func (c *QueueStat) RunOnce(ctx context.Context)

RunOnce refreshes the queue statistics.

func (*QueueStat) Stats added in v1.94.1

func (c *QueueStat) Stats(cb func(key monkit.SeriesKey, field string, val float64))

Stats implements monkit.StatSource.
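The RunOnce/Stats split suggests a refresh-then-report pattern: a periodic refresh caches the queue counts, and Stats replays the cache through a (key, field, value) callback like the one in its signature. This sketch assumes that pattern; queueStats and its string keys are hypothetical simplifications of monkit.SeriesKey.

```go
package main

import (
	"sort"
	"sync"
)

// queueStats is a hypothetical, simplified stat source.
type queueStats struct {
	mu     sync.Mutex
	counts map[string]float64 // series key -> value from the last refresh
}

// refresh replaces the cached snapshot, as a periodic RunOnce might.
func (q *queueStats) refresh(snapshot map[string]float64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.counts = snapshot
}

// stats reports cached values through cb without querying the queue.
// It copies under the lock so cb never runs while the mutex is held.
func (q *queueStats) stats(cb func(key, field string, val float64)) {
	q.mu.Lock()
	keys := make([]string, 0, len(q.counts))
	vals := make(map[string]float64, len(q.counts))
	for k, v := range q.counts {
		keys = append(keys, k)
		vals[k] = v
	}
	q.mu.Unlock()

	sort.Strings(keys) // deterministic reporting order
	for _, k := range keys {
		cb(k, "count", vals[k])
	}
}
```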

type QueueStatConfig added in v1.94.1

type QueueStatConfig struct {
	Interval time.Duration `` /* 134-byte string literal not displayed */
}

QueueStatConfig configures the queue checker chore. Note: it is intentionally not part of Config, because the chore requires it on its own; keeping it separate makes it possible to print only the configuration the chore needs, without the full repair configuration.

type SegmentRepairer

type SegmentRepairer struct {
	OnTestingCheckSegmentAlteredHook func()
	OnTestingPiecesReportHook        func(pieces FetchResultReport)
	// contains filtered or unexported fields
}

SegmentRepairer repairs segments.

func NewSegmentRepairer

func NewSegmentRepairer(
	log *zap.Logger,
	metabase *metabase.DB,
	orders *orders.Service,
	overlay *overlay.Service,
	reporter audit.Reporter,
	ecRepairer *ECRepairer,
	placements nodeselection.PlacementDefinitions,
	repairThresholdOverrides checker.RepairThresholdOverrides,
	repairTargetOverrides checker.RepairTargetOverrides,
	config Config,
) *SegmentRepairer

NewSegmentRepairer creates a new instance of SegmentRepairer.

excessPercentageOptimalThreshold is the percentage applied on top of the optimal threshold to determine the maximum number of nodes to which repaired pieces are uploaded; when it is negative, 0 is used instead.
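Worked arithmetic for this rule, assuming the threshold is expressed as a fraction (0.05 meaning 5%) and the result is truncated to an integer: with 80 optimal shares and a threshold of 0.05, at most int(80 * 1.05) = 84 nodes receive repaired pieces. maxUploadNodes is a hypothetical helper, not the package's code.

```go
package main

// maxUploadNodes scales the optimal share count by (1 + threshold),
// treating a negative threshold as 0, and truncates the result.
// Assumption: threshold is a fraction, e.g. 0.05 for 5%.
func maxUploadNodes(optimal int, threshold float64) int {
	if threshold < 0 {
		threshold = 0 // negative values are clamped to 0, as documented
	}
	return int(float64(optimal) * (1 + threshold))
}
```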

func (*SegmentRepairer) AdminFetchPieces added in v1.49.3

func (repairer *SegmentRepairer) AdminFetchPieces(ctx context.Context, log *zap.Logger, seg *metabase.Segment, saveDir string) (pieceInfos []AdminFetchInfo, err error)

AdminFetchPieces retrieves raw pieces and the associated hashes and original order limits from the storage nodes on which they are stored, and returns them intact to the caller rather than decoding or decrypting or verifying anything. This is to be used for debugging purposes.

func (*SegmentRepairer) Repair

func (repairer *SegmentRepairer) Repair(ctx context.Context, queueSegment queue.InjuredSegment) (shouldDelete bool, err error)

Repair retrieves an at-risk segment, repairs it, and stores the lost pieces on new nodes. Note that shouldDelete is meaningful even when err is non-nil. Note also that Repair marks the audit status as failed for nodes whose pieces fail hash verification while being downloaded for repair.

func (*SegmentRepairer) SetNow added in v1.26.2

func (repairer *SegmentRepairer) SetNow(nowFn func() time.Time)

SetNow allows tests to have the server act as if the current time is whatever they want.

type Service

type Service struct {
	JobLimiter *semaphore.Weighted
	Loop       *sync2.Cycle
	// contains filtered or unexported fields
}

Service contains the information needed to run the repair service.

architecture: Worker

func NewService

func NewService(log *zap.Logger, queue queue.RepairQueue, config *Config, repairer *SegmentRepairer) *Service

NewService creates a repair service.

func (*Service) Close

func (service *Service) Close() error

Close closes resources.

func (*Service) Run

func (service *Service) Run(ctx context.Context) (err error)

Run runs the repairer service.

func (*Service) SetNow added in v1.26.2

func (service *Service) SetNow(nowFn func() time.Time)

SetNow allows tests to have the server act as if the current time is whatever they want.

func (*Service) TestingSetMinFailures added in v1.77.2

func (service *Service) TestingSetMinFailures(minFailures int)

TestingSetMinFailures sets the minFailures attribute, which tells the Repair machinery that we _expect_ there to be failures and that we should wait for them if necessary. This is only used in tests.

func (*Service) WaitForPendingRepairs added in v0.34.1

func (service *Service) WaitForPendingRepairs()

WaitForPendingRepairs waits for all ongoing repairs to complete.

NB: this assumes that service.config.MaxRepair will never be changed once this Service instance is initialized. If that is not a valid assumption, we should keep a copy of its initial value to use here instead.
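WaitForPendingRepairs can be implemented by taking every slot of the JobLimiter, which succeeds only once all running repairs have released theirs; this is why it depends on MaxRepair staying fixed. The sketch assumes that approach and uses a buffered channel as a stand-in for *semaphore.Weighted; limiter, runJobs and waitForPending are hypothetical names.

```go
package main

// limiter is a channel-based stand-in for the JobLimiter semaphore:
// each running repair holds one slot, and capacity equals MaxRepair.
type limiter struct {
	slots chan struct{}
}

func newLimiter(maxRepair int) *limiter {
	return &limiter{slots: make(chan struct{}, maxRepair)}
}

func (l *limiter) acquire() { l.slots <- struct{}{} }
func (l *limiter) release() { <-l.slots }

// runJobs starts each job once a slot is free; it returns after the last
// job has been started, not after it has finished.
func runJobs(l *limiter, jobs []func()) {
	for _, job := range jobs {
		l.acquire()
		go func(job func()) {
			defer l.release()
			job()
		}(job)
	}
}

// waitForPending blocks until every slot is free by acquiring all of them,
// then releases them again. It relies on the capacity never changing.
func (l *limiter) waitForPending() {
	n := cap(l.slots)
	for i := 0; i < n; i++ {
		l.acquire()
	}
	for i := 0; i < n; i++ {
		l.release()
	}
}
```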
