Documentation ¶
Overview ¶
Package binlogplayer contains the code that plays a vreplication stream on a client database. It usually runs inside the destination master vttablet process.
Index ¶
- Variables
- func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, position string, ...) string
- func CreateVReplicationStopped(workflow string, source *binlogdatapb.BinlogSource, position string) string
- func CreateVReplicationTable() []string
- func DeleteVReplication(uid uint32) string
- func ReadVReplicationPos(index uint32) string
- func RegisterClientFactory(name string, factory ClientFactory)
- func StartVReplication(uid uint32) string
- func StartVReplicationUntil(uid uint32, pos string) string
- func StopVReplication(uid uint32, message string) string
- type BinlogPlayer
- type BinlogTransactionStream
- type Client
- type ClientFactory
- type DBClient
- type MockDBClient
- func (dc *MockDBClient) Begin() error
- func (dc *MockDBClient) Close()
- func (dc *MockDBClient) Commit() error
- func (dc *MockDBClient) Connect() error
- func (dc *MockDBClient) DBName() string
- func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error)
- func (dc *MockDBClient) ExpectRequest(query string, result *sqltypes.Result, err error)
- func (dc *MockDBClient) ExpectRequestRE(queryRE string, result *sqltypes.Result, err error)
- func (dc *MockDBClient) Rollback() error
- func (dc *MockDBClient) Wait()
- type Stats
- type StatsHistoryRecord
Constants ¶
This section is empty.
Variables ¶
var (
	// SlowQueryThreshold causes anything that takes longer than this duration to be logged.
	SlowQueryThreshold = time.Duration(100 * time.Millisecond)

	// BlplQuery is the key for the stats map.
	BlplQuery = "Query"
	// BlplTransaction is the key for the stats map.
	BlplTransaction = "Transaction"

	// BlpRunning is for the Running state.
	BlpRunning = "Running"
	// BlpStopped is for the Stopped state.
	BlpStopped = "Stopped"
	// BlpError is for the Error state.
	BlpError = "Error"
)
Functions ¶
func CreateVReplication ¶
func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, position string, maxTPS, maxReplicationLag, timeUpdated int64) string
CreateVReplication returns a statement to populate the first value into the _vt.vreplication table.
func CreateVReplicationStopped ¶
func CreateVReplicationStopped(workflow string, source *binlogdatapb.BinlogSource, position string) string
CreateVReplicationStopped returns a statement to create a stopped vreplication.
func CreateVReplicationTable ¶
func CreateVReplicationTable() []string
CreateVReplicationTable returns the statements required to create the _vt.vreplication table. The columns are:
- id: is an auto-increment column that identifies the stream.
- workflow: documents the creator/manager of the stream. Example: 'SplitClone'.
- source: contains a string proto representation of binlogpb.BinlogSource.
- pos: initially, a start position, and is updated to the current position by the binlog player.
- stop_pos: optional column that specifies the stop position.
- max_tps: max transactions per second.
- max_replication_lag: if replication lag exceeds this amount writing is throttled accordingly.
- cell: optional column that overrides the current cell to replicate from.
- tablet_types: optional column that overrides the tablet types to look to replicate from.
- time_update: last time an event was applied.
- transaction_timestamp: timestamp of the transaction (from the master).
- state: Running, Error or Stopped.
- message: Reason for current state.
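Because the function returns statements instead of executing them, the caller is expected to run each one in order. The following is a minimal sketch of that calling pattern, not the package's own code. The executor interface, recordingExecutor and runStatements are hypothetical names introduced for illustration, and the statement strings are placeholders rather than the SQL the package produces.

```go
package main

import (
	"errors"
	"fmt"
)

// executor is a hypothetical stand-in for the one DBClient method this sketch
// needs. The real ExecuteFetch also returns a *sqltypes.Result.
type executor interface {
	ExecuteFetch(query string, maxrows int) error
}

// recordingExecutor remembers every query it runs and fails on the query
// named in failOn, if any.
type recordingExecutor struct {
	queries []string
	failOn  string
}

func (r *recordingExecutor) ExecuteFetch(query string, maxrows int) error {
	if r.failOn != "" && query == r.failOn {
		return errors.New("simulated failure")
	}
	r.queries = append(r.queries, query)
	return nil
}

// runStatements executes the statements in order and stops at the first error.
func runStatements(db executor, statements []string) error {
	for i, stmt := range statements {
		if err := db.ExecuteFetch(stmt, 0); err != nil {
			return fmt.Errorf("statement %d failed: %w", i, err)
		}
	}
	return nil
}

func main() {
	// Placeholder statements; in real code the list would come from
	// CreateVReplicationTable().
	statements := []string{"statement one", "statement two"}
	db := &recordingExecutor{}
	if err := runStatements(db, statements); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("executed", len(db.queries), "statements")
}
```

Stopping at the first error is a design choice of this sketch; a caller that tolerates "already exists" failures would need to inspect the error instead.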
func DeleteVReplication ¶
func DeleteVReplication(uid uint32) string
DeleteVReplication returns a statement to delete the replication.
func ReadVReplicationPos ¶
func ReadVReplicationPos(index uint32) string
ReadVReplicationPos returns a statement to query the gtid for a given shard from the _vt.vreplication table.
func RegisterClientFactory ¶
func RegisterClientFactory(name string, factory ClientFactory)
RegisterClientFactory adds a new factory. Call during init().
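The register-during-init pattern described above can be sketched as follows. This is a self-contained imitation under stated assumptions, not the package's implementation: the client and clientFactory types are trimmed-down stand-ins, newClient and noopClient are hypothetical, and rejecting duplicate names with an error is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// client is a hypothetical, trimmed-down stand-in for the Client interface.
type client interface {
	Close()
}

// clientFactory mirrors the shape of ClientFactory: a function returning a client.
type clientFactory func() client

var (
	mu        sync.Mutex
	factories = make(map[string]clientFactory)
)

// registerClientFactory stores factory under name. Rejecting duplicates is an
// assumption of this sketch.
func registerClientFactory(name string, factory clientFactory) error {
	mu.Lock()
	defer mu.Unlock()
	if _, ok := factories[name]; ok {
		return fmt.Errorf("client factory %q already registered", name)
	}
	factories[name] = factory
	return nil
}

// newClient looks up a factory by name and builds a client from it.
func newClient(name string) (client, error) {
	mu.Lock()
	factory, ok := factories[name]
	mu.Unlock()
	if !ok {
		return nil, fmt.Errorf("no client factory named %q", name)
	}
	return factory(), nil
}

// noopClient is a hypothetical implementation used only to have something to register.
type noopClient struct{}

func (noopClient) Close() {}

// init registers the factory before main runs, so lookups by name succeed later.
func init() {
	if err := registerClientFactory("noop", func() client { return noopClient{} }); err != nil {
		panic(err)
	}
}

func main() {
	c, err := newClient("noop")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	defer c.Close()
	fmt.Printf("created %T\n", c)
}
```

Registering in init() means every implementation linked into the binary is available by name before any player is constructed.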
func StartVReplication ¶
func StartVReplication(uid uint32) string
StartVReplication returns a statement to start the replication.
func StartVReplicationUntil ¶
func StartVReplicationUntil(uid uint32, pos string) string
StartVReplicationUntil returns a statement to start the replication with a stop position.
func StopVReplication ¶
func StopVReplication(uid uint32, message string) string
StopVReplication returns a statement to stop the replication.
Types ¶
type BinlogPlayer ¶
type BinlogPlayer struct {
// contains filtered or unexported fields
}
BinlogPlayer is for reading a stream of updates from BinlogServer.
func NewBinlogPlayerKeyRange ¶
func NewBinlogPlayerKeyRange(dbClient DBClient, tablet *topodatapb.Tablet, keyRange *topodatapb.KeyRange, uid uint32, blplStats *Stats) *BinlogPlayer
NewBinlogPlayerKeyRange returns a new BinlogPlayer pointing at the server replicating the provided keyrange and updating the _vt.vreplication row identified by uid. If a stop position is set for the stream, the player stops when it reaches that position.
func NewBinlogPlayerTables ¶
func NewBinlogPlayerTables(dbClient DBClient, tablet *topodatapb.Tablet, tables []string, uid uint32, blplStats *Stats) *BinlogPlayer
NewBinlogPlayerTables returns a new BinlogPlayer pointing at the server replicating the provided tables and updating the _vt.vreplication row identified by uid. If a stop position is set for the stream, the player stops when it reaches that position.
func (*BinlogPlayer) ApplyBinlogEvents ¶
func (blp *BinlogPlayer) ApplyBinlogEvents(ctx context.Context) error
ApplyBinlogEvents makes an RPC request to BinlogServer and processes the events. It returns nil if the provided context was canceled, or if the stop position was reached. If an error is encountered, it updates the vreplication state to "Error". If a stop position was specified and reached, the state is updated to "Stopped".
type BinlogTransactionStream ¶
type BinlogTransactionStream interface {
	// Recv returns the next BinlogTransaction, or an error if the RPC was
	// interrupted.
	Recv() (*binlogdatapb.BinlogTransaction, error)
}
BinlogTransactionStream is the interface of the object returned by StreamTables and StreamKeyRange
type Client ¶
type Client interface {
	// Dial a server
	Dial(tablet *topodatapb.Tablet) error

	// Close the connection
	Close()

	// Ask the server to stream updates related to the provided tables.
	// Should return context.Canceled if the context is canceled.
	StreamTables(ctx context.Context, position string, tables []string, charset *binlogdatapb.Charset) (BinlogTransactionStream, error)

	// Ask the server to stream updates related to the provided keyrange.
	// Should return context.Canceled if the context is canceled.
	StreamKeyRange(ctx context.Context, position string, keyRange *topodatapb.KeyRange, charset *binlogdatapb.Charset) (BinlogTransactionStream, error)
}
Client is the interface all clients must satisfy
type ClientFactory ¶
type ClientFactory func() Client
ClientFactory is the factory method to create a Client
type DBClient ¶
type DBClient interface {
	DBName() string
	Connect() error
	Begin() error
	Commit() error
	Rollback() error
	Close()
	ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error)
}
DBClient is a high level interface to the database.
func NewDBClient ¶
func NewDBClient(params *mysql.ConnParams) DBClient
NewDBClient creates a DBClient instance
func NewFakeDBClient ¶
func NewFakeDBClient() DBClient
NewFakeDBClient returns a fake DBClient. Its functions return preset responses to requests.
type MockDBClient ¶
type MockDBClient struct {
// contains filtered or unexported fields
}
MockDBClient mocks a DBClient. It must be configured to expect requests in a specific order.
func NewMockDBClient ¶
func NewMockDBClient(t *testing.T) *MockDBClient
NewMockDBClient returns a new MockDBClient.
func (*MockDBClient) Begin ¶
func (dc *MockDBClient) Begin() error
Begin is part of the DBClient interface
func (*MockDBClient) Close ¶
func (dc *MockDBClient) Close()
Close is part of the DBClient interface
func (*MockDBClient) Commit ¶
func (dc *MockDBClient) Commit() error
Commit is part of the DBClient interface
func (*MockDBClient) Connect ¶
func (dc *MockDBClient) Connect() error
Connect is part of the DBClient interface
func (*MockDBClient) DBName ¶
func (dc *MockDBClient) DBName() string
DBName is part of the DBClient interface
func (*MockDBClient) ExecuteFetch ¶
func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error)
ExecuteFetch is part of the DBClient interface
func (*MockDBClient) ExpectRequest ¶
func (dc *MockDBClient) ExpectRequest(query string, result *sqltypes.Result, err error)
ExpectRequest adds an expected result to the mock. This function should not be called concurrently with other commands.
func (*MockDBClient) ExpectRequestRE ¶
func (dc *MockDBClient) ExpectRequestRE(queryRE string, result *sqltypes.Result, err error)
ExpectRequestRE adds an expected result to the mock. queryRE is a regular expression. This function should not be called concurrently with other commands.
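The idea of a mock that expects requests in a specific order, with either exact or regular-expression matching, can be sketched as below. This is a simplified imitation and not MockDBClient itself: orderedMock and its methods are hypothetical, results are omitted, and mismatches are returned as errors rather than reported through *testing.T.

```go
package main

import (
	"fmt"
	"regexp"
)

// expectation is one queued request; re is non-nil for regular-expression matches.
type expectation struct {
	query string
	re    *regexp.Regexp
	err   error
}

// orderedMock is a hypothetical, simplified mock that checks queries in the
// order the expectations were added.
type orderedMock struct {
	expected []expectation
	next     int
}

// expectRequest queues an exact-match expectation.
func (m *orderedMock) expectRequest(query string, err error) {
	m.expected = append(m.expected, expectation{query: query, err: err})
}

// expectRequestRE queues an expectation matched by regular expression.
func (m *orderedMock) expectRequestRE(queryRE string, err error) {
	m.expected = append(m.expected, expectation{
		query: queryRE,
		re:    regexp.MustCompile(queryRE),
		err:   err,
	})
}

// executeFetch checks query against the next expectation and returns its preset error.
func (m *orderedMock) executeFetch(query string) error {
	if m.next >= len(m.expected) {
		return fmt.Errorf("unexpected query: %s", query)
	}
	exp := m.expected[m.next]
	matched := query == exp.query
	if exp.re != nil {
		matched = exp.re.MatchString(query)
	}
	if !matched {
		return fmt.Errorf("query %q does not match expectation %q", query, exp.query)
	}
	m.next++
	return exp.err
}

// done reports whether every expectation has been consumed.
func (m *orderedMock) done() bool {
	return m.next == len(m.expected)
}

func main() {
	m := &orderedMock{}
	m.expectRequest("begin", nil)
	m.expectRequestRE("^update .* where id=1$", nil)
	m.expectRequest("commit", nil)

	for _, q := range []string{"begin", "update t set pos='x' where id=1", "commit"} {
		if err := m.executeFetch(q); err != nil {
			fmt.Println("mismatch:", err)
			return
		}
	}
	fmt.Println("all expectations met:", m.done())
}
```

This sketch is not safe for concurrent use, which is in line with the note above that expectations should not be added concurrently with other commands.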
func (*MockDBClient) Rollback ¶
func (dc *MockDBClient) Rollback() error
Rollback is part of the DBClient interface
func (*MockDBClient) Wait ¶
func (dc *MockDBClient) Wait()
Wait waits for all expected requests to be executed. If they have not all been executed within 1 second, it calls dc.t.Fatalf. Wait should not be called concurrently with ExpectRequest.
type Stats ¶
type Stats struct {
	// Stats about the player, keys used are BlplQuery and BlplTransaction
	Timings *stats.Timings
	Rates   *stats.Rates

	SecondsBehindMaster sync2.AtomicInt64
	History             *history.History
	// contains filtered or unexported fields
}
Stats holds the internal stats of a player. It is a separate structure that is passed in to the player, so stats can be collected over the life of multiple individual players.
func (*Stats) LastPosition ¶
LastPosition gets the last replication position.
func (*Stats) MessageHistory ¶
MessageHistory gets all the stored messages. At most 3 are stored at a time.
func (*Stats) SetLastPosition ¶
SetLastPosition sets the last replication position.
type StatsHistoryRecord ¶
StatsHistoryRecord is used to store a Message with timestamp
func (*StatsHistoryRecord) IsDuplicate ¶
func (r *StatsHistoryRecord) IsDuplicate(other interface{}) bool
IsDuplicate implements history.Deduplicable
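To illustrate how an IsDuplicate method can interact with a small bounded history, here is a minimal sketch. It assumes that a record duplicating the most recent entry is skipped and that the oldest entry is dropped once the limit is exceeded; the actual history package may behave differently. The record and boundedHistory types are hypothetical, and record's single Message field is an assumption, since the fields of StatsHistoryRecord are not listed here.

```go
package main

import "fmt"

// deduplicable mirrors the shape of the IsDuplicate method shown above.
type deduplicable interface {
	IsDuplicate(other interface{}) bool
}

// record is a hypothetical stand-in for StatsHistoryRecord.
type record struct {
	Message string
}

// IsDuplicate reports whether other is a *record carrying the same message.
func (r *record) IsDuplicate(other interface{}) bool {
	o, ok := other.(*record)
	return ok && o.Message == r.Message
}

// boundedHistory keeps at most max records, dropping the oldest first.
type boundedHistory struct {
	max     int
	records []interface{}
}

// add appends rec unless it duplicates the most recent record.
func (h *boundedHistory) add(rec interface{}) {
	if n := len(h.records); n > 0 {
		if d, ok := h.records[n-1].(deduplicable); ok && d.IsDuplicate(rec) {
			return
		}
	}
	h.records = append(h.records, rec)
	if len(h.records) > h.max {
		h.records = h.records[len(h.records)-h.max:]
	}
}

func main() {
	h := &boundedHistory{max: 3}
	for _, msg := range []string{"a", "a", "b", "c", "d"} {
		h.add(&record{Message: msg})
	}
	// The second "a" is skipped as a duplicate and the first "a" is evicted,
	// so this prints b, c and d on separate lines.
	for _, r := range h.records {
		fmt.Println(r.(*record).Message)
	}
}
```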