sealer

package
v1.32.0-rc2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2024 License: Apache-2.0, MIT Imports: 52 Imported by: 21

README

sector-storage

standard-readme compliant

a concrete implementation of the specs-storage interface

The sector-storage project provides a implementation-nonspecific reference implementation of the specs-storage interface.

Disclaimer

Please report your issues with regards to sector-storage at the lotus issue tracker

Architecture

high-level architecture

Manager

Manages is the top-level piece of the storage system gluing all the other pieces together. It also implements scheduling logic.

package paths

This package implements the sector storage subsystem. Fundamentally the storage is divided into paths, each path has it's UUID, and stores a set of sector 'files'. There are currently 5 types of sector files - unsealed, sealed, cache, update and update-cache.

Paths can be shared between nodes by sharing the underlying filesystem.

paths.Local

The Local store implements SectorProvider for paths mounted in the local filesystem. Paths can be shared between nodes, and support shared filesystems such as NFS.

stores.Local implements all native filesystem-related operations

paths.Remote

The Remote store extends Local store, handles fetching sector files into a local store if needed, and handles removing sectors from non-local stores.

paths.Index

The Index is a singleton holding metadata about storage paths, and a mapping of sector files to paths

LocalWorker

LocalWorker implements the Worker interface with ffiwrapper.Sealer and a store.Store instance

License

The Filecoin Project is dual-licensed under Apache 2.0 and MIT terms:

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ClosedWorkerID = uuid.UUID{}
View Source
var DefaultSchedPriority = 0
View Source
var ErrNoWorkers = errors.New("no suitable workers found")
View Source
var InitWait = 3 * time.Second
View Source
var MaxPieceReaderBurnBytes int64 = 1 << 20 // 1M

For small read skips, it's faster to "burn" some bytes than to setup new sector reader. Assuming 1ms stream seek latency, and 1G/s stream rate, we're willing to discard up to 1 MiB.

View Source
var MinRandomReadSize = int64(4 << 10)
View Source
var ReadBuf = 128 * (127 * 8) // unpadded(128k)
View Source
var SchedPriorityKey schedPrioCtxKey
View Source
var (
	SchedWindows = 2
)
View Source
var SelectorTimeout = 5 * time.Second

Functions

func FFIExec added in v1.25.2

func FFIExec(opts ...ffiwrapper.FFIWrapperOpt) func(l *LocalWorker) (storiface.Storage, error)

func LowestUtilizationWS

func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int

func RandomWS added in v1.23.0

func RandomWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int

func SpreadTasksWS added in v1.23.0

func SpreadTasksWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int

func SpreadWS

func SpreadWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int

func WithPriority

func WithPriority(ctx context.Context, priority int) context.Context

Types

type ActiveResources

type ActiveResources struct {
	// contains filtered or unexported fields
}

func NewActiveResources

func NewActiveResources(tc *taskCounter) *ActiveResources

func (*ActiveResources) Add

Add task resources to ActiveResources and return utilization difference

func (*ActiveResources) CanHandleRequest

func (a *ActiveResources) CanHandleRequest(schedID uuid.UUID, tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool

CanHandleRequest evaluates if the worker has enough available resources to handle the request.

func (*ActiveResources) Free

type Assigner

type Assigner interface {
	TrySched(sh *Scheduler)
}

func NewLowestUtilizationAssigner

func NewLowestUtilizationAssigner() Assigner

func NewRandomAssigner added in v1.23.0

func NewRandomAssigner() Assigner

func NewSpreadAssigner

func NewSpreadAssigner(queued bool) Assigner

func NewSpreadTasksAssigner added in v1.23.0

func NewSpreadTasksAssigner(queued bool) Assigner

type AssignerCommon

type AssignerCommon struct {
	WindowSel WindowSelector
}

AssignerCommon is a task assigner with customizable parts

func (*AssignerCommon) TrySched

func (a *AssignerCommon) TrySched(sh *Scheduler)

type Call

type Call struct {
	ID      storiface.CallID
	RetType ReturnType

	State CallState

	Result *ManyBytes // json bytes
}

func (*Call) MarshalCBOR

func (t *Call) MarshalCBOR(w io.Writer) error

func (*Call) UnmarshalCBOR

func (t *Call) UnmarshalCBOR(r io.Reader) (err error)

type CallState

type CallState uint64
const (
	CallStarted CallState = iota
	CallDone
)

type EnvFunc

type EnvFunc func(string) (string, bool)

type ExecutorFunc

type ExecutorFunc func(w *LocalWorker) (storiface.Storage, error)

type FaultTracker

type FaultTracker interface {
	CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storiface.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error)
}

