canal

package
v1.9.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 18, 2024 License: MIT Imports: 28 Imported by: 109

Documentation

Index

Constants

View Source
// Action names for sync: one string constant each for update, insert and delete.
const (
	UpdateAction = "update"
	InsertAction = "insert"
	DeleteAction = "delete"
)

The action name for sync.

Variables

View Source
// ErrExcludedTable is a package-level error value with the message "excluded table meta".
// NOTE(review): presumably reported when meta is requested for a table filtered out
// by the include/exclude table regexes — confirm against the callers.
var ErrExcludedTable = errors.New("excluded table meta")
View Source
// UnknownTableRetryPeriod is the period after which canal retries fetching
// the meta of an unknown table.
var UnknownTableRetryPeriod = 10 * time.Second

Canal retries fetching an unknown table's meta after UnknownTableRetryPeriod.

Functions

This section is empty.

Types

type Canal

// Canal syncs MySQL data to other systems; it is created from a *Config by NewCanal.
// Its fields are not part of the public API.
type Canal struct {
	// contains filtered or unexported fields
}

Canal can sync your MySQL data to anywhere, such as Elasticsearch, Redis, etc. MySQL must use the row format for the binlog.

func NewCanal

func NewCanal(cfg *Config) (*Canal, error)

func (*Canal) AddDumpDatabases

func (c *Canal) AddDumpDatabases(dbs ...string)

func (*Canal) AddDumpIgnoreTables

func (c *Canal) AddDumpIgnoreTables(db string, tables ...string)

func (*Canal) AddDumpTables

func (c *Canal) AddDumpTables(db string, tables ...string)

func (*Canal) CatchMasterPos

func (c *Canal) CatchMasterPos(timeout time.Duration) error

func (*Canal) CheckBinlogRowImage

func (c *Canal) CheckBinlogRowImage(image string) error

CheckBinlogRowImage checks the MySQL binlog row image; the image must be one of FULL, MINIMAL, or NOBLOB.

func (*Canal) ClearTableCache

func (c *Canal) ClearTableCache(db []byte, table []byte)

ClearTableCache clears the table cache.

func (*Canal) Close

func (c *Canal) Close()

func (*Canal) Ctx

func (c *Canal) Ctx() context.Context

func (*Canal) Dump

func (c *Canal) Dump() error

Dump dumps all data from the MySQL master using `mysqldump`, without syncing the binlog.

func (*Canal) Execute

func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error)

Execute executes a SQL statement.

func (*Canal) FlushBinlog

func (c *Canal) FlushBinlog() error

func (*Canal) GetDelay

func (c *Canal) GetDelay() uint32

func (*Canal) GetMasterGTIDSet

func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error)

func (*Canal) GetMasterPos

func (c *Canal) GetMasterPos() (mysql.Position, error)

func (*Canal) GetTable

func (c *Canal) GetTable(db string, table string) (*schema.Table, error)

func (*Canal) Run

func (c *Canal) Run() error

Run will first try to dump all data from the MySQL master using `mysqldump`, then sync from the binlog position in the dump data. It will run forever until it meets an error or the Canal is closed.

func (*Canal) RunFrom

func (c *Canal) RunFrom(pos mysql.Position) error

RunFrom will sync from the binlog position directly, ignoring mysqldump.

func (*Canal) SetEventHandler

func (c *Canal) SetEventHandler(h EventHandler)

`SetEventHandler` registers the sync handler. You must register your own handler before starting Canal.

func (*Canal) SetTableCache added in v1.7.0

func (c *Canal) SetTableCache(db []byte, table []byte, schema *schema.Table)

SetTableCache sets table cache value for the given table

func (*Canal) StartFromGTID

func (c *Canal) StartFromGTID(set mysql.GTIDSet) error

func (*Canal) SyncedGTIDSet

func (c *Canal) SyncedGTIDSet() mysql.GTIDSet

func (*Canal) SyncedPosition

func (c *Canal) SyncedPosition() mysql.Position

func (*Canal) SyncedTimestamp

func (c *Canal) SyncedTimestamp() uint32

func (*Canal) WaitDumpDone

func (c *Canal) WaitDumpDone() <-chan struct{}

func (*Canal) WaitUntilPos

func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error

type Config

// Config holds the settings that NewCanal takes to build a Canal.
// Fields carrying a `toml` struct tag name the TOML key they map to;
// the remaining fields have no tag.
type Config struct {
	Addr     string `toml:"addr"`
	User     string `toml:"user"`
	Password string `toml:"password"`

	Charset         string        `toml:"charset"`
	ServerID        uint32        `toml:"server_id"`
	Flavor          string        `toml:"flavor"`
	HeartbeatPeriod time.Duration `toml:"heartbeat_period"`
	ReadTimeout     time.Duration `toml:"read_timeout"`

	// IncludeTableRegex and ExcludeTableRegex patterns should contain the database name.
	// IncludeTableRegex defines the tables that will be included; if empty, all tables are included.
	// ExcludeTableRegex defines the tables that will be excluded from the ones selected by IncludeTableRegex.
	// Only a table that matches IncludeTableRegex and does not match ExcludeTableRegex is processed.
	// e.g. IncludeTableRegex : [".*\\.canal"], ExcludeTableRegex : ["mysql\\..*"]
	//     includes the 'canal' table of every database except database 'mysql'.
	// By default both IncludeTableRegex and ExcludeTableRegex are empty, which includes all tables.
	IncludeTableRegex []string `toml:"include_table_regex"`
	ExcludeTableRegex []string `toml:"exclude_table_regex"`

	// DiscardNoMetaRowEvent discards row events that have no table meta.
	DiscardNoMetaRowEvent bool `toml:"discard_no_meta_row_event"`

	Dump DumpConfig `toml:"dump"`

	UseDecimal bool `toml:"use_decimal"`
	ParseTime  bool `toml:"parse_time"`

	TimestampStringLocation *time.Location

	// SemiSyncEnabled enables semi-sync or not.
	SemiSyncEnabled bool `toml:"semi_sync_enabled"`

	// MaxReconnectAttempts is the maximum number of attempts to re-establish a broken
	// connection; zero or a negative number means infinite retry.
	// This setting has no effect if DisableRetrySync is true.
	MaxReconnectAttempts int `toml:"max_reconnect_attempts"`

	// DisableRetrySync disables re-sync after a broken connection.
	DisableRetrySync bool `toml:"disable_retry_sync"`

	// DisableFlushBinlogWhileWaiting governs whether WaitUntilPos() can use
	// FLUSH BINARY LOGS to ensure we advance past a position. This should not
	// strictly be required, and requires additional privileges.
	// NOTE(review): going by the field name, true presumably turns the flush off — confirm.
	DisableFlushBinlogWhileWaiting bool `toml:"disable_flush_binlog_while_waiting"`

	// TLSConfig sets the TLS config.
	TLSConfig *tls.Config

	// Logger sets the logger.
	Logger loggers.Advanced

	// Dialer sets the dialer.
	Dialer client.Dialer

	// Localhost sets the localhost value.
	Localhost string

	// EventCacheCount is the capacity of the BinlogStreamer internal event channel.
	// The default value is 10240.
	// If your tables contain large columns, you can decrease this value to avoid OOM.
	EventCacheCount int
}

func NewConfig

func NewConfig(data string) (*Config, error)

func NewConfigWithFile

func NewConfigWithFile(name string) (*Config, error)

func NewDefaultConfig

func NewDefaultConfig() *Config

NewDefaultConfig initializes a Config with some default values for Canal.

type DummyEventHandler

// DummyEventHandler is a field-less type whose method names mirror those of EventHandler.
// NOTE(review): presumably a do-nothing EventHandler meant to be embedded so that
// only the callbacks of interest need overriding — confirm against the method bodies.
type DummyEventHandler struct {
}

func (*DummyEventHandler) OnDDL

func (*DummyEventHandler) OnGTID

func (*DummyEventHandler) OnPosSynced

func (*DummyEventHandler) OnRotate

func (*DummyEventHandler) OnRow

func (h *DummyEventHandler) OnRow(*RowsEvent) error

func (*DummyEventHandler) OnRowsQueryEvent added in v1.8.0

func (h *DummyEventHandler) OnRowsQueryEvent(*replication.RowsQueryEvent) error

func (*DummyEventHandler) OnTableChanged

func (*DummyEventHandler) OnXID

func (*DummyEventHandler) String

func (h *DummyEventHandler) String() string

type DumpConfig

// DumpConfig holds the mysqldump-related settings; it is the type of Config.Dump.
type DumpConfig struct {
	// ExecutionPath is the mysqldump execution path, like mysqldump or /usr/bin/mysqldump, etc.
	// If not set, mysqldump is not used.
	ExecutionPath string `toml:"mysqldump"`

	// Tables overrides Databases; the listed tables are in the database named by TableDB.
	Tables  []string `toml:"tables"`
	TableDB string   `toml:"table_db"`

	Databases []string `toml:"dbs"`

	// IgnoreTables lists tables to ignore; the format of each entry is db.table.
	IgnoreTables []string `toml:"ignore_tables"`

	// Where dumps only the selected records. Quotes are mandatory.
	Where string `toml:"where"`

	// DiscardErr, if true, discards error messages; otherwise they are output to stderr.
	DiscardErr bool `toml:"discard_err"`

	// SkipMasterData, when set to true, skips --master-data if we have no privilege to do
	// 'FLUSH TABLES WITH READ LOCK'.
	SkipMasterData bool `toml:"skip_master_data"`

	// MaxAllowedPacketMB changes the default max_allowed_packet size.
	MaxAllowedPacketMB int `toml:"max_allowed_packet_mb"`

	// Protocol changes the default protocol to connect with.
	Protocol string `toml:"protocol"`

	// ExtraOptions sets extra options.
	ExtraOptions []string `toml:"extra_options"`
}

type EventHandler

// EventHandler is the set of callbacks registered through Canal.SetEventHandler.
// Every callback returns an error.
type EventHandler interface {
	OnRotate(header *replication.EventHeader, rotateEvent *replication.RotateEvent) error
	// OnTableChanged is called when the table is created, altered, renamed or dropped.
	// You need to clear any data associated with the table, such as a cache.
	// It will be called before OnDDL.
	OnTableChanged(header *replication.EventHeader, schema string, table string) error
	OnDDL(header *replication.EventHeader, nextPos mysql.Position, queryEvent *replication.QueryEvent) error
	OnRow(e *RowsEvent) error
	OnXID(header *replication.EventHeader, nextPos mysql.Position) error
	OnGTID(header *replication.EventHeader, gtidEvent mysql.BinlogGTIDEvent) error
	// OnPosSynced lets you sync the position in your own way.
	// When force is true, sync the position immediately.
	OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error
	// OnRowsQueryEvent is called when binlog_rows_query_log_events=ON for each DML query.
	// You'll get the original executed query, with comments if present.
	// It will be called before OnRow.
	OnRowsQueryEvent(e *replication.RowsQueryEvent) error
	String() string
}

type RowsEvent

// RowsEvent is the event for row replication; it is the argument of EventHandler.OnRow.
type RowsEvent struct {
	Table *schema.Table
	// NOTE(review): presumably one of UpdateAction, InsertAction or DeleteAction — confirm.
	Action string
	// Rows is the list of changed rows.
	// The binlog has three update event versions: v0, v1 and v2.
	// For v1 and v2, the number of rows must be even:
	// two rows per event, in the format [before update row, after update row].
	// For update v0 there is only one row per event, and this version is not supported.
	Rows [][]interface{}
	// Header can be used to inspect the event.
	Header *replication.EventHeader
}

RowsEvent is the event for row replication.

func (*RowsEvent) String

func (r *RowsEvent) String() string

String implements fmt.Stringer interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL