mysqlbatch

package
v0.9.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 9, 2019 License: Apache-2.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrTableEmpty is a sentinel error whose message reports that the table is empty.
var ErrTableEmpty = errors.New("table_scanner: this table is empty")

Functions

func DetectScanColumn

func DetectScanColumn(sourceDB *sql.DB, dbName string, tableName string, maxFullDumpRowsCount int) (string, error)

DetectScanColumn finds the column used to scan the table. Using SHOW INDEX FROM ..., it picks the primary key if there is only one primary key. If no primary key is found, it tries to use a unique index; otherwise it fails.

func FindMaxMinValueFromDB

func FindMaxMinValueFromDB(db *sql.DB, dbName string, tableName string, scanColumn string) (interface{}, interface{})

func GetScanIdx

func GetScanIdx(columnTypes []*sql.ColumnType, scanColumn string) (int, error)

func GetTableColumnTypes

func GetTableColumnTypes(db *sql.DB, schema string, table string) ([]*sql.ColumnType, error)

func NewCreateTableMsg added in v0.9.4

func NewCreateTableMsg(parser *parser.Parser, table *schema_store.Table, createTblStmt string) *core.Msg

func NewMsg

func NewMsg(
	rowPtrs []interface{},
	columnTypes []*sql.ColumnType,
	sourceTableDef *schema_store.Table,
	callbackFunc core.AfterMsgCommitFunc,
	position position_store.MySQLTablePosition) *core.Msg

NewMsg creates a job. It converts sql.NullString to interface{} based on the column type; if the column type is time, the time value is parsed.

func String2Val

func String2Val(s string, scanType string) interface{}

Types

type PluginConfig

// PluginConfig holds the configuration of the mysqlbatch plugin. Each field's
// struct tag names the configuration key it is decoded from.
type PluginConfig struct {
	// Source database connections.
	SourceMaster *utils.DBConfig `mapstructure:"source" toml:"source" json:"source"` // keep same with mysql binlog config to make most cases simple
	SourceSlave  *utils.DBConfig `mapstructure:"source-slave" toml:"source-slave" json:"source-slave"`

	// TableConfigs lists the schema/table entries this plugin is configured with.
	// The key:"value" pairs in the tag must be separated by a space to follow
	// the conventional struct tag format (go vet's structtag check rejects the
	// unseparated form).
	// NOTE(review): unlike every other field, this one has no toml tag — confirm
	// whether that is intentional before adding one, since it changes TOML key matching.
	TableConfigs []TableConfig `mapstructure:"table-configs" json:"table-configs"`

	// Scan tuning knobs.
	// NOTE(review): presumably the number of scanners, the rows per scan batch and
	// the row-count limit for a full dump — confirm against ValidateAndSetDefault.
	NrScanner        int `mapstructure:"nr-scanner" toml:"nr-scanner" json:"nr-scanner"`
	TableScanBatch   int `mapstructure:"table-scan-batch" toml:"table-scan-batch" json:"table-scan-batch"`
	MaxFullDumpCount int `mapstructure:"max-full-dump-count"  toml:"max-full-dump-count"  json:"max-full-dump-count"`

	// NOTE(review): presumably a rate limit on batches per second — confirm units and the meaning of 0.
	BatchPerSecondLimit int `mapstructure:"batch-per-second-limit" toml:"batch-per-second-limit" json:"batch-per-second-limit"`
}

func (*PluginConfig) ValidateAndSetDefault added in v0.9.6

func (cfg *PluginConfig) ValidateAndSetDefault() error

type TableConfig

// TableConfig pairs one schema with a list of table entries. Both fields use
// the same key name for their mapstructure, toml and json tags.
// NOTE(review): the GetTables documentation mentions "schema, table name
// patterns", so Table presumably holds name patterns rather than exact
// names — confirm against GetTables.
type TableConfig struct {
	Schema string   `mapstructure:"schema" toml:"schema" json:"schema"`
	Table  []string `mapstructure:"table" toml:"table" json:"table"`
}

func GetTables

func GetTables(db *sql.DB, schemaStore schema_store.SchemaStore, tableConfigs []TableConfig) ([]*schema_store.Table, []TableConfig)

GetTables returns a list of table definitions based on the schema and table name patterns. Only a single sourceDB is supported for now.

type TableScanner

// TableScanner has no exported fields; NewTableScanner returns a *TableScanner.
type TableScanner struct {
	// contains filtered or unexported fields
}

func NewTableScanner

func NewTableScanner(
	pipelineName string,
	tableWorkC chan *TableWork,
	db *sql.DB,
	positionStore position_store.MySQLTablePositionStore,
	emitter core.Emitter,
	throttle *time.Ticker,
	schemaStore schema_store.SchemaStore,
	cfg *PluginConfig,
	ctx context.Context) *TableScanner

func (*TableScanner) AfterMsgCommit

func (tableScanner *TableScanner) AfterMsgCommit(msg *core.Msg) error

func (*TableScanner) FindAll

func (tableScanner *TableScanner) FindAll(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig)

func (*TableScanner) InitTablePosition

func (tableScanner *TableScanner) InitTablePosition(tableDef *schema_store.Table, tableConfig *TableConfig) error

func (*TableScanner) LoopInBatch

func (tableScanner *TableScanner) LoopInBatch(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig, scanColumn string, max position_store.MySQLTablePosition, min position_store.MySQLTablePosition, batch int)

LoopInBatch iterates over the table with SQL like this: SELECT * FROM a WHERE some_key > some_value LIMIT 10000. It gets the min and max values of the column and iterates batch by batch.

func (*TableScanner) Start

func (tableScanner *TableScanner) Start() error

func (*TableScanner) Wait

func (tableScanner *TableScanner) Wait()

type TableWork

// TableWork bundles a table definition with the TableConfig associated with
// it. NewTableScanner takes a channel of *TableWork (its tableWorkC parameter).
type TableWork struct {
	TableDef    *schema_store.Table
	TableConfig *TableConfig
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL