Documentation ¶
Overview ¶
Package binlogstream is used by syncer to read binlog. All information related to upstream binlog stream should be kept in this package, such as reset binlog to a location, maintain properties of the binlog event and stream, inject or delete binlog events with binlog stream, etc.
Index ¶
- type BinlogType
- type StreamerController
- func (c *StreamerController) CanRetry(err error) bool
- func (c *StreamerController) Close()
- func (m StreamerController) Delete(posStr string) error
- func (c *StreamerController) GetBinlogType() BinlogType
- func (c *StreamerController) GetCurEndLocation() binlog.Location
- func (c *StreamerController) GetCurStartLocation() binlog.Location
- func (c *StreamerController) GetEvent(tctx *tcontext.Context) (*replication.BinlogEvent, pb.ErrorOp, error)
- func (c *StreamerController) GetStreamer() reader.Streamer
- func (c *StreamerController) GetTxnEndLocation() binlog.Location
- func (c *StreamerController) IsClosed() bool
- func (m StreamerController) ListEqualAndAfter(posStr string) []*pb.HandleWorkerErrorRequest
- func (m StreamerController) RemoveOutdated(pos mysql.Position)
- func (c *StreamerController) ResetReplicationSyncer(tctx *tcontext.Context, location binlog.Location) (err error)
- func (m StreamerController) Set(req *pb.HandleWorkerErrorRequest, events []*replication.BinlogEvent) error
- func (c *StreamerController) Start(tctx *tcontext.Context, location binlog.Location) error
- func (c *StreamerController) UpdateServerIDAndResetReplication(tctx *tcontext.Context, location binlog.Location) error
- func (c *StreamerController) UpdateSyncCfg(syncCfg replication.BinlogSyncerConfig, fromDB *dbconn.UpStreamConn)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BinlogType ¶
type BinlogType uint8
BinlogType represents binlog type from syncer's view.
const ( // RemoteBinlog means syncer is connected to MySQL and reading binlog. RemoteBinlog BinlogType = iota + 1 // LocalBinlog means syncer is reading binlog from relay log. LocalBinlog )
func RelayToBinlogType ¶
func RelayToBinlogType(relay relay.Process) BinlogType
RelayToBinlogType converts relay.Process to BinlogType.
func (BinlogType) String ¶
func (t BinlogType) String() string
type StreamerController ¶
StreamerController controls the streamer for read binlog, include: 1. reset streamer to a binlog position or GTID 2. read next binlog event 3. transfer from local streamer to remote streamer.
func NewStreamerController ¶
func NewStreamerController( syncCfg replication.BinlogSyncerConfig, enableGTID bool, fromDB *dbconn.UpStreamConn, localBinlogDir string, timezone *time.Location, relay relay.Process, logger log.Logger, ) *StreamerController
NewStreamerController creates a new streamer controller.
func NewStreamerController4Test ¶
func NewStreamerController4Test( streamerProducer streamGenerator, streamer reader.Streamer, ) *StreamerController
NewStreamerController4Test is used in tests.
func (*StreamerController) CanRetry ¶
func (c *StreamerController) CanRetry(err error) bool
CanRetry returns true if can switch from local to remote and retry again.
func (StreamerController) Delete ¶
Delete will delete an operator. `posStr` should be in the format of "binlog-file:pos".
func (*StreamerController) GetBinlogType ¶
func (c *StreamerController) GetBinlogType() BinlogType
GetBinlogType returns the binlog type used now.
func (*StreamerController) GetCurEndLocation ¶
func (c *StreamerController) GetCurEndLocation() binlog.Location
func (*StreamerController) GetCurStartLocation ¶
func (c *StreamerController) GetCurStartLocation() binlog.Location
func (*StreamerController) GetEvent ¶
func (c *StreamerController) GetEvent(tctx *tcontext.Context) (*replication.BinlogEvent, pb.ErrorOp, error)
GetEvent returns binlog event from upstream binlog or streamModifier. It's not concurrent safe.
After GetEvent returns an event, GetCurStartLocation, GetCurEndLocation, GetTxnEndLocation will return the corresponding locations of the event. The definition of 3 locations can be found in the comment of locations struct in binlog_locations.go .
When return events from streamModifier, 3 locations are maintained as below:
Inject if we inject events [DDL1, DDL2] at (start) position 900, where start position 900 has Insert1 event whose LogPos (end position) is 1000, we should return to caller like
1. DDL1, start (900, suffix 0) end (900, suffix 1) 2. DDL2, start (900, suffix 1) end (900, suffix 2) 3. Insert1, start (900, suffix 2) end (1000, suffix 0)
The DDLs are placed before DML because user may want to use Inject to change table structure for DML.
Replace if we replace events [DDL1, DDL2] at (start) position 900, where start position 900 has DDL0 event whose LogPos (end position) is 1000, we should return to caller like
1. DDL1, start (900, suffix 0) end (900, suffix 1) 2. DDL2, start (900, suffix 1) end (1000, suffix 0)
Skip the skipped event will still be sent to caller, with op = pb.ErrorOp_Skip, to let caller track schema and save checkpoints.
func (*StreamerController) GetStreamer ¶
func (c *StreamerController) GetStreamer() reader.Streamer
func (*StreamerController) GetTxnEndLocation ¶
func (c *StreamerController) GetTxnEndLocation() binlog.Location
func (*StreamerController) IsClosed ¶
func (c *StreamerController) IsClosed() bool
IsClosed returns whether streamer controller is closed.
func (StreamerController) ListEqualAndAfter ¶
func (m StreamerController) ListEqualAndAfter(posStr string) []*pb.HandleWorkerErrorRequest
ListEqualAndAfter returns a JSON string of operators equals and after the given position.
- if argument is "", it returns all operators.
- Otherwise caller should make sure the argument in format of "binlog-file:pos" and it returns all operators >= this position.
func (StreamerController) RemoveOutdated ¶
RemoveOutdated removes outdated operators which will not be triggered again after upstream binlog streamer reset. A common usage is to use global checkpoint as the argument. RemoveOutdated will not remove the operator equals or after the `front`.
func (*StreamerController) ResetReplicationSyncer ¶
func (c *StreamerController) ResetReplicationSyncer(tctx *tcontext.Context, location binlog.Location) (err error)
ResetReplicationSyncer reset the replication.
func (StreamerController) Set ¶
func (m StreamerController) Set(req *pb.HandleWorkerErrorRequest, events []*replication.BinlogEvent) error
Set handles HandleWorkerErrorRequest with ErrorOp_Skip, ErrorOp_Replace, ErrorOp_Inject. - ErrorOp_Skip: events will be ignored. - ErrorOp_Replace, ErrorOp_Inject: events should be query events generated by caller.
func (*StreamerController) UpdateServerIDAndResetReplication ¶
func (c *StreamerController) UpdateServerIDAndResetReplication(tctx *tcontext.Context, location binlog.Location) error
UpdateServerIDAndResetReplication updates the server id and reset replication.
func (*StreamerController) UpdateSyncCfg ¶
func (c *StreamerController) UpdateSyncCfg(syncCfg replication.BinlogSyncerConfig, fromDB *dbconn.UpStreamConn)
UpdateSyncCfg updates sync config and fromDB.