applier

package
v0.0.0-...-cc902e5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2018 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MYSQL_TYPE_DECIMAL ColumnType = iota
	MYSQL_TYPE_TINY
	MYSQL_TYPE_SHORT
	MYSQL_TYPE_LONG
	MYSQL_TYPE_FLOAT
	MYSQL_TYPE_DOUBLE
	MYSQL_TYPE_NULL
	MYSQL_TYPE_TIMESTAMP
	MYSQL_TYPE_LONGLONG
	MYSQL_TYPE_INT24
	MYSQL_TYPE_DATE
	MYSQL_TYPE_TIME
	MYSQL_TYPE_DATETIME
	MYSQL_TYPE_YEAR
	MYSQL_TYPE_NEWDATE
	MYSQL_TYPE_VARCHAR
	MYSQL_TYPE_BIT
	MYSQL_TYPE_TIMESTAMP2
	MYSQL_TYPE_DATETIME2
	MYSQL_TYPE_TIME2

	MYSQL_TYPE_JSON        = 245
	MYSQL_TYPE_NEWDECIMAL  = 246
	MYSQL_TYPE_ENUM        = 247
	MYSQL_TYPE_SET         = 248
	MYSQL_TYPE_TINY_BLOB   = 249
	MYSQL_TYPE_MEDIUM_BLOB = 250
	MYSQL_TYPE_LONG_BLOB   = 251
	MYSQL_TYPE_BLOB        = 252
	MYSQL_TYPE_VAR_STRING  = 253
	MYSQL_TYPE_STRING      = 254
	MYSQL_TYPE_GEOMETRY    = 255
)
View Source
const (
	StatusKey_ApplyProgress = "apply_progress"
	StatusKey_FailedTrx     = "failed_trx"
)
View Source
const (
	//基于最新时间优先策略,如果时间相等则利用Applier中的记录覆盖原记录
	TimeOverwriteStrategy = "time_overwrite"
	//基于最新时间优先策略,如果时间相等则忽略Applier中的记录
	TimeIgnoreStrategy = "time_ignore"
	IgnoreStrategy     = "ignore"
	OverwriteStrategy  = "overwrite"
)
View Source
const (
	QUERY_EVENT       EventType = 2
	WRITE_ROWS_EVENT            = 30
	UPDATE_ROWS_EVENT           = 31
	DELETE_ROWS_EVENT           = 32
	GTID_LOG_EVENT              = 33
)
View Source
const (
	CheckpointTableCount = 1000
)
View Source
const (
	CheckpointTablePrefix = "drc_ckt_"
)

Variables

View Source
var (
	CheckpointDatabase = ""
	CheckpointTable    = ""
)
View Source
var (
	ErrNotDeleteType          = errors.New("applier: event is not delete type")
	ErrNotUpdateType          = errors.New("applier: event is not update type")
	ErrNotInsertType          = errors.New("applier: event is not insert type")
	ErrColumnArrayIsNil       = errors.New("applier: column array is nil")
	ErrRowArrayIsNil          = errors.New("applier: row array is nil")
	ErrColumnIsNil            = errors.New("applier: column array is nil")
	ErrValueIsNil             = errors.New("applier: value is nil")
	ErrArrayLengthNotEqual    = errors.New("applier: array length is not equal")
	ErrArgsIsNil              = errors.New("applier: args is nil")
	ErrUnsupportedRdpMessage  = errors.New("applier: unsupported rdp messages")
	ErrIncontinuousRdpMessage = errors.New("applier: incontinuous rdp messages")
	ErrNeedToPeekTrx          = errors.New("applier: need to peek the trx")
	ErrUpdateCheckpointTable  = errors.New("applier: update checkpoint table error")
	ErrColumnCountNotEqual    = errors.New("applier: column count not equal")
	ErrTimeIsZero             = errors.New("applier: time is zero")
	ErrNoPK                   = errors.New("applier: no primary key")
	ErrorRowsAffectedCount    = errors.New("applier: the count of affected row is not expected")
	ErrStrategyNotSupport     = errors.New("applier: this strategy of handling conflict is not support")
	ErrrUnexpect              = errors.New("applier: unexpected error")
	ErrTimeType               = errors.New("applier: time type is not expected")
	ErrEventType              = errors.New("applier: event type is not expected")
	ErrLargeTrx               = errors.New("applier: large trx has been filtered by rdp")
	ErrIllegalTrx             = errors.New("applier: illegal trx that is not allowed")
	ErrNotEqual               = errors.New("applier:args count not equal")
	ErrSkip                   = errors.New("applier: skip fast-path; continue as if unimplemented")
	ErrServerIdChange         = errors.New("applier: server_id has changed")
)
View Source
var (
	CAQ_UNDEF_INDEX = ^uint64(0)
)

Functions

func BuildEqualsComparison

func BuildEqualsComparison(columns []*Column, values []string) (string, error)

func BuildValueComparison

func BuildValueComparison(column string, value string, comparisonSign ValueComparisonSign) (string, error)

func EscapeName

func EscapeName(name string) string

func ExecBinlogTrx

func ExecBinlogTrx(ctx context.Context, opts *ExecOpt, binlogTrx *Trx)

func HasPk

func HasPk(columns []*Column) bool

func SetCheckpointDBandTableName

func SetCheckpointDBandTableName(groupId string) error

Types

type ApplierConfig

type ApplierConfig struct {
	KafkaBrokerList string
	KafkaTopic      string
	KafkaPartition  int
	KafkaVersion    string
	KafkaOffset     int64
	// MySQL 配置
	MysqlHost           string
	MysqlPort           int
	MysqlUser           string
	MysqlPasswd         string
	MysqlCharset        string
	MysqlConnectTimeout int
	MysqlReadTimeout    int
	MysqlWriteTimeout   int
	MysqlWaitTimeout    int
	// 冲突处理
	HandleConflictStrategy string
	//时间冲突处理比较字段
	UpdateTimeColumn string
	// 是否允许DDL
	AllowDDL bool
	// 最大消费速率, kb/s
	MaxRate int
	// 乱序pkg数量累计一定程度后的告警阈值
	IncontinuousRdpPkgThreshold int
	// 慢事务的时间阈值
	SlowTrxThreshold int
}

func (*ApplierConfig) CheckArgs

func (cfg *ApplierConfig) CheckArgs()

type ApplierServer

type ApplierServer struct {
	// contains filtered or unexported fields
}

func NewApplierServer

func NewApplierServer(config *ApplierConfig) *ApplierServer

func (*ApplierServer) QuitNotify

func (o *ApplierServer) QuitNotify() <-chan struct{}

func (*ApplierServer) Retry

func (o *ApplierServer) Retry(gtid string)

func (*ApplierServer) RetryTrx

func (o *ApplierServer) RetryTrx(gtid string)

func (*ApplierServer) SkipTrx

func (o *ApplierServer) SkipTrx(gtid string)

func (*ApplierServer) Start

func (o *ApplierServer) Start() error

func (*ApplierServer) Stop

func (o *ApplierServer) Stop() error

type ApplyProgress

type ApplyProgress struct {
	// 已经成功执行的offset
	Offset         int64  `json:"kafka_offset"`
	Gtid           string `json:"gtid"`
	Position       uint64 `json:"binlog_file_position"`
	BinlogFileName string `json:"binlog_file_name"`
}

type AssignedQueue

type AssignedQueue struct {
	// contains filtered or unexported fields
}

type Column

type Column struct {
	// 名称
	Name string
	// int, varchar, blob等
	Type       string
	BinlogType ColumnType
	// 内容
	Value interface{}
	// 是否为pk
	IsPk bool
	// 是否为NULL
	IsNull bool
}

Column

type ColumnType

type ColumnType byte

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

func NewCoordinator

func NewCoordinator(config *ApplierConfig, trxC <-chan *Trx, retryC <-chan string, skipC <-chan string) *Coordinator

func (*Coordinator) QuitNotify

func (o *Coordinator) QuitNotify() <-chan struct{}

func (*Coordinator) Start

func (o *Coordinator) Start() error

func (*Coordinator) Stop

func (o *Coordinator) Stop() error

type Event

type Event struct {
	DatabaseName string
	TableName    string
	EventType    EventType
	// 数据库的binlog时间
	Timestamp uint64
	// RDP处理binlog的时间
	TimestampOfReceipt uint64
	// 在binlog文件中的位置
	Position uint64
	// 下一条binlog的位置
	NextPosition uint64
	// binlog文件名
	BinlogFileName string
	ServerId       uint64
	// Query对应的SQL语句
	SqlStatement string
	Rows         []*Row
}

func (*Event) BuildColumnsPreparedValues

func (e *Event) BuildColumnsPreparedValues() (string, error)

构造insert中列信息

func (*Event) BuildDeleteSQL

func (e *Event) BuildDeleteSQL(gtid string, beginOffset int64) ([]*ExecutableSQL, error)

func (*Event) BuildEqualsPreparedComparison

func (e *Event) BuildEqualsPreparedComparison(i int) (string, error)

构造where条件

func (*Event) BuildInsertSQL

func (e *Event) BuildInsertSQL(gtid string, beginOffset int64) ([]*ExecutableSQL, error)

func (*Event) BuildSetPreparedClause

func (e *Event) BuildSetPreparedClause() (string, error)

构造update中set子句

func (*Event) BuildUpdateSQL

func (e *Event) BuildUpdateSQL(gtid string, beginOffset int64) ([]*ExecutableSQL, error)

type EventType

type EventType uint32

type ExecOpt

type ExecOpt struct {
	// contains filtered or unexported fields
}

type ExecResult

type ExecResult struct {
	Err error  `json:"-"`
	Msg string `json:"msg"`
	Trx *Trx   `json:"trx"`
}

type ExecutableSQL

type ExecutableSQL struct {
	Query string
	Args  []interface{}

	TrxGtid        string
	TrxBeginOffset int64

	EventType    EventType
	DatabaseName string
	TableName    string
	RowValue     *Row
}

func (*ExecutableSQL) HandDeleteConflict

func (s *ExecutableSQL) HandDeleteConflict(tx *sql.Tx, strategy StrategyType, updateTimeColumn string) error

func (*ExecutableSQL) HandInsertConflict

func (s *ExecutableSQL) HandInsertConflict(tx *sql.Tx, strategy StrategyType, updateTimeColumn string) error

func (*ExecutableSQL) HandleUpdateConflict

func (s *ExecutableSQL) HandleUpdateConflict(tx *sql.Tx, strategy StrategyType, updateTimeColumn string) error

type Fetcher

type Fetcher struct {
	// contains filtered or unexported fields
}

func NewFetcher

func NewFetcher(config *ApplierConfig) *Fetcher

func (*Fetcher) NewTrxNotify

func (o *Fetcher) NewTrxNotify() <-chan *Trx

func (*Fetcher) QuitNotify

func (o *Fetcher) QuitNotify() <-chan struct{}

func (*Fetcher) Start

func (o *Fetcher) Start() error

func (*Fetcher) Stop

func (o *Fetcher) Stop() error

type PkgSn

type PkgSn struct {
	// contains filtered or unexported fields
}

type RdpDecoder

type RdpDecoder struct {
	// contains filtered or unexported fields
}

func NewRdpDecoder

func NewRdpDecoder(incontinuousRdpPkgThreshold int) *RdpDecoder

type RdpPkg

type RdpPkg struct {
	rdp_messages.KafkaPkg
	Topic  string
	Offset int64
}

写入kafka数据

type ReorderWindow

type ReorderWindow struct {
	// contains filtered or unexported fields
}

type Row

type Row struct {
	// Delete情况下,只有before
	Before []*Column
	// Insert情况下,只有after
	// Update情况下,before和after成对出现
	After []*Column
}

type StrategyType

type StrategyType string

type Trx

type Trx struct {
	// kafka topic
	Topic string `json:"topic"`
	// kafka offset
	BeginOffset int64 `json:"begin_offset"`
	EndOffset   int64 `json:"end_offset"`
	// LwmOffset 表示该kafka offset之前(含)的事务都已经执行过
	LwmOffset int64 `json:"lwm_offset"`

	Gtid string `json:"gtid"`
	// 一个Gtid范围内,有过多的event时,导致拆包的序列号,从0、1、2...
	Seq uint64 `json:"-"`
	// 在binlog文件中的位置
	Position uint64 `json:"position"`
	// binlog文件名
	BinlogFileName string `json:"binlog_file_name"`
	// 下一条binlog的位置
	NextPosition uint64 `json:"-"`
	// binlog文件名
	NextBinlogFileName string `json:"-"`
	// 组提交id
	LastCommitted  int64 `json:"last_committed"`
	SequenceNumber int64 `json:"sequence_number"`

	// 事务产生的时间,取自GTID
	Timestamp uint64 `json:"timestamp"`

	// 重试次数
	RetryTimes uint32 `json:"retry_times"`

	Events []*Event `json:"events"`
	// contains filtered or unexported fields
}

Transaction 数据

func (*Trx) BuildTransactionSQL

func (tx *Trx) BuildTransactionSQL() ([]*ExecutableSQL, error)

type ValueComparisonSign

type ValueComparisonSign string
const (
	EqualsComparisonSign     ValueComparisonSign = "="
	NullEqualsComparisonSign ValueComparisonSign = "is"
)

Directories

Path Synopsis
Package rdp_messages is a generated protocol buffer package.
Package rdp_messages is a generated protocol buffer package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL