Documentation ¶
Index ¶
- Constants
- Variables
- func FilterOther(e []byte) bool
- type Canal
- func (c *Canal) CatchMasterPos(timeout time.Duration) error
- func (c *Canal) CheckBinlogRowImage(image string) error
- func (c *Canal) Close()
- func (c *Canal) Ctx() context.Context
- func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error)
- func (c *Canal) FlushBinlog() error
- func (c *Canal) GetDelay() uint32
- func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error)
- func (c *Canal) GetMasterPos() (mysql.Position, error)
- func (c *Canal) Run() error
- func (c *Canal) RunFrom(pos mysql.Position) error
- func (c *Canal) SetEventHandler(h EventHandler)
- func (c *Canal) StartFromGTID(set mysql.GTIDSet) error
- func (c *Canal) SyncedGTIDSet() mysql.GTIDSet
- func (c *Canal) SyncedPosition() mysql.Position
- func (c *Canal) SyncedTimestamp() uint32
- func (c *Canal) WaitDumpDone() <-chan struct{}
- func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error
- type Config
- type DummyEventHandler
- func (h *DummyEventHandler) OnDataBaseDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, dbDDL interface{}, ...) error
- func (h *DummyEventHandler) OnGTID(mysql.GTIDSet, []byte) error
- func (h *DummyEventHandler) OnGrantDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, ...) error
- func (h *DummyEventHandler) OnIndexDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, ...) error
- func (h *DummyEventHandler) OnPosSynced(mysql.Position, mysql.GTIDSet, bool) error
- func (h *DummyEventHandler) OnRotate(*replication.RotateEvent, []byte) error
- func (h *DummyEventHandler) OnRow(*RowsEvent, []byte) error
- func (h *DummyEventHandler) OnSequenceDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, ...) error
- func (h *DummyEventHandler) OnTableDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, tabDDL interface{}, ...) error
- func (h *DummyEventHandler) OnTransaction(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, ...) error
- func (h *DummyEventHandler) OnUserDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, ...) error
- func (h *DummyEventHandler) OnViewDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, ...) error
- func (h *DummyEventHandler) OnXID(pos mysql.Position, xid *replication.XIDEvent, rawData []byte) error
- func (h *DummyEventHandler) String() string
- type EventHandler
- type RowsEvent
Constants ¶
View Source
const ( UpdateAction = "update" InsertAction = "insert" DeleteAction = "delete" )
The action names for sync.
View Source
const ( DropTempTab = "^DROP\\s+(\\/\\*\\!40005\\s+)?TEMPORARY\\s+(\\*\\/\\s+)?TABLE" DropGlobalTempTab = "^DROP\\s+(\\/\\*\\!40005\\s+)?GLOBAL\\s+TEMPORARY\\s+(\\*\\/\\s+)?TABLE" )
Variables ¶
View Source
var ( ALTER = "ALTER" CREATE = "CREATE" DROP = "DROP" )
View Source
var ErrExcludedTable = errors.New("excluded table meta")
View Source
var UnknownTableRetryPeriod = time.Second * time.Duration(10)
Canal will retry fetching an unknown table's metadata after UnknownTableRetryPeriod.
Functions ¶
func FilterOther ¶
Types ¶
type Canal ¶
type Canal struct {
// contains filtered or unexported fields
}
Canal can sync your MySQL data to other systems, such as Elasticsearch, Redis, etc. MySQL must have the row format enabled for its binlog.
func (*Canal) CheckBinlogRowImage ¶
CheckBinlogRowImage checks the MySQL binlog row image; the image must be one of FULL, MINIMAL, or NOBLOB.
func (*Canal) FlushBinlog ¶
func (*Canal) Run ¶
Run will first try to dump all data from the MySQL master with `mysqldump`, then sync from the binlog position in the dump data. It will run forever until it meets an error or the Canal is closed.
func (*Canal) SetEventHandler ¶
func (c *Canal) SetEventHandler(h EventHandler)
`SetEventHandler` registers the sync handler. You must register your own handler before starting Canal.
func (*Canal) SyncedGTIDSet ¶
func (*Canal) SyncedPosition ¶
func (*Canal) SyncedTimestamp ¶
func (*Canal) WaitDumpDone ¶
func (c *Canal) WaitDumpDone() <-chan struct{}
type Config ¶
type Config struct { Addr string `toml:"addr"` User string `toml:"user"` Password string `toml:"password"` Charset string `toml:"charset"` ServerID uint32 `toml:"server_id"` Flavor string `toml:"flavor"` HeartbeatPeriod time.Duration `toml:"heartbeat_period"` ReadTimeout time.Duration `toml:"read_timeout"` // IncludeTableRegex or ExcludeTableRegex should contain database name // Only a table which matches IncludeTableRegex and dismatches ExcludeTableRegex will be processed // eg, IncludeTableRegex : [".*\\.canal"], ExcludeTableRegex : ["mysql\\..*"] // this will include all database's 'canal' table, except database 'mysql' // Default IncludeTableRegex and ExcludeTableRegex are empty, this will include all tables IncludeTableRegex []string `toml:"include_table_regex"` ExcludeTableRegex []string `toml:"exclude_table_regex"` // discard row event without table meta DiscardNoMetaRowEvent bool `toml:"discard_no_meta_row_event"` UseDecimal bool `toml:"use_decimal"` ParseTime bool `toml:"parse_time"` TimestampStringLocation *time.Location // SemiSyncEnabled enables semi-sync or not. SemiSyncEnabled bool `toml:"semi_sync_enabled"` // maximum number of attempts to re-establish a broken connection, zero or negative number means infinite retry. // this configuration will not work if DisableRetrySync is true MaxReconnectAttempts int `toml:"max_reconnect_attempts"` // whether disable re-sync for broken connection DisableRetrySync bool `toml:"disable_retry_sync"` // Set TLS config TLSConfig *tls.Config //Set Logger Logger *logrus.Logger }
func NewConfigWithFile ¶
func NewDefaultConfig ¶
func NewDefaultConfig() *Config
type DummyEventHandler ¶
type DummyEventHandler struct{}
func (*DummyEventHandler) OnDataBaseDDL ¶
func (h *DummyEventHandler) OnDataBaseDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, dbDDL interface{}, rawData []byte) error
func (*DummyEventHandler) OnGrantDDL ¶
func (h *DummyEventHandler) OnGrantDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error
func (*DummyEventHandler) OnIndexDDL ¶
func (h *DummyEventHandler) OnIndexDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error
func (*DummyEventHandler) OnPosSynced ¶
func (*DummyEventHandler) OnRotate ¶
func (h *DummyEventHandler) OnRotate(*replication.RotateEvent, []byte) error
func (*DummyEventHandler) OnSequenceDDL ¶
func (h *DummyEventHandler) OnSequenceDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error
func (*DummyEventHandler) OnTableDDL ¶
func (h *DummyEventHandler) OnTableDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, tabDDL interface{}, rawData []byte) error
func (*DummyEventHandler) OnTransaction ¶ added in v1.1.8
func (h *DummyEventHandler) OnTransaction(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error
func (*DummyEventHandler) OnUserDDL ¶
func (h *DummyEventHandler) OnUserDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error
func (*DummyEventHandler) OnViewDDL ¶
func (h *DummyEventHandler) OnViewDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error
func (*DummyEventHandler) OnXID ¶
func (h *DummyEventHandler) OnXID(pos mysql.Position, xid *replication.XIDEvent, rawData []byte) error
func (*DummyEventHandler) String ¶
func (h *DummyEventHandler) String() string
type EventHandler ¶
type EventHandler interface { OnRotate(roateEvent *replication.RotateEvent, rawData []byte) error OnRow(e *RowsEvent, rawData []byte) error OnXID(nextPos mysql.Position, xid *replication.XIDEvent, rawData []byte) error OnGTID(gtid mysql.GTIDSet, rawData []byte) error // OnPosSynced Use your own way to sync position. When force is true, sync position immediately. OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error String() string OnTableDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error OnDataBaseDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error OnIndexDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error OnViewDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error OnSequenceDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error OnUserDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error OnGrantDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error OnTransaction(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error }
type RowsEvent ¶
type RowsEvent struct { Action string // changed row list // binlog has three update event version, v0, v1 and v2. // for v1 and v2, the rows number must be even. // Two rows for one event, format is [before update row, after update row] // for update v0, only one row for a event, and we don't support this version. RowsEvent *replication.RowsEvent // Header can be used to inspect the event Header *replication.EventHeader }
RowsEvent is the event for row replication.
Click to show internal directories.
Click to hide internal directories.