binlogplayer

package
v0.19.8
Published: Dec 3, 2024 License: Apache-2.0 Imports: 31 Imported by: 10

Documentation

Overview

Package binlogplayer contains the code that plays a vreplication stream on a client database. It usually runs inside the destination primary vttablet process.

Constants

This section is empty.

Variables

var (
	// SlowQueryThreshold is the duration above which a query is logged as slow.
	SlowQueryThreshold = time.Duration(100 * time.Millisecond)

	// BlplQuery is the key for the stats map.
	BlplQuery = "Query"
	// BlplMultiQuery is the key for the stats map.
	BlplMultiQuery = "MultiQuery"
	// BlplTransaction is the key for the stats map.
	BlplTransaction = "Transaction"
	// BlplBatchTransaction is the key for the stats map.
	BlplBatchTransaction = "BatchTransaction"
)

Functions

func CreateVReplication

func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, position string, maxTPS, maxReplicationLag, timeUpdated int64, dbName string,
	workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType, deferSecondaryKeys bool) string

CreateVReplication returns a statement to populate the first value into the _vt.vreplication table.

func CreateVReplicationState

func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource, position string, state binlogdatapb.VReplicationWorkflowState, dbName string,
	workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType) string

CreateVReplicationState returns a statement to create a stopped vreplication.

func DecodePosition added in v0.11.0

func DecodePosition(gtid string) (replication.Position, error)

DecodePosition first attempts to uncompress the passed value; if that fails, it tries to decode the value as a valid GTID.

func DeleteVReplication

func DeleteVReplication(uid int32) string

DeleteVReplication returns a statement to delete the replication.
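
The statement helpers in this package only build a SQL string; running it is left to the caller. The following is a minimal sketch of that shape, not the package's actual code. The lowercase function name is a hypothetical stand-in, and the `id` column name is an assumption about the _vt.vreplication schema.

```go
package main

import "fmt"

// deleteVReplication is a hypothetical stand-in for DeleteVReplication.
// It only formats a statement; nothing is sent to a database here.
// The "id" column name is an assumption about the _vt.vreplication schema.
func deleteVReplication(uid int32) string {
	return fmt.Sprintf("delete from _vt.vreplication where id=%d", uid)
}

func main() {
	// The caller decides when and how to execute the returned statement.
	fmt.Println(deleteVReplication(42))
}
```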

func GenerateUpdateHeartbeat added in v0.13.0

func GenerateUpdateHeartbeat(uid int32, timeUpdated int64) (string, error)

GenerateUpdateHeartbeat returns a statement to record the latest heartbeat in the _vt.vreplication table.

func GenerateUpdatePos

func GenerateUpdatePos(uid int32, pos replication.Position, timeUpdated int64, txTimestamp int64, rowsCopied int64, compress bool) string

GenerateUpdatePos returns a statement to record the latest processed gtid in the _vt.vreplication table.

func GenerateUpdateRowsCopied added in v0.11.0

func GenerateUpdateRowsCopied(uid int32, rowsCopied int64) string

GenerateUpdateRowsCopied returns a statement to update the rows_copied value in the _vt.vreplication table.

func GenerateUpdateTimeThrottled added in v0.14.2

func GenerateUpdateTimeThrottled(uid int32, timeThrottledUnix int64, componentThrottled string) (string, error)

GenerateUpdateTimeThrottled returns a statement to record the latest throttle time in the _vt.vreplication table.

func LimitString added in v0.11.0

func LimitString(s string, limit int) string

LimitString truncates string to specified size
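
As an illustration of the truncation described above, here is a small sketch. It assumes the limit is a non-negative byte count; `limitString` is a hypothetical stand-in, not the package's implementation.

```go
package main

import "fmt"

// limitString keeps at most limit bytes of s. It is a hypothetical stand-in
// for LimitString and assumes limit is not negative. Because it counts bytes,
// it can cut a multi-byte UTF-8 character in half.
func limitString(s string, limit int) string {
	if len(s) > limit {
		return s[:limit]
	}
	return s
}

func main() {
	fmt.Println(limitString("select * from a_very_long_table_name", 8))
	fmt.Println(limitString("short", 8))
}
```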

func LogError added in v0.11.0

func LogError(msg string, err error)

LogError logs a message after truncating it to avoid spamming logs

func MessageTruncate

func MessageTruncate(msg string) string

MessageTruncate truncates the message string to a safe length.

func MysqlUncompress added in v0.11.0

func MysqlUncompress(input string) []byte

MysqlUncompress uncompresses a binary string in the format stored by MySQL's COMPRESS() function. The first four bytes hold the size of the original string passed to COMPRESS(); the remainder is the zlib-compressed string, which is uncompressed here using Go's zlib library.
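
The layout described above (a four-byte length header followed by zlib data) can be sketched with the standard library alone. This is a sketch under stated assumptions, not the package's code: `mysqlCompress` and `mysqlUncompress` are hypothetical helpers, the header is taken to be little-endian as MySQL documents for COMPRESS(), and returning nil on any failure is a choice made for this example.

```go
package main

import (
	"bytes"
	"compress/zlib"
	"encoding/binary"
	"fmt"
	"io"
)

// mysqlCompress mimics the COMPRESS() layout: a 4-byte little-endian length
// of the original data, followed by the zlib-compressed payload.
func mysqlCompress(data []byte) []byte {
	var buf bytes.Buffer
	header := make([]byte, 4)
	binary.LittleEndian.PutUint32(header, uint32(len(data)))
	buf.Write(header)
	zw := zlib.NewWriter(&buf)
	zw.Write(data)
	zw.Close()
	return buf.Bytes()
}

// mysqlUncompress is a hypothetical stand-in for MysqlUncompress. It returns
// nil when the input is too short, is not valid zlib data, or does not match
// the length recorded in the header (an assumption made for this sketch).
func mysqlUncompress(input []byte) []byte {
	if len(input) < 4 {
		return nil
	}
	want := binary.LittleEndian.Uint32(input[:4])
	zr, err := zlib.NewReader(bytes.NewReader(input[4:]))
	if err != nil {
		return nil
	}
	defer zr.Close()
	out, err := io.ReadAll(zr)
	if err != nil || uint32(len(out)) != want {
		return nil
	}
	return out
}

func main() {
	packed := mysqlCompress([]byte("MySQL56/uuid:1-100"))
	fmt.Printf("%s\n", mysqlUncompress(packed))
	// Plain text is not in the COMPRESS() format, so the sketch yields nil.
	fmt.Println(mysqlUncompress([]byte("not compressed")) == nil)
}
```

A caller such as DecodePosition can treat a nil result as "the value was not compressed" and fall back to decoding the raw string.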

func ReadVReplicationPos

func ReadVReplicationPos(index int32) string

ReadVReplicationPos returns a statement to query the gtid for a given stream from the _vt.vreplication table.

func ReadVReplicationStatus

func ReadVReplicationStatus(index int32) string

ReadVReplicationStatus returns a statement to query the status fields for a given stream from the _vt.vreplication table.

func RegisterClientFactory

func RegisterClientFactory(name string, factory ClientFactory)

RegisterClientFactory adds a new factory. Call during init().
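
The registration pattern can be sketched as follows. Everything here is a simplified stand-in: the `Client` interface is trimmed to one method, the registry map and the panic on duplicate names are assumptions for illustration, and `noopClient` is hypothetical.

```go
package main

import "fmt"

// Client and ClientFactory are trimmed-down stand-ins for the package's types.
type Client interface{ Close() }

type ClientFactory func() Client

// clientFactories maps a protocol name to its factory (assumed layout).
var clientFactories = make(map[string]ClientFactory)

// registerClientFactory records a factory under a protocol name.
// Panicking on a duplicate name is an assumption made for this sketch.
func registerClientFactory(name string, factory ClientFactory) {
	if _, ok := clientFactories[name]; ok {
		panic(fmt.Sprintf("client factory %q already registered", name))
	}
	clientFactories[name] = factory
}

// noopClient is a hypothetical implementation used only for the example.
type noopClient struct{}

func (noopClient) Close() {}

// Registering in init() means the factory is available before main runs.
func init() {
	registerClientFactory("noop", func() Client { return noopClient{} })
}

func main() {
	factory, ok := clientFactories["noop"]
	fmt.Println("registered:", ok)
	client := factory()
	client.Close()
}
```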

func SetProtocol added in v0.15.0

func SetProtocol(name string, protocol string) (reset func())

SetProtocol is a helper function to set the binlogplayer --binlog_player_protocol flag value for tests. If successful, it returns a function that, when called, returns the flag to its previous value.

Note that because this variable is bound to a flag, the effects of this function are global, not scoped to the calling test-case. Therefore, it should not be used in conjunction with t.Parallel.
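
The set-and-restore behavior described above can be sketched as follows. The `protocol` variable and its initial value are placeholders for the flag-bound global, and `setProtocol` is a hypothetical stand-in with a simplified signature.

```go
package main

import "fmt"

// protocol stands in for the flag-bound package-level variable.
// The initial value is a placeholder for this sketch.
var protocol = "grpc"

// setProtocol overwrites the global and returns a function that restores
// the value that was in place before the call.
func setProtocol(value string) (reset func()) {
	previous := protocol
	protocol = value
	return func() { protocol = previous }
}

func main() {
	reset := setProtocol("test")
	fmt.Println(protocol) // value while the override is active
	reset()
	fmt.Println(protocol) // value after restoring
}
```

In a test, the returned function is typically deferred. Because the variable is shared by the whole process, two tests doing this in parallel would overwrite each other's value, which is why t.Parallel is unsafe here.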

func StartVReplicationUntil

func StartVReplicationUntil(uid int32, pos string) string

StartVReplicationUntil returns a statement to start the replication with a stop position.

func StopVReplication

func StopVReplication(uid int32, message string) string

StopVReplication returns a statement to stop the replication.

Types

type BinlogPlayer

type BinlogPlayer struct {
	// contains filtered or unexported fields
}

BinlogPlayer is for reading a stream of updates from BinlogServer.

func NewBinlogPlayerKeyRange

func NewBinlogPlayerKeyRange(dbClient DBClient, tablet *topodatapb.Tablet, keyRange *topodatapb.KeyRange, uid int32, blplStats *Stats) *BinlogPlayer

NewBinlogPlayerKeyRange returns a new BinlogPlayer pointing at the server replicating the provided keyrange and updating the _vt.vreplication row identified by uid. If a stop position is set for the stream, it stops when reaching that position.

func NewBinlogPlayerTables

func NewBinlogPlayerTables(dbClient DBClient, tablet *topodatapb.Tablet, tables []string, uid int32, blplStats *Stats) *BinlogPlayer

NewBinlogPlayerTables returns a new BinlogPlayer pointing at the server replicating the provided tables and updating the _vt.vreplication row identified by uid. If a stop position is set for the stream, it stops when reaching that position.

func (*BinlogPlayer) ApplyBinlogEvents

func (blp *BinlogPlayer) ApplyBinlogEvents(ctx context.Context) error

ApplyBinlogEvents makes an RPC request to BinlogServer and processes the events. It returns nil if the provided context was canceled, or if we reached the stopping point. If an error is encountered, it updates the vreplication state to "Error". If a stop position was specified, and reached, the state is updated to "Stopped".

type BinlogTransactionStream

type BinlogTransactionStream interface {
	// Recv returns the next BinlogTransaction, or an error if the RPC was
	// interrupted.
	Recv() (*binlogdatapb.BinlogTransaction, error)
}

BinlogTransactionStream is the interface of the object returned by StreamTables and StreamKeyRange
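
A consumer of such a stream usually calls Recv in a loop until the stream ends. The sketch below uses stand-in types so it runs without the Vitess modules: `transaction` replaces *binlogdatapb.BinlogTransaction, `sliceStream` is a hypothetical in-memory stream, and the use of io.EOF to signal the end of the stream is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// transaction is a stand-in for *binlogdatapb.BinlogTransaction.
type transaction struct{ statements []string }

// transactionStream mirrors the shape of BinlogTransactionStream.
type transactionStream interface {
	Recv() (*transaction, error)
}

// sliceStream replays a fixed list of transactions, then reports io.EOF.
type sliceStream struct{ pending []*transaction }

func (s *sliceStream) Recv() (*transaction, error) {
	if len(s.pending) == 0 {
		return nil, io.EOF
	}
	tx := s.pending[0]
	s.pending = s.pending[1:]
	return tx, nil
}

// drain calls Recv until the stream ends and returns the number of
// statements seen. Any error other than io.EOF is passed to the caller.
func drain(stream transactionStream) (int, error) {
	count := 0
	for {
		tx, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return count, nil
		}
		if err != nil {
			return count, err
		}
		count += len(tx.statements)
	}
}

func main() {
	stream := &sliceStream{pending: []*transaction{
		{statements: []string{"insert", "update"}},
		{statements: []string{"delete"}},
	}}
	count, err := drain(stream)
	fmt.Println(count, err)
}
```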

type Client

type Client interface {
	// Dial a server
	Dial(tablet *topodatapb.Tablet) error

	// Close the connection
	Close()

	// Ask the server to stream updates related to the provided tables.
	// Should return context.Canceled if the context is canceled.
	StreamTables(ctx context.Context, position string, tables []string, charset *binlogdatapb.Charset) (BinlogTransactionStream, error)

	// Ask the server to stream updates related to the provided keyrange.
	// Should return context.Canceled if the context is canceled.
	StreamKeyRange(ctx context.Context, position string, keyRange *topodatapb.KeyRange, charset *binlogdatapb.Charset) (BinlogTransactionStream, error)
}

Client is the interface all clients must satisfy

type ClientFactory

type ClientFactory func() Client

ClientFactory is the factory method to create a Client

type DBClient

type DBClient interface {
	DBName() string
	Connect() error
	Begin() error
	Commit() error
	Rollback() error
	Close()
	ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error)
	ExecuteFetchMulti(query string, maxrows int) (qrs []*sqltypes.Result, err error)
}

DBClient is a high level interface to the database.

func NewDBClient

func NewDBClient(params dbconfigs.Connector, parser *sqlparser.Parser) DBClient

NewDBClient creates a DBClient instance

func NewFakeDBClient

func NewFakeDBClient() DBClient

NewFakeDBClient returns a fake DBClient. Its functions return preset responses to requests.

type MockDBClient

type MockDBClient struct {
	UName string

	Tag string
	// contains filtered or unexported fields
}

MockDBClient mocks a DBClient. It must be configured to expect requests in a specific order.
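
The idea of expecting requests in a specific order can be sketched with a tiny stand-alone mock. This is not MockDBClient itself: `orderedMock` and its methods are hypothetical, results are plain strings instead of *sqltypes.Result, and mismatches are returned as errors rather than reported through a *testing.T.

```go
package main

import (
	"errors"
	"fmt"
)

// expectation pairs a query with the canned response to return for it.
type expectation struct {
	query  string
	result string
	err    error
}

// orderedMock hands out canned responses strictly in the order they were added.
type orderedMock struct {
	expected []expectation
	next     int
}

// expectRequest appends one expected query and its canned response.
func (m *orderedMock) expectRequest(query, result string, err error) {
	m.expected = append(m.expected, expectation{query, result, err})
}

// executeFetch succeeds only if query is the next expected one.
func (m *orderedMock) executeFetch(query string) (string, error) {
	if m.next >= len(m.expected) {
		return "", fmt.Errorf("unexpected query %q: nothing left to expect", query)
	}
	exp := m.expected[m.next]
	if exp.query != query {
		return "", fmt.Errorf("query %d: got %q, want %q", m.next, query, exp.query)
	}
	m.next++
	return exp.result, exp.err
}

// done reports whether every expected request has been consumed.
func (m *orderedMock) done() bool { return m.next == len(m.expected) }

func main() {
	mock := &orderedMock{}
	mock.expectRequest("begin", "", nil)
	mock.expectRequest("insert into t values (1)", "1 row", nil)
	mock.expectRequest("commit", "", errors.New("connection lost"))

	for _, q := range []string{"begin", "insert into t values (1)", "commit"} {
		res, err := mock.executeFetch(q)
		fmt.Printf("%s: result=%q err=%v\n", q, res, err)
	}
	fmt.Println("all consumed:", mock.done())
}
```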

func NewMockDBClient

func NewMockDBClient(t *testing.T) *MockDBClient

NewMockDBClient returns a new MockDBClient with the default "Filtered" UName.

func NewMockDbaClient added in v0.11.0

func NewMockDbaClient(t *testing.T) *MockDBClient

NewMockDbaClient returns a new MockDBClient with the default "Dba" UName.

func (*MockDBClient) Begin

func (dc *MockDBClient) Begin() error

Begin is part of the DBClient interface

func (*MockDBClient) Close

func (dc *MockDBClient) Close()

Close is part of the DBClient interface

func (*MockDBClient) Commit

func (dc *MockDBClient) Commit() error

Commit is part of the DBClient interface

func (*MockDBClient) Connect

func (dc *MockDBClient) Connect() error

Connect is part of the DBClient interface

func (*MockDBClient) DBName

func (dc *MockDBClient) DBName() string

DBName is part of the DBClient interface

func (*MockDBClient) ExecuteFetch

func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error)

ExecuteFetch is part of the DBClient interface

func (*MockDBClient) ExecuteFetchMulti added in v0.19.0

func (dc *MockDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error)

ExecuteFetchMulti is part of the DBClient interface

func (*MockDBClient) ExpectRequest

func (dc *MockDBClient) ExpectRequest(query string, result *sqltypes.Result, err error)

ExpectRequest adds an expected result to the mock. This function should not be called concurrently with other commands.

func (*MockDBClient) ExpectRequestRE

func (dc *MockDBClient) ExpectRequestRE(queryRE string, result *sqltypes.Result, err error)

ExpectRequestRE adds an expected result to the mock. queryRE is a regular expression. This function should not be called concurrently with other commands.

func (*MockDBClient) Rollback

func (dc *MockDBClient) Rollback() error

Rollback is part of the DBClient interface

func (*MockDBClient) Wait

func (dc *MockDBClient) Wait()

Wait waits for all expected requests to be executed. If they have not all been executed within 1 second, it calls dc.t.Fatalf. Wait should not be called concurrently with ExpectRequest.

type Stats

type Stats struct {
	// Stats about the player, keys used are BlplQuery and BlplTransaction
	Timings *stats.Timings
	Rates   *stats.Rates

	ReplicationLagSeconds atomic.Int64
	History               *history.History

	State atomic.Value

	PhaseTimings       *stats.Timings
	QueryTimings       *stats.Timings
	QueryCount         *stats.CountersWithSingleLabel
	BulkQueryCount     *stats.CountersWithSingleLabel
	TrxQueryBatchCount *stats.CountersWithSingleLabel
	CopyRowCount       *stats.Counter
	CopyLoopCount      *stats.Counter
	ErrorCounts        *stats.CountersWithMultiLabels
	NoopQueryCount     *stats.CountersWithSingleLabel

	VReplicationLags     *stats.Timings
	VReplicationLagRates *stats.Rates

	TableCopyRowCounts *stats.CountersWithSingleLabel
	TableCopyTimings   *stats.Timings

	PartialQueryCount     *stats.CountersWithMultiLabels
	PartialQueryCacheSize *stats.CountersWithMultiLabels
	// contains filtered or unexported fields
}

Stats holds the internal stats of a player. It is a separate structure that is passed in so that stats can be collected over the life of multiple individual players.

func NewStats

func NewStats() *Stats

NewStats creates a new Stats structure.

func (*Stats) Heartbeat added in v0.9.0

func (bps *Stats) Heartbeat() int64

Heartbeat gets the time the last heartbeat from vstreamer was seen

func (*Stats) LastPosition

func (bps *Stats) LastPosition() replication.Position

LastPosition gets the last replication position.

func (*Stats) MessageHistory

func (bps *Stats) MessageHistory() []string

MessageHistory returns all stored messages; 3 are stored at a time.

func (*Stats) RecordHeartbeat added in v0.9.0

func (bps *Stats) RecordHeartbeat(tm int64)

RecordHeartbeat updates the time the last heartbeat from vstreamer was seen

func (*Stats) SetLastPosition

func (bps *Stats) SetLastPosition(pos replication.Position)

SetLastPosition sets the last replication position.

func (*Stats) Stop added in v0.18.0

func (bps *Stats) Stop()

type StatsHistoryRecord

type StatsHistoryRecord struct {
	Time    time.Time
	Message string
}

StatsHistoryRecord is used to store a Message with timestamp

func (*StatsHistoryRecord) IsDuplicate

func (r *StatsHistoryRecord) IsDuplicate(other any) bool

IsDuplicate implements history.Deduplicable

type VRSettings

type VRSettings struct {
	StartPos           replication.Position
	StopPos            replication.Position
	MaxTPS             int64
	MaxReplicationLag  int64
	State              binlogdatapb.VReplicationWorkflowState
	WorkflowType       binlogdatapb.VReplicationWorkflowType
	WorkflowSubType    binlogdatapb.VReplicationWorkflowSubType
	WorkflowName       string
	DeferSecondaryKeys bool
}

VRSettings contains the settings of a vreplication table.

func ReadVRSettings

func ReadVRSettings(dbClient DBClient, uid int32) (VRSettings, error)

ReadVRSettings retrieves the throttler settings for vreplication from the checkpoint table.
