Documentation
¶
Index ¶
- Constants
- Variables
- func ByteSlicePut(data *ByteSlice)
- func ByteSliceToString(b []byte) string
- func CheckPcsFile(p string) (bool, error)
- func CheckPid(pid int) bool
- func ConvertColumnValType(data interface{}) ([]byte, error)
- func ConvertPoolGet() (data *bytes.Buffer)
- func ConvertPoolPut(data *bytes.Buffer)
- func ConvertPositionToNumber(pos *mysql.Position) (s *uint64, p *uint64, err error)
- func DataRowsBufferGet() (data *bytes.Buffer)
- func DataRowsBufferPut(data *bytes.Buffer)
- func DataStreamLagTime(first uint64) string
- func ErrorCheckOfRecover(n interface{}, log *logrus.Logger)
- func Float64ToUint64(val float64) uint64
- func GetAllFileFullPath(pathname string, suffix string) (s []string, e error)
- func GetAllGroupFileName(pathname string, suffix string) (s []string, e error)
- func GetAvailablePort() (int, error)
- func GetFunctionName(i interface{}, seps ...rune) string
- func GetHomeDirectory() (s *string, err error)
- func HasPrefixIgnoreCase(s, prefix string) bool
- func HeadBufferGet() (data *bytes.Buffer)
- func HeadBufferPut(data *bytes.Buffer)
- func Int64ToUint64(val int64) uint64
- func IsFileExist(path string) bool
- func KeyCheck(s *string) bool
- func MetaDataBufferGet() (data *bytes.Buffer)
- func MetaDataBufferPut(data *bytes.Buffer)
- func NanoSecondConvertToTime(t uint64) string
- func NestedBufferGet() (data *bytes.Buffer)
- func NestedBufferPut(data *bytes.Buffer)
- func OuterBufferGet() (data *bytes.Buffer)
- func OuterBufferPut(data *bytes.Buffer)
- func ParseNLSLANG(p string) (language, territory, charset string, err error)
- func PathExists(path string) bool
- func ReadLine(fileName string) (res []string, err error)
- func RowBufferGet() (data *bytes.Buffer)
- func RowBufferPut(data *bytes.Buffer)
- func SliceToString(kv []string, sp string) *string
- func StringToByteSlice(s *string) []byte
- func TimeDifferForCurrentTime(first uint64) string
- func ToBackground()
- func TrimKeySpace(s []string) []string
- func UInt16ToBytes(i uint16) []byte
- func Uint64ToFloat64(val uint64) float64
- func Uint64ToInt64(val uint64) int64
- type ByteSlice
- type GroupInfo
- type ProcessFile
- type ProcessInfo
Constants ¶
const ( MySQL = "MySQL" Oracle = "Oracle" Extract = "EXTRACT" Replicat = "REPLICAT" MinRpcPort = 37401 MaxRpcPort = MinRpcPort + 500 )
支持的数据库和进程类型
const ( PROGRAM = "PROGRAM" PROCESSID = "PROCESSID" PORT = "PORT" PID = "PID" DBTYPE = "DBTYPE" PROCESSTYPE = "PROCESSTYPE" STATUS = "STATUS" LASTSTARTED = "LAST STARTED" FILENUMBER = "FILE NUMBER" LOGNUMBER = "LOG NUMBER" OFFSET = "OFFSET" RUNNING = "RUNNING" STARTING = "STARTING" STOPPING = "STOPPING" STOPPED = "STOPPED" ABENDED = "ABENDED" )
进程文件和组文件使用
const (
DefaultTimeFormat = "2006-01-02 15:04:05.9999 Z07:00"
)
const (
TooBigBlockSize = 1024 * 1024 * 4
)
Variables ¶
var ( ProcessType = "PROCESS" // 参数类型 ProcessRegular = "(^)(?i:(" + ProcessType + "))(\\s+)((?:[A-Za-z0-9_]){4,12})($)" )
process参数
var ( SourceDBType = "SOURCEDB" // 参数类型 Port = "PORT" // 端口关键字 DataBase = "DATABASE" // 默认连接的数据库 Types = "TYPE" // 库类型,可选mysql mariadb UserId = "USERID" // 连接用户 PassWord = "PASSWORD" // 连接密码 ServerId = "SERVERID" // mysql server id Retry = "RETRY" // 连接重试最大 Character = "CHARACTER" // 客户端字符集关键字 Collation = "COLLATION" // 客户端字符集排序 TimeZone = "TIMEZONE" // 客户端时区 DefaultPort uint16 = 3306 // 默认端口 DefaultDataBase = "test" // 默认连接数据库 DefaultTypes = "mysql" // 默认库类型 DefaultUserId = "root" // 默认用户名 DefaultMaxRetryConnect = 3 DefaultClientCharacter = "utf8mb4" DefaultClientCollation = "utf8mb4_general_ci" DefaultTimeZone, _ = time.LoadLocation("Asia/Shanghai") )
sourcedb参数
var ( TrailDirType = "TRAILDIR" // 参数类型 TrailSizeKey = "SIZE" // size关键字 TrailKeepKey = "KEEP" // keey 关键字 MB = "MB" GB = "GB" DAY = "DAY" DefaultTrailMaxSize = 128 // 默认trail文件的最大, 单位是M, 单位不可更改 DefaultTrailMinSize = 16 // 默认trail文件的最小 DefaultTrailKeepValue = 7 // 默认trail文件保留时间,默认是天 )
traildir 参数
var ( DiscardFileType = "DISCARDFILE" DiscardFileRegular = "(^)(?i:(" + DiscardFileType + "))(\\s+)((.+))($)" )
discardfile 参数
var ( DBOptionsType = "DBOPTIONS" SuppressionTrigger = "SUPPRESSIONTRIGGER" // 表操作时抑制触发器 IgnoreReplicates = "IGNOREREPLICATES" // 忽略复制进程执行的操作 GetReplicates = "GETREPLICATES" // 获取复制进程的操作 IgnoreForeignkey = "IGNOREFOREIGNKEY" // 忽略外键约束 )
dboptions 参数
var ( TableType = "TABLE" TableRegular = "(^)(?i:(" + TableType + "))(\\s+)((\\S+)(\\.)(\\S+\\s*)(;))($)" )
TABLE 参数
var ( TableExcludeType = "TABLEEXCLUDE" TableExcludeRegular = "(^)(?i:(" + TableExcludeType + "))(\\s+)((\\S+)(\\.)(\\S+\\s*)(;))($)" )
TABLEExclude 参数
var ( OUserIDType = "USERID" // 参数类型 OPort = "PORT" // 端口关键字 OSid = "SID" // 端口关键字 OUser = "USER" // 连接用户 OPassWord = "PASSWORD" // 连接密码 ORetry = "RETRY" // 连接重试最大 OCharacter = "CHARACTER" // 客户端字符集关键字 OTimeZone = "TIMEZONE" // 客户端字符集关键字 ODefaultPort uint16 = 1521 // 默认端口 ODefaultMaxRetryConnect = 3 ODefaultTimeZone = "Asia/Shanghai" )
UserID参数
var ( DDL = "DDL" INCLUDE = "INCLUDE" EXCLUDE = "EXCLUDE" OPTYPE = "OPTYPE" OBJTYPE = "OBJTYPE" OBJNAME = "OBJNAME" // 操作类型 CREATE = "CREATE" ALTER = "ALTER" DROP = "DROP" // 对象类型 TABLE = "TABLE" INDEX = "INDEX" TRIGGER = "TRIGGER" SEQUENCE = "SEQUENCE" VIEW = "VIEW" FUNCTION = "FUNCTION" PACKAGE = "PACKAGE" PROCEDURE = "PROCEDURE" TYPE = "TYPE" ROLE = "ROLE" USER = "USER" EVENT = "EVENT" DATABASE = "DATABASE" )
var (
ConvertPool = sync.Pool{New: func() interface{} { return &bytes.Buffer{} }}
)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
var (
DataRowsBufferPool = sync.Pool{New: func() interface{} { return &bytes.Buffer{} }} // 数据行缓存(包括操作、行头、元数据、行数据)
)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 完整的数据行缓存
var GlobalProcessID string
var (
HeadDataBufferPool = sync.Pool{New: func() interface{} { return &bytes.Buffer{} }} // 头数据缓存
)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
var (
IsNotValue = make([]byte, 0)
)
var (
MetaDataBufferPool = sync.Pool{New: func() interface{} { return &bytes.Buffer{} }} // 元数据缓存
)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
var (
NestedBytesPool = sync.Pool{New: func() interface{} { return &bytes.Buffer{} }} // 列嵌套内缓存
)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
var (
OuterBufferPool = sync.Pool{New: func() interface{} { return &bytes.Buffer{} }} // 列嵌套外缓存
)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
var (
RowBufferPool = sync.Pool{New: func() interface{} { return &bytes.Buffer{} }} // 行缓存
)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Functions ¶
func ByteSlicePut ¶
func ByteSlicePut(data *ByteSlice)
func ByteSliceToString ¶
func CheckPcsFile ¶
func ConvertColumnValType ¶
func ConvertPoolGet ¶
func ConvertPoolPut ¶
func ConvertPositionToNumber ¶
func DataRowsBufferGet ¶
func DataRowsBufferPut ¶
func ErrorCheckOfRecover ¶
func Float64ToUint64 ¶
func GetAllFileFullPath ¶
func GetAllGroupFileName ¶
func GetAvailablePort ¶
func GetFunctionName ¶
======================================================================================================= 自定义Panic异常处理,调用方式: 例如Test()函数, 指定defer ErrorCheckOfRecover(Test)
func HasPrefixIgnoreCase ¶
func HeadBufferGet ¶
func HeadBufferPut ¶
func Int64ToUint64 ¶
func MetaDataBufferGet ¶
func MetaDataBufferPut ¶
func NanoSecondConvertToTime ¶
func NestedBufferGet ¶
func NestedBufferPut ¶
func OuterBufferGet ¶
func OuterBufferPut ¶
func ParseNLSLANG ¶
func RowBufferGet ¶
func RowBufferPut ¶
func StringToByteSlice ¶
func TimeDifferForCurrentTime ¶
传入纳秒, 计算事务做检查点的lag耗时
func ToBackground ¶
func ToBackground()
func TrimKeySpace ¶
func UInt16ToBytes ¶
func Uint64ToFloat64 ¶
func Uint64ToInt64 ¶
Types ¶
type ProcessFile ¶
type ProcessFile struct { File string // 进程文件名 PROGRAM string // 进程类型 PROCESSID string // 进程名称 PORT string // RPC port PID string // 进程PID STATUS string // 运行状态 DBTYPE string // 数据库类型 }
func GetProcessAttribute ¶
func GetProcessAttribute(ct []string) (*ProcessFile, error)
type ProcessInfo ¶
type ProcessInfo struct { Groups *GroupInfo // 组id信息 CheckPointFilePath string // 检查点文件路径 Process *ProcessFile // 组ID对应的进程文件信息 }