unit

package
v0.0.0-...-686f8ea Latest
Warning

This package is not in the latest version of its module.

Published: Jan 6, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Constants

const (
	// DefaultInitTimeout represents the default timeout value when initializing a process unit.
	DefaultInitTimeout = time.Minute
)

Variables

This section is empty.

Functions

func IsCtxCanceledProcessErr

func IsCtxCanceledProcessErr(err *pb.ProcessError) bool

IsCtxCanceledProcessErr reports whether err was caused by a canceled context.

func IsResumableDBError

func IsResumableDBError(err error) bool

IsResumableDBError checks whether the error is a resumable DB error. This is a simplified version of IsResumableError: a blacklist filters out the errors that cannot be resumed, and all other errors are treated as resumable.

func IsResumableError

func IsResumableError(err *pb.ProcessError) bool

IsResumableError checks the error message and returns whether we need to resume the task unit and retry.

func IsResumableRelayError

func IsResumableRelayError(err *pb.ProcessError) bool

IsResumableRelayError returns whether relay needs to be resumed on the given error. It lives in this package because relay also implements the Unit interface.

func JoinProcessErrors

func JoinProcessErrors(errors []*pb.ProcessError) string

JoinProcessErrors returns the string forms of the given pb.ProcessErrors joined by ", ".
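
A minimal sketch of this kind of join, assuming each error exposes a String method. The `processError` type and its fields are hypothetical stand-ins for pb.ProcessError, whose actual fields are not listed on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// processError is a hypothetical stand-in for pb.ProcessError.
type processError struct {
	Code    int
	Message string
}

// String renders the error in a simple key=value form chosen for this sketch.
func (e *processError) String() string {
	return fmt.Sprintf("code=%d message=%s", e.Code, e.Message)
}

// joinProcessErrors joins the string form of each error with ", ".
func joinProcessErrors(errs []*processError) string {
	parts := make([]string, 0, len(errs))
	for _, e := range errs {
		parts = append(parts, e.String())
	}
	return strings.Join(parts, ", ")
}

func main() {
	errs := []*processError{
		{Code: 1, Message: "connection lost"},
		{Code: 2, Message: "timeout"},
	}
	fmt.Println(joinProcessErrors(errs))
}
```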

func NewProcessError

func NewProcessError(err error) *pb.ProcessError

NewProcessError creates a new ProcessError. It can be refined to add an error scope field if needed.

Types

type Unit

type Unit interface {
	// Init initializes the DM process unit.
	// Every unit does its base initialization in `Init`, and this must succeed before the subtask starts running.
	// Other setup can be done at the beginning of `Process`, but this must be handled carefully to stay
	// compatible with Pause / Resume.
	// If initialization succeeds, the outer caller should call `Close` when the unit (or the task) is finished, stopped, or canceled (because another unit's Init failed).
	// If initialization fails, Init itself should release the resources it acquired (rolling itself back).
	Init(ctx context.Context) error
	// Process runs the main logic; before returning, it must send a result to the pr channel.
	// When ctx is done, it must stop processing and return; otherwise the DM-worker will be blocked forever.
	// When the unit is not processing, call Process to continue or resume the process.
	Process(ctx context.Context, pr chan pb.ProcessResult)
	// Close shuts down the process and closes the unit; after that, Process cannot be called to resume.
	// The implementation should not block for a long time.
	Close()
	// Kill shuts down the process and closes the unit without a graceful shutdown.
	Kill()
	// Pause does some cleanups and the unit can be resumed later. The caller will make sure Process has returned.
	// The implementation should not block for a long time.
	Pause()
	// Resume resumes the paused process; before returning, it must send a result to the pr channel.
	Resume(ctx context.Context, pr chan pb.ProcessResult)
	// Update updates the configuration
	Update(ctx context.Context, cfg *config.SubTaskConfig) error

	// Status returns the unit's current status. The result may need calculation with source status, like estimated time
	// to catch up. If sourceStatus is nil, the calculation should be skipped.
	Status(sourceStatus *binlog.SourceStatus) interface{}
	// Type returns the unit's type
	Type() pb.UnitType
	// IsFreshTask returns whether this is a fresh task (not processed before).
	// It is used to decide whether the task should become restoring.
	IsFreshTask(ctx context.Context) (bool, error)
}

Unit defines the interface for subtask process units, such as syncer, loader, and relay. The Unit is not responsible for maintaining its own status, such as "pausing" or "paused". The caller should maintain that status: for example, it should know that the Unit is "paused" and avoid calling Pause again. All methods in the Unit interface can expect no concurrent invocation; the caller should guarantee this.
