Documentation ¶
Index ¶
- Constants
- Variables
- func GenerateDMLDepHashes(msg *core.Msg, tableDef *schema_store.Table) []core.OutputHash
- func GenerateDataHashes(schema string, table string, uniqKeys map[string][]string, ...) []core.OutputHash
- func GetCurrentPositionValue(cache position_cache.PositionCacheInterface) (config.MySQLBinlogPosition, error)
- func IsEventBelongsToMyself(event *replication.RowsEvent, pipelineName string) bool
- func NewBarrierMsg(callback core.MsgCallbackFunc) *core.Msg
- func NewDDLMsg(callback core.MsgCallbackFunc, dbName string, table string, ast ast.StmtNode, ...) *core.Msg
- func NewDeleteMsgs(host string, database string, table string, ts int64, received time.Time, ...) ([]*core.Msg, error)
- func NewHeartbeatMsg(callback core.MsgCallbackFunc) *core.Msg
- func NewInsertMsgs(host string, database string, table string, ts int64, received time.Time, ...) ([]*core.Msg, error)
- func NewUpdateMsgs(host string, database string, table string, ts int64, received time.Time, ...) ([]*core.Msg, error)
- func NewXIDMsg(ts int64, received time.Time, callback core.MsgCallbackFunc, ...) *core.Msg
- func SetupInitialPosition(db *sql.DB, positionCache position_cache.PositionCacheInterface, ...) error
- func ToGoMySQLPosition(p config.MySQLBinlogPosition) (gomysql.Position, gomysql.MysqlGTIDSet, error)
- func UpdateCurrentPositionValue(cache position_cache.PositionCacheInterface, ...) error
- type BinlogEventSchemaFilterFunc
- type BinlogTailer
- func (tailer *BinlogTailer) AfterMsgCommit(msg *core.Msg) error
- func (tailer *BinlogTailer) AppendMsgTxnBuffer(msg *core.Msg)
- func (tailer *BinlogTailer) ClearMsgTxnBuffer()
- func (tailer *BinlogTailer) Close()
- func (tailer *BinlogTailer) FlushMsgTxnBuffer()
- func (tailer *BinlogTailer) Start() error
- func (tailer *BinlogTailer) Wait()
- type MySQLBinlogInputPluginConfig
Constants ¶
View Source
const MySQLPrimaryKeyName = "PRIMARY"
View Source
const Name = "mysql-stream"
Variables ¶
View Source
var (
	// re-sync retry timeout
	RetryTimeout = 3 * time.Second
	BinlogProbeInterval = 3 * time.Second
)
Functions ¶
func GenerateDMLDepHashes ¶ added in v0.9.29
func GenerateDMLDepHashes(msg *core.Msg, tableDef *schema_store.Table) []core.OutputHash
func GenerateDataHashes ¶ added in v0.9.29
func GetCurrentPositionValue ¶ added in v0.9.17
func GetCurrentPositionValue(cache position_cache.PositionCacheInterface) (config.MySQLBinlogPosition, error)
func IsEventBelongsToMyself ¶
func IsEventBelongsToMyself(event *replication.RowsEvent, pipelineName string) bool
func NewBarrierMsg ¶
func NewBarrierMsg(callback core.MsgCallbackFunc) *core.Msg
func NewDeleteMsgs ¶
func NewHeartbeatMsg ¶ added in v0.9.70
func NewHeartbeatMsg(callback core.MsgCallbackFunc) *core.Msg
func NewInsertMsgs ¶
func NewUpdateMsgs ¶
func NewXIDMsg ¶
func NewXIDMsg(ts int64, received time.Time, callback core.MsgCallbackFunc, position config.MySQLBinlogPosition) *core.Msg
func SetupInitialPosition ¶ added in v0.9.17
func SetupInitialPosition(db *sql.DB, positionCache position_cache.PositionCacheInterface, startPositionSpec *config.MySQLBinlogPosition) error
func ToGoMySQLPosition ¶ added in v0.9.17
func ToGoMySQLPosition(p config.MySQLBinlogPosition) (gomysql.Position, gomysql.MysqlGTIDSet, error)
func UpdateCurrentPositionValue ¶ added in v0.9.17
func UpdateCurrentPositionValue(cache position_cache.PositionCacheInterface, currentPosition config.MySQLBinlogPosition) error
Types ¶
type BinlogTailer ¶
type BinlogTailer struct {
// contains filtered or unexported fields
}
func NewBinlogTailer ¶
func NewBinlogTailer( pipelineName string, cfg *MySQLBinlogInputPluginConfig, gravityServerID uint32, positionCache position_cache.PositionCacheInterface, sourceSchemaStore schema_store.SchemaStore, sourceDB *sql.DB, emitter core.Emitter, router core.Router, binlogChecker binlog_checker.BinlogChecker, binlogEventSchemaFilter BinlogEventSchemaFilterFunc, ) (*BinlogTailer, error)
NewBinlogTailer creates a new binlog tailer
func (*BinlogTailer) AfterMsgCommit ¶
func (tailer *BinlogTailer) AfterMsgCommit(msg *core.Msg) error
func (*BinlogTailer) AppendMsgTxnBuffer ¶
func (tailer *BinlogTailer) AppendMsgTxnBuffer(msg *core.Msg)
AppendMsgTxnBuffer adds basic job information to the txn buffer
func (*BinlogTailer) ClearMsgTxnBuffer ¶
func (tailer *BinlogTailer) ClearMsgTxnBuffer()
func (*BinlogTailer) Close ¶
func (tailer *BinlogTailer) Close()
func (*BinlogTailer) FlushMsgTxnBuffer ¶
func (tailer *BinlogTailer) FlushMsgTxnBuffer()
FlushMsgTxnBuffer flushes the jobs in the txn buffer to the queue. It also filters out jobs that do not need to be sent out at this stage.
func (*BinlogTailer) Start ¶
func (tailer *BinlogTailer) Start() error
func (*BinlogTailer) Wait ¶
func (tailer *BinlogTailer) Wait()
type MySQLBinlogInputPluginConfig ¶
type MySQLBinlogInputPluginConfig struct {
	Source *config.DBConfig `mapstructure:"source" toml:"source" json:"source"`
	IgnoreBiDirectionalData bool `mapstructure:"ignore-bidirectional-data" toml:"ignore-bidirectional-data" json:"ignore-bidirectional-data"`
	StartPosition *config.MySQLBinlogPosition `mapstructure:"start-position" toml:"start-position" json:"start-position"`
	SourceProbeCfg *helper.SourceProbeCfg `mapstructure:"source-probe-config"json:"source-probe-config"`
	PositionRepo *config.GenericPluginConfig `mapstructure:"position-repo" toml:"position-repo" json:"position-repo"`

	// If we detect any internal txn tag that matches FailOnTxnTag, just fail.
	FailOnTxnTags []string `mapstructure:"fail-on-txn-tags" toml:"fail-on-txn-tags"`

	HeartbeatPeriodStr string `toml:"heartbeat-period" json:"heartbeat-period" mapstructure:"heartbeat-period"`
	HeartbeatPeriod time.Duration `toml:"-" json:"-" mapstructure:"-"`

	//
	// internal configurations that is not exposed to users
	//
	DisableBinlogChecker bool `mapstructure:"-"json:"-"`
	DebugBinlog bool `mapstructure:"-"json:"-"`
	BinlogSyncerTimeout string `mapstructure:"-"json:"-"`
}
Click to show internal directories.
Click to hide internal directories.