Documentation ¶
Index ¶
- Constants
- Variables
- type BlobsUsageCache
- func (blobs *BlobsUsageCache) Close() error
- func (blobs *BlobsUsageCache) Delete(ctx context.Context, blobRef storage.BlobRef) error
- func (blobs *BlobsUsageCache) Recalculate(ctx context.Context, newTotal, totalAtIterationStart int64, ...) error
- func (blobs *BlobsUsageCache) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (int64, error)
- func (blobs *BlobsUsageCache) SpaceUsedForPieces(ctx context.Context) (int64, error)
- func (blobs *BlobsUsageCache) TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error)
- func (blobs *BlobsUsageCache) Update(ctx context.Context, satelliteID storj.NodeID, pieceContentSize int64)
- type CacheService
- type ExpiredInfo
- type Info
- type PieceExpirationDB
- type PieceSpaceUsedDB
- type Reader
- func (r *Reader) Close() error
- func (r *Reader) GetPieceHeader() (*pb.PieceHeader, error)
- func (r *Reader) Read(data []byte) (int, error)
- func (r *Reader) ReadAt(data []byte, offset int64) (int, error)
- func (r *Reader) Seek(offset int64, whence int) (int64, error)
- func (r *Reader) Size() int64
- func (r *Reader) StorageFormatVersion() storage.FormatVersion
- type StorageStatus
- type Store
- func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error)
- func (store *Store) DeleteFailed(ctx context.Context, expired ExpiredInfo, when time.Time) (err error)
- func (store *Store) GetExpired(ctx context.Context, expiredAt time.Time, limit int64) (_ []ExpiredInfo, err error)
- func (store *Store) GetV0PieceInfoDB() V0PieceInfoDB
- func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ *Reader, err error)
- func (store *Store) ReaderWithStorageFormat(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, ...) (_ *Reader, err error)
- func (store *Store) SetExpiration(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, ...) (err error)
- func (store *Store) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (int64, error)
- func (store *Store) SpaceUsedForPieces(ctx context.Context) (int64, error)
- func (store *Store) SpaceUsedTotalAndBySatellite(ctx context.Context) (total int64, totalBySatellite map[storj.NodeID]int64, err error)
- func (store *Store) StorageStatus(ctx context.Context) (_ StorageStatus, err error)
- func (store *Store) WalkSatellitePieces(ctx context.Context, satellite storj.NodeID, ...) (err error)
- func (store *Store) Writer(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ *Writer, err error)
- type StoreForTest
- type StoredPieceAccess
- type V0PieceInfoDB
- type V0PieceInfoDBForTest
- type Writer
Constants ¶
const ( // V1PieceHeaderReservedArea is the amount of space to be reserved at the beginning of // pieces stored with filestore.FormatV1 or greater. Serialized piece headers should be // written into that space, and the remaining space afterward should be zeroes. // V1PieceHeaderReservedArea includes the size of the framing field // (v1PieceHeaderFrameSize). It has a constant size because: // // * We do not anticipate needing more than this. // * We will be able to sum up all space used by a satellite (or all satellites) without // opening and reading from each piece file (stat() is faster than open()). // * This simplifies piece file writing (if we needed to know the exact header size // before writing, then we'd need to spool the entire contents of the piece somewhere // before we could calculate the hash and size). This way, we can simply reserve the // header space, write the piece content as it comes in, and then seek back to the // beginning and fill in the header. // // We put it at the beginning of piece files because: // // * If we put it at the end instead, we would have to seek to the end of a file (to find // out the real size while avoiding race conditions with stat()) and then seek backward // again to get the header, and then seek back to the beginning to get the content. // Seeking on spinning platter hard drives is very slow compared to reading sequential // bytes. // * Putting the header in the middle of piece files might be entertaining, but it would // also be silly. // * If piece files are incorrectly truncated or not completely written, it will be // much easier to identify those cases when the header is intact and findable. // // If more space than this is needed, we will need to use a new storage format version. V1PieceHeaderReservedArea = 512 )
Variables ¶
var BadFormatVersion = errs.Class("Incompatible storage format version")
BadFormatVersion is returned when a storage format cannot support the request function
var ( // Error is the default error class. Error = errs.Class("pieces error") )
Functions ¶
This section is empty.
Types ¶
type BlobsUsageCache ¶ added in v0.18.0
BlobsUsageCache is a blob storage with a cache for storing totals of current space used
architecture: Database
func NewBlobsUsageCache ¶ added in v0.18.0
func NewBlobsUsageCache(blob storage.Blobs) *BlobsUsageCache
NewBlobsUsageCache creates a new disk blob store with a space used cache
func NewBlobsUsageCacheTest ¶ added in v0.18.0
func NewBlobsUsageCacheTest(blob storage.Blobs, total int64, totalSpaceUsedBySatellite map[storj.NodeID]int64) *BlobsUsageCache
NewBlobsUsageCacheTest creates a new disk blob store with a space used cache
func (*BlobsUsageCache) Close ¶ added in v0.18.0
func (blobs *BlobsUsageCache) Close() error
Close satisfies the pieces interface
func (*BlobsUsageCache) Delete ¶ added in v0.18.0
Delete gets the size of the piece that is going to be deleted then deletes it and updates the space used cache accordingly
func (*BlobsUsageCache) Recalculate ¶ added in v0.18.0
func (blobs *BlobsUsageCache) Recalculate(ctx context.Context, newTotal, totalAtIterationStart int64, newTotalBySatellite, totalBySatelliteAtIterationStart map[storj.NodeID]int64) error
Recalculate estimates new totals for the space used cache. In order to get new totals for the space used cache, we had to iterate over all the pieces on disk. Since that can potentially take a long time, here we need to check if we missed any additions/deletions while we were iterating and estimate how many bytes missed then add those to the space used result of iteration.
func (*BlobsUsageCache) SpaceUsedBySatellite ¶ added in v0.18.0
func (blobs *BlobsUsageCache) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (int64, error)
SpaceUsedBySatellite returns the current total space used for a specific satellite for all pieces (not including header bytes)
func (*BlobsUsageCache) SpaceUsedForPieces ¶ added in v0.18.0
func (blobs *BlobsUsageCache) SpaceUsedForPieces(ctx context.Context) (int64, error)
SpaceUsedForPieces returns the current total used space for // all pieces content (not including header bytes)
func (*BlobsUsageCache) TestCreateV0 ¶ added in v0.18.0
func (blobs *BlobsUsageCache) TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error)
TestCreateV0 creates a new V0 blob that can be written. This is only appropriate in test situations.
type CacheService ¶ added in v0.18.0
type CacheService struct {
// contains filtered or unexported fields
}
CacheService updates the space used cache
architecture: Chore
func NewService ¶ added in v0.18.0
func NewService(log *zap.Logger, usageCache *BlobsUsageCache, pieces *Store, interval time.Duration) *CacheService
NewService creates a new cache service that updates the space usage cache on startup and syncs the cache values to persistent storage on an interval
func (*CacheService) Close ¶ added in v0.18.0
func (service *CacheService) Close() (err error)
Close closes the loop
func (*CacheService) Init ¶ added in v0.18.0
func (service *CacheService) Init(ctx context.Context) (err error)
Init initializes the space used cache with the most recent values that were stored persistently
func (*CacheService) PersistCacheTotals ¶ added in v0.18.0
func (service *CacheService) PersistCacheTotals(ctx context.Context) error
PersistCacheTotals saves the current totals of the space used cache to the database so that if the storagenode restarts it can retrieve the latest space used values without needing to recalculate since that could take a long time
type ExpiredInfo ¶ added in v0.11.0
type ExpiredInfo struct { SatelliteID storj.NodeID PieceID storj.PieceID // This can be removed when we no longer need to support the pieceinfo db. Its only purpose // is to keep track of whether expired entries came from piece_expirations or pieceinfo. InPieceInfo bool }
ExpiredInfo is a fully namespaced piece id
type Info ¶
type Info struct { SatelliteID storj.NodeID PieceID storj.PieceID PieceSize int64 PieceCreation time.Time PieceExpiration time.Time OrderLimit *pb.OrderLimit UplinkPieceHash *pb.PieceHash }
Info contains all the information we need to know about a Piece to manage them.
type PieceExpirationDB ¶ added in v0.18.0
type PieceExpirationDB interface { // GetExpired gets piece IDs that expire or have expired before the given time GetExpired(ctx context.Context, expiresBefore time.Time, limit int64) ([]ExpiredInfo, error) // SetExpiration sets an expiration time for the given piece ID on the given satellite SetExpiration(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, expiresAt time.Time) error // DeleteExpiration removes an expiration record for the given piece ID on the given satellite DeleteExpiration(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (found bool, err error) // DeleteFailed marks an expiration record as having experienced a failure in deleting the // piece from the disk DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, failedAt time.Time) error }
PieceExpirationDB stores information about pieces with expiration dates.
architecture: Database
type PieceSpaceUsedDB ¶ added in v0.18.0
type PieceSpaceUsedDB interface { // Init creates the one total record if it doesn't already exist Init(ctx context.Context) error // GetTotal returns the total space used by all pieces stored on disk GetTotal(ctx context.Context) (int64, error) // GetTotalsForAllSatellites returns how much total space used by pieces stored on disk for each satelliteID GetTotalsForAllSatellites(ctx context.Context) (map[storj.NodeID]int64, error) // UpdateTotal updates the record for total spaced used with a new value UpdateTotal(ctx context.Context, newTotal int64) error // UpdateTotalsForAllSatellites updates each record for total spaced used with a new value for each satelliteID UpdateTotalsForAllSatellites(ctx context.Context, newTotalsBySatellites map[storj.NodeID]int64) error }
PieceSpaceUsedDB stores the most recent totals from the space used cache
architecture: Database
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader implements a piece reader that reads content from blob store.
func NewReader ¶
func NewReader(blob storage.BlobReader) (*Reader, error)
NewReader creates a new reader for storage.BlobReader.
func (*Reader) GetPieceHeader ¶ added in v0.18.0
func (r *Reader) GetPieceHeader() (*pb.PieceHeader, error)
GetPieceHeader reads, unmarshals, and returns the piece header. It may only be called once, before any Read() calls. (Retrieving the header at any time could be supported, but for the sake of performance we need to understand why and how often that would happen.)
func (*Reader) ReadAt ¶
ReadAt reads data at the specified offset, which is relative to the piece content, not the underlying blob. The piece header is not reachable by this method.
func (*Reader) Seek ¶
Seek seeks to the specified location within the piece content (ignoring the header).
func (*Reader) StorageFormatVersion ¶ added in v0.18.0
func (r *Reader) StorageFormatVersion() storage.FormatVersion
StorageFormatVersion returns the storage format version of the piece being read.
type StorageStatus ¶
StorageStatus contains information about the disk store is using.
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store implements storing pieces onto a blob storage implementation.
architecture: Database
func NewStore ¶
func NewStore(log *zap.Logger, blobs storage.Blobs, v0PieceInfo V0PieceInfoDB, expirationInfo PieceExpirationDB, pieceSpaceUsedDB PieceSpaceUsedDB) *Store
NewStore creates a new piece store
func (*Store) Delete ¶
func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error)
Delete deletes the specified piece.
func (*Store) DeleteFailed ¶ added in v0.18.0
func (store *Store) DeleteFailed(ctx context.Context, expired ExpiredInfo, when time.Time) (err error)
DeleteFailed marks piece as a failed deletion.
func (*Store) GetExpired ¶ added in v0.18.0
func (store *Store) GetExpired(ctx context.Context, expiredAt time.Time, limit int64) (_ []ExpiredInfo, err error)
GetExpired gets piece IDs that are expired and were created before the given time
func (*Store) GetV0PieceInfoDB ¶ added in v0.18.0
func (store *Store) GetV0PieceInfoDB() V0PieceInfoDB
GetV0PieceInfoDB returns this piece-store's reference to the V0 piece info DB (or nil, if this piece-store does not have one). This is ONLY intended for use with testing functionality.
func (*Store) Reader ¶
func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ *Reader, err error)
Reader returns a new piece reader.
func (*Store) ReaderWithStorageFormat ¶ added in v0.18.0
func (store *Store) ReaderWithStorageFormat(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, formatVersion storage.FormatVersion) (_ *Reader, err error)
ReaderWithStorageFormat returns a new piece reader for a located piece, which avoids the potential need to check multiple storage formats to find the right blob.
func (*Store) SetExpiration ¶ added in v0.18.0
func (store *Store) SetExpiration(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, expiresAt time.Time) (err error)
SetExpiration records an expiration time for the specified piece ID owned by the specified satellite
func (*Store) SpaceUsedBySatellite ¶ added in v0.18.0
func (store *Store) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (int64, error)
SpaceUsedBySatellite calculates *an approximation of* how much disk space is used for local piece storage in the given satellite's namespace. This is an approximation because changes may be being applied to the filestore as this information is collected, and because it is possible that various errors in directory traversal could cause this count to be undersized.
Important note: this metric does not include space used by piece headers, whereas storj/filestore/store.(*Store).SpaceUsedInNamespace() *does* include all space used by the blobs.
func (*Store) SpaceUsedForPieces ¶ added in v0.18.0
SpaceUsedForPieces returns *an approximation of* the disk space used by all local pieces (both V0 and later). This is an approximation because changes may be being applied to the filestore as this information is collected, and because it is possible that various errors in directory traversal could cause this count to be undersized.
Important note: this metric does not include space used by piece headers, whereas storj/filestore/store.(*Store).SpaceUsed() *does* include all space used by the blobs.
The value of reservedSpace for this Store is added to the result, but this should only affect tests (reservedSpace should always be 0 in real usage).
func (*Store) SpaceUsedTotalAndBySatellite ¶ added in v0.18.0
func (store *Store) SpaceUsedTotalAndBySatellite(ctx context.Context) (total int64, totalBySatellite map[storj.NodeID]int64, err error)
SpaceUsedTotalAndBySatellite adds up the space used by and for all satellites for blob storage
func (*Store) StorageStatus ¶
func (store *Store) StorageStatus(ctx context.Context) (_ StorageStatus, err error)
StorageStatus returns information about the disk.
func (*Store) WalkSatellitePieces ¶ added in v0.18.0
func (store *Store) WalkSatellitePieces(ctx context.Context, satellite storj.NodeID, walkFunc func(StoredPieceAccess) error) (err error)
WalkSatellitePieces executes walkFunc for each locally stored piece in the namespace of the given satellite. If walkFunc returns a non-nil error, WalkSatellitePieces will stop iterating and return the error immediately. The ctx parameter is intended specifically to allow canceling iteration early.
Note that this method includes all locally stored pieces, both V0 and higher.
type StoreForTest ¶ added in v0.18.0
type StoreForTest struct {
*Store
}
StoreForTest is a wrapper around Store to be used only in test scenarios. It enables writing pieces with older storage formats
func (StoreForTest) WriterForFormatVersion ¶ added in v0.18.0
func (store StoreForTest) WriterForFormatVersion(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, formatVersion storage.FormatVersion) (_ *Writer, err error)
WriterForFormatVersion allows opening a piece writer with a specified storage format version. This is meant to be used externally only in test situations (thus the StoreForTest receiver type).
type StoredPieceAccess ¶ added in v0.18.0
type StoredPieceAccess interface { storage.BlobInfo // PieceID gives the pieceID of the piece PieceID() storj.PieceID // Satellite gives the nodeID of the satellite which owns the piece Satellite() (storj.NodeID, error) // ContentSize gives the size of the piece content (not including the piece header, if // applicable) ContentSize(ctx context.Context) (int64, error) // CreationTime returns the piece creation time as given in the original PieceHash (which is // likely not the same as the file mtime). For non-FormatV0 pieces, this requires opening // the file and unmarshaling the piece header. If exact precision is not required, ModTime() // may be a better solution. CreationTime(ctx context.Context) (time.Time, error) // ModTime returns a less-precise piece creation time than CreationTime, but is generally // much faster. For non-FormatV0 pieces, this gets the piece creation time from to the // filesystem instead of the piece header. ModTime(ctx context.Context) (time.Time, error) }
StoredPieceAccess allows inspection and manipulation of a piece during iteration with WalkSatellitePieces-type methods.
type V0PieceInfoDB ¶ added in v0.18.0
type V0PieceInfoDB interface { // Get returns Info about a piece. Get(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (*Info, error) // Delete deletes Info about a piece. Delete(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) error // DeleteFailed marks piece deletion from disk failed DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, failedAt time.Time) error // GetExpired gets piece IDs stored with storage format V0 that expire or have expired // before the given time GetExpired(ctx context.Context, expiredAt time.Time, limit int64) ([]ExpiredInfo, error) // WalkSatelliteV0Pieces executes walkFunc for each locally stored piece, stored // with storage format V0 in the namespace of the given satellite. If walkFunc returns a // non-nil error, WalkSatelliteV0Pieces will stop iterating and return the error // immediately. The ctx parameter is intended specifically to allow canceling iteration // early. WalkSatelliteV0Pieces(ctx context.Context, blobStore storage.Blobs, satellite storj.NodeID, walkFunc func(StoredPieceAccess) error) error }
V0PieceInfoDB stores meta information about pieces stored with storage format V0 (where metadata goes in the "pieceinfo" table in the storagenodedb). The actual pieces are stored behind something providing the storage.Blobs interface.
architecture: Database
type V0PieceInfoDBForTest ¶ added in v0.18.0
type V0PieceInfoDBForTest interface { V0PieceInfoDB // Add inserts Info to the database. This is only a valid thing to do, now, // during tests, to replicate the environment of a storage node not yet fully // migrated to V1 storage. Add(context.Context, *Info) error }
V0PieceInfoDBForTest is like V0PieceInfoDB, but adds on the Add() method so that test environments with V0 piece data can be set up.
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer implements a piece writer that writes content to blob store and calculates a hash.
func NewWriter ¶
func NewWriter(blobWriter storage.BlobWriter, blobs storage.Blobs, satellite storj.NodeID) (*Writer, error)
NewWriter creates a new writer for storage.BlobWriter.