piecestore

package
v1.120.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 10, 2025 License: AGPL-3.0 Imports: 48 Imported by: 1

Documentation

Overview

Package piecestore contains the endpoint for responding to requests from the uplinks and satellites. It implements the upload and download protocol, where the counterpart is in uplink. It uses trust packages to establish trusted satellites.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrVerifyNotAuthorized is returned when the one submitting the action is not authorized to perform that action.
	ErrVerifyNotAuthorized = errs.Class("not authorized")
	// ErrVerifyUntrusted is returned when action is not trusted.
	ErrVerifyUntrusted = errs.Class("untrusted")
	// ErrVerifyDuplicateRequest is returned when serial number has been already used to submit an action.
	ErrVerifyDuplicateRequest = errs.Class("duplicate request")
)

Functions

This section is empty.

Types

type Config

type Config struct {
	DatabaseDir             string        `help:"directory to store databases. if empty, uses data path" default:""`
	ExpirationGracePeriod   time.Duration `help:"how soon before expiration date should things be considered expired" default:"48h0m0s"`
	MaxConcurrentRequests   int           `help:"how many concurrent requests are allowed, before uploads are rejected. 0 represents unlimited." default:"0"`
	OrderLimitGracePeriod   time.Duration `help:"how long after OrderLimit creation date are OrderLimits no longer accepted" default:"1h0m0s"`
	CacheSyncInterval       time.Duration `help:"how often the space used cache is synced to persistent storage" releaseDefault:"1h0m0s" devDefault:"0h1m0s"`
	PieceScanOnStartup      bool          `help:"if set to true, all pieces disk usage is recalculated on startup" default:"true"`
	StreamOperationTimeout  time.Duration `help:"how long to spend waiting for a stream operation before canceling" default:"30m"`
	ReportCapacityThreshold memory.Size   `help:"threshold below which to immediately notify satellite of capacity" default:"5GB" hidden:"true"`
	MaxUsedSerialsSize      memory.Size   `help:"amount of memory allowed for used serials store - once surpassed, serials will be dropped at random" default:"1MB"`

	MinUploadSpeed                    memory.Size   `` /* 189-byte string literal not displayed */
	MinUploadSpeedGraceDuration       time.Duration `` /* 169-byte string literal not displayed */
	MinUploadSpeedCongestionThreshold float64       `` /* 193-byte string literal not displayed */

	Trust   trust.Config
	Monitor monitor.Config
	Orders  orders.Config

	// deprecated flags
	DeleteWorkers      int           `help:"how many piece delete workers (unused)" default:"1" hidden:"true" deprecated:"true"`
	DeleteQueueSize    int           `help:"size of the piece delete queue (unused)" default:"10000" hidden:"true" deprecated:"true"`
	ExistsCheckWorkers int           `help:"how many workers to use to check if satellite pieces exists (unused)" default:"5" hidden:"true" deprecated:"true"`
	RetainTimeBuffer   time.Duration `help:"allows for small differences in the satellite and storagenode clocks" default:"48h0m0s" hidden:"true" deprecated:"true"`
}

Config defines parameters for piecestore endpoint.

type Endpoint

type Endpoint struct {
	pb.DRPCContactUnimplementedServer
	// contains filtered or unexported fields
}

Endpoint implements uploading, downloading and deleting for a storage node..

architecture: Endpoint

func NewEndpoint

func NewEndpoint(log *zap.Logger, ident *identity.FullIdentity, trustSource trust.TrustedSatelliteSource, monitor *monitor.Service, retain []QueueRetain, pingStats PingStatsSource, pieceBackend PieceBackend, ordersStore *orders.FileStore, usage bandwidth.DB, usedSerials *usedserials.Table, config Config) (*Endpoint, error)

NewEndpoint creates a new piecestore endpoint.

func (*Endpoint) Delete deprecated

func (endpoint *Endpoint) Delete(ctx context.Context, delete *pb.PieceDeleteRequest) (_ *pb.PieceDeleteResponse, err error)

Delete handles deleting a piece on piece store requested by uplink.

Deprecated: use DeletePieces instead.

func (*Endpoint) DeletePieces added in v0.28.4

func (endpoint *Endpoint) DeletePieces(
	ctx context.Context, req *pb.DeletePiecesRequest,
) (_ *pb.DeletePiecesResponse, err error)

DeletePieces delete a list of pieces on satellite request.

func (*Endpoint) Download

func (endpoint *Endpoint) Download(stream pb.DRPCPiecestore_DownloadStream) (err error)

Download handles Downloading a piece on piecestore.

func (*Endpoint) Exists added in v1.69.2

func (endpoint *Endpoint) Exists(
	ctx context.Context, req *pb.ExistsRequest,
) (_ *pb.ExistsResponse, err error)

Exists check if pieces from the list exists on storage node. Request will accept only connections from trusted satellite and will check pieces only for that satellite.

func (*Endpoint) IsExpired

func (endpoint *Endpoint) IsExpired(expiration time.Time) bool

IsExpired checks whether the date has already expired (with a threshold) at the time of calling this function.

func (*Endpoint) RestoreTrash added in v0.27.0

func (endpoint *Endpoint) RestoreTrash(ctx context.Context, restoreTrashReq *pb.RestoreTrashRequest) (res *pb.RestoreTrashResponse, err error)

RestoreTrash restores all trashed items for the satellite issuing the call.

func (*Endpoint) Retain added in v0.15.0

func (endpoint *Endpoint) Retain(ctx context.Context, retainReq *pb.RetainRequest) (res *pb.RetainResponse, err error)

Retain keeps only piece ids specified in the request.

func (*Endpoint) RetainBig added in v1.98.1

func (endpoint *Endpoint) RetainBig(stream pb.DRPCPiecestore_RetainBigStream) (err error)

RetainBig keeps only piece ids specified in the request, supports big bloom filters.

func (*Endpoint) TestLiveRequestCount added in v0.21.0

func (endpoint *Endpoint) TestLiveRequestCount() int32

TestLiveRequestCount returns the current number of live requests.

func (*Endpoint) Upload

func (endpoint *Endpoint) Upload(stream pb.DRPCPiecestore_UploadStream) (err error)

Upload handles uploading a piece on piece store.

func (*Endpoint) VerifyOrder

func (endpoint *Endpoint) VerifyOrder(ctx context.Context, limit *pb.OrderLimit, order *pb.Order, largestOrderAmount int64) (err error)

VerifyOrder verifies that the order corresponds to the order limit and has all the necessary fields.

func (*Endpoint) VerifyOrderLimitSignature

func (endpoint *Endpoint) VerifyOrderLimitSignature(ctx context.Context, limit *pb.OrderLimit) (err error)

VerifyOrderLimitSignature verifies that the order limit signature is valid.

func (*Endpoint) VerifyPieceHash

func (endpoint *Endpoint) VerifyPieceHash(ctx context.Context, limit *pb.OrderLimit, hash *pb.PieceHash, expectedHash []byte) (err error)

VerifyPieceHash verifies whether the piece hash is properly signed and matches the locally computed hash.

type HashStoreBackend added in v1.119.2

type HashStoreBackend struct {
	// contains filtered or unexported fields
}

HashStoreBackend implements PieceBackend using the hashstore.

func NewHashStoreBackend added in v1.119.2

func NewHashStoreBackend(
	dir string,
	bfm *retain.BloomFilterManager,
	rtm *retain.RestoreTimeManager,
	log *zap.Logger,
) (*HashStoreBackend, error)

NewHashStoreBackend constructs a new HashStoreBackend with the provided values. The log and hash directory are allowed to be the same.

func (*HashStoreBackend) Close added in v1.119.2

func (hsb *HashStoreBackend) Close() error

Close closes the HashStoreBackend.

func (*HashStoreBackend) ForgetSatellite added in v1.119.2

func (hsb *HashStoreBackend) ForgetSatellite(ctx context.Context, satellite storj.NodeID) (err error)

ForgetSatellite closes the database for the satellite and removes the directory.

func (*HashStoreBackend) Reader added in v1.119.2

func (hsb *HashStoreBackend) Reader(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ PieceReader, err error)

Reader implements PieceBackend.

func (*HashStoreBackend) SpaceUsage added in v1.119.2

func (hsb *HashStoreBackend) SpaceUsage() (subs SpaceUsageBySatellite)

SpaceUsage gets a SpaceUsageBySatellite from the HashStoreBackend.

func (*HashStoreBackend) StartRestore added in v1.119.2

func (hsb *HashStoreBackend) StartRestore(ctx context.Context, satellite storj.NodeID) (err error)

StartRestore implements PieceBackend.

func (*HashStoreBackend) Stats added in v1.119.2

func (hsb *HashStoreBackend) Stats(cb func(key monkit.SeriesKey, field string, val float64))

Stats implements monkit.StatSource.

func (*HashStoreBackend) TestingCompact added in v1.119.2

func (hsb *HashStoreBackend) TestingCompact(ctx context.Context) error

TestingCompact calls Compact on all of the hashstore databases.

func (*HashStoreBackend) Writer added in v1.119.2

func (hsb *HashStoreBackend) Writer(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, hashAlgo pb.PieceHashAlgorithm, expires time.Time) (_ PieceWriter, err error)

Writer implements PieceBackend.

type MigratingBackend added in v1.119.2

type MigratingBackend struct {
	// contains filtered or unexported fields
}

MigratingBackend is a PieceBackend that can passively migrate pieces from an OldPieceBackend to a HashStoreBackend.

func NewMigratingBackend added in v1.119.2

func NewMigratingBackend(log *zap.Logger, old *OldPieceBackend, new *HashStoreBackend, store *satstore.SatelliteStore, migrator Migrator) *MigratingBackend

NewMigratingBackend constructs a MigratingBackend with the given parameters.

func (*MigratingBackend) Reader added in v1.119.2

func (m *MigratingBackend) Reader(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (r PieceReader, err error)

Reader implements PieceBackend by reading from the store appropriate for the migration status, potentially triggering a passive migration for the piece.

func (*MigratingBackend) StartRestore added in v1.119.2

func (m *MigratingBackend) StartRestore(ctx context.Context, satellite storj.NodeID) (err error)

StartRestore implements PieceBackend and triggers a restore on both backends.

func (*MigratingBackend) Stats added in v1.119.2

func (m *MigratingBackend) Stats(cb func(key monkit.SeriesKey, field string, val float64))

Stats implements monkit.StatSource.

func (*MigratingBackend) UpdateState added in v1.119.2

func (m *MigratingBackend) UpdateState(ctx context.Context, satellite storj.NodeID, cb func(state *MigrationState))

UpdateState calls the callback with the current MigrationState for the satellite allowing the caller to inspect or modify the state.

func (*MigratingBackend) Writer added in v1.119.2

func (m *MigratingBackend) Writer(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, hash pb.PieceHashAlgorithm, expires time.Time) (_ PieceWriter, err error)

Writer implements PieceBackend by writing to the store appropriate for the migration status.

type MigrationState added in v1.119.2

type MigrationState struct {
	PassiveMigrate bool // passive migrate pieces on read
	WriteToNew     bool // should writes go to the new store
	ReadNewFirst   bool // should reads go to the new or old store first
	TTLToNew       bool // any TTL write should go to the new store
}

MigrationState keeps track of the migration state for a satellite.

N.B. we rely on the zero value meaning "no migration".

type Migrator added in v1.119.2

type Migrator interface {
	TryMigrateOne(sat storj.NodeID, piece storj.PieceID)
}

Migrator is an interface for migrating pieces.

type OldConfig added in v0.11.0

type OldConfig struct {
	Path               string      `help:"path to store data in" default:"$CONFDIR/storage"`
	AllocatedDiskSpace memory.Size `user:"true" help:"total allocated disk space in bytes" default:"1TB"`

	// deprecated flags
	WhitelistedSatellites  storj.NodeURLs `` /* 134-byte string literal not displayed */
	AllocatedBandwidth     memory.Size    `user:"true" help:"total allocated bandwidth in bytes (deprecated)" default:"0B" hidden:"true" deprecated:"true"`
	KBucketRefreshInterval time.Duration  `` /* 135-byte string literal not displayed */
}

OldConfig contains everything necessary for a server.

type OldPieceBackend added in v1.118.4

type OldPieceBackend struct {
	// contains filtered or unexported fields
}

OldPieceBackend takes a bunch of pieces the endpoint used and packages them into a PieceBackend.

func NewOldPieceBackend added in v1.118.4

func NewOldPieceBackend(store *pieces.Store, trashChore RestoreTrash, monitor *monitor.Service) *OldPieceBackend

NewOldPieceBackend constructs an OldPieceBackend.

func (*OldPieceBackend) Reader added in v1.118.4

func (opb *OldPieceBackend) Reader(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ PieceReader, err error)

Reader implements PieceBackend and returns a PieceReader for a piece.

func (*OldPieceBackend) StartRestore added in v1.118.4

func (opb *OldPieceBackend) StartRestore(ctx context.Context, satellite storj.NodeID) (err error)

StartRestore implements PieceBackend and starts a restore operation for a satellite.

func (*OldPieceBackend) Writer added in v1.118.4

func (opb *OldPieceBackend) Writer(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, hashAlgorithm pb.PieceHashAlgorithm, expiration time.Time) (_ PieceWriter, err error)

Writer implements PieceBackend and returns a PieceWriter for a piece.

type PieceBackend added in v1.118.4

PieceBackend is the minimal interface needed for the endpoints to do its job.

type PieceReader added in v1.118.4

type PieceReader interface {
	io.ReadSeekCloser
	Trash() bool
	Size() int64
	GetPieceHeader() (*pb.PieceHeader, error)
}

PieceReader is an interface for reading a piece.

type PieceWriter added in v1.118.4

type PieceWriter interface {
	io.Writer
	Size() int64
	Hash() []byte
	Cancel(context.Context) error
	Commit(context.Context, *pb.PieceHeader) error
}

PieceWriter is an interface for writing a piece.

type PingStatsSource added in v1.115.1

type PingStatsSource interface {
	WasPinged(when time.Time)
}

PingStatsSource stores the last time when the target was pinged.

type QueueRetain added in v1.117.3

type QueueRetain interface {
	Queue(ctx context.Context, satelliteID storj.NodeID, req *pb.RetainRequest) error
	Status() retain.Status
}

QueueRetain is an interface for retaining pieces in the queue and checking status. A restricted view of retain.Service.

type RestoreTrash added in v1.117.3

type RestoreTrash interface {
	StartRestore(ctx context.Context, satellite storj.NodeID) error
}

RestoreTrash is an interface for restoring trash.

type SpaceUsage added in v1.119.2

type SpaceUsage struct {
	UsedTotal       int64 // total space used including metadata and unreferenced data
	UsedForPieces   int64 // total space used by live pieces
	UsedForTrash    int64 // total space used by trash pieces
	UsedForMetadata int64 // total space used by metadata (hash tables and stuff)
}

SpaceUsage describes the amount of space used by a PieceBackend.

type SpaceUsageBySatellite added in v1.119.2

type SpaceUsageBySatellite struct {
	Aggregate   SpaceUsage
	BySatellite map[storj.NodeID]SpaceUsage
}

SpaceUsageBySatellite describes an aggregate space usage and a space usage for each satellite known about.

type TestingBackend added in v1.119.2

type TestingBackend struct {
	// contains filtered or unexported fields
}

TestingBackend wraps a PieceBackend with Testing methods.

func NewTestingBackend added in v1.119.2

func NewTestingBackend(pb PieceBackend) *TestingBackend

NewTestingBackend constructs a TestingBackend wrapping a PieceBackend.

func (*TestingBackend) Reader added in v1.119.2

func (tb *TestingBackend) Reader(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (PieceReader, error)

Reader implements PieceBackend.

func (*TestingBackend) StartRestore added in v1.119.2

func (tb *TestingBackend) StartRestore(ctx context.Context, satellite storj.NodeID) error

StartRestore implements PieceBackend.

func (*TestingBackend) TestingCorruptPiece added in v1.119.2

func (tb *TestingBackend) TestingCorruptPiece(satellite storj.NodeID, pieceID storj.PieceID)

TestingCorruptPiece marks the piece as corrupted (returns invalid data) if it exists.

func (*TestingBackend) TestingDeleteAllPiecesForSatellite added in v1.119.2

func (tb *TestingBackend) TestingDeleteAllPiecesForSatellite(satellite storj.NodeID)

TestingDeleteAllPiecesForSatellite marks every piece for the satellite as deleted.

func (*TestingBackend) TestingDeletePiece added in v1.119.2

func (tb *TestingBackend) TestingDeletePiece(satellite storj.NodeID, pieceID storj.PieceID)

TestingDeletePiece marks the piece as deleted if it exists.

func (*TestingBackend) TestingEnableMethods added in v1.119.2

func (tb *TestingBackend) TestingEnableMethods()

TestingEnableMethods enables the Testing methods and must be called from a stack that contains the testplanet package.

func (*TestingBackend) TestingMutatePiece added in v1.119.2

func (tb *TestingBackend) TestingMutatePiece(satellite storj.NodeID, pieceID storj.PieceID, mutator func(contents []byte, header *pb.PieceHeader))

TestingMutatePiece mutates the piece using the provided callback if it exists.

func (*TestingBackend) TestingSetError added in v1.119.2

func (tb *TestingBackend) TestingSetError(err error)

TestingSetError sets an error to be returned by every PieceBackend method.

func (*TestingBackend) TestingSetLatency added in v1.119.2

func (tb *TestingBackend) TestingSetLatency(delay time.Duration)

TestingSetLatency sets a latency that is added to every PieceBackend method.

func (*TestingBackend) Writer added in v1.119.2

func (tb *TestingBackend) Writer(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, hashAlgorithm pb.PieceHashAlgorithm, expiration time.Time) (PieceWriter, error)

Writer implements PieceBackend.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL