Documentation ¶
Index ¶
- Constants
- Variables
- func DefaultManagerOptions(manager *ManageModular, cfg *gfspconfig.GfSpConfig) (err error)
- func MakeBucketMigrateKey(bucketID uint64, gvgID uint32) string
- func MakeRemotedGVGMigrateKey(gvgID uint32, vgfID uint32, redundancyIndex int32) string
- func MakeSecondaryGVGMigrateKey(gvgID uint32, vgfID uint32, redundancyIndex int32) string
- func NewManageModular(app *gfspapp.GfSpBaseApp, cfg *gfspconfig.GfSpConfig) (coremodule.Modular, error)
- type BucketMigrateExecutePlan
- type BucketMigrateScheduler
- type GlobalVirtualGroupMigrateExecuteUnit
- type GlobalVirtualGroupMigrateExecuteUnitByBucket
- type ManageModular
- func (m *ManageModular) DispatchTask(ctx context.Context, limit rcmgr.Limit) (task.Task, error)
- func (m *ManageModular) FilterGCTask(qTask task.Task) bool
- func (m *ManageModular) FilterReceiveTask(qTask task.Task) bool
- func (m *ManageModular) FilterUploadingTask(qTask task.Task) bool
- func (m *ManageModular) GCCacheQueue(qTask task.Task) bool
- func (m *ManageModular) GCMigrateGVGQueue(qTask task.Task) bool
- func (m *ManageModular) GCReceiveQueue(qTask task.Task) bool
- func (m *ManageModular) GCRecoverQueue(qTask task.Task) bool
- func (m *ManageModular) GCReplicatePieceQueue(qTask task.Task) bool
- func (m *ManageModular) GCResumableUploadObjectQueue(qTask task.Task) bool
- func (m *ManageModular) GCSealObjectQueue(qTask task.Task) bool
- func (m *ManageModular) GCUploadObjectQueue(qTask task.Task) bool
- func (m *ManageModular) HandleChallengePieceTask(ctx context.Context, task task.ChallengePieceTask) error
- func (m *ManageModular) HandleCreateResumableUploadObjectTask(ctx context.Context, task task.ResumableUploadObjectTask) error
- func (m *ManageModular) HandleCreateUploadObjectTask(ctx context.Context, task task.UploadObjectTask) error
- func (m *ManageModular) HandleDoneResumableUploadObjectTask(ctx context.Context, task task.ResumableUploadObjectTask) error
- func (m *ManageModular) HandleDoneUploadObjectTask(ctx context.Context, task task.UploadObjectTask) error
- func (m *ManageModular) HandleDownloadObjectTask(ctx context.Context, task task.DownloadObjectTask) error
- func (m *ManageModular) HandleGCMetaTask(ctx context.Context, task task.GCMetaTask) error
- func (m *ManageModular) HandleGCObjectTask(ctx context.Context, gcTask task.GCObjectTask) error
- func (m *ManageModular) HandleGCZombiePieceTask(ctx context.Context, task task.GCZombiePieceTask) error
- func (m *ManageModular) HandleMigrateGVGTask(ctx context.Context, task task.MigrateGVGTask) error
- func (m *ManageModular) HandleReceivePieceTask(ctx context.Context, task task.ReceivePieceTask) error
- func (m *ManageModular) HandleRecoverPieceTask(ctx context.Context, task task.RecoveryPieceTask) error
- func (m *ManageModular) HandleReplicatePieceTask(ctx context.Context, task task.ReplicatePieceTask) error
- func (m *ManageModular) HandleSealObjectTask(ctx context.Context, task task.SealObjectTask) error
- func (m *ManageModular) LoadTaskFromDB() error
- func (m *ManageModular) Name() string
- func (m *ManageModular) NotifyMigrateGVG(ctx context.Context, task task.MigrateGVGTask) error
- func (m *ManageModular) PickUpTask(ctx context.Context, tasks []task.Task) (task.Task, []task.Task)
- func (m *ManageModular) PickVirtualGroupFamily(ctx context.Context, task task.ApprovalCreateBucketTask) (uint32, error)
- func (m *ManageModular) PickVirtualGroupFamilyForBucketMigrate(ctx context.Context) (uint32, error)
- func (m *ManageModular) QueryTasks(ctx context.Context, subKey task.TKey) ([]task.Task, error)
- func (m *ManageModular) RejectUnSealObject(ctx context.Context, object *storagetypes.ObjectInfo) error
- func (m *ManageModular) ReleaseResource(ctx context.Context, span rcmgr.ResourceScopeSpan)
- func (m *ManageModular) ReserveResource(ctx context.Context, state *rcmgr.ScopeStat) (rcmgr.ResourceScopeSpan, error)
- func (m *ManageModular) ResetGCObjectTask(qTask task.Task) bool
- func (m *ManageModular) Start(ctx context.Context) error
- func (m *ManageModular) Statistics() string
- func (m *ManageModular) Stop(ctx context.Context) error
- func (m *ManageModular) TaskRecovering(ctx context.Context, task task.Task) bool
- func (m *ManageModular) TaskUploading(ctx context.Context, task task.Task) bool
- func (m *ManageModular) UploadingObjectNumber() int
- type MigrateStatus
- type MigrateTaskRunner
- func (runner *MigrateTaskRunner) AddNewMigrateGVGUnit(remotedGVGUnit *GlobalVirtualGroupMigrateExecuteUnit) error
- func (runner *MigrateTaskRunner) Init() error
- func (runner *MigrateTaskRunner) Start() error
- func (runner *MigrateTaskRunner) UpdateMigrateGVGLastMigratedObjectID(migrateKey string, lastMigratedObjectID uint64) error
- func (runner *MigrateTaskRunner) UpdateMigrateGVGStatus(migrateKey string, st MigrateStatus) error
- type NotifiedDestSPIterator
- type PickDestSPFilter
- type SPExitExecutePlan
- type SPExitScheduler
- type VirtualGroupFamilyMigrateExecuteUnit
- type WaitForNotifyDestSPIterator
Constants ¶
const ( // DiscontinueBucketReason defines the reason for stopping serving DiscontinueBucketReason = "testnet cleanup" // DiscontinueBucketLimit defines the max buckets to fetch in a single request DiscontinueBucketLimit = int64(500) // RejectUnSealObjectRetry defines the retry number of sending reject unseal object tx. RejectUnSealObjectRetry = 3 // RejectUnSealObjectTimeout defines the timeout of sending reject unseal object tx. RejectUnSealObjectTimeout = 3 )
const ( // DefaultGlobalMaxUploadingNumber defines the default max uploading object number // in SP, include: uploading object to primary, replicate object to secondaries, // and sealing object on greenfield. DefaultGlobalMaxUploadingNumber int = 40960 // DefaultGlobalUploadObjectParallel defines the default max parallel uploading // objects to primary in SP system. DefaultGlobalUploadObjectParallel int = 10240 // DefaultGlobalReplicatePieceParallel defines the default max parallel replicating // objects to secondaries in SP system. DefaultGlobalReplicatePieceParallel int = 10240 // DefaultGlobalSealObjectParallel defines the default max parallel sealing objects // on greenfield in SP system. DefaultGlobalSealObjectParallel int = 10240 // DefaultGlobalReceiveObjectParallel defines the default max parallel confirming // receive pieces on greenfield in SP system. DefaultGlobalReceiveObjectParallel int = 10240 * 10 // DefaultGlobalGCObjectParallel defines the default max parallel gc objects in SP // system. DefaultGlobalGCObjectParallel int = 4 // DefaultGlobalGCZombieParallel defines the default max parallel gc zombie pieces // in SP system. DefaultGlobalGCZombieParallel int = 1 // DefaultGlobalGCMetaParallel defines the default max parallel gc meta db in SP // system. DefaultGlobalGCMetaParallel int = 1 // DefaultGlobalRecoveryPieceParallel defines the default max parallel recovery objects in SP // system. DefaultGlobalRecoveryPieceParallel int = 7 // DefaultGlobalMigrateGVGParallel defines the default max parallel migrating gvg in SP system. DefaultGlobalMigrateGVGParallel int = 10 // DefaultGlobalDownloadObjectTaskCacheSize defines the default max cache the download // object tasks in manager. DefaultGlobalDownloadObjectTaskCacheSize int = 4096 // DefaultGlobalChallengePieceTaskCacheSize defines the default max cache the challenge // piece tasks in manager.
DefaultGlobalChallengePieceTaskCacheSize int = 4096 // DefaultGlobalBatchGcObjectTimeInterval defines the default interval for generating // gc object task. DefaultGlobalBatchGcObjectTimeInterval int = 1 * 60 // DefaultGlobalGcObjectBlockInterval defines the default blocks number for getting // deleted objects. DefaultGlobalGcObjectBlockInterval uint64 = 1000 // DefaultGlobalGcObjectSafeBlockDistance defines the default distance from current block // height to gc the deleted object. DefaultGlobalGcObjectSafeBlockDistance uint64 = 1000 // DefaultGlobalSyncConsensusInfoInterval defines the default interval for sync the sp // info list to sp db. DefaultGlobalSyncConsensusInfoInterval uint64 = 600 // DefaultStatisticsOutputInterval defines the default interval for output statistics info, // it is used to log and debug. DefaultStatisticsOutputInterval int = 60 // DefaultListenRejectUnSealTimeoutHeight defines the default listen reject unseal object // on greenfield timeout height, if after current block height + timeout height, the object // is not rejected, it is judged failed to reject unseal object on greenfield. DefaultListenRejectUnSealTimeoutHeight int = 10 // DefaultDiscontinueTimeInterval defines the default interval for starting discontinue // buckets task, used for test net. DefaultDiscontinueTimeInterval = 3 * 60 // DefaultDiscontinueBucketKeepAliveDays defines the default bucket keep alive days, after // the interval, buckets will be discontinued, used for test net. DefaultDiscontinueBucketKeepAliveDays = 7 DefaultLoadReplicateTimeout int64 = 60 DefaultLoadSealTimeout int64 = 180 // DefaultSubscribeSPExitEventIntervalSec defines the default time interval to subscribe sp exit event from metadata. DefaultSubscribeSPExitEventIntervalSec = 1 // DefaultSubscribeBucketMigrateEventIntervalSec defines the default time interval to subscribe bucket migrate event from metadata.
DefaultSubscribeBucketMigrateEventIntervalSec = 1 // DefaultSubscribeSwapOutEventIntervalSec defines the default time interval to subscribe gvg swap out event from metadata. DefaultSubscribeSwapOutEventIntervalSec = 1 )
const ( ManagerGCBlockNumber = "manager_gc_block_number" ManagerSuccessUpload = "manager_upload_object_success" ManagerFailureUpload = "manager_upload_object_failure" ManagerSuccessReplicate = "manager_replicate_object_success" ManagerFailureReplicate = "manager_replicate_object_failure" ManagerCancelReplicate = "manager_replicate_object_cancel" ManagerSuccessReplicateAndSeal = "manager_replicate_and_seal_object_success" ManagerFailureReplicateAndSeal = "manager_replicate_and_seal_object_failure" ManagerSuccessSeal = "manager_seal_object_success" ManagerFailureSeal = "manager_seal_object_failure" ManagerCancelSeal = "manager_seal_object_cancel" ManagerSuccessConfirmReceive = "manager_confirm_receive_success" ManagerFailureConfirmReceive = "manager_confirm_receive_failure" )
const (
// MaxSrcRunningMigrateGVG defines src sp max running migrate gvg units, and avoid src sp overload.
MaxSrcRunningMigrateGVG = 100
)
Variables ¶
var ( ErrDanglingTask = gfsperrors.Register(module.ManageModularName, http.StatusBadRequest, 60001, "OoooH... request lost") ErrRepeatedTask = gfsperrors.Register(module.ManageModularName, http.StatusNotAcceptable, 60002, "request repeated") ErrExceedTask = gfsperrors.Register(module.ManageModularName, http.StatusNotAcceptable, 60003, "OoooH... request exceed, try again later") ErrCanceledTask = gfsperrors.Register(module.ManageModularName, http.StatusBadRequest, 60004, "task canceled") ErrFutureSupport = gfsperrors.Register(module.ManageModularName, http.StatusNotFound, 60005, "future support") ErrNotifyMigrateGVG = gfsperrors.Register(module.ManageModularName, http.StatusNotAcceptable, 60006, "failed to notify migrate gvg") ErrGfSpDB = gfsperrors.Register(module.ManageModularName, http.StatusInternalServerError, 65201, "server slipped away, try again later") )
Functions ¶
func DefaultManagerOptions ¶
func DefaultManagerOptions(manager *ManageModular, cfg *gfspconfig.GfSpConfig) (err error)
func MakeBucketMigrateKey ¶ added in v0.2.3
func NewManageModular ¶
func NewManageModular(app *gfspapp.GfSpBaseApp, cfg *gfspconfig.GfSpConfig) (coremodule.Modular, error)
Types ¶
type BucketMigrateExecutePlan ¶ added in v0.2.3
type BucketMigrateExecutePlan struct {
// contains filtered or unexported fields
}
func (*BucketMigrateExecutePlan) Start ¶ added in v0.2.3
func (plan *BucketMigrateExecutePlan) Start() error
func (*BucketMigrateExecutePlan) UpdateProgress ¶
func (plan *BucketMigrateExecutePlan) UpdateProgress(task task.MigrateGVGTask) error
UpdateProgress persists user updates and the periodic progress reports sent by the Executor.
type BucketMigrateScheduler ¶ added in v0.2.3
type BucketMigrateScheduler struct {
// contains filtered or unexported fields
}
BucketMigrateScheduler subscribes to bucket migrate events and produces a gvg migrate plan. TODO: support multiple buckets migrate
func NewBucketMigrateScheduler ¶ added in v0.2.3
func NewBucketMigrateScheduler(manager *ManageModular) (*BucketMigrateScheduler, error)
NewBucketMigrateScheduler returns a bucket migrate scheduler instance.
func (*BucketMigrateScheduler) Init ¶ added in v0.2.3
func (s *BucketMigrateScheduler) Init(m *ManageModular) error
func (*BucketMigrateScheduler) Start ¶ added in v0.2.3
func (s *BucketMigrateScheduler) Start() error
func (*BucketMigrateScheduler) UpdateMigrateProgress ¶ added in v0.2.3
func (s *BucketMigrateScheduler) UpdateMigrateProgress(task task.MigrateGVGTask) error
type GlobalVirtualGroupMigrateExecuteUnit ¶
type GlobalVirtualGroupMigrateExecuteUnit struct {
// contains filtered or unexported fields
}
GlobalVirtualGroupMigrateExecuteUnit defines the basic migrate unit, which is used by sp exit and bucket migrate.
func (*GlobalVirtualGroupMigrateExecuteUnit) Key ¶
func (u *GlobalVirtualGroupMigrateExecuteUnit) Key() string
Key is used as the primary key.
type GlobalVirtualGroupMigrateExecuteUnitByBucket ¶
type GlobalVirtualGroupMigrateExecuteUnitByBucket struct { GlobalVirtualGroupMigrateExecuteUnit // contains filtered or unexported fields }
func (*GlobalVirtualGroupMigrateExecuteUnitByBucket) Key ¶
func (ub *GlobalVirtualGroupMigrateExecuteUnitByBucket) Key() string
Key is used as the primary key.
type ManageModular ¶
type ManageModular struct {
// contains filtered or unexported fields
}
func (*ManageModular) DispatchTask ¶
func (*ManageModular) FilterGCTask ¶
func (m *ManageModular) FilterGCTask(qTask task.Task) bool
func (*ManageModular) FilterReceiveTask ¶ added in v0.2.3
func (m *ManageModular) FilterReceiveTask(qTask task.Task) bool
func (*ManageModular) FilterUploadingTask ¶
func (m *ManageModular) FilterUploadingTask(qTask task.Task) bool
func (*ManageModular) GCCacheQueue ¶
func (m *ManageModular) GCCacheQueue(qTask task.Task) bool
func (*ManageModular) GCMigrateGVGQueue ¶ added in v0.2.3
func (m *ManageModular) GCMigrateGVGQueue(qTask task.Task) bool
func (*ManageModular) GCReceiveQueue ¶
func (m *ManageModular) GCReceiveQueue(qTask task.Task) bool
func (*ManageModular) GCRecoverQueue ¶ added in v0.2.3
func (m *ManageModular) GCRecoverQueue(qTask task.Task) bool
func (*ManageModular) GCReplicatePieceQueue ¶
func (m *ManageModular) GCReplicatePieceQueue(qTask task.Task) bool
func (*ManageModular) GCResumableUploadObjectQueue ¶ added in v0.2.3
func (m *ManageModular) GCResumableUploadObjectQueue(qTask task.Task) bool
func (*ManageModular) GCSealObjectQueue ¶
func (m *ManageModular) GCSealObjectQueue(qTask task.Task) bool
func (*ManageModular) GCUploadObjectQueue ¶
func (m *ManageModular) GCUploadObjectQueue(qTask task.Task) bool
func (*ManageModular) HandleChallengePieceTask ¶
func (m *ManageModular) HandleChallengePieceTask(ctx context.Context, task task.ChallengePieceTask) error
func (*ManageModular) HandleCreateResumableUploadObjectTask ¶ added in v0.2.3
func (m *ManageModular) HandleCreateResumableUploadObjectTask(ctx context.Context, task task.ResumableUploadObjectTask) error
func (*ManageModular) HandleCreateUploadObjectTask ¶
func (m *ManageModular) HandleCreateUploadObjectTask(ctx context.Context, task task.UploadObjectTask) error
func (*ManageModular) HandleDoneResumableUploadObjectTask ¶ added in v0.2.3
func (m *ManageModular) HandleDoneResumableUploadObjectTask(ctx context.Context, task task.ResumableUploadObjectTask) error
func (*ManageModular) HandleDoneUploadObjectTask ¶
func (m *ManageModular) HandleDoneUploadObjectTask(ctx context.Context, task task.UploadObjectTask) error
func (*ManageModular) HandleDownloadObjectTask ¶
func (m *ManageModular) HandleDownloadObjectTask(ctx context.Context, task task.DownloadObjectTask) error
func (*ManageModular) HandleGCMetaTask ¶
func (m *ManageModular) HandleGCMetaTask(ctx context.Context, task task.GCMetaTask) error
func (*ManageModular) HandleGCObjectTask ¶
func (m *ManageModular) HandleGCObjectTask(ctx context.Context, gcTask task.GCObjectTask) error
func (*ManageModular) HandleGCZombiePieceTask ¶
func (m *ManageModular) HandleGCZombiePieceTask(ctx context.Context, task task.GCZombiePieceTask) error
func (*ManageModular) HandleMigrateGVGTask ¶ added in v0.2.3
func (m *ManageModular) HandleMigrateGVGTask(ctx context.Context, task task.MigrateGVGTask) error
func (*ManageModular) HandleReceivePieceTask ¶
func (m *ManageModular) HandleReceivePieceTask(ctx context.Context, task task.ReceivePieceTask) error
func (*ManageModular) HandleRecoverPieceTask ¶ added in v0.2.3
func (m *ManageModular) HandleRecoverPieceTask(ctx context.Context, task task.RecoveryPieceTask) error
func (*ManageModular) HandleReplicatePieceTask ¶
func (m *ManageModular) HandleReplicatePieceTask(ctx context.Context, task task.ReplicatePieceTask) error
func (*ManageModular) HandleSealObjectTask ¶
func (m *ManageModular) HandleSealObjectTask(ctx context.Context, task task.SealObjectTask) error
func (*ManageModular) LoadTaskFromDB ¶
func (m *ManageModular) LoadTaskFromDB() error
func (*ManageModular) Name ¶
func (m *ManageModular) Name() string
func (*ManageModular) NotifyMigrateGVG ¶
func (m *ManageModular) NotifyMigrateGVG(ctx context.Context, task task.MigrateGVGTask) error
NotifyMigrateGVG is used to receive migrate gvg task from src sp.
func (*ManageModular) PickUpTask ¶
func (*ManageModular) PickVirtualGroupFamily ¶ added in v0.2.3
func (m *ManageModular) PickVirtualGroupFamily(ctx context.Context, task task.ApprovalCreateBucketTask) (uint32, error)
PickVirtualGroupFamily is used to pick a suitable vgf for creating bucket.
func (*ManageModular) PickVirtualGroupFamilyForBucketMigrate ¶
func (m *ManageModular) PickVirtualGroupFamilyForBucketMigrate(ctx context.Context) (uint32, error)
PickVirtualGroupFamilyForBucketMigrate is used to pick a suitable vgf for migrating a bucket.
func (*ManageModular) QueryTasks ¶
func (*ManageModular) RejectUnSealObject ¶
func (m *ManageModular) RejectUnSealObject(ctx context.Context, object *storagetypes.ObjectInfo) error
func (*ManageModular) ReleaseResource ¶
func (m *ManageModular) ReleaseResource(ctx context.Context, span rcmgr.ResourceScopeSpan)
func (*ManageModular) ReserveResource ¶
func (m *ManageModular) ReserveResource(ctx context.Context, state *rcmgr.ScopeStat) (rcmgr.ResourceScopeSpan, error)
func (*ManageModular) ResetGCObjectTask ¶
func (m *ManageModular) ResetGCObjectTask(qTask task.Task) bool
func (*ManageModular) Statistics ¶
func (m *ManageModular) Statistics() string
func (*ManageModular) TaskRecovering ¶ added in v0.2.3
func (*ManageModular) TaskUploading ¶
func (*ManageModular) UploadingObjectNumber ¶
func (m *ManageModular) UploadingObjectNumber() int
type MigrateStatus ¶ added in v0.2.3
type MigrateStatus int32
var ( WaitForNotifyDestSP MigrateStatus = 0 NotifiedDestSP MigrateStatus = 1 WaitForMigrate MigrateStatus = 2 Migrating MigrateStatus = 3 Migrated MigrateStatus = 4 )
TODO: refine it, and move to proto. sp exit: WaitForNotifyDestSP->NotifiedDestSP->WaitForMigrate->Migrating->Migrated. bucket migrate: WaitForMigrate(created)->Migrating(schedule success)->Migrated(executor report success).
type MigrateTaskRunner ¶
type MigrateTaskRunner struct {
// contains filtered or unexported fields
}
MigrateTaskRunner is used to manage task migrate progress/status in dest sp.
func (*MigrateTaskRunner) AddNewMigrateGVGUnit ¶
func (runner *MigrateTaskRunner) AddNewMigrateGVGUnit(remotedGVGUnit *GlobalVirtualGroupMigrateExecuteUnit) error
func (*MigrateTaskRunner) Init ¶
func (runner *MigrateTaskRunner) Init() error
func (*MigrateTaskRunner) Start ¶
func (runner *MigrateTaskRunner) Start() error
func (*MigrateTaskRunner) UpdateMigrateGVGLastMigratedObjectID ¶
func (runner *MigrateTaskRunner) UpdateMigrateGVGLastMigratedObjectID(migrateKey string, lastMigratedObjectID uint64) error
func (*MigrateTaskRunner) UpdateMigrateGVGStatus ¶
func (runner *MigrateTaskRunner) UpdateMigrateGVGStatus(migrateKey string, st MigrateStatus) error
type NotifiedDestSPIterator ¶
type NotifiedDestSPIterator struct {
// contains filtered or unexported fields
}
NotifiedDestSPIterator is used to check dest migrating units.
func NewNotifiedDestSPIterator ¶
func NewNotifiedDestSPIterator(plan *SPExitExecutePlan) *NotifiedDestSPIterator
func (*NotifiedDestSPIterator) Next ¶
func (mi *NotifiedDestSPIterator) Next()
func (*NotifiedDestSPIterator) SeekToFirst ¶
func (mi *NotifiedDestSPIterator) SeekToFirst()
func (*NotifiedDestSPIterator) Valid ¶
func (mi *NotifiedDestSPIterator) Valid() bool
func (*NotifiedDestSPIterator) Value ¶
func (mi *NotifiedDestSPIterator) Value() *GlobalVirtualGroupMigrateExecuteUnit
type PickDestSPFilter ¶ added in v0.2.3
type PickDestSPFilter struct {
// contains filtered or unexported fields
}
PickDestSPFilter is used to pick sp id which is not in excluded sp ids.
func NewPickDestSPFilterWithMap ¶ added in v0.2.3
func NewPickDestSPFilterWithMap(m map[uint32]int) *PickDestSPFilter
func NewPickDestSPFilterWithSlice ¶ added in v0.2.3
func NewPickDestSPFilterWithSlice(s []uint32) *PickDestSPFilter
func (*PickDestSPFilter) Check ¶ added in v0.2.3
func (f *PickDestSPFilter) Check(spID uint32) bool
type SPExitExecutePlan ¶
type SPExitExecutePlan struct { // runningMigrateGVG int // TODO: refine it. PrimaryVGFMigrateUnits []*VirtualGroupFamilyMigrateExecuteUnit // sp exit, primary family, include gvg list, maybe has conflicted. SecondaryGVGMigrateUnits []*GlobalVirtualGroupMigrateExecuteUnit // sp exit, secondary gvg // for scheduling, the slice only can append to ensure iterator work fine. WaitForNotifyDestSPMutex sync.RWMutex WaitForNotifyDestSPGVGs []*GlobalVirtualGroupMigrateExecuteUnit NotifiedDestSPMutex sync.RWMutex NotifiedDestSPGVGs []*GlobalVirtualGroupMigrateExecuteUnit // contains filtered or unexported fields }
SPExitExecutePlan is used to record the execution of subtasks in src sp.
func (*SPExitExecutePlan) Start ¶
func (plan *SPExitExecutePlan) Start() error
Start persists the plan and its tasks to the db and the task dispatcher.
type SPExitScheduler ¶ added in v0.2.3
type SPExitScheduler struct {
// contains filtered or unexported fields
}
SPExitScheduler is used to manage and schedule sp exit process.
func NewSPExitScheduler ¶ added in v0.2.3
func NewSPExitScheduler(manager *ManageModular) (*SPExitScheduler, error)
NewSPExitScheduler returns a sp exit scheduler instance.
func (*SPExitScheduler) AddNewMigrateGVGUnit ¶
func (s *SPExitScheduler) AddNewMigrateGVGUnit(remotedUnit *GlobalVirtualGroupMigrateExecuteUnit) error
func (*SPExitScheduler) Init ¶ added in v0.2.3
func (s *SPExitScheduler) Init(m *ManageModular) error
Init function is used to load db subscribe block progress and migrate gvg progress.
func (*SPExitScheduler) Start ¶ added in v0.2.3
func (s *SPExitScheduler) Start() error
Start function is used to subscribe to sp exit events from metadata and produce a gvg migrate plan.
func (*SPExitScheduler) UpdateMigrateProgress ¶ added in v0.2.3
func (s *SPExitScheduler) UpdateMigrateProgress(task task.MigrateGVGTask) error
type VirtualGroupFamilyMigrateExecuteUnit ¶
type VirtualGroupFamilyMigrateExecuteUnit struct {
// contains filtered or unexported fields
}
func NewVirtualGroupFamilyMigrateExecuteUnit ¶
func NewVirtualGroupFamilyMigrateExecuteUnit(vgf *virtualgrouptypes.GlobalVirtualGroupFamily, selfSP *sptypes.StorageProvider) *VirtualGroupFamilyMigrateExecuteUnit
type WaitForNotifyDestSPIterator ¶
type WaitForNotifyDestSPIterator struct {
// contains filtered or unexported fields
}
WaitForNotifyDestSPIterator is used to notify migrate units to dest sp.
func NewWaitForNotifyDestSPIterator ¶
func NewWaitForNotifyDestSPIterator(plan *SPExitExecutePlan) *WaitForNotifyDestSPIterator
func (*WaitForNotifyDestSPIterator) Next ¶
func (ti *WaitForNotifyDestSPIterator) Next()
func (*WaitForNotifyDestSPIterator) SeekToFirst ¶
func (ti *WaitForNotifyDestSPIterator) SeekToFirst()
func (*WaitForNotifyDestSPIterator) Valid ¶
func (ti *WaitForNotifyDestSPIterator) Valid() bool
func (*WaitForNotifyDestSPIterator) Value ¶
func (ti *WaitForNotifyDestSPIterator) Value() *GlobalVirtualGroupMigrateExecuteUnit