canal

package
v1.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 23, 2022 License: MIT Imports: 23 Imported by: 3

Documentation

Index

Constants

View Source
// Action names used for sync.
const (
	// InsertAction is the action name "insert".
	InsertAction = "insert"
	// UpdateAction is the action name "update".
	UpdateAction = "update"
	// DeleteAction is the action name "delete".
	DeleteAction = "delete"
)

The action name for sync.

View Source
// Regular-expression sources for DROP ... TEMPORARY TABLE statements.
// Both patterns are anchored at the start of the text and allow the
// TEMPORARY keyword to be wrapped in a /*!40005 ... */ versioned comment.
// They are written as raw string literals so the regex escapes appear
// exactly as the regex engine sees them.
const (
	// DropTempTab matches text beginning with DROP [/*!40005] TEMPORARY [*/] TABLE.
	DropTempTab = `^DROP\s+(\/\*\!40005\s+)?TEMPORARY\s+(\*\/\s+)?TABLE`
	// DropGlobalTempTab matches text beginning with
	// DROP [/*!40005] GLOBAL TEMPORARY [*/] TABLE.
	DropGlobalTempTab = `^DROP\s+(\/\*\!40005\s+)?GLOBAL\s+TEMPORARY\s+(\*\/\s+)?TABLE`
)

Variables

View Source
// Upper-case SQL statement keywords, held as strings.
// NOTE(review): these are package-level variables, not constants, so
// importers can reassign them; where they are consumed is not visible here.
var (
	// CREATE holds the string "CREATE".
	CREATE = "CREATE"
	// ALTER holds the string "ALTER".
	ALTER = "ALTER"
	// DROP holds the string "DROP".
	DROP = "DROP"
)
View Source
// ErrExcludedTable is a sentinel error carrying the message "excluded table meta".
// NOTE(review): presumably returned when metadata is requested for a table
// filtered out by the include/exclude table regexes — confirm against the caller.
var ErrExcludedTable = errors.New("excluded table meta")
View Source
// UnknownTableRetryPeriod is the ten-second period after which canal retries
// fetching the meta of an unknown table.
var UnknownTableRetryPeriod = 10 * time.Second

Canal will retry fetching an unknown table's meta after UnknownTableRetryPeriod.

Functions

func FilterOther

func FilterOther(e []byte) bool

Types

type Canal

type Canal struct {
	// contains filtered or unexported fields
}

Canal can sync your MySQL data to other systems, such as Elasticsearch, Redis, etc. MySQL must use the row format for its binlog.

func NewCanal

func NewCanal(cfg *Config) (*Canal, error)

func (*Canal) CatchMasterPos

func (c *Canal) CatchMasterPos(timeout time.Duration) error

func (*Canal) CheckBinlogRowImage

func (c *Canal) CheckBinlogRowImage(image string) error

CheckBinlogRowImage checks the MySQL binlog row image; the row image must be one of FULL, MINIMAL, or NOBLOB.

func (*Canal) Close

func (c *Canal) Close()

func (*Canal) Ctx

func (c *Canal) Ctx() context.Context

func (*Canal) Execute

func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error)

Execute executes a SQL statement.

func (*Canal) FlushBinlog

func (c *Canal) FlushBinlog() error

func (*Canal) GetDelay

func (c *Canal) GetDelay() uint32

func (*Canal) GetMasterGTIDSet

func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error)

func (*Canal) GetMasterPos

func (c *Canal) GetMasterPos() (mysql.Position, error)

func (*Canal) Run

func (c *Canal) Run() error

Run will first try to dump all data from the MySQL master with `mysqldump`, then sync from the binlog position in the dump data. It will run forever until it meets an error or the Canal is closed.

func (*Canal) RunFrom

func (c *Canal) RunFrom(pos mysql.Position) error

RunFrom will sync from the given binlog position directly, ignoring mysqldump.

func (*Canal) SetEventHandler

func (c *Canal) SetEventHandler(h EventHandler)

`SetEventHandler` registers the sync handler. You must register your own handler before starting Canal.

func (*Canal) StartFromGTID

func (c *Canal) StartFromGTID(set mysql.GTIDSet) error

func (*Canal) SyncedGTIDSet

func (c *Canal) SyncedGTIDSet() mysql.GTIDSet

func (*Canal) SyncedPosition

func (c *Canal) SyncedPosition() mysql.Position

func (*Canal) SyncedTimestamp

func (c *Canal) SyncedTimestamp() uint32

func (*Canal) WaitDumpDone

func (c *Canal) WaitDumpDone() <-chan struct{}

func (*Canal) WaitUntilPos

func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error

type Config

// Config holds the settings passed to NewCanal.
// Fields carrying a `toml` tag are keyed by that name in TOML input
// (presumably parsed by NewConfig / NewConfigWithFile — confirm).
// TimestampStringLocation, TLSConfig and Logger carry no tag.
type Config struct {
	// Connection settings: server address and credentials.
	// NOTE(review): the expected Addr format is not shown here — presumably host:port; confirm.
	Addr     string `toml:"addr"`
	User     string `toml:"user"`
	Password string `toml:"password"`

	// Connection and replication-client settings.
	// NOTE(review): ServerID is presumably the replication server id canal
	// presents to MySQL, and Flavor the server variant — confirm where consumed.
	Charset         string        `toml:"charset"`
	ServerID        uint32        `toml:"server_id"`
	Flavor          string        `toml:"flavor"`
	HeartbeatPeriod time.Duration `toml:"heartbeat_period"`
	ReadTimeout     time.Duration `toml:"read_timeout"`

	// IncludeTableRegex and ExcludeTableRegex patterns should contain the database name.
	// Only a table which matches IncludeTableRegex and does not match ExcludeTableRegex will be processed.
	// e.g. IncludeTableRegex : [".*\\.canal"], ExcludeTableRegex : ["mysql\\..*"]
	//     this will include every database's 'canal' table, except in database 'mysql'.
	// By default IncludeTableRegex and ExcludeTableRegex are empty, which includes all tables.
	IncludeTableRegex []string `toml:"include_table_regex"`
	ExcludeTableRegex []string `toml:"exclude_table_regex"`

	// DiscardNoMetaRowEvent discards row events that have no table meta.
	DiscardNoMetaRowEvent bool `toml:"discard_no_meta_row_event"`

	// NOTE(review): UseDecimal and ParseTime presumably control how DECIMAL and
	// time column values are decoded — confirm where they are consumed.
	UseDecimal bool `toml:"use_decimal"`
	ParseTime  bool `toml:"parse_time"`

	// NOTE(review): presumably the location used when rendering timestamp
	// values as strings — confirm. Has no toml tag.
	TimestampStringLocation *time.Location

	// SemiSyncEnabled enables semi-sync or not.
	SemiSyncEnabled bool `toml:"semi_sync_enabled"`

	// MaxReconnectAttempts is the maximum number of attempts to re-establish a
	// broken connection; zero or a negative number means infinite retry.
	// This setting has no effect if DisableRetrySync is true.
	MaxReconnectAttempts int `toml:"max_reconnect_attempts"`

	// DisableRetrySync disables re-sync after a broken connection.
	DisableRetrySync bool `toml:"disable_retry_sync"`

	// TLSConfig sets the TLS configuration. Has no toml tag.
	TLSConfig *tls.Config

	// Logger sets the logrus logger. Has no toml tag.
	Logger *logrus.Logger
}

func NewConfig

func NewConfig(data string) (*Config, error)

func NewConfigWithFile

func NewConfigWithFile(name string) (*Config, error)

func NewDefaultConfig

func NewDefaultConfig() *Config

type DummyEventHandler

// DummyEventHandler is an empty struct type with a method for each callback
// named in EventHandler.
// NOTE(review): the method bodies are not visible here — presumably they are
// no-ops so that custom handlers can embed this type; confirm.
type DummyEventHandler struct{}

func (*DummyEventHandler) OnDataBaseDDL

func (h *DummyEventHandler) OnDataBaseDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, dbDDL interface{}, rawData []byte) error

func (*DummyEventHandler) OnGTID

func (h *DummyEventHandler) OnGTID(mysql.GTIDSet, []byte) error

func (*DummyEventHandler) OnGrantDDL

func (h *DummyEventHandler) OnGrantDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error

func (*DummyEventHandler) OnIndexDDL

func (h *DummyEventHandler) OnIndexDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error

func (*DummyEventHandler) OnPosSynced

func (*DummyEventHandler) OnRotate

func (*DummyEventHandler) OnRow

func (h *DummyEventHandler) OnRow(*RowsEvent, []byte) error

func (*DummyEventHandler) OnSequenceDDL

func (h *DummyEventHandler) OnSequenceDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error

func (*DummyEventHandler) OnTableDDL

func (h *DummyEventHandler) OnTableDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, tabDDL interface{}, rawData []byte) error

func (*DummyEventHandler) OnTransaction added in v1.1.8

func (h *DummyEventHandler) OnTransaction(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error

func (*DummyEventHandler) OnUserDDL

func (h *DummyEventHandler) OnUserDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error

func (*DummyEventHandler) OnViewDDL

func (h *DummyEventHandler) OnViewDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error

func (*DummyEventHandler) OnXID

func (h *DummyEventHandler) OnXID(pos mysql.Position, xid *replication.XIDEvent, rawData []byte) error

func (*DummyEventHandler) String

func (h *DummyEventHandler) String() string

type EventHandler

// EventHandler is the set of callbacks registered with Canal.SetEventHandler.
// Every callback returns an error; how Canal reacts to a non-nil error is not
// visible here. Most callbacks also receive rawData, a byte slice
// (NOTE(review): presumably the raw binlog event bytes — confirm).
type EventHandler interface {
	// OnRotate receives a binlog rotate event.
	OnRotate(rotateEvent *replication.RotateEvent, rawData []byte) error
	// OnRow receives a row-replication event.
	OnRow(e *RowsEvent, rawData []byte) error
	// OnXID receives an XID event together with the next binlog position.
	OnXID(nextPos mysql.Position, xid *replication.XIDEvent, rawData []byte) error
	// OnGTID receives a GTID set.
	OnGTID(gtid mysql.GTIDSet, rawData []byte) error
	// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
	OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error
	// String returns a string describing the handler.
	String() string

	// Query-event callbacks, one per statement category. Each receives the next
	// binlog position, the query event, a payload d whose concrete type is not
	// visible here, and rawData.
	OnTableDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error
	OnDataBaseDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error
	OnIndexDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error
	OnViewDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error
	OnSequenceDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error
	OnUserDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error
	OnGrantDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error
	OnTransaction(nextPos mysql.Position, queryEvent *replication.QueryEvent, d interface{}, rawData []byte) error
}

type RowsEvent

// RowsEvent is the event for row replication.
type RowsEvent struct {
	// Action is the action name for the event.
	// NOTE(review): presumably one of UpdateAction, InsertAction or DeleteAction — confirm.
	Action string
	// RowsEvent carries the changed row list.
	// The binlog has three update event versions: v0, v1 and v2.
	// For v1 and v2, the number of rows must be even:
	// two rows per update, in the format [before update row, after update row].
	// For update v0 there is only one row per event, and this version is not supported.
	RowsEvent *replication.RowsEvent
	// Header can be used to inspect the event
	Header *replication.EventHeader
}

RowsEvent is the event for row replication.

func (*RowsEvent) String

func (r *RowsEvent) String() string

String implements fmt.Stringer interface.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL