Documentation
¶
Index ¶
- func NewSources(ctx context.Context, cfg *config.Config) (downstream Source, upstream Source, err error)
- type ChecksumInfo
- type ChunksIterator
- type DMLType
- type MultiSourceRowsIterator
- type MySQLSources
- func (s *MySQLSources) Close()
- func (s *MySQLSources) GenerateFixSQL(t DMLType, upstreamData, downstreamData map[string]*dbutil.ColumnData, ...) string
- func (s *MySQLSources) GetCountAndMD5(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo
- func (s *MySQLSources) GetCountForLackTable(ctx context.Context, tableRange *splitter.RangeInfo) int64
- func (s *MySQLSources) GetDB() *sql.DB
- func (s *MySQLSources) GetRangeIterator(ctx context.Context, r *splitter.RangeInfo, analyzer TableAnalyzer, ...) (RangeIterator, error)
- func (s *MySQLSources) GetRowsIterator(ctx context.Context, tableRange *splitter.RangeInfo) (RowDataIterator, error)
- func (s *MySQLSources) GetSnapshot() string
- func (s *MySQLSources) GetSourceStructInfo(ctx context.Context, tableIndex int) ([]*model.TableInfo, error)
- func (s *MySQLSources) GetTableAnalyzer() TableAnalyzer
- func (s *MySQLSources) GetTables() []*common.TableDiff
- type MySQLTableAnalyzer
- type RangeIterator
- type RowDataIterator
- type Source
- type TableAnalyzer
- type TiDBRowsIterator
- type TiDBSource
- func (s *TiDBSource) Close()
- func (s *TiDBSource) GenerateFixSQL(t DMLType, upstreamData, downstreamData map[string]*dbutil.ColumnData, ...) string
- func (s *TiDBSource) GetCountAndMD5(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo
- func (s *TiDBSource) GetCountForLackTable(ctx context.Context, tableRange *splitter.RangeInfo) int64
- func (s *TiDBSource) GetDB() *sql.DB
- func (s *TiDBSource) GetRangeIterator(ctx context.Context, r *splitter.RangeInfo, analyzer TableAnalyzer, ...) (RangeIterator, error)
- func (s *TiDBSource) GetRowsIterator(ctx context.Context, tableRange *splitter.RangeInfo) (RowDataIterator, error)
- func (s *TiDBSource) GetSnapshot() string
- func (s *TiDBSource) GetSourceStructInfo(ctx context.Context, tableIndex int) ([]*model.TableInfo, error)
- func (s *TiDBSource) GetTableAnalyzer() TableAnalyzer
- func (s *TiDBSource) GetTables() []*common.TableDiff
- type TiDBTableAnalyzer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ChecksumInfo ¶
ChecksumInfo stores checksum and count
type ChunksIterator ¶
type ChunksIterator struct { ID *chunk.CID TableDiffs []*common.TableDiff // contains filtered or unexported fields }
ChunksIterator is used for single mysql/tidb source.
func NewChunksIterator ¶
func NewChunksIterator( ctx context.Context, analyzer TableAnalyzer, tableDiffs []*common.TableDiff, startRange *splitter.RangeInfo, splitThreadCount int, ) (*ChunksIterator, error)
NewChunksIterator returns a new iterator
type MultiSourceRowsIterator ¶
type MultiSourceRowsIterator struct {
// contains filtered or unexported fields
}
MultiSourceRowsIterator is used to iterate rows from multi source
func (*MultiSourceRowsIterator) Close ¶
func (ms *MultiSourceRowsIterator) Close()
Close closes all sources
func (*MultiSourceRowsIterator) Next ¶
func (ms *MultiSourceRowsIterator) Next() (map[string]*dbutil.ColumnData, error)
Next returns the next row
type MySQLSources ¶
type MySQLSources struct {
// contains filtered or unexported fields
}
MySQLSources represents one table in MySQL
func (*MySQLSources) GenerateFixSQL ¶
func (s *MySQLSources) GenerateFixSQL(t DMLType, upstreamData, downstreamData map[string]*dbutil.ColumnData, tableIndex int) string
GenerateFixSQL generates the fix SQL for the given DML type
func (*MySQLSources) GetCountAndMD5 ¶
func (s *MySQLSources) GetCountAndMD5(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo
GetCountAndMD5 returns the count and checksum
func (*MySQLSources) GetCountForLackTable ¶
func (s *MySQLSources) GetCountForLackTable(ctx context.Context, tableRange *splitter.RangeInfo) int64
GetCountForLackTable returns the count for a table that doesn't exist upstream or downstream
func (*MySQLSources) GetRangeIterator ¶
func (s *MySQLSources) GetRangeIterator(ctx context.Context, r *splitter.RangeInfo, analyzer TableAnalyzer, splitThreadCount int) (RangeIterator, error)
GetRangeIterator gets the range iterator
func (*MySQLSources) GetRowsIterator ¶
func (s *MySQLSources) GetRowsIterator(ctx context.Context, tableRange *splitter.RangeInfo) (RowDataIterator, error)
GetRowsIterator gets the iterator for the current table
func (*MySQLSources) GetSnapshot ¶
func (s *MySQLSources) GetSnapshot() string
GetSnapshot gets the current snapshot
func (*MySQLSources) GetSourceStructInfo ¶
func (s *MySQLSources) GetSourceStructInfo(ctx context.Context, tableIndex int) ([]*model.TableInfo, error)
GetSourceStructInfo gets the current table info
func (*MySQLSources) GetTableAnalyzer ¶
func (s *MySQLSources) GetTableAnalyzer() TableAnalyzer
GetTableAnalyzer gets the analyzer for the current table
func (*MySQLSources) GetTables ¶
func (s *MySQLSources) GetTables() []*common.TableDiff
GetTables returns all tables
type MySQLTableAnalyzer ¶
type MySQLTableAnalyzer struct {
// contains filtered or unexported fields
}
MySQLTableAnalyzer is used to analyze MySQL table
func (*MySQLTableAnalyzer) AnalyzeSplitter ¶
func (a *MySQLTableAnalyzer) AnalyzeSplitter(ctx context.Context, table *common.TableDiff, startRange *splitter.RangeInfo) (splitter.ChunkIterator, error)
AnalyzeSplitter returns an iterator for the current table
type RangeIterator ¶
type RangeIterator interface { // Next seeks the next chunk, return nil if seeks to end. Next(context.Context) (*splitter.RangeInfo, error) Close() }
RangeIterator lazily generates the next chunk across all tables.
type RowDataIterator ¶
type RowDataIterator interface { // Next seeks the next row data, it used when compared rows. Next() (map[string]*dbutil.ColumnData, error) // Close release the resource. Close() }
RowDataIterator represents the row data in source.
type Source ¶
type Source interface { // GetTableAnalyzer pick the proper analyzer for different source. // the implement of this function is different in mysql/tidb. GetTableAnalyzer() TableAnalyzer // GetRangeIterator generates the range iterator with the checkpoint(*splitter.RangeInfo) and analyzer. // this is the mainly iterator across the whole sync diff. // One source has one range iterator to produce the range to channel. // there are many workers consume the range from the channel to compare. GetRangeIterator(context.Context, *splitter.RangeInfo, TableAnalyzer, int) (RangeIterator, error) // GetCountAndMD5 gets the md5 result and the count from given range. GetCountAndMD5(context.Context, *splitter.RangeInfo) *ChecksumInfo // GetCountForLackTable gets the count for tables that don't exist upstream or downstream. GetCountForLackTable(context.Context, *splitter.RangeInfo) int64 // GetRowsIterator gets the row data iterator from given range. GetRowsIterator(context.Context, *splitter.RangeInfo) (RowDataIterator, error) // GenerateFixSQL generates the fix sql with given type. GenerateFixSQL(DMLType, map[string]*dbutil.ColumnData, map[string]*dbutil.ColumnData, int) string // GetTables represents the tableDiffs. GetTables() []*common.TableDiff // GetSourceStructInfo get the source table info from a given target table GetSourceStructInfo(context.Context, int) ([]*model.TableInfo, error) // GetDB represents the db connection. GetDB() *sql.DB // GetSnapshot represents the snapshot of source. // only TiDB source has the snapshot. // TODO refine the interface. GetSnapshot() string // Close ... Close() }
Source is the interface for table
func NewMySQLSources ¶
func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []*config.DataSource, threadCount int, f tableFilter.Filter, skipNonExistingTable bool) (Source, error)
NewMySQLSources returns sources for MySQL tables
func NewTiDBSource ¶
func NewTiDBSource( ctx context.Context, tableDiffs []*common.TableDiff, ds *config.DataSource, bucketSpliterPool *utils.WorkerPool, f tableFilter.Filter, skipNonExistingTable bool, ) (Source, error)
NewTiDBSource returns a new TiDB source
type TableAnalyzer ¶
type TableAnalyzer interface { // AnalyzeSplitter picks the proper splitter.ChunkIterator according to table and source. AnalyzeSplitter(context.Context, *common.TableDiff, *splitter.RangeInfo) (splitter.ChunkIterator, error) }
TableAnalyzer represents the analysis method for different sources. Each source has its own analyze function.
type TiDBRowsIterator ¶
type TiDBRowsIterator struct {
// contains filtered or unexported fields
}
TiDBRowsIterator is used to iterate rows in TiDB
func (*TiDBRowsIterator) Next ¶
func (s *TiDBRowsIterator) Next() (map[string]*dbutil.ColumnData, error)
Next gets the next row
type TiDBSource ¶
type TiDBSource struct {
// contains filtered or unexported fields
}
TiDBSource represents the table in TiDB
func (*TiDBSource) GenerateFixSQL ¶
func (s *TiDBSource) GenerateFixSQL(t DMLType, upstreamData, downstreamData map[string]*dbutil.ColumnData, tableIndex int) string
GenerateFixSQL generates the fix SQL for the given DML type
func (*TiDBSource) GetCountAndMD5 ¶
func (s *TiDBSource) GetCountAndMD5(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo
GetCountAndMD5 returns the checksum info
func (*TiDBSource) GetCountForLackTable ¶
func (s *TiDBSource) GetCountForLackTable(ctx context.Context, tableRange *splitter.RangeInfo) int64
GetCountForLackTable returns the count for a table that doesn't exist upstream or downstream
func (*TiDBSource) GetRangeIterator ¶
func (s *TiDBSource) GetRangeIterator(ctx context.Context, r *splitter.RangeInfo, analyzer TableAnalyzer, splitThreadCount int) (RangeIterator, error)
GetRangeIterator returns a new iterator for TiDB table
func (*TiDBSource) GetRowsIterator ¶
func (s *TiDBSource) GetRowsIterator(ctx context.Context, tableRange *splitter.RangeInfo) (RowDataIterator, error)
GetRowsIterator returns a new iterator
func (*TiDBSource) GetSnapshot ¶
func (s *TiDBSource) GetSnapshot() string
GetSnapshot gets the current snapshot
func (*TiDBSource) GetSourceStructInfo ¶
func (s *TiDBSource) GetSourceStructInfo(ctx context.Context, tableIndex int) ([]*model.TableInfo, error)
GetSourceStructInfo gets the table info
func (*TiDBSource) GetTableAnalyzer ¶
func (s *TiDBSource) GetTableAnalyzer() TableAnalyzer
GetTableAnalyzer gets the analyzer for current source
func (*TiDBSource) GetTables ¶
func (s *TiDBSource) GetTables() []*common.TableDiff
GetTables returns all tables
type TiDBTableAnalyzer ¶
type TiDBTableAnalyzer struct {
// contains filtered or unexported fields
}
TiDBTableAnalyzer is used to analyze table
func (*TiDBTableAnalyzer) AnalyzeSplitter ¶
func (a *TiDBTableAnalyzer) AnalyzeSplitter(ctx context.Context, table *common.TableDiff, startRange *splitter.RangeInfo) (splitter.ChunkIterator, error)
AnalyzeSplitter returns a new iterator for TiDB table