Documentation ¶
Index ¶
- Constants
- Variables
- func IsCheckpointTable(name string) bool
- func IsCheckpointsDBExists(ctx context.Context, cfg *config.Config) (bool, error)
- func Transact(ctx context.Context, purpose string, s Session, logger log.Logger, ...) error
- type CheckpointStatus
- type CheckpointsDB
- type CheckpointsModel
- func (*CheckpointsModel) Descriptor() ([]byte, []int)
- func (m *CheckpointsModel) Marshal() (dAtA []byte, err error)
- func (m *CheckpointsModel) MarshalTo(dAtA []byte) (int, error)
- func (m *CheckpointsModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*CheckpointsModel) ProtoMessage()
- func (m *CheckpointsModel) Reset()
- func (m *CheckpointsModel) Size() (n int)
- func (m *CheckpointsModel) String() string
- func (m *CheckpointsModel) Unmarshal(dAtA []byte) error
- func (m *CheckpointsModel) XXX_DiscardUnknown()
- func (m *CheckpointsModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *CheckpointsModel) XXX_Merge(src proto.Message)
- func (m *CheckpointsModel) XXX_Size() int
- func (m *CheckpointsModel) XXX_Unmarshal(b []byte) error
- type ChunkCheckpoint
- type ChunkCheckpointKey
- type ChunkCheckpointMerger
- type ChunkCheckpointModel
- func (*ChunkCheckpointModel) Descriptor() ([]byte, []int)
- func (m *ChunkCheckpointModel) Marshal() (dAtA []byte, err error)
- func (m *ChunkCheckpointModel) MarshalTo(dAtA []byte) (int, error)
- func (m *ChunkCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*ChunkCheckpointModel) ProtoMessage()
- func (m *ChunkCheckpointModel) Reset()
- func (m *ChunkCheckpointModel) Size() (n int)
- func (m *ChunkCheckpointModel) String() string
- func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error
- func (m *ChunkCheckpointModel) XXX_DiscardUnknown()
- func (m *ChunkCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *ChunkCheckpointModel) XXX_Merge(src proto.Message)
- func (m *ChunkCheckpointModel) XXX_Size() int
- func (m *ChunkCheckpointModel) XXX_Unmarshal(b []byte) error
- type DestroyedTableCheckpoint
- type EngineCheckpoint
- type EngineCheckpointModel
- func (*EngineCheckpointModel) Descriptor() ([]byte, []int)
- func (m *EngineCheckpointModel) Marshal() (dAtA []byte, err error)
- func (m *EngineCheckpointModel) MarshalTo(dAtA []byte) (int, error)
- func (m *EngineCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*EngineCheckpointModel) ProtoMessage()
- func (m *EngineCheckpointModel) Reset()
- func (m *EngineCheckpointModel) Size() (n int)
- func (m *EngineCheckpointModel) String() string
- func (m *EngineCheckpointModel) Unmarshal(dAtA []byte) error
- func (m *EngineCheckpointModel) XXX_DiscardUnknown()
- func (m *EngineCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *EngineCheckpointModel) XXX_Merge(src proto.Message)
- func (m *EngineCheckpointModel) XXX_Size() int
- func (m *EngineCheckpointModel) XXX_Unmarshal(b []byte) error
- type FileCheckpointsDB
- func (cpdb *FileCheckpointsDB) Close() error
- func (cpdb *FileCheckpointsDB) DestroyErrorCheckpoint(_ context.Context, targetTableName string) ([]DestroyedTableCheckpoint, error)
- func (cpdb *FileCheckpointsDB) DumpChunks(context.Context, io.Writer) error
- func (cpdb *FileCheckpointsDB) DumpEngines(context.Context, io.Writer) error
- func (cpdb *FileCheckpointsDB) DumpTables(context.Context, io.Writer) error
- func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableCheckpoint, error)
- func (cpdb *FileCheckpointsDB) GetLocalStoringTables(_ context.Context) (map[string][]int32, error)
- func (cpdb *FileCheckpointsDB) IgnoreErrorCheckpoint(_ context.Context, targetTableName string) error
- func (cpdb *FileCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
- func (cpdb *FileCheckpointsDB) InsertEngineCheckpoints(_ context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error
- func (cpdb *FileCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error
- func (cpdb *FileCheckpointsDB) RemoveCheckpoint(_ context.Context, tableName string) error
- func (cpdb *FileCheckpointsDB) TaskCheckpoint(_ context.Context) (*TaskCheckpoint, error)
- func (cpdb *FileCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)
- type GlueCheckpointsDB
- func (g GlueCheckpointsDB) Close() error
- func (g GlueCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)
- func (g GlueCheckpointsDB) DumpChunks(ctx context.Context, csv io.Writer) error
- func (g GlueCheckpointsDB) DumpEngines(ctx context.Context, csv io.Writer) error
- func (g GlueCheckpointsDB) DumpTables(ctx context.Context, csv io.Writer) error
- func (g GlueCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error)
- func (g GlueCheckpointsDB) GetLocalStoringTables(ctx context.Context) (map[string][]int32, error)
- func (g GlueCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error
- func (g GlueCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
- func (g GlueCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, ...) error
- func (g GlueCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error
- func (g GlueCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error
- func (g GlueCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
- func (g GlueCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)
- type MySQLCheckpointsDB
- func (cpdb *MySQLCheckpointsDB) Close() error
- func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)
- func (cpdb *MySQLCheckpointsDB) DumpChunks(ctx context.Context, writer io.Writer) error
- func (cpdb *MySQLCheckpointsDB) DumpEngines(ctx context.Context, writer io.Writer) error
- func (cpdb *MySQLCheckpointsDB) DumpTables(ctx context.Context, writer io.Writer) error
- func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error)
- func (cpdb *MySQLCheckpointsDB) GetLocalStoringTables(ctx context.Context) (map[string][]int32, error)
- func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error
- func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
- func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error
- func (cpdb *MySQLCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error
- func (cpdb *MySQLCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error
- func (cpdb *MySQLCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
- func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)
- type NullCheckpointsDB
- func (*NullCheckpointsDB) Close() error
- func (*NullCheckpointsDB) DestroyErrorCheckpoint(context.Context, string) ([]DestroyedTableCheckpoint, error)
- func (*NullCheckpointsDB) DumpChunks(context.Context, io.Writer) error
- func (*NullCheckpointsDB) DumpEngines(context.Context, io.Writer) error
- func (*NullCheckpointsDB) DumpTables(context.Context, io.Writer) error
- func (*NullCheckpointsDB) Get(_ context.Context, _ string) (*TableCheckpoint, error)
- func (*NullCheckpointsDB) GetLocalStoringTables(context.Context) (map[string][]int32, error)
- func (*NullCheckpointsDB) IgnoreErrorCheckpoint(context.Context, string) error
- func (*NullCheckpointsDB) Initialize(context.Context, *config.Config, map[string]*TidbDBInfo) error
- func (*NullCheckpointsDB) InsertEngineCheckpoints(_ context.Context, _ string, _ map[int32]*EngineCheckpoint) error
- func (*NullCheckpointsDB) MoveCheckpoints(context.Context, int64) error
- func (*NullCheckpointsDB) RemoveCheckpoint(context.Context, string) error
- func (*NullCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
- func (*NullCheckpointsDB) Update(map[string]*TableCheckpointDiff)
- type RebaseCheckpointMerger
- type Session
- type StatusCheckpointMerger
- type TableCheckpoint
- type TableCheckpointDiff
- type TableCheckpointMerger
- type TableCheckpointModel
- func (*TableCheckpointModel) Descriptor() ([]byte, []int)
- func (m *TableCheckpointModel) Marshal() (dAtA []byte, err error)
- func (m *TableCheckpointModel) MarshalTo(dAtA []byte) (int, error)
- func (m *TableCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*TableCheckpointModel) ProtoMessage()
- func (m *TableCheckpointModel) Reset()
- func (m *TableCheckpointModel) Size() (n int)
- func (m *TableCheckpointModel) String() string
- func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error
- func (m *TableCheckpointModel) XXX_DiscardUnknown()
- func (m *TableCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *TableCheckpointModel) XXX_Merge(src proto.Message)
- func (m *TableCheckpointModel) XXX_Size() int
- func (m *TableCheckpointModel) XXX_Unmarshal(b []byte) error
- type TaskCheckpoint
- type TaskCheckpointModel
- func (*TaskCheckpointModel) Descriptor() ([]byte, []int)
- func (m *TaskCheckpointModel) Marshal() (dAtA []byte, err error)
- func (m *TaskCheckpointModel) MarshalTo(dAtA []byte) (int, error)
- func (m *TaskCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
- func (*TaskCheckpointModel) ProtoMessage()
- func (m *TaskCheckpointModel) Reset()
- func (m *TaskCheckpointModel) Size() (n int)
- func (m *TaskCheckpointModel) String() string
- func (m *TaskCheckpointModel) Unmarshal(dAtA []byte) error
- func (m *TaskCheckpointModel) XXX_DiscardUnknown()
- func (m *TaskCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *TaskCheckpointModel) XXX_Merge(src proto.Message)
- func (m *TaskCheckpointModel) XXX_Size() int
- func (m *TaskCheckpointModel) XXX_Unmarshal(b []byte) error
- type TidbDBInfo
- type TidbTableInfo
Constants ¶
View Source
const ( // the table names to store each kind of checkpoint in the checkpoint database // remember to increase the version number in case of incompatible change. CheckpointTableNameTask = "task_v2" CheckpointTableNameTable = "table_v6" CheckpointTableNameEngine = "engine_v5" CheckpointTableNameChunk = "chunk_v5" )
View Source
const ( // shared by MySQLCheckpointsDB and GlueCheckpointsDB CreateDBTemplate = "CREATE DATABASE IF NOT EXISTS %s;" CreateTaskTableTemplate = `` /* 376-byte string literal not displayed */ CreateTableTableTemplate = `` /* 435-byte string literal not displayed */ CreateEngineTableTemplate = `` /* 338-byte string literal not displayed */ CreateChunkTableTemplate = `` /* 827-byte string literal not displayed */ InitTaskTemplate = `` /* 174-byte string literal not displayed */ InitTableTemplate = `` /* 182-byte string literal not displayed */ ReadTaskTemplate = `` /* 139-byte string literal not displayed */ ReadEngineTemplate = ` SELECT engine_id, status FROM %s.%s WHERE table_name = ? ORDER BY engine_id DESC;` ReadChunkTemplate = `` /* 268-byte string literal not displayed */ ReadTableRemainTemplate = ` SELECT status, alloc_base, table_id FROM %s.%s WHERE table_name = ?;` ReplaceEngineTemplate = ` REPLACE INTO %s.%s (table_name, engine_id, status) VALUES (?, ?, ?);` ReplaceChunkTemplate = `` /* 346-byte string literal not displayed */ UpdateChunkTemplate = `` /* 168-byte string literal not displayed */ UpdateTableRebaseTemplate = ` UPDATE %s.%s SET alloc_base = GREATEST(?, alloc_base) WHERE table_name = ?;` UpdateTableStatusTemplate = ` UPDATE %s.%s SET status = ? WHERE table_name = ?;` UpdateEngineTemplate = ` UPDATE %s.%s SET status = ? WHERE (table_name, engine_id) = (?, ?);` DeleteCheckpointRecordTemplate = "DELETE FROM %s.%s WHERE table_name = ?;" )
View Source
const WholeTableEngineID = math.MaxInt32
Variables ¶
Functions ¶
func IsCheckpointTable ¶
func IsCheckpointsDBExists ¶
Types ¶
type CheckpointStatus ¶
type CheckpointStatus uint8
const ( CheckpointStatusMissing CheckpointStatus = 0 CheckpointStatusMaxInvalid CheckpointStatus = 25 CheckpointStatusLoaded CheckpointStatus = 30 CheckpointStatusAllWritten CheckpointStatus = 60 CheckpointStatusClosed CheckpointStatus = 90 CheckpointStatusImported CheckpointStatus = 120 CheckpointStatusIndexImported CheckpointStatus = 140 CheckpointStatusAlteredAutoInc CheckpointStatus = 150 CheckpointStatusChecksumSkipped CheckpointStatus = 170 CheckpointStatusChecksummed CheckpointStatus = 180 CheckpointStatusAnalyzeSkipped CheckpointStatus = 200 CheckpointStatusAnalyzed CheckpointStatus = 210 )
func (CheckpointStatus) MetricName ¶
func (status CheckpointStatus) MetricName() string
type CheckpointsDB ¶
type CheckpointsDB interface { Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error) Get(ctx context.Context, tableName string) (*TableCheckpoint, error) Close() error // InsertEngineCheckpoints initializes the checkpoints related to a table. // It assumes the entire table has not been imported before and will fill in // default values for the column permutations and checksums. InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error Update(checkpointDiffs map[string]*TableCheckpointDiff) RemoveCheckpoint(ctx context.Context, tableName string) error // MoveCheckpoints renames the checkpoint schema to include a suffix // including the taskID (e.g. `tidb_lightning_checkpoints.1234567890.bak`). MoveCheckpoints(ctx context.Context, taskID int64) error // GetLocalStoringTables returns a map containing tables have engine files stored on local disk. // currently only meaningful for local backend GetLocalStoringTables(ctx context.Context) (map[string][]int32, error) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error) DumpTables(ctx context.Context, csv io.Writer) error DumpEngines(ctx context.Context, csv io.Writer) error DumpChunks(ctx context.Context, csv io.Writer) error }
func OpenCheckpointsDB ¶
type CheckpointsModel ¶
type CheckpointsModel struct { // key is table_name Checkpoints map[string]*TableCheckpointModel `` /* 163-byte string literal not displayed */ TaskCheckpoint *TaskCheckpointModel `protobuf:"bytes,2,opt,name=task_checkpoint,json=taskCheckpoint,proto3" json:"task_checkpoint,omitempty"` }
func (*CheckpointsModel) Descriptor ¶
func (*CheckpointsModel) Descriptor() ([]byte, []int)
func (*CheckpointsModel) Marshal ¶
func (m *CheckpointsModel) Marshal() (dAtA []byte, err error)
func (*CheckpointsModel) MarshalToSizedBuffer ¶
func (m *CheckpointsModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*CheckpointsModel) ProtoMessage ¶
func (*CheckpointsModel) ProtoMessage()
func (*CheckpointsModel) Reset ¶
func (m *CheckpointsModel) Reset()
func (*CheckpointsModel) Size ¶
func (m *CheckpointsModel) Size() (n int)
func (*CheckpointsModel) String ¶
func (m *CheckpointsModel) String() string
func (*CheckpointsModel) Unmarshal ¶
func (m *CheckpointsModel) Unmarshal(dAtA []byte) error
func (*CheckpointsModel) XXX_DiscardUnknown ¶
func (m *CheckpointsModel) XXX_DiscardUnknown()
func (*CheckpointsModel) XXX_Marshal ¶
func (m *CheckpointsModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*CheckpointsModel) XXX_Merge ¶
func (m *CheckpointsModel) XXX_Merge(src proto.Message)
func (*CheckpointsModel) XXX_Size ¶
func (m *CheckpointsModel) XXX_Size() int
func (*CheckpointsModel) XXX_Unmarshal ¶
func (m *CheckpointsModel) XXX_Unmarshal(b []byte) error
type ChunkCheckpoint ¶
type ChunkCheckpoint struct { Key ChunkCheckpointKey FileMeta mydump.SourceFileMeta ColumnPermutation []int Chunk mydump.Chunk Checksum verify.KVChecksum Timestamp int64 }
func (*ChunkCheckpoint) DeepCopy ¶
func (ccp *ChunkCheckpoint) DeepCopy() *ChunkCheckpoint
type ChunkCheckpointKey ¶
func (*ChunkCheckpointKey) String ¶
func (key *ChunkCheckpointKey) String() string
type ChunkCheckpointMerger ¶
type ChunkCheckpointMerger struct { EngineID int32 Key ChunkCheckpointKey Checksum verify.KVChecksum Pos int64 RowID int64 ColumnPermutation []int }
func (*ChunkCheckpointMerger) MergeInto ¶
func (merger *ChunkCheckpointMerger) MergeInto(cpd *TableCheckpointDiff)
type ChunkCheckpointModel ¶
type ChunkCheckpointModel struct { Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` Offset int64 `protobuf:"varint,2,opt,name=offset,proto3" json:"offset,omitempty"` ColumnPermutation []int32 `protobuf:"varint,12,rep,packed,name=column_permutation,json=columnPermutation,proto3" json:"column_permutation,omitempty"` EndOffset int64 `protobuf:"varint,5,opt,name=end_offset,json=endOffset,proto3" json:"end_offset,omitempty"` Pos int64 `protobuf:"varint,6,opt,name=pos,proto3" json:"pos,omitempty"` PrevRowidMax int64 `protobuf:"varint,7,opt,name=prev_rowid_max,json=prevRowidMax,proto3" json:"prev_rowid_max,omitempty"` RowidMax int64 `protobuf:"varint,8,opt,name=rowid_max,json=rowidMax,proto3" json:"rowid_max,omitempty"` KvcBytes uint64 `protobuf:"varint,9,opt,name=kvc_bytes,json=kvcBytes,proto3" json:"kvc_bytes,omitempty"` KvcKvs uint64 `protobuf:"varint,10,opt,name=kvc_kvs,json=kvcKvs,proto3" json:"kvc_kvs,omitempty"` KvcChecksum uint64 `protobuf:"fixed64,11,opt,name=kvc_checksum,json=kvcChecksum,proto3" json:"kvc_checksum,omitempty"` Timestamp int64 `protobuf:"fixed64,13,opt,name=timestamp,proto3" json:"timestamp,omitempty"` Type int32 `protobuf:"varint,14,opt,name=type,proto3" json:"type,omitempty"` Compression int32 `protobuf:"varint,15,opt,name=compression,proto3" json:"compression,omitempty"` SortKey string `protobuf:"bytes,16,opt,name=sort_key,json=sortKey,proto3" json:"sort_key,omitempty"` FileSize int64 `protobuf:"varint,17,opt,name=file_size,json=fileSize,proto3" json:"file_size,omitempty"` }
func (*ChunkCheckpointModel) Descriptor ¶
func (*ChunkCheckpointModel) Descriptor() ([]byte, []int)
func (*ChunkCheckpointModel) Marshal ¶
func (m *ChunkCheckpointModel) Marshal() (dAtA []byte, err error)
func (*ChunkCheckpointModel) MarshalTo ¶
func (m *ChunkCheckpointModel) MarshalTo(dAtA []byte) (int, error)
func (*ChunkCheckpointModel) MarshalToSizedBuffer ¶
func (m *ChunkCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*ChunkCheckpointModel) ProtoMessage ¶
func (*ChunkCheckpointModel) ProtoMessage()
func (*ChunkCheckpointModel) Reset ¶
func (m *ChunkCheckpointModel) Reset()
func (*ChunkCheckpointModel) Size ¶
func (m *ChunkCheckpointModel) Size() (n int)
func (*ChunkCheckpointModel) String ¶
func (m *ChunkCheckpointModel) String() string
func (*ChunkCheckpointModel) Unmarshal ¶
func (m *ChunkCheckpointModel) Unmarshal(dAtA []byte) error
func (*ChunkCheckpointModel) XXX_DiscardUnknown ¶
func (m *ChunkCheckpointModel) XXX_DiscardUnknown()
func (*ChunkCheckpointModel) XXX_Marshal ¶
func (m *ChunkCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*ChunkCheckpointModel) XXX_Merge ¶
func (m *ChunkCheckpointModel) XXX_Merge(src proto.Message)
func (*ChunkCheckpointModel) XXX_Size ¶
func (m *ChunkCheckpointModel) XXX_Size() int
func (*ChunkCheckpointModel) XXX_Unmarshal ¶
func (m *ChunkCheckpointModel) XXX_Unmarshal(b []byte) error
type EngineCheckpoint ¶
type EngineCheckpoint struct { Status CheckpointStatus Chunks []*ChunkCheckpoint // a sorted array }
func (*EngineCheckpoint) DeepCopy ¶
func (engine *EngineCheckpoint) DeepCopy() *EngineCheckpoint
type EngineCheckpointModel ¶
type EngineCheckpointModel struct { Status uint32 `protobuf:"varint,1,opt,name=status,proto3" json:"status,omitempty"` // key is "$path:$offset" Chunks map[string]*ChunkCheckpointModel `` /* 153-byte string literal not displayed */ }
func (*EngineCheckpointModel) Descriptor ¶
func (*EngineCheckpointModel) Descriptor() ([]byte, []int)
func (*EngineCheckpointModel) Marshal ¶
func (m *EngineCheckpointModel) Marshal() (dAtA []byte, err error)
func (*EngineCheckpointModel) MarshalTo ¶
func (m *EngineCheckpointModel) MarshalTo(dAtA []byte) (int, error)
func (*EngineCheckpointModel) MarshalToSizedBuffer ¶
func (m *EngineCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*EngineCheckpointModel) ProtoMessage ¶
func (*EngineCheckpointModel) ProtoMessage()
func (*EngineCheckpointModel) Reset ¶
func (m *EngineCheckpointModel) Reset()
func (*EngineCheckpointModel) Size ¶
func (m *EngineCheckpointModel) Size() (n int)
func (*EngineCheckpointModel) String ¶
func (m *EngineCheckpointModel) String() string
func (*EngineCheckpointModel) Unmarshal ¶
func (m *EngineCheckpointModel) Unmarshal(dAtA []byte) error
func (*EngineCheckpointModel) XXX_DiscardUnknown ¶
func (m *EngineCheckpointModel) XXX_DiscardUnknown()
func (*EngineCheckpointModel) XXX_Marshal ¶
func (m *EngineCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*EngineCheckpointModel) XXX_Merge ¶
func (m *EngineCheckpointModel) XXX_Merge(src proto.Message)
func (*EngineCheckpointModel) XXX_Size ¶
func (m *EngineCheckpointModel) XXX_Size() int
func (*EngineCheckpointModel) XXX_Unmarshal ¶
func (m *EngineCheckpointModel) XXX_Unmarshal(b []byte) error
type FileCheckpointsDB ¶
type FileCheckpointsDB struct {
// contains filtered or unexported fields
}
func NewFileCheckpointsDB ¶
func NewFileCheckpointsDB(path string) *FileCheckpointsDB
func (*FileCheckpointsDB) Close ¶
func (cpdb *FileCheckpointsDB) Close() error
func (*FileCheckpointsDB) DestroyErrorCheckpoint ¶
func (cpdb *FileCheckpointsDB) DestroyErrorCheckpoint(_ context.Context, targetTableName string) ([]DestroyedTableCheckpoint, error)
func (*FileCheckpointsDB) DumpChunks ¶
func (*FileCheckpointsDB) DumpEngines ¶
func (*FileCheckpointsDB) DumpTables ¶
func (*FileCheckpointsDB) Get ¶
func (cpdb *FileCheckpointsDB) Get(_ context.Context, tableName string) (*TableCheckpoint, error)
func (*FileCheckpointsDB) GetLocalStoringTables ¶
func (*FileCheckpointsDB) IgnoreErrorCheckpoint ¶
func (cpdb *FileCheckpointsDB) IgnoreErrorCheckpoint(_ context.Context, targetTableName string) error
func (*FileCheckpointsDB) Initialize ¶
func (cpdb *FileCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
func (*FileCheckpointsDB) InsertEngineCheckpoints ¶
func (cpdb *FileCheckpointsDB) InsertEngineCheckpoints(_ context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error
func (*FileCheckpointsDB) MoveCheckpoints ¶
func (cpdb *FileCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error
func (*FileCheckpointsDB) RemoveCheckpoint ¶
func (cpdb *FileCheckpointsDB) RemoveCheckpoint(_ context.Context, tableName string) error
func (*FileCheckpointsDB) TaskCheckpoint ¶
func (cpdb *FileCheckpointsDB) TaskCheckpoint(_ context.Context) (*TaskCheckpoint, error)
func (*FileCheckpointsDB) Update ¶
func (cpdb *FileCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)
type GlueCheckpointsDB ¶
type GlueCheckpointsDB struct {
// contains filtered or unexported fields
}
GlueCheckpointsDB is almost same with MySQLCheckpointsDB, but it uses TiDB's internal data structure which requires a lot to keep same with database/sql. TODO: Encapsulate Begin/Commit/Rollback txn, form SQL with args and query/iter/scan TiDB's RecordSet into a interface to reuse MySQLCheckpointsDB.
func NewGlueCheckpointsDB ¶
func (GlueCheckpointsDB) Close ¶
func (g GlueCheckpointsDB) Close() error
func (GlueCheckpointsDB) DestroyErrorCheckpoint ¶
func (g GlueCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)
func (GlueCheckpointsDB) DumpChunks ¶
func (GlueCheckpointsDB) DumpEngines ¶
func (GlueCheckpointsDB) DumpTables ¶
func (GlueCheckpointsDB) Get ¶
func (g GlueCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error)
func (GlueCheckpointsDB) GetLocalStoringTables ¶
func (GlueCheckpointsDB) IgnoreErrorCheckpoint ¶
func (g GlueCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error
func (GlueCheckpointsDB) Initialize ¶
func (g GlueCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
func (GlueCheckpointsDB) InsertEngineCheckpoints ¶
func (g GlueCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpointMap map[int32]*EngineCheckpoint) error
func (GlueCheckpointsDB) MoveCheckpoints ¶
func (g GlueCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error
func (GlueCheckpointsDB) RemoveCheckpoint ¶
func (g GlueCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error
func (GlueCheckpointsDB) TaskCheckpoint ¶
func (g GlueCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
func (GlueCheckpointsDB) Update ¶
func (g GlueCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)
type MySQLCheckpointsDB ¶
type MySQLCheckpointsDB struct {
// contains filtered or unexported fields
}
func NewMySQLCheckpointsDB ¶
func (*MySQLCheckpointsDB) Close ¶
func (cpdb *MySQLCheckpointsDB) Close() error
func (*MySQLCheckpointsDB) DestroyErrorCheckpoint ¶
func (cpdb *MySQLCheckpointsDB) DestroyErrorCheckpoint(ctx context.Context, tableName string) ([]DestroyedTableCheckpoint, error)
func (*MySQLCheckpointsDB) DumpChunks ¶
func (*MySQLCheckpointsDB) DumpEngines ¶
func (*MySQLCheckpointsDB) DumpTables ¶
func (*MySQLCheckpointsDB) Get ¶
func (cpdb *MySQLCheckpointsDB) Get(ctx context.Context, tableName string) (*TableCheckpoint, error)
func (*MySQLCheckpointsDB) GetLocalStoringTables ¶
func (*MySQLCheckpointsDB) IgnoreErrorCheckpoint ¶
func (cpdb *MySQLCheckpointsDB) IgnoreErrorCheckpoint(ctx context.Context, tableName string) error
func (*MySQLCheckpointsDB) Initialize ¶
func (cpdb *MySQLCheckpointsDB) Initialize(ctx context.Context, cfg *config.Config, dbInfo map[string]*TidbDBInfo) error
func (*MySQLCheckpointsDB) InsertEngineCheckpoints ¶
func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error
func (*MySQLCheckpointsDB) MoveCheckpoints ¶
func (cpdb *MySQLCheckpointsDB) MoveCheckpoints(ctx context.Context, taskID int64) error
func (*MySQLCheckpointsDB) RemoveCheckpoint ¶
func (cpdb *MySQLCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error
func (*MySQLCheckpointsDB) TaskCheckpoint ¶
func (cpdb *MySQLCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
func (*MySQLCheckpointsDB) Update ¶
func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff)
type NullCheckpointsDB ¶
type NullCheckpointsDB struct{}
NullCheckpointsDB is a checkpoints database with no checkpoints.
func NewNullCheckpointsDB ¶
func NewNullCheckpointsDB() *NullCheckpointsDB
func (*NullCheckpointsDB) Close ¶
func (*NullCheckpointsDB) Close() error
func (*NullCheckpointsDB) DestroyErrorCheckpoint ¶
func (*NullCheckpointsDB) DestroyErrorCheckpoint(context.Context, string) ([]DestroyedTableCheckpoint, error)
func (*NullCheckpointsDB) DumpChunks ¶
func (*NullCheckpointsDB) DumpEngines ¶
func (*NullCheckpointsDB) DumpTables ¶
func (*NullCheckpointsDB) Get ¶
func (*NullCheckpointsDB) Get(_ context.Context, _ string) (*TableCheckpoint, error)
func (*NullCheckpointsDB) GetLocalStoringTables ¶
func (*NullCheckpointsDB) IgnoreErrorCheckpoint ¶
func (*NullCheckpointsDB) IgnoreErrorCheckpoint(context.Context, string) error
func (*NullCheckpointsDB) Initialize ¶
func (*NullCheckpointsDB) Initialize(context.Context, *config.Config, map[string]*TidbDBInfo) error
func (*NullCheckpointsDB) InsertEngineCheckpoints ¶
func (*NullCheckpointsDB) InsertEngineCheckpoints(_ context.Context, _ string, _ map[int32]*EngineCheckpoint) error
func (*NullCheckpointsDB) MoveCheckpoints ¶
func (*NullCheckpointsDB) MoveCheckpoints(context.Context, int64) error
func (*NullCheckpointsDB) RemoveCheckpoint ¶
func (*NullCheckpointsDB) RemoveCheckpoint(context.Context, string) error
func (*NullCheckpointsDB) TaskCheckpoint ¶
func (*NullCheckpointsDB) TaskCheckpoint(ctx context.Context) (*TaskCheckpoint, error)
func (*NullCheckpointsDB) Update ¶
func (*NullCheckpointsDB) Update(map[string]*TableCheckpointDiff)
type RebaseCheckpointMerger ¶
type RebaseCheckpointMerger struct {
AllocBase int64
}
func (*RebaseCheckpointMerger) MergeInto ¶
func (merger *RebaseCheckpointMerger) MergeInto(cpd *TableCheckpointDiff)
type Session ¶
type Session interface { Close() Execute(context.Context, string) ([]sqlexec.RecordSet, error) CommitTxn(context.Context) error RollbackTxn(context.Context) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields []*ast.ResultField, err error) ExecutePreparedStmt(ctx context.Context, stmtID uint32, param []types.Datum) (sqlexec.RecordSet, error) DropPreparedStmt(stmtID uint32) error }
type StatusCheckpointMerger ¶
type StatusCheckpointMerger struct { EngineID int32 // WholeTableEngineID == apply to whole table. Status CheckpointStatus }
func (*StatusCheckpointMerger) MergeInto ¶
func (merger *StatusCheckpointMerger) MergeInto(cpd *TableCheckpointDiff)
func (*StatusCheckpointMerger) SetInvalid ¶
func (merger *StatusCheckpointMerger) SetInvalid()
type TableCheckpoint ¶
type TableCheckpoint struct { Status CheckpointStatus AllocBase int64 Engines map[int32]*EngineCheckpoint TableID int64 }
func (*TableCheckpoint) Apply ¶
func (cp *TableCheckpoint) Apply(cpd *TableCheckpointDiff)
Apply the diff to the existing chunk and engine checkpoints in `cp`.
func (*TableCheckpoint) CountChunks ¶
func (cp *TableCheckpoint) CountChunks() int
func (*TableCheckpoint) DeepCopy ¶
func (cp *TableCheckpoint) DeepCopy() *TableCheckpoint
type TableCheckpointDiff ¶
type TableCheckpointDiff struct {
// contains filtered or unexported fields
}
func NewTableCheckpointDiff ¶
func NewTableCheckpointDiff() *TableCheckpointDiff
func (*TableCheckpointDiff) String ¶
func (cpd *TableCheckpointDiff) String() string
type TableCheckpointMerger ¶
type TableCheckpointMerger interface { // MergeInto the table checkpoint diff from a status update or chunk update. // If there are multiple updates to the same table, only the last one will // take effect. Therefore, the caller must ensure events for the same table // are properly ordered by the global time (an old event must be merged // before the new one). MergeInto(cpd *TableCheckpointDiff) }
type TableCheckpointModel ¶
type TableCheckpointModel struct { Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"` Status uint32 `protobuf:"varint,3,opt,name=status,proto3" json:"status,omitempty"` AllocBase int64 `protobuf:"varint,4,opt,name=alloc_base,json=allocBase,proto3" json:"alloc_base,omitempty"` Engines map[int32]*EngineCheckpointModel `` /* 158-byte string literal not displayed */ TableID int64 `protobuf:"varint,9,opt,name=tableID,proto3" json:"tableID,omitempty"` }
func (*TableCheckpointModel) Descriptor ¶
func (*TableCheckpointModel) Descriptor() ([]byte, []int)
func (*TableCheckpointModel) Marshal ¶
func (m *TableCheckpointModel) Marshal() (dAtA []byte, err error)
func (*TableCheckpointModel) MarshalTo ¶
func (m *TableCheckpointModel) MarshalTo(dAtA []byte) (int, error)
func (*TableCheckpointModel) MarshalToSizedBuffer ¶
func (m *TableCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*TableCheckpointModel) ProtoMessage ¶
func (*TableCheckpointModel) ProtoMessage()
func (*TableCheckpointModel) Reset ¶
func (m *TableCheckpointModel) Reset()
func (*TableCheckpointModel) Size ¶
func (m *TableCheckpointModel) Size() (n int)
func (*TableCheckpointModel) String ¶
func (m *TableCheckpointModel) String() string
func (*TableCheckpointModel) Unmarshal ¶
func (m *TableCheckpointModel) Unmarshal(dAtA []byte) error
func (*TableCheckpointModel) XXX_DiscardUnknown ¶
func (m *TableCheckpointModel) XXX_DiscardUnknown()
func (*TableCheckpointModel) XXX_Marshal ¶
func (m *TableCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*TableCheckpointModel) XXX_Merge ¶
func (m *TableCheckpointModel) XXX_Merge(src proto.Message)
func (*TableCheckpointModel) XXX_Size ¶
func (m *TableCheckpointModel) XXX_Size() int
func (*TableCheckpointModel) XXX_Unmarshal ¶
func (m *TableCheckpointModel) XXX_Unmarshal(b []byte) error
type TaskCheckpoint ¶
type TaskCheckpointModel ¶
type TaskCheckpointModel struct { TaskId int64 `protobuf:"varint,1,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"` SourceDir string `protobuf:"bytes,2,opt,name=source_dir,json=sourceDir,proto3" json:"source_dir,omitempty"` Backend string `protobuf:"bytes,3,opt,name=backend,proto3" json:"backend,omitempty"` ImporterAddr string `protobuf:"bytes,4,opt,name=importer_addr,json=importerAddr,proto3" json:"importer_addr,omitempty"` TidbHost string `protobuf:"bytes,5,opt,name=tidb_host,json=tidbHost,proto3" json:"tidb_host,omitempty"` TidbPort int32 `protobuf:"varint,6,opt,name=tidb_port,json=tidbPort,proto3" json:"tidb_port,omitempty"` PdAddr string `protobuf:"bytes,7,opt,name=pd_addr,json=pdAddr,proto3" json:"pd_addr,omitempty"` SortedKvDir string `protobuf:"bytes,8,opt,name=sorted_kv_dir,json=sortedKvDir,proto3" json:"sorted_kv_dir,omitempty"` LightningVer string `protobuf:"bytes,9,opt,name=lightning_ver,json=lightningVer,proto3" json:"lightning_ver,omitempty"` }
func (*TaskCheckpointModel) Descriptor ¶
func (*TaskCheckpointModel) Descriptor() ([]byte, []int)
func (*TaskCheckpointModel) Marshal ¶
func (m *TaskCheckpointModel) Marshal() (dAtA []byte, err error)
func (*TaskCheckpointModel) MarshalTo ¶
func (m *TaskCheckpointModel) MarshalTo(dAtA []byte) (int, error)
func (*TaskCheckpointModel) MarshalToSizedBuffer ¶
func (m *TaskCheckpointModel) MarshalToSizedBuffer(dAtA []byte) (int, error)
func (*TaskCheckpointModel) ProtoMessage ¶
func (*TaskCheckpointModel) ProtoMessage()
func (*TaskCheckpointModel) Reset ¶
func (m *TaskCheckpointModel) Reset()
func (*TaskCheckpointModel) Size ¶
func (m *TaskCheckpointModel) Size() (n int)
func (*TaskCheckpointModel) String ¶
func (m *TaskCheckpointModel) String() string
func (*TaskCheckpointModel) Unmarshal ¶
func (m *TaskCheckpointModel) Unmarshal(dAtA []byte) error
func (*TaskCheckpointModel) XXX_DiscardUnknown ¶
func (m *TaskCheckpointModel) XXX_DiscardUnknown()
func (*TaskCheckpointModel) XXX_Marshal ¶
func (m *TaskCheckpointModel) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*TaskCheckpointModel) XXX_Merge ¶
func (m *TaskCheckpointModel) XXX_Merge(src proto.Message)
func (*TaskCheckpointModel) XXX_Size ¶
func (m *TaskCheckpointModel) XXX_Size() int
func (*TaskCheckpointModel) XXX_Unmarshal ¶
func (m *TaskCheckpointModel) XXX_Unmarshal(b []byte) error
type TidbDBInfo ¶
type TidbDBInfo struct { Name string Tables map[string]*TidbTableInfo }
Click to show internal directories.
Click to hide internal directories.