Documentation ¶
Index ¶
- Constants
- type Applier
- func (this *Applier) AlterGhost() error
- func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error
- func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error
- func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error)
- func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, ...) error
- func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error
- func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error)
- func (this *Applier) CreateAtomicCutOverSentryTable() error
- func (this *Applier) CreateChangelogTable() error
- func (this *Applier) CreateGhostTable() error
- func (this *Applier) DropAtomicCutOverSentryTableIfExists() error
- func (this *Applier) DropChangelogTable() error
- func (this *Applier) DropGhostTable() error
- func (this *Applier) DropOldTable() error
- func (this *Applier) ExecuteThrottleQuery() (int64, error)
- func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string) error
- func (this *Applier) ExpectUsedLock(sessionId int64) error
- func (this *Applier) GetSessionLockName(sessionId int64) string
- func (this *Applier) InitDBConnections() (err error)
- func (this *Applier) InitiateHeartbeat()
- func (this *Applier) LockOriginalTable() error
- func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error
- func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error
- func (this *Applier) ReadMigrationRangeValues() error
- func (this *Applier) RenameTablesRollback() (renameError error)
- func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error)
- func (this *Applier) StartReplication() error
- func (this *Applier) StartSlaveIOThread() error
- func (this *Applier) StartSlaveSQLThread() error
- func (this *Applier) StopReplication() error
- func (this *Applier) StopSlaveIOThread() error
- func (this *Applier) StopSlaveSQLThread() error
- func (this *Applier) SwapTablesQuickAndBumpy() error
- func (this *Applier) Teardown()
- func (this *Applier) UnlockTables() error
- func (this *Applier) ValidateOrDropExistingTables() error
- func (this *Applier) WriteAndLogChangelog(hint, value string) (string, error)
- func (this *Applier) WriteChangelog(hint, value string) (string, error)
- func (this *Applier) WriteChangelogState(value string) (string, error)
- type BinlogEventListener
- type ChangelogState
- type EventsStreamer
- func (this *EventsStreamer) AddListener(async bool, databaseName string, tableName string, ...) (err error)
- func (this *EventsStreamer) Close() (err error)
- func (this *EventsStreamer) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates
- func (this *EventsStreamer) GetReconnectBinlogCoordinates() *mysql.BinlogCoordinates
- func (this *EventsStreamer) InitDBConnections() (err error)
- func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error
- func (this *EventsStreamer) Teardown()
- type HooksExecutor
- type Inspector
- func (this *Inspector) CountTableRows() error
- func (this *Inspector) InitDBConnections() (err error)
- func (this *Inspector) InspectOriginalTable() (err error)
- func (this *Inspector) InspectTableColumnsAndUniqueKeys(tableName string) (columns *sql.ColumnList, virtualColumns *sql.ColumnList, ...)
- func (this *Inspector) Teardown()
- func (this *Inspector) ValidateOriginalTable() (err error)
- type Migrator
- type PrintStatusRule
- type Server
- type Throttler
Constants ¶
const ( NoPrintStatusRule PrintStatusRule = iota HeuristicPrintStatusRule = iota ForcePrintStatusRule = iota ForcePrintStatusOnlyRule = iota ForcePrintStatusAndHintRule = iota )
const ( EventsChannelBufferSize = 1 ReconnectStreamerSleepSeconds = 5 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Applier ¶
type Applier struct {
// contains filtered or unexported fields
}
Applier connects and writes to the applier-server, which is the server where migration happens. This is typically the master, but could be a replica when `--test-on-replica` or `--execute-on-replica` are given. Applier is the one to actually write row data and apply binlog events onto the ghost table. It is where the ghost & changelog tables get created. It is where the cut-over phase happens.
func NewApplier ¶
func NewApplier(migrationContext *base.MigrationContext) *Applier
func (*Applier) AlterGhost ¶
AlterGhost applies `alter` statement on ghost table
func (*Applier) ApplyDMLEventQueries ¶
func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error
ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
func (*Applier) ApplyDMLEventQuery ¶
func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error
ApplyDMLEventQuery writes an entry to the ghost table, in response to an intercepted original-table binlog event
func (*Applier) ApplyIterationInsertQuery ¶
func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error)
ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where data actually gets copied from original table.
func (*Applier) AtomicCutOverMagicLock ¶
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error
AtomicCutOverMagicLock
func (*Applier) AtomicCutoverRename ¶
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error
AtomicCutoverRename
func (*Applier) CalculateNextIterationRangeEndValues ¶
CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values, which will be used for copying the next chunk of rows. It returns "false" if there is no further chunk to work through, i.e. we're past the last chunk and are done with iterating the range (and thus done with copying row chunks)
func (*Applier) CreateAtomicCutOverSentryTable ¶
CreateAtomicCutOverSentryTable
func (*Applier) CreateChangelogTable ¶
CreateChangelogTable creates the changelog table on the applier host
func (*Applier) CreateGhostTable ¶
CreateGhostTable creates the ghost table on the applier host
func (*Applier) DropAtomicCutOverSentryTableIfExists ¶
DropAtomicCutOverSentryTableIfExists checks if the "old" table name happens to be a cut-over magic table; if so, it drops it.
func (*Applier) DropChangelogTable ¶
DropChangelogTable drops the changelog table on the applier host
func (*Applier) DropGhostTable ¶
DropGhostTable drops the ghost table on the applier host
func (*Applier) DropOldTable ¶
DropOldTable drops the _Old table on the applier host
func (*Applier) ExecuteThrottleQuery ¶
ExecuteThrottleQuery executes the `--throttle-query` and returns its results.
func (*Applier) ExpectProcess ¶
ExpectProcess expects a process to show up in `SHOW PROCESSLIST` that has given characteristics
func (*Applier) ExpectUsedLock ¶
ExpectUsedLock expects the special hint voluntary lock to exist on given session
func (*Applier) GetSessionLockName ¶
GetSessionLockName returns a name for the special hint session voluntary lock
func (*Applier) InitDBConnections ¶
func (*Applier) InitiateHeartbeat ¶
func (this *Applier) InitiateHeartbeat()
InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table. This is done asynchronously
func (*Applier) LockOriginalTable ¶
LockOriginalTable places a write lock on the original table
func (*Applier) ReadMigrationMaxValues ¶
ReadMigrationMaxValues returns the maximum values to be iterated on rowcopy
func (*Applier) ReadMigrationMinValues ¶
ReadMigrationMinValues returns the minimum values to be iterated on rowcopy
func (*Applier) ReadMigrationRangeValues ¶
ReadMigrationRangeValues reads min/max values that will be used for rowcopy
func (*Applier) RenameTablesRollback ¶
RenameTablesRollback renames back both tables: original back to ghost, _old back to original. This is used by `--test-on-replica`
func (*Applier) ShowStatusVariable ¶
func (*Applier) StartReplication ¶
StartReplication is used by `--test-on-replica` on cut-over failure
func (*Applier) StartSlaveIOThread ¶
StartSlaveIOThread is applicable with --test-on-replica
func (*Applier) StartSlaveSQLThread ¶
StartSlaveSQLThread is applicable with --test-on-replica
func (*Applier) StopReplication ¶
StopReplication is used by `--test-on-replica` and stops replication.
func (*Applier) StopSlaveIOThread ¶
StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread, duh. We need to keep the SQL thread active so as to complete processing received events, and have them written to the binary log, so that we can then read them via streamer.
func (*Applier) StopSlaveSQLThread ¶
StopSlaveSQLThread is applicable with --test-on-replica
func (*Applier) SwapTablesQuickAndBumpy ¶
SwapTablesQuickAndBumpy issues a two-step swap table operation: (1) rename original table to _old; (2) rename ghost table to original. There is a point in time in between where the table does not exist.
func (*Applier) UnlockTables ¶
UnlockTables makes tea. No wait, it unlocks tables.
func (*Applier) ValidateOrDropExistingTables ¶
ValidateOrDropExistingTables verifies ghost and changelog tables do not exist, or attempts to drop them if instructed to.
func (*Applier) WriteAndLogChangelog ¶
func (*Applier) WriteChangelog ¶
WriteChangelog writes a value to the changelog table. It returns the hint as given, for convenience
type BinlogEventListener ¶
type BinlogEventListener struct {
// contains filtered or unexported fields
}
type ChangelogState ¶
type ChangelogState string
const ( GhostTableMigrated ChangelogState = "GhostTableMigrated" AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" )
func ReadChangelogState ¶
func ReadChangelogState(s string) ChangelogState
type EventsStreamer ¶
type EventsStreamer struct {
// contains filtered or unexported fields
}
EventsStreamer reads data from binary logs and streams it on. It acts as a publisher, and interested parties may subscribe for per-table events.
func NewEventsStreamer ¶
func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer
func (*EventsStreamer) AddListener ¶
func (this *EventsStreamer) AddListener( async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error)
AddListener registers a new listener for binlog events, on a per-table basis
func (*EventsStreamer) Close ¶
func (this *EventsStreamer) Close() (err error)
func (*EventsStreamer) GetCurrentBinlogCoordinates ¶
func (this *EventsStreamer) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates
func (*EventsStreamer) GetReconnectBinlogCoordinates ¶
func (this *EventsStreamer) GetReconnectBinlogCoordinates() *mysql.BinlogCoordinates
func (*EventsStreamer) InitDBConnections ¶
func (this *EventsStreamer) InitDBConnections() (err error)
func (*EventsStreamer) StreamEvents ¶
func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error
StreamEvents will begin streaming events. It will be blocking, so should be executed by a goroutine
func (*EventsStreamer) Teardown ¶
func (this *EventsStreamer) Teardown()
type HooksExecutor ¶
type HooksExecutor struct {
// contains filtered or unexported fields
}
func NewHooksExecutor ¶
func NewHooksExecutor(migrationContext *base.MigrationContext) *HooksExecutor
type Inspector ¶
type Inspector struct {
// contains filtered or unexported fields
}
Inspector reads data from the read-MySQL-server (typically a replica, but can be the master). It is used for gaining initial status and structure, and later also for following up on progress and changelog.
func NewInspector ¶
func NewInspector(migrationContext *base.MigrationContext) *Inspector
func (*Inspector) CountTableRows ¶
CountTableRows counts exact number of rows on the original table
func (*Inspector) InitDBConnections ¶
func (*Inspector) InspectOriginalTable ¶
func (*Inspector) InspectTableColumnsAndUniqueKeys ¶
func (this *Inspector) InspectTableColumnsAndUniqueKeys(tableName string) (columns *sql.ColumnList, virtualColumns *sql.ColumnList, uniqueKeys [](*sql.UniqueKey), err error)
func (*Inspector) ValidateOriginalTable ¶
type Migrator ¶
type Migrator struct { // Log *io.Writer Log *bytes.Buffer Error error // contains filtered or unexported fields }
Migrator is the main schema migration flow manager.
func NewMigrator ¶
func NewMigrator(context *base.MigrationContext) *Migrator
func (*Migrator) ExecOnFailureHook ¶
ExecOnFailureHook executes the onFailure hook, and this method is provided as the only external hook access point
func (*Migrator) FinalizeMigration ¶
func (this *Migrator) FinalizeMigration()
type PrintStatusRule ¶
type PrintStatusRule int
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server listens for requests on a socket file or via TCP
func NewServer ¶
func NewServer(migrationContext *base.MigrationContext, hooksExecutor *HooksExecutor, printStatus printStatusFunc) *Server
func (*Server) BindSocketFile ¶
func (*Server) BindTCPPort ¶
func (*Server) RemoveSocketFile ¶
type Throttler ¶
type Throttler struct {
// contains filtered or unexported fields
}
Throttler collects metrics related to throttling and makes an informed decision whether throttling should take place.
func NewThrottler ¶
func NewThrottler(migrationContext *base.MigrationContext, applier *Applier, inspector *Inspector) *Throttler