Documentation ¶
Index ¶
- Variables
- func RegisterMetrics(registry *prometheus.Registry)
- type Config
- type DBConfig
- type LocalMeta
- func (lm *LocalMeta) AddDir(serverUUID string, newPos *mysql.Position, newGTID gtid.Set) error
- func (lm *LocalMeta) AdjustWithStartPos(binlogName string, binlogGTID string, enableGTID bool, latestBinlogName string, ...) (bool, error)
- func (lm *LocalMeta) Dir() string
- func (lm *LocalMeta) Dirty() bool
- func (lm *LocalMeta) Flush() error
- func (lm *LocalMeta) GTID() (string, gtid.Set)
- func (lm *LocalMeta) Load() error
- func (lm *LocalMeta) Pos() (string, mysql.Position)
- func (lm *LocalMeta) Save(pos mysql.Position, gset gtid.Set) error
- func (lm *LocalMeta) String() string
- func (lm *LocalMeta) TrimUUIDs() ([]string, error)
- func (lm *LocalMeta) UUID() string
- type Meta
- type Process
- type Relay
- func (r *Relay) ActiveRelayLog() *pkgstreamer.RelayLogInfo
- func (r *Relay) Close()
- func (r *Relay) Error() interface{}
- func (r *Relay) Init(ctx context.Context) (err error)
- func (r *Relay) IsClosed() bool
- func (r *Relay) IsFreshTask() (bool, error)
- func (r *Relay) Migrate(ctx context.Context, binlogName string, binlogPos uint32) error
- func (r *Relay) Pause()
- func (r *Relay) Process(ctx context.Context, pr chan pb.ProcessResult)
- func (r *Relay) Reload(newCfg *Config) error
- func (r *Relay) Resume(ctx context.Context, pr chan pb.ProcessResult)
- func (r *Relay) Status() interface{}
- func (r *Relay) SwitchMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) error
- func (r *Relay) Type() pb.UnitType
- func (r *Relay) Update(cfg *config.SubTaskConfig) error
Constants ¶
This section is empty.
Variables ¶
var NewRelay = NewRealRelay
NewRelay creates an instance of Relay.
Functions ¶
func RegisterMetrics ¶
func RegisterMetrics(registry *prometheus.Registry)
RegisterMetrics register metrics.
Types ¶
type Config ¶
type Config struct { EnableGTID bool `toml:"enable-gtid" json:"enable-gtid"` AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"` RelayDir string `toml:"relay-dir" json:"relay-dir"` ServerID uint32 `toml:"server-id" json:"server-id"` Flavor string `toml:"flavor" json:"flavor"` Charset string `toml:"charset" json:"charset"` From DBConfig `toml:"data-source" json:"data-source"` // synchronous start point (if no meta saved before) // do not need to specify binlog-pos, because relay will fetch the whole file BinLogName string `toml:"binlog-name" json:"binlog-name"` BinlogGTID string `toml:"binlog-gtid" json:"binlog-gtid"` // for binlog reader retry ReaderRetry retry.ReaderRetryConfig `toml:"reader-retry" json:"reader-retry"` }
Config is the configuration for Relay.
type DBConfig ¶
type DBConfig struct { Host string `toml:"host" json:"host"` User string `toml:"user" json:"user"` Password string `toml:"password" json:"-"` // omit it for privacy Port int `toml:"port" json:"port"` }
DBConfig is the DB configuration.
type LocalMeta ¶
type LocalMeta struct { sync.RWMutex BinLogName string `toml:"binlog-name" json:"binlog-name"` BinLogPos uint32 `toml:"binlog-pos" json:"binlog-pos"` BinlogGTID string `toml:"binlog-gtid" json:"binlog-gtid"` // contains filtered or unexported fields }
LocalMeta implements Meta by save info in local
func (*LocalMeta) AdjustWithStartPos ¶
func (lm *LocalMeta) AdjustWithStartPos(binlogName string, binlogGTID string, enableGTID bool, latestBinlogName string, latestBinlogGTID string) (bool, error)
AdjustWithStartPos implements Meta.AdjustWithStartPos, return whether adjusted
type Meta ¶
type Meta interface { // Load loads meta information for the recently active server Load() error // AdjustWithStartPos adjusts current pos / GTID with start pos // if current pos / GTID is meaningless, update to start pos or last pos when start pos is meaningless // else do nothing AdjustWithStartPos(binlogName string, binlogGTID string, enableGTID bool, latestBinlogName string, latestBinlogGTID string) (bool, error) // Save saves meta information Save(pos mysql.Position, gset gtid.Set) error // Flush flushes meta information Flush() error // Dirty checks whether meta in memory is dirty (need to Flush) Dirty() bool // AddDir adds sub relay directory for server UUID (without suffix) // the added sub relay directory's suffix is incremented // after sub relay directory added, the internal binlog pos should be reset // and binlog pos will be set again when new binlog events received // @serverUUID should be a server_uuid for MySQL or MariaDB // if set @newPos / @newGTID, old value will be replaced AddDir(serverUUID string, newPos *mysql.Position, newGTID gtid.Set) error // Pos returns current (UUID with suffix, Position) pair Pos() (string, mysql.Position) // GTID returns current (UUID with suffix, GTID) pair GTID() (string, gtid.Set) // UUID returns current UUID (with suffix) UUID() string // TrimUUIDs trim invalid UUIDs from memory and update the server-uuid.index file // return trimmed UUIDs TrimUUIDs() ([]string, error) // Dir returns current relay log (sub) directory Dir() string // String returns string representation of current meta info String() string }
Meta represents binlog meta information for sync source when re-syncing, we should reload meta info to guarantee continuous transmission in order to support master-slave switching, Meta should support switching binlog meta info to newer master should support the case, where switching from A to B, then switching from B back to A
func NewLocalMeta ¶
NewLocalMeta creates a new LocalMeta
type Process ¶
type Process interface { // Init initial relat log unit Init(ctx context.Context) (err error) // Process run background logic of relay log unit Process(ctx context.Context, pr chan pb.ProcessResult) // SwitchMaster switches relay's master server SwitchMaster(ctx context.Context, req *pb.SwitchRelayMasterRequest) error // Migrate resets binlog position Migrate(ctx context.Context, binlogName string, binlogPos uint32) error // ActiveRelayLog returns the earliest active relay log info in this operator ActiveRelayLog() *pkgstreamer.RelayLogInfo // Reload reloads config Reload(newCfg *Config) error // Update updates config Update(cfg *config.SubTaskConfig) error // Resume resumes paused relay log process unit Resume(ctx context.Context, pr chan pb.ProcessResult) // Pause pauses a running relay log process unit Pause() // Error returns error message if having one Error() interface{} // Status returns status of relay log process unit Status() interface{} // Close does some clean works Close() // IsClosed returns whether relay log process unit was closed IsClosed() bool }
Process defines mysql-like relay log process unit
func NewRealRelay ¶
NewRealRelay creates an instance of Relay.
type Relay ¶
Relay relays mysql binlog to local file.
func (*Relay) ActiveRelayLog ¶
func (r *Relay) ActiveRelayLog() *pkgstreamer.RelayLogInfo
ActiveRelayLog returns the current active RelayLogInfo
func (*Relay) IsFreshTask ¶
IsFreshTask implements Unit.IsFreshTask
func (*Relay) Process ¶
func (r *Relay) Process(ctx context.Context, pr chan pb.ProcessResult)
Process implements the dm.Unit interface.
func (*Relay) Resume ¶
func (r *Relay) Resume(ctx context.Context, pr chan pb.ProcessResult)
Resume resumes the paused process
func (*Relay) Status ¶
func (r *Relay) Status() interface{}
Status implements the dm.Unit interface.
func (*Relay) SwitchMaster ¶
SwitchMaster switches relay's master server before call this from dmctl, you must ensure that relay catches up previous master we can not check this automatically in this func because master already changed switch master server steps:
- use dmctl to pause relay
- ensure relay catching up current master server (use `query-status`)
- switch master server for upstream * change relay's master config, TODO * change master behind VIP
- use dmctl to switch relay's master server (use `switch-relay-master`)
- use dmctl to resume relay