Documentation ¶
Index ¶
- Constants
- func NewSources(ctx context.Context, cfg *config.Config) (downstream Source, upstream Source, err error)
- type ChecksumInfo
- type ChunksIterator
- type DMLType
- type MultiSourceRowsIterator
- type MySQLSources
- func (s *MySQLSources) Close()
- func (s *MySQLSources) GenerateFixSQL(t DMLType, upstreamData, downstreamData map[string]*dbutil.ColumnData, ...) string
- func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo
- func (s *MySQLSources) GetCountForLackTable(ctx context.Context, tableRange *splitter.RangeInfo) int64
- func (s *MySQLSources) GetDB() *sql.DB
- func (s *MySQLSources) GetRangeIterator(ctx context.Context, r *splitter.RangeInfo, analyzer TableAnalyzer, ...) (RangeIterator, error)
- func (s *MySQLSources) GetRowsIterator(ctx context.Context, tableRange *splitter.RangeInfo) (RowDataIterator, error)
- func (s *MySQLSources) GetSnapshot() string
- func (s *MySQLSources) GetSourceStructInfo(ctx context.Context, tableIndex int) ([]*model.TableInfo, error)
- func (s *MySQLSources) GetTableAnalyzer() TableAnalyzer
- func (s *MySQLSources) GetTables() []*common.TableDiff
- type MySQLTableAnalyzer
- type RangeIterator
- type RowDataIterator
- type Source
- type TableAnalyzer
- type TiDBRowsIterator
- type TiDBSource
- func (s *TiDBSource) Close()
- func (s *TiDBSource) GenerateFixSQL(t DMLType, upstreamData, downstreamData map[string]*dbutil.ColumnData, ...) string
- func (s *TiDBSource) GetCountAndCrc32(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo
- func (s *TiDBSource) GetCountForLackTable(ctx context.Context, tableRange *splitter.RangeInfo) int64
- func (s *TiDBSource) GetDB() *sql.DB
- func (s *TiDBSource) GetRangeIterator(ctx context.Context, r *splitter.RangeInfo, analyzer TableAnalyzer, ...) (RangeIterator, error)
- func (s *TiDBSource) GetRowsIterator(ctx context.Context, tableRange *splitter.RangeInfo) (RowDataIterator, error)
- func (s *TiDBSource) GetSnapshot() string
- func (s *TiDBSource) GetSourceStructInfo(ctx context.Context, tableIndex int) ([]*model.TableInfo, error)
- func (s *TiDBSource) GetTableAnalyzer() TableAnalyzer
- func (s *TiDBSource) GetTables() []*common.TableDiff
- type TiDBTableAnalyzer
Constants ¶
View Source
const ( ShieldDBName = "_no__exists__db_" ShieldTableName = "_no__exists__table_" GetSyncPointQuery = "SELECT primary_ts, secondary_ts FROM tidb_cdc.syncpoint_v1 ORDER BY primary_ts DESC LIMIT 1" )
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ChecksumInfo ¶
type ChunksIterator ¶
type ChunksIterator struct { ID *chunk.ChunkID TableDiffs []*common.TableDiff // contains filtered or unexported fields }
ChunksIterator is used for single mysql/tidb source.
func NewChunksIterator ¶
func NewChunksIterator(ctx context.Context, analyzer TableAnalyzer, tableDiffs []*common.TableDiff, startRange *splitter.RangeInfo, splitThreadCount int) (*ChunksIterator, error)
func (*ChunksIterator) Close ¶
func (t *ChunksIterator) Close()
type MultiSourceRowsIterator ¶
type MultiSourceRowsIterator struct {
// contains filtered or unexported fields
}
func (*MultiSourceRowsIterator) Close ¶
func (ms *MultiSourceRowsIterator) Close()
func (*MultiSourceRowsIterator) Next ¶
func (ms *MultiSourceRowsIterator) Next() (map[string]*dbutil.ColumnData, error)
type MySQLSources ¶
type MySQLSources struct {
// contains filtered or unexported fields
}
func (*MySQLSources) Close ¶
func (s *MySQLSources) Close()
func (*MySQLSources) GenerateFixSQL ¶
func (s *MySQLSources) GenerateFixSQL(t DMLType, upstreamData, downstreamData map[string]*dbutil.ColumnData, tableIndex int) string
func (*MySQLSources) GetCountAndCrc32 ¶
func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo
func (*MySQLSources) GetCountForLackTable ¶
func (*MySQLSources) GetDB ¶
func (s *MySQLSources) GetDB() *sql.DB
func (*MySQLSources) GetRangeIterator ¶
func (s *MySQLSources) GetRangeIterator(ctx context.Context, r *splitter.RangeInfo, analyzer TableAnalyzer, splitThreadCount int) (RangeIterator, error)
func (*MySQLSources) GetRowsIterator ¶
func (s *MySQLSources) GetRowsIterator(ctx context.Context, tableRange *splitter.RangeInfo) (RowDataIterator, error)
func (*MySQLSources) GetSnapshot ¶
func (s *MySQLSources) GetSnapshot() string
func (*MySQLSources) GetSourceStructInfo ¶
func (*MySQLSources) GetTableAnalyzer ¶
func (s *MySQLSources) GetTableAnalyzer() TableAnalyzer
func (*MySQLSources) GetTables ¶
func (s *MySQLSources) GetTables() []*common.TableDiff
type MySQLTableAnalyzer ¶
type MySQLTableAnalyzer struct {
// contains filtered or unexported fields
}
func (*MySQLTableAnalyzer) AnalyzeSplitter ¶
func (a *MySQLTableAnalyzer) AnalyzeSplitter(ctx context.Context, table *common.TableDiff, startRange *splitter.RangeInfo) (splitter.ChunkIterator, error)
type RangeIterator ¶
type RangeIterator interface { // Next seeks the next chunk, return nil if seeks to end. Next(context.Context) (*splitter.RangeInfo, error) Close() }
RangeIterator generate next chunk for the whole tables lazily.
type RowDataIterator ¶
type RowDataIterator interface { // Next seeks the next row data, it used when compared rows. Next() (map[string]*dbutil.ColumnData, error) // Close release the resource. Close() }
RowDataIterator represents the row data in source.
type Source ¶
type Source interface { // GetTableAnalyzer pick the proper analyzer for different source. // the implement of this function is different in mysql/tidb. GetTableAnalyzer() TableAnalyzer // GetRangeIterator generates the range iterator with the checkpoint(*splitter.RangeInfo) and analyzer. // this is the mainly iterator across the whole sync diff. // One source has one range iterator to produce the range to channel. // there are many workers consume the range from the channel to compare. GetRangeIterator(context.Context, *splitter.RangeInfo, TableAnalyzer, int) (RangeIterator, error) // GetCountAndCrc32 gets the crc32 result and the count from given range. GetCountAndCrc32(context.Context, *splitter.RangeInfo) *ChecksumInfo // GetCountForLackTable gets the count for tables that don't exist upstream or downstream. GetCountForLackTable(context.Context, *splitter.RangeInfo) int64 // GetRowsIterator gets the row data iterator from given range. GetRowsIterator(context.Context, *splitter.RangeInfo) (RowDataIterator, error) // GenerateFixSQL generates the fix sql with given type. GenerateFixSQL(DMLType, map[string]*dbutil.ColumnData, map[string]*dbutil.ColumnData, int) string // GetTables represents the tableDiffs. GetTables() []*common.TableDiff // GetSourceStructInfo get the source table info from a given target table GetSourceStructInfo(context.Context, int) ([]*model.TableInfo, error) // GetDB represents the db connection. GetDB() *sql.DB // GetSnapshot represents the snapshot of source. // only TiDB source has the snapshot. // TODO refine the interface. GetSnapshot() string // Close ... Close() }
func NewMySQLSources ¶
func NewTiDBSource ¶
func NewTiDBSource(ctx context.Context, tableDiffs []*common.TableDiff, ds *config.DataSource, bucketSpliterPool *utils.WorkerPool, f tableFilter.Filter, skipNonExistingTable bool) (Source, error)
type TableAnalyzer ¶
type TableAnalyzer interface { // AnalyzeSplitter picks the proper splitter.ChunkIterator according to table and source. AnalyzeSplitter(context.Context, *common.TableDiff, *splitter.RangeInfo) (splitter.ChunkIterator, error) }
TableAnalyzer represents the method in different source. each source has its own analyze function.
type TiDBRowsIterator ¶
type TiDBRowsIterator struct {
// contains filtered or unexported fields
}
func (*TiDBRowsIterator) Close ¶
func (s *TiDBRowsIterator) Close()
func (*TiDBRowsIterator) Next ¶
func (s *TiDBRowsIterator) Next() (map[string]*dbutil.ColumnData, error)
type TiDBSource ¶
type TiDBSource struct {
// contains filtered or unexported fields
}
func (*TiDBSource) Close ¶
func (s *TiDBSource) Close()
func (*TiDBSource) GenerateFixSQL ¶
func (s *TiDBSource) GenerateFixSQL(t DMLType, upstreamData, downstreamData map[string]*dbutil.ColumnData, tableIndex int) string
func (*TiDBSource) GetCountAndCrc32 ¶
func (s *TiDBSource) GetCountAndCrc32(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo
func (*TiDBSource) GetCountForLackTable ¶
func (*TiDBSource) GetDB ¶
func (s *TiDBSource) GetDB() *sql.DB
func (*TiDBSource) GetRangeIterator ¶
func (s *TiDBSource) GetRangeIterator(ctx context.Context, r *splitter.RangeInfo, analyzer TableAnalyzer, splitThreadCount int) (RangeIterator, error)
func (*TiDBSource) GetRowsIterator ¶
func (s *TiDBSource) GetRowsIterator(ctx context.Context, tableRange *splitter.RangeInfo) (RowDataIterator, error)
func (*TiDBSource) GetSnapshot ¶
func (s *TiDBSource) GetSnapshot() string
func (*TiDBSource) GetSourceStructInfo ¶
func (*TiDBSource) GetTableAnalyzer ¶
func (s *TiDBSource) GetTableAnalyzer() TableAnalyzer
func (*TiDBSource) GetTables ¶
func (s *TiDBSource) GetTables() []*common.TableDiff
type TiDBTableAnalyzer ¶
type TiDBTableAnalyzer struct {
// contains filtered or unexported fields
}
func (*TiDBTableAnalyzer) AnalyzeSplitter ¶
func (a *TiDBTableAnalyzer) AnalyzeSplitter(ctx context.Context, table *common.TableDiff, startRange *splitter.RangeInfo) (splitter.ChunkIterator, error)
Click to show internal directories.
Click to hide internal directories.