relay

package
v0.0.0-...-bcc2220 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 26, 2024 License: Apache-2.0 Imports: 41 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrorMaybeDuplicateEvent = errors.New("truncate binlog file found, event may be duplicated")

ErrorMaybeDuplicateEvent indicates that there may be duplicate events in the next binlog file. This mainly happens when the upstream master changes before the relay log has finished reading a transaction.

View Source
var NewPurger = NewRelayPurger

NewPurger creates a new purger.

View Source
var NewRelay = NewRealRelay

NewRelay creates an instance of Relay.

Functions

func CollectAllBinlogFiles

func CollectAllBinlogFiles(dir string) ([]string, error)

CollectAllBinlogFiles collects all valid binlog files in dir, and returns filenames in binlog ascending order.

func CollectBinlogFilesCmp

func CollectBinlogFilesCmp(dir, baseFile string, cmp FileCmp) ([]string, error)

CollectBinlogFilesCmp collects valid binlog files with a compare condition.

func RegisterMetrics

func RegisterMetrics(registry *prometheus.Registry)

RegisterMetrics registers metrics.

Types

type BinlogReader

type BinlogReader struct {
	// contains filtered or unexported fields
}

BinlogReader is a binlog reader.

func (*BinlogReader) Close

func (r *BinlogReader) Close()

Close closes BinlogReader.

func (*BinlogReader) GetSubDirs

func (r *BinlogReader) GetSubDirs() []string

GetSubDirs returns binlog reader's subDirs.

func (*BinlogReader) IsGTIDCoverPreviousFiles

func (r *BinlogReader) IsGTIDCoverPreviousFiles(ctx context.Context, filePath string, gset mysql.GTIDSet) (bool, error)

IsGTIDCoverPreviousFiles checks whether gset contains the file's previous_gset.

func (*BinlogReader) Notified

func (r *BinlogReader) Notified() chan interface{}

func (*BinlogReader) OnEvent

func (r *BinlogReader) OnEvent(_ *replication.BinlogEvent)

func (*BinlogReader) StartSyncByGTID

func (r *BinlogReader) StartSyncByGTID(gset mysql.GTIDSet) (reader.Streamer, error)

StartSyncByGTID starts syncing by GTID.

func (*BinlogReader) StartSyncByPos

func (r *BinlogReader) StartSyncByPos(pos mysql.Position) (reader.Streamer, error)

StartSyncByPos starts syncing by position. TODO: thread-safe?

type BinlogReaderConfig

type BinlogReaderConfig struct {
	RelayDir            string
	Timezone            *time.Location
	Flavor              string
	RowsEventDecodeFunc func(*replication.RowsEvent, []byte) error
}

BinlogReaderConfig is the configuration for BinlogReader.

type BinlogWriter

type BinlogWriter struct {
	// contains filtered or unexported fields
}

BinlogWriter is a binlog event writer which writes binlog events to a file. Open/Write/Close cannot be called concurrently.

func NewBinlogWriter

func NewBinlogWriter(logger log.Logger, relayDir string) *BinlogWriter

NewBinlogWriter creates a BinlogWriter instance.

func (*BinlogWriter) Close

func (w *BinlogWriter) Close() error

func (*BinlogWriter) Flush

func (w *BinlogWriter) Flush() error

func (*BinlogWriter) Offset

func (w *BinlogWriter) Offset() int64

func (*BinlogWriter) Open

func (w *BinlogWriter) Open(uuid, filename string) error

func (*BinlogWriter) Status

func (w *BinlogWriter) Status() *BinlogWriterStatus

func (*BinlogWriter) Write

func (w *BinlogWriter) Write(rawData []byte) error

type BinlogWriterStatus

type BinlogWriterStatus struct {
	Filename string `json:"filename"`
	Offset   int64  `json:"offset"`
}

BinlogWriterStatus represents the status of a BinlogWriter.

func (*BinlogWriterStatus) String

func (s *BinlogWriterStatus) String() string

String implements Stringer.String.

type Config

type Config struct {
	EnableGTID bool `toml:"enable-gtid" json:"enable-gtid"`
	// deprecated
	AutoFixGTID bool              `toml:"auto-fix-gtid" json:"auto-fix-gtid"`
	RelayDir    string            `toml:"relay-dir" json:"relay-dir"`
	ServerID    uint32            `toml:"server-id" json:"server-id"`
	Flavor      string            `toml:"flavor" json:"flavor"`
	Charset     string            `toml:"charset" json:"charset"`
	From        dbconfig.DBConfig `toml:"data-source" json:"data-source"`

	// synchronous start point (if no meta saved before)
	// do not need to specify binlog-pos, because relay will fetch the whole file
	BinLogName string `toml:"binlog-name" json:"binlog-name"`
	BinlogGTID string `toml:"binlog-gtid" json:"binlog-gtid"`
	UUIDSuffix int    `toml:"-" json:"-"`

	// for binlog reader retry
	ReaderRetry ReaderRetryConfig `toml:"reader-retry" json:"reader-retry"`
}

Config is the configuration for Relay.

func FromSourceCfg

func FromSourceCfg(sourceCfg *config.SourceConfig) *Config

FromSourceCfg generates a relay config from the source config.

func (*Config) String

func (c *Config) String() string

type EventNotifier

type EventNotifier interface {
	// Notified returns a channel used to check whether there is new binlog event written to the file
	Notified() chan interface{}
}

EventNotifier notifies whether there is new binlog event written to the file.

type FileCmp

type FileCmp uint8

FileCmp is a compare condition used when collecting binlog files.

const (
	FileCmpLess FileCmp = iota + 1
	FileCmpLessEqual
	FileCmpEqual
	FileCmpBiggerEqual
	FileCmpBigger
)

FileCmpLess represents a < FileCmp condition, others are similar.

type FileWriter

type FileWriter struct {
	// contains filtered or unexported fields
}

FileWriter implements Writer interface.

func (*FileWriter) Close

func (w *FileWriter) Close() error

Close implements Writer.Close.

func (*FileWriter) Flush

func (w *FileWriter) Flush() error

Flush implements Writer.Flush.

func (*FileWriter) Init

func (w *FileWriter) Init(uuid, filename string)

func (*FileWriter) IsActive

func (w *FileWriter) IsActive(uuid, filename string) (bool, int64)

func (*FileWriter) WriteEvent

func (w *FileWriter) WriteEvent(ev *replication.BinlogEvent) (WResult, error)

WriteEvent implements Writer.WriteEvent.

type Listener

type Listener interface {
	// OnEvent get called when relay processed an event successfully.
	OnEvent(e *replication.BinlogEvent)
}

Listener defines a binlog event listener of relay log.

type LocalMeta

type LocalMeta struct {
	sync.RWMutex

	BinLogName string `toml:"binlog-name" json:"binlog-name"`
	BinLogPos  uint32 `toml:"binlog-pos" json:"binlog-pos"`
	BinlogGTID string `toml:"binlog-gtid" json:"binlog-gtid"`
	// contains filtered or unexported fields
}

LocalMeta implements Meta by saving info locally.

func (*LocalMeta) AddDir

func (lm *LocalMeta) AddDir(serverUUID string, newPos *mysql.Position, newGTID mysql.GTIDSet, uuidSuffix int) error

AddDir implements Meta.AddDir.

func (*LocalMeta) AdjustWithStartPos

func (lm *LocalMeta) AdjustWithStartPos(binlogName string, binlogGTID string, enableGTID bool, latestBinlogName string, latestBinlogGTID string) (bool, error)

AdjustWithStartPos implements Meta.AdjustWithStartPos and returns whether the meta was adjusted.

func (*LocalMeta) Dir

func (lm *LocalMeta) Dir() string

Dir implements Meta.Dir.

func (*LocalMeta) Dirty

func (lm *LocalMeta) Dirty() bool

Dirty implements Meta.Dirty.

func (*LocalMeta) Flush

func (lm *LocalMeta) Flush() error

Flush implements Meta.Flush.

func (*LocalMeta) GTID

func (lm *LocalMeta) GTID() (string, mysql.GTIDSet)

GTID implements Meta.GTID.

func (*LocalMeta) Load

func (lm *LocalMeta) Load() error

Load implements Meta.Load.

func (*LocalMeta) Pos

func (lm *LocalMeta) Pos() (string, mysql.Position)

Pos implements Meta.Pos.

func (*LocalMeta) Save

func (lm *LocalMeta) Save(pos mysql.Position, gset mysql.GTIDSet) error

Save implements Meta.Save.

func (*LocalMeta) String

func (lm *LocalMeta) String() string

String implements Meta.String.

func (*LocalMeta) SubDir

func (lm *LocalMeta) SubDir() string

SubDir implements Meta.SubDir.

func (*LocalMeta) TrimUUIDIndexFile

func (lm *LocalMeta) TrimUUIDIndexFile() ([]string, error)

TrimUUIDIndexFile implements Meta.TrimUUIDIndexFile.

type LocalStreamer

type LocalStreamer struct {
	// contains filtered or unexported fields
}

LocalStreamer reads and parses binlog events from local binlog file.

func (*LocalStreamer) GetEvent

GetEvent gets binlog events one by one; it will block until the parser encounters an error. You can pass a context (like Cancel or Timeout) to break the block.

type Meta

type Meta interface {
	// Load loads meta information for the recently active server
	Load() error

	// AdjustWithStartPos adjusts current pos / GTID with start pos
	// if current pos / GTID is meaningless, update to start pos or last pos when start pos is meaningless
	// else do nothing
	AdjustWithStartPos(binlogName string, binlogGTID string, enableGTID bool, latestBinlogName string, latestBinlogGTID string) (bool, error)

	// Save saves meta information
	Save(pos mysql.Position, gset mysql.GTIDSet) error

	// Flush flushes meta information
	Flush() error

	// Dirty checks whether meta in memory is dirty (need to Flush)
	Dirty() bool

	// AddDir adds relay log subdirectory for a new server. The name of new subdirectory
	// consists of the server_uuid of new server and a suffix.
	// if suffix is not zero value, add sub relay directory with suffix (bound to a new source)
	// otherwise the added sub relay directory's suffix is incremented (master/slave switch)
	// after sub relay directory added, the internal binlog pos should be reset
	// and binlog pos will be set again when new binlog events received
	// if set @newPos / @newGTID, old value will be replaced
	AddDir(serverUUID string, newPos *mysql.Position, newGTID mysql.GTIDSet, suffix int) error

	// Pos returns current (UUID with suffix, Position) pair
	Pos() (string, mysql.Position)

	// GTID returns current (UUID with suffix, GTID) pair
	GTID() (string, mysql.GTIDSet)

	// SubDir returns the name of current relay log subdirectory.
	SubDir() string

	// TrimUUIDIndexFile trim invalid relay log subdirectories from memory and update the server-uuid.index file
	// return trimmed result.
	TrimUUIDIndexFile() ([]string, error)

	// Dir returns the full path of relay log subdirectory.
	Dir() string

	// String returns string representation of current meta info
	String() string
}

Meta represents relay log meta information for a sync source. When re-syncing, we should reload the meta info to guarantee continuous transmission. In order to support master-slave switching, Meta should support switching binlog meta info to a newer master, and should support the case of switching from A to B and then switching from B back to A.

func NewLocalMeta

func NewLocalMeta(flavor, baseDir string) Meta

NewLocalMeta creates a new LocalMeta.

type Operator

type Operator interface {
	// EarliestActiveRelayLog returns the earliest active relay log info in this operator
	EarliestActiveRelayLog() *streamer.RelayLogInfo
}

Operator represents an operator for relay log files, like writer, reader.

type Process

type Process interface {
	// Init initializes the relay log unit
	Init(ctx context.Context) (err error)
	// Process run background logic of relay log unit
	Process(ctx context.Context) pb.ProcessResult
	// ActiveRelayLog returns the earliest active relay log info in this operator
	ActiveRelayLog() *pkgstreamer.RelayLogInfo
	// Reload reloads config
	Reload(newCfg *Config) error
	// Update updates config
	Update(cfg *config.SubTaskConfig) error
	// Resume resumes paused relay log process unit
	Resume(ctx context.Context, pr chan pb.ProcessResult)
	// Pause pauses a running relay log process unit
	Pause()
	// Error returns error message if having one
	Error() interface{}
	// Status returns status of relay log process unit.
	Status(sourceStatus *binlog.SourceStatus) interface{}
	// Close does some clean works
	Close()
	// IsClosed returns whether relay log process unit was closed
	IsClosed() bool
	// SaveMeta save relay meta
	SaveMeta(pos mysql.Position, gset mysql.GTIDSet) error
	// ResetMeta reset relay meta
	ResetMeta()
	// PurgeRelayDir will clear all contents under w.cfg.RelayDir
	PurgeRelayDir() error
	// RegisterListener registers a relay listener
	RegisterListener(el Listener)
	// UnRegisterListener unregisters a relay listener
	UnRegisterListener(el Listener)
	// NewReader creates a new relay reader
	NewReader(logger log.Logger, cfg *BinlogReaderConfig) *BinlogReader
	// IsActive check whether given uuid+filename is active binlog file, if true return current file offset
	IsActive(uuid, filename string) (bool, int64)
}

Process defines a MySQL-like relay log process unit.

func NewRealRelay

func NewRealRelay(cfg *Config) Process

NewRealRelay creates an instance of Relay.

type PurgeInterceptor

type PurgeInterceptor interface {
	// ForbidPurge returns whether forbidding purge currently and an optional message
	ForbidPurge() (bool, string)
}

PurgeInterceptor represents an interceptor that may forbid the purge process.

type PurgeStrategy

type PurgeStrategy interface {
	// Check checks whether need to do the purge in the background automatically
	Check(args interface{}) (bool, error)

	// Do does the purge process one time
	Do(args interface{}) error

	// Purging indicates whether is doing purge
	Purging() bool

	// Type returns the strategy type
	Type() strategyType
}

PurgeStrategy represents a relay log purge strategy. There are two purge behaviors:

  1. purge in the background
  2. do one time purge process

A strategy can support both or one of them.

type Purger

type Purger interface {
	// Start starts strategies by config
	Start()
	// Close stops the started strategies
	Close()
	// Purging returns whether the purger is purging
	Purging() bool
	// Do does the purge process one time
	Do(ctx context.Context, req *pb.PurgeRelayRequest) error
}

Purger purges relay log according to some strategies.

func NewDummyPurger

func NewDummyPurger(cfg config.PurgeConfig, baseRelayDir string, operators []Operator, interceptors []PurgeInterceptor) Purger

NewDummyPurger returns a dummy purger.

func NewRelayPurger

func NewRelayPurger(cfg config.PurgeConfig, baseRelayDir string, operators []Operator, interceptors []PurgeInterceptor) Purger

NewRelayPurger creates a new purger.

type RConfig

type RConfig struct {
	SyncConfig replication.BinlogSyncerConfig
	Pos        mysql.Position
	GTIDs      mysql.GTIDSet
	EnableGTID bool
	MasterID   string // the identifier for the master, used when logging.
}

RConfig is the configuration used by the Reader.

type RResult

type RResult struct {
	Event *replication.BinlogEvent
}

RResult represents a read operation result.

type Reader

type Reader interface {
	// Start starts the reading process.
	Start() error

	// Close closes the reader and release the resource.
	Close() error

	// GetEvent gets the binlog event one by one, it will block if no event can be read.
	// You can pass a context (like Cancel) to break the block.
	GetEvent(ctx context.Context) (RResult, error)
}

Reader reads binlog events from an upstream master server. The read binlog events should be sent to a transformer. The reader should support:

  1. handle expected errors
  2. do retry if possible

NOTE: some errors still need to be handled in the outer caller.

func NewUpstreamReader

func NewUpstreamReader(cfg *RConfig) Reader

NewUpstreamReader creates a Reader instance.

type ReaderRetry

type ReaderRetry struct {
	// contains filtered or unexported fields
}

ReaderRetry is used to control retries for the binlog reader. It is not thread-safe.

func NewReaderRetry

func NewReaderRetry(cfg ReaderRetryConfig) (*ReaderRetry, error)

NewReaderRetry creates a new ReaderRetry instance.

func (*ReaderRetry) Check

func (rr *ReaderRetry) Check(ctx context.Context, err error) bool

Check checks whether should retry for the error.

type ReaderRetryConfig

type ReaderRetryConfig struct {
	BackoffRollback time.Duration `toml:"backoff-rollback" json:"backoff-rollback"`
	BackoffMax      time.Duration `toml:"backoff-max" json:"backoff-max"`
	// unexposed config
	BackoffMin    time.Duration `json:"-"`
	BackoffJitter bool          `json:"-"`
	BackoffFactor float64       `json:"-"`
}

ReaderRetryConfig is the configuration used for binlog reader retry backoff. We always enable this now.

type Relay

type Relay struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Relay relays mysql binlog to local file.

func (*Relay) ActiveRelayLog

func (r *Relay) ActiveRelayLog() *pkgstreamer.RelayLogInfo

ActiveRelayLog returns the current active RelayLogInfo.

func (*Relay) Close

func (r *Relay) Close()

Close implements the dm.Unit interface.

func (*Relay) Error

func (r *Relay) Error() interface{}

Error implements the dm.Unit interface.

func (*Relay) FlushMeta

func (r *Relay) FlushMeta() error

FlushMeta flushes the relay meta.

func (*Relay) Init

func (r *Relay) Init(ctx context.Context) (err error)

Init implements the dm.Unit interface. NOTE: when Init encounters an error, it will make DM-worker exit when it boots up and is assigned a relay.

func (*Relay) IsActive

func (r *Relay) IsActive(uuid, filename string) (bool, int64)

func (*Relay) IsClosed

func (r *Relay) IsClosed() bool

IsClosed tells whether Relay unit is closed or not.

func (*Relay) IsFreshTask

func (r *Relay) IsFreshTask() (bool, error)

IsFreshTask implements Unit.IsFreshTask.

func (*Relay) NewReader

func (r *Relay) NewReader(logger log.Logger, cfg *BinlogReaderConfig) *BinlogReader

func (*Relay) Pause

func (r *Relay) Pause()

Pause pauses the process, it can be resumed later.

func (*Relay) Process

func (r *Relay) Process(ctx context.Context) pb.ProcessResult

Process implements the dm.Unit interface.

func (*Relay) PurgeRelayDir

func (r *Relay) PurgeRelayDir() error

PurgeRelayDir implements the dm.Unit interface.

func (*Relay) RegisterListener

func (r *Relay) RegisterListener(el Listener)

RegisterListener implements Process.RegisterListener.

func (*Relay) Reload

func (r *Relay) Reload(newCfg *Config) error

Reload updates config.

func (*Relay) ResetMeta

func (r *Relay) ResetMeta()

ResetMeta resets the relay meta.

func (*Relay) Resume

func (r *Relay) Resume(ctx context.Context, pr chan pb.ProcessResult)

Resume resumes the paused process.

func (*Relay) SaveMeta

func (r *Relay) SaveMeta(pos mysql.Position, gset mysql.GTIDSet) error

SaveMeta saves the relay meta and updates the meta in RelayLogInfo.

func (*Relay) Status

func (r *Relay) Status(sourceStatus *binlog.SourceStatus) interface{}

Status implements the dm.Unit interface.

func (*Relay) Type

func (r *Relay) Type() pb.UnitType

Type implements the dm.Unit interface.

func (*Relay) UnRegisterListener

func (r *Relay) UnRegisterListener(el Listener)

UnRegisterListener implements Process.UnRegisterListener.

func (*Relay) Update

func (r *Relay) Update(cfg *config.SubTaskConfig) error

Update implements Unit.Update.

type StrategyArgs

type StrategyArgs interface {
	// SetActiveRelayLog sets active relay log info in args
	// this should be called before do the purging
	SetActiveRelayLog(active *streamer.RelayLogInfo)
}

StrategyArgs represents args needed by purge strategy.

type SwitchPath

type SwitchPath struct {
	// contains filtered or unexported fields
}

SwitchPath represents next binlog file path which should be switched.

type WResult

type WResult struct {
	Ignore       bool   // whether the event ignored by the writer
	IgnoreReason string // why the writer ignore the event
}

WResult represents a write result.

type Writer

type Writer interface {
	// Init inits the writer, should be called before any other method
	Init(uuid, filename string)
	// Close closes the writer and release the resource.
	Close() error

	// WriteEvent writes a binlog event's data into disk or any other places.
	// It is not safe for concurrent use by multiple goroutines.
	WriteEvent(ev *replication.BinlogEvent) (WResult, error)
	// IsActive check whether given uuid+filename is active binlog file, if true return current file offset
	IsActive(uuid, filename string) (bool, int64)
	// Flush flushes the binlog writer.
	Flush() error
}

Writer writes binlog events into disk or any other memory structure. The writer should support:

  1. write binlog events and report the operation result
  2. skip any obsolete binlog events
  3. generate dummy events to fill the gap if needed
  4. rotate binlog(relay) file if needed
  5. rollback/discard unfinished binlog entries(events or transactions)

func NewFileWriter

func NewFileWriter(logger log.Logger, relayDir string) Writer

NewFileWriter creates a FileWriter instance.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL