Documentation ¶
Overview ¶
Package repl contains binary log subscription functionality.
Index ¶
- Constants
- type Client
- func (c *Client) AllChangesFlushed() bool
- func (c *Client) BlockWait(ctx context.Context) error
- func (c *Client) Close()
- func (c *Client) Flush(ctx context.Context) error
- func (c *Client) FlushUnderTableLock(ctx context.Context, lock *dbconn.TableLock) error
- func (c *Client) GetBinlogApplyPosition() mysql.Position
- func (c *Client) GetDeltaLen() int
- func (c *Client) KeyAboveWatermarkEnabled() bool
- func (c *Client) OnRow(e *canal.RowsEvent) error
- func (c *Client) OnTableChanged(header *replication.EventHeader, schema string, table string) error
- func (c *Client) Run() (err error)
- func (c *Client) SetKeyAboveWatermarkOptimization(newVal bool)
- func (c *Client) SetPos(pos mysql.Position)
- func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration)
- func (c *Client) StopPeriodicFlush()
- type ClientConfig
- func NewClientDefaultConfig() *ClientConfig
- type LogWrapper
- func NewLogWrapper(logger loggers.Advanced) *LogWrapper
- func (c *LogWrapper) Debug(args ...interface{})
- func (c *LogWrapper) Debugf(format string, args ...interface{})
- func (c *LogWrapper) Debugln(args ...interface{})
- func (c *LogWrapper) Error(args ...interface{})
- func (c *LogWrapper) Errorf(format string, args ...interface{})
- func (c *LogWrapper) Errorln(args ...interface{})
- func (c *LogWrapper) Fatal(args ...interface{})
- func (c *LogWrapper) Fatalf(format string, args ...interface{})
- func (c *LogWrapper) Fatalln(args ...interface{})
- func (c *LogWrapper) Info(args ...interface{})
- func (c *LogWrapper) Infof(format string, args ...interface{})
- func (c *LogWrapper) Infoln(args ...interface{})
- func (c *LogWrapper) Panic(args ...interface{})
- func (c *LogWrapper) Panicf(format string, args ...interface{})
- func (c *LogWrapper) Panicln(args ...interface{})
- func (c *LogWrapper) Print(args ...interface{})
- func (c *LogWrapper) Printf(format string, args ...interface{})
- func (c *LogWrapper) Println(args ...interface{})
- func (c *LogWrapper) Warn(args ...interface{})
- func (c *LogWrapper) Warnf(format string, args ...interface{})
- func (c *LogWrapper) Warnln(args ...interface{})
Constants ¶
const (
	// DefaultBatchSize is the number of rows in each batched REPLACE/DELETE statement.
	// Larger is better, but we need to keep the run-time of the statement well below
	// dbconn.maximumLockTime so that it doesn't cause copy-row tasks to fail.
	// Since some of our Aurora tables with out-of-cache workloads only copy ~300 rows per second,
	// we probably shouldn't set this any larger than about 1K. It will also use
	// multiple-flush-threads, which should help it group commit and still be fast.
	// This is only used as an initial starting value. It will auto-scale based on DefaultTargetBatchTime.
	DefaultBatchSize = 1000

	// DefaultTargetBatchTime is the target time for flushing REPLACE/DELETE statements.
	DefaultTargetBatchTime = time.Millisecond * 500

	// DefaultFlushInterval is the interval at which the client flushes all binlog changes to disk.
	// Longer values require more memory, but permit more merging.
	// I expect we will change this to 1hr-24hr in the future.
	DefaultFlushInterval = 30 * time.Second

	// DefaultTimeout is how long BlockWait is supposed to wait before returning errors.
	DefaultTimeout = 10 * time.Second
)
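The comment on DefaultBatchSize says the value is only a starting point and that it auto-scales toward DefaultTargetBatchTime. The package's actual scaling logic is not shown on this page. The following is a minimal sketch of one way such feedback could work, assuming a simple proportional rule. The function nextBatchSize and the bounds minBatchSize and maxBatchSize are hypothetical names introduced for illustration only.

```go
package main

import (
	"fmt"
	"time"
)

const (
	// These two values mirror DefaultBatchSize and DefaultTargetBatchTime.
	defaultBatchSize       = 1000
	defaultTargetBatchTime = 500 * time.Millisecond

	// minBatchSize and maxBatchSize are hypothetical bounds for this sketch.
	minBatchSize = 1
	maxBatchSize = 10000
)

// nextBatchSize scales the current batch size by target/observed, so a batch
// that ran too long shrinks and a batch that ran quickly grows. The result is
// clamped to [minBatchSize, maxBatchSize]. This is an assumed rule, not the
// package's implementation.
func nextBatchSize(current int, observed, target time.Duration) int {
	if observed <= 0 {
		return current // nothing measured yet; keep the current size
	}
	next := int(float64(current) * float64(target) / float64(observed))
	if next < minBatchSize {
		next = minBatchSize
	}
	if next > maxBatchSize {
		next = maxBatchSize
	}
	return next
}

func main() {
	// A batch that took twice the target time is halved.
	fmt.Println(nextBatchSize(defaultBatchSize, 2*defaultTargetBatchTime, defaultTargetBatchTime)) // 500
	// A batch that took half the target time is doubled.
	fmt.Println(nextBatchSize(defaultBatchSize, defaultTargetBatchTime/2, defaultTargetBatchTime)) // 2000
	// Growth is capped by the upper bound.
	fmt.Println(nextBatchSize(5000, defaultTargetBatchTime/5, defaultTargetBatchTime)) // 10000
}
```

A proportional rule like this reacts in a single step. A production implementation would likely smooth over several measurements to avoid oscillating.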
Types ¶
type Client ¶
type Client struct {
	canal.DummyEventHandler
	sync.Mutex

	TableChangeNotificationCallback func()
	KeyAboveCopierCallback          func(interface{}) bool
	// contains filtered or unexported fields
}
func (*Client) AllChangesFlushed ¶
func (c *Client) AllChangesFlushed() bool
func (*Client) BlockWait ¶
func (c *Client) BlockWait(ctx context.Context) error
BlockWait blocks until the *canal position* has caught up to the current binlog position. It is usually called by Flush(), which then ensures the changes are flushed. It is usually only called directly by the test suite. There is a built-in func in canal to do this, but it calls FLUSH BINARY LOGS, which requires additional permissions. **Caveat** Unless you are calling this from Flush(), calling this DOES NOT ensure that changes have been applied to the database.
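The wait described above, combined with DefaultTimeout, amounts to polling a "caught up" condition until a deadline passes. The following is a minimal sketch of that pattern, assuming a polling approach. The helper waitUntil, its poll interval, and the caughtUp callback are hypothetical and do not reflect the package's internals.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitUntil polls caughtUp every poll interval and returns nil once it reports
// true. If timeout elapses first (or ctx is cancelled), it returns ctx.Err().
// Hypothetical helper for illustration only.
func waitUntil(ctx context.Context, timeout, poll time.Duration, caughtUp func() bool) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	ticker := time.NewTicker(poll)
	defer ticker.Stop()
	for {
		if caughtUp() {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// Poll again on the next loop iteration.
		}
	}
}

// demo runs one wait that succeeds on the third poll and one that times out.
func demo() (caughtUpErr, timeoutErr error) {
	calls := 0
	caughtUpErr = waitUntil(context.Background(), time.Second, time.Millisecond, func() bool {
		calls++
		return calls >= 3 // pretend the reader catches up on the third check
	})
	timeoutErr = waitUntil(context.Background(), 20*time.Millisecond, time.Millisecond, func() bool {
		return false // the reader never catches up
	})
	return caughtUpErr, timeoutErr
}

func main() {
	ok, timedOut := demo()
	fmt.Println(ok == nil, errors.Is(timedOut, context.DeadlineExceeded)) // true true
}
```

As the caveat notes, a nil return here would only mean the reader has caught up. It says nothing about whether buffered changes have been applied.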
func (*Client) Flush ¶
func (c *Client) Flush(ctx context.Context) error
Flush empties the changeset in a loop until the amount of changes is considered "trivial". The loop is required, because changes continue to be added while the flush is occurring.
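The reason for the loop can be illustrated with a small in-memory model. This is a sketch only: the changeset type, the flushUntilTrivial function, and the threshold value are hypothetical stand-ins, and the real client applies changes to a database rather than to a callback.

```go
package main

import (
	"fmt"
	"sync"
)

// changeset is a hypothetical stand-in for the client's buffered changes:
// primary key -> whether the last operation was a delete.
type changeset struct {
	sync.Mutex
	rows map[string]bool
}

func (c *changeset) add(key string, isDelete bool) {
	c.Lock()
	defer c.Unlock()
	c.rows[key] = isDelete
}

// swap hands back the pending rows and leaves an empty map for new arrivals.
func (c *changeset) swap() map[string]bool {
	c.Lock()
	defer c.Unlock()
	pending := c.rows
	c.rows = make(map[string]bool)
	return pending
}

func (c *changeset) len() int {
	c.Lock()
	defer c.Unlock()
	return len(c.rows)
}

// flushUntilTrivial drains the changeset repeatedly until at most threshold
// changes remain, and returns the number of passes it needed.
func flushUntilTrivial(c *changeset, threshold int, apply func(map[string]bool)) int {
	passes := 0
	for {
		apply(c.swap())
		passes++
		if c.len() <= threshold {
			return passes
		}
	}
}

// demo starts with 5 changes; 3 more arrive while the first pass is applying.
func demo() (passes, applied int) {
	c := &changeset{rows: make(map[string]bool)}
	for i := 0; i < 5; i++ {
		c.add(fmt.Sprintf("pk-%d", i), false)
	}
	first := true
	apply := func(batch map[string]bool) {
		applied += len(batch)
		if first {
			first = false
			for i := 5; i < 8; i++ { // simulated writes arriving mid-flush
				c.add(fmt.Sprintf("pk-%d", i), false)
			}
		}
	}
	passes = flushUntilTrivial(c, 2, apply)
	return passes, applied
}

func main() {
	passes, applied := demo()
	fmt.Println(passes, applied) // 2 8
}
```

A single pass would have left three changes behind, which is why the flush repeats until the remainder is small.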
func (*Client) FlushUnderTableLock ¶
func (c *Client) FlushUnderTableLock(ctx context.Context, lock *dbconn.TableLock) error
FlushUnderTableLock is a final flush under an exclusive table lock, using the connection that holds the write lock. Because flushing generates binary log events, we actually want to call flush *twice*:
- The first time flushes the pending changes to the new table.
- We then ensure that we have read all the binary log changes from the server.
- The second time reads through the changes generated by the first flush and updates the in-memory applied position to match the server's position. This is required so that the binlog position is up to date for the c.AllChangesFlushed() check.
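The need for the second flush can be seen in a toy model where positions are plain integers. This is a sketch under stated assumptions, not the package's implementation: toyClient and all of its fields and methods are hypothetical, and real binlog positions are file/offset pairs.

```go
package main

import "fmt"

// toyClient is a hypothetical model, not the package's Client.
type toyClient struct {
	serverPos  int // where the server's binary log currently ends
	readPos    int // how far the subscriber has read
	appliedPos int // position recorded by the most recent flush
	pending    int // buffered changes not yet applied
}

// flush records the position read so far and applies the pending changes.
// Each applied change is itself written to the server's binary log, so
// serverPos moves ahead of the position that was just recorded.
func (c *toyClient) flush() {
	c.appliedPos = c.readPos
	c.serverPos += c.pending
	c.pending = 0
}

// catchUp models reading every outstanding binary log event from the server.
func (c *toyClient) catchUp() {
	c.readPos = c.serverPos
}

// allChangesFlushed is true only when nothing is buffered and the applied
// position matches the server's position.
func (c *toyClient) allChangesFlushed() bool {
	return c.pending == 0 && c.appliedPos == c.serverPos
}

// flushTwice follows the three steps listed above and reports the result of
// the check after the first flush and after the second.
func flushTwice(c *toyClient) (afterFirst, afterSecond bool) {
	c.flush() // step 1: apply pending changes
	afterFirst = c.allChangesFlushed()
	c.catchUp() // step 2: read the events generated by step 1
	c.flush()   // step 3: nothing to apply, but the position advances
	afterSecond = c.allChangesFlushed()
	return afterFirst, afterSecond
}

func main() {
	c := &toyClient{serverPos: 100, readPos: 100, appliedPos: 90, pending: 3}
	first, second := flushTwice(c)
	fmt.Println(first, second) // false true
}
```

In this model the check fails after the first flush because the flush itself moved the server's position, and only the second pass brings the applied position level with it.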
func (*Client) GetBinlogApplyPosition ¶
func (c *Client) GetBinlogApplyPosition() mysql.Position
func (*Client) GetDeltaLen ¶
func (c *Client) GetDeltaLen() int
func (*Client) KeyAboveWatermarkEnabled ¶
func (c *Client) KeyAboveWatermarkEnabled() bool
KeyAboveWatermarkEnabled returns true if the key above watermark optimization is enabled and it is also safe to use.
func (*Client) OnRow ¶
func (c *Client) OnRow(e *canal.RowsEvent) error
OnRow is called when a row is discovered via replication. The event is of type e.Action and contains one or more rows in e.Rows. We find the PRIMARY KEY of the row:
1) If it exceeds the known high watermark of the copier, we throw it away (we've not copied that data yet, so it will already be up to date when we copy it later).
2) If it could have been copied already, we add it to the changeset. We only need to record the PK and whether the operation was a delete. This will be used after copy rows to apply any changes that have been made.
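The two cases above can be sketched with an integer primary key. This is an illustration under stated assumptions: the subscriber type, its onRow method, and the highWatermark value are hypothetical, and only the callback's shape (func(interface{}) bool) is taken from the Client struct.

```go
package main

import "fmt"

// subscriber is a hypothetical, simplified stand-in for the client.
type subscriber struct {
	// keyAboveCopier has the same shape as KeyAboveCopierCallback.
	keyAboveCopier func(interface{}) bool
	// changes maps primary key -> true if the last operation was a delete.
	changes map[int]bool
}

// onRow discards keys the copier has not reached yet and records the rest.
func (s *subscriber) onRow(pk int, isDelete bool) {
	if s.keyAboveCopier != nil && s.keyAboveCopier(pk) {
		return // case 1: not copied yet, the copier will read the current row later
	}
	s.changes[pk] = isDelete // case 2: keep only the PK and the delete flag
}

// demo feeds four events to a subscriber whose copier has reached key 100.
func demo() map[int]bool {
	const highWatermark = 100 // hypothetical copier progress
	s := &subscriber{
		keyAboveCopier: func(key interface{}) bool { return key.(int) > highWatermark },
		changes:        make(map[int]bool),
	}
	s.onRow(5, false)   // update below the watermark: recorded
	s.onRow(5, true)    // later delete of the same key: overwrites the entry
	s.onRow(250, false) // above the watermark: discarded
	s.onRow(42, false)  // update below the watermark: recorded
	return s.changes
}

func main() {
	changes := demo()
	fmt.Println(len(changes), changes[5], changes[42]) // 2 true false
}
```

Keying the map by primary key means repeated changes to the same row collapse into one entry, which fits the note on DefaultFlushInterval that longer intervals permit more merging.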
func (*Client) OnTableChanged ¶
func (c *Client) OnTableChanged(header *replication.EventHeader, schema string, table string) error
OnTableChanged is called when a table is changed via DDL. This is a failsafe because we don't expect DDL to be performed on the table while we are operating.
func (*Client) SetKeyAboveWatermarkOptimization ¶
func (c *Client) SetKeyAboveWatermarkOptimization(newVal bool)
func (*Client) StartPeriodicFlush ¶
func (c *Client) StartPeriodicFlush(ctx context.Context, interval time.Duration)
StartPeriodicFlush starts a loop that periodically flushes the binlog changeset. This is used by the migrator to ensure the binlog position is advanced.
func (*Client) StopPeriodicFlush ¶
func (c *Client) StopPeriodicFlush()
StopPeriodicFlush disables the periodic flush, and guarantees that no flush is running when it returns.
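A start/stop pair with this guarantee is commonly built from a ticker, a cancellable context, and a "done" channel. The following is a minimal sketch of that pattern, assuming this design. The periodicFlusher type and its fields are hypothetical, and the package may implement the guarantee differently.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// periodicFlusher is a hypothetical type for illustration only.
type periodicFlusher struct {
	flush  func(ctx context.Context)
	cancel context.CancelFunc
	done   chan struct{}
}

// start launches a goroutine that calls flush on every tick until cancelled.
func (p *periodicFlusher) start(ctx context.Context, interval time.Duration) {
	ctx, cancel := context.WithCancel(ctx)
	p.cancel = cancel
	p.done = make(chan struct{})
	go func() {
		defer close(p.done) // signals that the loop has fully exited
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				p.flush(ctx)
			}
		}
	}()
}

// stop cancels the loop and waits for it to exit, so no flush can still be
// running once stop returns.
func (p *periodicFlusher) stop() {
	p.cancel()
	<-p.done
}

// demo waits for at least one flush, stops the flusher, and then checks that
// the flush count no longer changes.
func demo() (atStop, later int64) {
	var count int64
	var once sync.Once
	first := make(chan struct{})
	p := &periodicFlusher{flush: func(context.Context) {
		atomic.AddInt64(&count, 1)
		once.Do(func() { close(first) })
	}}
	p.start(context.Background(), 2*time.Millisecond)
	<-first // at least one flush has completed its increment
	p.stop()
	atStop = atomic.LoadInt64(&count)
	time.Sleep(10 * time.Millisecond) // several intervals with the loop stopped
	later = atomic.LoadInt64(&count)
	return atStop, later
}

func main() {
	atStop, later := demo()
	fmt.Println("flushes at stop:", atStop, "flushes later:", later)
}
```

Waiting on the done channel, rather than only cancelling, is what provides the "no flush running on return" property in this sketch.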
type ClientConfig ¶
func NewClientDefaultConfig ¶
func NewClientDefaultConfig() *ClientConfig
NewClientDefaultConfig returns a default config for the replication client.
type LogWrapper ¶
type LogWrapper struct {
// contains filtered or unexported fields
}
func NewLogWrapper ¶
func NewLogWrapper(logger loggers.Advanced) *LogWrapper
func (*LogWrapper) Debug ¶
func (c *LogWrapper) Debug(args ...interface{})
func (*LogWrapper) Debugf ¶
func (c *LogWrapper) Debugf(format string, args ...interface{})
func (*LogWrapper) Debugln ¶
func (c *LogWrapper) Debugln(args ...interface{})
func (*LogWrapper) Error ¶
func (c *LogWrapper) Error(args ...interface{})
func (*LogWrapper) Errorf ¶
func (c *LogWrapper) Errorf(format string, args ...interface{})
func (*LogWrapper) Errorln ¶
func (c *LogWrapper) Errorln(args ...interface{})
func (*LogWrapper) Fatal ¶
func (c *LogWrapper) Fatal(args ...interface{})
func (*LogWrapper) Fatalf ¶
func (c *LogWrapper) Fatalf(format string, args ...interface{})
func (*LogWrapper) Fatalln ¶
func (c *LogWrapper) Fatalln(args ...interface{})
func (*LogWrapper) Info ¶
func (c *LogWrapper) Info(args ...interface{})
func (*LogWrapper) Infof ¶
func (c *LogWrapper) Infof(format string, args ...interface{})
func (*LogWrapper) Infoln ¶
func (c *LogWrapper) Infoln(args ...interface{})
func (*LogWrapper) Panic ¶
func (c *LogWrapper) Panic(args ...interface{})
func (*LogWrapper) Panicf ¶
func (c *LogWrapper) Panicf(format string, args ...interface{})
func (*LogWrapper) Panicln ¶
func (c *LogWrapper) Panicln(args ...interface{})
func (*LogWrapper) Print ¶
func (c *LogWrapper) Print(args ...interface{})
func (*LogWrapper) Printf ¶
func (c *LogWrapper) Printf(format string, args ...interface{})
func (*LogWrapper) Println ¶
func (c *LogWrapper) Println(args ...interface{})
func (*LogWrapper) Warn ¶
func (c *LogWrapper) Warn(args ...interface{})
func (*LogWrapper) Warnf ¶
func (c *LogWrapper) Warnf(format string, args ...interface{})
func (*LogWrapper) Warnln ¶
func (c *LogWrapper) Warnln(args ...interface{})