manager

package
v1.6.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2024 License: GPL-3.0 Imports: 42 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DiscontinueBucketReason defines the reason for stop serving
	DiscontinueBucketReason = "testnet cleanup"

	// DiscontinueBucketLimit define the max buckets to fetch in a single request
	DiscontinueBucketLimit = int64(500)

	// RejectUnSealObjectRetry defines the retry number of sending reject unseal object tx.
	RejectUnSealObjectRetry = 3

	// RejectUnSealObjectTimeout defines the timeout of sending reject unseal object tx.
	RejectUnSealObjectTimeout = 3

	// DefaultBackupTaskTimeout defines the timeout of backing up task for dispatching
	DefaultBackupTaskTimeout = 1

	// ShadowIntegrity
	DefaultGCStaleVersionLimit = 10
)
View Source
const (
	// DefaultGlobalMaxUploadingNumber defines the default max uploading object number
	// in SP, include: uploading object to primary, replicate object to secondaries,
	// and sealing object on greenfield.
	DefaultGlobalMaxUploadingNumber int = 40960
	// DefaultGlobalUploadObjectParallel defines the default max parallel uploading
	// objects to primary in SP system.
	DefaultGlobalUploadObjectParallel int = 10240
	// DefaultGlobalReplicatePieceParallel defines the default max parallel replicating
	// objects to primary in SP system.
	DefaultGlobalReplicatePieceParallel int = 10240
	// DefaultGlobalSealObjectParallel defines the default max parallel sealing objects
	// on greenfield in SP system.
	DefaultGlobalSealObjectParallel int = 10240
	// DefaultGlobalReceiveObjectParallel defines the default max parallel confirming
	// receive pieces on greenfield in SP system.
	DefaultGlobalReceiveObjectParallel int = 10240 * 10
	// DefaultGlobalBackupTaskParallel defines the default parallel backup tasks for
	// dispatching to task executor
	DefaultGlobalBackupTaskParallel int = 10240 * 100
	// DefaultGlobalGCObjectParallel defines the default max parallel gc objects in SP
	// system.
	DefaultGlobalGCObjectParallel int = 4
	// DefaultGlobalGCZombieParallel defines the default max parallel gc zombie pieces
	// in SP system.
	DefaultGlobalGCZombieParallel int = 1
	// DefaultGlobalGCMetaParallel defines the default max parallel gc meta db in SP
	// system.
	DefaultGlobalGCMetaParallel int = 1
	// DefaultGlobalGCStaleVersionObjectParallel defines the default max parallel gc stale version object in SP
	// system.
	DefaultGlobalGCStaleVersionObjectParallel int = 10
	// DefaultGlobalGCBucketMigrationParallel defines the default max parallel gc bucket migration in SP
	// system.
	DefaultGlobalGCBucketMigrationParallel int = 1
	// 	DefaultGlobalRecoveryPieceParallel defines the default max parallel recovery objects in SP
	// system.
	DefaultGlobalRecoveryPieceParallel int = 100
	// DefaultGlobalMigrateGVGParallel defines the default max parallel migrating gvg in SP system.
	DefaultGlobalMigrateGVGParallel int = 200
	// DefaultGlobalDownloadObjectTaskCacheSize defines the default max cache the download
	// object tasks in manager.
	DefaultGlobalDownloadObjectTaskCacheSize int = 4096
	// DefaultGlobalChallengePieceTaskCacheSize defines the default max cache the challenge
	// piece tasks in manager.
	DefaultGlobalChallengePieceTaskCacheSize int = 4096

	// DefaultGlobalBatchGCObjectTimeInterval defines the default interval for generating
	// gc object task.
	DefaultGlobalBatchGCObjectTimeInterval int = 1 * 60
	// DefaultGlobalGCObjectBlockInterval defines the default blocks number for getting
	// deleted objects.
	DefaultGlobalGCObjectBlockInterval uint64 = 1000
	// DefaultGlobalGCObjectSafeBlockDistance defines the default distance form current block
	// height to gc the deleted object.
	DefaultGlobalGCObjectSafeBlockDistance uint64 = 1000
	// DefaultGlobalGCZombiePieceTimeInterval defines the default interval for generating
	// gc zombie piece task.
	DefaultGlobalGCZombiePieceTimeInterval int = 10 * 60
	// DefaultGlobalGCZombiePieceObjectIDInterval defines the default object id number for getting
	// deleted zombie piece.
	DefaultGlobalGCZombiePieceObjectIDInterval uint64 = 100
	// DefaultGlobalGCZombieSafeObjectIDDistance defines the default distance form current object id
	// to gc the deleted zombie piece.
	DefaultGlobalGCZombieSafeObjectIDDistance uint64 = 1000
	// DefaultGlobalGCMetaTimeInterval defines the default interval for generating
	// gc meta task.
	DefaultGlobalGCMetaTimeInterval int = 10 * 60
	//	DefaultGlobalGCStaleVersionObjectInterval int = 10 * 60
	DefaultGlobalGCStaleVersionObjectInterval = 1 * 60
	// DefaultGlobalSyncConsensusInfoInterval defines the default interval for sync the sp
	// info list to sp db.
	DefaultGlobalSyncConsensusInfoInterval uint64 = 600
	// DefaultStatisticsOutputInterval defines the default interval for output statistics info,
	// it is used to log and debug.
	DefaultStatisticsOutputInterval int = 60
	// DefaultListenRejectUnSealTimeoutHeight defines the default listen reject unseal object
	// on greenfield timeout height, if after current block height + timeout height, the object
	// is not rejected, it is judged failed to reject unseal object on greenfield.
	DefaultListenRejectUnSealTimeoutHeight int = 10
	// DefaultSyncAvailableVGFInterval defines the default interval for available global virtual group family info,
	// it is used to log and debug.
	DefaultSyncAvailableVGFInterval int = 5 * 60

	// DefaultDiscontinueTimeInterval defines the default interval for starting discontinue
	// buckets task , used for test net.
	DefaultDiscontinueTimeInterval = 3 * 60
	// DefaultDiscontinueBucketKeepAliveDays defines the default bucket keep alive days, after
	// the interval, buckets will be discontinued, used for test net.
	DefaultDiscontinueBucketKeepAliveDays = 7

	// DefaultLoadReplicateTimeout defines the task timeout that load replicate tasks from sp db
	DefaultLoadReplicateTimeout int64 = 60
	// DefaultLoadSealTimeout defines the task timeout that load seal tasks from sp db
	DefaultLoadSealTimeout int64 = 180
	// DefaultSubscribeSPExitEventIntervalMillisecond define the default time interval to subscribe sp exit event from metadata.
	DefaultSubscribeSPExitEventIntervalMillisecond = 2000
	// DefaultSubscribeBucketMigrateEventIntervalMillisecond define the default time interval to subscribe bucket migrate event from metadata.
	DefaultSubscribeBucketMigrateEventIntervalMillisecond = 2000
	// DefaultSubscribeSwapOutEventIntervalMillisecond define the default time interval to subscribe gvg swap out event from metadata.
	DefaultSubscribeSwapOutEventIntervalMillisecond = 2000
	// DefaultGCExpiredOffChainAuthKeysTimeInterval define the default time interval to gc expired off chain auth keys
	DefaultGCExpiredOffChainAuthKeysTimeInterval = 24 * 3600
)
View Source
const (
	ManagerGCBlockNumber           = "manager_gc_block_number"
	ManagerSuccessUpload           = "manager_upload_object_success"
	ManagerFailureUpload           = "manager_upload_object_failure"
	ManagerSuccessReplicate        = "manager_replicate_object_success"
	ManagerFailureReplicate        = "manager_replicate_object_failure"
	ManagerCancelReplicate         = "manager_replicate_object_cancel"
	ManagerSuccessReplicateAndSeal = "manager_replicate_and_seal_object_success"
	ManagerFailureReplicateAndSeal = "manager_replicate_and_seal_object_failure"
	ManagerSuccessSeal             = "manager_seal_object_success"
	ManagerFailureSeal             = "manager_seal_object_failure"
	ManagerCancelSeal              = "manager_seal_object_cancel"
	ManagerSuccessConfirmReceive   = "manager_confirm_receive_success"
	ManagerFailureConfirmReceive   = "manager_confirm_receive_failure"
)
View Source
const (
	MaxRecoveryTime = 50
)
View Source
const (
	SigExpireTimeSecond = 60 * 60
)

Variables

View Source
var (
	ErrDanglingTask         = gfsperrors.Register(module.ManageModularName, http.StatusBadRequest, 60001, "OoooH... request lost")
	ErrRepeatedTask         = gfsperrors.Register(module.ManageModularName, http.StatusNotAcceptable, 60002, "request repeated")
	ErrExceedTask           = gfsperrors.Register(module.ManageModularName, http.StatusNotAcceptable, 60003, "OoooH... request exceed, try again later")
	ErrCanceledTask         = gfsperrors.Register(module.ManageModularName, http.StatusBadRequest, 60004, "task canceled")
	ErrFutureSupport        = gfsperrors.Register(module.ManageModularName, http.StatusNotFound, 60005, "future support")
	ErrNotifyMigrateSwapOut = gfsperrors.Register(module.ManageModularName, http.StatusNotAcceptable, 60006, "failed to notify swap out start")
)

Functions

func CheckGVGMetaConsistent added in v0.2.4

func CheckGVGMetaConsistent(chainMetaList []*virtualgrouptypes.GlobalVirtualGroup, dbMetaList []*spdb.MigrateGVGUnitMeta) bool

CheckGVGMetaConsistent verifies whether expectedSPGVGList completely matches with the migrateGVGUnitMeta loaded from the database.

func DefaultManagerOptions

func DefaultManagerOptions(manager *ManageModular, cfg *gfspconfig.GfSpConfig) (err error)

func ErrGfSpDBWithDetail added in v0.2.4

func ErrGfSpDBWithDetail(detail string) *gfsperrors.GfSpError

func GetSwapOutApprovalAndSendTx added in v0.2.3

func GetSwapOutApprovalAndSendTx(baseApp *gfspapp.GfSpBaseApp, destSP *sptypes.StorageProvider, originMsg *virtualgrouptypes.MsgSwapOut) (*virtualgrouptypes.MsgSwapOut, error)

GetSwapOutApprovalAndSendTx is used to get approval from dest sp and notify it to start migrate swap out's gvg tasks.

func MakeBucketMigrateKey added in v0.2.3

func MakeBucketMigrateKey(bucketID uint64, gvgID uint32) string

func MakeGVGMigrateKey added in v0.2.3

func MakeGVGMigrateKey(gvgID uint32, vgfID uint32, redundancyIndex int32) string

func NewManageModular

func NewManageModular(app *gfspapp.GfSpBaseApp, cfg *gfspconfig.GfSpConfig) (coremodule.Modular, error)

func SendAndConfirmCompleteMigrateBucketTx added in v1.3.0

