Documentation ¶
Overview ¶
Package binlogplayer contains the code that plays a vreplication stream on a client database. It usually runs inside the destination primary vttablet process.
Index ¶
- Variables
- func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, position string, ...) string
- func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource, position string, ...) string
- func DecodePosition(gtid string) (replication.Position, error)
- func DeleteVReplication(uid int32) string
- func GenerateUpdateHeartbeat(uid int32, timeUpdated int64) (string, error)
- func GenerateUpdatePos(uid int32, pos replication.Position, timeUpdated int64, txTimestamp int64, ...) string
- func GenerateUpdateRowsCopied(uid int32, rowsCopied int64) string
- func GenerateUpdateTimeThrottled(uid int32, timeThrottledUnix int64, componentThrottled string) (string, error)
- func LimitString(s string, limit int) string
- func LogError(msg string, err error)
- func MessageTruncate(msg string) string
- func MysqlUncompress(input string) []byte
- func ReadVReplicationPos(index int32) string
- func ReadVReplicationStatus(index int32) string
- func RegisterClientFactory(name string, factory ClientFactory)
- func SetProtocol(name string, protocol string) (reset func())
- func StartVReplicationUntil(uid int32, pos string) string
- func StopVReplication(uid int32, message string) string
- type BinlogPlayer
- type BinlogTransactionStream
- type Client
- type ClientFactory
- type DBClient
- type MockDBClient
- func (dc *MockDBClient) Begin() error
- func (dc *MockDBClient) Close()
- func (dc *MockDBClient) Commit() error
- func (dc *MockDBClient) Connect() error
- func (dc *MockDBClient) DBName() string
- func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error)
- func (dc *MockDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error)
- func (dc *MockDBClient) ExpectRequest(query string, result *sqltypes.Result, err error)
- func (dc *MockDBClient) ExpectRequestRE(queryRE string, result *sqltypes.Result, err error)
- func (dc *MockDBClient) Rollback() error
- func (dc *MockDBClient) Wait()
- type Stats
- type StatsHistoryRecord
- type VRSettings
Constants ¶
This section is empty.
Variables ¶
var ( // SlowQueryThreshold will cause we logging anything that's higher than it. SlowQueryThreshold = time.Duration(100 * time.Millisecond) // BlplQuery is the key for the stats map. BlplQuery = "Query" // BlplMultiQuery is the key for the stats map. BlplMultiQuery = "MultiQuery" // BlplTransaction is the key for the stats map. BlplTransaction = "Transaction" // BlplBatchTransaction is the key for the stats map. BlplBatchTransaction = "BatchTransaction" )
Functions ¶
func CreateVReplication ¶
func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, position string, maxTPS, maxReplicationLag, timeUpdated int64, dbName string, workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType, deferSecondaryKeys bool) string
CreateVReplication returns a statement to populate the first value into the _vt.vreplication table.
func CreateVReplicationState ¶
func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource, position string, state binlogdatapb.VReplicationWorkflowState, dbName string, workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType) string
CreateVReplicationState returns a statement to create a stopped vreplication.
func DecodePosition ¶ added in v0.11.0
func DecodePosition(gtid string) (replication.Position, error)
DecodePosition attempts to uncompress the passed value first and if it fails tries to decode it as a valid GTID
func DeleteVReplication ¶
DeleteVReplication returns a statement to delete the replication.
func GenerateUpdateHeartbeat ¶ added in v0.13.0
GenerateUpdateHeartbeat returns a statement to record the latest heartbeat in the _vt.vreplication table.
func GenerateUpdatePos ¶
func GenerateUpdatePos(uid int32, pos replication.Position, timeUpdated int64, txTimestamp int64, rowsCopied int64, compress bool) string
GenerateUpdatePos returns a statement to record the latest processed gtid in the _vt.vreplication table.
func GenerateUpdateRowsCopied ¶ added in v0.11.0
GenerateUpdateRowsCopied returns a statement to update the rows_copied value in the _vt.vreplication table.
func GenerateUpdateTimeThrottled ¶ added in v0.14.2
func GenerateUpdateTimeThrottled(uid int32, timeThrottledUnix int64, componentThrottled string) (string, error)
GenerateUpdateTimeThrottled returns a statement to record the latest throttle time in the _vt.vreplication table.
func LimitString ¶ added in v0.11.0
LimitString truncates string to specified size
func MessageTruncate ¶
MessageTruncate truncates the message string to a safe length.
func MysqlUncompress ¶ added in v0.11.0
MysqlUncompress will uncompress a binary string in the format stored by mysql's compress() function The first four bytes represent the size of the original string passed to compress() Remaining part is the compressed string using zlib, which we uncompress here using golang's zlib library
func ReadVReplicationPos ¶
ReadVReplicationPos returns a statement to query the gtid for a given stream from the _vt.vreplication table.
func ReadVReplicationStatus ¶
ReadVReplicationStatus returns a statement to query the status fields for a given stream from the _vt.vreplication table.
func RegisterClientFactory ¶
func RegisterClientFactory(name string, factory ClientFactory)
RegisterClientFactory adds a new factory. Call during init().
func SetProtocol ¶ added in v0.15.0
SetProtocol is a helper function to set the binlogplayer --binlog_player_protocol flag value for tests. If successful, it returns a function that, when called, returns the flag to its previous value.
Note that because this variable is bound to a flag, the effects of this function are global, not scoped to the calling test-case. Therefore, it should not be used in conjunction with t.Parallel.
func StartVReplicationUntil ¶
StartVReplicationUntil returns a statement to start the replication with a stop position.
func StopVReplication ¶
StopVReplication returns a statement to stop the replication.
Types ¶
type BinlogPlayer ¶
type BinlogPlayer struct {
// contains filtered or unexported fields
}
BinlogPlayer is for reading a stream of updates from BinlogServer.
func NewBinlogPlayerKeyRange ¶
func NewBinlogPlayerKeyRange(dbClient DBClient, tablet *topodatapb.Tablet, keyRange *topodatapb.KeyRange, uid int32, blplStats *Stats) *BinlogPlayer
NewBinlogPlayerKeyRange returns a new BinlogPlayer pointing at the server replicating the provided keyrange and updating _vt.vreplication with uid=startPosition.Uid. If !stopPosition.IsZero(), it will stop when reaching that position.
func NewBinlogPlayerTables ¶
func NewBinlogPlayerTables(dbClient DBClient, tablet *topodatapb.Tablet, tables []string, uid int32, blplStats *Stats) *BinlogPlayer
NewBinlogPlayerTables returns a new BinlogPlayer pointing at the server replicating the provided tables and updating _vt.vreplication with uid=startPosition.Uid. If !stopPosition.IsZero(), it will stop when reaching that position.
func (*BinlogPlayer) ApplyBinlogEvents ¶
func (blp *BinlogPlayer) ApplyBinlogEvents(ctx context.Context) error
ApplyBinlogEvents makes an RPC request to BinlogServer and processes the events. It returns nil if the provided context was canceled, or if we reached the stopping point. If an error is encountered, it updates the vreplication state to "Error". If a stop position was specified, and reached, the state is updated to "Stopped".
type BinlogTransactionStream ¶
type BinlogTransactionStream interface { // Recv returns the next BinlogTransaction, or an error if the RPC was // interrupted. Recv() (*binlogdatapb.BinlogTransaction, error) }
BinlogTransactionStream is the interface of the object returned by StreamTables and StreamKeyRange
type Client ¶
type Client interface { // Dial a server Dial(tablet *topodatapb.Tablet) error // Close the connection Close() // Ask the server to stream updates related to the provided tables. // Should return context.Canceled if the context is canceled. StreamTables(ctx context.Context, position string, tables []string, charset *binlogdatapb.Charset) (BinlogTransactionStream, error) // Ask the server to stream updates related to the provided keyrange. // Should return context.Canceled if the context is canceled. StreamKeyRange(ctx context.Context, position string, keyRange *topodatapb.KeyRange, charset *binlogdatapb.Charset) (BinlogTransactionStream, error) }
Client is the interface all clients must satisfy
type ClientFactory ¶
type ClientFactory func() Client
ClientFactory is the factory method to create a Client
type DBClient ¶
type DBClient interface { DBName() string Connect() error Begin() error Commit() error Rollback() error Close() ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error) ExecuteFetchMulti(query string, maxrows int) (qrs []*sqltypes.Result, err error) }
DBClient is a high level interface to the database.
func NewDBClient ¶
NewDBClient creates a DBClient instance
func NewFakeDBClient ¶
func NewFakeDBClient() DBClient
NewFakeDBClient returns a fake DBClient. Its functions return preset responses to requests.
type MockDBClient ¶
MockDBClient mocks a DBClient. It must be configured to expect requests in a specific order.
func NewMockDBClient ¶
func NewMockDBClient(t *testing.T) *MockDBClient
NewMockDBClient returns a new DBClientMock with the default "Filtered" UName.
func NewMockDbaClient ¶ added in v0.11.0
func NewMockDbaClient(t *testing.T) *MockDBClient
NewMockDbaClient returns a new DBClientMock with the default "Dba" UName.
func (*MockDBClient) Begin ¶
func (dc *MockDBClient) Begin() error
Begin is part of the DBClient interface
func (*MockDBClient) Close ¶
func (dc *MockDBClient) Close()
Close is part of the DBClient interface
func (*MockDBClient) Commit ¶
func (dc *MockDBClient) Commit() error
Commit is part of the DBClient interface
func (*MockDBClient) Connect ¶
func (dc *MockDBClient) Connect() error
Connect is part of the DBClient interface
func (*MockDBClient) DBName ¶
func (dc *MockDBClient) DBName() string
DBName is part of the DBClient interface
func (*MockDBClient) ExecuteFetch ¶
ExecuteFetch is part of the DBClient interface
func (*MockDBClient) ExecuteFetchMulti ¶ added in v0.19.0
func (*MockDBClient) ExpectRequest ¶
func (dc *MockDBClient) ExpectRequest(query string, result *sqltypes.Result, err error)
ExpectRequest adds an expected result to the mock. This function should not be called conncurrently with other commands.
func (*MockDBClient) ExpectRequestRE ¶
func (dc *MockDBClient) ExpectRequestRE(queryRE string, result *sqltypes.Result, err error)
ExpectRequestRE adds an expected result to the mock. queryRE is a regular expression. This function should not be called conncurrently with other commands.
func (*MockDBClient) Rollback ¶
func (dc *MockDBClient) Rollback() error
Rollback is part of the DBClient interface
func (*MockDBClient) Wait ¶
func (dc *MockDBClient) Wait()
Wait waits for all expected requests to be executed. dc.t.Fatalf is executed on 1 second timeout. Wait should not be called concurrently with ExpectRequest.
type Stats ¶
type Stats struct { // Stats about the player, keys used are BlplQuery and BlplTransaction Timings *stats.Timings Rates *stats.Rates ReplicationLagSeconds atomic.Int64 History *history.History State atomic.Value PhaseTimings *stats.Timings QueryTimings *stats.Timings QueryCount *stats.CountersWithSingleLabel BulkQueryCount *stats.CountersWithSingleLabel TrxQueryBatchCount *stats.CountersWithSingleLabel CopyRowCount *stats.Counter CopyLoopCount *stats.Counter ErrorCounts *stats.CountersWithMultiLabels NoopQueryCount *stats.CountersWithSingleLabel VReplicationLags *stats.Timings VReplicationLagRates *stats.Rates TableCopyRowCounts *stats.CountersWithSingleLabel TableCopyTimings *stats.Timings PartialQueryCount *stats.CountersWithMultiLabels PartialQueryCacheSize *stats.CountersWithMultiLabels // contains filtered or unexported fields }
Stats is the internal stats of a player. It is a different structure that is passed in so stats can be collected over the life of multiple individual players.
func (*Stats) Heartbeat ¶ added in v0.9.0
Heartbeat gets the time the last heartbeat from vstreamer was seen
func (*Stats) LastPosition ¶
func (bps *Stats) LastPosition() replication.Position
LastPosition gets the last replication position.
func (*Stats) MessageHistory ¶
MessageHistory gets all the messages, we store 3 at a time
func (*Stats) RecordHeartbeat ¶ added in v0.9.0
RecordHeartbeat updates the time the last heartbeat from vstreamer was seen
func (*Stats) SetLastPosition ¶
func (bps *Stats) SetLastPosition(pos replication.Position)
SetLastPosition sets the last replication position.
type StatsHistoryRecord ¶
StatsHistoryRecord is used to store a Message with timestamp
func (*StatsHistoryRecord) IsDuplicate ¶
func (r *StatsHistoryRecord) IsDuplicate(other any) bool
IsDuplicate implements history.Deduplicable
type VRSettings ¶
type VRSettings struct { StartPos replication.Position StopPos replication.Position MaxTPS int64 MaxReplicationLag int64 State binlogdatapb.VReplicationWorkflowState WorkflowType binlogdatapb.VReplicationWorkflowType WorkflowSubType binlogdatapb.VReplicationWorkflowSubType WorkflowName string DeferSecondaryKeys bool }
VRSettings contains the settings of a vreplication table.
func ReadVRSettings ¶
func ReadVRSettings(dbClient DBClient, uid int32) (VRSettings, error)
ReadVRSettings retrieves the throttler settings for vreplication from the checkpoint table.