canal

package
v0.0.0-...-677aa34 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 25, 2020 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrExcludedTable = errors.New("excluded table meta")
View Source
var UnknownTableRetryPeriod = time.Second * time.Duration(10)

canal will retry fetching unknown table's meta after UnknownTableRetryPeriod

Functions

This section is empty.

Types

type Canal

type Canal struct {
	// contains filtered or unexported fields
}

Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc... MySQL must open row format for binlog

func NewCanal

func NewCanal(cfg *config.Config, api *clientv3.Client, ctx context.Context) (*Canal, error)

func (*Canal) Ack

func (c *Canal) Ack(message *core.Message) error

Ack ack

func (*Canal) CatchMasterPos

func (c *Canal) CatchMasterPos(timeout time.Duration) error

func (*Canal) CheckBinlogRowImage

func (c *Canal) CheckBinlogRowImage(image string) error

Check MySQL binlog row image, must be in FULL, MINIMAL, NOBLOB

func (*Canal) ClearTableCache

func (c *Canal) ClearTableCache(db []byte, table []byte)

ClearTableCache clear table cache

func (*Canal) Close

func (c *Canal) Close()

func (*Canal) Ctx

func (c *Canal) Ctx() context.Context

func (*Canal) Execute

func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error)

Execute a SQL

func (*Canal) FlushBinlog

func (c *Canal) FlushBinlog() error

func (*Canal) Get

func (c *Canal) Get() *core.Message

Get 从EventStore中获取消息

func (*Canal) GetMasterGTIDSet

func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error)

func (*Canal) GetMasterPos

func (c *Canal) GetMasterPos() (mysql.Position, error)

func (*Canal) GetTable

func (c *Canal) GetTable(db string, table string) (*schema.Table, error)

func (*Canal) Rollback

func (c *Canal) Rollback(message *core.Message)

Rollback rollback

func (*Canal) RunFrom

func (c *Canal) RunFrom(pos mysql.Position)

RunFrom 存在master.info则使用其信息,否则从配置文件加载 未配置,使用mysql最后的位置 把位置信息缓存到内存,定时保存位置信息,减小因为保存位置带来的开销

func (*Canal) SetEventHandler

func (c *Canal) SetEventHandler(h EventHandler)

`SetEventHandler` registers the sync handler, you must register your own handler before starting Canal.

func (*Canal) SyncedPosition

func (c *Canal) SyncedPosition() mysql.Position

func (*Canal) WaitUntilPos

func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error

type Config

type Config struct {
	Addr     string `toml:"addr"`     // mysql
	User     string `toml:"user"`     // mysql
	Password string `toml:"password"` // mysql

	Charset         string        `toml:"charset"`          // mysql
	ServerID        uint32        `toml:"server_id"`        // canal
	Flavor          string        `toml:"flavor"`           // canal
	HeartbeatPeriod time.Duration `toml:"heartbeat_period"` //mysql
	ReadTimeout     time.Duration `toml:"read_timeout"`     // mysql

	// IncludeTableRegex or ExcludeTableRegex should contain database name
	// Only a table which matches IncludeTableRegex and dismatches ExcludeTableRegex will be processed
	// eg, IncludeTableRegex : [".*\\.canal"], ExcludeTableRegex : ["mysql\\..*"]
	//     this will include all database's 'canal' table, except database 'mysql'
	// Default IncludeTableRegex and ExcludeTableRegex are empty, this will include all tables
	IncludeTableRegex []string `toml:"include_table_regex"` // mysql
	ExcludeTableRegex []string `toml:"exclude_table_regex"` // mysql

	// discard row event without table meta
	DiscardNoMetaRowEvent bool `toml:"discard_no_meta_row_event"` // mysql

	Dump DumpConfig `toml:"dump"`

	UseDecimal bool `toml:"use_decimal"` // msyql

	// SemiSyncEnabled enables semi-sync or not.
	SemiSyncEnabled bool `toml:"semi_sync_enabled"`

	// 新增
	BatchSize int64 `toml:"batch_size"`

	// 事件缓存大小
	EventStoreSize int64 `toml:"event_store_size"`
}

func NewConfig

func NewConfig(data string) (*Config, error)

func NewConfigWithFile

func NewConfigWithFile(name string) (*Config, error)

func NewDefaultConfig

func NewDefaultConfig() *Config

type DummyEventHandler

type DummyEventHandler struct {
}

func (*DummyEventHandler) OnDDL

func (h *DummyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error

func (*DummyEventHandler) OnGTID

func (h *DummyEventHandler) OnGTID(mysql.GTIDSet) error

func (*DummyEventHandler) OnPosSynced

func (h *DummyEventHandler) OnPosSynced(mysql.Position, bool) error

func (*DummyEventHandler) OnRotate

func (*DummyEventHandler) OnRow

func (*DummyEventHandler) OnTableChanged

func (h *DummyEventHandler) OnTableChanged(schema string, table string) error

func (*DummyEventHandler) OnXID

func (*DummyEventHandler) String

func (h *DummyEventHandler) String() string

type DumpConfig

type DumpConfig struct {
	// mysqldump execution path, like mysqldump or /usr/bin/mysqldump, etc...
	// If not set, ignore using mysqldump.
	ExecutionPath string `toml:"mysqldump"`

	// Will override Databases, tables is in database table_db
	Tables  []string `toml:"tables"`
	TableDB string   `toml:"table_db"`

	Databases []string `toml:"dbs"`

	// Ignore table format is db.table
	IgnoreTables []string `toml:"ignore_tables"`

	// Dump only selected records. Quotes are mandatory
	Where string `toml:"where"`

	// If true, discard error msg, else, output to stderr
	DiscardErr bool `toml:"discard_err"`

	// Set true to skip --master-data if we have no privilege to do
	// 'FLUSH TABLES WITH READ LOCK'
	SkipMasterData bool `toml:"skip_master_data"`

	// Set to change the default max_allowed_packet size
	MaxAllowedPacketMB int `toml:"max_allowed_packet_mb"`
}

type EventHandler

type EventHandler interface {
	OnRotate(roateEvent *replication.RotateEvent) error
	// OnTableChanged is called when the table is created, altered, renamed or dropped.
	// You need to clear the associated data like cache with the table.
	// It will be called before OnDDL.
	OnTableChanged(schema string, table string) error
	OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error
	OnRow(e *core.RowsEvent) error
	OnXID(nextPos mysql.Position) error
	OnGTID(gtid mysql.GTIDSet) error
	// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
	OnPosSynced(pos mysql.Position, force bool) error
	String() string
}

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

EventStore 事件存储

func NewEventStore

func NewEventStore(size, batchSize int64,
	storer master.Store, flush time.Duration,
	ctx context.Context) *EventStore

NewEventStore new event store

func (*EventStore) Ack

func (store *EventStore) Ack(batchId int64) error

Ack 确认完成

func (*EventStore) Get

func (store *EventStore) Get() (int64, []core.Entity)

Get 获取Rows Event,返回 size <= batchSize,没数据则返回 0 个

func (*EventStore) Put

func (store *EventStore) Put(rowEvent core.Entity)

Put put event row

func (*EventStore) Rollback

func (store *EventStore) Rollback()

Rollback Get游标会退到ack游标

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL