gracefulexit

package
v1.72.4
This package is not in the latest version of its module.
Published: Feb 8, 2023 License: AGPL-3.0 Imports: 27 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

var (
	// Error is the default error class for graceful exit package.
	Error = errs.Class("gracefulexit")

	// ErrNodeNotFound is returned if a graceful exit entry for a node does not exist in the database.
	ErrNodeNotFound = errs.Class("graceful exit node not found")

	// ErrAboveOptimalThreshold is returned if a graceful exit entry for a node has more pieces than required.
	ErrAboveOptimalThreshold = errs.Class("segment has more pieces than required")
)

var (
	// ErrInvalidArgument is an error class for invalid argument errors, used to determine which RPC status code to use.
	ErrInvalidArgument = errs.Class("graceful exit")
	// ErrIneligibleNodeAge is an error class for when a node has not been on the network long enough to gracefully exit.
	ErrIneligibleNodeAge = errs.Class("node is not yet eligible for graceful exit")
)

Functions

This section is empty.

Types

type Chore added in v0.23.0

type Chore struct {
	Loop *sync2.Cycle
	// contains filtered or unexported fields
}

Chore populates the graceful exit transfer queue.

architecture: Chore

func NewChore added in v0.23.0

func NewChore(log *zap.Logger, db DB, overlay overlay.DB, segmentLoop *segmentloop.Service, config Config) *Chore

NewChore instantiates Chore.

func (*Chore) Close added in v0.23.0

func (chore *Chore) Close() error

Close closes chore.

func (*Chore) Run added in v0.23.0

func (chore *Chore) Run(ctx context.Context) (err error)

Run starts the chore.
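
The chore is typically constructed once during satellite setup and driven by its Run loop. Below is a minimal sketch of that wiring, not the satellite's actual setup code; it assumes db, overlayDB, and segmentLoop are obtained from the satellite's other subsystems, and the import paths reflect this module version.

package example

import (
	"context"

	"go.uber.org/zap"

	"storj.io/storj/satellite/gracefulexit"
	"storj.io/storj/satellite/metabase/segmentloop"
	"storj.io/storj/satellite/overlay"
)

// runGracefulExitChore wires up the transfer queue chore and runs it until ctx is canceled.
func runGracefulExitChore(ctx context.Context, log *zap.Logger, db gracefulexit.DB, overlayDB overlay.DB, segmentLoop *segmentloop.Service, config gracefulexit.Config) error {
	chore := gracefulexit.NewChore(log, db, overlayDB, segmentLoop, config)
	defer func() { _ = chore.Close() }()

	// Run triggers the chore on every ChoreInterval tick until the context is canceled.
	return chore.Run(ctx)
}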

type Config added in v0.23.0

type Config struct {
	Enabled bool `help:"whether or not graceful exit is enabled on the satellite side." default:"true"`

	ChoreBatchSize int           `help:"size of the buffer used to batch inserts into the transfer queue." default:"500" testDefault:"10"`
	ChoreInterval  time.Duration `help:"how often to run the transfer queue chore." releaseDefault:"30s" devDefault:"10s" testDefault:"$TESTINTERVAL"`
	UseRangedLoop  bool          `help:"whether or not to use the ranged loop observer instead of the chore." default:"false" testDefault:"false"`

	EndpointBatchSize int `help:"size of the buffer used to batch transfer queue reads and sends to the storage node." default:"300" testDefault:"100"`

	MaxFailuresPerPiece          int           `help:"maximum number of transfer failures per piece." default:"5"`
	OverallMaxFailuresPercentage int           `help:"maximum percentage of transfer failures per node." default:"10"`
	MaxInactiveTimeFrame         time.Duration `help:"maximum inactive time frame of transfer activities per node." default:"168h" testDefault:"10s"`
	RecvTimeout                  time.Duration `help:"the minimum duration for receiving a stream from a storage node before timing out" default:"2h" testDefault:"1m"`
	MaxOrderLimitSendCount       int           `` /* 131-byte string literal not displayed */
	NodeMinAgeInMonths           int           `help:"minimum age for a node on the network in order to initiate graceful exit" default:"6" testDefault:"0"`

	AsOfSystemTimeInterval time.Duration `` /* 142-byte string literal not displayed */
	TransferQueueBatchSize int           `help:"batch size (crdb specific) for deleting and adding items to the transfer queue" default:"1000"`
}

Config for the chore.
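
As a rough sketch, a Config populated with the release defaults listed in the struct tags above might look as follows. The two fields whose tag text is truncated above (MaxOrderLimitSendCount and AsOfSystemTimeInterval) are left at their zero values because their defaults are not shown here.

package example

import (
	"time"

	"storj.io/storj/satellite/gracefulexit"
)

// defaultGracefulExitConfig mirrors the documented release defaults.
var defaultGracefulExitConfig = gracefulexit.Config{
	Enabled: true,

	ChoreBatchSize: 500,
	ChoreInterval:  30 * time.Second,
	UseRangedLoop:  false,

	EndpointBatchSize: 300,

	MaxFailuresPerPiece:          5,
	OverallMaxFailuresPercentage: 10,
	MaxInactiveTimeFrame:         168 * time.Hour,
	RecvTimeout:                  2 * time.Hour,
	NodeMinAgeInMonths:           6,

	TransferQueueBatchSize: 1000,

	// MaxOrderLimitSendCount and AsOfSystemTimeInterval are omitted because
	// their default values are not displayed in the truncated tags above.
}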

type DB

type DB interface {
	// IncrementProgress increments transfer stats for a node.
	IncrementProgress(ctx context.Context, nodeID storj.NodeID, bytes int64, successfulTransfers int64, failedTransfers int64) error
	// GetProgress gets a graceful exit progress entry.
	GetProgress(ctx context.Context, nodeID storj.NodeID) (*Progress, error)

	// Enqueue batch inserts graceful exit transfer queue entries if they do not already exist.
	Enqueue(ctx context.Context, items []TransferQueueItem, batchSize int) error
	// UpdateTransferQueueItem updates a graceful exit transfer queue entry.
	UpdateTransferQueueItem(ctx context.Context, item TransferQueueItem) error
	// DeleteTransferQueueItem deletes a graceful exit transfer queue entry.
	DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) error
	// DeleteTransferQueueItems deletes graceful exit transfer queue entries by nodeID.
	DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID) error
	// DeleteFinishedTransferQueueItems deletes finished graceful exit transfer queue entries.
	DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID) error
	// DeleteAllFinishedTransferQueueItems deletes all graceful exit transfer
	// queue items whose nodes have finished the exit before the indicated time,
	// returning the total number of deleted items.
	DeleteAllFinishedTransferQueueItems(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration, batchSize int) (count int64, err error)
	// DeleteFinishedExitProgress deletes exit progress entries for nodes that
	// finished exiting before the indicated time, returns number of deleted entries.
	DeleteFinishedExitProgress(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (count int64, err error)
	// GetFinishedExitNodes gets nodes that are marked having finished graceful exit before a given time.
	GetFinishedExitNodes(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (finishedNodes []storj.NodeID, err error)
	// GetTransferQueueItem gets a graceful exit transfer queue entry.
	GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) (*TransferQueueItem, error)
	// GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending.
	GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) ([]*TransferQueueItem, error)
	// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that have not failed, ordered by durability ratio and queued date ascending.
	GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) ([]*TransferQueueItem, error)
	// GetIncompleteFailed gets incomplete graceful exit transfer queue entries that have failed <= maxFailures times, ordered by durability ratio and queued date ascending.
	GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64) ([]*TransferQueueItem, error)
	// IncrementOrderLimitSendCount increments the number of times a node has been sent an order limit for transferring.
	IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) error
	// CountFinishedTransferQueueItemsByNode returns a map of the nodes which have
	// finished the exit before the indicated time but still have at least one item
	// left in the transfer queue.
	CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (map[storj.NodeID]int64, error)
}

DB implements CRUD operations for the graceful exit service.

architecture: Database
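
A minimal sketch of draining one page of a node's transfer queue through this interface; db is assumed to be a concrete satellite database implementation obtained elsewhere, and the transfers are simply recorded as successful for illustration.

package example

import (
	"context"

	"storj.io/common/storj"
	"storj.io/storj/satellite/gracefulexit"
)

// processOnePage fetches up to limit incomplete transfer queue items for a node,
// records each as a successful transfer, and removes it from the queue.
func processOnePage(ctx context.Context, db gracefulexit.DB, nodeID storj.NodeID, limit int) error {
	items, err := db.GetIncomplete(ctx, nodeID, limit, 0)
	if err != nil {
		return gracefulexit.Error.Wrap(err)
	}
	for _, item := range items {
		// Record one successful transfer; transferred bytes are left at zero in this sketch.
		if err := db.IncrementProgress(ctx, nodeID, 0, 1, 0); err != nil {
			return gracefulexit.Error.Wrap(err)
		}
		if err := db.DeleteTransferQueueItem(ctx, nodeID, item.StreamID, item.Position, item.PieceNum); err != nil {
			return gracefulexit.Error.Wrap(err)
		}
	}
	return nil
}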

type Endpoint added in v0.24.0

type Endpoint struct {
	pb.DRPCSatelliteGracefulExitUnimplementedServer
	// contains filtered or unexported fields
}

Endpoint for handling the transfer of pieces for Graceful Exit.

func NewEndpoint added in v0.24.0

func NewEndpoint(log *zap.Logger, signer signing.Signer, db DB, overlaydb overlay.DB, overlay *overlay.Service, reputation *reputation.Service, metabase *metabase.DB, orders *orders.Service,
	peerIdentities overlay.PeerIdentities, config Config) *Endpoint

NewEndpoint creates a new graceful exit endpoint.

func (*Endpoint) GracefulExitFeasibility added in v1.7.1

func (endpoint *Endpoint) GracefulExitFeasibility(ctx context.Context, req *pb.GracefulExitFeasibilityRequest) (_ *pb.GracefulExitFeasibilityResponse, err error)

GracefulExitFeasibility returns the node's joined-at date, the minimum node age, and whether graceful exit is available.

func (*Endpoint) Process added in v0.24.0

func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStream) (err error)

Process is called by storage nodes to receive pieces to transfer to new nodes and get exit status.

func (*Endpoint) UpdatePiecesCheckDuplicates added in v1.26.2

func (endpoint *Endpoint) UpdatePiecesCheckDuplicates(ctx context.Context, segment metabase.Segment, toAdd, toRemove metabase.Pieces, checkDuplicates bool) (err error)

UpdatePiecesCheckDuplicates atomically adds the toAdd pieces to the segment and removes the toRemove pieces.

If checkDuplicates is true, it first returns an error if any of the nodes to be added are already in the segment; otherwise it removes the toRemove pieces and then adds the toAdd pieces.
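
For illustration only, a hedged sketch of swapping a single piece from an old node to a new one on a segment. It assumes endpoint was constructed with NewEndpoint, that segment, oldNode, and newNode are obtained elsewhere, and that metabase.Piece carries Number and StorageNode fields.

package example

import (
	"context"

	"storj.io/common/storj"
	"storj.io/storj/satellite/gracefulexit"
	"storj.io/storj/satellite/metabase"
)

// swapPiece removes the copy of piece pieceNum held by oldNode and adds the copy
// now held by newNode, erroring if newNode already holds a piece of the segment.
func swapPiece(ctx context.Context, endpoint *gracefulexit.Endpoint, segment metabase.Segment, pieceNum uint16, oldNode, newNode storj.NodeID) error {
	toAdd := metabase.Pieces{{Number: pieceNum, StorageNode: newNode}}
	toRemove := metabase.Pieces{{Number: pieceNum, StorageNode: oldNode}}
	return endpoint.UpdatePiecesCheckDuplicates(ctx, segment, toAdd, toRemove, true)
}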

type Observer added in v1.70.1

type Observer struct {
	// contains filtered or unexported fields
}

Observer populates the transfer queue for exiting nodes. It also updates the timed-out status and removes transfer queue items for inactive exiting nodes.

func NewObserver added in v1.70.1

func NewObserver(log *zap.Logger, db DB, overlay overlay.DB, config Config) *Observer

NewObserver returns a new ranged loop observer.

func (*Observer) Finish added in v1.70.1

func (obs *Observer) Finish(ctx context.Context) (err error)

Finish marks that the exit loop has been completed for newly exiting nodes that were processed in this loop cycle.

func (*Observer) Fork added in v1.70.1

func (obs *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error)

Fork returns path collector that will populate the transfer queue for segments belonging to newly exiting nodes for its range.

func (*Observer) Join added in v1.70.1

func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error)

Join flushes the forked path collector and aggregates collected metrics.

func (*Observer) Start added in v1.70.1

func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error)

Start updates the status and clears the transfer queue for inactive exiting nodes. It then prepares to populate the transfer queue for newly exiting nodes during the ranged loop cycle.
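
In production the ranged loop service drives these hooks. The sketch below runs a single cycle by hand over a pre-fetched slice of segments and assumes, as the Fork documentation states, that the returned Partial is this package's PathCollector; it is an illustration, not the satellite's actual loop.

package example

import (
	"context"
	"time"

	"go.uber.org/zap"

	"storj.io/storj/satellite/gracefulexit"
	"storj.io/storj/satellite/metabase/segmentloop"
	"storj.io/storj/satellite/overlay"
)

// runObserverCycle drives one graceful exit observer cycle by hand.
func runObserverCycle(ctx context.Context, log *zap.Logger, db gracefulexit.DB, overlayDB overlay.DB, config gracefulexit.Config, segments []segmentloop.Segment) error {
	obs := gracefulexit.NewObserver(log, db, overlayDB, config)

	// Start clears queues for inactive exiting nodes and loads the newly exiting ones.
	if err := obs.Start(ctx, time.Now()); err != nil {
		return err
	}

	partial, err := obs.Fork(ctx)
	if err != nil {
		return err
	}

	// Assumption: Fork returns the package's path collector for this range.
	collector := partial.(*gracefulexit.PathCollector)
	if err := collector.Process(ctx, segments); err != nil {
		return err
	}

	// Join flushes the collector and aggregates its metrics; Finish marks the exit loop complete.
	if err := obs.Join(ctx, partial); err != nil {
		return err
	}
	return obs.Finish(ctx)
}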

type PathCollector added in v0.23.0

type PathCollector struct {
	// contains filtered or unexported fields
}

PathCollector uses the segment loop to collect transfer queue items for segments held by exiting nodes.

architecture: Observer

func NewPathCollector added in v0.23.0

func NewPathCollector(log *zap.Logger, db DB, exitingNodes storj.NodeIDList, batchSize int) *PathCollector

NewPathCollector instantiates a path collector.

func (*PathCollector) Flush added in v0.23.0

func (collector *PathCollector) Flush(ctx context.Context) (err error)

Flush persists the current buffer items to the database.

func (*PathCollector) InlineSegment added in v0.23.0

func (collector *PathCollector) InlineSegment(ctx context.Context, segment *segmentloop.Segment) (err error)

InlineSegment returns nil because inline segments are not stored on storage nodes, so there is nothing to transfer.

func (*PathCollector) LoopStarted added in v1.27.3

func (collector *PathCollector) LoopStarted(context.Context, segmentloop.LoopInfo) (err error)

LoopStarted is called at each start of a loop.

func (*PathCollector) Process added in v1.70.1

func (collector *PathCollector) Process(ctx context.Context, segments []segmentloop.Segment) (err error)

Process adds transfer queue items for remote segments belonging to newly exiting nodes.

func (*PathCollector) RemoteSegment added in v0.23.0

func (collector *PathCollector) RemoteSegment(ctx context.Context, segment *segmentloop.Segment) (err error)

RemoteSegment takes a remote segment found in metainfo and creates a graceful exit transfer queue item if it doesn't exist already.
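
A minimal sketch of using the path collector directly with the documented segment callbacks; exitingNodes, db, and the segments slice are assumed to be obtained elsewhere, and the batch size of 500 simply mirrors the ChoreBatchSize default.

package example

import (
	"context"

	"go.uber.org/zap"

	"storj.io/common/storj"
	"storj.io/storj/satellite/gracefulexit"
	"storj.io/storj/satellite/metabase/segmentloop"
)

// collectSegments feeds remote segments to the collector and flushes the
// buffered transfer queue items to the database.
func collectSegments(ctx context.Context, log *zap.Logger, db gracefulexit.DB, exitingNodes storj.NodeIDList, segments []*segmentloop.Segment) error {
	collector := gracefulexit.NewPathCollector(log, db, exitingNodes, 500)
	for _, segment := range segments {
		if err := collector.RemoteSegment(ctx, segment); err != nil {
			return err
		}
	}
	return collector.Flush(ctx)
}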

type PendingFinishedPromise added in v0.27.0

type PendingFinishedPromise struct {
	// contains filtered or unexported fields
}

PendingFinishedPromise for waiting for information about finished state.

func (*PendingFinishedPromise) Wait added in v0.27.0

func (promise *PendingFinishedPromise) Wait(ctx context.Context) (bool, error)

Wait should be called (once) after acquiring the finished promise and will return whether the pending map is finished.

type PendingMap added in v0.27.0

type PendingMap struct {
	// contains filtered or unexported fields
}

PendingMap for managing concurrent access to the pending transfer map.

func NewPendingMap added in v0.27.0

func NewPendingMap() *PendingMap

NewPendingMap creates a new PendingMap.

func (*PendingMap) Delete added in v0.27.0

func (pm *PendingMap) Delete(pieceID storj.PieceID) error

Delete removes the pending transfer item from the map and returns an error if the data does not exist.

func (*PendingMap) DoneSending added in v0.27.0

func (pm *PendingMap) DoneSending(err error) error

DoneSending is called (with an optional error) when no more work will be added to the map. If DoneSending has already been called, an error is returned. If a PendingFinishedPromise is waiting on a response, it is updated to return true.

func (*PendingMap) Get added in v0.27.0

func (pm *PendingMap) Get(pieceID storj.PieceID) (*PendingTransfer, bool)

Get returns the pending transfer item from the map, if it exists.

func (*PendingMap) IsFinishedPromise added in v0.27.0

func (pm *PendingMap) IsFinishedPromise() *PendingFinishedPromise

IsFinishedPromise returns a promise for the caller to wait on to determine the finished status of the pending map. If we have enough information to determine the finished status, we update the promise to have an answer immediately. Otherwise, we attach the promise to the pending map to be updated and cleared by either Put or DoneSending (whichever happens first).

func (*PendingMap) Length added in v0.27.0

func (pm *PendingMap) Length() int

Length returns the number of elements in the map.

func (*PendingMap) Put added in v0.27.0

func (pm *PendingMap) Put(pieceID storj.PieceID, pendingTransfer *PendingTransfer) error

Put adds work to the map. If there is already work associated with this piece ID it returns an error. If there is a PendingFinishedPromise waiting, that promise is updated to return false.

type PendingTransfer added in v0.27.0

type PendingTransfer struct {
	StreamID            uuid.UUID
	Position            metabase.SegmentPosition
	PieceSize           int64
	SatelliteMessage    *pb.SatelliteMessage
	OriginalRootPieceID storj.PieceID
	PieceNum            uint16
}

PendingTransfer is the representation of work on the pending map. It contains information about a transfer request that has been sent to a storage node by the satellite.
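
A hedged sketch of the coordination these two types provide: a producer registers outstanding transfers with Put and eventually calls DoneSending, while a consumer inspects the map and waits on IsFinishedPromise. The piece ID and the PendingTransfer contents are placeholders.

package example

import (
	"context"
	"fmt"

	"storj.io/common/storj"
	"storj.io/storj/satellite/gracefulexit"
)

func pendingMapExample(ctx context.Context) error {
	pm := gracefulexit.NewPendingMap()

	// Producer: register an outstanding transfer under a placeholder piece ID.
	pieceID := storj.PieceID{1}
	if err := pm.Put(pieceID, &gracefulexit.PendingTransfer{PieceNum: 7}); err != nil {
		return err
	}
	fmt.Println("pending transfers:", pm.Length())

	// Consumer: look the work item up and remove it once the transfer completes.
	if _, ok := pm.Get(pieceID); ok {
		if err := pm.Delete(pieceID); err != nil {
			return err
		}
	}

	// Producer: signal that no more work will be added to the map.
	if err := pm.DoneSending(nil); err != nil {
		return err
	}

	// Consumer: the promise resolves immediately because DoneSending was already called.
	finished, err := pm.IsFinishedPromise().Wait(ctx)
	fmt.Println("finished:", finished)
	return err
}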

type Progress

type Progress struct {
	NodeID            storj.NodeID
	BytesTransferred  int64
	PiecesTransferred int64
	PiecesFailed      int64
	UpdatedAt         time.Time
}

Progress represents the persisted graceful exit progress record.

type TransferQueueItem

type TransferQueueItem struct {
	NodeID              storj.NodeID
	StreamID            uuid.UUID
	Position            metabase.SegmentPosition
	PieceNum            int32
	RootPieceID         storj.PieceID
	DurabilityRatio     float64
	QueuedAt            time.Time
	RequestedAt         *time.Time
	LastFailedAt        *time.Time
	LastFailedCode      *int
	FailedCount         *int
	FinishedAt          *time.Time
	OrderLimitSendCount int
}

TransferQueueItem represents the persisted graceful exit queue record.
