Documentation ¶
Index ¶
- Constants
- Variables
- type Canal
- func (c *Canal) AddDumpDatabases(dbs ...string)
- func (c *Canal) AddDumpIgnoreTables(db string, tables ...string)
- func (c *Canal) AddDumpTables(db string, tables ...string)
- func (c *Canal) CatchMasterPos(timeout time.Duration) error
- func (c *Canal) CheckBinlogRowImage(image string) error
- func (c *Canal) ClearTableCache(db []byte, table []byte)
- func (c *Canal) Close()
- func (c *Canal) Ctx() context.Context
- func (c *Canal) Dump() error
- func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error)
- func (c *Canal) FlushBinlog() error
- func (c *Canal) GetDelay() uint32
- func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error)
- func (c *Canal) GetMasterPos() (mysql.Position, error)
- func (c *Canal) GetTable(db string, table string) (*schema.Table, error)
- func (c *Canal) Run() error
- func (c *Canal) RunFrom(pos mysql.Position) error
- func (c *Canal) SetEventHandler(h EventHandler)
- func (c *Canal) SetTableCache(db []byte, table []byte, schema *schema.Table)
- func (c *Canal) StartFromGTID(set mysql.GTIDSet) error
- func (c *Canal) SyncedGTIDSet() mysql.GTIDSet
- func (c *Canal) SyncedPosition() mysql.Position
- func (c *Canal) SyncedTimestamp() uint32
- func (c *Canal) WaitDumpDone() <-chan struct{}
- func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error
- type Config
- type DummyEventHandler
- func (h *DummyEventHandler) OnDDL(*replication.EventHeader, mysql.Position, *replication.QueryEvent) error
- func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.BinlogGTIDEvent) error
- func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position, mysql.GTIDSet, bool) error
- func (h *DummyEventHandler) OnRotate(*replication.EventHeader, *replication.RotateEvent) error
- func (h *DummyEventHandler) OnRow(*RowsEvent) error
- func (h *DummyEventHandler) OnRowsQueryEvent(*replication.RowsQueryEvent) error
- func (h *DummyEventHandler) OnTableChanged(*replication.EventHeader, string, string) error
- func (h *DummyEventHandler) OnXID(*replication.EventHeader, mysql.Position) error
- func (h *DummyEventHandler) String() string
- type DumpConfig
- type EventHandler
- type RowsEvent
Constants ¶
const ( UpdateAction = "update" InsertAction = "insert" DeleteAction = "delete" )
The action names for sync.
Variables ¶
var ErrExcludedTable = errors.New("excluded table meta")
var UnknownTableRetryPeriod = time.Second * time.Duration(10)
Canal will retry fetching an unknown table's meta after UnknownTableRetryPeriod.
Functions ¶
This section is empty.
Types ¶
type Canal ¶
type Canal struct {
// contains filtered or unexported fields
}
Canal can sync your MySQL data to anywhere, such as Elasticsearch, Redis, etc. MySQL must use the row format for its binlog.
func (*Canal) AddDumpDatabases ¶
func (*Canal) AddDumpIgnoreTables ¶
func (*Canal) AddDumpTables ¶
func (*Canal) CheckBinlogRowImage ¶
CheckBinlogRowImage checks the MySQL binlog row image, which must be one of FULL, MINIMAL, or NOBLOB.
func (*Canal) ClearTableCache ¶
ClearTableCache clears the table cache.
func (*Canal) FlushBinlog ¶
func (*Canal) Run ¶
Run will first try to dump all data from the MySQL master with `mysqldump`, then sync from the binlog position in the dump data. It will run forever until it encounters an error or the Canal is closed.
func (*Canal) SetEventHandler ¶
func (c *Canal) SetEventHandler(h EventHandler)
`SetEventHandler` registers the sync handler. You must register your own handler before starting Canal.
func (*Canal) SetTableCache ¶
SetTableCache sets the table cache value for the given table.
func (*Canal) SyncedGTIDSet ¶
func (*Canal) SyncedPosition ¶
func (*Canal) SyncedTimestamp ¶
func (*Canal) WaitDumpDone ¶
func (c *Canal) WaitDumpDone() <-chan struct{}
type Config ¶
type Config struct { Addr string `toml:"addr"` User string `toml:"user"` Password string `toml:"password"` Charset string `toml:"charset"` ServerID uint32 `toml:"server_id"` Flavor string `toml:"flavor"` HeartbeatPeriod time.Duration `toml:"heartbeat_period"` ReadTimeout time.Duration `toml:"read_timeout"` // IncludeTableRegex or ExcludeTableRegex should contain database name. // IncludeTableRegex defines the tables that will be included, if empty, all tables will be included. // ExcludeTableRegex defines the tables that will be excluded from the ones defined by IncludeTableRegex. // Only a table which matches IncludeTableRegex and dismatches ExcludeTableRegex will be processed // eg, IncludeTableRegex : [".*\\.canal"], ExcludeTableRegex : ["mysql\\..*"] // this will include all database's 'canal' table, except database 'mysql'. // Default IncludeTableRegex and ExcludeTableRegex are empty, this will include all tables IncludeTableRegex []string `toml:"include_table_regex"` ExcludeTableRegex []string `toml:"exclude_table_regex"` // discard row event without table meta DiscardNoMetaRowEvent bool `toml:"discard_no_meta_row_event"` Dump DumpConfig `toml:"dump"` UseDecimal bool `toml:"use_decimal"` ParseTime bool `toml:"parse_time"` TimestampStringLocation *time.Location // SemiSyncEnabled enables semi-sync or not. SemiSyncEnabled bool `toml:"semi_sync_enabled"` // maximum number of attempts to re-establish a broken connection, zero or negative number means infinite retry. // this configuration will not work if DisableRetrySync is true MaxReconnectAttempts int `toml:"max_reconnect_attempts"` // whether disable re-sync for broken connection DisableRetrySync bool `toml:"disable_retry_sync"` // whether the function WaitUntilPos() can use FLUSH BINARY LOGS // to ensure we advance past a position. This should not strictly be required, // and requires additional privileges. 
DisableFlushBinlogWhileWaiting bool `toml:"disable_flush_binlog_while_waiting"` // Set TLS config TLSConfig *tls.Config // Set Logger Logger loggers.Advanced // Set Dialer Dialer client.Dialer // Set Localhost Localhost string }
func NewConfigWithFile ¶
func NewDefaultConfig ¶
func NewDefaultConfig() *Config
NewDefaultConfig initializes a default config for Canal.
type DummyEventHandler ¶
type DummyEventHandler struct { }
func (*DummyEventHandler) OnDDL ¶
func (h *DummyEventHandler) OnDDL(*replication.EventHeader, mysql.Position, *replication.QueryEvent) error
func (*DummyEventHandler) OnGTID ¶
func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.BinlogGTIDEvent) error
func (*DummyEventHandler) OnPosSynced ¶
func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position, mysql.GTIDSet, bool) error
func (*DummyEventHandler) OnRotate ¶
func (h *DummyEventHandler) OnRotate(*replication.EventHeader, *replication.RotateEvent) error
func (*DummyEventHandler) OnRow ¶
func (h *DummyEventHandler) OnRow(*RowsEvent) error
func (*DummyEventHandler) OnRowsQueryEvent ¶
func (h *DummyEventHandler) OnRowsQueryEvent(*replication.RowsQueryEvent) error
func (*DummyEventHandler) OnTableChanged ¶
func (h *DummyEventHandler) OnTableChanged(*replication.EventHeader, string, string) error
func (*DummyEventHandler) OnXID ¶
func (h *DummyEventHandler) OnXID(*replication.EventHeader, mysql.Position) error
func (*DummyEventHandler) String ¶
func (h *DummyEventHandler) String() string
type DumpConfig ¶
type DumpConfig struct { // mysqldump execution path, like mysqldump or /usr/bin/mysqldump, etc... // If not set, ignore using mysqldump. ExecutionPath string `toml:"mysqldump"` // Will override Databases, tables is in database table_db Tables []string `toml:"tables"` TableDB string `toml:"table_db"` Databases []string `toml:"dbs"` // Ignore table format is db.table IgnoreTables []string `toml:"ignore_tables"` // Dump only selected records. Quotes are mandatory Where string `toml:"where"` // If true, discard error msg, else, output to stderr DiscardErr bool `toml:"discard_err"` // Set true to skip --master-data if we have no privilege to do // 'FLUSH TABLES WITH READ LOCK' SkipMasterData bool `toml:"skip_master_data"` // Set to change the default max_allowed_packet size MaxAllowedPacketMB int `toml:"max_allowed_packet_mb"` // Set to change the default protocol to connect with Protocol string `toml:"protocol"` // Set extra options ExtraOptions []string `toml:"extra_options"` }
type EventHandler ¶
type EventHandler interface { OnRotate(header *replication.EventHeader, rotateEvent *replication.RotateEvent) error // OnTableChanged is called when the table is created, altered, renamed or dropped. // You need to clear the associated data like cache with the table. // It will be called before OnDDL. OnTableChanged(header *replication.EventHeader, schema string, table string) error OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error OnRow(e *RowsEvent) error OnXID(header *replication.EventHeader, nextPos mysql.Position) error OnGTID(header *replication.EventHeader, gtidEvent mysql.BinlogGTIDEvent) error // OnPosSynced Use your own way to sync position. When force is true, sync position immediately. OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error // OnRowsQueryEvent is called when binlog_rows_query_log_events=ON for each DML query. // You'll get the original executed query, with comments if present. // It will be called before OnRow. OnRowsQueryEvent(e *replication.RowsQueryEvent) error String() string }
type RowsEvent ¶
type RowsEvent struct { Table *schema.Table Action string // changed row list // binlog has three update event version, v0, v1 and v2. // for v1 and v2, the rows number must be even. // Two rows for one event, format is [before update row, after update row] // for update v0, only one row for a event, and we don't support this version. Rows [][]interface{} // Header can be used to inspect the event Header *replication.EventHeader }
RowsEvent is the event for row replication.