Documentation ¶
Overview ¶
Package replication handles the MySQL replication protocol.
Todo:
+ Get table information when handling rows events.
Index ¶
- Constants
- Variables
- type BeginLoadQueryEvent
- type BinlogEvent
- type BinlogParser
- func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error)
- func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc) error
- func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error
- func (p *BinlogParser) ParseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool, error)
- func (p *BinlogParser) Reset()
- func (p *BinlogParser) Resume()
- func (p *BinlogParser) SetFlavor(flavor string)
- func (p *BinlogParser) SetIgnoreJSONDecodeError(ignoreJSONDecodeErr bool)
- func (p *BinlogParser) SetParseTime(parseTime bool)
- func (p *BinlogParser) SetRawMode(mode bool)
- func (p *BinlogParser) SetRowsEventDecodeFunc(rowsEventDecodeFunc func(*RowsEvent, []byte) error)
- func (p *BinlogParser) SetTableMapOptionalMetaDecodeFunc(tableMapOptionalMetaDecondeFunc func([]byte) error)
- func (p *BinlogParser) SetTimestampStringLocation(timestampStringLocation *time.Location)
- func (p *BinlogParser) SetUseDecimal(useDecimal bool)
- func (p *BinlogParser) SetVerifyChecksum(verify bool)
- func (p *BinlogParser) Stop()
- type BinlogStreamer
- func (s *BinlogStreamer) AddErrorToStreamer(err error) bool
- func (s *BinlogStreamer) AddEventToStreamer(ev *BinlogEvent) error
- func (s *BinlogStreamer) DumpEvents() []*BinlogEvent
- func (s *BinlogStreamer) GetEvent(ctx context.Context) (*BinlogEvent, error)
- func (s *BinlogStreamer) GetEventWithStartTime(ctx context.Context, startTime time.Time) (*BinlogEvent, error)
- type BinlogSyncer
- func (b *BinlogSyncer) Close()
- func (b *BinlogSyncer) GetNextPosition() Position
- func (b *BinlogSyncer) LastConnectionID() uint32
- func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error
- func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration, ...) (retErr error)
- func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error)
- func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error)
- type BinlogSyncerConfig
- type EnumBinlogRowValueOptions
- type EnumRowImageType
- type Event
- type EventError
- type EventHeader
- type EventType
- type ExecuteLoadQueryEvent
- type FormatDescriptionEvent
- type GTIDEvent
- type GenericEvent
- type IntVarEvent
- type IntVarEventType
- type JsonDiff
- type JsonDiffOperation
- type MariadbAnnotateRowsEvent
- type MariadbBinlogCheckPointEvent
- type MariadbGTIDEvent
- type MariadbGTIDListEvent
- type OnEventFunc
- type PreviousGTIDsEvent
- type QueryEvent
- type RotateEvent
- type RowsEvent
- type RowsQueryEvent
- type TableMapEvent
- func (e *TableMapEvent) CollationMap() map[int]uint64
- func (e *TableMapEvent) ColumnNameString() []string
- func (e *TableMapEvent) Decode(data []byte) error
- func (e *TableMapEvent) Dump(w io.Writer)
- func (e *TableMapEvent) EnumSetCollationMap() map[int]uint64
- func (e *TableMapEvent) EnumStrValueMap() map[int][]string
- func (e *TableMapEvent) EnumStrValueString() [][]string
- func (e *TableMapEvent) GeometryTypeMap() map[int]uint64
- func (e *TableMapEvent) IsCharacterColumn(i int) bool
- func (e *TableMapEvent) IsEnumColumn(i int) bool
- func (e *TableMapEvent) IsEnumOrSetColumn(i int) bool
- func (e *TableMapEvent) IsGeometryColumn(i int) bool
- func (e *TableMapEvent) IsNumericColumn(i int) bool
- func (e *TableMapEvent) IsSetColumn(i int) bool
- func (e *TableMapEvent) JsonColumnCount() uint64
- func (e *TableMapEvent) Nullable(i int) (available, nullable bool)
- func (e *TableMapEvent) SetStrValueMap() map[int][]string
- func (e *TableMapEvent) SetStrValueString() [][]string
- func (e *TableMapEvent) UnsignedMap() map[int]bool
- func (e *TableMapEvent) VisibilityMap() map[int]bool
- type TransactionPayloadEvent
- type XIDEvent
Constants ¶
const ( LOG_EVENT_BINLOG_IN_USE_F uint16 = 0x0001 LOG_EVENT_FORCED_ROTATE_F uint16 = 0x0002 LOG_EVENT_THREAD_SPECIFIC_F uint16 = 0x0004 LOG_EVENT_SUPPRESS_USE_F uint16 = 0x0008 LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F uint16 = 0x0010 LOG_EVENT_ARTIFICIAL_F uint16 = 0x0020 LOG_EVENT_RELAY_LOG_F uint16 = 0x0040 LOG_EVENT_IGNORABLE_F uint16 = 0x0080 LOG_EVENT_NO_FILTER_F uint16 = 0x0100 LOG_EVENT_MTS_ISOLATE_F uint16 = 0x0200 )
const ( BINLOG_DUMP_NEVER_STOP uint16 = 0x00 BINLOG_DUMP_NON_BLOCK uint16 = 0x01 BINLOG_SEND_ANNOTATE_ROWS_EVENT uint16 = 0x02 BINLOG_THROUGH_POSITION uint16 = 0x02 BINLOG_THROUGH_GTID uint16 = 0x04 )
const ( BINLOG_ROW_IMAGE_FULL = "FULL" BINLOG_ROW_IMAGE_MINIMAL = "MINIMAL" BINLOG_ROW_IMAGE_NOBLOB = "NOBLOB" )
const ( BINLOG_MARIADB_FL_STANDALONE = 1 << iota /*1 - FL_STANDALONE is set when there is no terminating COMMIT event*/ BINLOG_MARIADB_FL_GROUP_COMMIT_ID /*2 - FL_GROUP_COMMIT_ID is set when event group is part of a group commit on the master. Groups with same commit_id are part of the same group commit.*/ BINLOG_MARIADB_FL_TRANSACTIONAL /*4 - FL_TRANSACTIONAL is set for an event group that can be safely rolled back (no MyISAM, eg.).*/ BINLOG_MARIADB_FL_ALLOW_PARALLEL /*8 - FL_ALLOW_PARALLEL reflects the (negation of the) value of @@SESSION.skip_parallel_replication at the time of commit*/ BINLOG_MARIADB_FL_WAITED /*16 = FL_WAITED is set if a row lock wait (or other wait) is detected during the execution of the transaction*/ BINLOG_MARIADB_FL_DDL /*32 - FL_DDL is set for event group containing DDL*/ )
const ( BINLOG_CHECKSUM_ALG_OFF byte = 0 // Events are without checksum though its generator // is checksum-capable New Master (NM). BINLOG_CHECKSUM_ALG_CRC32 byte = 1 // CRC32 of zlib algorithm. // BINLOG_CHECKSUM_ALG_ENUM_END, // the cut line: valid alg range is [1, 0x7f]. BINLOG_CHECKSUM_ALG_UNDEF byte = 255 // special value to tag undetermined yet checksum )
const ( TABLE_MAP_OPT_META_SIGNEDNESS byte = iota + 1 TABLE_MAP_OPT_META_DEFAULT_CHARSET TABLE_MAP_OPT_META_COLUMN_CHARSET TABLE_MAP_OPT_META_COLUMN_NAME TABLE_MAP_OPT_META_SET_STR_VALUE TABLE_MAP_OPT_META_ENUM_STR_VALUE TABLE_MAP_OPT_META_GEOMETRY_TYPE TABLE_MAP_OPT_META_SIMPLE_PRIMARY_KEY TABLE_MAP_OPT_META_PRIMARY_KEY_WITH_PREFIX TABLE_MAP_OPT_META_ENUM_AND_SET_DEFAULT_CHARSET TABLE_MAP_OPT_META_ENUM_AND_SET_COLUMN_CHARSET TABLE_MAP_OPT_META_COLUMN_VISIBILITY )
These are the optional metadata field types of TABLE_MAP_EVENT, from: libbinlogevents/include/rows_event.h
const ( ENUM_EXTRA_ROW_INFO_TYPECODE_NDB byte = iota ENUM_EXTRA_ROW_INFO_TYPECODE_PARTITION )
const ( EventHeaderSize = 19 SidLength = 16 LogicalTimestampTypeCode = 2 PartLogicalTimestampLength = 8 BinlogChecksumLength = 4 UndefinedServerVer = 999999 // UNDEFINED_SERVER_VERSION )
const ( JSONB_SMALL_OBJECT byte = iota // small JSON object JSONB_LARGE_OBJECT // large JSON object JSONB_SMALL_ARRAY // small JSON array JSONB_LARGE_ARRAY // large JSON array JSONB_LITERAL // literal (true/false/null) JSONB_INT16 // int16 JSONB_UINT16 // uint16 JSONB_INT32 // int32 JSONB_UINT32 // uint32 JSONB_INT64 // int64 JSONB_UINT64 // uint64 JSONB_DOUBLE // double JSONB_STRING // string JSONB_OPAQUE byte = 0x0f // custom data (any MySQL data type) )
const ( JSONB_NULL_LITERAL byte = 0x00 JSONB_TRUE_LITERAL byte = 0x01 JSONB_FALSE_LITERAL byte = 0x02 )
const ( // The JSON value in the given path is replaced with a new value. // // It has the same effect as `JSON_REPLACE(col, path, value)`. JsonDiffOperationReplace = JsonDiffOperation(iota) // Add a new element at the given path. // // If the path specifies an array element, it has the same effect as `JSON_ARRAY_INSERT(col, path, value)`. // // If the path specifies an object member, it has the same effect as `JSON_INSERT(col, path, value)`. JsonDiffOperationInsert // The JSON value at the given path is removed from an array or object. // // It has the same effect as `JSON_REMOVE(col, path)`. JsonDiffOperationRemove )
const ( EnumRowImageTypeWriteAI = EnumRowImageType(iota) EnumRowImageTypeUpdateBI EnumRowImageTypeUpdateAI EnumRowImageTypeDeleteBI )
const ( OTW_PAYLOAD_HEADER_END_MARK = iota OTW_PAYLOAD_SIZE_FIELD OTW_PAYLOAD_COMPRESSION_TYPE_FIELD OTW_PAYLOAD_UNCOMPRESSED_SIZE_FIELD )
On The Wire: Field Types See also binary_log::codecs::binary::Transaction_payload::fields in MySQL https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1codecs_1_1binary_1_1Transaction__payload.html#a9fff7ac12ba064f40e9216565c53d07b
const ( ZSTD = 0 NONE = 255 )
Compression Types
const DATETIMEF_INT_OFS int64 = 0x8000000000
const ( // Store JSON updates in partial form EnumBinlogRowValueOptionsPartialJsonUpdates = EnumBinlogRowValueOptions(iota + 1) )
const (
//we only support MySQL 5.0.0+ binlog format, maybe???
MinBinlogVersion = 4
)
const RowsEventStmtEndFlag = 0x01
RowsEventStmtEndFlag is set at the end of the statement.
const TIMEF_INT_OFS int64 = 0x800000
const TIMEF_OFS int64 = 0x800000000000
Variables ¶
var ( ErrNeedSyncAgain = errors.New("Last sync error or closed, try sync and get event again") ErrSyncClosed = errors.New("Sync was closed") )
var ( //binlog header [ fe `bin` ] BinLogFileHeader = []byte{0xfe, 0x62, 0x69, 0x6e} SemiSyncIndicator byte = 0xef )
var ( // ErrChecksumMismatch indicates binlog checksum mismatch. ErrChecksumMismatch = errors.New("binlog checksum mismatch, data may be corrupted") )
var (
ErrCorruptedJSONDiff = fmt.Errorf("corrupted JSON diff") // ER_CORRUPTED_JSON_DIFF
)
Functions ¶
This section is empty.
Types ¶
type BeginLoadQueryEvent ¶
func (*BeginLoadQueryEvent) Decode ¶
func (e *BeginLoadQueryEvent) Decode(data []byte) error
func (*BeginLoadQueryEvent) Dump ¶
func (e *BeginLoadQueryEvent) Dump(w io.Writer)
type BinlogEvent ¶
type BinlogEvent struct { // raw binlog data which contains all data, including binlog header and event body, and including crc32 checksum if exists RawData []byte Header *EventHeader Event Event }
func (*BinlogEvent) Dump ¶
func (e *BinlogEvent) Dump(w io.Writer)
type BinlogParser ¶
type BinlogParser struct {
// contains filtered or unexported fields
}
func NewBinlogParser ¶
func NewBinlogParser() *BinlogParser
func (*BinlogParser) Parse ¶
func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error)
Parse: Given the bytes for a binary log event, return the decoded event. With the exception of the FORMAT_DESCRIPTION_EVENT event type, a FORMAT_DESCRIPTION_EVENT must previously have been passed into the parser for this to work properly on any given event. Passing a new FORMAT_DESCRIPTION_EVENT into the parser will replace an existing one.
func (*BinlogParser) ParseFile ¶
func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc) error
func (*BinlogParser) ParseReader ¶
func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error
func (*BinlogParser) ParseSingleEvent ¶
func (p *BinlogParser) ParseSingleEvent(r io.Reader, onEvent OnEventFunc) (bool, error)
ParseSingleEvent parses single binlog event and passes the event to onEvent function.
func (*BinlogParser) Reset ¶
func (p *BinlogParser) Reset()
func (*BinlogParser) Resume ¶
func (p *BinlogParser) Resume()
func (*BinlogParser) SetFlavor ¶
func (p *BinlogParser) SetFlavor(flavor string)
func (*BinlogParser) SetIgnoreJSONDecodeError ¶
func (p *BinlogParser) SetIgnoreJSONDecodeError(ignoreJSONDecodeErr bool)
func (*BinlogParser) SetParseTime ¶
func (p *BinlogParser) SetParseTime(parseTime bool)
func (*BinlogParser) SetRawMode ¶
func (p *BinlogParser) SetRawMode(mode bool)
func (*BinlogParser) SetRowsEventDecodeFunc ¶
func (p *BinlogParser) SetRowsEventDecodeFunc(rowsEventDecodeFunc func(*RowsEvent, []byte) error)
func (*BinlogParser) SetTableMapOptionalMetaDecodeFunc ¶
func (p *BinlogParser) SetTableMapOptionalMetaDecodeFunc(tableMapOptionalMetaDecondeFunc func([]byte) error)
func (*BinlogParser) SetTimestampStringLocation ¶
func (p *BinlogParser) SetTimestampStringLocation(timestampStringLocation *time.Location)
func (*BinlogParser) SetUseDecimal ¶
func (p *BinlogParser) SetUseDecimal(useDecimal bool)
func (*BinlogParser) SetVerifyChecksum ¶
func (p *BinlogParser) SetVerifyChecksum(verify bool)
func (*BinlogParser) Stop ¶
func (p *BinlogParser) Stop()
type BinlogStreamer ¶
type BinlogStreamer struct {
// contains filtered or unexported fields
}
BinlogStreamer gets the streaming event.
func NewBinlogStreamer ¶
func NewBinlogStreamer() *BinlogStreamer
func NewBinlogStreamerWithChanSize ¶
func NewBinlogStreamerWithChanSize(chanSize int) *BinlogStreamer
func (*BinlogStreamer) AddErrorToStreamer ¶
func (s *BinlogStreamer) AddErrorToStreamer(err error) bool
AddErrorToStreamer adds an error to the streamer.
func (*BinlogStreamer) AddEventToStreamer ¶
func (s *BinlogStreamer) AddEventToStreamer(ev *BinlogEvent) error
AddEventToStreamer adds a binlog event to the streamer. You can use it when you want to add an event to the streamer manually, for example in replication handlers.
func (*BinlogStreamer) DumpEvents ¶
func (s *BinlogStreamer) DumpEvents() []*BinlogEvent
DumpEvents dumps all remaining events.
func (*BinlogStreamer) GetEvent ¶
func (s *BinlogStreamer) GetEvent(ctx context.Context) (*BinlogEvent, error)
GetEvent gets the binlog event one by one, it will block until Syncer receives any events from MySQL or meets a sync error. You can pass a context (like Cancel or Timeout) to break the block.
func (*BinlogStreamer) GetEventWithStartTime ¶
func (s *BinlogStreamer) GetEventWithStartTime(ctx context.Context, startTime time.Time) (*BinlogEvent, error)
GetEventWithStartTime gets the binlog event with a start time: if the current binlog event's timestamp is earlier than the specified startTime, a nil event is returned.
type BinlogSyncer ¶
type BinlogSyncer struct {
// contains filtered or unexported fields
}
BinlogSyncer syncs binlog event from server.
func NewBinlogSyncer ¶
func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer
NewBinlogSyncer creates the BinlogSyncer with cfg.
func (*BinlogSyncer) GetNextPosition ¶
func (b *BinlogSyncer) GetNextPosition() Position
GetNextPosition returns the next position of the syncer
func (*BinlogSyncer) LastConnectionID ¶
func (b *BinlogSyncer) LastConnectionID() uint32
LastConnectionID returns last connectionID.
func (*BinlogSyncer) StartBackup ¶
func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error
StartBackup works like mysqlbinlog's remote raw backup: it backs up the remote binlog starting from position (filename, offset) and writes it into backupDir.
func (*BinlogSyncer) StartBackupWithHandler ¶
func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration, handler func(binlogFilename string) (io.WriteCloser, error)) (retErr error)
StartBackupWithHandler starts the backup process for the binary log using the specified position and handler. The process will continue until the timeout is reached or an error occurs.
Parameters:
- p: The starting position in the binlog from which to begin the backup.
- timeout: The maximum duration to wait for new binlog events before stopping the backup process. If set to 0, a default very long timeout (30 days) is used instead.
- handler: A function that takes a binlog filename and returns an io.WriteCloser for writing raw events to.
func (*BinlogSyncer) StartSync ¶
func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error)
StartSync starts syncing from the `pos` position.
func (*BinlogSyncer) StartSyncGTID ¶
func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error)
StartSyncGTID starts syncing from the `gset` GTIDSet.
type BinlogSyncerConfig ¶
type BinlogSyncerConfig struct { // ServerID is the unique ID in cluster. ServerID uint32 // Flavor is "mysql" or "mariadb", if not set, use "mysql" default. Flavor string // Host is for MySQL server host. Host string // Port is for MySQL server port. Port uint16 // User is for MySQL user. User string // Password is for MySQL password. Password string // Localhost is local hostname if register slave. // If not set, use os.Hostname() instead. Localhost string // Charset is for MySQL client character set Charset string // SemiSyncEnabled enables semi-sync or not. SemiSyncEnabled bool // RawModeEnabled is for not parsing binlog event. RawModeEnabled bool // If not nil, use the provided tls.Config to connect to the database using TLS/SSL. TLSConfig *tls.Config // Use replication.Time structure for timestamp and datetime. // We will use Local location for timestamp and UTC location for datetime. ParseTime bool // If ParseTime is false, convert TIMESTAMP into this specified timezone. If // ParseTime is true, this option will have no effect and TIMESTAMP data will // be parsed into the local timezone and a full time.Time struct will be // returned. // // Note that MySQL TIMESTAMP columns are offset from the machine local // timezone while DATETIME columns are offset from UTC. This is consistent // with documented MySQL behaviour as it returns TIMESTAMP in local timezone // and DATETIME in UTC. // // Setting this to UTC effectively equalizes the TIMESTAMP and DATETIME time // strings obtained from MySQL. TimestampStringLocation *time.Location // Use decimal.Decimal structure for decimals. UseDecimal bool // RecvBufferSize sets the size in bytes of the operating system's receive buffer associated with the connection. RecvBufferSize int // master heartbeat period HeartbeatPeriod time.Duration // read timeout ReadTimeout time.Duration // maximum number of attempts to re-establish a broken connection, zero or negative number means infinite retry. 
// this configuration will not work if DisableRetrySync is true MaxReconnectAttempts int // whether disable re-sync for broken connection DisableRetrySync bool // Only works when MySQL/MariaDB variable binlog_checksum=CRC32. // For MySQL, binlog_checksum was introduced since 5.6.2, but CRC32 was set as default value since 5.6.6 . // https://dev.mysql.com/doc/refman/5.6/en/replication-options-binary-log.html#option_mysqld_binlog-checksum // For MariaDB, binlog_checksum was introduced since MariaDB 5.3, but CRC32 was set as default value since MariaDB 10.2.1 . // https://mariadb.com/kb/en/library/replication-and-binary-log-server-system-variables/#binlog_checksum VerifyChecksum bool // DumpCommandFlag is used to send binlog dump command. Default 0, aka BINLOG_DUMP_NEVER_STOP. // For MySQL, BINLOG_DUMP_NEVER_STOP and BINLOG_DUMP_NON_BLOCK are available. // https://dev.mysql.com/doc/internals/en/com-binlog-dump.html#binlog-dump-non-block // For MariaDB, BINLOG_DUMP_NEVER_STOP, BINLOG_DUMP_NON_BLOCK and BINLOG_SEND_ANNOTATE_ROWS_EVENT are available. // https://mariadb.com/kb/en/library/com_binlog_dump/ // https://mariadb.com/kb/en/library/annotate_rows_event/ DumpCommandFlag uint16 //Option function is used to set outside of BinlogSyncerConfig, between mysql connection and COM_REGISTER_SLAVE //For MariaDB: slave_gtid_ignore_duplicates、skip_replication、slave_until_gtid Option func(*client.Conn) error // Set Logger Logger loggers.Advanced // Set Dialer Dialer client.Dialer RowsEventDecodeFunc func(*RowsEvent, []byte) error TableMapOptionalMetaDecodeFunc func([]byte) error DiscardGTIDSet bool EventCacheCount int }
BinlogSyncerConfig is the configuration for BinlogSyncer.
type EnumBinlogRowValueOptions ¶
type EnumBinlogRowValueOptions byte
Bits for binlog_row_value_options sysvar
type EnumRowImageType ¶
type EnumRowImageType byte
EnumRowImageType is the set of allowed image types for every row in the MySQL binlog. See https://github.com/mysql/mysql-server/blob/1bfe02bdad6604d54913c62614bde57a055c8332/sql/rpl_record.h#L39 enum class enum_row_image_type { WRITE_AI, UPDATE_BI, UPDATE_AI, DELETE_BI };
func (EnumRowImageType) String ¶
func (t EnumRowImageType) String() string
type EventError ¶
type EventError struct { Header *EventHeader //Error message Err string //Event data Data []byte }
func (*EventError) Error ¶
func (e *EventError) Error() string
type EventHeader ¶
type EventHeader struct { Timestamp uint32 EventType EventType ServerID uint32 EventSize uint32 LogPos uint32 Flags uint16 }
func (*EventHeader) Decode ¶
func (h *EventHeader) Decode(data []byte) error
func (*EventHeader) Dump ¶
func (h *EventHeader) Dump(w io.Writer)
type EventType ¶
type EventType byte
const ( UNKNOWN_EVENT EventType = iota START_EVENT_V3 QUERY_EVENT STOP_EVENT ROTATE_EVENT INTVAR_EVENT LOAD_EVENT SLAVE_EVENT CREATE_FILE_EVENT APPEND_BLOCK_EVENT EXEC_LOAD_EVENT DELETE_FILE_EVENT NEW_LOAD_EVENT RAND_EVENT USER_VAR_EVENT FORMAT_DESCRIPTION_EVENT XID_EVENT BEGIN_LOAD_QUERY_EVENT EXECUTE_LOAD_QUERY_EVENT TABLE_MAP_EVENT WRITE_ROWS_EVENTv0 UPDATE_ROWS_EVENTv0 DELETE_ROWS_EVENTv0 WRITE_ROWS_EVENTv1 UPDATE_ROWS_EVENTv1 DELETE_ROWS_EVENTv1 INCIDENT_EVENT HEARTBEAT_EVENT IGNORABLE_EVENT ROWS_QUERY_EVENT WRITE_ROWS_EVENTv2 UPDATE_ROWS_EVENTv2 DELETE_ROWS_EVENTv2 GTID_EVENT ANONYMOUS_GTID_EVENT PREVIOUS_GTIDS_EVENT TRANSACTION_CONTEXT_EVENT VIEW_CHANGE_EVENT XA_PREPARE_LOG_EVENT PARTIAL_UPDATE_ROWS_EVENT TRANSACTION_PAYLOAD_EVENT HEARTBEAT_LOG_EVENT_V2 )
const ( // MariaDB event starts from 160 MARIADB_ANNOTATE_ROWS_EVENT EventType = 160 + iota MARIADB_BINLOG_CHECKPOINT_EVENT MARIADB_GTID_EVENT MARIADB_GTID_LIST_EVENT MARIADB_START_ENCRYPTION_EVENT MARIADB_QUERY_COMPRESSED_EVENT MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1 MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1 MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1 )
type ExecuteLoadQueryEvent ¶
type ExecuteLoadQueryEvent struct { SlaveProxyID uint32 ExecutionTime uint32 SchemaLength uint8 ErrorCode uint16 StatusVars uint16 FileID uint32 StartPos uint32 EndPos uint32 DupHandlingFlags uint8 }
func (*ExecuteLoadQueryEvent) Decode ¶
func (e *ExecuteLoadQueryEvent) Decode(data []byte) error
func (*ExecuteLoadQueryEvent) Dump ¶
func (e *ExecuteLoadQueryEvent) Dump(w io.Writer)
type FormatDescriptionEvent ¶
type FormatDescriptionEvent struct { Version uint16 //len = 50 ServerVersion []byte CreateTimestamp uint32 EventHeaderLength uint8 EventTypeHeaderLengths []byte // 0 is off, 1 is for CRC32, 255 is undefined ChecksumAlgorithm byte }
func (*FormatDescriptionEvent) Decode ¶
func (e *FormatDescriptionEvent) Decode(data []byte) error
func (*FormatDescriptionEvent) Dump ¶
func (e *FormatDescriptionEvent) Dump(w io.Writer)
type GTIDEvent ¶
type GTIDEvent struct { CommitFlag uint8 SID []byte GNO int64 LastCommitted int64 SequenceNumber int64 // ImmediateCommitTimestamp/OriginalCommitTimestamp are introduced in MySQL-8.0.1, see: // https://mysqlhighavailability.com/replication-features-in-mysql-8-0-1/ ImmediateCommitTimestamp uint64 OriginalCommitTimestamp uint64 // Total transaction length (including this GTIDEvent), introduced in MySQL-8.0.2, see: // https://mysqlhighavailability.com/taking-advantage-of-new-transaction-length-metadata/ TransactionLength uint64 // ImmediateServerVersion/OriginalServerVersion are introduced in MySQL-8.0.14, see // https://dev.mysql.com/doc/refman/8.0/en/replication-compatibility.html ImmediateServerVersion uint32 OriginalServerVersion uint32 }
func (*GTIDEvent) ImmediateCommitTime ¶
ImmediateCommitTime returns the commit time of this trx on the immediate server or zero time if not available.
func (*GTIDEvent) OriginalCommitTime ¶
OriginalCommitTime returns the commit time of this trx on the original server or zero time if not available.
type GenericEvent ¶
type GenericEvent struct {
Data []byte
}
We don't parse every event type; for those we don't parse, GenericEvent is used instead.
func (*GenericEvent) Decode ¶
func (e *GenericEvent) Decode(data []byte) error
func (*GenericEvent) Dump ¶
func (e *GenericEvent) Dump(w io.Writer)
type IntVarEvent ¶
type IntVarEvent struct { Type IntVarEventType Value uint64 }
func (*IntVarEvent) Decode ¶
func (i *IntVarEvent) Decode(data []byte) error
func (*IntVarEvent) Dump ¶
func (i *IntVarEvent) Dump(w io.Writer)
type IntVarEventType ¶
type IntVarEventType byte
const ( INVALID IntVarEventType = iota LAST_INSERT_ID INSERT_ID )
type JsonDiff ¶
type JsonDiff struct { Op JsonDiffOperation Path string Value string }
type JsonDiffOperation ¶
type JsonDiffOperation byte
JsonDiffOperation is an enum that describes what kind of operation a JsonDiff object represents. https://github.com/mysql/mysql-server/blob/8.0/sql/json_diff.h
func (JsonDiffOperation) String ¶
func (op JsonDiffOperation) String() string
type MariadbAnnotateRowsEvent ¶
type MariadbAnnotateRowsEvent struct {
Query []byte
}
func (*MariadbAnnotateRowsEvent) Decode ¶
func (e *MariadbAnnotateRowsEvent) Decode(data []byte) error
func (*MariadbAnnotateRowsEvent) Dump ¶
func (e *MariadbAnnotateRowsEvent) Dump(w io.Writer)
type MariadbBinlogCheckPointEvent ¶
type MariadbBinlogCheckPointEvent struct {
Info []byte
}
func (*MariadbBinlogCheckPointEvent) Decode ¶
func (e *MariadbBinlogCheckPointEvent) Decode(data []byte) error
func (*MariadbBinlogCheckPointEvent) Dump ¶
func (e *MariadbBinlogCheckPointEvent) Dump(w io.Writer)
type MariadbGTIDEvent ¶
func (*MariadbGTIDEvent) Decode ¶
func (e *MariadbGTIDEvent) Decode(data []byte) error
func (*MariadbGTIDEvent) Dump ¶
func (e *MariadbGTIDEvent) Dump(w io.Writer)
func (*MariadbGTIDEvent) GTIDNext ¶
func (e *MariadbGTIDEvent) GTIDNext() (GTIDSet, error)
func (*MariadbGTIDEvent) IsDDL ¶
func (e *MariadbGTIDEvent) IsDDL() bool
func (*MariadbGTIDEvent) IsGroupCommit ¶
func (e *MariadbGTIDEvent) IsGroupCommit() bool
func (*MariadbGTIDEvent) IsStandalone ¶
func (e *MariadbGTIDEvent) IsStandalone() bool
type MariadbGTIDListEvent ¶
type MariadbGTIDListEvent struct {
GTIDs []MariadbGTID
}
func (*MariadbGTIDListEvent) Decode ¶
func (e *MariadbGTIDListEvent) Decode(data []byte) error
func (*MariadbGTIDListEvent) Dump ¶
func (e *MariadbGTIDListEvent) Dump(w io.Writer)
type OnEventFunc ¶
type OnEventFunc func(*BinlogEvent) error
type PreviousGTIDsEvent ¶
type PreviousGTIDsEvent struct {
GTIDSets string
}
func (*PreviousGTIDsEvent) Decode ¶
func (e *PreviousGTIDsEvent) Decode(data []byte) error
func (*PreviousGTIDsEvent) Dump ¶
func (e *PreviousGTIDsEvent) Dump(w io.Writer)
type QueryEvent ¶
type QueryEvent struct { SlaveProxyID uint32 ExecutionTime uint32 ErrorCode uint16 StatusVars []byte Schema []byte Query []byte // in fact QueryEvent doesn't have the GTIDSet information, it is kept here just for convenience GSet GTIDSet // contains filtered or unexported fields }
func (*QueryEvent) Decode ¶
func (e *QueryEvent) Decode(data []byte) error
func (*QueryEvent) Dump ¶
func (e *QueryEvent) Dump(w io.Writer)
type RotateEvent ¶
func (*RotateEvent) Decode ¶
func (e *RotateEvent) Decode(data []byte) error
func (*RotateEvent) Dump ¶
func (e *RotateEvent) Dump(w io.Writer)
type RowsEvent ¶
type RowsEvent struct { // 0, 1, 2 Version int Table *TableMapEvent TableID uint64 Flags uint16 // if version == 2 // Use when DataLen value is greater than 2 NdbFormat byte NdbData []byte PartitionId uint16 SourcePartitionId uint16 // lenenc_int ColumnCount uint64 // len = (ColumnCount + 7) / 8 ColumnBitmap1 []byte // if UPDATE_ROWS_EVENTv1 or v2, or PARTIAL_UPDATE_ROWS_EVENT // len = (ColumnCount + 7) / 8 ColumnBitmap2 []byte // rows: all return types from RowsEvent.decodeValue() Rows [][]interface{} SkippedColumns [][]int // contains filtered or unexported fields }
RowsEvent represents a MySQL rows event like DELETE_ROWS_EVENT, UPDATE_ROWS_EVENT, etc. RowsEvent.Rows saves the rows data, and the MySQL type to golang type mapping is - MYSQL_TYPE_NULL: nil - MYSQL_TYPE_LONG: int32 - MYSQL_TYPE_TINY: int8 - MYSQL_TYPE_SHORT: int16 - MYSQL_TYPE_INT24: int32 - MYSQL_TYPE_LONGLONG: int64 - MYSQL_TYPE_NEWDECIMAL: string / "github.com/shopspring/decimal".Decimal - MYSQL_TYPE_FLOAT: float32 - MYSQL_TYPE_DOUBLE: float64 - MYSQL_TYPE_BIT: int64 - MYSQL_TYPE_TIMESTAMP: string / time.Time - MYSQL_TYPE_TIMESTAMP2: string / time.Time - MYSQL_TYPE_DATETIME: string / time.Time - MYSQL_TYPE_DATETIME2: string / time.Time - MYSQL_TYPE_TIME: string - MYSQL_TYPE_TIME2: string - MYSQL_TYPE_DATE: string - MYSQL_TYPE_YEAR: int - MYSQL_TYPE_ENUM: int64 - MYSQL_TYPE_SET: int64 - MYSQL_TYPE_BLOB: []byte - MYSQL_TYPE_VARCHAR: string - MYSQL_TYPE_VAR_STRING: string - MYSQL_TYPE_STRING: string - MYSQL_TYPE_JSON: []byte / *replication.JsonDiff - MYSQL_TYPE_GEOMETRY: []byte
type RowsQueryEvent ¶
type RowsQueryEvent struct {
Query []byte
}
func (*RowsQueryEvent) Decode ¶
func (e *RowsQueryEvent) Decode(data []byte) error
func (*RowsQueryEvent) Dump ¶
func (e *RowsQueryEvent) Dump(w io.Writer)
type TableMapEvent ¶
type TableMapEvent struct { TableID uint64 Flags uint16 Schema []byte Table []byte ColumnCount uint64 ColumnType []byte ColumnMeta []uint16 // len = (ColumnCount + 7) / 8 NullBitmap []byte // SignednessBitmap stores signedness info for numeric columns. SignednessBitmap []byte // DefaultCharset[0] is the default collation of character columns. // For character columns that have different charset, // (character column index, column collation) pairs follows DefaultCharset []uint64 // ColumnCharset contains collation sequence for all character columns ColumnCharset []uint64 // SetStrValue stores values for set columns. SetStrValue [][][]byte // EnumStrValue stores values for enum columns. EnumStrValue [][][]byte // ColumnName list all column names. ColumnName [][]byte // GeometryType stores real type for geometry columns. GeometryType []uint64 // PrimaryKey is a sequence of column indexes of primary key. PrimaryKey []uint64 // PrimaryKeyPrefix is the prefix length used for each column of primary key. // 0 means that the whole column length is used. PrimaryKeyPrefix []uint64 // EnumSetDefaultCharset/EnumSetColumnCharset is similar to DefaultCharset/ColumnCharset but for enum/set columns. EnumSetDefaultCharset []uint64 EnumSetColumnCharset []uint64 // VisibilityBitmap stores bits that are set if corresponding column is not invisible (MySQL 8.0.23+) VisibilityBitmap []byte // contains filtered or unexported fields }
func (*TableMapEvent) CollationMap ¶
func (e *TableMapEvent) CollationMap() map[int]uint64
CollationMap returns a map: column index -> collation id. Note that only character columns will be returned. nil is returned if not available or no character columns at all.
func (*TableMapEvent) ColumnNameString ¶
func (e *TableMapEvent) ColumnNameString() []string
ColumnNameString returns column names as string slice. nil is returned if not available.
func (*TableMapEvent) Decode ¶
func (e *TableMapEvent) Decode(data []byte) error
func (*TableMapEvent) Dump ¶
func (e *TableMapEvent) Dump(w io.Writer)
func (*TableMapEvent) EnumSetCollationMap ¶
func (e *TableMapEvent) EnumSetCollationMap() map[int]uint64
EnumSetCollationMap returns a map: column index -> collation id. Note that only enum or set columns will be returned. nil is returned if not available or no enum/set columns at all.
func (*TableMapEvent) EnumStrValueMap ¶
func (e *TableMapEvent) EnumStrValueMap() map[int][]string
EnumStrValueMap returns a map: column index -> enum string value. Note that only enum columns will be returned. nil is returned if not available or no enum columns at all.
func (*TableMapEvent) EnumStrValueString ¶
func (e *TableMapEvent) EnumStrValueString() [][]string
EnumStrValueString returns values for enum columns as string slices. nil is returned if not available or no enum columns at all.
func (*TableMapEvent) GeometryTypeMap ¶
func (e *TableMapEvent) GeometryTypeMap() map[int]uint64
GeometryTypeMap returns a map: column index -> geometry type. Note that only geometry columns will be returned. nil is returned if not available or no geometry columns at all.
func (*TableMapEvent) IsCharacterColumn ¶
func (e *TableMapEvent) IsCharacterColumn(i int) bool
IsCharacterColumn returns true if the column type is considered as character type. Note that JSON/GEOMETRY types are treated as character type in mariadb. (JSON is an alias for LONGTEXT in mariadb: https://mariadb.com/kb/en/json-data-type/)
func (*TableMapEvent) IsEnumColumn ¶
func (e *TableMapEvent) IsEnumColumn(i int) bool
func (*TableMapEvent) IsEnumOrSetColumn ¶
func (e *TableMapEvent) IsEnumOrSetColumn(i int) bool
func (*TableMapEvent) IsGeometryColumn ¶
func (e *TableMapEvent) IsGeometryColumn(i int) bool
func (*TableMapEvent) IsNumericColumn ¶
func (e *TableMapEvent) IsNumericColumn(i int) bool
func (*TableMapEvent) IsSetColumn ¶
func (e *TableMapEvent) IsSetColumn(i int) bool
func (*TableMapEvent) JsonColumnCount ¶
func (e *TableMapEvent) JsonColumnCount() uint64
JsonColumnCount returns the number of JSON columns in this table
func (*TableMapEvent) Nullable ¶
func (e *TableMapEvent) Nullable(i int) (available, nullable bool)
Nullable returns the nullability of the i-th column. If null bits are not available, available is false. i must be in range [0, ColumnCount).
func (*TableMapEvent) SetStrValueMap ¶
func (e *TableMapEvent) SetStrValueMap() map[int][]string
SetStrValueMap returns a map: column index -> set string value. Note that only set columns will be returned. nil is returned if not available or no set columns at all.
func (*TableMapEvent) SetStrValueString ¶
func (e *TableMapEvent) SetStrValueString() [][]string
SetStrValueString returns values for set columns as string slices. nil is returned if not available or no set columns at all.
func (*TableMapEvent) UnsignedMap ¶
func (e *TableMapEvent) UnsignedMap() map[int]bool
UnsignedMap returns a map: column index -> unsigned. Note that only numeric columns will be returned. nil is returned if not available or no numeric columns at all.
func (*TableMapEvent) VisibilityMap ¶
func (e *TableMapEvent) VisibilityMap() map[int]bool
VisibilityMap returns a map: column index -> visibility. Invisible columns were introduced in MySQL 8.0.23. nil is returned if not available.
type TransactionPayloadEvent ¶
type TransactionPayloadEvent struct { Size uint64 UncompressedSize uint64 CompressionType uint64 Payload []byte Events []*BinlogEvent // contains filtered or unexported fields }
func (*TransactionPayloadEvent) Decode ¶
func (e *TransactionPayloadEvent) Decode(data []byte) error
func (*TransactionPayloadEvent) Dump ¶
func (e *TransactionPayloadEvent) Dump(w io.Writer)