Documentation ¶
Overview ¶
Example ¶
// create sql.DB db, err := CreateDB("root", "", "localhost", 4000, nil /* *tls.Config */) if err != nil { log.Fatal(err) } // init loader loader, err := NewLoader(db, WorkerCount(16), BatchSize(128)) if err != nil { log.Fatal(err) } // get the success txn from loader go func() { // the return order will be the order you push into loader.Input() for txn := range loader.Successes() { log.Print("succ: ", txn) } }() // run loader go func() { // return non nil if encounter some case fail to load data the downstream // or nil when loader is closed when all data is loaded to downstream err := loader.Run() if err != nil { log.Fatal(err) } }() // push ddl txn loader.Input() <- NewDDLTxn("test", "test", "create table test(id primary key)") // push one insert dml txn values := map[string]interface{}{"id": 1} loader.Input() <- &Txn{ DMLs: []*DML{{Database: "test", Table: "test", Tp: InsertDMLType, Values: values}}, } // push one update dml txn newValues := map[string]interface{}{"id": 2} loader.Input() <- &Txn{ DMLs: []*DML{{Database: "test", Table: "test", Tp: UpdateDMLType, Values: newValues, OldValues: values}}, } // you can set safe mode or not at run time // which use replace for insert event and delete + replace for update make it be idempotent loader.SetSafeMode(true) // push one delete dml txn loader.Input() <- &Txn{ DMLs: []*DML{{Database: "test", Table: "test", Tp: DeleteDMLType, Values: newValues}}, } //... // Close the Loader. No more Txn can be push into Input() // Run will quit when all data is drained loader.Close()
Output:
Index ¶
- Variables
- func CreateDB(user string, password string, host string, port int, tls *tls.Config) (db *gosql.DB, err error)
- func CreateDBWithSQLMode(user string, password string, host string, port int, tlsConfig *tls.Config, ...) (db *gosql.DB, err error)
- func CreateOracleDB(user string, password string, host string, port int, ...) (db *gosql.DB, err error)
- type Causality
- type DBType
- type DDL
- type DML
- type DMLType
- type Loader
- type MetricsGroup
- type Option
- func BatchSize(n int) Option
- func DestinationDBType(t string) Option
- func EnableCausality(b bool) Option
- func EnableDispatch(b bool) Option
- func Merge(v bool) Option
- func Metrics(m *MetricsGroup) Option
- func SaveAppliedTS(save bool) Option
- func SetloopBackSyncInfo(loopBackSyncInfo *loopbacksync.LoopBackSync) Option
- func SyncModeOption(n SyncMode) Option
- func WorkerCount(n int) Option
- type SyncMode
- type Txn
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( // ErrTableNotExist means the table not exist. ErrTableNotExist = errors.New("table not exist") )
Functions ¶
func CreateDB ¶
func CreateDB(user string, password string, host string, port int, tls *tls.Config) (db *gosql.DB, err error)
CreateDB return sql.DB
Types ¶
type Causality ¶
type Causality struct {
// contains filtered or unexported fields
}
Causality provides a simple mechanism to improve the concurrency of SQLs execution under the premise of ensuring correctness. causality groups sqls that maybe contain causal relationships, and syncer executes them linearly. if some conflicts exist in more than one groups, then syncer waits all SQLs that are grouped be executed and reset causality. this mechanism meets quiescent consistency to ensure correctness.
func (*Causality) DetectConflict ¶
DetectConflict detects whether there is a conflict
type DDL ¶
type DDL struct { Database string Table string SQL string // should skip to execute this DDL at downstream and just refresh the downstream table info. // one case for this usage is for bidirectional replication and only execute DDL at one side. ShouldSkip bool }
DDL holds the ddl info
type DML ¶
type DML struct { Database string Table string Tp DMLType // only set when Tp = UpdateDMLType OldValues map[string]interface{} Values map[string]interface{} UpColumnsInfoMap map[string]*model.ColumnInfo DestDBType DBType // contains filtered or unexported fields }
DML holds the dml info
type Loader ¶
type Loader interface { SetSafeMode(bool) GetSafeMode() bool Input() chan<- *Txn Successes() <-chan *Txn Close() Run() error }
Loader is used to load data to mysql
type MetricsGroup ¶
type MetricsGroup struct { EventCounterVec *prometheus.CounterVec QueryHistogramVec *prometheus.HistogramVec QueueSizeGauge *prometheus.GaugeVec }
MetricsGroup contains metrics of Loader
type Option ¶
type Option func(*options)
A Option sets options such batch size, worker count etc.
func DestinationDBType ¶
DestinationDBType set destDBType option.
func EnableCausality ¶
EnableCausality set EnableCausality or not.
func EnableDispatch ¶
EnableDispatch set EnableDispatch or not. default value is True, when it's disable, loader will execute the txn one and one as input to it and will not split the txn for concurrently write downstream db.
func SaveAppliedTS ¶
SaveAppliedTS set downstream type, values can be tidb or mysql
func SetloopBackSyncInfo ¶
func SetloopBackSyncInfo(loopBackSyncInfo *loopbacksync.LoopBackSync) Option
SetloopBackSyncInfo set loop back sync info of loader
func SyncModeOption ¶
SyncModeOption set sync mode of loader.
type Txn ¶
type Txn struct { DMLs []*DML DDL *DDL AppliedTS int64 // This field is used to hold arbitrary data you wish to include so it // will be available when receiving on the Successes channel Metadata interface{} }
Txn holds transaction info, an DDL or DML sequences