Documentation ¶
Index ¶
- Constants
- Variables
- func Compress(bs []byte) (outBs []byte, err error)
- func Decode(data []byte, out GencodeType) (err error)
- func DtleParseMysqlGTIDSet(gtidSetStr string) (*mysql.MysqlGTIDSet, error)
- func Encode(v GencodeType) ([]byte, error)
- func EncodeTable(v *Table) ([]byte, error)
- func GetFieldValue(fieldName string) interface{}
- func GetGtidFromConsul(sm *StoreManager, subject string, logger g.LoggerType, ...) error
- func IgnoreDbByReplicateIgnoreDb(replicateIgnoreDb []*DataSource, dbName string) bool
- func IgnoreTbByReplicateIgnoreDb(replicateIgnoreDb []*DataSource, dbName, tbName string) bool
- func MysqlVersionInDigit(v string) (int, error)
- func RegularlyUpdateJobStatus(store *StoreManager, shutdownCh chan struct{}, jobId string)
- func RowColumnIsNull(row []interface{}, index int) bool
- func RowGetBytesColumn(row []interface{}, index int) []byte
- func SetField(fieldName string, fieldValue interface{})
- func TaskTypeFromString(s string) string
- func ToEventDML(eventType replication.EventType) int8
- func UpdateGtidSet(gtidSet *mysql.MysqlGTIDSet, sid uuid.UUID, txGno int64)
- func ValidateJobName(name string) error
- func WriteWaitCh(ch chan<- *drivers.ExitResult, r *drivers.ExitResult)
- type ApplierTableItem
- type BigTxAck
- type BufferStat
- type ColumnList
- func (c *ColumnList) ColumnList() []mysqlconfig.Column
- func (c *ColumnList) GetCharset(columnName string) string
- func (c *ColumnList) GetColumn(columnName string) *mysqlconfig.Column
- func (c *ColumnList) GetColumnType(columnName string) mysqlconfig.ColumnType
- func (c *ColumnList) HasTimezoneConversion(columnName string) bool
- func (c *ColumnList) IsSubsetOf(other *ColumnList) bool
- func (c *ColumnList) IsUnsigned(columnName string) bool
- func (c *ColumnList) Len() int
- func (c *ColumnList) Names() []string
- func (c *ColumnList) SetCharset(columnName string, charset string)
- func (c *ColumnList) SetColumnType(columnName string, columnType mysqlconfig.ColumnType)
- func (c *ColumnList) SetConvertDatetimeToTimestamp(columnName string, toTimezone string)
- func (c *ColumnList) SetUnsigned(columnName string)
- func (c *ColumnList) String() string
- type ControlMsg
- type CoordinatesI
- type CurrentCoordinates
- type DataEntries
- type DataEntry
- func (b *DataEntry) HasDDL() bool
- func (b *DataEntry) IsOneStmtDDL() bool
- func (b *DataEntry) IsPartOfBigTx() bool
- func (d *DataEntry) Marshal(buf []byte) ([]byte, error)
- func (d *DataEntry) Size() (s uint64)
- func (b *DataEntry) String() string
- func (d *DataEntry) Unmarshal(buf []byte) (uint64, error)
- type DataEvent
- func NewDataEvent(databaseName, tableName string, dml int8, columnCount uint64, timestamp uint32) *DataEvent
- func NewQueryEvent(currentSchema, query string, dml int8, timestamp uint32, flags []byte) DataEvent
- func NewQueryEventAffectTable(currentSchema, query string, dml int8, affectedTable SchemaTable, ...) DataEvent
- type DataSource
- type DelayCount
- type DtleTaskConfig
- type DumpCoordinates
- type DumpEntry
- type DumpStatResult
- type Dumper
- type EntryContext
- type ExecContext
- type GencodeType
- type GetChunkDataFn
- type JobListItemV2
- type JobStep
- type KafkaConfig
- type MemoryStat
- type MySQLCoordinateTx
- func (b *MySQLCoordinateTx) GetGNO() int64
- func (b *MySQLCoordinateTx) GetGtidForThisTx() string
- func (b *MySQLCoordinateTx) GetLastCommit() int64
- func (b *MySQLCoordinateTx) GetLogFile() string
- func (b *MySQLCoordinateTx) GetLogPos() int64
- func (b *MySQLCoordinateTx) GetSequenceNumber() int64
- func (b *MySQLCoordinateTx) GetSid() interface{}
- func (b *MySQLCoordinateTx) GetSidStr() string
- func (d *MySQLCoordinateTx) Marshal(buf []byte) ([]byte, error)
- func (d *MySQLCoordinateTx) Size() (s uint64)
- func (d *MySQLCoordinateTx) Unmarshal(buf []byte) (uint64, error)
- type MySQLCoordinates
- func (b *MySQLCoordinates) CompareFilePos(other *MySQLCoordinates) int
- func (b *MySQLCoordinates) GetLogFile() string
- func (b *MySQLCoordinates) GetLogPos() int64
- func (b *MySQLCoordinates) GetTxSet() string
- func (b *MySQLCoordinates) IsEmpty() bool
- func (d *MySQLCoordinates) Marshal(buf []byte) ([]byte, error)
- func (d *MySQLCoordinates) Size() (s uint64)
- func (b MySQLCoordinates) String() string
- func (d *MySQLCoordinates) Unmarshal(buf []byte) (uint64, error)
- type MySQLDriverConfig
- type NatsMsgMerger
- type OracleCoordinateTx
- func (o *OracleCoordinateTx) GetGNO() int64
- func (o *OracleCoordinateTx) GetGtidForThisTx() string
- func (o *OracleCoordinateTx) GetLastCommit() int64
- func (o *OracleCoordinateTx) GetLogFile() string
- func (o *OracleCoordinateTx) GetLogPos() int64
- func (b *OracleCoordinateTx) GetSequenceNumber() int64
- func (o *OracleCoordinateTx) GetSid() interface{}
- func (o *OracleCoordinateTx) GetSidStr() string
- func (d *OracleCoordinateTx) Marshal(buf []byte) ([]byte, error)
- func (d *OracleCoordinateTx) Size() (s uint64)
- func (d *OracleCoordinateTx) Unmarshal(buf []byte) (uint64, error)
- type OracleCoordinates
- func (b *OracleCoordinates) GetLogFile() string
- func (b *OracleCoordinates) GetLogPos() int64
- func (b *OracleCoordinates) GetTxSet() string
- func (d *OracleCoordinates) Marshal(buf []byte) ([]byte, error)
- func (d *OracleCoordinates) Size() (s uint64)
- func (d *OracleCoordinates) Unmarshal(buf []byte) (uint64, error)
- type PrepareFn
- type QueryCount
- type QueryEventFlags
- type Role
- type SchemaContext
- type SchemaTable
- type StoreManager
- func (sm *StoreManager) CheckJobExists(jobId string) bool
- func (sm *StoreManager) DeleteRole(tenant, name string) error
- func (sm *StoreManager) DeleteUser(tenant, user string) error
- func (sm *StoreManager) DestroyJob(jobId string) error
- func (sm *StoreManager) DstPutNats(jobName string, natsAddr string, stopCh chan struct{}, ...) error
- func (sm *StoreManager) FindJobList() (map[string]*JobListItemV2, error)
- func (sm *StoreManager) FindRoleList(tenant string) ([]*Role, error)
- func (sm *StoreManager) FindTenantList() (tenants []string, err error)
- func (sm *StoreManager) FindUserList(userKey string) ([]*User, error)
- func (sm *StoreManager) GetBinlogFilePosForJob(jobName string) (*mysql.Position, error)
- func (sm *StoreManager) GetConfig(jobName string) (*MySQLDriverConfig, error)
- func (sm *StoreManager) GetDumpProgress(jobName string) (int64, int64, error)
- func (sm *StoreManager) GetGtidForJob(jobName string) (string, error)
- func (sm *StoreManager) GetJobInfo(jobId string) (*JobListItemV2, error)
- func (sm *StoreManager) GetJobStage(jobName string) (string, error)
- func (sm *StoreManager) GetJobStatus(jobId string) (string, error)
- func (sm *StoreManager) GetNatsIfExist(jobName string) (string, bool, error)
- func (sm *StoreManager) GetOracleSCNPosForJob(jobName string) (oldestUncommittedScn int64, committedSCN int64, err error)
- func (sm *StoreManager) GetRole(tenant, name string) (*Role, bool, error)
- func (sm *StoreManager) GetTargetGtid(subject string) (string, error)
- func (sm *StoreManager) GetUser(tenant, username string) (*User, bool, error)
- func (sm *StoreManager) PutConfig(subject string, config *MySQLDriverConfig) error
- func (sm *StoreManager) PutDumpProgress(jobName string, exec int64, total int64) error
- func (sm *StoreManager) PutJobStage(jobName string, stage string) error
- func (sm *StoreManager) PutTargetGtid(subject string, value string) error
- func (sm *StoreManager) SaveBinlogFilePosForJob(jobName string, file string, pos int) error
- func (sm *StoreManager) SaveGtidForJob(jobName string, gtid string) error
- func (sm *StoreManager) SaveJobInfo(job JobListItemV2) error
- func (sm *StoreManager) SaveOracleSCNPos(jobName string, oldestUncommittedScn, committedSCN int64) error
- func (sm *StoreManager) SaveRole(role *Role) error
- func (sm *StoreManager) SaveUser(user *User) error
- func (sm *StoreManager) SrcWatchNats(jobName string, stopCh chan struct{}, onWatchError func(error)) (natsAddr string, err error)
- func (sm *StoreManager) WaitKv(subject string, key string, stopCh chan struct{}) ([]byte, error)
- func (sm *StoreManager) WaitOnJob(currentJob string, waitJob string, stopCh chan struct{}) error
- func (sm *StoreManager) WatchTargetGtid(subject string, stopCh chan struct{}) (string, error)
- func (sm *StoreManager) WatchTree(dir string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error)
- type Table
- type TableContext
- type TableSpec
- type TableStats
- type TaskStatistics
- type ThroughputStat
- type TxCount
- type UniqueKey
- type User
- type WhereContext
Constants ¶
const ( NotDML int8 = iota InsertDML UpdateDML DeleteDML )
const ( OPTION_AUTO_IS_NULL uint32 = 0x00004000 OPTION_NOT_AUTOCOMMIT uint32 = 0x00080000 OPTION_NO_FOREIGN_KEY_CHECKS uint32 = 0x04000000 OPTION_RELAXED_UNIQUE_CHECKS uint32 = 0x08000000 )
const ( // see mysql-server/libbinlogevents/include/statement_events.h Q_FLAGS2_CODE byte = iota Q_SQL_MODE_CODE Q_CATALOG Q_AUTO_INCREMENT Q_CHARSET_CODE Q_TIME_ZONE_CODE Q_CATALOG_NZ_CODE Q_LC_TIME_NAMES_CODE Q_CHARSET_DATABASE_CODE Q_TABLE_MAP_FOR_UPDATE_CODE Q_MASTER_DATA_WRITTEN_CODE Q_INVOKERS Q_UPDATED_DB_NAMES Q_MICROSECONDS Q_COMMIT_TS Q_COMMIT_TS2 Q_EXPLICIT_DEFAULTS_FOR_TIMESTAMP Q_DDL_LOGGED_WITH_XID Q_DEFAULT_COLLATION_FOR_UTF8MB4 Q_SQL_REQUIRE_PRIMARY_KEY Q_DEFAULT_TABLE_ENCRYPTION )
const ( RowsEventFlagEndOfStatement uint16 = 1 RowsEventFlagNoForeignKeyChecks uint16 = 2 RowsEventFlagNoUniqueKeyChecks uint16 = 4 RowsEventFlagRowHasAColumns uint16 = 8 )
const ( DefaultConnectWaitSecond = 10 DefaultConnectWait = DefaultConnectWaitSecond * time.Second DtleJobStatusNonPaused = "non-paused" DtleJobStatusPaused = "paused" DtleJobStatusUndefined = "undefined" DtleJobStatusReverseInit = "reverse-init" DtleJobStatusStop = "stop" TargetGtidFinished = "finished" )
const ( // TODO: Using configuration to set jwt secret JWTSecret = "secret" DefaultAdminTenant = "platform" DefaultAdminUser = "admin" DefaultAdminPwd = "admin" DefaultEncryptAdminPwd = "" /* 172-byte string literal not displayed */ DefaultRole = "admin" DefaultAdminAuth = "" /* 6003-byte string literal not displayed */ )
const ( ControlMsgError int32 = 1 ControlMsgFinish int32 = 2 )
const ( StageFinishedReadingOneBinlogSwitchingToNextBinlog = "Finished reading one binlog; switching to next binlog" StageMasterHasSentAllBinlogToSlave = "Master has sent all binlog to slave; waiting for more updates" StageRegisteringSlaveOnMaster = "Registering slave on master" StageRequestingBinlogDump = "Requesting binlog dump" StageSearchingRowsForUpdate = "Searching rows for update" StageSendingBinlogEventToSlave = "Sending binlog event to slave" StageSendingData = "Sending data" StageSlaveHasReadAllRelayLog = "Slave has read all relay log; waiting for more updates" StageSlaveWaitingForWorkersToProcessQueue = "Waiting for slave workers to process their queues" StageWaitingForGtidToBeCommitted = "Waiting for GTID to be committed" StageWaitingForMasterToSendEvent = "Waiting for master to send event" )
const ( DefaultChannelBufferSize = 32 DefaultChunkSize = 2000 DefaultNumWorkers = 1 DefaultClusterID = "dtle-nats" DefaultSrcGroupMaxSize = 1 DefaultSrcGroupTimeout = 100 DefaultKafkaMessageGroupMaxSize = 1 DefaultKafkaMessageGroupTimeout = 100 DefaultDependencyHistorySize = 2500 TaskTypeSrc = "src" TaskTypeDest = "dest" TaskTypeUnknown = "unknown" )
const (
DtleFlagCreateSchemaIfNotExists = 0x1
)
const ParserRestoreFlag = format.DefaultRestoreFlags | format.RestoreStringWithoutDefaultCharset
const (
TaskStateDead = 2
)
Variables ¶
var (
ErrNoConsul = fmt.Errorf("consul return nil value. check if consul is started or reachable")
)
Functions ¶
func Decode ¶
func Decode(data []byte, out GencodeType) (err error)
func DtleParseMysqlGTIDSet ¶
func DtleParseMysqlGTIDSet(gtidSetStr string) (*mysql.MysqlGTIDSet, error)
func Encode ¶
func Encode(v GencodeType) ([]byte, error)
func EncodeTable ¶
func GetFieldValue ¶
func GetFieldValue(fieldName string) interface{}
todo Support key value general settings
func GetGtidFromConsul ¶
func GetGtidFromConsul(sm *StoreManager, subject string, logger g.LoggerType, mysqlContext *MySQLDriverConfig) error
func IgnoreDbByReplicateIgnoreDb ¶
func IgnoreDbByReplicateIgnoreDb(replicateIgnoreDb []*DataSource, dbName string) bool
func IgnoreTbByReplicateIgnoreDb ¶
func IgnoreTbByReplicateIgnoreDb(replicateIgnoreDb []*DataSource, dbName, tbName string) bool
func MysqlVersionInDigit ¶
func RegularlyUpdateJobStatus ¶
func RegularlyUpdateJobStatus(store *StoreManager, shutdownCh chan struct{}, jobId string)
regularly update the task status value by the memory usage
func RowColumnIsNull ¶
func RowGetBytesColumn ¶
func TaskTypeFromString ¶
func ToEventDML ¶
func ToEventDML(eventType replication.EventType) int8
func UpdateGtidSet ¶
func UpdateGtidSet(gtidSet *mysql.MysqlGTIDSet, sid uuid.UUID, txGno int64)
func ValidateJobName ¶
func WriteWaitCh ¶
func WriteWaitCh(ch chan<- *drivers.ExitResult, r *drivers.ExitResult)
Types ¶
type ApplierTableItem ¶
type ApplierTableItem struct { Columns *ColumnList PsInsert0 []*sql.Stmt PsInsert1 []*sql.Stmt PsInsert2 []*sql.Stmt PsInsert3 []*sql.Stmt PsDelete []*sql.Stmt PsUpdate []*sql.Stmt ColumnMapTo []string }
func NewApplierTableItem ¶
func NewApplierTableItem(parallelWorkers int) *ApplierTableItem
func (*ApplierTableItem) Reset ¶
func (ait *ApplierTableItem) Reset()
type BufferStat ¶
type ColumnList ¶
type ColumnList struct { Columns []mysqlconfig.Column Ordinals mysqlconfig.ColumnsMap UniqueKeys []*UniqueKey }
ColumnList makes for a named list of columns
func NewColumnList ¶
func NewColumnList(columns []mysqlconfig.Column) *ColumnList
NewColumnList creates an object given ordered list of column names
func ParseColumnList ¶
func ParseColumnList(names string, tableColumns *ColumnList) *ColumnList
ParseColumnList parses a comma delimited list of column names
func (*ColumnList) ColumnList ¶
func (c *ColumnList) ColumnList() []mysqlconfig.Column
func (*ColumnList) GetCharset ¶
func (c *ColumnList) GetCharset(columnName string) string
func (*ColumnList) GetColumn ¶
func (c *ColumnList) GetColumn(columnName string) *mysqlconfig.Column
TODO caller doesn't handle nil.
func (*ColumnList) GetColumnType ¶
func (c *ColumnList) GetColumnType(columnName string) mysqlconfig.ColumnType
func (*ColumnList) HasTimezoneConversion ¶
func (c *ColumnList) HasTimezoneConversion(columnName string) bool
func (*ColumnList) IsSubsetOf ¶
func (c *ColumnList) IsSubsetOf(other *ColumnList) bool
IsSubsetOf returns 'true' when column names of this list are a subset of another list, in arbitrary order (order agnostic)
func (*ColumnList) IsUnsigned ¶
func (c *ColumnList) IsUnsigned(columnName string) bool
func (*ColumnList) Len ¶
func (c *ColumnList) Len() int
func (*ColumnList) Names ¶
func (c *ColumnList) Names() []string
func (*ColumnList) SetCharset ¶
func (c *ColumnList) SetCharset(columnName string, charset string)
func (*ColumnList) SetColumnType ¶
func (c *ColumnList) SetColumnType(columnName string, columnType mysqlconfig.ColumnType)
func (*ColumnList) SetConvertDatetimeToTimestamp ¶
func (c *ColumnList) SetConvertDatetimeToTimestamp(columnName string, toTimezone string)
func (*ColumnList) SetUnsigned ¶
func (c *ColumnList) SetUnsigned(columnName string)
func (*ColumnList) String ¶
func (c *ColumnList) String() string
type ControlMsg ¶
func (*ControlMsg) Size ¶
func (d *ControlMsg) Size() (s uint64)
type CoordinatesI ¶
type CurrentCoordinates ¶
type DataEntries ¶
type DataEntries struct {
Entries []*DataEntry
}
func (*DataEntries) Size ¶
func (d *DataEntries) Size() (s uint64)
type DataEntry ¶
type DataEntry struct { Coordinates CoordinatesI Events []DataEvent Index int32 Final bool }
func NewBinlogEntry ¶
func NewBinlogEntry() *DataEntry
func (*DataEntry) IsOneStmtDDL ¶
func (*DataEntry) IsPartOfBigTx ¶
type DataEvent ¶
type DataEvent struct { Query string CurrentSchema string DatabaseName string TableName string DML int8 ColumnCount uint64 Table []byte LogPos int64 Timestamp uint32 Flags []byte FKParent bool Rows [][]interface{} DtleFlags uint32 }
func NewDataEvent ¶
func NewQueryEvent ¶
type DataSource ¶
type DataSource struct { TableSchema string TableSchemaRegex string TableSchemaRename string Tables []*Table }
TableName is the table configuration slave restrict replication to a given table
func (*DataSource) String ¶
func (d *DataSource) String() string
type DelayCount ¶
type DtleTaskConfig ¶
type DtleTaskConfig struct { //Ref:http://dev.mysql.com/doc/refman/5.7/en/replication-options-slave.html#option_mysqld_replicate-do-table ReplicateDoDb []*DataSource `codec:"ReplicateDoDb"` ReplicateIgnoreDb []*DataSource `codec:"ReplicateIgnoreDb"` DropTableIfExists bool `codec:"DropTableIfExists"` ExpandSyntaxSupport bool `codec:"ExpandSyntaxSupport"` ReplChanBufferSize int64 `codec:"ReplChanBufferSize"` TrafficAgainstLimits int `codec:"TrafficAgainstLimits"` ChunkSize int64 `codec:"ChunkSize"` SqlFilter []string `codec:"SqlFilter"` GroupMaxSize int `codec:"GroupMaxSize"` GroupTimeout int `codec:"GroupTimeout"` Gtid string `codec:"Gtid"` BinlogFile string `codec:"BinlogFile"` BinlogPos int64 `codec:"BinlogPos"` GtidStart string `codec:"GtidStart"` AutoGtid bool `codec:"AutoGtid"` BinlogRelay bool `codec:"BinlogRelay"` WaitOnJob string `codec:"WaitOnJob"` BulkInsert1 int `codec:"BulkInsert1"` BulkInsert2 int `codec:"BulkInsert2"` BulkInsert3 int `codec:"BulkInsert3"` RetryTxLimit int `codec:"RetryTxLimit"` SlaveNetWriteTimeout int `codec:"SlaveNetWriteTimeout"` BigTxSrcQueue int32 `codec:"BigTxSrcQueue"` TwoWaySync bool `codec:"TwoWaySync"` TwoWaySyncGtid string `codec:"TwoWaySyncGtid"` ParallelWorkers int `codec:"ParallelWorkers"` DependencyHistorySize int `codec:"DependencyHistorySize"` UseMySQLDependency bool `codec:"UseMySQLDependency"` ForeignKeyChecks bool `codec:"ForeignKeyChecks"` DumpEntryLimit int `codec:"DumpEntryLimit"` SetGtidNext bool `codec:"SetGtidNext"` SkipCreateDbTable bool `codec:"SkipCreateDbTable"` SkipPrivilegeCheck bool `codec:"SkipPrivilegeCheck"` SkipIncrementalCopy bool `codec:"SkipIncrementalCopy"` SrcConnectionConfig *mysqlconfig.ConnectionConfig `codec:"SrcConnectionConfig"` DestConnectionConfig *mysqlconfig.ConnectionConfig `codec:"DestConnectionConfig"` KafkaConfig *KafkaConfig `codec:"KafkaConfig"` DestType string `codec:"DestType"` // support oracle extractor/applier SrcOracleConfig *config.OracleConfig `codec:"SrcOracleConfig"` }
func (*DtleTaskConfig) SetDefaultForEmpty ¶
func (d *DtleTaskConfig) SetDefaultForEmpty()
type DumpCoordinates ¶
type DumpEntry ¶
type DumpStatResult ¶
type DumpStatResult struct { Coord DumpCoordinates Type int32 TableSpecs []*TableSpec }
func (*DumpStatResult) Size ¶
func (d *DumpStatResult) Size() (s uint64)
type Dumper ¶
type Dumper struct { Ctx context.Context Logger g.LoggerType ChunkSize int64 TableSchema string EscapedTableSchema string TableName string EscapedTableName string Table *Table Iteration int64 Columns string // ResultsChannel should be closed after writing all entries. ResultsChannel chan *DumpEntry // Set Err (if there is) before closing ResultsChannel. Err error ShutdownCh chan struct{} Memory *int64 GetChunkData GetChunkDataFn PrepareForDumping PrepareFn // contains filtered or unexported fields }
type EntryContext ¶
type EntryContext struct { Entry *DataEntry // Only a DML has a tableItem. For a DDL, its tableItem is nil. TableItems []*ApplierTableItem OriginalSize int // size of binlog entry Rows int // for logging }
type ExecContext ¶
type GencodeType ¶
type GetChunkDataFn ¶
type JobListItemV2 ¶
type JobListItemV2 struct { JobId string `json:"job_id"` JobStatus string `json:"job_status"` Topic string `json:"topic"` JobCreateTime string `json:"job_create_time"` SrcDatabaseType string `json:"src_database_type"` DstDatabaseType string `json:"dst_database_type"` SrcAddrList []string `json:"src_addr_list"` DstAddrList []string `json:"dst_addr_list"` User string `json:"user"` JobSteps []JobStep `json:"job_steps"` AllocationStatus map[string]string `json:"allocation_status"` }
type JobStep ¶
type JobStep struct { StepName string `json:"step_name"` StepStatus string `json:"step_status"` StepSchedule float64 `json:"step_schedule"` JobCreateTime string `json:"job_create_time"` }
func NewJobStep ¶
type KafkaConfig ¶
type MemoryStat ¶
type MySQLCoordinateTx ¶
type MySQLCoordinateTx struct { LogFile string LogPos int64 SID [16]byte GNO int64 LastCommitted int64 SeqenceNumber int64 }
func (*MySQLCoordinateTx) GetGNO ¶
func (b *MySQLCoordinateTx) GetGNO() int64
func (*MySQLCoordinateTx) GetGtidForThisTx ¶
func (b *MySQLCoordinateTx) GetGtidForThisTx() string
func (*MySQLCoordinateTx) GetLastCommit ¶
func (b *MySQLCoordinateTx) GetLastCommit() int64
func (*MySQLCoordinateTx) GetLogFile ¶
func (b *MySQLCoordinateTx) GetLogFile() string
func (*MySQLCoordinateTx) GetLogPos ¶
func (b *MySQLCoordinateTx) GetLogPos() int64
func (*MySQLCoordinateTx) GetSequenceNumber ¶
func (b *MySQLCoordinateTx) GetSequenceNumber() int64
func (*MySQLCoordinateTx) GetSid ¶
func (b *MySQLCoordinateTx) GetSid() interface{}
func (*MySQLCoordinateTx) GetSidStr ¶
func (b *MySQLCoordinateTx) GetSidStr() string
Do not call this frequently. Cache your result.
func (*MySQLCoordinateTx) Size ¶
func (d *MySQLCoordinateTx) Size() (s uint64)
type MySQLCoordinates ¶
func (*MySQLCoordinates) CompareFilePos ¶
func (b *MySQLCoordinates) CompareFilePos(other *MySQLCoordinates) int
func (*MySQLCoordinates) GetLogFile ¶
func (b *MySQLCoordinates) GetLogFile() string
func (*MySQLCoordinates) GetLogPos ¶
func (b *MySQLCoordinates) GetLogPos() int64
func (*MySQLCoordinates) GetTxSet ¶
func (b *MySQLCoordinates) GetTxSet() string
func (*MySQLCoordinates) IsEmpty ¶
func (b *MySQLCoordinates) IsEmpty() bool
IsEmpty returns true if the log file is empty, unnamed
func (*MySQLCoordinates) Size ¶
func (d *MySQLCoordinates) Size() (s uint64)
func (MySQLCoordinates) String ¶
func (b MySQLCoordinates) String() string
String returns a user-friendly string representation of these coordinates
type MySQLDriverConfig ¶
type MySQLDriverConfig struct { DtleTaskConfig RowsEstimate int64 DeltaEstimate int64 BinlogRowImage string RowCopyStartTime time.Time RowCopyEndTime time.Time Stage string }
func (*MySQLDriverConfig) ElapsedRowCopyTime ¶
func (m *MySQLDriverConfig) ElapsedRowCopyTime() time.Duration
ElapsedRowCopyTime returns time since starting to copy chunks of rows
func (*MySQLDriverConfig) MarkRowCopyEndTime ¶
func (m *MySQLDriverConfig) MarkRowCopyEndTime()
ElapsedRowCopyTime returns time since starting to copy chunks of rows
func (*MySQLDriverConfig) MarkRowCopyStartTime ¶
func (m *MySQLDriverConfig) MarkRowCopyStartTime()
MarkRowCopyStartTime
type NatsMsgMerger ¶
type NatsMsgMerger struct {
// contains filtered or unexported fields
}
func NewNatsMsgMerger ¶
func NewNatsMsgMerger(logger g.LoggerType) *NatsMsgMerger
func (*NatsMsgMerger) GetBytes ¶
func (nmm *NatsMsgMerger) GetBytes() []byte
func (*NatsMsgMerger) Handle ¶
func (nmm *NatsMsgMerger) Handle(data []byte) (segmentFinished bool, err error)
func (*NatsMsgMerger) Reset ¶
func (nmm *NatsMsgMerger) Reset()
type OracleCoordinateTx ¶
func (*OracleCoordinateTx) GetGNO ¶
func (o *OracleCoordinateTx) GetGNO() int64
func (*OracleCoordinateTx) GetGtidForThisTx ¶
func (o *OracleCoordinateTx) GetGtidForThisTx() string
func (*OracleCoordinateTx) GetLastCommit ¶
func (o *OracleCoordinateTx) GetLastCommit() int64
func (*OracleCoordinateTx) GetLogFile ¶
func (o *OracleCoordinateTx) GetLogFile() string
func (*OracleCoordinateTx) GetLogPos ¶
func (o *OracleCoordinateTx) GetLogPos() int64
func (*OracleCoordinateTx) GetSequenceNumber ¶
func (b *OracleCoordinateTx) GetSequenceNumber() int64
func (*OracleCoordinateTx) GetSid ¶
func (o *OracleCoordinateTx) GetSid() interface{}
func (*OracleCoordinateTx) GetSidStr ¶
func (o *OracleCoordinateTx) GetSidStr() string
func (*OracleCoordinateTx) Marshal ¶
func (d *OracleCoordinateTx) Marshal(buf []byte) ([]byte, error)
func (*OracleCoordinateTx) Size ¶
func (d *OracleCoordinateTx) Size() (s uint64)
type OracleCoordinates ¶
type OracleCoordinates struct {
LaststSCN int64
}
func (*OracleCoordinates) GetLogFile ¶
func (b *OracleCoordinates) GetLogFile() string
func (*OracleCoordinates) GetLogPos ¶
func (b *OracleCoordinates) GetLogPos() int64
func (*OracleCoordinates) GetTxSet ¶
func (b *OracleCoordinates) GetTxSet() string
func (*OracleCoordinates) Size ¶
func (d *OracleCoordinates) Size() (s uint64)
type QueryCount ¶
type QueryEventFlags ¶
type QueryEventFlags struct { NoForeignKeyChecks bool // The query is converted to utf8 in dtle-src. Ignore charset/collation flags on dtle-dest. CharacterSetClient string CollationConnection string CollationServer string }
func ParseQueryEventFlags ¶
func ParseQueryEventFlags(bs []byte, logger g.LoggerType) (r QueryEventFlags, err error)
type Role ¶
type Role struct { Tenant string `json:"tenant"` Name string `json:"name"` ObjectUsers []string `json:"object_users"` ObjectType string `json:"object_type"` Authority string `json:"authority"` }
func NewDefaultRole ¶
type SchemaContext ¶
type SchemaContext struct { TableSchema string TableSchemaRename string CreateSchemaString string TableMap map[string]*TableContext }
func NewSchemaContext ¶
func NewSchemaContext(name string) *SchemaContext
func (*SchemaContext) AddTable ¶
func (sc *SchemaContext) AddTable(table *Table) (err error)
func (*SchemaContext) AddTables ¶
func (sc *SchemaContext) AddTables(tables []*Table) (err error)
type SchemaTable ¶
type StoreManager ¶
type StoreManager struct {
// contains filtered or unexported fields
}
func NewStoreManager ¶
func NewStoreManager(consulAddr []string, logger g.LoggerType) (*StoreManager, error)
func (*StoreManager) CheckJobExists ¶
func (sm *StoreManager) CheckJobExists(jobId string) bool
func (*StoreManager) DeleteRole ¶
func (sm *StoreManager) DeleteRole(tenant, name string) error
func (*StoreManager) DeleteUser ¶
func (sm *StoreManager) DeleteUser(tenant, user string) error
func (*StoreManager) DestroyJob ¶
func (sm *StoreManager) DestroyJob(jobId string) error
func (*StoreManager) DstPutNats ¶
func (sm *StoreManager) DstPutNats(jobName string, natsAddr string, stopCh chan struct{}, onWatchError func(error)) error
func (*StoreManager) FindJobList ¶
func (sm *StoreManager) FindJobList() (map[string]*JobListItemV2, error)
func (*StoreManager) FindRoleList ¶
func (sm *StoreManager) FindRoleList(tenant string) ([]*Role, error)
func (*StoreManager) FindTenantList ¶
func (sm *StoreManager) FindTenantList() (tenants []string, err error)
func (*StoreManager) FindUserList ¶
func (sm *StoreManager) FindUserList(userKey string) ([]*User, error)
func (*StoreManager) GetBinlogFilePosForJob ¶
func (sm *StoreManager) GetBinlogFilePosForJob(jobName string) (*mysql.Position, error)
func (*StoreManager) GetConfig ¶
func (sm *StoreManager) GetConfig(jobName string) (*MySQLDriverConfig, error)
func (*StoreManager) GetDumpProgress ¶
func (sm *StoreManager) GetDumpProgress(jobName string) (int64, int64, error)
return: ExecRowCount, TotalRowCount
func (*StoreManager) GetGtidForJob ¶
func (sm *StoreManager) GetGtidForJob(jobName string) (string, error)
func (*StoreManager) GetJobInfo ¶
func (sm *StoreManager) GetJobInfo(jobId string) (*JobListItemV2, error)
func (*StoreManager) GetJobStage ¶
func (sm *StoreManager) GetJobStage(jobName string) (string, error)
func (*StoreManager) GetJobStatus ¶
func (sm *StoreManager) GetJobStatus(jobId string) (string, error)
func (*StoreManager) GetNatsIfExist ¶
func (sm *StoreManager) GetNatsIfExist(jobName string) (string, bool, error)
func (*StoreManager) GetOracleSCNPosForJob ¶
func (sm *StoreManager) GetOracleSCNPosForJob(jobName string) (oldestUncommittedScn int64, committedSCN int64, err error)
func (*StoreManager) GetRole ¶
func (sm *StoreManager) GetRole(tenant, name string) (*Role, bool, error)
func (*StoreManager) GetTargetGtid ¶
func (sm *StoreManager) GetTargetGtid(subject string) (string, error)
func (*StoreManager) GetUser ¶
func (sm *StoreManager) GetUser(tenant, username string) (*User, bool, error)
func (*StoreManager) PutConfig ¶
func (sm *StoreManager) PutConfig(subject string, config *MySQLDriverConfig) error
func (*StoreManager) PutDumpProgress ¶
func (sm *StoreManager) PutDumpProgress(jobName string, exec int64, total int64) error
func (*StoreManager) PutJobStage ¶
func (sm *StoreManager) PutJobStage(jobName string, stage string) error
func (*StoreManager) PutTargetGtid ¶
func (sm *StoreManager) PutTargetGtid(subject string, value string) error
func (*StoreManager) SaveBinlogFilePosForJob ¶
func (sm *StoreManager) SaveBinlogFilePosForJob(jobName string, file string, pos int) error
func (*StoreManager) SaveGtidForJob ¶
func (sm *StoreManager) SaveGtidForJob(jobName string, gtid string) error
func (*StoreManager) SaveJobInfo ¶
func (sm *StoreManager) SaveJobInfo(job JobListItemV2) error
func (*StoreManager) SaveOracleSCNPos ¶
func (sm *StoreManager) SaveOracleSCNPos(jobName string, oldestUncommittedScn, committedSCN int64) error
func (*StoreManager) SaveRole ¶
func (sm *StoreManager) SaveRole(role *Role) error
func (*StoreManager) SaveUser ¶
func (sm *StoreManager) SaveUser(user *User) error
func (*StoreManager) SrcWatchNats ¶
func (sm *StoreManager) SrcWatchNats(jobName string, stopCh chan struct{}, onWatchError func(error)) (natsAddr string, err error)
func (*StoreManager) WaitKv ¶
func (sm *StoreManager) WaitKv(subject string, key string, stopCh chan struct{}) ([]byte, error)
func (*StoreManager) WaitOnJob ¶
func (sm *StoreManager) WaitOnJob(currentJob string, waitJob string, stopCh chan struct{}) error
func (*StoreManager) WatchTargetGtid ¶
func (sm *StoreManager) WatchTargetGtid(subject string, stopCh chan struct{}) (string, error)
type Table ¶
type Table struct { TableName string TableRegex string TableRename string TableSchema string // not user assigned TableSchemaRename string // not user assigned Counter int64 ColumnMapFrom []string ColumnMapTo []string OriginalTableColumns *ColumnList UseUniqueKey *UniqueKey ColumnMap []int TableType string Where string // Call GetWhere() instead of directly accessing. }
func DecodeMaybeTable ¶
type TableContext ¶
type TableContext struct { Table *Table WhereCtx *WhereContext DefChangedSent bool FKChildren map[SchemaTable]struct{} }
func NewTableContext ¶
func NewTableContext(table *Table) (*TableContext, error)
func (*TableContext) WhereTrue ¶
func (t *TableContext) WhereTrue(row []interface{}) (bool, error)
type TableStats ¶
type TaskStatistics ¶
type TaskStatistics struct { CurrentCoordinates *CurrentCoordinates TableStats *TableStats DelayCount *DelayCount ProgressPct string ExecMasterRowCount int64 ExecMasterTxCount int64 ReadMasterRowCount int64 ReadMasterTxCount int64 ETA string Backlog string ThroughputStat *ThroughputStat MsgStat gonats.Statistics BufferStat BufferStat Stage string Timestamp int64 MemoryStat MemoryStat HandledTxCount TxCount HandledQueryCount QueryCount }
type ThroughputStat ¶
type UniqueKey ¶
type UniqueKey struct { Name string Columns ColumnList HasNullable bool IsAutoIncrement bool LastMaxVals []string }
UniqueKey is the combination of a key's name and columns
type WhereContext ¶
type WhereContext struct { Where string Ast expr.Node FieldsMap map[string]int IsDefault bool // is 'true' }
func NewWhereCtx ¶
func NewWhereCtx(where string, table *Table) (*WhereContext, error)