FaultTracker TODO: Track things more actively

type LocalWorker

type LocalWorker struct {
	// contains filtered or unexported fields
}

func NewLocalWorker

func NewLocalWorker(wcfg WorkerConfig, store paths.Store, local *paths.Local, sindex paths.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker

func NewLocalWorkerWithExecutor added in v1.25.2

func NewLocalWorkerWithExecutor(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc, store paths.Store, local *paths.Local, sindex paths.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker

func (*LocalWorker) AddPiece

func (*LocalWorker) Close

func (l *LocalWorker) Close() error

func (*LocalWorker) DataCid

func (l *LocalWorker) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (storiface.CallID, error)

func (*LocalWorker) Done added in v1.17.1

func (l *LocalWorker) Done() <-chan struct{}

func (*LocalWorker) DownloadSectorData added in v1.17.2

func (l *LocalWorker) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) (storiface.CallID, error)

func (*LocalWorker) Fetch

func (*LocalWorker) FinalizeReplicaUpdate

func (l *LocalWorker) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error)

func (*LocalWorker) FinalizeSector

func (l *LocalWorker) FinalizeSector(ctx context.Context, sector storiface.SectorRef) (storiface.CallID, error)

func (*LocalWorker) GenerateSectorKeyFromData

func (l *LocalWorker) GenerateSectorKeyFromData(ctx context.Context, sector storiface.SectorRef, commD cid.Cid) (storiface.CallID, error)

func (*LocalWorker) GenerateWindowPoSt

func (l *LocalWorker) GenerateWindowPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness) (storiface.WindowPoStResult, error)

func (*LocalWorker) GenerateWindowPoStAdv added in v1.25.2

func (l *LocalWorker) GenerateWindowPoStAdv(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness, allowSkip bool) (storiface.WindowPoStResult, error)

func (*LocalWorker) GenerateWinningPoSt

func (l *LocalWorker) GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, randomness abi.PoStRandomness) ([]proof.PoStProof, error)

func (*LocalWorker) Info

func (*LocalWorker) MoveStorage

func (*LocalWorker) NewSector

func (l *LocalWorker) NewSector(ctx context.Context, sector storiface.SectorRef) error

func (*LocalWorker) Paths

func (*LocalWorker) ProveReplicaUpdate1

func (l *LocalWorker) ProveReplicaUpdate1(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error)

func (*LocalWorker) ProveReplicaUpdate2

func (l *LocalWorker) ProveReplicaUpdate2(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storiface.ReplicaVanillaProofs) (storiface.CallID, error)

func (*LocalWorker) ReleaseUnsealed

func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) (storiface.CallID, error)

func (*LocalWorker) Remove

func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error

func (*LocalWorker) ReplicaUpdate

func (l *LocalWorker) ReplicaUpdate(ctx context.Context, sector storiface.SectorRef, pieces []abi.PieceInfo) (storiface.CallID, error)

func (*LocalWorker) SealCommit1

func (*LocalWorker) SealCommit2

func (l *LocalWorker) SealCommit2(ctx context.Context, sector storiface.SectorRef, phase1Out storiface.Commit1Out) (storiface.CallID, error)

func (*LocalWorker) SealPreCommit1

func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error)

func (*LocalWorker) SealPreCommit2

func (l *LocalWorker) SealPreCommit2(ctx context.Context, sector storiface.SectorRef, phase1Out storiface.PreCommit1Out) (storiface.CallID, error)

func (*LocalWorker) Session

func (l *LocalWorker) Session(ctx context.Context) (uuid.UUID, error)

func (*LocalWorker) TaskDisable

func (l *LocalWorker) TaskDisable(ctx context.Context, tt sealtasks.TaskType) error

func (*LocalWorker) TaskEnable

func (l *LocalWorker) TaskEnable(ctx context.Context, tt sealtasks.TaskType) error

func (*LocalWorker) TaskTypes

func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)

func (*LocalWorker) UnsealPiece

func (l *LocalWorker) UnsealPiece(ctx context.Context, sector storiface.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error)

func (*LocalWorker) WaitQuiet

func (l *LocalWorker) WaitQuiet()

WaitQuiet blocks as long as there are tasks running

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

func (*Manager) Abort

func (m *Manager) Abort(ctx context.Context, call storiface.CallID) error

func (*Manager) AddLocalStorage

func (m *Manager) AddLocalStorage(ctx context.Context, path string) error

func (*Manager) AddPiece

func (m *Manager) AddPiece(ctx context.Context, sector storiface.SectorRef, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error)

func (*Manager) AddWorker

func (m *Manager) AddWorker(ctx context.Context, w Worker) error

func (*Manager) CheckProvable

func (m *Manager) CheckProvable(ctx context.Context, pp abi.RegisteredPoStProof, sectors []storiface.SectorRef, rg storiface.RGetter) (map[abi.SectorID]string, error)

CheckProvable returns unprovable sectors

func (*Manager) Close

func (m *Manager) Close(ctx context.Context) error

func (*Manager) DataCid

func (m *Manager) DataCid(ctx context.Context, pieceSize abi.UnpaddedPieceSize, pieceData storiface.Data) (abi.PieceInfo, error)

func (*Manager) DetachLocalStorage added in v1.17.1

func (m *Manager) DetachLocalStorage(ctx context.Context, path string) error

func (*Manager) DownloadSectorData added in v1.17.2

func (m *Manager) DownloadSectorData(ctx context.Context, sector storiface.SectorRef, finalized bool, src map[storiface.SectorFileType]storiface.SectorLocation) error

func (*Manager) FinalizeReplicaUpdate

func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storiface.SectorRef) error

func (*Manager) FinalizeSector

func (m *Manager) FinalizeSector(ctx context.Context, sector storiface.SectorRef) error

func (*Manager) FsStat

func (m *Manager) FsStat(ctx context.Context, id storiface.ID) (fsutil.FsStat, error)

func (*Manager) GenerateSectorKeyFromData

func (m *Manager) GenerateSectorKeyFromData(ctx context.Context, sector storiface.SectorRef, commD cid.Cid) error

func (*Manager) GenerateWindowPoSt

func (m *Manager) GenerateWindowPoSt(ctx context.Context, minerID abi.ActorID, postProofType abi.RegisteredPoStProof, sectorInfo []proof.ExtendedSectorInfo, randomness abi.PoStRandomness) (proof []proof.PoStProof, skipped []abi.SectorID, err error)

func (*Manager) GenerateWindowPoStWithVanilla

func (m *Manager) GenerateWindowPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte, partitionIdx int) (proof.PoStProof, error)

func (*Manager) GenerateWinningPoSt

func (m *Manager) GenerateWinningPoSt(ctx context.Context, minerID abi.ActorID, sectorInfo []proof.ExtendedSectorInfo, randomness abi.PoStRandomness) ([]proof.PoStProof, error)

func (*Manager) GenerateWinningPoStWithVanilla

func (m *Manager) GenerateWinningPoStWithVanilla(ctx context.Context, proofType abi.RegisteredPoStProof, minerID abi.ActorID, randomness abi.PoStRandomness, proofs [][]byte) ([]proof.PoStProof, error)

func (*Manager) NewSector

func (m *Manager) NewSector(ctx context.Context, sector storiface.SectorRef) error

func (*Manager) ProveReplicaUpdate1

func (m *Manager) ProveReplicaUpdate1(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (out storiface.ReplicaVanillaProofs, err error)

func (*Manager) ProveReplicaUpdate2

func (m *Manager) ProveReplicaUpdate2(ctx context.Context, sector storiface.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storiface.ReplicaVanillaProofs) (out storiface.ReplicaUpdateProof, err error)

func (*Manager) RedeclareLocalStorage added in v1.17.1

func (m *Manager) RedeclareLocalStorage(ctx context.Context, id *storiface.ID, dropMissing bool) error

func (*Manager) ReleaseReplicaUpgrade

func (m *Manager) ReleaseReplicaUpgrade(ctx context.Context, sector storiface.SectorRef) error

func (*Manager) ReleaseSectorKey

func (m *Manager) ReleaseSectorKey(ctx context.Context, sector storiface.SectorRef) error

func (*Manager) ReleaseUnsealed

func (m *Manager) ReleaseUnsealed(ctx context.Context, sector storiface.SectorRef, keepUnsealed []storiface.Range) error

func (*Manager) Remove

func (m *Manager) Remove(ctx context.Context, sector storiface.SectorRef) error

func (*Manager) RemoveSchedRequest added in v1.17.1

func (m *Manager) RemoveSchedRequest(ctx context.Context, schedId uuid.UUID) error

func (*Manager) ReplicaUpdate

func (m *Manager) ReplicaUpdate(ctx context.Context, sector storiface.SectorRef, pieces []abi.PieceInfo) (out storiface.ReplicaUpdateOut, err error)

func (*Manager) ReturnAddPiece

func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error

func (*Manager) ReturnDataCid

func (m *Manager) ReturnDataCid(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error

func (*Manager) ReturnDownloadSector added in v1.17.2

func (m *Manager) ReturnDownloadSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error

func (*Manager) ReturnFetch

func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error

func (*Manager) ReturnFinalizeReplicaUpdate

func (m *Manager) ReturnFinalizeReplicaUpdate(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error

func (*Manager) ReturnFinalizeSector

func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error

func (*Manager) ReturnGenerateSectorKeyFromData

func (m *Manager) ReturnGenerateSectorKeyFromData(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error

func (*Manager) ReturnMoveStorage

func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error

func (*Manager) ReturnProveReplicaUpdate1

func (m *Manager) ReturnProveReplicaUpdate1(ctx context.Context, callID storiface.CallID, out storiface.ReplicaVanillaProofs, err *storiface.CallError) error

func (*Manager) ReturnProveReplicaUpdate2

func (m *Manager) ReturnProveReplicaUpdate2(ctx context.Context, callID storiface.CallID, proof storiface.ReplicaUpdateProof, err *storiface.CallError) error

func (*Manager) ReturnReadPiece

func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error

func (*Manager) ReturnReleaseUnsealed

func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error

func (*Manager) ReturnReplicaUpdate

func (m *Manager) ReturnReplicaUpdate(ctx context.Context, callID storiface.CallID, out storiface.ReplicaUpdateOut, err *storiface.CallError) error

func (*Manager) ReturnSealCommit1

func (m *Manager) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storiface.Commit1Out, err *storiface.CallError) error

func (*Manager) ReturnSealCommit2

func (m *Manager) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storiface.Proof, err *storiface.CallError) error

func (*Manager) ReturnSealPreCommit1

func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storiface.PreCommit1Out, err *storiface.CallError) error

func (*Manager) ReturnSealPreCommit2

func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storiface.SectorCids, err *storiface.CallError) error

func (*Manager) ReturnUnsealPiece

func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error

func (*Manager) SchedDiag

func (m *Manager) SchedDiag(ctx context.Context, doSched bool) (interface{}, error)

func (*Manager) SealCommit1

func (m *Manager) SealCommit1(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storiface.SectorCids) (out storiface.Commit1Out, err error)

func (*Manager) SealCommit2

func (m *Manager) SealCommit2(ctx context.Context, sector storiface.SectorRef, phase1Out storiface.Commit1Out) (out storiface.Proof, err error)

func (*Manager) SealPreCommit1

func (m *Manager) SealPreCommit1(ctx context.Context, sector storiface.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storiface.PreCommit1Out, err error)

func (*Manager) SealPreCommit2

func (m *Manager) SealPreCommit2(ctx context.Context, sector storiface.SectorRef, phase1Out storiface.PreCommit1Out) (out storiface.SectorCids, err error)

func (*Manager) SectorsUnsealPiece

func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storiface.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed *cid.Cid) error

SectorsUnsealPiece will Unseal the Sealed sector file for the given sector. It will schedule the Unsealing task on a worker that either already has the sealed sector files or has space in one of it's sealing scratch spaces to store them after fetching them from another worker. If the chosen worker already has the Unsealed sector file, we will NOT Unseal the sealed sector file again.

func (*Manager) ServeHTTP

func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*Manager) StorageLocal

func (m *Manager) StorageLocal(ctx context.Context) (map[storiface.ID]string, error)

func (*Manager) WorkerJobs

func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob

func (*Manager) WorkerStats

func (m *Manager) WorkerStats(ctx context.Context) map[uuid.UUID]storiface.WorkerStats

type ManagerStateStore

type ManagerStateStore *statestore.StateStore

type ManyBytes

type ManyBytes struct {
	// contains filtered or unexported fields
}

func (*ManyBytes) MarshalCBOR

func (t *ManyBytes) MarshalCBOR(w io.Writer) error

func (*ManyBytes) UnmarshalCBOR

func (t *ManyBytes) UnmarshalCBOR(r io.Reader) error

type PieceProvider

type PieceProvider interface {
	// ReadPiece is used to read an Unsealed piece at the given offset and of the given size from a Sector
	// pieceOffset + pieceSize specify piece bounds for unsealing (note: with SDR the entire sector will be unsealed by
	//  default in most cases, but this might matter with future PoRep)
	// startOffset is added to the pieceOffset to get the starting reader offset.
	// The number of bytes that can be read is pieceSize-startOffset
	ReadPiece(ctx context.Context, sector storiface.SectorRef, pieceOffset storiface.UnpaddedByteIndex, pieceSize abi.UnpaddedPieceSize, ticket abi.SealRandomness, unsealed cid.Cid) (storiface.Reader, bool, error)
	IsUnsealed(ctx context.Context, sector storiface.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error)
}

func NewPieceProvider

func NewPieceProvider(storage *paths.Remote, index paths.SectorIndex, uns Unsealer) PieceProvider

type PrepareAction added in v1.23.0

type PrepareAction struct {
	Action   WorkerAction
	PrepType sealtasks.TaskType
}

type RequestQueue

type RequestQueue []*WorkerRequest

func (RequestQueue) Len

func (q RequestQueue) Len() int

func (RequestQueue) Less

func (q RequestQueue) Less(i, j int) bool

func (*RequestQueue) Push

func (q *RequestQueue) Push(x *WorkerRequest)

func (*RequestQueue) Remove

func (q *RequestQueue) Remove(i int) *WorkerRequest

func (RequestQueue) Swap

func (q RequestQueue) Swap(i, j int)

type ReturnType

type ReturnType string
const (
	DataCid               ReturnType = "DataCid"
	AddPiece              ReturnType = "AddPiece"
	SealPreCommit1        ReturnType = "SealPreCommit1"
	SealPreCommit2        ReturnType = "SealPreCommit2"
	SealCommit1           ReturnType = "SealCommit1"
	SealCommit2           ReturnType = "SealCommit2"
	FinalizeSector        ReturnType = "FinalizeSector"
	FinalizeReplicaUpdate ReturnType = "FinalizeReplicaUpdate"
	ReplicaUpdate         ReturnType = "ReplicaUpdate"
	ProveReplicaUpdate1   ReturnType = "ProveReplicaUpdate1"
	ProveReplicaUpdate2   ReturnType = "ProveReplicaUpdate2"
	GenerateSectorKey     ReturnType = "GenerateSectorKey"
	ReleaseUnsealed       ReturnType = "ReleaseUnsealed"
	MoveStorage           ReturnType = "MoveStorage"
	UnsealPiece           ReturnType = "UnsealPiece"
	DownloadSector        ReturnType = "DownloadSector"
	Fetch                 ReturnType = "Fetch"
)

type SchedDiagInfo

type SchedDiagInfo struct {
	Requests    []SchedDiagRequestInfo
	OpenWindows []string
}

type SchedDiagRequestInfo

type SchedDiagRequestInfo struct {
	Sector   abi.SectorID
	TaskType sealtasks.TaskType
	Priority int
	SchedId  uuid.UUID
}

type SchedWindow

type SchedWindow struct {
	Allocated ActiveResources
	Todo      []*WorkerRequest
}

type SchedWindowRequest

type SchedWindowRequest struct {
	Worker storiface.WorkerID

	Done chan *SchedWindow
}

type SchedWorker added in v1.20.0

type SchedWorker interface {
	TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
	Paths(context.Context) ([]storiface.StoragePath, error)
	Utilization() float64
}

type Scheduler

type Scheduler struct {
	Workers map[storiface.WorkerID]*WorkerHandle

	// owned by the sh.runSched goroutine
	SchedQueue  *RequestQueue
	OpenWindows []*SchedWindowRequest
	// contains filtered or unexported fields
}

func (*Scheduler) Close

func (sh *Scheduler) Close(ctx context.Context) error

func (*Scheduler) Info

func (sh *Scheduler) Info(ctx context.Context) (interface{}, error)

func (*Scheduler) RemoveRequest added in v1.17.1

func (sh *Scheduler) RemoveRequest(ctx context.Context, schedId uuid.UUID) error

func (*Scheduler) Schedule

func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare PrepareAction, work WorkerAction) error

type StorageAuth

type StorageAuth http.Header

type Unsealer

type Unsealer interface {
	// SectorsUnsealPiece will Unseal a Sealed sector file for the given sector.
	SectorsUnsealPiece(ctx context.Context, sector storiface.SectorRef, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, commd *cid.Cid) error
}

type WindowSelector

type WindowSelector func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int

type WorkID

type WorkID struct {
	Method sealtasks.TaskType
	Params string // json [...params]
}

func (*WorkID) MarshalCBOR

func (t *WorkID) MarshalCBOR(w io.Writer) error

func (WorkID) String

func (w WorkID) String() string

func (*WorkID) UnmarshalCBOR

func (t *WorkID) UnmarshalCBOR(r io.Reader) (err error)

type WorkState

type WorkState struct {
	ID WorkID

	Status WorkStatus

	WorkerCall storiface.CallID // Set when entering wsRunning
	WorkError  string           // Status = wsDone, set when failed to start work

	WorkerHostname string // hostname of last worker handling this job
	StartTime      int64  // unix seconds
}

func (*WorkState) MarshalCBOR

func (t *WorkState) MarshalCBOR(w io.Writer) error

func (*WorkState) UnmarshalCBOR

func (t *WorkState) UnmarshalCBOR(r io.Reader) (err error)

type WorkStatus

type WorkStatus string

type Worker

type Worker interface {
	storiface.WorkerCalls

	TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)

	// Returns paths accessible to the worker
	Paths(context.Context) ([]storiface.StoragePath, error)

	Info(context.Context) (storiface.WorkerInfo, error)

	Session(context.Context) (uuid.UUID, error)

	Close() error // TODO: do we need this?
}

type WorkerAction

type WorkerAction func(ctx context.Context, w Worker) error

type WorkerConfig

type WorkerConfig struct {
	TaskTypes []sealtasks.TaskType
	NoSwap    bool

	// os.Hostname if not set
	Name string

	// IgnoreResourceFiltering enables task distribution to happen on this
	// worker regardless of its currently available resources. Used in testing
	// with the local worker.
	IgnoreResourceFiltering bool

	MaxParallelChallengeReads int           // 0 = no limit
	ChallengeReadTimeout      time.Duration // 0 = no timeout
}

type WorkerHandle

type WorkerHandle struct {
	Info storiface.WorkerInfo

	Enabled bool
	// contains filtered or unexported fields
}

func (*WorkerHandle) TaskCount added in v1.23.0

func (wh *WorkerHandle) TaskCount(tt *sealtasks.SealTaskType) int

func (*WorkerHandle) TaskCounts added in v1.23.0

func (wh *WorkerHandle) TaskCounts() int

func (*WorkerHandle) Utilization

func (wh *WorkerHandle) Utilization() float64

type WorkerRequest

type WorkerRequest struct {
	Sector   storiface.SectorRef
	TaskType sealtasks.TaskType
	Priority int // larger values more important
	Sel      WorkerSelector
	SchedId  uuid.UUID

	IndexHeap int

	Ctx context.Context
	// contains filtered or unexported fields
}

func (*WorkerRequest) PrepSealTask added in v1.23.0

func (r *WorkerRequest) PrepSealTask() sealtasks.SealTaskType

func (*WorkerRequest) SealTask

func (r *WorkerRequest) SealTask() sealtasks.SealTaskType

type WorkerSelector

type WorkerSelector interface {
	// Ok is true if worker is acceptable for performing a task.
	// If any worker is preferred for a task, other workers won't be considered for that task.
	Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a SchedWorker) (ok, preferred bool, err error)

	Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) // true if a is preferred over b
}

type WorkerStateStore

type WorkerStateStore *statestore.StateStore

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL