base

package
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 6, 2024 License: MIT Imports: 24 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// C_Version is the version string of the tool.
	C_Version      = "my2sql V2.0"
	// C_validOptMsg is the message prefix placed before a list of valid option values.
	C_validOptMsg  = "valid options are: "
	// C_joinSepComma is the comma separator string.
	C_joinSepComma = ","

	// EventTimeout is a five-second duration.
	// NOTE(review): presumably the timeout used while waiting for binlog events — confirm against callers.
	EventTimeout = 5 * time.Second

	// Placeholders for columns whose definition is unknown.
	// NOTE(review): presumably used by GetDroppedFieldName for columns missing
	// from the current table definition — confirm.
	C_unknownColPrefix   = "dropped_column_"
	C_unknownColType     = "unknown_type"
	C_unknownColTypeCode = mysql.MYSQL_TYPE_NULL

	// Transaction status codes. The values match the ones documented on
	// MyBinEvent.TrxStatus (0: begin, 1: commit, 2: rollback, -1: in progress).
	C_trxBegin    = 0
	C_trxCommit   = 1
	C_trxRollback = 2
	C_trxProcess  = -1

	// Result codes.
	// NOTE(review): presumably the int returned by CheckBinHeaderCondition and
	// MyBinEvent.CheckBinEvent (process / continue / break / end of file) — confirm.
	C_reProcess  = 0
	C_reContinue = 1
	C_reBreak    = 2
	C_reFileEnd  = 3
)
View Source
// Separator and placeholder strings used when building string keys.
const (
	//PRIMARY_KEY_LABLE = "primary"
	//UNIQUE_KEY_LABLE  = "unique"

	// KEY_BINLOG_POS_SEP is the "/" separator.
	// NOTE(review): presumably joins the binlog name and positions in GetBinlogPosAsKey — confirm.
	KEY_BINLOG_POS_SEP = "/"
	// KEY_DB_TABLE_SEP is the "." separator.
	// NOTE(review): presumably joins schema and table in GetAbsTableName — confirm.
	KEY_DB_TABLE_SEP   = "."
	// KEY_NONE_BINLOG is the "_" placeholder.
	// NOTE(review): presumably stands in when no binlog name applies — confirm.
	KEY_NONE_BINLOG    = "_"
)

Variables

View Source
// Package-level configuration state and the tables of accepted option values.
var (
	// GConfCmd is the shared command configuration, initialised to an empty ConfCmd.
	GConfCmd = &ConfCmd{}
	// GBinlogTimeLocation is the time zone location; it starts out nil.
	GBinlogTimeLocation *time.Location

	// GUseDatabase starts out as the empty string.
	GUseDatabase = ""

	// Accepted values for the mode, work type, MySQL flavour and SQL filter options.
	GOptsValidMode      = []string{"repl", "file"}
	GOptsValidWorkType  = []string{"2sql", "rollback", "stats"}
	GOptsValidMysqlType = []string{"mysql", "mariadb"}
	GOptsValidFilterSql = []string{"insert", "update", "delete"}

	// GOptsValueRange maps an option name to three integers.
	// NOTE(review): the triples look like {min, max, default}, matching
	// ConfCmd.GetMinValueOfRange / GetMaxValueOfRange / GetDefaultValueOfRange — confirm.
	GOptsValueRange = map[string][]int{
		"PrintInterval":  {1, 600, 30},
		"BigTrxRowLimit": {1, 30000, 10},
		"LongTrxSeconds": {0, 3600, 1},
		"InsertRows":     {1, 500, 30},
		"Threads":        {1, 16, 2},
	}

	// GStatsColumns lists statistics column names.
	GStatsColumns = []string{
		"StartTime", "StopTime", "Binlog", "PosRange",
		"Database", "Table",
		"BigTrxs", "BiggestTrx", "LongTrxs", "LongestTrx",
		"Inserts", "Updates", "Deletes", "Trxs", "Statements",
		"Renames", "RenamePoses", "Ddls", "DdlPoses",
	}

	// GDdlPrintHeader lists the header column names for DDL output.
	GDdlPrintHeader = []string{
		"datetime", "binlog", "startposition", "stopposition", "sql",
	}
)
View Source
// Header column names for the statistics, DDL and big/long transaction outputs.
var (
	//gDdlRegexp *regexp.Regexp = regexp.MustCompile(C_ddlRegexp)

	// Stats_Result_Header_Column_names holds the per-table statistics header columns.
	Stats_Result_Header_Column_names = []string{
		"binlog", "starttime", "stoptime", "startpos", "stoppos",
		"inserts", "updates", "deletes", "database", "table",
	}

	// Stats_DDL_Header_Column_names holds the DDL header columns.
	Stats_DDL_Header_Column_names = []string{
		"datetime", "binlog", "startpos", "stoppos", "sql",
	}

	// Stats_BigLongTrx_Header_Column_names holds the big/long transaction header columns.
	Stats_BigLongTrx_Header_Column_names = []string{
		"binlog", "starttime", "stoptime", "startpos", "stoppos",
		"rows", "duration", "tables",
	}
)
View Source
// G_Bytes_Column_Types lists the column type names "blob", "json", "geometry"
// and the unknown-type placeholder C_unknownColType.
// NOTE(review): presumably the types whose values are handled as raw bytes — confirm.
var G_Bytes_Column_Types = []string{
	"blob",
	"json",
	"geometry",
	C_unknownColType,
}

Functions

func CheckBinHeaderCondition

func CheckBinHeaderCondition(cfg *ConfCmd, header *replication.EventHeader, currentBinlog string) int

func CheckElementOfSliceInt

func CheckElementOfSliceInt(arr []int, element int, prefix string, ifExt bool) bool

func CheckElementOfSliceStr

func CheckElementOfSliceStr(arr []string, element string, prefix string, ifExt bool) bool

func CheckIsDir

func CheckIsDir(fd string) (bool, string)

func CommaSeparatedListToArray

func CommaSeparatedListToArray(str string) []string

func CompareBinlogPos

func CompareBinlogPos(sBinFile string, sPos uint, eBinFile string, ePos uint) int

func CompareEquelByteSlice

func CompareEquelByteSlice(s1 []byte, s2 []byte) bool

func ConvertRowToExpressRow

func ConvertRowToExpressRow(row []interface{}, ifIgnorePrimary bool, primaryIdx []int) []SQL.Expression

func ConvertStrArrToIntferfaceArrForPrint

func ConvertStrArrToIntferfaceArrForPrint(arr []string) []interface{}

func CreateMysqlCon

func CreateMysqlCon(mysqlUrl string) (*sql.DB, error)

func GenDeleteSqlsForOneRowsEvent

func GenDeleteSqlsForOneRowsEvent(posStr string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, uniKey []int, ifFullImage bool, ifRollback bool, ifprefixDb bool) []string

func GenDeleteSqlsForOneRowsEventRollbackInsert

func GenDeleteSqlsForOneRowsEventRollbackInsert(posStr string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, uniKey []int, ifFullImage bool, ifprefixDb bool) []string

func GenEqualConditions

func GenEqualConditions(row []interface{}, colDefs []SQL.NonAliasColumn, uniKey []int, ifFullImage bool) []SQL.BoolExpression

func GenForwardRollbackSqlFromBinEvent

func GenForwardRollbackSqlFromBinEvent(i uint, cfg *ConfCmd, wg *sync.WaitGroup)

func GenInsertSqlForRows

func GenInsertSqlForRows(rows [][]interface{}, insertSql SQL.InsertStatement, schema string, ifprefixDb bool, ifIgnorePrimary bool, primaryIdx []int) (string, error)

func GenInsertSqlsForOneRowsEvent

func GenInsertSqlsForOneRowsEvent(posStr string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, rowsPerSql int, ifRollback bool, ifprefixDb bool, ifIgnorePrimary bool, primaryIdx []int) []string

func GenInsertSqlsForOneRowsEventRollbackDelete

func GenInsertSqlsForOneRowsEventRollbackDelete(posStr string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, rowsPerSql int, ifprefixDb bool) []string

func GenUpdateSetPart

func GenUpdateSetPart(colsTypeNameFromMysql []string, colTypeNames []string, updateSql SQL.UpdateStatement, colDefs []SQL.NonAliasColumn, rowAfter []interface{}, rowBefore []interface{}, ifFullImage bool) SQL.UpdateStatement

func GenUpdateSqlsForOneRowsEvent

func GenUpdateSqlsForOneRowsEvent(posStr string, colsTypeNameFromMysql []string, colsTypeName []string, rEv *replication.RowsEvent, colDefs []SQL.NonAliasColumn, uniKey []int, ifFullImage bool, ifRollback bool, ifprefixDb bool) []string

func GetAbsTableName

func GetAbsTableName(schema, table string) string

func GetBigLongTrxContentLine

func GetBigLongTrxContentLine(blTrx BigLongTrxInfo) string

func GetBigLongTrxPrintHeaderLine

func GetBigLongTrxPrintHeaderLine(headers []string) string

func GetBigLongTrxStatementsStr

func GetBigLongTrxStatementsStr(st map[string]map[string]uint32) string

func GetBinlogBasenameAndIndex

func GetBinlogBasenameAndIndex(binlog string) (string, int)

func GetBinlogPosAsKey

func GetBinlogPosAsKey(binlog string, spos, epos uint32) string

func GetColDefIgnorePrimary

func GetColDefIgnorePrimary(colDefs []SQL.NonAliasColumn, primaryIdx []int) []SQL.NonAliasColumn

func GetColIndexFromKey

func GetColIndexFromKey(ki KeyInfo, columns []FieldInfo) []int

func GetDatetimeStr

func GetDatetimeStr(sec int64, nsec int64, timeFmt string) string

func GetDbTbAndQueryAndRowCntFromBinevent

func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, string, string, string, uint32)

func GetDbTbFromAbsTbName

func GetDbTbFromAbsTbName(name string) (string, string)

func GetDroppedFieldName

func GetDroppedFieldName(idx int) string

func GetFiledType

func GetFiledType(filed string) string

func GetFirstBinlogPosToParse

func GetFirstBinlogPosToParse(cfg *ConfCmd) (string, int64)

func GetForwardRollbackContentLineWithExtra

func GetForwardRollbackContentLineWithExtra(sq ForwardRollbackSqlOfPrint, ifExtra bool) string

func GetForwardRollbackSqlFileName

func GetForwardRollbackSqlFileName(schema string, table string, filePerTable bool, outDir string, ifRollback bool, binlog string, ifTmp bool) string

func GetLineHeaderStrFromColumnNamesArr

func GetLineHeaderStrFromColumnNamesArr(arr []string, sep string) string

func GetMaxValue

func GetMaxValue(nums ...int) int

func GetMinValue

func GetMinValue(nums ...int) int

func GetMysqlDataTypeNameAndSqlColumn

func GetMysqlDataTypeNameAndSqlColumn(tpDef string, colName string, tp byte, meta uint16) (string, SQL.NonAliasColumn)

func GetMysqlUrl

func GetMysqlUrl(cfg *ConfCmd) string

func GetNextBinlog

func GetNextBinlog(baseName string, indx int) string

func GetPosStr

func GetPosStr(name string, spos uint32, epos uint32) string

func GetSqlFieldsEXpressions

func GetSqlFieldsEXpressions(colCnt int, colNames []FieldInfo, tbMap *replication.TableMapEvent) ([]SQL.NonAliasColumn, []string)

func GetStatsPrintContentLine

func GetStatsPrintContentLine(st *BinEventStatsPrint) string

func GetStatsPrintHeaderLine

func GetStatsPrintHeaderLine(headers []string) string

func GetSystemHomeNameAndAdderss

func GetSystemHomeNameAndAdderss() (hostname string, address string)

GetSystemHomeNameAndAdderss returns the system's hostname and IP address.

func IntSliceToString

func IntSliceToString(iArr []int, sep string, prefix string) string

func IsUnsigned

func IsUnsigned(filed string) bool

func NewReplBinlogStreamer

func NewReplBinlogStreamer(cfg *ConfCmd) *replication.BinlogStreamer

func ParserAllBinEventsFromRepl

func ParserAllBinEventsFromRepl(cfg *ConfCmd)

func PrintExtraInfoForForwardRollbackupSql

func PrintExtraInfoForForwardRollbackupSql(cfg *ConfCmd, wg *sync.WaitGroup)

func ProcessBinEventStats

func ProcessBinEventStats(cfg *ConfCmd, wg *sync.WaitGroup)

func ReverseFileGo

func ReverseFileGo(threadIdx int, rollbackFileChan chan map[string]string, bytesCntFiles map[string][][]int, keepTrx bool, wg *sync.WaitGroup)

func ReverseFileToNewFileOneByOneLineAndKeepTrxBatchRead

func ReverseFileToNewFileOneByOneLineAndKeepTrxBatchRead(srcFile string, destFile string, trxPoses [][]int, keepTrx bool) error

func SendBinlogEventRepl

func SendBinlogEventRepl(cfg *ConfCmd)

func StrSliceToString

func StrSliceToString(sArr []string, sep, prefix string) string

Types

type BigLongTrxInfo

// BigLongTrxInfo describes one transaction by its binlog position range,
// start/stop times, total row count, duration and per-table statement counts.
// NOTE(review): presumably filled for transactions exceeding
// ConfCmd.BigTrxRowLimit or ConfCmd.LongTrxSeconds — confirm.
type BigLongTrxInfo struct {
	//IsBig bool
	//IsLong bool
	StartTime  uint32
	StopTime   uint32
	Binlog     string
	StartPos   uint32
	StopPos    uint32
	RowCnt     uint32                       // total row count for all statements
	Duration   uint32                       // how long the trx lasts
	Statements map[string]map[string]uint32 // row count for each statement type: insert, update, delete. {db1.tb1:{insert:0, update:2, delete:10}}

}

type BinEventHandlingIndx

type BinEventHandlingIndx struct {
	EventIdx uint64

	Finished bool
	// contains filtered or unexported fields
}
// G_HandlingBinEventIndex is the package-level pointer to a BinEventHandlingIndx; it starts out nil.
var G_HandlingBinEventIndex *BinEventHandlingIndx

type BinEventStats

// BinEventStats carries the binlog position, target database/table, query
// type and row count recorded for a binlog event. Values of this type are
// the element type of ConfCmd.StatChan.
type BinEventStats struct {
	Timestamp     uint32
	Binlog        string
	StartPos      uint32
	StopPos       uint32
	Database      string
	Table         string
	QueryType     string // query, insert, update, delete
	RowCnt        uint32
	QuerySql      string        // for type=query
	ParsedSqlInfo *dsql.SqlInfo // for ddl
}

type BinEventStatsPrint

// BinEventStatsPrint is one row of statistics output: a binlog position and
// time range, the database/table, and the insert/update/delete counts.
// Its fields correspond to the names in Stats_Result_Header_Column_names.
type BinEventStatsPrint struct {
	Binlog    string
	StartTime uint32
	StopTime  uint32
	StartPos  uint32
	StopPos   uint32
	Database  string
	Table     string
	Inserts   uint32
	Updates   uint32
	Deletes   uint32
}

type BinFileParser

// BinFileParser wraps a *replication.BinlogParser.
type BinFileParser struct {
	Parser *replication.BinlogParser
}

type ConfCmd

// ConfCmd holds the command options together with the channels, file handles
// and connections the package works with. The package-level GConfCmd is an
// instance of this type.
type ConfCmd struct {
	// NOTE(review): presumably validated against GOptsValidMode,
	// GOptsValidWorkType and GOptsValidMysqlType respectively — confirm.
	Mode      string
	WorkType  string
	MysqlType string

	// MySQL connection settings.
	Host     string
	Port     uint
	User     string
	Passwd   string
	ServerId uint

	// Database / table / SQL-type filters.
	Databases []string
	Tables    []string
	//DatabaseRegs []*regexp.Regexp
	//ifHasDbReg   bool
	//TableRegs    []*regexp.Regexp
	//ifHasTbReg   bool
	IgnoreDatabases []string
	IgnoreTables    []string
	FilterSql       []string
	FilterSqlLen    int

	// Start binlog file and position.
	StartFile         string
	StartPos          uint
	StartFilePos      mysql.Position
	IfSetStartFilePos bool

	// Stop binlog file and position.
	StopFile         string
	StopPos          uint
	StopFilePos      mysql.Position
	IfSetStopFilePos bool

	// Start/stop datetimes and the time zone name.
	StartDatetime      uint32
	StopDatetime       uint32
	BinlogTimeLocation string

	IfSetStartDateTime bool
	IfSetStopDateTime  bool

	MaxFileNumber int

	LocalBinFile string

	// NOTE(review): PrintInterval, BigTrxRowLimit and LongTrxSeconds have
	// entries in GOptsValueRange; presumably range-checked there — confirm.
	OutputToScreen bool
	PrintInterval  int
	BigTrxRowLimit int
	LongTrxSeconds int

	IfSetStopParsPoint bool

	OutputDir string

	//MinColumns     bool
	FullColumns    bool
	InsertRows     int
	KeepTrx        bool
	SqlTblPrefixDb bool
	FilePerTable   bool

	PrintExtraInfo bool

	Threads uint

	ReadTblDefJsonFile string
	OnlyColFromFile    bool
	DumpTblDefToFile   string

	BinlogDir string

	GivenBinlogFile string

	UseUniqueKeyFirst         bool
	IgnorePrimaryKeyForInsert bool
	ReplaceIntoForInsert      bool

	//DdlRegexp string
	ParseStatementSql bool

	IgnoreParsedErrForSql string // if parsing fails for SQL matching this regexp, only print the error info and do not exit
	IgnoreParsedErrRegexp *regexp.Regexp

	// Channels carrying binlog events, statistics, original SQL and generated SQL.
	EventChan  chan MyBinEvent
	StatChan   chan BinEventStats
	OrgSqlChan chan OrgSqlPrint
	SqlChan    chan ForwardRollbackSqlOfPrint

	// Output file handles.
	// NOTE(review): presumably opened by OpenStatsResultFiles / OpenTxResultFiles
	// and closed by CloseFH — confirm.
	StatFH *os.File
	//DdlFH     *os.File
	BiglongFH *os.File

	BinlogStreamer *replication.BinlogStreamer
	FromDB         *sql.DB
}

func (*ConfCmd) CheckCmdOptions

func (this *ConfCmd) CheckCmdOptions()

func (*ConfCmd) CheckRequiredOption

func (this *ConfCmd) CheckRequiredOption(v interface{}, prefix string, ifExt bool) bool

func (*ConfCmd) CheckValueInRange

func (this *ConfCmd) CheckValueInRange(opt string, val int, prefix string, ifExt bool) bool

func (*ConfCmd) CloseChan

func (this *ConfCmd) CloseChan()

func (*ConfCmd) CloseFH

func (this *ConfCmd) CloseFH()

func (*ConfCmd) CreateDB

func (this *ConfCmd) CreateDB()

func (*ConfCmd) GetDefaultAndRangeValueMsg

func (this *ConfCmd) GetDefaultAndRangeValueMsg(opt string) string

func (*ConfCmd) GetDefaultValueOfRange

func (this *ConfCmd) GetDefaultValueOfRange(opt string) int

func (*ConfCmd) GetMaxValueOfRange

func (this *ConfCmd) GetMaxValueOfRange(opt string) int

func (*ConfCmd) GetMinValueOfRange

func (this *ConfCmd) GetMinValueOfRange(opt string) int

func (*ConfCmd) IsTargetDml

func (this *ConfCmd) IsTargetDml(dml string) bool

func (*ConfCmd) OpenStatsResultFiles

func (this *ConfCmd) OpenStatsResultFiles()

func (*ConfCmd) OpenTxResultFiles

func (this *ConfCmd) OpenTxResultFiles()

func (*ConfCmd) ParseCmdOptions

func (this *ConfCmd) ParseCmdOptions()

func (*ConfCmd) PrintUsageMsg

func (this *ConfCmd) PrintUsageMsg()

type DdlPosInfo

// DdlPosInfo records a DDL statement together with the binlog file and the
// start/stop positions it spans. The struct tags give its JSON field names.
type DdlPosInfo struct {
	Binlog   string `json:"binlog"`
	StartPos uint32 `json:"start_position"`
	StopPos  uint32 `json:"stop_position"`
	DdlSql   string `json:"ddl_sql"`
}

type ExtraSqlInfoOfPrint

type ExtraSqlInfoOfPrint struct {
	// contains filtered or unexported fields
}

type FieldInfo

// FieldInfo describes one table column: its name, its type and whether it
// is unsigned. The struct tags give its JSON field names.
type FieldInfo struct {
	FieldName  string `json:"column_name"`
	FieldType  string `json:"column_type"`
	IsUnsigned bool   `json:"is_unsigned"`
}

func GetAllFieldNamesWithDroppedFields

func GetAllFieldNamesWithDroppedFields(rowLen int, colNames []FieldInfo) []FieldInfo

type ForwardRollbackSqlOfPrint

type ForwardRollbackSqlOfPrint struct {
	// contains filtered or unexported fields
}

type KeyInfo

// KeyInfo is a list of column names, e.g. {colname1, colname2}.
type KeyInfo []string

KeyInfo holds the column names of a key, e.g. {colname1, colname2}.

type MyBinEvent

// MyBinEvent is one binlog event as passed through ConfCmd.EventChan: its
// start and end positions, the rows event (if any), the SQL type, and the
// transaction index and status.
type MyBinEvent struct {
	MyPos       mysql.Position // this is the end position
	EventIdx    uint64
	BinEvent    *replication.RowsEvent
	StartPos    uint32 // this is the start position
	IfRowsEvent bool
	SqlType     string // insert, update, delete
	Timestamp   uint32
	TrxIndex    uint64
	TrxStatus   int           // 0: begin, 1: commit, 2: rollback, -1: in_progress
	QuerySql    *dsql.SqlInfo // for DDL and binlog events that are not in row format
	OrgSql      string        // for DDL and binlog events that are not in row format
}

func (*MyBinEvent) CheckBinEvent

func (this *MyBinEvent) CheckBinEvent(cfg *ConfCmd, ev *replication.BinlogEvent, currentBinlog *string) int

type OrgSqlPrint

// OrgSqlPrint carries a query SQL string with the binlog file, start/stop
// positions and datetime it belongs to. It is the element type of
// ConfCmd.OrgSqlChan.
type OrgSqlPrint struct {
	Binlog   string
	StartPos uint32
	StopPos  uint32
	DateTime uint32
	QuerySql string
}

type TablesColumnsInfo

type TablesColumnsInfo struct {
	// contains filtered or unexported fields
}
// G_TablesColumnsInfo is the package-level TablesColumnsInfo value; it starts as the zero value.
var G_TablesColumnsInfo TablesColumnsInfo

func (*TablesColumnsInfo) GetTableColumns

func (this *TablesColumnsInfo) GetTableColumns(db *sql.DB, dbname string, tbname string) error

func (*TablesColumnsInfo) GetTableInfoJson

func (this *TablesColumnsInfo) GetTableInfoJson(schema string, table string) (*TblInfoJson, error)

func (*TablesColumnsInfo) GetTableKeysInfo

func (this *TablesColumnsInfo) GetTableKeysInfo(db *sql.DB, dbName string, tbName string) error

func (*TablesColumnsInfo) GetTbDefFromDb

func (this *TablesColumnsInfo) GetTbDefFromDb(cfg *ConfCmd, dbname string, tbname string)

type TblInfoJson

// TblInfoJson is the JSON-tagged description of one table: its database and
// table names, its columns, its primary key and its unique keys.
type TblInfoJson struct {
	Database   string      `json:"database"`
	Table      string      `json:"table"`
	Columns    []FieldInfo `json:"columns"`
	PrimaryKey KeyInfo     `json:"primary_key"`
	UniqueKeys []KeyInfo   `json:"unique_keys"`
}

func (*TblInfoJson) GetOneUniqueKey

func (this *TblInfoJson) GetOneUniqueKey(uniqueFirst bool) KeyInfo

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL