mysqlbatch

package
v0.9.17 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 14, 2019 License: Apache-2.0 Imports: 25 Imported by: 2

Documentation

Index

Constants

View Source
// String names for value types, grouped by prefix: Plain* for plain Go
// values and SQL* for database/sql-style nullable or raw values.
// NOTE(review): presumably these are the values accepted by String2Val's
// scanType parameter and stored in TablePosition.Type — confirm against the
// implementation.
const (
	// Unknown is the name used when the type is not known.
	Unknown       = "*"
	PlainString   = "string"
	PlainInt      = "int"
	PlainBytes    = "bytes"
	PlainTime     = "time"
	SQLNullInt64  = "sqlNullInt64"
	SQLNullString = "sqlNullString"
	SQLNullBool   = "sqlNullBool"
	SQLNullTime   = "sqlNullTime"
	SQLRawBytes   = "sqlRawBytes"
)

Variables

This section is empty.

Functions

func DetectScanColumn

func DetectScanColumn(sourceDB *sql.DB, dbName string, tableName string, maxFullDumpRowsCount int) (string, error)

DetectScanColumn finds the column that is used to scan the table. It inspects SHOW INDEX FROM ... and picks the primary key if there is only one primary key. If no primary key is found, it tries to use a unique index; otherwise it fails.

func FindMaxMinValueFromDB

func FindMaxMinValueFromDB(db *sql.DB, dbName string, tableName string, scanColumn string) (interface{}, interface{})

func GetMaxMin added in v0.9.17

func GetMaxMin(cache position_store.PositionCacheInterface, fullTableName string) (*TablePosition, *TablePosition, bool, error)

func GetScanIdx

func GetScanIdx(columnTypes []*sql.ColumnType, scanColumn string) (int, error)

func GetStartBinlog added in v0.9.17

func GetTableColumnTypes

func GetTableColumnTypes(db *sql.DB, schema string, table string) ([]*sql.ColumnType, error)

func NewBarrierMsg added in v0.9.17

func NewBarrierMsg(tableDef *schema_store.Table) *core.Msg

func NewCloseInputStreamMsg

func NewCloseInputStreamMsg(tableDef *schema_store.Table) *core.Msg

func NewCreateTableMsg

func NewCreateTableMsg(parser *parser.Parser, table *schema_store.Table, createTblStmt string) *core.Msg

func NewMsg

func NewMsg(
	rowPtrs []interface{},
	columnTypes []*sql.ColumnType,
	sourceTableDef *schema_store.Table,
	callbackFunc core.AfterMsgCommitFunc,
	position TablePosition) *core.Msg

NewMsg creates a job. It converts sql.NullString to interface{} based on the column type; if the column type is time, the time is parsed.

func PutCurrentPos added in v0.9.17

func PutCurrentPos(cache position_store.PositionCacheInterface, fullTableName string, pos *TablePosition) error

func PutMaxMin added in v0.9.17

func PutMaxMin(cache position_store.PositionCacheInterface, fullTableName string, max *TablePosition, min *TablePosition) error

func Serialize added in v0.9.17

func Serialize(positions *BatchPositionValue) (string, error)

func SetupInitialPosition added in v0.9.17

func SetupInitialPosition(cache position_store.PositionCacheInterface, sourceDB *sql.DB) error

func String2Val

func String2Val(s string, scanType string) interface{}

Types

type BatchPositionValue added in v0.9.17

// BatchPositionValue groups a starting binlog position with three maps of
// per-key table positions (min, max and current). It is the type consumed by
// Serialize and produced by Deserialize.
// NOTE(review): the map keys are presumably full table names, as used by
// PutMaxMin / PutCurrentPos — confirm against the implementation.
type BatchPositionValue struct {
	// Start is the binlog position stored under the "start-binlog" key.
	Start   *utils.MySQLBinlogPosition `toml:"start-binlog" json:"start-binlog"`
	// Min holds the minimum position per key.
	Min     map[string]TablePosition   `toml:"min" json:"min"`
	// Max holds the maximum position per key.
	Max     map[string]TablePosition   `toml:"max" json:"max"`
	// Current holds the current position per key.
	Current map[string]TablePosition   `toml:"current" json:"current"`
}

func Deserialize added in v0.9.17

func Deserialize(value string) (*BatchPositionValue, error)

type PluginConfig

// PluginConfig is the configuration of the mysqlbatch plugin. Its fields are
// bound to configuration keys through the mapstructure, toml and json struct
// tags; see ValidateAndSetDefault for validation and defaults.
type PluginConfig struct {
	// Source is the source database configuration. It is kept the same as
	// the mysql binlog config to keep the most common cases simple.
	Source *utils.DBConfig `mapstructure:"source" toml:"source" json:"source"`
	// SourceSlave is a second database configuration.
	// NOTE(review): presumably a replica to read from instead of Source — confirm.
	SourceSlave *utils.DBConfig `mapstructure:"source-slave" toml:"source-slave" json:"source-slave"`

	// SourceProbeCfg is the source probe configuration.
	// NOTE(review): unlike its siblings this field has no toml tag — confirm
	// whether that is intentional before adding one (it would change decoding).
	SourceProbeCfg *helper.SourceProbeCfg `mapstructure:"source-probe-config" json:"source-probe-config"`

	// TableConfigs lists the schema/table patterns to process.
	// NOTE(review): this field has no toml tag either — confirm as above.
	TableConfigs []TableConfig `mapstructure:"table-configs" json:"table-configs"`

	// NrScanner is presumably the number of table scanners to run — confirm.
	NrScanner int `mapstructure:"nr-scanner" toml:"nr-scanner" json:"nr-scanner"`
	// TableScanBatch is presumably the batch size used when scanning a
	// table (see LoopInBatch's batch parameter) — confirm.
	TableScanBatch int `mapstructure:"table-scan-batch" toml:"table-scan-batch" json:"table-scan-batch"`
	// MaxFullDumpCount is presumably the row-count limit passed to
	// DetectScanColumn as maxFullDumpRowsCount — confirm.
	MaxFullDumpCount int `mapstructure:"max-full-dump-count"  toml:"max-full-dump-count"  json:"max-full-dump-count"`

	// BatchPerSecondLimit is presumably the maximum number of batches per
	// second (throttling) — confirm.
	BatchPerSecondLimit int `mapstructure:"batch-per-second-limit" toml:"batch-per-second-limit" json:"batch-per-second-limit"`
}

func (*PluginConfig) ValidateAndSetDefault

func (cfg *PluginConfig) ValidateAndSetDefault() error

type TableConfig

// TableConfig selects tables within one schema by name pattern.
type TableConfig struct {
	// Schema is the schema name the table patterns apply to.
	Schema string `mapstructure:"schema" toml:"schema" json:"schema"`
	// Table is a list of strings; each string is a glob expression
	// that describes a table name.
	Table []string `mapstructure:"table" toml:"table" json:"table"`
}

func GetTables

func GetTables(db *sql.DB, schemaStore schema_store.SchemaStore, tableConfigs []TableConfig, router core.Router) ([]*schema_store.Table, []TableConfig)

GetTables returns a list of table definitions based on the schema and table name patterns. Only a single sourceDB is supported for now.

type TablePosition added in v0.9.17

// TablePosition is a position value together with the name of its type and
// the column it belongs to.
// NOTE(review): the type also declares MarshalJSON/UnmarshalJSON, so the json
// tags below may not be what governs its JSON encoding — confirm.
type TablePosition struct {
	// Value is the position value; the json tag marks it omitempty.
	Value  interface{} `toml:"value" json:"value,omitempty"`
	// Type names the type of Value.
	// NOTE(review): presumably one of the package's type-name constants
	// (PlainInt, SQLNullTime, ...) — confirm.
	Type   string      `toml:"type" json:"type"`
	// Column is the column name.
	// NOTE(review): presumably the scan column of the table — confirm.
	Column string      `toml:"column" json:"column"`
}

func GetCurrentPos added in v0.9.17

func GetCurrentPos(cache position_store.PositionCacheInterface, fullTableName string) (*TablePosition, bool, error)

func (TablePosition) MapString added in v0.9.17

func (p TablePosition) MapString() (map[string]string, error)

func (TablePosition) MarshalJSON added in v0.9.17

func (p TablePosition) MarshalJSON() ([]byte, error)

func (*TablePosition) UnmarshalJSON added in v0.9.17

func (p *TablePosition) UnmarshalJSON(value []byte) error

type TableScanner

// TableScanner has no exported fields. Instances are created with
// NewTableScanner and used through the methods Start, Wait, FindAll,
// LoopInBatch, InitTablePosition and AfterMsgCommit.
type TableScanner struct {
	// contains filtered or unexported fields
}

func NewTableScanner

func NewTableScanner(
	pipelineName string,
	tableWorkC chan *TableWork,
	db *sql.DB,
	positionCache position_store.PositionCacheInterface,
	emitter core.Emitter,
	throttle *time.Ticker,
	schemaStore schema_store.SchemaStore,
	cfg *PluginConfig,
	ctx context.Context) *TableScanner

func (*TableScanner) AfterMsgCommit

func (tableScanner *TableScanner) AfterMsgCommit(msg *core.Msg) error

func (*TableScanner) FindAll

func (tableScanner *TableScanner) FindAll(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig)

func (*TableScanner) InitTablePosition

func (tableScanner *TableScanner) InitTablePosition(tableDef *schema_store.Table, tableConfig *TableConfig, scanColumn string) error

func (*TableScanner) LoopInBatch

func (tableScanner *TableScanner) LoopInBatch(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig, scanColumn string, max TablePosition, min TablePosition, batch int)

LoopInBatch iterates over the table with SQL like this: `SELECT * FROM a WHERE some_key > some_value LIMIT 10000`. It gets the min and max values of the column and iterates batch by batch.

func (*TableScanner) Start

func (tableScanner *TableScanner) Start() error

func (*TableScanner) Wait

func (tableScanner *TableScanner) Wait()

type TableWork

// TableWork describes one table to process. Pointers to TableWork are the
// element type of the tableWorkC channel passed to NewTableScanner.
type TableWork struct {
	// TableDef is the schema-store definition of the table.
	TableDef    *schema_store.Table
	// TableConfig is the table configuration associated with the table.
	TableConfig *TableConfig
	// ScanColumn is the name of the column used to scan the table.
	ScanColumn  string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL