README
¶
sector-storage
a concrete implementation of the specs-storage interface
The sector-storage project provides a implementation-nonspecific reference implementation of the specs-storage interface.
Disclaimer
Please report your issues with regards to sector-storage at the lotus issue tracker
Architecture
Manager
Manages is the top-level piece of the storage system gluing all the other pieces together. It also implements scheduling logic.
package stores
This package implements the sector storage subsystem. Fundamentally the storage
is divided into path
s, each path has it's UUID, and stores a set of sector
'files'. There are currently 3 types of sector files - unsealed
, sealed
,
and cache
.
Paths can be shared between nodes by sharing the underlying filesystem.
stores.Local
The Local store implements SectorProvider for paths mounted in the local filesystem. Paths can be shared between nodes, and support shared filesystems such as NFS.
stores.Local implements all native filesystem-related operations
stores.Remote
The Remote store extends Local store, handles fetching sector files into a local store if needed, and handles removing sectors from non-local stores.
stores.Index
The Index is a singleton holding metadata about storage paths, and a mapping of sector files to paths
LocalWorker
LocalWorker implements the Worker interface with ffiwrapper.Sealer and a store.Store instance
License
The Filecoin Project is dual-licensed under Apache 2.0 and MIT terms:
- Apache License, Version 2.0, (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
Documentation
¶
Index ¶
- Variables
- func WithPriority(ctx context.Context, priority int) context.Context
- type Call
- type CallState
- type ExecutorFunc
- type FaultTracker
- type LocalWorker
- func (l *LocalWorker) AddPiece(ctx context.Context, sector storage.SectorRef, epcs []abi.UnpaddedPieceSize, ...) (storiface.CallID, error)
- func (l *LocalWorker) Close() error
- func (l *LocalWorker) Fetch(ctx context.Context, sector storage.SectorRef, ...) (storiface.CallID, error)
- func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error)
- func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error)
- func (l *LocalWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error)
- func (l *LocalWorker) NewSector(ctx context.Context, sector storage.SectorRef) error
- func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error)
- func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector storage.SectorRef, ...) (storiface.CallID, error)
- func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error)
- func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error
- func (l *LocalWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, ...) (storiface.CallID, error)
- func (l *LocalWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.Commit1Out) (storiface.CallID, error)
- func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, ...) (storiface.CallID, error)
- func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.PreCommit1Out) (storiface.CallID, error)
- func (l *LocalWorker) Session(ctx context.Context) (uuid.UUID, error)
- func (l *LocalWorker) TaskDisable(ctx context.Context, tt sealtasks.TaskType) error
- func (l *LocalWorker) TaskEnable(ctx context.Context, tt sealtasks.TaskType) error
- func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
- func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storage.SectorRef, ...) (storiface.CallID, error)
- func (l *LocalWorker) WaitQuiet()
- type Manager
- func (m *Manager) Abort(ctx context.Context, call storiface.CallID) error
- func (m *Manager) AddLocalStorage(ctx context.Context, path string) error
- func (m *Manager) AddPiece(ctx context.Context, sector storage.SectorRef, ...) (abi.PieceInfo, error)
- func (m *Manager) AddWorker(ctx context.Context, w Worker) error
- func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef, ...) (map[abi.SectorID]string, error)
- func (m *Manager) Close(ctx context.Context) error
- func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error
- func (m *Manager) FsStat(ctx context.Context, id stores.ID) (fsutil.FsStat, error)
- func (m *Manager) NewSector(ctx context.Context, sector storage.SectorRef) error
- func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage.SectorRef, ...) error
- func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) error
- func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error
- func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, ...) error
- func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error
- func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error
- func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error
- func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, ...) error
- func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error
- func (m *Manager) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, ...) error
- func (m *Manager) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, ...) error
- func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, ...) error
- func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, ...) error
- func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error
- func (m *Manager) SchedDiag(ctx context.Context, doSched bool) (interface{}, error)
- func (m *Manager) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, ...) (out storage.Commit1Out, err error)
- func (m *Manager) SealCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.Commit1Out) (out storage.Proof, err error)
- func (m *Manager) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, ...) (out storage.PreCommit1Out, err error)
- func (m *Manager) SealPreCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.PreCommit1Out) (out storage.SectorCids, err error)
- func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error)
- func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob
- func (m *Manager) WorkerStats() map[uuid.UUID]storiface.WorkerStats
- type ManagerStateStore
- type ManyBytes
- type Resources
- type ReturnType
- type SchedDiagInfo
- type SchedDiagRequestInfo
- type SealerConfig
- type SectorManager
- type StorageAuth
- type URLs
- type WorkID
- type WorkState
- type WorkStatus
- type Worker
- type WorkerAction
- type WorkerConfig
- type WorkerID
- type WorkerSelector
- type WorkerStateStore
Constants ¶
This section is empty.
Variables ¶
var ClosedWorkerID = uuid.UUID{}
var DefaultSchedPriority = 0
var ErrNoWorkers = errors.New("no suitable workers found")
var InitWait = 3 * time.Second
var ParallelDenom uint64 = 100
var ParallelNum uint64 = 92
Percent of threads to allocate to parallel tasks
12 * 0.92 = 11 16 * 0.92 = 14 24 * 0.92 = 22 32 * 0.92 = 29 64 * 0.92 = 58 128 * 0.92 = 117
var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{ sealtasks.TTAddPiece: { abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{ MaxMemory: 8 << 30, MinMemory: 8 << 30, MaxParallelism: 1, BaseMinMemory: 1 << 30, }, abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{ MaxMemory: 4 << 30, MinMemory: 4 << 30, MaxParallelism: 1, BaseMinMemory: 1 << 30, }, abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{ MaxMemory: 1 << 30, MinMemory: 1 << 30, MaxParallelism: 1, BaseMinMemory: 1 << 30, }, abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{ MaxMemory: 2 << 10, MinMemory: 2 << 10, MaxParallelism: 1, BaseMinMemory: 2 << 10, }, abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{ MaxMemory: 8 << 20, MinMemory: 8 << 20, MaxParallelism: 1, BaseMinMemory: 8 << 20, }, }, sealtasks.TTPreCommit1: { abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{ MaxMemory: 128 << 30, MinMemory: 112 << 30, MaxParallelism: 1, BaseMinMemory: 10 << 20, }, abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{ MaxMemory: 64 << 30, MinMemory: 56 << 30, MaxParallelism: 1, BaseMinMemory: 10 << 20, }, abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{ MaxMemory: 1 << 30, MinMemory: 768 << 20, MaxParallelism: 1, BaseMinMemory: 1 << 20, }, abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{ MaxMemory: 2 << 10, MinMemory: 2 << 10, MaxParallelism: 1, BaseMinMemory: 2 << 10, }, abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{ MaxMemory: 8 << 20, MinMemory: 8 << 20, MaxParallelism: 1, BaseMinMemory: 8 << 20, }, }, sealtasks.TTPreCommit2: { abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{ MaxMemory: 30 << 30, MinMemory: 30 << 30, MaxParallelism: -1, CanGPU: true, BaseMinMemory: 1 << 30, }, abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{ MaxMemory: 15 << 30, MinMemory: 15 << 30, MaxParallelism: -1, CanGPU: true, BaseMinMemory: 1 << 30, }, abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{ MaxMemory: 3 << 29, MinMemory: 1 << 30, MaxParallelism: -1, BaseMinMemory: 1 << 30, }, abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{ MaxMemory: 2 << 10, MinMemory: 2 << 10, MaxParallelism: -1, BaseMinMemory: 2 << 10, }, abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{ MaxMemory: 8 << 20, MinMemory: 8 << 20, MaxParallelism: -1, BaseMinMemory: 8 << 20, }, }, sealtasks.TTCommit1: { abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{ MaxMemory: 1 << 30, MinMemory: 1 << 30, MaxParallelism: 0, BaseMinMemory: 1 << 30, }, abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{ MaxMemory: 1 << 30, MinMemory: 1 << 30, MaxParallelism: 0, BaseMinMemory: 1 << 30, }, abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{ MaxMemory: 1 << 30, MinMemory: 1 << 30, MaxParallelism: 0, BaseMinMemory: 1 << 30, }, abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{ MaxMemory: 2 << 10, MinMemory: 2 << 10, MaxParallelism: 0, BaseMinMemory: 2 << 10, }, abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{ MaxMemory: 8 << 20, MinMemory: 8 << 20, MaxParallelism: 0, BaseMinMemory: 8 << 20, }, }, sealtasks.TTCommit2: { abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{ MaxMemory: 190 << 30, MinMemory: 60 << 30, MaxParallelism: -1, CanGPU: true, BaseMinMemory: 64 << 30, }, abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{ MaxMemory: 150 << 30, MinMemory: 30 << 30, MaxParallelism: -1, CanGPU: true, BaseMinMemory: 32 << 30, }, abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{ MaxMemory: 3 << 29, MinMemory: 1 << 30, MaxParallelism: 1, CanGPU: true, BaseMinMemory: 10 << 30, }, abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{ MaxMemory: 2 << 10, MinMemory: 2 << 10, MaxParallelism: 1, CanGPU: true, BaseMinMemory: 2 << 10, }, abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{ MaxMemory: 8 << 20, MinMemory: 8 << 20, MaxParallelism: 1, CanGPU: true, BaseMinMemory: 8 << 20, }, }, sealtasks.TTFetch: { abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{ MaxMemory: 1 << 20, MinMemory: 1 << 20, MaxParallelism: 0, CanGPU: false, BaseMinMemory: 0, }, abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{ MaxMemory: 1 << 20, MinMemory: 1 << 20, MaxParallelism: 0, CanGPU: false, BaseMinMemory: 0, }, abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{ MaxMemory: 1 << 20, MinMemory: 1 << 20, MaxParallelism: 0, CanGPU: false, BaseMinMemory: 0, }, abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{ MaxMemory: 1 << 20, MinMemory: 1 << 20, MaxParallelism: 0, CanGPU: false, BaseMinMemory: 0, }, abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{ MaxMemory: 1 << 20, MinMemory: 1 << 20, MaxParallelism: 0, CanGPU: false, BaseMinMemory: 0, }, }, }
var SchedPriorityKey schedPrioCtxKey
var (
SchedWindows = 2
)
var SelectorTimeout = 5 * time.Second
Functions ¶
Types ¶
type Call ¶ added in v1.1.3
type Call struct { ID storiface.CallID RetType ReturnType State CallState Result *ManyBytes // json bytes }
type ExecutorFunc ¶ added in v1.1.3
type ExecutorFunc func() (ffiwrapper.Storage, error)
used do provide custom proofs impl (mostly used in testing)
type FaultTracker ¶
type FaultTracker interface {
CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error)
}
FaultTracker TODO: Track things more actively
type LocalWorker ¶
type LocalWorker struct {
// contains filtered or unexported fields
}
func NewLocalWorker ¶
func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker
func (*LocalWorker) AddPiece ¶
func (l *LocalWorker) AddPiece(ctx context.Context, sector storage.SectorRef, epcs []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (storiface.CallID, error)
func (*LocalWorker) Close ¶
func (l *LocalWorker) Close() error
func (*LocalWorker) Fetch ¶
func (l *LocalWorker) Fetch(ctx context.Context, sector storage.SectorRef, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error)
func (*LocalWorker) FinalizeSector ¶
func (*LocalWorker) Info ¶
func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error)
func (*LocalWorker) MoveStorage ¶
func (l *LocalWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error)
func (*LocalWorker) Paths ¶
func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error)
func (*LocalWorker) ReadPiece ¶
func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error)
func (*LocalWorker) ReleaseUnsealed ¶
func (*LocalWorker) SealCommit1 ¶
func (l *LocalWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error)
func (*LocalWorker) SealCommit2 ¶
func (l *LocalWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.Commit1Out) (storiface.CallID, error)
func (*LocalWorker) SealPreCommit1 ¶
func (*LocalWorker) SealPreCommit2 ¶
func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.PreCommit1Out) (storiface.CallID, error)
func (*LocalWorker) TaskDisable ¶ added in v1.2.2
func (*LocalWorker) TaskEnable ¶ added in v1.2.2
func (*LocalWorker) UnsealPiece ¶
func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error)
func (*LocalWorker) WaitQuiet ¶ added in v1.1.3
func (l *LocalWorker) WaitQuiet()
WaitQuiet blocks as long as there are tasks running
type Manager ¶
func New ¶
func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, sc SealerConfig, urls URLs, sa StorageAuth, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error)
func (*Manager) AddLocalStorage ¶
func (*Manager) CheckProvable ¶
func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error)
CheckProvable returns unprovable sectors
func (*Manager) FinalizeSector ¶
func (*Manager) ReadPiece ¶
func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error
func (*Manager) ReleaseUnsealed ¶
func (*Manager) ReturnAddPiece ¶ added in v1.1.3
func (*Manager) ReturnFetch ¶ added in v1.1.3
func (*Manager) ReturnFinalizeSector ¶ added in v1.1.3
func (*Manager) ReturnMoveStorage ¶ added in v1.1.3
func (*Manager) ReturnReadPiece ¶ added in v1.1.3
func (*Manager) ReturnReleaseUnsealed ¶ added in v1.1.3
func (*Manager) ReturnSealCommit1 ¶ added in v1.1.3
func (*Manager) ReturnSealCommit2 ¶ added in v1.1.3
func (*Manager) ReturnSealPreCommit1 ¶ added in v1.1.3
func (*Manager) ReturnSealPreCommit2 ¶ added in v1.1.3
func (*Manager) ReturnUnsealPiece ¶ added in v1.1.3
func (*Manager) SealCommit1 ¶
func (m *Manager) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (out storage.Commit1Out, err error)
func (*Manager) SealCommit2 ¶
func (*Manager) SealPreCommit1 ¶
func (*Manager) SealPreCommit2 ¶
func (m *Manager) SealPreCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.PreCommit1Out) (out storage.SectorCids, err error)
func (*Manager) StorageLocal ¶
func (*Manager) WorkerStats ¶
func (m *Manager) WorkerStats() map[uuid.UUID]storiface.WorkerStats
type ManagerStateStore ¶ added in v1.1.3
type ManagerStateStore *statestore.StateStore
type ManyBytes ¶ added in v1.1.3
type ManyBytes struct {
// contains filtered or unexported fields
}
Ideally this would be a tag on the struct field telling cbor-gen to enforce higher max-len
type Resources ¶
type ReturnType ¶ added in v1.1.3
type ReturnType string
const ( AddPiece ReturnType = "AddPiece" SealPreCommit1 ReturnType = "SealPreCommit1" SealPreCommit2 ReturnType = "SealPreCommit2" SealCommit1 ReturnType = "SealCommit1" SealCommit2 ReturnType = "SealCommit2" FinalizeSector ReturnType = "FinalizeSector" ReleaseUnsealed ReturnType = "ReleaseUnsealed" MoveStorage ReturnType = "MoveStorage" UnsealPiece ReturnType = "UnsealPiece" ReadPiece ReturnType = "ReadPiece" Fetch ReturnType = "Fetch" )
type SchedDiagInfo ¶
type SchedDiagInfo struct { Requests []SchedDiagRequestInfo OpenWindows []string }
type SchedDiagRequestInfo ¶
type SealerConfig ¶
type SectorManager ¶
type SectorManager interface { ReadPiece(context.Context, io.Writer, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error ffiwrapper.StorageSealer storage.Prover storiface.WorkerReturn FaultTracker }
type StorageAuth ¶
type WorkState ¶ added in v1.1.3
type WorkStatus ¶ added in v1.1.3
type WorkStatus string
type Worker ¶
type Worker interface { storiface.WorkerCalls TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) // Returns paths accessible to the worker Paths(context.Context) ([]stores.StoragePath, error) Info(context.Context) (storiface.WorkerInfo, error) Session(context.Context) (uuid.UUID, error) Close() error // TODO: do we need this? }
type WorkerConfig ¶
type WorkerSelector ¶
type WorkerSelector interface { Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *workerHandle) (bool, error) // true if worker is acceptable for performing a task Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) // true if a is preferred over b }
type WorkerStateStore ¶ added in v1.1.3
type WorkerStateStore *statestore.StateStore