Versions in this module Expand all Collapse all v1 v1.6.1 Aug 23, 2022 Changes in this version + const DeleteAction + const InsertAction + const UpdateAction + var ErrExcludedTable = errors.New("excluded table meta") + var UnknownTableRetryPeriod = time.Second * time.Duration(10) + type Canal struct + func NewCanal(cfg *Config) (*Canal, error) + func (c *Canal) AddDumpDatabases(dbs ...string) + func (c *Canal) AddDumpIgnoreTables(db string, tables ...string) + func (c *Canal) AddDumpTables(db string, tables ...string) + func (c *Canal) CatchMasterPos(timeout time.Duration) error + func (c *Canal) CheckBinlogRowImage(image string) error + func (c *Canal) ClearTableCache(db []byte, table []byte) + func (c *Canal) Close() + func (c *Canal) Ctx() context.Context + func (c *Canal) Dump() error + func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error) + func (c *Canal) FlushBinlog() error + func (c *Canal) GetDelay() uint32 + func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error) + func (c *Canal) GetMasterPos() (mysql.Position, error) + func (c *Canal) GetTable(db string, table string) (*schema.Table, error) + func (c *Canal) Run() error + func (c *Canal) RunFrom(pos mysql.Position) error + func (c *Canal) SetEventHandler(h EventHandler) + func (c *Canal) SetTableCache(db []byte, table []byte, schema *schema.Table) + func (c *Canal) StartFromGTID(set mysql.GTIDSet) error + func (c *Canal) SyncedGTIDSet() mysql.GTIDSet + func (c *Canal) SyncedPosition() mysql.Position + func (c *Canal) SyncedTimestamp() uint32 + func (c *Canal) WaitDumpDone() <-chan struct{} + func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error + type Config struct + Addr string + Charset string + Dialer client.Dialer + DisableRetrySync bool + DiscardNoMetaRowEvent bool + Dump DumpConfig + ExcludeTableRegex []string + Flavor string + HeartbeatPeriod time.Duration + IncludeTableRegex []string + Logger *log.Logger + MaxReconnectAttempts int + ParseTime bool + Password string + ReadTimeout time.Duration + SemiSyncEnabled bool + ServerID uint32 + SkipDump bool + TLSConfig *tls.Config + TimestampStringLocation *time.Location + UseDecimal bool + User string + func NewConfig(data string) (*Config, error) + func NewConfigWithFile(name string) (*Config, error) + func NewDefaultConfig() *Config + type DummyEventHandler struct + func (h *DummyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error + func (h *DummyEventHandler) OnGTID(mysql.GTIDSet) error + func (h *DummyEventHandler) OnPosSynced(mysql.Position, mysql.GTIDSet, bool) error + func (h *DummyEventHandler) OnRotate(*replication.RotateEvent) error + func (h *DummyEventHandler) OnRow(*RowsEvent) error + func (h *DummyEventHandler) OnTableChanged(schema string, table string) error + func (h *DummyEventHandler) OnXID(mysql.Position) error + func (h *DummyEventHandler) String() string + type DumpConfig struct + Databases []string + DiscardErr bool + ExecutionPath string + ExtraOptions []string + IgnoreTables []string + MaxAllowedPacketMB int + Protocol string + SkipMasterData bool + TableDB string + Tables []string + Where string + type EventHandler interface + OnDDL func(nextPos mysql.Position, queryEvent *replication.QueryEvent) error + OnGTID func(gtid mysql.GTIDSet) error + OnPosSynced func(pos mysql.Position, set mysql.GTIDSet, force bool) error + OnRotate func(rotateEvent *replication.RotateEvent) error + OnRow func(e *RowsEvent) error + OnTableChanged func(schema string, table string) error + OnXID func(nextPos mysql.Position) error + String func() string + type RowsEvent struct + Action string + Header *replication.EventHeader + Rows [][]interface{} + Table *schema.Table + func (r *RowsEvent) String() string