assets

package
v0.1.18 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2024 License: MIT Imports: 36 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// FailedStates lists the string forms of the asset pull states that
	// represent a failure: SeedFailed, CandidatesFailed, EdgesFailed and
	// UploadFailed.
	FailedStates = []string{
		SeedFailed.String(),
		CandidatesFailed.String(),
		EdgesFailed.String(),
		UploadFailed.String(),
	}

	// PullingStates lists the string forms of the asset pull states in which
	// selection, pulling or uploading is still in progress.
	PullingStates = []string{
		SeedSelect.String(),
		SeedPulling.String(),
		UploadInit.String(),
		SeedUploading.String(),
		CandidatesSelect.String(),
		CandidatesPulling.String(),
		EdgesSelect.String(),
		EdgesPulling.String(),
	}

	// ActiveStates lists the string forms of the states treated as active:
	// Servicing and Stop, followed by every entry of FailedStates and then
	// every entry of PullingStates. Remove is not included.
	// The slice is built once when the package is initialized, so later
	// modifications of FailedStates or PullingStates are not reflected here.
	ActiveStates = append(append([]string{Servicing.String(), Stop.String()}, FailedStates...), PullingStates...)
)
View Source
var (
	// MinRetryTime defines the minimum time duration between retries.
	MinRetryTime = 1 * time.Minute

	// WaitTime is a fixed duration of five seconds.
	// NOTE(review): what is being waited for is not visible here — confirm
	// against the callers before relying on it.
	WaitTime = 5 * time.Second

	// MaxRetryCount defines the maximum number of retries allowed.
	MaxRetryCount = 1
)

Functions

This section is empty.

Types

type AssetForceState

// AssetForceState forces an asset state. State is the AssetState to force;
// Details and SeedNodeID are plain strings carried alongside it.
// NOTE(review): the Requester field is commented out, so it is not part of
// the struct.
type AssetForceState struct {
	State AssetState
	// Requester  string
	Details    string
	SeedNodeID string
}

AssetForceState forces an asset state

type AssetHash

type AssetHash string

AssetHash is an identifier for an asset.

func (AssetHash) String

func (c AssetHash) String() string

type AssetPullingInfo

// AssetPullingInfo represents asset pull information.
// It has CBOR (un)marshalling methods (MarshalCBOR / UnmarshalCBOR) and can
// be converted to a types.AssetRecord with ToAssetRecord.
// NOTE(review): the meaning and units of the individual counters (for example
// Bandwidth, EdgeWaitings, ReplenishReplicas) are not visible here — confirm
// against the state machine before relying on them.
type AssetPullingInfo struct {
	State             AssetState
	Hash              AssetHash
	CID               string
	Size              int64
	Blocks            int64
	EdgeReplicas      int64
	CandidateReplicas int64
	Bandwidth         int64

	EdgeReplicaSucceeds      []string
	CandidateReplicaSucceeds []string
	EdgeWaitings             int64
	CandidateWaitings        int64

	RetryCount        int64
	ReplenishReplicas int64

	Details    string
	SeedNodeID string

	Source AssetSource

	Note string
}

AssetPullingInfo represents asset pull information

func (*AssetPullingInfo) MarshalCBOR

func (t *AssetPullingInfo) MarshalCBOR(w io.Writer) error

func (*AssetPullingInfo) ToAssetRecord

func (state *AssetPullingInfo) ToAssetRecord() *types.AssetRecord

ToAssetRecord converts AssetPullingInfo to types.AssetRecord

func (*AssetPullingInfo) UnmarshalCBOR

func (t *AssetPullingInfo) UnmarshalCBOR(r io.Reader) (err error)

type AssetRePull

type AssetRePull struct{}

AssetRePull re-pulls the asset.

func (AssetRePull) Ignore added in v0.1.10

func (evt AssetRePull) Ignore()

type AssetSource added in v0.1.18

type AssetSource int64

AssetSource identifies the source of an asset: IPFS, AWS, storage or MinIO.

const (
	// AssetSourceIPFS marks IPFS as the asset source. It is the zero value (0).
	AssetSourceIPFS AssetSource = iota
	// AssetSourceAWS marks AWS as the asset source (1).
	AssetSourceAWS
	// AssetSourceStorage marks storage as the asset source (2).
	AssetSourceStorage
	// AssetSourceMinio marks MinIO as the asset source (3).
	AssetSourceMinio
)

type AssetState

type AssetState string

AssetState represents the state of an asset in the process of being pulled.

const (
	// SeedSelect: selecting the first candidate to pull the seed asset.
	SeedSelect AssetState = "SeedSelect"
	// SeedPulling: waiting for candidate nodes to pull the seed asset.
	SeedPulling AssetState = "SeedPulling"
	// UploadInit: initializing the preparation for a user upload.
	UploadInit AssetState = "UploadInit"
	// SeedUploading: waiting for the user to upload the asset to a candidate node.
	SeedUploading AssetState = "SeedUploading"
	// CandidatesSelect: selecting candidates to pull the asset.
	CandidatesSelect AssetState = "CandidatesSelect"
	// CandidatesPulling: candidate nodes are pulling the asset.
	CandidatesPulling AssetState = "CandidatesPulling"
	// EdgesSelect: selecting edges to pull the asset.
	EdgesSelect AssetState = "EdgesSelect"
	// EdgesPulling: edge nodes are pulling the asset.
	EdgesPulling AssetState = "EdgesPulling"
	// Servicing: asset caching is complete and the asset is in service.
	Servicing AssetState = "Servicing"
	// SeedFailed: unable to select candidate nodes, or failed to pull the seed asset.
	SeedFailed AssetState = "SeedFailed"
	// CandidatesFailed: unable to select candidate nodes, or failed to pull the asset.
	CandidatesFailed AssetState = "CandidatesFailed"
	// EdgesFailed: unable to select edge nodes, or failed to pull the asset.
	EdgesFailed AssetState = "EdgesFailed"
	// UploadFailed: the user failed to upload the asset.
	UploadFailed AssetState = "UploadFailed"
	// Remove is the "Remove" state.
	// NOTE(review): presumably the asset is being removed — confirm against the state machine.
	Remove AssetState = "Remove"
	// Stop is the "Stop" state.
	// NOTE(review): presumably pulling of the asset has been stopped — confirm against the state machine.
	Stop AssetState = "Stop"
)

Constants defining various states of the asset pulling process.

func (AssetState) String

func (s AssetState) String() string

String returns the string representation of the AssetState.

type Datastore

type Datastore struct {
	dtypes.ServerID
	// contains filtered or unexported fields
}

Datastore represents the asset datastore

func NewDatastore

func NewDatastore(db *db.SQLDB, serverID dtypes.ServerID) *Datastore

NewDatastore creates a new AssetDatastore

func (*Datastore) Batch

func (d *Datastore) Batch(ctx context.Context) (datastore.Batch, error)

Batch returns a datastore.Batch for the datastore.

func (*Datastore) Close

func (d *Datastore) Close() error

Close closes the asset datastore

func (*Datastore) Delete

func (d *Datastore) Delete(ctx context.Context, key datastore.Key) error

Delete deletes asset record info. (Nothing currently calls this function.)

func (*Datastore) Get

func (d *Datastore) Get(ctx context.Context, key datastore.Key) (value []byte, err error)

Get retrieves data from the datastore

func (*Datastore) GetSize

func (d *Datastore) GetSize(ctx context.Context, key datastore.Key) (size int, err error)

GetSize gets the data size from the datastore

func (*Datastore) Has

func (d *Datastore) Has(ctx context.Context, key datastore.Key) (exists bool, err error)

Has checks if the key exists in the datastore

func (*Datastore) Put

func (d *Datastore) Put(ctx context.Context, key datastore.Key, value []byte) error

Put updates asset record info.

func (*Datastore) Query

func (d *Datastore) Query(ctx context.Context, q query.Query) (query.Results, error)

Query queries asset records from the datastore

func (*Datastore) Sync

func (d *Datastore) Sync(ctx context.Context, prefix datastore.Key) error

Sync syncs the datastore for the given key prefix.

type Ignorable

// Ignorable is implemented by types that provide an Ignore method.
// Implementers visible in this package's documentation: AssetRePull,
// InfoUpdate, PullFailed, PullSucceed and PulledResult.
type Ignorable interface {
	// Ignore takes no arguments and returns nothing.
	// NOTE(review): presumably a marker method used by the state machine to
	// decide which events may be ignored — confirm against the Plan logic.
	Ignore()
}

Ignorable is implemented by types that provide an Ignore method.

type InfoUpdate

// InfoUpdate updates asset info. It carries two int64 values, Size and Blocks,
// and has an Ignore method.
type InfoUpdate struct {
	Size   int64
	Blocks int64
}

InfoUpdate updates asset info (size and block count).

func (InfoUpdate) Ignore added in v0.1.10

func (evt InfoUpdate) Ignore()

type Manager

type Manager struct {
	*db.SQLDB
	// contains filtered or unexported fields
}

Manager manages asset replicas

func NewManager

func NewManager(nodeManager *node.Manager, ds datastore.Batching, configFunc dtypes.GetSchedulerConfigFunc, sdb *db.SQLDB) *Manager

NewManager returns a new AssetManager instance

func (*Manager) AssetRemoveDone added in v0.1.11

func (m *Manager) AssetRemoveDone(key string)

AssetRemoveDone signals that the deletion of an asset has completed.

func (*Manager) CandidateDeactivate added in v0.1.11

func (m *Manager) CandidateDeactivate(nodeID string) error

CandidateDeactivate handles the deactivation of a candidate node by backing up its assets.

func (*Manager) CreateAssetPullTask

func (m *Manager) CreateAssetPullTask(info *types.PullAssetReq) error

CreateAssetPullTask creates a new asset pull task.

func (*Manager) CreateAssetUploadTask added in v0.1.10

func (m *Manager) CreateAssetUploadTask(hash string, req *types.CreateAssetReq) (*types.CreateAssetRsp, error)

CreateAssetUploadTask creates a new asset upload task.

func (*Manager) CreateBaseAsset added in v0.1.13

func (m *Manager) CreateBaseAsset(cid, nodeID string, size, replicas int64) error

func (*Manager) GenerateToken added in v0.1.10

func (m *Manager) GenerateToken(assetCID string, sources []*types.CandidateDownloadInfo, nodes map[string]*node.Node, size int64) (map[string][]*types.CandidateDownloadInfo, []*types.WorkloadRecord, error)

func (*Manager) GetAssetCount added in v0.1.10

func (m *Manager) GetAssetCount() (int, error)

func (*Manager) ListAssets

func (m *Manager) ListAssets() ([]AssetPullingInfo, error)

ListAssets loads asset pull infos from the state machine.

func (*Manager) Plan

func (m *Manager) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error)

Plan prepares a plan for asset pulling

func (*Manager) RandomAsset

func (m *Manager) RandomAsset(nodeID string, seed int64) (*cid.Cid, error)

RandomAsset gets one of a node's assets using a random seed.

func (*Manager) RemoveAsset

func (m *Manager) RemoveAsset(hash string, isWait bool) error

RemoveAsset removes an asset

func (*Manager) RemoveReplica

func (m *Manager) RemoveReplica(cid, hash, nodeID string) error

RemoveReplica removes an asset replica from a node.

func (*Manager) RestartPullAssets

func (m *Manager) RestartPullAssets(hashes []types.AssetHash) error

RestartPullAssets restarts asset pulls

func (*Manager) SaveTokenPayload added in v0.1.10

func (m *Manager) SaveTokenPayload(payloads []*types.WorkloadRecord) error

func (*Manager) Start

func (m *Manager) Start(ctx context.Context)

Start initializes and starts the asset state machine and associated tickers

func (*Manager) StartFillDiskTimer added in v0.1.13

func (m *Manager) StartFillDiskTimer()

func (*Manager) StopAsset added in v0.1.14

func (m *Manager) StopAsset(hashs []string) error

StopAsset stops the assets with the given hashes.

func (*Manager) StopFillDiskTimer added in v0.1.13

func (m *Manager) StopFillDiskTimer()

func (*Manager) Terminate

func (m *Manager) Terminate(ctx context.Context) error

Terminate stops the asset state machine

func (*Manager) UpdateAssetExpiration

func (m *Manager) UpdateAssetExpiration(cid string, t time.Time) error

UpdateAssetExpiration updates the asset expiration for a given CID

func (*Manager) UpdateFillAssetResponseCount added in v0.1.14

func (m *Manager) UpdateFillAssetResponseCount(bucket, cid, nodeID string)

func (*Manager) WaitAssetRemove added in v0.1.11

func (m *Manager) WaitAssetRemove(key string) *sync.WaitGroup

WaitAssetRemove returns a sync.WaitGroup for waiting until the state machine has deleted an asset.

type NodePulledResult

// NodePulledResult represents a result of a node pulling assets. It records
// the node's ID together with a status code, a block count and a size, and
// has CBOR (un)marshalling methods (MarshalCBOR / UnmarshalCBOR).
// NOTE(review): the meaning of the Status values is not visible here.
type NodePulledResult struct {
	Status      int64
	BlocksCount int64
	Size        int64
	NodeID      string
}

NodePulledResult represents a result of a node pulling assets

func (*NodePulledResult) MarshalCBOR

func (t *NodePulledResult) MarshalCBOR(w io.Writer) error

func (*NodePulledResult) UnmarshalCBOR

func (t *NodePulledResult) UnmarshalCBOR(r io.Reader) (err error)

type PullAssetFatalError

type PullAssetFatalError struct {
	// contains filtered or unexported fields
}

PullAssetFatalError represents a fatal error in asset pulling

func (PullAssetFatalError) FormatError

func (evt PullAssetFatalError) FormatError(xerrors.Printer) (next error)

FormatError formats the error.

type PullAssetRestart

type PullAssetRestart struct{}

PullAssetRestart restarts asset pulling

type PullFailed

type PullFailed struct {
	// contains filtered or unexported fields
}

PullFailed indicates that a node has failed to pull an asset

func (PullFailed) FormatError

func (evt PullFailed) FormatError(xerrors.Printer) (next error)

FormatError formats the error.

func (PullFailed) Ignore added in v0.1.10

func (evt PullFailed) Ignore()

type PullRequestSent

type PullRequestSent struct{}

PullRequestSent indicates that a pull request has been sent

type PullSucceed

type PullSucceed struct{}

PullSucceed indicates that a node has successfully pulled an asset

func (PullSucceed) Ignore added in v0.1.10

func (evt PullSucceed) Ignore()

type PulledResult

// PulledResult represents the result of node pulling. It carries two int64
// values, BlocksCount and Size, and has an Ignore method.
type PulledResult struct {
	BlocksCount int64
	Size        int64
}

PulledResult represents the result of node pulling

func (PulledResult) Ignore added in v0.1.10

func (evt PulledResult) Ignore()

type SelectFailed

type SelectFailed struct {
	// contains filtered or unexported fields
}

SelectFailed indicates that node selection has failed

func (SelectFailed) FormatError

func (evt SelectFailed) FormatError(xerrors.Printer) (next error)

FormatError formats the error.

type SkipStep

type SkipStep struct{}

SkipStep skips the current step

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL