Documentation ¶
Index ¶
- Constants
- Variables
- func DecodeBatchPositionValue(s string) (interface{}, error)
- func DetectScanColumn(sourceDB *sql.DB, dbName string, tableName string, estimatedRowsCount int64, ...) (string, error)
- func EncodeBatchPositionValue(v interface{}) (string, error)
- func FindMaxMinValueFromDB(db *sql.DB, dbName string, tableName string, scanColumn string) (interface{}, interface{})
- func GetMaxMin(cache position_store.PositionCacheInterface, fullTableName string) (TablePosition, TablePosition, bool, error)
- func GetScanIdx(columnTypes []*sql.ColumnType, scanColumn string) (int, error)
- func GetStartBinlog(cache position_store.PositionCacheInterface) (utils.MySQLBinlogPosition, error)
- func GetTableColumnTypes(db *sql.DB, schema string, table string) ([]*sql.ColumnType, error)
- func InitTablePosition(db *sql.DB, positionCache position_store.PositionCacheInterface, ...) (bool, error)
- func NewBarrierMsg(tableDef *schema_store.Table) *core.Msg
- func NewCloseInputStreamMsg(tableDef *schema_store.Table) *core.Msg
- func NewCreateTableMsg(parser *parser.Parser, table *schema_store.Table, createTblStmt string) *core.Msg
- func NewMsg(rowPtrs []interface{}, columnTypes []*sql.ColumnType, ...) *core.Msg
- func PutCurrentPos(cache position_store.PositionCacheInterface, fullTableName string, ...) error
- func PutEstimatedCount(cache position_store.PositionCacheInterface, fullTableName string, ...) error
- func PutMaxMin(cache position_store.PositionCacheInterface, fullTableName string, ...) error
- func SetupInitialPosition(cache position_store.PositionCacheInterface, sourceDB *sql.DB) error
- type BatchPositionValue
- type PluginConfig
- type TableConfig
- func DeleteEmptyTables(db *sql.DB, tables []*schema_store.Table, tableConfigs []TableConfig) ([]*schema_store.Table, []TableConfig)
- func GetTables(db *sql.DB, schemaStore schema_store.SchemaStore, tableConfigs []TableConfig, ...) ([]*schema_store.Table, []TableConfig)
- func InitializePositionAndDeleteScannedTable(db *sql.DB, positionCache position_store.PositionCacheInterface, ...) ([]*schema_store.Table, []TableConfig, []string, []int64, error)
- type TablePosition
- type TableScanner
- func (tableScanner *TableScanner) AfterMsgCommit(msg *core.Msg) error
- func (tableScanner *TableScanner) FindAll(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig)
- func (tableScanner *TableScanner) LoopInBatch(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig, ...)
- func (tableScanner *TableScanner) Start() error
- func (tableScanner *TableScanner) Wait()
- type TableStats
- type TableWork
Constants ¶
View Source
const ( Unknown = "*" PlainString = "string" PlainInt = "int" PlainBytes = "bytes" PlainTime = "time" SQLNullInt64 = "sqlNullInt64" SQLNullString = "sqlNullString" SQLNullBool = "sqlNullBool" SQLNullTime = "sqlNullTime" SQLRawBytes = "sqlRawBytes" )
Variables ¶
View Source
var ( BatchQueryDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "gravity", Subsystem: "output", Name: "batch_query_duration", Help: "bucketed histogram of batch fetch duration time", Buckets: prometheus.ExponentialBuckets(0.0005, 2, 22), }, []string{metrics.PipelineTag}) JobFetchedCount = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "gravity", Subsystem: "output", Name: "job_fetched_count", Help: "Number of data rows fetched by scanner", }, []string{metrics.PipelineTag}) )
Functions ¶
func DecodeBatchPositionValue ¶ added in v0.9.19
func DetectScanColumn ¶
func DetectScanColumn(sourceDB *sql.DB, dbName string, tableName string, estimatedRowsCount int64, maxFullDumpRowsCountLimit int64) (string, error)
DetectScanColumn finds the column that is used to scan the table, using SHOW INDEX FROM ...: it picks the primary key if there is only one primary key column; if no such primary key is found, it tries a unique index; otherwise it fails.
func EncodeBatchPositionValue ¶ added in v0.9.19
func FindMaxMinValueFromDB ¶
func GetMaxMin ¶ added in v0.9.17
func GetMaxMin(cache position_store.PositionCacheInterface, fullTableName string) (TablePosition, TablePosition, bool, error)
func GetScanIdx ¶
func GetScanIdx(columnTypes []*sql.ColumnType, scanColumn string) (int, error)
func GetStartBinlog ¶ added in v0.9.17
func GetStartBinlog(cache position_store.PositionCacheInterface) (utils.MySQLBinlogPosition, error)
func GetTableColumnTypes ¶
func InitTablePosition ¶ added in v0.9.18
func InitTablePosition(db *sql.DB, positionCache position_store.PositionCacheInterface, tableDef *schema_store.Table, scanColumn string, estimatedRowCount int64) (bool, error)
func NewBarrierMsg ¶ added in v0.9.17
func NewBarrierMsg(tableDef *schema_store.Table) *core.Msg
func NewCloseInputStreamMsg ¶
func NewCloseInputStreamMsg(tableDef *schema_store.Table) *core.Msg
func NewCreateTableMsg ¶
func NewMsg ¶
func NewMsg( rowPtrs []interface{}, columnTypes []*sql.ColumnType, sourceTableDef *schema_store.Table, callbackFunc core.AfterMsgCommitFunc, position TablePosition, scanTime time.Time) *core.Msg
NewMsg creates a job. It converts sql.NullString to interface{} based on the column type; if the column type is time, the time value is parsed.
func PutCurrentPos ¶ added in v0.9.17
func PutCurrentPos(cache position_store.PositionCacheInterface, fullTableName string, pos TablePosition, incScanCount bool) error
func PutEstimatedCount ¶ added in v0.9.18
func PutEstimatedCount(cache position_store.PositionCacheInterface, fullTableName string, estimatedCount int64) error
func PutMaxMin ¶ added in v0.9.17
func PutMaxMin(cache position_store.PositionCacheInterface, fullTableName string, max TablePosition, min TablePosition) error
func SetupInitialPosition ¶ added in v0.9.17
func SetupInitialPosition(cache position_store.PositionCacheInterface, sourceDB *sql.DB) error
Types ¶
type BatchPositionValue ¶ added in v0.9.17
type BatchPositionValue struct { Start utils.MySQLBinlogPosition `toml:"start-binlog" json:"start-binlog"` TableStates map[string]TableStats `toml:"table-stats" json:"table-stats"` }
type PluginConfig ¶
type PluginConfig struct { Source *utils.DBConfig `mapstructure:"source" toml:"source" json:"source"` // keep same with mysql binlog config to make most cases simple SourceSlave *utils.DBConfig `mapstructure:"source-slave" toml:"source-slave" json:"source-slave"` SourceProbeCfg *helper.SourceProbeCfg `mapstructure:"source-probe-config"json:"source-probe-config"` TableConfigs []TableConfig `mapstructure:"table-configs"json:"table-configs"` NrScanner int `mapstructure:"nr-scanner" toml:"nr-scanner" json:"nr-scanner"` TableScanBatch int `mapstructure:"table-scan-batch" toml:"table-scan-batch" json:"table-scan-batch"` MaxFullDumpCount int64 `mapstructure:"max-full-dump-count" toml:"max-full-dump-count" json:"max-full-dump-count"` BatchPerSecondLimit int `mapstructure:"batch-per-second-limit" toml:"batch-per-second-limit" json:"batch-per-second-limit"` }
func (*PluginConfig) ValidateAndSetDefault ¶
func (cfg *PluginConfig) ValidateAndSetDefault() error
type TableConfig ¶
type TableConfig struct { Schema string `mapstructure:"schema" toml:"schema" json:"schema"` // Table is an array of string, each string is a glob expression // that describes the table name Table []string `mapstructure:"table" toml:"table" json:"table"` // ScanColumn enforces these table's scan column ScanColumn string `mapstructure:"scan-column" toml:"scan-column" json:"scan-column"` }
func DeleteEmptyTables ¶ added in v0.9.18
func DeleteEmptyTables(db *sql.DB, tables []*schema_store.Table, tableConfigs []TableConfig) ([]*schema_store.Table, []TableConfig)
func GetTables ¶
func GetTables(db *sql.DB, schemaStore schema_store.SchemaStore, tableConfigs []TableConfig, router core.Router) ([]*schema_store.Table, []TableConfig)
GetTables returns a list of table definitions based on the schema and table name patterns. Only a single sourceDB is supported for now.
func InitializePositionAndDeleteScannedTable ¶ added in v0.9.19
func InitializePositionAndDeleteScannedTable( db *sql.DB, positionCache position_store.PositionCacheInterface, scanColumns []string, estimatedRowCount []int64, tables []*schema_store.Table, tableConfigs []TableConfig) ([]*schema_store.Table, []TableConfig, []string, []int64, error)
type TablePosition ¶ added in v0.9.17
type TablePosition struct { Value interface{} `toml:"value" json:"value,omitempty"` Type string `toml:"type" json:"type"` Column string `toml:"column" json:"column"` }
func GetCurrentPos ¶ added in v0.9.17
func GetCurrentPos(cache position_store.PositionCacheInterface, fullTableName string) (TablePosition, bool, error)
func (TablePosition) MapString ¶ added in v0.9.17
func (p TablePosition) MapString() (map[string]string, error)
func (TablePosition) MarshalJSON ¶ added in v0.9.17
func (p TablePosition) MarshalJSON() ([]byte, error)
func (*TablePosition) UnmarshalJSON ¶ added in v0.9.17
func (p *TablePosition) UnmarshalJSON(value []byte) error
type TableScanner ¶
type TableScanner struct {
// contains filtered or unexported fields
}
func NewTableScanner ¶
func NewTableScanner( pipelineName string, tableWorkC chan *TableWork, db *sql.DB, positionCache position_store.PositionCacheInterface, emitter core.Emitter, throttle *time.Ticker, schemaStore schema_store.SchemaStore, cfg *PluginConfig, ctx context.Context) *TableScanner
func (*TableScanner) AfterMsgCommit ¶
func (tableScanner *TableScanner) AfterMsgCommit(msg *core.Msg) error
func (*TableScanner) FindAll ¶
func (tableScanner *TableScanner) FindAll(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig)
func (*TableScanner) LoopInBatch ¶
func (tableScanner *TableScanner) LoopInBatch(db *sql.DB, tableDef *schema_store.Table, tableConfig *TableConfig, scanColumn string, max TablePosition, min TablePosition, batch int)
LoopInBatch iterates over the table using SQL like this: SELECT * FROM a WHERE some_key > some_value LIMIT 10000. It gets the min and max values of the column and iterates batch by batch.
func (*TableScanner) Start ¶
func (tableScanner *TableScanner) Start() error
func (*TableScanner) Wait ¶
func (tableScanner *TableScanner) Wait()
type TableStats ¶ added in v0.9.18
type TableStats struct { Max *TablePosition `toml:"max" json:"max"` Min *TablePosition `toml:"min" json:"min"` Current *TablePosition `toml:"current" json:"current"` EstimatedRowCount int64 `json:"estimated-count"` ScannedCount int64 `json:"scanned-count"` }
type TableWork ¶
type TableWork struct { TableDef *schema_store.Table TableConfig *TableConfig ScanColumn string EstimatedRowCount int64 }
Click to show internal directories.
Click to hide internal directories.