metadata

package
v0.0.0-...-beee317 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 3, 2025 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrStateNotFound is the sentinel error returned when the state is not found
// in metadata. Compare against it with errors.Is.
var ErrStateNotFound = errors.New("state not found")

ErrStateNotFound is returned when the state is not found in metadata.

Functions

func DelAllDroppedColumns

func DelAllDroppedColumns(ctx context.Context, kvClient metaModel.KVClient) error

DelAllDroppedColumns deletes all dropped columns in metadata.

Types

type ClusterInfo

// ClusterInfo represents the cluster info.
type ClusterInfo struct {
	// Version is the semantic version stored in the cluster info.
	Version semver.Version
}

ClusterInfo represents the cluster info.

func NewClusterInfo

func NewClusterInfo(version semver.Version) *ClusterInfo

NewClusterInfo creates a new ClusterInfo instance.

type ClusterInfoStore

type ClusterInfoStore struct {
	// contains filtered or unexported fields
}

ClusterInfoStore manages the state of ClusterInfo.

func NewClusterInfoStore

func NewClusterInfoStore(kvClient metaModel.KVClient) *ClusterInfoStore

NewClusterInfoStore returns a new ClusterInfoStore instance

func (ClusterInfoStore) Delete

func (f ClusterInfoStore) Delete(ctx context.Context) error

func (ClusterInfoStore) Get

func (f ClusterInfoStore) Get(ctx context.Context) (state, error)

func (ClusterInfoStore) Put

func (f ClusterInfoStore) Put(ctx context.Context, state state) error

func (*ClusterInfoStore) UpdateVersion

func (clusterInfoStore *ClusterInfoStore) UpdateVersion(ctx context.Context, newVer semver.Version) error

UpdateVersion updates the version of ClusterInfo.

type DDLItem

// DDLItem represents a DDL item.
type DDLItem struct {
	// TargetTable is the downstream table of this item.
	TargetTable TargetTable
	// SourceTable is the upstream table of this item.
	SourceTable SourceTable
	// Tables is the table stmts in order before and after the DDL executed
	Tables []string
	// DDLs holds DDL statements as strings.
	// NOTE(review): presumably the statements carried by this item — confirm against callers.
	DDLs   []string
	// Type is the kind of DDL: CreateTable, DropTable or OtherDDL.
	Type   DDLType
}

DDLItem represents a DDL item

type DDLType

// DDLType defines the type of DDL. Its named values are CreateTable, DropTable
// and OtherDDL.
type DDLType uint64

DDLType defines the type of DDL

// Named DDLType values. Numbering starts at 1 (iota + 1), so the zero value of
// DDLType does not correspond to any named constant.
const (
	// CreateTable has the value 1.
	CreateTable DDLType = iota + 1
	// DropTable has the value 2.
	DropTable
	// OtherDDL has the value 3.
	OtherDDL
)

Defines DDLType here.

type DroppedColumns

// DroppedColumns represents the state of dropped columns.
type DroppedColumns struct {
	// column -> source table
	// The inner map has empty-struct values, i.e. it acts as a set of
	// SourceTable keys for each column name.
	Cols map[string]map[SourceTable]struct{}
}

DroppedColumns represents the state of dropped columns

type DroppedColumnsStore

type DroppedColumnsStore struct {
	// contains filtered or unexported fields
}

DroppedColumnsStore manages the dropped columns state

func NewDroppedColumnsStore

func NewDroppedColumnsStore(kvClient metaModel.KVClient, targetTable TargetTable) *DroppedColumnsStore

NewDroppedColumnsStore returns a new DroppedColumnsStore instance

func (*DroppedColumnsStore) AddDroppedColumns

func (s *DroppedColumnsStore) AddDroppedColumns(ctx context.Context, cols []string, sourceTable SourceTable) error

AddDroppedColumns adds dropped columns to the state

func (*DroppedColumnsStore) DelDroppedColumn

func (s *DroppedColumnsStore) DelDroppedColumn(ctx context.Context, col string) error

DelDroppedColumn deletes dropped column from the state

func (*DroppedColumnsStore) DelDroppedColumnForTable

func (s *DroppedColumnsStore) DelDroppedColumnForTable(ctx context.Context, sourceTable SourceTable) error

DelDroppedColumnForTable deletes dropped column for one source table

func (DroppedColumnsStore) Delete

func (f DroppedColumnsStore) Delete(ctx context.Context) error

func (DroppedColumnsStore) Get

func (f DroppedColumnsStore) Get(ctx context.Context) (state, error)

func (*DroppedColumnsStore) HasDroppedColumn

func (s *DroppedColumnsStore) HasDroppedColumn(ctx context.Context, col string, sourceTable SourceTable) bool

HasDroppedColumn returns whether the column is dropped before

func (DroppedColumnsStore) Put

func (f DroppedColumnsStore) Put(ctx context.Context, state state) error

type FinishedTaskStatus

// FinishedTaskStatus wraps the TaskStatus with FinishedStatus. It is only used
// when a task is finished.
type FinishedTaskStatus struct {
	// TaskStatus is embedded, so its fields are promoted onto FinishedTaskStatus.
	TaskStatus
	// Result points to a pb.ProcessResult.
	// NOTE(review): presumably the final processing result of the task — confirm.
	Result   *pb.ProcessResult
	// Status is kept as raw JSON (json.RawMessage) rather than a concrete type.
	Status   json.RawMessage
	// Duration is a time.Duration.
	// NOTE(review): presumably how long the task ran — confirm.
	Duration time.Duration
}

FinishedTaskStatus wraps the TaskStatus with FinishedStatus. It is only used when a task is finished.

type Job

// Job represents the state of a job.
type Job struct {
	// taskID -> task
	Tasks map[string]*Task

	// Deleting represents whether the job is being deleted.
	Deleting bool
}

Job represents the state of a job.

func NewJob

func NewJob(jobCfg *config.JobCfg) *Job

NewJob creates a new Job instance

type JobStore

type JobStore struct {
	*bootstrap.DefaultUpgrader
	// contains filtered or unexported fields
}

JobStore manages the state of a job.

func NewJobStore

func NewJobStore(kvClient metaModel.KVClient, pLogger *zap.Logger) *JobStore

NewJobStore creates a new JobStore instance

func (JobStore) Delete

func (f JobStore) Delete(ctx context.Context) error

func (JobStore) Get

func (f JobStore) Get(ctx context.Context) (state, error)

func (*JobStore) GetJobCfg

func (jobStore *JobStore) GetJobCfg(ctx context.Context) (*config.JobCfg, error)

GetJobCfg gets the job config.

func (*JobStore) MarkDeleting

func (jobStore *JobStore) MarkDeleting(ctx context.Context) error

MarkDeleting marks the job as deleting.

func (JobStore) Put

func (f JobStore) Put(ctx context.Context, state state) error

func (*JobStore) UpdateConfig

func (jobStore *JobStore) UpdateConfig(ctx context.Context, jobCfg *config.JobCfg) error

UpdateConfig is called when the user updates the job config.

func (*JobStore) UpdateStages

func (jobStore *JobStore) UpdateStages(ctx context.Context, taskIDs []string, stage TaskStage) error

UpdateStages is called when the user operates on the job.

func (*JobStore) UpgradeFuncs

func (jobStore *JobStore) UpgradeFuncs() []bootstrap.UpgradeFunc

UpgradeFuncs implement the Upgrader interface.

type MetaData

type MetaData struct {
	// contains filtered or unexported fields
}

MetaData is the metadata of dm.

func NewMetaData

func NewMetaData(kvClient metaModel.KVClient, pLogger *zap.Logger) *MetaData

NewMetaData creates a new MetaData instance

func (*MetaData) ClusterInfoStore

func (m *MetaData) ClusterInfoStore() *ClusterInfoStore

ClusterInfoStore returns internal infoStore

func (*MetaData) JobStore

func (m *MetaData) JobStore() *JobStore

JobStore returns internal jobStore

func (*MetaData) UnitStateStore

func (m *MetaData) UnitStateStore() *UnitStateStore

UnitStateStore returns internal unitStateStore

func (*MetaData) Upgrade

func (m *MetaData) Upgrade(ctx context.Context, fromVer semver.Version) error

Upgrade upgrades metadata.

type SourceTable

// SourceTable represents an upstream table.
type SourceTable struct {
	// NOTE(review): presumably the identifier of the upstream source that
	// Schema and Table belong to — confirm.
	Source string
	// Schema is the schema name of the table.
	Schema string
	// Table is the table name.
	Table  string
}

SourceTable represents an upstream table.

func (SourceTable) MarshalText

func (st SourceTable) MarshalText() (text []byte, err error)

MarshalText implements encoding.TextMarshaler.

func (*SourceTable) UnmarshalText

func (st *SourceTable) UnmarshalText(text []byte) error

UnmarshalText implements encoding.TextUnmarshaler.

type Store

// Store holds one state instance.
type Store interface {
	// Put writes the given state.
	Put(ctx context.Context, state state) error
	// Delete removes the held state.
	Delete(ctx context.Context) error
	// Get returns the held state.
	// NOTE(review): ErrStateNotFound is documented as the error for a missing
	// state; presumably it is returned from here — confirm.
	Get(ctx context.Context) (state, error)
}

Store holds one state instance.

type TargetTable

// TargetTable represents a downstream table.
type TargetTable struct {
	// Schema is the schema name of the table.
	Schema string
	// Table is the table name.
	Table  string
}

TargetTable represents a downstream table.

func (TargetTable) MarshalText

func (tt TargetTable) MarshalText() (text []byte, err error)

MarshalText implements encoding.TextMarshaler.

func (*TargetTable) UnmarshalText

func (tt *TargetTable) UnmarshalText(text []byte) error

UnmarshalText implements encoding.TextUnmarshaler.

type Task

// Task is the minimum working unit of a job. A job may contain multiple
// upstreams, and it is converted into multiple tasks.
type Task struct {
	// Cfg points to the task's configuration.
	Cfg              *config.TaskCfg
	// Stage is the task's TaskStage.
	Stage            TaskStage
	// NOTE(review): presumably the time Stage was last changed — confirm.
	StageUpdatedTime time.Time
}

Task is the minimum working unit of a job. A job may contain multiple upstreams, and it is converted into multiple tasks.

func NewTask

func NewTask(taskCfg *config.TaskCfg) *Task

NewTask creates a new Task instance

type TaskStage

// TaskStage represents the internal stage of a task. It has String,
// MarshalJSON and UnmarshalJSON methods, and the JSON form is a quoted string.
type TaskStage int

TaskStage represents the internal stage of a task. TODO: use Stage in lib or move Stage to lib. A stage that shares its name with a dmpb.Stage must use the same value so that the Grafana dashboard labels are correct, since the same Grafana dashboard is used for OP and engine. The two do not need to have the same meaning; the matching values are only for Grafana display.

// Named TaskStage values. The values are derived from iota with different
// offsets, so each constant's value depends on its position in this block:
// inserting or reordering entries changes the values of the ones after it.
const (
	StageInit     TaskStage = iota + 1  // = 1 = dmpb.Stage_New
	StageRunning                        // = 2 = dmpb.Stage_Running
	StagePaused                         // = 3 ~= dmpb.Stage_Paused. in engine this stage means paused by user, if it's auto-paused by error, it's StageError
	StageFinished TaskStage = iota + 2  // = 5 = dmpb.Stage_Finished. skip 4 - Stopped, no such stage in engine, see dm/worker/metrics.go
	StagePausing                        // = 6 = dmpb.Stage_Pausing
	StageError    TaskStage = iota + 10 // = 15, leave some value space for extension of dmpb.Stage
	// StageUnscheduled means the task is not scheduled.
	// This usually happens when the worker is offline.
	// It repeats the previous expression (iota + 10), so its value is 16.
	StageUnscheduled
)

These stages may be updated in a later PR.

func (TaskStage) MarshalJSON

func (ts TaskStage) MarshalJSON() ([]byte, error)

MarshalJSON marshals the enum as a quoted json string

func (TaskStage) String

func (ts TaskStage) String() string

String implements fmt.Stringer interface

func (*TaskStage) UnmarshalJSON

func (ts *TaskStage) UnmarshalJSON(b []byte) error

UnmarshalJSON unmarshals a quoted JSON string to the enum value.

type TaskStatus

// TaskStatus defines the running task status.
type TaskStatus struct {
	// Unit is a frameModel.WorkerType.
	// NOTE(review): presumably the kind of worker unit running the task — confirm.
	Unit             frameModel.WorkerType
	// Task is a string.
	// NOTE(review): presumably the task ID — confirm.
	Task             string
	// Stage is the TaskStage of the task.
	Stage            TaskStage
	// NOTE(review): presumably the revision of the config this status was
	// produced with — confirm.
	CfgModRevision   uint64
	// NOTE(review): presumably the time Stage was last changed — confirm.
	StageUpdatedTime time.Time
}

TaskStatus defines the running task status.

type UnitState

// UnitState represents the state of units.
type UnitState struct {
	// taskID -> sequence of finished status
	FinishedUnitStatus map[string][]*FinishedTaskStatus
	// CurrentUnitStatus maps a string key to a UnitStatus.
	// NOTE(review): presumably also keyed by taskID — confirm.
	CurrentUnitStatus  map[string]*UnitStatus
}

UnitState represents the state of units.

type UnitStateStore

type UnitStateStore struct {
	// contains filtered or unexported fields
}

UnitStateStore is the meta store for UnitState.

func NewUnitStateStore

func NewUnitStateStore(kvClient metaModel.KVClient) *UnitStateStore

NewUnitStateStore creates a new UnitStateStore.

func (UnitStateStore) Delete

func (f UnitStateStore) Delete(ctx context.Context) error

func (UnitStateStore) Get

func (f UnitStateStore) Get(ctx context.Context) (state, error)

func (UnitStateStore) Put

func (f UnitStateStore) Put(ctx context.Context, state state) error

func (*UnitStateStore) ReadModifyWrite

func (f *UnitStateStore) ReadModifyWrite(
	ctx context.Context,
	action func(*UnitState) error,
) error

ReadModifyWrite reads the state, modifies it, and writes it back.

type UnitStatus

// UnitStatus defines the unit status.
type UnitStatus struct {
	// Unit is a frameModel.WorkerType.
	// NOTE(review): presumably the kind of worker unit — confirm.
	Unit        frameModel.WorkerType
	// Task is a string.
	// NOTE(review): presumably the task ID — confirm.
	Task        string
	// NOTE(review): presumably the time the unit was created — confirm.
	CreatedTime time.Time
}

UnitStatus defines the unit status.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL