Documentation ¶
Index ¶
- Constants
- Variables
- func BuildEqualsComparison(columns []*Column, values []string) (string, error)
- func BuildValueComparison(column string, value string, comparisonSign ValueComparisonSign) (string, error)
- func EscapeName(name string) string
- func ExecBinlogTrx(ctx context.Context, opts *ExecOpt, binlogTrx *Trx)
- func HasPk(columns []*Column) bool
- func SetCheckpointDBandTableName(groupId string) error
- type ApplierConfig
- type ApplierServer
- type ApplyProgress
- type AssignedQueue
- type Column
- type ColumnType
- type Coordinator
- type Event
- func (e *Event) BuildColumnsPreparedValues() (string, error)
- func (e *Event) BuildDeleteSQL(gtid string, beginOffset int64) ([]*ExecutableSQL, error)
- func (e *Event) BuildEqualsPreparedComparison(i int) (string, error)
- func (e *Event) BuildInsertSQL(gtid string, beginOffset int64) ([]*ExecutableSQL, error)
- func (e *Event) BuildSetPreparedClause() (string, error)
- func (e *Event) BuildUpdateSQL(gtid string, beginOffset int64) ([]*ExecutableSQL, error)
- type EventType
- type ExecOpt
- type ExecResult
- type ExecutableSQL
- func (s *ExecutableSQL) HandDeleteConflict(tx *sql.Tx, strategy StrategyType, updateTimeColumn string) error
- func (s *ExecutableSQL) HandInsertConflict(tx *sql.Tx, strategy StrategyType, updateTimeColumn string) error
- func (s *ExecutableSQL) HandleUpdateConflict(tx *sql.Tx, strategy StrategyType, updateTimeColumn string) error
- type Fetcher
- type PkgSn
- type RdpDecoder
- type RdpPkg
- type ReorderWindow
- type Row
- type StrategyType
- type Trx
- type ValueComparisonSign
Constants ¶
View Source
const ( MYSQL_TYPE_DECIMAL ColumnType = iota MYSQL_TYPE_TINY MYSQL_TYPE_SHORT MYSQL_TYPE_LONG MYSQL_TYPE_FLOAT MYSQL_TYPE_DOUBLE MYSQL_TYPE_NULL MYSQL_TYPE_TIMESTAMP MYSQL_TYPE_LONGLONG MYSQL_TYPE_INT24 MYSQL_TYPE_DATE MYSQL_TYPE_TIME MYSQL_TYPE_DATETIME MYSQL_TYPE_YEAR MYSQL_TYPE_NEWDATE MYSQL_TYPE_VARCHAR MYSQL_TYPE_BIT MYSQL_TYPE_TIMESTAMP2 MYSQL_TYPE_DATETIME2 MYSQL_TYPE_TIME2 MYSQL_TYPE_JSON = 245 MYSQL_TYPE_NEWDECIMAL = 246 MYSQL_TYPE_ENUM = 247 MYSQL_TYPE_SET = 248 MYSQL_TYPE_TINY_BLOB = 249 MYSQL_TYPE_MEDIUM_BLOB = 250 MYSQL_TYPE_LONG_BLOB = 251 MYSQL_TYPE_BLOB = 252 MYSQL_TYPE_VAR_STRING = 253 MYSQL_TYPE_STRING = 254 MYSQL_TYPE_GEOMETRY = 255 )
View Source
const ( StatusKey_ApplyProgress = "apply_progress" StatusKey_FailedTrx = "failed_trx" )
View Source
const ( //基于最新时间优先策略,如果时间相等则利用Applier中的记录覆盖原记录 TimeOverwriteStrategy = "time_overwrite" //基于最新时间优先策略,如果时间相等则忽略Applier中的记录 TimeIgnoreStrategy = "time_ignore" IgnoreStrategy = "ignore" OverwriteStrategy = "overwrite" )
View Source
const ( QUERY_EVENT EventType = 2 WRITE_ROWS_EVENT = 30 UPDATE_ROWS_EVENT = 31 DELETE_ROWS_EVENT = 32 GTID_LOG_EVENT = 33 )
View Source
const (
CheckpointTableCount = 1000
)
View Source
const (
CheckpointTablePrefix = "drc_ckt_"
)
Variables ¶
View Source
var ( CheckpointDatabase = "" CheckpointTable = "" )
View Source
var ( ErrNotDeleteType = errors.New("applier: event is not delete type") ErrNotUpdateType = errors.New("applier: event is not update type") ErrNotInsertType = errors.New("applier: event is not insert type") ErrColumnArrayIsNil = errors.New("applier: column array is nil") ErrRowArrayIsNil = errors.New("applier: row array is nil") ErrColumnIsNil = errors.New("applier: column array is nil") ErrValueIsNil = errors.New("applier: value is nil") ErrArrayLengthNotEqual = errors.New("applier: array length is not equal") ErrArgsIsNil = errors.New("applier: args is nil") ErrUnsupportedRdpMessage = errors.New("applier: unsupported rdp messages") ErrIncontinuousRdpMessage = errors.New("applier: incontinuous rdp messages") ErrNeedToPeekTrx = errors.New("applier: need to peek the trx") ErrUpdateCheckpointTable = errors.New("applier: update checkpoint table error") ErrColumnCountNotEqual = errors.New("applier: column count not equal") ErrTimeIsZero = errors.New("applier: time is zero") ErrNoPK = errors.New("applier: no primary key") ErrorRowsAffectedCount = errors.New("applier: the count of affected row is not expected") ErrStrategyNotSupport = errors.New("applier: this strategy of handling conflict is not support") ErrrUnexpect = errors.New("applier: unexpected error") ErrTimeType = errors.New("applier: time type is not expected") ErrEventType = errors.New("applier: event type is not expected") ErrLargeTrx = errors.New("applier: large trx has been filtered by rdp") ErrIllegalTrx = errors.New("applier: illegal trx that is not allowed") ErrNotEqual = errors.New("applier:args count not equal") ErrSkip = errors.New("applier: skip fast-path; continue as if unimplemented") ErrServerIdChange = errors.New("applier: server_id has changed") )
View Source
var (
CAQ_UNDEF_INDEX = ^uint64(0)
)
Functions ¶
func BuildEqualsComparison ¶
func BuildValueComparison ¶
func BuildValueComparison(column string, value string, comparisonSign ValueComparisonSign) (string, error)
func EscapeName ¶
Types ¶
type ApplierConfig ¶
type ApplierConfig struct { KafkaBrokerList string KafkaTopic string KafkaPartition int KafkaVersion string KafkaOffset int64 // MySQL 配置 MysqlHost string MysqlPort int MysqlUser string MysqlPasswd string MysqlCharset string MysqlConnectTimeout int MysqlReadTimeout int MysqlWriteTimeout int MysqlWaitTimeout int // 冲突处理 HandleConflictStrategy string //时间冲突处理比较字段 UpdateTimeColumn string // 是否允许DDL AllowDDL bool // 最大消费速率, kb/s MaxRate int // 乱序pkg数量累计一定程度后的告警阈值 IncontinuousRdpPkgThreshold int // 慢事务的时间阈值 SlowTrxThreshold int }
func (*ApplierConfig) CheckArgs ¶
func (cfg *ApplierConfig) CheckArgs()
type ApplierServer ¶
type ApplierServer struct {
// contains filtered or unexported fields
}
func NewApplierServer ¶
func NewApplierServer(config *ApplierConfig) *ApplierServer
func (*ApplierServer) QuitNotify ¶
func (o *ApplierServer) QuitNotify() <-chan struct{}
func (*ApplierServer) Retry ¶
func (o *ApplierServer) Retry(gtid string)
func (*ApplierServer) RetryTrx ¶
func (o *ApplierServer) RetryTrx(gtid string)
func (*ApplierServer) SkipTrx ¶
func (o *ApplierServer) SkipTrx(gtid string)
func (*ApplierServer) Start ¶
func (o *ApplierServer) Start() error
func (*ApplierServer) Stop ¶
func (o *ApplierServer) Stop() error
type ApplyProgress ¶
type AssignedQueue ¶
type AssignedQueue struct {
// contains filtered or unexported fields
}
type Column ¶
type Column struct { // 名称 Name string // int, varchar, blob等 Type string BinlogType ColumnType // 内容 Value interface{} // 是否为pk IsPk bool // 是否为NULL IsNull bool }
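Column describes a single table column: its name, its type, its value, and whether it is part of the primary key or is NULL.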
type ColumnType ¶
type ColumnType byte
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
func NewCoordinator ¶
func NewCoordinator(config *ApplierConfig, trxC <-chan *Trx, retryC <-chan string, skipC <-chan string) *Coordinator
func (*Coordinator) QuitNotify ¶
func (o *Coordinator) QuitNotify() <-chan struct{}
func (*Coordinator) Start ¶
func (o *Coordinator) Start() error
func (*Coordinator) Stop ¶
func (o *Coordinator) Stop() error
type Event ¶
type Event struct { DatabaseName string TableName string EventType EventType // 数据库的binlog时间 Timestamp uint64 // RDP处理binlog的时间 TimestampOfReceipt uint64 // 在binlog文件中的位置 Position uint64 // 下一条binlog的位置 NextPosition uint64 // binlog文件名 BinlogFileName string ServerId uint64 // Query对应的SQL语句 SqlStatement string Rows []*Row }
func (*Event) BuildColumnsPreparedValues ¶
BuildColumnsPreparedValues builds the column information for an INSERT statement.
func (*Event) BuildDeleteSQL ¶
func (e *Event) BuildDeleteSQL(gtid string, beginOffset int64) ([]*ExecutableSQL, error)
func (*Event) BuildEqualsPreparedComparison ¶
BuildEqualsPreparedComparison builds the WHERE condition.
func (*Event) BuildInsertSQL ¶
func (e *Event) BuildInsertSQL(gtid string, beginOffset int64) ([]*ExecutableSQL, error)
func (*Event) BuildSetPreparedClause ¶
BuildSetPreparedClause builds the SET clause of an UPDATE statement.
func (*Event) BuildUpdateSQL ¶
func (e *Event) BuildUpdateSQL(gtid string, beginOffset int64) ([]*ExecutableSQL, error)
type ExecResult ¶
type ExecutableSQL ¶
type ExecutableSQL struct { Query string Args []interface{} TrxGtid string TrxBeginOffset int64 EventType EventType DatabaseName string TableName string RowValue *Row }
func (*ExecutableSQL) HandDeleteConflict ¶
func (s *ExecutableSQL) HandDeleteConflict(tx *sql.Tx, strategy StrategyType, updateTimeColumn string) error
func (*ExecutableSQL) HandInsertConflict ¶
func (s *ExecutableSQL) HandInsertConflict(tx *sql.Tx, strategy StrategyType, updateTimeColumn string) error
func (*ExecutableSQL) HandleUpdateConflict ¶
func (s *ExecutableSQL) HandleUpdateConflict(tx *sql.Tx, strategy StrategyType, updateTimeColumn string) error
type Fetcher ¶
type Fetcher struct {
// contains filtered or unexported fields
}
func NewFetcher ¶
func NewFetcher(config *ApplierConfig) *Fetcher
func (*Fetcher) NewTrxNotify ¶
func (*Fetcher) QuitNotify ¶
func (o *Fetcher) QuitNotify() <-chan struct{}
type RdpDecoder ¶
type RdpDecoder struct {
// contains filtered or unexported fields
}
func NewRdpDecoder ¶
func NewRdpDecoder(incontinuousRdpPkgThreshold int) *RdpDecoder
type ReorderWindow ¶
type ReorderWindow struct {
// contains filtered or unexported fields
}
type StrategyType ¶
type StrategyType string
type Trx ¶
type Trx struct { // kafka topic Topic string `json:"topic"` // kafka offset BeginOffset int64 `json:"begin_offset"` EndOffset int64 `json:"end_offset"` // LwmOffset 表示该kafka offset之前(含)的事务都已经执行过 LwmOffset int64 `json:"lwm_offset"` Gtid string `json:"gtid"` // 一个Gtid范围内,有过多的event时,导致拆包的序列号,从0、1、2... Seq uint64 `json:"-"` // 在binlog文件中的位置 Position uint64 `json:"position"` // binlog文件名 BinlogFileName string `json:"binlog_file_name"` // 下一条binlog的位置 NextPosition uint64 `json:"-"` // binlog文件名 NextBinlogFileName string `json:"-"` // 组提交id LastCommitted int64 `json:"last_committed"` SequenceNumber int64 `json:"sequence_number"` // 事务产生的时间,取自GTID Timestamp uint64 `json:"timestamp"` // 重试次数 RetryTimes uint32 `json:"retry_times"` Events []*Event `json:"events"` // contains filtered or unexported fields }
Trx holds the data of a transaction.
func (*Trx) BuildTransactionSQL ¶
func (tx *Trx) BuildTransactionSQL() ([]*ExecutableSQL, error)
type ValueComparisonSign ¶
type ValueComparisonSign string
const ( EqualsComparisonSign ValueComparisonSign = "=" NullEqualsComparisonSign ValueComparisonSign = "is" )
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| rdp_messages | Package rdp_messages is a generated protocol buffer package. |
Click to show internal directories.
Click to hide internal directories.