Documentation ¶
Index ¶
- Constants
- Variables
- func CheckDbIsAReplica(db *sql.DB) (bool, error)
- func ConvertTableColumnsToStrings(columns []schema.TableColumn) []string
- func DefaultBuildSelect(columns []string, table *TableSchema, lastPaginationKey, batchSize uint64) squirrel.SelectBuilder
- func GetMd5HashesSql(schema, table, paginationKeyColumn string, columns []schema.TableColumn, ...) (string, []interface{}, error)
- func Int64Value(value interface{}) (int64, bool)
- func MaskedDSN(c *mysql.Config) string
- func NewMysqlPosition(file string, position uint32, err error) (mysql.Position, error)
- func NonExistingPaginationKeyColumnError(schema, table, paginationKey string) error
- func NonExistingPaginationKeyError(schema, table string) error
- func NonNumericPaginationKeyError(schema, table, paginationKey string) error
- func QuoteField(field string) string
- func QuoteFields(fields []string) (out []string)
- func QuotedTableName(table *TableSchema) string
- func QuotedTableNameFromString(database, table string) string
- func ScanByteRow(rows *sqlorig.Rows, columnCount int) ([][]byte, error)
- func ShowMasterStatusBinlogPosition(db *sql.DB) (mysql.Position, error)
- func TargetToSourceRewrites(databaseRewrites map[string]string) (map[string]string, error)
- func Uint64Value(value interface{}) (uint64, bool)
- func WaitForThrottle(t Throttler)
- func WithRetries(maxRetries int, sleep time.Duration, logger *logrus.Entry, verb string, ...) (err error)
- func WithRetriesContext(ctx context.Context, maxRetries int, sleep time.Duration, logger *logrus.Entry, ...) (err error)
- type AtomicBoolean
- type BatchWriter
- type BatchWriterVerificationFailed
- type BinlogDeleteEvent
- type BinlogEventState
- type BinlogInsertEvent
- type BinlogStreamer
- func (s *BinlogStreamer) AddBinlogEventHandler(evType replication.EventType, ...) error
- func (s *BinlogStreamer) AddEventListener(listener func([]DMLEvent) error)
- func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (mysql.Position, error)
- func (s *BinlogStreamer) ConnectBinlogStreamerToMysqlFrom(startFromBinlogPosition mysql.Position) (mysql.Position, error)
- func (s *BinlogStreamer) FlushAndStop()
- func (s *BinlogStreamer) GetLastStreamedBinlogPosition() mysql.Position
- func (s *BinlogStreamer) IsAlmostCaughtUp() bool
- func (s *BinlogStreamer) Run()
- type BinlogUpdateEvent
- type BinlogVerifyBatch
- type BinlogVerifySerializedStore
- type BinlogVerifyStore
- func (s *BinlogVerifyStore) Add(table *TableSchema, paginationKey uint64)
- func (s *BinlogVerifyStore) Batches(batchsize int) []BinlogVerifyBatch
- func (s *BinlogVerifyStore) CurrentEntriesCount() uint64
- func (s *BinlogVerifyStore) CurrentRowCount() uint64
- func (s *BinlogVerifyStore) RemoveVerifiedBatch(batch BinlogVerifyBatch)
- func (s *BinlogVerifyStore) Serialize() BinlogVerifySerializedStore
- type BinlogWriter
- type CascadingPaginationColumnConfig
- type ChecksumTableVerifier
- func (v *ChecksumTableVerifier) Message() string
- func (v *ChecksumTableVerifier) Result() (VerificationResultAndStatus, error)
- func (v *ChecksumTableVerifier) StartInBackground() error
- func (v *ChecksumTableVerifier) VerifyBeforeCutover() error
- func (v *ChecksumTableVerifier) VerifyDuringCutover() (VerificationResult, error)
- func (v *ChecksumTableVerifier) Wait()
- type ColumnCompressionConfig
- type ColumnIgnoreConfig
- type CompressionVerifier
- func (c *CompressionVerifier) Decompress(table, column, algorithm string, compressed []byte) ([]byte, error)
- func (c *CompressionVerifier) GetCompressedHashes(db *sql.DB, schema, table, paginationKeyColumn string, ...) (map[uint64][]byte, error)
- func (c *CompressionVerifier) HashRow(decompressedRowData [][]byte) ([]byte, error)
- func (c *CompressionVerifier) IsCompressedTable(table string) bool
- type Config
- type ControlServer
- func (this *ControlServer) HandleConfigGet(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandleConfigPost(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandleCutover(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandleIndex(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandlePause(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandleScript(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandleStatus(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandleStop(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandleUnpause(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) HandleVerify(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) Initialize() (err error)
- func (this *ControlServer) Run()
- func (this *ControlServer) ServeHTTP(w http.ResponseWriter, r *http.Request)
- func (this *ControlServer) Wait()
- type ControlServerConfig
- type ControlServerStatus
- type ControlServerTableStatus
- type CopyFilter
- type CountMetric
- type Cursor
- type CursorConfig
- func (c CursorConfig) GetBatchSize(schemaName string, tableName string) uint64
- func (c *CursorConfig) NewCursor(table *TableSchema, startPaginationKey, maxPaginationKey uint64) *Cursor
- func (c *CursorConfig) NewCursorWithoutRowLock(table *TableSchema, startPaginationKey, maxPaginationKey uint64) *Cursor
- type CustomScriptStatus
- type DMLEvent
- func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, ...) ([]DMLEvent, error)
- func NewBinlogDeleteEvents(eventBase *DMLEventBase, rowsEvent *replication.RowsEvent) ([]DMLEvent, error)
- func NewBinlogInsertEvents(eventBase *DMLEventBase, rowsEvent *replication.RowsEvent) ([]DMLEvent, error)
- func NewBinlogUpdateEvents(eventBase *DMLEventBase, rowsEvent *replication.RowsEvent) ([]DMLEvent, error)
- type DMLEventBase
- func (e *DMLEventBase) Annotation() (string, error)
- func (e *DMLEventBase) BinlogPosition() mysql.Position
- func (e *DMLEventBase) Database() string
- func (e *DMLEventBase) ResumableBinlogPosition() mysql.Position
- func (e *DMLEventBase) Table() string
- func (e *DMLEventBase) TableSchema() *TableSchema
- func (e *DMLEventBase) Timestamp() time.Time
- type DataIterationBatchSizePerTableOverride
- type DataIterator
- type DataIteratorSorter
- type DatabaseConfig
- type ErrorHandler
- type Ferry
- func (f *Ferry) EndCutover(cutoverStart time.Time)
- func (f *Ferry) FlushBinlogAndStopStreaming()
- func (f *Ferry) Initialize() (err error)
- func (f *Ferry) NewBatchWriter() *BatchWriter
- func (f *Ferry) NewBatchWriterWithoutStateTracker() *BatchWriter
- func (f *Ferry) NewBinlogWriter() *BinlogWriter
- func (f *Ferry) NewBinlogWriterWithoutStateTracker() *BinlogWriter
- func (f *Ferry) NewChecksumTableVerifier() *ChecksumTableVerifier
- func (f *Ferry) NewControlServer() (*ControlServer, error)
- func (f *Ferry) NewDataIterator() *DataIterator
- func (f *Ferry) NewDataIteratorWithoutStateTracker() *DataIterator
- func (f *Ferry) NewInlineVerifier() *InlineVerifier
- func (f *Ferry) NewInlineVerifierWithoutStateTracker() *InlineVerifier
- func (f *Ferry) NewIterativeVerifier() (*IterativeVerifier, error)
- func (f *Ferry) NewSourceBinlogStreamer() *BinlogStreamer
- func (f *Ferry) NewTargetBinlogStreamer() (*BinlogStreamer, error)
- func (f *Ferry) Progress() *Progress
- func (f *Ferry) ReportProgress()
- func (f *Ferry) ReportState()
- func (f *Ferry) Run()
- func (f *Ferry) RunStandaloneDataCopy(tables []*TableSchema) error
- func (f *Ferry) SerializeStateToJSON() (string, error)
- func (f *Ferry) Start() error
- func (f *Ferry) StartCutover() time.Time
- func (f *Ferry) StopTargetVerifier()
- func (f *Ferry) WaitUntilBinlogStreamerCatchesUp()
- func (f *Ferry) WaitUntilRowCopyIsComplete()
- type ForceIndexConfig
- type GaugeMetric
- type HTTPCallback
- type IncompleteVerificationError
- type InlineVerifier
- func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, targetTable string, sourceBatch *RowBatch, ...) ([]InlineVerifierMismatches, error)
- func (v *InlineVerifier) Message() string
- func (v *InlineVerifier) PeriodicallyVerifyBinlogEvents(ctx context.Context)
- func (v *InlineVerifier) Result() (VerificationResultAndStatus, error)
- func (v *InlineVerifier) StartInBackground() error
- func (v *InlineVerifier) VerifyBeforeCutover() error
- func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error)
- func (v *InlineVerifier) Wait()
- type InlineVerifierConfig
- type InlineVerifierMismatches
- type IterativeVerifier
- func (v *IterativeVerifier) GetHashes(db *sql.DB, schema, table, paginationKeyColumn string, ...) (map[uint64][]byte, error)
- func (v *IterativeVerifier) Initialize() error
- func (v *IterativeVerifier) Message() string
- func (v *IterativeVerifier) Result() (VerificationResultAndStatus, error)
- func (v *IterativeVerifier) SanityCheckParameters() error
- func (v *IterativeVerifier) StartInBackground() error
- func (v *IterativeVerifier) VerifyBeforeCutover() error
- func (v *IterativeVerifier) VerifyDuringCutover() (VerificationResult, error)
- func (v *IterativeVerifier) VerifyOnce() (VerificationResult, error)
- func (v *IterativeVerifier) Wait()
- type IterativeVerifierConfig
- type LagThrottler
- type LagThrottlerConfig
- type MaxPaginationKeySorter
- type MaxTableSizeSorter
- type MetricBase
- type MetricTag
- type Metrics
- func (m *Metrics) AddConsumer()
- func (m *Metrics) Count(key string, value int64, tags []MetricTag, sampleRate float64)
- func (m *Metrics) DoneConsumer()
- func (m *Metrics) Gauge(key string, value float64, tags []MetricTag, sampleRate float64)
- func (m *Metrics) Measure(key string, tags []MetricTag, sampleRate float64, f func())
- func (m *Metrics) StopAndFlush()
- func (m *Metrics) Timer(key string, duration time.Duration, tags []MetricTag, sampleRate float64)
- type PaginationKeyPositionLog
- type PanicErrorHandler
- type PauserThrottler
- type Progress
- type ReplicatedMasterPositionFetcher
- type ReplicatedMasterPositionViaCustomQuery
- type ReverifyBatch
- type ReverifyEntry
- type ReverifyStore
- type RowBatch
- func (e *RowBatch) AsSQLQuery(schemaName, tableName string) (string, []interface{}, error)
- func (e *RowBatch) EstimateByteSize() uint64
- func (e *RowBatch) Fingerprints() map[uint64][]byte
- func (e *RowBatch) PaginationKeyIndex() int
- func (e *RowBatch) Size() int
- func (e *RowBatch) TableSchema() *TableSchema
- func (e *RowBatch) Values() []RowData
- func (e *RowBatch) ValuesContainPaginationKey() bool
- type RowData
- type RowStats
- type SerializableState
- type SqlDBWithFakeRollback
- type SqlPreparer
- type SqlPreparerAndRollbacker
- type StateTracker
- func (s *StateTracker) EstimatedPaginationKeysPerSecond() float64
- func (s *StateTracker) IsTableComplete(table string) bool
- func (s *StateTracker) LastSuccessfulPaginationKey(table string) uint64
- func (s *StateTracker) MarkTableAsCompleted(table string)
- func (s *StateTracker) RowStatsWrittenPerTable() map[string]RowStats
- func (s *StateTracker) Serialize(lastKnownTableSchemaCache TableSchemaCache, ...) *SerializableState
- func (s *StateTracker) UpdateLastResumableBinlogPositionForTargetVerifier(pos mysql.Position)
- func (s *StateTracker) UpdateLastResumableSourceBinlogPosition(pos mysql.Position)
- func (s *StateTracker) UpdateLastResumableSourceBinlogPositionForInlineVerifier(pos mysql.Position)
- func (s *StateTracker) UpdateLastSuccessfulPaginationKey(table string, paginationKey uint64, rowStats RowStats)
- type StmtCache
- type TLSConfig
- type TableColumnCompressionConfig
- type TableFilter
- type TableIdentifier
- type TableMaxPaginationKey
- type TableProgress
- type TableSchema
- type TableSchemaCache
- type TargetVerifier
- type Throttler
- type ThrottlerBase
- type TimerMetric
- type UnsupportedCompressionError
- type UpdatableConfig
- type VerificationResult
- type VerificationResultAndStatus
- type Verifier
- type WaitUntilReplicaIsCaughtUpToMaster
- type WorkerPool
Constants ¶
const (
	VerifierTypeChecksumTable  = "ChecksumTable"
	VerifierTypeIterative      = "Iterative"
	VerifierTypeInline         = "Inline"
	VerifierTypeNoVerification = "NoVerification"
	DefaultNet                 = "tcp"
	DefaultMarginalia          = "application:ghostferry"
	MySQLNumParamsLimit        = 1<<16 - 1 // see num_params https://dev.mysql.com/doc/internals/en/com-stmt-prepare-response.html
)
const (
	StateStarting            = "starting"
	StateCopying             = "copying"
	StateWaitingForCutover   = "wait-for-cutover"
	StateVerifyBeforeCutover = "verify-before-cutover"
	StateCutover             = "cutover"
	StateDone                = "done"
)
const (
	MismatchColumnMissingOnSource mismatchType = "column missing on source"
	MismatchColumnMissingOnTarget mismatchType = "column missing on target"
	MismatchRowMissingOnSource    mismatchType = "row missing on source"
	MismatchRowMissingOnTarget    mismatchType = "row missing on target"
	MismatchColumnValueDifference mismatchType = "column value difference"
	MismatchRowChecksumDifference mismatchType = "rows checksum difference"
)
const (
	TableActionWaiting   = "waiting"
	TableActionCopying   = "copying"
	TableActionCompleted = "completed"
)
const (
// CompressionSnappy is used to identify Snappy (https://google.github.io/snappy/) compressed column data
CompressionSnappy = "SNAPPY"
)
Variables ¶
var (
	VersionString string = "?.?.?+??????????????+???????"
	WebUiBasedir  string = ""
)
Functions ¶
func ConvertTableColumnsToStrings ¶
func ConvertTableColumnsToStrings(columns []schema.TableColumn) []string
func DefaultBuildSelect ¶
func DefaultBuildSelect(columns []string, table *TableSchema, lastPaginationKey, batchSize uint64) squirrel.SelectBuilder
func GetMd5HashesSql ¶
func Int64Value ¶
func NewMysqlPosition ¶
func NonExistingPaginationKeyColumnError ¶
NonExistingPaginationKeyColumnError exported to facilitate black box testing
func NonExistingPaginationKeyError ¶
NonExistingPaginationKeyError exported to facilitate black box testing
func NonNumericPaginationKeyError ¶
NonNumericPaginationKeyError exported to facilitate black box testing
func QuoteField ¶
func QuoteFields ¶
func QuotedTableName ¶
func QuotedTableName(table *TableSchema) string
func TargetToSourceRewrites ¶
func Uint64Value ¶
func WaitForThrottle ¶
func WaitForThrottle(t Throttler)
func WithRetries ¶
Types ¶
type AtomicBoolean ¶
type AtomicBoolean int32
func (*AtomicBoolean) Get ¶
func (a *AtomicBoolean) Get() bool
func (*AtomicBoolean) Set ¶
func (a *AtomicBoolean) Set(b bool)
type BatchWriter ¶
type BatchWriter struct { DB *sql.DB InlineVerifier *InlineVerifier StateTracker *StateTracker EnforceInlineVerification bool // Only needed when running the BatchWriter during cutover DatabaseRewrites map[string]string TableRewrites map[string]string WriteRetries int // contains filtered or unexported fields }
func (*BatchWriter) Initialize ¶
func (w *BatchWriter) Initialize()
func (*BatchWriter) WriteRowBatch ¶
func (w *BatchWriter) WriteRowBatch(batch *RowBatch) error
type BatchWriterVerificationFailed ¶
type BatchWriterVerificationFailed struct {
// contains filtered or unexported fields
}
func (BatchWriterVerificationFailed) Error ¶
func (e BatchWriterVerificationFailed) Error() string
type BinlogDeleteEvent ¶
type BinlogDeleteEvent struct { *DMLEventBase // contains filtered or unexported fields }
func (*BinlogDeleteEvent) AsSQLString ¶
func (e *BinlogDeleteEvent) AsSQLString(schemaName, tableName string) (string, error)
func (*BinlogDeleteEvent) NewValues ¶
func (e *BinlogDeleteEvent) NewValues() RowData
func (*BinlogDeleteEvent) OldValues ¶
func (e *BinlogDeleteEvent) OldValues() RowData
func (*BinlogDeleteEvent) PaginationKey ¶
func (e *BinlogDeleteEvent) PaginationKey() (uint64, error)
type BinlogEventState ¶
type BinlogEventState struct {
// contains filtered or unexported fields
}
BinlogEventState is passed into event handlers to keep track of the state of the binlog event stream.
type BinlogInsertEvent ¶
type BinlogInsertEvent struct { *DMLEventBase // contains filtered or unexported fields }
func (*BinlogInsertEvent) AsSQLString ¶
func (e *BinlogInsertEvent) AsSQLString(schemaName, tableName string) (string, error)
func (*BinlogInsertEvent) NewValues ¶
func (e *BinlogInsertEvent) NewValues() RowData
func (*BinlogInsertEvent) OldValues ¶
func (e *BinlogInsertEvent) OldValues() RowData
func (*BinlogInsertEvent) PaginationKey ¶
func (e *BinlogInsertEvent) PaginationKey() (uint64, error)
type BinlogStreamer ¶
type BinlogStreamer struct { DB *sql.DB DBConfig *DatabaseConfig MyServerId uint32 ErrorHandler ErrorHandler Filter CopyFilter TableSchema TableSchemaCache LogTag string // These rewrite structures are used specifically for the Target // Verifier as it needs to map events streamed from the Target back // to the TableSchemaCache of the Source // // See https://github.com/Shopify/ghostferry/pull/258 for details DatabaseRewrites map[string]string TableRewrites map[string]string // contains filtered or unexported fields }
func (*BinlogStreamer) AddBinlogEventHandler ¶
func (s *BinlogStreamer) AddBinlogEventHandler(evType replication.EventType, eh func(*replication.BinlogEvent, []byte, *BinlogEventState) ([]byte, error)) error
AddBinlogEventHandler attaches an event handler to a replication BinlogEvent. Handlers can only be attached to the event types defined in https://github.com/go-mysql-org/go-mysql/blob/master/replication/const.go. Custom event handlers are provided with the replication BinlogEvent and a state object that carries the current state of the binlog event stream.
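For example, a hypothetical handler that logs every GTID event could be attached as follows (a minimal sketch assuming the ghostferry and go-mysql replication packages are imported; the logging body is illustrative only):

	err := streamer.AddBinlogEventHandler(replication.GTID_EVENT, func(ev *replication.BinlogEvent, query []byte, state *ghostferry.BinlogEventState) ([]byte, error) {
		// The handler receives the raw replication event plus the current
		// state of the binlog event stream, and must return the (possibly
		// unchanged) query bytes.
		log.Printf("saw GTID event at log position %d", ev.Header.LogPos)
		return query, nil
	})
	if err != nil {
		// Returned when the event type is not supported for custom handlers.
		log.Fatal(err)
	}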
func (*BinlogStreamer) AddEventListener ¶
func (s *BinlogStreamer) AddEventListener(listener func([]DMLEvent) error)
func (*BinlogStreamer) ConnectBinlogStreamerToMysql ¶
func (s *BinlogStreamer) ConnectBinlogStreamerToMysql() (mysql.Position, error)
func (*BinlogStreamer) ConnectBinlogStreamerToMysqlFrom ¶
func (*BinlogStreamer) FlushAndStop ¶
func (s *BinlogStreamer) FlushAndStop()
func (*BinlogStreamer) GetLastStreamedBinlogPosition ¶
func (s *BinlogStreamer) GetLastStreamedBinlogPosition() mysql.Position
func (*BinlogStreamer) IsAlmostCaughtUp ¶
func (s *BinlogStreamer) IsAlmostCaughtUp() bool
func (*BinlogStreamer) Run ¶
func (s *BinlogStreamer) Run()
type BinlogUpdateEvent ¶
type BinlogUpdateEvent struct { *DMLEventBase // contains filtered or unexported fields }
func (*BinlogUpdateEvent) AsSQLString ¶
func (e *BinlogUpdateEvent) AsSQLString(schemaName, tableName string) (string, error)
func (*BinlogUpdateEvent) NewValues ¶
func (e *BinlogUpdateEvent) NewValues() RowData
func (*BinlogUpdateEvent) OldValues ¶
func (e *BinlogUpdateEvent) OldValues() RowData
func (*BinlogUpdateEvent) PaginationKey ¶
func (e *BinlogUpdateEvent) PaginationKey() (uint64, error)
type BinlogVerifyBatch ¶
type BinlogVerifySerializedStore ¶
func (BinlogVerifySerializedStore) Copy ¶
func (s BinlogVerifySerializedStore) Copy() BinlogVerifySerializedStore
func (BinlogVerifySerializedStore) EntriesCount ¶
func (s BinlogVerifySerializedStore) EntriesCount() uint64
func (BinlogVerifySerializedStore) RowCount ¶
func (s BinlogVerifySerializedStore) RowCount() uint64
type BinlogVerifyStore ¶
type BinlogVerifyStore struct { EmitLogPerRowsAdded uint64 // contains filtered or unexported fields }
This struct is very similar to ReverifyStore, but it is more optimized for serialization into JSON.
TODO: remove IterativeVerifier and remove this comment.
func NewBinlogVerifyStore ¶
func NewBinlogVerifyStore() *BinlogVerifyStore
func NewBinlogVerifyStoreFromSerialized ¶
func NewBinlogVerifyStoreFromSerialized(serialized BinlogVerifySerializedStore) *BinlogVerifyStore
func (*BinlogVerifyStore) Add ¶
func (s *BinlogVerifyStore) Add(table *TableSchema, paginationKey uint64)
func (*BinlogVerifyStore) Batches ¶
func (s *BinlogVerifyStore) Batches(batchsize int) []BinlogVerifyBatch
func (*BinlogVerifyStore) CurrentEntriesCount ¶
func (s *BinlogVerifyStore) CurrentEntriesCount() uint64
func (*BinlogVerifyStore) CurrentRowCount ¶
func (s *BinlogVerifyStore) CurrentRowCount() uint64
func (*BinlogVerifyStore) RemoveVerifiedBatch ¶
func (s *BinlogVerifyStore) RemoveVerifiedBatch(batch BinlogVerifyBatch)
func (*BinlogVerifyStore) Serialize ¶
func (s *BinlogVerifyStore) Serialize() BinlogVerifySerializedStore
type BinlogWriter ¶
type BinlogWriter struct { DB *sql.DB DatabaseRewrites map[string]string TableRewrites map[string]string Throttler Throttler BatchSize int WriteRetries int ErrorHandler ErrorHandler StateTracker *StateTracker // contains filtered or unexported fields }
func (*BinlogWriter) BufferBinlogEvents ¶
func (b *BinlogWriter) BufferBinlogEvents(events []DMLEvent) error
func (*BinlogWriter) Run ¶
func (b *BinlogWriter) Run()
func (*BinlogWriter) Stop ¶
func (b *BinlogWriter) Stop()
type CascadingPaginationColumnConfig ¶
type CascadingPaginationColumnConfig struct {
	// PerTable has greatest specificity and takes precedence over the other options
	PerTable map[string]map[string]string // SchemaName => TableName => ColumnName

	// FallbackColumn is a global default to fallback to and is less specific than the
	// default, which is the Primary Key
	FallbackColumn string
}
CascadingPaginationColumnConfig configures the pagination columns to be used. The term `Cascading` denotes that greater specificity takes precedence.
func (*CascadingPaginationColumnConfig) FallbackPaginationColumnName ¶
func (c *CascadingPaginationColumnConfig) FallbackPaginationColumnName() (string, bool)
FallbackPaginationColumnName retrieves the column name specified as a fallback when the Primary Key isn't suitable for pagination.
func (*CascadingPaginationColumnConfig) PaginationColumnFor ¶
func (c *CascadingPaginationColumnConfig) PaginationColumnFor(schemaName, tableName string) (string, bool)
PaginationColumnFor is a helper function to retrieve the column name to paginate by
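As an illustration (the schema, table, and column names here are hypothetical), a configuration that paginates blog.users by account_id and falls back to external_id for other tables might look like:

	paginationConfig := &ghostferry.CascadingPaginationColumnConfig{
		PerTable: map[string]map[string]string{
			"blog": {
				"users": "account_id", // SchemaName => TableName => ColumnName
			},
		},
		FallbackColumn: "external_id",
	}

	// Look up the pagination column for blog.users; the PerTable entry above
	// should take precedence over the fallback.
	column, found := paginationConfig.PaginationColumnFor("blog", "users")
	_ = column
	_ = found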
type ChecksumTableVerifier ¶
type ChecksumTableVerifier struct { Tables []*TableSchema DatabaseRewrites map[string]string TableRewrites map[string]string SourceDB *sql.DB TargetDB *sql.DB // contains filtered or unexported fields }
func (*ChecksumTableVerifier) Message ¶
func (v *ChecksumTableVerifier) Message() string
func (*ChecksumTableVerifier) Result ¶
func (v *ChecksumTableVerifier) Result() (VerificationResultAndStatus, error)
func (*ChecksumTableVerifier) StartInBackground ¶
func (v *ChecksumTableVerifier) StartInBackground() error
func (*ChecksumTableVerifier) VerifyBeforeCutover ¶
func (v *ChecksumTableVerifier) VerifyBeforeCutover() error
func (*ChecksumTableVerifier) VerifyDuringCutover ¶
func (v *ChecksumTableVerifier) VerifyDuringCutover() (VerificationResult, error)
func (*ChecksumTableVerifier) Wait ¶
func (v *ChecksumTableVerifier) Wait()
type ColumnCompressionConfig ¶
ColumnCompressionConfig maps SchemaName => TableName => ColumnName => CompressionAlgorithm.
Example: blog1 => articles => body => snappy
(SELECT body FROM blog1.articles returns a compressed blob)
func (ColumnCompressionConfig) CompressedColumnsFor ¶
func (c ColumnCompressionConfig) CompressedColumnsFor(schemaName, tableName string) map[string]string
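A sketch of how the compression config might be populated, assuming ColumnCompressionConfig is the nested map implied by the description above (its exact underlying type is not shown here) and the schema/table/column names are hypothetical:

	compression := ghostferry.ColumnCompressionConfig{
		"blog1": {
			"articles": {
				"body": ghostferry.CompressionSnappy, // SchemaName => TableName => ColumnName => algorithm
			},
		},
	}

	// For blog1.articles this should return a map containing {"body": "SNAPPY"}.
	compressedColumns := compression.CompressedColumnsFor("blog1", "articles")
	_ = compressedColumns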
type ColumnIgnoreConfig ¶
ColumnIgnoreConfig maps SchemaName => TableName => ColumnName => struct{}{}. These columns will be ignored during inline verification.
func (ColumnIgnoreConfig) IgnoredColumnsFor ¶
func (c ColumnIgnoreConfig) IgnoredColumnsFor(schemaName, tableName string) map[string]struct{}
type CompressionVerifier ¶
type CompressionVerifier struct {
// contains filtered or unexported fields
}
CompressionVerifier provides support for verifying the payload of compressed columns that may have different hashes for the same data, by decompressing the data before fingerprinting it.
func NewCompressionVerifier ¶
func NewCompressionVerifier(tableColumnCompressions TableColumnCompressionConfig) (*CompressionVerifier, error)
NewCompressionVerifier first checks the map for supported compression algorithms before initializing and returning the initialized instance.
func (*CompressionVerifier) Decompress ¶
func (c *CompressionVerifier) Decompress(table, column, algorithm string, compressed []byte) ([]byte, error)
Decompress will apply the configured decompression algorithm to the configured columns' data.
func (*CompressionVerifier) GetCompressedHashes ¶
func (c *CompressionVerifier) GetCompressedHashes(db *sql.DB, schema, table, paginationKeyColumn string, columns []schema.TableColumn, paginationKeys []uint64) (map[uint64][]byte, error)
GetCompressedHashes compares the source data with the target data to ensure the integrity of the data being copied.
The GetCompressedHashes method checks if the existing table contains compressed data and will apply the decompression algorithm to the applicable columns if necessary. After the columns are decompressed, the hashes of the data are used to verify equality
func (*CompressionVerifier) HashRow ¶
func (c *CompressionVerifier) HashRow(decompressedRowData [][]byte) ([]byte, error)
HashRow will fingerprint the non-primary columns of the row to verify data equality
func (*CompressionVerifier) IsCompressedTable ¶
func (c *CompressionVerifier) IsCompressedTable(table string) bool
IsCompressedTable will identify whether or not a table is compressed
type Config ¶
type Config struct {
	// Source database connection configuration
	//
	// Required
	Source *DatabaseConfig

	// Target database connection configuration
	//
	// Required
	Target *DatabaseConfig

	// Map database name on the source database (key of the map) to a
	// different name on the target database (value of the associated key).
	// This allows one to move data and change the database name in the
	// process.
	//
	// Optional: defaults to empty map/no rewrites
	DatabaseRewrites map[string]string

	// Map the table name on the source database to a different name on
	// the target database. See DatabaseRewrites.
	//
	// Optional: defaults to empty map/no rewrites
	TableRewrites map[string]string

	// The maximum number of retries for writes if the writes failed on
	// the target database.
	//
	// Optional: defaults to 5.
	DBWriteRetries int

	// Filter out the databases/tables when detecting the source databases
	// and tables.
	//
	// Required
	TableFilter TableFilter

	// Filter out unwanted data/events from being copied.
	//
	// Optional: defaults to nil/no filter.
	CopyFilter CopyFilter

	// The server id used by Ghostferry to connect to MySQL as a replication
	// slave. This id must be unique on the MySQL server. If 0 is specified,
	// a random id will be generated upon connecting to the MySQL server.
	//
	// Optional: defaults to an automatically generated one
	MyServerId uint32

	// The maximum number of binlog events to write at once. Note this is a
	// maximum: if there are not a lot of binlog events, they will be written
	// one at a time so that the binlog streamer lag is as low as possible. This
	// batch size will only be hit if there are a lot of binlog events at the same time.
	//
	// Optional: defaults to 100
	BinlogEventBatchSize int

	// This optional config uses different data points to calculate
	// batch size per table using linear interpolation
	DataIterationBatchSizePerTableOverride *DataIterationBatchSizePerTableOverride

	// The maximum number of retries for reads if the reads fail on the source
	// database.
	//
	// Optional: defaults to 5
	DBReadRetries int

	// This specifies the number of concurrent goroutines, each iterating over
	// a single table.
	//
	// At this point in time, iteration within a single table is not
	// parallelized. This may be possible to add in the future.
	//
	// Optional: defaults to 4
	DataIterationConcurrency int

	// This specifies if Ghostferry will pause before cutover or not.
	//
	// Optional: defaults to false
	AutomaticCutover bool

	// This specifies whether or not Ferry.Run will handle SIGINT and SIGTERM
	// by dumping the current state to stdout and the error HTTP callback.
	// The dumped state can be used to resume Ghostferry.
	DumpStateOnSignal bool

	// This specifies whether or not Ghostferry will dump the current state to stdout
	// before exiting due to an error.
	//
	// Optional: defaults to false
	DumpStateToStdoutOnError bool

	// This excludes the schema cache from the state dump in both the HTTP callback
	// and the stdout dumping. This may save a lot of space if you don't need
	// to deal with schema migrations.
	DoNotIncludeSchemaCacheInStateDump bool

	// Config for the ControlServer
	ControlServerConfig *ControlServerConfig

	// Report progress via an HTTP callback. The Payload field of the callback
	// will be sent to the server as the CustomPayload field in the Progress
	// struct. The unit of ProgressReportFrequency is in milliseconds.
	ProgressCallback        HTTPCallback
	ProgressReportFrequency int

	// Report state via an HTTP callback. The SerializedState struct will be
	// sent as the Payload parameter. The unit of StateReportFrequency is
	// in milliseconds.
	StateCallback        HTTPCallback
	StateReportFrequency int

	// Report error via an HTTP callback. The Payload field will contain the ErrorType,
	// ErrorMessage and the StateDump.
	ErrorCallback HTTPCallback

	// Report when ghostferry is entering cutover
	CutoverLock HTTPCallback

	// Report when ghostferry is finished cutover
	CutoverUnlock HTTPCallback

	// If the callback returns a non-OK status, these two values configure the
	// number of times Ferry should attempt to retry acquiring the cutover lock,
	// and for how long the Ferry should wait before attempting another lock
	// acquisition.
	// MaxCutoverRetries default is 1 retry
	// CutoverRetryWaitSeconds default is 1 second
	MaxCutoverRetries       int
	CutoverRetryWaitSeconds int

	// The state to resume from as dumped by the PanicErrorHandler.
	// If this is null, a new Ghostferry run will be started. Otherwise, the
	// reconciliation process will start and Ghostferry will resume after that.
	StateToResumeFrom *SerializableState

	// The verifier to use during the run. Valid choices are:
	// ChecksumTable
	// Iterative
	// NoVerification
	//
	// If it is left blank, the Verifier member variable on the Ferry will be
	// used. If that member variable is nil, no verification will be done.
	VerifierType string

	// Only useful if VerifierType == Iterative.
	// This specifies the configuration of the IterativeVerifier.
	//
	// This option is in the process of being deprecated.
	IterativeVerifierConfig IterativeVerifierConfig

	// Only useful if VerifierType == Inline.
	// This specifies the configuration of the InlineVerifier.
	InlineVerifierConfig InlineVerifierConfig

	// For old versions (MySQL < 5.6.2, MariaDB < 10.1.6) which have no related variable.
	// Make sure you have binlog_row_image=FULL when turning this on.
	SkipBinlogRowImageCheck bool

	// This config is necessary for inline verification for a special case of
	// Ghostferry:
	//
	// - If you are copying a table where the data is already partially on the
	//   target through some other means.
	// - Specifically, the PaginationKey of this row on both the source and the target are
	//   the same. Thus, INSERT IGNORE will skip copying this row, leaving the
	//   data on the target unchanged.
	// - If the data on the target is already identical to the source, then
	//   verification will pass and all is well.
	// - However, if this data is compressed with a non-deterministic algorithm
	//   such as snappy, the compressed blob may not be equal even when the
	//   uncompressed data is equal.
	// - This column signals to the InlineVerifier that it needs to decompress
	//   the data to compare identity.
	//
	// Note: a similar option exists in IterativeVerifier. However, the
	// IterativeVerifier is being deprecated and this will be the correct place
	// to specify it if you don't need the IterativeVerifier.
	CompressedColumnsForVerification ColumnCompressionConfig

	// This config is also for inline verification for the same special case of
	// Ghostferry as documented with the CompressedColumnsForVerification option:
	//
	// - If you're copying a table where the data is partially already on
	//   the target through some other means.
	// - A difference in a particular column could be acceptable.
	// - An example would be a table with a data field and a created_at field.
	//   Maybe the created_at field is not important for data integrity as long
	//   as the data field is correct.
	// - Putting the column in this config will cause the InlineVerifier to skip
	//   this column for verification.
	IgnoredColumnsForVerification ColumnIgnoreConfig

	// Map an index to a table; this will add `FORCE INDEX (index_name)` to the fingerprint SELECT query.
	// Index hinting might be necessary if you are running into slow queries during copy on your target.
	//
	// Example:
	//
	// "ForceIndexForVerification": {
	//   "blog": {
	//     "users": "ix_users_some_id"
	//   }
	// }
	//
	ForceIndexForVerification ForceIndexConfig

	// Ghostferry requires a single numeric column to paginate over tables. Inferring that column is done in the following exact order:
	// 1. Use the PerTable pagination column, if configured for a table. Fail if we cannot find this column in the table.
	// 2. Use the table's primary key column as the pagination column. Fail if the primary key is not numeric or is a composite key without a FallbackColumn specified.
	// 3. Use the FallbackColumn pagination column, if configured. Fail if we cannot find this column in the table.
	CascadingPaginationColumnConfig *CascadingPaginationColumnConfig

	// SkipTargetVerification is used to enable or disable target verification during moves.
	// This feature is currently only available while using the InlineVerifier.
	//
	// This works by inspecting the annotations (configured as Marginalia in the DatabaseConfig above)
	// and will fail the move unless all applicable DMLs (as identified by the sharding key) sent to the
	// Target were sent from Ghostferry.
	//
	// NOTE:
	// The Target database must be configured with binlog_rows_query_log_events
	// set to "ON" for this to function properly. Ghostferry will not allow the move
	// process to begin if this is enabled and the above option is set to "OFF".
	//
	// Required: defaults to false
	SkipTargetVerification bool

	// During initialization, Ghostferry will raise an error if any
	// foreign key constraints are detected in the source database.
	//
	// This check can be bypassed by setting this value to true.
	//
	// WARNING:
	// Using Ghostferry with foreign keys is highly discouraged and
	// disabling this check makes no guarantees of the success of the run.
	//
	// Required: defaults to false
	SkipForeignKeyConstraintsCheck bool

	// EnableRowBatchSize is used to enable or disable the calculation of the number of bytes written for each row batch.
	//
	// Optional: defaults to false.
	//
	// NOTE:
	// Turning off the EnableRowBatchSize flag would show the NumBytes written per RowBatch as zero
	// in the Progress. This behaviour is perfectly okay and doesn't mean there are no rows being written
	// to the target DB.
	EnableRowBatchSize bool

	// If the target DB is set to read_only, Ghostferry will throw an error during the initialization step.
	// The AllowSuperUserOnReadOnly flag allows running Ghostferry even if the target DB is read_only. This is helpful in
	// scenarios where the target DB needs to be restricted from writes made by any user other than the Ghostferry user.
	//
	// Optional: defaults to false.
	//
	// NOTE:
	// The Ghostferry target user should have SUPER permissions to actually write to the target DB,
	// if Ghostferry is run with AllowSuperUserOnReadOnly = true and the target DB is set to read_only.
	AllowSuperUserOnReadOnly bool

	// If true, net/http/pprof will be enabled on port 6060.
	EnablePProf bool

	UpdatableConfig UpdatableConfig

	// ----------------------------------------------------------------------------------------------------------------
	// DEPRECATED CONFIGS
	// The following configs are deprecated
	DataIterationBatchSize     uint64              // replaced by UpdatableConfig.DataIterationBatchSize
	ServerBindAddr             string              // replaced by ControlServerConfig.ServerBindAddr
	WebBasedir                 string              // replaced by ControlServerConfig.WebBasedir
	ControlServerCustomScripts map[string][]string // replaced by ControlServerConfig.CustomScripts
}
func (*Config) Update ¶
func (c *Config) Update(updatedConfig UpdatableConfig)
func (*Config) ValidateConfig ¶
type ControlServer ¶
type ControlServer struct { Config *ControlServerConfig F *Ferry Verifier Verifier // contains filtered or unexported fields }
func (*ControlServer) HandleConfigGet ¶
func (this *ControlServer) HandleConfigGet(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandleConfigPost ¶
func (this *ControlServer) HandleConfigPost(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandleCutover ¶
func (this *ControlServer) HandleCutover(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandleIndex ¶
func (this *ControlServer) HandleIndex(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandlePause ¶
func (this *ControlServer) HandlePause(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandleScript ¶
func (this *ControlServer) HandleScript(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandleStatus ¶
func (this *ControlServer) HandleStatus(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandleStop ¶
func (this *ControlServer) HandleStop(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandleUnpause ¶
func (this *ControlServer) HandleUnpause(w http.ResponseWriter, r *http.Request)
func (*ControlServer) HandleVerify ¶
func (this *ControlServer) HandleVerify(w http.ResponseWriter, r *http.Request)
func (*ControlServer) Initialize ¶
func (this *ControlServer) Initialize() (err error)
func (*ControlServer) Run ¶
func (this *ControlServer) Run()
func (*ControlServer) ServeHTTP ¶
func (this *ControlServer) ServeHTTP(w http.ResponseWriter, r *http.Request)
func (*ControlServer) Wait ¶
func (this *ControlServer) Wait()
type ControlServerConfig ¶
type ControlServerConfig struct {
	// Enable/disable the HTTP control server
	Enabled bool

	// Bind control server address
	ServerBindAddr string

	// Path to `web` base dir
	WebBasedir string

	// TODO: refactor control server config out of the base ferry at some point
	// This adds optional buttons in the web ui that run a script located at the
	// path specified.
	// The format is "script name" => ["path to script", "arg1", "arg2"]. The script name
	// will be displayed on the web ui.
	CustomScripts map[string][]string
}
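A sketch of a control server configuration; the bind address, web directory, and script path are hypothetical:

	controlServerConfig := &ghostferry.ControlServerConfig{
		Enabled:        true,
		ServerBindAddr: "0.0.0.0:8000",
		WebBasedir:     "/opt/ghostferry/web",
		CustomScripts: map[string][]string{
			// "script name" => ["path to script", "arg1", "arg2"]
			"Flush caches": {"/usr/local/bin/flush-caches.sh", "--verbose"},
		},
	}

	if err := controlServerConfig.Validate(); err != nil {
		log.Fatal(err)
	}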
func (*ControlServerConfig) Validate ¶
func (c *ControlServerConfig) Validate() error
type ControlServerStatus ¶
type ControlServerStatus struct { Progress GhostferryVersion string SourceHostPort string TargetHostPort string OverallState string StartTime time.Time CurrentTime time.Time TimeTaken time.Duration ETA time.Duration BinlogStreamerLag time.Duration AutomaticCutover bool BinlogStreamerStopRequested bool CompletedTableCount int TotalTableCount int TableStatuses []*ControlServerTableStatus AllTableNames []string AllDatabaseNames []string VerifierSupport bool VerifierAvailable bool VerificationStarted bool VerificationDone bool VerificationResult VerificationResult VerificationErr error // TODO: this is populated by the control server. Clearly this all needs a refactor. CustomScriptStatuses map[string]CustomScriptStatus }
type CopyFilter ¶
type CopyFilter interface {
	// BuildSelect is used to set up the query used for batch data copying,
	// allowing for restricting copying to a subset of data. Returning an error
	// here will cause the query to be retried, until the retry limit is
	// reached, at which point the ferry will be aborted. BuildSelect is passed
	// the columns to be selected, the table being copied, the last primary key value
	// from the previous batch, and the batch size. Call DefaultBuildSelect to
	// generate the default query, which may be used as a starting point.
	BuildSelect([]string, *TableSchema, uint64, uint64) (sq.SelectBuilder, error)

	// ApplicableEvent is used to filter events for rows that have been
	// filtered in BuildSelect. ApplicableEvent should return true if the
	// event is for a row that would be selected by BuildSelect, and false
	// otherwise.
	// Returning an error here will cause the ferry to be aborted.
	ApplicableEvent(DMLEvent) (bool, error)
}
CopyFilter provides an interface for restricting the copying to a subset of data. This typically involves adding a WHERE condition in BuildSelect and returning false for unwanted rows in ApplicableEvent.
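A minimal sketch of a CopyFilter that restricts the copy to a single tenant. The TenantCopyFilter type and the tenant_id column are hypothetical, and a real ApplicableEvent would inspect the event's values rather than accepting everything:

	type TenantCopyFilter struct {
		TenantID uint64
	}

	func (f *TenantCopyFilter) BuildSelect(columns []string, table *ghostferry.TableSchema, lastPaginationKey, batchSize uint64) (squirrel.SelectBuilder, error) {
		// Start from the default batch-copy query and narrow it with a WHERE clause.
		return ghostferry.DefaultBuildSelect(columns, table, lastPaginationKey, batchSize).
			Where(squirrel.Eq{"tenant_id": f.TenantID}), nil
	}

	func (f *TenantCopyFilter) ApplicableEvent(ev ghostferry.DMLEvent) (bool, error) {
		// Placeholder: a real implementation would find the tenant_id column
		// index via ev.TableSchema() and compare the value in
		// ev.NewValues()/ev.OldValues() against f.TenantID.
		return true, nil
	}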
type CountMetric ¶
type CountMetric struct { MetricBase Value int64 }
type Cursor ¶
type Cursor struct { CursorConfig Table *TableSchema MaxPaginationKey uint64 RowLock bool // contains filtered or unexported fields }
type CursorConfig ¶
type CursorConfig struct {
	DB        *sql.DB
	Throttler Throttler

	ColumnsToSelect []string
	BuildSelect     func([]string, *TableSchema, uint64, uint64) (squirrel.SelectBuilder, error)

	// BatchSize is a pointer to the BatchSize in Config.UpdatableConfig which can be
	// independently updated from this code.
	// Having it as a pointer allows the updated value to be read without needing
	// additional code to copy the batch size value into the cursor config for each
	// cursor we create.
	BatchSize                 *uint64
	BatchSizePerTableOverride *DataIterationBatchSizePerTableOverride

	ReadRetries int
}
func (CursorConfig) GetBatchSize ¶
func (c CursorConfig) GetBatchSize(schemaName string, tableName string) uint64
func (*CursorConfig) NewCursor ¶
func (c *CursorConfig) NewCursor(table *TableSchema, startPaginationKey, maxPaginationKey uint64) *Cursor
returns a new Cursor with an embedded copy of itself
func (*CursorConfig) NewCursorWithoutRowLock ¶
func (c *CursorConfig) NewCursorWithoutRowLock(table *TableSchema, startPaginationKey, maxPaginationKey uint64) *Cursor
returns a new Cursor with an embedded copy of itself
type CustomScriptStatus ¶
type DMLEvent ¶
type DMLEvent interface { Database() string Table() string TableSchema() *TableSchema AsSQLString(string, string) (string, error) OldValues() RowData NewValues() RowData PaginationKey() (uint64, error) BinlogPosition() mysql.Position ResumableBinlogPosition() mysql.Position Annotation() (string, error) Timestamp() time.Time }
func NewBinlogDMLEvents ¶
func NewBinlogDMLEvents(table *TableSchema, ev *replication.BinlogEvent, pos, resumablePos mysql.Position, query []byte) ([]DMLEvent, error)
func NewBinlogDeleteEvents ¶
func NewBinlogDeleteEvents(eventBase *DMLEventBase, rowsEvent *replication.RowsEvent) ([]DMLEvent, error)
func NewBinlogInsertEvents ¶
func NewBinlogInsertEvents(eventBase *DMLEventBase, rowsEvent *replication.RowsEvent) ([]DMLEvent, error)
func NewBinlogUpdateEvents ¶
func NewBinlogUpdateEvents(eventBase *DMLEventBase, rowsEvent *replication.RowsEvent) ([]DMLEvent, error)
type DMLEventBase ¶
type DMLEventBase struct {
// contains filtered or unexported fields
}
The base of DMLEvent to provide the necessary methods.
func NewDMLEventBase ¶
func NewDMLEventBase(table *TableSchema, pos, resumablePos mysql.Position, query []byte, timestamp time.Time) *DMLEventBase
func (*DMLEventBase) Annotation ¶
func (e *DMLEventBase) Annotation() (string, error)
Annotation will return the first prefixed comment on the SQL string, or an error if the query attribute of the DMLEvent is not set
func (*DMLEventBase) BinlogPosition ¶
func (e *DMLEventBase) BinlogPosition() mysql.Position
func (*DMLEventBase) Database ¶
func (e *DMLEventBase) Database() string
func (*DMLEventBase) ResumableBinlogPosition ¶
func (e *DMLEventBase) ResumableBinlogPosition() mysql.Position
func (*DMLEventBase) Table ¶
func (e *DMLEventBase) Table() string
func (*DMLEventBase) TableSchema ¶
func (e *DMLEventBase) TableSchema() *TableSchema
func (*DMLEventBase) Timestamp ¶
func (e *DMLEventBase) Timestamp() time.Time
type DataIterationBatchSizePerTableOverride ¶
type DataIterationBatchSizePerTableOverride struct {
	// Lower limit for rowSize; if a rowSize <= MinRowSize, ControlPoints[MinRowSize] will be used
	MinRowSize int

	// Upper limit for rowSize; if a rowSize >= MaxRowSize, ControlPoints[MaxRowSize] will be used
	MaxRowSize int

	// Map of rowSize => batchSize used to calculate batchSize for new rowSizes, results stored in TableOverride
	ControlPoints map[int]uint64

	// Map of schemaName (source schema) => tableName => batchSize to override default values for certain tables
	TableOverride map[string]map[string]uint64
}
func (*DataIterationBatchSizePerTableOverride) CalculateBatchSize ¶
func (d *DataIterationBatchSizePerTableOverride) CalculateBatchSize(rowSize int) int
func (*DataIterationBatchSizePerTableOverride) UpdateBatchSizes ¶
func (d *DataIterationBatchSizePerTableOverride) UpdateBatchSizes(db *sql.DB, tables TableSchemaCache) error
func (*DataIterationBatchSizePerTableOverride) Validate ¶
func (d *DataIterationBatchSizePerTableOverride) Validate() error
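A sketch of a per-table batch size override. The control points are illustrative; batch sizes for row sizes between two control points are derived by linear interpolation, and row sizes outside [MinRowSize, MaxRowSize] use the nearest control point:

	override := &ghostferry.DataIterationBatchSizePerTableOverride{
		MinRowSize: 64,
		MaxRowSize: 4096,
		ControlPoints: map[int]uint64{
			64:   2000, // small rows: larger batches
			1024: 500,
			4096: 100, // large rows: smaller batches
		},
		TableOverride: map[string]map[string]uint64{},
	}

	if err := override.Validate(); err != nil {
		log.Fatal(err)
	}

	// Interpolated between ControlPoints[1024] and ControlPoints[4096].
	batchSize := override.CalculateBatchSize(2048)
	_ = batchSize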
type DataIterator ¶
type DataIterator struct { DB *sql.DB Concurrency int SelectFingerprint bool ErrorHandler ErrorHandler CursorConfig *CursorConfig StateTracker *StateTracker TableSorter DataIteratorSorter TargetPaginationKeys *sync.Map // contains filtered or unexported fields }
func (*DataIterator) AddBatchListener ¶
func (d *DataIterator) AddBatchListener(listener func(*RowBatch) error)
func (*DataIterator) AddDoneListener ¶
func (d *DataIterator) AddDoneListener(listener func() error)
func (*DataIterator) Run ¶
func (d *DataIterator) Run(tables []*TableSchema)
type DataIteratorSorter ¶
type DataIteratorSorter interface {
Sort(unorderedTables map[*TableSchema]uint64) ([]TableMaxPaginationKey, error)
}
DataIteratorSorter is an interface for the DataIterator to choose the order in which it will process tables.
type DatabaseConfig ¶
type DatabaseConfig struct {
	Host      string
	Port      uint16
	Net       string
	User      string
	Pass      string
	Collation string
	Params    map[string]string
	TLS       *TLSConfig

	// ReadTimeout is used to configure the MySQL client timeout for waiting for data from server.
	// Timeout is in seconds. Defaults to 0, which means no timeout.
	ReadTimeout uint64

	// WriteTimeout is used to configure the MySQL client timeout for writing data to server.
	// Timeout is in seconds. Defaults to 0, which means no timeout.
	WriteTimeout uint64

	// SQL annotations used to differentiate Ghostferry's DMLs
	// from other actors'. This will default to the defaultMarginalia
	// constant above if not set.
	//
	// This is used to ensure any changes to the Target during the move process
	// are performed only by Ghostferry (until cutover). Otherwise, the modification
	// will be identified as data corruption and fail the move.
	Marginalia string
}
func (*DatabaseConfig) MySQLConfig ¶
func (c *DatabaseConfig) MySQLConfig() (*mysql.Config, error)
func (*DatabaseConfig) Validate ¶
func (c *DatabaseConfig) Validate() error
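A sketch of a source connection configuration; the host, credentials, and parameters are hypothetical:

	source := &ghostferry.DatabaseConfig{
		Host:      "source-db.internal",
		Port:      3306,
		Net:       "tcp",
		User:      "ghostferry",
		Pass:      "secret",
		Collation: "utf8mb4_unicode_ci",
		Params: map[string]string{
			"charset": "utf8mb4",
		},
		ReadTimeout:  30, // seconds
		WriteTimeout: 30, // seconds
		Marginalia:   "application:ghostferry",
	}

	if err := source.Validate(); err != nil {
		log.Fatal(err)
	}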
type ErrorHandler ¶
type Ferry ¶
type Ferry struct { *Config SourceDB *sql.DB TargetDB *sql.DB ControlServer *ControlServer BinlogStreamer *BinlogStreamer BinlogWriter *BinlogWriter TargetVerifier *TargetVerifier DataIterator *DataIterator BatchWriter *BatchWriter StateTracker *StateTracker ErrorHandler ErrorHandler Throttler Throttler WaitUntilReplicaIsCaughtUpToMaster *WaitUntilReplicaIsCaughtUpToMaster // This can be specified by the caller. If specified, do not specify // VerifierType in Config (or as an empty string) or an error will be // returned in Initialize. // // If VerifierType is specified and this is nil on Ferry initialization, a // Verifier will be created by Initialize. If an IterativeVerifier is to be // created, IterativeVerifierConfig will be used to create the verifier. Verifier Verifier Tables TableSchemaCache StartTime time.Time DoneTime time.Time OverallState atomic.Value // contains filtered or unexported fields }
func (*Ferry) EndCutover ¶
func (*Ferry) FlushBinlogAndStopStreaming ¶
func (f *Ferry) FlushBinlogAndStopStreaming()
After you have stopped writing to the source and made sure that all in-flight transactions to the source have completed, call this method to ensure that the binlog streaming has caught up and then stop the binlog streaming.
This method will not actually shut down the BinlogStreamer immediately. You will know that the BinlogStreamer has finished when .Run() returns.
func (*Ferry) Initialize ¶
Initialize all the components of Ghostferry and connect to the Database
func (*Ferry) NewBatchWriter ¶
func (f *Ferry) NewBatchWriter() *BatchWriter
func (*Ferry) NewBatchWriterWithoutStateTracker ¶
func (f *Ferry) NewBatchWriterWithoutStateTracker() *BatchWriter
func (*Ferry) NewBinlogWriter ¶
func (f *Ferry) NewBinlogWriter() *BinlogWriter
func (*Ferry) NewBinlogWriterWithoutStateTracker ¶
func (f *Ferry) NewBinlogWriterWithoutStateTracker() *BinlogWriter
func (*Ferry) NewChecksumTableVerifier ¶
func (f *Ferry) NewChecksumTableVerifier() *ChecksumTableVerifier
func (*Ferry) NewControlServer ¶
func (f *Ferry) NewControlServer() (*ControlServer, error)
func (*Ferry) NewDataIterator ¶
func (f *Ferry) NewDataIterator() *DataIterator
func (*Ferry) NewDataIteratorWithoutStateTracker ¶
func (f *Ferry) NewDataIteratorWithoutStateTracker() *DataIterator
func (*Ferry) NewInlineVerifier ¶
func (f *Ferry) NewInlineVerifier() *InlineVerifier
func (*Ferry) NewInlineVerifierWithoutStateTracker ¶
func (f *Ferry) NewInlineVerifierWithoutStateTracker() *InlineVerifier
func (*Ferry) NewIterativeVerifier ¶
func (f *Ferry) NewIterativeVerifier() (*IterativeVerifier, error)
func (*Ferry) NewSourceBinlogStreamer ¶
func (f *Ferry) NewSourceBinlogStreamer() *BinlogStreamer
func (*Ferry) NewTargetBinlogStreamer ¶
func (f *Ferry) NewTargetBinlogStreamer() (*BinlogStreamer, error)
func (*Ferry) ReportProgress ¶
func (f *Ferry) ReportProgress()
func (*Ferry) ReportState ¶
func (f *Ferry) ReportState()
ReportState may have a slight performance impact as it will temporarily lock the StateTracker when it is serialized before posting to the callback
func (*Ferry) Run ¶
func (f *Ferry) Run()
Run spawns the background tasks that actually perform the run and waits for them to finish.
func (*Ferry) RunStandaloneDataCopy ¶
func (f *Ferry) RunStandaloneDataCopy(tables []*TableSchema) error
func (*Ferry) SerializeStateToJSON ¶
func (*Ferry) Start ¶
Start attaches event listeners for Ghostferry components and connects the binlog streamer to the source shard.
Note: binlog streaming doesn't start until Run. Here we simply connect to MySQL.
func (*Ferry) StartCutover ¶
func (*Ferry) StopTargetVerifier ¶
func (f *Ferry) StopTargetVerifier()
func (*Ferry) WaitUntilBinlogStreamerCatchesUp ¶
func (f *Ferry) WaitUntilBinlogStreamerCatchesUp()
func (*Ferry) WaitUntilRowCopyIsComplete ¶
func (f *Ferry) WaitUntilRowCopyIsComplete()
Call this method and perform the cutover after this method returns.
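Putting the lifecycle methods together, a minimal sketch of a run (error handling abbreviated, config population elided, and the point at which you block writes to the source is application-specific):

	// config is a previously constructed *ghostferry.Config.
	ferry := &ghostferry.Ferry{Config: config}

	if err := ferry.Initialize(); err != nil {
		log.Fatal(err)
	}
	if err := ferry.Start(); err != nil {
		log.Fatal(err)
	}

	go ferry.Run() // spawns the row copy and binlog streaming tasks

	ferry.WaitUntilRowCopyIsComplete()
	ferry.WaitUntilBinlogStreamerCatchesUp()

	// Stop writes to the source here, then flush and stop the streamer.
	ferry.FlushBinlogAndStopStreaming()

	// Perform the cutover once streaming has fully stopped.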
type ForceIndexConfig ¶
ForceIndexConfig maps SchemaName => TableName => IndexName. These indices will be forced for queries in InlineVerification.
func (ForceIndexConfig) IndexFor ¶
func (c ForceIndexConfig) IndexFor(schemaName, tableName string) string
type GaugeMetric ¶
type GaugeMetric struct { MetricBase Value float64 }
type HTTPCallback ¶
type IncompleteVerificationError ¶
type IncompleteVerificationError struct{}
func (IncompleteVerificationError) Error ¶
func (e IncompleteVerificationError) Error() string
type InlineVerifier ¶
type InlineVerifier struct { SourceDB *sql.DB TargetDB *sql.DB DatabaseRewrites map[string]string TableRewrites map[string]string TableSchemaCache TableSchemaCache BatchSize int VerifyBinlogEventsInterval time.Duration MaxExpectedDowntime time.Duration StateTracker *StateTracker ErrorHandler ErrorHandler // contains filtered or unexported fields }
func (*InlineVerifier) CheckFingerprintInline ¶
func (v *InlineVerifier) CheckFingerprintInline(tx *sql.Tx, targetSchema, targetTable string, sourceBatch *RowBatch, enforceInlineVerification bool) ([]InlineVerifierMismatches, error)
func (*InlineVerifier) Message ¶
func (v *InlineVerifier) Message() string
func (*InlineVerifier) PeriodicallyVerifyBinlogEvents ¶
func (v *InlineVerifier) PeriodicallyVerifyBinlogEvents(ctx context.Context)
func (*InlineVerifier) Result ¶
func (v *InlineVerifier) Result() (VerificationResultAndStatus, error)
func (*InlineVerifier) StartInBackground ¶
func (v *InlineVerifier) StartInBackground() error
This is called from the control server, which is triggered by pushing Run Verification during cutover. This step is necessary to ensure the binlogs are verified in Ghostferry.
func (*InlineVerifier) VerifyBeforeCutover ¶
func (v *InlineVerifier) VerifyBeforeCutover() error
func (*InlineVerifier) VerifyDuringCutover ¶
func (v *InlineVerifier) VerifyDuringCutover() (VerificationResult, error)
func (*InlineVerifier) Wait ¶
func (v *InlineVerifier) Wait()
type InlineVerifierConfig ¶
type InlineVerifierConfig struct {
	// The maximum expected downtime during cutover, in the format of
	// time.ParseDuration. If nothing is specified, the InlineVerifier will not
	// try to estimate the downtime and will always allow cutover.
	MaxExpectedDowntime string

	// The interval at which the periodic binlog reverification occurs, in the
	// format of time.ParseDuration. Default: 1s.
	VerifyBinlogEventsInterval string
	// contains filtered or unexported fields
}
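A sketch of an inline verifier configuration; both duration strings are illustrative and are parsed via time.ParseDuration:

	inlineVerifierConfig := ghostferry.InlineVerifierConfig{
		MaxExpectedDowntime:        "30s",
		VerifyBinlogEventsInterval: "1s",
	}

	if err := inlineVerifierConfig.Validate(); err != nil {
		log.Fatal(err)
	}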
func (*InlineVerifierConfig) Validate ¶
func (c *InlineVerifierConfig) Validate() error
type IterativeVerifier ¶
type IterativeVerifier struct { CompressionVerifier *CompressionVerifier CursorConfig *CursorConfig BinlogStreamer *BinlogStreamer TableSchemaCache TableSchemaCache SourceDB *sql.DB TargetDB *sql.DB Tables []*TableSchema IgnoredTables []string IgnoredColumns map[string]map[string]struct{} DatabaseRewrites map[string]string TableRewrites map[string]string Concurrency int MaxExpectedDowntime time.Duration // contains filtered or unexported fields }
func (*IterativeVerifier) GetHashes ¶
func (v *IterativeVerifier) GetHashes(db *sql.DB, schema, table, paginationKeyColumn string, columns []schema.TableColumn, paginationKeys []uint64) (map[uint64][]byte, error)
func (*IterativeVerifier) Initialize ¶
func (v *IterativeVerifier) Initialize() error
func (*IterativeVerifier) Message ¶
func (v *IterativeVerifier) Message() string
func (*IterativeVerifier) Result ¶
func (v *IterativeVerifier) Result() (VerificationResultAndStatus, error)
func (*IterativeVerifier) SanityCheckParameters ¶
func (v *IterativeVerifier) SanityCheckParameters() error
func (*IterativeVerifier) StartInBackground ¶
func (v *IterativeVerifier) StartInBackground() error
func (*IterativeVerifier) VerifyBeforeCutover ¶
func (v *IterativeVerifier) VerifyBeforeCutover() error
func (*IterativeVerifier) VerifyDuringCutover ¶
func (v *IterativeVerifier) VerifyDuringCutover() (VerificationResult, error)
func (*IterativeVerifier) VerifyOnce ¶
func (v *IterativeVerifier) VerifyOnce() (VerificationResult, error)
func (*IterativeVerifier) Wait ¶
func (v *IterativeVerifier) Wait()
type IterativeVerifierConfig ¶
type IterativeVerifierConfig struct { // List of tables that should be ignored by the IterativeVerifier. IgnoredTables []string // List of columns that should be ignored by the IterativeVerifier. // This is in the format of table_name -> [list of column names] IgnoredColumns map[string][]string // The number of concurrent verifiers. Note that a single table can only be // assigned to one goroutine and currently multiple goroutines per table // is not supported. Concurrency int // The maximum expected downtime during cutover, in the format of // time.ParseDuration. MaxExpectedDowntime string // Map of the table and column identifying the compression type // (if any) of the column. This is used during verification to ensure // the data was successfully copied as some compression algorithms can // output different compressed data with the same input data. // // The data structure is a map of table names to a map of column names // to the compression algorithm. // ex: {books: {contents: snappy}} // // Currently supported compression algorithms are: // 1. Snappy (https://google.github.io/snappy/) as "SNAPPY" // // Optional: defaults to empty map/no compression // // Note that the IterativeVerifier is in the process of being deprecated. // If this is specified, ColumnCompressionConfig should also be filled out in // the main Config. TableColumnCompression TableColumnCompressionConfig }
func (*IterativeVerifierConfig) Validate ¶
func (c *IterativeVerifierConfig) Validate() error
type LagThrottler ¶
type LagThrottler struct { ThrottlerBase PauserThrottler DB *sql.DB // contains filtered or unexported fields }
func NewLagThrottler ¶
func NewLagThrottler(config *LagThrottlerConfig) (*LagThrottler, error)
func (*LagThrottler) Throttled ¶
func (t *LagThrottler) Throttled() bool
type LagThrottlerConfig ¶
type LagThrottlerConfig struct { Connection *DatabaseConfig MaxLag int Query string UpdateInterval string }
type MaxPaginationKeySorter ¶
type MaxPaginationKeySorter struct{}
MaxPaginationKeySorter arranges tables based on the MaxPaginationKey in DESC order.
func (*MaxPaginationKeySorter) Sort ¶
func (s *MaxPaginationKeySorter) Sort(unorderedTables map[*TableSchema]uint64) ([]TableMaxPaginationKey, error)
type MaxTableSizeSorter ¶
type MaxTableSizeSorter struct {
DataIterator *DataIterator
}
MaxTableSizeSorter uses `information_schema.tables` to estimate the size of the DB and sorts tables in DESC order
func (*MaxTableSizeSorter) Sort ¶
func (s *MaxTableSizeSorter) Sort(unorderedTables map[*TableSchema]uint64) ([]TableMaxPaginationKey, error)
type MetricBase ¶
type Metrics ¶
type Metrics struct { Prefix string DefaultTags []MetricTag Sink chan interface{} // contains filtered or unexported fields }
func SetGlobalMetrics ¶
func (*Metrics) AddConsumer ¶
func (m *Metrics) AddConsumer()
func (*Metrics) DoneConsumer ¶
func (m *Metrics) DoneConsumer()
func (*Metrics) StopAndFlush ¶
func (m *Metrics) StopAndFlush()
type PaginationKeyPositionLog ¶
For tracking the speed of the copy
type PanicErrorHandler ¶
type PanicErrorHandler struct { Ferry *Ferry ErrorCallback HTTPCallback DumpStateToStdoutOnError bool // contains filtered or unexported fields }
func (*PanicErrorHandler) Fatal ¶
func (this *PanicErrorHandler) Fatal(from string, err error)
func (*PanicErrorHandler) ReportError ¶
func (this *PanicErrorHandler) ReportError(from string, err error)
type PauserThrottler ¶
type PauserThrottler struct { ThrottlerBase // contains filtered or unexported fields }
func (*PauserThrottler) SetPaused ¶
func (t *PauserThrottler) SetPaused(paused bool)
func (*PauserThrottler) Throttled ¶
func (t *PauserThrottler) Throttled() bool
type Progress ¶
type Progress struct {
	// Possible values are defined in ferry.go
	// Shows what the ferry is currently doing in one word.
	CurrentState string

	// The Payload field of the ProgressCallback config will be copied here
	// verbatim.
	// Example use case: you could be sending all the status to some aggregation
	// server and you want some sort of custom identification with this field.
	CustomPayload string

	Tables                  map[string]TableProgress
	LastSuccessfulBinlogPos mysql.Position
	BinlogStreamerLag       float64 // The number of seconds the binlog streamer is lagging by
	BinlogWriterLag         float64 // The number of seconds the binlog writer is lagging by
	Throttled               bool

	// If the TargetVerifier is enabled, we emit this lag; otherwise this number will be 0
	TargetBinlogStreamerLag float64

	// The number of data iterators currently active.
	ActiveDataIterators int

	// The behaviour of Ghostferry varies with respect to the VerifierType.
	// For example: a long cutover is OK if
	VerifierType string

	// The message that the verifier may emit for additional information
	VerifierMessage string

	// These are variables that are only filled in when CurrentState == done.
	FinalBinlogPos mysql.Position

	// A best estimate of the speed at which the copying is taking place. If
	// there are large gaps in the PaginationKey space, this will probably be
	// inaccurate.
	PaginationKeysPerSecond uint64
	ETA                     float64 // seconds
	TimeTaken               float64 // seconds
}
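As a sketch of how this struct might be consumed, the handler below decodes a progress report. The assumption that the ProgressCallback payload is the JSON encoding of Progress is for illustration only and is not confirmed by this listing.

package main

import (
	"encoding/json"
	"log"
	"net/http"

	"github.com/Shopify/ghostferry"
)

// progressHandler assumes, for illustration, that the callback body is the
// JSON encoding of Progress.
func progressHandler(w http.ResponseWriter, r *http.Request) {
	var p ghostferry.Progress
	if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	log.Printf(
		"state=%s streamer_lag=%.1fs throttled=%t eta=%.0fs",
		p.CurrentState, p.BinlogStreamerLag, p.Throttled, p.ETA,
	)
}

func main() {
	http.HandleFunc("/progress", progressHandler)
	log.Fatal(http.ListenAndServe(":8001", nil))
}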
type ReplicatedMasterPositionViaCustomQuery ¶
type ReplicatedMasterPositionViaCustomQuery struct {
	// The custom query executed should return a single row with two values:
	// the string file and the integer position. For pt-heartbeat, this query
	// would be:
	//
	//	"SELECT file, position FROM meta.ptheartbeat WHERE server_id = %d" % serverId
	//
	// where serverId is the master server id, and meta.ptheartbeat is the table
	// that pt-heartbeat writes to.
	//
	// For pt-heartbeat in particular, you should not use the
	// relay_master_log_file and exec_master_log_pos of the DB being replicated,
	// as these values are not the master binlog positions.
	Query string
}
Selects the master position that we have replicated up to from a heartbeat table of sorts.
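A short sketch of building this fetcher for pt-heartbeat, following the query shape given in the struct comment. The server id value and the meta.ptheartbeat table are the placeholders from that comment, not values confirmed elsewhere.

package main

import (
	"fmt"

	"github.com/Shopify/ghostferry"
)

func main() {
	// Placeholder master server id, per the struct comment above.
	masterServerId := 1

	fetcher := ghostferry.ReplicatedMasterPositionViaCustomQuery{
		// meta.ptheartbeat is the table pt-heartbeat writes to.
		Query: fmt.Sprintf(
			"SELECT file, position FROM meta.ptheartbeat WHERE server_id = %d",
			masterServerId,
		),
	}

	fmt.Println(fetcher.Query)
}

A value like this is what the ReplicatedMasterPositionFetcher field of WaitUntilReplicaIsCaughtUpToMaster is meant to hold.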
type ReverifyBatch ¶
type ReverifyBatch struct { PaginationKeys []uint64 Table TableIdentifier }
type ReverifyEntry ¶
type ReverifyEntry struct { PaginationKey uint64 Table *TableSchema }
type ReverifyStore ¶
type ReverifyStore struct { MapStore map[TableIdentifier]map[uint64]struct{} BatchStore []ReverifyBatch RowCount uint64 EmitLogPerRowCount uint64 // contains filtered or unexported fields }
func NewReverifyStore ¶
func NewReverifyStore() *ReverifyStore
func (*ReverifyStore) Add ¶
func (r *ReverifyStore) Add(entry ReverifyEntry)
func (*ReverifyStore) FlushAndBatchByTable ¶
func (r *ReverifyStore) FlushAndBatchByTable(batchsize int) []ReverifyBatch
type RowBatch ¶
type RowBatch struct {
// contains filtered or unexported fields
}
func NewRowBatch ¶
func NewRowBatch(table *TableSchema, values []RowData, paginationKeyIndex int) *RowBatch
func (*RowBatch) AsSQLQuery ¶
func (*RowBatch) EstimateByteSize ¶
func (*RowBatch) Fingerprints ¶
func (*RowBatch) PaginationKeyIndex ¶
func (*RowBatch) TableSchema ¶
func (e *RowBatch) TableSchema() *TableSchema
func (*RowBatch) ValuesContainPaginationKey ¶
type RowData ¶
type RowData []interface{}
func (RowData) GetUint64 ¶
For historical reasons, this function ended up being used in two different places: the DataIterator and the DMLEvent (which is indirectly used by the BinlogStreamer, InlineVerifier, etc).
The original code here was introduced in 152caec0ff5195d4698672df6dc0f72fb77b02fc, where it was used solely in the context of the DataIterator. In this context, the value being parsed here is given to us by the go-sql-driver/mysql driver. This value could either be an int64 or a byte slice containing a decimal string with the uint64 value in it, which is why we have this awkward byte-slice-to-integer trick. This also means the original code was not designed to handle uint64, as go-sql-driver/mysql never returns uint64. This could possibly be an upstream problem that has since been fixed, but this was not investigated. (A possibly related PR: https://github.com/go-sql-driver/mysql/pull/690)
At some point, this code was refactored into this function, such that the BinlogStreamer also uses the same code to decode integers. The binlog data is given to us by go-mysql-org/go-mysql. The go-mysql-org/go-mysql library should not be giving us awkward byte slices; instead, it should properly give us uint64. This code thus panics when it encounters such a case. See https://github.com/Shopify/ghostferry/issues/165.
In summary:
- This code receives values from both go-sql-driver/mysql and go-mysql-org/go-mysql.
- go-sql-driver/mysql gives us int64 for signed integer, and uint64 in a byte slice for unsigned integer.
- go-mysql-org/go-mysql gives us int64 for signed integer, and uint64 for unsigned integer.
- We currently make this function deal with both cases. In the future we can investigate alternative solutions.
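To make the two cases concrete, here is a minimal standalone sketch of the decoding behaviour described above. It is not the actual GetUint64 implementation (which, per the note above, panics in some situations rather than returning an error); it only illustrates the type switch.

package main

import (
	"fmt"
	"strconv"
)

// decodeUint64 is an illustrative helper, not ghostferry's GetUint64.
func decodeUint64(value interface{}) (uint64, error) {
	switch v := value.(type) {
	case int64:
		// Both drivers return signed integers as int64.
		return uint64(v), nil
	case uint64:
		// go-mysql-org/go-mysql returns unsigned integers as uint64 directly.
		return v, nil
	case []byte:
		// go-sql-driver/mysql returns unsigned integers as a byte slice
		// containing the decimal string, e.g. []byte("18446744073709551615").
		return strconv.ParseUint(string(v), 10, 64)
	default:
		return 0, fmt.Errorf("unexpected type %T for integer value", value)
	}
}

func main() {
	fmt.Println(decodeUint64(int64(42)))
	fmt.Println(decodeUint64([]byte("18446744073709551615")))
}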
type SerializableState ¶
type SerializableState struct { GhostferryVersion string LastKnownTableSchemaCache TableSchemaCache LastSuccessfulPaginationKeys map[string]uint64 CompletedTables map[string]bool LastWrittenBinlogPosition mysql.Position BinlogVerifyStore BinlogVerifySerializedStore LastStoredBinlogPositionForInlineVerifier mysql.Position LastStoredBinlogPositionForTargetVerifier mysql.Position }
func (*SerializableState) MinSourceBinlogPosition ¶
func (s *SerializableState) MinSourceBinlogPosition() mysql.Position
type SqlDBWithFakeRollback ¶
func (*SqlDBWithFakeRollback) Rollback ¶
func (d *SqlDBWithFakeRollback) Rollback() error
type SqlPreparer ¶
both `sql.Tx` and `sql.DB` allow a SQL query to be `Prepare`d
type SqlPreparerAndRollbacker ¶
type SqlPreparerAndRollbacker interface {
	SqlPreparer
	Rollback() error
}
sql.DB does not implement Rollback; SqlDBWithFakeRollback can be used to perform a no-op instead.
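As a sketch of why this pairing exists: a helper can prepare and execute a statement against either a *sql.Tx (real rollback) or a SqlDBWithFakeRollback (no-op rollback) without caring which one it received. The Prepare signature on SqlPreparer is assumed here to mirror database/sql, and the table and column names are hypothetical.

package example

import (
	"github.com/Shopify/ghostferry"
)

// insertRow works with any SqlPreparerAndRollbacker, so callers can pass
// either a *sql.Tx or a SqlDBWithFakeRollback.
func insertRow(db ghostferry.SqlPreparerAndRollbacker, id uint64) error {
	// Hypothetical table and column, for illustration only.
	stmt, err := db.Prepare("INSERT INTO example_table (id) VALUES (?)")
	if err != nil {
		db.Rollback()
		return err
	}
	defer stmt.Close()

	if _, err = stmt.Exec(id); err != nil {
		db.Rollback()
		return err
	}

	return nil
}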
type StateTracker ¶
type StateTracker struct { BinlogRWMutex *sync.RWMutex CopyRWMutex *sync.RWMutex // contains filtered or unexported fields }
func NewStateTracker ¶
func NewStateTracker(speedLogCount int) *StateTracker
func NewStateTrackerFromSerializedState ¶
func NewStateTrackerFromSerializedState(speedLogCount int, serializedState *SerializableState) *StateTracker
serializedState is a state the tracker should start from, as opposed to starting from the beginning.
func (*StateTracker) EstimatedPaginationKeysPerSecond ¶
func (s *StateTracker) EstimatedPaginationKeysPerSecond() float64
This is reasonably accurate if the rows copied are distributed uniformly between paginationKey = 0 -> max(paginationKey). It would not be accurate if the distribution is concentrated in a particular region.
func (*StateTracker) IsTableComplete ¶
func (s *StateTracker) IsTableComplete(table string) bool
func (*StateTracker) LastSuccessfulPaginationKey ¶
func (s *StateTracker) LastSuccessfulPaginationKey(table string) uint64
func (*StateTracker) MarkTableAsCompleted ¶
func (s *StateTracker) MarkTableAsCompleted(table string)
func (*StateTracker) RowStatsWrittenPerTable ¶
func (s *StateTracker) RowStatsWrittenPerTable() map[string]RowStats
func (*StateTracker) Serialize ¶
func (s *StateTracker) Serialize(lastKnownTableSchemaCache TableSchemaCache, binlogVerifyStore *BinlogVerifyStore) *SerializableState
func (*StateTracker) UpdateLastResumableBinlogPositionForTargetVerifier ¶
func (s *StateTracker) UpdateLastResumableBinlogPositionForTargetVerifier(pos mysql.Position)
func (*StateTracker) UpdateLastResumableSourceBinlogPosition ¶
func (s *StateTracker) UpdateLastResumableSourceBinlogPosition(pos mysql.Position)
func (*StateTracker) UpdateLastResumableSourceBinlogPositionForInlineVerifier ¶
func (s *StateTracker) UpdateLastResumableSourceBinlogPositionForInlineVerifier(pos mysql.Position)
func (*StateTracker) UpdateLastSuccessfulPaginationKey ¶
func (s *StateTracker) UpdateLastSuccessfulPaginationKey(table string, paginationKey uint64, rowStats RowStats)
type StmtCache ¶
type StmtCache struct {
// contains filtered or unexported fields
}
func NewStmtCache ¶
func NewStmtCache() *StmtCache
type TLSConfig ¶
type TableColumnCompressionConfig ¶
TableColumnCompressionConfig represents compression configuration for a column in a table, as table -> column -> compression-type. Example: books -> contents -> snappy.
type TableFilter ¶
type TableFilter interface {
	ApplicableTables([]*TableSchema) ([]*TableSchema, error)
	ApplicableDatabases([]string) ([]string, error)
}
type TableIdentifier ¶
A comparable and lightweight type that stores the schema and table name.
func NewTableIdentifierFromSchemaTable ¶
func NewTableIdentifierFromSchemaTable(table *TableSchema) TableIdentifier
type TableMaxPaginationKey ¶
type TableMaxPaginationKey struct { Table *TableSchema MaxPaginationKey uint64 }
type TableProgress ¶
type TableSchema ¶
type TableSchema struct {
	*schema.Table

	CompressedColumnsForVerification map[string]string   // Map of column name => compression type
	IgnoredColumnsForVerification    map[string]struct{} // Set of column names
	ForcedIndexForVerification       string              // Forced index name
	PaginationKeyColumn              *schema.TableColumn
	PaginationKeyIndex               int
	// contains filtered or unexported fields
}
This is a wrapper on schema.Table with some custom information we need.
func MaxPaginationKeys ¶
func MaxPaginationKeys(db *sql.DB, tables []*TableSchema, logger *logrus.Entry) (map[*TableSchema]uint64, []*TableSchema, error)
func (*TableSchema) FingerprintQuery ¶
func (t *TableSchema) FingerprintQuery(schemaName, tableName string, numRows int) string
This query returns the MD5 hash for a row on this table. This query is valid for both the source and the target shard.
Any compressed columns specified via CompressedColumnsForVerification are excluded from this checksum and the raw data is returned directly.
Any columns specified in IgnoredColumnsForVerification are excluded from the checksum and the raw data will not be returned.
Note that the MD5 hash should consist of at least 1 column: the paginationKey column. This is to say that there should never be a case where the MD5 hash is derived from an empty string.
func (*TableSchema) GetPaginationColumn ¶
func (t *TableSchema) GetPaginationColumn() *schema.TableColumn
GetPaginationColumn retrieves the PaginationKeyColumn.
func (*TableSchema) GetPaginationKeyIndex ¶
func (t *TableSchema) GetPaginationKeyIndex() int
func (*TableSchema) RowMd5Query ¶
func (t *TableSchema) RowMd5Query() string
type TableSchemaCache ¶
type TableSchemaCache map[string]*TableSchema
func LoadTables ¶
func LoadTables(db *sql.DB, tableFilter TableFilter, columnCompressionConfig ColumnCompressionConfig, columnIgnoreConfig ColumnIgnoreConfig, forceIndexConfig ForceIndexConfig, cascadingPaginationColumnConfig *CascadingPaginationColumnConfig) (TableSchemaCache, error)
func (TableSchemaCache) AllTableNames ¶
func (c TableSchemaCache) AllTableNames() (tableNames []string)
func (TableSchemaCache) AsSlice ¶
func (c TableSchemaCache) AsSlice() (tables []*TableSchema)
func (TableSchemaCache) Get ¶
func (c TableSchemaCache) Get(database, table string) *TableSchema
func (TableSchemaCache) GetTableListWithPriority ¶
func (c TableSchemaCache) GetTableListWithPriority(priorityList []string) (prioritizedTableNames []string)
Helper to sort a given map of tables with a second list giving a priority. If an element is present in both the input and the priority lists, it will appear first (in the order of the priority list); all other items appear in the order given in the input.
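A small illustration of the ordering described above; the fully qualified table names used as map keys here are hypothetical, and only the relative ordering matters.

package main

import (
	"fmt"

	"github.com/Shopify/ghostferry"
)

func main() {
	// Hypothetical cache entries; the key format is assumed for illustration.
	cache := ghostferry.TableSchemaCache{
		"shop.orders":   &ghostferry.TableSchema{},
		"shop.products": &ghostferry.TableSchema{},
		"shop.users":    &ghostferry.TableSchema{},
	}

	// Tables named in the priority list come first, in priority-list order;
	// the remaining tables follow in the order given by the cache.
	prioritized := cache.GetTableListWithPriority([]string{"shop.users", "shop.orders"})
	fmt.Println(prioritized) // e.g. [shop.users shop.orders shop.products]
}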
type TargetVerifier ¶
type TargetVerifier struct { DB *sql.DB BinlogStreamer *BinlogStreamer StateTracker *StateTracker // contains filtered or unexported fields }
func NewTargetVerifier ¶
func NewTargetVerifier(targetDB *sql.DB, stateTracker *StateTracker, binlogStreamer *BinlogStreamer) (*TargetVerifier, error)
func (*TargetVerifier) BinlogEventListener ¶
func (t *TargetVerifier) BinlogEventListener(evs []DMLEvent) error
Verify that all DMLs against the target are coming from Ghostferry for the duration of the move. Once cutover has completed, we no longer need to perform this verification, as all writes from the application are directed to the target.
type ThrottlerBase ¶
type ThrottlerBase struct {
// contains filtered or unexported fields
}
func (*ThrottlerBase) Disabled ¶
func (t *ThrottlerBase) Disabled() bool
func (*ThrottlerBase) SetDisabled ¶
func (t *ThrottlerBase) SetDisabled(disabled bool)
type TimerMetric ¶
type TimerMetric struct { MetricBase Value time.Duration }
type UnsupportedCompressionError ¶
type UnsupportedCompressionError struct {
// contains filtered or unexported fields
}
UnsupportedCompressionError is used to identify errors resulting from attempting to decompress unsupported algorithms
func (UnsupportedCompressionError) Error ¶
func (e UnsupportedCompressionError) Error() string
type UpdatableConfig ¶
type UpdatableConfig struct {
	// The batch size used to iterate the data during data copy. This batch size
	// is always used: if this is specified to be 100, 100 rows will be copied
	// per iteration.
	//
	// With the current implementation of Ghostferry, we need to lock the rows
	// we select. This means, the larger this number is, the longer we need to
	// hold this lock. On the flip side, the smaller this number is, the slower
	// the copy will likely be.
	//
	// Optional: defaults to 200
	DataIterationBatchSize uint64
}
UpdatableConfig defines config fields that support dynamic updates
func (*UpdatableConfig) Validate ¶
func (c *UpdatableConfig) Validate() error
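A minimal sketch of setting the one dynamically updatable field and validating it; the value 500 is only an example of trading a longer row-lock hold for a faster copy.

package main

import (
	"log"

	"github.com/Shopify/ghostferry"
)

func main() {
	cfg := ghostferry.UpdatableConfig{
		// Larger batches generally copy faster but hold the row lock longer;
		// the default is 200.
		DataIterationBatchSize: 500,
	}

	if err := cfg.Validate(); err != nil {
		log.Fatalf("invalid UpdatableConfig: %v", err)
	}
}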
type VerificationResult ¶
func NewCorrectVerificationResult ¶
func NewCorrectVerificationResult() VerificationResult
func (VerificationResult) Error ¶
func (e VerificationResult) Error() string
type VerificationResultAndStatus ¶
type VerificationResultAndStatus struct { VerificationResult StartTime time.Time DoneTime time.Time }
func (VerificationResultAndStatus) IsDone ¶
func (r VerificationResultAndStatus) IsDone() bool
func (VerificationResultAndStatus) IsStarted ¶
func (r VerificationResultAndStatus) IsStarted() bool
type Verifier ¶
type Verifier interface {
	// If the Verifier needs to do anything immediately after the DataIterator
	// finishes copying data and before cutover occurs, implement this function.
	VerifyBeforeCutover() error

	// This is called during cutover and should give the result of the
	// verification.
	VerifyDuringCutover() (VerificationResult, error)

	// Start the verifier in the background during the cutover phase.
	// Traditionally, this is called from within the ControlServer.
	//
	// This method may be called multiple times and it is up to the verifier
	// to decide if it is possible to re-run the verification.
	StartInBackground() error

	// Wait for the verifier until it finishes verification after it was
	// started with StartInBackground.
	//
	// A verification is "done" when it has verified the dbs (either
	// correct or incorrect) OR when it experiences an error.
	Wait()

	// Returns an arbitrary message that is consumed by the control server.
	// Can just be "".
	Message() string

	// Returns the result and the status of the verification.
	// To check the status, call IsStarted() and IsDone() on
	// VerificationResultAndStatus.
	//
	// If the verification has been completed successfully (without errors) and
	// the data checks out to be "correct", the result will be
	// VerificationResult{true, ""}, with error = nil.
	// Otherwise, the result will be VerificationResult{false, "message"}, with
	// error = nil.
	//
	// If the verification is "done" but experienced an error during the check,
	// the result will be VerificationResult{}, with err = yourErr.
	Result() (VerificationResultAndStatus, error)
}
The sole purpose of this interface is to make it easier for one to implement their own strategy for verification and hook it up with the ControlServer. If there is no such need, one does not need to implement this interface.
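For those implementing this interface, below is a minimal skeleton of a custom Verifier that always reports success. It only uses the constructors and fields shown in this listing; a real implementation would compare source and target data where noted.

package example

import (
	"time"

	"github.com/Shopify/ghostferry"
)

// AlwaysCorrectVerifier is an illustrative no-op Verifier implementation.
type AlwaysCorrectVerifier struct {
	resultAndStatus ghostferry.VerificationResultAndStatus
	doneCh          chan struct{}
}

func (v *AlwaysCorrectVerifier) VerifyBeforeCutover() error { return nil }

func (v *AlwaysCorrectVerifier) VerifyDuringCutover() (ghostferry.VerificationResult, error) {
	// A real verifier would compare source and target rows here.
	return ghostferry.NewCorrectVerificationResult(), nil
}

func (v *AlwaysCorrectVerifier) StartInBackground() error {
	v.doneCh = make(chan struct{})
	v.resultAndStatus.StartTime = time.Now()

	go func() {
		// A real verifier would run its background checks here.
		v.resultAndStatus.VerificationResult = ghostferry.NewCorrectVerificationResult()
		v.resultAndStatus.DoneTime = time.Now()
		close(v.doneCh)
	}()

	return nil
}

func (v *AlwaysCorrectVerifier) Wait() { <-v.doneCh }

func (v *AlwaysCorrectVerifier) Message() string { return "" }

func (v *AlwaysCorrectVerifier) Result() (ghostferry.VerificationResultAndStatus, error) {
	return v.resultAndStatus, nil
}

// Compile-time check that the interface is satisfied.
var _ ghostferry.Verifier = (*AlwaysCorrectVerifier)(nil)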
type WaitUntilReplicaIsCaughtUpToMaster ¶
type WaitUntilReplicaIsCaughtUpToMaster struct { MasterDB *sql.DB ReplicatedMasterPositionFetcher ReplicatedMasterPositionFetcher Timeout time.Duration ReplicaDB *sql.DB // contains filtered or unexported fields }
Only set the MasterDB and ReplicatedMasterPosition options in your code as the others will be overwritten by the ferry.
func (*WaitUntilReplicaIsCaughtUpToMaster) IsCaughtUp ¶
func (*WaitUntilReplicaIsCaughtUpToMaster) Wait ¶
func (w *WaitUntilReplicaIsCaughtUpToMaster) Wait() error
type WorkerPool ¶
func (*WorkerPool) Run ¶
func (p *WorkerPool) Run(n int) ([]interface{}, error)
Returns a list of results of the same size as the concurrency number. Returns the first error that occurs during the run. As soon as a single worker errors, all workers terminate.
Source Files ¶
- batch_writer.go
- binlog_streamer.go
- binlog_writer.go
- compression_verifier.go
- config.go
- control_server.go
- cursor.go
- data_iterator.go
- data_iterator_sorter.go
- dml_events.go
- error_handler.go
- ferry.go
- filter.go
- http_callback.go
- inline_verifier.go
- iterative_verifier.go
- metrics.go
- progress.go
- row_batch.go
- state_tracker.go
- table_schema_cache.go
- target_verifier.go
- throttler.go
- utils.go
- verifier.go
- wait_until_replica_is_caught_up_to_master.go