Documentation
¶
Index ¶
- func GenCheckPointCfg(cfg *Config, id uint64) (*checkpoint.Config, error)
- type Collector
- type Config
- type HTTPStatus
- type HeapStrategy
- type MergeItem
- type MergeItems
- type MergeSource
- type MergeStrategy
- type Merger
- type NormalStrategy
- type Pump
- type RelayConfig
- type Schema
- func (s *Schema) CanAppendDefaultValue(id int64, schemaVersion int64) bool
- func (s *Schema) CreateSchema(db *model.DBInfo) error
- func (s *Schema) CreateTable(schemaVersion int64, schema *model.DBInfo, table *model.TableInfo) error
- func (s *Schema) DropSchema(id int64) (string, error)
- func (s *Schema) DropTable(id int64) (string, error)
- func (s *Schema) InitForCreateMySQLSchema()
- func (s *Schema) IsDroppingColumn(id int64) bool
- func (s *Schema) IsTruncateTableID(id int64) bool
- func (s *Schema) ReplaceTable(schemaVersion int64, table *model.TableInfo) error
- func (s *Schema) SchemaAndTableName(id int64) (string, string, bool)
- func (s *Schema) SchemaByID(id int64) (val *model.DBInfo, ok bool)
- func (s *Schema) SchemaByTableID(tableID int64) (*model.DBInfo, bool)
- func (s *Schema) SchemaMetaVersion() int64
- func (s *Schema) String() string
- func (s *Schema) TableByID(id int64) (val *model.TableInfo, ok bool)
- func (s *Schema) TableBySchemaVersion(id int64, schemaVersion int64) (table *model.TableInfo, ok bool)
- type Server
- func (s *Server) ApplyAction(w http.ResponseWriter, r *http.Request)
- func (s *Server) Close()
- func (s *Server) DumpBinlog(req *binlog.DumpBinlogReq, stream binlog.Cistern_DumpBinlogServer) (err error)
- func (s *Server) DumpDDLJobs(ctx context.Context, req *binlog.DumpDDLJobsReq) (resp *binlog.DumpDDLJobsResp, err error)
- func (s *Server) GetLatestTS(w http.ResponseWriter, r *http.Request)
- func (s *Server) Notify(ctx context.Context, in *binlog.NotifyReq) (*binlog.NotifyResp, error)
- func (s *Server) Start() error
- type Syncer
- type SyncerConfig
- type TableName
- type TaskBinLogFilterRule
- type TaskTableMigrateRule
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GenCheckPointCfg ¶
func GenCheckPointCfg(cfg *Config, id uint64) (*checkpoint.Config, error)
GenCheckPointCfg returns an CheckPoint config instance
Types ¶
type Collector ¶
type Collector struct {
// contains filtered or unexported fields
}
Collector collects binlog from all pump, and send binlog to syncer.
func NewCollector ¶
func NewCollector(cfg *Config, clusterID uint64, s *Syncer, cpt checkpoint.CheckPoint) (*Collector, error)
NewCollector returns an instance of Collector
func (*Collector) HTTPStatus ¶
func (c *Collector) HTTPStatus() *HTTPStatus
HTTPStatus returns a snapshot of current http status.
type Config ¶
type Config struct { *flag.FlagSet `toml:"-" json:"-"` LogLevel string `toml:"log-level" json:"log-level"` NodeID string `toml:"node-id" json:"node-id"` ListenAddr string `toml:"addr" json:"addr"` AdvertiseAddr string `toml:"advertise-addr" json:"advertise-addr"` DataDir string `toml:"data-dir" json:"data-dir"` DetectInterval int `toml:"detect-interval" json:"detect-interval"` EtcdURLs string `toml:"pd-urls" json:"pd-urls"` LogFile string `toml:"log-file" json:"log-file"` InitialCommitTS int64 `toml:"initial-commit-ts" json:"initial-commit-ts"` SyncerCfg *SyncerConfig `toml:"syncer" json:"sycner"` Security security.Config `toml:"security" json:"security"` SyncedCheckTime int `toml:"synced-check-time" json:"synced-check-time"` Compressor string `toml:"compressor" json:"compressor"` EtcdTimeout time.Duration MetricsAddr string MetricsInterval int // contains filtered or unexported fields }
Config holds the configuration of drainer
type HTTPStatus ¶
type HTTPStatus struct { PumpPos map[string]int64 `json:"PumpPos"` Synced bool `json:"Synced"` LastTS int64 `json:"LastTS"` TsMap string `json:"TsMap"` }
HTTPStatus exposes current status of the collector via HTTP
func (*HTTPStatus) Status ¶
func (s *HTTPStatus) Status(w http.ResponseWriter, r *http.Request)
Status implements http.ServeHTTP interface
type HeapStrategy ¶
type HeapStrategy struct {
// contains filtered or unexported fields
}
HeapStrategy is a strategy to get min item using heap
func NewHeapStrategy ¶
func NewHeapStrategy() *HeapStrategy
NewHeapStrategy returns a new HeapStrategy
func (*HeapStrategy) Exist ¶
func (h *HeapStrategy) Exist(sourceID string) bool
Exist implements MergeStrategy's Exist function
func (*HeapStrategy) Pop ¶
func (h *HeapStrategy) Pop() MergeItem
Pop implements MergeStrategy's Pop function
func (*HeapStrategy) Push ¶
func (h *HeapStrategy) Push(item MergeItem)
Push implements MergeStrategy's Push function
type MergeItems ¶
type MergeItems []MergeItem
MergeItems is a heap of MergeItems.
func (MergeItems) Len ¶
func (m MergeItems) Len() int
func (MergeItems) Less ¶
func (m MergeItems) Less(i, j int) bool
func (*MergeItems) Pop ¶
func (m *MergeItems) Pop() interface{}
Pop implements heap.Interface's Pop function
func (*MergeItems) Push ¶
func (m *MergeItems) Push(x interface{})
Push implements heap.Interface's Push function
func (MergeItems) Swap ¶
func (m MergeItems) Swap(i, j int)
type MergeSource ¶
MergeSource contains a source info about binlog
type MergeStrategy ¶
MergeStrategy is a strategy interface for merge item
type Merger ¶
Merger do merge sort of binlog
func NewMerger ¶
func NewMerger(ts int64, strategy string, sources ...MergeSource) *Merger
NewMerger creates a instance of Merger
func (*Merger) AddSource ¶
func (m *Merger) AddSource(source MergeSource)
AddSource add a source to Merger
func (*Merger) Close ¶
func (m *Merger) Close()
Close close the output chan when all the source id drained
func (*Merger) GetLatestTS ¶
GetLatestTS returns the last binlog's ts send to syncer
func (*Merger) RemoveSource ¶
RemoveSource removes a source from Merger
type NormalStrategy ¶
type NormalStrategy struct {
// contains filtered or unexported fields
}
NormalStrategy is a strategy to get min item using normal way
func NewNormalStrategy ¶
func NewNormalStrategy() *NormalStrategy
NewNormalStrategy returns a new NormalStrategy
func (*NormalStrategy) Exist ¶
func (n *NormalStrategy) Exist(sourceID string) bool
Exist implements MergeStrategy's Exist function
func (*NormalStrategy) Pop ¶
func (n *NormalStrategy) Pop() MergeItem
Pop implements MergeStrategy's Pop function
func (*NormalStrategy) Push ¶
func (n *NormalStrategy) Push(item MergeItem)
Push implements MergeStrategy's Push function
type Pump ¶
type Pump struct {
// contains filtered or unexported fields
}
Pump holds the connection to a pump node, and keeps the savepoint of binlog last read
func NewPump ¶
func NewPump(nodeID, addr string, tlsConfig *tls.Config, clusterID uint64, startTs int64, errCh chan error) *Pump
NewPump returns an instance of Pump
func (*Pump) Continue ¶
Continue sets isPaused to 0, and continue pull binlog from pump. This function is reentrant.
type RelayConfig ¶
type RelayConfig struct { LogDir string `toml:"log-dir" json:"log-dir"` MaxFileSize int64 `toml:"max-file-size" json:"max-file-size"` }
RelayConfig is the Relay log's configuration.
func (RelayConfig) IsEnabled ¶
func (rc RelayConfig) IsEnabled() bool
IsEnabled return true if we need to handle relay log.
type Schema ¶
type Schema struct {
// contains filtered or unexported fields
}
Schema stores the source TiDB all schema infomations schema infomations could be changed by drainer init and ddls appear
func (*Schema) CanAppendDefaultValue ¶
CanAppendDefaultValue means we can safely add the default value to the column if missing the value.
func (*Schema) CreateSchema ¶
CreateSchema adds new DBInfo
func (*Schema) CreateTable ¶
func (s *Schema) CreateTable(schemaVersion int64, schema *model.DBInfo, table *model.TableInfo) error
CreateTable creates new TableInfo
func (*Schema) DropSchema ¶
DropSchema deletes the given DBInfo
func (*Schema) InitForCreateMySQLSchema ¶
func (s *Schema) InitForCreateMySQLSchema()
InitForCreateMySQLSchema create the schema info for `mysql`, since it's created by KV after TiDB 6.2.
func (*Schema) IsDroppingColumn ¶
IsDroppingColumn returns true if the table is in the middle of dropping a column
func (*Schema) IsTruncateTableID ¶
IsTruncateTableID returns true if the table id have been truncated by truncate table DDL
func (*Schema) ReplaceTable ¶
ReplaceTable replace the table by new tableInfo
func (*Schema) SchemaAndTableName ¶
SchemaAndTableName returns the tableName by table id
func (*Schema) SchemaByID ¶
SchemaByID returns the DBInfo by schema id
func (*Schema) SchemaByTableID ¶
SchemaByTableID returns the schema ID by table ID
func (*Schema) SchemaMetaVersion ¶
SchemaMetaVersion returns the current schemaversion in drainer
type Server ¶
type Server struct { ID string // contains filtered or unexported fields }
Server implements the gRPC interface, and maintains the runtime status
func (*Server) ApplyAction ¶
func (s *Server) ApplyAction(w http.ResponseWriter, r *http.Request)
ApplyAction change the pump's state, now can be pause or close.
func (*Server) Close ¶
func (s *Server) Close()
Close stops all goroutines started by drainer server gracefully
func (*Server) DumpBinlog ¶
func (s *Server) DumpBinlog(req *binlog.DumpBinlogReq, stream binlog.Cistern_DumpBinlogServer) (err error)
DumpBinlog implements the gRPC interface of drainer server
func (*Server) DumpDDLJobs ¶
func (s *Server) DumpDDLJobs(ctx context.Context, req *binlog.DumpDDLJobsReq) (resp *binlog.DumpDDLJobsResp, err error)
DumpDDLJobs implements the gRPC interface of drainer server
func (*Server) GetLatestTS ¶
func (s *Server) GetLatestTS(w http.ResponseWriter, r *http.Request)
GetLatestTS returns the last binlog's commit ts which synced to downstream.
type Syncer ¶
type Syncer struct {
// contains filtered or unexported fields
}
Syncer converts tidb binlog to the specified DB sqls, and sync it to target DB
func NewSyncer ¶
func NewSyncer(cp checkpoint.CheckPoint, cfg *SyncerConfig, jobs []*model.Job) (*Syncer, error)
NewSyncer returns a Drainer instance
func (*Syncer) Add ¶
func (s *Syncer) Add(b *binlogItem)
Add adds binlogItem to the syncer's input channel
func (*Syncer) GetLastSyncTime ¶
GetLastSyncTime returns lastSyncTime
func (*Syncer) GetLatestCommitTS ¶
GetLatestCommitTS returns the latest commit ts.
type SyncerConfig ¶
type SyncerConfig struct { StrSQLMode *string `toml:"sql-mode" json:"sql-mode"` SQLMode mysql.SQLMode `toml:"-" json:"-"` IgnoreTxnCommitTS []int64 `toml:"ignore-txn-commit-ts" json:"ignore-txn-commit-ts"` IgnoreSchemas string `toml:"ignore-schemas" json:"ignore-schemas"` IgnoreTables []filter.TableName `toml:"ignore-table" json:"ignore-table"` TxnBatch int `toml:"txn-batch" json:"txn-batch"` LoopbackControl bool `toml:"loopback-control" json:"loopback-control"` SyncDDL bool `toml:"sync-ddl" json:"sync-ddl"` ChannelID int64 `toml:"channel-id" json:"channel-id"` WorkerCount int `toml:"worker-count" json:"worker-count"` To *dsync.DBConfig `toml:"to" json:"to"` DoTables []filter.TableName `toml:"replicate-do-table" json:"replicate-do-table"` DoDBs []string `toml:"replicate-do-db" json:"replicate-do-db"` DestDBType string `toml:"db-type" json:"db-type"` Relay RelayConfig `toml:"relay" json:"relay"` // disable* is keep for backward compatibility. // if both setted, the disable one take affect. DisableDispatchFlag *bool `toml:"-" json:"disable-dispatch-flag"` EnableDispatchFlag *bool `toml:"-" json:"enable-dispatch-flag"` DisableDispatchFile *bool `toml:"disable-dispatch" json:"disable-dispatch"` EnableDispatchFile *bool `toml:"enable-dispatch" json:"enable-dispatch"` SafeMode bool `toml:"safe-mode" json:"safe-mode"` // for backward compatibility. // disable* is keep for backward compatibility. // if both setted, the disable one take affect. DisableCausalityFlag *bool `toml:"-" json:"disable-detect-flag"` EnableCausalityFlag *bool `toml:"-" json:"enable-detect-flag"` DisableCausalityFile *bool `toml:"disable-detect" json:"disable-detect"` EnableCausalityFile *bool `toml:"enable-detect" json:"enable-detect"` LoadSchemaSnapshot bool `toml:"load-schema-snapshot" json:"load-schema-snapshot"` // v2 filter rules CaseSensitive bool `toml:"case-sensitive" json:"case-sensitive"` TableMigrateRule []TaskTableMigrateRule `toml:"table-migrate-rule" json:"table-migrate-rule"` BinlogFilterRule map[string]TaskBinLogFilterRule `toml:"binlog-filter-rule,omitempty" json:"binlog-filter-rule,omitempty"` }
SyncerConfig is the Syncer's configuration.
func (*SyncerConfig) EnableCausality ¶
func (c *SyncerConfig) EnableCausality() bool
EnableCausality return true if enable causality.
func (*SyncerConfig) EnableDispatch ¶
func (c *SyncerConfig) EnableDispatch() bool
EnableDispatch return true if enable dispatch.
type TaskBinLogFilterRule ¶
type TaskBinLogFilterRule struct { // event type IgnoreEvent *[]string `toml:"ignore-event,omitempty" json:"ignore-event,omitempty"` // sql pattern to filter IgnoreSQL *[]string `toml:"ignore-sql,omitempty" json:"ignore-sql,omitempty"` }
TaskBinLogFilterRule defines filtering rules at binlog level
type TaskTableMigrateRule ¶
type TaskTableMigrateRule struct { // filter rule name BinlogFilterRule *[]string `toml:"binlog-filter-rule,omitempty" json:"binlog-filter-rule,omitempty"` // source-related configuration Source struct { // schema name, wildcard support Schema string `toml:"schema" json:"schema"` // table name, wildcard support Table string `toml:"table" json:"table"` } `toml:"source" json:"source"` // downstream-related configuration Target *struct { // schema name, does not support wildcards Schema string `toml:"schema" json:"schema"` // table name, does not support wildcards Table string `toml:"table" json:"table"` } `toml:"target,omitempty" json:"target,omitempty"` }
TaskTableMigrateRule defines upstream table to downstream migrate rules