Documentation ¶
Index ¶
- Variables
- type Config
- type DB
- type Endpoint
- func (endpoint *Endpoint) GracefulExitFeasibility(ctx context.Context, req *pb.GracefulExitFeasibilityRequest) (_ *pb.GracefulExitFeasibilityResponse, err error)
- func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStream) (err error)
- func (endpoint *Endpoint) SetNowFunc(timeFunc func() time.Time)
- func (endpoint *Endpoint) UpdatePiecesCheckDuplicates(ctx context.Context, segment metabase.Segment, toAdd, toRemove metabase.Pieces, ...) (err error)
- type Observer
- func (obs *Observer) Finish(ctx context.Context) (err error)
- func (obs *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error)
- func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error)
- func (obs *Observer) Start(ctx context.Context, startTime time.Time) (err error)
- type PendingFinishedPromise
- type PendingMap
- func (pm *PendingMap) Delete(pieceID storj.PieceID) error
- func (pm *PendingMap) DoneSending(err error) error
- func (pm *PendingMap) Get(pieceID storj.PieceID) (*PendingTransfer, bool)
- func (pm *PendingMap) IsFinishedPromise() *PendingFinishedPromise
- func (pm *PendingMap) Length() int
- func (pm *PendingMap) Put(pieceID storj.PieceID, pendingTransfer *PendingTransfer) error
- type PendingTransfer
- type Progress
- type TransferQueueItem
Constants ¶
This section is empty.
Variables ¶
var (
	// Error is the default error class for graceful exit package.
	Error = errs.Class("gracefulexit")

	// ErrNodeNotFound is returned if a graceful exit entry for a node does not exist in database.
	ErrNodeNotFound = errs.Class("graceful exit node not found")

	// ErrAboveOptimalThreshold is returned if a graceful exit entry for a node has more pieces than required.
	ErrAboveOptimalThreshold = errs.Class("segment has more pieces than required")
)
var (
	// ErrInvalidArgument is an error class for invalid argument errors used to check which rpc code to use.
	ErrInvalidArgument = errs.Class("graceful exit")

	// ErrIneligibleNodeAge is an error class for when a node has not been on the network long enough to graceful exit.
	ErrIneligibleNodeAge = errs.Class("node is not yet eligible for graceful exit")
)
Functions ¶
This section is empty.
Types ¶
type Config ¶ added in v0.23.0
type Config struct {
	Enabled bool `help:"whether or not graceful exit is enabled on the satellite side." default:"true"`
	TimeBased bool `` /* 149-byte string literal not displayed */
	NodeMinAgeInMonths int `help:"minimum age for a node on the network in order to initiate graceful exit" default:"6" testDefault:"0"`
	ChoreBatchSize int `help:"size of the buffer used to batch inserts into the transfer queue." default:"500" testDefault:"10"`
	ChoreInterval time.Duration `help:"how often to run the transfer queue chore." releaseDefault:"30s" devDefault:"10s" testDefault:"$TESTINTERVAL"`
	UseRangedLoop bool `help:"whether use GE observer with ranged loop." default:"true"`
	EndpointBatchSize int `help:"size of the buffer used to batch transfer queue reads and sends to the storage node." default:"300" testDefault:"100"`
	MaxFailuresPerPiece int `help:"maximum number of transfer failures per piece." default:"5"`
	OverallMaxFailuresPercentage int `help:"maximum percentage of transfer failures per node." default:"10"`
	MaxInactiveTimeFrame time.Duration `help:"maximum inactive time frame of transfer activities per node." default:"168h" testDefault:"10s"`
	RecvTimeout time.Duration `help:"the minimum duration for receiving a stream from a storage node before timing out" default:"2h" testDefault:"1m"`
	MaxOrderLimitSendCount int `` /* 131-byte string literal not displayed */
	AsOfSystemTimeInterval time.Duration `` /* 142-byte string literal not displayed */
	TransferQueueBatchSize int `help:"batch size (crdb specific) for deleting and adding items to the transfer queue" default:"1000"`
	GracefulExitDurationInDays int `help:"number of days it takes to execute a passive graceful exit" default:"30" testDefault:"1"`
	OfflineCheckInterval time.Duration `help:"how frequently to check uptime ratio of gracefully-exiting nodes" default:"30m" testDefault:"10s"`
	MinimumOnlineScore float64 `` /* 141-byte string literal not displayed */
}
Config for the chore.
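To illustrate how OverallMaxFailuresPercentage might relate to the counters in Progress, here is a minimal sketch. The helper names failurePercentage and withinFailureLimit are hypothetical, and the strict "less than" comparison is an assumption; the package's actual check may differ.

```go
package main

import "fmt"

// progress mirrors only the two counters of the package's Progress type
// that matter for this sketch.
type progress struct {
	PiecesTransferred int64
	PiecesFailed      int64
}

// failurePercentage returns failed pieces as a percentage of all processed
// pieces. It returns 0 when nothing has been processed yet.
func failurePercentage(p progress) float64 {
	processed := p.PiecesTransferred + p.PiecesFailed
	if processed == 0 {
		return 0
	}
	return float64(p.PiecesFailed) / float64(processed) * 100
}

// withinFailureLimit is a hypothetical helper: it reports whether the node's
// failure percentage is below the configured maximum (assumed comparison).
func withinFailureLimit(p progress, overallMaxFailuresPercentage int) bool {
	return failurePercentage(p) < float64(overallMaxFailuresPercentage)
}

func main() {
	p := progress{PiecesTransferred: 95, PiecesFailed: 5}
	fmt.Printf("%.1f%% failed, within limit: %v\n",
		failurePercentage(p), withinFailureLimit(p, 10))
}
```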
type DB ¶
type DB interface {
	// IncrementProgress increments transfer stats for a node.
	IncrementProgress(ctx context.Context, nodeID storj.NodeID, bytes int64, successfulTransfers int64, failedTransfers int64) error
	// GetProgress gets a graceful exit progress entry.
	GetProgress(ctx context.Context, nodeID storj.NodeID) (*Progress, error)

	// Enqueue batch inserts graceful exit transfer queue entries if they do not exist.
	Enqueue(ctx context.Context, items []TransferQueueItem, batchSize int) error
	// UpdateTransferQueueItem updates a graceful exit transfer queue entry.
	UpdateTransferQueueItem(ctx context.Context, item TransferQueueItem) error
	// DeleteTransferQueueItem deletes a graceful exit transfer queue entry.
	DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) error
	// DeleteTransferQueueItems deletes graceful exit transfer queue entries by nodeID.
	DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID) error
	// DeleteFinishedTransferQueueItems deletes finished graceful exit transfer queue entries.
	DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID) error
	// DeleteAllFinishedTransferQueueItems deletes all graceful exit transfer
	// queue items whose nodes have finished the exit before the indicated time,
	// returning the total number of deleted items.
	DeleteAllFinishedTransferQueueItems(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration, batchSize int) (count int64, err error)
	// DeleteFinishedExitProgress deletes exit progress entries for nodes that
	// finished exiting before the indicated time and returns the number of deleted entries.
	DeleteFinishedExitProgress(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (count int64, err error)
	// GetFinishedExitNodes gets nodes that are marked as having finished graceful exit before a given time.
	GetFinishedExitNodes(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (finishedNodes []storj.NodeID, err error)
	// GetTransferQueueItem gets a graceful exit transfer queue entry.
	GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) (*TransferQueueItem, error)
	// GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending.
	GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) ([]*TransferQueueItem, error)
	// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that have not failed, ordered by durability ratio and queued date ascending.
	GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) ([]*TransferQueueItem, error)
	// GetIncompleteFailed gets incomplete graceful exit transfer queue entries that have failed <= maxFailures times, ordered by durability ratio and queued date ascending.
	GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64) ([]*TransferQueueItem, error)
	// IncrementOrderLimitSendCount increments the number of times a node has been sent an order limit for transferring.
	IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, StreamID uuid.UUID, Position metabase.SegmentPosition, pieceNum int32) error
	// CountFinishedTransferQueueItemsByNode returns a map of the nodes that
	// finished the exit before the indicated time but still have at least one
	// item left in the transfer queue.
	CountFinishedTransferQueueItemsByNode(ctx context.Context, before time.Time, asOfSystemTimeInterval time.Duration) (map[storj.NodeID]int64, error)
}
DB defines the CRUD operations for the graceful exit service.
architecture: Database
type Endpoint ¶ added in v0.24.0
type Endpoint struct { pb.DRPCSatelliteGracefulExitUnimplementedServer // contains filtered or unexported fields }
Endpoint for handling the transfer of pieces for Graceful Exit.
func NewEndpoint ¶ added in v0.24.0
func NewEndpoint(log *zap.Logger, signer signing.Signer, db DB, overlaydb overlay.DB, overlay *overlay.Service, reputation *reputation.Service, metabase *metabase.DB, orders *orders.Service, peerIdentities overlay.PeerIdentities, config Config) *Endpoint
NewEndpoint creates a new graceful exit endpoint.
func (*Endpoint) GracefulExitFeasibility ¶ added in v1.7.1
func (endpoint *Endpoint) GracefulExitFeasibility(ctx context.Context, req *pb.GracefulExitFeasibilityRequest) (_ *pb.GracefulExitFeasibilityResponse, err error)
GracefulExitFeasibility returns the node's joined-at date, the configured minimum node age (nodeMinAge), and whether graceful exit is available to the node.
func (*Endpoint) Process ¶ added in v0.24.0
func (endpoint *Endpoint) Process(stream pb.DRPCSatelliteGracefulExit_ProcessStream) (err error)
Process is called by storage nodes to receive pieces to transfer to new nodes and get exit status.
func (*Endpoint) SetNowFunc ¶ added in v1.89.2
func (endpoint *Endpoint) SetNowFunc(timeFunc func() time.Time)
SetNowFunc applies a function to be used in determining the "now" time for graceful exit purposes.
func (*Endpoint) UpdatePiecesCheckDuplicates ¶ added in v1.26.2
func (endpoint *Endpoint) UpdatePiecesCheckDuplicates(ctx context.Context, segment metabase.Segment, toAdd, toRemove metabase.Pieces, checkDuplicates bool) (err error)
UpdatePiecesCheckDuplicates atomically adds toAdd pieces and removes toRemove pieces from the segment.
If checkDuplicates is true, it first returns an error if any of the nodes to be added are already in the segment. It then removes the toRemove pieces and finally adds the toAdd pieces.
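The order of operations described above (optional duplicate check, then remove, then add) can be sketched on a plain slice. The piece type and updatePieces function are hypothetical simplifications of metabase.Pieces, and this sketch does not model the atomic database update that the real method performs.

```go
package main

import "fmt"

// piece is a simplified stand-in for a metabase piece: a piece number and
// the ID of the node that stores it.
type piece struct {
	Number uint16
	Node   string
}

// updatePieces returns a new piece list with toRemove taken out and toAdd
// appended. With checkDuplicates set, it fails if a node in toAdd already
// holds a piece of the segment.
func updatePieces(current, toAdd, toRemove []piece, checkDuplicates bool) ([]piece, error) {
	if checkDuplicates {
		present := make(map[string]bool, len(current))
		for _, p := range current {
			present[p.Node] = true
		}
		for _, p := range toAdd {
			if present[p.Node] {
				return nil, fmt.Errorf("node %s already holds a piece of this segment", p.Node)
			}
		}
	}

	remove := make(map[piece]bool, len(toRemove))
	for _, p := range toRemove {
		remove[p] = true
	}

	result := make([]piece, 0, len(current)+len(toAdd))
	for _, p := range current {
		if !remove[p] {
			result = append(result, p)
		}
	}
	return append(result, toAdd...), nil
}

func main() {
	current := []piece{{0, "A"}, {1, "B"}}

	// Move piece 1 from the exiting node B to the new node C.
	updated, err := updatePieces(current, []piece{{1, "C"}}, []piece{{1, "B"}}, true)
	fmt.Println(updated, err)

	// Node A already holds a piece, so the duplicate check rejects this.
	_, err = updatePieces(current, []piece{{1, "A"}}, []piece{{1, "B"}}, true)
	fmt.Println(err)
}
```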
type Observer ¶ added in v1.70.1
type Observer struct {
// contains filtered or unexported fields
}
Observer populates the transfer queue for exiting nodes. It also updates the timed-out status and removes transfer queue items for inactive exiting nodes.
func NewObserver ¶ added in v1.70.1
func NewObserver(log *zap.Logger, db DB, overlay overlay.DB, metabase *metabase.DB, config Config) *Observer
NewObserver returns a new ranged loop observer.
func (*Observer) Finish ¶ added in v1.70.1
func (obs *Observer) Finish(ctx context.Context) (err error)
Finish marks that the exit loop has been completed for newly exiting nodes that were processed in this loop cycle.
func (*Observer) Fork ¶ added in v1.70.1
func (obs *Observer) Fork(ctx context.Context) (_ rangedloop.Partial, err error)
Fork returns a path collector that will populate the transfer queue for segments belonging to newly exiting nodes for its range.
func (*Observer) Join ¶ added in v1.70.1
func (obs *Observer) Join(ctx context.Context, partial rangedloop.Partial) (err error)
Join flushes the forked path collector and aggregates collected metrics.
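The Fork and Join methods follow a fork/join pattern: each range of segments gets its own collector, and the results are merged afterwards. The sketch below uses local segment, partial, and observer types rather than the rangedloop package, and it assumes that joins happen one at a time after processing.

```go
package main

import (
	"fmt"
	"sync"
)

// segment is a simplified stand-in: it only lists the IDs of the nodes
// that hold its pieces.
type segment struct{ nodes []string }

// partial collects results for one range of segments.
type partial struct {
	exiting map[string]bool // read-only set of exiting node IDs
	queued  map[string]int  // exiting node ID -> pieces to transfer
}

// process counts the pieces held by exiting nodes in this range.
func (p *partial) process(segments []segment) {
	for _, seg := range segments {
		for _, node := range seg.nodes {
			if p.exiting[node] {
				p.queued[node]++
			}
		}
	}
}

type observer struct {
	exiting map[string]bool
	queued  map[string]int
}

// fork creates an independent collector so that ranges can be processed
// concurrently without sharing mutable state.
func (o *observer) fork() *partial {
	return &partial{exiting: o.exiting, queued: make(map[string]int)}
}

// join merges one collector's counts into the observer.
func (o *observer) join(p *partial) {
	for node, n := range p.queued {
		o.queued[node] += n
	}
}

// run processes every range in its own goroutine, then joins sequentially.
func run(o *observer, ranges [][]segment) {
	partials := make([]*partial, len(ranges))
	var wg sync.WaitGroup
	for i, r := range ranges {
		partials[i] = o.fork()
		wg.Add(1)
		go func(p *partial, r []segment) {
			defer wg.Done()
			p.process(r)
		}(partials[i], r)
	}
	wg.Wait()
	for _, p := range partials {
		o.join(p)
	}
}

func main() {
	o := &observer{exiting: map[string]bool{"B": true}, queued: make(map[string]int)}
	run(o, [][]segment{
		{{nodes: []string{"A", "B"}}, {nodes: []string{"B", "C"}}},
		{{nodes: []string{"A", "C"}}, {nodes: []string{"B"}}},
	})
	fmt.Println("pieces queued for B:", o.queued["B"])
}
```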
type PendingFinishedPromise ¶ added in v0.27.0
type PendingFinishedPromise struct {
// contains filtered or unexported fields
}
PendingFinishedPromise is used to wait for information about the finished state.
type PendingMap ¶ added in v0.27.0
type PendingMap struct {
// contains filtered or unexported fields
}
PendingMap manages concurrent access to the pending transfer map.
func NewPendingMap ¶ added in v0.27.0
func NewPendingMap() *PendingMap
NewPendingMap creates a new PendingMap.
func (*PendingMap) Delete ¶ added in v0.27.0
func (pm *PendingMap) Delete(pieceID storj.PieceID) error
Delete removes the pending transfer item from the map and returns an error if the data does not exist.
func (*PendingMap) DoneSending ¶ added in v0.27.0
func (pm *PendingMap) DoneSending(err error) error
DoneSending is called (with an optional error) when no more work will be added to the map. If DoneSending has already been called, an error is returned. If a PendingFinishedPromise is waiting on a response, it is updated to return true.
func (*PendingMap) Get ¶ added in v0.27.0
func (pm *PendingMap) Get(pieceID storj.PieceID) (*PendingTransfer, bool)
Get returns the pending transfer item from the map, if it exists.
func (*PendingMap) IsFinishedPromise ¶ added in v0.27.0
func (pm *PendingMap) IsFinishedPromise() *PendingFinishedPromise
IsFinishedPromise returns a promise for the caller to wait on to determine the finished status of the pending map. If we have enough information to determine the finished status, we update the promise to have an answer immediately. Otherwise, we attach the promise to the pending map to be updated and cleared by either Put or DoneSending (whichever happens first).
func (*PendingMap) Length ¶ added in v0.27.0
func (pm *PendingMap) Length() int
Length returns the number of elements in the map.
func (*PendingMap) Put ¶ added in v0.27.0
func (pm *PendingMap) Put(pieceID storj.PieceID, pendingTransfer *PendingTransfer) error
Put adds work to the map. If there is already work associated with this piece ID it returns an error. If there is a PendingFinishedPromise waiting, that promise is updated to return false.
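The interaction between Put, DoneSending, and IsFinishedPromise can be sketched with a mutex-guarded map and a buffered channel acting as the promise. This is a simplified model, not the package's implementation: string IDs and int64 sizes replace storj.PieceID and *PendingTransfer, DoneSending takes no error argument, only one outstanding promise is supported, and the rule for answering immediately (pending work means not finished, an empty map after DoneSending means finished) is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type pendingMap struct {
	mu     sync.Mutex
	data   map[string]int64
	done   bool
	waiter chan bool // at most one outstanding promise in this sketch
}

func newPendingMap() *pendingMap {
	return &pendingMap{data: make(map[string]int64)}
}

// Put adds work and resolves a waiting promise with false (not finished).
func (pm *pendingMap) Put(id string, size int64) error {
	pm.mu.Lock()
	defer pm.mu.Unlock()
	if _, ok := pm.data[id]; ok {
		return errors.New("piece ID already pending")
	}
	pm.data[id] = size
	pm.resolve(false)
	return nil
}

// Delete removes work and reports an error if the ID is unknown.
func (pm *pendingMap) Delete(id string) error {
	pm.mu.Lock()
	defer pm.mu.Unlock()
	if _, ok := pm.data[id]; !ok {
		return errors.New("piece ID not pending")
	}
	delete(pm.data, id)
	return nil
}

// DoneSending marks that no more work will arrive and resolves a waiting
// promise with true (finished).
func (pm *pendingMap) DoneSending() error {
	pm.mu.Lock()
	defer pm.mu.Unlock()
	if pm.done {
		return errors.New("DoneSending already called")
	}
	pm.done = true
	pm.resolve(true)
	return nil
}

// IsFinishedPromise returns a channel that yields the finished status as
// soon as it is known.
func (pm *pendingMap) IsFinishedPromise() <-chan bool {
	pm.mu.Lock()
	defer pm.mu.Unlock()
	promise := make(chan bool, 1) // buffered so resolving never blocks
	switch {
	case len(pm.data) > 0:
		promise <- false
	case pm.done:
		promise <- true
	default:
		pm.waiter = promise
	}
	return promise
}

// resolve answers and clears the outstanding promise. Callers hold pm.mu.
func (pm *pendingMap) resolve(finished bool) {
	if pm.waiter != nil {
		pm.waiter <- finished
		pm.waiter = nil
	}
}

func main() {
	pm := newPendingMap()
	promise := pm.IsFinishedPromise() // empty and not done: answer is deferred
	_ = pm.Put("piece-1", 1024)       // resolves the promise with false
	fmt.Println("finished:", <-promise)

	_ = pm.Delete("piece-1")
	_ = pm.DoneSending()
	fmt.Println("finished:", <-pm.IsFinishedPromise())
}
```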
type PendingTransfer ¶ added in v0.27.0
type PendingTransfer struct {
	StreamID uuid.UUID
	Position metabase.SegmentPosition
	PieceSize int64
	SatelliteMessage *pb.SatelliteMessage
	OriginalRootPieceID storj.PieceID
	PieceNum uint16
}
PendingTransfer is the representation of work on the pending map. It contains information about a transfer request that has been sent to a storagenode by the satellite.
type Progress ¶
type Progress struct {
	NodeID storj.NodeID
	BytesTransferred int64
	PiecesTransferred int64
	PiecesFailed int64
	UpdatedAt time.Time
}
Progress represents the persisted graceful exit progress record.
type TransferQueueItem ¶
type TransferQueueItem struct {
	NodeID storj.NodeID
	StreamID uuid.UUID
	Position metabase.SegmentPosition
	PieceNum int32
	RootPieceID storj.PieceID
	DurabilityRatio float64
	QueuedAt time.Time
	RequestedAt *time.Time
	LastFailedAt *time.Time
	LastFailedCode *int
	FailedCount *int
	FinishedAt *time.Time
	OrderLimitSendCount int
}
TransferQueueItem represents the persisted graceful exit queue record.