replication

package
v1.7.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 29, 2023 License: MIT Imports: 29 Imported by: 0

Documentation

Overview

Package replication handles the MySQL replication protocol.

Todo:

+ Get table information when handling rows events.

Index

Constants

View Source
const (
	LOG_EVENT_BINLOG_IN_USE_F            uint16 = 0x0001
	LOG_EVENT_FORCED_ROTATE_F            uint16 = 0x0002
	LOG_EVENT_THREAD_SPECIFIC_F          uint16 = 0x0004
	LOG_EVENT_SUPPRESS_USE_F             uint16 = 0x0008
	LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F uint16 = 0x0010
	LOG_EVENT_ARTIFICIAL_F               uint16 = 0x0020
	LOG_EVENT_RELAY_LOG_F                uint16 = 0x0040
	LOG_EVENT_IGNORABLE_F                uint16 = 0x0080
	LOG_EVENT_NO_FILTER_F                uint16 = 0x0100
	LOG_EVENT_MTS_ISOLATE_F              uint16 = 0x0200
)
View Source
// Flag values for the binlog dump command (see BinlogSyncerConfig.DumpCommandFlag).
//
// Note that BINLOG_SEND_ANNOTATE_ROWS_EVENT and BINLOG_THROUGH_POSITION share
// the same numeric value (0x02), so they cannot be told apart by value alone.
const (
	BINLOG_DUMP_NEVER_STOP          uint16 = 0x00
	BINLOG_DUMP_NON_BLOCK           uint16 = 0x01
	BINLOG_SEND_ANNOTATE_ROWS_EVENT uint16 = 0x02
	BINLOG_THROUGH_POSITION         uint16 = 0x02
	BINLOG_THROUGH_GTID             uint16 = 0x04
)
View Source
const (
	BINLOG_ROW_IMAGE_FULL    = "FULL"
	BINLOG_ROW_IMAGE_MINIMAL = "MINIMAL"
	BINLOG_ROW_IMAGE_NOBLOB  = "NOBLOB"
)
View Source
// MariaDB binlog flag bits. Each constant is the next power of two
// (1 << iota): 1, 2, 4, 8, 16, 32, so the flags can be combined in one byte.
// NOTE(review): presumably tested against MariadbGTIDEvent.Flags — confirm.
const (
	BINLOG_MARIADB_FL_STANDALONE      = 1 << iota /*1  - FL_STANDALONE is set when there is no terminating COMMIT event*/
	BINLOG_MARIADB_FL_GROUP_COMMIT_ID             /*2  - FL_GROUP_COMMIT_ID is set when event group is part of a group commit on the master. Groups with same commit_id are part of the same group commit.*/
	BINLOG_MARIADB_FL_TRANSACTIONAL               /*4  - FL_TRANSACTIONAL is set for an event group that can be safely rolled back (no MyISAM, e.g.).*/
	BINLOG_MARIADB_FL_ALLOW_PARALLEL              /*8  - FL_ALLOW_PARALLEL reflects the (negation of the) value of @@SESSION.skip_parallel_replication at the time of commit*/
	BINLOG_MARIADB_FL_WAITED                      /*16 - FL_WAITED is set if a row lock wait (or other wait) is detected during the execution of the transaction*/
	BINLOG_MARIADB_FL_DDL                         /*32 - FL_DDL is set for event group containing DDL*/
)
View Source
const (
	BINLOG_CHECKSUM_ALG_OFF byte = 0 // Events are without checksum though its generator
	// is checksum-capable New Master (NM).
	BINLOG_CHECKSUM_ALG_CRC32 byte = 1 // CRC32 of zlib algorithm.
	//  BINLOG_CHECKSUM_ALG_ENUM_END,  // the cut line: valid alg range is [1, 0x7f].
	BINLOG_CHECKSUM_ALG_UNDEF byte = 255 // special value to tag undetermined yet checksum

)
View Source
const (
	TABLE_MAP_OPT_META_SIGNEDNESS byte = iota + 1
	TABLE_MAP_OPT_META_DEFAULT_CHARSET
	TABLE_MAP_OPT_META_COLUMN_CHARSET
	TABLE_MAP_OPT_META_COLUMN_NAME
	TABLE_MAP_OPT_META_SET_STR_VALUE
	TABLE_MAP_OPT_META_ENUM_STR_VALUE
	TABLE_MAP_OPT_META_GEOMETRY_TYPE
	TABLE_MAP_OPT_META_SIMPLE_PRIMARY_KEY
	TABLE_MAP_OPT_META_PRIMARY_KEY_WITH_PREFIX
	TABLE_MAP_OPT_META_ENUM_AND_SET_DEFAULT_CHARSET
	TABLE_MAP_OPT_META_ENUM_AND_SET_COLUMN_CHARSET
	TABLE_MAP_OPT_META_COLUMN_VISIBILITY
)

These are the optional metadata field types of TABLE_MAP_EVENT, from: libbinlogevents/include/rows_event.h

View Source
const (
	ENUM_EXTRA_ROW_INFO_TYPECODE_NDB byte = iota
	ENUM_EXTRA_ROW_INFO_TYPECODE_PARTITION
)
View Source
const (
	EventHeaderSize            = 19
	SidLength                  = 16
	LogicalTimestampTypeCode   = 2
	PartLogicalTimestampLength = 8
	BinlogChecksumLength       = 4
	UndefinedServerVer         = 999999 // UNDEFINED_SERVER_VERSION
)
View Source
// JSONB type codes. The first thirteen values are assigned sequentially by
// iota (0x00 through 0x0c); JSONB_OPAQUE is set explicitly to 0x0f, leaving
// 0x0d and 0x0e unassigned in this list.
const (
	JSONB_SMALL_OBJECT byte = iota // small JSON object
	JSONB_LARGE_OBJECT             // large JSON object
	JSONB_SMALL_ARRAY              // small JSON array
	JSONB_LARGE_ARRAY              // large JSON array
	JSONB_LITERAL                  // literal (true/false/null)
	JSONB_INT16                    // int16
	JSONB_UINT16                   // uint16
	JSONB_INT32                    // int32
	JSONB_UINT32                   // uint32
	JSONB_INT64                    // int64
	JSONB_UINT64                   // uint64
	JSONB_DOUBLE                   // double
	JSONB_STRING                   // string
	JSONB_OPAQUE       byte = 0x0f // custom data (any MySQL data type)
)
View Source
const (
	JSONB_NULL_LITERAL  byte = 0x00
	JSONB_TRUE_LITERAL  byte = 0x01
	JSONB_FALSE_LITERAL byte = 0x02
)
View Source
const (
	// The JSON value in the given path is replaced with a new value.
	//
	// It has the same effect as `JSON_REPLACE(col, path, value)`.
	JsonDiffOperationReplace = JsonDiffOperation(iota)

	// Add a new element at the given path.
	//
	//  If the path specifies an array element, it has the same effect as `JSON_ARRAY_INSERT(col, path, value)`.
	//
	//  If the path specifies an object member, it has the same effect as `JSON_INSERT(col, path, value)`.
	JsonDiffOperationInsert

	// The JSON value at the given path is removed from an array or object.
	//
	// It has the same effect as `JSON_REMOVE(col, path)`.
	JsonDiffOperationRemove
)
View Source
const (
	EnumRowImageTypeWriteAI = EnumRowImageType(iota)
	EnumRowImageTypeUpdateBI
	EnumRowImageTypeUpdateAI
	EnumRowImageTypeDeleteBI
)
View Source
const (
	OTW_PAYLOAD_HEADER_END_MARK = iota
	OTW_PAYLOAD_SIZE_FIELD
	OTW_PAYLOAD_COMPRESSION_TYPE_FIELD
	OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD
)

On The Wire: Field Types. See also binary_log::codecs::binary::Transaction_payload::fields in MySQL: https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1codecs_1_1binary_1_1Transaction__payload.html#a9fff7ac12ba064f40e9216565c53d07b

View Source
const (
	ZSTD = 0
	NONE = 255
)

Compression Types

View Source
const DATETIMEF_INT_OFS int64 = 0x8000000000
View Source
const (
	// Store JSON updates in partial form
	EnumBinlogRowValueOptionsPartialJsonUpdates = EnumBinlogRowValueOptions(iota + 1)
)
View Source
const (
	// MinBinlogVersion: only the MySQL 5.0.0+ binlog format is supported
	// (the original author's note marks this as uncertain).
	MinBinlogVersion = 4
)
View Source
const RowsEventStmtEndFlag = 0x01

RowsEventStmtEndFlag is set at the end of the statement.

View Source
const TIMEF_INT_OFS int64 = 0x800000
View Source
const TIMEF_OFS int64 = 0x800000000000

Variables

View Source
var (
	ErrNeedSyncAgain = errors.New("Last sync error or closed, try sync and get event again")
	ErrSyncClosed    = errors.New("Sync was closed")
)
View Source
var (
	// BinLogFileHeader is the binlog file header: the byte 0xfe followed by
	// the ASCII bytes of "bin" (0x62, 0x69, 0x6e).
	BinLogFileHeader = []byte{0xfe, 0x62, 0x69, 0x6e}

	// NOTE(review): presumably the marker byte that identifies semi-sync
	// packets — confirm against its usage in the syncer.
	SemiSyncIndicator byte = 0xef
)
View Source
var (
	// ErrChecksumMismatch indicates binlog checksum mismatch.
	ErrChecksumMismatch = errors.New("binlog checksum mismatch, data may be corrupted")
)
View Source
var (
	ErrCorruptedJSONDiff = fmt.Errorf("corrupted JSON diff") // ER_CORRUPTED_JSON_DIFF
)

Functions

This section is empty.

Types

type BeginLoadQueryEvent

type BeginLoadQueryEvent struct {
	FileID    uint32
	BlockData []byte
}

func (*BeginLoadQueryEvent) Decode

func (e *BeginLoadQueryEvent) Decode(data []byte) error

func (*BeginLoadQueryEvent) Dump

func (e *BeginLoadQueryEvent) Dump(w io.Writer)

type BinlogEvent

type BinlogEvent struct {
	// raw binlog data which contains all data, including binlog header and event body, and including crc32 checksum if exists
	RawData []byte

	Header *EventHeader
	Event  Event
}

func (*BinlogEvent) Dump

func (e *BinlogEvent) Dump(w io.Writer)

type BinlogParser

type BinlogParser struct {
	// contains filtered or unexported fields
}

func NewBinlogParser

func NewBinlogParser() *BinlogParser

func (*BinlogParser) Parse

func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error)

Parse: Given the bytes for a binary log event, return the decoded event. With the exception of the FORMAT_DESCRIPTION_EVENT event type, a FORMAT_DESCRIPTION_EVENT must previously have been passed into the parser for this to work properly on any given event. Passing a new FORMAT_DESCRIPTION_EVENT into the parser will replace an existing one.

func (*BinlogParser) ParseFile

func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc) error

func (*BinlogParser) ParseReader

func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error

func (*BinlogParser) ParseSingleEvent

func (p *BinlogParser) ParseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool, error)

ParseSingleEvent parses single binlog event and passes the event to onEvent function.

func (*BinlogParser) Reset

func (p *BinlogParser) Reset()

func (*BinlogParser) Resume

func (p *BinlogParser) Resume()

func (*BinlogParser) SetFlavor

func (p *BinlogParser) SetFlavor(flavor string)

func (*BinlogParser) SetIgnoreJSONDecodeError

func (p *BinlogParser) SetIgnoreJSONDecodeError(ignoreJSONDecodeErr bool)

func (*BinlogParser) SetParseTime

func (p *BinlogParser) SetParseTime(parseTime bool)

func (*BinlogParser) SetRawMode

func (p *BinlogParser) SetRawMode(mode bool)

func (*BinlogParser) SetRowsEventDecodeFunc

func (p *BinlogParser) SetRowsEventDecodeFunc(rowsEventDecodeFunc func(*RowsEvent, []byte) error)

func (*BinlogParser) SetTableMapOptionalMetaDecodeFunc

func (p *BinlogParser) SetTableMapOptionalMetaDecodeFunc(tableMapOptionalMetaDecondeFunc func([]byte) error)

func (*BinlogParser) SetTimestampStringLocation

func (p *BinlogParser) SetTimestampStringLocation(timestampStringLocation *time.Location)

func (*BinlogParser) SetUseDecimal

func (p *BinlogParser) SetUseDecimal(useDecimal bool)

func (*BinlogParser) SetVerifyChecksum

func (p *BinlogParser) SetVerifyChecksum(verify bool)

func (*BinlogParser) Stop

func (p *BinlogParser) Stop()

type BinlogStreamer

type BinlogStreamer struct {
	// contains filtered or unexported fields
}

BinlogStreamer gets the streaming event.

func NewBinlogStreamer

func NewBinlogStreamer() *BinlogStreamer

func NewBinlogStreamerWithChanSize

func NewBinlogStreamerWithChanSize(chanSize int) *BinlogStreamer

func (*BinlogStreamer) AddErrorToStreamer

func (s *BinlogStreamer) AddErrorToStreamer(err error) bool

AddErrorToStreamer adds an error to the streamer.

func (*BinlogStreamer) AddEventToStreamer

func (s *BinlogStreamer) AddEventToStreamer(ev *BinlogEvent) error

AddEventToStreamer adds a binlog event to the streamer. You can use it when you want to add an event to the streamer manually. It can be used in replication handlers.

func (*BinlogStreamer) DumpEvents

func (s *BinlogStreamer) DumpEvents() []*BinlogEvent

DumpEvents dumps all remaining events.

func (*BinlogStreamer) GetEvent

func (s *BinlogStreamer) GetEvent(ctx context.Context) (*BinlogEvent, error)

GetEvent gets binlog events one by one. It blocks until the Syncer receives an event from MySQL or meets a sync error. You can pass a context (like Cancel or Timeout) to break the block.

func (*BinlogStreamer) GetEventWithStartTime

func (s *BinlogStreamer) GetEventWithStartTime(ctx context.Context, startTime time.Time) (*BinlogEvent, error)

GetEventWithStartTime gets the binlog event with a start time. If the current binlog event's timestamp is smaller than the specified startTime, a nil event is returned.

type BinlogSyncer

type BinlogSyncer struct {
	// contains filtered or unexported fields
}

BinlogSyncer syncs binlog event from server.

func NewBinlogSyncer

func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer

NewBinlogSyncer creates the BinlogSyncer with cfg.

func (*BinlogSyncer) Close

func (b *BinlogSyncer) Close()

Close closes the BinlogSyncer.

func (*BinlogSyncer) GetNextPosition

func (b *BinlogSyncer) GetNextPosition() Position

GetNextPosition returns the next position of the syncer

func (*BinlogSyncer) LastConnectionID

func (b *BinlogSyncer) LastConnectionID() uint32

LastConnectionID returns last connectionID.

func (*BinlogSyncer) StartBackup

func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error

StartBackup: Like mysqlbinlog remote raw backup. Backs up the remote binlog from position (filename, offset) and writes it to backupDir.

func (*BinlogSyncer) StartSync

func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error)

StartSync starts syncing from the `pos` position.

func (*BinlogSyncer) StartSyncGTID

func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error)

StartSyncGTID starts syncing from the `gset` GTIDSet.

type BinlogSyncerConfig

type BinlogSyncerConfig struct {
	// ServerID is the unique ID in cluster.
	ServerID uint32
	// Flavor is "mysql" or "mariadb", if not set, use "mysql" default.
	Flavor string

	// Host is for MySQL server host.
	Host string
	// Port is for MySQL server port.
	Port uint16
	// User is for MySQL user.
	User string
	// Password is for MySQL password.
	Password string

	// Localhost is the local hostname used when registering as a slave.
	// If not set, use os.Hostname() instead.
	Localhost string

	// Charset is for MySQL client character set
	Charset string

	// SemiSyncEnabled enables semi-sync or not.
	SemiSyncEnabled bool

	// RawModeEnabled is for not parsing binlog event.
	RawModeEnabled bool

	// If not nil, use the provided tls.Config to connect to the database using TLS/SSL.
	TLSConfig *tls.Config

	// Use replication.Time structure for timestamp and datetime.
	// We will use Local location for timestamp and UTC location for datetime.
	ParseTime bool

	// If ParseTime is false, convert TIMESTAMP into this specified timezone. If
	// ParseTime is true, this option will have no effect and TIMESTAMP data will
	// be parsed into the local timezone and a full time.Time struct will be
	// returned.
	//
	// Note that MySQL TIMESTAMP columns are offset from the machine local
	// timezone while DATETIME columns are offset from UTC. This is consistent
	// with documented MySQL behaviour as it returns TIMESTAMP in local timezone
	// and DATETIME in UTC.
	//
	// Setting this to UTC effectively equalizes the TIMESTAMP and DATETIME time
	// strings obtained from MySQL.
	TimestampStringLocation *time.Location

	// Use decimal.Decimal structure for decimals.
	UseDecimal bool

	// RecvBufferSize sets the size in bytes of the operating system's receive buffer associated with the connection.
	RecvBufferSize int

	// master heartbeat period
	HeartbeatPeriod time.Duration

	// read timeout
	ReadTimeout time.Duration

	// maximum number of attempts to re-establish a broken connection, zero or negative number means infinite retry.
	// this configuration will not work if DisableRetrySync is true
	MaxReconnectAttempts int

	// whether to disable re-sync for a broken connection
	DisableRetrySync bool

	// Only works when MySQL/MariaDB variable binlog_checksum=CRC32.
	// For MySQL, binlog_checksum was introduced since 5.6.2, but CRC32 was set as default value since 5.6.6 .
	// https://dev.mysql.com/doc/refman/5.6/en/replication-options-binary-log.html#option_mysqld_binlog-checksum
	// For MariaDB, binlog_checksum was introduced since MariaDB 5.3, but CRC32 was set as default value since MariaDB 10.2.1 .
	// https://mariadb.com/kb/en/library/replication-and-binary-log-server-system-variables/#binlog_checksum
	VerifyChecksum bool

	// DumpCommandFlag is used to send binlog dump command. Default 0, aka BINLOG_DUMP_NEVER_STOP.
	// For MySQL, BINLOG_DUMP_NEVER_STOP and BINLOG_DUMP_NON_BLOCK are available.
	// https://dev.mysql.com/doc/internals/en/com-binlog-dump.html#binlog-dump-non-block
	// For MariaDB, BINLOG_DUMP_NEVER_STOP, BINLOG_DUMP_NON_BLOCK and BINLOG_SEND_ANNOTATE_ROWS_EVENT are available.
	// https://mariadb.com/kb/en/library/com_binlog_dump/
	// https://mariadb.com/kb/en/library/annotate_rows_event/
	DumpCommandFlag uint16

	// Option is a function used to apply settings outside of BinlogSyncerConfig, between the MySQL connection and COM_REGISTER_SLAVE.
	// For MariaDB: slave_gtid_ignore_duplicates, skip_replication, slave_until_gtid.
	Option func(*client.Conn) error

	// Set Logger
	Logger loggers.Advanced

	// Set Dialer
	Dialer client.Dialer

	// NOTE(review): same signature as the argument of
	// BinlogParser.SetRowsEventDecodeFunc; presumably forwarded to the parser — confirm.
	RowsEventDecodeFunc func(*RowsEvent, []byte) error

	// NOTE(review): same signature as the argument of
	// BinlogParser.SetTableMapOptionalMetaDecodeFunc; presumably forwarded to the parser — confirm.
	TableMapOptionalMetaDecodeFunc func([]byte) error

	DiscardGTIDSet bool

	EventCacheCount int
}

BinlogSyncerConfig is the configuration for BinlogSyncer.

type EnumBinlogRowValueOptions

type EnumBinlogRowValueOptions byte

Bits for binlog_row_value_options sysvar

type EnumRowImageType

type EnumRowImageType byte

EnumRowImageType lists the allowed types for every row in the MySQL binlog. See https://github.com/mysql/mysql-server/blob/1bfe02bdad6604d54913c62614bde57a055c8332/sql/rpl_record.h#L39: `enum class enum_row_image_type { WRITE_AI, UPDATE_BI, UPDATE_AI, DELETE_BI };`

func (EnumRowImageType) String

func (t EnumRowImageType) String() string

type Event

