Documentation ¶
Index ¶
- Constants
- Variables
- func CheckGVGMetaConsistent(chainMetaList []*virtualgrouptypes.GlobalVirtualGroup, ...) bool
- func DefaultManagerOptions(manager *ManageModular, cfg *gfspconfig.GfSpConfig) (err error)
- func ErrGfSpDBWithDetail(detail string) *gfsperrors.GfSpError
- func GetSwapOutApprovalAndSendTx(baseApp *gfspapp.GfSpBaseApp, destSP *sptypes.StorageProvider, ...) (*virtualgrouptypes.MsgSwapOut, error)
- func MakeBucketMigrateKey(bucketID uint64, gvgID uint32) string
- func MakeGVGMigrateKey(gvgID uint32, vgfID uint32, redundancyIndex int32) string
- func NewManageModular(app *gfspapp.GfSpBaseApp, cfg *gfspconfig.GfSpConfig) (coremodule.Modular, error)
- func SendAndConfirmCompleteSPExitTx(baseApp *gfspapp.GfSpBaseApp, ...) error
- func SendAndConfirmCompleteSwapOutTx(baseApp *gfspapp.GfSpBaseApp, msg *virtualgrouptypes.MsgCompleteSwapOut) error
- func SendAndConfirmTx(chainClient consensus.Consensus, sendTxFunc SendTxFunc) error
- type BasicGVGMigrateExecuteUnit
- type BucketCache
- type BucketMigrateExecutePlan
- func (plan *BucketMigrateExecutePlan) QueryMigrateGVG(migrateKey string) (*spdb.MigrateGVGUnitMeta, error)
- func (plan *BucketMigrateExecutePlan) Start() error
- func (plan *BucketMigrateExecutePlan) UpdateMigrateGVGLastMigratedObjectID(migrateKey string, lastMigratedObjectID uint64) error
- func (plan *BucketMigrateExecutePlan) UpdateMigrateGVGRetryCount(migrateKey string, retryTime int) error
- type BucketMigrateGVGExecuteUnit
- type BucketMigrateScheduler
- func (s *BucketMigrateScheduler) Init(m *ManageModular) error
- func (s *BucketMigrateScheduler) PostMigrateBucket(postMsg *gfsptask.GfSpBucketMigrationInfo, srcSPInfo *sptypes.StorageProvider) error
- func (s *BucketMigrateScheduler) PreMigrateBucket(bucketID uint64, srcSPInfo *sptypes.StorageProvider) error
- func (s *BucketMigrateScheduler) Start() error
- func (s *BucketMigrateScheduler) UpdateMigrateProgress(task task.MigrateGVGTask) error
- type DestSPTaskRunner
- func (runner *DestSPTaskRunner) AddNewMigrateGVGUnit(remotedGVGUnit *SPExitGVGExecuteUnit) error
- func (runner *DestSPTaskRunner) AddNewSwapOut(swapOut *virtualgrouptypes.MsgSwapOut) error
- func (runner *DestSPTaskRunner) LoadFromDB() error
- func (runner *DestSPTaskRunner) Start() error
- func (runner *DestSPTaskRunner) UpdateMigrateGVGLastMigratedObjectID(migrateKey string, lastMigratedObjectID uint64) error
- func (runner *DestSPTaskRunner) UpdateMigrateGVGStatus(migrateKey string, st MigrateStatus) error
- type FamilyConflictChecker
- type GenerateGVGSecondarySPsPolicyByPrefer
- type ManageModular
- func (m *ManageModular) DispatchTask(ctx context.Context, limit rcmgr.Limit) (task.Task, error)
- func (m *ManageModular) FilterGCTask(qTask task.Task) bool
- func (m *ManageModular) FilterGVGTask(qTask task.Task) bool
- func (m *ManageModular) FilterReceiveTask(qTask task.Task) bool
- func (m *ManageModular) FilterUploadingTask(qTask task.Task) bool
- func (m *ManageModular) GCCacheQueue(qTask task.Task) bool
- func (m *ManageModular) GCMigrateGVGQueue(qTask task.Task) bool
- func (m *ManageModular) GCReceiveQueue(qTask task.Task) bool
- func (m *ManageModular) GCRecoverQueue(qTask task.Task) bool
- func (m *ManageModular) GCReplicatePieceQueue(qTask task.Task) bool
- func (m *ManageModular) GCResumableUploadObjectQueue(qTask task.Task) bool
- func (m *ManageModular) GCSealObjectQueue(qTask task.Task) bool
- func (m *ManageModular) GCUploadObjectQueue(qTask task.Task) bool
- func (m *ManageModular) GenerateGCBucketMigrationTask(ctx context.Context, bucketID, bucketSize uint64)
- func (m *ManageModular) HandleChallengePieceTask(ctx context.Context, task task.ChallengePieceTask) error
- func (m *ManageModular) HandleCreateGCBucketMigrationTask(ctx context.Context, task task.GCBucketMigrationTask) error
- func (m *ManageModular) HandleCreateResumableUploadObjectTask(ctx context.Context, task task.ResumableUploadObjectTask) error
- func (m *ManageModular) HandleCreateUploadObjectTask(ctx context.Context, task task.UploadObjectTask) error
- func (m *ManageModular) HandleDoneResumableUploadObjectTask(ctx context.Context, task task.ResumableUploadObjectTask) error
- func (m *ManageModular) HandleDoneUploadObjectTask(ctx context.Context, task task.UploadObjectTask) error
- func (m *ManageModular) HandleDownloadObjectTask(ctx context.Context, task task.DownloadObjectTask) error
- func (m *ManageModular) HandleGCBucketMigrationTask(ctx context.Context, gcBucketMigrationTask task.GCBucketMigrationTask) error
- func (m *ManageModular) HandleGCMetaTask(ctx context.Context, gcMetaTask task.GCMetaTask) error
- func (m *ManageModular) HandleGCObjectTask(ctx context.Context, gcTask task.GCObjectTask) error
- func (m *ManageModular) HandleGCZombiePieceTask(ctx context.Context, gcZombiePieceTask task.GCZombiePieceTask) error
- func (m *ManageModular) HandleMigrateGVGTask(ctx context.Context, task task.MigrateGVGTask) error
- func (m *ManageModular) HandleReceivePieceTask(ctx context.Context, task task.ReceivePieceTask) error
- func (m *ManageModular) HandleRecoverPieceTask(ctx context.Context, task task.RecoveryPieceTask) error
- func (m *ManageModular) HandleReplicatePieceTask(ctx context.Context, task task.ReplicatePieceTask) error
- func (m *ManageModular) HandleSealObjectTask(ctx context.Context, task task.SealObjectTask) error
- func (m *ManageModular) LoadTaskFromDB() error
- func (m *ManageModular) Name() string
- func (m *ManageModular) NotifyMigrateSwapOut(ctx context.Context, swapOut *virtualgrouptypes.MsgSwapOut) error
- func (m *ManageModular) NotifyPostMigrateBucket(ctx context.Context, bmInfo *gfsptask.GfSpBucketMigrationInfo) error
- func (m *ManageModular) NotifyPreMigrateBucket(ctx context.Context, bucketID uint64) error
- func (m *ManageModular) PickUpTask(ctx context.Context, tasks []task.Task) (task.Task, []task.Task)
- func (m *ManageModular) PickVirtualGroupFamily(ctx context.Context, task task.ApprovalCreateBucketTask) (uint32, error)
- func (m *ManageModular) QueryBucketMigrate(ctx context.Context) (res *gfspserver.GfSpQueryBucketMigrateResponse, err error)
- func (m *ManageModular) QuerySpExit(ctx context.Context) (res *gfspserver.GfSpQuerySpExitResponse, err error)
- func (m *ManageModular) QueryTasks(ctx context.Context, subKey task.TKey) ([]task.Task, error)
- func (m *ManageModular) QueryTasksStats(_ context.Context) (uploadTasks int, replicateCount int, sealCount int, resumableUploadCount int, ...)
- func (m *ManageModular) RejectUnSealObject(ctx context.Context, object *storagetypes.ObjectInfo) error
- func (m *ManageModular) ReleaseResource(ctx context.Context, span rcmgr.ResourceScopeSpan)
- func (m *ManageModular) ReserveResource(ctx context.Context, state *rcmgr.ScopeStat) (rcmgr.ResourceScopeSpan, error)
- func (m *ManageModular) ResetGCBucketMigrationQueue(qTask task.Task) bool
- func (m *ManageModular) ResetGCMetaTask(qTask task.Task) bool
- func (m *ManageModular) ResetGCObjectTask(qTask task.Task) bool
- func (m *ManageModular) ResetGCZombieTask(qTask task.Task) bool
- func (m *ManageModular) ResetRecoveryFailedList(_ context.Context) []string
- func (m *ManageModular) Start(ctx context.Context) error
- func (m *ManageModular) Statistics() string
- func (m *ManageModular) Stop(ctx context.Context) error
- func (m *ManageModular) TaskRecovering(ctx context.Context, task task.Task) bool
- func (m *ManageModular) TaskUploading(ctx context.Context, task task.Task) bool
- func (m *ManageModular) UploadingObjectNumber() int
- type MigrateStatus
- type NotifyDestSPIterator
- type PickDestGVGFilter
- type PickDestSPFilter
- type PrefetchFunc
- type RetryTaskType
- type SPConflictChecker
- type SPExitGVGExecuteUnit
- type SPExitScheduler
- func (s *SPExitScheduler) AddSwapOutToTaskRunner(swapOut *virtualgrouptypes.MsgSwapOut) error
- func (s *SPExitScheduler) Init(m *ManageModular) error
- func (s *SPExitScheduler) ListSPExitPlan() (*gfspserver.GfSpQuerySpExitResponse, error)
- func (s *SPExitScheduler) Start() error
- func (s *SPExitScheduler) UpdateMigrateProgress(task task.MigrateGVGTask) error
- type SendTxFunc
- type SrcSPSwapOutPlan
- type SwapOutUnit
- type TaskIterator
- type TaskRetryScheduler
Constants ¶
const ( // DiscontinueBucketReason defines the reason for stopping serving DiscontinueBucketReason = "testnet cleanup" // DiscontinueBucketLimit defines the max buckets to fetch in a single request DiscontinueBucketLimit = int64(500) // RejectUnSealObjectRetry defines the retry number of sending reject unseal object tx. RejectUnSealObjectRetry = 3 // RejectUnSealObjectTimeout defines the timeout of sending reject unseal object tx. RejectUnSealObjectTimeout = 3 // DefaultBackupTaskTimeout defines the timeout of backing up task for dispatching DefaultBackupTaskTimeout = 1 )
const ( // DefaultGlobalMaxUploadingNumber defines the default max uploading object number // in SP, including: uploading object to primary, replicate object to secondaries, // and sealing object on greenfield. DefaultGlobalMaxUploadingNumber int = 40960 // DefaultGlobalUploadObjectParallel defines the default max parallel uploading // objects to primary in SP system. DefaultGlobalUploadObjectParallel int = 10240 // DefaultGlobalReplicatePieceParallel defines the default max parallel replicating // objects to secondaries in SP system. DefaultGlobalReplicatePieceParallel int = 10240 // DefaultGlobalSealObjectParallel defines the default max parallel sealing objects // on greenfield in SP system. DefaultGlobalSealObjectParallel int = 10240 // DefaultGlobalReceiveObjectParallel defines the default max parallel confirming // receive pieces on greenfield in SP system. DefaultGlobalReceiveObjectParallel int = 10240 * 10 // DefaultGlobalBackupTaskParallel defines the default parallel backup tasks for // dispatching to task executor DefaultGlobalBackupTaskParallel int = 10240 * 100 // DefaultGlobalGCObjectParallel defines the default max parallel gc objects in SP // system. DefaultGlobalGCObjectParallel int = 4 // DefaultGlobalGCZombieParallel defines the default max parallel gc zombie pieces // in SP system. DefaultGlobalGCZombieParallel int = 1 // DefaultGlobalGCMetaParallel defines the default max parallel gc meta db in SP // system. DefaultGlobalGCMetaParallel int = 1 // DefaultGlobalGCBucketMigrationParallel defines the default max parallel gc bucket migration in SP // system. DefaultGlobalGCBucketMigrationParallel int = 1 // DefaultGlobalRecoveryPieceParallel defines the default max parallel recovery objects in SP // system. DefaultGlobalRecoveryPieceParallel int = 7 // DefaultGlobalMigrateGVGParallel defines the default max parallel migrating gvg in SP system.
DefaultGlobalMigrateGVGParallel int = 200 // DefaultGlobalDownloadObjectTaskCacheSize defines the default max cache the download // object tasks in manager. DefaultGlobalDownloadObjectTaskCacheSize int = 4096 // DefaultGlobalChallengePieceTaskCacheSize defines the default max cache the challenge // piece tasks in manager. DefaultGlobalChallengePieceTaskCacheSize int = 4096 // DefaultGlobalBatchGCObjectTimeInterval defines the default interval for generating // gc object task. DefaultGlobalBatchGCObjectTimeInterval int = 1 * 60 // DefaultGlobalGCObjectBlockInterval defines the default blocks number for getting // deleted objects. DefaultGlobalGCObjectBlockInterval uint64 = 1000 // DefaultGlobalGCObjectSafeBlockDistance defines the default distance from current block // height to gc the deleted object. DefaultGlobalGCObjectSafeBlockDistance uint64 = 1000 // DefaultGlobalGCZombiePieceTimeInterval defines the default interval for generating // gc zombie piece task. DefaultGlobalGCZombiePieceTimeInterval int = 10 * 60 // DefaultGlobalGCZombiePieceObjectIDInterval defines the default object id number for getting // deleted zombie piece. DefaultGlobalGCZombiePieceObjectIDInterval uint64 = 100 // DefaultGlobalGCZombieSafeObjectIDDistance defines the default distance from current object id // to gc the deleted zombie piece. DefaultGlobalGCZombieSafeObjectIDDistance uint64 = 1000 // DefaultGlobalGCMetaTimeInterval defines the default interval for generating // gc meta task. DefaultGlobalGCMetaTimeInterval int = 10 * 60 // DefaultGlobalSyncConsensusInfoInterval defines the default interval for sync the sp // info list to sp db. DefaultGlobalSyncConsensusInfoInterval uint64 = 600 // DefaultStatisticsOutputInterval defines the default interval for output statistics info, // it is used to log and debug.
DefaultStatisticsOutputInterval int = 60 // DefaultListenRejectUnSealTimeoutHeight defines the default listen reject unseal object // on greenfield timeout height, if after current block height + timeout height, the object // is not rejected, it is judged failed to reject unseal object on greenfield. DefaultListenRejectUnSealTimeoutHeight int = 10 // DefaultDiscontinueTimeInterval defines the default interval for starting discontinue // buckets task, used for test net. DefaultDiscontinueTimeInterval = 3 * 60 // DefaultDiscontinueBucketKeepAliveDays defines the default bucket keep alive days, after // the interval, buckets will be discontinued, used for test net. DefaultDiscontinueBucketKeepAliveDays = 7 // DefaultLoadReplicateTimeout defines the task timeout that load replicate tasks from sp db DefaultLoadReplicateTimeout int64 = 60 // DefaultLoadSealTimeout defines the task timeout that load seal tasks from sp db DefaultLoadSealTimeout int64 = 180 // DefaultSubscribeSPExitEventIntervalMillisecond defines the default time interval to subscribe sp exit event from metadata. DefaultSubscribeSPExitEventIntervalMillisecond = 2000 // DefaultSubscribeBucketMigrateEventIntervalMillisecond defines the default time interval to subscribe bucket migrate event from metadata. DefaultSubscribeBucketMigrateEventIntervalMillisecond = 2000 // DefaultSubscribeSwapOutEventIntervalMillisecond defines the default time interval to subscribe gvg swap out event from metadata. DefaultSubscribeSwapOutEventIntervalMillisecond = 2000 )
const ( ManagerGCBlockNumber = "manager_gc_block_number" ManagerSuccessUpload = "manager_upload_object_success" ManagerFailureUpload = "manager_upload_object_failure" ManagerSuccessReplicate = "manager_replicate_object_success" ManagerFailureReplicate = "manager_replicate_object_failure" ManagerCancelReplicate = "manager_replicate_object_cancel" ManagerSuccessReplicateAndSeal = "manager_replicate_and_seal_object_success" ManagerFailureReplicateAndSeal = "manager_replicate_and_seal_object_failure" ManagerSuccessSeal = "manager_seal_object_success" ManagerFailureSeal = "manager_seal_object_failure" ManagerCancelSeal = "manager_seal_object_cancel" ManagerSuccessConfirmReceive = "manager_confirm_receive_success" ManagerFailureConfirmReceive = "manager_confirm_receive_failure" )
const (
SigExpireTimeSecond = 60 * 60
)
Variables ¶
var ( ErrDanglingTask = gfsperrors.Register(module.ManageModularName, http.StatusBadRequest, 60001, "OoooH... request lost") ErrRepeatedTask = gfsperrors.Register(module.ManageModularName, http.StatusNotAcceptable, 60002, "request repeated") ErrExceedTask = gfsperrors.Register(module.ManageModularName, http.StatusNotAcceptable, 60003, "OoooH... request exceed, try again later") ErrCanceledTask = gfsperrors.Register(module.ManageModularName, http.StatusBadRequest, 60004, "task canceled") ErrFutureSupport = gfsperrors.Register(module.ManageModularName, http.StatusNotFound, 60005, "future support") ErrNotifyMigrateSwapOut = gfsperrors.Register(module.ManageModularName, http.StatusNotAcceptable, 60006, "failed to notify swap out start") )
Functions ¶
func CheckGVGMetaConsistent ¶ added in v0.2.4
func CheckGVGMetaConsistent(chainMetaList []*virtualgrouptypes.GlobalVirtualGroup, dbMetaList []*spdb.MigrateGVGUnitMeta) bool
CheckGVGMetaConsistent verifies whether chainMetaList (the expected GVG list of the SP) completely matches dbMetaList (the migrate GVG unit metadata loaded from the database).
func DefaultManagerOptions ¶
func DefaultManagerOptions(manager *ManageModular, cfg *gfspconfig.GfSpConfig) (err error)
func ErrGfSpDBWithDetail ¶ added in v0.2.4
func ErrGfSpDBWithDetail(detail string) *gfsperrors.GfSpError
func GetSwapOutApprovalAndSendTx ¶ added in v0.2.3
func GetSwapOutApprovalAndSendTx(baseApp *gfspapp.GfSpBaseApp, destSP *sptypes.StorageProvider, originMsg *virtualgrouptypes.MsgSwapOut) (*virtualgrouptypes.MsgSwapOut, error)
GetSwapOutApprovalAndSendTx is used to get approval from the dest sp and notify it to start migrating the swap out's gvg tasks.
func MakeBucketMigrateKey ¶ added in v0.2.3
func MakeGVGMigrateKey ¶ added in v0.2.3
func NewManageModular ¶
func NewManageModular(app *gfspapp.GfSpBaseApp, cfg *gfspconfig.GfSpConfig) (coremodule.Modular, error)
func SendAndConfirmCompleteSPExitTx ¶ added in v0.2.4
func SendAndConfirmCompleteSPExitTx(baseApp *gfspapp.GfSpBaseApp, msg *virtualgrouptypes.MsgCompleteStorageProviderExit) error
func SendAndConfirmCompleteSwapOutTx ¶ added in v0.2.4
func SendAndConfirmCompleteSwapOutTx(baseApp *gfspapp.GfSpBaseApp, msg *virtualgrouptypes.MsgCompleteSwapOut) error
func SendAndConfirmTx ¶ added in v0.2.4
func SendAndConfirmTx(chainClient consensus.Consensus, sendTxFunc SendTxFunc) error
Types ¶
type BasicGVGMigrateExecuteUnit ¶ added in v0.2.4
type BasicGVGMigrateExecuteUnit struct { SrcGVG *virtualgrouptypes.GlobalVirtualGroup SrcSP *sptypes.StorageProvider DestSP *sptypes.StorageProvider // self sp. MigrateStatus MigrateStatus LastMigratedObjectID uint64 }
type BucketCache ¶ added in v0.2.4
BucketCache is an LRU cache.
func NewBucketCache ¶ added in v0.2.4
func NewBucketCache(maxEntries int, expire time.Duration) *BucketCache
NewBucketCache creates a new Cache. If maxEntries is zero, the cache has no limit and it's assumed that eviction is done by the caller.
func (*BucketCache) Delete ¶ added in v0.2.4
func (c *BucketCache) Delete(key string)
Delete removes the provided key from the cache.
func (*BucketCache) Flush ¶ added in v0.2.4
func (c *BucketCache) Flush()
Flush empties the whole cache.
func (*BucketCache) Get ¶ added in v0.2.4
func (c *BucketCache) Get(key string) (value interface{}, ok bool)
Get looks up a key's value from the cache.
func (*BucketCache) Len ¶ added in v0.2.4
func (c *BucketCache) Len() int
Len returns the number of items in the cache.
func (*BucketCache) Set ¶ added in v0.2.4
func (c *BucketCache) Set(key string, value interface{})
Set adds a value to the cache.
type BucketMigrateExecutePlan ¶ added in v0.2.3
type BucketMigrateExecutePlan struct {
// contains filtered or unexported fields
}
BucketMigrateExecutePlan is used to manage bucket migrate process.
func (*BucketMigrateExecutePlan) QueryMigrateGVG ¶ added in v1.1.0
func (plan *BucketMigrateExecutePlan) QueryMigrateGVG(migrateKey string) (*spdb.MigrateGVGUnitMeta, error)
QueryMigrateGVG queries the migrate GVG unit identified by migrateKey.
func (*BucketMigrateExecutePlan) Start ¶ added in v0.2.3
func (plan *BucketMigrateExecutePlan) Start() error
func (*BucketMigrateExecutePlan) UpdateMigrateGVGLastMigratedObjectID ¶ added in v0.2.3
func (plan *BucketMigrateExecutePlan) UpdateMigrateGVGLastMigratedObjectID(migrateKey string, lastMigratedObjectID uint64) error
UpdateMigrateGVGLastMigratedObjectID persists user updates and the periodic progress reported by the executor.
func (*BucketMigrateExecutePlan) UpdateMigrateGVGRetryCount ¶ added in v1.1.0
func (plan *BucketMigrateExecutePlan) UpdateMigrateGVGRetryCount(migrateKey string, retryTime int) error
type BucketMigrateGVGExecuteUnit ¶ added in v0.2.3
type BucketMigrateGVGExecuteUnit struct { BasicGVGMigrateExecuteUnit BucketID uint64 DestGVGID uint32 DestGVG *virtualgrouptypes.GlobalVirtualGroup }
BucketMigrateGVGExecuteUnit is used to record bucket migrate gvg unit.
func (*BucketMigrateGVGExecuteUnit) Key ¶ added in v0.2.3
func (ub *BucketMigrateGVGExecuteUnit) Key() string
Key is used as the primary key.
type BucketMigrateScheduler ¶ added in v0.2.3
type BucketMigrateScheduler struct {
// contains filtered or unexported fields
}
BucketMigrateScheduler subscribes bucket migrate events and produces a gvg migrate plan.
func NewBucketMigrateScheduler ¶ added in v0.2.3
func NewBucketMigrateScheduler(manager *ManageModular) (*BucketMigrateScheduler, error)
NewBucketMigrateScheduler returns a bucket migrate scheduler instance.
func (*BucketMigrateScheduler) Init ¶ added in v0.2.3
func (s *BucketMigrateScheduler) Init(m *ManageModular) error
func (*BucketMigrateScheduler) PostMigrateBucket ¶ added in v1.1.0
func (s *BucketMigrateScheduler) PostMigrateBucket(postMsg *gfsptask.GfSpBucketMigrationInfo, srcSPInfo *sptypes.StorageProvider) error
PostMigrateBucket is used to pick a suitable gvg for replicating object.
func (*BucketMigrateScheduler) PreMigrateBucket ¶ added in v1.1.0
func (s *BucketMigrateScheduler) PreMigrateBucket(bucketID uint64, srcSPInfo *sptypes.StorageProvider) error
PreMigrateBucket is used to pick a suitable gvg for replicating object.
func (*BucketMigrateScheduler) Start ¶ added in v0.2.3
func (s *BucketMigrateScheduler) Start() error
func (*BucketMigrateScheduler) UpdateMigrateProgress ¶ added in v0.2.3
func (s *BucketMigrateScheduler) UpdateMigrateProgress(task task.MigrateGVGTask) error
type DestSPTaskRunner ¶ added in v0.2.3
type DestSPTaskRunner struct {
// contains filtered or unexported fields
}
DestSPTaskRunner is used to manage task migrate progress/status in dest sp.
func NewDestSPTaskRunner ¶ added in v0.2.3
func NewDestSPTaskRunner(m *ManageModular, v vgmgr.VirtualGroupManager) *DestSPTaskRunner
NewDestSPTaskRunner returns a dest sp task runner instance.
func (*DestSPTaskRunner) AddNewMigrateGVGUnit ¶ added in v0.2.3
func (runner *DestSPTaskRunner) AddNewMigrateGVGUnit(remotedGVGUnit *SPExitGVGExecuteUnit) error
AddNewMigrateGVGUnit is used to add new gvg task to task runner.
func (*DestSPTaskRunner) AddNewSwapOut ¶ added in v0.2.3
func (runner *DestSPTaskRunner) AddNewSwapOut(swapOut *virtualgrouptypes.MsgSwapOut) error
AddNewSwapOut is used to add new swap out to the task runner.
func (*DestSPTaskRunner) LoadFromDB ¶ added in v0.2.3
func (runner *DestSPTaskRunner) LoadFromDB() error
LoadFromDB is used to rebuild swap out status at start up.
func (*DestSPTaskRunner) Start ¶ added in v0.2.3
func (runner *DestSPTaskRunner) Start() error
Start is used to push the task runner's gvg migrate tasks to the manager task dispatcher.
func (*DestSPTaskRunner) UpdateMigrateGVGLastMigratedObjectID ¶ added in v0.2.3
func (runner *DestSPTaskRunner) UpdateMigrateGVGLastMigratedObjectID(migrateKey string, lastMigratedObjectID uint64) error
UpdateMigrateGVGLastMigratedObjectID is used to update gvg task progress.
func (*DestSPTaskRunner) UpdateMigrateGVGStatus ¶ added in v0.2.3
func (runner *DestSPTaskRunner) UpdateMigrateGVGStatus(migrateKey string, st MigrateStatus) error
UpdateMigrateGVGStatus is used to update gvg task status.
type FamilyConflictChecker ¶ added in v0.2.3
type FamilyConflictChecker struct {
// contains filtered or unexported fields
}
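FamilyConflictChecker resolves conflicts in a virtual group family when an sp exits, as illustrated by the following example. 1. Current virtual group and sp status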
sp_list=[sp1,sp2,sp3,sp4,sp5,sp6,sp7,sp8] family1 = {primary=sp1, gvg1=(sp1,sp2,sp3,sp4,sp5,sp6,sp7), gvg2=(sp1,sp2,sp3,sp4,sp5,sp6,sp8)}
2.Resolve conflict
sp1 exit, cannot pick a sp to replace sp1, there is a conflict. resolveConflictGVGMigrateUnits = gvg1=(sp1,sp2,sp3,sp4,sp5,sp6,sp7)->gvg1=(sp1,sp2,sp3,sp4,sp5,sp6,sp8)
3.After resolve conflict
family1 = {primary=sp1, gvg1=(sp1,sp2,sp3,sp4,sp5,sp6,sp8), gvg2=(sp1,sp2,sp3,sp4,sp5,sp6,sp8)}
4.Primary migrate
pick dst_primary_sp=sp7, and migrate family: family1 = {primary=sp7, gvg1=(sp7,sp2,sp3,sp4,sp5,sp6,sp8), gvg2=(sp7,sp2,sp3,sp4,sp5,sp6,sp8)}
5.Complete sp exit
sp1 complete sp exit.
func NewFamilyConflictChecker ¶ added in v0.2.3
func NewFamilyConflictChecker(f *virtualgrouptypes.GlobalVirtualGroupFamily, p *SrcSPSwapOutPlan, s *sptypes.StorageProvider) *FamilyConflictChecker
NewFamilyConflictChecker returns a family conflict checker instance.
func (*FamilyConflictChecker) GenerateSwapOutUnits ¶ added in v0.2.3
func (checker *FamilyConflictChecker) GenerateSwapOutUnits(buildMetaByDB bool) ([]*SwapOutUnit, error)
GenerateSwapOutUnits generate the family swap out units.
type GenerateGVGSecondarySPsPolicyByPrefer ¶ added in v0.2.3
type GenerateGVGSecondarySPsPolicyByPrefer struct {
// contains filtered or unexported fields
}
func NewGenerateGVGSecondarySPsPolicyByPrefer ¶ added in v0.2.3
func NewGenerateGVGSecondarySPsPolicyByPrefer(p *storagetypes.Params, preferSPIDList []uint32) *GenerateGVGSecondarySPsPolicyByPrefer
func (*GenerateGVGSecondarySPsPolicyByPrefer) AddCandidateSP ¶ added in v0.2.3
func (p *GenerateGVGSecondarySPsPolicyByPrefer) AddCandidateSP(spID uint32)
func (*GenerateGVGSecondarySPsPolicyByPrefer) GenerateGVGSecondarySPs ¶ added in v0.2.3
func (p *GenerateGVGSecondarySPsPolicyByPrefer) GenerateGVGSecondarySPs() ([]uint32, error)
type ManageModular ¶
type ManageModular struct {
// contains filtered or unexported fields
}
func (*ManageModular) DispatchTask ¶
func (*ManageModular) FilterGCTask ¶
func (m *ManageModular) FilterGCTask(qTask task.Task) bool
func (*ManageModular) FilterGVGTask ¶ added in v0.2.4
func (m *ManageModular) FilterGVGTask(qTask task.Task) bool
func (*ManageModular) FilterReceiveTask ¶ added in v0.2.3
func (m *ManageModular) FilterReceiveTask(qTask task.Task) bool
func (*ManageModular) FilterUploadingTask ¶
func (m *ManageModular) FilterUploadingTask(qTask task.Task) bool
func (*ManageModular) GCCacheQueue ¶
func (m *ManageModular) GCCacheQueue(qTask task.Task) bool
func (*ManageModular) GCMigrateGVGQueue ¶ added in v0.2.3
func (m *ManageModular) GCMigrateGVGQueue(qTask task.Task) bool
func (*ManageModular) GCReceiveQueue ¶
func (m *ManageModular) GCReceiveQueue(qTask task.Task) bool
func (*ManageModular) GCRecoverQueue ¶ added in v0.2.3
func (m *ManageModular) GCRecoverQueue(qTask task.Task) bool
func (*ManageModular) GCReplicatePieceQueue ¶
func (m *ManageModular) GCReplicatePieceQueue(qTask task.Task) bool
func (*ManageModular) GCResumableUploadObjectQueue ¶ added in v0.2.3
func (m *ManageModular) GCResumableUploadObjectQueue(qTask task.Task) bool
func (*ManageModular) GCSealObjectQueue ¶
func (m *ManageModular) GCSealObjectQueue(qTask task.Task) bool
func (*ManageModular) GCUploadObjectQueue ¶
func (m *ManageModular) GCUploadObjectQueue(qTask task.Task) bool
func (*ManageModular) GenerateGCBucketMigrationTask ¶ added in v1.2.0
func (m *ManageModular) GenerateGCBucketMigrationTask(ctx context.Context, bucketID, bucketSize uint64)
func (*ManageModular) HandleChallengePieceTask ¶
func (m *ManageModular) HandleChallengePieceTask(ctx context.Context, task task.ChallengePieceTask) error
func (*ManageModular) HandleCreateGCBucketMigrationTask ¶ added in v1.2.0
func (m *ManageModular) HandleCreateGCBucketMigrationTask(ctx context.Context, task task.GCBucketMigrationTask) error
func (*ManageModular) HandleCreateResumableUploadObjectTask ¶ added in v0.2.3
func (m *ManageModular) HandleCreateResumableUploadObjectTask(ctx context.Context, task task.ResumableUploadObjectTask) error
func (*ManageModular) HandleCreateUploadObjectTask ¶
func (m *ManageModular) HandleCreateUploadObjectTask(ctx context.Context, task task.UploadObjectTask) error
func (*ManageModular) HandleDoneResumableUploadObjectTask ¶ added in v0.2.3
func (m *ManageModular) HandleDoneResumableUploadObjectTask(ctx context.Context, task task.ResumableUploadObjectTask) error
func (*ManageModular) HandleDoneUploadObjectTask ¶
func (m *ManageModular) HandleDoneUploadObjectTask(ctx context.Context, task task.UploadObjectTask) error
func (*ManageModular) HandleDownloadObjectTask ¶
func (m *ManageModular) HandleDownloadObjectTask(ctx context.Context, task task.DownloadObjectTask) error
func (*ManageModular) HandleGCBucketMigrationTask ¶ added in v1.2.0
func (m *ManageModular) HandleGCBucketMigrationTask(ctx context.Context, gcBucketMigrationTask task.GCBucketMigrationTask) error
func (*ManageModular) HandleGCMetaTask ¶
func (m *ManageModular) HandleGCMetaTask(ctx context.Context, gcMetaTask task.GCMetaTask) error
func (*ManageModular) HandleGCObjectTask ¶
func (m *ManageModular) HandleGCObjectTask(ctx context.Context, gcTask task.GCObjectTask) error
func (*ManageModular) HandleGCZombiePieceTask ¶
func (m *ManageModular) HandleGCZombiePieceTask(ctx context.Context, gcZombiePieceTask task.GCZombiePieceTask) error
func (*ManageModular) HandleMigrateGVGTask ¶ added in v0.2.3
func (m *ManageModular) HandleMigrateGVGTask(ctx context.Context, task task.MigrateGVGTask) error
func (*ManageModular) HandleReceivePieceTask ¶
func (m *ManageModular) HandleReceivePieceTask(ctx context.Context, task task.ReceivePieceTask) error
func (*ManageModular) HandleRecoverPieceTask ¶ added in v0.2.3
func (m *ManageModular) HandleRecoverPieceTask(ctx context.Context, task task.RecoveryPieceTask) error
func (*ManageModular) HandleReplicatePieceTask ¶
func (m *ManageModular) HandleReplicatePieceTask(ctx context.Context, task task.ReplicatePieceTask) error
func (*ManageModular) HandleSealObjectTask ¶
func (m *ManageModular) HandleSealObjectTask(ctx context.Context, task task.SealObjectTask) error
func (*ManageModular) LoadTaskFromDB ¶
func (m *ManageModular) LoadTaskFromDB() error
func (*ManageModular) Name ¶
func (m *ManageModular) Name() string
func (*ManageModular) NotifyMigrateSwapOut ¶ added in v0.2.3
func (m *ManageModular) NotifyMigrateSwapOut(ctx context.Context, swapOut *virtualgrouptypes.MsgSwapOut) error
NotifyMigrateSwapOut is used to receive migrate gvg task from src sp.
func (*ManageModular) NotifyPostMigrateBucket ¶ added in v1.1.0
func (m *ManageModular) NotifyPostMigrateBucket(ctx context.Context, bmInfo *gfsptask.GfSpBucketMigrationInfo) error
NotifyPostMigrateBucket is used to notify the src sp to confirm the post migrate bucket; only one post migrate bucket is allowed.
func (*ManageModular) NotifyPreMigrateBucket ¶ added in v1.1.0
func (m *ManageModular) NotifyPreMigrateBucket(ctx context.Context, bucketID uint64) error
NotifyPreMigrateBucket is used to notify the sp to record that the bucket is migrating.
func (*ManageModular) PickUpTask ¶
func (*ManageModular) PickVirtualGroupFamily ¶ added in v0.2.3
func (m *ManageModular) PickVirtualGroupFamily(ctx context.Context, task task.ApprovalCreateBucketTask) (uint32, error)
PickVirtualGroupFamily is used to pick a suitable vgf for creating bucket.
func (*ManageModular) QueryBucketMigrate ¶ added in v0.2.3
func (m *ManageModular) QueryBucketMigrate(ctx context.Context) (res *gfspserver.GfSpQueryBucketMigrateResponse, err error)
func (*ManageModular) QuerySpExit ¶ added in v0.2.3
func (m *ManageModular) QuerySpExit(ctx context.Context) (res *gfspserver.GfSpQuerySpExitResponse, err error)
func (*ManageModular) QueryTasks ¶
func (*ManageModular) QueryTasksStats ¶ added in v0.2.6
func (*ManageModular) RejectUnSealObject ¶
func (m *ManageModular) RejectUnSealObject(ctx context.Context, object *storagetypes.ObjectInfo) error
func (*ManageModular) ReleaseResource ¶
func (m *ManageModular) ReleaseResource(ctx context.Context, span rcmgr.ResourceScopeSpan)
func (*ManageModular) ReserveResource ¶
func (m *ManageModular) ReserveResource(ctx context.Context, state *rcmgr.ScopeStat) (rcmgr.ResourceScopeSpan, error)
func (*ManageModular) ResetGCBucketMigrationQueue ¶ added in v1.2.0
func (m *ManageModular) ResetGCBucketMigrationQueue(qTask task.Task) bool
func (*ManageModular) ResetGCMetaTask ¶ added in v1.2.0
func (m *ManageModular) ResetGCMetaTask(qTask task.Task) bool
func (*ManageModular) ResetGCObjectTask ¶
func (m *ManageModular) ResetGCObjectTask(qTask task.Task) bool
func (*ManageModular) ResetGCZombieTask ¶ added in v1.2.0
func (m *ManageModular) ResetGCZombieTask(qTask task.Task) bool
func (*ManageModular) ResetRecoveryFailedList ¶ added in v1.1.0
func (m *ManageModular) ResetRecoveryFailedList(_ context.Context) []string
func (*ManageModular) Statistics ¶
func (m *ManageModular) Statistics() string
func (*ManageModular) TaskRecovering ¶ added in v0.2.3
func (*ManageModular) TaskUploading ¶
func (*ManageModular) UploadingObjectNumber ¶
func (m *ManageModular) UploadingObjectNumber() int
type MigrateStatus ¶ added in v0.2.3
type MigrateStatus int32
var ( WaitForMigrate MigrateStatus = 0 Migrating MigrateStatus = 1 Migrated MigrateStatus = 2 )
migrate: WaitForMigrate(created)->Migrating(schedule success)->Migrated(executor report success).
type NotifyDestSPIterator ¶ added in v0.2.3
type NotifyDestSPIterator struct {
// contains filtered or unexported fields
}
NotifyDestSPIterator is used to notify/check migrate units to dest sp.
func NewNotifyDestSPIterator ¶ added in v0.2.3
func NewNotifyDestSPIterator(plan *SrcSPSwapOutPlan) *NotifyDestSPIterator
NewNotifyDestSPIterator returns notify dest sp iterator.
func (*NotifyDestSPIterator) Next ¶ added in v0.2.3
func (iter *NotifyDestSPIterator) Next()
func (*NotifyDestSPIterator) Valid ¶ added in v0.2.3
func (iter *NotifyDestSPIterator) Valid() bool
func (*NotifyDestSPIterator) Value ¶ added in v0.2.3
func (iter *NotifyDestSPIterator) Value() *virtualgrouptypes.MsgSwapOut
type PickDestGVGFilter ¶ added in v0.2.3
type PickDestGVGFilter struct {
// contains filtered or unexported fields
}
PickDestGVGFilter is used to pick dest gvg for bucket migrate.
func NewPickDestGVGFilter ¶ added in v0.2.3
func NewPickDestGVGFilter(familyID uint32, secondarySPIDs []uint32, minFreeSize uint64) *PickDestGVGFilter
func (*PickDestGVGFilter) CheckFamily ¶ added in v0.2.3
func (f *PickDestGVGFilter) CheckFamily(familyID uint32) bool
func (*PickDestGVGFilter) CheckGVG ¶ added in v0.2.3
func (f *PickDestGVGFilter) CheckGVG(gvgMeta *vgmgr.GlobalVirtualGroupMeta) bool
type PickDestSPFilter ¶ added in v0.2.3
type PickDestSPFilter struct {
// contains filtered or unexported fields
}
PickDestSPFilter is used by the src sp to pick a dest sp whose id is not in the excluded sp ids.
func NewPickDestSPFilterWithMap ¶ added in v0.2.3
func NewPickDestSPFilterWithMap(m map[uint32]int) *PickDestSPFilter
NewPickDestSPFilterWithMap returns a PickDestSPFilter instance.
func NewPickDestSPFilterWithSlice ¶ added in v0.2.3
func NewPickDestSPFilterWithSlice(s []uint32) *PickDestSPFilter
NewPickDestSPFilterWithSlice returns a PickDestSPFilter instance.
func (*PickDestSPFilter) Check ¶ added in v0.2.3
func (f *PickDestSPFilter) Check(spID uint32) bool
Check returns true when candidate sp meets the check condition.
type PrefetchFunc ¶ added in v1.2.0
type PrefetchFunc func(iter *TaskIterator) ([]*spdb.UploadObjectMeta, error)
type RetryTaskType ¶ added in v1.2.0
type RetryTaskType int32
type SPConflictChecker ¶ added in v0.2.4
type SPConflictChecker struct {
// contains filtered or unexported fields
}
func NewSPConflictChecker ¶ added in v0.2.4
func NewSPConflictChecker(p *BucketMigrateExecutePlan, src, dest *sptypes.StorageProvider, bucketID uint64) *SPConflictChecker
NewSPConflictChecker returns an SP conflict checker instance.
func (*SPConflictChecker) GenerateMigrateBucketUnits ¶ added in v0.2.4
func (checker *SPConflictChecker) GenerateMigrateBucketUnits(buildMetaByDB bool) ([]*BucketMigrateGVGExecuteUnit, error)
GenerateMigrateBucketUnits generates the migrate bucket units.
type SPExitGVGExecuteUnit ¶ added in v0.2.3
type SPExitGVGExecuteUnit struct {
	BasicGVGMigrateExecuteUnit
	RedundancyIndex int32 // if < 0, represents migrate primary.
	SwapOutKey      string
}
SPExitGVGExecuteUnit is used to record sp exit gvg unit.
func (*SPExitGVGExecuteUnit) Key ¶ added in v0.2.3
func (u *SPExitGVGExecuteUnit) Key() string
Key is used as the primary key.
type SPExitScheduler ¶ added in v0.2.3
type SPExitScheduler struct {
// contains filtered or unexported fields
}
SPExitScheduler is used to manage and schedule sp exit process.
func NewSPExitScheduler ¶ added in v0.2.3
func NewSPExitScheduler(manager *ManageModular) (*SPExitScheduler, error)
NewSPExitScheduler returns a sp exit scheduler instance.
func (*SPExitScheduler) AddSwapOutToTaskRunner ¶ added in v0.2.3
func (s *SPExitScheduler) AddSwapOutToTaskRunner(swapOut *virtualgrouptypes.MsgSwapOut) error
AddSwapOutToTaskRunner is used to add a swap out from the src sp to the task runner.
func (*SPExitScheduler) Init ¶ added in v0.2.3
func (s *SPExitScheduler) Init(m *ManageModular) error
Init is used to load db subscribe progress and migrate progress.
func (*SPExitScheduler) ListSPExitPlan ¶ added in v0.2.3
func (s *SPExitScheduler) ListSPExitPlan() (*gfspserver.GfSpQuerySpExitResponse, error)
ListSPExitPlan is used to query sp exit status.
func (*SPExitScheduler) Start ¶ added in v0.2.3
func (s *SPExitScheduler) Start() error
Start is used to subscribe to sp exit events from metadata and produce a gvg migrate plan.
func (*SPExitScheduler) UpdateMigrateProgress ¶ added in v0.2.3
func (s *SPExitScheduler) UpdateMigrateProgress(task task.MigrateGVGTask) error
UpdateMigrateProgress is used to update migrate status from task executor.
type SendTxFunc ¶ added in v0.2.4
type SrcSPSwapOutPlan ¶ added in v0.2.3
type SrcSPSwapOutPlan struct {
// contains filtered or unexported fields
}
SrcSPSwapOutPlan is used to record the execution of swap out.
func NewSrcSPSwapOutPlan ¶ added in v0.2.3
func NewSrcSPSwapOutPlan(m *ManageModular, s *SPExitScheduler, v vgmgr.VirtualGroupManager) *SrcSPSwapOutPlan
NewSrcSPSwapOutPlan returns a src sp swap out plan.
func (*SrcSPSwapOutPlan) CheckAndSendCompleteSPExitTx ¶ added in v0.2.3
func (plan *SrcSPSwapOutPlan) CheckAndSendCompleteSPExitTx(event *virtualgrouptypes.EventCompleteSwapOut) error
CheckAndSendCompleteSPExitTx is used to update the swap out status and check whether all swap outs are finished. It sends the sp exit tx when all swap outs are completed.
func (*SrcSPSwapOutPlan) Start ¶ added in v0.2.3
func (plan *SrcSPSwapOutPlan) Start() error
Start persists the plan and tasks to the db and the task dispatcher.
type SwapOutUnit ¶ added in v0.2.3
type SwapOutUnit struct {
// contains filtered or unexported fields
}
SwapOutUnit is used by swap out plan and task runner.
func (*SwapOutUnit) CheckAndSendCompleteSwapOutTx ¶ added in v0.2.3
func (s *SwapOutUnit) CheckAndSendCompleteSwapOutTx(gUnit *SPExitGVGExecuteUnit, runner *DestSPTaskRunner) error
CheckAndSendCompleteSwapOutTx checks whether all gvgs of the swap out have completed; if all have finished, it sends the tx to the chain.
type TaskIterator ¶ added in v1.2.0
type TaskIterator struct {
// contains filtered or unexported fields
}
TaskIterator is used to load retry task from db.
func NewTaskIterator ¶ added in v1.2.0
func NewTaskIterator(db spdb.SPDB, taskType RetryTaskType, rejectUnsealThresholdSecond int64) *TaskIterator
func (*TaskIterator) Next ¶ added in v1.2.0
func (iter *TaskIterator) Next()
func (*TaskIterator) Valid ¶ added in v1.2.0
func (iter *TaskIterator) Valid() bool
func (*TaskIterator) Value ¶ added in v1.2.0
func (iter *TaskIterator) Value() *spdb.UploadObjectMeta
type TaskRetryScheduler ¶ added in v1.2.0
type TaskRetryScheduler struct {
// contains filtered or unexported fields
}
TaskRetryScheduler is used to schedule background task retry.
func NewTaskRetryScheduler ¶ added in v1.2.0
func NewTaskRetryScheduler(m *ManageModular) *TaskRetryScheduler
NewTaskRetryScheduler returns a task retry scheduler instance.
func (*TaskRetryScheduler) Start ¶ added in v1.2.0
func (s *TaskRetryScheduler) Start()
Start is used to start the task retry scheduler.