Documentation ¶
Index ¶
- Variables
- func CollectAllBinlogFiles(dir string) ([]string, error)
- func CollectBinlogFilesCmp(dir, baseFile string, cmp FileCmp) ([]string, error)
- func RegisterMetrics(registry *prometheus.Registry)
- type BinlogReader
- func (r *BinlogReader) Close()
- func (r *BinlogReader) GetSubDirs() []string
- func (r *BinlogReader) IsGTIDCoverPreviousFiles(ctx context.Context, filePath string, gset mysql.GTIDSet) (bool, error)
- func (r *BinlogReader) Notified() chan interface{}
- func (r *BinlogReader) OnEvent(_ *replication.BinlogEvent)
- func (r *BinlogReader) StartSyncByGTID(gset mysql.GTIDSet) (reader.Streamer, error)
- func (r *BinlogReader) StartSyncByPos(pos mysql.Position) (reader.Streamer, error)
- type BinlogReaderConfig
- type BinlogWriter
- type BinlogWriterStatus
- type Config
- type EventNotifier
- type FileCmp
- type FileWriter
- type Listener
- type LocalMeta
- func (lm *LocalMeta) AddDir(serverUUID string, newPos *mysql.Position, newGTID mysql.GTIDSet, ...) error
- func (lm *LocalMeta) AdjustWithStartPos(binlogName string, binlogGTID string, enableGTID bool, latestBinlogName string, ...) (bool, error)
- func (lm *LocalMeta) Dir() string
- func (lm *LocalMeta) Dirty() bool
- func (lm *LocalMeta) Flush() error
- func (lm *LocalMeta) GTID() (string, mysql.GTIDSet)
- func (lm *LocalMeta) Load() error
- func (lm *LocalMeta) Pos() (string, mysql.Position)
- func (lm *LocalMeta) Save(pos mysql.Position, gset mysql.GTIDSet) error
- func (lm *LocalMeta) String() string
- func (lm *LocalMeta) SubDir() string
- func (lm *LocalMeta) TrimUUIDIndexFile() ([]string, error)
- type LocalStreamer
- type Meta
- type Operator
- type Process
- type PurgeInterceptor
- type PurgeStrategy
- type Purger
- type RConfig
- type RResult
- type Reader
- type ReaderRetry
- type ReaderRetryConfig
- type Relay
- func (r *Relay) ActiveRelayLog() *pkgstreamer.RelayLogInfo
- func (r *Relay) Close()
- func (r *Relay) Error() interface{}
- func (r *Relay) FlushMeta() error
- func (r *Relay) Init(ctx context.Context) (err error)
- func (r *Relay) IsActive(uuid, filename string) (bool, int64)
- func (r *Relay) IsClosed() bool
- func (r *Relay) IsFreshTask() (bool, error)
- func (r *Relay) NewReader(logger log.Logger, cfg *BinlogReaderConfig) *BinlogReader
- func (r *Relay) Pause()
- func (r *Relay) Process(ctx context.Context) pb.ProcessResult
- func (r *Relay) PurgeRelayDir() error
- func (r *Relay) RegisterListener(el Listener)
- func (r *Relay) Reload(newCfg *Config) error
- func (r *Relay) ResetMeta()
- func (r *Relay) Resume(ctx context.Context, pr chan pb.ProcessResult)
- func (r *Relay) SaveMeta(pos mysql.Position, gset mysql.GTIDSet) error
- func (r *Relay) Status(sourceStatus *binlog.SourceStatus) interface{}
- func (r *Relay) Type() pb.UnitType
- func (r *Relay) UnRegisterListener(el Listener)
- func (r *Relay) Update(cfg *config.SubTaskConfig) error
- type StrategyArgs
- type SwitchPath
- type WResult
- type Writer
Constants ¶
This section is empty.
Variables ¶
var ErrorMaybeDuplicateEvent = errors.New("truncate binlog file found, event may be duplicated")
ErrorMaybeDuplicateEvent indicates that there may be duplicate events in the next binlog file. This mainly happens when the upstream master changes before the relay log has finished reading a transaction.
var NewPurger = NewRelayPurger
NewPurger creates a new purger.
var NewRelay = NewRealRelay
NewRelay creates an instance of Relay.
Functions ¶
func CollectAllBinlogFiles ¶
CollectAllBinlogFiles collects all valid binlog files in dir, and returns filenames in binlog ascending order.
func CollectBinlogFilesCmp ¶
CollectBinlogFilesCmp collects valid binlog files with a compare condition.
func RegisterMetrics ¶
func RegisterMetrics(registry *prometheus.Registry)
RegisterMetrics registers metrics.
Types ¶
type BinlogReader ¶
type BinlogReader struct {
// contains filtered or unexported fields
}
BinlogReader is a binlog reader.
func (*BinlogReader) GetSubDirs ¶
func (r *BinlogReader) GetSubDirs() []string
GetSubDirs returns binlog reader's subDirs.
func (*BinlogReader) IsGTIDCoverPreviousFiles ¶
func (r *BinlogReader) IsGTIDCoverPreviousFiles(ctx context.Context, filePath string, gset mysql.GTIDSet) (bool, error)
IsGTIDCoverPreviousFiles checks whether gset contains the file's previous_gset.
func (*BinlogReader) Notified ¶
func (r *BinlogReader) Notified() chan interface{}
func (*BinlogReader) OnEvent ¶
func (r *BinlogReader) OnEvent(_ *replication.BinlogEvent)
func (*BinlogReader) StartSyncByGTID ¶
StartSyncByGTID starts sync by GTID.
func (*BinlogReader) StartSyncByPos ¶
StartSyncByPos starts sync by pos. TODO: thread-safe?
type BinlogReaderConfig ¶
type BinlogReaderConfig struct { RelayDir string Timezone *time.Location Flavor string RowsEventDecodeFunc func(*replication.RowsEvent, []byte) error }
BinlogReaderConfig is the configuration for BinlogReader.
type BinlogWriter ¶
type BinlogWriter struct {
// contains filtered or unexported fields
}
BinlogWriter is a binlog event writer which writes binlog events to a file. Open/Write/Close cannot be called concurrently.
func NewBinlogWriter ¶
func NewBinlogWriter(logger log.Logger, relayDir string) *BinlogWriter
NewBinlogWriter creates a BinlogWriter instance.
func (*BinlogWriter) Close ¶
func (w *BinlogWriter) Close() error
func (*BinlogWriter) Flush ¶
func (w *BinlogWriter) Flush() error
func (*BinlogWriter) Offset ¶
func (w *BinlogWriter) Offset() int64
func (*BinlogWriter) Open ¶
func (w *BinlogWriter) Open(uuid, filename string) error
func (*BinlogWriter) Status ¶
func (w *BinlogWriter) Status() *BinlogWriterStatus
func (*BinlogWriter) Write ¶
func (w *BinlogWriter) Write(rawData []byte) error
type BinlogWriterStatus ¶
BinlogWriterStatus represents the status of a BinlogWriter.
func (*BinlogWriterStatus) String ¶
func (s *BinlogWriterStatus) String() string
String implements Stringer.String.
type Config ¶
type Config struct { EnableGTID bool `toml:"enable-gtid" json:"enable-gtid"` // deprecated AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"` RelayDir string `toml:"relay-dir" json:"relay-dir"` ServerID uint32 `toml:"server-id" json:"server-id"` Flavor string `toml:"flavor" json:"flavor"` Charset string `toml:"charset" json:"charset"` From dbconfig.DBConfig `toml:"data-source" json:"data-source"` // synchronous start point (if no meta saved before) // do not need to specify binlog-pos, because relay will fetch the whole file BinLogName string `toml:"binlog-name" json:"binlog-name"` BinlogGTID string `toml:"binlog-gtid" json:"binlog-gtid"` UUIDSuffix int `toml:"-" json:"-"` // for binlog reader retry ReaderRetry ReaderRetryConfig `toml:"reader-retry" json:"reader-retry"` }
Config is the configuration for Relay.
func FromSourceCfg ¶
func FromSourceCfg(sourceCfg *config.SourceConfig) *Config
FromSourceCfg generates a relay config from the source config.
type EventNotifier ¶
type EventNotifier interface {
// Notified returns a channel used to check whether there is new binlog event written to the file
Notified() chan interface{}
}
EventNotifier notifies whether there is new binlog event written to the file.
type FileWriter ¶
type FileWriter struct {
// contains filtered or unexported fields
}
FileWriter implements Writer interface.
func (*FileWriter) Init ¶
func (w *FileWriter) Init(uuid, filename string)
func (*FileWriter) WriteEvent ¶
func (w *FileWriter) WriteEvent(ev *replication.BinlogEvent) (WResult, error)
WriteEvent implements Writer.WriteEvent.
type Listener ¶
type Listener interface { // OnEvent get called when relay processed an event successfully. OnEvent(e *replication.BinlogEvent) }
Listener defines a binlog event listener of relay log.
type LocalMeta ¶
type LocalMeta struct { sync.RWMutex BinLogName string `toml:"binlog-name" json:"binlog-name"` BinLogPos uint32 `toml:"binlog-pos" json:"binlog-pos"` BinlogGTID string `toml:"binlog-gtid" json:"binlog-gtid"` // contains filtered or unexported fields }
LocalMeta implements Meta by saving info locally.
func (*LocalMeta) AddDir ¶
func (lm *LocalMeta) AddDir(serverUUID string, newPos *mysql.Position, newGTID mysql.GTIDSet, uuidSuffix int) error
AddDir implements Meta.AddDir.
func (*LocalMeta) AdjustWithStartPos ¶
func (lm *LocalMeta) AdjustWithStartPos(binlogName string, binlogGTID string, enableGTID bool, latestBinlogName string, latestBinlogGTID string) (bool, error)
AdjustWithStartPos implements Meta.AdjustWithStartPos and returns whether the meta was adjusted.
func (*LocalMeta) TrimUUIDIndexFile ¶
TrimUUIDIndexFile implements Meta.TrimUUIDIndexFile.
type LocalStreamer ¶
type LocalStreamer struct {
// contains filtered or unexported fields
}
LocalStreamer reads and parses binlog events from local binlog file.
func (*LocalStreamer) GetEvent ¶
func (s *LocalStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent, error)
GetEvent gets the binlog events one by one; it will block until the parser encounters an error. You can pass a context (like Cancel or Timeout) to break the block.
type Meta ¶
type Meta interface { // Load loads meta information for the recently active server Load() error // AdjustWithStartPos adjusts current pos / GTID with start pos // if current pos / GTID is meaningless, update to start pos or last pos when start pos is meaningless // else do nothing AdjustWithStartPos(binlogName string, binlogGTID string, enableGTID bool, latestBinlogName string, latestBinlogGTID string) (bool, error) // Save saves meta information Save(pos mysql.Position, gset mysql.GTIDSet) error // Flush flushes meta information Flush() error // Dirty checks whether meta in memory is dirty (need to Flush) Dirty() bool // AddDir adds relay log subdirectory for a new server. The name of new subdirectory // consists of the server_uuid of new server and a suffix. // if suffix is not zero value, add sub relay directory with suffix (bound to a new source) // otherwise the added sub relay directory's suffix is incremented (master/slave switch) // after sub relay directory added, the internal binlog pos should be reset // and binlog pos will be set again when new binlog events received // if set @newPos / @newGTID, old value will be replaced AddDir(serverUUID string, newPos *mysql.Position, newGTID mysql.GTIDSet, suffix int) error // Pos returns current (UUID with suffix, Position) pair Pos() (string, mysql.Position) // GTID returns current (UUID with suffix, GTID) pair GTID() (string, mysql.GTIDSet) // SubDir returns the name of current relay log subdirectory. SubDir() string // TrimUUIDIndexFile trim invalid relay log subdirectories from memory and update the server-uuid.index file // return trimmed result. TrimUUIDIndexFile() ([]string, error) // Dir returns the full path of relay log subdirectory. Dir() string // String returns string representation of current meta info String() string }
Meta represents relay log meta information for a sync source. When re-syncing, we should reload the meta info to guarantee continuous transmission. In order to support master-slave switching, Meta should support switching binlog meta info to a newer master, and should support the case of switching from A to B and then switching from B back to A.
func NewLocalMeta ¶
NewLocalMeta creates a new LocalMeta.
type Operator ¶
type Operator interface { // EarliestActiveRelayLog returns the earliest active relay log info in this operator EarliestActiveRelayLog() *streamer.RelayLogInfo }
Operator represents an operator for relay log files, like writer, reader.
type Process ¶
type Process interface { // Init initial relat log unit Init(ctx context.Context) (err error) // Process run background logic of relay log unit Process(ctx context.Context) pb.ProcessResult // ActiveRelayLog returns the earliest active relay log info in this operator ActiveRelayLog() *pkgstreamer.RelayLogInfo // Reload reloads config Reload(newCfg *Config) error // Update updates config Update(cfg *config.SubTaskConfig) error // Resume resumes paused relay log process unit Resume(ctx context.Context, pr chan pb.ProcessResult) // Pause pauses a running relay log process unit Pause() // Error returns error message if having one Error() interface{} // Status returns status of relay log process unit. Status(sourceStatus *binlog.SourceStatus) interface{} // Close does some clean works Close() // IsClosed returns whether relay log process unit was closed IsClosed() bool // SaveMeta save relay meta SaveMeta(pos mysql.Position, gset mysql.GTIDSet) error // ResetMeta reset relay meta ResetMeta() // PurgeRelayDir will clear all contents under w.cfg.RelayDir PurgeRelayDir() error // RegisterListener registers a relay listener RegisterListener(el Listener) // UnRegisterListener unregisters a relay listener UnRegisterListener(el Listener) // NewReader creates a new relay reader NewReader(logger log.Logger, cfg *BinlogReaderConfig) *BinlogReader // IsActive check whether given uuid+filename is active binlog file, if true return current file offset IsActive(uuid, filename string) (bool, int64) }
Process defines mysql-like relay log process unit.
func NewRealRelay ¶
NewRealRelay creates an instance of Relay.
type PurgeInterceptor ¶
type PurgeInterceptor interface { // ForbidPurge returns whether forbidding purge currently and an optional message ForbidPurge() (bool, string) }
PurgeInterceptor represents an interceptor that may forbid the purge process.
type PurgeStrategy ¶
type PurgeStrategy interface { // Check checks whether need to do the purge in the background automatically Check(args interface{}) (bool, error) // Do does the purge process one time Do(args interface{}) error // Purging indicates whether is doing purge Purging() bool // Type returns the strategy type Type() strategyType }
PurgeStrategy represents a relay log purge strategy. There are two purge behaviors:
- purge in the background
- do one time purge process
a strategy can support both or one of them.
type Purger ¶
type Purger interface { // Start starts strategies by config Start() // Close stops the started strategies Close() // Purging returns whether the purger is purging Purging() bool // Do does the purge process one time Do(ctx context.Context, req *pb.PurgeRelayRequest) error }
Purger purges relay log according to some strategies.
func NewDummyPurger ¶
func NewDummyPurger(cfg config.PurgeConfig, baseRelayDir string, operators []Operator, interceptors []PurgeInterceptor) Purger
NewDummyPurger returns a dummy purger.
func NewRelayPurger ¶
func NewRelayPurger(cfg config.PurgeConfig, baseRelayDir string, operators []Operator, interceptors []PurgeInterceptor) Purger
NewRelayPurger creates a new purger.
type RConfig ¶
type RConfig struct { SyncConfig replication.BinlogSyncerConfig Pos mysql.Position GTIDs mysql.GTIDSet EnableGTID bool MasterID string // the identifier for the master, used when logging. }
RConfig is the configuration used by the Reader.
type RResult ¶
type RResult struct {
Event *replication.BinlogEvent
}
RResult represents a read operation result.
type Reader ¶
type Reader interface { // Start starts the reading process. Start() error // Close closes the reader and release the resource. Close() error // GetEvent gets the binlog event one by one, it will block if no event can be read. // You can pass a context (like Cancel) to break the block. GetEvent(ctx context.Context) (RResult, error) }
Reader reads binlog events from an upstream master server. The read binlog events should be sent to a transformer. The reader should support:
- handle expected errors
- do retry if possible
NOTE: some errors still need to be handled in the outer caller.
func NewUpstreamReader ¶
NewUpstreamReader creates a Reader instance.
type ReaderRetry ¶
type ReaderRetry struct {
// contains filtered or unexported fields
}
ReaderRetry is used to control the retry for the binlog reader. It is not thread-safe.
func NewReaderRetry ¶
func NewReaderRetry(cfg ReaderRetryConfig) (*ReaderRetry, error)
NewReaderRetry creates a new ReaderRetry instance.
type ReaderRetryConfig ¶
type ReaderRetryConfig struct { BackoffRollback time.Duration `toml:"backoff-rollback" json:"backoff-rollback"` BackoffMax time.Duration `toml:"backoff-max" json:"backoff-max"` // unexposed config BackoffMin time.Duration `json:"-"` BackoffJitter bool `json:"-"` BackoffFactor float64 `json:"-"` }
ReaderRetryConfig is the configuration used for binlog reader retry backoff. We always enable this now.
type Relay ¶
Relay relays mysql binlog to local file.
func (*Relay) ActiveRelayLog ¶
func (r *Relay) ActiveRelayLog() *pkgstreamer.RelayLogInfo
ActiveRelayLog returns the current active RelayLogInfo.
func (*Relay) Init ¶
Init implements the dm.Unit interface. NOTE: when Init encounters an error, it will make DM-worker exit when it boots up and is assigned relay.
func (*Relay) IsFreshTask ¶
IsFreshTask implements Unit.IsFreshTask.
func (*Relay) NewReader ¶
func (r *Relay) NewReader(logger log.Logger, cfg *BinlogReaderConfig) *BinlogReader
func (*Relay) Process ¶
func (r *Relay) Process(ctx context.Context) pb.ProcessResult
Process implements the dm.Unit interface.
func (*Relay) PurgeRelayDir ¶
PurgeRelayDir implements the dm.Unit interface.
func (*Relay) RegisterListener ¶
RegisterListener implements Process.RegisterListener.
func (*Relay) Resume ¶
func (r *Relay) Resume(ctx context.Context, pr chan pb.ProcessResult)
Resume resumes the paused process.
func (*Relay) Status ¶
func (r *Relay) Status(sourceStatus *binlog.SourceStatus) interface{}
Status implements the dm.Unit interface.
func (*Relay) UnRegisterListener ¶
UnRegisterListener implements Process.UnRegisterListener.
type StrategyArgs ¶
type StrategyArgs interface { // SetActiveRelayLog sets active relay log info in args // this should be called before do the purging SetActiveRelayLog(active *streamer.RelayLogInfo) }
StrategyArgs represents args needed by purge strategy.
type SwitchPath ¶
type SwitchPath struct {
// contains filtered or unexported fields
}
SwitchPath represents next binlog file path which should be switched.
type WResult ¶
type WResult struct { Ignore bool // whether the event ignored by the writer IgnoreReason string // why the writer ignore the event }
WResult represents a write result.
type Writer ¶
type Writer interface { // Init inits the writer, should be called before any other method Init(uuid, filename string) // Close closes the writer and release the resource. Close() error // WriteEvent writes an binlog event's data into disk or any other places. // It is not safe for concurrent use by multiple goroutines. WriteEvent(ev *replication.BinlogEvent) (WResult, error) // IsActive check whether given uuid+filename is active binlog file, if true return current file offset IsActive(uuid, filename string) (bool, int64) // Flush flushes the binlog writer. Flush() error }
Writer writes binlog events into disk or any other memory structure. The writer should support:
- write binlog events and report the operation result
- skip any obsolete binlog events
- generate dummy events to fill the gap if needed
- rotate binlog(relay) file if needed
- rollback/discard unfinished binlog entries(events or transactions)