Documentation ¶
Index ¶
- Constants
- func DetectScanColumn(sourceDB *sql.DB, dbName string, tableName string, maxFullDumpRowsCount int) (string, error)
- func FindMaxMinValueFromDB(db *sql.DB, dbName string, tableName string, scanColumn string) (interface{}, interface{})
- func GetMaxMin(cache position_store.PositionCacheInterface, fullTableName string) (*TablePosition, *TablePosition, bool, error)
- func GetScanIdx(columnTypes []*sql.ColumnType, scanColumn string) (int, error)
- func GetStartBinlog(cache position_store.PositionCacheInterface) (*utils.MySQLBinlogPosition, error)
- func GetTableColumnTypes(db *sql.DB, schema string, table string) ([]*sql.ColumnType, error)
- func NewBarrierMsg(tableDef *schema_store.Table) *core.Msg
- func NewCloseInputStreamMsg(tableDef *schema_store.Table) *core.Msg
- func NewCreateTableMsg(parser *parser.Parser, table *schema_store.Table, createTblStmt string) *core.Msg
- func NewMsg(rowPtrs []interface{}, columnTypes []*sql.ColumnType, ...) *core.Msg
- func PutCurrentPos(cache position_store.PositionCacheInterface, fullTableName string, ...) error
- func PutMaxMin(cache position_store.PositionCacheInterface, fullTableName string, ...) error
- func Serialize(positions *BatchPositionValue) (string, error)
- func SetupInitialPosition(cache position_store.PositionCacheInterface, sourceDB *sql.DB) error
- func String2Val(s string, scanType string) interface{}
- type BatchPositionValue
- type PluginConfig
- type TableConfig
- type TablePosition
- type TableScanner
- func (tableScanner *TableScanner) AfterMsgCommit(msg *core.Msg) error
- func (tableScanner *TableScanner) FindAll(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig)
- func (tableScanner *TableScanner) InitTablePosition(tableDef *schema_store.Table, tableConfig *TableConfig, scanColumn string) error
- func (tableScanner *TableScanner) LoopInBatch(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig, ...)
- func (tableScanner *TableScanner) Start() error
- func (tableScanner *TableScanner) Wait()
- type TableWork
Constants ¶
View Source
const ( Unknown = "*" PlainString = "string" PlainInt = "int" PlainBytes = "bytes" PlainTime = "time" SQLNullInt64 = "sqlNullInt64" SQLNullString = "sqlNullString" SQLNullBool = "sqlNullBool" SQLNullTime = "sqlNullTime" SQLRawBytes = "sqlRawBytes" )
Variables ¶
This section is empty.
Functions ¶
func DetectScanColumn ¶
func DetectScanColumn(sourceDB *sql.DB, dbName string, tableName string, maxFullDumpRowsCount int) (string, error)
DetectScanColumn finds the column that is used to scan the table, based on the output of SHOW INDEX FROM ...: it picks the primary key if the primary key consists of only one column; if no such primary key is found, it tries to use a unique index; otherwise it fails.
func FindMaxMinValueFromDB ¶
func GetMaxMin ¶ added in v0.9.17
func GetMaxMin(cache position_store.PositionCacheInterface, fullTableName string) (*TablePosition, *TablePosition, bool, error)
func GetScanIdx ¶
func GetScanIdx(columnTypes []*sql.ColumnType, scanColumn string) (int, error)
func GetStartBinlog ¶ added in v0.9.17
func GetStartBinlog(cache position_store.PositionCacheInterface) (*utils.MySQLBinlogPosition, error)
func GetTableColumnTypes ¶
func NewBarrierMsg ¶ added in v0.9.17
func NewBarrierMsg(tableDef *schema_store.Table) *core.Msg
func NewCloseInputStreamMsg ¶
func NewCloseInputStreamMsg(tableDef *schema_store.Table) *core.Msg
func NewCreateTableMsg ¶
func NewMsg ¶
func NewMsg( rowPtrs []interface{}, columnTypes []*sql.ColumnType, sourceTableDef *schema_store.Table, callbackFunc core.AfterMsgCommitFunc, position TablePosition) *core.Msg
NewMsg creates a job. It converts sql.NullString to interface{} based on the column type; if the column type is time, the time value is parsed.
func PutCurrentPos ¶ added in v0.9.17
func PutCurrentPos(cache position_store.PositionCacheInterface, fullTableName string, pos *TablePosition) error
func PutMaxMin ¶ added in v0.9.17
func PutMaxMin(cache position_store.PositionCacheInterface, fullTableName string, max *TablePosition, min *TablePosition) error
func Serialize ¶ added in v0.9.17
func Serialize(positions *BatchPositionValue) (string, error)
func SetupInitialPosition ¶ added in v0.9.17
func SetupInitialPosition(cache position_store.PositionCacheInterface, sourceDB *sql.DB) error
func String2Val ¶
Types ¶
type BatchPositionValue ¶ added in v0.9.17
type BatchPositionValue struct { Start *utils.MySQLBinlogPosition `toml:"start-binlog" json:"start-binlog"` Min map[string]TablePosition `toml:"min" json:"min"` Max map[string]TablePosition `toml:"max" json:"max"` Current map[string]TablePosition `toml:"current" json:"current"` }
func Deserialize ¶ added in v0.9.17
func Deserialize(value string) (*BatchPositionValue, error)
type PluginConfig ¶
type PluginConfig struct { Source *utils.DBConfig `mapstructure:"source" toml:"source" json:"source"` // keep same with mysql binlog config to make most cases simple SourceSlave *utils.DBConfig `mapstructure:"source-slave" toml:"source-slave" json:"source-slave"` SourceProbeCfg *helper.SourceProbeCfg `mapstructure:"source-probe-config"json:"source-probe-config"` TableConfigs []TableConfig `mapstructure:"table-configs"json:"table-configs"` NrScanner int `mapstructure:"nr-scanner" toml:"nr-scanner" json:"nr-scanner"` TableScanBatch int `mapstructure:"table-scan-batch" toml:"table-scan-batch" json:"table-scan-batch"` MaxFullDumpCount int `mapstructure:"max-full-dump-count" toml:"max-full-dump-count" json:"max-full-dump-count"` BatchPerSecondLimit int `mapstructure:"batch-per-second-limit" toml:"batch-per-second-limit" json:"batch-per-second-limit"` }
func (*PluginConfig) ValidateAndSetDefault ¶
func (cfg *PluginConfig) ValidateAndSetDefault() error
type TableConfig ¶
type TableConfig struct { Schema string `mapstructure:"schema" toml:"schema" json:"schema"` // Table is an array of string, each string is a glob expression // that describes the table name Table []string `mapstructure:"table" toml:"table" json:"table"` }
func GetTables ¶
func GetTables(db *sql.DB, schemaStore schema_store.SchemaStore, tableConfigs []TableConfig, router core.Router) ([]*schema_store.Table, []TableConfig)
GetTables returns a list of table definitions based on the schema and table name patterns. Only a single sourceDB is supported for now.
type TablePosition ¶ added in v0.9.17
type TablePosition struct { Value interface{} `toml:"value" json:"value,omitempty"` Type string `toml:"type" json:"type"` Column string `toml:"column" json:"column"` }
func GetCurrentPos ¶ added in v0.9.17
func GetCurrentPos(cache position_store.PositionCacheInterface, fullTableName string) (*TablePosition, bool, error)
func (TablePosition) MapString ¶ added in v0.9.17
func (p TablePosition) MapString() (map[string]string, error)
func (TablePosition) MarshalJSON ¶ added in v0.9.17
func (p TablePosition) MarshalJSON() ([]byte, error)
func (*TablePosition) UnmarshalJSON ¶ added in v0.9.17
func (p *TablePosition) UnmarshalJSON(value []byte) error
type TableScanner ¶
type TableScanner struct {
// contains filtered or unexported fields
}
func NewTableScanner ¶
func NewTableScanner( pipelineName string, tableWorkC chan *TableWork, db *sql.DB, positionCache position_store.PositionCacheInterface, emitter core.Emitter, throttle *time.Ticker, schemaStore schema_store.SchemaStore, cfg *PluginConfig, ctx context.Context) *TableScanner
func (*TableScanner) AfterMsgCommit ¶
func (tableScanner *TableScanner) AfterMsgCommit(msg *core.Msg) error
func (*TableScanner) FindAll ¶
func (tableScanner *TableScanner) FindAll(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig)
func (*TableScanner) InitTablePosition ¶
func (tableScanner *TableScanner) InitTablePosition(tableDef *schema_store.Table, tableConfig *TableConfig, scanColumn string) error
func (*TableScanner) LoopInBatch ¶
func (tableScanner *TableScanner) LoopInBatch(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig, scanColumn string, max TablePosition, min TablePosition, batch int)
LoopInBatch iterates over the table with a SQL query like this: SELECT * FROM a WHERE some_key > some_value LIMIT 10000. It gets the min and max values of the scan column and iterates batch by batch.
func (*TableScanner) Start ¶
func (tableScanner *TableScanner) Start() error
func (*TableScanner) Wait ¶
func (tableScanner *TableScanner) Wait()
type TableWork ¶
type TableWork struct { TableDef *schema_store.Table TableConfig *TableConfig ScanColumn string }
Click to show internal directories.
Click to hide internal directories.