gracefulexit

package
v1.3.1-rc Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 22, 2020 License: AGPL-3.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// Error is the default error class for the graceful exit package.
	Error = errs.Class("gracefulexit")

	// ErrNodeNotFound is returned if a graceful exit entry for a node does not exist in the database.
	ErrNodeNotFound = errs.Class("graceful exit node not found")

	// ErrAboveOptimalThreshold is returned if a graceful exit entry for a node has more pieces than required.
	ErrAboveOptimalThreshold = errs.Class("pointer has more pieces than required")
)
View Source
var (
	// ErrInvalidArgument is an error class for invalid argument errors used to check which rpc code to use.
	// NOTE(review): its class message "graceful exit" does not mention invalid arguments and is
	// nearly identical to Error's "gracefulexit" — confirm this is intended.
	ErrInvalidArgument = errs.Class("graceful exit")
	// ErrIneligibleNodeAge is an error class for when a node has not been on the network long enough to graceful exit.
	ErrIneligibleNodeAge = errs.Class("node is not yet eligible for graceful exit")
)

Functions

This section is empty.

Types

type Chore added in v0.23.0

type Chore struct {
	Loop *sync2.Cycle
	// contains filtered or unexported fields
}

Chore populates the graceful exit transfer queue.

architecture: Chore

func NewChore added in v0.23.0

func NewChore(log *zap.Logger, db DB, overlay overlay.DB, metaLoop *metainfo.Loop, config Config) *Chore

NewChore instantiates Chore.

func (*Chore) Close added in v0.23.0

func (chore *Chore) Close() error

Close closes chore.

func (*Chore) Run added in v0.23.0

func (chore *Chore) Run(ctx context.Context) (err error)

Run starts the chore.

type Config added in v0.23.0

type Config struct {
	// Enabled toggles graceful exit on the satellite side.
	Enabled bool `help:"whether or not graceful exit is enabled on the satellite side." default:"true"`

	// Transfer queue chore settings: the insert batch buffer size and how often the chore runs.
	ChoreBatchSize int           `help:"size of the buffer used to batch inserts into the transfer queue." default:"500"`
	ChoreInterval  time.Duration `help:"how often to run the transfer queue chore." releaseDefault:"30s" devDefault:"10s"`

	// EndpointBatchSize is the buffer size for batched transfer queue reads and sends to the storage node.
	EndpointBatchSize int `help:"size of the buffer used to batch transfer queue reads and sends to the storage node." default:"300"`

	// Failure, inactivity, timeout and eligibility limits; each field's help tag states its exact meaning.
	MaxFailuresPerPiece          int           `help:"maximum number of transfer failures per piece." default:"5"`
	OverallMaxFailuresPercentage int           `help:"maximum percentage of transfer failures per node." default:"10"`
	MaxInactiveTimeFrame         time.Duration `help:"maximum inactive time frame of transfer activities per node." default:"168h"`
	RecvTimeout                  time.Duration `help:"the minimum duration for receiving a stream from a storage node before timing out" default:"10m"`
	MaxOrderLimitSendCount       int           `help:"maximum number of order limits a satellite sends to a node before marking piece transfer failed" default:"10"`
	NodeMinAgeInMonths           int           `help:"minimum age for a node on the network in order to initiate graceful exit" default:"6"`
}

Config holds the graceful exit settings used by the chore and the endpoint.

type DB

type DB interface {
	// IncrementProgress increments transfer stats for a node.
	IncrementProgress(ctx context.Context, nodeID storj.NodeID, bytes int64, successfulTransfers int64, failedTransfers int64) error
	// GetProgress gets a graceful exit progress entry.
	GetProgress(ctx context.Context, nodeID storj.NodeID) (*Progress, error)

	// Enqueue batch inserts graceful exit transfer queue entries if they do not exist.
	Enqueue(ctx context.Context, items []TransferQueueItem) error
	// UpdateTransferQueueItem updates a graceful exit transfer queue entry.
	UpdateTransferQueueItem(ctx context.Context, item TransferQueueItem) error
	// DeleteTransferQueueItem deletes a graceful exit transfer queue entry.
	DeleteTransferQueueItem(ctx context.Context, nodeID storj.NodeID, path []byte, pieceNum int32) error
	// DeleteTransferQueueItems deletes graceful exit transfer queue entries by nodeID.
	DeleteTransferQueueItems(ctx context.Context, nodeID storj.NodeID) error
	// DeleteFinishedTransferQueueItems deletes finished graceful exit transfer queue entries.
	DeleteFinishedTransferQueueItems(ctx context.Context, nodeID storj.NodeID) error
	// GetTransferQueueItem gets a graceful exit transfer queue entry.
	GetTransferQueueItem(ctx context.Context, nodeID storj.NodeID, path []byte, pieceNum int32) (*TransferQueueItem, error)
	// GetIncomplete gets incomplete graceful exit transfer queue entries ordered by durability ratio and queued date ascending.
	GetIncomplete(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) ([]*TransferQueueItem, error)
	// GetIncompleteNotFailed gets incomplete graceful exit transfer queue entries in the database ordered by durability ratio and queued date ascending.
	GetIncompleteNotFailed(ctx context.Context, nodeID storj.NodeID, limit int, offset int64) ([]*TransferQueueItem, error)
	// GetIncompleteFailed gets incomplete graceful exit transfer queue entries that have failed <= maxFailures times, ordered by durability ratio and queued date ascending.
	GetIncompleteFailed(ctx context.Context, nodeID storj.NodeID, maxFailures int, limit int, offset int64) ([]*TransferQueueItem, error)
	// IncrementOrderLimitSendCount increments the number of times a node has been sent an order limit for transferring.
	IncrementOrderLimitSendCount(ctx context.Context, nodeID storj.NodeID, path []byte, pieceNum int32) error
}

DB defines the CRUD operations for the graceful exit service.

architecture: Database

type Endpoint added in v0.24.0

type Endpoint struct {
	// contains filtered or unexported fields
}

Endpoint for handling the transfer of pieces for Graceful Exit.

func NewEndpoint added in v0.24.0

func NewEndpoint(log *zap.Logger, signer signing.Signer, db DB, overlaydb overlay.DB, overlay *overlay.Service, metainfo *metainfo.Service, orders *orders.Service,
	peerIdentities overlay.PeerIdentities, config Config) *Endpoint

NewEndpoint creates a new graceful exit endpoint.

func (*Endpoint) DRPC added in v0.24.0

func (endpoint *Endpoint) DRPC() pb.DRPCSatelliteGracefulExitServer

DRPC returns a DRPC form of the endpoint.

func (*Endpoint) Process added in v0.24.0

func (endpoint *Endpoint) Process(stream pbgrpc.SatelliteGracefulExit_ProcessServer) (err error)

Process is called by storage nodes to receive pieces to transfer to new nodes and get exit status.

type PathCollector added in v0.23.0

type PathCollector struct {
	// contains filtered or unexported fields
}

PathCollector uses the metainfo loop to add paths to the graceful exit transfer queue.

architecture: Observer

func NewPathCollector added in v0.23.0

func NewPathCollector(db DB, nodeIDs storj.NodeIDList, log *zap.Logger, batchSize int) *PathCollector

NewPathCollector instantiates a path collector.

func (*PathCollector) Flush added in v0.23.0

func (collector *PathCollector) Flush(ctx context.Context) (err error)

Flush persists the current buffer items to the database.

func (*PathCollector) InlineSegment added in v0.23.0

func (collector *PathCollector) InlineSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error)

InlineSegment returns nil; only remote segments produce graceful exit transfer queue items.

func (*PathCollector) Object added in v0.23.0

func (collector *PathCollector) Object(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error)

Object returns nil because the graceful exit path collector does not interact with objects.

func (*PathCollector) RemoteSegment added in v0.23.0

func (collector *PathCollector) RemoteSegment(ctx context.Context, path metainfo.ScopedPath, pointer *pb.Pointer) (err error)

RemoteSegment takes a remote segment found in metainfo and creates a graceful exit transfer queue item if it doesn't exist already

type PendingFinishedPromise added in v0.27.0

type PendingFinishedPromise struct {
	// contains filtered or unexported fields
}

PendingFinishedPromise for waiting for information about finished state.

func (*PendingFinishedPromise) Wait added in v0.27.0

func (promise *PendingFinishedPromise) Wait(ctx context.Context) (bool, error)

Wait should be called (once) after acquiring the finished promise and will return whether the pending map is finished.

type PendingMap added in v0.27.0

type PendingMap struct {
	// contains filtered or unexported fields
}

PendingMap for managing concurrent access to the pending transfer map.

func NewPendingMap added in v0.27.0

func NewPendingMap() *PendingMap

NewPendingMap creates a new PendingMap.

func (*PendingMap) Delete added in v0.27.0

func (pm *PendingMap) Delete(pieceID storj.PieceID) error

Delete removes the pending transfer item from the map and returns an error if the data does not exist.

func (*PendingMap) DoneSending added in v0.27.0

func (pm *PendingMap) DoneSending(err error) error

DoneSending is called (with an optional error) when no more work will be added to the map. If DoneSending has already been called, an error is returned. If a PendingFinishedPromise is waiting on a response, it is updated to return true.

func (*PendingMap) Get added in v0.27.0

func (pm *PendingMap) Get(pieceID storj.PieceID) (*PendingTransfer, bool)

Get returns the pending transfer item from the map, if it exists.

func (*PendingMap) IsFinishedPromise added in v0.27.0

func (pm *PendingMap) IsFinishedPromise() *PendingFinishedPromise

IsFinishedPromise returns a promise for the caller to wait on to determine the finished status of the pending map. If we have enough information to determine the finished status, we update the promise to have an answer immediately. Otherwise, we attach the promise to the pending map to be updated and cleared by either Put or DoneSending (whichever happens first).

func (*PendingMap) Length added in v0.27.0

func (pm *PendingMap) Length() int

Length returns the number of elements in the map.

func (*PendingMap) Put added in v0.27.0

func (pm *PendingMap) Put(pieceID storj.PieceID, pendingTransfer *PendingTransfer) error

Put adds work to the map. If there is already work associated with this piece ID it returns an error. If there is a PendingFinishedPromise waiting, that promise is updated to return false.

type PendingTransfer added in v0.27.0

type PendingTransfer struct {
	// NOTE(review): Path and PieceNum have the same types as the path and pieceNum
	// arguments of the DB transfer queue methods; presumably they identify the queued
	// item this transfer belongs to — confirm against the endpoint.
	Path             []byte
	PieceSize        int64
	SatelliteMessage *pb.SatelliteMessage
	OriginalPointer  *pb.Pointer
	PieceNum         int32
}

PendingTransfer is the representation of work on the pending map. It contains information about a transfer request that has been sent to a storagenode by the satellite.

type Progress

type Progress struct {
	// NOTE(review): BytesTransferred, PiecesTransferred and PiecesFailed presumably
	// accumulate the bytes, successfulTransfers and failedTransfers arguments of
	// DB.IncrementProgress for NodeID — confirm against the DB implementation.
	NodeID            storj.NodeID
	BytesTransferred  int64
	PiecesTransferred int64
	PiecesFailed      int64
	UpdatedAt         time.Time
}

Progress represents the persisted graceful exit progress record.

type TransferQueueItem

type TransferQueueItem struct {
	// NodeID, Path and PieceNum have the same types as the arguments that
	// DB.GetTransferQueueItem and DB.DeleteTransferQueueItem take to address a single entry.
	// DurabilityRatio and QueuedAt are, per the DB.GetIncomplete* comments, what
	// incomplete entries are ordered by.
	// NOTE(review): the pointer-typed fields are presumably nil until the corresponding
	// event has happened — confirm against the DB implementation.
	NodeID              storj.NodeID
	Path                []byte
	PieceNum            int32
	RootPieceID         storj.PieceID
	DurabilityRatio     float64
	QueuedAt            time.Time
	RequestedAt         *time.Time
	LastFailedAt        *time.Time
	LastFailedCode      *int
	FailedCount         *int
	FinishedAt          *time.Time
	OrderLimitSendCount int
}

TransferQueueItem represents the persisted graceful exit queue record.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL