manager

package
v0.2.3-alpha.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 13, 2023 License: GPL-3.0 Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// DiscontinueBucketReason defines the reason for stopping serving.
	DiscontinueBucketReason = "testnet cleanup"

	// DiscontinueBucketLimit defines the max number of buckets to fetch in a single request.
	DiscontinueBucketLimit = int64(500)

	// RejectUnSealObjectRetry defines the retry number of sending reject unseal object tx.
	RejectUnSealObjectRetry = 3

	// RejectUnSealObjectTimeout defines the timeout of sending reject unseal object tx.
	RejectUnSealObjectTimeout = 3
)
View Source
const (
	// DefaultGlobalMaxUploadingNumber defines the default max uploading object number
	// in SP, include: uploading object to primary, replicate object to secondaries,
	// and sealing object on greenfield.
	DefaultGlobalMaxUploadingNumber int = 40960
	// DefaultGlobalUploadObjectParallel defines the default max parallel uploading
	// objects to primary in SP system.
	DefaultGlobalUploadObjectParallel int = 10240
	// DefaultGlobalReplicatePieceParallel defines the default max parallel replicating
	// objects to secondaries in SP system.
	DefaultGlobalReplicatePieceParallel int = 10240
	// DefaultGlobalSealObjectParallel defines the default max parallel sealing objects
	// on greenfield in SP system.
	DefaultGlobalSealObjectParallel int = 10240
	// DefaultGlobalReceiveObjectParallel defines the default max parallel confirming
	// receive pieces on greenfield in SP system.
	DefaultGlobalReceiveObjectParallel int = 10240 * 10
	// DefaultGlobalGCObjectParallel defines the default max parallel gc objects in SP
	// system.
	DefaultGlobalGCObjectParallel int = 4
	// DefaultGlobalGCZombieParallel defines the default max parallel gc zombie pieces
	// in SP system.
	DefaultGlobalGCZombieParallel int = 1
	// DefaultGlobalGCMetaParallel defines the default max parallel gc meta db in SP
	// system.
	DefaultGlobalGCMetaParallel int = 1
	// DefaultGlobalRecoveryPieceParallel defines the default max parallel recovery objects in SP
	// system.
	DefaultGlobalRecoveryPieceParallel int = 7
	// DefaultGlobalMigrateGVGParallel defines the default max parallel migrating gvg in SP system.
	DefaultGlobalMigrateGVGParallel int = 10
	// DefaultGlobalDownloadObjectTaskCacheSize defines the default max cache size of download
	// object tasks in manager.
	DefaultGlobalDownloadObjectTaskCacheSize int = 4096
	// DefaultGlobalChallengePieceTaskCacheSize defines the default max cache size of challenge
	// piece tasks in manager.
	DefaultGlobalChallengePieceTaskCacheSize int = 4096
	// DefaultGlobalBatchGcObjectTimeInterval defines the default interval for generating
	// gc object task.
	DefaultGlobalBatchGcObjectTimeInterval int = 1 * 60
	// DefaultGlobalGcObjectBlockInterval defines the default blocks number for getting
	// deleted objects.
	DefaultGlobalGcObjectBlockInterval uint64 = 1000
	// DefaultGlobalGcObjectSafeBlockDistance defines the default distance from current block
	// height to gc the deleted object.
	DefaultGlobalGcObjectSafeBlockDistance uint64 = 1000
	// DefaultGlobalSyncConsensusInfoInterval defines the default interval for syncing the sp
	// info list to sp db.
	DefaultGlobalSyncConsensusInfoInterval uint64 = 600
	// DefaultStatisticsOutputInterval defines the default interval for output statistics info,
	// it is used to log and debug.
	DefaultStatisticsOutputInterval int = 60
	// DefaultListenRejectUnSealTimeoutHeight defines the default listen reject unseal object
	// on greenfield timeout height, if after current block height + timeout height, the object
	// is not rejected, it is judged failed to reject unseal object on greenfield.
	DefaultListenRejectUnSealTimeoutHeight int = 10

	// DefaultDiscontinueTimeInterval defines the default interval for starting discontinue
	// buckets task, used for test net.
	DefaultDiscontinueTimeInterval = 3 * 60
	// DefaultDiscontinueBucketKeepAliveDays defines the default bucket keep alive days, after
	// the interval, buckets will be discontinued, used for test net.
	DefaultDiscontinueBucketKeepAliveDays = 7

	DefaultLoadReplicateTimeout int64 = 60
	DefaultLoadSealTimeout      int64 = 180
	// DefaultSubscribeSPExitEventIntervalSec defines the default time interval to subscribe sp exit event from metadata.
	DefaultSubscribeSPExitEventIntervalSec = 1
	// DefaultSubscribeBucketMigrateEventIntervalSec defines the default time interval to subscribe bucket migrate event from metadata.
	DefaultSubscribeBucketMigrateEventIntervalSec = 1
	// DefaultSubscribeSwapOutEventIntervalSec defines the default time interval to subscribe gvg swap out event from metadata.
	DefaultSubscribeSwapOutEventIntervalSec = 1
)
View Source
const (
	ManagerGCBlockNumber           = "manager_gc_block_number"
	ManagerSuccessUpload           = "manager_upload_object_success"
	ManagerFailureUpload           = "manager_upload_object_failure"
	ManagerSuccessReplicate        = "manager_replicate_object_success"
	ManagerFailureReplicate        = "manager_replicate_object_failure"
	ManagerCancelReplicate         = "manager_replicate_object_cancel"
	ManagerSuccessReplicateAndSeal = "manager_replicate_and_seal_object_success"
	ManagerFailureReplicateAndSeal = "manager_replicate_and_seal_object_failure"
	ManagerSuccessSeal             = "manager_seal_object_success"
	ManagerFailureSeal             = "manager_seal_object_failure"
	ManagerCancelSeal              = "manager_seal_object_cancel"
	ManagerSuccessConfirmReceive   = "manager_confirm_receive_success"
	ManagerFailureConfirmReceive   = "manager_confirm_receive_failure"
)
View Source
const (
	// MaxSrcRunningMigrateGVG defines the max number of running migrate gvg units in the src sp, to avoid overloading the src sp.
	MaxSrcRunningMigrateGVG = 100
)

Variables

View Source
var (
	ErrDanglingTask     = gfsperrors.Register(module.ManageModularName, http.StatusBadRequest, 60001, "OoooH... request lost")
	ErrRepeatedTask     = gfsperrors.Register(module.ManageModularName, http.StatusNotAcceptable, 60002, "request repeated")
	ErrExceedTask       = gfsperrors.Register(module.ManageModularName, http.StatusNotAcceptable, 60003, "OoooH... request exceed, try again later")
	ErrCanceledTask     = gfsperrors.Register(module.ManageModularName, http.StatusBadRequest, 60004, "task canceled")
	ErrFutureSupport    = gfsperrors.Register(module.ManageModularName, http.StatusNotFound, 60005, "future support")
	ErrNotifyMigrateGVG = gfsperrors.Register(module.ManageModularName, http.StatusNotAcceptable, 60006, "failed to notify migrate gvg")
	ErrGfSpDB           = gfsperrors.Register(module.ManageModularName, http.StatusInternalServerError, 65201, "server slipped away, try again later")
)

Functions

func DefaultManagerOptions

func DefaultManagerOptions(manager *ManageModular, cfg *gfspconfig.GfSpConfig) (err error)

func MakeBucketMigrateKey added in v0.2.3

func MakeBucketMigrateKey(bucketID uint64, gvgID uint32) string

func MakeRemotedGVGMigrateKey

func MakeRemotedGVGMigrateKey(gvgID uint32, vgfID uint32, redundancyIndex int32) string

func MakeSecondaryGVGMigrateKey

func MakeSecondaryGVGMigrateKey(gvgID uint32, vgfID uint32, redundancyIndex int32) string

func NewManageModular

func NewManageModular(app *gfspapp.GfSpBaseApp, cfg *gfspconfig.GfSpConfig) (coremodule.Modular, error)

Types

type BucketMigrateExecutePlan added in v0.2.3

type BucketMigrateExecutePlan struct {
	// contains filtered or unexported fields
}

func (*BucketMigrateExecutePlan) Start added in v0.2.3

func (plan *BucketMigrateExecutePlan) Start() error

func (*BucketMigrateExecutePlan) UpdateProgress

func (plan *BucketMigrateExecutePlan) UpdateProgress(task task.MigrateGVGTask) error

UpdateProgress persists user updates and periodic progress reports from the Executor.

type BucketMigrateScheduler added in v0.2.3

type BucketMigrateScheduler struct {
	// contains filtered or unexported fields
}

BucketMigrateScheduler subscribes bucket migrate events and produces a gvg migrate plan. TODO: support multiple buckets migrate

func NewBucketMigrateScheduler added in v0.2.3

func NewBucketMigrateScheduler(manager *ManageModular) (*BucketMigrateScheduler, error)

NewBucketMigrateScheduler returns a bucket migrate scheduler instance.

func (*BucketMigrateScheduler) Init added in v0.2.3

func (*BucketMigrateScheduler) Start added in v0.2.3

func (s *BucketMigrateScheduler) Start() error

func (*BucketMigrateScheduler) UpdateMigrateProgress added in v0.2.3

func (s *BucketMigrateScheduler) UpdateMigrateProgress(task task.MigrateGVGTask) error

type GlobalVirtualGroupMigrateExecuteUnit

type GlobalVirtualGroupMigrateExecuteUnit struct {
	// contains filtered or unexported fields
}

GlobalVirtualGroupMigrateExecuteUnit define basic migrate unit, which is used by sp exit and bucket migrate.

func (*GlobalVirtualGroupMigrateExecuteUnit) Key

Key is used as the primary key.

type GlobalVirtualGroupMigrateExecuteUnitByBucket

type GlobalVirtualGroupMigrateExecuteUnitByBucket struct {
	GlobalVirtualGroupMigrateExecuteUnit
	// contains filtered or unexported fields
}

func (*GlobalVirtualGroupMigrateExecuteUnitByBucket) Key

Key is used as the primary key.

type ManageModular

type ManageModular struct {
	// contains filtered or unexported fields
}

func (*ManageModular) DispatchTask

func (m *ManageModular) DispatchTask(ctx context.Context, limit rcmgr.Limit) (task.Task, error)

func (*ManageModular) FilterGCTask

func (m *ManageModular) FilterGCTask(qTask task.Task) bool

func (*ManageModular) FilterReceiveTask added in v0.2.3

func (m *ManageModular) FilterReceiveTask(qTask task.Task) bool

func (*ManageModular) FilterUploadingTask

func (m *ManageModular) FilterUploadingTask(qTask task.Task) bool

func (*ManageModular) GCCacheQueue

func (m *ManageModular) GCCacheQueue(qTask task.Task) bool

func (*ManageModular) GCMigrateGVGQueue added in v0.2.3

func (m *ManageModular) GCMigrateGVGQueue(qTask task.Task) bool

func (*ManageModular) GCReceiveQueue

func (m *ManageModular) GCReceiveQueue(qTask task.Task) bool

func (*ManageModular) GCRecoverQueue added in v0.2.3

func (m *ManageModular) GCRecoverQueue(qTask task.Task) bool

func (*ManageModular) GCReplicatePieceQueue

func (m *ManageModular) GCReplicatePieceQueue(qTask task.Task) bool

func (*ManageModular) GCResumableUploadObjectQueue added in v0.2.3

func (m *ManageModular) GCResumableUploadObjectQueue(qTask task.Task) bool

func (*ManageModular) GCSealObjectQueue

func (m *ManageModular) GCSealObjectQueue(qTask task.Task) bool

func (*ManageModular) GCUploadObjectQueue

func (m *ManageModular) GCUploadObjectQueue(qTask task.Task) bool

func (*ManageModular) HandleChallengePieceTask

func (m *ManageModular) HandleChallengePieceTask(ctx context.Context, task task.ChallengePieceTask) error

func (*ManageModular) HandleCreateResumableUploadObjectTask added in v0.2.3

func (m *ManageModular) HandleCreateResumableUploadObjectTask(ctx context.Context, task task.ResumableUploadObjectTask) error

func (*ManageModular) HandleCreateUploadObjectTask

func (m *ManageModular) HandleCreateUploadObjectTask(ctx context.Context, task task.UploadObjectTask) error

func (*ManageModular) HandleDoneResumableUploadObjectTask added in v0.2.3

func (m *ManageModular) HandleDoneResumableUploadObjectTask(ctx context.Context, task task.ResumableUploadObjectTask) error

func (*ManageModular) HandleDoneUploadObjectTask

func (m *ManageModular) HandleDoneUploadObjectTask(ctx context.Context, task task.UploadObjectTask) error

func (*ManageModular) HandleDownloadObjectTask

func (m *ManageModular) HandleDownloadObjectTask(ctx context.Context, task task.DownloadObjectTask) error

func (*ManageModular) HandleGCMetaTask

func (m *ManageModular) HandleGCMetaTask(ctx context.Context, task task.GCMetaTask) error

func (*ManageModular) HandleGCObjectTask

func (m *ManageModular) HandleGCObjectTask(ctx context.Context, gcTask task.GCObjectTask) error

func (*ManageModular) HandleGCZombiePieceTask

func (m *ManageModular) HandleGCZombiePieceTask(ctx context.Context, task task.GCZombiePieceTask) error

func (*ManageModular) HandleMigrateGVGTask added in v0.2.3

func (m *ManageModular) HandleMigrateGVGTask(ctx context.Context, task task.MigrateGVGTask) error

func (*ManageModular) HandleReceivePieceTask

func (m *ManageModular) HandleReceivePieceTask(ctx context.Context, task task.ReceivePieceTask) error

func (*ManageModular) HandleRecoverPieceTask added in v0.2.3

func (m *ManageModular) HandleRecoverPieceTask(ctx context.Context, task task.RecoveryPieceTask) error

func (*ManageModular) HandleReplicatePieceTask

func (m *ManageModular) HandleReplicatePieceTask(ctx context.Context, task task.ReplicatePieceTask) error

func (*ManageModular) HandleSealObjectTask

func (m *ManageModular) HandleSealObjectTask(ctx context.Context, task task.SealObjectTask) error

func (*ManageModular) LoadTaskFromDB

func (m *ManageModular) LoadTaskFromDB() error

func (*ManageModular) Name

func (m *ManageModular) Name() string

func (*ManageModular) NotifyMigrateGVG

func (m *ManageModular) NotifyMigrateGVG(ctx context.Context, task task.MigrateGVGTask) error

NotifyMigrateGVG is used to receive migrate gvg task from src sp.

func (*ManageModular) PickUpTask

func (m *ManageModular) PickUpTask(ctx context.Context, tasks []task.Task) (task.Task, []task.Task)

func (*ManageModular) PickVirtualGroupFamily added in v0.2.3

func (m *ManageModular) PickVirtualGroupFamily(ctx context.Context, task task.ApprovalCreateBucketTask) (uint32, error)

PickVirtualGroupFamily is used to pick a suitable vgf for creating bucket.

func (*ManageModular) PickVirtualGroupFamilyForBucketMigrate

func (m *ManageModular) PickVirtualGroupFamilyForBucketMigrate(ctx context.Context) (uint32, error)

PickVirtualGroupFamilyForBucketMigrate is used to pick a suitable vgf for bucket migrate.

func (*ManageModular) QueryTasks

func (m *ManageModular) QueryTasks(ctx context.Context, subKey task.TKey) ([]task.Task, error)

func (*ManageModular) RejectUnSealObject

func (m *ManageModular) RejectUnSealObject(ctx context.Context, object *storagetypes.ObjectInfo) error

func (*ManageModular) ReleaseResource

func (m *ManageModular) ReleaseResource(ctx context.Context, span rcmgr.ResourceScopeSpan)

func (*ManageModular) ReserveResource

func (m *ManageModular) ReserveResource(ctx context.Context, state *rcmgr.ScopeStat) (rcmgr.ResourceScopeSpan, error)

func (*ManageModular) ResetGCObjectTask

func (m *ManageModular) ResetGCObjectTask(qTask task.Task) bool

func (*ManageModular) Start

func (m *ManageModular) Start(ctx context.Context) error

func (*ManageModular) Statistics

func (m *ManageModular) Statistics() string

func (*ManageModular) Stop

func (m *ManageModular) Stop(ctx context.Context) error

func (*ManageModular) TaskRecovering added in v0.2.3

func (m *ManageModular) TaskRecovering(ctx context.Context, task task.Task) bool

func (*ManageModular) TaskUploading

func (m *ManageModular) TaskUploading(ctx context.Context, task task.Task) bool

func (*ManageModular) UploadingObjectNumber

func (m *ManageModular) UploadingObjectNumber() int

type MigrateStatus added in v0.2.3

type MigrateStatus int32
var (
	WaitForNotifyDestSP MigrateStatus = 0
	NotifiedDestSP      MigrateStatus = 1
	WaitForMigrate      MigrateStatus = 2
	Migrating           MigrateStatus = 3
	Migrated            MigrateStatus = 4
)

TODO: refine it, and move to proto. sp exit: WaitForNotifyDestSP->NotifiedDestSP->WaitForMigrate->Migrating->Migrated. bucket migrate: WaitForMigrate(created)->Migrating(schedule success)->Migrated(executor report success).

type MigrateTaskRunner

type MigrateTaskRunner struct {
	// contains filtered or unexported fields
}

MigrateTaskRunner is used to manage task migrate progress/status in dest sp.

func (*MigrateTaskRunner) AddNewMigrateGVGUnit

func (runner *MigrateTaskRunner) AddNewMigrateGVGUnit(remotedGVGUnit *GlobalVirtualGroupMigrateExecuteUnit) error

func (*MigrateTaskRunner) Init

func (runner *MigrateTaskRunner) Init() error

func (*MigrateTaskRunner) Start

func (runner *MigrateTaskRunner) Start() error

func (*MigrateTaskRunner) UpdateMigrateGVGLastMigratedObjectID

func (runner *MigrateTaskRunner) UpdateMigrateGVGLastMigratedObjectID(migrateKey string, lastMigratedObjectID uint64) error

func (*MigrateTaskRunner) UpdateMigrateGVGStatus

func (runner *MigrateTaskRunner) UpdateMigrateGVGStatus(migrateKey string, st MigrateStatus) error

type NotifiedDestSPIterator

type NotifiedDestSPIterator struct {
	// contains filtered or unexported fields
}

NotifiedDestSPIterator is used to check dest migrating units.

func NewNotifiedDestSPIterator

func NewNotifiedDestSPIterator(plan *SPExitExecutePlan) *NotifiedDestSPIterator

func (*NotifiedDestSPIterator) Next

func (mi *NotifiedDestSPIterator) Next()

func (*NotifiedDestSPIterator) SeekToFirst

func (mi *NotifiedDestSPIterator) SeekToFirst()

func (*NotifiedDestSPIterator) Valid

func (mi *NotifiedDestSPIterator) Valid() bool

func (*NotifiedDestSPIterator) Value

type PickDestSPFilter added in v0.2.3

type PickDestSPFilter struct {
	// contains filtered or unexported fields
}

PickDestSPFilter is used to pick sp id which is not in excluded sp ids.

func NewPickDestSPFilterWithMap added in v0.2.3

func NewPickDestSPFilterWithMap(m map[uint32]int) *PickDestSPFilter

func NewPickDestSPFilterWithSlice added in v0.2.3

func NewPickDestSPFilterWithSlice(s []uint32) *PickDestSPFilter

func (*PickDestSPFilter) Check added in v0.2.3

func (f *PickDestSPFilter) Check(spID uint32) bool

type SPExitExecutePlan

type SPExitExecutePlan struct {

	// runningMigrateGVG        int                                     // TODO: refine it.
	PrimaryVGFMigrateUnits   []*VirtualGroupFamilyMigrateExecuteUnit // sp exit, primary family, include gvg list, maybe has conflicted.
	SecondaryGVGMigrateUnits []*GlobalVirtualGroupMigrateExecuteUnit // sp exit, secondary gvg

	// for scheduling, the slices below may only be appended to, so that the iterators keep working.
	WaitForNotifyDestSPMutex sync.RWMutex
	WaitForNotifyDestSPGVGs  []*GlobalVirtualGroupMigrateExecuteUnit
	NotifiedDestSPMutex      sync.RWMutex
	NotifiedDestSPGVGs       []*GlobalVirtualGroupMigrateExecuteUnit
	// contains filtered or unexported fields
}

SPExitExecutePlan is used to record the execution of subtasks in src sp.

func (*SPExitExecutePlan) Init

func (plan *SPExitExecutePlan) Init() error

Init loads from db.

func (*SPExitExecutePlan) Start

func (plan *SPExitExecutePlan) Start() error

Start persists the plan and task to db and task dispatcher.

type SPExitScheduler added in v0.2.3

type SPExitScheduler struct {
	// contains filtered or unexported fields
}

SPExitScheduler is used to manage and schedule sp exit process.

func NewSPExitScheduler added in v0.2.3

func NewSPExitScheduler(manager *ManageModular) (*SPExitScheduler, error)

NewSPExitScheduler returns a sp exit scheduler instance.

func (*SPExitScheduler) AddNewMigrateGVGUnit

func (s *SPExitScheduler) AddNewMigrateGVGUnit(remotedUnit *GlobalVirtualGroupMigrateExecuteUnit) error

func (*SPExitScheduler) Init added in v0.2.3

func (s *SPExitScheduler) Init(m *ManageModular) error

Init function is used to load db subscribe block progress and migrate gvg progress.

func (*SPExitScheduler) Start added in v0.2.3

func (s *SPExitScheduler) Start() error

Start function is used to subscribe sp exit event from metadata and produces a gvg migrate plan.

func (*SPExitScheduler) UpdateMigrateProgress added in v0.2.3

func (s *SPExitScheduler) UpdateMigrateProgress(task task.MigrateGVGTask) error

type VirtualGroupFamilyMigrateExecuteUnit

type VirtualGroupFamilyMigrateExecuteUnit struct {
	// contains filtered or unexported fields
}

type WaitForNotifyDestSPIterator

type WaitForNotifyDestSPIterator struct {
	// contains filtered or unexported fields
}

WaitForNotifyDestSPIterator is used to notify migrate units to dest sp.

func NewWaitForNotifyDestSPIterator

func NewWaitForNotifyDestSPIterator(plan *SPExitExecutePlan) *WaitForNotifyDestSPIterator

func (*WaitForNotifyDestSPIterator) Next

func (ti *WaitForNotifyDestSPIterator) Next()

func (*WaitForNotifyDestSPIterator) SeekToFirst

func (ti *WaitForNotifyDestSPIterator) SeekToFirst()

func (*WaitForNotifyDestSPIterator) Valid

func (ti *WaitForNotifyDestSPIterator) Valid() bool

func (*WaitForNotifyDestSPIterator) Value

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL