Documentation ¶
Index ¶
- Variables
- type Chore
- type Config
- type DB
- type Endpoint
- type PathCollector
- func (collector *PathCollector) Flush(ctx context.Context) (err error)
- func (collector *PathCollector) InlineSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error)
- func (collector *PathCollector) Object(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error)
- func (collector *PathCollector) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error)
- type PendingFinishedPromise
- type PendingMap
- func (pm *PendingMap) Delete(pieceID storj.PieceID) error
- func (pm *PendingMap) DoneSending(err error) error
- func (pm *PendingMap) Get(pieceID storj.PieceID) (*PendingTransfer, bool)
- func (pm *PendingMap) IsFinishedPromise() *PendingFinishedPromise
- func (pm *PendingMap) Length() int
- func (pm *PendingMap) Put(pieceID storj.PieceID, pendingTransfer *PendingTransfer) error
- type PendingTransfer
- type Progress
- type TransferQueueItem
Constants ¶
This section is empty.
Variables ¶
var ( // Error is the default error class for graceful exit package. Error = errs.Class("gracefulexit") // ErrNodeNotFound is returned if a graceful exit entry for a node does not exist in database ErrNodeNotFound = errs.Class("graceful exit node not found") // ErrAboveOptimalThreshold is returned if a graceful exit entry for a node has more pieces than required. ErrAboveOptimalThreshold = errs.Class("pointer has more pieces than required") )
var ( // ErrInvalidArgument is an error class for invalid argument errors used to check which rpc code to use. ErrInvalidArgument = errs.Class("graceful exit") )
Functions ¶
This section is empty.
Types ¶
type Chore ¶ added in v0.23.0
Chore populates the graceful exit transfer queue.
architecture: Chore
type Config ¶ added in v0.23.0
type Config struct { Enabled bool `help:"whether or not graceful exit is enabled on the satellite side." releaseDefault:"false" devDefault:"true"` ChoreBatchSize int `help:"size of the buffer used to batch inserts into the transfer queue." default:"500"` ChoreInterval time.Duration `help:"how often to run the transfer queue chore." releaseDefault:"30s" devDefault:"10s"` EndpointBatchSize int `help:"size of the buffer used to batch transfer queue reads and sends to the storage node." default:"100"` MaxFailuresPerPiece int `help:"maximum number of transfer failures per piece." default:"3"` OverallMaxFailuresPercentage int `help:"maximum percentage of transfer failures per node." default:"10"` MaxInactiveTimeFrame time.Duration `help:"maximum inactive time frame of transfer activities per node." default:"500h"` RecvTimeout time.Duration `help:"the minimum duration for receiving a stream from a storage node before timing out" default:"10m"` MaxOrderLimitSendCount int `help:"maximum number of order limits a satellite sends to a node before marking piece transfer failed" default:"5"` }
Config for the chore
type DB ¶
type DB interface { // IncrementProgress increments transfer stats for a node. IncrementProgress(ctx context.Context, nodeID storj.NodeID, bytes int64, successfulTransfers int64, failedTransfers int64) error // GetProgress gets a graceful exit progress entry. GetProgress(ctx context.Context, nodeID storj.NodeID) (*Progress, error) // Enqueue batch inserts graceful exit transfer queue entries it does not exist. Enqueue(ctx context.Context, items []TransferQueueItem) error // UpdateTransferQueueItem creates a graceful exit transfer queue entry. UpdateTransferQueueItem(ctx context.Context, item TransferQueueItem) error // DeleteTransferQueueItem deletes a graceful exit transfer queue entry. DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, path []byte, pieceNum int32) error // DeleteTransferQueueItem deletes a graceful exit transfer queue entries by nodeID. DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID) error // DeleteFinishedTransferQueueItem deletes finiahed graceful exit transfer queue entries. DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID) error // GetTransferQueueItem gets a graceful exit transfer queue entry. GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, path []byte, pieceNum int32) (*TransferQueueItem, error) // GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending. GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) ([]*TransferQueueItem, error) // GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries in the database ordered by durability ratio and queued date ascending. GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) ([]*TransferQueueItem, error) // GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries that have failed <= maxFailures times, ordered by durability ratio and queued date ascending. GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64) ([]*TransferQueueItem, error) // IncrementOrderLimitSendCount increments the number of times a node has been sent an order limit for transferring. IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, path []byte, pieceNum int32) error }
DB implements CRUD operations for graceful exit service
architecture: Database
type Endpoint ¶ added in v0.24.0
type Endpoint struct {
// contains filtered or unexported fields
}
Endpoint for handling the transfer of pieces for Graceful Exit.
func NewEndpoint ¶ added in v0.24.0
func NewEndpoint(log *zap.Logger, signer signing.Signer, db DB, overlaydb overlay.DB, overlay *overlay.Service, metainfo *metainfo.Service, orders *orders.Service, peerIdentities overlay.PeerIdentities, config Config) *Endpoint
NewEndpoint creates a new graceful exit endpoint.
func (*Endpoint) DRPC ¶ added in v0.24.0
func (endpoint *Endpoint) DRPC() pb.DRPCSatelliteGracefulExitServer
DRPC returns a DRPC form of the endpoint.
type PathCollector ¶ added in v0.23.0
type PathCollector struct {
// contains filtered or unexported fields
}
PathCollector uses the metainfo loop to add paths to node reservoirs
architecture: Observer
func NewPathCollector ¶ added in v0.23.0
func NewPathCollector(db DB, nodeIDs storj.NodeIDList, log *zap.Logger, batchSize int) *PathCollector
NewPathCollector instantiates a path collector.
func (*PathCollector) Flush ¶ added in v0.23.0
func (collector *PathCollector) Flush(ctx context.Context) (err error)
Flush persists the current buffer items to the database.
func (*PathCollector) InlineSegment ¶ added in v0.23.0
func (collector *PathCollector) InlineSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error)
InlineSegment returns nil because we're only auditing for storage nodes for now
func (*PathCollector) Object ¶ added in v0.23.0
func (collector *PathCollector) Object(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error)
Object returns nil because the audit service does not interact with objects
func (*PathCollector) RemoteSegment ¶ added in v0.23.0
func (collector *PathCollector) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error)
RemoteSegment takes a remote segment found in metainfo and creates a graceful exit transfer queue item if it doesn't exist already
type PendingFinishedPromise ¶ added in v0.27.0
type PendingFinishedPromise struct {
// contains filtered or unexported fields
}
PendingFinishedPromise for waiting for information about finished state.
type PendingMap ¶ added in v0.27.0
type PendingMap struct {
// contains filtered or unexported fields
}
PendingMap for managing concurrent access to the pending transfer map.
func NewPendingMap ¶ added in v0.27.0
func NewPendingMap() *PendingMap
NewPendingMap creates a new PendingMap.
func (*PendingMap) Delete ¶ added in v0.27.0
func (pm *PendingMap) Delete(pieceID storj.PieceID) error
Delete removes the pending transfer item from the map and returns an error if the data does not exist.
func (*PendingMap) DoneSending ¶ added in v0.27.0
func (pm *PendingMap) DoneSending(err error) error
DoneSending is called (with an optional error) when no more work will be added to the map. If DoneSending has already been called, an error is returned. If a PendingFinishedPromise is waiting on a response, it is updated to return true.
func (*PendingMap) Get ¶ added in v0.27.0
func (pm *PendingMap) Get(pieceID storj.PieceID) (*PendingTransfer, bool)
Get returns the pending transfer item from the map, if it exists.
func (*PendingMap) IsFinishedPromise ¶ added in v0.27.0
func (pm *PendingMap) IsFinishedPromise() *PendingFinishedPromise
IsFinishedPromise returns a promise for the caller to wait on to determine the finished status of the pending map. If we have enough information to determine the finished status, we update the promise to have an answer immediately. Otherwise, we attach the promise to the pending map to be updated and cleared by either Put or DoneSending (whichever happens first).
func (*PendingMap) Length ¶ added in v0.27.0
func (pm *PendingMap) Length() int
Length returns the number of elements in the map.
func (*PendingMap) Put ¶ added in v0.27.0
func (pm *PendingMap) Put(pieceID storj.PieceID, pendingTransfer *PendingTransfer) error
Put adds work to the map. If there is already work associated with this piece ID it returns an error. If there is a PendingFinishedPromise waiting, that promise is updated to return false.
type PendingTransfer ¶ added in v0.27.0
type PendingTransfer struct { Path []byte PieceSize int64 SatelliteMessage *pb.SatelliteMessage OriginalPointer *pb.Pointer PieceNum int32 }
PendingTransfer is the representation of work on the pending map. It contains information about a transfer request that has been sent to a storagenode by the satellite.
type Progress ¶
type Progress struct { NodeID storj.NodeID BytesTransferred int64 PiecesTransferred int64 PiecesFailed int64 UpdatedAt time.Time }
Progress represents the persisted graceful exit progress record.
type TransferQueueItem ¶
type TransferQueueItem struct { NodeID storj.NodeID Path []byte PieceNum int32 RootPieceID storj.PieceID DurabilityRatio float64 QueuedAt time.Time RequestedAt *time.Time LastFailedAt *time.Time LastFailedCode *int FailedCount *int FinishedAt *time.Time OrderLimitSendCount int }
TransferQueueItem represents the persisted graceful exit queue record.