sectorstorage

package
v0.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 12, 2020 License: Apache-2.0, MIT Imports: 27 Imported by: 0

README

sector-storage

CircleCI standard-readme compliant

a concrete implementation of the specs-storage interface

The sector-storage project provides a implementation-nonspecific reference implementation of the specs-storage interface.

Disclaimer

Please report your issues with regards to sector-storage at the lotus issue tracker

Architecture

high-level architecture

Manager

Manages is the top-level piece of the storage system gluing all the other pieces together. It also implements scheduling logic.

package stores

This package implements the sector storage subsystem. Fundamentally the storage is divided into paths, each path has it's UUID, and stores a set of sector 'files'. There are currently 3 types of sector files - unsealed, sealed, and cache.

Paths can be shared between nodes by sharing the underlying filesystem.

stores.Local

The Local store implements SectorProvider for paths mounted in the local filesystem. Paths can be shared between nodes, and support shared filesystems such as NFS.

stores.Local implements all native filesystem-related operations

stores.Remote

The Remote store extends Local store, handles fetching sector files into a local store if needed, and handles removing sectors from non-local stores.

stores.Index

The Index is a singleton holding metadata about storage paths, and a mapping of sector files to paths

LocalWorker

LocalWorker implements the Worker interface with ffiwrapper.Sealer and a store.Store instance

License

The Filecoin Project is dual-licensed under Apache 2.0 and MIT terms:

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DefaultSchedPriority = 0
View Source
var ErrNoWorkers = errors.New("no suitable workers found")
View Source
var InitWait = 3 * time.Second
View Source
var ParallelDenom uint64 = 100
View Source
var ParallelNum uint64 = 92

Percent of threads to allocate to parallel tasks

12 * 0.92 = 11 16 * 0.92 = 14 24 * 0.92 = 22 32 * 0.92 = 29 64 * 0.92 = 58 128 * 0.92 = 117

View Source
var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{
	sealtasks.TTAddPiece: {
		abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
			MaxMemory: 8 << 30,
			MinMemory: 8 << 30,

			MaxParallelism: 1,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{
			MaxMemory: 4 << 30,
			MinMemory: 4 << 30,

			MaxParallelism: 1,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{
			MaxMemory: 1 << 30,
			MinMemory: 1 << 30,

			MaxParallelism: 1,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{
			MaxMemory: 2 << 10,
			MinMemory: 2 << 10,

			MaxParallelism: 1,

			BaseMinMemory: 2 << 10,
		},
		abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{
			MaxMemory: 8 << 20,
			MinMemory: 8 << 20,

			MaxParallelism: 1,

			BaseMinMemory: 8 << 20,
		},
	},
	sealtasks.TTPreCommit1: {
		abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
			MaxMemory: 128 << 30,
			MinMemory: 112 << 30,

			MaxParallelism: 1,

			BaseMinMemory: 10 << 20,
		},
		abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{
			MaxMemory: 64 << 30,
			MinMemory: 56 << 30,

			MaxParallelism: 1,

			BaseMinMemory: 10 << 20,
		},
		abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{
			MaxMemory: 1 << 30,
			MinMemory: 768 << 20,

			MaxParallelism: 1,

			BaseMinMemory: 1 << 20,
		},
		abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{
			MaxMemory: 2 << 10,
			MinMemory: 2 << 10,

			MaxParallelism: 1,

			BaseMinMemory: 2 << 10,
		},
		abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{
			MaxMemory: 8 << 20,
			MinMemory: 8 << 20,

			MaxParallelism: 1,

			BaseMinMemory: 8 << 20,
		},
	},
	sealtasks.TTPreCommit2: {
		abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
			MaxMemory: 30 << 30,
			MinMemory: 30 << 30,

			MaxParallelism: -1,
			CanGPU:         true,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{
			MaxMemory: 15 << 30,
			MinMemory: 15 << 30,

			MaxParallelism: -1,
			CanGPU:         true,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{
			MaxMemory: 3 << 29,
			MinMemory: 1 << 30,

			MaxParallelism: -1,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{
			MaxMemory: 2 << 10,
			MinMemory: 2 << 10,

			MaxParallelism: -1,

			BaseMinMemory: 2 << 10,
		},
		abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{
			MaxMemory: 8 << 20,
			MinMemory: 8 << 20,

			MaxParallelism: -1,

			BaseMinMemory: 8 << 20,
		},
	},
	sealtasks.TTCommit1: {
		abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
			MaxMemory: 1 << 30,
			MinMemory: 1 << 30,

			MaxParallelism: 0,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{
			MaxMemory: 1 << 30,
			MinMemory: 1 << 30,

			MaxParallelism: 0,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{
			MaxMemory: 1 << 30,
			MinMemory: 1 << 30,

			MaxParallelism: 0,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{
			MaxMemory: 2 << 10,
			MinMemory: 2 << 10,

			MaxParallelism: 0,

			BaseMinMemory: 2 << 10,
		},
		abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{
			MaxMemory: 8 << 20,
			MinMemory: 8 << 20,

			MaxParallelism: 0,

			BaseMinMemory: 8 << 20,
		},
	},
	sealtasks.TTCommit2: {
		abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
			MaxMemory: 190 << 30,
			MinMemory: 60 << 30,

			MaxParallelism: -1,
			CanGPU:         true,

			BaseMinMemory: 64 << 30,
		},
		abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{
			MaxMemory: 150 << 30,
			MinMemory: 30 << 30,

			MaxParallelism: -1,
			CanGPU:         true,

			BaseMinMemory: 32 << 30,
		},
		abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{
			MaxMemory: 3 << 29,
			MinMemory: 1 << 30,

			MaxParallelism: 1,
			CanGPU:         true,

			BaseMinMemory: 10 << 30,
		},
		abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{
			MaxMemory: 2 << 10,
			MinMemory: 2 << 10,

			MaxParallelism: 1,
			CanGPU:         true,

			BaseMinMemory: 2 << 10,
		},
		abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{
			MaxMemory: 8 << 20,
			MinMemory: 8 << 20,

			MaxParallelism: 1,
			CanGPU:         true,

			BaseMinMemory: 8 << 20,
		},
	},
	sealtasks.TTFetch: {
		abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
			MaxMemory: 1 << 20,
			MinMemory: 1 << 20,

			MaxParallelism: 0,
			CanGPU:         false,

			BaseMinMemory: 0,
		},
		abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{
			MaxMemory: 1 << 20,
			MinMemory: 1 << 20,

			MaxParallelism: 0,
			CanGPU:         false,

			BaseMinMemory: 0,
		},
		abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{
			MaxMemory: 1 << 20,
			MinMemory: 1 << 20,

			MaxParallelism: 0,
			CanGPU:         false,

			BaseMinMemory: 0,
		},
		abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{
			MaxMemory: 1 << 20,
			MinMemory: 1 << 20,

			MaxParallelism: 0,
			CanGPU:         false,

			BaseMinMemory: 0,
		},
		abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{
			MaxMemory: 1 << 20,
			MinMemory: 1 << 20,

			MaxParallelism: 0,
			CanGPU:         false,

			BaseMinMemory: 0,
		},
	},
}
View Source
var SchedPriorityKey schedPrioCtxKey
View Source
var (
	SchedWindows = 2
)
View Source
var SelectorTimeout = 5 * time.Second

Functions

func WithPriority

func WithPriority(ctx context.Context, priority int) context.Context

Types

type FaultTracker

type FaultTracker interface {
	CheckProvable(ctx context.Context, spt abi.RegisteredSealProof, sectors []abi.SectorID) ([]abi.SectorID, error)
}

FaultTracker TODO: Track things more actively

type LocalWorker

type LocalWorker struct {
	// contains filtered or unexported fields
}

func NewLocalWorker

func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex) *LocalWorker

func (*LocalWorker) AddPiece

func (*LocalWorker) Close

func (l *LocalWorker) Close() error

func (*LocalWorker) Closing

func (l *LocalWorker) Closing(ctx context.Context) (<-chan struct{}, error)

func (*LocalWorker) Fetch

func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error

func (*LocalWorker) FinalizeSector

func (l *LocalWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage2.Range) error

func (*LocalWorker) Info

func (*LocalWorker) MoveStorage

func (l *LocalWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types stores.SectorFileType) error

func (*LocalWorker) NewSector

func (l *LocalWorker) NewSector(ctx context.Context, sector abi.SectorID) error

func (*LocalWorker) Paths

func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error)

func (*LocalWorker) ReadPiece

func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)

func (*LocalWorker) ReleaseUnsealed

func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage2.Range) error

func (*LocalWorker) Remove

func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error

func (*LocalWorker) SealCommit1

func (l *LocalWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage2.SectorCids) (output storage2.Commit1Out, err error)

func (*LocalWorker) SealCommit2

func (l *LocalWorker) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.Commit1Out) (proof storage2.Proof, err error)

func (*LocalWorker) SealPreCommit1

func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage2.PreCommit1Out, err error)

func (*LocalWorker) SealPreCommit2

func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage2.PreCommit1Out) (cids storage2.SectorCids, err error)

func (*LocalWorker) TaskTypes

func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)

func (*LocalWorker) UnsealPiece

func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error

type Manager

type Manager struct {
	storage.Prover
	// contains filtered or unexported fields
}

func (*Manager) AddLocalStorage

func (m *Manager) AddLocalStorage(ctx context.Context, path string) error

func (*Manager) AddPiece

func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error)

func (*Manager) AddWorker

func (m *Manager) AddWorker(ctx context.Context, w Worker) error

func (*Manager) CheckProvable

func (m *Manager) CheckProvable(ctx context.Context, spt abi.RegisteredSealProof, sectors []abi.SectorID) ([]abi.SectorID, error)

CheckProvable returns unprovable sectors

func (*Manager) Close

func (m *Manager) Close(ctx context.Context) error

func (*Manager) FinalizeSector

func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error

func (*Manager) FsStat

func (m *Manager) FsStat(ctx context.Context, id stores.ID) (fsutil.FsStat, error)

func (*Manager) NewSector

func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error

func (*Manager) ReadPiece

func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error

func (*Manager) ReleaseUnsealed

func (m *Manager) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error

func (*Manager) Remove

func (m *Manager) Remove(ctx context.Context, sector abi.SectorID) error

func (*Manager) SchedDiag

func (m *Manager) SchedDiag(ctx context.Context) (interface{}, error)

func (*Manager) SealCommit1

func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (out storage.Commit1Out, err error)

func (*Manager) SealCommit2

func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (out storage.Proof, err error)

func (*Manager) SealPreCommit1

func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error)

func (*Manager) SealPreCommit2

func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (out storage.SectorCids, err error)

func (*Manager) SectorSize

func (m *Manager) SectorSize() abi.SectorSize

func (*Manager) ServeHTTP

func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*Manager) StorageLocal

func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error)

func (*Manager) WorkerJobs

func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob

func (*Manager) WorkerStats

func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats

type Resources

type Resources struct {
	MinMemory uint64 // What Must be in RAM for decent perf
	MaxMemory uint64 // Memory required (swap + ram)

	MaxParallelism int // -1 = multithread
	CanGPU         bool

	BaseMinMemory uint64 // What Must be in RAM for decent perf (shared between threads)
}

func (Resources) Threads

func (r Resources) Threads(wcpus uint64) uint64

TODO: Take NUMA into account

type SchedDiagInfo

type SchedDiagInfo struct {
	Requests    []SchedDiagRequestInfo
	OpenWindows []WorkerID
}

type SchedDiagRequestInfo

type SchedDiagRequestInfo struct {
	Sector   abi.SectorID
	TaskType sealtasks.TaskType
	Priority int
}

type SealerConfig

type SealerConfig struct {
	ParallelFetchLimit int

	// Local worker config
	AllowAddPiece   bool
	AllowPreCommit1 bool
	AllowPreCommit2 bool
	AllowCommit     bool
	AllowUnseal     bool
}

type StorageAuth

type StorageAuth http.Header

type URLs

type URLs []string

type Worker

type Worker interface {
	ffiwrapper.StorageSealer

	MoveStorage(ctx context.Context, sector abi.SectorID, types stores.SectorFileType) error

	Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error
	UnsealPiece(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
	ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (bool, error)

	TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)

	// Returns paths accessible to the worker
	Paths(context.Context) ([]stores.StoragePath, error)

	Info(context.Context) (storiface.WorkerInfo, error)

	// returns channel signalling worker shutdown
	Closing(context.Context) (<-chan struct{}, error)

	Close() error
}

type WorkerAction

type WorkerAction func(ctx context.Context, w Worker) error

type WorkerConfig

type WorkerConfig struct {
	SealProof abi.RegisteredSealProof
	TaskTypes []sealtasks.TaskType
	NoSwap    bool
}

type WorkerID

type WorkerID uint64

type WorkerSelector

type WorkerSelector interface {
	Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *workerHandle) (bool, error) // true if worker is acceptable for performing a task

	Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) // true if a is preferred over b
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL