Documentation ¶
Overview ¶
Package pieces is responsible for the low level piece management. It also provides ways to keep track of pieces metadata.
Index ¶
- Constants
- Variables
- func MonitorBlobWriter(name string, writer blobstore.BlobWriter) blobstore.BlobWriter
- func MonitorHash(name string, hash hash.Hash) hash.Hash
- type BlobsUsageCache
- func (blobs *BlobsUsageCache) Close() error
- func (blobs *BlobsUsageCache) Delete(ctx context.Context, blobRef blobstore.BlobRef) error
- func (blobs *BlobsUsageCache) DeleteNamespace(ctx context.Context, namespace []byte) error
- func (blobs *BlobsUsageCache) DeleteWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) error
- func (blobs *BlobsUsageCache) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (int64, [][]byte, error)
- func (blobs *BlobsUsageCache) Recalculate(...)
- func (blobs *BlobsUsageCache) RestoreTrash(ctx context.Context, namespace []byte) ([][]byte, error)
- func (blobs *BlobsUsageCache) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (piecesTotal int64, piecesContentSize int64, err error)
- func (blobs *BlobsUsageCache) SpaceUsedForPieces(ctx context.Context) (int64, int64, error)
- func (blobs *BlobsUsageCache) SpaceUsedForTrash(ctx context.Context) (int64, error)
- func (blobs *BlobsUsageCache) Stats(cb func(key monkit.SeriesKey, field string, val float64))
- func (blobs *BlobsUsageCache) TestCreateV0(ctx context.Context, ref blobstore.BlobRef) (_ blobstore.BlobWriter, err error)
- func (blobs *BlobsUsageCache) Trash(ctx context.Context, blobRef blobstore.BlobRef, timestamp time.Time) error
- func (blobs *BlobsUsageCache) TrashWithStorageFormat(ctx context.Context, blobRef blobstore.BlobRef, ...) error
- func (blobs *BlobsUsageCache) Update(ctx context.Context, satelliteID storj.NodeID, ...)
- type CacheService
- type Config
- type DeleteRequest
- type Deleter
- type ExpiredInfo
- type FileWalker
- func (fw *FileWalker) WalkAndComputeSpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (satPiecesTotal int64, satPiecesContentSize int64, err error)
- func (fw *FileWalker) WalkCleanupTrash(ctx context.Context, satelliteID storj.NodeID, dateBefore time.Time) (bytesDeleted int64, keysDeleted []storj.PieceID, err error)
- func (fw *FileWalker) WalkSatellitePieces(ctx context.Context, satellite storj.NodeID, startPrefix string, ...) (err error)
- func (fw *FileWalker) WalkSatellitePiecesToTrash(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, ...) (piecesCount, piecesSkipped int64, err error)
- type GCFilewalkerProgress
- type GCFilewalkerProgressDB
- type Info
- type MonitoredBlobWriter
- type MonitoredHash
- type PieceExpirationDB
- type PieceSpaceUsedDB
- type PrefixUsedSpace
- type Reader
- func (r *Reader) Close() error
- func (r *Reader) GetPieceHeader() (*pb.PieceHeader, error)
- func (r *Reader) Read(data []byte) (int, error)
- func (r *Reader) ReadAt(data []byte, offset int64) (int, error)
- func (r *Reader) Seek(offset int64, whence int) (int64, error)
- func (r *Reader) Size() int64
- func (r *Reader) StorageFormatVersion() blobstore.FormatVersion
- type SatelliteUsage
- type StorageStatus
- type Store
- func (store *Store) CheckWritability(ctx context.Context) error
- func (store *Store) CheckWritabilityWithTimeout(ctx context.Context, timeout time.Duration) error
- func (store *Store) CreateVerificationFile(ctx context.Context, id storj.NodeID) error
- func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error)
- func (store *Store) DeleteExpiredBatchSkipV0(ctx context.Context, expireAt time.Time, limit int) (err error)
- func (store *Store) DeleteExpiredV0(ctx context.Context, expiresAt time.Time) (err error)
- func (store *Store) DeleteSatelliteBlobs(ctx context.Context, satellite storj.NodeID) (err error)
- func (store *Store) DeleteSkipV0(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error)
- func (store *Store) EmptyTrash(ctx context.Context, satelliteID storj.NodeID, trashedBefore time.Time) (err error)
- func (store *Store) GetExpired(ctx context.Context, expiredAt time.Time) (info []ExpiredInfo, err error)
- func (store *Store) GetExpiredBatchSkipV0(ctx context.Context, expiredAt time.Time, batchSize int) (batch []ExpiredInfo, err error)
- func (store *Store) GetHashAndLimit(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, ...) (pb.PieceHash, pb.OrderLimit, error)
- func (store *Store) GetV0PieceInfo(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (*Info, error)
- func (store *Store) MigrateV0ToV1(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error)
- func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ *Reader, err error)
- func (store *Store) RestoreTrash(ctx context.Context, satelliteID storj.NodeID) (err error)
- func (store *Store) SetExpiration(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, ...) (err error)
- func (store *Store) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (piecesTotal, piecesContentSize int64, err error)
- func (store *Store) SpaceUsedForPieces(ctx context.Context) (piecesTotal int64, piecesContentSize int64, err error)
- func (store *Store) SpaceUsedForPiecesAndTrash(ctx context.Context) (int64, error)
- func (store *Store) SpaceUsedForTrash(ctx context.Context) (int64, error)
- func (store *Store) SpaceUsedTotalAndBySatellite(ctx context.Context) (piecesTotal, piecesContentSize int64, ...)
- func (store *Store) Stat(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (blobstore.BlobInfo, error)
- func (store *Store) StorageStatus(ctx context.Context) (_ StorageStatus, err error)
- func (store *Store) Trash(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, ...) (err error)
- func (store *Store) TryRestoreTrashPiece(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error)
- func (store *Store) VerifyStorageDir(ctx context.Context, id storj.NodeID) error
- func (store *Store) VerifyStorageDirWithTimeout(ctx context.Context, id storj.NodeID, timeout time.Duration) error
- func (store *Store) WalkAndComputeSpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID, lowerIOPriority bool) (piecesTotal, piecesContentSize int64, err error)
- func (store *Store) WalkSatellitePieces(ctx context.Context, satellite storj.NodeID, ...) (err error)
- func (store *Store) WalkSatellitePiecesFromPrefix(ctx context.Context, satellite storj.NodeID, startPrefix string, ...) (err error)
- func (store *Store) WalkSatellitePiecesToTrash(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, ...) (piecesCount, piecesSkipped int64, err error)
- func (store *Store) Writer(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, ...) (_ *Writer, err error)
- type StoreForTest
- func (store StoreForTest) GetV0PieceInfoDBForTest() V0PieceInfoDBForTest
- func (store *StoreForTest) ReaderWithStorageFormat(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, ...) (_ *Reader, err error)
- func (store StoreForTest) WriterForFormatVersion(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, ...) (_ *Writer, err error)
- type StoredPieceAccess
- type TrashChore
- type UsedSpacePerPrefixDB
- type V0PieceInfoDB
- type V0PieceInfoDBForTest
- type Writer
Constants ¶
const ( // V1PieceHeaderReservedArea is the amount of space to be reserved at the beginning of // pieces stored with filestore.FormatV1 or greater. Serialized piece headers should be // written into that space, and the remaining space afterward should be zeroes. // V1PieceHeaderReservedArea includes the size of the framing field // (v1PieceHeaderFrameSize). It has a constant size because: // // * We do not anticipate needing more than this. // * We will be able to sum up all space used by a satellite (or all satellites) without // opening and reading from each piece file (stat() is faster than open()). // * This simplifies piece file writing (if we needed to know the exact header size // before writing, then we'd need to spool the entire contents of the piece somewhere // before we could calculate the hash and size). This way, we can simply reserve the // header space, write the piece content as it comes in, and then seek back to the // beginning and fill in the header. // // We put it at the beginning of piece files because: // // * If we put it at the end instead, we would have to seek to the end of a file (to find // out the real size while avoiding race conditions with stat()) and then seek backward // again to get the header, and then seek back to the beginning to get the content. // Seeking on spinning platter hard drives is very slow compared to reading sequential // bytes. // * Putting the header in the middle of piece files might be entertaining, but it would // also be silly. // * If piece files are incorrectly truncated or not completely written, it will be // much easier to identify those cases when the header is intact and findable. // // If more space than this is needed, we will need to use a new storage format version. V1PieceHeaderReservedArea = 512 )
Variables ¶
var BadFormatVersion = errs.Class("Incompatible storage format version")
BadFormatVersion is returned when a storage format cannot support the requested function.
var DefaultConfig = Config{ WritePreallocSize: 4 * memory.MiB, }
DefaultConfig is the default value for the Config.
var ( // Error is the default error class. Error = errs.Class("pieces error") )
Functions ¶
func MonitorBlobWriter ¶ added in v1.90.1
func MonitorBlobWriter(name string, writer blobstore.BlobWriter) blobstore.BlobWriter
MonitorBlobWriter wraps the original BlobWriter and measures writing time.
Types ¶
type BlobsUsageCache ¶ added in v0.18.0
BlobsUsageCache is a blob storage with a cache for storing totals of current space used.
The following names have the following meaning: - piecesTotal: the total space used by pieces, including headers - piecesContentSize: the space used by piece content, not including headers - trashTotal: the total space used in the trash, including headers
pieceTotal and pieceContentSize are the corresponding values for a single file.
architecture: Database
func NewBlobsUsageCache ¶ added in v0.18.0
func NewBlobsUsageCache(log *zap.Logger, blob blobstore.Blobs) *BlobsUsageCache
NewBlobsUsageCache creates a new disk blob store with a space used cache.
func NewBlobsUsageCacheTest ¶ added in v0.18.0
func NewBlobsUsageCacheTest(ctx *testcontext.Context, log *zap.Logger, blob blobstore.Blobs, piecesTotal, piecesContentSize, trashTotal int64, spaceUsedBySatellite map[storj.NodeID]SatelliteUsage) (*BlobsUsageCache, error)
NewBlobsUsageCacheTest creates a new disk blob store with a space used cache.
func (*BlobsUsageCache) Close ¶ added in v0.18.0
func (blobs *BlobsUsageCache) Close() error
Close satisfies the pieces interface.
func (*BlobsUsageCache) Delete ¶ added in v0.18.0
Delete gets the size of the piece that is going to be deleted then deletes it and updates the space used cache accordingly.
func (*BlobsUsageCache) DeleteNamespace ¶ added in v1.96.2
func (blobs *BlobsUsageCache) DeleteNamespace(ctx context.Context, namespace []byte) error
DeleteNamespace deletes all blobs for a satellite and updates the cache.
func (*BlobsUsageCache) DeleteWithStorageFormat ¶ added in v1.107.3
func (blobs *BlobsUsageCache) DeleteWithStorageFormat(ctx context.Context, ref blobstore.BlobRef, formatVer blobstore.FormatVersion) error
DeleteWithStorageFormat gets the size of the piece that is going to be deleted then deletes it and updates the space used cache accordingly.
func (*BlobsUsageCache) EmptyTrash ¶ added in v0.28.3
func (blobs *BlobsUsageCache) EmptyTrash(ctx context.Context, namespace []byte, trashedBefore time.Time) (int64, [][]byte, error)
EmptyTrash empties the trash and updates the cache.
func (*BlobsUsageCache) Recalculate ¶ added in v0.18.0
func (blobs *BlobsUsageCache) Recalculate( piecesTotal, piecesTotalAtStart, piecesContentSize, piecesContentSizeAtStart, trashTotal, trashTotalAtStart int64, totalsBySatellite, totalsBySatelliteAtStart map[storj.NodeID]SatelliteUsage, )
Recalculate estimates new totals for the space used cache. In order to get new totals for the space used cache, we had to iterate over all the pieces on disk. Since that can potentially take a long time, here we need to check if we missed any additions/deletions while we were iterating, estimate how many bytes were missed, and then add those to the space used result of the iteration.
func (*BlobsUsageCache) RestoreTrash ¶ added in v0.28.3
RestoreTrash restores the trash for the namespace and updates the cache.
func (*BlobsUsageCache) SpaceUsedBySatellite ¶ added in v0.18.0
func (blobs *BlobsUsageCache) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (piecesTotal int64, piecesContentSize int64, err error)
SpaceUsedBySatellite returns the current total space used for a specific satellite for all pieces.
func (*BlobsUsageCache) SpaceUsedForPieces ¶ added in v0.18.0
SpaceUsedForPieces returns the current total used space for all pieces.
func (*BlobsUsageCache) SpaceUsedForTrash ¶ added in v0.28.3
func (blobs *BlobsUsageCache) SpaceUsedForTrash(ctx context.Context) (int64, error)
SpaceUsedForTrash returns the current total used space for the trash dir.
func (*BlobsUsageCache) Stats ¶ added in v1.98.1
func (blobs *BlobsUsageCache) Stats(cb func(key monkit.SeriesKey, field string, val float64))
Stats implements monkit.StatSource.
func (*BlobsUsageCache) TestCreateV0 ¶ added in v0.18.0
func (blobs *BlobsUsageCache) TestCreateV0(ctx context.Context, ref blobstore.BlobRef) (_ blobstore.BlobWriter, err error)
TestCreateV0 creates a new V0 blob that can be written. This is only appropriate in test situations.
func (*BlobsUsageCache) Trash ¶ added in v0.28.3
func (blobs *BlobsUsageCache) Trash(ctx context.Context, blobRef blobstore.BlobRef, timestamp time.Time) error
Trash moves the ref to the trash and updates the cache.
func (*BlobsUsageCache) TrashWithStorageFormat ¶ added in v1.110.3
func (blobs *BlobsUsageCache) TrashWithStorageFormat(ctx context.Context, blobRef blobstore.BlobRef, formatVer blobstore.FormatVersion, timestamp time.Time) error
TrashWithStorageFormat marks a blob with a specific storage format for pending deletion.
type CacheService ¶ added in v0.18.0
type CacheService struct { Loop *sync2.Cycle // InitFence is released once the cache's Run method returns or when it has // completed the scan and persisted the data to the database. // This is useful for testing. InitFence sync2.Fence // contains filtered or unexported fields }
CacheService updates the space used cache.
architecture: Chore
func NewService ¶ added in v0.18.0
func NewService(log *zap.Logger, usageCache *BlobsUsageCache, pieces *Store, interval time.Duration, pieceScanOnStartup bool) *CacheService
NewService creates a new cache service that updates the space usage cache on startup and syncs the cache values to persistent storage on an interval.
func (*CacheService) Close ¶ added in v0.18.0
func (service *CacheService) Close() (err error)
Close closes the loop.
func (*CacheService) Init ¶ added in v0.18.0
func (service *CacheService) Init(ctx context.Context) (err error)
Init initializes the space used cache with the most recent values that were stored persistently.
func (*CacheService) PersistCacheTotals ¶ added in v0.18.0
func (service *CacheService) PersistCacheTotals(ctx context.Context) error
PersistCacheTotals saves the current totals of the space used cache to the database so that if the storagenode restarts it can retrieve the latest space used values without needing to recalculate since that could take a long time.
type Config ¶ added in v1.4.1
type Config struct { FileStatCache string `` /* 131-byte string literal not displayed */ WritePreallocSize memory.Size `help:"deprecated" default:"4MiB"` DeleteToTrash bool `` /* 176-byte string literal not displayed */ EnableLazyFilewalker bool `` /* 131-byte string literal not displayed */ }
Config is configuration for Store.
type DeleteRequest ¶ added in v1.4.1
DeleteRequest contains information to delete piece.
type Deleter ¶ added in v1.4.1
type Deleter struct {
// contains filtered or unexported fields
}
Deleter is a worker that processes requests to delete groups of pieceIDs. Deletes are processed "best-effort" asynchronously, and any errors are logged.
func NewDeleter ¶ added in v1.4.1
NewDeleter creates a new Deleter.
func (*Deleter) Enqueue ¶ added in v1.4.1
func (d *Deleter) Enqueue(ctx context.Context, satelliteID storj.NodeID, pieceIDs []storj.PieceID) (unhandled int)
Enqueue adds the pieceIDs to the delete queue. If the queue is full, deletes are not processed and will be left for garbage collection. Enqueue returns the number of pieceIDs that were not placed on the queue (unhandled); it returns 0 if all pieceIDs were successfully placed on the queue.
type ExpiredInfo ¶ added in v0.11.0
type ExpiredInfo struct { SatelliteID storj.NodeID PieceID storj.PieceID // This can be removed when we no longer need to support the pieceinfo db. Its only purpose // is to keep track of whether expired entries came from piece_expirations or pieceinfo. InPieceInfo bool }
ExpiredInfo is a fully namespaced piece id.
type FileWalker ¶ added in v1.77.2
type FileWalker struct {
// contains filtered or unexported fields
}
FileWalker implements methods to walk over pieces in a storage directory.
func NewFileWalker ¶ added in v1.77.2
func NewFileWalker(log *zap.Logger, blobs blobstore.Blobs, v0PieceInfoDB V0PieceInfoDB, gcProgressDB GCFilewalkerProgressDB) *FileWalker
NewFileWalker creates a new FileWalker.
func (*FileWalker) WalkAndComputeSpaceUsedBySatellite ¶ added in v1.78.1
func (fw *FileWalker) WalkAndComputeSpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (satPiecesTotal int64, satPiecesContentSize int64, err error)
WalkAndComputeSpaceUsedBySatellite walks over all pieces for a given satellite, adds up and returns the total space used.
func (*FileWalker) WalkCleanupTrash ¶ added in v1.100.2
func (fw *FileWalker) WalkCleanupTrash(ctx context.Context, satelliteID storj.NodeID, dateBefore time.Time) (bytesDeleted int64, keysDeleted []storj.PieceID, err error)
WalkCleanupTrash looks at all trash per-day directories owned by the given satellite and recursively deletes any of them that correspond to a time before the given dateBefore.
This method returns the total count of bytes occupied by the deleted blobs (bytesDeleted), the piece IDs of the deleted blobs (keysDeleted), and any error encountered.
func (*FileWalker) WalkSatellitePieces ¶ added in v1.77.2
func (fw *FileWalker) WalkSatellitePieces(ctx context.Context, satellite storj.NodeID, startPrefix string, fn func(StoredPieceAccess) error) (err error)
WalkSatellitePieces executes fn for each locally stored piece in the namespace of the given satellite. If fn returns a non-nil error, WalkSatellitePieces will stop iterating and return the error immediately. The ctx parameter is intended specifically to allow canceling iteration early.
Note that this method includes all locally stored pieces, both V0 and higher. The startPrefix parameter can be used to start the iteration at a specific prefix. If startPrefix is empty, the iteration starts at the beginning of the namespace.
func (*FileWalker) WalkSatellitePiecesToTrash ¶ added in v1.79.1
func (fw *FileWalker) WalkSatellitePiecesToTrash(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, filter *bloomfilter.Filter, trashFunc func(pieceID storj.PieceID) error) (piecesCount, piecesSkipped int64, err error)
WalkSatellitePiecesToTrash walks the satellite pieces and moves the pieces that are trash to the trash using the trashFunc provided.
------------------------------------------------------------------------------------------------
On the correctness of using access.ModTime() in place of the more precise access.CreationTime():
------------------------------------------------------------------------------------------------
Background: for pieces not stored with storage.FormatV0, the access.CreationTime() value can only be retrieved by opening the piece file, and reading and unmarshaling the piece header. This is far slower than access.ModTime(), which gets the file modification time from the file system and only needs to do a stat(2) on the piece file. If we can make Retain() work with ModTime, we should.
Possibility of mismatch: We do not force or require piece file modification times to be equal to or close to the CreationTime specified by the uplink, but we do expect that piece files will be written to the filesystem _after_ the CreationTime. We make the assumption already that storage nodes and satellites and uplinks have system clocks that are very roughly in sync (that is, they are out of sync with each other by less than an hour of real time, or whatever is configured as MaxTimeSkew). So if an uplink is not lying about CreationTime and it uploads a piece that makes it to a storagenode's disk as quickly as possible, even in the worst-synchronized-clocks case we can assume that `ModTime > (CreationTime - MaxTimeSkew)`. We also allow for storage node operators doing file system manipulations after a piece has been written. If piece files are copied between volumes and their attributes are not preserved, it will be possible for their modification times to be changed to something later in time. This still preserves the inequality relationship mentioned above, `ModTime > (CreationTime - MaxTimeSkew)`. We only stipulate that storage node operators must not artificially change blob file modification times to be in the past.
If there is a mismatch: in most cases, a mismatch between ModTime and CreationTime has no effect. In certain remaining cases, the only effect is that a piece file which _should_ be garbage collected survives until the next round of garbage collection. The only really problematic case is when there is a relatively new piece file which was created _after_ this node's Retain bloom filter started being built on the satellite, and is recorded in this storage node's blob store before the Retain operation has completed. Then, it might be possible for that new piece to be garbage collected incorrectly, because it does not show up in the bloom filter and the node incorrectly thinks that it was created before the bloom filter. But if the uplink is not lying about CreationTime and its clock drift versus the storage node is less than `MaxTimeSkew`, and the ModTime on a blob file is correctly set from the storage node system time, then it is still true that `ModTime > (CreationTime - MaxTimeSkew)`.
The rule that storage node operators need to be aware of is only this: do not artificially set mtimes on blob files to be in the past. Let the filesystem manage mtimes. If blob files need to be moved or copied between locations, and this updates the mtime, that is ok. A secondary effect of this rule is that if the storage node's system clock needs to be changed forward by a nontrivial amount, mtimes on existing blobs should also be adjusted (by the same interval, ideally, but just running "touch" on all blobs is sufficient to avoid incorrect deletion of data).
type GCFilewalkerProgress ¶ added in v1.100.2
type GCFilewalkerProgress struct { Prefix string SatelliteID storj.NodeID BloomfilterCreatedBefore time.Time }
GCFilewalkerProgress keeps track of the GC filewalker progress.
type GCFilewalkerProgressDB ¶ added in v1.100.2
type GCFilewalkerProgressDB interface { // Store stores the GC filewalker progress. Store(ctx context.Context, progress GCFilewalkerProgress) error // Get returns the GC filewalker progress for the satellite. Get(ctx context.Context, satelliteID storj.NodeID) (GCFilewalkerProgress, error) // Reset resets the GC filewalker progress for the satellite. Reset(ctx context.Context, satelliteID storj.NodeID) error }
GCFilewalkerProgressDB is used to store intermediate progress to resume garbage collection after restarting the node.
type Info ¶
type Info struct { SatelliteID storj.NodeID PieceID storj.PieceID PieceSize int64 PieceCreation time.Time PieceExpiration time.Time OrderLimit *pb.OrderLimit UplinkPieceHash *pb.PieceHash }
Info contains all the information we need to know about a Piece to manage them.
type MonitoredBlobWriter ¶ added in v1.90.1
type MonitoredBlobWriter struct { blobstore.BlobWriter // contains filtered or unexported fields }
MonitoredBlobWriter is a blobstore.BlobWriter wrapper with additional Monkit metrics.
func (*MonitoredBlobWriter) Cancel ¶ added in v1.90.1
func (m *MonitoredBlobWriter) Cancel(ctx context.Context) error
Cancel implements blobstore.BlobWriter.
type MonitoredHash ¶ added in v1.90.1
MonitoredHash is a hash.Hash wrapper with additional Monkit metrics.
func (*MonitoredHash) Reset ¶ added in v1.90.1
func (m *MonitoredHash) Reset()
Reset implements hash.Hash.
func (*MonitoredHash) Sum ¶ added in v1.90.1
func (m *MonitoredHash) Sum(b []byte) []byte
Sum implements hash.Hash.
type PieceExpirationDB ¶ added in v0.18.0
type PieceExpirationDB interface { // SetExpiration sets an expiration time for the given piece ID on the given satellite SetExpiration(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, expiresAt time.Time) error // GetExpired gets piece IDs that expire or have expired before the given time GetExpired(ctx context.Context, expiresBefore time.Time, batchSize int) ([]ExpiredInfo, error) // DeleteExpirations deletes approximately all the expirations that happen before the given time DeleteExpirations(ctx context.Context, expiresAt time.Time) error // DeleteExpirationsBatch deletes the pieces in the batch DeleteExpirationsBatch(ctx context.Context, now time.Time, limit int) error }
PieceExpirationDB stores information about pieces with expiration dates.
architecture: Database
type PieceSpaceUsedDB ¶ added in v0.18.0
type PieceSpaceUsedDB interface { // Init creates the one total and trash record if it doesn't already exist Init(ctx context.Context) error // GetPieceTotals returns the space used (total and contentSize) by all pieces stored GetPieceTotals(ctx context.Context) (piecesTotal int64, piecesContentSize int64, err error) // GetPieceTotalsForAllSatellites returns how much total space used by pieces stored for each satelliteID GetPieceTotalsForAllSatellites(ctx context.Context) (map[storj.NodeID]SatelliteUsage, error) // UpdatePieceTotalsForAllSatellites updates each record for total space used with a new value for each satelliteID UpdatePieceTotalsForAllSatellites(ctx context.Context, newTotalsBySatellites map[storj.NodeID]SatelliteUsage) error // UpdatePieceTotalsForSatellite updates record with new values for a specific satelliteID. // If the usage values are set to zero, the record is deleted. UpdatePieceTotalsForSatellite(ctx context.Context, satelliteID storj.NodeID, usage SatelliteUsage) error // GetTrashTotal returns the total space used by trash GetTrashTotal(ctx context.Context) (int64, error) // UpdateTrashTotal updates the record for total space used for trash with a new value UpdateTrashTotal(ctx context.Context, newTotal int64) error }
PieceSpaceUsedDB stores the most recent totals from the space used cache.
architecture: Database
type PrefixUsedSpace ¶ added in v1.100.2
type PrefixUsedSpace struct { Prefix string SatelliteID storj.NodeID TotalBytes int64 LastUpdated time.Time }
PrefixUsedSpace contains the used space information of a prefix.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader implements a piece reader that reads content from blob store.
func NewReader ¶
func NewReader(blob blobstore.BlobReader) (*Reader, error)
NewReader creates a new reader for blobstore.BlobReader.
func (*Reader) GetPieceHeader ¶ added in v0.18.0
func (r *Reader) GetPieceHeader() (*pb.PieceHeader, error)
GetPieceHeader reads, unmarshals, and returns the piece header. It may only be called once, before any Read() calls.
Retrieving the header at any time could be supported, but for the sake of performance we need to understand why and how often that would happen.
func (*Reader) ReadAt ¶
ReadAt reads data at the specified offset, which is relative to the piece content, not the underlying blob. The piece header is not reachable by this method.
func (*Reader) Seek ¶
Seek seeks to the specified location within the piece content (ignoring the header).
func (*Reader) StorageFormatVersion ¶ added in v0.18.0
func (r *Reader) StorageFormatVersion() blobstore.FormatVersion
StorageFormatVersion returns the storage format version of the piece being read.
type SatelliteUsage ¶ added in v0.31.0
type SatelliteUsage struct { Total int64 // the total space used (including headers) ContentSize int64 // only content size used (excluding things like headers) }
SatelliteUsage contains information of how much space is used by a satellite.
type StorageStatus ¶
type StorageStatus struct { // DiskTotal is the actual disk size (not just the allocated disk space), in bytes. DiskTotal int64 DiskUsed int64 // DiskFree is the actual amount of free space on the whole disk, not just allocated disk space, in bytes. DiskFree int64 }
StorageStatus contains information about the disk the store is using.
type Store ¶
type Store struct { Filewalker *FileWalker // contains filtered or unexported fields }
Store implements storing pieces onto a blob storage implementation.
architecture: Database
func NewStore ¶
func NewStore(log *zap.Logger, fw *FileWalker, lazyFilewalker *lazyfilewalker.Supervisor, blobs blobstore.Blobs, v0PieceInfo V0PieceInfoDB, expirationInfo PieceExpirationDB, spaceUsedDB PieceSpaceUsedDB, config Config) *Store
NewStore creates a new piece store.
func (*Store) CheckWritability ¶ added in v1.12.1
CheckWritability tests writability of the storage directory by creating and deleting a file.
func (*Store) CheckWritabilityWithTimeout ¶ added in v1.75.2
CheckWritabilityWithTimeout tests writability of the storage directory by creating and deleting a file with a timeout.
func (*Store) CreateVerificationFile ¶ added in v1.11.1
CreateVerificationFile creates a file to be used for storage directory verification.
func (*Store) Delete ¶
func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error)
Delete deletes the specified piece.
func (*Store) DeleteExpiredBatchSkipV0 ¶ added in v1.110.1
func (store *Store) DeleteExpiredBatchSkipV0(ctx context.Context, expireAt time.Time, limit int) (err error)
DeleteExpiredBatchSkipV0 deletes the pieces in the batch skipping V0 format and pieceinfo database.
func (*Store) DeleteExpiredV0 ¶ added in v1.110.1
DeleteExpiredV0 deletes all pieces with an expiration earlier than the provided time.
func (*Store) DeleteSatelliteBlobs ¶ added in v1.8.1
DeleteSatelliteBlobs deletes blobs folder of specific satellite after successful GE.
func (*Store) DeleteSkipV0 ¶ added in v1.104.1
func (store *Store) DeleteSkipV0(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error)
DeleteSkipV0 deletes the specified piece skipping V0 format and pieceinfo database.
func (*Store) EmptyTrash ¶ added in v0.27.0
func (store *Store) EmptyTrash(ctx context.Context, satelliteID storj.NodeID, trashedBefore time.Time) (err error)
EmptyTrash deletes pieces in the trash that have been in there longer than trashExpiryInterval.
func (*Store) GetExpired ¶ added in v0.18.0
func (store *Store) GetExpired(ctx context.Context, expiredAt time.Time) (info []ExpiredInfo, err error)
GetExpired gets piece IDs that are expired and were created before the given time.
func (*Store) GetExpiredBatchSkipV0 ¶ added in v1.110.1
func (store *Store) GetExpiredBatchSkipV0(ctx context.Context, expiredAt time.Time, batchSize int) (batch []ExpiredInfo, err error)
GetExpiredBatchSkipV0 gets piece IDs that are expired and were created before the given time, limiting the number of pieces returned to the batch size. This method skips V0 pieces.
func (*Store) GetHashAndLimit ¶ added in v0.26.0
func (store *Store) GetHashAndLimit(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, reader *Reader) (pb.PieceHash, pb.OrderLimit, error)
GetHashAndLimit returns the PieceHash and OrderLimit associated with the specified piece. The piece must already have been opened for reading, and the associated *Reader passed in.
Once we have migrated everything off of V0 storage and no longer need to support it, this can cleanly become a method directly on *Reader and will need only the 'pieceID' parameter.
func (*Store) GetV0PieceInfo ¶ added in v0.26.0
func (store *Store) GetV0PieceInfo(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (*Info, error)
GetV0PieceInfo fetches the Info record from the V0 piece info database. Obviously, of no use when a piece does not have filestore.FormatV0 storage.
func (*Store) MigrateV0ToV1 ¶ added in v0.25.0
func (store *Store) MigrateV0ToV1(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error)
MigrateV0ToV1 will migrate a piece stored with storage format v0 to storage format v1. If the piece is not stored as a v0 piece it will return an error. The following failures are possible:
- sql.ErrNoRows if the v0pieceInfoDB was corrupted or recreated.
- Fail to open or read v0 piece. In this case no artifacts remain.
- Fail to Write or Commit v1 piece. In this case no artifacts remain.
- Fail to Delete v0 piece. In this case v0 piece may remain, but v1 piece will exist and be preferred in future calls.
func (*Store) Reader ¶
func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ *Reader, err error)
Reader returns a new piece reader.
func (*Store) RestoreTrash ¶ added in v0.27.0
RestoreTrash restores all pieces in the trash.
func (*Store) SetExpiration ¶ added in v0.18.0
func (store *Store) SetExpiration(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, expiresAt time.Time) (err error)
SetExpiration records an expiration time for the specified piece ID owned by the specified satellite.
func (*Store) SpaceUsedBySatellite ¶ added in v0.18.0
func (store *Store) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (piecesTotal, piecesContentSize int64, err error)
SpaceUsedBySatellite calculates *an approximation of* how much disk space is used for local piece storage in the given satellite's namespace. This is an approximation because changes may be being applied to the filestore as this information is collected, and because it is possible that various errors in directory traversal could cause this count to be undersized.
This returns both the total size of pieces and the contentSize of pieces.
func (*Store) SpaceUsedForPieces ¶ added in v0.18.0
func (store *Store) SpaceUsedForPieces(ctx context.Context) (piecesTotal int64, piecesContentSize int64, err error)
SpaceUsedForPieces returns *an approximation of* the disk space used by all local pieces (both V0 and later). This is an approximation because changes may be being applied to the filestore as this information is collected, and because it is possible that various errors in directory traversal could cause this count to be undersized.
Returns: - piecesTotal: the total space used by pieces, including headers - piecesContentSize: the space used by piece content, not including headers
This returns both the total size of pieces and the contentSize of pieces.
func (*Store) SpaceUsedForPiecesAndTrash ¶ added in v0.28.3
SpaceUsedForPiecesAndTrash returns the total space used by both active pieces and the trash directory.
func (*Store) SpaceUsedForTrash ¶ added in v0.28.3
SpaceUsedForTrash returns the total space used by the piece store's trash, including all headers.
func (*Store) SpaceUsedTotalAndBySatellite ¶ added in v0.18.0
func (store *Store) SpaceUsedTotalAndBySatellite(ctx context.Context) (piecesTotal, piecesContentSize int64, totalBySatellite map[storj.NodeID]SatelliteUsage, err error)
SpaceUsedTotalAndBySatellite adds up the space used by and for all satellites for blob storage.
func (*Store) Stat ¶ added in v1.69.2
func (store *Store) Stat(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (blobstore.BlobInfo, error)
Stat looks up disk metadata on the blob file.
func (*Store) StorageStatus ¶
func (store *Store) StorageStatus(ctx context.Context) (_ StorageStatus, err error)
StorageStatus returns information about the disk.
func (*Store) Trash ¶ added in v0.27.0
func (store *Store) Trash(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, timestamp time.Time) (err error)
Trash moves the specified piece to the blob trash. If necessary, it converts the v0 piece to a v1 piece. It also marks the item as "trashed" in the pieceExpirationDB.
func (*Store) TryRestoreTrashPiece ¶ added in v1.88.2
func (store *Store) TryRestoreTrashPiece(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error)
TryRestoreTrashPiece attempts to restore a piece from the trash. It returns nil if the piece was restored, or an error if the piece was not in the trash.
func (*Store) VerifyStorageDir ¶ added in v1.11.1
VerifyStorageDir verifies that the storage directory is correct by checking for the existence and validity of the verification file.
func (*Store) VerifyStorageDirWithTimeout ¶ added in v1.75.2
func (store *Store) VerifyStorageDirWithTimeout(ctx context.Context, id storj.NodeID, timeout time.Duration) error
VerifyStorageDirWithTimeout verifies that the storage directory is correct by checking for the existence and validity of the verification file. It uses the provided timeout for the operation.
func (*Store) WalkAndComputeSpaceUsedBySatellite ¶ added in v1.110.1
func (store *Store) WalkAndComputeSpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID, lowerIOPriority bool) (piecesTotal, piecesContentSize int64, err error)
WalkAndComputeSpaceUsedBySatellite walks over all pieces for a given satellite, adds up and returns the total space used.
func (*Store) WalkSatellitePieces ¶ added in v0.18.0
func (store *Store) WalkSatellitePieces(ctx context.Context, satellite storj.NodeID, walkFunc func(StoredPieceAccess) error) (err error)
WalkSatellitePieces wraps FileWalker.WalkSatellitePieces.
func (*Store) WalkSatellitePiecesFromPrefix ¶ added in v1.101.3
func (store *Store) WalkSatellitePiecesFromPrefix(ctx context.Context, satellite storj.NodeID, startPrefix string, walkFunc func(StoredPieceAccess) error) (err error)
WalkSatellitePiecesFromPrefix is like WalkSatellitePieces, but starts at a specific prefix.
func (*Store) WalkSatellitePiecesToTrash ¶ added in v1.100.2
func (store *Store) WalkSatellitePiecesToTrash(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, filter *bloomfilter.Filter, trashFunc func(pieceID storj.PieceID) error) (piecesCount, piecesSkipped int64, err error)
WalkSatellitePiecesToTrash walks the satellite pieces and moves the pieces that are trash to the trash using the trashFunc provided.
If the lazy filewalker is enabled, it will be used to find the pieces to trash, otherwise the regular filewalker will be used. If the lazy filewalker fails, the regular filewalker will be used as a fallback.
type StoreForTest ¶ added in v0.18.0
type StoreForTest struct {
*Store
}
StoreForTest is a wrapper around Store to be used only in test scenarios. It enables writing pieces with older storage formats.
func (StoreForTest) GetV0PieceInfoDBForTest ¶ added in v0.26.0
func (store StoreForTest) GetV0PieceInfoDBForTest() V0PieceInfoDBForTest
GetV0PieceInfoDBForTest returns this piece-store's reference to the V0 piece info DB (or nil, if this piece-store does not have one). This is ONLY intended for use with testing functionality.
func (*StoreForTest) ReaderWithStorageFormat ¶ added in v1.77.2
func (store *StoreForTest) ReaderWithStorageFormat(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, formatVersion blobstore.FormatVersion) (_ *Reader, err error)
ReaderWithStorageFormat returns a new piece reader for a located piece, which avoids the potential need to check multiple storage formats to find the right blob.
func (StoreForTest) WriterForFormatVersion ¶ added in v0.18.0
func (store StoreForTest) WriterForFormatVersion(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, formatVersion blobstore.FormatVersion, hashAlgorithm pb.PieceHashAlgorithm) (_ *Writer, err error)
WriterForFormatVersion allows opening a piece writer with a specified storage format version. This is meant to be used externally only in test situations (thus the StoreForTest receiver type).
type StoredPieceAccess ¶ added in v0.18.0
type StoredPieceAccess interface { blobstore.BlobInfo // PieceID gives the pieceID of the piece PieceID() storj.PieceID // Satellite gives the nodeID of the satellite which owns the piece Satellite() (storj.NodeID, error) // Size gives the size of the piece on disk, and the size of the piece // content (not including the piece header, if applicable) Size(ctx context.Context) (int64, int64, error) // CreationTime returns the piece creation time as given in the original PieceHash (which is // likely not the same as the file mtime). For non-FormatV0 pieces, this requires opening // the file and unmarshaling the piece header. If exact precision is not required, ModTime() // may be a better solution. CreationTime(ctx context.Context) (time.Time, error) // ModTime returns a less-precise piece creation time than CreationTime, but is generally // much faster. For non-FormatV0 pieces, this gets the piece creation time from the // filesystem instead of the piece header. ModTime(ctx context.Context) (time.Time, error) }
StoredPieceAccess allows inspection and manipulation of a piece during iteration with WalkSatellitePieces-type methods.
type TrashChore ¶ added in v0.27.0
TrashChore is the chore that periodically empties the trash.
func NewTrashChore ¶ added in v0.27.0
func NewTrashChore(log *zap.Logger, choreInterval, trashExpiryInterval time.Duration, trust *trust.Pool, store *Store) *TrashChore
NewTrashChore instantiates a new TrashChore. choreInterval is how often this chore runs, and trashExpiryInterval is passed into the EmptyTrash method to determine which trashed pieces should be deleted.
func (*TrashChore) Close ¶ added in v0.27.0
func (chore *TrashChore) Close() error
Close closes the chore.
func (*TrashChore) Run ¶ added in v0.27.0
func (chore *TrashChore) Run(ctx context.Context) (err error)
Run starts the cycle.
func (*TrashChore) StartRestore ¶ added in v1.70.1
StartRestore starts a satellite restore, if it hasn't already started and the chore is not shutting down.
type UsedSpacePerPrefixDB ¶ added in v1.100.2
type UsedSpacePerPrefixDB interface { // Store stores the used space per prefix. Store(ctx context.Context, usedSpace PrefixUsedSpace) error // Get returns the used space per prefix for the satellite. Get(ctx context.Context, satelliteID storj.NodeID) ([]PrefixUsedSpace, error) }
UsedSpacePerPrefixDB is an interface for the used_space_per_prefix database.
type V0PieceInfoDB ¶ added in v0.18.0
type V0PieceInfoDB interface { // Get returns Info about a piece. Get(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (*Info, error) // Delete deletes Info about a piece. Delete(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) error // GetExpired gets piece IDs stored with storage format V0 that expire or have expired // before the given time GetExpired(ctx context.Context, expiredAt time.Time) ([]ExpiredInfo, error) // DeleteExpirations deletes approximately all the expirations that happen before the given time DeleteExpirations(ctx context.Context, expiresAt time.Time) error // WalkSatelliteV0Pieces executes walkFunc for each locally stored piece, stored // with storage format V0 in the namespace of the given satellite. If walkFunc returns a // non-nil error, WalkSatelliteV0Pieces will stop iterating and return the error // immediately. The ctx parameter is intended specifically to allow canceling iteration // early. WalkSatelliteV0Pieces(ctx context.Context, blobStore blobstore.Blobs, satellite storj.NodeID, walkFunc func(StoredPieceAccess) error) error }
V0PieceInfoDB stores meta information about pieces stored with storage format V0 (where metadata goes in the "pieceinfo" table in the storagenodedb). The actual pieces are stored behind something providing the blobstore.Blobs interface.
architecture: Database
type V0PieceInfoDBForTest ¶ added in v0.18.0
type V0PieceInfoDBForTest interface { V0PieceInfoDB // Add inserts Info to the database. This is only a valid thing to do, now, // during tests, to replicate the environment of a storage node not yet fully // migrated to V1 storage. Add(context.Context, *Info) error }
V0PieceInfoDBForTest is like V0PieceInfoDB, but adds on the Add() method so that test environments with V0 piece data can be set up.
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer implements a piece writer that writes content to blob store and calculates a hash.
func NewWriter ¶
func NewWriter(log *zap.Logger, blobWriter blobstore.BlobWriter, blobs blobstore.Blobs, satellite storj.NodeID, hashAlgorithm pb.PieceHashAlgorithm) (*Writer, error)
NewWriter creates a new writer for blobstore.BlobWriter.