checkpoints

package
v5.0.0-nightly+incompa... Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 2, 2021 License: Apache-2.0 Imports: 29 Imported by: 5

Documentation

Index

Constants

View Source
const (
	// the table names to store each kind of checkpoint in the checkpoint database
	// remember to increase the version number in case of incompatible change.
	CheckpointTableNameTask   = "task_v2"
	CheckpointTableNameTable  = "table_v6"
	CheckpointTableNameEngine = "engine_v5"
	CheckpointTableNameChunk  = "chunk_v5"
)
View Source
const (
	// shared by MySQLCheckpointsDB and GlueCheckpointsDB
	CreateDBTemplate        = "CREATE DATABASE IF NOT EXISTS %s;"
	CreateTaskTableTemplate = `` /* 376-byte string literal not displayed */

	CreateTableTableTemplate = `` /* 435-byte string literal not displayed */

	CreateEngineTableTemplate = `` /* 338-byte string literal not displayed */

	CreateChunkTableTemplate = `` /* 827-byte string literal not displayed */

	InitTaskTemplate = `` /* 174-byte string literal not displayed */

	InitTableTemplate = `` /* 182-byte string literal not displayed */

	ReadTaskTemplate = `` /* 139-byte string literal not displayed */

	ReadEngineTemplate = `
		SELECT engine_id, status FROM %s.%s WHERE table_name = ? ORDER BY engine_id DESC;`
	ReadChunkTemplate = `` /* 268-byte string literal not displayed */

	ReadTableRemainTemplate = `
		SELECT status, alloc_base, table_id FROM %s.%s WHERE table_name = ?;`
	ReplaceEngineTemplate = `
		REPLACE INTO %s.%s (table_name, engine_id, status) VALUES (?, ?, ?);`
	ReplaceChunkTemplate = `` /* 346-byte string literal not displayed */

	UpdateChunkTemplate = `` /* 168-byte string literal not displayed */

	UpdateTableRebaseTemplate = `
		UPDATE %s.%s SET alloc_base = GREATEST(?, alloc_base) WHERE table_name = ?;`
	UpdateTableStatusTemplate = `
		UPDATE %s.%s SET status = ? WHERE table_name = ?;`
	UpdateEngineTemplate = `
		UPDATE %s.%s SET status = ? WHERE (table_name, engine_id) = (?, ?);`
	DeleteCheckpointRecordTemplate = "DELETE FROM %s.%s WHERE table_name = ?;"
)
View Source
const WholeTableEngineID = math.MaxInt32

Variables

View Source
var (
	ErrInvalidLengthFileCheckpoints        = fmt.Errorf("proto: negative length found during unmarshaling")
	ErrIntOverflowFileCheckpoints          = fmt.Errorf("proto: integer overflow")
	ErrUnexpectedEndOfGroupFileCheckpoints = fmt.Errorf("proto: unexpected end of group")
)

Functions

func IsCheckpointTable

func IsCheckpointTable(name string) bool

func IsCheckpointsDBExists

func IsCheckpointsDBExists(ctx context.Context, cfg *config.Config) (bool, error)

func Transact

func Transact(ctx context.Context, purpose string, s Session, logger log.Logger, action func(context.Context, Session) error) error

Types

type CheckpointStatus

type CheckpointStatus uint8
const (
	CheckpointStatusMissing         CheckpointStatus = 0
	CheckpointStatusMaxInvalid      CheckpointStatus = 25
	CheckpointStatusLoaded          CheckpointStatus = 30
	CheckpointStatusAllWritten      CheckpointStatus = 60
	CheckpointStatusClosed          CheckpointStatus = 90
	CheckpointStatusImported        CheckpointStatus = 120
	CheckpointStatusIndexImported   CheckpointStatus = 140
	CheckpointStatusAlteredAutoInc  CheckpointStatus = 150
	CheckpointStatusChecksumSkipped CheckpointStatus = 170
	CheckpointStatusChecksummed     CheckpointStatus = 180
	CheckpointStatusAnalyzeSkipped  CheckpointStatus = 200
	CheckpointStatusAnalyzed        CheckpointStatus = 210
)

func (CheckpointStatus) MetricName

func (status CheckpointStatus) MetricName() string

type CheckpointsDB

type CheckpointsDB interface {
	Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
	TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
	Get(ctx context.Context, tableName string) (*TableCheckpoint, error)
	Close() error
	// InsertEngineCheckpoints initializes the checkpoints related to a table.
	// It assumes the entire table has not been imported before and will fill in
	// default values for the column permutations and checksums.
	InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error
	Update(checkpointDiffs map[string]*TableCheckpointDiff)

	RemoveCheckpoint(ctx context.Context, tableName string) error
	// MoveCheckpoints renames the checkpoint schema to include a suffix
	// including the taskID (e.g. `tidb_lightning_checkpoints.1234567890.bak`).
	MoveCheckpoints(ctx context.Context, taskID int64) error
	// GetLocalStoringTables returns a map containing tables have engine files stored on local disk.
	// currently only meaningful for local backend
	GetLocalStoringTables(ctx context.Context) (map[string][]int32, error)
	IgnoreErrorCheckpoint(ctx context.Context, tableName string) error
	DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)
	DumpTables(ctx context.Context, csv io.Writer) error
	DumpEngines(ctx context.Context, csv io.Writer) error
	DumpChunks(ctx context.Context, csv io.Writer) error
}

func OpenCheckpointsDB

func OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (CheckpointsDB, error)

type CheckpointsModel

type CheckpointsModel struct {
	// key is table_name
	Checkpoints    map[string]*TableCheckpointModel `` /* 163-byte string literal not displayed */
	TaskCheckpoint *TaskCheckpointModel             `protobuf:"bytes,2,opt,name=task_checkpoint,json=taskCheckpoint,proto3" json:"task_checkpoint,omitempty"`
}

func (*CheckpointsModel) Descriptor

func (*CheckpointsModel) Descriptor() ([]byte, []int)

func (*CheckpointsModel) Marshal

func (m *CheckpointsModel) Marshal() (dAtA []byte, err error)

func (*CheckpointsModel) MarshalTo

func (m *CheckpointsModel) MarshalTo(dAtA []byte) (int, error)

func (*CheckpointsModel) MarshalToSizedBuffer

func (m *CheckpointsModel) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*CheckpointsModel) ProtoMessage

func (*CheckpointsModel) ProtoMessage()

func (*CheckpointsModel) Reset

func (m *CheckpointsModel) Reset()

func (*CheckpointsModel) Size

func (m *CheckpointsModel) Size() (n int)

func (*CheckpointsModel) String

func (m *CheckpointsModel) String() string

func (*CheckpointsModel) Unmarshal

func (m *CheckpointsModel) Unmarshal(dAtA []byte) error

func (*CheckpointsModel) XXX_DiscardUnknown

func (m *CheckpointsModel) XXX_DiscardUnknown()

func (*CheckpointsModel) XXX_Marshal

func (m *CheckpointsModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*CheckpointsModel) XXX_Merge

func (m *CheckpointsModel) XXX_Merge(src proto.Message)

func (*CheckpointsModel) XXX_Size

func (m *CheckpointsModel) XXX_Size() int

func (*CheckpointsModel) XXX_Unmarshal

func (m *CheckpointsModel) XXX_Unmarshal(b []byte) error

type ChunkCheckpoint

type ChunkCheckpoint struct {
	Key               ChunkCheckpointKey
	FileMeta          mydump.SourceFileMeta
	ColumnPermutation []int
	Chunk             mydump.Chunk
	Checksum          verify.KVChecksum
	Timestamp         int64
}

func (*ChunkCheckpoint) DeepCopy

func (ccp *ChunkCheckpoint) DeepCopy() *ChunkCheckpoint

type ChunkCheckpointKey

type ChunkCheckpointKey struct {
	Path   string
	Offset int64
}

func (*ChunkCheckpointKey) String

func (key *ChunkCheckpointKey) String() string

type ChunkCheckpointMerger

type ChunkCheckpointMerger struct {
	EngineID          int32
	Key               ChunkCheckpointKey
	Checksum          verify.KVChecksum
	Pos               int64
	RowID             int64
	ColumnPermutation []int
}

func (*ChunkCheckpointMerger) MergeInto

func (merger *ChunkCheckpointMerger) MergeInto(cpd *TableCheckpointDiff)

type ChunkCheckpointModel

type ChunkCheckpointModel struct {
	Path              string  `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
	Offset            int64   `protobuf:"varint,2,opt,name=offset,proto3" json:"offset,omitempty"`
	ColumnPermutation []int32 `protobuf:"varint,12,rep,packed,name=column_permutation,json=columnPermutation,proto3" json:"column_permutation,omitempty"`
	EndOffset         int64   `protobuf:"varint,5,opt,name=end_offset,json=endOffset,proto3" json:"end_offset,omitempty"`
	Pos               int64   `protobuf:"varint,6,opt,name=pos,proto3" json:"pos,omitempty"`
	PrevRowidMax      int64   `protobuf:"varint,7,opt,name=prev_rowid_max,json=prevRowidMax,proto3" json:"prev_rowid_max,omitempty"`
	RowidMax          int64   `protobuf:"varint,8,opt,name=rowid_max,json=rowidMax,proto3" json:"rowid_max,omitempty"`
	KvcBytes          uint64  `protobuf:"varint,9,opt,name=kvc_bytes,json=kvcBytes,proto3" json:"kvc_bytes,omitempty"`
	KvcKvs            uint64  `protobuf:"varint,10,opt,name=kvc_kvs,json=kvcKvs,proto3" json:"kvc_kvs,omitempty"`
	KvcChecksum       uint64  `protobuf:"fixed64,11,opt,name=kvc_checksum,json=kvcChecksum,proto3" json:"kvc_checksum,omitempty"`
	Timestamp         int64   `protobuf:"fixed64,13,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	Type              int32   `protobuf:"varint,14,opt,name=type,proto3" json:"type,omitempty"`
	Compression       int32   `protobuf:"varint,15,opt,name=compression,proto3" json:"compression,omitempty"`
	SortKey           string  `protobuf:"bytes,16,opt,name=sort_key,json=sortKey,proto3" json:"sort_key,omitempty"`
	FileSize          int64   `protobuf:"varint,17,opt,name=file_size,json=fileSize,proto3" json:"file_size,omitempty"`
}

func (*ChunkCheckpointModel) Descriptor

func (*ChunkCheckpointModel) Descriptor() ([]byte, []int)

func (*ChunkCheckpointModel) Marshal

func (m *ChunkCheckpointModel) Marshal() (dAtA []byte, err error)

func (*ChunkCheckpointModel) MarshalTo

func (m *ChunkCheckpointModel) MarshalTo(dAtA []byte) (int, error)

func (*ChunkCheckpointModel) MarshalToSizedBuffer

func (m *ChunkCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*ChunkCheckpointModel) ProtoMessage

func (*ChunkCheckpointModel) ProtoMessage()

func (*ChunkCheckpointModel) Reset

func (m *ChunkCheckpointModel) Reset()

func (*ChunkCheckpointModel) Size

func (m *ChunkCheckpointModel) Size() (n int)

func (*ChunkCheckpointModel) String

func (m *ChunkCheckpointModel) String() string

func (*ChunkCheckpointModel) Unmarshal

func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error

func (*ChunkCheckpointModel) XXX_DiscardUnknown

func (m *ChunkCheckpointModel) XXX_DiscardUnknown()

func (*ChunkCheckpointModel) XXX_Marshal

func (m *ChunkCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*ChunkCheckpointModel) XXX_Merge

func (m *ChunkCheckpointModel) XXX_Merge(src proto.Message)

func (*ChunkCheckpointModel) XXX_Size

func (m *ChunkCheckpointModel) XXX_Size() int

func (*ChunkCheckpointModel) XXX_Unmarshal

func (m *ChunkCheckpointModel) XXX_Unmarshal(b []byte) error

type DestroyedTableCheckpoint

type DestroyedTableCheckpoint struct {
	TableName   string
	MinEngineID int32
	MaxEngineID int32
}

type EngineCheckpoint

type EngineCheckpoint struct {
	Status CheckpointStatus
	Chunks []*ChunkCheckpoint // a sorted array
}

func (*EngineCheckpoint) DeepCopy

func (engine *EngineCheckpoint) DeepCopy() *EngineCheckpoint

type EngineCheckpointModel

type EngineCheckpointModel struct {
	Status uint32 `protobuf:"varint,1,opt,name=status,proto3" json:"status,omitempty"`
	// key is "$path:$offset"
	Chunks map[string]*ChunkCheckpointModel `` /* 153-byte string literal not displayed */
}

func (*EngineCheckpointModel) Descriptor

func (*EngineCheckpointModel) Descriptor() ([]byte, []int)

func (*EngineCheckpointModel) Marshal

func (m *EngineCheckpointModel) Marshal() (dAtA []byte, err error)

func (*EngineCheckpointModel) MarshalTo

func (m *EngineCheckpointModel) MarshalTo(dAtA []byte) (int, error)

func (*EngineCheckpointModel) MarshalToSizedBuffer

func (m *EngineCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*EngineCheckpointModel) ProtoMessage

func (*EngineCheckpointModel) ProtoMessage()

func (*EngineCheckpointModel) Reset

func (m *EngineCheckpointModel) Reset()

func (*EngineCheckpointModel) Size

func (m *EngineCheckpointModel) Size() (n int)

func (*EngineCheckpointModel) String

func (m *EngineCheckpointModel) String() string

func (*EngineCheckpointModel) Unmarshal

func (m *EngineCheckpointModel) Unmarshal(dAtA []byte) error

func (*EngineCheckpointModel) XXX_DiscardUnknown

func (m *EngineCheckpointModel) XXX_DiscardUnknown()

func (*EngineCheckpointModel) XXX_Marshal

func (m *EngineCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*EngineCheckpointModel) XXX_Merge

func (m *EngineCheckpointModel) XXX_Merge(src proto.Message)

func (*EngineCheckpointModel) XXX_Size

func (m *EngineCheckpointModel) XXX_Size() int

func (*EngineCheckpointModel) XXX_Unmarshal

func (m *EngineCheckpointModel) XXX_Unmarshal(b []byte) error

type FileCheckpointsDB

type FileCheckpointsDB struct {
	// contains filtered or unexported fields
}

func NewFileCheckpointsDB

func NewFileCheckpointsDB(path string) *FileCheckpointsDB

func (*FileCheckpointsDB) Close

func (cpdb *FileCheckpointsDB) Close() error

func (*FileCheckpointsDB) DestroyErrorCheckpoint

func (cpdb *FileCheckpointsDB) DestroyErrorCheckpoint(_ context.Context, targetTableName string) ([]DestroyedTableCheckpoint, error)

func (*FileCheckpointsDB) DumpChunks

func (cpdb *FileCheckpointsDB) DumpChunks(context.Context, io.Writer) error

func (*FileCheckpointsDB) DumpEngines

func (cpdb *FileCheckpointsDB) DumpEngines(context.Context, io.Writer) error

func (*FileCheckpointsDB) DumpTables

func (cpdb *FileCheckpointsDB) DumpTables(context.Context, io.Writer) error

func (*FileCheckpointsDB) Get

func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableCheckpoint, error)

func (*FileCheckpointsDB) GetLocalStoringTables

func (cpdb *FileCheckpointsDB) GetLocalStoringTables(_ context.Context) (map[string][]int32, error)

func (*FileCheckpointsDB) IgnoreErrorCheckpoint

func (cpdb *FileCheckpointsDB) IgnoreErrorCheckpoint(_ context.Context, targetTableName string) error

func (*FileCheckpointsDB) Initialize

func (cpdb *FileCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error

func (*FileCheckpointsDB) InsertEngineCheckpoints

func (cpdb *FileCheckpointsDB) InsertEngineCheckpoints(_ context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error

func (*FileCheckpointsDB) MoveCheckpoints

func (cpdb *FileCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error

func (*FileCheckpointsDB) RemoveCheckpoint

func (cpdb *FileCheckpointsDB) RemoveCheckpoint(_ context.Context, tableName string) error

func (*FileCheckpointsDB) TaskCheckpoint

func (cpdb *FileCheckpointsDB) TaskCheckpoint(_ context.Context) (*TaskCheckpoint, error)

func (*FileCheckpointsDB) Update

func (cpdb *FileCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)

type GlueCheckpointsDB

type GlueCheckpointsDB struct {
	// contains filtered or unexported fields
}

GlueCheckpointsDB is almost same with MySQLCheckpointsDB, but it uses TiDB's internal data structure which requires a lot to keep same with database/sql. TODO: Encapsulate Begin/Commit/Rollback txn, form SQL with args and query/iter/scan TiDB's RecordSet into a interface to reuse MySQLCheckpointsDB.

func NewGlueCheckpointsDB

func NewGlueCheckpointsDB(ctx context.Context, se Session, f func() (Session, error), schemaName string) (*GlueCheckpointsDB, error)

func (GlueCheckpointsDB) Close

func (g GlueCheckpointsDB) Close() error

func (GlueCheckpointsDB) DestroyErrorCheckpoint

func (g GlueCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)

func (GlueCheckpointsDB) DumpChunks

func (g GlueCheckpointsDB) DumpChunks(ctx context.Context, csv io.Writer) error

func (GlueCheckpointsDB) DumpEngines

func (g GlueCheckpointsDB) DumpEngines(ctx context.Context, csv io.Writer) error

func (GlueCheckpointsDB) DumpTables

func (g GlueCheckpointsDB) DumpTables(ctx context.Context, csv io.Writer) error

func (GlueCheckpointsDB) Get

func (g GlueCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error)

func (GlueCheckpointsDB) GetLocalStoringTables

func (g GlueCheckpointsDB) GetLocalStoringTables(ctx context.Context) (map[string][]int32, error)

func (GlueCheckpointsDB) IgnoreErrorCheckpoint

func (g GlueCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error

func (GlueCheckpointsDB) Initialize

func (g GlueCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error

func (GlueCheckpointsDB) InsertEngineCheckpoints

func (g GlueCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpointMap map[int32]*EngineCheckpoint) error

func (GlueCheckpointsDB) MoveCheckpoints

func (g GlueCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error

func (GlueCheckpointsDB) RemoveCheckpoint

func (g GlueCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error

func (GlueCheckpointsDB) TaskCheckpoint

func (g GlueCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)

func (GlueCheckpointsDB) Update

func (g GlueCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)

type MySQLCheckpointsDB

type MySQLCheckpointsDB struct {
	// contains filtered or unexported fields
}

func NewMySQLCheckpointsDB

func NewMySQLCheckpointsDB(ctx context.Context, db *sql.DB, schemaName string) (*MySQLCheckpointsDB, error)

func (*MySQLCheckpointsDB) Close

func (cpdb *MySQLCheckpointsDB) Close() error

func (*MySQLCheckpointsDB) DestroyErrorCheckpoint

func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)

func (*MySQLCheckpointsDB) DumpChunks

func (cpdb *MySQLCheckpointsDB) DumpChunks(ctx context.Context, writer io.Writer) error

func (*MySQLCheckpointsDB) DumpEngines

func (cpdb *MySQLCheckpointsDB) DumpEngines(ctx context.Context, writer io.Writer) error

func (*MySQLCheckpointsDB) DumpTables

func (cpdb *MySQLCheckpointsDB) DumpTables(ctx context.Context, writer io.Writer) error

func (*MySQLCheckpointsDB) Get

func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error)

func (*MySQLCheckpointsDB) GetLocalStoringTables

func (cpdb *MySQLCheckpointsDB) GetLocalStoringTables(ctx context.Context) (map[string][]int32, error)

func (*MySQLCheckpointsDB) IgnoreErrorCheckpoint

func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error

func (*MySQLCheckpointsDB) Initialize

func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error

func (*MySQLCheckpointsDB) InsertEngineCheckpoints

func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error

func (*MySQLCheckpointsDB) MoveCheckpoints

func (cpdb *MySQLCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error

func (*MySQLCheckpointsDB) RemoveCheckpoint

func (cpdb *MySQLCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error

func (*MySQLCheckpointsDB) TaskCheckpoint

func (cpdb *MySQLCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)

func (*MySQLCheckpointsDB) Update

func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)

type NullCheckpointsDB

type NullCheckpointsDB struct{}

NullCheckpointsDB is a checkpoints database with no checkpoints.

func NewNullCheckpointsDB

func NewNullCheckpointsDB() *NullCheckpointsDB

func (*NullCheckpointsDB) Close

func (*NullCheckpointsDB) Close() error

func (*NullCheckpointsDB) DestroyErrorCheckpoint

func (*NullCheckpointsDB) DestroyErrorCheckpoint(context.Context, string) ([]DestroyedTableCheckpoint, error)

func (*NullCheckpointsDB) DumpChunks

func (*NullCheckpointsDB) DumpEngines

func (*NullCheckpointsDB) DumpTables

func (*NullCheckpointsDB) Get

func (*NullCheckpointsDB) GetLocalStoringTables

func (*NullCheckpointsDB) GetLocalStoringTables(context.Context) (map[string][]int32, error)

func (*NullCheckpointsDB) IgnoreErrorCheckpoint

func (*NullCheckpointsDB) IgnoreErrorCheckpoint(context.Context, string) error

func (*NullCheckpointsDB) Initialize

func (*NullCheckpointsDB) InsertEngineCheckpoints

func (*NullCheckpointsDB) InsertEngineCheckpoints(_ context.Context, _ string, _ map[int32]*EngineCheckpoint) error

func (*NullCheckpointsDB) MoveCheckpoints

func (*NullCheckpointsDB) MoveCheckpoints(context.Context, int64) error

func (*NullCheckpointsDB) RemoveCheckpoint

func (*NullCheckpointsDB) RemoveCheckpoint(context.Context, string) error

func (*NullCheckpointsDB) TaskCheckpoint

func (*NullCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)

func (*NullCheckpointsDB) Update

type RebaseCheckpointMerger

type RebaseCheckpointMerger struct {
	AllocBase int64
}

func (*RebaseCheckpointMerger) MergeInto

func (merger *RebaseCheckpointMerger) MergeInto(cpd *TableCheckpointDiff)

type Session

type Session interface {
	Close()
	Execute(context.Context, string) ([]sqlexec.RecordSet, error)
	CommitTxn(context.Context) error
	RollbackTxn(context.Context)
	PrepareStmt(sql string) (stmtID uint32, paramCount int, fields []*ast.ResultField, err error)
	ExecutePreparedStmt(ctx context.Context, stmtID uint32, param []types.Datum) (sqlexec.RecordSet, error)
	DropPreparedStmt(stmtID uint32) error
}

type StatusCheckpointMerger

type StatusCheckpointMerger struct {
	EngineID int32 // WholeTableEngineID == apply to whole table.
	Status   CheckpointStatus
}

func (*StatusCheckpointMerger) MergeInto

func (merger *StatusCheckpointMerger) MergeInto(cpd *TableCheckpointDiff)

func (*StatusCheckpointMerger) SetInvalid

func (merger *StatusCheckpointMerger) SetInvalid()

type TableCheckpoint

type TableCheckpoint struct {
	Status    CheckpointStatus
	AllocBase int64
	Engines   map[int32]*EngineCheckpoint
	TableID   int64
}

func (*TableCheckpoint) Apply

func (cp *TableCheckpoint) Apply(cpd *TableCheckpointDiff)

Apply the diff to the existing chunk and engine checkpoints in `cp`.

func (*TableCheckpoint) CountChunks

func (cp *TableCheckpoint) CountChunks() int

func (*TableCheckpoint) DeepCopy

func (cp *TableCheckpoint) DeepCopy() *TableCheckpoint

type TableCheckpointDiff

type TableCheckpointDiff struct {
	// contains filtered or unexported fields
}

func NewTableCheckpointDiff

func NewTableCheckpointDiff() *TableCheckpointDiff

func (*TableCheckpointDiff) String

func (cpd *TableCheckpointDiff) String() string

type TableCheckpointMerger

type TableCheckpointMerger interface {
	// MergeInto the table checkpoint diff from a status update or chunk update.
	// If there are multiple updates to the same table, only the last one will
	// take effect. Therefore, the caller must ensure events for the same table
	// are properly ordered by the global time (an old event must be merged
	// before the new one).
	MergeInto(cpd *TableCheckpointDiff)
}

type TableCheckpointModel

type TableCheckpointModel struct {
	Hash      []byte                           `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"`
	Status    uint32                           `protobuf:"varint,3,opt,name=status,proto3" json:"status,omitempty"`
	AllocBase int64                            `protobuf:"varint,4,opt,name=alloc_base,json=allocBase,proto3" json:"alloc_base,omitempty"`
	Engines   map[int32]*EngineCheckpointModel `` /* 158-byte string literal not displayed */
	TableID   int64                            `protobuf:"varint,9,opt,name=tableID,proto3" json:"tableID,omitempty"`
}

func (*TableCheckpointModel) Descriptor

func (*TableCheckpointModel) Descriptor() ([]byte, []int)

func (*TableCheckpointModel) Marshal

func (m *TableCheckpointModel) Marshal() (dAtA []byte, err error)

func (*TableCheckpointModel) MarshalTo

func (m *TableCheckpointModel) MarshalTo(dAtA []byte) (int, error)

func (*TableCheckpointModel) MarshalToSizedBuffer

func (m *TableCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*TableCheckpointModel) ProtoMessage

func (*TableCheckpointModel) ProtoMessage()

func (*TableCheckpointModel) Reset

func (m *TableCheckpointModel) Reset()

func (*TableCheckpointModel) Size

func (m *TableCheckpointModel) Size() (n int)

func (*TableCheckpointModel) String

func (m *TableCheckpointModel) String() string

func (*TableCheckpointModel) Unmarshal

func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error

func (*TableCheckpointModel) XXX_DiscardUnknown

func (m *TableCheckpointModel) XXX_DiscardUnknown()

func (*TableCheckpointModel) XXX_Marshal

func (m *TableCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*TableCheckpointModel) XXX_Merge

func (m *TableCheckpointModel) XXX_Merge(src proto.Message)

func (*TableCheckpointModel) XXX_Size

func (m *TableCheckpointModel) XXX_Size() int

func (*TableCheckpointModel) XXX_Unmarshal

func (m *TableCheckpointModel) XXX_Unmarshal(b []byte) error

type TaskCheckpoint

type TaskCheckpoint struct {
	TaskId       int64
	SourceDir    string
	Backend      string
	ImporterAddr string
	TiDBHost     string
	TiDBPort     int
	PdAddr       string
	SortedKVDir  string
	LightningVer string
}

type TaskCheckpointModel

type TaskCheckpointModel struct {
	TaskId       int64  `protobuf:"varint,1,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"`
	SourceDir    string `protobuf:"bytes,2,opt,name=source_dir,json=sourceDir,proto3" json:"source_dir,omitempty"`
	Backend      string `protobuf:"bytes,3,opt,name=backend,proto3" json:"backend,omitempty"`
	ImporterAddr string `protobuf:"bytes,4,opt,name=importer_addr,json=importerAddr,proto3" json:"importer_addr,omitempty"`
	TidbHost     string `protobuf:"bytes,5,opt,name=tidb_host,json=tidbHost,proto3" json:"tidb_host,omitempty"`
	TidbPort     int32  `protobuf:"varint,6,opt,name=tidb_port,json=tidbPort,proto3" json:"tidb_port,omitempty"`
	PdAddr       string `protobuf:"bytes,7,opt,name=pd_addr,json=pdAddr,proto3" json:"pd_addr,omitempty"`
	SortedKvDir  string `protobuf:"bytes,8,opt,name=sorted_kv_dir,json=sortedKvDir,proto3" json:"sorted_kv_dir,omitempty"`
	LightningVer string `protobuf:"bytes,9,opt,name=lightning_ver,json=lightningVer,proto3" json:"lightning_ver,omitempty"`
}

func (*TaskCheckpointModel) Descriptor

func (*TaskCheckpointModel) Descriptor() ([]byte, []int)

func (*TaskCheckpointModel) Marshal

func (m *TaskCheckpointModel) Marshal() (dAtA []byte, err error)

func (*TaskCheckpointModel) MarshalTo

func (m *TaskCheckpointModel) MarshalTo(dAtA []byte) (int, error)

func (*TaskCheckpointModel) MarshalToSizedBuffer

func (m *TaskCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)

func (*TaskCheckpointModel) ProtoMessage

func (*TaskCheckpointModel) ProtoMessage()

func (*TaskCheckpointModel) Reset

func (m *TaskCheckpointModel) Reset()

func (*TaskCheckpointModel) Size

func (m *TaskCheckpointModel) Size() (n int)

func (*TaskCheckpointModel) String

func (m *TaskCheckpointModel) String() string

func (*TaskCheckpointModel) Unmarshal

func (m *TaskCheckpointModel) Unmarshal(dAtA []byte) error

func (*TaskCheckpointModel) XXX_DiscardUnknown

func (m *TaskCheckpointModel) XXX_DiscardUnknown()

func (*TaskCheckpointModel) XXX_Marshal

func (m *TaskCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)

func (*TaskCheckpointModel) XXX_Merge

func (m *TaskCheckpointModel) XXX_Merge(src proto.Message)

func (*TaskCheckpointModel) XXX_Size

func (m *TaskCheckpointModel) XXX_Size() int

func (*TaskCheckpointModel) XXX_Unmarshal

func (m *TaskCheckpointModel) XXX_Unmarshal(b []byte) error

type TidbDBInfo

type TidbDBInfo struct {
	Name   string
	Tables map[string]*TidbTableInfo
}

type TidbTableInfo

type TidbTableInfo struct {
	ID   int64
	DB   string
	Name string
	Core *model.TableInfo
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL