Documentation
¶
Index ¶
- Variables
- func DetectScanColumn(sourceDB *sql.DB, dbName string, tableName string, maxFullDumpRowsCount int) (string, error)
- func FindMaxMinValueFromDB(db *sql.DB, dbName string, tableName string, scanColumn string) (interface{}, interface{})
- func GetScanIdx(columnTypes []*sql.ColumnType, scanColumn string) (int, error)
- func GetTableColumnTypes(db *sql.DB, schema string, table string) ([]*sql.ColumnType, error)
- func NewCloseInputStreamMsg(sourceTableDef *schema_store.Table) *core.Msg
- func NewCreateTableMsg(parser *parser.Parser, table *schema_store.Table, createTblStmt string) *core.Msg
- func NewMsg(rowPtrs []interface{}, columnTypes []*sql.ColumnType, ...) *core.Msg
- func String2Val(s string, scanType string) interface{}
- type PluginConfig
- type TableConfig
- type TableScanner
- func (tableScanner *TableScanner) AfterMsgCommit(msg *core.Msg) error
- func (tableScanner *TableScanner) FindAll(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig)
- func (tableScanner *TableScanner) InitTablePosition(tableDef *schema_store.Table, tableConfig *TableConfig) error
- func (tableScanner *TableScanner) LoopInBatch(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig, ...)
- func (tableScanner *TableScanner) Start() error
- func (tableScanner *TableScanner) Wait()
- type TableWork
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrTableEmpty = errors.New("table_scanner: this table is empty")
Functions ¶
func DetectScanColumn ¶
func DetectScanColumn(sourceDB *sql.DB, dbName string, tableName string, maxFullDumpRowsCount int) (string, error)
DetectScanColumn finds a column that we use to scan the table, using SHOW INDEX FROM ... It picks the primary key if there is only one primary key. If no primary key is found, it tries a unique index; otherwise it fails.
func FindMaxMinValueFromDB ¶
func GetScanIdx ¶
func GetScanIdx(columnTypes []*sql.ColumnType, scanColumn string) (int, error)
func GetTableColumnTypes ¶
func NewCloseInputStreamMsg ¶
func NewCloseInputStreamMsg(sourceTableDef *schema_store.Table) *core.Msg
func NewCreateTableMsg ¶
func NewMsg ¶
func NewMsg( rowPtrs []interface{}, columnTypes []*sql.ColumnType, sourceTableDef *schema_store.Table, callbackFunc core.AfterMsgCommitFunc, position position_store.MySQLTablePosition) *core.Msg
NewMsg creates a job; it converts sql.NullString to interface{} based on the column type. If the column type is time, the time is parsed.
func String2Val ¶
Types ¶
type PluginConfig ¶
type PluginConfig struct { Source *utils.DBConfig `mapstructure:"source" toml:"source" json:"source"` // keep same with mysql binlog config to make most cases simple SourceSlave *utils.DBConfig `mapstructure:"source-slave" toml:"source-slave" json:"source-slave"` TableConfigs []TableConfig `mapstructure:"table-configs"json:"table-configs"` NrScanner int `mapstructure:"nr-scanner" toml:"nr-scanner" json:"nr-scanner"` TableScanBatch int `mapstructure:"table-scan-batch" toml:"table-scan-batch" json:"table-scan-batch"` MaxFullDumpCount int `mapstructure:"max-full-dump-count" toml:"max-full-dump-count" json:"max-full-dump-count"` BatchPerSecondLimit int `mapstructure:"batch-per-second-limit" toml:"batch-per-second-limit" json:"batch-per-second-limit"` }
func (*PluginConfig) ValidateAndSetDefault ¶
func (cfg *PluginConfig) ValidateAndSetDefault() error
type TableConfig ¶
type TableConfig struct { Schema string `mapstructure:"schema" toml:"schema" json:"schema"` Table []string `mapstructure:"table" toml:"table" json:"table"` }
func GetTables ¶
func GetTables(db *sql.DB, schemaStore schema_store.SchemaStore, tableConfigs []TableConfig) ([]*schema_store.Table, []TableConfig)
GetTables returns a list of table definitions based on the schema and table name patterns. Only a single sourceDB is supported for now.
type TableScanner ¶
type TableScanner struct {
// contains filtered or unexported fields
}
func NewTableScanner ¶
func NewTableScanner( pipelineName string, tableWorkC chan *TableWork, db *sql.DB, positionStore position_store.MySQLTablePositionStore, emitter core.Emitter, throttle *time.Ticker, schemaStore schema_store.SchemaStore, cfg *PluginConfig, ctx context.Context) *TableScanner
func (*TableScanner) AfterMsgCommit ¶
func (tableScanner *TableScanner) AfterMsgCommit(msg *core.Msg) error
func (*TableScanner) FindAll ¶
func (tableScanner *TableScanner) FindAll(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig)
func (*TableScanner) InitTablePosition ¶
func (tableScanner *TableScanner) InitTablePosition(tableDef *schema_store.Table, tableConfig *TableConfig) error
func (*TableScanner) LoopInBatch ¶
func (tableScanner *TableScanner) LoopInBatch(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig, scanColumn string, max position_store.MySQLTablePosition, min position_store.MySQLTablePosition, batch int)
LoopInBatch iterates over the table using SQL like: SELECT * FROM a WHERE some_key > some_value LIMIT 10000. It gets the min and max values of the column and iterates batch by batch.
func (*TableScanner) Start ¶
func (tableScanner *TableScanner) Start() error
func (*TableScanner) Wait ¶
func (tableScanner *TableScanner) Wait()
type TableWork ¶
type TableWork struct { TableDef *schema_store.Table TableConfig *TableConfig }
Click to show internal directories.
Click to hide internal directories.