loader

package
v0.0.0-...-552cffb
Published: Sep 12, 2024 License: Apache-2.0 Imports: 34 Imported by: 0

README

loader

A package to load data into MySQL in real time, designed to be used uniformly by reparo, drainer, etc.

Getting started
  • An example is available in example_loader_test.go

    To use Loader, you need to write a translator, like SecondaryBinlogToTxn in translate.go, that translates the upstream data format (e.g. binlog) into Txn objects.

Overview

Loader splits the DML events of upstream transactions and loads the data into MySQL concurrently (sharded by primary key or unique key). It preserves causality with causality.go.
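The sharding step can be sketched as routing each DML to a fixed worker chosen by hashing its key, so events touching the same row always go to the same worker and keep their relative order. This is an illustrative sketch, not the package's internal dispatch code:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// workerIndex routes a key to one of workerCount workers. Because the mapping
// is deterministic, all events for the same row are serialized on one worker
// while different rows proceed in parallel.
func workerIndex(key string, workerCount int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32()) % workerCount
}

func main() {
	// the same key always maps to the same worker
	fmt.Println(workerIndex("test.test.id=1", 16) == workerIndex("test.test.id=1", 16))
}
```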

Optimization

Large Operation

Instead of executing DML statements one by one, we can combine many small operations into a single large one, such as an INSERT statement with multiple VALUES lists that inserts several rows at a time. This is much faster than inserting row by row.
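Building such a multi-row INSERT can be sketched as follows (the helper name and shape here are illustrative, not this package's internal API):

```go
package main

import (
	"fmt"
	"strings"
)

// buildMultiValueInsert builds one INSERT with a VALUES group per row, plus
// the flattened argument list, instead of issuing one INSERT per row.
func buildMultiValueInsert(table string, cols []string, rows [][]interface{}) (string, []interface{}) {
	placeholder := "(" + strings.TrimSuffix(strings.Repeat("?,", len(cols)), ",") + ")"
	groups := make([]string, 0, len(rows))
	args := make([]interface{}, 0, len(rows)*len(cols))
	for _, row := range rows {
		groups = append(groups, placeholder)
		args = append(args, row...)
	}
	query := fmt.Sprintf("INSERT INTO %s(%s) VALUES %s",
		table, strings.Join(cols, ","), strings.Join(groups, ","))
	return query, args
}

func main() {
	q, args := buildMultiValueInsert("test.test", []string{"id", "name"},
		[][]interface{}{{1, "a"}, {2, "b"}})
	fmt.Println(q)    // INSERT INTO test.test(id,name) VALUES (?,?),(?,?)
	fmt.Println(args) // [1 a 2 b]
}
```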

Merge by Primary Key

You may want to read about Kafka's log compaction.

We can treat a table with a primary key like a KV store. To rebuild the table from its change history, we only need the last value of every key.

While synchronizing data into the downstream in real time, we can fetch DML events from the upstream in batches and merge them by key. After merging, only one event remains per key, so the downstream needs to execute far fewer events than the upstream produced. This also lets us use batch insert operations.

We should also consider secondary unique keys here; see execTableBatch in executor.go. Currently, we only merge by primary key and use batch operations if the table has a primary key and no unique key.
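The merge step above can be sketched with last-write-wins semantics, as in Kafka log compaction. The types here are simplified stand-ins, not this package's DML; the real logic in executor.go also has to consider secondary unique keys:

```go
package main

import "fmt"

type dmlType int

const (
	insertT dmlType = iota
	updateT
	deleteT
)

// event is a simplified DML event keyed by primary key.
type event struct {
	Tp  dmlType
	Key int
	Val string
}

// mergeByKey keeps only the last event for each key, so the downstream
// executes at most one statement per key and can batch the survivors.
func mergeByKey(batch []event) map[int]event {
	latest := make(map[int]event, len(batch))
	for _, e := range batch {
		latest[e.Key] = e
	}
	return latest
}

func main() {
	batch := []event{
		{insertT, 1, "a"},
		{updateT, 1, "b"}, // supersedes the insert of key 1
		{insertT, 2, "c"},
		{deleteT, 2, ""}, // supersedes the insert of key 2
	}
	fmt.Println(len(mergeByKey(batch))) // 2 events left instead of 4
}
```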

Documentation

Overview

Example
// create sql.DB
db, err := CreateDB("root", "", "localhost", 4000, nil /* *tls.Config */)
if err != nil {
	log.Fatal(err)
}

// init loader
loader, err := NewLoader(db, WorkerCount(16), BatchSize(128))
if err != nil {
	log.Fatal(err)
}

// receive successfully applied txns from loader
go func() {
	// the receive order is the order you pushed into loader.Input()
	for txn := range loader.Successes() {
		log.Print("succ: ", txn)
	}
}()

// run loader
go func() {
	// Run returns a non-nil error if it fails to load some data to the downstream,
	// or nil when the loader is closed after all data has been loaded
	err := loader.Run()
	if err != nil {
		log.Fatal(err)
	}
}()

// push ddl txn
loader.Input() <- NewDDLTxn("test", "test", "create table test(id primary key)")

// push one insert dml txn
values := map[string]interface{}{"id": 1}
loader.Input() <- &Txn{
	DMLs: []*DML{{Database: "test", Table: "test", Tp: InsertDMLType, Values: values}},
}

// push one update dml txn
newValues := map[string]interface{}{"id": 2}
loader.Input() <- &Txn{
	DMLs: []*DML{{Database: "test", Table: "test", Tp: UpdateDMLType, Values: newValues, OldValues: values}},
}

// you can toggle safe mode at run time; safe mode uses REPLACE for insert
// events and DELETE + REPLACE for update events, which makes them idempotent
loader.SetSafeMode(true)

// push one delete dml txn
loader.Input() <- &Txn{
	DMLs: []*DML{{Database: "test", Table: "test", Tp: DeleteDMLType, Values: newValues}},
}
//...

// Close the loader. No more txns can be pushed into Input().
// Run will return when all data is drained.
loader.Close()
Output:
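Based on the safe-mode comment in the example, the rewriting can be sketched for a single-column table. The statements below are illustrative, not the package's actual SQL generation:

```go
package main

import "fmt"

// safeModeSQL sketches what safe mode changes: an insert becomes REPLACE, and
// an update becomes DELETE of the old row followed by REPLACE of the new row,
// so re-applying the same txn is idempotent.
func safeModeSQL(tp string) []string {
	switch tp {
	case "insert":
		return []string{"REPLACE INTO test.test(id) VALUES (?)"}
	case "update":
		return []string{
			"DELETE FROM test.test WHERE id = ?",
			"REPLACE INTO test.test(id) VALUES (?)",
		}
	case "delete":
		return []string{"DELETE FROM test.test WHERE id = ?"}
	}
	return nil
}

func main() {
	fmt.Println(len(safeModeSQL("update"))) // an update becomes 2 statements in safe mode
}
```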

Index

Examples

Constants

This section is empty.

Variables

View Source
var (
	// ErrTableNotExist means the table not exist.
	ErrTableNotExist = errors.New("table not exist")
)

Functions

func CreateDB

func CreateDB(user string, password string, host string, port int, tls *tls.Config) (db *gosql.DB, err error)

CreateDB returns a *sql.DB.

func CreateDBWithSQLMode

func CreateDBWithSQLMode(user string, password string, host string, port int, tlsConfig *tls.Config, sqlMode *string, params map[string]string, readTimeout time.Duration) (db *gosql.DB, err error)

CreateDBWithSQLMode returns a *sql.DB.

func CreateOracleDB

func CreateOracleDB(user string, password string, host string, port int, serviceName, connectString string) (db *gosql.DB, err error)

CreateOracleDB creates an Oracle DB connection and returns it.

Types

type Causality

type Causality struct {
	// contains filtered or unexported fields
}

Causality provides a simple mechanism to improve the concurrency of SQL execution while ensuring correctness. Causality groups SQLs that may have causal relationships, and the syncer executes each group linearly. If a conflict spans more than one group, the syncer waits for all grouped SQLs to finish executing and then resets the causality state. This mechanism provides quiescent consistency to ensure correctness.
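The grouping idea can be sketched as a map from key to group token: keys added together share a token, and a conflict is a set of keys that spans two different tokens. This is a minimal illustrative model, not the package's Causality implementation:

```go
package main

import "fmt"

// causality maps each key to a group token; keys added together share a token.
type causality struct {
	relations map[string]string
}

func newCausality() *causality { return &causality{relations: map[string]string{}} }

// detectConflict reports whether the keys already belong to more than one group.
func (c *causality) detectConflict(keys []string) bool {
	var group string
	for _, k := range keys {
		if g, ok := c.relations[k]; ok {
			if group != "" && group != g {
				return true
			}
			group = g
		}
	}
	return false
}

// add places all keys into one group, reusing an existing token if any key
// already has one.
func (c *causality) add(keys []string) {
	if len(keys) == 0 {
		return
	}
	group := keys[0]
	for _, k := range keys {
		if g, ok := c.relations[k]; ok {
			group = g
			break
		}
	}
	for _, k := range keys {
		c.relations[k] = group
	}
}

func main() {
	c := newCausality()
	c.add([]string{"a", "b"})
	c.add([]string{"c"})
	// b and c live in different groups, so executing them concurrently could
	// violate causality: the syncer must drain and reset first
	fmt.Println(c.detectConflict([]string{"b", "c"})) // true
}
```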

func NewCausality

func NewCausality() *Causality

NewCausality returns an instance of Causality.

func (*Causality) Add

func (c *Causality) Add(keys []string) error

Add adds keys to Causality.

func (*Causality) DetectConflict

func (c *Causality) DetectConflict(keys []string) bool

DetectConflict detects whether there is a conflict

func (*Causality) Get

func (c *Causality) Get(key string) string

Get gets the token of key

func (*Causality) Reset

func (c *Causality) Reset()

Reset resets Causality.

type DBType

type DBType int

DBType can be MySQL/TiDB or Oracle.

const (
	DBTypeUnknown DBType = iota
	MysqlDB
	TiDB
	OracleDB
)

DBType types

type DDL

type DDL struct {
	Database string
	Table    string
	SQL      string
	// ShouldSkip indicates that this DDL should not be executed at the downstream;
	// only the downstream table info is refreshed. One use case is bidirectional
	// replication, where the DDL is executed on only one side.
	ShouldSkip bool
}

DDL holds the ddl info

type DML

type DML struct {
	Database string
	Table    string

	Tp DMLType
	// only set when Tp = UpdateDMLType
	OldValues map[string]interface{}
	Values    map[string]interface{}

	UpColumnsInfoMap map[string]*model.ColumnInfo

	DestDBType DBType
	// contains filtered or unexported fields
}

DML holds the dml info

func (*DML) String

func (dml *DML) String() string

func (*DML) TableName

func (dml *DML) TableName() string

TableName returns the fully qualified name of the DML's table

type DMLType

type DMLType int

DMLType represents the DML type.

const (
	UnknownDMLType DMLType = 0
	InsertDMLType  DMLType = 1
	UpdateDMLType  DMLType = 2
	DeleteDMLType  DMLType = 3
)

DMLType types

type Loader

type Loader interface {
	SetSafeMode(bool)
	GetSafeMode() bool
	Input() chan<- *Txn
	Successes() <-chan *Txn
	Close()
	Run() error
}

Loader is used to load data into MySQL.

func NewLoader

func NewLoader(db *gosql.DB, opt ...Option) (Loader, error)

NewLoader returns a Loader. db must support multi-statement execution and interpolateParams.
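If you open the connection yourself rather than via CreateDB, a DSN for go-sql-driver/mysql would need the multiStatements and interpolateParams parameters enabled. A sketch of such a DSN builder (the charset parameter is an assumption, not stated by this package):

```go
package main

import "fmt"

// buildDSN assembles a go-sql-driver/mysql DSN with the parameters NewLoader
// requires: multiStatements and interpolateParams.
func buildDSN(user, password, host string, port int) string {
	return fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&multiStatements=true&interpolateParams=true",
		user, password, host, port)
}

func main() {
	fmt.Println(buildDSN("root", "", "localhost", 4000))
}
```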

type MetricsGroup

type MetricsGroup struct {
	EventCounterVec   *prometheus.CounterVec
	QueryHistogramVec *prometheus.HistogramVec
	QueueSizeGauge    *prometheus.GaugeVec
}

MetricsGroup contains metrics of Loader

type Option

type Option func(*options)

An Option sets options such as batch size, worker count, etc.

func BatchSize

func BatchSize(n int) Option

BatchSize sets the batch size of the loader.

func DestinationDBType

func DestinationDBType(t string) Option

DestinationDBType sets the destDBType option.

func EnableCausality

func EnableCausality(b bool) Option

EnableCausality sets whether causality is enabled.

func EnableDispatch

func EnableDispatch(b bool) Option

EnableDispatch sets whether dispatch is enabled. The default is true; when disabled, the loader executes txns one by one in input order and does not split them for concurrent writes to the downstream DB.

func Merge

func Merge(v bool) Option

Merge sets the merge option.

func Metrics

func Metrics(m *MetricsGroup) Option

Metrics sets the metrics of the loader.

func SaveAppliedTS

func SaveAppliedTS(save bool) Option

SaveAppliedTS sets whether to save the applied TS.

func SetloopBackSyncInfo

func SetloopBackSyncInfo(loopBackSyncInfo *loopbacksync.LoopBackSync) Option

SetloopBackSyncInfo sets the loopback sync info of the loader.

func SyncModeOption

func SyncModeOption(n SyncMode) Option

SyncModeOption sets the sync mode of the loader.

func WorkerCount

func WorkerCount(n int) Option

WorkerCount sets the worker count of the loader.

type SyncMode

type SyncMode int

SyncMode represents the sync mode of DML.

const (
	SyncFullColumn SyncMode = 1 + iota
	SyncPartialColumn
)

SyncMode values.

type Txn

type Txn struct {
	DMLs []*DML
	DDL  *DDL

	AppliedTS int64

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes channel
	Metadata interface{}
}

Txn holds transaction info: a DDL or a sequence of DMLs.

func NewDDLTxn

func NewDDLTxn(db string, table string, sql string) *Txn

NewDDLTxn returns a Txn.

func SecondaryBinlogToTxn

func SecondaryBinlogToTxn(binlog *pb.Binlog, tableRouter *router.Table, upperColName bool) (*Txn, error)

SecondaryBinlogToTxn translates the Binlog format into a Txn.

func (*Txn) AppendDML

func (t *Txn) AppendDML(dml *DML)

AppendDML appends a DML.

func (*Txn) String

func (t *Txn) String() string