func SendAndConfirmCompleteMigrateBucketTx(baseApp *gfspapp.GfSpBaseApp, msg *storagetypes.MsgCompleteMigrateBucket) error

func SendAndConfirmCompleteSPExitTx added in v0.2.4

func SendAndConfirmCompleteSPExitTx(baseApp *gfspapp.GfSpBaseApp, msg *virtualgrouptypes.MsgCompleteStorageProviderExit) error

func SendAndConfirmCompleteSwapInTx added in v1.3.0

func SendAndConfirmCompleteSwapInTx(baseApp *gfspapp.GfSpBaseApp, msg *types2.MsgCompleteSwapIn) error

func SendAndConfirmCompleteSwapOutTx added in v0.2.4

func SendAndConfirmCompleteSwapOutTx(baseApp *gfspapp.GfSpBaseApp, msg *virtualgrouptypes.MsgCompleteSwapOut) error

func SendAndConfirmTx added in v0.2.4

func SendAndConfirmTx(chainClient consensus.Consensus, sendTxFunc SendTxFunc) error

func UpdateBucketMigrationProgress added in v1.3.0

func UpdateBucketMigrationProgress(baseApp *gfspapp.GfSpBaseApp, bucketID uint64, migrateState storetypes.BucketMigrationState) error

Types

type BasicGVGMigrateExecuteUnit added in v0.2.4

type BasicGVGMigrateExecuteUnit struct {
	SrcGVG               *virtualgrouptypes.GlobalVirtualGroup
	SrcSP                *sptypes.StorageProvider
	DestSP               *sptypes.StorageProvider // self sp.
	MigrateStatus        MigrateStatus
	LastMigratedObjectID uint64
}

type BucketCache added in v0.2.4

type BucketCache struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

BucketCache is an LRU cache.

func NewBucketCache added in v0.2.4

func NewBucketCache(maxEntries int, expire time.Duration) *BucketCache

NewBucketCache creates a new Cache. If maxEntries is zero, the cache has no limit and it's assumed that eviction is done by the caller.

func (*BucketCache) Delete added in v0.2.4

func (c *BucketCache) Delete(key string)

Delete removes the provided key from the cache.

func (*BucketCache) Flush added in v0.2.4

func (c *BucketCache) Flush()

Flush empties the whole cache

func (*BucketCache) Get added in v0.2.4

func (c *BucketCache) Get(key string) (value interface{}, ok bool)

Get looks up a key's value from the cache.

func (*BucketCache) Len added in v0.2.4

func (c *BucketCache) Len() int

Len returns the number of items in the cache.

func (*BucketCache) Set added in v0.2.4

func (c *BucketCache) Set(key string, value interface{})

Set adds a value to the cache

type BucketMigrateExecutePlan added in v0.2.3

type BucketMigrateExecutePlan struct {
	// contains filtered or unexported fields
}

BucketMigrateExecutePlan is used to manage bucket migrate process.

func (*BucketMigrateExecutePlan) QueryMigrateGVG added in v1.1.0

func (plan *BucketMigrateExecutePlan) QueryMigrateGVG(migrateKey string) (*spdb.MigrateGVGUnitMeta, error)

QueryMigrateGVG Query migrate GVG unit

func (*BucketMigrateExecutePlan) Start added in v0.2.3

func (plan *BucketMigrateExecutePlan) Start() error

func (*BucketMigrateExecutePlan) UpdateMigrateGVGLastMigratedObjectID added in v0.2.3

func (plan *BucketMigrateExecutePlan) UpdateMigrateGVGLastMigratedObjectID(migrateKey string, lastMigratedObjectID uint64) error

UpdateMigrateGVGLastMigratedObjectID persistent user updates and periodic progress reporting by Executor

func (*BucketMigrateExecutePlan) UpdateMigrateGVGRetryCount added in v1.1.0

func (plan *BucketMigrateExecutePlan) UpdateMigrateGVGRetryCount(migrateKey string, retryTime int) error

type BucketMigrateGVGExecuteUnit added in v0.2.3

type BucketMigrateGVGExecuteUnit struct {
	BasicGVGMigrateExecuteUnit
	BucketID  uint64
	DestGVGID uint32
	DestGVG   *virtualgrouptypes.GlobalVirtualGroup
}

BucketMigrateGVGExecuteUnit is used to record bucket migrate gvg unit.

func (*BucketMigrateGVGExecuteUnit) Key added in v0.2.3

Key is used to as primary key.

type BucketMigrateScheduler added in v0.2.3

type BucketMigrateScheduler struct {
	// contains filtered or unexported fields
}

BucketMigrateScheduler subscribes bucket migrate events and produces a gvg migrate plan.

func NewBucketMigrateScheduler added in v0.2.3

func NewBucketMigrateScheduler(manager *ManageModular) (*BucketMigrateScheduler, error)

NewBucketMigrateScheduler returns a bucket migrate scheduler instance.

func (*BucketMigrateScheduler) Init added in v0.2.3

func (*BucketMigrateScheduler) PostMigrateBucket added in v1.1.0

func (s *BucketMigrateScheduler) PostMigrateBucket(postMsg *gfsptask.GfSpBucketMigrationInfo, srcSPInfo *sptypes.StorageProvider) error

PostMigrateBucket is used to pick a suitable gvg for replicating object.

func (*BucketMigrateScheduler) PreMigrateBucket added in v1.1.0

func (s *BucketMigrateScheduler) PreMigrateBucket(bucketID uint64, srcSPInfo *sptypes.StorageProvider) error

PreMigrateBucket is used to Dest SP notifies Src SP and pre-deducts quota.

func (*BucketMigrateScheduler) Start added in v0.2.3

func (s *BucketMigrateScheduler) Start() error

func (*BucketMigrateScheduler) UpdateBucketMigrationGCProgress added in v1.3.0

func (s *BucketMigrateScheduler) UpdateBucketMigrationGCProgress(ctx context.Context, gcBucketMigrationTask task.GCBucketMigrationTask) error

func (*BucketMigrateScheduler) UpdateMigrateProgress added in v0.2.3

func (s *BucketMigrateScheduler) UpdateMigrateProgress(task task.MigrateGVGTask) error

type DestSPTaskRunner added in v0.2.3

type DestSPTaskRunner struct {
	// contains filtered or unexported fields
}

DestSPTaskRunner is used to manage task migrate progress/status in dest sp.

func NewDestSPTaskRunner added in v0.2.3

func NewDestSPTaskRunner(m *ManageModular, v vgmgr.VirtualGroupManager) *DestSPTaskRunner

NewDestSPTaskRunner returns a dest sp task runner instance.

func (*DestSPTaskRunner) AddNewMigrateGVGUnit added in v0.2.3

func (runner *DestSPTaskRunner) AddNewMigrateGVGUnit(remotedGVGUnit *SPExitGVGExecuteUnit) error

AddNewMigrateGVGUnit is used to add new gvg task to task runner.

func (*DestSPTaskRunner) AddNewSwapOut added in v0.2.3

func (runner *DestSPTaskRunner) AddNewSwapOut(swapOut *virtualgrouptypes.MsgSwapOut) error

AddNewSwapOut is used to add new swap out to the task runner.

func (*DestSPTaskRunner) LoadFromDB added in v0.2.3

func (runner *DestSPTaskRunner) LoadFromDB() error

LoadFromDB is used to rebuild swap out status at start up.

func (*DestSPTaskRunner) Start added in v0.2.3

func (runner *DestSPTaskRunner) Start() error

Start is used to push task runner's gvg migrate to manager task dispatcher.

func (*DestSPTaskRunner) UpdateMigrateGVGLastMigratedObjectID added in v0.2.3

func (runner *DestSPTaskRunner) UpdateMigrateGVGLastMigratedObjectID(migrateKey string, lastMigratedObjectID uint64) error

UpdateMigrateGVGLastMigratedObjectID is used to update gvg task progress.

func (*DestSPTaskRunner) UpdateMigrateGVGStatus added in v0.2.3

func (runner *DestSPTaskRunner) UpdateMigrateGVGStatus(migrateKey string, st MigrateStatus) error

UpdateMigrateGVGStatus is used to update gvg task status.

type FamilyConflictChecker added in v0.2.3

type FamilyConflictChecker struct {
	// contains filtered or unexported fields
}

FamilyConflictChecker 1.Current virtual group and sp status

sp_list=[sp1,sp2,sp3,sp4,sp5,sp6,sp7,sp8]
family1 = {primary=sp1, gvg1=(sp1,sp2,sp3,sp4,sp5,sp6,sp7), gvg2=(sp1,sp2,sp3,sp4,sp5,sp6,sp8))}

2.Resolve conflict

sp1 exit, cannot pick a sp to replace sp1, there is a conflict.
resolveConflictGVGMigrateUnits = gvg1=(sp1,sp2,sp3,sp4,sp5,sp6,sp7)->gvg1=(sp1,sp2,sp3,sp4,sp5,sp6,sp8)

3.After resolve conflict

family1 = {primary=sp1, gvg1=(sp1,sp2,sp3,sp4,sp5,sp6,sp8), gvg2=(sp1,sp2,sp3,sp4,sp5,sp6,sp8))}

4.Primary migrate

pick dst_primary_sp=sp7, and migrate family:
family1 = {primary=sp7, gvg1=(sp1,sp2,sp3,sp4,sp5,sp6,sp8), gvg2=(sp7,sp2,sp3,sp4,sp5,sp6,sp8))}

5.Complete sp exit

sp1 complete sp exit.

func NewFamilyConflictChecker added in v0.2.3

NewFamilyConflictChecker returns a family conflicted checker instance.

func (*FamilyConflictChecker) GenerateSwapOutUnits added in v0.2.3

func (checker *FamilyConflictChecker) GenerateSwapOutUnits(buildMetaByDB bool) ([]*SwapOutUnit, error)

GenerateSwapOutUnits generate the family swap out units.

type GenerateGVGSecondarySPsPolicyByPrefer added in v0.2.3

type GenerateGVGSecondarySPsPolicyByPrefer struct {
	// contains filtered or unexported fields
}

func NewGenerateGVGSecondarySPsPolicyByPrefer added in v0.2.3

func NewGenerateGVGSecondarySPsPolicyByPrefer(p *storagetypes.Params, preferSPIDList []uint32) *GenerateGVGSecondarySPsPolicyByPrefer

func (*GenerateGVGSecondarySPsPolicyByPrefer) AddCandidateSP added in v0.2.3

func (p *GenerateGVGSecondarySPsPolicyByPrefer) AddCandidateSP(spID uint32)

func (*GenerateGVGSecondarySPsPolicyByPrefer) GenerateGVGSecondarySPs added in v0.2.3

func (p *GenerateGVGSecondarySPsPolicyByPrefer) GenerateGVGSecondarySPs() ([]uint32, error)

type ManageModular

type ManageModular struct {
	// contains filtered or unexported fields
}

func (*ManageModular) DispatchTask

func (m *ManageModular) DispatchTask(ctx context.Context, limit rcmgr.Limit) (task.Task, error)

func (*ManageModular) FilterGCTask

func (m *ManageModular) FilterGCTask(qTask task.Task) bool

func (*ManageModular) FilterGVGTask added in v0.2.4

func (m *ManageModular) FilterGVGTask(qTask task.Task) bool

func (*ManageModular) FilterReceiveTask added in v0.2.3

func (m *ManageModular) FilterReceiveTask(qTask task.Task) bool

func (*ManageModular) FilterUploadingTask

func (m *ManageModular) FilterUploadingTask(qTask task.Task) bool

func (*ManageModular) GCCacheQueue

func (m *ManageModular) GCCacheQueue(qTask task.Task) bool

func (*ManageModular) GCMigrateGVGQueue added in v0.2.3

func (m *ManageModular) GCMigrateGVGQueue(qTask task.Task) bool

func (*ManageModular) GCReceiveQueue

func (m *ManageModular) GCReceiveQueue(qTask task.Task) bool

func (*ManageModular) GCRecoverQueue added in v0.2.3

func (m *ManageModular) GCRecoverQueue(qTask task.Task) bool

func (*ManageModular) GCReplicatePieceQueue

func (m *ManageModular) GCReplicatePieceQueue(qTask task.Task) bool

func (*ManageModular) GCResumableUploadObjectQueue added in v0.2.3

func (m *ManageModular) GCResumableUploadObjectQueue(qTask task.Task) bool

func (*ManageModular) GCSealObjectQueue

func (m *ManageModular) GCSealObjectQueue(qTask task.Task) bool

func (*ManageModular) GCUploadObjectQueue

func (m *ManageModular) GCUploadObjectQueue(qTask task.Task) bool

func (*ManageModular) GenerateGCBucketMigrationTask added in v1.2.0

func (m *ManageModular) GenerateGCBucketMigrationTask(ctx context.Context, bucketID uint64)

func (*ManageModular) HandleChallengePieceTask

func (m *ManageModular) HandleChallengePieceTask(ctx context.Context, task task.ChallengePieceTask) error

func (*ManageModular) HandleCreateGCBucketMigrationTask added in v1.2.0

func (m *ManageModular) HandleCreateGCBucketMigrationTask(ctx context.Context, task task.GCBucketMigrationTask) error

func (*ManageModular) HandleCreateResumableUploadObjectTask added in v0.2.3

func (m *ManageModular) HandleCreateResumableUploadObjectTask(ctx context.Context, task task.ResumableUploadObjectTask) error

func (*ManageModular) HandleCreateUploadObjectTask

func (m *ManageModular) HandleCreateUploadObjectTask(ctx context.Context, task task.UploadObjectTask) error

func (*ManageModular) HandleDoneResumableUploadObjectTask added in v0.2.3

func (m *ManageModular) HandleDoneResumableUploadObjectTask(ctx context.Context, task task.ResumableUploadObjectTask) error

func (*ManageModular) HandleDoneUploadObjectTask

func (m *ManageModular) HandleDoneUploadObjectTask(ctx context.Context, task task.UploadObjectTask) error

func (*ManageModular) HandleDownloadObjectTask

func (m *ManageModular) HandleDownloadObjectTask(ctx context.Context, task task.DownloadObjectTask) error

func (*ManageModular) HandleGCBucketMigrationTask added in v1.2.0

func (m *ManageModular) HandleGCBucketMigrationTask(ctx context.Context, gcBucketMigrationTask task.GCBucketMigrationTask) error

func (*ManageModular) HandleGCMetaTask

func (m *ManageModular) HandleGCMetaTask(ctx context.Context, gcMetaTask task.GCMetaTask) error

func (*ManageModular) HandleGCObjectTask

func (m *ManageModular) HandleGCObjectTask(ctx context.Context, gcTask task.GCObjectTask) error

func (*ManageModular) HandleGCStaleVersionObjectTask added in v1.5.0

func (m *ManageModular) HandleGCStaleVersionObjectTask(ctx context.Context, gcStaleVersionObjectTask task.GCStaleVersionObjectTask) error

func (*ManageModular) HandleGCZombiePieceTask

func (m *ManageModular) HandleGCZombiePieceTask(ctx context.Context, gcZombiePieceTask task.GCZombiePieceTask) error

func (*ManageModular) HandleMigrateGVGTask added in v0.2.3

func (m *ManageModular) HandleMigrateGVGTask(ctx context.Context, task task.MigrateGVGTask) error

func (*ManageModular) HandleReceivePieceTask

func (m *ManageModular) HandleReceivePieceTask(ctx context.Context, task task.ReceivePieceTask) error

func (*ManageModular) HandleRecoverPieceTask added in v0.2.3

func (m *ManageModular) HandleRecoverPieceTask(ctx context.Context, task task.RecoveryPieceTask) error

func (*ManageModular) HandleReplicatePieceTask

func (m *ManageModular) HandleReplicatePieceTask(ctx context.Context, task task.ReplicatePieceTask) error

func (*ManageModular) HandleSealObjectTask

func (m *ManageModular) HandleSealObjectTask(ctx context.Context, task task.SealObjectTask) error

func (*ManageModular) LoadTaskFromDB

func (m *ManageModular) LoadTaskFromDB() error

func (*ManageModular) Name

func (m *ManageModular) Name() string

func (*ManageModular) NotifyMigrateSwapOut added in v0.2.3

func (m *ManageModular) NotifyMigrateSwapOut(ctx context.Context, swapOut *virtualgrouptypes.MsgSwapOut) error

NotifyMigrateSwapOut is used to receive migrate gvg task from src sp.

func (*ManageModular) NotifyPostMigrateBucketAndRecoupQuota added in v1.3.0

func (m *ManageModular) NotifyPostMigrateBucketAndRecoupQuota(ctx context.Context, bmInfo *gfsptask.GfSpBucketMigrationInfo) (*gfsptask.GfSpBucketQuotaInfo, error)

NotifyPostMigrateBucketAndRecoupQuota is used to notify src sp confirm that only one Post migrate bucket is allowed.

func (*ManageModular) NotifyPreMigrateBucketAndDeductQuota added in v1.3.0

func (m *ManageModular) NotifyPreMigrateBucketAndDeductQuota(ctx context.Context, bucketID uint64) (*gfsptask.GfSpBucketQuotaInfo, error)

NotifyPreMigrateBucketAndDeductQuota is used to notify record bucket is migrating

func (*ManageModular) PickUpTask

func (m *ManageModular) PickUpTask(ctx context.Context, tasks []task.Task) (task.Task, []task.Task)

func (*ManageModular) PickVirtualGroupFamily added in v0.2.3

func (m *ManageModular) PickVirtualGroupFamily(ctx context.Context, task task.ApprovalCreateBucketTask) (uint32, error)

PickVirtualGroupFamily is used to pick a suitable vgf for creating bucket.

func (*ManageModular) QueryBucketMigrate added in v0.2.3

func (m *ManageModular) QueryBucketMigrate(ctx context.Context) (res *gfspserver.GfSpQueryBucketMigrateResponse, err error)

func (*ManageModular) QueryBucketMigrationProgress added in v1.3.0

func (m *ManageModular) QueryBucketMigrationProgress(_ context.Context, bucketID uint64) (*gfspserver.MigrateBucketProgressMeta, error)

func (*ManageModular) QueryRecoverProcess added in v1.3.0

func (m *ManageModular) QueryRecoverProcess(ctx context.Context, vgfID, gvgID uint32) ([]*gfspserver.RecoverProcess, bool, error)

func (*ManageModular) QuerySpExit added in v0.2.3

func (m *ManageModular) QuerySpExit(ctx context.Context) (res *gfspserver.GfSpQuerySpExitResponse, err error)

func (*ManageModular) QueryTasks

func (m *ManageModular) QueryTasks(ctx context.Context, subKey task.TKey) ([]task.Task, error)

func (*ManageModular) QueryTasksStats added in v0.2.6

func (m *ManageModular) QueryTasksStats(_ context.Context) (uploadTasks int,
	replicateCount int,
	sealCount int,
	resumableUploadCount int,
	maxUploadCount int,
	migrateGVGCount int,
	recoveryProcessCount int,
	recoveryFailedList []string,
)

func (*ManageModular) RejectUnSealObject

func (m *ManageModular) RejectUnSealObject(ctx context.Context, object *storagetypes.ObjectInfo) error

func (*ManageModular) ReleaseResource

func (m *ManageModular) ReleaseResource(ctx context.Context, span rcmgr.ResourceScopeSpan)

func (*ManageModular) ReserveResource

func (m *ManageModular) ReserveResource(ctx context.Context, state *rcmgr.ScopeStat) (rcmgr.ResourceScopeSpan, error)

func (*ManageModular) ResetGCBucketMigrationQueue added in v1.2.0

func (m *ManageModular) ResetGCBucketMigrationQueue(qTask task.Task) bool

func (*ManageModular) ResetGCMetaTask added in v1.2.0

func (m *ManageModular) ResetGCMetaTask(qTask task.Task) bool

func (*ManageModular) ResetGCObjectTask

func (m *ManageModular) ResetGCObjectTask(qTask task.Task) bool

func (*ManageModular) ResetGCStaleVersionObjectQueue added in v1.5.0

func (m *ManageModular) ResetGCStaleVersionObjectQueue(qTask task.Task) bool

func (*ManageModular) ResetGCZombieTask added in v1.2.0

func (m *ManageModular) ResetGCZombieTask(qTask task.Task) bool

func (*ManageModular) ResetRecoveryFailedList added in v1.1.0

func (m *ManageModular) ResetRecoveryFailedList(_ context.Context) []string

func (*ManageModular) Start

func (m *ManageModular) Start(ctx context.Context) error

func (*ManageModular) Statistics

func (m *ManageModular) Statistics() string

func (*ManageModular) Stop

func (m *ManageModular) Stop(ctx context.Context) error

func (*ManageModular) TaskRecovering added in v0.2.3

func (m *ManageModular) TaskRecovering(ctx context.Context, task task.Task) bool

func (*ManageModular) TaskUploading

func (m *ManageModular) TaskUploading(ctx context.Context, task task.Task) bool

func (*ManageModular) TriggerRecoverForSuccessorSP added in v1.3.0

func (m *ManageModular) TriggerRecoverForSuccessorSP(ctx context.Context, vgfID, gvgID uint32, redundancyIndex int32) error

func (*ManageModular) UploadingObjectNumber

func (m *ManageModular) UploadingObjectNumber() int

type MigrateStatus added in v0.2.3

type MigrateStatus int32
var (
	WaitForMigrate MigrateStatus = 0
	Migrating      MigrateStatus = 1
	Migrated       MigrateStatus = 2
)

MigrateGVGTable status migrate: WaitForMigrate(created)->Migrating(schedule success)->Migrated(executor report success).

type NotifyDestSPIterator added in v0.2.3

type NotifyDestSPIterator struct {
	// contains filtered or unexported fields
}

NotifyDestSPIterator is used to notify/check migrate units to dest sp.

func NewNotifyDestSPIterator added in v0.2.3

func NewNotifyDestSPIterator(plan *SrcSPSwapOutPlan) *NotifyDestSPIterator

NewNotifyDestSPIterator returns notify dest sp iterator.

func (*NotifyDestSPIterator) Next added in v0.2.3

func (iter *NotifyDestSPIterator) Next()

func (*NotifyDestSPIterator) Valid added in v0.2.3

func (iter *NotifyDestSPIterator) Valid() bool

func (*NotifyDestSPIterator) Value added in v0.2.3

type ObjectSegmentsStats added in v1.3.0

type ObjectSegmentsStats struct {
	SegmentCount    int
	FailedSegments  vgmgr.IDSet
	SucceedSegments vgmgr.IDSet
}

ObjectSegmentsStats is used to record the object's segments piece recovery progress. used by the RecoverGVGScheduler

type ObjectsSegmentsStats added in v1.3.0

type ObjectsSegmentsStats struct {
	// contains filtered or unexported fields
}

func NewObjectsSegmentsStats added in v1.3.0

func NewObjectsSegmentsStats() *ObjectsSegmentsStats

type PickDestGVGFilter added in v0.2.3

type PickDestGVGFilter struct {
	// contains filtered or unexported fields
}

PickDestGVGFilter is used to pick dest gvg for bucket migrate.

func NewPickDestGVGFilter added in v0.2.3

func NewPickDestGVGFilter(familyID uint32, secondarySPIDs []uint32, minFreeSize uint64) *PickDestGVGFilter

func (*PickDestGVGFilter) CheckFamily added in v0.2.3

func (f *PickDestGVGFilter) CheckFamily(familyID uint32) bool

func (*PickDestGVGFilter) CheckGVG added in v0.2.3

func (f *PickDestGVGFilter) CheckGVG(gvgMeta *vgmgr.GlobalVirtualGroupMeta) bool

type PickDestSPFilter added in v0.2.3

type PickDestSPFilter struct {
	// contains filtered or unexported fields
}

PickDestSPFilter is used to pick sp id which is not in excluded sp ids, which is used by src sp to pick dest sp.

func NewPickDestSPFilterWithMap added in v0.2.3

func NewPickDestSPFilterWithMap(m map[uint32]int) *PickDestSPFilter

NewPickDestSPFilterWithMap returns a PickDestSPFilter instance.

func NewPickDestSPFilterWithSlice added in v0.2.3

func NewPickDestSPFilterWithSlice(s []uint32) *PickDestSPFilter

NewPickDestSPFilterWithSlice returns a PickDestSPFilter instance.

func (*PickDestSPFilter) Check added in v0.2.3

func (f *PickDestSPFilter) Check(spID uint32) bool

Check returns true when candidate sp meets the check condition.

type PrefetchFunc added in v1.2.0

type PrefetchFunc func(iter *TaskIterator) ([]*spdb.UploadObjectMeta, error)

type RecoverFailedObjectScheduler added in v1.3.0

type RecoverFailedObjectScheduler struct {
	// contains filtered or unexported fields
}

RecoverFailedObjectScheduler is used to scan the failed_object table for failed object entries, and retry the object recovery the entries are inserted from (1) RecoverGVGScheduler: Objects failed to recover. (2) VerifyGVGScheduler: Objects are found to be missed when re-verify by calling api ListObjectsInGVG that verifying the object existence by querying integrate

A GVG is marked as completed from Processed only when all objects pass the verification.

func NewRecoverFailedObjectScheduler added in v1.3.0

func NewRecoverFailedObjectScheduler(m *ManageModular, vgfID, gvgID uint32) *RecoverFailedObjectScheduler

func (*RecoverFailedObjectScheduler) Start added in v1.3.0

func (s *RecoverFailedObjectScheduler) Start()

type RecoverGVGScheduler added in v1.3.0

type RecoverGVGScheduler struct {
	// contains filtered or unexported fields
}

func NewRecoverGVGScheduler added in v1.3.0

func NewRecoverGVGScheduler(m *ManageModular, vgfID, gvgID uint32, redundancyIndex int32) (*RecoverGVGScheduler, error)

func (*RecoverGVGScheduler) Start added in v1.3.0

func (s *RecoverGVGScheduler) Start()

type RecoverVGFScheduler added in v1.3.0

type RecoverVGFScheduler struct {
	RecoverSchedulers []*RecoverGVGScheduler
	VerifySchedulers  []*VerifyGVGScheduler
	// contains filtered or unexported fields
}

func NewRecoverVGFScheduler added in v1.3.0

func NewRecoverVGFScheduler(m *ManageModular, vgfID uint32) (*RecoverVGFScheduler, error)

func (*RecoverVGFScheduler) Start added in v1.3.0

func (s *RecoverVGFScheduler) Start()

type RetryTaskType added in v1.2.0

type RetryTaskType int32

type SPConflictChecker added in v0.2.4

type SPConflictChecker struct {
	// contains filtered or unexported fields
}

func NewSPConflictChecker added in v0.2.4

func NewSPConflictChecker(p *BucketMigrateExecutePlan, src, dest *sptypes.StorageProvider, bucketID uint64) *SPConflictChecker

NewSPConflictChecker returns a SP conflicted checker instance.

func (*SPConflictChecker) GenerateMigrateBucketUnits added in v0.2.4

func (checker *SPConflictChecker) GenerateMigrateBucketUnits(buildMetaByDB bool) ([]*BucketMigrateGVGExecuteUnit, error)

GenerateMigrateBucketUnits generate the migrate bucket units.

type SPExitGVGExecuteUnit added in v0.2.3

type SPExitGVGExecuteUnit struct {
	BasicGVGMigrateExecuteUnit
	RedundancyIndex int32 // if < 0, represents migrate primary.
	SwapOutKey      string
}

SPExitGVGExecuteUnit is used to record sp exit gvg unit.

func (*SPExitGVGExecuteUnit) Key added in v0.2.3

func (u *SPExitGVGExecuteUnit) Key() string

Key is used to as primary key.

type SPExitScheduler added in v0.2.3

type SPExitScheduler struct {
	// contains filtered or unexported fields
}

SPExitScheduler is used to manage and schedule sp exit process.

func NewSPExitScheduler added in v0.2.3

func NewSPExitScheduler(manager *ManageModular) (*SPExitScheduler, error)

NewSPExitScheduler returns a sp exit scheduler instance.

func (*SPExitScheduler) AddSwapOutToTaskRunner added in v0.2.3

func (s *SPExitScheduler) AddSwapOutToTaskRunner(swapOut *virtualgrouptypes.MsgSwapOut) error

AddSwapOutToTaskRunner is used to swap out to task runner from src sp.

func (*SPExitScheduler) Init added in v0.2.3

func (s *SPExitScheduler) Init(m *ManageModular) error

Init is used to load db subscribe progress and migrate progress.

func (*SPExitScheduler) ListSPExitPlan added in v0.2.3

func (s *SPExitScheduler) ListSPExitPlan() (*gfspserver.GfSpQuerySpExitResponse, error)

ListSPExitPlan is used to query sp exit status.

func (*SPExitScheduler) Start added in v0.2.3

func (s *SPExitScheduler) Start() error

Start function is used to subscribe sp exit event from metadata and produces a gvg migrate plan.

func (*SPExitScheduler) UpdateMigrateProgress added in v0.2.3

func (s *SPExitScheduler) UpdateMigrateProgress(task task.MigrateGVGTask) error

UpdateMigrateProgress is used to update migrate status from task executor.

type SendTxFunc added in v0.2.4

type SendTxFunc func() (string, error)

type SrcSPSwapOutPlan added in v0.2.3

type SrcSPSwapOutPlan struct {
	// contains filtered or unexported fields
}

SrcSPSwapOutPlan is used to record the execution of swap out.

func NewSrcSPSwapOutPlan added in v0.2.3

NewSrcSPSwapOutPlan returns a src sp swap out plan.

func (*SrcSPSwapOutPlan) CheckAndSendCompleteSPExitTx added in v0.2.3

func (plan *SrcSPSwapOutPlan) CheckAndSendCompleteSPExitTx(event *virtualgrouptypes.EventCompleteSwapOut) error

CheckAndSendCompleteSPExitTx is used to update swap out status anc check whether all swap out are finished. Send sp exit tx when all swap out are completed.

func (*SrcSPSwapOutPlan) Start added in v0.2.3

func (plan *SrcSPSwapOutPlan) Start() error

Start persist plan and task to db and task dispatcher

type SwapOutUnit added in v0.2.3

type SwapOutUnit struct {
	// contains filtered or unexported fields
}

SwapOutUnit is used by swap out plan and task runner.

func (*SwapOutUnit) CheckAndSendCompleteSwapOutTx added in v0.2.3

func (s *SwapOutUnit) CheckAndSendCompleteSwapOutTx(gUnit *SPExitGVGExecuteUnit, runner *DestSPTaskRunner) error

CheckAndSendCompleteSwapOutTx check whether complete swap out's gvg, if all finish, send tx to chain.

type TaskIterator added in v1.2.0

type TaskIterator struct {
	// contains filtered or unexported fields
}

TaskIterator is used to load retry task from db.

func NewTaskIterator added in v1.2.0

func NewTaskIterator(db spdb.SPDB, taskType RetryTaskType, rejectUnsealThresholdSecond int64) *TaskIterator

func (*TaskIterator) Next added in v1.2.0

func (iter *TaskIterator) Next()

func (*TaskIterator) Valid added in v1.2.0

func (iter *TaskIterator) Valid() bool

func (*TaskIterator) Value added in v1.2.0

func (iter *TaskIterator) Value() *spdb.UploadObjectMeta

type TaskRetryScheduler added in v1.2.0

type TaskRetryScheduler struct {
	// contains filtered or unexported fields
}

TaskRetryScheduler is used to schedule background task retry.

func NewTaskRetryScheduler added in v1.2.0

func NewTaskRetryScheduler(m *ManageModular) *TaskRetryScheduler

NewTaskRetryScheduler returns a task retry scheduler instance.

func (*TaskRetryScheduler) Start added in v1.2.0

func (s *TaskRetryScheduler) Start()

Start is used to start the task retry scheduler.

type VerifyGVGScheduler added in v1.3.0

type VerifyGVGScheduler struct {
	// contains filtered or unexported fields
}

VerifyGVGScheduler Verify that objects in GVG are recovered successfully or not.

verifying the object existence by querying integrate and piece_hash. a recover GVG unit is marked as completed from Processed only when all objects pass the verification.

func NewVerifyGVGScheduler added in v1.3.0

func NewVerifyGVGScheduler(m *ManageModular, gvgID uint32, redundancyIndex int32) (*VerifyGVGScheduler, error)

func (*VerifyGVGScheduler) Start added in v1.3.0

func (s *VerifyGVGScheduler) Start()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL