Documentation ¶
Index ¶
- Constants
- Variables
- func CalculateLSN(file string, pos uint32) uint64
- func CastRowsToDT(event *RowsEvent, location *time.Location) error
- func CastToMySQL(val interface{}, typ abstract.ColSchema) string
- func CheckMySQLBinlogRowImageFormat(source *MysqlSource) error
- func CheckMySQLVersion(storage *Storage) (string, string)
- func ClearOriginalType(colSchema abstract.ColSchema) string
- func Connect(params *ConnectionParams, configAction func(config *mysql.Config) error) (*sql.DB, error)
- func CopySchema(source *MysqlSource, steps *MysqlDumpSteps, pusher abstract.Pusher) error
- func CreateCertPool(certPEMFile string, rootCAFiles []string) (*x509.CertPool, error)
- func GetLogFilePosition(storage *Storage) (string, uint32, string, error)
- func GetTableDDLs(source *MysqlSource, connection *sqlx.DB, databases []string) ([]ddlValue, []ddlValue, error)
- func GetTriggerDDLs(source *MysqlSource, connection *sqlx.DB, databases []string) ([]ddlValue, error)
- func GetViewDDLs(source *MysqlSource, connection *sqlx.DB, databases []string) ([]ddlValue, error)
- func IsErrorCode(err error, errNumber uint16) bool
- func IsErrorCodes(err error, codes map[int]bool) bool
- func IsGtidModeEnabled(storage *Storage, flavor string) (bool, error)
- func LoadMysqlSchema(transfer *server.Transfer, registry metrics.Registry, isAfter bool) error
- func LoadSchema(tx queryExecutor, useFakePrimaryKey bool, includeViews bool) (abstract.DBSchema, error)
- func LoadTableConstraints(tx queryExecutor, table abstract.TableID) (map[string][]string, error)
- func MakeArrBacktickedColumnNames(tableSchema *[]abstract.ColSchema) []string
- func New(lgr log.Logger, registry metrics.Registry, cp coordinator.Coordinator, ...) providers.Provider
- func NewSinker(lgr log.Logger, cfg *MysqlDestination, mtrcs metrics.Registry) (abstract.Sinker, error)
- func NewSource(src *MysqlSource, transferID string, objects *server.DataObjects, ...) (abstract.Source, error)
- func OrderByPrimaryKeys(tableSchema []abstract.ColSchema, direction string) (string, error)
- func RemoveTracker(src *MysqlSource, id string, cp coordinator.Coordinator) error
- func ResolveHosts(config *MysqlStorageParams) ([]*connection.Host, error)
- func RestoreStringAndBytes(event *RowsEvent) error
- func SyncBinlogPosition(src *MysqlSource, id string, cp coordinator.Coordinator) error
- func TypeToMySQL(column abstract.ColSchema) string
- func TypeToYt(rawColumnType string) schema.Type
- func Validate(event *RowsEvent) error
- type Canal
- func (c *Canal) CatchMasterPos(timeout time.Duration) error
- func (c *Canal) CheckBinlogRowImage(image string) error
- func (c *Canal) ClearTableCache(db []byte, table []byte)
- func (c *Canal) Close()
- func (c *Canal) Ctx() context.Context
- func (c *Canal) Execute(cmd string, args ...interface{}) (*mysql.Result, error)
- func (c *Canal) FlushBinlog() error
- func (c *Canal) GetDelay() uint32
- func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error)
- func (c *Canal) GetMasterPos() (mysql.Position, error)
- func (c *Canal) GetTable(db string, table string) (*schema.Table, error)
- func (c *Canal) Run() error
- func (c *Canal) RunFrom(pos mysql.Position) error
- func (c *Canal) SetEventHandler(h EventHandler)
- func (c *Canal) StartFromGTID(set mysql.GTIDSet) error
- func (c *Canal) SyncedGTIDSet() mysql.GTIDSet
- func (c *Canal) SyncedPosition() mysql.Position
- func (c *Canal) SyncedTimestamp() uint32
- func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error
- type Config
- type ConnectionParams
- type CreateFunctionRow
- type CreateProcedureRow
- type CreateTableRow
- type CreateTriggerRow
- type CreateViewRow
- type DummyEventHandler
- func (h *DummyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error
- func (h *DummyEventHandler) OnGTID(mysql.GTIDSet) error
- func (h *DummyEventHandler) OnPosSynced(mysql.Position, mysql.GTIDSet, bool) error
- func (h *DummyEventHandler) OnRotate(*replication.RotateEvent) error
- func (h *DummyEventHandler) OnRow(*RowsEvent) error
- func (h *DummyEventHandler) OnTableChanged(schema string, table string) error
- func (h *DummyEventHandler) OnXID(mysql.Position) error
- func (h *DummyEventHandler) String() string
- type EmptyProgress
- type EventHandler
- type LsnProgress
- type MysqlDestination
- func (d *MysqlDestination) BuffererConfig() bufferer.BuffererConfig
- func (d *MysqlDestination) CleanupMode() server.CleanupType
- func (d *MysqlDestination) GetConnectionID() string
- func (d *MysqlDestination) GetProviderType() abstract.ProviderType
- func (d *MysqlDestination) HasTLS() bool
- func (MysqlDestination) IsDestination()
- func (d *MysqlDestination) MDBClusterID() string
- func (d *MysqlDestination) PostSnapshotHacks()
- func (d *MysqlDestination) PreSnapshotHacks()
- func (d *MysqlDestination) ReliesOnSystemTablesTransferring() bool
- func (d *MysqlDestination) ToStorageParams() *MysqlStorageParams
- func (d *MysqlDestination) Transformer() map[string]string
- func (d *MysqlDestination) Validate() error
- func (d *MysqlDestination) WithDefaults()
- type MysqlDumpSteps
- type MysqlFlavorType
- type MysqlSource
- func (s *MysqlSource) AllIncludes() []string
- func (s *MysqlSource) FulfilledIncludes(tID abstract.TableID) (result []string)
- func (s *MysqlSource) GetConnectionID() string
- func (s *MysqlSource) GetProviderType() abstract.ProviderType
- func (s *MysqlSource) HasTLS() bool
- func (s *MysqlSource) Include(tID abstract.TableID) bool
- func (s *MysqlSource) InitServerID(transferID string)
- func (MysqlSource) IsSource()
- func (MysqlSource) IsStrictSource()
- func (s *MysqlSource) MDBClusterID() string
- func (s *MysqlSource) ToStorageParams() *MysqlStorageParams
- func (s *MysqlSource) Validate() error
- func (s *MysqlSource) WithDefaults()
- type MysqlStorageParams
- type MysqlTrackerStorage
- type NotMasterError
- type Provider
- func (p *Provider) Activate(ctx context.Context, task *server.TransferOperation, tables abstract.TableMap, ...) error
- func (p *Provider) Cleanup(ctx context.Context, task *server.TransferOperation) error
- func (p *Provider) Deactivate(ctx context.Context, task *server.TransferOperation) error
- func (p *Provider) DestinationSampleableStorage() (abstract.SampleableStorage, error)
- func (p *Provider) Sink(middlewares.Config) (abstract.Sinker, error)
- func (p *Provider) Source() (abstract.Source, error)
- func (p *Provider) SourceSampleableStorage() (abstract.SampleableStorage, []abstract.TableDescription, error)
- func (p *Provider) Storage() (abstract.Storage, error)
- func (p *Provider) Type() abstract.ProviderType
- func (p *Provider) Update(ctx context.Context, addedTables []abstract.TableDescription) error
- type Queryable
- type RoutineRow
- type RowsEvent
- func (r *RowsEvent) GetColumnCollation(i int) string
- func (r *RowsEvent) GetColumnEnumValue(columnIndex int, enumValueIndex int64) string
- func (r *RowsEvent) GetColumnName(i int) string
- func (r *RowsEvent) GetColumnRawType(i int) string
- func (r *RowsEvent) GetColumnSetValue(columnIndex int, setValueIndex int) string
- func (r *RowsEvent) IsAllColumnsPresent1() bool
- func (r *RowsEvent) IsAllColumnsPresent2() bool
- func (r *RowsEvent) IsColumnPresent1(i uint64) bool
- func (r *RowsEvent) IsColumnPresent2(i uint64) bool
- func (r *RowsEvent) Nullable(i uint64) bool
- func (r *RowsEvent) String() string
- type StatementID
- type Status
- type Storage
- func (s *Storage) BeginSnapshot(ctx context.Context) error
- func (s *Storage) Close()
- func (s *Storage) DatabaseSchema() string
- func (s *Storage) EndSnapshot(context.Context) error
- func (s *Storage) EstimateTableRowsCount(table abstract.TableID) (uint64, error)
- func (s *Storage) ExactTableRowsCount(table abstract.TableID) (uint64, error)
- func (s *Storage) ListViews() ([]abstract.TableID, error)
- func (s *Storage) LoadRandomSample(table abstract.TableDescription, pusher abstract.Pusher) error
- func (s *Storage) LoadSampleBySet(table abstract.TableDescription, keySet []map[string]interface{}, ...) error
- func (s *Storage) LoadSchema() (schema abstract.DBSchema, err error)
- func (s *Storage) LoadTable(ctx context.Context, table abstract.TableDescription, pusher abstract.Pusher) error
- func (s *Storage) LoadTopBottomSample(table abstract.TableDescription, pusher abstract.Pusher) error
- func (s *Storage) Ping() error
- func (s *Storage) Position(ctx context.Context) (*abstract.LogPosition, error)
- func (s *Storage) TableAccessible(table abstract.TableDescription) bool
- func (s *Storage) TableExists(table abstract.TableID) (bool, error)
- func (s *Storage) TableList(includeTableFilter abstract.IncludeTableList) (abstract.TableMap, error)
- func (s *Storage) TableSchema(ctx context.Context, table abstract.TableID) (*abstract.TableSchema, error)
- func (s *Storage) TableSizeInBytes(table abstract.TableID) (uint64, error)
- type TableName
- type TableProgress
- type TableStatus
- type TemplateCol
- type TemplateModel
- type Tracker
- func (n *Tracker) Close() error
- func (n *Tracker) Get() (file string, pos uint32, err error)
- func (n *Tracker) GetGtidset() (mysql.GTIDSet, error)
- func (n *Tracker) Remove() error
- func (n *Tracker) RemoveGtidset() error
- func (n *Tracker) Store(file string, pos uint32) error
- func (n *Tracker) StoreGtidset(gtidset mysql.GTIDSet) error
- type TriggerRow
Constants ¶
View Source
const ( MysqlFlavorTypeMysql = "mysql" MysqlFlavorTypeMariaDB = "mariadb" )
View Source
const ( TableTransferProgress = "__table_transfer_progress" TableTmGtidKeeper = "__tm_gtid_keeper" TableTmKeeper = "__tm_keeper" )
View Source
const ( DisableFKQuery = "SET FOREIGN_KEY_CHECKS=0;\n" EnableFKQuery = "SET FOREIGN_KEY_CHECKS=1;\n" )
View Source
const ( UpdateAction = "update" InsertAction = "insert" DeleteAction = "delete" )
The action name for sync.
View Source
const ( ErrCodeDuplicateKey = 1062 ErrCodeSyntax = 1064 ErrCodeLockTimeout = 1205 ErrCodeDeadlock = 1213 )
View Source
const ( SnapshotWait = Status("SnapshotWait") SyncWait = Status("SyncWait") InSync = Status("InSync") )
View Source
const ProviderType = abstract.ProviderType("mysql")
Variables ¶
View Source
var ( UnknownTableRetryPeriod = time.Second * time.Duration(10) ErrExcludedTable = xerrors.New("table is excluded") )
canal will retry fetching unknown table's meta after UnknownTableRetryPeriod
View Source
var ( CodeSyntax = coded.Register("mysql", "incorrect_syntax") CodeDeadlock = coded.Register("mysql", "deadlock") )
View Source
var AlreadyExistsCodes = map[int]bool{ 1050: true, 1304: true, 1359: true, 1826: true, }
View Source
var NotEnoughProcPermissions = "" /* 186-byte string literal not displayed */
View Source
var TableOrViewNotExistsCode = uint16(1146)
Functions ¶
func CalculateLSN ¶
func CastToMySQL ¶
func CheckMySQLBinlogRowImageFormat ¶
func CheckMySQLBinlogRowImageFormat(source *MysqlSource) error
func CheckMySQLVersion ¶
func ClearOriginalType ¶
func CopySchema ¶
func CopySchema(source *MysqlSource, steps *MysqlDumpSteps, pusher abstract.Pusher) error
func CreateCertPool ¶
func GetTableDDLs ¶
func GetTableDDLs(source *MysqlSource, connection *sqlx.DB, databases []string) ([]ddlValue, []ddlValue, error)
func GetTriggerDDLs ¶
func GetTriggerDDLs(source *MysqlSource, connection *sqlx.DB, databases []string) ([]ddlValue, error)
func GetViewDDLs ¶
func GetViewDDLs(source *MysqlSource, connection *sqlx.DB, databases []string) ([]ddlValue, error)
func IsErrorCode ¶
func LoadMysqlSchema ¶
func LoadSchema ¶
func LoadTableConstraints ¶
func New ¶
func New(lgr log.Logger, registry metrics.Registry, cp coordinator.Coordinator, transfer *server.Transfer) providers.Provider
func NewSource ¶
func NewSource(src *MysqlSource, transferID string, objects *server.DataObjects, logger log.Logger, registry metrics.Registry, cp coordinator.Coordinator, failOnDecimal bool) (abstract.Source, error)
func OrderByPrimaryKeys ¶
func RemoveTracker ¶
func RemoveTracker(src *MysqlSource, id string, cp coordinator.Coordinator) error
func ResolveHosts ¶
func ResolveHosts(config *MysqlStorageParams) ([]*connection.Host, error)
func RestoreStringAndBytes ¶
func SyncBinlogPosition ¶
func SyncBinlogPosition(src *MysqlSource, id string, cp coordinator.Coordinator) error
func TypeToMySQL ¶
Types ¶
type Canal ¶
type Canal struct {
// contains filtered or unexported fields
}
Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc... MySQL must open row format for binlog
func (*Canal) CheckBinlogRowImage ¶
CheckBinlogRowImage checks MySQL binlog row image, must be in FULL, MINIMAL, NOBLOB
func (*Canal) ClearTableCache ¶
ClearTableCache clear table cache
func (*Canal) FlushBinlog ¶
func (*Canal) Run ¶
Run sync from the binlog position in the data. It will run forever until meeting an error or Canal closed.
func (*Canal) SetEventHandler ¶
func (c *Canal) SetEventHandler(h EventHandler)
`SetEventHandler` registers the sync handler, you must register your own handler before starting Canal.
func (*Canal) SyncedGTIDSet ¶
func (*Canal) SyncedPosition ¶
func (*Canal) SyncedTimestamp ¶
type Config ¶
type Config struct { Addr string User string Password string Charset string ServerID uint32 Flavor string HeartbeatPeriod time.Duration ReadTimeout time.Duration // discard row event without table meta DiscardNoMetaRowEvent bool UseDecimal bool FailOnDecimal bool ParseTime bool TimestampStringLocation *time.Location // SemiSyncEnabled enables semi-sync or not. SemiSyncEnabled bool // Set to change the maximum number of attempts to re-establish a broken // connection MaxReconnectAttempts int TLSConfig *tls.Config Include func(db string, table string) bool }
TODO: remove
type ConnectionParams ¶
type ConnectionParams struct { Host string Port int User string Password string Database string TLS bool CertPEMFile string Location *time.Location RootCAFiles []string ClusterID string }
func NewConnectionParams ¶
func NewConnectionParams(config *MysqlStorageParams) (*ConnectionParams, error)
func (*ConnectionParams) ResolveLocation ¶
func (params *ConnectionParams) ResolveLocation(locationStr string) error
type CreateFunctionRow ¶
type CreateProcedureRow ¶
type CreateTableRow ¶
type CreateTriggerRow ¶
type CreateTriggerRow struct { Name string `db:"Trigger"` Mode string `db:"sql_mode"` DDL string `db:"SQL Original Statement"` CharacterSetClient string `db:"character_set_client"` ConnectionCollation string `db:"collation_connection"` DBCollation string `db:"Database Collation"` Created string `db:"Created"` }
type CreateViewRow ¶
type DummyEventHandler ¶
type DummyEventHandler struct { }
func (*DummyEventHandler) OnDDL ¶
func (h *DummyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error
func (*DummyEventHandler) OnPosSynced ¶
func (*DummyEventHandler) OnRotate ¶
func (h *DummyEventHandler) OnRotate(*replication.RotateEvent) error
func (*DummyEventHandler) OnRow ¶
func (h *DummyEventHandler) OnRow(*RowsEvent) error
func (*DummyEventHandler) OnTableChanged ¶
func (h *DummyEventHandler) OnTableChanged(schema string, table string) error
func (*DummyEventHandler) String ¶
func (h *DummyEventHandler) String() string
type EmptyProgress ¶
type EmptyProgress struct{}
func (*EmptyProgress) BuildLSNQuery ¶
func (t *EmptyProgress) BuildLSNQuery(name string, lsn uint64, status Status) string
func (*EmptyProgress) GetCurrentState ¶
func (t *EmptyProgress) GetCurrentState() (map[string]TableStatus, error)
type EventHandler ¶
type EventHandler interface { OnRotate(roateEvent *replication.RotateEvent) error // OnTableChanged is called when the table is created, altered, renamed or dropped. // You need to clear the associated data like cache with the table. // It will be called before OnDDL. OnTableChanged(schema string, table string) error OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error OnRow(event *RowsEvent) error OnXID(nextPos mysql.Position) error OnGTID(gtid mysql.GTIDSet) error // OnPosSynced Use your own way to sync position. When force is true, sync position immediately. OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error String() string }
type LsnProgress ¶
type LsnProgress interface { GetCurrentState() (map[string]TableStatus, error) BuildLSNQuery(name string, lsn uint64, status Status) string }
func NewEmptyProgress ¶
func NewEmptyProgress() (LsnProgress, error)
func NewTableProgressTracker ¶
func NewTableProgressTracker(db *sql.DB, dbName string) (LsnProgress, error)
type MysqlDestination ¶
type MysqlDestination struct { AllowReplace bool Cleanup server.CleanupType ClusterID string Database string DisableParallelWrite map[TableName]bool Host string IsPublic bool MaintainTables bool MaxParralelWriters int64 Password server.SecretString PerTransactionPush bool Port int ProgressTrackerDB string SecurityGroupIDs []string SkipKeyChecks bool SQLMode string SubNetworkID string Timezone string TLSFile string TransformerConfig map[string]string User string BufferTriggingSize uint64 BufferTriggingInterval time.Duration RootCAFiles []string ConnectionID string // contains filtered or unexported fields }
func (*MysqlDestination) BuffererConfig ¶
func (d *MysqlDestination) BuffererConfig() bufferer.BuffererConfig
func (*MysqlDestination) CleanupMode ¶
func (d *MysqlDestination) CleanupMode() server.CleanupType
func (*MysqlDestination) GetConnectionID ¶
func (d *MysqlDestination) GetConnectionID() string
func (*MysqlDestination) GetProviderType ¶
func (d *MysqlDestination) GetProviderType() abstract.ProviderType
func (*MysqlDestination) HasTLS ¶
func (d *MysqlDestination) HasTLS() bool
func (MysqlDestination) IsDestination ¶
func (MysqlDestination) IsDestination()
func (*MysqlDestination) MDBClusterID ¶
func (d *MysqlDestination) MDBClusterID() string
func (*MysqlDestination) PostSnapshotHacks ¶
func (d *MysqlDestination) PostSnapshotHacks()
func (*MysqlDestination) PreSnapshotHacks ¶
func (d *MysqlDestination) PreSnapshotHacks()
func (*MysqlDestination) ReliesOnSystemTablesTransferring ¶
func (d *MysqlDestination) ReliesOnSystemTablesTransferring() bool
func (*MysqlDestination) ToStorageParams ¶
func (d *MysqlDestination) ToStorageParams() *MysqlStorageParams
func (*MysqlDestination) Transformer ¶
func (d *MysqlDestination) Transformer() map[string]string
func (*MysqlDestination) Validate ¶
func (d *MysqlDestination) Validate() error
func (*MysqlDestination) WithDefaults ¶
func (d *MysqlDestination) WithDefaults()
type MysqlDumpSteps ¶
func DefaultMysqlDumpPostSteps ¶
func DefaultMysqlDumpPostSteps() *MysqlDumpSteps
func DefaultMysqlDumpPreSteps ¶
func DefaultMysqlDumpPreSteps() *MysqlDumpSteps
type MysqlFlavorType ¶
type MysqlFlavorType string
type MysqlSource ¶
type MysqlSource struct { Host string User string Password server.SecretString ClusterID string ServerID uint32 IncludeTableRegex []string ExcludeTableRegex []string IsPublic bool Database string TLSFile string SubNetworkID string SecurityGroupIDs []string Port int Timezone string BufferLimit uint32 UseFakePrimaryKey bool IsHomo bool `json:"-"` PreSteps *MysqlDumpSteps PostSteps *MysqlDumpSteps TrackerDatabase string ConsistentSnapshot bool SnapshotDegreeOfParallelism int AllowDecimalAsFloat bool NoTracking bool // deprecated: use Tracker YtTracking bool // deprecated: use Tracker YdbTracking bool // deprecated: use Tracker Tracker MysqlTrackerStorage // deprecated: we only have one tracker now PlzNoHomo bool // forcefully disable homo features, mostly for tests RootCAFiles []string ConnectionID string }
func (*MysqlSource) AllIncludes ¶
func (s *MysqlSource) AllIncludes() []string
func (*MysqlSource) FulfilledIncludes ¶
func (s *MysqlSource) FulfilledIncludes(tID abstract.TableID) (result []string)
func (*MysqlSource) GetConnectionID ¶
func (s *MysqlSource) GetConnectionID() string
func (*MysqlSource) GetProviderType ¶
func (s *MysqlSource) GetProviderType() abstract.ProviderType
func (*MysqlSource) HasTLS ¶
func (s *MysqlSource) HasTLS() bool
func (*MysqlSource) InitServerID ¶
func (s *MysqlSource) InitServerID(transferID string)
func (MysqlSource) IsSource ¶
func (MysqlSource) IsSource()
func (MysqlSource) IsStrictSource ¶
func (MysqlSource) IsStrictSource()
func (*MysqlSource) MDBClusterID ¶
func (s *MysqlSource) MDBClusterID() string
func (*MysqlSource) ToStorageParams ¶
func (s *MysqlSource) ToStorageParams() *MysqlStorageParams
func (*MysqlSource) Validate ¶
func (s *MysqlSource) Validate() error
func (*MysqlSource) WithDefaults ¶
func (s *MysqlSource) WithDefaults()
type MysqlStorageParams ¶
type MysqlStorageParams struct { ClusterID string Host string Port int User string Password string Database string TLS bool CertPEMFile string UseFakePrimaryKey bool DegreeOfParallelism int Timezone string TableFilter abstract.Includeable PreSteps *MysqlDumpSteps ConsistentSnapshot bool RootCAFiles []string ConnectionID string }
type MysqlTrackerStorage ¶
type MysqlTrackerStorage string
type NotMasterError ¶
type NotMasterError struct {
// contains filtered or unexported fields
}
func (NotMasterError) Error ¶
func (e NotMasterError) Error() string
func (NotMasterError) Is ¶
func (e NotMasterError) Is(err error) bool
type Provider ¶
type Provider struct {
// contains filtered or unexported fields
}
func (*Provider) Activate ¶
func (p *Provider) Activate(ctx context.Context, task *server.TransferOperation, tables abstract.TableMap, callbacks providers.ActivateCallbacks) error
func (*Provider) Deactivate ¶
func (*Provider) DestinationSampleableStorage ¶
func (p *Provider) DestinationSampleableStorage() (abstract.SampleableStorage, error)
func (*Provider) SourceSampleableStorage ¶
func (p *Provider) SourceSampleableStorage() (abstract.SampleableStorage, []abstract.TableDescription, error)
func (*Provider) Type ¶
func (p *Provider) Type() abstract.ProviderType
type RoutineRow ¶
type RowsEvent ¶
type RowsEvent struct { Table *schema.Table Action string Header *replication.EventHeader // binlog has three update event version, v0, v1 and v2. // for v1 and v2, the rows number must be even. // Two rows for one event, format is [before update row, after update row] // for update v0, only one row for a event, and we don't support this version. Data *replication.RowsEvent // rows query is defined if option binlog_rows_query_log_events is enabled Query string }
RowsEvent is the event for row replication.
func (*RowsEvent) GetColumnCollation ¶
func (*RowsEvent) GetColumnEnumValue ¶
func (*RowsEvent) GetColumnName ¶
func (*RowsEvent) GetColumnRawType ¶
func (*RowsEvent) GetColumnSetValue ¶
func (*RowsEvent) IsAllColumnsPresent1 ¶
func (*RowsEvent) IsAllColumnsPresent2 ¶
func (*RowsEvent) IsColumnPresent1 ¶
func (*RowsEvent) IsColumnPresent2 ¶
type StatementID ¶
type Storage ¶
type Storage struct { ConnectionParams *ConnectionParams IsHomo bool DB *sql.DB // contains filtered or unexported fields }
func NewStorage ¶
func NewStorage(config *MysqlStorageParams) (*Storage, error)
func (*Storage) DatabaseSchema ¶
func (*Storage) EstimateTableRowsCount ¶
func (*Storage) ExactTableRowsCount ¶
func (*Storage) LoadRandomSample ¶
func (*Storage) LoadSampleBySet ¶
func (*Storage) LoadTopBottomSample ¶
func (*Storage) TableAccessible ¶
func (s *Storage) TableAccessible(table abstract.TableDescription) bool
func (*Storage) TableSchema ¶
type TableProgress ¶
type TableProgress struct {
// contains filtered or unexported fields
}
func (*TableProgress) BuildLSNQuery ¶
func (t *TableProgress) BuildLSNQuery(name string, lsn uint64, status Status) string
func (*TableProgress) GetCurrentState ¶
func (t *TableProgress) GetCurrentState() (map[string]TableStatus, error)
type TableStatus ¶
type TemplateCol ¶
type TemplateCol struct {
Name, Typ, Comma string
}
type TemplateModel ¶
type TemplateModel struct { Cols []TemplateCol Keys []TemplateCol Table string }
type Tracker ¶
type Tracker struct {
// contains filtered or unexported fields
}
func NewTracker ¶
func NewTracker(src *MysqlSource, transferID string, cp coordinator.Coordinator) (result *Tracker, err error)
func (*Tracker) RemoveGtidset ¶
type TriggerRow ¶
Source Files ¶
- canal.go
- cast.go
- cast_replication.go
- config.go
- connection.go
- error.go
- expr.go
- handler.go
- master.go
- model_destination.go
- model_source.go
- model_storage_params.go
- mysql_connection_params.go
- provider.go
- queries_builder.go
- rows.go
- sampleable_storage.go
- schema.go
- schema_copy.go
- schematized_rows.go
- sink.go
- source.go
- storage.go
- sync.go
- sync_binlog_position.go
- table_progress.go
- tasks.go
- tracker.go
- typesystem.go
- utils.go
Click to show internal directories.
Click to hide internal directories.