Documentation ¶
Index ¶
- Constants
- func IsCtxCanceledProcessErr(err *pb.ProcessError) bool
- func IsResumableDBError(err error) bool
- func IsResumableError(err *pb.ProcessError) bool
- func IsResumableRelayError(err *pb.ProcessError) bool
- func JoinProcessErrors(errors []*pb.ProcessError) string
- func NewProcessError(err error) *pb.ProcessError
- type Unit
Constants ¶
const ( // DefaultInitTimeout represents the default timeout value when initializing a process unit. DefaultInitTimeout = time.Minute )
Variables ¶
This section is empty.
Functions ¶
func IsCtxCanceledProcessErr ¶
func IsCtxCanceledProcessErr(err *pb.ProcessError) bool
IsCtxCanceledProcessErr returns true if the err's context canceled.
func IsResumableDBError ¶
IsResumableDBError checks whether the error is resumable DB error. this is a simplified version of IsResumableError. we use a blacklist to filter out some errors which can not be resumed, all other errors is resumable.
func IsResumableError ¶
func IsResumableError(err *pb.ProcessError) bool
IsResumableError checks the error message and returns whether we need to resume the task unit and retry.
func IsResumableRelayError ¶
func IsResumableRelayError(err *pb.ProcessError) bool
IsResumableRelayError return whether we need resume relay on error since relay impl unit interface too, so we put it here.
func JoinProcessErrors ¶
func JoinProcessErrors(errors []*pb.ProcessError) string
JoinProcessErrors return the string of pb.ProcessErrors joined by ", ".
func NewProcessError ¶
func NewProcessError(err error) *pb.ProcessError
NewProcessError creates a new ProcessError we can refine to add error scope field if needed.
Types ¶
type Unit ¶
type Unit interface { // Init initializes the dm process unit // every unit does base initialization in `Init`, and this must pass before start running the subtask // other setups can be done in the beginning of `Process`, but this should be treated carefully to make it // compatible with Pause / Resume. // if initialing successfully, the outer caller should call `Close` when the unit (or the task) finished, stopped or canceled (because other units Init fail). // if initialing fail, Init itself should release resources it acquired before (rolling itself back). Init(ctx context.Context) error // Process does the main logic and its returning must send a result to pr channel. // When ctx.Done, stops the process and returns, otherwise the DM-worker will be blocked forever // When not in processing, call Process to continue or resume the process Process(ctx context.Context, pr chan pb.ProcessResult) // Close shuts down the process and closes the unit, after that can not call Process to resume // The implementation should not block for a long time. Close() // Kill shuts down the process and closes the unit without graceful. Kill() // Pause does some cleanups and the unit can be resumed later. The caller will make sure Process has returned. // The implementation should not block for a long time. Pause() // Resume resumes the paused process and its returning must send a result to pr channel. Resume(ctx context.Context, pr chan pb.ProcessResult) // Update updates the configuration Update(ctx context.Context, cfg *config.SubTaskConfig) error // Status returns the unit's current status. The result may need calculation with source status, like estimated time // to catch up. If sourceStatus is nil, the calculation should be skipped. Status(sourceStatus *binlog.SourceStatus) interface{} // Type returns the unit's type Type() pb.UnitType // IsFreshTask return whether is a fresh task (not processed before) // it will be used to decide where the task should become restoring IsFreshTask(ctx context.Context) (bool, error) }
Unit defines interface for subtask process units, like syncer, loader, relay, etc. The Unit is not responsible to maintain its status like "pausing"/"paused". The caller should maintain the status, for example, know the Unit is "paused" and avoid call Pause again. All method is Unit interface can expect no concurrent invocation, the caller should guarantee this.