canal

package
v0.0.0-...-e68e88a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 7, 2016 License: MIT Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
// Names of the row actions. NOTE(review): presumably these are the values
// stored in RowsEvent.Action — confirm against the code that builds events.
const (
	// InsertAction is the action name "insert".
	InsertAction = "insert"
	// UpdateAction is the action name "update".
	UpdateAction = "update"
	// DeleteAction is the action name "delete".
	DeleteAction = "delete"
)

Variables

View Source
// ErrHandleInterrupted is the sentinel error a RowsEventHandler returns from
// Do to make canal stop the sync.
var ErrHandleInterrupted = errors.New("do handler error, interrupted")

Functions

func GetPKValues

func GetPKValues(table *schema.Table, row []interface{}) ([]interface{}, error)

GetPKValues returns the primary-key values of one row of a table; a table may use multiple fields as its primary key.

Types

type Canal

type Canal struct {
	// contains filtered or unexported fields
}

Canal can sync your MySQL data to other systems, such as Elasticsearch, Redis, etc. MySQL must use the row format for its binlog.

func NewCanal

func NewCanal(cfg *Config) (*Canal, error)

func (*Canal) AddDumpDatabases

func (c *Canal) AddDumpDatabases(dbs ...string)

func (*Canal) AddDumpIgnoreTables

func (c *Canal) AddDumpIgnoreTables(db string, tables ...string)

func (*Canal) AddDumpTables

func (c *Canal) AddDumpTables(db string, tables ...string)

func (*Canal) CatchMasterPos

func (c *Canal) CatchMasterPos(timeout int) error

func (*Canal) CheckBinlogRowImage

func (c *Canal) CheckBinlogRowImage(image string) error

CheckBinlogRowImage checks the MySQL binlog row image; the image must be one of FULL, MINIMAL, or NOBLOB.

func (*Canal) Close

func (c *Canal) Close()

func (*Canal) Execute

func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error)

Execute runs a SQL statement.

func (*Canal) GetTable

func (c *Canal) GetTable(db string, table string) (*schema.Table, error)

func (*Canal) RegRowsEventHandler

func (c *Canal) RegRowsEventHandler(h RowsEventHandler)

func (*Canal) Start

func (c *Canal) Start() error

func (*Canal) SyncedPosition

func (c *Canal) SyncedPosition() mysql.Position

func (*Canal) WaitDumpDone

func (c *Canal) WaitDumpDone() <-chan struct{}

func (*Canal) WaitUntilPos

func (c *Canal) WaitUntilPos(pos mysql.Position, timeout int) error

type Config

// Config is the configuration passed to NewCanal. Each field is mapped to
// the TOML key named in its struct tag.
type Config struct {
	// Addr, User and Password map to the "addr", "user" and "password" keys.
	// NOTE(review): presumably the MySQL server address and credentials —
	// confirm against NewCanal.
	Addr     string `toml:"addr"`
	User     string `toml:"user"`
	Password string `toml:"password"`

	// ServerID, Flavor and DataDir map to the "server_id", "flavor" and
	// "data_dir" keys.
	// NOTE(review): presumably the replication server ID, the MySQL flavor
	// and a local working directory — confirm before relying on this.
	ServerID uint32 `toml:"server_id"`
	Flavor   string `toml:"flavor"`
	DataDir  string `toml:"data_dir"`

	// Dump holds the mysqldump settings, read from the "dump" key.
	Dump DumpConfig `toml:"dump"`
}

func NewConfig

func NewConfig(data string) (*Config, error)

func NewConfigWithFile

func NewConfigWithFile(name string) (*Config, error)

func NewDefaultConfig

func NewDefaultConfig() *Config

type DumpConfig

// DumpConfig holds the mysqldump settings; it is the type of Config.Dump.
type DumpConfig struct {
	// ExecutionPath is the mysqldump executable path, e.g. "mysqldump" or
	// "/usr/bin/mysqldump".
	ExecutionPath string `toml:"mysqldump"`

	// Tables lists tables that live in the database named by TableDB.
	// When set, Tables overrides Databases.
	Tables  []string `toml:"tables"`
	TableDB string   `toml:"table_db"`

	// Databases maps to the "dbs" key; presumably the databases to dump —
	// TODO confirm.
	Databases []string `toml:"dbs"`

	// IgnoreTables lists tables to ignore, each in "db.table" format.
	IgnoreTables []string `toml:"ignore_tables"`

	// DiscardErr, if true, discards error messages; otherwise they are
	// written to stderr.
	DiscardErr bool `toml:"discard_err"`
}

type RowsEvent

// RowsEvent describes a set of changed rows; it is the argument passed to
// RowsEventHandler.Do.
type RowsEvent struct {
	// Table is presumably the schema of the table the rows belong to —
	// TODO confirm.
	Table  *schema.Table
	// Action is presumably one of UpdateAction, InsertAction or
	// DeleteAction — TODO confirm.
	Action string
	// Rows is the list of changed rows.
	// The binlog has three update event versions: v0, v1 and v2.
	// For v1 and v2 the number of rows must be even: each update contributes
	// two rows, in the order [row before update, row after update].
	// Update v0 carries only one row per event and is not supported.
	Rows [][]interface{}
}

type RowsEventHandler

// RowsEventHandler is the handler type accepted by Canal.RegRowsEventHandler.
type RowsEventHandler interface {
	// Do handles a RowsEvent. If it returns ErrHandleInterrupted, canal
	// stops the sync.
	Do(e *RowsEvent) error
	// String returns a string for the handler.
	// NOTE(review): presumably a name used for identification or logging —
	// confirm against the callers.
	String() string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL