binlogstream

package
v0.0.0-...-686f8ea Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2025 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Overview

Package binlogstream is used by syncer to read binlog. All information related to upstream binlog stream should be kept in this package, such as reset binlog to a location, maintain properties of the binlog event and stream, inject or delete binlog events with binlog stream, etc.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BinlogType

type BinlogType uint8

BinlogType represents binlog type from syncer's view.

const (
	// RemoteBinlog means syncer is connected to MySQL and reading binlog.
	RemoteBinlog BinlogType = iota + 1
	// LocalBinlog means syncer is reading binlog from relay log.
	LocalBinlog
)

func RelayToBinlogType

func RelayToBinlogType(relay relay.Process) BinlogType

RelayToBinlogType converts relay.Process to BinlogType.

func (BinlogType) String

func (t BinlogType) String() string

type StreamerController

type StreamerController struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

StreamerController controls the streamer for read binlog, include: 1. reset streamer to a binlog position or GTID 2. read next binlog event 3. transfer from local streamer to remote streamer.

func NewStreamerController

func NewStreamerController(
	syncCfg replication.BinlogSyncerConfig,
	enableGTID bool,
	fromDB *dbconn.UpStreamConn,
	localBinlogDir string,
	timezone *time.Location,
	relay relay.Process,
	logger log.Logger,
) *StreamerController

NewStreamerController creates a new streamer controller.

func NewStreamerController4Test

func NewStreamerController4Test(
	streamerProducer streamGenerator,
	streamer reader.Streamer,
) *StreamerController

NewStreamerController4Test is used in tests.

func (*StreamerController) CanRetry

func (c *StreamerController) CanRetry(err error) bool

CanRetry returns true if can switch from local to remote and retry again.

func (*StreamerController) Close

func (c *StreamerController) Close()

Close closes streamer.

func (StreamerController) Delete

func (m StreamerController) Delete(posStr string) error

Delete will delete an operator. `posStr` should be in the format of "binlog-file:pos".

func (*StreamerController) GetBinlogType

func (c *StreamerController) GetBinlogType() BinlogType

GetBinlogType returns the binlog type used now.

func (*StreamerController) GetCurEndLocation

func (c *StreamerController) GetCurEndLocation() binlog.Location

func (*StreamerController) GetCurStartLocation

func (c *StreamerController) GetCurStartLocation() binlog.Location

func (*StreamerController) GetEvent

GetEvent returns binlog event from upstream binlog or streamModifier. It's not concurrent safe.

After GetEvent returns an event, GetCurStartLocation, GetCurEndLocation, GetTxnEndLocation will return the corresponding locations of the event. The definition of 3 locations can be found in the comment of locations struct in binlog_locations.go .

When return events from streamModifier, 3 locations are maintained as below:

  • Inject if we inject events [DDL1, DDL2] at (start) position 900, where start position 900 has Insert1 event whose LogPos (end position) is 1000, we should return to caller like

    1. DDL1, start (900, suffix 0) end (900, suffix 1) 2. DDL2, start (900, suffix 1) end (900, suffix 2) 3. Insert1, start (900, suffix 2) end (1000, suffix 0)

    The DDLs are placed before DML because user may want to use Inject to change table structure for DML.

  • Replace if we replace events [DDL1, DDL2] at (start) position 900, where start position 900 has DDL0 event whose LogPos (end position) is 1000, we should return to caller like

    1. DDL1, start (900, suffix 0) end (900, suffix 1) 2. DDL2, start (900, suffix 1) end (1000, suffix 0)

  • Skip the skipped event will still be sent to caller, with op = pb.ErrorOp_Skip, to let caller track schema and save checkpoints.

func (*StreamerController) GetStreamer

func (c *StreamerController) GetStreamer() reader.Streamer

func (*StreamerController) GetTxnEndLocation

func (c *StreamerController) GetTxnEndLocation() binlog.Location

func (*StreamerController) IsClosed

func (c *StreamerController) IsClosed() bool

IsClosed returns whether streamer controller is closed.

func (StreamerController) ListEqualAndAfter

func (m StreamerController) ListEqualAndAfter(posStr string) []*pb.HandleWorkerErrorRequest

ListEqualAndAfter returns a JSON string of operators equals and after the given position.

  • if argument is "", it returns all operators.
  • Otherwise caller should make sure the argument in format of "binlog-file:pos" and it returns all operators >= this position.

func (StreamerController) RemoveOutdated

func (m StreamerController) RemoveOutdated(pos mysql.Position)

RemoveOutdated removes outdated operators which will not be triggered again after upstream binlog streamer reset. A common usage is to use global checkpoint as the argument. RemoveOutdated will not remove the operator equals or after the `front`.

func (*StreamerController) ResetReplicationSyncer

func (c *StreamerController) ResetReplicationSyncer(tctx *tcontext.Context, location binlog.Location) (err error)

ResetReplicationSyncer reset the replication.

func (StreamerController) Set

func (m StreamerController) Set(req *pb.HandleWorkerErrorRequest, events []*replication.BinlogEvent) error

Set handles HandleWorkerErrorRequest with ErrorOp_Skip, ErrorOp_Replace, ErrorOp_Inject. - ErrorOp_Skip: events will be ignored. - ErrorOp_Replace, ErrorOp_Inject: events should be query events generated by caller.

func (*StreamerController) Start

func (c *StreamerController) Start(tctx *tcontext.Context, location binlog.Location) error

Start starts streamer controller.

func (*StreamerController) UpdateServerIDAndResetReplication

func (c *StreamerController) UpdateServerIDAndResetReplication(tctx *tcontext.Context, location binlog.Location) error

UpdateServerIDAndResetReplication updates the server id and reset replication.

func (*StreamerController) UpdateSyncCfg

func (c *StreamerController) UpdateSyncCfg(syncCfg replication.BinlogSyncerConfig, fromDB *dbconn.UpStreamConn)

UpdateSyncCfg updates sync config and fromDB.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL