mysqlbatch

package
v0.9.25 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 1, 2019 License: Apache-2.0 Imports: 26 Imported by: 2

Documentation

Index

Constants

View Source
// String tags that name how a value is represented: plain Go kinds
// (string, int, bytes, time), database/sql nullable wrappers, and raw
// bytes. Unknown ("*") is the tag for a value with no specific type.
// NOTE(review): presumably these are the values carried in
// TablePosition.Type — confirm against the position encode/decode code.
const (
	Unknown       = "*"
	PlainString   = "string"
	PlainInt      = "int"
	PlainBytes    = "bytes"
	PlainTime     = "time"
	SQLNullInt64  = "sqlNullInt64"
	SQLNullString = "sqlNullString"
	SQLNullBool   = "sqlNullBool"
	SQLNullTime   = "sqlNullTime"
	SQLRawBytes   = "sqlRawBytes"
)
View Source
// Name is the identifier string of this package's plugin, "mysql-batch".
// NOTE(review): presumably the key the plugin is registered under — confirm.
const Name = "mysql-batch"

Variables

View Source
// Prometheus collectors exported by this package. Both live under the
// "gravity" namespace and "output" subsystem and carry a single label,
// metrics.PipelineTag.
var (
	// BatchQueryDuration is a histogram of batch fetch durations.
	// It uses 22 exponential buckets starting at 0.0005 and doubling each
	// step, so the largest finite bucket bound is 0.0005 * 2^21 = 1048.576.
	// NOTE(review): the unit is presumably seconds — confirm at the
	// observation site.
	BatchQueryDuration = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "gravity",
			Subsystem: "output",
			Name:      "batch_query_duration",
			Help:      "bucketed histogram of batch fetch duration time",
			Buckets:   prometheus.ExponentialBuckets(0.0005, 2, 22),
		}, []string{metrics.PipelineTag})

	// JobFetchedCount counts the data rows fetched by the scanner.
	JobFetchedCount = prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "gravity",
		Subsystem: "output",
		Name:      "job_fetched_count",
		Help:      "Number of data rows fetched by scanner",
	}, []string{metrics.PipelineTag})
)

Functions

func DecodeBatchPositionValue added in v0.9.19

func DecodeBatchPositionValue(s string) (interface{}, error)

func DetectScanColumn

func DetectScanColumn(sourceDB *sql.DB, dbName string, tableName string, estimatedRowsCount int64, maxFullDumpRowsCountLimit int64) (string, error)

DetectScanColumn finds the column used to scan the table, based on the output of SHOW INDEX FROM. It picks the primary key if there is only one primary key column. If no such primary key is found, it tries a unique index; otherwise it fails.

func EncodeBatchPositionValue added in v0.9.19

func EncodeBatchPositionValue(v interface{}) (string, error)

func FindMaxMinValueFromDB

func FindMaxMinValueFromDB(db *sql.DB, dbName string, tableName string, scanColumn string) (interface{}, interface{})

func GetMaxMin added in v0.9.17

func GetScanIdx

func GetScanIdx(columnTypes []*sql.ColumnType, scanColumn string) (int, error)

func GetStartBinlog added in v0.9.17

func GetTableColumnTypes

func GetTableColumnTypes(db *sql.DB, schema string, table string) ([]*sql.ColumnType, error)

func InitTablePosition added in v0.9.18

func InitTablePosition(db *sql.DB, positionCache position_store.PositionCacheInterface, tableDef *schema_store.Table, scanColumn string, estimatedRowCount int64) (bool, error)

func NewBarrierMsg added in v0.9.17

func NewBarrierMsg(tableDef *schema_store.Table) *core.Msg

func NewCloseInputStreamMsg

func NewCloseInputStreamMsg(tableDef *schema_store.Table) *core.Msg

func NewCreateTableMsg

func NewCreateTableMsg(parser *parser.Parser, table *schema_store.Table, createTblStmt string) *core.Msg

func NewMsg

func NewMsg(
	rowPtrs []interface{},
	columnTypes []*sql.ColumnType,
	sourceTableDef *schema_store.Table,
	callbackFunc core.AfterMsgCommitFunc,
	position TablePosition,
	scanTime time.Time) *core.Msg

NewMsg creates a job. It converts sql.NullString to interface{} based on the column type; if the column type is time, the time is parsed.

func PutCurrentPos added in v0.9.17

func PutCurrentPos(cache position_store.PositionCacheInterface, fullTableName string, pos TablePosition, incScanCount bool) error

func PutEstimatedCount added in v0.9.18

func PutEstimatedCount(cache position_store.PositionCacheInterface, fullTableName string, estimatedCount int64) error

func PutMaxMin added in v0.9.17

func PutMaxMin(cache position_store.PositionCacheInterface, fullTableName string, max TablePosition, min TablePosition) error

func SetupInitialPosition added in v0.9.17

func SetupInitialPosition(cache position_store.PositionCacheInterface, sourceDB *sql.DB) error

Types

type BatchPositionValue added in v0.9.17

// BatchPositionValue is the position value persisted for a batch run.
// Start is a MySQL binlog position, serialized as "start-binlog".
// TableStates maps a string key to that table's TableStats and is
// serialized as "table-stats" (note the field name says "States" while the
// tag says "stats").
// NOTE(review): the map key is presumably the full table name, as used by
// PutCurrentPos/PutMaxMin — confirm.
type BatchPositionValue struct {
	Start       utils.MySQLBinlogPosition `toml:"start-binlog" json:"start-binlog"`
	TableStates map[string]TableStats     `toml:"table-stats" json:"table-stats"`
}

type PluginConfig

// PluginConfig is the configuration of the mysql batch plugin. Fields are
// decoded via their mapstructure/toml/json struct tags; defaults and
// validation are applied by ValidateAndSetDefault.
type PluginConfig struct {
	// Source is the source database configuration.
	// Keep same with mysql binlog config to make most cases simple.
	Source *utils.DBConfig `mapstructure:"source" toml:"source" json:"source"`

	// SourceSlave is a second database configuration, keyed "source-slave".
	// NOTE(review): presumably a replica that scans are read from — confirm.
	SourceSlave *utils.DBConfig `mapstructure:"source-slave" toml:"source-slave" json:"source-slave"`

	// SourceProbeCfg is the source probe configuration.
	// NOTE(review): unlike most fields this one has no toml tag — confirm
	// that is intentional before adding one, since it changes the TOML key.
	SourceProbeCfg *helper.SourceProbeCfg `mapstructure:"source-probe-config" json:"source-probe-config"`

	// TableConfigs lists the schema/table patterns this plugin works on.
	// NOTE(review): no toml tag here either — see SourceProbeCfg.
	TableConfigs []TableConfig `mapstructure:"table-configs" json:"table-configs"`

	// NrScanner is configured as "nr-scanner".
	// NOTE(review): presumably the number of concurrent table scanners — confirm.
	NrScanner int `mapstructure:"nr-scanner" toml:"nr-scanner" json:"nr-scanner"`

	// TableScanBatch is configured as "table-scan-batch".
	// NOTE(review): presumably the row count fetched per batch query — confirm.
	TableScanBatch int `mapstructure:"table-scan-batch" toml:"table-scan-batch" json:"table-scan-batch"`

	// MaxFullDumpCount is configured as "max-full-dump-count".
	// NOTE(review): presumably the row-count ceiling passed to
	// DetectScanColumn as maxFullDumpRowsCountLimit — confirm.
	MaxFullDumpCount int64 `mapstructure:"max-full-dump-count"  toml:"max-full-dump-count"  json:"max-full-dump-count"`

	// BatchPerSecondLimit is configured as "batch-per-second-limit".
	// NOTE(review): presumably a throttle on batches per second — confirm.
	BatchPerSecondLimit int `mapstructure:"batch-per-second-limit" toml:"batch-per-second-limit" json:"batch-per-second-limit"`
}

func (*PluginConfig) ValidateAndSetDefault

func (cfg *PluginConfig) ValidateAndSetDefault() error

type TableConfig

// TableConfig selects tables within one schema and optionally pins the
// column used to scan them.
type TableConfig struct {
	// Schema is the schema (database) name the table patterns apply to.
	Schema string `mapstructure:"schema" toml:"schema" json:"schema"`
	// Table is a list of glob expressions, each describing table names
	// to match.
	Table []string `mapstructure:"table" toml:"table" json:"table"`

	// ScanColumn forces the scan column used for the matched tables.
	ScanColumn string `mapstructure:"scan-column" toml:"scan-column" json:"scan-column"`
}

func DeleteEmptyTables added in v0.9.18

func DeleteEmptyTables(db *sql.DB, tables []*schema_store.Table, tableConfigs []TableConfig) ([]*schema_store.Table, []TableConfig)

func GetTables

func GetTables(db *sql.DB, schemaStore schema_store.SchemaStore, tableConfigs []TableConfig, router core.Router) ([]*schema_store.Table, []TableConfig)

GetTables returns a list of table definitions based on the schema and table name patterns. Only a single sourceDB is supported for now.

func InitializePositionAndDeleteScannedTable added in v0.9.19

func InitializePositionAndDeleteScannedTable(
	db *sql.DB,
	positionCache position_store.PositionCacheInterface,
	scanColumns []string,
	estimatedRowCount []int64,
	tables []*schema_store.Table,
	tableConfigs []TableConfig) ([]*schema_store.Table, []TableConfig, []string, []int64, error)

type TablePosition added in v0.9.17

// TablePosition is a position within a table, expressed as a column value.
// Value holds the value itself, Type is a string describing the value's
// type, and Column is the column name.
// The type also declares MarshalJSON/UnmarshalJSON, so its JSON form is
// determined by those methods rather than by the json tags alone.
// NOTE(review): Type presumably takes one of the package's type-tag
// constants (PlainInt, SQLNullString, ...) — confirm.
type TablePosition struct {
	Value  interface{} `toml:"value" json:"value,omitempty"`
	Type   string      `toml:"type" json:"type"`
	Column string      `toml:"column" json:"column"`
}

func GetCurrentPos added in v0.9.17

func GetCurrentPos(cache position_store.PositionCacheInterface, fullTableName string) (TablePosition, bool, error)

func (TablePosition) MapString added in v0.9.17

func (p TablePosition) MapString() (map[string]string, error)

func (TablePosition) MarshalJSON added in v0.9.17

func (p TablePosition) MarshalJSON() ([]byte, error)

func (*TablePosition) UnmarshalJSON added in v0.9.17

func (p *TablePosition) UnmarshalJSON(value []byte) error

type TableScanner

// TableScanner is built by NewTableScanner and driven through its Start and
// Wait methods; it also exposes FindAll, LoopInBatch and AfterMsgCommit.
// All of its fields are unexported.
type TableScanner struct {
	// contains filtered or unexported fields
}

func NewTableScanner

func NewTableScanner(
	pipelineName string,
	tableWorkC chan *TableWork,
	db *sql.DB,
	positionCache position_store.PositionCacheInterface,
	emitter core.Emitter,
	throttle *time.Ticker,
	schemaStore schema_store.SchemaStore,
	cfg *PluginConfig,
	ctx context.Context) *TableScanner

func (*TableScanner) AfterMsgCommit

func (tableScanner *TableScanner) AfterMsgCommit(msg *core.Msg) error

func (*TableScanner) FindAll

func (tableScanner *TableScanner) FindAll(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig)

func (*TableScanner) LoopInBatch

func (tableScanner *TableScanner) LoopInBatch(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig, scanColumn string, max TablePosition, min TablePosition, batch int)

LoopInBatch iterates over the table using SQL like this: SELECT * FROM a WHERE some_key > some_value LIMIT 10000. It gets the min and max values of the column and iterates batch by batch.

func (*TableScanner) Start

func (tableScanner *TableScanner) Start() error

func (*TableScanner) Wait

func (tableScanner *TableScanner) Wait()

type TableStats added in v0.9.18

// TableStats is the per-table record stored in
// BatchPositionValue.TableStates: the Max, Min and Current positions plus
// two counters. The position fields are pointers and may therefore be nil.
// EstimatedRowCount and ScannedCount carry json tags only (no toml tag).
// NOTE(review): ScannedCount is presumably what PutCurrentPos bumps when
// incScanCount is true — confirm.
type TableStats struct {
	Max               *TablePosition `toml:"max" json:"max"`
	Min               *TablePosition `toml:"min" json:"min"`
	Current           *TablePosition `toml:"current" json:"current"`
	EstimatedRowCount int64          `json:"estimated-count"`
	ScannedCount      int64          `json:"scanned-count"`
}

type TableWork

// TableWork bundles what a scanner is given about one table: its
// definition, the TableConfig associated with it, the scan column and the
// estimated row count. NewTableScanner receives these over a
// chan *TableWork.
type TableWork struct {
	TableDef          *schema_store.Table
	TableConfig       *TableConfig
	ScanColumn        string
	EstimatedRowCount int64
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL