repl

package
v0.6.0-prerelease
Published: Sep 27, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Overview

Package repl contains binary log subscription functionality.

Constants

const (

	// DefaultBatchSize is the number of rows in each batched REPLACE/DELETE statement.
	// Larger is better, but we need to keep the run-time of the statement well below
	// dbconn.maximumLockTime so that it doesn't cause copy-row tasks to fail.
	// Since some of our Aurora tables with out-of-cache workloads only copy ~300 rows per second,
	// we probably shouldn't set this any larger than about 1K. It will also use
	// multiple flush threads, which should help it group commit and still be fast.
	// This is only used as an initial starting value. It will auto-scale based on DefaultTargetBatchTime.
	DefaultBatchSize = 1000

	// DefaultTargetBatchTime is the target time for flushing REPLACE/DELETE statements.
	DefaultTargetBatchTime = time.Millisecond * 500

	// DefaultFlushInterval is the interval at which the client flushes all binlog changes to disk.
	// Longer values require more memory, but permit more merging.
	// I expect we will change this to somewhere between 1hr and 24hr in the future.
	DefaultFlushInterval = 30 * time.Second
	// DefaultTimeout is how long BlockWait is supposed to wait before returning errors.
	DefaultTimeout = 10 * time.Second
)
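The DefaultBatchSize comment says the batch size is only a starting value that auto-scales toward the target batch time. The package's actual scaling rule is not shown in this documentation, so the following is only a minimal sketch of one plausible approach: scale the batch size by the ratio of target time to observed time and clamp the result. The names nextBatchSize, minBatchSize and maxBatchSize are hypothetical and are not part of the repl package.

```go
package main

import (
	"fmt"
	"time"
)

const (
	// These two mirror the documented defaults.
	defaultBatchSize       = 1000
	defaultTargetBatchTime = 500 * time.Millisecond

	// Hypothetical clamps; the real package may use different bounds or none.
	minBatchSize = 1
	maxBatchSize = 10000
)

// nextBatchSize scales current by target/observed, so a statement that ran
// four times longer than the target gets a batch a quarter of the size.
// The result is clamped to [minBatchSize, maxBatchSize].
func nextBatchSize(current int, observed, target time.Duration) int {
	if observed <= 0 {
		return current // no usable measurement; keep the current size
	}
	scaled := int(float64(current) * float64(target) / float64(observed))
	if scaled < minBatchSize {
		return minBatchSize
	}
	if scaled > maxBatchSize {
		return maxBatchSize
	}
	return scaled
}

func main() {
	size := defaultBatchSize
	// A slow batch (2s against a 500ms target) shrinks the next batch.
	size = nextBatchSize(size, 2*time.Second, defaultTargetBatchTime)
	fmt.Println(size) // 250
	// A fast batch (250ms) lets it grow again.
	size = nextBatchSize(size, 250*time.Millisecond, defaultTargetBatchTime)
	fmt.Println(size) // 500
}
```

Proportional scaling reacts quickly to a single slow statement; a production implementation would probably smooth over several observations rather than trust one measurement.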

Types

type Client

type Client struct {
	canal.DummyEventHandler
	sync.Mutex

	TableChangeNotificationCallback func()
	KeyAboveCopierCallback          func(interface{}) bool
	// contains filtered or unexported fields
}

func NewClient

func NewClient(db *sql.DB, host string, table, newTable *table.TableInfo, username, password string, config *ClientConfig) *Client

func (*Client) AllChangesFlushed

func (c *Client) AllChangesFlushed() bool

func (*Client) BlockWait

func (c *Client) BlockWait(ctx context.Context) error

BlockWait blocks until the canal position has caught up to the current binlog position. It is usually called by Flush(), which then ensures the changes are flushed; calling it directly is normally only done by the test suite. canal has a built-in function for this, but it calls FLUSH BINARY LOGS, which requires additional permissions. Caveat: unless you are calling this from Flush(), calling this DOES NOT ensure that changes have been applied to the database.
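The waiting behaviour described for BlockWait can be pictured as a polling loop bounded by a context. The sketch below is an illustration only and does not reproduce the package's implementation: real binlog positions are a file name plus an offset, which is simplified here to a single counter, and waitForPosition and runDemo are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// waitForPosition polls current() until it reaches target, or returns the
// context's error (wrapped) if ctx is cancelled or times out first.
func waitForPosition(ctx context.Context, current func() uint64, target uint64, poll time.Duration) error {
	ticker := time.NewTicker(poll)
	defer ticker.Stop()
	for {
		if current() >= target {
			return nil
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("waiting for position %d: %w", target, ctx.Err())
		case <-ticker.C:
		}
	}
}

// runDemo simulates a reader that advances a position five times, then
// waits once for a reachable target and once for an unreachable one.
func runDemo() (caughtUp, timedOut bool) {
	var pos atomic.Uint64
	go func() {
		for i := 0; i < 5; i++ {
			time.Sleep(2 * time.Millisecond)
			pos.Add(1)
		}
	}()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	caughtUp = waitForPosition(ctx, pos.Load, 5, time.Millisecond) == nil

	short, cancelShort := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancelShort()
	err := waitForPosition(short, pos.Load, 100, time.Millisecond)
	timedOut = errors.Is(err, context.DeadlineExceeded)
	return caughtUp, timedOut
}

func main() {
	caughtUp, timedOut := runDemo()
	fmt.Println("caught up:", caughtUp, "timed out:", timedOut)
}
```

As the caveat above notes, reaching the target position only means the events have been read; it says nothing about whether they have been applied.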

func (*Client) Close

func (c *Client) Close()

func (*Client) Flush

func (c *Client) Flush(ctx context.Context) error

Flush empties the changeset in a loop until the number of remaining changes is considered "trivial". The loop is required because changes continue to be added while the flush is occurring.
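To see why a loop is needed, consider a toy model in which new changes arrive while each flush pass runs. This is a sketch under stated assumptions, not the package's code: the documentation does not say what counts as "trivial", so trivialThreshold is a made-up value, and changeSource and flushUntilTrivial are hypothetical names.

```go
package main

import "fmt"

// trivialThreshold is a hypothetical cut-off below which the remaining
// changes are considered trivial.
const trivialThreshold = 10

// changeSource models a changeset that keeps receiving changes.
type changeSource struct {
	pending  int   // changes currently waiting to be flushed
	arrivals []int // changes that arrive during each successive flush pass
}

// flushOnce applies everything pending and then records whatever arrived
// while that pass was running. It returns the number of changes applied.
func (s *changeSource) flushOnce() int {
	flushed := s.pending
	s.pending = 0
	if len(s.arrivals) > 0 {
		s.pending = s.arrivals[0]
		s.arrivals = s.arrivals[1:]
	}
	return flushed
}

// flushUntilTrivial repeats flushOnce until the backlog left behind by a
// pass is below trivialThreshold, and reports how many passes it took.
func flushUntilTrivial(s *changeSource) (passes int) {
	for {
		s.flushOnce()
		passes++
		if s.pending < trivialThreshold {
			return passes
		}
	}
}

func main() {
	s := &changeSource{pending: 5000, arrivals: []int{400, 30, 2}}
	passes := flushUntilTrivial(s)
	fmt.Println(passes, s.pending) // 3 2
}
```

Each pass is shorter than the last because it has less to apply, so fewer changes accumulate during it and the backlog converges.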

func (*Client) FlushUnderTableLock

func (c *Client) FlushUnderTableLock(ctx context.Context, lock *dbconn.TableLock) error

FlushUnderTableLock is a final flush under an exclusive table lock, using the connection that holds the write lock. Because flushing generates binary log events, we actually want to call flush twice:

  • The first time flushes the pending changes to the new table.
  • We then ensure that we have read all the binary log changes from the server.
  • The second time reads through the changes generated by the first flush and updates the in-memory applied position to match the server's position. This is required so that the binlog position is up to date for the c.AllChangesFlushed() check.
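The flush, catch-up, flush sequence above can be traced with a deliberately simplified model. Everything here is an assumption made for illustration: positions are plain integers, each applied change is taken to advance the server's binary log by one, and the sim type and its methods are hypothetical rather than the package's internals.

```go
package main

import "fmt"

// sim is a toy model of the three positions involved in a final flush.
type sim struct {
	serverPos   int // end of the server's binary log
	bufferedPos int // how far the subscriber has read
	appliedPos  int // how far the subscriber has flushed
	pending     int // changes read but not yet applied
}

// flush applies pending changes. Applying them writes new binary log
// events, so the server position moves ahead of what has been read.
func (s *sim) flush() {
	s.serverPos += s.pending
	s.pending = 0
	s.appliedPos = s.bufferedPos
}

// catchUp reads everything the server has written so far.
func (s *sim) catchUp() { s.bufferedPos = s.serverPos }

// allChangesFlushed reports whether nothing is pending and the applied
// position matches the server's position.
func (s *sim) allChangesFlushed() bool {
	return s.pending == 0 && s.appliedPos == s.serverPos
}

func main() {
	s := &sim{serverPos: 100, bufferedPos: 100, appliedPos: 90, pending: 3}

	s.flush() // first flush: applies 3 changes, server moves to 103
	fmt.Println(s.allChangesFlushed()) // false

	s.catchUp() // read the events the first flush generated
	s.flush()   // second flush: nothing to apply, position advances
	fmt.Println(s.allChangesFlushed()) // true
}
```

In this model a single flush can never satisfy the check while there is anything pending, because the act of applying changes moves the server position.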

func (*Client) GetBinlogApplyPosition

func (c *Client) GetBinlogApplyPosition() mysql.Position

func (*Client) GetDeltaLen

func (c *Client) GetDeltaLen() int

func (*Client) KeyAboveWatermarkEnabled

func (c *Client) KeyAboveWatermarkEnabled() bool

KeyAboveWatermarkEnabled returns true if the key-above-watermark optimization is enabled and it is also safe to apply it.

func (*Client) OnRow

func (c *Client) OnRow(e *canal.RowsEvent) error

OnRow is called when a row is discovered via replication. The event is of type e.Action and contains one or more rows in e.Rows. We find the PRIMARY KEY of each row and then:

  • If it exceeds the known high watermark of the copier, we discard it. That data has not been copied yet, so it will already be up to date when it is copied later.
  • If it could have been copied already, we add it to the changeset. We only need to store the PK and whether the operation was a delete.

The changeset is used after the row copy to apply any changes that have been made.
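The OnRow logic described above amounts to a map from primary key to an "is delete" flag, guarded by a watermark check. The sketch below assumes a single integer primary key and uses hypothetical names (changeset, onRow, highWatermark); the real client works with canal.RowsEvent values and delegates the watermark decision to KeyAboveCopierCallback.

```go
package main

import "fmt"

// changeset records, per primary key, whether the most recent operation
// seen for that key was a delete.
type changeset struct {
	highWatermark int          // highest key the copier may have copied so far
	changes       map[int]bool // primary key -> last operation was a delete
}

// onRow discards keys above the watermark (the copier will read their
// latest state later) and records everything else. A later event for the
// same key overwrites the earlier one, which is how changes get merged.
func (c *changeset) onRow(pk int, isDelete bool) {
	if pk > c.highWatermark {
		return
	}
	c.changes[pk] = isDelete
}

func main() {
	c := &changeset{highWatermark: 100, changes: map[int]bool{}}

	c.onRow(42, false)  // update to an already-copied row
	c.onRow(42, true)   // the same row is then deleted
	c.onRow(500, false) // above the watermark: ignored
	c.onRow(7, false)   // another already-copied row

	fmt.Println(len(c.changes), c.changes[42]) // 2 true
}
```

Storing only the key and the delete flag is enough because the apply step can re-read the current row from the source table with REPLACE, or remove it with DELETE.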

func (*Client) OnTableChanged

func (c *Client) OnTableChanged(header *replication.EventHeader, schema string, table string) error

OnTableChanged is called when a table is changed via DDL. This is a failsafe because we don't expect DDL to be performed on the table while we are operating.

func (*Client) Run

func (c *Client) Run() (err error)

func (*Client) SetKeyAboveWatermarkOptimization

func (c *Client) SetKeyAboveWatermarkOptimization(newVal bool)

func (*Client) SetPos

func (c *Client) SetPos(pos mysql.Position)

SetPos is used for resuming from a checkpoint.

func (*Client) StartPeriodicFlush

func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration)

StartPeriodicFlush starts a loop that periodically flushes the binlog changeset. This is used by the migrator to ensure the binlog position is advanced.

func (*Client) StopPeriodicFlush

func (c *Client) StopPeriodicFlush()

StopPeriodicFlush disables the periodic flush and guarantees that no flush is running when it returns.
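A start/stop pair with the guarantee described above is commonly built from a ticker, a cancellable context and a "done" channel. The following is a minimal sketch of that pattern, not the package's implementation; periodicFlusher, start, stop and runDemo are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// periodicFlusher runs flush on a fixed interval until stopped.
type periodicFlusher struct {
	flush  func(ctx context.Context)
	cancel context.CancelFunc
	done   chan struct{}
}

// start launches the flush loop in a goroutine.
func (p *periodicFlusher) start(ctx context.Context, interval time.Duration) {
	ctx, cancel := context.WithCancel(ctx)
	done := make(chan struct{})
	p.cancel, p.done = cancel, done
	go func() {
		defer close(done) // signals that the loop has fully exited
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				p.flush(ctx)
			}
		}
	}()
}

// stop cancels the loop and waits for it to exit, so no flush can still
// be running once stop returns.
func (p *periodicFlusher) stop() {
	p.cancel()
	<-p.done
}

// runDemo checks that flushes happen while running and cease after stop.
func runDemo() (ranAtLeastOnce, quietAfterStop bool) {
	var flushes atomic.Int64
	p := &periodicFlusher{flush: func(context.Context) { flushes.Add(1) }}
	p.start(context.Background(), time.Millisecond)
	time.Sleep(50 * time.Millisecond)
	p.stop()
	n := flushes.Load()
	time.Sleep(10 * time.Millisecond)
	return n > 0, flushes.Load() == n
}

func main() {
	ran, quiet := runDemo()
	fmt.Println("ran:", ran, "quiet after stop:", quiet)
}
```

Waiting on the done channel is what provides the guarantee: cancelling alone would return immediately even if a flush were in progress.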

type ClientConfig

type ClientConfig struct {
	TargetBatchTime time.Duration
	Concurrency     int
	Logger          loggers.Advanced
}

func NewClientDefaultConfig

func NewClientDefaultConfig() *ClientConfig

NewClientDefaultConfig returns a default config for the Client.

type LogWrapper

type LogWrapper struct {
	// contains filtered or unexported fields
}

func NewLogWrapper

func NewLogWrapper(logger loggers.Advanced) *LogWrapper

func (*LogWrapper) Debug

func (c *LogWrapper) Debug(args ...interface{})

func (*LogWrapper) Debugf

func (c *LogWrapper) Debugf(format string, args ...interface{})

func (*LogWrapper) Debugln

func (c *LogWrapper) Debugln(args ...interface{})

func (*LogWrapper) Error

func (c *LogWrapper) Error(args ...interface{})

func (*LogWrapper) Errorf

func (c *LogWrapper) Errorf(format string, args ...interface{})

func (*LogWrapper) Errorln

func (c *LogWrapper) Errorln(args ...interface{})

func (*LogWrapper) Fatal

func (c *LogWrapper) Fatal(args ...interface{})

func (*LogWrapper) Fatalf

func (c *LogWrapper) Fatalf(format string, args ...interface{})

func (*LogWrapper) Fatalln

func (c *LogWrapper) Fatalln(args ...interface{})

func (*LogWrapper) Info

func (c *LogWrapper) Info(args ...interface{})

func (*LogWrapper) Infof

func (c *LogWrapper) Infof(format string, args ...interface{})

func (*LogWrapper) Infoln

func (c *LogWrapper) Infoln(args ...interface{})

func (*LogWrapper) Panic

func (c *LogWrapper) Panic(args ...interface{})

func (*LogWrapper) Panicf

func (c *LogWrapper) Panicf(format string, args ...interface{})

func (*LogWrapper) Panicln

func (c *LogWrapper) Panicln(args ...interface{})

func (*LogWrapper) Print

func (c *LogWrapper) Print(args ...interface{})

func (*LogWrapper) Printf

func (c *LogWrapper) Printf(format string, args ...interface{})

func (*LogWrapper) Println

func (c *LogWrapper) Println(args ...interface{})

func (*LogWrapper) Warn

func (c *LogWrapper) Warn(args ...interface{})

func (*LogWrapper) Warnf

func (c *LogWrapper) Warnf(format string, args ...interface{})

func (*LogWrapper) Warnln

func (c *LogWrapper) Warnln(args ...interface{})
