Documentation ¶
Index ¶
- Constants
- Variables
- func ErrConsensusWithDetail(detail string) *gfsperrors.GfSpError
- func ErrGfSpDBWithDetail(detail string) *gfsperrors.GfSpError
- func ErrPieceStoreWithDetail(detail string) *gfsperrors.GfSpError
- func NewExecuteModular(app *gfspapp.GfSpBaseApp, cfg *gfspconfig.GfSpConfig) (coremodule.Modular, error)
- type ExecuteModular
- func (e *ExecuteModular) AskTask(ctx context.Context) error
- func (e *ExecuteModular) HandleGCBucketMigrationBucket(ctx context.Context, task coretask.GCBucketMigrationTask)
- func (e *ExecuteModular) HandleGCMetaTask(ctx context.Context, task coretask.GCMetaTask)
- func (e *ExecuteModular) HandleGCObjectTask(ctx context.Context, task coretask.GCObjectTask)
- func (e *ExecuteModular) HandleGCStaleVersionObjectTask(ctx context.Context, task coretask.GCStaleVersionObjectTask)
- func (e *ExecuteModular) HandleGCZombiePieceTask(ctx context.Context, task coretask.GCZombiePieceTask)
- func (e *ExecuteModular) HandleMigrateGVGTask(ctx context.Context, gvgTask coretask.MigrateGVGTask)
- func (e *ExecuteModular) HandleMigratePieceTask(ctx context.Context, gvgTask *gfsptask.GfSpMigrateGVGTask, ...) error
- func (e *ExecuteModular) HandleReceivePieceTask(ctx context.Context, task coretask.ReceivePieceTask)
- func (e *ExecuteModular) HandleRecoverPieceTask(ctx context.Context, task coretask.RecoveryPieceTask)
- func (e *ExecuteModular) HandleReplicatePieceTask(ctx context.Context, task coretask.ReplicatePieceTask)
- func (e *ExecuteModular) HandleSealObjectTask(ctx context.Context, task coretask.SealObjectTask)
- func (e *ExecuteModular) Name() string
- func (e *ExecuteModular) ReleaseResource(ctx context.Context, span corercmgr.ResourceScopeSpan)
- func (e *ExecuteModular) ReportTask(ctx context.Context, task coretask.Task) (err error)
- func (e *ExecuteModular) ReserveResource(ctx context.Context, st *corercmgr.ScopeStat) (corercmgr.ResourceScopeSpan, error)
- func (e *ExecuteModular) Start(ctx context.Context) error
- func (e *ExecuteModular) Statistics() string
- func (e *ExecuteModular) Stop(ctx context.Context) error
- type GCWorker
Constants ¶
const ( // DefaultExecutorMaxExecuteNum defines the default max parallel execute task number. DefaultExecutorMaxExecuteNum int64 = 64 // DefaultExecutorAskTaskInterval defines the default ask task interval from manager. DefaultExecutorAskTaskInterval int = 1 // DefaultExecutorAskReplicateApprovalTimeout defines the ask replicate piece approval // timeout that send the request to the p2p node, DefaultExecutorAskReplicateApprovalTimeout int64 = 4 // DefaultExecutorAskReplicateApprovalExFactor defines the expanded factor for asking // secondary SP. // Example: need data chunk + data parity chunk numbers SPs as secondary, consider fault // tolerance, should collect (data chunk + data parity chunk) * factor numbers SPs as // backup secondary, if some of these are failed to replicate can pick up again from // backups. So it is always bigger than 1.0. DefaultExecutorAskReplicateApprovalExFactor float64 = 1.0 // DefaultExecutorListenSealTimeoutHeight defines the default listen seal object on // greenfield timeout height, if after current block height + timeout height, the object // is not sealed, it is judged failed to seal object on greenfield. DefaultExecutorListenSealTimeoutHeight int = 10 // DefaultExecutorListenSealRetryTimeout defines the sleep time when listen seal object // fail, until retry ExecutorMaxListenSealRetry times, the task is set error. DefaultExecutorListenSealRetryTimeout int = 2 // DefaultExecutorMaxListenSealRetry defines the default max retry number for listening // object. DefaultExecutorMaxListenSealRetry int = 3 // DefaultExecutorObjectMigrationRetryTimeout defines the sleep time when object migration // fail, until retry DefaultExecutorMaxObjectMigrationRetry times, the task is set error. DefaultExecutorObjectMigrationRetryTimeout int = 2 // DefaultExecutorMaxObjectMigrationRetry defines the default max retry number for object migration. 
DefaultExecutorMaxObjectMigrationRetry int = 5 // DefaultStatisticsOutputInterval defines the default interval for output statistics info, // it is used to log and debug. DefaultStatisticsOutputInterval int = 60 // DefaultSleepInterval defines the sleep interval when failed to ask task // it is millisecond level DefaultSleepInterval = 100 // DefaultExecutorBucketTrafficKeepTimeDay defines the default max keep bucket traffic time for gc meta bucket traffic. DefaultExecutorBucketTrafficKeepTimeDay uint64 = 30 * 3 // DefaultExecutorReadRecordKeepTimeDay defines the default max keep read record time for gc meta read record. DefaultExecutorReadRecordKeepTimeDay uint64 = 30 // DefaultExecutorReadRecordDeleteLimit defines the default deletion limit for delete gc meta read record. DefaultExecutorReadRecordDeleteLimit uint64 = 100 )
const ( ExecutorSuccessAskTask = "executor_ask_task_success" ExecutorRunTask = "executor_run_task" ExecutorFailureAskTask = "executor_ask_task_failure" ExecutorFailureAskNoTask = "executor_ask_no_task_failure" ExecutorSuccessReplicateTask = "executor_replicate_task_success" ExecutorFailureReplicateTask = "executor_replicate_task_failure" ExecutorSuccessSealObjectTask = "executor_seal_object_task_success" ExecutorFailureSealObjectTask = "executor_seal_object_task_failure" ExecutorSuccessReceiveTask = "executor_receive_task_success" ExecutorFailureReceiveTask = "executor_receive_task_failure" ExecutorSuccessRecoveryTask = "executor_recovery_task_success" ExecutorFailureRecoveryTask = "executor_recovery_task_failure" ExecutorSuccessReportTask = "executor_report_task_to_manager_success" ExecutorFailureReportTask = "executor_report_task_to_manager_failure" ExecutorSuccessP2P = "executor_p2p_success" ExecutorFailureP2P = "executor_p2p_failure" ExecutorSuccessReplicateAllPiece = "executor_replicate_all_piece_success" ExecutorFailureReplicateAllPiece = "executor_replicate_all_piece_failure" ExecutorSuccessReplicateOnePiece = "executor_replicate_one_piece_success" ExecutorFailureReplicateOnePiece = "executor_replicate_one_piece_failure" ExecutorSuccessDoneReplicatePiece = "executor_done_replicate_piece_success" ExecutorFailureDoneReplicatePiece = "executor_done_replicate_piece_failure" ExecutorSuccessSealObject = "executor_seal_object_success" ExecutorFailureSealObject = "executor_seal_object_failure" )
Variables ¶
var ( RtyAttNum = uint(3) RtyAttem = retry.Attempts(RtyAttNum) RtyDelay = retry.Delay(time.Millisecond * 500) RtyErr = retry.LastErrorOnly(true) )
var ( ErrDanglingPointer = gfsperrors.Register(module.ExecuteModularName, http.StatusBadRequest, 40001, "OoooH.... request lost") ErrInsufficientApproval = gfsperrors.Register(module.ExecuteModularName, http.StatusNotFound, 40002, "insufficient approvals from p2p") ErrUnsealed = gfsperrors.Register(module.ExecuteModularName, http.StatusInternalServerError, 40003, "seal object on chain failed") ErrExhaustedApproval = gfsperrors.Register(module.ExecuteModularName, http.StatusNotFound, 40004, "approvals exhausted") ErrInvalidIntegrity = gfsperrors.Register(module.ExecuteModularName, http.StatusNotAcceptable, 40005, "secondary integrity hash verification failed") ErrSecondaryMismatch = gfsperrors.Register(module.ExecuteModularName, http.StatusNotAcceptable, 40006, "secondary sp mismatch") ErrReplicateIdsOutOfBounds = gfsperrors.Register(module.ExecuteModularName, http.StatusNotAcceptable, 40007, "replicate idx out of bounds") ErrRecoveryRedundancyType = gfsperrors.Register(module.ExecuteModularName, http.StatusInternalServerError, 45202, "recovery only support EC redundancy type") ErrRecoveryPieceNotEnough = gfsperrors.Register(module.ExecuteModularName, http.StatusInternalServerError, 45203, "failed to get enough piece data to recovery") ErrRecoveryDecode = gfsperrors.Register(module.ExecuteModularName, http.StatusInternalServerError, 45204, "EC decode error") ErrRecoveryPieceChecksum = gfsperrors.Register(module.ExecuteModularName, http.StatusInternalServerError, 45206, "recovery checksum not correct") ErrRecoveryPieceLength = gfsperrors.Register(module.ExecuteModularName, http.StatusInternalServerError, 45207, "get secondary piece data length error") ErrPrimaryNotFound = gfsperrors.Register(module.ExecuteModularName, http.StatusNotAcceptable, 45208, "primary sp endpoint not found when recovering") ErrRecoveryPieceIndex = gfsperrors.Register(module.ExecuteModularName, http.StatusNotAcceptable, 45209, "recovery piece index invalid") ErrMigratedPieceChecksum = 
gfsperrors.Register(module.ExecuteModularName, http.StatusInternalServerError, 45210, "migrate piece checksum is not correct") ErrInvalidRedundancyIndex = gfsperrors.Register(module.ExecuteModularName, http.StatusInternalServerError, 45212, "invalid redundancy index") ErrSetObjectIntegrity = gfsperrors.Register(module.ExecuteModularName, http.StatusInternalServerError, 45213, "failed to set object integrity into spdb") ErrInvalidPieceChecksumLength = gfsperrors.Register(module.ExecuteModularName, http.StatusInternalServerError, 45214, "invalid piece checksum length") ErrRecoveryObjectStatus = gfsperrors.Register(module.ExecuteModularName, http.StatusNotAcceptable, 45215, "the recovered object has not been sealed state") ErrInvalidSecondaryBlsSignature = gfsperrors.Register(module.ExecuteModularName, http.StatusNotAcceptable, 45216, "primary receive invalid bls signature from secondary SP") ErrInvalidReplicatePieceTask = gfsperrors.Register(module.ExecuteModularName, http.StatusInternalServerError, 45217, "invalid replicate piece task") )
Functions ¶
func ErrConsensusWithDetail ¶ added in v0.2.4
func ErrConsensusWithDetail(detail string) *gfsperrors.GfSpError
func ErrGfSpDBWithDetail ¶ added in v0.2.4
func ErrGfSpDBWithDetail(detail string) *gfsperrors.GfSpError
func ErrPieceStoreWithDetail ¶ added in v0.2.4
func ErrPieceStoreWithDetail(detail string) *gfsperrors.GfSpError
func NewExecuteModular ¶
func NewExecuteModular(app *gfspapp.GfSpBaseApp, cfg *gfspconfig.GfSpConfig) (coremodule.Modular, error)
Types ¶
type ExecuteModular ¶
type ExecuteModular struct {
// contains filtered or unexported fields
}
func (*ExecuteModular) HandleGCBucketMigrationBucket ¶ added in v1.2.0
func (e *ExecuteModular) HandleGCBucketMigrationBucket(ctx context.Context, task coretask.GCBucketMigrationTask)
func (*ExecuteModular) HandleGCMetaTask ¶
func (e *ExecuteModular) HandleGCMetaTask(ctx context.Context, task coretask.GCMetaTask)
func (*ExecuteModular) HandleGCObjectTask ¶
func (e *ExecuteModular) HandleGCObjectTask(ctx context.Context, task coretask.GCObjectTask)
func (*ExecuteModular) HandleGCStaleVersionObjectTask ¶ added in v1.5.0
func (e *ExecuteModular) HandleGCStaleVersionObjectTask(ctx context.Context, task coretask.GCStaleVersionObjectTask)
func (*ExecuteModular) HandleGCZombiePieceTask ¶
func (e *ExecuteModular) HandleGCZombiePieceTask(ctx context.Context, task coretask.GCZombiePieceTask)
func (*ExecuteModular) HandleMigrateGVGTask ¶ added in v0.2.3
func (e *ExecuteModular) HandleMigrateGVGTask(ctx context.Context, gvgTask coretask.MigrateGVGTask)
HandleMigrateGVGTask handles a migrate GVG task, which covers two cases: SP exit and bucket migration. srcSP is the SP that wants to exit or needs to migrate a bucket; destSP is the SP that accepts the data from srcSP.
func (*ExecuteModular) HandleMigratePieceTask ¶ added in v0.2.3
func (e *ExecuteModular) HandleMigratePieceTask(ctx context.Context, gvgTask *gfsptask.GfSpMigrateGVGTask, pieceTask *gfsptask.GfSpMigratePieceTask) error
HandleMigratePieceTask handles a migrate piece task. It sends requests to the src SP (the exiting SP, or the source SP of a bucket migration) to get piece data. The piece data is used to generate the piece checksum and integrity hash; if the integrity hash matches the one on chain, the piece data is written into PieceStore, and the generated piece checksum and integrity hash are written into the SQL db.
The storagetypes.ObjectInfo struct contains a LocalVirtualGroupId field, which can be used to look up a GVG consisting of one primary SP and six ordered secondary SPs (the order cannot be changed). From this we can tell which kind of object is being migrated: primary or secondary. Now the objectInfo operator address and secondary addresses cannot be used directly; a new method should be encapsulated to resolve them: objectInfo -> lvg -> gvg -> (1 primary SP, 6 secondary SPs).
func (*ExecuteModular) HandleReceivePieceTask ¶
func (e *ExecuteModular) HandleReceivePieceTask(ctx context.Context, task coretask.ReceivePieceTask)
func (*ExecuteModular) HandleRecoverPieceTask ¶ added in v0.2.3
func (e *ExecuteModular) HandleRecoverPieceTask(ctx context.Context, task coretask.RecoveryPieceTask)
HandleRecoverPieceTask handles the recovery piece task: it sends requests to other SPs to get the piece data needed for recovery, recovers the original data, and writes the recovered data to the piece store.
func (*ExecuteModular) HandleReplicatePieceTask ¶
func (e *ExecuteModular) HandleReplicatePieceTask(ctx context.Context, task coretask.ReplicatePieceTask)
func (*ExecuteModular) HandleSealObjectTask ¶
func (e *ExecuteModular) HandleSealObjectTask(ctx context.Context, task coretask.SealObjectTask)
func (*ExecuteModular) Name ¶
func (e *ExecuteModular) Name() string
func (*ExecuteModular) ReleaseResource ¶
func (e *ExecuteModular) ReleaseResource(ctx context.Context, span corercmgr.ResourceScopeSpan)
func (*ExecuteModular) ReportTask ¶
func (e *ExecuteModular) ReportTask(ctx context.Context, task coretask.Task) (err error)
func (*ExecuteModular) ReserveResource ¶
func (e *ExecuteModular) ReserveResource(ctx context.Context, st *corercmgr.ScopeStat) (corercmgr.ResourceScopeSpan, error)
func (*ExecuteModular) Statistics ¶
func (e *ExecuteModular) Statistics() string
type GCWorker ¶ added in v1.2.0
type GCWorker struct {
// contains filtered or unexported fields
}
func NewGCWorker ¶ added in v1.2.0
func NewGCWorker(e *ExecuteModular) *GCWorker
NewGCWorker returns gc worker instance.