sectorstorage

package
v1.7.0-rc1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 13, 2021 License: Apache-2.0, MIT Imports: 39 Imported by: 0

README

sector-storage

CircleCI standard-readme compliant

a concrete implementation of the specs-storage interface

The sector-storage project provides an implementation-nonspecific reference implementation of the specs-storage interface.

Disclaimer

Please report any issues with sector-storage on the lotus issue tracker.

Architecture

high-level architecture

Manager

Manager is the top-level piece of the storage system, gluing all the other pieces together. It also implements scheduling logic.

package stores

This package implements the sector storage subsystem. Fundamentally, the storage is divided into paths; each path has its own UUID and stores a set of sector 'files'. There are currently 3 types of sector files: unsealed, sealed, and cache.

Paths can be shared between nodes by sharing the underlying filesystem.

stores.Local

The Local store implements SectorProvider for paths mounted in the local filesystem. Paths can be shared between nodes, and support shared filesystems such as NFS.

stores.Local implements all native filesystem-related operations

stores.Remote

The Remote store extends Local store, handles fetching sector files into a local store if needed, and handles removing sectors from non-local stores.

stores.Index

The Index is a singleton holding metadata about storage paths, and a mapping of sector files to paths

LocalWorker

LocalWorker implements the Worker interface with ffiwrapper.Sealer and a stores.Store instance.

License

The Filecoin Project is dual-licensed under Apache 2.0 and MIT terms:

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ClosedWorkerID is the zero-value UUID.
// NOTE(review): presumably the session ID reported for a worker that has
// been closed — confirm against the Session implementations.
var ClosedWorkerID = uuid.UUID{}
View Source
// DefaultSchedPriority is the scheduling priority value 0.
// NOTE(review): presumably used when a context carries no priority set via
// WithPriority — confirm against the scheduler.
var DefaultSchedPriority = 0
View Source
// ErrNoWorkers is a sentinel error with the message "no suitable workers
// found". Compare against it with errors.Is rather than by message text.
var ErrNoWorkers = errors.New("no suitable workers found")
View Source
// InitWait is a duration of three seconds.
// NOTE(review): the name suggests a start-up delay, but the code that
// consumes it is not visible here — confirm.
var InitWait = 3 * time.Second
View Source
// ParallelDenom is the denominator paired with ParallelNum: 92/100 gives the
// 0.92 factor used in the ParallelNum documentation examples.
var ParallelDenom uint64 = 100
View Source
// ParallelNum is the numerator of the fraction of threads to allocate to
// parallel tasks; together with ParallelDenom (100) it expresses 92 percent.
var ParallelNum uint64 = 92

Percent of threads to allocate to parallel tasks

Examples (results rounded down): 12 * 0.92 = 11; 16 * 0.92 = 14; 24 * 0.92 = 22; 32 * 0.92 = 29; 64 * 0.92 = 58; 128 * 0.92 = 117.

View Source
// ResourceTable maps each seal task type to the Resources declared for that
// task under each registered seal proof type (64GiB, 32GiB, 512MiB, 2KiB and
// 8MiB StackedDrg V1 proofs).
//
// Memory values are written as shifts: n << 30 is n GiB, n << 20 is n MiB and
// n << 10 is n KiB (assuming the fields are byte counts — TODO confirm).
// MaxParallelism takes the values -1 (multithread, per the Resources field
// comment), 0 and 1 in this table.
var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{
	// AddPiece: one thread, no GPU; MinMemory equals MaxMemory for every proof.
	sealtasks.TTAddPiece: {
		abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
			MaxMemory: 8 << 30,
			MinMemory: 8 << 30,

			MaxParallelism: 1,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{
			MaxMemory: 4 << 30,
			MinMemory: 4 << 30,

			MaxParallelism: 1,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{
			MaxMemory: 1 << 30,
			MinMemory: 1 << 30,

			MaxParallelism: 1,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{
			MaxMemory: 2 << 10,
			MinMemory: 2 << 10,

			MaxParallelism: 1,

			BaseMinMemory: 2 << 10,
		},
		abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{
			MaxMemory: 8 << 20,
			MinMemory: 8 << 20,

			MaxParallelism: 1,

			BaseMinMemory: 8 << 20,
		},
	},
	// PreCommit1: one thread, no GPU; the largest memory figures after Commit2.
	sealtasks.TTPreCommit1: {
		abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
			MaxMemory: 128 << 30,
			MinMemory: 112 << 30,

			MaxParallelism: 1,

			BaseMinMemory: 10 << 20,
		},
		abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{
			MaxMemory: 64 << 30,
			MinMemory: 56 << 30,

			MaxParallelism: 1,

			BaseMinMemory: 10 << 20,
		},
		abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{
			MaxMemory: 1 << 30,
			MinMemory: 768 << 20,

			MaxParallelism: 1,

			BaseMinMemory: 1 << 20,
		},
		abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{
			MaxMemory: 2 << 10,
			MinMemory: 2 << 10,

			MaxParallelism: 1,

			BaseMinMemory: 2 << 10,
		},
		abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{
			MaxMemory: 8 << 20,
			MinMemory: 8 << 20,

			MaxParallelism: 1,

			BaseMinMemory: 8 << 20,
		},
	},
	// PreCommit2: multithreaded (-1) for every proof; CanGPU is set only for
	// the 64GiB and 32GiB proofs.
	sealtasks.TTPreCommit2: {
		abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
			MaxMemory: 30 << 30,
			MinMemory: 30 << 30,

			MaxParallelism: -1,
			CanGPU:         true,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{
			MaxMemory: 15 << 30,
			MinMemory: 15 << 30,

			MaxParallelism: -1,
			CanGPU:         true,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{
			// 3 << 29 is 1.5 GiB.
			MaxMemory: 3 << 29,
			MinMemory: 1 << 30,

			MaxParallelism: -1,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{
			MaxMemory: 2 << 10,
			MinMemory: 2 << 10,

			MaxParallelism: -1,

			BaseMinMemory: 2 << 10,
		},
		abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{
			MaxMemory: 8 << 20,
			MinMemory: 8 << 20,

			MaxParallelism: -1,

			BaseMinMemory: 8 << 20,
		},
	},
	// Commit1: MaxParallelism is 0 and no GPU for every proof.
	sealtasks.TTCommit1: {
		abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
			MaxMemory: 1 << 30,
			MinMemory: 1 << 30,

			MaxParallelism: 0,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{
			MaxMemory: 1 << 30,
			MinMemory: 1 << 30,

			MaxParallelism: 0,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{
			MaxMemory: 1 << 30,
			MinMemory: 1 << 30,

			MaxParallelism: 0,

			BaseMinMemory: 1 << 30,
		},
		abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{
			MaxMemory: 2 << 10,
			MinMemory: 2 << 10,

			MaxParallelism: 0,

			BaseMinMemory: 2 << 10,
		},
		abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{
			MaxMemory: 8 << 20,
			MinMemory: 8 << 20,

			MaxParallelism: 0,

			BaseMinMemory: 8 << 20,
		},
	},
	// Commit2: CanGPU is set for every proof; multithreaded (-1) for the 64GiB
	// and 32GiB proofs, one thread for the smaller ones.
	// NOTE(review): for 64GiB and 32GiB, BaseMinMemory (64 / 32 GiB) is larger
	// than MinMemory (60 / 30 GiB) — confirm this is intended.
	sealtasks.TTCommit2: {
		abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
			MaxMemory: 190 << 30,
			MinMemory: 60 << 30,

			MaxParallelism: -1,
			CanGPU:         true,

			BaseMinMemory: 64 << 30,
		},
		abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{
			MaxMemory: 150 << 30,
			MinMemory: 30 << 30,

			MaxParallelism: -1,
			CanGPU:         true,

			BaseMinMemory: 32 << 30,
		},
		abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{
			// 3 << 29 is 1.5 GiB.
			MaxMemory: 3 << 29,
			MinMemory: 1 << 30,

			MaxParallelism: 1,
			CanGPU:         true,

			// NOTE(review): 10 << 30 is 10 GiB, which exceeds this entry's
			// MaxMemory of 1.5 GiB — confirm the value is intended.
			BaseMinMemory: 10 << 30,
		},
		abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{
			MaxMemory: 2 << 10,
			MinMemory: 2 << 10,

			MaxParallelism: 1,
			CanGPU:         true,

			BaseMinMemory: 2 << 10,
		},
		abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{
			MaxMemory: 8 << 20,
			MinMemory: 8 << 20,

			MaxParallelism: 1,
			CanGPU:         true,

			BaseMinMemory: 8 << 20,
		},
	},
	// Fetch: identical for every proof — 1 MiB of memory, MaxParallelism 0,
	// no GPU and no BaseMinMemory.
	sealtasks.TTFetch: {
		abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
			MaxMemory: 1 << 20,
			MinMemory: 1 << 20,

			MaxParallelism: 0,
			CanGPU:         false,

			BaseMinMemory: 0,
		},
		abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{
			MaxMemory: 1 << 20,
			MinMemory: 1 << 20,

			MaxParallelism: 0,
			CanGPU:         false,

			BaseMinMemory: 0,
		},
		abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{
			MaxMemory: 1 << 20,
			MinMemory: 1 << 20,

			MaxParallelism: 0,
			CanGPU:         false,

			BaseMinMemory: 0,
		},
		abi.RegisteredSealProof_StackedDrg2KiBV1: Resources{
			MaxMemory: 1 << 20,
			MinMemory: 1 << 20,

			MaxParallelism: 0,
			CanGPU:         false,

			BaseMinMemory: 0,
		},
		abi.RegisteredSealProof_StackedDrg8MiBV1: Resources{
			MaxMemory: 1 << 20,
			MinMemory: 1 << 20,

			MaxParallelism: 0,
			CanGPU:         false,

			BaseMinMemory: 0,
		},
	},
}
View Source
// SchedPriorityKey is the zero value of the unexported schedPrioCtxKey type.
// NOTE(review): presumably the context key under which WithPriority stores
// the priority — confirm against WithPriority's body.
var SchedPriorityKey schedPrioCtxKey
View Source
var (
	// SchedWindows is initialised to 2.
	// NOTE(review): presumably the number of scheduling windows kept per
	// worker — the scheduler code is not visible here, confirm.
	SchedWindows = 2
)
View Source
// SelectorTimeout is a duration of five seconds.
// NOTE(review): presumably bounds calls into a WorkerSelector — confirm
// against the scheduler.
var SelectorTimeout = 5 * time.Second

Functions

func WithPriority

func WithPriority(ctx context.Context, priority int) context.Context

Types

type Call added in v1.1.3

// Call is the record of a single worker call: its identifier, the kind of
// result it returns, its state and an optional result payload. It has
// MarshalCBOR/UnmarshalCBOR methods for CBOR serialisation.
type Call struct {
	// ID identifies the call.
	ID      storiface.CallID
	// RetType names the kind of result (one of the ReturnType constants).
	RetType ReturnType

	// State is CallStarted or CallDone.
	State CallState

	// Result is optional (pointer); presumably nil until the call has
	// finished — confirm.
	Result *ManyBytes // json bytes
}

func (*Call) MarshalCBOR added in v1.1.3

func (t *Call) MarshalCBOR(w io.Writer) error

func (*Call) UnmarshalCBOR added in v1.1.3

func (t *Call) UnmarshalCBOR(r io.Reader) error

type CallState added in v1.1.3

// CallState is the state of a Call, stored as a uint64.
type CallState uint64
const (
	// CallStarted is the first state (iota value 0, also the zero value).
	CallStarted CallState = iota
	// CallDone has the value 1.
	CallDone
)

type ExecutorFunc added in v1.1.3

// ExecutorFunc is a constructor callback: it takes no arguments and returns
// an ffiwrapper.Storage or an error.
type ExecutorFunc func() (ffiwrapper.Storage, error)

Used to provide a custom proofs implementation (mostly used in testing).

type FaultTracker

// FaultTracker checks whether sectors can be proven.
type FaultTracker interface {
	// CheckProvable takes a PoSt proof type, the sectors to check and an
	// RGetter, and returns a map keyed by sector ID plus an error.
	// Manager.CheckProvable, which has the same signature, is documented as
	// returning unprovable sectors; presumably the string value is the
	// reason a sector is unprovable — confirm.
	CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error)
}

FaultTracker TODO: Track things more actively

type LocalWorker

// LocalWorker implements the Worker interface with ffiwrapper.Sealer and a
// stores.Store instance. Construct it with NewLocalWorker.
type LocalWorker struct {
	// contains filtered or unexported fields
}

func NewLocalWorker

func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker

func (*LocalWorker) AddPiece

func (*LocalWorker) Close

func (l *LocalWorker) Close() error

func (*LocalWorker) Fetch

func (*LocalWorker) FinalizeSector

func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error)

func (*LocalWorker) Info

func (*LocalWorker) MoveStorage

func (l *LocalWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error)

func (*LocalWorker) NewSector

func (l *LocalWorker) NewSector(ctx context.Context, sector storage.SectorRef) error

func (*LocalWorker) Paths

func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error)

func (*LocalWorker) ReadPiece

func (*LocalWorker) ReleaseUnsealed

func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error)

func (*LocalWorker) Remove

func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error

func (*LocalWorker) SealCommit1

func (*LocalWorker) SealCommit2

func (l *LocalWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.Commit1Out) (storiface.CallID, error)

func (*LocalWorker) SealPreCommit1

func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error)

func (*LocalWorker) SealPreCommit2

func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.PreCommit1Out) (storiface.CallID, error)

func (*LocalWorker) Session added in v1.1.3

func (l *LocalWorker) Session(ctx context.Context) (uuid.UUID, error)

func (*LocalWorker) TaskDisable added in v1.2.2

func (l *LocalWorker) TaskDisable(ctx context.Context, tt sealtasks.TaskType) error

func (*LocalWorker) TaskEnable added in v1.2.2

func (l *LocalWorker) TaskEnable(ctx context.Context, tt sealtasks.TaskType) error

func (*LocalWorker) TaskTypes

func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)

func (*LocalWorker) UnsealPiece

func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error)

func (*LocalWorker) WaitQuiet added in v1.1.3

func (l *LocalWorker) WaitQuiet()

WaitQuiet blocks as long as there are tasks running

type Manager

// Manager is the top-level piece of the storage system, gluing the other
// pieces together; it also implements scheduling logic.
type Manager struct {
	// Prover is embedded, so its methods are promoted onto Manager.
	storage.Prover
	// contains filtered or unexported fields
}

func (*Manager) Abort added in v1.1.3

func (m *Manager) Abort(ctx context.Context, call storiface.CallID) error

func (*Manager) AddLocalStorage

func (m *Manager) AddLocalStorage(ctx context.Context, path string) error

func (*Manager) AddPiece

func (m *Manager) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error)

func (*Manager) AddWorker

func (m *Manager) AddWorker(ctx context.Context, w Worker) error

func (*Manager) CheckProvable

func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error)

CheckProvable returns unprovable sectors

func (*Manager) Close

func (m *Manager) Close(ctx context.Context) error

func (*Manager) FinalizeSector

func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error

func (*Manager) FsStat

func (m *Manager) FsStat(ctx context.Context, id stores.ID) (fsutil.FsStat, error)

func (*Manager) NewSector

func (m *Manager) NewSector(ctx context.Context, sector storage.SectorRef) error

func (*Manager) ReadPiece

func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) error

func (*Manager) ReleaseUnsealed

func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) error

func (*Manager) Remove

func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error

func (*Manager) ReturnAddPiece added in v1.1.3

func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error

func (*Manager) ReturnFetch added in v1.1.3

func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error

func (*Manager) ReturnFinalizeSector added in v1.1.3

func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error

func (*Manager) ReturnMoveStorage added in v1.1.3

func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error

func (*Manager) ReturnReadPiece added in v1.1.3

func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error

func (*Manager) ReturnReleaseUnsealed added in v1.1.3

func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error

func (*Manager) ReturnSealCommit1 added in v1.1.3

func (m *Manager) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error

func (*Manager) ReturnSealCommit2 added in v1.1.3

func (m *Manager) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error

func (*Manager) ReturnSealPreCommit1 added in v1.1.3

func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error

func (*Manager) ReturnSealPreCommit2 added in v1.1.3

func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error

func (*Manager) ReturnUnsealPiece added in v1.1.3

func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error

func (*Manager) SchedDiag

func (m *Manager) SchedDiag(ctx context.Context, doSched bool) (interface{}, error)

func (*Manager) SealCommit1

func (m *Manager) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (out storage.Commit1Out, err error)

func (*Manager) SealCommit2

func (m *Manager) SealCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.Commit1Out) (out storage.Proof, err error)

func (*Manager) SealPreCommit1

func (m *Manager) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error)

func (*Manager) SealPreCommit2

func (m *Manager) SealPreCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.PreCommit1Out) (out storage.SectorCids, err error)

func (*Manager) ServeHTTP

func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*Manager) StorageLocal

func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error)

func (*Manager) WorkerJobs

func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob

func (*Manager) WorkerStats

func (m *Manager) WorkerStats() map[uuid.UUID]storiface.WorkerStats

type ManagerStateStore added in v1.1.3

// ManagerStateStore is a named pointer type over statestore.StateStore.
// NOTE(review): presumably exists to distinguish the manager's store from
// WorkerStateStore, which has the same underlying type — confirm.
type ManagerStateStore *statestore.StateStore

type ManyBytes added in v1.1.3

// ManyBytes is an opaque byte container with its own MarshalCBOR and
// UnmarshalCBOR methods; Call.Result uses it to hold JSON bytes. The package
// documentation describes it as a stand-in for a struct-field tag that would
// tell cbor-gen to enforce a higher maximum length.
type ManyBytes struct {
	// contains filtered or unexported fields
}

Ideally this would be a tag on the struct field telling cbor-gen to enforce higher max-len

func (*ManyBytes) MarshalCBOR added in v1.1.3

func (t *ManyBytes) MarshalCBOR(w io.Writer) error

func (*ManyBytes) UnmarshalCBOR added in v1.1.3

func (t *ManyBytes) UnmarshalCBOR(r io.Reader) error

type Resources

// Resources describes what a task needs from a worker: memory, parallelism
// and whether it can use a GPU. ResourceTable holds one Resources value per
// task type and seal proof type.
type Resources struct {
	MinMemory uint64 // What Must be in RAM for decent perf
	MaxMemory uint64 // Memory required (swap + ram)

	// ResourceTable uses the values -1, 0 and 1 for MaxParallelism.
	// NOTE(review): the meaning of 0 is not stated here; see Threads.
	MaxParallelism int // -1 = multithread
	// CanGPU is true for tasks that are able to use a GPU.
	CanGPU         bool

	BaseMinMemory uint64 // What Must be in RAM for decent perf (shared between threads)
}

func (Resources) Threads

func (r Resources) Threads(wcpus uint64) uint64

TODO: Take NUMA into account

type ReturnType added in v1.1.3

// ReturnType names the kind of result a Call returns (see Call.RetType).
type ReturnType string
// Each constant's string value is identical to its identifier, and each has
// a matching Manager.Return<Name> method.
const (
	AddPiece        ReturnType = "AddPiece"
	SealPreCommit1  ReturnType = "SealPreCommit1"
	SealPreCommit2  ReturnType = "SealPreCommit2"
	SealCommit1     ReturnType = "SealCommit1"
	SealCommit2     ReturnType = "SealCommit2"
	FinalizeSector  ReturnType = "FinalizeSector"
	ReleaseUnsealed ReturnType = "ReleaseUnsealed"
	MoveStorage     ReturnType = "MoveStorage"
	UnsealPiece     ReturnType = "UnsealPiece"
	ReadPiece       ReturnType = "ReadPiece"
	Fetch           ReturnType = "Fetch"
)

type SchedDiagInfo

// SchedDiagInfo is a snapshot of scheduler diagnostics.
// NOTE(review): presumably the value Manager.SchedDiag returns inside its
// interface{} result — confirm.
type SchedDiagInfo struct {
	// Requests lists one entry per scheduler request.
	Requests    []SchedDiagRequestInfo
	// OpenWindows lists open windows as strings; presumably worker IDs —
	// confirm.
	OpenWindows []string
}

type SchedDiagRequestInfo

// SchedDiagRequestInfo describes one scheduler request as reported in
// SchedDiagInfo.Requests.
type SchedDiagRequestInfo struct {
	// Sector is the sector the request is for.
	Sector   abi.SectorID
	// TaskType is the seal task being requested.
	TaskType sealtasks.TaskType
	// Priority is the request's scheduling priority (see WithPriority).
	Priority int
}

type SealerConfig

// SealerConfig holds sealer settings: a fetch-parallelism limit and flags
// for which task kinds the local worker is allowed to run.
type SealerConfig struct {
	// ParallelFetchLimit presumably caps the number of concurrent fetches —
	// confirm against the code that reads it.
	ParallelFetchLimit int

	// Local worker config
	AllowAddPiece   bool
	AllowPreCommit1 bool
	AllowPreCommit2 bool
	// NOTE(review): there is a single flag for commit; whether it covers
	// both Commit1 and Commit2 is not visible here.
	AllowCommit     bool
	AllowUnseal     bool
}

type StorageAuth

// StorageAuth is a named type whose underlying type is http.Header.
// NOTE(review): presumably the headers used to authenticate requests to
// remote storage — confirm.
type StorageAuth http.Header

type URLs

// URLs is a slice of strings; by its name, each element is a URL.
type URLs []string

type WorkID added in v1.1.3

// WorkID identifies a unit of work by the task type and its parameters. It
// has MarshalCBOR/UnmarshalCBOR and String methods.
type WorkID struct {
	// Method is the seal task type of the work.
	Method sealtasks.TaskType
	Params string // json [...params]
}

func (*WorkID) MarshalCBOR added in v1.1.3

func (t *WorkID) MarshalCBOR(w io.Writer) error

func (WorkID) String added in v1.1.3

func (w WorkID) String() string

func (*WorkID) UnmarshalCBOR added in v1.1.3

func (t *WorkID) UnmarshalCBOR(r io.Reader) error

type WorkState added in v1.1.3

// WorkState is the tracked state of one unit of work. It has
// MarshalCBOR/UnmarshalCBOR methods for CBOR serialisation.
type WorkState struct {
	// ID identifies the work (task type plus parameters).
	ID WorkID

	// Status is the current status; the field comments below refer to the
	// unexported values wsRunning and wsDone.
	Status WorkStatus

	WorkerCall storiface.CallID // Set when entering wsRunning
	WorkError  string           // Status = wsDone, set when failed to start work

	WorkerHostname string // hostname of last worker handling this job
	StartTime      int64  // unix seconds
}

func (*WorkState) MarshalCBOR added in v1.1.3

func (t *WorkState) MarshalCBOR(w io.Writer) error

func (*WorkState) UnmarshalCBOR added in v1.1.3

func (t *WorkState) UnmarshalCBOR(r io.Reader) error

type WorkStatus added in v1.1.3

// WorkStatus is the string-typed status stored in WorkState.Status. No
// exported constants of this type are listed; WorkState's field comments
// mention the unexported values wsRunning and wsDone.
type WorkStatus string

type Worker

// Worker is the interface the Manager uses to talk to a worker (see
// Manager.AddWorker). It embeds storiface.WorkerCalls and adds methods that
// describe the worker itself. LocalWorker is the implementation in this
// package.
type Worker interface {
	storiface.WorkerCalls

	// TaskTypes returns a set of task types; presumably the ones this
	// worker accepts — confirm.
	TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)

	// Returns paths accessible to the worker
	Paths(context.Context) ([]stores.StoragePath, error)

	// Info returns a storiface.WorkerInfo describing the worker.
	Info(context.Context) (storiface.WorkerInfo, error)

	// Session returns the worker's session UUID (WorkerID is documented as
	// the worker session UUID).
	Session(context.Context) (uuid.UUID, error)

	Close() error // TODO: do we need this?
}

type WorkerAction

// WorkerAction is a callback that is given a context and a Worker and
// returns an error.
type WorkerAction func(ctx context.Context, w Worker) error

type WorkerConfig

// WorkerConfig is the configuration passed to NewLocalWorker.
type WorkerConfig struct {
	// TaskTypes lists seal task types; presumably the tasks the worker
	// accepts initially (see TaskEnable/TaskDisable) — confirm.
	TaskTypes []sealtasks.TaskType
	// NoSwap presumably excludes swap from the worker's reported memory —
	// confirm against LocalWorker.Info.
	NoSwap    bool
}

type WorkerID

// WorkerID identifies a worker by its session UUID. It has a String method.
type WorkerID uuid.UUID // worker session UUID

func (WorkerID) String added in v1.2.2

func (w WorkerID) String() string

type WorkerSelector

// WorkerSelector decides which workers may run a task and which of two
// workers is preferred for it.
//
// NOTE(review): both methods take the unexported *workerHandle type, so code
// outside this package cannot name the parameter type needed to implement
// the interface.
type WorkerSelector interface {
	// Ok additionally receives the seal proof type (spt); Cmp does not.
	Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *workerHandle) (bool, error) // true if worker is acceptable for performing a task

	Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) // true if a is preferred over b
}

type WorkerStateStore added in v1.1.3

// WorkerStateStore is a named pointer type over statestore.StateStore.
// NOTE(review): presumably exists to distinguish the worker's store from
// ManagerStateStore, which has the same underlying type — confirm.
type WorkerStateStore *statestore.StateStore

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL