Documentation ¶
Index ¶
- Constants
- Variables
- func DefaultBuildSelect(columns []string, table *schema.Table, lastPk, batchSize uint64) squirrel.SelectBuilder
- func GetMd5HashesSql(schema, table, pkColumn string, columns []schema.TableColumn, pks []uint64) (string, []interface{}, error)
- func Int64Value(value interface{}) (int64, bool)
- func MaskedDSN(c *mysql.Config) string
- func MaxPrimaryKeys(db *sql.DB, tables []*schema.Table, logger *logrus.Entry) (map[*schema.Table]uint64, []*schema.Table, error)
- func QuotedTableName(table *schema.Table) string
- func QuotedTableNameFromString(database, table string) string
- func Uint64Value(value interface{}) (uint64, bool)
- func WaitForThrottle(t Throttler)
- func WithRetries(maxRetries int, sleep time.Duration, logger *logrus.Entry, verb string, ...) (err error)
- func WithRetriesContext(ctx context.Context, maxRetries int, sleep time.Duration, logger *logrus.Entry, ...) (err error)
- type AtomicBoolean
- type BatchWriter
- type BinlogDeleteEvent
- type BinlogInsertEvent
- type BinlogStreamer
- func (s *BinlogStreamer) AddEventListener(listener func([]DMLEvent) error)
- func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() error
- func (s *BinlogStreamer) FlushAndStop()
- func (s *BinlogStreamer) GetLastStreamedBinlogPosition() mysql.Position
- func (s *BinlogStreamer) Initialize() (err error)
- func (s *BinlogStreamer) IsAlmostCaughtUp() bool
- func (s *BinlogStreamer) Run()
- type BinlogUpdateEvent
- type BinlogWriter
- type ChecksumTableVerifier
- type Config
- type ControlServer
- func (this *ControlServer) HandleCutover(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandleIndex(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandlePause(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandleStop(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandleUnpause(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandleVerify(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) Initialize() (err error)
- func (this *ControlServer) Run(wg *sync.WaitGroup)
- func (this *ControlServer) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) Shutdown() error
- type CopyFilter
- type CountMetric
- type Cursor
- type CursorConfig
- type DMLEvent
- func NewBinlogDMLEvents(table *schema.Table, ev *replication.BinlogEvent) ([]DMLEvent, error)
- func NewBinlogDeleteEvents(table *schema.Table, rowsEvent *replication.RowsEvent) ([]DMLEvent, error)
- func NewBinlogInsertEvents(table *schema.Table, rowsEvent *replication.RowsEvent) ([]DMLEvent, error)
- func NewBinlogUpdateEvents(table *schema.Table, rowsEvent *replication.RowsEvent) ([]DMLEvent, error)
- type DMLEventBase
- type DataIterator
- type DataIteratorState
- func (this *DataIteratorState) CompletedTables() map[string]bool
- func (this *DataIteratorState) EstimatedPKProcessedPerSecond() float64
- func (this *DataIteratorState) LastSuccessfulPrimaryKeys() map[string]uint64
- func (this *DataIteratorState) MarkTableAsCompleted(table string)
- func (this *DataIteratorState) TargetPrimaryKeys() map[string]uint64
- func (this *DataIteratorState) UpdateLastSuccessfulPK(table string, pk uint64)
- func (this *DataIteratorState) UpdateTargetPK(table string, pk uint64)
- type DatabaseConfig
- type ErrorHandler
- type Ferry
- func (f *Ferry) FlushBinlogAndStopStreaming()
- func (f *Ferry) Initialize() (err error)
- func (f *Ferry) Run()
- func (f *Ferry) RunStandaloneDataCopy(tables []*schema.Table) error
- func (f *Ferry) Start() error
- func (f *Ferry) WaitUntilBinlogStreamerCatchesUp()
- func (f *Ferry) WaitUntilRowCopyIsComplete()
- type GaugeMetric
- type IncompleteVerificationError
- type IterativeVerifier
- func (v *IterativeVerifier) GetHashes(db *sql.DB, schema, table, pkColumn string, columns []schema.TableColumn, ...) (map[uint64][]byte, error)
- func (v *IterativeVerifier) Initialize() error
- func (v *IterativeVerifier) Result() (VerificationResultAndStatus, error)
- func (v *IterativeVerifier) SanityCheckParameters() error
- func (v *IterativeVerifier) StartInBackground() error
- func (v *IterativeVerifier) VerifyBeforeCutover() error
- func (v *IterativeVerifier) VerifyDuringCutover() (VerificationResult, error)
- func (v *IterativeVerifier) VerifyOnce() (VerificationResult, error)
- func (v *IterativeVerifier) Wait()
- type LagThrottler
- type LagThrottlerConfig
- type MetricBase
- type MetricTag
- type Metrics
- func (m *Metrics) AddConsumer()
- func (m *Metrics) Count(key string, value int64, tags []MetricTag, sampleRate float64)
- func (m *Metrics) DoneConsumer()
- func (m *Metrics) Gauge(key string, value float64, tags []MetricTag, sampleRate float64)
- func (m *Metrics) Measure(key string, tags []MetricTag, sampleRate float64, f func())
- func (m *Metrics) StopAndFlush()
- func (m *Metrics) Timer(key string, duration time.Duration, tags []MetricTag, sampleRate float64)
- type PKPositionLog
- type PanicErrorHandler
- type PauserThrottler
- type ReverifyBatch
- type ReverifyEntry
- type ReverifyStore
- type RowBatch
- type RowData
- type SqlDBWithFakeRollback
- type SqlPreparer
- type SqlPreparerAndRollbacker
- type Status
- type TLSConfig
- type TableFilter
- type TableIdentifier
- type TableSchemaCache
- type TableStatus
- type Throttler
- type ThrottlerBase
- type TimerMetric
- type VerificationResult
- type VerificationResultAndStatus
- type Verifier
- type WorkerPool
Constants ¶
const (
    StateStarting          = "starting"
    StateCopying           = "copying"
    StateWaitingForCutover = "wait-for-cutover"
    StateCutover           = "cutover"
    StateDone              = "done"
)
Variables ¶
var (
    VersionString string = "?.?.?+??????????????+???????"
    WebUiBasedir  string = ""
)
Functions ¶
func DefaultBuildSelect ¶
func DefaultBuildSelect(columns []string, table *schema.Table, lastPk, batchSize uint64) squirrel.SelectBuilder
func GetMd5HashesSql ¶
func GetMd5HashesSql(schema, table, pkColumn string, columns []schema.TableColumn, pks []uint64) (string, []interface{}, error)
func Int64Value ¶
func Int64Value(value interface{}) (int64, bool)
func MaxPrimaryKeys ¶
func MaxPrimaryKeys(db *sql.DB, tables []*schema.Table, logger *logrus.Entry) (map[*schema.Table]uint64, []*schema.Table, error)
func QuotedTableName ¶
func QuotedTableName(table *schema.Table) string
func Uint64Value ¶
func Uint64Value(value interface{}) (uint64, bool)
func WaitForThrottle ¶
func WaitForThrottle(t Throttler)
func WithRetries ¶
Types ¶
type AtomicBoolean ¶
type AtomicBoolean int32
func (*AtomicBoolean) Get ¶
func (a *AtomicBoolean) Get() bool
func (*AtomicBoolean) Set ¶
func (a *AtomicBoolean) Set(b bool)
type BatchWriter ¶
type BatchWriter struct {
    DB               *sql.DB
    DatabaseRewrites map[string]string
    TableRewrites    map[string]string
    WriteRetries     int
    // contains filtered or unexported fields
}
func (*BatchWriter) Initialize ¶
func (w *BatchWriter) Initialize()
func (*BatchWriter) WriteRowBatch ¶
func (w *BatchWriter) WriteRowBatch(batch *RowBatch) error
type BinlogDeleteEvent ¶
type BinlogDeleteEvent struct {
    *DMLEventBase
    // contains filtered or unexported fields
}
func (*BinlogDeleteEvent) AsSQLString ¶
func (e *BinlogDeleteEvent) AsSQLString(target *schema.Table) (string, error)
func (*BinlogDeleteEvent) NewValues ¶
func (e *BinlogDeleteEvent) NewValues() RowData
func (*BinlogDeleteEvent) OldValues ¶
func (e *BinlogDeleteEvent) OldValues() RowData
func (*BinlogDeleteEvent) PK ¶
func (e *BinlogDeleteEvent) PK() (uint64, error)
type BinlogInsertEvent ¶
type BinlogInsertEvent struct {
    *DMLEventBase
    // contains filtered or unexported fields
}
func (*BinlogInsertEvent) AsSQLString ¶
func (e *BinlogInsertEvent) AsSQLString(target *schema.Table) (string, error)
func (*BinlogInsertEvent) NewValues ¶
func (e *BinlogInsertEvent) NewValues() RowData
func (*BinlogInsertEvent) OldValues ¶
func (e *BinlogInsertEvent) OldValues() RowData
func (*BinlogInsertEvent) PK ¶
func (e *BinlogInsertEvent) PK() (uint64, error)
type BinlogStreamer ¶
type BinlogStreamer struct {
    Db           *sql.DB
    Config       *Config
    ErrorHandler ErrorHandler
    Filter       CopyFilter
    TableSchema  TableSchemaCache
    // contains filtered or unexported fields
}
func (*BinlogStreamer) AddEventListener ¶
func (s *BinlogStreamer) AddEventListener(listener func([]DMLEvent) error)
func (*BinlogStreamer) ConnectBinlogStreamerToMysql ¶
func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() error
func (*BinlogStreamer) FlushAndStop ¶
func (s *BinlogStreamer) FlushAndStop()
func (*BinlogStreamer) GetLastStreamedBinlogPosition ¶
func (s *BinlogStreamer) GetLastStreamedBinlogPosition() mysql.Position
func (*BinlogStreamer) Initialize ¶
func (s *BinlogStreamer) Initialize() (err error)
func (*BinlogStreamer) IsAlmostCaughtUp ¶
func (s *BinlogStreamer) IsAlmostCaughtUp() bool
func (*BinlogStreamer) Run ¶
func (s *BinlogStreamer) Run()
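For illustration, here is a minimal sketch of registering an event listener on the streamer, using only the AddEventListener, Run, and DMLEvent signatures shown in this document; the package import path, the helper name, and the logging are assumptions, not part of this API.

package example

import (
    "log"

    "github.com/Shopify/ghostferry" // import path assumed for this sketch
)

// attachLoggingListener is a hypothetical helper: it logs every DML event
// streamed from the binlog. Returning an error from the listener aborts
// the run via the streamer's ErrorHandler.
func attachLoggingListener(streamer *ghostferry.BinlogStreamer) {
    streamer.AddEventListener(func(events []ghostferry.DMLEvent) error {
        for _, ev := range events {
            pk, err := ev.PK()
            if err != nil {
                return err
            }
            log.Printf("%s.%s changed (pk = %d)", ev.Database(), ev.Table(), pk)
        }
        return nil
    })
}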
type BinlogUpdateEvent ¶
type BinlogUpdateEvent struct {
    *DMLEventBase
    // contains filtered or unexported fields
}
func (*BinlogUpdateEvent) AsSQLString ¶
func (e *BinlogUpdateEvent) AsSQLString(target *schema.Table) (string, error)
func (*BinlogUpdateEvent) NewValues ¶
func (e *BinlogUpdateEvent) NewValues() RowData
func (*BinlogUpdateEvent) OldValues ¶
func (e *BinlogUpdateEvent) OldValues() RowData
func (*BinlogUpdateEvent) PK ¶
func (e *BinlogUpdateEvent) PK() (uint64, error)
type BinlogWriter ¶
type BinlogWriter struct {
    DB               *sql.DB
    DatabaseRewrites map[string]string
    TableRewrites    map[string]string
    Throttler        Throttler
    BatchSize        int
    WriteRetries     int
    ErrorHandler     ErrorHandler
    // contains filtered or unexported fields
}
func (*BinlogWriter) BufferBinlogEvents ¶
func (b *BinlogWriter) BufferBinlogEvents(events []DMLEvent) error
func (*BinlogWriter) Initialize ¶
func (b *BinlogWriter) Initialize() error
func (*BinlogWriter) Run ¶
func (b *BinlogWriter) Run()
func (*BinlogWriter) Stop ¶
func (b *BinlogWriter) Stop()
type ChecksumTableVerifier ¶
type ChecksumTableVerifier struct {
    Tables           []*schema.Table
    DatabaseRewrites map[string]string
    TableRewrites    map[string]string
    SourceDB         *sql.DB
    TargetDB         *sql.DB
    // contains filtered or unexported fields
}
func (*ChecksumTableVerifier) Result ¶
func (v *ChecksumTableVerifier) Result() (VerificationResultAndStatus, error)
func (*ChecksumTableVerifier) StartInBackground ¶
func (v *ChecksumTableVerifier) StartInBackground() error
func (*ChecksumTableVerifier) Verify ¶
func (v *ChecksumTableVerifier) Verify() (VerificationResult, error)
func (*ChecksumTableVerifier) Wait ¶
func (v *ChecksumTableVerifier) Wait()
type Config ¶
type Config struct {
    // Source database connection configuration
    //
    // Required
    Source DatabaseConfig

    // Target database connection configuration
    //
    // Required
    Target DatabaseConfig

    // Map database name on the source database (key of the map) to a
    // different name on the target database (value of the associated key).
    // This allows one to move data and change the database name in the
    // process.
    //
    // Optional: defaults to empty map/no rewrites
    DatabaseRewrites map[string]string

    // Map the table name on the source database to a different name on
    // the target database. See DatabaseRewrites.
    //
    // Optional: defaults to empty map/no rewrites
    TableRewrites map[string]string

    // The maximum number of retries for writes if the writes failed on
    // the target database.
    //
    // Optional: defaults to 5.
    DBWriteRetries int

    // Filter out the databases/tables when detecting the source databases
    // and tables.
    //
    // Required
    TableFilter TableFilter

    // Filter out unwanted data/events from being copied.
    //
    // Optional: defaults to nil/no filter.
    CopyFilter CopyFilter

    // The server id used by Ghostferry to connect to MySQL as a replication
    // slave. This id must be unique on the MySQL server. If 0 is specified,
    // a random id will be generated upon connecting to the MySQL server.
    //
    // Optional: defaults to an automatically generated one
    MyServerId uint32

    // The maximum number of binlog events to write at once. Note this is a
    // maximum: if there are not a lot of binlog events, they will be written
    // one at a time so that the binlog streamer lag is as low as possible.
    // This batch size will only be hit if there is a lot of binlog activity
    // at the same time.
    //
    // Optional: defaults to 100
    BinlogEventBatchSize int

    // The batch size used to iterate the data during data copy. This batch
    // size is always used: if this is specified to be 100, 100 rows will be
    // copied per iteration.
    //
    // With the current implementation of Ghostferry, we need to lock the rows
    // we select. This means the larger this number is, the longer we need to
    // hold this lock. On the flip side, the smaller this number is, the
    // slower the copy will likely be.
    //
    // Optional: defaults to 200
    DataIterationBatchSize uint64

    // The maximum number of retries for reads if the reads fail on the source
    // database.
    //
    // Optional: defaults to 5
    DBReadRetries int

    // This specifies the number of concurrent goroutines, each iterating over
    // a single table.
    //
    // At this point in time, iteration within a single table is not
    // parallelized. This may be added in the future.
    //
    // Optional: defaults to 4
    DataIterationConcurrency int

    // This specifies whether Ghostferry will pause before cutover or not.
    //
    // Optional: defaults to false
    AutomaticCutover bool

    // Config for the ControlServer
    ServerBindAddr string
    WebBasedir     string
}
func (*Config) ValidateConfig ¶
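As a rough illustration of the fields above, here is a hedged sketch of a minimal Config literal; the import path, hosts, and credentials are placeholders, and the required TableFilter is only indicated in a comment because its interface is not reproduced in this document.

package example

import "github.com/Shopify/ghostferry" // import path assumed for this sketch

func exampleConfig() *ghostferry.Config {
    return &ghostferry.Config{
        Source: ghostferry.DatabaseConfig{
            Host: "source-db.example.com", // placeholder
            Port: 3306,
            User: "ghostferry",
            Pass: "secret",
        },
        Target: ghostferry.DatabaseConfig{
            Host: "target-db.example.com", // placeholder
            Port: 3306,
            User: "ghostferry",
            Pass: "secret",
        },

        // Move data from the "app" database on the source into "app_new"
        // on the target.
        DatabaseRewrites: map[string]string{"app": "app_new"},

        // TableFilter is required, but its interface is not shown in this
        // document; supply your own implementation here.
        // TableFilter: myTableFilter{},

        // The values below simply restate the documented defaults.
        DBWriteRetries:           5,
        DBReadRetries:            5,
        BinlogEventBatchSize:     100,
        DataIterationBatchSize:   200,
        DataIterationConcurrency: 4,
    }
}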
type ControlServer ¶
type ControlServer struct {
    F        *Ferry
    Verifier Verifier
    Addr     string
    Basedir  string
    // contains filtered or unexported fields
}
func (*ControlServer) HandleCutover ¶
func (this *ControlServer) HandleCutover(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandleIndex ¶
func (this *ControlServer) HandleIndex(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandlePause ¶
func (this *ControlServer) HandlePause(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandleStop ¶
func (this *ControlServer) HandleStop(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandleUnpause ¶
func (this *ControlServer) HandleUnpause(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandleVerify ¶
func (this *ControlServer) HandleVerify(w http.ResponseWriter, r *http.Request)
func (*ControlServer) Initialize ¶
func (this *ControlServer) Initialize() (err error)
func (*ControlServer) Run ¶
func (this *ControlServer) Run(wg *sync.WaitGroup)
func (*ControlServer) ServeHTTP ¶
func (this *ControlServer) ServeHTTP(w http.ResponseWriter, r *http.Request)
func (*ControlServer) Shutdown ¶
func (this *ControlServer) Shutdown() error
type CopyFilter ¶
type CopyFilter interface {
    // BuildSelect is used to set up the query used for batch data copying,
    // allowing for restricting copying to a subset of data. Returning an error
    // here will cause the query to be retried, until the retry limit is
    // reached, at which point the ferry will be aborted. BuildSelect is passed
    // the columns to be selected, the table being copied, the last primary key
    // value from the previous batch, and the batch size. Call
    // DefaultBuildSelect to generate the default query, which may be used as
    // a starting point.
    BuildSelect([]string, *schema.Table, uint64, uint64) (sq.SelectBuilder, error)

    // ApplicableEvent is used to filter binlog events for rows that have been
    // filtered in BuildSelect. ApplicableEvent should return true if the
    // event is for a row that would be selected by BuildSelect, and false
    // otherwise.
    //
    // Returning an error here will cause the ferry to be aborted.
    ApplicableEvent(DMLEvent) (bool, error)
}
CopyFilter provides an interface for restricting the copying to a subset of data. This typically involves adding a WHERE condition in the BuildSelect function, and returning false for unwanted rows in ApplicableEvent.
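As a hedged sketch of implementing this interface, the filter below copies only rows whose primary key does not exceed a fixed value; DefaultBuildSelect (documented above) builds the base query, while the filter name, the primary-key column name, and the import paths for squirrel and the schema package are assumptions.

package example

import (
    sq "github.com/Masterminds/squirrel"    // import path assumed
    "github.com/Shopify/ghostferry"         // import path assumed
    "github.com/siddontang/go-mysql/schema" // import path assumed
)

// MaxPkCopyFilter is a hypothetical CopyFilter that restricts copying to
// rows with a primary key at or below MaxPk.
type MaxPkCopyFilter struct {
    PkColumn string // name of the primary key column; an assumption here
    MaxPk    uint64
}

// BuildSelect extends the default batch query with a WHERE clause so only
// the desired rows are selected during the data copy.
func (f *MaxPkCopyFilter) BuildSelect(columns []string, table *schema.Table, lastPk, batchSize uint64) (sq.SelectBuilder, error) {
    builder := ghostferry.DefaultBuildSelect(columns, table, lastPk, batchSize).
        Where(sq.LtOrEq{f.PkColumn: f.MaxPk})
    return builder, nil
}

// ApplicableEvent mirrors the same restriction for binlog events: events
// for rows outside the copied subset are skipped.
func (f *MaxPkCopyFilter) ApplicableEvent(ev ghostferry.DMLEvent) (bool, error) {
    pk, err := ev.PK()
    if err != nil {
        return false, err
    }
    return pk <= f.MaxPk, nil
}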
type CountMetric ¶
type CountMetric struct {
    MetricBase
    Value int64
}
type Cursor ¶
type Cursor struct {
    CursorConfig
    Table         *schema.Table
    MaxPrimaryKey uint64
    RowLock       bool
    // contains filtered or unexported fields
}
type CursorConfig ¶
type CursorConfig struct {
    DB              *sql.DB
    Throttler       Throttler
    ColumnsToSelect []string
    BuildSelect     func([]string, *schema.Table, uint64, uint64) (squirrel.SelectBuilder, error)
    BatchSize       uint64
    ReadRetries     int
}
func (*CursorConfig) NewCursor ¶
func (c *CursorConfig) NewCursor(table *schema.Table, maxPk uint64) *Cursor
Returns a new Cursor with an embedded copy of the CursorConfig.
func (*CursorConfig) NewCursorWithoutRowLock ¶
func (c *CursorConfig) NewCursorWithoutRowLock(table *schema.Table, maxPk uint64) *Cursor
Returns a new Cursor with an embedded copy of the CursorConfig.
type DMLEvent ¶
type DMLEvent interface {
    Database() string
    Table() string
    TableSchema() *schema.Table
    AsSQLString(target *schema.Table) (string, error)
    OldValues() RowData
    NewValues() RowData
    PK() (uint64, error)
}
func NewBinlogDMLEvents ¶
func NewBinlogDMLEvents(table *schema.Table, ev *replication.BinlogEvent) ([]DMLEvent, error)
func NewBinlogDeleteEvents ¶
func NewBinlogInsertEvents ¶
func NewBinlogUpdateEvents ¶
type DMLEventBase ¶
type DMLEventBase struct {
// contains filtered or unexported fields
}
The base of DMLEvent, providing the methods shared by all event types. It keeps its own copy of the table struct in case we want to deal with schema changes in the future.
func (*DMLEventBase) Database ¶
func (e *DMLEventBase) Database() string
func (*DMLEventBase) Table ¶
func (e *DMLEventBase) Table() string
func (*DMLEventBase) TableSchema ¶
func (e *DMLEventBase) TableSchema() *schema.Table
type DataIterator ¶
type DataIterator struct {
    DB           *sql.DB
    Tables       []*schema.Table
    Concurrency  int
    ErrorHandler ErrorHandler
    CursorConfig *CursorConfig
    CurrentState *DataIteratorState
    // contains filtered or unexported fields
}
func (*DataIterator) AddBatchListener ¶
func (d *DataIterator) AddBatchListener(listener func(*RowBatch) error)
func (*DataIterator) AddDoneListener ¶
func (d *DataIterator) AddDoneListener(listener func() error)
func (*DataIterator) Initialize ¶
func (d *DataIterator) Initialize() error
func (*DataIterator) Run ¶
func (d *DataIterator) Run()
type DataIteratorState ¶
type DataIteratorState struct {
// contains filtered or unexported fields
}
func (*DataIteratorState) CompletedTables ¶
func (this *DataIteratorState) CompletedTables() map[string]bool
func (*DataIteratorState) EstimatedPKProcessedPerSecond ¶
func (this *DataIteratorState) EstimatedPKProcessedPerSecond() float64
func (*DataIteratorState) LastSuccessfulPrimaryKeys ¶
func (this *DataIteratorState) LastSuccessfulPrimaryKeys() map[string]uint64
func (*DataIteratorState) MarkTableAsCompleted ¶
func (this *DataIteratorState) MarkTableAsCompleted(table string)
func (*DataIteratorState) TargetPrimaryKeys ¶
func (this *DataIteratorState) TargetPrimaryKeys() map[string]uint64
func (*DataIteratorState) UpdateLastSuccessfulPK ¶
func (this *DataIteratorState) UpdateLastSuccessfulPK(table string, pk uint64)
func (*DataIteratorState) UpdateTargetPK ¶
func (this *DataIteratorState) UpdateTargetPK(table string, pk uint64)
type DatabaseConfig ¶
type DatabaseConfig struct {
    Host      string
    Port      uint16
    User      string
    Pass      string
    Collation string
    Params    map[string]string
    TLS       *TLSConfig
}
func (*DatabaseConfig) MySQLConfig ¶
func (c *DatabaseConfig) MySQLConfig() (*mysql.Config, error)
func (*DatabaseConfig) Validate ¶
func (c *DatabaseConfig) Validate() error
type ErrorHandler ¶
type Ferry ¶
type Ferry struct {
    *Config

    SourceDB *sql.DB
    TargetDB *sql.DB

    BinlogStreamer *BinlogStreamer
    BinlogWriter   *BinlogWriter
    DataIterator   *DataIterator
    BatchWriter    *BatchWriter

    ErrorHandler ErrorHandler
    Throttler    Throttler

    Tables TableSchemaCache

    StartTime    time.Time
    DoneTime     time.Time
    OverallState string
    // contains filtered or unexported fields
}
func (*Ferry) FlushBinlogAndStopStreaming ¶
func (f *Ferry) FlushBinlogAndStopStreaming()
After you have stopped writing to the source and made sure that all in-flight transactions to the source have completed, call this method to ensure that the binlog streaming catches up and then stops.
This method does not actually shut down the BinlogStreamer immediately. You will know that the BinlogStreamer has finished when .Run() returns.
func (*Ferry) Initialize ¶
func (f *Ferry) Initialize() (err error)
Initializes all the components of Ghostferry and connects to the databases.
func (*Ferry) Run ¶
func (f *Ferry) Run()
Spawns the background tasks that actually perform the run and waits for them to finish.
func (*Ferry) RunStandaloneDataCopy ¶
func (f *Ferry) RunStandaloneDataCopy(tables []*schema.Table) error
func (*Ferry) Start ¶
func (f *Ferry) Start() error
Determines the binlog coordinates and table mapping for the pending Ghostferry run.
func (*Ferry) WaitUntilBinlogStreamerCatchesUp ¶
func (f *Ferry) WaitUntilBinlogStreamerCatchesUp()
func (*Ferry) WaitUntilRowCopyIsComplete ¶
func (f *Ferry) WaitUntilRowCopyIsComplete()
Call this method and perform the cutover after this method returns.
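Putting these methods together, here is a hedged sketch of the overall copy and cutover flow; the import path, the error handling, and the exact orchestration around stopping writes are assumptions drawn from the method descriptions above.

package example

import (
    "log"

    "github.com/Shopify/ghostferry" // import path assumed for this sketch
)

func runFerry(config *ghostferry.Config) {
    ferry := &ghostferry.Ferry{Config: config}

    if err := ferry.Initialize(); err != nil { // connect to the source and target databases
        log.Fatalf("initialize failed: %v", err)
    }
    if err := ferry.Start(); err != nil { // determine binlog coordinates and table mapping
        log.Fatalf("start failed: %v", err)
    }

    go func() {
        ferry.WaitUntilRowCopyIsComplete()
        ferry.WaitUntilBinlogStreamerCatchesUp()

        // Stop application writes to the source here and wait for all
        // in-flight transactions to complete, then:
        ferry.FlushBinlogAndStopStreaming()
    }()

    // Run spawns the copy and binlog-streaming tasks and waits for them to
    // finish; it returns once the streamer has been flushed and stopped.
    ferry.Run()

    // Perform the cutover (point the application at the target) here.
}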
type GaugeMetric ¶
type GaugeMetric struct {
    MetricBase
    Value float64
}
type IncompleteVerificationError ¶
type IncompleteVerificationError struct{}
func (IncompleteVerificationError) Error ¶
func (e IncompleteVerificationError) Error() string
type IterativeVerifier ¶
type IterativeVerifier struct {
    CursorConfig     *CursorConfig
    BinlogStreamer   *BinlogStreamer
    TableSchemaCache TableSchemaCache
    SourceDB         *sql.DB
    TargetDB         *sql.DB

    Tables           []*schema.Table
    IgnoredTables    []string
    DatabaseRewrites map[string]string
    TableRewrites    map[string]string
    Concurrency      int
    // contains filtered or unexported fields
}
func (*IterativeVerifier) GetHashes ¶
func (v *IterativeVerifier) GetHashes(db *sql.DB, schema, table, pkColumn string, columns []schema.TableColumn, pks []uint64) (map[uint64][]byte, error)
func (*IterativeVerifier) Initialize ¶
func (v *IterativeVerifier) Initialize() error
func (*IterativeVerifier) Result ¶
func (v *IterativeVerifier) Result() (VerificationResultAndStatus, error)
func (*IterativeVerifier) SanityCheckParameters ¶
func (v *IterativeVerifier) SanityCheckParameters() error
func (*IterativeVerifier) StartInBackground ¶
func (v *IterativeVerifier) StartInBackground() error
func (*IterativeVerifier) VerifyBeforeCutover ¶
func (v *IterativeVerifier) VerifyBeforeCutover() error
func (*IterativeVerifier) VerifyDuringCutover ¶
func (v *IterativeVerifier) VerifyDuringCutover() (VerificationResult, error)
func (*IterativeVerifier) VerifyOnce ¶
func (v *IterativeVerifier) VerifyOnce() (VerificationResult, error)
func (*IterativeVerifier) Wait ¶
func (v *IterativeVerifier) Wait()
type LagThrottler ¶
type LagThrottler struct {
    ThrottlerBase
    PauserThrottler

    DB *sql.DB
    // contains filtered or unexported fields
}
func NewLagThrottler ¶
func NewLagThrottler(config *LagThrottlerConfig) (*LagThrottler, error)
func (*LagThrottler) Throttled ¶
func (t *LagThrottler) Throttled() bool
type LagThrottlerConfig ¶
type LagThrottlerConfig struct {
    Connection     DatabaseConfig
    MaxLag         int
    Query          string
    UpdateInterval string
}
type MetricBase ¶
type Metrics ¶
type Metrics struct {
    Prefix      string
    DefaultTags []MetricTag
    Sink        chan interface{}
    // contains filtered or unexported fields
}
func SetGlobalMetrics ¶
func (*Metrics) AddConsumer ¶
func (m *Metrics) AddConsumer()
func (*Metrics) DoneConsumer ¶
func (m *Metrics) DoneConsumer()
func (*Metrics) StopAndFlush ¶
func (m *Metrics) StopAndFlush()
type PKPositionLog ¶
type PanicErrorHandler ¶
type PanicErrorHandler struct {
    Ferry *Ferry
    // contains filtered or unexported fields
}
func (*PanicErrorHandler) Fatal ¶
func (this *PanicErrorHandler) Fatal(from string, err error)
type PauserThrottler ¶
type PauserThrottler struct {
    ThrottlerBase
    // contains filtered or unexported fields
}
func (*PauserThrottler) SetPaused ¶
func (t *PauserThrottler) SetPaused(paused bool)
func (*PauserThrottler) Throttled ¶
func (t *PauserThrottler) Throttled() bool
type ReverifyBatch ¶
type ReverifyBatch struct {
    Pks   []uint64
    Table TableIdentifier
}
type ReverifyEntry ¶
type ReverifyStore ¶
type ReverifyStore struct {
    MapStore   map[TableIdentifier]map[uint64]struct{}
    BatchStore []ReverifyBatch

    RowCount           uint64
    EmitLogPerRowCount uint64
    // contains filtered or unexported fields
}
func NewReverifyStore ¶
func NewReverifyStore() *ReverifyStore
func (*ReverifyStore) Add ¶
func (r *ReverifyStore) Add(entry ReverifyEntry)
func (ReverifyStore) FlushAndBatchByTable ¶
func (r ReverifyStore) FlushAndBatchByTable(batchsize int) []ReverifyBatch
type RowBatch ¶
type RowBatch struct {
// contains filtered or unexported fields
}
func (*RowBatch) AsSQLQuery ¶
func (*RowBatch) TableSchema ¶
func (*RowBatch) ValuesContainPk ¶
type SqlDBWithFakeRollback ¶
func (*SqlDBWithFakeRollback) Rollback ¶
func (d *SqlDBWithFakeRollback) Rollback() error
type SqlPreparer ¶
both `sql.Tx` and `sql.DB` allow a SQL query to be `Prepare`d
type SqlPreparerAndRollbacker ¶
type SqlPreparerAndRollbacker interface {
    SqlPreparer
    Rollback() error
}
sql.DB does not implement Rollback, but SqlDBWithFakeRollback can be used to perform a no-op.
type Status ¶
type Status struct {
    GhostferryVersion string

    SourceHostPort string
    TargetHostPort string

    OverallState      string
    StartTime         time.Time
    CurrentTime       time.Time
    TimeTaken         time.Duration
    ETA               time.Duration
    BinlogStreamerLag time.Duration
    PKsPerSecond      uint64

    AutomaticCutover            bool
    BinlogStreamerStopRequested bool
    LastSuccessfulBinlogPos     mysql.Position
    TargetBinlogPos             mysql.Position

    Throttled bool

    CompletedTableCount int
    TotalTableCount     int
    TableStatuses       []*TableStatus
    AllTableNames       []string
    AllDatabaseNames    []string

    VerifierSupport     bool
    VerifierAvailable   bool
    VerificationStarted bool
    VerificationDone    bool
    VerificationResult  VerificationResult
    VerificationErr     error
}
func FetchStatus ¶
type TLSConfig ¶
type TableFilter ¶
type TableIdentifier ¶
A comparable and lightweight type that stores the schema and table name.
func NewTableIdentifierFromSchemaTable ¶
func NewTableIdentifierFromSchemaTable(table *schema.Table) TableIdentifier
type TableSchemaCache ¶
func LoadTables ¶
func LoadTables(db *sql.DB, tableFilter TableFilter) (TableSchemaCache, error)
func (TableSchemaCache) AllTableNames ¶
func (c TableSchemaCache) AllTableNames() (tableNames []string)
func (TableSchemaCache) AsSlice ¶
func (c TableSchemaCache) AsSlice() (tables []*schema.Table)
type TableStatus ¶
type ThrottlerBase ¶
type ThrottlerBase struct {
// contains filtered or unexported fields
}
func (*ThrottlerBase) Disabled ¶
func (t *ThrottlerBase) Disabled() bool
func (*ThrottlerBase) SetDisabled ¶
func (t *ThrottlerBase) SetDisabled(disabled bool)
type TimerMetric ¶
type TimerMetric struct {
    MetricBase
    Value time.Duration
}
type VerificationResult ¶
func (VerificationResult) Error ¶
func (e VerificationResult) Error() string
type VerificationResultAndStatus ¶
type VerificationResultAndStatus struct {
    VerificationResult

    StartTime time.Time
    DoneTime  time.Time
}
func (VerificationResultAndStatus) IsDone ¶
func (r VerificationResultAndStatus) IsDone() bool
func (VerificationResultAndStatus) IsStarted ¶
func (r VerificationResultAndStatus) IsStarted() bool
type Verifier ¶
type Verifier interface {
    // Start the verifier in the background during the cutover phase.
    // Traditionally, this is called from within the ControlServer.
    //
    // This method may be called multiple times and it is up to the verifier
    // to decide if it is possible to re-run the verification.
    StartInBackground() error

    // Wait for the verifier until it finishes verification after it was
    // started with StartInBackground.
    //
    // A verification is "done" when it has verified the databases (as either
    // correct or incorrect) OR when it experiences an error.
    Wait()

    // Returns the result and the status of the verification.
    // To check the status, call IsStarted() and IsDone() on
    // VerificationResultAndStatus.
    //
    // If the verification has been completed successfully (without errors) and
    // the data checks out to be "correct", the result will be
    // VerificationResult{true, ""}, with error = nil.
    // Otherwise, the result will be VerificationResult{false, "message"}, with
    // error = nil.
    //
    // If the verification is "done" but experienced an error during the check,
    // the result will be VerificationResult{}, with err = yourErr.
    Result() (VerificationResultAndStatus, error)
}
The sole purpose of this interface is to make it easier for one to implement their own strategy for verification and hook it up with the ControlServer. If there is no such need, one does not need to implement this interface.
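To show how the interface is consumed, here is a hedged sketch that drives any Verifier through the documented lifecycle of start, wait, and result inspection; the helper name and the import path are assumptions.

package example

import (
    "log"

    "github.com/Shopify/ghostferry" // import path assumed for this sketch
)

// waitForVerification is a hypothetical helper that runs a verifier to
// completion and reports the outcome.
func waitForVerification(v ghostferry.Verifier) {
    if err := v.StartInBackground(); err != nil {
        log.Fatalf("could not start verifier: %v", err)
    }

    // Wait blocks until the databases have been verified (correct or
    // incorrect) or the verifier experienced an error.
    v.Wait()

    resultAndStatus, err := v.Result()
    if err != nil {
        log.Fatalf("verification experienced an error: %v", err)
    }
    if resultAndStatus.IsDone() {
        log.Printf("verification result: %+v", resultAndStatus.VerificationResult)
    }
}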
type WorkerPool ¶
func (*WorkerPool) Run ¶
func (p *WorkerPool) Run(n int) ([]interface{}, error)
Returns a list of results whose length equals the concurrency number, along with the first error that occurred during the run. As soon as a single worker errors, all workers terminate.