// Event is the interface implemented by the binlog event types in this
// package; BinlogEvent stores its event body as an Event.
type Event interface {
	// Dump writes the event to w, in a format like python-mysql-replication.
	Dump(w io.Writer)

	// Decode takes the raw event bytes in data and reports any error.
	// NOTE(review): presumably populates the receiver from data — confirm.
	Decode(data []byte) error
}

type EventError

// EventError is an error type (it has an Error method) that bundles an error
// message with an event header and event data.
// NOTE(review): presumably these belong to the event that failed — confirm.
type EventError struct {
	// Header is the event header.
	Header *EventHeader

	// Err is the error message.
	Err string

	// Data is the event data.
	Data []byte
}

func (*EventError) Error

func (e *EventError) Error() string

type EventHeader

type EventHeader struct {
	Timestamp uint32
	EventType EventType
	ServerID  uint32
	EventSize uint32
	LogPos    uint32
	Flags     uint16
}

func (*EventHeader) Decode

func (h *EventHeader) Decode(data []byte) error

func (*EventHeader) Dump

func (h *EventHeader) Dump(w io.Writer)

type EventType

type EventType byte
const (
	UNKNOWN_EVENT EventType = iota
	START_EVENT_V3
	QUERY_EVENT
	STOP_EVENT
	ROTATE_EVENT
	INTVAR_EVENT
	LOAD_EVENT
	SLAVE_EVENT
	CREATE_FILE_EVENT
	APPEND_BLOCK_EVENT
	EXEC_LOAD_EVENT
	DELETE_FILE_EVENT
	NEW_LOAD_EVENT
	RAND_EVENT
	USER_VAR_EVENT
	FORMAT_DESCRIPTION_EVENT
	XID_EVENT
	BEGIN_LOAD_QUERY_EVENT
	EXECUTE_LOAD_QUERY_EVENT
	TABLE_MAP_EVENT
	WRITE_ROWS_EVENTv0
	UPDATE_ROWS_EVENTv0
	DELETE_ROWS_EVENTv0
	WRITE_ROWS_EVENTv1
	UPDATE_ROWS_EVENTv1
	DELETE_ROWS_EVENTv1
	INCIDENT_EVENT
	HEARTBEAT_EVENT
	IGNORABLE_EVENT
	ROWS_QUERY_EVENT
	WRITE_ROWS_EVENTv2
	UPDATE_ROWS_EVENTv2
	DELETE_ROWS_EVENTv2
	GTID_EVENT
	ANONYMOUS_GTID_EVENT
	PREVIOUS_GTIDS_EVENT
	TRANSACTION_CONTEXT_EVENT
	VIEW_CHANGE_EVENT
	XA_PREPARE_LOG_EVENT
	PARTIAL_UPDATE_ROWS_EVENT
	TRANSACTION_PAYLOAD_EVENT
	HEARTBEAT_LOG_EVENT_V2
)
const (
	// MariaDB-specific event types start at 160; with iota the constants
	// below take the values 160 through 168.
	MARIADB_ANNOTATE_ROWS_EVENT EventType = 160 + iota
	MARIADB_BINLOG_CHECKPOINT_EVENT
	MARIADB_GTID_EVENT
	MARIADB_GTID_LIST_EVENT
	MARIADB_START_ENCRYPTION_EVENT
	MARIADB_QUERY_COMPRESSED_EVENT
	MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1
	MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1
	MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1
)

func (EventType) String

func (e EventType) String() string

type ExecuteLoadQueryEvent

type ExecuteLoadQueryEvent struct {
	SlaveProxyID     uint32
	ExecutionTime    uint32
	SchemaLength     uint8
	ErrorCode        uint16
	StatusVars       uint16
	FileID           uint32
	StartPos         uint32
	EndPos           uint32
	DupHandlingFlags uint8
}

func (*ExecuteLoadQueryEvent) Decode

func (e *ExecuteLoadQueryEvent) Decode(data []byte) error

func (*ExecuteLoadQueryEvent) Dump

func (e *ExecuteLoadQueryEvent) Dump(w io.Writer)

type FormatDescriptionEvent

// FormatDescriptionEvent holds the fields of a FORMAT_DESCRIPTION_EVENT.
type FormatDescriptionEvent struct {
	Version uint16
	// ServerVersion has len = 50.
	ServerVersion          []byte
	CreateTimestamp        uint32
	EventHeaderLength      uint8
	EventTypeHeaderLengths []byte

	// 0 is off, 1 is for CRC32, 255 is undefined
	// (matching BINLOG_CHECKSUM_ALG_OFF, BINLOG_CHECKSUM_ALG_CRC32 and BINLOG_CHECKSUM_ALG_UNDEF).
	ChecksumAlgorithm byte
}

func (*FormatDescriptionEvent) Decode

func (e *FormatDescriptionEvent) Decode(data []byte) error

func (*FormatDescriptionEvent) Dump

func (e *FormatDescriptionEvent) Dump(w io.Writer)

type GTIDEvent

type GTIDEvent struct {
	CommitFlag     uint8
	SID            []byte
	GNO            int64
	LastCommitted  int64
	SequenceNumber int64

	// ImmediateCommitTimestamp/OriginalCommitTimestamp are introduced in MySQL-8.0.1, see:
	// https://mysqlhighavailability.com/replication-features-in-mysql-8-0-1/
	ImmediateCommitTimestamp uint64
	OriginalCommitTimestamp  uint64

	// Total transaction length (including this GTIDEvent), introduced in MySQL-8.0.2, see:
	// https://mysqlhighavailability.com/taking-advantage-of-new-transaction-length-metadata/
	TransactionLength uint64

	// ImmediateServerVersion/OriginalServerVersion are introduced in MySQL-8.0.14, see
	// https://dev.mysql.com/doc/refman/8.0/en/replication-compatibility.html
	ImmediateServerVersion uint32
	OriginalServerVersion  uint32
}

func (*GTIDEvent) Decode

func (e *GTIDEvent) Decode(data []byte) error

func (*GTIDEvent) Dump

func (e *GTIDEvent) Dump(w io.Writer)

func (*GTIDEvent) GTIDNext

func (e *GTIDEvent) GTIDNext() (GTIDSet, error)

func (*GTIDEvent) ImmediateCommitTime

func (e *GTIDEvent) ImmediateCommitTime() time.Time

ImmediateCommitTime returns the commit time of this trx on the immediate server or zero time if not available.

func (*GTIDEvent) OriginalCommitTime

func (e *GTIDEvent) OriginalCommitTime() time.Time

OriginalCommitTime returns the commit time of this trx on the original server or zero time if not available.

type GenericEvent

type GenericEvent struct {
	Data []byte
}

We don't parse every event type, so for some events GenericEvent is used instead.

func (*GenericEvent) Decode

func (e *GenericEvent) Decode(data []byte) error

func (*GenericEvent) Dump

func (e *GenericEvent) Dump(w io.Writer)

type IntVarEvent

type IntVarEvent struct {
	Type  IntVarEventType
	Value uint64
}

func (*IntVarEvent) Decode

func (i *IntVarEvent) Decode(data []byte) error

func (*IntVarEvent) Dump

func (i *IntVarEvent) Dump(w io.Writer)

type IntVarEventType

type IntVarEventType byte
const (
	INVALID IntVarEventType = iota
	LAST_INSERT_ID
	INSERT_ID
)

type JsonDiff

type JsonDiff struct {
	Op    JsonDiffOperation
	Path  string
	Value string
}

func (*JsonDiff) String

func (jd *JsonDiff) String() string

type JsonDiffOperation

type JsonDiffOperation byte

JsonDiffOperation is an enum that describes what kind of operation a JsonDiff object represents. https://github.com/mysql/mysql-server/blob/8.0/sql/json_diff.h

func (JsonDiffOperation) String

func (op JsonDiffOperation) String() string

type MariadbAnnotateRowsEvent

type MariadbAnnotateRowsEvent struct {
	Query []byte
}

func (*MariadbAnnotateRowsEvent) Decode

func (e *MariadbAnnotateRowsEvent) Decode(data []byte) error

func (*MariadbAnnotateRowsEvent) Dump

func (e *MariadbAnnotateRowsEvent) Dump(w io.Writer)

type MariadbBinlogCheckPointEvent

type MariadbBinlogCheckPointEvent struct {
	Info []byte
}

func (*MariadbBinlogCheckPointEvent) Decode

func (e *MariadbBinlogCheckPointEvent) Decode(data []byte) error

func (*MariadbBinlogCheckPointEvent) Dump

func (e *MariadbBinlogCheckPointEvent) Dump(w io.Writer)

type MariadbGTIDEvent

type MariadbGTIDEvent struct {
	GTID     MariadbGTID
	Flags    byte
	CommitID uint64
}

func (*MariadbGTIDEvent) Decode

func (e *MariadbGTIDEvent) Decode(data []byte) error

func (*MariadbGTIDEvent) Dump

func (e *MariadbGTIDEvent) Dump(w io.Writer)

func (*MariadbGTIDEvent) GTIDNext

func (e *MariadbGTIDEvent) GTIDNext() (GTIDSet, error)

func (*MariadbGTIDEvent) IsDDL

func (e *MariadbGTIDEvent) IsDDL() bool

func (*MariadbGTIDEvent) IsGroupCommit

func (e *MariadbGTIDEvent) IsGroupCommit() bool

func (*MariadbGTIDEvent) IsStandalone

func (e *MariadbGTIDEvent) IsStandalone() bool

type MariadbGTIDListEvent

type MariadbGTIDListEvent struct {
	GTIDs []MariadbGTID
}

func (*MariadbGTIDListEvent) Decode

func (e *MariadbGTIDListEvent) Decode(data []byte) error

func (*MariadbGTIDListEvent) Dump

func (e *MariadbGTIDListEvent) Dump(w io.Writer)

type OnEventFunc

type OnEventFunc func(*BinlogEvent) error

type PreviousGTIDsEvent

type PreviousGTIDsEvent struct {
	GTIDSets string
}

func (*PreviousGTIDsEvent) Decode

func (e *PreviousGTIDsEvent) Decode(data []byte) error

func (*PreviousGTIDsEvent) Dump

func (e *PreviousGTIDsEvent) Dump(w io.Writer)

type QueryEvent

type QueryEvent struct {
	SlaveProxyID  uint32
	ExecutionTime uint32
	ErrorCode     uint16
	StatusVars    []byte
	Schema        []byte
	Query         []byte

	// GSet is not part of the query event itself; the GTIDSet is attached
	// here only for the caller's convenience.
	GSet GTIDSet
	// contains filtered or unexported fields
}

func (*QueryEvent) Decode

func (e *QueryEvent) Decode(data []byte) error

func (*QueryEvent) Dump

func (e *QueryEvent) Dump(w io.Writer)

type RotateEvent

type RotateEvent struct {
	Position    uint64
	NextLogName []byte
}

func (*RotateEvent) Decode

func (e *RotateEvent) Decode(data []byte) error

func (*RotateEvent) Dump

func (e *RotateEvent) Dump(w io.Writer)

type RowsEvent

type RowsEvent struct {
	// 0, 1, 2
	Version int

	Table *TableMapEvent

	TableID uint64

	Flags uint16

	// if version == 2
	// Use when DataLen value is greater than 2
	NdbFormat byte
	NdbData   []byte

	PartitionId       uint16
	SourcePartitionId uint16

	// lenenc_int
	ColumnCount uint64

	// len = (ColumnCount + 7) / 8
	ColumnBitmap1 []byte

	// if UPDATE_ROWS_EVENTv1 or v2, or PARTIAL_UPDATE_ROWS_EVENT
	// len = (ColumnCount + 7) / 8
	ColumnBitmap2 []byte

	// rows: all return types from RowsEvent.decodeValue()
	Rows           [][]interface{}
	SkippedColumns [][]int
	// contains filtered or unexported fields
}

RowsEvent represents a MySQL rows event like DELETE_ROWS_EVENT, UPDATE_ROWS_EVENT, etc. RowsEvent.Rows saves the rows data, and the MySQL type to golang type mapping is:

- MYSQL_TYPE_NULL: nil
- MYSQL_TYPE_LONG: int32
- MYSQL_TYPE_TINY: int8
- MYSQL_TYPE_SHORT: int16
- MYSQL_TYPE_INT24: int32
- MYSQL_TYPE_LONGLONG: int64
- MYSQL_TYPE_NEWDECIMAL: string / "github.com/shopspring/decimal".Decimal
- MYSQL_TYPE_FLOAT: float32
- MYSQL_TYPE_DOUBLE: float64
- MYSQL_TYPE_BIT: int64
- MYSQL_TYPE_TIMESTAMP: string / time.Time
- MYSQL_TYPE_TIMESTAMP2: string / time.Time
- MYSQL_TYPE_DATETIME: string / time.Time
- MYSQL_TYPE_DATETIME2: string / time.Time
- MYSQL_TYPE_TIME: string
- MYSQL_TYPE_TIME2: string
- MYSQL_TYPE_DATE: string
- MYSQL_TYPE_YEAR: int
- MYSQL_TYPE_ENUM: int64
- MYSQL_TYPE_SET: int64
- MYSQL_TYPE_BLOB: []byte
- MYSQL_TYPE_VARCHAR: string
- MYSQL_TYPE_VAR_STRING: string
- MYSQL_TYPE_STRING: string
- MYSQL_TYPE_JSON: []byte / *replication.JsonDiff
- MYSQL_TYPE_GEOMETRY: []byte

func (*RowsEvent) Decode

func (e *RowsEvent) Decode(data []byte) error

func (*RowsEvent) DecodeData

func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error)

func (*RowsEvent) DecodeHeader

func (e *RowsEvent) DecodeHeader(data []byte) (int, error)

func (*RowsEvent) Dump

func (e *RowsEvent) Dump(w io.Writer)

type RowsQueryEvent

type RowsQueryEvent struct {
	Query []byte
}

func (*RowsQueryEvent) Decode

func (e *RowsQueryEvent) Decode(data []byte) error

func (*RowsQueryEvent) Dump

func (e *RowsQueryEvent) Dump(w io.Writer)

type TableMapEvent

type TableMapEvent struct {
	TableID uint64

	Flags uint16

	Schema []byte
	Table  []byte

	ColumnCount uint64
	ColumnType  []byte
	ColumnMeta  []uint16

	// len = (ColumnCount + 7) / 8
	NullBitmap []byte

	// SignednessBitmap stores signedness info for numeric columns.
	SignednessBitmap []byte

	// DefaultCharset[0] is the default collation of character columns.
	// For character columns that have different charset,
	// (character column index, column collation) pairs follows
	DefaultCharset []uint64
	// ColumnCharset contains collation sequence for all character columns
	ColumnCharset []uint64

	// SetStrValue stores values for set columns.
	SetStrValue [][][]byte

	// EnumStrValue stores values for enum columns.
	EnumStrValue [][][]byte

	// ColumnName list all column names.
	ColumnName [][]byte

	// GeometryType stores real type for geometry columns.
	GeometryType []uint64

	// PrimaryKey is a sequence of column indexes of primary key.
	PrimaryKey []uint64

	// PrimaryKeyPrefix is the prefix length used for each column of primary key.
	// 0 means that the whole column length is used.
	PrimaryKeyPrefix []uint64

	// EnumSetDefaultCharset/EnumSetColumnCharset is similar to DefaultCharset/ColumnCharset but for enum/set columns.
	EnumSetDefaultCharset []uint64
	EnumSetColumnCharset  []uint64

	// VisibilityBitmap stores bits that are set if corresponding column is not invisible (MySQL 8.0.23+)
	VisibilityBitmap []byte
	// contains filtered or unexported fields
}

func (*TableMapEvent) CollationMap

func (e *TableMapEvent) CollationMap() map[int]uint64

CollationMap returns a map: column index -> collation id. Note that only character columns will be returned. nil is returned if not available or no character columns at all.

func (*TableMapEvent) ColumnNameString

func (e *TableMapEvent) ColumnNameString() []string

ColumnNameString returns column names as string slice. nil is returned if not available.

func (*TableMapEvent) Decode

func (e *TableMapEvent) Decode(data []byte) error

func (*TableMapEvent) Dump

func (e *TableMapEvent) Dump(w io.Writer)

func (*TableMapEvent) EnumSetCollationMap

func (e *TableMapEvent) EnumSetCollationMap() map[int]uint64

EnumSetCollationMap returns a map: column index -> collation id. Note that only enum or set columns will be returned. nil is returned if not available or no enum/set columns at all.

func (*TableMapEvent) EnumStrValueMap

func (e *TableMapEvent) EnumStrValueMap() map[int][]string

EnumStrValueMap returns a map: column index -> enum string value. Note that only enum columns will be returned. nil is returned if not available or no enum columns at all.

func (*TableMapEvent) EnumStrValueString

func (e *TableMapEvent) EnumStrValueString() [][]string

EnumStrValueString returns values for enum columns as string slices. nil is returned if not available or no enum columns at all.

func (*TableMapEvent) GeometryTypeMap

func (e *TableMapEvent) GeometryTypeMap() map[int]uint64

GeometryTypeMap returns a map: column index -> geometry type. Note that only geometry columns will be returned. nil is returned if not available or no geometry columns at all.

func (*TableMapEvent) IsCharacterColumn

func (e *TableMapEvent) IsCharacterColumn(i int) bool

IsCharacterColumn returns true if the column type is considered as character type. Note that JSON/GEOMETRY types are treated as character type in mariadb. (JSON is an alias for LONGTEXT in mariadb: https://mariadb.com/kb/en/json-data-type/)

func (*TableMapEvent) IsEnumColumn

func (e *TableMapEvent) IsEnumColumn(i int) bool

func (*TableMapEvent) IsEnumOrSetColumn

func (e *TableMapEvent) IsEnumOrSetColumn(i int) bool

func (*TableMapEvent) IsGeometryColumn

func (e *TableMapEvent) IsGeometryColumn(i int) bool

func (*TableMapEvent) IsNumericColumn

func (e *TableMapEvent) IsNumericColumn(i int) bool

func (*TableMapEvent) IsSetColumn

func (e *TableMapEvent) IsSetColumn(i int) bool

func (*TableMapEvent) JsonColumnCount

func (e *TableMapEvent) JsonColumnCount() uint64

JsonColumnCount returns the number of JSON columns in this table

func (*TableMapEvent) Nullable

func (e *TableMapEvent) Nullable(i int) (available, nullable bool)

Nullable returns the nullability of the i-th column. If null bits are not available, available is false. i must be in range [0, ColumnCount).

func (*TableMapEvent) SetStrValueMap

func (e *TableMapEvent) SetStrValueMap() map[int][]string

SetStrValueMap returns a map: column index -> set string value. Note that only set columns will be returned. nil is returned if not available or no set columns at all.

func (*TableMapEvent) SetStrValueString

func (e *TableMapEvent) SetStrValueString() [][]string

SetStrValueString returns values for set columns as string slices. nil is returned if not available or no set columns at all.

func (*TableMapEvent) UnsignedMap

func (e *TableMapEvent) UnsignedMap() map[int]bool

UnsignedMap returns a map: column index -> unsigned. Note that only numeric columns will be returned. nil is returned if not available or no numeric columns at all.

func (*TableMapEvent) VisibilityMap

func (e *TableMapEvent) VisibilityMap() map[int]bool

VisibilityMap returns a map: column index -> visibility. Invisible columns were introduced in MySQL 8.0.23. nil is returned if not available.

type TransactionPayloadEvent

type TransactionPayloadEvent struct {
	Size             uint64
	UncompressedSize uint64
	CompressionType  uint64
	Payload          []byte
	Events           []*BinlogEvent
	// contains filtered or unexported fields
}

func (*TransactionPayloadEvent) Decode

func (e *TransactionPayloadEvent) Decode(data []byte) error

func (*TransactionPayloadEvent) Dump

func (e *TransactionPayloadEvent) Dump(w io.Writer)

type XIDEvent

type XIDEvent struct {
	XID uint64

	// GSet is not part of the XID event itself; the GTIDSet is attached
	// here only for the caller's convenience.
	GSet GTIDSet
}

func (*XIDEvent) Decode

func (e *XIDEvent) Decode(data []byte) error

func (*XIDEvent) Dump

func (e *XIDEvent) Dump(w io.Writer)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL