assets

package
v0.1.21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 31, 2024 License: MIT Imports: 36 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// FailedStates contains a list of asset pull states that represent failures.
	FailedStates = []string{
		SeedFailed.String(),
		CandidatesFailed.String(),
		EdgesFailed.String(),
		UploadFailed.String(),
	}

	// PullingStates contains a list of asset pull states that represent pulling.
	PullingStates = []string{
		SeedSelect.String(),
		SeedSync.String(),
		SeedPulling.String(),
		UploadInit.String(),
		SeedUploading.String(),
		CandidatesSelect.String(),
		CandidatesPulling.String(),
		EdgesSelect.String(),
		EdgesPulling.String(),
	}

	// ActiveStates contains a list of asset pull states that represent active.
	ActiveStates = append(append([]string{Servicing.String(), Stop.String()}, FailedStates...), PullingStates...)
)

Functions

This section is empty.

Types

type AssetForceState

type AssetForceState struct {
	State AssetState
	// Requester  string
	Details        string
	SeedNodeIDs    []string
	DownloadSource *SourceDownloadInfo
}

AssetForceState forces an asset state

type AssetHash

type AssetHash string

AssetHash is an identifier for an asset.

func (AssetHash) String

func (c AssetHash) String() string

type AssetPullingInfo

type AssetPullingInfo struct {
	State             AssetState
	Hash              AssetHash
	CID               string
	Size              int64
	Blocks            int64
	EdgeReplicas      int64
	CandidateReplicas int64
	Bandwidth         int64

	EdgeReplicaSucceeds      []string
	CandidateReplicaSucceeds []string
	EdgeWaitings             int64
	CandidateWaitings        int64

	RetryCount        int64
	ReplenishReplicas int64

	Details     string
	SeedNodeIDs []string

	Source         AssetSource
	DownloadSource *SourceDownloadInfo

	Note string
}

AssetPullingInfo represents asset pull information

func (*AssetPullingInfo) MarshalCBOR

func (t *AssetPullingInfo) MarshalCBOR(w io.Writer) error

func (*AssetPullingInfo) ToAssetRecord

func (state *AssetPullingInfo) ToAssetRecord() *types.AssetRecord

ToAssetRecord converts AssetPullingInfo to types.AssetRecord

func (*AssetPullingInfo) UnmarshalCBOR

func (t *AssetPullingInfo) UnmarshalCBOR(r io.Reader) (err error)

type AssetRePull

type AssetRePull struct{}

AssetRePull re-pulls the asset.

func (AssetRePull) Ignore added in v0.1.10

func (evt AssetRePull) Ignore()

Ignore makes AssetRePull satisfy the Ignorable interface.

type AssetSource added in v0.1.18

type AssetSource int64

AssetSource identifies where an asset comes from: IPFS, AWS, user storage, or Minio.

const (
	// AssetSourceIPFS from ipfs
	AssetSourceIPFS AssetSource = iota
	// AssetSourceAWS from aws
	AssetSourceAWS
	// AssetSourceStorage from user
	AssetSourceStorage
	// AssetSourceMinio minio
	AssetSourceMinio
)

type AssetState

type AssetState string

AssetState represents the state of an asset in the process of being pulled.

const (
	// SeedSelect select first candidate to pull seed asset
	SeedSelect AssetState = "SeedSelect"
	// SeedSync sync seed from other scheduler
	SeedSync AssetState = "SeedSync"
	// SeedPulling Waiting for candidate nodes to pull seed asset
	SeedPulling AssetState = "SeedPulling"
	// UploadInit Initialize user upload preparation
	UploadInit AssetState = "UploadInit"
	// SeedUploading Waiting for user to upload asset to candidate node
	SeedUploading AssetState = "SeedUploading"
	// CandidatesSelect select candidates to pull asset
	CandidatesSelect AssetState = "CandidatesSelect"
	// CandidatesPulling candidate nodes pulling asset
	CandidatesPulling AssetState = "CandidatesPulling"
	// EdgesSelect select edges to pull asset
	EdgesSelect AssetState = "EdgesSelect"
	// EdgesPulling edge nodes pulling asset
	EdgesPulling AssetState = "EdgesPulling"
	// Servicing Asset cache completed and in service
	Servicing AssetState = "Servicing"
	// SyncFailed Unable to select candidate nodes or failed to pull seed asset
	SyncFailed AssetState = "SyncFailed"
	// SeedFailed Unable to select candidate nodes or failed to pull seed asset
	SeedFailed AssetState = "SeedFailed"
	// CandidatesFailed Unable to select candidate nodes or failed to pull asset
	CandidatesFailed AssetState = "CandidatesFailed"
	// EdgesFailed  Unable to select edge nodes or failed to pull asset
	EdgesFailed AssetState = "EdgesFailed"
	// UploadFailed User failed to upload assets
	UploadFailed AssetState = "UploadFailed"
	// Remove remove
	Remove AssetState = "Remove"
	// Stop Stop
	Stop AssetState = "Stop"
)

Constants defining various states of the asset pulling process.

func (AssetState) String

func (s AssetState) String() string

String returns the string representation of the AssetState.

type Datastore

type Datastore struct {
	dtypes.ServerID
	// contains filtered or unexported fields
}

Datastore represents the asset datastore

func NewDatastore

func NewDatastore(db *db.SQLDB, serverID dtypes.ServerID) *Datastore

NewDatastore creates a new Datastore.

func (*Datastore) Batch

func (d *Datastore) Batch(ctx context.Context) (datastore.Batch, error)

Batch returns a datastore.Batch, or an error.

func (*Datastore) Close

func (d *Datastore) Close() error

Close closes the asset datastore

func (*Datastore) Delete

func (d *Datastore) Delete(ctx context.Context, key datastore.Key) error

Delete deletes asset record info. (Nothing currently calls this function.)

func (*Datastore) Get

func (d *Datastore) Get(ctx context.Context, key datastore.Key) (value []byte, err error)

Get retrieves data from the datastore

func (*Datastore) GetSize

func (d *Datastore) GetSize(ctx context.Context, key datastore.Key) (size int, err error)

GetSize gets the data size from the datastore

func (*Datastore) Has

func (d *Datastore) Has(ctx context.Context, key datastore.Key) (exists bool, err error)

Has checks if the key exists in the datastore

func (*Datastore) Put

func (d *Datastore) Put(ctx context.Context, key datastore.Key, value []byte) error

Put updates asset record info.

func (*Datastore) Query

func (d *Datastore) Query(ctx context.Context, q query.Query) (query.Results, error)

Query queries asset records from the datastore

func (*Datastore) Sync

func (d *Datastore) Sync(ctx context.Context, prefix datastore.Key) error

Sync sync

type Ignorable

type Ignorable interface {
	Ignore()
}

Ignorable is implemented by types that provide an Ignore method.

type InfoUpdate

type InfoUpdate struct {
	Size   int64
	Blocks int64
}

InfoUpdate updates asset info (size and block count).

func (InfoUpdate) Ignore added in v0.1.10

func (evt InfoUpdate) Ignore()

Ignore makes InfoUpdate satisfy the Ignorable interface.

type Manager

type Manager struct {
	*db.SQLDB
	// contains filtered or unexported fields
}

Manager manages asset replicas

func NewManager

func NewManager(nodeManager *node.Manager, ds datastore.Batching, configFunc dtypes.GetSchedulerConfigFunc, sdb *db.SQLDB, wMgr *workload.Manager) *Manager

NewManager returns a new Manager instance.

func (*Manager) AssetRemoveDone added in v0.1.11

func (m *Manager) AssetRemoveDone(key string)

AssetRemoveDone Deletion of assets completed

func (*Manager) CandidateDeactivate added in v0.1.11

func (m *Manager) CandidateDeactivate(nodeID string) error

CandidateDeactivate handles the deactivation of a candidate node by backing up its assets.

func (*Manager) CreateAssetPullTask

func (m *Manager) CreateAssetPullTask(info *types.PullAssetReq) error

CreateAssetPullTask creates a new asset pull task.

func (*Manager) CreateAssetUploadTask added in v0.1.10

func (m *Manager) CreateAssetUploadTask(hash string, req *types.CreateAssetReq) (*types.UploadInfo, error)

CreateAssetUploadTask creates a new asset upload task.

func (*Manager) CreateSyncAssetTask added in v0.1.21

func (m *Manager) CreateSyncAssetTask(hash string, req *types.CreateSyncAssetReq) error

CreateSyncAssetTask creates a task that synchronizes an asset from other schedulers.

func (*Manager) GenerateTokenForDownloadSource added in v0.1.21

func (m *Manager) GenerateTokenForDownloadSource(nodeID, cid string) (*types.SourceDownloadInfo, error)

GenerateTokenForDownloadSource generates a token for a download source.

func (*Manager) GetAssetCount added in v0.1.10

func (m *Manager) GetAssetCount() (int, error)

GetAssetCount returns the asset count.

func (*Manager) ListAssets

func (m *Manager) ListAssets() ([]AssetPullingInfo, error)

ListAssets loads asset pull infos from the state machine.

func (*Manager) Plan

func (m *Manager) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error)

Plan prepares a plan for asset pulling

func (*Manager) RandomAsset

func (m *Manager) RandomAsset(nodeID string, seed int64) (*cid.Cid, error)

RandomAsset gets one of a node's assets using a random seed.

func (*Manager) RemoveAsset

func (m *Manager) RemoveAsset(hash string, isWait bool) error

RemoveAsset removes an asset

func (*Manager) RemoveReplica

func (m *Manager) RemoveReplica(cid, hash, nodeID string) error

RemoveReplica removes a replica for a node.

func (*Manager) RestartPullAssets

func (m *Manager) RestartPullAssets(hashes []types.AssetHash) error

RestartPullAssets restarts asset pulls

func (*Manager) Start

func (m *Manager) Start(ctx context.Context)

Start initializes and starts the asset state machine and associated tickers

func (*Manager) StartFillDiskTimer added in v0.1.13

func (m *Manager) StartFillDiskTimer()

StartFillDiskTimer starts the fill-disk timer.

func (*Manager) StopAsset added in v0.1.14

func (m *Manager) StopAsset(hashs []string) error

StopAsset stops the assets with the given hashes.

func (*Manager) StopFillDiskTimer added in v0.1.13

func (m *Manager) StopFillDiskTimer()

StopFillDiskTimer stops the fill-disk timer.

func (*Manager) Terminate

func (m *Manager) Terminate(ctx context.Context) error

Terminate stops the asset state machine

func (*Manager) UpdateAssetExpiration

func (m *Manager) UpdateAssetExpiration(cid string, t time.Time) error

UpdateAssetExpiration updates the asset expiration for a given CID

func (*Manager) UpdateFillAssetResponseCount added in v0.1.14

func (m *Manager) UpdateFillAssetResponseCount(bucket, cid, nodeID string, size int64)

UpdateFillAssetResponseCount updates the pull result from AWS.

func (*Manager) WaitAssetRemove added in v0.1.11

func (m *Manager) WaitAssetRemove(key string) *sync.WaitGroup

WaitAssetRemove returns a *sync.WaitGroup used to wait for the state machine to delete an asset.

type NodePulledResult

type NodePulledResult struct {
	Status      int64
	BlocksCount int64
	Size        int64
	NodeID      string
}

NodePulledResult represents a result of a node pulling assets

func (*NodePulledResult) MarshalCBOR

func (t *NodePulledResult) MarshalCBOR(w io.Writer) error

func (*NodePulledResult) UnmarshalCBOR

func (t *NodePulledResult) UnmarshalCBOR(r io.Reader) (err error)

type PullAssetFatalError

type PullAssetFatalError struct {
	// contains filtered or unexported fields
}

PullAssetFatalError represents a fatal error in asset pulling

func (PullAssetFatalError) FormatError

func (evt PullAssetFatalError) FormatError(xerrors.Printer) (next error)

FormatError formats the error.

type PullAssetRestart

type PullAssetRestart struct{}

PullAssetRestart restarts asset pulling

type PullFailed

type PullFailed struct {
	// contains filtered or unexported fields
}

PullFailed indicates that a node has failed to pull an asset

func (PullFailed) FormatError

func (evt PullFailed) FormatError(xerrors.Printer) (next error)

FormatError formats the error.

func (PullFailed) Ignore added in v0.1.10

func (evt PullFailed) Ignore()

Ignore makes PullFailed satisfy the Ignorable interface.

type PullRequestSent

type PullRequestSent struct{}

PullRequestSent indicates that a pull request has been sent

type PullSucceed

type PullSucceed struct{}

PullSucceed indicates that a node has successfully pulled an asset

func (PullSucceed) Ignore added in v0.1.10

func (evt PullSucceed) Ignore()

Ignore makes PullSucceed satisfy the Ignorable interface.

type PulledResult

type PulledResult struct {
	BlocksCount int64
	Size        int64
}

PulledResult represents the result of node pulling

func (PulledResult) Ignore added in v0.1.10

func (evt PulledResult) Ignore()

Ignore makes PulledResult satisfy the Ignorable interface.

type SelectFailed

type SelectFailed struct {
	// contains filtered or unexported fields
}

SelectFailed indicates that node selection has failed

func (SelectFailed) FormatError

func (evt SelectFailed) FormatError(xerrors.Printer) (next error)

FormatError formats the error.

type SkipStep

type SkipStep struct{}

SkipStep skips the current step

type SourceDownloadInfo added in v0.1.21

type SourceDownloadInfo struct {
	NodeID  string
	Address string
	Tk      *Token
}

SourceDownloadInfo describes a download source.

func (*SourceDownloadInfo) MarshalCBOR added in v0.1.21

func (t *SourceDownloadInfo) MarshalCBOR(w io.Writer) error

func (*SourceDownloadInfo) ToSourceDownloadInfo added in v0.1.21

func (state *SourceDownloadInfo) ToSourceDownloadInfo() *types.SourceDownloadInfo

ToSourceDownloadInfo converts SourceDownloadInfo to types.SourceDownloadInfo

func (*SourceDownloadInfo) UnmarshalCBOR added in v0.1.21

func (t *SourceDownloadInfo) UnmarshalCBOR(r io.Reader) (err error)

type Token added in v0.1.21

type Token struct {
	ID string
	// CipherText encrypted TokenPayload by public key
	CipherText string
	// Sign signs CipherText by scheduler private key
	Sign string
}

Token grants access to download an asset.

func (*Token) MarshalCBOR added in v0.1.21

func (t *Token) MarshalCBOR(w io.Writer) error

func (*Token) UnmarshalCBOR added in v0.1.21

func (t *Token) UnmarshalCBOR(r io.Reader) (err error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL