Documentation ¶
Index ¶
- Variables
- func GenTableID(schema, table string) (ID string, isSchemaOnly bool)
- func InitStatusAndMetrics(addr string)
- func RegisterMetrics(registry *prometheus.Registry)
- func UnpackTableID(id string) (string, string)
- type BinlogType
- type CheckPoint
- type DBConn
- type DDLExecInfo
- type DDLExecItem
- type ExecErrorContext
- type GenColCache
- type Ghost
- func (g *Ghost) Apply(tctx *tcontext.Context, tables []*filter.Table, statement string, ...) ([]string, string, string, error)
- func (g *Ghost) Clear(tctx *tcontext.Context) error
- func (g *Ghost) Close()
- func (g *Ghost) Finish(tctx *tcontext.Context, schema, table string) error
- func (g *Ghost) RealName(schema, table string) (string, string)
- func (g *Ghost) ResetConn(tctx *tcontext.Context) error
- func (g *Ghost) TableType(table string) TableType
- type GhostDDLInfo
- type Heartbeat
- type HeartbeatConfig
- type OnlineDDLStorage
- func (s *OnlineDDLStorage) Clear(tctx *tcontext.Context) error
- func (s *OnlineDDLStorage) Close()
- func (s *OnlineDDLStorage) Delete(tctx *tcontext.Context, ghostSchema, ghostTable string) error
- func (s *OnlineDDLStorage) Get(ghostSchema, ghostTable string) *GhostDDLInfo
- func (s *OnlineDDLStorage) Init(tctx *tcontext.Context) error
- func (s *OnlineDDLStorage) Load(tctx *tcontext.Context) error
- func (s *OnlineDDLStorage) ResetConn(tctx *tcontext.Context) error
- func (s *OnlineDDLStorage) Save(tctx *tcontext.Context, ...) error
- type OnlinePlugin
- type PT
- func (p *PT) Apply(tctx *tcontext.Context, tables []*filter.Table, statement string, ...) ([]string, string, string, error)
- func (p *PT) Clear(tctx *tcontext.Context) error
- func (p *PT) Close()
- func (p *PT) Finish(tcxt *tcontext.Context, schema, table string) error
- func (p *PT) RealName(schema, table string) (string, string)
- func (p *PT) ResetConn(tctx *tcontext.Context) error
- func (p *PT) TableType(table string) TableType
- type RemoteCheckPoint
- func (cp *RemoteCheckPoint) CheckGlobalPoint() bool
- func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error
- func (cp *RemoteCheckPoint) Close()
- func (cp *RemoteCheckPoint) DeleteTablePoint(tctx *tcontext.Context, sourceSchema, sourceTable string) error
- func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTables [][]string, extraSQLs []string, ...) error
- func (cp *RemoteCheckPoint) FlushedGlobalPoint() mysql.Position
- func (cp *RemoteCheckPoint) GlobalPoint() mysql.Position
- func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context) error
- func (cp *RemoteCheckPoint) IsOlderThanTablePoint(sourceSchema, sourceTable string, pos mysql.Position, useLE bool) bool
- func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error
- func (cp *RemoteCheckPoint) LoadMeta() error
- func (cp *RemoteCheckPoint) ResetConn(tctx *tcontext.Context) error
- func (cp *RemoteCheckPoint) Rollback()
- func (cp *RemoteCheckPoint) SaveGlobalPoint(pos mysql.Position)
- func (cp *RemoteCheckPoint) SaveTablePoint(sourceSchema, sourceTable string, pos mysql.Position)
- func (cp *RemoteCheckPoint) String() string
- func (cp *RemoteCheckPoint) TablePoint() map[string]map[string]mysql.Position
- type ShardingGroup
- func (sg *ShardingGroup) ActiveDDLFirstPos() (mysql.Position, error)
- func (sg *ShardingGroup) CheckSyncing(source string, pos mysql.Position) (beforeActiveDDL bool)
- func (sg *ShardingGroup) FirstEndPosUnresolved() *mysql.Position
- func (sg *ShardingGroup) FirstPosUnresolved() *mysql.Position
- func (sg *ShardingGroup) FlushData(targetTableID string) ([]string, [][]interface{})
- func (sg *ShardingGroup) InSequenceSharding() bool
- func (sg *ShardingGroup) IsUnresolved() bool
- func (sg *ShardingGroup) Leave(sources []string) error
- func (sg *ShardingGroup) Merge(sources []string) (bool, bool, int, error)
- func (sg *ShardingGroup) Reset()
- func (sg *ShardingGroup) ResolveShardingDDL() bool
- func (sg *ShardingGroup) Sources() map[string]bool
- func (sg *ShardingGroup) String() string
- func (sg *ShardingGroup) Tables() [][]string
- func (sg *ShardingGroup) TrySync(source string, pos, endPos mysql.Position, ddls []string) (bool, bool, int, error)
- func (sg *ShardingGroup) UnresolvedGroupInfo() *pb.ShardingGroup
- func (sg *ShardingGroup) UnresolvedTables() [][]string
- type ShardingGroupKeeper
- func (k *ShardingGroupKeeper) ActiveDDLFirstPos(targetSchema, targetTable string) (mysql.Position, error)
- func (k *ShardingGroupKeeper) AddGroup(targetSchema, targetTable string, sourceIDs []string, ...) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, ...)
- func (k *ShardingGroupKeeper) AdjustGlobalPoint(globalPoint mysql.Position) mysql.Position
- func (k *ShardingGroupKeeper) Close()
- func (k *ShardingGroupKeeper) Group(targetSchema, targetTable string) *ShardingGroup
- func (k *ShardingGroupKeeper) Groups() map[string]*ShardingGroup
- func (k *ShardingGroupKeeper) InSequenceSharding() bool
- func (k *ShardingGroupKeeper) InSyncing(targetSchema, targetTable, source string, pos mysql.Position) bool
- func (k *ShardingGroupKeeper) Init() error
- func (k *ShardingGroupKeeper) LeaveGroup(targetSchema, targetTable string, sources []string) error
- func (k *ShardingGroupKeeper) LoadShardMeta() (map[string]*shardmeta.ShardingMeta, error)
- func (k *ShardingGroupKeeper) PrepareFlushSQLs(exceptTableIDs map[string]bool) ([]string, [][]interface{})
- func (k *ShardingGroupKeeper) ResetGroups()
- func (k *ShardingGroupKeeper) ResolveShardingDDL(targetSchema, targetTable string) (bool, error)
- func (k *ShardingGroupKeeper) TrySync(targetSchema, targetTable, source string, pos, endPos mysql.Position, ...) (needShardingHandle bool, group *ShardingGroup, synced, active bool, remain int, ...)
- func (k *ShardingGroupKeeper) UnresolvedGroups() []*pb.ShardingGroup
- func (k *ShardingGroupKeeper) UnresolvedTables() (map[string]bool, [][]string)
- type ShardingReSync
- type StreamerProducer
- type Syncer
- func (s *Syncer) Close()
- func (s *Syncer) DDLInfo() <-chan *pb.DDLInfo
- func (s *Syncer) Error() interface{}
- func (s *Syncer) ExecuteDDL(ctx context.Context, execReq *pb.ExecDDLRequest) (<-chan error, error)
- func (s *Syncer) Init(ctx context.Context) (err error)
- func (s *Syncer) InjectSQLs(ctx context.Context, sqls []string) error
- func (s *Syncer) IsFreshTask(ctx context.Context) (bool, error)
- func (s *Syncer) Pause()
- func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult)
- func (s *Syncer) Resume(ctx context.Context, pr chan pb.ProcessResult)
- func (s *Syncer) Run(ctx context.Context) (err error)
- func (s *Syncer) SetSQLOperator(req *pb.HandleSubTaskSQLsRequest) error
- func (s *Syncer) Status() interface{}
- func (s *Syncer) Type() pb.UnitType
- func (s *Syncer) Update(cfg *config.SubTaskConfig) error
- func (s *Syncer) UpdateFromConfig(cfg *config.SubTaskConfig) error
- type TableType
- type UpStreamConn
Constants ¶
This section is empty.
Variables ¶
var (
	// IncompatibleDDLFormat is for incompatible ddl
	IncompatibleDDLFormat = `` /* 407-byte string literal not displayed */
)
var (
// MaxDDLConnectionTimeoutMinute also used by SubTask.ExecuteDDL
MaxDDLConnectionTimeoutMinute = 5
)
var (
	// OnlineDDLSchemes is scheme name => online ddl handler
	OnlineDDLSchemes = map[string]func(*tcontext.Context, *config.SubTaskConfig) (OnlinePlugin, error){
		config.PT:    NewPT,
		config.GHOST: NewGhost,
	}
)
Functions ¶
func GenTableID ¶
func GenTableID(schema, table string) (ID string, isSchemaOnly bool)
GenTableID generates a table ID
func InitStatusAndMetrics ¶
func InitStatusAndMetrics(addr string)
InitStatusAndMetrics register prometheus metrics and listen for status port.
func RegisterMetrics ¶
func RegisterMetrics(registry *prometheus.Registry)
RegisterMetrics registers metrics
func UnpackTableID ¶
func UnpackTableID(id string) (string, string)
UnpackTableID unpacks a table ID to its <schema, table> pair
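The two helpers above are inverses of each other. Below is a minimal, hypothetical sketch of the pack/unpack idea; the backtick-quoted `schema`.`table` format and the helper names are assumptions for illustration, not the package's actual implementation.

package main

import (
	"fmt"
	"strings"
)

// genTableID is a hypothetical stand-in for GenTableID: it packs a schema and
// table into a single ID. An empty table name yields a schema-only ID.
func genTableID(schema, table string) (string, bool) {
	if table == "" {
		return "`" + schema + "`", true
	}
	return fmt.Sprintf("`%s`.`%s`", schema, table), false
}

// unpackTableID is the matching stand-in for UnpackTableID: it splits the ID
// back into its <schema, table> pair.
func unpackTableID(id string) (string, string) {
	parts := strings.SplitN(id, "`.`", 2)
	schema := strings.Trim(parts[0], "`")
	if len(parts) == 1 {
		return schema, ""
	}
	return schema, strings.Trim(parts[1], "`")
}

func main() {
	id, schemaOnly := genTableID("shard_db", "shard_table")
	fmt.Println(id, schemaOnly) // `shard_db`.`shard_table` false

	schema, table := unpackTableID(id)
	fmt.Println(schema, table) // shard_db shard_table
}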
Types ¶
type BinlogType ¶
type BinlogType uint8
BinlogType represents binlog sync type
const (
	RemoteBinlog BinlogType = iota + 1
	LocalBinlog
)
binlog sync type
type CheckPoint ¶
type CheckPoint interface {
	// Init initializes the CheckPoint
	Init(tctx *tcontext.Context) error

	// Close closes the CheckPoint
	Close()

	// ResetConn resets database connections owned by the Checkpoint
	ResetConn(tctx *tcontext.Context) error

	// Clear clears all checkpoints
	Clear(tctx *tcontext.Context) error

	// Load loads all checkpoints saved by CheckPoint
	Load(tctx *tcontext.Context) error

	// LoadMeta loads checkpoints from meta config item or file
	LoadMeta() error

	// SaveTablePoint saves the checkpoint for the specified table in memory
	SaveTablePoint(sourceSchema, sourceTable string, pos mysql.Position)

	// DeleteTablePoint deletes the checkpoint for the specified table in memory and storage
	DeleteTablePoint(tctx *tcontext.Context, sourceSchema, sourceTable string) error

	// IsOlderThanTablePoint checks whether the job's checkpoint is older than the previously saved checkpoint
	IsOlderThanTablePoint(sourceSchema, sourceTable string, pos mysql.Position, useLE bool) bool

	// SaveGlobalPoint saves the global binlog stream's checkpoint
	// corresponding to Meta.Save
	SaveGlobalPoint(pos mysql.Position)

	// FlushPointsExcept flushes the global checkpoint and tables'
	// checkpoints except exceptTables; it also flushes the SQLs with args provided
	// by extraSQLs and extraArgs. Currently extraSQLs contain shard meta only.
	// @exceptTables: [[schema, table]... ]
	// corresponding to Meta.Flush
	FlushPointsExcept(tctx *tcontext.Context, exceptTables [][]string, extraSQLs []string, extraArgs [][]interface{}) error

	// GlobalPoint returns the global binlog stream's checkpoint
	// corresponding to Meta.Pos
	GlobalPoint() mysql.Position

	// TablePoint returns all tables' stream checkpoints
	TablePoint() map[string]map[string]mysql.Position

	// FlushedGlobalPoint returns the flushed global binlog stream's checkpoint
	// corresponding to Meta.Pos
	FlushedGlobalPoint() mysql.Position

	// CheckGlobalPoint checks whether we should save the global checkpoint
	// corresponding to Meta.Check
	CheckGlobalPoint() bool

	// Rollback rolls the global checkpoint and all table checkpoints back to the flushed checkpoints
	Rollback()

	// String returns the text of the global position
	String() string
}
CheckPoint represents checkpoint status for the syncer, including the global binlog stream's checkpoint and every table's checkpoint. When saving checkpoints, we must distinguish saving in memory from saving (flushing) to the DB (or file) permanently. For sharding merge, we must save checkpoints in memory to support skipping when re-syncing for the special streamer, but before all DDLs for a sharding group are synced and executed, we should not save checkpoints permanently, because when restarting to continue the sync, all sharding DDLs must be try-synced again.
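The following toy sketch illustrates the memory-vs-flush distinction described above. All names are hypothetical, and the position type only mirrors the shape of mysql.Position; the real RemoteCheckPoint persists points to the target database instead of a field.

package main

import "fmt"

// position mirrors the shape of mysql.Position for illustration only.
type position struct {
	Name string
	Pos  uint32
}

// toyCheckPoint is a hypothetical, simplified illustration of the CheckPoint
// contract: points are first saved in memory, flushed to storage explicitly,
// and Rollback reverts the in-memory points to the last flushed ones.
type toyCheckPoint struct {
	global        position            // in-memory global point
	flushedGlobal position            // last flushed global point
	tables        map[string]position // in-memory table points, keyed by "schema.table"
}

func (cp *toyCheckPoint) SaveGlobalPoint(pos position) { cp.global = pos }

func (cp *toyCheckPoint) SaveTablePoint(schema, table string, pos position) {
	cp.tables[schema+"."+table] = pos
}

// Flush stands in for FlushPointsExcept: here it only records the flushed
// global point instead of writing SQL to the checkpoint table.
func (cp *toyCheckPoint) Flush() { cp.flushedGlobal = cp.global }

// Rollback reverts the in-memory global point to the flushed one, as the real
// CheckPoint does after a failed run.
func (cp *toyCheckPoint) Rollback() { cp.global = cp.flushedGlobal }

func main() {
	cp := &toyCheckPoint{tables: map[string]position{}}
	cp.SaveGlobalPoint(position{"mysql-bin.000001", 4})
	cp.Flush()
	cp.SaveGlobalPoint(position{"mysql-bin.000001", 1024}) // saved in memory only
	cp.Rollback()                                          // not flushed, so roll back
	fmt.Println(cp.global)                                 // {mysql-bin.000001 4}
}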
func NewRemoteCheckPoint ¶
func NewRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id string) CheckPoint
NewRemoteCheckPoint creates a new RemoteCheckPoint
type DBConn ¶ added in v1.0.2
type DBConn struct {
// contains filtered or unexported fields
}
DBConn represents a live DB connection. It is not thread-safe.
type DDLExecInfo ¶
DDLExecInfo is used by the syncer to execute or ignore sharding DDLs. It is specific to the syncer and cannot be used by other process units.
func (*DDLExecInfo) BlockingDDLs ¶
func (i *DDLExecInfo) BlockingDDLs() []string
BlockingDDLs returns current blocking DDL
func (*DDLExecInfo) Chan ¶
func (i *DDLExecInfo) Chan(ddls []string) <-chan *DDLExecItem
Chan returns a receive only DDLExecItem chan
func (*DDLExecInfo) ClearBlockingDDL ¶
func (i *DDLExecInfo) ClearBlockingDDL()
ClearBlockingDDL clears current blocking DDL
func (*DDLExecInfo) Send ¶
func (i *DDLExecInfo) Send(ctx context.Context, item *DDLExecItem) error
Send sends an item (with request) to the chan
type DDLExecItem ¶
type DDLExecItem struct {
// contains filtered or unexported fields
}
DDLExecItem wraps request and response for a sharding DDL execution
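A rough sketch of the request/response pattern that DDLExecInfo and DDLExecItem describe: one side sends an item over a channel and waits for the executor's reply. The field names and shapes below are hypothetical.

package main

import (
	"context"
	"fmt"
)

// execItem is a hypothetical shape for DDLExecItem: a request plus a channel
// the executor uses to report the result back to the requester.
type execItem struct {
	exec bool       // execute or skip the blocking sharding DDL
	resp chan error // result reported back to the sender
}

func main() {
	ch := make(chan *execItem) // stands in for the channel behind DDLExecInfo.Chan

	// The "syncer" side waits for an item and executes or skips the DDL.
	go func() {
		item := <-ch
		if item.exec {
			fmt.Println("executing blocking DDL")
		} else {
			fmt.Println("skipping blocking DDL")
		}
		item.resp <- nil
	}()

	// The requesting side sends an item, roughly what Send does,
	// and waits for the response with a context for cancellation.
	ctx := context.Background()
	item := &execItem{exec: true, resp: make(chan error, 1)}
	select {
	case ch <- item:
	case <-ctx.Done():
		return
	}
	fmt.Println("result:", <-item.resp)
}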
type ExecErrorContext ¶
type ExecErrorContext struct {
// contains filtered or unexported fields
}
ExecErrorContext records a failed exec SQL information
type GenColCache ¶
type GenColCache struct {
// contains filtered or unexported fields
}
GenColCache stores generated column information for all tables
type Ghost ¶
type Ghost struct {
// contains filtered or unexported fields
}
Ghost handles gh-ost online ddls (not complete, no need to review it). _*_gho: ghost table; _*_ghc: ghost changelog table; _*_del: ghost trash table.
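A hedged sketch of the suffix-based classification implied by the naming scheme above, roughly what TableType does for gh-ost tables; the exact matching rules in the package may differ.

package main

import (
	"fmt"
	"strings"
)

// classify is a hypothetical stand-in for TableType: a table whose name starts
// with "_" and ends with one of gh-ost's suffixes is a ghost/trash table,
// everything else is treated as a real table.
func classify(table string) string {
	if strings.HasPrefix(table, "_") {
		switch {
		case strings.HasSuffix(table, "_gho"):
			return "ghost table"
		case strings.HasSuffix(table, "_ghc"):
			return "ghost changelog table"
		case strings.HasSuffix(table, "_del"):
			return "ghost trash table"
		}
	}
	return "real table"
}

func main() {
	for _, t := range []string{"t1", "_t1_gho", "_t1_ghc", "_t1_del"} {
		fmt.Printf("%-8s => %s\n", t, classify(t))
	}
}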
func (*Ghost) Apply ¶
func (g *Ghost) Apply(tctx *tcontext.Context, tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error)
Apply implements interface. returns ddls, real schema, real table, error
type GhostDDLInfo ¶
type GhostDDLInfo struct {
	Schema string   `json:"schema"`
	Table  string   `json:"table"`
	DDLs   []string `json:"ddls"`
}
GhostDDLInfo stores ghost information and ddls
type Heartbeat ¶
type Heartbeat struct {
// contains filtered or unexported fields
}
Heartbeat represents a heartbeat mechanism to measure replication lag on mysql and tidb/mysql. Learn from: https://www.percona.com/doc/percona-toolkit/LATEST/pt-heartbeat.html
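A toy illustration of the pt-heartbeat approach referenced above: write a timestamp upstream, read the replicated timestamp downstream, and treat the difference as the lag estimate. A plain variable stands in for the heartbeat table here.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Upstream: timestamp periodically written into the heartbeat table.
	writtenTS := time.Now().Add(-1500 * time.Millisecond) // written 1.5s ago

	// Downstream: the replicated row we read back carries the same timestamp.
	replicatedTS := writtenTS

	// Lag estimate: how old the newest replicated heartbeat is.
	lag := time.Since(replicatedTS)
	fmt.Printf("approximate replication lag: %v\n", lag.Truncate(100*time.Millisecond))
}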
func GetHeartbeat ¶
func GetHeartbeat(cfg *HeartbeatConfig) (*Heartbeat, error)
GetHeartbeat gets singleton instance of Heartbeat
func (*Heartbeat) RemoveTask ¶
RemoveTask removes a previously added task
func (*Heartbeat) TryUpdateTaskTs ¶
TryUpdateTaskTs tries to update task's ts
type HeartbeatConfig ¶
type HeartbeatConfig struct {
// contains filtered or unexported fields
}
HeartbeatConfig represents Heartbeat configurations.
func (*HeartbeatConfig) Equal ¶
func (cfg *HeartbeatConfig) Equal(other *HeartbeatConfig) error
Equal tests whether the config equals the other
type OnlineDDLStorage ¶
OnlineDDLStorage stores sharding group online ddl information
func NewOnlineDDLStorage ¶
func NewOnlineDDLStorage(logCtx *tcontext.Context, cfg *config.SubTaskConfig) *OnlineDDLStorage
NewOnlineDDLStorage creates a new online ddl storage
func (*OnlineDDLStorage) Clear ¶
func (s *OnlineDDLStorage) Clear(tctx *tcontext.Context) error
Clear clears online ddl information from storage
func (*OnlineDDLStorage) Close ¶
func (s *OnlineDDLStorage) Close()
Close closes database connection
func (*OnlineDDLStorage) Delete ¶
func (s *OnlineDDLStorage) Delete(tctx *tcontext.Context, ghostSchema, ghostTable string) error
Delete deletes online ddl information
func (*OnlineDDLStorage) Get ¶
func (s *OnlineDDLStorage) Get(ghostSchema, ghostTable string) *GhostDDLInfo
Get returns ddls by given schema/table
func (*OnlineDDLStorage) Init ¶
func (s *OnlineDDLStorage) Init(tctx *tcontext.Context) error
Init initializes the online handler
func (*OnlineDDLStorage) Load ¶
func (s *OnlineDDLStorage) Load(tctx *tcontext.Context) error
Load loads information from storage
type OnlinePlugin ¶
type OnlinePlugin interface {
	// Apply does:
	// * detect online ddl
	// * record changes
	// * apply online ddl on the real table
	// returns sqls, replaced/self schema, replaced/self table, error
	Apply(tctx *tcontext.Context, tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error)

	// Finish deletes the online ddl from memory and storage
	Finish(tctx *tcontext.Context, schema, table string) error

	// TableType returns ghost/real table
	TableType(table string) TableType

	// RealName returns the real table name with the ghost suffix removed and handled by the table router
	RealName(schema, table string) (string, string)

	// ResetConn resets the db connection
	ResetConn(tctx *tcontext.Context) error

	// Clear clears all online information
	Clear(tctx *tcontext.Context) error

	// Close closes the online ddl plugin
	Close()
}
OnlinePlugin handles online ddl solutions like pt, gh-ost
func NewGhost ¶
func NewGhost(tctx *tcontext.Context, cfg *config.SubTaskConfig) (OnlinePlugin, error)
NewGhost returns a gh-ost online plugin
func NewPT ¶
func NewPT(tctx *tcontext.Context, cfg *config.SubTaskConfig) (OnlinePlugin, error)
NewPT returns pt online schema changes plugin
type PT ¶
type PT struct {
// contains filtered or unexported fields
}
PT handles pt online schema changes. (_*).*_new: ghost table; (_*).*_old: ghost trash table. We don't support the `--new-table-name` flag.
func (*PT) Apply ¶
func (p *PT) Apply(tctx *tcontext.Context, tables []*filter.Table, statement string, stmt ast.StmtNode) ([]string, string, string, error)
Apply implements interface. returns ddls, real schema, real table, error
type RemoteCheckPoint ¶
RemoteCheckPoint implements CheckPoint using the target database to store info. NOTE: we currently sync from the relay log, so GTID support has not been added yet. It is not thread-safe.
func (*RemoteCheckPoint) CheckGlobalPoint ¶
func (cp *RemoteCheckPoint) CheckGlobalPoint() bool
CheckGlobalPoint implements CheckPoint.CheckGlobalPoint
func (*RemoteCheckPoint) Clear ¶
func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error
Clear implements CheckPoint.Clear
func (*RemoteCheckPoint) Close ¶
func (cp *RemoteCheckPoint) Close()
Close implements CheckPoint.Close
func (*RemoteCheckPoint) DeleteTablePoint ¶
func (cp *RemoteCheckPoint) DeleteTablePoint(tctx *tcontext.Context, sourceSchema, sourceTable string) error
DeleteTablePoint implements CheckPoint.DeleteTablePoint
func (*RemoteCheckPoint) FlushPointsExcept ¶
func (cp *RemoteCheckPoint) FlushPointsExcept(tctx *tcontext.Context, exceptTables [][]string, extraSQLs []string, extraArgs [][]interface{}) error
FlushPointsExcept implements CheckPoint.FlushPointsExcept
func (*RemoteCheckPoint) FlushedGlobalPoint ¶
func (cp *RemoteCheckPoint) FlushedGlobalPoint() mysql.Position
FlushedGlobalPoint implements CheckPoint.FlushedGlobalPoint
func (*RemoteCheckPoint) GlobalPoint ¶
func (cp *RemoteCheckPoint) GlobalPoint() mysql.Position
GlobalPoint implements CheckPoint.GlobalPoint
func (*RemoteCheckPoint) Init ¶
func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context) error
Init implements CheckPoint.Init
func (*RemoteCheckPoint) IsOlderThanTablePoint ¶ added in v1.0.7
func (cp *RemoteCheckPoint) IsOlderThanTablePoint(sourceSchema, sourceTable string, pos mysql.Position, useLE bool) bool
IsOlderThanTablePoint implements CheckPoint.IsOlderThanTablePoint. For binlog position replication, DM currently splits the row changes of an event into jobs, so some jobs may have the same position. If useLE is true, we compare with less than or equal.
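A small sketch of the comparison rule described above, using a local struct that mirrors mysql.Position; isOlderThanTablePoint below is a hypothetical stand-in, not the package's implementation.

package main

import "fmt"

// position mirrors the shape of mysql.Position (binlog file name + offset).
type position struct {
	Name string
	Pos  uint32
}

// compare orders two binlog positions by file name, then offset.
func compare(a, b position) int {
	switch {
	case a.Name < b.Name:
		return -1
	case a.Name > b.Name:
		return 1
	case a.Pos < b.Pos:
		return -1
	case a.Pos > b.Pos:
		return 1
	}
	return 0
}

// isOlderThanTablePoint sketches the documented semantics: a job position is
// "older" than the saved table checkpoint when it is strictly less than it,
// or less than or equal when useLE is true (several jobs split from one event
// can carry the same position).
func isOlderThanTablePoint(saved, job position, useLE bool) bool {
	c := compare(job, saved)
	if useLE {
		return c <= 0
	}
	return c < 0
}

func main() {
	saved := position{"mysql-bin.000002", 1000}
	same := position{"mysql-bin.000002", 1000}
	fmt.Println(isOlderThanTablePoint(saved, same, false)) // false: not strictly older
	fmt.Println(isOlderThanTablePoint(saved, same, true))  // true: equal counts with useLE
}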
func (*RemoteCheckPoint) Load ¶
func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error
Load implements CheckPoint.Load
func (*RemoteCheckPoint) LoadMeta ¶
func (cp *RemoteCheckPoint) LoadMeta() error
LoadMeta implements CheckPoint.LoadMeta
func (*RemoteCheckPoint) ResetConn ¶
func (cp *RemoteCheckPoint) ResetConn(tctx *tcontext.Context) error
ResetConn implements CheckPoint.ResetConn
func (*RemoteCheckPoint) Rollback ¶
func (cp *RemoteCheckPoint) Rollback()
Rollback implements CheckPoint.Rollback
func (*RemoteCheckPoint) SaveGlobalPoint ¶
func (cp *RemoteCheckPoint) SaveGlobalPoint(pos mysql.Position)
SaveGlobalPoint implements CheckPoint.SaveGlobalPoint
func (*RemoteCheckPoint) SaveTablePoint ¶
func (cp *RemoteCheckPoint) SaveTablePoint(sourceSchema, sourceTable string, pos mysql.Position)
SaveTablePoint implements CheckPoint.SaveTablePoint
func (*RemoteCheckPoint) String ¶
func (cp *RemoteCheckPoint) String() string
String implements CheckPoint.String
func (*RemoteCheckPoint) TablePoint ¶ added in v1.0.4
func (cp *RemoteCheckPoint) TablePoint() map[string]map[string]mysql.Position
TablePoint implements CheckPoint.TablePoint
type ShardingGroup ¶
type ShardingGroup struct {
	sync.RWMutex

	// IsSchemaOnly indicates whether this is a schema (database) only DDL
	// TODO: zxc add schema-level syncing support later
	IsSchemaOnly bool

	// contains filtered or unexported fields
}
ShardingGroup represents a sharding DDL sync group
func NewShardingGroup ¶
func NewShardingGroup(sourceID, shardMetaSchema, shardMetaTable string, sources []string, meta *shardmeta.ShardingMeta, isSchemaOnly bool) *ShardingGroup
NewShardingGroup creates a new ShardingGroup
func (*ShardingGroup) ActiveDDLFirstPos ¶
func (sg *ShardingGroup) ActiveDDLFirstPos() (mysql.Position, error)
ActiveDDLFirstPos returns the first binlog position of active DDL
func (*ShardingGroup) CheckSyncing ¶
func (sg *ShardingGroup) CheckSyncing(source string, pos mysql.Position) (beforeActiveDDL bool)
CheckSyncing checks the source table's syncing status and returns
beforeActiveDDL: whether the position is before active DDL
func (*ShardingGroup) FirstEndPosUnresolved ¶
func (sg *ShardingGroup) FirstEndPosUnresolved() *mysql.Position
FirstEndPosUnresolved returns the first DDL End_log_pos if un-resolved, else nil
func (*ShardingGroup) FirstPosUnresolved ¶
func (sg *ShardingGroup) FirstPosUnresolved() *mysql.Position
FirstPosUnresolved returns the first DDL pos if un-resolved, else nil
func (*ShardingGroup) FlushData ¶
func (sg *ShardingGroup) FlushData(targetTableID string) ([]string, [][]interface{})
FlushData returns sharding meta flush SQLs and args
func (*ShardingGroup) InSequenceSharding ¶
func (sg *ShardingGroup) InSequenceSharding() bool
InSequenceSharding returns whether this sharding group is in sequence sharding
func (*ShardingGroup) IsUnresolved ¶
func (sg *ShardingGroup) IsUnresolved() bool
IsUnresolved returns whether it's unresolved
func (*ShardingGroup) Leave ¶
func (sg *ShardingGroup) Leave(sources []string) error
Leave removes the given sources from the sharding group. It doesn't affect the in-syncing process. Used cases:
- drop a database
- drop table
func (*ShardingGroup) Merge ¶
func (sg *ShardingGroup) Merge(sources []string) (bool, bool, int, error)
Merge merges new sources into the existing group. Used cases:
- add a new table to an existing sharding group
- add new table(s) to the parent database's sharding group
If the group is in sequence sharding, an error is returned directly; otherwise the source is added, set to false (un-synced), and remain is incremented.
func (*ShardingGroup) Reset ¶
func (sg *ShardingGroup) Reset()
Reset resets all sources to the un-synced state. When the previous sharding DDL is synced and resolved, we need to reset it.
func (*ShardingGroup) ResolveShardingDDL ¶
func (sg *ShardingGroup) ResolveShardingDDL() bool
ResolveShardingDDL resolves sharding DDL in sharding group
func (*ShardingGroup) Sources ¶
func (sg *ShardingGroup) Sources() map[string]bool
Sources returns all sources (and whether synced)
func (*ShardingGroup) String ¶
func (sg *ShardingGroup) String() string
String implements Stringer.String
func (*ShardingGroup) Tables ¶
func (sg *ShardingGroup) Tables() [][]string
Tables returns all source tables' <schema, table> pair
func (*ShardingGroup) TrySync ¶
func (sg *ShardingGroup) TrySync(source string, pos, endPos mysql.Position, ddls []string) (bool, bool, int, error)
TrySync tries to sync the sharding group and returns

synced: whether the source table's sharding group is synced
active: whether the DDL will be processed in this round
remain: the count of remaining un-synced source tables
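A hypothetical reduction of the bookkeeping behind these return values: each source is marked synced, and the group is synced once the remaining count reaches zero (positions and DDL comparison are omitted).

package main

import "fmt"

// toyGroup is a simplified, illustrative take on ShardingGroup's state: each
// source table is marked synced or not for the current sharding DDL.
type toyGroup struct {
	sources map[string]bool // source table ID => synced for the current DDL
	remain  int             // count of sources still un-synced
}

// trySync marks one source as having reached the sharding DDL and reports
// whether the whole group is now synced, mirroring TrySync's synced/remain
// return values.
func (g *toyGroup) trySync(source string) (synced bool, remain int) {
	if !g.sources[source] {
		g.sources[source] = true
		g.remain--
	}
	return g.remain <= 0, g.remain
}

func main() {
	g := &toyGroup{
		sources: map[string]bool{"`db1`.`t1`": false, "`db2`.`t1`": false},
		remain:  2,
	}
	fmt.Println(g.trySync("`db1`.`t1`")) // false 1
	fmt.Println(g.trySync("`db2`.`t1`")) // true 0
}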
func (*ShardingGroup) UnresolvedGroupInfo ¶
func (sg *ShardingGroup) UnresolvedGroupInfo() *pb.ShardingGroup
UnresolvedGroupInfo returns the pb.ShardingGroup if it is unresolved, else returns nil
func (*ShardingGroup) UnresolvedTables ¶
func (sg *ShardingGroup) UnresolvedTables() [][]string
UnresolvedTables returns all source tables' <schema, table> pairs if the group is unresolved, else returns nil
type ShardingGroupKeeper ¶
ShardingGroupKeeper is used to keep ShardingGroups
func NewShardingGroupKeeper ¶
func NewShardingGroupKeeper(tctx *tcontext.Context, cfg *config.SubTaskConfig) *ShardingGroupKeeper
NewShardingGroupKeeper creates a new ShardingGroupKeeper
func (*ShardingGroupKeeper) ActiveDDLFirstPos ¶
func (k *ShardingGroupKeeper) ActiveDDLFirstPos(targetSchema, targetTable string) (mysql.Position, error)
ActiveDDLFirstPos returns the binlog position of active DDL
func (*ShardingGroupKeeper) AddGroup ¶
func (k *ShardingGroupKeeper) AddGroup(targetSchema, targetTable string, sourceIDs []string, meta *shardmeta.ShardingMeta, merge bool) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, err error)
AddGroup adds new group(s) according to target schema, table and source IDs
func (*ShardingGroupKeeper) AdjustGlobalPoint ¶
func (k *ShardingGroupKeeper) AdjustGlobalPoint(globalPoint mysql.Position) mysql.Position
AdjustGlobalPoint adjusts globalPoint with sharding groups' lowest first point
func (*ShardingGroupKeeper) Close ¶
func (k *ShardingGroupKeeper) Close()
Close closes sharding group keeper
func (*ShardingGroupKeeper) Group ¶
func (k *ShardingGroupKeeper) Group(targetSchema, targetTable string) *ShardingGroup
Group returns target table's group, nil if not exist
func (*ShardingGroupKeeper) Groups ¶
func (k *ShardingGroupKeeper) Groups() map[string]*ShardingGroup
Groups returns all sharding groups, often used for debugging. Caution: do not modify the returned groups directly.
func (*ShardingGroupKeeper) InSequenceSharding ¶
func (k *ShardingGroupKeeper) InSequenceSharding() bool
InSequenceSharding returns whether any sharding group is in unfinished sequence sharding
func (*ShardingGroupKeeper) InSyncing ¶
func (k *ShardingGroupKeeper) InSyncing(targetSchema, targetTable, source string, pos mysql.Position) bool
InSyncing checks whether the source is in sharding syncing
func (*ShardingGroupKeeper) Init ¶
func (k *ShardingGroupKeeper) Init() error
Init does initialization work
func (*ShardingGroupKeeper) LeaveGroup ¶
func (k *ShardingGroupKeeper) LeaveGroup(targetSchema, targetTable string, sources []string) error
LeaveGroup leaves the group according to target schema, table and source IDs. LeaveGroup doesn't affect the in-syncing process.
func (*ShardingGroupKeeper) LoadShardMeta ¶
func (k *ShardingGroupKeeper) LoadShardMeta() (map[string]*shardmeta.ShardingMeta, error)
LoadShardMeta implements CheckPoint.LoadShardMeta
func (*ShardingGroupKeeper) PrepareFlushSQLs ¶
func (k *ShardingGroupKeeper) PrepareFlushSQLs(exceptTableIDs map[string]bool) ([]string, [][]interface{})
PrepareFlushSQLs returns all sharding meta flush SQLs except for the given table IDs
func (*ShardingGroupKeeper) ResetGroups ¶
func (k *ShardingGroupKeeper) ResetGroups()
ResetGroups resets group's sync status
func (*ShardingGroupKeeper) ResolveShardingDDL ¶
func (k *ShardingGroupKeeper) ResolveShardingDDL(targetSchema, targetTable string) (bool, error)
ResolveShardingDDL resolves one sharding DDL in specific group
func (*ShardingGroupKeeper) TrySync ¶
func (k *ShardingGroupKeeper) TrySync(
	targetSchema, targetTable, source string, pos, endPos mysql.Position, ddls []string) (
	needShardingHandle bool, group *ShardingGroup, synced, active bool, remain int, err error)
TrySync tries to sync the sharding group and returns

isSharding: whether the source table is in a sharding group
group: the sharding group
synced: whether the source table's sharding group is synced
active: whether it is the active DDL in sequence sharding
remain: the count of remaining un-synced source tables
func (*ShardingGroupKeeper) UnresolvedGroups ¶
func (k *ShardingGroupKeeper) UnresolvedGroups() []*pb.ShardingGroup
UnresolvedGroups returns the sharding groups which are un-resolved. Caution: do not modify the returned groups directly.
func (*ShardingGroupKeeper) UnresolvedTables ¶
func (k *ShardingGroupKeeper) UnresolvedTables() (map[string]bool, [][]string)
UnresolvedTables returns

all `target-schema.target-table` that have unresolved sharding DDLs
all source tables whose DDLs are un-resolved

NOTE: this func only ensures the returned tables are currently un-resolved. If the returned tables are passed to another func (like checkpoint), it must be ensured that their sync state does not change in the meantime.
type ShardingReSync ¶
type ShardingReSync struct {
// contains filtered or unexported fields
}
ShardingReSync represents re-sync info for a sharding DDL group
type StreamerProducer ¶
type StreamerProducer interface {
// contains filtered or unexported methods
}
StreamerProducer provides the ability to generate a binlog streamer by StartSync(). But go-mysql's StartSync() returns (struct, err) rather than (interface, err), so we can't simply use the StartSync() method in StreamerProducer; instead, generateStreamer wraps StartSync() to put *BinlogSyncer and *BinlogReader behind the same interface. Other implementations of StreamerProducer and Streamer can easily take the place of Syncer.streamProducer, which makes it easy to mock for tests.
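A sketch of the wrapper pattern described above, with hypothetical event/streamer types in place of go-mysql's concrete ones: a one-method producer interface lets remote, local, and mock producers be swapped freely, which is what makes testing easy.

package main

import "fmt"

// event and streamer are hypothetical stand-ins for binlog events and the
// Streamer interface; the real code wraps different concrete reader types
// whose start methods return different structs.
type event struct{ payload string }

type streamer interface {
	GetEvent() (event, error)
}

// streamerProducer mirrors the documented pattern: its only job is to produce
// a streamer, so concrete producers are interchangeable inside the syncer.
type streamerProducer interface {
	generateStreamer() (streamer, error)
}

// mockStreamer and mockProducer show why the wrapper helps testing: a test can
// inject canned events without a real upstream.
type mockStreamer struct{ events []event }

func (m *mockStreamer) GetEvent() (event, error) {
	if len(m.events) == 0 {
		return event{}, fmt.Errorf("no more events")
	}
	e := m.events[0]
	m.events = m.events[1:]
	return e, nil
}

type mockProducer struct{}

func (mockProducer) generateStreamer() (streamer, error) {
	return &mockStreamer{events: []event{{"query event"}, {"xid event"}}}, nil
}

func main() {
	var p streamerProducer = mockProducer{}
	s, _ := p.generateStreamer()
	for {
		e, err := s.GetEvent()
		if err != nil {
			break
		}
		fmt.Println("got", e.payload)
	}
}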
type Syncer ¶
Syncer can sync your MySQL data to another MySQL database.
func (*Syncer) ExecuteDDL ¶
ExecuteDDL executes or skips a hanging-up DDL when in sharding
func (*Syncer) Init ¶
Init initializes the syncer for a sync task, but does not start Process. If it fails, it should not call s.Close. Some checks may move to the checker later.
func (*Syncer) InjectSQLs ¶
InjectSQLs injects ddls into the syncer as binlog events when an xid/query event is met. TODO: let the user specify a special xid/query event position. TODO: inject dml sqls.
func (*Syncer) IsFreshTask ¶
IsFreshTask implements Unit.IsFreshTask
func (*Syncer) Pause ¶
func (s *Syncer) Pause()
Pause pauses the process, and it can be resumed later. The context should be canceled externally. TODO: it is not a true Pause because you can't stop it by calling Pause only.
func (*Syncer) Process ¶
func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult)
Process implements the dm.Unit interface.
func (*Syncer) Resume ¶
func (s *Syncer) Resume(ctx context.Context, pr chan pb.ProcessResult)
Resume resumes the paused process
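Process, Pause and Resume rely on external context cancellation, as noted above. Below is a generic sketch of that pattern with a hypothetical run function standing in for the processing unit; it is not the Syncer's actual implementation.

package main

import (
	"context"
	"fmt"
	"time"
)

// run loops until its context is canceled, then reports on the result channel,
// roughly like a processing unit driven by Process.
func run(ctx context.Context, results chan<- string) {
	for {
		select {
		case <-ctx.Done():
			results <- "stopped: " + ctx.Err().Error()
			return
		case <-time.After(20 * time.Millisecond):
			// replicate one batch of binlog events (omitted)
		}
	}
}

func main() {
	results := make(chan string, 1)

	// Start processing with a cancellable context.
	ctx, cancel := context.WithCancel(context.Background())
	go run(ctx, results)

	// "Pause": the documentation notes the context should be canceled
	// externally; Pause alone does not stop the unit.
	time.Sleep(50 * time.Millisecond)
	cancel()
	fmt.Println(<-results)

	// "Resume": start again with a fresh context, as Resume(ctx, pr) does.
	ctx2, cancel2 := context.WithCancel(context.Background())
	go run(ctx2, results)
	time.Sleep(50 * time.Millisecond)
	cancel2()
	fmt.Println(<-results)
}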
func (*Syncer) SetSQLOperator ¶
func (s *Syncer) SetSQLOperator(req *pb.HandleSubTaskSQLsRequest) error
SetSQLOperator sets an SQL operator to syncer
func (*Syncer) Status ¶
func (s *Syncer) Status() interface{}
Status implements SubTaskUnit.Status. It returns the status, but does not calculate the status.
func (*Syncer) Update ¶
func (s *Syncer) Update(cfg *config.SubTaskConfig) error
Update implements Unit.Update. For now, it only supports updating the config for routes, filters, column-mappings and black-white-list. No config diff is implemented yet, so it simply re-initializes using the new config.
func (*Syncer) UpdateFromConfig ¶
func (s *Syncer) UpdateFromConfig(cfg *config.SubTaskConfig) error
UpdateFromConfig updates config for `From`
type UpStreamConn ¶ added in v1.0.2
UpStreamConn connects to the upstream DB. Normally, we need to get some upstream information through helper functions; these helper functions are all simple query functions, so we use a pool of connections here. This may change to a single connection some day.
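A minimal sketch of the connection-pool idea described above, using database/sql (which already pools connections) for simple helper queries; the DSN is a placeholder and assumes a reachable upstream MySQL.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql" // MySQL driver
)

func main() {
	db, err := sql.Open("mysql", "root:@tcp(127.0.0.1:3306)/")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// database/sql already maintains a pool; bound it for light helper use.
	db.SetMaxOpenConns(4)
	db.SetMaxIdleConns(4)

	// A typical "easy query": fetch upstream metadata such as the server UUID.
	var name, uuid string
	if err := db.QueryRow("SHOW VARIABLES LIKE 'server_uuid'").Scan(&name, &uuid); err != nil {
		log.Fatal(err)
	}
	fmt.Println("upstream server_uuid:", uuid)
}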