replication

package
v1.8.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 1, 2024 License: MIT Imports: 29 Imported by: 0

Documentation

Overview

Package replication handles the MySQL replication protocol.

Todo:

+ Get table information when handling rows event.

Index

Constants

View Source
const (
	LOG_EVENT_BINLOG_IN_USE_F            uint16 = 0x0001
	LOG_EVENT_FORCED_ROTATE_F            uint16 = 0x0002
	LOG_EVENT_THREAD_SPECIFIC_F          uint16 = 0x0004
	LOG_EVENT_SUPPRESS_USE_F             uint16 = 0x0008
	LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F uint16 = 0x0010
	LOG_EVENT_ARTIFICIAL_F               uint16 = 0x0020
	LOG_EVENT_RELAY_LOG_F                uint16 = 0x0040
	LOG_EVENT_IGNORABLE_F                uint16 = 0x0080
	LOG_EVENT_NO_FILTER_F                uint16 = 0x0100
	LOG_EVENT_MTS_ISOLATE_F              uint16 = 0x0200
)
View Source
// Flags for the binlog dump command (see BinlogSyncerConfig.DumpCommandFlag,
// whose default of 0 corresponds to BINLOG_DUMP_NEVER_STOP).
//
// NOTE(review): BINLOG_SEND_ANNOTATE_ROWS_EVENT and BINLOG_THROUGH_POSITION
// share the value 0x02. Presumably they apply to different server flavors
// (the DumpCommandFlag comment lists BINLOG_SEND_ANNOTATE_ROWS_EVENT for
// MariaDB only) — confirm before combining them.
const (
	BINLOG_DUMP_NEVER_STOP          uint16 = 0x00
	BINLOG_DUMP_NON_BLOCK           uint16 = 0x01
	BINLOG_SEND_ANNOTATE_ROWS_EVENT uint16 = 0x02
	BINLOG_THROUGH_POSITION         uint16 = 0x02
	BINLOG_THROUGH_GTID             uint16 = 0x04
)
View Source
const (
	BINLOG_ROW_IMAGE_FULL    = "FULL"
	BINLOG_ROW_IMAGE_MINIMAL = "MINIMAL"
	BINLOG_ROW_IMAGE_NOBLOB  = "NOBLOB"
)
View Source
const (
	BINLOG_MARIADB_FL_STANDALONE      = 1 << iota /*1  - FL_STANDALONE is set when there is no terminating COMMIT event*/
	BINLOG_MARIADB_FL_GROUP_COMMIT_ID             /*2  - FL_GROUP_COMMIT_ID is set when event group is part of a group commit on the master. Groups with same commit_id are part of the same group commit.*/
	BINLOG_MARIADB_FL_TRANSACTIONAL               /*4  - FL_TRANSACTIONAL is set for an event group that can be safely rolled back (no MyISAM, e.g.).*/
	BINLOG_MARIADB_FL_ALLOW_PARALLEL              /*8  - FL_ALLOW_PARALLEL reflects the (negation of the) value of @@SESSION.skip_parallel_replication at the time of commit*/
	BINLOG_MARIADB_FL_WAITED                      /*16 - FL_WAITED is set if a row lock wait (or other wait) is detected during the execution of the transaction*/
	BINLOG_MARIADB_FL_DDL                         /*32 - FL_DDL is set for event group containing DDL*/
)
View Source
const (
	BINLOG_CHECKSUM_ALG_OFF byte = 0 // Events are without checksum though its generator
	// is checksum-capable New Master (NM).
	BINLOG_CHECKSUM_ALG_CRC32 byte = 1 // CRC32 of zlib algorithm.
	//  BINLOG_CHECKSUM_ALG_ENUM_END,  // the cut line: valid alg range is [1, 0x7f].
	BINLOG_CHECKSUM_ALG_UNDEF byte = 255 // special value to tag undetermined yet checksum

)
View Source
const (
	TABLE_MAP_OPT_META_SIGNEDNESS byte = iota + 1
	TABLE_MAP_OPT_META_DEFAULT_CHARSET
	TABLE_MAP_OPT_META_COLUMN_CHARSET
	TABLE_MAP_OPT_META_COLUMN_NAME
	TABLE_MAP_OPT_META_SET_STR_VALUE
	TABLE_MAP_OPT_META_ENUM_STR_VALUE
	TABLE_MAP_OPT_META_GEOMETRY_TYPE
	TABLE_MAP_OPT_META_SIMPLE_PRIMARY_KEY
	TABLE_MAP_OPT_META_PRIMARY_KEY_WITH_PREFIX
	TABLE_MAP_OPT_META_ENUM_AND_SET_DEFAULT_CHARSET
	TABLE_MAP_OPT_META_ENUM_AND_SET_COLUMN_CHARSET
	TABLE_MAP_OPT_META_COLUMN_VISIBILITY
)

These are TABLE_MAP_EVENT's optional metadata field type, from: libbinlogevents/include/rows_event.h

View Source
const (
	ENUM_EXTRA_ROW_INFO_TYPECODE_NDB byte = iota
	ENUM_EXTRA_ROW_INFO_TYPECODE_PARTITION
)
View Source
const (
	EventHeaderSize            = 19
	SidLength                  = 16
	LogicalTimestampTypeCode   = 2
	PartLogicalTimestampLength = 8
	BinlogChecksumLength       = 4
	UndefinedServerVer         = 999999 // UNDEFINED_SERVER_VERSION
)
View Source
const (
	JSONB_SMALL_OBJECT byte = iota // small JSON object
	JSONB_LARGE_OBJECT             // large JSON object
	JSONB_SMALL_ARRAY              // small JSON array
	JSONB_LARGE_ARRAY              // large JSON array
	JSONB_LITERAL                  // literal (true/false/null)
	JSONB_INT16                    // int16
	JSONB_UINT16                   // uint16
	JSONB_INT32                    // int32
	JSONB_UINT32                   // uint32
	JSONB_INT64                    // int64
	JSONB_UINT64                   // uint64
	JSONB_DOUBLE                   // double
	JSONB_STRING                   // string
	JSONB_OPAQUE       byte = 0x0f // custom data (any MySQL data type)
)
View Source
const (
	JSONB_NULL_LITERAL  byte = 0x00
	JSONB_TRUE_LITERAL  byte = 0x01
	JSONB_FALSE_LITERAL byte = 0x02
)
View Source
const (
	// The JSON value in the given path is replaced with a new value.
	//
	// It has the same effect as `JSON_REPLACE(col, path, value)`.
	JsonDiffOperationReplace = JsonDiffOperation(iota)

	// Add a new element at the given path.
	//
	//  If the path specifies an array element, it has the same effect as `JSON_ARRAY_INSERT(col, path, value)`.
	//
	//  If the path specifies an object member, it has the same effect as `JSON_INSERT(col, path, value)`.
	JsonDiffOperationInsert

	// The JSON value at the given path is removed from an array or object.
	//
	// It has the same effect as `JSON_REMOVE(col, path)`.
	JsonDiffOperationRemove
)
View Source
const (
	EnumRowImageTypeWriteAI = EnumRowImageType(iota)
	EnumRowImageTypeUpdateBI
	EnumRowImageTypeUpdateAI
	EnumRowImageTypeDeleteBI
)
View Source
const (
	OTW_PAYLOAD_HEADER_END_MARK = iota
	OTW_PAYLOAD_SIZE_FIELD
	OTW_PAYLOAD_COMPRESSION_TYPE_FIELD
	OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD
)

On The Wire: Field Types. See also binary_log::codecs::binary::Transaction_payload::fields in MySQL: https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1codecs_1_1binary_1_1Transaction__payload.html#a9fff7ac12ba064f40e9216565c53d07b

View Source
const (
	ZSTD = 0
	NONE = 255
)

Compression Types

View Source
const DATETIMEF_INT_OFS int64 = 0x8000000000
View Source
const (
	// Store JSON updates in partial form
	EnumBinlogRowValueOptionsPartialJsonUpdates = EnumBinlogRowValueOptions(iota + 1)
)
View Source
const (
	// MinBinlogVersion is the minimum binlog version handled by this package.
	// Only the MySQL 5.0.0+ binlog format is supported.
	// NOTE(review): the original comment marked this as uncertain ("maybe???") — confirm.
	MinBinlogVersion = 4
)
View Source
const RowsEventStmtEndFlag = 0x01

RowsEventStmtEndFlag is set in the end of the statement.

View Source
const TIMEF_INT_OFS int64 = 0x800000
View Source
const TIMEF_OFS int64 = 0x800000000000

Variables

View Source
var (
	ErrNeedSyncAgain = errors.New("Last sync error or closed, try sync and get event again")
	ErrSyncClosed    = errors.New("Sync was closed")
)
View Source
var (
	// BinLogFileHeader is the binlog header: the byte 0xfe followed by the
	// ASCII bytes of "bin" (0x62, 0x69, 0x6e).
	BinLogFileHeader = []byte{0xfe, 0x62, 0x69, 0x6e}

	// NOTE(review): presumably the marker byte that identifies semi-sync
	// packets — confirm against the code that reads it.
	SemiSyncIndicator byte = 0xef
)
View Source
var (
	// ErrChecksumMismatch indicates binlog checksum mismatch.
	ErrChecksumMismatch = errors.New("binlog checksum mismatch, data may be corrupted")
)
View Source
var (
	// ErrCorruptedJSONDiff indicates a corrupted JSON diff.
	ErrCorruptedJSONDiff = fmt.Errorf("corrupted JSON diff") // ER_CORRUPTED_JSON_DIFF
)

Functions

This section is empty.

Types

type BeginLoadQueryEvent

type BeginLoadQueryEvent struct {
	FileID    uint32
	BlockData []byte
}

func (*BeginLoadQueryEvent) Decode

func (e *BeginLoadQueryEvent) Decode(data []byte) error

func (*BeginLoadQueryEvent) Dump

func (e *BeginLoadQueryEvent) Dump(w io.Writer)

type BinlogEvent

type BinlogEvent struct {
	// raw binlog data which contains all data, including binlog header and event body, and including crc32 checksum if exists
	RawData []byte

	Header *EventHeader
	Event  Event
}

func (*BinlogEvent) Dump

func (e *BinlogEvent) Dump(w io.Writer)

type BinlogParser

type BinlogParser struct {
	// contains filtered or unexported fields
}

func NewBinlogParser

func NewBinlogParser() *BinlogParser

func (*BinlogParser) Parse

func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error)

Parse: Given the bytes for a binary log event: return the decoded event. With the exception of the FORMAT_DESCRIPTION_EVENT event type there must have previously been passed a FORMAT_DESCRIPTION_EVENT into the parser for this to work properly on any given event. Passing a new FORMAT_DESCRIPTION_EVENT into the parser will replace an existing one.

func (*BinlogParser) ParseFile

func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc) error

func (*BinlogParser) ParseReader

func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error

func (*BinlogParser) ParseSingleEvent

func (p *BinlogParser) ParseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool, error)

ParseSingleEvent parses single binlog event and passes the event to onEvent function.

func (*BinlogParser) Reset

func (p *BinlogParser) Reset()

func (*BinlogParser) Resume

func (p *BinlogParser) Resume()

func (*BinlogParser) SetFlavor

func (p *BinlogParser) SetFlavor(flavor string)

func (*BinlogParser) SetIgnoreJSONDecodeError

func (p *BinlogParser) SetIgnoreJSONDecodeError(ignoreJSONDecodeErr bool)

func (*BinlogParser) SetParseTime

func (p *BinlogParser) SetParseTime(parseTime bool)

func (*BinlogParser) SetRawMode

func (p *BinlogParser) SetRawMode(mode bool)

func (*BinlogParser) SetRowsEventDecodeFunc

func (p *BinlogParser) SetRowsEventDecodeFunc(rowsEventDecodeFunc func(*RowsEvent, []byte) error)

func (*BinlogParser) SetTableMapOptionalMetaDecodeFunc

func (p *BinlogParser) SetTableMapOptionalMetaDecodeFunc(tableMapOptionalMetaDecondeFunc func([]byte) error)

func (*BinlogParser) SetTimestampStringLocation

func (p *BinlogParser) SetTimestampStringLocation(timestampStringLocation *time.Location)

func (*BinlogParser) SetUseDecimal

func (p *BinlogParser) SetUseDecimal(useDecimal bool)

func (*BinlogParser) SetVerifyChecksum

func (p *BinlogParser) SetVerifyChecksum(verify bool)

func (*BinlogParser) Stop

func (p *BinlogParser) Stop()

type BinlogStreamer

type BinlogStreamer struct {
	// contains filtered or unexported fields
}

BinlogStreamer gets the streaming event.

func NewBinlogStreamer

func NewBinlogStreamer() *BinlogStreamer

func NewBinlogStreamerWithChanSize

func NewBinlogStreamerWithChanSize(chanSize int) *BinlogStreamer

func (*BinlogStreamer) AddErrorToStreamer

func (s *BinlogStreamer) AddErrorToStreamer(err error) bool

AddErrorToStreamer adds an error to the streamer.

func (*BinlogStreamer) AddEventToStreamer

func (s *BinlogStreamer) AddEventToStreamer(ev *BinlogEvent) error

AddEventToStreamer adds a binlog event to the streamer. You can use it when you want to add an event to the streamer manually, for example in replication handlers.

func (*BinlogStreamer) DumpEvents

func (s *BinlogStreamer) DumpEvents() []*BinlogEvent

DumpEvents dumps all remaining events.

func (*BinlogStreamer) GetEvent

func (s *BinlogStreamer) GetEvent(ctx context.Context) (*BinlogEvent, error)

GetEvent gets the binlog event one by one, it will block until Syncer receives any events from MySQL or meets a sync error. You can pass a context (like Cancel or Timeout) to break the block.

func (*BinlogStreamer) GetEventWithStartTime

func (s *BinlogStreamer) GetEventWithStartTime(ctx context.Context, startTime time.Time) (*BinlogEvent, error)

GetEventWithStartTime gets the binlog event with a start time: if the current binlog event's timestamp is smaller than the specified startTime, a nil event is returned.

type BinlogSyncer

type BinlogSyncer struct {
	// contains filtered or unexported fields
}

BinlogSyncer syncs binlog event from server.

func NewBinlogSyncer

func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer

NewBinlogSyncer creates the BinlogSyncer with cfg.

func (*BinlogSyncer) Close

func (b *BinlogSyncer) Close()

Close closes the BinlogSyncer.

func (*BinlogSyncer) GetNextPosition

func (b *BinlogSyncer) GetNextPosition() Position

GetNextPosition returns the next position of the syncer

func (*BinlogSyncer) LastConnectionID

func (b *BinlogSyncer) LastConnectionID() uint32

LastConnectionID returns last connectionID.

func (*BinlogSyncer) StartBackup

func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error

StartBackup: Like mysqlbinlog remote raw backup. Backs up the remote binlog from position (filename, offset) and writes it in backupDir.

func (*BinlogSyncer) StartBackupWithHandler

func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
	handler func(binlogFilename string) (io.WriteCloser, error)) (retErr error)

StartBackupWithHandler starts the backup process for the binary log using the specified position and handler. The process will continue until the timeout is reached or an error occurs.

Parameters:

  • p: The starting position in the binlog from which to begin the backup.
  • timeout: The maximum duration to wait for new binlog events before stopping the backup process. If set to 0, a default very long timeout (30 days) is used instead.
  • handler: A function that takes a binlog filename and returns an io.WriteCloser for writing raw events to.

func (*BinlogSyncer) StartSync

func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error)

StartSync starts syncing from the `pos` position.

func (*BinlogSyncer) StartSyncGTID

func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error)

StartSyncGTID starts syncing from the `gset` GTIDSet.

type BinlogSyncerConfig

type BinlogSyncerConfig struct {
	// ServerID is the unique ID in cluster.
	ServerID uint32
	// Flavor is "mysql" or "mariadb", if not set, use "mysql" default.
	Flavor string

	// Host is for MySQL server host.
	Host string
	// Port is for MySQL server port.
	Port uint16
	// User is for MySQL user.
	User string
	// Password is for MySQL password.
	Password string

	// Localhost is the local hostname used when registering as a slave.
	// If not set, use os.Hostname() instead.
	Localhost string

	// Charset is for MySQL client character set
	Charset string

	// SemiSyncEnabled enables semi-sync or not.
	SemiSyncEnabled bool

	// RawModeEnabled is for not parsing binlog event.
	RawModeEnabled bool

	// If not nil, use the provided tls.Config to connect to the database using TLS/SSL.
	TLSConfig *tls.Config

	// Use replication.Time structure for timestamp and datetime.
	// We will use Local location for timestamp and UTC location for datetime.
	ParseTime bool

	// If ParseTime is false, convert TIMESTAMP into this specified timezone. If
	// ParseTime is true, this option will have no effect and TIMESTAMP data will
	// be parsed into the local timezone and a full time.Time struct will be
	// returned.
	//
	// Note that MySQL TIMESTAMP columns are offset from the machine local
	// timezone while DATETIME columns are offset from UTC. This is consistent
	// with documented MySQL behaviour as it returns TIMESTAMP in local timezone
	// and DATETIME in UTC.
	//
	// Setting this to UTC effectively equalizes the TIMESTAMP and DATETIME time
	// strings obtained from MySQL.
	TimestampStringLocation *time.Location

	// Use decimal.Decimal structure for decimals.
	UseDecimal bool

	// RecvBufferSize sets the size in bytes of the operating system's receive buffer associated with the connection.
	RecvBufferSize int

	// master heartbeat period
	HeartbeatPeriod time.Duration

	// read timeout
	ReadTimeout time.Duration

	// maximum number of attempts to re-establish a broken connection, zero or negative number means infinite retry.
	// this configuration will not work if DisableRetrySync is true
	MaxReconnectAttempts int

	// whether disable re-sync for broken connection
	DisableRetrySync bool

	// Only works when MySQL/MariaDB variable binlog_checksum=CRC32.
	// For MySQL, binlog_checksum was introduced since 5.6.2, but CRC32 was set as default value since 5.6.6 .
	// https://dev.mysql.com/doc/refman/5.6/en/replication-options-binary-log.html#option_mysqld_binlog-checksum
	// For MariaDB, binlog_checksum was introduced since MariaDB 5.3, but CRC32 was set as default value since MariaDB 10.2.1 .
	// https://mariadb.com/kb/en/library/replication-and-binary-log-server-system-variables/#binlog_checksum
	VerifyChecksum bool

	// DumpCommandFlag is used to send binlog dump command. Default 0, aka BINLOG_DUMP_NEVER_STOP.
	// For MySQL, BINLOG_DUMP_NEVER_STOP and BINLOG_DUMP_NON_BLOCK are available.
	// https://dev.mysql.com/doc/internals/en/com-binlog-dump.html#binlog-dump-non-block
	// For MariaDB, BINLOG_DUMP_NEVER_STOP, BINLOG_DUMP_NON_BLOCK and BINLOG_SEND_ANNOTATE_ROWS_EVENT are available.
	// https://mariadb.com/kb/en/library/com_binlog_dump/
	// https://mariadb.com/kb/en/library/annotate_rows_event/
	DumpCommandFlag uint16

	// Option is a function used to apply settings that live outside of BinlogSyncerConfig,
	// between the MySQL connection and COM_REGISTER_SLAVE.
	// For MariaDB: slave_gtid_ignore_duplicates, skip_replication, slave_until_gtid
	Option func(*client.Conn) error

	// Set Logger
	Logger loggers.Advanced

	// Set Dialer
	Dialer client.Dialer

	RowsEventDecodeFunc func(*RowsEvent, []byte) error

	TableMapOptionalMetaDecodeFunc func([]byte) error

	DiscardGTIDSet bool

	EventCacheCount int
}

BinlogSyncerConfig is the configuration for BinlogSyncer.

type EnumBinlogRowValueOptions

type EnumBinlogRowValueOptions byte

Bits for binlog_row_value_options sysvar

type EnumRowImageType

type EnumRowImageType byte

EnumRowImageType enumerates the allowed image types for every row in the MySQL binlog. See https://github.com/mysql/mysql-server/blob/1bfe02bdad6604d54913c62614bde57a055c8332/sql/rpl_record.h#L39, which defines: enum class enum_row_image_type { WRITE_AI, UPDATE_BI, UPDATE_AI, DELETE_BI };

func (EnumRowImageType) String

func (t EnumRowImageType) String() string

type Event

type Event interface {
	// Dump writes the event to w, in a format like python-mysql-replication.
	Dump(w io.Writer)

	Decode(data []byte) error
}

type EventError

type EventError struct {
	Header *EventHeader

	//Error message
	Err string

	//Event data
	Data []byte
}

func (*EventError) Error

func (e *EventError) Error() string

type EventHeader

type EventHeader struct {
	Timestamp uint32
	EventType EventType
	ServerID  uint32
	EventSize uint32
	LogPos    uint32
	Flags     uint16
}

func (*EventHeader) Decode

func (h *EventHeader) Decode(data []byte) error

func (*EventHeader) Dump

func (h *EventHeader) Dump(w io.Writer)

type EventType

type EventType byte
const (
	UNKNOWN_EVENT EventType = iota
	START_EVENT_V3
	QUERY_EVENT
	STOP_EVENT
	ROTATE_EVENT
	INTVAR_EVENT
	LOAD_EVENT
	SLAVE_EVENT
	CREATE_FILE_EVENT
	APPEND_BLOCK_EVENT
	EXEC_LOAD_EVENT
	DELETE_FILE_EVENT
	NEW_LOAD_EVENT
	RAND_EVENT
	USER_VAR_EVENT
	FORMAT_DESCRIPTION_EVENT
	XID_EVENT
	BEGIN_LOAD_QUERY_EVENT
	EXECUTE_LOAD_QUERY_EVENT
	TABLE_MAP_EVENT
	WRITE_ROWS_EVENTv0
	UPDATE_ROWS_EVENTv0
	DELETE_ROWS_EVENTv0
	WRITE_ROWS_EVENTv1
	UPDATE_ROWS_EVENTv1
	DELETE_ROWS_EVENTv1
	INCIDENT_EVENT
	HEARTBEAT_EVENT
	IGNORABLE_EVENT
	ROWS_QUERY_EVENT
	WRITE_ROWS_EVENTv2
	UPDATE_ROWS_EVENTv2
	DELETE_ROWS_EVENTv2
	GTID_EVENT
	ANONYMOUS_GTID_EVENT
	PREVIOUS_GTIDS_EVENT
	TRANSACTION_CONTEXT_EVENT
	VIEW_CHANGE_EVENT
	XA_PREPARE_LOG_EVENT
	PARTIAL_UPDATE_ROWS_EVENT
	TRANSACTION_PAYLOAD_EVENT
	HEARTBEAT_LOG_EVENT_V2
)
const (
	// MariaDB event starts from 160
	MARIADB_ANNOTATE_ROWS_EVENT EventType = 160 + iota
	MARIADB_BINLOG_CHECKPOINT_EVENT
	MARIADB_GTID_EVENT
	MARIADB_GTID_LIST_EVENT
	MARIADB_START_ENCRYPTION_EVENT
	MARIADB_QUERY_COMPRESSED_EVENT
	MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1
	MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1
	MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1
)

func (EventType) String

func (e EventType) String() string

type ExecuteLoadQueryEvent

type ExecuteLoadQueryEvent struct {
	SlaveProxyID     uint32
	ExecutionTime    uint32
	SchemaLength     uint8
	ErrorCode        uint16
	StatusVars       uint16
	FileID           uint32
	StartPos         uint32
	EndPos           uint32
	DupHandlingFlags uint8
}

func (*ExecuteLoadQueryEvent) Decode

func (e *ExecuteLoadQueryEvent) Decode(data []byte) error

func (*ExecuteLoadQueryEvent) Dump

func (e *ExecuteLoadQueryEvent) Dump(w io.Writer)

type FormatDescriptionEvent

type FormatDescriptionEvent struct {
	Version uint16
	//len = 50
	ServerVersion          []byte
	CreateTimestamp        uint32
	EventHeaderLength      uint8
	EventTypeHeaderLengths []byte

	// 0 is off, 1 is for CRC32, 255 is undefined
	ChecksumAlgorithm byte
}

func (*FormatDescriptionEvent) Decode

func (e *FormatDescriptionEvent) Decode(data []byte) error

func (*FormatDescriptionEvent) Dump

func (e *FormatDescriptionEvent) Dump(w io.Writer)

type GTIDEvent

type GTIDEvent struct {
	CommitFlag     uint8
	SID            []byte
	GNO            int64
	LastCommitted  int64
	SequenceNumber int64

	// ImmediateCommitTimestamp/OriginalCommitTimestamp are introduced in MySQL-8.0.1, see:
	// https://mysqlhighavailability.com/replication-features-in-mysql-8-0-1/
	ImmediateCommitTimestamp uint64
	OriginalCommitTimestamp  uint64

	// Total transaction length (including this GTIDEvent), introduced in MySQL-8.0.2, see:
	// https://mysqlhighavailability.com/taking-advantage-of-new-transaction-length-metadata/
	TransactionLength uint64

	// ImmediateServerVersion/OriginalServerVersion are introduced in MySQL-8.0.14, see
	// https://dev.mysql.com/doc/refman/8.0/en/replication-compatibility.html
	ImmediateServerVersion uint32
	OriginalServerVersion  uint32
}

func (*GTIDEvent) Decode

func (e *GTIDEvent) Decode(data []byte) error

func (*GTIDEvent) Dump

func (e *GTIDEvent) Dump(w io.Writer)

func (*GTIDEvent) GTIDNext

func (e *GTIDEvent) GTIDNext() (GTIDSet, error)

func (*GTIDEvent) ImmediateCommitTime

func (e *GTIDEvent) ImmediateCommitTime() time.Time

ImmediateCommitTime returns the commit time of this trx on the immediate server or zero time if not available.

func (*GTIDEvent) OriginalCommitTime

func (e *GTIDEvent) OriginalCommitTime() time.Time

OriginalCommitTime returns the commit time of this trx on the original server or zero time if not available.

type GenericEvent

type GenericEvent struct {
	Data []byte
}

We don't parse every event type, so GenericEvent is used for the ones we don't parse.

func (*GenericEvent) Decode

func (e *GenericEvent) Decode(data []byte) error

func (*GenericEvent) Dump

func (e *GenericEvent) Dump(w io.Writer)

type IntVarEvent

type IntVarEvent struct {
	Type  IntVarEventType
	Value uint64
}

func (*IntVarEvent) Decode

func (i *IntVarEvent) Decode(data []byte) error

func (*IntVarEvent) Dump

func (i *IntVarEvent) Dump(w io.Writer)

type IntVarEventType

type IntVarEventType byte
const (
	INVALID IntVarEventType = iota
	LAST_INSERT_ID
	INSERT_ID
)

type JsonDiff

type JsonDiff struct {
	Op    JsonDiffOperation
	Path  string
	Value string
}

func (*JsonDiff) String

func (jd *JsonDiff) String() string

type JsonDiffOperation

type JsonDiffOperation byte

JsonDiffOperation is an enum that describes what kind of operation a JsonDiff object represents. https://github.com/mysql/mysql-server/blob/8.0/sql/json_diff.h

func (JsonDiffOperation) String

func (op JsonDiffOperation) String() string

type MariadbAnnotateRowsEvent

type MariadbAnnotateRowsEvent struct {
	Query []byte
}

func (*MariadbAnnotateRowsEvent) Decode

func (e *MariadbAnnotateRowsEvent) Decode(data []byte) error

func (*MariadbAnnotateRowsEvent) Dump

func (e *MariadbAnnotateRowsEvent) Dump(w io.Writer)

type MariadbBinlogCheckPointEvent

type MariadbBinlogCheckPointEvent struct {
	Info []byte
}

func (*MariadbBinlogCheckPointEvent) Decode

func (e *MariadbBinlogCheckPointEvent) Decode(data []byte) error

func (*MariadbBinlogCheckPointEvent) Dump

type MariadbGTIDEvent

type MariadbGTIDEvent struct {
	GTID     MariadbGTID
	Flags    byte
	CommitID uint64
}

func (*MariadbGTIDEvent) Decode

func (e *MariadbGTIDEvent) Decode(data []byte) error

func (*MariadbGTIDEvent) Dump

func (e *MariadbGTIDEvent) Dump(w io.Writer)

func (*MariadbGTIDEvent) GTIDNext

func (e *MariadbGTIDEvent) GTIDNext() (GTIDSet, error)

func (*MariadbGTIDEvent) IsDDL

func (e *MariadbGTIDEvent) IsDDL() bool

func (*MariadbGTIDEvent) IsGroupCommit

func (e *MariadbGTIDEvent) IsGroupCommit() bool

func (*MariadbGTIDEvent) IsStandalone

func (e *MariadbGTIDEvent) IsStandalone() bool

type MariadbGTIDListEvent

type MariadbGTIDListEvent struct {
	GTIDs []MariadbGTID
}

func (*MariadbGTIDListEvent) Decode

func (e *MariadbGTIDListEvent) Decode(data []byte) error

func (*MariadbGTIDListEvent) Dump

func (e *MariadbGTIDListEvent) Dump(w io.Writer)

type OnEventFunc

type OnEventFunc func(*BinlogEvent) error

type PreviousGTIDsEvent

type PreviousGTIDsEvent struct {
	GTIDSets string
}

func (*PreviousGTIDsEvent) Decode

func (e *PreviousGTIDsEvent) Decode(data []byte) error

func (*PreviousGTIDsEvent) Dump

func (e *PreviousGTIDsEvent) Dump(w io.Writer)

type QueryEvent

type QueryEvent struct {
	SlaveProxyID  uint32
	ExecutionTime uint32
	ErrorCode     uint16
	StatusVars    []byte
	Schema        []byte
	Query         []byte

	// In fact QueryEvent doesn't have the GTIDSet information; GSet is provided just for convenience.
	GSet GTIDSet
	// contains filtered or unexported fields
}

func (*QueryEvent) Decode

func (e *QueryEvent) Decode(data []byte) error

func (*QueryEvent) Dump

func (e *QueryEvent) Dump(w io.Writer)

type RotateEvent

type RotateEvent struct {
	Position    uint64
	NextLogName []byte
}

func (*RotateEvent) Decode

func (e *RotateEvent) Decode(data []byte) error

func (*RotateEvent) Dump

func (e *RotateEvent) Dump(w io.Writer)

type RowsEvent

type RowsEvent struct {
	// 0, 1, 2
	Version int

	Table *TableMapEvent

	TableID uint64

	Flags uint16

	// if version == 2
	// Use when DataLen value is greater than 2
	NdbFormat byte
	NdbData   []byte

	PartitionId       uint16
	SourcePartitionId uint16

	// lenenc_int
	ColumnCount uint64

	// len = (ColumnCount + 7) / 8
	ColumnBitmap1 []byte

	// if UPDATE_ROWS_EVENTv1 or v2, or PARTIAL_UPDATE_ROWS_EVENT
	// len = (ColumnCount + 7) / 8
	ColumnBitmap2 []byte

	// rows: all return types from RowsEvent.decodeValue()
	Rows           [][]interface{}
	SkippedColumns [][]int
	// contains filtered or unexported fields
}

RowsEvent represents a MySQL rows event like DELETE_ROWS_EVENT, UPDATE_ROWS_EVENT, etc. RowsEvent.Rows saves the rows data, and the MySQL type to golang type mapping is:

  - MYSQL_TYPE_NULL: nil
  - MYSQL_TYPE_LONG: int32
  - MYSQL_TYPE_TINY: int8
  - MYSQL_TYPE_SHORT: int16
  - MYSQL_TYPE_INT24: int32
  - MYSQL_TYPE_LONGLONG: int64
  - MYSQL_TYPE_NEWDECIMAL: string / "github.com/shopspring/decimal".Decimal
  - MYSQL_TYPE_FLOAT: float32
  - MYSQL_TYPE_DOUBLE: float64
  - MYSQL_TYPE_BIT: int64
  - MYSQL_TYPE_TIMESTAMP: string / time.Time
  - MYSQL_TYPE_TIMESTAMP2: string / time.Time
  - MYSQL_TYPE_DATETIME: string / time.Time
  - MYSQL_TYPE_DATETIME2: string / time.Time
  - MYSQL_TYPE_TIME: string
  - MYSQL_TYPE_TIME2: string
  - MYSQL_TYPE_DATE: string
  - MYSQL_TYPE_YEAR: int
  - MYSQL_TYPE_ENUM: int64
  - MYSQL_TYPE_SET: int64
  - MYSQL_TYPE_BLOB: []byte
  - MYSQL_TYPE_VARCHAR: string
  - MYSQL_TYPE_VAR_STRING: string
  - MYSQL_TYPE_STRING: string
  - MYSQL_TYPE_JSON: []byte / *replication.JsonDiff
  - MYSQL_TYPE_GEOMETRY: []byte

func (*RowsEvent) Decode

func (e *RowsEvent) Decode(data []byte) error

func (*RowsEvent) DecodeData

func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error)

func (*RowsEvent) DecodeHeader

func (e *RowsEvent) DecodeHeader(data []byte) (int, error)

func (*RowsEvent) Dump

func (e *RowsEvent) Dump(w io.Writer)

type RowsQueryEvent

type RowsQueryEvent struct {
	Query []byte
}

func (*RowsQueryEvent) Decode

func (e *RowsQueryEvent) Decode(data []byte) error

func (*RowsQueryEvent) Dump

func (e *RowsQueryEvent) Dump(w io.Writer)

type TableMapEvent

type TableMapEvent struct {
	TableID uint64

	Flags uint16

	Schema []byte
	Table  []byte

	ColumnCount uint64
	ColumnType  []byte
	ColumnMeta  []uint16

	// len = (ColumnCount + 7) / 8
	NullBitmap []byte

	// SignednessBitmap stores signedness info for numeric columns.
	SignednessBitmap []byte

	// DefaultCharset[0] is the default collation of character columns.
	// For character columns that have a different charset,
	// (character column index, column collation) pairs follow.
	DefaultCharset []uint64
	// ColumnCharset contains collation sequence for all character columns
	ColumnCharset []uint64

	// SetStrValue stores values for set columns.
	SetStrValue [][][]byte

	// EnumStrValue stores values for enum columns.
	EnumStrValue [][][]byte

	// ColumnName lists all column names.
	ColumnName [][]byte

	// GeometryType stores real type for geometry columns.
	GeometryType []uint64

	// PrimaryKey is a sequence of column indexes of primary key.
	PrimaryKey []uint64

	// PrimaryKeyPrefix is the prefix length used for each column of primary key.
	// 0 means that the whole column length is used.
	PrimaryKeyPrefix []uint64

	// EnumSetDefaultCharset/EnumSetColumnCharset is similar to DefaultCharset/ColumnCharset but for enum/set columns.
	EnumSetDefaultCharset []uint64
	EnumSetColumnCharset  []uint64

	// VisibilityBitmap stores bits that are set if corresponding column is not invisible (MySQL 8.0.23+)
	VisibilityBitmap []byte
	// contains filtered or unexported fields
}

func (*TableMapEvent) CollationMap

func (e *TableMapEvent) CollationMap() map[int]uint64

CollationMap returns a map: column index -> collation id. Note that only character columns will be returned. nil is returned if not available or no character columns at all.

func (*TableMapEvent) ColumnNameString

func (e *TableMapEvent) ColumnNameString() []string

ColumnNameString returns column names as string slice. nil is returned if not available.

func (*TableMapEvent) Decode

func (e *TableMapEvent) Decode(data []byte) error

func (*TableMapEvent) Dump

func (e *TableMapEvent) Dump(w io.Writer)

func (*TableMapEvent) EnumSetCollationMap

func (e *TableMapEvent) EnumSetCollationMap() map[int]uint64

EnumSetCollationMap returns a map: column index -> collation id. Note that only enum or set columns will be returned. nil is returned if not available or no enum/set columns at all.

func (*TableMapEvent) EnumStrValueMap

func (e *TableMapEvent) EnumStrValueMap() map[int][]string

EnumStrValueMap returns a map: column index -> enum string value. Note that only enum columns will be returned. nil is returned if not available or no enum columns at all.

func (*TableMapEvent) EnumStrValueString

func (e *TableMapEvent) EnumStrValueString() [][]string

EnumStrValueString returns values for enum columns as string slices. nil is returned if not available or no enum columns at all.

func (*TableMapEvent) GeometryTypeMap

func (e *TableMapEvent) GeometryTypeMap() map[int]uint64

GeometryTypeMap returns a map: column index -> geometry type. Note that only geometry columns will be returned. nil is returned if not available or no geometry columns at all.

func (*TableMapEvent) IsCharacterColumn

func (e *TableMapEvent) IsCharacterColumn(i int) bool

IsCharacterColumn returns true if the column type is considered as character type. Note that JSON/GEOMETRY types are treated as character type in mariadb. (JSON is an alias for LONGTEXT in mariadb: https://mariadb.com/kb/en/json-data-type/)

func (*TableMapEvent) IsEnumColumn

func (e *TableMapEvent) IsEnumColumn(i int) bool

func (*TableMapEvent) IsEnumOrSetColumn

func (e *TableMapEvent) IsEnumOrSetColumn(i int) bool

func (*TableMapEvent) IsGeometryColumn

func (e *TableMapEvent) IsGeometryColumn(i int) bool

func (*TableMapEvent) IsNumericColumn

func (e *TableMapEvent) IsNumericColumn(i int) bool

func (*TableMapEvent) IsSetColumn

func (e *TableMapEvent) IsSetColumn(i int) bool

func (*TableMapEvent) JsonColumnCount

func (e *TableMapEvent) JsonColumnCount() uint64

JsonColumnCount returns the number of JSON columns in this table

func (*TableMapEvent) Nullable

func (e *TableMapEvent) Nullable(i int) (available, nullable bool)

Nullable returns the nullability of the i-th column. If null bits are not available, available is false. i must be in range [0, ColumnCount).

func (*TableMapEvent) SetStrValueMap

func (e *TableMapEvent) SetStrValueMap() map[int][]string

SetStrValueMap returns a map: column index -> set string value. Note that only set columns will be returned. nil is returned if not available or no set columns at all.

func (*TableMapEvent) SetStrValueString

func (e *TableMapEvent) SetStrValueString() [][]string

SetStrValueString returns values for set columns as string slices. nil is returned if not available or no set columns at all.

func (*TableMapEvent) UnsignedMap

func (e *TableMapEvent) UnsignedMap() map[int]bool

UnsignedMap returns a map: column index -> unsigned. Note that only numeric columns will be returned. nil is returned if not available or no numeric columns at all.

func (*TableMapEvent) VisibilityMap

func (e *TableMapEvent) VisibilityMap() map[int]bool

VisibilityMap returns a map: column index -> visibility. Invisible columns were introduced in MySQL 8.0.23. nil is returned if not available.

type TransactionPayloadEvent

type TransactionPayloadEvent struct {
	Size             uint64
	UncompressedSize uint64
	CompressionType  uint64
	Payload          []byte
	Events           []*BinlogEvent
	// contains filtered or unexported fields
}

func (*TransactionPayloadEvent) Decode

func (e *TransactionPayloadEvent) Decode(data []byte) error

func (*TransactionPayloadEvent) Dump

func (e *TransactionPayloadEvent) Dump(w io.Writer)

type XIDEvent

type XIDEvent struct {
	XID uint64

	// In fact XIDEvent doesn't have the GTIDSet information; GSet is provided just for convenience.
	GSet GTIDSet
}

func (*XIDEvent) Decode

func (e *XIDEvent) Decode(data []byte) error

func (*XIDEvent) Dump

func (e *XIDEvent) Dump(w io.Writer)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL