Documentation ¶
Overview ¶
Package replication handles the MySQL replication protocol.
Todo:
+ Get table information when handling rows events.
Index ¶
- Constants
- Variables
- type BeginLoadQueryEvent
- type BinlogEvent
- type BinlogParser
- func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error)
- func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc) error
- func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error
- func (p *BinlogParser) Reset()
- func (p *BinlogParser) SetParseTime(parseTime bool)
- func (p *BinlogParser) SetRawMode(mode bool)
- type BinlogStreamer
- type BinlogSyncer
- func (b *BinlogSyncer) Close()
- func (b *BinlogSyncer) GetNextPosition() Position
- func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error
- func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error)
- func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error)
- type BinlogSyncerConfig
- type Event
- type EventError
- type EventHeader
- type EventType
- type ExecuteLoadQueryEvent
- type FormatDescriptionEvent
- type GTIDEvent
- type GenericEvent
- type MariadbAnnotaeRowsEvent
- type MariadbBinlogCheckPointEvent
- type MariadbGTIDEvent
- type MariadbGTIDListEvent
- type OnEventFunc
- type QueryEvent
- type RotateEvent
- type RowsEvent
- type RowsQueryEvent
- type TableMapEvent
- type XIDEvent
Constants ¶
const ( LOG_EVENT_BINLOG_IN_USE_F uint16 = 0x0001 LOG_EVENT_FORCED_ROTATE_F uint16 = 0x0002 LOG_EVENT_THREAD_SPECIFIC_F uint16 = 0x0004 LOG_EVENT_SUPPRESS_USE_F uint16 = 0x0008 LOG_EVENT_UPDATE_TABLE_MAP_VERSION_F uint16 = 0x0010 LOG_EVENT_ARTIFICIAL_F uint16 = 0x0020 LOG_EVENT_RELAY_LOG_F uint16 = 0x0040 LOG_EVENT_IGNORABLE_F uint16 = 0x0080 LOG_EVENT_NO_FILTER_F uint16 = 0x0100 LOG_EVENT_MTS_ISOLATE_F uint16 = 0x0200 )
const ( BINLOG_DUMP_NEVER_STOP uint16 = 0x00 BINLOG_DUMP_NON_BLOCK uint16 = 0x01 BINLOG_THROUGH_POSITION uint16 = 0x02 BINLOG_THROUGH_GTID uint16 = 0x04 )
const ( BINLOG_ROW_IMAGE_FULL = "FULL" BINLOG_ROW_IAMGE_MINIMAL = "MINIMAL" BINLOG_ROW_IMAGE_NOBLOB = "NOBLOB" )
const ( BINLOG_CHECKSUM_ALG_OFF byte = 0 // Events are without checksum though its generator // is checksum-capable New Master (NM). BINLOG_CHECKSUM_ALG_CRC32 byte = 1 // CRC32 of zlib algorithm. // BINLOG_CHECKSUM_ALG_ENUM_END, // the cut line: valid alg range is [1, 0x7f]. BINLOG_CHECKSUM_ALG_UNDEF byte = 255 // special value to tag undetermined yet checksum )
const ( JSONB_SMALL_OBJECT byte = iota // small JSON object JSONB_LARGE_OBJECT // large JSON object JSONB_SMALL_ARRAY // small JSON array JSONB_LARGE_ARRAY // large JSON array JSONB_LITERAL // literal (true/false/null) JSONB_INT16 // int16 JSONB_UINT16 // uint16 JSONB_INT32 // int32 JSONB_UINT32 // uint32 JSONB_INT64 // int64 JSONB_UINT64 // uint64 JSONB_DOUBLE // double JSONB_STRING // string JSONB_OPAQUE byte = 0x0f // custom data (any MySQL data type) )
const ( JSONB_NULL_LITERAL byte = 0x00 JSONB_TRUE_LITERAL byte = 0x01 JSONB_FALSE_LITERAL byte = 0x02 )
const DATETIMEF_INT_OFS int64 = 0x8000000000
const (
EventHeaderSize = 19
)
const (
//we only support MySQL 5.0.0+ binlog format, maybe???
MinBinlogVersion = 4
)
const RowsEventStmtEndFlag = 0x01
RowsEventStmtEndFlag is set at the end of the statement.
const TIMEF_INT_OFS int64 = 0x800000
const TIMEF_OFS int64 = 0x800000000000
Variables ¶
var ( ErrNeedSyncAgain = errors.New("Last sync error or closed, try sync and get event again") ErrSyncClosed = errors.New("Sync was closed") )
var ( //binlog header [ fe `bin` ] BinLogFileHeader []byte = []byte{0xfe, 0x62, 0x69, 0x6e} SemiSyncIndicator byte = 0xef )
Functions ¶
This section is empty.
Types ¶
type BeginLoadQueryEvent ¶
func (*BeginLoadQueryEvent) Decode ¶
func (e *BeginLoadQueryEvent) Decode(data []byte) error
func (*BeginLoadQueryEvent) Dump ¶
func (e *BeginLoadQueryEvent) Dump(w io.Writer)
type BinlogEvent ¶
type BinlogEvent struct { // raw binlog data, including crc32 checksum if exists RawData []byte Header *EventHeader Event Event }
func (*BinlogEvent) Dump ¶
func (e *BinlogEvent) Dump(w io.Writer)
type BinlogParser ¶
type BinlogParser struct {
// contains filtered or unexported fields
}
func NewBinlogParser ¶
func NewBinlogParser() *BinlogParser
func (*BinlogParser) Parse ¶
func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error)
Given the bytes for a binary log event, Parse returns the decoded event. Unless the event is itself a FORMAT_DESCRIPTION_EVENT, a FORMAT_DESCRIPTION_EVENT must previously have been passed into the parser for this to work properly on any given event. Passing a new FORMAT_DESCRIPTION_EVENT into the parser will replace the existing one.
func (*BinlogParser) ParseFile ¶
func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc) error
func (*BinlogParser) ParseReader ¶
func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error
func (*BinlogParser) Reset ¶
func (p *BinlogParser) Reset()
func (*BinlogParser) SetParseTime ¶
func (p *BinlogParser) SetParseTime(parseTime bool)
func (*BinlogParser) SetRawMode ¶
func (p *BinlogParser) SetRawMode(mode bool)
type BinlogStreamer ¶
type BinlogStreamer struct {
// contains filtered or unexported fields
}
BinlogStreamer gets the streaming event.
func (*BinlogStreamer) GetEvent ¶
func (s *BinlogStreamer) GetEvent(ctx context.Context) (*BinlogEvent, error)
GetEvent gets the binlog events one by one. It blocks until the Syncer receives an event from MySQL or meets a sync error. You can pass a context (e.g. with cancel or timeout) to break the block.
type BinlogSyncer ¶
type BinlogSyncer struct {
// contains filtered or unexported fields
}
BinlogSyncer syncs binlog event from server.
func NewBinlogSyncer ¶
func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer
NewBinlogSyncer creates the BinlogSyncer with cfg.
func (*BinlogSyncer) GetNextPosition ¶
func (b *BinlogSyncer) GetNextPosition() Position
GetNextPosition returns the next position of the syncer
func (*BinlogSyncer) StartBackup ¶
func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error
StartBackup works like mysqlbinlog's remote raw backup: it backs up the remote binlog starting from position (filename, offset) and writes it into backupDir.
func (*BinlogSyncer) StartSync ¶
func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error)
StartSync starts syncing from the `pos` position.
func (*BinlogSyncer) StartSyncGTID ¶
func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error)
StartSyncGTID starts syncing from the `gset` GTIDSet.
type BinlogSyncerConfig ¶
type BinlogSyncerConfig struct { // ServerID is the unique ID in cluster. ServerID uint32 // Flavor is "mysql" or "mariadb", if not set, use "mysql" default. Flavor string // Host is for MySQL server host. Host string // Port is for MySQL server port. Port uint16 // User is for MySQL user. User string // Password is for MySQL password. Password string // Localhost is the local hostname used when registering as a slave. // If not set, use os.Hostname() instead. Localhost string // Charset is for MySQL client character set Charset string // SemiSyncEnabled enables semi-sync or not. SemiSyncEnabled bool // RawModeEnabled is for not parsing binlog event. RawModeEnabled bool // If not nil, use the provided tls.Config to connect to the database using TLS/SSL. TLSConfig *tls.Config // Use replication.Time structure for timestamp and datetime. // We will use Local location for timestamp and UTC location for datetime. ParseTime bool // RecvBufferSize sets the size in bytes of the operating system's receive buffer associated with the connection. RecvBufferSize int // master heartbeat period HeartbeatPeriod time.Duration // read timeout ReadTimeout time.Duration }
BinlogSyncerConfig is the configuration for BinlogSyncer.
type EventError ¶
type EventError struct { Header *EventHeader //Error message Err string //Event data Data []byte }
func (*EventError) Error ¶
func (e *EventError) Error() string
type EventHeader ¶
type EventHeader struct { Timestamp uint32 EventType EventType ServerID uint32 EventSize uint32 LogPos uint32 Flags uint16 }
func (*EventHeader) Decode ¶
func (h *EventHeader) Decode(data []byte) error
func (*EventHeader) Dump ¶
func (h *EventHeader) Dump(w io.Writer)
type EventType ¶
type EventType byte
const ( UNKNOWN_EVENT EventType = iota START_EVENT_V3 QUERY_EVENT STOP_EVENT ROTATE_EVENT INTVAR_EVENT LOAD_EVENT SLAVE_EVENT CREATE_FILE_EVENT APPEND_BLOCK_EVENT EXEC_LOAD_EVENT DELETE_FILE_EVENT NEW_LOAD_EVENT RAND_EVENT USER_VAR_EVENT FORMAT_DESCRIPTION_EVENT XID_EVENT BEGIN_LOAD_QUERY_EVENT EXECUTE_LOAD_QUERY_EVENT TABLE_MAP_EVENT WRITE_ROWS_EVENTv0 UPDATE_ROWS_EVENTv0 DELETE_ROWS_EVENTv0 WRITE_ROWS_EVENTv1 UPDATE_ROWS_EVENTv1 DELETE_ROWS_EVENTv1 INCIDENT_EVENT HEARTBEAT_EVENT IGNORABLE_EVENT ROWS_QUERY_EVENT WRITE_ROWS_EVENTv2 UPDATE_ROWS_EVENTv2 DELETE_ROWS_EVENTv2 GTID_EVENT ANONYMOUS_GTID_EVENT PREVIOUS_GTIDS_EVENT )
type ExecuteLoadQueryEvent ¶
type ExecuteLoadQueryEvent struct { SlaveProxyID uint32 ExecutionTime uint32 SchemaLength uint8 ErrorCode uint16 StatusVars uint16 FileID uint32 StartPos uint32 EndPos uint32 DupHandlingFlags uint8 }
func (*ExecuteLoadQueryEvent) Decode ¶
func (e *ExecuteLoadQueryEvent) Decode(data []byte) error
func (*ExecuteLoadQueryEvent) Dump ¶
func (e *ExecuteLoadQueryEvent) Dump(w io.Writer)
type FormatDescriptionEvent ¶
type FormatDescriptionEvent struct { Version uint16 //len = 50 ServerVersion []byte CreateTimestamp uint32 EventHeaderLength uint8 EventTypeHeaderLengths []byte // 0 is off, 1 is for CRC32, 255 is undefined ChecksumAlgorithm byte }
func (*FormatDescriptionEvent) Decode ¶
func (e *FormatDescriptionEvent) Decode(data []byte) error
func (*FormatDescriptionEvent) Dump ¶
func (e *FormatDescriptionEvent) Dump(w io.Writer)
type GenericEvent ¶
type GenericEvent struct {
Data []byte
}
GenericEvent is used for the event types that we don't parse; it keeps the event data as-is.
func (*GenericEvent) Decode ¶
func (e *GenericEvent) Decode(data []byte) error
func (*GenericEvent) Dump ¶
func (e *GenericEvent) Dump(w io.Writer)
type MariadbAnnotaeRowsEvent ¶
type MariadbAnnotaeRowsEvent struct {
Query []byte
}
func (*MariadbAnnotaeRowsEvent) Decode ¶
func (e *MariadbAnnotaeRowsEvent) Decode(data []byte) error
func (*MariadbAnnotaeRowsEvent) Dump ¶
func (e *MariadbAnnotaeRowsEvent) Dump(w io.Writer)
type MariadbBinlogCheckPointEvent ¶
type MariadbBinlogCheckPointEvent struct {
Info []byte
}
func (*MariadbBinlogCheckPointEvent) Decode ¶
func (e *MariadbBinlogCheckPointEvent) Decode(data []byte) error
func (*MariadbBinlogCheckPointEvent) Dump ¶
func (e *MariadbBinlogCheckPointEvent) Dump(w io.Writer)
type MariadbGTIDEvent ¶
type MariadbGTIDEvent struct {
GTID MariadbGTID
}
func (*MariadbGTIDEvent) Decode ¶
func (e *MariadbGTIDEvent) Decode(data []byte) error
func (*MariadbGTIDEvent) Dump ¶
func (e *MariadbGTIDEvent) Dump(w io.Writer)
type MariadbGTIDListEvent ¶
type MariadbGTIDListEvent struct {
GTIDs []MariadbGTID
}
func (*MariadbGTIDListEvent) Decode ¶
func (e *MariadbGTIDListEvent) Decode(data []byte) error
func (*MariadbGTIDListEvent) Dump ¶
func (e *MariadbGTIDListEvent) Dump(w io.Writer)
type OnEventFunc ¶
type OnEventFunc func(*BinlogEvent) error
type QueryEvent ¶
type QueryEvent struct { SlaveProxyID uint32 ExecutionTime uint32 ErrorCode uint16 StatusVars []byte Schema []byte Query []byte // in fact QueryEvent doesn't have the GTIDSet information; it is kept here just for convenience of use GSet GTIDSet }
func (*QueryEvent) Decode ¶
func (e *QueryEvent) Decode(data []byte) error
func (*QueryEvent) Dump ¶
func (e *QueryEvent) Dump(w io.Writer)
type RotateEvent ¶
func (*RotateEvent) Decode ¶
func (e *RotateEvent) Decode(data []byte) error
func (*RotateEvent) Dump ¶
func (e *RotateEvent) Dump(w io.Writer)
type RowsEvent ¶
type RowsEvent struct { //0, 1, 2 Version int Table *TableMapEvent TableID uint64 Flags uint16 //if version == 2 ExtraData []byte //lenenc_int ColumnCount uint64 //len = (ColumnCount + 7) / 8 ColumnBitmap1 []byte //if UPDATE_ROWS_EVENTv1 or v2 //len = (ColumnCount + 7) / 8 ColumnBitmap2 []byte //rows: invalid: int64, float64, bool, []byte, string Rows [][]interface{} // contains filtered or unexported fields }
type RowsQueryEvent ¶
type RowsQueryEvent struct {
Query []byte
}
func (*RowsQueryEvent) Decode ¶
func (e *RowsQueryEvent) Decode(data []byte) error
func (*RowsQueryEvent) Dump ¶
func (e *RowsQueryEvent) Dump(w io.Writer)
type TableMapEvent ¶
type TableMapEvent struct { TableID uint64 Flags uint16 Schema []byte Table []byte ColumnCount uint64 ColumnType []byte ColumnMeta []uint16 //len = (ColumnCount + 7) / 8 NullBitmap []byte // contains filtered or unexported fields }
func (*TableMapEvent) Decode ¶
func (e *TableMapEvent) Decode(data []byte) error
func (*TableMapEvent) Dump ¶
func (e *TableMapEvent) Dump(w io.Writer)