sectorstorage

package
v1.13.2-rc4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 11, 2021 License: Apache-2.0, MIT Imports: 46 Imported by: 0

README

sector-storage

CircleCI standard-readme compliant

a concrete implementation of the specs-storage interface

The sector-storage project provides a implementation-nonspecific reference implementation of the specs-storage interface.

Disclaimer

Please report your issues with regards to sector-storage at the lotus issue tracker

Architecture

high-level architecture

Manager

Manages is the top-level piece of the storage system gluing all the other pieces together. It also implements scheduling logic.

package stores

This package implements the sector storage subsystem. Fundamentally the storage is divided into paths, each path has it's UUID, and stores a set of sector 'files'. There are currently 3 types of sector files - unsealed, sealed, and cache.

Paths can be shared between nodes by sharing the underlying filesystem.

stores.Local

The Local store implements SectorProvider for paths mounted in the local filesystem. Paths can be shared between nodes, and support shared filesystems such as NFS.

stores.Local implements all native filesystem-related operations

stores.Remote

The Remote store extends Local store, handles fetching sector files into a local store if needed, and handles removing sectors from non-local stores.

stores.Index

The Index is a singleton holding metadata about storage paths, and a mapping of sector files to paths

LocalWorker

LocalWorker implements the Worker interface with ffiwrapper.Sealer and a store.Store instance

License

The Filecoin Project is dual-licensed under Apache 2.0 and MIT terms:

Documentation

Index

Constants

View Source
const (
	// ResourceFilteringHardware specifies that available hardware resources
	// should be evaluated when scheduling a task against the worker.
	ResourceFilteringHardware = ResourceFilteringStrategy("hardware")

	// ResourceFilteringDisabled disables resource filtering against this
	// worker. The scheduler may assign any task to this worker.
	ResourceFilteringDisabled = ResourceFilteringStrategy("disabled")
)

Variables

View Source
var ClosedWorkerID = uuid.UUID{}
View Source
var DefaultSchedPriority = 0
View Source
var ErrNoWorkers = errors.New("no suitable workers found")
View Source
var InitWait = 3 * time.Second
View Source
var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M

For small read skips, it's faster to "burn" some bytes than to setup new sector reader. Assuming 1ms stream seek latency, and 1G/s stream rate, we're willing to discard up to 1 MiB.

View Source
var ReadBuf = 128 * (127 * 8) // unpadded(128k)
View Source
var SchedPriorityKey schedPrioCtxKey
View Source
var (
	SchedWindows = 2
)
View Source
var SelectorTimeout = 5 * time.Second

Functions

func WithPriority

func WithPriority(ctx context.Context, priority int) context.Context

Types

type Call added in v1.1.3

type Call struct {
	ID      storiface.CallID
	RetType ReturnType

	State CallState

	Result *ManyBytes // json bytes
}

func (*Call) MarshalCBOR added in v1.1.3

func (t *Call) MarshalCBOR(w io.Writer) error

func (*Call) UnmarshalCBOR added in v1.1.3

func (t *Call) UnmarshalCBOR(r io.Reader) error

type CallState added in v1.1.3

type CallState uint64
const (
	CallStarted CallState = iota
	CallDone
)

type EnvFunc added in v1.13.2

type EnvFunc func(string) (string, bool)

type ExecutorFunc added in v1.1.3

type ExecutorFunc func() (ffiwrapper.Storage, error)

used do provide custom proofs impl (mostly used in testing)

type FaultTracker

type FaultTracker interface {
	CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error)
}

FaultTracker TODO: Track things more actively

type LocalWorker

type LocalWorker struct {
	// contains filtered or unexported fields
}

func NewLocalWorker

func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker

func (*LocalWorker) AddPiece

func (*LocalWorker) Close

func (l *LocalWorker) Close() error

func (*LocalWorker) Fetch

func (*LocalWorker) FinalizeSector

func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error)

func (*LocalWorker) Info

func (*LocalWorker) MoveStorage

func (l *LocalWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error)

func (*LocalWorker) NewSector

func (l *LocalWorker) NewSector(ctx context.Context, sector storage.SectorRef) error

func (*LocalWorker) Paths

func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error)

func (*LocalWorker) ReleaseUnsealed

func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error)

func (*LocalWorker) Remove

func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error

func (*LocalWorker) SealCommit1

func (*LocalWorker) SealCommit2

func (l *LocalWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.Commit1Out) (storiface.CallID, error)

func (*LocalWorker) SealPreCommit1

func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error)

func (*LocalWorker) SealPreCommit2

func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.PreCommit1Out) (storiface.CallID, error)

func (*LocalWorker) Session added in v1.1.3

func (l *LocalWorker) Session(ctx context.Context) (uuid.UUID, error)

func (*LocalWorker) TaskDisable added in v1.2.2

func (l *LocalWorker) TaskDisable(ctx context.Context, tt sealtasks.TaskType) error

func (*LocalWorker) TaskEnable added in v1.2.2

func (l *LocalWorker) TaskEnable(ctx context.Context, tt sealtasks.TaskType) error

func (*LocalWorker) TaskTypes

func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)

func (*LocalWorker) UnsealPiece

func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error)

func (*LocalWorker) WaitQuiet added in v1.1.3

func (l *LocalWorker) WaitQuiet()

WaitQuiet blocks as long as there are tasks running

type Manager

type Manager struct {
	storage.Prover
	// contains filtered or unexported fields
}

func (*Manager) Abort added in v1.1.3

func (m *Manager) Abort(ctx context.Context, call storiface.CallID) error

func (*Manager) AddLocalStorage

func (m *Manager) AddLocalStorage(ctx context.Context, path string) error

func (*Manager) AddPiece

func (m *Manager) AddPiece(ctx context.Context, sector storage.SectorRef, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error)

func (*Manager) AddWorker

func (m *Manager) AddWorker(ctx context.Context, w Worker) error

func (*Manager) CheckProvable

func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storage.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error)

CheckProvable returns unprovable sectors

func (*Manager) Close

func (m *Manager) Close(ctx context.Context) error

func (*Manager) FinalizeSector

func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) error

func (*Manager) FsStat

func (m *Manager) FsStat(ctx context.Context, id stores.ID) (fsutil.FsStat, error)

func (*Manager) NewSector

func (m *Manager) NewSector(ctx context.Context, sector storage.SectorRef) error

func (*Manager) ReleaseUnsealed

func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) error

func (*Manager) Remove

func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error

func (*Manager) ReturnAddPiece added in v1.1.3

func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error

func (*Manager) ReturnFetch added in v1.1.3

func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error

func (*Manager) ReturnFinalizeSector added in v1.1.3

func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error

func (*Manager) ReturnMoveStorage added in v1.1.3

func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error

func (*Manager) ReturnReadPiece added in v1.1.3

func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error

func (*Manager) ReturnReleaseUnsealed added in v1.1.3

func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error

func (*Manager) ReturnSealCommit1 added in v1.1.3

func (m *Manager) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error

func (*Manager) ReturnSealCommit2 added in v1.1.3

func (m *Manager) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error

func (*Manager) ReturnSealPreCommit1 added in v1.1.3

func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error

func (*Manager) ReturnSealPreCommit2 added in v1.1.3

func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error

func (*Manager) ReturnUnsealPiece added in v1.1.3

func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error

func (*Manager) SchedDiag

func (m *Manager) SchedDiag(ctx context.Context, doSched bool) (interface{}, error)

func (*Manager) SealCommit1

func (m *Manager) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (out storage.Commit1Out, err error)

func (*Manager) SealCommit2

func (m *Manager) SealCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.Commit1Out) (out storage.Proof, err error)

func (*Manager) SealPreCommit1

func (m *Manager) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error)

func (*Manager) SealPreCommit2

func (m *Manager) SealPreCommit2(ctx context.Context, sector storage.SectorRef, phase1Out storage.PreCommit1Out) (out storage.SectorCids, err error)

func (*Manager) SectorsUnsealPiece added in v1.11.0

func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed *cid.Cid) error

SectorsUnsealPiece will Unseal the Sealed sector file for the given sector. It will schedule the Unsealing task on a worker that either already has the sealed sector files or has space in one of it's sealing scratch spaces to store them after fetching them from another worker. If the chosen worker already has the Unsealed sector file, we will NOT Unseal the sealed sector file again.

func (*Manager) ServeHTTP

func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*Manager) StorageLocal

func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error)

func (*Manager) WorkerJobs

func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob

func (*Manager) WorkerStats

func (m *Manager) WorkerStats() map[uuid.UUID]storiface.WorkerStats

type ManagerStateStore added in v1.1.3

type ManagerStateStore *statestore.StateStore

type ManyBytes added in v1.1.3

type ManyBytes struct {
	// contains filtered or unexported fields
}

Ideally this would be a tag on the struct field telling cbor-gen to enforce higher max-len

func (*ManyBytes) MarshalCBOR added in v1.1.3

func (t *ManyBytes) MarshalCBOR(w io.Writer) error

func (*ManyBytes) UnmarshalCBOR added in v1.1.3

func (t *ManyBytes) UnmarshalCBOR(r io.Reader) error

type PieceProvider added in v1.11.0

type PieceProvider interface {
	// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
	// pieceOffset + pieceSize specify piece bounds for unsealing (note: with SDR the entire sector will be unsealed by
	//  default in most cases, but this might matter with future PoRep)
	// startOffset is added to the pieceOffset to get the starting reader offset.
	// The number of bytes that can be read is pieceSize-startOffset
	ReadPiece(ctx context.Context, sector storage.SectorRef, pieceOffset storiface.UnpaddedByteIndex, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (mount.Reader, bool, error)
	IsUnsealed(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
}

func NewPieceProvider added in v1.11.0

func NewPieceProvider(storage *stores.Remote, index stores.SectorIndex, uns Unsealer) PieceProvider

type ResourceFilteringStrategy added in v1.11.0

type ResourceFilteringStrategy string

ResourceFilteringStrategy is an enum indicating the kinds of resource filtering strategies that can be configured for workers.

type ReturnType added in v1.1.3

type ReturnType string
const (
	AddPiece        ReturnType = "AddPiece"
	SealPreCommit1  ReturnType = "SealPreCommit1"
	SealPreCommit2  ReturnType = "SealPreCommit2"
	SealCommit1     ReturnType = "SealCommit1"
	SealCommit2     ReturnType = "SealCommit2"
	FinalizeSector  ReturnType = "FinalizeSector"
	ReleaseUnsealed ReturnType = "ReleaseUnsealed"
	MoveStorage     ReturnType = "MoveStorage"
	UnsealPiece     ReturnType = "UnsealPiece"
	Fetch           ReturnType = "Fetch"
)

type SchedDiagInfo

type SchedDiagInfo struct {
	Requests    []SchedDiagRequestInfo
	OpenWindows []string
}

type SchedDiagRequestInfo

type SchedDiagRequestInfo struct {
	Sector   abi.SectorID
	TaskType sealtasks.TaskType
	Priority int
}

type SealerConfig

type SealerConfig struct {
	ParallelFetchLimit int

	// Local worker config
	AllowAddPiece   bool
	AllowPreCommit1 bool
	AllowPreCommit2 bool
	AllowCommit     bool
	AllowUnseal     bool

	// ResourceFiltering instructs the system which resource filtering strategy
	// to use when evaluating tasks against this worker. An empty value defaults
	// to "hardware".
	ResourceFiltering ResourceFilteringStrategy
}

type StorageAuth

type StorageAuth http.Header

type Unsealer added in v1.11.0

type Unsealer interface {
	// SectorsUnsealPiece will Unseal a Sealed sector file for the given sector.
	SectorsUnsealPiece(ctx context.Context, sector storage.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd *cid.Cid) error
}

type WorkID added in v1.1.3

type WorkID struct {
	Method sealtasks.TaskType
	Params string // json [...params]
}

func (*WorkID) MarshalCBOR added in v1.1.3

func (t *WorkID) MarshalCBOR(w io.Writer) error

func (WorkID) String added in v1.1.3

func (w WorkID) String() string

func (*WorkID) UnmarshalCBOR added in v1.1.3

func (t *WorkID) UnmarshalCBOR(r io.Reader) error

type WorkState added in v1.1.3

type WorkState struct {
	ID WorkID

	Status WorkStatus

	WorkerCall storiface.CallID // Set when entering wsRunning
	WorkError  string           // Status = wsDone, set when failed to start work

	WorkerHostname string // hostname of last worker handling this job
	StartTime      int64  // unix seconds
}

func (*WorkState) MarshalCBOR added in v1.1.3

func (t *WorkState) MarshalCBOR(w io.Writer) error

func (*WorkState) UnmarshalCBOR added in v1.1.3

func (t *WorkState) UnmarshalCBOR(r io.Reader) error

type WorkStatus added in v1.1.3

type WorkStatus string

type Worker

type Worker interface {
	storiface.WorkerCalls

	TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)

	// Returns paths accessible to the worker
	Paths(context.Context) ([]stores.StoragePath, error)

	Info(context.Context) (storiface.WorkerInfo, error)

	Session(context.Context) (uuid.UUID, error)

	Close() error // TODO: do we need this?
}

type WorkerAction

type WorkerAction func(ctx context.Context, w Worker) error

type WorkerConfig

type WorkerConfig struct {
	TaskTypes []sealtasks.TaskType
	NoSwap    bool

	// IgnoreResourceFiltering enables task distribution to happen on this
	// worker regardless of its currently available resources. Used in testing
	// with the local worker.
	IgnoreResourceFiltering bool
}

type WorkerSelector

type WorkerSelector interface {
	Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *workerHandle) (bool, error) // true if worker is acceptable for performing a task

	Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) // true if a is preferred over b
}

type WorkerStateStore added in v1.1.3

type WorkerStateStore *statestore.StateStore

Directories

Path Synopsis
mocks
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL