Documentation ¶
Index ¶
- Constants
- Variables
- func DecodeBatchPositionValue(s string) (interface{}, error)
- func DetectScanColumns(sourceDB *sql.DB, dbName string, tableName string, estimatedRowsCount int64, ...) ([]string, error)
- func EncodeBatchPositionValue(v interface{}) (string, error)
- func FindMaxMinValueFromDB(db *sql.DB, dbName string, tableName string, scanColumns []string, ...) ([]interface{}, []interface{})
- func GenerateNextScanQueryAndArgs(fullTableName string, scanColumns []string, currentMinValues []interface{}, ...) (string, []interface{})
- func GenerateScanQueryAndArgs(fullTableName string, scanColumns []string, currentMinValues []interface{}, ...) (string, []interface{})
- func GetMaxMin(cache position_cache.PositionCacheInterface, fullTableName string) ([]TablePosition, []TablePosition, bool, error)
- func GetScanIdx(columnTypes []*sql.ColumnType, scanColumn string) (int, error)
- func GetStartBinlog(cache position_cache.PositionCacheInterface) (config.MySQLBinlogPosition, error)
- func GetTableColumnTypes(db *sql.DB, schema string, table string) ([]*sql.ColumnType, error)
- func GreaterThanMax(db *sql.DB, fullTableName string, scanColumns []string, ...) (bool, error)
- func InitTablePosition(db *sql.DB, positionCache position_cache.PositionCacheInterface, ...) (bool, error)
- func IsScanColumnsForDump(scanColumns []string) bool
- func NewBarrierMsg(tableDef *schema_store.Table) *core.Msg
- func NewCloseInputStreamMsg(tableDef *schema_store.Table) *core.Msg
- func NewCreateTableMsg(parser *parser.Parser, table *schema_store.Table, createTblStmt string) *core.Msg
- func NewMsg(rowPtrs []interface{}, columnTypes []*sql.ColumnType, ...) *core.Msg
- func NextBatchStartPoint(db *sql.DB, fullTableName string, scanColumns []string, ...) (nextMinValues []interface{}, continueNext bool, pivotIndex int, err error)
- func NextScanElementForChunk(db *sql.DB, fullTableName string, columnTypes []*sql.ColumnType, ...) (nextRowValues []interface{}, exists bool, err error)
- func PutCurrentPos(cache position_cache.PositionCacheInterface, fullTableName string, ...) error
- func PutDone(cache position_cache.PositionCacheInterface, fullTableName string) error
- func PutEstimatedCount(cache position_cache.PositionCacheInterface, fullTableName string, ...) error
- func PutMaxMin(cache position_cache.PositionCacheInterface, fullTableName string, ...) error
- func ScanValuesFromRowValues(rowValues []interface{}, scanIndexes []int) []interface{}
- func SetupInitialPosition(cache position_cache.PositionCacheInterface, sourceDB *sql.DB) error
- type BatchPositionValueV1
- type BatchPositionValueV1Beta1
- type BatchPositionVersionMigrationWrapper
- type PluginConfig
- type TableConfig
- func DeleteEmptyTables(db *sql.DB, tables []*schema_store.Table, tableConfigs []TableConfig) ([]*schema_store.Table, []TableConfig)
- func GetTables(db *sql.DB, schemaStore schema_store.SchemaStore, ignoreTables []TableConfig, ...) ([]*schema_store.Table, []TableConfig)
- func InitializePositionAndDeleteScannedTable(db *sql.DB, positionCache position_cache.PositionCacheInterface, ...) ([]*schema_store.Table, []TableConfig, [][]string, []int64, error)
- type TablePosition
- type TableScanner
- func (tableScanner *TableScanner) AfterMsgCommit(msg *core.Msg) error
- func (tableScanner *TableScanner) FindAll(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig)
- func (tableScanner *TableScanner) LoopInBatch(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig, ...)
- func (tableScanner *TableScanner) Start() error
- func (tableScanner *TableScanner) Wait()
- type TableStats
- type TableStatsV1
- type TableWork
Constants ¶
View Source
const ( Unknown = "*" PlainString = "string" PlainInt = "int" PlainUInt = "uint" PlainBytes = "bytes" PlainTime = "time" SQLNullInt64 = "sqlNullInt64" SQLNullString = "sqlNullString" SQLNullBool = "sqlNullBool" SQLNullTime = "sqlNullTime" SQLRawBytes = "sqlRawBytes" ScanColumnForDump = "*" SchemaVersionV1 = "v1.0" )
View Source
const Name = "mysql-batch"
Variables ¶
View Source
var ( BatchQueryDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "gravity", Subsystem: "output", Name: "batch_query_duration", Help: "bucketed histogram of batch fetch duration time", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 22), }, []string{metrics.PipelineTag}) JobFetchedCount = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "gravity", Subsystem: "output", Name: "job_fetched_count", Help: "Number of data rows fetched by scanner", }, []string{metrics.PipelineTag}) )
Functions ¶
func DecodeBatchPositionValue ¶ added in v0.9.19
func DetectScanColumns ¶ added in v0.9.27
func DetectScanColumns(sourceDB *sql.DB, dbName string, tableName string, estimatedRowsCount int64, maxFullDumpRowsCountLimit int64) ([]string, error)
DetectScanColumns finds the columns used to scan the table. It tries primary keys first, then a unique key; as a last resort, it dumps the whole table. Note that composite unique keys are not supported.
func EncodeBatchPositionValue ¶ added in v0.9.19
func FindMaxMinValueFromDB ¶
func GenerateNextScanQueryAndArgs ¶ added in v0.9.27
func GenerateScanQueryAndArgs ¶ added in v0.9.27
func GenerateScanQueryAndArgs( fullTableName string, scanColumns []string, currentMinValues []interface{}, batch int, pivotIndex int, condition string) (string, []interface{})
pivotIndex is the index of the pivot column within scanColumns.
func GetMaxMin ¶ added in v0.9.17
func GetMaxMin(cache position_cache.PositionCacheInterface, fullTableName string) ([]TablePosition, []TablePosition, bool, error)
func GetScanIdx ¶
func GetScanIdx(columnTypes []*sql.ColumnType, scanColumn string) (int, error)
func GetStartBinlog ¶ added in v0.9.17
func GetStartBinlog(cache position_cache.PositionCacheInterface) (config.MySQLBinlogPosition, error)
func GetTableColumnTypes ¶
func GreaterThanMax ¶ added in v0.9.27
func InitTablePosition ¶ added in v0.9.18
func InitTablePosition( db *sql.DB, positionCache position_cache.PositionCacheInterface, tableDef *schema_store.Table, scanColumns []string, tableConfig TableConfig, estimatedRowCount *int64) (bool, error)
func IsScanColumnsForDump ¶ added in v0.9.27
func NewBarrierMsg ¶ added in v0.9.17
func NewBarrierMsg(tableDef *schema_store.Table) *core.Msg
func NewCloseInputStreamMsg ¶
func NewCloseInputStreamMsg(tableDef *schema_store.Table) *core.Msg
func NewCreateTableMsg ¶
func NewMsg ¶
func NewMsg( rowPtrs []interface{}, columnTypes []*sql.ColumnType, sourceTableDef *schema_store.Table, callbackFunc core.MsgCallbackFunc, positions []TablePosition, scanTime time.Time) *core.Msg
NewMsg creates a job (a *core.Msg). It converts sql.NullString values to interface{} based on the column type; if the column type is time, the value is parsed as a time.
func NextBatchStartPoint ¶ added in v0.9.27
func NextScanElementForChunk ¶ added in v0.9.27
func PutCurrentPos ¶ added in v0.9.17
func PutCurrentPos(cache position_cache.PositionCacheInterface, fullTableName string, pos []TablePosition, incScanCount bool) error
func PutDone ¶ added in v0.9.27
func PutDone(cache position_cache.PositionCacheInterface, fullTableName string) error
func PutEstimatedCount ¶ added in v0.9.18
func PutEstimatedCount(cache position_cache.PositionCacheInterface, fullTableName string, estimatedCount int64) error
func PutMaxMin ¶ added in v0.9.17
func PutMaxMin(cache position_cache.PositionCacheInterface, fullTableName string, max []TablePosition, min []TablePosition) error
func ScanValuesFromRowValues ¶ added in v0.9.27
func ScanValuesFromRowValues(rowValues []interface{}, scanIndexes []int) []interface{}
func SetupInitialPosition ¶ added in v0.9.17
func SetupInitialPosition(cache position_cache.PositionCacheInterface, sourceDB *sql.DB) error
Types ¶
type BatchPositionValueV1 ¶ added in v0.9.27
type BatchPositionValueV1 struct { SchemaVersion string `toml:"schema-version" json:"schema-version"` Start config.MySQLBinlogPosition `toml:"start-binlog" json:"start-binlog"` TableStates map[string]TableStatsV1 `toml:"table-stats" json:"table-stats"` }
type BatchPositionValueV1Beta1 ¶ added in v0.9.27
type BatchPositionValueV1Beta1 struct { SchemaVersion string `toml:"schema-version" json:"schema-version"` Start config.MySQLBinlogPosition `toml:"start-binlog" json:"start-binlog"` TableStates map[string]TableStats `toml:"table-stats" json:"table-stats"` }
type BatchPositionVersionMigrationWrapper ¶ added in v0.9.27
type BatchPositionVersionMigrationWrapper struct {
SchemaVersion string `toml:"schema-version" json:"schema-version"`
}
type PluginConfig ¶
type PluginConfig struct { Source *config.DBConfig `mapstructure:"source" toml:"source" json:"source"` // keep same with mysql binlog config to make most cases simple SourceSlave *config.DBConfig `mapstructure:"source-slave" toml:"source-slave" json:"source-slave"` SourceProbeCfg *helper.SourceProbeCfg `mapstructure:"source-probe-config"json:"source-probe-config"` PositionRepo *config.GenericPluginConfig `mapstructure:"position-repo" toml:"position-repo" json:"position-repo"` TableConfigs []TableConfig `mapstructure:"table-configs" json:"table-configs"` IgnoreTables []TableConfig `mapstructure:"ignore-tables" json:"ignore-tables"` NrScanner int `mapstructure:"nr-scanner" toml:"nr-scanner" json:"nr-scanner"` TableScanBatch int `mapstructure:"table-scan-batch" toml:"table-scan-batch" json:"table-scan-batch"` MaxFullDumpCount int64 `mapstructure:"max-full-dump-count" toml:"max-full-dump-count" json:"max-full-dump-count"` BatchPerSecondLimit int `mapstructure:"batch-per-second-limit" toml:"batch-per-second-limit" json:"batch-per-second-limit"` }
func (*PluginConfig) ValidateAndSetDefault ¶
func (cfg *PluginConfig) ValidateAndSetDefault() error
type TableConfig ¶
type TableConfig struct { Schema string `mapstructure:"schema" toml:"schema" json:"schema"` // Table is an array of string, each string is a glob expression // that describes the table name Table []string `mapstructure:"table" toml:"table" json:"table"` // ScanColumn is an array of string, that enforces these table's scan columns ScanColumn []string `mapstructure:"scan-column" toml:"scan-column" json:"scan-column"` Condition string `mapstructure:"condition" toml:"condition" json:"condition"` }
func DeleteEmptyTables ¶ added in v0.9.18
func DeleteEmptyTables(db *sql.DB, tables []*schema_store.Table, tableConfigs []TableConfig) ([]*schema_store.Table, []TableConfig)
func GetTables ¶
func GetTables(db *sql.DB, schemaStore schema_store.SchemaStore, ignoreTables []TableConfig, tableConfigs []TableConfig, router core.Router) ([]*schema_store.Table, []TableConfig)
GetTables returns a list of table definitions based on the schema and table name patterns. Only a single source DB is supported for now.
func InitializePositionAndDeleteScannedTable ¶ added in v0.9.19
func InitializePositionAndDeleteScannedTable( db *sql.DB, positionCache position_cache.PositionCacheInterface, scanColumnsArray [][]string, estimatedRowCount []int64, tables []*schema_store.Table, tableConfigs []TableConfig) ([]*schema_store.Table, []TableConfig, [][]string, []int64, error)
type TablePosition ¶ added in v0.9.17
type TablePosition struct { Value interface{} `toml:"value" json:"value,omitempty"` Type string `toml:"type" json:"type"` Column string `toml:"column" json:"column"` }
func GetCurrentPos ¶ added in v0.9.17
func GetCurrentPos(cache position_cache.PositionCacheInterface, fullTableName string) ([]TablePosition, bool, bool, error)
func (TablePosition) MapString ¶ added in v0.9.17
func (p TablePosition) MapString() (map[string]string, error)
func (TablePosition) MarshalJSON ¶ added in v0.9.17
func (p TablePosition) MarshalJSON() ([]byte, error)
func (*TablePosition) UnmarshalJSON ¶ added in v0.9.17
func (p *TablePosition) UnmarshalJSON(value []byte) error
type TableScanner ¶
type TableScanner struct {
// contains filtered or unexported fields
}
func NewTableScanner ¶
func NewTableScanner( pipelineName string, tableWorkC chan *TableWork, db *sql.DB, positionCache position_cache.PositionCacheInterface, emitter core.Emitter, throttle *time.Ticker, schemaStore schema_store.SchemaStore, cfg *PluginConfig, ctx context.Context) *TableScanner
func (*TableScanner) AfterMsgCommit ¶
func (tableScanner *TableScanner) AfterMsgCommit(msg *core.Msg) error
func (*TableScanner) FindAll ¶
func (tableScanner *TableScanner) FindAll(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig)
func (*TableScanner) LoopInBatch ¶
func (tableScanner *TableScanner) LoopInBatch( db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig, scanColumns []string, max []TablePosition, min []TablePosition, batch int)
LoopInBatch iterates over the table using SQL like `SELECT * FROM a WHERE some_key > some_value LIMIT 10000`. It gets the min and max values of the scan column and iterates batch by batch.
func (*TableScanner) Start ¶
func (tableScanner *TableScanner) Start() error
func (*TableScanner) Wait ¶
func (tableScanner *TableScanner) Wait()
type TableStats ¶ added in v0.9.18
type TableStats struct { Max *TablePosition `toml:"max" json:"max"` Min *TablePosition `toml:"min" json:"min"` Current *TablePosition `toml:"current" json:"current"` EstimatedRowCount int64 `json:"estimated-count"` ScannedCount int64 `json:"scanned-count"` Done bool `json:"done"` }
type TableStatsV1 ¶ added in v0.9.27
type TableStatsV1 struct { Max []TablePosition `toml:"max" json:"max"` Min []TablePosition `toml:"min" json:"min"` Current []TablePosition `toml:"current" json:"current"` EstimatedRowCount int64 `json:"estimated-count"` ScannedCount int64 `json:"scanned-count"` Done bool `json:"done"` }
type TableWork ¶
type TableWork struct { TableDef *schema_store.Table TableConfig *TableConfig ScanColumns []string EstimatedRowCount int64 Condition string }
Click to show internal directories.
Click to hide internal directories.