source

package
v5.4.3+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 29, 2022 License: Apache-2.0 Imports: 22 Imported by: 5

Documentation

Index

Constants

View Source
// Package-level names and queries exported by the source package.
const (
	// ShieldDBName is a database name whose value reads as "does not exist".
	// NOTE(review): presumably a placeholder that never matches a real database — confirm against its callers.
	ShieldDBName      = "_no__exists__db_"
	// ShieldTableName is the table-name counterpart of ShieldDBName.
	// NOTE(review): presumably a placeholder that never matches a real table — confirm against its callers.
	ShieldTableName   = "_no__exists__table_"
	// GetSyncPointQuery selects primary_ts and secondary_ts from tidb_cdc.syncpoint_v1,
	// returning only the row with the largest primary_ts (ORDER BY primary_ts DESC LIMIT 1).
	GetSyncPointQuery = "SELECT primary_ts, secondary_ts FROM tidb_cdc.syncpoint_v1 ORDER BY primary_ts DESC LIMIT 1"
)

Variables

This section is empty.

Functions

func NewSources

func NewSources(ctx context.Context, cfg *config.Config) (downstream Source, upstream Source, err error)

Types

type ChecksumInfo

// ChecksumInfo is the result type returned by GetCountAndCrc32, which gets
// the crc32 result and the count for a given range.
type ChecksumInfo struct {
	// Checksum is the checksum value (the crc32 result reported by GetCountAndCrc32).
	Checksum int64
	// Count is the count reported alongside the checksum.
	// NOTE(review): presumably the number of rows in the range — confirm.
	Count    int64
	// Err carries an error value; GetCountAndCrc32 has no separate error return,
	// so this is presumably where a failure is reported — confirm against the implementation.
	Err      error
	// Cost is a duration.
	// NOTE(review): presumably the time spent computing the checksum — confirm.
	Cost     time.Duration
}

type ChunksIterator

// ChunksIterator is used for a single MySQL/TiDB source.
// It is created by NewChunksIterator and exposes Next and Close methods.
type ChunksIterator struct {
	// ID is a chunk identifier.
	// NOTE(review): presumably identifies the chunk the iterator is positioned at — confirm.
	ID *chunk.ChunkID

	// TableDiffs holds table descriptions of the same type NewChunksIterator accepts.
	// NOTE(review): presumably the tableDiffs passed to NewChunksIterator — confirm.
	TableDiffs []*common.TableDiff
	// contains filtered or unexported fields
}

ChunksIterator is used for a single MySQL/TiDB source.

func NewChunksIterator

func NewChunksIterator(ctx context.Context, analyzer TableAnalyzer, tableDiffs []*common.TableDiff, startRange *splitter.RangeInfo, splitThreadCount int) (*ChunksIterator, error)

func (*ChunksIterator) Close

func (t *ChunksIterator) Close()

func (*ChunksIterator) Next

type DMLType

// DMLType is the statement kind passed to GenerateFixSQL, which generates
// the fix SQL for the given type.
type DMLType int32
// The DMLType values start at 1 (iota + 1), so the zero value of DMLType
// matches none of the named constants.
const (
	// Insert has the value 1.
	Insert DMLType = iota + 1
	// Delete has the value 2.
	Delete
	// Replace has the value 3.
	Replace
)

type MultiSourceRowsIterator

// MultiSourceRowsIterator exposes Next and Close methods.
// NOTE(review): the name suggests it iterates rows drawn from several
// sources — confirm against the implementation; its fields are not shown here.
type MultiSourceRowsIterator struct {
	// contains filtered or unexported fields
}

func (*MultiSourceRowsIterator) Close

func (ms *MultiSourceRowsIterator) Close()

func (*MultiSourceRowsIterator) Next

type MySQLSources

// MySQLSources has, on its pointer receiver, every method required by the
// Source interface.
// NOTE(review): presumably the concrete type behind NewMySQLSources, which
// accepts a slice of data sources — confirm against the implementation.
type MySQLSources struct {
	// contains filtered or unexported fields
}

func (*MySQLSources) Close

func (s *MySQLSources) Close()

func (*MySQLSources) GenerateFixSQL

func (s *MySQLSources) GenerateFixSQL(t DMLType, upstreamData, downstreamData map[string]*dbutil.ColumnData, tableIndex int) string

func (*MySQLSources) GetCountAndCrc32

func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo

func (*MySQLSources) GetDB

func (s *MySQLSources) GetDB() *sql.DB

func (*MySQLSources) GetRangeIterator

func (s *MySQLSources) GetRangeIterator(ctx context.Context, r *splitter.RangeInfo, analyzer TableAnalyzer, splitThreadCount int) (RangeIterator, error)

func (*MySQLSources) GetRowsIterator

func (s *MySQLSources) GetRowsIterator(ctx context.Context, tableRange *splitter.RangeInfo) (RowDataIterator, error)

func (*MySQLSources) GetSnapshot

func (s *MySQLSources) GetSnapshot() string

func (*MySQLSources) GetSourceStructInfo

func (s *MySQLSources) GetSourceStructInfo(ctx context.Context, tableIndex int) ([]*model.TableInfo, error)

func (*MySQLSources) GetTableAnalyzer

func (s *MySQLSources) GetTableAnalyzer() TableAnalyzer

func (*MySQLSources) GetTables

func (s *MySQLSources) GetTables() []*common.TableDiff

type MySQLTableAnalyzer

// MySQLTableAnalyzer provides AnalyzeSplitter on its pointer receiver with the
// signature required by the TableAnalyzer interface.
// NOTE(review): presumably the analyzer returned by MySQLSources.GetTableAnalyzer — confirm.
type MySQLTableAnalyzer struct {
	// contains filtered or unexported fields
}

func (*MySQLTableAnalyzer) AnalyzeSplitter

func (a *MySQLTableAnalyzer) AnalyzeSplitter(ctx context.Context, table *common.TableDiff, startRange *splitter.RangeInfo) (splitter.ChunkIterator, error)

type RangeIterator

// RangeIterator lazily generates the next chunk across all tables.
type RangeIterator interface {
	// Next seeks to the next chunk; it returns nil once the end has been reached.
	Next(context.Context) (*splitter.RangeInfo, error)

	// Close takes no arguments and returns nothing.
	// NOTE(review): presumably releases whatever the iterator holds — confirm per implementation.
	Close()
}

RangeIterator lazily generates the next chunk across all tables.

type RowDataIterator

// RowDataIterator represents the row data in a source.
type RowDataIterator interface {
	// Next seeks to the next row of data; it is used when rows are compared.
	// The row is returned as a map from string keys to column data.
	// NOTE(review): the keys are presumably column names — confirm.
	Next() (map[string]*dbutil.ColumnData, error)
	// Close releases the resources held by the iterator.
	Close()
}

RowDataIterator represents the row data in a source.

type Source

// Source is the set of operations a data source must provide.
// In this package both *MySQLSources and *TiDBSource carry all of these methods.
type Source interface {
	// GetTableAnalyzer picks the proper analyzer for the source.
	// The implementation of this function differs between MySQL and TiDB.
	GetTableAnalyzer() TableAnalyzer

	// GetRangeIterator generates the range iterator from the checkpoint
	// (*splitter.RangeInfo) and the analyzer.
	// This is the main iterator used across the whole sync diff.
	// Each source has one range iterator that produces ranges into a channel;
	// many workers consume ranges from that channel and compare them.
	GetRangeIterator(context.Context, *splitter.RangeInfo, TableAnalyzer, int) (RangeIterator, error)

	// GetCountAndCrc32 gets the crc32 result and the count for the given range.
	GetCountAndCrc32(context.Context, *splitter.RangeInfo) *ChecksumInfo

	// GetRowsIterator gets the row data iterator for the given range.
	GetRowsIterator(context.Context, *splitter.RangeInfo) (RowDataIterator, error)

	// GenerateFixSQL generates the fix SQL of the given type.
	GenerateFixSQL(DMLType, map[string]*dbutil.ColumnData, map[string]*dbutil.ColumnData, int) string

	// GetTables returns the tableDiffs.
	GetTables() []*common.TableDiff

	// GetSourceStructInfo gets the source table info for a given target table.
	GetSourceStructInfo(context.Context, int) ([]*model.TableInfo, error)

	// GetDB returns the db connection.
	GetDB() *sql.DB

	// GetSnapshot returns the snapshot of the source.
	// Only a TiDB source has a snapshot.
	// TODO refine the interface.
	GetSnapshot() string

	// Close takes no arguments and returns nothing.
	// NOTE(review): presumably releases the source's resources — confirm per implementation.
	Close()
}

func NewMySQLSources

func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []*config.DataSource, threadCount int, f tableFilter.Filter) (Source, error)

func NewTiDBSource

func NewTiDBSource(ctx context.Context, tableDiffs []*common.TableDiff, ds *config.DataSource, bucketSpliterPool *utils.WorkerPool, f tableFilter.Filter) (Source, error)

type TableAnalyzer

// TableAnalyzer is the analysis step that differs between sources; each kind
// of source has its own analyze function.
type TableAnalyzer interface {
	// AnalyzeSplitter picks the proper splitter.ChunkIterator according to the table and the source.
	AnalyzeSplitter(context.Context, *common.TableDiff, *splitter.RangeInfo) (splitter.ChunkIterator, error)
}

TableAnalyzer represents the analysis method, which differs between sources. Each source has its own analyze function.

type TiDBRowsIterator

// TiDBRowsIterator provides Next and Close on its pointer receiver with the
// signatures required by the RowDataIterator interface.
// NOTE(review): presumably the iterator returned by TiDBSource.GetRowsIterator — confirm.
type TiDBRowsIterator struct {
	// contains filtered or unexported fields
}

func (*TiDBRowsIterator) Close

func (s *TiDBRowsIterator) Close()

func (*TiDBRowsIterator) Next

func (s *TiDBRowsIterator) Next() (map[string]*dbutil.ColumnData, error)

type TiDBSource

// TiDBSource has, on its pointer receiver, every method required by the
// Source interface.
// NOTE(review): presumably the concrete type behind NewTiDBSource, which
// accepts a single data source — confirm against the implementation.
type TiDBSource struct {
	// contains filtered or unexported fields
}

func (*TiDBSource) Close

func (s *TiDBSource) Close()

func (*TiDBSource) GenerateFixSQL

func (s *TiDBSource) GenerateFixSQL(t DMLType, upstreamData, downstreamData map[string]*dbutil.ColumnData, tableIndex int) string

func (*TiDBSource) GetCountAndCrc32

func (s *TiDBSource) GetCountAndCrc32(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo

func (*TiDBSource) GetDB

func (s *TiDBSource) GetDB() *sql.DB

func (*TiDBSource) GetRangeIterator

func (s *TiDBSource) GetRangeIterator(ctx context.Context, r *splitter.RangeInfo, analyzer TableAnalyzer, splitThreadCount int) (RangeIterator, error)

func (*TiDBSource) GetRowsIterator

func (s *TiDBSource) GetRowsIterator(ctx context.Context, tableRange *splitter.RangeInfo) (RowDataIterator, error)

func (*TiDBSource) GetSnapshot

func (s *TiDBSource) GetSnapshot() string

func (*TiDBSource) GetSourceStructInfo

func (s *TiDBSource) GetSourceStructInfo(ctx context.Context, tableIndex int) ([]*model.TableInfo, error)

func (*TiDBSource) GetTableAnalyzer

func (s *TiDBSource) GetTableAnalyzer() TableAnalyzer

func (*TiDBSource) GetTables

func (s *TiDBSource) GetTables() []*common.TableDiff

type TiDBTableAnalyzer

// TiDBTableAnalyzer provides AnalyzeSplitter on its pointer receiver with the
// signature required by the TableAnalyzer interface.
// NOTE(review): presumably the analyzer returned by TiDBSource.GetTableAnalyzer — confirm.
type TiDBTableAnalyzer struct {
	// contains filtered or unexported fields
}

func (*TiDBTableAnalyzer) AnalyzeSplitter

func (a *TiDBTableAnalyzer) AnalyzeSplitter(ctx context.Context, table *common.TableDiff, startRange *splitter.RangeInfo) (splitter.ChunkIterator, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL