Documentation ¶
Index ¶
- Constants
- Variables
- func CheckBinHeaderCondition(cfg *ConfCmd, header *replication.EventHeader, currentBinlog string) int
- func CheckElementOfSliceInt(arr []int, element int, prefix string, ifExt bool) bool
- func CheckElementOfSliceStr(arr []string, element string, prefix string, ifExt bool) bool
- func CheckIsDir(fd string) (bool, string)
- func CommaSeparatedListToArray(str string) []string
- func CompareBinlogPos(sBinFile string, sPos uint, eBinFile string, ePos uint) int
- func CompareEquelByteSlice(s1 []byte, s2 []byte) bool
- func ConvertRowToExpressRow(row []interface{}, ifIgnorePrimary bool, primaryIdx []int) []SQL.Expression
- func ConvertStrArrToIntferfaceArrForPrint(arr []string) []interface{}
- func CreateMysqlCon(mysqlUrl string) (*sql.DB, error)
- func GenDeleteSqlsForOneRowsEvent(posStr string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, ...) []string
- func GenDeleteSqlsForOneRowsEventRollbackInsert(posStr string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, ...) []string
- func GenEqualConditions(row []interface{}, colDefs []SQL.NonAliasColumn, uniKey []int, ...) []SQL.BoolExpression
- func GenForwardRollbackSqlFromBinEvent(i uint, cfg *ConfCmd, wg *sync.WaitGroup)
- func GenInsertSqlForRows(rows [][]interface{}, insertSql SQL.InsertStatement, schema string, ...) (string, error)
- func GenInsertSqlsForOneRowsEvent(posStr string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, ...) []string
- func GenInsertSqlsForOneRowsEventRollbackDelete(posStr string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, ...) []string
- func GenUpdateSetPart(colsTypeNameFromMysql []string, colTypeNames []string, ...) SQL.UpdateStatement
- func GenUpdateSqlsForOneRowsEvent(posStr string, colsTypeNameFromMysql []string, colsTypeName []string, ...) []string
- func GetAbsTableName(schema, table string) string
- func GetBigLongTrxContentLine(blTrx BigLongTrxInfo) string
- func GetBigLongTrxPrintHeaderLine(headers []string) string
- func GetBigLongTrxStatementsStr(st map[string]map[string]uint32) string
- func GetBinlogBasenameAndIndex(binlog string) (string, int)
- func GetBinlogPosAsKey(binlog string, spos, epos uint32) string
- func GetColDefIgnorePrimary(colDefs []SQL.NonAliasColumn, primaryIdx []int) []SQL.NonAliasColumn
- func GetColIndexFromKey(ki KeyInfo, columns []FieldInfo) []int
- func GetDatetimeStr(sec int64, nsec int64, timeFmt string) string
- func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, string, string, string, uint32)
- func GetDbTbFromAbsTbName(name string) (string, string)
- func GetDroppedFieldName(idx int) string
- func GetFiledType(filed string) string
- func GetFirstBinlogPosToParse(cfg *ConfCmd) (string, int64)
- func GetForwardRollbackContentLineWithExtra(sq ForwardRollbackSqlOfPrint, ifExtra bool) string
- func GetForwardRollbackSqlFileName(schema string, table string, filePerTable bool, outDir string, ifRollback bool, ...) string
- func GetLineHeaderStrFromColumnNamesArr(arr []string, sep string) string
- func GetMaxValue(nums ...int) int
- func GetMinValue(nums ...int) int
- func GetMysqlDataTypeNameAndSqlColumn(tpDef string, colName string, tp byte, meta uint16) (string, SQL.NonAliasColumn)
- func GetMysqlUrl(cfg *ConfCmd) string
- func GetNextBinlog(baseName string, indx int) string
- func GetPosStr(name string, spos uint32, epos uint32) string
- func GetSqlFieldsEXpressions(colCnt int, colNames []FieldInfo, tbMap *replication.TableMapEvent) ([]SQL.NonAliasColumn, []string)
- func GetStatsPrintContentLine(st *BinEventStatsPrint) string
- func GetStatsPrintHeaderLine(headers []string) string
- func GetSystemHomeNameAndAdderss() (hostname string, address string)
- func IntSliceToString(iArr []int, sep string, prefix string) string
- func IsUnsigned(filed string) bool
- func NewReplBinlogStreamer(cfg *ConfCmd) *replication.BinlogStreamer
- func ParserAllBinEventsFromRepl(cfg *ConfCmd)
- func PrintExtraInfoForForwardRollbackupSql(cfg *ConfCmd, wg *sync.WaitGroup)
- func ProcessBinEventStats(cfg *ConfCmd, wg *sync.WaitGroup)
- func ReverseFileGo(threadIdx int, rollbackFileChan chan map[string]string, ...)
- func ReverseFileToNewFileOneByOneLineAndKeepTrxBatchRead(srcFile string, destFile string, trxPoses [][]int, keepTrx bool) error
- func SendBinlogEventRepl(cfg *ConfCmd)
- func StrSliceToString(sArr []string, sep, prefix string) string
- type BigLongTrxInfo
- type BinEventHandlingIndx
- type BinEventStats
- type BinEventStatsPrint
- type BinFileParser
- type ConfCmd
- func (this *ConfCmd) CheckCmdOptions()
- func (this *ConfCmd) CheckRequiredOption(v interface{}, prefix string, ifExt bool) bool
- func (this *ConfCmd) CheckValueInRange(opt string, val int, prefix string, ifExt bool) bool
- func (this *ConfCmd) CloseChan()
- func (this *ConfCmd) CloseFH()
- func (this *ConfCmd) CreateDB()
- func (this *ConfCmd) GetDefaultAndRangeValueMsg(opt string) string
- func (this *ConfCmd) GetDefaultValueOfRange(opt string) int
- func (this *ConfCmd) GetMaxValueOfRange(opt string) int
- func (this *ConfCmd) GetMinValueOfRange(opt string) int
- func (this *ConfCmd) IsTargetDml(dml string) bool
- func (this *ConfCmd) OpenStatsResultFiles()
- func (this *ConfCmd) OpenTxResultFiles()
- func (this *ConfCmd) ParseCmdOptions()
- func (this *ConfCmd) PrintUsageMsg()
- type DdlPosInfo
- type ExtraSqlInfoOfPrint
- type FieldInfo
- type ForwardRollbackSqlOfPrint
- type KeyInfo
- type MyBinEvent
- type OrgSqlPrint
- type TablesColumnsInfo
- func (this *TablesColumnsInfo) GetTableColumns(db *sql.DB, dbname string, tbname string) error
- func (this *TablesColumnsInfo) GetTableInfoJson(schema string, table string) (*TblInfoJson, error)
- func (this *TablesColumnsInfo) GetTableKeysInfo(db *sql.DB, dbName string, tbName string) error
- func (this *TablesColumnsInfo) GetTbDefFromDb(cfg *ConfCmd, dbname string, tbname string)
- type TblInfoJson
Constants ¶
View Source
const ( C_Version = "my2sql V2.0" C_validOptMsg = "valid options are: " C_joinSepComma = "," EventTimeout = 5 * time.Second C_unknownColPrefix = "dropped_column_" C_unknownColType = "unknown_type" C_unknownColTypeCode = mysql.MYSQL_TYPE_NULL C_trxBegin = 0 C_trxCommit = 1 C_trxRollback = 2 C_trxProcess = -1 C_reProcess = 0 C_reContinue = 1 C_reBreak = 2 C_reFileEnd = 3 )
View Source
const ( //PRIMARY_KEY_LABLE = "primary" //UNIQUE_KEY_LABLE = "unique" KEY_BINLOG_POS_SEP = "/" KEY_DB_TABLE_SEP = "." KEY_NONE_BINLOG = "_" )
Variables ¶
View Source
var ( GConfCmd *ConfCmd = &ConfCmd{} GBinlogTimeLocation *time.Location GUseDatabase string = "" GOptsValidMode []string = []string{"repl", "file"} GOptsValidWorkType []string = []string{"2sql", "rollback", "stats"} GOptsValidMysqlType []string = []string{"mysql", "mariadb"} GOptsValidFilterSql []string = []string{"insert", "update", "delete"} GOptsValueRange map[string][]int = map[string][]int{ "PrintInterval": []int{1, 600, 30}, "BigTrxRowLimit": []int{1, 30000, 10}, "LongTrxSeconds": []int{0, 3600, 1}, "InsertRows": []int{1, 500, 30}, "Threads": []int{1, 16, 2}, } GStatsColumns []string = []string{ "StartTime", "StopTime", "Binlog", "PosRange", "Database", "Table", "BigTrxs", "BiggestTrx", "LongTrxs", "LongestTrx", "Inserts", "Updates", "Deletes", "Trxs", "Statements", "Renames", "RenamePoses", "Ddls", "DdlPoses", } GDdlPrintHeader []string = []string{"datetime", "binlog", "startposition", "stopposition", "sql"} )
View Source
var ( //gDdlRegexp *regexp.Regexp = regexp.MustCompile(C_ddlRegexp) Stats_Result_Header_Column_names []string = []string{"binlog", "starttime", "stoptime", "startpos", "stoppos", "inserts", "updates", "deletes", "database", "table"} Stats_DDL_Header_Column_names []string = []string{"datetime", "binlog", "startpos", "stoppos", "sql"} Stats_BigLongTrx_Header_Column_names []string = []string{"binlog", "starttime", "stoptime", "startpos", "stoppos", "rows", "duration", "tables"} )
View Source
var G_Bytes_Column_Types []string = []string{"blob", "json", "geometry", C_unknownColType}
Functions ¶
func CheckBinHeaderCondition ¶
func CheckBinHeaderCondition(cfg *ConfCmd, header *replication.EventHeader, currentBinlog string) int
func CheckElementOfSliceInt ¶
func CheckElementOfSliceStr ¶
func CheckIsDir ¶
func CompareBinlogPos ¶
func CompareEquelByteSlice ¶
func ConvertRowToExpressRow ¶
func ConvertRowToExpressRow(row []interface{}, ifIgnorePrimary bool, primaryIdx []int) []SQL.Expression
func ConvertStrArrToIntferfaceArrForPrint ¶
func ConvertStrArrToIntferfaceArrForPrint(arr []string) []interface{}
func GenDeleteSqlsForOneRowsEvent ¶
func GenDeleteSqlsForOneRowsEvent(posStr string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, uniKey []int, ifFullImage bool, ifRollback bool, ifprefixDb bool) []string
func GenDeleteSqlsForOneRowsEventRollbackInsert ¶
func GenDeleteSqlsForOneRowsEventRollbackInsert(posStr string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, uniKey []int, ifFullImage bool, ifprefixDb bool) []string
func GenEqualConditions ¶
func GenEqualConditions(row []interface{}, colDefs []SQL.NonAliasColumn, uniKey []int, ifFullImage bool) []SQL.BoolExpression
func GenInsertSqlForRows ¶
func GenInsertSqlsForOneRowsEventRollbackDelete ¶
func GenInsertSqlsForOneRowsEventRollbackDelete(posStr string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, rowsPerSql int, ifprefixDb bool) []string
func GenUpdateSetPart ¶
func GenUpdateSetPart(colsTypeNameFromMysql []string, colTypeNames []string, updateSql SQL.UpdateStatement, colDefs []SQL.NonAliasColumn, rowAfter []interface{}, rowBefore []interface{}, ifFullImage bool) SQL.UpdateStatement
func GetAbsTableName ¶
func GetBigLongTrxContentLine ¶
func GetBigLongTrxContentLine(blTrx BigLongTrxInfo) string
func GetBinlogPosAsKey ¶
func GetColDefIgnorePrimary ¶
func GetColDefIgnorePrimary(colDefs []SQL.NonAliasColumn, primaryIdx []int) []SQL.NonAliasColumn
func GetColIndexFromKey ¶
func GetDbTbAndQueryAndRowCntFromBinevent ¶
func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, string, string, string, uint32)
func GetDbTbFromAbsTbName ¶
func GetDroppedFieldName ¶
func GetFiledType ¶
func GetForwardRollbackContentLineWithExtra ¶
func GetForwardRollbackContentLineWithExtra(sq ForwardRollbackSqlOfPrint, ifExtra bool) string
func GetMaxValue ¶
func GetMinValue ¶
func GetMysqlUrl ¶
func GetNextBinlog ¶
func GetSqlFieldsEXpressions ¶
func GetSqlFieldsEXpressions(colCnt int, colNames []FieldInfo, tbMap *replication.TableMapEvent) ([]SQL.NonAliasColumn, []string)
func GetStatsPrintContentLine ¶
func GetStatsPrintContentLine(st *BinEventStatsPrint) string
func GetStatsPrintHeaderLine ¶
func GetSystemHomeNameAndAdderss ¶
Gets the system's hostname and IP address.
func IsUnsigned ¶
func NewReplBinlogStreamer ¶
func NewReplBinlogStreamer(cfg *ConfCmd) *replication.BinlogStreamer
func ParserAllBinEventsFromRepl ¶
func ParserAllBinEventsFromRepl(cfg *ConfCmd)
func ProcessBinEventStats ¶
func ReverseFileGo ¶
func SendBinlogEventRepl ¶
func SendBinlogEventRepl(cfg *ConfCmd)
func StrSliceToString ¶
Types ¶
type BigLongTrxInfo ¶
type BigLongTrxInfo struct { //IsBig bool //IsLong bool StartTime uint32 StopTime uint32 Binlog string StartPos uint32 StopPos uint32 RowCnt uint32 // total row count for all statements Duration uint32 // how long the trx lasts Statements map[string]map[string]uint32 // row count for each statement type: insert, update, delete, e.g. {db1.tb1:{insert:0, update:2, delete:10}} }
type BinEventHandlingIndx ¶
type BinEventHandlingIndx struct { EventIdx uint64 Finished bool // contains filtered or unexported fields }
var (
G_HandlingBinEventIndex *BinEventHandlingIndx
)
type BinEventStats ¶
type BinEventStatsPrint ¶
type BinFileParser ¶
type BinFileParser struct {
Parser *replication.BinlogParser
}
type ConfCmd ¶
type ConfCmd struct { Mode string WorkType string MysqlType string Host string Port uint User string Passwd string ServerId uint Databases []string Tables []string //DatabaseRegs []*regexp.Regexp //ifHasDbReg bool //TableRegs []*regexp.Regexp //ifHasTbReg bool IgnoreDatabases []string IgnoreTables []string FilterSql []string FilterSqlLen int StartFile string StartPos uint StartFilePos mysql.Position IfSetStartFilePos bool StopFile string StopPos uint StopFilePos mysql.Position IfSetStopFilePos bool StartDatetime uint32 StopDatetime uint32 BinlogTimeLocation string IfSetStartDateTime bool IfSetStopDateTime bool MaxFileNumber int LocalBinFile string OutputToScreen bool PrintInterval int BigTrxRowLimit int LongTrxSeconds int IfSetStopParsPoint bool OutputDir string //MinColumns bool FullColumns bool InsertRows int KeepTrx bool SqlTblPrefixDb bool FilePerTable bool PrintExtraInfo bool Threads uint ReadTblDefJsonFile string OnlyColFromFile bool DumpTblDefToFile string BinlogDir string GivenBinlogFile string UseUniqueKeyFirst bool IgnorePrimaryKeyForInsert bool ReplaceIntoForInsert bool //DdlRegexp string ParseStatementSql bool IgnoreParsedErrForSql string // if parsed error, for sql match this regexp, only print error info, but not exits IgnoreParsedErrRegexp *regexp.Regexp EventChan chan MyBinEvent StatChan chan BinEventStats OrgSqlChan chan OrgSqlPrint SqlChan chan ForwardRollbackSqlOfPrint StatFH *os.File //DdlFH *os.File BiglongFH *os.File BinlogStreamer *replication.BinlogStreamer FromDB *sql.DB }
func (*ConfCmd) CheckCmdOptions ¶
func (this *ConfCmd) CheckCmdOptions()
func (*ConfCmd) CheckRequiredOption ¶
func (*ConfCmd) CheckValueInRange ¶
func (*ConfCmd) GetDefaultAndRangeValueMsg ¶
func (*ConfCmd) GetDefaultValueOfRange ¶
func (*ConfCmd) GetMaxValueOfRange ¶
func (*ConfCmd) GetMinValueOfRange ¶
func (*ConfCmd) IsTargetDml ¶
func (*ConfCmd) OpenStatsResultFiles ¶
func (this *ConfCmd) OpenStatsResultFiles()
func (*ConfCmd) OpenTxResultFiles ¶
func (this *ConfCmd) OpenTxResultFiles()
func (*ConfCmd) ParseCmdOptions ¶
func (this *ConfCmd) ParseCmdOptions()
func (*ConfCmd) PrintUsageMsg ¶
func (this *ConfCmd) PrintUsageMsg()
type DdlPosInfo ¶
type ExtraSqlInfoOfPrint ¶
type ExtraSqlInfoOfPrint struct {
// contains filtered or unexported fields
}
type FieldInfo ¶
type FieldInfo struct { FieldName string `json:"column_name"` FieldType string `json:"column_type"` IsUnsigned bool `json:"is_unsigned"` }
type ForwardRollbackSqlOfPrint ¶
type ForwardRollbackSqlOfPrint struct {
// contains filtered or unexported fields
}
type MyBinEvent ¶
type MyBinEvent struct { MyPos mysql.Position //this is the end position EventIdx uint64 BinEvent *replication.RowsEvent StartPos uint32 // this is the start position IfRowsEvent bool SqlType string // insert, update, delete Timestamp uint32 TrxIndex uint64 TrxStatus int // 0:begin, 1: commit, 2: rollback, -1: in_progress QuerySql *dsql.SqlInfo // for ddl and binlog which is not row format OrgSql string // for ddl and binlog which is not row format }
func (*MyBinEvent) CheckBinEvent ¶
func (this *MyBinEvent) CheckBinEvent(cfg *ConfCmd, ev *replication.BinlogEvent, currentBinlog *string) int
type OrgSqlPrint ¶
type TablesColumnsInfo ¶
type TablesColumnsInfo struct {
// contains filtered or unexported fields
}
var (
G_TablesColumnsInfo TablesColumnsInfo
)
func (*TablesColumnsInfo) GetTableColumns ¶
func (*TablesColumnsInfo) GetTableInfoJson ¶
func (this *TablesColumnsInfo) GetTableInfoJson(schema string, table string) (*TblInfoJson, error)
func (*TablesColumnsInfo) GetTableKeysInfo ¶
func (*TablesColumnsInfo) GetTbDefFromDb ¶
func (this *TablesColumnsInfo) GetTbDefFromDb(cfg *ConfCmd, dbname string, tbname string)
type TblInfoJson ¶
type TblInfoJson struct { Database string `json:"database"` Table string `json:"table"` Columns []FieldInfo `json:"columns"` PrimaryKey KeyInfo `json:"primary_key"` UniqueKeys []KeyInfo `json:"unique_keys"` }
func (*TblInfoJson) GetOneUniqueKey ¶
func (this *TblInfoJson) GetOneUniqueKey(uniqueFirst bool) KeyInfo
Click to show internal directories.
Click to hide internal directories.