Documentation ¶
Index ¶
- Variables
- type Canal
- func (c *Canal) Ack(message *core.Message) error
- func (c *Canal) CatchMasterPos(timeout time.Duration) error
- func (c *Canal) CheckBinlogRowImage(image string) error
- func (c *Canal) ClearTableCache(db []byte, table []byte)
- func (c *Canal) Close()
- func (c *Canal) Ctx() context.Context
- func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error)
- func (c *Canal) FlushBinlog() error
- func (c *Canal) Get() *core.Message
- func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error)
- func (c *Canal) GetMasterPos() (mysql.Position, error)
- func (c *Canal) GetTable(db string, table string) (*schema.Table, error)
- func (c *Canal) Rollback(message *core.Message)
- func (c *Canal) RunFrom(pos mysql.Position)
- func (c *Canal) SetEventHandler(h EventHandler)
- func (c *Canal) SyncedPosition() mysql.Position
- func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error
- type Config
- type DummyEventHandler
- func (h *DummyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error
- func (h *DummyEventHandler) OnGTID(mysql.GTIDSet) error
- func (h *DummyEventHandler) OnPosSynced(mysql.Position, bool) error
- func (h *DummyEventHandler) OnRotate(*replication.RotateEvent) error
- func (h *DummyEventHandler) OnRow(*core.RowsEvent) error
- func (h *DummyEventHandler) OnTableChanged(schema string, table string) error
- func (h *DummyEventHandler) OnXID(mysql.Position) error
- func (h *DummyEventHandler) String() string
- type DumpConfig
- type EventHandler
- type EventStore
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrExcludedTable = errors.New("excluded table meta")
View Source
var UnknownTableRetryPeriod = time.Second * time.Duration(10)
Canal retries fetching an unknown table's meta after UnknownTableRetryPeriod.
Functions ¶
This section is empty.
Types ¶
type Canal ¶
type Canal struct {
// contains filtered or unexported fields
}
Canal can sync your MySQL data to any destination, such as Elasticsearch or Redis. MySQL must have the row binlog format enabled.
func (*Canal) CheckBinlogRowImage ¶
Checks the MySQL binlog row image, which must be one of FULL, MINIMAL, or NOBLOB.
func (*Canal) ClearTableCache ¶
ClearTableCache clears the table cache.
func (*Canal) FlushBinlog ¶
func (*Canal) RunFrom ¶
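RunFrom uses the information in master.info if it exists; otherwise it loads the position from the configuration file. If no position is configured, it uses MySQL's latest position. The position is cached in memory and saved periodically to reduce the overhead of saving it.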
func (*Canal) SetEventHandler ¶
func (c *Canal) SetEventHandler(h EventHandler)
`SetEventHandler` registers the sync handler. You must register your own handler before starting Canal.
func (*Canal) SyncedPosition ¶
type Config ¶
type Config struct { Addr string `toml:"addr"` // mysql User string `toml:"user"` // mysql Password string `toml:"password"` // mysql Charset string `toml:"charset"` // mysql ServerID uint32 `toml:"server_id"` // canal Flavor string `toml:"flavor"` // canal HeartbeatPeriod time.Duration `toml:"heartbeat_period"` //mysql ReadTimeout time.Duration `toml:"read_timeout"` // mysql // IncludeTableRegex or ExcludeTableRegex should contain database name // Only a table which matches IncludeTableRegex and does not match ExcludeTableRegex will be processed // eg, IncludeTableRegex : [".*\\.canal"], ExcludeTableRegex : ["mysql\\..*"] // this will include all database's 'canal' table, except database 'mysql' // Default IncludeTableRegex and ExcludeTableRegex are empty, this will include all tables IncludeTableRegex []string `toml:"include_table_regex"` // mysql ExcludeTableRegex []string `toml:"exclude_table_regex"` // mysql // discard row event without table meta DiscardNoMetaRowEvent bool `toml:"discard_no_meta_row_event"` // mysql Dump DumpConfig `toml:"dump"` UseDecimal bool `toml:"use_decimal"` // mysql // SemiSyncEnabled enables semi-sync or not. SemiSyncEnabled bool `toml:"semi_sync_enabled"` // newly added BatchSize int64 `toml:"batch_size"` // event cache size EventStoreSize int64 `toml:"event_store_size"` }
func NewConfigWithFile ¶
func NewDefaultConfig ¶
func NewDefaultConfig() *Config
type DummyEventHandler ¶
type DummyEventHandler struct { }
func (*DummyEventHandler) OnDDL ¶
func (h *DummyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error
func (*DummyEventHandler) OnPosSynced ¶
func (h *DummyEventHandler) OnPosSynced(mysql.Position, bool) error
func (*DummyEventHandler) OnRotate ¶
func (h *DummyEventHandler) OnRotate(*replication.RotateEvent) error
func (*DummyEventHandler) OnTableChanged ¶
func (h *DummyEventHandler) OnTableChanged(schema string, table string) error
func (*DummyEventHandler) String ¶
func (h *DummyEventHandler) String() string
type DumpConfig ¶
type DumpConfig struct { // mysqldump execution path, like mysqldump or /usr/bin/mysqldump, etc... // If not set, ignore using mysqldump. ExecutionPath string `toml:"mysqldump"` // Will override Databases, tables is in database table_db Tables []string `toml:"tables"` TableDB string `toml:"table_db"` Databases []string `toml:"dbs"` // Ignore table format is db.table IgnoreTables []string `toml:"ignore_tables"` // Dump only selected records. Quotes are mandatory Where string `toml:"where"` // If true, discard error msg, else, output to stderr DiscardErr bool `toml:"discard_err"` // Set true to skip --master-data if we have no privilege to do // 'FLUSH TABLES WITH READ LOCK' SkipMasterData bool `toml:"skip_master_data"` // Set to change the default max_allowed_packet size MaxAllowedPacketMB int `toml:"max_allowed_packet_mb"` }
type EventHandler ¶
type EventHandler interface { OnRotate(roateEvent *replication.RotateEvent) error // OnTableChanged is called when the table is created, altered, renamed or dropped. // You need to clear the associated data like cache with the table. // It will be called before OnDDL. OnTableChanged(schema string, table string) error OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error OnRow(e *core.RowsEvent) error OnXID(nextPos mysql.Position) error OnGTID(gtid mysql.GTIDSet) error // OnPosSynced Use your own way to sync position. When force is true, sync position immediately. OnPosSynced(pos mysql.Position, force bool) error String() string }
type EventStore ¶
type EventStore struct {
// contains filtered or unexported fields
}
EventStore is the event store.
func NewEventStore ¶
func NewEventStore(size, batchSize int64, storer master.Store, flush time.Duration, ctx context.Context) *EventStore
NewEventStore creates a new event store.
Click to show internal directories.
Click to hide internal directories.